From 2c603ab96059ad87f1ca9fa975aa62050b9d626a Mon Sep 17 00:00:00 2001
From: mcheah
Date: Wed, 11 Jan 2017 15:43:34 -0800
Subject: [PATCH 1/6] Error messages when the driver container fails to start.

---
 .../spark/deploy/kubernetes/Client.scala      | 68 +++++++++++++++++--
 1 file changed, 62 insertions(+), 6 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index 230598d63bed..54394c258a6f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes
 
 import java.io.File
 import java.security.SecureRandom
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.{Executors, TimeoutException, TimeUnit}
 import javax.net.ssl.X509TrustManager
 
 import com.google.common.io.Files
@@ -34,7 +34,7 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.duration.DurationInt
 import scala.util.Success
 
-import org.apache.spark.{SPARK_VERSION, SparkConf}
+import org.apache.spark.{SPARK_VERSION, SparkConf, SparkException}
 import org.apache.spark.deploy.rest.{AppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource}
 import org.apache.spark.deploy.rest.kubernetes._
 import org.apache.spark.internal.Logging
@@ -130,8 +130,8 @@ private[spark] class Client(
     val podWatcher = new Watcher[Pod] {
       override def eventReceived(action: Action, t: Pod): Unit = {
         if ((action == Action.ADDED || action == Action.MODIFIED)
-            && t.getStatus.getPhase == "Running"
-            && !submitCompletedFuture.isDone) {
+          && t.getStatus.getPhase == "Running"
+          && !submitCompletedFuture.isDone) {
           t.getStatus
             .getContainerStatuses
             .asScala
@@ -216,8 +216,64 @@ private[spark] class Client(
         .endContainer()
         .endSpec()
         .done()
-      submitCompletedFuture.get(30, TimeUnit.SECONDS)
-    }
+      try {
+        submitCompletedFuture.get(30, TimeUnit.SECONDS)
+      } catch {
+        case e: TimeoutException =>
+          val driverPod = try {
+            kubernetesClient.pods().withName(kubernetesAppId).get()
+          } catch {
+            case throwable: Throwable =>
+              logError("Timed out while waiting for the driver pod to start, but the driver" +
+                " pod could not be found.", throwable)
+              throw new SparkException("Timed out while waiting for the driver pod to start." +
+                " Unfortunately, in attempting to fetch the latest state of the pod, another" +
+                " error was thrown. Check the logs for the error that was thrown in looking" +
+                " up the driver pod.", e)
+          }
+          val topLevelMessage = s"The driver pod with name ${driverPod.getMetadata.getName}" +
+            s" in namespace ${driverPod.getMetadata.getNamespace} exited with status Failed."
+          val podMessage = if (driverPod.getStatus.getMessage != null) {
+            s"Latest message from the pod is: ${driverPod.getStatus.getMessage}"
+          } else {
+            "The pod had no final message."
+          }
+          val failedDriverContainerStatusString = driverPod.getStatus
+            .getContainerStatuses
+            .asScala
+            .find(_.getName == DRIVER_LAUNCHER_CONTAINER_NAME)
+            .map(status => {
+              val lastState = status.getState
+              val containerStatusMessage = if (lastState.getRunning != null) {
+                "Container last state: Running\n" +
+                  s"Container started at: ${lastState.getRunning.getStartedAt}"
+              } else if (lastState.getWaiting != null) {
+                "Container last state: Waiting\n" +
+                  s"Container wait reason: ${lastState.getWaiting.getReason}\n" +
+                  s"Container message: ${lastState.getWaiting.getMessage}\n"
+              } else if (lastState.getTerminated != null) {
+                "Container last state: Terminated\n" +
+                  s"Container started at: ${lastState.getTerminated.getStartedAt}\n" +
+                  s"Container finished at: ${lastState.getTerminated.getFinishedAt}\n" +
+                  s"Container exit reason: ${lastState.getTerminated.getReason}\n" +
+                  s"Container exit code: ${lastState.getTerminated.getExitCode}\n" +
+                  s"Container message: ${lastState.getTerminated.getMessage}"
+              } else {
+                "Container last state: Unknown"
+              }
+              s"Driver container final state:\n$containerStatusMessage"
+            }).getOrElse("The driver container wasn't found in the pod.")
+          val finalErrorMessage = s"$topLevelMessage\n" +
+            s"$podMessage\n\n$failedDriverContainerStatusString"
+          try {
+            kubernetesClient.pods.delete(driverPod)
+          } catch {
+            case throwable: Throwable =>
+              logError("Failed to delete driver pod after it failed to run.", throwable)
+          }
+          throw new SparkException(finalErrorMessage, e)
+      }
+    }
 
     Utils.tryWithResource(kubernetesClient
       .pods()

From 99df5933b9b0c972e7b3aea97b2ec0a171f0ce61 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Wed, 11 Jan 2017 16:01:55 -0800
Subject: [PATCH 2/6] Fix messages a bit

---
 .../spark/deploy/kubernetes/Client.scala      | 42 +++++++++++--------
 1 file changed, 24 insertions(+), 18 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index 54394c258a6f..28a840bef0ea 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -232,8 +232,13 @@ private[spark] class Client(
               " up the driver pod.", e)
           }
           val topLevelMessage = s"The driver pod with name ${driverPod.getMetadata.getName}" +
-            s" in namespace ${driverPod.getMetadata.getNamespace} exited with status Failed."
-          val podMessage = if (driverPod.getStatus.getMessage != null) {
+            s" in namespace ${driverPod.getMetadata.getNamespace} was not ready in 30 seconds."
+          val podStatusPhase = if (driverPod.getStatus.getPhase != null) {
+            s"Latest phase from the pod is: ${driverPod.getStatus.getPhase}"
+          } else {
+            "The pod had no final phase."
+          }
+          val podStatusMessage = if (driverPod.getStatus.getMessage != null) {
             s"Latest message from the pod is: ${driverPod.getStatus.getMessage}"
           } else {
             "The pod had no final message."
@@ -244,27 +249,28 @@ private[spark] class Client(
             .find(_.getName == DRIVER_LAUNCHER_CONTAINER_NAME)
             .map(status => {
               val lastState = status.getState
-              val containerStatusMessage = if (lastState.getRunning != null) {
-                "Container last state: Running\n" +
-                  s"Container started at: ${lastState.getRunning.getStartedAt}"
+              if (lastState.getRunning != null) {
+                "Driver container last state: Running\n" +
+                  s"Driver container started at: ${lastState.getRunning.getStartedAt}"
               } else if (lastState.getWaiting != null) {
-                "Container last state: Waiting\n" +
-                  s"Container wait reason: ${lastState.getWaiting.getReason}\n" +
-                  s"Container message: ${lastState.getWaiting.getMessage}\n"
+                "Driver container last state: Waiting\n" +
+                  s"Driver container wait reason: ${lastState.getWaiting.getReason}\n" +
+                  s"Driver container message: ${lastState.getWaiting.getMessage}\n"
               } else if (lastState.getTerminated != null) {
-                "Container last state: Terminated\n" +
-                  s"Container started at: ${lastState.getTerminated.getStartedAt}\n" +
-                  s"Container finished at: ${lastState.getTerminated.getFinishedAt}\n" +
-                  s"Container exit reason: ${lastState.getTerminated.getReason}\n" +
-                  s"Container exit code: ${lastState.getTerminated.getExitCode}\n" +
-                  s"Container message: ${lastState.getTerminated.getMessage}"
+                "Driver container last state: Terminated\n" +
+                  s"Driver container started at: ${lastState.getTerminated.getStartedAt}\n" +
+                  s"Driver container finished at: ${lastState.getTerminated.getFinishedAt}\n" +
+                  s"Driver container exit reason: ${lastState.getTerminated.getReason}\n" +
+                  s"Driver container exit code: ${lastState.getTerminated.getExitCode}\n" +
+                  s"Driver container message: ${lastState.getTerminated.getMessage}"
               } else {
-                "Container last state: Unknown"
+                "Driver container last state: Unknown"
               }
-              s"Driver container final state:\n$containerStatusMessage"
-            }).getOrElse("The driver container wasn't found in the pod.")
+            }).getOrElse("The driver container wasn't found in the pod; expected to find container" +
+              s" with name $DRIVER_LAUNCHER_CONTAINER_NAME")
           val finalErrorMessage = s"$topLevelMessage\n" +
-            s"$podMessage\n\n$failedDriverContainerStatusString"
+            s"$podStatusPhase\n" +
+            s"$podStatusMessage\n\n$failedDriverContainerStatusString"
           try {
             kubernetesClient.pods.delete(driverPod)
           } catch {

From 33430eab764fbea2c0a5c08c69d13ce3313e2e4e Mon Sep 17 00:00:00 2001
From: mcheah
Date: Wed, 11 Jan 2017 17:07:49 -0800
Subject: [PATCH 3/6] Use timeout constant

---
 .../spark/deploy/kubernetes/Client.scala      | 20 +++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index 28a840bef0ea..c24ebc0d3c3f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -217,22 +217,24 @@ private[spark] class Client(
         .endSpec()
         .done()
       try {
-        submitCompletedFuture.get(30, TimeUnit.SECONDS)
+        submitCompletedFuture.get(LAUNCH_TIMEOUT_SECONDS, TimeUnit.SECONDS)
       } catch {
         case e: TimeoutException =>
           val driverPod = try {
             kubernetesClient.pods().withName(kubernetesAppId).get()
           } catch {
             case throwable: Throwable =>
-              logError("Timed out while waiting for the driver pod to start, but the driver" +
-                " pod could not be found.", throwable)
-              throw new SparkException("Timed out while waiting for the driver pod to start." +
-                " Unfortunately, in attempting to fetch the latest state of the pod, another" +
-                " error was thrown. Check the logs for the error that was thrown in looking" +
-                " up the driver pod.", e)
+              logError(s"Timed out while waiting $LAUNCH_TIMEOUT_SECONDS seconds for the" +
+                " driver pod to start, but an error occurred while fetching the driver" +
+                " pod's details.", throwable)
+              throw new SparkException(s"Timed out while waiting $LAUNCH_TIMEOUT_SECONDS" +
+                " seconds for the driver pod to start. Unfortunately, in attempting to fetch" +
+                " the latest state of the pod, another error was thrown. Check the logs for" +
+                " the error that was thrown in looking up the driver pod.", e)
           }
           val topLevelMessage = s"The driver pod with name ${driverPod.getMetadata.getName}" +
-            s" in namespace ${driverPod.getMetadata.getNamespace} was not ready in 30 seconds."
+            s" in namespace ${driverPod.getMetadata.getNamespace} was not ready in" +
+            s" $LAUNCH_TIMEOUT_SECONDS seconds."
           val podStatusPhase = if (driverPod.getStatus.getPhase != null) {
             s"Latest phase from the pod is: ${driverPod.getStatus.getPhase}"
           } else {
@@ -271,6 +273,7 @@ private[spark] class Client(
           val finalErrorMessage = s"$topLevelMessage\n" +
             s"$podStatusPhase\n" +
             s"$podStatusMessage\n\n$failedDriverContainerStatusString"
+          logError(finalErrorMessage, e)
           try {
             kubernetesClient.pods.delete(driverPod)
           } catch {
@@ -400,6 +403,7 @@ private object Client {
   private val DRIVER_LAUNCHER_CONTAINER_NAME = "spark-kubernetes-driver-launcher"
   private val SECURE_RANDOM = new SecureRandom()
   private val SPARK_SUBMISSION_SECRET_BASE_DIR = "/var/run/secrets/spark-submission"
+  private val LAUNCH_TIMEOUT_SECONDS = 30
 
   def main(args: Array[String]): Unit = {
     require(args.length >= 2, s"Too few arguments. Usage: ${getClass.getName} " +

From 26bca0992a6d43852c33a3aefbd75426e8e1966b Mon Sep 17 00:00:00 2001
From: mcheah
Date: Wed, 11 Jan 2017 17:22:27 -0800
Subject: [PATCH 4/6] Delete the pod if it fails for any reason (not just timeout)

---
 .../spark/deploy/kubernetes/Client.scala      | 20 +++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index c24ebc0d3c3f..760719ab8a9d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -216,6 +216,7 @@ private[spark] class Client(
         .endContainer()
         .endSpec()
         .done()
+      var submitSucceeded = false
       try {
         submitCompletedFuture.get(LAUNCH_TIMEOUT_SECONDS, TimeUnit.SECONDS)
       } catch {
@@ -268,19 +269,22 @@ private[spark] class Client(
               } else {
                 "Driver container last state: Unknown"
               }
-            }).getOrElse("The driver container wasn't found in the pod; expected to find container" +
-              s" with name $DRIVER_LAUNCHER_CONTAINER_NAME")
+            }).getOrElse("The driver container wasn't found in the pod; expected to find" +
+              " container with name $DRIVER_LAUNCHER_CONTAINER_NAME")
           val finalErrorMessage = s"$topLevelMessage\n" +
             s"$podStatusPhase\n" +
             s"$podStatusMessage\n\n$failedDriverContainerStatusString"
           logError(finalErrorMessage, e)
-          try {
-            kubernetesClient.pods.delete(driverPod)
-          } catch {
-            case throwable: Throwable =>
-              logError("Failed to delete driver pod after it failed to run.", throwable)
-          }
           throw new SparkException(finalErrorMessage, e)
+      } finally {
+        if (!submitSucceeded) {
+          try {
+            kubernetesClient.pods.withName(kubernetesAppId).delete
+          } catch {
+            case throwable: Throwable =>
+              logError("Failed to delete driver pod after it failed to run.", throwable)
+          }
+        }
       }
     }
 

From 7cac8ddaec9fb4de36d8c471f7eec9346547270f Mon Sep 17 00:00:00 2001
From: mcheah
Date: Wed, 11 Jan 2017 17:23:57 -0800
Subject: [PATCH 5/6] Actually set submit succeeded

---
 .../main/scala/org/apache/spark/deploy/kubernetes/Client.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index 760719ab8a9d..833b796bb918 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -219,6 +219,7 @@ private[spark] class Client(
       var submitSucceeded = false
       try {
         submitCompletedFuture.get(LAUNCH_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+        submitSucceeded = true
       } catch {
         case e: TimeoutException =>
           val driverPod = try {

From 2346eac073f7893504d7cde7a33cf765cab84cd0 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Thu, 12 Jan 2017 17:54:06 -0800
Subject: [PATCH 6/6] Fix typo

---
 .../main/scala/org/apache/spark/deploy/kubernetes/Client.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index 833b796bb918..6d7de973a52c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -271,7 +271,7 @@ private[spark] class Client(
                 "Driver container last state: Unknown"
               }
             }).getOrElse("The driver container wasn't found in the pod; expected to find" +
-              " container with name $DRIVER_LAUNCHER_CONTAINER_NAME")
+              s" container with name $DRIVER_LAUNCHER_CONTAINER_NAME")
           val finalErrorMessage = s"$topLevelMessage\n" +
             s"$podStatusPhase\n" +
             s"$podStatusMessage\n\n$failedDriverContainerStatusString"
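Net effect of the series, condensed into a minimal sketch of the resulting
submission path (buildPodFailureMessage is an illustrative stand-in for the
inline message construction in the patches, not a helper these commits add;
the inner try/catch around the pod lookup is omitted for brevity):

    var submitSucceeded = false
    try {
      submitCompletedFuture.get(LAUNCH_TIMEOUT_SECONDS, TimeUnit.SECONDS)
      submitSucceeded = true
    } catch {
      case e: TimeoutException =>
        // Fetch the driver pod so the error can report the pod's name and
        // namespace, its phase and status message, and the driver launcher
        // container's last state (Running/Waiting/Terminated).
        val driverPod = kubernetesClient.pods().withName(kubernetesAppId).get()
        val finalErrorMessage = buildPodFailureMessage(driverPod)
        logError(finalErrorMessage, e)
        throw new SparkException(finalErrorMessage, e)
    } finally {
      // Patch 4 moved pod deletion out of the catch block and into finally,
      // so cleanup runs whenever submission did not succeed, not only on
      // timeout; patch 5 makes submitSucceeded actually get set.
      if (!submitSucceeded) {
        kubernetesClient.pods.withName(kubernetesAppId).delete()
      }
    }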