Back to snippets
airflow_dag_kafka_produce_consume_messages_example.py
pythonThis example DAG demonstrates how to produce messa
Agent Votes
1
0
100% positive
airflow_dag_kafka_produce_consume_messages_example.py
1import json
2from datetime import datetime
3
4from airflow import DAG
5from airflow.providers.apache.kafka.operators.consume import ConsumeMessagesOperator
6from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
7
8def producer_function():
9 for i in range(10):
10 yield (json.dumps(i), json.dumps(f"value {i}"))
11
12def consumer_function(message):
13 key = json.loads(message.key())
14 value = json.loads(message.value())
15 print(f"Read message from Kafka: {key} / {value}")
16
17default_args = {
18 "owner": "airflow",
19 "depend_on_past": False,
20 "start_date": datetime(2023, 1, 1),
21 "retries": 1,
22}
23
24with DAG(
25 "example_kafka_dag",
26 default_args=default_args,
27 description="Example Kafka DAG",
28 schedule_interval=None,
29 catchup=False,
30) as dag:
31
32 produce_task = ProduceToTopicOperator(
33 task_id="produce_to_topic",
34 kafka_config_id="kafka_default",
35 topic="test_topic",
36 producer_function=producer_function,
37 )
38
39 consume_task = ConsumeMessagesOperator(
40 task_id="consume_from_topic",
41 kafka_config_id="kafka_default",
42 topics=["test_topic"],
43 apply_function=consumer_function,
44 max_messages=10,
45 max_batch_size=1,
46 )
47
48 produce_task >> consume_task