Skip to content

Commit cfa7494

Browse files
aarunadongjoon-hyun
authored andcommitted
[SPARK-30199][DSTREAM] Recover spark.ui.port and spark.blockManager.port from checkpoint
1 parent be867e8 commit cfa7494

File tree

2 files changed

+31
-0
lines changed

2 files changed

+31
-0
lines changed

streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
5555
"spark.driver.bindAddress",
5656
"spark.driver.port",
5757
"spark.master",
58+
"spark.ui.port",
59+
"spark.blockManager.port",
5860
"spark.kubernetes.driver.pod.name",
5961
"spark.kubernetes.executor.podNamePrefix",
6062
"spark.yarn.jars",
@@ -69,6 +71,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
6971
.remove("spark.driver.host")
7072
.remove("spark.driver.bindAddress")
7173
.remove("spark.driver.port")
74+
.remove("spark.ui.port")
75+
.remove("spark.blockManager.port")
7276
.remove("spark.kubernetes.driver.pod.name")
7377
.remove("spark.kubernetes.executor.podNamePrefix")
7478
val newReloadConf = new SparkConf(loadDefaults = true)

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,33 @@ class CheckpointSuite extends TestSuiteBase with LocalStreamingContext with DStr
420420
assert(restoredConf1.get("spark.driver.port") !== "9999")
421421
}
422422

423+
test("SPARK-30199 get ui port and blockmanager port") {
424+
val conf = Map("spark.ui.port" -> "30001", "spark.blockManager.port" -> "30002")
425+
conf.foreach { case (k, v) => System.setProperty(k, v) }
426+
ssc = new StreamingContext(master, framework, batchDuration)
427+
conf.foreach { case (k, v) => assert(ssc.conf.get(k) === v) }
428+
429+
val cp = new Checkpoint(ssc, Time(1000))
430+
ssc.stop()
431+
432+
// Serialize/deserialize to simulate write to storage and reading it back
433+
val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
434+
435+
val newCpConf = newCp.createSparkConf()
436+
conf.foreach { case (k, v) => assert(newCpConf.contains(k) && newCpConf.get(k) === v) }
437+
438+
// Check if all the parameters have been restored
439+
ssc = new StreamingContext(null, newCp, null)
440+
conf.foreach { case (k, v) => assert(ssc.conf.get(k) === v) }
441+
ssc.stop()
442+
443+
// If port numbers are not set in system property, these parameters should not be presented
444+
// in the newly recovered conf.
445+
conf.foreach(kv => System.clearProperty(kv._1))
446+
val newCpConf1 = newCp.createSparkConf()
447+
conf.foreach { case (k, _) => assert(!newCpConf1.contains(k)) }
448+
}
449+
423450
// This tests whether the system can recover from a master failure with simple
424451
// non-stateful operations. This assumes as reliable, replayable input
425452
// source - TestInputDStream.

0 commit comments

Comments
 (0)