@@ -125,31 +125,28 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
125125 storageLevel : StorageLevel ,
126126 updatedBlocks : ArrayBuffer [(BlockId , BlockStatus )]): Iterator [T ] = {
127127
128- val cachedValues = {
129- if (! storageLevel.useMemory) {
130- /* This RDD is not to be cached in memory, so we can just pass the computed values
131- * as an iterator directly to the BlockManager, rather than first fully unrolling
132- * it in memory. The latter option potentially uses much more memory and risks OOM
133- * exceptions that can be avoided. */
134- updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true )
135- blockManager.get(key) match {
136- case Some (v) => v
137- case None =>
138- logInfo(s " Failure to store $key" )
139- throw new BlockException (key, s " Block manager failed to return cached value for $key! " )
140- }
141- } else {
142- /* This RDD is to be cached in memory. In this case we cannot pass the computed values
143- * to the BlockManager as an iterator and expect to read it back later. This is because
144- * we may end up dropping a partition from memory store before getting it back, e.g.
145- * when the entirety of the RDD does not fit in memory. */
146- val elements = new ArrayBuffer [Any ]
147- elements ++= values
148- updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true )
149- elements.iterator
128+ if (! storageLevel.useMemory) {
129+ /* This RDD is not to be cached in memory, so we can just pass the computed values
130+ * as an iterator directly to the BlockManager, rather than first fully unrolling
131+ * it in memory. The latter option potentially uses much more memory and risks OOM
132+ * exceptions that can be avoided. */
133+ updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true )
134+ blockManager.get(key) match {
135+ case Some (v) => v.asInstanceOf [Iterator [T ]]
136+ case None =>
137+ logInfo(s " Failure to store $key" )
138+ throw new BlockException (key, s " Block manager failed to return cached value for $key! " )
150139 }
140+ } else {
141+ /* This RDD is to be cached in memory. In this case we cannot pass the computed values
142+ * to the BlockManager as an iterator and expect to read it back later. This is because
143+ * we may end up dropping a partition from memory store before getting it back, e.g.
144+ * when the entirety of the RDD does not fit in memory. */
145+ val elements = new ArrayBuffer [Any ]
146+ elements ++= values
147+ updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true )
148+ elements.iterator.asInstanceOf [Iterator [T ]]
151149 }
152- cachedValues.asInstanceOf [Iterator [T ]]
153150 }
154151
155152}
0 commit comments