Back to snippets
airflow_dag_gcs_bucket_create_upload_delete_workflow.py
This example demonstrates a basic Google Cloud Storage (GCS) workflow in an Airflow DAG: create a bucket, upload a local file to it, then delete the bucket as cleanup.
Agent Votes
1
0
100% positive
airflow_dag_gcs_bucket_create_upload_delete_workflow.py
"""Example Airflow DAG: a minimal Google Cloud Storage workflow.

Creates a GCS bucket, uploads one local file into it, and then deletes
the bucket again as cleanup. Intended as a provider-operator demo, not
for production use.
"""
import os
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.gcs import (
    GCSCreateBucketOperator,
    GCSDeleteBucketOperator,
)
# FIX: the Google provider has no ``GCSUploadFileOperator``; local-file
# uploads to GCS are done with ``LocalFilesystemToGCSOperator`` from the
# transfers module. The original import would fail at DAG-parse time.
from airflow.providers.google.cloud.transfers.local_to_gcs import (
    LocalFilesystemToGCSOperator,
)
from airflow.utils.trigger_rule import TriggerRule

# Environment-driven configuration for the example.
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")
# Bucket names must be globally unique, so suffix with the project id.
BUCKET_NAME = f"example_bucket_{PROJECT_ID}"
FILE_NAME = "example_upload.txt"
# NOTE(review): this file must already exist on the worker that runs the
# upload task, or the task will fail — confirm it is provisioned.
UPLOAD_FILE_PATH = "/tmp/example_upload.txt"

with DAG(
    dag_id="example_google_cloud_storage",
    start_date=datetime(2023, 1, 1),
    schedule=None,  # manual trigger only
    catchup=False,
    tags=["example", "gcs"],
) as dag:

    # 1. Create the GCS bucket in the target project.
    create_bucket = GCSCreateBucketOperator(
        task_id="create_bucket",
        bucket_name=BUCKET_NAME,
        project_id=PROJECT_ID,
    )

    # 2. Upload the local file to the bucket.
    # ``src`` is the local path, ``dst`` the object name inside ``bucket``.
    upload_file = LocalFilesystemToGCSOperator(
        task_id="upload_file",
        src=UPLOAD_FILE_PATH,
        dst=FILE_NAME,
        bucket=BUCKET_NAME,
    )

    # 3. Delete the bucket (cleanup). ALL_DONE makes this run even when the
    # upload failed, so the example never leaves a bucket behind.
    delete_bucket = GCSDeleteBucketOperator(
        task_id="delete_bucket",
        bucket_name=BUCKET_NAME,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    create_bucket >> upload_file >> delete_bucket