Skip to content

Commit b7ee227

Browse files
Ben Rolingdeepakdamri
authored andcommitted
HADOOP-16085. S3Guard: use object version or etags to protect against inconsistent read after replace/overwrite.
Contributed by Ben Roling. S3Guard will now track the etag of uploaded files and, if an S3 bucket is versioned, the object version. You can then control how to react to a mismatch between the data in the DynamoDB table and that in the store: warn, fail, or, when using versions, return the original value. This adds two new columns to the table: etag and version. This is transparent to older S3A clients -but when such clients add/update data to the S3Guard table, they will not add these values. As a result, the etag/version checks will not work with files uploaded by older clients. For a consistent experience, upgrade all clients to use the latest hadoop version.
1 parent af36221 commit b7ee227

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+2601
-199
lines changed

hadoop-tools/hadoop-aws/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,11 @@
405405
<artifactId>hadoop-common</artifactId>
406406
<scope>provided</scope>
407407
</dependency>
408+
<dependency>
409+
<groupId>org.apache.httpcomponents</groupId>
410+
<artifactId>httpcore</artifactId>
411+
<scope>provided</scope>
412+
</dependency>
408413
<dependency>
409414
<groupId>org.apache.hadoop</groupId>
410415
<artifactId>hadoop-common</artifactId>

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,33 @@ public void retry(String action,
197197
});
198198
}
199199

