Skip to content

Commit 08c1849

Browse files
committed
follow the code style and add some comments
1 parent 0f812d9 commit 08c1849

File tree

2 files changed

+13
-14
lines changed

2 files changed

+13
-14
lines changed

core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -85,25 +85,21 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
8585
level: StorageLevel): Iterator[U] = {
8686
getLocalValues() match {
8787
case Some(result) =>
88-
cachedInLocal = true
8988
return result
90-
case None =>
91-
cachedInLocal = false
89+
case None => // do nothing
9290
}
9391

9492
val iterator = rdd.iterator(partition, context)
95-
// Keep read lock, because next we need read it.
96-
val cachedResult = blockManager.putIterator[U](blockId2, iterator, level, false,
97-
true) match {
98-
case true =>
99-
cachedInLocal = true
100-
"successful"
93+
// Keep read lock, because next we need read it. And don't tell master.
94+
blockManager.putIterator[U](blockId2, iterator, level, false, true) match {
95+
case true => cachedInLocal = true
10196
case false =>
102-
cachedInLocal = false
103-
"failed"
97+
// There shouldn't a error caused by put in memory, because we use MEMORY_AND_DISK to
98+
// cache it.
99+
throw new SparkException(s"Cache block $blockId2 in local failed even though it's $level")
104100
}
105101

106-
logInfo(s"Cache the block $blockId2 to local $cachedResult.")
102+
logInfo(s"Cache the block $blockId2 to local successful.")
107103
getLocalValues() match {
108104
// We don't need release the read lock, it will release after the iterator completion.
109105
case Some(result) => result
@@ -112,6 +108,7 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
112108
}
113109
}
114110

111+
// Get block from local, and update the metrics.
115112
def getLocalValues(): Option[Iterator[U]] = {
116113
blockManager.getLocalValues(blockId2) match {
117114
case Some(result) =>
@@ -135,7 +132,7 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
135132
// Whether the block it persisted by the user.
136133
val persistedInLocal =
137134
blockManager.master.getLocations(blockId2).contains(blockManager.blockManagerId)
138-
if (!persistedInLocal || cachedInLocal || blockManager.isRemovable(blockId2)) {
135+
if (!persistedInLocal && (cachedInLocal || blockManager.isRemovable(blockId2))) {
139136
blockManager.removeOrMarkAsRemovable(blockId2, false)
140137
}
141138
}

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,18 @@ import java.nio.ByteBuffer
2121
import java.util.Properties
2222

2323
import scala.collection.mutable.ArrayBuffer
24-
import scala.concurrent.duration._
2524
import scala.concurrent.{ExecutionContext, Future}
25+
import scala.concurrent.duration._
2626
import scala.language.implicitConversions
2727
import scala.language.postfixOps
2828
import scala.reflect.ClassTag
29+
2930
import org.mockito.{Matchers => mc}
3031
import org.mockito.Mockito.{mock, times, verify, when}
3132
import org.scalatest._
3233
import org.scalatest.concurrent.Eventually._
3334
import org.scalatest.concurrent.Timeouts._
35+
3436
import org.apache.spark._
3537
import org.apache.spark.broadcast.BroadcastManager
3638
import org.apache.spark.executor.DataReadMethod

0 commit comments

Comments
 (0)