Skip to content

Commit 35ec09e

Browse files
committed
CDPD-16792. HADOOP-17244. S3A directory delete tombstones dir markers prematurely. (apache#2310)
This fixes the S3Guard/Directory Marker Retention integration so that when fs.s3a.directory.marker.retention=keep, failures during multipart delete are handled correctly, as are incremental deletes during directory tree operations. In both cases, when a directory marker with children is deleted from S3, the directory entry in S3Guard is not deleted, because it is still critical to representing the structure of the store. Contributed by Steve Loughran. Change-Id: I4ca133a23ea582cd42ec35dbf2dc85b286297d2f
1 parent 1f66810 commit 35ec09e

File tree

20 files changed

+1100
-440
lines changed

20 files changed

+1100
-440
lines changed

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ public void testRecursiveRootListing() throws IOException {
265265
fs.listFiles(root, true));
266266
describe("verifying consistency with treewalk's files");
267267
ContractTestUtils.TreeScanResults treeWalk = treeWalk(fs, root);
268-
treeWalk.assertFieldsEquivalent("files", listing,
268+
treeWalk.assertFieldsEquivalent("treewalk vs listFiles(/, true)", listing,
269269
treeWalk.getFiles(),
270270
listing.getFiles());
271271
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1551,7 +1551,7 @@ public void deleteObjectAtPath(final Path path,
15511551

15521552
@Override
15531553
@Retries.RetryTranslated
1554-
public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
1554+
public RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
15551555
final Path path,
15561556
final S3AFileStatus status,
15571557
final boolean collectTombstones,
@@ -2056,6 +2056,7 @@ protected void deleteObject(String key)
20562056
DELETE_CONSIDERED_IDEMPOTENT,
20572057
()-> {
20582058
incrementStatistic(OBJECT_DELETE_REQUESTS);
2059+
incrementStatistic(OBJECT_DELETE_OBJECTS);
20592060
s3.deleteObject(bucket, key);
20602061
return null;
20612062
});
@@ -2102,9 +2103,14 @@ private void blockRootDelete(String key) throws InvalidRequestException {
21022103
}
21032104

21042105
/**
2105-
* Perform a bulk object delete operation.
2106+
* Perform a bulk object delete operation against S3; leaves S3Guard
2107+
* alone.
21062108
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
2107-
* operation statistics.
2109+
* operation statistics
2110+
* <p></p>
2111+
* {@code OBJECT_DELETE_OBJECTS} is updated with the actual number
2112+
* of objects deleted in the request.
2113+
* <p></p>
21082114
* Retry policy: retry untranslated; delete considered idempotent.
21092115
* If the request is throttled, this is logged in the throttle statistics,
21102116
* with the counter set to the number of keys, rather than the number
@@ -2125,9 +2131,10 @@ private DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteRequest)
21252131
incrementWriteOperations();
21262132
BulkDeleteRetryHandler retryHandler =
21272133
new BulkDeleteRetryHandler(createStoreContext());
2134+
int keyCount = deleteRequest.getKeys().size();
21282135
try(DurationInfo ignored =
21292136
new DurationInfo(LOG, false, "DELETE %d keys",
2130-
deleteRequest.getKeys().size())) {
2137+
keyCount)) {
21312138
return invoker.retryUntranslated("delete",
21322139
DELETE_CONSIDERED_IDEMPOTENT,
21332140
(text, e, r, i) -> {
@@ -2136,6 +2143,7 @@ private DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteRequest)
21362143
},
21372144
() -> {
21382145
incrementStatistic(OBJECT_DELETE_REQUESTS, 1);
2146+
incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount);
21392147
return s3.deleteObjects(deleteRequest);
21402148
});
21412149
} catch (MultiObjectDeleteException e) {
@@ -2525,8 +2533,8 @@ DeleteObjectsResult removeKeys(
25252533
// entries so we only process these failures on "real" deletes.
25262534
Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>> results =
25272535
new MultiObjectDeleteSupport(createStoreContext(), operationState)
2528-
.processDeleteFailure(ex, keysToDelete);
2529-
undeletedObjectsOnFailure.addAll(results.getMiddle());
2536+
.processDeleteFailure(ex, keysToDelete, new ArrayList<Path>());
2537+
undeletedObjectsOnFailure.addAll(results.getLeft());
25302538
}
25312539
throw ex;
25322540
} catch (AmazonClientException | IOException ex) {

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
156156
INVOCATION_RENAME,
157157
OBJECT_COPY_REQUESTS,
158158
OBJECT_DELETE_REQUESTS,
159+
OBJECT_DELETE_OBJECTS,
159160
OBJECT_LIST_REQUESTS,
160161
OBJECT_CONTINUE_LIST_REQUESTS,
161162
OBJECT_METADATA_REQUESTS,

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ public enum Statistic {
8585
"Calls of rename()"),
8686
OBJECT_COPY_REQUESTS("object_copy_requests", "Object copy requests"),
8787
OBJECT_DELETE_REQUESTS("object_delete_requests", "Object delete requests"),
88+
OBJECT_DELETE_OBJECTS("object_delete_objects",
89+
"Objects deleted in delete requests"),
8890
OBJECT_LIST_REQUESTS("object_list_requests",
8991
"Number of object listings made"),
9092
OBJECT_CONTINUE_LIST_REQUESTS("object_continue_list_requests",

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java

Lines changed: 86 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.ArrayList;
2424
import java.util.List;
2525
import java.util.concurrent.CompletableFuture;
26+
import java.util.stream.Collectors;
2627

2728
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
2829
import com.amazonaws.services.s3.model.DeleteObjectsResult;
@@ -152,10 +153,13 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
152153
/**
153154
* List of keys built up for the next delete batch.
154155
*/
155-
private List<DeleteObjectsRequest.KeyVersion> keys;
156+
private List<DeleteEntry> keys;
156157

157158
/**
158-
* List of paths built up for deletion.
159+
* List of paths built up for incremental deletion on tree delete.
160+
* At the end of the entire delete the full tree is scanned in S3Guard
161+
* and tombstones added. For this reason this list of paths <i>must not</i>
162+
* include directory markers, as that will break the scan.
159163
*/
160164
private List<Path> paths;
161165

@@ -279,7 +283,7 @@ public Boolean execute() throws IOException {
279283
LOG.debug("deleting simple file {}", path);
280284
deleteObjectAtPath(path, key, true);
281285
}
282-
LOG.debug("Deleted {} files", filesDeleted);
286+
LOG.debug("Deleted {} objects", filesDeleted);
283287
return true;
284288
}
285289

@@ -323,7 +327,7 @@ protected void deleteDirectoryTree(final Path path,
323327
// list files including any under tombstones through S3Guard
324328
LOG.debug("Getting objects for directory prefix {} to delete", dirKey);
325329
final RemoteIterator<S3ALocatedFileStatus> locatedFiles =
326-
callbacks.listFilesAndEmptyDirectories(path, status,
330+
callbacks.listFilesAndDirectoryMarkers(path, status,
327331
false, true);
328332

329333
// iterate through and delete. The next() call will block when a new S3
@@ -359,7 +363,10 @@ protected void deleteDirectoryTree(final Path path,
359363
while (objects.hasNext()) {
360364
// get the next entry in the listing.
361365
extraFilesDeleted++;
362-
queueForDeletion(deletionKey(objects.next()), null);
366+
S3AFileStatus next = objects.next();
367+
LOG.debug("Found Unlisted entry {}", next);
368+
queueForDeletion(deletionKey(next), null,
369+
next.isDirectory());
363370
}
364371
if (extraFilesDeleted > 0) {
365372
LOG.debug("Raw S3 Scan found {} extra file(s) to delete",
@@ -402,7 +409,7 @@ private String deletionKey(final S3AFileStatus stat) {
402409
*/
403410
private void queueForDeletion(
404411
final S3AFileStatus stat) throws IOException {
405-
queueForDeletion(deletionKey(stat), stat.getPath());
412+
queueForDeletion(deletionKey(stat), stat.getPath(), stat.isDirectory());
406413
}
407414

408415
/**
@@ -413,14 +420,18 @@ private void queueForDeletion(
413420
*
414421
* @param key key to delete
415422
* @param deletePath nullable path of the key
423+
* @param isDirMarker is the entry a directory?
416424
* @throws IOException failure of the previous batch of deletions.
417425
*/
418426
private void queueForDeletion(final String key,
419-
@Nullable final Path deletePath) throws IOException {
427+
@Nullable final Path deletePath,
428+
boolean isDirMarker) throws IOException {
420429
LOG.debug("Adding object to delete: \"{}\"", key);
421-
keys.add(new DeleteObjectsRequest.KeyVersion(key));
430+
keys.add(new DeleteEntry(key, isDirMarker));
422431
if (deletePath != null) {
423-
paths.add(deletePath);
432+
if (!isDirMarker) {
433+
paths.add(deletePath);
434+
}
424435
}
425436

426437
if (keys.size() == pageSize) {
@@ -484,7 +495,7 @@ private void deleteObjectAtPath(
484495
* @return the submitted future or null
485496
*/
486497
private CompletableFuture<Void> submitDelete(
487-
final List<DeleteObjectsRequest.KeyVersion> keyList,
498+
final List<DeleteEntry> keyList,
488499
final List<Path> pathList) {
489500

490501
if (keyList.isEmpty() && pathList.isEmpty()) {
@@ -514,31 +525,62 @@ private CompletableFuture<Void> submitDelete(
514525
@Retries.RetryTranslated
515526
private void asyncDeleteAction(
516527
final BulkOperationState state,
517-
final List<DeleteObjectsRequest.KeyVersion> keyList,
528+
final List<DeleteEntry> keyList,
518529
final List<Path> pathList,
519530
final boolean auditDeletedKeys)
520531
throws IOException {
532+
List<DeleteObjectsResult.DeletedObject> deletedObjects = new ArrayList<>();
521533
try (DurationInfo ignored =
522-
new DurationInfo(LOG, false, "Delete page of keys")) {
534+
new DurationInfo(LOG, false,
535+
"Delete page of %d keys", keyList.size())) {
523536
DeleteObjectsResult result = null;
524537
List<Path> undeletedObjects = new ArrayList<>();
525538
if (!keyList.isEmpty()) {
526-
result = Invoker.once("Remove S3 Keys",
539+
// first delete the files.
540+
List<DeleteObjectsRequest.KeyVersion> files = keyList.stream()
541+
.filter(e -> !e.isDirMarker)
542+
.map(e -> e.keyVersion)
543+
.collect(Collectors.toList());
544+
LOG.debug("Deleting of {} file objects", files.size());
545+
result = Invoker.once("Remove S3 Files",
527546
status.getPath().toString(),
528547
() -> callbacks.removeKeys(
529-
keyList,
548+
files,
530549
false,
531550
undeletedObjects,
532551
state,
533552
!auditDeletedKeys));
553+
if (result != null) {
554+
deletedObjects.addAll(result.getDeletedObjects());
555+
}
556+
// now the dirs
557+
List<DeleteObjectsRequest.KeyVersion> dirs = keyList.stream()
558+
.filter(e -> e.isDirMarker)
559+
.map(e -> e.keyVersion)
560+
.collect(Collectors.toList());
561+
LOG.debug("Deleting of {} directory markers", dirs.size());
562+
// This is invoked with deleteFakeDir = true, so
563+
// S3Guard is not updated.
564+
result = Invoker.once("Remove S3 Dir Markers",
565+
status.getPath().toString(),
566+
() -> callbacks.removeKeys(
567+
dirs,
568+
true,
569+
undeletedObjects,
570+
state,
571+
!auditDeletedKeys));
572+
if (result != null) {
573+
deletedObjects.addAll(result.getDeletedObjects());
574+
}
534575
}
535576
if (!pathList.isEmpty()) {
577+
// delete file paths only. This stops tombstones
578+
// being added until the final directory cleanup
579+
// (HADOOP-17244)
536580
metadataStore.deletePaths(pathList, state);
537581
}
538-
if (auditDeletedKeys && result != null) {
582+
if (auditDeletedKeys) {
539583
// audit the deleted keys
540-
List<DeleteObjectsResult.DeletedObject> deletedObjects =
541-
result.getDeletedObjects();
542584
if (deletedObjects.size() != keyList.size()) {
543585
// size mismatch
544586
LOG.warn("Size mismatch in deletion operation. "
@@ -549,13 +591,39 @@ private void asyncDeleteAction(
549591
for (DeleteObjectsResult.DeletedObject del : deletedObjects) {
550592
keyList.removeIf(kv -> kv.getKey().equals(del.getKey()));
551593
}
552-
for (DeleteObjectsRequest.KeyVersion kv : keyList) {
594+
for (DeleteEntry kv : keyList) {
553595
LOG.debug("{}", kv.getKey());
554596
}
555597
}
556598
}
557599
}
558600
}
559601

602+
/**
603+
* Deletion entry; dir marker state is tracked to control S3Guard
604+
* update policy.
605+
*/
606+
private static final class DeleteEntry {
607+
private final DeleteObjectsRequest.KeyVersion keyVersion;
608+
609+
private final boolean isDirMarker;
610+
611+
private DeleteEntry(final String key, final boolean isDirMarker) {
612+
this.keyVersion = new DeleteObjectsRequest.KeyVersion(key);
613+
this.isDirMarker = isDirMarker;
614+
}
615+
616+
public String getKey() {
617+
return keyVersion.getKey();
618+
}
619+
620+
@Override
621+
public String toString() {
622+
return "DeleteEntry{" +
623+
"key='" + getKey() + '\'' +
624+
", isDirMarker=" + isDirMarker +
625+
'}';
626+
}
627+
}
560628

561629
}

0 commit comments

Comments
 (0)