azure_data_factory_quickstart_with_blob_copy_pipeline.py
This quickstart creates an Azure Data Factory, a linked service to Azure Blob Storage, input and output datasets, and a pipeline with a copy activity that copies blobs from one folder to another, then triggers a pipeline run.
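Running it assumes the azure-identity and azure-mgmt-datafactory packages are installed and that DefaultAzureCredential can authenticate (for example through the Azure CLI or environment variables).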
import os
from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *

# Set the following environment variables or replace these strings
# with your actual values for subscription_id, resource_group, and factory_name.
subscription_id = os.environ.get("AZURE_SUBSCRIPTION_ID", "your-subscription-id")
resource_group_name = "your-resource-group-name"
factory_name = "your-data-factory-name"
location = "East US"

# Provide details for the Storage Account
storage_account_name = "your-storage-account-name"
storage_account_key = "your-storage-account-key"
container_name = "your-container-name"
input_blob_path = "input/"
output_blob_path = "output/"

# 1. Acquire a credential object
credential = DefaultAzureCredential()

# 2. Create the Data Factory Management Client
adf_client = DataFactoryManagementClient(credential, subscription_id)

# 3. Create a Data Factory
print(f"Creating Data Factory: {factory_name}")
df_resource = Factory(location=location)
adf_client.factories.create_or_update(resource_group_name, factory_name, df_resource)
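
# (Optional; not part of the original snippet) Confirm the factory is provisioned
# before creating dependent resources. factories.get returns the Factory resource,
# whose provisioning_state should read "Succeeded" once it is ready.
df = adf_client.factories.get(resource_group_name, factory_name)
print(f"Factory provisioning state: {df.provisioning_state}")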

# 4. Create an Azure Storage Linked Service
print("Creating Linked Service...")
ls_name = "AzureStorageLinkedService"
ls_def = LinkedServiceResource(
    properties=AzureStorageLinkedService(
        connection_string=f"DefaultEndpointsProtocol=https;AccountName={storage_account_name};AccountKey={storage_account_key};EndpointSuffix=core.windows.net"
    )
)
adf_client.linked_services.create_or_update(resource_group_name, factory_name, ls_name, ls_def)

# 5. Create Datasets (Source and Sink)
print("Creating Datasets...")
ds_in_name = "InputDataset"
ds_in_def = DatasetResource(
    properties=AzureBlobDataset(
        linked_service_name=LinkedServiceReference(reference_name=ls_name),
        # The blob dataset's folder path includes the container, e.g. "mycontainer/input/"
        folder_path=f"{container_name}/{input_blob_path}"
    )
)
adf_client.datasets.create_or_update(resource_group_name, factory_name, ds_in_name, ds_in_def)

ds_out_name = "OutputDataset"
ds_out_def = DatasetResource(
    properties=AzureBlobDataset(
        linked_service_name=LinkedServiceReference(reference_name=ls_name),
        folder_path=f"{container_name}/{output_blob_path}"
    )
)
adf_client.datasets.create_or_update(resource_group_name, factory_name, ds_out_name, ds_out_def)

# 6. Create a Pipeline with a Copy Activity
print("Creating Pipeline...")
pipeline_name = "CopyPipeline"
copy_activity = CopyActivity(
    name="CopyFromBlobToBlob",
    inputs=[DatasetReference(reference_name=ds_in_name)],
    outputs=[DatasetReference(reference_name=ds_out_name)],
    source=BlobSource(),
    sink=BlobSink()
)
pipeline_def = PipelineResource(activities=[copy_activity])
adf_client.pipelines.create_or_update(resource_group_name, factory_name, pipeline_name, pipeline_def)

# 7. Create a Pipeline Run
print("Running Pipeline...")
run_response = adf_client.pipelines.create_run(resource_group_name, factory_name, pipeline_name)
print(f"Pipeline run ID: {run_response.run_id}")

print("Quickstart execution complete.")