
I am trying to upload and insert large CSV files (hundreds of thousands of rows; 10-100 MB+) into MongoDB.

The code below is the route I use to accept input from a form, insert a record into a meta-data collection for all my CSVs, and then insert the CSV's rows into its own collection. It works for smaller files (thousands of rows) but takes way too long once it gets into the order of 50K+ rows.

The next snippet uses the csv stream for larger files (see below), but I get errors when trying to use the stream.

Question: Can someone help modify the first example into a stream so that it will process large CSVs without hanging?

exports.addCSV = function(req,res){

var body = req.body;

// fs.renameSync is synchronous and takes no callback, so wrap it in
// try/catch instead:
try {
    fileSystem.renameSync(req.files.myCSV.path, 'uploads/myFile');
} catch (err) {
    fileSystem.unlink(req.files.myCSV.path, function(){});
    throw err;
}

var myObject = {  userid: body.userid,
                  name: body.name, 
                  description: body.description 
               };

var MongoClient = require('mongodb').MongoClient;
MongoClient.connect('mongodb://localhost:27017/csvdb', function(err, db){

  if(err) throw err;

  var collection = db.collection('myCSVs');

  collection.insert(myObject, function(err, insertedMyObject){

        csvParser.mapFile('uploads/myFile', function(err, allRows){
                if (err) throw err;

                var collectionId = "Rows_ForID_" + insertedMyObject[0]._id;

                for (r in allRows) {
                    allRows[r].metric = parseFloat(allRows[r].metric);
                }

                var finalcollection = db.collection(collectionId);
                finalcollection.insert(allRows, function(err, insertedAllRows) {
                        if (err) {
                            res.send(404, "Error");
                        }
                        else { 
                            res.send(200);
                        }
                });
        });
    });
});

}

EDIT (To get people to remove the Hold status):

I tried this approach using the stream:

exports.addCSV = function(req,res){

  var body = req.body;

  // fs.renameSync is synchronous and takes no callback, so wrap it in
  // try/catch instead:
  try {
    fileSystem.renameSync(req.files.myCSV.path, 'uploads/myFile');
  } catch (err) {
    fileSystem.unlink(req.files.myCSV.path, function(){});
    throw err;
  }

  var myObject = {  userid: body.userid,
                name: body.name, 
                description: body.description 
             };

  var MongoClient = require('mongodb').MongoClient;
  MongoClient.connect('mongodb://localhost:27017/csvdb', function(err, db){

    if(err) throw err;

    var collection = db.collection('myCSVs');

    collection.insert(myObject, function(err, insertedMyObject){

      var collectionId = "Rows_ForID_" + insertedMyObject[0]._id;
      var finalcollection = db.collection(collectionId);
      var q = async.queue(finalcollection.insert.bind(finalcollection), 5);

      q.drain = function() {
          console.log('all items have been processed');
      }

      csv()
      .from.path('uploads/myFile', {columns: true})
      .transform(function(data, index, cb){

              q.push(data, cb); 

      })
      .on('end', function () {
          res.send(200);
          console.log('on.end() executed');
      })
      .on('error', function (err) {
          res.end(500, err.message);
          console.log('on.error() executed');
      });

  });

 });

}

But I get this error:

events.js:72
    throw er; // Unhandled 'error' event
          ^
TypeError: object is not a function

Third, I tried this streaming approach:

var q = async.queue(function (task,callback) {
finalcollection.insert.bind(task,function(err, row) { });
callback();
}, 5);

q.drain = function() {
    console.log('all items have been processed');
}

csv()
.from.path('uploads/myFile', {columns: true})
.transform(function(data, index, cb){
    q.push(data) 
})
.on('end', function () {
    res.send(200);
    console.log('on.end() executed');
})
.on('error', function (err) {
    res.end(500, err.message);
    console.log('on.error() executed');
});

This inserts a few and then aborts:

all items have been processed
all items have been processed
Error: Request aborted
    at IncomingMessage.<anonymous>      

This one actually tries to insert the same CSV into the database multiple times. Finally, I tried the one-line definition of q:

var q = async.queue(finalcollection.insert.bind(finalcollection), 5);

Along with:

.transform(function(data, index, cb){

                q.push(data,function (err) {
                    console.log('finished processing foo');
                });

})

And it inserts the collection several times and aborts each time (below is the output from each attempt). Why is it not exiting correctly, and why does it re-insert?

finished processing foo
finished processing foo
finished processing foo
finished processing foo
finished processing foo
all items have been processed

Error: Request aborted
    at IncomingMessage.<anonymous>    (.../node_modules/express/node_modules/connect/node_modules/multiparty/index.js:93:17)
    at IncomingMessage.EventEmitter.emit (events.js:92:17)
    at abortIncoming (http.js:1892:11)
    at Socket.serverSocketCloseListener (http.js:1904:5)
    at Socket.EventEmitter.emit (events.js:117:20)
    at TCP.close (net.js:466:12)
  • Refer to the answer here: stackoverflow.com/questions/8045838/… Commented Dec 21, 2013 at 19:50
  • Try to determine where the slowness is coming from. mapFile reads the entire CSV file into memory, for instance, which may cause swapping if your server is low on free memory. Also, instead of bulk-inserting all records at once with insert, try to split them up into more manageable chunks. And don't forget to use for (var r in ...) to prevent creating an overwritable global variable. Commented Dec 21, 2013 at 20:17
  • phineas, I already looked at that post and there is no solution. This isn't a command line problem. robertkelp, too vague. Commented Dec 21, 2013 at 20:28
  • Sure there is a solution! Simply split the CSV into several suitably sized chunks on the file system and insert the chunks separately. Commented Dec 21, 2013 at 20:53
  • Could you provide details of the timings to show where the slowness is occurring? It's not clear from reading your question. You may be saturating the DB. You might turn off write concerns for the connection. Commented Dec 21, 2013 at 23:13
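
One of the suggestions above, splitting the rows into smaller batches instead of one huge bulk insert, could be sketched like this. The `chunkRows` helper and the batch size of 1000 are my own illustration, not from the original post:

```javascript
// Split an array of parsed CSV rows into fixed-size batches, so that
// each insert call carries a manageable number of documents instead of
// the whole file at once.
function chunkRows(rows, size) {
  var chunks = [];
  for (var i = 0; i < rows.length; i += size) {
    chunks.push(rows.slice(i, i + size));
  }
  return chunks;
}

// Usage sketch against the question's code (untested):
// var batches = chunkRows(allRows, 1000);
// async.eachSeries(batches, function (batch, done) {
//   finalcollection.insert(batch, done);
// }, function (err) {
//   if (err) return res.send(500);
//   res.send(200);
// });
```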

1 Answer


You should deal with a big file using streams.

Here is a possible solution:

var queue = async.queue(collection.insert.bind(collection), 5);

csv()
.from.path('./input.csv', { columns: true })
.transform(function (data, index, cb) {
    queue.push(data, function (err, res) {
        if (err) return cb(err);
        cb(null, res[0]);
    });
})
.on('error', function (err) {
    res.send(500, err.message);
})
.on('end', function () {
    queue.drain = function() {
        res.send(200);
    };
});

Please note:

  • that we use the stream API of node-csv, which ensures that the data is processed at the same time as the file is read: in this way the whole file isn't read into memory at once. The transform handler is executed for each record;
  • that we use async.queue, which is an asynchronous processing queue: at most 5 handlers (finalcollection.insert) are executed in parallel.

This example should be tested, as I'm not really sure that it handles back pressure really well. Also, the concurrency level of the queue should be adjusted to your specific configuration.
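
The bounded-concurrency behaviour of `async.queue` can be illustrated with a small self-contained stand-in. `limitedQueue` and the stub worker below are my own sketch, not async's actual implementation, and the stub stands in for an in-flight `finalcollection.insert`:

```javascript
// Minimal stand-in for async.queue(worker, limit): at most `limit`
// tasks are handed to the worker at once; the rest wait in line.
function limitedQueue(worker, limit) {
  var running = 0;
  var waiting = [];

  function start(job) {
    running++;
    worker(job.task, function (err) {
      running--;
      if (job.cb) job.cb(err);
      next(); // a slot freed up, admit the next waiting task
    });
  }

  function next() {
    while (running < limit && waiting.length > 0) {
      start(waiting.shift());
    }
  }

  return {
    push: function (task, cb) {
      waiting.push({ task: task, cb: cb });
      next();
    }
  };
}

// Demo: the worker just records each task together with its completion
// callback, standing in for an insert that is still in flight.
var started = [];
var q = limitedQueue(function (task, done) {
  started.push({ task: task, done: done });
}, 2);

for (var i = 0; i < 5; i++) q.push(i);
// Only 2 tasks have started; finishing one admits the next.
```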

You can also find a working gist here.


8 Comments

I get this error when trying your code: dbName = self.db.databaseName; ^ TypeError: Cannot read property 'databaseName' of undefined
Looks like a binding issue, try with var queue = async.queue(finalcollection.insert.bind(finalcollection), 5);
events.js:72 throw er; // Unhandled 'error' event ^ TypeError: Object #<Object> has no method 'indexOf'
Hi Paul, I added much more info up top. Unfortunately the streamer is not working out right now, hope you can shed some light!
How about: calling res.end() inside the q.drain handler, and only adding q.drain when inside the .end() of the CSV stream?
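The repeated "all items have been processed" output in the question can be reproduced with a stripped-down stand-in for the queue's drain semantics. `makeQueue` below is my own sketch, not async's actual code: drain fires whenever the number of in-flight tasks returns to zero, which can happen between rows.

```javascript
// A stripped-down queue mirroring async.queue's drain semantics: drain
// fires whenever the number of in-flight tasks returns to zero.
function makeQueue(worker) {
  var pending = 0;
  var q = {};
  q.push = function (task, cb) {
    pending++;
    worker(task, function (err) {
      pending--;
      if (cb) cb(err);
      if (pending === 0 && q.drain) q.drain();
    });
  };
  return q;
}

// If the worker completes before the next row is pushed (likely while a
// file is still being read), the queue empties between rows, so a drain
// handler attached up front fires once per row.
var drains = 0;
var q = makeQueue(function (task, done) { done(null); }); // stub insert
q.drain = function () { drains++; };
q.push('row 1');
q.push('row 2');
q.push('row 3');
// drains is 3 here, not 1. Attaching q.drain only inside the stream's
// 'end' handler, as the comment above suggests, avoids reacting to
// these intermediate drains.
```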
