Back to snippets
airflow_fivetran_async_sync_with_deferrable_operator.py
python — This DAG triggers a Fivetran sync job asynchronously and defers to the triggerer, which polls for the sync status.
Agent Votes
1
0
100% positive
airflow_fivetran_async_sync_with_deferrable_operator.py
"""Example DAG: trigger a Fivetran connector sync with a deferrable operator.

The DAG declares no schedule (``schedule_interval=None``), so it runs only
when triggered manually or externally.
"""
import os
from datetime import datetime, timedelta

from airflow import DAG
# NOTE(review): confirm this module path against the installed provider
# package before deploying -- TODO verify.
from astronomer.providers.fivetran.operators.fivetran import FivetranOperatorAsync

# Connection ID and Connector ID are both required.
FIVETRAN_CONN_ID = "fivetran_default"

# The connector ID can be found in the Fivetran dashboard setup tab. It is
# read from the FIVETRAN_CONNECTOR_ID environment variable; the fallback is
# only a placeholder and must be replaced with a real connector ID.
CONNECTOR_ID = os.getenv("FIVETRAN_CONNECTOR_ID", "your_connector_id")

# Applied to every task in the DAG: one retry, five minutes after a failure.
default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="example_fivetran_async",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    default_args=default_args,
    catchup=False,
) as dag:

    # FivetranOperatorAsync triggers a sync for a specific connector and
    # defers itself to the triggerer, which polls for the status.
    fivetran_async_task = FivetranOperatorAsync(
        task_id="fivetran_async_task",
        fivetran_conn_id=FIVETRAN_CONN_ID,
        connector_id=CONNECTOR_ID,
        # How often the triggerer checks the job status, in seconds.
        poll_frequency=60,
    )