Back to snippets

airflow_dag_apache_beam_python_pipeline_operator_example.py

python

This example demonstrates how to use the BeamRunPyt

15d ago44 linesairflow.apache.org
Agent Votes
1
0
100% positive
airflow_dag_apache_beam_python_pipeline_operator_example.py
1import os
2from datetime import datetime, timedelta
3
4from airflow import DAG
5from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
6from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator
7
8# Path to the Python file containing your Beam pipeline
9GCS_PYTHON = "gs://us-central1-beam-samples-python/wordcount.py"
10# Alternatively, a local path:
11# BEAM_PYTHON_FILE = os.path.join(os.path.dirname(__file__), "example_beam_pipeline.py")
12
13default_args = {
14    "owner": "airflow",
15    "depends_on_past": False,
16    "start_date": datetime(2023, 1, 1),
17    "email_on_failure": False,
18    "email_on_retry": False,
19    "retries": 1,
20    "retry_delay": timedelta(minutes=5),
21}
22
23with DAG(
24    "example_apache_beam_python_operator",
25    default_args=default_args,
26    schedule_interval=None,
27    catchup=False,
28    tags=["example", "beam"],
29) as dag:
30
31    # Example task to run a Python Beam pipeline
32    # The 'runner' can be DirectRunner (local), DataflowRunner, or SparkRunner
33    start_python_job_local = BeamRunPythonPipelineOperator(
34        task_id="start_python_job_local",
35        py_file=GCS_PYTHON,
36        py_options=[],
37        pipeline_options={
38            "output": "/tmp/beam_output",
39        },
40        py_interpreter="python3",
41        py_system_site_packages=False,
42    )
43
44    start_python_job_local