
Let's say I have a C++ function result_type compute(input_type input), which I have made available to Python using Cython. My Python code executes multiple computations like this:


def compute_total_result():
  inputs = ...
  total_result = ...

  for input in inputs:
    result = compute_python_wrapper(input)
    update_total_result(result)

  return total_result

Since the computation takes a long time, I have implemented a C++ thread pool (like this) and written a function std::future<result_type> compute_threaded(input_type input), which returns a future that becomes ready as soon as the thread pool is done executing.

What I would like to do is to use this C++ function in Python as well. A simple way to do this would be to wrap the std::future<result_type>, including its get() function, and wait for all results like this:


def compute_total_results_parallel():
  inputs = ...
  total_result = ...
  futures = []
  for input in inputs:
    futures.append(compute_threaded_python_wrapper(input))

  for future in futures:
    update_total_result(future.get())
  
  return total_result

I suppose this works well enough in this case, but it becomes complicated very quickly, because I have to pass the futures around.

However, I think that conceptually, waiting for these C++ results is no different from waiting for file or network I/O.

To facilitate I/O operations, the Python devs introduced the async / await keywords. If my compute_threaded_python_wrapper were part of asyncio, I could simply rewrite it as


async def compute_total_results_async():
  inputs = ...
  total_result = ...
  for input in inputs:
    result = await compute_threaded_python_wrapper(input)
    update_total_result(result)

  return total_result

And I could execute the whole code via result = asyncio.run(compute_total_results_async()).

There are a lot of tutorials on async programming in Python, but most of them deal with coroutines whose bedrock seems to be some call into the asyncio package, mostly asyncio.sleep(delay) as a proxy for I/O.

My question is: (how) can I implement coroutines in Python, enabling Python to await the wrapped future object? (There is some mention of an __await__ method returning an iterator.)

1 Answer

First, an inaccuracy in the question needs to be corrected:

If my compute_threaded_python_wrapper would be part of asyncio, I could simply rewrite it as [...]

The rewrite is incorrect: await means "wait until the computation finishes", so the loop as written would execute the code sequentially. A rewrite that actually runs the tasks in parallel would be something like:

# a direct translation of the "parallel" version
async def compute_total_results_async():
    inputs = ...
    total_result = ...
    tasks = []
    # first spawn all the tasks
    for input in inputs:
        tasks.append(
            asyncio.create_task(compute_threaded_python_wrapper(input))
        )
    # and then await them
    for task in tasks:
        update_total_result(await task)
    return total_result

This spawn-all-then-await-all pattern is so ubiquitous that asyncio provides a helper function, asyncio.gather(), which makes it much shorter, especially when combined with a list comprehension:

# a more idiomatic version
async def compute_total_results_async():
    inputs = ...
    total_result = ...
    results = await asyncio.gather(
        *[compute_threaded_python_wrapper(input) for input in inputs]
    )
    for result in results:
        update_total_result(result)
    return total_result

With that out of the way, we can proceed to the main question:

My question is: (how) can I implement coroutines in Python, enabling Python to await the wrapped future object? (There is some mention of an __await__ method returning an iterator.)

Yes, awaitable objects are implemented using iterators that yield to indicate suspension. But that is far too low-level a tool for what you actually need. You don't need just any awaitable, but one that works with the asyncio event loop, which has specific expectations of the underlying iterator. You also need a mechanism to resume the awaitable when the result is ready, and for that you again depend on asyncio.
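For completeness, this is what the low-level mechanism looks like. A minimal sketch (names are illustrative): __await__ must return an iterator, and a bare yield hands control back to the event loop once before the value is produced. asyncio's Task machinery happens to tolerate a plain yield of None, but as the answer notes, real code should not rely on this level.

```python
import asyncio

class Awaitable42:
    """A minimal awaitable: __await__ returns an iterator."""
    def __await__(self):
        # yielding suspends this awaitable; the return value
        # becomes the result of the `await` expression
        yield
        return 42

async def main():
    return await Awaitable42()

print(asyncio.run(main()))  # prints 42
```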

Asyncio already provides awaitable objects that can be externally assigned a value: futures. An asyncio future represents an async value that will become available at some point in the future. They are related to, but not semantically equivalent to, C++ futures, and should not be confused with the thread-based futures from the concurrent.futures stdlib module.

To create an awaitable object that is activated by something that happens in another thread, you need to create a future, and then start your off-thread task, instructing it to mark the future as completed when it finishes execution. Since asyncio futures are not thread-safe, this must be done using the call_soon_threadsafe event loop method provided by asyncio for such situations. In Python it would be done like this:

def run_async():
    loop = asyncio.get_event_loop()
    future = loop.create_future()
    def on_done(result):
        # when done, notify the future in a thread-safe manner
        loop.call_soon_threadsafe(future.set_result, result)
    # start the worker in a thread owned by the pool
    pool.submit(_worker, on_done)
    # returning a future makes run_async() awaitable, and
    # passable to asyncio.gather() etc.
    return future

def _worker(on_done):
    # this runs in a different thread
    ... processing goes here ...
    result = ...
    on_done(result)

In your case, the worker would presumably be implemented in Cython combined with C++.
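To make the whole pattern concrete, here is a runnable sketch that substitutes concurrent.futures.ThreadPoolExecutor for the C++ thread pool and a plain Python function for the Cython-wrapped computation; the names pool, compute, and run_async are assumptions for illustration, not part of any real API:

```python
import asyncio
import concurrent.futures
import time

# stand-in for the C++ thread pool; in the real code the work
# would happen inside the Cython-wrapped compute()
pool = concurrent.futures.ThreadPoolExecutor()

def compute(x):
    time.sleep(0.01)  # simulate the expensive C++ computation
    return x * x

def run_async(x):
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    def worker():
        result = compute(x)  # runs in a pool thread
        # notify the asyncio future in a thread-safe manner
        loop.call_soon_threadsafe(future.set_result, result)
    pool.submit(worker)
    return future

async def main():
    # futures returned by run_async() work with asyncio.gather()
    return await asyncio.gather(*[run_async(x) for x in range(4)])

print(asyncio.run(main()))  # prints [0, 1, 4, 9]
```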


4 Comments

I appreciate your answer in this matter, of course I should not await single results. I tried to rig up a MWE according to your suggestions. The problem is that, as far as I understand you, the C++ task assigned to the thread pool needs to call back into python in order to set the future, correct? I see Fatal Python error: PyThreadState_Get: no current thread when I try...
@hfhc2 Correct. You also need to acquire the GIL using PyGILState_Ensure before doing anything with Python from a non-Python thread (and release it with PyGILState_Release afterwards), see the documentation for details. If you use a C++ wrapper for the Python/C API, it probably offers a RAII-style guard for that as well.
@hfhc2 of course I should not await single results - Apologies if I pointed out what you already know. Many asyncio beginners, especially those without prior exposure to async/await from other languages, expect that await will automatically parallelize their code, and are unpleasantly surprised to learn that it does almost exactly the opposite.
Thank you for your help, I added a working example here: github.com/hfhc2/native-future-example
