I have a situation where a "server" thread should listen for calls/events from other server threads while at the same time executing other code. Lately I have worked a lot with Node.js, so I thought it would be nice to use async/await to create an event loop in which I can wait for other threads to join and handle their responses when they finally do.

To test the idea I wrote the following test script in Python 3.5:

# http://stackabuse.com/python-async-await-tutorial/
# Testing out Python's asynchronous features
import asyncio
from time import sleep
import threading
from threading import Thread
import random

class MyThread(Thread):

    def __init__(self, message):
        Thread.__init__(self)
        self._message = message

    def run(self):
        self._return = self._message + " oli viesti"
        a = random.randint(1, 5)
        print("Sleep for ", a)
        sleep(a)
        print("Thread exiting...")


    def join(self):
        Thread.join(self)
        return self._return



async def send(message):
    t = MyThread(message)  # daemon = True
    t.start()
    print("asd")
    return t.join()

async def sendmsg(msg):
    response = await send(msg)
    print("response is ", response)


if __name__ == "__main__":
    # Initiate a new thread and pass in keyword argument dictionary as parameters
    loop = asyncio.get_event_loop()
    tasks = [
        asyncio.ensure_future(sendmsg("hippa1"), loop=loop),
        asyncio.ensure_future(sendmsg("hippa2"), loop=loop),
        asyncio.ensure_future(sendmsg("hippa3"), loop=loop)
    ]

    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

In the example I want to kick off three worker threads with different strings and wait for them to finish. The workers sleep for a random amount of time, so I expect them to finish in a random order when the script is run multiple times. It turns out that they seem to execute sequentially instead, with the second thread starting only after the first has finished.

What is my error here? Shouldn't sleep block only the thread it is in? Is my event loop set up correctly? Can I async/await joins?

Ultimately I want to send messages to other threads, wait for their responses, and then run a callback function with the returned value.

EDIT: To clarify, ultimately I want to wait for condition variables with async/await in my main thread and run other code until one of the condition variables lets execution through. In this example code I was trying to do the same with the worker thread's join.

  • time.sleep isn't asynchronous, try it with await asyncio.sleep Commented May 14, 2016 at 7:46
  • But it is triggered in a separate thread, so shouldn't it block just that thread instead of the main thread where the event loop is? My understanding is that sleep blocks the thread, not the process. So why does join block my main thread even with the async/await structure? Commented May 14, 2016 at 11:42

2 Answers

Ultimately, it is running sequentially because of this code:

async def send(message):
    t = MyThread(message)  # daemon = True
    t.start()
    print("asd")
    return t.join()

You start a thread, then immediately wait on that thread to finish before you continue. This is why they're executed sequentially.
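If the separate threads really are needed, one possible way to keep the join from blocking the event loop is to hand the blocking call to the loop's default executor. The following is only a minimal sketch, assuming Python 3.7 or later; the ResultThread class, the fixed delays and the messages are made up for the illustration and are not the asker's actual code.

```python
import asyncio
import threading
import time


class ResultThread(threading.Thread):
    """Hypothetical helper: a thread whose join() returns a result."""

    def __init__(self, message, delay):
        super().__init__()
        self._message = message
        self._delay = delay
        self._result = None

    def run(self):
        time.sleep(self._delay)  # blocks only this worker thread
        self._result = self._message + " done"

    def join(self, timeout=None):
        super().join(timeout)
        return self._result


async def send(message, delay):
    t = ResultThread(message, delay)
    t.start()
    loop = asyncio.get_running_loop()
    # The blocking join runs in the default thread pool, so the event
    # loop stays free to run the other coroutines in the meantime.
    return await loop.run_in_executor(None, t.join)


async def main():
    start = time.monotonic()
    results = await asyncio.gather(
        send("a", 0.4), send("b", 0.3), send("c", 0.2)
    )
    return results, time.monotonic() - start


results, elapsed = asyncio.run(main())
print(results, round(elapsed, 2))
```

With this arrangement the three workers overlap, so the total time should be close to the longest delay rather than the sum of all three. The cost is one extra pool thread per pending join.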

Node.js and asyncio do not necessarily create new threads to execute their operations. Node.js, for example, runs your JavaScript on a single thread, but it uses kernel-level facilities (for example 'epoll') to invoke the callbacks you registered when new network activity occurs. This allows a single thread to manage hundreds of network connections.

That is probably why, if you executed this without the Thread instance, you would call sleep on the currently running thread, which is the main thread and therefore the one running the event loop. When you use asyncio with networking functions, you can use "yield from" constructs (or "await" inside an async def), which allow other code blocks to execute while a task is waiting on a remote service.

The main structure is correct. You want this block of code:

loop.run_until_complete(asyncio.wait(tasks))

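But don't rely on time.sleep to test this. Make a networking call instead, or use the following inside an async def coroutine (in a generator-based coroutine you would write yield from instead of await):

await asyncio.sleep(1)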

And there's no need to start separate threads in that case.
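As a rough illustration of the thread-free variant, here is a small sketch that replaces the worker threads with coroutines that await asyncio.sleep. The fixed delays and the finished list are assumptions added for the example, so that the completion order is visible; it assumes Python 3.7 or later for asyncio.run.

```python
import asyncio

finished = []  # records the order in which the coroutines complete


async def sendmsg(msg, delay):
    # asyncio.sleep suspends only this coroutine; the loop runs the others.
    await asyncio.sleep(delay)
    finished.append(msg)
    return msg + " handled"


async def main():
    # gather returns results in argument order, not completion order.
    return await asyncio.gather(
        sendmsg("hippa1", 0.3),
        sendmsg("hippa2", 0.1),
        sendmsg("hippa3", 0.2),
    )


results = asyncio.run(main())
print(finished)  # completion order follows the delays
print(results)   # result order follows the argument order
```

Here the coroutine with the shortest delay finishes first, which is the interleaving the question expected to see from the threads.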

4 Comments

Good point about the thread starting. However, in my case I truly need separate threads, but I want to wait for conditions from them asynchronously if possible. So I'd like a function that checks the join in the event loop but still executes other arbitrary code until the join unblocks. My case isn't completely analogous to Node.js: I want to wait for other threads to finish or set a condition variable without blocking my main thread, by using an async event loop.
In short I can't figure out how to write something like: yield t.join()
In that case I suggest you don't use asyncio at all, but work with a queue in between. You can wait on the queue with join() (workers mark items as finished with task_done()) and use get() to pick up work items. You can then use another queue to communicate the results back to the main thread. There's a code example that shows it on the doc page: docs.python.org/3/library/queue.html
Good to know. I actually ended up implementing it that way when I couldn't get asyncio to work. The architecture turned out clean enough with that approach.
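The queue-based approach suggested in the comments could look roughly like the sketch below. It uses only the standard queue and threading modules; the worker function, the None sentinel and the upper-casing "work" are placeholders invented for the example, not anything from the asker's implementation.

```python
import queue
import threading


def worker(tasks, results):
    while True:
        item = tasks.get()
        if item is None:  # sentinel: no more work for this worker
            tasks.task_done()
            break
        results.put((item, item.upper()))  # stand-in for real work
        tasks.task_done()


tasks = queue.Queue()    # main thread -> workers
results = queue.Queue()  # workers -> main thread

threads = [
    threading.Thread(target=worker, args=(tasks, results)) for _ in range(3)
]
for t in threads:
    t.start()

for message in ["hippa1", "hippa2", "hippa3"]:
    tasks.put(message)
for _ in threads:
    tasks.put(None)  # one sentinel per worker

tasks.join()  # blocks until every item has been marked with task_done()
for t in threads:
    t.join()

responses = {}
while not results.empty():
    message, reply = results.get()
    responses[message] = reply
print(responses)
```

Note that tasks.join() still blocks the calling thread. A main thread that must keep doing other work could instead poll the results queue with get_nowait() between its own steps.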

You already know the reason: Thread.join() blocks the event loop. So I will go straight to the solution.

  1. If you really need an asynchronous Thread.join(), you're out of luck, because in that case you need to start a separate thread just to wait:

    await asyncio.to_thread(t.join)
    

    or

    await asyncio.get_running_loop().run_in_executor(executor, t.join)
    

    This is due to the way threads are implemented in the threading module. And of course it has disadvantages: you can't cancel a thread once it has started. You will have to forget about asyncio timeouts, because every repeated attempt to do Thread.join() will start another waiting thread. You will actually get a thread leak!

    An alternative, which can also be combined with the above, is to poll whether the thread is still alive. This way you will be wasting CPU resources. If you start an extra thread to wait, the return will be fast. If you don't, the return can take as long as the timeout you set.

  2. If you have to wait for an event from another thread, you're in luck. This question has already been asked on StackOverflow. Just use any solution from there and call the set() method in your thread.

  3. If you want to wait for something to be processed by multiple threads, you need a proper notification mechanism. I suggest not reinventing the wheel, as there is already aiologic.CountdownEvent (I'm the creator of aiologic).

    import time
    import asyncio
    
    from threading import Thread
    
    from aiologic import CountdownEvent
    
    
    def work(event):
        try:
            time.sleep(1)
        finally:
            event.down()  # -1 thread to wait
    
    
    async def main():
        event = CountdownEvent()
    
        for _ in range(4):
            event.up()  # +1 thread to wait
    
            Thread(target=work, args=[event]).start()
    
        print("before")
    
        await event  # wait for all threads
    
        print("after")
    
    
    asyncio.run(main())
    

    This is a universal synchronization primitive with which you can wait for what you want. The up() and down() methods respectively increase and decrease the countdown event value. When it's zero, all waiting tasks are woken up.

  4. If you are trying to reimplement what existing synchronization primitives already do, don't. Use the synchronization primitives from aiologic, such as aiologic.Lock, aiologic.Semaphore, or aiologic.Condition. They just work.
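For the second case above (waiting for an event from another thread), a standard-library-only sketch is shown below. It is not necessarily one of the solutions referred to in that item, and it does not use aiologic. It relies on loop.call_soon_threadsafe, since asyncio.Event itself is not thread-safe. The worker, ticker and box names are made up for the example, and Python 3.7 or later is assumed.

```python
import asyncio
import threading
import time


def worker(loop, event, box):
    time.sleep(0.2)            # stand-in for real work in the thread
    box["value"] = "response"  # hypothetical slot for the thread's result
    # asyncio.Event is not thread-safe, so ask the loop to call set().
    loop.call_soon_threadsafe(event.set)


async def ticker(log, event):
    # Other code that keeps running while the thread is busy.
    while not event.is_set():
        log.append("tick")
        await asyncio.sleep(0.05)


async def main():
    loop = asyncio.get_running_loop()
    event = asyncio.Event()
    box, log = {}, []
    threading.Thread(target=worker, args=(loop, event, box)).start()
    tick_task = asyncio.create_task(ticker(log, event))
    await event.wait()  # suspends this coroutine without blocking the loop
    await tick_task
    return box["value"], len(log)


value, ticks = asyncio.run(main())
print(value, ticks)
```

The ticker keeps running while the thread sleeps, which is the behaviour the question was after: the main thread waits for a signal from a worker and still executes other code in the meantime.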
