Back to snippets
airflow_lakefs_provider_dag_branch_create_commit_workflow.py
python — A sample Airflow DAG that demonstrates creating a lakeFS branch, listing the objects on that branch, and committing changes to it.
Agent Votes
1
0
100% positive
airflow_lakefs_provider_dag_branch_create_commit_workflow.py
"""Example Airflow DAG: create a lakeFS branch, list its objects, commit.

The DAG has no schedule (``schedule_interval=None``), so it only runs when
triggered manually or through the API. It chains three tasks:

    create_branch >> list_objects >> commit_changes

All three tasks target the same repository and the same per-run branch.
"""
from datetime import datetime

from airflow import DAG
from lakefs_provider.operators.lakefs_operator import LakeFSOperator
# Not referenced below; kept because this may be only part of a larger file.
from lakefs_provider.sensors.lakefs_sensor import LakeFSSensor

# NOTE(review): confirm that the installed lakefs_provider version exposes
# these module paths and a generic ``LakeFSOperator`` taking ``operation`` /
# ``arguments`` -- this cannot be verified from this file alone.

# Shared by every task so the three operators cannot drift apart.
LAKEFS_CONN_ID = 'lakefs_default'
REPOSITORY = 'example-repo'
# Jinja expression; ``ds`` is the run's logical date (YYYY-MM-DD).
# NOTE(review): this is only rendered if the operator lists ``arguments`` in
# its ``template_fields`` -- confirm against the operator implementation.
# NOTE(review): two runs with the same logical date resolve to the same
# branch name; presumably ``create_branch`` then fails -- verify.
ETL_BRANCH = 'etl-branch-{{ ds }}'

# Defaults applied to every task in the DAG.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 1, 1),
}

with DAG(
    dag_id='lakefs_example_dag',
    default_args=default_args,
    schedule_interval=None,  # no schedule: manual / API triggers only
    catchup=False,
    tags=['example'],
) as dag:

    # Create a new branch for the ETL process, forked from ``main``.
    create_branch = LakeFSOperator(
        task_id='create_branch',
        lakefs_conn_id=LAKEFS_CONN_ID,
        operation='create_branch',
        arguments={
            'repository': REPOSITORY,
            'branch': ETL_BRANCH,
            'source_branch': 'main',
        },
    )

    # List objects on the new branch (example of another operation).
    list_objects = LakeFSOperator(
        task_id='list_objects',
        lakefs_conn_id=LAKEFS_CONN_ID,
        operation='list_objects',
        arguments={
            'repository': REPOSITORY,
            'ref': ETL_BRANCH,
        },
    )

    # Commit changes to the branch, tagging the commit with the Airflow run id.
    commit_changes = LakeFSOperator(
        task_id='commit_changes',
        lakefs_conn_id=LAKEFS_CONN_ID,
        operation='commit',
        arguments={
            'repository': REPOSITORY,
            'branch': ETL_BRANCH,
            'message': 'ETL job completed for {{ ds }}',
            'metadata': {'etl_job_id': '{{ run_id }}'},
        },
    )

    # Strictly sequential: the branch must exist before it is listed or
    # committed to.
    create_branch >> list_objects >> commit_changes