64

I have a small issue that I'm not quite sure how to solve. Here is a minimal example:

What I have

scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
    line = scan_process.stdout.readline()
    some_criterium = do_something(line)

What I would like

scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
    line = scan_process.stdout.readline()
    if nothing_happens_after_10s:
        break
    else:
        some_criterium = do_something(line)

I read a line from a subprocess and do something with it. How can I exit if no line arrived after a fixed time interval?

4

8 Answers 8

35

Thanks for all the answers!

I found a way to solve my problem by simply using select.poll to peek into standard output.

import select
...
scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
poll_obj = select.poll()
poll_obj.register(scan_process.stdout, select.POLLIN)
while(some_criterium and not time_limit):
    poll_result = poll_obj.poll(0)
    if poll_result:
        line = scan_process.stdout.readline()
        some_criterium = do_something(line)
    update(time_limit)
Sign up to request clarification or add additional context in comments.

7 Comments

while this appears to work, it's not robust -- consider if your child process outputs something without a new line. select/poll will trigger, but readline will block indefinitely.
May not work on Windows, where select.poll() only works for sockets. docs.python.org/2/library/select.html
I haven't tested the solution in Windows, so you might be right, I know it's working under OSX and Linux.
@DimaTisnek, so if there is no line return at all, the program will still be blocked by the readline forever?
|
33

Here's a portable solution that enforces the timeout for reading a single line using asyncio:

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

async def run_command(*args, timeout=None):
    # Start child process
    # NOTE: universal_newlines parameter is not supported
    process = await asyncio.create_subprocess_exec(*args,
            stdout=PIPE, stderr=STDOUT)

    # Read line (sequence of bytes ending with b'\n') asynchronously
    while True:
        try:
            line = await asyncio.wait_for(process.stdout.readline(), timeout)
        except asyncio.TimeoutError:
            pass
        else:
            if not line: # EOF
                break
            elif do_something(line):
                continue # While some criterium is satisfied
        process.kill() # Timeout or some criterion is not satisfied
        break
    return await process.wait() # Wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop() # For subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

returncode = loop.run_until_complete(run_command("cmd", "arg 1", "arg 2",
                                                 timeout=10))
loop.close()

7 Comments

This is awesome, great work! I would suggest process.stdout.read() instead of readline() for someone else who may have more than just one expected line.
@jftuga: .read() would be incorrect here. The question is about .readline(). If you need all the output then it is simpler to use .communicate() with timeout. Read my comment under the answer that uses .communicate().
@JanKaifer yes. Both the link to Python 3 docs and the explicit shebang #!... python3 point to Python 3. The current Python version is 3.6. The syntax in the answer is Python 3.5 (released in 2015).
This is great if you can switch everything you do to asyncio. Want to interact with anything using queue.Queue? Tough, that breaks asyncio. Got a non-asyncio library that you want to register a callback with? Tough. asyncio doesn't interact well with anything else and seems to almost always be more trouble than its worth.
@secavfr: the code worked as is (last time I've tried). In 2022, I would replace everything starting with if sys.platform with just asyncio.run(main()) where inside async def main() you just await run_command(..).
|
14

I used something a bit more general in Python (if I remember correctly, also pieced together from Stack Overflow questions, but I cannot recall which ones).

import thread
from threading import Timer

def run_with_timeout(timeout, default, f, *args, **kwargs):
    if not timeout:
        return f(*args, **kwargs)
    try:
        timeout_timer = Timer(timeout, thread.interrupt_main)
        timeout_timer.start()
        result = f(*args, **kwargs)
        return result
    except KeyboardInterrupt:
        return default
    finally:
        timeout_timer.cancel()

Be warned, though. This uses an interrupt to stop whatever function you give it. This might not be a good idea for all functions and it also prevents you from closing the program with Ctrl + C during the timeout (i.e. Ctrl + C will be handled as a timeout).

You could use this and call it like:

scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while(some_criterium):
    line = run_with_timeout(timeout, None, scan_process.stdout.readline)
    if line is None:
        break
    else:
        some_criterium = do_something(line)

It might be a bit overkill, though. I suspect there is a simpler option for your case that I don't know.

6 Comments

it is not necessary to create a new thread for each line: a single watchdog thread is enough
Works like a charm and should be picked as best :-) Thanks, @Flogo!
Isn't it better to put the first two lines inside try-block i.e. "timeout_timer = Timer( ....upto.... timer.start()" outside try-except?
@AshKetchum: the line timeout_timer.start() should be in the try-block. Imagine you have a very short time limit and there is a context switch after starting the thread and before entering the try-block. That could theoretically lead to a KeyboardInterrupt sent to the main thread. The line initializing the Timer could be outside, I guess.
Does not seem to work on Ubuntu 18.04, python 3.6.9. Altough _thread.interrupt_main() gets executed, scan_process.stdout.readline() cannot be interrupted.
|
10

While Tom's solution works, using select() in the C idiom is more compact, this is the equivalent of your answer:

from select import select
scan_process = subprocess.Popen(command,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                bufsize=1)  # Line buffered
while some_criterium and not time_limit:
    poll_result = select([scan_process.stdout], [], [], time_limit)[0]

