@@ -34,13 +34,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
3434 /** Keys of RDD partitions that are being computed/loaded. */
3535 private val loading = new mutable.HashSet [RDDBlockId ]
3636
37- /**
38- * The amount of space ensured for unrolling partitions, shared across all cores.
39- * This space is not reserved in advance, but allocated dynamically by dropping existing blocks.
40- * It must be a lazy val in order to access a mocked BlockManager's conf in tests properly.
41- */
42- private lazy val globalBufferMemory = BlockManager .getBufferMemory(blockManager.conf)
43-
4437 /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
4538 def getOrCompute [T ](
4639 rdd : RDD [T ],
@@ -137,10 +130,12 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
137130 updatedBlocks : ArrayBuffer [(BlockId , BlockStatus )]): Iterator [T ] = {
138131
139132 if (! storageLevel.useMemory) {
140- /* This RDD is not to be cached in memory, so we can just pass the computed values
141- * as an iterator directly to the BlockManager, rather than first fully unrolling
133+ /*
134+ * This RDD is not to be cached in memory, so we can just pass the computed values
135+ * as an iterator directly to the BlockManager, rather than first fully unfolding
142136 * it in memory. The latter option potentially uses much more memory and risks OOM
143- * exceptions that can be avoided. */
137+ * exceptions that can be avoided.
138+ */
144139 updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true )
145140 blockManager.get(key) match {
146141 case Some (v) => v.data.asInstanceOf [Iterator [T ]]
@@ -149,86 +144,38 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
149144 throw new BlockException (key, s " Block manager failed to return cached value for $key! " )
150145 }
151146 } else {
152- /* This RDD is to be cached in memory. In this case we cannot pass the computed values
147+ /*
148+ * This RDD is to be cached in memory. In this case we cannot pass the computed values
153149 * to the BlockManager as an iterator and expect to read it back later. This is because
154150 * we may end up dropping a partition from memory store before getting it back, e.g.
155- * when the entirety of the RDD does not fit in memory. */
156-
157- var count = 0 // The number of elements unrolled so far
158- var dropPartition = false // Whether to drop the new partition from memory
159- var previousSize = 0L // Previous estimate of the size of our buffer
160- val memoryRequestPeriod = 1000 // How frequently we request for more memory for our buffer
161-
162- val threadId = Thread .currentThread().getId
163- val cacheMemoryMap = SparkEnv .get.cacheMemoryMap
164- var buffer = new SizeTrackingAppendOnlyBuffer [Any ]
165-
166- try {
167- /* While adding values to the in-memory buffer, periodically check whether the memory
168- * restrictions for unrolling partitions are still satisfied. If not, stop immediately,
169- * and persist the partition to disk if specified by the storage level. This check is
170- * a safeguard against the scenario when a single partition does not fit in memory. */
171- while (values.hasNext && ! dropPartition) {
172- buffer += values.next()
173- count += 1
174- if (count % memoryRequestPeriod == 1 ) {
175- // Calculate the amount of memory to request from the global memory pool
176- val currentSize = buffer.estimateSize()
177- val delta = math.max(currentSize - previousSize, 0 )
178- val memoryToRequest = currentSize + delta
179- previousSize = currentSize
180-
181- // Atomically check whether there is sufficient memory in the global pool to continue
182- cacheMemoryMap.synchronized {
183- val previouslyOccupiedMemory = cacheMemoryMap.get(threadId).getOrElse(0L )
184- val otherThreadsMemory = cacheMemoryMap.values.sum - previouslyOccupiedMemory
185-
186- // Request for memory for the local buffer, and return whether request is granted
187- def requestForMemory (): Boolean = {
188- val availableMemory = blockManager.memoryStore.freeMemory - otherThreadsMemory
189- val granted = availableMemory > memoryToRequest
190- if (granted) { cacheMemoryMap(threadId) = memoryToRequest }
191- granted
192- }
193-
194- // If the first request is not granted, try again after ensuring free space
195- // If there is still not enough space, give up and drop the partition
196- if (! requestForMemory()) {
197- val result = blockManager.memoryStore.ensureFreeSpace(key, globalBufferMemory)
198- updatedBlocks ++= result.droppedBlocks
199- dropPartition = ! requestForMemory()
200- }
201- }
202- }
203- }
204-
205- if (! dropPartition) {
206- // We have successfully unrolled the entire partition, so cache it in memory
207- updatedBlocks ++= blockManager.put(key, buffer.array, storageLevel, tellMaster = true )
208- buffer.iterator.asInstanceOf [Iterator [T ]]
209- } else {
210- // We have exceeded our collective quota. This partition will not be cached in memory.
151+ * when the entirety of the RDD does not fit in memory.
152+ *
153+ * In addition, we must be careful not to unfold the entire partition in memory at once.
154+ * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
155+ * single partition. Instead, we unfold the values cautiously, potentially aborting and
156+ * dropping the partition to disk if applicable.
157+ */
158+ blockManager.memoryStore.unfoldSafely(key, values, storageLevel, updatedBlocks) match {
159+ case Left (arrayValues) =>
160+ // We have successfully unfolded the entire partition, so cache it in memory
161+ updatedBlocks ++= blockManager.put(key, arrayValues, storageLevel, tellMaster = true )
162+ arrayValues.iterator.asInstanceOf [Iterator [T ]]
163+ case Right (iteratorValues) =>
164+ // There is not enough space to cache this partition in memory
165+ var returnValues = iteratorValues.asInstanceOf [Iterator [T ]]
211166 val persistToDisk = storageLevel.useDisk
212- logWarning(s " Failed to cache $key in memory! There is not enough space to unroll the " +
167+ logWarning(s " Failed to cache $key in memory! There is not enough space to unfold the " +
213168 s " entire partition. " + (if (persistToDisk) " Persisting to disk instead." else " " ))
214- var newValues = (buffer.iterator ++ values).asInstanceOf [Iterator [T ]]
215169 if (persistToDisk) {
216170 val newLevel = StorageLevel (
217171 storageLevel.useDisk,
218172 useMemory = false ,
219173 storageLevel.useOffHeap,
220174 deserialized = false ,
221175 storageLevel.replication)
222- newValues = putInBlockManager[T ](key, newValues , newLevel, updatedBlocks)
176+ returnValues = putInBlockManager[T ](key, returnValues , newLevel, updatedBlocks)
223177 }
224- newValues
225- }
226- } finally {
227- // Free up buffer for other threads
228- buffer = null
229- cacheMemoryMap.synchronized {
230- cacheMemoryMap(threadId) = 0
231- }
178+ returnValues
232179 }
233180 }
234181 }
0 commit comments