diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 195fd4f818b36..1b58c9f195927 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -829,10 +829,16 @@ private[spark] object MapOutputTracker extends Logging { // deserialize the Broadcast, pull .value array out of it, and then deserialize that val bcast = deserializeObject(bytes, 1, bytes.length - 1). asInstanceOf[Broadcast[Array[Byte]]] + try { logInfo("Broadcast mapstatuses size = " + bytes.length + ", actual size = " + bcast.value.length) // Important - ignore the DIRECT tag ! Start from offset 1 - deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]] + deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]] + } catch { + case e: IOException => + SparkEnv.get.blockManager.removeBroadcast(bcast.id, false) + throw new IOException(s"Failed to read broadcast for mapstatuses", e) + } case _ => throw new IllegalArgumentException("Unexpected byte tag = " + bytes(0)) } }