
Commit 2fefb60

aaruna authored and dongjoon-hyun committed
[SPARK-30199][DSTREAM] Recover spark.(ui|blockManager).port from checkpoint
### What changes were proposed in this pull request?

This is a backport of #26827. This PR aims to recover `spark.ui.port` and `spark.blockManager.port` from checkpoint like `spark.driver.port`.

### Why are the changes needed?

When the user configures these values, we can respect them.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the Jenkins with the newly added test cases.

Closes #28320 from dongjoon-hyun/SPARK-30199-2.4.

Authored-by: Aaruna <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
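As an illustration only (not part of this commit), here is a minimal sketch of the pattern this change targets: a checkpointed Spark Streaming job whose driver is resubmitted with `spark.ui.port` and `spark.blockManager.port` supplied on the new launch (for example via `--conf`, which surfaces as system properties on the driver). With this change, those values are respected when the context is recovered from the checkpoint. The application name, checkpoint path, and socket source below are illustrative.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverablePortsExample {
  // Illustrative checkpoint location; a real job would use a durable path (e.g. on HDFS).
  val checkpointDir = "/tmp/streaming-checkpoint"

  // Called only when no checkpoint exists yet.
  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("RecoverablePortsExample")
      .setMaster("local[2]") // for a local run; normally supplied by spark-submit
      .set("spark.ui.port", "4045")
      .set("spark.blockManager.port", "31000")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)
    ssc.socketTextStream("localhost", 9999).count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On a restart, the context is rebuilt from the checkpoint; with this fix the
    // UI and block manager ports come from the new submission's configuration
    // (system properties / defaults) rather than from stale checkpointed values.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```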
1 parent 5183984 · commit 2fefb60

2 files changed: +31 −0 lines


streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

Lines changed: 4 additions & 0 deletions
@@ -54,6 +54,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
       "spark.driver.bindAddress",
       "spark.driver.port",
       "spark.master",
+      "spark.ui.port",
+      "spark.blockManager.port",
       "spark.kubernetes.driver.pod.name",
       "spark.kubernetes.executor.podNamePrefix",
       "spark.yarn.jars",
@@ -66,6 +68,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
       .remove("spark.driver.host")
       .remove("spark.driver.bindAddress")
       .remove("spark.driver.port")
+      .remove("spark.ui.port")
+      .remove("spark.blockManager.port")
       .remove("spark.kubernetes.driver.pod.name")
       .remove("spark.kubernetes.executor.podNamePrefix")
     val newReloadConf = new SparkConf(loadDefaults = true)
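Both hunks sit in `Checkpoint.createSparkConf()`: the two keys are dropped from the checkpointed settings and then re-populated from the new launch's defaults (system properties) only if they are set there. A paraphrased sketch of that reload pattern follows; the property list is abbreviated and the method body is approximated from the visible context, not quoted from Checkpoint.scala.

```scala
import org.apache.spark.SparkConf

object CreateSparkConfSketch {
  // Approximation of the reload pattern around the hunks above, for illustration only.
  def createSparkConf(checkpointedPairs: Seq[(String, String)]): SparkConf = {
    val propertiesToReload = Seq(
      "spark.driver.host", "spark.driver.bindAddress", "spark.driver.port",
      "spark.master", "spark.ui.port", "spark.blockManager.port")

    // 1. Start from the checkpointed settings, dropping run-specific keys
    //    (now including the UI and block manager ports).
    val newSparkConf = new SparkConf(loadDefaults = false)
      .setAll(checkpointedPairs)
      .remove("spark.ui.port")
      .remove("spark.blockManager.port")

    // 2. Re-read those keys from the new launch (system properties / defaults)
    //    and apply them only if the user actually set them there.
    val newReloadConf = new SparkConf(loadDefaults = true)
    propertiesToReload.foreach { prop =>
      newReloadConf.getOption(prop).foreach(value => newSparkConf.set(prop, value))
    }
    newSparkConf
  }
}
```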

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 27 additions & 0 deletions
@@ -410,6 +410,33 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     assert(restoredConf1.get("spark.driver.port") !== "9999")
   }
 
+  test("SPARK-30199 get ui port and blockmanager port") {
+    val conf = Map("spark.ui.port" -> "30001", "spark.blockManager.port" -> "30002")
+    conf.foreach { case (k, v) => System.setProperty(k, v) }
+    ssc = new StreamingContext(master, framework, batchDuration)
+    conf.foreach { case (k, v) => assert(ssc.conf.get(k) === v) }
+
+    val cp = new Checkpoint(ssc, Time(1000))
+    ssc.stop()
+
+    // Serialize/deserialize to simulate write to storage and reading it back
+    val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
+
+    val newCpConf = newCp.createSparkConf()
+    conf.foreach { case (k, v) => assert(newCpConf.contains(k) && newCpConf.get(k) === v) }
+
+    // Check if all the parameters have been restored
+    ssc = new StreamingContext(null, newCp, null)
+    conf.foreach { case (k, v) => assert(ssc.conf.get(k) === v) }
+    ssc.stop()
+
+    // If port numbers are not set in system property, these parameters should not be presented
+    // in the newly recovered conf.
+    conf.foreach(kv => System.clearProperty(kv._1))
+    val newCpConf1 = newCp.createSparkConf()
+    conf.foreach { case (k, _) => assert(!newCpConf1.contains(k)) }
+  }
+
   // This tests whether the system can recover from a master failure with simple
   // non-stateful operations. This assumes as reliable, replayable input
   // source - TestInputDStream.
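A note on the `Utils.serialize`/`Utils.deserialize` step in the test above: it stands in for writing the checkpoint to storage and reading it back. Since `org.apache.spark.util.Utils` is internal to Spark, a standalone equivalent of that round trip, sketched here with plain Java serialization rather than Spark's own helper, looks like this:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.spark.SparkConf

object SerializationRoundTripSketch {
  // Write an object to bytes and read it back, mimicking what the test does
  // with the Checkpoint instance via Utils.serialize/deserialize.
  def roundTrip[T <: Serializable](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // SparkConf is Serializable, so its settings survive the same round trip.
    val conf = new SparkConf(loadDefaults = false).set("spark.ui.port", "30001")
    assert(roundTrip(conf).get("spark.ui.port") == "30001")
  }
}
```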
