+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.LocatedFileStatus;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link LocatedFileStatus} extended to also carry ETag and object version ID.
+ */
+public class S3ALocatedFileStatus extends LocatedFileStatus {
+
+ private static final long serialVersionUID = 3597192103662929338L;
+
+ private final String eTag;
+ private final String versionId;
+
+ public S3ALocatedFileStatus(S3AFileStatus status, BlockLocation[] locations,
+ String eTag, String versionId) {
+ super(checkNotNull(status), locations);
+ this.eTag = eTag;
+ this.versionId = versionId;
+ }
+
+ public String getETag() {
+ return eTag;
+ }
+
+ public String getVersionId() {
+ return versionId;
+ }
+
+ // equals() and hashCode() overridden to avoid FindBugs warning.
+ // Base implementation is equality on Path only, which is still appropriate.
+
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 8d204d7c56014..fe4fd0ffd6a70 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -530,16 +530,20 @@ public static String stringify(AmazonS3Exception e) {
* @param summary summary from AWS
* @param blockSize block size to declare.
* @param owner owner of the file
+ * @param eTag S3 object eTag or null if unavailable
+ * @param versionId S3 object versionId or null if unavailable
* @return a status entry
*/
public static S3AFileStatus createFileStatus(Path keyPath,
S3ObjectSummary summary,
long blockSize,
- String owner) {
+ String owner,
+ String eTag,
+ String versionId) {
long size = summary.getSize();
return createFileStatus(keyPath,
objectRepresentsDirectory(summary.getKey(), size),
- size, summary.getLastModified(), blockSize, owner);
+ size, summary.getLastModified(), blockSize, owner, eTag, versionId);
}
/**
@@ -552,22 +556,27 @@ public static S3AFileStatus createFileStatus(Path keyPath,
* @param size file length
* @param blockSize block size for file status
* @param owner Hadoop username
+ * @param eTag S3 object eTag or null if unavailable
+ * @param versionId S3 object versionId or null if unavailable
* @return a status entry
*/
public static S3AFileStatus createUploadFileStatus(Path keyPath,
- boolean isDir, long size, long blockSize, String owner) {
+ boolean isDir, long size, long blockSize, String owner,
+ String eTag, String versionId) {
Date date = isDir ? null : new Date();
- return createFileStatus(keyPath, isDir, size, date, blockSize, owner);
+ return createFileStatus(keyPath, isDir, size, date, blockSize, owner,
+ eTag, versionId);
}
/* Date 'modified' is ignored when isDir is true. */
private static S3AFileStatus createFileStatus(Path keyPath, boolean isDir,
- long size, Date modified, long blockSize, String owner) {
+ long size, Date modified, long blockSize, String owner,
+ String eTag, String versionId) {
if (isDir) {
return new S3AFileStatus(Tristate.UNKNOWN, keyPath, owner);
} else {
return new S3AFileStatus(size, dateToLong(modified), keyPath, blockSize,
- owner);
+ owner, eTag, versionId);
}
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
index d67e3e1e8cbc6..2e62ff6728206 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
@@ -34,16 +34,22 @@ public class S3ObjectAttributes {
private final String key;
private final S3AEncryptionMethods serverSideEncryptionAlgorithm;
private final String serverSideEncryptionKey;
+ private final String eTag;
+ private final String versionId;
public S3ObjectAttributes(
String bucket,
String key,
S3AEncryptionMethods serverSideEncryptionAlgorithm,
- String serverSideEncryptionKey) {
+ String serverSideEncryptionKey,
+ String eTag,
+ String versionId) {
this.bucket = bucket;
this.key = key;
this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
this.serverSideEncryptionKey = serverSideEncryptionKey;
+ this.eTag = eTag;
+ this.versionId = versionId;
}
public String getBucket() {
@@ -61,4 +67,12 @@ public S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
public String getServerSideEncryptionKey() {
return serverSideEncryptionKey;
}
+
+ public String getETag() {
+ return eTag;
+ }
+
+ public String getVersionId() {
+ return versionId;
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index 73303f4d92f79..4ea916f8eed2a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -226,7 +226,8 @@ public String initiateMultiPartUpload(String destKey) throws IOException {
/**
* Finalize a multipart PUT operation.
* This completes the upload, and, if that works, calls
- * {@link S3AFileSystem#finishedWrite(String, long)} to update the filesystem.
+ * {@link S3AFileSystem#finishedWrite(String, long, String, String)}
+ * to update the filesystem.
* Retry policy: retrying, translated.
* @param destKey destination of the commit
* @param uploadId multipart operation Id
@@ -259,7 +260,8 @@ private CompleteMultipartUploadResult finalizeMultipartUpload(
destKey,
uploadId,
new ArrayList<>(partETags)));
- owner.finishedWrite(destKey, length);
+ owner.finishedWrite(destKey, length, result.getETag(),
+ result.getVersionId());
return result;
}
);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java
index f3d8bc20c824b..7972183796aa3 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeDetectionPolicy.java
@@ -20,9 +20,12 @@
import java.util.Locale;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.transfer.model.CopyResult;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -200,6 +203,28 @@ public static ChangeDetectionPolicy createPolicy(final Mode mode,
public abstract String getRevisionId(ObjectMetadata objectMetadata,
String uri);
+ /**
+ * Like {{@link #getRevisionId(ObjectMetadata, String)}}, but retrieves the
+ * revision identifier from {@link S3ObjectAttributes}.
+ *
+ * @param s3Attributes the object attributes
+ * @return the revisionId string as interpreted by this policy, or potentially
+ * null if the attribute is unavailable (such as when the policy says to use
+ * versionId but object versioning is not enabled for the bucket).
+ */
+ public abstract String getRevisionId(S3ObjectAttributes s3Attributes);
+
+ /**
+ * Like {{@link #getRevisionId(ObjectMetadata, String)}}, but retrieves the
+ * revision identifier from {@link CopyResult}.
+ *
+ * @param copyResult the copy result
+ * @return the revisionId string as interpreted by this policy, or potentially
+ * null if the attribute is unavailable (such as when the policy says to use
+ * versionId but object versioning is not enabled for the bucket).
+ */
+ public abstract String getRevisionId(CopyResult copyResult);
+
/**
* Applies the given {@link #getRevisionId(ObjectMetadata, String) revisionId}
* as a server-side qualification on the {@code GetObjectRequest}.
@@ -210,6 +235,16 @@ public abstract String getRevisionId(ObjectMetadata objectMetadata,
public abstract void applyRevisionConstraint(GetObjectRequest request,
String revisionId);
+ /**
+ * Applies the given {@link #getRevisionId(ObjectMetadata, String) revisionId}
+ * as a server-side qualification on the {@code CopyObjectRequest}.
+ *
+ * @param request the request
+ * @param revisionId the revision id
+ */
+ public abstract void applyRevisionConstraint(CopyObjectRequest request,
+ String revisionId);
+
/**
* Takes appropriate action based on {@link #getMode() mode} when a change has
* been detected.
@@ -242,7 +277,8 @@ public ImmutablePair onChangeDetected(
if (timesAlreadyDetected == 0) {
// only warn on the first detection to avoid a noisy log
LOG.warn(
- String.format("%s change detected on %s %s at %d. Expected %s got %s",
+ String.format(
+ "%s change detected on %s %s at %d. Expected %s got %s",
getSource(), operation, uri, position, revisionId,
newRevisionId));
return new ImmutablePair<>(true, null);
@@ -277,11 +313,32 @@ public String getRevisionId(ObjectMetadata objectMetadata, String uri) {
return objectMetadata.getETag();
}
+ @Override
+ public String getRevisionId(S3ObjectAttributes s3Attributes) {
+ return s3Attributes.getETag();
+ }
+
+ @Override
+ public String getRevisionId(CopyResult copyResult) {
+ return copyResult.getETag();
+ }
+
@Override
public void applyRevisionConstraint(GetObjectRequest request,
String revisionId) {
- LOG.debug("Restricting request to etag {}", revisionId);
- request.withMatchingETagConstraint(revisionId);
+ if (revisionId != null) {
+ LOG.debug("Restricting get request to etag {}", revisionId);
+ request.withMatchingETagConstraint(revisionId);
+ }
+ }
+
+ @Override
+ public void applyRevisionConstraint(CopyObjectRequest request,
+ String revisionId) {
+ if (revisionId != null) {
+ LOG.debug("Restricting copy request to etag {}", revisionId);
+ request.withMatchingETagConstraint(revisionId);
+ }
}
@Override
@@ -323,11 +380,32 @@ public String getRevisionId(ObjectMetadata objectMetadata, String uri) {
return versionId;
}
+ @Override
+ public String getRevisionId(S3ObjectAttributes s3Attributes) {
+ return s3Attributes.getVersionId();
+ }
+
+ @Override
+ public String getRevisionId(CopyResult copyResult) {
+ return copyResult.getVersionId();
+ }
+
@Override
public void applyRevisionConstraint(GetObjectRequest request,
String revisionId) {
- LOG.debug("Restricting request to version {}", revisionId);
- request.withVersionId(revisionId);
+ if (revisionId != null) {
+ LOG.debug("Restricting get request to version {}", revisionId);
+ request.withVersionId(revisionId);
+ }
+ }
+
+ @Override
+ public void applyRevisionConstraint(CopyObjectRequest request,
+ String revisionId) {
+ if (revisionId != null) {
+ LOG.debug("Restricting copy request to version {}", revisionId);
+ request.withSourceVersionId(revisionId);
+ }
}
@Override
@@ -361,12 +439,28 @@ public String getRevisionId(final ObjectMetadata objectMetadata,
return null;
}
+ @Override
+ public String getRevisionId(final S3ObjectAttributes s3ObjectAttributes) {
+ return null;
+ }
+
+ @Override
+ public String getRevisionId(CopyResult copyResult) {
+ return null;
+ }
+
@Override
public void applyRevisionConstraint(final GetObjectRequest request,
final String revisionId) {
}
+ @Override
+ public void applyRevisionConstraint(CopyObjectRequest request,
+ String revisionId) {
+
+ }
+
@Override
public String toString() {
return "NoChangeDetection";
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java
index f76602b953259..7278307e5f1f8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChangeTracker.java
@@ -20,11 +20,13 @@
import java.util.concurrent.atomic.AtomicLong;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.transfer.model.CopyResult;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.fs.s3a.NoVersionAttributeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,14 +34,18 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.s3a.NoVersionAttributeException;
import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.http.HttpStatus.SC_PRECONDITION_FAILED;
/**
- * Change tracking for input streams: the revision ID/etag
- * the previous request is recorded and when the next request comes in,
- * it is compared.
+ * Change tracking for input streams: the version ID or etag of the object is
+ * tracked and compared on open/re-open. An initial version ID or etag may or
+ * may not be available, depending on usage (e.g. if S3Guard is utilized).
+ *
* Self-contained for testing and use in different streams.
*/
@InterfaceAudience.Private
@@ -49,7 +55,7 @@ public class ChangeTracker {
private static final Logger LOG =
LoggerFactory.getLogger(ChangeTracker.class);
- public static final String CHANGE_REPORTED_BY_S3 = "reported by S3";
+ public static final String CHANGE_REPORTED_BY_S3 = "Change reported by S3";
/** Policy to use. */
private final ChangeDetectionPolicy policy;
@@ -76,13 +82,20 @@ public class ChangeTracker {
* @param uri URI of object being tracked
* @param policy policy to track.
* @param versionMismatches reference to the version mismatch counter
+ * @param s3ObjectAttributes attributes of the object, potentially including
+ * an eTag or versionId to match depending on {@code policy}
*/
public ChangeTracker(final String uri,
final ChangeDetectionPolicy policy,
- final AtomicLong versionMismatches) {
+ final AtomicLong versionMismatches,
+ final S3ObjectAttributes s3ObjectAttributes) {
this.policy = checkNotNull(policy);
this.uri = uri;
this.versionMismatches = versionMismatches;
+ this.revisionId = policy.getRevisionId(s3ObjectAttributes);
+ if (revisionId != null) {
+ LOG.debug("Revision ID for object at {}: {}", uri, revisionId);
+ }
}
public String getRevisionId() {
@@ -115,6 +128,23 @@ public boolean maybeApplyConstraint(
return false;
}
+ /**
+ * Apply any revision control set by the policy if it is to be
+ * enforced on the server.
+ * @param request request to modify
+ * @return true iff a constraint was added.
+ */
+ public boolean maybeApplyConstraint(
+ final CopyObjectRequest request) {
+
+ if (policy.getMode() == ChangeDetectionPolicy.Mode.Server
+ && revisionId != null) {
+ policy.applyRevisionConstraint(request, revisionId);
+ return true;
+ }
+ return false;
+ }
+
/**
* Process the response from the server for validation against the
@@ -135,29 +165,97 @@ public void processResponse(final S3Object object,
// object was not returned.
versionMismatches.incrementAndGet();
throw new RemoteFileChangedException(uri, operation,
- String.format("%s change "
- + CHANGE_REPORTED_BY_S3
+ String.format(CHANGE_REPORTED_BY_S3
+ " while reading"
+ " at position %s."
- + " Version %s was unavailable",
- getSource(),
+ + " %s %s was unavailable",
pos,
+ getSource(),
getRevisionId()));
} else {
throw new PathIOException(uri, "No data returned from GET request");
}
}
- final ObjectMetadata metadata = object.getObjectMetadata();
+ processMetadata(object.getObjectMetadata(), operation, pos);
+ }
+
+ /**
+ * Process the response from the server for validation against the
+ * change policy.
+ * @param copyResult result of a copy operation
+ * @throws PathIOException raised on failure
+ * @throws RemoteFileChangedException if the remote file has changed.
+ */
+ public void processResponse(final CopyResult copyResult)
+ throws PathIOException {
+ // ETag (sometimes, depending on encryption and/or multipart) is not the
+ // same on the copied object as the original. Version Id seems to never
+ // be the same on the copy. As such, there isn't really anything that
+ // can be verified on the response, except that a revision ID is present
+ // if required.
+ String newRevisionId = policy.getRevisionId(copyResult);
+ LOG.debug("Copy result {}: {}", policy.getSource(), newRevisionId);
+ if (newRevisionId == null && policy.isRequireVersion()) {
+ throw new NoVersionAttributeException(uri, String.format(
+ "Change detection policy requires %s",
+ policy.getSource()));
+ }
+ }
+
+ /**
+ * Process an exception generated against the change policy.
+ * If the exception indicates the file has changed, this method throws
+ * {@code RemoteFileChangedException} with the original exception as the
+ * cause.
+ * @param e the exception
+ * @param operation the operation performed when the exception was
+ * generated (e.g. "copy", "read", "select").
+ * @throws RemoteFileChangedException if the remote file has changed.
+ */
+ public void processException(Exception e, String operation) throws
+ RemoteFileChangedException {
+ if (e instanceof AmazonServiceException) {
+ AmazonServiceException serviceException = (AmazonServiceException) e;
+ if (serviceException.getStatusCode() == SC_PRECONDITION_FAILED) {
+ versionMismatches.incrementAndGet();
+ throw new RemoteFileChangedException(uri, operation, String.format(
+ RemoteFileChangedException.PRECONDITIONS_FAILED
+ + " on %s."
+ + " Version %s was unavailable",
+ getSource(),
+ getRevisionId()),
+ serviceException);
+ }
+ }
+ }
+
+ /**
+ * Process metadata response from server for validation against the change
+ * policy.
+ * @param metadata metadata returned from server
+ * @param operation operation in progress
+ * @param pos offset of read
+ * @throws PathIOException raised on failure
+ * @throws RemoteFileChangedException if the remote file has changed.
+ */
+ public void processMetadata(final ObjectMetadata metadata,
+ final String operation,
+ final long pos) throws PathIOException {
final String newRevisionId = policy.getRevisionId(metadata, uri);
+ processNewRevision(newRevisionId, operation, pos);
+ }
+
+ private void processNewRevision(final String newRevisionId,
+ final String operation, final long pos) throws PathIOException {
if (newRevisionId == null && policy.isRequireVersion()) {
throw new NoVersionAttributeException(uri, String.format(
"Change detection policy requires %s",
policy.getSource()));
}
if (revisionId == null) {
- // revisionId is null on first (re)open. Pin it so change can be detected
- // if object has been updated
+ // revisionId may be null on first (re)open. Pin it so change can be
+ // detected if object has been updated
LOG.debug("Setting revision ID for object at {}: {}",
uri, newRevisionId);
revisionId = newRevisionId;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DDBPathMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DDBPathMetadata.java
index 78568dc4bbbf9..e3a529ac14f2d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DDBPathMetadata.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DDBPathMetadata.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.fs.s3a.s3guard;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.Tristate;
/**
@@ -36,18 +36,18 @@ public DDBPathMetadata(PathMetadata pmd) {
this.setLastUpdated(pmd.getLastUpdated());
}
- public DDBPathMetadata(FileStatus fileStatus) {
+ public DDBPathMetadata(S3AFileStatus fileStatus) {
super(fileStatus);
this.isAuthoritativeDir = false;
}
- public DDBPathMetadata(FileStatus fileStatus, Tristate isEmptyDir,
+ public DDBPathMetadata(S3AFileStatus fileStatus, Tristate isEmptyDir,
boolean isDeleted) {
super(fileStatus, isEmptyDir, isDeleted);
this.isAuthoritativeDir = false;
}
- public DDBPathMetadata(FileStatus fileStatus, Tristate isEmptyDir,
+ public DDBPathMetadata(S3AFileStatus fileStatus, Tristate isEmptyDir,
boolean isDeleted, boolean isAuthoritativeDir, long lastUpdated) {
super(fileStatus, isEmptyDir, isDeleted);
this.isAuthoritativeDir = isAuthoritativeDir;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
index dcee35824ed0f..88a46745b11bf 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
@@ -28,9 +28,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
/**
* {@code DescendantsIterator} is a {@link RemoteIterator} that implements
@@ -83,7 +83,7 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class DescendantsIterator implements RemoteIterator {
+public class DescendantsIterator implements RemoteIterator {
private final MetadataStore metadataStore;
private final Queue queue = new LinkedList<>();
@@ -121,7 +121,7 @@ public boolean hasNext() throws IOException {
}
@Override
- public FileStatus next() throws IOException {
+ public S3AFileStatus next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException("No more descendants.");
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
index 88f24aa9841e7..1059dd148623b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.Tristate;
/**
@@ -61,7 +62,7 @@ public class DirListingMetadata extends ExpirableMetadata {
* Create a directory listing metadata container.
*
* @param path Path of the directory. If this path has a host component, then
- * all paths added later via {@link #put(FileStatus)} must also have
+ * all paths added later via {@link #put(S3AFileStatus)} must also have
* the same host.
* @param listing Entries in the directory.
* @param isAuthoritative true iff listing is the full contents of the
@@ -225,7 +226,7 @@ public void remove(Path childPath) {
* @return true if the status was added or replaced with a new value. False
* if the same FileStatus value was already present.
*/
- public boolean put(FileStatus childFileStatus) {
+ public boolean put(S3AFileStatus childFileStatus) {
Preconditions.checkNotNull(childFileStatus,
"childFileStatus must be non-null");
Path childPath = childStatusToPathKey(childFileStatus);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index 769d3d4c4c376..289abf258a9cf 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -88,6 +88,7 @@
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3AUtils;
@@ -129,6 +130,14 @@
* This attribute is meaningful only to file items.
*
optional long attribute revealing block size of the file.
* This attribute is meaningful only to file items.
+ *
optional string attribute tracking the s3 eTag of the file.
+ * May be absent if the metadata was entered with a version of S3Guard
+ * before this was tracked.
+ * This attribute is meaningful only to file items.
+ *
optional string attribute tracking the s3 versionId of the file.
+ * May be absent if the metadata was entered with a version of S3Guard
+ * before this was tracked.
+ * This attribute is meaningful only to file items.
*
*
* The DynamoDB partition key is the parent, and the range key is the child.
@@ -155,20 +164,20 @@
* This is persisted to a single DynamoDB table as:
*
*
*
* This choice of schema is efficient for read access patterns.
@@ -625,9 +634,8 @@ private DDBPathMetadata innerGet(Path path, boolean wantEmptyDirectoryFlag)
* @param path path to dir
* @return new FileStatus
*/
- private FileStatus makeDirStatus(String owner, Path path) {
- return new FileStatus(0, true, 1, 0, 0, 0, null,
- owner, null, path);
+ private S3AFileStatus makeDirStatus(String owner, Path path) {
+ return new S3AFileStatus(Tristate.UNKNOWN, path, owner);
}
@Override
@@ -710,7 +718,7 @@ Collection completeAncestry(
while (!parent.isRoot() && !ancestry.containsKey(parent)) {
LOG.debug("auto-create ancestor path {} for child path {}",
parent, path);
- final FileStatus status = makeDirStatus(parent, username);
+ final S3AFileStatus status = makeDirStatus(parent, username);
ancestry.put(parent, new DDBPathMetadata(status, Tristate.FALSE,
false));
parent = parent.getParent();
@@ -915,7 +923,7 @@ Collection fullPathsToPut(DDBPathMetadata meta)
while (path != null && !path.isRoot()) {
final Item item = getConsistentItem(path);
if (!itemExists(item)) {
- final FileStatus status = makeDirStatus(path, username);
+ final S3AFileStatus status = makeDirStatus(path, username);
metasToPut.add(new DDBPathMetadata(status, Tristate.FALSE, false,
meta.isAuthoritativeDir(), meta.getLastUpdated()));
path = path.getParent();
@@ -938,9 +946,8 @@ private boolean itemExists(Item item) {
}
/** Create a directory FileStatus using current system time as mod time. */
- static FileStatus makeDirStatus(Path f, String owner) {
- return new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
- null, owner, owner, f);
+ static S3AFileStatus makeDirStatus(Path f, String owner) {
+ return new S3AFileStatus(Tristate.UNKNOWN, f, owner);
}
/**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
index b8f9635dcd283..9276388679866 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.Tristate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -231,7 +232,7 @@ public void move(Collection pathsToDelete,
public void put(PathMetadata meta) throws IOException {
Preconditions.checkNotNull(meta);
- FileStatus status = meta.getFileStatus();
+ S3AFileStatus status = meta.getFileStatus();
Path path = standardize(status.getPath());
synchronized (this) {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java
index 378d10980c835..e4e76c50d6ce5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java
@@ -33,9 +33,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
/**
* {@code MetadataStoreListFilesIterator} is a {@link RemoteIterator} that
@@ -85,14 +85,14 @@
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MetadataStoreListFilesIterator implements
- RemoteIterator {
+ RemoteIterator {
public static final Logger LOG = LoggerFactory.getLogger(
MetadataStoreListFilesIterator.class);
private final boolean allowAuthoritative;
private final MetadataStore metadataStore;
private final Set tombstones = new HashSet<>();
- private Iterator leafNodesIterator = null;
+ private Iterator leafNodesIterator = null;
public MetadataStoreListFilesIterator(MetadataStore ms, PathMetadata meta,
boolean allowAuthoritative) throws IOException {
@@ -104,7 +104,7 @@ public MetadataStoreListFilesIterator(MetadataStore ms, PathMetadata meta,
private void prefetch(PathMetadata meta) throws IOException {
final Queue queue = new LinkedList<>();
- final Collection leafNodes = new ArrayList<>();
+ final Collection leafNodes = new ArrayList<>();
if (meta != null) {
final Path path = meta.getFileStatus().getPath();
@@ -121,7 +121,7 @@ private void prefetch(PathMetadata meta) throws IOException {
while(!queue.isEmpty()) {
PathMetadata nextMetadata = queue.poll();
- FileStatus nextStatus = nextMetadata.getFileStatus();
+ S3AFileStatus nextStatus = nextMetadata.getFileStatus();
if (nextStatus.isFile()) {
// All files are leaf nodes by definition
leafNodes.add(nextStatus);
@@ -159,7 +159,7 @@ public boolean hasNext() {
}
@Override
- public FileStatus next() {
+ public S3AFileStatus next() {
return leafNodesIterator.next();
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java
index 56645fead73c4..5c00cdc5df706 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.Tristate;
/**
@@ -33,7 +34,7 @@
@InterfaceStability.Evolving
public class PathMetadata extends ExpirableMetadata {
- private final FileStatus fileStatus;
+ private S3AFileStatus fileStatus;
private Tristate isEmptyDirectory;
private boolean isDeleted;
@@ -43,24 +44,25 @@ public class PathMetadata extends ExpirableMetadata {
* @return the entry.
*/
public static PathMetadata tombstone(Path path) {
- long now = System.currentTimeMillis();
- FileStatus status = new FileStatus(0, false, 0, 0, now, path);
- return new PathMetadata(status, Tristate.UNKNOWN, true);
+ S3AFileStatus s3aStatus = new S3AFileStatus(0,
+ System.currentTimeMillis(), path, 0, null,
+ null, null);
+ return new PathMetadata(s3aStatus, Tristate.UNKNOWN, true);
}
/**
* Creates a new {@code PathMetadata} containing given {@code FileStatus}.
* @param fileStatus file status containing an absolute path.
*/
- public PathMetadata(FileStatus fileStatus) {
- this(fileStatus, Tristate.UNKNOWN);
+ public PathMetadata(S3AFileStatus fileStatus) {
+ this(fileStatus, Tristate.UNKNOWN, false);
}
- public PathMetadata(FileStatus fileStatus, Tristate isEmptyDir) {
+ public PathMetadata(S3AFileStatus fileStatus, Tristate isEmptyDir) {
this(fileStatus, isEmptyDir, false);
}
- public PathMetadata(FileStatus fileStatus, Tristate isEmptyDir, boolean
+ public PathMetadata(S3AFileStatus fileStatus, Tristate isEmptyDir, boolean
isDeleted) {
Preconditions.checkNotNull(fileStatus, "fileStatus must be non-null");
Preconditions.checkNotNull(fileStatus.getPath(), "fileStatus path must be" +
@@ -75,7 +77,7 @@ public PathMetadata(FileStatus fileStatus, Tristate isEmptyDir, boolean
/**
* @return {@code FileStatus} contained in this {@code PathMetadata}.
*/
- public final FileStatus getFileStatus() {
+ public final S3AFileStatus getFileStatus() {
return fileStatus;
}
@@ -91,6 +93,7 @@ public Tristate isEmptyDirectory() {
void setIsEmptyDirectory(Tristate isEmptyDirectory) {
this.isEmptyDirectory = isEmptyDirectory;
+ fileStatus.setIsEmptyDirectory(isEmptyDirectory);
}
public boolean isDeleted() {
@@ -128,10 +131,11 @@ public String toString() {
* @param sb target StringBuilder
*/
public void prettyPrint(StringBuilder sb) {
- sb.append(String.format("%-5s %-20s %-7d %-8s %-6s",
+ sb.append(String.format("%-5s %-20s %-7d %-8s %-6s %-20s %-20s",
fileStatus.isDirectory() ? "dir" : "file",
fileStatus.getPath().toString(), fileStatus.getLen(),
- isEmptyDirectory.name(), isDeleted));
+ isEmptyDirectory.name(), isDeleted,
+ fileStatus.getETag(), fileStatus.getVersionId()));
sb.append(fileStatus);
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
index c6f70bf277f44..7c168b9370961 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.Tristate;
/**
@@ -70,6 +71,8 @@ final class PathMetadataDynamoDBTranslation {
static final String IS_DELETED = "is_deleted";
static final String IS_AUTHORITATIVE = "is_authoritative";
static final String LAST_UPDATED = "last_updated";
+ static final String ETAG = "etag";
+ static final String VERSION_ID = "version_id";
/** Used while testing backward compatibility. */
@VisibleForTesting
@@ -135,7 +138,7 @@ static DDBPathMetadata itemToPathMetadata(Item item, String username) {
boolean isDir = item.hasAttribute(IS_DIR) && item.getBoolean(IS_DIR);
boolean isAuthoritativeDir = false;
- final FileStatus fileStatus;
+ final S3AFileStatus fileStatus;
long lastUpdated = 0;
if (isDir) {
isAuthoritativeDir = !IGNORED_FIELDS.contains(IS_AUTHORITATIVE)
@@ -146,8 +149,10 @@ static DDBPathMetadata itemToPathMetadata(Item item, String username) {
long len = item.hasAttribute(FILE_LENGTH) ? item.getLong(FILE_LENGTH) : 0;
long modTime = item.hasAttribute(MOD_TIME) ? item.getLong(MOD_TIME) : 0;
long block = item.hasAttribute(BLOCK_SIZE) ? item.getLong(BLOCK_SIZE) : 0;
- fileStatus = new FileStatus(len, false, 1, block, modTime, 0, null,
- username, username, path);
+ String eTag = item.getString(ETAG);
+ String versionId = item.getString(VERSION_ID);
+ fileStatus = new S3AFileStatus(
+ len, modTime, path, block, username, eTag, versionId);
}
lastUpdated =
!IGNORED_FIELDS.contains(LAST_UPDATED)
@@ -172,7 +177,7 @@ static DDBPathMetadata itemToPathMetadata(Item item, String username) {
*/
static Item pathMetadataToItem(DDBPathMetadata meta) {
Preconditions.checkNotNull(meta);
- final FileStatus status = meta.getFileStatus();
+ final S3AFileStatus status = meta.getFileStatus();
final Item item = new Item().withPrimaryKey(pathToKey(status.getPath()));
if (status.isDirectory()) {
item.withBoolean(IS_DIR, true);
@@ -183,6 +188,12 @@ static Item pathMetadataToItem(DDBPathMetadata meta) {
item.withLong(FILE_LENGTH, status.getLen())
.withLong(MOD_TIME, status.getModificationTime())
.withLong(BLOCK_SIZE, status.getBlockSize());
+ if (status.getETag() != null) {
+ item.withString(ETAG, status.getETag());
+ }
+ if (status.getVersionId() != null) {
+ item.withString(VERSION_ID, status.getVersionId());
+ }
}
item.withBoolean(IS_DELETED, meta.isDeleted());
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
index 3376f5c7512bd..09e66a021fddd 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
@@ -67,7 +67,7 @@ public final class S3Guard {
static final Class extends DynamoDBClientFactory>
S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT =
DynamoDBClientFactory.DefaultDynamoDBClientFactory.class;
- private static final FileStatus[] EMPTY_LISTING = new FileStatus[0];
+ private static final S3AFileStatus[] EMPTY_LISTING = new S3AFileStatus[0];
// Utility class. All static functions.
private S3Guard() { }
@@ -162,7 +162,7 @@ public static S3AFileStatus putAndReturn(MetadataStore ms,
* @param dirMeta directory listing -may be null
* @return a possibly-empty array of file status entries
*/
- public static FileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) {
+ public static S3AFileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) {
if (dirMeta == null) {
return EMPTY_LISTING;
}
@@ -176,7 +176,7 @@ public static FileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) {
}
}
- return statuses.toArray(new FileStatus[0]);
+ return statuses.toArray(new S3AFileStatus[0]);
}
/**
@@ -199,7 +199,7 @@ public static FileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) {
* @throws IOException if metadata store update failed
*/
public static FileStatus[] dirListingUnion(MetadataStore ms, Path path,
- List backingStatuses, DirListingMetadata dirMeta,
+ List backingStatuses, DirListingMetadata dirMeta,
boolean isAuthoritative, ITtlTimeProvider timeProvider)
throws IOException {
@@ -230,7 +230,7 @@ public static FileStatus[] dirListingUnion(MetadataStore ms, Path path,
pm -> pm.getFileStatus().getPath(), PathMetadata::getFileStatus)
);
- for (FileStatus s : backingStatuses) {
+ for (S3AFileStatus s : backingStatuses) {
if (deleted.contains(s.getPath())) {
continue;
}
@@ -321,7 +321,7 @@ public static void makeDirsOrdered(MetadataStore ms, List dirs,
* [/a/b/file0, /a/b/file1, /a/b/file2, /a/b/file3], isAuthoritative =
* true
*/
- FileStatus prevStatus = null;
+ S3AFileStatus prevStatus = null;
// Use new batched put to reduce round trips.
List pathMetas = new ArrayList<>(dirs.size());
@@ -332,8 +332,8 @@ public static void makeDirsOrdered(MetadataStore ms, List dirs,
boolean isLeaf = (prevStatus == null);
Path f = dirs.get(i);
assertQualified(f);
- FileStatus status =
- createUploadFileStatus(f, true, 0, 0, owner);
+ S3AFileStatus status =
+ createUploadFileStatus(f, true, 0, 0, owner, null, null);
// We only need to put a DirListingMetadata if we are setting
// authoritative bit
@@ -381,7 +381,8 @@ public static void addMoveDir(MetadataStore ms, Collection srcPaths,
}
assertQualified(srcPath, dstPath);
- FileStatus dstStatus = createUploadFileStatus(dstPath, true, 0, 0, owner);
+ S3AFileStatus dstStatus = createUploadFileStatus(dstPath, true, 0,
+ 0, owner, null, null);
addMoveStatus(srcPaths, dstMetas, srcPath, dstStatus);
}
@@ -397,16 +398,18 @@ public static void addMoveDir(MetadataStore ms, Collection srcPaths,
* @param size length of file moved
* @param blockSize blocksize to associate with destination file
* @param owner file owner to use in created records
+ * @param eTag the s3 object eTag of file moved
+ * @param versionId the s3 object versionId of file moved
*/
public static void addMoveFile(MetadataStore ms, Collection srcPaths,
Collection dstMetas, Path srcPath, Path dstPath,
- long size, long blockSize, String owner) {
+ long size, long blockSize, String owner, String eTag, String versionId) {
if (isNullMetadataStore(ms)) {
return;
}
assertQualified(srcPath, dstPath);
- FileStatus dstStatus = createUploadFileStatus(dstPath, false,
- size, blockSize, owner);
+ S3AFileStatus dstStatus = createUploadFileStatus(dstPath, false,
+ size, blockSize, owner, eTag, versionId);
addMoveStatus(srcPaths, dstMetas, srcPath, dstStatus);
}
@@ -463,9 +466,8 @@ public static void addAncestors(MetadataStore metadataStore,
while (!parent.isRoot()) {
PathMetadata directory = metadataStore.get(parent);
if (directory == null || directory.isDeleted()) {
- FileStatus status = new FileStatus(0, true, 1, 0, 0, 0, null, username,
- null, parent);
- PathMetadata meta = new PathMetadata(status, Tristate.FALSE, false);
+ S3AFileStatus s3aStatus = new S3AFileStatus(Tristate.FALSE, parent, username);
+ PathMetadata meta = new PathMetadata(s3aStatus, Tristate.FALSE, false);
newDirs.add(meta);
} else {
break;
@@ -478,7 +480,7 @@ public static void addAncestors(MetadataStore metadataStore,
private static void addMoveStatus(Collection srcPaths,
Collection dstMetas,
Path srcPath,
- FileStatus dstStatus) {
+ S3AFileStatus dstStatus) {
srcPaths.add(srcPath);
dstMetas.add(new PathMetadata(dstStatus));
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
index 1ac167f5a6dc6..0458544832e3b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
@@ -44,12 +44,12 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.MultipartUtils;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
@@ -703,7 +703,7 @@ private void putParentsIfNotPresent(FileStatus f) throws IOException {
if (dirCache.contains(parent)) {
return;
}
- FileStatus dir = DynamoDBMetadataStore.makeDirStatus(parent,
+ S3AFileStatus dir = DynamoDBMetadataStore.makeDirStatus(parent,
f.getOwner());
getStore().put(new PathMetadata(dir));
dirCache.add(parent);
@@ -718,13 +718,13 @@ private void putParentsIfNotPresent(FileStatus f) throws IOException {
*/
private long importDir(FileStatus status) throws IOException {
Preconditions.checkArgument(status.isDirectory());
- RemoteIterator it = getFilesystem()
+ RemoteIterator it = getFilesystem()
.listFilesAndEmptyDirectories(status.getPath(), true);
long items = 0;
while (it.hasNext()) {
- LocatedFileStatus located = it.next();
- FileStatus child;
+ S3ALocatedFileStatus located = it.next();
+ S3AFileStatus child;
if (located.isDirectory()) {
child = DynamoDBMetadataStore.makeDirStatus(located.getPath(),
located.getOwner());
@@ -734,7 +734,9 @@ private long importDir(FileStatus status) throws IOException {
located.getModificationTime(),
located.getPath(),
located.getBlockSize(),
- located.getOwner());
+ located.getOwner(),
+ located.getETag(),
+ located.getVersionId());
}
putParentsIfNotPresent(child);
getStore().put(new PathMetadata(child));
@@ -761,7 +763,8 @@ public int run(String[] args, PrintStream out) throws Exception {
filePath = "/";
}
Path path = new Path(filePath);
- FileStatus status = getFilesystem().getFileStatus(path);
+ S3AFileStatus status = (S3AFileStatus) getFilesystem()
+ .getFileStatus(path);
try {
initMetadataStore(false);
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
index 284956a546515..9f64efe85887b 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
@@ -1054,6 +1054,111 @@ based on usage information collected from previous days, and choosing a
combination of retry counts and an interval which allow for the clients to cope with
some throttling, but not to time-out other applications.
+## Read-After-Overwrite Consistency
+
+S3Guard provides read-after-overwrite consistency through ETags (default) or
+object versioning checked either on the server (default) or client. This works
+such that a reader reading a file after an overwrite either sees the new version
+of the file or an error. Without S3Guard, new readers may see the original
+version. Once S3 reaches eventual consistency, new readers will see the new
+version.
+
+Readers using S3Guard will usually see the new file version, but may
+in rare cases see `RemoteFileChangedException` instead. This would occur if
+an S3 object read cannot provide the version tracked in S3Guard metadata.
+
+S3Guard achieves this behavior by storing ETags and object version IDs in the
+S3Guard metadata store (e.g. DynamoDB). On opening a file, S3AFileSystem
+will look in S3 for the version of the file indicated by the ETag or object
+version ID stored in the metadata store. If that version is unavailable,
+`RemoteFileChangedException` is thrown. Whether ETag or version ID and
+server or client mode is used is determed by the
+[fs.s3a.change.detection configuration options](./index.html#Handling_Read-During-Overwrite).
+
+### No Versioning Metadata Available
+
+When the first S3AFileSystem clients are upgraded to the version of
+S3AFileSystem that contains these change tracking features, any existing
+S3Guard metadata will not contain ETags or object version IDs. Reads of files
+tracked in such S3Guard metadata will access whatever version of the file is
+available in S3 at the time of read. Only if the file is subsequently updated
+will S3Guard start tracking ETag and object version ID and as such generating
+`RemoteFileChangedException` if an inconsistency is detected.
+
+Similarly, when S3Guard metadata is pruned, S3Guard will no longer be able to
+detect an inconsistent read. S3Guard metadata should be retained for at least
+as long as the perceived possible read-after-overwrite temporary inconsistency
+window. That window is expected to be short, but there are no guarantees so it
+is at the administrator's discretion to weigh the risk.
+
+### Known Limitations
+
+#### S3 Select
+
+S3 Select does not provide a capability for server-side ETag or object
+version ID qualification. Whether fs.s3a.change.detection.mode is client or
+server, S3Guard will cause a client-side check of the file version before
+opening the file with S3 Select. If the current version does not match the
+version tracked in S3Guard, `RemoteFileChangedException` is thrown.
+
+It is still possible that the S3 Select read will access a different version of
+the file, if the visible file version changes between the version check and
+the opening of the file. This can happen due to eventual consistency or
+an overwrite of the file between the version check and the open of the file.
+
+#### Rename
+
+Rename is implemented via copy in S3. With fs.s3a.change.detection.mode=client,
+a fully reliable mechansim for ensuring the copied content is the expected
+content is not possible. This is the case since there isn't necessarily a way
+to know the expected ETag or version ID to appear on the object resulting from
+the copy.
+
+Furthermore, if fs.s3a.change.detection.mode=server and a third-party S3
+implemntation is used that doesn't honor the provided ETag or version ID,
+S3AFileSystem and S3Guard cannot detect it.
+
+In either fs.s3.change.detection.mode=server or client, a client-side check
+will be performed before the copy to ensure the current version of the file
+matches S3Guard metadata. If not, `RemoteFileChangedException` is thrown.
+Similar to as discussed with regard to S3 Select, this is not sufficient to
+guarantee that same version is the version copied.
+
+When fs.s3.change.detection.mode=server, the expected version is also specified
+in the underlying S3 CopyObjectRequest. As long as the server honors it, the
+copied object will be correct.
+
+All this said, with the defaults of fs.s3.change.detection.mode=server and
+fs.s3.change.detection.source=etag against Amazon's S3, copy should in fact
+either copy the expected file version or, in the case of an eventual consistency
+anomaly, generate `RemoteFileChangedException`. The same should be true with
+fs.s3.change.detection.source=versionid.
+
+#### Out of Sync Metadata
+
+The S3Guard version tracking metadata (ETag or object version ID) could become
+out of sync with the true current object metadata in S3. For example, S3Guard
+is still tracking v1 of some file after v2 has been written. This could occur
+for reasons such as a writer writing without utilizing S3Guard and/or
+S3AFileSystem or simply due to a write with S3AFileSystem and S3Guard that wrote
+successfully to S3, but failed in communication with S3Guard's metadata store
+(e.g. DynamoDB).
+
+If this happens, reads of the affected file(s) will result in
+`RemoteFileChangedException` until one of:
+
+* the S3Guard metadata is corrected out-of-band
+* the file is overwritten (causing an S3Guard metadata update)
+* the S3Guard metadata is pruned
+
+The S3Guard metadata for a file can be corrected with the `s3guard import`
+command as discussed above. The command can take a file URI instead of a
+bucket URI to correct the metadata for a single file. For example:
+
+```bash
+hadoop s3guard import [-meta URI] s3a://my-bucket/file-with-bad-metadata
+```
+
## Troubleshooting
### Error: `S3Guard table lacks version marker.`
@@ -1152,7 +1257,7 @@ java.io.IOException: Invalid region specified "iceland-2":
The region specified in `fs.s3a.s3guard.ddb.region` is invalid.
-# "Neither ReadCapacityUnits nor WriteCapacityUnits can be specified when BillingMode is PAY_PER_REQUEST"
+### "Neither ReadCapacityUnits nor WriteCapacityUnits can be specified when BillingMode is PAY_PER_REQUEST"
```
ValidationException; One or more parameter values were invalid:
@@ -1164,6 +1269,41 @@ ValidationException; One or more parameter values were invalid:
On-Demand DynamoDB tables do not have any fixed capacity -it is an error
to try to change it with the `set-capacity` command.
+### Error `RemoteFileChangedException`
+
+An exception like the following could occur for a couple of reasons:
+
+* the S3Guard metadata is out of sync with the true S3 metadata. For
+example, the S3Guard DynamoDB table is tracking a different ETag than the ETag
+shown in the exception. This may suggest the object was updated in S3 without
+involvement from S3Guard or there was a transient failure when S3Guard tried to
+write to DynamoDB.
+
+* S3 is exhibiting read-after-overwrite temporary inconsistency. The S3Guard
+metadata was updated with a new ETag during a recent write, but the current read
+is not seeing that ETag due to S3 eventual consistency. This exception prevents
+the reader from an inconsistent read where the reader sees an older version of
+the file.
+
+```
+org.apache.hadoop.fs.s3a.RemoteFileChangedException: open 's3a://my-bucket/test/file.txt':
+ Change reported by S3 while reading at position 0.
+ ETag 4e886e26c072fef250cfaf8037675405 was unavailable
+ at org.apache.hadoop.fs.s3a.impl.ChangeTracker.processResponse(ChangeTracker.java:167)
+ at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:207)
+ at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:355)
+ at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$2(Invoker.java:195)
+ at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
+ at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
+ at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
+ at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
+ at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:193)
+ at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:215)
+ at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:348)
+ at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:381)
+ at java.io.FilterInputStream.read(FilterInputStream.java:83)
+```
+
## Other Topics
For details on how to test S3Guard, see [Testing S3Guard](./testing.html#s3guard)
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index 3123221bd8293..8cdac9e35263f 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -970,8 +970,8 @@ and the like. The standard strategy here is to save to HDFS and then copy to S3.
```
org.apache.hadoop.fs.s3a.RemoteFileChangedException: re-open `s3a://my-bucket/test/file.txt':
- ETag change reported by S3 while reading at position 1949.
- Version f9c186d787d4de9657e99f280ba26555 was unavailable
+ Change reported by S3 while reading at position 1949.
+ ETag f9c186d787d4de9657e99f280ba26555 was unavailable
at org.apache.hadoop.fs.s3a.impl.ChangeTracker.processResponse(ChangeTracker.java:137)
at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:200)
at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:346)
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
index 03c91e62cedce..886795a9d90fc 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
@@ -56,19 +56,26 @@ public abstract class AbstractS3AMockTest {
@Before
public void setup() throws Exception {
+ Configuration conf = createConfiguration();
+ fs = new S3AFileSystem();
+ URI uri = URI.create(FS_S3A + "://" + BUCKET);
+ fs.initialize(uri, conf);
+ s3 = fs.getAmazonS3ClientForTesting("mocking");
+ }
+
+ public Configuration createConfiguration() {
Configuration conf = new Configuration();
conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
S3ClientFactory.class);
- // We explicitly disable MetadataStore even if it's configured. For unit
+ // We explicitly disable MetadataStore. For unit
// test we don't issue request to AWS DynamoDB service.
conf.setClass(S3_METADATA_STORE_IMPL, NullMetadataStore.class,
MetadataStore.class);
// FS is always magic
conf.setBoolean(CommitConstants.MAGIC_COMMITTER_ENABLED, true);
- fs = new S3AFileSystem();
- URI uri = URI.create(FS_S3A + "://" + BUCKET);
- fs.initialize(uri, conf);
- s3 = fs.getAmazonS3ClientForTesting("mocking");
+ // use minimum multipart size for faster triggering
+ conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE);
+ return conf;
}
@After
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java
index 7abd47497646e..94c5233f19a15 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java
@@ -22,7 +22,10 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.Assume;
import org.junit.Test;
import java.io.FileNotFoundException;
@@ -43,6 +46,12 @@ public class ITestS3ADelayedFNF extends AbstractS3ATestBase {
@Test
public void testNotFoundFirstRead() throws Exception {
FileSystem fs = getFileSystem();
+ ChangeDetectionPolicy changeDetectionPolicy =
+ ((S3AFileSystem) fs).getChangeDetectionPolicy();
+ Assume.assumeFalse("FNF not expected when using a bucket with"
+ + " object versioning",
+ changeDetectionPolicy.getSource() == Source.VersionId);
+
Path p = path("some-file");
ContractTestUtils.createFile(fs, p, false, new byte[] {20, 21, 22});
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java
index 6ac803e3085eb..c82a8b9789486 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInconsistency.java
@@ -24,9 +24,12 @@
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.Assume;
import org.junit.Test;
import java.io.FileNotFoundException;
@@ -106,6 +109,12 @@ public void testGetFileStatus() throws Exception {
@Test
public void testOpenDeleteRead() throws Exception {
S3AFileSystem fs = getFileSystem();
+ ChangeDetectionPolicy changeDetectionPolicy =
+ ((S3AFileSystem) fs).getChangeDetectionPolicy();
+ Assume.assumeFalse("FNF not expected when using a bucket with"
+ + " object versioning",
+ changeDetectionPolicy.getSource() == Source.VersionId);
+
Path p = path("testOpenDeleteRead.txt");
writeTextFile(fs, p, "1337c0d3z", true);
try (InputStream s = fs.open(p)) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
index 98dd2026f5f0d..11f4a383bef35 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARemoteFileChanged.java
@@ -18,11 +18,17 @@
package org.apache.hadoop.fs.s3a;
+import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collection;
+import org.apache.commons.io.IOUtils;
import org.junit.Assume;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -33,63 +39,112 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy.Source;
+import org.apache.hadoop.fs.s3a.s3guard.LocalMetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
+import static org.apache.hadoop.fs.s3a.select.SelectConstants.SELECT_SQL;
import static org.apache.hadoop.test.LambdaTestUtils.eventually;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
/**
* Test S3A remote file change detection.
*/
@RunWith(Parameterized.class)
public class ITestS3ARemoteFileChanged extends AbstractS3ATestBase {
+
private static final Logger LOG =
LoggerFactory.getLogger(ITestS3ARemoteFileChanged.class);
+ private static final String TEST_DATA = "Some test data";
+ private static final String QUOTED_TEST_DATA =
+ "\"" + TEST_DATA + "\"";
+
+ private enum InteractionType {
+ READ, READ_AFTER_DELETE, COPY, SELECT
+ }
+
private final String changeDetectionSource;
private final String changeDetectionMode;
- private final boolean expectChangeException;
- private final boolean expectFileNotFoundException;
+ private final Collection expectedExceptionInteractions;
+ private S3AFileSystem fs;
@Parameterized.Parameters
public static Collection