Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public void testRecursiveRootListing() throws IOException {
fs.listFiles(root, true));
describe("verifying consistency with treewalk's files");
ContractTestUtils.TreeScanResults treeWalk = treeWalk(fs, root);
treeWalk.assertFieldsEquivalent("files", listing,
treeWalk.assertFieldsEquivalent("treewalk vs listFiles(/, true)", listing,
treeWalk.getFiles(),
listing.getFiles());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1576,7 +1576,7 @@ public void deleteObjectAtPath(final Path path,

@Override
@Retries.RetryTranslated
public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
public RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
final Path path,
final S3AFileStatus status,
final boolean collectTombstones,
Expand Down Expand Up @@ -2081,6 +2081,7 @@ protected void deleteObject(String key)
DELETE_CONSIDERED_IDEMPOTENT,
()-> {
incrementStatistic(OBJECT_DELETE_REQUESTS);
incrementStatistic(OBJECT_DELETE_OBJECTS);
s3.deleteObject(bucket, key);
return null;
});
Expand Down Expand Up @@ -2127,9 +2128,14 @@ private void blockRootDelete(String key) throws InvalidRequestException {
}

/**
* Perform a bulk object delete operation.
* Perform a bulk object delete operation against S3; leaves S3Guard
* alone.
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
* operation statistics.
* operation statistics
* <p></p>
* {@code OBJECT_DELETE_OBJECTS} is updated with the actual number
* of objects deleted in the request.
* <p></p>
* Retry policy: retry untranslated; delete considered idempotent.
* If the request is throttled, this is logged in the throttle statistics,
* with the counter set to the number of keys, rather than the number
Expand All @@ -2150,9 +2156,10 @@ private DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteRequest)
incrementWriteOperations();
BulkDeleteRetryHandler retryHandler =
new BulkDeleteRetryHandler(createStoreContext());
int keyCount = deleteRequest.getKeys().size();
try(DurationInfo ignored =
new DurationInfo(LOG, false, "DELETE %d keys",
deleteRequest.getKeys().size())) {
keyCount)) {
return invoker.retryUntranslated("delete",
DELETE_CONSIDERED_IDEMPOTENT,
(text, e, r, i) -> {
Expand All @@ -2161,6 +2168,7 @@ private DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteRequest)
},
() -> {
incrementStatistic(OBJECT_DELETE_REQUESTS, 1);
incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount);
return s3.deleteObjects(deleteRequest);
});
} catch (MultiObjectDeleteException e) {
Expand Down Expand Up @@ -2550,8 +2558,8 @@ DeleteObjectsResult removeKeys(
// entries so we only process these failures on "real" deletes.
Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>> results =
new MultiObjectDeleteSupport(createStoreContext(), operationState)
.processDeleteFailure(ex, keysToDelete);
undeletedObjectsOnFailure.addAll(results.getMiddle());
.processDeleteFailure(ex, keysToDelete, new ArrayList<Path>());
undeletedObjectsOnFailure.addAll(results.getLeft());
}
throw ex;
} catch (AmazonClientException | IOException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
INVOCATION_RENAME,
OBJECT_COPY_REQUESTS,
OBJECT_DELETE_REQUESTS,
OBJECT_DELETE_OBJECTS,
OBJECT_LIST_REQUESTS,
OBJECT_CONTINUE_LIST_REQUESTS,
OBJECT_METADATA_REQUESTS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public enum Statistic {
"Calls of rename()"),
OBJECT_COPY_REQUESTS("object_copy_requests", "Object copy requests"),
OBJECT_DELETE_REQUESTS("object_delete_requests", "Object delete requests"),
OBJECT_DELETE_OBJECTS("object_delete_objects",
"Objects deleted in delete requests"),
OBJECT_LIST_REQUESTS("object_list_requests",
"Number of object listings made"),
OBJECT_CONTINUE_LIST_REQUESTS("object_continue_list_requests",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
Expand Down Expand Up @@ -152,10 +153,13 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
/**
* List of keys built up for the next delete batch.
*/
private List<DeleteObjectsRequest.KeyVersion> keys;
private List<DeleteEntry> keys;

/**
* List of paths built up for deletion.
* List of paths built up for incremental deletion on tree delete.
* At the end of the entire delete the full tree is scanned in S3Guard
* and tombstones added. For this reason this list of paths <i>must not</i>
* include directory markers, as that will break the scan.
*/
private List<Path> paths;

Expand Down Expand Up @@ -279,7 +283,7 @@ public Boolean execute() throws IOException {
LOG.debug("deleting simple file {}", path);
deleteObjectAtPath(path, key, true);
}
LOG.debug("Deleted {} files", filesDeleted);
LOG.debug("Deleted {} objects", filesDeleted);
return true;
}

