I am trying to upload and insert large csv files (100K's of rows; 10-100M+) into mongo.
The code below is the route I use to accept input from a form and insert the record first into a meta-data collection for all my CSVs, and then insert the records of the CSV into its own collection. It works for smaller files (thousands of rows) but takes way too long when it gets in the order of 50K+ rows.
The next snippet is using the csv stream for larger files (see below) but I get errors when trying to use the stream.
Question: Can someone help modify the first example into a stream so that it will process large CSVs without hanging?
exports.addCSV = function(req,res){
var body = req.body;
fileSystem.renameSync(req.files.myCSV.path, 'uploads/myFile', function(err){
if(err){
fileSystem.unlink(req.files.myCSV.path, function(){});
throw error;
}
});
var myObject = { userid: body.userid,
name: body.name,
description: body.description
};
var MongoClient = require('mongodb').MongoClient;
MongoClient.connect('mongodb://localhost:27017/csvdb', function(err, db){
if(err) throw err;
var collection = db.collection('myCSVs');
collection.insert(myObject, function(err, insertedMyObject){
csvParser.mapFile('uploads/myFile', function(err, allRows){
if (err) throw err;
var collectionId = "Rows_ForID_" + insertedMyObject[0]._id;
for (r in allRows) {
allRows[r].metric = parseFloat(allRows[r].metric);
}
var finalcollection = db.collection(collectionId);
finalcollection.insert(allRows, function(err, insertedAllRows) {
if (err) {
res.send(404, "Error");
}
else {
res.send(200);
}
});
});
});
});
}
EDIT (To get people to remove the Hold status):
I tried this approach using the stream:
exports.addCSV = function(req,res){
var body = req.body;
fileSystem.renameSync(req.files.myCSV.path, 'uploads/myFile', function(err){
if(err){
fileSystem.unlink(req.files.myCSV.path, function(){});
throw error;
}
});
var myObject = { userid: body.userid,
name: body.name,
description: body.description
};
var MongoClient = require('mongodb').MongoClient;
MongoClient.connect('mongodb://localhost:27017/csvdb', function(err, db){
if(err) throw err;
var collection = db.collection('myCSVs');
collection.insert(myObject, function(err, insertedMyObject){
var collectionId = "Rows_ForID_" + insertedMyObject[0]._id;
var finalcollection = db.collection(collectionId);
var q = async.queue(finalcollection.insert.bind(finalcollection), 5);
q.drain = function() {
console.log('all items have been processed');
}
csv()
.from.path('uploads/myFile', {columns: true})
.transform(function(data, index, cb){
q.push(data, cb);
})
.on('end', function () {
res.send(200);
console.log('on.end() executed');
})
.on('error', function (err) {
res.end(500, err.message);
console.log('on.error() executed');
});
});
});
}
But I get this error:
events.js:72
throw er; // Unhandled 'error' event
^
TypeError: object is not a function
Third, I tried this streaming approach:
// Insert queue with up to 5 concurrent workers. The original version had
// three defects: `finalollection` was a typo; `insert.bind(task, ...)` only
// CREATED a bound function and never executed the insert; and `callback()`
// fired synchronously, so q.drain ran before any insert had completed.
var q = async.queue(function (task, callback) {
// Actually perform the insert, and defer the queue callback until the
// write finishes so drain means "all rows are in the database".
finalcollection.insert(task, function (err, row) {
callback(err);
});
}, 5);
q.drain = function() {
console.log('all items have been processed');
}
csv()
.from.path('uploads/myFile', {columns: true})
.transform(function(data, index, cb){
// Pass the parser's callback through to the queue so the stream pauses
// (back-pressure) while inserts are outstanding; the original dropped
// `cb`, letting the parser race far ahead of the database.
q.push(data, cb);
})
.on('end', function () {
res.send(200);
console.log('on.end() executed');
})
.on('error', function (err) {
// res.end() accepts no status code; res.send(status, body) does.
res.send(500, err.message);
console.log('on.error() executed');
});
This inserts a few and then aborts:
all items have been processed
all items have been processed
Error: Request aborted
at IncomingMessage.<anonymous>
This one actually tries to insert multiple copies of the same CSV into the db. Finally, I tried the one-liner definition of q:
// Worker = insert() bound to its collection, 5 concurrent workers:
// each q.push(row, done) becomes finalcollection.insert(row, done).
var q = async.queue(finalcollection.insert.bind(finalcollection), 5);
Along with:
.transform(function(data, index, cb){
q.push(data,function (err) {
console.log('finished processing foo');
});
})
And it inserts the collection several times and aborts each time (below is the output that happens each time - why is it not exiting correctly and re-inserting?):
finished processing foo
finished processing foo
finished processing foo
finished processing foo
finished processing foo
all items have been processed
Error: Request aborted
at IncomingMessage.<anonymous> (.../node_modules/express/node_modules/connect/node_modules/multiparty/index.js:93:17)
at IncomingMessage.EventEmitter.emit (events.js:92:17)
at abortIncoming (http.js:1892:11)
at Socket.serverSocketCloseListener (http.js:1904:5)
at Socket.EventEmitter.emit (events.js:117:20)
at TCP.close (net.js:466:12)
`mapFile` reads the entire CSV file into memory, for instance, which may cause swapping if your server is low on free memory. Also, instead of bulk-inserting all records at once with `insert`, try to split them up into more manageable chunks. And don't forget to use `for (var r in ...)` to prevent creating an overwritable global variable.