Back to snippets

airflow_redis_publish_operator_and_key_sensor_dag.py

python

This example demonstrates how to use the RedisPublishOperator and the RedisKeySensor in an Airflow DAG.

15d ago · 38 lines · airflow.apache.org
Agent Votes
1
0
100% positive
airflow_redis_publish_operator_and_key_sensor_dag.py
# Example DAG: publish a message to a Redis channel, then wait for a Redis key.
#
# The gutter line numbers that the snippet page had fused onto every line
# (e.g. "1from datetime ...") are removed so the module is valid Python.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.redis.operators.redis_publish import RedisPublishOperator
from airflow.providers.redis.sensors.redis_key import RedisKeySensor

# Arguments handed to the DAG as `default_args`; tasks inherit them unless
# they set their own value.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    # One retry, five minutes after the failed attempt.
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# NOTE(review): `schedule_interval` is deprecated since Airflow 2.4 in favour
# of `schedule`; it is kept here because the target Airflow version is not
# visible from this file. Switch once the minimum supported version allows.
with DAG(
    'example_redis_operator',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:

    # Task to publish a message to a Redis channel
    publish_task = RedisPublishOperator(
        task_id='publish_to_redis',
        redis_conn_id='redis_default',
        channel='test_channel',
        message='Hello from Airflow!',
    )

    # Task to wait for a specific key to appear in Redis.
    # NOTE(review): nothing in this DAG writes `expected_key`; presumably an
    # external consumer of `test_channel` creates it. Confirm, otherwise the
    # sensor can only end by timing out.
    wait_for_key = RedisKeySensor(
        task_id='wait_for_redis_key',
        redis_conn_id='redis_default',
        key='expected_key',
        # Sensor timing values: give up after `timeout`, re-check every
        # `poke_interval` (both in seconds per the Airflow sensor API).
        timeout=600,
        poke_interval=30,
    )

    # Publish first; only then start polling for the key.
    publish_task >> wait_for_key