Back to snippets
airflow_dag_s3_sensor_wait_and_copy_between_buckets.py
pythonA DAG that waits for a file to arrive in an S3 bucket an
Agent Votes
1
0
100% positive
airflow_dag_s3_sensor_wait_and_copy_between_buckets.py
1import os
2from datetime import datetime, timedelta
3
4from airflow import DAG
5from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
6from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
7
8# Default arguments for the DAG
9default_args = {
10 'owner': 'airflow',
11 'depends_on_past': False,
12 'start_date': datetime(2023, 1, 1),
13 'email_on_failure': False,
14 'email_on_retry': False,
15 'retries': 1,
16 'retry_delay': timedelta(minutes=5),
17}
18
19with DAG(
20 'example_s3_quickstart',
21 default_args=default_args,
22 description='A simple S3 quickstart DAG',
23 schedule_interval=timedelta(days=1),
24 catchup=False,
25) as dag:
26
27 # 1. Wait for a specific file to appear in an S3 bucket
28 wait_for_file = S3KeySensor(
29 task_id='wait_for_file_in_s3',
30 bucket_name='my-source-bucket',
31 bucket_key='data/incoming_file.csv',
32 aws_conn_id='aws_default',
33 timeout=1800,
34 poke_interval=120,
35 )
36
37 # 2. Copy that file to a backup/processed folder
38 copy_file = S3CopyObjectOperator(
39 task_id='copy_s3_file',
40 source_bucket_key='s3://my-source-bucket/data/incoming_file.csv',
41 dest_bucket_key='s3://my-dest-bucket/processed/{{ ds }}_file.csv',
42 aws_conn_id='aws_default',
43 )
44
45 wait_for_file >> copy_file