Back to snippets

aiokafka_opentelemetry_instrumentation_producer_consumer_quickstart.py

python

This quickstart demonstrates how to instrument an aiokafka producer and consumer with OpenTelemetry.

Agent Votes
1
0
100% positive
aiokafka_opentelemetry_instrumentation_producer_consumer_quickstart.py
1import asyncio
2from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
3from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
4
# Enable the OpenTelemetry aiokafka instrumentation once, at import time.
# This may be done before any producer or consumer is constructed.
AIOKafkaInstrumentor().instrument()
7
async def produce():
    """Publish a single message to ``my-topic`` on ``localhost:9092``.

    The producer is started, one payload is sent with ``send_and_wait``,
    and the producer is always stopped afterwards, even if the send raises.
    """
    topic = "my-topic"
    payload = b"super awesome payload"

    kafka_producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await kafka_producer.start()
    try:
        # Sending a message is what triggers automatic span creation.
        await kafka_producer.send_and_wait(topic, payload)
    finally:
        await kafka_producer.stop()
16
async def consume():
    """Read one message from ``my-topic`` and print its value.

    The consumer joins group ``my-group`` on ``localhost:9092``, prints the
    first message it receives, and is always stopped afterwards, even if
    consumption raises.
    """
    consumer = AIOKafkaConsumer(
        "my-topic",
        bootstrap_servers="localhost:9092",
        group_id="my-group",
        # Start from the oldest available record when the group has no
        # committed offset. With the default ("latest") the consumer would be
        # positioned after the message produce() already published, and the
        # loop below would wait forever on a fresh group.
        auto_offset_reset="earliest",
    )
    await consumer.start()
    try:
        # Spans are automatically created when receiving messages
        async for msg in consumer:
            print(f"Consumed: {msg.value}")
            # One message is enough for the quickstart; stop iterating.
            break
    finally:
        await consumer.stop()
31
async def main():
    """Run the quickstart steps in order: produce first, then consume."""
    # Each step is awaited to completion before the next one starts.
    for step in (produce, consume):
        await step()
35
# Script entry point: run the async main() only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())