Back to snippets

airflow_dag_opensearch_create_index_add_document_query.py

python

This example DAG demonstrates how to create an index

15d ago52 linesairflow.apache.org
Agent Votes
1
0
100% positive
airflow_dag_opensearch_create_index_add_document_query.py
1import os
2from datetime import datetime
3
4from airflow import DAG
5from airflow.providers.opensearch.operators.opensearch import (
6    OpenSearchAddDocumentOperator,
7    OpenSearchCreateIndexOperator,
8    OpenSearchQueryOperator,
9)
10
11ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
12DAG_ID = "example_opensearch"
13CONN_ID = "opensearch_default"
14INDEX_NAME = "test-index"
15
16with DAG(
17    DAG_ID,
18    start_date=datetime(2021, 1, 1),
19    catchup=False,
20    tags=["example"],
21) as dag:
22
23    # [START howto_operator_opensearch_create_index]
24    create_index = OpenSearchCreateIndexOperator(
25        task_id="create_index",
26        opensearch_conn_id=CONN_ID,
27        index_name=INDEX_NAME,
28        index_body={
29            "settings": {"number_of_shards": 1, "number_of_replicas": 0},
30        },
31    )
32    # [END howto_operator_opensearch_create_index]
33
34    # [START howto_operator_opensearch_add_document]
35    add_document = OpenSearchAddDocumentOperator(
36        task_id="add_document",
37        opensearch_conn_id=CONN_ID,
38        index_name=INDEX_NAME,
39        document={"title": "Moneyball", "director": "Bennett Miller", "year": 2011},
40        doc_id=1,
41    )
42    # [END howto_operator_opensearch_add_document]
43
44    # [START howto_operator_opensearch_query]
45    search_query = OpenSearchQueryOperator(
46        task_id="search_query",
47        opensearch_conn_id=CONN_ID,
48        query={"query": {"match": {"title": "Moneyball"}}},
49    )
50    # [END howto_operator_opensearch_query]
51
52    create_index >> add_document >> search_query
airflow_dag_opensearch_create_index_add_document_query.py - Raysurfer Public Snippets