
I may be lacking some in-depth understanding of streams in general, but I would like to know how efficiently what I need can be done.

I want to read a CSV file, make a query to a database (or API) for each row, and attach the returned data to that row. The row with the attached data is then written to a new CSV file. I am using the fast-csv Node library for this.

Here is my implementation:

const fs = require("fs");
const csv = require("fast-csv");
const delay = t => new Promise(resolve => setTimeout(resolve, t));


const asyncFunction = async (row, csvStream) => {
  // Imitate some stuff with database
  await delay(1200);
  row.data = "data";
  csvStream.write(row);
};

const array = [];

const csvStream = csv.format({ headers: true });

const writeStream = fs.createWriteStream("output.csv");

csvStream.pipe(writeStream).on("finish", () => {
  console.log("End of writing");
});
fs.createReadStream("input.csv")
  .pipe(csv.parse({ headers: true }))
  .transform(async function(row, next) {
    array.push(asyncFunction(row, csvStream));
    next();
  })
  .on("finish", function() {
    console.log("finished reading file");
  //Wait for all database requests and writings to be finished to close write stream
    Promise.all(array).then(() => {
      csvStream.end();
      console.log("finished writing file");
    });
  });

In particular, I would like to know whether there are ways to optimize what I am doing here, because I feel I am missing something important about how this library can be used for this type of case.

Regards, Rokas

  • Typically making one query per row isn’t a good idea. What kind of query is it – maybe rows can be batched? Commented Dec 16, 2019 at 9:51
  • Hello, sadly this cannot be done; the API I am using does not have the functionality to return many items. But I am more interested in knowing whether I am doing the whole streaming part correctly and optimally. Commented Dec 16, 2019 at 11:46

1 Answer


I was able to find a solution in the fast-csv issues section. A good person, doug-martin, provided this gist on how you can do this kind of operation efficiently via a Transform stream:

const path = require('path');
const fs = require('fs');
const { Transform } = require('stream');
const csv = require('fast-csv');

class PersistStream extends Transform {
    constructor(args) {
        super({ objectMode: true, ...(args || {}) });
        this.batchSize = 100;
        this.batch = [];
        if (args && args.batchSize) {
            this.batchSize = args.batchSize;
        }
    }

    _transform(record, encoding, callback) {
        this.batch.push(record);
        if (this.shouldSaveBatch) {
            // we have hit our batch size to process the records as a batch
            this.processRecords()
                // we successfully processed the records so callback
                .then(() => callback())
                // An error occurred!
                .catch(err => callback(err));
            return;
        }
        // we shouldn't persist yet, so just continue
        callback();
    }

    _flush(callback) {
        if (this.batch.length) {
            // handle any leftover records that were not persisted because the batch was too small
            this.processRecords()
                // we successfully processed the records so callback
                .then(() => callback())
                // An error occurred!
                .catch(err => callback(err));
            return;
        }
        // no records to persist so just call callback
        callback();
    }

    pushRecords(records) {
        // emit each record for downstream processing
        records.forEach(r => this.push(r));
    }

    get shouldSaveBatch() {
        // this could be any check; for this example it is the record count
        return this.batch.length >= this.batchSize;
    }

    async processRecords() {
        // save the records
        const records = await this.saveBatch();
        // be sure to emit them
        this.pushRecords(records);
        return records;
    }

    async saveBatch() {
        const records = this.batch;
        this.batch = [];
        console.log(`Saving batch [noOfRecords=${records.length}]`);
        // This is where you should save/update/delete the records
        return new Promise(res => {
            setTimeout(() => res(records), 100);
        });
    }
}

const processCsv = ({ file, batchSize }) =>
    new Promise((res, rej) => {
        let recordCount = 0;
        fs.createReadStream(file)
            // catch file read errors
            .on('error', err => rej(err))
            .pipe(csv.parse({ headers: true }))
            // catch any parsing errors
            .on('error', err => rej(err))
            // pipe into our processing stream
            .pipe(new PersistStream({ batchSize }))
            .on('error', err => rej(err))
            .on('data', () => {
                recordCount += 1;
            })
            .on('end', () => res({ event: 'end', recordCount }));
    });

const file = path.resolve(__dirname, `batch_write.csv`);
// process the whole file in batches of 5 records
processCsv({ file, batchSize: 5 })
    .then(({ event, recordCount }) => {
        console.log(`Done Processing [event=${event}] [recordCount=${recordCount}]`);
    })
    .catch(e => {
        console.error(e.stack);
    });

https://gist.github.com/doug-martin/b434a04f164c81da82165f4adcb144ec
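
For completeness, here is a rough sketch (not from the gist or the fast-csv docs) of how the PersistStream above could be wired back to my original goal of enriching each row and writing it to a new CSV file. fetchDataForRow is a made-up placeholder for the real database/API call, and EnrichStream is just a hypothetical subclass name:

const fs = require('fs');
const csv = require('fast-csv');

// Placeholder for the real single-item database/API lookup (hypothetical).
const fetchDataForRow = async row => ({ ...row, data: 'data' });

// Hypothetical subclass of the PersistStream class from the gist above.
class EnrichStream extends PersistStream {
    async saveBatch() {
        const records = this.batch;
        this.batch = [];
        // The API only returns one item at a time, so enrich the rows of the
        // current batch in parallel; batchSize caps the requests in flight.
        return Promise.all(records.map(record => fetchDataForRow(record)));
    }
}

const csvOut = csv.format({ headers: true });
const writeStream = fs.createWriteStream('output.csv');
csvOut.pipe(writeStream).on('finish', () => console.log('finished writing file'));

fs.createReadStream('input.csv')
    .pipe(csv.parse({ headers: true }))
    // enrich rows in batches, then emit them downstream
    .pipe(new EnrichStream({ batchSize: 5 }))
    // the enriched row objects are formatted and written to output.csv
    .pipe(csvOut);

Because processRecords pushes whatever saveBatch resolves to, the enriched records flow downstream into the formatter, and backpressure is handled by the stream pipeline instead of collecting an array of pending promises.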
