diff --git a/pom.xml b/pom.xml index a27daf08a90bb..41c1f076a8621 100644 --- a/pom.xml +++ b/pom.xml @@ -336,7 +336,7 @@ com.squareup.okhttp3 okhttp - 3.4.1 + 3.6.0 org.bouncycastle diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ClientBuilder.scala similarity index 80% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ClientBuilder.scala index 89369b30694ee..9002ed4c0788c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ClientBuilder.scala @@ -20,11 +20,12 @@ import java.io.File import com.google.common.base.Charsets import com.google.common.io.Files -import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient} +import io.fabric8.kubernetes.client.{BaseClient, Config, ConfigBuilder, DefaultKubernetesClient} +import okhttp3._ import org.apache.spark.deploy.kubernetes.constants._ -private[spark] object KubernetesClientBuilder { +private[spark] object ClientBuilder { private val API_SERVER_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) private val CA_CERT_FILE = new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH) @@ -34,7 +35,7 @@ private[spark] object KubernetesClientBuilder { * are picked up from canonical locations, as they are injected * into the pod's disk space. 
*/
-  def buildFromWithinPod(
+  def buildK8sClientFromWithinPod(
       kubernetesNamespace: String): DefaultKubernetesClient = {
     var clientConfigBuilder = new ConfigBuilder()
       .withApiVersion("v1")
@@ -51,4 +52,25 @@ private[spark] object KubernetesClientBuilder {
     }
     new DefaultKubernetesClient(clientConfigBuilder.build)
   }
+
+  /**
+   * Returns the OkHttpClient stored in the `httpClient` field of the given fabric8 client.
+   * The field is read reflectively from [[BaseClient]].
+   *
+   * @param client fabric8 client to read the `httpClient` field from; must not be null
+   * @return the value of the `httpClient` field, cast to OkHttpClient
+   * @throws IllegalArgumentException if `client` is null
+   */
+  def buildOkhttpClientFromWithinPod(client: BaseClient): OkHttpClient = {
+    // Field.get on a null receiver only yields a bare NullPointerException, so reject
+    // a missing client up front with a message that names the problem.
+    require(client != null, "Cannot read the OkHttpClient of a null Kubernetes client")
+    val httpClientField = classOf[BaseClient].getDeclaredField("httpClient")
+    try {
+      httpClientField.setAccessible(true)
+      httpClientField.get(client).asInstanceOf[OkHttpClient]
+    } finally {
+      httpClientField.setAccessible(false)
+    }
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala
new file mode 100644
index 0000000000000..44550cc5fcc48
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.kubernetes
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
+
+import io.fabric8.kubernetes.client.{Config, KubernetesClient}
+import scala.util.{Failure, Success, Try}
+
+import org.apache.spark.deploy.kubernetes.tpr.TPRCrudCalls
+
+/**
+ * [[TPRCrudCalls]] implementation for code running inside a pod. The API server address is
+ * built from the `KUBERNETES_SERVICE_HOST` and `KUBERNETES_PORT_443_TCP_PORT` environment
+ * variables, and the service account token file is used as the bearer token.
+ *
+ * @param client fabric8 client exposed to [[TPRCrudCalls]] as `k8sClient`
+ */
+private[spark] class SparkJobResourceClientFromWithinK8s(
+    client: KubernetesClient) extends TPRCrudCalls {
+
+  private val protocol: String = "https://"
+
+  // Host of the API server, taken from the environment; falls back to 127.0.0.1.
+  private val kubeHost: String = sys.env.getOrElse("KUBERNETES_SERVICE_HOST", {
+    // Log a warning just in case, but this should almost certainly never happen
+    logWarning("Error while retrieving k8s host address")
+    "127.0.0.1"
+  })
+
+  // Port of the API server, taken from the environment; falls back to 8001.
+  private val kubeHostPort: String = sys.env.getOrElse("KUBERNETES_PORT_443_TCP_PORT", {
+    // Log a warning just in case, but this should almost certainly never happen
+    logWarning("Error while retrieving k8s host port")
+    "8001"
+  })
+
+  // Since this will be running inside a pod we can access the pod's token and use it
+  // with the Authorization header when making rest calls to the k8s Api.
+  override protected val kubeToken: Option[String] = {
+    val tokenPath = Paths.get(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)
+    // Decode explicitly as UTF-8 instead of the platform default charset, and trim so
+    // that a trailing newline in the file cannot end up inside the header value.
+    Try(new String(Files.readAllBytes(tokenPath), StandardCharsets.UTF_8).trim) match {
+      case Success(token) if token.nonEmpty => Some(token)
+      case Success(_) =>
+        logError(s"Service account token file $tokenPath is empty")
+        None
+      case Failure(e) =>
+        logError(s"Failed to read service account token from $tokenPath", e)
+        None
+    }
+  }
+
+  // Lazy because members of TPRCrudCalls may be evaluated from the superclass
+  // constructor, i.e. before a strict val declared in this body would be assigned.
+  override protected lazy val k8sClient: KubernetesClient = client
+  override protected val kubeMaster: String = s"$protocol$kubeHost:$kubeHostPort"
+}
+
+/**
+ * [[TPRCrudCalls]] implementation that only supplies the client and keeps the inherited
+ * `kubeMaster` and `kubeToken`.
+ *
+ * @param client fabric8 client exposed to [[TPRCrudCalls]] as `k8sClient`
+ */
+private[spark] class SparkJobResourceClientFromOutsideK8s(
+    client: KubernetesClient) extends TPRCrudCalls {
+
+  // Lazy because members of TPRCrudCalls may be evaluated from the superclass
+  // constructor, i.e. before a strict val declared in this body would be assigned.
+  override protected lazy val k8sClient: KubernetesClient = client
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
index 688cd858e79ff..ffdccc14eb87b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
@@ -68,4 +68,9 @@ package object constants {
   private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
   private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit"
   private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
+
+  // TPR
+  private[spark] val TPR_API_VERSION = "apache.io/v1"
+  private[spark] val TPR_API_ENDPOINT = "apis/%s/namespaces/%s/sparkjobs"
+  private[spark] val TPR_KIND = "SparkJob"
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobState.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobState.scala
new file mode 100644
index 0000000000000..6f90194f673ab
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobState.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.kubernetes.tpr
+
+/**
+ * States of a Spark Job. The values are declared in this order, so their ids run
+ * from 0 (QUEUED) to 5 (KILLED).
+ */
+private[spark] object JobState extends Enumeration {
+  type JobState = Value
+
+  /** Spark Job has been queued to run. */
+  val QUEUED = Value
+
+  /** Driver Pod deployed but tasks are not yet scheduled on worker pod(s). */
+  val SUBMITTED = Value
+
+  /** Task(s) have been allocated to worker pod(s) to run and Spark Job is now running. */
+  val RUNNING = Value
+
+  /**
+   * Spark Job ran and exited cleanly, i.e. worker pod(s) and driver pod were
+   * gracefully deleted.
+   */
+  val FINISHED = Value
+
+  /** Spark Job Failed due to error. */
+  val FAILED = Value
+
+  /** A user manually killed this Spark Job. */
+  val KILLED = Value
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala
new file mode 100644
index 0000000000000..16edd19de6e0c
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.kubernetes.tpr
+
+import org.json4s.{CustomSerializer, JString}
+import org.json4s.JsonAST.JNull
+
+import org.apache.spark.deploy.kubernetes.tpr.JobState.JobState
+
+/**
+ * JobState Serializer and Deserializer. A state is written as the JSON string holding
+ * its name and read back from that same string; a JSON null is rejected with an
+ * UnsupportedOperationException.
+ */
+object JobStateSerDe extends CustomSerializer[JobState](_ =>
+  ({
+    // Derived from the enumeration itself so that the accepted names cannot drift from
+    // the declared states; any other string is left unmatched, as before.
+    case JString(name) if JobState.values.exists(_.toString == name) =>
+      JobState.withName(name)
+    case JNull =>
+      throw new UnsupportedOperationException("No JobState Specified")
+  }, {
+    // Enumeration values only compare equal within the same enumeration, so values of
+    // other enumerations are not picked up by this case.
+    case state if JobState.values.exists(_ == state) => JString(state.toString)
+  })
+)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala
new file mode 100644
index 0000000000000..af88d06e51cc7
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.tpr
+
+import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}
+
+import io.fabric8.kubernetes.client.{BaseClient, KubernetesClient}
+import okhttp3.{MediaType, OkHttpClient, Request, RequestBody, Response}
+import okio.{Buffer, BufferedSource}
+import org.json4s.{DefaultFormats, Formats}
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods.{compact, parse, pretty, render}
+import org.json4s.jackson.Serialization.{read, write}
+import scala.annotation.tailrec
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{blocking, Future, Promise}
+import scala.util.control.Breaks.{break, breakable}
+
+import org.apache.spark.deploy.kubernetes.ClientBuilder
+import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.SparkException
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Isolated this since the method is called on the client machine
+ */
+
+/** `metadata` section of a SparkJob resource; only `name` is mandatory. */
+private[spark] case class Metadata(
+    name: String,
+    uid: Option[String] = None,
+    labels: Option[Map[String, String]] = None,
+    annotations: Option[Map[String, String]] = None)
+
+/** JSON shape of a SparkJob resource as it is posted to and read from the API server. */
+private[spark] case class SparkJobState(
+    apiVersion: String,
+    kind: String,
+    metadata: Metadata,
+    spec: Map[String, Any])
+
+/** One event of a watch stream: the event type and the resource it refers to. */
+private[spark] case class WatchObject(`type`: String, `object`: SparkJobState)
+
+/**
+ * REST helpers for the SparkJob third party resource. Requests are sent with the OkHttp
+ * client of `k8sClient` to `kubeMaster`, carrying `kubeToken` as a bearer token when set.
+ */
+private[spark] abstract class TPRCrudCalls extends Logging {
+  protected val k8sClient: KubernetesClient
+  protected val kubeMaster: String = KUBERNETES_MASTER_INTERNAL_URL
+  protected val kubeToken: Option[String] = None
+
+  implicit val formats: Formats = DefaultFormats + JobStateSerDe
+
+  // httpClient and namespace are lazy because k8sClient is supplied by the subclass:
+  // while this constructor runs, a k8sClient defined in a subclass body has not been
+  // assigned yet, so reading it eagerly here would see null.
+  protected lazy val httpClient: OkHttpClient = k8sClient match {
+    case base: BaseClient => ClientBuilder.buildOkhttpClientFromWithinPod(base)
+    case other =>
+      throw new SparkException(s"Cannot obtain an OkHttpClient from Kubernetes client $other")
+  }
+
+  protected lazy val namespace: String = k8sClient.getNamespace
+
+  // Set by the thread that starts the watch, read by the thread calling stopWatcher.
+  @volatile private var watchSource: BufferedSource = _
+  protected implicit val ec: ThreadPoolExecutor = ThreadUtils
+    .newDaemonCachedThreadPool("tpr-watcher-pool")
+
+  /** URL of the SparkJob collection in `namespace`. */
+  private def sparkJobsUrl: String =
+    s"$kubeMaster/${TPR_API_ENDPOINT.format(TPR_API_VERSION, namespace)}"
+
+  /** Runs `cb` on the watcher pool and exposes its result, or its failure, as a Future. */
+  private def executeBlocking(cb: => WatchObject): Future[WatchObject] = {
+    val p = Promise[WatchObject]()
+    ec.execute(new Runnable {
+      override def run(): Unit = {
+        try {
+          p.trySuccess(blocking(cb))
+        } catch {
+          // Every failure is routed into the promise; otherwise the future handed to
+          // the caller would never complete.
+          case e: Throwable => p.tryFailure(e)
+        }
+      }
+    })
+    p.future
+  }
+
+  /** Adds the bearer token header when `kubeToken` is defined and builds the request. */
+  private def completeRequest(partialReq: Request.Builder): Request =
+    kubeToken
+      .fold(partialReq)(tok => partialReq.addHeader("Authorization", s"Bearer $tok"))
+      .build()
+
+  /**
+   * Closes the response body, logs and throws a SparkException when the response is not
+   * successful; does nothing otherwise.
+   *
+   * @param requestType verb used in the error message
+   * @param response response to check
+   * @param additionalInfo extra fragments appended to the error message
+   */
+  private def completeRequestWithExceptionIfNotSuccessful(
+      requestType: String,
+      response: Response,
+      additionalInfo: Option[Seq[String]] = None): Unit = {
+
+    if (!response.isSuccessful) {
+      response.body().close()
+      val finalMessage =
+        (s"Failed to $requestType resource." +: additionalInfo.getOrElse(Seq.empty))
+          .mkString(" ")
+      logError(finalMessage)
+      throw new SparkException(finalMessage)
+    }
+  }
+
+  /**
+   * Posts a new SparkJob resource.
+   *
+   * @param name name placed in the resource metadata
+   * @param keyValuePairs content of the resource `spec`
+   * @throws SparkException if the API server does not answer with a success status
+   */
+  def createJobObject(name: String, keyValuePairs: Map[String, Any]): Unit = {
+    val resourceObject =
+      SparkJobState(TPR_API_VERSION, TPR_KIND, Metadata(name), keyValuePairs)
+    val payload = parse(write(resourceObject))
+    val compactPayload = compact(render(payload))
+
+    val requestBody = RequestBody.create(MediaType.parse("application/json"), compactPayload)
+    val request = completeRequest(new Request.Builder().post(requestBody).url(sparkJobsUrl))
+
+    logDebug(s"Create Request: $request")
+    val response = httpClient.newCall(request).execute()
+    completeRequestWithExceptionIfNotSuccessful(
+      "post",
+      response,
+      Option(Seq(name, response.toString, compactPayload)))
+
+    response.body().close()
+    logDebug(s"Successfully posted resource $name: ${pretty(render(payload))}")
+  }
+
+  /**
+   * Deletes the SparkJob resource with the given name.
+   *
+   * @throws SparkException if the API server does not answer with a success status
+   */
+  def deleteJobObject(tprObjectName: String): Unit = {
+    val request = completeRequest(new Request.Builder()
+      .delete()
+      .url(s"$sparkJobsUrl/$tprObjectName"))
+
+    logDebug(s"Delete Request: $request")
+    val response = httpClient.newCall(request).execute()
+    completeRequestWithExceptionIfNotSuccessful(
+      "delete",
+      response,
+      Option(Seq(tprObjectName, response.message(), request.toString)))
+
+    response.body().close()
+    logInfo(s"Successfully deleted resource $tprObjectName")
+  }
+
+  /**
+   * Fetches the SparkJob resource with the given name.
+   *
+   * @return the resource parsed from the response body
+   * @throws SparkException if the API server does not answer with a success status
+   */
+  def getJobObject(name: String): SparkJobState = {
+    val request = completeRequest(new Request.Builder()
+      .get()
+      .url(s"$sparkJobsUrl/$name"))
+
+    logDebug(s"Get Request: $request")
+    val response = httpClient.newCall(request).execute()
+    completeRequestWithExceptionIfNotSuccessful(
+      "get",
+      response,
+      Option(Seq(name, response.message()))
+    )
+
+    logInfo(s"Successfully retrieved resource $name")
+    read[SparkJobState](response.body().string())
+  }
+
+  /**
+   * Sends a JSON patch with a single `replace` operation for the named resource.
+   *
+   * @param name name of the resource to patch
+   * @param value new value
+   * @param fieldPath path of the field to replace
+   * @throws SparkException if the API server does not answer with a success status
+   */
+  def updateJobObject(name: String, value: String, fieldPath: String): Unit = {
+    val payload = List(
+      ("op" -> "replace") ~ ("path" -> fieldPath) ~ ("value" -> value))
+    val compactPayload = compact(render(payload))
+    val requestBody =
+      RequestBody.create(MediaType.parse("application/json-patch+json"), compactPayload)
+    val request = completeRequest(new Request.Builder()
+      .patch(requestBody)
+      .url(s"$sparkJobsUrl/$name"))
+
+    logDebug(s"Update Request: $request")
+    val response = httpClient.newCall(request).execute()
+    completeRequestWithExceptionIfNotSuccessful(
+      "patch",
+      response,
+      Option(Seq(name, response.message(), compactPayload))
+    )
+
+    response.body().close()
+    logDebug(s"Successfully patched resource $name.")
+  }
+
+  /**
+   * This method has an helper method that blocks to watch the object.
+   * The future is completed on a Delete event or source exhaustion.
+   * This method relies on the assumption of one sparkjob per namespace
+   */
+  def watchJobObject(): Future[WatchObject] = {
+    val watchClient = httpClient.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build()
+    val request = completeRequest(new Request.Builder()
+      .get()
+      .url(s"$sparkJobsUrl?watch=true"))
+
+    logDebug(s"Watch Request: $request")
+    val resp = watchClient.newCall(request).execute()
+    completeRequestWithExceptionIfNotSuccessful(
+      "start watch on",
+      resp,
+      Option(Seq(resp.code().toString, resp.message())))
+
+    logInfo(s"Starting watch on jobResource")
+    watchJobObjectUtil(resp)
+  }
+
+  /**
+   * This method has a blocking call - wait on SSE - inside it.
+   * However it is sent off in a new thread
+   */
+  private def watchJobObjectUtil(response: Response): Future[WatchObject] = {
+    val source = response.body().source()
+    watchSource = source
+    executeBlocking {
+      try {
+        awaitDeletion(source)
+      } finally {
+        // Release the source however the watch ended: DELETED event, exhausted
+        // stream, parse failure or stopWatcher().
+        source.close()
+      }
+    }
+  }
+
+  /**
+   * Reads the watch stream one line, i.e. one JSON event, at a time until a DELETED
+   * event arrives or the stream ends.
+   *
+   * @param source stream to read events from
+   * @param lastEvent most recent event seen so far
+   * @return the DELETED event, or the last event seen when the stream ends first
+   * @throws SparkException if the stream ends before any event was read
+   */
+  @tailrec
+  private def awaitDeletion(
+      source: BufferedSource,
+      lastEvent: Option[WatchObject] = None): WatchObject = {
+    // readUtf8Line blocks until a whole line is available and yields null once the
+    // source is exhausted, so an event is never parsed from a partial chunk.
+    Option(source.readUtf8Line()) match {
+      case None =>
+        lastEvent.getOrElse(
+          throw new SparkException("Source is exhausted and object state is unknown"))
+      case Some(line) if line.trim.isEmpty =>
+        awaitDeletion(source, lastEvent)
+      case Some(line) =>
+        val event = read[WatchObject](line)
+        if (event.`type` == "DELETED") {
+          logInfo(s"${event.`object`.metadata.name} has been deleted")
+          event
+        } else {
+          logInfo(s"${event.`type`} event. Still watching")
+          awaitDeletion(source, Some(event))
+        }
+    }
+  }
+
+  // Serves as a way to interrupt to the watcher thread.
+  // This closes the source the watcher is reading from and as a result triggers promise completion
+  def stopWatcher(): Unit = {
+    Option(watchSource).foreach(_.close())
+  }
+
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 898b215b92d04..6f77e2c2f5c23 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -16,17 +16,14 @@
  */
 package org.apache.spark.scheduler.cluster.kubernetes
 
-import java.util.UUID
-import java.util.concurrent.Executors
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder
 import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, Pod, QuantityBuilder}
 import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
 
 import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.kubernetes.{Client, KubernetesClientBuilder}
+import org.apache.spark.deploy.kubernetes.ClientBuilder
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.rpc.RpcEndpointAddress
@@ -75,8 +72,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests"))
 
-  private val kubernetesClient = KubernetesClientBuilder
-    .buildFromWithinPod(kubernetesNamespace)
+  private val kubernetesClient = ClientBuilder
+
.buildK8sClientFromWithinPod(kubernetesNamespace) private val driverPod = try { kubernetesClient.pods().inNamespace(kubernetesNamespace). @@ -87,7 +84,7 @@ private[spark] class KubernetesClusterSchedulerBackend( throw new SparkException(s"Executor cannot find driver pod", throwable) } - override val minRegisteredRatio = + override val minRegisteredRatio: Double = if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { 0.8 } else { @@ -101,7 +98,7 @@ private[spark] class KubernetesClusterSchedulerBackend( sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString - private val initialExecutors = getInitialTargetExecutorNumber(1) + private val initialExecutors = getInitialTargetExecutorNumber() private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = { if (Utils.isDynamicAllocationEnabled(conf)) { @@ -237,7 +234,7 @@ private[spark] class KubernetesClusterSchedulerBackend( logInfo(s"Requesting ${requestedTotal - totalExpectedExecutors.get}" + s" additional executors, expecting total $requestedTotal and currently" + s" expected ${totalExpectedExecutors.get}") - for (i <- 0 until (requestedTotal - totalExpectedExecutors.get)) { + for (_ <- 0 until (requestedTotal - totalExpectedExecutors.get)) { runningExecutorPods += allocateNewExecutorPod() } } diff --git a/sbin/driver.yaml b/sbin/driver.yaml new file mode 100644 index 0000000000000..e322a724ec01d --- /dev/null +++ b/sbin/driver.yaml @@ -0,0 +1,8 @@ +apiVersion: "apache.io/v1" +kind: "SparkJob" +metadata: + name: "spark-job-1" +spec: + image: "driver-image" + state: "completed" + num-executors: 10 \ No newline at end of file diff --git a/sbin/kubernetes-resource.yaml b/sbin/kubernetes-resource.yaml new file mode 100644 index 0000000000000..0ff3d39b45182 --- /dev/null +++ b/sbin/kubernetes-resource.yaml @@ -0,0 +1,10 @@ +metadata: + name: spark-job.apache.io + labels: + resource: spark-job + object: spark +apiVersion: 
extensions/v1beta1 +kind: ThirdPartyResource +description: "A resource that manages a spark job" +versions: + - name: v1 \ No newline at end of file