2

What I'm Working On

I'm attempting to write a Transform stream that will be used as part of a process that reads data from an HTTP API (Salesforce), transforms each resource into a standard internal data structure, and then sends the transformed data to be indexed by Elasticsearch.

My Problem

I've written a simple Transform stream, but it appears to be swallowing data, instead of passing it through.

Here's my class AdapterStream

const { Transform } = require('stream')

class AdapterStream extends Transform {
  constructor (adapter) {
    super({objectMode: true})
    // adapter is a function that contains the transform logic
    this.adapter = adapter
  }

  _transform (chunk, _encoding, callback) {
    this.push(this.adapter(chunk))
    callback()
  }
}

module.exports = AdapterStream

Here's how I'm using it

// query is an EventEmitter (https://jsforce.github.io/document/#query)
query.run({autoFetch: true, maxFetch: 10})
     .pipe(transform)
     .pipe(process.stdout)

What I expect to happen

I expect to see the transformed data in my terminal window.

What is actually happening

Nothing is printed in my terminal window

Things I've Tried

Writing to a file

query.run({autoFetch: true, maxFetch: 10})
     .pipe(transform)
     .pipe(fs.createWriteStream('./test.json'))

This creates the file ./test.json as expected, but it is empty.

Subscribing to events

query.run({autoFetch: true, maxFetch: 10})
     .pipe(transform)

transform.on('readable', () => { console.log(transform.read()) })

This does what I expected .pipe(process.stdout) to do: print the transformed records to the console.

My question is, why doesn't pipe() do what I'm expecting it to do? There must be something simple that I'm missing.

2
  • The node documentation points to what is probably the cause of my problem: Note: Care must be taken when using Transform streams in that data written to the stream can cause the Writable side of the stream to become paused if the output on the Readable side is not consumed. stream_implementing_a_transform_stream. I'm not sure what exactly that means for me though Commented Feb 16, 2018 at 22:51
  • Would you mind to provide a minimal example of a query that I can actually run to reproduce the issue? Commented Feb 18, 2018 at 8:44

1 Answer 1

1

You want to return the data as the second parameter to the callback. Like below.

const { Transform } = require('stream')

class AdapterStream extends Transform {
  constructor (adapter) {
    super({objectMode: true})
    // adapter is a function that contains the transform logic
    this.adapter = adapter
  }

  transform (chunk, _encoding, callback) {
    callback(null, this.adapter(chunk))
  }
}
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.