@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.mutable
 import scala.util.{Failure, Success}
+import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
@@ -64,8 +65,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       case Success(msg) =>
         // Always receive `true`. Just ignore it
       case Failure(e) =>
-        logError(s"Cannot register with driver: $driverUrl", e)
-        exitExecutor(1)
+        exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
     }(ThreadUtils.sameThread)
   }
 
@@ -78,16 +78,19 @@ private[spark] class CoarseGrainedExecutorBackend(
   override def receive: PartialFunction[Any, Unit] = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
-      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
+      try {
+        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
+      } catch {
+        case NonFatal(e) =>
+          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
+      }
 
     case RegisterExecutorFailed(message) =>
-      logError("Slave registration failed: " + message)
-      exitExecutor(1)
+      exitExecutor(1, "Slave registration failed: " + message)
 
     case LaunchTask(data) =>
       if (executor == null) {
-        logError("Received LaunchTask command but executor was null")
-        exitExecutor(1)
+        exitExecutor(1, "Received LaunchTask command but executor was null")
       } else {
         val taskDesc = ser.deserialize[TaskDescription](data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
@@ -97,8 +100,7 @@
 
     case KillTask(taskId, _, interruptThread) =>
       if (executor == null) {
-        logError("Received KillTask command but executor was null")
-        exitExecutor(1)
+        exitExecutor(1, "Received KillTask command but executor was null")
      } else {
        executor.killTask(taskId, interruptThread)
      }
@@ -127,8 +129,7 @@ private[spark] class CoarseGrainedExecutorBackend(
    if (stopping.get()) {
      logInfo(s"Driver from $remoteAddress disconnected during shutdown")
    } else if (driver.exists(_.address == remoteAddress)) {
-      logError(s"Driver $remoteAddress disassociated! Shutting down.")
-      exitExecutor(1)
+      exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.")
    } else {
      logWarning(s"An unknown ($remoteAddress) driver disconnected.")
    }
@@ -147,7 +148,14 @@ private[spark] class CoarseGrainedExecutorBackend(
   * executor exits differently. For e.g. when an executor goes down,
   * back-end may not want to take the parent process down.
   */
-  protected def exitExecutor(code: Int): Unit = System.exit(code)
+  protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null) = {
Review comment: @hbhanawat FYI, this is going to change exitExecutor.

Reply: Looks good.

+    if (throwable != null) {
+      logError(reason, throwable)
+    } else {
+      logError(reason)
+    }
+    System.exit(code)
+  }
 }
 
 private[spark] object CoarseGrainedExecutorBackend extends Logging {
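Note on the scaladoc above ("back-end may not want to take the parent process down") and the reviewer's point that this changes exitExecutor: any backend that overrides the method now has to match the new three-argument signature. Below is a minimal standalone sketch of that override pattern; ExecutorBackendBase and InProcessBackend are illustrative names, not Spark classes, and the base class only approximates what CoarseGrainedExecutorBackend does after this patch.

// Standalone sketch (not Spark source): shows why exitExecutor is an overridable hook
// and what the new (code, reason, throwable) signature looks like to a subclass.
object ExitExecutorSketch {

  // Simplified stand-in for CoarseGrainedExecutorBackend.
  class ExecutorBackendBase {
    protected def logError(msg: String, t: Throwable): Unit =
      Console.err.println(s"ERROR: $msg ($t)")
    protected def logError(msg: String): Unit =
      Console.err.println(s"ERROR: $msg")

    // Default behaviour mirrors the patched method: log the reason, then exit the JVM.
    protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null): Unit = {
      if (throwable != null) {
        logError(reason, throwable)
      } else {
        logError(reason)
      }
      sys.exit(code)
    }
  }

  // A backend whose executor shares a process with other components can override the hook
  // so that an executor failure does not take the parent process down.
  class InProcessBackend extends ExecutorBackendBase {
    override protected def exitExecutor(code: Int, reason: String, throwable: Throwable): Unit = {
      logError(s"Executor would exit with code $code: $reason")
      // Signal the cluster manager / restart logic here instead of calling sys.exit.
    }
  }
}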