From 7e27931a19fc46277f6bd4ca4d79b37309f413cc Mon Sep 17 00:00:00 2001 From: Gabor Bota Date: Tue, 1 Oct 2019 10:58:44 +0200 Subject: [PATCH 1/8] HADOOP-16520. race condition in DDB table init and waiting threads - compile and verify OK after refactor Change-Id: Ia83e92b9039ccb780090c99c41b4f71ef7539d35 --- .../fs/s3a/s3guard/DynamoDBMetadataStore.java | 413 +------------- .../DynamoDBMetadataStoreTableHandler.java | 533 ++++++++++++++++++ .../s3guard/ITestDynamoDBMetadataStore.java | 21 +- .../ITestDynamoDBMetadataStoreScale.java | 4 +- .../s3guard/TestDynamoDBMiscOperations.java | 2 +- 5 files changed, 579 insertions(+), 394 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java index 92f04bf5b7688..9a6244d9ca85b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java @@ -28,7 +28,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -43,9 +42,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; -import com.amazonaws.SdkBaseException; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome; @@ -62,17 +59,9 @@ import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec; import com.amazonaws.services.dynamodbv2.document.utils.ValueMap; import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; -import com.amazonaws.services.dynamodbv2.model.BillingMode; -import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; -import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription; -import com.amazonaws.services.dynamodbv2.model.ResourceInUseException; -import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException; import com.amazonaws.services.dynamodbv2.model.TableDescription; -import com.amazonaws.services.dynamodbv2.model.Tag; -import com.amazonaws.services.dynamodbv2.model.TagResourceRequest; import com.amazonaws.services.dynamodbv2.model.WriteRequest; -import com.amazonaws.waiters.WaiterTimedOutException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -89,7 +78,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.fs.s3a.AWSClientIOException; import org.apache.hadoop.fs.s3a.AWSCredentialProviderList; import org.apache.hadoop.fs.s3a.AWSServiceThrottledException; import org.apache.hadoop.fs.s3a.Constants; @@ -305,14 +293,14 @@ public class DynamoDBMetadataStore implements MetadataStore, private String region; private Table table; private String tableName; - private String tableArn; private Configuration conf; private String username; /** * This policy is mostly for batched writes, not for processing * exceptions in invoke() calls. - * It also has a role purpose in {@link #getVersionMarkerItem()}; + * It also has a role purpose in + * {@link DynamoDBMetadataStoreTableHandler#getVersionMarkerItem()}; * look at that method for the details. */ private RetryPolicy batchWriteRetryPolicy; @@ -359,6 +347,8 @@ public class DynamoDBMetadataStore implements MetadataStore, */ private ITtlTimeProvider ttlTimeProvider; + private DynamoDBMetadataStoreTableHandler tableHandler; + /** * A utility function to create DynamoDB instance. * @param conf the file system configuration @@ -437,7 +427,11 @@ public void initialize(FileSystem fs, ITtlTimeProvider ttlTp) ); this.ttlTimeProvider = ttlTp; - initTable(); + + tableHandler = new DynamoDBMetadataStoreTableHandler( + dynamoDB, tableName, region, amazonDynamoDB, conf, readOp, + batchWriteRetryPolicy); + this.table = tableHandler.initTable(); instrumentation.initialized(); } @@ -494,6 +488,7 @@ public void initialize(Configuration config, conf = config; // use the bucket as the DynamoDB table name if not specified in config tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY); + Preconditions.checkArgument(!StringUtils.isEmpty(tableName), "No DynamoDB table name configured"); region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY); @@ -518,7 +513,11 @@ public void initialize(Configuration config, "s3a-ddb-" + tableName); initDataAccessRetries(conf); this.ttlTimeProvider = ttlTp; - initTable(); + + tableHandler = new DynamoDBMetadataStoreTableHandler( + dynamoDB, tableName, region, amazonDynamoDB, conf, readOp, + batchWriteRetryPolicy); + this.table = tableHandler.initTable(); } /** @@ -1450,8 +1449,8 @@ public void destroy() throws IOException { table.waitForDelete(); } catch (IllegalArgumentException ex) { throw new TableDeleteTimeoutException(tableName, - "Timeout waiting for the table " + tableArn + " to be deleted", - ex); + "Timeout waiting for the table " + tableHandler.getTableArn() + + " to be deleted", ex); } catch (FileNotFoundException rnfe) { LOG.info("FileNotFoundException while deleting DynamoDB table {} in " + "region {}. This may indicate that the table does not exist, " @@ -1688,29 +1687,6 @@ private void removeAuthoritativeDirFlag( } } - /** - * Add tags from configuration to the existing DynamoDB table. - */ - @Retries.OnceRaw - public void tagTable() { - List tags = new ArrayList<>(); - Map tagProperties = - conf.getPropsWithPrefix(S3GUARD_DDB_TABLE_TAG); - for (Map.Entry tagMapEntry : tagProperties.entrySet()) { - Tag tag = new Tag().withKey(tagMapEntry.getKey()) - .withValue(tagMapEntry.getValue()); - tags.add(tag); - } - if (tags.isEmpty()) { - return; - } - - TagResourceRequest tagResourceRequest = new TagResourceRequest() - .withResourceArn(table.getDescription().getTableArn()) - .withTags(tags); - getAmazonDynamoDB().tagResource(tagResourceRequest); - } - @VisibleForTesting public AmazonDynamoDB getAmazonDynamoDB() { return amazonDynamoDB; @@ -1721,7 +1697,7 @@ public String toString() { return getClass().getSimpleName() + '{' + "region=" + region + ", tableName=" + tableName - + ", tableArn=" + tableArn + + ", tableArn=" + tableHandler.getTableArn() + '}'; } @@ -1735,275 +1711,20 @@ public String toString() { @Override public List listAWSPolicyRules( final Set access) { - Preconditions.checkState(tableArn != null, "TableARN not known"); + Preconditions.checkState(tableHandler.getTableArn() != null, + "TableARN not known"); if (access.isEmpty()) { return Collections.emptyList(); } RoleModel.Statement stat; if (access.contains(AccessLevel.ADMIN)) { - stat = allowAllDynamoDBOperations(tableArn); + stat = allowAllDynamoDBOperations(tableHandler.getTableArn()); } else { - stat = allowS3GuardClientOperations(tableArn); + stat = allowS3GuardClientOperations(tableHandler.getTableArn()); } return Lists.newArrayList(stat); } - /** - * Create a table if it does not exist and wait for it to become active. - * - * If a table with the intended name already exists, then it uses that table. - * Otherwise, it will automatically create the table if the config - * {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_CREATE_KEY} is - * enabled. The DynamoDB table creation API is asynchronous. This method wait - * for the table to become active after sending the creation request, so - * overall, this method is synchronous, and the table is guaranteed to exist - * after this method returns successfully. - * - * The wait for a table becoming active is Retry+Translated; it can fail - * while a table is not yet ready. - * - * @throws IOException if table does not exist and auto-creation is disabled; - * or table is being deleted, or any other I/O exception occurred. - */ - @VisibleForTesting - @Retries.OnceRaw - void initTable() throws IOException { - table = dynamoDB.getTable(tableName); - try { - try { - LOG.debug("Binding to table {}", tableName); - TableDescription description = table.describe(); - LOG.debug("Table state: {}", description); - tableArn = description.getTableArn(); - final String status = description.getTableStatus(); - switch (status) { - case "CREATING": - LOG.debug("Table {} in region {} is being created/updated. This may" - + " indicate that the table is being operated by another " - + "concurrent thread or process. Waiting for active...", - tableName, region); - waitForTableActive(table); - break; - case "DELETING": - throw new FileNotFoundException("DynamoDB table " - + "'" + tableName + "' is being " - + "deleted in region " + region); - case "UPDATING": - // table being updated; it can still be used. - LOG.debug("Table is being updated."); - break; - case "ACTIVE": - break; - default: - throw new IOException("Unknown DynamoDB table status " + status - + ": tableName='" + tableName + "', region=" + region); - } - - final Item versionMarker = getVersionMarkerItem(); - verifyVersionCompatibility(tableName, versionMarker); - Long created = extractCreationTimeFromMarker(versionMarker); - LOG.debug("Using existing DynamoDB table {} in region {} created {}", - tableName, region, (created != null) ? new Date(created) : null); - } catch (ResourceNotFoundException rnfe) { - if (conf.getBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, false)) { - long readCapacity = conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY, - S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT); - long writeCapacity = conf.getLong( - S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY, - S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT); - ProvisionedThroughput capacity; - if (readCapacity > 0 && writeCapacity > 0) { - capacity = new ProvisionedThroughput( - readCapacity, - writeCapacity); - } else { - // at least one capacity value is <= 0 - // verify they are both exactly zero - Preconditions.checkArgument( - readCapacity == 0 && writeCapacity == 0, - "S3Guard table read capacity %d and and write capacity %d" - + " are inconsistent", readCapacity, writeCapacity); - // and set the capacity to null for per-request billing. - capacity = null; - } - - createTable(capacity); - } else { - throw (FileNotFoundException)new FileNotFoundException( - "DynamoDB table '" + tableName + "' does not " - + "exist in region " + region + "; auto-creation is turned off") - .initCause(rnfe); - } - } - - } catch (AmazonClientException e) { - throw translateException("initTable", tableName, e); - } - } - - /** - * Get the version mark item in the existing DynamoDB table. - * - * As the version marker item may be created by another concurrent thread or - * process, we sleep and retry a limited number times if the lookup returns - * with a null value. - * DDB throttling is always retried. - */ - @VisibleForTesting - @Retries.RetryTranslated - Item getVersionMarkerItem() throws IOException { - final PrimaryKey versionMarkerKey = - createVersionMarkerPrimaryKey(VERSION_MARKER); - int retryCount = 0; - // look for a version marker, with usual throttling/failure retries. - Item versionMarker = queryVersionMarker(versionMarkerKey); - while (versionMarker == null) { - // The marker was null. - // Two possibilities - // 1. This isn't a S3Guard table. - // 2. This is a S3Guard table in construction; another thread/process - // is about to write/actively writing the version marker. - // So that state #2 is handled, batchWriteRetryPolicy is used to manage - // retries. - // This will mean that if the cause is actually #1, failure will not - // be immediate. As this will ultimately result in a failure to - // init S3Guard and the S3A FS, this isn't going to be a performance - // bottleneck -simply a slightly slower failure report than would otherwise - // be seen. - // "if your settings are broken, performance is not your main issue" - try { - RetryPolicy.RetryAction action = batchWriteRetryPolicy.shouldRetry(null, - retryCount, 0, true); - if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { - break; - } else { - LOG.debug("Sleeping {} ms before next retry", action.delayMillis); - Thread.sleep(action.delayMillis); - } - } catch (Exception e) { - throw new IOException("initTable: Unexpected exception " + e, e); - } - retryCount++; - versionMarker = queryVersionMarker(versionMarkerKey); - } - return versionMarker; - } - - /** - * Issue the query to get the version marker, with throttling for overloaded - * DDB tables. - * @param versionMarkerKey key to look up - * @return the marker - * @throws IOException failure - */ - @Retries.RetryTranslated - private Item queryVersionMarker(final PrimaryKey versionMarkerKey) - throws IOException { - return readOp.retry("getVersionMarkerItem", - VERSION_MARKER, true, - () -> table.getItem(versionMarkerKey)); - } - - /** - * Verify that a table version is compatible with this S3Guard client. - * @param tableName name of the table (for error messages) - * @param versionMarker the version marker retrieved from the table - * @throws IOException on any incompatibility - */ - @VisibleForTesting - static void verifyVersionCompatibility(String tableName, - Item versionMarker) throws IOException { - if (versionMarker == null) { - LOG.warn("Table {} contains no version marker", tableName); - throw new IOException(E_NO_VERSION_MARKER - + " Table: " + tableName); - } else { - final int version = extractVersionFromMarker(versionMarker); - if (VERSION != version) { - // version mismatch. Unless/until there is support for - // upgrading versions, treat this as an incompatible change - // and fail. - throw new IOException(E_INCOMPATIBLE_VERSION - + " Table "+ tableName - + " Expected version " + VERSION + " actual " + version); - } - } - } - - /** - * Wait for table being active. - * @param t table to block on. - * @throws IOException IO problems - * @throws InterruptedIOException if the wait was interrupted - * @throws IllegalArgumentException if an exception was raised in the waiter - */ - @Retries.RetryTranslated - private void waitForTableActive(Table t) throws IOException { - invoker.retry("Waiting for active state of table " + tableName, - null, - true, - () -> { - try { - t.waitForActive(); - } catch (IllegalArgumentException ex) { - throw translateTableWaitFailure(tableName, ex); - } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting for table {} in region {}" - + " active", - tableName, region, e); - Thread.currentThread().interrupt(); - throw (InterruptedIOException) - new InterruptedIOException("DynamoDB table '" - + tableName + "' is not active yet in region " + region) - .initCause(e); - } - }); - } - - /** - * Create a table, wait for it to become active, then add the version - * marker. - * Creating an setting up the table isn't wrapped by any retry operations; - * the wait for a table to become available is RetryTranslated. - * @param capacity capacity to provision. If null: create a per-request - * table. - * @throws IOException on any failure. - * @throws InterruptedIOException if the wait was interrupted - */ - @Retries.OnceRaw - private void createTable(ProvisionedThroughput capacity) throws IOException { - try { - String mode; - CreateTableRequest request = new CreateTableRequest() - .withTableName(tableName) - .withKeySchema(keySchema()) - .withAttributeDefinitions(attributeDefinitions()); - if (capacity != null) { - mode = String.format("with provisioned read capacity %d and" - + " write capacity %s", - capacity.getReadCapacityUnits(), capacity.getWriteCapacityUnits()); - request.withProvisionedThroughput(capacity); - } else { - mode = "with pay-per-request billing"; - request.withBillingMode(BillingMode.PAY_PER_REQUEST); - } - LOG.info("Creating non-existent DynamoDB table {} in region {} {}", - tableName, region, mode); - table = dynamoDB.createTable(request); - LOG.debug("Awaiting table becoming active"); - } catch (ResourceInUseException e) { - LOG.warn("ResourceInUseException while creating DynamoDB table {} " - + "in region {}. This may indicate that the table was " - + "created by another concurrent thread or process.", - tableName, region); - } - waitForTableActive(table); - final Item marker = createVersionMarker(VERSION_MARKER, VERSION, - System.currentTimeMillis()); - putItem(marker); - tagTable(); - } - /** * PUT a single item to the table. * @param item item to put @@ -2015,47 +1736,6 @@ private PutItemOutcome putItem(Item item) { return table.putItem(item); } - /** - * Provision the table with given read and write capacity units. - * Call will fail if the table is busy, or the new values match the current - * ones. - *

- * Until the AWS SDK lets us switch a table to on-demand, an attempt to - * set the I/O capacity to zero will fail. - * @param readCapacity read units: must be greater than zero - * @param writeCapacity write units: must be greater than zero - * @throws IOException on a failure - */ - @Retries.RetryTranslated - void provisionTable(Long readCapacity, Long writeCapacity) - throws IOException { - - if (readCapacity == 0 || writeCapacity == 0) { - // table is pay on demand - throw new IOException(E_ON_DEMAND_NO_SET_CAPACITY); - } - final ProvisionedThroughput toProvision = new ProvisionedThroughput() - .withReadCapacityUnits(readCapacity) - .withWriteCapacityUnits(writeCapacity); - invoker.retry("ProvisionTable", tableName, true, - () -> { - final ProvisionedThroughputDescription p = - table.updateTable(toProvision).getProvisionedThroughput(); - LOG.info("Provision table {} in region {}: readCapacityUnits={}, " - + "writeCapacityUnits={}", - tableName, region, p.getReadCapacityUnits(), - p.getWriteCapacityUnits()); - }); - } - - @Retries.RetryTranslated - @VisibleForTesting - void provisionTableBlocking(Long readCapacity, Long writeCapacity) - throws IOException { - provisionTable(readCapacity, writeCapacity); - waitForTableActive(table); - } - @VisibleForTesting Table getTable() { return table; @@ -2175,7 +1855,7 @@ public void updateParameters(Map parameters) currentRead, currentWrite); LOG.info("Changing capacity of table to read: {}, write: {}", newRead, newWrite); - provisionTableBlocking(newRead, newWrite); + tableHandler.provisionTableBlocking(newRead, newWrite); } else { LOG.info("Table capacity unchanged at read: {}, write: {}", newRead, newWrite); @@ -2374,48 +2054,6 @@ String getUsername() { return username; } - /** - * Take an {@code IllegalArgumentException} raised by a DDB operation - * and if it contains an inner SDK exception, unwrap it. - * @param ex exception. - * @return the inner AWS exception or null. - */ - public static SdkBaseException extractInnerException( - IllegalArgumentException ex) { - if (ex.getCause() instanceof SdkBaseException) { - return (SdkBaseException) ex.getCause(); - } else { - return null; - } - } - - /** - * Handle a table wait failure by extracting any inner cause and - * converting it, or, if unconvertable by wrapping - * the IllegalArgumentException in an IOE. - * - * @param name name of the table - * @param e exception - * @return an IOE to raise. - */ - @VisibleForTesting - static IOException translateTableWaitFailure( - final String name, IllegalArgumentException e) { - final SdkBaseException ex = extractInnerException(e); - if (ex != null) { - if (ex instanceof WaiterTimedOutException) { - // a timeout waiting for state change: extract the - // message from the outer exception, but translate - // the inner one for the throttle policy. - return new AWSClientIOException(e.getMessage(), ex); - } else { - return translateException(e.getMessage(), name, ex); - } - } else { - return new IOException(e); - } - } - /** * Log a PUT into the operations log at debug level. * @param state optional ancestor state. @@ -2691,4 +2329,9 @@ private static String stateAsString(@Nullable AncestorState state) { return stateStr; } } + + protected DynamoDBMetadataStoreTableHandler getTableHandler() { + Preconditions.checkNotNull(tableHandler, "Not initialized"); + return tableHandler; + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java new file mode 100644 index 0000000000000..8850e56dc43ba --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java @@ -0,0 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.SdkBaseException; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.Item; +import com.amazonaws.services.dynamodbv2.document.PrimaryKey; +import com.amazonaws.services.dynamodbv2.document.PutItemOutcome; +import com.amazonaws.services.dynamodbv2.document.Table; +import com.amazonaws.services.dynamodbv2.model.BillingMode; +import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription; +import com.amazonaws.services.dynamodbv2.model.ResourceInUseException; +import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException; +import com.amazonaws.services.dynamodbv2.model.ScanRequest; +import com.amazonaws.services.dynamodbv2.model.ScanResult; +import com.amazonaws.services.dynamodbv2.model.TableDescription; +import com.amazonaws.services.dynamodbv2.model.Tag; +import com.amazonaws.services.dynamodbv2.model.TagResourceRequest; +import com.amazonaws.waiters.WaiterTimedOutException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.AWSClientIOException; +import org.apache.hadoop.fs.s3a.Invoker; +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT; +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_KEY; +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT; +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY; +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CREATE_KEY; +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_TAG; +import static org.apache.hadoop.fs.s3a.S3AUtils.translateException; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.E_INCOMPATIBLE_VERSION; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.E_NO_VERSION_MARKER; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.E_ON_DEMAND_NO_SET_CAPACITY; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.attributeDefinitions; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.createVersionMarker; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.createVersionMarkerPrimaryKey; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.extractCreationTimeFromMarker; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.extractVersionFromMarker; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.keySchema; + +public class DynamoDBMetadataStoreTableHandler { + public static final Logger LOG = LoggerFactory.getLogger( + DynamoDBMetadataStoreTableHandler.class); + + /** Invoker for IO. Until configured properly, use try-once. */ + private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, + Invoker.NO_OP + ); + + private AmazonDynamoDB amazonDynamoDB; + final private DynamoDB dynamoDB; + final private String tableName; + private Table table; + private String region; + private Configuration conf; + private final Invoker readOp; + private final RetryPolicy batchWriteRetryPolicy; + + private String tableArn; + + public DynamoDBMetadataStoreTableHandler(DynamoDB dynamoDB, + String tableName, + String region, + AmazonDynamoDB amazonDynamoDB, + Configuration conf, + Invoker readOp, + RetryPolicy batchWriteCapacityExceededEvents) { + this.dynamoDB = dynamoDB; + this.amazonDynamoDB = amazonDynamoDB; + this.tableName = tableName; + this.region = region; + this.conf = conf; + this.readOp = readOp; + this.batchWriteRetryPolicy = batchWriteCapacityExceededEvents; + } + + /** + * Create a table if it does not exist and wait for it to become active. + * + * If a table with the intended name already exists, then it uses that table. + * Otherwise, it will automatically create the table if the config + * {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_CREATE_KEY} is + * enabled. The DynamoDB table creation API is asynchronous. This method wait + * for the table to become active after sending the creation request, so + * overall, this method is synchronous, and the table is guaranteed to exist + * after this method returns successfully. + * + * The wait for a table becoming active is Retry+Translated; it can fail + * while a table is not yet ready. + * + * @throws IOException if table does not exist and auto-creation is disabled; + * or table is being deleted, or any other I/O exception occurred. + */ + @VisibleForTesting + @Retries.OnceRaw + Table initTable() throws IOException { + table = dynamoDB.getTable(tableName); + try { + try { + LOG.debug("Binding to table {}", tableName); + TableDescription description = table.describe(); + LOG.debug("Table state: {}", description); + tableArn = description.getTableArn(); + final String status = description.getTableStatus(); + switch (status) { + case "CREATING": + LOG.debug("Table {} in region {} is being created/updated. This may" + + " indicate that the table is being operated by another " + + "concurrent thread or process. Waiting for active...", + tableName, region); + waitForTableActive(table); + break; + case "DELETING": + throw new FileNotFoundException("DynamoDB table " + + "'" + tableName + "' is being " + + "deleted in region " + region); + case "UPDATING": + // table being updated; it can still be used. + LOG.debug("Table is being updated."); + break; + case "ACTIVE": + break; + default: + throw new IOException("Unknown DynamoDB table status " + status + + ": tableName='" + tableName + "', region=" + region); + } + + final Item versionMarker = getVersionMarkerItem(); + verifyVersionCompatibility(tableName, versionMarker); + Long created = extractCreationTimeFromMarker(versionMarker); + LOG.debug("Using existing DynamoDB table {} in region {} created {}", + tableName, region, (created != null) ? new Date(created) : null); + } catch (ResourceNotFoundException rnfe) { + if (conf.getBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, false)) { + long readCapacity = conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY, + S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT); + long writeCapacity = conf.getLong( + S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY, + S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT); + ProvisionedThroughput capacity; + if (readCapacity > 0 && writeCapacity > 0) { + capacity = new ProvisionedThroughput( + readCapacity, + writeCapacity); + } else { + // at least one capacity value is <= 0 + // verify they are both exactly zero + Preconditions.checkArgument( + readCapacity == 0 && writeCapacity == 0, + "S3Guard table read capacity %d and and write capacity %d" + + " are inconsistent", readCapacity, writeCapacity); + // and set the capacity to null for per-request billing. + capacity = null; + } + + createTable(capacity); + } else { + throw (FileNotFoundException) new FileNotFoundException( + "DynamoDB table '" + tableName + "' does not " + + "exist in region " + region + + "; auto-creation is turned off") + .initCause(rnfe); + } + } + + } catch (AmazonClientException e) { + throw translateException("initTable", tableName, e); + } + + return table; + } + + private void addVersionMarkerToEmptyTable(String tableName) + throws IOException { + final ScanResult result = readOp.retry( + "scan", + null, + true, + () -> { + final ScanRequest req = new ScanRequest().withTableName( + tableName).withLimit(1); + return amazonDynamoDB.scan(req); + } + ); + boolean isEmptyTable = result.getCount() == 0; + + if (!isEmptyTable) { + // the table is not empty, do nothing. + } else { + // the table is empty, add version marker + putVersionMarkerToTable(); + } + } + + /** + * Create a table, wait for it to become active, then add the version + * marker. + * Creating an setting up the table isn't wrapped by any retry operations; + * the wait for a table to become available is RetryTranslated. + * @param capacity capacity to provision. If null: create a per-request + * table. + * @throws IOException on any failure. + * @throws InterruptedIOException if the wait was interrupted + */ + @Retries.OnceRaw + private void createTable(ProvisionedThroughput capacity) throws IOException { + try { + String mode; + CreateTableRequest request = new CreateTableRequest() + .withTableName(tableName) + .withKeySchema(keySchema()) + .withAttributeDefinitions(attributeDefinitions()); + if (capacity != null) { + mode = String.format("with provisioned read capacity %d and" + + " write capacity %s", + capacity.getReadCapacityUnits(), capacity.getWriteCapacityUnits()); + request.withProvisionedThroughput(capacity); + } else { + mode = "with pay-per-request billing"; + request.withBillingMode(BillingMode.PAY_PER_REQUEST); + } + LOG.info("Creating non-existent DynamoDB table {} in region {} {}", + tableName, region, mode); + table = dynamoDB.createTable(request); + LOG.debug("Awaiting table becoming active"); + } catch (ResourceInUseException e) { + LOG.warn("ResourceInUseException while creating DynamoDB table {} " + + "in region {}. This may indicate that the table was " + + "created by another concurrent thread or process.", + tableName, region); + } + waitForTableActive(table); + putVersionMarkerToTable(); + tagTable(); + } + + /** + * Add tags from configuration to the existing DynamoDB table. + */ + @Retries.OnceRaw + public void tagTable() { + List tags = new ArrayList<>(); + Map tagProperties = + conf.getPropsWithPrefix(S3GUARD_DDB_TABLE_TAG); + for (Map.Entry tagMapEntry : tagProperties.entrySet()) { + Tag tag = new Tag().withKey(tagMapEntry.getKey()) + .withValue(tagMapEntry.getValue()); + tags.add(tag); + } + if (tags.isEmpty()) { + return; + } + + TagResourceRequest tagResourceRequest = new TagResourceRequest() + .withResourceArn(table.getDescription().getTableArn()) + .withTags(tags); + amazonDynamoDB.tagResource(tagResourceRequest); + } + + /** + * Verify that a table version is compatible with this S3Guard client. + * @param tableName name of the table (for error messages) + * @param versionMarker the version marker retrieved from the table + * @throws IOException on any incompatibility + */ + @VisibleForTesting + static void verifyVersionCompatibility(String tableName, + Item versionMarker) throws IOException { + if (versionMarker == null) { + LOG.warn("Table {} contains no version marker", tableName); + throw new IOException(E_NO_VERSION_MARKER + + " Table: " + tableName); + } else { + final int version = extractVersionFromMarker(versionMarker); + if (VERSION != version) { + // version mismatch. Unless/until there is support for + // upgrading versions, treat this as an incompatible change + // and fail. + throw new IOException(E_INCOMPATIBLE_VERSION + + " Table " + tableName + + " Expected version " + VERSION + " actual " + version); + } + } + } + + /** + * Add version marker to the dynamo table + */ + @Retries.OnceRaw + private void putVersionMarkerToTable() { + final Item marker = createVersionMarker(VERSION_MARKER, VERSION, + System.currentTimeMillis()); + putItem(marker); + } + + /** + * Wait for table being active. + * @param t table to block on. + * @throws IOException IO problems + * @throws InterruptedIOException if the wait was interrupted + * @throws IllegalArgumentException if an exception was raised in the waiter + */ + @Retries.RetryTranslated + private void waitForTableActive(Table t) throws IOException { + invoker.retry("Waiting for active state of table " + tableName, + null, + true, + () -> { + try { + t.waitForActive(); + } catch (IllegalArgumentException ex) { + throw translateTableWaitFailure(tableName, ex); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for table {} in region {}" + + " active", + tableName, region, e); + Thread.currentThread().interrupt(); + throw (InterruptedIOException) + new InterruptedIOException("DynamoDB table '" + + tableName + "' is not active yet in region " + region) + .initCause(e); + } + }); + } + + /** + * Handle a table wait failure by extracting any inner cause and + * converting it, or, if unconvertable by wrapping + * the IllegalArgumentException in an IOE. + * + * @param name name of the table + * @param e exception + * @return an IOE to raise. + */ + @VisibleForTesting + static IOException translateTableWaitFailure( + final String name, IllegalArgumentException e) { + final SdkBaseException ex = extractInnerException(e); + if (ex != null) { + if (ex instanceof WaiterTimedOutException) { + // a timeout waiting for state change: extract the + // message from the outer exception, but translate + // the inner one for the throttle policy. + return new AWSClientIOException(e.getMessage(), ex); + } else { + return translateException(e.getMessage(), name, ex); + } + } else { + return new IOException(e); + } + } + + /** + * Take an {@code IllegalArgumentException} raised by a DDB operation + * and if it contains an inner SDK exception, unwrap it. + * @param ex exception. + * @return the inner AWS exception or null. + */ + public static SdkBaseException extractInnerException( + IllegalArgumentException ex) { + if (ex.getCause() instanceof SdkBaseException) { + return (SdkBaseException) ex.getCause(); + } else { + return null; + } + } + + /** + * Get the version mark item in the existing DynamoDB table. + * + * As the version marker item may be created by another concurrent thread or + * process, we sleep and retry a limited number times if the lookup returns + * with a null value. + * DDB throttling is always retried. + */ + @VisibleForTesting + @Retries.RetryTranslated + Item getVersionMarkerItem() throws IOException { + final PrimaryKey versionMarkerKey = + createVersionMarkerPrimaryKey(VERSION_MARKER); + int retryCount = 0; + // look for a version marker, with usual throttling/failure retries. + Item versionMarker = queryVersionMarker(versionMarkerKey); + while (versionMarker == null) { + // The marker was null. + // Two possibilities + // 1. This isn't a S3Guard table. + // 2. This is a S3Guard table in construction; another thread/process + // is about to write/actively writing the version marker. + // So that state #2 is handled, batchWriteRetryPolicy is used to manage + // retries. + // This will mean that if the cause is actually #1, failure will not + // be immediate. As this will ultimately result in a failure to + // init S3Guard and the S3A FS, this isn't going to be a performance + // bottleneck -simply a slightly slower failure report than would otherwise + // be seen. + // "if your settings are broken, performance is not your main issue" + try { + RetryPolicy.RetryAction action = batchWriteRetryPolicy.shouldRetry(null, + retryCount, 0, true); + if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { + break; + } else { + LOG.debug("Sleeping {} ms before next retry", action.delayMillis); + Thread.sleep(action.delayMillis); + } + } catch (Exception e) { + throw new IOException("initTable: Unexpected exception " + e, e); + } + retryCount++; + versionMarker = queryVersionMarker(versionMarkerKey); + } + return versionMarker; + } + + /** + * Issue the query to get the version marker, with throttling for overloaded + * DDB tables. + * @param versionMarkerKey key to look up + * @return the marker + * @throws IOException failure + */ + @Retries.RetryTranslated + private Item queryVersionMarker(final PrimaryKey versionMarkerKey) + throws IOException { + return readOp.retry("getVersionMarkerItem", + VERSION_MARKER, true, + () -> table.getItem(versionMarkerKey)); + } + + /** + * PUT a single item to the table. + * @param item item to put + * @return the outcome. + */ + @Retries.OnceRaw + private PutItemOutcome putItem(Item item) { + LOG.debug("Putting item {}", item); + return table.putItem(item); + } + + /** + * Provision the table with given read and write capacity units. + * Call will fail if the table is busy, or the new values match the current + * ones. + *

+ * Until the AWS SDK lets us switch a table to on-demand, an attempt to + * set the I/O capacity to zero will fail. + * @param readCapacity read units: must be greater than zero + * @param writeCapacity write units: must be greater than zero + * @throws IOException on a failure + */ + @Retries.RetryTranslated + void provisionTable(Long readCapacity, Long writeCapacity) + throws IOException { + + if (readCapacity == 0 || writeCapacity == 0) { + // table is pay on demand + throw new IOException(E_ON_DEMAND_NO_SET_CAPACITY); + } + final ProvisionedThroughput toProvision = new ProvisionedThroughput() + .withReadCapacityUnits(readCapacity) + .withWriteCapacityUnits(writeCapacity); + invoker.retry("ProvisionTable", tableName, true, + () -> { + final ProvisionedThroughputDescription p = + table.updateTable(toProvision).getProvisionedThroughput(); + LOG.info("Provision table {} in region {}: readCapacityUnits={}, " + + "writeCapacityUnits={}", + tableName, region, p.getReadCapacityUnits(), + p.getWriteCapacityUnits()); + }); + } + + @Retries.RetryTranslated + @VisibleForTesting + void provisionTableBlocking(Long readCapacity, Long writeCapacity) + throws IOException { + provisionTable(readCapacity, writeCapacity); + waitForTableActive(table); + } + + public String getTableName() { + return tableName; + } + + public Table getTable() { + return table; + } + + public String getTableArn() { + return tableArn; + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java index 78c9fea2f571a..1feae3bdd864a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java @@ -114,6 +114,7 @@ public ITestDynamoDBMetadataStore() { private S3AFileSystem fileSystem; private S3AContract s3AContract; + private DynamoDBMetadataStoreTableHandler tableHandler; private URI fsUri; @@ -153,6 +154,7 @@ public void setUp() throws Exception { try{ super.setUp(); + tableHandler = getDynamoMetadataStore().getTableHandler(); } catch (FileNotFoundException e){ LOG.warn("MetadataStoreTestBase setup failed. Waiting for table to be " + "deleted before trying again."); @@ -613,7 +615,7 @@ public void testInitExistingTable() throws IOException { final String tableName = ddbms.getTable().getTableName(); verifyTableInitialized(tableName, ddbms.getDynamoDB()); // create existing table - ddbms.initTable(); + tableHandler.initTable(); verifyTableInitialized(tableName, ddbms.getDynamoDB()); } @@ -622,7 +624,7 @@ public void testInitExistingTable() throws IOException { */ @Test public void testItemVersionCompatibility() throws Throwable { - verifyVersionCompatibility("table", + DynamoDBMetadataStoreTableHandler.verifyVersionCompatibility("table", createVersionMarker(VERSION_MARKER, VERSION, 0)); } @@ -633,7 +635,7 @@ public void testItemVersionCompatibility() throws Throwable { @Test public void testItemLacksVersion() throws Throwable { intercept(IOException.class, E_NOT_VERSION_MARKER, - () -> verifyVersionCompatibility("table", + () -> DynamoDBMetadataStoreTableHandler.verifyVersionCompatibility("table", new Item().withPrimaryKey( createVersionMarkerPrimaryKey(VERSION_MARKER)))); } @@ -663,16 +665,21 @@ public void testTableVersioning() throws Exception { DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore(); try { ddbms.initialize(conf, new S3Guard.TtlTimeProvider(conf)); + DynamoDBMetadataStoreTableHandler localTableHandler = + ddbms.getTableHandler(); + Table table = verifyTableInitialized(tableName, ddbms.getDynamoDB()); // check the tagging too verifyStoreTags(createTagMap(), ddbms); Item originalVersionMarker = table.getItem(VERSION_MARKER_PRIMARY_KEY); table.deleteItem(VERSION_MARKER_PRIMARY_KEY); + assertNull("Version marker should be null after deleting it from the table.", + table.getItem(VERSION_MARKER_PRIMARY_KEY)); // create existing table intercept(IOException.class, E_NO_VERSION_MARKER, - () -> ddbms.initTable()); + () -> localTableHandler.initTable()); // now add a different version marker Item v200 = createVersionMarker(VERSION_MARKER, VERSION * 2, 0); @@ -680,7 +687,7 @@ public void testTableVersioning() throws Exception { // create existing table intercept(IOException.class, E_INCOMPATIBLE_VERSION, - () -> ddbms.initTable()); + () -> localTableHandler.initTable()); // create a marker with no version and expect failure final Item invalidMarker = new Item().withPrimaryKey( @@ -689,11 +696,11 @@ public void testTableVersioning() throws Exception { table.putItem(invalidMarker); intercept(IOException.class, E_NOT_VERSION_MARKER, - () -> ddbms.initTable()); + () -> localTableHandler.initTable()); // reinstate the version marker table.putItem(originalVersionMarker); - ddbms.initTable(); + localTableHandler.initTable(); conf.setInt(S3GUARD_DDB_MAX_RETRIES, maxRetries); } finally { destroy(ddbms); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java index 53df60f59c21a..b36428d74028e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java @@ -91,6 +91,7 @@ public class ITestDynamoDBMetadataStoreScale private static final long MAXIMUM_WRITE_CAPACITY = 15; private DynamoDBMetadataStore ddbms; + private DynamoDBMetadataStoreTableHandler tableHandler; private DynamoDB ddb; @@ -160,6 +161,7 @@ public void setup() throws Exception { super.setup(); ddbms = (DynamoDBMetadataStore) createMetadataStore(); tableName = ddbms.getTableName(); + tableHandler = ddbms.getTableHandler(); assertNotNull("table has no name", tableName); ddb = ddbms.getDynamoDB(); table = ddb.getTable(tableName); @@ -325,7 +327,7 @@ public void test_050_getVersionMarkerItem() throws Throwable { execute("get", OPERATIONS_PER_THREAD * 2, expectThrottling(), - () -> ddbms.getVersionMarkerItem() + () -> tableHandler.getVersionMarkerItem() ); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java index 578aed06bc3ca..bdba6b1720de1 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java @@ -32,10 +32,10 @@ import org.apache.hadoop.test.HadoopTestBase; import org.apache.hadoop.fs.Path; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableHandler.translateTableWaitFailure; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.translateTableWaitFailure; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** From f7deab10ac890e74908583235cc56befd2ec8b65 Mon Sep 17 00:00:00 2001 From: Gabor Bota Date: Wed, 2 Oct 2019 16:46:44 +0200 Subject: [PATCH 2/8] working version consistency check with tests Change-Id: I34704f7def441af72a1b85be44fcb91fc8d2b2ce --- .../fs/s3a/s3guard/DynamoDBMetadataStore.java | 8 - .../DynamoDBMetadataStoreTableHandler.java | 238 +++++++++++++----- .../PathMetadataDynamoDBTranslation.java | 2 +- .../s3guard/ITestDynamoDBMetadataStore.java | 166 ++++++++---- .../s3a/s3guard/ITestS3GuardToolDynamoDB.java | 3 +- 5 files changed, 294 insertions(+), 123 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java index 9a6244d9ca85b..f8d02c6384e6e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java @@ -226,14 +226,6 @@ public class DynamoDBMetadataStore implements MetadataStore, /** Current version number. */ public static final int VERSION = 100; - /** Error: version marker not found in table. */ - public static final String E_NO_VERSION_MARKER - = "S3Guard table lacks version marker."; - - /** Error: version mismatch. */ - public static final String E_INCOMPATIBLE_VERSION - = "Database table is from an incompatible S3Guard version."; - @VisibleForTesting static final String BILLING_MODE = "billing-mode"; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java index 8850e56dc43ba..9c46a52db3a28 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java @@ -28,6 +28,7 @@ import com.amazonaws.services.dynamodbv2.document.Table; import com.amazonaws.services.dynamodbv2.model.BillingMode; import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; +import com.amazonaws.services.dynamodbv2.model.ListTagsOfResourceRequest; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription; import com.amazonaws.services.dynamodbv2.model.ResourceInUseException; @@ -57,7 +58,9 @@ import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Optional; +import static java.lang.String.valueOf; import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT; import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_KEY; import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT; @@ -65,8 +68,6 @@ import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CREATE_KEY; import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_TAG; import static org.apache.hadoop.fs.s3a.S3AUtils.translateException; -import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.E_INCOMPATIBLE_VERSION; -import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.E_NO_VERSION_MARKER; import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.E_ON_DEMAND_NO_SET_CAPACITY; import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION; import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER; @@ -77,24 +78,40 @@ import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.extractVersionFromMarker; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.keySchema; +/** + * Table handling for dynamo tables, factored out from DynamoDBMetadataStore + * Mainly + */ public class DynamoDBMetadataStoreTableHandler { public static final Logger LOG = LoggerFactory.getLogger( DynamoDBMetadataStoreTableHandler.class); + /** Error: version marker not found in table but the table is not empty. */ + public static final String E_NO_VERSION_MARKER_AND_NOT_EMPTY + = "S3Guard table lacks version marker, and not empty."; + + /** Error: version mismatch. */ + public static final String E_INCOMPATIBLE_TAG_VERSION + = "Database table is from an incompatible S3Guard version based on table TAG."; + + /** Error: version mismatch. */ + public static final String E_INCOMPATIBLE_ITEM_VERSION + = "Database table is from an incompatible S3Guard version based on table ITEM."; + /** Invoker for IO. Until configured properly, use try-once. */ private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, Invoker.NO_OP ); - private AmazonDynamoDB amazonDynamoDB; + final private AmazonDynamoDB amazonDynamoDB; final private DynamoDB dynamoDB; final private String tableName; - private Table table; - private String region; - private Configuration conf; - private final Invoker readOp; - private final RetryPolicy batchWriteRetryPolicy; + final private String region; + final private Configuration conf; + final private Invoker readOp; + final private RetryPolicy batchWriteRetryPolicy; + private Table table; private String tableArn; public DynamoDBMetadataStoreTableHandler(DynamoDB dynamoDB, @@ -164,8 +181,8 @@ Table initTable() throws IOException { + ": tableName='" + tableName + "', region=" + region); } + verifyVersionCompatibility(); final Item versionMarker = getVersionMarkerItem(); - verifyVersionCompatibility(tableName, versionMarker); Long created = extractCreationTimeFromMarker(versionMarker); LOG.debug("Using existing DynamoDB table {} in region {} created {}", tableName, region, (created != null) ? new Date(created) : null); @@ -209,25 +226,37 @@ Table initTable() throws IOException { return table; } - private void addVersionMarkerToEmptyTable(String tableName) - throws IOException { - final ScanResult result = readOp.retry( - "scan", - null, - true, - () -> { - final ScanRequest req = new ScanRequest().withTableName( - tableName).withLimit(1); - return amazonDynamoDB.scan(req); - } - ); - boolean isEmptyTable = result.getCount() == 0; + protected void tagTableWithVersionMarker() { + TagResourceRequest tagResourceRequest = new TagResourceRequest() + .withResourceArn(table.getDescription().getTableArn()) + .withTags(newVersionMarkerTag()); + amazonDynamoDB.tagResource(tagResourceRequest); + } - if (!isEmptyTable) { - // the table is not empty, do nothing. + // todo test + protected static Item getVersionMarkerFromTags(Table table, + AmazonDynamoDB addb) { + final List tags; + try { + final TableDescription description = table.describe(); + ListTagsOfResourceRequest listTagsOfResourceRequest = + new ListTagsOfResourceRequest() + .withResourceArn(description.getTableArn()); + tags = addb.listTagsOfResource(listTagsOfResourceRequest).getTags(); + } catch (ResourceNotFoundException e) { + LOG.error("Table: {} not found."); + throw e; + } + + final Optional first = tags.stream() + .filter(tag -> tag.getKey().equals(VERSION_MARKER)).findFirst(); + if (first.isPresent()) { + final Tag vmTag = first.get(); + return createVersionMarker( + vmTag.getKey(), Integer.valueOf(vmTag.getValue()), 0 + ); } else { - // the table is empty, add version marker - putVersionMarkerToTable(); + return null; } } @@ -236,6 +265,10 @@ private void addVersionMarkerToEmptyTable(String tableName) * marker. * Creating an setting up the table isn't wrapped by any retry operations; * the wait for a table to become available is RetryTranslated. + * The tags are added to the table during creation, not after creation. + * We can assume that tagging and creating the table is a single atomic + * operation. + * * @param capacity capacity to provision. If null: create a per-request * table. * @throws IOException on any failure. @@ -248,7 +281,8 @@ private void createTable(ProvisionedThroughput capacity) throws IOException { CreateTableRequest request = new CreateTableRequest() .withTableName(tableName) .withKeySchema(keySchema()) - .withAttributeDefinitions(attributeDefinitions()); + .withAttributeDefinitions(attributeDefinitions()) + .withTags(getTableTagsFromConfig()); if (capacity != null) { mode = String.format("with provisioned read capacity %d and" + " write capacity %s", @@ -270,15 +304,17 @@ private void createTable(ProvisionedThroughput capacity) throws IOException { } waitForTableActive(table); putVersionMarkerToTable(); - tagTable(); } /** - * Add tags from configuration to the existing DynamoDB table. + * Return tags from configuration and the version marker for adding to + * dynamo table during creation */ @Retries.OnceRaw - public void tagTable() { + public List getTableTagsFromConfig() { List tags = new ArrayList<>(); + + // from configuration Map tagProperties = conf.getPropsWithPrefix(S3GUARD_DDB_TABLE_TAG); for (Map.Entry tagMapEntry : tagProperties.entrySet()) { @@ -286,39 +322,124 @@ public void tagTable() { .withValue(tagMapEntry.getValue()); tags.add(tag); } - if (tags.isEmpty()) { - return; - } + // add the version marker + tags.add(newVersionMarkerTag()); + return tags; + } - TagResourceRequest tagResourceRequest = new TagResourceRequest() - .withResourceArn(table.getDescription().getTableArn()) - .withTags(tags); - amazonDynamoDB.tagResource(tagResourceRequest); + /** + * Create a new version marker tag + * @return a new version marker tag + */ + private static Tag newVersionMarkerTag() { + return new Tag().withKey(VERSION_MARKER).withValue(valueOf(VERSION)); } /** * Verify that a table version is compatible with this S3Guard client. - * @param tableName name of the table (for error messages) - * @param versionMarker the version marker retrieved from the table + * + * Checks for consistency between the version marker as the item and tag. + * + *

+   *   1. If the table lacks both version markers AND it's empty,
+   *      both markers will be added.
+   *      If the table is not empty the check throws {@link IOException}
+   *   2. If there's no version marker ITEM, the compatibility with the TAG
+   *      will be checked, and the version marker ITEM will be added if the
+   *      TAG version is compatible.
+   *      If the TAG version is not compatible, the check throws {@link IOException}
+   *   3. If there's no version marker TAG, the compatibility with the ITEM
+   *      version marker will be checked, and the version marker ITEM will be
+   *      added if the ITEM version is compatible.
+   *      If the ITEM version is not compatible, the check throws {@link IOException}
+   *   4. If the TAG and ITEM versions are both present then both will be checked
+   *      for compatibility. If the ITEM or TAG version marker is not compatible,
+   *      the check throws {@link IOException}
+   * 
+ * * @throws IOException on any incompatibility */ @VisibleForTesting - static void verifyVersionCompatibility(String tableName, - Item versionMarker) throws IOException { - if (versionMarker == null) { - LOG.warn("Table {} contains no version marker", tableName); - throw new IOException(E_NO_VERSION_MARKER - + " Table: " + tableName); - } else { - final int version = extractVersionFromMarker(versionMarker); - if (VERSION != version) { - // version mismatch. Unless/until there is support for - // upgrading versions, treat this as an incompatible change - // and fail. - throw new IOException(E_INCOMPATIBLE_VERSION - + " Table " + tableName - + " Expected version " + VERSION + " actual " + version); + protected void verifyVersionCompatibility() throws IOException { + final Item versionMarkerItem = getVersionMarkerItem(); + final Item versionMarkerFromTag = + getVersionMarkerFromTags(table, amazonDynamoDB); + + LOG.debug("versionMarkerItem: {}; versionMarkerFromTag: {}", + versionMarkerItem, versionMarkerFromTag); + + if (versionMarkerItem == null && versionMarkerFromTag == null) { + if (!isEmptyTable(tableName, amazonDynamoDB)) { + LOG.error("Table is not empty but missing the version maker. Failing."); + throw new IOException(E_NO_VERSION_MARKER_AND_NOT_EMPTY + + " Table: " + tableName); } + + LOG.info("Table {} contains no version marker item or tag. " + + "The table is empty, so the version marker will be added " + + "as TAG and ITEM.", tableName); + + tagTableWithVersionMarker(); + putVersionMarkerToTable(); + } + + if (versionMarkerItem == null && versionMarkerFromTag != null) { + final int tagVersionMarker = + extractVersionFromMarker(versionMarkerFromTag); + throwExceptionOnVersionMismatch(tagVersionMarker, tableName, + E_INCOMPATIBLE_TAG_VERSION); + + LOG.info("Table {} contains no version marker ITEM but contains " + + "compatible version marker TAG. Restoring the version marker " + + "item from tag.", tableName); + + putVersionMarkerToTable(); + } + + if (versionMarkerItem != null && versionMarkerFromTag == null) { + final int itemVersionMarker = + extractVersionFromMarker(versionMarkerItem); + throwExceptionOnVersionMismatch(itemVersionMarker, tableName, + E_INCOMPATIBLE_ITEM_VERSION); + + LOG.info("Table {} contains no version marker TAG but contains " + + "compatible version marker ITEM. Restoring the version marker " + + "item from item.", tableName); + + tagTableWithVersionMarker(); + } + + if (versionMarkerItem != null && versionMarkerFromTag != null) { + final int tagVersionMarker = + extractVersionFromMarker(versionMarkerFromTag); + final int itemVersionMarker = + extractVersionFromMarker(versionMarkerItem); + + throwExceptionOnVersionMismatch(tagVersionMarker, tableName, + E_INCOMPATIBLE_TAG_VERSION); + throwExceptionOnVersionMismatch(itemVersionMarker, tableName, + E_INCOMPATIBLE_ITEM_VERSION); + + LOG.info("Table {} contains correct version marker TAG and ITEM.", + tableName); + } + } + + private static boolean isEmptyTable(String tableName, AmazonDynamoDB aadb) { + final ScanRequest req = new ScanRequest().withTableName( + tableName).withLimit(1); + final ScanResult result = aadb.scan(req); + return result.getCount() == 0; + } + + private static void throwExceptionOnVersionMismatch(int actual, + String tableName, + String exMsg) throws IOException { + + if (VERSION != actual) { + throw new IOException(exMsg + " Table " + tableName + + " Expected version: " + VERSION + " actual tag version: " + + actual); } } @@ -414,7 +535,7 @@ public static SdkBaseException extractInnerException( */ @VisibleForTesting @Retries.RetryTranslated - Item getVersionMarkerItem() throws IOException { + protected Item getVersionMarkerItem() throws IOException { final PrimaryKey versionMarkerKey = createVersionMarkerPrimaryKey(VERSION_MARKER); int retryCount = 0; @@ -440,7 +561,8 @@ Item getVersionMarkerItem() throws IOException { if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { break; } else { - LOG.debug("Sleeping {} ms before next retry", action.delayMillis); + LOG.warn("Version marker in the dynamo table was null. " + + "Sleeping {} ms before next retry", action.delayMillis); Thread.sleep(action.delayMillis); } } catch (Exception e) { @@ -519,10 +641,6 @@ void provisionTableBlocking(Long readCapacity, Long writeCapacity) waitForTableActive(table); } - public String getTableName() { - return tableName; - } - public Table getTable() { return table; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java index 348dfbfced448..be120881cb341 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java @@ -247,7 +247,7 @@ static int extractVersionFromMarker(Item marker) throws IOException { * @return the creation time, or null * @throws IOException if the item is not a version marker */ - static Long extractCreationTimeFromMarker(Item marker) throws IOException { + static Long extractCreationTimeFromMarker(Item marker) { if (marker.hasAttribute(TABLE_CREATED)) { return marker.getLong(TABLE_CREATED); } else { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java index 1feae3bdd864a..096eea6725fe3 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.UUID; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.dynamodbv2.document.Item; import com.amazonaws.services.dynamodbv2.document.PrimaryKey; @@ -41,6 +42,8 @@ import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException; import com.amazonaws.services.dynamodbv2.model.TableDescription; import com.amazonaws.services.dynamodbv2.model.Tag; +import com.amazonaws.services.dynamodbv2.model.TagResourceRequest; +import com.amazonaws.services.dynamodbv2.model.UntagResourceRequest; import com.google.common.collect.Lists; import org.assertj.core.api.Assertions; @@ -70,10 +73,15 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.security.UserGroupInformation; +import static java.lang.String.valueOf; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.fs.s3a.S3AUtils.clearBucketOption; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableHandler.E_INCOMPATIBLE_ITEM_VERSION; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableHandler.E_INCOMPATIBLE_TAG_VERSION; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableHandler.E_NO_VERSION_MARKER_AND_NOT_EMPTY; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableHandler.getVersionMarkerFromTags; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*; import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.*; import static org.apache.hadoop.test.LambdaTestUtils.*; @@ -619,27 +627,6 @@ public void testInitExistingTable() throws IOException { verifyTableInitialized(tableName, ddbms.getDynamoDB()); } - /** - * Test the low level version check code. - */ - @Test - public void testItemVersionCompatibility() throws Throwable { - DynamoDBMetadataStoreTableHandler.verifyVersionCompatibility("table", - createVersionMarker(VERSION_MARKER, VERSION, 0)); - } - - /** - * Test that a version marker entry without the version number field - * is rejected as incompatible with a meaningful error message. - */ - @Test - public void testItemLacksVersion() throws Throwable { - intercept(IOException.class, E_NOT_VERSION_MARKER, - () -> DynamoDBMetadataStoreTableHandler.verifyVersionCompatibility("table", - new Item().withPrimaryKey( - createVersionMarkerPrimaryKey(VERSION_MARKER)))); - } - /** * Test versioning handling. *
    @@ -669,44 +656,114 @@ public void testTableVersioning() throws Exception { ddbms.getTableHandler(); Table table = verifyTableInitialized(tableName, ddbms.getDynamoDB()); - // check the tagging too + // check the tagging verifyStoreTags(createTagMap(), ddbms); + // check version compatibility + checkVerifyVersionMarkerCompatibility(localTableHandler, table); - Item originalVersionMarker = table.getItem(VERSION_MARKER_PRIMARY_KEY); - table.deleteItem(VERSION_MARKER_PRIMARY_KEY); - assertNull("Version marker should be null after deleting it from the table.", - table.getItem(VERSION_MARKER_PRIMARY_KEY)); - - // create existing table - intercept(IOException.class, E_NO_VERSION_MARKER, - () -> localTableHandler.initTable()); - - // now add a different version marker - Item v200 = createVersionMarker(VERSION_MARKER, VERSION * 2, 0); - table.putItem(v200); - - // create existing table - intercept(IOException.class, E_INCOMPATIBLE_VERSION, - () -> localTableHandler.initTable()); - - // create a marker with no version and expect failure - final Item invalidMarker = new Item().withPrimaryKey( - createVersionMarkerPrimaryKey(VERSION_MARKER)) - .withLong(TABLE_CREATED, 0); - table.putItem(invalidMarker); - - intercept(IOException.class, E_NOT_VERSION_MARKER, - () -> localTableHandler.initTable()); - - // reinstate the version marker - table.putItem(originalVersionMarker); - localTableHandler.initTable(); conf.setInt(S3GUARD_DDB_MAX_RETRIES, maxRetries); } finally { destroy(ddbms); } } + private void checkVerifyVersionMarkerCompatibility( + DynamoDBMetadataStoreTableHandler localTableHandler, Table table) + throws Exception { + final AmazonDynamoDB addb + = getDynamoMetadataStore().getAmazonDynamoDB(); + Item originalVersionMarker = table.getItem(VERSION_MARKER_PRIMARY_KEY); + + LOG.info("1/6: remove version marker and tags from table " + + "the table is empty, so it should be initialized after the call"); + deleteVersionMarkerItem(table); + removeVersionMarkerTag(table, addb); + localTableHandler.initTable(); + + final int versionFromItem = extractVersionFromMarker( + localTableHandler.getVersionMarkerItem()); + final int versionFromTag = extractVersionFromMarker( + getVersionMarkerFromTags(table, addb)); + assertEquals("Table should be tagged with the right version.", + VERSION, versionFromTag); + assertEquals("Table should have the right version marker.", + VERSION, versionFromItem); + + LOG.info("2/6: if the table is not empty and there's no version marker " + + "it should fail"); + deleteVersionMarkerItem(table); + removeVersionMarkerTag(table, addb); + String testKey = "coffee"; + Item wrongItem = + createVersionMarker(testKey, VERSION * 2, 0); + table.putItem(wrongItem); + intercept(IOException.class, E_NO_VERSION_MARKER_AND_NOT_EMPTY, + () -> localTableHandler.initTable()); + + LOG.info("3/6: table has only version marker item then it will be tagged"); + table.putItem(originalVersionMarker); + localTableHandler.initTable(); + final int versionFromTag2 = extractVersionFromMarker( + getVersionMarkerFromTags(table, addb)); + assertEquals("Table should have the right version marker tag " + + "if there was a version item.", VERSION, versionFromTag2); + + LOG.info("4/6: table has only version marker tag then the version marker " + + "item will be created."); + deleteVersionMarkerItem(table); + removeVersionMarkerTag(table, addb); + localTableHandler.tagTableWithVersionMarker(); + localTableHandler.initTable(); + final int versionFromItem2 = extractVersionFromMarker( + localTableHandler.getVersionMarkerItem()); + assertEquals("Table should have the right version marker item " + + "if there was a version tag.", VERSION, versionFromItem2); + + LOG.info("5/6: add a different marker tag to the table: init should fail"); + deleteVersionMarkerItem(table); + removeVersionMarkerTag(table, addb); + Item v200 = createVersionMarker(VERSION_MARKER, VERSION * 2, 0); + table.putItem(v200); + intercept(IOException.class, E_INCOMPATIBLE_ITEM_VERSION, + () -> localTableHandler.initTable()); + + LOG.info("6/6: add a different marker item to the table: init should fail"); + deleteVersionMarkerItem(table); + removeVersionMarkerTag(table, addb); + int wrongVersion = VERSION + 3; + tagTableWithCustomVersion(table, addb, wrongVersion); + intercept(IOException.class, E_INCOMPATIBLE_TAG_VERSION, + () -> localTableHandler.initTable()); + + // CLEANUP + table.putItem(originalVersionMarker); + localTableHandler.tagTableWithVersionMarker(); + localTableHandler.initTable(); + } + + private void tagTableWithCustomVersion(Table table, + AmazonDynamoDB addb, + int wrongVersion) { + final Tag vmTag = new Tag().withKey(VERSION_MARKER) + .withValue(valueOf(wrongVersion)); + TagResourceRequest tagResourceRequest = new TagResourceRequest() + .withResourceArn(table.getDescription().getTableArn()) + .withTags(vmTag); + addb.tagResource(tagResourceRequest); + } + + private void removeVersionMarkerTag(Table table, AmazonDynamoDB addb) { + addb.untagResource(new UntagResourceRequest() + .withResourceArn(table.describe().getTableArn()) + .withTagKeys(VERSION_MARKER)); + } + + private void deleteVersionMarkerItem(Table table) { + table.deleteItem(VERSION_MARKER_PRIMARY_KEY); + assertNull("Version marker should be null after deleting it from the table.", + table.getItem(VERSION_MARKER_PRIMARY_KEY)); + } + /** * Test that initTable fails with IOException when table does not exist and * table auto-creation is disabled. @@ -959,8 +1016,11 @@ protected void verifyStoreTags(final Map tagMap, tags.forEach(t -> actual.put(t.getKey(), t.getValue())); Assertions.assertThat(actual) .describedAs("Tags from DDB table") - .containsExactlyEntriesOf(tagMap); - assertEquals(tagMap.size(), tags.size()); + .containsAllEntriesOf(tagMap); + + // The version marker is always there in the tags. + // We have a plus one in tags we expect. + assertEquals(tagMap.size() + 1, tags.size()); } protected List listTagsOfStore(final DynamoDBMetadataStore store) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java index 205eb65a1c919..ad31305ea291b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java @@ -160,7 +160,8 @@ public void testDynamoTableTagging() throws Exception { List tags = ddbms.getAmazonDynamoDB().listTagsOfResource(listTagsOfResourceRequest).getTags(); // assert - assertEquals(tagMap.size(), tags.size()); + // table version is always there as a plus one tag. + assertEquals(tagMap.size() + 1, tags.size()); for (Tag tag : tags) { Assert.assertEquals(tagMap.get(tag.getKey()), tag.getValue()); } From 2b3cc5d027114ca577cf58969928b0d4c7e5ed12 Mon Sep 17 00:00:00 2001 From: Gabor Bota Date: Wed, 2 Oct 2019 17:02:59 +0200 Subject: [PATCH 3/8] import order fix Change-Id: I3ab87ca8c31d9e06ec1562691edd316536cfc883 --- .../DynamoDBMetadataStoreTableHandler.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java index 9c46a52db3a28..9e256090a54d1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java @@ -18,6 +18,15 @@ package org.apache.hadoop.fs.s3a.s3guard; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Optional; + import com.amazonaws.AmazonClientException; import com.amazonaws.SdkBaseException; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; @@ -51,15 +60,6 @@ import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Optional; - import static java.lang.String.valueOf; import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT; import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_TABLE_CAPACITY_READ_KEY; From 14f47fc5e292fb8533d5403855c7a3f37c5426c2 Mon Sep 17 00:00:00 2001 From: Gabor Bota Date: Wed, 2 Oct 2019 17:23:51 +0200 Subject: [PATCH 4/8] fix one failing test tagging Change-Id: I924347a550cecf991d37c9a1a8f1f1071967c3db --- .../hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java index ad31305ea291b..72df79b136221 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java @@ -163,6 +163,10 @@ public void testDynamoTableTagging() throws Exception { // table version is always there as a plus one tag. assertEquals(tagMap.size() + 1, tags.size()); for (Tag tag : tags) { + // skip the version marker tag + if (tag.getKey().equals(VERSION_MARKER)) { + continue; + } Assert.assertEquals(tagMap.get(tag.getKey()), tag.getValue()); } // be sure to clean up - delete table From 5e318746f2c16cc36e76725225967fa4535bad78 Mon Sep 17 00:00:00 2001 From: Gabor Bota Date: Thu, 3 Oct 2019 15:55:21 +0200 Subject: [PATCH 5/8] fixes based on Steve's comments. What's missing: add docs; shorter timeout for version marker ITEM check Change-Id: Idf960a0c0541c2d2088cbcafe045f6d43f366383 --- .../fs/s3a/s3guard/DynamoDBMetadataStore.java | 32 ++----- .../DynamoDBMetadataStoreTableHandler.java | 88 ++++++++++++++----- .../fs/s3a/s3guard/S3GuardTableAccess.java | 6 +- .../s3guard/ITestDynamoDBMetadataStore.java | 8 +- .../s3a/s3guard/ITestS3GuardToolDynamoDB.java | 2 +- .../TestPathMetadataDynamoDBTranslation.java | 6 +- 6 files changed, 81 insertions(+), 61 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java index f8d02c6384e6e..384a885173f92 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java @@ -221,7 +221,10 @@ public class DynamoDBMetadataStore implements MetadataStore, OPERATIONS_LOG_NAME); /** parent/child name to use in the version marker. */ - public static final String VERSION_MARKER = "../VERSION"; + public static final String VERSION_MARKER_ITEM_NAME = "../VERSION"; + + /** parent/child name to use in the version marker. */ + public static final String VERSION_MARKER_TAG_NAME = "s3guard_version"; /** Current version number. */ public static final int VERSION = 100; @@ -1429,32 +1432,7 @@ public synchronized void close() { @Override @Retries.RetryTranslated public void destroy() throws IOException { - if (table == null) { - LOG.info("In destroy(): no table to delete"); - return; - } - LOG.info("Deleting DynamoDB table {} in region {}", tableName, region); - Preconditions.checkNotNull(dynamoDB, "Not connected to DynamoDB"); - try { - invoker.retry("delete", null, true, - () -> table.delete()); - table.waitForDelete(); - } catch (IllegalArgumentException ex) { - throw new TableDeleteTimeoutException(tableName, - "Timeout waiting for the table " + tableHandler.getTableArn() - + " to be deleted", ex); - } catch (FileNotFoundException rnfe) { - LOG.info("FileNotFoundException while deleting DynamoDB table {} in " - + "region {}. This may indicate that the table does not exist, " - + "or has been deleted by another concurrent thread or process.", - tableName, region); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - LOG.warn("Interrupted while waiting for DynamoDB table {} being deleted", - tableName, ie); - throw new InterruptedIOException("Table " + tableName - + " in region " + region + " has not been deleted"); - } + tableHandler.destroy(); } @Retries.RetryTranslated diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java index 9e256090a54d1..0d46cc18d83bc 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java @@ -35,6 +35,7 @@ import com.amazonaws.services.dynamodbv2.document.PrimaryKey; import com.amazonaws.services.dynamodbv2.document.PutItemOutcome; import com.amazonaws.services.dynamodbv2.document.Table; +import com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException; import com.amazonaws.services.dynamodbv2.model.BillingMode; import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; import com.amazonaws.services.dynamodbv2.model.ListTagsOfResourceRequest; @@ -70,7 +71,8 @@ import static org.apache.hadoop.fs.s3a.S3AUtils.translateException; import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.E_ON_DEMAND_NO_SET_CAPACITY; import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION; -import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER_ITEM_NAME; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER_TAG_NAME; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.attributeDefinitions; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.createVersionMarker; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.createVersionMarkerPrimaryKey; @@ -148,7 +150,7 @@ public DynamoDBMetadataStoreTableHandler(DynamoDB dynamoDB, * or table is being deleted, or any other I/O exception occurred. */ @VisibleForTesting - @Retries.OnceRaw + @Retries.RetryTranslated Table initTable() throws IOException { table = dynamoDB.getTable(tableName); try { @@ -227,16 +229,19 @@ Table initTable() throws IOException { } protected void tagTableWithVersionMarker() { - TagResourceRequest tagResourceRequest = new TagResourceRequest() - .withResourceArn(table.getDescription().getTableArn()) - .withTags(newVersionMarkerTag()); - amazonDynamoDB.tagResource(tagResourceRequest); + try { + TagResourceRequest tagResourceRequest = new TagResourceRequest() + .withResourceArn(table.getDescription().getTableArn()) + .withTags(newVersionMarkerTag()); + amazonDynamoDB.tagResource(tagResourceRequest); + } catch (AmazonDynamoDBException e) { + LOG.warn("Exception during tagging table: {}", e.getMessage()); + } } - // todo test protected static Item getVersionMarkerFromTags(Table table, AmazonDynamoDB addb) { - final List tags; + List tags = null; try { final TableDescription description = table.describe(); ListTagsOfResourceRequest listTagsOfResourceRequest = @@ -246,10 +251,17 @@ protected static Item getVersionMarkerFromTags(Table table, } catch (ResourceNotFoundException e) { LOG.error("Table: {} not found."); throw e; + } catch (AmazonDynamoDBException e) { + LOG.warn("Exception while getting tags from the dynamo table: {}", + e.getMessage()); + } + + if (tags == null) { + return null; } final Optional first = tags.stream() - .filter(tag -> tag.getKey().equals(VERSION_MARKER)).findFirst(); + .filter(tag -> tag.getKey().equals(VERSION_MARKER_TAG_NAME)).findFirst(); if (first.isPresent()) { final Tag vmTag = first.get(); return createVersionMarker( @@ -303,7 +315,7 @@ private void createTable(ProvisionedThroughput capacity) throws IOException { tableName, region); } waitForTableActive(table); - putVersionMarkerToTable(); + putVersionMarkerItemToTable(); } /** @@ -332,7 +344,7 @@ public List getTableTagsFromConfig() { * @return a new version marker tag */ private static Tag newVersionMarkerTag() { - return new Tag().withKey(VERSION_MARKER).withValue(valueOf(VERSION)); + return new Tag().withKey(VERSION_MARKER_TAG_NAME).withValue(valueOf(VERSION)); } /** @@ -343,18 +355,18 @@ private static Tag newVersionMarkerTag() { *
        *   1. If the table lacks both version markers AND it's empty,
        *      both markers will be added.
    -   *      If the table is not empty the check throws {@link IOException}
    +   *      If the table is not empty the check throws IOException
        *   2. If there's no version marker ITEM, the compatibility with the TAG
        *      will be checked, and the version marker ITEM will be added if the
        *      TAG version is compatible.
    -   *      If the TAG version is not compatible, the check throws {@link IOException}
    +   *      If the TAG version is not compatible, the check throws OException
        *   3. If there's no version marker TAG, the compatibility with the ITEM
        *      version marker will be checked, and the version marker ITEM will be
        *      added if the ITEM version is compatible.
    -   *      If the ITEM version is not compatible, the check throws {@link IOException}
    +   *      If the ITEM version is not compatible, the check throws IOException
        *   4. If the TAG and ITEM versions are both present then both will be checked
        *      for compatibility. If the ITEM or TAG version marker is not compatible,
    -   *      the check throws {@link IOException}
    +   *      the check throws IOException
        * 
    * * @throws IOException on any incompatibility @@ -380,7 +392,7 @@ protected void verifyVersionCompatibility() throws IOException { "as TAG and ITEM.", tableName); tagTableWithVersionMarker(); - putVersionMarkerToTable(); + putVersionMarkerItemToTable(); } if (versionMarkerItem == null && versionMarkerFromTag != null) { @@ -393,7 +405,7 @@ protected void verifyVersionCompatibility() throws IOException { "compatible version marker TAG. Restoring the version marker " + "item from tag.", tableName); - putVersionMarkerToTable(); + putVersionMarkerItemToTable(); } if (versionMarkerItem != null && versionMarkerFromTag == null) { @@ -447,8 +459,8 @@ private static void throwExceptionOnVersionMismatch(int actual, * Add version marker to the dynamo table */ @Retries.OnceRaw - private void putVersionMarkerToTable() { - final Item marker = createVersionMarker(VERSION_MARKER, VERSION, + private void putVersionMarkerItemToTable() { + final Item marker = createVersionMarker(VERSION_MARKER_ITEM_NAME, VERSION, System.currentTimeMillis()); putItem(marker); } @@ -537,7 +549,7 @@ public static SdkBaseException extractInnerException( @Retries.RetryTranslated protected Item getVersionMarkerItem() throws IOException { final PrimaryKey versionMarkerKey = - createVersionMarkerPrimaryKey(VERSION_MARKER); + createVersionMarkerPrimaryKey(VERSION_MARKER_ITEM_NAME); int retryCount = 0; // look for a version marker, with usual throttling/failure retries. Item versionMarker = queryVersionMarker(versionMarkerKey); @@ -561,8 +573,8 @@ protected Item getVersionMarkerItem() throws IOException { if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { break; } else { - LOG.warn("Version marker in the dynamo table was null. " + - "Sleeping {} ms before next retry", action.delayMillis); + LOG.warn("No version marker found in the DynamoDB table: {}. " + + "Sleeping {} ms before next retry", tableName, action.delayMillis); Thread.sleep(action.delayMillis); } } catch (Exception e) { @@ -585,7 +597,7 @@ protected Item getVersionMarkerItem() throws IOException { private Item queryVersionMarker(final PrimaryKey versionMarkerKey) throws IOException { return readOp.retry("getVersionMarkerItem", - VERSION_MARKER, true, + VERSION_MARKER_ITEM_NAME, true, () -> table.getItem(versionMarkerKey)); } @@ -633,6 +645,36 @@ void provisionTable(Long readCapacity, Long writeCapacity) }); } + @Retries.RetryTranslated + public void destroy() throws IOException { + if (table == null) { + LOG.info("In destroy(): no table to delete"); + return; + } + LOG.info("Deleting DynamoDB table {} in region {}", tableName, region); + Preconditions.checkNotNull(dynamoDB, "Not connected to DynamoDB"); + try { + invoker.retry("delete", null, true, + () -> table.delete()); + table.waitForDelete(); + } catch (IllegalArgumentException ex) { + throw new TableDeleteTimeoutException(tableName, + "Timeout waiting for the table " + getTableArn() + + " to be deleted", ex); + } catch (FileNotFoundException rnfe) { + LOG.info("FileNotFoundException while deleting DynamoDB table {} in " + + "region {}. This may indicate that the table does not exist, " + + "or has been deleted by another concurrent thread or process.", + tableName, region); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while waiting for DynamoDB table {} being deleted", + tableName, ie); + throw new InterruptedIOException("Table " + tableName + + " in region " + region + " has not been deleted"); + } + } + @Retries.RetryTranslated @VisibleForTesting void provisionTableBlocking(Long readCapacity, Long writeCapacity) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java index 5592faafe3ebd..19ef90e455741 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java @@ -39,7 +39,7 @@ import org.apache.hadoop.fs.s3a.S3AFileStatus; import static com.google.common.base.Preconditions.checkNotNull; -import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER_ITEM_NAME; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.CHILD; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.PARENT; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.TABLE_VERSION; @@ -199,8 +199,8 @@ public boolean hasNext() { public DDBPathMetadata next() { Item item = it.next(); Pair key = primaryKey(item); - if (VERSION_MARKER.equals(key.getLeft()) && - VERSION_MARKER.equals(key.getRight())) { + if (VERSION_MARKER_ITEM_NAME.equals(key.getLeft()) && + VERSION_MARKER_ITEM_NAME.equals(key.getRight())) { // a version marker is found, return the special type return new VersionMarker(item); } else { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java index 096eea6725fe3..11b2bcb51741a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java @@ -118,7 +118,7 @@ public ITestDynamoDBMetadataStore() { LoggerFactory.getLogger(ITestDynamoDBMetadataStore.class); public static final PrimaryKey VERSION_MARKER_PRIMARY_KEY = createVersionMarkerPrimaryKey( - DynamoDBMetadataStore.VERSION_MARKER); + DynamoDBMetadataStore.VERSION_MARKER_ITEM_NAME); private S3AFileSystem fileSystem; private S3AContract s3AContract; @@ -722,7 +722,7 @@ private void checkVerifyVersionMarkerCompatibility( LOG.info("5/6: add a different marker tag to the table: init should fail"); deleteVersionMarkerItem(table); removeVersionMarkerTag(table, addb); - Item v200 = createVersionMarker(VERSION_MARKER, VERSION * 2, 0); + Item v200 = createVersionMarker(VERSION_MARKER_ITEM_NAME, VERSION * 2, 0); table.putItem(v200); intercept(IOException.class, E_INCOMPATIBLE_ITEM_VERSION, () -> localTableHandler.initTable()); @@ -744,7 +744,7 @@ private void checkVerifyVersionMarkerCompatibility( private void tagTableWithCustomVersion(Table table, AmazonDynamoDB addb, int wrongVersion) { - final Tag vmTag = new Tag().withKey(VERSION_MARKER) + final Tag vmTag = new Tag().withKey(VERSION_MARKER_TAG_NAME) .withValue(valueOf(wrongVersion)); TagResourceRequest tagResourceRequest = new TagResourceRequest() .withResourceArn(table.getDescription().getTableArn()) @@ -755,7 +755,7 @@ private void tagTableWithCustomVersion(Table table, private void removeVersionMarkerTag(Table table, AmazonDynamoDB addb) { addb.untagResource(new UntagResourceRequest() .withResourceArn(table.describe().getTableArn()) - .withTagKeys(VERSION_MARKER)); + .withTagKeys(VERSION_MARKER_TAG_NAME)); } private void deleteVersionMarkerItem(Table table) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java index 72df79b136221..358ab8322047e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java @@ -164,7 +164,7 @@ public void testDynamoTableTagging() throws Exception { assertEquals(tagMap.size() + 1, tags.size()); for (Tag tag : tags) { // skip the version marker tag - if (tag.getKey().equals(VERSION_MARKER)) { + if (tag.getKey().equals(VERSION_MARKER_TAG_NAME)) { continue; } Assert.assertEquals(tagMap.get(tag.getKey()), tag.getValue()); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestPathMetadataDynamoDBTranslation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestPathMetadataDynamoDBTranslation.java index 70bf901514bab..c882094853302 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestPathMetadataDynamoDBTranslation.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestPathMetadataDynamoDBTranslation.java @@ -51,7 +51,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*; -import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER_ITEM_NAME; import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION; import static org.mockito.Mockito.never; @@ -272,14 +272,14 @@ private static void doTestPathToKey(Path path) { @Test public void testVersionRoundTrip() throws Throwable { - final Item marker = createVersionMarker(VERSION_MARKER, VERSION, 0); + final Item marker = createVersionMarker(VERSION_MARKER_ITEM_NAME, VERSION, 0); assertEquals("Extracted version from " + marker, VERSION, extractVersionFromMarker(marker)); } @Test public void testVersionMarkerNotStatusIllegalPath() throws Throwable { - final Item marker = createVersionMarker(VERSION_MARKER, VERSION, 0); + final Item marker = createVersionMarker(VERSION_MARKER_ITEM_NAME, VERSION, 0); assertNull("Path metadata fromfrom " + marker, itemToPathMetadata(marker, "alice")); } From ecd49f4db0b5ae2d2421548f0253b815846531a7 Mon Sep 17 00:00:00 2001 From: Gabor Bota Date: Wed, 9 Oct 2019 15:36:43 +0200 Subject: [PATCH 6/8] docs and small fixes Change-Id: Ief4db248f46be18d1838743dc0b212d6a4bbfd83 --- .../org/apache/hadoop/fs/s3a/Constants.java | 2 +- .../fs/s3a/s3guard/DynamoDBMetadataStore.java | 11 +++--- ...=> DynamoDBMetadataStoreTableManager.java} | 14 ++++---- .../site/markdown/tools/hadoop-aws/s3guard.md | 35 +++++++++++++++++-- .../s3guard/ITestDynamoDBMetadataStore.java | 14 ++++---- .../ITestDynamoDBMetadataStoreScale.java | 2 +- .../s3guard/TestDynamoDBMiscOperations.java | 2 +- 7 files changed, 54 insertions(+), 26 deletions(-) rename hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/{DynamoDBMetadataStoreTableHandler.java => DynamoDBMetadataStoreTableManager.java} (98%) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index fdbdf37a26ab3..9f120b8eda003 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -176,7 +176,7 @@ private Constants() { // number of times we should retry errors public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum"; - public static final int DEFAULT_MAX_ERROR_RETRIES = 20; + public static final int DEFAULT_MAX_ERROR_RETRIES = 10; // seconds until we give up trying to establish a connection to s3 public static final String ESTABLISH_TIMEOUT = diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java index 384a885173f92..044f3a573118f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.s3a.s3guard; import javax.annotation.Nullable; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; import java.net.URI; @@ -295,7 +294,7 @@ public class DynamoDBMetadataStore implements MetadataStore, * This policy is mostly for batched writes, not for processing * exceptions in invoke() calls. * It also has a role purpose in - * {@link DynamoDBMetadataStoreTableHandler#getVersionMarkerItem()}; + * {@link DynamoDBMetadataStoreTableManager#getVersionMarkerItem()}; * look at that method for the details. */ private RetryPolicy batchWriteRetryPolicy; @@ -342,7 +341,7 @@ public class DynamoDBMetadataStore implements MetadataStore, */ private ITtlTimeProvider ttlTimeProvider; - private DynamoDBMetadataStoreTableHandler tableHandler; + private DynamoDBMetadataStoreTableManager tableHandler; /** * A utility function to create DynamoDB instance. @@ -423,7 +422,7 @@ public void initialize(FileSystem fs, ITtlTimeProvider ttlTp) this.ttlTimeProvider = ttlTp; - tableHandler = new DynamoDBMetadataStoreTableHandler( + tableHandler = new DynamoDBMetadataStoreTableManager( dynamoDB, tableName, region, amazonDynamoDB, conf, readOp, batchWriteRetryPolicy); this.table = tableHandler.initTable(); @@ -509,7 +508,7 @@ public void initialize(Configuration config, initDataAccessRetries(conf); this.ttlTimeProvider = ttlTp; - tableHandler = new DynamoDBMetadataStoreTableHandler( + tableHandler = new DynamoDBMetadataStoreTableManager( dynamoDB, tableName, region, amazonDynamoDB, conf, readOp, batchWriteRetryPolicy); this.table = tableHandler.initTable(); @@ -2300,7 +2299,7 @@ private static String stateAsString(@Nullable AncestorState state) { } } - protected DynamoDBMetadataStoreTableHandler getTableHandler() { + protected DynamoDBMetadataStoreTableManager getTableHandler() { Preconditions.checkNotNull(tableHandler, "Not initialized"); return tableHandler; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableManager.java similarity index 98% rename from hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java rename to hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableManager.java index 0d46cc18d83bc..ac69c76bbd313 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableHandler.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableManager.java @@ -84,13 +84,13 @@ * Table handling for dynamo tables, factored out from DynamoDBMetadataStore * Mainly */ -public class DynamoDBMetadataStoreTableHandler { +public class DynamoDBMetadataStoreTableManager { public static final Logger LOG = LoggerFactory.getLogger( - DynamoDBMetadataStoreTableHandler.class); + DynamoDBMetadataStoreTableManager.class); /** Error: version marker not found in table but the table is not empty. */ public static final String E_NO_VERSION_MARKER_AND_NOT_EMPTY - = "S3Guard table lacks version marker, and not empty."; + = "S3Guard table lacks version marker, and it is not empty."; /** Error: version mismatch. */ public static final String E_INCOMPATIBLE_TAG_VERSION @@ -116,7 +116,7 @@ public class DynamoDBMetadataStoreTableHandler { private Table table; private String tableArn; - public DynamoDBMetadataStoreTableHandler(DynamoDB dynamoDB, + public DynamoDBMetadataStoreTableManager(DynamoDB dynamoDB, String tableName, String region, AmazonDynamoDB amazonDynamoDB, @@ -249,7 +249,7 @@ protected static Item getVersionMarkerFromTags(Table table, .withResourceArn(description.getTableArn()); tags = addb.listTagsOfResource(listTagsOfResourceRequest).getTags(); } catch (ResourceNotFoundException e) { - LOG.error("Table: {} not found."); + LOG.error("Table: {} not found.", table.getTableName()); throw e; } catch (AmazonDynamoDBException e) { LOG.warn("Exception while getting tags from the dynamo table: {}", @@ -286,7 +286,7 @@ protected static Item getVersionMarkerFromTags(Table table, * @throws IOException on any failure. * @throws InterruptedIOException if the wait was interrupted */ - @Retries.OnceRaw + @Retries.OnceMixed private void createTable(ProvisionedThroughput capacity) throws IOException { try { String mode; @@ -432,7 +432,7 @@ protected void verifyVersionCompatibility() throws IOException { throwExceptionOnVersionMismatch(itemVersionMarker, tableName, E_INCOMPATIBLE_ITEM_VERSION); - LOG.info("Table {} contains correct version marker TAG and ITEM.", + LOG.debug("Table {} contains correct version marker TAG and ITEM.", tableName); } } diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md index cb0fd139262df..276579ce55afe 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md @@ -951,9 +951,36 @@ logged. ### Versioning -S3Guard tables are created with a version marker, an entry with the primary -key and child entry of `../VERSION`; the use of a relative path guarantees -that it will not be resolved. +S3Guard tables are created with a version marker entry and table tag. +The entry is created with the primary key and child entry of `../VERSION`; +the use of a relative path guarantees that it will not be resolved. +Table tag key is named `s3guard_version`. + +When the table is initialized by S3Guard, the table will be tagged during the +creating and the version marker entry will be created in the table. +If the table lacks the version marker entry or tag, S3Guard will try to create +it according to the following rules: + +1. If the table lacks both version markers AND it's empty, both markers will be added. +If the table is not empty the check throws IOException +1. If there's no version marker ITEM, the compatibility with the TAG +will be checked, and the version marker ITEM will be added if the +TAG version is compatible. +If the TAG version is not compatible, the check throws OException +1. If there's no version marker TAG, the compatibility with the ITEM +version marker will be checked, and the version marker ITEM will be +added if the ITEM version is compatible. +If the ITEM version is not compatible, the check throws IOException +1. If the TAG and ITEM versions are both present then both will be checked +for compatibility. If the ITEM or TAG version marker is not compatible, +the check throws IOException + +*Note*: If the user does not have sufficient rights to tag the table the +initialization of S3Guard will not fail, but there will be no version marker tag +on the dynamo table and the following message will be logged on WARN level: +``` +Exception during tagging table: {AmazonDynamoDBException exception message} +``` *Versioning policy* @@ -974,6 +1001,8 @@ in an incompatible manner. The version marker in tables exists to support such an option if it ever becomes necessary, by ensuring that all S3Guard client can recognise any version mismatch. +* Table versionin + ## Security All users of the DynamoDB table must have write access to it. This diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java index 11b2bcb51741a..ffc5fd200f299 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java @@ -78,10 +78,10 @@ import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.fs.s3a.S3AUtils.clearBucketOption; -import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableHandler.E_INCOMPATIBLE_ITEM_VERSION; -import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableHandler.E_INCOMPATIBLE_TAG_VERSION; -import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableHandler.E_NO_VERSION_MARKER_AND_NOT_EMPTY; -import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableHandler.getVersionMarkerFromTags; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.E_INCOMPATIBLE_ITEM_VERSION; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.E_INCOMPATIBLE_TAG_VERSION; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.E_NO_VERSION_MARKER_AND_NOT_EMPTY; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.getVersionMarkerFromTags; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*; import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.*; import static org.apache.hadoop.test.LambdaTestUtils.*; @@ -122,7 +122,7 @@ public ITestDynamoDBMetadataStore() { private S3AFileSystem fileSystem; private S3AContract s3AContract; - private DynamoDBMetadataStoreTableHandler tableHandler; + private DynamoDBMetadataStoreTableManager tableHandler; private URI fsUri; @@ -652,7 +652,7 @@ public void testTableVersioning() throws Exception { DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore(); try { ddbms.initialize(conf, new S3Guard.TtlTimeProvider(conf)); - DynamoDBMetadataStoreTableHandler localTableHandler = + DynamoDBMetadataStoreTableManager localTableHandler = ddbms.getTableHandler(); Table table = verifyTableInitialized(tableName, ddbms.getDynamoDB()); @@ -668,7 +668,7 @@ public void testTableVersioning() throws Exception { } private void checkVerifyVersionMarkerCompatibility( - DynamoDBMetadataStoreTableHandler localTableHandler, Table table) + DynamoDBMetadataStoreTableManager localTableHandler, Table table) throws Exception { final AmazonDynamoDB addb = getDynamoMetadataStore().getAmazonDynamoDB(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java index b36428d74028e..a0614d5a05fc1 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java @@ -91,7 +91,7 @@ public class ITestDynamoDBMetadataStoreScale private static final long MAXIMUM_WRITE_CAPACITY = 15; private DynamoDBMetadataStore ddbms; - private DynamoDBMetadataStoreTableHandler tableHandler; + private DynamoDBMetadataStoreTableManager tableHandler; private DynamoDB ddb; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java index bdba6b1720de1..602a072aac2f6 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java @@ -32,7 +32,7 @@ import org.apache.hadoop.test.HadoopTestBase; import org.apache.hadoop.fs.Path; -import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableHandler.translateTableWaitFailure; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStoreTableManager.translateTableWaitFailure; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; From 964323c0330bfff5f1f42050d0c58c55242d4f0b Mon Sep 17 00:00:00 2001 From: Gabor Bota Date: Wed, 9 Oct 2019 17:23:05 +0200 Subject: [PATCH 7/8] checkstyle and findbugs fixes Change-Id: Ifce37b942f3a929dbf7e7ab9278f1ba0be22f95b --- .../s3guard/DynamoDBMetadataStoreTableManager.java | 12 ++++++------ .../fs/s3a/s3guard/ITestDynamoDBMetadataStore.java | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableManager.java index ac69c76bbd313..d9f297cb254db 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStoreTableManager.java @@ -81,8 +81,8 @@ import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.keySchema; /** - * Table handling for dynamo tables, factored out from DynamoDBMetadataStore - * Mainly + * Managing dynamo tables for S3Guard dynamodb based metadatastore. + * Factored out from DynamoDBMetadataStore. */ public class DynamoDBMetadataStoreTableManager { public static final Logger LOG = LoggerFactory.getLogger( @@ -265,7 +265,7 @@ protected static Item getVersionMarkerFromTags(Table table, if (first.isPresent()) { final Tag vmTag = first.get(); return createVersionMarker( - vmTag.getKey(), Integer.valueOf(vmTag.getValue()), 0 + vmTag.getKey(), Integer.parseInt(vmTag.getValue()), 0 ); } else { return null; @@ -320,7 +320,7 @@ private void createTable(ProvisionedThroughput capacity) throws IOException { /** * Return tags from configuration and the version marker for adding to - * dynamo table during creation + * dynamo table during creation. */ @Retries.OnceRaw public List getTableTagsFromConfig() { @@ -340,7 +340,7 @@ public List getTableTagsFromConfig() { } /** - * Create a new version marker tag + * Create a new version marker tag. * @return a new version marker tag */ private static Tag newVersionMarkerTag() { @@ -456,7 +456,7 @@ private static void throwExceptionOnVersionMismatch(int actual, } /** - * Add version marker to the dynamo table + * Add version marker to the dynamo table. */ @Retries.OnceRaw private void putVersionMarkerItemToTable() { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java index ffc5fd200f299..e541683285ea8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java @@ -675,7 +675,7 @@ private void checkVerifyVersionMarkerCompatibility( Item originalVersionMarker = table.getItem(VERSION_MARKER_PRIMARY_KEY); LOG.info("1/6: remove version marker and tags from table " + - "the table is empty, so it should be initialized after the call"); + "the table is empty, so it should be initialized after the call"); deleteVersionMarkerItem(table); removeVersionMarkerTag(table, addb); localTableHandler.initTable(); @@ -760,8 +760,8 @@ private void removeVersionMarkerTag(Table table, AmazonDynamoDB addb) { private void deleteVersionMarkerItem(Table table) { table.deleteItem(VERSION_MARKER_PRIMARY_KEY); - assertNull("Version marker should be null after deleting it from the table.", - table.getItem(VERSION_MARKER_PRIMARY_KEY)); + assertNull("Version marker should be null after deleting it " + + "from the table.", table.getItem(VERSION_MARKER_PRIMARY_KEY)); } /** From 7003ee4ae4be3caa826d7778237402637ffaaf25 Mon Sep 17 00:00:00 2001 From: Gabor Bota Date: Fri, 11 Oct 2019 12:04:36 +0200 Subject: [PATCH 8/8] removed leftover line from md docs Change-Id: I94046bd08b0d22969a61ed8b6f0c7ec087ab0ac9 --- .../hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md index 276579ce55afe..571f2230cb7b7 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md @@ -1001,8 +1001,6 @@ in an incompatible manner. The version marker in tables exists to support such an option if it ever becomes necessary, by ensuring that all S3Guard client can recognise any version mismatch. -* Table versionin - ## Security All users of the DynamoDB table must have write access to it. This