From 529907af8cc046c023e658c8f0efd132505d4a64 Mon Sep 17 00:00:00 2001 From: iyanuobidele Date: Wed, 14 Dec 2016 21:32:57 -0800 Subject: [PATCH 1/5] initial object watcher implementation --- .../deploy/kubernetes/SparkJobResource.scala | 78 +++++++++++++++++-- .../KubernetesClusterScheduler.scala | 14 ++-- .../KubernetesClusterSchedulerBackend.scala | 47 +++++++---- 3 files changed, 109 insertions(+), 30 deletions(-) diff --git a/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala b/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala index 52e9ff3e05661..6aaf0216c8737 100644 --- a/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala +++ b/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala @@ -17,8 +17,13 @@ package org.apache.spark.deploy.kubernetes +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Success} +import scala.util.control.Breaks.{break, breakable} + import io.fabric8.kubernetes.client.{BaseClient, KubernetesClient} -import okhttp3.{MediaType, OkHttpClient, Request, RequestBody} +import okhttp3._ +import okio.{Buffer, BufferedSource} import org.json4s.{CustomSerializer, DefaultFormats, JString} import org.json4s.JsonAST.JNull import org.json4s.JsonDSL._ @@ -44,8 +49,10 @@ object SparkJobResource { metadata: Metadata, spec: Map[String, Any]) + case class WatchObject(`type`: String, `object`: SparkJobState) + case object JobStateSerDe - extends CustomSerializer[JobState](format => + extends CustomSerializer[JobState](_ => ({ case JString("SUBMITTED") => JobState.SUBMITTED case JString("QUEUED") => JobState.QUEUED @@ -65,15 +72,15 @@ object SparkJobResource { })) } -class SparkJobResource(client: KubernetesClient) extends Logging { +class SparkJobResource(client: KubernetesClient)(implicit ec: ExecutionContext) extends Logging { import SparkJobResource._ - implicit val formats = DefaultFormats + JobStateSerDe + private implicit val formats = DefaultFormats + JobStateSerDe private val httpClient = getHttpClient(client.asInstanceOf[BaseClient]) private val kind = "SparkJob" private val apiVersion = "apache.io/v1" - private val apiEndpoint = s"${client.getMasterUrl}apis/$apiVersion/" + + private val apiEndpoint = s"${client.getMasterUrl}/apis/$apiVersion/" + s"namespaces/${client.getNamespace}/sparkjobs" private def getHttpClient(client: BaseClient): OkHttpClient = { @@ -86,7 +93,7 @@ class SparkJobResource(client: KubernetesClient) extends Logging { } } - /* + /** * using a Map as an argument here allows adding more info into the Object if needed * */ def createJobObject(name: String, keyValuePairs: Map[String, Any]): Unit = { @@ -97,7 +104,6 @@ class SparkJobResource(client: KubernetesClient) extends Logging { .create(MediaType.parse("application/json"), compact(render(payload))) val request = new Request.Builder().post(requestBody).url(apiEndpoint).build() - val response = httpClient.newCall(request).execute() if (response.code() == 201) { logInfo( @@ -140,7 +146,7 @@ class SparkJobResource(client: KubernetesClient) extends Logging { logInfo(s"Successfully deleted resource $name") } else { val msg = - s"Failed to delete resource $name. ${response.message()}. ${request}" + s"Failed to delete resource $name. ${response.message()}. 
$request" logError(msg) throw new SparkException(msg) } @@ -160,4 +166,60 @@ class SparkJobResource(client: KubernetesClient) extends Logging { } } + /** + * This method has an helper method that blocks to watch the object. + * The future is completed on a Delete event. + */ + def watchJobObject(): Future[WatchObject] = { + val promiseWatchOver = Promise[WatchObject]() + val request = + new Request.Builder().get().url(s"$apiEndpoint?watch=true").build() + httpClient.newCall(request).execute() match { + case r: Response if r.code() == 200 => + val deleteWatch = watchJobObjectUtil(r) + logInfo("Starting watch on object") + deleteWatch onComplete { + case Success(w: WatchObject) => promiseWatchOver success w + case Success(_) => throw new SparkException("Unexpected Response received") + case Failure(e: Throwable) => throw new SparkException(e.getMessage) + } + case _: Response => throw new IllegalStateException("There's fire on the mountain") + } + promiseWatchOver.future + } + + /** + * This method has a blocking call inside it. + * However it is wrapped in a future, so it'll take off in another thread + */ + private def watchJobObjectUtil(response: Response): Future[WatchObject] = { + val promiseOfJobState = Promise[WatchObject]() + val buffer = new Buffer() + val source: BufferedSource = response.body().source() + Future { + breakable { + // This will block until there are bytes to read or the source is definitely exhausted. + while (!source.exhausted()) { + source.read(buffer, 8192) match { + case -1 => cleanUpListener(source, buffer, response) + case _ => val wo = read[WatchObject](buffer.readUtf8()) + wo match { + case WatchObject("DELETED", _) => promiseOfJobState success wo + cleanUpListener(source, buffer, response) + case WatchObject(_, _) => + } + } + } + } + } + promiseOfJobState.future + } + + private def cleanUpListener(source: BufferedSource, buffer: Buffer, response: Response): Unit = { + buffer.close() + source.close() + response.close() + break() + } + } diff --git a/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala index 8aaa5193db968..9a361c5666b38 100644 --- a/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala +++ b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala @@ -49,19 +49,19 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf) private val DEFAULT_CORES = 1.0 logInfo("Created KubernetesClusterScheduler instance") - val client = setupKubernetesClient() - val driverName = s"spark-driver-${Random.alphanumeric take 5 mkString ""}".toLowerCase() - val svcName = s"spark-svc-${Random.alphanumeric take 5 mkString ""}".toLowerCase() - val nameSpace = conf.get( + private val client = setupKubernetesClient() + private val driverName = s"spark-driver-${Random.alphanumeric take 5 mkString ""}".toLowerCase() + private val svcName = s"spark-svc-${Random.alphanumeric take 5 mkString ""}".toLowerCase() + private val nameSpace = conf.get( "spark.kubernetes.namespace", KubernetesClusterScheduler.defaultNameSpace) - val serviceAccountName = conf.get( + private val serviceAccountName = conf.get( "spark.kubernetes.serviceAccountName", KubernetesClusterScheduler.defaultServiceAccountName) // Anything that should either not be passed to 
driver config in the cluster, or // that is going to be explicitly managed as command argument to the driver pod - val confBlackList = scala.collection.Set( + private val confBlackList = scala.collection.Set( "spark.master", "spark.app.name", "spark.submit.deployMode", @@ -69,7 +69,7 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf) "spark.dynamicAllocation.enabled", "spark.shuffle.service.enabled") - val instances = conf.get(EXECUTOR_INSTANCES).getOrElse(1) + private val instances = conf.get(EXECUTOR_INSTANCES).getOrElse(1) // image needs to support shim scripts "/opt/driver.sh" and "/opt/executor.sh" private val sparkDriverImage = conf.getOption("spark.kubernetes.sparkImage").getOrElse { diff --git a/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index ff676c0394768..142b995eca0b5 100644 --- a/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -17,17 +17,20 @@ package org.apache.spark.scheduler.cluster.kubernetes +import java.util.concurrent.Executors + import scala.collection.{concurrent, mutable} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import scala.concurrent.Future -import scala.util.Random +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Random, Success, Try} import io.fabric8.kubernetes.api.model.{Pod, PodBuilder, PodFluent} import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.deploy.kubernetes.SparkJobResource +import org.apache.spark.deploy.kubernetes.SparkJobResource.WatchObject import org.apache.spark.internal.config._ import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler._ @@ -42,8 +45,8 @@ private[spark] class KubernetesClusterSchedulerBackend( private val client = new DefaultKubernetesClient() - val DEFAULT_NUMBER_EXECUTORS = 2 - val podPrefix = s"spark-executor-${Random.alphanumeric take 5 mkString ""}".toLowerCase() + private val DEFAULT_NUMBER_EXECUTORS = 2 + private val podPrefix = s"spark-executor-${Random.alphanumeric take 5 mkString ""}".toLowerCase() // using a concurrent TrieMap gets rid of possible concurrency issues // key is executor id, value is pod name @@ -51,13 +54,17 @@ private[spark] class KubernetesClusterSchedulerBackend( private var shutdownToPod = new concurrent.TrieMap[String, String] // pending shutdown private var executorID = 0 - val sparkImage = conf.get("spark.kubernetes.sparkImage") - val clientJarUri = conf.get("spark.executor.jar") - val ns = conf.get( + private val sparkImage = conf.get("spark.kubernetes.sparkImage") + private val clientJarUri = conf.get("spark.executor.jar") + private val ns = conf.get( "spark.kubernetes.namespace", KubernetesClusterScheduler.defaultNameSpace) - val dynamicExecutors = Utils.isDynamicAllocationEnabled(conf) - val sparkJobResource = new SparkJobResource(client) + private val dynamicExecutors = Utils.isDynamicAllocationEnabled(conf) + + private val executorService = Executors.newFixedThreadPool(4) + private implicit val executionContext = 
ExecutionContext.fromExecutorService(executorService) + + private val sparkJobResource = new SparkJobResource(client)(executionContext) private val imagePullSecret = System.getProperty("SPARK_IMAGE_PULLSECRET", "") @@ -72,11 +79,11 @@ private[spark] class KubernetesClusterSchedulerBackend( val keyValuePairs = Map("num-executors" -> getInitialTargetExecutorNumber(sc.conf), "image" -> sparkImage, "state" -> JobState.SUBMITTED) - try { - logInfo(s"Creating Job Resource with name. $podPrefix") - sparkJobResource.createJobObject(podPrefix, keyValuePairs) - } catch { - case e: SparkException => + logInfo(s"Creating Job Resource with name. $podPrefix") + + Try(sparkJobResource.createJobObject(podPrefix, keyValuePairs)) match { + case Success(_) => startWatcher() + case Failure(e: SparkException) => logWarning(s"SparkJob object not created. ${e.getMessage}") // SparkJob should continue if this fails as discussed on thread. // TODO: we should short-circuit on things like update or delete @@ -84,6 +91,16 @@ private[spark] class KubernetesClusterSchedulerBackend( createExecutorPods(getInitialTargetExecutorNumber(sc.getConf)) } + private def startWatcher(): Unit = { + sparkJobResource.watchJobObject() onComplete { + case Success(w: WatchObject) if w.`type` == "DELETED" => + logInfo("TPR Object deleted. Cleaning up") + stop() + case Success(_: WatchObject) => throw new SparkException("Unexpected response received") + case Failure(e: Throwable) => throw new SparkException(e.getMessage) + } + } + override def stop(): Unit = { // Kill all executor pods indiscriminately killExecutorPods(executorToPod.toVector) @@ -138,7 +155,7 @@ private[spark] class KubernetesClusterSchedulerBackend( } private def createExecutorPods(n: Int) { - for (i <- 1 to n) { + for (_ <- 1 to n) { executorID += 1 executorToPod += ((executorID.toString, createExecutorPod(executorID))) } From c0f9c482c60266a8aab64947d84e5e61d0cb22bc Mon Sep 17 00:00:00 2001 From: iyanuobidele Date: Wed, 14 Dec 2016 22:54:27 -0800 Subject: [PATCH 2/5] changing TPR instance creation logic --- .../spark/deploy/kubernetes/Client.scala | 1 + .../KubernetesClusterScheduler.scala | 27 +++++++++++++--- .../KubernetesClusterSchedulerBackend.scala | 32 +++++++++---------- 3 files changed, 38 insertions(+), 22 deletions(-) diff --git a/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index df16e742dd7ab..8914a9c18b4c2 100644 --- a/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -46,6 +46,7 @@ private[spark] class Client(val args: ClientArguments, shutdownLatch.countDown() System.clearProperty("SPARK_KUBERNETES_MODE") System.clearProperty("SPARK_IMAGE_PULLSECRET") + System.clearProperty("SPARK_JOB_OBJECT_NAME") } def awaitShutdown(): Unit = { diff --git a/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala index 9a361c5666b38..a288abf1dabbe 100644 --- a/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala +++ 
b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala @@ -19,17 +19,19 @@ package org.apache.spark.scheduler.cluster.kubernetes import java.io.File import java.util.Date +import java.util.concurrent.Executors import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import scala.util.Random +import scala.concurrent.ExecutionContext.Implicits.global +import scala.util.{Failure, Random, Success, Try} import io.fabric8.kubernetes.api.model.{Pod, PodBuilder, PodFluent, ServiceBuilder} import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException} import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.Command -import org.apache.spark.deploy.kubernetes.ClientArguments +import org.apache.spark.deploy.kubernetes.{ClientArguments, SparkJobResource} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.util.Utils @@ -72,17 +74,32 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf) private val instances = conf.get(EXECUTOR_INSTANCES).getOrElse(1) // image needs to support shim scripts "/opt/driver.sh" and "/opt/executor.sh" - private val sparkDriverImage = conf.getOption("spark.kubernetes.sparkImage").getOrElse { + private val sparkImage = conf.getOption("spark.kubernetes.sparkImage").getOrElse { // TODO: this needs to default to some standard Apache Spark image throw new SparkException("Spark image not set. Please configure spark.kubernetes.sparkImage") } + private val sparkJobResource = new SparkJobResource(client) + private val imagePullSecret = conf.get("spark.kubernetes.imagePullSecret", "") private val isImagePullSecretSet = isSecretRunning(imagePullSecret) logWarning("Instances: " + instances) def start(args: ClientArguments): Unit = { + val sparkJobName = + s"spark-job-$nameSpace-${Random.alphanumeric take 5 mkString ""}".toLowerCase() + System.setProperty("SPARK_JOB_OBJECT_NAME", sparkJobName) + val keyValuePairs = Map("num-executors" -> instances, + "image" -> sparkImage, + "state" -> JobState.QUEUED) + + Try(sparkJobResource.createJobObject(sparkJobName, keyValuePairs)) match { + case Success(_) => logInfo(s"Object with name: $sparkJobName posted to k8s successfully") + case Failure(e: Throwable) => // log and carry on + logInfo(s"Failed to post object $sparkJobName due to ${e.getMessage}") + } + startDriver(client, args) } @@ -116,7 +133,7 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf) s"--conf=spark.executor.jar=$clientJarUri", s"--conf=spark.executor.instances=$instances", s"--conf=spark.kubernetes.namespace=$nameSpace", - s"--conf=spark.kubernetes.sparkImage=$sparkDriverImage") + s"--conf=spark.kubernetes.sparkImage=$sparkImage") if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) { submitArgs ++= Vector( @@ -183,7 +200,7 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf) .withServiceAccount(serviceAccountName) .addNewContainer() .withName("spark-driver") - .withImage(sparkDriverImage) + .withImage(sparkImage) .withImagePullPolicy("Always") .withCommand(s"/opt/driver.sh") .withArgs(submitArgs: _*) diff --git a/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 
142b995eca0b5..ee90366152710 100644 --- a/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -46,7 +46,7 @@ private[spark] class KubernetesClusterSchedulerBackend( private val client = new DefaultKubernetesClient() private val DEFAULT_NUMBER_EXECUTORS = 2 - private val podPrefix = s"spark-executor-${Random.alphanumeric take 5 mkString ""}".toLowerCase() + private val jobObjectName = System.getProperty("SPARK_JOB_OBJECT_NAME", "") // using a concurrent TrieMap gets rid of possible concurrency issues // key is executor id, value is pod name @@ -55,13 +55,12 @@ private[spark] class KubernetesClusterSchedulerBackend( private var executorID = 0 private val sparkImage = conf.get("spark.kubernetes.sparkImage") - private val clientJarUri = conf.get("spark.executor.jar") private val ns = conf.get( "spark.kubernetes.namespace", KubernetesClusterScheduler.defaultNameSpace) private val dynamicExecutors = Utils.isDynamicAllocationEnabled(conf) - private val executorService = Executors.newFixedThreadPool(4) + private val executorService = Executors.newFixedThreadPool(4) // why 4 ?! private implicit val executionContext = ExecutionContext.fromExecutorService(executorService) private val sparkJobResource = new SparkJobResource(client)(executionContext) @@ -76,15 +75,12 @@ private[spark] class KubernetesClusterSchedulerBackend( override def start(): Unit = { super.start() - val keyValuePairs = Map("num-executors" -> getInitialTargetExecutorNumber(sc.conf), - "image" -> sparkImage, - "state" -> JobState.SUBMITTED) - logInfo(s"Creating Job Resource with name. $podPrefix") - - Try(sparkJobResource.createJobObject(podPrefix, keyValuePairs)) match { + logInfo(s"Updating Job Resource with name. $jobObjectName") + Try(sparkJobResource + .updateJobObject(jobObjectName, JobState.SUBMITTED.toString, "/spec/state")) match { case Success(_) => startWatcher() case Failure(e: SparkException) => - logWarning(s"SparkJob object not created. ${e.getMessage}") + logWarning(s"SparkJob object not updated. ${e.getMessage}") // SparkJob should continue if this fails as discussed on thread. // TODO: we should short-circuit on things like update or delete } @@ -107,7 +103,7 @@ private[spark] class KubernetesClusterSchedulerBackend( killExecutorPods(shutdownToPod.toVector) // TODO: pods that failed during build up due to some error are left behind. try{ - sparkJobResource.deleteJobObject(podPrefix) + sparkJobResource.deleteJobObject(jobObjectName) } catch { case e: SparkException => logWarning(s"SparkJob object not deleted. ${e.getMessage}") @@ -139,11 +135,12 @@ private[spark] class KubernetesClusterSchedulerBackend( } // TODO: be smarter about when to update. - try { - sparkJobResource.updateJobObject(podPrefix, requestedTotal.toString, "/spec/num-executors") - } catch { - case e: SparkException => logWarning(s"SparkJob Object not updated. ${e.getMessage}") + Try(sparkJobResource + .updateJobObject(jobObjectName, requestedTotal.toString, "/spec/num-executors")) match { + case Success(_) => logInfo(s"Object with name: $jobObjectName updated successfully") + case Failure(e: SparkException) => logWarning(s"SparkJob Object not updated. ${e.getMessage}") } + // TODO: are there meaningful failure modes here? 
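    // For illustration (not part of the diff): updateJobObject renders a JSON-Patch body with
    // media type "application/json-patch+json" against the sparkjobs/<name> endpoint, so the
    // call above, for a hypothetical requested total of 8 executors, sends roughly
    //   [{"op": "replace", "path": "/spec/num-executors", "value": "8"}]
    // (the value is a string because updateJobObject takes the new value as a String).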
Future.successful(true) } @@ -162,7 +159,7 @@ private[spark] class KubernetesClusterSchedulerBackend( } def shutdownExecutors(idPodPairs: Seq[(String, String)]) { - val active = getExecutorIds.toSet + val active = getExecutorIds().toSet // Check for any finished shutting down and kill the pods val shutdown = shutdownToPod.toVector.filter { case (e, _) => !active.contains(e) } @@ -222,7 +219,8 @@ private[spark] class KubernetesClusterSchedulerBackend( def createExecutorPod(executorNum: Int): String = { // create a single k8s executor pod. val labelMap = Map("type" -> "spark-executor") - val podName = s"$podPrefix-$executorNum" + val executorBaseName = s"spark-executor-${Random.alphanumeric take 5 mkString ""}".toLowerCase + val podName = s"$executorBaseName-$executorNum" val submitArgs = mutable.ArrayBuffer.empty[String] From 0ea1ac6da264bc94ae24b5c1d62ebf478ef4151d Mon Sep 17 00:00:00 2001 From: iyanuobidele Date: Thu, 15 Dec 2016 12:31:12 -0800 Subject: [PATCH 3/5] minor refactoring + moving ser and de out --- .../deploy/kubernetes/SparkJobResource.scala | 66 +++++++------------ .../cluster/kubernetes/JobStateSerDe.scala | 46 +++++++++++++ 2 files changed, 68 insertions(+), 44 deletions(-) create mode 100644 resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/JobStateSerDe.scala diff --git a/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala b/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala index 6aaf0216c8737..bb484cfc7e8fc 100644 --- a/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala +++ b/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala @@ -24,16 +24,14 @@ import scala.util.control.Breaks.{break, breakable} import io.fabric8.kubernetes.client.{BaseClient, KubernetesClient} import okhttp3._ import okio.{Buffer, BufferedSource} -import org.json4s.{CustomSerializer, DefaultFormats, JString} -import org.json4s.JsonAST.JNull +import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import org.json4s.jackson.Serialization.{read, write} import org.apache.spark.SparkException import org.apache.spark.internal.Logging -import org.apache.spark.scheduler.cluster.kubernetes.JobState -import org.apache.spark.scheduler.cluster.kubernetes.JobState._ +import org.apache.spark.scheduler.cluster.kubernetes.JobStateSerDe /* * Representation of a Spark Job State in Kubernetes @@ -50,26 +48,6 @@ object SparkJobResource { spec: Map[String, Any]) case class WatchObject(`type`: String, `object`: SparkJobState) - - case object JobStateSerDe - extends CustomSerializer[JobState](_ => - ({ - case JString("SUBMITTED") => JobState.SUBMITTED - case JString("QUEUED") => JobState.QUEUED - case JString("RUNNING") => JobState.RUNNING - case JString("FINISHED") => JobState.FINISHED - case JString("KILLED") => JobState.KILLED - case JString("FAILED") => JobState.FAILED - case JNull => - throw new UnsupportedOperationException("No JobState Specified") - }, { - case JobState.FAILED => JString("FAILED") - case JobState.SUBMITTED => JString("SUBMITTED") - case JobState.KILLED => JString("KILLED") - case JobState.FINISHED => JString("FINISHED") - case JobState.QUEUED => JString("QUEUED") - case JobState.RUNNING => JString("RUNNING") - })) } class SparkJobResource(client: KubernetesClient)(implicit ec: ExecutionContext) extends Logging { @@ 
-105,16 +83,14 @@ class SparkJobResource(client: KubernetesClient)(implicit ec: ExecutionContext) val request = new Request.Builder().post(requestBody).url(apiEndpoint).build() val response = httpClient.newCall(request).execute() - if (response.code() == 201) { - logInfo( - s"Successfully posted resource $name: " + - s"${pretty(render(parse(write(resourceObject))))}") - } else { + if (!response.isSuccessful) { val msg = s"Failed to post resource $name. ${response.toString}. ${compact(render(payload))}" logError(msg) throw new SparkException(msg) } + logInfo(s"Successfully posted resource $name: " + + s"${pretty(render(parse(write(resourceObject))))}") } def updateJobObject(name: String, value: String, fieldPath: String): Unit = { @@ -128,42 +104,39 @@ class SparkJobResource(client: KubernetesClient)(implicit ec: ExecutionContext) .url(s"$apiEndpoint/$name") .build() val response = httpClient.newCall(request).execute() - if (response.code() == 200) { - logInfo(s"Successfully patched resource $name") - } else { + if (!response.isSuccessful) { val msg = s"Failed to patch resource $name. ${response.message()}. ${compact(render(payload))}" logError(msg) throw new SparkException(msg) } + logInfo(s"Successfully patched resource $name") } def deleteJobObject(name: String): Unit = { val request = new Request.Builder().delete().url(s"$apiEndpoint/$name").build() val response = httpClient.newCall(request).execute() - if (response.code() == 200) { - logInfo(s"Successfully deleted resource $name") - } else { + if (!response.isSuccessful) { val msg = s"Failed to delete resource $name. ${response.message()}. $request" logError(msg) throw new SparkException(msg) } + logInfo(s"Successfully deleted resource $name") } def getJobObject(name: String): SparkJobState = { val request = new Request.Builder().get().url(s"$apiEndpoint/$name").build() val response = httpClient.newCall(request).execute() - if (response.code() == 200) { - logInfo(s"Successfully retrieved resource $name") - read[SparkJobState](response.body().string()) - } else { + if (!response.isSuccessful) { val msg = s"Failed to retrieve resource $name. 
${response.message()}" logError(msg) throw new SparkException(msg) } + logInfo(s"Successfully retrieved resource $name") + read[SparkJobState](response.body().string()) } /** @@ -175,15 +148,20 @@ class SparkJobResource(client: KubernetesClient)(implicit ec: ExecutionContext) val request = new Request.Builder().get().url(s"$apiEndpoint?watch=true").build() httpClient.newCall(request).execute() match { - case r: Response if r.code() == 200 => + case r: Response if r.isSuccessful => val deleteWatch = watchJobObjectUtil(r) logInfo("Starting watch on object") deleteWatch onComplete { - case Success(w: WatchObject) => promiseWatchOver success w - case Success(_) => throw new SparkException("Unexpected Response received") - case Failure(e: Throwable) => throw new SparkException(e.getMessage) + case Success(w: WatchObject) => promiseWatchOver trySuccess w + case Success(_) => + promiseWatchOver tryFailure new SparkException("Unexpected Response received") + case Failure(e: Throwable) => + promiseWatchOver tryFailure new SparkException(e.getMessage) } - case _: Response => throw new IllegalStateException("There's fire on the mountain") + case r: Response => + val msg = s"Failed to start watch on resource ${r.code()} ${r.message()}" + logWarning(msg) + throw new SparkException(msg) } promiseWatchOver.future } diff --git a/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/JobStateSerDe.scala b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/JobStateSerDe.scala new file mode 100644 index 0000000000000..9fb9b3742ab0d --- /dev/null +++ b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/JobStateSerDe.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.kubernetes + +import org.json4s.{CustomSerializer, JString} +import org.json4s.JsonAST.JNull + +import org.apache.spark.scheduler.cluster.kubernetes.JobState.JobState + +/** + * JobState Serializer and Deserializer + */ +object JobStateSerDe extends CustomSerializer[JobState](_ => + ({ + case JString("SUBMITTED") => JobState.SUBMITTED + case JString("QUEUED") => JobState.QUEUED + case JString("RUNNING") => JobState.RUNNING + case JString("FINISHED") => JobState.FINISHED + case JString("KILLED") => JobState.KILLED + case JString("FAILED") => JobState.FAILED + case JNull => + throw new UnsupportedOperationException("No JobState Specified") + }, { + case JobState.FAILED => JString("FAILED") + case JobState.SUBMITTED => JString("SUBMITTED") + case JobState.KILLED => JString("KILLED") + case JobState.FINISHED => JString("FINISHED") + case JobState.QUEUED => JString("QUEUED") + case JobState.RUNNING => JString("RUNNING") + }) +) From f1ef3e88ca3c3c7ae5ee23b2b56541440cf153f0 Mon Sep 17 00:00:00 2001 From: iyanuobidele Date: Thu, 15 Dec 2016 18:03:11 -0800 Subject: [PATCH 4/5] handling blocking call in watcher + changing backend clean up logic --- .../deploy/kubernetes/SparkJobResource.scala | 51 +++++++++++++++---- .../KubernetesClusterSchedulerBackend.scala | 26 +++++++--- 2 files changed, 60 insertions(+), 17 deletions(-) diff --git a/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala b/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala index bb484cfc7e8fc..941b4d6b7e721 100644 --- a/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala +++ b/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala @@ -17,7 +17,9 @@ package org.apache.spark.deploy.kubernetes -import scala.concurrent.{ExecutionContext, Future, Promise} +import java.util.concurrent.{Executors, ThreadFactory} + +import scala.concurrent.{blocking, ExecutionContext, Future, Promise} import scala.util.{Failure, Success} import scala.util.control.Breaks.{break, breakable} @@ -54,6 +56,16 @@ class SparkJobResource(client: KubernetesClient)(implicit ec: ExecutionContext) import SparkJobResource._ + private lazy val blockingThreadPool = Executors.newCachedThreadPool( + new ThreadFactory { + def newThread(r: Runnable): Thread = { + val thread = new Thread(r) + thread.setDaemon(true) + thread + } + } + ) + private implicit val formats = DefaultFormats + JobStateSerDe private val httpClient = getHttpClient(client.asInstanceOf[BaseClient]) private val kind = "SparkJob" @@ -61,6 +73,23 @@ class SparkJobResource(client: KubernetesClient)(implicit ec: ExecutionContext) private val apiEndpoint = s"${client.getMasterUrl}/apis/$apiVersion/" + s"namespaces/${client.getNamespace}/sparkjobs" + private def executeBlocking[T](cb: => T): Future[T] = { + val p = Promise[T]() + + blockingThreadPool.execute( + new Runnable { + override def run(): Unit = { + try { + p.trySuccess(blocking(cb)) + } catch { + case e: Throwable => logError(e.getMessage) + p.tryFailure(e) + } + } + }) + p.future + } + private def getHttpClient(client: BaseClient): OkHttpClient = { val field = classOf[BaseClient].getDeclaredField("httpClient") try { @@ -141,7 +170,7 @@ class SparkJobResource(client: KubernetesClient)(implicit ec: ExecutionContext) /** * This method has an helper method that blocks to watch the object. 
- * The future is completed on a Delete event. + * The future is completed on a Delete event or source exhaustion. */ def watchJobObject(): Future[WatchObject] = { val promiseWatchOver = Promise[WatchObject]() @@ -167,22 +196,27 @@ class SparkJobResource(client: KubernetesClient)(implicit ec: ExecutionContext) } /** - * This method has a blocking call inside it. - * However it is wrapped in a future, so it'll take off in another thread + * This method has a blocking call - wait on SSE - inside it. + * However it is sent off in a new thread */ private def watchJobObjectUtil(response: Response): Future[WatchObject] = { val promiseOfJobState = Promise[WatchObject]() val buffer = new Buffer() val source: BufferedSource = response.body().source() - Future { + + executeBlocking { breakable { - // This will block until there are bytes to read or the source is definitely exhausted. + // This will block until there are bytes to read or the source is exhausted. while (!source.exhausted()) { source.read(buffer, 8192) match { - case -1 => cleanUpListener(source, buffer, response) + case -1 => + promiseOfJobState tryFailure + new SparkException("Source is exhausted and object state is unknown") + cleanUpListener(source, buffer, response) case _ => val wo = read[WatchObject](buffer.readUtf8()) wo match { - case WatchObject("DELETED", _) => promiseOfJobState success wo + case WatchObject("DELETED", _) => + promiseOfJobState trySuccess wo cleanUpListener(source, buffer, response) case WatchObject(_, _) => } @@ -199,5 +233,4 @@ class SparkJobResource(client: KubernetesClient)(implicit ec: ExecutionContext) response.close() break() } - } diff --git a/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index ee90366152710..afd983938bf92 100644 --- a/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -60,13 +60,15 @@ private[spark] class KubernetesClusterSchedulerBackend( KubernetesClusterScheduler.defaultNameSpace) private val dynamicExecutors = Utils.isDynamicAllocationEnabled(conf) - private val executorService = Executors.newFixedThreadPool(4) // why 4 ?! + private val executorService = Executors.newCachedThreadPool() private implicit val executionContext = ExecutionContext.fromExecutorService(executorService) private val sparkJobResource = new SparkJobResource(client)(executionContext) private val imagePullSecret = System.getProperty("SPARK_IMAGE_PULLSECRET", "") + private var isObjectDeleted: Boolean = _ + // executor back-ends take their configuration this way if (dynamicExecutors) { sc.getConf.setExecutorEnv("spark.dynamicAllocation.enabled", "true") @@ -90,6 +92,7 @@ private[spark] class KubernetesClusterSchedulerBackend( private def startWatcher(): Unit = { sparkJobResource.watchJobObject() onComplete { case Success(w: WatchObject) if w.`type` == "DELETED" => + isObjectDeleted = true logInfo("TPR Object deleted. 
Cleaning up") stop() case Success(_: WatchObject) => throw new SparkException("Unexpected response received") @@ -98,17 +101,24 @@ private[spark] class KubernetesClusterSchedulerBackend( } override def stop(): Unit = { + if (isObjectDeleted) { + stopUtil() + } else { + try { + sparkJobResource.deleteJobObject(jobObjectName) + } catch { + case e: SparkException => + logWarning(s"SparkJob object not deleted. ${e.getMessage}") + // what else do we need to do here ? + } + } + } + + private def stopUtil() = { // Kill all executor pods indiscriminately killExecutorPods(executorToPod.toVector) killExecutorPods(shutdownToPod.toVector) // TODO: pods that failed during build up due to some error are left behind. - try{ - sparkJobResource.deleteJobObject(jobObjectName) - } catch { - case e: SparkException => - logWarning(s"SparkJob object not deleted. ${e.getMessage}") - // what else do we need to do here ? - } super.stop() } From afc5d464efe96484069d357d904bfe6adc00e33e Mon Sep 17 00:00:00 2001 From: iyanuobidele Date: Thu, 5 Jan 2017 14:26:13 -0800 Subject: [PATCH 5/5] using spark wrappers for thread creation + changing resource crud logic + separating external deletion of resource --- .../spark/deploy/kubernetes/Client.scala | 2 - .../deploy/kubernetes/SparkJobResource.scala | 278 +++++++++++------- .../kubernetes/JobResourceCreateCall.scala | 41 +++ .../KubernetesClusterScheduler.scala | 44 +-- .../KubernetesClusterSchedulerBackend.scala | 131 ++++++--- 5 files changed, 326 insertions(+), 170 deletions(-) create mode 100644 resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/JobResourceCreateCall.scala diff --git a/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 8914a9c18b4c2..7b43b4faf6201 100644 --- a/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -45,8 +45,6 @@ private[spark] class Client(val args: ClientArguments, scheduler.stop() shutdownLatch.countDown() System.clearProperty("SPARK_KUBERNETES_MODE") - System.clearProperty("SPARK_IMAGE_PULLSECRET") - System.clearProperty("SPARK_JOB_OBJECT_NAME") } def awaitShutdown(): Unit = { diff --git a/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala b/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala index 941b4d6b7e721..0f3f6ec956716 100644 --- a/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala +++ b/resource-managers/kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala @@ -17,28 +17,49 @@ package org.apache.spark.deploy.kubernetes -import java.util.concurrent.{Executors, ThreadFactory} +import java.nio.file.{Files, Paths} +import java.util.concurrent.TimeUnit import scala.concurrent.{blocking, ExecutionContext, Future, Promise} -import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} import scala.util.control.Breaks.{break, breakable} import io.fabric8.kubernetes.client.{BaseClient, KubernetesClient} import okhttp3._ import okio.{Buffer, BufferedSource} -import org.json4s.DefaultFormats +import org.json4s.{DefaultFormats, Formats} import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import 
org.json4s.jackson.Serialization.{read, write} +import org.apache.spark.deploy.kubernetes.SparkJobResource._ import org.apache.spark.SparkException import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.cluster.kubernetes.JobResourceCreateCall +import org.apache.spark.scheduler.cluster.kubernetes.JobResourceRUDCalls import org.apache.spark.scheduler.cluster.kubernetes.JobStateSerDe /* - * Representation of a Spark Job State in Kubernetes + * CRUD + Watch operations on a Spark Job State in Kubernetes * */ -object SparkJobResource { +private[spark] object SparkJobResource { + + implicit val formats: Formats = DefaultFormats + JobStateSerDe + + val kind = "SparkJob" + val apiVersion = "apache.io/v1" + val apiEndpoint = s"apis/$apiVersion/namespaces/%s/sparkjobs" + + def getHttpClient(client: BaseClient): OkHttpClient = { + val field = classOf[BaseClient].getDeclaredField("httpClient") + try { + field.setAccessible(true) + field.get(client).asInstanceOf[OkHttpClient] + } finally { + field.setAccessible(false) + } + } + case class Metadata(name: String, uid: Option[String] = None, labels: Option[Map[String, String]] = None, @@ -52,114 +73,173 @@ object SparkJobResource { case class WatchObject(`type`: String, `object`: SparkJobState) } -class SparkJobResource(client: KubernetesClient)(implicit ec: ExecutionContext) extends Logging { - import SparkJobResource._ +private[spark] class SparkJobCreateResource(client: KubernetesClient, namespace: String) + extends JobResourceCreateCall with Logging { - private lazy val blockingThreadPool = Executors.newCachedThreadPool( - new ThreadFactory { - def newThread(r: Runnable): Thread = { - val thread = new Thread(r) - thread.setDaemon(true) - thread - } + private val httpClient = getHttpClient(client.asInstanceOf[BaseClient]) + + /** + * Using a Map as an argument here allows adding more info into the Object if needed + * This is currently called on the client machine. We can avoid the token use. + * */ + override def createJobObject(name: String, keyValuePairs: Map[String, Any]): Unit = { + val resourceObject = + SparkJobState(apiVersion, kind, Metadata(name), keyValuePairs) + val payload = parse(write(resourceObject)) + val requestBody = RequestBody + .create(MediaType.parse("application/json"), compact(render(payload))) + val request = new Request.Builder() + .post(requestBody) + .url(s"${client.getMasterUrl}${apiEndpoint.format(namespace)}") + .build() + logDebug(s"Create Request: $request") + val response = httpClient.newCall(request).execute() + if (!response.isSuccessful) { + response.body().close() + val msg = + s"Failed to post resource $name. ${response.toString}. 
${compact(render(payload))}" + logError(msg) + throw new SparkException(msg) } - ) + response.body().close() + logDebug(s"Successfully posted resource $name: " + + s"${pretty(render(parse(write(resourceObject))))}") + } +} + +private[spark] class SparkJobRUDResource( + client: KubernetesClient, + namespace: String, + ec: ExecutionContext) extends JobResourceRUDCalls with Logging { + + private val protocol = "https://" - private implicit val formats = DefaultFormats + JobStateSerDe private val httpClient = getHttpClient(client.asInstanceOf[BaseClient]) - private val kind = "SparkJob" - private val apiVersion = "apache.io/v1" - private val apiEndpoint = s"${client.getMasterUrl}/apis/$apiVersion/" + - s"namespaces/${client.getNamespace}/sparkjobs" - private def executeBlocking[T](cb: => T): Future[T] = { - val p = Promise[T]() + private var watchSource: BufferedSource = _ + + private lazy val buffer = new Buffer() - blockingThreadPool.execute( - new Runnable { + // Since this will be running inside a pod + // we can access the pods token and use it with the Authorization header when + // making rest calls to the k8s Api + private val kubeToken = { + val path = Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token") + val tok = Try(new String(Files.readAllBytes(path))) match { + case Success(some) => Option(some) + case Failure(e: Throwable) => logError(s"${e.getMessage}") + None + } + tok.map(t => t).getOrElse{ + // Log a warning just in case, but this should almost certainly never happen + logWarning("Error while retrieving pod token") + "" + } + } + + // we can also get the host from the environment variable + private val k8sServiceHost = { + val host = Try(sys.env("KUBERNETES_SERVICE_HOST")) match { + case Success(h) => Option(h) + case Failure(_) => None + } + host.map(h => h).getOrElse{ + // Log a warning just in case, but this should almost certainly never happen + logWarning("Error while retrieving k8s host address") + "127.0.0.1" + } + } + + // the port from the environment variable + private val k8sPort = { + val port = Try(sys.env("KUBERNETES_PORT_443_TCP_PORT")) match { + case Success(p) => Option(p) + case Failure(_) => None + } + port.map(p => p).getOrElse{ + // Log a warning just in case, but this should almost certainly never happen + logWarning("Error while retrieving k8s host port") + "8001" + } + } + + private def executeBlocking(cb: => WatchObject): Future[WatchObject] = { + val p = Promise[WatchObject]() + ec.execute(new Runnable { override def run(): Unit = { try { p.trySuccess(blocking(cb)) } catch { - case e: Throwable => logError(e.getMessage) - p.tryFailure(e) + case e: Throwable => p.tryFailure(e) } } }) p.future } - private def getHttpClient(client: BaseClient): OkHttpClient = { - val field = classOf[BaseClient].getDeclaredField("httpClient") - try { - field.setAccessible(true) - field.get(client).asInstanceOf[OkHttpClient] - } finally { - field.setAccessible(false) + // Serves as a way to interrupt to the watcher thread. 
+ // This closes the source the watcher is reading from and as a result triggers promise completion + def stopWatcher(): Unit = { + if (watchSource != null) { + buffer.close() + watchSource.close() } } - /** - * using a Map as an argument here allows adding more info into the Object if needed - * */ - def createJobObject(name: String, keyValuePairs: Map[String, Any]): Unit = { - val resourceObject = - SparkJobState(apiVersion, kind, Metadata(name), keyValuePairs) - val payload = parse(write(resourceObject)) - val requestBody = RequestBody - .create(MediaType.parse("application/json"), compact(render(payload))) - val request = - new Request.Builder().post(requestBody).url(apiEndpoint).build() - val response = httpClient.newCall(request).execute() - if (!response.isSuccessful) { - val msg = - s"Failed to post resource $name. ${response.toString}. ${compact(render(payload))}" - logError(msg) - throw new SparkException(msg) - } - logInfo(s"Successfully posted resource $name: " + - s"${pretty(render(parse(write(resourceObject))))}") - } - - def updateJobObject(name: String, value: String, fieldPath: String): Unit = { + override def updateJobObject(name: String, value: String, fieldPath: String): Unit = { val payload = List( ("op" -> "replace") ~ ("path" -> fieldPath) ~ ("value" -> value)) val requestBody = RequestBody.create( MediaType.parse("application/json-patch+json"), compact(render(payload))) val request = new Request.Builder() - .post(requestBody) - .url(s"$apiEndpoint/$name") + .addHeader("Authorization", s"Bearer $kubeToken") + .patch(requestBody) + .url(s"$protocol$k8sServiceHost:$k8sPort/${apiEndpoint.format(namespace)}/$name") .build() + logDebug(s"Update Request: $request") val response = httpClient.newCall(request).execute() if (!response.isSuccessful) { + response.body().close() val msg = s"Failed to patch resource $name. ${response.message()}. ${compact(render(payload))}" logError(msg) - throw new SparkException(msg) + throw new SparkException(s"${response.code()} ${response.message()}") } - logInfo(s"Successfully patched resource $name") + response.body().close() + logDebug(s"Successfully patched resource $name.") } - def deleteJobObject(name: String): Unit = { - val request = - new Request.Builder().delete().url(s"$apiEndpoint/$name").build() + override def deleteJobObject(name: String): Unit = { + val request = new Request.Builder() + .addHeader("Authorization", s"Bearer $kubeToken") + .delete() + .url(s"$protocol$k8sServiceHost:$k8sPort/${apiEndpoint.format(namespace)}/$name") + .build() + logDebug(s"Delete Request: $request") val response = httpClient.newCall(request).execute() if (!response.isSuccessful) { + response.body().close() val msg = s"Failed to delete resource $name. ${response.message()}. $request" logError(msg) throw new SparkException(msg) } + response.body().close() logInfo(s"Successfully deleted resource $name") } def getJobObject(name: String): SparkJobState = { - val request = - new Request.Builder().get().url(s"$apiEndpoint/$name").build() + val request = new Request.Builder() + .addHeader("Authorization", s"Bearer $kubeToken") + .get() + .url(s"$protocol$k8sServiceHost:$k8sPort/${apiEndpoint.format(namespace)}/$name") + .build() + logDebug(s"Get Request: $request") val response = httpClient.newCall(request).execute() if (!response.isSuccessful) { + response.body().close() val msg = s"Failed to retrieve resource $name. 
${response.message()}" logError(msg) throw new SparkException(msg) @@ -171,28 +251,25 @@ class SparkJobResource(client: KubernetesClient)(implicit ec: ExecutionContext) /** * This method has an helper method that blocks to watch the object. * The future is completed on a Delete event or source exhaustion. + * This method relies on the assumption of one sparkjob per namespace */ - def watchJobObject(): Future[WatchObject] = { - val promiseWatchOver = Promise[WatchObject]() - val request = - new Request.Builder().get().url(s"$apiEndpoint?watch=true").build() - httpClient.newCall(request).execute() match { - case r: Response if r.isSuccessful => - val deleteWatch = watchJobObjectUtil(r) - logInfo("Starting watch on object") - deleteWatch onComplete { - case Success(w: WatchObject) => promiseWatchOver trySuccess w - case Success(_) => - promiseWatchOver tryFailure new SparkException("Unexpected Response received") - case Failure(e: Throwable) => - promiseWatchOver tryFailure new SparkException(e.getMessage) - } - case r: Response => - val msg = s"Failed to start watch on resource ${r.code()} ${r.message()}" - logWarning(msg) - throw new SparkException(msg) + override def watchJobObject(): Future[WatchObject] = { + val watchClient = httpClient.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build() + val request = new Request.Builder() + .addHeader("Authorization", s"Bearer $kubeToken") + .get() + .url(s"$protocol$k8sServiceHost:$k8sPort/${apiEndpoint.format(namespace)}?watch=true") + .build() + logDebug(s"Watch Request: $request") + val resp = watchClient.newCall(request).execute() + if (!resp.isSuccessful) { + resp.body().close() + val msg = s"Failed to start watch on resource ${resp.code()} ${resp.message()}" + logWarning(msg) + throw new SparkException(msg) } - promiseWatchOver.future + logInfo(s"Starting watch on jobResource") + watchJobObjectUtil(resp) } /** @@ -200,37 +277,34 @@ class SparkJobResource(client: KubernetesClient)(implicit ec: ExecutionContext) * However it is sent off in a new thread */ private def watchJobObjectUtil(response: Response): Future[WatchObject] = { - val promiseOfJobState = Promise[WatchObject]() - val buffer = new Buffer() - val source: BufferedSource = response.body().source() - + @volatile var wo: WatchObject = null + watchSource = response.body().source() executeBlocking { breakable { // This will block until there are bytes to read or the source is exhausted. - while (!source.exhausted()) { - source.read(buffer, 8192) match { + while (!watchSource.exhausted()) { + watchSource.read(buffer, 8192) match { case -1 => - promiseOfJobState tryFailure - new SparkException("Source is exhausted and object state is unknown") - cleanUpListener(source, buffer, response) - case _ => val wo = read[WatchObject](buffer.readUtf8()) + cleanUpListener(watchSource, buffer) + throw new SparkException("Source is exhausted and object state is unknown") + case _ => + wo = read[WatchObject](buffer.readUtf8()) wo match { - case WatchObject("DELETED", _) => - promiseOfJobState trySuccess wo - cleanUpListener(source, buffer, response) - case WatchObject(_, _) => + case WatchObject("DELETED", w) => + logInfo(s"${w.metadata.name} has been deleted") + cleanUpListener(watchSource, buffer) + case WatchObject(e, _) => logInfo(s"$e event. 
Still watching") } } } } + wo } - promiseOfJobState.future } - private def cleanUpListener(source: BufferedSource, buffer: Buffer, response: Response): Unit = { + private def cleanUpListener(source: BufferedSource, buffer: Buffer): Unit = { buffer.close() source.close() - response.close() break() } } diff --git a/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/JobResourceCreateCall.scala b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/JobResourceCreateCall.scala new file mode 100644 index 0000000000000..42127486eb569 --- /dev/null +++ b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/JobResourceCreateCall.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +import scala.concurrent.Future + +import org.apache.spark.deploy.kubernetes.SparkJobResource.{SparkJobState, WatchObject} + +/** + * Isolated this since the method is called on the client machine + */ +trait JobResourceCreateCall { + def createJobObject(name: String, keyValuePairs: Map[String, Any]): Unit +} + +/** + * RUD and W - Read, Update, Delete & Watch + */ +trait JobResourceRUDCalls { + def deleteJobObject(name: String): Unit + + def getJobObject(name: String): SparkJobState + + def updateJobObject(name: String, value: String, fieldPath: String): Unit + + def watchJobObject(): Future[WatchObject] +} diff --git a/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala index a288abf1dabbe..6f1b37601d171 100644 --- a/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala +++ b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterScheduler.scala @@ -19,11 +19,9 @@ package org.apache.spark.scheduler.cluster.kubernetes import java.io.File import java.util.Date -import java.util.concurrent.Executors import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import scala.concurrent.ExecutionContext.Implicits.global import scala.util.{Failure, Random, Success, Try} import io.fabric8.kubernetes.api.model.{Pod, PodBuilder, PodFluent, ServiceBuilder} @@ -31,7 +29,7 @@ import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, Kub import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.Command -import org.apache.spark.deploy.kubernetes.{ClientArguments, SparkJobResource} +import 
org.apache.spark.deploy.kubernetes.{ClientArguments, SparkJobCreateResource} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.util.Utils @@ -79,7 +77,7 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf) throw new SparkException("Spark image not set. Please configure spark.kubernetes.sparkImage") } - private val sparkJobResource = new SparkJobResource(client) + private val sparkJobResource = new SparkJobCreateResource(client, nameSpace) private val imagePullSecret = conf.get("spark.kubernetes.imagePullSecret", "") private val isImagePullSecretSet = isSecretRunning(imagePullSecret) @@ -87,17 +85,23 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf) logWarning("Instances: " + instances) def start(args: ClientArguments): Unit = { - val sparkJobName = - s"spark-job-$nameSpace-${Random.alphanumeric take 5 mkString ""}".toLowerCase() - System.setProperty("SPARK_JOB_OBJECT_NAME", sparkJobName) - val keyValuePairs = Map("num-executors" -> instances, + val sparkJobResourceName = + s"sparkJob-$nameSpace-${Random.alphanumeric take 5 mkString ""}".toLowerCase() + val keyValuePairs = Map( + "num-executors" -> instances, "image" -> sparkImage, - "state" -> JobState.QUEUED) - - Try(sparkJobResource.createJobObject(sparkJobName, keyValuePairs)) match { - case Success(_) => logInfo(s"Object with name: $sparkJobName posted to k8s successfully") + "state" -> JobState.QUEUED, + "spark-driver" -> driverName, + "spark-svc" -> svcName) + + Try(sparkJobResource.createJobObject(sparkJobResourceName, keyValuePairs)) match { + case Success(_) => + conf.set("spark.kubernetes.jobResourceName", sparkJobResourceName) + conf.set("spark.kubernetes.jobResourceSet", "true") + logInfo(s"Object with name: $sparkJobResourceName posted to k8s successfully") case Failure(e: Throwable) => // log and carry on - logInfo(s"Failed to post object $sparkJobName due to ${e.getMessage}") + conf.set("spark.kubernetes.jobResourceSet", "false") + logInfo(s"Failed to post object $sparkJobResourceName due to ${e.getMessage}") } startDriver(client, args) @@ -121,8 +125,8 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf) val clientJarUri = args.userJar // This is the kubernetes master we're launching on. - val kubernetesHost = "k8s://" + client.getMasterUrl().getHost() - logInfo("Using as kubernetes-master: " + kubernetesHost.toString()) + val kubernetesHost = "k8s://" + client.getMasterUrl.getHost + logInfo("Using as kubernetes-master: " + kubernetesHost.toString) val submitArgs = scala.collection.mutable.ArrayBuffer.empty[String] submitArgs ++= Vector( @@ -135,6 +139,9 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf) s"--conf=spark.kubernetes.namespace=$nameSpace", s"--conf=spark.kubernetes.sparkImage=$sparkImage") + submitArgs ++= conf.getAll.collect { + case (name, value) if !confBlackList.contains(name) => s"--conf $name=$value" } + if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) { submitArgs ++= Vector( "--conf spark.dynamicAllocation.enabled=true", @@ -182,7 +189,8 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf) try { client.secrets().inNamespace(nameSpace).withName(name).get() != null } catch { - case e: KubernetesClientException => false + case e: KubernetesClientException => logError(e.getMessage) + false // is this enough to throw a SparkException? 
For now default to false } } @@ -211,15 +219,15 @@ private[spark] class KubernetesClusterScheduler(conf: SparkConf) private def buildPodUtil(pod: PodFluent.SpecNested[PodBuilder]): Pod = { if (isImagePullSecretSet) { - System.setProperty("SPARK_IMAGE_PULLSECRET", imagePullSecret) pod.addNewImagePullSecret(imagePullSecret).endSpec().build() } else { + conf.remove("spark.kubernetes.imagePullSecret") pod.endSpec().build() } } def setupKubernetesClient(): KubernetesClient = { - val sparkHost = new java.net.URI(conf.get("spark.master")).getHost() + val sparkHost = new java.net.URI(conf.get("spark.master")).getHost var config = new ConfigBuilder().withNamespace(nameSpace) if (sparkHost != "default") { diff --git a/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index afd983938bf92..40dc66d342584 100644 --- a/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -17,42 +17,39 @@ package org.apache.spark.scheduler.cluster.kubernetes -import java.util.concurrent.Executors - import scala.collection.{concurrent, mutable} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.Duration import scala.util.{Failure, Random, Success, Try} import io.fabric8.kubernetes.api.model.{Pod, PodBuilder, PodFluent} import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.apache.spark.{SparkConf, SparkContext, SparkException} -import org.apache.spark.deploy.kubernetes.SparkJobResource import org.apache.spark.deploy.kubernetes.SparkJobResource.WatchObject +import org.apache.spark.deploy.kubernetes.SparkJobRUDResource import org.apache.spark.internal.config._ import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class KubernetesClusterSchedulerBackend( - scheduler: TaskSchedulerImpl, - sc: SparkContext) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { + scheduler: TaskSchedulerImpl, + sc: SparkContext) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { private val client = new DefaultKubernetesClient() private val DEFAULT_NUMBER_EXECUTORS = 2 - private val jobObjectName = System.getProperty("SPARK_JOB_OBJECT_NAME", "") + private val jobResourceName = conf.get("spark.kubernetes.jobResourceName", "") // using a concurrent TrieMap gets rid of possible concurrency issues // key is executor id, value is pod name - private var executorToPod = new concurrent.TrieMap[String, String] // active executors - private var shutdownToPod = new concurrent.TrieMap[String, String] // pending shutdown - private var executorID = 0 + private val executorToPod = new concurrent.TrieMap[String, String] // active executors + private val shutdownToPod = new concurrent.TrieMap[String, String] // pending shutdown private val sparkImage = conf.get("spark.kubernetes.sparkImage") private val ns = conf.get( @@ -60,14 +57,20 @@ 
private[spark] class KubernetesClusterSchedulerBackend( KubernetesClusterScheduler.defaultNameSpace) private val dynamicExecutors = Utils.isDynamicAllocationEnabled(conf) - private val executorService = Executors.newCachedThreadPool() - private implicit val executionContext = ExecutionContext.fromExecutorService(executorService) + private implicit val resourceWatcherPool = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonFixedThreadPool(2, "resource-watcher-pool")) + + private val sparkJobResource = new SparkJobRUDResource(client, ns, resourceWatcherPool) - private val sparkJobResource = new SparkJobResource(client)(executionContext) + private val imagePullSecret = conf.get("spark.kubernetes.imagePullSecret", "") - private val imagePullSecret = System.getProperty("SPARK_IMAGE_PULLSECRET", "") + private val executorBaseName = + s"spark-executor-${Random.alphanumeric take 5 mkString ""}".toLowerCase - private var isObjectDeleted: Boolean = _ + private var workingWithJobResource = + conf.get("spark.kubernetes.jobResourceSet", "false").toBoolean + + private var watcherFuture: Future[WatchObject] = _ // executor back-ends take their configuration this way if (dynamicExecutors) { @@ -77,44 +80,65 @@ private[spark] class KubernetesClusterSchedulerBackend( override def start(): Unit = { super.start() - logInfo(s"Updating Job Resource with name. $jobObjectName") - Try(sparkJobResource - .updateJobObject(jobObjectName, JobState.SUBMITTED.toString, "/spec/state")) match { - case Success(_) => startWatcher() - case Failure(e: SparkException) => - logWarning(s"SparkJob object not updated. ${e.getMessage}") - // SparkJob should continue if this fails as discussed on thread. - // TODO: we should short-circuit on things like update or delete - } + startLogic() createExecutorPods(getInitialTargetExecutorNumber(sc.getConf)) } - private def startWatcher(): Unit = { - sparkJobResource.watchJobObject() onComplete { - case Success(w: WatchObject) if w.`type` == "DELETED" => - isObjectDeleted = true - logInfo("TPR Object deleted. Cleaning up") - stop() - case Success(_: WatchObject) => throw new SparkException("Unexpected response received") - case Failure(e: Throwable) => throw new SparkException(e.getMessage) + private def startLogic(): Unit = { + if (workingWithJobResource) { + logInfo(s"Updating Job Resource with name. $jobResourceName") + Try(sparkJobResource + .updateJobObject(jobResourceName, JobState.SUBMITTED.toString, "/spec/state")) match { + case Success(_) => startWatcher() + case Failure(e: SparkException) if e.getMessage startsWith "404" => + logWarning(s"Possible deletion of jobResource before backend start") + workingWithJobResource = false + case Failure(e: SparkException) => + logWarning(s"SparkJob object not updated. ${e.getMessage}") + // SparkJob should continue if this fails as discussed + // Maybe some retry + backoff mechanism ? + } } } + private def startWatcher(): Unit = { + resourceWatcherPool.execute(new Runnable { + override def run(): Unit = { + watcherFuture = sparkJobResource.watchJobObject() + watcherFuture onComplete { + case Success(w: WatchObject) if w.`type` == "DELETED" => + logInfo("TPR Object deleted externally. Cleaning up") + stopUtil() + // TODO: are there other todo's for a clean kill while job is running? + case Success(w: WatchObject) => + // Log a warning just in case, but this should almost certainly never happen + logWarning(s"Unexpected response received. 
$w") + deleteJobResource() + workingWithJobResource = false + case Failure(e: Throwable) => + logWarning(e.getMessage) + deleteJobResource() + workingWithJobResource = false // in case watcher fails early on + } + } + }) + } + override def stop(): Unit = { - if (isObjectDeleted) { - stopUtil() - } else { + if (workingWithJobResource) { + sparkJobResource.stopWatcher() try { - sparkJobResource.deleteJobObject(jobObjectName) + ThreadUtils.awaitResult(watcherFuture, Duration.Inf) } catch { - case e: SparkException => - logWarning(s"SparkJob object not deleted. ${e.getMessage}") - // what else do we need to do here ? + case _ : Throwable => } } + stopUtil() + } private def stopUtil() = { + resourceWatcherPool.shutdown() // Kill all executor pods indiscriminately killExecutorPods(executorToPod.toVector) killExecutorPods(shutdownToPod.toVector) @@ -122,6 +146,16 @@ private[spark] class KubernetesClusterSchedulerBackend( super.stop() } + private def deleteJobResource(): Unit = { + try { + sparkJobResource.deleteJobObject(jobResourceName) + } catch { + case e: SparkException => + logError(s"SparkJob object not deleted. ${e.getMessage}") + // what else do we need to do here ? + } + } + // Dynamic allocation interfaces override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = { logInfo(s"Received doRequestTotalExecutors: $requestedTotal") @@ -145,10 +179,13 @@ private[spark] class KubernetesClusterSchedulerBackend( } // TODO: be smarter about when to update. - Try(sparkJobResource - .updateJobObject(jobObjectName, requestedTotal.toString, "/spec/num-executors")) match { - case Success(_) => logInfo(s"Object with name: $jobObjectName updated successfully") - case Failure(e: SparkException) => logWarning(s"SparkJob Object not updated. ${e.getMessage}") + if (workingWithJobResource) { + Try(sparkJobResource + .updateJobObject(jobResourceName, requestedTotal.toString, "/spec/num-executors")) match { + case Success(_) => logInfo(s"Object with name: $jobResourceName updated successfully") + case Failure(e: SparkException) => + logWarning(s"SparkJob Object not updated. ${e.getMessage}") + } } // TODO: are there meaningful failure modes here? @@ -162,9 +199,8 @@ private[spark] class KubernetesClusterSchedulerBackend( } private def createExecutorPods(n: Int) { - for (_ <- 1 to n) { - executorID += 1 - executorToPod += ((executorID.toString, createExecutorPod(executorID))) + for (i <- 1 to n) { + executorToPod += ((i.toString, createExecutorPod(i))) } } @@ -229,7 +265,6 @@ private[spark] class KubernetesClusterSchedulerBackend( def createExecutorPod(executorNum: Int): String = { // create a single k8s executor pod. val labelMap = Map("type" -> "spark-executor") - val executorBaseName = s"spark-executor-${Random.alphanumeric take 5 mkString ""}".toLowerCase val podName = s"$executorBaseName-$executorNum" val submitArgs = mutable.ArrayBuffer.empty[String]