Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.annotations.VisibleForTesting;

import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
Expand Down Expand Up @@ -102,6 +103,19 @@ ProvidedFileStatusIterator createProvidedFileStatusIterator(
return new ProvidedFileStatusIterator(fileStatuses, filter, acceptor);
}

/**
 * Wrap a caller-supplied array of file status entries in a
 * {@link ProvidedFileStatusIterator}. The iterator is built with the
 * {@code ACCEPT_ALL} filter and the {@code Listing.ACCEPT_ALL_BUT_S3N}
 * acceptor.
 * @param fileStatuses array of file status.
 * @return the file status iterator over the supplied entries.
 */
@VisibleForTesting
public static ProvidedFileStatusIterator toProvidedFileStatusIterator(
    S3AFileStatus[] fileStatuses) {
  final ProvidedFileStatusIterator statusIterator =
      new ProvidedFileStatusIterator(
          fileStatuses, ACCEPT_ALL, Listing.ACCEPT_ALL_BUT_S3N);
  return statusIterator;
}

/**
* Create a FileStatus iterator against a path, with a given list object
* request.
Expand Down Expand Up @@ -250,7 +264,7 @@ public RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
if (!forceNonAuthoritativeMS &&
allowAuthoritative &&
metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
S3AFileStatus[] statuses = S3AUtils.iteratorToStatuses(
metadataStoreListFilesIterator, tombstones);
cachedFilesIterator = createProvidedFileStatusIterator(
statuses, ACCEPT_ALL, acceptor);
Expand Down Expand Up @@ -329,6 +343,56 @@ public RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
tombstones);
}

/**
 * Calculate list of file statuses assuming path
 * to be a non-empty directory.
 * <p>
 * The metadata store is queried first. If authoritative mode is allowed
 * for the path and the returned directory listing is authoritative, the
 * statuses come from that listing alone and the flag in the result is
 * {@code TRUE}. Otherwise an iterator over a list-objects request for the
 * directory key is returned and the flag is {@code FALSE}.
 * @param path input path.
 * @return Triple of file statuses, metaData, auth flag. The metadata
 * element is whatever the metadata store lookup returned and may be null.
 * @throws IOException Any IO problems.
 */
public Triple<RemoteIterator<S3AFileStatus>, DirListingMetadata, Boolean>
    getFileStatusesAssumingNonEmptyDir(Path path)
    throws IOException {
  String key = pathToKey(path);
  // A non-empty key is turned into a directory prefix; the empty key is
  // left untouched.
  if (!key.isEmpty()) {
    key = key + '/';
  }

  boolean allowAuthoritative = listingOperationCallbacks
      .allowAuthoritative(path);
  DirListingMetadata dirMeta =
      S3Guard.listChildrenWithTtl(
          getStoreContext().getMetadataStore(),
          path,
          listingOperationCallbacks.getUpdatedTtlTimeProvider(),
          allowAuthoritative);
  // In auth mode return directly with auth flag.
  if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
    ProvidedFileStatusIterator mfsItr = createProvidedFileStatusIterator(
        S3Guard.dirMetaToStatuses(dirMeta),
        ACCEPT_ALL,
        Listing.ACCEPT_ALL_BUT_S3N);
    return Triple.of(mfsItr,
        dirMeta, Boolean.TRUE);
  }

  S3ListRequest request = createListObjectsRequest(key, "/");
  LOG.debug("listStatus: doing listObjects for directory {}", key);

  FileStatusListingIterator filesItr = createFileStatusListingIterator(
      path,
      request,
      ACCEPT_ALL,
      new Listing.AcceptAllButSelfAndS3nDirs(path));

  // return the results obtained from s3.
  return Triple.of(
      filesItr,
      dirMeta,
      Boolean.FALSE);
}

/**
 * Build a list-objects request by delegating to the listing operation
 * callbacks.
 * @param key key handed through to the callbacks.
 * @param delimiter delimiter handed through to the callbacks.
 * @return the request produced by the callbacks.
 */
