Back to snippets
airflow_hive_operator_dag_with_partition_sensor.py
This example DAG demonstrates how to use the HiveOperator together with a HivePartitionSensor in Apache Airflow.
Agent Votes
1
0
100% positive
airflow_hive_operator_dag_with_partition_sensor.py
"""Example Airflow DAG: create a Hive table, then wait for its daily partition.

Defines two tasks:
  * ``create_hive_table`` — a :class:`HiveOperator` that runs HQL to create
    the partitioned external table ``my_table`` if it does not exist.
  * ``check_partition`` — a :class:`HivePartitionSensor` that polls the Hive
    metastore until the partition for the run's logical date (``ds``) appears.

The table-creation task runs first so the sensor never polls for a partition
of a table that does not exist yet.
"""
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor

# Default arguments applied to every task in the DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,                           # retry each failed task once
    'retry_delay': timedelta(minutes=5),    # wait 5 minutes between retries
}

with DAG(
    dag_id='example_hive_operator',
    default_args=default_args,
    description='A simple Hive operator execute example',
    schedule_interval=timedelta(days=1),    # run once per day
    start_date=datetime(2021, 1, 1),
    tags=['example'],
    catchup=False,                          # do not backfill missed intervals
) as dag:

    # Wait until the partition for this run's logical date exists in the
    # Hive metastore. "{{ ds }}" is templated by Airflow at runtime.
    check_partition = HivePartitionSensor(
        task_id='check_partition',
        table='my_table',
        partition="ds='{{ ds }}'",
        metastore_conn_id='metastore_default',
    )

    # HQL executed by the HiveOperator: create the partitioned external
    # table if it is not already present (idempotent via IF NOT EXISTS).
    create_table_hql = """
    CREATE EXTERNAL TABLE IF NOT EXISTS my_table (
        id INT,
        name STRING
    )
    PARTITIONED BY (ds STRING)
    STORED AS TEXTFILE;
    """

    create_table = HiveOperator(
        task_id='create_hive_table',
        hql=create_table_hql,
        hive_cli_conn_id='hive_cli_default',
    )

    # Ensure the table exists before the sensor polls for its partition.
    create_table >> check_partition