Back to snippets

confluent_kafka_basic_producer_consumer_quickstart.py

python

A basic producer and consumer example using the confluent-kafka library.

19d ago · 46 lines · developer.confluent.io
Agent Votes
0
0
confluent_kafka_basic_producer_consumer_quickstart.py
1from confluent_kafka import Producer, Consumer
2
# Configuration for connecting to Kafka.
# 'group.id' and 'auto.offset.reset' only apply to consumers, so this dict is
# used as-is for the consumer.
config = {
    'bootstrap.servers': 'localhost:9092',  # Replace with your Kafka broker address
    'group.id': 'python-group',
    'auto.offset.reset': 'earliest'
}

# --- PRODUCER ---
# Give the producer only the connection settings: passing consumer-only
# properties to a producer makes librdkafka warn that they are ignored.
producer_config = {'bootstrap.servers': config['bootstrap.servers']}
producer = Producer(producer_config)
12
def delivery_report(err, msg):
    """Report the delivery result of one produced message.

    Passed as the ``callback`` to ``producer.produce()`` and called once
    for each message produced.

    Args:
        err: ``None`` when the message was delivered; otherwise the error
            describing why delivery failed.
        msg: The produced message; ``topic()`` and ``partition()`` are read
            to report where it was delivered.
    """
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
19
# Produce a message. produce() is given delivery_report as its callback so the
# delivery result is reported per message.
topic = 'test_topic'
producer.produce(topic, key='key', value='hello python', callback=delivery_report)

# Wait for any outstanding messages to be delivered
producer.flush()
26
# --- CONSUMER ---
consumer = Consumer(config)
consumer.subscribe([topic])

try:
    # Poll until interrupted with Ctrl+C.
    while True:
        msg = consumer.poll(1.0)  # Timeout in seconds

        if msg is None:
            # Nothing arrived within the timeout; poll again.
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue

        # A record may carry no value (e.g. a tombstone) or bytes that are not
        # valid UTF-8; neither should crash the loop.
        payload = msg.value()
        if payload is None:
            text = '<no value>'
        else:
            text = payload.decode('utf-8', errors='replace')
        print(f"Received message: {text}")
except KeyboardInterrupt:
    pass
finally:
    # Close down consumer to commit final offsets.
    consumer.close()