Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1891,15 +1891,15 @@
<name>fs.s3a.change.detection.mode</name>
<value>server</value>
<description>
Determines how change detection is applied to alert to S3 objects
rewritten while being read. Value 'server' indicates to apply the attribute
constraint directly on GetObject requests to S3. Value 'client' means to do a
client-side comparison of the attribute value returned in the response. Value
'server' would not work with third-party S3 implementations that do not
support these constraints on GetObject. Values 'server' and 'client' generate
RemoteObjectChangedException when a mismatch is detected. Value 'warn' works
like 'client' but generates only a warning. Value 'none' will ignore change
detection completely.
Determines how change detection is applied to alert to inconsistent S3
objects read during or after an overwrite. Value 'server' indicates to apply
the attribute constraint directly on GetObject requests to S3. Value 'client'
means to do a client-side comparison of the attribute value returned in the
response. Value 'server' would not work with third-party S3 implementations
that do not support these constraints on GetObject. Values 'server' and
'client' generate RemoteObjectChangedException when a mismatch is detected.
Value 'warn' works like 'client' but generates only a warning. Value 'none'
will ignore change detection completely.
</description>
</property>

Expand Down
5 changes: 5 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,11 @@
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Listing(S3AFileSystem owner) {
* @return the file status iterator
*/
ProvidedFileStatusIterator createProvidedFileStatusIterator(
FileStatus[] fileStatuses,
S3AFileStatus[] fileStatuses,
PathFilter filter,
FileStatusAcceptor acceptor) {
return new ProvidedFileStatusIterator(fileStatuses, filter, acceptor);
Expand Down Expand Up @@ -114,7 +114,7 @@ FileStatusListingIterator createFileStatusListingIterator(
S3ListRequest request,
PathFilter filter,
Listing.FileStatusAcceptor acceptor,
RemoteIterator<FileStatus> providedStatus) throws IOException {
RemoteIterator<S3AFileStatus> providedStatus) throws IOException {
return new FileStatusListingIterator(
new ObjectListingIterator(listPath, request),
filter,
Expand All @@ -129,7 +129,7 @@ FileStatusListingIterator createFileStatusListingIterator(
*/
@VisibleForTesting
LocatedFileStatusIterator createLocatedFileStatusIterator(
RemoteIterator<FileStatus> statusIterator) {
RemoteIterator<S3AFileStatus> statusIterator) {
return new LocatedFileStatusIterator(statusIterator);
}

Expand All @@ -143,7 +143,7 @@ LocatedFileStatusIterator createLocatedFileStatusIterator(
*/
@VisibleForTesting
TombstoneReconcilingIterator createTombstoneReconcilingIterator(
RemoteIterator<LocatedFileStatus> iterator, Set<Path> tombstones) {
RemoteIterator<S3ALocatedFileStatus> iterator, Set<Path> tombstones) {
return new TombstoneReconcilingIterator(iterator, tombstones);
}

Expand Down Expand Up @@ -189,19 +189,19 @@ interface FileStatusAcceptor {
* iterator returned.
*/
static final class SingleStatusRemoteIterator
implements RemoteIterator<LocatedFileStatus> {
implements RemoteIterator<S3ALocatedFileStatus> {

/**
* The status to return; set to null after the first iteration.
*/
private LocatedFileStatus status;
private S3ALocatedFileStatus status;

/**
* Constructor.
* @param status status value: may be null, in which case
* the iterator is empty.
*/
public SingleStatusRemoteIterator(LocatedFileStatus status) {
public SingleStatusRemoteIterator(S3ALocatedFileStatus status) {
this.status = status;
}

Expand All @@ -226,9 +226,9 @@ public boolean hasNext() throws IOException {
* to the constructor.
*/
@Override
public LocatedFileStatus next() throws IOException {
public S3ALocatedFileStatus next() throws IOException {
if (hasNext()) {
LocatedFileStatus s = this.status;
S3ALocatedFileStatus s = this.status;
status = null;
return s;
} else {
Expand All @@ -247,16 +247,16 @@ public LocatedFileStatus next() throws IOException {
* There is no remote data to fetch.
*/
static class ProvidedFileStatusIterator
implements RemoteIterator<FileStatus> {
private final ArrayList<FileStatus> filteredStatusList;
implements RemoteIterator<S3AFileStatus> {
private final ArrayList<S3AFileStatus> filteredStatusList;
private int index = 0;

ProvidedFileStatusIterator(FileStatus[] fileStatuses, PathFilter filter,
ProvidedFileStatusIterator(S3AFileStatus[] fileStatuses, PathFilter filter,
FileStatusAcceptor acceptor) {
Preconditions.checkArgument(fileStatuses != null, "Null status list!");

filteredStatusList = new ArrayList<>(fileStatuses.length);
for (FileStatus status : fileStatuses) {
for (S3AFileStatus status : fileStatuses) {
if (filter.accept(status.getPath()) && acceptor.accept(status)) {
filteredStatusList.add(status);
}
Expand All @@ -270,7 +270,7 @@ public boolean hasNext() throws IOException {
}

@Override
public FileStatus next() throws IOException {
public S3AFileStatus next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
Expand Down Expand Up @@ -305,7 +305,7 @@ public FileStatus next() throws IOException {
* Thread safety: None.
*/
class FileStatusListingIterator
implements RemoteIterator<FileStatus> {
implements RemoteIterator<S3AFileStatus> {

/** Source of objects. */
private final ObjectListingIterator source;
Expand All @@ -316,10 +316,10 @@ class FileStatusListingIterator
/** request batch size. */
private int batchSize;
/** Iterator over the current set of results. */
private ListIterator<FileStatus> statusBatchIterator;
private ListIterator<S3AFileStatus> statusBatchIterator;

private final Set<FileStatus> providedStatus;
private Iterator<FileStatus> providedStatusIterator;
private final Set<S3AFileStatus> providedStatus;
private Iterator<S3AFileStatus> providedStatusIterator;

/**
* Create an iterator over file status entries.
Expand All @@ -335,13 +335,13 @@ class FileStatusListingIterator
FileStatusListingIterator(ObjectListingIterator source,
PathFilter filter,
FileStatusAcceptor acceptor,
RemoteIterator<FileStatus> providedStatus) throws IOException {
RemoteIterator<S3AFileStatus> providedStatus) throws IOException {
this.source = source;
this.filter = filter;
this.acceptor = acceptor;
this.providedStatus = new HashSet<>();
for (; providedStatus != null && providedStatus.hasNext();) {
final FileStatus status = providedStatus.next();
final S3AFileStatus status = providedStatus.next();
if (filter.accept(status.getPath()) && acceptor.accept(status)) {
this.providedStatus.add(status);
}
Expand Down Expand Up @@ -384,8 +384,8 @@ private boolean sourceHasNext() throws IOException {

@Override
@Retries.RetryTranslated
public FileStatus next() throws IOException {
final FileStatus status;
public S3AFileStatus next() throws IOException {
final S3AFileStatus status;
if (sourceHasNext()) {
status = statusBatchIterator.next();
// We remove from provided list the file status listed by S3 so that
Expand Down Expand Up @@ -441,7 +441,7 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
// counters for debug logs
int added = 0, ignored = 0;
// list to fill in with results. Initial size will be list maximum.
List<FileStatus> stats = new ArrayList<>(
List<S3AFileStatus> stats = new ArrayList<>(
objects.getObjectSummaries().size() +
objects.getCommonPrefixes().size());
// objects
Expand All @@ -453,8 +453,9 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
}
// Skip over keys that are ourselves and old S3N _$folder$ files
if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) {
FileStatus status = createFileStatus(keyPath, summary,
owner.getDefaultBlockSize(keyPath), owner.getUsername());
S3AFileStatus status = createFileStatus(keyPath, summary,
owner.getDefaultBlockSize(keyPath), owner.getUsername(),
null, null);
LOG.debug("Adding: {}", status);
stats.add(status);
added++;
Expand All @@ -468,7 +469,7 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
for (String prefix : objects.getCommonPrefixes()) {
Path keyPath = owner.keyToQualifiedPath(prefix);
if (acceptor.accept(keyPath, prefix) && filter.accept(keyPath)) {
FileStatus status = new S3AFileStatus(Tristate.FALSE, keyPath,
S3AFileStatus status = new S3AFileStatus(Tristate.FALSE, keyPath,
owner.getUsername());
LOG.debug("Adding directory: {}", status);
added++;
Expand Down Expand Up @@ -679,14 +680,14 @@ public boolean accept(FileStatus status) {
* return a remote iterator of {@link LocatedFileStatus} instances.
*/
class LocatedFileStatusIterator
implements RemoteIterator<LocatedFileStatus> {
private final RemoteIterator<FileStatus> statusIterator;
implements RemoteIterator<S3ALocatedFileStatus> {
private final RemoteIterator<S3AFileStatus> statusIterator;

/**
* Constructor.
* @param statusIterator an iterator over the remote status entries
*/
LocatedFileStatusIterator(RemoteIterator<FileStatus> statusIterator) {
LocatedFileStatusIterator(RemoteIterator<S3AFileStatus> statusIterator) {
this.statusIterator = statusIterator;
}

Expand All @@ -696,7 +697,7 @@ public boolean hasNext() throws IOException {
}

@Override
public LocatedFileStatus next() throws IOException {
public S3ALocatedFileStatus next() throws IOException {
return owner.toLocatedFileStatus(statusIterator.next());
}
}
Expand All @@ -708,16 +709,16 @@ public LocatedFileStatus next() throws IOException {
* remain in the source iterator.
*/
static class TombstoneReconcilingIterator implements
RemoteIterator<LocatedFileStatus> {
private LocatedFileStatus next = null;
private final RemoteIterator<LocatedFileStatus> iterator;
RemoteIterator<S3ALocatedFileStatus> {
private S3ALocatedFileStatus next = null;
private final RemoteIterator<S3ALocatedFileStatus> iterator;
private final Set<Path> tombstones;

/**
* @param iterator Source iterator to filter
* @param tombstones set of tombstone markers to filter out of results
*/
TombstoneReconcilingIterator(RemoteIterator<LocatedFileStatus>
TombstoneReconcilingIterator(RemoteIterator<S3ALocatedFileStatus>
iterator, Set<Path> tombstones) {
this.iterator = iterator;
if (tombstones != null) {
Expand All @@ -729,7 +730,7 @@ static class TombstoneReconcilingIterator implements

private boolean fetch() throws IOException {
while (next == null && iterator.hasNext()) {
LocatedFileStatus candidate = iterator.next();
S3ALocatedFileStatus candidate = iterator.next();
if (!tombstones.contains(candidate.getPath())) {
next = candidate;
return true;
Expand All @@ -745,9 +746,9 @@ public boolean hasNext() throws IOException {
return fetch();
}

public LocatedFileStatus next() throws IOException {
public S3ALocatedFileStatus next() throws IOException {
if (hasNext()) {
LocatedFileStatus result = next;
S3ALocatedFileStatus result = next;
next = null;
fetch();
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
@InterfaceStability.Unstable
public class RemoteFileChangedException extends PathIOException {

public static final String PRECONDITIONS_FAILED =
"Constraints of request were unsatisfiable";

/**
* Constructs a RemoteFileChangedException.
*
Expand All @@ -46,4 +49,21 @@ public RemoteFileChangedException(String path,
super(path, message);
setOperation(operation);
}

/**
* Constructs a RemoteFileChangedException.
*
* @param path the path accessed when the change was detected
* @param operation the operation (e.g. open, re-open) performed when the
* change was detected
* @param message a message providing more details about the condition
* @param cause inner cause.
*/
public RemoteFileChangedException(String path,
String operation,
String message,
Throwable cause) {
super(path, message, cause);
setOperation(operation);
}
}
Loading