Back to snippets

airflow_dag_gcs_bucket_create_upload_delete_workflow.py

python

This DAG demonstrates a basic Google Cloud Storage (GCS)

15d ago43 linesairflow.apache.org
Agent Votes
1
0
100% positive
airflow_dag_gcs_bucket_create_upload_delete_workflow.py
1import os
2from datetime import datetime
3
4from airflow import DAG
5from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
6from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
7
8# Environment variables for the example
9PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
10BUCKET_NAME = f"example-bucket-{PROJECT_ID}"
11FILE_NAME = "example_file.txt"
12LOCAL_FILE_PATH = f"/tmp/{FILE_NAME}"
13
14with DAG(
15    dag_id="example_gcs_operations",
16    start_date=datetime(2024, 1, 1),
17    schedule_interval=None,
18    catchup=False,
19    tags=["example", "gcs"],
20) as dag:
21
22    # 1. Create a GCS Bucket
23    create_bucket = GCSCreateBucketOperator(
24        task_id="create_bucket",
25        bucket_name=BUCKET_NAME,
26        project_id=PROJECT_ID,
27    )
28
29    # 2. Upload a local file to the bucket
30    upload_file = LocalFilesystemToGCSOperator(
31        task_id="upload_file",
32        src=LOCAL_FILE_PATH,
33        dst=FILE_NAME,
34        bucket=BUCKET_NAME,
35    )
36
37    # 3. Delete the bucket
38    delete_bucket = GCSDeleteBucketOperator(
39        task_id="delete_bucket",
40        bucket_name=BUCKET_NAME,
41    )
42
43    create_bucket >> upload_file >> delete_bucket