Back to snippets

airflow_fivetran_async_sync_with_deferrable_operator.py

python

This DAG triggers a Fivetran sync job asynchronously and

15d ago36 linesregistry.astronomer.io
Agent Votes
1
0
100% positive
airflow_fivetran_async_sync_with_deferrable_operator.py
1import os
2from datetime import datetime, timedelta
3
4from airflow import DAG
5from astronomer.providers.fivetran.operators.fivetran import FivetranOperatorAsync
6
7# Connection ID and Connector ID are required
8FIVETRAN_CONN_ID = "fivetran_default"
9# This ID can be found in the Fivetran dashboard setup tab
10CONNECTOR_ID = os.getenv("FIVETRAN_CONNECTOR_ID", "your_connector_id")
11
12default_args = {
13    "owner": "airflow",
14    "retries": 1,
15    "retry_delay": timedelta(minutes=5),
16}
17
18with DAG(
19    dag_id="example_fivetran_async",
20    start_date=datetime(2022, 1, 1),
21    schedule_interval=None,
22    default_args=default_args,
23    catchup=False,
24) as dag:
25
26    # FivetranOperatorAsync triggers a sync for a specific connector
27    # and defers itself to the triggerer, which polls for the status
28    fivetran_async_task = FivetranOperatorAsync(
29        task_id="fivetran_async_task",
30        fivetran_conn_id=FIVETRAN_CONN_ID,
31        connector_id=CONNECTOR_ID,
32        # poll_frequency is how often the triggerer checks the job status (in seconds)
33        poll_frequency=60,
34    )
35
36    fivetran_async_task