
I have designed a piece of Python code that essentially works as a microservice in the larger scheme of things.

There are two tasks that I have scheduled on the loop and two tasks that I set up to run in an executor.

The strange part is that the code runs well and does everything I expect. But when I end it with KeyboardInterrupt (Ctrl+C), I see an error and an exception, which makes me feel I am definitely misusing the asyncio patterns here. Here is a brief overview of the code, without going into the lengthy details:

class Prototype:
    def _redis_subscriber(self):
        self._p = self._redis_itx.pubsub(ignore_subscribe_messages=True)
        self._p.subscribe("channel1")
        while True:
            pubbed_msg = self._p.get_message()
            if pubbed_msg is not None:
                pass  # process process process
            time.sleep(0.01)

    def _generic_worker_on_internal_q(self):
        while True:
            item = self.q.get() #blocking call
            #process item 

    async def task1(self):
        pass  # network I/O bound code

    async def task2(self):
        pass  # network I/O bound code; also fills self.q via self.q.put()

    def run(self):
        asyncio.ensure_future(self.task1(), loop=self._event_loop)
        asyncio.ensure_future(self.task2(), loop=self._event_loop)
        asyncio.ensure_future(self._event_loop.run_in_executor(None, self._redis_subscriber))
        asyncio.ensure_future(self._event_loop.run_in_executor(None, self._generic_worker_on_internal_q))
        self._event_loop.run_forever()

if __name__ == '__main__':
    p = Prototype()
    p.run()

Also, I tried experimenting with another approach in the Prototype.run() method:

def __init__(self):
    self._tasks = []

def run(self):
    self._tasks.append(asyncio.ensure_future(self._task1()))
    self._tasks.append(asyncio.ensure_future(self._task2()))
    self._tasks.append(asyncio.ensure_future(self._event_loop.run_in_executor(None, self._redis_subscriber)))
    self._tasks.append(asyncio.ensure_future(self._event_loop.run_in_executor(None, self._generic_worker_on_internal_q)))
    self._event_loop.run_until_complete(self._tasks)
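(As an aside, run_until_complete() expects a single awaitable rather than a plain list, so a list of tasks would typically be combined with asyncio.gather() first. A minimal, self-contained sketch of that combination, with trivial stand-in coroutines in place of the real ones:)

```python
import asyncio

async def task1():
    # stand-in for the real network I/O bound task
    return 1

async def task2():
    return 2

loop = asyncio.new_event_loop()
tasks = [loop.create_task(task1()), loop.create_task(task2())]
# gather() combines the list into one awaitable that run_until_complete() accepts
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
```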

Regardless, when I try to end the running script with Ctrl+C, it doesn't exit on the first attempt; I have to press it twice. And this is what comes up:

KeyboardInterrupt
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/thread.py", line 40, in _python_exit
    t.join()
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
Exception ignored in: <bound method BaseEventLoop.call_exception_handler of <_UnixSelectorEventLoop running=False closed=False debug=False>>
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 1296, in call_exception_handler
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1335, in error
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1442, in _log
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1452, in handle
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1514, in callHandlers
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 863, in handle
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1069, in emit
  File "/usr/local/Cellar/python3/3.6.4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/logging/__init__.py", line 1059, in _open
NameError: name 'open' is not defined

Where am I going wrong?

  • Maybe try the same with debug enabled: docs.python.org/3/library/asyncio-dev.html? Commented Feb 9, 2018 at 12:07
  • One of your run_in_executor threads is not exiting on the interrupt; the atexit handler is sending None to the worker queues and then joining them. None signals those threads to exit, but the function being run never exits so the worker never does. Commented Feb 9, 2018 at 12:09

1 Answer


You scheduled two infinite tasks in the executor. These tasks block the process from exiting.

The default executor runs those tasks in threads fed by a work queue, and when exiting, that queue is signalled to stop executing tasks. However, if your task never returns, the worker thread can never check for this state, so the final join() hangs.
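To illustrate that handshake: a worker thread can only be joined if its target function actually returns. A minimal sketch outside of asyncio (names are illustrative, not from the question's code) of a loop that exits cooperatively via a stop flag, and therefore joins cleanly:

```python
import threading
import time

def stoppable_worker(stop_event, interval=0.01):
    # Check the flag each iteration instead of looping forever,
    # so the function returns and the thread can be joined at exit.
    while not stop_event.is_set():
        # ... poll for work here ...
        time.sleep(interval)

stop = threading.Event()
worker = threading.Thread(target=stoppable_worker, args=(stop,))
worker.start()
stop.set()              # signal shutdown
worker.join(timeout=1)  # returns promptly because the loop observed the flag
```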

You can avoid this state by not running an infinite loop. Instead, reschedule your task each time you reach the end, and do not block on getting messages:

def _redis_subscriber(self):
    self._p = self._redis_itx.pubsub(ignore_subscribe_messages=True)
    self._p.subscribe("channel1")

    def process_message():
        # non-blocking task to repeatedly run in the executor
        pubbed_msg = self._p.get_message(False)
        if pubbed_msg is not None:
            # process process process 
        time.sleep(0.01)
        # reschedule function for next message
        asyncio.ensure_future(self._event_loop.run_in_executor(None, process_message))

    # kick off the first handler
    process_message()

You still run this function in the executor to kick it off:

def run(self):
    # ...
    asyncio.ensure_future(self._event_loop.run_in_executor(None, self._redis_subscriber))

Do the same for _generic_worker_on_internal_q(), and make sure you avoid blocking calls to Queue.get(): use self.q.get(False), and be prepared to handle the queue.Empty exception it raises when no item is available.
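For the queue variant, a small sketch of that non-blocking check (the helper name is illustrative):

```python
import queue

def poll_queue(q):
    # Non-blocking get: return an item if one is waiting, else None,
    # so the function always returns and can be rescheduled.
    try:
        return q.get(False)
    except queue.Empty:
        return None

q = queue.Queue()
first = poll_queue(q)   # nothing queued yet
q.put("item")
second = poll_queue(q)
```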

You could even use a decorator for this:

import asyncio
from functools import partial, wraps


def auto_reschedule(loop=None, executor=None):
    """Repeatedly re-schedule function in the given executor"""
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            result = f(*args, **kwargs)
            target = wrapper
            if args or kwargs:
                target = partial(target, *args, **kwargs)
            current_loop = loop
            if current_loop is None:
                current_loop = asyncio.get_event_loop()
            current_loop.run_in_executor(executor, target)
            return result
        return wrapper
    return decorator

and use this decorator on your inner function, where you have access to your instance attribute referencing the loop:

def _redis_subscriber(self):
    self._p = self._redis_itx.pubsub(ignore_subscribe_messages=True)
    self._p.subscribe("channel1")

    @auto_reschedule(self._event_loop)
    def process_message():
        # non-blocking task to repeatedly run in the executor
        pubbed_msg = self._p.get_message(False)
        if pubbed_msg is not None:
            pass  # process process process
        time.sleep(0.01)

    # kick off the first handler
    process_message()

A quick demo of the latter:

import asyncio
import time
import random

# auto_reschedule imported or defined

def create_thread_task(i, loop):
    @auto_reschedule(loop)
    def thread_task():
        print(f'Task #{i} running in worker')
        time.sleep(random.uniform(1, 3))

    return thread_task


def main():
    loop = asyncio.get_event_loop()
    for i in range(5):
        asyncio.ensure_future(
            loop.run_in_executor(None, create_thread_task(i, loop)))
    loop.run_forever()


if __name__ == '__main__':
    main()
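Separately from the rescheduling pattern, the abrupt shutdown can be softened by cancelling whatever tasks are still pending once the loop stops. A rough sketch (the helper name is illustrative, and this alone does not unblock an executor thread stuck in an infinite loop):

```python
import asyncio

async def forever():
    # stand-in for a long-running scheduled coroutine
    while True:
        await asyncio.sleep(1)

def run_with_cleanup(loop, stop_after=None):
    # Illustrative helper: run the loop, then cancel anything still
    # pending so shutdown is orderly rather than abrupt.
    if stop_after is not None:
        loop.call_later(stop_after, loop.stop)  # stand-in for Ctrl+C
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        pending = asyncio.all_tasks(loop)
        for task in pending:
            task.cancel()
        if pending:
            # let the cancellations propagate before the loop closes
            loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True))

loop = asyncio.new_event_loop()
task = loop.create_task(forever())
run_with_cleanup(loop, stop_after=0.05)
loop.close()
```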

2 Comments

You are absolutely right in detecting the cause. Even though the solution proposed did not kick off the execution of the two thread workers at all, I have at least got the answer to my original question. And with that a good perspective on the bad pattern I was using. Thank you.
@anomit: interesting, but the principle is sound. See gist.github.com/mjpieters/fa6277893f500df71f1381f7f61e3ab3 for a working demo where a nested function reschedules itself. A keyboard interrupt exits the script cleanly (after at most 3 seconds as the tasks exit after the time.sleep() call).
