From 323dfec9734111c923772b2b0203766b620b0153 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 29 Sep 2014 11:13:44 -0700 Subject: [PATCH 1/2] Add more debug message. --- .../apache/spark/network/ManagedBuffer.scala | 39 +++++++++++++++++-- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala index e990c1da6730f..37b9939f90bf8 100644 --- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala +++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala @@ -17,11 +17,13 @@ package org.apache.spark.network -import java.io.{FileInputStream, RandomAccessFile, File, InputStream} +import java.io._ import java.nio.ByteBuffer import java.nio.channels.FileChannel import java.nio.channels.FileChannel.MapMode +import scala.util.Try + import com.google.common.io.ByteStreams import io.netty.buffer.{ByteBufInputStream, ByteBuf} @@ -71,6 +73,14 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt try { channel = new RandomAccessFile(file, "r").getChannel channel.map(MapMode.READ_ONLY, offset, length) + } catch { + case e: IOException => + Try(channel.size).toOption match { + case Some(fileLen) => + throw new IOException(s"Error in reading $this (actual file length $fileLen)", e) + case None => + throw new IOException(s"Error in opening $this", e) + } } finally { if (channel != null) { channel.close() @@ -79,10 +89,31 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt } override def inputStream(): InputStream = { - val is = new FileInputStream(file) - is.skip(offset) - ByteStreams.limit(is, length) + var is: FileInputStream = null + try { + is = new FileInputStream(file) + is.skip(offset) + ByteStreams.limit(is, length) + } catch { + case e: IOException => + if (is != null) { + is.close() + } + Try(file.length).toOption match { + case Some(fileLen) => + throw new IOException(s"Error in reading $this (actual file length $fileLen)", e) + case None => + throw new IOException(s"Error in opening $this", e) + } + case e: Throwable => + if (is != null) { + is.close() + } + throw e + } } + + override def toString: String = s"${getClass.getName}($file, $offset, $length)" } From 5814292ec2f9f438ef2eb822504ef3d9a5100f96 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 29 Sep 2014 11:30:13 -0700 Subject: [PATCH 2/2] Logging close() in case close() fails. --- .../org/apache/spark/network/ManagedBuffer.scala | 8 ++++---- .../main/scala/org/apache/spark/util/Utils.scala | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala index 37b9939f90bf8..a4409181ec907 100644 --- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala +++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala @@ -27,7 +27,7 @@ import scala.util.Try import com.google.common.io.ByteStreams import io.netty.buffer.{ByteBufInputStream, ByteBuf} -import org.apache.spark.util.ByteBufferInputStream +import org.apache.spark.util.{ByteBufferInputStream, Utils} /** @@ -83,7 +83,7 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt } } finally { if (channel != null) { - channel.close() + Utils.tryLog(channel.close()) } } } @@ -97,7 +97,7 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt } catch { case e: IOException => if (is != null) { - is.close() + Utils.tryLog(is.close()) } Try(file.length).toOption match { case Some(fileLen) => @@ -107,7 +107,7 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt } case e: Throwable => if (is != null) { - is.close() + Utils.tryLog(is.close()) } throw e } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 2755887feeeff..10d440828e323 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1304,6 +1304,20 @@ private[spark] object Utils extends Logging { } } + /** Executes the given block in a Try, logging any uncaught exceptions. */ + def tryLog[T](f: => T): Try[T] = { + try { + val res = f + scala.util.Success(res) + } catch { + case ct: ControlThrowable => + throw ct + case t: Throwable => + logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) + scala.util.Failure(t) + } + } + /** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */ def isFatalError(e: Throwable): Boolean = { e match {