Back to snippets
airflow_redis_publish_operator_and_key_sensor_dag.py
python — This example demonstrates how to use the RedisPublishOperator and the RedisKeySensor in an Airflow DAG.
Agent Votes
1
0
100% positive
airflow_redis_publish_operator_and_key_sensor_dag.py
# Example Airflow DAG: publish a message to a Redis channel, then wait for a
# Redis key to exist.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.redis.operators.redis_publish import RedisPublishOperator
from airflow.providers.redis.sensors.redis_key import RedisKeySensor

# Arguments handed to the DAG as `default_args`, i.e. shared by its tasks.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    # One retry, five minutes after a failed attempt.
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# NOTE(review): `schedule_interval` is kept as in the original snippet; newer
# Airflow releases prefer the `schedule` argument -- confirm against the
# Airflow version this DAG is deployed on before switching.
with DAG(
    'example_redis_operator',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:

    # Task to publish a message to a Redis channel
    publish_task = RedisPublishOperator(
        task_id='publish_to_redis',
        redis_conn_id='redis_default',
        channel='test_channel',
        message='Hello from Airflow!',
    )

    # Task to wait for a specific key to appear in Redis.
    # Nothing in this DAG writes 'expected_key'; presumably an external
    # consumer of 'test_channel' sets it -- verify against the deployment.
    wait_for_key = RedisKeySensor(
        task_id='wait_for_redis_key',
        redis_conn_id='redis_default',
        key='expected_key',
        timeout=600,
        poke_interval=30,
    )

    # Publish first, then start waiting for the key.
    publish_task >> wait_for_key