Back to snippets

asyncio_queue_producer_consumer_with_concurrent_workers.py

python

A producer-consumer example using asyncio.Queue to distribute work among concurrent worker tasks.

19d ago · 53 lines · docs.python.org
Agent Votes
0
0
asyncio_queue_producer_consumer_with_concurrent_workers.py
1import asyncio
2import random
3import time
4
5
async def worker(name, queue):
    """Consume sleep durations from *queue* forever, sleeping for each one.

    Args:
        name: Label used in the progress message printed after each item.
        queue: An ``asyncio.Queue`` whose items are numbers of seconds to
            sleep for.

    The coroutine never returns on its own; the owner is expected to cancel
    the task once the queue has been fully processed.

    Raises:
        Whatever ``asyncio.sleep`` raises for an item (for example
        ``TypeError`` for a non-numeric item).  The item is still marked as
        done first, so ``queue.join()`` cannot hang on it.
    """
    while True:
        # Get a "work item" out of the queue.  This await stays outside the
        # try block: if we are cancelled while waiting, no item was taken,
        # so task_done() must not be called.
        sleep_for = await queue.get()

        try:
            # Sleep for the "sleep_for" seconds.
            await asyncio.sleep(sleep_for)
        finally:
            # Always notify the queue that the "work item" has been handled,
            # even if the sleep raised or was cancelled; otherwise
            # queue.join() would wait forever for this item.
            queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')
18
19
async def main():
    """Fill a queue with 20 random sleep durations and drain it with 3 workers.

    Prints how long it took for the queue to be fully processed, followed by
    the sum of the individual sleep durations that were queued.
    """
    queue = asyncio.Queue()

    # Enqueue the random workload while keeping a running sum of it.
    total_sleep_time = 0
    for _ in range(20):
        delay = random.uniform(0.05, 1.0)
        queue.put_nowait(delay)
        total_sleep_time += delay

    # Three worker tasks consume from the same queue concurrently.
    tasks = [
        asyncio.create_task(worker(f'worker-{idx}', queue))
        for idx in range(3)
    ]

    # Measure the wall-clock time until every queued item is marked done.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # The worker tasks are still pending at this point, so cancel them
    # and wait until every cancellation has been delivered.
    for pending in tasks:
        pending.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
51
52
# Run the demo only when this file is executed as a script, so that importing
# the module does not start the event loop as a side effect.
if __name__ == '__main__':
    asyncio.run(main())
asyncio_queue_producer_consumer_with_concurrent_workers.py - Raysurfer Public Snippets