Skip to content

Commit 7e642ec

Browse files
mukund-thakur authored and steveloughran committed
HADOOP-17023. Tune S3AFileSystem.listStatus() (#2257)
S3AFileSystem.listStatus() is optimized for invocations where the path supplied is a non-empty directory. The number of S3 requests is significantly reduced, saving time, money, and reducing the risk of S3 throttling. Contributed by Mukund Thakur. Change-Id: I7cc5f87aa16a4819e245e0fbd2aad226bd500f3f
1 parent e5e9139 commit 7e642ec

File tree

9 files changed

+612
-454
lines changed

9 files changed

+612
-454
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.amazonaws.services.s3.model.S3ObjectSummary;
2525
import com.google.common.annotations.VisibleForTesting;
2626

27+
import org.apache.commons.lang3.tuple.Triple;
2728
import org.apache.hadoop.classification.InterfaceAudience;
2829
import org.apache.hadoop.fs.FileStatus;
2930
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -102,6 +103,19 @@ ProvidedFileStatusIterator createProvidedFileStatusIterator(
102103
return new ProvidedFileStatusIterator(fileStatuses, filter, acceptor);
103104
}
104105

106+
/**
107+
* Create a FileStatus iterator against a provided list of file status.
108+
* @param fileStatuses array of file status.
109+
* @return the file status iterator.
110+
*/
111+
@VisibleForTesting
112+
public static ProvidedFileStatusIterator toProvidedFileStatusIterator(
113+
S3AFileStatus[] fileStatuses) {
114+
return new ProvidedFileStatusIterator(fileStatuses,
115+
ACCEPT_ALL,
116+
Listing.ACCEPT_ALL_BUT_S3N);
117+
}
118+
105119
/**
106120
* Create a FileStatus iterator against a path, with a given list object
107121
* request.
@@ -250,7 +264,7 @@ public RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
250264
if (!forceNonAuthoritativeMS &&
251265
allowAuthoritative &&
252266
metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
253-
S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
267+
S3AFileStatus[] statuses = S3AUtils.iteratorToStatuses(
254268
metadataStoreListFilesIterator, tombstones);
255269
cachedFilesIterator = createProvidedFileStatusIterator(
256270
statuses, ACCEPT_ALL, acceptor);
@@ -329,6 +343,56 @@ public RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
329343
tombstones);
330344
}
331345

346+
/**
347+
* Calculate list of file statuses assuming path
348+
* to be a non-empty directory.
349+
* @param path input path.
350+
* @return Triple of file statuses, metaData, auth flag.
351+
* @throws IOException Any IO problems.
352+
*/
353+
public Triple<RemoteIterator<S3AFileStatus>, DirListingMetadata, Boolean>
354+
getFileStatusesAssumingNonEmptyDir(Path path)
355+
throws IOException {
356+
String key = pathToKey(path);
357+
List<S3AFileStatus> result;
358+
if (!key.isEmpty()) {
359+
key = key + '/';
360+
}
361+
362+
boolean allowAuthoritative = listingOperationCallbacks
363+
.allowAuthoritative(path);
364+
DirListingMetadata dirMeta =
365+
S3Guard.listChildrenWithTtl(
366+
getStoreContext().getMetadataStore(),
367+
path,
368+
listingOperationCallbacks.getUpdatedTtlTimeProvider(),
369+
allowAuthoritative);
370+
// In auth mode return directly with auth flag.
371+
if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
372+
ProvidedFileStatusIterator mfsItr = createProvidedFileStatusIterator(
373+
S3Guard.dirMetaToStatuses(dirMeta),
374+
ACCEPT_ALL,
375+
Listing.ACCEPT_ALL_BUT_S3N);
376+
return Triple.of(mfsItr,
377+
dirMeta, Boolean.TRUE);
378+
}
379+
380+
S3ListRequest request = createListObjectsRequest(key, "/");
381+
LOG.debug("listStatus: doing listObjects for directory {}", key);
382+
383+
FileStatusListingIterator filesItr = createFileStatusListingIterator(
384+
path,
385+
request,
386+
ACCEPT_ALL,
387+
new Listing.AcceptAllButSelfAndS3nDirs(path));
388+
389+
// return the results obtained from s3.
390+
return Triple.of(
391+
filesItr,
392+
dirMeta,
393+
Boolean.FALSE);
394+
}
395+
332396
public S3ListRequest createListObjectsRequest(String key, String delimiter) {
333397
return listingOperationCallbacks.createListObjectsRequest(key, delimiter);
334398
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 46 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.Collections;
3636
import java.util.Date;
3737
import java.util.EnumSet;
38+
import java.util.HashSet;
3839
import java.util.List;
3940
import java.util.Map;
4041
import java.util.Optional;
@@ -185,6 +186,7 @@
185186
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
186187
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
187188
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup;
189+
import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.dirMetaToStatuses;
188190
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
189191

190192
/**
@@ -2652,7 +2654,9 @@ void maybeCreateFakeParentDirectory(Path path)
26522654
*/
26532655
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
26542656
IOException {
2655-
return once("listStatus", f.toString(), () -> innerListStatus(f));
2657+
return once("listStatus",
2658+
f.toString(),
2659+
() -> iteratorToStatuses(innerListStatus(f), new HashSet<>()));
26562660
}
26572661

26582662
/**
@@ -2665,51 +2669,52 @@ public FileStatus[] listStatus(Path f) throws FileNotFoundException,
26652669
* @throws IOException due to an IO problem.
26662670
* @throws AmazonClientException on failures inside the AWS SDK
26672671
*/
2668-
private S3AFileStatus[] innerListStatus(Path f) throws FileNotFoundException,
2669-
IOException, AmazonClientException {
2672+
private RemoteIterator<S3AFileStatus> innerListStatus(Path f)
2673+
throws FileNotFoundException,
2674+
IOException, AmazonClientException {
26702675
Path path = qualify(f);
2671-
String key = pathToKey(path);
26722676
LOG.debug("List status for path: {}", path);
26732677
entryPoint(INVOCATION_LIST_STATUS);
26742678

2675-
List<S3AFileStatus> result;
2676-
final S3AFileStatus fileStatus = innerGetFileStatus(path, false,
2677-
StatusProbeEnum.ALL);
2678-
2679-
if (fileStatus.isDirectory()) {
2680-
if (!key.isEmpty()) {
2681-
key = key + '/';
2679+
Triple<RemoteIterator<S3AFileStatus>, DirListingMetadata, Boolean>
2680+
statusesAssumingNonEmptyDir = listing
2681+
.getFileStatusesAssumingNonEmptyDir(path);
2682+
2683+
if (!statusesAssumingNonEmptyDir.getLeft().hasNext() &&
2684+
statusesAssumingNonEmptyDir.getRight()) {
2685+
// We are sure that this is an empty directory in auth mode.
2686+
return statusesAssumingNonEmptyDir.getLeft();
2687+
} else if (!statusesAssumingNonEmptyDir.getLeft().hasNext()) {
2688+
// We may have an empty dir, a file, or nothing at all.
2689+
// So we call innerGetFileStatus to get the status; this may throw
2690+
// FileNotFoundException if there is nothing at the path.
2691+
// If it returns, we are guaranteed to have either a dir marker or a file.
2692+
final S3AFileStatus fileStatus = innerGetFileStatus(path, false,
2693+
StatusProbeEnum.ALL);
2694+
// If it is a file return directly.
2695+
if (fileStatus.isFile()) {
2696+
LOG.debug("Adding: rd (not a dir): {}", path);
2697+
S3AFileStatus[] stats = new S3AFileStatus[1];
2698+
stats[0] = fileStatus;
2699+
return listing.createProvidedFileStatusIterator(
2700+
stats,
2701+
ACCEPT_ALL,
2702+
Listing.ACCEPT_ALL_BUT_S3N);
26822703
}
2683-
2684-
boolean allowAuthoritative = allowAuthoritative(f);
2685-
DirListingMetadata dirMeta =
2686-
S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
2687-
allowAuthoritative);
2688-
if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
2689-
return S3Guard.dirMetaToStatuses(dirMeta);
2690-
}
2691-
2692-
S3ListRequest request = createListObjectsRequest(key, "/");
2693-
LOG.debug("listStatus: doing listObjects for directory {}", key);
2694-
2695-
Listing.FileStatusListingIterator files =
2696-
listing.createFileStatusListingIterator(path,
2697-
request,
2698-
ACCEPT_ALL,
2699-
new Listing.AcceptAllButSelfAndS3nDirs(path));
2700-
result = new ArrayList<>(files.getBatchSize());
2701-
while (files.hasNext()) {
2702-
result.add(files.next());
2703-
}
2704-
// merge the results. This will update the store as needed
2705-
return S3Guard.dirListingUnion(metadataStore, path, result, dirMeta,
2706-
allowAuthoritative, ttlTimeProvider);
2707-
} else {
2708-
LOG.debug("Adding: rd (not a dir): {}", path);
2709-
S3AFileStatus[] stats = new S3AFileStatus[1];
2710-
stats[0]= fileStatus;
2711-
return stats;
27122704
}
2705+
// Here we have a directory which may or may not be empty.
2706+
// So we update the metastore and return.
2707+
return S3Guard.dirListingUnion(
2708+
metadataStore,
2709+
path,
2710+
statusesAssumingNonEmptyDir.getLeft(),
2711+
statusesAssumingNonEmptyDir.getMiddle(),
2712+
allowAuthoritative(path),
2713+
ttlTimeProvider, p ->
2714+
listing.createProvidedFileStatusIterator(
2715+
dirMetaToStatuses(statusesAssumingNonEmptyDir.getMiddle()),
2716+
ACCEPT_ALL,
2717+
Listing.ACCEPT_ALL_BUT_S3N));
27132718
}
27142719

27152720
/**
@@ -4497,7 +4502,7 @@ private RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
44974502
: null;
44984503
final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
44994504
listing.createProvidedFileStatusIterator(
4500-
S3Guard.dirMetaToStatuses(meta), filter, acceptor);
4505+
dirMetaToStatuses(meta), filter, acceptor);
45014506
return (allowAuthoritative && meta != null
45024507
&& meta.isAuthoritative())
45034508
? listing.createLocatedFileStatusIterator(

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.hadoop.classification.InterfaceAudience;
4343
import org.apache.hadoop.classification.InterfaceStability;
4444
import org.apache.hadoop.conf.Configuration;
45+
import org.apache.hadoop.fs.FileStatus;
4546
import org.apache.hadoop.fs.FileSystem;
4647
import org.apache.hadoop.fs.LocatedFileStatus;
4748
import org.apache.hadoop.fs.Path;
@@ -1416,6 +1417,30 @@ private static void initUserAgent(Configuration conf,
14161417
awsConf.setUserAgentPrefix(userAgent);
14171418
}
14181419

1420+
/**
1421+
* Convert the data of an iterator of {@link S3AFileStatus} to
1422+
* an array. Given tombstones are filtered out. If the iterator
1423+
* does not return any item, an empty array is returned.
1424+
* @param iterator a non-null iterator
1425+
* @param tombstones paths to exclude from the result
1426+
* @return a possibly-empty array of file status entries
1427+
* @throws IOException if the iterator fails while being traversed
1428+
*/
1429+
public static S3AFileStatus[] iteratorToStatuses(
1430+
RemoteIterator<S3AFileStatus> iterator, Set<Path> tombstones)
1431+
throws IOException {
1432+
List<FileStatus> statuses = new ArrayList<>();
1433+
1434+
while (iterator.hasNext()) {
1435+
S3AFileStatus status = iterator.next();
1436+
if (!tombstones.contains(status.getPath())) {
1437+
statuses.add(status);
1438+
}
1439+
}
1440+
1441+
return statuses.toArray(new S3AFileStatus[0]);
1442+
}
1443+
14191444
/**
14201445
* An interface for use in lambda-expressions working with
14211446
* directory tree listings.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java

Lines changed: 18 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -295,30 +295,6 @@ public static BulkOperationState initiateBulkWrite(
295295
}
296296
}
297297

298-
/**
299-
* Convert the data of an iterator of {@link S3AFileStatus} to
300-
* an array. Given tombstones are filtered out. If the iterator
301-
* does return any item, an empty array is returned.
302-
* @param iterator a non-null iterator
303-
* @param tombstones
304-
* @return a possibly-empty array of file status entries
305-
* @throws IOException
306-
*/
307-
public static S3AFileStatus[] iteratorToStatuses(
308-
RemoteIterator<S3AFileStatus> iterator, Set<Path> tombstones)
309-
throws IOException {
310-
List<FileStatus> statuses = new ArrayList<>();
311-
312-
while (iterator.hasNext()) {
313-
S3AFileStatus status = iterator.next();
314-
if (!tombstones.contains(status.getPath())) {
315-
statuses.add(status);
316-
}
317-
}
318-
319-
return statuses.toArray(new S3AFileStatus[0]);
320-
}
321-
322298
/**
323299
* Convert the data of a directory listing to an array of {@link FileStatus}
324300
* entries. Tombstones are filtered out at this point. If the listing is null
@@ -359,17 +335,22 @@ public static S3AFileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) {
359335
* @param dirMeta Directory listing from MetadataStore. May be null.
360336
* @param isAuthoritative State of authoritative mode
361337
* @param timeProvider Time provider to use when updating entries
338+
* @param toStatusItr function to convert array of file status to
339+
* RemoteIterator.
362340
* @return Final result of directory listing.
363341
* @throws IOException if metadata store update failed
364342
*/
365-
public static S3AFileStatus[] dirListingUnion(MetadataStore ms, Path path,
366-
List<S3AFileStatus> backingStatuses, DirListingMetadata dirMeta,
367-
boolean isAuthoritative, ITtlTimeProvider timeProvider)
368-
throws IOException {
343+
public static RemoteIterator<S3AFileStatus> dirListingUnion(
344+
MetadataStore ms, Path path,
345+
RemoteIterator<S3AFileStatus> backingStatuses,
346+
DirListingMetadata dirMeta, boolean isAuthoritative,
347+
ITtlTimeProvider timeProvider,
348+
Function<S3AFileStatus[], RemoteIterator<S3AFileStatus>> toStatusItr)
349+
throws IOException {
369350

370351
// Fast-path for NullMetadataStore
371352
if (isNullMetadataStore(ms)) {
372-
return backingStatuses.toArray(new S3AFileStatus[backingStatuses.size()]);
353+
return backingStatuses;
373354
}
374355

375356
assertQualified(path);
@@ -410,7 +391,7 @@ public static S3AFileStatus[] dirListingUnion(MetadataStore ms, Path path,
410391
}
411392
IOUtils.cleanupWithLogger(LOG, operationState);
412393

413-
return dirMetaToStatuses(dirMeta);
394+
return toStatusItr.apply(dirMetaToStatuses(dirMeta));
414395
}
415396

416397
/**
@@ -429,7 +410,7 @@ public static S3AFileStatus[] dirListingUnion(MetadataStore ms, Path path,
429410
private static void authoritativeUnion(
430411
final MetadataStore ms,
431412
final Path path,
432-
final List<S3AFileStatus> backingStatuses,
413+
final RemoteIterator<S3AFileStatus> backingStatuses,
433414
final DirListingMetadata dirMeta,
434415
final ITtlTimeProvider timeProvider,
435416
final BulkOperationState operationState) throws IOException {
@@ -440,7 +421,8 @@ private static void authoritativeUnion(
440421
Set<Path> deleted = dirMeta.listTombstones();
441422
final Map<Path, PathMetadata> dirMetaMap = dirMeta.getListing().stream()
442423
.collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm));
443-
for (S3AFileStatus s : backingStatuses) {
424+
while (backingStatuses.hasNext()) {
425+
S3AFileStatus s = backingStatuses.next();
444426
final Path statusPath = s.getPath();
445427
if (deleted.contains(statusPath)) {
446428
continue;
@@ -493,16 +475,17 @@ private static void authoritativeUnion(
493475
private static void nonAuthoritativeUnion(
494476
final MetadataStore ms,
495477
final Path path,
496-
final List<S3AFileStatus> backingStatuses,
478+
final RemoteIterator<S3AFileStatus> backingStatuses,
497479
final DirListingMetadata dirMeta,
498480
final ITtlTimeProvider timeProvider,
499481
final BulkOperationState operationState) throws IOException {
500-
List<PathMetadata> entriesToAdd = new ArrayList<>(backingStatuses.size());
482+
List<PathMetadata> entriesToAdd = new ArrayList<>();
501483
Set<Path> deleted = dirMeta.listTombstones();
502484

503485
final Map<Path, PathMetadata> dirMetaMap = dirMeta.getListing().stream()
504486
.collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm));
505-
for (S3AFileStatus s : backingStatuses) {
487+
while (backingStatuses.hasNext()) {
488+
S3AFileStatus s = backingStatuses.next();
506489
final Path statusPath = s.getPath();
507490
if (deleted.contains(statusPath)) {
508491
continue;

0 commit comments

Comments
 (0)