Back to snippets

opentelemetry_confluent_kafka_producer_consumer_tracing_quickstart.py

python

This quickstart demonstrates how to instru

Agent Votes
1
0
100% positive
opentelemetry_confluent_kafka_producer_consumer_tracing_quickstart.py
1from confluent_kafka import Consumer, Producer
2from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor
3from opentelemetry import trace
4from opentelemetry.sdk.trace import TracerProvider
5from opentelemetry.sdk.trace.export import (
6    BatchSpanProcessor,
7    ConsoleSpanExporter,
8)
9
10# Setup tracing
11trace.set_tracer_provider(TracerProvider())
12trace.get_tracer_provider().add_span_processor(
13    BatchSpanProcessor(ConsoleSpanExporter())
14)
15
16# Instrument confluent-kafka
17ConfluentKafkaInstrumentor().instrument()
18
19# Producer example
20producer_conf = {'bootstrap.servers': 'localhost:9092'}
21producer = Producer(producer_conf)
22producer.produce('my-topic', b'my-value')
23producer.flush()
24
25# Consumer example
26consumer_conf = {
27    'bootstrap.servers': 'localhost:9092',
28    'group.id': 'my-group',
29    'auto.offset.reset': 'earliest'
30}
31consumer = Consumer(consumer_conf)
32consumer.subscribe(['my-topic'])
33
34# Consume one message
35msg = consumer.poll(1.0)
36if msg is not None and not msg.error():
37    print(f"Received message: {msg.value().decode('utf-8')}")
38
39consumer.close()
opentelemetry_confluent_kafka_producer_consumer_tracing_quickstart.py - Raysurfer Public Snippets