From 85e0f97e0b3be1f6adb4dcedc9dcc1449bf9a316 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 6 Feb 2020 15:10:17 +0000 Subject: [PATCH] HADOOP-16848. Experimental directory marker optimization. Optimizes directory marker operations through:- * Async create/clear * An option "fs.s3a.experimental.optimized.directory.operations" That performs whatever optimisations to directory marker IO are considered effective to reduce that I while still avoiding problems with missing markers and/or markers not deleted. 1. Work in progress 2. Experimental: use at own risk 3. Not for use against buckets whether other S3A releases are active 4. And not for use with data you value. View it as a Proof of Concept to explore what we can do here. Change-Id: Ia0dadc5e513db00c650376769e37d2ef1fab12f5 --- .../org/apache/hadoop/fs/s3a/Constants.java | 14 +++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 108 +++++++++++++++--- 2 files changed, 104 insertions(+), 18 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 561ab4a84a7c3..e91ef695c843c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -210,6 +210,20 @@ private Constants() { public static final boolean EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT = true; + /** + * Experimental/Unstable feature: should empty directory marker + * operations be optimized? Value {@value}. + * Default: false. + * + * This is an experimental feature for reducing operations related + * to looking for/deleting fake directory markers. + * The goals are better performance as well as fewer tombstone markers + * being created on versioned buckets. + */ + @InterfaceStability.Unstable + public static final String EXPERIMENTAL_OPTIMIZED_DIRECTORY_OPERATIONS = + "fs.s3a.experimental.optimized.directory.operations"; + // seconds until we give up trying to establish a connection to s3 public static final String ESTABLISH_TIMEOUT = "fs.s3a.connection.establish.timeout"; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index ce7729fa396ea..59e0d2f66fa61 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -172,8 +172,10 @@ import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable; import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; +import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.X_DIRECTORY; import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; @@ -287,6 +289,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private final S3AFileSystem.OperationCallbacksImpl operationCallbacks = new OperationCallbacksImpl(); + /** + * Should directory marker use be optimized? + */ + private boolean optimizeDirectoryOperations; + /** Add any deprecated keys. */ @SuppressWarnings("deprecation") private static void addDeprecatedKeys() { @@ -411,7 +418,11 @@ public void initialize(URI name, Configuration originalConf) // instantiate S3 Select support selectBinding = new SelectBinding(writeHelper); - + optimizeDirectoryOperations = conf.getBoolean( + EXPERIMENTAL_OPTIMIZED_DIRECTORY_OPERATIONS, false); + if (optimizeDirectoryOperations) { + LOG.info("Using experimental optimized directory operations"); + } boolean blockUploadEnabled = conf.getBoolean(FAST_UPLOAD, true); if (!blockUploadEnabled) { @@ -1495,8 +1506,21 @@ public void finishRename(final Path sourceRenamed, final Path destCreated) Path destParent = destCreated.getParent(); if (!sourceRenamed.getParent().equals(destParent)) { LOG.debug("source & dest parents are different; fix up dir markers"); - deleteUnnecessaryFakeDirectories(destParent); - maybeCreateFakeParentDirectory(sourceRenamed); + // kick off an async delete + List> ops = new ArrayList<>(2); + ops.add(submit( + unboundedThreadPool, + () -> { + deleteUnnecessaryFakeDirectories(destParent, false); + return null; + })); + ops.add(submit( + unboundedThreadPool, + () -> { + maybeCreateFakeParentDirectory(sourceRenamed); + return null; + })); + waitForCompletion(ops); } } @@ -3564,7 +3588,7 @@ void finishedWrite(String key, long length, String eTag, String versionId, final CompletableFuture deletion = submit( unboundedThreadPool, () -> { - deleteUnnecessaryFakeDirectories(p.getParent()); + deleteUnnecessaryFakeDirectories(p.getParent(), isDir); return null; }); // this is only set if there is a metastore to update and the @@ -3629,18 +3653,50 @@ void finishedWrite(String key, long length, String eTag, String versionId, * Delete mock parent directories which are no longer needed. * Retry policy: retrying; exceptions swallowed. * @param path path + * @param isMkDirOperation is this for a mkdir call? */ @Retries.RetryExceptionsSwallowed - private void deleteUnnecessaryFakeDirectories(Path path) { + private void deleteUnnecessaryFakeDirectories(Path path, + final boolean isMkDirOperation) { List keysToRemove = new ArrayList<>(); - while (!path.isRoot()) { - String key = pathToKey(path); - key = (key.endsWith("/")) ? key : (key + "/"); - LOG.trace("To delete unnecessary fake directory {} for {}", key, path); - keysToRemove.add(new DeleteObjectsRequest.KeyVersion(key)); - path = path.getParent(); + boolean deleteWholeTree = false; + if (optimizeDirectoryOperations && !isMkDirOperation) { + // this is a file creation/commit + // Assume that the parent directory exists either explicitly as a marker + // on implicitly (peer entries) + // only look for the dir marker in S3 -we don't care about DDB. + try { + String key = pathToKey(path); + s3GetFileStatus(path, key, StatusProbeEnum.DIR_MARKER_ONLY, null); + // here an entry exists. + LOG.debug("Removing marker {}", key); + keysToRemove.add(new DeleteObjectsRequest.KeyVersion(key)); + } catch (FileNotFoundException e) { + // no entry. Nothing to delete. + } catch (IOException e) { + instrumentation.errorIgnored(); + LOG.debug("Ignored when looking at directory marker {}", path, e); + // for now, fall back to a full delete. + // if the failure was permissions or network this will probably fail + // too... + deleteWholeTree = true; + } + } else { + deleteWholeTree = true; + } + if (deleteWholeTree) { + // traditional delete creates a delete request for + // all parents. + while (!path.isRoot()) { + String key = pathToKey(path); + key = (key.endsWith("/")) ? key : (key + "/"); + LOG.trace("To delete unnecessary fake directory {} for {}", key, path); + keysToRemove.add(new DeleteObjectsRequest.KeyVersion(key)); + path = path.getParent(); + } } try { + // TODO: when size ==1, use DELETE instead removeKeys(keysToRemove, true, null); } catch(AmazonClientException | IOException e) { instrumentation.errorIgnored(); @@ -3686,8 +3742,10 @@ public int read() throws IOException { } }; + final ObjectMetadata metadata = newObjectMetadata(0L); + metadata.setContentType(X_DIRECTORY); PutObjectRequest putObjectRequest = newPutObjectRequest(objectName, - newObjectMetadata(0L), + metadata, im); invoker.retry("PUT 0-byte object ", objectName, true, @@ -3803,8 +3861,7 @@ public String toString() { if (committerIntegration != null) { sb.append(", magicCommitter=").append(isMagicCommitEnabled()); } - sb.append(", boundedExecutor=").append(boundedThreadPool); - sb.append(", unboundedExecutor=").append(unboundedThreadPool); + sb.append(", optimizeDirMarkers=").append(optimizeDirectoryOperations); sb.append(", credentials=").append(credentials); sb.append(", delegation tokens=") .append(delegationTokens.map(Objects::toString).orElse("disabled")); @@ -3902,25 +3959,40 @@ public boolean exists(Path f) throws IOException { } /** - * Override superclass so as to add statistic collection. + * An optimized check which only looks for directory markers. * {@inheritDoc} */ @Override @SuppressWarnings("deprecation") public boolean isDirectory(Path f) throws IOException { entryPoint(INVOCATION_IS_DIRECTORY); - return super.isDirectory(f); + try { + // against S3Guard, a full query; + // against S3 a HEAD + "/" then a LIST. + return innerGetFileStatus(f, false, + StatusProbeEnum.DIRECTORIES).isDirectory(); + } catch (FileNotFoundException e) { + return false; + } } /** - * Override superclass so as to add statistic collection. + * Override superclass so as to only poll for a file. + * Warning: may leave a 404 in the S3 load balancer cache. * {@inheritDoc} */ @Override @SuppressWarnings("deprecation") public boolean isFile(Path f) throws IOException { entryPoint(INVOCATION_IS_FILE); - return super.isFile(f); + try { + // against S3Guard, a full query; against S3 only a HEAD. + return innerGetFileStatus(f, false, + StatusProbeEnum.HEAD_ONLY).isFile(); + } catch (FileNotFoundException e) { + // no file or there is a directory there. + return false; + } } /**