
Commit cd3a9be

[SPARK-4080] Only throw IOException from [write|read][Object|External].
If a class implementing the Serializable or Externalizable interface throws an exception other than IOException or ClassNotFoundException from its (de)serialization methods, the caller sees an unhelpful "IOException: unexpected exception type" rather than the actual exception that produced the (de)serialization error. This patch fixes that by adding a utility method, Utils.tryOrIOException, which re-wraps any uncaught exception in an IOException (unless it is already an instance of IOException).
1 parent 7c89a8f commit cd3a9be
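
For context, here is a minimal sketch of what such a wrapping helper could look like. This is an illustrative approximation in Scala, not necessarily the exact Utils.tryOrIOException body added by this commit:

    import java.io.IOException
    import scala.util.control.NonFatal

    // Run a (de)serialization block; rethrow IOExceptions unchanged and wrap
    // anything else in an IOException so the real cause is preserved instead of
    // the JVM's generic "IOException: unexpected exception type".
    def tryOrIOException(block: => Unit): Unit = {
      try {
        block
      } catch {
        case e: IOException => throw e
        case NonFatal(e) => throw new IOException(e)
      }
    }

The diffs below then rewrite each writeObject/readObject body from the form "private def readObject(in: ObjectInputStream) { ... }" to "private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { ... }".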

30 files changed: +84 / -52 lines changed

core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 2 additions & 1 deletion

@@ -24,6 +24,7 @@ import scala.collection.mutable.Map
 import scala.reflect.ClassTag
 
 import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.Utils
 
 /**
  * A data type that can be accumulated, ie has an commutative and associative "add" operation,
@@ -126,7 +127,7 @@ class Accumulable[R, T] (
   }
 
   // Called by Java when deserializing an object
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     value_ = zero
     deserialized = true

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 2 additions & 2 deletions

@@ -204,7 +204,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   }
 
   @throws(classOf[IOException])
-  private def writeObject(out: ObjectOutputStream) {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     val sfactory = SparkEnv.get.serializer
     sfactory match {
       case js: JavaSerializer => out.defaultWriteObject()
@@ -222,7 +222,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   }
 
   @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     val sfactory = SparkEnv.get.serializer
     sfactory match {
       case js: JavaSerializer => in.defaultReadObject()

core/src/main/scala/org/apache/spark/SerializableWritable.scala

Lines changed: 3 additions & 2 deletions

@@ -24,18 +24,19 @@ import org.apache.hadoop.io.ObjectWritable
 import org.apache.hadoop.io.Writable
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils
 
 @DeveloperApi
 class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
   def value = t
   override def toString = t.toString
 
-  private def writeObject(out: ObjectOutputStream) {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     out.defaultWriteObject()
     new ObjectWritable(t).write(out)
   }
 
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     val ow = new ObjectWritable()
     ow.setConf(new Configuration())

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 2 additions & 2 deletions

@@ -72,13 +72,13 @@ private[spark] class HttpBroadcast[T: ClassTag](
   }
 
   /** Used by the JVM when serializing this object. */
-  private def writeObject(out: ObjectOutputStream) {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     assertValid()
     out.defaultWriteObject()
   }
 
   /** Used by the JVM when deserializing this object. */
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     HttpBroadcast.synchronized {
       SparkEnv.get.blockManager.getSingle(blockId) match {

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 3 additions & 3 deletions

@@ -28,7 +28,7 @@ import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
-import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
 import org.apache.spark.util.io.ByteArrayChunkOutputStream
 
 /**
@@ -152,13 +152,13 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   }
 
   /** Used by the JVM when serializing this object. */
-  private def writeObject(out: ObjectOutputStream) {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     assertValid()
     out.defaultWriteObject()
   }
 
   /** Used by the JVM when deserializing this object. */
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     TorrentBroadcast.synchronized {
       setConf(SparkEnv.get.conf)

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 2 additions & 1 deletion

@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer
 import akka.actor.ActorRef
 
 import org.apache.spark.deploy.ApplicationDescription
+import org.apache.spark.util.Utils
 
 private[spark] class ApplicationInfo(
     val startTime: Long,
@@ -46,7 +47,7 @@ private[spark] class ApplicationInfo(
 
   init()
 
-  private def readObject(in: java.io.ObjectInputStream): Unit = {
+  private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     init()
   }

core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala

Lines changed: 2 additions & 1 deletion

@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
 import java.util.Date
 
 import org.apache.spark.deploy.DriverDescription
+import org.apache.spark.util.Utils
 
 private[spark] class DriverInfo(
     val startTime: Long,
@@ -36,7 +37,7 @@ private[spark] class DriverInfo(
 
   init()
 
-  private def readObject(in: java.io.ObjectInputStream): Unit = {
+  private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     init()
   }

core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala

Lines changed: 1 addition & 1 deletion

@@ -50,7 +50,7 @@ private[spark] class WorkerInfo(
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
 
-  private def readObject(in: java.io.ObjectInputStream) : Unit = {
+  private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     init()
   }

core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala

Lines changed: 2 additions & 1 deletion

@@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream}
 import scala.reflect.ClassTag
 
 import org.apache.spark._
+import org.apache.spark.util.Utils
 
 private[spark]
 class CartesianPartition(
@@ -36,7 +37,7 @@ class CartesianPartition(
   override val index: Int = idx
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     // Update the reference to parent split at the time of task serialization
     s1 = rdd1.partitions(s1Index)
     s2 = rdd2.partitions(s2Index)

core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala

Lines changed: 2 additions & 1 deletion

@@ -27,6 +27,7 @@ import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv
 import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
+import org.apache.spark.util.Utils
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleHandle
 
@@ -39,7 +40,7 @@ private[spark] case class NarrowCoGroupSplitDep(
   ) extends CoGroupSplitDep {
 
   @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
+  private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
     // Update the reference to parent split at the time of task serialization
     split = rdd.partitions(splitIndex)
     oos.defaultWriteObject()
