diff --git a/src/main/java/org/springframework/integration/aws/lock/DynamoDbLockRegistry.java b/src/main/java/org/springframework/integration/aws/lock/DynamoDbLockRegistry.java
index e90903ed..55a7e94d 100644
--- a/src/main/java/org/springframework/integration/aws/lock/DynamoDbLockRegistry.java
+++ b/src/main/java/org/springframework/integration/aws/lock/DynamoDbLockRegistry.java
@@ -17,8 +17,11 @@
package org.springframework.integration.aws.lock;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -47,11 +50,16 @@
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
-import com.amazonaws.services.dynamodbv2.CreateDynamoDBTableOptions;
import com.amazonaws.services.dynamodbv2.LockItem;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.BillingMode;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.LockTableDoesNotExistException;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
/**
* An {@link ExpirableLockRegistry} implementation for the AWS DynamoDB. The algorithm is
@@ -62,6 +70,7 @@
*
* @author Artem Bilan
* @author Karl Lessard
+ * @author Asiel Caballero
* @since 2.0
*/
public class DynamoDbLockRegistry implements ExpirableLockRegistry, InitializingBean, DisposableBean {
@@ -109,6 +118,8 @@ public class DynamoDbLockRegistry implements ExpirableLockRegistry, Initializing
private boolean dynamoDBLockClientExplicitlySet;
+ private BillingMode billingMode = BillingMode.PAY_PER_REQUEST;
+
private long readCapacity = 1L;
private long writeCapacity = 1L;
@@ -154,6 +165,11 @@ public DynamoDbLockRegistry(AmazonDynamoDBLockClient dynamoDBLockClient) {
this.tableName = null;
}
+ public void setBillingMode(BillingMode billingMode) {
+ Assert.notNull(billingMode, "'billingMode' must not be null");
+ this.billingMode = billingMode;
+ }
+
public void setReadCapacity(long readCapacity) {
this.readCapacity = readCapacity;
}
@@ -233,18 +249,7 @@ public void afterPropertiesSet() {
}
}
- CreateDynamoDBTableOptions createDynamoDBTableOptions = CreateDynamoDBTableOptions
- .builder(this.dynamoDB, new ProvisionedThroughput(this.readCapacity,
- this.writeCapacity),
- this.tableName)
- .withPartitionKeyName(this.partitionKey).withSortKeyName(this.sortKeyName).build();
-
- try {
- AmazonDynamoDBLockClient.createLockTableInDynamoDB(createDynamoDBTableOptions);
- }
- catch (ResourceInUseException ex) {
- // Swallow an exception and check for table existence
- }
+ createLockTableInDynamoDB();
}
int i = 0;
@@ -279,6 +284,52 @@ public void afterPropertiesSet() {
this.initialized = true;
}
+ /**
+ * Creates a DynamoDB table with the right schema for it to be used by this locking library.
+ * The table should be set
+ * up in advance, because it takes a few minutes for DynamoDB to provision a new instance.
+ *
+ * This method is a variation of {@link AmazonDynamoDBLockClient#createLockTableInDynamoDB} to support custom
+ * {@link BillingMode} for the lock table.
+ *
+ * If table already exists no exception.
+ */
+ private void createLockTableInDynamoDB() {
+ try {
+ KeySchemaElement partitionKeyElement = new KeySchemaElement();
+ partitionKeyElement.setAttributeName(this.partitionKey);
+ partitionKeyElement.setKeyType(KeyType.HASH);
+
+ List keySchema = new ArrayList<>();
+ keySchema.add(partitionKeyElement);
+
+ Collection attributeDefinitions = new ArrayList<>();
+ attributeDefinitions.add(new AttributeDefinition().withAttributeName(this.partitionKey)
+ .withAttributeType(ScalarAttributeType.S));
+
+ KeySchemaElement sortKeyElement = new KeySchemaElement();
+ sortKeyElement.setAttributeName(this.sortKeyName);
+ sortKeyElement.setKeyType(KeyType.RANGE);
+ keySchema.add(sortKeyElement);
+ attributeDefinitions.add(new AttributeDefinition().withAttributeName(this.sortKeyName)
+ .withAttributeType(ScalarAttributeType.S));
+
+ CreateTableRequest createTableRequest = new CreateTableRequest(this.tableName, keySchema)
+ .withAttributeDefinitions(attributeDefinitions)
+ .withBillingMode(this.billingMode);
+
+ if (BillingMode.PROVISIONED.equals(this.billingMode)) {
+ createTableRequest.setProvisionedThroughput(
+ new ProvisionedThroughput(this.readCapacity, this.writeCapacity));
+ }
+
+ this.dynamoDB.createTable(createTableRequest);
+ }
+ catch (ResourceInUseException ex) {
+ // Swallow an exception and you should check for table existence
+ }
+ }
+
private void awaitForActive() {
Assert.state(this.initialized,
() -> "The component has not been initialized: " + this + ".\n Is it declared as a bean?");
@@ -324,11 +375,11 @@ public void expireUnusedOlderThan(long age) {
@Override
public String toString() {
- return "DynamoDbLockRegistry{" + "tableName='" + this.tableName + '\'' + ", readCapacity=" + this.readCapacity
- + ", writeCapacity=" + this.writeCapacity + ", partitionKey='" + this.partitionKey + '\''
- + ", sortKeyName='" + this.sortKeyName + '\'' + ", sortKey='" + this.sortKey + '\'' + ", refreshPeriod="
- + this.refreshPeriod + ", leaseDuration=" + this.leaseDuration + ", heartbeatPeriod="
- + this.heartbeatPeriod + '}';
+ return "DynamoDbLockRegistry{" + "tableName='" + this.tableName + '\'' + ", billingMode=" + this.billingMode
+ + ", readCapacity=" + this.readCapacity + ", writeCapacity=" + this.writeCapacity + ", partitionKey='"
+ + this.partitionKey + '\'' + ", sortKeyName='" + this.sortKeyName + '\'' + ", sortKey='" + this.sortKey
+ + '\'' + ", refreshPeriod=" + this.refreshPeriod + ", leaseDuration=" + this.leaseDuration
+ + ", heartbeatPeriod=" + this.heartbeatPeriod + '}';
}
private final class DynamoDbLock implements Lock {
diff --git a/src/main/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStore.java b/src/main/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStore.java
index 2b7e0db4..81edac3a 100644
--- a/src/main/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStore.java
+++ b/src/main/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStore.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2017-2019 the original author or authors.
+ * Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,6 +38,7 @@
import com.amazonaws.services.dynamodbv2.document.spec.UpdateItemSpec;
import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.CreateTableResult;
@@ -61,6 +62,7 @@
* The {@link ConcurrentMetadataStore} for the {@link AmazonDynamoDB}.
*
* @author Artem Bilan
+ * @author Asiel Caballero
* @since 1.1
*/
public class DynamoDbMetadataStore implements ConcurrentMetadataStore, InitializingBean {
@@ -89,6 +91,8 @@ public class DynamoDbMetadataStore implements ConcurrentMetadataStore, Initializ
private int createTableDelay = 1;
+ private BillingMode billingMode = BillingMode.PAY_PER_REQUEST;
+
private long readCapacity = 1L;
private long writeCapacity = 1L;
@@ -109,14 +113,6 @@ public DynamoDbMetadataStore(AmazonDynamoDBAsync dynamoDB, String tableName) {
}
- public void setReadCapacity(long readCapacity) {
- this.readCapacity = readCapacity;
- }
-
- public void setWriteCapacity(long writeCapacity) {
- this.writeCapacity = writeCapacity;
- }
-
public void setCreateTableRetries(int createTableRetries) {
this.createTableRetries = createTableRetries;
}
@@ -125,6 +121,19 @@ public void setCreateTableDelay(int createTableDelay) {
this.createTableDelay = createTableDelay;
}
+ public void setBillingMode(BillingMode billingMode) {
+ Assert.notNull(billingMode, "'billingMode' must not be null");
+ this.billingMode = billingMode;
+ }
+
+ public void setReadCapacity(long readCapacity) {
+ this.readCapacity = readCapacity;
+ }
+
+ public void setWriteCapacity(long writeCapacity) {
+ this.writeCapacity = writeCapacity;
+ }
+
/**
* Configure a period in seconds for items expiration. If it is configured to
* non-positive value ({@code <= 0}), the TTL is disabled on the table.
@@ -141,22 +150,19 @@ public void setTimeToLive(int timeToLive) {
@Override
public void afterPropertiesSet() {
try {
- try {
- this.table.describe();
- updateTimeToLiveIfAny();
- this.createTableLatch.countDown();
+ if (isTableAvailable()) {
return;
}
- catch (ResourceNotFoundException e) {
- if (logger.isInfoEnabled()) {
- logger.info("No table '" + this.table.getTableName() + "'. Creating one...");
- }
- }
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(this.table.getTableName())
.withKeySchema(new KeySchemaElement(KEY, KeyType.HASH))
.withAttributeDefinitions(new AttributeDefinition(KEY, ScalarAttributeType.S))
- .withProvisionedThroughput(new ProvisionedThroughput(this.readCapacity, this.writeCapacity));
+ .withBillingMode(this.billingMode);
+
+ if (BillingMode.PROVISIONED.equals(this.billingMode)) {
+ createTableRequest.withProvisionedThroughput(
+ new ProvisionedThroughput(this.readCapacity, this.writeCapacity));
+ }
this.dynamoDB.createTableAsync(createTableRequest,
new AsyncHandler() {
@@ -208,6 +214,21 @@ public void onWaitFailure(Exception e) {
}
}
+ private boolean isTableAvailable() {
+ try {
+ this.table.describe();
+ updateTimeToLiveIfAny();
+ this.createTableLatch.countDown();
+ return true;
+ }
+ catch (ResourceNotFoundException e) {
+ if (logger.isInfoEnabled()) {
+ logger.info("No table '" + this.table.getTableName() + "'. Creating one...");
+ }
+ return false;
+ }
+ }
+
private void updateTimeToLiveIfAny() {
if (this.timeToLive != null) {
UpdateTimeToLiveRequest updateTimeToLiveRequest = new UpdateTimeToLiveRequest()
@@ -339,8 +360,9 @@ private static String getValueIfAny(Item item) {
@Override
public String toString() {
return "DynamoDbMetadataStore{" + "table=" + this.table + ", createTableRetries=" + this.createTableRetries
- + ", createTableDelay=" + this.createTableDelay + ", readCapacity=" + this.readCapacity
- + ", writeCapacity=" + this.writeCapacity + ", timeToLive=" + this.timeToLive + '}';
+ + ", createTableDelay=" + this.createTableDelay + ", billingMode=" + this.billingMode
+ + ", readCapacity=" + this.readCapacity + ", writeCapacity=" + this.writeCapacity
+ + ", timeToLive=" + this.timeToLive + '}';
}
}
diff --git a/src/test/java/org/springframework/integration/aws/lock/DynamoDbLockRegistryBuildTableTests.java b/src/test/java/org/springframework/integration/aws/lock/DynamoDbLockRegistryBuildTableTests.java
new file mode 100644
index 00000000..7cb7595a
--- /dev/null
+++ b/src/test/java/org/springframework/integration/aws/lock/DynamoDbLockRegistryBuildTableTests.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.aws.lock;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.amazonaws.services.dynamodbv2.AbstractAmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.model.BillingMode;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.CreateTableResult;
+import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest;
+import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
+import com.amazonaws.services.dynamodbv2.model.GetItemRequest;
+import com.amazonaws.services.dynamodbv2.model.GetItemResult;
+import com.amazonaws.services.dynamodbv2.model.PutItemRequest;
+import com.amazonaws.services.dynamodbv2.model.PutItemResult;
+import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+import com.amazonaws.services.dynamodbv2.model.TableDescription;
+import com.amazonaws.services.dynamodbv2.model.TableStatus;
+
+/**
+ * @author Asiel Caballero
+ *
+ * @since 2.3.5
+ */
+class DynamoDbLockRegistryBuildTableTests {
+ private static final String TEST_TABLE
+ = "testLockRegistry" + DynamoDbLockRegistryBuildTableTests.class.getSimpleName();
+
+ private InMemoryAmazonDynamoDB client;
+
+ @BeforeEach
+ void setup() {
+ this.client = new InMemoryAmazonDynamoDB();
+ }
+
+ @Test
+ void onDemandIsSetup() throws InterruptedException {
+ assertsBillingMode(BillingMode.PAY_PER_REQUEST,
+ lockRegistry -> lockRegistry.setBillingMode(BillingMode.PAY_PER_REQUEST));
+ }
+
+ @Test
+ void provisionedIsSetup() throws InterruptedException {
+ assertsBillingMode(BillingMode.PROVISIONED,
+ lockRegistry -> lockRegistry.setBillingMode(BillingMode.PROVISIONED));
+ }
+
+ @Test
+ void defaultsToProvisioned() throws InterruptedException {
+ assertsBillingMode(BillingMode.PAY_PER_REQUEST, store -> { });
+ }
+
+ private void assertsBillingMode(com.amazonaws.services.dynamodbv2.model.BillingMode billingMode,
+ Consumer propertySetter) throws InterruptedException {
+ DynamoDbLockRegistry lockRegistry = new DynamoDbLockRegistry(this.client, TEST_TABLE);
+ propertySetter.accept(lockRegistry);
+ lockRegistry.afterPropertiesSet();
+
+ lockRegistry.obtain("test").tryLock(1, TimeUnit.SECONDS);
+ assertThat(billingMode.toString())
+ .isEqualTo(this.client.getCreateTableRequest().getBillingMode());
+ }
+
+ private static class InMemoryAmazonDynamoDB extends AbstractAmazonDynamoDB {
+ private CreateTableRequest createTableRequest;
+ private boolean wasCalled = false;
+
+ @Override
+ public synchronized CreateTableResult createTable(CreateTableRequest request) {
+ this.createTableRequest = request;
+
+ return null;
+ }
+
+ @Override
+ public GetItemResult getItem(GetItemRequest request) {
+ return new GetItemResult();
+ }
+
+ @Override
+ public PutItemResult putItem(PutItemRequest request) {
+ return new PutItemResult();
+ }
+
+ @Override
+ public synchronized DescribeTableResult describeTable(DescribeTableRequest request) {
+ if (this.wasCalled) {
+ return new DescribeTableResult()
+ .withTable(new TableDescription()
+ .withTableStatus(TableStatus.ACTIVE));
+ }
+ else {
+ this.wasCalled = true;
+ throw new ResourceNotFoundException(TEST_TABLE);
+ }
+ }
+
+ public synchronized CreateTableRequest getCreateTableRequest() {
+ return this.createTableRequest;
+ }
+ }
+}
diff --git a/src/test/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStoreBuildTableTests.java b/src/test/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStoreBuildTableTests.java
new file mode 100644
index 00000000..88751dc4
--- /dev/null
+++ b/src/test/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStoreBuildTableTests.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2020-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.aws.metadata;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+import org.junit.jupiter.api.Test;
+
+import com.amazonaws.handlers.AsyncHandler;
+import com.amazonaws.services.dynamodbv2.AbstractAmazonDynamoDBAsync;
+import com.amazonaws.services.dynamodbv2.model.BillingMode;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.CreateTableResult;
+import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest;
+import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
+import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+
+/**
+ * @author Asiel Caballero
+ *
+ * @since 2.3.5
+ */
+class DynamoDbMetadataStoreBuildTableTests {
+ private static final String TEST_TABLE
+ = "testMetadataStore" + DynamoDbMetadataStoreBuildTableTests.class.getSimpleName();
+
+ private final InMemoryAmazonDynamoDB client = new InMemoryAmazonDynamoDB();
+
+ @Test
+ void onDemandIsSetup() {
+ assertsBillingMode(BillingMode.PAY_PER_REQUEST,
+ store -> store.setBillingMode(BillingMode.PAY_PER_REQUEST));
+ }
+
+ @Test
+ void provisionedIsSetup() {
+ assertsBillingMode(BillingMode.PROVISIONED,
+ store -> store.setBillingMode(BillingMode.PROVISIONED));
+ }
+
+ @Test
+ void defaultsToProvisioned() {
+ assertsBillingMode(BillingMode.PAY_PER_REQUEST, store -> { });
+ }
+
+ private void assertsBillingMode(com.amazonaws.services.dynamodbv2.model.BillingMode billingMode,
+ Consumer propertySetter) {
+ DynamoDbMetadataStore store = new DynamoDbMetadataStore(this.client, TEST_TABLE);
+ propertySetter.accept(store);
+ store.afterPropertiesSet();
+
+ assertThat(billingMode.toString())
+ .isEqualTo(this.client.createTableRequest.getBillingMode());
+ }
+
+ private static class InMemoryAmazonDynamoDB extends AbstractAmazonDynamoDBAsync {
+ private CreateTableRequest createTableRequest;
+
+ @Override
+ public Future createTableAsync(CreateTableRequest request,
+ AsyncHandler asyncHandler) {
+ this.createTableRequest = request;
+
+ return null;
+ }
+
+ @Override
+ public DescribeTableResult describeTable(DescribeTableRequest request) {
+ throw new ResourceNotFoundException(TEST_TABLE);
+ }
+ }
+}