public S3ListRequest createListObjectsRequest(String key, String delimiter) {
  final S3ListRequest listRequest =
      listingOperationCallbacks.createListObjectsRequest(key, delimiter);
  return listRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -185,6 +186,7 @@
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup;
import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.dirMetaToStatuses;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;

/**
Expand Down Expand Up @@ -2652,7 +2654,9 @@ void maybeCreateFakeParentDirectory(Path path)
*/
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
    IOException {
  // Drain the listing iterator into an array. An empty tombstone set is
  // passed, so no entry is filtered out during the conversion.
  return once("listStatus",
      f.toString(),
      () -> iteratorToStatuses(innerListStatus(f), new HashSet<>()));
}

/**
Expand All @@ -2665,51 +2669,52 @@ public FileStatus[] listStatus(Path f) throws FileNotFoundException,
* @throws IOException due to an IO problem.
* @throws AmazonClientException on failures inside the AWS SDK
*/
private S3AFileStatus[] innerListStatus(Path f) throws FileNotFoundException,
IOException, AmazonClientException {
private RemoteIterator<S3AFileStatus> innerListStatus(Path f)
throws FileNotFoundException,
IOException, AmazonClientException {
Path path = qualify(f);
String key = pathToKey(path);
LOG.debug("List status for path: {}", path);
entryPoint(INVOCATION_LIST_STATUS);

List<S3AFileStatus> result;
final S3AFileStatus fileStatus = innerGetFileStatus(path, false,
StatusProbeEnum.ALL);

if (fileStatus.isDirectory()) {
if (!key.isEmpty()) {
key = key + '/';
Triple<RemoteIterator<S3AFileStatus>, DirListingMetadata, Boolean>
statusesAssumingNonEmptyDir = listing
.getFileStatusesAssumingNonEmptyDir(path);

if (!statusesAssumingNonEmptyDir.getLeft().hasNext() &&
statusesAssumingNonEmptyDir.getRight()) {
// We are sure that this is an empty directory in auth mode.
return statusesAssumingNonEmptyDir.getLeft();
} else if (!statusesAssumingNonEmptyDir.getLeft().hasNext()) {
// We may have an empty dir, or may have file or may have nothing.
// So we call innerGetFileStatus to get the status, this may throw
// FileNotFoundException if we have nothing.
// So We are guaranteed to have either a dir marker or a file.
final S3AFileStatus fileStatus = innerGetFileStatus(path, false,
StatusProbeEnum.ALL);
// If it is a file return directly.
if (fileStatus.isFile()) {
LOG.debug("Adding: rd (not a dir): {}", path);
S3AFileStatus[] stats = new S3AFileStatus[1];
stats[0] = fileStatus;
return listing.createProvidedFileStatusIterator(
stats,
ACCEPT_ALL,
Listing.ACCEPT_ALL_BUT_S3N);
}

boolean allowAuthoritative = allowAuthoritative(f);
DirListingMetadata dirMeta =
S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
allowAuthoritative);
if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
return S3Guard.dirMetaToStatuses(dirMeta);
}

S3ListRequest request = createListObjectsRequest(key, "/");
LOG.debug("listStatus: doing listObjects for directory {}", key);

Listing.FileStatusListingIterator files =
listing.createFileStatusListingIterator(path,
request,
ACCEPT_ALL,
new Listing.AcceptAllButSelfAndS3nDirs(path));
result = new ArrayList<>(files.getBatchSize());
while (files.hasNext()) {
result.add(files.next());
}
// merge the results. This will update the store as needed
return S3Guard.dirListingUnion(metadataStore, path, result, dirMeta,
allowAuthoritative, ttlTimeProvider);
} else {
LOG.debug("Adding: rd (not a dir): {}", path);
S3AFileStatus[] stats = new S3AFileStatus[1];
stats[0]= fileStatus;
return stats;
}
// Here we have a directory which may or may not be empty.
// So we update the metastore and return.
return S3Guard.dirListingUnion(
metadataStore,
path,
statusesAssumingNonEmptyDir.getLeft(),
statusesAssumingNonEmptyDir.getMiddle(),
allowAuthoritative(path),
ttlTimeProvider, p ->
listing.createProvidedFileStatusIterator(
dirMetaToStatuses(statusesAssumingNonEmptyDir.getMiddle()),
ACCEPT_ALL,
Listing.ACCEPT_ALL_BUT_S3N));
}

/**
Expand Down Expand Up @@ -4497,7 +4502,7 @@ private RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
: null;
final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
listing.createProvidedFileStatusIterator(
S3Guard.dirMetaToStatuses(meta), filter, acceptor);
dirMetaToStatuses(meta), filter, acceptor);
return (allowAuthoritative && meta != null
&& meta.isAuthoritative())
? listing.createLocatedFileStatusIterator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -1416,6 +1417,30 @@ private static void initUserAgent(Configuration conf,
awsConf.setUserAgentPrefix(userAgent);
}

/**
 * Convert the data of an iterator of {@link S3AFileStatus} to
 * an array. Given tombstones are filtered out. If the iterator
 * does not return any item, an empty array is returned.
 * @param iterator a non-null iterator
 * @param tombstones non-null set of paths to exclude; any status whose
 * path is contained in this set is left out of the result
 * @return a possibly-empty array of file status entries
 * @throws IOException if raised by the iterator while it is traversed
 */
public static S3AFileStatus[] iteratorToStatuses(
    RemoteIterator<S3AFileStatus> iterator, Set<Path> tombstones)
    throws IOException {
  // Typed to the element type of the returned array so the final
  // toArray() call is checked at compile time.
  List<S3AFileStatus> statuses = new ArrayList<>();

  while (iterator.hasNext()) {
    S3AFileStatus status = iterator.next();
    if (!tombstones.contains(status.getPath())) {
      statuses.add(status);
    }
  }

  return statuses.toArray(new S3AFileStatus[0]);
}

/**
* An interface for use in lambda-expressions working with
* directory tree listings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,30 +295,6 @@ public static BulkOperationState initiateBulkWrite(
}
}

/**
 * Convert the data of an iterator of {@link S3AFileStatus} to
 * an array. Given tombstones are filtered out. If the iterator
 * does not return any item, an empty array is returned.
 * @param iterator a non-null iterator
 * @param tombstones non-null set of paths to exclude; any status whose
 * path is contained in this set is left out of the result
 * @return a possibly-empty array of file status entries
 * @throws IOException if raised by the iterator while it is traversed
 */
public static S3AFileStatus[] iteratorToStatuses(
    RemoteIterator<S3AFileStatus> iterator, Set<Path> tombstones)
    throws IOException {
  // Typed to the element type of the returned array so the final
  // toArray() call is checked at compile time.
  List<S3AFileStatus> statuses = new ArrayList<>();

  while (iterator.hasNext()) {
    S3AFileStatus status = iterator.next();
    if (!tombstones.contains(status.getPath())) {
      statuses.add(status);
    }
  }

  return statuses.toArray(new S3AFileStatus[0]);
}

/**
* Convert the data of a directory listing to an array of {@link FileStatus}
* entries. Tombstones are filtered out at this point. If the listing is null
Expand Down Expand Up @@ -359,17 +335,22 @@ public static S3AFileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) {
* @param dirMeta Directory listing from MetadataStore. May be null.
* @param isAuthoritative State of authoritative mode
* @param timeProvider Time provider to use when updating entries
* @param toStatusItr function to convert array of file status to
* RemoteIterator.
* @return Final result of directory listing.
* @throws IOException if metadata store update failed
*/
public static S3AFileStatus[] dirListingUnion(MetadataStore ms, Path path,
List<S3AFileStatus> backingStatuses, DirListingMetadata dirMeta,
boolean isAuthoritative, ITtlTimeProvider timeProvider)
throws IOException {
public static RemoteIterator<S3AFileStatus> dirListingUnion(
MetadataStore ms, Path path,
RemoteIterator<S3AFileStatus> backingStatuses,
DirListingMetadata dirMeta, boolean isAuthoritative,
ITtlTimeProvider timeProvider,
Function<S3AFileStatus[], RemoteIterator<S3AFileStatus>> toStatusItr)
throws IOException {

// Fast-path for NullMetadataStore
if (isNullMetadataStore(ms)) {
return backingStatuses.toArray(new S3AFileStatus[backingStatuses.size()]);
return backingStatuses;
}

assertQualified(path);
Expand Down Expand Up @@ -410,7 +391,7 @@ public static S3AFileStatus[] dirListingUnion(MetadataStore ms, Path path,
}
IOUtils.cleanupWithLogger(LOG, operationState);

return dirMetaToStatuses(dirMeta);
return toStatusItr.apply(dirMetaToStatuses(dirMeta));
}

/**
Expand All @@ -429,7 +410,7 @@ public static S3AFileStatus[] dirListingUnion(MetadataStore ms, Path path,
private static void authoritativeUnion(
final MetadataStore ms,
final Path path,
final List<S3AFileStatus> backingStatuses,
final RemoteIterator<S3AFileStatus> backingStatuses,
final DirListingMetadata dirMeta,
final ITtlTimeProvider timeProvider,
final BulkOperationState operationState) throws IOException {
Expand All @@ -440,7 +421,8 @@ private static void authoritativeUnion(
Set<Path> deleted = dirMeta.listTombstones();
final Map<Path, PathMetadata> dirMetaMap = dirMeta.getListing().stream()
.collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm));
for (S3AFileStatus s : backingStatuses) {
while (backingStatuses.hasNext()) {
S3AFileStatus s = backingStatuses.next();
final Path statusPath = s.getPath();
if (deleted.contains(statusPath)) {
continue;
Expand Down Expand Up @@ -493,16 +475,17 @@ private static void authoritativeUnion(
private static void nonAuthoritativeUnion(
final MetadataStore ms,
final Path path,
final List<S3AFileStatus> backingStatuses,
final RemoteIterator<S3AFileStatus> backingStatuses,
final DirListingMetadata dirMeta,
final ITtlTimeProvider timeProvider,
final BulkOperationState operationState) throws IOException {
List<PathMetadata> entriesToAdd = new ArrayList<>(backingStatuses.size());
List<PathMetadata> entriesToAdd = new ArrayList<>();
Set<Path> deleted = dirMeta.listTombstones();

final Map<Path, PathMetadata> dirMetaMap = dirMeta.getListing().stream()
.collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm));
for (S3AFileStatus s : backingStatuses) {
while (backingStatuses.hasNext()) {
S3AFileStatus s = backingStatuses.next();
final Path statusPath = s.getPath();
if (deleted.contains(statusPath)) {
continue;
Expand Down
Loading