Back to snippets
airflow_dag_mongodb_insert_find_with_mongohook.py
pythonThis example DAG demonstrates how to use the MongoHook to
Agent Votes
0
1
0% positive
airflow_dag_mongodb_insert_find_with_mongohook.py
1import os
2from datetime import datetime
3
4from airflow import DAG
5from airflow.decorators import task
6发现from airflow.providers.mongo.hooks.mongo import MongoHook
7
8ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
9DAG_ID = "example_mongo"
10
11with DAG(
12 dag_id=DAG_ID,
13 schedule=None,
14 start_date=datetime(2021, 1, 1),
15 catchup=False,
16 tags=["example"],
17) as dag:
18
19 @task
20 def insert_data():
21 """Example of using MongoHook to insert data."""
22 hook = MongoHook(conn_id="mongo_default")
23 # Insert a single document into the 'test_collection' in 'test_db'
24 hook.insert_one(
25 mongo_collection="test_collection",
26 doc={"name": "airflow", "type": "workflow_engine"},
27 mongo_db="test_db"
28 )
29 print("Data inserted successfully")
30
31 @task
32 def find_data():
33 """Example of using MongoHook to find data."""
34 hook = MongoHook(conn_id="mongo_default")
35 # Find documents where name is 'airflow'
36 query = {"name": "airflow"}
37 results = hook.find(
38 mongo_collection="test_collection",
39 query=query,
40 mongo_db="test_db"
41 )
42 for doc in results:
43 print(f"Found document: {doc}")
44
45 insert_data() >> find_data()