From e999b21a722c5c379e6fa63a51324448636b884a Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Wed, 25 Jan 2017 18:32:52 -0800 Subject: [PATCH 01/13] Introduce blocking submit to kubernetes by default Two new configuration settings: - spark.kubernetes.submit.waitAppCompletion - spark.kubernetes.report.interval --- .../spark/deploy/kubernetes/Client.scala | 14 +++- .../deploy/kubernetes/PodStateMonitor.scala | 73 +++++++++++++++++++ .../spark/deploy/kubernetes/config.scala | 20 +++++ 3 files changed, 105 insertions(+), 2 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index fed9334dbbab4..c9f3efd448dd7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -26,7 +26,7 @@ import com.google.common.base.Charsets import com.google.common.io.Files import com.google.common.util.concurrent.SettableFuture import io.fabric8.kubernetes.api.model._ -import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.{ConfigBuilder => KConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.apache.commons.codec.binary.Base64 import scala.collection.JavaConverters._ @@ -65,6 +65,8 @@ private[spark] class Client( private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) + private val fireAndForget: Boolean = !sparkConf.get(WAIT_FOR_APP_COMPLETION); + private val secretBase64String = { val secretBytes = new Array[Byte](128) SECURE_RANDOM.nextBytes(secretBytes) @@ -81,7 +83,7 @@ private[spark] class Client( def run(): Unit = { val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions() val parsedCustomLabels = parseCustomLabels(customLabels) - var k8ConfBuilder = new ConfigBuilder() + var k8ConfBuilder = new KConfigBuilder() .withApiVersion("v1") .withMasterUrl(master) .withNamespace(namespace) @@ -182,6 +184,14 @@ private[spark] class Client( } } } + + // poll for status + if (!fireAndForget) { + val reportInterval = sparkConf.get(REPORT_INTERVAL) + val finalState = new PodStateMonitor(kubernetesClient, kubernetesAppId, reportInterval) + .monitorToCompletion() + logInfo(s"Application $kubernetesAppId ended with final phase $finalState") + } } finally { Utils.tryLogNonFatalError { kubernetesClient.secrets().delete(submitServerSecret) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala new file mode 100644 index 0000000000000..bfce8db17acf1 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes + +import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.kubernetes.client.DefaultKubernetesClient + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging + +/** + * A monitor for a running Kubernetes application, logging on state change and interval. + * + * @param client Kubernetes client + * @param appId + * @param interval ms between each state request + */ +private[kubernetes] class PodStateMonitor(client: DefaultKubernetesClient, + appId: String, + interval: Long) extends Logging { + + /** + * Log the state of the application until it finishes, either successfully or due to a + * failure, logging status throughout and on every state change. + * + * When the application finishes, returns its final state, either "Succeeded" or "Failed". + */ + def monitorToCompletion(): String = { + + var previousPhase: String = null + + while (true) { + Thread.sleep(interval) + + val podState = requestCurrentPodState() + val phase = podState.getStatus().getPhase() + + // log a short message every interval, plus full details on every state change + logInfo(s"Application status for $appId (phase: $phase)") + if (previousPhase != phase) { + logInfo("Phase changed, new state: " + podState) + } + + // terminal state -- return + if (phase == "Succeeded" || phase == "Failed") { + return phase + } + + previousPhase = phase + } + + // Never reached, but keeps compiler happy + throw new SparkException("While loop is depleted! This should never happen...") + } + + private def requestCurrentPodState(): Pod = { + client.pods().withName(appId).get() + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 9b145370f87d6..81f77476e9046 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -174,4 +174,24 @@ package object config { .internal() .stringConf .createOptional + + private[spark] val WAIT_FOR_APP_COMPLETION = + ConfigBuilder( + "spark.kubernetes.submit.waitAppCompletion") + .doc( + """ + | In cluster mode, whether to wait for the application to finish before exiting the + | launcher process. + """.stripMargin) + .booleanConf + .createWithDefault(true) + + private[spark] val REPORT_INTERVAL = + ConfigBuilder("spark.kubernetes.report.interval") + .doc( + """ + | Interval between reports of the current app status in cluster mode. 
+ """.stripMargin) + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1s") } From 3371b496d7f09a0c125040a0e13333bde7fe19b8 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Wed, 25 Jan 2017 23:34:07 -0800 Subject: [PATCH 02/13] Minor touchups --- .../org/apache/spark/deploy/kubernetes/Client.scala | 6 +++--- .../spark/deploy/kubernetes/PodStateMonitor.scala | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index c9f3efd448dd7..712d92d4a26ec 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -26,7 +26,7 @@ import com.google.common.base.Charsets import com.google.common.io.Files import com.google.common.util.concurrent.SettableFuture import io.fabric8.kubernetes.api.model._ -import io.fabric8.kubernetes.client.{ConfigBuilder => KConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.{ConfigBuilder => K8SConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.apache.commons.codec.binary.Base64 import scala.collection.JavaConverters._ @@ -65,7 +65,7 @@ private[spark] class Client( private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) - private val fireAndForget: Boolean = !sparkConf.get(WAIT_FOR_APP_COMPLETION); + private val fireAndForget: Boolean = !sparkConf.get(WAIT_FOR_APP_COMPLETION) private val secretBase64String = { val secretBytes = new Array[Byte](128) @@ -83,7 +83,7 @@ private[spark] class Client( def run(): Unit = { val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions() val parsedCustomLabels = parseCustomLabels(customLabels) - var k8ConfBuilder = new KConfigBuilder() + var k8ConfBuilder = new K8SConfigBuilder() .withApiVersion("v1") .withMasterUrl(master) .withNamespace(namespace) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala index bfce8db17acf1..b95e40a7470ff 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala @@ -34,11 +34,11 @@ private[kubernetes] class PodStateMonitor(client: DefaultKubernetesClient, interval: Long) extends Logging { /** - * Log the state of the application until it finishes, either successfully or due to a - * failure, logging status throughout and on every state change. - * - * When the application finishes, returns its final state, either "Succeeded" or "Failed". - */ + * Log the state of the application until it finishes, either successfully or due to a + * failure, logging status throughout and on every state change. + * + * When the application finishes, returns its final state, either "Succeeded" or "Failed". 
+ */ def monitorToCompletion(): String = { var previousPhase: String = null From 3f278f3949addd48e520e222e54d9b1601f501cb Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Thu, 26 Jan 2017 10:19:17 -0800 Subject: [PATCH 03/13] More succinct logging for pod state --- .../deploy/kubernetes/PodStateMonitor.scala | 40 +++++++++++++++++-- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala index b95e40a7470ff..83d04e2c4a7f1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala @@ -22,6 +22,8 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.apache.spark.SparkException import org.apache.spark.internal.Logging +import scala.collection.JavaConverters._ + /** * A monitor for a running Kubernetes application, logging on state change and interval. * @@ -46,13 +48,13 @@ private[kubernetes] class PodStateMonitor(client: DefaultKubernetesClient, while (true) { Thread.sleep(interval) - val podState = requestCurrentPodState() - val phase = podState.getStatus().getPhase() + val pod = requestCurrentPodState() + val phase = pod.getStatus().getPhase() // log a short message every interval, plus full details on every state change logInfo(s"Application status for $appId (phase: $phase)") if (previousPhase != phase) { - logInfo("Phase changed, new state: " + podState) + logInfo("Phase changed, new state: " + formatPodState(pod)) } // terminal state -- return @@ -70,4 +72,36 @@ private[kubernetes] class PodStateMonitor(client: DefaultKubernetesClient, private def requestCurrentPodState(): Pod = { client.pods().withName(appId).get() } + + private def formatPodState(pod: Pod): String = { + + val details = Seq[(String, String)]( + // pod metadata + ("pod name", pod.getMetadata.getName()), + ("namespace", pod.getMetadata.getNamespace()), + ("labels", pod.getMetadata.getLabels().asScala.mkString(",")), + ("pod uid", pod.getMetadata.getUid), + ("creation time", pod.getMetadata.getCreationTimestamp()), + + // spec details + ("service account name", pod.getSpec.getServiceAccountName()), + ("volumes", pod.getSpec.getVolumes().asScala.map(_.getName).mkString(",")), + ("node name", pod.getSpec.getNodeName()), + + // status + ("start time", pod.getStatus.getStartTime), + ("container images", + pod.getStatus.getContainerStatuses() + .asScala + .map(_.getImage) + .mkString(",")), + ("phase", pod.getStatus.getPhase()) + ) + + // Use more loggable format if value is null or empty + details.map { case (k, v) => + val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A") + s"\n\t $k: $newValue" + }.mkString("") + } } From 4c55ea86d68854de8343309cb8b03235758dbac9 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Fri, 27 Jan 2017 00:20:58 -0800 Subject: [PATCH 04/13] Fix import order --- .../org/apache/spark/deploy/kubernetes/PodStateMonitor.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala index 83d04e2c4a7f1..c8b317bd7f2b8 100644 --- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala @@ -16,14 +16,14 @@ */ package org.apache.spark.deploy.kubernetes +import scala.collection.JavaConverters._ + import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.apache.spark.SparkException import org.apache.spark.internal.Logging -import scala.collection.JavaConverters._ - /** * A monitor for a running Kubernetes application, logging on state change and interval. * From 67ed06a33bbae135d36fc7172e9eb513c153802b Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Mon, 30 Jan 2017 16:15:56 -0800 Subject: [PATCH 05/13] Switch to watch-based logging --- .../spark/deploy/kubernetes/Client.scala | 147 ++++++++++-------- ...or.scala => LoggingPodStatusWatcher.scala} | 87 ++++++----- 2 files changed, 127 insertions(+), 107 deletions(-) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/{PodStateMonitor.scala => LoggingPodStatusWatcher.scala} (51%) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 712d92d4a26ec..85d5747830e80 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -65,7 +65,7 @@ private[spark] class Client( private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) - private val fireAndForget: Boolean = !sparkConf.get(WAIT_FOR_APP_COMPLETION) + private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION) private val secretBase64String = { val secretBytes = new Array[Byte](128) @@ -81,7 +81,9 @@ private[spark] class Client( ThreadUtils.newDaemonSingleThreadExecutor("kubernetes-client-retryable-futures")) def run(): Unit = { + logInfo(s"Starting application $kubernetesAppId in Kubernetes...") val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions() + val parsedCustomLabels = parseCustomLabels(customLabels) var k8ConfBuilder = new K8SConfigBuilder() .withApiVersion("v1") @@ -116,81 +118,92 @@ private[spark] class Client( SPARK_APP_NAME_LABEL -> appName) ++ parsedCustomLabels).asJava val containerPorts = buildContainerPorts() - val submitCompletedFuture = SettableFuture.create[Boolean] - val submitPending = new AtomicBoolean(false) - val podWatcher = new DriverPodWatcher( - submitCompletedFuture, - submitPending, - kubernetesClient, - driverSubmitSslOptions, - Array(submitServerSecret) ++ sslSecrets, - driverKubernetesSelectors) + + // start outer watch for status logging of driver pod + val driverPodCompletedFuture = SettableFuture.create[Boolean] + val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedFuture, kubernetesAppId, + sparkConf.get(REPORT_INTERVAL)) Utils.tryWithResource(kubernetesClient .pods() .withLabels(driverKubernetesSelectors) - .watch(podWatcher)) { _ => - kubernetesClient.pods().createNew() - .withNewMetadata() - .withName(kubernetesAppId) + .watch(loggingWatch)) { _ => + + // launch driver pod with inner watch to upload jars when it's ready + val submitCompletedFuture = 
SettableFuture.create[Boolean] + val submitPending = new AtomicBoolean(false) + val podWatcher = new DriverPodWatcher( + submitCompletedFuture, + submitPending, + kubernetesClient, + driverSubmitSslOptions, + Array(submitServerSecret) ++ sslSecrets, + driverKubernetesSelectors) + Utils.tryWithResource(kubernetesClient + .pods() .withLabels(driverKubernetesSelectors) - .endMetadata() - .withNewSpec() - .withRestartPolicy("OnFailure") - .addNewVolume() - .withName(SUBMISSION_APP_SECRET_VOLUME_NAME) - .withNewSecret() - .withSecretName(submitServerSecret.getMetadata.getName) - .endSecret() - .endVolume - .addToVolumes(sslVolumes: _*) - .withServiceAccount(serviceAccount) - .addNewContainer() - .withName(DRIVER_CONTAINER_NAME) - .withImage(driverDockerImage) - .withImagePullPolicy("IfNotPresent") - .addNewVolumeMount() + .watch(podWatcher)) { _ => + kubernetesClient.pods().createNew() + .withNewMetadata() + .withName(kubernetesAppId) + .withLabels(driverKubernetesSelectors) + .endMetadata() + .withNewSpec() + .withRestartPolicy("OnFailure") + .addNewVolume() .withName(SUBMISSION_APP_SECRET_VOLUME_NAME) - .withMountPath(secretDirectory) - .withReadOnly(true) - .endVolumeMount() - .addToVolumeMounts(sslVolumeMounts: _*) - .addNewEnv() - .withName(ENV_SUBMISSION_SECRET_LOCATION) - .withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME") - .endEnv() - .addNewEnv() - .withName(ENV_SUBMISSION_SERVER_PORT) - .withValue(SUBMISSION_SERVER_PORT.toString) - .endEnv() - .addToEnv(sslEnvs: _*) - .withPorts(containerPorts.asJava) - .endContainer() - .endSpec() - .done() - var submitSucceeded = false - try { - submitCompletedFuture.get(driverSubmitTimeoutSecs, TimeUnit.SECONDS) - submitSucceeded = true - } catch { - case e: TimeoutException => - val finalErrorMessage: String = buildSubmitFailedErrorMessage(kubernetesClient, e) - logError(finalErrorMessage, e) - throw new SparkException(finalErrorMessage, e) - } finally { - if (!submitSucceeded) { - Utils.tryLogNonFatalError { - kubernetesClient.pods.withName(kubernetesAppId).delete() + .withNewSecret() + .withSecretName(submitServerSecret.getMetadata.getName) + .endSecret() + .endVolume + .addToVolumes(sslVolumes: _*) + .withServiceAccount(serviceAccount) + .addNewContainer() + .withName(DRIVER_CONTAINER_NAME) + .withImage(driverDockerImage) + .withImagePullPolicy("IfNotPresent") + .addNewVolumeMount() + .withName(SUBMISSION_APP_SECRET_VOLUME_NAME) + .withMountPath(secretDirectory) + .withReadOnly(true) + .endVolumeMount() + .addToVolumeMounts(sslVolumeMounts: _*) + .addNewEnv() + .withName(ENV_SUBMISSION_SECRET_LOCATION) + .withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME") + .endEnv() + .addNewEnv() + .withName(ENV_SUBMISSION_SERVER_PORT) + .withValue(SUBMISSION_SERVER_PORT.toString) + .endEnv() + .addToEnv(sslEnvs: _*) + .withPorts(containerPorts.asJava) + .endContainer() + .endSpec() + .done() + var submitSucceeded = false + try { + submitCompletedFuture.get(driverSubmitTimeoutSecs, TimeUnit.SECONDS) + submitSucceeded = true + } catch { + case e: TimeoutException => + val finalErrorMessage: String = buildSubmitFailedErrorMessage(kubernetesClient, e) + logError(finalErrorMessage, e) + throw new SparkException(finalErrorMessage, e) + } finally { + if (!submitSucceeded) { + Utils.tryLogNonFatalError { + kubernetesClient.pods.withName(kubernetesAppId).delete() + } } } } - } - // poll for status - if (!fireAndForget) { - val reportInterval = sparkConf.get(REPORT_INTERVAL) - val finalState = new PodStateMonitor(kubernetesClient, kubernetesAppId, 
reportInterval) - .monitorToCompletion() - logInfo(s"Application $kubernetesAppId ended with final phase $finalState") + // wait if configured to do so + if (waitForAppCompletion) { + logInfo(s"Waiting for application $kubernetesAppId to finish...") + driverPodCompletedFuture.get() + logInfo(s"Application $kubernetesAppId finished.") + } } } finally { Utils.tryLogNonFatalError { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala similarity index 51% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala index c8b317bd7f2b8..4183fff45946c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodStateMonitor.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala @@ -16,61 +16,68 @@ */ package org.apache.spark.deploy.kubernetes +import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} + import scala.collection.JavaConverters._ -import io.fabric8.kubernetes.api.model.Pod -import io.fabric8.kubernetes.client.DefaultKubernetesClient +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler +import com.google.common.util.concurrent.SettableFuture +import io.fabric8.kubernetes.api.model.{Pod, PodStatus} +import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action import org.apache.spark.SparkException import org.apache.spark.internal.Logging /** - * A monitor for a running Kubernetes application, logging on state change and interval. + * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on + * every state change and also at an interval for liveness. * - * @param client Kubernetes client + * @param podCompletedFuture a SettableFuture that is set to true when the watched pod finishes * @param appId * @param interval ms between each state request */ -private[kubernetes] class PodStateMonitor(client: DefaultKubernetesClient, - appId: String, - interval: Long) extends Logging { - - /** - * Log the state of the application until it finishes, either successfully or due to a - * failure, logging status throughout and on every state change. - * - * When the application finishes, returns its final state, either "Succeeded" or "Failed". 
- */ - def monitorToCompletion(): String = { - - var previousPhase: String = null - - while (true) { - Thread.sleep(interval) - - val pod = requestCurrentPodState() - val phase = pod.getStatus().getPhase() - - // log a short message every interval, plus full details on every state change - logInfo(s"Application status for $appId (phase: $phase)") - if (previousPhase != phase) { - logInfo("Phase changed, new state: " + formatPodState(pod)) - } - - // terminal state -- return - if (phase == "Succeeded" || phase == "Failed") { - return phase - } - - previousPhase = phase +private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: SettableFuture[Boolean], + appId: String, + interval: Long) + extends Watcher[Pod] with Logging { + + // start timer for periodic logging + private val scheduler = Executors.newScheduledThreadPool(1) + private val logRunnable: Runnable = new Runnable { + override def run() = logShortStatus() + } + scheduler.scheduleWithFixedDelay(logRunnable, 0, interval, TimeUnit.MILLISECONDS) + + private var pod: Option[Pod] = Option.empty + private var prevPhase: String = null + private def phase: String = pod.map(_.getStatus().getPhase()).getOrElse("unknown") + + override def eventReceived(action: Action, pod: Pod): Unit = { + this.pod = Option(pod) + + logShortStatus() + if (prevPhase != phase) { + logLongStatus() } + prevPhase = phase + + if (phase == "Succeeded" || phase == "Failed") { + podCompletedFuture.set(true) + } + } + + override def onClose(e: KubernetesClientException): Unit = { + scheduler.shutdown() + logInfo(s"Application $appId ended with final phase $phase") + } - // Never reached, but keeps compiler happy - throw new SparkException("While loop is depleted! This should never happen...") + private def logShortStatus() = { + logInfo(s"Application status for $appId (phase: $phase)") } - private def requestCurrentPodState(): Pod = { - client.pods().withName(appId).get() + private def logLongStatus() = { + logInfo("Phase changed, new state: " + pod.map(formatPodState(_)).getOrElse("unknown")) } private def formatPodState(pod: Pod): String = { From 2cbe11f9c9a563d9407356ff123fddf2b4098574 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Mon, 30 Jan 2017 17:10:26 -0800 Subject: [PATCH 06/13] Spaces in comma-joined volumes, labels, and containers --- .../spark/deploy/kubernetes/LoggingPodStatusWatcher.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala index 4183fff45946c..945764a383ca6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala @@ -86,13 +86,13 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: SettableFu // pod metadata ("pod name", pod.getMetadata.getName()), ("namespace", pod.getMetadata.getNamespace()), - ("labels", pod.getMetadata.getLabels().asScala.mkString(",")), + ("labels", pod.getMetadata.getLabels().asScala.mkString(", ")), ("pod uid", pod.getMetadata.getUid), ("creation time", pod.getMetadata.getCreationTimestamp()), // spec details ("service account name", pod.getSpec.getServiceAccountName()), - ("volumes", pod.getSpec.getVolumes().asScala.map(_.getName).mkString(",")), + 
("volumes", pod.getSpec.getVolumes().asScala.map(_.getName).mkString(", ")), ("node name", pod.getSpec.getNodeName()), // status @@ -101,7 +101,7 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: SettableFu pod.getStatus.getContainerStatuses() .asScala .map(_.getImage) - .mkString(",")), + .mkString(", ")), ("phase", pod.getStatus.getPhase()) ) From a69b8650e6d4795c0182441517d5bd0fd5eaeee2 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Tue, 31 Jan 2017 14:10:42 -0800 Subject: [PATCH 07/13] Use CountDownLatch instead of SettableFuture --- .../org/apache/spark/deploy/kubernetes/Client.scala | 8 ++++---- .../deploy/kubernetes/LoggingPodStatusWatcher.scala | 13 +++++-------- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 85d5747830e80..87a17a6034898 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes import java.io.{File, FileInputStream} import java.security.{KeyStore, SecureRandom} -import java.util.concurrent.{TimeoutException, TimeUnit} +import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager} @@ -120,8 +120,8 @@ private[spark] class Client( val containerPorts = buildContainerPorts() // start outer watch for status logging of driver pod - val driverPodCompletedFuture = SettableFuture.create[Boolean] - val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedFuture, kubernetesAppId, + val driverPodCompletedLatch = new CountDownLatch(1) + val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId, sparkConf.get(REPORT_INTERVAL)) Utils.tryWithResource(kubernetesClient .pods() @@ -201,7 +201,7 @@ private[spark] class Client( // wait if configured to do so if (waitForAppCompletion) { logInfo(s"Waiting for application $kubernetesAppId to finish...") - driverPodCompletedFuture.get() + driverPodCompletedLatch.await() logInfo(s"Application $kubernetesAppId finished.") } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala index 945764a383ca6..9b84334c9a7c1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala @@ -16,28 +16,25 @@ */ package org.apache.spark.deploy.kubernetes -import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} import scala.collection.JavaConverters._ -import com.google.common.util.concurrent.AbstractScheduledService.Scheduler -import com.google.common.util.concurrent.SettableFuture -import io.fabric8.kubernetes.api.model.{Pod, PodStatus} +import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} import 
io.fabric8.kubernetes.client.Watcher.Action -import org.apache.spark.SparkException import org.apache.spark.internal.Logging /** * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on * every state change and also at an interval for liveness. * - * @param podCompletedFuture a SettableFuture that is set to true when the watched pod finishes + * @param podCompletedFuture a CountDownLatch that is set to true when the watched pod finishes * @param appId * @param interval ms between each state request */ -private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: SettableFuture[Boolean], +private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownLatch, appId: String, interval: Long) extends Watcher[Pod] with Logging { @@ -63,7 +60,7 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: SettableFu prevPhase = phase if (phase == "Succeeded" || phase == "Failed") { - podCompletedFuture.set(true) + podCompletedFuture.countDown() } } From f44ca49703575c55c82bb568bc454597ef978cd1 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Tue, 31 Jan 2017 16:06:37 -0800 Subject: [PATCH 08/13] Match parallel ConfigBuilder style --- .../main/scala/org/apache/spark/deploy/kubernetes/config.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 81f77476e9046..d276a05ceae01 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -176,8 +176,7 @@ package object config { .createOptional private[spark] val WAIT_FOR_APP_COMPLETION = - ConfigBuilder( - "spark.kubernetes.submit.waitAppCompletion") + ConfigBuilder("spark.kubernetes.submit.waitAppCompletion") .doc( """ | In cluster mode, whether to wait for the application to finish before exiting the From c63e5e59c6cd5ee8ed8191e6d56dc3f0c24dae2f Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Thu, 2 Feb 2017 13:30:31 -0800 Subject: [PATCH 09/13] Disable logging in fire-and-forget mode Which is enabled with spark.kubernetes.submit.waitAppCompletion=false (default: true) --- .../scala/org/apache/spark/deploy/kubernetes/Client.scala | 4 +++- .../spark/deploy/kubernetes/LoggingPodStatusWatcher.scala | 7 +++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 87a17a6034898..78a4bedbf1c3c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -121,8 +121,10 @@ private[spark] class Client( // start outer watch for status logging of driver pod val driverPodCompletedLatch = new CountDownLatch(1) + // only enable interval logging if in waitForAppCompletion mode + val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0 val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId, - sparkConf.get(REPORT_INTERVAL)) + loggingInterval) Utils.tryWithResource(kubernetesClient .pods() 
.withLabels(driverKubernetesSelectors) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala index 9b84334c9a7c1..8c30dbde53cc8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala @@ -32,7 +32,8 @@ import org.apache.spark.internal.Logging * * @param podCompletedFuture a CountDownLatch that is set to true when the watched pod finishes * @param appId - * @param interval ms between each state request + * @param interval ms between each state request. If set to 0 or a negative number, the periodic + * logging will be disabled. */ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownLatch, appId: String, @@ -44,7 +45,9 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL private val logRunnable: Runnable = new Runnable { override def run() = logShortStatus() } - scheduler.scheduleWithFixedDelay(logRunnable, 0, interval, TimeUnit.MILLISECONDS) + if (interval > 0) { + scheduler.scheduleWithFixedDelay(logRunnable, 0, interval, TimeUnit.MILLISECONDS) + } private var pod: Option[Pod] = Option.empty private var prevPhase: String = null From 61fbed3d8119f794a0a542b76e57713abd5ca634 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Thu, 2 Feb 2017 14:47:39 -0800 Subject: [PATCH 10/13] Additional log line for when application is launched --- .../main/scala/org/apache/spark/deploy/kubernetes/Client.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 5c8651216b3c9..40479c7f82f29 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -207,6 +207,8 @@ private[spark] class Client( logInfo(s"Waiting for application $kubernetesAppId to finish...") driverPodCompletedLatch.await() logInfo(s"Application $kubernetesAppId finished.") + } else { + logInfo(s"Application $kubernetesAppId launched.") } } } finally { From 6ef7df8fc7696a41a6a1905db59859c7ff48ede4 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Thu, 2 Feb 2017 15:23:39 -0800 Subject: [PATCH 11/13] Minor wording changes --- .../main/scala/org/apache/spark/deploy/kubernetes/Client.scala | 2 +- .../spark/deploy/kubernetes/LoggingPodStatusWatcher.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 40479c7f82f29..7617c2cd71a1d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -208,7 +208,7 @@ private[spark] class Client( driverPodCompletedLatch.await() logInfo(s"Application $kubernetesAppId finished.") } else { - logInfo(s"Application $kubernetesAppId launched.") + 
logInfo(s"Application $kubernetesAppId successfully launched.") } } } finally { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala index 8c30dbde53cc8..40264e584e489 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala @@ -69,7 +69,7 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL override def onClose(e: KubernetesClientException): Unit = { scheduler.shutdown() - logInfo(s"Application $appId ended with final phase $phase") + logInfo(s"Stopped watching application $appId with last-observed phase $phase") } private def logShortStatus() = { From 1d928543157788eef6cc776f48431d4a49fbea02 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Thu, 2 Feb 2017 15:40:32 -0800 Subject: [PATCH 12/13] More logging --- .../main/scala/org/apache/spark/deploy/kubernetes/Client.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 7617c2cd71a1d..afcf6d5b6ea6d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -188,6 +188,7 @@ private[spark] class Client( try { submitCompletedFuture.get(driverSubmitTimeoutSecs, TimeUnit.SECONDS) submitSucceeded = true + logInfo(s"Finished launching local resources to application $kubernetesAppId") } catch { case e: TimeoutException => val finalErrorMessage: String = buildSubmitFailedErrorMessage(kubernetesClient, e) @@ -404,6 +405,8 @@ private[spark] class Client( Future { sparkConf.set("spark.driver.host", pod.getStatus.getPodIP) val submitRequest = buildSubmissionRequest() + logInfo(s"Submitting local resources to driver pod for application " + + s"$kubernetesAppId ...") driverSubmitter.submitApplication(submitRequest) } } From d019cc9538b0c9a8c59810c49b58693345b6ca83 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Thu, 2 Feb 2017 15:42:04 -0800 Subject: [PATCH 13/13] Drop log to DEBUG --- .../spark/deploy/kubernetes/LoggingPodStatusWatcher.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala index 40264e584e489..cbacaf6bda854 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala @@ -69,7 +69,7 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL override def onClose(e: KubernetesClientException): Unit = { scheduler.shutdown() - logInfo(s"Stopped watching application $appId with last-observed phase $phase") + logDebug(s"Stopped watching application $appId with last-observed phase $phase") } private def logShortStatus() = {