I'm attempting to combine two projects:
and
https://github.com/meanjs/mean
So far it's been going pretty well, but I've recently run into a problem I don't quite understand. The error I'm getting is "cursor has no method 'intervalEach'". intervalEach is a method that the code below adds to the prototype of the MongoDB driver's Cursor. See the code below:
'use strict';
/**
* Module dependencies.
*/
var init = require('./config/init')(),
config = require('./config/config'),
mongoose = require('mongoose'),
emitter = require("events").EventEmitter,
mongo = require("mongodb"),
QueryCommand = mongo.QueryCommand,
Cursor = mongo.Cursor,
Collection = mongo.Collection,
http = require('http'),
url = require('url'),
fs = require('fs');
/**
* Main application entry file.
* Please note that the order of loading is important.
*
* Sequence: resolve the MongoDB URI, connect mongoose, build the express app,
* configure passport, then start the HTTP listener and attach Socket.IO to it.
*/
// Heroku-style environment variables: prefer MONGOLAB_URI, otherwise fall back to a local "logdb" database.
var uristring = process.env.MONGOLAB_URI || "mongodb://localhost/logdb";
// Parsed form of the URI; the code below reads only its protocol and hostname for log messages.
var mongoUrl = url.parse (uristring);
// Bootstrap db connection
// NOTE(review): mongoose connects with config.db, while the native-driver connection
// further down uses uristring - confirm both point at the intended database.
var db = mongoose.connect(config.db);
// Init the express application
var app = require('./config/express')(db);
// Bootstrap passport config
require('./config/passport')();
// Start the app by listening on <port>
//app.listen(config.port);
// Init Socket.IO
// The value returned by app.listen(config.port) is handed to socket.io's listen(),
// which is why the standalone app.listen call above is commented out.
var io = require("socket.io").listen(app.listen(config.port));
// Monkey-patches the mongodb driver's Cursor with a throttled variant of each():
// every document fetch is deferred by "interval" milliseconds.
//
// interval - delay in milliseconds before each nextObject() request
// callback - function (err, item); receives (null, item) per document,
//            (err, null) on failure and (null, null) once the cursor is exhausted
// Throws an Error synchronously when no callback is supplied.
Cursor.prototype.intervalEach = function (interval, callback) {
  var cursor = this;

  if (!callback) {
    throw new Error("callback is mandatory");
  }

  // A cursor that is already closed cannot be iterated; report it through the callback.
  if (cursor.state == Cursor.CLOSED) {
    callback(new Error("Cursor is closed"), null);
    return;
  }

  // Each fetch is scheduled on a timer instead of recursing directly, which avoids a
  // stack overflow on deep callback chains
  // (cred: https://github.com/limp/node-mongodb-native/commit/27da7e4b2af02035847f262b29837a94bbbf6ce2)
  setTimeout(function fetchNext() {
    cursor.nextObject(function onDocument(err, item) {
      if (err != null) {
        return callback(err, null);
      }

      if (item == null) {
        // Nothing left to read: mark the cursor closed and signal the end.
        cursor.state = Cursor.CLOSED;
        callback(err, null);
        return;
      }

      // Deliver the document, then queue the next delayed fetch.
      callback(null, item);
      cursor.intervalEach(interval, callback);
    });
  }, interval);
};
//
// Open mongo database connection
// A capped collection is needed to use tailable cursors
//
// Flow: connect with the native driver, open the "log" collection, verify that it is
// capped, then hand it to startIOServer(). Any failure along the way is logged and the
// process exits (code 1 for errors, code 2 when the collection is not capped).
//
// Logged before connecting so the message is accurate even when the attempt fails.
console.log("Attempting connection to " + mongoUrl.protocol + "//" + mongoUrl.hostname + " (complete URL supressed).");
mongo.Db.connect(uristring, function (connectErr, db) {
  // Without this check a failed connection surfaces as a TypeError on db.collection below.
  if (connectErr || !db) {
    console.log("Error when connecting to " + mongoUrl.protocol + "//" + mongoUrl.hostname + ". Aborting. " + (connectErr || "No database handle returned."));
    process.exit(1);
    return;
  }
  db.collection("log", function (collectionErr, collection) {
    if (collectionErr || !collection) {
      console.log("Error when opening the log collection. Aborting. " + (collectionErr || "No collection returned."));
      process.exit(1);
      return;
    }
    collection.isCapped(function (err, capped) {
      if (err) {
        console.log("Error when detecting capped collection. Aborting. Capped collections are necessary for tailed cursors.");
        process.exit(1);
        return;
      }
      if (!capped) {
        console.log(collection.collectionName + " is not a capped collection. Aborting. Please use a capped collection for tailable cursors.");
        process.exit(2);
        return;
      }
      console.log("Success connecting to " + mongoUrl.protocol + "//" + mongoUrl.hostname + ".");
      startIOServer(collection);
    });
  });
});
//
// Bind send action to "connection" event
//
// collection - the collection handed over by the connection bootstrap; it is passed
//              unchanged to readAndSend() for every client that connects.
//
// The former "for testing" call Cursor.intervalEach(...) was removed: intervalEach is
// defined on Cursor.prototype only, so calling it on the constructor threw a TypeError
// before the "connection" handler below could be registered.
function startIOServer(collection) {
  console.log("Starting ...");
  // Many hosted environments do not support all transport forms currently, (specifically WebSockets).
  // So we force a relatively safe xhr-polling transport.
  // Modify io.configure call to allow other transports.
  /*io.configure(function () {
  io.set("transports", config[platform].transports); // Set config in ./config.js
  io.set("polling duration", 10);
  io.set("log level", 2);
  });*/
  io.sockets.on("connection", function (socket) {
    readAndSend(socket, collection);
  });
}
//
// Read and send data to socket.
// The real work is done here upon receiving a new client connection.
// Queries the database twice and starts sending two types of messages to the client:
//   "all"     - every document, one every 300 ms
//   "complex" - documents whose messagetype is "complex", one every 900 ms
// (known bug: if there are no documents in the collection, it doesn't work.)
//
// socket     - object with emit(type, item), used to push documents to the client
// collection - object with find(query, options, callback) yielding a cursor
//
function readAndSend(socket, collection) {
  streamQueryToSocket(socket, collection, {}, "all", 300);
  streamQueryToSocket(socket, collection, {"messagetype": "complex"}, "complex", 900);
}

