I have a function, download_all, that iterates through a hardcoded list of pages and downloads them all. How can I add to that list dynamically, based on the results of a page? For example: download the first page, parse it, and, based on the results, add other pages to the event loop.

import asyncio

@asyncio.coroutine
def download_all():
    first_page = 1
    last_page = 100
    # download() is a coroutine defined elsewhere that fetches a single page
    download_list = [download(page_number) for page_number in range(first_page, last_page)]
    gen = asyncio.wait(download_list)
    return gen

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    futures = loop.run_until_complete(download_all())

2 Answers

One way to accomplish this is by using a Queue.

#!/usr/bin/python3

import asyncio

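try:
    # Python 3.4: JoinableQueue provides join() and task_done()
    from asyncio import JoinableQueue as Queue
except ImportError:
    # Python 3.5+: JoinableQueue was removed; Queue provides join() and task_done()
    from asyncio import Queue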

@asyncio.coroutine
def do_work(task_name, work_queue):
    while not work_queue.empty():
        queue_item = work_queue.get_nowait()

        # simulate condition where task is added dynamically
        if queue_item % 2 != 0:
            work_queue.put_nowait(2)
            print('Added additional item to queue')

        print('{0} got item: {1}'.format(task_name, queue_item))
        yield from asyncio.sleep(queue_item)
        print('{0} finished processing item: {1}'.format(task_name, queue_item))

if __name__ == '__main__':

    queue = Queue()

    # Load initial jobs into queue
    for x in range(1, 6):
        queue.put_nowait(x)

    # use 3 workers to consume tasks
    taskers = [ 
        do_work('task1', queue),
        do_work('task2', queue),
        do_work('task3', queue)
    ]   

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(taskers))
    loop.close()

Using an asyncio queue keeps the "units" of work separate from the tasks/futures that are initially handed to asyncio's event loop. The set of worker tasks stays fixed, while any worker can add extra "units" of work to the queue when some condition is met.

Note that in the example above, even-numbered items are terminal: processing one never adds another item. Only odd-numbered items add work (each one enqueues a 2), so the queue eventually drains and all tasks complete. In your case, you could use a different condition to decide whether another item is added to the queue.
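To make the "another condition" part concrete, here is a minimal sketch applied to the page-download case. It is not the code from the question: LINKS, the fake download(), and the names worker, seen and done are hypothetical stand-ins, and it assumes Python 3.7+ (async/await and asyncio.run instead of @asyncio.coroutine). The condition for adding work is "this page links to a page we have not queued yet". One other difference: in a real download the new pages are only known after the download has been awaited, so a worker that checks queue.empty() could quit while others are still mid-download, leaving fewer workers for the pages found later. The sketch therefore waits on queue.get() and uses task_done()/join() to detect when everything is finished.

```python
import asyncio

# Hypothetical link structure standing in for real pages: page -> pages it links to.
LINKS = {1: [2, 3], 2: [4], 3: [4, 5], 4: [], 5: [1]}


async def download(page_number):
    """Stand-in for a real download; returns the pages 'found' on this page."""
    await asyncio.sleep(0.01)
    return LINKS.get(page_number, [])


async def worker(queue, seen, done):
    while True:
        # Wait for work instead of exiting when the queue is momentarily empty.
        page = await queue.get()
        try:
            for link in await download(page):
                if link not in seen:  # the condition for adding more work
                    seen.add(link)
                    queue.put_nowait(link)
            done.append(page)
        finally:
            queue.task_done()


async def download_all(first_page=1, num_workers=3):
    queue = asyncio.Queue()
    seen = {first_page}
    done = []
    queue.put_nowait(first_page)
    workers = [asyncio.ensure_future(worker(queue, seen, done))
               for _ in range(num_workers)]
    await queue.join()  # returns once every queued page has been processed
    for w in workers:
        w.cancel()  # workers are idle in queue.get() at this point
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(done)


if __name__ == '__main__':
    print(asyncio.run(download_all()))  # [1, 2, 3, 4, 5]
```

The seen set also keeps a page from being queued twice when several pages link to it (page 4 here) or when links form a cycle (page 5 links back to page 1).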

Output:

Added additional item to queue
task2 got item: 1
task1 got item: 2
Added additional item to queue
task3 got item: 3
task2 finished processing item: 1
task2 got item: 4
task1 finished processing item: 2
Added additional item to queue
task1 got item: 5
task3 finished processing item: 3
task3 got item: 2
task3 finished processing item: 2
task3 got item: 2
task2 finished processing item: 4
task2 got item: 2
task1 finished processing item: 5
task3 finished processing item: 2
task2 finished processing item: 2

Please take a look at the Web Crawler example.

It uses an asyncio.JoinableQueue to store URLs for fetch tasks, and it also demonstrates a lot of other useful techniques.

2 Comments

It would be nice if you gave an actual example here and then referenced this one.
The example is bigger than the usual SO ones. I doubt it is worth posting here.
