2

I'm having trouble processing a list of files line by line. Here is the code that I'm using:

// Read several files sequentially with async.eachSeries; each line read by
// line-by-line is pushed onto a shared async.queue for processing.
var LineReader = require("line-by-line");
var async = require("async");
var files = [ "small.txt", "medium.txt", "large.txt" ];

// Worker just logs the line; up to 10 tasks run concurrently.
var queue = async.queue(function(task, next){ console.log(task); next(); }, 10);

async.eachSeries(
    files,
    function (file, callback) {
        var lineReader = new LineReader(file, { encoding: "utf8", skipEmptyLines: true });

        lineReader.on("error", function (err) {
            callback(err);
        });

        lineReader.on("line", function (line) {
            // Pause after every single line, so at most one line is ever queued.
            lineReader.pause();
            queue.push(line);
        });

        // NOTE(review): `drain` fires every time the queue empties — with the
        // one-line-at-a-time pattern above, that is after EVERY line. So
        // callback() runs once per line instead of once per file, which
        // violates async.eachSeries's invoke-once contract. The immediate
        // pause/push/drain/resume cycle is also the recursion the question
        // reports as "Maximum call stack size exceeded".
        queue.drain = function () {
            lineReader.resume(); // I need to resume the stream !
            callback(); // When all lines have been processed, I need to read the next file
        };
    },
    function (err) {
        if (err) return console.log(err);
        console.log("Job done.");
    }
);

I'm using async to "synchronously" process each file and process each line in queue, and line-by-line to read each file line by line.

My problem is :

  • If I pause the stream, push the line to the queue and then resume the stream, I get this error:

RangeError: Maximum call stack size exceeded

  • If I pause the stream, push the line to the queue and wait for the queue to be empty, I can't resume the stream and execute the callback:

q.drain = function () { lineReader.resume(); callback(); };

How can I wait until all lines have been processed and then execute the callback to process the next file?

Thank you.

UPDATE:

I found a strange thing with the "line-by-line" module: the "end" event is emitted twice. So I decided to refactor the code, and that's how I found where the issue comes from. Another problem: the module has not been updated for a year, and there are 2 pull requests that were sent 1 month ago.

Here is my solution (if line-by-line had worked) :

// Read several files sequentially; lines are processed through a shared
// async.queue with concurrency 10. Fixes over the original attempt:
//  - the per-file callback is guarded so it can never fire twice
//    ("error" + "end", or repeated drain events);
//  - the reader is only paused once the queue is actually full, so up to 10
//    lines are in flight instead of 1, and the tight pause/resume recursion
//    that overflowed the call stack is avoided;
//  - on "end" we wait for the queue to drain before moving to the next file.
var LineReader = require("line-by-line");
var async = require("async");
var files = [ "small.txt", "medium.txt", "large.txt" ];

var queue = async.queue(function(task, next){ console.log(task); next(); }, 10);

async.eachSeries(
    files,
    function (file, callback) {
        var lineReader = new LineReader(file, { encoding: "utf8", skipEmptyLines: true });
        var finished = false;

        // Invoke the eachSeries callback exactly once per file.
        function done(err) {
            if (finished) return;
            finished = true;
            callback(err);
        }

        lineReader.on("error", function (err) {
            done(err);
        });

        lineReader.on("end", function () {
            // All lines have been read; advance only after the queue has
            // finished processing them.
            if (queue.idle()) return done();
            queue.drain = function () {
                done();
            };
        });

        lineReader.on("line", function (line) {
            queue.push(line);
            // Keep the queue saturated: pause only when it is full, and
            // resume once it has drained.
            if (queue.length() >= 10) {
                lineReader.pause();
                queue.drain = function () {
                    lineReader.resume();
                };
            }
        });
    },
    function (err) {
        if (err) return console.log(err);
        console.log("Job done.");
    }
);

With this solution, we have only 1 line in the queue at a time. If anyone has an idea for how to push more than 1 line before pausing the stream, please share it.

I will try to find another module without this issue, because I don't want to rewrite a new module for that.

3 Answers 3

2

I would solve this problem totally differently.

There is no need to listen to events or pause with the new stream API.
I would use gulp and through2 like so:

var gulp = require('gulp')
, thr = require('through2').obj
;

// Transform one line of a file's contents.
// For demonstration purposes it simply hands the line back unchanged.
function fixLine (line) {
  return line;
}

// Read every file, rewrite its contents line by line through fixLine, and
// pass the vinyl file object downstream.
var files = [ "small.txt", "medium.txt", "large.txt" ]; // was an implicit global
gulp.src(files).pipe(thr(function (vfs, enc, next) {
  // vfs - vinyl filesystem object; vfs.contents holds the whole file.
  var str = vfs.contents.toString().split('\n').map(fixLine).join('\n');
  // Buffer.from() replaces the deprecated and unsafe `new Buffer(str)`.
  vfs.contents = Buffer.from(str);
  next(null, vfs);
}));