// Opens one tailable, $natural-ordered cursor for `query` and emits each document it
// yields to `socket` as `messageType`, waiting `interval` ms between documents.
// Query and iteration errors are logged instead of being ignored.
function streamQueryToSocket(socket, collection, query, messageType, interval) {
  // Built per call so the two queries never share an options object.
  var options = {"tailable": 1, "sort": [
    ["$natural", 1]
  ]};
  collection.find(query, options, function (findErr, cursor) {
    // Without this guard a failed query surfaces as a TypeError on cursor.intervalEach.
    if (findErr || !cursor) {
      console.log("Error opening cursor for \"" + messageType + "\" messages: " + (findErr || "no cursor returned"));
      return;
    }
    // intervalEach() is a duck-punched version of each() that waits N milliseconds between each iteration.
    cursor.intervalEach(interval, function (eachErr, item) {
      if (eachErr) {
        console.log("Error reading \"" + messageType + "\" messages: " + eachErr);
        return;
      }
      if (item != null) {
        socket.emit(messageType, item); // sends to clients subscribed to this message type
      }
    });
  });
}
// Expose app
exports = module.exports = app;
// Logging initialization
console.log('LogFusion application started on port ' + config.port);
From what I can tell this is implemented correctly. If you view the project in the first link, their implementation works fine, so I'm not quite sure what I'm doing wrong.
My full source code can be grabbed here: LogFusion - Github