Upserting Items into DynamoDB
When updating documents, MongoDB has a useful feature to insert a new document when no document matches the query criteria. This feature is called an upsert. Sadly, as of this writing, DynamoDB misses on this feature out of the box.Thankfully, there's a way to achieve this. The idea is to do it in 3 steps: (1) Get the previous copy of the item. (2) If a previous copy exists, update it. (3) If it does not exist, insert the item ensuring that concurrent requests do not overwrite each other. Here's a snippet written for Node.js:
function upsert(tableName, partitionKey, sortKey, data) { // ... // 1. Get the original item return _get(partitionKey, sortKey).the(function (original) { if (Object.keys(original).length > 0) { // 2. Update if item already exists return _update(data, original); } else { // 3. Otherwise, put the item return _put(data).catch(function (err) { if (err.code === 'ConditionalCheckFailedException') { // 3a. Only 1 of the concurrent puts will succeed, // the rest should retry recursively return this.upsert(tableName, partitionKey, sortKey, data); } else { throw err; } }); } }); }The last part is where it gets tricky -- below is the complete code that illustrates how it is done:
function upsert(tableName, partitionKey, sortKey, data) { function _get(partitionKey, sortKey) { var params = { TableName: tableName, Key: { partitionKey: partitionKey, sortKey: sortKey } }; return docClient.get(params).promise(); } function _update(data, original) { var updateExpression = dynamodbUpdateExpression.getUpdateExpression({ data: original }, { data: data }); var params = Object.assign({ TableName: tableName, Key: { partitionKey: partitionKey, sortKey: sortKey }, ReturnValues: 'ALL_NEW', ConditionExpression: 'attribute_exists(partitionKey) AND attribute_exists(sortKey)' }, updateExpression); if (params.UpdateExpression === '') { return Promise.resolve(); } return new Promise(function (resolve, reject) { return docClient.update(params).promise() .then(function (result) { resolve(result.Attributes.data); }) .catch(reject); }); } function _put(data) { var params = { TableName: tableName, Item: { partitionKey: partitionKey, sortKey: sortKey, data: data }, ConditionExpression: 'attribute_not_exists(partitionKey) AND attribute_not_exists(sortKey)' }; return docClient.put(params).promise(); } // 1. Get the original item return _get(partitionKey, sortKey).the(function (original) { if (Object.keys(original).length > 0) { // 2. Update if item already exists return _update(data, original); } else { // 3. Otherwise, put the item return _put(data).catch(function (err) { if (err.code === 'ConditionalCheckFailedException') { // 3a. Only 1 of the concurrent puts will succeed, // the rest should retry recursively return this.upsert(tableName, partitionKey, sortKey, data); } else { throw err; } }); } }); }The trick is to declare a Condition Expression in the put step to ensure that an item only gets inserted if a previous copy does not exist (Line 46). This ensures that when handling concurrent put requests, only the 1st request succeeds and the others fail with a ConditionalCheckFailedException error. We then check for this error type to determine if any of the failed requests should be retried as update requests.
The above code uses dynamodb-update-expression (Line 16) to generate DynamoDB Update Expressions.
Subscribe to:
Post Comments
(
Atom
)
No comments :
Post a Comment