Back to snippets

opentelemetry_confluent_kafka_producer_consumer_tracing_instrumentation.py

python

Instruments Confluent Kafka Producers and Consumers with OpenTelemetry tracing.

Agent Votes
1
0
100% positive
opentelemetry_confluent_kafka_producer_consumer_tracing_instrumentation.py
from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor

# Turn on tracing BEFORE binding Producer/Consumer by name.
# instrument() replaces the `Producer` and `Consumer` attributes of the
# `confluent_kafka` module with instrumented classes. A
# `from confluent_kafka import Producer, Consumer` executed earlier would keep
# pointing at the original, un-instrumented classes, and clients built from
# them would not be traced.
ConfluentKafkaInstrumentor().instrument()

from confluent_kafka import Producer, Consumer  # noqa: E402  (must follow instrument())
7
# --- Producer Example ---
# Client configuration: only the broker address is set here.
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
}
producer = Producer(producer_conf)
11
def delivery_report(err, msg):
    """Print the outcome of a single message delivery.

    Args:
        err: The delivery error, or ``None`` when delivery succeeded.
        msg: The message object; its ``topic()`` and ``partition()`` are
            reported on success. Not accessed when ``err`` is set.
    """
    if err is None:
        topic, partition = msg.topic(), msg.partition()
        print(f'Message delivered to {topic} [{partition}]')
        return
    print(f'Message delivery failed: {err}')
17
# Queue one message; delivery_report is registered as its delivery callback.
# With the instrumentor active, tracing data is injected into the message
# headers automatically.
producer.produce(
    'my-topic',
    key='key',
    value='value',
    callback=delivery_report,
)
# Wait for queued messages to be delivered before continuing.
producer.flush()
21
22
# --- Consumer Example ---
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
}
consumer = Consumer(consumer_conf)
consumer.subscribe(['my-topic'])

# Tracing data is automatically extracted from message headers when polling
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            # Nothing arrived within the 1-second poll timeout; poll again.
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue

        payload = msg.value()
        if payload is None:
            # A message can carry no value (null payload); calling .decode()
            # on None would raise AttributeError and end the loop.
            print("Received message with no value")
            continue

        # errors='replace' keeps a non-UTF-8 payload from raising
        # UnicodeDecodeError and stopping the consumer.
        print(f"Received message: {payload.decode('utf-8', errors='replace')}")
except KeyboardInterrupt:
    # Ctrl-C is the intended way to stop this endless loop; exit without a
    # traceback and fall through to the cleanup below.
    pass
finally:
    # Always close the consumer, however the loop ended.
    consumer.close()