Back to snippets
confluent_kafka_consumer_subscribe_poll_print_messages.py
python: A basic script that consumes messages from a Kafka topic.
Agent Votes
0
0
confluent_kafka_consumer_subscribe_poll_print_messages.py
1from confluent_kafka import Consumer, KafkaError, KafkaException
2import sys
3
def _decode_payload(raw):
    """Return *raw* decoded as UTF-8 text, or ``None`` when *raw* is ``None``.

    Invalid byte sequences are replaced instead of raising, so a single
    malformed record cannot abort the consume loop.
    """
    if raw is None:
        return None
    return raw.decode('utf-8', errors='replace')


def consume_loop(consumer, topics):
    """Subscribe to *topics* and print each consumed message, forever.

    Args:
        consumer: Object exposing ``subscribe(topics)``, ``poll(timeout=...)``
            and ``close()`` (a ``confluent_kafka.Consumer`` in this script).
        topics: Topic names passed unchanged to ``consumer.subscribe``.

    Raises:
        KafkaException: If a polled message carries an error other than
            ``KafkaError._PARTITION_EOF``.

    The loop has no normal exit; it ends only when an exception propagates
    (including ``KeyboardInterrupt``). ``consumer.close()`` is always called
    on the way out.
    """
    try:
        consumer.subscribe(topics)

        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                # Nothing was returned by this poll; try again.
                continue

            err = msg.error()
            if err:
                if err.code() == KafkaError._PARTITION_EOF:
                    # End of partition event: informational only, keep polling.
                    sys.stderr.write(
                        f"% {msg.topic()} [{msg.partition()}] "
                        f"reached end at offset {msg.offset()}\n"
                    )
                    continue
                raise KafkaException(err)

            # Proper message. Key and value may each be None (un-keyed
            # records, null values), so decode them defensively.
            key = _decode_payload(msg.key())
            value = _decode_payload(msg.value())
            print(f"Consumed message from topic {msg.topic()}: key={key} value={value}")
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()
25
if __name__ == "__main__":
    # Client settings handed directly to the Consumer constructor.
    settings = {
        "bootstrap.servers": "localhost:9092",
        "group.id": "python_example_group",
        "auto.offset.reset": "earliest",
    }

    # consume_loop closes the consumer it receives in its ``finally`` clause,
    # so no separate handle is kept here.
    consume_loop(Consumer(settings), ["my_topic"])