
Problem

I'm trying to write millions of strings into a file using Node.js streams, but the RAM usage goes up to 800MB during the process:

const fs = require('fs')
const walkdir = require('walkdir')

let options = {
  "max_depth": 0,
  "track_inodes": true
}

let dir = "C:/"
let paths = walkdir(dir, options)
var wstream = fs.createWriteStream('C:/test/file.txt')
wstream.write('[')

paths.on('path', function(path, stat) {
  wstream.write(`"${path}",`)
})

paths.on('end', function(path, stat) {
  wstream.write(']')
  wstream.end()

  // Compressing the file after it's written:
  const gzip = require('zlib').createGzip()
  const inp = fs.createReadStream('C:/test/file.txt')
  const out = fs.createWriteStream('C:/test/file.txt.gz')
  inp.pipe(gzip).pipe(out)
})

I also tried writing the file like this:

...
paths.on('path', function(path, stat) {
  fs.writeFileSync('C:/test/file.txt', path)
})
...

And I also tried sync:

walkdir.sync(dir, options, callback)

function callback(path) {
  let res = wstream.write(`"${path}",`)
  if (!res) {
    wstream.once('drain', callback)
  }
  else {
    callback()
  }
}

But both of these produce the same result: RAM usage goes up to around 500-800MB.
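(For reference, the standard backpressure pattern looks roughly like the sketch below; it assumes the walkdir emitter can be paused and resumed, which the second answer further down relies on as well.)

paths.on('path', function(path, stat) {
  // write() returns false when the stream's internal buffer is full
  const ok = wstream.write(`"${path}",`)
  if (!ok) {
    // Stop producing paths until the buffer has drained
    paths.pause()
    wstream.once('drain', () => paths.resume())
  }
})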

I also tried the following method. The RAM usage always stays at ~100MB, but it doesn't really work: it writes 412KB into the file and then keeps utilizing CPU while nothing else happens (the other methods finish writing the file in under 1-2 minutes):

const readdirp = require('readdirp');

const { Transform } = require('stream');
const entryInfoStream = readdirp({
  root: dir
});

entryInfoStream
  .pipe(new Transform({
    objectMode: true,
    transform(entryInfo, encoding, callback) {
      this.push(entryInfo.path);
      callback();
    },
  }))
  .pipe(wstream);

Questions

  • How do I make sure the stream works as expected (low memory usage)?

  • How do I compress (gzip) the file during the writing process, or can I only do it after it's written? (A rough sketch of the during-write idea follows this list.)
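For the second question, a minimal sketch of compressing during writing, using only Node's built-in fs and zlib: write into a zlib.createGzip() transform that is piped into the destination file stream, so data is compressed chunk by chunk as it is produced instead of re-reading the finished file (the first answer below builds a full directory walker on top of this idea; the output path is just reused from the question).

const fs = require('fs')
const zlib = require('zlib')

// The gzip transform sits between what we write and the file on disk
const gzip = zlib.createGzip()
const out = fs.createWriteStream('C:/test/file.txt.gz')
gzip.pipe(out)

gzip.write('[')
// ...write each `"${path}",` into gzip instead of wstream...
gzip.end(']')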

  • Node does not automatically flush output streams as they are written to. A potential workaround is to use a transform stream that supports a flush method, but this lacks documentation examples :-( Commented Mar 5, 2019 at 9:11
  • @traktor53 yeah, there's not enough documentation on this, I'm not sure how to create that transformation. I tried creating a readable stream const readable = require('stream').Readable and then sending the paths from within that emitter and then trying to write them like this: readable.on('data', (path) => { WRITE HERE }), hoping it would drain automatically once it has a readable/writable pair. But still no luck, I guess I'm doing it wrong Commented Mar 5, 2019 at 9:15
  • In node, directory walking is not really "stream"-ified so far, hence all the walk dir modules in npm have to keep internal buffers. See github.com/nodejs/node/issues/583 Commented Mar 5, 2019 at 10:02
  • @S.D. thanks for the info Commented Mar 5, 2019 at 10:23

2 Answers


You can implement the entire logic without any external dependencies, which makes it easier to see where to optimize. Below is a minimal implementation that you can tweak:

const fs = require('fs');
const path = require('path');
const zlib = require('zlib');
const stream = require('stream');

// Recursively walk the file system
function walk(dir, str, busy) {
    busy.inc();
    fs.readdir(dir, (e, c) => {
        if (!e) {
            c.forEach(f => {
                const p = path.join(dir, f);
                busy.inc();
                fs.stat(p, (e, s) => {
                    if (!e && s.isDirectory()) {
                        walk(p, str, busy);
                    }
                    str.write(p + "\n");
                    busy.dec();
                });
            });
        }
        busy.dec();
    });
}

// Scan FS and write to file
async function scan(dir, dest) {
    return new Promise((resolve) => {
        const gzStr = zlib.createGzip();
        const destStr = fs.createWriteStream(dest);

        let count = 0;
        const busy = {
            inc: () => count++,
            dec: () => {
                count--;
                if (count < 1) {
                    process.nextTick(() => {
                        gzStr.end();
                        gzStr.once('finish', resolve);
                    });
                }
            }
        };

        walk(dir, gzStr, busy);
        gzStr.pipe(destStr);
    });
}

// Test above code
(async () => {
    // Save gzipped
    await scan(__dirname, './files.txt.gz');

    // Gunzip to verify
    const unzipped = fs.createWriteStream('./files.txt');
    fs.createReadStream('./files.txt.gz').pipe(zlib.createGunzip()).pipe(unzipped);

    // End 
    unzipped.on('close', () => console.log('done'));
})();


6 Comments

Thanks for the answer, mate, that helped me understand how to compress it during the process. That's a good solution, even though I think I have to add writer.once('drain', callback) somewhere in there because it also ramps up memory usage like the other methods I tried
@Un1 Updated with better walk termination detection.
Thanks, much appreciated! So as I understand it, walk(dir, gzStr, busy) has to gather all the paths synchronously in a variable, then gives them to the zipper gzStr.pipe(destStr), which saves all the paths from RAM to disk; that's why if you scan a large dir (e.g. the root of the disk) it will gather half a gig worth of paths in RAM. I get now where that bottleneck is. I wish zlib could zip the paths as it gathers them, but I don't think that's even possible. Anyway, that's one quick recursive function, thanks a bunch! It should do the trick
@Un1 Not entirely true, pipe() sets up a stream connection. As soon as readDir() returns files inside a directory, each file is sent to stat(), and written to gzip stream. Gzip stream encodes and forwards it to file output stream. There is no buffer collecting files, apart from internal buffers of streams. But streams won't keep buffers because there is nothing stopping items from being written to disk as soon they are scanned. The ONLY chunk is result of readDir(). If there are a million files in the same directory, readDir will return an array that big.
I don't entirely understand how it happens, do you mean zlib can technically keep re-compressing the whole buffer by appending new data as it receives it? I posted a question on this here. No one seems to know why it doesn't work

It's because you're doing things asynchronously without any limits. Each path creates a new event for paths.on('path', ...), so paths are loaded onto the event loop much faster than they are processed, hence the spike in memory. You need to limit the number of paths being written at a time.

You can limit it by using walkdir.sync, but this means you'll only be able to process one path at a time. Also, depending on how you implement it, you might still end up discovering paths faster than you can write to your stream.

A more flexible solution is to track how many concurrent paths you are processing and pause the stream once you've hit the limit.

const fs = require('fs')
const walkdir = require('walkdir')

let options = {
  "max_depth": 0,
  "track_inodes": true
}

let dir = "C:/"
let paths = walkdir(dir, options)
var wstream = fs.createWriteStream('C:/test/file.txt')
wstream.write('[')

const maxPaths = 20; // Maximum amount of concurrent paths allowed to process
let currentPaths = 0; // Current amount of concurrent paths being processed
let deferredPaths = []; // If we somehow exceed the limit, store the excess paths here for later processing. This might not be necessary, depending on how walkdir implements their pause function

const finishPathFlush = () => {
  if (deferredPaths.length > 0) {
    // Process any paths in the deferred queue
    wstream.write('"' + deferredPaths.pop() + '",', finishPathFlush);
  } else {
    // No more work to do, resume walkdir
    --currentPaths;
    paths.resume();
  }
}

paths.on('path', function(path, stat) {
  if (currentPaths < maxPaths) {
    // We have room to process this path
    if (++currentPaths === maxPaths) {
      // If we reach the limit pause walkdir
      paths.pause();
    }
    wstream.write(`"${path}",`, finishPathFlush)
  } else {
    // Got too many paths, defer this path
    deferredPaths.push(path);
  }
})

paths.on('end', function(path, stat) {
  wstream.write(']')
  wstream.end()

  // Compressing the file after it's written:
  const gzip = require('zlib').createGzip()
  const inp = fs.createReadStream('C:/test/file.txt')
  const out = fs.createWriteStream('C:/test/file.txt.gz')
  inp.pipe(gzip).pipe(out)
})
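One way to check whether the limit actually keeps memory bounded (a hypothetical monitoring snippet, not part of the original answer) is to sample process.memoryUsage().rss while the walk runs:

// Log resident set size every 5 seconds while the walk is in progress
const monitor = setInterval(() => {
  const mb = Math.round(process.memoryUsage().rss / 1024 / 1024)
  console.log(`RSS: ${mb} MB`)
}, 5000)

paths.on('end', () => clearInterval(monitor))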

1 Comment

Thanks for the answer, unfortunately neither of the two proposed solutions works. It's quite strange. If I use walkdir.sync it still uses 800MB of RAM, it's just twice as slow. And when I use the track/pause solution it takes about 10 times longer and the RAM usage also climbs up to 800MB.
