@@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes
1818
1919import java .io .File
2020import java .security .SecureRandom
21- import java .util .concurrent .{Executors , TimeUnit }
21+ import java .util .concurrent .{Executors , TimeoutException , TimeUnit }
2222import javax .net .ssl .X509TrustManager
2323
2424import com .google .common .io .Files
@@ -34,7 +34,7 @@ import scala.concurrent.ExecutionContext
3434import scala .concurrent .duration .DurationInt
3535import scala .util .Success
3636
37- import org .apache .spark .{SPARK_VERSION , SparkConf }
37+ import org .apache .spark .{SPARK_VERSION , SparkConf , SparkException }
3838import org .apache .spark .deploy .rest .{AppResource , KubernetesCreateSubmissionRequest , RemoteAppResource , TarGzippedData , UploadedAppResource }
3939import org .apache .spark .deploy .rest .kubernetes ._
4040import org .apache .spark .internal .Logging
@@ -130,8 +130,8 @@ private[spark] class Client(
130130 val podWatcher = new Watcher [Pod ] {
131131 override def eventReceived (action : Action , t : Pod ): Unit = {
132132 if ((action == Action .ADDED || action == Action .MODIFIED )
133- && t.getStatus.getPhase == " Running"
134- && ! submitCompletedFuture.isDone) {
133+ && t.getStatus.getPhase == " Running"
134+ && ! submitCompletedFuture.isDone) {
135135 t.getStatus
136136 .getContainerStatuses
137137 .asScala
@@ -216,8 +216,78 @@ private[spark] class Client(
216216 .endContainer()
217217 .endSpec()
218218 .done()
219- submitCompletedFuture.get(30 , TimeUnit .SECONDS )
220- }
219+ var submitSucceeded = false
220+ try {
221+ submitCompletedFuture.get(LAUNCH_TIMEOUT_SECONDS , TimeUnit .SECONDS )
222+ submitSucceeded = true
223+ } catch {
224+ case e : TimeoutException =>
225+ val driverPod = try {
226+ kubernetesClient.pods().withName(kubernetesAppId).get()
227+ } catch {
228+ case throwable : Throwable =>
229+ logError(s " Timed out while waiting $LAUNCH_TIMEOUT_SECONDS seconds for the " +
230+ " driver pod to start, but an error occurred while fetching the driver" +
231+ " pod's details." , throwable)
232+ throw new SparkException (s " Timed out while waiting $LAUNCH_TIMEOUT_SECONDS" +
233+ " seconds for the driver pod to start. Unfortunately, in attempting to fetch" +
234+ " the latest state of the pod, another error was thrown. Check the logs for" +
235+ " the error that was thrown in looking up the driver pod." , e)
236+ }
237+ val topLevelMessage = s " The driver pod with name ${driverPod.getMetadata.getName}" +
238+ s " in namespace ${driverPod.getMetadata.getNamespace} was not ready in " +
239+ s " $LAUNCH_TIMEOUT_SECONDS seconds. "
240+ val podStatusPhase = if (driverPod.getStatus.getPhase != null ) {
241+ s " Latest phase from the pod is: ${driverPod.getStatus.getPhase}"
242+ } else {
243+ " The pod had no final phase."
244+ }
245+ val podStatusMessage = if (driverPod.getStatus.getMessage != null ) {
246+ s " Latest message from the pod is: ${driverPod.getStatus.getMessage}"
247+ } else {
248+ " The pod had no final message."
249+ }
250+ val failedDriverContainerStatusString = driverPod.getStatus
251+ .getContainerStatuses
252+ .asScala
253+ .find(_.getName == DRIVER_LAUNCHER_CONTAINER_NAME )
254+ .map(status => {
255+ val lastState = status.getState
256+ if (lastState.getRunning != null ) {
257+ " Driver container last state: Running\n " +
258+ s " Driver container started at: ${lastState.getRunning.getStartedAt}"
259+ } else if (lastState.getWaiting != null ) {
260+ " Driver container last state: Waiting\n " +
261+ s " Driver container wait reason: ${lastState.getWaiting.getReason}\n " +
262+ s " Driver container message: ${lastState.getWaiting.getMessage}\n "
263+ } else if (lastState.getTerminated != null ) {
264+ " Driver container last state: Terminated\n " +
265+ s " Driver container started at: ${lastState.getTerminated.getStartedAt}\n " +
266+ s " Driver container finished at: ${lastState.getTerminated.getFinishedAt}\n " +
267+ s " Driver container exit reason: ${lastState.getTerminated.getReason}\n " +
268+ s " Driver container exit code: ${lastState.getTerminated.getExitCode}\n " +
269+ s " Driver container message: ${lastState.getTerminated.getMessage}"
270+ } else {
271+ " Driver container last state: Unknown"
272+ }
273+ }).getOrElse(" The driver container wasn't found in the pod; expected to find" +
274+ s " container with name $DRIVER_LAUNCHER_CONTAINER_NAME" )
275+ val finalErrorMessage = s " $topLevelMessage\n " +
276+ s " $podStatusPhase\n " +
277+ s " $podStatusMessage\n\n $failedDriverContainerStatusString"
278+ logError(finalErrorMessage, e)
279+ throw new SparkException (finalErrorMessage, e)
280+ } finally {
281+ if (! submitSucceeded) {
282+ try {
283+ kubernetesClient.pods.withName(kubernetesAppId).delete
284+ } catch {
285+ case throwable : Throwable =>
286+ logError(" Failed to delete driver pod after it failed to run." , throwable)
287+ }
288+ }
289+ }
290+ }
221291
222292 Utils .tryWithResource(kubernetesClient
223293 .pods()
@@ -338,6 +408,7 @@ private object Client {
338408 private val DRIVER_LAUNCHER_CONTAINER_NAME = " spark-kubernetes-driver-launcher"
339409 private val SECURE_RANDOM = new SecureRandom ()
340410 private val SPARK_SUBMISSION_SECRET_BASE_DIR = " /var/run/secrets/spark-submission"
411+ private val LAUNCH_TIMEOUT_SECONDS = 30
341412
342413 def main (args : Array [String ]): Unit = {
343414 require(args.length >= 2 , s " Too few arguments. Usage: ${getClass.getName} <mainAppResource> " +
0 commit comments