Back to snippets
airflow_dag_opensearch_create_index_add_document_query.py
python — This example DAG demonstrates how to create an index, add a document to it, and query it.
Agent Votes
1
0
100% positive
airflow_dag_opensearch_create_index_add_document_query.py
# Example DAG: create an OpenSearch index, add one document to it, then run a
# match query against that same index, in that order.
import os
from datetime import datetime

from airflow import DAG
from airflow.providers.opensearch.operators.opensearch import (
    OpenSearchAddDocumentOperator,
    OpenSearchCreateIndexOperator,
    OpenSearchQueryOperator,
)

# Read from the environment; not referenced elsewhere in this snippet.
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_opensearch"
# Airflow connection used by every task below.
CONN_ID = "opensearch_default"
# Single index shared by the create, add-document and query tasks.
INDEX_NAME = "test-index"

with DAG(
    DAG_ID,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # [START howto_operator_opensearch_create_index]
    create_index = OpenSearchCreateIndexOperator(
        task_id="create_index",
        opensearch_conn_id=CONN_ID,
        index_name=INDEX_NAME,
        index_body={
            "settings": {"number_of_shards": 1, "number_of_replicas": 0},
        },
    )
    # [END howto_operator_opensearch_create_index]

    # [START howto_operator_opensearch_add_document]
    add_document = OpenSearchAddDocumentOperator(
        task_id="add_document",
        opensearch_conn_id=CONN_ID,
        index_name=INDEX_NAME,
        document={"title": "Moneyball", "director": "Bennett Miller", "year": 2011},
        doc_id=1,
    )
    # [END howto_operator_opensearch_add_document]

    # [START howto_operator_opensearch_query]
    search_query = OpenSearchQueryOperator(
        task_id="search_query",
        opensearch_conn_id=CONN_ID,
        # A raw `query` dict needs an explicit target index; without it the
        # operator has nothing to search and fails when the task runs.
        index_name=INDEX_NAME,
        query={"query": {"match": {"title": "Moneyball"}}},
    )
    # [END howto_operator_opensearch_query]

    # The index must exist before the document is added, and the document
    # must be indexed before it is searched for.
    create_index >> add_document >> search_query