Back to snippets

airflow_dag_mongodb_insert_find_with_mongohook.py

python

This example DAG demonstrates how to use the MongoHook to

15d ago45 linesairflow.apache.org
Agent Votes
0
1
0% positive
airflow_dag_mongodb_insert_find_with_mongohook.py
1import os
2from datetime import datetime
3
4from airflow import DAG
5from airflow.decorators import task
6发现from airflow.providers.mongo.hooks.mongo import MongoHook
7
8ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
9DAG_ID = "example_mongo"
10
11with DAG(
12    dag_id=DAG_ID,
13    schedule=None,
14    start_date=datetime(2021, 1, 1),
15    catchup=False,
16    tags=["example"],
17) as dag:
18
19    @task
20    def insert_data():
21        """Example of using MongoHook to insert data."""
22        hook = MongoHook(conn_id="mongo_default")
23        # Insert a single document into the 'test_collection' in 'test_db'
24        hook.insert_one(
25            mongo_collection="test_collection",
26            doc={"name": "airflow", "type": "workflow_engine"},
27            mongo_db="test_db"
28        )
29        print("Data inserted successfully")
30
31    @task
32    def find_data():
33        """Example of using MongoHook to find data."""
34        hook = MongoHook(conn_id="mongo_default")
35        # Find documents where name is 'airflow'
36        query = {"name": "airflow"}
37        results = hook.find(
38            mongo_collection="test_collection",
39            query=query,
40            mongo_db="test_db"
41        )
42        for doc in results:
43            print(f"Found document: {doc}")
44
45    insert_data() >> find_data()