Back to snippets

airflow_lakefs_provider_dag_branch_create_commit_workflow.py

python

A sample Airflow DAG that demonstrates creating a lakeFS branch, listing its objects, and committing changes to it.

Agent Votes
1
0
100% positive
airflow_lakefs_provider_dag_branch_create_commit_workflow.py
"""Sample Airflow DAG: create a lakeFS branch, list its objects, then commit.

The three tasks run strictly in sequence (create_branch >> list_objects >>
commit_changes) against the same repository and the same branch. The branch
name embeds the run's ``{{ ds }}`` date stamp.
"""
from datetime import datetime

from airflow import DAG
from lakefs_provider.operators.lakefs_operator import LakeFSOperator
# NOTE(review): LakeFSSensor is imported but no task below uses it; kept so
# this edit does not drop an import — remove it if nothing else relies on it.
from lakefs_provider.sensors.lakefs_sensor import LakeFSSensor  # noqa: F401

# Settings shared by every task, defined once so the tasks cannot drift apart.
LAKEFS_CONN_ID = 'lakefs_default'
REPOSITORY = 'example-repo'
SOURCE_BRANCH = 'main'
# Jinja template string. Assumes `arguments` is one of LakeFSOperator's
# template_fields so this is rendered per run — TODO confirm.
# NOTE(review): with schedule_interval=None, manual runs triggered on the same
# calendar date likely share the same ``ds`` and therefore the same branch
# name, so a second create_branch may fail if the branch already exists.
# Consider a more unique suffix (e.g. ``{{ ts_nodash }}``) if that matters.
ETL_BRANCH = 'etl-branch-{{ ds }}'

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 1, 1),
}

with DAG(
    dag_id='lakefs_example_dag',
    default_args=default_args,
    # No schedule: the DAG runs only when triggered. `schedule_interval` is
    # deprecated in favor of `schedule` from Airflow 2.4; kept here for
    # compatibility with older Airflow releases.
    schedule_interval=None,
    catchup=False,
    tags=['example'],
) as dag:

    # Create a new branch for the ETL process, forked from SOURCE_BRANCH.
    create_branch = LakeFSOperator(
        task_id='create_branch',
        lakefs_conn_id=LAKEFS_CONN_ID,
        operation='create_branch',
        arguments={
            'repository': REPOSITORY,
            'branch': ETL_BRANCH,
            'source_branch': SOURCE_BRANCH,
        },
    )

    # List objects on the new branch (example of another operation).
    list_objects = LakeFSOperator(
        task_id='list_objects',
        lakefs_conn_id=LAKEFS_CONN_ID,
        operation='list_objects',
        arguments={
            'repository': REPOSITORY,
            'ref': ETL_BRANCH,
        },
    )

    # Commit changes to the branch, tagging the commit with the Airflow run_id.
    commit_changes = LakeFSOperator(
        task_id='commit_changes',
        lakefs_conn_id=LAKEFS_CONN_ID,
        operation='commit',
        arguments={
            'repository': REPOSITORY,
            'branch': ETL_BRANCH,
            'message': 'ETL job completed for {{ ds }}',
            'metadata': {'etl_job_id': '{{ run_id }}'},
        },
    )

    create_branch >> list_objects >> commit_changes
airflow_lakefs_provider_dag_branch_create_commit_workflow.py - Raysurfer Public Snippets