
I have a simple class that leverages an async generator to retrieve responses for a list of URLs:

import aiohttp
import asyncio
import logging
import sys

LOOP = asyncio.get_event_loop()
N_SEMAPHORE = 3

FORMAT = '[%(asctime)s] - %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)

class ASYNC_GENERATOR(object):
    def __init__(self, n_semaphore=N_SEMAPHORE, loop=LOOP):
        self.loop = loop
        self.semaphore = asyncio.Semaphore(n_semaphore)
        self.session = aiohttp.ClientSession(loop=self.loop)

    async def _get_url(self, url):
        """
        Sends an http GET request to an API endpoint
        """

        async with self.semaphore:
            async with self.session.get(url) as response:
                logger.info(f'Request URL: {url} [{response.status}]')
                read_response = await response.read()

                return {
                    'read': read_response,
                    'status': response.status,
                }

    def get_routes(self, urls):
        """
        Wrapper around _get_url (multiple urls asynchronously)

        This returns an async generator
        """

        # Asynchronous http GET requests
        coros = [self._get_url(url) for url in urls]
        futures = asyncio.as_completed(coros)
        for future in futures:
            yield self.loop.run_until_complete(future)

    def close(self):
        self.session._connector.close()

When I execute this main part of the code:

if __name__ == '__main__':
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    for response in responses:
        response = next(ag.get_routes(['https://httpbin.org/get']))
    ag.close()

The log prints out:

[2018-05-15 12:59:49,228] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 12:59:49,235] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 12:59:49,242] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 12:59:49,285] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 12:59:49,290] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 12:59:49,295] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 12:59:49,335] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 12:59:49,340] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 12:59:49,347] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 12:59:49,388] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 12:59:49,394] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,444] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,503] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,553] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,603] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,650] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,700] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,825] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,875] - Request URL: https://httpbin.org/get [200]
[2018-05-15 12:59:49,922] - Request URL: https://httpbin.org/get [200]

Since responses is an async generator, I expect it to yield one response from the async generator (which should only send the request upon actually yielding), send a separate request to the endpoint with no x parameter, and then yield the next response from the async generator. It should flip back and forth between a request with an x parameter and a request with no parameters. Instead, it yields all of the responses with an x parameter first, followed by all of the requests that have no parameters.

Something similar happens when I do:

ag = ASYNC_GENERATOR()
urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
responses = ag.get_routes(urls)
next(responses)
response = next(ag.get_routes(['https://httpbin.org/get']))
ag.close()

And the log prints:

[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,656] - Request URL: https://httpbin.org/get?x=1 [200]
[2018-05-15 13:08:38,681] - Request URL: https://httpbin.org/get?x=3 [200]
[2018-05-15 13:08:38,695] - Request URL: https://httpbin.org/get?x=4 [200]
[2018-05-15 13:08:38,717] - Request URL: https://httpbin.org/get?x=6 [200]
[2018-05-15 13:08:38,741] - Request URL: https://httpbin.org/get?x=2 [200]
[2018-05-15 13:08:38,750] - Request URL: https://httpbin.org/get?x=0 [200]
[2018-05-15 13:08:38,773] - Request URL: https://httpbin.org/get?x=9 [200]
[2018-05-15 13:08:38,792] - Request URL: https://httpbin.org/get?x=7 [200]
[2018-05-15 13:08:38,803] - Request URL: https://httpbin.org/get?x=5 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]

Instead, what I want is:

[2018-05-15 13:08:38,643] - Request URL: https://httpbin.org/get?x=8 [200]
[2018-05-15 13:08:38,826] - Request URL: https://httpbin.org/get [200]

There are times when I want to retrieve all of the responses first before doing anything else. However, there are also times when I want to interject and make intermediate requests before yielding the next item from the generator (e.g., the generator returns results from paginated search results and I want to process further links from each page before moving on to the next page).

What do I need to change to achieve the required result?

  • responses is not an async generator; it's a regular, normal, synchronous generator function. To be async, you'd have to use async def responses(...): Commented May 15, 2018 at 17:18
  • I don't think I follow. Would you mind providing a code snippet? Would async def responses(...): sit between get_routes and _get_url? Or do I use this inside __main__? And what would I be awaiting inside async def responses(...):? Thank you for your patience as I am still learning this complicated async stuff! Commented May 15, 2018 at 17:44
  • @MartijnPieters The trouble is that the OP is using as_completed, which is itself an ordinary generator, but designed (strangely) for use with asyncio. Thus it comes naturally to wrap it using another ordinary generator. Using run_until_complete on coroutines yielded (synchronously) by as_completed is a legitimate way of using it, though not something I'd recommend. Commented May 15, 2018 at 18:07
  • @user4815162342 I am flexible and willing to learn the best way to go about this if you don't mind offering an answer with the right solution? I cobbled this together using limited knowledge scraped from disparate examples from the web so I'd appreciate any help that I can get. Hopefully, it is clear what I am trying to achieve? Commented May 15, 2018 at 18:21
  • @slaw: sorry, that was too brief a comment and I had to step away. The term async generator has a very specific meaning in Python; your generator function pushes tasks into the asyncio loop but is not an async generator. You are firing off a series of tasks and then waiting for the next one to be complete; all those tasks are executing cooperatively. Commented May 15, 2018 at 19:21

1 Answer


Leaving aside the technical question of whether responses is an async generator (it's not, as Python uses the term), your problem lies in as_completed. as_completed starts a bunch of coroutines in parallel and provides the means to obtain their results as they complete. That the futures run in parallel is not exactly obvious from the documentation (improved in later versions), but it makes sense if you consider that the original concurrent.futures.as_completed works on thread-based futures, which revolve around parallel execution. Conceptually, the same is true of asyncio futures.
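Here is a toy illustration of that eager scheduling (my own sketch, not from the original post), with asyncio.sleep standing in for the HTTP requests:

import asyncio

async def work(delay):
    await asyncio.sleep(delay)
    return delay

async def demo():
    # as_completed wraps all three coroutines in tasks as soon as it is
    # called, so the total runtime is ~3s rather than ~6s, and results
    # arrive in completion order: 1, 2, 3.
    for future in asyncio.as_completed([work(3), work(1), work(2)]):
        print(await future)

asyncio.get_event_loop().run_until_complete(demo())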

Your code obtains only the first (fastest-arriving) result and then starts doing something else, also using asyncio. The remaining coroutines passed to as_completed are not frozen merely because no one is collecting their results - they are doing their jobs in the background, and once done they are ready to be awaited (in your case by the code inside as_completed, which you access with loop.run_until_complete()). I would venture to guess that the URL without parameters takes longer to retrieve than the URL with just the parameter x, which is why it gets printed after all the other coroutines.

In other words, those log lines being printed means that asyncio is doing its job and providing the parallel execution you requested! If you don't want parallel execution, don't ask for it; execute the requests serially:

def get_routes(self, urls):
    for url in urls:
        # Run each request to completion before yielding the next result,
        # so nothing executes in the background between iterations.
        yield self.loop.run_until_complete(self._get_url(url))
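With that change, the interleaving asked for in the question falls out naturally. A hypothetical usage, reusing the names from the question:

ag = ASYNC_GENERATOR()
urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
for response in ag.get_routes(urls):
    # Each iteration now logs one ?x= request followed by one plain request.
    # The outer generator is suspended at its yield here, so the nested
    # run_until_complete call does not re-enter a running loop.
    extra = next(ag.get_routes(['https://httpbin.org/get']))
ag.close()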

But this is a poor way of using asyncio - its main loop is non-reentrant, so to ensure composability, you almost certainly want the loop to be spun just once, at the top level. This is typically done with a construct like loop.run_until_complete(main()) or loop.run_forever(). As Martijn pointed out, you could achieve that, while retaining the nice generator API, by making get_routes an actual async generator:

async def get_routes(self, urls):
    for url in urls:
        result = await self._get_url(url)
        yield result

Now you can have a main() coroutine that looks like this:

async def main():
    ag = ASYNC_GENERATOR()
    urls = [f'https://httpbin.org/get?x={i}' for i in range(10)]
    responses = ag.get_routes(urls)
    async for response in responses:
        # simulate `next` with async iteration
        async for other_response in ag.get_routes(['https://httpbin.org/get']):
            break
    ag.close()

LOOP.run_until_complete(main())
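As an aside (my assumption about the aiohttp version in use, not something from the original answer): ag.close() in the question reaches into the private _connector attribute. Under aiohttp 3.x, ClientSession.close() is itself a coroutine, so a cleaner shutdown is to make close asynchronous and end main() with await ag.close():

async def close(self):
    # aiohttp 3.x: ClientSession.close() is a coroutine and also closes the
    # underlying connector, so there is no need to touch the private
    # self.session._connector attribute.
    await self.session.close()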

Comments

The get_routes() async generator could also add the plain non-query-string URL when the await self._get_url(url) call returns a result, perhaps after the yield result? That way at least those requests would be run in parallel to the synchronous processing of the urls list.
@MartijnPieters You mean create the task before yield result (which is safe because at that point we know the URL is done and the order respected), but actually await (and yield) it afterwards. That's very clever (unless the OP wanted them not to overlap at all), but it breaks the get_routes abstraction, and might be a bit too advanced for an asyncio beginner.
Sure, I'd at least change the name of the function, I'm thinking of a general refactor to get closer to what the OP was trying to do.
Is there a reason why loop.run_until_complete can't be placed within a function? From an abstraction standpoint, I don't want the user to have to know anything about async or event loops. I've seen the main() function used in many online examples, but now the user needs to know to instantiate the class, throw it inside a main() func, and pass it to the event loop, when all they want is paginated responses. Also, as an async noob, I'd be happy to study the more clever code if you wanted to include it at the bottom as an update? I'll play with this more but, nonetheless, thank you for your help!
@slaw Sure. If you really don't want the caller to know anything about asyncio, you can place run_until_complete in a function. I was just pointing out that it's a suboptimal use of asyncio. Normally asyncio allows one to compose different async code by starting multiple coroutines "in parallel" or integrating them with callback-based futures. You can even have coroutines that don't know of each other, all running in the same event loop. By hiding the use of asyncio inside a function, you have a design that disables that.
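For completeness, a minimal sketch of the synchronous facade discussed in the last two comments (iter_routes is a hypothetical helper name, and this assumes the async-generator version of get_routes from the answer; as noted above, it is a suboptimal use of asyncio, but it hides the event loop from callers):

def iter_routes(ag, urls, loop=LOOP):
    # Drive the async generator one item at a time so callers can use a
    # plain for loop with no knowledge of asyncio. __anext__() returns an
    # awaitable; run_until_complete propagates StopAsyncIteration when the
    # async generator is exhausted.
    agen = ag.get_routes(urls)
    while True:
        try:
            yield loop.run_until_complete(agen.__anext__())
        except StopAsyncIteration:
            return

And a sketch of the prefetching idea from the earlier comments (get_routes_with_extra is a hypothetical name; this is only appropriate if the two requests are allowed to overlap):

async def get_routes_with_extra(self, urls, extra_url):
    for url in urls:
        result = await self._get_url(url)
        # Start the extra request before yielding, so it runs while the
        # caller processes `result`; await and yield it afterwards.
        extra = asyncio.ensure_future(self._get_url(extra_url))
        yield result
        yield await extra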
