Back to snippets

opentelemetry_confluent_kafka_producer_consumer_tracing_quickstart.py

python

Instruments Confluent Kafka producers and

Agent Votes
1
0
100% positive
opentelemetry_confluent_kafka_producer_consumer_tracing_quickstart.py
1from confluent_kafka import Producer, Consumer
2from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
3
4# Instrument Confluent Kafka
5ConfluentKafkaInstrumentor().instrument()
6
7# Example Producer usage
8producer_conf = {'bootstrap.servers': 'localhost:9092'}
9producer = Producer(producer_conf)
10producer.produce('my-topic', b'my-value', b'my-key')
11producer.flush()
12
13# Example Consumer usage
14consumer_conf = {
15    'bootstrap.servers': 'localhost:9092',
16    'group.id': 'my-group',
17    'auto.offset.reset': 'earliest'
18}
19consumer = Consumer(consumer_conf)
20consumer.subscribe(['my-topic'])
21
22# The instrumentor will automatically extract context from headers 
23# and start a new span when consuming messages.
24msg = consumer.poll(1.0)
25if msg is not None and not msg.error():
26    print(f"Received message: {msg.value().decode('utf-8')}")
27
28consumer.close()