@@ -19,106 +19,57 @@ package org.apache.spark

import scala.collection.mutable.{ArrayBuffer, HashSet}

+import org.apache.spark.executor.InputMetrics
import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.{BlockId, BlockManager, BlockStatus, RDDBlockId, StorageLevel}
+import org.apache.spark.storage._

/**
- * Spark class responsible for passing RDDs split contents to the BlockManager and making
+ * Spark class responsible for passing RDDs' partition contents to the BlockManager and making
 * sure a node doesn't load two copies of an RDD at once.
 */
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {

-  /** Keys of RDD splits that are being computed/loaded. */
+  /** Keys of RDD partitions that are being computed/loaded. */
  private val loading = new HashSet[RDDBlockId]()

-  /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
+  /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
  def getOrCompute[T](
      rdd: RDD[T],
-      split: Partition,
+      partition: Partition,
      context: TaskContext,
      storageLevel: StorageLevel): Iterator[T] = {

-    val key = RDDBlockId(rdd.id, split.index)
+    val key = RDDBlockId(rdd.id, partition.index)
    logDebug(s"Looking for partition $key")
    blockManager.get(key) match {
-      case Some(values) =>
+      case Some(blockResult) =>
        // Partition is already materialized, so just return its values
-        new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+        context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
+        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

      case None =>
-        // Mark the split as loading (unless someone else marks it first)
-        loading.synchronized {
-          if (loading.contains(key)) {
-            logInfo(s"Another thread is loading $key, waiting for it to finish...")
-            while (loading.contains(key)) {
-              try {
-                loading.wait()
-              } catch {
-                case e: Exception =>
-                  logWarning(s"Got an exception while waiting for another thread to load $key", e)
-              }
-            }
-            logInfo(s"Finished waiting for $key")
-            /* See whether someone else has successfully loaded it. The main way this would fail
-             * is for the RDD-level cache eviction policy if someone else has loaded the same RDD
-             * partition but we didn't want to make space for it. However, that case is unlikely
-             * because it's unlikely that two threads would work on the same RDD partition. One
-             * downside of the current code is that threads wait serially if this does happen. */
-            blockManager.get(key) match {
-              case Some(values) =>
-                return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
-              case None =>
-                logInfo(s"Whoever was loading $key failed; we'll try it ourselves")
-                loading.add(key)
-            }
-          } else {
-            loading.add(key)
-          }
+        // Acquire a lock for loading this partition
+        // If another thread already holds the lock, wait for it to finish and return its results
+        val storedValues = acquireLockForPartition[T](key)
+        if (storedValues.isDefined) {
+          return new InterruptibleIterator[T](context, storedValues.get)
        }
+
+        // Otherwise, we have to load the partition ourselves
        try {
-          // If we got here, we have to load the split
          logInfo(s"Partition $key not found, computing it")
-          val computedValues = rdd.computeOrReadCheckpoint(split, context)
+          val computedValues = rdd.computeOrReadCheckpoint(partition, context)

-          // Persist the result, so long as the task is not running locally
+          // If the task is running locally, do not persist the result
          if (context.runningLocally) {
            return computedValues
          }

-          // Keep track of blocks with updated statuses
-          var updatedBlocks = Seq[(BlockId, BlockStatus)]()
-          val returnValue: Iterator[T] = {
-            if (storageLevel.useDisk && !storageLevel.useMemory) {
-              /* In the case that this RDD is to be persisted using DISK_ONLY
-               * the iterator will be passed directly to the blockManager (rather than
-               * caching it to an ArrayBuffer first), then the resulting block data iterator
-               * will be passed back to the user. If the iterator generates a lot of data,
-               * this means that it doesn't all have to be held in memory at one time.
-               * This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
-               * blocks aren't dropped by the block store before enabling that. */
-              updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
-              blockManager.get(key) match {
-                case Some(values) =>
-                  values.asInstanceOf[Iterator[T]]
-                case None =>
-                  logInfo(s"Failure to store $key")
-                  throw new SparkException("Block manager failed to return persisted value")
-              }
-            } else {
-              // In this case the RDD is cached to an array buffer. This will save the results
-              // if we're dealing with a 'one-time' iterator
-              val elements = new ArrayBuffer[Any]
-              elements ++= computedValues
-              updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
-              elements.iterator.asInstanceOf[Iterator[T]]
-            }
-          }
-
-          // Update task metrics to include any blocks whose storage status is updated
-          val metrics = context.taskMetrics
-          metrics.updatedBlocks = Some(updatedBlocks)
-
-          new InterruptibleIterator(context, returnValue)
+          // Otherwise, cache the values and keep track of any updates in block statuses
+          val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+          val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
+          context.taskMetrics.updatedBlocks = Some(updatedBlocks)
+          new InterruptibleIterator(context, cachedValues)

        } finally {
          loading.synchronized {
@@ -128,4 +79,76 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
        }
    }
  }
+
+  /**
+   * Acquire a loading lock for the partition identified by the given block ID.
+   *
+   * If the lock is free, just acquire it and return None. Otherwise, another thread is already
+   * loading the partition, so we wait for it to finish and return the values loaded by the thread.
+   */
+  private def acquireLockForPartition[T](id: RDDBlockId): Option[Iterator[T]] = {
+    loading.synchronized {
+      if (!loading.contains(id)) {
+        // If the partition is free, acquire its lock to compute its value
+        loading.add(id)
+        None
+      } else {
+        // Otherwise, wait for another thread to finish and return its result
+        logInfo(s"Another thread is loading $id, waiting for it to finish...")
+        while (loading.contains(id)) {
+          try {
+            loading.wait()
+          } catch {
+            case e: Exception =>
+              logWarning(s"Exception while waiting for another thread to load $id", e)
+          }
+        }
+        logInfo(s"Finished waiting for $id")
+        val values = blockManager.get(id)
+        if (!values.isDefined) {
+          /* The block is not guaranteed to exist even after the other thread has finished.
+           * For instance, the block could be evicted after it was put, but before our get.
+           * In this case, we still need to load the partition ourselves. */
+          logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
+          loading.add(id)
+        }
+        values.map(_.data.asInstanceOf[Iterator[T]])
+      }
+    }
+  }
+
+  /**
+   * Cache the values of a partition, keeping track of any updates in the storage statuses
+   * of other blocks along the way.
+   */
+  private def putInBlockManager[T](
+      key: BlockId,
+      values: Iterator[T],
+      storageLevel: StorageLevel,
+      updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {
+
+    if (!storageLevel.useMemory) {
+      /* This RDD is not to be cached in memory, so we can just pass the computed values
+       * as an iterator directly to the BlockManager, rather than first fully unrolling
+       * it in memory. The latter option potentially uses much more memory and risks OOM
+       * exceptions that can be avoided. */
+      updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
+      blockManager.get(key) match {
+        case Some(v) => v.data.asInstanceOf[Iterator[T]]
+        case None =>
+          logInfo(s"Failure to store $key")
+          throw new BlockException(key, s"Block manager failed to return cached value for $key!")
+      }
+    } else {
+      /* This RDD is to be cached in memory. In this case we cannot pass the computed values
+       * to the BlockManager as an iterator and expect to read it back later. This is because
+       * we may end up dropping a partition from memory store before getting it back, e.g.
+       * when the entirety of the RDD does not fit in memory. */
+      val elements = new ArrayBuffer[Any]
+      elements ++= values
+      updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
+      elements.iterator.asInstanceOf[Iterator[T]]
+    }
+  }
+
}
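
The `acquireLockForPartition` / `finally` pair above is a classic monitor-based loading lock: the `loading` set doubles as the monitor, waiters block in `loading.wait()` inside a loop that re-checks the condition (guarding against spurious wakeups), and the loading thread wakes everyone with `notifyAll()` once the block is stored or loading has failed. Below is a minimal standalone sketch of the same pattern; `LoadingLock`, `acquire`, and `release` are illustrative names for this note, not Spark APIs.

import scala.collection.mutable.HashSet

// Sketch of the monitor-based loading lock (illustrative, not Spark code).
class LoadingLock[K] {
  private val loading = new HashSet[K]()  // doubles as the monitor we synchronize on

  /** Returns true if the caller now owns `key` and must compute it, or false
    * if another thread finished loading it while we waited. */
  def acquire(key: K): Boolean = loading.synchronized {
    if (!loading.contains(key)) {
      loading.add(key)
      true
    } else {
      // Re-check the condition in a loop to tolerate spurious wakeups
      while (loading.contains(key)) {
        loading.wait()
      }
      false
    }
  }

  /** The owner must call this once the value is stored (or loading failed). */
  def release(key: K): Unit = loading.synchronized {
    loading.remove(key)
    loading.notifyAll()  // wake all waiters; each re-checks its condition
  }
}

As in the diff, a waiter that wakes up must still consult the backing store: the block may have been evicted between the owner's put and the waiter's get, in which case the waiter has to load the partition itself.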
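
To make the control flow concrete, here is a hypothetical driver for the `LoadingLock` sketch that mirrors the structure of `getOrCompute`: check the cache, take the lock, compute, publish, release. The `cache` map and the retry-on-eviction fallback are assumptions of this example, not Spark code.

import scala.collection.concurrent.TrieMap

// Hypothetical usage of LoadingLock, mirroring getOrCompute's structure.
object LoadingLockExample {
  private val lock  = new LoadingLock[Int]()
  private val cache = TrieMap.empty[Int, String]

  def getOrCompute(key: Int)(compute: => String): String = {
    cache.get(key) match {
      case Some(v) => v  // fast path: value already materialized
      case None =>
        if (!lock.acquire(key)) {
          // Another thread loaded the key while we waited, but (as in the
          // diff) the value may have been evicted since; retry if so.
          cache.get(key).getOrElse(getOrCompute(key)(compute))
        } else {
          try {
            val v = compute
            cache.put(key, v)  // publish before releasing the lock
            v
          } finally {
            lock.release(key)  // always release, even if compute throws
          }
        }
    }
  }
}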