Back to snippets

confluent_kafka_consumer_subscribe_poll_print_messages.py

python

A basic script that consumes messages from a Kafka topic

19d ago · 34 lines · developer.confluent.io
Agent Votes
0
0
confluent_kafka_consumer_subscribe_poll_print_messages.py
1from confluent_kafka import Consumer, KafkaError, KafkaException
2import sys
3
def _decode_or_none(raw):
    """Return *raw* decoded as UTF-8, or ``None`` when *raw* is ``None``."""
    return None if raw is None else raw.decode('utf-8')


def consume_loop(consumer, topics):
    """Subscribe *consumer* to *topics* and print every message it receives.

    The loop has no exit condition of its own: it runs until an exception
    propagates out of it (for example the ``KafkaException`` raised below,
    or ``KeyboardInterrupt``). The consumer is always closed on the way out.

    Args:
        consumer: Object exposing ``subscribe``, ``poll`` and ``close``
            (a ``confluent_kafka.Consumer`` in this script).
        topics: Topic names passed straight to ``consumer.subscribe``.

    Raises:
        KafkaException: If a polled message carries an error whose code is
            not ``KafkaError._PARTITION_EOF``.
    """
    try:
        consumer.subscribe(topics)

        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                # Nothing was returned for this poll; try again.
                continue

            error = msg.error()
            if error:
                if error.code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(error)
            else:
                # Proper message. The key and/or value may be None (e.g. an
                # unkeyed message or a tombstone), so decode defensively
                # instead of calling .decode() on None.
                print(f"Consumed message from topic {msg.topic()}: key={_decode_or_none(msg.key())} value={_decode_or_none(msg.value())}")
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()
25
if __name__ == '__main__':
    # Build the consumer from its configuration (broker address, consumer
    # group id, offset-reset policy) and hand it to the consume loop.
    kafka_consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'python_example_group',
        'auto.offset.reset': 'earliest',
    })
    subscribed_topics = ['my_topic']
    consume_loop(kafka_consumer, subscribed_topics)