I use this Node.js script to migrate a MongoDB collection from one schema to another. It works fine when the collection has fewer than ~20k documents, but on larger collections it slows down to a crawl and sometimes throws a
FATAL ERROR: JS Allocation failed - process out of memory
Is this caused by a memory leak? Or is it the intensive use of process.nextTick() (called once per document)?
My guess is that if the MongoDB server can't keep up with the saves (or there are simply too many documents), the pending callbacks pile up, eat away the CPU/RAM, and eventually crash the script.
Is my guess right? Is there a memory leak? What can I do to speed up the script and fix the existing error(s)?
var fs = require('fs'),
    mongoose = require('mongoose'),
    db = {},      //db.read will be the connection to the collection to migrate, db.write to the collection to migrate to
    S_logs = {},  //Mongoose schemas (read and write)
    M_logs = {},  //Mongoose models (read and write)
    config,       //Config file
    launched = 0, //Number of document migrations started
    ended = 0,    //Number of document migrations finished
    percent = 0,  //Used to calculate the progress bar
    stats = {
        ok: 0,    //Number of successful migrations
        error: 0  //Number of failed migrations
    };
function getCollection(callback) {
    console.log('|0% |10% |20% |30% |40% |50% |60% |70% |80% |90% |100%');
    M_logs.read.count({}, function (err, count) {
        var stream = M_logs.read.find().stream(); //Stream the entire collection, document by document
        stream.on('data', function (document) {
            launched = launched + 1;
            //Adapt the document to the new schema
            var P_log = {};
            for (var name in config.write.data) {
                if (config.write.data.hasOwnProperty(name)) {
                    P_log[name] = document[name] || '';
                }
            }
            //Clear the queue
            process.nextTick(function () {
                //Save the new document
                new M_logs.write(P_log).save(function (err) {
                    //Update the progress bar
                    while (ended > ((percent * count) / 100)) {
                        process.stdout.write('-');
                        percent = percent + 1;
                    }
                    //Update the stats
                    if (err) {
                        stats.error = stats.error + 1;
                    } else {
                        stats.ok = stats.ok + 1;
                    }
                    ended = ended + 1;
                });
            });
        }).on('error', function (err) {
            launched = launched + 1;
            while (ended > ((percent * count) / 100)) {
                process.stdout.write('-');
                percent = percent + 1;
            }
            ended = ended + 1;
        }).on('close', function () {
            process.nextTick(function () {
                //Wait for all transfers to end
                var wait = setInterval(function () {
                    if (ended === launched) {
                        clearInterval(wait);
                        console.log('\nTransfers started: ' + launched);
                        console.log('Transfers finished: ' + ended);
                        callback();
                    }
                }, 1000);
            });
        });
    });
}
function connect(callback) {
    db.read = mongoose.createConnection(config.read.url);
    db.read.on('error', console.error.bind(console, 'connection error:'));
    db.read.once('open', function () {
        db.write = mongoose.createConnection(config.write.url);
        db.write.on('error', console.error.bind(console, 'connection error:'));
        db.write.once('open', function () {
            S_logs.read = new mongoose.Schema(
                config.read.data,
                {
                    strict: false,
                    collection: config.read.base
                }
            );
            M_logs.read = db.read.model(config.read.base, S_logs.read, config.read.base);
            S_logs.write = new mongoose.Schema(
                config.write.data,
                {
                    collection: config.write.base
                }
            );
            S_logs.write.index(config.write.index, {unique: true});
            M_logs.write = db.write.model(config.write.base, S_logs.write, config.write.base);
            callback();
        });
    });
}
process.stdout.write('Reading config');
fs.readFile(process.argv[2] || 'config.js', function (err, data) {
    if (err) {
        console.log('Error in config.js: ' + err);
        process.exit();
    }
    config = JSON.parse(data);
    console.log('...OK');
    console.log('From ' + config.read.url + ' / ' + config.read.base);
    console.log('To ' + config.write.url + ' / ' + config.write.base);
    process.stdout.write('Connecting to Mongo');
    connect(function () {
        console.log('...OK');
        getCollection(function () {
            console.log('OK: ' + stats.ok + ' / Error: ' + stats.error);
            process.exit();
        });
    });
});
Example of a config file:
{
    "read":
    {
        "url" : "mongodb://localhost/read",
        "base" : "read_collection",
        "data":
        {
            "user_ip" : "String",
            "user_id" : "String",
            "user_agent" : "String",
            "canal_id" : "String",
            "theme_id" : "String",
            "video_id" : "String",
            "time" : "Number",
            "action" : "String",
            "is_newuser" : "String",
            "operator" : "String",
            "template" : "String",
            "catalogue" : "String",
            "referer" : "String",
            "from" : "String",
            "request" : "String",
            "smil" : "String",
            "smil_path" : "String"
        }
    },
    "write":
    {
        "url" : "mongodb://localhost/write",
        "base" : "write_collection",
        "data":
        {
            "user_ip" : "String",
            "user_id" : "String",
            "user_agent" : "String",
            "canal_id" : "String",
            "theme_id" : "String",
            "pays" : "String",
            "lang" : "String",
            "video_id" : "String",
            "time" : "Number",
            "action" : "String",
            "is_newuser" : "String",
            "operator" : "String",
            "template" : "String",
            "catalogue" : "String",
            "referer" : "String",
            "from" : "String",
            "request" : "String",
            "smil" : "String",
            "smil_path" : "String"
        },
        "index" :
        {
            "user_ip" : 1,
            "user_id" : 1,
            "time" : 1
        }
    }
}
Looking at the CPU and RAM usage while the script tries to transfer a 500k-document collection, I can see that the CPU is not really an issue (oscillating between 50% and 75% at most), but the RAM grows slowly and steadily. My guess is that I should regulate the number of documents waiting for an answer from the MongoDB server, and pause the stream until that number has fallen back to a reasonably low level...
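Something along these lines is what I have in mind for the 'data' handler, just to make the idea concrete (a rough, untested sketch; MAX_IN_FLIGHT, inFlight and streamPaused are names I made up, and I'm assuming the Mongoose query stream's pause()/resume() behave the way I expect):

//Rough sketch only: throttle the stream so that at most MAX_IN_FLIGHT saves are pending at once
var MAX_IN_FLIGHT = 100, //Arbitrary limit on documents waiting for the MongoDB server
    inFlight = 0,
    streamPaused = false;

stream.on('data', function (document) {
    launched = launched + 1;
    inFlight = inFlight + 1;
    if (!streamPaused && inFlight >= MAX_IN_FLIGHT) {
        streamPaused = true;
        stream.pause(); //Stop reading until the backlog of pending saves drains
    }
    //Adapt the document to the new schema (same as above)
    var P_log = {};
    for (var name in config.write.data) {
        if (config.write.data.hasOwnProperty(name)) {
            P_log[name] = document[name] || '';
        }
    }
    new M_logs.write(P_log).save(function (err) {
        inFlight = inFlight - 1;
        if (err) {
            stats.error = stats.error + 1;
        } else {
            stats.ok = stats.ok + 1;
        }
        ended = ended + 1;
        if (streamPaused && inFlight <= MAX_IN_FLIGHT / 2) {
            streamPaused = false;
            stream.resume(); //Resume reading once enough saves have completed
        }
    });
});

Is that the right approach, or is there a more standard way to handle this kind of backpressure?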