1

I would like to implement and run custom Python coroutines (without using asyncio) to have a better "under the hood" understanding of asynchronous mechanisms.

I was expected to be able to use concurrency to start a second task when the first task is waiting, doing nothing.

Here the synchronous implementation of a stacker (which is an arbitrary use case).

def log(*msg):
    print(int(time() - start), ':', *msg)

def stack(stack, item):
    sleep(1)
    stack.append(item)

start = time()
words = []
stack(words, 'hello')
log(words)
stack(words, 'world')
log(words)

Here the output, as I was expected:

1 : ['hello']
2 : ['hello', 'world']

Then an attempt of the asynchronous implementation of the same stacker.

def coroutine(func):
    def starter(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return starter

@coroutine
def a_sleep(count):
    while True:
        yield
        sleep(count)

@coroutine
def a_stack(stack):
    while True:
        item = yield
        yield from a_sleep(1)
        stack.append(item)

start = time()
words = []
a_stack(words).send('hello')
log(words)
a_stack(words).send('world')
log(words)

# Wait all tasks to finish
sleep(4)
log(words)

Expected output:

0 : []
1 : ['hello', 'world']
5 : ['hello', 'world']

Real output:

1 : []
2 : []
6 : []

I figure I missed something important. I hope my approach is relevant.

With additional logs, I have noticed that the a_stack function never execute the append part.

10
  • sleep(1) does not yield to other coroutines. It stops the thread entirely. Commented Feb 10, 2018 at 18:38
  • You also have no event loop managing your tasks. The asyncio loop is the key component here, switching between tasks as futures are produced for tasks waiting. You are simply calling send() on two generators, in sequence. Commented Feb 10, 2018 at 18:39
  • I wonder how it was possible to do that before asyncio, without using asyncio.sleep, asyncio.get_event_loop, etc. Commented Feb 10, 2018 at 18:46
  • You'll have to create an event loop, at the very least, to keep advancing your generators continually to multiplex them. And avoid infinite generators if you expect to run more code following them, they'll not return. Commented Feb 10, 2018 at 18:49
  • @MartijnPieters The restriction against infinite generator applies only to leaf generators, such as a_sleep. There's nothing wrong with other infinite generators; they're used all over the place in asyncio. (For example, while True: cmd = await reader.readline(); response = execute(cmd); writer.write(response) is an oft-seen pattern with stream-based communication.) Commented Feb 10, 2018 at 19:06

2 Answers 2

2

Your problem is that your generators are paused at the yield expression in the a_sleep() function (via the yield from a_sleep(1)) delegation. The generator there is also infinite and so will never return. You can never advance your generators far enough to reach the stack.append(item) calls.

I think you misunderstood what yield from does here. yield from moves control of the generator to another generator; that other generator has to complete iteration before the yield from expression completes and returns:

>>> @coroutine
... def a_sleep(count):
...     while True:
...         yield 'sleeping'  # to illustrate where we are stuck
...         sleep(count)
...
>>> words = []
>>> g = a_stack(words)
>>> g.send('hello')
'sleeping'
>>> g.send('hello')
'sleeping'
>>> g.send('hello')
'sleeping'

Instead of using sleep() and an infinite loop, record the time, and loop until the time has passed:

>>> @coroutine
... def a_sleep(count):
...     start = time()
...     while int(time() - start) < count:
...         yield 'sleeping'
...
>>> g = a_stack(words)
>>> g.send('hello')
'sleeping'
>>> g.send('hello')
'sleeping'
>>> g.send('hello')
>>> words
['hello']

You'll have to keep iterating over your generators (in a loop, perhaps?) to have them alternate execution.

The asyncio.sleep() function is of course much more efficient than that; it uses a Future() object that attaches to the AbstactEventLoop.call_later() functionality offered by the event loop. The loop lets the future object know when the time is up, at which point the future is marked 'ready' and the coroutine that produced it is continued again.

Sign up to request clarification or add additional context in comments.

3 Comments

If I understood correctly, the while True in my a_sleep made my generator infinite. If I remove it, my function become strictly equivalent to a normal sleep. How can I implement a sleep which will not blocking my thread? Using threading?
@srjjio: I added an example already, just loop until enough time has passed?
Now I finally see what I was missing and why the loop is essential! Thank you @Martijn
1

The line:

a_stack(words).send('hello')

does two things:

  1. create and run a new generator
  2. send the string hello into the generator

The created generator waits for item to arrive, and then, once resumed, does something with the item. And that is the problem, you never resume the generator, you throw it away and create a new one, and proceed to use in the same manner. To fix it, your sending code should do something like:

coro = a_stack(words)
coro.send('hello')
log(words)
coro.send('world')
log(words)

But there is another problem. Before actually appending to the stack, a_stack defers its execution to another iterator, which never stops yielding. One way to code a_sleep that fixes the problem is:

@coroutine
def a_sleep(count):
    t0 = time()
    while time() - t0 < count:
        yield 'notyet'

Then you need either a scheduler or at least a more resilient version of send, which can actually deal with a task deferring its execution. A simple (and very inefficient) one could looks like this:

def sync_send(c, v):
    while True:
        ret = c.send(v)
        if ret != 'notyet':
            return ret

After replacing coro.send('hello') with sync_send(coro, 'hello'), the expected output is displayed.

A real scheduler would never busy-loop; it would be instructed by sleep and by other potentially blocking calls, such as reads from files or sockets, which IO/timing events it must wait for. After the appropriate event arrived, it would wake up the correct task. This is the core of what asyncio does.

To learn more about how generators and yield from are used as a core abstraction for asynchronous programming, I recommend the excellent lecture Python Concurrency From the Ground by Dave Beazley. In the lecture Dave implements a coroutine scheduler in front of live audience, showing the design of his curio library.

2 Comments

Thank you for your answer, which really helps me too, and also deserves to be marked as accepted. I am going to take a look on your link.
@srjjio I didn't see that Martijn already answered while I was writing mine. It's remarkable how similar our solutions ended up being.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.