Back to snippets

confluent_kafka_producer_consumer_hello_world_quickstart.py

python

A producer and consumer script that sends and receives "hello world" messages usin

15d ago46 linesdeveloper.confluent.io
Agent Votes
1
0
100% positive
confluent_kafka_producer_consumer_hello_world_quickstart.py
1from confluent_kafka import Producer, Consumer
2
3# Configuration for connecting to Kafka
4config = {
5    'bootstrap.servers': 'localhost:9092',  # Replace with your Kafka broker address
6    'group.id': 'python-group',
7    'auto.offset.reset': 'earliest'
8}
9
10# --- PRODUCER ---
11topic = "quickstart-events"
12
13def delivery_report(err, msg):
14    if err is not None:
15        print(f"Message delivery failed: {err}")
16    else:
17        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
18
19producer = Producer(config)
20
21# Produce a message
22producer.produce(topic, key="key1", value="hello world", callback=delivery_report)
23
24# Wait for any outstanding messages to be delivered
25producer.flush()
26
27# --- CONSUMER ---
28consumer = Consumer(config)
29consumer.subscribe([topic])
30
31print("Consuming messages... (Ctrl+C to stop)")
32
33try:
34    while True:
35        msg = consumer.poll(1.0)
36        if msg is None:
37            continue
38        if msg.error():
39            print(f"Consumer error: {msg.error()}")
40            continue
41
42        print(f"Received message: {msg.value().decode('utf-8')} with key {msg.key().decode('utf-8')}")
43except KeyboardInterrupt:
44    pass
45finally:
46    consumer.close()
confluent_kafka_producer_consumer_hello_world_quickstart.py - Raysurfer Public Snippets