2

I have an Iterable[String] and I want to stream that to an external Process and return an Iterable[String] for the output.

I feel like this should work, since it compiles:

import scala.sys.process._

object PipeUtils {
  implicit class IteratorStream(s: TraversableOnce[String]) {
    def pipe(cmd: String) = s.toStream.#>(cmd).lines
    def run(cmd: String) = s.toStream.#>(cmd).!
  }
}

However, Scala tries to execute the contents of s instead of passing them to the process's standard input. Can anyone please tell me what I'm doing wrong?

UPDATE:

I think that my original problem was that s.toStream was being implicitly converted to a ProcessBuilder and then executed. This is incorrect, as it is meant to be the input to the process.

I have come up with the following solution. It feels very hacky and wrong, but it seems to work for now. I'm not posting this as an answer because I feel like the answer should be one line and not this gigantic thing.

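import java.io.OutputStream
import scala.collection.mutable.ListBuffer
import scala.io.Source
import scala.sys.process._

object PipeUtils {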

  /**
   * This class feels wrong.  I think that for the pipe command it actually loads all of the output
   * into memory.  This could blow up the machine if used wrong, however, I cannot figure out how to get it to
   * work properly.  Hopefully http://stackoverflow.com/questions/28095469/stream-input-to-external-process-in-scala
   * will get some good responses.
   * @param s
   */
  implicit class IteratorStream(s: TraversableOnce[String]) {

    val in = (in: OutputStream) => {
      s.foreach(x => in.write((x + "\n").getBytes))
      in.close
    }

    def pipe(cmd: String) = {
      val output = ListBuffer[String]()
      val io = new ProcessIO(in,
      out => {Source.fromInputStream(out).getLines.foreach(output += _)},
      err => {Source.fromInputStream(err).getLines.foreach(println)})

      cmd.run(io).exitValue
      output.toIterable
    }

    def run(cmd: String) = {
      cmd.run(BasicIO.standard(in)).exitValue
    }
  }
}

EDIT

The motivation for this comes from using Spark's .pipe function on an RDD. I want this exact same functionality on my local code.

5
  • You're right about the implicit conversion from s.toStream to a ProcessBuilder. Anyways, wouldn't def pipe(cmd: String): Stream[String] = (cmd +: s.toSeq).lineStream also work or am I missing something? Commented Jan 22, 2015 at 20:42
  • How would this work for an infinite stream as an input? Or in the case of the real world a very large stream that is too big to fit in Seq? Commented Jan 22, 2015 at 22:58
  • Ok, wasn't clear to me, that your input potentially can be very big. Commented Jan 22, 2015 at 23:03
  • Also the solution you provided causes the native program to fail. I think that what you said puts the input as parameters. The input is not parameters, it's being read from stdin. Think 'cat file.txt | cmd' Commented Jan 22, 2015 at 23:04
  • Yes, you're right, so I obviously missed something. Thanks for clarifying. Commented Jan 22, 2015 at 23:08

2 Answers

4

Assuming Scala 2.11+, you should use lineStream as suggested by @edi. The reason is that you get a streaming response as output becomes available instead of a batched response. Let's say I have a shell script echo-sleep.sh:

#!/usr/bin/env bash
# echo-sleep.sh
while read line; do echo "$line"; sleep 1; done

and we want to call it from scala using code like the following:

import scala.sys.process._
import scala.language.postfixOps
import java.io.ByteArrayInputStream

implicit class X(in: TraversableOnce[String]) {
  // Don't do the ByteArrayInputStream construction in real code.  Just for illustration.
  def pipe(cmd: String) = 
    cmd #< new ByteArrayInputStream(in.mkString("\n").getBytes) lineStream
}

Then if we do a final call like:

1 to 10 map (_.toString) pipe "echo-sleep.sh" foreach println

a number in the sequence appears on STDOUT every second. If you buffer the output and convert it to an Iterable as in your example, you will lose this responsiveness.


2 Comments

How would this work when the input is very large? Is this a streaming input?
Also, I know you suggested not to use the ByteArrayInputStream for real, so how would you do this for real?
3

Here's a solution demonstrating how to write the process code so that it streams both the input and output. The key is to produce a java.io.PipedInputStream that is passed to the input of the process. This stream is filled from the iterator asynchronously via a java.io.PipedOutputStream. Obviously, feel free to change the input type of the implicit class to an Iterable.
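Before bringing a process into the picture, the piped-stream mechanism can be sketched on its own. This is only an illustration under stated assumptions: `PipedDemo` and `roundTrip` are hypothetical names that do not appear in the answer's code, and the reader here is a plain `BufferedReader` standing in for the external process.

```scala
import java.io.{BufferedReader, InputStreamReader, PipedInputStream, PipedOutputStream, PrintWriter}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

// Hypothetical helper for illustration only.
object PipedDemo {
  def roundTrip(lines: Iterator[String]): List[String] = {
    // Bytes written to pos become readable from pis.
    val pos = new PipedOutputStream
    val pis = new PipedInputStream(pos)
    val writer = new PrintWriter(pos, true)

    // Write on another thread: the pipe has a bounded internal buffer,
    // so writing and reading on the same thread could block forever.
    Future {
      try lines.foreach(line => writer.println(line))
      finally writer.close() // closing signals end-of-stream to the reader
    }

    // Read lines until the writer closes its end of the pipe.
    val reader = new BufferedReader(new InputStreamReader(pis))
    try Iterator.continually(reader.readLine()).takeWhile(_ != null).toList
    finally reader.close()
  }
}
```

Calling `PipedDemo.roundTrip(Iterator("a", "b", "c"))` should hand back the same three lines, each having passed through the pipe while the writer was still running.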

Here's an iterator used to show this works.

/**
 * An iterator with pauses used to illustrate data streaming to the process to be run.
 */
class PausingIterator[A](zero: A, until: A, pauseMs: Int)(subsequent: A => A) 
extends Iterator[A] {
  private[this] var current = zero
  def hasNext = current != until
  def next(): A = {
    if (!hasNext) throw new NoSuchElementException
    val r = current
    current = subsequent(current)
    Thread.sleep(pauseMs)
    r
  }
}

Here's the actual code you want:

import java.io.PipedOutputStream
import java.io.PipedInputStream
import java.io.InputStream
import java.io.PrintWriter

// For process stuff
import scala.sys.process._
import scala.language.postfixOps

// For asynchronous stream writing.
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

/**
 * A streaming version of the original class.  This does not block to wait for the entire 
 * input or output to be constructed.  This allows the process to get data ASAP and allows 
 * the process to return information back to the scala environment ASAP.  
 *
 * NOTE: Don't forget about error handling in the final production code.
 */
implicit class X(it: Iterator[String]) {
  def pipe(cmd: String) = cmd #< iter2is(it) lineStream

  /**
   * Convert an iterator to an InputStream for use in the pipe function.
   * @param it an iterator to convert
   */
  private[this] def iter2is[A](it: Iterator[A]): InputStream = {
    // What is written to the output stream will appear in the input stream.
    val pos = new PipedOutputStream
    val pis = new PipedInputStream(pos)
    val w = new PrintWriter(pos, true)

    // Scala 2.11+ (on Scala 2.10, use 'future').  Executes asynchronously.
    // Fill the stream, then close it even if the iterator throws, so the process sees EOF.
    Future {
      try it foreach w.println
      finally w.close()
    }

    // Return possibly before pis is fully written to.
    pis
  }
}

The final call will display 0 through 9, pausing for 3 seconds between numbers (a 2-second pause on the Scala side and a 1-second pause on the shell script side).

// echo-sleep.sh is the same script as in my previous post
new PausingIterator(0, 10, 2000)(_ + 1)
  .map(_.toString)
  .pipe("echo-sleep.sh")
  .foreach(println)

Output

0          [ pause 3 secs ]
1          [ pause 3 secs ]
...
8          [ pause 3 secs ]
9          [ pause 3 secs ]
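The note about error handling in the code above could be approached along the following lines. This is a rough sketch, not a definitive implementation: `SafePipe` and `pipeCollectingErrors` are hypothetical names, and a `ByteArrayInputStream` is used only to keep the example short. It relies on `lineStream_!`, which (unlike `lineStream`) does not throw when the process exits with a non-zero code, and on a `ProcessLogger` to capture stderr instead of letting it go to the console.

```scala
import java.io.ByteArrayInputStream
import scala.collection.mutable.ListBuffer
import scala.sys.process._

// Hypothetical helper for illustration only.
object SafePipe {
  def pipeCollectingErrors(cmd: Seq[String], input: String): (List[String], List[String]) = {
    val errors = ListBuffer[String]()

    // The first function would receive stdout lines, but stdout is consumed
    // through the returned stream here, so only stderr lines are collected.
    val logger = ProcessLogger(
      _ => (),
      line => errors.synchronized { errors += line }
    )

    // lineStream_! does not throw on a non-zero exit code.
    val out = (cmd #< new ByteArrayInputStream(input.getBytes("UTF-8")))
      .lineStream_!(logger)
      .toList

    (out, errors.synchronized(errors.toList))
  }
}
```

For example, `SafePipe.pipeCollectingErrors(Seq("cat"), "a\nb\n")` should return the two input lines as stdout, assuming a Unix-like system where `cat` is available. On Scala 2.13 and later, `lineStream_!` is deprecated in favour of `lazyLines_!`.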

1 Comment

Following this example, we succeed in making the streaming IO very efficient. The key is to read from another thread, as lineStream is otherwise a blocking call
