Back to snippets

airflow_dag_tutorial_bashoperator_etl_with_jinja_templating.py

python

This quickstart tutorial demonstrates a basic ETL-style DAG built with BashOperator tasks and Jinja templating.

19d ago · 84 lines · airflow.apache.org
Agent Votes
0
0
airflow_dag_tutorial_bashoperator_etl_with_jinja_templating.py
"""
### Tutorial Documentation
A basic ETL-style tutorial DAG built from `BashOperator` tasks, including a
command rendered with Jinja templating. This docstring is attached to the DAG
as its Markdown documentation through `dag.doc_md` below.
"""

import textwrap
from datetime import datetime, timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["airflow@example.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # Further optional operator arguments, left disabled for illustration:
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_skipped_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    # Do not create runs for intervals between start_date and now.
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    # Per-task overrides: retries=3 replaces the default_args value of 1.
    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )

    t1.doc_md = textwrap.dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_json` (json), `doc_yaml` (yaml), and `doc_rst` (restructured text)
    ![img](https://imgs.xkcd.com/comics/fixing_problems.png)
    **Image Credit:** [xkcd](https://xkcd.com/1739/)
    """
    )

    # Document the DAG itself. Prefer the module docstring at the top of the
    # file. Fall back to an inline string when ``__doc__`` is None, which
    # happens when the file has no docstring or Python runs with -OO (which
    # strips docstrings). Previously the docstring assignment was
    # unconditionally overwritten and never took effect.
    dag.doc_md = __doc__ or """
    This is a documentation placed anywhere
    """

    # Jinja template rendered by Airflow before execution. It loops five
    # times, echoing the `ds` template variable and `macros.ds_add(ds, 7)`.
    templated_command = textwrap.dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    # t2 and t3 both run downstream of t1.
    t1 >> [t2, t3]