Skip to content

Commit ae1f928

Browse files
authored
HBASE-24051 Allows indirect inheritance to CanUnbuffer (#1406)
Signed-off-by: stack <[email protected]>
1 parent 040a0a7 commit ae1f928

File tree

2 files changed

+99
-14
lines changed

2 files changed

+99
-14
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.atomic.AtomicInteger;
2626

2727
import org.apache.commons.io.IOUtils;
28+
import org.apache.hadoop.fs.CanUnbuffer;
2829
import org.apache.hadoop.fs.FSDataInputStream;
2930
import org.apache.hadoop.fs.FileSystem;
3031
import org.apache.hadoop.fs.Path;
@@ -331,22 +332,18 @@ public void unbuffer() {
331332
if (this.instanceOfCanUnbuffer == null) {
332333
// To ensure we compute whether the stream is instance of CanUnbuffer only once.
333334
this.instanceOfCanUnbuffer = false;
334-
Class<?>[] streamInterfaces = streamClass.getInterfaces();
335-
for (Class c : streamInterfaces) {
336-
if (c.getCanonicalName().toString().equals("org.apache.hadoop.fs.CanUnbuffer")) {
337-
try {
338-
this.unbuffer = streamClass.getDeclaredMethod("unbuffer");
339-
} catch (NoSuchMethodException | SecurityException e) {
340-
if (isLogTraceEnabled) {
341-
LOG.trace("Failed to find 'unbuffer' method in class " + streamClass
342-
+ " . So there may be a TCP socket connection "
343-
+ "left open in CLOSE_WAIT state.", e);
344-
}
345-
return;
335+
if(wrappedStream instanceof CanUnbuffer){
336+
try {
337+
this.unbuffer = streamClass.getDeclaredMethod("unbuffer");
338+
} catch (NoSuchMethodException | SecurityException e) {
339+
if (isLogTraceEnabled) {
340+
LOG.trace("Failed to find 'unbuffer' method in class " + streamClass
341+
+ " . So there may be a TCP socket connection "
342+
+ "left open in CLOSE_WAIT state.", e);
346343
}
347-
this.instanceOfCanUnbuffer = true;
348-
break;
344+
return;
349345
}
346+
this.instanceOfCanUnbuffer = true;
350347
}
351348
}
352349
if (this.instanceOfCanUnbuffer) {
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.io;
19+
20+
import org.apache.hadoop.fs.CanUnbuffer;
21+
import org.apache.hadoop.hbase.HBaseClassTestRule;
22+
import org.apache.hadoop.hbase.testclassification.IOTests;
23+
import org.apache.hadoop.hbase.testclassification.SmallTests;
24+
import org.junit.Assert;
25+
import org.junit.ClassRule;
26+
import org.junit.Test;
27+
import org.junit.experimental.categories.Category;
28+
29+
import java.io.IOException;
30+
import java.io.InputStream;
31+
import java.lang.reflect.Method;
32+
33+
@Category({IOTests.class, SmallTests.class})
34+
public class TestFSDataInputStreamWrapper {
35+
private Method unbuffer = null;
36+
37+
@ClassRule
38+
public static final HBaseClassTestRule CLASS_RULE =
39+
HBaseClassTestRule.forClass(TestFSDataInputStreamWrapper.class);
40+
41+
42+
@Test
43+
public void TestUnbuffer() {
44+
InputStream wrappedStream = new SonStream();
45+
final Class<? extends InputStream> streamClass = wrappedStream.getClass();
46+
Class<?>[] streamInterfaces = streamClass.getInterfaces();
47+
for (Class c : streamInterfaces) {
48+
if (c.getCanonicalName().toString().equals("org.apache.hadoop.fs.CanUnbuffer")) {
49+
try {
50+
this.unbuffer = streamClass.getDeclaredMethod("unbuffer");
51+
} catch (NoSuchMethodException | SecurityException e) {
52+
return;
53+
}
54+
break;
55+
}
56+
}
57+
Assert.assertEquals(false, unbuffer != null);
58+
unbuffer = null;
59+
if (wrappedStream instanceof CanUnbuffer) {
60+
try {
61+
this.unbuffer = streamClass.getDeclaredMethod("unbuffer");
62+
} catch (NoSuchMethodException | SecurityException e) {
63+
return;
64+
}
65+
}
66+
Assert.assertEquals(true, unbuffer != null);
67+
}
68+
69+
public class SonStream extends FatherStream {
70+
@Override
71+
public void unbuffer() {
72+
73+
}
74+
}
75+
76+
public class FatherStream extends InputStream implements CanUnbuffer {
77+
78+
@Override
79+
public void unbuffer() {
80+
81+
}
82+
83+
@Override
84+
public int read() throws IOException {
85+
return 0;
86+
}
87+
}
88+
}

0 commit comments

Comments
 (0)