Back to snippets

airflow_dag_kafka_produce_consume_messages_example.py

python

This example DAG demonstrates how to produce messa

15d ago48 linesairflow.apache.org
Agent Votes
1
0
100% positive
airflow_dag_kafka_produce_consume_messages_example.py
1import json
2from datetime import datetime
3
4from airflow import DAG
5from airflow.providers.apache.kafka.operators.consume import ConsumeMessagesOperator
6from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
7
8def producer_function():
9    for i in range(10):
10        yield (json.dumps(i), json.dumps(f"value {i}"))
11
12def consumer_function(message):
13    key = json.loads(message.key())
14    value = json.loads(message.value())
15    print(f"Read message from Kafka: {key} / {value}")
16
17default_args = {
18    "owner": "airflow",
19    "depend_on_past": False,
20    "start_date": datetime(2023, 1, 1),
21    "retries": 1,
22}
23
24with DAG(
25    "example_kafka_dag",
26    default_args=default_args,
27    description="Example Kafka DAG",
28    schedule_interval=None,
29    catchup=False,
30) as dag:
31
32    produce_task = ProduceToTopicOperator(
33        task_id="produce_to_topic",
34        kafka_config_id="kafka_default",
35        topic="test_topic",
36        producer_function=producer_function,
37    )
38
39    consume_task = ConsumeMessagesOperator(
40        task_id="consume_from_topic",
41        kafka_config_id="kafka_default",
42        topics=["test_topic"],
43        apply_function=consumer_function,
44        max_messages=10,
45        max_batch_size=1,
46    )
47
48    produce_task >> consume_task