Upserting Items into DynamoDB
When updating documents, MongoDB has a useful feature to insert a new document when no document matches the query criteria. This feature is called an upsert. Sadly, as of this writing, DynamoDB misses on this feature out of the box.Thankfully, there's a way to achieve this. The idea is to do it in 3 steps: (1) Get the previous copy of the item. (2) If a previous copy exists, update it. (3) If it does not exist, insert the item ensuring that concurrent requests do not overwrite each other. Here's a snippet written for Node.js:
function upsert(tableName, partitionKey, sortKey, data) {
  // ...
  // 1. Get the original item
  return _get(partitionKey, sortKey).the(function (original) {
    if (Object.keys(original).length > 0) {
      // 2. Update if item already exists
      return _update(data, original);
    } else {
      // 3. Otherwise, put the item
      return _put(data).catch(function (err) {
        if (err.code === 'ConditionalCheckFailedException') {
          // 3a. Only 1 of the concurrent puts will succeed,
          // the rest should retry recursively
          return this.upsert(tableName, partitionKey, sortKey, data);
        } else {
          throw err;
        }
      });
    }
  });
}
The last part is where it gets tricky -- below is the complete code that illustrates how it is done:
function upsert(tableName, partitionKey, sortKey, data) {
  function _get(partitionKey, sortKey) {
    var params = {
      TableName: tableName,
      Key: {
        partitionKey: partitionKey,
        sortKey:      sortKey
      }
    };
    return docClient.get(params).promise();
  }
  function _update(data, original) {
    var updateExpression = dynamodbUpdateExpression.getUpdateExpression({ data: original }, { data: data });
    var params = Object.assign({
      TableName: tableName,
      Key: {
        partitionKey: partitionKey,
        sortKey:      sortKey
      },
      ReturnValues: 'ALL_NEW',
      ConditionExpression: 'attribute_exists(partitionKey) AND attribute_exists(sortKey)'
    }, updateExpression);
    if (params.UpdateExpression === '') {
      return Promise.resolve();
    }
    return new Promise(function (resolve, reject) {
      return docClient.update(params).promise()
        .then(function (result) { resolve(result.Attributes.data); })
        .catch(reject);
    });
  }
  function _put(data) {
    var params = {
      TableName: tableName,
      Item: {
        partitionKey: partitionKey,
        sortKey:      sortKey,
        data:         data
      },
      ConditionExpression: 'attribute_not_exists(partitionKey) AND attribute_not_exists(sortKey)'
    };
    return docClient.put(params).promise();
  }
  // 1. Get the original item
  return _get(partitionKey, sortKey).the(function (original) {
    if (Object.keys(original).length > 0) {
      // 2. Update if item already exists
      return _update(data, original);
    } else {
      // 3. Otherwise, put the item
      return _put(data).catch(function (err) {
        if (err.code === 'ConditionalCheckFailedException') {
          // 3a. Only 1 of the concurrent puts will succeed,
          // the rest should retry recursively
          return this.upsert(tableName, partitionKey, sortKey, data);
        } else {
          throw err;
        }
      });
    }
  });
}
The trick is to declare a Condition Expression in the put step to ensure that an item only gets inserted if a previous copy does not exist (Line 46). This ensures that when handling concurrent put requests, only the 1st request succeeds and the others fail with a ConditionalCheckFailedException error. We then check for this error type to determine if any of the failed requests should be retried as update requests.The above code uses dynamodb-update-expression (Line 16) to generate DynamoDB Update Expressions.
Subscribe to:
Comments
                      (
                      Atom
                      )
                    
 
 
No comments :
Post a Comment