The rest is the same.

See pydoc select.select.

[Note: this is Unix-specific, as are some of the other answers.]

[Note 2: edited to add line buffering as per OP request]

[Note 3: the line buffering may not be reliable in all circumstances, leading to readline() blocking]

6 Comments

note: this as as well as @Tom's answer doesn't work on Windows and it resets the timeout if any input is received. OP wants to reset the timeout only if a newline is received (though it is straightforward to accommodate this requirement).
also, to avoid blocking on .readline() like in @Tom's answer, use os.read(scan_process.stdout.fileno(), 512) after the select (it is also not 100% percent safe if something else has access to the pipe) but it is less likely to block after the select than .readline().
I thought the whole idea was to block until either a line is read or timeout has been reached?... sorry if I'm misunderstanding.
think: if your code is blocked on readline() then how do you expect to respect the timeout
You don't know whether child's stdout is line-buffered (bufsize=1 has no effect on the child process; it only regulates the buffer used in the parent to read the output) and typically the stdout is block-buffered if it is redirected to a pipe i.e., select() may return without the full line being available.
|
5

A portable solution is to use a thread to kill the child process if reading a line takes too long:

#!/usr/bin/env python3
from subprocess import Popen, PIPE, STDOUT

timeout = 10
with Popen(command, stdout=PIPE, stderr=STDOUT,
           universal_newlines=True) as process:  # text mode
    # kill process in timeout seconds unless the timer is restarted
    watchdog = WatchdogTimer(timeout, callback=process.kill, daemon=True)
    watchdog.start()
    for line in process.stdout:
        # don't invoke the watcthdog callback if do_something() takes too long
        with watchdog.blocked:
            if not do_something(line):  # some criterium is not satisfied
                process.kill()
                break
            watchdog.restart()  # restart timer just before reading the next line
    watchdog.cancel()

where WatchdogTimer class is like threading.Timer that can be restarted and/or blocked:

from threading import Event, Lock, Thread
from subprocess import Popen, PIPE, STDOUT
from time import monotonic  # use time.time or monotonic.monotonic on Python 2

class WatchdogTimer(Thread):
    """Run *callback* in *timeout* seconds unless the timer is restarted."""

    def __init__(self, timeout, callback, *args, timer=monotonic, **kwargs):
        super().__init__(**kwargs)
        self.timeout = timeout
        self.callback = callback
        self.args = args
        self.timer = timer
        self.cancelled = Event()
        self.blocked = Lock()

    def run(self):
        self.restart() # don't start timer until `.start()` is called
        # wait until timeout happens or the timer is canceled
        while not self.cancelled.wait(self.deadline - self.timer()):
            # don't test the timeout while something else holds the lock
            # allow the timer to be restarted while blocked
            with self.blocked:
                if self.deadline <= self.timer() and not self.cancelled.is_set():
                    return self.callback(*self.args)  # on timeout

    def restart(self):
        """Restart the watchdog timer."""
        self.deadline = self.timer() + self.timeout

    def cancel(self):
        self.cancelled.set()

Comments

5

Try using signal.alarm:

#timeout.py
import signal, sys

def timeout(sig, frm):
  print "This is taking too long..."
  sys.exit(1)

signal.signal(signal.SIGALRM, timeout)
signal.alarm(10)
byte = 0

while 'IT' not in open('/dev/urandom').read(2):
  byte += 2
print "I got IT in %s byte(s)!" % byte

A couple of runs to show it works:

$ python timeout.py 
This is taking too long...
$ python timeout.py 
I got IT in 4672 byte(s)!

For a more detailed example, see pGuides.

2 Comments

This is Unix-only, won't work on Windows as SIGALRM and signal.alarm are unavailable.
This is the simplest if you are unix-only and just need a way to bail out when something isn't happening as quickly as it should because somethings wrong (and works for any situation, not just reads).
1

Using threading

import subprocess, threading, time

def _watcher(proc, delay):
    time.sleep(delay)
    proc.kill()

try:

    scan_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    threading.Thread(target = _watcher, args = (scan_process, 10)).start()
    
    while(some_criterium):
        line = scan_process.stdout.readline()
        if nothing_happens_after_10s:
            break
        else:
            some_criterium = do_something(line)

except Exception as e:
    print(e)

Please also refer How to run a process with timeout and still get stdout at runtime

Comments

0
import asyncio

async def read_stdout(process):
    # Read from the stdout pipe
    while True:
        line = await process.stdout.readline()
        if not line:
            break
        yield line.decode().strip()


async def main():
    # Create a subprocess
    process = await asyncio.create_subprocess_exec('ls', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)

    # Set the timeout in seconds
    timeout = 10

    try:
        while True:
            line = await asyncio.wait_for(read_stdout(process).__anext__(), timeout=timeout)
            if not line:
                break
            print(line)
    except asyncio.TimeoutError:
        # If no data is available within the timeout, handle it
        print("Timeout occurred")
    finally:
        # Ensure the subprocess is terminated if necessary
        if process.returncode is None:
            process.terminate()
            await process.wait()

asyncio.run(main())

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.