@@ -75,14 +75,6 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
     val blockManager = SparkEnv.get.blockManager
     val currSplit = split.asInstanceOf[CartesianPartition]
     val blockId2 = RDDBlockId(rdd2.id, currSplit.s2.index)
-    // Whether the block persisted by the user with valid StorageLevel.
-    val persistedInLocal = blockManager.getStatus(blockId2) match {
-      case Some(result) =>
-        // This meaning if the block is cached by the user? If it's valid, it shouldn't be
-        // removed by other task.
-        result.storageLevel.isValid
-      case None => false
-    }
     var cachedInLocal = false
 
     // Try to get data from the local, otherwise it will be cached to the local.
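The block removed above derived `persistedInLocal` from the local block status. A minimal sketch of that old pattern, assuming Spark's `BlockManager.getStatus` (which returns an `Option[BlockStatus]`):

```scala
// Sketch only: the old "is this block user-persisted?" probe. A block whose
// local status carried a valid StorageLevel was treated as persisted by the user.
val persistedInLocal: Boolean =
  SparkEnv.get.blockManager.getStatus(blockId2)
    .exists(_.storageLevel.isValid)
```

The patch drops this local probe; `removeCachedBlock` in the second hunk instead asks the driver via `blockManager.master.getLocations`, so the decision reflects what the driver believes is persisted rather than whatever happens to sit in the local store at that moment.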
@@ -91,45 +83,58 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
         partition: Partition,
         context: TaskContext,
         level: StorageLevel): Iterator[U] = {
-      // Because the getLocalValues return a CompletionIterator, and it will release the read
-      // block after the iterator finish using. So there should update the flag.
-      cachedInLocal = blockManager.getStatus(blockId2) match {
-        case Some(_) => true
-        case None => false
-      }
-
-      if (persistedInLocal || cachedInLocal) {
-        blockManager.getLocalValues(blockId2) match {
-          case Some(result) =>
-            val existingMetrics = context.taskMetrics().inputMetrics
-            existingMetrics.incBytesRead(result.bytes)
-            return new InterruptibleIterator[U](context, result.data.asInstanceOf[Iterator[U]]) {
-              override def next(): U = {
-                existingMetrics.incRecordsRead(1)
-                delegate.next()
-              }
-            }
-          case None =>
-            if (persistedInLocal) {
-              throw new SparkException(s"Block $blockId2 was not found even though it's persisted")
-            }
-        }
+      getLocalValues() match {
+        case Some(result) =>
+          cachedInLocal = true
+          return result
+        case None =>
+          cachedInLocal = false
       }
 
       val iterator = rdd.iterator(partition, context)
-      val cachedResult = blockManager.putIterator[U](blockId2, iterator, level, false) match {
+      // Keep the read lock, because we need to read the block back next.
+      val cachedResult = blockManager.putIterator[U](blockId2, iterator, level, false,
+        true) match {
         case true =>
           cachedInLocal = true
           "successful"
-        case false => "failed"
+        case false =>
+          cachedInLocal = false
+          "failed"
       }
 
       logInfo(s"Cache the block $blockId2 to local $cachedResult.")
-      iterator
+      getLocalValues() match {
+        // No need to release the read lock here; it is released when the iterator completes.
+        case Some(result) => result
+        case None =>
+          throw new SparkException(s"Block $blockId2 was not found even though it's read-locked")
+      }
+    }
+
+    def getLocalValues(): Option[Iterator[U]] = {
+      blockManager.getLocalValues(blockId2) match {
+        case Some(result) =>
+          val existingMetrics = context.taskMetrics().inputMetrics
+          existingMetrics.incBytesRead(result.bytes)
+          val localIter =
+            new InterruptibleIterator[U](context, result.data.asInstanceOf[Iterator[U]]) {
+              override def next(): U = {
+                existingMetrics.incRecordsRead(1)
+                delegate.next()
+              }
+            }
+          Some(localIter)
+        case None =>
+          None
+      }
     }
 
     def removeCachedBlock(): Unit = {
       val blockManager = SparkEnv.get.blockManager
+      // Whether the block is persisted by the user.
+      val persistedInLocal =
+        blockManager.master.getLocations(blockId2).contains(blockManager.blockManagerId)
       if (!persistedInLocal || cachedInLocal || blockManager.isRemovable(blockId2)) {
         blockManager.removeOrMarkAsRemovable(blockId2, false)
       }
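Taken together, the new path is a get-or-compute-and-cache loop: probe the local store, on a miss compute and cache the partition while keeping the read lock, then serve the freshly written block. Below is a condensed sketch of that control flow; note that the five-argument `putIterator` (with a keep-read-lock flag) and the `isRemovable`/`removeOrMarkAsRemovable` helpers are introduced by this patch, not stock Spark APIs, and `getOrCompute` is a hypothetical name for the combined steps:

```scala
// Condensed sketch; blockId2, blockManager, rdd, partition, context and level
// are the same names used in the surrounding code.
def getOrCompute(): Iterator[U] = {
  blockManager.getLocalValues(blockId2) match {
    case Some(cached) =>
      // Hit: serve the cached copy. The read lock is released when the
      // underlying CompletionIterator finishes.
      cached.data.asInstanceOf[Iterator[U]]
    case None =>
      // Miss: compute the partition, cache it while keeping the read lock
      // (the fifth argument added by this patch), then read it back.
      val computed = rdd.iterator(partition, context)
      blockManager.putIterator[U](blockId2, computed, level, false, true)
      blockManager.getLocalValues(blockId2)
        .map(_.data.asInstanceOf[Iterator[U]])
        .getOrElse(
          throw new SparkException(s"Block $blockId2 was not found after caching"))
  }
}
```

Reading the block back after `putIterator` (rather than returning the already-consumed `iterator`, as the old code did) is the substantive fix: `putIterator` drains the iterator into the store, so the only valid data left to hand out is the copy now held under the read lock.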