Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ package org.apache.spark.deploy.kubernetes

import java.io.{File, FileInputStream}
import java.security.{KeyStore, SecureRandom}
import java.util.concurrent.{TimeoutException, TimeUnit}
import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}

import com.google.common.base.Charsets
import com.google.common.io.Files
import com.google.common.util.concurrent.SettableFuture
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.{ConfigBuilder => K8SConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.commons.codec.binary.Base64
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -67,6 +67,8 @@ private[spark] class Client(
private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT)
private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT)

private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION)

private val secretBase64String = {
val secretBytes = new Array[Byte](128)
SECURE_RANDOM.nextBytes(secretBytes)
Expand All @@ -81,9 +83,11 @@ private[spark] class Client(
ThreadUtils.newDaemonSingleThreadExecutor("kubernetes-client-retryable-futures"))

def run(): Unit = {
logInfo(s"Starting application $kubernetesAppId in Kubernetes...")
val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions()

val parsedCustomLabels = parseCustomLabels(customLabels)
var k8ConfBuilder = new ConfigBuilder()
var k8ConfBuilder = new K8SConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(master)
.withNamespace(namespace)
Expand Down Expand Up @@ -116,73 +120,97 @@ private[spark] class Client(
SPARK_APP_NAME_LABEL -> appName)
++ parsedCustomLabels).asJava
val containerPorts = buildContainerPorts()
val submitCompletedFuture = SettableFuture.create[Boolean]
val submitPending = new AtomicBoolean(false)
val podWatcher = new DriverPodWatcher(
submitCompletedFuture,
submitPending,
kubernetesClient,
driverSubmitSslOptions,
Array(submitServerSecret) ++ sslSecrets,
driverKubernetesSelectors)

// start outer watch for status logging of driver pod
val driverPodCompletedLatch = new CountDownLatch(1)
// only enable interval logging if in waitForAppCompletion mode
val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0
val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId,
loggingInterval)
Utils.tryWithResource(kubernetesClient
.pods()
.withLabels(driverKubernetesSelectors)
.watch(podWatcher)) { _ =>
kubernetesClient.pods().createNew()
.withNewMetadata()
.withName(kubernetesAppId)
.watch(loggingWatch)) { _ =>

// launch driver pod with inner watch to upload jars when it's ready
val submitCompletedFuture = SettableFuture.create[Boolean]
val submitPending = new AtomicBoolean(false)
val podWatcher = new DriverPodWatcher(
submitCompletedFuture,
submitPending,
kubernetesClient,
driverSubmitSslOptions,
Array(submitServerSecret) ++ sslSecrets,
driverKubernetesSelectors)
Utils.tryWithResource(kubernetesClient
.pods()
.withLabels(driverKubernetesSelectors)
.endMetadata()
.withNewSpec()
.withRestartPolicy("OnFailure")
.addNewVolume()
.withName(SUBMISSION_APP_SECRET_VOLUME_NAME)
.withNewSecret()
.withSecretName(submitServerSecret.getMetadata.getName)
.endSecret()
.endVolume
.addToVolumes(sslVolumes: _*)
.withServiceAccount(serviceAccount)
.addNewContainer()
.withName(DRIVER_CONTAINER_NAME)
.withImage(driverDockerImage)
.withImagePullPolicy("IfNotPresent")
.addNewVolumeMount()
.watch(podWatcher)) { _ =>
kubernetesClient.pods().createNew()
.withNewMetadata()
.withName(kubernetesAppId)
.withLabels(driverKubernetesSelectors)
.endMetadata()
.withNewSpec()
.withRestartPolicy("OnFailure")
.addNewVolume()
.withName(SUBMISSION_APP_SECRET_VOLUME_NAME)
.withMountPath(secretDirectory)
.withReadOnly(true)
.endVolumeMount()
.addToVolumeMounts(sslVolumeMounts: _*)
.addNewEnv()
.withName(ENV_SUBMISSION_SECRET_LOCATION)
.withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME")
.endEnv()
.addNewEnv()
.withName(ENV_SUBMISSION_SERVER_PORT)
.withValue(SUBMISSION_SERVER_PORT.toString)
.endEnv()
.addToEnv(sslEnvs: _*)
.withPorts(containerPorts.asJava)
.endContainer()
.endSpec()
.done()
var submitSucceeded = false
try {
submitCompletedFuture.get(driverSubmitTimeoutSecs, TimeUnit.SECONDS)
submitSucceeded = true
} catch {
case e: TimeoutException =>
val finalErrorMessage: String = buildSubmitFailedErrorMessage(kubernetesClient, e)
logError(finalErrorMessage, e)
throw new SparkException(finalErrorMessage, e)
} finally {
if (!submitSucceeded) {
Utils.tryLogNonFatalError {
kubernetesClient.pods.withName(kubernetesAppId).delete()
.withNewSecret()
.withSecretName(submitServerSecret.getMetadata.getName)
.endSecret()
.endVolume
.addToVolumes(sslVolumes: _*)
.withServiceAccount(serviceAccount)
.addNewContainer()
.withName(DRIVER_CONTAINER_NAME)
.withImage(driverDockerImage)
.withImagePullPolicy("IfNotPresent")
.addNewVolumeMount()
.withName(SUBMISSION_APP_SECRET_VOLUME_NAME)
.withMountPath(secretDirectory)
.withReadOnly(true)
.endVolumeMount()
.addToVolumeMounts(sslVolumeMounts: _*)
.addNewEnv()
.withName(ENV_SUBMISSION_SECRET_LOCATION)
.withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME")
.endEnv()
.addNewEnv()
.withName(ENV_SUBMISSION_SERVER_PORT)
.withValue(SUBMISSION_SERVER_PORT.toString)
.endEnv()
.addToEnv(sslEnvs: _*)
.withPorts(containerPorts.asJava)
.endContainer()
.endSpec()
.done()
var submitSucceeded = false
try {
submitCompletedFuture.get(driverSubmitTimeoutSecs, TimeUnit.SECONDS)
submitSucceeded = true
logInfo(s"Finished launching local resources to application $kubernetesAppId")
} catch {
case e: TimeoutException =>
val finalErrorMessage: String = buildSubmitFailedErrorMessage(kubernetesClient, e)
logError(finalErrorMessage, e)
throw new SparkException(finalErrorMessage, e)
} finally {
if (!submitSucceeded) {
Utils.tryLogNonFatalError {
kubernetesClient.pods.withName(kubernetesAppId).delete()
}
}
}
}

// wait if configured to do so
if (waitForAppCompletion) {
logInfo(s"Waiting for application $kubernetesAppId to finish...")
driverPodCompletedLatch.await()
logInfo(s"Application $kubernetesAppId finished.")
} else {
logInfo(s"Application $kubernetesAppId successfully launched.")
}
}
} finally {
Utils.tryLogNonFatalError {
Expand Down Expand Up @@ -377,6 +405,8 @@ private[spark] class Client(
Future {
sparkConf.set("spark.driver.host", pod.getStatus.getPodIP)
val submitRequest = buildSubmissionRequest()
logInfo(s"Submitting local resources to driver pod for application " +
s"$kubernetesAppId ...")
driverSubmitter.submitApplication(submitRequest)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

import org.apache.spark.internal.Logging

/**
* A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
* every state change and also at an interval for liveness.
*
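* @param podCompletedFuture a CountDownLatch that is counted down once the watched pod is observed
* in the "Succeeded" or "Failed" phase
* @param appId the application id used to identify the application in log messages
* @param interval ms between periodic status log messages. If set to 0 or a negative number, the
* periodic logging will be disabled.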
*/
private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownLatch,
appId: String,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: four-space indentation from the leftmost margin here

interval: Long)
extends Watcher[Pod] with Logging {

// Last-observed pod state. These members are declared BEFORE the periodic logger is started:
// the first scheduled run has no initial delay and executes on the scheduler thread, so it
// could otherwise read `pod` before this constructor has initialised it. An exception thrown
// by that run would silently cancel every later periodic run.
// `pod` is written by the watch callback and read by the scheduler thread, hence @volatile.
@volatile private var pod: Option[Pod] = Option.empty
private var prevPhase: String = null

// Phase of the last-observed pod, or "unknown" when no pod has been seen yet or the pod
// carries no status/phase.
private def phase: String =
  pod.flatMap(p => Option(p.getStatus))
    .flatMap(s => Option(s.getPhase))
    .getOrElse("unknown")

// start timer for periodic logging
private val scheduler = Executors.newScheduledThreadPool(1)
private val logRunnable: Runnable = new Runnable {
  override def run() = logShortStatus()
}
// a non-positive interval disables the periodic status log
if (interval > 0) {
  scheduler.scheduleWithFixedDelay(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
}

/**
 * Watch callback: records the pod from the event, logs its status, and counts down the
 * completion latch when the pod is in the "Succeeded" or "Failed" phase.
 *
 * @param action the watch action delivered with the event (not inspected here)
 * @param pod the pod state carried by the event
 */
override def eventReceived(action: Action, pod: Pod): Unit = {
  this.pod = Option(pod)
  logShortStatus()

  // the detailed report is only emitted when the phase differs from the previous event
  val currentPhase = phase
  val phaseChanged = prevPhase != currentPhase
  if (phaseChanged) logLongStatus()
  prevPhase = currentPhase

  val reachedTerminalPhase = Seq("Succeeded", "Failed").contains(currentPhase)
  if (reachedTerminalPhase) podCompletedFuture.countDown()
}

/**
 * Watch callback invoked when the watch is closed. Stops the periodic logger and releases
 * the completion latch.
 *
 * @param e the exception passed by the watch when it closes
 */
override def onClose(e: KubernetesClientException): Unit = {
  scheduler.shutdown()
  logDebug(s"Stopped watching application $appId with last-observed phase $phase")
  // No further events arrive once the watch is closed, so a terminal phase can never be
  // observed after this point. Release the latch so that a caller blocked on it does not
  // wait forever; counting down an already-released latch is a no-op.
  podCompletedFuture.countDown()
}

/** Logs a one-line status message containing the application id and the current phase. */
private def logShortStatus(): Unit =
  logInfo(s"Application status for $appId (phase: $phase)")

/** Logs the detailed description of the last-observed pod, or "unknown" if none was seen. */
private def logLongStatus(): Unit = {
  val description = pod.fold("unknown")(formatPodState)
  logInfo(s"Phase changed, new state: $description")
}

/**
 * Renders the given pod as a multi-line, human-readable description for logging.
 *
 * Every section of the pod (metadata, spec, status) and every collection inside it is treated
 * as optional, so a pod with missing labels, volumes, container statuses, spec or status is
 * rendered with "N/A" for the affected entries instead of throwing a NullPointerException.
 *
 * @param pod the pod to describe
 * @return one "\n\t key: value" entry per detail, concatenated
 */
private def formatPodState(pod: Pod): String = {
  val metadata = Option(pod.getMetadata)
  val spec = Option(pod.getSpec)
  val status = Option(pod.getStatus)

  val details = Seq[(String, Option[String])](
    // pod metadata
    ("pod name", metadata.map(_.getName)),
    ("namespace", metadata.map(_.getNamespace)),
    ("labels",
      metadata.flatMap(m => Option(m.getLabels)).map(_.asScala.mkString(", "))),
    ("pod uid", metadata.map(_.getUid)),
    ("creation time", metadata.map(_.getCreationTimestamp)),

    // spec details
    ("service account name", spec.map(_.getServiceAccountName)),
    ("volumes",
      spec.flatMap(s => Option(s.getVolumes))
        .map(_.asScala.map(_.getName).mkString(", "))),
    ("node name", spec.map(_.getNodeName)),

    // status
    ("start time", status.map(_.getStartTime)),
    ("container images",
      status.flatMap(s => Option(s.getContainerStatuses))
        .map(_.asScala.map(_.getImage).mkString(", "))),
    ("phase", status.map(_.getPhase))
  )

  // Use more loggable format if value is absent, null or empty
  details.map { case (k, v) =>
    val newValue = v.flatMap(Option(_)).filter(_.nonEmpty).getOrElse("N/A")
    s"\n\t $k: $newValue"
  }.mkString("")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,23 @@ package object config {
.internal()
.stringConf
.createOptional

// Boolean setting "spark.kubernetes.submit.waitAppCompletion", true by default.
// The submission Client reads it to decide whether to block until the driver pod's completion
// latch is released after submission, and whether periodic status logging is enabled.
private[spark] val WAIT_FOR_APP_COMPLETION =
ConfigBuilder("spark.kubernetes.submit.waitAppCompletion")
.doc(
"""
| In cluster mode, whether to wait for the application to finish before exiting the
| launcher process.
""".stripMargin)
.booleanConf
.createWithDefault(true)

// Time setting "spark.kubernetes.report.interval", read in milliseconds, default "1s".
// The submission Client passes it to LoggingPodStatusWatcher as the periodic logging interval
// only when waiting for app completion is enabled; otherwise it passes 0, which disables the
// periodic log.
private[spark] val REPORT_INTERVAL =
ConfigBuilder("spark.kubernetes.report.interval")
.doc(
"""
| Interval between reports of the current app status in cluster mode.
""".stripMargin)
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")
}