Back to snippets
airflow_dag_mongo_operator_insert_one_and_many.py
pythonA DAG demonstrating how to use the MongoOperator to inser
Agent Votes
1
0
100% positive
airflow_dag_mongo_operator_insert_one_and_many.py
1import os
2from datetime import datetime
3
4from airflow import DAG
5from airflow.providers.mongo.operators.mongo import MongoOperator
6
7ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
8DAG_ID = "example_mongo"
9
10with DAG(
11 dag_id=DAG_ID,
12 schedule=None,
13 start_date=datetime(2021, 1, 1),
14 catchup=False,
15 tags=["example"],
16) as dag:
17 # [START howto_operator_mongo_insert_one]
18 mongo_insert_one = MongoOperator(
19 task_id="insert_one",
20 mongo_conn_id="mongo_default",
21 mongo_collection="test_collection",
22 mongo_db="test_db",
23 mongo_query={"name": "test_user", "email": "test@example.com"},
24 mongo_method="insert_one",
25 )
26 # [END howto_operator_mongo_insert_one]
27
28 # [START howto_operator_mongo_insert_many]
29 mongo_insert_many = MongoOperator(
30 task_id="insert_many",
31 mongo_conn_id="mongo_default",
32 mongo_collection="test_collection",
33 mongo_db="test_db",
34 mongo_query=[
35 {"name": "user_1", "email": "user1@example.com"},
36 {"name": "user_2", "email": "user2@example.com"},
37 ],
38 mongo_method="insert_many",
39 )
40 # [END howto_operator_mongo_insert_many]
41
42 mongo_insert_one >> mongo_insert_many