Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package org.springframework.integration.aws.lock;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -47,11 +50,16 @@
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
import com.amazonaws.services.dynamodbv2.CreateDynamoDBTableOptions;
import com.amazonaws.services.dynamodbv2.LockItem;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.LockTableDoesNotExistException;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;

/**
* An {@link ExpirableLockRegistry} implementation for the AWS DynamoDB. The algorithm is
Expand All @@ -62,6 +70,7 @@
*
* @author Artem Bilan
* @author Karl Lessard
* @author Asiel Caballero
* @since 2.0
*/
public class DynamoDbLockRegistry implements ExpirableLockRegistry, InitializingBean, DisposableBean {
Expand Down Expand Up @@ -109,6 +118,8 @@ public class DynamoDbLockRegistry implements ExpirableLockRegistry, Initializing

private boolean dynamoDBLockClientExplicitlySet;

private BillingMode billingMode = BillingMode.PAY_PER_REQUEST;

private long readCapacity = 1L;

private long writeCapacity = 1L;
Expand Down Expand Up @@ -154,6 +165,11 @@ public DynamoDbLockRegistry(AmazonDynamoDBLockClient dynamoDBLockClient) {
this.tableName = null;
}

public void setBillingMode(BillingMode billingMode) {
Assert.notNull(billingMode, "'billingMode' must not be null");
this.billingMode = billingMode;
}

public void setReadCapacity(long readCapacity) {
this.readCapacity = readCapacity;
}
Expand Down Expand Up @@ -233,18 +249,7 @@ public void afterPropertiesSet() {
}
}

CreateDynamoDBTableOptions createDynamoDBTableOptions = CreateDynamoDBTableOptions
.builder(this.dynamoDB, new ProvisionedThroughput(this.readCapacity,
this.writeCapacity),
this.tableName)
.withPartitionKeyName(this.partitionKey).withSortKeyName(this.sortKeyName).build();

try {
AmazonDynamoDBLockClient.createLockTableInDynamoDB(createDynamoDBTableOptions);
}
catch (ResourceInUseException ex) {
// Swallow an exception and check for table existence
}
createLockTableInDynamoDB();
}

int i = 0;
Expand Down Expand Up @@ -279,6 +284,52 @@ public void afterPropertiesSet() {
this.initialized = true;
}

/**
* Creates a DynamoDB table with the right schema for it to be used by this locking library.
* The table should be set
* up in advance, because it takes a few minutes for DynamoDB to provision a new instance.
* <p>
* This method is a variation of {@link AmazonDynamoDBLockClient#createLockTableInDynamoDB} to support custom
* {@link BillingMode} for the lock table.
* <p>
* If table already exists no exception.
*/
private void createLockTableInDynamoDB() {
try {
KeySchemaElement partitionKeyElement = new KeySchemaElement();
partitionKeyElement.setAttributeName(this.partitionKey);
partitionKeyElement.setKeyType(KeyType.HASH);

List<KeySchemaElement> keySchema = new ArrayList<>();
keySchema.add(partitionKeyElement);

Collection<AttributeDefinition> attributeDefinitions = new ArrayList<>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName(this.partitionKey)
.withAttributeType(ScalarAttributeType.S));

KeySchemaElement sortKeyElement = new KeySchemaElement();
sortKeyElement.setAttributeName(this.sortKeyName);
sortKeyElement.setKeyType(KeyType.RANGE);
keySchema.add(sortKeyElement);
attributeDefinitions.add(new AttributeDefinition().withAttributeName(this.sortKeyName)
.withAttributeType(ScalarAttributeType.S));

CreateTableRequest createTableRequest = new CreateTableRequest(this.tableName, keySchema)
.withAttributeDefinitions(attributeDefinitions)
.withBillingMode(this.billingMode);

if (BillingMode.PROVISIONED.equals(this.billingMode)) {
createTableRequest.setProvisionedThroughput(
new ProvisionedThroughput(this.readCapacity, this.writeCapacity));
}

this.dynamoDB.createTable(createTableRequest);
}
catch (ResourceInUseException ex) {
// Swallow an exception and you should check for table existence
}
}

