I am working with a non asyncio library that does not do i/o, only simple calculations. One of its features is a series of callbacks on a set of user provided observers in the order they happen, while the user controls a higher level api.
class Calculator:
def __init__(self):
self._internal = 0
self._observers = [
def register(observer):
self._observers.append(observer)
def add(int val):
for i in range(val):
self._internal += 1
def _call_observers(self):
for o in self._observers:
o.on_add()
@property
def value(self):
return self._internal
I need to use this from a couroutine to call other coroutines. One idea I had was to subclass an AsyncCalculator and override relevant methods to be coroutines, storing a reference to the event loop.
(not real code)
class AsyncCalculator(Calculator):
def __init__(self, loop):
...
async def _call_observers(self):
...
I was wondering if there is a way to wait on a coroutine from a normal function if there is a reference to the event loop or any other structure available? It seems that if the function is being called from a coroutine, then there might be some hacky way to await another coroutine from the function. Then maybe that hack could be thrown in a utility?