4

I'm using python to create a script which runs and interacts with some processes simultaneously. For that I'm using asyncio to implement this parallelism. The main problem is how to run another cleanup routine when a KeyboardInterrupt or a SIGINT occurs.

Here's an example code I wrote to show the problem:

import asyncio
import logging
import signal
from time import sleep


class Process:
    async def start(self, arguments):
        self._process = await asyncio.create_subprocess_exec("/bin/bash", *arguments)

        return await self._process.wait()

    async def stop(self):
        self._process.terminate()


class BackgroundTask:

    async def start(self):
        # Very important process which needs to run while process 2 is running
        self._process1 = Process()
        self._process1_task = asyncio.create_task(self._process1.start(["-c", "sleep 100"]))

        self._process2 = Process()
        self._process2_task = asyncio.create_task(self._process2.start(["-c", "sleep 50"]))

        await asyncio.wait([self._process1_task, self._process2_task], return_when=asyncio.ALL_COMPLETED)

    async def stop(self):
        # Stop process
        await self._process1.stop()

        # Call a cleanup process which cleans up process 1
        cleanup_process = Process()
        await cleanup_process.start(["-c", "sleep 10"])

        # After that we can stop our second process
        await self._process2.stop()


backgroundTask = BackgroundTask()


async def main():
    await asyncio.create_task(backgroundTask.start())


logging.basicConfig(level=logging.DEBUG)
asyncio.run(main(), debug=True)

This code creates a background task which starts two processes (in this example two bash sleep commands) and waits for them to finish. This works fine and both command are running in parallel.

The main problem is the stop routine. I'd like to run the stop method when the program receives a SIGINT or KeyboardInterrupt, which first stops the process1, then starts a cleanup method and stops process2 afterwards. This is necessary because the cleanup command depends on process2.

What I've tried (instead of the asyncio.run() and the async main):

def main():
    try:
        asyncio.get_event_loop().run_until_complete(backgroundTask.start())
    except KeyboardInterrupt:
        asyncio.get_event_loop().run_until_complete(backgroundTask.stop())

main()

This of course doesn't work as expected, because as soon as a KeyboardInterrupt exception occurs the backgroundTask.start Task is canceled and the backgroundTask.stop is started in the main loop, so my processes are canceled and can't stop properly.

So is there a way to detect the KeyboardInterrupt without canceling the current main loop and run my backgroundTask.stop method instead?

1 Answer 1

1

You want to add a signal handler as shown in this example in the docs:

import asyncio
import functools
import os
import signal

def ask_exit(signame, loop):
    print("got signal %s: exit" % signame)
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()

    for signame in {'SIGINT', 'SIGTERM'}:
        loop.add_signal_handler(
            getattr(signal, signame),
            functools.partial(ask_exit, signame, loop))

    await asyncio.sleep(3600)

print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")

asyncio.run(main())

That's a bit of an overcomplicated/outdated example though, consider it more like this (your coroutine code goes where the asyncio.sleep call is):

import asyncio
from signal import SIGINT, SIGTERM
    

async def main():
    loop = asyncio.get_running_loop()
    for signal_enum in [SIGINT, SIGTERM]:
        loop.add_signal_handler(signal_enum, loop.stop)

    await asyncio.sleep(3600) # Your code here


asyncio.run(main())

At this point a Ctrl + C will break the loop and raise a RuntimeError, which you can catch by putting the asyncio.run call in a try/except block like so:

try:
    asyncio.run(main())
except RuntimeError as exc:
    expected_msg = "Event loop stopped before Future completed."
    if exc.args and exc.args[0] == expected_msg:
        print("Bye")
    else:
        raise

That's not very satisfying though (what if something else caused the same error?), so I'd prefer to raise a distinct error. Also, if you're exiting on the command line, the proper thing to do is to return the proper exit code (in fact, the code in the example just uses the name, but it's actually an IntEnum with that numeric exit code in it!)

import asyncio
from functools import partial
from signal import SIGINT, SIGTERM
from sys import stderr

class SignalHaltError(SystemExit):
    def __init__(self, signal_enum):
        self.signal_enum = signal_enum
        print(repr(self), file=stderr)
        super().__init__(self.exit_code)

    @property
    def exit_code(self):
        return self.signal_enum.value

    def __repr__(self):
        return f"\nExitted due to {self.signal_enum.name}"

def immediate_exit(signal_enum, loop):
    loop.stop()
    raise SignalHaltError(signal_enum=signal_enum)

async def main():
    loop = asyncio.get_running_loop()

    for signal_enum in [SIGINT, SIGTERM]:
        exit_func = partial(immediate_exit, signal_enum=signal_enum, loop=loop)
        loop.add_signal_handler(signal_enum, exit_func)

    await asyncio.sleep(3600)

print("Event loop running for 1 hour, press Ctrl+C to interrupt.")

asyncio.run(main())

Which when Ctrl + C'd out of gives:

python cancelling_original.py

Event loop running for 1 hour, press Ctrl+C to interrupt.
^C
Exitted due to SIGINT
echo $?

2

Now there's some code I'd be happy to serve! :^)

P.S. here it is with type annotations:

from __future__ import annotations

import asyncio
from asyncio.events import AbstractEventLoop
from functools import partial
from signal import Signals, SIGINT, SIGTERM
from sys import stderr
from typing import Coroutine

class SignalHaltError(SystemExit):
    def __init__(self, signal_enum: Signals):
        self.signal_enum = signal_enum
        print(repr(self), file=stderr)
        super().__init__(self.exit_code)

    @property
    def exit_code(self) -> int:
        return self.signal_enum.value

    def __repr__(self) -> str:
        return f"\nExitted due to {self.signal_enum.name}"

def immediate_exit(signal_enum: Signals, loop: AbstractEventLoop) -> None:
    loop.stop()
    raise SignalHaltError(signal_enum=signal_enum)

async def main() -> Coroutine:
    loop = asyncio.get_running_loop()

    for signal_enum in [SIGINT, SIGTERM]:
        exit_func = partial(immediate_exit, signal_enum=signal_enum, loop=loop)
        loop.add_signal_handler(signal_enum, exit_func)

    return await asyncio.sleep(3600)

print("Event loop running for 1 hour, press Ctrl+C to interrupt.")

asyncio.run(main())

The advantage of a custom exception here is that you can then catch it specifically, and avoid the traceback being dumped to the screen

try:
    asyncio.run(main())
except SignalHaltError as exc:
    # log.debug(exc)
    pass
else:
    raise
Sign up to request clarification or add additional context in comments.

1 Comment

what if your main() function had a while true inside with a sleep of 1s and then kept running some code? how would you break out of that

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.