From bbc1c9c6a7c9535d574452b9410cb7df3d5c7f19 Mon Sep 17 00:00:00 2001 From: Saisai Shao Date: Tue, 17 Mar 2015 10:29:43 +0800 Subject: [PATCH 1/6] Fix checkpointing doesn't retain driver port issue --- .../scala/org/apache/spark/SparkContext.scala | 5 +++ .../apache/spark/streaming/Checkpoint.scala | 8 +++-- .../spark/streaming/CheckpointSuite.scala | 32 +++++++++++++++++++ 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index bd1cc332a63e..60478f8952a1 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -94,6 +94,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // contains a map from hostname to a list of input format splits on the host. private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map() + // This is used for Spark Streaming to check whether driver host and port are set by user, + // if these two configurations are set by user, so the recovery mechanism should not remove this. 
+ private[spark] val isDriverHostSetByUser = config.contains("spark.driver.host") + private[spark] val isDriverPortSetByUser = config.contains("spark.driver.port") + val startTime = System.currentTimeMillis() private val stopped: AtomicBoolean = new AtomicBoolean(false) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 5279331c9e12..ce78111712b9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -41,7 +41,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) - val sparkConfPairs = ssc.conf.getAll + val sparkConfPairs = ssc.conf.getAll.filterNot { kv => + (!ssc.sc.isDriverHostSetByUser && kv._1 == "spark.driver.host") || + (!ssc.sc.isDriverPortSetByUser && kv._1 == "spark.driver.port") def createSparkConf(): SparkConf = { @@ -50,7 +52,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val propertiesToReload = List( "spark.master", "spark.yarn.keytab", - "spark.yarn.principal") + "spark.yarn.principal", + "spark.driver.host", + "spark.driver.port") val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs) .remove("spark.driver.host") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 6a9492807623..f4087d405a0c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -191,6 +191,38 @@ class CheckpointSuite extends TestSuiteBase { } } + // This tests if "spark.driver.host" and 
"spark.driver.port" is set by user, can be recovered + // with correct value. + test("correctly recover spark.driver.[host|port] from checkpoint") { + val conf = Map("spark.driver.host" -> "localhost", "spark.driver.port" -> "9999") + conf.foreach(kv => System.setProperty(kv._1, kv._2)) + ssc = new StreamingContext(master, framework, batchDuration) + val originalConf = ssc.conf + assert(ssc.sc.isDriverHostSetByUser === true) + assert(ssc.sc.isDriverPortSetByUser === true) + assert(originalConf.get("spark.driver.host") === "localhost") + assert(originalConf.get("spark.driver.port") === "9999") + + val cp = new Checkpoint(ssc, Time(1000)) + ssc.stop() + + // Serialize/deserialize to simulate write to storage and reading it back + val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp)) + + val newCpConf = newCp.sparkConf + assert(newCpConf.contains("spark.driver.host")) + assert(newCpConf.contains("spark.driver.port")) + assert(originalConf.get("spark.driver.host") === "localhost") + assert(originalConf.get("spark.driver.port") === "9999") + + // Check if all the parameters have been restored + ssc = new StreamingContext(null, newCp, null) + val restoredConf = ssc.conf + assert(ssc.sc.isDriverHostSetByUser === true) + assert(ssc.sc.isDriverPortSetByUser === true) + assert(restoredConf.get("spark.driver.host") === "localhost") + assert(restoredConf.get("spark.driver.port") === "9999") + } // This tests whether the systm can recover from a master failure with simple // non-stateful operations. 
This assumes as reliable, replayable input From 45befaaccd5d4c2b06616a56f5bf777dcfb431c3 Mon Sep 17 00:00:00 2001 From: Saisai Shao Date: Tue, 17 Mar 2015 11:21:45 +0800 Subject: [PATCH 2/6] Update the unit test --- .../scala/org/apache/spark/streaming/CheckpointSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index f4087d405a0c..9eea7efda928 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -212,8 +212,8 @@ class CheckpointSuite extends TestSuiteBase { val newCpConf = newCp.sparkConf assert(newCpConf.contains("spark.driver.host")) assert(newCpConf.contains("spark.driver.port")) - assert(originalConf.get("spark.driver.host") === "localhost") - assert(originalConf.get("spark.driver.port") === "9999") + assert(newCpConf.get("spark.driver.host") === "localhost") + assert(newCpConf.get("spark.driver.port") === "9999") // Check if all the parameters have been restored ssc = new StreamingContext(null, newCp, null) From 26247230588eeffb28ae2f5f89b04982ba06dc9a Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 24 Apr 2015 16:21:48 +0800 Subject: [PATCH 3/6] Fix rebase conflict --- .../src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 2 +- .../test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index ce78111712b9..31ebc8c92a02 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -43,7 +43,7 @@ class Checkpoint(@transient ssc: StreamingContext, 
val checkpointTime: Time) val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) val sparkConfPairs = ssc.conf.getAll.filterNot { kv => (!ssc.sc.isDriverHostSetByUser && kv._1 == "spark.driver.host") || - (!ssc.sc.isDriverPortSetByUser && kv._1 == "spark.driver.port") + (!ssc.sc.isDriverPortSetByUser && kv._1 == "spark.driver.port") } def createSparkConf(): SparkConf = { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 9eea7efda928..b21339b01f6c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -209,7 +209,7 @@ class CheckpointSuite extends TestSuiteBase { // Serialize/deserialize to simulate write to storage and reading it back val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp)) - val newCpConf = newCp.sparkConf + val newCpConf = newCp.createSparkConf() assert(newCpConf.contains("spark.driver.host")) assert(newCpConf.contains("spark.driver.port")) assert(newCpConf.get("spark.driver.host") === "localhost") From 7cc146d207077909218dbdef2b7e9eb0fc7faba7 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 15 Jul 2015 15:38:43 +0800 Subject: [PATCH 4/6] Address the comments --- core/src/main/scala/org/apache/spark/SparkContext.scala | 5 ----- .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 4 +--- .../scala/org/apache/spark/streaming/CheckpointSuite.scala | 4 ---- 3 files changed, 1 insertion(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 60478f8952a1..bd1cc332a63e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -94,11 +94,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // 
contains a map from hostname to a list of input format splits on the host. private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map() - // This is used for Spark Streaming to check whether driver host and port are set by user, - // if these two configurations are set by user, so the recovery mechanism should not remove this. - private[spark] val isDriverHostSetByUser = config.contains("spark.driver.host") - private[spark] val isDriverPortSetByUser = config.contains("spark.driver.port") - val startTime = System.currentTimeMillis() private val stopped: AtomicBoolean = new AtomicBoolean(false) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 31ebc8c92a02..96753d9b3cc5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -41,9 +41,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val checkpointDuration = ssc.checkpointDuration val pendingTimes = ssc.scheduler.getPendingTimes().toArray val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) - val sparkConfPairs = ssc.conf.getAll.filterNot { kv => - (!ssc.sc.isDriverHostSetByUser && kv._1 == "spark.driver.host") || - (!ssc.sc.isDriverPortSetByUser && kv._1 == "spark.driver.port") } + val sparkConfPairs = ssc.conf.getAll def createSparkConf(): SparkConf = { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index b21339b01f6c..52702bb852d8 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -198,8 +198,6 @@ class CheckpointSuite extends TestSuiteBase { conf.foreach(kv => System.setProperty(kv._1, kv._2)) ssc = new 
StreamingContext(master, framework, batchDuration) val originalConf = ssc.conf - assert(ssc.sc.isDriverHostSetByUser === true) - assert(ssc.sc.isDriverPortSetByUser === true) assert(originalConf.get("spark.driver.host") === "localhost") assert(originalConf.get("spark.driver.port") === "9999") @@ -218,8 +216,6 @@ class CheckpointSuite extends TestSuiteBase { // Check if all the parameters have been restored ssc = new StreamingContext(null, newCp, null) val restoredConf = ssc.conf - assert(ssc.sc.isDriverHostSetByUser === true) - assert(ssc.sc.isDriverPortSetByUser === true) assert(restoredConf.get("spark.driver.host") === "localhost") assert(restoredConf.get("spark.driver.port") === "9999") } From 275d25284153ed477a91d764ae9968c41c389e29 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 15 Jul 2015 17:41:34 +0800 Subject: [PATCH 5/6] Address the comments --- .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 96753d9b3cc5..65d4e933bf8e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -48,11 +48,11 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) // Reload properties for the checkpoint application since user wants to set a reload property // or spark had changed its value and user wants to set it back. 
val propertiesToReload = List( + "spark.driver.host", + "spark.driver.port", "spark.master", "spark.yarn.keytab", - "spark.yarn.principal", - "spark.driver.host", - "spark.driver.port") + "spark.yarn.principal") val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs) .remove("spark.driver.host") From 89b01f5c389ca06be7e1296326a79e34d5a955ac Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 16 Jul 2015 11:08:08 +0800 Subject: [PATCH 6/6] Update the unit test to add more cases --- .../spark/streaming/CheckpointSuite.scala | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 52702bb852d8..d308ac05a54f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -193,7 +193,7 @@ class CheckpointSuite extends TestSuiteBase { // This tests if "spark.driver.host" and "spark.driver.port" is set by user, can be recovered // with correct value. - test("correctly recover spark.driver.[host|port] from checkpoint") { + test("get correct spark.driver.[host|port] from checkpoint") { val conf = Map("spark.driver.host" -> "localhost", "spark.driver.port" -> "9999") conf.foreach(kv => System.setProperty(kv._1, kv._2)) ssc = new StreamingContext(master, framework, batchDuration) @@ -218,9 +218,24 @@ class CheckpointSuite extends TestSuiteBase { val restoredConf = ssc.conf assert(restoredConf.get("spark.driver.host") === "localhost") assert(restoredConf.get("spark.driver.port") === "9999") + ssc.stop() + + // If spark.driver.host and spark.driver.port are not set in system properties, these two + // parameters should not be present in the newly recovered conf. 
+ conf.foreach(kv => System.clearProperty(kv._1)) + val newCpConf1 = newCp.createSparkConf() + assert(!newCpConf1.contains("spark.driver.host")) + assert(!newCpConf1.contains("spark.driver.port")) + + // Spark itself will assign a random, unused port for spark.driver.port if it is not set + // explicitly. + ssc = new StreamingContext(null, newCp, null) + val restoredConf1 = ssc.conf + assert(restoredConf1.get("spark.driver.host") === "localhost") + assert(restoredConf1.get("spark.driver.port") !== "9999") } - // This tests whether the systm can recover from a master failure with simple + // This tests whether the system can recover from a master failure with simple // non-stateful operations. This assumes as reliable, replayable input // source - TestInputDStream. test("recovery with map and reduceByKey operations") {