private void awaitForActive() {
Assert.state(this.initialized,
() -> "The component has not been initialized: " + this + ".\n Is it declared as a bean?");
Expand Down Expand Up @@ -324,11 +375,11 @@ public void expireUnusedOlderThan(long age) {

@Override
public String toString() {
return "DynamoDbLockRegistry{" + "tableName='" + this.tableName + '\'' + ", readCapacity=" + this.readCapacity
+ ", writeCapacity=" + this.writeCapacity + ", partitionKey='" + this.partitionKey + '\''
+ ", sortKeyName='" + this.sortKeyName + '\'' + ", sortKey='" + this.sortKey + '\'' + ", refreshPeriod="
+ this.refreshPeriod + ", leaseDuration=" + this.leaseDuration + ", heartbeatPeriod="
+ this.heartbeatPeriod + '}';
return "DynamoDbLockRegistry{" + "tableName='" + this.tableName + '\'' + ", billingMode=" + this.billingMode
+ ", readCapacity=" + this.readCapacity + ", writeCapacity=" + this.writeCapacity + ", partitionKey='"
+ this.partitionKey + '\'' + ", sortKeyName='" + this.sortKeyName + '\'' + ", sortKey='" + this.sortKey
+ '\'' + ", refreshPeriod=" + this.refreshPeriod + ", leaseDuration=" + this.leaseDuration
+ ", heartbeatPeriod=" + this.heartbeatPeriod + '}';
}

private final class DynamoDbLock implements Lock {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,6 +38,7 @@
import com.amazonaws.services.dynamodbv2.document.spec.UpdateItemSpec;
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.CreateTableResult;
Expand All @@ -61,6 +62,7 @@
* The {@link ConcurrentMetadataStore} for the {@link AmazonDynamoDB}.
*
* @author Artem Bilan
* @author Asiel Caballero
* @since 1.1
*/
public class DynamoDbMetadataStore implements ConcurrentMetadataStore, InitializingBean {
Expand Down Expand Up @@ -89,6 +91,8 @@ public class DynamoDbMetadataStore implements ConcurrentMetadataStore, Initializ

private int createTableDelay = 1;

private BillingMode billingMode = BillingMode.PAY_PER_REQUEST;

private long readCapacity = 1L;

private long writeCapacity = 1L;
Expand All @@ -109,14 +113,6 @@ public DynamoDbMetadataStore(AmazonDynamoDBAsync dynamoDB, String tableName) {

}

public void setReadCapacity(long readCapacity) {
this.readCapacity = readCapacity;
}

public void setWriteCapacity(long writeCapacity) {
this.writeCapacity = writeCapacity;
}

public void setCreateTableRetries(int createTableRetries) {
this.createTableRetries = createTableRetries;
}
Expand All @@ -125,6 +121,19 @@ public void setCreateTableDelay(int createTableDelay) {
this.createTableDelay = createTableDelay;
}

public void setBillingMode(BillingMode billingMode) {
Assert.notNull(billingMode, "'billingMode' must not be null");
this.billingMode = billingMode;
}

public void setReadCapacity(long readCapacity) {
this.readCapacity = readCapacity;
}

public void setWriteCapacity(long writeCapacity) {
this.writeCapacity = writeCapacity;
}

/**
* Configure a period in seconds for items expiration. If it is configured to
* non-positive value ({@code <= 0}), the TTL is disabled on the table.
Expand All @@ -141,22 +150,19 @@ public void setTimeToLive(int timeToLive) {
@Override
public void afterPropertiesSet() {
try {
try {
this.table.describe();
updateTimeToLiveIfAny();
this.createTableLatch.countDown();
if (isTableAvailable()) {
return;
}
catch (ResourceNotFoundException e) {
if (logger.isInfoEnabled()) {
logger.info("No table '" + this.table.getTableName() + "'. Creating one...");
}
}

CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(this.table.getTableName())
.withKeySchema(new KeySchemaElement(KEY, KeyType.HASH))
.withAttributeDefinitions(new AttributeDefinition(KEY, ScalarAttributeType.S))
.withProvisionedThroughput(new ProvisionedThroughput(this.readCapacity, this.writeCapacity));
.withBillingMode(this.billingMode);

if (BillingMode.PROVISIONED.equals(this.billingMode)) {
createTableRequest.withProvisionedThroughput(
new ProvisionedThroughput(this.readCapacity, this.writeCapacity));
}

this.dynamoDB.createTableAsync(createTableRequest,
new AsyncHandler<CreateTableRequest, CreateTableResult>() {
Expand Down Expand Up @@ -208,6 +214,21 @@ public void onWaitFailure(Exception e) {
}
}

private boolean isTableAvailable() {
try {
this.table.describe();
updateTimeToLiveIfAny();
this.createTableLatch.countDown();
return true;
}
catch (ResourceNotFoundException e) {
if (logger.isInfoEnabled()) {
logger.info("No table '" + this.table.getTableName() + "'. Creating one...");
}
return false;
}
}

private void updateTimeToLiveIfAny() {
if (this.timeToLive != null) {
UpdateTimeToLiveRequest updateTimeToLiveRequest = new UpdateTimeToLiveRequest()
Expand Down Expand Up @@ -339,8 +360,9 @@ private static String getValueIfAny(Item item) {
@Override
public String toString() {
return "DynamoDbMetadataStore{" + "table=" + this.table + ", createTableRetries=" + this.createTableRetries
+ ", createTableDelay=" + this.createTableDelay + ", readCapacity=" + this.readCapacity
+ ", writeCapacity=" + this.writeCapacity + ", timeToLive=" + this.timeToLive + '}';
+ ", createTableDelay=" + this.createTableDelay + ", billingMode=" + this.billingMode
+ ", readCapacity=" + this.readCapacity + ", writeCapacity=" + this.writeCapacity
+ ", timeToLive=" + this.timeToLive + '}';
}

}
Loading