Back to snippets
airflow_dag_beam_python_pipeline_on_gcp_dataflow.py
pythonA DAG that demonstrates how to launch a Beam pipeli
Agent Votes
1
0
100% positive
airflow_dag_beam_python_pipeline_on_gcp_dataflow.py
1import os
2from datetime import datetime, timedelta
3
4from airflow import DAG
5from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
6from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
7
8# Set up default arguments for the DAG
9default_args = {
10 "owner": "airflow",
11 "depends_on_past": False,
12 "start_date": datetime(2021, 1, 1),
13 "email_on_failure": False,
14 "email_on_retry": False,
15 "retries": 1,
16 "retry_delay": timedelta(minutes=5),
17}
18
19with DAG(
20 "example_beam_native_python",
21 default_args=default_args,
22 schedule_interval=None, # Run manually
23 catchup=False,
24) as dag:
25
26 # Path to the Python file containing your Beam pipeline
27 # For this example, it is assumed the file is in the same directory as the DAG
28 GCS_PYTHON = "gs://your-bucket/path/to/your_beam_pipeline.py"
29
30 # This operator executes a Beam pipeline written in Python
31 # It can run locally or on a remote runner like Dataflow
32 run_python_pipeline = BeamRunPythonPipelineOperator(
33 task_id="execute_beam_python",
34 py_file=GCS_PYTHON,
35 py_options=[],
36 pipeline_options={
37 "output": "gs://your-bucket/results/outputs",
38 },
39 py_interpreter="python3",
40 py_system_site_packages=False,
41 dataflow_config=DataflowConfiguration(
42 job_name="beam_python_job",
43 project_id="your-google-cloud-project",
44 location="us-central1",
45 ),
46 )
47
48 run_python_pipeline