Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkException.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,10 @@ class SparkException(message: String, cause: Throwable)
*/
private[spark] class SparkDriverExecutionException(cause: Throwable)
extends SparkException("Execution error", cause)

/**
* Exception thrown when the main user code is run as a child process (e.g. pyspark) and we want
* the parent SparkSubmit process to exit with the same exit code.
*/
private[spark] case class SparkUserAppException(exitCode: Int)
extends SparkException(s"User application exited with $exitCode")
23 changes: 19 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.util.Try

import org.apache.spark.SparkUserAppException
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.util.{RedirectThread, Utils}

Expand All @@ -46,7 +47,14 @@ object PythonRunner {
// Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such
val gatewayServer = new py4j.GatewayServer(null, 0)
gatewayServer.start()
val thread = new Thread(new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions {
gatewayServer.start()
}
})
thread.setName("py4j-gateway")
thread.setDaemon(true)
thread.start()

// Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
// python directories in SPARK_HOME (if set), and any files in the pyFiles argument
Expand All @@ -64,11 +72,18 @@ object PythonRunner {
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
val process = builder.start()
try {
val process = builder.start()

new RedirectThread(process.getInputStream, System.out, "redirect output").start()
new RedirectThread(process.getInputStream, System.out, "redirect output").start()

System.exit(process.waitFor())
val exitCode = process.waitFor()
if (exitCode != 0) {
throw new SparkUserAppException(exitCode)
}
} finally {
gatewayServer.shutdown()
}
}

/**
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}

import org.apache.spark.{SparkUserAppException, SPARK_VERSION}
import org.apache.spark.api.r.RUtils
import org.apache.spark.SPARK_VERSION
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

Expand Down Expand Up @@ -665,7 +665,13 @@ object SparkSubmit {
mainMethod.invoke(null, childArgs.toArray)
} catch {
case t: Throwable =>
throw findCause(t)
findCause(t) match {
case SparkUserAppException(exitCode) =>
System.exit(exitCode)

case t: Throwable =>
throw t
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.spark.rpc._
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.SparkException
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv,
SparkException, SparkUserAppException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
Expand Down Expand Up @@ -529,6 +529,10 @@ private[spark] class ApplicationMaster(
e.getCause match {
case _: InterruptedException =>
// Reporter thread can interrupt to stop user class
case SparkUserAppException(exitCode) =>
val msg = s"User application exited with status $exitCode"
logError(msg)
finish(FinalApplicationStatus.FAILED, exitCode, msg)
case cause: Throwable =>
logError("User class threw exception: " + cause, cause)
finish(FinalApplicationStatus.FAILED,
Expand Down