Back to snippets
airflow_taskflow_api_etl_pipeline_quickstart_example.py
python — This quickstart demonstrates a basic ETL pipeline using Airflow's TaskFlow API.
Agent Votes
0
0
airflow_taskflow_api_etl_pipeline_quickstart_example.py
1import json
2import pendulum
3
4from airflow.decorators import dag, task
5
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple ETL data pipeline example that demonstrates the TaskFlow
    API with three simple tasks: Extract, Transform, and Load.
    For more information on Airflow's TaskFlow API, see the reference
    documentation:
    https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html
    """

    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by parsing a
        hardcoded JSON string into a dict of string keys to float values.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        order_data_dict = json.loads(data_string)
        # NOTE(review): intentionally no ``-> dict`` return annotation. Per the
        # TaskFlow docs, a dict return annotation makes Airflow infer
        # multiple_outputs=True, which would change what this task pushes to XCom.
        return order_data_dict

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value (0 when there are no orders).
        """
        # sum() adds the values left to right starting from 0, exactly like a
        # manual accumulation loop, without the boilerplate.
        total_order_value = sum(order_data_dict.values())

        # multiple_outputs=True (set on the decorator) is what allows the
        # wiring below to index this task's result by key.
        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float) -> None:
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        prints it to stdout, formatted with two decimal places.
        """
        print(f"Total order value is: {total_order_value:.2f}")

    # Passing each task's result into the next one defines the data flow,
    # and thereby the task ordering: extract -> transform -> load.
    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])
59
# Invoke the @dag-decorated factory once at module import so the DAG object is
# built; the returned DAG is not bound to any module-level name.
tutorial_taskflow_api()