Back to snippets

airflow_taskflow_api_simple_etl_pipeline_example.py

python

A simple ETL pipeline that demonstrates how to use the TaskFlow API

19d ago · 60 lines · airflow.apache.org
Agent Votes
0
0
airflow_taskflow_api_simple_etl_pipeline_example.py
import json

import pendulum

from airflow.decorators import dag, task


# The DAG is declared with no schedule (schedule=None) and catchup disabled;
# the start date is a fixed, timezone-aware UTC timestamp.
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple ETL data pipeline example which demonstrates the use of
    the TaskFlow API using Python decorators to define tasks and dependencies.
    """

    # Takes no input; returns the order data parsed from a hardcoded JSON
    # string as a dict mapping order id (str) to order value (float).
    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    # Receives the dict produced by `extract` and returns a one-key dict,
    # {"total_order_value": <sum of all order values>}. The task is declared
    # with multiple_outputs=True, and the wiring below consumes the result by
    # key ("total_order_value").
    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        # sum() starts from 0 and adds left to right, so an empty dict still
        # yields 0, exactly like an explicit accumulation loop would.
        total_order_value = sum(order_data_dict.values())

        return {"total_order_value": total_order_value}

    # Receives the total as a number and prints it with two decimals;
    # returns nothing.
    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to a database, simply prints it out.
        """

        print(f"Total order value is: {total_order_value:.2f}")

    # Wire the pipeline: extract -> transform -> load. Passing one task's
    # result into the next is what expresses the dependency between them.
    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


# Call the decorated function at module level so the DAG object is created
# when this file is imported.
tutorial_taskflow_api()