diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/launcher/ServiceLaunchException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/launcher/ServiceLaunchException.java index 1243a1fabfa50..fdb60236fe645 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/launcher/ServiceLaunchException.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/launcher/ServiceLaunchException.java @@ -78,4 +78,18 @@ public ServiceLaunchException(int exitCode, String format, Object... args) { } } + /** + * Create a formatted exception. + *
<p>
+ * This uses {@link String#format(String, Object...)} + * to build the formatted exception in the ENGLISH locale. + * @param exitCode exit code + * @param cause inner cause + * @param format format for message to use in exception + * @param args list of arguments + */ + public ServiceLaunchException(int exitCode, Throwable cause, + String format, Object... args) { + super(exitCode, String.format(Locale.ENGLISH, format, args), cause); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/launcher/ServiceLauncher.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/launcher/ServiceLauncher.java index da91a3d0e6c70..5e8a1f4eb21fb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/launcher/ServiceLauncher.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/launcher/ServiceLauncher.java @@ -200,7 +200,7 @@ public ServiceLauncher(String serviceName, String serviceClassName) { * Get the service. * * Null until - * {@link #coreServiceLaunch(Configuration, List, boolean, boolean)} + * {@link #coreServiceLaunch(Configuration, Service, List, boolean, boolean)} * has completed. * @return the service */ @@ -303,7 +303,7 @@ public void launchServiceAndExit(List args) { exitException = e; noteException(exitException); } - if (exitException.getExitCode() != 0) { + if (exitException.getExitCode() == LauncherExitCodes.EXIT_USAGE) { // something went wrong. Print the usage and commands System.err.println(getUsageMessage()); System.err.println("Command: " + argumentString); @@ -328,8 +328,18 @@ protected void bindCommandOptions() { * @param exitException exception */ void noteException(ExitUtil.ExitException exitException) { - LOG.debug("Exception raised", exitException); - serviceExitCode = exitException.getExitCode(); + int exitCode = exitException.getExitCode(); + if (exitCode != 0) { + LOG.debug("Exception raised with exit code {}", + exitCode, + exitException); + Throwable cause = exitException.getCause(); + if (cause != null) { + // log the nested exception in more detail + LOG.warn("{}", cause.toString(), cause); + } + } + serviceExitCode = exitCode; serviceException = exitException; } @@ -451,17 +461,38 @@ public int loadConfigurationClasses() { * @param execute execute/wait for the service to stop. * @return an exit exception, which will have a status code of 0 if it worked */ - @VisibleForTesting public ExitUtil.ExitException launchService(Configuration conf, List processedArgs, boolean addShutdownHook, boolean execute) { - + return launchService(conf, null, processedArgs, addShutdownHook, execute); + } + + /** + * Launch a service catching all exceptions and downgrading them to exit codes + * after logging. + * + * Sets {@link #serviceException} to this value. + * @param conf configuration to use + * @param instance optional instance of the service. + * @param processedArgs command line after the launcher-specific arguments + * have been stripped out. + * @param addShutdownHook should a shutdown hook be added to terminate + * this service on shutdown. Tests should set this to false. + * @param execute execute/wait for the service to stop. 
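As a hedged illustration of how the new launchService overload documented just above is intended to be driven, mirroring the serviceMain() call added later in this patch for DumpS3GuardDynamoTable: a caller can hand in a pre-instantiated service and skip the shutdown hook. The MyService name below is an illustrative Service subclass, not part of this change.

    // illustrative sketch only: launch an already-constructed service,
    // with no shutdown hook (test-friendly), blocking until it stops.
    ServiceLauncher<MyService> launcher = new ServiceLauncher<>("MyService");
    ExitUtil.ExitException outcome = launcher.launchService(
        new Configuration(),       // configuration to bind
        new MyService(),           // pre-built instance: new in this overload
        Collections.emptyList(),   // no remaining command-line arguments
        false,                     // addShutdownHook = false
        true);                     // execute and wait for the service to stop
    if (outcome.getExitCode() != 0) {
      throw outcome;
    }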
+ * @return an exit exception, which will have a status code of 0 if it worked + */ + public ExitUtil.ExitException launchService(Configuration conf, + S instance, + List processedArgs, + boolean addShutdownHook, + boolean execute) { + ExitUtil.ExitException exitException; - + try { - int exitCode = coreServiceLaunch(conf, processedArgs, addShutdownHook, - execute); + int exitCode = coreServiceLaunch(conf, instance, processedArgs, + addShutdownHook, execute); if (service != null) { // check to see if the service failed Throwable failure = service.getFailureCause(); @@ -495,6 +526,12 @@ public ExitUtil.ExitException launchService(Configuration conf, // exit exceptions are passed through unchanged exitException = ee; } catch (Throwable thrown) { + // other errors need a full log. + LOG.error("Exception raised {}", + service != null + ? (service.toString() + " in state " + service.getServiceState()) + : "during service instantiation", + thrown); exitException = convertToExitException(thrown); } noteException(exitException); @@ -514,6 +551,7 @@ public ExitUtil.ExitException launchService(Configuration conf, * {@link #getService()}. * * @param conf configuration + * @param instance optional instance of the service. * @param processedArgs arguments after the configuration parameters * have been stripped out. * @param addShutdownHook should a shutdown hook be added to terminate @@ -530,12 +568,19 @@ public ExitUtil.ExitException launchService(Configuration conf, */ protected int coreServiceLaunch(Configuration conf, + S instance, List processedArgs, boolean addShutdownHook, boolean execute) throws Exception { // create the service instance - instantiateService(conf); + if (instance == null) { + instantiateService(conf); + } else { + // service already exists, so instantiate + configuration = conf; + service = instance; + } ServiceShutdownHook shutdownHook = null; // and the shutdown hook if requested @@ -685,8 +730,7 @@ protected static ExitUtil.ExitException convertToExitException( } // construct the new exception with the original message and // an exit code - exitException = new ServiceLaunchException(exitCode, message); - exitException.initCause(thrown); + exitException = new ServiceLaunchException(exitCode, thrown, message); return exitException; } @@ -917,7 +961,7 @@ protected List parseCommandArgs(Configuration conf, throw new ServiceLaunchException(EXIT_COMMAND_ARGUMENT_ERROR, e); } catch (RuntimeException e) { // lower level issue such as XML parse failure - throw new ServiceLaunchException(EXIT_COMMAND_ARGUMENT_ERROR, + throw new ServiceLaunchException(EXIT_COMMAND_ARGUMENT_ERROR, e, E_PARSE_FAILED + " %s : %s", argString, e); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java index 5fba4bfc2786b..647ffdf9996c5 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java @@ -31,8 +31,10 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.test.LambdaTestUtils; +import static org.apache.commons.lang3.StringUtils.join; import static 
org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren; @@ -149,14 +151,18 @@ public void testRmRootRecursive() throws Throwable { Path root = new Path("/"); assertIsDirectory(root); Path file = new Path("/testRmRootRecursive"); - ContractTestUtils.touch(getFileSystem(), file); - boolean deleted = getFileSystem().delete(root, true); - assertIsDirectory(root); - LOG.info("rm -rf / result is {}", deleted); - if (deleted) { - assertPathDoesNotExist("expected file to be deleted", file); - } else { - assertPathExists("expected file to be preserved", file);; + try { + ContractTestUtils.touch(getFileSystem(), file); + boolean deleted = getFileSystem().delete(root, true); + assertIsDirectory(root); + LOG.info("rm -rf / result is {}", deleted); + if (deleted) { + assertPathDoesNotExist("expected file to be deleted", file); + } else { + assertPathExists("expected file to be preserved", file); + } + } finally{ + getFileSystem().delete(file, false); } } @@ -185,28 +191,57 @@ public void testListEmptyRootDirectory() throws IOException { for (FileStatus status : statuses) { ContractTestUtils.assertDeleted(fs, status.getPath(), true); } - assertEquals("listStatus on empty root-directory returned a non-empty list", - 0, fs.listStatus(root).length); - assertFalse("listFiles(/, false).hasNext", - fs.listFiles(root, false).hasNext()); - assertFalse("listFiles(/, true).hasNext", - fs.listFiles(root, true).hasNext()); - assertFalse("listLocatedStatus(/).hasNext", - fs.listLocatedStatus(root).hasNext()); + FileStatus[] rootListStatus = fs.listStatus(root); + assertEquals("listStatus on empty root-directory returned found: " + + join("\n", rootListStatus), + 0, rootListStatus.length); + assertNoElements("listFiles(/, false)", + fs.listFiles(root, false)); + assertNoElements("listFiles(/, true)", + fs.listFiles(root, true)); + assertNoElements("listLocatedStatus(/)", + fs.listLocatedStatus(root)); assertIsDirectory(root); } + /** + * Assert that an iterator has no elements; the raised exception + * will include the element list. + * @param operation operation for assertion text. + * @param iter iterator + * @throws IOException failure retrieving the values. 
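The toList() helper invoked alongside assertNoElements() above is not part of this diff; as a minimal sketch assuming only the org.apache.hadoop.fs.RemoteIterator contract (hasNext() and next() both throw IOException), draining the iterator into a java.util.List looks roughly like this. The drain name is illustrative, not the real helper.

    // minimal sketch, not the actual ContractTestUtils helper:
    // collect every remaining element of a RemoteIterator so the test can
    // count the entries and include them in the assertion message.
    public static <T> List<T> drain(RemoteIterator<T> iter) throws IOException {
      List<T> elements = new ArrayList<>();
      while (iter.hasNext()) {
        elements.add(iter.next());
      }
      return elements;
    }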
+ */ + protected void assertNoElements(String operation, + RemoteIterator iter) throws IOException { + List resultList = toList(iter); + if (!resultList.isEmpty()) { + fail("Expected no results from " + operation + ", but got " + + resultList.size() + " elements:\n" + + join(resultList, "\n")); + } + } + @Test public void testSimpleRootListing() throws IOException { describe("test the nonrecursive root listing calls"); FileSystem fs = getFileSystem(); Path root = new Path("/"); FileStatus[] statuses = fs.listStatus(root); + String listStatusResult = join(statuses, "\n"); List locatedStatusList = toList( fs.listLocatedStatus(root)); - assertEquals(statuses.length, locatedStatusList.size()); + String locatedStatusResult = join(locatedStatusList, "\n"); + + assertEquals("listStatus(/) vs listLocatedStatus(/) with \n" + + "listStatus =" + listStatusResult + +" listLocatedStatus = " + locatedStatusResult, + statuses.length, locatedStatusList.size()); List fileList = toList(fs.listFiles(root, false)); - assertTrue(fileList.size() <= statuses.length); + String listFilesResult = join(fileList, "\n"); + assertTrue("listStatus(/) vs listFiles(/, false) with \n" + + "listStatus = " + listStatusResult + + "listFiles = " + listFilesResult, + fileList.size() <= statuses.length); } @Test diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index b4db3a5803ad8..b7b67390aa77c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.io.IOUtils; import org.junit.Assert; -import org.junit.internal.AssumptionViolatedException; +import org.junit.AssumptionViolatedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -457,8 +457,10 @@ public static void rejectRootOperation(Path path) throws IOException { public static FileStatus[] deleteChildren(FileSystem fileSystem, Path path, boolean recursive) throws IOException { + LOG.debug("Deleting children of {} (recursive={})", path, recursive); FileStatus[] children = listChildren(fileSystem, path); for (FileStatus entry : children) { + LOG.debug("Deleting {}", entry); fileSystem.delete(entry.getPath(), recursive); } return children; diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index dbe593d437670..ff330e52dc01e 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -186,7 +186,12 @@ **/ITestS3AHuge*.java **/ITestDynamoDBMetadataStoreScale.java + **/ITestTerasort*.java + + **/ITest*CommitMRJob.java + + **/ITestS3GuardDDBRootOperations.java @@ -215,15 +220,22 @@ - **/ITestS3AContractRootDir.java **/ITestS3AFileContextStatistics.java + **/ITestS3AHuge*.java + **/ITestS3AEncryptionSSEC*.java **/ITestDynamoDBMetadataStoreScale.java + **/ITestTerasort*.java + + **/ITest*CommitMRJob.java + + **/ITestS3AContractRootDir.java + **/ITestS3GuardDDBRootOperations.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java index b62c4569b6e62..54f8fc618c2d0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java +++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java @@ -21,6 +21,8 @@ import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -50,6 +52,7 @@ /** * Place for the S3A listing classes; keeps all the small classes under control. */ +@InterfaceAudience.Private public class Listing { private final S3AFileSystem owner; @@ -87,7 +90,7 @@ ProvidedFileStatusIterator createProvidedFileStatusIterator( * @return the iterator * @throws IOException IO Problems */ - FileStatusListingIterator createFileStatusListingIterator( + public FileStatusListingIterator createFileStatusListingIterator( Path listPath, S3ListRequest request, PathFilter filter, @@ -110,7 +113,7 @@ FileStatusListingIterator createFileStatusListingIterator( * @throws IOException IO Problems */ @Retries.RetryRaw - FileStatusListingIterator createFileStatusListingIterator( + public FileStatusListingIterator createFileStatusListingIterator( Path listPath, S3ListRequest request, PathFilter filter, @@ -129,7 +132,7 @@ FileStatusListingIterator createFileStatusListingIterator( * @return a new remote iterator */ @VisibleForTesting - LocatedFileStatusIterator createLocatedFileStatusIterator( + public LocatedFileStatusIterator createLocatedFileStatusIterator( RemoteIterator statusIterator) { return new LocatedFileStatusIterator(statusIterator); } @@ -789,7 +792,7 @@ public boolean accept(FileStatus status) { * Accept all entries except the base path and those which map to S3N * pseudo directory markers. */ - static class AcceptAllButSelfAndS3nDirs implements FileStatusAcceptor { + public static class AcceptAllButSelfAndS3nDirs implements FileStatusAcceptor { /** Base path. */ private final Path qualifiedPath; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 3ae4f1fe1139e..f9148ef47bbfb 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1588,7 +1588,8 @@ public S3AStorageStatistics getStorageStatistics() { * @throws IOException if the retry invocation raises one (it shouldn't). */ @Retries.RetryRaw - protected ObjectMetadata getObjectMetadata(String key) throws IOException { + @VisibleForTesting + ObjectMetadata getObjectMetadata(String key) throws IOException { return getObjectMetadata(key, null, invoker,null); } @@ -2231,6 +2232,11 @@ private boolean innerDelete(S3AFileStatus status, boolean recursive) throws IOException, AmazonClientException { Path f = status.getPath(); LOG.debug("Delete path {} - recursive {}", f, recursive); + LOG.debug("Type = {}", + status.isFile() ? "File" + : (status.isEmptyDirectory() == Tristate.TRUE + ? 
"Empty Directory" + : "Directory")); String key = pathToKey(f); @@ -2290,7 +2296,7 @@ private boolean innerDelete(S3AFileStatus status, boolean recursive) metadataStore.deleteSubtree(f, ttlTimeProvider); } } else { - LOG.debug("delete: Path is a file"); + LOG.debug("delete: Path is a file: {}", key); deleteObjectAtPath(f, key, true); } @@ -2439,7 +2445,7 @@ public FileStatus[] innerListStatus(Path f) throws FileNotFoundException, * @return the request */ @VisibleForTesting - S3ListRequest createListObjectsRequest(String key, + public S3ListRequest createListObjectsRequest(String key, String delimiter) { return createListObjectsRequest(key, delimiter, null); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ALocatedFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ALocatedFileStatus.java index d3ca2610e225b..f78e11c89924a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ALocatedFileStatus.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ALocatedFileStatus.java @@ -60,4 +60,19 @@ public boolean equals(Object o) { public int hashCode() { return super.hashCode(); } + + /** + * Generate an S3AFileStatus instance, including etag and + * version ID, if present. + */ + public S3AFileStatus toS3AFileStatus() { + return new S3AFileStatus( + getLen(), + getModificationTime(), + getPath(), + getBlockSize(), + getOwner(), + getETag(), + getVersionId()); + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 8cdce7b71d928..5b6dae7cdc963 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -538,7 +538,7 @@ public CompleteMultipartUploadResult commitUpload( public BulkOperationState initiateCommitOperation( Path path) throws IOException { return S3Guard.initiateBulkWrite(owner.getMetadataStore(), - BulkOperationState.OperationType.Put, path); + BulkOperationState.OperationType.Commit, path); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardDynamoDBDiagnostic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardDynamoDBDiagnostic.java new file mode 100644 index 0000000000000..83495ca310623 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardDynamoDBDiagnostic.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.service.launcher.AbstractLaunchableService; +import org.apache.hadoop.service.launcher.ServiceLaunchException; + +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_FAIL; +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_USAGE; + +/** + * Entry point for S3Guard diagnostics operations against DynamoDB tables. + */ +public class AbstractS3GuardDynamoDBDiagnostic + extends AbstractLaunchableService { + + private S3AFileSystem filesystem; + + private DynamoDBMetadataStore store; + + private URI uri; + + private List arguments; + + /** + * Constructor. + * @param name entry point name. + */ + public AbstractS3GuardDynamoDBDiagnostic(final String name) { + super(name); + } + + /** + * Constructor. If the store is set then that is the store for the operation, + * otherwise the filesystem's binding is used instead. + * @param name entry point name. + * @param filesystem filesystem + * @param store optional metastore. + * @param uri URI. Must be set if filesystem == null. + */ + public AbstractS3GuardDynamoDBDiagnostic( + final String name, + @Nullable final S3AFileSystem filesystem, + @Nullable final DynamoDBMetadataStore store, + @Nullable final URI uri) { + super(name); + this.store = store; + this.filesystem = filesystem; + if (store == null) { + require(filesystem != null, "No filesystem or URI"); + bindStore(filesystem); + } + if (uri == null) { + require(filesystem != null, "No filesystem or URI"); + setUri(filesystem.getUri()); + } else { + setUri(uri); + } + } + + /** + * Require a condition to hold, otherwise an exception is thrown. + * @param condition condition to be true + * @param error text on failure. + * @throws ServiceLaunchException if the condition is not met + */ + protected static void require(boolean condition, String error) { + if (!condition) { + throw failure(error); + } + } + + /** + * Generate a failure exception for throwing. + * @param message message + * @param ex optional nested exception. + * @return an exception to throw + */ + protected static ServiceLaunchException failure(String message, + Throwable ex) { + return new ServiceLaunchException(EXIT_FAIL, message, ex); + } + + /** + * Generate a failure exception for throwing. + * @param message message + * @return an exception to throw + */ + protected static ServiceLaunchException failure(String message) { + return new ServiceLaunchException(EXIT_FAIL, message); + } + + @Override + public Configuration bindArgs(final Configuration config, + final List args) + throws Exception { + this.arguments = args; + return super.bindArgs(config, args); + } + + /** + * Get the argument list. + * @return the argument list. + */ + protected List getArguments() { + return arguments; + } + + /** + * Bind to the store from a CLI argument. 
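To make the intent of the require() and failure() helpers above concrete, a short hedged usage sketch; only the helper signatures come from this patch, the surrounding values (fs, fsURI, ioe) are illustrative. A failed precondition surfaces as a ServiceLaunchException carrying EXIT_FAIL, which the service launcher later converts into the process exit code.

    // illustrative only: precondition checks become launcher exit codes.
    require(fs instanceof S3AFileSystem,
        "Not an S3A Filesystem: " + fsURI);            // throws with EXIT_FAIL
    throw failure("DynamoDB table scan failed", ioe);  // wraps a nested cause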
+ * @param fsURI filesystem URI + * @throws IOException failure + */ + protected void bindFromCLI(String fsURI) + throws IOException { + Configuration conf = getConfig(); + setUri(fsURI); + FileSystem fs = FileSystem.get(getUri(), conf); + require(fs instanceof S3AFileSystem, + "Not an S3A Filesystem: " + fsURI); + filesystem = (S3AFileSystem) fs; + bindStore(filesystem); + setUri(fs.getUri()); + } + + /** + * Binds the {@link #store} field to the metastore of + * the filesystem -which must have a DDB metastore. + * @param fs filesystem to bind the store to. + */ + private void bindStore(final S3AFileSystem fs) { + require(fs.hasMetadataStore(), + "Filesystem has no metadata store: " + fs.getUri()); + MetadataStore ms = fs.getMetadataStore(); + require(ms instanceof DynamoDBMetadataStore, + "Filesystem " + fs.getUri() + + " does not have a DynamoDB metadata store: " + ms); + store = (DynamoDBMetadataStore) ms; + } + + protected DynamoDBMetadataStore getStore() { + return store; + } + + public S3AFileSystem getFilesystem() { + return filesystem; + } + + public URI getUri() { + return uri; + } + + public void setUri(final URI uri) { + String fsURI = uri.toString(); + if (!fsURI.endsWith("/")) { + setUri(fsURI); + } else { + this.uri = uri; + } + } + + /** + * Set the URI from a string; will add a "/" if needed. + * @param fsURI filesystem URI. + * @throws RuntimeException if the fsURI parameter is not a valid URI. + */ + public void setUri(String fsURI) { + if (fsURI != null) { + if (!fsURI.endsWith("/")) { + fsURI += "/"; + } + try { + setUri(new URI(fsURI)); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Get the list of arguments, after validating the list size. + * @param argMin minimum number of entries. + * @param argMax maximum number of entries. + * @param usage Usage message. + * @return the argument list, which will be in the range. + * @throws ServiceLaunchException if the argument list is not valid. + */ + protected List getArgumentList(final int argMin, + final int argMax, + final String usage) { + List arg = getArguments(); + if (arg == null || arg.size() < argMin || arg.size() > argMax) { + // no arguments: usage message + throw new ServiceLaunchException(EXIT_USAGE, usage); + } + return arg; + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/BulkOperationState.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/BulkOperationState.java index 0fe05db833552..a1185da03165a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/BulkOperationState.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/BulkOperationState.java @@ -78,5 +78,11 @@ public enum OperationType { Rename, /** Pruning: deleting entries and updating parents. */ Prune, + /** Commit operation. */ + Commit, + /** Deletion operation. */ + Delete, + /** FSCK operation. */ + Fsck } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java new file mode 100644 index 0000000000000..7a273a66c2938 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java @@ -0,0 +1,787 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import javax.annotation.Nullable; +import java.io.Closeable; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URI; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.Deque; +import java.util.List; + +import com.amazonaws.services.dynamodbv2.xspec.ExpressionSpecBuilder; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.Listing; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus; +import org.apache.hadoop.fs.s3a.S3ListRequest; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.service.launcher.LauncherExitCodes; +import org.apache.hadoop.service.launcher.ServiceLaunchException; +import org.apache.hadoop.service.launcher.ServiceLauncher; +import org.apache.hadoop.util.DurationInfo; +import org.apache.hadoop.util.ExitUtil; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL; + +/** + * This is a low-level diagnostics entry point which does a CVE/TSV dump of + * the DDB state. + * As it also lists the filesystem, it actually changes the state of the store + * during the operation. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class DumpS3GuardDynamoTable extends AbstractS3GuardDynamoDBDiagnostic { + + private static final Logger LOG = + LoggerFactory.getLogger(DumpS3GuardDynamoTable.class); + + /** + * Application name. + */ + public static final String NAME = "DumpS3GuardDynamoTable"; + + /** + * Usage. + */ + private static final String USAGE_MESSAGE = NAME + + " "; + + /** + * Suffix for the flat list: {@value}. + */ + public static final String FLAT_CSV = "-flat.csv"; + + /** + * Suffix for the raw S3 dump: {@value}. + */ + public static final String RAW_CSV = "-s3.csv"; + + /** + * Suffix for the DDB scan: {@value}. + */ + public static final String SCAN_CSV = "-scan.csv"; + + /** + * Suffix for the second DDB scan: : {@value}. + */ + public static final String SCAN2_CSV = "-scan-2.csv"; + + /** + * Suffix for the treewalk scan of the S3A Filesystem: {@value}. 
+ */ + public static final String TREE_CSV = "-tree.csv"; + + /** + * Suffix for a recursive treewalk through the metastore: {@value}. + */ + public static final String STORE_CSV = "-store.csv"; + + /** + * Path in the local filesystem to save the data. + */ + private String destPath; + + private Pair scanEntryResult; + + private Pair secondScanResult; + + private long rawObjectStoreCount; + + private long listStatusCount; + + private long treewalkCount; + + /** + * Instantiate. + * @param name application name. + */ + public DumpS3GuardDynamoTable(final String name) { + super(name); + } + + /** + * Instantiate with default name. + */ + public DumpS3GuardDynamoTable() { + this(NAME); + } + + /** + * Bind to a specific FS + store. + * @param fs filesystem + * @param store metastore to use + * @param destFile the base filename for output + * @param uri URI of store -only needed if FS is null. + */ + public DumpS3GuardDynamoTable( + final S3AFileSystem fs, + final DynamoDBMetadataStore store, + final File destFile, + final URI uri) { + super(NAME, fs, store, uri); + this.destPath = destFile.getAbsolutePath(); + } + + /** + * Bind to the argument list, including validating the CLI. + * @throws Exception failure. + */ + @Override + protected void serviceStart() throws Exception { + if (getStore() == null) { + List arg = getArgumentList(2, 2, USAGE_MESSAGE); + bindFromCLI(arg.get(0)); + destPath = arg.get(1); + } + } + + /** + * Dump the filesystem and the metastore. + * @return the exit code. + * @throws ServiceLaunchException on failure. + * @throws IOException IO failure. + */ + @Override + public int execute() throws ServiceLaunchException, IOException { + + try { + final File scanFile = new File( + destPath + SCAN_CSV).getCanonicalFile(); + File parentDir = scanFile.getParentFile(); + if (!parentDir.mkdirs() && !parentDir.isDirectory()) { + throw new PathIOException(parentDir.toString(), + "Could not create destination directory"); + } + + try (CsvFile csv = new CsvFile(scanFile); + DurationInfo ignored = new DurationInfo(LOG, + "scanFile dump to %s", scanFile)) { + scanEntryResult = scanMetastore(csv); + } + + if (getFilesystem() != null) { + + Path basePath = getFilesystem().qualify(new Path(getUri())); + + final File destFile = new File(destPath + STORE_CSV) + .getCanonicalFile(); + LOG.info("Writing Store details to {}", destFile); + try (CsvFile csv = new CsvFile(destFile); + DurationInfo ignored = new DurationInfo(LOG, "List metastore")) { + + LOG.info("Base path: {}", basePath); + dumpMetastore(csv, basePath); + } + + // these operations all update the metastore as they list, + // that is: they are side-effecting. 
+ final File treewalkFile = new File(destPath + TREE_CSV) + .getCanonicalFile(); + + try (CsvFile csv = new CsvFile(treewalkFile); + DurationInfo ignored = new DurationInfo(LOG, + "Treewalk to %s", treewalkFile)) { + treewalkCount = treewalkFilesystem(csv, basePath); + } + final File flatlistFile = new File( + destPath + FLAT_CSV).getCanonicalFile(); + + try (CsvFile csv = new CsvFile(flatlistFile); + DurationInfo ignored = new DurationInfo(LOG, + "Flat list to %s", flatlistFile)) { + listStatusCount = listStatusFilesystem(csv, basePath); + } + final File rawFile = new File( + destPath + RAW_CSV).getCanonicalFile(); + + try (CsvFile csv = new CsvFile(rawFile); + DurationInfo ignored = new DurationInfo(LOG, + "Raw dump to %s", rawFile)) { + rawObjectStoreCount = dumpRawS3ObjectStore(csv); + } + final File scanFile2 = new File( + destPath + SCAN2_CSV).getCanonicalFile(); + + try (CsvFile csv = new CsvFile(scanFile); + DurationInfo ignored = new DurationInfo(LOG, + "scanFile dump to %s", scanFile2)) { + secondScanResult = scanMetastore(csv); + } + } + + return LauncherExitCodes.EXIT_SUCCESS; + } catch (IOException | RuntimeException e) { + LOG.error("failure", e); + throw e; + } + } + + /** + * Push all elements of a list to a queue, such that the first entry + * on the list becomes the head of the queue. + * @param queue queue to update + * @param entries list of entries to add. + * @param type of queue + */ + private void pushAll(Deque queue, List entries) { + List reversed = Lists.reverse(entries); + for (T t : reversed) { + queue.push(t); + } + } + + /** + * Dump the filesystem via a treewalk. + * If metastore entries mark directories as deleted, this + * walk will not explore them. + * @param csv destination. + * @param base base path. + * @return number of entries found. + * @throws IOException IO failure. + */ + protected long treewalkFilesystem( + final CsvFile csv, + final Path base) throws IOException { + ArrayDeque queue = new ArrayDeque<>(); + queue.add(base); + long count = 0; + while (!queue.isEmpty()) { + Path path = queue.pop(); + count++; + FileStatus[] fileStatuses; + try { + fileStatuses = getFilesystem().listStatus(path); + } catch (FileNotFoundException e) { + LOG.warn("File {} was not found", path); + continue; + } + // entries + for (FileStatus fileStatus : fileStatuses) { + csv.entry((S3AFileStatus) fileStatus); + } + // scan through the list, building up a reverse list of all directories + // found. + List dirs = new ArrayList<>(fileStatuses.length); + for (FileStatus fileStatus : fileStatuses) { + if (fileStatus.isDirectory() + && !(fileStatus.getPath().equals(path))) { + // directory: add to the end of the queue. + dirs.add(fileStatus.getPath()); + } else { + // file: just increment the count + count++; + } + // now push the dirs list in reverse + // so that they have been added in the sort order as returned. + pushAll(queue, dirs); + } + } + return count; + } + + /** + * Dump the filesystem via a recursive listStatus call. + * @param csv destination. + * @return number of entries found. + * @throws IOException IO failure. + */ + protected long listStatusFilesystem( + final CsvFile csv, + final Path path) throws IOException { + long count = 0; + RemoteIterator iterator = getFilesystem() + .listFilesAndEmptyDirectories(path, true); + while (iterator.hasNext()) { + S3ALocatedFileStatus status = iterator.next(); + csv.entry(status.toS3AFileStatus()); + } + return count; + } + + /** + * Dump the raw S3 Object Store. + * @param csv destination. 
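A small worked example of the pushAll() ordering used by the treewalk above: because the list is reversed before being pushed onto the deque, entries pop back off in their original listing order. The paths a, b and c below are hypothetical.

    // illustrative only: pushAll(queue, list) preserves the list order on pop.
    Deque<Path> queue = new ArrayDeque<>();
    List<Path> dirs = Arrays.asList(a, b, c);  // a, b, c: hypothetical Paths
    for (Path p : Lists.reverse(dirs)) {
      queue.push(p);                           // pushes c, then b, then a
    }
    // queue.pop() now yields a, then b, then c.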
+ * @return number of entries found. + * @throws IOException IO failure. + */ + protected long dumpRawS3ObjectStore( + final CsvFile csv) throws IOException { + S3AFileSystem fs = getFilesystem(); + Path rootPath = fs.qualify(new Path("/")); + Listing listing = new Listing(fs); + S3ListRequest request = fs.createListObjectsRequest("", null); + long count = 0; + RemoteIterator st = + listing.createFileStatusListingIterator(rootPath, request, + ACCEPT_ALL, + new Listing.AcceptAllButSelfAndS3nDirs(rootPath)); + while (st.hasNext()) { + count++; + S3AFileStatus next = st.next(); + LOG.debug("[{}] {}", count, next); + csv.entry(next); + } + LOG.info("entry count: {}", count); + return count; + } + + /** + * list children under the metastore from a base path, through + * a recursive query + walk strategy. + * @param csv dest + * @param basePath base path + * @throws IOException failure. + */ + protected void dumpMetastore(final CsvFile csv, + final Path basePath) throws IOException { + dumpStoreEntries(csv, getStore().listChildren(basePath)); + } + + /** + * Recursive Store Dump. + * @param csv open CSV file. + * @param dir directory listing + * @return (directories, files) + * @throws IOException failure + */ + private Pair dumpStoreEntries( + CsvFile csv, + DirListingMetadata dir) throws IOException { + ArrayDeque queue = new ArrayDeque<>(); + queue.add(dir); + long files = 0, dirs = 1; + while (!queue.isEmpty()) { + DirListingMetadata next = queue.pop(); + List childDirs = new ArrayList<>(); + Collection listing = next.getListing(); + // sort by name + List sorted = new ArrayList<>(listing); + sorted.sort(new PathOrderComparators.PathMetadataComparator( + (l, r) -> l.compareTo(r))); + + for (PathMetadata pmd : sorted) { + DDBPathMetadata ddbMd = (DDBPathMetadata) pmd; + dumpEntry(csv, ddbMd); + if (ddbMd.getFileStatus().isDirectory()) { + childDirs.add(ddbMd); + } else { + files++; + } + } + List childMD = new ArrayList<>(childDirs.size()); + for (DDBPathMetadata childDir : childDirs) { + childMD.add(getStore().listChildren( + childDir.getFileStatus().getPath())); + } + pushAll(queue, childMD); + } + + return Pair.of(dirs, files); + } + + + /** + * Dump a single entry, and log it. + * @param csv CSV output file. + * @param md metadata to log. + */ + private void dumpEntry(CsvFile csv, DDBPathMetadata md) { + LOG.debug("{}", md.prettyPrint()); + csv.entry(md); + } + + /** + * Scan the metastore for all entries and dump them. + * There's no attempt to sort the output. + * @param csv file + * @return tuple of (live entries, tombstones). + */ + private Pair scanMetastore(CsvFile csv) { + S3GuardTableAccess tableAccess = new S3GuardTableAccess(getStore()); + ExpressionSpecBuilder builder = new ExpressionSpecBuilder(); + Iterable results = tableAccess.scanMetadata( + builder); + long live = 0; + long tombstone = 0; + for (DDBPathMetadata md : results) { + if (!(md instanceof S3GuardTableAccess.VersionMarker)) { + // print it + csv.entry(md); + if (md.isDeleted()) { + tombstone++; + } else { + live++; + } + + } + } + return Pair.of(live, tombstone); + } + + public Pair getScanEntryResult() { + return scanEntryResult; + } + + public Pair getSecondScanResult() { + return secondScanResult; + } + + public long getRawObjectStoreCount() { + return rawObjectStoreCount; + } + + public long getListStatusCount() { + return listStatusCount; + } + + public long getTreewalkCount() { + return treewalkCount; + } + + /** + * Convert a timestamp in milliseconds to a human string. 
+ * @param millis epoch time in millis + * @return a string for the CSV file. + */ + private static String stringify(long millis) { + return new Date(millis).toString(); + } + + /** + * This is the JVM entry point for the service launcher. + * + * Converts the arguments to a list, then invokes + * {@link #serviceMain(List, AbstractS3GuardDynamoDBDiagnostic)}. + * @param args command line arguments. + */ + public static void main(String[] args) { + try { + serviceMain(Arrays.asList(args), new DumpS3GuardDynamoTable()); + } catch (ExitUtil.ExitException e) { + ExitUtil.terminate(e); + } + } + + /** + * The real main function, which takes the arguments as a list. + * Argument 0 MUST be the service classname + * @param argsList the list of arguments + * @param service service to launch. + */ + static void serviceMain( + final List argsList, + final AbstractS3GuardDynamoDBDiagnostic service) { + ServiceLauncher serviceLauncher = + new ServiceLauncher<>(service.getName()); + + ExitUtil.ExitException ex = serviceLauncher.launchService( + new Configuration(), + service, + argsList, + false, + true); + if (ex != null) { + throw ex; + } + } + + /** + * Entry point to dump the metastore and s3 store world views + *
<p>
+ * Both the FS and the store will be dumped: the store is scanned + * before and after the sequence to show what changes were made to + * the store during the list operation. + * @param fs fs to dump. If null a store must be provided. + * @param store store to dump (fallback to FS) + * @param conf configuration to use (fallback to fs) + * @param destFile base name of the output files. + * @param uri URI of store -only needed if FS is null. + * @throws ExitUtil.ExitException failure. + * @return the store + */ + public static DumpS3GuardDynamoTable dumpStore( + @Nullable final S3AFileSystem fs, + @Nullable DynamoDBMetadataStore store, + @Nullable Configuration conf, + final File destFile, + @Nullable URI uri) throws ExitUtil.ExitException { + ServiceLauncher serviceLauncher = + new ServiceLauncher<>(NAME); + + if (conf == null) { + conf = checkNotNull(fs, "No filesystem").getConf(); + } + if (store == null) { + store = (DynamoDBMetadataStore) checkNotNull(fs, "No filesystem") + .getMetadataStore(); + } + DumpS3GuardDynamoTable dump = new DumpS3GuardDynamoTable(fs, + store, + destFile, + uri); + ExitUtil.ExitException ex = serviceLauncher.launchService( + conf, + dump, + Collections.emptyList(), + false, + true); + if (ex != null && ex.getExitCode() != 0) { + throw ex; + } + LOG.info("Results:"); + Pair r = dump.getScanEntryResult(); + LOG.info("Metastore entries: {}", r); + LOG.info("Metastore scan total {}, entries {}, tombstones {}", + r.getLeft() + r.getRight(), + r.getLeft(), + r.getRight()); + LOG.info("S3 count {}", dump.getRawObjectStoreCount()); + LOG.info("Treewalk Count {}", dump.getTreewalkCount()); + LOG.info("List Status Count {}", dump.getListStatusCount()); + r = dump.getSecondScanResult(); + if (r != null) { + LOG.info("Second metastore scan total {}, entries {}, tombstones {}", + r.getLeft() + r.getRight(), + r.getLeft(), + r.getRight()); + } + return dump; + } + + /** + * Writer for generating test CSV files. + * + * Quotes are manged by passing in a long whose specific bits control + * whether or not a row is quoted, bit 0 for column 0, etc. + * + * There is no escaping of values here. + */ + private static final class CsvFile implements Closeable { + + + /** constant to quote all columns. */ + public static final long ALL_QUOTES = 0x7fffffff; + + /** least significant bit is used for first column; 1 mean 'quote'. */ + public static final int ROW_QUOTE_MAP = 0b1110_1001_1111; + + /** quote nothing: {@value}. */ + public static final long NO_QUOTES = 0; + + private final Path path; + + private final PrintWriter out; + + private final String separator; + + private final String eol; + + private final String quote; + + /** + * Create. + * @param path filesystem path. + * @param out output write. + * @param separator separator of entries. + * @param eol EOL marker. + * @param quote quote marker. + * @throws IOException failure. + */ + private CsvFile( + final Path path, + final PrintWriter out, + final String separator, + final String eol, + final String quote) throws IOException { + this.separator = checkNotNull(separator); + this.eol = checkNotNull(eol); + this.quote = checkNotNull(quote); + this.path = path; + this.out = checkNotNull(out); + header(); + } + + /** + * Create to a file, with UTF-8 output and the standard + * options of the TSV file. + * @param file destination file. + * @throws IOException failure. 
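Because the quoting convention above (one bit per column, bit 0 for column 0) is easy to misread, here is a hedged worked decoding of ROW_QUOTE_MAP (0b1110_1001_1111) against the column order used by header() and entry() further down; the grouping is a reading aid, not code from the patch.

    // bits 0-4  = 1 : "type", "deleted", "path", "is_auth_dir",
    //                 "is_empty_dir"                        -> quoted
    // bits 5-6  = 0 : "len", "updated" (numeric)            -> unquoted
    // bit  7    = 1 : "updated_s" (date string)             -> quoted
    // bit  8    = 0 : "last_modified" (numeric)             -> unquoted
    // bits 9-11 = 1 : "last_modified_s", "etag", "version"  -> quoted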
+ */ + private CsvFile(File file) throws IOException { + this(null, + new PrintWriter(file, "UTF-8"), "\t", "\n", "\""); + } + + /** + * Close the file, if not already done. + * @throws IOException on a failure. + */ + @Override + public synchronized void close() throws IOException { + if (out != null) { + out.close(); + } + } + + public Path getPath() { + return path; + } + + public String getSeparator() { + return separator; + } + + public String getEol() { + return eol; + } + + /** + * Write a row. + * Entries are quoted if the bit for that column is true. + * @param quotes quote policy: every bit defines the rule for that element + * @param columns columns to write + * @return self for ease of chaining. + */ + public CsvFile row(long quotes, Object... columns) { + checkNotNull(out); + for (int i = 0; i < columns.length; i++) { + if (i != 0) { + out.write(separator); + } + boolean toQuote = (quotes & 1) == 1; + // unsigned right shift to make next column flag @ position 0 + quotes = quotes >>> 1; + if (toQuote) { + out.write(quote); + } + Object column = columns[i]; + out.write(column != null ? column.toString() : ""); + if (toQuote) { + out.write(quote); + } + } + out.write(eol); + return this; + } + + /** + * Write a line. + * @param line line to print + * @return self for ease of chaining. + */ + public CsvFile line(String line) { + out.write(line); + out.write(eol); + return this; + } + + /** + * Get the output stream. + * @return the stream. + */ + public PrintWriter getOut() { + return out; + } + + /** + * Print the header. + */ + void header() { + row(CsvFile.ALL_QUOTES, + "type", + "deleted", + "path", + "is_auth_dir", + "is_empty_dir", + "len", + "updated", + "updated_s", + "last_modified", + "last_modified_s", + "etag", + "version"); + } + + /** + * Add a metadata entry. + * @param md metadata. + */ + void entry(DDBPathMetadata md) { + S3AFileStatus fileStatus = md.getFileStatus(); + row(ROW_QUOTE_MAP, + fileStatus.isDirectory() ? "dir" : "file", + md.isDeleted(), + fileStatus.getPath().toString(), + md.isAuthoritativeDir(), + md.isEmptyDirectory().name(), + fileStatus.getLen(), + md.getLastUpdated(), + stringify(md.getLastUpdated()), + fileStatus.getModificationTime(), + stringify(fileStatus.getModificationTime()), + fileStatus.getETag(), + fileStatus.getVersionId()); + } + + /** + * filesystem entry: no metadata. + * @param fileStatus file status + */ + void entry(S3AFileStatus fileStatus) { + row(ROW_QUOTE_MAP, + fileStatus.isDirectory() ? 
"dir" : "file", + "false", + fileStatus.getPath().toString(), + "", + fileStatus.isEmptyDirectory().name(), + fileStatus.getLen(), + "", + "", + fileStatus.getModificationTime(), + stringify(fileStatus.getModificationTime()), + fileStatus.getETag(), + fileStatus.getVersionId()); + } + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java index 9f0631309fd4d..be5de96b504d5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java @@ -107,6 +107,7 @@ import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.BlockingThreadPoolExecutorService; +import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.ReflectionUtils; import static org.apache.hadoop.fs.s3a.Constants.*; @@ -217,6 +218,19 @@ public class DynamoDBMetadataStore implements MetadataStore, public static final Logger LOG = LoggerFactory.getLogger( DynamoDBMetadataStore.class); + /** + * Name of the operations log. + */ + public static final String OPERATIONS_LOG_NAME = + "org.apache.hadoop.fs.s3a.s3guard.Operations"; + + /** + * A log of all state changing operations to the store; + * only updated at debug level. + */ + public static final Logger OPERATIONS_LOG = LoggerFactory.getLogger( + OPERATIONS_LOG_NAME); + /** parent/child name to use in the version marker. */ public static final String VERSION_MARKER = "../VERSION"; @@ -528,14 +542,14 @@ private void initDataAccessRetries(Configuration config) { @Retries.RetryTranslated public void delete(Path path, ITtlTimeProvider ttlTimeProvider) throws IOException { - innerDelete(path, true, ttlTimeProvider); + innerDelete(path, true, ttlTimeProvider, null); } @Override @Retries.RetryTranslated public void forgetMetadata(Path path) throws IOException { LOG.debug("Forget metadata for {}", path); - innerDelete(path, false, null); + innerDelete(path, false, null, null); } /** @@ -546,11 +560,14 @@ public void forgetMetadata(Path path) throws IOException { * @param tombstone flag to create a tombstone marker * @param ttlTimeProvider The time provider to set last_updated. Must not * be null if tombstone is true. + * @param ancestorState ancestor state for logging * @throws IOException I/O error. */ @Retries.RetryTranslated - private void innerDelete(final Path path, boolean tombstone, - ITtlTimeProvider ttlTimeProvider) + private void innerDelete(final Path path, + final boolean tombstone, + final ITtlTimeProvider ttlTimeProvider, + final AncestorState ancestorState) throws IOException { checkPath(path); LOG.debug("Deleting from table {} in region {}: {}", @@ -577,7 +594,7 @@ private void innerDelete(final Path path, boolean tombstone, path.toString(), idempotent, () -> { - LOG.debug("Adding tombstone to {}", path); + logPut(ancestorState, item); recordsWritten(1); table.putItem(item); }); @@ -589,7 +606,7 @@ private void innerDelete(final Path path, boolean tombstone, idempotent, () -> { // record the attempt so even on retry the counter goes up. 
- LOG.debug("Delete key {}", path); + logDelete(ancestorState, key); recordsDeleted(1); table.deleteItem(key); }); @@ -605,28 +622,35 @@ public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider) tableName, region, path); final PathMetadata meta = get(path); - if (meta == null || meta.isDeleted()) { + if (meta == null) { LOG.debug("Subtree path {} does not exist; this will be a no-op", path); return; } + if (meta.isDeleted()) { + LOG.debug("Subtree path {} is deleted; this will be a no-op", path); + return; + } - // Execute via the bounded threadpool. - final List> futures = new ArrayList<>(); - for (DescendantsIterator desc = new DescendantsIterator(this, meta); - desc.hasNext();) { - final Path pathToDelete = desc.next().getPath(); - futures.add(submit(executor, () -> { - innerDelete(pathToDelete, true, ttlTimeProvider); - return null; - })); - if (futures.size() > S3GUARD_DDB_SUBMITTED_TASK_LIMIT) { - // first batch done; block for completion. - waitForCompletion(futures); - futures.clear(); + try(AncestorState state = new AncestorState(this, + BulkOperationState.OperationType.Delete, path)) { + // Execute via the bounded threadpool. + final List> futures = new ArrayList<>(); + for (DescendantsIterator desc = new DescendantsIterator(this, meta); + desc.hasNext();) { + final Path pathToDelete = desc.next().getPath(); + futures.add(submit(executor, () -> { + innerDelete(pathToDelete, true, ttlTimeProvider, state); + return null; + })); + if (futures.size() > S3GUARD_DDB_SUBMITTED_TASK_LIMIT) { + // first batch done; block for completion. + waitForCompletion(futures); + futures.clear(); + } } + // now wait for the final set. + waitForCompletion(futures); } - // now wait for the final set. - waitForCompletion(futures); } /** @@ -806,7 +830,8 @@ private Collection completeAncestry( final Collection pathsToCreate, final AncestorState ancestorState, final ITtlTimeProvider ttlTimeProvider) throws PathIOException { - List ancestorsToAdd = new ArrayList<>(0); + // Key on path to allow fast lookup + Map ancestry = new HashMap<>(); LOG.debug("Completing ancestry for {} paths", pathsToCreate.size()); // we sort the inputs to guarantee that the topmost entries come first. // that way if the put request contains both parents and children @@ -832,7 +857,7 @@ private Collection completeAncestry( if (!oldEntry.getFileStatus().isDirectory() || !entry.getFileStatus().isDirectory()) { // check for and warn if the existing bulk operation overwrote it. 
- // this should never occur outside tests explicitly crating it + // this should never occur outside tests explicitly creating it LOG.warn("Overwriting a S3Guard file created in the operation: {}", oldEntry); LOG.warn("With new entry: {}", entry); @@ -846,9 +871,9 @@ private Collection completeAncestry( path, entry); } } - ancestorsToAdd.add(entry); + ancestry.put(path, entry); Path parent = path.getParent(); - while (!parent.isRoot()) { + while (!parent.isRoot() && !ancestry.containsKey(parent)) { if (!ancestorState.findEntry(parent, true)) { // don't add this entry, but carry on with the parents LOG.debug("auto-create ancestor path {} for child path {}", @@ -857,12 +882,12 @@ private Collection completeAncestry( DDBPathMetadata md = new DDBPathMetadata(status, Tristate.FALSE, false, false, ttlTimeProvider.getNow()); ancestorState.put(parent, md); - ancestorsToAdd.add(md); + ancestry.put(parent, md); } parent = parent.getParent(); } } - return ancestorsToAdd; + return ancestry.values(); } /** @@ -936,7 +961,7 @@ public void addAncestors( entryFound = true; if (directory.getFileStatus().isFile()) { throw new PathIOException(parent.toString(), - "Cannot overwrite parent file: metadatstore is" + "Cannot overwrite parent file: metastore is" + " in an inconsistent state"); } // the directory exists. Add it to the ancestor state for next time. @@ -1029,7 +1054,8 @@ public void move( newItems.addAll(tombstones); } - processBatchWriteRequest(null, pathMetadataToItem(newItems)); + processBatchWriteRequest(ancestorState, + null, pathMetadataToItem(newItems)); } /** @@ -1039,13 +1065,17 @@ public void move( *

  • No attempt is made to sort the input: the caller must do that
  • * * As well as retrying on the operation invocation, incomplete - * batches are retried until all have been processed.. + * batches are retried until all have been processed. + * + * @param ancestorState ancestor state for logging * @param keysToDelete primary keys to be deleted; can be null * @param itemsToPut new items to be put; can be null * @return the number of iterations needed to complete the call. */ @Retries.RetryTranslated("Outstanding batch items are updated with backoff") - private int processBatchWriteRequest(PrimaryKey[] keysToDelete, + private int processBatchWriteRequest( + @Nullable AncestorState ancestorState, + PrimaryKey[] keysToDelete, Item[] itemsToPut) throws IOException { final int totalToDelete = (keysToDelete == null ? 0 : keysToDelete.length); final int totalToPut = (itemsToPut == null ? 0 : itemsToPut.length); @@ -1062,8 +1092,10 @@ private int processBatchWriteRequest(PrimaryKey[] keysToDelete, && count < totalToDelete) { numToDelete = Math.min(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT, totalToDelete - count); - writeItems.withPrimaryKeysToDelete( - Arrays.copyOfRange(keysToDelete, count, count + numToDelete)); + PrimaryKey[] toDelete = Arrays.copyOfRange(keysToDelete, + count, count + numToDelete); + LOG.debug("Deleting {} entries: {}", toDelete.length, toDelete); + writeItems.withPrimaryKeysToDelete(toDelete); count += numToDelete; } @@ -1106,9 +1138,12 @@ private int processBatchWriteRequest(PrimaryKey[] keysToDelete, } if (itemsToPut != null) { recordsWritten(itemsToPut.length); + logPut(ancestorState, itemsToPut); } if (keysToDelete != null) { recordsDeleted(keysToDelete.length); + logDelete(ancestorState, keysToDelete); + } return batches; } @@ -1227,7 +1262,7 @@ private void innerPut( } LOG.debug("Saving batch of {} items to table {}, region {}", items.length, tableName, region); - processBatchWriteRequest(null, items); + processBatchWriteRequest(ancestorState, null, items); } /** @@ -1290,7 +1325,7 @@ List fullPathsToPut(DDBPathMetadata meta, * @return true iff the item isn't null and, if there is an is_deleted * column, that its value is false. */ - private boolean itemExists(Item item) { + private static boolean itemExists(Item item) { if (item == null) { return false; } @@ -1309,7 +1344,8 @@ static S3AFileStatus makeDirStatus(Path f, String owner) { /** * {@inheritDoc}. * There is retry around building the list of paths to update, but - * the call to {@link #processBatchWriteRequest(PrimaryKey[], Item[])} + * the call to + * {@link #processBatchWriteRequest(DynamoDBMetadataStore.AncestorState, PrimaryKey[], Item[])} * is only tried once. * @param meta Directory listing metadata. * @param operationState operational state for a bulk update @@ -1320,15 +1356,17 @@ static S3AFileStatus makeDirStatus(Path f, String owner) { public void put( final DirListingMetadata meta, @Nullable final BulkOperationState operationState) throws IOException { - LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta); - + LOG.debug("Saving {} dir meta for {} to table {} in region {}: {}", + tableName, + meta.isAuthoritative() ? 
"auth" : "nonauth", + meta.getPath(), + tableName, region, meta); // directory path Path path = meta.getPath(); DDBPathMetadata ddbPathMeta = new DDBPathMetadata(makeDirStatus(path, username), meta.isEmpty(), false, meta.isAuthoritative(), meta.getLastUpdated()); - // put all its ancestors if not present; as an optimization we return at its - // first existent ancestor + // put all its ancestors if not present final AncestorState ancestorState = extractOrCreate(operationState, BulkOperationState.OperationType.Put); // First add any missing ancestors... @@ -1341,7 +1379,9 @@ public void put( // sort so highest-level entries are written to the store first. // if a sequence fails, no orphan entries will have been written. metasToPut.sort(PathOrderComparators.TOPMOST_PM_FIRST); - processBatchWriteRequest(null, pathMetadataToItem(metasToPut)); + processBatchWriteRequest(ancestorState, + null, + pathMetadataToItem(metasToPut)); // and add the ancestors synchronized (ancestorState) { metasToPut.forEach(ancestorState::put); @@ -1455,7 +1495,10 @@ public void prune(PruneMode pruneMode, long cutoff) throws IOException { @Retries.RetryTranslated public void prune(PruneMode pruneMode, long cutoff, String keyPrefix) throws IOException { - LOG.debug("Prune files under {} with age {}", keyPrefix, cutoff); + LOG.debug("Prune {} under {} with age {}", + pruneMode == PruneMode.ALL_BY_MODTIME + ? "files and tombstones" : "tombstones", + keyPrefix, cutoff); final ItemCollection items = expiredFiles(pruneMode, cutoff, keyPrefix); innerPrune(keyPrefix, items); @@ -1465,7 +1508,9 @@ private void innerPrune(String keyPrefix, ItemCollection items) throws IOException { int itemCount = 0; try (AncestorState state = initiateBulkWrite( - BulkOperationState.OperationType.Prune, null)) { + BulkOperationState.OperationType.Prune, null); + DurationInfo ignored = + new DurationInfo(LOG, "Pruning DynamoDB Store")) { ArrayList deletionBatch = new ArrayList<>(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT); long delay = conf.getTimeDuration( @@ -1478,12 +1523,19 @@ private void innerPrune(String keyPrefix, ItemCollection items) DDBPathMetadata md = PathMetadataDynamoDBTranslation .itemToPathMetadata(item, username); Path path = md.getFileStatus().getPath(); + boolean tombstone = md.isDeleted(); + LOG.debug("Prune entry {}", path); deletionBatch.add(path); - // add parent path of what we remove if it has not - // already been processed + // add parent path of item so it can be marked as non-auth. + // this is only done if + // * it has not already been processed + // * the entry pruned is not a tombstone (no need to update) + // * the file is not in the root dir Path parentPath = path.getParent(); - if (parentPath != null && !clearedParentPathSet.contains(parentPath)) { + if (!tombstone + && parentPath != null + && !clearedParentPathSet.contains(parentPath)) { parentPathSet.add(parentPath); } @@ -1491,7 +1543,7 @@ private void innerPrune(String keyPrefix, ItemCollection items) if (deletionBatch.size() == S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT) { // lowest path entries get deleted first. 
deletionBatch.sort(PathOrderComparators.TOPMOST_PATH_LAST); - processBatchWriteRequest(pathToKey(deletionBatch), null); + processBatchWriteRequest(state, pathToKey(deletionBatch), null); // set authoritative false for each pruned dir listing removeAuthoritativeDirFlag(parentPathSet, state); @@ -1507,7 +1559,7 @@ private void innerPrune(String keyPrefix, ItemCollection items) } // final batch of deletes if (!deletionBatch.isEmpty()) { - processBatchWriteRequest(pathToKey(deletionBatch), null); + processBatchWriteRequest(state, pathToKey(deletionBatch), null); // set authoritative false for each pruned dir listing removeAuthoritativeDirFlag(parentPathSet, state); @@ -1527,6 +1579,20 @@ private void innerPrune(String keyPrefix, ItemCollection items) /** * Remove the Authoritative Directory Marker from a set of paths, if * those paths are in the store. + *

+ * This operation is only for pruning; it does not raise an error
+ * if, during the prune phase, the table appears inconsistent.
+ * This is not unusual as it can happen in a number of ways:
+ * <ol>
+ *   <li>The state of the table changes during a slow prune operation which
+ *   deliberately inserts pauses to avoid overloading prepaid IO capacity.
+ *   </li>
+ *   <li>Tombstone markers have been left in the table after many other
+ *   operations have taken place, including deleting/replacing
+ *   parents.</li>
+ * </ol>
+ * <p>
    + * * If an exception is raised in the get/update process, then the exception * is caught and only rethrown after all the other paths are processed. * This is to ensure a best-effort attempt to update the store. @@ -1548,10 +1614,22 @@ private void removeAuthoritativeDirFlag( return null; } DDBPathMetadata ddbPathMetadata = get(path); - if(ddbPathMetadata == null) { + if (ddbPathMetadata == null) { + // there is no entry. + LOG.debug("No parent {}; skipping", path); + return null; + } + if (ddbPathMetadata.isDeleted()) { + // the parent itself is deleted + LOG.debug("Parent has been deleted {}; skipping", path); + return null; + } + if (!ddbPathMetadata.getFileStatus().isDirectory()) { + // the parent itself is deleted + LOG.debug("Parent is not a directory {}; skipping", path); return null; } - LOG.debug("Setting false isAuthoritativeDir on {}", ddbPathMetadata); + LOG.debug("Setting isAuthoritativeDir==false on {}", ddbPathMetadata); ddbPathMetadata.setAuthoritativeDir(false); return ddbPathMetadata; } catch (IOException e) { @@ -2232,14 +2310,14 @@ public RenameTracker initiateRenameOperation( final S3AFileStatus sourceStatus, final Path dest) { return new ProgressiveRenameTracker(storeContext, this, source, dest, - new AncestorState(BulkOperationState.OperationType.Rename, dest)); + new AncestorState(this, BulkOperationState.OperationType.Rename, dest)); } @Override public AncestorState initiateBulkWrite( final BulkOperationState.OperationType operation, final Path dest) { - return new AncestorState(operation, dest); + return new AncestorState(this, operation, dest); } /** @@ -2253,6 +2331,14 @@ private ITtlTimeProvider extractTimeProvider( return ttlTimeProvider != null ? ttlTimeProvider : timeProvider; } + /** + * Username. + * @return the current username + */ + String getUsername() { + return username; + } + /** * Take an {@code IllegalArgumentException} raised by a DDB operation * and if it contains an inner SDK exception, unwrap it. @@ -2295,19 +2381,84 @@ static IOException translateTableWaitFailure( } } + /** + * Log a PUT into the operations log at debug level. + * @param state optional ancestor state. + * @param items items which have been PUT + */ + private static void logPut( + @Nullable AncestorState state, + Item[] items) { + if (OPERATIONS_LOG.isDebugEnabled()) { + // log the operations + String stateStr = AncestorState.stateAsString(state); + for (Item item : items) { + boolean tombstone = itemExists(item); + OPERATIONS_LOG.debug("{} {} {}", + stateStr, + tombstone ? "TOMBSTONE" : "PUT", + itemPrimaryKeyToString(item)); + } + } + } + + /** + * Log a PUT into the operations log at debug level. + * @param state optional ancestor state. + * @param item item PUT. + */ + private static void logPut( + @Nullable AncestorState state, + Item item) { + if (OPERATIONS_LOG.isDebugEnabled()) { + // log the operations + logPut(state, new Item[]{item}); + } + } + + /** + * Log a DELETE into the operations log at debug level. + * @param state optional ancestor state. + * @param keysDeleted keys which were deleted. + */ + private static void logDelete( + @Nullable AncestorState state, + PrimaryKey[] keysDeleted) { + if (OPERATIONS_LOG.isDebugEnabled()) { + // log the operations + String stateStr = AncestorState.stateAsString(state); + for (PrimaryKey key : keysDeleted) { + OPERATIONS_LOG.debug("{} DELETE {}", + stateStr, primaryKeyToString(key)); + } + } + } + + /** + * Log a DELETE into the operations log at debug level. + * @param state optional ancestor state. 
+ * @param key Deleted key + */ + private static void logDelete( + @Nullable AncestorState state, + PrimaryKey key) { + if (OPERATIONS_LOG.isDebugEnabled()) { + logDelete(state, new PrimaryKey[]{key}); + } + } + /** * Get the move state passed in; create a new one if needed. * @param state state. * @param operation the type of the operation to use if the state is created. * @return the cast or created state. */ - @VisibleForTesting - static AncestorState extractOrCreate(@Nullable BulkOperationState state, + private AncestorState extractOrCreate(@Nullable BulkOperationState state, BulkOperationState.OperationType operation) { if (state != null) { return (AncestorState) state; } else { - return new AncestorState(operation, null); + return new AncestorState(this, operation, null); } } @@ -2320,18 +2471,42 @@ static AncestorState extractOrCreate(@Nullable BulkOperationState state, @VisibleForTesting static final class AncestorState extends BulkOperationState { + /** + * Counter of IDs issued. + */ + private static final AtomicLong ID_COUNTER = new AtomicLong(0); + + /** Owning store. */ + private final DynamoDBMetadataStore store; + + /** The ID of the state; for logging. */ + private final long id; + + /** + * Map of ancestors. + */ private final Map ancestry = new HashMap<>(); + /** + * Destination path. + */ private final Path dest; /** * Create the state. + * @param store the store, for use in validation. + * If null: no validation (test only operation) * @param operation the type of the operation. * @param dest destination path. */ - AncestorState(final OperationType operation, @Nullable final Path dest) { + AncestorState( + @Nullable final DynamoDBMetadataStore store, + final OperationType operation, + @Nullable final Path dest) { super(operation); + this.store = store; this.dest = dest; + this.id = ID_COUNTER.addAndGet(1); } int size() { @@ -2342,11 +2517,16 @@ public Path getDest() { return dest; } + long getId() { + return id; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder( "AncestorState{"); sb.append("operation=").append(getOperation()); + sb.append("id=").append(id); sb.append("; dest=").append(dest); sb.append("; size=").append(size()); sb.append("; paths={") @@ -2362,7 +2542,7 @@ public String toString() { * @return true if the state has an entry */ boolean contains(Path p) { - return ancestry.containsKey(p); + return get(p) != null; } DDBPathMetadata put(Path p, DDBPathMetadata md) { @@ -2406,5 +2586,74 @@ boolean findEntry( return false; } } + + /** + * If debug logging is enabled, this does an audit of the store state. + * it only logs this; the error messages are created so as they could + * be turned into exception messages. 
+ * Audit failures aren't being turned into IOEs is that + * rename operations delete the source entry and that ends up in the + * ancestor state as present + * @throws IOException failure + */ + @Override + public void close() throws IOException { + if (LOG.isDebugEnabled() && store != null) { + LOG.debug("Auditing {}", stateAsString(this)); + for (Map.Entry entry : ancestry + .entrySet()) { + Path path = entry.getKey(); + DDBPathMetadata expected = entry.getValue(); + if (expected.isDeleted()) { + // file was deleted in bulk op; we don't care about it + // any more + continue; + } + DDBPathMetadata actual; + try { + actual = store.get(path); + } catch (IOException e) { + LOG.debug("Retrieving {}", path, e); + // this is for debug; don't be ambitious + return; + } + if (actual == null || actual.isDeleted()) { + String message = "Metastore entry for path " + + path + " deleted during bulk " + + getOperation() + " operation"; + LOG.debug(message); + } else { + if (actual.getFileStatus().isDirectory() != + expected.getFileStatus().isDirectory()) { + // the type of the entry has changed + String message = "Metastore entry for path " + + path + " changed during bulk " + + getOperation() + " operation" + + " from " + expected + + " to " + actual; + LOG.debug(message); + } + } + + } + } + } + + /** + * Create a string from the state including operation and ID. + * @param state state to use -may be null + * @return a string for logging. + */ + private static String stateAsString(@Nullable AncestorState state) { + String stateStr; + if (state != null) { + stateStr = String.format("#(%s-%04d)", + state.getOperation(), + state.getId()); + } else { + stateStr = "#()"; + } + return stateStr; + } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java index 397d23aa34323..3767edcbced39 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java @@ -301,7 +301,7 @@ void put(DirListingMetadata meta, * * * - * @param pruneMode + * @param pruneMode Prune Mode * @param cutoff Oldest time to allow (UTC) * @throws IOException if there is an error * @throws UnsupportedOperationException if not implemented @@ -313,7 +313,7 @@ void prune(PruneMode pruneMode, long cutoff) throws IOException, * Same as {@link MetadataStore#prune(PruneMode, long)}, but with an * additional keyPrefix parameter to filter the pruned keys with a prefix. 
* - * @param pruneMode + * @param pruneMode Prune Mode * @param cutoff Oldest time to allow (UTC) * @param keyPrefix The prefix for the keys that should be removed * @throws IOException if there is an error diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java index 7d4980a06fc29..348dfbfced448 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java @@ -293,11 +293,12 @@ static KeyAttribute pathToParentKeyAttribute(Path path) { @VisibleForTesting public static String pathToParentKey(Path path) { Preconditions.checkNotNull(path); - Preconditions.checkArgument(path.isUriPathAbsolute(), "Path not absolute"); + Preconditions.checkArgument(path.isUriPathAbsolute(), + "Path not absolute: '%s'", path); URI uri = path.toUri(); String bucket = uri.getHost(); Preconditions.checkArgument(!StringUtils.isEmpty(bucket), - "Path missing bucket"); + "Path missing bucket %s", path); String pKey = "/" + bucket + uri.getPath(); // Strip trailing slash @@ -363,4 +364,38 @@ static List pathMetaToDDBPathMeta( .collect(Collectors.toList()); } + /** + * Convert an item's (parent, child) key to a string value + * for logging. There is no validation of the item. + * @param item item. + * @return an s3a:// prefixed string. + */ + static String itemPrimaryKeyToString(Item item) { + String parent = item.getString(PARENT); + String child = item.getString(CHILD); + return "s3a://" + parent + "/" + child; + } + /** + * Convert an item's (parent, child) key to a string value + * for logging. There is no validation of the item. + * @param item item. + * @return an s3a:// prefixed string. + */ + static String primaryKeyToString(PrimaryKey item) { + Collection c = item.getComponents(); + String parent = ""; + String child = ""; + for (KeyAttribute attr : c) { + switch (attr.getName()) { + case PARENT: + parent = attr.getValue().toString(); + break; + case CHILD: + child = attr.getValue().toString(); + break; + default: + } + } + return "s3a://" + parent + "/" + child; + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathOrderComparators.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathOrderComparators.java index a3a7967caf79b..cbf41b4bab099 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathOrderComparators.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathOrderComparators.java @@ -115,12 +115,12 @@ public int compare(final Path pathL, final Path pathR) { /** * Compare on path status. 
*/ - private static final class PathMetadataComparator implements + static final class PathMetadataComparator implements Comparator, Serializable { private final Comparator inner; - private PathMetadataComparator(final Comparator inner) { + PathMetadataComparator(final Comparator inner) { this.inner = inner; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PurgeS3GuardDynamoTable.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PurgeS3GuardDynamoTable.java new file mode 100644 index 0000000000000..244779abb939b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PurgeS3GuardDynamoTable.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import javax.annotation.Nullable; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import com.amazonaws.services.dynamodbv2.xspec.ExpressionSpecBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.service.launcher.LauncherExitCodes; +import org.apache.hadoop.service.launcher.ServiceLaunchException; +import org.apache.hadoop.service.launcher.ServiceLauncher; +import org.apache.hadoop.util.DurationInfo; +import org.apache.hadoop.util.ExitUtil; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.hadoop.fs.s3a.s3guard.DumpS3GuardDynamoTable.serviceMain; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.PARENT; + +/** + * Purge the S3Guard table of a FileSystem from all entries related to + * that table. + * Will fail if there is no table, or the store is in auth mode. + *

+ * <pre>
+ *   hadoop org.apache.hadoop.fs.s3a.s3guard.PurgeS3GuardDynamoTable \
+ *   -force s3a://example-bucket/
+ * </pre>
    + * + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class PurgeS3GuardDynamoTable + extends AbstractS3GuardDynamoDBDiagnostic { + + private static final Logger LOG = + LoggerFactory.getLogger(PurgeS3GuardDynamoTable.class); + + public static final String NAME = "PurgeS3GuardDynamoTable"; + + /** + * Name of the force option. + */ + public static final String FORCE = "-force"; + + /** + * Usage message. + */ + private static final String USAGE_MESSAGE = NAME + + " [-force] "; + + /** + * Flag which actually triggers the delete. + */ + private boolean force; + + private long filesFound; + private long filesDeleted; + + public PurgeS3GuardDynamoTable(final String name) { + super(name); + } + + public PurgeS3GuardDynamoTable() { + this(NAME); + } + + public PurgeS3GuardDynamoTable( + final S3AFileSystem filesystem, + final DynamoDBMetadataStore store, + final URI uri, + final boolean force) { + super(NAME, filesystem, store, uri); + this.force = force; + } + + /** + * Bind to the argument list, including validating the CLI. + * @throws Exception failure. + */ + @Override + protected void serviceStart() throws Exception { + if (getStore() == null) { + List arg = getArgumentList(1, 2, USAGE_MESSAGE); + String fsURI = arg.get(0); + if (arg.size() == 2) { + if (!arg.get(0).equals(FORCE)) { + throw new ServiceLaunchException(LauncherExitCodes.EXIT_USAGE, + USAGE_MESSAGE); + } + force = true; + fsURI = arg.get(1); + } + bindFromCLI(fsURI); + } + } + + /** + * Extract the host from the FS URI, then scan and + * delete all entries from that bucket. + * @return the exit code. + * @throws ServiceLaunchException on failure. + */ + @Override + public int execute() throws ServiceLaunchException { + + URI uri = getUri(); + String host = uri.getHost(); + String prefix = "/" + host + "/"; + DynamoDBMetadataStore ddbms = getStore(); + S3GuardTableAccess tableAccess = new S3GuardTableAccess(ddbms); + ExpressionSpecBuilder builder = new ExpressionSpecBuilder(); + builder.withKeyCondition( + ExpressionSpecBuilder.S(PARENT).beginsWith(prefix)); + + LOG.info("Scanning for entries with prefix {} to delete from {}", + prefix, ddbms); + + Iterable entries = tableAccess.scanMetadata(builder); + List list = new ArrayList<>(); + entries.iterator().forEachRemaining(e -> { + if (!(e instanceof S3GuardTableAccess.VersionMarker)) { + Path p = e.getFileStatus().getPath(); + String type = e.getFileStatus().isFile() ? "file" : "directory"; + boolean tombstone = e.isDeleted(); + if (tombstone) { + type = "tombstone " + type; + } + LOG.info("{} {}", type, p); + list.add(p); + } + }); + int count = list.size(); + filesFound = count; + LOG.info("Found {} entries{}", + count, + (count == 0 ? " -nothing to purge": "")); + if (count > 0) { + if (force) { + DurationInfo duration = + new DurationInfo(LOG, + "deleting %s entries from %s", + count, ddbms.toString()); + tableAccess.delete(list); + duration.close(); + long durationMillis = duration.value(); + long timePerEntry = durationMillis / count; + LOG.info("Time per entry: {} ms", timePerEntry); + filesDeleted = count; + } else { + LOG.info("Delete process will only be executed when " + + FORCE + " is set"); + } + } + return LauncherExitCodes.EXIT_SUCCESS; + } + + /** + * This is the Main entry point for the service launcher. + * + * Converts the arguments to a list, instantiates a instance of the class + * then executes it. + * @param args command line arguments. 
+ */ + public static void main(String[] args) { + try { + serviceMain(Arrays.asList(args), new PurgeS3GuardDynamoTable()); + } catch (ExitUtil.ExitException e) { + ExitUtil.terminate(e); + } + } + + /** + * API Entry point to dump the metastore and S3 store world views + *

    + * Both the FS and the store will be dumped: the store is scanned + * before and after the sequence to show what changes were made to + * the store during the list operation. + * @param fs fs to dump. If null a store must be provided. + * @param store store to dump (fallback to FS) + * @param conf configuration to use (fallback to fs) + * @param uri URI of store -only needed if FS is null. + * @param force force the actual delete + * @return (filesFound, filesDeleted) + * @throws ExitUtil.ExitException failure. + */ + @InterfaceAudience.Private + @InterfaceStability.Unstable + public static Pair purgeStore( + @Nullable final S3AFileSystem fs, + @Nullable DynamoDBMetadataStore store, + @Nullable Configuration conf, + @Nullable URI uri, + boolean force) throws ExitUtil.ExitException { + ServiceLauncher serviceLauncher = + new ServiceLauncher<>(NAME); + + if (conf == null) { + conf = checkNotNull(fs, "No filesystem").getConf(); + } + if (store == null) { + store = (DynamoDBMetadataStore) checkNotNull(fs, "No filesystem") + .getMetadataStore(); + } + PurgeS3GuardDynamoTable purge = new PurgeS3GuardDynamoTable(fs, + store, + uri, + force); + ExitUtil.ExitException ex = serviceLauncher.launchService( + conf, + purge, + Collections.emptyList(), + false, + true); + if (ex != null && ex.getExitCode() != 0) { + throw ex; + } + return Pair.of(purge.filesFound, purge.filesDeleted); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/RenameTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/RenameTracker.java index 76e269e1e3f6e..19d4568d06db9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/RenameTracker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/RenameTracker.java @@ -94,7 +94,7 @@ public abstract class RenameTracker extends AbstractStoreOperation { * Constructor. * @param name tracker name for logs. * @param storeContext store context. - * @param metadataStore the stopre + * @param metadataStore the store * @param sourceRoot source path. * @param dest destination path. * @param operationState ongoing move state. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java new file mode 100644 index 0000000000000..5592faafe3ebd --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import java.util.Collection; +import java.util.Iterator; + +import com.amazonaws.services.dynamodbv2.document.Item; +import com.amazonaws.services.dynamodbv2.document.ItemCollection; +import com.amazonaws.services.dynamodbv2.document.QueryOutcome; +import com.amazonaws.services.dynamodbv2.document.ScanOutcome; +import com.amazonaws.services.dynamodbv2.document.Table; +import com.amazonaws.services.dynamodbv2.document.internal.IteratorSupport; +import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec; +import com.amazonaws.services.dynamodbv2.xspec.ExpressionSpecBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileStatus; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore.VERSION_MARKER; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.CHILD; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.PARENT; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.TABLE_VERSION; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.itemToPathMetadata; + +/** + * Package-scoped accessor to table state in S3Guard. + * This is for maintenance, diagnostics and testing: it is not to + * be used otherwise. + *

+ * <ol>
+ *   <li>
+ *   Some of the operations here may dramatically alter the state of
+ *   a table, so use carefully.
+ *   </li>
+ *   <li>
+ *   Operations to assess consistency of a store are best executed
+ *   against a table which is otherwise inactive.
+ *   </li>
+ *   <li>
+ *   No retry/throttling or AWS to IOE logic here.
+ *   </li>
+ *   <li>
+ *   If a scan or query includes the version marker in the result, it
+ *   is converted to a {@link VersionMarker} instance.
+ *   </li>
+ * </ol>
    + * + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +class S3GuardTableAccess { + + private static final Logger LOG = + LoggerFactory.getLogger(S3GuardTableAccess.class); + + /** + * Store instance to work with. + */ + private final DynamoDBMetadataStore store; + + /** + * Table; retrieved from the store. + */ + private final Table table; + + /** + * Construct. + * @param store store to work with. + */ + S3GuardTableAccess(final DynamoDBMetadataStore store) { + this.store = checkNotNull(store); + this.table = checkNotNull(store.getTable()); + } + + /** + * Username of user in store. + * @return a string. + */ + private String getUsername() { + return store.getUsername(); + } + + /** + * Execute a query. + * @param spec query spec. + * @return the outcome. + */ + ItemCollection query(QuerySpec spec) { + return table.query(spec); + } + + /** + * Issue a query where the result is to be an iterator over + * the entries + * of DDBPathMetadata instances. + * @param spec query spec. + * @return an iterator over path entries. + */ + Iterable queryMetadata(QuerySpec spec) { + return new DDBPathMetadataCollection<>(query(spec)); + } + + ItemCollection scan(ExpressionSpecBuilder spec) { + return table.scan(spec.buildForScan()); + } + + Iterable scanMetadata(ExpressionSpecBuilder spec) { + return new DDBPathMetadataCollection<>(scan(spec)); + } + + void delete(Collection paths) { + paths.stream() + .map(PathMetadataDynamoDBTranslation::pathToKey) + .forEach(table::deleteItem); + } + + /** + * A collection which wraps the result of a query or scan. + * Important: iterate through this only once; the outcome + * of repeating an iteration is "undefined" + * @param type of outcome. + */ + private final class DDBPathMetadataCollection + implements Iterable { + + /** + * Query/scan result. + */ + private final ItemCollection outcome; + + /** + * Instantiate. + * @param outcome query/scan outcome. + */ + private DDBPathMetadataCollection(final ItemCollection outcome) { + this.outcome = outcome; + } + + /** + * Get the iterator. + * @return the iterator. + */ + @Override + public Iterator iterator() { + return new DDBPathMetadataIterator<>(outcome.iterator()); + } + + } + + /** + * An iterator which converts the iterated-over result of + * a query or scan into a {@code DDBPathMetadataIterator} entry. + * @param type of source. + */ + private final class DDBPathMetadataIterator implements + Iterator { + + /** + * Iterator to invoke. + */ + private final IteratorSupport it; + + /** + * Instantiate. + * @param it Iterator to invoke. + */ + private DDBPathMetadataIterator(final IteratorSupport it) { + this.it = it; + } + + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public DDBPathMetadata next() { + Item item = it.next(); + Pair key = primaryKey(item); + if (VERSION_MARKER.equals(key.getLeft()) && + VERSION_MARKER.equals(key.getRight())) { + // a version marker is found, return the special type + return new VersionMarker(item); + } else { + return itemToPathMetadata(item, getUsername()); + } + } + + } + + /** + * DDBPathMetadata subclass returned when a query returns + * the version marker. + * There is a FileStatus returned where the owner field contains + * the table version; the path is always the unqualified path "/VERSION". + * Because it is unqualified, operations which treat this as a normal + * DDB metadata entry usually fail. + */ + static final class VersionMarker extends DDBPathMetadata { + + /** + * Instantiate. 
+ * @param versionMarker the version marker. + */ + VersionMarker(Item versionMarker) { + super(new S3AFileStatus(true, new Path("/VERSION"), + "" + versionMarker.getString(TABLE_VERSION))); + } + } + + /** + * Given an item, split it to the parent and child fields. + * @param item item to split. + * @return (parent, child). + */ + private static Pair primaryKey(Item item) { + return Pair.of(item.getString(PARENT), item.getString(CHILD)); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java index c9fd6731a0f40..08beae2fdec10 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java @@ -755,13 +755,7 @@ private long importDir(FileStatus status) throws IOException { located.getOwner()); dirCache.add(child.getPath()); } else { - child = new S3AFileStatus(located.getLen(), - located.getModificationTime(), - located.getPath(), - located.getBlockSize(), - located.getOwner(), - located.getETag(), - located.getVersionId()); + child = located.toS3AFileStatus(); } putParentsIfNotPresent(child, operationState); S3Guard.putWithTtl(getStore(), @@ -1026,11 +1020,15 @@ static class Prune extends S3GuardTool { public static final String PURPOSE = "truncate older metadata from " + "repository " + DATA_IN_S3_IS_PRESERVED;; + + public static final String TOMBSTONE = "tombstone"; + private static final String USAGE = NAME + " [OPTIONS] [s3a://BUCKET]\n" + "\t" + PURPOSE + "\n\n" + "Common options:\n" + " -" + META_FLAG + " URL - Metadata repository details " + "(implementation-specific)\n" + + "[-" + TOMBSTONE + "]\n" + "Age options. Any combination of these integer-valued options:\n" + AGE_OPTIONS_USAGE + "\n" + "Amazon DynamoDB-specific options:\n" + @@ -1041,7 +1039,7 @@ static class Prune extends S3GuardTool { " is not supported."; Prune(Configuration conf) { - super(conf); + super(conf, TOMBSTONE); addAgeOptions(); } @@ -1098,8 +1096,13 @@ public int run(String[] args, PrintStream out) throws keyPrefix = PathMetadataDynamoDBTranslation.pathToParentKey(path); } + MetadataStore.PruneMode mode + = MetadataStore.PruneMode.ALL_BY_MODTIME; + if (getCommandFormat().getOpt(TOMBSTONE)) { + mode = MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED; + } try { - getStore().prune(MetadataStore.PruneMode.ALL_BY_MODTIME, divide, + getStore().prune(mode, divide, keyPrefix); } catch (UnsupportedOperationException e){ errorln("Prune operation not supported in metadata store."); diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md index c2f37483732a2..cb0fd139262df 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md @@ -796,7 +796,7 @@ time" is older than the specified age. ```bash hadoop s3guard prune [-days DAYS] [-hours HOURS] [-minutes MINUTES] - [-seconds SECONDS] [-m URI] ( -region REGION | s3a://BUCKET ) + [-seconds SECONDS] [-tombstone] [-meta URI] ( -region REGION | s3a://BUCKET ) ``` A time value of hours, minutes and/or seconds must be supplied. @@ -807,6 +807,13 @@ in the S3 Bucket. 1. If an S3A URI is supplied, only the entries in the table specified by the URI and older than a specific age are deleted. 
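To show what these command-line options map to, here is a minimal sketch of driving the same prune operation programmatically through the `MetadataStore` API used by `S3GuardTool.Prune` above. It assumes an already-initialized store and a qualified path; the class and method names (`PruneSketch`, `pruneTombstonesUnder`) are illustrative only, and the sketch sits in the `org.apache.hadoop.fs.s3a.s3guard` package so it can reuse the package-level `pathToParentKey` helper. The tombstone-only mode it selects is the one exposed by the `-tombstone` flag described below.

```java
package org.apache.hadoop.fs.s3a.s3guard;

import java.io.IOException;

import org.apache.hadoop.fs.Path;

/**
 * Illustration only: a rough programmatic equivalent of
 * "hadoop s3guard prune -tombstone ... s3a://bucket/path".
 */
public final class PruneSketch {

  private PruneSketch() {
  }

  /**
   * Prune tombstones last updated more than {@code ageMillis} ago
   * under the given path.
   * @param store an initialized metadata store, e.g. a DynamoDBMetadataStore
   * @param path qualified s3a:// path used to derive the key prefix
   * @param ageMillis age in milliseconds; older tombstones are removed
   */
  public static void pruneTombstonesUnder(
      MetadataStore store, Path path, long ageMillis) throws IOException {
    // same key-prefix derivation as S3GuardTool.Prune
    String keyPrefix = PathMetadataDynamoDBTranslation.pathToParentKey(path);
    // cutoff is the oldest time to allow
    long cutoff = System.currentTimeMillis() - ageMillis;
    // TOMBSTONES_BY_LASTUPDATED is what the new -tombstone flag selects;
    // PruneMode.ALL_BY_MODTIME would prune file entries as well.
    store.prune(MetadataStore.PruneMode.TOMBSTONES_BY_LASTUPDATED,
        cutoff, keyPrefix);
  }
}
```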
+ +The `-tombstone` option instructs the operation to only purge "tombstones", +markers of deleted files. These tombstone markers are only used briefly, +to indicate that a recently deleted file should not be found in listings. +As a result, there is no adverse consequences in regularly pruning old +tombstones. + Example ```bash @@ -817,18 +824,18 @@ Deletes all entries in the S3Guard table for files older than seven days from the table associated with `s3a://ireland-1`. ```bash -hadoop s3guard prune -days 7 s3a://ireland-1/path_prefix/ +hadoop s3guard prune -tombstone -days 7 s3a://ireland-1/path_prefix/ ``` -Deletes all entries in the S3Guard table for files older than seven days from -the table associated with `s3a://ireland-1` and with the prefix "path_prefix" +Deletes all entries in the S3Guard table for tombstones older than seven days from +the table associated with `s3a://ireland-1` and with the prefix `path_prefix` ```bash hadoop s3guard prune -hours 1 -minutes 30 -meta dynamodb://ireland-team -region eu-west-1 ``` -Delete all entries more than 90 minutes old from the table "ireland-team" in -the region "eu-west-1". +Delete all entries more than 90 minutes old from the table "`ireland-team"` in +the region `eu-west-1`. ### Tune the I/O capacity of the DynamoDB Table, `s3guard set-capacity` diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md index 7a591d99b60eb..c1ce08ac668fc 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md @@ -845,7 +845,7 @@ it can be manually done: hadoop s3guard uploads -abort -force s3a://test-bucket/ * If you don't need it, destroy the S3Guard DDB table. - hadoop s3guard destroy s3a://hwdev-steve-ireland-new/ + hadoop s3guard destroy s3a://test-bucket/ The S3Guard tests will automatically create the Dynamo DB table in runs with `-Ds3guard -Ddynamo` set; default capacity of these buckets @@ -881,7 +881,7 @@ using an absolute XInclude reference to it. ``` -# Failure Injection +## Failure Injection **Warning do not enable any type of failure injection in production. The following settings are for testing only.** @@ -1014,7 +1014,7 @@ The inconsistent client is shipped in the `hadoop-aws` JAR, so it can be used in applications which work with S3 to see how they handle inconsistent directory listings. -## Testing S3Guard +## Testing S3Guard [S3Guard](./s3guard.html) is an extension to S3A which adds consistent metadata listings to the S3A client. As it is part of S3A, it also needs to be tested. @@ -1052,7 +1052,7 @@ The basic strategy for testing S3Guard correctness consists of: No charges are incurred for using this store, and its consistency guarantees are that of the underlying object store instance. -## Testing S3A with S3Guard Enabled +### Testing S3A with S3Guard Enabled All the S3A tests which work with a private repository can be configured to run with S3Guard by using the `s3guard` profile. When set, this will run @@ -1084,13 +1084,14 @@ mvn -T 1C verify -Dtest=skip -Dit.test=ITestS3AMiscOperations -Ds3guard -Ddynamo 1. If the `s3guard` profile is not set, then the S3Guard properties are those of the test configuration set in `contract-test-options.xml` or `auth-keys.xml` -If the `s3guard` profile *is* set, +If the `s3guard` profile *is* set: 1. 
The S3Guard options from maven (the dynamo and authoritative flags) overwrite any previously set in the configuration files. 1. DynamoDB will be configured to create any missing tables. -1. When using DynamoDB and running ITestDynamoDBMetadataStore, the fs.s3a.s3guard.ddb.test.table -property should be configured, and the name of that table should be different - than what is used for fs.s3a.s3guard.ddb.table. The test table is destroyed +1. When using DynamoDB and running `ITestDynamoDBMetadataStore`, + the `fs.s3a.s3guard.ddb.test.table` +property MUST be configured, and the name of that table MUST be different + than what is used for `fs.s3a.s3guard.ddb.table`. The test table is destroyed and modified multiple times during the test. 1. Several of the tests create and destroy DynamoDB tables. The table names are prefixed with the value defined by @@ -1100,6 +1101,88 @@ property should be configured, and the name of that table should be different incurring AWS charges. +### How to Dump the Table and Metastore State + +There's an unstable entry point to list the contents of a table +and S3 filesystem ot a set of Tab Separated Value files: + +``` +hadoop org.apache.hadoop.fs.s3a.s3guard.DumpS3GuardDynamoTable s3a://bucket/ dir/out +``` + +This generates a set of files prefixed `dir/out-` with different views of the +world which can then be viewed on the command line or editor: + +``` +"type" "deleted" "path" "is_auth_dir" "is_empty_dir" "len" "updated" "updated_s" "last_modified" "last_modified_s" "etag" "version" +"file" "true" "s3a://bucket/fork-0001/test/ITestS3AContractDistCp/testDirectWrite/remote" "false" "UNKNOWN" 0 1562171244451 "Wed Jul 03 17:27:24 BST 2019" 1562171244451 "Wed Jul 03 17:27:24 BST 2019" "" "" +"file" "true" "s3a://bucket/Users/stevel/Projects/hadoop-trunk/hadoop-tools/hadoop-aws/target/test-dir/1/5xlPpalRwv/test/new/newdir/file1" "false" "UNKNOWN" 0 1562171518435 "Wed Jul 03 17:31:58 BST 2019" 1562171518435 "Wed Jul 03 17:31:58 BST 2019" "" "" +"file" "true" "s3a://bucket/Users/stevel/Projects/hadoop-trunk/hadoop-tools/hadoop-aws/target/test-dir/1/5xlPpalRwv/test/new/newdir/subdir" "false" "UNKNOWN" 0 1562171518535 "Wed Jul 03 17:31:58 BST 2019" 1562171518535 "Wed Jul 03 17:31:58 BST 2019" "" "" +"file" "true" "s3a://bucket/test/DELAY_LISTING_ME/testMRJob" "false" "UNKNOWN" 0 1562172036299 "Wed Jul 03 17:40:36 BST 2019" 1562172036299 "Wed Jul 03 17:40:36 BST 2019" "" "" +``` + +This is unstable: the output format may change without warning. +To understand the meaning of the fields, consult the documentation. 
+They are, currently: + +| field | meaning | source | +|-------|---------| -------| +| `type` | type | filestatus | +| `deleted` | tombstone marker | metadata | +| `path` | path of an entry | filestatus | +| `is_auth_dir` | directory entry authoritative status | metadata | +| `is_empty_dir` | does the entry represent an empty directory | metadata | +| `len` | file length | filestatus | +| `last_modified` | file status last modified | filestatus | +| `last_modified_s` | file status last modified as string | filestatus | +| `updated` | time (millis) metadata was updated | metadata | +| `updated_s` | updated time as a string | metadata | +| `etag` | any etag | filestatus | +| `version` | any version| filestatus | + +Files generated + +| suffix | content | +|---------------|---------| +| `-scan.csv` | Full scan/dump of the metastore | +| `-store.csv` | Recursive walk through the metastore | +| `-tree.csv` | Treewalk through filesystem `listStatus("/")` calls | +| `-flat.csv` | Flat listing through filesystem `listFiles("/", recursive)` | +| `-s3.csv` | Dump of the S3 Store *only* | +| `-scan-2.csv` | Scan of the store after the previous operations | + +Why the two scan entries? The S3A listing and treewalk operations +may add new entries to the metastore/DynamoDB table. + +Note 1: this is unstable; entry list and meaning may change, sorting of output, +the listing algorithm, representation of types, etc. It's expected +uses are: diagnostics, support calls and helping us developers +work out what we've just broken. + +Note 2: This *is* safe to use against an active store; the tables may appear +to be inconsistent due to changes taking place during the dump sequence. + +### Resetting the Metastore: `PurgeS3GuardDynamoTable` + +The `PurgeS3GuardDynamoTable` entry point +`org.apache.hadoop.fs.s3a.s3guard.PurgeS3GuardDynamoTable` can +list all entries in a store for a specific filesystem, and delete them. +It *only* deletes those entries in the store for that specific filesystem, +even if the store is shared. + +```bash +hadoop org.apache.hadoop.fs.s3a.s3guard.PurgeS3GuardDynamoTable \ + -force s3a://bucket/ +``` + +Without the `-force` option the table is scanned, but no entries deleted; +with it then all entries for that filesystem are deleted. +No attempt is made to order the deletion; while the operation is under way +the store is not fully connected (i.e. there may be entries whose parent has +already been deleted). 
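If you need to do this from a test or other JVM code rather than from the command line, the `purgeStore` entry point added in this patch can be invoked directly. A minimal sketch follows, assuming an S3A filesystem already bonded to a DynamoDB metastore; the wrapper class and method names (`PurgeSketch`, `purgeAll`) are illustrative, not part of the patch.

```java
import org.apache.commons.lang3.tuple.Pair;

import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.s3guard.PurgeS3GuardDynamoTable;
import org.apache.hadoop.util.ExitUtil;

/** Illustration only: purge every metastore entry of one filesystem. */
public final class PurgeSketch {

  private PurgeSketch() {
  }

  /**
   * Purge all entries belonging to the given filesystem from its
   * DynamoDB metastore.
   * @param fs S3A filesystem bonded to a DynamoDB metadata store
   * @return (entries found, entries deleted)
   * @throws ExitUtil.ExitException if the purge service fails
   */
  public static Pair<Long, Long> purgeAll(S3AFileSystem fs)
      throws ExitUtil.ExitException {
    // store, configuration and URI are all derived from the filesystem;
    // force=true performs the delete rather than only scanning and listing.
    return PurgeS3GuardDynamoTable.purgeStore(fs, null, null, null, true);
  }
}
```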
+ +Needless to say: *it is not safe to use this against a table in active use.* + ### Scale Testing MetadataStore Directly There are some scale tests that exercise Metadata Store implementations diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRootDir.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRootDir.java index 5c2e2cdf3677a..9ef3763bfc39a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRootDir.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRootDir.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest; import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,9 +57,24 @@ protected AbstractFSContract createContract(Configuration conf) { return new S3AContract(conf); } + @Override + public S3AFileSystem getFileSystem() { + return (S3AFileSystem) super.getFileSystem(); + } + + /** + * This is overridden to allow for eventual consistency on listings, + * but only if the store does not have S3Guard protecting it. + */ @Override public void testListEmptyRootDirectory() throws IOException { - for (int attempt = 1, maxAttempts = 10; attempt <= maxAttempts; ++attempt) { + int maxAttempts = 10; + if (getFileSystem().hasMetadataStore()) { + maxAttempts = 1; + } + describe("Listing root directory; for consistency allowing " + + maxAttempts + " attempts"); + for (int attempt = 1; attempt <= maxAttempts; ++attempt) { try { super.testListEmptyRootDirectory(); break; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java index c1942a944353b..b05bbb48bab02 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java @@ -49,8 +49,15 @@ protected AbstractFSContract createContract(Configuration conf) { return new S3AContract(conf); } + @Override + public void setup() throws Exception { + Thread.currentThread().setName("setup"); + super.setup(); + } + @Override public void teardown() throws Exception { + Thread.currentThread().setName("teardown"); super.teardown(); describe("closing file system"); IOUtils.closeStream(getFileSystem()); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java index fb6e3701d3227..c086597355f47 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java @@ -18,13 +18,18 @@ package org.apache.hadoop.fs.s3a; +import java.io.FileNotFoundException; +import java.io.IOException; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.s3guard.MetadataStore; import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore; import org.junit.Assume; import org.junit.Test; +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome; import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; +import static 
org.apache.hadoop.test.LambdaTestUtils.intercept; /** * Test logic around whether or not a directory is empty, with S3Guard enabled. @@ -37,6 +42,45 @@ */ public class ITestS3GuardEmptyDirs extends AbstractS3ATestBase { + @Test + public void testRenameEmptyDir() throws Throwable { + S3AFileSystem fs = getFileSystem(); + Path basePath = path(getMethodName()); + Path sourceDir = new Path(basePath, "AAA-source"); + String sourceDirMarker = fs.pathToKey(sourceDir) + "/"; + Path destDir = new Path(basePath, "BBB-dest"); + String destDirMarker = fs.pathToKey(destDir) + "/"; + // set things up. + mkdirs(sourceDir); + // there'a source directory marker + fs.getObjectMetadata(sourceDirMarker); + S3AFileStatus srcStatus = getEmptyDirStatus(sourceDir); + assertEquals("Must be an empty dir: " + srcStatus, Tristate.TRUE, + srcStatus.isEmptyDirectory()); + // do the rename + assertRenameOutcome(fs, sourceDir, destDir, true); + S3AFileStatus destStatus = getEmptyDirStatus(destDir); + assertEquals("Must be an empty dir: " + destStatus, Tristate.TRUE, + destStatus.isEmptyDirectory()); + // source does not exist. + intercept(FileNotFoundException.class, + () -> getEmptyDirStatus(sourceDir)); + // and verify that there's no dir marker hidden under a tombstone + intercept(FileNotFoundException.class, + () -> Invoker.once("HEAD", sourceDirMarker, + () -> fs.getObjectMetadata(sourceDirMarker))); + // the parent dir mustn't be confused + S3AFileStatus baseStatus = getEmptyDirStatus(basePath); + assertEquals("Must not be an empty dir: " + baseStatus, Tristate.FALSE, + baseStatus.isEmptyDirectory()); + // and verify the dest dir has a marker + fs.getObjectMetadata(destDirMarker); + } + + private S3AFileStatus getEmptyDirStatus(Path dir) throws IOException { + return getFileSystem().innerGetFileStatus(dir, true); + } + @Test public void testEmptyDirs() throws Exception { S3AFileSystem fs = getFileSystem(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java index 10ebacdbed815..1503fef98ff90 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.contract.s3a.S3AContract; import com.google.common.collect.Lists; +import org.assertj.core.api.Assertions; import org.junit.Assume; import org.junit.Test; @@ -67,6 +68,14 @@ public void setup() throws Exception { invoker = new Invoker(new S3ARetryPolicy(getConfiguration()), Invoker.NO_OP ); + Assume.assumeTrue("No metadata store in test filesystem", + getFileSystem().hasMetadataStore()); + } + + @Override + public void teardown() throws Exception { + clearInconsistency(getFileSystem()); + super.teardown(); } @Override @@ -95,7 +104,7 @@ protected AbstractFSContract createContract(Configuration conf) { private void doTestRenameSequence(Path[] mkdirs, Path[] srcdirs, Path[] dstdirs, Path[] yesdirs, Path[] nodirs) throws Exception { S3AFileSystem fs = getFileSystem(); - Assume.assumeTrue(fs.hasMetadataStore()); + if (mkdirs != null) { for (Path mkdir : mkdirs) { @@ -104,8 +113,8 @@ private void doTestRenameSequence(Path[] mkdirs, Path[] srcdirs, clearInconsistency(fs); } - assertTrue("srcdirs and dstdirs must have equal length", - srcdirs.length == dstdirs.length); + assertEquals("srcdirs and dstdirs must have equal length", + 
srcdirs.length, dstdirs.length); for (int i = 0; i < srcdirs.length; i++) { assertTrue("Rename returned false: " + srcdirs[i] + " -> " + dstdirs[i], fs.rename(srcdirs[i], dstdirs[i])); @@ -119,6 +128,21 @@ private void doTestRenameSequence(Path[] mkdirs, Path[] srcdirs, } } + /** + * Delete an array of paths; log exceptions. + * @param paths paths to delete + */ + private void deletePathsQuietly(Path...paths) { + for (Path dir : paths) { + try { + getFileSystem().delete(dir, true); + } catch (IOException e) { + LOG.info("Failed to delete {}: {}", dir, e.toString()); + LOG.debug("Delete failure:, e"); + } + } + } + /** * Tests that after renaming a directory, the original directory and its * contents are indeed missing and the corresponding new paths are visible. @@ -126,19 +150,23 @@ private void doTestRenameSequence(Path[] mkdirs, Path[] srcdirs, */ @Test public void testConsistentListAfterRename() throws Exception { - Path[] mkdirs = { - path("d1/f"), - path("d1/f" + DEFAULT_DELAY_KEY_SUBSTRING) - }; - Path[] srcdirs = {path("d1")}; - Path[] dstdirs = {path("d2")}; - Path[] yesdirs = {path("d2"), path("d2/f"), - path("d2/f" + DEFAULT_DELAY_KEY_SUBSTRING)}; - Path[] nodirs = {path("d1"), path("d1/f"), - path("d1/f" + DEFAULT_DELAY_KEY_SUBSTRING)}; - doTestRenameSequence(mkdirs, srcdirs, dstdirs, yesdirs, nodirs); - getFileSystem().delete(path("d1"), true); - getFileSystem().delete(path("d2"), true); + Path d1f = path("d1/f"); + Path d1f2 = path("d1/f" + DEFAULT_DELAY_KEY_SUBSTRING); + Path[] mkdirs = {d1f, d1f2}; + Path d1 = path("d1"); + Path[] srcdirs = {d1}; + Path d2 = path("d2"); + Path[] dstdirs = {d2}; + Path d2f2 = path("d2/f" + DEFAULT_DELAY_KEY_SUBSTRING); + Path[] yesdirs = {d2, path("d2/f"), d2f2}; + Path[] nodirs = { + d1, d1f, d1f2}; + try { + doTestRenameSequence(mkdirs, srcdirs, dstdirs, yesdirs, nodirs); + } finally { + clearInconsistency(getFileSystem()); + deletePathsQuietly(d1, d2, d1f, d1f2, d2f2); + } } /** @@ -157,18 +185,23 @@ public void testRollingRenames() throws Exception { Path[] setB = {dir2[0], dir1[0]}; Path[] setC = {dir0[0], dir2[0]}; - for(int i = 0; i < 2; i++) { - Path[] firstSet = i == 0 ? setA : null; - doTestRenameSequence(firstSet, setA, setB, setB, dir0); - doTestRenameSequence(null, setB, setC, setC, dir1); - doTestRenameSequence(null, setC, setA, setA, dir2); - } + try { + for(int i = 0; i < 2; i++) { + Path[] firstSet = i == 0 ? setA : null; + doTestRenameSequence(firstSet, setA, setB, setB, dir0); + doTestRenameSequence(null, setB, setC, setC, dir1); + doTestRenameSequence(null, setC, setA, setA, dir2); + } - S3AFileSystem fs = getFileSystem(); - assertFalse("Renaming deleted file should have failed", - fs.rename(dir2[0], dir1[0])); - assertTrue("Renaming over existing file should have succeeded", - fs.rename(dir1[0], dir0[0])); + S3AFileSystem fs = getFileSystem(); + assertFalse("Renaming deleted file should have failed", + fs.rename(dir2[0], dir1[0])); + assertTrue("Renaming over existing file should have succeeded", + fs.rename(dir1[0], dir0[0])); + } finally { + clearInconsistency(getFileSystem()); + deletePathsQuietly(dir0[0], dir1[0], dir2[0]); + } } /** @@ -179,9 +212,6 @@ public void testRollingRenames() throws Exception { @Test public void testConsistentListAfterDelete() throws Exception { S3AFileSystem fs = getFileSystem(); - // test will fail if NullMetadataStore (the default) is configured: skip it. 
- Assume.assumeTrue("FS needs to have a metadatastore.", - fs.hasMetadataStore()); // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed // in listObjects() results via InconsistentS3Client @@ -223,8 +253,6 @@ public void testConsistentListAfterDelete() throws Exception { @Test public void testConsistentRenameAfterDelete() throws Exception { S3AFileSystem fs = getFileSystem(); - // test will fail if NullMetadataStore (the default) is configured: skip it. - Assume.assumeTrue(fs.hasMetadataStore()); // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed // in listObjects() results via InconsistentS3Client @@ -266,10 +294,6 @@ public void testConsistentListStatusAfterPut() throws Exception { S3AFileSystem fs = getFileSystem(); - // This test will fail if NullMetadataStore (the default) is configured: - // skip it. - Assume.assumeTrue(fs.hasMetadataStore()); - // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed // in listObjects() results via InconsistentS3Client Path inconsistentPath = @@ -301,9 +325,6 @@ public void testConsistentListStatusAfterPut() throws Exception { @Test public void testConsistentListLocatedStatusAfterPut() throws Exception { final S3AFileSystem fs = getFileSystem(); - // This test will fail if NullMetadataStore (the default) is configured: - // skip it. - Assume.assumeTrue(fs.hasMetadataStore()); String rootDir = "doTestConsistentListLocatedStatusAfterPut"; fs.mkdirs(path(rootDir)); @@ -368,9 +389,6 @@ private void doTestConsistentListLocatedStatusAfterPut(S3AFileSystem fs, @Test public void testConsistentListFiles() throws Exception { final S3AFileSystem fs = getFileSystem(); - // This test will fail if NullMetadataStore (the default) is configured: - // skip it. - Assume.assumeTrue(fs.hasMetadataStore()); final int[] numOfPaths = {0, 2}; for (int dirNum : numOfPaths) { @@ -480,7 +498,6 @@ private static void verifyFileIsListed(Collection listedFiles, @Test public void testCommitByRenameOperations() throws Throwable { S3AFileSystem fs = getFileSystem(); - Assume.assumeTrue(fs.hasMetadataStore()); Path work = path("test-commit-by-rename-" + DEFAULT_DELAY_KEY_SUBSTRING); Path task00 = new Path(work, "task00"); fs.mkdirs(task00); @@ -564,10 +581,10 @@ public void testInconsistentS3ClientDeletes() throws Throwable { @Test public void testListingReturnsVersionMetadata() throws Throwable { S3AFileSystem fs = getFileSystem(); - Assume.assumeTrue(fs.hasMetadataStore()); // write simple file - Path file = path("file1"); + Path parent = path(getMethodName()); + Path file = new Path(parent, "file1"); try (FSDataOutputStream outputStream = fs.create(file)) { outputStream.writeChars("hello"); } @@ -577,22 +594,26 @@ public void testListingReturnsVersionMetadata() throws Throwable { assertEquals(1, fileStatuses.length); S3AFileStatus status = (S3AFileStatus) fileStatuses[0]; String eTag = status.getETag(); + assertNotNull("Etag in " + eTag, eTag); String versionId = status.getVersionId(); // get status through recursive directory listing RemoteIterator filesIterator = fs.listFiles( - file.getParent(), true); + parent, true); List files = Lists.newArrayList(); while (filesIterator.hasNext()) { files.add(filesIterator.next()); } - assertEquals(1, files.size()); + Assertions.assertThat(files) + .hasSize(1); // ensure eTag and versionId are preserved in directory listing S3ALocatedFileStatus locatedFileStatus = (S3ALocatedFileStatus) files.get(0); - assertEquals(eTag, locatedFileStatus.getETag()); - assertEquals(versionId, 
locatedFileStatus.getVersionId()); + assertEquals("etag of " + locatedFileStatus, + eTag, locatedFileStatus.getETag()); + assertEquals("versionID of " + locatedFileStatus, + versionId, locatedFileStatus.getVersionId()); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java index c5e0265b964bd..ea3fd84ffdafe 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.S3AFileSystem; @@ -51,6 +52,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.DurationInfo; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID; /** @@ -61,6 +63,13 @@ public abstract class AbstractITCommitMRJob extends AbstractYarnClusterITest { private static final Logger LOG = LoggerFactory.getLogger(AbstractITCommitMRJob.class); + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + disableFilesystemCaching(conf); + return conf; + } + @Rule public final TemporaryFolder temp = new TemporaryFolder(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java index 942f0b6658077..93b48e772dd87 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java @@ -48,8 +48,6 @@ import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.hadoop.fs.s3a.s3guard.MetadataStore; -import org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation; import org.apache.hadoop.util.BlockingThreadPoolExecutorService; import org.apache.hadoop.util.DurationInfo; @@ -286,23 +284,6 @@ public void teardown() throws Exception { super.teardown(); } - /** - * Directory cleanup includes pruning everything under the path. - * This ensures that any in the tree from failed tests don't fill up - * the store with many, many, deleted entries. - * @throws IOException failure. - */ - @Override - protected void deleteTestDirInTeardown() throws IOException { - super.deleteTestDirInTeardown(); - Path path = getContract().getTestPath(); - try { - prune(path); - } catch (IOException e) { - LOG.warn("When pruning the test directory {}", path, e); - } - } - private void assumeRoleTests() { assume("No ARN for role tests", !getAssumedRoleARN().isEmpty()); } @@ -692,10 +673,6 @@ public void testPartialDirDelete() throws Throwable { Assertions.assertThat(readOnlyListing) .as("ReadOnly directory " + directoryList) .containsAll(readOnlyFiles); - - // do this prune in the test as well as teardown, so that the test - // reporting includes it in the runtime of a successful run. 
- prune(basePath); } /** @@ -753,25 +730,6 @@ private void pathMustExist(Path p) { eval(() -> assertPathExists("Missing path", p)); } - /** - * Prune the store for everything under the test path. - * @param path path. - * @throws IOException on failure. - */ - private void prune(Path path) throws IOException { - S3AFileSystem fs = getFileSystem(); - if (fs.hasMetadataStore()) { - MetadataStore store = fs.getMetadataStore(); - try (DurationInfo ignored = - new DurationInfo(LOG, true, "prune %s", path)) { - store.prune( - MetadataStore.PruneMode.ALL_BY_MODTIME, - System.currentTimeMillis(), - PathMetadataDynamoDBTranslation.pathToParentKey(fs.qualify(path))); - } - } - } - /** * List all files under a path. * @param path path to list diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java index eaaa50c1c0f3a..b918500ef98d6 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java @@ -105,7 +105,7 @@ protected static void expectResult(int expected, * @return the output of any successful run * @throws Exception failure */ - protected static String expectSuccess( + public static String expectSuccess( String message, S3GuardTool tool, String... args) throws Exception { @@ -322,6 +322,19 @@ public void testPruneCommandCLI() throws Exception { testPath.toString()); } + @Test + public void testPruneCommandTombstones() throws Exception { + Path testPath = path("testPruneCommandTombstones"); + getFileSystem().mkdirs(testPath); + getFileSystem().delete(testPath, true); + S3GuardTool.Prune cmd = new S3GuardTool.Prune(getFileSystem().getConf()); + cmd.setMetadataStore(ms); + exec(cmd, + "prune", "-" + S3GuardTool.Prune.TOMBSTONE, + "-seconds", "0", + testPath.toString()); + } + @Test public void testPruneCommandConf() throws Exception { getConfiguration().setLong(Constants.S3GUARD_CLI_PRUNE_AGE, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java index 03ebe1ee76a5b..68d39dcdb6dde 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java @@ -18,9 +18,14 @@ package org.apache.hadoop.fs.s3a.s3guard; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStreamReader; import java.net.URI; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -41,6 +46,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.contract.s3a.S3AContract; import org.apache.hadoop.fs.s3a.Constants; @@ -94,6 +100,8 @@ */ public class ITestDynamoDBMetadataStore extends MetadataStoreTestBase { + public static final int MINUTE = 60_000; + public ITestDynamoDBMetadataStore() { super(); } @@ -281,22 +289,6 @@ private void 
deleteAllMetadata() throws IOException { public static void deleteMetadataUnderPath(final DynamoDBMetadataStore ms, final Path path, final boolean suppressErrors) throws IOException { ThrottleTracker throttleTracker = new ThrottleTracker(ms); - try (DurationInfo ignored = new DurationInfo(LOG, true, "prune")) { - ms.prune(PruneMode.ALL_BY_MODTIME, - System.currentTimeMillis(), - PathMetadataDynamoDBTranslation.pathToParentKey(path)); - LOG.info("Throttle statistics: {}", throttleTracker); - } catch (FileNotFoundException fnfe) { - // there is no table. - return; - } catch (IOException ioe) { - // prune failed. warn and then fall back to forget. - LOG.warn("Failed to prune {}", path, ioe); - if (!suppressErrors) { - throw ioe; - } - } - // and after the pruning, make sure all other metadata is gone int forgotten = 0; try (DurationInfo ignored = new DurationInfo(LOG, true, "forget")) { PathMetadata meta = ms.get(path); @@ -921,43 +913,6 @@ private DDBPathMetadata verifyInAncestor(AncestorState state, return md; } - @Test - public void testProvisionTable() throws Exception { - final String tableName - = getTestTableName("testProvisionTable-" + UUID.randomUUID()); - final Configuration conf = getTableCreationConfig(); - conf.set(S3GUARD_DDB_TABLE_NAME_KEY, tableName); - conf.setInt(S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY, 2); - conf.setInt(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY, 2); - DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore(); - try { - ddbms.initialize(conf); - DynamoDB dynamoDB = ddbms.getDynamoDB(); - final DDBCapacities oldProvision = DDBCapacities.extractCapacities( - dynamoDB.getTable(tableName).describe().getProvisionedThroughput()); - Assume.assumeFalse("Table is on-demand", oldProvision.isOnDemandTable()); - long desiredReadCapacity = oldProvision.getRead() - 1; - long desiredWriteCapacity = oldProvision.getWrite() - 1; - ddbms.provisionTable(desiredReadCapacity, - desiredWriteCapacity); - ddbms.initTable(); - // we have to wait until the provisioning settings are applied, - // so until the table is ACTIVE again and not in UPDATING - ddbms.getTable().waitForActive(); - final DDBCapacities newProvision = DDBCapacities.extractCapacities( - dynamoDB.getTable(tableName).describe().getProvisionedThroughput()); - assertEquals("Check newly provisioned table read capacity units.", - desiredReadCapacity, - newProvision.getRead()); - assertEquals("Check newly provisioned table write capacity units.", - desiredWriteCapacity, - newProvision.getWrite()); - } finally { - ddbms.destroy(); - ddbms.close(); - } - } - @Test public void testDeleteTable() throws Exception { final String tableName = getTestTableName("testDeleteTable"); @@ -1107,7 +1062,7 @@ public void testPruneAgainstInvalidTable() throws Throwable { describe("Create an Invalid listing and prune it"); DynamoDBMetadataStore ms = ITestDynamoDBMetadataStore.ddbmsStatic; - String base = "/testPruneAgainstInvalidTable"; + String base = "/" + getMethodName(); String subdir = base + "/subdir"; Path subDirPath = strToPath(subdir); createNewDirs(base, subdir); @@ -1125,13 +1080,9 @@ public void testPruneAgainstInvalidTable() throws Throwable { // over the subdirectory long now = getTime(); - long oldTime = now - 60_000; + long oldTime = now - MINUTE; putFile(subdir, oldTime, null); - final DDBPathMetadata subDirAsFile = ms.get(subDirPath); - - Assertions.assertThat(subDirAsFile.getFileStatus().isFile()) - .describedAs("Subdirectory entry %s is now file", subDirMetadataOrig) - .isTrue(); + getFile(subdir); Path basePath = 
strToPath(base); DirListingMetadata listing = ms.listChildren(basePath); @@ -1147,13 +1098,13 @@ public void testPruneAgainstInvalidTable() throws Throwable { Assertions.assertThat(status.isFile()) .as("Entry %s", (Object)pm) .isTrue(); - DDBPathMetadata subFilePm = checkNotNull(ms.get(subFilePath)); - LOG.info("Pruning"); + getNonNull(subFile); + LOG.info("Pruning"); // now prune ms.prune(PruneMode.ALL_BY_MODTIME, - now + 60_000, subdir); - DDBPathMetadata prunedFile = ms.get(subFilePath); + now + MINUTE, subdir); + ms.get(subFilePath); final PathMetadata subDirMetadataFinal = getNonNull(subdir); @@ -1165,8 +1116,8 @@ @Test public void testPutFileDirectlyUnderTombstone() throws Throwable { - describe("Put a file under a tombstone"); - String base = "/testPutFileDirectlyUnderTombstone"; + describe("Put a file under a tombstone; verify the tombstone"); + String base = "/" + getMethodName(); long now = getTime(); putTombstone(base, now, null); PathMetadata baseMeta1 = get(base); @@ -1175,35 +1126,114 @@ .isTrue(); String child = base + "/file"; putFile(child, now, null); - PathMetadata baseMeta2 = get(base); - Assertions.assertThat(baseMeta2.isDeleted()) - .as("Metadata %s", baseMeta2) - .isFalse(); + getDirectory(base); + } + + @Test + public void testPruneTombstoneUnderTombstone() throws Throwable { + describe("Put a tombstone under a tombstone, prune the pair"); + String base = "/" + getMethodName(); + long now = getTime(); + String dir = base + "/dir"; + putTombstone(dir, now, null); + assertIsTombstone(dir); + // parent dir is created + assertCached(base); + String child = dir + "/file"; + String child2 = dir + "/file2"; + + // this will actually mark the parent as a dir, + // so that lists of that dir will pick up the tombstone + putTombstone(child, now, null); + getDirectory(dir); + // tombstone the dir + putTombstone(dir, now, null); + // add another child entry; this will update the dir entry from being + // tombstone to dir + putFile(child2, now, null); + getDirectory(dir); + + // put a tombstone over the directory again + putTombstone(dir, now, null); + // verify + assertIsTombstone(dir); + + // prune all tombstones + getDynamoMetadataStore().prune(PruneMode.TOMBSTONES_BY_LASTUPDATED, + now + MINUTE); + + // the child is gone + assertNotFound(child); + + // *AND* the parent dir has not been created + assertNotFound(dir); + + // the child2 entry is still there, though it's now an orphan (the store isn't + // meeting the rule "all entries must have a parent which exists") + getFile(child2); + // a full prune will still find and delete it, as this + // doesn't walk the tree + getDynamoMetadataStore().prune(PruneMode.ALL_BY_MODTIME, + now + MINUTE); + assertNotFound(child2); + assertNotFound(dir); } + @Test + public void testPruneFileUnderTombstone() throws Throwable { + describe("Put a file under a tombstone, prune the pair"); + String base = "/" + getMethodName(); + long now = getTime(); + String dir = base + "/dir"; + putTombstone(dir, now, null); + assertIsTombstone(dir); + // parent dir is created + assertCached(base); + String child = dir + "/file"; + + // this will actually mark the parent as a dir, + // so that lists of that dir will pick up the tombstone + putFile(child, now, null); + // dir is reinstated + getDirectory(dir); + + // put a tombstone + putTombstone(dir, now, null); + // prune all entries + 
getDynamoMetadataStore().prune(PruneMode.ALL_BY_MODTIME, + now + MINUTE); + // the child is gone + assertNotFound(child); + + // *AND* the parent dir has not been created + assertNotFound(dir); + } + + /** + * Keep in sync with code changes in S3AFileSystem.finishedWrite() so that + * the production code can be tested here. + */ @Test public void testPutFileDeepUnderTombstone() throws Throwable { describe("Put a file two levels under a tombstone"); - String base = "/testPutFileDeepUnderTombstone"; - String subdir = base + "/subdir"; + String base = "/" + getMethodName(); + String dir = base + "/dir"; long now = getTime(); // creating a file MUST create its parents - String child = subdir + "/file"; + String child = dir + "/file"; Path childPath = strToPath(child); putFile(child, now, null); getFile(child); - getDirectory(subdir); + getDirectory(dir); getDirectory(base); // now put the tombstone putTombstone(base, now, null); - PathMetadata baseMeta1 = getNonNull(base); - Assertions.assertThat(baseMeta1.isDeleted()) - .as("Metadata %s", baseMeta1) - .isTrue(); - - // this is the same ordering as S3FileSystem.finishedWrite() + assertIsTombstone(base); + /*- --------------------------------------------*/ + /* Begin S3FileSystem.finishedWrite() sequence. */ + /* ---------------------------------------------*/ AncestorState ancestorState = getDynamoMetadataStore() .initiateBulkWrite(BulkOperationState.OperationType.Put, childPath); @@ -1213,8 +1243,103 @@ public void testPutFileDeepUnderTombstone() throws Throwable { ancestorState); // now write the file again. putFile(child, now, ancestorState); + /* -------------------------------------------*/ + /* End S3FileSystem.finishedWrite() sequence. */ + /* -------------------------------------------*/ + + getFile(child); // the ancestor will now exist. + getDirectory(dir); getDirectory(base); } + @Test + public void testDumpTable() throws Throwable { + describe("Dump the table contents, but not the S3 Store"); + String target = System.getProperty("test.build.dir", "target"); + File buildDir = new File(target).getAbsoluteFile(); + String name = "ITestDynamoDBMetadataStore"; + File destFile = new File(buildDir, name); + DumpS3GuardDynamoTable.dumpStore( + null, + ddbmsStatic, + getFileSystem().getConf(), + destFile, + fsUri); + File storeFile = new File(buildDir, name + DumpS3GuardDynamoTable.SCAN_CSV); + try (BufferedReader in = new BufferedReader(new InputStreamReader( + new FileInputStream(storeFile), Charset.forName("UTF-8")))) { + for (String line : org.apache.commons.io.IOUtils.readLines(in)) { + LOG.info(line); + } + } + } + + @Test + public void testPurgeTableNoForce() throws Throwable { + describe("Purge the table"); + + putTombstone("/" + getMethodName(), getTime(), null); + Pair r = PurgeS3GuardDynamoTable.purgeStore( + null, + ddbmsStatic, + getFileSystem().getConf(), + fsUri, + false); + + Assertions.assertThat(r.getLeft()). + describedAs("entries found in %s", r) + .isGreaterThanOrEqualTo(1); + Assertions.assertThat(r.getRight()). + describedAs("entries deleted in %s", r) + .isZero(); + } + + @Test + public void testPurgeTableForce() throws Throwable { + describe("Purge the table -force"); + putTombstone("/" + getMethodName(), getTime(), null); + Pair r = PurgeS3GuardDynamoTable.purgeStore( + null, + ddbmsStatic, + getFileSystem().getConf(), + fsUri, + true); + Assertions.assertThat(r.getLeft()). + describedAs("entries found in %s", r) + .isGreaterThanOrEqualTo(1); + Assertions.assertThat(r.getRight()). 
+ describedAs("entries deleted in %s", r) + .isEqualTo(r.getLeft()); + + // second iteration will have zero entries + + r = PurgeS3GuardDynamoTable.purgeStore( + null, + ddbmsStatic, + getFileSystem().getConf(), + fsUri, + true); + Assertions.assertThat(r.getLeft()). + describedAs("entries found in %s", r) + .isZero(); + Assertions.assertThat(r.getRight()). + describedAs("entries deleted in %s", r) + .isZero(); + } + + /** + * Assert that an entry exists and is a directory. + * @param pathStr path + * @throws IOException IO failure. + */ + protected DDBPathMetadata verifyAuthDirStatus(String pathStr, + boolean authDirFlag) + throws IOException { + DDBPathMetadata md = (DDBPathMetadata) getDirectory(pathStr); + assertEquals("isAuthoritativeDir() mismatch in " + md, + authDirFlag, + md.isAuthoritativeDir()); + return md; + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java index 72a4bb468c6bd..a39afa4c60b53 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java @@ -32,6 +32,7 @@ import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.dynamodbv2.document.Table; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription; +import com.amazonaws.services.dynamodbv2.xspec.ExpressionSpecBuilder; import org.junit.Assume; import org.junit.FixMethodOrder; import org.junit.Test; @@ -59,6 +60,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.s3guard.MetadataStoreTestBase.basicFileStatus; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.PARENT; import static org.junit.Assume.*; /** @@ -172,6 +174,21 @@ public void setup() throws Exception { @Override public void teardown() throws Exception { + if (ddbms != null) { + S3GuardTableAccess tableAccess = new S3GuardTableAccess(ddbms); + ExpressionSpecBuilder builder = new ExpressionSpecBuilder(); + builder.withCondition( + ExpressionSpecBuilder.S(PARENT).beginsWith("/test/")); + + Iterable entries = tableAccess.scanMetadata(builder); + List list = new ArrayList<>(); + entries.iterator().forEachRemaining(e -> { + Path p = e.getFileStatus().getPath(); + LOG.info("Deleting {}", p); + list.add(p); + }); + tableAccess.delete(list); + } IOUtils.cleanupWithLogger(LOG, ddbms); super.teardown(); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardDDBRootOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardDDBRootOperations.java new file mode 100644 index 0000000000000..74c406978e3e9 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardDDBRootOperations.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import java.io.File; +import java.net.URI; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.assertj.core.api.Assertions; +import org.junit.FixMethodOrder; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.s3a.impl.StoreContext; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE; +import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles; + +/** + * This test runs against the root of the FS, and exercises operations + * which span the DDB table and the filesystem. + * For this reason, these tests are executed in the sequential phase of the + * integration tests. + * <p>
    + * The tests only run if DynamoDB is the metastore. + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class ITestS3GuardDDBRootOperations extends AbstractS3ATestBase { + + private StoreContext storeContext; + + private String fsUriStr; + + private DynamoDBMetadataStore metastore; + + private String metastoreUriStr; + + // this is a switch you can change in your IDE to enable + // or disable those tests which clean up the metastore. + private final boolean cleaning = true; + + /** + * The test timeout is increased in case previous tests have created + * many tombstone markers which now need to be purged. + * @return the test timeout. + */ + @Override + protected int getTestTimeoutMillis() { + return SCALE_TEST_TIMEOUT_SECONDS * 1000; + } + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + String bucketName = getTestBucketName(conf); + + // set a sleep time of 0 on pruning, for speedier test runs. + removeBucketOverrides(bucketName, conf, ENABLE_MULTI_DELETE); + conf.setTimeDuration( + S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY, + 0, + TimeUnit.MILLISECONDS); + return conf; + } + + @Override + public void setup() throws Exception { + super.setup(); + S3AFileSystem fs = getFileSystem(); + Configuration conf = fs.getConf(); + S3ATestUtils.assumeS3GuardState(true, conf); + storeContext = fs.createStoreContext(); + assume("Filesystem isn't running DDB", + storeContext.getMetadataStore() instanceof DynamoDBMetadataStore); + metastore = (DynamoDBMetadataStore) storeContext.getMetadataStore(); + URI fsURI = storeContext.getFsURI(); + fsUriStr = fsURI.toString(); + if (!fsUriStr.endsWith("/")) { + fsUriStr = fsUriStr + "/"; + } + metastoreUriStr = "dynamodb://" + metastore.getTableName() + "/"; + } + + @Override + public void teardown() throws Exception { + Thread.currentThread().setName("teardown"); + super.teardown(); + } + + private void assumeCleaningOperation() { + assume("Cleaning operation skipped", cleaning); + } + + @Test + @Ignore + public void test_050_dump_metastore() throws Throwable { + File destFile = calculateDumpFileBase(); + describe("Dumping S3Guard store under %s", destFile); + DumpS3GuardDynamoTable.dumpStore( + null, + metastore, + getConfiguration(), + destFile, + getFileSystem().getUri()); + } + + @Test + public void test_060_dump_metastore_and_s3() throws Throwable { + File destFile = calculateDumpFileBase(); + describe("Dumping S3Guard store under %s", destFile); + DumpS3GuardDynamoTable.dumpStore( + getFileSystem(), + metastore, + getConfiguration(), + destFile, + getFileSystem().getUri()); + } + + @Test + public void test_100_FilesystemPrune() throws Throwable { + describe("Execute prune against a filesystem URI"); + assumeCleaningOperation(); + S3AFileSystem fs = getFileSystem(); + Configuration conf = fs.getConf(); + int result = S3GuardTool.run(conf, + S3GuardTool.Prune.NAME, + fsUriStr); + Assertions.assertThat(result) + .describedAs("Result of prune %s", fsUriStr) + .isEqualTo(0); + } + + + @Test + public void test_200_MetastorePruneTombstones() throws Throwable { + describe("Execute prune against a dynamo URL"); + assumeCleaningOperation(); + S3AFileSystem fs = getFileSystem(); + Configuration conf = fs.getConf(); + int result = S3GuardTool.run(conf, + S3GuardTool.Prune.NAME, + "-tombstone", + "-meta", checkNotNull(metastoreUriStr), + "-seconds", "1", + fs.qualify(new Path("/")).toString()); + Assertions.assertThat(result) + .describedAs("Result of prune %s", fsUriStr) + .isEqualTo(0); + } + 
+ @Test + public void test_300_MetastorePrune() throws Throwable { + describe("Execute prune against a dynamo URL"); + assumeCleaningOperation(); + S3AFileSystem fs = getFileSystem(); + Configuration conf = fs.getConf(); + int result = S3GuardTool.run(conf, + S3GuardTool.Prune.NAME, + "-meta", checkNotNull(metastoreUriStr), + "-seconds", "1"); + Assertions.assertThat(result) + .describedAs("Result of prune %s", fsUriStr) + .isEqualTo(0); + } + + @Test + public void test_400_rm_root_recursive() throws Throwable { + describe("Remove the root directory"); + assumeCleaningOperation(); + S3AFileSystem fs = getFileSystem(); + Path root = new Path("/"); + Path file = new Path("/test_400_rm_root_recursive-01"); + Path file2 = new Path("/test_400_rm_root_recursive-02"); + // recursive treewalk to delete all files + // does not delete directories. + applyLocatedFiles(fs.listFilesAndEmptyDirectories(root, true), + f -> { + Path p = f.getPath(); + fs.delete(p, true); + assertPathDoesNotExist("expected file to be deleted", p); + }); + ContractTestUtils.deleteChildren(fs, root, true); + // everything must be done by now + StringBuffer sb = new StringBuffer(); + AtomicInteger foundFile = new AtomicInteger(0); + applyLocatedFiles(fs.listFilesAndEmptyDirectories(root, true), + f -> { + foundFile.addAndGet(1); + Path p = f.getPath(); + sb.append(f.isDirectory() + ? "Dir " + : "File ") + .append(p); + if (!f.isDirectory()) { + sb.append("[").append(f.getLen()).append("]"); + } + + fs.delete(p, true); + }); + + assertEquals("Remaining files " + sb, + 0, foundFile.get()); + try { + ContractTestUtils.touch(fs, file); + assertDeleted(file, false); + + + assertTrue("Root directory delete failed", + fs.delete(root, true)); + + ContractTestUtils.touch(fs, file2); + assertFalse("Root directory delete should have failed", + fs.delete(root, true)); + } finally { + fs.delete(file, false); + fs.delete(file2, false); + } + } + + @Test + @Ignore + public void test_600_dump_metastore() throws Throwable { + File destFile = calculateDumpFileBase(); + describe("Dumping S3Guard store under %s", destFile); + DumpS3GuardDynamoTable.dumpStore( + getFileSystem(), + metastore, + getConfiguration(), + destFile, + getFileSystem().getUri()); + } + + protected File calculateDumpFileBase() { + String target = System.getProperty("test.build.dir", "target"); + File buildDir = new File(target, + this.getClass().getSimpleName()).getAbsoluteFile(); + buildDir.mkdirs(); + return new File(buildDir, getMethodName()); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java index f774bbb58caf1..d64da9d89e9ea 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java @@ -34,6 +34,7 @@ import com.amazonaws.services.dynamodbv2.model.Tag; import org.junit.Assert; import org.junit.Assume; +import org.junit.AssumptionViolatedException; import org.junit.Test; import org.apache.hadoop.conf.Configuration; @@ -60,10 +61,18 @@ public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase { @Override public void setup() throws Exception { super.setup(); - MetadataStore ms = getMetadataStore(); - Assume.assumeTrue("Test only applies when DynamoDB is used for S3Guard;" - + "Store is " + (ms == null ? 
"none" : ms.toString()), - ms instanceof DynamoDBMetadataStore); + try { + getMetadataStore(); + } catch (ClassCastException e) { + throw new AssumptionViolatedException( + "Test only applies when DynamoDB is used for S3Guard Store", + e); + } + } + + @Override + protected DynamoDBMetadataStore getMetadataStore() { + return (DynamoDBMetadataStore) super.getMetadataStore(); } // Check the existence of a given DynamoDB table. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java index 75bd760e1fcb7..303cf36bef490 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java @@ -120,6 +120,7 @@ protected AbstractMSContract getContract() { @Before public void setUp() throws Exception { + Thread.currentThread().setName("setup"); LOG.debug("== Setup. =="); contract = createContract(); ms = contract.getMetadataStore(); @@ -132,6 +133,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { + Thread.currentThread().setName("teardown"); LOG.debug("== Tear down. =="); if (ms != null) { try { @@ -1050,6 +1052,11 @@ protected PathMetadata getNonNull(final String pathStr) throws IOException { return checkNotNull(get(pathStr), "No metastore entry for %s", pathStr); } + /** + * Assert that either a path has no entry or that it is marked as deleted. + * @param pathStr path + * @throws IOException IO failure. + */ protected void assertDeleted(String pathStr) throws IOException { PathMetadata meta = get(pathStr); boolean cached = meta != null && !meta.isDeleted(); @@ -1073,6 +1080,39 @@ protected PathMetadata verifyCached(final String pathStr) throws IOException { return meta; } + /** + * Assert that an entry exists and is a file. + * @param pathStr path + * @throws IOException IO failure. + */ + protected PathMetadata verifyIsFile(String pathStr) throws IOException { + PathMetadata md = verifyCached(pathStr); + assertTrue("Not a file: " + md, + md.getFileStatus().isFile()); + return md; + } + + /** + * Assert that an entry exists and is a tombstone. + * @param pathStr path + * @throws IOException IO failure. + */ + protected void assertIsTombstone(String pathStr) throws IOException { + PathMetadata meta = getNonNull(pathStr); + assertTrue(pathStr + " must be a tombstone: " + meta, meta.isDeleted()); + } + + /** + * Assert that an entry does not exist. + * @param pathStr path + * @throws IOException IO failure. + */ + protected void assertNotFound(String pathStr) throws IOException { + PathMetadata meta = get(pathStr); + assertNull("Unexpectedly found entry at path " + pathStr, + meta); + } + /** * Get an entry which must be a file. * @param pathStr path @@ -1099,6 +1139,40 @@ protected PathMetadata getDirectory(final String pathStr) throws IOException { return meta; } + /** + * Get an entry which must not be marked as an empty directory: + * its empty directory field must be FALSE or UNKNOWN. + * @param pathStr path + * @return the entry + * @throws IOException IO failure. 
+ */ + protected PathMetadata getNonEmptyDirectory(final String pathStr) + throws IOException { + PathMetadata meta = getDirectory(pathStr); + assertNotEquals("Path " + pathStr + + " is considered an empty dir " + meta, + Tristate.TRUE, + meta.isEmptyDirectory()); + return meta; + } + + /** + * Get an entry which must be an empty directory. + * its empty directory field must be TRUE. + * @param pathStr path + * @return the entry + * @throws IOException IO failure. + */ + protected PathMetadata getEmptyDirectory(final String pathStr) + throws IOException { + PathMetadata meta = getDirectory(pathStr); + assertEquals("Path " + pathStr + + " is not considered an empty dir " + meta, + Tristate.TRUE, + meta.isEmptyDirectory()); + return meta; + } + /** * Convenience to create a fully qualified Path from string. */ diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java index a6d20fd966af2..578aed06bc3ca 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMiscOperations.java @@ -115,7 +115,7 @@ public void testInnerListChildrenDirectoryNpe() throws Exception { public void testAncestorStateForDir() throws Throwable { final DynamoDBMetadataStore.AncestorState ancestorState = new DynamoDBMetadataStore.AncestorState( - BulkOperationState.OperationType.Rename, null); + null, BulkOperationState.OperationType.Rename, null); // path 1 is a directory final Path path1 = new Path("s3a://bucket/1"); @@ -143,7 +143,7 @@ public void testAncestorStateForDir() throws Throwable { public void testAncestorStateForFile() throws Throwable { final DynamoDBMetadataStore.AncestorState ancestorState = new DynamoDBMetadataStore.AncestorState( - BulkOperationState.OperationType.Rename, null); + null, BulkOperationState.OperationType.Rename, null); // path 1 is a file final Path path1 = new Path("s3a://bucket/1"); diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index 383dec82204ee..f6162644e2535 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -57,6 +57,8 @@ log4j.logger.org.apache.hadoop.ipc.Server=WARN #log4j.logger.org.apache.hadoop.fs.s3a.Listing=INFO # Log S3Guard classes #log4j.logger.org.apache.hadoop.fs.s3a.s3guard=DEBUG +# if set to debug, this will log the PUT/DELETE operations on a store +#log4j.logger.org.apache.hadoop.fs.s3a.s3guard.Operations=DEBUG # Log Committer classes #log4j.logger.org.apache.hadoop.fs.s3a.commit=DEBUG