@@ -20,6 +20,7 @@ import java.io.{File, FileOutputStream, StringReader}
2020import java .net .URI
2121import java .nio .file .Paths
2222import java .util .concurrent .CountDownLatch
23+ import java .util .concurrent .atomic .AtomicInteger
2324import javax .servlet .http .{HttpServletRequest , HttpServletResponse }
2425
2526import com .google .common .base .Charsets
@@ -101,6 +102,7 @@ private[spark] class KubernetesSparkRestServer(
101102 conf : SparkConf ,
102103 expectedApplicationSecret : Array [Byte ],
103104 shutdownLock : CountDownLatch ,
105+ exitCode : AtomicInteger ,
104106 sslOptions : SSLOptions = new SSLOptions )
105107 extends RestSubmissionServer (host, port, conf, sslOptions) {
106108
@@ -238,7 +240,8 @@ private[spark] class KubernetesSparkRestServer(
238240 })
239241 waitForProcessCompleteExecutor.submit(new Runnable {
240242 override def run (): Unit = {
241- process.waitFor
243+ // set the REST service's exit code to the exit code of the driver subprocess
244+ exitCode.set(process.waitFor)
242245 SERVLET_LOCK .synchronized {
243246 logInfo(" Spark application complete. Shutting down submission server..." )
244247 KubernetesSparkRestServer .this .stop
@@ -355,12 +358,14 @@ private[spark] object KubernetesSparkRestServer {
355358 }
356359 val secretBytes = Files .toByteArray(secretFile)
357360 val sparkConf = new SparkConf (true )
361+ val exitCode = new AtomicInteger (0 )
358362 val server = new KubernetesSparkRestServer (
359363 parsedArguments.host.get,
360364 parsedArguments.port.get,
361365 sparkConf,
362366 secretBytes,
363367 barrier,
368+ exitCode,
364369 sslOptions)
365370 server.start()
366371 ShutdownHookManager .addShutdownHook(() => {
@@ -371,6 +376,7 @@ private[spark] object KubernetesSparkRestServer {
371376 }
372377 })
373378 barrier.await()
379+ System .exit(exitCode.get())
374380 }
375381}
376382
0 commit comments