50

How can I asynchronously insert tasks to run in an asyncio event loop running in another thread?

My motivation is to support interactive asynchronous workloads in the interpreter. I can't block the main REPL thread.

Example

My current flawed understanding says that the following should work. Why doesn't it? What is a better way to accomplish goal above?

import asyncio
from threading import Thread

loop = asyncio.new_event_loop()

def f(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

t = Thread(target=f, args=(loop,))
t.start()    

@asyncio.coroutine
def g():
    yield from asyncio.sleep(1)
    print('Hello, world!')

asyncio.async(g(), loop=loop)
3
  • 1
    You may try urwid as REPL -- it works with asyncio out of the box. Commented Aug 18, 2015 at 0:21
  • Or ipython -- it also can run async functions right in the REPL since version 7.0. Commented Feb 25, 2019 at 11:34
  • 2
    Is there any way to run a whole python script in another thread. I have similar needs as specified in this topic. Commented Jun 22, 2020 at 17:00

2 Answers 2

30

You must use call_soon_threadsafe to schedule callbacks from different threads:

import asyncio
from threading import Thread

loop = asyncio.new_event_loop()

def f(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

t = Thread(target=f, args=(loop,))
t.start()    

@asyncio.coroutine
def g():
    yield from asyncio.sleep(1)
    print('Hello, world!')

loop.call_soon_threadsafe(asyncio.async, g())

See https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading for more information.

EDIT: Example of an interpreter supporting asynchronous workloads

# vim: filetype=python3 tabstop=2 expandtab

import asyncio as aio
import random

@aio.coroutine
def async_eval(input_, sec):
  yield from aio.sleep(sec)
  print("")
  try:
    result = eval(input_)
  except Exception as e:
    print("< {!r} does not compute >".format(input_))
  else:  
    print("< {!r} = {} >".format(input_, result))

@aio.coroutine
def main(loop):
  while True:
    input_ = yield from loop.run_in_executor(None, input, "> ")

    if input_ == "quit":
      break
    elif input_ == "":
      continue
    else:
      sec = random.uniform(5, 10)
      print("< {!r} scheduled for execution in {:.02} sec>".format(input_, sec))
      aio.async(async_eval(input_, sec))

loop = aio.get_event_loop()

loop.run_until_complete(main(loop))
loop.close()
Sign up to request clarification or add additional context in comments.

9 Comments

Nice. How about if I wanted to get the returned result of g()? (if g actually returned something.)
You could pass a Future into g and set it's result from within g and then you could yield from that_future in another event loop in the other thread. You could also just create another coroutine in which you yield from g() and then call_soon_threadsafe on that new coroutine.
if you need to run a coroutine on the event loop in the other thread use: asyncio.run_coroutine_threadsafe(my_coro(param1), loop)
What is the valid syntax for 3.7- I'm hitting this too
@PhilBot Hopefully you aren't still waiting for an answer but I created a 3.7+ example in a separate answer.
|
15

The first example in Jashandeep Sohi's answer does not work for me in 3.7+ and prints warnings about the deprecated annotation. I reworked this into a something that runs under 3.8. I tweaked it a little to meet my needs as well. I am new to multi-threading in Python (but not multithreading in general) so any advice, guidance, etc. is appreciated:

import asyncio
from threading import Thread


loop = asyncio.new_event_loop()
running = True


def evaluate(future):
    global running
    stop = future.result()
    if stop:
        print("press enter to exit...")
        running = False


def side_thread(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


thread = Thread(target=side_thread, args=(loop,), daemon=True)
thread.start()


async def display(text):
    await asyncio.sleep(5)
    print("echo:", text)
    return text == "exit"


while running:
  text = input("enter text: ")
  future = asyncio.run_coroutine_threadsafe(display(text), loop)
  future.add_done_callback(evaluate)


print("exiting")

The echo and other output will conflict with the prompts but it should be good enough to demonstrate it is working.

One thing I am unsure about is setting the global running from one thread and reading it from another. I think maybe the GIL synchronizes the thread cache but I'd love to get confirmation (or not) about that.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.