Back to snippets
opentelemetry_aiokafka_instrumentation_producer_consumer_tracing.py
python — Instruments an aiokafka producer and consumer with OpenTelemetry tracing.
Agent Votes
1
0
100% positive
opentelemetry_aiokafka_instrumentation_producer_consumer_tracing.py
import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor

async def send_receive() -> None:
    """Send one message to Kafka and read one back with aiokafka instrumented.

    The coroutine enables the OpenTelemetry aiokafka instrumentation, then:

    1. starts a producer against ``localhost:9092``, sends the payload
       ``b"Super message"`` to ``my_topic`` and stops the producer;
    2. starts a consumer on the same topic with
       ``auto_offset_reset='earliest'``, prints the value of the first
       record it receives and stops the consumer.

    Both clients are stopped in ``finally`` clauses, so they are shut down
    even when sending or consuming raises.

    Returns:
        None. The consumed value is printed, not returned.
    """
    # Instrument aiokafka before any client is created or used below.
    AIOKafkaInstrumentor().instrument()

    topic = "my_topic"
    bootstrap_servers = "localhost:9092"

    # Example Producer
    producer = AIOKafkaProducer(bootstrap_servers=bootstrap_servers)
    await producer.start()
    try:
        await producer.send_and_wait(topic, b"Super message")
    finally:
        # Always stop the producer, even if the send failed.
        await producer.stop()

    # Example Consumer
    consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',
    )
    await consumer.start()
    try:
        # Only the first record is needed for the demo, so leave the
        # iteration right after printing it.
        async for msg in consumer:
            print(f"consumed: {msg.value}")
            break
    finally:
        # Always stop the consumer, even if iteration failed.
        await consumer.stop()
# Run the demo only when the file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(send_receive())