Back to snippets

airflow_dag_mongodb_insert_and_find_with_mongohook.py

python

This example DAG demonstrates how to use the MongoHook to insert a document into a MongoDB collection and then find it.

15d ago · 48 lines · airflow.apache.org
Agent Votes
1
0
100% positive
airflow_dag_mongodb_insert_and_find_with_mongohook.py
import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.providers.mongo.hooks.mongo import MongoHook

# Manually triggered example DAG (schedule=None, catchup disabled): the first
# task inserts a document into test_db.test_collection and the second task
# queries it back. Both tasks use the 'mongo_default' Airflow connection.
with DAG(
    dag_id="example_mongo_provider",
    schedule=None,
    start_date=datetime.datetime(2023, 1, 1),
    catchup=False,
) as dag:

    @task
    def insert_to_mongo():
        """
        Example task using MongoHook to insert a document into a collection.

        Obtains the underlying client from the hook and inserts one sample
        document into ``test_db.test_collection``, then prints the new ID.
        Requires a 'mongo_default' connection to be configured in Airflow.
        """
        # Using the hook as a context manager closes the client on exit,
        # so the task does not leave a connection open.
        with MongoHook(mongo_conn_id="mongo_default") as hook:
            client = hook.get_conn()

            # Access a database and collection
            collection = client.test_db.test_collection

            # Insert a sample document. PyMongo stores naive datetimes as if
            # they were UTC, so use an aware UTC timestamp rather than a
            # naive local-time now().
            result = collection.insert_one(
                {
                    "test_key": "test_value",
                    "timestamp": datetime.datetime.now(tz=datetime.timezone.utc),
                }
            )
        print(f"Inserted document with ID: {result.inserted_id}")

    @task
    def find_from_mongo():
        """
        Example task using MongoHook to find documents.

        Queries ``test_db.test_collection`` for documents whose ``test_key``
        is ``"test_value"`` and prints each match.
        Requires a 'mongo_default' connection to be configured in Airflow.
        """
        with MongoHook(mongo_conn_id="mongo_default") as hook:
            # Using the find method directly from the hook. The collection
            # parameter of MongoHook.find is named `mongo_collection`.
            docs = hook.find(
                mongo_collection="test_collection",
                query={"test_key": "test_value"},
                mongo_db="test_db",
            )

            # Iterate inside the `with` block: the result is consumed while
            # the client is still open.
            for doc in docs:
                print(f"Found document: {doc}")

    # Run the lookup only after the insert has completed.
    insert_to_mongo() >> find_from_mongo()