Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes

import java.io.File
import java.security.SecureRandom
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.{Executors, TimeoutException, TimeUnit}
import javax.net.ssl.X509TrustManager

import com.google.common.io.Files
Expand All @@ -34,7 +34,7 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
import scala.util.Success

import org.apache.spark.{SPARK_VERSION, SparkConf}
import org.apache.spark.{SPARK_VERSION, SparkConf, SparkException}
import org.apache.spark.deploy.rest.{AppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource}
import org.apache.spark.deploy.rest.kubernetes._
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -130,8 +130,8 @@ private[spark] class Client(
val podWatcher = new Watcher[Pod] {
override def eventReceived(action: Action, t: Pod): Unit = {
if ((action == Action.ADDED || action == Action.MODIFIED)
&& t.getStatus.getPhase == "Running"
&& !submitCompletedFuture.isDone) {
&& t.getStatus.getPhase == "Running"
&& !submitCompletedFuture.isDone) {
t.getStatus
.getContainerStatuses
.asScala
Expand Down Expand Up @@ -216,8 +216,78 @@ private[spark] class Client(
.endContainer()
.endSpec()
.done()
submitCompletedFuture.get(30, TimeUnit.SECONDS)
}
var submitSucceeded = false
try {
submitCompletedFuture.get(LAUNCH_TIMEOUT_SECONDS, TimeUnit.SECONDS)
submitSucceeded = true
} catch {
case e: TimeoutException =>
val driverPod = try {
kubernetesClient.pods().withName(kubernetesAppId).get()
} catch {
case throwable: Throwable =>
logError(s"Timed out while waiting $LAUNCH_TIMEOUT_SECONDS seconds for the" +
" driver pod to start, but an error occurred while fetching the driver" +
" pod's details.", throwable)
throw new SparkException(s"Timed out while waiting $LAUNCH_TIMEOUT_SECONDS" +
" seconds for the driver pod to start. Unfortunately, in attempting to fetch" +
" the latest state of the pod, another error was thrown. Check the logs for" +
" the error that was thrown in looking up the driver pod.", e)
}
val topLevelMessage = s"The driver pod with name ${driverPod.getMetadata.getName}" +
s" in namespace ${driverPod.getMetadata.getNamespace} was not ready in" +
s" $LAUNCH_TIMEOUT_SECONDS seconds."
val podStatusPhase = if (driverPod.getStatus.getPhase != null) {
s"Latest phase from the pod is: ${driverPod.getStatus.getPhase}"
} else {
"The pod had no final phase."
}
val podStatusMessage = if (driverPod.getStatus.getMessage != null) {
s"Latest message from the pod is: ${driverPod.getStatus.getMessage}"
} else {
"The pod had no final message."
}
val failedDriverContainerStatusString = driverPod.getStatus
.getContainerStatuses
.asScala
.find(_.getName == DRIVER_LAUNCHER_CONTAINER_NAME)
.map(status => {
val lastState = status.getState
if (lastState.getRunning != null) {
"Driver container last state: Running\n" +
s"Driver container started at: ${lastState.getRunning.getStartedAt}"
} else if (lastState.getWaiting != null) {
"Driver container last state: Waiting\n" +
s"Driver container wait reason: ${lastState.getWaiting.getReason}\n" +
s"Driver container message: ${lastState.getWaiting.getMessage}\n"
} else if (lastState.getTerminated != null) {
"Driver container last state: Terminated\n" +
s"Driver container started at: ${lastState.getTerminated.getStartedAt}\n" +
s"Driver container finished at: ${lastState.getTerminated.getFinishedAt}\n" +
s"Driver container exit reason: ${lastState.getTerminated.getReason}\n" +
s"Driver container exit code: ${lastState.getTerminated.getExitCode}\n" +
s"Driver container message: ${lastState.getTerminated.getMessage}"
} else {
"Driver container last state: Unknown"
}
}).getOrElse("The driver container wasn't found in the pod; expected to find" +
s" container with name $DRIVER_LAUNCHER_CONTAINER_NAME")
val finalErrorMessage = s"$topLevelMessage\n" +
s"$podStatusPhase\n" +
s"$podStatusMessage\n\n$failedDriverContainerStatusString"
logError(finalErrorMessage, e)
throw new SparkException(finalErrorMessage, e)
} finally {
if (!submitSucceeded) {
try {
kubernetesClient.pods.withName(kubernetesAppId).delete
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Going to repeat my earlier comment as I wrote some changes that rendered the diff out of date)

On one hand, I kind of don't want to delete the pod at all, as letting it stay around gives users the ability to use kubectl to inspect the state of the pod themselves. kubectl describe pod gives a lot more depth to what's going on than we should be providing in error logs. On the other hand, leaking failed pods seems sub-par... What does everyone think?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@foxish sorry for moving the comment here. Yes - having a way to keep the pod around for debug mode would be useful. I'll follow up on that separately.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

brainstorming some flag names for that:
--preserve-pods
--keep-driver-pod
--preserve-driver-pod

Would we want this flag to keep the pod around even if it launched and finished its job successfully? Maybe some folks will want to look at logs afterwards for debugging, though kubectl logs might preserve those anyway so no need to keep successful pods just for logs.

} catch {
case throwable: Throwable =>
logError("Failed to delete driver pod after it failed to run.", throwable)
}
}
}
}

Utils.tryWithResource(kubernetesClient
.pods()
Expand Down Expand Up @@ -338,6 +408,7 @@ private object Client {
private val DRIVER_LAUNCHER_CONTAINER_NAME = "spark-kubernetes-driver-launcher"
private val SECURE_RANDOM = new SecureRandom()
private val SPARK_SUBMISSION_SECRET_BASE_DIR = "/var/run/secrets/spark-submission"
private val LAUNCH_TIMEOUT_SECONDS = 30

def main(args: Array[String]): Unit = {
require(args.length >= 2, s"Too few arguments. Usage: ${getClass.getName} <mainAppResource>" +
Expand Down