
I'm currently migrating some blocking Python code to asyncio with async/await. It is a lot of code to migrate at once, so I would prefer to do it gradually and have metrics. With that in mind, I want to create a decorator that wraps some functions and reports how long they block the event loop. For example:

def measure_blocking_code(f):
    def wrapper(*args, **kwargs):
        # ?????
        # It should measure JUST 1 second
        # not 5 which is what the whole async function takes
    return wrapper

@measure_blocking_code
async def my_function():
    my_blocking_function()  # Takes 1 second
    await my_async_function()  # Takes 2 seconds
    await my_async_function_2()  # Takes 2 seconds

I know the event loop has a debug function that already reports this, but I need to get that information for specific functions.
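For context, here is a minimal sketch of that built-in facility, assuming it refers to asyncio's debug mode together with `loop.slow_callback_duration`. The 0.05 second threshold and the `ListHandler` helper are illustrative choices, not part of the question. The warning names the task that was slow, but it does not aggregate per decorated function, which is the gap described above.

```python
import asyncio
import logging
import time

records = []


class ListHandler(logging.Handler):
    """Hypothetical helper: collects asyncio's log messages in a list."""

    def emit(self, record):
        records.append(record.getMessage())


logging.getLogger("asyncio").addHandler(ListHandler())


async def main():
    # In debug mode the loop warns about steps slower than this threshold
    # (the default is 0.1 seconds).
    asyncio.get_running_loop().slow_callback_duration = 0.05
    time.sleep(0.1)  # blocks the event loop


asyncio.run(main(), debug=True)
print(records)
```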

  • If you decorate my_function, and my_function ACTUALLY takes 5 seconds to execute, how do you expect Python to know that you really want to measure the duration of one of its internal steps? Shouldn't you be decorating my_blocking_function instead? Commented Jul 19, 2022 at 4:16
  • @PaulCornelius Python yields control when you call await. 4 of those 5 seconds are spent in the event loop, not in my code. The reason I need to decorate my_function is that I don't know exactly which calls are blocking. In this example it is really easy to see, but in a big codebase it is harder. I also need the aggregate, not each function individually. Commented Jul 19, 2022 at 15:04

3 Answers


TL;DR

This decorator does the job:

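import asyncio
import time

def measure_blocking_code(f):
    async def wrapper(*args, **kwargs):
        t = 0
        coro = f(*args, **kwargs)  # forward the caller's arguments
        try:
            while True:
                # Time only the synchronous step up to the next suspension point.
                t0 = time.perf_counter()
                try:
                    future = coro.send(None)
                finally:
                    # Also counts the final step, which ends in StopIteration.
                    t += time.perf_counter() - t0
                if future is None:
                    # A bare yield (e.g. asyncio.sleep(0)) gives None, not a Future.
                    await asyncio.sleep(0)
                else:
                    # Poll once per event-loop iteration until the future completes.
                    while not future.done():
                        await asyncio.sleep(0)
                # An exception stored in the future is raised inside coro by the next send().
        except StopIteration as e:
            print(f'Function took {t:.2e} sec')
            return e.value
    return wrapper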

Explanation

This workaround exploits the conventions used by the asyncio implementation in CPython. These conventions are a superset of PEP 492. In other words:

  1. You can generally use async/await without knowing these details.
  2. This might not work with other async libraries like trio.

An asyncio coroutine object (coro) can be driven by calling its .send() method. Each call runs only the blocking code, up to the point where an awaited call yields a Future object. By measuring only the time spent in .send(), the duration of the blocking code can be determined.


I finally found a way. I hope it helps somebody.


import asyncio
import time


def measure(f):
    async def wrapper(*args, **kwargs):
        coro_wrapper = f(*args, **kwargs).__await__()
        fut = asyncio.Future()

        total_time = 0
        def done(_fut=None):
            # Called once up front, then each time an awaited future completes.
            nonlocal total_time
            try:
                start_time = time.perf_counter()
                try:
                    # Run only the synchronous step up to the next suspension point.
                    next_fut = coro_wrapper.send(None)
                finally:
                    # Also counts the final step, which ends in StopIteration.
                    total_time += time.perf_counter() - start_time
                next_fut.add_done_callback(done)
            except StopIteration as e:
                fut.set_result(e.value)  # the wrapped function's return value
            except Exception as e:
                fut.set_exception(e)

        done()
        res = await fut
        print('Blocked for: ' + str(total_time) + ' seconds')
        return res

    return wrapper


Based on the above code from The SWE, I made some improvements as follows:

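def measure1(f):
    async def wrapper(*args, **kwargs):
        coroutine = f(*args, **kwargs).__await__()
        fut = asyncio.Future()
        # Taken once, so the printed value is the total elapsed time,
        # including the time spent suspended in awaits.
        s = time.perf_counter()

        def done(_fut=None):
            try:
                next_ = coroutine.send(None)
                next_.add_done_callback(done)
            except StopIteration as e:
                print(round(time.perf_counter() - s, 2))
                fut.set_result(e.value)  # the wrapped function's return value
            except Exception as e:
                # Propagate errors to the caller instead of leaving fut pending.
                fut.set_exception(e)
        done()

        return await fut
    return wrapper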
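One way to check a callback-based decorator like these is to run it against a scaled-down version of the function from the question and compare the blocking time with the total elapsed time. The sketch below is self-contained and makes a few assumptions of its own: the decorator stores its measurement in a hypothetical `blocked` attribute instead of printing it, and a bare yield (such as `asyncio.sleep(0)`) is rescheduled with `call_soon`.

```python
import asyncio
import functools
import time


def measure(f):
    @functools.wraps(f)
    async def wrapper(*args, **kwargs):
        gen = f(*args, **kwargs).__await__()
        fut = asyncio.get_running_loop().create_future()

        def step(_=None):
            start = time.perf_counter()
            try:
                pending = gen.send(None)  # run up to the next suspension point
            except StopIteration as stop:
                fut.set_result(stop.value)
            except Exception as exc:
                fut.set_exception(exc)
            else:
                if pending is None:  # bare yield, e.g. asyncio.sleep(0)
                    asyncio.get_running_loop().call_soon(step)
                else:
                    pending.add_done_callback(step)
            finally:
                # Accumulate the time spent in this synchronous step.
                wrapper.blocked += time.perf_counter() - start

        step()
        return await fut

    wrapper.blocked = 0.0  # hypothetical attribute holding the total
    return wrapper


@measure
async def my_function():
    time.sleep(0.1)           # blocking
    await asyncio.sleep(0.2)  # not blocking
    await asyncio.sleep(0.2)  # not blocking
    return "ok"


t0 = time.perf_counter()
result = asyncio.run(my_function())
total = time.perf_counter() - t0
print(f"blocked {my_function.blocked:.2f}s out of {total:.2f}s total")
```

The blocked figure should stay close to the duration of the `time.sleep` call, while the total also includes both awaits.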
