What I'm Working On
I'm attempting to write a Transform stream that will be used as part of a process that reads data from an HTTP API (Salesforce), transforms each resource into a standard internal data structure, and then sends the transformed data to be indexed by Elasticsearch.
My Problem
I've written a simple Transform stream, but it appears to be swallowing data instead of passing it through.
Here's my class AdapterStream
const { Transform } = require('stream')

class AdapterStream extends Transform {
  constructor (adapter) {
    super({ objectMode: true })
    // adapter is a function that contains the transform logic
    this.adapter = adapter
  }

  _transform (chunk, _encoding, callback) {
    this.push(this.adapter(chunk))
    callback()
  }
}

module.exports = AdapterStream
Here's how I'm using it
// query is an EventEmitter (https://jsforce.github.io/document/#query)
query.run({autoFetch: true, maxFetch: 10})
.pipe(transform)
.pipe(process.stdout)
What I expect to happen
I expect to see the transformed data in my terminal window.
What is actually happening
Nothing is printed in my terminal window.
Things I've Tried
Writing to a file
query.run({autoFetch: true, maxFetch: 10})
.pipe(transform)
.pipe(fs.createWriteStream('./test.json'))
This creates the file ./test.json as expected, but it is empty.
Subscribing to events
query.run({autoFetch: true, maxFetch: 10})
.pipe(transform)
transform.on('readable', () => { console.log(transform.read()) })
This does what I expected .pipe(process.stdout) to do: print the transformed records to the console.
My question is: why doesn't pipe() do what I expect? There must be something simple that I'm missing.
Note: The Node.js docs on implementing a Transform stream warn that "Care must be taken when using Transform streams in that data written to the stream can cause the Writable side of the stream to become paused if the output on the Readable side is not consumed." I'm not sure what exactly that means for me, though.