Skip to content

Commit 2382f63

Browse files
Sahil Takiar and steveloughran
authored and committed
HADOOP-14747. S3AInputStream to implement CanUnbuffer.
Author: Sahil Takiar <[email protected]>
1 parent 626fec6 commit 2382f63

File tree

10 files changed

+438
-7
lines changed

10 files changed

+438
-7
lines changed

hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,43 @@ class, which can react to a checksum error in a read by attempting to source
275275
the data elsewhere. If a new source can be found it attempts to reread and
276276
recheck that portion of the file.
277277

278+
### `CanUnbuffer.unbuffer()`
279+
280+
This operation instructs the source to release any system resources they are
281+
currently holding on to, such as buffers, sockets, file descriptors, etc. Any
282+
subsequent IO operation will likely have to reacquire these resources.
283+
Unbuffering is useful in situations where streams need to remain open, but no IO
284+
operation is expected from the stream in the immediate future (examples include
285+
file handle caching).
286+
287+
#### Preconditions
288+
289+
Not all subclasses implement this operation. In addition to implementing
290+
`CanUnbuffer`, subclasses must implement the `StreamCapabilities` interface and
291+
`StreamCapabilities.hasCapability(UNBUFFER)` must return true. If a subclass
292+
implements `CanUnbuffer` but does not report the functionality via
293+
`StreamCapabilities` then the call to `unbuffer` does nothing. If a subclass
294+
reports that it does implement `UNBUFFER`, but does not implement the
295+
`CanUnbuffer` interface, an `UnsupportedOperationException` is thrown.
296+
297+
supported(FSDIS, StreamCapabilities.hasCapability && FSDIS.hasCapability(UNBUFFER) && CanUnbuffer.unbuffer)
298+
299+
This method is not thread-safe. If `unbuffer` is called while a `read` is in
300+
progress, the outcome is undefined.
301+
302+
`unbuffer` can be called on a closed file, in which case `unbuffer` will do
303+
nothing.
304+
305+
#### Postconditions
306+
307+
The majority of subclasses that do not implement this operation simply
308+
do nothing.
309+
310+
If the operation is supported, `unbuffer` releases any and all system resources
311+
associated with the stream. The exact list of what these resources are is
312+
generally implementation dependent, however, in general, it may include
313+
buffers, sockets, file descriptors, etc.
314+
278315
## <a name="PositionedReadable"></a> interface `PositionedReadable`
279316

