
What I'm trying to accomplish: I query a collection (Conversations) that has a lot of documents in it. For each document/Conversation in the collection, I want to query another collection (Users) to see if there is an existing User record that matches an ID attribute from that Conversation. So essentially I want to see if a User record exists for each user attached to the Conversation.

Users = { uid:someNumber, bunch of other attributes};

I know this is a problem with the asynchronous nature of Node.js. I've been trying to use async.js to solve it with callbacks, but I think I may have it wrong or am not using it correctly.

The problem is that each conversation item in the conversation array runs its own query, but because the 'save' hasn't finished yet, the 'find' queries never see a record that was already inserted. Here is my code. Maybe I'm doing something obviously wrong? In short: check the conversation record; if a user record matches a user on the conversation record, do nothing; if the user record doesn't exist, create it.

var async = require('async');
// Conversations and Users are Mongoose models defined elsewhere

Conversations.find().limit(1000).exec(function (err, data) {
    if (err) throw err;
    // data is an array of conversations; I want to loop through each conversation and compare one of its attributes with an attribute on the Users collection
    // (async.each starts the iterator for every item at once, not one after another)
    async.each(data, function (item, callback1) {
        // item is a single conversation; its participants array holds two user objects (name, id, type)
        async.each(item.participants, function (participant, callback2) {

            // this is where I query to see if the user exists
            Users.find({ uid: participant.participantId }).exec(function (err, results) {
                if (err) return callback2(err);
                // if the user doesn't exist, create a user record
                if (results.length === 0) {
                    // named newUser so it does not shadow the participant being copied from
                    var newUser = new Users();
                    newUser.name = participant.participantName;
                    newUser.uid = participant.participantId;
                    newUser.type = participant.participantType;

                    newUser.save(function (err, result) {
                        console.log(result);
                        // the save for this participant has finished
                        callback2(err);
                    });
                } else {
                    callback2();
                }
            });

        }, function (err) {
            // every participant of this conversation has been handled
            callback1(err);
        });
    });
});
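To see why the find-then-save pattern above inserts duplicates, here is a small self-contained simulation. It is only a sketch: it swaps async.js for native Promises (Promise.all behaves like async.each here, and a for...of loop with await like async.eachSeries) and uses a hypothetical in-memory store, makeStore(), with artificial delays in place of the Users model.

```javascript
// Hypothetical in-memory stand-in for the Users collection, with artificial I/O delays.
function makeStore() {
  const docs = [];
  const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
  return {
    docs,
    async find(uid) {
      await delay(5);
      return docs.filter((d) => d.uid === uid);
    },
    async save(doc) {
      await delay(5);
      docs.push(doc);
    },
  };
}

// Check-then-insert for one participant (same logic as the inner callback in the question).
async function ensureUser(store, p) {
  const existing = await store.find(p.participantId);
  if (existing.length === 0) {
    await store.save({ uid: p.participantId, name: p.participantName, type: p.participantType });
  }
}

// Like async.each: every check starts at once, so all finds resolve before any save lands.
function ensureAllParallel(store, participants) {
  return Promise.all(participants.map((p) => ensureUser(store, p)));
}

// Like async.eachSeries: each check starts only after the previous save has finished.
async function ensureAllSeries(store, participants) {
  for (const p of participants) {
    await ensureUser(store, p);
  }
}
```

Run in parallel, a participant that appears twice is inserted twice because neither find can see the other's pending save; run in series, the second check sees the first insert. With async.js the equivalent change would be async.eachSeries, at the cost of handling one participant at a time.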
I would just change the way you approach your problem. Why not loop over every conversation (a simple forEach), collect all the participants, and then get the unique users (no async needed; you can use _.unique)? Then you only need to check whether each one exists (async) and, if not, save it (async). Another way: just cache newly created users (a map of users by id) and check the cache before calling Users.find. Commented Nov 23, 2015 at 23:22
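The comment's two suggestions can be sketched in plain JavaScript. uniqueParticipants() flattens the participants of every conversation and keeps one entry per participantId using a Map (in place of _.unique), so each user needs only one existence check. makeCachedEnsure() is a variant of the cache idea: it caches the pending promise per id rather than the finished user, so a second request that arrives while the first is still saving reuses it. Both function names, and the ensureUser callback passed in, are hypothetical.

```javascript
// Collect every participant across all conversations, keeping the first entry seen per participantId.
// Assumes each conversation has a `participants` array of { participantId, participantName, participantType }.
function uniqueParticipants(conversations) {
  const byId = new Map();
  for (const conversation of conversations) {
    for (const p of conversation.participants || []) {
      if (!byId.has(p.participantId)) {
        byId.set(p.participantId, p);
      }
    }
  }
  return Array.from(byId.values()); // Map keeps insertion order
}

// Wrap a hypothetical promise-returning ensureUser(participant) so it runs at most once per id.
function makeCachedEnsure(ensureUser) {
  const cache = new Map();
  return function cachedEnsure(p) {
    if (!cache.has(p.participantId)) {
      const pending = ensureUser(p).catch((err) => {
        cache.delete(p.participantId); // forget failures so a later call can retry
        throw err;
      });
      cache.set(p.participantId, pending);
    }
    return cache.get(p.participantId);
  };
}
```

Caching the promise instead of the saved document matters because the race in the question happens while saves are still in flight.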

1 Answer


You can clean this up a lot and reduce memory use by implementing "stream" processing and using .findOneAndUpdate():

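var async = require("async");
// Conversations and Users are Mongoose models defined elsewhere.
// Query#stream() is the Mongoose 4.x API; Mongoose 5 removed it in favor of Query#cursor().
var stream = Conversations.find().stream();

stream.on("data",function(item) {
    stream.pause();                 // pauses processing stream

    async.each(
        item.participants,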
        function(user,callback) {
            Users.findOneAndUpdate(
                { "uid": user.participantId },
                { "$setOnInsert": {
                    "name": user.participantName,
                    "type": user.participantType
                }},
                { "upsert": true, "new": true },
                callback
            );
        },
        function(err) {
            if (err) throw err;
            stream.resume();        // resume stream
        }
    );

});

stream.on("error",function(err) {
    // error handling
});

stream.on("end",function() {
    // Complete
});

Basically, the stream avoids loading all results from Conversations into memory (the Mongoose default). Then, as each item is read from the stream, you call .findOneAndUpdate(), which both looks for a matching document and returns the resulting document.

The { "upsert": true } option means that if no matching document is found, a new document is created in the collection. $setOnInsert is a MongoDB update operator that makes sure the "update" changes are only applied when a new document is created, so it does not alter an existing document when one is found.
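The upsert plus $setOnInsert behavior can be pictured with a plain-JavaScript model. This only illustrates the semantics, not how MongoDB implements them: a Map keyed by uid stands in for the Users collection, and upsertSetOnInsert() is a hypothetical helper. One real-world caveat from the MongoDB documentation: concurrent upserts on a field without a unique index can still insert duplicates, so a unique index on uid is a sensible safeguard.

```javascript
// Hypothetical in-memory model of
//   Users.findOneAndUpdate({ uid }, { $setOnInsert: fields }, { upsert: true, new: true })
// where a Map keyed by uid plays the role of the Users collection.
function upsertSetOnInsert(collection, uid, fields) {
  if (collection.has(uid)) {
    // A match exists: $setOnInsert is ignored and the document is returned unchanged.
    return collection.get(uid);
  }
  // No match: the upsert builds a new document from the query's equality field
  // plus the $setOnInsert fields, and "new": true returns that inserted document.
  const doc = { uid, ...fields };
  collection.set(uid, doc);
  return doc;
}

const users = new Map();
const first = upsertSetOnInsert(users, 42, { name: 'Ann', type: 'agent' });
const second = upsertSetOnInsert(users, 42, { name: 'Someone else', type: 'customer' });
console.log(first === second, second.name, users.size); // true Ann 1
```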

Of course, this could also be .update(), since nothing is done with the result here, but I'm leaving .findOneAndUpdate() in just in case you want to console.log() the result to see what is happening. Using .update() will be more efficient because it does not need to return the document, and it takes basically the same arguments.

Aside from the inner async.each flow control, there is the stream control with .pause() and .resume(). These control the flow of the outer loop, allowing only one conversation to be processed at a time. You could extend that to allow a pool of items to be processed in parallel, but this is the basic example.
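Here is one way the pause()/resume() idea could be extended to a pool, sketched with Node's built-in stream.Readable.from() (Node 12+) in place of the Mongoose query stream, and a hypothetical promise-returning worker in place of the per-conversation upserts. With limit set to 1 it reduces to the one-at-a-time pattern of the answer. Once more than one item can be in flight, the stream's "end" event may fire before the last workers finish, so the sketch counts in-flight work and resolves only when both have happened.

```javascript
const { Readable } = require('stream');

// Process each item from an object-mode Readable with at most `limit` workers in flight.
// `worker` is a hypothetical function returning a Promise (e.g. the upserts for one conversation).
function processWithPool(readable, worker, limit) {
  return new Promise((resolve, reject) => {
    let inFlight = 0;
    let ended = false;
    let failed = false;

    const finishIfDone = () => {
      if (ended && inFlight === 0 && !failed) resolve();
    };

    readable.on('data', (item) => {
      inFlight += 1;
      if (inFlight >= limit) readable.pause(); // pool is full: stop emitting 'data'

      worker(item).then(
        () => {
          inFlight -= 1;
          if (!failed && inFlight < limit) readable.resume(); // a slot is free: read more
          finishIfDone();
        },
        (err) => {
          failed = true;
          readable.destroy();
          reject(err);
        }
      );
    });

    readable.on('error', (err) => {
      failed = true;
      reject(err);
    });

    readable.on('end', () => {
      ended = true; // no more items, but some may still be in flight
      finishIfDone();
    });
  });
}
```

A usage might look like processWithPool(Readable.from(conversations), handleConversation, 4), where handleConversation is whatever promise-returning function does the upserts for one conversation.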

An event stream will also tell you when it is complete, and since the other flow control already handles the other async operations, the "end" handler will only be called once all items are complete.


1 Comment

Beautiful! Absolutely great. Thanks for making me a better developer. I really appreciate your help and effort!
