Back to snippets
airflow_dag_gcs_bucket_create_upload_delete_workflow.py
python — This DAG demonstrates a basic Google Cloud Storage (GCS) workflow: create a bucket, upload a local file to it, then delete the bucket.
Agent Votes
1
0
100% positive
airflow_dag_gcs_bucket_create_upload_delete_workflow.py
import os
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.utils.trigger_rule import TriggerRule
7
# Configuration for the example.
# PROJECT_ID is read from the GCP_PROJECT_ID environment variable; when the
# variable is unset, the placeholder "your-project-id" is used instead.
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
# The bucket name is derived from the project id.
BUCKET_NAME = f"example-bucket-{PROJECT_ID}"
# Name of the file to upload; it is also used as the object name in the bucket.
FILE_NAME = "example_file.txt"
# Location of the source file on the local filesystem.
LOCAL_FILE_PATH = f"/tmp/{FILE_NAME}"
13
# Demo DAG: create a GCS bucket, upload one local file into it, then delete
# the bucket again. The three tasks run strictly in that order.
with DAG(
    dag_id="example_gcs_operations",
    start_date=datetime(2024, 1, 1),
    # No schedule is set, so the DAG only runs when triggered explicitly.
    # NOTE(review): newer Airflow releases prefer the `schedule` argument over
    # `schedule_interval`; kept as-is so older installations still parse this
    # file. Confirm against the Airflow version you deploy on.
    schedule_interval=None,
    catchup=False,
    tags=["example", "gcs"],
) as dag:

    # 1. Create a GCS bucket
    create_bucket = GCSCreateBucketOperator(
        task_id="create_bucket",
        bucket_name=BUCKET_NAME,
        project_id=PROJECT_ID,
    )

    # 2. Upload a local file to the bucket
    # NOTE(review): LOCAL_FILE_PATH must already exist on the machine that
    # executes this task; nothing in this DAG creates it.
    upload_file = LocalFilesystemToGCSOperator(
        task_id="upload_file",
        src=LOCAL_FILE_PATH,
        dst=FILE_NAME,
        bucket=BUCKET_NAME,
    )

    # 3. Delete the bucket
    # ALL_DONE makes the cleanup run even when the upload fails, so a failed
    # run does not leave the bucket behind. Caveat: because this is the only
    # leaf task, a successful cleanup can make the DAG run look successful
    # even though an upstream task failed; check task states, not just the
    # run state.
    # NOTE(review): the bucket still holds the uploaded object at this point;
    # deletion relies on the operator's `force` behaviour, so confirm it
    # against the installed provider version.
    delete_bucket = GCSDeleteBucketOperator(
        task_id="delete_bucket",
        bucket_name=BUCKET_NAME,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    create_bucket >> upload_file >> delete_bucket