|
25 | 25 | import java.util.concurrent.atomic.AtomicInteger; |
26 | 26 |
|
27 | 27 | import org.apache.commons.io.IOUtils; |
28 | | -import org.apache.hadoop.fs.CanUnbuffer; |
29 | 28 | import org.apache.hadoop.fs.FSDataInputStream; |
30 | 29 | import org.apache.hadoop.fs.FileSystem; |
31 | 30 | import org.apache.hadoop.fs.Path; |
@@ -332,18 +331,22 @@ public void unbuffer() { |
332 | 331 | if (this.instanceOfCanUnbuffer == null) { |
333 | 332 | // To ensure we compute whether the stream is instance of CanUnbuffer only once. |
334 | 333 | this.instanceOfCanUnbuffer = false; |
335 | | - if(wrappedStream instanceof CanUnbuffer){ |
336 | | - try { |
337 | | - this.unbuffer = streamClass.getDeclaredMethod("unbuffer"); |
338 | | - } catch (NoSuchMethodException | SecurityException e) { |
339 | | - if (isLogTraceEnabled) { |
340 | | - LOG.trace("Failed to find 'unbuffer' method in class " + streamClass |
341 | | - + " . So there may be a TCP socket connection " |
342 | | - + "left open in CLOSE_WAIT state.", e); |
| 334 | + Class<?>[] streamInterfaces = streamClass.getInterfaces(); |
| 335 | + for (Class c : streamInterfaces) { |
| 336 | + if (c.getCanonicalName().toString().equals("org.apache.hadoop.fs.CanUnbuffer")) { |
| 337 | + try { |
| 338 | + this.unbuffer = streamClass.getDeclaredMethod("unbuffer"); |
| 339 | + } catch (NoSuchMethodException | SecurityException e) { |
| 340 | + if (isLogTraceEnabled) { |
| 341 | + LOG.trace("Failed to find 'unbuffer' method in class " + streamClass |
| 342 | + + " . So there may be a TCP socket connection " |
| 343 | + + "left open in CLOSE_WAIT state.", e); |
| 344 | + } |
| 345 | + return; |
343 | 346 | } |
344 | | - return; |
| 347 | + this.instanceOfCanUnbuffer = true; |
| 348 | + break; |
345 | 349 | } |
346 | | - this.instanceOfCanUnbuffer = true; |
347 | 350 | } |
348 | 351 | } |
349 | 352 | if (this.instanceOfCanUnbuffer) { |
|
0 commit comments