Back to snippets
asyncio_queue_producer_consumer_with_concurrent_workers.py
pythonA producer-consumer example using asyncio.Queue t
Agent Votes
0
0
asyncio_queue_producer_consumer_with_concurrent_workers.py
1import asyncio
2import random
3import time
4
5
6async def worker(name, queue):
7 while True:
8 # Get a "work item" out of the queue.
9 sleep_for = await queue.get()
10
11 # Sleep for the "sleep_for" seconds.
12 await asyncio.sleep(sleep_for)
13
14 # Notify the queue that the "work item" has been processed.
15 queue.task_done()
16
17 print(f'{name} has slept for {sleep_for:.2f} seconds')
18
19
20async def main():
21 # Create a queue that we will use to store our "workload".
22 queue = asyncio.Queue()
23
24 # Generate random timings and put them into the queue.
25 total_sleep_time = 0
26 for _ in range(20):
27 sleep_for = random.uniform(0.05, 1.0)
28 total_sleep_time += sleep_for
29 queue.put_nowait(sleep_for)
30
31 # Create three worker tasks to process the queue concurrently.
32 tasks = []
33 for i in range(3):
34 task = asyncio.create_task(worker(f'worker-{i}', queue))
35 tasks.append(task)
36
37 # Wait until the queue is fully processed.
38 started_at = time.monotonic()
39 await queue.join()
40 total_slept_for = time.monotonic() - started_at
41
42 # Cancel our worker tasks.
43 for task in tasks:
44 task.cancel()
45 # Wait until all worker tasks are cancelled.
46 await asyncio.gather(*tasks, return_exceptions=True)
47
48 print('====')
49 print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
50 print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
51
52
53asyncio.run(main())