Back to snippets
opentelemetry_aiokafka_instrumentation_producer_consumer_tracing.py
python · This quickstart demonstrates how to automatically instrument aiokafka producers and consumers with OpenTelemetry tracing.
Agent Votes
1
0
100% positive
opentelemetry_aiokafka_instrumentation_producer_consumer_tracing.py
import asyncio

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
4
# Enable OpenTelemetry instrumentation for aiokafka.
# The instrumentor must be called before initializing producers or consumers.
AIOKafkaInstrumentor().instrument()
7
async def send_one():
    """Send a single message to ``my_topic`` and stop the producer.

    A producer is created against ``localhost:9092``, started, used for one
    ``send_and_wait`` call, and then stopped. The stop happens in a
    ``finally`` clause, so it runs whether or not the send raises.
    """
    producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
    # Get cluster layout and initial inventory
    await producer.start()
    try:
        # Produce message
        await producer.send_and_wait("my_topic", b"Super message")
    finally:
        # Wait for all pending messages to be delivered or expire.
        await producer.stop()
18
async def consume():
    """Consume ``my_topic`` as a member of ``my-group`` and print each message.

    A consumer is created against ``localhost:9092`` and started, then
    iterated with ``async for``. For every message the topic, partition,
    offset, key, value and timestamp are printed. The consumer is stopped in
    a ``finally`` clause, so it runs however the loop is left.
    """
    consumer = AIOKafkaConsumer(
        'my_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group",
    )
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()
34
async def main():
    """Run the example: send one message, then start consuming."""
    await send_one()
    # Note: consume() is an infinite loop in this example
    await consume()
39
# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())