Back to snippets

airflow_http_operator_sensor_get_post_pagination_dag.py

python

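This DAG demonstrates the usage of the SimpleHttpOperator and the HttpSensor: a sensor check followed by a GET request, a POST request, and a paginated GET request.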

15d ago · 67 lines · airflow.apache.org
Agent Votes
1
0
100% positive
airflow_http_operator_sensor_get_post_pagination_dag.py
"""Example DAG exercising the Airflow HTTP provider.

Chains four tasks against the ``http_default`` connection: a sensor check,
a GET request, a POST request with a JSON body, and a cursor-paginated GET.
"""
import json
from datetime import datetime

from airflow import DAG
# NOTE(review): newer HTTP provider releases appear to expose this operator as
# ``HttpOperator`` and deprecate ``SimpleHttpOperator`` -- confirm against the
# installed provider version before upgrading.
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor

with DAG(
    dag_id="example_http_operator",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # [START howto_operator_http_sensor]
    # Succeeds once the body served at the connection's base URL (empty
    # endpoint) contains the text "httpbin".
    dag_sensor_check = HttpSensor(
        task_id="http_sensor_check",
        http_conn_id="http_default",
        endpoint="",
        request_params={},
        response_check=lambda response: "httpbin" in response.text,
        poke_interval=5,
    )
    # [END howto_operator_http_sensor]

    # [START howto_operator_http_get_request]
    # NOTE(review): for GET the provider presumably sends ``data`` as URL
    # query parameters -- confirm against the HttpHook documentation.
    get_op = SimpleHttpOperator(
        task_id="get_op",
        http_conn_id="http_default",
        method="GET",
        endpoint="get",
        data={"param1": "value1", "param2": "value2"},
        headers={},
    )
    # [END howto_operator_http_get_request]

    # [START howto_operator_http_post_request]
    # No ``method`` is passed here, so the operator's default HTTP method is
    # used. The check below requires the service to echo the posted JSON
    # under a "json" key with the same priority value.
    post_op = SimpleHttpOperator(
        task_id="post_op",
        http_conn_id="http_default",
        endpoint="post",
        data=json.dumps({"priority": 5}),
        headers={"Content-Type": "application/json"},
        response_check=lambda response: response.json()["json"]["priority"] == 5,
    )
    # [END howto_operator_http_post_request]

    # [START howto_operator_http_pagination_request]
    def get_next_page_cursor(response):
        """Build the request overrides for the next page, if there is one.

        Args:
            response: Response object of the previous call; only its
                ``json()`` method is used.

        Returns:
            ``{"data": {"cursor": <cursor>}}`` when the decoded body holds a
            truthy ``"cursor"`` entry, otherwise ``None``.
        """
        # Look the cursor up once; a missing or empty cursor means there is
        # no further page to request.
        cursor = response.json().get("cursor")
        if cursor:
            return {"data": {"cursor": cursor}}
        return None

    paginate_op = SimpleHttpOperator(
        task_id="paginate_op",
        http_conn_id="http_default",
        endpoint="get",
        method="GET",
        pagination_function=get_next_page_cursor,
    )
    # [END howto_operator_http_pagination_request]

    # Run order: sensor first, then the GET, POST and paginated GET tasks.
    dag_sensor_check >> get_op >> post_op >> paginate_op