
When starting two processes with the old-school subprocess.Popen() API, I can easily connect standard output of one process to standard input of another, creating a pipeline the same way a UNIX shell does when connecting commands with |:

from subprocess import Popen, PIPE

process_1 = Popen(['ls'], stdout = PIPE)
process_2 = Popen(['wc'], stdin = process_1.stdout)

process_1.wait()
process_2.wait()
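(For reference, the subprocess documentation's "replacing shell pipeline" example additionally closes the parent's copy of process_1.stdout, so that process_1 receives SIGPIPE if process_2 exits early. A variant of the snippet above, using printf and wc -l so the output is deterministic rather than depending on the directory listing:)

```python
from subprocess import Popen, PIPE

# printf emits three lines; wc -l counts them.
process_1 = Popen(['printf', r'a\nb\nc\n'], stdout=PIPE)
process_2 = Popen(['wc', '-l'], stdin=process_1.stdout, stdout=PIPE)

# Close the parent's copy of the pipe's write end so process_1
# gets SIGPIPE if process_2 dies prematurely (per the subprocess docs).
process_1.stdout.close()

output, _ = process_2.communicate()
process_1.wait()
```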

How can I accomplish the same when using the asynchronous API from asyncio.subprocess.create_subprocess_exec() (or similar)? This is what I tried:

from asyncio.events import get_event_loop
from asyncio.subprocess import PIPE, create_subprocess_exec

async def main():
    process_1 = await create_subprocess_exec('ls', stdout = PIPE)
    process_2 = await create_subprocess_exec('wc', stdin = process_1.stdout)

    await process_1.wait()
    await process_2.wait()

get_event_loop().run_until_complete(main())

But the second call to create_subprocess_exec() complains that the argument passed to stdin has no fileno (which is true):

Traceback (most recent call last):
  File ".../test-async.py", line 11, in <module>
     get_event_loop().run_until_complete(main())
[...]
  File ".../test-async.py", line 6, in main
     process_2 = await create_subprocess_exec('wc', stdin = process_1.stdout)
[...]
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/subprocess.py", line 1388, in _get_handles
     p2cread = stdin.fileno()
AttributeError: 'StreamReader' object has no attribute 'fileno'

How can I get the same result as in the synchronous example above?


1 Answer


In asyncio, process.stdout is actually a StreamReader, not a file object. The file object can be accessed through process._transport._proc.stdout. Unfortunately, you won't be able to use it since it has already been registered in the event loop in order to provide the stream interface process.stdout.

One way to deal with the issue is to create your own pipe and pass the file descriptors to the subprocess:

import os

async def main():
    # os.pipe() returns real file descriptors, which do have a fileno,
    # unlike the StreamReader objects exposed by asyncio.
    read, write = os.pipe()
    process_1 = await create_subprocess_exec('ls', stdout=write)
    os.close(write)
    process_2 = await create_subprocess_exec('wc', stdin=read, stdout=PIPE)
    os.close(read)
    return await process_2.stdout.read()

Note that the write file descriptor should be explicitly closed once the first subprocess is started (it is not automatically closed unless you use subprocess.PIPE). The read file descriptor also needs to be closed, as explained here.
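Putting both close calls together, a complete runnable sketch of this approach might look like the following (using printf and wc -l instead of ls and wc so the result is deterministic; asyncio.run requires Python 3.7+):

```python
import asyncio
import os
from asyncio.subprocess import PIPE, create_subprocess_exec

async def run_pipeline():
    # Create an OS-level pipe; its ends are plain file descriptors,
    # so they can be passed as stdout/stdin of the subprocesses.
    read, write = os.pipe()

    # printf emits three lines into the pipe's write end.
    process_1 = await create_subprocess_exec('printf', r'a\nb\nc\n',
                                             stdout=write)
    os.close(write)  # close the parent's copy of the write end

    # wc -l reads from the pipe's read end and counts the lines.
    process_2 = await create_subprocess_exec('wc', '-l',
                                             stdin=read, stdout=PIPE)
    os.close(read)   # close the parent's copy of the read end

    output, _ = await process_2.communicate()
    await process_1.wait()
    return output

result = asyncio.run(run_pipeline())
```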


4 Comments

could you provide a reference for the statement: "it is closed automatically only if you use subprocess.PIPE"? As I understand the reason you want os.close(write) in the parent is so that ls would get SIGPIPE and EPIPE if wc dies prematurely. It is the same for subprocess.PIPE (at least for ordinary Popen() as demonstrated by the code example in the docs)
@J.F.Sebastian I noticed it by looking at the code, I don't know if it's documented somewhere. However, your example is about closing the reader, not the writer. In my example, the writer needs to be closed in order to set up the pipe properly (see this answer).
You are right about the write end of the pipe: it is closed because it is unused in the parent, which would also explain why it is the responsibility of the code that creates the subprocess.PIPE pipe to close it; this is unrelated to asyncio, and the corresponding os.close() call is in the subprocess module. The example that I cited would correspond to process_1.stdout.close() above, i.e., os.close(read) should be called.
@J.F.Sebastian You're right, I didn't realize the reader also needs to be closed. Thanks for your insight, see my edit.
