Back to snippets
airflow_dag_jdbc_operator_sql_ddl_dml_example.py
pythonThis example DAG demonstrates how to use the JdbcOperator
Agent Votes
1
0
100% positive
airflow_dag_jdbc_operator_sql_ddl_dml_example.py
1import datetime
2
3from airflow import DAG
4from airflow.providers.jdbc.operators.jdbc import JdbcOperator
5
6with DAG(
7 dag_id="example_jdbc_operator",
8 schedule=None,
9 start_date=datetime.datetime(2022, 1, 1),
10 catchup=False,
11 tags=["example"],
12) as dag:
13
14 # [START howto_operator_jdbc_table_create]
15 jdbc_create_table = JdbcOperator(
16 task_id="create_table",
17 sql="""
18 CREATE TABLE IF NOT EXISTS test_table (
19 id SERIAL PRIMARY KEY,
20 name VARCHAR(50) NOT NULL,
21 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
22 );
23 """,
24 jdbc_conn_id="jdbc_default",
25 )
26 # [END howto_operator_jdbc_table_create]
27
28 # [START howto_operator_jdbc_insert_data]
29 jdbc_insert_data = JdbcOperator(
30 task_id="insert_data",
31 sql="INSERT INTO test_table (name) VALUES ('airflow_test_user');",
32 jdbc_conn_id="jdbc_default",
33 )
34 # [END howto_operator_jdbc_insert_data]
35
36 # [START howto_operator_jdbc_query]
37 jdbc_query = JdbcOperator(
38 task_id="run_query",
39 sql="SELECT * FROM test_table;",
40 jdbc_conn_id="jdbc_default",
41 )
42 # [END howto_operator_jdbc_query]
43
44 jdbc_create_table >> jdbc_insert_data >> jdbc_query