
I'm using Node 18.7 on Ubuntu. I'm trying to parse a bunch of CSV files into objects (using csv-parse), ultimately to load them into a database. Because there are a large number of these files, I decided to try streams, and I'd like to use the async/await style.

So far I have:

const fs = require('fs');
const { parse } = require('csv-parse');
const { pipeline } = require('stream');

const path = __dirname + '/file1.csv';
const opt = { columns: true, relax_column_count: true, skip_empty_lines: true, skip_records_with_error: true };
console.log(path);


async function readByLine(path, opt) {
    const readFileStream = fs.createReadStream(path);
    var csvParser = parse(opt, function (err, records) {
        if (err) throw err;
    });
    await pipeline(readFileStream, csvParser, (err) => {
        if (err) {
            console.error('Pipeline failed.', err);
        } else {
            console.log('Pipeline succeeded.');
        }
    });
    for await (const record of csvParser) {
        console.log(record);
    }
}

readByLine(path, opt)

When I run this I see:

Pipeline succeeded.

But the parsed records are never printed to the console. What am I doing wrong?

Edit 1:

I changed the code to:

async function readByLine(path, opt) {
    const readFileStream = fs.createReadStream(path);
    var csvParser = parse(opt, function (err, records) {
        if (err) throw err;
    });
    await pipeline(readFileStream, csvParser, (err) => {
        if (err) {
            console.error('Pipeline failed.', err);
        } else {
            console.log('Pipeline succeeded.');
        }
    });
    // for await (const record of csvParser) {
    //     console.log(record);
    // }
    return csvParser;
}

(async function () {
    const o = await readByLine(path, opt);
    console.log(o);
})();

The result is an object with a huge number of properties; some of them do look set, as in the screenshot.

Comments:

  • How do you know if the objects were really parsed?
  • The code is based on working code in stackoverflow.com/questions/73256575/…. What would you suggest to test further?
  • Make readByLine return csvParser, call the function with await, then assign its return value to a variable and finally log the result.
  • Return csvParser before or after the 'await pipeline...' block?
  • I meant after. Remove the for await and return something from the async function (it will be wrapped in a Promise), in order to observe the output.

1 Answer

You can only usefully await a promise.

The pipeline function you are using doesn't return a promise.

If you look at the documentation, you will see:

The pipeline API provides a promise version, which can also receive an options argument as the last parameter with a signal <AbortSignal> property. When the signal is aborted, destroy will be called on the underlying pipeline, with an AbortError.

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

Note the different value passed to require ('node:stream/promises' instead of 'stream'). Use that version of pipeline instead.
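
Applied to the code in the question, a minimal sketch might look like the following (assuming csv-parse as in the question; the final async function passed to pipeline acts as the destination and consumes the parsed records, here just collecting them into an array):

const fs = require('fs');
const { pipeline } = require('node:stream/promises');
const { parse } = require('csv-parse');

async function readByLine(path, opt) {
    const records = [];
    await pipeline(
        fs.createReadStream(path),
        parse(opt),                        // csv-parse transform stream (no callback needed)
        async function (source) {          // destination: consume the parsed records
            for await (const record of source) {
                records.push(record);      // or insert each record into the db here
            }
        }
    );
    console.log('Pipeline succeeded.');
    return records;
}

(async () => {
    const records = await readByLine(path, opt);
    console.log(records);
})();

The difference from the original is that this pipeline returns a promise, so the await actually waits for parsing to finish, and the records are consumed inside the pipeline rather than read from csvParser afterwards.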
