1717
1818package org .apache .spark
1919
20- import scala .collection .mutable .{ArrayBuffer , HashSet }
20+ import scala .collection .mutable
21+ import scala .collection .mutable .ArrayBuffer
2122
22- import org .apache .spark .executor .InputMetrics
2323import org .apache .spark .rdd .RDD
2424import org .apache .spark .storage ._
2525
@@ -30,7 +30,7 @@ import org.apache.spark.storage._
3030private [spark] class CacheManager (blockManager : BlockManager ) extends Logging {
3131
3232 /** Keys of RDD partitions that are being computed/loaded. */
33- private val loading = new HashSet [RDDBlockId ]()
33+ private val loading = new mutable. HashSet [RDDBlockId ]
3434
3535 /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
3636 def getOrCompute [T ](
@@ -118,36 +118,66 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
118118 }
119119
120120 /**
121- * Cache the values of a partition, keeping track of any updates in the storage statuses
122- * of other blocks along the way.
121+ * Cache the values of a partition, keeping track of any updates in the storage statuses of
122+ * other blocks along the way.
123+ *
124+ * The effective storage level refers to the level that actually specifies BlockManager put
125+ * behavior, not the level originally specified by the user. This is mainly for forcing a
126+ * MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
127+ * while preserving the the original semantics of the RDD as specified by the application.
123128 */
124129 private def putInBlockManager [T ](
125130 key : BlockId ,
126131 values : Iterator [T ],
127- storageLevel : StorageLevel ,
128- updatedBlocks : ArrayBuffer [(BlockId , BlockStatus )]): Iterator [T ] = {
129-
130- if (! storageLevel.useMemory) {
131- /* This RDD is not to be cached in memory, so we can just pass the computed values
132- * as an iterator directly to the BlockManager, rather than first fully unrolling
133- * it in memory. The latter option potentially uses much more memory and risks OOM
134- * exceptions that can be avoided. */
135- updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true )
132+ level : StorageLevel ,
133+ updatedBlocks : ArrayBuffer [(BlockId , BlockStatus )],
134+ effectiveStorageLevel : Option [StorageLevel ] = None ): Iterator [T ] = {
135+
136+ val putLevel = effectiveStorageLevel.getOrElse(level)
137+ if (! putLevel.useMemory) {
138+ /*
139+ * This RDD is not to be cached in memory, so we can just pass the computed values as an
140+ * iterator directly to the BlockManager rather than first fully unrolling it in memory.
141+ */
142+ updatedBlocks ++=
143+ blockManager.putIterator(key, values, level, tellMaster = true , effectiveStorageLevel)
136144 blockManager.get(key) match {
137145 case Some (v) => v.data.asInstanceOf [Iterator [T ]]
138146 case None =>
139147 logInfo(s " Failure to store $key" )
140148 throw new BlockException (key, s " Block manager failed to return cached value for $key! " )
141149 }
142150 } else {
143- /* This RDD is to be cached in memory. In this case we cannot pass the computed values
151+ /*
152+ * This RDD is to be cached in memory. In this case we cannot pass the computed values
144153 * to the BlockManager as an iterator and expect to read it back later. This is because
145- * we may end up dropping a partition from memory store before getting it back, e.g.
146- * when the entirety of the RDD does not fit in memory. */
147- val elements = new ArrayBuffer [Any ]
148- elements ++= values
149- updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true )
150- elements.iterator.asInstanceOf [Iterator [T ]]
154+ * we may end up dropping a partition from memory store before getting it back.
155+ *
156+ * In addition, we must be careful to not unroll the entire partition in memory at once.
157+ * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
158+ * single partition. Instead, we unroll the values cautiously, potentially aborting and
159+ * dropping the partition to disk if applicable.
160+ */
161+ blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
162+ case Left (arr) =>
163+ // We have successfully unrolled the entire partition, so cache it in memory
164+ updatedBlocks ++=
165+ blockManager.putArray(key, arr, level, tellMaster = true , effectiveStorageLevel)
166+ arr.iterator.asInstanceOf [Iterator [T ]]
167+ case Right (it) =>
168+ // There is not enough space to cache this partition in memory
169+ logWarning(s " Not enough space to cache partition $key in memory! " +
170+ s " Free memory is ${blockManager.memoryStore.freeMemory} bytes. " )
171+ val returnValues = it.asInstanceOf [Iterator [T ]]
172+ if (putLevel.useDisk) {
173+ logWarning(s " Persisting partition $key to disk instead. " )
174+ val diskOnlyLevel = StorageLevel (useDisk = true , useMemory = false ,
175+ useOffHeap = false , deserialized = false , putLevel.replication)
176+ putInBlockManager[T ](key, returnValues, level, updatedBlocks, Some (diskOnlyLevel))
177+ } else {
178+ returnValues
179+ }
180+ }
151181 }
152182 }
153183
0 commit comments