Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,20 @@ private Constants() {
public static final boolean EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT =
true;

/**
* Experimental/Unstable feature: should empty directory marker
* operations be optimized? Value {@value}.
* Default: false.
*
* This is an experimental feature for reducing operations related
* to looking for/deleting fake directory markers.
* The goals are better performance as well as fewer tombstone markers
* being created on versioned buckets.
*/
@InterfaceStability.Unstable
public static final String EXPERIMENTAL_OPTIMIZED_DIRECTORY_OPERATIONS =
"fs.s3a.experimental.optimized.directory.operations";

// seconds until we give up trying to establish a connection to s3
public static final String ESTABLISH_TIMEOUT =
"fs.s3a.connection.establish.timeout";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,10 @@
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable;
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.X_DIRECTORY;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

Expand Down Expand Up @@ -287,6 +289,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private final S3AFileSystem.OperationCallbacksImpl
operationCallbacks = new OperationCallbacksImpl();

/**
* Should directory marker use be optimized?
*/
private boolean optimizeDirectoryOperations;

/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
Expand Down Expand Up @@ -411,7 +418,11 @@ public void initialize(URI name, Configuration originalConf)

// instantiate S3 Select support
selectBinding = new SelectBinding(writeHelper);

optimizeDirectoryOperations = conf.getBoolean(
EXPERIMENTAL_OPTIMIZED_DIRECTORY_OPERATIONS, false);
if (optimizeDirectoryOperations) {
LOG.info("Using experimental optimized directory operations");
}
boolean blockUploadEnabled = conf.getBoolean(FAST_UPLOAD, true);

if (!blockUploadEnabled) {
Expand Down Expand Up @@ -1495,8 +1506,21 @@ public void finishRename(final Path sourceRenamed, final Path destCreated)
Path destParent = destCreated.getParent();
if (!sourceRenamed.getParent().equals(destParent)) {
LOG.debug("source & dest parents are different; fix up dir markers");
deleteUnnecessaryFakeDirectories(destParent);
maybeCreateFakeParentDirectory(sourceRenamed);
// run the destination marker cleanup and the source parent marker
// creation in parallel, then wait for both to complete
List<CompletableFuture<Void>> ops = new ArrayList<>(2);
ops.add(submit(
unboundedThreadPool,
() -> {
deleteUnnecessaryFakeDirectories(destParent, false);
return null;
}));
ops.add(submit(
unboundedThreadPool,
() -> {
maybeCreateFakeParentDirectory(sourceRenamed);
return null;
}));
waitForCompletion(ops);
}
}

Expand Down Expand Up @@ -3564,7 +3588,7 @@ void finishedWrite(String key, long length, String eTag, String versionId,
final CompletableFuture<?> deletion = submit(
unboundedThreadPool,
() -> {
deleteUnnecessaryFakeDirectories(p.getParent());
deleteUnnecessaryFakeDirectories(p.getParent(), isDir);
return null;
});
// this is only set if there is a metastore to update and the
Expand Down Expand Up @@ -3629,18 +3653,50 @@ void finishedWrite(String key, long length, String eTag, String versionId,
* Delete mock parent directories which are no longer needed.
* Retry policy: retrying; exceptions swallowed.
* @param path path
* @param isMkDirOperation is this for a mkdir call?
*/
@Retries.RetryExceptionsSwallowed
private void deleteUnnecessaryFakeDirectories(Path path) {
private void deleteUnnecessaryFakeDirectories(Path path,
final boolean isMkDirOperation) {
List<DeleteObjectsRequest.KeyVersion> keysToRemove = new ArrayList<>();
while (!path.isRoot()) {
String key = pathToKey(path);
key = (key.endsWith("/")) ? key : (key + "/");
LOG.trace("To delete unnecessary fake directory {} for {}", key, path);
keysToRemove.add(new DeleteObjectsRequest.KeyVersion(key));
path = path.getParent();
boolean deleteWholeTree = false;
if (optimizeDirectoryOperations && !isMkDirOperation) {
// this is a file creation/commit.
// Assume that the parent directory exists either explicitly as a marker
// or implicitly (peer entries).
// Only look for the dir marker in S3 - we don't care about DDB.
try {
String key = pathToKey(path);
s3GetFileStatus(path, key, StatusProbeEnum.DIR_MARKER_ONLY, null);
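// here an entry exists.
// NOTE(review): the traditional branch below appends "/" to the key
// before queuing it for deletion; this branch queues the key as returned
// by pathToKey(). Confirm the key deleted here is the marker object.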
LOG.debug("Removing marker {}", key);
keysToRemove.add(new DeleteObjectsRequest.KeyVersion(key));
} catch (FileNotFoundException e) {
// no entry. Nothing to delete.
} catch (IOException e) {
instrumentation.errorIgnored();
LOG.debug("Ignored when looking at directory marker {}", path, e);
// for now, fall back to a full delete.
// if the failure was permissions or network this will probably fail
// too...
deleteWholeTree = true;
}
} else {
deleteWholeTree = true;
}
if (deleteWholeTree) {
// traditional delete creates a delete request for
// all parents.
while (!path.isRoot()) {
String key = pathToKey(path);
key = (key.endsWith("/")) ? key : (key + "/");
LOG.trace("To delete unnecessary fake directory {} for {}", key, path);
keysToRemove.add(new DeleteObjectsRequest.KeyVersion(key));
path = path.getParent();
}
}
try {
// TODO: when size ==1, use DELETE instead
removeKeys(keysToRemove, true, null);
} catch(AmazonClientException | IOException e) {
instrumentation.errorIgnored();
Expand Down Expand Up @@ -3686,8 +3742,10 @@ public int read() throws IOException {
}
};

final ObjectMetadata metadata = newObjectMetadata(0L);
metadata.setContentType(X_DIRECTORY);
PutObjectRequest putObjectRequest = newPutObjectRequest(objectName,
newObjectMetadata(0L),
metadata,
im);
invoker.retry("PUT 0-byte object ", objectName,
true,
Expand Down Expand Up @@ -3803,8 +3861,7 @@ public String toString() {
if (committerIntegration != null) {
sb.append(", magicCommitter=").append(isMagicCommitEnabled());
}
sb.append(", boundedExecutor=").append(boundedThreadPool);
sb.append(", unboundedExecutor=").append(unboundedThreadPool);
sb.append(", optimizeDirMarkers=").append(optimizeDirectoryOperations);
sb.append(", credentials=").append(credentials);
sb.append(", delegation tokens=")
.append(delegationTokens.map(Objects::toString).orElse("disabled"));
Expand Down Expand Up @@ -3902,25 +3959,40 @@ public boolean exists(Path f) throws IOException {
}

/**
* Override superclass so as to add statistic collection.
* An optimized check which only looks for directory markers.
* {@inheritDoc}
*/
@Override
@SuppressWarnings("deprecation")
public boolean isDirectory(Path f) throws IOException {
entryPoint(INVOCATION_IS_DIRECTORY);
return super.isDirectory(f);
try {
// against S3Guard, a full query;
// against S3 a HEAD + "/" then a LIST.
return innerGetFileStatus(f, false,
StatusProbeEnum.DIRECTORIES).isDirectory();
} catch (FileNotFoundException e) {
return false;
}
}

/**
* Override superclass so as to add statistic collection.
* Override superclass so as to only poll for a file.
* Warning: may leave a 404 in the S3 load balancer cache.
* {@inheritDoc}
*/
@Override
@SuppressWarnings("deprecation")
public boolean isFile(Path f) throws IOException {
entryPoint(INVOCATION_IS_FILE);
return super.isFile(f);
try {
// against S3Guard, a full query; against S3 only a HEAD.
return innerGetFileStatus(f, false,
StatusProbeEnum.HEAD_ONLY).isFile();
} catch (FileNotFoundException e) {
// no file or there is a directory there.
return false;
}
}

/**
Expand Down