Upserting Items into DynamoDB
When updating documents, MongoDB has a useful feature that inserts a new document when no document matches the query criteria. This feature is called an upsert. Sadly, as of this writing, DynamoDB lacks this feature out of the box. Thankfully, there's a way to achieve it. The idea is to do it in 3 steps: (1) Get the previous copy of the item. (2) If a previous copy exists, update it. (3) If it does not exist, insert the item, ensuring that concurrent requests do not overwrite each other. Here's a snippet written for Node.js:
/**
 * Outline of the upsert flow: read the item, update it when it exists,
 * otherwise insert it with a conditional put and retry on a lost race.
 *
 * The helpers `_get`, `_update` and `_put` are elided here (see `// ...`).
 * This outline assumes `_get` resolves with a DocumentClient `get()`
 * response, i.e. an object whose `Item` property is absent when nothing is
 * stored under the key.
 *
 * @param {string} tableName - Table to write to.
 * @param {*} partitionKey - Partition key value of the item.
 * @param {*} sortKey - Sort key value of the item.
 * @param {Object} data - Payload to store on the item.
 * @returns {Promise<*>} Resolves with whatever `_update` or `_put` resolves with.
 */
function upsert(tableName, partitionKey, sortKey, data) {
  // ...
  // 1. Get the original item
  return _get(partitionKey, sortKey).then(function (result) {
    var original = result && result.Item;
    if (original) {
      // 2. Update if item already exists
      return _update(data, original);
    }
    // 3. Otherwise, put the item
    return _put(data).catch(function (err) {
      if (err.code === 'ConditionalCheckFailedException') {
        // 3a. Only 1 of the concurrent puts will succeed,
        // the rest should retry recursively. Call upsert directly:
        // `this` inside this callback is not the enclosing scope.
        return upsert(tableName, partitionKey, sortKey, data);
      }
      throw err;
    });
  });
}
The last part is where it gets tricky -- below is the complete code that illustrates how it is done:
/**
 * Inserts the item when it does not exist yet, otherwise updates it.
 *
 * Items are stored as `{ partitionKey, sortKey, data }`. The flow is:
 *   1. read the current item;
 *   2. if it exists, apply an update generated from the difference between
 *      the stored `data` and the new `data`;
 *   3. otherwise put it, guarded by a condition so that only one of several
 *      concurrent puts succeeds; the losers retry and take the update path.
 *
 * Relies on `docClient` (DynamoDB DocumentClient) and
 * `dynamodbUpdateExpression` being in scope.
 *
 * @param {string} tableName - Table to write to.
 * @param {*} partitionKey - Value of the item's `partitionKey` attribute.
 * @param {*} sortKey - Value of the item's `sortKey` attribute.
 * @param {Object} data - Payload stored under the item's `data` attribute.
 * @returns {Promise<*>} On update: the item's `data` after the update, or
 *   `undefined` when there was nothing to change. On insert: the response
 *   of the put call. Rejects with any error other than a lost put race.
 */
function upsert(tableName, partitionKey, sortKey, data) {
  // Fetches the get() response for the key; `Item` is absent when not found.
  function _get(partitionKey, sortKey) {
    var params = {
      TableName: tableName,
      Key: {
        partitionKey: partitionKey,
        sortKey: sortKey
      }
    };
    return docClient.get(params).promise();
  }

  // Updates the existing item. `original` is the stored item (the `Item`
  // of the get response), so the diff compares stored data with new data.
  function _update(data, original) {
    var updateExpression = dynamodbUpdateExpression.getUpdateExpression(
      { data: original.data },
      { data: data }
    );
    var params = Object.assign({
      TableName: tableName,
      Key: {
        partitionKey: partitionKey,
        sortKey: sortKey
      },
      ReturnValues: 'ALL_NEW',
      ConditionExpression: 'attribute_exists(partitionKey) AND attribute_exists(sortKey)'
    }, updateExpression);
    // Nothing changed: skip the request instead of sending an empty expression.
    if (params.UpdateExpression === '') {
      return Promise.resolve();
    }
    return docClient.update(params).promise().then(function (result) {
      return result.Attributes.data;
    });
  }

  // Inserts the item only if no item with this key exists yet.
  function _put(data) {
    var params = {
      TableName: tableName,
      Item: {
        partitionKey: partitionKey,
        sortKey: sortKey,
        data: data
      },
      ConditionExpression: 'attribute_not_exists(partitionKey) AND attribute_not_exists(sortKey)'
    };
    return docClient.put(params).promise();
  }

  // 1. Get the original item
  return _get(partitionKey, sortKey).then(function (result) {
    var original = result && result.Item;
    if (original) {
      // 2. Update if item already exists
      return _update(data, original);
    }
    // 3. Otherwise, put the item
    return _put(data).catch(function (err) {
      if (err.code === 'ConditionalCheckFailedException') {
        // 3a. Only 1 of the concurrent puts will succeed,
        // the rest should retry recursively. Call upsert directly:
        // `this` inside this callback is not the enclosing scope.
        return upsert(tableName, partitionKey, sortKey, data);
      }
      throw err;
    });
  });
}
The trick is to declare a Condition Expression in the put step to ensure that an item only gets inserted if a previous copy does not exist (see the `ConditionExpression` in `_put`). This ensures that when handling concurrent put requests, only the first request succeeds and the others fail with a ConditionalCheckFailedException error. We then check for this error type to determine whether a failed request should be retried, at which point it finds the item and goes through the update path. The above code uses dynamodb-update-expression (in `_update`) to generate DynamoDB Update Expressions.
Subscribe to:
Comments
(
Atom
)
No comments :
Post a Comment