Skip to content

Commit bbc1c9c

Browse files
committed
Fix checkpointing doesn't retain driver port issue
1 parent 14935d8 commit bbc1c9c

File tree

3 files changed

+43
-2
lines changed

3 files changed

+43
-2
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
9494
// contains a map from hostname to a list of input format splits on the host.
9595
private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()
9696

97+
// This is used for Spark Streaming to check whether driver host and port are set by user,
98+
// if these two configurations are set by user, so the recovery mechanism should not remove this.
99+
private[spark] val isDriverHostSetByUser = config.contains("spark.driver.host")
100+
private[spark] val isDriverPortSetByUser = config.contains("spark.driver.port")
101+
97102
val startTime = System.currentTimeMillis()
98103

99104
private val stopped: AtomicBoolean = new AtomicBoolean(false)

streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
4141
val checkpointDuration = ssc.checkpointDuration
4242
val pendingTimes = ssc.scheduler.getPendingTimes().toArray
4343
val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
44-
val sparkConfPairs = ssc.conf.getAll
44+
val sparkConfPairs = ssc.conf.getAll.filterNot { kv =>
45+
(!ssc.sc.isDriverHostSetByUser && kv._1 == "spark.driver.host") ||
46+
(!ssc.sc.isDriverPortSetByUser && kv._1 == "spark.driver.port")
4547

4648
def createSparkConf(): SparkConf = {
4749

@@ -50,7 +52,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
5052
val propertiesToReload = List(
5153
"spark.master",
5254
"spark.yarn.keytab",
53-
"spark.yarn.principal")
55+
"spark.yarn.principal",
56+
"spark.driver.host",
57+
"spark.driver.port")
5458

5559
val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
5660
.remove("spark.driver.host")

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,38 @@ class CheckpointSuite extends TestSuiteBase {
191191
}
192192
}
193193

194+
// This tests if "spark.driver.host" and "spark.driver.port" is set by user, can be recovered
195+
// with correct value.
196+
test("correctly recover spark.driver.[host|port] from checkpoint") {
197+
val conf = Map("spark.driver.host" -> "localhost", "spark.driver.port" -> "9999")
198+
conf.foreach(kv => System.setProperty(kv._1, kv._2))
199+
ssc = new StreamingContext(master, framework, batchDuration)
200+
val originalConf = ssc.conf
201+
assert(ssc.sc.isDriverHostSetByUser === true)
202+
assert(ssc.sc.isDriverPortSetByUser === true)
203+
assert(originalConf.get("spark.driver.host") === "localhost")
204+
assert(originalConf.get("spark.driver.port") === "9999")
205+
206+
val cp = new Checkpoint(ssc, Time(1000))
207+
ssc.stop()
208+
209+
// Serialize/deserialize to simulate write to storage and reading it back
210+
val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
211+
212+
val newCpConf = newCp.sparkConf
213+
assert(newCpConf.contains("spark.driver.host"))
214+
assert(newCpConf.contains("spark.driver.port"))
215+
assert(originalConf.get("spark.driver.host") === "localhost")
216+
assert(originalConf.get("spark.driver.port") === "9999")
217+
218+
// Check if all the parameters have been restored
219+
ssc = new StreamingContext(null, newCp, null)
220+
val restoredConf = ssc.conf
221+
assert(ssc.sc.isDriverHostSetByUser === true)
222+
assert(ssc.sc.isDriverPortSetByUser === true)
223+
assert(restoredConf.get("spark.driver.host") === "localhost")
224+
assert(restoredConf.get("spark.driver.port") === "9999")
225+
}
194226

195227
// This tests whether the systm can recover from a master failure with simple
196228
// non-stateful operations. This assumes as reliable, replayable input

0 commit comments

Comments
 (0)