I have an Iterable[String] and I want to stream that to an external Process and return an Iterable[String] for the output.
I feel like this should work as it compiles
import scala.sys.process._
object PipeUtils {
implicit class IteratorStream(s: TraversableOnce[String]) {
def pipe(cmd: String) = s.toStream.#>(cmd).lines
def run(cmd: String) = s.toStream.#>(cmd).!
}
}
However, Scala tries to execute the contents of s instead of pass them in to standard in. Can anyone please tell me what I'm doing wrong?
UPDATE:
I think that my original problem was that the s.toStream was being implicity converted to a ProcessBuilder and then executed. This is incorrect as it's the input to the process.
I have come up with the following solution. This feels very hacky and wrong but it seems to work for now. I'm not writing this as an answer because I feel like the answer should be one line and not this gigantic thing.
object PipeUtils {
/**
* This class feels wrong. I think that for the pipe command it actually loads all of the output
* into memory. This could blow up the machine if used wrong, however, I cannot figure out how to get it to
* work properly. Hopefully http://stackoverflow.com/questions/28095469/stream-input-to-external-process-in-scala
* will get some good responses.
* @param s
*/
implicit class IteratorStream(s: TraversableOnce[String]) {
val in = (in: OutputStream) => {
s.foreach(x => in.write((x + "\n").getBytes))
in.close
}
def pipe(cmd: String) = {
val output = ListBuffer[String]()
val io = new ProcessIO(in,
out => {Source.fromInputStream(out).getLines.foreach(output += _)},
err => {Source.fromInputStream(err).getLines.foreach(println)})
cmd.run(io).exitValue
output.toIterable
}
def run(cmd: String) = {
cmd.run(BasicIO.standard(in)).exitValue
}
}
}
EDIT
The motivation for this comes from using Spark's .pipe function on an RDD. I want this exact same functionality on my local code.
s.toStreamto a ProcessBuilder. Anyways, wouldn'tdef pipe(cmd: String): Stream[String] = (cmd +: s.toSeq).lineStreamalso work or am I missing something?