Skip to content

Commit 031d7d4

Browse files
jerryshaotdas
authored and committed
[SPARK-6304] [STREAMING] Fix checkpointing doesn't retain driver port issue.
Author: jerryshao <[email protected]> Author: Saisai Shao <[email protected]> Closes apache#5060 from jerryshao/SPARK-6304 and squashes the following commits: 89b01f5 [jerryshao] Update the unit test to add more cases 275d252 [jerryshao] Address the comments 7cc146d [jerryshao] Address the comments 2624723 [jerryshao] Fix rebase conflict 45befaa [Saisai Shao] Update the unit test bbc1c9c [Saisai Shao] Fix checkpointing doesn't retain driver port issue
1 parent fec10f0 commit 031d7d4

File tree

2 files changed

+46
-1
lines changed

2 files changed

+46
-1
lines changed

streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
4848
// Reload properties for the checkpoint application since user wants to set a reload property
4949
// or spark had changed its value and user wants to set it back.
5050
val propertiesToReload = List(
51+
"spark.driver.host",
52+
"spark.driver.port",
5153
"spark.master",
5254
"spark.yarn.keytab",
5355
"spark.yarn.principal")

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,51 @@ class CheckpointSuite extends TestSuiteBase {
191191
}
192192
}
193193

194+
// This tests that "spark.driver.host" and "spark.driver.port", if set by the user, can be recovered
195+
// with the correct values.
196+
test("get correct spark.driver.[host|port] from checkpoint") {
197+
val conf = Map("spark.driver.host" -> "localhost", "spark.driver.port" -> "9999")
198+
conf.foreach(kv => System.setProperty(kv._1, kv._2))
199+
ssc = new StreamingContext(master, framework, batchDuration)
200+
val originalConf = ssc.conf
201+
assert(originalConf.get("spark.driver.host") === "localhost")
202+
assert(originalConf.get("spark.driver.port") === "9999")
203+
204+
val cp = new Checkpoint(ssc, Time(1000))
205+
ssc.stop()
206+
207+
// Serialize/deserialize to simulate write to storage and reading it back
208+
val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
209+
210+
val newCpConf = newCp.createSparkConf()
211+
assert(newCpConf.contains("spark.driver.host"))
212+
assert(newCpConf.contains("spark.driver.port"))
213+
assert(newCpConf.get("spark.driver.host") === "localhost")
214+
assert(newCpConf.get("spark.driver.port") === "9999")
215+
216+
// Check if all the parameters have been restored
217+
ssc = new StreamingContext(null, newCp, null)
218+
val restoredConf = ssc.conf
219+
assert(restoredConf.get("spark.driver.host") === "localhost")
220+
assert(restoredConf.get("spark.driver.port") === "9999")
221+
ssc.stop()
222+
223+
// If spark.driver.host and spark.driver.port are not set in system properties, these two
224+
// parameters should not be present in the newly recovered conf.
225+
conf.foreach(kv => System.clearProperty(kv._1))
226+
val newCpConf1 = newCp.createSparkConf()
227+
assert(!newCpConf1.contains("spark.driver.host"))
228+
assert(!newCpConf1.contains("spark.driver.port"))
229+
230+
// Spark itself will assign a random, unused port for spark.driver.port if it is not set
231+
// explicitly.
232+
ssc = new StreamingContext(null, newCp, null)
233+
val restoredConf1 = ssc.conf
234+
assert(restoredConf1.get("spark.driver.host") === "localhost")
235+
assert(restoredConf1.get("spark.driver.port") !== "9999")
236+
}
194237

195-
// This tests whether the systm can recover from a master failure with simple
238+
// This tests whether the system can recover from a master failure with simple
196239
// non-stateful operations. This assumes a reliable, replayable input
197240
// source - TestInputDStream.
198241
test("recovery with map and reduceByKey operations") {

0 commit comments

Comments
 (0)