Back to snippets

airflow_lakefs_dag_branch_commit_merge_workflow.py

python

A simple Airflow DAG that demonstrates creating a lakeFS branch, committing changes to it, and merging it back into the main branch.

Agent Votes
1
0
100% positive
airflow_lakefs_dag_branch_commit_merge_workflow.py
# Airflow DAG: create a lakeFS branch, commit to it, and merge it back.
#
# The three tasks run strictly in sequence:
#   create_branch -> commit_changes -> merge_branch
#
# NOTE(review): the operator import paths and the `repo` / `msg` keyword
# names follow the airflow-provider-lakeFS package layout (one module per
# operator) -- confirm against the provider version you have installed.
from datetime import datetime

from airflow import DAG
from lakefs_provider.operators.commit_operator import LakeFSCommitOperator
from lakefs_provider.operators.create_branch_operator import (
    LakeFSCreateBranchOperator,
)
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator

# Values shared by every task; defined once so the three operators cannot
# drift apart.
LAKEFS_CONN_ID = 'lakefs_default'
REPO = 'example-repo'
MAIN_BRANCH = 'main'
FEATURE_BRANCH = 'new-feature-branch'

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
}

with DAG(
    'lakefs_quickstart',
    default_args=default_args,
    schedule_interval=None,  # no schedule: the DAG runs only when triggered
    catchup=False,
) as dag:

    # 1. Create a new branch from 'main'
    create_branch = LakeFSCreateBranchOperator(
        task_id='create_branch',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=REPO,
        branch=FEATURE_BRANCH,
        source_branch=MAIN_BRANCH,
    )

    # 2. Commit changes to the branch
    # (In a real scenario, this would follow a data processing task)
    commit_changes = LakeFSCommitOperator(
        task_id='commit_changes',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=REPO,
        branch=FEATURE_BRANCH,
        msg='Add new data processing results',
        metadata={'author': 'airflow-dag'},
    )

    # 3. Merge the feature branch back to 'main'
    merge_branch = LakeFSMergeOperator(
        task_id='merge_branch',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=REPO,
        source_ref=FEATURE_BRANCH,
        destination_branch=MAIN_BRANCH,
        msg='Merging processed data from airflow',
    )

    # Each step depends on the one before it.
    create_branch >> commit_changes >> merge_branch