@@ -48,8 +48,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
       case None =>
         // Acquire a lock for loading this partition
         // If another thread already holds the lock, wait for it to finish and return its results
-        acquireLockForPartition(key).foreach { values =>
-          return new InterruptibleIterator[T](context, values.asInstanceOf[Iterator[T]])
+        val storedValues = acquireLockForPartition[T](key)
+        if (storedValues.isDefined) {
+          return new InterruptibleIterator[T](context, storedValues.get)
         }
 
         // Otherwise, we have to load the partition ourselves
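One plausible motivation for replacing the old `foreach { ... return ... }` with an explicit `isDefined` check, beyond readability: in Scala 2, a `return` inside a closure is a non-local return, compiled as a thrown `scala.runtime.NonLocalReturnControl` that unwinds back to the enclosing method. A minimal sketch of the gotcha this avoids; `firstEven` is a made-up example, not code from this patch:

// In Scala 2, `return` inside a lambda throws NonLocalReturnControl to get
// back to the enclosing method, so a catch-all can swallow the "return".
def firstEven(xs: List[Int]): Option[Int] = {
  try {
    xs.foreach { x => if (x % 2 == 0) return Some(x) } // non-local return
    None
  } catch {
    case _: Throwable => None // also catches the control-flow exception!
  }
}
// firstEven(List(1, 2, 3)) == None, not Some(2): the catch-all intercepts
// the NonLocalReturnControl thrown by the `return` inside the closure.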
@@ -64,7 +65,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
 
           // Otherwise, cache the values and keep track of any updates in block statuses
           val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-          val cachedValues = cacheValues(key, computedValues, storageLevel, updatedBlocks)
+          val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
           context.taskMetrics.updatedBlocks = Some(updatedBlocks)
           new InterruptibleIterator(context, cachedValues)
 
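The rename also makes the call-site contract clearer: the helper both performs a side effect (appending block-status updates to `updatedBlocks`) and hands back the iterator the caller must use from then on, since the one passed in may already be consumed. A toy illustration of that contract with simplified stand-in types; nothing below is the real BlockManager API:

import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for putInBlockManager: consumes `values`, records
// what it stored, and returns a fresh iterator for the caller to use.
def putInStore[T](values: Iterator[T], updates: ArrayBuffer[String]): Iterator[T] = {
  val materialized = values.toVector // the input iterator is now spent
  updates += s"stored ${materialized.size} elements"
  materialized.iterator // callers must switch to this iterator
}

val updates = new ArrayBuffer[String]
val cached = putInStore(Iterator(1, 2, 3), updates)
assert(cached.toList == List(1, 2, 3))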
@@ -83,10 +84,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
    * If the lock is free, just acquire it and return None. Otherwise, another thread is already
    * loading the partition, so we wait for it to finish and return the values loaded by the thread.
    */
-  private def acquireLockForPartition(id: RDDBlockId): Option[Iterator[Any]] = {
+  private def acquireLockForPartition[T](id: RDDBlockId): Option[Iterator[T]] = {
     loading.synchronized {
       if (!loading.contains(id)) {
-        // If the partition is free, acquire its lock and begin computing its value
+        // If the partition is free, acquire its lock to compute its value
         loading.add(id)
         None
       } else {
@@ -101,17 +102,15 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           }
         }
         logInfo(s"Finished waiting for $id")
-        /* See whether someone else has successfully loaded it. The main way this would fail
-         * is for the RDD-level cache eviction policy if someone else has loaded the same RDD
-         * partition but we didn't want to make space for it. However, that case is unlikely
-         * because it's unlikely that two threads would work on the same RDD partition. One
-         * downside of the current code is that threads wait serially if this does happen. */
         val values = blockManager.get(id)
         if (!values.isDefined) {
+          /* The block is not guaranteed to exist even after the other thread has finished.
+           * For instance, the block could be evicted after it was put, but before our get.
+           * In this case, we still need to load the partition ourselves. */
           logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
           loading.add(id)
         }
-        values
+        values.map(_.asInstanceOf[Iterator[T]])
       }
     }
   }
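Taken together, these two hunks implement a small wait/notify handshake keyed on the `loading` set: the first thread to claim an id computes the partition; later threads block on the set's monitor and, on waking, re-check the BlockManager because, as the new comment notes, the block may have been evicted between the loader's put and the waiter's get. A standalone sketch of the handshake shape, using a String id and a hypothetical `releaseLock` (in the real CacheManager the release happens elsewhere, in a finally block not shown in this diff):

import scala.collection.mutable

object LoadingLockSketch {
  private val loading = new mutable.HashSet[String]

  // Returns true if we claimed the id and must compute the partition; false
  // if another thread finished loading it (though the result may since have
  // been evicted, so callers must still re-check the store).
  def acquireLockForPartition(id: String): Boolean = loading.synchronized {
    if (!loading.contains(id)) {
      loading.add(id)
      true
    } else {
      // Loop rather than a single wait(): tolerates spurious wakeups and
      // other ids being released on the same monitor.
      while (loading.contains(id)) {
        loading.wait()
      }
      false
    }
  }

  def releaseLock(id: String): Unit = loading.synchronized {
    loading.remove(id)
    loading.notifyAll() // wake every waiter; each re-checks its own id
  }
}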
@@ -120,45 +119,46 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
    * Cache the values of a partition, keeping track of any updates in the storage statuses
    * of other blocks along the way.
    */
-  private def cacheValues[T](
+  private def putInBlockManager[T](
       key: BlockId,
-      value: Iterator[T],
+      values: Iterator[T],
       storageLevel: StorageLevel,
       updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {
 
-    if (!storageLevel.useMemory) {
-      /* This RDD is not to be cached in memory, so we can just pass the computed values
-       * as an iterator directly to the BlockManager, rather than first fully unrolling
-       * it in memory. The latter option potentially uses much more memory and risks OOM
-       * exceptions that can be avoided. */
-      assume(storageLevel.useDisk || storageLevel.useOffHeap, s"Empty storage level for $key!")
-      updatedBlocks ++= blockManager.put(key, value, storageLevel, tellMaster = true)
-      blockManager.get(key) match {
-        case Some(values) =>
-          values.asInstanceOf[Iterator[T]]
-        case None =>
-          logInfo(s"Failure to store $key")
-          throw new BlockException(key, s"Block manager failed to return cached value for $key!")
-      }
-    } else {
-      /* This RDD is to be cached in memory. In this case we cannot pass the computed values
-       * to the BlockManager as an iterator and expect to read it back later. This is because
-       * we may end up dropping a partition from memory store before getting it back, e.g.
-       * when the entirety of the RDD does not fit in memory. */
-      if (storageLevel.deserialized) {
-        val elements = new ArrayBuffer[Any]
-        elements ++= value
-        updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
-        elements.iterator.asInstanceOf[Iterator[T]]
+    val cachedValues = {
+      if (!storageLevel.useMemory) {
+        /* This RDD is not to be cached in memory, so we can just pass the computed values
+         * as an iterator directly to the BlockManager, rather than first fully unrolling
+         * it in memory. The latter option potentially uses much more memory and risks OOM
+         * exceptions that can be avoided. */
+        updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
+        blockManager.get(key) match {
+          case Some(v) => v
+          case None =>
+            logInfo(s"Failure to store $key")
+            throw new BlockException(key, s"Block manager failed to return cached value for $key!")
+        }
       } else {
-        /* This RDD is to be cached in memory in the form of serialized bytes. In this case,
-         * we only unroll the serialized form of the data, because the deserialized form may
-         * be much larger and may not fit in memory. */
-        val bytes = blockManager.dataSerialize(key, value)
-        updatedBlocks ++= blockManager.putBytes(key, bytes, storageLevel, tellMaster = true)
-        blockManager.dataDeserialize(key, bytes).asInstanceOf[Iterator[T]]
+        /* This RDD is to be cached in memory. In this case we cannot pass the computed values
+         * to the BlockManager as an iterator and expect to read it back later. This is because
+         * we may end up dropping a partition from memory store before getting it back, e.g.
+         * when the entirety of the RDD does not fit in memory. */
+        if (storageLevel.deserialized) {
+          val elements = new ArrayBuffer[Any]
+          elements ++= values
+          updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
+          elements.iterator
+        } else {
+          /* This RDD is to be cached in memory in the form of serialized bytes. In this case,
+           * we only unroll the serialized form of the data, because the deserialized form may
+           * be much larger and may not fit in memory. */
+          val bytes = blockManager.dataSerialize(key, values)
+          updatedBlocks ++= blockManager.putBytes(key, bytes, storageLevel, tellMaster = true)
+          blockManager.dataDeserialize(key, bytes)
+        }
       }
     }
+    cachedValues.asInstanceOf[Iterator[T]]
   }
 
 }
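One subtlety in the serialized in-memory branch is worth spelling out: the caller's iterator is rebuilt from the bytes the method still holds (`dataDeserialize(key, bytes)`), not from a follow-up `blockManager.get`, so an immediate eviction of the freshly stored block cannot leave the caller empty-handed. A self-contained sketch of that shape, using Java serialization of a Vector and a plain Map as stand-ins for dataSerialize/dataDeserialize and the block store (the real code serializes a stream of elements rather than materializing a Vector):

import java.io._
import scala.collection.mutable

object SerializedPutSketch {
  private val store = mutable.Map[String, Array[Byte]]() // toy block store

  private def toBytes[T](values: Iterator[T]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(values.toVector) // toy shortcut; Spark streams elements
    oos.close()
    bos.toByteArray
  }

  private def fromBytes[T](bytes: Array[Byte]): Iterator[T] = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    ois.readObject().asInstanceOf[Vector[T]].iterator
  }

  def putSerialized[T](key: String, values: Iterator[T]): Iterator[T] = {
    val bytes = toBytes(values)
    store(key) = bytes  // the store may evict this entry at any moment...
    fromBytes[T](bytes) // ...so rebuild the caller's iterator from our copy
  }
}

// Usage: the returned iterator stays valid even if "rdd_0_0" is evicted
// immediately after the put.
// SerializedPutSketch.putSerialized("rdd_0_0", Iterator(1, 2, 3)).toList
//   == List(1, 2, 3)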