From df9e05200b76c36647147131e2d2f3dc60c6804b Mon Sep 17 00:00:00 2001 From: liupengcheng Date: Wed, 27 Dec 2017 17:06:28 +0800 Subject: [PATCH 1/2] Clean up broadcast garbage when an IOException occurs in MetadataFetch Summary: Ref T10399 Test Plan: n/a Reviewers: liushaohui, peng.zhang, zhoukang, chenfan Subscribers: cloud-computing Maniphest Tasks: T10399 Differential Revision: https://phabricator.d.xiaomi.net/D79279 --- .../main/scala/org/apache/spark/MapOutputTracker.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 195fd4f818b36..09cf1b8edc1d3 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -832,7 +832,13 @@ private[spark] object MapOutputTracker extends Logging { logInfo("Broadcast mapstatuses size = " + bytes.length + ", actual size = " + bcast.value.length) // Important - ignore the DIRECT tag ! 
Start from offset 1 - deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]] + try { + deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]] + } catch { + case e: IOException => + bcast.destroy() + throw new IOException(s"Failed to read broadcast for mapstatuses", e) + } case _ => throw new IllegalArgumentException("Unexpected byte tag = " + bytes(0)) } } From cd4fd77e20065963915682db377cff569391e9a9 Mon Sep 17 00:00:00 2001 From: liupengcheng Date: Wed, 27 Dec 2017 18:40:40 +0800 Subject: [PATCH 2/2] Wrap the bcast.value read in the try block and remove the broadcast via BlockManager --- core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 09cf1b8edc1d3..1b58c9f195927 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -829,14 +829,14 @@ private[spark] object MapOutputTracker extends Logging { // deserialize the Broadcast, pull .value array out of it, and then deserialize that val bcast = deserializeObject(bytes, 1, bytes.length - 1). asInstanceOf[Broadcast[Array[Byte]]] + try { logInfo("Broadcast mapstatuses size = " + bytes.length + ", actual size = " + bcast.value.length) // Important - ignore the DIRECT tag ! Start from offset 1 - try { deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]] } catch { case e: IOException => - bcast.destroy() + SparkEnv.get.blockManager.removeBroadcast(bcast.id, false) throw new IOException(s"Failed to read broadcast for mapstatuses", e) } case _ => throw new IllegalArgumentException("Unexpected byte tag = " + bytes(0))