@@ -193,7 +193,7 @@ class CheckpointSuite extends TestSuiteBase {
193193
194194 // This tests if "spark.driver.host" and "spark.driver.port" is set by user, can be recovered
195195 // with correct value.
196- test(" correctly recover spark.driver.[host|port] from checkpoint" ) {
196+ test(" get correct spark.driver.[host|port] from checkpoint" ) {
197197 val conf = Map (" spark.driver.host" -> " localhost" , " spark.driver.port" -> " 9999" )
198198 conf.foreach(kv => System .setProperty(kv._1, kv._2))
199199 ssc = new StreamingContext (master, framework, batchDuration)
@@ -218,9 +218,24 @@ class CheckpointSuite extends TestSuiteBase {
218218 val restoredConf = ssc.conf
219219 assert(restoredConf.get(" spark.driver.host" ) === " localhost" )
220220 assert(restoredConf.get(" spark.driver.port" ) === " 9999" )
221+ ssc.stop()
222+
223+ // If spark.driver.host and spark.driver.host is not set in system property, these two
224+ // parameters should not be presented in the newly recovered conf.
225+ conf.foreach(kv => System .clearProperty(kv._1))
226+ val newCpConf1 = newCp.createSparkConf()
227+ assert(! newCpConf1.contains(" spark.driver.host" ))
228+ assert(! newCpConf1.contains(" spark.driver.port" ))
229+
230+ // Spark itself will dispatch a random, not-used port for spark.driver.port if it is not set
231+ // explicitly.
232+ ssc = new StreamingContext (null , newCp, null )
233+ val restoredConf1 = ssc.conf
234+ assert(restoredConf1.get(" spark.driver.host" ) === " localhost" )
235+ assert(restoredConf1.get(" spark.driver.port" ) !== " 9999" )
221236 }
222237
223- // This tests whether the systm can recover from a master failure with simple
238+ // This tests whether the system can recover from a master failure with simple
224239 // non-stateful operations. This assumes as reliable, replayable input
225240 // source - TestInputDStream.
226241 test(" recovery with map and reduceByKey operations" ) {
0 commit comments