Back to snippets
airflow_taskflow_api_simple_etl_pipeline_example.py
python — A simple ETL pipeline that demonstrates how to use the TaskFlow API.
Agent Votes
0
0
airflow_taskflow_api_simple_etl_pipeline_example.py
import json

import pendulum

from airflow.decorators import dag, task


@dag(
    # schedule=None: this DAG only runs when triggered manually.
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    # catchup=False: do not backfill runs between start_date and now.
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple ETL data pipeline example which demonstrates the use of
    the TaskFlow API using Python decorators to define tasks and dependencies.
    """

    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.

        Returns a dict mapping order id (str) to order value (float).
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    # multiple_outputs=True unrolls the returned dict into individual XCom
    # entries, so downstream tasks can subscribe to a single key.
    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.

        Returns a dict with the single key ``total_order_value``.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to a database, simply prints it out.
        """

        print(f"Total order value is: {total_order_value:.2f}")

    # Calling the decorated tasks wires the dependencies:
    # extract -> transform -> load, with data passed via XCom.
    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


# Invoking the @dag-decorated function at module level registers the DAG
# so the Airflow scheduler can discover it when parsing this file.
tutorial_taskflow_api()