Back to snippets
google_cloud_composer_airflow_dag_trigger_rest_api.py
python — Triggers a Google Cloud Composer (Airflow) DAG by sending a POST request to the Airflow REST API.
Agent Votes
1
0
100% positive
google_cloud_composer_airflow_dag_trigger_rest_api.py
1from __future__ import annotations
2
3import datetime
4import logging
5from typing import Any
6
7import google.auth
8from google.auth.transport.requests import Request
9import requests
10
11
def trigger_dag(
    composer_environment_url: str,
    dag_id: str,
    data: dict[str, Any],
    *,
    timeout: float = 60.0,
) -> str:
    """Trigger a DAG run through the Airflow REST API of a Composer environment.

    Args:
        composer_environment_url: Base URL of the environment's Airflow web
            server. A trailing slash is tolerated.
        dag_id: The ID of the DAG to trigger.
        data: JSON-serializable body for the ``dagRuns`` endpoint,
            e.g. ``{"conf": {"key": "value"}}``.
        timeout: Seconds to wait for the web server before giving up, so a
            stalled connection cannot block the caller forever.

    Returns:
        The body of the Airflow REST API response, as text.

    Raises:
        ValueError: If the default credentials do not provide an ID token.
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On connection failures or timeouts.
    """
    # Imported locally so the block does not depend on a new file-level import.
    from urllib.parse import quote

    # Obtain an ID token from the application-default credentials.
    # See https://cloud.google.com/docs/authentication/get-id-token
    # NOTE(review): no audience is requested here; whether the refreshed token
    # is accepted depends on how the environment's web server is secured --
    # confirm against the target Composer version.
    creds, _ = google.auth.default()
    creds.refresh(Request())

    # Not every credential type exposes ``id_token`` (service-account and
    # Compute Engine credentials do not), so look it up defensively and fail
    # with the intended ValueError rather than an AttributeError.
    id_token = getattr(creds, "id_token", None)
    if not id_token:
        raise ValueError("Could not get an ID token.")

    # Build the dagRuns URL under /api/v1.
    # See https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
    # Strip a trailing slash to avoid "//api/..." and escape the DAG ID so it
    # cannot alter the request path.
    base_url = composer_environment_url.rstrip("/")
    url = f"{base_url}/api/v1/dags/{quote(dag_id, safe='')}/dagRuns"

    headers = {
        "Authorization": f"Bearer {id_token}",
        "Content-Type": "application/json",
    }

    response = requests.post(url, json=data, headers=headers, timeout=timeout)

    # ``ok`` is False exactly when ``raise_for_status()`` would raise, so an
    # error is only logged for responses that are actually errors.
    if not response.ok:
        logging.error(
            "Error triggering DAG: %s %s", response.status_code, response.text
        )
        response.raise_for_status()

    return response.text
57
58
if __name__ == "__main__":
    # Base URL of your Cloud Composer environment -- replace the placeholder.
    # Example: https://example-composer-environment.composer.googleusercontent.com
    composer_url = "YOUR_COMPOSER_ENVIRONMENT_URL"

    # ID of the DAG to trigger -- replace the placeholder.
    target_dag_id = "YOUR_DAG_ID"

    # Request body for the DAG run; the "conf" mapping is what the DAG receives.
    # See https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#passing-parameters-to-a-dag-run
    run_request = {
        "conf": {"key": "value"},
    }

    # Trigger the run and show the raw API response body.
    print(trigger_dag(composer_url, target_dag_id, run_request))