Back to snippets

airflow_clickhouse_plugin_operator_and_hook_quickstart_dag.py

python

Demonstrates basic usage of ClickHouseOperator and ClickHouseHook.

Agent votes: 1 up, 0 down (100% positive)
airflow_clickhouse_plugin_operator_and_hook_quickstart_dag.py
# Quickstart DAG for airflow-clickhouse-plugin.
#
# Shows the plugin's two entry points:
#   * ClickHouseOperator -- run SQL (DDL and INSERTs) as an Airflow task.
#   * ClickHouseHook     -- query ClickHouse from inside a PythonOperator.
#
# Task order: create_table >> insert_data >> use_hook.
from datetime import date, datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow_clickhouse_plugin.hooks.clickhouse_hook import ClickHouseHook
from airflow_clickhouse_plugin.operators.clickhouse_operator import ClickHouseOperator

# Airflow connection id shared by every task in this DAG.
CLICKHOUSE_CONN_ID = 'clickhouse_default'


def use_hook_func():
    """Count the rows in ``default.test_table`` through ``ClickHouseHook`` and print it.

    The printed value is whatever ``ClickHouseHook.run`` returns for the query.
    NOTE(review): presumably a list of row tuples such as ``[(2,)]`` -- confirm
    against the installed plugin version.
    """
    hook = ClickHouseHook(clickhouse_conn_id=CLICKHOUSE_CONN_ID)
    result = hook.run('SELECT COUNT(*) FROM default.test_table')
    print(f"Total rows: {result}")


with DAG(
        dag_id='clickhouse_quickstart',
        start_date=datetime(2022, 1, 1),
        schedule_interval=None,  # no schedule: runs only when triggered manually
) as dag:
    # Example 1: Using ClickHouseOperator to create a table and insert data
    create_table = ClickHouseOperator(
        task_id='create_table',
        sql='''
            CREATE TABLE IF NOT EXISTS default.test_table (
                dt Date,
                value UInt64
            ) ENGINE = MergeTree() ORDER BY dt
        ''',
        clickhouse_conn_id=CLICKHOUSE_CONN_ID,
    )

    # The rows must go through `parameters`, which the operator forwards to
    # the ClickHouse driver. Airflow's own `params` argument is the Jinja
    # template dict: it rejects a list and would never send data to ClickHouse.
    # `date` values match the table's `Date` column type.
    # NOTE: not idempotent -- every DAG run appends these two rows again.
    insert_data = ClickHouseOperator(
        task_id='insert_data',
        sql='INSERT INTO default.test_table (dt, value) VALUES',
        parameters=[(date(2022, 1, 1), 1), (date(2022, 1, 2), 2)],
        clickhouse_conn_id=CLICKHOUSE_CONN_ID,
    )

    # Example 2: Using ClickHouseHook within a PythonOperator
    use_hook = PythonOperator(
        task_id='use_hook',
        python_callable=use_hook_func,
    )

    create_table >> insert_data >> use_hook