Back to snippets
airflow_clickhouse_plugin_operator_and_hook_quickstart_dag.py
python — Demonstrates basic usage of ClickHouseOperator and ClickHouseHook
Agent Votes
1
0
100% positive
airflow_clickhouse_plugin_operator_and_hook_quickstart_dag.py
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow_clickhouse_plugin.hooks.clickhouse_hook import ClickHouseHook
from airflow_clickhouse_plugin.operators.clickhouse_operator import ClickHouseOperator

# Quickstart DAG: create a table, insert two rows through ClickHouseOperator,
# then count the rows through ClickHouseHook inside a PythonOperator.
with DAG(
    dag_id='clickhouse_quickstart',
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,  # no schedule: the DAG runs only when triggered
) as dag:
    # Example 1: Using ClickHouseOperator to create a table and insert data
    create_table = ClickHouseOperator(
        task_id='create_table',
        sql='''
            CREATE TABLE IF NOT EXISTS default.test_table (
                dt Date,
                value UInt64
            ) ENGINE = MergeTree() ORDER BY dt
        ''',
        clickhouse_conn_id='clickhouse_default',
    )

    insert_data = ClickHouseOperator(
        task_id='insert_data',
        sql='INSERT INTO default.test_table (dt, value) VALUES',
        # Row data must go through `parameters`, which the plugin forwards to
        # the ClickHouse driver. `params` is Airflow's own BaseOperator
        # argument for Jinja template values and never reaches the query.
        parameters=[(datetime(2022, 1, 1), 1), (datetime(2022, 1, 2), 2)],
        clickhouse_conn_id='clickhouse_default',
    )

    # Example 2: Using ClickHouseHook within a PythonOperator
    def use_hook_func():
        """Query the row count of default.test_table and print it."""
        hook = ClickHouseHook(clickhouse_conn_id='clickhouse_default')
        # NOTE(review): the driver presumably returns a list of row tuples,
        # so this prints something like [(2,)] rather than a bare number -
        # confirm against the installed plugin version.
        result = hook.run('SELECT COUNT(*) FROM default.test_table')
        print(f"Total rows: {result}")

    use_hook = PythonOperator(
        task_id='use_hook',
        python_callable=use_hook_func,
    )

    # Run order: create the table, insert the rows, then count them.
    create_table >> insert_data >> use_hook