
taskiq_dependencies_graph_resolution_quickstart.py

python

This quickstart demonstrates how to define a dependency graph and re

import asyncio

# Depends is used in target's signature below, so it must be imported too.
from taskiq_dependencies import DependencyGraph, Depends


def dependency(param: int) -> int:
    return param + 1


def target(d: int = Depends(dependency)) -> int:
    return d + 1


async def main():
    # Create a dependency graph for the target function.
    graph = DependencyGraph(target)

    # Resolve dependencies.
    # We provide "param" because it's required by the "dependency" function.
    async with graph.async_ctx({"param": 1}) as ctx:
        result = await ctx.resolve_kwargs()
        # Now we can call the target function with the resolved dependencies.
        print(target(**result))


if __name__ == "__main__":
    asyncio.run(main())
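
To make the idea of resolving a dependency graph more concrete, here is a toy resolver that uses only the standard library. It is a sketch of the general technique, not how taskiq_dependencies is implemented. The names ToyDepends, resolve_kwargs and base are hypothetical and exist only for this illustration.

```python
import inspect
from typing import Any, Callable, Dict


class ToyDepends:
    """Hypothetical marker standing in for a Depends-style default value."""

    def __init__(self, func: Callable[..., Any]) -> None:
        self.func = func


def resolve_kwargs(
    func: Callable[..., Any],
    cache: Dict[Callable[..., Any], Any],
) -> Dict[str, Any]:
    """Recursively build keyword arguments for func from ToyDepends markers."""
    kwargs: Dict[str, Any] = {}
    for name, param in inspect.signature(func).parameters.items():
        marker = param.default
        if not isinstance(marker, ToyDepends):
            continue  # parameters without a marker are left to the caller
        dep = marker.func
        if dep not in cache:
            # Resolve the dependency's own dependencies first, then call it once.
            cache[dep] = dep(**resolve_kwargs(dep, cache))
        kwargs[name] = cache[dep]
    return kwargs


def base() -> int:
    return 1


def dependency(b: int = ToyDepends(base)) -> int:
    return b + 1


def target(d: int = ToyDepends(dependency)) -> int:
    return d + 1


toy_kwargs = resolve_kwargs(target, {})
toy_result = target(**toy_kwargs)
print(toy_kwargs, toy_result)
```

The cache dictionary ensures each dependency function is called at most once per resolution, so a dependency shared by several parameters is computed a single time.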