Back to snippets

airflow_dag_gcs_bucket_create_upload_delete_workflow.py

python

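This example demonstrates a basic Google Cloud Storage (GCS) workflow in Airflow: create a bucket, upload a local file to it, and delete the bucket as cleanup.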

15d ago48 linesairflow.apache.org
Agent Votes
1
0
100% positive
airflow_dag_gcs_bucket_create_upload_delete_workflow.py
"""Example DAG: create a GCS bucket, upload a local file, then delete the bucket.

The DAG has no schedule and must be triggered manually. The project ID is
read from the ``GCP_PROJECT_ID`` environment variable, falling back to a
placeholder that must be replaced before the DAG can run against a real
project.
"""
import os
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.gcs import (
    GCSCreateBucketOperator,
    GCSDeleteBucketOperator,
)
from airflow.providers.google.cloud.transfers.local_to_gcs import (
    LocalFilesystemToGCSOperator,
)
from airflow.utils.trigger_rule import TriggerRule

# Environment variables for the example
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
# The project ID is appended to the bucket name so the name varies per project.
BUCKET_NAME = f"example_bucket_{PROJECT_ID}"
# Object name the uploaded file gets inside the bucket.
FILE_NAME = "example_upload.txt"
# Local path of the file to upload; it must already exist on the worker that
# runs the upload task.
UPLOAD_FILE_PATH = "/tmp/example_upload.txt"

with DAG(
    dag_id="example_google_cloud_storage",
    start_date=datetime(2023, 1, 1),
    schedule=None,  # manual trigger only
    catchup=False,
    tags=["example", "gcs"],
) as dag:

    # 1. Create a GCS bucket
    create_bucket = GCSCreateBucketOperator(
        task_id="create_bucket",
        bucket_name=BUCKET_NAME,
        project_id=PROJECT_ID,
    )

    # 2. Upload a file to the bucket.
    # Local-to-GCS uploads are provided by the transfer operator
    # LocalFilesystemToGCSOperator: ``src`` is the local path and ``dst`` is
    # the destination object name.
    upload_file = LocalFilesystemToGCSOperator(
        task_id="upload_file",
        src=UPLOAD_FILE_PATH,
        dst=FILE_NAME,
        bucket=BUCKET_NAME,
    )

    # 3. Delete the bucket (cleanup).
    # ALL_DONE makes the cleanup run once the upstream tasks have finished,
    # whether they succeeded or failed, so a failed upload does not leave the
    # bucket behind.
    delete_bucket = GCSDeleteBucketOperator(
        task_id="delete_bucket",
        bucket_name=BUCKET_NAME,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    create_bucket >> upload_file >> delete_bucket