4

I'm getting very familiar with python's asyncio, the asynchronous programming in python, co-routines etc. I want to be able to executing several co-routines with my own custom made eventloop.

I'm curious if i can write my own eventloop without importing asyncio at all

2 Answers 2

3

I want to be able to executing several co-routines with my own custom made eventloop.

The asyncio event loop is well-tested and can be easily extended to acknowledge non-asyncio events. If you describe the actual use case, it might be easier to help. But if your goal is to learn about async programming and coroutines, read on.

I'm curious if i can write my own eventloop without importing asyncio at all

It's definitely possible - asyncio itself is just a library, after all - but it will take some work for your event loop to be useful. See this excellent talk by David Beazley where he demonstrates writing an event loop in front of a live audience. (Don't be put off by David using the older yield from syntax - await works exactly the same way.)

Sign up to request clarification or add additional context in comments.

Comments

1

Ok, so i found an example somewhere (sorry, don't remember where, no link), and changed a little bit.

An eventloop and co-routins without even importing asyncio:

import datetime
import heapq
import types
import time

class Task:
    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until

class SleepingLoop:
    def __init__(self, *coros):
        self._new = coros
        self._waiting = []

    def run_until_complete(self):
        # Start all the coroutines.
        for coro in self._new:
            wait_for = coro.send(None)
            heapq.heappush(self._waiting, Task(wait_for, coro))

        # Keep running until there is no more work to do.
        while self._waiting:
            now = datetime.datetime.now()
            # Get the coroutine with the soonest resumption time.
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                # We're ahead of schedule; wait until it's time to resume.
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            try:
                # It's time to resume the coroutine.
                wait_until = task.coro.send(now)
                heapq.heappush(self._waiting, Task(wait_until, task.coro))
            except StopIteration:
                # The coroutine is done.
                pass


@types.coroutine
def async_sleep(seconds):
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    actual = yield wait_until

    return actual - now


async def countdown(label, total_seconds_wait, *, delay=0):
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = await async_sleep(delay)
    print(label, 'starting after waiting', delta)
    while total_seconds_wait:
        print(label, 'T-minus', total_seconds_wait)
        waited = await async_sleep(1)
        total_seconds_wait -= 1
    print(label, 'lift-off!')


def main():
    loop = SleepingLoop(countdown('A', 5, delay=0),
                        countdown('B', 3, delay=2),
                        countdown('C', 4, delay=1))
    start = datetime.datetime.now()
    loop.run_until_complete()

    print('Total elapsed time is', datetime.datetime.now() - start)



if __name__ == '__main__':
    main()

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.