280317
The `PositionedReadable` operations supply "positioned reads" ("pread").
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.contract;
20+
21+
import org.apache.hadoop.fs.FSDataInputStream;
22+
import org.apache.hadoop.fs.Path;
23+
24+
import org.junit.Test;
25+
26+
import java.io.IOException;
27+
28+
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
29+
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
30+
31+
/**
32+
* Contract tests for {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer}.
33+
*/
34+
public abstract class AbstractContractUnbufferTest extends AbstractFSContractTestBase {
35+
36+
private Path file;
37+
38+
@Override
39+
public void setup() throws Exception {
40+
super.setup();
41+
skipIfUnsupported(SUPPORTS_UNBUFFER);
42+
file = path("unbufferFile");
43+
createFile(getFileSystem(), file, true,
44+
dataset(TEST_FILE_LEN, 0, 255));
45+
}
46+
47+
@Test
48+
public void testUnbufferAfterRead() throws IOException {
49+
describe("unbuffer a file after a single read");
50+
try (FSDataInputStream stream = getFileSystem().open(file)) {
51+
assertEquals(128, stream.read(new byte[128]));
52+
unbuffer(stream);
53+
}
54+
}
55+
56+
@Test
57+
public void testUnbufferBeforeRead() throws IOException {
58+
describe("unbuffer a file before a read");
59+
try (FSDataInputStream stream = getFileSystem().open(file)) {
60+
unbuffer(stream);
61+
assertEquals(128, stream.read(new byte[128]));
62+
}
63+
}
64+
65+
@Test
66+
public void testUnbufferEmptyFile() throws IOException {
67+
Path emptyFile = path("emptyUnbufferFile");
68+
createFile(getFileSystem(), emptyFile, true,
69+
dataset(TEST_FILE_LEN, 0, 255));
70+
describe("unbuffer an empty file");
71+
try (FSDataInputStream stream = getFileSystem().open(emptyFile)) {
72+
unbuffer(stream);
73+
}
74+
}
75+
76+
@Test
77+
public void testUnbufferOnClosedFile() throws IOException {
78+
describe("unbuffer a file before a read");
79+
FSDataInputStream stream = null;
80+
try {
81+
stream = getFileSystem().open(file);
82+
assertEquals(128, stream.read(new byte[128]));
83+
} finally {
84+
if (stream != null) {
85+
stream.close();
86+
}
87+
}
88+
unbuffer(stream);
89+
}
90+
91+
@Test
92+
public void testMultipleUnbuffers() throws IOException {
93+
describe("unbuffer a file multiple times");
94+
try (FSDataInputStream stream = getFileSystem().open(file)) {
95+
unbuffer(stream);
96+
unbuffer(stream);
97+
assertEquals(128, stream.read(new byte[128]));
98+
unbuffer(stream);
99+
unbuffer(stream);
100+
}
101+
}
102+
103+
@Test
104+
public void testUnbufferMultipleReads() throws IOException {
105+
describe("unbuffer a file multiple times");
106+
try (FSDataInputStream stream = getFileSystem().open(file)) {
107+
unbuffer(stream);
108+
assertEquals(128, stream.read(new byte[128]));
109+
unbuffer(stream);
110+
assertEquals(128, stream.read(new byte[128]));
111+
assertEquals(128, stream.read(new byte[128]));
112+
unbuffer(stream);
113+
assertEquals(128, stream.read(new byte[128]));
114+
assertEquals(128, stream.read(new byte[128]));
115+
assertEquals(128, stream.read(new byte[128]));
116+
unbuffer(stream);
117+
}
118+
}
119+
120+
private void unbuffer(FSDataInputStream stream) throws IOException {
121+
long pos = stream.getPos();
122+
stream.unbuffer();
123+
assertEquals(pos, stream.getPos());
124+
}
125+
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,11 @@ public interface ContractOptions {
201201
*/
202202
String SUPPORTS_CONTENT_CHECK = "supports-content-check";
203203

204+
/**
205+
* Indicates that FS supports unbuffer.
206+
*/
207+
String SUPPORTS_UNBUFFER = "supports-unbuffer";
208+
204209
/**
205210
* Maximum path length
206211
* {@value}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.contract.hdfs;
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.fs.contract.AbstractContractUnbufferTest;
23+
import org.apache.hadoop.fs.contract.AbstractFSContract;
24+
25+
import org.junit.AfterClass;
26+
import org.junit.BeforeClass;
27+
28+
import java.io.IOException;
29+
30+
public class TestHDFSContractUnbuffer extends AbstractContractUnbufferTest {
31+
32+
@BeforeClass
33+
public static void createCluster() throws IOException {
34+
HDFSContract.createCluster();
35+
}
36+
37+
@AfterClass
38+
public static void teardownCluster() throws IOException {
39+
HDFSContract.destroyCluster();
40+
}
41+
42+
@Override
43+
protected AbstractFSContract createContract(Configuration conf) {
44+
return new HDFSContract(conf);
45+
}
46+
}

hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,9 @@
111111
<value>true</value>
112112
</property>
113113

114+
<property>
115+
<name>fs.contract.supports-unbuffer</name>
116+
<value>true</value>
117+
</property>
118+
114119
</configuration>

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,17 @@
2525
import com.amazonaws.services.s3.model.S3Object;
2626
import com.amazonaws.services.s3.model.S3ObjectInputStream;
2727
import com.amazonaws.services.s3.model.SSECustomerKey;
28+
import com.google.common.annotations.VisibleForTesting;
2829
import com.google.common.base.Preconditions;
2930
import org.apache.commons.lang3.StringUtils;
3031
import org.apache.hadoop.classification.InterfaceAudience;
3132
import org.apache.hadoop.classification.InterfaceStability;
3233
import org.apache.hadoop.fs.CanSetReadahead;
34+
import org.apache.hadoop.fs.CanUnbuffer;
3335
import org.apache.hadoop.fs.FSExceptionMessages;
3436
import org.apache.hadoop.fs.FSInputStream;
3537
import org.apache.hadoop.fs.PathIOException;
38+
import org.apache.hadoop.fs.StreamCapabilities;
3639
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
3740

3841
import org.slf4j.Logger;
@@ -43,6 +46,7 @@
4346
import java.net.SocketTimeoutException;
4447

4548
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
49+
import static org.apache.hadoop.util.StringUtils.toLowerCase;
4650

4751
/**
4852
* The input stream for an S3A object.
@@ -63,7 +67,8 @@
6367
*/
6468
@InterfaceAudience.Private
6569
@InterfaceStability.Evolving
66-
public class S3AInputStream extends FSInputStream implements CanSetReadahead {
70+
public class S3AInputStream extends FSInputStream implements CanSetReadahead,
71+
CanUnbuffer, StreamCapabilities {
6772

6873
public static final String E_NEGATIVE_READAHEAD_VALUE
6974
= "Negative readahead value";
@@ -175,7 +180,7 @@ private void setInputPolicy(S3AInputPolicy inputPolicy) {
175180
private synchronized void reopen(String reason, long targetPos, long length,
176181
boolean forceAbort) throws IOException {
177182

178-
if (wrappedStream != null) {
183+
if (isObjectStreamOpen()) {
179184
closeStream("reopen(" + reason + ")", contentRangeFinish, forceAbort);
180185
}
181186

@@ -542,7 +547,7 @@ public synchronized void close() throws IOException {
542547
*/
543548
@Retries.OnceRaw
544549
private void closeStream(String reason, long length, boolean forceAbort) {
545-
if (wrappedStream != null) {
550+
if (isObjectStreamOpen()) {
546551

547552
// if the amount of data remaining in the current request is greater
548553
// than the readahead value: abort.
@@ -605,12 +610,11 @@ private void closeStream(String reason, long length, boolean forceAbort) {
605610
@InterfaceStability.Unstable
606611
public synchronized boolean resetConnection() throws IOException {
607612
checkNotClosed();
608-
boolean connectionOpen = wrappedStream != null;
609-
if (connectionOpen) {
613+
if (isObjectStreamOpen()) {
610614
LOG.info("Forced reset of connection to {}", uri);
611615
closeStream("reset()", contentRangeFinish, true);
612616
}
613-
return connectionOpen;
617+
return isObjectStreamOpen();
614618
}
615619

616620
@Override
@@ -677,7 +681,7 @@ public String toString() {
677681
"S3AInputStream{");
678682
sb.append(uri);
679683
sb.append(" wrappedStream=")
680-
.append(wrappedStream != null ? "open" : "closed");
684+
.append(isObjectStreamOpen() ? "open" : "closed");
681685
sb.append(" read policy=").append(inputPolicy);
682686
sb.append(" pos=").append(pos);
683687
sb.append(" nextReadPos=").append(nextReadPos);
@@ -814,4 +818,25 @@ public static long validateReadahead(@Nullable Long readahead) {
814818
return readahead;
815819
}
816820
}
821+
822+
@Override
823+
public synchronized void unbuffer() {
824+
closeStream("unbuffer()", contentRangeFinish, false);
825+
}
826+
827+
@Override
828+
public boolean hasCapability(String capability) {
829+
switch (toLowerCase(capability)) {
830+
case StreamCapabilities.READAHEAD:
831+
case StreamCapabilities.UNBUFFER:
832+
return true;
833+
default:
834+
return false;
835+
}
836+
}
837+
838+
@VisibleForTesting
839+
boolean isObjectStreamOpen() {
840+
return wrappedStream != null;
841+
}
817842
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.contract.s3a;
20+
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.fs.contract.AbstractContractUnbufferTest;
23+
import org.apache.hadoop.fs.contract.AbstractFSContract;
24+
25+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
26+
27+
public class ITestS3AContractUnbuffer extends AbstractContractUnbufferTest {
28+
29+
@Override
30+
protected Configuration createConfiguration() {
31+
Configuration conf = super.createConfiguration();
32+
// patch in S3Guard options
33+
maybeEnableS3Guard(conf);
34+
return conf;
35+
}
36+
37+
@Override
38+
protected AbstractFSContract createContract(Configuration conf) {
39+
return new S3AContract(conf);
40+
}
41+
}

0 commit comments

Comments
 (0)