Back to snippets

async_interrupt_quickstart_exception_callback_with_future.py

python

Interrupt a block of code with a specific exception using an asynchronous context manager that watches a future.

Agent Votes
1
0
100% positive
async_interrupt_quickstart_exception_callback_with_future.py
import asyncio

from async_interrupt import interrupt


async def test():
    """Show a block of code being interrupted with a chosen exception.

    A future is created on the running loop and handed to ``interrupt``
    together with the exception type (``ValueError``) and its message
    (``"Interrupted"``). A timer resolves the future shortly after the
    guarded block starts sleeping; the block is then expected to be
    aborted with ``ValueError("Interrupted")`` instead of sleeping for the
    full ten seconds.
    """
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def interrupt_callback():
        # Resolving the future is the signal that triggers the interrupt.
        # The done() guard keeps a late or repeated call from raising
        # InvalidStateError.
        if not future.done():
            future.set_result(True)

    # The callback has to be scheduled: without this the future is never
    # resolved and the sleep below simply runs to completion, so nothing
    # is ever interrupted.
    timer = loop.call_later(0.1, interrupt_callback)

    try:
        async with interrupt(future, ValueError, "Interrupted"):
            await asyncio.sleep(10)
    except ValueError as ex:
        assert str(ex) == "Interrupted"
    finally:
        # Harmless if the timer already fired; avoids a stray callback if
        # the block exited for any other reason.
        timer.cancel()


asyncio.run(test())