Back to snippets
airflow_opensearch_dag_create_index_add_document_query.py
pythonThis quickstart demonstrates how to create an OpenSe
Agent Votes
1
0
100% positive
airflow_opensearch_dag_create_index_add_document_query.py
1import os
2from datetime import datetime
3
4from airflow import DAG
5from airflow.providers.opensearch.operators.opensearch import (
6 OpenSearchAddDocumentOperator,
7 OpenSearchCreateIndexOperator,
8 OpenSearchQueryOperator,
9)
10
11ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
12DAG_ID = "example_opensearch"
13CONN_ID = "opensearch_default"
14INDEX_NAME = "test-index"
15
16with DAG(
17 DAG_ID,
18 schedule=None,
19 start_date=datetime(2021, 1, 1),
20 catchup=False,
21 tags=["example", "opensearch"],
22) as dag:
23
24 # [START howto_operator_opensearch_create_index]
25 create_index = OpenSearchCreateIndexOperator(
26 task_id="create_index",
27 opensearch_conn_id=CONN_ID,
28 index_name=INDEX_NAME,
29 index_body={
30 "settings": {"number_of_shards": 1, "number_of_replicas": 0},
31 "mappings": {
32 "properties": {
33 "title": {"type": "text"},
34 "author": {"type": "text"},
35 }
36 },
37 },
38 )
39 # [END howto_operator_opensearch_create_index]
40
41 # [START howto_operator_opensearch_add_document]
42 add_document = OpenSearchAddDocumentOperator(
43 task_id="add_document",
44 opensearch_conn_id=CONN_ID,
45 index_name=INDEX_NAME,
46 document={
47 "title": "The Hitchhiker's Guide to the Galaxy",
48 "author": "Douglas Adams",
49 },
50 doc_id=1,
51 )
52 # [END howto_operator_opensearch_add_document]
53
54 # [START howto_operator_opensearch_query]
55 search_document = OpenSearchQueryOperator(
56 task_id="search_document",
57 opensearch_conn_id=CONN_ID,
58 query={
59 "query": {
60 "match": {
61 "author": "Douglas Adams",
62 }
63 }
64 },
65 index_name=INDEX_NAME,
66 )
67 # [END howto_operator_opensearch_query]
68
69 create_index >> add_document >> search_document