1

I am using aiohttp session along with a semaphore within a custom class:

async def get_url(self, url):

    async with self.semaphore:
        async with self.session.get(url) as response:
            try:
                text_response = await response.text()
                read_response = await response.read()
                json_response = await response.json()
                await asyncio.sleep(random.uniform(0.1, 0.5))
            except aiohttp.client_exceptions.ContentTypeError:
                json_response = {}

            return {
                'json': json_response,
                'text': text_response,
                'read': read_response,
                'status': response.status,
                'url': response.url,
            }

I have two questions:

  1. Is it correct/incorrect to to have multiple await statements within a single async function? I need to return both the response.text() and response.read(). However, depending on the URL, the response.json() may or may not be available so I've thrown everything into a try/except block to catch this exception.

  2. Since I am using this function to loop through a list of different RESTful API endpoints, I am controlling the number of simultaneous requests through the semaphore (set to max of 100) but I also need to stagger the requests so they aren't log jamming the host machine. So, I thought I could accomplish this by adding an asyncio.sleep that is randomly chosen between 0.1-0.5 seconds. Is this the best way to enforce a small wait in between requests? Should I move this to the beginning of the function instead of near the end?

3
  • Did you try using: coroutine asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED) ? It's in the official python documentation: docs.python.org/3/library/asyncio-task.html#coroutine Commented May 9, 2018 at 0:35
  • I've used it in other contexts but I'm failing to see how/where it will apply here. I don't want to wait for all of the URLs to return back at the same time. I only want to wait for the response components and sleep to be completed at the same time. Can you provide some code to elaborate further on what you mean? Commented May 9, 2018 at 1:04
  • Sorry I thought you wanted to wait to all the responses to end, I guess you could do that way, using this random sleep function, but I can't ensure that this is the best way or recomended way of doing it, I guess there is a way to do all the requests without repeating code Commented May 9, 2018 at 1:18

1 Answer 1

2
  1. It is absolutely fine to have multiple awaits in one async function, as far as you know what you are awaiting for, and each of them are awaited one by one, just like the very normal sequential execution. One thing to mention about aiohttp is that, you'd better call read() first and catch UnicodeDecodeError too, because internally text() and json() call read() first and process its result, you don't want the processing to prevent returning at least read_response. You don't have to worry about read() being called multiple times, it is simply cached in the response instance on the first call.

  2. Random stagger is an easy and effective solution for sudden traffic. However if you want to control exactly the minimum time interval between any two requests - for academic reasons, you could set up two semaphores:

    def __init__(self):
        # something else
        self.starter = asyncio.Semaphore(0)
        self.ender = asyncio.Semaphore(30)
    

    Then change get_url() to use them:

    async def get_url(self, url):
        await self.starter.acquire()
        try:
            async with self.session.get(url) as response:
                # your code
        finally:
            self.ender.release()
    

    Because starter was initialized with zero, so all get_url() coroutines will block on starter. We'll use a separate coroutine to control it:

    async def controller(self):
        last = 0
        while self.running:
            await self.ender.acquire()
            sleep = 0.5 - (self.loop.time() - last)  # at most 2 requests per second
            if sleep > 0:
                await asyncio.sleep(sleep)
            last = self.loop.time()
            self.starter.release()
    

    And your main program should look something like this:

    def run(self):
        for url in [...]:
            self.loop.create_task(self.get_url(url))
        self.loop.create_task(self.controller())
    

    So at first, the controller will release starter 30 times evenly in 15 seconds, because that is the initial value of ender. After that, the controller would release starter as soon as any get_url() ends, if 0.5 seconds have passed since the last release of starter, or it will wait up to that time.

    One issue here: if the URLs to fetch is not a constant list in memory (e.g. coming from network constantly with unpredictable delays between URLs), the RPS limiter will fail (starter released too early before there is actually a URL to fetch). You'll need further tweaks for this case, even though the chance of a traffic burst is already very low.

Sign up to request clarification or add additional context in comments.

1 Comment

Thank you for your answer. I'm not sure I follow how to correctly use two semaphores. Would you mind providing a more complete code example or point me to relevant resources? I would greatly appreciate it as it sounds like something I should learn.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.