|
17 | 17 |
|
18 | 18 | package org.apache.spark.network |
19 | 19 |
|
20 | | -import java.io.{FileInputStream, RandomAccessFile, File, InputStream} |
| 20 | +import java.io._ |
21 | 21 | import java.nio.ByteBuffer |
22 | 22 | import java.nio.channels.FileChannel |
23 | 23 | import java.nio.channels.FileChannel.MapMode |
24 | 24 |
|
| 25 | +import scala.util.Try |
| 26 | + |
25 | 27 | import com.google.common.io.ByteStreams |
26 | 28 | import io.netty.buffer.{ByteBufInputStream, ByteBuf} |
27 | 29 |
|
28 | | -import org.apache.spark.util.ByteBufferInputStream |
| 30 | +import org.apache.spark.util.{ByteBufferInputStream, Utils} |
29 | 31 |
|
30 | 32 |
|
31 | 33 | /** |
@@ -71,18 +73,47 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt |
71 | 73 | try { |
72 | 74 | channel = new RandomAccessFile(file, "r").getChannel |
73 | 75 | channel.map(MapMode.READ_ONLY, offset, length) |
| 76 | + } catch { |
| 77 | + case e: IOException => |
| 78 | + Try(channel.size).toOption match { |
| 79 | + case Some(fileLen) => |
| 80 | + throw new IOException(s"Error in reading $this (actual file length $fileLen)", e) |
| 81 | + case None => |
| 82 | + throw new IOException(s"Error in opening $this", e) |
| 83 | + } |
74 | 84 | } finally { |
75 | 85 | if (channel != null) { |
76 | | - channel.close() |
| 86 | + Utils.tryLog(channel.close()) |
77 | 87 | } |
78 | 88 | } |
79 | 89 | } |
80 | 90 |
|
81 | 91 | override def inputStream(): InputStream = { |
82 | | - val is = new FileInputStream(file) |
83 | | - is.skip(offset) |
84 | | - ByteStreams.limit(is, length) |
| 92 | + var is: FileInputStream = null |
| 93 | + try { |
| 94 | + is = new FileInputStream(file) |
| 95 | + is.skip(offset) |
| 96 | + ByteStreams.limit(is, length) |
| 97 | + } catch { |
| 98 | + case e: IOException => |
| 99 | + if (is != null) { |
| 100 | + Utils.tryLog(is.close()) |
| 101 | + } |
| 102 | + Try(file.length).toOption match { |
| 103 | + case Some(fileLen) => |
| 104 | + throw new IOException(s"Error in reading $this (actual file length $fileLen)", e) |
| 105 | + case None => |
| 106 | + throw new IOException(s"Error in opening $this", e) |
| 107 | + } |
| 108 | + case e: Throwable => |
| 109 | + if (is != null) { |
| 110 | + Utils.tryLog(is.close()) |
| 111 | + } |
| 112 | + throw e |
| 113 | + } |
85 | 114 | } |
| 115 | + |
| 116 | + override def toString: String = s"${getClass.getName}($file, $offset, $length)" |
86 | 117 | } |
87 | 118 |
|
88 | 119 |
|
|
0 commit comments