Environment: NodeJS, Express, DynamoDB (but could be any database really)
Scenario: I need to read a large number of records and return them to the user as a downloadable file. This means that I cannot buffer all the content and then send it in a single response from Express. I may also need to execute the query multiple times, since a single query might not return all the data.
Proposed Solution: Use a readable stream that can be piped to the response stream in Express.
I started by creating an object that inherits from stream.Readable and implemented a _read() method which pushes the query results. The problem is that the database query invoked in _read() is async but the stream.read() is a sync method.
When the stream is piped to the server's response, read() is invoked several times before the db query even gets a chance to execute. So the query is invoked multiple times, and even after the first instance of the query finishes and does a push(null), the other queries complete and I get a "push() after EOF" error.
- Is there a way to do this properly with _read()?
- Should I forget about _read() and just execute the query and push() results in the constructor?
- Should I execute the query and emit data events instead of push()?
Thank you
var util = require('util');
var Readable = require('stream').Readable;
// dynamoDB is assumed to be an already configured DynamoDB client.

function DynamoDbResultStream(query, options) {
  // Allow calling the constructor without `new`.
  if (!(this instanceof DynamoDbResultStream)) {
    return new DynamoDbResultStream(query, options);
  }
  // Items are pushed as objects, so options should include objectMode: true.
  Readable.call(this, options);
  this.dbQuery = query;
  this.done = false;
}
util.inherits(DynamoDbResultStream, Readable);

DynamoDbResultStream.prototype._read = function () {
  var self = this;
  if (!this.done) {
    // Nothing here prevents a second query from starting while one is still running.
    dynamoDB.query(this.dbQuery, function (err, data) {
      if (!err) {
        try {
          for (var i = 0; i < data.Items.length; i++) {
            self.push(data.Items[i]);
          }
        } catch (err) {
          console.log(err);
        }
        if (data.LastEvaluatedKey) {
          // Next read() should invoke the query with a new start key
          self.dbQuery.ExclusiveStartKey = data.LastEvaluatedKey;
        } else {
          self.done = true;
          self.push(null);
        }
      } else {
        console.log(err);
        self.emit('error', err);
      }
    });
  } else {
    self.push(null);
  }
};
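One way to keep _read() from starting overlapping queries is a flag that stays set while a query is in flight. The following is only a minimal sketch of that idea, not a definitive implementation: PagedQueryStream, makeFakeQuery and the fetching flag are made-up names, and the fake query merely imitates the callback shape of dynamoDB.query so that the example runs without a database. Writing each item as one JSON line is also an assumption about the desired file format.

```javascript
const { Readable } = require('stream');

// Hypothetical stand-in for dynamoDB.query(params, callback): serves `pages`
// one at a time and uses the next page index as LastEvaluatedKey.
function makeFakeQuery(pages) {
  let calls = 0;
  function query(params, callback) {
    calls += 1;
    const index = params.ExclusiveStartKey || 0;
    const hasMore = index + 1 < pages.length;
    setImmediate(() => callback(null, {
      Items: pages[index],
      LastEvaluatedKey: hasMore ? index + 1 : undefined
    }));
  }
  query.callCount = () => calls;
  return query;
}

class PagedQueryStream extends Readable {
  constructor(queryFn, params) {
    super();
    this.queryFn = queryFn;
    this.params = Object.assign({}, params); // copy, so the caller's object is not mutated
    this.fetching = false;
    this.done = false;
  }

  _read() {
    // Ignore read requests while a query is running or after the last page.
    if (this.fetching || this.done) return;
    this.fetching = true;
    this.queryFn(this.params, (err, data) => {
      this.fetching = false;
      if (err) {
        this.destroy(err);
        return;
      }
      for (const item of data.Items) {
        // One JSON line per item, so the stream can be piped to a response.
        this.push(JSON.stringify(item) + '\n');
      }
      if (data.LastEvaluatedKey !== undefined) {
        this.params.ExclusiveStartKey = data.LastEvaluatedKey;
        // An empty page pushes nothing, so no further _read() would follow; ask again.
        if (data.Items.length === 0) this._read();
      } else {
        this.done = true;
        this.push(null); // signal end of stream exactly once
      }
    });
  }
}
```

With a real client, something like new PagedQueryStream(dynamoDB.query.bind(dynamoDB), query).pipe(res) should work in an Express handler, assuming the callback-style query shown in the question.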
EDIT: After posting this question, I've found this post with an answer that shows how to do it without using inheritance: How to call an asynchronous function inside a node.js readable stream
A comment there says that there should be only one push() inside _read(), and that each push() will usually trigger another read() invocation.
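On a reasonably recent Node.js version (Readable.from was added in the 12.x line), the no-inheritance approach can also be written with an async generator, which hands over one chunk at a time and only requests the next page once the current one has been consumed. This is a hedged sketch under assumptions: queryItems, createResultStream and makeFakePagedQuery are hypothetical names, and the fake function stands in for a promise-returning wrapper around dynamoDB.query.

```javascript
const { Readable } = require('stream');

// Hypothetical promise-returning query, standing in for a promisified dynamoDB.query.
function makeFakePagedQuery(pages) {
  return async function queryPage(params) {
    const index = params.ExclusiveStartKey || 0;
    const hasMore = index + 1 < pages.length;
    return {
      Items: pages[index],
      LastEvaluatedKey: hasMore ? index + 1 : undefined
    };
  };
}

// Yields one serialized item at a time; the next page is requested only after
// the consumer has pulled every item of the current page.
async function* queryItems(queryPage, params) {
  const request = Object.assign({}, params); // copy, so the caller's object is not mutated
  let page;
  do {
    page = await queryPage(request);
    for (const item of page.Items) {
      yield JSON.stringify(item) + '\n';
    }
    request.ExclusiveStartKey = page.LastEvaluatedKey;
  } while (page.LastEvaluatedKey !== undefined);
}

function createResultStream(queryPage, params) {
  // Readable.from wraps the generator; each yielded string becomes one chunk.
  return Readable.from(queryItems(queryPage, params));
}
```

Because only one await is pending at any time, overlapping queries and a push() after EOF cannot occur in this variant; the stream ends when the generator returns.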
scramjet module, but I don't have such a simple readable interface yet. If you're still interested I could show you how to do asynchronous stream mapping that would fit the above scenario very well.