diff --git a/conf/kubernetes-custom-resource.yaml b/conf/kubernetes-custom-resource.yaml
new file mode 100644
index 0000000000000..317d77c500381
--- /dev/null
+++ b/conf/kubernetes-custom-resource.yaml
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+metadata:
+  name: spark-job.apache.org
+  labels:
+    resource: spark-job
+    object: spark
+apiVersion: extensions/v1beta1
+kind: ThirdPartyResource
+description: "A resource that reports the status of a Spark job"
+versions:
+  - name: v1
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 488efbe5eef36..f8690c734447c 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -210,6 +210,45 @@ the command may then look like the following:
     --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.1.0" \
     local:///opt/spark/examples/jars/spark_examples_2.11-2.2.0.jar 10 400000 2
 
+## ThirdPartyResources for visibility into the state of a deployed Spark job
+
+To expose the state of a deployed Spark job to a Kubernetes administrator or user, via `kubectl` or the
+Kubernetes dashboard, we add a Kubernetes resource (of kind `SparkJob`) that stores pertinent information
+about a specific Spark job.
+
+With this resource in place, we can view the current and all past (if not already cleaned up) Spark
+applications deployed in the current namespace using `kubectl` like so (an illustrative example object
+is shown at the end of this section):
+
+    kubectl get sparkjobs
+
+Or via the Kubernetes dashboard, using the link provided by:
+
+    kubectl cluster-info
+
+
+### Prerequisites
+
+Note that this resource depends on extending the Kubernetes API with a
+[ThirdPartyResource (TPR)](https://kubernetes.io/docs/tasks/access-kubernetes-api/extend-api-third-party-resource/).
+
+TPRs are available in the Kubernetes API as of v1.5.
+
+See `conf/kubernetes-custom-resource.yaml` for the recommended resource definition. From the Spark base
+directory, we can create the recommended TPR like so:
+
+    kubectl create -f conf/kubernetes-custom-resource.yaml
+
+### Important things to note
+
+TPRs are an alpha feature that might not be available in every cluster.
+TPRs must be cleaned up manually, because garbage-collection support does not yet exist for them.
+
+### Future work
+
+Kubernetes administrators or users will be able to stop a Spark application running in their cluster by
+simply deleting the attached resource.
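+### Example SparkJob object
+
+For illustration only (this is a sketch, not output captured from a real cluster), fetching a
+SparkJob object with `kubectl get sparkjobs <app-id> -o yaml` might return something shaped
+roughly like the following. The fields under `status` mirror the `Status` case class introduced
+by this change; the object name and all values below are hypothetical:
+
+    apiVersion: apache.org/v1
+    kind: SparkJob
+    metadata:
+      name: spark-pi-example
+    status:
+      creationTimeStamp: 2017-06-01T10:00:00.0Z
+      completionTimeStamp: N/A
+      sparkDriver: spark-pi-example-driver
+      driverImage: spark-driver:latest
+      executorImage: spark-executor:latest
+      jobState: RUNNING
+      desiredExecutors: 2
+      currentExecutors: 2
+      driverUi: http://localhost:8001/api/v1/namespaces/default/pods/spark-pi-example-driver:4040/proxy/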
+
 
 ## Advanced
 
 ### Securing the Resource Staging Server with TLS
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index d1341b15afaca..6f3962b21af89 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -271,6 +271,21 @@ package object config extends Logging {
       .longConf
       .createWithDefault(1)
 
+  private[spark] val KUBERNETES_JOB_RESOURCE_NAME =
+    ConfigBuilder("spark.kubernetes.statusReporting.resourceName")
+      .doc("Name of the SparkJob resource attached to this Spark job.")
+      .internal()
+      .stringConf
+      .createOptional
+
+  private[spark] val KUBERNETES_JOB_RESOURCE_ENABLED =
+    ConfigBuilder("spark.kubernetes.statusReporting.enabled")
+      .doc("Set to true when creation of the SparkJob resource succeeds," +
+        " which means the resource must be kept updated.")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val WAIT_FOR_APP_COMPLETION =
     ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
       .doc("In cluster mode, whether to wait for the application to finish before exiting the" +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
index e267c9ff7e1d1..27e532415424a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
@@ -92,4 +92,14 @@ package object constants {
   private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
   private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10
   private[spark] val MEMORY_OVERHEAD_MIN = 384L
+
+  // TPR
+  private[spark] val TPR_API_GROUP = "apache.org"
+  private[spark] val TPR_API_VERSION = "v1"
+  private[spark] val TPR_KIND = "SparkJob"
+
+  // SparkJob Status
+  private[spark] val STATUS_NOT_AVAILABLE = "N/A"
+  private[spark] val STATUS_PENDING = "Pending"
+
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
index dc8a6da45495e..f2e9cdf765caf 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
@@ -17,15 +17,18 @@
 package org.apache.spark.deploy.kubernetes.submit
 
 import java.io.File
-import java.util.Collections
+import java.text.SimpleDateFormat
+import java.util.{Collections, Date}
 
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, OwnerReferenceBuilder, PodBuilder}
 import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.kubernetes.ConfigurationUtils
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.deploy.kubernetes.tpr.{JobState, SparkJobResourceController, SparkJobResourceControllerImpl, Status}
 import org.apache.spark.deploy.rest.kubernetes.ResourceStagingServerSslOptionsProviderImpl
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkLauncher
@@ -53,7 +56,8 @@ private[spark] class Client(
     kubernetesClientProvider: SubmissionKubernetesClientProvider,
     initContainerComponentsProvider: DriverInitContainerComponentsProvider,
     kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider,
-    loggingPodStatusWatcher: LoggingPodStatusWatcher)
+    loggingPodStatusWatcher: LoggingPodStatusWatcher,
+    sparkJobResourceController: SparkJobResourceController)
   extends Logging {
 
   private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME)
@@ -72,6 +76,31 @@ private[spark] class Client(
       org.apache.spark.internal.config.DRIVER_CLASS_PATH)
   private val driverJavaOptions = sparkConf.get(
     org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+  private val resourceTimeFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.S'Z'")
+
+  // Create a resource of kind SparkJob representing the deployed Spark app.
+  private val status = Status(
+    creationTimeStamp = resourceTimeFormat.format(new Date()),
+    completionTimeStamp = STATUS_NOT_AVAILABLE,
+    sparkDriver = kubernetesDriverPodName,
+    driverImage = driverDockerImage,
+    executorImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE),
+    jobState = JobState.QUEUED,
+    desiredExecutors = sparkConf.getInt("spark.executor.instances", 1),
+    currentExecutors = 0,
+    driverUi = STATUS_PENDING
+  )
+
+  // Failure might be due to the TPR not existing, or to the known lag (up to roughly ten
+  // minutes) before a newly created TPR endpoint becomes usable.
+  // TODO: in the latter case we can attempt a retry depending on the rc.
+  // This also assumes that once creation fails, we won't attempt anything
+  // else on the resource for the lifetime of the app.
+  Try(sparkJobResourceController.createJobObject(kubernetesAppId, status)) match {
+    case Success(_) =>
+      sparkConf.set(KUBERNETES_JOB_RESOURCE_ENABLED, true)
+      sparkConf.set(KUBERNETES_JOB_RESOURCE_NAME, kubernetesAppId)
+    case Failure(e) =>
+      logWarning(s"SparkJob resource was not created; status reporting is disabled: ${e.getMessage}")
+  }
 
   def run(): Unit = {
     validateNoDuplicateFileNames(sparkJars)
@@ -277,6 +306,8 @@ private[spark] object Client {
     val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
     val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL)).filter( _ => waitForAppCompletion)
     val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
+    val sparkJobResourceController = new SparkJobResourceControllerImpl(
+      kubernetesClientProvider.get)
     new Client(
       appName,
       kubernetesAppId,
@@ -289,6 +320,7 @@ private[spark] object Client {
       kubernetesClientProvider,
       initContainerComponentsProvider,
       kubernetesCredentialsMounterProvider,
-      loggingPodStatusWatcher).run()
+      loggingPodStatusWatcher,
+      sparkJobResourceController).run()
   }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobState.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobState.scala
new file mode 100644
index 0000000000000..e2d0187d3e504
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobState.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.kubernetes.tpr
+
+private[spark] object JobState extends Enumeration {
+  type JobState = Value
+
+  /*
+   * QUEUED - Spark Job has been queued to run
+   * SUBMITTED - Driver Pod deployed but tasks are not yet scheduled on worker pod(s)
+   * RUNNING - Task(s) have been allocated to worker pod(s) to run and Spark Job is now running
+   * FINISHED - Spark Job ran and exited cleanly, i.e., worker pod(s) and driver pod were
+   *            gracefully deleted
+   * FAILED - Spark Job failed due to an error
+   * KILLED - A user manually killed this Spark Job
+   */
+  val QUEUED, SUBMITTED, RUNNING, FINISHED, FAILED, KILLED = Value
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala
new file mode 100644
index 0000000000000..41ecd9f82cbba
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.kubernetes.tpr
+
+import org.json4s.{CustomSerializer, JString}
+import org.json4s.JsonAST.JNull
+
+import org.apache.spark.deploy.kubernetes.tpr.JobState.JobState
+
+/**
+ * JobState Serializer and Deserializer
+ */
+private[spark] object JobStateSerDe extends CustomSerializer[JobState](_ =>
+  ({
+    case JString("SUBMITTED") => JobState.SUBMITTED
+    case JString("QUEUED") => JobState.QUEUED
+    case JString("RUNNING") => JobState.RUNNING
+    case JString("FINISHED") => JobState.FINISHED
+    case JString("KILLED") => JobState.KILLED
+    case JString("FAILED") => JobState.FAILED
+    case JNull =>
+      throw new UnsupportedOperationException("No JobState Specified")
+  }, {
+    case JobState.FAILED => JString("FAILED")
+    case JobState.SUBMITTED => JString("SUBMITTED")
+    case JobState.KILLED => JString("KILLED")
+    case JobState.FINISHED => JString("FINISHED")
+    case JobState.QUEUED => JString("QUEUED")
+    case JobState.RUNNING => JString("RUNNING")
+  })
+)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/SparkJobResourceController.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/SparkJobResourceController.scala
new file mode 100644
index 0000000000000..54a3b3f83d579
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/SparkJobResourceController.scala
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.tpr
+
+import java.io.IOException
+import java.util.concurrent.ThreadPoolExecutor
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{blocking, Future, Promise}
+
+import io.fabric8.kubernetes.client.{BaseClient, KubernetesClient}
+import okhttp3.{HttpUrl, MediaType, OkHttpClient, Request, RequestBody, Response}
+import org.json4s.{DefaultFormats, Formats}
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods.{compact, parse, pretty, render}
+import org.json4s.jackson.Serialization.{read, write}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.deploy.kubernetes.tpr.JobState.JobState
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * This trait defines the currently supported CRUD operations
+ * on the SparkJob resource.
+ */
+private[spark] trait SparkJobResourceController {
+  def createJobObject(name: String, status: Status): Unit
+  def deleteJobObject(tprObjectName: String): Unit
+  def getJobObject(name: String): SparkJobState
+  def updateJobObject(jobResourcePatches: Seq[JobResourcePatch]): Unit
+}
+
+private[spark] case class Metadata(name: String,
+    uid: Option[String] = None,
+    labels: Option[Map[String, String]] = None,
+    annotations: Option[Map[String, String]] = None)
+
+private[spark] case class SparkJobState(apiVersion: String,
+    kind: String,
+    metadata: Metadata,
+    status: Status)
+
+private[spark] case class Status(creationTimeStamp: String,
+    completionTimeStamp: String,
+    sparkDriver: String,
+    driverImage: String,
+    executorImage: String,
+    jobState: JobState,
+    desiredExecutors: Int,
+    currentExecutors: Int,
+    driverUi: String)
+
+private[spark] case class WatchObject(`type`: String, `object`: SparkJobState)
+
+private[spark] case class JobResourcePatch(name: String, value: Any, fieldPath: String)
+
+/**
+ * Prerequisite: this assumes the Kubernetes API has been extended with the spark-job TPR.
+ * See conf/kubernetes-custom-resource.yaml for a model.
+ *
+ * This class implements the CRUD operations performed on the SparkJob resource used to
+ * expose the state of a Spark job in Kubernetes (visible via kubectl or through the
+ * Kubernetes dashboard), plus scaffolding for the watch operations planned as future work.
+ */
+private[spark] class SparkJobResourceControllerImpl(k8sClient: KubernetesClient)
+  extends Logging with SparkJobResourceController {
+  private val kubeMaster = k8sClient.getMasterUrl.toString
+  private val httpClient: OkHttpClient =
+    extractHttpClientFromK8sClient(k8sClient.asInstanceOf[BaseClient])
+  private val namespace = k8sClient.getNamespace
+
+  private implicit val formats: Formats = DefaultFormats + JobStateSerDe
+  private implicit val ec: ThreadPoolExecutor = ThreadUtils
+    .newDaemonCachedThreadPool("tpr-watcher-pool")
+
+  /**
+   * Creates a resource of kind "SparkJob" with the given name.
+   * @throws IOException due to a failure from the execute call
+   * @throws SparkException when the POST request is unsuccessful
+   */
+  override def createJobObject(name: String, status: Status): Unit = {
+    val resourceObject =
+      SparkJobState(s"$TPR_API_GROUP/$TPR_API_VERSION", TPR_KIND, Metadata(name), status)
+    val payload = parse(write(resourceObject))
+    val requestBody = RequestBody
+      .create(MediaType.parse("application/json"), compact(render(payload)))
+    val requestPathSegments = Seq(
+      "apis", TPR_API_GROUP, TPR_API_VERSION, "namespaces", namespace, "sparkjobs")
+    val url = generateHttpUrl(requestPathSegments)
+    val request = new Request.Builder()
+      .post(requestBody)
+      .url(url)
+      .build()
+
+    logDebug(s"Create SparkJobResource Request: $request")
+    val response = try {
+      httpClient.newCall(request).execute()
+    } catch {
+      case x: IOException =>
+        // execute() threw, so there is no response to close here.
+        val msg =
+          s"Failed to post resource $name. ${x.getMessage}. ${compact(render(payload))}"
+        logError(msg)
+        throw new SparkException(msg)
+    }
+    completeRequestWithExceptionIfNotSuccessful(
+      "post",
+      response,
+      Option(Seq(name, response.toString, compact(render(payload)))))
+    logDebug(s"Successfully posted resource $name: " +
+      s"${pretty(render(parse(write(resourceObject))))}")
+    response.body().close()
+  }
+
+  /**
+   * Deletes the resource of kind "SparkJob" with the specified name.
+   * @throws IOException due to a failure from the execute call
+   * @throws SparkException when the DELETE request is unsuccessful
+   */
+  override def deleteJobObject(tprObjectName: String): Unit = {
+    val requestPathSegments = Seq(
+      "apis", TPR_API_GROUP, TPR_API_VERSION, "namespaces", namespace, "sparkjobs", tprObjectName)
+    val url = generateHttpUrl(requestPathSegments)
+    val request = new Request.Builder()
+      .delete()
+      .url(url)
+      .build()
+
+    logDebug(s"Delete Request: $request")
+    val response = try {
+      httpClient.newCall(request).execute()
+    } catch {
+      case x: IOException =>
+        // execute() threw, so there is no response to close here.
+        val msg = s"Failed to delete resource $tprObjectName. ${x.getMessage}."
+        logError(msg)
+        throw new SparkException(msg)
+    }
+    completeRequestWithExceptionIfNotSuccessful(
+      "delete",
+      response,
+      Option(Seq(tprObjectName, response.message(), request.toString)))
+
+    response.body().close()
+    logInfo(s"Successfully deleted resource $tprObjectName")
+  }
+
+  /**
+   * Gets the resource of kind "SparkJob" with the given name.
+   * @return SparkJobState
+   * @throws IOException due to a failure from the execute call
+   * @throws SparkException when the GET request is unsuccessful
+   */
+  override def getJobObject(name: String): SparkJobState = {
+    val requestPathSegments = Seq(
+      "apis", TPR_API_GROUP, TPR_API_VERSION, "namespaces", namespace, "sparkjobs", name)
+    val url = generateHttpUrl(requestPathSegments)
+    val request = new Request.Builder()
+      .get()
+      .url(url)
+      .build()
+
+    logDebug(s"Get Request: $request")
+    val response = try {
+      httpClient.newCall(request).execute()
+    } catch {
+      case x: IOException =>
+        // execute() threw, so there is no response to close here.
+        val msg = s"Failed to get resource $name. ${x.getMessage}."
+        logError(msg)
+        throw new SparkException(msg)
+    }
+    completeRequestWithExceptionIfNotSuccessful(
+      "get",
+      response,
+      Option(Seq(name, response.message()))
+    )
+
+    logInfo(s"Successfully retrieved resource $name")
+    read[SparkJobState](response.body().string())
+  }
+
+  /**
+   * Patches (singly or in batch) the resource of kind "SparkJob" with the specified name.
+   * @throws IOException due to a failure from the execute call
+   * @throws SparkException when the PATCH request is unsuccessful
+   */
+  override def updateJobObject(jobResourcePatches: Seq[JobResourcePatch]): Unit = {
+    val payload = jobResourcePatches map { jobResourcePatch =>
+      ("op" -> "replace") ~
+        ("path" -> jobResourcePatch.fieldPath) ~
+        ("value" -> jobResourcePatch.value.toString)
+    }
+    val requestBody =
+      RequestBody.create(
+        MediaType.parse("application/json-patch+json"),
+        compact(render(payload)))
+    val requestPathSegments = Seq(
+      "apis", TPR_API_GROUP, TPR_API_VERSION, "namespaces",
+      namespace, "sparkjobs", jobResourcePatches.head.name)
+    val url = generateHttpUrl(requestPathSegments)
+    val request = new Request.Builder()
+      .patch(requestBody)
+      .url(url)
+      .build()
+
+    logDebug(s"Update Request: $request")
+    val response = try {
+      httpClient.newCall(request).execute()
+    } catch {
+      case x: IOException =>
+        // execute() threw, so there is no response to close here.
+        val msg = s"Failed to patch resource ${jobResourcePatches.head.name}. ${x.getMessage}."
+        logError(msg)
+        throw new SparkException(msg)
+    }
+    completeRequestWithExceptionIfNotSuccessful(
+      "patch",
+      response,
+      Option(Seq(jobResourcePatches.head.name, response.message(), compact(render(payload))))
+    )
+
+    response.body().close()
+    logDebug(s"Successfully patched resource ${jobResourcePatches.head.name}.")
+  }
+
+  private def generateHttpUrl(urlSegments: Seq[String],
+      querySegments: Seq[(String, String)] = Seq.empty[(String, String)]): HttpUrl = {
+
+    val urlBuilder = HttpUrl.parse(kubeMaster).newBuilder
+    urlSegments.foreach { pathSegment =>
+      urlBuilder.addPathSegment(pathSegment)
+    }
+    querySegments.foreach {
+      case (query, value) => urlBuilder.addQueryParameter(query, value)
+    }
+    urlBuilder.build()
+  }
+
+  private def completeRequestWithExceptionIfNotSuccessful(
+      requestType: String,
+      response: Response,
+      additionalInfo: Option[Seq[String]] = None): Unit = {
+
+    if (!response.isSuccessful) {
+      response.body().close()
+      val msg = new ArrayBuffer[String]
+      msg += s"Failed to $requestType resource."
+
+      additionalInfo.foreach { info =>
+        msg ++= info
+      }
+
+      val finalMessage = msg.mkString(" ")
+      logError(finalMessage)
+      throw new SparkException(finalMessage)
+    }
+  }
+
+  def extractHttpClientFromK8sClient(client: BaseClient): OkHttpClient = {
+    val field = classOf[BaseClient].getDeclaredField("httpClient")
+    try {
+      field.setAccessible(true)
+      field.get(client).asInstanceOf[OkHttpClient]
+    } finally {
+      field.setAccessible(false)
+    }
+  }
+
+  // Helper for the watch operations planned as future work; not used yet.
+  private def executeBlocking(cb: => WatchObject): Future[WatchObject] = {
+    val p = Promise[WatchObject]()
+    ec.execute(new Runnable {
+      override def run(): Unit = {
+        try {
+          p.trySuccess(blocking(cb))
+        } catch {
+          case e: Throwable => p.tryFailure(e)
+        }
+      }
+    })
+    p.future
+  }
+
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 257cee80fdea9..a453dd483148b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -17,27 +17,32 @@
 package org.apache.spark.scheduler.cluster.kubernetes
 
 import java.io.Closeable
+import java.text.SimpleDateFormat
+import java.util.Date
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
 
-import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
-import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
-import io.fabric8.kubernetes.client.Watcher.Action
-import org.apache.commons.io.FilenameUtils
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, Future}
 
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import org.apache.commons.io.FilenameUtils
+
 import org.apache.spark.{SparkContext, SparkEnv, SparkException}
 import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, SparkPodInitContainerBootstrap}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.deploy.kubernetes.tpr.{JobResourcePatch, JobState, SparkJobResourceControllerImpl}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointAddress, RpcEnv}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 private[spark] class KubernetesClusterSchedulerBackend(
@@ -175,6 +180,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private val allocator = ThreadUtils
     .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
 
+  private val workingWithSparkJobResource = conf.get(KUBERNETES_JOB_RESOURCE_ENABLED)
+  private val jobResourceName = conf.get(KUBERNETES_JOB_RESOURCE_NAME)
+  private val sparkJobResourceController =
+    if (workingWithSparkJobResource) {
+      Some(new SparkJobResourceControllerImpl(kubernetesClient))
+    } else {
+      None
+    }
+  private val resourceTimeFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.S'Z'")
+
   private val allocatorRunnable: Runnable = new Runnable {
     override def run(): Unit = {
       if (totalRegisteredExecutors.get() < runningExecutorPods.size) {
@@ -191,6 +206,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
           }
         }
       }
+      if (workingWithSparkJobResource) {
+        updateJobResourceStatus(
+          JobResourcePatch(jobResourceName.get,
+            totalRegisteredExecutors.get(),
+            "/status/currentExecutors"))
+      }
     }
   }
 
@@ -218,6 +239,17 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   }
 
+  private def updateJobResourceStatus(jobResourcePatches: JobResourcePatch*): Unit = {
+    sparkJobResourceController.foreach(controller =>
+      try {
+        controller.updateJobObject(jobResourcePatches)
+      } catch {
+        case e: SparkException if e.getMessage.startsWith("404") =>
+          logWarning("Possible deletion of jobResource before backend start")
+        case e: Exception => logWarning(s"SparkJob object not updated. ${e.getMessage}")
+      })
+  }
+
   override def applicationId(): String = conf.get("spark.app.id", super.applicationId())
 
   override def sufficientResourcesRegistered(): Boolean = {
@@ -226,6 +258,17 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   override def start(): Unit = {
     super.start()
+
+    if (workingWithSparkJobResource) {
+      updateJobResourceStatus(
+        JobResourcePatch(jobResourceName.get,
+          JobState.RUNNING,
+          "/status/jobState"),
+        JobResourcePatch(jobResourceName.get,
+          "http://localhost:8001/api/v1/namespaces" +
+            s"/default/pods/$kubernetesDriverPodName:${SparkUI.getUIPort(conf)}/proxy/",
+          "/status/driverUi"))
+    }
     executorWatchResource.set(kubernetesClient.pods().withLabel(SPARK_APP_ID_LABEL, applicationId())
       .watch(new ExecutorPodsWatcher()))
 
@@ -251,6 +294,21 @@ private[spark] class KubernetesClusterSchedulerBackend(
     // send stop message to executors so they shut down cleanly
     super.stop()
 
+    if (workingWithSparkJobResource) {
+      updateJobResourceStatus(
+        JobResourcePatch(jobResourceName.get,
+          STATUS_NOT_AVAILABLE,
+          "/status/currentExecutors"),
+        JobResourcePatch(jobResourceName.get,
+          STATUS_NOT_AVAILABLE,
+          "/status/driverUi"),
+        JobResourcePatch(jobResourceName.get,
+          resourceTimeFormat.format(new Date()),
+          "/status/completionTimeStamp"),
+        JobResourcePatch(jobResourceName.get,
+          JobState.FINISHED,
+          "/status/jobState"))
+    }
 
     // then delete the executor pods
     // TODO investigate why Utils.tryLogNonFatalError() doesn't work in this context.
@@ -410,6 +468,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
+    if (workingWithSparkJobResource) {
+      // requestedTotal is the new desired total, so patch desiredExecutors here;
+      // the allocator thread keeps currentExecutors up to date.
+      updateJobResourceStatus(JobResourcePatch(
+        jobResourceName.get,
+        requestedTotal,
+        "/status/desiredExecutors"))
+    }
     totalExpectedExecutors.set(requestedTotal)
     true
   }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
index ff6c710117318..aaf72e95d9c9e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
@@ -18,7 +18,10 @@
 package org.apache.spark.deploy.kubernetes.submit
 
 import java.io.File
 
-import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, DoneablePod, HasMetadata, Pod, PodBuilder, PodList, Secret, SecretBuilder}
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
 import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
 import org.hamcrest.{BaseMatcher, Description}
@@ -28,13 +31,12 @@
 import org.mockito.Mockito.{times, verify, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.BeforeAndAfter
-import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.kubernetes.{KubernetesExternalShuffleService, KubernetesShuffleBlockHandler, SparkPodInitContainerBootstrap}
+import org.apache.spark.deploy.kubernetes.{KubernetesExternalShuffleService, SparkPodInitContainerBootstrap}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.deploy.kubernetes.tpr.SparkJobResourceController
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient
 import org.apache.spark.scheduler.cluster.kubernetes.DriverPodKubernetesClientProvider
@@ -150,6 +152,8 @@
   private var namedPodResource: PodResource[Pod, DoneablePod] = _
   @Mock
   private var watch: Watch = _
+  @Mock
+  private var sparkJobResourceController: SparkJobResourceController = _
 
   before {
     MockitoAnnotations.initMocks(this)
@@ -305,7 +309,8 @@
       kubernetesClientProvider,
       initContainerComponentsProvider,
       credentialsMounterProvider,
-      loggingPodStatusWatcher).run()
+      loggingPodStatusWatcher,
+      sparkJobResourceController).run()
     verify(loggingPodStatusWatcher).awaitCompletion()
   }
@@ -412,7 +417,8 @@
       kubernetesClientProvider,
       initContainerComponentsProvider,
       credentialsMounterProvider,
-      loggingPodStatusWatcher).run()
+      loggingPodStatusWatcher,
+      sparkJobResourceController).run()
     val podMatcher = new BaseMatcher[Pod] {
       override def matches(o: scala.Any): Boolean = {
         o match {
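
Note on the update path: `SparkJobResourceControllerImpl.updateJobObject` turns each `JobResourcePatch`
into a JSON Patch "replace" operation and sends the batch as a single PATCH request with content type
`application/json-patch+json`. The following standalone sketch, which uses only json4s (already a Spark
dependency), shows the payload that construction produces; the object name, values, and the
`JsonPatchPayloadSketch` wrapper are illustrative, not part of the diff:

    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods.{compact, render}

    object JsonPatchPayloadSketch {
      // Mirrors the JobResourcePatch case class added by this diff.
      case class JobResourcePatch(name: String, value: Any, fieldPath: String)

      def main(args: Array[String]): Unit = {
        val patches = Seq(
          JobResourcePatch("spark-pi-example", 3, "/status/currentExecutors"),
          JobResourcePatch("spark-pi-example", "RUNNING", "/status/jobState"))
        // Same shape as updateJobObject's payload: a JSON Patch array of "replace"
        // operations. Values are stringified via toString, exactly as in the diff.
        val payload = patches.map { p =>
          ("op" -> "replace") ~ ("path" -> p.fieldPath) ~ ("value" -> p.value.toString)
        }
        // Prints:
        // [{"op":"replace","path":"/status/currentExecutors","value":"3"},
        //  {"op":"replace","path":"/status/jobState","value":"RUNNING"}]
        println(compact(render(payload)))
      }
    }

Because the scheduler backend batches several patches per call (for example the four status fields
patched in stop()), all of them land in one request against the resource named by the first patch.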