
Consider the following:

var fs = require('fs')
var JSONstream = require('JSONStream')   // npm package "JSONStream"

var asyncFunction = function(data, callback) {
  doAsyncyThing(data, function(err, result) {
    // do some stuff with result
    return callback(err)
  })
}

fs.createReadStream('eupmc_lite_metadata_2016_04_15.json')
  .pipe(JSONstream.parse())
  .on('data', asyncFunction)   // <- how to let asyncFunction complete before continuing

How does the stream know when asyncFunction has completed? Is there any way to use asynchronous functions from within streams?

  • I'm not sure how the stream will handle the callback since the documentation doesn't show two parameters in on('data', function). If you do want to do something fancy though, you can pause the stream, do your stuff, then resume the stream. Commented Apr 16, 2016 at 11:01
  • @DaveBriand are you saying categorically that you cannot do this, or are you restating the question? :) Commented Apr 16, 2016 at 13:47
  • Categorically you can't pass a two argument function to the data stream event. However, you can pause the stream on the data event, do some asynchronous processing, then resume the stream when your processing is complete. Commented Apr 18, 2016 at 14:42
  • Great! Is there a cleanish way to code this? Could you give an example? Commented Apr 19, 2016 at 20:44
  • Just for clarity: yes, on('data', asyncFunction) cannot deal with callbacks, since asyncFunction must be in the form function(data). My point is: "how then do you deal with callbacks?" Commented Aug 10, 2016 at 16:51

2 Answers


Check out transform streams. They give you the ability to run async code on a chunk, and then call a callback when you are finished. Here are the docs: https://nodejs.org/api/stream.html#transform_transformchunk-encoding-callback

As a simple example, you can do something like:

const fs = require('fs')
const JSONstream = require('JSONStream')
const Transform = require('stream').Transform

class WorkerThing extends Transform {
  constructor() {
    // JSONstream.parse() emits parsed objects, so run in object mode
    super({ objectMode: true })
  }
  _transform(chunk, encoding, cb) {
    // the stream will not deliver the next chunk until cb is called
    asyncFunction(chunk, cb)
  }
}

const workerThing = new WorkerThing()

fs.createReadStream('eupmc_lite_metadata_2016_04_15.json')
  .pipe(JSONstream.parse())
  .pipe(workerThing)

1 Comment

  • I didn't really understand what you were saying at first, but yes, I see now that transform streams are probably the way forward. Will try this out.

I think this is enough:

const fs = require('fs')
const JSONstream = require('JSONStream')
const Transform = require('node:stream').Transform

const deferTransform = new Transform({
  objectMode: true, // JSONstream.parse() emits parsed objects, not raw buffers
  transform: (chunk, encoding, next) => {
    // any promise can settle before next() hands the chunk on
    Promise.resolve(chunk).then((data) =>
      next(null, data)
    );
  },
});

fs.createReadStream('eupmc_lite_metadata_2016_04_15.json')
  .pipe(JSONstream.parse())
  .pipe(deferTransform)

