I have a csv parser implemented as a series of transform streams:

process.stdin
    .pipe(iconv.decodeStream('win1252'))
    .pipe(csv.parse())
    .pipe(buildObject())
    .pipe(process.stdout);

I'd like to abstract the parser (in its own module) and be able to do:

process.stdin
    .pipe(parser)
    .pipe(process.stdout);

where parser is just the composition of the previously used transform streams.

If I do

var parser = iconv.decodeStream('win1252')
    .pipe(csv.parse())
    .pipe(buildObject());

then parser is set to the buildObject() stream (since .pipe() returns its destination), and only that last transform stream receives the data piped in.

If I do

var parser = iconv.decodeStream('win1252');
parser
    .pipe(csv.parse())
    .pipe(buildObject());

that doesn't work either: .pipe(process.stdout) is then called on the first transform stream, so the other two are bypassed.

Any recommendation for an elegant composition of streams?

5 Answers

Unfortunately, there is no built-in way to do that, but there is the cool multipipe package. Use it like this:

var multipipe = require('multipipe');

var parser = multipipe(iconv.decodeStream('win1252'), csv.parse(), buildObject());
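With that, the parser can be used exactly as hoped for in the question (a sketch, assuming the same iconv, csv and buildObject streams):

process.stdin
    .pipe(parser)
    .pipe(process.stdout);

multipipe also forwards errors from the inner streams, so a single 'error' listener on parser covers the whole chain.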

2 Comments

This is brilliant. Also, multipipe's source code is very short, so the implementation is worth checking out for anyone who values a look under the hood.
There is also pumpify which similarly allows you to compose multiple streams.
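For reference, the pumpify version is nearly identical (a sketch, assuming the same streams as in the question; pumpify.obj is the variant for object-mode streams):

var pumpify = require('pumpify');

var parser = pumpify(iconv.decodeStream('win1252'), csv.parse(), buildObject());

Unlike a plain .pipe() chain, pumpify destroys the whole chain if any inner stream errors or closes.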

As of 2022 and Node.js v16, there is a new compose function in the stream module that builds a Duplex stream from a list of streams.

See: https://nodejs.org/api/stream.html#streamcomposestreams

It works with .pipe() as well as with async iteration syntax.
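A minimal sketch, assuming Node.js >= 16.9 and the streams from the question:

const { compose } = require('stream');

const parser = compose(iconv.decodeStream('win1252'), csv.parse(), buildObject());

process.stdin
    .pipe(parser)
    .pipe(process.stdout);

The composed stream is also async iterable, so for await (const row of process.stdin.pipe(parser)) works too.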

I've been struggling with this issue (and some others!), and I found that highlandjs solved nearly all my problems. In this case, its pipeline function did the trick:

var h = require('highland');
var parser = h.pipeline(iconv.decodeStream('win1252'), csv.parse(), buildObject());

1 Comment

I would've sold my first child for this answer. Thank you!

I think this can be done natively now.

const { PassThrough, Transform } = require('stream');

const compose = (...streams) => {
  const first = new PassThrough();
  const last = new PassThrough();
  const result = new Transform();

  // Chain the streams together and re-emit inner errors on the
  // composed stream.
  [first, ...streams, last].reduce(
    (chain, stream) => (
      stream.on('error', (error) => result.emit('error', error)),
      chain.pipe(stream)
    ),
  );

  // Forward everything that comes out of the tail of the chain;
  // one input chunk may produce zero or many output chunks.
  last.on('data', (chunk) => result.push(chunk));

  // Feed incoming chunks into the head of the chain.
  result._transform = (chunk, enc, cb) => {
    first.push(chunk, enc);
    cb();
  };

  // On end, close the head of the chain and wait for the tail to drain.
  result._flush = (cb) => {
    last.once('end', () => cb(null));
    first.push(null);
  };

  return result;
};
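A quick usage sketch with the streams from the question; the 'error' listener picks up errors forwarded from any inner stream by the reduce step above:

const parser = compose(iconv.decodeStream('win1252'), csv.parse(), buildObject());

parser.on('error', (err) => console.error('parsing failed:', err));

process.stdin
    .pipe(parser)
    .pipe(process.stdout);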

While stream.compose is still experimental, you can use stream.pipeline to wrap all the streams into one stream. The resulting stream is composable into other compositions at any position.

import { pipeline, Transform } from 'stream'

export const compose = (...streams) => {
  const first = streams[0]

  // pipeline() wires the streams together and returns the last one;
  // the empty callback keeps inner errors from crashing the process
  // (they are re-emitted on the wrapper below).
  const last = pipeline(
    ...streams,
    function emptyErrorHandler() {}
  )

  const stream = new Transform({
    writableObjectMode: first.writableObjectMode,
    readableObjectMode: last.readableObjectMode,
    // Forward writes into the head of the chain.
    transform(chunk, enc, cb) {
      first.write(chunk, enc, cb)
    },
    // On end, close the head and wait for the tail to finish.
    flush(cb) {
      first.end()
      last.once('finish', cb)
    }
  })

  // Surface the tail's output and errors on the wrapper stream.
  last.on('data', chunk => stream.emit('data', chunk))
  last.once('error', err => stream.emit('error', err))

  return stream
}
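Since the wrapper is a regular Transform, compositions nest, which is what makes it usable at any position; a sketch, assuming the streams from the question:

const decodeAndParse = compose(iconv.decodeStream('win1252'), csv.parse())
const parser = compose(decodeAndParse, buildObject())

process.stdin.pipe(parser).pipe(process.stdout)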
