0

Could you tell me if this is a correct approach to build several independent async loops inside own threads?

def init():
    print("Initializing Async...")
    global loop_heavy
    loop_heavy = asyncio.new_event_loop()
    start_loop(loop_heavy)

def start_loop(loop):
    thread = threading.Thread(target=loop.run_forever)
    thread.start()

def submit_heavy(task):
    future = asyncio.run_coroutine_threadsafe(task, loop_heavy)
    try:
        future.result()
    except Exception as e:
        print(e)

def stop():
    loop_heavy.call_soon_threadsafe(loop_heavy.stop)

async def heavy():
    print("3. heavy start %s" % threading.current_thread().name)
    await asyncio.sleep(3) # or await asyncio.sleep(3, loop=loop_heavy)
    print("4. heavy done")

Then I am testing it with:

if __name__ == "__main__":
    init()
    print("1. submit heavy: %s" % threading.current_thread().name)
    submit_heavy(heavy())
    print("2. submit is done")
    stop()

I am expecting to see 1->3->2->4 but in fact it is 1->3->4->2:

Initializing Async...
1. submit heavy: MainThread
3. heavy start Thread-1
4. heavy done
2. submit is done

I think that I miss something in understanding async and threads.
Threads are different. Why am I waiting inside MainThread until the job inside Thread-1 is finished?

2 Answers 2

3

Why am I waiting inside MainThread until the job inside Thread-1 is finished?

Good question, why are you?

One possible answer is, because you actually want to block the current thread until the job is finished. This is one of the reasons to put the event loop in another thread and use run_coroutine_threadsafe.

The other possible answer is that you don't have to if you don't want. You can simply return from submit_heavy() the concurrent.futures.Future object returned by run_coroutine_threadsafe, and leave it to the caller to wait for the result (or check if one is ready) at their own leisure.

Finally, if your goal is just to run a regular function "in the background" (without blocking the current thread), perhaps you don't need asyncio at all. Take a look at the concurrent.futures module, whose ThreadPoolExecutor allows you to easily submit a function to a thread pool and leave it to execute unassisted.

Sign up to request clarification or add additional context in comments.

2 Comments

I don't want to block MainThread. If I return Future from run_coroutine_threadsafe back into the MainThread - the MainThread will be blocked by waiting until the Future is finished, isn't it?
@IgorZ No, it won't, that's the whole point of Future. (And be careful not to confuse asyncio Futures with concurrent.future Futures, although in this particular respect they're the same.) When you have a future object on your hands, you get to choose if and when to block waiting for its result.
-1

I will add one of the possible solutions that I found from the asyncio documentation.
I'm not sure that it is the correct way, but it works as expected (MainThread is not blocked by the execution of the child thread)

Running Blocking Code
Blocking (CPU-bound) code should not be called directly. For example, if a function performs a CPU-intensive calculation for 1 second, all concurrent asyncio Tasks and IO operations would be delayed by 1 second.
An executor can be used to run a task in a different thread or even in a different process to avoid blocking block the OS thread with the event loop. See the loop.run_in_executor() method for more details.

Applying to my code:

import asyncio
import threading
import concurrent.futures
import multiprocessing
import time

def init():
    print("Initializing Async...")

    global loop, thread_executor_pool

    thread_executor_pool = concurrent.futures.ThreadPoolExecutor(max_workers=multiprocessing.cpu_count())
    loop = asyncio.get_event_loop()

    thread = threading.Thread(target=loop.run_forever)
    thread.start()

def submit_task(task, *args):
    loop.run_in_executor(thread_executor_pool, task, *args)

def stop():
    loop.call_soon_threadsafe(loop.stop)
    thread_executor_pool.shutdown()

def blocked_task(msg1, msg2):
    print("3. task start msg: %s, %s, thread: %s" % (msg1, msg2, threading.current_thread().name))
    time.sleep(3)
    print("4. task is done -->")

if __name__ == "__main__":
    init()
    print("1. --> submit task: %s" % threading.current_thread().name)
    submit_task(blocked_task, "a", "b")

    print("2. --> submit is done")
    stop()

Output:

Initializing Async...

1. --> submit task: MainThread
3. task start msg: a, b, thread: ThreadPoolExecutor-0_0
2. --> submit is done
4. task is done  -->

Correct me if there are still any mistakes or it can be done in the other way.

3 Comments

It's not correct because run_in_executor returns a future they you're supposed to await, which your code never does. If you just want to run a regular function "on the background" (without blocking the current thread), perhaps you don't need asyncio at all. Take a look at concurrent.futures, which allows you to submit a function to a thread pool and leave it to execute unassisted.
Yes, I think you are right. It is more like executing task in a background without using async at all. Thank you for this notice.
I've now updated my answer to cover the option of not using asyncio at all.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.