What I want to achieve
- I want to stream elements from a generator-like object, line by line, through an external program from Python.
- Broken down, I want something like
Generator -> Popen(...) -> Generator
without holding too much data in memory.
Here is a minimal example that demonstrates what I want to achieve:
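from io import StringIO
from subprocess import Popen, PIPE
import time
# yield is only valid inside a function, so the loop is wrapped in a generator
def stream():
    proc_input = StringIO("aa\nbb\ncc\ndd")
    proc = Popen(["cat"], stdin=PIPE, stdout=PIPE)
    for line in proc_input:
        proc.stdin.write(line.encode())
        yield proc.stdout.readline()  # blocks here
        time.sleep(1)
for out in stream():
    print(out)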
Problem: The proc.stdout.readline() just blocks and doesn't show anything.
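A minimal sketch of one way around the blocking, assuming the child emits exactly one line per input line and does not buffer its own output (true for cat, not for every program). By default proc.stdin is buffered on the Python side, so nothing reaches cat until the buffer is flushed, and readline() also waits for a trailing newline, which the last input line "dd" lacks. The helper name stream_lines is hypothetical.

```python
from subprocess import Popen, PIPE

def stream_lines(lines, cmd=("cat",)):
    """Yield one output line per input line (hypothetical helper).

    Assumes the child answers every input line with exactly one output
    line and does not buffer its output, which holds for cat.
    """
    proc = Popen(list(cmd), stdin=PIPE, stdout=PIPE)
    try:
        for line in lines:
            if not line.endswith("\n"):
                line += "\n"  # readline() only returns once it sees a newline
            proc.stdin.write(line.encode())
            proc.stdin.flush()  # push the line out of Python's write buffer
            yield proc.stdout.readline()
    finally:
        proc.stdin.close()  # signals EOF so the child can exit
        proc.stdout.close()
        proc.wait()

result = list(stream_lines(["aa", "bb", "cc", "dd"]))
print(result)
```

This keeps only one line in flight at a time, but it stalls with any program that reads several lines before writing or that block-buffers its output when connected to a pipe.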
What I already learned:
- If the input comes from a file-like object (i.e. something that implements fileno()), I can pass it directly to stdin and avoid writing to the PIPE. But to do so, I first need to stream the generator to a file, which I would like to avoid as it seems an unnecessary detour. For example, the following works:
import tempfile
from subprocess import Popen, PIPE
tp = tempfile.TemporaryFile()
tp.write("aa\nbb\ncc\ndd".encode())
tp.seek(0)
proc = Popen(["cat"], stdin=tp, stdout=PIPE)
for line in proc.stdout:
    print(line)
- If I stick to writing to the PIPE object, I can resolve the problem by closing the input stream and then reading from the output stream. But here I don't know where the data lives in the meantime. Because my generator yields gigabytes of data, I do not want to run into memory errors.
proc_input = StringIO("aa\nbb\ncc\ndd")
proc = Popen(["cat"], stdin=PIPE, stdout=PIPE)
for line in proc_input:
    proc.stdin.write(line.encode())
proc.stdin.close()
for line in proc.stdout:
    print(line)
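On where the data lives: as far as I understand, written bytes sit first in Python's write buffer and then in the OS pipe buffer (typically 64 KiB on Linux). Once that is full, write() blocks until the child reads. If the child's output pipe fills up as well because nobody is reading it yet, writing everything before reading can deadlock on large inputs. A sketch of one common way around this feeds stdin from a background thread while the generator itself reads stdout. The name pipe_through is hypothetical, and the sketch assumes the input yields bytes lines.

```python
import threading
from subprocess import Popen, PIPE

def pipe_through(lines, cmd=("cat",)):
    """Stream bytes lines through cmd, yielding its output lines (hypothetical helper)."""
    proc = Popen(list(cmd), stdin=PIPE, stdout=PIPE)

    def feed():
        # Runs in a background thread. write() blocks while the pipe is full,
        # so the input is consumed only as fast as the child reads it.
        try:
            for line in lines:
                proc.stdin.write(line)
        except BrokenPipeError:
            pass  # the child exited before reading everything
        finally:
            try:
                proc.stdin.close()  # flushes the buffer and signals EOF
            except BrokenPipeError:
                pass

    writer = threading.Thread(target=feed, daemon=True)
    writer.start()
    try:
        for out in proc.stdout:
            yield out
    finally:
        proc.stdout.close()
        writer.join()
        proc.wait()

# Roughly 590 kB of input, well beyond a 64 KiB pipe buffer.
data = (f"{i}\n".encode() for i in range(100_000))
total = sum(1 for _ in pipe_through(data))
print(total)
```

With this layout the data in flight is bounded by the Python-side buffers plus the two pipe buffers, regardless of how much the input generator yields in total.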
What I also tried:
- I played around with the buffer size argument Popen(..., bufsize=), but it did not seem to have any effect.
- I tried writing the input data to io.BufferedWriter in the hope that Popen can digest this as an input for stdin. Also without success.
Additional info: I'm using Linux.
Remarks on the comments
It was suggested to break the input generator into chunks. This can be achieved via
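from itertools import islice
from subprocess import Popen, PIPE
def PopenStreaming(process, popen_kwargs, nlines, input):
    input = iter(input)
    while True:
        # Take at most nlines rows; an empty chunk means the input is exhausted.
        chunk = list(islice(input, nlines))
        if not chunk:
            break
        proc = Popen(process, stdin=PIPE, stdout=PIPE, **popen_kwargs)
        for row in chunk:
            proc.stdin.write(row)
        # Always close stdin, also for a short final chunk, so the process sees EOF.
        proc.stdin.close()
        for row in proc.stdout:
            yield row
        proc.wait()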