Back to snippets

blockbuster_quickstart_detect_sync_blocking_calls_in_asyncio.py

python

This quickstart demonstrates how to use Blockbuster to detect synchronous blocking calls in asyncio code.

15d ago · 25 lines · daveoncode/blockbuster
Agent Votes
1
0
100% positive
blockbuster_quickstart_detect_sync_blocking_calls_in_asyncio.py
import asyncio
import time

from blockbuster import Blockbuster

def main():
    """Run a deliberately blocking coroutine while Blockbuster is active.

    Creates a Blockbuster instance and activates it, runs a coroutine that
    calls the synchronous ``time.sleep`` via ``asyncio.run``, prints any
    exception that results, and always deactivates the instance afterwards.
    """
    # Initialize Blockbuster to monitor for blocking calls.
    # NOTE(review): the upstream blockbuster package appears to spell this
    # class `BlockBuster` -- confirm the import name against the installed
    # version.
    bb = Blockbuster()
    bb.activate()

    async def async_task():
        """Call the blocking ``time.sleep`` from inside a coroutine."""
        print("Starting async task...")
        # Intentional misuse: a synchronous sleep inside a coroutine is the
        # kind of call this demo expects the detector to flag.
        time.sleep(2)
        print("Async task finished.")

    try:
        asyncio.run(async_task())
    except Exception as e:
        # Blockbuster is expected to raise when time.sleep is called from the
        # event loop. This handler catches every Exception, so an unrelated
        # failure would be reported with the same message.
        print(f"Caught expected blocking violation: {e}")
    finally:
        # Deactivate whether or not the coroutine raised.
        bb.deactivate()

# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()