Skip to content

Commit 7cc146d

Browse files
committed
Address the comments
1 parent 2624723 commit 7cc146d

File tree

3 files changed

+1
-12
lines changed

3 files changed

+1
-12
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
9494
// contains a map from hostname to a list of input format splits on the host.
9595
private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()
9696

97-
// This is used for Spark Streaming to check whether driver host and port are set by user,
98-
// if these two configurations are set by user, so the recovery mechanism should not remove this.
99-
private[spark] val isDriverHostSetByUser = config.contains("spark.driver.host")
100-
private[spark] val isDriverPortSetByUser = config.contains("spark.driver.port")
101-
10297
val startTime = System.currentTimeMillis()
10398

10499
private val stopped: AtomicBoolean = new AtomicBoolean(false)

streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
4141
val checkpointDuration = ssc.checkpointDuration
4242
val pendingTimes = ssc.scheduler.getPendingTimes().toArray
4343
val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
44-
val sparkConfPairs = ssc.conf.getAll.filterNot { kv =>
45-
(!ssc.sc.isDriverHostSetByUser && kv._1 == "spark.driver.host") ||
46-
(!ssc.sc.isDriverPortSetByUser && kv._1 == "spark.driver.port") }
44+
val sparkConfPairs = ssc.conf.getAll
4745

4846
def createSparkConf(): SparkConf = {
4947

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,6 @@ class CheckpointSuite extends TestSuiteBase {
198198
conf.foreach(kv => System.setProperty(kv._1, kv._2))
199199
ssc = new StreamingContext(master, framework, batchDuration)
200200
val originalConf = ssc.conf
201-
assert(ssc.sc.isDriverHostSetByUser === true)
202-
assert(ssc.sc.isDriverPortSetByUser === true)
203201
assert(originalConf.get("spark.driver.host") === "localhost")
204202
assert(originalConf.get("spark.driver.port") === "9999")
205203

@@ -218,8 +216,6 @@ class CheckpointSuite extends TestSuiteBase {
218216
// Check if all the parameters have been restored
219217
ssc = new StreamingContext(null, newCp, null)
220218
val restoredConf = ssc.conf
221-
assert(ssc.sc.isDriverHostSetByUser === true)
222-
assert(ssc.sc.isDriverPortSetByUser === true)
223219
assert(restoredConf.get("spark.driver.host") === "localhost")
224220
assert(restoredConf.get("spark.driver.port") === "9999")
225221
}

0 commit comments

Comments
 (0)