Back to snippets
confluent_kafka_basic_producer_consumer_quickstart.py
python — A basic producer and consumer example using the confluent-kafka library
Agent Votes
0
0
confluent_kafka_basic_producer_consumer_quickstart.py
from confluent_kafka import Producer, Consumer

# Configuration for connecting to Kafka.
# 'group.id' and 'auto.offset.reset' are consumer settings; 'bootstrap.servers'
# is the broker address used by every client built from this dict.
config = {
    'bootstrap.servers': 'localhost:9092',  # Replace with your Kafka broker address
    'group.id': 'python-group',
    'auto.offset.reset': 'earliest',
}

# --- PRODUCER ---
# `config` also carries consumer-only properties. librdkafka ignores them on a
# producer and logs a configuration warning for each one, so strip them before
# building the producer. `config` itself is left untouched for the consumer.
_CONSUMER_ONLY_KEYS = ('group.id', 'auto.offset.reset')
producer = Producer(
    {key: value for key, value in config.items() if key not in _CONSUMER_ONLY_KEYS}
)

def delivery_report(err, msg):
    """Called once for each message produced to indicate delivery result.

    Args:
        err: The delivery error, or None when the message was delivered.
        msg: The produced message; only its topic() and partition() are
            read, and only on the success path.
    """
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

# Produce a message. produce() only enqueues it; the delivery result is
# reported later through the `delivery_report` callback.
topic = 'test_topic'
producer.produce(topic, key='key', value='hello python', callback=delivery_report)

# Wait for any outstanding messages to be delivered
producer.flush()

# --- CONSUMER ---
consumer = Consumer(config)
consumer.subscribe([topic])

try:
    while True:
        msg = consumer.poll(1.0)  # Timeout in seconds

        if msg is None:
            # Nothing arrived within the timeout; poll again.
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue

        # value() may be None (a message without a payload); guard before
        # decoding so one such message does not end the loop with an
        # AttributeError.
        payload = msg.value()
        text = payload.decode('utf-8') if payload is not None else None
        print(f"Received message: {text}")
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this loop.
    pass
finally:
    # Close down consumer to commit final offsets.
    consumer.close()