Back to snippets
airflow_lakefs_dag_branch_commit_merge_workflow.py
Python: A simple Airflow DAG that demonstrates creating a lakeFS branch, committing changes to it, and merging it back into main.
Agent Votes
1
0
100% positive
airflow_lakefs_dag_branch_commit_merge_workflow.py
"""Airflow DAG showing a basic lakeFS branch -> commit -> merge workflow.

The DAG creates a feature branch from ``main``, commits to that branch, and
merges the branch back into ``main``. It has no schedule, so it only runs when
it is triggered (for example, manually from the Airflow UI).
"""
from datetime import datetime

from airflow import DAG
from lakefs_provider.operators.commit_operator import LakeFSCommitOperator
from lakefs_provider.operators.create_branch_operator import (
    LakeFSCreateBranchOperator,
)
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator

# Settings shared by every task. All three operators must target the same
# repository and branches, so they are defined once rather than repeated.
LAKEFS_CONN_ID = 'lakefs_default'
REPO = 'example-repo'
MAIN_BRANCH = 'main'
FEATURE_BRANCH = 'new-feature-branch'

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
}

with DAG(
    'lakefs_quickstart',
    default_args=default_args,
    # No schedule: the DAG only runs when triggered.
    # NOTE(review): Airflow >= 2.4 prefers ``schedule=None``; ``schedule_interval``
    # is deprecated there and removed in Airflow 3 — switch when upgrading.
    schedule_interval=None,
    catchup=False,
) as dag:

    # 1. Create the feature branch from main.
    # NOTE(review): presumably fails if the branch already exists (e.g. on a
    # re-run); delete the branch or use a run-specific name — confirm.
    create_branch = LakeFSCreateBranchOperator(
        task_id='create_branch',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=REPO,
        branch=FEATURE_BRANCH,
        source_branch=MAIN_BRANCH,
    )

    # 2. Commit changes to the feature branch.
    # In a real pipeline a data-writing task would run before this one.
    # NOTE(review): lakeFS typically rejects a commit when the branch has no
    # uncommitted changes, so this DAG likely needs a write/upload step here.
    commit_changes = LakeFSCommitOperator(
        task_id='commit_changes',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=REPO,
        branch=FEATURE_BRANCH,
        msg='Add new data processing results',
        metadata={'author': 'airflow-dag'},
    )

    # 3. Merge the feature branch back into main.
    merge_branch = LakeFSMergeOperator(
        task_id='merge_branch',
        lakefs_conn_id=LAKEFS_CONN_ID,
        repo=REPO,
        source_ref=FEATURE_BRANCH,
        destination_branch=MAIN_BRANCH,
        msg='Merging processed data from airflow',
    )

    create_branch >> commit_changes >> merge_branch