0

I'm attempting to combine two projects:

Real-time apps using MongoDB

and

https://github.com/meanjs/mean

So far it's been going pretty well, but I've recently ran into a problem I don't quite understand. The error I'm getting is "cursor has no method 'intervalEach'". Now intervalEach is a prototype of the MongoDB Cursor. See code below:

'use strict';
/**
 * Module dependencies.
 */
var init = require('./config/init')(),
    config = require('./config/config'),
    mongoose = require('mongoose'),
    emitter = require("events").EventEmitter,
    mongo = require("mongodb"),
    QueryCommand = mongo.QueryCommand,
    Cursor = mongo.Cursor,
    Collection = mongo.Collection,
    http = require('http'),
    url = require('url'),
    fs = require('fs');

/**
 * Main application entry file.
 * Please note that the order of loading is important.
 */

// Heroku-style environment variables
var uristring = process.env.MONGOLAB_URI || "mongodb://localhost/logdb";
var mongoUrl = url.parse (uristring);

// Bootstrap db connection
var db = mongoose.connect(config.db);

// Init the express application
var app = require('./config/express')(db);

// Bootstrap passport config
require('./config/passport')();


// Start the app by listening on <port>
//app.listen(config.port);

// Init Socket.IO
var io = require("socket.io").listen(app.listen(config.port));

// Duck-punching mongodb driver Cursor.each.  This now takes an interval that waits
// "interval" milliseconds before it makes the next object request...
Cursor.prototype.intervalEach = function (interval, callback) {
    var self = this;
    if (!callback) {
        throw new Error("callback is mandatory");
    }

    if (this.state != Cursor.CLOSED) {
        //FIX: stack overflow (on deep callback) (cred: https://github.com/limp/node-mongodb-native/commit/27da7e4b2af02035847f262b29837a94bbbf6ce2)
        setTimeout(function () {
            // Fetch the next object until there is no more objects
            self.nextObject(function (err, item) {
                if (err != null) return callback(err, null);

                if (item != null) {
                    callback(null, item);
                    self.intervalEach(interval, callback);
                } else {
                    // Close the cursor if done
                    self.state = Cursor.CLOSED;
                    callback(err, null);
                }

                item = null;
            });
        }, interval);
    } else {
        callback(new Error("Cursor is closed"), null);
    }
};

//
// Open mongo database connection
// A capped collection is needed to use tailable cursors
//
mongo.Db.connect(uristring, function (err, db) {
    console.log("Attempting connection to " + mongoUrl.protocol + "//" + mongoUrl.hostname + " (complete URL supressed).");
    db.collection("log", function (err, collection) {
        collection.isCapped(function (err, capped) {
            if (err) {
                console.log("Error when detecting capped collection.  Aborting.  Capped collections are necessary for tailed cursors.");
                process.exit(1);
            }
            if (!capped) {
                console.log(collection.collectionName + " is not a capped collection. Aborting.  Please use a capped collection for tailable cursors.");
                process.exit(2);
            }
            console.log("Success connecting to " + mongoUrl.protocol + "//" + mongoUrl.hostname + ".");
            startIOServer(collection);
        });
    });
});


//
// Bind send action to "connection" event
//
function startIOServer(collection) {
    console.log("Starting ...");

    // for testing
    Cursor.intervalEach(300,function(err,item){

    });
    // Many hosted environments do not support all transport forms currently, (specifically WebSockets).
    // So we force a relatively safe xhr-polling transport.
    // Modify io.configure call to allow other transports.

    /*io.configure(function () {
        io.set("transports", config[platform].transports); // Set config in ./config.js
        io.set("polling duration", 10);
        io.set("log level", 2);
    });*/
    io.sockets.on("connection", function (socket) {
        readAndSend(socket, collection);
    });
};

//
// Read and send data to socket.
// The real work is done here upon receiving a new client connection.
// Queries the database twice and starts sending two types of messages to the client.
// (known bug: if there are no documents in the collection, it doesn't work.)
//
function readAndSend(socket, collection) {
    collection.find({}, {"tailable": 1, "sort": [
        ["$natural", 1]
    ]}, function (err, cursor) {
        console.log(cursor)
        cursor.intervalEach(300, function (err, item) { // intervalEach() is a duck-punched version of each() that waits N milliseconds between each iteration.
            if (item != null) {
                socket.emit("all", item); // sends to clients subscribed to type "all"
            }
        });
    });
    collection.find({"messagetype": "complex"}, {"tailable": 1, "sort": [
        ["$natural", 1]
    ]}, function (err, cursor) {
        cursor.intervalEach(900, function (err, item) {
            if (item != null) {
                socket.emit("complex", item); // sends to clients subscribe to type "complex"
            }
        });
    });
};




// Expose app
exports = module.exports = app;

// Logging initialization
console.log('LogFusion application started on port ' + config.port);

From what I can tell this is implemented correctly. If you view the project in the first link, their implementation works fine, so I'm not quite sure what I'm doing wrong.

My full source code can be grabbed here: LogFusion - Github

2
  • The cursor returned by collection.find is not the same as the exported Cursor, in fact it is hidden and doesn't even use a prototype :| Commented Jun 2, 2014 at 13:13
  • @Esailija Turns out it is a versioning issue. If I use "mongodb": "1.2.13" and "socket.io": "0.9.13", then it works fine. Commented Jun 2, 2014 at 17:00

3 Answers 3

1

We hit this too, and reverting to version 1.2 of the mongodb driver won't work for us. In the current default version of the driver (1.4.6 according to npm), your perfectly reasonable code won't work because of https://github.com/mongodb/node-mongodb-native/blob/1.4/lib/mongodb/scope.js. Groan. Thanks for the pointer @Esailija.

Sign up to request clarification or add additional context in comments.

Comments

1

I came across this problem when resurrecting some old demo code from a Mongo User Group. It seems the intervalEach construct has been done away with in favour of the (more idiomatic) node event model.

This works for me with the latest socket.io (1.0.6) and mongodb (1.4.7)

var dataStream = collection.find({}, {tailable: true, awaitdata: true}).stream();

dataStream.on('data', function(data) { 
    // do something with the data
});

dataStream.on('error', function(error) { 
    // handle error
});

There's also a fuller gist on github and a blog article about it by Jaan Paljasma.

Comments

0

This was due to a versioning issue with MongoDB and Socket.IO

If I use "mongodb": "1.2.13" and "socket.io": "0.9.13", then it works fine

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.