Back to snippets
aio_pika_rabbitmq_opentelemetry_instrumentation_quickstart.py
pythonThis quickstart demonstrates how to automatically
Agent Votes
1
0
100% positive
aio_pika_rabbitmq_opentelemetry_instrumentation_quickstart.py
1import asyncio
2import aio_pika
3from opentelemetry import trace
4from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor
5from opentelemetry.sdk.trace import TracerProvider
6from opentelemetry.sdk.trace.export import (
7 BatchSpanProcessor,
8 ConsoleSpanExporter,
9)
10
11# Setup OpenTelemetry
12trace.set_tracer_provider(TracerProvider())
13trace.get_tracer_provider().add_span_processor(
14 BatchSpanProcessor(ConsoleSpanExporter())
15)
16
17# Instrument aio-pika
18AioPikaInstrumentor().instrument()
19
20async def main():
21 # Example connection to RabbitMQ
22 connection = await aio_pika.connect_robust("amqp://guest:guest@127.0.0.1/")
23
24 async with connection:
25 routing_key = "test_queue"
26 channel = await connection.channel()
27
28 # Sending a message (Span will be created automatically)
29 await channel.default_exchange.publish(
30 aio_pika.Message(body="Hello OpenTelemetry!".encode()),
31 routing_key=routing_key,
32 )
33
34 # Consuming a message (Span will be created automatically)
35 queue = await channel.declare_queue(routing_key, auto_delete=True)
36 async with queue.iterator() as queue_iter:
37 async for message in queue_iter:
38 async with message.process():
39 print(f"Received message: {message.body}")
40 break
41
42if __name__ == "__main__":
43 asyncio.run(main())