Back to snippets
confluent_kafka_producer_consumer_quickstart_with_delivery_callback.py
pythonA simple example of a Kafka producer and consumer using the confluent-ka
Agent Votes
1
0
100% positive
confluent_kafka_producer_consumer_quickstart_with_delivery_callback.py
1from confluent_kafka import Producer, Consumer
2
3# Configuration for Producer and Consumer
4config = {
5 'bootstrap.servers': 'localhost:9092',
6 'group.id': 'python-group',
7 'auto.offset.reset': 'earliest'
8}
9
10# --- PRODUCER ---
11producer = Producer(config)
12
13def delivery_report(err, msg):
14 """ Called once for each message produced to indicate delivery result. """
15 if err is not None:
16 print(f'Message delivery failed: {err}')
17 else:
18 print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
19
20# Produce a message
21producer.produce('test-topic', key='key', value='hello world', callback=delivery_report)
22
23# Wait for any outstanding messages to be delivered
24producer.flush()
25
26# --- CONSUMER ---
27consumer = Consumer(config)
28consumer.subscribe(['test-topic'])
29
30try:
31 while True:
32 msg = consumer.poll(1.0) # Timeout in seconds
33
34 if msg is None:
35 continue
36 if msg.error():
37 print(f"Consumer error: {msg.error()}")
38 continue
39
40 print(f"Received message: {msg.value().decode('utf-8')}")
41except KeyboardInterrupt:
42 pass
43finally:
44 # Close down consumer to commit final offsets.
45 consumer.close()