200+
/**
201+
* Execute a void operation with retry processing when doRetry=true, else
202+
* just once.
203+
* @param doRetry true if retries should be performed
204+
* @param action action to execute (used in error messages)
205+
* @param path path of work (used in error messages)
206+
* @param idempotent does the operation have semantics
207+
* which mean that it can be retried even if was already executed?
208+
* @param retrying callback on retries
209+
* @param operation operation to execute
210+
* @throws IOException any IOE raised, or translated exception
211+
*/
212+
@Retries.RetryTranslated
213+
public void maybeRetry(boolean doRetry,
214+
String action,
215+
String path,
216+
boolean idempotent,
217+
Retried retrying,
218+
VoidOperation operation)
219+
throws IOException {
220+
maybeRetry(doRetry, action, path, idempotent, retrying,
221+
() -> {
222+
operation.execute();
223+
return null;
224+
});
225+
}
226+
200227
/**
201228
* Execute a void operation with the default retry callback invoked.
202229
* @param action action to execute (used in error messages)
@@ -215,6 +242,28 @@ public void retry(String action,
215242
retry(action, path, idempotent, retryCallback, operation);
216243
}
217244

245+
/**
246+
* Execute a void operation with the default retry callback invoked when
247+
* doRetry=true, else just once.
248+
* @param doRetry true if retries should be performed
249+
* @param action action to execute (used in error messages)
250+
* @param path path of work (used in error messages)
251+
* @param idempotent does the operation have semantics
252+
* which mean that it can be retried even if was already executed?
253+
* @param operation operation to execute
254+
* @throws IOException any IOE raised, or translated exception
255+
*/
256+
@Retries.RetryTranslated
257+
public void maybeRetry(
258+
boolean doRetry,
259+
String action,
260+
String path,
261+
boolean idempotent,
262+
VoidOperation operation)
263+
throws IOException {
264+
maybeRetry(doRetry, action, path, idempotent, retryCallback, operation);
265+
}
266+
218267
/**
219268
* Execute a function with the default retry callback invoked.
220269
* @param action action to execute (used in error messages)
@@ -265,6 +314,41 @@ public <T> T retry(
265314
() -> once(action, path, operation));
266315
}
267316

317+
/**
318+
* Execute a function with retry processing when doRetry=true, else just once.
319+
* Uses {@link #once(String, String, Operation)} as the inner
320+
* invocation mechanism before retry logic is performed.
321+
* @param <T> type of return value
322+
* @param doRetry true if retries should be performed
323+
* @param action action to execute (used in error messages)
324+
* @param path path of work (used in error messages)
325+
* @param idempotent does the operation have semantics
326+
* which mean that it can be retried even if was already executed?
327+
* @param retrying callback on retries
328+
* @param operation operation to execute
329+
* @return the result of the call
330+
* @throws IOException any IOE raised, or translated exception
331+
*/
332+
@Retries.RetryTranslated
333+
public <T> T maybeRetry(
334+
boolean doRetry,
335+
String action,
336+
@Nullable String path,
337+
boolean idempotent,
338+
Retried retrying,
339+
Operation<T> operation)
340+
throws IOException {
341+
if (doRetry) {
342+
return retryUntranslated(
343+
toDescription(action, path),
344+
idempotent,
345+
retrying,
346+
() -> once(action, path, operation));
347+
} else {
348+
return once(action, path, operation);
349+
}
350+
}
351+
268352
/**
269353
* Execute a function with retry processing and no translation.
270354
* and the default retry callback.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RemoteFileChangedException.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
@InterfaceStability.Unstable
3333
public class RemoteFileChangedException extends PathIOException {
3434

35+
public static final String PRECONDITIONS_FAILED =
36+
"Constraints of request were unsatisfiable";
37+
3538
/**
3639
* Constructs a RemoteFileChangedException.
3740
*
@@ -46,4 +49,21 @@ public RemoteFileChangedException(String path,
4649
super(path, message);
4750
setOperation(operation);
4851
}
52+
53+
/**
54+
* Constructs a RemoteFileChangedException.
55+
*
56+
* @param path the path accessed when the change was detected
57+
* @param operation the operation (e.g. open, re-open) performed when the
58+
* change was detected
59+
* @param message a message providing more details about the condition
60+
* @param cause inner cause.
61+
*/
62+
public RemoteFileChangedException(String path,
63+
String operation,
64+
String message,
65+
Throwable cause) {
66+
super(path, message, cause);
67+
setOperation(operation);
68+
}
4969
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
@InterfaceStability.Evolving
3333
public class S3AFileStatus extends FileStatus {
3434
private Tristate isEmptyDirectory;
35+
private String eTag;
36+
private String versionId;
3537

3638
/**
3739
* Create a directory status.
@@ -69,15 +71,17 @@ public S3AFileStatus(Tristate isemptydir,
6971
* @param path path
7072
* @param blockSize block size
7173
* @param owner owner
74+
* @param eTag eTag of the S3 object if available, else null
75+
* @param versionId versionId of the S3 object if available, else null
7276
*/
7377
public S3AFileStatus(long length, long modification_time, Path path,
74-
long blockSize, String owner) {
75-
super(length, false, 1, blockSize, modification_time, 0,
76-
null, null, null, null,
78+
long blockSize, String owner, String eTag, String versionId) {
79+
super(length, false, 1, blockSize, modification_time,
80+
0, null, owner, owner, null,
7781
path, false, true, false);
7882
isEmptyDirectory = Tristate.FALSE;
79-
setOwner(owner);
80-
setGroup(owner);
83+
this.eTag = eTag;
84+
this.versionId = versionId;
8185
}
8286

8387
/**
@@ -86,16 +90,19 @@ public S3AFileStatus(long length, long modification_time, Path path,
8690
* @param source FileStatus to convert to S3AFileStatus
8791
* @param isEmptyDirectory TRUE/FALSE if known to be / not be an empty
8892
* directory, UNKNOWN if that information was not computed.
93+
* @param eTag eTag of the S3 object if available, else null
94+
* @param versionId versionId of the S3 object if available, else null
8995
* @return a new S3AFileStatus
9096
*/
9197
public static S3AFileStatus fromFileStatus(FileStatus source,
92-
Tristate isEmptyDirectory) {
98+
Tristate isEmptyDirectory, String eTag, String versionId) {
9399
if (source.isDirectory()) {
94100
return new S3AFileStatus(isEmptyDirectory, source.getPath(),
95101
source.getOwner());
96102
} else {
97103
return new S3AFileStatus(source.getLen(), source.getModificationTime(),
98-
source.getPath(), source.getBlockSize(), source.getOwner());
104+
source.getPath(), source.getBlockSize(), source.getOwner(),
105+
eTag, versionId);
99106
}
100107
}
101108

@@ -109,6 +116,28 @@ public Tristate isEmptyDirectory() {
109116
return isEmptyDirectory;
110117
}
111118

119+
/**
120+
* Update isEmptyDirectory attribute.
121+
* @param isEmptyDirectory new isEmptyDirectory value
122+
*/
123+
public void setIsEmptyDirectory(Tristate isEmptyDirectory) {
124+
this.isEmptyDirectory = isEmptyDirectory;
125+
}
126+
127+
/**
128+
* @return the S3 object eTag when available, else null.
129+
*/
130+
public String getETag() {
131+
return eTag;
132+
}
133+
134+
/**
135+
* @return the S3 object versionId when available, else null.
136+
*/
137+
public String getVersionId() {
138+
return versionId;
139+
}
140+
112141
/** Compare if this object is equal to another object.
113142
* @param o the object to be compared.
114143
* @return true if two file status has the same path name; false if not.
@@ -150,8 +179,10 @@ public long getModificationTime(){
150179

151180
@Override
152181
public String toString() {
153-
return super.toString() +
154-
String.format(" isEmptyDirectory=%s", isEmptyDirectory().name());
182+
return super.toString()
183+
+ String.format(" isEmptyDirectory=%s", isEmptyDirectory().name()
184+
+ String.format(" eTag=%s", eTag)
185+
+ String.format(" versionId=%s", versionId));
155186
}
156187

157188
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a;
20+
21+
import org.apache.hadoop.fs.BlockLocation;
22+
import org.apache.hadoop.fs.LocatedFileStatus;
23+
24+
import static com.google.common.base.Preconditions.checkNotNull;
25+
26+
/**
27+
* {@link LocatedFileStatus} extended to also carry ETag and object version ID.
28+
*/
29+
public class S3ALocatedFileStatus extends LocatedFileStatus {
30+
31+
private static final long serialVersionUID = 3597192103662929338L;
32+
33+
private final String eTag;
34+
private final String versionId;
35+
36+
public S3ALocatedFileStatus(S3AFileStatus status, BlockLocation[] locations,
37+
String eTag, String versionId) {
38+
super(checkNotNull(status), locations);
39+
this.eTag = eTag;
40+
this.versionId = versionId;
41+
}
42+
43+
public String getETag() {
44+
return eTag;
45+
}
46+
47+
public String getVersionId() {
48+
return versionId;
49+
}
50+
51+
// equals() and hashCode() overridden to avoid FindBugs warning.
52+
// Base implementation is equality on Path only, which is still appropriate.
53+
54+
@Override
55+
public boolean equals(Object o) {
56+
return super.equals(o);
57+
}
58+
59+
@Override
60+
public int hashCode() {
61+
return super.hashCode();
62+
}
63+
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -531,16 +531,20 @@ public static String stringify(AmazonS3Exception e) {
531531
* @param summary summary from AWS
532532
* @param blockSize block size to declare.
533533
* @param owner owner of the file
534+
* @param eTag S3 object eTag or null if unavailable
535+
* @param versionId S3 object versionId or null if unavailable
534536
* @return a status entry
535537
*/
536538
public static S3AFileStatus createFileStatus(Path keyPath,
537539
S3ObjectSummary summary,
538540
long blockSize,
539-
String owner) {
541+
String owner,
542+
String eTag,
543+
String versionId) {
540544
long size = summary.getSize();
541545
return createFileStatus(keyPath,
542546
objectRepresentsDirectory(summary.getKey(), size),
543-
size, summary.getLastModified(), blockSize, owner);
547+
size, summary.getLastModified(), blockSize, owner, eTag, versionId);
544548
}
545549

546550
/**
@@ -553,22 +557,27 @@ public static S3AFileStatus createFileStatus(Path keyPath,
553557
* @param size file length
554558
* @param blockSize block size for file status
555559
* @param owner Hadoop username
560+
* @param eTag S3 object eTag or null if unavailable
561+
* @param versionId S3 object versionId or null if unavailable
556562
* @return a status entry
557563
*/
558564
public static S3AFileStatus createUploadFileStatus(Path keyPath,
559-
boolean isDir, long size, long blockSize, String owner) {
565+
boolean isDir, long size, long blockSize, String owner,
566+
String eTag, String versionId) {
560567
Date date = isDir ? null : new Date();
561-
return createFileStatus(keyPath, isDir, size, date, blockSize, owner);
568+
return createFileStatus(keyPath, isDir, size, date, blockSize, owner,
569+
eTag, versionId);
562570
}
563571

564572
/* Date 'modified' is ignored when isDir is true. */
565573
private static S3AFileStatus createFileStatus(Path keyPath, boolean isDir,
566-
long size, Date modified, long blockSize, String owner) {
574+
long size, Date modified, long blockSize, String owner,
575+
String eTag, String versionId) {
567576
if (isDir) {
568577
return new S3AFileStatus(Tristate.UNKNOWN, keyPath, owner);
569578
} else {
570579
return new S3AFileStatus(size, dateToLong(modified), keyPath, blockSize,
571-
owner);
580+
owner, eTag, versionId);
572581
}
573582
}
574583

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3GuardExistsRetryPolicy.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public S3GuardExistsRetryPolicy(Configuration conf) {
4242
protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
4343
Map<Class<? extends Exception>, RetryPolicy> b = super.createExceptionMap();
4444
b.put(FileNotFoundException.class, retryIdempotentCalls);
45+
b.put(RemoteFileChangedException.class, retryIdempotentCalls);
4546
return b;
4647
}
4748
}

0 commit comments

Comments
 (0)