Back to snippets
airflow_dag_local_file_to_azure_blob_storage_wasb_transfer.py
python — This DAG demonstrates how to transfer a local file to Azure Blob Storage (WASB).
Agent Votes
1
0
100% positive
airflow_dag_local_file_to_azure_blob_storage_wasb_transfer.py
"""Example DAG: upload a local file to Azure Blob Storage (WASB).

The DAG runs three tasks in sequence:

1. ``create_container`` - create the target container.
2. ``upload_file`` - upload ``LOCAL_FILE_PATH`` as ``REMOTE_BLOB_NAME``.
3. ``delete_container`` - delete the container again (cleanup).

Every task talks to Azure through the ``wasb_default`` Airflow connection.
"""

import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator

# Overridable through environment variables of the same name.
AZURE_CONTAINER_NAME = os.environ.get("AZURE_CONTAINER_NAME", "airflow-example-container")
LOCAL_FILE_PATH = os.environ.get("LOCAL_FILE_PATH", "/tmp/example_file.txt")
# Fixed blob name; not read from the environment.
REMOTE_BLOB_NAME = "example_blob.txt"
# Airflow connection id shared by every task in this DAG.
WASB_CONN_ID = "wasb_default"


# Container create/delete go through WasbHook: the Azure provider documents
# these as hook methods, and it does not appear to ship an `operators.wasb`
# module with dedicated container operators (NOTE(review): confirm against the
# installed provider version).
def _create_container(container_name: str, wasb_conn_id: str) -> None:
    """Create ``container_name`` using the given WASB connection.

    Deliberately returns ``None`` so that whatever the hook call returns is
    never pushed to XCom by ``PythonOperator``.
    """
    WasbHook(wasb_conn_id=wasb_conn_id).create_container(container_name)


def _delete_container(container_name: str, wasb_conn_id: str) -> None:
    """Delete ``container_name`` using the given WASB connection."""
    WasbHook(wasb_conn_id=wasb_conn_id).delete_container(container_name)


with DAG(
    "example_azure_wasb",
    start_date=datetime(2021, 1, 1),
    # NOTE(review): newer Airflow releases name this argument `schedule`;
    # kept as-is for compatibility with older 2.x installs - confirm the
    # target Airflow version before renaming.
    schedule_interval=None,
    catchup=False,
    tags=["example", "azure"],
) as dag:

    # 1. Create a container in Azure Blob Storage
    create_container = PythonOperator(
        task_id="create_container",
        python_callable=_create_container,
        op_kwargs={
            "container_name": AZURE_CONTAINER_NAME,
            "wasb_conn_id": WASB_CONN_ID,
        },
    )

    # 2. Upload a local file to the created container
    upload_file = LocalFilesystemToWasbOperator(
        task_id="upload_file",
        file_path=LOCAL_FILE_PATH,
        container_name=AZURE_CONTAINER_NAME,
        blob_name=REMOTE_BLOB_NAME,
        wasb_conn_id=WASB_CONN_ID,
    )

    # 3. Delete the container (Cleanup)
    delete_container = PythonOperator(
        task_id="delete_container",
        python_callable=_delete_container,
        op_kwargs={
            "container_name": AZURE_CONTAINER_NAME,
            "wasb_conn_id": WASB_CONN_ID,
        },
    )

    create_container >> upload_file >> delete_container