From 4513eabc78650d333dc1aaa00dbffcdca978df82 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Wed, 26 Aug 2020 20:51:02 +0530 Subject: [PATCH 1/5] HADOOP-17023 Tune S3AFileSystem.listStatus() api. S3AFileSystem.listStatus() to perform list operations directly and then fallback to head checks for files --- .../org/apache/hadoop/fs/s3a/Listing.java | 50 ++++++++++++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 71 ++++++++--------- .../fs/s3a/ITestS3AFileOperationCost.java | 77 ++++++++++++++++++- .../fs/s3a/performance/OperationCost.java | 3 + 4 files changed, 159 insertions(+), 42 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java index 16413a7620d0d..56f9d913cf511 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java @@ -24,6 +24,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang3.tuple.Triple; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.LocatedFileStatus; @@ -329,6 +330,55 @@ public RemoteIterator getLocatedFileStatusIteratorForDir( tombstones); } + /** + * Calculate list of file statuses assuming path + * to be a non-empty directory. + * @param path input path. + * @return Triple of file statuses, metaData, auth flag. + * @throws IOException Any IO problems. 
+ */ + public Triple + getFileStatusesAssumingNonEmptyDir(Path path) + throws IOException { + String key = pathToKey(path); + List result; + if (!key.isEmpty()) { + key = key + '/'; + } + + boolean allowAuthoritative = listingOperationCallbacks + .allowAuthoritative(path); + DirListingMetadata dirMeta = + S3Guard.listChildrenWithTtl( + getStoreContext().getMetadataStore(), + path, + listingOperationCallbacks.getUpdatedTtlTimeProvider(), + allowAuthoritative); + // In auth mode return directly with auth flag. + if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) { + return Triple.of(S3Guard.dirMetaToStatuses(dirMeta), + dirMeta, Boolean.TRUE); + } + + S3ListRequest request = createListObjectsRequest(key, "/"); + LOG.debug("listStatus: doing listObjects for directory {}", key); + + Listing.FileStatusListingIterator files = createFileStatusListingIterator( + path, + request, + ACCEPT_ALL, + new Listing.AcceptAllButSelfAndS3nDirs(path)); + result = new ArrayList<>(files.getBatchSize()); + while (files.hasNext()) { + result.add(files.next()); + } + // return the results obtained from s3. 
+ return Triple.of( + result.toArray(new S3AFileStatus[result.size()]), + dirMeta, + Boolean.FALSE); + } + public S3ListRequest createListObjectsRequest(String key, String delimiter) { return listingOperationCallbacks.createListObjectsRequest(key, delimiter); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 63c80bdd067e1..4dea13c8ec3ff 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -31,6 +31,7 @@ import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -2666,50 +2667,44 @@ public FileStatus[] listStatus(Path f) throws FileNotFoundException, * @throws AmazonClientException on failures inside the AWS SDK */ private S3AFileStatus[] innerListStatus(Path f) throws FileNotFoundException, - IOException, AmazonClientException { + IOException, AmazonClientException { Path path = qualify(f); - String key = pathToKey(path); LOG.debug("List status for path: {}", path); entryPoint(INVOCATION_LIST_STATUS); - List result; - final S3AFileStatus fileStatus = innerGetFileStatus(path, false, - StatusProbeEnum.ALL); - - if (fileStatus.isDirectory()) { - if (!key.isEmpty()) { - key = key + '/'; - } - - boolean allowAuthoritative = allowAuthoritative(f); - DirListingMetadata dirMeta = - S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider, - allowAuthoritative); - if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) { - return S3Guard.dirMetaToStatuses(dirMeta); - } - - S3ListRequest request = createListObjectsRequest(key, "/"); - LOG.debug("listStatus: doing listObjects for directory {}", key); - - Listing.FileStatusListingIterator files = - 
listing.createFileStatusListingIterator(path, - request, - ACCEPT_ALL, - new Listing.AcceptAllButSelfAndS3nDirs(path)); - result = new ArrayList<>(files.getBatchSize()); - while (files.hasNext()) { - result.add(files.next()); + Triple + statusesAssumingNonEmptyDir = listing + .getFileStatusesAssumingNonEmptyDir(path); + + if (statusesAssumingNonEmptyDir.getLeft().length == 0 && + statusesAssumingNonEmptyDir.getRight()) { + // We are sure that this is an empty directory in auth mode. + return statusesAssumingNonEmptyDir.getLeft(); + } + else if (statusesAssumingNonEmptyDir.getLeft().length == 0) { + // We may have an empty dir, or may have file or may have nothing. + // So we call innerGetFileStatus to get the status, this may throw + // FileNotFoundException if we have nothing. + // So We are guaranteed to have either a dir marker or a file. + final S3AFileStatus fileStatus = innerGetFileStatus(path, false, + StatusProbeEnum.ALL); + // If it is a file return directly. + if (fileStatus.isFile()) { + LOG.debug("Adding: rd (not a dir): {}", path); + S3AFileStatus[] stats = new S3AFileStatus[1]; + stats[0] = fileStatus; + return stats; } - // merge the results. This will update the store as needed - return S3Guard.dirListingUnion(metadataStore, path, result, dirMeta, - allowAuthoritative, ttlTimeProvider); - } else { - LOG.debug("Adding: rd (not a dir): {}", path); - S3AFileStatus[] stats = new S3AFileStatus[1]; - stats[0]= fileStatus; - return stats; } + // Here we have a directory which may or may not be empty. + // So we update the metastore and return. 
+ return S3Guard.dirListingUnion( + metadataStore, + path, + Arrays.asList(statusesAssumingNonEmptyDir.getLeft()), + statusesAssumingNonEmptyDir.getMiddle(), + allowAuthoritative(path), + ttlTimeProvider); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java index 46e6f5fcea74f..5f3b77efb445e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java @@ -176,6 +176,77 @@ public void testCostOfListFilesOnNonExistingDir() throws Throwable { .plus(GET_FILE_STATUS_FNFE))); } + @Test + public void testCostOfListStatusOnFile() throws Throwable { + describe("Performing listStatus() on a file"); + Path file = path(getMethodName() + ".txt"); + S3AFileSystem fs = getFileSystem(); + touch(fs, file); + verifyMetrics(() -> + fs.listStatus(file), + whenRaw(LIST_STATUS_LIST_OP + .plus(GET_FILE_STATUS_ON_FILE)), + whenAuthoritative(LIST_STATUS_LIST_OP), + whenNonauth(LIST_STATUS_LIST_OP)); +// resetMetricDiffs(); +// fs.listStatus(file); +// if (!fs.hasMetadataStore()) { +// metadataRequests.assertDiffEquals(1); +// } +// listRequests.assertDiffEquals(1); + } + + @Test + public void testCostOfListStatusOnEmptyDir() throws Throwable { + describe("Performing listStatus() on an empty dir"); + Path dir = path(getMethodName()); + S3AFileSystem fs = getFileSystem(); + fs.mkdirs(dir); + verifyMetrics(() -> + fs.listStatus(dir), + whenRaw(LIST_STATUS_LIST_OP + .plus(GET_FILE_STATUS_ON_EMPTY_DIR)), + whenAuthoritative(NO_IO), + whenNonauth(LIST_STATUS_LIST_OP)); +// resetMetricDiffs(); +// fs.listStatus(dir); +// if (!fs.hasMetadataStore()) { +// verifyOperationCount(2, 1); +// } else { +// if (fs.allowAuthoritative(dir)) { +// verifyOperationCount(0, 0); +// } else { +// 
verifyOperationCount(0, 1); +// } +// } + } + + @Test + public void testCostOfListStatusOnNonEmptyDir() throws Throwable { + describe("Performing listStatus() on a non empty dir"); + Path dir = path(getMethodName()); + S3AFileSystem fs = getFileSystem(); + fs.mkdirs(dir); + Path file = new Path(dir, "file.txt"); + touch(fs, file); + verifyMetrics(() -> + fs.listStatus(dir), + whenRaw(LIST_STATUS_LIST_OP), + whenAuthoritative(NO_IO), + whenNonauth(LIST_STATUS_LIST_OP)); +// resetMetricDiffs(); +// fs.listStatus(dir); +// if (!fs.hasMetadataStore()) { +// verifyOperationCount(0, 1); +// } else { +// if (fs.allowAuthoritative(dir)) { +// verifyOperationCount(0, 0); +// } else { +// verifyOperationCount(0, 1); +// } +// } + } + @Test public void testCostOfGetFileStatusOnFile() throws Throwable { describe("performing getFileStatus on a file"); @@ -406,8 +477,7 @@ public void testCostOfGlobStatus() throws Throwable { fs.globStatus(basePath.suffix("/*")); // 2 head + 1 list from getFileStatus on path, // plus 1 list to match the glob pattern - verifyRaw(GET_FILE_STATUS_ON_DIR - .plus(LIST_OPERATION), + verifyRaw(LIST_STATUS_LIST_OP, () -> fs.globStatus(basePath.suffix("/*"))); } @@ -426,8 +496,7 @@ public void testCostOfGlobStatusNoSymlinkResolution() throws Throwable { // unguarded: 2 head + 1 list from getFileStatus on path, // plus 1 list to match the glob pattern // no additional operations from symlink resolution - verifyRaw(GET_FILE_STATUS_ON_DIR - .plus(LIST_OPERATION), + verifyRaw(LIST_STATUS_LIST_OP, () -> fs.globStatus(basePath.suffix("/*"))); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java index 46a6b712c49bf..d528111d3e901 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java +++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java @@ -110,6 +110,9 @@ public final class OperationCost { public static final OperationCost LIST_FILES_LIST_OP = new OperationCost(0, 1); + /** listStatus always does a LIST. */ + public static final OperationCost LIST_STATUS_LIST_OP = + new OperationCost(0, 1); /** * Metadata cost of a copy operation, as used during rename. * This happens even if the store is guarded. From 5a73520bb3bc7b5bbd8343d148fde544e64599c5 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Mon, 31 Aug 2020 16:36:39 +0530 Subject: [PATCH 2/5] Making innerListStatus to return RemoteIterator of S3AFileStatus --- .../org/apache/hadoop/fs/s3a/Listing.java | 23 +- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 26 +- .../org/apache/hadoop/fs/s3a/S3AUtils.java | 25 ++ .../apache/hadoop/fs/s3a/s3guard/S3Guard.java | 56 +-- .../fs/s3a/ITestS3AFileOperationCost.java | 28 -- .../apache/hadoop/fs/s3a/S3ATestUtils.java | 357 +++++++++++++++++ .../s3a/impl/TestPartialDeleteFailures.java | 366 +----------------- .../fs/s3a/performance/OperationCost.java | 6 +- .../hadoop/fs/s3a/s3guard/TestS3Guard.java | 99 ++++- 9 files changed, 528 insertions(+), 458 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java index 56f9d913cf511..4d9938a7c6d93 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java @@ -76,7 +76,7 @@ public class Listing extends AbstractStoreOperation { private static final Logger LOG = S3AFileSystem.LOG; - static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N = + public static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N = new AcceptAllButS3nDirs(); private final ListingOperationCallbacks listingOperationCallbacks; @@ -96,7 +96,7 @@ public 
Listing(ListingOperationCallbacks listingOperationCallbacks, * @param acceptor the file status acceptor * @return the file status iterator */ - ProvidedFileStatusIterator createProvidedFileStatusIterator( + public ProvidedFileStatusIterator createProvidedFileStatusIterator( S3AFileStatus[] fileStatuses, PathFilter filter, FileStatusAcceptor acceptor) { @@ -251,7 +251,7 @@ public RemoteIterator getListFilesAssumingDir( if (!forceNonAuthoritativeMS && allowAuthoritative && metadataStoreListFilesIterator.isRecursivelyAuthoritative()) { - S3AFileStatus[] statuses = S3Guard.iteratorToStatuses( + S3AFileStatus[] statuses = S3AUtils.iteratorToStatuses( metadataStoreListFilesIterator, tombstones); cachedFilesIterator = createProvidedFileStatusIterator( statuses, ACCEPT_ALL, acceptor); @@ -337,7 +337,7 @@ public RemoteIterator getLocatedFileStatusIteratorForDir( * @return Triple of file statuses, metaData, auth flag. * @throws IOException Any IO problems. */ - public Triple + public Triple, DirListingMetadata, Boolean> getFileStatusesAssumingNonEmptyDir(Path path) throws IOException { String key = pathToKey(path); @@ -356,25 +356,26 @@ public RemoteIterator getLocatedFileStatusIteratorForDir( allowAuthoritative); // In auth mode return directly with auth flag. 
if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) { - return Triple.of(S3Guard.dirMetaToStatuses(dirMeta), + ProvidedFileStatusIterator mfsItr = createProvidedFileStatusIterator( + S3Guard.dirMetaToStatuses(dirMeta), + ACCEPT_ALL, + Listing.ACCEPT_ALL_BUT_S3N); + return Triple.of(mfsItr, dirMeta, Boolean.TRUE); } S3ListRequest request = createListObjectsRequest(key, "/"); LOG.debug("listStatus: doing listObjects for directory {}", key); - Listing.FileStatusListingIterator files = createFileStatusListingIterator( + Listing.FileStatusListingIterator filesItr = createFileStatusListingIterator( path, request, ACCEPT_ALL, new Listing.AcceptAllButSelfAndS3nDirs(path)); - result = new ArrayList<>(files.getBatchSize()); - while (files.hasNext()) { - result.add(files.next()); - } + // return the results obtained from s3. return Triple.of( - result.toArray(new S3AFileStatus[result.size()]), + filesItr, dirMeta, Boolean.FALSE); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 4dea13c8ec3ff..0cfa7764344f4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -36,6 +36,7 @@ import java.util.Collections; import java.util.Date; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -2653,7 +2654,9 @@ void maybeCreateFakeParentDirectory(Path path) */ public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { - return once("listStatus", f.toString(), () -> innerListStatus(f)); + return once("listStatus", + f.toString(), + () -> iteratorToStatuses(innerListStatus(f), new HashSet<>())); } /** @@ -2666,22 +2669,22 @@ public FileStatus[] listStatus(Path f) throws FileNotFoundException, * @throws 
IOException due to an IO problem. * @throws AmazonClientException on failures inside the AWS SDK */ - private S3AFileStatus[] innerListStatus(Path f) throws FileNotFoundException, + private RemoteIterator innerListStatus(Path f) throws FileNotFoundException, IOException, AmazonClientException { Path path = qualify(f); LOG.debug("List status for path: {}", path); entryPoint(INVOCATION_LIST_STATUS); - Triple + Triple, DirListingMetadata, Boolean> statusesAssumingNonEmptyDir = listing .getFileStatusesAssumingNonEmptyDir(path); - if (statusesAssumingNonEmptyDir.getLeft().length == 0 && + if (!statusesAssumingNonEmptyDir.getLeft().hasNext() && statusesAssumingNonEmptyDir.getRight()) { // We are sure that this is an empty directory in auth mode. return statusesAssumingNonEmptyDir.getLeft(); } - else if (statusesAssumingNonEmptyDir.getLeft().length == 0) { + else if (!statusesAssumingNonEmptyDir.getLeft().hasNext()) { // We may have an empty dir, or may have file or may have nothing. // So we call innerGetFileStatus to get the status, this may throw // FileNotFoundException if we have nothing. @@ -2693,18 +2696,23 @@ else if (statusesAssumingNonEmptyDir.getLeft().length == 0) { LOG.debug("Adding: rd (not a dir): {}", path); S3AFileStatus[] stats = new S3AFileStatus[1]; stats[0] = fileStatus; - return stats; + return listing.createProvidedFileStatusIterator( + stats, + ACCEPT_ALL, + Listing.ACCEPT_ALL_BUT_S3N); } } // Here we have a directory which may or may not be empty. // So we update the metastore and return. 
- return S3Guard.dirListingUnion( + RemoteIterator combinedRes = S3Guard.dirListingUnion( metadataStore, path, - Arrays.asList(statusesAssumingNonEmptyDir.getLeft()), + statusesAssumingNonEmptyDir.getLeft(), statusesAssumingNonEmptyDir.getMiddle(), allowAuthoritative(path), - ttlTimeProvider); + ttlTimeProvider, + listing); + return combinedRes; } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 1d399505f5823..3e9115c12b3a0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -42,6 +42,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -1416,6 +1417,30 @@ private static void initUserAgent(Configuration conf, awsConf.setUserAgentPrefix(userAgent); } + /** + * Convert the data of an iterator of {@link S3AFileStatus} to + * an array. Given tombstones are filtered out. If the iterator + * does not return any item, an empty array is returned. + * @param iterator a non-null iterator + * @param tombstones paths to exclude from the result + * @return a possibly-empty array of file status entries + * @throws IOException if reading from the iterator fails + */ + public static S3AFileStatus[] iteratorToStatuses( + RemoteIterator iterator, Set tombstones) + throws IOException { + List statuses = new ArrayList<>(); + + while (iterator.hasNext()) { + S3AFileStatus status = iterator.next(); + if (!tombstones.contains(status.getPath())) { + statuses.add(status); + } + } + + return statuses.toArray(new S3AFileStatus[0]); + } + /** * An interface for use in lambda-expressions working with * directory tree listings. 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java index ae5c293d639ff..cfea824f08d2c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java @@ -39,6 +39,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.Listing; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +59,7 @@ import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_AUTHORITATIVE_PATH; +import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL; import static org.apache.hadoop.fs.s3a.S3AUtils.createUploadFileStatus; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.authoritativeEmptyDirectoryMarker; import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_BAD_CONFIGURATION; @@ -295,30 +297,6 @@ public static BulkOperationState initiateBulkWrite( } } - /** - * Convert the data of an iterator of {@link S3AFileStatus} to - * an array. Given tombstones are filtered out. If the iterator - * does return any item, an empty array is returned. 
- * @param iterator a non-null iterator - * @param tombstones - * @return a possibly-empty array of file status entries - * @throws IOException - */ - public static S3AFileStatus[] iteratorToStatuses( - RemoteIterator iterator, Set tombstones) - throws IOException { - List statuses = new ArrayList<>(); - - while (iterator.hasNext()) { - S3AFileStatus status = iterator.next(); - if (!tombstones.contains(status.getPath())) { - statuses.add(status); - } - } - - return statuses.toArray(new S3AFileStatus[0]); - } - /** * Convert the data of a directory listing to an array of {@link FileStatus} * entries. Tombstones are filtered out at this point. If the listing is null @@ -362,14 +340,16 @@ public static S3AFileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) { * @return Final result of directory listing. * @throws IOException if metadata store update failed */ - public static S3AFileStatus[] dirListingUnion(MetadataStore ms, Path path, - List backingStatuses, DirListingMetadata dirMeta, - boolean isAuthoritative, ITtlTimeProvider timeProvider) - throws IOException { + public static RemoteIterator dirListingUnion( + MetadataStore ms, Path path, + RemoteIterator backingStatuses, + DirListingMetadata dirMeta, boolean isAuthoritative, + ITtlTimeProvider timeProvider, Listing listing) + throws IOException { // Fast-path for NullMetadataStore if (isNullMetadataStore(ms)) { - return backingStatuses.toArray(new S3AFileStatus[backingStatuses.size()]); + return backingStatuses; } assertQualified(path); @@ -409,8 +389,10 @@ public static S3AFileStatus[] dirListingUnion(MetadataStore ms, Path path, timeProvider, operationState); } IOUtils.cleanupWithLogger(LOG, operationState); - - return dirMetaToStatuses(dirMeta); + return listing.createProvidedFileStatusIterator( + dirMetaToStatuses(dirMeta), + ACCEPT_ALL, + Listing.ACCEPT_ALL_BUT_S3N); } /** @@ -429,7 +411,7 @@ public static S3AFileStatus[] dirListingUnion(MetadataStore ms, Path path, private static void 
authoritativeUnion( final MetadataStore ms, final Path path, - final List backingStatuses, + final RemoteIterator backingStatuses, final DirListingMetadata dirMeta, final ITtlTimeProvider timeProvider, final BulkOperationState operationState) throws IOException { @@ -440,7 +422,8 @@ private static void authoritativeUnion( Set deleted = dirMeta.listTombstones(); final Map dirMetaMap = dirMeta.getListing().stream() .collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm)); - for (S3AFileStatus s : backingStatuses) { + while (backingStatuses.hasNext()) { + S3AFileStatus s = backingStatuses.next(); final Path statusPath = s.getPath(); if (deleted.contains(statusPath)) { continue; @@ -493,16 +476,17 @@ private static void authoritativeUnion( private static void nonAuthoritativeUnion( final MetadataStore ms, final Path path, - final List backingStatuses, + final RemoteIterator backingStatuses, final DirListingMetadata dirMeta, final ITtlTimeProvider timeProvider, final BulkOperationState operationState) throws IOException { - List entriesToAdd = new ArrayList<>(backingStatuses.size()); + List entriesToAdd = new ArrayList<>(); Set deleted = dirMeta.listTombstones(); final Map dirMetaMap = dirMeta.getListing().stream() .collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm)); - for (S3AFileStatus s : backingStatuses) { + while (backingStatuses.hasNext()) { + S3AFileStatus s = backingStatuses.next(); final Path statusPath = s.getPath(); if (deleted.contains(statusPath)) { continue; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java index 5f3b77efb445e..941e701333b09 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java @@ -188,12 +188,6 @@ 
public void testCostOfListStatusOnFile() throws Throwable { .plus(GET_FILE_STATUS_ON_FILE)), whenAuthoritative(LIST_STATUS_LIST_OP), whenNonauth(LIST_STATUS_LIST_OP)); -// resetMetricDiffs(); -// fs.listStatus(file); -// if (!fs.hasMetadataStore()) { -// metadataRequests.assertDiffEquals(1); -// } -// listRequests.assertDiffEquals(1); } @Test @@ -208,17 +202,6 @@ public void testCostOfListStatusOnEmptyDir() throws Throwable { .plus(GET_FILE_STATUS_ON_EMPTY_DIR)), whenAuthoritative(NO_IO), whenNonauth(LIST_STATUS_LIST_OP)); -// resetMetricDiffs(); -// fs.listStatus(dir); -// if (!fs.hasMetadataStore()) { -// verifyOperationCount(2, 1); -// } else { -// if (fs.allowAuthoritative(dir)) { -// verifyOperationCount(0, 0); -// } else { -// verifyOperationCount(0, 1); -// } -// } } @Test @@ -234,17 +217,6 @@ public void testCostOfListStatusOnNonEmptyDir() throws Throwable { whenRaw(LIST_STATUS_LIST_OP), whenAuthoritative(NO_IO), whenNonauth(LIST_STATUS_LIST_OP)); -// resetMetricDiffs(); -// fs.listStatus(dir); -// if (!fs.hasMetadataStore()) { -// verifyOperationCount(0, 1); -// } else { -// if (fs.allowAuthoritative(dir)) { -// verifyOperationCount(0, 0); -// } else { -// verifyOperationCount(0, 1); -// } -// } } @Test diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index f225800b872f3..3caf4b6f35373 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -35,18 +35,40 @@ import org.apache.hadoop.fs.s3a.auth.MarshalledCredentials; import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; +import org.apache.hadoop.fs.s3a.impl.ContextAccessors; +import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks; +import 
org.apache.hadoop.fs.s3a.impl.OperationCallbacks; import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum; +import org.apache.hadoop.fs.s3a.impl.StoreContext; +import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder; +import org.apache.hadoop.fs.s3a.impl.TestPartialDeleteFailures; +import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState; +import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata; +import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider; import org.apache.hadoop.fs.s3a.s3guard.MetadataStore; import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreCapabilities; +import org.apache.hadoop.fs.s3a.s3guard.PathMetadata; +import org.apache.hadoop.fs.s3a.s3guard.RenameTracker; +import org.apache.hadoop.fs.s3a.s3guard.S3Guard; import org.apache.hadoop.fs.s3native.S3xLoginHelper; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service; import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.util.BlockingThreadPoolExecutorService; import org.apache.hadoop.util.ReflectionUtils; +import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.DeleteObjectsResult; +import com.amazonaws.services.s3.model.MultiObjectDeleteException; +import com.amazonaws.services.s3.transfer.model.CopyResult; +import javax.annotation.Nullable; import org.hamcrest.core.Is; import org.junit.Assert; import org.junit.Assume; @@ -61,12 +83,15 @@ import java.net.URISyntaxException; import java.text.DateFormat; import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; import 
java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkNotNull; @@ -894,6 +919,49 @@ public static S3AFileStatus getStatusWithEmptyDirFlag( StatusProbeEnum.ALL); } + /** + * Create mock implementation of store context. + * @param multiDelete + * @param store + * @param accessors + * @return + * @throws URISyntaxException + * @throws IOException + */ + public static StoreContext createMockStoreContext( + boolean multiDelete, + OperationTrackingStore store, + ContextAccessors accessors) + throws URISyntaxException, IOException { + URI name = new URI("s3a://bucket"); + Configuration conf = new Configuration(); + return new StoreContextBuilder().setFsURI(name) + .setBucket("bucket") + .setConfiguration(conf) + .setUsername("alice") + .setOwner(UserGroupInformation.getCurrentUser()) + .setExecutor(BlockingThreadPoolExecutorService.newInstance( + 4, + 4, + 10, TimeUnit.SECONDS, + "s3a-transfer-shared")) + .setExecutorCapacity(DEFAULT_EXECUTOR_CAPACITY) + .setInvoker( + new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, Invoker.LOG_EVENT)) + .setInstrumentation(new S3AInstrumentation(name)) + .setStorageStatistics(new S3AStorageStatistics()) + .setInputPolicy(S3AInputPolicy.Normal) + .setChangeDetectionPolicy( + ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.None, + ChangeDetectionPolicy.Source.ETag, false)) + .setMultiObjectDeleteEnabled(multiDelete) + .setMetadataStore(store) + .setUseListV1(false) + .setContextAccessors(accessors) + .setTimeProvider(new S3Guard.TtlTimeProvider(conf)) + .build(); + } + /** * Helper class to do diffs of metrics. 
*/ @@ -1472,4 +1540,293 @@ public static S3AFileStatus innerGetFileStatus( needEmptyDirectoryFlag, probes); } + + public static class MinimalOperationCallbacks + implements OperationCallbacks { + @Override + public S3ObjectAttributes createObjectAttributes( + Path path, + String eTag, + String versionId, + long len) { + return null; + } + + @Override + public S3ObjectAttributes createObjectAttributes( + S3AFileStatus fileStatus) { + return null; + } + + @Override + public S3AReadOpContext createReadContext( + FileStatus fileStatus) { + return null; + } + + @Override + public void finishRename( + Path sourceRenamed, + Path destCreated) + throws IOException { + + } + + @Override + public void deleteObjectAtPath( + Path path, + String key, + boolean isFile, + BulkOperationState operationState) + throws IOException { + + } + + @Override + public RemoteIterator listFilesAndEmptyDirectories( + Path path, + S3AFileStatus status, + boolean collectTombstones, + boolean includeSelf) + throws IOException { + return null; + } + + @Override + public CopyResult copyFile( + String srcKey, + String destKey, + S3ObjectAttributes srcAttributes, + S3AReadOpContext readContext) + throws IOException { + return null; + } + + @Override + public DeleteObjectsResult removeKeys( + List keysToDelete, + boolean deleteFakeDir, + List undeletedObjectsOnFailure, + BulkOperationState operationState, + boolean quiet) + throws MultiObjectDeleteException, AmazonClientException, + IOException { + return null; + } + + @Override + public boolean allowAuthoritative(Path p) { + return false; + } + + @Override + public RemoteIterator listObjects( + Path path, + String key) + throws IOException { + return null; + } + } + + /** + * MetadataStore which tracks what is deleted and added. 
+ */ + public static class OperationTrackingStore implements MetadataStore { + + private final List deleted = new ArrayList<>(); + + private final List created = new ArrayList<>(); + + @Override + public void initialize(final FileSystem fs, + ITtlTimeProvider ttlTimeProvider) { + } + + @Override + public void initialize(final Configuration conf, + ITtlTimeProvider ttlTimeProvider) { + } + + @Override + public void forgetMetadata(final Path path) { + } + + @Override + public PathMetadata get(final Path path) { + return null; + } + + @Override + public PathMetadata get(final Path path, + final boolean wantEmptyDirectoryFlag) { + return null; + } + + @Override + public DirListingMetadata listChildren(final Path path) { + return null; + } + + @Override + public void put(final PathMetadata meta) { + put(meta, null); + } + + @Override + public void put(final PathMetadata meta, + final BulkOperationState operationState) { + created.add(meta.getFileStatus().getPath()); + } + + @Override + public void put(final Collection metas, + final BulkOperationState operationState) { + metas.stream().forEach(meta -> put(meta, null)); + } + + @Override + public void put(final DirListingMetadata meta, + final List unchangedEntries, + final BulkOperationState operationState) { + created.add(meta.getPath()); + } + + @Override + public void destroy() { + } + + @Override + public void delete(final Path path, + final BulkOperationState operationState) { + deleted.add(path); + } + + @Override + public void deletePaths(final Collection paths, + @Nullable final BulkOperationState operationState) + throws IOException { + deleted.addAll(paths); + } + + @Override + public void deleteSubtree(final Path path, + final BulkOperationState operationState) { + + } + + @Override + public void move(@Nullable final Collection pathsToDelete, + @Nullable final Collection pathsToCreate, + @Nullable final BulkOperationState operationState) { + } + + @Override + public void prune(final PruneMode pruneMode, final 
long cutoff) { + } + + @Override + public long prune(final PruneMode pruneMode, + final long cutoff, + final String keyPrefix) { + return 0; + } + + @Override + public BulkOperationState initiateBulkWrite( + final BulkOperationState.OperationType operation, + final Path dest) { + return null; + } + + @Override + public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) { + } + + @Override + public Map getDiagnostics() { + return null; + } + + @Override + public void updateParameters(final Map parameters) { + } + + @Override + public void close() { + } + + public List getDeleted() { + return deleted; + } + + public List getCreated() { + return created; + } + + @Override + public RenameTracker initiateRenameOperation( + final StoreContext storeContext, + final Path source, + final S3AFileStatus sourceStatus, + final Path dest) { + throw new UnsupportedOperationException("unsupported"); + } + + @Override + public void addAncestors(final Path qualifiedPath, + @Nullable final BulkOperationState operationState) { + + } + } + + public static class MinimalListingOperationCallbacks + implements ListingOperationCallbacks { + @Override + public CompletableFuture listObjectsAsync( + S3ListRequest request) + throws IOException { + return null; + } + + @Override + public CompletableFuture continueListObjectsAsync( + S3ListRequest request, + S3ListResult prevResult) + throws IOException { + return null; + } + + @Override + public S3ALocatedFileStatus toLocatedFileStatus( + S3AFileStatus status) throws IOException { + return null; + } + + @Override + public S3ListRequest createListObjectsRequest( + String key, + String delimiter) { + return null; + } + + @Override + public long getDefaultBlockSize(Path path) { + return 0; + } + + @Override + public int getMaxKeys() { + return 0; + } + + @Override + public ITtlTimeProvider getUpdatedTtlTimeProvider() { + return null; + } + + @Override + public boolean allowAuthoritative(Path p) { + return false; + } + } } diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java index 0729f2ac289db..cdf79277f242b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java @@ -18,26 +18,15 @@ package org.apache.hadoop.fs.s3a.impl; -import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.DeleteObjectsResult; import com.amazonaws.services.s3.model.MultiObjectDeleteException; -import com.amazonaws.services.s3.transfer.model.CopyResult; import com.google.common.collect.Lists; import org.assertj.core.api.Assertions; import org.junit.Before; @@ -45,32 +34,8 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.tuple.Triple; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.fs.s3a.Constants; -import org.apache.hadoop.fs.s3a.Invoker; -import org.apache.hadoop.fs.s3a.S3AFileStatus; -import org.apache.hadoop.fs.s3a.S3AInputPolicy; -import org.apache.hadoop.fs.s3a.S3AInstrumentation; -import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus; -import org.apache.hadoop.fs.s3a.S3AReadOpContext; 
-import org.apache.hadoop.fs.s3a.S3AStorageStatistics; -import org.apache.hadoop.fs.s3a.S3ListRequest; -import org.apache.hadoop.fs.s3a.S3ListResult; -import org.apache.hadoop.fs.s3a.S3ObjectAttributes; -import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState; -import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata; -import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider; -import org.apache.hadoop.fs.s3a.s3guard.MetadataStore; -import org.apache.hadoop.fs.s3a.s3guard.PathMetadata; -import org.apache.hadoop.fs.s3a.s3guard.RenameTracker; -import org.apache.hadoop.fs.s3a.s3guard.S3Guard; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.BlockingThreadPoolExecutorService; +import org.apache.hadoop.fs.s3a.S3ATestUtils; import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.ACCESS_DENIED; import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.removeUndeletedPaths; @@ -93,8 +58,8 @@ private static Path qualifyKey(String k) { @Before public void setUp() throws Exception { - context = createMockStoreContext(true, - new OperationTrackingStore()); + context = S3ATestUtils.createMockStoreContext(true, + new S3ATestUtils.OperationTrackingStore(), CONTEXT_ACCESSORS); } @Test @@ -187,9 +152,10 @@ public void testProcessDeleteFailure() throws Throwable { final List deleteAllowed = Lists.newArrayList(pathA, pathAC); MultiObjectDeleteException ex = createDeleteException(ACCESS_DENIED, deleteForbidden); - OperationTrackingStore store - = new OperationTrackingStore(); - StoreContext storeContext = createMockStoreContext(true, store); + S3ATestUtils.OperationTrackingStore store + = new S3ATestUtils.OperationTrackingStore(); + StoreContext storeContext = S3ATestUtils + .createMockStoreContext(true, store, CONTEXT_ACCESSORS); MultiObjectDeleteSupport deleteSupport = new MultiObjectDeleteSupport(storeContext, null); Triple, List, List>> @@ -210,174 +176,6 @@ public 
void testProcessDeleteFailure() throws Throwable { } - private StoreContext createMockStoreContext(boolean multiDelete, - OperationTrackingStore store) throws URISyntaxException, IOException { - URI name = new URI("s3a://bucket"); - Configuration conf = new Configuration(); - return new StoreContextBuilder().setFsURI(name) - .setBucket("bucket") - .setConfiguration(conf) - .setUsername("alice") - .setOwner(UserGroupInformation.getCurrentUser()) - .setExecutor(BlockingThreadPoolExecutorService.newInstance( - 4, - 4, - 10, TimeUnit.SECONDS, - "s3a-transfer-shared")) - .setExecutorCapacity(Constants.DEFAULT_EXECUTOR_CAPACITY) - .setInvoker( - new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, Invoker.LOG_EVENT)) - .setInstrumentation(new S3AInstrumentation(name)) - .setStorageStatistics(new S3AStorageStatistics()) - .setInputPolicy(S3AInputPolicy.Normal) - .setChangeDetectionPolicy( - ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.None, - ChangeDetectionPolicy.Source.ETag, false)) - .setMultiObjectDeleteEnabled(multiDelete) - .setMetadataStore(store) - .setUseListV1(false) - .setContextAccessors(CONTEXT_ACCESSORS) - .setTimeProvider(new S3Guard.TtlTimeProvider(conf)) - .build(); - } - - private static class MinimalListingOperationCallbacks - implements ListingOperationCallbacks { - @Override - public CompletableFuture listObjectsAsync( - S3ListRequest request) - throws IOException { - return null; - } - - @Override - public CompletableFuture continueListObjectsAsync( - S3ListRequest request, - S3ListResult prevResult) - throws IOException { - return null; - } - - @Override - public S3ALocatedFileStatus toLocatedFileStatus( - S3AFileStatus status) throws IOException { - return null; - } - - @Override - public S3ListRequest createListObjectsRequest( - String key, - String delimiter) { - return null; - } - - @Override - public long getDefaultBlockSize(Path path) { - return 0; - } - - @Override - public int getMaxKeys() { - return 0; - } - - @Override - public 
ITtlTimeProvider getUpdatedTtlTimeProvider() { - return null; - } - - @Override - public boolean allowAuthoritative(Path p) { - return false; - } - } - private static class MinimalOperationCallbacks - implements OperationCallbacks { - @Override - public S3ObjectAttributes createObjectAttributes( - Path path, - String eTag, - String versionId, - long len) { - return null; - } - - @Override - public S3ObjectAttributes createObjectAttributes( - S3AFileStatus fileStatus) { - return null; - } - - @Override - public S3AReadOpContext createReadContext( - FileStatus fileStatus) { - return null; - } - - @Override - public void finishRename( - Path sourceRenamed, - Path destCreated) - throws IOException { - - } - - @Override - public void deleteObjectAtPath( - Path path, - String key, - boolean isFile, - BulkOperationState operationState) - throws IOException { - - } - - @Override - public RemoteIterator listFilesAndEmptyDirectories( - Path path, - S3AFileStatus status, - boolean collectTombstones, - boolean includeSelf) - throws IOException { - return null; - } - - @Override - public CopyResult copyFile( - String srcKey, - String destKey, - S3ObjectAttributes srcAttributes, - S3AReadOpContext readContext) - throws IOException { - return null; - } - - @Override - public DeleteObjectsResult removeKeys( - List keysToDelete, - boolean deleteFakeDir, - List undeletedObjectsOnFailure, - BulkOperationState operationState, - boolean quiet) - throws MultiObjectDeleteException, AmazonClientException, - IOException { - return null; - } - - @Override - public boolean allowAuthoritative(Path p) { - return false; - } - - @Override - public RemoteIterator listObjects( - Path path, - String key) - throws IOException { - return null; - } - } - private static class MinimalContextAccessor implements ContextAccessors { @Override @@ -406,155 +204,5 @@ public Path makeQualified(final Path path) { return path; } } - /** - * MetadataStore which tracks what is deleted and added. 
- */ - private static class OperationTrackingStore implements MetadataStore { - - private final List deleted = new ArrayList<>(); - - private final List created = new ArrayList<>(); - - @Override - public void initialize(final FileSystem fs, - ITtlTimeProvider ttlTimeProvider) { - } - - @Override - public void initialize(final Configuration conf, - ITtlTimeProvider ttlTimeProvider) { - } - - @Override - public void forgetMetadata(final Path path) { - } - - @Override - public PathMetadata get(final Path path) { - return null; - } - - @Override - public PathMetadata get(final Path path, - final boolean wantEmptyDirectoryFlag) { - return null; - } - - @Override - public DirListingMetadata listChildren(final Path path) { - return null; - } - - @Override - public void put(final PathMetadata meta) { - put(meta, null); - } - - @Override - public void put(final PathMetadata meta, - final BulkOperationState operationState) { - created.add(meta.getFileStatus().getPath()); - } - - @Override - public void put(final Collection metas, - final BulkOperationState operationState) { - metas.stream().forEach(meta -> put(meta, null)); - } - - @Override - public void put(final DirListingMetadata meta, - final List unchangedEntries, - final BulkOperationState operationState) { - created.add(meta.getPath()); - } - - @Override - public void destroy() { - } - - @Override - public void delete(final Path path, - final BulkOperationState operationState) { - deleted.add(path); - } - - @Override - public void deletePaths(final Collection paths, - @Nullable final BulkOperationState operationState) - throws IOException { - deleted.addAll(paths); - } - - @Override - public void deleteSubtree(final Path path, - final BulkOperationState operationState) { - - } - - @Override - public void move(@Nullable final Collection pathsToDelete, - @Nullable final Collection pathsToCreate, - @Nullable final BulkOperationState operationState) { - } - - @Override - public void prune(final PruneMode pruneMode, 
final long cutoff) { - } - - @Override - public long prune(final PruneMode pruneMode, - final long cutoff, - final String keyPrefix) { - return 0; - } - - @Override - public BulkOperationState initiateBulkWrite( - final BulkOperationState.OperationType operation, - final Path dest) { - return null; - } - - @Override - public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) { - } - - @Override - public Map getDiagnostics() { - return null; - } - - @Override - public void updateParameters(final Map parameters) { - } - - @Override - public void close() { - } - - public List getDeleted() { - return deleted; - } - - public List getCreated() { - return created; - } - - @Override - public RenameTracker initiateRenameOperation( - final StoreContext storeContext, - final Path source, - final S3AFileStatus sourceStatus, - final Path dest) { - throw new UnsupportedOperationException("unsupported"); - } - - @Override - public void addAncestors(final Path qualifiedPath, - @Nullable final BulkOperationState operationState) { - - } - } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java index d528111d3e901..54b68663fe9b8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/OperationCost.java @@ -107,12 +107,10 @@ public final class OperationCost { new OperationCost(0, 1); /** listFiles always does a LIST. */ - public static final OperationCost LIST_FILES_LIST_OP = - new OperationCost(0, 1); + public static final OperationCost LIST_FILES_LIST_OP = LIST_OPERATION; /** listStatus always does a LIST. 
*/ - public static final OperationCost LIST_STATUS_LIST_OP = - new OperationCost(0, 1); + public static final OperationCost LIST_STATUS_LIST_OP = LIST_OPERATION; /** * Metadata cost of a copy operation, as used during rename. * This happens even if the store is guarded. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java index 672f3a9220954..dcebb7fe63cc4 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java @@ -18,11 +18,13 @@ package org.apache.hadoop.fs.s3a.s3guard; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.UUID; @@ -39,14 +41,21 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.Listing; import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.s3a.S3AUtils; import org.apache.hadoop.fs.s3a.Tristate; +import org.apache.hadoop.fs.s3a.impl.ContextAccessors; import org.apache.hadoop.service.launcher.LauncherExitCodes; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.ExitUtil; import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL; import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL; +import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.createMockStoreContext; import static org.mockito.Mockito.mock; import static 
org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -72,6 +81,11 @@ public class TestS3Guard extends Assert { private ITtlTimeProvider timeProvider; + private static final ContextAccessors CONTEXT_ACCESSORS + = new MinimalContextAccessor(); + + private Listing listing; + @Before public void setUp() throws Exception { final Configuration conf = new Configuration(false); @@ -79,7 +93,11 @@ public void setUp() throws Exception { ms.initialize(conf, new S3Guard.TtlTimeProvider(conf)); timeProvider = new S3Guard.TtlTimeProvider( DEFAULT_METADATASTORE_METADATA_TTL); - + listing = new Listing(new S3ATestUtils.MinimalListingOperationCallbacks(), + createMockStoreContext( + true, + new S3ATestUtils.OperationTrackingStore(), + CONTEXT_ACCESSORS)); } @After @@ -108,9 +126,16 @@ public void testDirListingUnionNonauth() throws Exception { List s3Listing = Arrays.asList( s1Status, s2Status); + RemoteIterator storeItr = + listing.createProvidedFileStatusIterator( + s3Listing.toArray(new S3AFileStatus[0]), + ACCEPT_ALL, + Listing.ACCEPT_ALL_BUT_S3N); - FileStatus[] result = S3Guard.dirListingUnion(ms, DIR_PATH, s3Listing, - dirMeta, false, timeProvider); + RemoteIterator resultItr = S3Guard + .dirListingUnion(ms, DIR_PATH, storeItr, dirMeta, + false, timeProvider, listing); + S3AFileStatus[] result = S3AUtils.iteratorToStatuses(resultItr, new HashSet<>()); assertEquals("listing length", 4, result.length); assertContainsPaths(result, MS_FILE_1, MS_FILE_2, S3_FILE_3, S3_DIR_4); @@ -124,9 +149,17 @@ public void testDirListingUnionNonauth() throws Exception { S3AFileStatus f1Status2 = new S3AFileStatus( 200, System.currentTimeMillis(), new Path(MS_FILE_1), 1, null, "tag2", "ver2"); - FileStatus[] result2 = S3Guard.dirListingUnion(ms, DIR_PATH, - Arrays.asList(f1Status2), - dirMeta, false, timeProvider); + S3AFileStatus[] f1Statuses = new S3AFileStatus[1]; + f1Statuses[0] = f1Status2; + RemoteIterator itr = listing. 
+ createProvidedFileStatusIterator( + f1Statuses, + ACCEPT_ALL, + Listing.ACCEPT_ALL_BUT_S3N); + FileStatus[] result2 = S3AUtils.iteratorToStatuses( + S3Guard.dirListingUnion(ms, DIR_PATH, itr, dirMeta, + false, timeProvider, listing), + new HashSet<>()); // the listing returns the new status Assertions.assertThat(find(result2, MS_FILE_1)) .describedAs("Entry in listing results for %s", MS_FILE_1) @@ -159,9 +192,17 @@ public void testDirListingUnionAuth() throws Exception { ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider( DEFAULT_METADATASTORE_METADATA_TTL); - FileStatus[] result = S3Guard.dirListingUnion(ms, DIR_PATH, s3Listing, - dirMeta, true, timeProvider); + RemoteIterator storeItr = + listing.createProvidedFileStatusIterator( + s3Listing.toArray(new S3AFileStatus[0]), + ACCEPT_ALL, + Listing.ACCEPT_ALL_BUT_S3N); + RemoteIterator resultItr = S3Guard + .dirListingUnion(ms, DIR_PATH, storeItr, dirMeta, + true, timeProvider, listing); + + S3AFileStatus[] result = S3AUtils.iteratorToStatuses(resultItr, new HashSet<>()); assertEquals("listing length", 4, result.length); assertContainsPaths(result, MS_FILE_1, MS_FILE_2, S3_FILE_3, S3_DIR_4); @@ -181,13 +222,22 @@ public void testDirListingUnionAuth() throws Exception { S3AFileStatus s1Status2 = new S3AFileStatus( 200, System.currentTimeMillis(), new Path(S3_FILE_3), 1, null, "tag2", "ver2"); + S3AFileStatus[] f1Statuses = new S3AFileStatus[1]; + f1Statuses[0] = s1Status2; + RemoteIterator itr = listing. 
+ createProvidedFileStatusIterator( + f1Statuses, + ACCEPT_ALL, + Listing.ACCEPT_ALL_BUT_S3N); + + FileStatus[] result2 = S3AUtils.iteratorToStatuses( + S3Guard.dirListingUnion(ms, DIR_PATH, itr, dirMeta, + true, timeProvider, listing), + new HashSet<>()); // but the result of the listing contains the old entry // because auth mode doesn't pick up changes in S3 which // didn't go through s3guard - FileStatus[] result2 = S3Guard.dirListingUnion(ms, DIR_PATH, - Arrays.asList(s1Status2), - dirMeta2, true, timeProvider); Assertions.assertThat(find(result2, S3_FILE_3)) .describedAs("Entry in listing results for %s", S3_FILE_3) .isSameAs(file3Meta.getFileStatus()); @@ -488,4 +538,31 @@ private S3AFileStatus makeFileStatus(String pathStr, boolean isDir) { } return fileStatus; } + + private static class MinimalContextAccessor implements ContextAccessors { + @Override + public Path keyToPath(String key) { + return null; + } + + @Override + public String pathToKey(Path path) { + return null; + } + + @Override + public File createTempFile(String prefix, long size) throws IOException { + return null; + } + + @Override + public String getBucketLocation() throws IOException { + return null; + } + + @Override + public Path makeQualified(Path path) { + return null; + } + } } From 290d48ca15c47cec51c4e599b36dd7497d69bde2 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Tue, 8 Sep 2020 14:11:21 +0530 Subject: [PATCH 3/5] Adding empty commit for yetus to run From 6a561817fff5c051b3d787406e727c49d90c9483 Mon Sep 17 00:00:00 2001 From: Mukund Madhav Thakur Date: Wed, 16 Sep 2020 23:36:14 +0530 Subject: [PATCH 4/5] S3Guard to be unaware of Listing class --- .../org/apache/hadoop/fs/s3a/Listing.java | 19 +++- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 15 ++- .../apache/hadoop/fs/s3a/s3guard/S3Guard.java | 13 +-- .../apache/hadoop/fs/s3a/S3ATestUtils.java | 1 - .../hadoop/fs/s3a/s3guard/TestS3Guard.java | 106 ++++++------------ 5 files changed, 63 insertions(+), 91 deletions(-) diff 
--git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java index 4d9938a7c6d93..52f0e6288a10f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java @@ -76,7 +76,7 @@ public class Listing extends AbstractStoreOperation { private static final Logger LOG = S3AFileSystem.LOG; - public static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N = + static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N = new AcceptAllButS3nDirs(); private final ListingOperationCallbacks listingOperationCallbacks; @@ -96,13 +96,26 @@ public Listing(ListingOperationCallbacks listingOperationCallbacks, * @param acceptor the file status acceptor * @return the file status iterator */ - public ProvidedFileStatusIterator createProvidedFileStatusIterator( + ProvidedFileStatusIterator createProvidedFileStatusIterator( S3AFileStatus[] fileStatuses, PathFilter filter, FileStatusAcceptor acceptor) { return new ProvidedFileStatusIterator(fileStatuses, filter, acceptor); } + /** + * Create a FileStatus iterator against a provided list of file status. + * @param fileStatuses array of file status. + * @return + */ + @VisibleForTesting + public static ProvidedFileStatusIterator toProvidedFileStatusIterator( + S3AFileStatus[] fileStatuses) { + return new ProvidedFileStatusIterator(fileStatuses, + ACCEPT_ALL, + Listing.ACCEPT_ALL_BUT_S3N); + } + /** * Create a FileStatus iterator against a path, with a given list object * request. 
@@ -367,7 +380,7 @@ public RemoteIterator getLocatedFileStatusIteratorForDir( S3ListRequest request = createListObjectsRequest(key, "/"); LOG.debug("listStatus: doing listObjects for directory {}", key); - Listing.FileStatusListingIterator filesItr = createFileStatusListingIterator( + FileStatusListingIterator filesItr = createFileStatusListingIterator( path, request, ACCEPT_ALL, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 0cfa7764344f4..6f4585e7822ab 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -31,7 +31,6 @@ import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -187,6 +186,7 @@ import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404; import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion; import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup; +import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.dirMetaToStatuses; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; /** @@ -2669,7 +2669,8 @@ public FileStatus[] listStatus(Path f) throws FileNotFoundException, * @throws IOException due to an IO problem. 
* @throws AmazonClientException on failures inside the AWS SDK */ - private RemoteIterator innerListStatus(Path f) throws FileNotFoundException, + private RemoteIterator innerListStatus(Path f) + throws FileNotFoundException, IOException, AmazonClientException { Path path = qualify(f); LOG.debug("List status for path: {}", path); @@ -2683,8 +2684,7 @@ private RemoteIterator innerListStatus(Path f) throws FileNotFoun statusesAssumingNonEmptyDir.getRight()) { // We are sure that this is an empty directory in auth mode. return statusesAssumingNonEmptyDir.getLeft(); - } - else if (!statusesAssumingNonEmptyDir.getLeft().hasNext()) { + } else if (!statusesAssumingNonEmptyDir.getLeft().hasNext()) { // We may have an empty dir, or may have file or may have nothing. // So we call innerGetFileStatus to get the status, this may throw // FileNotFoundException if we have nothing. @@ -2711,7 +2711,10 @@ else if (!statusesAssumingNonEmptyDir.getLeft().hasNext()) { statusesAssumingNonEmptyDir.getMiddle(), allowAuthoritative(path), ttlTimeProvider, - listing); + p -> listing.createProvidedFileStatusIterator( + dirMetaToStatuses(statusesAssumingNonEmptyDir.getMiddle()), + ACCEPT_ALL, + Listing.ACCEPT_ALL_BUT_S3N)); return combinedRes; } @@ -4500,7 +4503,7 @@ private RemoteIterator getLocatedFileStatusIteratorForDir( : null; final RemoteIterator cachedFileStatusIterator = listing.createProvidedFileStatusIterator( - S3Guard.dirMetaToStatuses(meta), filter, acceptor); + dirMetaToStatuses(meta), filter, acceptor); return (allowAuthoritative && meta != null && meta.isAuthoritative()) ? 
listing.createLocatedFileStatusIterator( diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java index cfea824f08d2c..78cedc293ac92 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java @@ -39,7 +39,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.fs.s3a.Listing; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +58,6 @@ import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_AUTHORITATIVE_PATH; -import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL; import static org.apache.hadoop.fs.s3a.S3AUtils.createUploadFileStatus; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.authoritativeEmptyDirectoryMarker; import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_BAD_CONFIGURATION; @@ -337,6 +335,8 @@ public static S3AFileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) { * @param dirMeta Directory listing from MetadataStore. May be null. * @param isAuthoritative State of authoritative mode * @param timeProvider Time provider to use when updating entries + * @param toStatusItr function to convert array of file status to + * RemoteIterator. * @return Final result of directory listing. 
* @throws IOException if metadata store update failed */ @@ -344,7 +344,8 @@ public static RemoteIterator dirListingUnion( MetadataStore ms, Path path, RemoteIterator backingStatuses, DirListingMetadata dirMeta, boolean isAuthoritative, - ITtlTimeProvider timeProvider, Listing listing) + ITtlTimeProvider timeProvider, + Function> toStatusItr) throws IOException { // Fast-path for NullMetadataStore @@ -389,10 +390,8 @@ public static RemoteIterator dirListingUnion( timeProvider, operationState); } IOUtils.cleanupWithLogger(LOG, operationState); - return listing.createProvidedFileStatusIterator( - dirMetaToStatuses(dirMeta), - ACCEPT_ALL, - Listing.ACCEPT_ALL_BUT_S3N); + + return toStatusItr.apply(dirMetaToStatuses(dirMeta)); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 3caf4b6f35373..f057449547281 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -42,7 +42,6 @@ import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum; import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder; -import org.apache.hadoop.fs.s3a.impl.TestPartialDeleteFailures; import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState; import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata; import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java index dcebb7fe63cc4..1b5761c68d8d9 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java @@ -18,7 +18,6 @@ 
package org.apache.hadoop.fs.s3a.s3guard; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -42,20 +41,17 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.fs.s3a.Listing; import org.apache.hadoop.fs.s3a.S3AFileStatus; -import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.s3a.S3AUtils; import org.apache.hadoop.fs.s3a.Tristate; -import org.apache.hadoop.fs.s3a.impl.ContextAccessors; import org.apache.hadoop.service.launcher.LauncherExitCodes; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.ExitUtil; import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL; import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL; -import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.createMockStoreContext; +import static org.apache.hadoop.fs.s3a.Listing.toProvidedFileStatusIterator; +import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.dirMetaToStatuses; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -81,11 +77,6 @@ public class TestS3Guard extends Assert { private ITtlTimeProvider timeProvider; - private static final ContextAccessors CONTEXT_ACCESSORS - = new MinimalContextAccessor(); - - private Listing listing; - @Before public void setUp() throws Exception { final Configuration conf = new Configuration(false); @@ -93,11 +84,6 @@ public void setUp() throws Exception { ms.initialize(conf, new S3Guard.TtlTimeProvider(conf)); timeProvider = new S3Guard.TtlTimeProvider( DEFAULT_METADATASTORE_METADATA_TTL); - listing = new Listing(new S3ATestUtils.MinimalListingOperationCallbacks(), - createMockStoreContext( - true, - new S3ATestUtils.OperationTrackingStore(), - CONTEXT_ACCESSORS)); } @After @@ -126,16 
+112,14 @@ public void testDirListingUnionNonauth() throws Exception { List s3Listing = Arrays.asList( s1Status, s2Status); - RemoteIterator storeItr = - listing.createProvidedFileStatusIterator( - s3Listing.toArray(new S3AFileStatus[0]), - ACCEPT_ALL, - Listing.ACCEPT_ALL_BUT_S3N); - - RemoteIterator resultItr = S3Guard - .dirListingUnion(ms, DIR_PATH, storeItr, dirMeta, - false, timeProvider, listing); - S3AFileStatus[] result = S3AUtils.iteratorToStatuses(resultItr, new HashSet<>()); + RemoteIterator storeItr = toProvidedFileStatusIterator( + s3Listing.toArray(new S3AFileStatus[0])); + RemoteIterator resultItr = S3Guard.dirListingUnion( + ms, DIR_PATH, storeItr, dirMeta, false, + timeProvider, s3AFileStatuses -> + toProvidedFileStatusIterator(dirMetaToStatuses(dirMeta))); + S3AFileStatus[] result = S3AUtils.iteratorToStatuses( + resultItr, new HashSet<>()); assertEquals("listing length", 4, result.length); assertContainsPaths(result, MS_FILE_1, MS_FILE_2, S3_FILE_3, S3_DIR_4); @@ -151,14 +135,15 @@ public void testDirListingUnionNonauth() throws Exception { 1, null, "tag2", "ver2"); S3AFileStatus[] f1Statuses = new S3AFileStatus[1]; f1Statuses[0] = f1Status2; - RemoteIterator itr = listing. 
- createProvidedFileStatusIterator( - f1Statuses, - ACCEPT_ALL, - Listing.ACCEPT_ALL_BUT_S3N); + RemoteIterator itr = toProvidedFileStatusIterator( + f1Statuses); FileStatus[] result2 = S3AUtils.iteratorToStatuses( - S3Guard.dirListingUnion(ms, DIR_PATH, itr, dirMeta, - false, timeProvider, listing), + S3Guard.dirListingUnion( + ms, DIR_PATH, itr, dirMeta, + false, timeProvider, + s3AFileStatuses -> + toProvidedFileStatusIterator( + dirMetaToStatuses(dirMeta))), new HashSet<>()); // the listing returns the new status Assertions.assertThat(find(result2, MS_FILE_1)) @@ -193,16 +178,17 @@ public void testDirListingUnionAuth() throws Exception { ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider( DEFAULT_METADATASTORE_METADATA_TTL); - RemoteIterator storeItr = - listing.createProvidedFileStatusIterator( - s3Listing.toArray(new S3AFileStatus[0]), - ACCEPT_ALL, - Listing.ACCEPT_ALL_BUT_S3N); + RemoteIterator storeItr = toProvidedFileStatusIterator( + s3Listing.toArray(new S3AFileStatus[0])); RemoteIterator resultItr = S3Guard .dirListingUnion(ms, DIR_PATH, storeItr, dirMeta, - true, timeProvider, listing); + true, timeProvider, + s3AFileStatuses -> + toProvidedFileStatusIterator( + dirMetaToStatuses(dirMeta))); - S3AFileStatus[] result = S3AUtils.iteratorToStatuses(resultItr, new HashSet<>()); + S3AFileStatus[] result = S3AUtils.iteratorToStatuses( + resultItr, new HashSet<>()); assertEquals("listing length", 4, result.length); assertContainsPaths(result, MS_FILE_1, MS_FILE_2, S3_FILE_3, S3_DIR_4); @@ -224,15 +210,14 @@ public void testDirListingUnionAuth() throws Exception { 1, null, "tag2", "ver2"); S3AFileStatus[] f1Statuses = new S3AFileStatus[1]; f1Statuses[0] = s1Status2; - RemoteIterator itr = listing. 
- createProvidedFileStatusIterator( - f1Statuses, - ACCEPT_ALL, - Listing.ACCEPT_ALL_BUT_S3N); - + RemoteIterator itr = + toProvidedFileStatusIterator(f1Statuses); FileStatus[] result2 = S3AUtils.iteratorToStatuses( S3Guard.dirListingUnion(ms, DIR_PATH, itr, dirMeta, - true, timeProvider, listing), + true, timeProvider, + s3AFileStatuses -> + toProvidedFileStatusIterator( + dirMetaToStatuses(dirMeta))), new HashSet<>()); // but the result of the listing contains the old entry @@ -538,31 +523,4 @@ private S3AFileStatus makeFileStatus(String pathStr, boolean isDir) { } return fileStatus; } - - private static class MinimalContextAccessor implements ContextAccessors { - @Override - public Path keyToPath(String key) { - return null; - } - - @Override - public String pathToKey(Path path) { - return null; - } - - @Override - public File createTempFile(String prefix, long size) throws IOException { - return null; - } - - @Override - public String getBucketLocation() throws IOException { - return null; - } - - @Override - public Path makeQualified(Path path) { - return null; - } - } } From 10731301042912e9caf69bd62ddbbd1b9ec3d076 Mon Sep 17 00:00:00 2001 From: Mukund Madhav Thakur Date: Fri, 18 Sep 2020 13:40:21 +0530 Subject: [PATCH 5/5] Fixing checkstyle and javadoc --- .../java/org/apache/hadoop/fs/s3a/Listing.java | 2 +- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 7 +++---- .../hadoop/fs/s3a/s3guard/TestS3Guard.java | 18 +++++++++--------- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java index 52f0e6288a10f..20ed288bdfec5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java @@ -106,7 +106,7 @@ ProvidedFileStatusIterator createProvidedFileStatusIterator( /** * Create a FileStatus 
iterator against a provided list of file status. * @param fileStatuses array of file status. - * @return + * @return the file status iterator. */ @VisibleForTesting public static ProvidedFileStatusIterator toProvidedFileStatusIterator( diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 6f4585e7822ab..86f2a889a9f42 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -2704,18 +2704,17 @@ private RemoteIterator innerListStatus(Path f) } // Here we have a directory which may or may not be empty. // So we update the metastore and return. - RemoteIterator combinedRes = S3Guard.dirListingUnion( + return S3Guard.dirListingUnion( metadataStore, path, statusesAssumingNonEmptyDir.getLeft(), statusesAssumingNonEmptyDir.getMiddle(), allowAuthoritative(path), - ttlTimeProvider, - p -> listing.createProvidedFileStatusIterator( + ttlTimeProvider, p -> + listing.createProvidedFileStatusIterator( dirMetaToStatuses(statusesAssumingNonEmptyDir.getMiddle()), ACCEPT_ALL, Listing.ACCEPT_ALL_BUT_S3N)); - return combinedRes; } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java index 1b5761c68d8d9..eaa363bbf19b9 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java @@ -141,9 +141,9 @@ public void testDirListingUnionNonauth() throws Exception { S3Guard.dirListingUnion( ms, DIR_PATH, itr, dirMeta, false, timeProvider, - s3AFileStatuses -> - toProvidedFileStatusIterator( - dirMetaToStatuses(dirMeta))), + s3AFileStatuses -> + 
toProvidedFileStatusIterator( + dirMetaToStatuses(dirMeta))), new HashSet<>()); // the listing returns the new status Assertions.assertThat(find(result2, MS_FILE_1)) @@ -183,9 +183,9 @@ public void testDirListingUnionAuth() throws Exception { RemoteIterator resultItr = S3Guard .dirListingUnion(ms, DIR_PATH, storeItr, dirMeta, true, timeProvider, - s3AFileStatuses -> - toProvidedFileStatusIterator( - dirMetaToStatuses(dirMeta))); + s3AFileStatuses -> + toProvidedFileStatusIterator( + dirMetaToStatuses(dirMeta))); S3AFileStatus[] result = S3AUtils.iteratorToStatuses( resultItr, new HashSet<>()); @@ -215,9 +215,9 @@ public void testDirListingUnionAuth() throws Exception { FileStatus[] result2 = S3AUtils.iteratorToStatuses( S3Guard.dirListingUnion(ms, DIR_PATH, itr, dirMeta, true, timeProvider, - s3AFileStatuses -> - toProvidedFileStatusIterator( - dirMetaToStatuses(dirMeta))), + s3AFileStatuses -> + toProvidedFileStatusIterator( + dirMetaToStatuses(dirMeta))), new HashSet<>()); // but the result of the listing contains the old entry