
I'm trying to write Node.js code that reads multiple JSON files from a directory and inserts the mapped data into a MongoDB collection. To speed up the process, I followed these steps:

  • Put all the filenames into an array.
  • Split the array into segments based on the CPU count.
  • Create worker threads that read these files and store the data in the MongoDB collection.
  • Return the combined result as JSON.

Below is a code sample.

readFiles.js

const {Worker} = require('worker_threads');
const os = require('os');
const cpuCount = os.cpus().length;
const path = require('path');
const workerPath = path.resolve('./worker.js');
const fileArray = ['D:/jsonFiles/abc_1310202220102022.json', 'D:/jsonFiles/def_1310202220102022.json', 'D:/jsonFiles/ghi_1310202220102022.json'];
let failureCount = 0;
let successCount = 0;
let failureMessage = [];
const splitRecords = (files, done) => {
  const segmentSize = Math.ceil(files.length/cpuCount);
  const segments = [];
  for (let segmentIndex=0; segmentIndex<cpuCount;segmentIndex++) {
    const start = segmentIndex * segmentSize; //0 * 60
    const end = start + segmentSize; // 0 + 60
    const segment = files.slice(start,end);
    segments.push(segment);
  }
  const finalSegments = [];
  segments.forEach((seg)=>{
    if(seg.length>0) {
      finalSegments.push(seg);
    }
  });
  done(finalSegments);
}

const createWorker = (records) => {
  return new Promise((resolve, reject) => {
    let workerData = {'records': records};
    const worker = new Worker(workerPath, {workerData});
    worker.on('message', function(data) {
      failureCount += data.failureCount;
      successCount += data.successCount;
      failureMessage.push(data.detailError);
      resolve({
            "failureCount":failureCount, "successCount":successCount, 
            "failureMessage":failureMessage
        });
    });
    worker.on('error', function(dataErr) {
      resolve(dataErr)
    });
  })
}

(async () => {
    if (fileArray.length > 0) {
        let segmentArray = [];
        splitRecords(fileArray, (data) => {
            segmentArray = data;
        });
        if (segmentArray.length > 0) {
            segmentArray.forEach(async (list) => {
                createWorker(list);
            });
        }
    }
})();
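
For reference, the forEach(async ...) at the end is fire-and-forget: the IIFE never waits for the workers, so the process has no idea when they finish. Below is a minimal sketch of a bootstrap that awaits every worker and combines the results, assuming createWorker resolves exactly once per worker and that splitRecords calls its callback synchronously as above:

(async () => {
  if (fileArray.length === 0) return;
  let segmentArray = [];
  // splitRecords invokes the callback synchronously, so segmentArray
  // is populated before the next line runs.
  splitRecords(fileArray, (data) => {
    segmentArray = data;
  });
  // Spawn one worker per segment and wait for all of them to report back,
  // instead of fire-and-forget forEach(async ...) which is never awaited.
  const results = await Promise.all(segmentArray.map((list) => createWorker(list)));
  console.log(JSON.stringify(results));
})();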

worker.js

const mongoose = require('mongoose');
require('../config/mongoconfig')(mongoose);
const { workerData, isMainThread, parentPort } = require('worker_threads');
const axios = require('axios');
const Blog = require('../models/blogs');
let failureCount = 0;
let successCount = 0;
let detailsFailure = [];
var fs = require('fs');

const getFeatured = async function(imageURL) {
  return new Promise(async(resolve, reject) => {
    let options = {
      method: 'post',
      url: 'http://10.65.32.90:2045/v1/imageupload',
      headers: {},
      data: { imageUrl:imageURL}
    }
    await axios(options)
    .then(async function (response) {
      if (typeof response.data.url !== 'undefined') {
        resolve(response.data.url);
      } else {
        resolve();
      }
    })
    .catch(function (error) {
      resolve();
    })
  });
};
const mappingData = async (jsonPayload) => {
  return new Promise(async(resolve, reject) => {
      await Promise.all([jsonPayload["data"].forEach(async function (item) {
          let blogPayload = {};
          blogPayload["post_title"] = item.post_title;
          blogPayload["category"] = item.category;
          blogPayload["sub_category"] = item.sub_category;
          blogPayload["contents"] = item.contents;
          blogPayload["featured_image"] = await getFeatured(item.featured_image); // uploading file into AWS cloud and return url
          blogPayload["author"] = item.author;
          const newBlog = new Blog(blogPayload);
          await newBlog.save((idbErr) => {
              if (idbErr) {
                 failureCount += 1;
                 detailsFailure.push(idbErr);
              } else {
                 successCount += 1;
              }
          });
      })]);
      resolve({"successCount": successCount, "failureCount": failureCount, "detailError": detailsFailure});
  });
};
(async () => {
  console.log("Worker started");
  if (!isMainThread ) {
    console.log("Inside not equal MainThread");
    const items = workerData.records;
    console.log("items length" + items.length);
    for (const dbData of items) {
      fs.readFile(dbData, async function(err, fileData) {
        if (err) throw err;
        let payload = JSON.parse(fileData.toString());
        let mappingResult = await mappingData(payload);
        parentPort.postMessage(mappingResult);
      });
    }
  } else {
    console.log("Outside MainThread");
  }
})();
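
Note that the worker above calls parentPort.postMessage once per file, while createWorker in readFiles.js resolves on the first 'message' event, so every result after the first is dropped. Below is a minimal sketch that reads the files with fs.promises and posts a single message per worker, reusing the workerData, isMainThread, and parentPort already required above; since the counters in mappingData are module-level and cumulative, the last result covers every file:

const fsp = require('fs').promises;

(async () => {
  if (isMainThread) return;
  let mappingResult = { successCount: 0, failureCount: 0, detailError: [] };
  for (const dbData of workerData.records) {
    // Await each read so the files are processed one at a time.
    const fileData = await fsp.readFile(dbData, 'utf8');
    mappingResult = await mappingData(JSON.parse(fileData));
  }
  // One message per worker, posted only after every file is done,
  // which matches createWorker resolving on the first 'message' event.
  parentPort.postMessage(mappingResult);
})();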

Problems:

  1. When I run the code, CPU utilization is too high.
  2. The number of open MongoDB connections is also too high.
  3. It throws multiple MongoDB connection errors (see the connection sketch after this list).
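
One common way to keep the connection count bounded is to open a single mongoose connection per worker explicitly and close it when the worker finishes, instead of connecting as a side effect of requiring the config module. A minimal sketch follows (the URI is a placeholder, not taken from the original code); with at most cpuCount workers, this caps the total at roughly cpuCount connection pools:

const mongoose = require('mongoose');

async function runWorker(records) {
  // One explicit connection per worker; placeholder URI.
  await mongoose.connect('mongodb://localhost:27017/blogs');
  try {
    // ... read the files in `records` and insert the documents here ...
  } finally {
    // Release the connection once this worker's batch is done.
    await mongoose.connection.close();
  }
}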
  • I'm sorry, but I can't find a question in this question. What are you having trouble with? Commented Jan 3, 2023 at 12:40
  • @AKX I have added both a code sample and the problems. Commented Jan 3, 2023 at 13:50
  • @robertklep Could you please help me solve this problem? Commented Jan 3, 2023 at 13:51
  • .forEach() doesn't work with promises, so you're basically starting jsonPayload["data"].length concurrent asynchronous operations in each worker (see the sketch after these comments). Commented Jan 5, 2023 at 12:25
  • @robertklep But I am able to ingest some of the data, around 20k records. Can you respond with a solution? Commented Jan 5, 2023 at 14:02
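
As the last two comments discuss, Array.prototype.forEach ignores the promises returned by an async callback, so mappingData kicks off every save at once and resolves before any of them finish. Below is a minimal sketch of a sequential rewrite using for...of, keeping the same field mapping and module-level counters (save() without a callback returns a promise in this version of Mongoose):

const mappingData = async (jsonPayload) => {
  for (const item of jsonPayload.data) {
    const blogPayload = {
      post_title: item.post_title,
      category: item.category,
      sub_category: item.sub_category,
      contents: item.contents,
      featured_image: await getFeatured(item.featured_image),
      author: item.author,
    };
    try {
      // Awaiting here means each insert completes before the next starts.
      await new Blog(blogPayload).save();
      successCount += 1;
    } catch (idbErr) {
      failureCount += 1;
      detailsFailure.push(idbErr);
    }
  }
  return { successCount, failureCount, detailError: detailsFailure };
};

If some concurrency is still wanted, jsonPayload.data.map(...) with Promise.all would at least collect the promises instead of discarding them, though a concurrency limiter is usually safer for large files.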
