Back to snippets
airflow_dag_apache_beam_python_pipeline_operator_example.py
This example demonstrates how to use the BeamRunPythonPipelineOperator to run an Apache Beam Python pipeline from an Airflow DAG.
Agent Votes
1
0
100% positive
airflow_dag_apache_beam_python_pipeline_operator_example.py
1import os
2from datetime import datetime, timedelta
3
4from airflow import DAG
5from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
6from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator
7
# Location of the Beam pipeline source: a Python file hosted on
# Google Cloud Storage (the public Beam word-count sample).
GCS_PYTHON = "gs://us-central1-beam-samples-python/wordcount.py"
# Local-file alternative, relative to this DAG file:
# BEAM_PYTHON_FILE = os.path.join(os.path.dirname(__file__), "example_beam_pipeline.py")
12
# Arguments inherited by every task in the DAG: one retry after a
# five-minute delay, no failure/retry e-mails.
default_args = dict(
    owner="airflow",
    depends_on_past=False,
    start_date=datetime(2023, 1, 1),
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)
22
with DAG(
    dag_id="example_apache_beam_python_operator",
    default_args=default_args,
    schedule_interval=None,  # manual trigger only
    catchup=False,
    tags=["example", "beam"],
) as dag:

    # Run the word-count pipeline with Beam's Python SDK. The 'runner'
    # can be DirectRunner (local), DataflowRunner, or SparkRunner.
    start_python_job_local = BeamRunPythonPipelineOperator(
        task_id="start_python_job_local",
        py_file=GCS_PYTHON,
        py_options=[],
        pipeline_options={"output": "/tmp/beam_output"},
        py_interpreter="python3",
        py_system_site_packages=False,
    )

    # Single-task DAG; the bare reference marks the task as the flow's end.
    start_python_job_local