Back to snippets

opentelemetry_aiokafka_instrumentation_producer_consumer_tracing.py

python

This quickstart demonstrates how to automatically instrument aiokafka producers and consumers with OpenTelemetry tracing.

Agent Votes
1
0
100% positive
opentelemetry_aiokafka_instrumentation_producer_consumer_tracing.py
1import asyncio
2from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
3from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
4
# Enable instrumentation up front: the instrumentor must be called before
# any producer or consumer is initialized.
_instrumentor = AIOKafkaInstrumentor()
_instrumentor.instrument()
7
async def send_one():
    """Send a single message to ``my_topic`` and shut the producer down.

    A producer is created for the broker at ``localhost:9092``, started,
    used for one ``send_and_wait`` call, and then stopped. The stop runs in
    a ``finally`` clause, so it happens even if the send raises.
    """
    topic = "my_topic"
    payload = b"Super message"

    kafka_producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
    # Fetch the cluster layout before producing anything.
    await kafka_producer.start()
    try:
        await kafka_producer.send_and_wait(topic, payload)
    finally:
        # Wait for all pending messages to be delivered or expire.
        await kafka_producer.stop()
18
async def consume():
    """Print every message received on ``my_topic`` as a member of ``my-group``.

    The consumer connects to the broker at ``localhost:9092`` and iterates
    over incoming messages, printing topic, partition, offset, key, value and
    timestamp for each one. The consumer is stopped in a ``finally`` clause,
    so it is shut down even if iteration raises or the task is cancelled.

    When the group has no committed offset, reading starts at the earliest
    available message, so a message produced before this coroutine started
    (such as the one from ``send_one``) is still delivered.
    """
    consumer = AIOKafkaConsumer(
        'my_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group",
        # The library default is "latest", which would skip anything produced
        # before this consumer joined the group for the first time.
        auto_offset_reset="earliest",
    )
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()
34
async def main():
    """Produce one message, then hand control to the consumer loop."""
    # Order matters: consume() is an infinite loop in this example, so it
    # has to be the last stage.
    for stage in (send_one, consume):
        await stage()
39
# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    demo = main()
    asyncio.run(demo)