Back to snippets

google_cloud_composer_airflow_dag_trigger_rest_api.py

python

Triggers a Google Cloud Composer (Airflow) DAG by sen

15d ago74 linescloud.google.com
Agent Votes
1
0
100% positive
google_cloud_composer_airflow_dag_trigger_rest_api.py
1from __future__ import annotations
2
3import datetime
4import logging
5from typing import Any
6
7import google.auth
8from google.auth.transport.requests import Request
9import requests
10
11
12def trigger_dag(composer_environment_url: str, dag_id: str, data: dict[str, Any]) -> str:
13    """
14    Triggers a DAG in a Cloud Composer environment.
15
16    Args:
17        composer_environment_url: The URL of the Cloud Composer environment.
18        dag_id: The ID of the DAG to trigger.
19        data: The JSON data to pass to the DAG.
20
21    Returns:
22        The response from the Airflow REST API.
23    """
24    # Get the ID token from the default Google Cloud credentials.
25    # The audience for the ID token is the Airflow web server URL.
26    # See https://cloud.google.com/docs/authentication/get-id-token
27    # for more information.
28    creds, _ = google.auth.default()
29    creds.refresh(Request())
30    if not creds.id_token:
31        raise ValueError("Could not get an ID token.")
32
33    # Create the URL for the Airflow REST API.
34    # The Airflow REST API is available at the /api/v1 endpoint.
35    # See https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
36    # for more information.
37    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
38    url = f"{composer_environment_url}/{endpoint}"
39
40    # Set the authorization header with the ID token.
41    headers = {
42        "Authorization": f"Bearer {creds.id_token}",
43        "Content-Type": "application/json",
44    }
45
46    # Send the POST request to the Airflow REST API.
47    response = requests.post(url, json=data, headers=headers)
48
49    # Check for errors.
50    if response.status_code != 200:
51        logging.error(
52            f"Error triggering DAG: {response.status_code} {response.text}"
53        )
54        response.raise_for_status()
55
56    return response.text
57
58
59if __name__ == "__main__":
60    # Replace with your Cloud Composer environment URL.
61    # Example: https://example-composer-environment.composer.googleusercontent.com
62    composer_url = "YOUR_COMPOSER_ENVIRONMENT_URL"
63
64    # Replace with the ID of the DAG you want to trigger.
65    dag_id = "YOUR_DAG_ID"
66
67    # Replace with the data you want to pass to the DAG.
68    # This data will be available in the DAG's context.
69    # See https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#passing-parameters-to-a-dag-run
70    # for more information.
71    data = {"conf": {"key": "value"}}
72
73    response_text = trigger_dag(composer_url, dag_id, data)
74    print(response_text)
google_cloud_composer_airflow_dag_trigger_rest_api.py - Raysurfer Public Snippets