From a8f181d6483b509c29900de5f325a01ea0ef824f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 13 Jun 2014 20:49:18 -0700 Subject: [PATCH 1/7] Add special handling for StorageLevel.MEMORY_*_SER We only unroll the serialized form of each partition for this case, because the deserialized form may be much larger and may not fit in memory. This commit also abstracts out part of the logic of getOrCompute to make it more readable. --- .../scala/org/apache/spark/CacheManager.scala | 175 +++++++++++------- 1 file changed, 104 insertions(+), 71 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 315ed91f81df3..c70e0c9c0dc89 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -20,25 +20,25 @@ package org.apache.spark import scala.collection.mutable.{ArrayBuffer, HashSet} import org.apache.spark.rdd.RDD -import org.apache.spark.storage.{BlockId, BlockManager, BlockStatus, RDDBlockId, StorageLevel} +import org.apache.spark.storage._ /** - * Spark class responsible for passing RDDs split contents to the BlockManager and making + * Spark class responsible for passing RDDs partition contents to the BlockManager and making * sure a node doesn't load two copies of an RDD at once. */ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { - /** Keys of RDD splits that are being computed/loaded. */ + /** Keys of RDD partitions that are being computed/loaded. */ private val loading = new HashSet[RDDBlockId]() - /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */ + /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */ def getOrCompute[T]( rdd: RDD[T], - split: Partition, + partition: Partition, context: TaskContext, storageLevel: StorageLevel): Iterator[T] = { - val key = RDDBlockId(rdd.id, split.index) + val key = RDDBlockId(rdd.id, partition.index) logDebug(s"Looking for partition $key") blockManager.get(key) match { case Some(values) => @@ -46,79 +46,27 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) case None => - // Mark the split as loading (unless someone else marks it first) - loading.synchronized { - if (loading.contains(key)) { - logInfo(s"Another thread is loading $key, waiting for it to finish...") - while (loading.contains(key)) { - try { - loading.wait() - } catch { - case e: Exception => - logWarning(s"Got an exception while waiting for another thread to load $key", e) - } - } - logInfo(s"Finished waiting for $key") - /* See whether someone else has successfully loaded it. The main way this would fail - * is for the RDD-level cache eviction policy if someone else has loaded the same RDD - * partition but we didn't want to make space for it. However, that case is unlikely - * because it's unlikely that two threads would work on the same RDD partition. One - * downside of the current code is that threads wait serially if this does happen. 
*/ - blockManager.get(key) match { - case Some(values) => - return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) - case None => - logInfo(s"Whoever was loading $key failed; we'll try it ourselves") - loading.add(key) - } - } else { - loading.add(key) - } + // Acquire a lock for loading this partition + // If another thread already holds the lock, wait for it to finish return its results + acquireLockForPartition(key).foreach { values => + return new InterruptibleIterator[T](context, values.asInstanceOf[Iterator[T]]) } + + // Otherwise, we have to load the partition ourselves try { - // If we got here, we have to load the split logInfo(s"Partition $key not found, computing it") - val computedValues = rdd.computeOrReadCheckpoint(split, context) + val computedValues = rdd.computeOrReadCheckpoint(partition, context) - // Persist the result, so long as the task is not running locally + // If the task is running locally, do not persist the result if (context.runningLocally) { return computedValues } - // Keep track of blocks with updated statuses - var updatedBlocks = Seq[(BlockId, BlockStatus)]() - val returnValue: Iterator[T] = { - if (storageLevel.useDisk && !storageLevel.useMemory) { - /* In the case that this RDD is to be persisted using DISK_ONLY - * the iterator will be passed directly to the blockManager (rather then - * caching it to an ArrayBuffer first), then the resulting block data iterator - * will be passed back to the user. If the iterator generates a lot of data, - * this means that it doesn't all have to be held in memory at one time. - * This could also apply to MEMORY_ONLY_SER storage, but we need to make sure - * blocks aren't dropped by the block store before enabling that. */ - updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true) - blockManager.get(key) match { - case Some(values) => - values.asInstanceOf[Iterator[T]] - case None => - logInfo(s"Failure to store $key") - throw new SparkException("Block manager failed to return persisted value") - } - } else { - // In this case the RDD is cached to an array buffer. This will save the results - // if we're dealing with a 'one-time' iterator - val elements = new ArrayBuffer[Any] - elements ++= computedValues - updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true) - elements.iterator.asInstanceOf[Iterator[T]] - } - } - - // Update task metrics to include any blocks whose storage status is updated - val metrics = context.taskMetrics - metrics.updatedBlocks = Some(updatedBlocks) - - new InterruptibleIterator(context, returnValue) + // Otherwise, cache the values and keep track of any updates in block statuses + val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + val cachedValues = cacheValues(key, computedValues, storageLevel, updatedBlocks) + context.taskMetrics.updatedBlocks = Some(updatedBlocks) + new InterruptibleIterator(context, cachedValues) } finally { loading.synchronized { @@ -128,4 +76,89 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { } } } + + /** + * Acquire a loading lock for the partition identified by the given block ID. + * + * If the lock is free, just acquire it and return None. Otherwise, another thread is already + * loading the partition, so we wait for it to finish and return the values loaded by the thread. 
+ */ + private def acquireLockForPartition(id: RDDBlockId): Option[Iterator[Any]] = { + loading.synchronized { + if (!loading.contains(id)) { + // If the partition is free, acquire its lock and begin computing its value + loading.add(id) + None + } else { + // Otherwise, wait for another thread to finish and return its result + logInfo(s"Another thread is loading $id, waiting for it to finish...") + while (loading.contains(id)) { + try { + loading.wait() + } catch { + case e: Exception => + logWarning(s"Exception while waiting for another thread to load $id", e) + } + } + logInfo(s"Finished waiting for $id") + /* See whether someone else has successfully loaded it. The main way this would fail + * is for the RDD-level cache eviction policy if someone else has loaded the same RDD + * partition but we didn't want to make space for it. However, that case is unlikely + * because it's unlikely that two threads would work on the same RDD partition. One + * downside of the current code is that threads wait serially if this does happen. */ + val values = blockManager.get(id) + if (!values.isDefined) { + logInfo(s"Whoever was loading $id failed; we'll try it ourselves") + loading.add(id) + } + values + } + } + } + + /** + * Cache the values of a partition, keeping track of any updates in the storage statuses + * of other blocks along the way. + */ + private def cacheValues[T]( + key: BlockId, + value: Iterator[T], + storageLevel: StorageLevel, + updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = { + + if (!storageLevel.useMemory) { + /* This RDD is not to be cached in memory, so we can just pass the computed values + * as an iterator directly to the BlockManager, rather than first fully unrolling + * it in memory. The latter option potentially uses much more memory and risks OOM + * exceptions that can be avoided. */ + assume(storageLevel.useDisk || storageLevel.useOffHeap, s"Empty storage level for $key!") + updatedBlocks ++= blockManager.put(key, value, storageLevel, tellMaster = true) + blockManager.get(key) match { + case Some(values) => + values.asInstanceOf[Iterator[T]] + case None => + logInfo(s"Failure to store $key") + throw new BlockException(key, s"Block manager failed to return cached value for $key!") + } + } else { + /* This RDD is to be cached in memory. In this case we cannot pass the computed values + * to the BlockManager as an iterator and expect to read it back later. This is because + * we may end up dropping a partition from memory store before getting it back, e.g. + * when the entirety of the RDD does not fit in memory. */ + if (storageLevel.deserialized) { + val elements = new ArrayBuffer[Any] + elements ++= value + updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true) + elements.iterator.asInstanceOf[Iterator[T]] + } else { + /* This RDD is to be cached in memory in the form of serialized bytes. In this case, + * we only unroll the serialized form of the data, because the deserialized form may + * be much larger and may not fit in memory. 
*/ + val bytes = blockManager.dataSerialize(key, value) + updatedBlocks ++= blockManager.putBytes(key, bytes, storageLevel, tellMaster = true) + blockManager.dataDeserialize(key, bytes).asInstanceOf[Iterator[T]] + } + } + } + } From 2941c89baacacfc7573cde35a694bc18a7f5fd4f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 13 Jun 2014 20:52:31 -0700 Subject: [PATCH 2/7] Clean up BlockStore (minor) --- .../org/apache/spark/storage/BlockStore.scala | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala index 9a9be047c7245..b9b53b1a2f118 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala @@ -24,11 +24,11 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.Logging /** - * Abstract class to store blocks + * Abstract class to store blocks. */ -private[spark] -abstract class BlockStore(val blockManager: BlockManager) extends Logging { - def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel) : PutResult +private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging { + + def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult /** * Put in a block and, possibly, also return its content as either bytes or another Iterator. @@ -37,11 +37,17 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging { * @return a PutResult that contains the size of the data, as well as the values put if * returnValues is true (if not, the result's data field can be null) */ - def putValues(blockId: BlockId, values: Iterator[Any], level: StorageLevel, - returnValues: Boolean) : PutResult + def putValues( + blockId: BlockId, + values: Iterator[Any], + level: StorageLevel, + returnValues: Boolean): PutResult - def putValues(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel, - returnValues: Boolean) : PutResult + def putValues( + blockId: BlockId, + values: ArrayBuffer[Any], + level: StorageLevel, + returnValues: Boolean): PutResult /** * Return the size of a block in bytes. From 44ef28246ad4f8116155b0db4969898cc09e5e5e Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 13 Jun 2014 20:53:25 -0700 Subject: [PATCH 3/7] Actually return updated blocks in putBytes Previously we never returned the updated blocks in MemoryStore's putBytes. This is a simple bug with a simple fix. 
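As a hedged illustration only (the caller, variable names, and log line below are
hypothetical and not part of this patch): once putBytes threads the result of
tryToPut into PutResult, a caller can observe which blocks were evicted to make
room, e.g.

    // Hypothetical caller: inspect blocks dropped while storing serialized bytes.
    val result = memoryStore.putBytes(blockId, bytes, StorageLevel.MEMORY_ONLY_SER)
    result.droppedBlocks.foreach { case (id, status) =>
      logInfo(s"Dropped $id while storing $blockId, new status: $status")
    }

Before this fix, droppedBlocks was left at its default (empty) in both branches of
putBytes, even when tryToPut had evicted other blocks to make space.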
--- .../main/scala/org/apache/spark/storage/MemoryStore.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 084a566c48560..71f66c826c5b3 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -58,11 +58,11 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) val elements = new ArrayBuffer[Any] elements ++= values val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef]) - tryToPut(blockId, elements, sizeEstimate, true) - PutResult(sizeEstimate, Left(values.toIterator)) + val putAttempt = tryToPut(blockId, elements, sizeEstimate, deserialized = true) + PutResult(sizeEstimate, Left(values.toIterator), putAttempt.droppedBlocks) } else { - tryToPut(blockId, bytes, bytes.limit, false) - PutResult(bytes.limit(), Right(bytes.duplicate())) + val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false) + PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks) } } From 0091ec07c4f69c29e2cea1fd28fcfabb4a4a9fd3 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 16 Jun 2014 10:26:18 -0700 Subject: [PATCH 4/7] Address review feedback --- .../scala/org/apache/spark/CacheManager.scala | 86 +++++++++---------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index c70e0c9c0dc89..a2fbf15a2927d 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -48,8 +48,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { case None => // Acquire a lock for loading this partition // If another thread already holds the lock, wait for it to finish return its results - acquireLockForPartition(key).foreach { values => - return new InterruptibleIterator[T](context, values.asInstanceOf[Iterator[T]]) + val storedValues = acquireLockForPartition[T](key) + if (storedValues.isDefined) { + return new InterruptibleIterator[T](context, storedValues.get) } // Otherwise, we have to load the partition ourselves @@ -64,7 +65,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { // Otherwise, cache the values and keep track of any updates in block statuses val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - val cachedValues = cacheValues(key, computedValues, storageLevel, updatedBlocks) + val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks) context.taskMetrics.updatedBlocks = Some(updatedBlocks) new InterruptibleIterator(context, cachedValues) @@ -83,10 +84,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { * If the lock is free, just acquire it and return None. Otherwise, another thread is already * loading the partition, so we wait for it to finish and return the values loaded by the thread. 
*/ - private def acquireLockForPartition(id: RDDBlockId): Option[Iterator[Any]] = { + private def acquireLockForPartition[T](id: RDDBlockId): Option[Iterator[T]] = { loading.synchronized { if (!loading.contains(id)) { - // If the partition is free, acquire its lock and begin computing its value + // If the partition is free, acquire its lock to compute its value loading.add(id) None } else { @@ -101,17 +102,15 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { } } logInfo(s"Finished waiting for $id") - /* See whether someone else has successfully loaded it. The main way this would fail - * is for the RDD-level cache eviction policy if someone else has loaded the same RDD - * partition but we didn't want to make space for it. However, that case is unlikely - * because it's unlikely that two threads would work on the same RDD partition. One - * downside of the current code is that threads wait serially if this does happen. */ val values = blockManager.get(id) if (!values.isDefined) { + /* The block is not guaranteed to exist even after the other thread has finished. + * For instance, the block could be evicted after it was put, but before our get. + * In this case, we still need to load the partition ourselves. */ logInfo(s"Whoever was loading $id failed; we'll try it ourselves") loading.add(id) } - values + values.map(_.asInstanceOf[Iterator[T]]) } } } @@ -120,45 +119,46 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { * Cache the values of a partition, keeping track of any updates in the storage statuses * of other blocks along the way. */ - private def cacheValues[T]( + private def putInBlockManager[T]( key: BlockId, - value: Iterator[T], + values: Iterator[T], storageLevel: StorageLevel, updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = { - if (!storageLevel.useMemory) { - /* This RDD is not to be cached in memory, so we can just pass the computed values - * as an iterator directly to the BlockManager, rather than first fully unrolling - * it in memory. The latter option potentially uses much more memory and risks OOM - * exceptions that can be avoided. */ - assume(storageLevel.useDisk || storageLevel.useOffHeap, s"Empty storage level for $key!") - updatedBlocks ++= blockManager.put(key, value, storageLevel, tellMaster = true) - blockManager.get(key) match { - case Some(values) => - values.asInstanceOf[Iterator[T]] - case None => - logInfo(s"Failure to store $key") - throw new BlockException(key, s"Block manager failed to return cached value for $key!") - } - } else { - /* This RDD is to be cached in memory. In this case we cannot pass the computed values - * to the BlockManager as an iterator and expect to read it back later. This is because - * we may end up dropping a partition from memory store before getting it back, e.g. - * when the entirety of the RDD does not fit in memory. */ - if (storageLevel.deserialized) { - val elements = new ArrayBuffer[Any] - elements ++= value - updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true) - elements.iterator.asInstanceOf[Iterator[T]] + val cachedValues = { + if (!storageLevel.useMemory) { + /* This RDD is not to be cached in memory, so we can just pass the computed values + * as an iterator directly to the BlockManager, rather than first fully unrolling + * it in memory. The latter option potentially uses much more memory and risks OOM + * exceptions that can be avoided. 
*/ + updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true) + blockManager.get(key) match { + case Some(v) => v + case None => + logInfo(s"Failure to store $key") + throw new BlockException(key, s"Block manager failed to return cached value for $key!") + } } else { - /* This RDD is to be cached in memory in the form of serialized bytes. In this case, - * we only unroll the serialized form of the data, because the deserialized form may - * be much larger and may not fit in memory. */ - val bytes = blockManager.dataSerialize(key, value) - updatedBlocks ++= blockManager.putBytes(key, bytes, storageLevel, tellMaster = true) - blockManager.dataDeserialize(key, bytes).asInstanceOf[Iterator[T]] + /* This RDD is to be cached in memory. In this case we cannot pass the computed values + * to the BlockManager as an iterator and expect to read it back later. This is because + * we may end up dropping a partition from memory store before getting it back, e.g. + * when the entirety of the RDD does not fit in memory. */ + if (storageLevel.deserialized) { + val elements = new ArrayBuffer[Any] + elements ++= values + updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true) + elements.iterator + } else { + /* This RDD is to be cached in memory in the form of serialized bytes. In this case, + * we only unroll the serialized form of the data, because the deserialized form may + * be much larger and may not fit in memory. */ + val bytes = blockManager.dataSerialize(key, values) + updatedBlocks ++= blockManager.putBytes(key, bytes, storageLevel, tellMaster = true) + blockManager.dataDeserialize(key, bytes) + } } } + cachedValues.asInstanceOf[Iterator[T]] } } From cf5f5656ed75a7c8b13b438dfc970239c2799b39 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 19 Jun 2014 18:03:32 -0700 Subject: [PATCH 5/7] Remove special handling for MEM_*_SER This special handling sacrifices CPU cycles for memory usage by introducing an additional step to deserialize the serialized bytes put into BlockManager. This may cause a performance regression in some cases. For now, let's keep the functionality the same as before, and only include style changes in this PR. This is a precursor to another incoming PR that changes the way unroll RDD partitions. --- .../scala/org/apache/spark/CacheManager.scala | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index a2fbf15a2927d..4f477f5bfbdee 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -143,19 +143,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { * to the BlockManager as an iterator and expect to read it back later. This is because * we may end up dropping a partition from memory store before getting it back, e.g. * when the entirety of the RDD does not fit in memory. */ - if (storageLevel.deserialized) { - val elements = new ArrayBuffer[Any] - elements ++= values - updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true) - elements.iterator - } else { - /* This RDD is to be cached in memory in the form of serialized bytes. In this case, - * we only unroll the serialized form of the data, because the deserialized form may - * be much larger and may not fit in memory. 
*/ - val bytes = blockManager.dataSerialize(key, values) - updatedBlocks ++= blockManager.putBytes(key, bytes, storageLevel, tellMaster = true) - blockManager.dataDeserialize(key, bytes) - } + val elements = new ArrayBuffer[Any] + elements ++= values + updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true) + elements.iterator } } cachedValues.asInstanceOf[Iterator[T]] From d12b95f9a91d86570cd00d9662239b582f96b5cb Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 19 Jun 2014 18:19:22 -0700 Subject: [PATCH 6/7] Remove unused imports (minor) --- .../main/scala/org/apache/spark/scheduler/ResultTask.scala | 3 ++- .../scala/org/apache/spark/scheduler/ShuffleMapTask.scala | 5 +---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index 0e8d551e4b2ab..bbf9f7388b074 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -17,11 +17,12 @@ package org.apache.spark.scheduler +import scala.language.existentials + import java.io._ import java.util.zip.{GZIPInputStream, GZIPOutputStream} import scala.collection.mutable.HashMap -import scala.language.existentials import org.apache.spark._ import org.apache.spark.rdd.{RDD, RDDCheckpointData} diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 0098b5a59d1a5..859cdc524a581 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -25,10 +25,7 @@ import java.util.zip.{GZIPInputStream, GZIPOutputStream} import scala.collection.mutable.HashMap import org.apache.spark._ -import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.rdd.{RDD, RDDCheckpointData} -import org.apache.spark.serializer.Serializer -import org.apache.spark.storage._ import org.apache.spark.shuffle.ShuffleWriter private[spark] object ShuffleMapTask { @@ -150,7 +147,7 @@ private[spark] class ShuffleMapTask( for (elem <- rdd.iterator(split, context)) { writer.write(elem.asInstanceOf[Product2[Any, Any]]) } - return writer.stop(success = true).get + writer.stop(success = true).get } catch { case e: Exception => if (writer != null) { From 3d9a3663f1244bc1b1e9eee503957a88a02e6b32 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 20 Jun 2014 16:03:19 -0700 Subject: [PATCH 7/7] Minor change for readability --- .../scala/org/apache/spark/CacheManager.scala | 43 +++++++++---------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 4f477f5bfbdee..3f667a4a0f9c5 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -125,31 +125,28 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { storageLevel: StorageLevel, updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = { - val cachedValues = { - if (!storageLevel.useMemory) { - /* This RDD is not to be cached in memory, so we can just pass the computed values - * as an iterator directly to the BlockManager, rather than first fully unrolling - * it in memory. 
The latter option potentially uses much more memory and risks OOM - * exceptions that can be avoided. */ - updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true) - blockManager.get(key) match { - case Some(v) => v - case None => - logInfo(s"Failure to store $key") - throw new BlockException(key, s"Block manager failed to return cached value for $key!") - } - } else { - /* This RDD is to be cached in memory. In this case we cannot pass the computed values - * to the BlockManager as an iterator and expect to read it back later. This is because - * we may end up dropping a partition from memory store before getting it back, e.g. - * when the entirety of the RDD does not fit in memory. */ - val elements = new ArrayBuffer[Any] - elements ++= values - updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true) - elements.iterator + if (!storageLevel.useMemory) { + /* This RDD is not to be cached in memory, so we can just pass the computed values + * as an iterator directly to the BlockManager, rather than first fully unrolling + * it in memory. The latter option potentially uses much more memory and risks OOM + * exceptions that can be avoided. */ + updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true) + blockManager.get(key) match { + case Some(v) => v.asInstanceOf[Iterator[T]] + case None => + logInfo(s"Failure to store $key") + throw new BlockException(key, s"Block manager failed to return cached value for $key!") } + } else { + /* This RDD is to be cached in memory. In this case we cannot pass the computed values + * to the BlockManager as an iterator and expect to read it back later. This is because + * we may end up dropping a partition from memory store before getting it back, e.g. + * when the entirety of the RDD does not fit in memory. */ + val elements = new ArrayBuffer[Any] + elements ++= values + updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true) + elements.iterator.asInstanceOf[Iterator[T]] } - cachedValues.asInstanceOf[Iterator[T]] } }
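
For context, a minimal usage sketch of the code path this series touches (assumes a
running SparkContext named sc; the data and storage level are arbitrary examples, not
taken from the patches): persisting an RDD routes subsequent reads through
CacheManager.getOrCompute, which returns the partition from the BlockManager if it is
already cached, waits for another thread that holds the loading lock, or computes the
partition and stores it via putInBlockManager.

    import org.apache.spark.storage.StorageLevel

    val rdd = sc.parallelize(1 to 1000000).persist(StorageLevel.MEMORY_AND_DISK_SER)
    rdd.count()  // first action: partitions are computed and handed to the BlockManager
    rdd.count()  // second action: partitions are served from the block store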