From 3776898416b0a8c896cea423056583a020208efc Mon Sep 17 00:00:00 2001 From: Anirudh Date: Thu, 10 Nov 2016 08:48:49 -0800 Subject: [PATCH 1/4] [revert] adding new yamls for TPR --- sbin/driver.yaml | 7 +++++++ sbin/kubernetes-resource.yaml | 10 ++++++++++ 2 files changed, 17 insertions(+) create mode 100644 sbin/driver.yaml create mode 100644 sbin/kubernetes-resource.yaml diff --git a/sbin/driver.yaml b/sbin/driver.yaml new file mode 100644 index 0000000000000..de806599c2c5c --- /dev/null +++ b/sbin/driver.yaml @@ -0,0 +1,7 @@ +apiVersion: "kubernetes.io/v1" +kind: SparkJob +metadata: + name: spark-driver-1924 +image: "driver-image" +state: "completed" +num-executors: 10 \ No newline at end of file diff --git a/sbin/kubernetes-resource.yaml b/sbin/kubernetes-resource.yaml new file mode 100644 index 0000000000000..58d2072c0622b --- /dev/null +++ b/sbin/kubernetes-resource.yaml @@ -0,0 +1,10 @@ +metadata: + name: spark-job.kubernetes.io + labels: + resource: spark-job + object: spark +apiVersion: extensions/v1beta1 +kind: ThirdPartyResource +description: "A resource that manages a spark job" +versions: + - name: v1 \ No newline at end of file From f339dc220db83cee1e80011927798ddfd9289277 Mon Sep 17 00:00:00 2001 From: Iyanu Obidele Date: Thu, 16 Feb 2017 16:41:49 -0800 Subject: [PATCH 2/4] initial port from foxish-fork --- .../deploy/kubernetes/SparkJobResource.scala | 106 ++++++++++++++++++ sbin/driver.yaml | 13 ++- sbin/kubernetes-resource.yaml | 2 +- 3 files changed, 114 insertions(+), 7 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala new file mode 100644 index 0000000000000..72435ee7de1f1 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.spark.deploy.kubernetes + +import io.fabric8.kubernetes.client.{BaseClient, KubernetesClient} +import okhttp3.{MediaType, OkHttpClient, Request, RequestBody} +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +class SparkJobResource(client: KubernetesClient) extends Logging { + val ApiVersion = "apache.io/v1" + val Kind = "SparkJob" + +// apiVersion: "apache.io/v1" +// kind: "SparkJob" +// metadata: +// name: "spark-job-1" +// spec: +// image: "driver-image" +// state: "completed" +// num-executors: 10 + + val httpClient = this.getHTTPClient(client.asInstanceOf[BaseClient]) + def createJobObject(name: String, executors: Int, image: String): Unit = { + val json = ("apiVersion" -> ApiVersion) ~ + ("kind" -> Kind) ~ + ("metadata" -> ("name" -> name)) ~ + ("spec" -> (("state" -> "started") ~ + ("image" -> image) ~ + ("num-executors" -> executors.toString))) + + val body = RequestBody.create(MediaType.parse("application/json"), compact(render(json))) + logInfo(s"POSTing resource ${name}:" + pretty(render(json))) + val request = new Request.Builder() + .post(body) + .url(s"${client.getMasterUrl()}" + + s"apis/apache.io/v1/namespaces/default/sparkjobs/") + .build() + val response = httpClient.newCall(request).execute() + if (response.code() > 300) { + throw new SparkException("Unable to create SparkJob: " + response.body().string()) + } + } + + def deleteJobObject(name: String): Unit = { + logInfo(s"DELETEing resource ${name}") + val request = new Request.Builder() + .delete() + .url(s"${client.getMasterUrl()}" + + s"apis/apache.io/v1/namespaces/default/sparkjobs/${name}") + .build() + val response = httpClient.newCall(request).execute() + if (response.code() > 300) { + throw new SparkException("Unable to delete SparkJob: " + response.body().string()) + } + } + + def updateJobObject(name: String, field: String, value: String): Unit = { + val json = List(("op" -> "replace") ~ ("path" -> field) ~ ("value" -> value)) + val body = RequestBody.create(MediaType.parse("application/json-patch+json"), + compact(render(json))) + + logInfo(s"PATCHing resource ${name}:" + pretty(render(json))) + val request = new Request.Builder() + .patch(body) + .url(s"${client.getMasterUrl()}" + + s"apis/apache.io/v1/namespaces/default/sparkjobs/${name}") + .build() + val response = httpClient.newCall(request).execute() + if (response.code() > 300) { + throw new SparkException("Unable to patch SparkJob: " + response.body().string()) + } + } + + def findJobObject(): Unit = { + // TODO: implement logic to find things here. 
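+    // Sketch only (assumption, not implemented by this patch): a find could GET
+    //   s"${client.getMasterUrl()}apis/apache.io/v1/namespaces/default/sparkjobs/"
+    // and deserialize the returned list into SparkJob objects.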
+ } + + def getHTTPClient(client: BaseClient): OkHttpClient = { + val field = classOf[BaseClient].getDeclaredField("httpClient") + try { + field.setAccessible(true) + return field.get(client).asInstanceOf[OkHttpClient] + } finally { + field.setAccessible(false) + } + } +} diff --git a/sbin/driver.yaml b/sbin/driver.yaml index de806599c2c5c..e322a724ec01d 100644 --- a/sbin/driver.yaml +++ b/sbin/driver.yaml @@ -1,7 +1,8 @@ -apiVersion: "kubernetes.io/v1" -kind: SparkJob +apiVersion: "apache.io/v1" +kind: "SparkJob" metadata: - name: spark-driver-1924 -image: "driver-image" -state: "completed" -num-executors: 10 \ No newline at end of file + name: "spark-job-1" +spec: + image: "driver-image" + state: "completed" + num-executors: 10 \ No newline at end of file diff --git a/sbin/kubernetes-resource.yaml b/sbin/kubernetes-resource.yaml index 58d2072c0622b..0ff3d39b45182 100644 --- a/sbin/kubernetes-resource.yaml +++ b/sbin/kubernetes-resource.yaml @@ -1,5 +1,5 @@ metadata: - name: spark-job.kubernetes.io + name: spark-job.apache.io labels: resource: spark-job object: spark From 1fd1633cef0d6263009fd4d2c0aa2c3cc3bac857 Mon Sep 17 00:00:00 2001 From: Iyanu Obidele Date: Thu, 16 Feb 2017 17:08:38 -0800 Subject: [PATCH 3/4] second iteration from foxish-fork --- .../kubernetes/JobResourceCreateCall.scala | 41 +++ .../spark/deploy/kubernetes/JobState.scala | 33 ++ .../deploy/kubernetes/JobStateSerDe.scala | 46 +++ .../deploy/kubernetes/SparkJobResource.scala | 318 ++++++++++++++---- 4 files changed, 379 insertions(+), 59 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobResourceCreateCall.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobState.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobStateSerDe.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobResourceCreateCall.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobResourceCreateCall.scala new file mode 100644 index 0000000000000..bd6b06fda4400 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobResourceCreateCall.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.spark.deploy.kubernetes

+import org.apache.spark.deploy.kubernetes.SparkJobResource.{SparkJobState, WatchObject}

+import scala.concurrent.Future

+/**
+ * Isolated this since the method is called on the client machine
+ */
+trait JobResourceCreateCall {
+  def createJobObject(name: String, keyValuePairs: Map[String, Any]): Unit
+}

+/**
+ * RUD and W - Read, Update, Delete & Watch
+ */
+trait JobResourceRUDCalls {
+  def deleteJobObject(name: String): Unit

+  def getJobObject(name: String): SparkJobState

+  def updateJobObject(name: String, value: String, fieldPath: String): Unit

+  def watchJobObject(): Future[WatchObject]
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobState.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobState.scala
new file mode 100644
index 0000000000000..5dacf39a6357c
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobState.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

+package org.apache.spark.deploy.kubernetes

+private[spark] object JobState extends Enumeration {
+  type JobState = Value

+  /*
+   * QUEUED - Spark Job has been queued to run
+   * SUBMITTED - Driver Pod deployed but tasks are not yet scheduled on worker pod(s)
+   * RUNNING - Task(s) have been allocated to worker pod(s) to run and Spark Job is now running
+   * FINISHED - Spark Job ran and exited cleanly, i.e., worker pod(s) and driver pod were
+   *            gracefully deleted
+   * FAILED - Spark Job failed due to an error
+   * KILLED - A user manually killed this Spark Job
+   */
+  val QUEUED, SUBMITTED, RUNNING, FINISHED, FAILED, KILLED = Value
+}
\ No newline at end of file
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobStateSerDe.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobStateSerDe.scala
new file mode 100644
index 0000000000000..40201b8bd49de
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobStateSerDe.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.kubernetes + +import org.json4s.{CustomSerializer, JString} +import org.json4s.JsonAST.JNull + +import org.apache.spark.deploy.kubernetes.JobState.JobState + + /** + * JobState Serializer and Deserializer + * */ +object JobStateSerDe extends CustomSerializer[JobState](_ => + ({ + case JString("SUBMITTED") => JobState.SUBMITTED + case JString("QUEUED") => JobState.QUEUED + case JString("RUNNING") => JobState.RUNNING + case JString("FINISHED") => JobState.FINISHED + case JString("KILLED") => JobState.KILLED + case JString("FAILED") => JobState.FAILED + case JNull => + throw new UnsupportedOperationException("No JobState Specified") + }, { + case JobState.FAILED => JString("FAILED") + case JobState.SUBMITTED => JString("SUBMITTED") + case JobState.KILLED => JString("KILLED") + case JobState.FINISHED => JString("FINISHED") + case JobState.QUEUED => JString("QUEUED") + case JobState.RUNNING => JString("RUNNING") + }) +) \ No newline at end of file diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala index 72435ee7de1f1..b2c9e17486917 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala @@ -15,92 +15,292 @@ * limitations under the License. 
*/ - package org.apache.spark.deploy.kubernetes +import java.nio.file.{Files, Paths} +import java.util.concurrent.TimeUnit + import io.fabric8.kubernetes.client.{BaseClient, KubernetesClient} -import okhttp3.{MediaType, OkHttpClient, Request, RequestBody} -import org.apache.spark.SparkException -import org.apache.spark.internal.Logging +import okhttp3._ +import okio.{Buffer, BufferedSource} +import org.json4s.{DefaultFormats, Formats} import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ +import org.json4s.jackson.Serialization.{read, write} +import scala.concurrent.{blocking, ExecutionContext, Future, Promise} +import scala.util.{Failure, Success, Try} +import scala.util.control.Breaks.{break, breakable} + +import org.apache.spark.SparkException +import org.apache.spark.deploy.kubernetes.SparkJobResource._ +import org.apache.spark.internal.Logging + +/* + * CRUD + Watch operations on a Spark Job State in Kubernetes + * */ +private[spark] object SparkJobResource { + + implicit val formats: Formats = DefaultFormats + JobStateSerDe + + val kind = "SparkJob" + val apiVersion = "apache.io/v1" + val apiEndpoint = s"apis/$apiVersion/namespaces/%s/sparkjobs" + + def getHttpClient(client: BaseClient): OkHttpClient = { + val field = classOf[BaseClient].getDeclaredField("httpClient") + try { + field.setAccessible(true) + field.get(client).asInstanceOf[OkHttpClient] + } finally { + field.setAccessible(false) + } + } + + case class Metadata(name: String, + uid: Option[String] = None, + labels: Option[Map[String, String]] = None, + annotations: Option[Map[String, String]] = None) + + case class SparkJobState(apiVersion: String, + kind: String, + metadata: Metadata, + spec: Map[String, Any]) + + case class WatchObject(`type`: String, `object`: SparkJobState) +} + + +private[spark] class SparkJobCreateResource(client: KubernetesClient, namespace: String) + extends JobResourceCreateCall with Logging { + + private val httpClient = getHttpClient(client.asInstanceOf[BaseClient]) + + /** + * Using a Map as an argument here allows adding more info into the Object if needed + * This is currently called on the client machine. We can avoid the token use. + * */ + override def createJobObject(name: String, keyValuePairs: Map[String, Any]): Unit = { + val resourceObject = + SparkJobState(apiVersion, kind, Metadata(name), keyValuePairs) + val payload = parse(write(resourceObject)) + val requestBody = RequestBody + .create(MediaType.parse("application/json"), compact(render(payload))) + val request = new Request.Builder() + .post(requestBody) + .url(s"${client.getMasterUrl}${apiEndpoint.format(namespace)}") + .build() + logDebug(s"Create Request: $request") + val response = httpClient.newCall(request).execute() + if (!response.isSuccessful) { + response.body().close() + val msg = + s"Failed to post resource $name. ${response.toString}. 
${compact(render(payload))}" + logError(msg) + throw new SparkException(msg) + } + response.body().close() + logDebug(s"Successfully posted resource $name: " + + s"${pretty(render(parse(write(resourceObject))))}") + } +} -class SparkJobResource(client: KubernetesClient) extends Logging { - val ApiVersion = "apache.io/v1" - val Kind = "SparkJob" - -// apiVersion: "apache.io/v1" -// kind: "SparkJob" -// metadata: -// name: "spark-job-1" -// spec: -// image: "driver-image" -// state: "completed" -// num-executors: 10 - - val httpClient = this.getHTTPClient(client.asInstanceOf[BaseClient]) - def createJobObject(name: String, executors: Int, image: String): Unit = { - val json = ("apiVersion" -> ApiVersion) ~ - ("kind" -> Kind) ~ - ("metadata" -> ("name" -> name)) ~ - ("spec" -> (("state" -> "started") ~ - ("image" -> image) ~ - ("num-executors" -> executors.toString))) - - val body = RequestBody.create(MediaType.parse("application/json"), compact(render(json))) - logInfo(s"POSTing resource ${name}:" + pretty(render(json))) +private[spark] class SparkJobRUDResource( + client: KubernetesClient, + namespace: String, + ec: ExecutionContext) extends JobResourceRUDCalls with Logging { + + private val protocol = "https://" + + private val httpClient = getHttpClient(client.asInstanceOf[BaseClient]) + + private var watchSource: BufferedSource = _ + + private lazy val buffer = new Buffer() + + // Since this will be running inside a pod + // we can access the pods token and use it with the Authorization header when + // making rest calls to the k8s Api + private val kubeToken = { + val path = Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token") + val tok = Try(new String(Files.readAllBytes(path))) match { + case Success(some) => Option(some) + case Failure(e: Throwable) => logError(s"${e.getMessage}") + None + } + tok.map(t => t).getOrElse{ + // Log a warning just in case, but this should almost certainly never happen + logWarning("Error while retrieving pod token") + "" + } + } + + // we can also get the host from the environment variable + private val k8sServiceHost = { + val host = Try(sys.env("KUBERNETES_SERVICE_HOST")) match { + case Success(h) => Option(h) + case Failure(_) => None + } + host.map(h => h).getOrElse{ + // Log a warning just in case, but this should almost certainly never happen + logWarning("Error while retrieving k8s host address") + "127.0.0.1" + } + } + + // the port from the environment variable + private val k8sPort = { + val port = Try(sys.env("KUBERNETES_PORT_443_TCP_PORT")) match { + case Success(p) => Option(p) + case Failure(_) => None + } + port.map(p => p).getOrElse{ + // Log a warning just in case, but this should almost certainly never happen + logWarning("Error while retrieving k8s host port") + "8001" + } + } + + private def executeBlocking(cb: => WatchObject): Future[WatchObject] = { + val p = Promise[WatchObject]() + ec.execute(new Runnable { + override def run(): Unit = { + try { + p.trySuccess(blocking(cb)) + } catch { + case e: Throwable => p.tryFailure(e) + } + } + }) + p.future + } + + // Serves as a way to interrupt to the watcher thread. 
+ // This closes the source the watcher is reading from and as a result triggers promise completion + def stopWatcher(): Unit = { + if (watchSource != null) { + buffer.close() + watchSource.close() + } + } + + override def updateJobObject(name: String, value: String, fieldPath: String): Unit = { + val payload = List( + ("op" -> "replace") ~ ("path" -> fieldPath) ~ ("value" -> value)) + val requestBody = RequestBody.create( + MediaType.parse("application/json-patch+json"), + compact(render(payload))) val request = new Request.Builder() - .post(body) - .url(s"${client.getMasterUrl()}" + - s"apis/apache.io/v1/namespaces/default/sparkjobs/") + .addHeader("Authorization", s"Bearer $kubeToken") + .patch(requestBody) + .url(s"$protocol$k8sServiceHost:$k8sPort/${apiEndpoint.format(namespace)}/$name") .build() + logDebug(s"Update Request: $request") val response = httpClient.newCall(request).execute() - if (response.code() > 300) { - throw new SparkException("Unable to create SparkJob: " + response.body().string()) + if (!response.isSuccessful) { + response.body().close() + val msg = + s"Failed to patch resource $name. ${response.message()}. ${compact(render(payload))}" + logError(msg) + throw new SparkException(s"${response.code()} ${response.message()}") } + response.body().close() + logDebug(s"Successfully patched resource $name.") } - def deleteJobObject(name: String): Unit = { - logInfo(s"DELETEing resource ${name}") + override def deleteJobObject(name: String): Unit = { val request = new Request.Builder() + .addHeader("Authorization", s"Bearer $kubeToken") .delete() - .url(s"${client.getMasterUrl()}" + - s"apis/apache.io/v1/namespaces/default/sparkjobs/${name}") + .url(s"$protocol$k8sServiceHost:$k8sPort/${apiEndpoint.format(namespace)}/$name") .build() + logDebug(s"Delete Request: $request") val response = httpClient.newCall(request).execute() - if (response.code() > 300) { - throw new SparkException("Unable to delete SparkJob: " + response.body().string()) + if (!response.isSuccessful) { + response.body().close() + val msg = + s"Failed to delete resource $name. ${response.message()}. $request" + logError(msg) + throw new SparkException(msg) } + response.body().close() + logInfo(s"Successfully deleted resource $name") } - def updateJobObject(name: String, field: String, value: String): Unit = { - val json = List(("op" -> "replace") ~ ("path" -> field) ~ ("value" -> value)) - val body = RequestBody.create(MediaType.parse("application/json-patch+json"), - compact(render(json))) - - logInfo(s"PATCHing resource ${name}:" + pretty(render(json))) + def getJobObject(name: String): SparkJobState = { val request = new Request.Builder() - .patch(body) - .url(s"${client.getMasterUrl()}" + - s"apis/apache.io/v1/namespaces/default/sparkjobs/${name}") + .addHeader("Authorization", s"Bearer $kubeToken") + .get() + .url(s"$protocol$k8sServiceHost:$k8sPort/${apiEndpoint.format(namespace)}/$name") .build() + logDebug(s"Get Request: $request") val response = httpClient.newCall(request).execute() - if (response.code() > 300) { - throw new SparkException("Unable to patch SparkJob: " + response.body().string()) + if (!response.isSuccessful) { + response.body().close() + val msg = s"Failed to retrieve resource $name. ${response.message()}" + logError(msg) + throw new SparkException(msg) } + logInfo(s"Successfully retrieved resource $name") + read[SparkJobState](response.body().string()) } - def findJobObject(): Unit = { - // TODO: implement logic to find things here. 
+ /** + * This method has an helper method that blocks to watch the object. + * The future is completed on a Delete event or source exhaustion. + * This method relies on the assumption of one sparkjob per namespace + */ + override def watchJobObject(): Future[WatchObject] = { + val watchClient = httpClient.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build() + val request = new Request.Builder() + .addHeader("Authorization", s"Bearer $kubeToken") + .get() + .url(s"$protocol$k8sServiceHost:$k8sPort/${apiEndpoint.format(namespace)}?watch=true") + .build() + logDebug(s"Watch Request: $request") + val resp = watchClient.newCall(request).execute() + if (!resp.isSuccessful) { + resp.body().close() + val msg = s"Failed to start watch on resource ${resp.code()} ${resp.message()}" + logWarning(msg) + throw new SparkException(msg) + } + logInfo(s"Starting watch on jobResource") + watchJobObjectUtil(resp) } - def getHTTPClient(client: BaseClient): OkHttpClient = { - val field = classOf[BaseClient].getDeclaredField("httpClient") - try { - field.setAccessible(true) - return field.get(client).asInstanceOf[OkHttpClient] - } finally { - field.setAccessible(false) + /** + * This method has a blocking call - wait on SSE - inside it. + * However it is sent off in a new thread + */ + private def watchJobObjectUtil(response: Response): Future[WatchObject] = { + @volatile var wo: WatchObject = null + watchSource = response.body().source() + executeBlocking { + breakable { + // This will block until there are bytes to read or the source is exhausted. + while (!watchSource.exhausted()) { + watchSource.read(buffer, 8192) match { + case -1 => + cleanUpListener(watchSource, buffer) + throw new SparkException("Source is exhausted and object state is unknown") + case _ => + wo = read[WatchObject](buffer.readUtf8()) + wo match { + case WatchObject("DELETED", w) => + logInfo(s"${w.metadata.name} has been deleted") + cleanUpListener(watchSource, buffer) + case WatchObject(e, _) => logInfo(s"$e event. 
Still watching") + } + } + } + } + wo } } + + private def cleanUpListener(source: BufferedSource, buffer: Buffer): Unit = { + buffer.close() + source.close() + break() + } } From c198d9f60b9d48c8f42eea8ee0ee2ea20b8d696c Mon Sep 17 00:00:00 2001 From: Iyanu Obidele Date: Thu, 16 Feb 2017 20:37:31 -0800 Subject: [PATCH 4/4] initial sparkResource implementation --- pom.xml | 2 +- ...lientBuilder.scala => ClientBuilder.scala} | 17 +- .../kubernetes/JobResourceCreateCall.scala | 41 --- .../deploy/kubernetes/SparkJobResource.scala | 306 ------------------ .../SparkJobResourceFromWithinK8s.scala | 78 +++++ .../spark/deploy/kubernetes/constants.scala | 5 + .../kubernetes/{ => tpr}/JobState.scala | 2 +- .../kubernetes/{ => tpr}/JobStateSerDe.scala | 4 +- .../deploy/kubernetes/tpr/TPRCrudCalls.scala | 262 +++++++++++++++ .../KubernetesClusterSchedulerBackend.scala | 15 +- 10 files changed, 369 insertions(+), 363 deletions(-) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/{KubernetesClientBuilder.scala => ClientBuilder.scala} (80%) delete mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobResourceCreateCall.scala delete mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/{ => tpr}/JobState.scala (96%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/{ => tpr}/JobStateSerDe.scala (93%) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala diff --git a/pom.xml b/pom.xml index a27daf08a90bb..41c1f076a8621 100644 --- a/pom.xml +++ b/pom.xml @@ -336,7 +336,7 @@ com.squareup.okhttp3 okhttp - 3.4.1 + 3.6.0 org.bouncycastle diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ClientBuilder.scala similarity index 80% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ClientBuilder.scala index 89369b30694ee..9002ed4c0788c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/ClientBuilder.scala @@ -20,11 +20,12 @@ import java.io.File import com.google.common.base.Charsets import com.google.common.io.Files -import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient} +import io.fabric8.kubernetes.client.{BaseClient, Config, ConfigBuilder, DefaultKubernetesClient} +import okhttp3._ import org.apache.spark.deploy.kubernetes.constants._ -private[spark] object KubernetesClientBuilder { +private[spark] object ClientBuilder { private val API_SERVER_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) private val CA_CERT_FILE = new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH) @@ -34,7 +35,7 @@ private[spark] object KubernetesClientBuilder { * are picked up from canonical locations, as they are injected 
* into the pod's disk space. */ - def buildFromWithinPod( + def buildK8sClientFromWithinPod( kubernetesNamespace: String): DefaultKubernetesClient = { var clientConfigBuilder = new ConfigBuilder() .withApiVersion("v1") @@ -51,4 +52,14 @@ private[spark] object KubernetesClientBuilder { } new DefaultKubernetesClient(clientConfigBuilder.build) } + + def buildOkhttpClientFromWithinPod(client: BaseClient): OkHttpClient = { + val field = classOf[BaseClient].getDeclaredField("httpClient") + try { + field.setAccessible(true) + field.get(client).asInstanceOf[OkHttpClient] + } finally { + field.setAccessible(false) + } + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobResourceCreateCall.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobResourceCreateCall.scala deleted file mode 100644 index bd6b06fda4400..0000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobResourceCreateCall.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.kubernetes - -import org.apache.spark.deploy.kubernetes.SparkJobResource.{SparkJobState, WatchObject} - -import scala.concurrent.Future - -/** - * Isolated this since the method is called on the client machine - */ -trait JobResourceCreateCall { - def createJobObject(name: String, keyValuePairs: Map[String, Any]): Unit -} - -/** - * RUD and W - Read, Update, Delete & Watch - */ -trait JobResourceRUDCalls { - def deleteJobObject(name: String): Unit - - def getJobObject(name: String): SparkJobState - - def updateJobObject(name: String, value: String, fieldPath: String): Unit - - def watchJobObject(): Future[WatchObject] -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala deleted file mode 100644 index b2c9e17486917..0000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResource.scala +++ /dev/null @@ -1,306 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.kubernetes - -import java.nio.file.{Files, Paths} -import java.util.concurrent.TimeUnit - -import io.fabric8.kubernetes.client.{BaseClient, KubernetesClient} -import okhttp3._ -import okio.{Buffer, BufferedSource} -import org.json4s.{DefaultFormats, Formats} -import org.json4s.JsonDSL._ -import org.json4s.jackson.JsonMethods._ -import org.json4s.jackson.Serialization.{read, write} -import scala.concurrent.{blocking, ExecutionContext, Future, Promise} -import scala.util.{Failure, Success, Try} -import scala.util.control.Breaks.{break, breakable} - -import org.apache.spark.SparkException -import org.apache.spark.deploy.kubernetes.SparkJobResource._ -import org.apache.spark.internal.Logging - -/* - * CRUD + Watch operations on a Spark Job State in Kubernetes - * */ -private[spark] object SparkJobResource { - - implicit val formats: Formats = DefaultFormats + JobStateSerDe - - val kind = "SparkJob" - val apiVersion = "apache.io/v1" - val apiEndpoint = s"apis/$apiVersion/namespaces/%s/sparkjobs" - - def getHttpClient(client: BaseClient): OkHttpClient = { - val field = classOf[BaseClient].getDeclaredField("httpClient") - try { - field.setAccessible(true) - field.get(client).asInstanceOf[OkHttpClient] - } finally { - field.setAccessible(false) - } - } - - case class Metadata(name: String, - uid: Option[String] = None, - labels: Option[Map[String, String]] = None, - annotations: Option[Map[String, String]] = None) - - case class SparkJobState(apiVersion: String, - kind: String, - metadata: Metadata, - spec: Map[String, Any]) - - case class WatchObject(`type`: String, `object`: SparkJobState) -} - - -private[spark] class SparkJobCreateResource(client: KubernetesClient, namespace: String) - extends JobResourceCreateCall with Logging { - - private val httpClient = getHttpClient(client.asInstanceOf[BaseClient]) - - /** - * Using a Map as an argument here allows adding more info into the Object if needed - * This is currently called on the client machine. We can avoid the token use. - * */ - override def createJobObject(name: String, keyValuePairs: Map[String, Any]): Unit = { - val resourceObject = - SparkJobState(apiVersion, kind, Metadata(name), keyValuePairs) - val payload = parse(write(resourceObject)) - val requestBody = RequestBody - .create(MediaType.parse("application/json"), compact(render(payload))) - val request = new Request.Builder() - .post(requestBody) - .url(s"${client.getMasterUrl}${apiEndpoint.format(namespace)}") - .build() - logDebug(s"Create Request: $request") - val response = httpClient.newCall(request).execute() - if (!response.isSuccessful) { - response.body().close() - val msg = - s"Failed to post resource $name. ${response.toString}. 
${compact(render(payload))}" - logError(msg) - throw new SparkException(msg) - } - response.body().close() - logDebug(s"Successfully posted resource $name: " + - s"${pretty(render(parse(write(resourceObject))))}") - } -} - -private[spark] class SparkJobRUDResource( - client: KubernetesClient, - namespace: String, - ec: ExecutionContext) extends JobResourceRUDCalls with Logging { - - private val protocol = "https://" - - private val httpClient = getHttpClient(client.asInstanceOf[BaseClient]) - - private var watchSource: BufferedSource = _ - - private lazy val buffer = new Buffer() - - // Since this will be running inside a pod - // we can access the pods token and use it with the Authorization header when - // making rest calls to the k8s Api - private val kubeToken = { - val path = Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token") - val tok = Try(new String(Files.readAllBytes(path))) match { - case Success(some) => Option(some) - case Failure(e: Throwable) => logError(s"${e.getMessage}") - None - } - tok.map(t => t).getOrElse{ - // Log a warning just in case, but this should almost certainly never happen - logWarning("Error while retrieving pod token") - "" - } - } - - // we can also get the host from the environment variable - private val k8sServiceHost = { - val host = Try(sys.env("KUBERNETES_SERVICE_HOST")) match { - case Success(h) => Option(h) - case Failure(_) => None - } - host.map(h => h).getOrElse{ - // Log a warning just in case, but this should almost certainly never happen - logWarning("Error while retrieving k8s host address") - "127.0.0.1" - } - } - - // the port from the environment variable - private val k8sPort = { - val port = Try(sys.env("KUBERNETES_PORT_443_TCP_PORT")) match { - case Success(p) => Option(p) - case Failure(_) => None - } - port.map(p => p).getOrElse{ - // Log a warning just in case, but this should almost certainly never happen - logWarning("Error while retrieving k8s host port") - "8001" - } - } - - private def executeBlocking(cb: => WatchObject): Future[WatchObject] = { - val p = Promise[WatchObject]() - ec.execute(new Runnable { - override def run(): Unit = { - try { - p.trySuccess(blocking(cb)) - } catch { - case e: Throwable => p.tryFailure(e) - } - } - }) - p.future - } - - // Serves as a way to interrupt to the watcher thread. - // This closes the source the watcher is reading from and as a result triggers promise completion - def stopWatcher(): Unit = { - if (watchSource != null) { - buffer.close() - watchSource.close() - } - } - - override def updateJobObject(name: String, value: String, fieldPath: String): Unit = { - val payload = List( - ("op" -> "replace") ~ ("path" -> fieldPath) ~ ("value" -> value)) - val requestBody = RequestBody.create( - MediaType.parse("application/json-patch+json"), - compact(render(payload))) - val request = new Request.Builder() - .addHeader("Authorization", s"Bearer $kubeToken") - .patch(requestBody) - .url(s"$protocol$k8sServiceHost:$k8sPort/${apiEndpoint.format(namespace)}/$name") - .build() - logDebug(s"Update Request: $request") - val response = httpClient.newCall(request).execute() - if (!response.isSuccessful) { - response.body().close() - val msg = - s"Failed to patch resource $name. ${response.message()}. 
${compact(render(payload))}" - logError(msg) - throw new SparkException(s"${response.code()} ${response.message()}") - } - response.body().close() - logDebug(s"Successfully patched resource $name.") - } - - override def deleteJobObject(name: String): Unit = { - val request = new Request.Builder() - .addHeader("Authorization", s"Bearer $kubeToken") - .delete() - .url(s"$protocol$k8sServiceHost:$k8sPort/${apiEndpoint.format(namespace)}/$name") - .build() - logDebug(s"Delete Request: $request") - val response = httpClient.newCall(request).execute() - if (!response.isSuccessful) { - response.body().close() - val msg = - s"Failed to delete resource $name. ${response.message()}. $request" - logError(msg) - throw new SparkException(msg) - } - response.body().close() - logInfo(s"Successfully deleted resource $name") - } - - def getJobObject(name: String): SparkJobState = { - val request = new Request.Builder() - .addHeader("Authorization", s"Bearer $kubeToken") - .get() - .url(s"$protocol$k8sServiceHost:$k8sPort/${apiEndpoint.format(namespace)}/$name") - .build() - logDebug(s"Get Request: $request") - val response = httpClient.newCall(request).execute() - if (!response.isSuccessful) { - response.body().close() - val msg = s"Failed to retrieve resource $name. ${response.message()}" - logError(msg) - throw new SparkException(msg) - } - logInfo(s"Successfully retrieved resource $name") - read[SparkJobState](response.body().string()) - } - - /** - * This method has an helper method that blocks to watch the object. - * The future is completed on a Delete event or source exhaustion. - * This method relies on the assumption of one sparkjob per namespace - */ - override def watchJobObject(): Future[WatchObject] = { - val watchClient = httpClient.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build() - val request = new Request.Builder() - .addHeader("Authorization", s"Bearer $kubeToken") - .get() - .url(s"$protocol$k8sServiceHost:$k8sPort/${apiEndpoint.format(namespace)}?watch=true") - .build() - logDebug(s"Watch Request: $request") - val resp = watchClient.newCall(request).execute() - if (!resp.isSuccessful) { - resp.body().close() - val msg = s"Failed to start watch on resource ${resp.code()} ${resp.message()}" - logWarning(msg) - throw new SparkException(msg) - } - logInfo(s"Starting watch on jobResource") - watchJobObjectUtil(resp) - } - - /** - * This method has a blocking call - wait on SSE - inside it. - * However it is sent off in a new thread - */ - private def watchJobObjectUtil(response: Response): Future[WatchObject] = { - @volatile var wo: WatchObject = null - watchSource = response.body().source() - executeBlocking { - breakable { - // This will block until there are bytes to read or the source is exhausted. - while (!watchSource.exhausted()) { - watchSource.read(buffer, 8192) match { - case -1 => - cleanUpListener(watchSource, buffer) - throw new SparkException("Source is exhausted and object state is unknown") - case _ => - wo = read[WatchObject](buffer.readUtf8()) - wo match { - case WatchObject("DELETED", w) => - logInfo(s"${w.metadata.name} has been deleted") - cleanUpListener(watchSource, buffer) - case WatchObject(e, _) => logInfo(s"$e event. 
Still watching") - } - } - } - } - wo - } - } - - private def cleanUpListener(source: BufferedSource, buffer: Buffer): Unit = { - buffer.close() - source.close() - break() - } -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala new file mode 100644 index 0000000000000..44550cc5fcc48 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkJobResourceFromWithinK8s.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.kubernetes + +import java.nio.file.{Files, Paths} + +import io.fabric8.kubernetes.client.{Config, KubernetesClient} +import scala.util.{Failure, Success, Try} + +import org.apache.spark.deploy.kubernetes.tpr.TPRCrudCalls + +private[spark] class SparkJobResourceClientFromWithinK8s( + client: KubernetesClient) extends TPRCrudCalls { + + private val protocol: String = "https://" + + // we can also get the host from the environment variable + private val kubeHost: String = { + val host = Try(sys.env("KUBERNETES_SERVICE_HOST")) match { + case Success(h) => Option(h) + case Failure(_) => None + } + host.map(h => h).getOrElse { + // Log a warning just in case, but this should almost certainly never happen + logWarning("Error while retrieving k8s host address") + "127.0.0.1" + } + } + + // the port from the environment variable + private val kubeHostPort: String = { + val port = Try(sys.env("KUBERNETES_PORT_443_TCP_PORT")) match { + case Success(p) => Option(p) + case Failure(_) => None + } + port.map(p => p).getOrElse { + // Log a warning just in case, but this should almost certainly never happen + logWarning("Error while retrieving k8s host port") + "8001" + } + } + + // Since this will be running inside a pod + // we can access the pods token and use it with the Authorization header when + // making rest calls to the k8s Api + override protected val kubeToken: Option[String] = { + val path = Paths.get(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) + Try(new String(Files.readAllBytes(path))) match { + case Success(some) => Option(some) + case Failure(e: Throwable) => logError(s"${e.getMessage}") + None + } + } + + override protected val k8sClient: KubernetesClient = client + override protected val kubeMaster: String = s"$protocol$kubeHost:$kubeHostPort" +} + +private[spark] class SparkJobResourceClientFromOutsideK8s( + client: KubernetesClient) extends TPRCrudCalls { + + override protected val k8sClient: KubernetesClient = client +} diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 688cd858e79ff..ffdccc14eb87b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -68,4 +68,9 @@ package object constants { private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit" private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc" + + // TPR + private[spark] val TPR_API_VERSION = "apache.io/v1" + private[spark] val TPR_API_ENDPOINT = s"apis/%s/namespaces/%s/sparkjobs" + private[spark] val TPR_KIND = "SparkJob" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobState.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobState.scala similarity index 96% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobState.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobState.scala index 5dacf39a6357c..6f90194f673ab 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobState.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobState.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.deploy.kubernetes +package org.apache.spark.deploy.kubernetes.tpr private[spark] object JobState extends Enumeration { type JobState = Value diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobStateSerDe.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala similarity index 93% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobStateSerDe.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala index 40201b8bd49de..16edd19de6e0c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/JobStateSerDe.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/JobStateSerDe.scala @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.spark.deploy.kubernetes +package org.apache.spark.deploy.kubernetes.tpr import org.json4s.{CustomSerializer, JString} import org.json4s.JsonAST.JNull -import org.apache.spark.deploy.kubernetes.JobState.JobState +import org.apache.spark.deploy.kubernetes.tpr.JobState.JobState /** * JobState Serializer and Deserializer diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala new file mode 100644 index 0000000000000..af88d06e51cc7 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/tpr/TPRCrudCalls.scala @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.tpr

+import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}

+import io.fabric8.kubernetes.client.{BaseClient, KubernetesClient}
+import okhttp3.{MediaType, OkHttpClient, Request, RequestBody, Response}
+import okio.{Buffer, BufferedSource}
+import org.json4s.{DefaultFormats, Formats}
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods.{compact, parse, pretty, render}
+import org.json4s.jackson.Serialization.{read, write}
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{blocking, Future, Promise}
+import scala.util.control.Breaks.{break, breakable}

+import org.apache.spark.deploy.kubernetes.ClientBuilder
+import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.SparkException
+import org.apache.spark.util.ThreadUtils

+/**
+ * Case classes that model the SparkJob TPR object and the events a watch on it returns
+ */

+private[spark] case class Metadata(name: String,
+    uid: Option[String] = None,
+    labels: Option[Map[String, String]] = None,
+    annotations: Option[Map[String, String]] = None)

+private[spark] case class SparkJobState(apiVersion: String,
+    kind: String,
+    metadata: Metadata,
+    spec: Map[String, Any])

+private[spark] case class WatchObject(`type`: String, `object`: SparkJobState)

+private[spark] abstract class TPRCrudCalls extends Logging {
+  protected val k8sClient: KubernetesClient
+  protected val kubeMaster: String = KUBERNETES_MASTER_INTERNAL_URL
+  protected val kubeToken: Option[String] = None

+  implicit val formats: Formats = DefaultFormats + JobStateSerDe

+  protected val httpClient: OkHttpClient = ClientBuilder
+    .buildOkhttpClientFromWithinPod(k8sClient.asInstanceOf[BaseClient])

+  protected val namespace: String = k8sClient.getNamespace
+  private var watchSource: BufferedSource = _
+  private lazy val buffer = new Buffer()
+  protected implicit val ec: ThreadPoolExecutor = ThreadUtils
+    .newDaemonCachedThreadPool("tpr-watcher-pool")

+  private def executeBlocking(cb: => WatchObject): Future[WatchObject] = {
+    val p = Promise[WatchObject]()
+    ec.execute(new Runnable {
+      override def run(): Unit = {
+        try {
+          p.trySuccess(blocking(cb))
+        } catch {
+          case e: Throwable => p.tryFailure(e)
+        }
+      }
+    })
+    p.future
+  }

+  private def completeRequest(partialReq: Request.Builder): Request = {
+    kubeToken match {
+      case Some(tok) => partialReq.addHeader("Authorization", s"Bearer $tok").build()
+      case None => partialReq.build()
+    }
+  }

+  private def completeRequestWithExceptionIfNotSuccessful(
+      requestType: String,
+      response: Response,
+      additionalInfo: Option[Seq[String]] = None): Unit = {

+    if (!response.isSuccessful) {
+      response.body().close()
+      val msg = new ArrayBuffer[String]
+      msg += s"Failed to $requestType resource."
+
+      additionalInfo match {
+        case Some(info) =>
+          for (extraMsg <- info) {
+            msg += extraMsg
+          }
+        case None =>
+      }

+      val finalMessage = msg.mkString(" ")
+      logError(finalMessage)
+      throw new SparkException(finalMessage)
+    }
+  }

+  def createJobObject(name: String, keyValuePairs: Map[String, Any]): Unit = {
+    val resourceObject =
+      SparkJobState(TPR_API_VERSION, TPR_KIND, Metadata(name), keyValuePairs)
+    val payload = parse(write(resourceObject))

+    val requestBody = RequestBody
+      .create(MediaType.parse("application/json"), compact(render(payload)))
+    val request = completeRequest(new Request.Builder()
+      .post(requestBody)
+      .url(s"$kubeMaster/${TPR_API_ENDPOINT.format(TPR_API_VERSION, namespace)}"))

+    logDebug(s"Create Request: $request")
+    val response = httpClient.newCall(request).execute()
+    completeRequestWithExceptionIfNotSuccessful(
+      "post",
+      response,
+      Option(Seq(name, response.toString, compact(render(payload)))))

+    response.body().close()
+    logDebug(s"Successfully posted resource $name: " +
+      s"${pretty(render(parse(write(resourceObject))))}")
+  }

+  def deleteJobObject(tprObjectName: String): Unit = {
+    val request = completeRequest(new Request.Builder()
+      .delete()
+      .url(s"$kubeMaster/${TPR_API_ENDPOINT.format(TPR_API_VERSION, namespace)}/$tprObjectName"))

+    logDebug(s"Delete Request: $request")
+    val response = httpClient.newCall(request).execute()
+    completeRequestWithExceptionIfNotSuccessful(
+      "delete",
+      response,
+      Option(Seq(tprObjectName, response.message(), request.toString)))

+    response.body().close()
+    logInfo(s"Successfully deleted resource $tprObjectName")
+  }

+  def getJobObject(name: String): SparkJobState = {
+    val request = completeRequest(new Request.Builder()
+      .get()
+      .url(s"$kubeMaster/${TPR_API_ENDPOINT.format(TPR_API_VERSION, namespace)}/$name"))

+    logDebug(s"Get Request: $request")
+    val response = httpClient.newCall(request).execute()
+    completeRequestWithExceptionIfNotSuccessful(
+      "get",
+      response,
+      Option(Seq(name, response.message()))
+    )

+    logInfo(s"Successfully retrieved resource $name")
+    read[SparkJobState](response.body().string())
+  }

+  def updateJobObject(name: String, value: String, fieldPath: String): Unit = {
+    val payload = List(
+      ("op" -> "replace") ~ ("path" -> fieldPath) ~ ("value" -> value))
+    val requestBody =
+      RequestBody.create(
+        MediaType.parse("application/json-patch+json"),
+        compact(render(payload)))
+    val request = completeRequest(new Request.Builder()
+      .patch(requestBody)
+      .url(s"$kubeMaster/${TPR_API_ENDPOINT.format(TPR_API_VERSION, namespace)}/$name"))

+    logDebug(s"Update Request: $request")
+    val response = httpClient.newCall(request).execute()
+    completeRequestWithExceptionIfNotSuccessful(
+      "patch",
+      response,
+      Option(Seq(name, response.message(), compact(render(payload))))
+    )

+    response.body().close()
+    logDebug(s"Successfully patched resource $name.")
+  }

+  /**
+   * This method has a helper method that blocks to watch the object.
+   * The future is completed on a Delete event or source exhaustion.
+   * This method relies on the assumption of one SparkJob per namespace
+   */
+  def watchJobObject(): Future[WatchObject] = {
+    val watchClient = httpClient.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build()
+    val request = completeRequest(new Request.Builder()
+      .get()
+      .url(s"$kubeMaster/${TPR_API_ENDPOINT.format(TPR_API_VERSION, namespace)}?watch=true"))

+    logDebug(s"Watch Request: $request")
+    val resp = watchClient.newCall(request).execute()
+    completeRequestWithExceptionIfNotSuccessful(
+      "start watch on",
+      resp,
+      Option(Seq(resp.code().toString, resp.message())))

+    logInfo(s"Starting watch on jobResource")
+    watchJobObjectUtil(resp)
+  }

+  /**
+   * This method has a blocking call - wait on SSE - inside it.
+   * However, it is sent off in a new thread
+   */
+  private def watchJobObjectUtil(response: Response): Future[WatchObject] = {
+    @volatile var wo: WatchObject = null
+    watchSource = response.body().source()
+    executeBlocking {
+      breakable {
+        // This will block until there are bytes to read or the source is exhausted.
+        while (!watchSource.exhausted()) {
+          watchSource.read(buffer, 8192) match {
+            case -1 =>
+              cleanUpListener(watchSource, buffer)
+              throw new SparkException("Source is exhausted and object state is unknown")
+            case _ =>
+              wo = read[WatchObject](buffer.readUtf8())
+              wo match {
+                case WatchObject("DELETED", w) =>
+                  logInfo(s"${w.metadata.name} has been deleted")
+                  cleanUpListener(watchSource, buffer)
+                case WatchObject(e, _) => logInfo(s"$e event. Still watching")
+              }
+          }
+        }
+      }
+      wo
+    }
+  }

+  private def cleanUpListener(source: BufferedSource, buffer: Buffer): Unit = {
+    buffer.close()
+    source.close()
+    break()
+  }

+  // Serves as a way to interrupt the watcher thread.
+  // This closes the source the watcher is reading from and, as a result, triggers promise completion
+  def stopWatcher(): Unit = {
+    if (watchSource != null) {
+      buffer.close()
+      watchSource.close()
+    }
+  }

+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 898b215b92d04..6f77e2c2f5c23 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -16,17 +16,14 @@
  */
 package org.apache.spark.scheduler.cluster.kubernetes
 
-import java.util.UUID
-import java.util.concurrent.Executors
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder
 import io.fabric8.kubernetes.api.model.{ContainerPortBuilder, EnvVarBuilder, Pod, QuantityBuilder}
 import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
 
 import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.deploy.kubernetes.{Client, KubernetesClientBuilder}
+import org.apache.spark.deploy.kubernetes.ClientBuilder
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.rpc.RpcEndpointAddress
@@ -75,8 +72,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) - private val kubernetesClient = KubernetesClientBuilder - .buildFromWithinPod(kubernetesNamespace) + private val kubernetesClient = ClientBuilder + .buildK8sClientFromWithinPod(kubernetesNamespace) private val driverPod = try { kubernetesClient.pods().inNamespace(kubernetesNamespace). @@ -87,7 +84,7 @@ private[spark] class KubernetesClusterSchedulerBackend( throw new SparkException(s"Executor cannot find driver pod", throwable) } - override val minRegisteredRatio = + override val minRegisteredRatio: Double = if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { 0.8 } else { @@ -101,7 +98,7 @@ private[spark] class KubernetesClusterSchedulerBackend( sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString - private val initialExecutors = getInitialTargetExecutorNumber(1) + private val initialExecutors = getInitialTargetExecutorNumber() private def getInitialTargetExecutorNumber(defaultNumExecutors: Int = 1): Int = { if (Utils.isDynamicAllocationEnabled(conf)) { @@ -237,7 +234,7 @@ private[spark] class KubernetesClusterSchedulerBackend( logInfo(s"Requesting ${requestedTotal - totalExpectedExecutors.get}" + s" additional executors, expecting total $requestedTotal and currently" + s" expected ${totalExpectedExecutors.get}") - for (i <- 0 until (requestedTotal - totalExpectedExecutors.get)) { + for (_ <- 0 until (requestedTotal - totalExpectedExecutors.get)) { runningExecutorPods += allocateNewExecutorPod() } }
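
A minimal usage sketch of the TPR client these patches introduce (illustrative only:
the resource name "spark-job-1" and the spec keys mirror sbin/driver.yaml, and the
sketch assumes it runs inside the driver pod with a service account mounted):

    import io.fabric8.kubernetes.client.DefaultKubernetesClient

    import org.apache.spark.deploy.kubernetes.SparkJobResourceClientFromWithinK8s

    object SparkJobResourceSketch {
      def main(args: Array[String]): Unit = {
        // fabric8 client built from the pod's mounted service-account credentials
        val k8sClient = new DefaultKubernetesClient()
        val jobClient = new SparkJobResourceClientFromWithinK8s(k8sClient)

        // POST a new SparkJob object; the Map becomes the object's spec
        jobClient.createJobObject("spark-job-1",
          Map("image" -> "driver-image", "state" -> "QUEUED", "num-executors" -> 10))

        // JSON-Patch a single field as the job progresses
        jobClient.updateJobObject("spark-job-1", "RUNNING", "/spec/state")

        // completes when the object is deleted or the watch stream is exhausted
        val watchDone = jobClient.watchJobObject()

        // DELETE the object once the job is finished
        jobClient.deleteJobObject("spark-job-1")
      }
    }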