
I am trying to insert a couple of million records (with approximately 6 fields/columns each) by receiving requests from clients with 10,000 records per bulk insert attempt (using sequelize.js and bulkCreate()).

This was obviously a bad idea, so I looked into node-pg-copy-streams.

However, I do not want to change the client side, which sends a JSON array like this:

# python client
import json
import requests

data = [
    {
     "column a": "a values",
     "column b": "b values",
    },
    ...
    # 10,000 items
    ...
]
requests.post(url=url, data=json.dumps(data))

On the server side in Node.js, how would I stream the received request.body in the following skeleton?

.post(function(req, res){

    // old sequelize code
    /* table5.bulkCreate(
        req.body, {raw:true}
    ).then(function(){
        return table5.findAll();
    }).then(function(result){
        res.json(result.count);
    });*/

    // new pg-copy-streams code
    // (assumes: var pg = require('pg'); var copyFrom = require('pg-copy-streams').from;)
    pg.connect(function(err, client, done) {
        var stream = client.query(copyFrom('COPY my_table FROM STDIN'));
        // My question is here: how would I stream or pipe the request body?
        // ?.on('error', done);
        // ?.pipe(stream).on('finish', done).on('error', done);
    });
});
  • Please let me know how I can improve this question; this is probably my first question, or my first in forever. Commented Jan 8, 2016 at 23:04
  • I am currently looking into how to stream a string or array. Commented Jan 8, 2016 at 23:10
  • postgres COPY defines three formats allowed on input: text, csv and binary. I am no expert, but the formats are described here: postgresql.org/docs/9.3/static/sql-copy.html (search for the "File Formats" headline); a small sketch of the text format follows these comments. Commented Jan 8, 2016 at 23:13
  • @thst thanks, that is helpful. I might be understanding it wrong, but whether text, csv or binary, doesn't it still have to be streamed? I will look into the text format, but that still needs to be streamed? Commented Jan 8, 2016 at 23:16
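
For reference, a minimal sketch of what the text format mentioned above expects on STDIN (the values here are made up, not from the thread): one line per row, columns separated by tabs, \N for NULL, and backslash escapes for tabs or newlines inside values.

// hypothetical rows in COPY text format, as they would be piped into
// client.query(copyFrom('COPY my_table FROM STDIN'))
var row1 = 'a value\tb value\n';      // two columns separated by a tab, row ends with a newline
var row2 = 'another a value\t\\N\n';  // \N marks a NULL in the second column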

2 Answers


Here's how I solved my problem.

First, a function to convert my req.body dicts to a TSV (not a part of the initial problem):

/**
 * Converts an array of dictionaries and a set of keys to a Tab Separated Value blob of text
 * @param {Array of dictionary objects} dicts
 * @param {Array of keys} keys
 * @return {String} the concatenated Tab Separated Values
 */
function convertDictsToTSV(dicts, keys){
    // ...
}
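
The answer leaves the body of that helper out. As a point of reference only, here is one possible sketch of such a function (not the author's original implementation), assuming values only need the COPY text-format escaping for backslashes, tabs and newlines, with missing values mapped to \N:

// a sketch, not the original helper: escape values per the COPY text format
function convertDictsToTSV(dicts, keys) {
    function escapeValue(value) {
        if (value === null || value === undefined) return '\\N'; // NULL marker in COPY text format
        return String(value)
            .replace(/\\/g, '\\\\') // escape backslashes first
            .replace(/\t/g, '\\t')  // tabs are the column separator
            .replace(/\n/g, '\\n')  // newlines are the row separator
            .replace(/\r/g, '\\r');
    }
    // one line per dict, columns in the order given by keys, each row terminated by a newline
    return dicts.map(function(dict) {
        return keys.map(function(key) { return escapeValue(dict[key]); }).join('\t');
    }).join('\n') + '\n';
}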

Second, the rest of my original .post function:

.post(function(req, res){
    // ...
    /* requires 'stream' as 
     * var stream = require('stream');
     * var copyFrom = require('pg-copy-streams').from;
     */
    var read_stream_string = new stream.Readable();
    read_stream_string._read = function noop() {}; // no-op _read; data is pushed manually below
    var keys = [...]; // set of dictionary keys to extract from req.body
    read_stream_string.push(convertDictsToTSV(req.body, keys));
    read_stream_string.push(null); // signal the end of the data
    pg.connect(connectionString, function(err, client, done) {
        // ...
        // error handling
        // ...
        var copy_string = 'COPY tablename (' + keys.join(',') + ') FROM STDIN';
        var pg_copy_stream = client.query( copyFrom( copy_string ) );
        read_stream_string.pipe(pg_copy_stream).on('finish', function(){
            // handle finish; release the client with done() and respond
        }).on('error', function(errored){
            // handle errored and done appropriately
        });
    });
    // note: do not call pg.end() here; that would shut the pool down before the COPY
    // completes. Release the client with done() in the handlers above instead.
});

1 Comment

this answer is still fairly accurate, 6 years later, but be warned of back pressure: nodejs.org/en/docs/guides/backpressuring-in-streams. In my use case I'm testing a DB and populating it with random values; doing so, one can exceed the buffer limits of either the node or postgres process. The return value of read_stream_string.push should be checked for true or false, and the producer should have a chance to pause.
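
A minimal sketch of what this comment suggests (variable names are made up, not from the answer): generate the data on demand inside _read and stop pushing as soon as push() returns false, so the stream's internal buffer can drain:

// assumes: var stream = require('stream'); and var tsv = convertDictsToTSV(req.body, keys);
var offset = 0;
var CHUNK_SIZE = 16 * 1024; // push 16 KB at a time (arbitrary choice)
var read_stream_string = new stream.Readable({
    read: function() {
        while (offset < tsv.length) {
            var chunk = tsv.slice(offset, offset + CHUNK_SIZE);
            offset += CHUNK_SIZE;
            if (!this.push(chunk)) return; // internal buffer full; wait for the next _read call
        }
        this.push(null); // all data pushed; signal end of stream
    }
});
// pipe() then propagates back pressure between this stream and pg_copy_stream
read_stream_string.pipe(pg_copy_stream);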

Technically, there is no streaming here, not in terms of how NodeJS streaming works.

You are sending a chunk of 10,000 records each time and expect your server-side to insert those and return an OK to the client to send another 10,000 records. That's throttling/paging data in, not streaming.

Once your server has received the next 10,000 records, insert them (usually as a transaction), and then respond with OK back to the client so it can send the next 10,000 records.

Writing transactions with node-postgres isn't an easy task, as it is too low-level for that.

Below is an example of how to do that with the help of pg-promise:

function insertRecords(records) {
    return db.tx(t=> {
        var inserts = [];
        records.forEach(r=> {
            var query = t.none("INSERT INTO table(fieldA, ...) VALUES(${propA}, ...)", r);
            inserts.push(query);
        });
        return t.batch(inserts);
    });
}

Then inside your HTTP handler, you would write:

function myPostHandler(req, res) {        
    // var records = get records from the request;    
    insertRecords(records)
        .then(data=> {
            // set response as success;
        })
        .catch(error=> {
            // set response as error;
        });    
}
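
For completeness, a sketch of how such a handler might be wired up; the Express app, route path, and body-parser size limit below are assumptions, not part of the answer:

var express = require('express');
var bodyParser = require('body-parser');

var app = express();
// parse the JSON array the client POSTs; the limit is an arbitrary example value
app.use(bodyParser.json({ limit: '50mb' }));

app.post('/records', function(req, res) {
    var records = req.body; // the parsed array of record objects
    insertRecords(records)
        .then(function() {
            res.json({ status: 'ok' }); // tell the client it can send the next batch
        })
        .catch(function(err) {
            res.status(500).json({ error: err.message || String(err) });
        });
});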

5 Comments

  • Thanks, this helps, but it is still a bit inefficient in the big picture. I compared this to the speed of COPY FROM, and it's slower, so I'm going with that instead of INSERT.
  • If you want to do it efficiently, you should be streaming data directly from the client into the database, via TCP/IP, using web IO.
  • Yes, that is definitely more efficient! However, changing the client's code isn't feasible, and introducing direct access to the database isn't practical in our current setup, where clients and databases are on different networks. Changing the server code was our path of least resistance.
  • I don't know what kind of speed tests you did, but I would suggest feeding 1,000 records from the client at a time, and then seeing how it performs compared to streaming. My guess: it will be about the same. 10,000 is just too many; you most likely overload your server side.
  • We basically ran 'time python ...' on the client side to compare results, and actually we ran it at 30,000-500,000 records/rows per upload as well. Our total size per data set was around a million rows, and we had 40-50 datasets.
