I have written a piece of Python code that essentially works as a microservice in the larger scheme of things.
There are two tasks that I have scheduled on the event loop and two tasks that I have set up to run in an executor.
The strange part is that the code runs well and does everything I expect. But when I end it with KeyboardInterrupt (Ctrl+C), I see an error and an exception, which makes me think I am misusing the asyncio patterns here. Here is a brief overview of the code, without going into the lengthy details right away:
import asyncio
import time


class Prototype:
    def _redis_subscriber(self):
        self._p = self._redis_itx.pubsub(ignore_subscribe_messages=True)
        self._p.subscribe("channel1")
        while True:
            pubbed_msg = self._p.get_message()
            if pubbed_msg is not None:
                pass  # process process process
            time.sleep(0.01)

    def _generic_worker_on_internal_q(self):
        while True:
            item = self.q.get()  # blocking call
            # process item

    async def task1(self):
        pass  # network I/O bound code

    async def task2(self):
        pass  # network I/O bound code. also fills with self.q.put()

    def run(self):
        asyncio.ensure_future(self.task1(), loop=self._event_loop)
        asyncio.ensure_future(self.task2(), loop=self._event_loop)
        asyncio.ensure_future(self._event_loop.run_in_executor(None, self._redis_subscriber))
        asyncio.ensure_future(self._event_loop.run_in_executor(None, self._generic_worker_on_internal_q))
        self._event_loop.run_forever()


if __name__ == '__main__':
    p = Prototype()
    p.run()
I also tried experimenting with another approach in the Prototype.run() method:
    def __init__(self):
        self._tasks = []

    def run(self):
        self._tasks.append(asyncio.ensure_future(self.task1()))
        self._tasks.append(asyncio.ensure_future(self.task2()))
        self._tasks.append(asyncio.ensure_future(self._event_loop.run_in_executor(None, self._redis_subscriber)))
        self._tasks.append(asyncio.ensure_future(self._event_loop.run_in_executor(None, self._generic_worker_on_internal_q)))
        # run_until_complete() takes a single awaitable, so the list is wrapped in gather()
        self._event_loop.run_until_complete(asyncio.gather(*self._tasks))
Regardless, when I try to end the running script with Ctrl+C, it doesn't exit on the first attempt; I have to press it twice. This is what comes up:
KeyboardInterrupt
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/thread.py", line 40, in _python_exit
t.join()
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1056, in join
self._wait_for_tstate_lock()
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
Exception ignored in: <bound method BaseEventLoop.call_exception_handler of <_UnixSelectorEventLoop running=False closed=False debug=False>>
Traceback (most recent call last):
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 1296, in call_exception_handler
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1335, in error
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1442, in _log
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1452, in handle
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1514, in callHandlers
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 863, in handle
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1069, in emit
File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1059, in _open
NameError: name 'open' is not defined
Where am I going wrong?
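The run_in_executor threads are not exiting on the interrupt. The atexit handler sends None to the worker queues and then joins the worker threads. None signals those threads to exit, but a worker only reads it after the function it is currently running returns. Both of your functions loop forever, so they never return and the workers never exit. That is also why a second Ctrl+C is needed: it interrupts the t.join() call shown in the traceback.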
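
One way to let those threads finish is to give each blocking function its own exit condition and trigger it before the loop shuts down. The following is only a minimal sketch under some assumptions, not a drop-in fix: it assumes Python 3.7+ (for asyncio.run and asyncio.get_running_loop), it replaces the Redis polling with a plain sleep so that it runs without a Redis server, and the names _STOP, _stop_event, shutdown, processed and run_demo are hypothetical additions that are not in your code.

```python
import asyncio
import queue
import threading
import time

# Hypothetical sentinel: putting it on the queue tells the queue worker to return.
_STOP = object()


class Prototype:
    def __init__(self):
        self.q = queue.Queue()
        self._stop_event = threading.Event()  # hypothetical stop flag for the poller
        self.processed = []

    def _redis_subscriber(self):
        # Stand-in for the Redis polling loop: the stop flag is re-checked on
        # every pass, so the function can return instead of looping forever.
        while not self._stop_event.is_set():
            # pubbed_msg = self._p.get_message() would go here
            time.sleep(0.01)

    def _generic_worker_on_internal_q(self):
        while True:
            item = self.q.get()  # blocking call
            if item is _STOP:
                return  # returning is what lets the executor thread exit
            self.processed.append(item)

    def shutdown(self):
        # Ask both blocking functions to return.
        self._stop_event.set()
        self.q.put(_STOP)

    async def main(self, run_for=0.05):
        loop = asyncio.get_running_loop()
        futures = [
            loop.run_in_executor(None, self._redis_subscriber),
            loop.run_in_executor(None, self._generic_worker_on_internal_q),
        ]
        self.q.put("job-1")
        try:
            # Placeholder for task1/task2; a real service would await them here.
            await asyncio.sleep(run_for)
        finally:
            # Intended to run on normal completion and when the task is cancelled.
            self.shutdown()
            await asyncio.gather(*futures)


def run_demo():
    p = Prototype()
    asyncio.run(p.main())
    return p.processed
```

Calling run_demo() should return once both worker functions have returned, so nothing is left for the atexit handler to wait on. In a real service you would keep main() running until interrupted instead of sleeping for a fixed time; the important part is that shutdown() is called before the process exits.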