From 58acc06148e243420ab12ced77749be1767c4bc0 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Mon, 12 Dec 2016 13:44:20 +0800 Subject: [PATCH 1/5] Fix cannot read broadcast on disk --- .../apache/spark/storage/memory/MemoryStore.scala | 2 +- .../org/apache/spark/broadcast/BroadcastSuite.scala | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index fff21218b176..bbe890448b93 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -694,7 +694,7 @@ private[storage] class PartiallyUnrolledIterator[T]( } override def next(): T = { - if (unrolled == null) { + if (unrolled == null || unrolled.isEmpty) { rest.next() } else { unrolled.next() diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala index 973676398ae5..6646068d5080 100644 --- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala @@ -137,6 +137,18 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("Cache broadcast to disk") { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test") + .set("spark.memory.useLegacyMode", "true") + .set("spark.storage.memoryFraction", "0.0") + sc = new SparkContext(conf) + val list = List[Int](1, 2, 3, 4) + val broadcast = sc.broadcast(list) + assert(broadcast.value.sum === 10) + } + /** * Verify the persistence of state associated with a TorrentBroadcast in a local-cluster. * From d964c540974e680fc2809cd40a25ef72992a17db Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Mon, 12 Dec 2016 23:06:55 +0800 Subject: [PATCH 2/5] Consistency with previously code --- .../scala/org/apache/spark/storage/memory/MemoryStore.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index bbe890448b93..929a0808bd23 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -694,7 +694,7 @@ private[storage] class PartiallyUnrolledIterator[T]( } override def next(): T = { - if (unrolled == null || unrolled.isEmpty) { + if (unrolled == null || !unrolled.hasNext) { rest.next() } else { unrolled.next() From f00474032dab2561e85d3c2fd7aad01d0dcacc8e Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Tue, 13 Dec 2016 19:31:09 +0800 Subject: [PATCH 3/5] Make more robust --- .../apache/spark/broadcast/TorrentBroadcast.scala | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index f35078437879..22d01c47e645 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -207,11 +207,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) TorrentBroadcast.synchronized { setConf(SparkEnv.get.conf) val blockManager = SparkEnv.get.blockManager - blockManager.getLocalValues(broadcastId).map(_.data.next()) match { - case Some(x) => - releaseLock(broadcastId) - x.asInstanceOf[T] - + blockManager.getLocalValues(broadcastId) match { + case Some(blockResult) => + if (blockResult.data.hasNext) { + val x = blockResult.data.next().asInstanceOf[T] + releaseLock(broadcastId) + x + } else { + throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId") + } case None => logInfo("Started reading broadcast variable " + id) val startTimeMs = System.currentTimeMillis() From 0903da83632336db24083d7d19f8d751d1c876c7 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Thu, 15 Dec 2016 01:02:44 +0800 Subject: [PATCH 4/5] Remove !unrolled.hasNext check. --- .../scala/org/apache/spark/storage/memory/MemoryStore.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 929a0808bd23..fff21218b176 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -694,7 +694,7 @@ private[storage] class PartiallyUnrolledIterator[T]( } override def next(): T = { - if (unrolled == null || !unrolled.hasNext) { + if (unrolled == null) { rest.next() } else { unrolled.next() From c13edcee677a63eb4230520515dfb68fb228bb20 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sat, 17 Dec 2016 21:27:52 +0800 Subject: [PATCH 5/5] Restore !unrolled.hasNext check. --- .../scala/org/apache/spark/storage/memory/MemoryStore.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index fff21218b176..929a0808bd23 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -694,7 +694,7 @@ private[storage] class PartiallyUnrolledIterator[T]( } override def next(): T = { - if (unrolled == null) { + if (unrolled == null || !unrolled.hasNext) { rest.next() } else { unrolled.next()