36

I have been searching for an example of how to stream the result of a MongoDB query to a Node.js client. All the solutions I have found so far seem to read the entire query result at once and only then send it back to the server.

Instead, I would (obviously) like to supply a callback to the query method and have MongoDB call it whenever the next chunk of the result set is available.

I have been looking at Mongoose. Should I use a different driver?

Jan

5 Answers

41

Besides the cursor API that others mentioned, node-mongodb-native (the underlying driver that every MongoDB client in Node.js uses) has a nice stream API (#458). Unfortunately, I did not find it documented anywhere else.

Update: there are docs.

It can be used like this:

var stream = collection.find().stream()
stream.on('error', function (err) {
  console.error(err)
})
stream.on('data', function (doc) {
  console.log(doc)
})

It actually implements the ReadableStream interface, so it has all the goodies (pause/resume, etc.).
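Because the cursor stream behaves like any other readable stream, it can be piped into an HTTP response. The following is only a minimal sketch: `fakeCursorStream`, `toNdjson` and `streamDocs` are hypothetical helper names, and `Readable.from` stands in for `collection.find().stream()` so the snippet runs without a database.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');

// Stand-in (assumption) for collection.find().stream(): an object-mode
// Readable that emits one document per 'data' event.
function fakeCursorStream(docs) {
  return Readable.from(docs);
}

// Turns each document object into one line of JSON text.
function toNdjson() {
  return new Transform({
    writableObjectMode: true,
    transform(doc, _encoding, callback) {
      callback(null, JSON.stringify(doc) + '\n');
    }
  });
}

// Pipes documents into `destination` (for example an http.ServerResponse).
// pipeline() forwards errors to `done` and applies backpressure, so a slow
// destination pauses the source instead of buffering everything in memory.
function streamDocs(source, destination, done) {
  pipeline(source, toNdjson(), destination, done);
}

// Usage: collect the output in memory instead of sending it over HTTP.
const collected = [];
const memorySink = new Writable({
  write(chunk, _encoding, callback) {
    collected.push(chunk.toString());
    callback();
  }
});
streamDocs(fakeCursorStream([{ name: 'a' }, { name: 'b' }]), memorySink, (err) => {
  if (err) console.error(err);
  else process.stdout.write(collected.join(''));
});
```

In a real handler, the first argument would be the cursor stream and the second the response object.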


3 Comments

I found the documentation that @Dan Milon is referring to on the mongodb.github.com website. Here it is: CursorStream
Didn't even know this existed! Thanks! It's at mongodb.github.com/node-mongodb-native, for that matter.
The documentation is now at docs.mongodb.com/drivers/node/current/fundamentals/crud/… - I've updated the post to this effect.
34

Streaming in Mongoose became available in version 2.4.0, which appeared three months after you posted this question:

Model.where('created').gte(twoWeeksAgo).stream().pipe(writeStream);

More elaborate examples can be found on their documentation page.

2 Comments

Mongoose: Query.prototype.stream() is deprecated in mongoose >= 4.5.0, use Query.prototype.cursor() instead
Can I execute 4 queries using streams and write to the same writable stream?
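On writing several query streams into one writable: one possible approach is to pipe the streams one after another with `{ end: false }`, so the destination stays open until the last one has finished. This is a sketch only; `pipeSequentially` is a hypothetical helper, and `Readable.from` stands in for the query streams.

```javascript
const { Readable, Writable } = require('stream');

// Pipes each source into `destination` in order. `{ end: false }` keeps the
// destination open between sources; it is ended once after the last one.
// The returned promise resolves when the destination has flushed everything.
function pipeSequentially(sources, destination) {
  return new Promise((resolve, reject) => {
    let index = 0;
    destination.once('error', reject);

    function next() {
      if (index === sources.length) {
        destination.once('finish', resolve);
        destination.end();
        return;
      }
      const source = sources[index++];
      source.once('error', reject);
      source.once('end', next);
      source.pipe(destination, { end: false });
    }

    next();
  });
}

// Usage: three stand-in "query" streams written to one in-memory sink.
const parts = [];
const sink = new Writable({
  write(chunk, _encoding, callback) {
    parts.push(chunk.toString());
    callback();
  }
});
pipeSequentially(
  [Readable.from(['a', 'b']), Readable.from(['c']), Readable.from(['d'])],
  sink
).then(() => console.log(parts.join('')));
```

Running the queries one at a time keeps each result set contiguous in the output; piping all of them at once would interleave their documents.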
11

Mongoose is not really a "driver"; it's actually an ODM (object-document mapper) wrapper around the MongoDB driver (node-mongodb-native).

To do what you're describing, take a look at the driver's .find and .each methods. Here's some code from the examples:

// Find all records. find() returns a cursor
collection.find(function(err, cursor) {
  sys.puts("Printing docs from Cursor Each");
  // each() calls back once per document, then once more with doc == null at the end
  cursor.each(function(err, doc) {
    if (doc != null) sys.puts("Doc from Each " + sys.inspect(doc));
  });
});

To stream the results, you basically replace that sys.puts with your own "stream" function. I'm not sure how you plan to stream the results; I think you can do response.write() + response.flush(), but you may also want to check out socket.io.
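As a rough sketch of that idea, the callback passed to each() can write every document to the response and end it when the terminating null arrives. `fakeEach` and `writeEach` are hypothetical names; `fakeEach` only imitates the each() callback contract so the snippet runs without a database.

```javascript
// Stand-in (assumption) for the driver's cursor.each(): invokes the callback
// once per document, then once more with a null document to mark the end.
function fakeEach(docs) {
  return function each(callback) {
    docs.forEach((doc) => callback(null, doc));
    callback(null, null);
  };
}

// Forwards every document to `res` as soon as it arrives and ends the
// response when the terminating null document (or an error) shows up.
function writeEach(each, res) {
  let finished = false;
  each((err, doc) => {
    if (finished) return;
    if (err) {
      finished = true;
      res.end('ERROR\n');
    } else if (doc !== null) {
      res.write(JSON.stringify(doc) + '\n');
    } else {
      finished = true;
      res.end();
    }
  });
}

// Usage with a minimal object that mimics the response's write()/end().
const lines = [];
const fakeResponse = {
  write(text) { lines.push(text); return true; },
  end(text) { if (text) lines.push(text); }
};
writeEach(fakeEach([{ name: 'a' }, { name: 'b' }]), fakeResponse);
console.log(lines.join(''));
```

This version ignores the return value of write(), so it does not slow the cursor down when the client reads slowly; a stream-based approach handles that case better.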

6 Comments

Thanks - the driver issue I found out about yesterday. The find/cursor solution I was expecting, but it is surprisingly hard to find examples. Most do a find and then docs.foreach( ... )
Update: Actually, I did not get this to work in the way you describe. What I had to do is to create an EventEmitter to glue the response stream and the data stream from the backend together.
You're right about the examples, best you can hope for is the "examples" folder in the source code. EventEmitter also sounds correct. If you have a good example, we can definitely update this answer with something more detailed.
Ok, edited your answer to point to mine so I can accept yours
@JanAlgermissen Do you have a link to a proof-of-concept? That would be handy.
4

Here is the solution I found (please correct me, anyone, if that is the wrong way to do it). Also, excuse the bad coding; it's too late for me to prettify this now.

var sys = require('sys')
var http = require("http");

var Db = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Db,
  Connection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Connection,
  Collection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Collection,
  Server = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Server;

var db = new Db('test', new Server('localhost',Connection.DEFAULT_PORT , {}));

var products;

db.open(function (error, client) {
  if (error) throw error;
  products = new Collection(client, 'products');
});

function ProductReader(collection) {
        this.collection = collection;
}

ProductReader.prototype = new (require('events').EventEmitter)();

ProductReader.prototype.do = function() {
        var self = this;

        this.collection.find(function(err, cursor) {
                if (err) {
                        self.emit('e1');
                        return;

                }
                sys.puts("Printing docs from Cursor Each");

                self.emit('start');
                cursor.each(function(err, doc) {
                        if (err) {
                                self.emit('e2');
                                self.emit('end');
                                return;
                        }

                        if(doc != null) {
                                sys.puts("doc:" + doc.name);
                                self.emit('doc',doc);
                        } else {
                                self.emit('end');
                        }
                })
        });
};
http.createServer(function(req,res){
        var pr = new ProductReader(products);
        pr.on('e1',function(){
                sys.puts("E1");
                res.writeHead(400,{"Content-Type": "text/plain"});
                res.write("e1 occurred\n");
                res.end();
        });
        pr.on('e2',function(){
                sys.puts("E2");
                res.write("ERROR\n");
        });

        pr.on('start',function(){
                sys.puts("START");
                res.writeHead(200,{"Content-Type": "text/plain"});
                res.write("<products>\n");
        });

        pr.on('doc',function(doc){
                sys.puts("A DOCUMENT" + doc.name);
                res.write("<product><name>" + doc.name + "</name></product>\n");
        });

        pr.on('end',function(){
                sys.puts("END");
                res.write("</products>");
                res.end();
        });

        pr.do();

  }).listen(8000);


0

I have been studying MongoDB streams myself. While I do not have the entire answer you are looking for, I do have part of it: you can set up a socket.io stream.

This uses JavaScript with socket.io and socket.io-stream (both available on npm), plus MongoDB for the database, because using a 40-year-old database that has issues is the wrong choice; it's time to modernize. (The 40-year-old database is SQL, and SQL doesn't do streams, to my knowledge.)

Although you only asked about data going from server to client, I also want to cover client to server in my answer, because I can NEVER find it anywhere when I search, and I wanted to put both the send and receive sides of streaming in one place so everyone can get the hang of it quickly.

client side sending data to server via streaming

var stream = ss.createStream();
var blobstream = ss.createBlobReadStream(data);
blobstream.pipe(stream);
// The event name must match the one the server listens for ('data.stream.out')
ss(socket).emit('data.stream.out', stream, {}, function (err, successful_db_insert_id) {
  // if you get back the id, it went into the db and everything worked
});

server receiving stream from the client side and then replying when done

ss(socket).on('data.stream.out', function (stream, o, c) {
  var buffer = [];
  stream.on('data', function (chunk) { buffer.push(chunk); });
  stream.on('end', function () {
    var data = Buffer.concat(buffer);
    db.insert(data, function (err, res) {
      if (err) return c(err);
      // Assumption: the insert result exposes the new ids as res.insertedIds
      c(null, res.insertedIds[0]);
    });
  });
});

This is the other half: fetching the data and streaming it to the client.

client side requesting and receiving stream data from server

var stream = ss.createStream();
var binarystring = '';
stream.on('data', function (chunk) {
  for (var i = 0; i < chunk.length; i++) {
    binarystring += String.fromCharCode(chunk[i]);
  }
});
// o (options) and c (callback) come from the surrounding code
stream.on('end', function () { var data = window.btoa(binarystring); c(null, data); });
ss(socket).emit('data.stream.get', stream, o, c);

server side replying to request for streaming data

ss(socket).on('data.stream.get',function(stream,o,c){
 stream.on('end',function(){
  c(null,true);
 });
 db.find().stream().pipe(stream);
});

The very last one there is the only one where I am kind of just pulling it out of my butt, because I have not tried it yet, but it should work. I actually do something similar, except that I write the file to the hard drive and then use fs.createReadStream to stream it to the client. So I'm not 100% sure, but from what I have read it should work. I'll get back to you once I test it.

P.s. anyone want to bug me about my colloquial way of talking, I'm Canadian, and I love saying "eh" come at me with your hugs and hits bros/sis' :D
