@@ -20,105 +20,54 @@ package org.apache.spark
2020import scala .collection .mutable .{ArrayBuffer , HashSet }
2121
2222import org .apache .spark .rdd .RDD
23- import org .apache .spark .storage .{ BlockId , BlockManager , BlockStatus , RDDBlockId , StorageLevel }
23+ import org .apache .spark .storage ._
2424
2525/**
26- * Spark class responsible for passing RDDs split contents to the BlockManager and making
26+ * Spark class responsible for passing RDDs partition contents to the BlockManager and making
2727 * sure a node doesn't load two copies of an RDD at once.
2828 */
2929private [spark] class CacheManager (blockManager : BlockManager ) extends Logging {
3030
31- /** Keys of RDD splits that are being computed/loaded. */
31+ /** Keys of RDD partitions that are being computed/loaded. */
3232 private val loading = new HashSet [RDDBlockId ]()
3333
34- /** Gets or computes an RDD split . Used by RDD.iterator() when an RDD is cached. */
34+ /** Gets or computes an RDD partition . Used by RDD.iterator() when an RDD is cached. */
3535 def getOrCompute [T ](
3636 rdd : RDD [T ],
37- split : Partition ,
37+ partition : Partition ,
3838 context : TaskContext ,
3939 storageLevel : StorageLevel ): Iterator [T ] = {
4040
41- val key = RDDBlockId (rdd.id, split .index)
41+ val key = RDDBlockId (rdd.id, partition .index)
4242 logDebug(s " Looking for partition $key" )
4343 blockManager.get(key) match {
4444 case Some (values) =>
4545 // Partition is already materialized, so just return its values
4646 new InterruptibleIterator (context, values.asInstanceOf [Iterator [T ]])
4747
4848 case None =>
49- // Mark the split as loading (unless someone else marks it first)
50- loading.synchronized {
51- if (loading.contains(key)) {
52- logInfo(s " Another thread is loading $key, waiting for it to finish... " )
53- while (loading.contains(key)) {
54- try {
55- loading.wait()
56- } catch {
57- case e : Exception =>
58- logWarning(s " Got an exception while waiting for another thread to load $key" , e)
59- }
60- }
61- logInfo(s " Finished waiting for $key" )
62- /* See whether someone else has successfully loaded it. The main way this would fail
63- * is for the RDD-level cache eviction policy if someone else has loaded the same RDD
64- * partition but we didn't want to make space for it. However, that case is unlikely
65- * because it's unlikely that two threads would work on the same RDD partition. One
66- * downside of the current code is that threads wait serially if this does happen. */
67- blockManager.get(key) match {
68- case Some (values) =>
69- return new InterruptibleIterator (context, values.asInstanceOf [Iterator [T ]])
70- case None =>
71- logInfo(s " Whoever was loading $key failed; we'll try it ourselves " )
72- loading.add(key)
73- }
74- } else {
75- loading.add(key)
76- }
49+ // Acquire a lock for loading this partition
50+ // If another thread already holds the lock, wait for it to finish return its results
51+ val storedValues = acquireLockForPartition[T ](key)
52+ if (storedValues.isDefined) {
53+ return new InterruptibleIterator [T ](context, storedValues.get)
7754 }
55+
56+ // Otherwise, we have to load the partition ourselves
7857 try {
79- // If we got here, we have to load the split
8058 logInfo(s " Partition $key not found, computing it " )
81- val computedValues = rdd.computeOrReadCheckpoint(split , context)
59+ val computedValues = rdd.computeOrReadCheckpoint(partition , context)
8260
83- // Persist the result, so long as the task is not running locally
61+ // If the task is running locally, do not persist the result
8462 if (context.runningLocally) {
8563 return computedValues
8664 }
8765
88- // Keep track of blocks with updated statuses
89- var updatedBlocks = Seq [(BlockId , BlockStatus )]()
90- val returnValue : Iterator [T ] = {
91- if (storageLevel.useDisk && ! storageLevel.useMemory) {
92- /* In the case that this RDD is to be persisted using DISK_ONLY
93- * the iterator will be passed directly to the blockManager (rather then
94- * caching it to an ArrayBuffer first), then the resulting block data iterator
95- * will be passed back to the user. If the iterator generates a lot of data,
96- * this means that it doesn't all have to be held in memory at one time.
97- * This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
98- * blocks aren't dropped by the block store before enabling that. */
99- updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true )
100- blockManager.get(key) match {
101- case Some (values) =>
102- values.asInstanceOf [Iterator [T ]]
103- case None =>
104- logInfo(s " Failure to store $key" )
105- throw new SparkException (" Block manager failed to return persisted value" )
106- }
107- } else {
108- // In this case the RDD is cached to an array buffer. This will save the results
109- // if we're dealing with a 'one-time' iterator
110- val elements = new ArrayBuffer [Any ]
111- elements ++= computedValues
112- updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true )
113- elements.iterator.asInstanceOf [Iterator [T ]]
114- }
115- }
116-
117- // Update task metrics to include any blocks whose storage status is updated
118- val metrics = context.taskMetrics
119- metrics.updatedBlocks = Some (updatedBlocks)
120-
121- new InterruptibleIterator (context, returnValue)
66+ // Otherwise, cache the values and keep track of any updates in block statuses
67+ val updatedBlocks = new ArrayBuffer [(BlockId , BlockStatus )]
68+ val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
69+ context.taskMetrics.updatedBlocks = Some (updatedBlocks)
70+ new InterruptibleIterator (context, cachedValues)
12271
12372 } finally {
12473 loading.synchronized {
@@ -128,4 +77,76 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
12877 }
12978 }
13079 }
80+
81+ /**
82+ * Acquire a loading lock for the partition identified by the given block ID.
83+ *
84+ * If the lock is free, just acquire it and return None. Otherwise, another thread is already
85+ * loading the partition, so we wait for it to finish and return the values loaded by the thread.
86+ */
87+ private def acquireLockForPartition [T ](id : RDDBlockId ): Option [Iterator [T ]] = {
88+ loading.synchronized {
89+ if (! loading.contains(id)) {
90+ // If the partition is free, acquire its lock to compute its value
91+ loading.add(id)
92+ None
93+ } else {
94+ // Otherwise, wait for another thread to finish and return its result
95+ logInfo(s " Another thread is loading $id, waiting for it to finish... " )
96+ while (loading.contains(id)) {
97+ try {
98+ loading.wait()
99+ } catch {
100+ case e : Exception =>
101+ logWarning(s " Exception while waiting for another thread to load $id" , e)
102+ }
103+ }
104+ logInfo(s " Finished waiting for $id" )
105+ val values = blockManager.get(id)
106+ if (! values.isDefined) {
107+ /* The block is not guaranteed to exist even after the other thread has finished.
108+ * For instance, the block could be evicted after it was put, but before our get.
109+ * In this case, we still need to load the partition ourselves. */
110+ logInfo(s " Whoever was loading $id failed; we'll try it ourselves " )
111+ loading.add(id)
112+ }
113+ values.map(_.asInstanceOf [Iterator [T ]])
114+ }
115+ }
116+ }
117+
118+ /**
119+ * Cache the values of a partition, keeping track of any updates in the storage statuses
120+ * of other blocks along the way.
121+ */
122+ private def putInBlockManager [T ](
123+ key : BlockId ,
124+ values : Iterator [T ],
125+ storageLevel : StorageLevel ,
126+ updatedBlocks : ArrayBuffer [(BlockId , BlockStatus )]): Iterator [T ] = {
127+
128+ if (! storageLevel.useMemory) {
129+ /* This RDD is not to be cached in memory, so we can just pass the computed values
130+ * as an iterator directly to the BlockManager, rather than first fully unrolling
131+ * it in memory. The latter option potentially uses much more memory and risks OOM
132+ * exceptions that can be avoided. */
133+ updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true )
134+ blockManager.get(key) match {
135+ case Some (v) => v.asInstanceOf [Iterator [T ]]
136+ case None =>
137+ logInfo(s " Failure to store $key" )
138+ throw new BlockException (key, s " Block manager failed to return cached value for $key! " )
139+ }
140+ } else {
141+ /* This RDD is to be cached in memory. In this case we cannot pass the computed values
142+ * to the BlockManager as an iterator and expect to read it back later. This is because
143+ * we may end up dropping a partition from memory store before getting it back, e.g.
144+ * when the entirety of the RDD does not fit in memory. */
145+ val elements = new ArrayBuffer [Any ]
146+ elements ++= values
147+ updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true )
148+ elements.iterator.asInstanceOf [Iterator [T ]]
149+ }
150+ }
151+
131152}
0 commit comments