
I was asked to import a CSV file from a server daily and parse each header into the appropriate fields in Mongoose.

My first idea was to make it run automatically with a scheduler, using the cron module.

const CronJob = require('cron').CronJob;
const fs      = require("fs");
const csv     = require("fast-csv");

// Run every day at 02:30
new CronJob('30 2 * * *', async function() {
  await parseCSV();
  this.stop();
}, function() {
  // onComplete: re-arm the job for the next day
  this.start();
}, true);

Next, the parseCSV() function is as follows (I have simplified some of the data):

function parseCSV() {
  let buffer = [];

  let stream = fs.createReadStream("data.csv");
  csv.fromStream(stream, {
    headers: ["lot", "order", "cwotdt"],
    trim: true
  })
  .on("data", async (row) => {
        let data = { "order": row.order, "lot": row.lot, "date": row.cwotdt };

        // Only add products that fulfill the following condition
        if (row.cwotdt !== "000000") {
              let product = { "order": data.order, "lot": data.lot };
              // Check whether the product already exists in the database
              await db.Product.find(product, function(err, foundProduct) {
                    if (foundProduct && foundProduct.length !== 0) {
                          console.log("Product exists");
                    } else {
                          buffer.push(product);
                          console.log("Product does not exist");
                    }
              });
        }
  })
  .on("end", function() {
        db.Product.find({}, function(err, productAvailable) {
              // Check whether the database is empty or not
              if (productAvailable.length !== 0) {
                    // Database exists: add subsequent rows
                    db.Product.insertMany(buffer);
              } else {
                    // Add everything for the first time
                    db.Product.insertMany(buffer);
              }
              buffer = [];
        });
  });
}

It is not a problem with just a few rows in the CSV file, but at only around 2k rows I ran into trouble. The culprit is the if-condition check inside the data event handler: every single row has to be checked against the database to see whether it is already there.

The reason I'm doing this is that the CSV file will have new data appended to it, so I need to add all the data the first time if the database is empty, or else look at every single row and add only the new ones to Mongoose.
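For reference, the behaviour I'm after could be sketched with bulkWrite upserts instead of a per-row find(). This is just an illustration, not code I'm running; buildUpsertOps is a name I made up, and the row shape matches the CSV headers above:

```javascript
// Sketch: one upsert per CSV row, so the database itself decides which
// order/lot pairs are new, instead of a find() round trip per row.
function buildUpsertOps(rows) {
  return rows
    .filter((row) => row.cwotdt !== "000000")        // same filter as the code above
    .map((row) => ({
      updateOne: {
        filter: { order: row.order, lot: row.lot },  // identity of a product
        update: { $setOnInsert: { order: row.order, lot: row.lot, date: row.cwotdt } },
        upsert: true,                                // insert only when missing
      },
    }));
}

// Usage, replacing the per-row find() and the insertMany() in "end":
// await db.Product.bulkWrite(buildUpsertOps(allRows), { ordered: false });
```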

The first approach I tried (as in the code) was using async/await to make sure all the data had been read before proceeding to the end event handler. This helps, but from time to time I see (with mongoose.set("debug", true);) that some data is queried twice, and I have no idea why.

The second approach was not to use async/await. This has a downside: since the data was not fully queried, execution proceeded straight to the end event handler, and insertMany received only the rows that had made it into the buffer by then.

If I stick with the current approach it works, but the query takes 1 to 2 minutes, and even longer as the database keeps growing. During those minutes of querying the event loop gets flooded, so requests to the server time out.

I tried stream.pause() and stream.resume() before this code, but I couldn't get them to work: execution jumps straight to the end event handler. This causes the buffer to be empty every single time, since the end handler runs before the data handler.

I can't remember the links I used, but the fundamentals I picked up came from this:

Import CSV Using Mongoose Schema

I saw these threads:

Insert a large csv file, 200'000 rows+, into MongoDB in NodeJS

Can't populate big chunk of data to mongodb using Node.js

which seem similar to what I need, but they are a bit too complicated for me to follow. It looks like they use sockets or a child process, maybe? Furthermore, I still need to check the conditions before adding to the buffer.

Anyone care to guide me on this?

Edit: await has been removed from console.log, as it is not asynchronous.

  • I would fork a child_process to handle the importing of data. By doing so, requests to the server will still function as normal. But that's only if your server has more than one vCPU. Commented Oct 15, 2018 at 13:46
  • Do you have a simple guideline on how to do this? I have never done this before. Yes, it has more than one CPU core. Commented Oct 15, 2018 at 13:49
  • Posted my approach from when I processed a 2 GB data file. Commented Oct 15, 2018 at 13:56
  •
    Thanks. I will have a look at it and get back to you if it works. Commented Oct 15, 2018 at 14:32

2 Answers


Forking a child process approach:

  1. When the web service receives a CSV data file, save it somewhere in the app
  2. Fork a child process -> child process example
  3. Pass the file path to the child process to run the insert checks
  4. When the child process finishes processing the CSV file, delete the file

Like Joe said, indexing the DB speeds up the processing time by a lot when there are lots (millions) of tuples.


9 Comments

I tried createIndex and it helps a lot: from ~2 minutes it dropped to about 15 seconds. I'm guessing that if my database gets large, the time will increase again?
The database query time will surely increase, but during the processing part the event loop will be flooded and other API calls will be slowed down or unresponsive. The only way around it is to process that big chunk of data in a separate Node.js process (forking).
@alexwck check out this 6-minute forking tutorial: youtube.com/watch?v=tBZKQ53vVQI
Hey. Just want to tell you how grateful I am for the video link you gave. It freaking works! No more worries about server timeouts! Though this begs me to ask more: 1. Is using a child process like this considered beneficial, or are there downsides (aside from opening too many and causing memory-management issues)? 2. Why not use cluster? (From the docs: "The worker processes are spawned using the child_process.fork() method, so that they can communicate with the parent via IPC and pass server handles back and forth.")
Hahaha. Sure!, if you ever come down and remember this guy from SO. Still, I could just have one worker running the JS code I posted at the top. I can't fathom why so many child processes would need to be created after the first one. Like you said, just queue the jobs in a single child process.

If you create an index on order and lot, the query should be very fast.

db.Product.createIndex({ order: 1, lot: 1 })

Note: this is a compound index and may not be the ideal solution. See: Index strategies

Also, your await on console.log is odd; that may be causing your timing issues. console.log is not async. Additionally, the function is not marked async.

        // removing await from console.log
        let product = { "order": data.order, "lot": data.lot };
        // Check whether the product already exists in the database
        await db.Product.find(product, function(err, foundProduct) {
              if (foundProduct && foundProduct.length !== 0) {
                    console.log("Product exists");
              } else {
                    buffer.push(product);
                    console.log("Product does not exist");
              }
        });

I would try removing the await on console.log (that may be a red herring if console.log was added for Stack Overflow and is hiding the actual async method). However, be sure to mark the function async if that is the case.

Lastly, if the problem still exists, I would look into a two-tiered approach:

  1. Insert all lines from the CSV file into a Mongo collection.
  2. Process that Mongo collection after the CSV has been parsed, taking the CSV out of the equation.
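On newer MongoDB versions (4.2+), step 2 can even run server-side as an aggregation with $merge. A sketch under assumptions not in the original: a raw "staging" collection holding the CSV rows, a "products" collection with a unique index on { order, lot } (required by $merge's "on" clause), and stagingMergePipeline as an illustrative helper:

```javascript
// Sketch: promote only new order/lot pairs from the staging collection
// into products, leaving existing documents untouched.
function stagingMergePipeline() {
  return [
    { $match: { cwotdt: { $ne: "000000" } } },      // same filter as the question
    { $project: { _id: 0, order: 1, lot: 1, date: "$cwotdt" } },
    {
      $merge: {
        into: "products",
        on: ["order", "lot"],
        whenMatched: "keepExisting",                // skip rows already present
        whenNotMatched: "insert",                   // add only the new ones
      },
    },
  ];
}

// Usage (native driver):
// await db.collection("staging").aggregate(stagingMergePipeline()).toArray();
```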

3 Comments

Hi, sorry about the await on console.log; I actually did not run any of the console.log calls inside the condition, as I had already commented those two lines out (I will edit it). I did try the two-tiered approach before: passing into the collection only the CSV rows that fulfilled the if (data.cwotdt !== "000000") condition. This actually speeds things up a lot, but I got stuck on the second step. How do I let Mongo know that most of the documents already exist in the database, and add only those that don't?
In addition, where in the code is an index like db.Product.createIndex({ order: 1, lot: 1 }) normally added? Is it before querying with db.Product.find()?
Woah, createIndex helps a ton. Thanks!
