From 3946444d7e7ac30feb1824d4765f3873db483d79 Mon Sep 17 00:00:00 2001 From: Iyanu Obidele Date: Thu, 10 Nov 2016 08:48:49 -0800 Subject: [PATCH 1/5] initial sparkResource implementation --- .../SparkJobResourceFromWithinK8s.scala | 78 +++++ .../spark/deploy/kubernetes/constants.scala | 5 + .../deploy/kubernetes/tpr/JobState.scala | 33 +++ .../deploy/kubernetes/tpr/JobStateSerDe.scala | 46 +++ .../deploy/kubernetes/tpr/TPRCrudCalls.scala | 271 ++++++++++++++++++ sbin/driver.yaml | 8 + sbin/kubernetes-resource.yaml | 10 + 7 files changed, 451 insertions(+) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobState.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala create mode 100644 sbin/driver.yaml create mode 100644 sbin/kubernetes-resource.yaml diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala new file mode 100644 index 0000000000000..44550cc5fcc48 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.kubernetes + +import java.nio.file.{Files, Paths} + +import io.fabric8.kubernetes.client.{Config, KubernetesClient} +import scala.util.{Failure, Success, Try} + +import org.apache.spark.deploy.kubernetes.tpr.TPRCrudCalls + +private[spark] class SparkJobResourceClientFromWithinK8s( + client: KubernetesClient) extends TPRCrudCalls { + + private val protocol: String = "https://" + + // we can also get the host from the environment variable + private val kubeHost: String = { + val host = Try(sys.env("KUBERNETES_SERVICE_HOST")) match { + case Success(h) => Option(h) + case Failure(_) => None + } + host.map(h => h).getOrElse { + // Log a warning just in case, but this should almost certainly never happen + logWarning("Error while retrieving k8s host address") + "127.0.0.1" + } + } + + // the port from the environment variable + private val kubeHostPort: String = { + val port = Try(sys.env("KUBERNETES_PORT_443_TCP_PORT")) match { + case Success(p) => Option(p) + case Failure(_) => None + } + port.map(p => p).getOrElse { + // Log a warning just in case, but this should almost certainly never happen + logWarning("Error while retrieving k8s host port") + "8001" + } + } + + // Since this will be running inside a pod + // we can access the pods token and use it with the Authorization header when + // making rest calls to the k8s Api + override protected val kubeToken: Option[String] = { + val path = Paths.get(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) + Try(new String(Files.readAllBytes(path))) match { + case Success(some) => Option(some) + case Failure(e: Throwable) => logError(s"${e.getMessage}") + None + } + } + + override protected val k8sClient: KubernetesClient = client + override protected val kubeMaster: String = s"$protocol$kubeHost:$kubeHostPort" +} + +private[spark] class SparkJobResourceClientFromOutsideK8s( + client: KubernetesClient) extends TPRCrudCalls { + + override protected val k8sClient: KubernetesClient = client +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index e267c9ff7e1d1..56c5e69ad1205 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -92,4 +92,9 @@ package object constants { private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc" private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10 private[spark] val MEMORY_OVERHEAD_MIN = 384L + + // TPR + private[spark] val TPR_API_VERSION = "apache.io/v1" + private[spark] val TPR_API_ENDPOINT = s"apis/%s/namespaces/%s/sparkjobs" + private[spark] val TPR_KIND = "SparkJob" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobState.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobState.scala new file mode 100644 index 0000000000000..6f90194f673ab --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobState.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.kubernetes.tpr + +private[spark] object JobState extends Enumeration { + type JobState = Value + + /* + * QUEUED - Spark Job has been queued to run + * SUBMITTED - Driver Pod deployed but tasks are not yet scheduled on worker pod(s) + * RUNNING - Task(s) have been allocated to worker pod(s) to run and Spark Job is now running + * FINISHED - Spark Job ran and exited cleanly, i.e, worker pod(s) and driver pod were + * gracefully deleted + * FAILED - Spark Job Failed due to error + * KILLED - A user manually killed this Spark Job + */ + val QUEUED, SUBMITTED, RUNNING, FINISHED, FAILED, KILLED = Value +} \ No newline at end of file diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala new file mode 100644 index 0000000000000..16edd19de6e0c --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.kubernetes.tpr + +import org.json4s.{CustomSerializer, JString} +import org.json4s.JsonAST.JNull + +import org.apache.spark.deploy.kubernetes.tpr.JobState.JobState + + /** + * JobState Serializer and Deserializer + * */ +object JobStateSerDe extends CustomSerializer[JobState](_ => + ({ + case JString("SUBMITTED") => JobState.SUBMITTED + case JString("QUEUED") => JobState.QUEUED + case JString("RUNNING") => JobState.RUNNING + case JString("FINISHED") => JobState.FINISHED + case JString("KILLED") => JobState.KILLED + case JString("FAILED") => JobState.FAILED + case JNull => + throw new UnsupportedOperationException("No JobState Specified") + }, { + case JobState.FAILED => JString("FAILED") + case JobState.SUBMITTED => JString("SUBMITTED") + case JobState.KILLED => JString("KILLED") + case JobState.FINISHED => JString("FINISHED") + case JobState.QUEUED => JString("QUEUED") + case JobState.RUNNING => JString("RUNNING") + }) +) \ No newline at end of file diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala new file mode 100644 index 0000000000000..7277412ef8e16 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.tpr + +import java.util.concurrent.{ThreadPoolExecutor, TimeUnit} + +import io.fabric8.kubernetes.client.{BaseClient, KubernetesClient} +import okhttp3.{MediaType, OkHttpClient, Request, RequestBody, Response} +import okio.{Buffer, BufferedSource} +import org.json4s.{DefaultFormats, Formats} +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods.{compact, parse, pretty, render} +import org.json4s.jackson.Serialization.{read, write} +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.{blocking, Future, Promise} +import scala.util.control.Breaks.{break, breakable} + +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.internal.Logging +import org.apache.spark.SparkException +import org.apache.spark.util.ThreadUtils + +/** + * Isolated this since the method is called on the client machine + */ + +private[spark] case class Metadata(name: String, + uid: Option[String] = None, + labels: Option[Map[String, String]] = None, + annotations: Option[Map[String, String]] = None) + +private[spark] case class SparkJobState(apiVersion: String, + kind: String, + metadata: Metadata, + spec: Map[String, Any]) + +private[spark] case class WatchObject(`type`: String, `object`: SparkJobState) + +private[spark] abstract class TPRCrudCalls extends Logging { + protected val k8sClient: KubernetesClient + protected val kubeMaster: String = KUBERNETES_MASTER_INTERNAL_URL + protected val kubeToken: Option[String] = None + + implicit val formats: Formats = DefaultFormats + JobStateSerDe + + + protected val httpClient: OkHttpClient = + buildOkhttpClientFromWithinPod(k8sClient.asInstanceOf[BaseClient]) + + protected val namespace: String = k8sClient.getNamespace + private var watchSource: BufferedSource = _ + private lazy val buffer = new Buffer() + protected implicit val ec: ThreadPoolExecutor = ThreadUtils + .newDaemonCachedThreadPool("tpr-watcher-pool") + + private def executeBlocking(cb: => WatchObject): Future[WatchObject] = { + val p = Promise[WatchObject]() + ec.execute(new Runnable { + override def run(): Unit = { + try { + p.trySuccess(blocking(cb)) + } catch { + case e: Throwable => p.tryFailure(e) + } + } + }) + p.future + } + + private def completeRequest(partialReq: Request.Builder): Request = { + kubeToken match { + case Some(tok) => partialReq.addHeader("Authorization", s"Bearer $tok").build() + case None => partialReq.build() + } + } + + private def completeRequestWithExceptionIfNotSuccessful( + requestType: String, + response: Response, + additionalInfo: Option[Seq[String]] = None): Unit = { + + if (!response.isSuccessful) { + response.body().close() + val msg = new ArrayBuffer[String] + msg += s"Failed to $requestType resource." 
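+      // Append any caller-supplied context (resource name, response text)
+      // below so the thrown SparkException is self-describing.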
+ + additionalInfo match { + case Some(info) => + for (extraMsg <- info) { + msg += extraMsg + } + case None => + } + + val finalMessage = msg.mkString(" ") + logError(finalMessage) + throw new SparkException(finalMessage) + } + } + + def buildOkhttpClientFromWithinPod(client: BaseClient): OkHttpClient = { + val field = classOf[BaseClient].getDeclaredField("httpClient") + try { + field.setAccessible(true) + field.get(client).asInstanceOf[OkHttpClient] + } finally { + field.setAccessible(false) + } + } + + def createJobObject(name: String, keyValuePairs: Map[String, Any]): Unit = { + val resourceObject = + SparkJobState(TPR_API_VERSION, TPR_KIND, Metadata(name), keyValuePairs) + val payload = parse(write(resourceObject)) + + val requestBody = RequestBody + .create(MediaType.parse("application/json"), compact(render(payload))) + val request = completeRequest(new Request.Builder() + .post(requestBody) + .url(s"$kubeMaster/${TPR_API_ENDPOINT.format(TPR_API_VERSION, namespace)}")) + + logDebug(s"Create Request: $request") + val response = httpClient.newCall(request).execute() + completeRequestWithExceptionIfNotSuccessful( + "post", + response, + Option(Seq(name, response.toString, compact(render(payload))))) + + response.body().close() + logDebug(s"Successfully posted resource $name: " + + s"${pretty(render(parse(write(resourceObject))))}") + } + + def deleteJobObject(tprObjectName: String): Unit = { + val request = completeRequest(new Request.Builder() + .delete() + .url(s"$kubeMaster/${TPR_API_ENDPOINT.format(TPR_API_VERSION, namespace)}/$tprObjectName")) + + logDebug(s"Delete Request: $request") + val response = httpClient.newCall(request).execute() + completeRequestWithExceptionIfNotSuccessful( + "delete", + response, + Option(Seq(tprObjectName, response.message(), request.toString))) + + response.body().close() + logInfo(s"Successfully deleted resource $tprObjectName") + } + + def getJobObject(name: String): SparkJobState = { + val request = completeRequest(new Request.Builder() + .get() + .url(s"$kubeMaster/${TPR_API_ENDPOINT.format(TPR_API_VERSION, namespace)}/$name")) + + logDebug(s"Get Request: $request") + val response = httpClient.newCall(request).execute() + completeRequestWithExceptionIfNotSuccessful( + "get", + response, + Option(Seq(name, response.message())) + ) + + logInfo(s"Successfully retrieved resource $name") + read[SparkJobState](response.body().string()) + } + + def updateJobObject(name: String, value: String, fieldPath: String): Unit = { + val payload = List( + ("op" -> "replace") ~ ("path" -> fieldPath) ~ ("value" -> value)) + val requestBody = + RequestBody.create( + MediaType.parse("application/json-patch+json"), + compact(render(payload))) + val request = completeRequest(new Request.Builder() + .patch(requestBody) + .url(s"$kubeMaster/${TPR_API_ENDPOINT.format(TPR_API_VERSION, namespace)}/$name")) + + logDebug(s"Update Request: $request") + val response = httpClient.newCall(request).execute() + completeRequestWithExceptionIfNotSuccessful( + "patch", + response, + Option(Seq(name, response.message(), compact(render(payload)))) + ) + + response.body().close() + logDebug(s"Successfully patched resource $name.") + } + + /** + * This method has an helper method that blocks to watch the object. + * The future is completed on a Delete event or source exhaustion. 
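+   * A caller can interrupt the watch early via stopWatcher(), which closes
+   * the underlying source.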
+ * This method relies on the assumption of one sparkjob per namespace + */ + def watchJobObject(): Future[WatchObject] = { + val watchClient = httpClient.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build() + val request = completeRequest(new Request.Builder() + .get() + .url(s"$kubeMaster/${TPR_API_ENDPOINT.format(TPR_API_VERSION, namespace)}?watch=true")) + + logDebug(s"Watch Request: $request") + val resp = watchClient.newCall(request).execute() + completeRequestWithExceptionIfNotSuccessful( + "start watch on", + resp, + Option(Seq(resp.code().toString, resp.message()))) + + logInfo(s"Starting watch on jobResource") + watchJobObjectUtil(resp) + } + + /** + * This method has a blocking call - wait on SSE - inside it. + * However it is sent off in a new thread + */ + private def watchJobObjectUtil(response: Response): Future[WatchObject] = { + @volatile var wo: WatchObject = null + watchSource = response.body().source() + executeBlocking { + breakable { + // This will block until there are bytes to read or the source is exhausted. + while (!watchSource.exhausted()) { + watchSource.read(buffer, 8192) match { + case -1 => + cleanUpListener(watchSource, buffer) + throw new SparkException("Source is exhausted and object state is unknown") + case _ => + wo = read[WatchObject](buffer.readUtf8()) + wo match { + case WatchObject("DELETED", w) => + logInfo(s"${w.metadata.name} has been deleted") + cleanUpListener(watchSource, buffer) + case WatchObject(e, _) => logInfo(s"$e event. Still watching") + } + } + } + } + wo + } + } + + private def cleanUpListener(source: BufferedSource, buffer: Buffer): Unit = { + buffer.close() + source.close() + break() + } + + // Serves as a way to interrupt to the watcher thread. + // This closes the source the watcher is reading from and as a result triggers promise completion + def stopWatcher(): Unit = { + if (watchSource != null) { + buffer.close() + watchSource.close() + } + } + +} diff --git a/sbin/driver.yaml b/sbin/driver.yaml new file mode 100644 index 0000000000000..8946ca8403195 --- /dev/null +++ b/sbin/driver.yaml @@ -0,0 +1,8 @@ +apiVersion: "apache.io/v1" +kind: "SparkJob" +metadata: + name: "spark-job-1" +status: + image: "driver-image" + state: "completed" + num-executors: 10 \ No newline at end of file diff --git a/sbin/kubernetes-resource.yaml b/sbin/kubernetes-resource.yaml new file mode 100644 index 0000000000000..0ff3d39b45182 --- /dev/null +++ b/sbin/kubernetes-resource.yaml @@ -0,0 +1,10 @@ +metadata: + name: spark-job.apache.io + labels: + resource: spark-job + object: spark +apiVersion: extensions/v1beta1 +kind: ThirdPartyResource +description: "A resource that manages a spark job" +versions: + - name: v1 \ No newline at end of file From 75057f31df9ea968aa4ddd8cff85190679173930 Mon Sep 17 00:00:00 2001 From: iyanuobidele Date: Fri, 19 May 2017 11:30:21 -0700 Subject: [PATCH 2/5] next iteration with resource creation and watch logic --- {sbin => conf}/driver.yaml | 2 +- {sbin => conf}/kubernetes-resource.yaml | 2 +- .../SparkJobResourceFromWithinK8s.scala | 29 --- .../spark/deploy/kubernetes/constants.scala | 4 +- .../deploy/kubernetes/submit/Client.scala | 27 +++ .../deploy/kubernetes/tpr/JobState.scala | 2 +- .../deploy/kubernetes/tpr/JobStateSerDe.scala | 2 +- .../deploy/kubernetes/tpr/TPRCrudCalls.scala | 182 +++++++++++------- .../KubernetesClusterSchedulerBackend.scala | 72 ++++++- 9 files changed, 208 insertions(+), 114 deletions(-) rename {sbin => conf}/driver.yaml (86%) rename {sbin => conf}/kubernetes-resource.yaml (94%) diff 
--git a/sbin/driver.yaml b/conf/driver.yaml similarity index 86% rename from sbin/driver.yaml rename to conf/driver.yaml index 8946ca8403195..a1a18e10ae731 100644 --- a/sbin/driver.yaml +++ b/conf/driver.yaml @@ -5,4 +5,4 @@ metadata: status: image: "driver-image" state: "completed" - num-executors: 10 \ No newline at end of file + num-executors: 10 diff --git a/sbin/kubernetes-resource.yaml b/conf/kubernetes-resource.yaml similarity index 94% rename from sbin/kubernetes-resource.yaml rename to conf/kubernetes-resource.yaml index 0ff3d39b45182..ef4f8b25befdb 100644 --- a/sbin/kubernetes-resource.yaml +++ b/conf/kubernetes-resource.yaml @@ -7,4 +7,4 @@ apiVersion: extensions/v1beta1 kind: ThirdPartyResource description: "A resource that manages a spark job" versions: - - name: v1 \ No newline at end of file + - name: v1 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala index 44550cc5fcc48..809dee43901dd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala @@ -27,34 +27,6 @@ import org.apache.spark.deploy.kubernetes.tpr.TPRCrudCalls private[spark] class SparkJobResourceClientFromWithinK8s( client: KubernetesClient) extends TPRCrudCalls { - private val protocol: String = "https://" - - // we can also get the host from the environment variable - private val kubeHost: String = { - val host = Try(sys.env("KUBERNETES_SERVICE_HOST")) match { - case Success(h) => Option(h) - case Failure(_) => None - } - host.map(h => h).getOrElse { - // Log a warning just in case, but this should almost certainly never happen - logWarning("Error while retrieving k8s host address") - "127.0.0.1" - } - } - - // the port from the environment variable - private val kubeHostPort: String = { - val port = Try(sys.env("KUBERNETES_PORT_443_TCP_PORT")) match { - case Success(p) => Option(p) - case Failure(_) => None - } - port.map(p => p).getOrElse { - // Log a warning just in case, but this should almost certainly never happen - logWarning("Error while retrieving k8s host port") - "8001" - } - } - // Since this will be running inside a pod // we can access the pods token and use it with the Authorization header when // making rest calls to the k8s Api @@ -68,7 +40,6 @@ private[spark] class SparkJobResourceClientFromWithinK8s( } override protected val k8sClient: KubernetesClient = client - override protected val kubeMaster: String = s"$protocol$kubeHost:$kubeHostPort" } private[spark] class SparkJobResourceClientFromOutsideK8s( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 56c5e69ad1205..26de6858de22e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -94,7 +94,7 @@ package object constants { private[spark] val MEMORY_OVERHEAD_MIN = 384L // TPR - private[spark] val TPR_API_VERSION = "apache.io/v1" - private[spark] val TPR_API_ENDPOINT = s"apis/%s/namespaces/%s/sparkjobs" + 
private[spark] val TPR_API_GROUP = "apache.io" + private[spark] val TPR_API_VERSION = "v1" private[spark] val TPR_KIND = "SparkJob" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index dc8a6da45495e..58308283bdf2e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -21,11 +21,16 @@ import java.util.Collections import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, OwnerReferenceBuilder, PodBuilder} import scala.collection.JavaConverters._ +import scala.util.{Failure, Success, Try} import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.ConfigurationUtils import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.kubernetes.submit.{LoggingPodStatusWatcher, LoggingPodStatusWatcherImpl} +import org.apache.spark.deploy.kubernetes.SparkJobResourceClientFromOutsideK8s +import org.apache.spark.deploy.kubernetes.tpr.JobState +import org.apache.spark.deploy.rest.kubernetes.v2.ResourceStagingServerSslOptionsProviderImpl import org.apache.spark.deploy.rest.kubernetes.ResourceStagingServerSslOptionsProviderImpl import org.apache.spark.internal.Logging import org.apache.spark.launcher.SparkLauncher @@ -73,6 +78,28 @@ private[spark] class Client( private val driverJavaOptions = sparkConf.get( org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) + // create resource of kind - SparkJob representing the deployed spark app + private val sparkJobCtrller = new SparkJobResourceClientFromOutsideK8s( + kubernetesClientProvider.get) + private val jobResourceName = s"sparkJob-${sparkConf.get(KUBERNETES_NAMESPACE)}-$appName" + private val keyValuePairs = Map( + "image" -> driverDockerImage, + "state" -> JobState.QUEUED, + "numExecutors" -> sparkConf.getInt("spark.executor.instances", 1), + "sparkDriver" -> kubernetesDriverPodName + ) + + // Failure might be due to TPR inexistence or maybe we're stuck in the 10 minute lag + // TODO: in the latter case we can attempt a retry depending on the rc + // This also assumes that once we fail at creation, we won't bother trying + // anything on the resource for the lifetime of the app + Try(sparkJobCtrller.createJobObject(jobResourceName, keyValuePairs)) match { + case Success(_) => sparkConf.set("spark.kubernetes.jobResourceSet", "true") + sparkConf.set("spark.kubernetes.jobResourceName", jobResourceName) + case Failure(_: SparkException) => // if e.getMessage startsWith "40" => + sparkConf.set("spark.kubernetes.jobResourceSet", "false") + } + def run(): Unit = { validateNoDuplicateFileNames(sparkJars) validateNoDuplicateFileNames(sparkFiles) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobState.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobState.scala index 6f90194f673ab..e2d0187d3e504 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobState.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobState.scala @@ -30,4 +30,4 @@ private[spark] object JobState extends Enumeration { * KILLED - A user manually 
killed this Spark Job */ val QUEUED, SUBMITTED, RUNNING, FINISHED, FAILED, KILLED = Value -} \ No newline at end of file +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala index 16edd19de6e0c..2043088d1f3b8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala @@ -43,4 +43,4 @@ object JobStateSerDe extends CustomSerializer[JobState](_ => case JobState.QUEUED => JString("QUEUED") case JobState.RUNNING => JString("RUNNING") }) -) \ No newline at end of file +) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala index 7277412ef8e16..4c0240036b53f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.kubernetes.tpr import java.util.concurrent.{ThreadPoolExecutor, TimeUnit} import io.fabric8.kubernetes.client.{BaseClient, KubernetesClient} -import okhttp3.{MediaType, OkHttpClient, Request, RequestBody, Response} +import okhttp3.{HttpUrl, MediaType, OkHttpClient, Request, RequestBody, Response} import okio.{Buffer, BufferedSource} import org.json4s.{DefaultFormats, Formats} import org.json4s.JsonDSL._ @@ -46,7 +46,7 @@ private[spark] case class Metadata(name: String, private[spark] case class SparkJobState(apiVersion: String, kind: String, metadata: Metadata, - spec: Map[String, Any]) + status: Map[String, Any]) private[spark] case class WatchObject(`type`: String, `object`: SparkJobState) @@ -57,9 +57,8 @@ private[spark] abstract class TPRCrudCalls extends Logging { implicit val formats: Formats = DefaultFormats + JobStateSerDe - protected val httpClient: OkHttpClient = - buildOkhttpClientFromWithinPod(k8sClient.asInstanceOf[BaseClient]) + extractHttpClientFromK8sClient(k8sClient.asInstanceOf[BaseClient]) protected val namespace: String = k8sClient.getNamespace private var watchSource: BufferedSource = _ @@ -67,61 +66,6 @@ private[spark] abstract class TPRCrudCalls extends Logging { protected implicit val ec: ThreadPoolExecutor = ThreadUtils .newDaemonCachedThreadPool("tpr-watcher-pool") - private def executeBlocking(cb: => WatchObject): Future[WatchObject] = { - val p = Promise[WatchObject]() - ec.execute(new Runnable { - override def run(): Unit = { - try { - p.trySuccess(blocking(cb)) - } catch { - case e: Throwable => p.tryFailure(e) - } - } - }) - p.future - } - - private def completeRequest(partialReq: Request.Builder): Request = { - kubeToken match { - case Some(tok) => partialReq.addHeader("Authorization", s"Bearer $tok").build() - case None => partialReq.build() - } - } - - private def completeRequestWithExceptionIfNotSuccessful( - requestType: String, - response: Response, - additionalInfo: Option[Seq[String]] = None): Unit = { - - if (!response.isSuccessful) { - response.body().close() - val msg = new ArrayBuffer[String] - msg += s"Failed to $requestType resource." 
- - additionalInfo match { - case Some(info) => - for (extraMsg <- info) { - msg += extraMsg - } - case None => - } - - val finalMessage = msg.mkString(" ") - logError(finalMessage) - throw new SparkException(finalMessage) - } - } - - def buildOkhttpClientFromWithinPod(client: BaseClient): OkHttpClient = { - val field = classOf[BaseClient].getDeclaredField("httpClient") - try { - field.setAccessible(true) - field.get(client).asInstanceOf[OkHttpClient] - } finally { - field.setAccessible(false) - } - } - def createJobObject(name: String, keyValuePairs: Map[String, Any]): Unit = { val resourceObject = SparkJobState(TPR_API_VERSION, TPR_KIND, Metadata(name), keyValuePairs) @@ -129,11 +73,16 @@ private[spark] abstract class TPRCrudCalls extends Logging { val requestBody = RequestBody .create(MediaType.parse("application/json"), compact(render(payload))) + + val requestSegments = Seq( + "apis", TPR_API_GROUP, TPR_API_VERSION, "namespaces", namespace, "sparkjobs") + val url = generateHttpUrl(requestSegments) + val request = completeRequest(new Request.Builder() .post(requestBody) - .url(s"$kubeMaster/${TPR_API_ENDPOINT.format(TPR_API_VERSION, namespace)}")) + .url(url)) - logDebug(s"Create Request: $request") + logDebug(s"Create SparkJobResource Request: $request") val response = httpClient.newCall(request).execute() completeRequestWithExceptionIfNotSuccessful( "post", @@ -146,9 +95,13 @@ private[spark] abstract class TPRCrudCalls extends Logging { } def deleteJobObject(tprObjectName: String): Unit = { + val requestSegments = Seq( + "apis", TPR_API_GROUP, TPR_API_VERSION, "namespaces", namespace, "sparkjobs", tprObjectName) + val url = generateHttpUrl(requestSegments) + val request = completeRequest(new Request.Builder() .delete() - .url(s"$kubeMaster/${TPR_API_ENDPOINT.format(TPR_API_VERSION, namespace)}/$tprObjectName")) + .url(url)) logDebug(s"Delete Request: $request") val response = httpClient.newCall(request).execute() @@ -162,9 +115,13 @@ private[spark] abstract class TPRCrudCalls extends Logging { } def getJobObject(name: String): SparkJobState = { + val requestSegments = Seq( + "apis", TPR_API_GROUP, TPR_API_VERSION, "namespaces", namespace, "sparkjobs", name) + val url = generateHttpUrl(requestSegments) + val request = completeRequest(new Request.Builder() .get() - .url(s"$kubeMaster/${TPR_API_ENDPOINT.format(TPR_API_VERSION, namespace)}/$name")) + .url(url)) logDebug(s"Get Request: $request") val response = httpClient.newCall(request).execute() @@ -185,9 +142,14 @@ private[spark] abstract class TPRCrudCalls extends Logging { RequestBody.create( MediaType.parse("application/json-patch+json"), compact(render(payload))) + + val requestSegments = Seq( + "apis", TPR_API_GROUP, TPR_API_VERSION, "namespaces", namespace, "sparkjobs", name) + val url = generateHttpUrl(requestSegments) + val request = completeRequest(new Request.Builder() .patch(requestBody) - .url(s"$kubeMaster/${TPR_API_ENDPOINT.format(TPR_API_VERSION, namespace)}/$name")) + .url(url)) logDebug(s"Update Request: $request") val response = httpClient.newCall(request).execute() @@ -201,16 +163,20 @@ private[spark] abstract class TPRCrudCalls extends Logging { logDebug(s"Successfully patched resource $name.") } - /** - * This method has an helper method that blocks to watch the object. - * The future is completed on a Delete event or source exhaustion. - * This method relies on the assumption of one sparkjob per namespace - */ +/** + * This method has an helper method that blocks to watch the object. 
+ * The future is completed on a Delete event or source exhaustion. + * This method relies on the assumption of one sparkjob per namespace + */ def watchJobObject(): Future[WatchObject] = { val watchClient = httpClient.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build() + val requestSegments = Seq( + "apis", TPR_API_GROUP, TPR_API_VERSION, "namespaces", namespace, "sparkjobs") + val url = generateHttpUrl(requestSegments, Seq(("watch", "true"))) + val request = completeRequest(new Request.Builder() .get() - .url(s"$kubeMaster/${TPR_API_ENDPOINT.format(TPR_API_VERSION, namespace)}?watch=true")) + .url(url)) logDebug(s"Watch Request: $request") val resp = watchClient.newCall(request).execute() @@ -223,10 +189,10 @@ private[spark] abstract class TPRCrudCalls extends Logging { watchJobObjectUtil(resp) } - /** - * This method has a blocking call - wait on SSE - inside it. - * However it is sent off in a new thread - */ +/** + * This method has a blocking call - wait on SSE - inside it. + * However it is sent off in a new thread + */ private def watchJobObjectUtil(response: Response): Future[WatchObject] = { @volatile var wo: WatchObject = null watchSource = response.body().source() @@ -268,4 +234,72 @@ private[spark] abstract class TPRCrudCalls extends Logging { } } + private def completeRequest(partialReq: Request.Builder): Request = { + kubeToken match { + case Some(tok) => partialReq.addHeader("Authorization", s"Bearer $tok").build() + case None => partialReq.build() + } + } + + private def generateHttpUrl(urlSegments: Seq[String], + querySegments: Seq[(String, String)] = Seq.empty[(String, String)]): HttpUrl = { + + val urlBuilder = HttpUrl.parse(kubeMaster).newBuilder + urlSegments map { pathSegment => + urlBuilder.addPathSegment(pathSegment) + } + querySegments map { + case (query, value) => urlBuilder.addQueryParameter(query, value) + } + urlBuilder.build() + } + + private def completeRequestWithExceptionIfNotSuccessful( + requestType: String, + response: Response, + additionalInfo: Option[Seq[String]] = None): Unit = { + + if (!response.isSuccessful) { + response.body().close() + val msg = new ArrayBuffer[String] + msg += s"Failed to $requestType resource." 
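+      // The response body was already closed above, so the failed call does
+      // not leak the pooled connection while the error message is assembled.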
+ + additionalInfo match { + case Some(info) => + for (extraMsg <- info) { + msg += extraMsg + } + case None => + } + + val finalMessage = msg.mkString(" ") + logError(finalMessage) + throw new SparkException(finalMessage) + } + } + + def extractHttpClientFromK8sClient(client: BaseClient): OkHttpClient = { + val field = classOf[BaseClient].getDeclaredField("httpClient") + try { + field.setAccessible(true) + field.get(client).asInstanceOf[OkHttpClient] + } finally { + field.setAccessible(false) + } + } + + private def executeBlocking(cb: => WatchObject): Future[WatchObject] = { + val p = Promise[WatchObject]() + ec.execute(new Runnable { + override def run(): Unit = { + try { + p.trySuccess(blocking(cb)) + } catch { + case e: Throwable => p.tryFailure(e) + } + } + }) + p.future + } + } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 257cee80fdea9..1809276b0e0df 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -20,18 +20,21 @@ import java.io.Closeable import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} -import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder} -import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} -import io.fabric8.kubernetes.client.Watcher.Action -import org.apache.commons.io.FilenameUtils import scala.collection.JavaConverters._ import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success, Try} + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import org.apache.commons.io.FilenameUtils import org.apache.spark.{SparkContext, SparkEnv, SparkException} -import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, SparkPodInitContainerBootstrap} +import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, SparkJobResourceClientFromWithinK8s, SparkPodInitContainerBootstrap} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.kubernetes.tpr.{JobState, WatchObject} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient import org.apache.spark.rpc.{RpcCallContext, RpcEndpointAddress, RpcEnv} @@ -201,6 +204,15 @@ private[spark] class KubernetesClusterSchedulerBackend( sc.env.securityManager.isAuthenticationEnabled(), sc.env.securityManager.isSaslEncryptionEnabled()) } + private var workingWithSparkJobResource = + conf.get("spark.kubernetes.jobResourceSet", "false").toBoolean + + private val jobResourceName = conf.get("spark.kubernetes.jobResourceName", "") + + private val resourceWatcherPool = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonFixedThreadPool(2, "resource-watcher-pool")) + + private val sparkJobResourceCtrller = new SparkJobResourceClientFromWithinK8s(kubernetesClient) 
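+  // sparkJobResourceCtrller runs inside the driver pod and therefore
+  // authenticates with the pod's mounted service-account token.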
private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = { if (Utils.isDynamicAllocationEnabled(conf)) { @@ -218,6 +230,55 @@ private[spark] class KubernetesClusterSchedulerBackend( } + private def updateAndStartWatchOnResource(): Unit = { + if (workingWithSparkJobResource) { + logInfo(s"Updating Job Resource with name. $jobResourceName") + Try(sparkJobResourceCtrller + .updateJobObject(jobResourceName, JobState.SUBMITTED.toString, "/spec/state")) match { + case Success(_) => startWatcher()(resourceWatcherPool) + case Failure(e: SparkException) if e.getMessage startsWith "404" => + logWarning(s"Possible deletion of jobResource before backend start") + workingWithSparkJobResource = false + case Failure(e: SparkException) => + logWarning(s"SparkJob object not updated. ${e.getMessage}") + // SparkJob should continue if this fails as discussed + // Maybe some retry + backoff mechanism ? + } + } + } + + private def startWatcher()(implicit ec: ExecutionContext): Unit = { + ec.execute(new Runnable { + override def run(): Unit = { + sparkJobResourceCtrller.watchJobObject() onComplete { + case Success(w: WatchObject) if w.`type` == "DELETED" => + logInfo("TPR Object deleted externally. Cleaning up") + stop() + // TODO: are there other todo's for a clean kill while job is running? + case Success(w: WatchObject) => + // Log a warning just in case, but this should almost certainly never happen + logWarning(s"Unexpected response received. $w") + deleteJobResource() + workingWithSparkJobResource = false + case Failure(e: Throwable) => + logWarning(e.getMessage) + deleteJobResource() + workingWithSparkJobResource = false // in case watcher fails early on + } + } + }) + } + + private def deleteJobResource(): Unit = { + try { + sparkJobResourceCtrller.deleteJobObject(jobResourceName) + } catch { + case e: SparkException => + logError(s"SparkJob object not deleted. ${e.getMessage}") + // what else do we need to do here ? 
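+        // Deliberately swallow the failure: losing the status resource
+        // should not bring down the running job itself.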
+ } + } + override def applicationId(): String = conf.get("spark.app.id", super.applicationId()) override def sufficientResourcesRegistered(): Boolean = { @@ -248,6 +309,7 @@ private[spark] class KubernetesClusterSchedulerBackend( allocator.shutdown() shufflePodCache.foreach(_.stop()) kubernetesExternalShuffleClient.foreach(_.close()) + resourceWatcherPool.shutdown() // send stop message to executors so they shut down cleanly super.stop() From e21f3b56ef7b6ad45959fe8dfd8763154526320d Mon Sep 17 00:00:00 2001 From: iyanuobidele Date: Tue, 23 May 2017 15:23:33 -0700 Subject: [PATCH 3/5] addressing comments --- conf/driver.yaml | 2 +- conf/kubernetes-resource.yaml | 2 +- .../SparkJobResourceFromWithinK8s.scala | 49 ------------------- .../spark/deploy/kubernetes/constants.scala | 2 +- .../deploy/kubernetes/submit/Client.scala | 12 ++--- .../deploy/kubernetes/tpr/JobStateSerDe.scala | 8 +-- .../deploy/kubernetes/tpr/TPRCrudCalls.scala | 40 ++++++++------- .../KubernetesClusterSchedulerBackend.scala | 6 +-- 8 files changed, 36 insertions(+), 85 deletions(-) delete mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala diff --git a/conf/driver.yaml b/conf/driver.yaml index a1a18e10ae731..297945a127471 100644 --- a/conf/driver.yaml +++ b/conf/driver.yaml @@ -1,4 +1,4 @@ -apiVersion: "apache.io/v1" +apiVersion: "apache.org/v1" kind: "SparkJob" metadata: name: "spark-job-1" diff --git a/conf/kubernetes-resource.yaml b/conf/kubernetes-resource.yaml index ef4f8b25befdb..4e606dabfcdbe 100644 --- a/conf/kubernetes-resource.yaml +++ b/conf/kubernetes-resource.yaml @@ -1,5 +1,5 @@ metadata: - name: spark-job.apache.io + name: spark-job.apache.org labels: resource: spark-job object: spark diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala deleted file mode 100644 index 809dee43901dd..0000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.deploy.kubernetes - -import java.nio.file.{Files, Paths} - -import io.fabric8.kubernetes.client.{Config, KubernetesClient} -import scala.util.{Failure, Success, Try} - -import org.apache.spark.deploy.kubernetes.tpr.TPRCrudCalls - -private[spark] class SparkJobResourceClientFromWithinK8s( - client: KubernetesClient) extends TPRCrudCalls { - - // Since this will be running inside a pod - // we can access the pods token and use it with the Authorization header when - // making rest calls to the k8s Api - override protected val kubeToken: Option[String] = { - val path = Paths.get(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) - Try(new String(Files.readAllBytes(path))) match { - case Success(some) => Option(some) - case Failure(e: Throwable) => logError(s"${e.getMessage}") - None - } - } - - override protected val k8sClient: KubernetesClient = client -} - -private[spark] class SparkJobResourceClientFromOutsideK8s( - client: KubernetesClient) extends TPRCrudCalls { - - override protected val k8sClient: KubernetesClient = client -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 26de6858de22e..dc07430488148 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -94,7 +94,7 @@ package object constants { private[spark] val MEMORY_OVERHEAD_MIN = 384L // TPR - private[spark] val TPR_API_GROUP = "apache.io" + private[spark] val TPR_API_GROUP = "apache.org" private[spark] val TPR_API_VERSION = "v1" private[spark] val TPR_KIND = "SparkJob" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index 58308283bdf2e..7dbecf615f080 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -27,10 +27,7 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.ConfigurationUtils import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.submit.{LoggingPodStatusWatcher, LoggingPodStatusWatcherImpl} -import org.apache.spark.deploy.kubernetes.SparkJobResourceClientFromOutsideK8s -import org.apache.spark.deploy.kubernetes.tpr.JobState -import org.apache.spark.deploy.rest.kubernetes.v2.ResourceStagingServerSslOptionsProviderImpl +import org.apache.spark.deploy.kubernetes.tpr.{JobState, TPRCrudCalls} import org.apache.spark.deploy.rest.kubernetes.ResourceStagingServerSslOptionsProviderImpl import org.apache.spark.internal.Logging import org.apache.spark.launcher.SparkLauncher @@ -79,10 +76,9 @@ private[spark] class Client( org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) // create resource of kind - SparkJob representing the deployed spark app - private val sparkJobCtrller = new SparkJobResourceClientFromOutsideK8s( - kubernetesClientProvider.get) + private val sparkJobCtrller = new TPRCrudCalls(kubernetesClientProvider.get) private val jobResourceName = 
s"sparkJob-${sparkConf.get(KUBERNETES_NAMESPACE)}-$appName" - private val keyValuePairs = Map( + private val statusMap = Map( "image" -> driverDockerImage, "state" -> JobState.QUEUED, "numExecutors" -> sparkConf.getInt("spark.executor.instances", 1), @@ -93,7 +89,7 @@ private[spark] class Client( // TODO: in the latter case we can attempt a retry depending on the rc // This also assumes that once we fail at creation, we won't bother trying // anything on the resource for the lifetime of the app - Try(sparkJobCtrller.createJobObject(jobResourceName, keyValuePairs)) match { + Try(sparkJobCtrller.createJobObject(jobResourceName, statusMap)) match { case Success(_) => sparkConf.set("spark.kubernetes.jobResourceSet", "true") sparkConf.set("spark.kubernetes.jobResourceName", jobResourceName) case Failure(_: SparkException) => // if e.getMessage startsWith "40" => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala index 2043088d1f3b8..41ecd9f82cbba 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala @@ -22,10 +22,10 @@ import org.json4s.JsonAST.JNull import org.apache.spark.deploy.kubernetes.tpr.JobState.JobState - /** - * JobState Serializer and Deserializer - * */ -object JobStateSerDe extends CustomSerializer[JobState](_ => +/** + * JobState Serializer and Deserializer + */ +private[spark] object JobStateSerDe extends CustomSerializer[JobState](_ => ({ case JString("SUBMITTED") => JobState.SUBMITTED case JString("QUEUED") => JobState.QUEUED diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala index 4c0240036b53f..9343c11991fbf 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala @@ -34,36 +34,40 @@ import org.apache.spark.internal.Logging import org.apache.spark.SparkException import org.apache.spark.util.ThreadUtils -/** - * Isolated this since the method is called on the client machine - */ - private[spark] case class Metadata(name: String, - uid: Option[String] = None, - labels: Option[Map[String, String]] = None, - annotations: Option[Map[String, String]] = None) + uid: Option[String] = None, + labels: Option[Map[String, String]] = None, + annotations: Option[Map[String, String]] = None) private[spark] case class SparkJobState(apiVersion: String, - kind: String, - metadata: Metadata, - status: Map[String, Any]) + kind: String, + metadata: Metadata, + status: Map[String, Any]) private[spark] case class WatchObject(`type`: String, `object`: SparkJobState) -private[spark] abstract class TPRCrudCalls extends Logging { - protected val k8sClient: KubernetesClient - protected val kubeMaster: String = KUBERNETES_MASTER_INTERNAL_URL - protected val kubeToken: Option[String] = None +/** + * CRUD + Watch Operations on SparkJob Resource + * + * This class contains all CRUD+Watch implementations performed + * on the SparkJob Resource used to expose the state of a spark job + * in kubernetes 
(visible via kubectl or through the k8s dashboard). + * + * + */ +private[spark] class TPRCrudCalls(k8sClient: KubernetesClient, + kubeToken: Option[String] = None) extends Logging { + private val kubeMaster: String = KUBERNETES_MASTER_INTERNAL_URL implicit val formats: Formats = DefaultFormats + JobStateSerDe - protected val httpClient: OkHttpClient = + private val httpClient: OkHttpClient = extractHttpClientFromK8sClient(k8sClient.asInstanceOf[BaseClient]) - protected val namespace: String = k8sClient.getNamespace + private val namespace: String = k8sClient.getNamespace private var watchSource: BufferedSource = _ private lazy val buffer = new Buffer() - protected implicit val ec: ThreadPoolExecutor = ThreadUtils + private implicit val ec: ThreadPoolExecutor = ThreadUtils .newDaemonCachedThreadPool("tpr-watcher-pool") def createJobObject(name: String, keyValuePairs: Map[String, Any]): Unit = { @@ -166,7 +170,7 @@ private[spark] abstract class TPRCrudCalls extends Logging { /** * This method has an helper method that blocks to watch the object. * The future is completed on a Delete event or source exhaustion. - * This method relies on the assumption of one sparkjob per namespace + * This method also relies on the assumption of one sparkjob per namespace */ def watchJobObject(): Future[WatchObject] = { val watchClient = httpClient.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 1809276b0e0df..edbba35469192 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -31,10 +31,10 @@ import io.fabric8.kubernetes.client.Watcher.Action import org.apache.commons.io.FilenameUtils import org.apache.spark.{SparkContext, SparkEnv, SparkException} -import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, SparkJobResourceClientFromWithinK8s, SparkPodInitContainerBootstrap} +import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, SparkPodInitContainerBootstrap} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.tpr.{JobState, WatchObject} +import org.apache.spark.deploy.kubernetes.tpr.{JobState, TPRCrudCalls, WatchObject} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient import org.apache.spark.rpc.{RpcCallContext, RpcEndpointAddress, RpcEnv} @@ -212,7 +212,7 @@ private[spark] class KubernetesClusterSchedulerBackend( private val resourceWatcherPool = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonFixedThreadPool(2, "resource-watcher-pool")) - private val sparkJobResourceCtrller = new SparkJobResourceClientFromWithinK8s(kubernetesClient) + private val sparkJobResourceCtrller = new TPRCrudCalls(kubernetesClient) private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = { if (Utils.isDynamicAllocationEnabled(conf)) { From 706de99fb703bb61f2c094a396a0a4415630c2d4 Mon Sep 17 00:00:00 2001 From: Anirudh Date: Thu, 25 May 2017 04:19:22 -0700 Subject: 
[PATCH 4/5] Added basic support for various status functions --- conf/driver.yaml | 8 -- conf/kubernetes-custom-resource.yaml | 27 ++++++ conf/kubernetes-resource.yaml | 10 --- .../spark/deploy/kubernetes/constants.scala | 14 ++++ .../deploy/kubernetes/submit/Client.scala | 22 +++-- .../deploy/kubernetes/tpr/TPRCrudCalls.scala | 4 +- .../KubernetesClusterSchedulerBackend.scala | 83 +++++++------------ 7 files changed, 87 insertions(+), 81 deletions(-) delete mode 100644 conf/driver.yaml create mode 100644 conf/kubernetes-custom-resource.yaml delete mode 100644 conf/kubernetes-resource.yaml diff --git a/conf/driver.yaml b/conf/driver.yaml deleted file mode 100644 index 297945a127471..0000000000000 --- a/conf/driver.yaml +++ /dev/null @@ -1,8 +0,0 @@ -apiVersion: "apache.org/v1" -kind: "SparkJob" -metadata: - name: "spark-job-1" -status: - image: "driver-image" - state: "completed" - num-executors: 10 diff --git a/conf/kubernetes-custom-resource.yaml b/conf/kubernetes-custom-resource.yaml new file mode 100644 index 0000000000000..317d77c500381 --- /dev/null +++ b/conf/kubernetes-custom-resource.yaml @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +metadata: + name: spark-job.apache.org + labels: + resource: spark-job + object: spark +apiVersion: extensions/v1beta1 +kind: ThirdPartyResource +description: "A resource that reports status of a spark job" +versions: + - name: v1 diff --git a/conf/kubernetes-resource.yaml b/conf/kubernetes-resource.yaml deleted file mode 100644 index 4e606dabfcdbe..0000000000000 --- a/conf/kubernetes-resource.yaml +++ /dev/null @@ -1,10 +0,0 @@ -metadata: - name: spark-job.apache.org - labels: - resource: spark-job - object: spark -apiVersion: extensions/v1beta1 -kind: ThirdPartyResource -description: "A resource that manages a spark job" -versions: - - name: v1 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index dc07430488148..a0a231f8b3b2b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -97,4 +97,18 @@ package object constants { private[spark] val TPR_API_GROUP = "apache.org" private[spark] val TPR_API_VERSION = "v1" private[spark] val TPR_KIND = "SparkJob" + + // SparkJob Status + private[spark] val STATUS_CREATION_TIMESTAMP = "creationTimeStamp" + private[spark] val STATUS_COMPLETION_TIMESTAMP = "completionTimeStamp" + private[spark] val STATUS_DRIVER = "sparkDriver" + private[spark] val STATUS_DRIVER_IMAGE = "driverImage" + private[spark] val STATUS_EXECUTOR_IMAGE = "executorImage" + private[spark] val STATUS_JOB_STATE = "jobState" + private[spark] val STATUS_DESIRED_EXECUTORS = "desiredExecutors" + private[spark] val STATUS_CURRENT_EXECUTORS = "currentExecutors" + private[spark] val STATUS_DRIVER_UI = "driverUi" + private[spark] val STATUS_NOT_AVAILABLE = "N/A" + private[spark] val STATUS_PENDING = "Pending" + } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index 7dbecf615f080..d41cf56fba02d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.kubernetes.submit import java.io.File -import java.util.Collections +import java.util.{Calendar, Collections} import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, OwnerReferenceBuilder, PodBuilder} import scala.collection.JavaConverters._ @@ -76,22 +76,26 @@ private[spark] class Client( org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) // create resource of kind - SparkJob representing the deployed spark app - private val sparkJobCtrller = new TPRCrudCalls(kubernetesClientProvider.get) - private val jobResourceName = s"sparkJob-${sparkConf.get(KUBERNETES_NAMESPACE)}-$appName" + private val sparkJobController = new TPRCrudCalls(kubernetesClientProvider.get) private val statusMap = Map( - "image" -> driverDockerImage, - "state" -> JobState.QUEUED, - "numExecutors" -> sparkConf.getInt("spark.executor.instances", 1), - "sparkDriver" -> kubernetesDriverPodName + STATUS_CREATION_TIMESTAMP -> Calendar.getInstance().getTime().toString(), + STATUS_COMPLETION_TIMESTAMP -> STATUS_NOT_AVAILABLE, + STATUS_DRIVER 
-> kubernetesDriverPodName, + STATUS_DRIVER_IMAGE -> driverDockerImage, + STATUS_EXECUTOR_IMAGE -> sparkConf.get(EXECUTOR_DOCKER_IMAGE), + STATUS_JOB_STATE -> JobState.QUEUED, + STATUS_DESIRED_EXECUTORS -> sparkConf.getInt("spark.executor.instances", 1), + STATUS_CURRENT_EXECUTORS -> 0, + STATUS_DRIVER_UI -> STATUS_PENDING ) // Failure might be due to TPR inexistence or maybe we're stuck in the 10 minute lag // TODO: in the latter case we can attempt a retry depending on the rc // This also assumes that once we fail at creation, we won't bother trying // anything on the resource for the lifetime of the app - Try(sparkJobCtrller.createJobObject(jobResourceName, statusMap)) match { + Try(sparkJobController.createJobObject(kubernetesAppId, statusMap)) match { case Success(_) => sparkConf.set("spark.kubernetes.jobResourceSet", "true") - sparkConf.set("spark.kubernetes.jobResourceName", jobResourceName) + sparkConf.set("spark.kubernetes.jobResourceName", kubernetesAppId) case Failure(_: SparkException) => // if e.getMessage startsWith "40" => sparkConf.set("spark.kubernetes.jobResourceSet", "false") } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala index 9343c11991fbf..42ca57b163f58 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala @@ -57,7 +57,7 @@ private[spark] case class WatchObject(`type`: String, `object`: SparkJobState) */ private[spark] class TPRCrudCalls(k8sClient: KubernetesClient, kubeToken: Option[String] = None) extends Logging { - private val kubeMaster: String = KUBERNETES_MASTER_INTERNAL_URL + private val kubeMaster: String = k8sClient.getMasterUrl().toString implicit val formats: Formats = DefaultFormats + JobStateSerDe @@ -72,7 +72,7 @@ private[spark] class TPRCrudCalls(k8sClient: KubernetesClient, def createJobObject(name: String, keyValuePairs: Map[String, Any]): Unit = { val resourceObject = - SparkJobState(TPR_API_VERSION, TPR_KIND, Metadata(name), keyValuePairs) + SparkJobState(s"$TPR_API_GROUP/$TPR_API_VERSION", TPR_KIND, Metadata(name), keyValuePairs) val payload = parse(write(resourceObject)) val requestBody = RequestBody diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index edbba35469192..fbc940d2c196d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -17,11 +17,13 @@ package org.apache.spark.scheduler.cluster.kubernetes import java.io.Closeable +import java.util.Calendar import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success, Try} @@ -41,6 +43,7 @@ import 
org.apache.spark.rpc.{RpcCallContext, RpcEndpointAddress, RpcEnv} import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.ui.SparkUI import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class KubernetesClusterSchedulerBackend( @@ -194,6 +197,7 @@ private[spark] class KubernetesClusterSchedulerBackend( } } } + updateStatus(STATUS_CURRENT_EXECUTORS, totalRegisteredExecutors.get()) } } @@ -208,11 +212,12 @@ private[spark] class KubernetesClusterSchedulerBackend( conf.get("spark.kubernetes.jobResourceSet", "false").toBoolean private val jobResourceName = conf.get("spark.kubernetes.jobResourceName", "") - - private val resourceWatcherPool = ExecutionContext.fromExecutorService( - ThreadUtils.newDaemonFixedThreadPool(2, "resource-watcher-pool")) - - private val sparkJobResourceCtrller = new TPRCrudCalls(kubernetesClient) + private val sparkJobResourceController = + if (conf.get("spark.kubernetes.jobResourceSet", "false").toBoolean) { + Some(new TPRCrudCalls(kubernetesClient)) + } else { + None + } private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = { if (Utils.isDynamicAllocationEnabled(conf)) { @@ -230,55 +235,20 @@ private[spark] class KubernetesClusterSchedulerBackend( } - private def updateAndStartWatchOnResource(): Unit = { - if (workingWithSparkJobResource) { - logInfo(s"Updating Job Resource with name. $jobResourceName") - Try(sparkJobResourceCtrller - .updateJobObject(jobResourceName, JobState.SUBMITTED.toString, "/spec/state")) match { - case Success(_) => startWatcher()(resourceWatcherPool) - case Failure(e: SparkException) if e.getMessage startsWith "404" => - logWarning(s"Possible deletion of jobResource before backend start") - workingWithSparkJobResource = false - case Failure(e: SparkException) => - logWarning(s"SparkJob object not updated. ${e.getMessage}") - // SparkJob should continue if this fails as discussed - // Maybe some retry + backoff mechanism ? - } - } - } - - private def startWatcher()(implicit ec: ExecutionContext): Unit = { - ec.execute(new Runnable { - override def run(): Unit = { - sparkJobResourceCtrller.watchJobObject() onComplete { - case Success(w: WatchObject) if w.`type` == "DELETED" => - logInfo("TPR Object deleted externally. Cleaning up") - stop() - // TODO: are there other todo's for a clean kill while job is running? - case Success(w: WatchObject) => - // Log a warning just in case, but this should almost certainly never happen - logWarning(s"Unexpected response received. $w") - deleteJobResource() - workingWithSparkJobResource = false - case Failure(e: Throwable) => - logWarning(e.getMessage) - deleteJobResource() - workingWithSparkJobResource = false // in case watcher fails early on - } - } + private def updateStatus(key: String, value: Any): Unit = { + logInfo(s"Updating Job Resource with name. $jobResourceName") + sparkJobResourceController.foreach(controller => + Try( + controller.updateJobObject(jobResourceName, value.toString, s"/status/$key")) match { + case Success(_) => + logInfo(s"Updated Job Resource with name. $jobResourceName") + case Failure(e: SparkException) if e.getMessage startsWith "404" => + logWarning(s"Possible deletion of jobResource before backend start") + case Failure(e: Exception) => + logWarning(s"SparkJob object not updated. 
${e.getMessage}") }) } - private def deleteJobResource(): Unit = { - try { - sparkJobResourceCtrller.deleteJobObject(jobResourceName) - } catch { - case e: SparkException => - logError(s"SparkJob object not deleted. ${e.getMessage}") - // what else do we need to do here ? - } - } - override def applicationId(): String = conf.get("spark.app.id", super.applicationId()) override def sufficientResourcesRegistered(): Boolean = { @@ -287,6 +257,11 @@ private[spark] class KubernetesClusterSchedulerBackend( override def start(): Unit = { super.start() + + updateStatus(STATUS_JOB_STATE, JobState.RUNNING) + updateStatus(STATUS_DRIVER_UI, "http://localhost:8001/api/v1/namespaces" + + s"/default/pods/$kubernetesDriverPodName:${SparkUI.getUIPort(conf)}/proxy/") + executorWatchResource.set(kubernetesClient.pods().withLabel(SPARK_APP_ID_LABEL, applicationId()) .watch(new ExecutorPodsWatcher())) @@ -309,10 +284,13 @@ private[spark] class KubernetesClusterSchedulerBackend( allocator.shutdown() shufflePodCache.foreach(_.stop()) kubernetesExternalShuffleClient.foreach(_.close()) - resourceWatcherPool.shutdown() // send stop message to executors so they shut down cleanly super.stop() + updateStatus(STATUS_CURRENT_EXECUTORS, STATUS_NOT_AVAILABLE) + updateStatus(STATUS_DRIVER_UI, STATUS_NOT_AVAILABLE) + updateStatus(STATUS_COMPLETION_TIMESTAMP, Calendar.getInstance().getTime().toString()) + updateStatus(STATUS_JOB_STATE, JobState.FINISHED) // then delete the executor pods // TODO investigate why Utils.tryLogNonFatalError() doesn't work in this context. @@ -472,6 +450,7 @@ private[spark] class KubernetesClusterSchedulerBackend( } override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] { + updateStatus(STATUS_DESIRED_EXECUTORS, requestedTotal) totalExpectedExecutors.set(requestedTotal) true } From f762f85808366324cc754fbe16784ba5ce6b3bbc Mon Sep 17 00:00:00 2001 From: iyanuobidele Date: Fri, 26 May 2017 15:30:09 -0700 Subject: [PATCH 5/5] address more comments --- docs/running-on-kubernetes.md | 39 +++ .../spark/deploy/kubernetes/config.scala | 15 + .../spark/deploy/kubernetes/constants.scala | 9 - .../deploy/kubernetes/submit/Client.scala | 45 +-- ...scala => sparkJobResourceController.scala} | 275 +++++++++--------- .../KubernetesClusterSchedulerBackend.scala | 93 +++--- .../kubernetes/submit/ClientV2Suite.scala | 18 +- 7 files changed, 290 insertions(+), 204 deletions(-) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/{TPRCrudCalls.scala => sparkJobResourceController.scala} (53%) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 488efbe5eef36..f8690c734447c 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -210,6 +210,45 @@ the command may then look like the following: --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service,spark-version=2.1.0" \ local:///opt/spark/examples/jars/spark_examples_2.11-2.2.0.jar 10 400000 2 +## ThirdPartyResources for visibility into state of deployed Spark job + +In order to expose the state of a deployed spark job to a kubernetes administrator or user, via the kubectl or the +kubernetes dashboard, we have added a kubernetes Resource (of kind: SparkJob) storing pertinent information +related to a specific spark job. 
+
+Using this, we can view the current and all past (not yet cleaned up) Spark apps deployed within the
+current namespace using `kubectl` like so:
+
+    kubectl get sparkjobs
+
+Or via the kubernetes dashboard, using the link provided by:
+
+    kubectl cluster-info
+
+
+### Prerequisites
+
+Note that this resource depends on extending the kubernetes API using a
+[ThirdPartyResource (TPR)](https://kubernetes.io/docs/tasks/access-kubernetes-api/extend-api-third-party-resource/).
+
+TPRs are available in the kubernetes API as of v1.5.
+
+See conf/kubernetes-custom-resource.yaml for the recommended yaml file. From the spark base directory,
+we can create the recommended TPR like so:
+
+    kubectl create -f conf/kubernetes-custom-resource.yaml
+
+### Important things to note
+
+TPRs are an alpha feature that might not be available in every cluster.
+TPRs need to be cleaned up manually because garbage collection support does not exist for them yet.
+
+### Future work
+
+In the future, kubernetes administrators or users will be able to stop a running Spark app in their
+cluster by simply deleting the attached resource.
+
+
 ## Advanced
 
 ### Securing the Resource Staging Server with TLS
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index d1341b15afaca..6f3962b21af89 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -271,6 +271,21 @@ package object config extends Logging {
     .longConf
     .createWithDefault(1)
 
+  private[spark] val KUBERNETES_JOB_RESOURCE_NAME =
+    ConfigBuilder("spark.kubernetes.statusReporting.resourceName")
+      .doc("Name of the SparkJob resource attached to the Spark job.")
") + .internal() + .stringConf + .createOptional + + private[spark] val KUBERNETES_JOB_RESOURCE_ENABLED = + ConfigBuilder("spark.kubernetes.statusReporting.enabled") + .doc("This is set to true when creation of SparkJob resource is successful" + + " which directly means we need to keep the resource updated") + .internal() + .booleanConf + .createWithDefault(false) + private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.kubernetes.submission.waitAppCompletion") .doc("In cluster mode, whether to wait for the application to finish before exiting the" + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index a0a231f8b3b2b..27e532415424a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -99,15 +99,6 @@ package object constants { private[spark] val TPR_KIND = "SparkJob" // SparkJob Status - private[spark] val STATUS_CREATION_TIMESTAMP = "creationTimeStamp" - private[spark] val STATUS_COMPLETION_TIMESTAMP = "completionTimeStamp" - private[spark] val STATUS_DRIVER = "sparkDriver" - private[spark] val STATUS_DRIVER_IMAGE = "driverImage" - private[spark] val STATUS_EXECUTOR_IMAGE = "executorImage" - private[spark] val STATUS_JOB_STATE = "jobState" - private[spark] val STATUS_DESIRED_EXECUTORS = "desiredExecutors" - private[spark] val STATUS_CURRENT_EXECUTORS = "currentExecutors" - private[spark] val STATUS_DRIVER_UI = "driverUi" private[spark] val STATUS_NOT_AVAILABLE = "N/A" private[spark] val STATUS_PENDING = "Pending" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index d41cf56fba02d..f2e9cdf765caf 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -17,7 +17,8 @@ package org.apache.spark.deploy.kubernetes.submit import java.io.File -import java.util.{Calendar, Collections} +import java.text.SimpleDateFormat +import java.util.{Collections, Date} import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, OwnerReferenceBuilder, PodBuilder} import scala.collection.JavaConverters._ @@ -27,7 +28,7 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.ConfigurationUtils import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.tpr.{JobState, TPRCrudCalls} +import org.apache.spark.deploy.kubernetes.tpr.{sparkJobResourceController, sparkJobResourceControllerImpl, JobState, Status} import org.apache.spark.deploy.rest.kubernetes.ResourceStagingServerSslOptionsProviderImpl import org.apache.spark.internal.Logging import org.apache.spark.launcher.SparkLauncher @@ -55,7 +56,8 @@ private[spark] class Client( kubernetesClientProvider: SubmissionKubernetesClientProvider, initContainerComponentsProvider: DriverInitContainerComponentsProvider, kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider, - loggingPodStatusWatcher: LoggingPodStatusWatcher) + 
loggingPodStatusWatcher: LoggingPodStatusWatcher, + sparkJobResourceController: sparkJobResourceController) extends Logging { private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME) @@ -74,30 +76,30 @@ private[spark] class Client( org.apache.spark.internal.config.DRIVER_CLASS_PATH) private val driverJavaOptions = sparkConf.get( org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) + private val resourceTimeFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.S'Z'") // create resource of kind - SparkJob representing the deployed spark app - private val sparkJobController = new TPRCrudCalls(kubernetesClientProvider.get) - private val statusMap = Map( - STATUS_CREATION_TIMESTAMP -> Calendar.getInstance().getTime().toString(), - STATUS_COMPLETION_TIMESTAMP -> STATUS_NOT_AVAILABLE, - STATUS_DRIVER -> kubernetesDriverPodName, - STATUS_DRIVER_IMAGE -> driverDockerImage, - STATUS_EXECUTOR_IMAGE -> sparkConf.get(EXECUTOR_DOCKER_IMAGE), - STATUS_JOB_STATE -> JobState.QUEUED, - STATUS_DESIRED_EXECUTORS -> sparkConf.getInt("spark.executor.instances", 1), - STATUS_CURRENT_EXECUTORS -> 0, - STATUS_DRIVER_UI -> STATUS_PENDING + private val status = Status( + creationTimeStamp = resourceTimeFormat.format(new Date()), + completionTimeStamp = STATUS_NOT_AVAILABLE, + sparkDriver = kubernetesDriverPodName, + driverImage = driverDockerImage, + executorImage = sparkConf.get(EXECUTOR_DOCKER_IMAGE), + jobState = JobState.QUEUED, + desiredExecutors = sparkConf.getInt("spark.executor.instances", 1), + currentExecutors = 0, + driverUi = STATUS_PENDING ) // Failure might be due to TPR inexistence or maybe we're stuck in the 10 minute lag // TODO: in the latter case we can attempt a retry depending on the rc // This also assumes that once we fail at creation, we won't bother trying // anything on the resource for the lifetime of the app - Try(sparkJobController.createJobObject(kubernetesAppId, statusMap)) match { - case Success(_) => sparkConf.set("spark.kubernetes.jobResourceSet", "true") - sparkConf.set("spark.kubernetes.jobResourceName", kubernetesAppId) - case Failure(_: SparkException) => // if e.getMessage startsWith "40" => - sparkConf.set("spark.kubernetes.jobResourceSet", "false") + Try(sparkJobResourceController.createJobObject(kubernetesAppId, status)) match { + case Success(_) => + sparkConf.set(KUBERNETES_JOB_RESOURCE_ENABLED, true) + sparkConf.set(KUBERNETES_JOB_RESOURCE_NAME, kubernetesAppId) + case Failure(_) => } def run(): Unit = { @@ -304,6 +306,8 @@ private[spark] object Client { val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION) val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL)).filter( _ => waitForAppCompletion) val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval) + val sparkJobResourceController = new sparkJobResourceControllerImpl( + kubernetesClientProvider.get) new Client( appName, kubernetesAppId, @@ -316,6 +320,7 @@ private[spark] object Client { kubernetesClientProvider, initContainerComponentsProvider, kubernetesCredentialsMounterProvider, - loggingPodStatusWatcher).run() + loggingPodStatusWatcher, + sparkJobResourceController).run() } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/sparkJobResourceController.scala similarity index 53% rename from 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala
rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/sparkJobResourceController.scala
index 42ca57b163f58..54a3b3f83d579 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/sparkJobResourceController.scala
@@ -16,24 +16,36 @@
  */
 package org.apache.spark.deploy.kubernetes.tpr
 
-import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}
+import java.io.IOException
+import java.util.concurrent.ThreadPoolExecutor
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{blocking, Future, Promise}
 
 import io.fabric8.kubernetes.client.{BaseClient, KubernetesClient}
 import okhttp3.{HttpUrl, MediaType, OkHttpClient, Request, RequestBody, Response}
-import okio.{Buffer, BufferedSource}
 import org.json4s.{DefaultFormats, Formats}
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods.{compact, parse, pretty, render}
 import org.json4s.jackson.Serialization.{read, write}
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{blocking, Future, Promise}
-import scala.util.control.Breaks.{break, breakable}
 
 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.SparkException
+import org.apache.spark.deploy.kubernetes.tpr.JobState.JobState
 import org.apache.spark.util.ThreadUtils
 
+/**
+ * This trait defines the currently supported operations on the SparkJob Resource:
+ * CRUD
+ */
+private[spark] trait sparkJobResourceController {
+  def createJobObject(name: String, status: Status): Unit
+  def deleteJobObject(tprObjectName: String): Unit
+  def getJobObject(name: String): SparkJobState
+  def updateJobObject(jobResourcePatches: Seq[JobResourcePatch]): Unit
+}
+
 private[spark] case class Metadata(name: String,
     uid: Option[String] = None,
     labels: Option[Map[String, String]] = None,
@@ -42,73 +54,111 @@ private[spark] case class Metadata(name: String,
 private[spark] case class SparkJobState(apiVersion: String,
     kind: String,
     metadata: Metadata,
-    status: Map[String, Any])
+    status: Status)
+
+private[spark] case class Status(creationTimeStamp: String,
+    completionTimeStamp: String,
+    sparkDriver: String,
+    driverImage: String,
+    executorImage: String,
+    jobState: JobState,
+    desiredExecutors: Int,
+    currentExecutors: Int,
+    driverUi: String)
 
 private[spark] case class WatchObject(`type`: String, `object`: SparkJobState)
 
+private[spark] case class JobResourcePatch(name: String, value: Any, fieldPath: String)
+
 /**
+  * Prereq - this assumes the kubernetes API has been extended using a TPR of kind: SparkJob
+  * See conf/kubernetes-custom-resource.yaml for a model
+  *
- * CRUD + Watch Operations on SparkJob Resource
+ * CRUD Operations on the SparkJob Resource
  *
- * This class contains all CRUD+Watch implementations performed
+ * This class contains the CRUD implementations performed
  * on the SparkJob Resource used to expose the state of a spark job
  * in kubernetes (visible via kubectl or through the k8s dashboard).
- * - * */ -private[spark] class TPRCrudCalls(k8sClient: KubernetesClient, - kubeToken: Option[String] = None) extends Logging { - private val kubeMaster: String = k8sClient.getMasterUrl().toString - - implicit val formats: Formats = DefaultFormats + JobStateSerDe - +private[spark] class sparkJobResourceControllerImpl(k8sClient: KubernetesClient) + extends Logging with sparkJobResourceController { + private val kubeMaster = k8sClient.getMasterUrl.toString private val httpClient: OkHttpClient = extractHttpClientFromK8sClient(k8sClient.asInstanceOf[BaseClient]) + private val namespace = k8sClient.getNamespace - private val namespace: String = k8sClient.getNamespace - private var watchSource: BufferedSource = _ - private lazy val buffer = new Buffer() + private implicit val formats: Formats = DefaultFormats + JobStateSerDe private implicit val ec: ThreadPoolExecutor = ThreadUtils .newDaemonCachedThreadPool("tpr-watcher-pool") - def createJobObject(name: String, keyValuePairs: Map[String, Any]): Unit = { + /** + * This method creates a resource of kind "SparkJob" + * @return Unit + * @throws IOException Due to failure from execute call + * @throws SparkException when POST request is unsuccessful + */ + override def createJobObject(name: String, status: Status): Unit = { val resourceObject = - SparkJobState(s"$TPR_API_GROUP/$TPR_API_VERSION", TPR_KIND, Metadata(name), keyValuePairs) + SparkJobState(s"$TPR_API_GROUP/$TPR_API_VERSION", TPR_KIND, Metadata(name), status) val payload = parse(write(resourceObject)) - val requestBody = RequestBody .create(MediaType.parse("application/json"), compact(render(payload))) - - val requestSegments = Seq( + val requestPathSegments = Seq( "apis", TPR_API_GROUP, TPR_API_VERSION, "namespaces", namespace, "sparkjobs") - val url = generateHttpUrl(requestSegments) - - val request = completeRequest(new Request.Builder() + val url = generateHttpUrl(requestPathSegments) + val request = new Request.Builder() .post(requestBody) - .url(url)) + .url(url) + .build() logDebug(s"Create SparkJobResource Request: $request") - val response = httpClient.newCall(request).execute() + var response: Response = null + try { + response = httpClient.newCall(request).execute() + } catch { + case x: IOException => + val msg = + s"Failed to post resource $name. ${x.getMessage}. 
${compact(render(payload))}" + logError(msg) + response.close() + throw new SparkException(msg) + } completeRequestWithExceptionIfNotSuccessful( "post", response, Option(Seq(name, response.toString, compact(render(payload))))) - - response.body().close() logDebug(s"Successfully posted resource $name: " + s"${pretty(render(parse(write(resourceObject))))}") + response.body().close() } - def deleteJobObject(tprObjectName: String): Unit = { - val requestSegments = Seq( + /** + * This method deletes a resource of kind "SparkJob" with the specified name + * @return Unit + * @throws IOException Due to failure from execute call + * @throws SparkException when DELETE request is unsuccessful + */ + override def deleteJobObject(tprObjectName: String): Unit = { + val requestPathSegments = Seq( "apis", TPR_API_GROUP, TPR_API_VERSION, "namespaces", namespace, "sparkjobs", tprObjectName) - val url = generateHttpUrl(requestSegments) - - val request = completeRequest(new Request.Builder() + val url = generateHttpUrl(requestPathSegments) + val request = new Request.Builder() .delete() - .url(url)) + .url(url) + .build() logDebug(s"Delete Request: $request") - val response = httpClient.newCall(request).execute() + var response: Response = null + try { + response = httpClient.newCall(request).execute() + } catch { + case x: IOException => + val msg = + s"Failed to delete resource. ${x.getMessage}." + logError(msg) + response.close() + throw new SparkException(msg) + } completeRequestWithExceptionIfNotSuccessful( "delete", response, @@ -118,17 +168,33 @@ private[spark] class TPRCrudCalls(k8sClient: KubernetesClient, logInfo(s"Successfully deleted resource $tprObjectName") } - def getJobObject(name: String): SparkJobState = { - val requestSegments = Seq( + /** + * This method GETS a resource of kind "SparkJob" with the given name + * @return SparkJobState + * @throws IOException Due to failure from execute call + * @throws SparkException when GET request is unsuccessful + */ + override def getJobObject(name: String): SparkJobState = { + val requestPathSegments = Seq( "apis", TPR_API_GROUP, TPR_API_VERSION, "namespaces", namespace, "sparkjobs", name) - val url = generateHttpUrl(requestSegments) - - val request = completeRequest(new Request.Builder() + val url = generateHttpUrl(requestPathSegments) + val request = new Request.Builder() .get() - .url(url)) + .url(url) + .build() logDebug(s"Get Request: $request") - val response = httpClient.newCall(request).execute() + var response: Response = null + try { + response = httpClient.newCall(request).execute() + } catch { + case x: IOException => + val msg = + s"Failed to get resource $name. ${x.getMessage}." 
+ logError(msg) + response.close() + throw new SparkException(msg) + } completeRequestWithExceptionIfNotSuccessful( "get", response, @@ -139,110 +205,51 @@ private[spark] class TPRCrudCalls(k8sClient: KubernetesClient, read[SparkJobState](response.body().string()) } - def updateJobObject(name: String, value: String, fieldPath: String): Unit = { - val payload = List( - ("op" -> "replace") ~ ("path" -> fieldPath) ~ ("value" -> value)) + /** + * This method Patches in Batch or singly a resource of kind "SparkJob" with the specified name + * @return Unit + * @throws IOException Due to failure from execute call + * @throws SparkException when PATCH request is unsuccessful + */ + override def updateJobObject(jobResourcePatches: Seq[JobResourcePatch]): Unit = { + val payload = jobResourcePatches map { jobResourcePatch => + ("op" -> "replace") ~ + ("path" -> jobResourcePatch.fieldPath) ~ + ("value" -> jobResourcePatch.value.toString) + } val requestBody = RequestBody.create( MediaType.parse("application/json-patch+json"), compact(render(payload))) - - val requestSegments = Seq( - "apis", TPR_API_GROUP, TPR_API_VERSION, "namespaces", namespace, "sparkjobs", name) - val url = generateHttpUrl(requestSegments) - - val request = completeRequest(new Request.Builder() + val requestPathSegments = Seq( + "apis", TPR_API_GROUP, TPR_API_VERSION, "namespaces", + namespace, "sparkjobs", jobResourcePatches.head.name) + val url = generateHttpUrl(requestPathSegments) + val request = new Request.Builder() .patch(requestBody) - .url(url)) + .url(url) + .build() logDebug(s"Update Request: $request") - val response = httpClient.newCall(request).execute() + var response: Response = null + try { + response = httpClient.newCall(request).execute() + } catch { + case x: IOException => + val msg = + s"Failed to get resource ${jobResourcePatches.head.name}. ${x.getMessage}." + logError(msg) + response.close() + throw new SparkException(msg) + } completeRequestWithExceptionIfNotSuccessful( "patch", response, - Option(Seq(name, response.message(), compact(render(payload)))) + Option(Seq(jobResourcePatches.head.name, response.message(), compact(render(payload)))) ) response.body().close() - logDebug(s"Successfully patched resource $name.") - } - -/** - * This method has an helper method that blocks to watch the object. - * The future is completed on a Delete event or source exhaustion. - * This method also relies on the assumption of one sparkjob per namespace - */ - def watchJobObject(): Future[WatchObject] = { - val watchClient = httpClient.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build() - val requestSegments = Seq( - "apis", TPR_API_GROUP, TPR_API_VERSION, "namespaces", namespace, "sparkjobs") - val url = generateHttpUrl(requestSegments, Seq(("watch", "true"))) - - val request = completeRequest(new Request.Builder() - .get() - .url(url)) - - logDebug(s"Watch Request: $request") - val resp = watchClient.newCall(request).execute() - completeRequestWithExceptionIfNotSuccessful( - "start watch on", - resp, - Option(Seq(resp.code().toString, resp.message()))) - - logInfo(s"Starting watch on jobResource") - watchJobObjectUtil(resp) - } - -/** - * This method has a blocking call - wait on SSE - inside it. - * However it is sent off in a new thread - */ - private def watchJobObjectUtil(response: Response): Future[WatchObject] = { - @volatile var wo: WatchObject = null - watchSource = response.body().source() - executeBlocking { - breakable { - // This will block until there are bytes to read or the source is exhausted. 
- while (!watchSource.exhausted()) { - watchSource.read(buffer, 8192) match { - case -1 => - cleanUpListener(watchSource, buffer) - throw new SparkException("Source is exhausted and object state is unknown") - case _ => - wo = read[WatchObject](buffer.readUtf8()) - wo match { - case WatchObject("DELETED", w) => - logInfo(s"${w.metadata.name} has been deleted") - cleanUpListener(watchSource, buffer) - case WatchObject(e, _) => logInfo(s"$e event. Still watching") - } - } - } - } - wo - } - } - - private def cleanUpListener(source: BufferedSource, buffer: Buffer): Unit = { - buffer.close() - source.close() - break() - } - - // Serves as a way to interrupt to the watcher thread. - // This closes the source the watcher is reading from and as a result triggers promise completion - def stopWatcher(): Unit = { - if (watchSource != null) { - buffer.close() - watchSource.close() - } - } - - private def completeRequest(partialReq: Request.Builder): Request = { - kubeToken match { - case Some(tok) => partialReq.addHeader("Authorization", s"Bearer $tok").build() - case None => partialReq.build() - } + logDebug(s"Successfully patched resource ${jobResourcePatches.head.name}.") } private def generateHttpUrl(urlSegments: Seq[String], diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index fbc940d2c196d..a453dd483148b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -17,15 +17,14 @@ package org.apache.spark.scheduler.cluster.kubernetes import java.io.Closeable -import java.util.Calendar +import java.text.SimpleDateFormat +import java.util.Date import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference} import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success, Try} import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} @@ -36,7 +35,7 @@ import org.apache.spark.{SparkContext, SparkEnv, SparkException} import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, SparkPodInitContainerBootstrap} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.tpr.{JobState, TPRCrudCalls, WatchObject} +import org.apache.spark.deploy.kubernetes.tpr.{sparkJobResourceControllerImpl, JobResourcePatch, JobState} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient import org.apache.spark.rpc.{RpcCallContext, RpcEndpointAddress, RpcEnv} @@ -181,6 +180,16 @@ private[spark] class KubernetesClusterSchedulerBackend( private val allocator = ThreadUtils .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator") + private val workingWithSparkJobResource = conf.get(KUBERNETES_JOB_RESOURCE_ENABLED) + private val jobResourceName = conf.get(KUBERNETES_JOB_RESOURCE_NAME) + private val sparkJobResourceController = + if 
(workingWithSparkJobResource) { + Option.apply(new sparkJobResourceControllerImpl(kubernetesClient)) + } else { + Option.empty + } + private val resourceTimeFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.S'Z'") + private val allocatorRunnable: Runnable = new Runnable { override def run(): Unit = { if (totalRegisteredExecutors.get() < runningExecutorPods.size) { @@ -197,7 +206,12 @@ private[spark] class KubernetesClusterSchedulerBackend( } } } - updateStatus(STATUS_CURRENT_EXECUTORS, totalRegisteredExecutors.get()) + if (workingWithSparkJobResource) { + updateJobResourceStatus( + JobResourcePatch(jobResourceName.get, + totalRegisteredExecutors.get(), + s"/status/currentExecutors")) + } } } @@ -208,16 +222,6 @@ private[spark] class KubernetesClusterSchedulerBackend( sc.env.securityManager.isAuthenticationEnabled(), sc.env.securityManager.isSaslEncryptionEnabled()) } - private var workingWithSparkJobResource = - conf.get("spark.kubernetes.jobResourceSet", "false").toBoolean - - private val jobResourceName = conf.get("spark.kubernetes.jobResourceName", "") - private val sparkJobResourceController = - if (conf.get("spark.kubernetes.jobResourceSet", "false").toBoolean) { - Some(new TPRCrudCalls(kubernetesClient)) - } else { - None - } private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = { if (Utils.isDynamicAllocationEnabled(conf)) { @@ -235,18 +239,15 @@ private[spark] class KubernetesClusterSchedulerBackend( } - private def updateStatus(key: String, value: Any): Unit = { - logInfo(s"Updating Job Resource with name. $jobResourceName") + private def updateJobResourceStatus(jobResourcePatches: JobResourcePatch*): Unit = { sparkJobResourceController.foreach(controller => - Try( - controller.updateJobObject(jobResourceName, value.toString, s"/status/$key")) match { - case Success(_) => - logInfo(s"Updated Job Resource with name. $jobResourceName") - case Failure(e: SparkException) if e.getMessage startsWith "404" => - logWarning(s"Possible deletion of jobResource before backend start") - case Failure(e: Exception) => - logWarning(s"SparkJob object not updated. ${e.getMessage}") - }) + try { + controller.updateJobObject(jobResourcePatches) + } catch { + case e: SparkException if e.getMessage startsWith "404" => + logWarning(s"Possible deletion of jobResource before backend start") + case e: Exception => logWarning(s"SparkJob object not updated. 
${e.getMessage}") + }) } override def applicationId(): String = conf.get("spark.app.id", super.applicationId()) @@ -258,10 +259,16 @@ private[spark] class KubernetesClusterSchedulerBackend( override def start(): Unit = { super.start() - updateStatus(STATUS_JOB_STATE, JobState.RUNNING) - updateStatus(STATUS_DRIVER_UI, "http://localhost:8001/api/v1/namespaces" + - s"/default/pods/$kubernetesDriverPodName:${SparkUI.getUIPort(conf)}/proxy/") - + if (workingWithSparkJobResource) { + updateJobResourceStatus( + JobResourcePatch(jobResourceName.get, + JobState.RUNNING, + s"/status/jobState"), + JobResourcePatch(jobResourceName.get, + "http://localhost:8001/api/v1/namespaces" + + s"/default/pods/$kubernetesDriverPodName:${SparkUI.getUIPort(conf)}/proxy/", + "/status/driverUi")) + } executorWatchResource.set(kubernetesClient.pods().withLabel(SPARK_APP_ID_LABEL, applicationId()) .watch(new ExecutorPodsWatcher())) @@ -287,10 +294,21 @@ private[spark] class KubernetesClusterSchedulerBackend( // send stop message to executors so they shut down cleanly super.stop() - updateStatus(STATUS_CURRENT_EXECUTORS, STATUS_NOT_AVAILABLE) - updateStatus(STATUS_DRIVER_UI, STATUS_NOT_AVAILABLE) - updateStatus(STATUS_COMPLETION_TIMESTAMP, Calendar.getInstance().getTime().toString()) - updateStatus(STATUS_JOB_STATE, JobState.FINISHED) + if (workingWithSparkJobResource) { + updateJobResourceStatus( + JobResourcePatch(jobResourceName.get, + STATUS_NOT_AVAILABLE, + "/status/currentExecutors"), + JobResourcePatch(jobResourceName.get, + STATUS_NOT_AVAILABLE, + "/status/driverUi"), + JobResourcePatch(jobResourceName.get, + resourceTimeFormat.format(new Date()), + "/status/completionTimeStamp"), + JobResourcePatch(jobResourceName.get, + JobState.FINISHED, + "/status/jobState")) + } // then delete the executor pods // TODO investigate why Utils.tryLogNonFatalError() doesn't work in this context. 
@@ -450,7 +468,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
-    updateStatus(STATUS_DESIRED_EXECUTORS, requestedTotal)
+    if (workingWithSparkJobResource) {
+      updateJobResourceStatus(JobResourcePatch(
+        jobResourceName.get,
+        requestedTotal,
+        "/status/desiredExecutors"))
+    }
     totalExpectedExecutors.set(requestedTotal)
     true
   }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
index ff6c710117318..aaf72e95d9c9e 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
@@ -18,7 +18,10 @@ package org.apache.spark.deploy.kubernetes.submit
 
 import java.io.File
 
-import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, DoneablePod, HasMetadata, Pod, PodBuilder, PodList, Secret, SecretBuilder}
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
 import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
 import org.hamcrest.{BaseMatcher, Description}
@@ -28,13 +31,12 @@ import org.mockito.Mockito.{times, verify, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.BeforeAndAfter
-import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.kubernetes.{KubernetesExternalShuffleService, KubernetesShuffleBlockHandler, SparkPodInitContainerBootstrap}
+import org.apache.spark.deploy.kubernetes.{KubernetesExternalShuffleService, SparkPodInitContainerBootstrap}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.deploy.kubernetes.tpr.sparkJobResourceController
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient
 import org.apache.spark.scheduler.cluster.kubernetes.DriverPodKubernetesClientProvider
@@ -150,6 +152,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
   private var namedPodResource: PodResource[Pod, DoneablePod] = _
   @Mock
   private var watch: Watch = _
+  @Mock
+  private var sparkJobResourceController: sparkJobResourceController = _
 
   before {
     MockitoAnnotations.initMocks(this)
@@ -305,7 +309,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
       kubernetesClientProvider,
       initContainerComponentsProvider,
       credentialsMounterProvider,
-      loggingPodStatusWatcher).run()
+      loggingPodStatusWatcher,
+      sparkJobResourceController).run()
     verify(loggingPodStatusWatcher).awaitCompletion()
   }
 
@@ -412,7 +417,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
       kubernetesClientProvider,
       initContainerComponentsProvider,
       credentialsMounterProvider,
-      loggingPodStatusWatcher).run()
+      loggingPodStatusWatcher,
+      sparkJobResourceController).run()
     val podMatcher = new BaseMatcher[Pod] {
       override def matches(o: scala.Any): Boolean = {
        o match {