|
25 | 25 | import java.util.concurrent.atomic.AtomicInteger; |
26 | 26 |
|
27 | 27 | import org.apache.commons.io.IOUtils; |
28 | | -import org.apache.hadoop.fs.CanUnbuffer; |
29 | 28 | import org.apache.hadoop.fs.FSDataInputStream; |
30 | 29 | import org.apache.hadoop.fs.FileSystem; |
31 | 30 | import org.apache.hadoop.fs.Path; |
@@ -271,18 +270,22 @@ public void unbuffer() { |
271 | 270 | if (this.instanceOfCanUnbuffer == null) { |
272 | 271 | // To ensure we compute whether the stream is instance of CanUnbuffer only once. |
273 | 272 | this.instanceOfCanUnbuffer = false; |
274 | | - if(wrappedStream instanceof CanUnbuffer){ |
275 | | - try { |
276 | | - this.unbuffer = streamClass.getDeclaredMethod("unbuffer"); |
277 | | - } catch (NoSuchMethodException | SecurityException e) { |
278 | | - if (isLogTraceEnabled) { |
279 | | - LOG.trace("Failed to find 'unbuffer' method in class " + streamClass |
280 | | - + " . So there may be a TCP socket connection " |
281 | | - + "left open in CLOSE_WAIT state.", e); |
| 273 | + Class<?>[] streamInterfaces = streamClass.getInterfaces(); |
| 274 | + for (Class c : streamInterfaces) { |
| 275 | + if (c.getCanonicalName().toString().equals("org.apache.hadoop.fs.CanUnbuffer")) { |
| 276 | + try { |
| 277 | + this.unbuffer = streamClass.getDeclaredMethod("unbuffer"); |
| 278 | + } catch (NoSuchMethodException | SecurityException e) { |
| 279 | + if (isLogTraceEnabled) { |
| 280 | + LOG.trace("Failed to find 'unbuffer' method in class " + streamClass |
| 281 | + + " . So there may be a TCP socket connection " |
| 282 | + + "left open in CLOSE_WAIT state.", e); |
| 283 | + } |
| 284 | + return; |
282 | 285 | } |
283 | | - return; |
| 286 | + this.instanceOfCanUnbuffer = true; |
| 287 | + break; |
284 | 288 | } |
285 | | - this.instanceOfCanUnbuffer = true; |
286 | 289 | } |
287 | 290 | } |
288 | 291 | if (this.instanceOfCanUnbuffer) { |
|
0 commit comments