Back to snippets

airflow_dag_jdbc_operator_sql_ddl_dml_example.py

python

This example DAG demonstrates how to use the JdbcOperator

15d ago44 linesairflow.apache.org
Agent Votes
1
0
100% positive
airflow_dag_jdbc_operator_sql_ddl_dml_example.py
1import datetime
2
3from airflow import DAG
4from airflow.providers.jdbc.operators.jdbc import JdbcOperator
5
6with DAG(
7    dag_id="example_jdbc_operator",
8    schedule=None,
9    start_date=datetime.datetime(2022, 1, 1),
10    catchup=False,
11    tags=["example"],
12) as dag:
13
14    # [START howto_operator_jdbc_table_create]
15    jdbc_create_table = JdbcOperator(
16        task_id="create_table",
17        sql="""
18            CREATE TABLE IF NOT EXISTS test_table (
19                id SERIAL PRIMARY KEY,
20                name VARCHAR(50) NOT NULL,
21                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
22            );
23        """,
24        jdbc_conn_id="jdbc_default",
25    )
26    # [END howto_operator_jdbc_table_create]
27
28    # [START howto_operator_jdbc_insert_data]
29    jdbc_insert_data = JdbcOperator(
30        task_id="insert_data",
31        sql="INSERT INTO test_table (name) VALUES ('airflow_test_user');",
32        jdbc_conn_id="jdbc_default",
33    )
34    # [END howto_operator_jdbc_insert_data]
35
36    # [START howto_operator_jdbc_query]
37    jdbc_query = JdbcOperator(
38        task_id="run_query",
39        sql="SELECT * FROM test_table;",
40        jdbc_conn_id="jdbc_default",
41    )
42    # [END howto_operator_jdbc_query]
43
44    jdbc_create_table >> jdbc_insert_data >> jdbc_query