However, this is async. There is no guarantee that the order of the files will be the one in the array. But the lines are, obviously, processed in order.

I hope this helps.

Sign up to request clarification or add additional context in comments.

Comments

0

I like to use this function:

// Attach line-splitting behavior to a stream: buffers 'data' chunks and
// re-emits them as 'line' events, split on `re` (default: newline).
// Each emitted line keeps its trailing delimiter; any unterminated tail is
// flushed as a final 'line' on 'end'.
function emitLines(stream, re) {
    re = re || /\n/;
    var buffer = '';

    stream.on('data', stream_data);
    stream.on('end', stream_end);

    // Accumulate the chunk, then emit any complete lines it produced.
    function stream_data(data) {
        buffer += data;
        flush();
    }

    // Flush whatever is left: the last line with no trailing delimiter.
    function stream_end() {
        if (buffer) stream.emit('line', buffer); // was `stream.emmit(...)` — typo, threw TypeError
    }

    // Emit one 'line' event per delimiter match currently in the buffer.
    function flush() {
        var match;
        while ((match = re.exec(buffer))) {
            var index = match.index + match[0].length;
            stream.emit('line', buffer.substring(0, index));
            buffer = buffer.substring(index);
            re.lastIndex = 0; // reset in case a /g or /y regex was supplied
        }
    }

}

When invoking this function on a stream, your stream will start broadcasting 'line' events \o/

Comments

0

I was looking for a good solution of doing something similar. I decided to try async.queue. Not using streams here, but you could easily adapt it.

Here it is, highly stripped down for the gist.

I had files that needed processing that contained hundreds of thousands of lines of text in a tab-delimited form:

eric    2/1/21 6
michael 2/2/22 3
sally   6/8/19 20

to process and insert into a database. Here, I use a concurrency value of 500 and push 100 items at a time. I use the queue.unsaturated callback to tell me there is room to push more in the queue. This keeps the code rather simple.

My monitor tells me this keeps the queue filled between 400-500 concurrent operations at a time, increasing the processing speed about 5x to 6x (over serial processing). There's probably room for improvement by using updateMany(), but this works pretty well so far.

Mongoose was the database module.

Code:

const lineByLine = require('n-readlines');
const async = require('async');

const concurrency = 500; // max simultaneous queue workers (database operations)
const inserts = 100;     // number of lines pushed per refill batch

// Synchronous line reader over the input file (n-readlines).
const liner = new lineByLine("./name.txt");

// Kick off the import; runs until EOF and then exits the process.
add();

// Drive the import: keep an async.queue of line-processing tasks saturated
// (refilled via the `unsaturated` callback), display progress, and exit the
// process once the file is exhausted and the queue is fully drained.
async function add() {

    let c = 0;          // total lines pushed so far (progress counter)
    let eof = false;    // set once the reader runs out of lines

    // Worker: process one line; up to `concurrency` run at once.
    var q = async.queue(async function(gline, done) {
            // gline is a string of format <name>\t<date>\t<age>
            await processLine(gline);
            done();
    }, concurrency);

    // queue.unsaturated callback: the queue has room for more work.
    q.unsaturated(async function() {

        // end of file and queue fully drained: display results and exit
        if (eof && !q.length() && !q.running()) {

            await displayStatus();
            // NOTE(review): `mongoose` (and `MyDb` below) are referenced but
            // never required in this snippet — confirm they are in scope.
            await mongoose.connection.close();
            process.exit(0);
        }
        // refill the queue with another batch of lines
        for(var i=0;i<inserts;i++)
            getLine();
    });

    // prime the queue with the first batch
    for(var i=0;i<inserts;i++)
        getLine();

    // progress monitor
    setInterval(function() {
        displayStatus();
    },500);

    // overwrite one status line: running tasks and total line count
    async function displayStatus() {
        process.stdout.write('\r'+q.running()+' - '+c+'   ');
    }

    // Read the next line from the file and push it onto the queue.
    // Sets `eof` once the reader is exhausted.
    function getLine() {
        if (eof) return;           // don't read more lines after EOF
        const line = liner.next(); // Buffer, or false at end of file
        if (line === false) {      // (the old `while(line=...)` leaked a global)
            eof = true;
            return;
        }
        c++;
        q.push(line.toString('ascii'));
    }

    // Parse one tab-delimited line and upsert it into the database.
    async function processLine(line) {

        // file format: <name>\t<date>\t<age>
        let s = line.split('\t');

        try {
            // upsert keyed on name
            await MyDb.updateOne(
                { name: s[0] }, 
                // BUG FIX: age is the third field, s[2] — s[3] was always undefined
                { name: s[0], date: s[1], age: s[2] },
                { upsert: true } 
            );
        }
        catch (err) {
            console.log(err);
        }
    }

}

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.