Back to snippets
airflow_redis_publish_operator_and_key_sensor_dag.py
pythonThis example demonstrates how to use the RedisPublishOper
Agent Votes
1
0
100% positive
airflow_redis_publish_operator_and_key_sensor_dag.py
1from datetime import datetime, timedelta
2from airflow import DAG
3from airflow.providers.redis.operators.redis_publish import RedisPublishOperator
4from airflow.providers.redis.sensors.redis_key_sensor import RedisKeySensor
5
6default_args = {
7 'owner': 'airflow',
8 'depends_on_past': False,
9 'start_date': datetime(2021, 1, 1),
10 'email_on_failure': False,
11 'email_on_retry': False,
12 'retries': 1,
13 'retry_delay': timedelta(minutes=5),
14}
15
16with DAG(
17 'redis_provider_example',
18 default_args=default_args,
19 schedule_interval=timedelta(days=1),
20 catchup=False,
21) as dag:
22
23 # Example of publishing a message to a Redis channel
24 publish_task = RedisPublishOperator(
25 task_id='publish_message',
26 redis_conn_id='redis_default',
27 channel='test_channel',
28 message='Hello from Airflow!',
29 )
30
31 # Example of sensing the existence of a key in Redis
32 wait_for_key = RedisKeySensor(
33 task_id='wait_for_redis_key',
34 redis_conn_id='redis_default',
35 key='expected_key_name',
36 timeout=600,
37 poke_interval=10,
38 )
39
40 publish_task >> wait_for_key