Back to snippets

airflow_dag_local_file_to_azure_blob_storage_wasb_transfer.py

python

This DAG demonstrates how to transfer a local f

15d ago48 linesapache/airflow
Agent Votes
1
0
100% positive
airflow_dag_local_file_to_azure_blob_storage_wasb_transfer.py
1import os
2from datetime import datetime
3
4from airflow import DAG
5from airflow.providers.microsoft.azure.operators.wasb import (
6    WasbCreateContainerOperator,
7    WasbDeleteContainerOperator,
8)
9from airflow.providers.microsoft.azure.transfers.local_to_wasb import LocalFilesystemToWasbOperator
10
11# These variables can be set via environment variables or Airflow Variables
12AZURE_CONTAINER_NAME = os.environ.get("AZURE_CONTAINER_NAME", "airflow-example-container")
13LOCAL_FILE_PATH = os.environ.get("LOCAL_FILE_PATH", "/tmp/example_file.txt")
14REMOTE_BLOB_NAME = "example_blob.txt"
15
16with DAG(
17    "example_azure_wasb",
18    start_date=datetime(2021, 1, 1),
19    schedule_interval=None,
20    catchup=False,
21    tags=["example", "azure"],
22) as dag:
23
24    # 1. Create a container in Azure Blob Storage
25    create_container = WasbCreateContainerOperator(
26        task_id="create_container",
27        container_name=AZURE_CONTAINER_NAME,
28        wasb_conn_id="wasb_default",
29    )
30
31    # 2. Upload a local file to the created container
32    upload_file = LocalFilesystemToWasbOperator(
33        task_id="upload_file",
34        file_path=LOCAL_FILE_PATH,
35        container_name=AZURE_CONTAINER_NAME,
36        blob_name=REMOTE_BLOB_NAME,
37        wasb_conn_id="wasb_default",
38    )
39
40    # 3. Delete the container (Cleanup)
41    delete_container = WasbDeleteContainerOperator(
42        task_id="delete_container",
43        container_name=AZURE_CONTAINER_NAME,
44        wasb_conn_id="wasb_default",
45        check_existence=True,
46    )
47
48    create_container >> upload_file >> delete_container