14

I am trying to use an AWS Lambda function to handle events from SendGrid. As I understand it, the event will be an array with a variable number of JSON objects, each representing a given event. I want to write these events to DynamoDB using batchWriteItem and repeat the call until no UnprocessedItems are returned. However, I'm getting stuck in an infinite loop. Here is my current code:

console.log('Loading function');

var aws = require('aws-sdk');
var dynamo = new aws.DynamoDB();
params = {};

exports.handler = function(sg_event, context) {

    var items = [];
    for(var i = 0; i < sg_event.length; i++) {
        var obj = sg_event[i];
        var request = {
            PutRequest: {
                Item: {
                    email: { S: obj.email },
                    timestamp: { S: obj.timestamp.toString() },
                    sg_message_id: { S: obj.sg_message_id },
                    event: { S: obj.event }
                }
            }
        };
        items.push(request);
    }

    params = {
        RequestItems: {
            sendgrid_response: items
        }
    }

    do {
        dynamo.batchWriteItem( params, function(err, data) {
            if(err)
                context.fail(err);
            else
                params.RequestItems = data.UnprocessedItems;
        });
    } while(!isEmpty(params.RequestItems));
};

function isEmpty(obj) {
    return (Object.keys(obj).length === 0);
}

I think the problem is trying to set the params in the callback function, but I don't know how else I'm supposed to do it...I know I could just call another batchWriteItem using UnprocessedItems within the callback of the original one, but I still need to be able to run the function as many times as needed to ensure all UnprocessedItems are written. How can I loop the batchWriteItem correctly?

6 Answers

10

@Daniela Miao, Thanks for sharing the solution.

One check can be added to the code you posted to avoid an exception from DynamoDB: before calling batchWriteItem again, verify that params.RequestItems actually contains unprocessed items.

//db is AWS.DynamoDB Client
var processItemsCallback = function(err, data) {
  if (err) { 
     //fail
  } else {
    var params = {};
    params.RequestItems = data.UnprocessedItems;
    /*
    * Added Code block 
    */
    if(Object.keys(params.RequestItems).length != 0) {
      db.batchWriteItem(params, processItemsCallback);
    }
  }
};

// initialParams is a placeholder for your first { RequestItems: ... } request
db.batchWriteItem(initialParams, processItemsCallback);

6

This is my code sample using the "await" syntax, so it must run inside an async function. It waits a random delay before each retry.

// batchWriteItems is assumed to already hold the initial RequestItems map
let batchWriteResp
do {
    batchWriteResp = await dynamo.batchWriteItem({RequestItems:batchWriteItems}).promise()
    if (Object.keys(batchWriteResp.UnprocessedItems).length>0) {
        batchWriteItems = batchWriteResp.UnprocessedItems
        // delay a random time between 0.5~2.5 seconds
        const delay = Math.floor(Math.random() * 2000 + 500)
        await new Promise(resolve => setTimeout(resolve, delay));
    } else {
        break
    }
} while (true)

1 Comment

I found the condition can bork if you don't check for Unprocessed items like so: (batchWriteResp && batchWriteResp.UnprocessedItems && Object.keys(batchWriteResp.UnprocessedItems) && Object.keys(batchWriteResp.UnprocessedItems).length > 0)
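
The defensive check described in the comment above can be folded into a small helper. This is only a sketch: the name hasUnprocessedItems is hypothetical (it is not part of the AWS SDK), and it assumes the response is either missing or an object that may carry an UnprocessedItems map.

```javascript
// Hypothetical helper, not an SDK function.
// Returns true only when the response carries a non-empty UnprocessedItems map.
function hasUnprocessedItems(resp) {
  const unprocessed = resp && resp.UnprocessedItems;
  return Boolean(unprocessed) && Object.keys(unprocessed).length > 0;
}
```

The retry condition then reads as if (hasUnprocessedItems(batchWriteResp)) { ... } and does not throw when the response or its UnprocessedItems field is undefined.
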
5

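Node.js runs your JavaScript on a single thread, and a callback can only run after the currently executing synchronous code has finished. Your do/while loop never returns control to the event loop, so the batchWriteItem callback never executes, params.RequestItems is never updated, and the loop never ends.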

Here is how you do it:

//db is AWS.DynamoDB Client
var processItemsCallback = function(err, data) {
  if (err) { 
     //fail
  } else {
    var params = {};
    params.RequestItems = data.UnprocessedItems;
    db.batchWriteItem(params, processItemsCallback);
  }
};

// initialParams is a placeholder for your first { RequestItems: ... } request
db.batchWriteItem(initialParams, processItemsCallback);
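
To see why the do/while in the question can never observe the callback's result, here is a minimal sketch that needs no AWS access. It stands in a zero-delay setTimeout for the batchWriteItem callback; the function name and the iteration cap are illustrative, and the cap exists only so that the demo terminates.

```javascript
// Schedules a callback, then spins synchronously, like the question's do/while.
function demoBlockedCallback(maxSpins) {
  let callbackRan = false;

  // Stand-in for the batchWriteItem callback: queued, but it cannot run
  // until the synchronous code below has returned.
  setTimeout(() => { callbackRan = true; }, 0);

  let iterations = 0;
  do {
    iterations++;
  } while (!callbackRan && iterations < maxSpins);

  return { callbackRan, iterations };
}

const result = demoBlockedCallback(1e6);
console.log(result); // { callbackRan: false, iterations: 1000000 }
```

The loop gives up only because of the cap. Without it, the flag would never change and the loop would spin forever, which is the behaviour described in the question.
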

4 Comments

Should be data.UnprocessedKeys
@PiminKonstantinKefaloukos No, UnprocessedItems is correct. See the docs
Is there a chance that this can get stuck in infinite loop?
You're probably better off using an exponential backoff on db.batchWriteItem if there are unprocessed items.
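
Following up on the two comments above, here is a rough sketch of a retry loop with exponential backoff and a cap on the number of attempts, so that it cannot loop forever. It assumes db is an aws-sdk v2 AWS.DynamoDB client (anything exposing batchWriteItem(params).promise() will do). The names backoffDelayMs and batchWriteWithRetry and all default values are illustrative choices, not SDK APIs or recommended settings.

```javascript
// Random delay in [0, min(capMs, baseMs * 2^attempt)), i.e. backoff with jitter.
function backoffDelayMs(attempt, baseMs = 100, capMs = 5000, random = Math.random) {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(random() * ceiling);
}

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Resolves with the number of calls made once nothing is left unprocessed.
// Rejects after maxAttempts calls, attaching the leftovers to the error.
async function batchWriteWithRetry(db, requestItems, maxAttempts = 8, baseMs = 100) {
  let pending = requestItems;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    // Wait before every retry, but not before the first call.
    if (attempt > 0) await sleep(backoffDelayMs(attempt - 1, baseMs));

    const data = await db.batchWriteItem({ RequestItems: pending }).promise();
    pending = data.UnprocessedItems || {};
    if (Object.keys(pending).length === 0) return attempt + 1;
  }
  const err = new Error('Unprocessed items remain after ' + maxAttempts + ' attempts');
  err.unprocessedItems = pending;
  throw err;
}
```

In the question's handler this could be called as await batchWriteWithRetry(dynamo, { sendgrid_response: items }) from an async function, with the rejection passed on to context.fail.
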
1

I find the accepted answer unnecessarily recursive, and as a result it's very difficult to read and maintain. How about the following?

const isEmptyObj = obj => {
    // for...in iterates over keys; for...of would throw on a plain object
    for (const k in obj) return false;
    return true;
};

const writeDynamodbBatch = async (db, itemsToWrite) => {
    
    // Note that `itemsToWrite` should look like:
    // { [tableName]: [ item1, item2, item3, ... ] }    
    
    while (!isEmptyObj(itemsToWrite)) {
        
        const result = await db.batchWriteItem({ RequestItems: itemsToWrite }).promise();
        itemsToWrite = result.UnprocessedItems;
        
    }
    
};


-1

A pretty neat solution using recursion for BatchGetItem (which follows the same retry pattern as BatchWriteItem, except that the leftovers come back in UnprocessedKeys rather than UnprocessedItems):

public async batchGet(params: BatchGetItemInput, output: BatchGetResponseMap): Promise<BatchGetResponseMap> {
    const batchGetItemOutput: BatchGetItemOutput = await this.documentClient.batchGet(params).promise();

    Object.keys(batchGetItemOutput.Responses).forEach(tableName => {
        if (output[tableName]) {
            output[tableName] = output[tableName].concat(batchGetItemOutput.Responses[tableName]);
        } else {
            output[tableName] = batchGetItemOutput.Responses[tableName];
        }
    });

    if (Object.keys(batchGetItemOutput.UnprocessedKeys).length !== 0) {
        output = await this.batchGet({ RequestItems: batchGetItemOutput.UnprocessedKeys }, output);
    }

    return output;
}

1 Comment

Errors are also placed into UnprocessedItems. If you do this without a limit on the number of retries and you are being throttled, or an item keeps hitting some other persistent error, you will just exacerbate the problem and the loop will run forever.
-5

According to AWS documentation, the SDK automatically handles the retry logic:

Note The AWS SDKs implement automatic retry logic and exponential backoff.

And the max number of retries is configurable through:

var dynamodb = new AWS.DynamoDB({apiVersion: '2012-08-10', maxRetries:5});

2 Comments

It does not automatically retry the unprocessed items.
Also, the AWS documentation specifies that if batch calls fail partially, the failed items will not be retried.
