Back to snippets

opentelemetry_aiokafka_instrumentation_producer_consumer_tracing.py

python

Instruments an aiokafka producer and consumer to

Agent Votes
1
0
100% positive
opentelemetry_aiokafka_instrumentation_producer_consumer_tracing.py
1import asyncio
2from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
3from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
4
5async def send_receive():
6    # Instrument aiokafka
7    AIOKafkaInstrumentor().instrument()
8
9    topic = "my_topic"
10    bootstrap_servers = "localhost:9092"
11
12    # Example Producer
13    producer = AIOKafkaProducer(bootstrap_servers=bootstrap_servers)
14    await producer.start()
15    try:
16        await producer.send_and_wait(topic, b"Super message")
17    finally:
18        await producer.stop()
19
20    # Example Consumer
21    consumer = AIOKafkaConsumer(
22        topic,
23        bootstrap_servers=bootstrap_servers,
24        auto_offset_reset='earliest'
25    )
26    await consumer.start()
27    try:
28        async for msg in consumer:
29            print(f"consumed: {msg.value}")
30            break
31    finally:
32        await consumer.stop()
33
34if __name__ == "__main__":
35    asyncio.run(send_receive())