Expand Down Expand Up @@ -323,7 +327,7 @@ protected void deleteDirectoryTree(final Path path,
// list files including any under tombstones through S3Guard
LOG.debug("Getting objects for directory prefix {} to delete", dirKey);
final RemoteIterator<S3ALocatedFileStatus> locatedFiles =
callbacks.listFilesAndEmptyDirectories(path, status,
callbacks.listFilesAndDirectoryMarkers(path, status,
false, true);

// iterate through and delete. The next() call will block when a new S3
Expand Down Expand Up @@ -359,7 +363,10 @@ protected void deleteDirectoryTree(final Path path,
while (objects.hasNext()) {
// get the next entry in the listing.
extraFilesDeleted++;
queueForDeletion(deletionKey(objects.next()), null);
S3AFileStatus next = objects.next();
LOG.debug("Found Unlisted entry {}", next);
queueForDeletion(deletionKey(next), null,
next.isDirectory());
}
if (extraFilesDeleted > 0) {
LOG.debug("Raw S3 Scan found {} extra file(s) to delete",
Expand Down Expand Up @@ -402,7 +409,7 @@ private String deletionKey(final S3AFileStatus stat) {
*/
private void queueForDeletion(
final S3AFileStatus stat) throws IOException {
queueForDeletion(deletionKey(stat), stat.getPath());
queueForDeletion(deletionKey(stat), stat.getPath(), stat.isDirectory());
}

/**
Expand All @@ -413,14 +420,18 @@ private void queueForDeletion(
*
* @param key key to delete
* @param deletePath nullable path of the key
* @param isDirMarker is the entry a directory?
* @throws IOException failure of the previous batch of deletions.
*/
private void queueForDeletion(final String key,
@Nullable final Path deletePath) throws IOException {
@Nullable final Path deletePath,
boolean isDirMarker) throws IOException {
LOG.debug("Adding object to delete: \"{}\"", key);
keys.add(new DeleteObjectsRequest.KeyVersion(key));
keys.add(new DeleteEntry(key, isDirMarker));
if (deletePath != null) {
paths.add(deletePath);
if (!isDirMarker) {
paths.add(deletePath);
}
}

if (keys.size() == pageSize) {
Expand Down Expand Up @@ -484,7 +495,7 @@ private void deleteObjectAtPath(
* @return the submitted future or null
*/
private CompletableFuture<Void> submitDelete(
final List<DeleteObjectsRequest.KeyVersion> keyList,
final List<DeleteEntry> keyList,
final List<Path> pathList) {

if (keyList.isEmpty() && pathList.isEmpty()) {
Expand Down Expand Up @@ -514,31 +525,62 @@ private CompletableFuture<Void> submitDelete(
@Retries.RetryTranslated
private void asyncDeleteAction(
final BulkOperationState state,
final List<DeleteObjectsRequest.KeyVersion> keyList,
final List<DeleteEntry> keyList,
final List<Path> pathList,
final boolean auditDeletedKeys)
throws IOException {
List<DeleteObjectsResult.DeletedObject> deletedObjects = new ArrayList<>();
try (DurationInfo ignored =
new DurationInfo(LOG, false, "Delete page of keys")) {
new DurationInfo(LOG, false,
"Delete page of %d keys", keyList.size())) {
DeleteObjectsResult result = null;
List<Path> undeletedObjects = new ArrayList<>();
if (!keyList.isEmpty()) {
result = Invoker.once("Remove S3 Keys",
// first delete the files.
List<DeleteObjectsRequest.KeyVersion> files = keyList.stream()
.filter(e -> !e.isDirMarker)
.map(e -> e.keyVersion)
.collect(Collectors.toList());
LOG.debug("Deleting of {} file objects", files.size());
result = Invoker.once("Remove S3 Files",
status.getPath().toString(),
() -> callbacks.removeKeys(
keyList,
files,
false,
undeletedObjects,
state,
!auditDeletedKeys));
if (result != null) {
deletedObjects.addAll(result.getDeletedObjects());
}
// now the dirs
List<DeleteObjectsRequest.KeyVersion> dirs = keyList.stream()
.filter(e -> e.isDirMarker)
.map(e -> e.keyVersion)
.collect(Collectors.toList());
LOG.debug("Deleting of {} directory markers", dirs.size());
// This is invoked with deleteFakeDir = true, so
// S3Guard is not updated.
result = Invoker.once("Remove S3 Dir Markers",
status.getPath().toString(),
() -> callbacks.removeKeys(
dirs,
true,
undeletedObjects,
state,
!auditDeletedKeys));
if (result != null) {
deletedObjects.addAll(result.getDeletedObjects());
}
}
if (!pathList.isEmpty()) {
// delete file paths only. This stops tombstones
// being added until the final directory cleanup
// (HADOOP-17244)
metadataStore.deletePaths(pathList, state);
}
if (auditDeletedKeys && result != null) {
if (auditDeletedKeys) {
// audit the deleted keys
List<DeleteObjectsResult.DeletedObject> deletedObjects =
result.getDeletedObjects();
if (deletedObjects.size() != keyList.size()) {
// size mismatch
LOG.warn("Size mismatch in deletion operation. "
Expand All @@ -549,13 +591,39 @@ private void asyncDeleteAction(
for (DeleteObjectsResult.DeletedObject del : deletedObjects) {
keyList.removeIf(kv -> kv.getKey().equals(del.getKey()));
}
for (DeleteObjectsRequest.KeyVersion kv : keyList) {
for (DeleteEntry kv : keyList) {
LOG.debug("{}", kv.getKey());
}
}
}
}
}

/**
* Deletion entry; dir marker state is tracked to control S3Guard
* update policy.
*/
private static final class DeleteEntry {
private final DeleteObjectsRequest.KeyVersion keyVersion;

private final boolean isDirMarker;

private DeleteEntry(final String key, final boolean isDirMarker) {
this.keyVersion = new DeleteObjectsRequest.KeyVersion(key);
this.isDirMarker = isDirMarker;
}

public String getKey() {
return keyVersion.getKey();
}

@Override
public String toString() {
return "DeleteEntry{" +
"key='" + getKey() + '\'' +
", isDirMarker=" + isDirMarker +
'}';
}
}

}
Loading