From cfa74942fc8bcc7edcea6cb81b837ba7931af43f Mon Sep 17 00:00:00 2001 From: Aaruna Date: Mon, 9 Dec 2019 22:14:55 -0800 Subject: [PATCH] [SPARK-30199][DSTREAM] Recover spark.ui.port and spark.blockManager.port from checkpoint --- .../apache/spark/streaming/Checkpoint.scala | 4 +++ .../spark/streaming/CheckpointSuite.scala | 27 +++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 8617434d4d74..5d81d36dfe35 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -55,6 +55,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) "spark.driver.bindAddress", "spark.driver.port", "spark.master", + "spark.ui.port", + "spark.blockManager.port", "spark.kubernetes.driver.pod.name", "spark.kubernetes.executor.podNamePrefix", "spark.yarn.jars", @@ -69,6 +71,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time) .remove("spark.driver.host") .remove("spark.driver.bindAddress") .remove("spark.driver.port") + .remove("spark.ui.port") + .remove("spark.blockManager.port") .remove("spark.kubernetes.driver.pod.name") .remove("spark.kubernetes.executor.podNamePrefix") val newReloadConf = new SparkConf(loadDefaults = true) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 25c079658536..238ef1e2367a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -420,6 +420,33 @@ class CheckpointSuite extends TestSuiteBase with LocalStreamingContext with DStr assert(restoredConf1.get("spark.driver.port") !== "9999") } + test("SPARK-30199 get ui port and blockmanager port") { + 
val conf = Map("spark.ui.port" -> "30001", "spark.blockManager.port" -> "30002") + conf.foreach { case (k, v) => System.setProperty(k, v) } + ssc = new StreamingContext(master, framework, batchDuration) + conf.foreach { case (k, v) => assert(ssc.conf.get(k) === v) } + + val cp = new Checkpoint(ssc, Time(1000)) + ssc.stop() + + // Serialize/deserialize to simulate write to storage and reading it back + val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp)) + + val newCpConf = newCp.createSparkConf() + conf.foreach { case (k, v) => assert(newCpConf.contains(k) && newCpConf.get(k) === v) } + + // Check if all the parameters have been restored + ssc = new StreamingContext(null, newCp, null) + conf.foreach { case (k, v) => assert(ssc.conf.get(k) === v) } + ssc.stop() + + // If port numbers are not set in system properties, these parameters should not be present + // in the newly recovered conf. + conf.foreach(kv => System.clearProperty(kv._1)) + val newCpConf1 = newCp.createSparkConf() + conf.foreach { case (k, _) => assert(!newCpConf1.contains(k)) } + } + // This tests whether the system can recover from a master failure with simple // non-stateful operations. This assumes as reliable, replayable input // source - TestInputDStream.