From 71b825af3390248d43919fe669f11cabb50f5636 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 3 Apr 2017 13:02:04 -0700 Subject: [PATCH 01/14] Staging server for receiving application dependencies. --- pom.xml | 21 +++ resource-managers/kubernetes/core/pom.xml | 21 +++ .../v2/KubernetesSparkDependencyServer.scala | 61 ++++++++ .../v2/KubernetesSparkDependencyService.scala | 106 ++++++++++++++ ...KubernetesSparkDependencyServiceImpl.scala | 130 ++++++++++++++++++ ...rnetesSparkDependencyServiceRetrofit.scala | 55 ++++++++ .../rest/kubernetes/v2/RetrofitUtils.scala | 38 +++++ ...KubernetesSparkDependencyServerSuite.scala | 101 ++++++++++++++ ...netesSparkDependencyServiceImplSuite.scala | 49 +++++++ 9 files changed, 582 insertions(+) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/RetrofitUtils.scala create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala diff --git a/pom.xml b/pom.xml index 9cfaf6eb65323..0149f802e4614 100644 --- a/pom.xml +++ b/pom.xml @@ -137,6 +137,7 @@ 1.8.1 1.6.0 8.18.0 + 2.2.0 1.52 9.2.16.v20160414 3.1.0 @@ -327,6 +328,21 @@ feign-jaxrs ${feign.version} + + com.squareup.retrofit2 + retrofit + ${retrofit.version} + + + com.squareup.retrofit2 + converter-jackson + ${retrofit.version} + + + com.squareup.retrofit2 + converter-scalars + ${retrofit.version} + org.bouncycastle bcpkix-jdk15on @@ -681,6 +697,11 @@ jersey-client ${jersey.version} + + org.glassfish.jersey.media + jersey-media-multipart + ${jersey.version} + javax.ws.rs javax.ws.rs-api diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index 6d2f1d0fd2769..9e8b3436ab36d 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -60,10 +60,31 @@ com.netflix.feign feign-okhttp + + org.glassfish.jersey.containers + jersey-container-servlet + + + org.glassfish.jersey.media + jersey-media-multipart + com.netflix.feign feign-jackson + + com.squareup.retrofit2 + retrofit + + + com.squareup.retrofit2 + converter-jackson + + + com.squareup.retrofit2 + converter-scalars + + com.netflix.feign feign-jaxrs diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala new file mode 100644 index 0000000000000..ace750f1927ff --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala @@ -0,0 +1,61 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes.v2 + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.jaxrs.json.{JacksonJaxbJsonProvider, JacksonJsonProvider} +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.eclipse.jetty.server.{Server, ServerConnector} +import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} +import org.eclipse.jetty.util.thread.QueuedThreadPool +import org.glassfish.jersey.media.multipart.MultiPartFeature +import org.glassfish.jersey.server.ResourceConfig +import org.glassfish.jersey.servlet.ServletContainer + +private[spark] class KubernetesSparkDependencyServer( + port: Int, + serviceInstance: KubernetesSparkDependencyService) { + + private var jettyServer: Option[Server] = None + + def start(): Unit = synchronized { + val threadPool = new QueuedThreadPool + val contextHandler = new ServletContextHandler() + val jsonProvider = new JacksonJaxbJsonProvider() + jsonProvider.setMapper(new ObjectMapper().registerModule(new DefaultScalaModule)) + val resourceConfig = new ResourceConfig().registerInstances( + serviceInstance, + jsonProvider, + new MultiPartFeature) + val servletHolder = new ServletHolder("main", new ServletContainer(resourceConfig)) + contextHandler.setContextPath("/api/") + contextHandler.addServlet(servletHolder, "/*") + threadPool.setDaemon(true) + val server = new Server(threadPool) + val connector = new ServerConnector(server) + connector.setPort(port) + server.addConnector(connector) + server.setHandler(contextHandler) + server.start() + jettyServer = Some(server) + } + + def stop(): Unit = synchronized { + jettyServer.foreach(_.stop()) + jettyServer = None + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala new file mode 100644 index 0000000000000..a1e879231a7ef --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes.v2 + +import java.io.InputStream +import javax.ws.rs.{Consumes, GET, HeaderParam, Path, Produces, PUT, QueryParam} +import javax.ws.rs.core.{MediaType, StreamingOutput} + +import org.glassfish.jersey.media.multipart.FormDataParam + +import org.apache.spark.deploy.rest.KubernetesCredentials + +/** + * Service that receives application data that can be retrieved later on. This is primarily used + * in the context of Spark, but the concept is generic enough to be used for arbitrary applications. + * The use case is to have a place for Kubernetes application submitters to bootstrap dynamic, + * heavyweight application data for pods. Application submitters may have data stored on their + * local disks that they want to provide to the pods they create through the API server. ConfigMaps + * are one way to provide this data, but the data in ConfigMaps is stored in etcd, which cannot + * maintain data in the hundreds of megabytes in size.
+ *
+ * The general use case is for an application submitter to ship the dependencies to the server via + * {@link uploadDependencies}; the application submitter will then receive a unique secure token. + * The application submitter then ought to convert the token into a secret, and use this secret in + * a pod that fetches the uploaded dependencies via {@link downloadJars}, {@link downloadFiles}, and + * {@link getKubernetesCredentials}. + */ +@Path("/") +private[spark] trait KubernetesSparkDependencyService { + + /** + * Register an application with the dependency service, so that the driver pod can retrieve them + * when it runs. + * + * @param driverPodName Name of the driver pod. + * @param driverPodNamespace Namespace for the driver pod. + * @param jars Application jars to upload, compacted together in tar + gzip format. The tarball + * should contain the jar files laid out in a flat hierarchy, without any directories. + * We take a stream here to avoid holding these entirely in memory. + * @param files Application files to upload, compacted together in tar + gzip format. The tarball + * should contain the files laid out in a flat hierarchy, without any directories. + * We take a stream here to avoid holding these entirely in memory. + * @param kubernetesCredentials These credentials are primarily used to monitor the progress of + * the application. When the application shuts down normally, shuts + * down abnormally and does not restart, or fails to start entirely, + * the data uploaded through this endpoint is cleared. + * @return A unique token that should be provided when retrieving these dependencies later. + */ + @PUT + @Consumes(Array(MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON)) + @Produces(Array(MediaType.TEXT_PLAIN)) + @Path("/dependencies") + def uploadDependencies( + @QueryParam("driverPodName") driverPodName: String, + @QueryParam("driverPodNamespace") driverPodNamespace: String, + @FormDataParam("jars") jars: InputStream, + @FormDataParam("files") files: InputStream, + @FormDataParam("kubernetesCredentials") kubernetesCredentials: KubernetesCredentials) + : String + + /** + * Download an application's jars. The jars are provided as a stream, where the stream's + * underlying data matches the stream that was uploaded in {@link uploadDependencies}. + */ + @GET + @Consumes(Array(MediaType.APPLICATION_JSON)) + @Produces(Array(MediaType.APPLICATION_OCTET_STREAM)) + @Path("/dependencies/jars") + def downloadJars( + @HeaderParam("Authorization") applicationSecret: String): StreamingOutput + + /** + * Download an application's files. The files are provided as a stream, where the stream's + * underlying data matches the stream that was uploaded in {@link uploadDependencies}. + */ + @GET + @Consumes(Array(MediaType.APPLICATION_JSON)) + @Produces(Array(MediaType.APPLICATION_OCTET_STREAM)) + @Path("/dependencies/files") + def downloadFiles( + @HeaderParam("Authorization") applicationSecret: String): StreamingOutput + + /** + * Retrieve the Kubernetes credentials being used to monitor the Spark application.
+ */ + @GET + @Consumes(Array(MediaType.APPLICATION_JSON)) + @Produces(Array(MediaType.APPLICATION_JSON)) + @Path("/dependencies/credentials") + def getKubernetesCredentials( + @HeaderParam("Authorization") applicationSecret: String): KubernetesCredentials +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala new file mode 100644 index 0000000000000..d1892f0f455c3 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes.v2 + +import java.io.{File, FileOutputStream, InputStream, OutputStream} +import java.security.SecureRandom +import java.util.UUID +import javax.ws.rs.core.StreamingOutput + +import com.google.common.io.{BaseEncoding, ByteStreams, Files} +import scala.collection.mutable + +import org.apache.spark.SparkException +import org.apache.spark.deploy.rest.KubernetesCredentials +import org.apache.spark.util.Utils + +private[spark] class KubernetesSparkDependencyServiceImpl(dependenciesRootDir: File) + extends KubernetesSparkDependencyService { + + private val DIRECTORIES_LOCK = new Object + private val REGISTERED_DRIVERS_LOCK = new Object + private val SPARK_APPLICATION_DEPENDENCIES_LOCK = new Object + private val SECURE_RANDOM = new SecureRandom() + // TODO clean up these resources based on the driver's lifecycle + private val registeredDrivers = mutable.Set.empty[PodNameAndNamespace] + private val sparkApplicationDependencies = mutable.Map.empty[String, SparkApplicationDependencies] + + override def uploadDependencies( + driverPodName: String, + driverPodNamespace: String, + jars: InputStream, + files: InputStream, + kubernetesCredentials: KubernetesCredentials): String = { + val podNameAndNamespace = PodNameAndNamespace( + name = driverPodName, + namespace = driverPodNamespace) + REGISTERED_DRIVERS_LOCK.synchronized { + if (registeredDrivers.contains(podNameAndNamespace)) { + throw new SparkException(s"Spark application with driver pod named $driverPodName" + + s" and namespace $driverPodNamespace already uploaded its dependencies.") + } + registeredDrivers.add(podNameAndNamespace) + } + try { + val secretBytes = new Array[Byte](1024) + SECURE_RANDOM.nextBytes(secretBytes) + val applicationSecret = UUID.randomUUID() + "-" + BaseEncoding.base64().encode(secretBytes) + val namespaceDir = new File(dependenciesRootDir, podNameAndNamespace.namespace) + val applicationDir = new File(namespaceDir, 
podNameAndNamespace.name) + DIRECTORIES_LOCK.synchronized { + if (!applicationDir.exists()) { + if (!applicationDir.mkdirs()) { + throw new SparkException("Failed to create dependencies directory for application" + + s" at ${applicationDir.getAbsolutePath}") + } + } + } + val jarsTgz = new File(applicationDir, "jars.tgz") + // TODO encrypt the written data with the secret. + Utils.tryWithResource(new FileOutputStream(jarsTgz)) { ByteStreams.copy(jars, _) } + val filesTgz = new File(applicationDir, "files.tgz") + Utils.tryWithResource(new FileOutputStream(filesTgz)) { ByteStreams.copy(files, _) } + SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized { + sparkApplicationDependencies(applicationSecret) = SparkApplicationDependencies( + podNameAndNamespace, + jarsTgz, + filesTgz, + kubernetesCredentials) + } + applicationSecret + } catch { + case e: Throwable => + // Revert the registered driver if we failed for any reason, most likely from disk ops + registeredDrivers.remove(podNameAndNamespace) + throw e + } + } + + override def downloadJars(applicationSecret: String): StreamingOutput = { + appFileToStreamingOutput(applicationSecret, _.jarsTgz) + } + + override def downloadFiles(applicationSecret: String): StreamingOutput = { + appFileToStreamingOutput(applicationSecret, _.filesTgz) + } + + override def getKubernetesCredentials(applicationSecret: String): KubernetesCredentials = { + SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized { + sparkApplicationDependencies.get(applicationSecret) + .map(_.kubernetesCredentials) + .getOrElse(throw new SparkException("No application found for the provided token.")) + } + } + + private def appFileToStreamingOutput( + applicationSecret: String, + dependency: (SparkApplicationDependencies => File)): StreamingOutput = { + val applicationDependencies = SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized { + sparkApplicationDependencies + .get(applicationSecret) + .getOrElse(throw new SparkException("No application found for the provided token.")) + } + new StreamingOutput { + override def write(outputStream: OutputStream) = { + Files.copy(dependency(applicationDependencies), outputStream) + } + } + } +} + +private case class PodNameAndNamespace(name: String, namespace: String) +private case class SparkApplicationDependencies( + driverPodNameAndNamespace: PodNameAndNamespace, + jarsTgz: File, + filesTgz: File, + kubernetesCredentials: KubernetesCredentials) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala new file mode 100644 index 0000000000000..e181d240fd8dc --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes.v2 + +import okhttp3.{RequestBody, ResponseBody} +import retrofit2.Call +import retrofit2.http.{Multipart, Streaming} + +import org.apache.spark.deploy.rest.KubernetesCredentials + +/** + * Retrofit-compatible variant of {@link KubernetesSparkDependencyService}. For documentation on + * how to use this service, see the aforementioned JAX-RS based interface. + */ +private[spark] trait KubernetesSparkDependencyServiceRetrofit { + + @Multipart + @retrofit2.http.PUT("/api/dependencies") + def uploadDependencies( + @retrofit2.http.Query("driverPodName") driverPodName: String, + @retrofit2.http.Query("driverPodNamespace") driverPodNamespace: String, + @retrofit2.http.Part("jars") jars: RequestBody, + @retrofit2.http.Part("files") files: RequestBody, + @retrofit2.http.Part("kubernetesCredentials") + kubernetesCredentials: RequestBody): Call[String] + + @Streaming + @retrofit2.http.GET("/api/dependencies/jars") + def downloadJars( + @retrofit2.http.Header("Authorization") applicationSecret: String): Call[ResponseBody] + + @Streaming + @retrofit2.http.GET("/api/dependencies/files") + def downloadFiles( + @retrofit2.http.Header("Authorization") applicationSecret: String): Call[ResponseBody] + + @retrofit2.http.GET("/api/dependencies/credentials") + def getKubernetesCredentials( + @retrofit2.http.Header("Authorization") applicationSecret: String) + : Call[KubernetesCredentials] +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/RetrofitUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/RetrofitUtils.scala new file mode 100644 index 0000000000000..c5c5c0d35b7cb --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/RetrofitUtils.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.rest.kubernetes.v2 + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import retrofit2.Retrofit +import retrofit2.converter.jackson.JacksonConverterFactory +import retrofit2.converter.scalars.ScalarsConverterFactory + +private[spark] object RetrofitUtils { + + private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule) + + def createRetrofitClient[T](baseUrl: String, serviceType: Class[T]): T = { + new Retrofit.Builder() + .baseUrl(baseUrl) + .addConverterFactory(ScalarsConverterFactory.create()) + .addConverterFactory(JacksonConverterFactory.create(OBJECT_MAPPER)) + .build() + .create(serviceType) + } + +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala new file mode 100644 index 0000000000000..bdf46f0d1ff2b --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes.v2 + +import javax.ws.rs.core.MediaType + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.google.common.io.ByteStreams +import okhttp3.{RequestBody, ResponseBody} +import org.scalatest.BeforeAndAfterAll +import retrofit2.Call + +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.rest.KubernetesCredentials +import org.apache.spark.util.Utils + +/** + * Tests for KubernetesSparkDependencyServer and its APIs. Note that this is not an end-to-end + * integration test, and as such does not upload and download files in tar.gz as would be done + * in production. Thus we use the retrofit clients directly despite the fact that in practice + * we would likely want to create an opinionated abstraction on top of the retrofit client; we + * can test this abstraction layer separately, however. This test is mainly for checking that + * we've configured the Jetty server correctly and that the endpoints reached over HTTP can + * receive streamed uploads and can stream downloads. 
+ */ +class KubernetesSparkDependencyServerSuite extends SparkFunSuite with BeforeAndAfterAll { + + private val serviceImpl = new KubernetesSparkDependencyServiceImpl(Utils.createTempDir()) + private val server = new KubernetesSparkDependencyServer(10021, serviceImpl) + private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule) + + override def beforeAll(): Unit = { + server.start() + } + + override def afterAll(): Unit = { + server.stop() + } + + test("Accept file and jar uploads and downloads") { + val retrofitService = RetrofitUtils.createRetrofitClient("http://localhost:10021/", + classOf[KubernetesSparkDependencyServiceRetrofit]) + val jarsBytes = Array[Byte](1, 2, 3, 4) + val filesBytes = Array[Byte](5, 6, 7) + val jarsRequestBody = RequestBody.create( + okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA), jarsBytes) + val filesRequestBody = RequestBody.create( + okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA), filesBytes) + val kubernetesCredentials = KubernetesCredentials(Some("token"), Some("ca-cert"), None, None) + val kubernetesCredentialsString = OBJECT_MAPPER.writer() + .writeValueAsString(kubernetesCredentials) + val readKubernetesCredentials: KubernetesCredentials = OBJECT_MAPPER + .readerFor(classOf[KubernetesCredentials]) + .readValue(kubernetesCredentialsString) + val kubernetesCredentialsBody = RequestBody.create( + okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), kubernetesCredentialsString) + val uploadResponse = retrofitService.uploadDependencies("podName", "podNamespace", + jarsRequestBody, filesRequestBody, kubernetesCredentialsBody) + val secret = getTypedResponseResult(uploadResponse) + + checkResponseBodyBytesMatches(retrofitService.downloadJars(secret), + jarsBytes) + checkResponseBodyBytesMatches(retrofitService.downloadFiles(secret), + filesBytes) + val downloadedCredentials = getTypedResponseResult( + retrofitService.getKubernetesCredentials(secret)) + assert(downloadedCredentials === kubernetesCredentials) + } + + private def getTypedResponseResult[T](call: Call[T]): T = { + val response = call.execute() + assert(response.code() >= 200 && response.code() < 300, Option(response.errorBody()) + .map(_.string()) + .getOrElse("Error executing HTTP request, but error body was not provided.")) + val callResult = response.body() + assert(callResult != null) + callResult + } + + private def checkResponseBodyBytesMatches(call: Call[ResponseBody], bytes: Array[Byte]): Unit = { + val responseBody = getTypedResponseResult(call) + val downloadedBytes = ByteStreams.toByteArray(responseBody.byteStream()) + assert(downloadedBytes.toSeq === bytes) + } + +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala new file mode 100644 index 0000000000000..65c928ab8e83d --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes.v2 + +import java.util.UUID + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.SparkFunSuite +import org.apache.spark.util.Utils + +/** + * Unit, scala-level tests for KubernetesSparkDependencyServiceImpl. The coverage here + * differs from that of KubernetesSparkDependencyServerSuite as here we invoke the + * implementation methods directly as opposed to over HTTP. + */ +class KubernetesSparkDependencyServiceImplSuite extends SparkFunSuite with BeforeAndAfter { + + private val dependencyRootDir = Utils.createTempDir() + private val serviceImpl = new KubernetesSparkDependencyServiceImpl(dependencyRootDir) + private val jarsBytes = Array[Byte](1, 2, 3, 4) + private val filesBytes = Array[Byte](5, 6, 7) + private var podName: String = _ + private var podNamespace: String = _ + + before { + podName = UUID.randomUUID().toString + podNamespace = UUID.randomUUID().toString + } + + test("Uploads should write to a directory in the underlying disk") { + // TODO + } + +} From 6e40c4c9b4d5e4d40949026193deebab1bb8ae11 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 3 Apr 2017 18:41:56 -0700 Subject: [PATCH 02/14] Add unit test for file writing --- ...netesSparkDependencyServiceImplSuite.scala | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala index 65c928ab8e83d..ab91fa2db6fdd 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala @@ -16,11 +16,15 @@ */ package org.apache.spark.deploy.rest.kubernetes.v2 +import java.io.ByteArrayInputStream +import java.nio.file.Paths import java.util.UUID +import com.google.common.io.Files import org.scalatest.BeforeAndAfter import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.rest.KubernetesCredentials import org.apache.spark.util.Utils /** @@ -34,6 +38,8 @@ class KubernetesSparkDependencyServiceImplSuite extends SparkFunSuite with Befor private val serviceImpl = new KubernetesSparkDependencyServiceImpl(dependencyRootDir) private val jarsBytes = Array[Byte](1, 2, 3, 4) private val filesBytes = Array[Byte](5, 6, 7) + private val kubernetesCredentials = KubernetesCredentials( + Some("token"), Some("caCert"), Some("key"), Some("cert")) private var podName: String = _ private var podNamespace: String = _ @@ -42,8 +48,25 @@ class KubernetesSparkDependencyServiceImplSuite extends SparkFunSuite with Befor 
podNamespace = UUID.randomUUID().toString } - test("Uploads should write to a directory in the underlying disk") { - // TODO + test("Uploads should write data to the underlying disk") { + Utils.tryWithResource(new ByteArrayInputStream(jarsBytes)) { jarsStream => + Utils.tryWithResource(new ByteArrayInputStream(filesBytes)) { filesStream => + serviceImpl.uploadDependencies( + "name", "namespace", jarsStream, filesStream, kubernetesCredentials) + } + } + val jarsTgz = Paths.get(dependencyRootDir.getAbsolutePath, "namespace", "name", "jars.tgz") + .toFile + assert(jarsTgz.isFile, + s"Jars written to ${jarsTgz.getAbsolutePath} does not exist or is not a file.") + val jarsTgzBytes = Files.toByteArray(jarsTgz) + assert(jarsBytes.toSeq === jarsTgzBytes.toSeq, "Incorrect jars bytes were written.") + val filesTgz = Paths.get(dependencyRootDir.getAbsolutePath, "namespace", "name", "files.tgz") + .toFile + assert(filesTgz.isFile, + s"Files written to ${filesTgz.getAbsolutePath} does not exist or is not a file.") + val filesTgzBytes = Files.toByteArray(filesTgz) + assert(filesBytes.toSeq === filesTgzBytes.toSeq, "Incorrect files bytes were written.") } } From 7d00f0736a8e45e6f838112e0e66417d0def79c7 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 5 Apr 2017 17:18:20 -0700 Subject: [PATCH 03/14] Minor fixes --- .../rest/kubernetes/v2/KubernetesSparkDependencyServer.scala | 2 +- .../kubernetes/v2/KubernetesSparkDependencyServerSuite.scala | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala index ace750f1927ff..2b1022a7dcc9d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.rest.kubernetes.v2 import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.jaxrs.json.{JacksonJaxbJsonProvider, JacksonJsonProvider} +import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.eclipse.jetty.server.{Server, ServerConnector} import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala index bdf46f0d1ff2b..ad86a29bb3086 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala @@ -64,9 +64,6 @@ class KubernetesSparkDependencyServerSuite extends SparkFunSuite with BeforeAndA val kubernetesCredentials = KubernetesCredentials(Some("token"), Some("ca-cert"), None, None) val kubernetesCredentialsString = OBJECT_MAPPER.writer() .writeValueAsString(kubernetesCredentials) - val readKubernetesCredentials: 
KubernetesCredentials = OBJECT_MAPPER - .readerFor(classOf[KubernetesCredentials]) - .readValue(kubernetesCredentialsString) val kubernetesCredentialsBody = RequestBody.create( okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), kubernetesCredentialsString) val uploadResponse = retrofitService.uploadDependencies("podName", "podNamespace", From 3dd3504da1da80c1e22c8f0512f78f92e5228ccb Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 11 Apr 2017 15:33:34 -0700 Subject: [PATCH 04/14] Remove getting credentials from the API We still want to post them because in the future we can use these credentials to monitor the API server and handle cleaning up the data accordingly. --- .../v2/KubernetesSparkDependencyService.scala | 10 ---------- .../v2/KubernetesSparkDependencyServiceImpl.scala | 8 -------- .../v2/KubernetesSparkDependencyServiceRetrofit.scala | 5 ----- .../v2/KubernetesSparkDependencyServerSuite.scala | 3 --- 4 files changed, 26 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala index a1e879231a7ef..83d704c398b64 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala @@ -93,14 +93,4 @@ private[spark] trait KubernetesSparkDependencyService { @Path("/dependencies/files") def downloadFiles( @HeaderParam("Authorization") applicationSecret: String): StreamingOutput - - /** - * Retrieve the Kubernetes credentials being used to monitor the Spark application. 
- */ - @GET - @Consumes(Array(MediaType.APPLICATION_JSON)) - @Produces(Array(MediaType.APPLICATION_JSON)) - @Path("/dependencies/credentials") - def getKubernetesCredentials( - @HeaderParam("Authorization") applicationSecret: String): KubernetesCredentials } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala index d1892f0f455c3..58067d5d7a23a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala @@ -98,14 +98,6 @@ private[spark] class KubernetesSparkDependencyServiceImpl(dependenciesRootDir: F appFileToStreamingOutput(applicationSecret, _.filesTgz) } - override def getKubernetesCredentials(applicationSecret: String): KubernetesCredentials = { - SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized { - sparkApplicationDependencies.get(applicationSecret) - .map(_.kubernetesCredentials) - .getOrElse(throw new SparkException("No application found for the provided token.")) - } - } - private def appFileToStreamingOutput( applicationSecret: String, dependency: (SparkApplicationDependencies => File)): StreamingOutput = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala index e181d240fd8dc..86809ff695d6c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala @@ -47,9 +47,4 @@ private[spark] trait KubernetesSparkDependencyServiceRetrofit { @retrofit2.http.GET("/api/dependencies/files") def downloadFiles( @retrofit2.http.Header("Authorization") applicationSecret: String): Call[ResponseBody] - - @retrofit2.http.GET("/api/dependencies/credentials") - def getKubernetesCredentials( - @retrofit2.http.Header("Authorization") applicationSecret: String) - : Call[KubernetesCredentials] } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala index ad86a29bb3086..5fceb5664c00a 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala @@ -74,9 +74,6 @@ class KubernetesSparkDependencyServerSuite extends SparkFunSuite with BeforeAndA jarsBytes) checkResponseBodyBytesMatches(retrofitService.downloadFiles(secret), filesBytes) - val downloadedCredentials = getTypedResponseResult( - retrofitService.getKubernetesCredentials(secret)) - assert(downloadedCredentials === 
kubernetesCredentials) } private def getTypedResponseResult[T](call: Call[T]): T = { From df8e0c86f5f2e1824cee252d7d40704e07e577b7 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 12 Apr 2017 13:47:16 -0700 Subject: [PATCH 05/14] Generalize to resource staging server outside of Spark --- ...rver.scala => ResourceStagingServer.scala} | 4 +- ...ice.scala => ResourceStagingService.scala} | 53 +++++------- ...scala => ResourceStagingServiceImpl.scala} | 82 +++++++------------ ...a => ResourceStagingServiceRetrofit.scala} | 22 ++--- ...scala => ResourceStagingServerSuite.scala} | 38 +++++---- ... => ResourceStagingServiceImplSuite.scala} | 49 +++++------ 6 files changed, 99 insertions(+), 149 deletions(-) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/{KubernetesSparkDependencyServer.scala => ResourceStagingServer.scala} (95%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/{KubernetesSparkDependencyService.scala => ResourceStagingService.scala} (64%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/{KubernetesSparkDependencyServiceImpl.scala => ResourceStagingServiceImpl.scala} (50%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/{KubernetesSparkDependencyServiceRetrofit.scala => ResourceStagingServiceRetrofit.scala} (65%) rename resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/{KubernetesSparkDependencyServerSuite.scala => ResourceStagingServerSuite.scala} (72%) rename resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/{KubernetesSparkDependencyServiceImplSuite.scala => ResourceStagingServiceImplSuite.scala} (51%) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServer.scala similarity index 95% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServer.scala index 2b1022a7dcc9d..e09a788c45321 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServer.scala @@ -26,9 +26,9 @@ import org.glassfish.jersey.media.multipart.MultiPartFeature import org.glassfish.jersey.server.ResourceConfig import org.glassfish.jersey.servlet.ServletContainer -private[spark] class KubernetesSparkDependencyServer( +private[spark] class ResourceStagingServer( port: Int, - serviceInstance: KubernetesSparkDependencyService) { + serviceInstance: ResourceStagingService) { private var jettyServer: Option[Server] = None diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala similarity index 64% rename from 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala index 83d704c398b64..4d841c42af6ba 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala @@ -40,20 +40,19 @@ import org.apache.spark.deploy.rest.KubernetesCredentials * {@link getKubernetesCredentials}. */ @Path("/") -private[spark] trait KubernetesSparkDependencyService { +private[spark] trait ResourceStagingService { /** - * Register an application with the dependency service, so that the driver pod can retrieve them - * when it runs. + * Register an application with the dependency service, so that pods with the given labels can + * retrieve them when they run. * - * @param driverPodName Name of the driver pod. - * @param driverPodNamespace Namespace for the driver pod. - * @param jars Application jars to upload, compacted together in tar + gzip format. The tarball - * should contain the jar files laid out in a flat hierarchy, without any directories. - * We take a stream here to avoid holding these entirely in memory. - * @param files Application files to upload, compacted together in tar + gzip format. THe tarball - * should contain the files laid out in a flat hierarchy, without any directories. - * We take a stream here to avoid holding these entirely in memory. + * @param resources Application resources to upload, compacted together in tar + gzip format. + * The tarball should contain the files laid out in a flat hierarchy, without + * any directories. We take a stream here to avoid holding these entirely in + * memory. + * @param podLabels Labels of pods to monitor. When no more pods are running with the given label, + * after some period of time, these dependencies will be cleared. + * @param podNamespace Namespace of pods to monitor. * @param kubernetesCredentials These credentials are primarily used to monitor the progress of * the application. When the application shuts down normally, shuts * down abnormally and does not restart, or fails to start entirely, @@ -61,36 +60,24 @@ private[spark] trait KubernetesSparkDependencyService { * @return A unique token that should be provided when retrieving these dependencies later. */ @PUT - @Consumes(Array(MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON)) + @Consumes(Array(MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN)) @Produces(Array(MediaType.TEXT_PLAIN)) - @Path("/dependencies") - def uploadDependencies( - @QueryParam("driverPodName") driverPodName: String, - @QueryParam("driverPodNamespace") driverPodNamespace: String, - @FormDataParam("jars") jars: InputStream, - @FormDataParam("files") files: InputStream, + @Path("/resources/upload") + def uploadResources( + @FormDataParam("podLabels") podLabels: Map[String, String], + @FormDataParam("podNamespace") podNamespace: String, + @FormDataParam("resources") resources: InputStream, @FormDataParam("kubernetesCredentials") kubernetesCredentials: KubernetesCredentials) : String /** - * Download an application's jars. The jars are provided as a stream, where the stream's - * underlying data matches the stream that was uploaded in {@link uploadDependencies}. 
+ * Download an application's resources. The resources are provided as a stream, where the stream's + * underlying data matches the stream that was uploaded in uploadResources. */ @GET @Consumes(Array(MediaType.APPLICATION_JSON)) @Produces(Array(MediaType.APPLICATION_OCTET_STREAM)) - @Path("/dependencies/jars") - def downloadJars( - @HeaderParam("Authorization") applicationSecret: String): StreamingOutput - - /** - * Download an application's files. The jars are provided as a stream, where the stream's - * underlying data matches the stream that was uploaded in {@link uploadDependencies}. - */ - @GET - @Consumes(Array(MediaType.APPLICATION_JSON)) - @Produces(Array(MediaType.APPLICATION_OCTET_STREAM)) - @Path("/dependencies/files") - def downloadFiles( + @Path("/resources/download") + def downloadResources( @HeaderParam("Authorization") applicationSecret: String): StreamingOutput } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala similarity index 50% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala index 58067d5d7a23a..3e0c0906a574b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala @@ -26,81 +26,60 @@ import scala.collection.mutable import org.apache.spark.SparkException import org.apache.spark.deploy.rest.KubernetesCredentials +import org.apache.spark.internal.Logging import org.apache.spark.util.Utils -private[spark] class KubernetesSparkDependencyServiceImpl(dependenciesRootDir: File) - extends KubernetesSparkDependencyService { +private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) + extends ResourceStagingService with Logging { private val DIRECTORIES_LOCK = new Object - private val REGISTERED_DRIVERS_LOCK = new Object private val SPARK_APPLICATION_DEPENDENCIES_LOCK = new Object private val SECURE_RANDOM = new SecureRandom() // TODO clean up these resources based on the driver's lifecycle - private val registeredDrivers = mutable.Set.empty[PodNameAndNamespace] private val sparkApplicationDependencies = mutable.Map.empty[String, SparkApplicationDependencies] - override def uploadDependencies( - driverPodName: String, - driverPodNamespace: String, - jars: InputStream, - files: InputStream, + override def uploadResources( + podLabels: Map[String, String], + podNamespace: String, + resources: InputStream, kubernetesCredentials: KubernetesCredentials): String = { - val podNameAndNamespace = PodNameAndNamespace( - name = driverPodName, - namespace = driverPodNamespace) - REGISTERED_DRIVERS_LOCK.synchronized { - if (registeredDrivers.contains(podNameAndNamespace)) { - throw new SparkException(s"Spark application with driver pod named $driverPodName" + - s" and namespace $driverPodNamespace already uploaded its dependencies.") - } - registeredDrivers.add(podNameAndNamespace) - } + val resourcesId = UUID.randomUUID().toString + val secretBytes = new Array[Byte](1024) + 
SECURE_RANDOM.nextBytes(secretBytes) + val applicationSecret = resourcesId + "-" + BaseEncoding.base64().encode(secretBytes) + + val namespaceDir = new File(dependenciesRootDir, podNamespace) + val resourcesDir = new File(namespaceDir, resourcesId) try { - val secretBytes = new Array[Byte](1024) - SECURE_RANDOM.nextBytes(secretBytes) - val applicationSecret = UUID.randomUUID() + "-" + BaseEncoding.base64().encode(secretBytes) - val namespaceDir = new File(dependenciesRootDir, podNameAndNamespace.namespace) - val applicationDir = new File(namespaceDir, podNameAndNamespace.name) DIRECTORIES_LOCK.synchronized { - if (!applicationDir.exists()) { - if (!applicationDir.mkdirs()) { + if (!resourcesDir.exists()) { + if (!resourcesDir.mkdirs()) { throw new SparkException("Failed to create dependencies directory for application" + - s" at ${applicationDir.getAbsolutePath}") + s" at ${resourcesDir.getAbsolutePath}") } } } - val jarsTgz = new File(applicationDir, "jars.tgz") // TODO encrypt the written data with the secret. - Utils.tryWithResource(new FileOutputStream(jarsTgz)) { ByteStreams.copy(jars, _) } - val filesTgz = new File(applicationDir, "files.tgz") - Utils.tryWithResource(new FileOutputStream(filesTgz)) { ByteStreams.copy(files, _) } + val resourcesTgz = new File(resourcesDir, "resources.tgz") + Utils.tryWithResource(new FileOutputStream(resourcesTgz)) { ByteStreams.copy(resources, _) } SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized { sparkApplicationDependencies(applicationSecret) = SparkApplicationDependencies( - podNameAndNamespace, - jarsTgz, - filesTgz, + podLabels, + podNamespace, + resourcesTgz, kubernetesCredentials) } applicationSecret } catch { case e: Throwable => - // Revert the registered driver if we failed for any reason, most likely from disk ops - registeredDrivers.remove(podNameAndNamespace) + if (!resourcesDir.delete()) { + logWarning(s"Failed to delete application directory $resourcesDir.") + } throw e } } - override def downloadJars(applicationSecret: String): StreamingOutput = { - appFileToStreamingOutput(applicationSecret, _.jarsTgz) - } - - override def downloadFiles(applicationSecret: String): StreamingOutput = { - appFileToStreamingOutput(applicationSecret, _.filesTgz) - } - - private def appFileToStreamingOutput( - applicationSecret: String, - dependency: (SparkApplicationDependencies => File)): StreamingOutput = { + override def downloadResources(applicationSecret: String): StreamingOutput = { val applicationDependencies = SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized { sparkApplicationDependencies .get(applicationSecret) @@ -108,15 +87,14 @@ private[spark] class KubernetesSparkDependencyServiceImpl(dependenciesRootDir: F } new StreamingOutput { override def write(outputStream: OutputStream) = { - Files.copy(dependency(applicationDependencies), outputStream) + Files.copy(applicationDependencies.resourcesTgz, outputStream) } } } } -private case class PodNameAndNamespace(name: String, namespace: String) private case class SparkApplicationDependencies( - driverPodNameAndNamespace: PodNameAndNamespace, - jarsTgz: File, - filesTgz: File, + podLabels: Map[String, String], + podNamespace: String, + resourcesTgz: File, kubernetesCredentials: KubernetesCredentials) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala similarity index 65% 
rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala index 86809ff695d6c..6967b2a0f9a31 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala @@ -23,28 +23,22 @@ import retrofit2.http.{Multipart, Streaming} import org.apache.spark.deploy.rest.KubernetesCredentials /** - * Retrofit-compatible variant of {@link KubernetesSparkDependencyService}. For documentation on + * Retrofit-compatible variant of {@link ResourceStagingService}. For documentation on * how to use this service, see the aforementioned JAX-RS based interface. */ -private[spark] trait KubernetesSparkDependencyServiceRetrofit { +private[spark] trait ResourceStagingServiceRetrofit { @Multipart - @retrofit2.http.PUT("/api/dependencies") + @retrofit2.http.PUT("/api/resources/upload") def uploadDependencies( - @retrofit2.http.Query("driverPodName") driverPodName: String, - @retrofit2.http.Query("driverPodNamespace") driverPodNamespace: String, - @retrofit2.http.Part("jars") jars: RequestBody, - @retrofit2.http.Part("files") files: RequestBody, + @retrofit2.http.Part("podLabels") podLabels: RequestBody, + @retrofit2.http.Part("podNamespace") podNamespace: RequestBody, + @retrofit2.http.Part("resources") resources: RequestBody, @retrofit2.http.Part("kubernetesCredentials") kubernetesCredentials: RequestBody): Call[String] @Streaming - @retrofit2.http.GET("/api/dependencies/jars") - def downloadJars( - @retrofit2.http.Header("Authorization") applicationSecret: String): Call[ResponseBody] - - @Streaming - @retrofit2.http.GET("/api/dependencies/files") - def downloadFiles( + @retrofit2.http.GET("/api/resources/download") + def downloadResources( @retrofit2.http.Header("Authorization") applicationSecret: String): Call[ResponseBody] } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala similarity index 72% rename from resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala rename to resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala index 5fceb5664c00a..b9c2ca6f87dda 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.deploy.rest.kubernetes.v2 +import java.net.ServerSocket import javax.ws.rs.core.MediaType import com.fasterxml.jackson.databind.ObjectMapper @@ -38,10 +39,11 @@ import org.apache.spark.util.Utils * we've configured the Jetty server correctly and that the endpoints reached over HTTP can * receive streamed uploads and can stream downloads. 
*/ -class KubernetesSparkDependencyServerSuite extends SparkFunSuite with BeforeAndAfterAll { +class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfterAll { - private val serviceImpl = new KubernetesSparkDependencyServiceImpl(Utils.createTempDir()) - private val server = new KubernetesSparkDependencyServer(10021, serviceImpl) + private val serverPort = new ServerSocket(0).getLocalPort + private val serviceImpl = new ResourceStagingServiceImpl(Utils.createTempDir()) + private val server = new ResourceStagingServer(serverPort, serviceImpl) private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule) override def beforeAll(): Unit = { @@ -53,27 +55,27 @@ class KubernetesSparkDependencyServerSuite extends SparkFunSuite with BeforeAndA } test("Accept file and jar uploads and downloads") { - val retrofitService = RetrofitUtils.createRetrofitClient("http://localhost:10021/", - classOf[KubernetesSparkDependencyServiceRetrofit]) - val jarsBytes = Array[Byte](1, 2, 3, 4) - val filesBytes = Array[Byte](5, 6, 7) - val jarsRequestBody = RequestBody.create( - okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA), jarsBytes) - val filesRequestBody = RequestBody.create( - okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA), filesBytes) + val retrofitService = RetrofitUtils.createRetrofitClient(s"http://localhost:$serverPort/", + classOf[ResourceStagingServiceRetrofit]) + val resourcesBytes = Array[Byte](1, 2, 3, 4) + val labels = Map("label1" -> "label1Value", "label2" -> "label2value") + val namespace = "namespace" + val labelsJson = OBJECT_MAPPER.writer().writeValueAsString(labels) + val resourcesRequestBody = RequestBody.create( + okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA), resourcesBytes) + val labelsRequestBody = RequestBody.create( + okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), labelsJson) + val namespaceRequestBody = RequestBody.create( + okhttp3.MediaType.parse(MediaType.TEXT_PLAIN), namespace) val kubernetesCredentials = KubernetesCredentials(Some("token"), Some("ca-cert"), None, None) val kubernetesCredentialsString = OBJECT_MAPPER.writer() .writeValueAsString(kubernetesCredentials) val kubernetesCredentialsBody = RequestBody.create( okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), kubernetesCredentialsString) - val uploadResponse = retrofitService.uploadDependencies("podName", "podNamespace", - jarsRequestBody, filesRequestBody, kubernetesCredentialsBody) + val uploadResponse = retrofitService.uploadDependencies( + labelsRequestBody, namespaceRequestBody, resourcesRequestBody, kubernetesCredentialsBody) val secret = getTypedResponseResult(uploadResponse) - - checkResponseBodyBytesMatches(retrofitService.downloadJars(secret), - jarsBytes) - checkResponseBodyBytesMatches(retrofitService.downloadFiles(secret), - filesBytes) + checkResponseBodyBytesMatches(retrofitService.downloadResources(secret), resourcesBytes) } private def getTypedResponseResult[T](call: Call[T]): T = { diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala similarity index 51% rename from resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala rename to 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala index ab91fa2db6fdd..6616d0fbe680b 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.deploy.rest.kubernetes.v2 -import java.io.ByteArrayInputStream +import java.io.{ByteArrayInputStream, File} import java.nio.file.Paths import java.util.UUID @@ -32,41 +32,30 @@ import org.apache.spark.util.Utils * differs from that of KubernetesSparkDependencyServerSuite as here we invoke the * implementation methods directly as opposed to over HTTP. */ -class KubernetesSparkDependencyServiceImplSuite extends SparkFunSuite with BeforeAndAfter { +class ResourceStagingServiceImplSuite extends SparkFunSuite with BeforeAndAfter { private val dependencyRootDir = Utils.createTempDir() - private val serviceImpl = new KubernetesSparkDependencyServiceImpl(dependencyRootDir) - private val jarsBytes = Array[Byte](1, 2, 3, 4) - private val filesBytes = Array[Byte](5, 6, 7) + private val serviceImpl = new ResourceStagingServiceImpl(dependencyRootDir) + private val resourceBytes = Array[Byte](1, 2, 3, 4) private val kubernetesCredentials = KubernetesCredentials( Some("token"), Some("caCert"), Some("key"), Some("cert")) - private var podName: String = _ - private var podNamespace: String = _ - - before { - podName = UUID.randomUUID().toString - podNamespace = UUID.randomUUID().toString - } + private var namespace = "namespace" + private val labels = Map("label1" -> "label1value", "label2" -> "label2value") test("Uploads should write data to the underlying disk") { - Utils.tryWithResource(new ByteArrayInputStream(jarsBytes)) { jarsStream => - Utils.tryWithResource(new ByteArrayInputStream(filesBytes)) { filesStream => - serviceImpl.uploadDependencies( - "name", "namespace", jarsStream, filesStream, kubernetesCredentials) - } + Utils.tryWithResource(new ByteArrayInputStream(resourceBytes)) { resourceStream => + serviceImpl.uploadResources(labels, namespace, resourceStream, kubernetesCredentials) } - val jarsTgz = Paths.get(dependencyRootDir.getAbsolutePath, "namespace", "name", "jars.tgz") - .toFile - assert(jarsTgz.isFile, - s"Jars written to ${jarsTgz.getAbsolutePath} does not exist or is not a file.") - val jarsTgzBytes = Files.toByteArray(jarsTgz) - assert(jarsBytes.toSeq === jarsTgzBytes.toSeq, "Incorrect jars bytes were written.") - val filesTgz = Paths.get(dependencyRootDir.getAbsolutePath, "namespace", "name", "files.tgz") - .toFile - assert(filesTgz.isFile, - s"Files written to ${filesTgz.getAbsolutePath} does not exist or is not a file.") - val filesTgzBytes = Files.toByteArray(filesTgz) - assert(filesBytes.toSeq === filesTgzBytes.toSeq, "Incorrect files bytes were written.") + val resourceNamespaceDir = Paths.get(dependencyRootDir.getAbsolutePath, "namespace").toFile + assert(resourceNamespaceDir.isDirectory, s"Resource namespace dir was not created at" + + s" ${resourceNamespaceDir.getAbsolutePath} or is not a directory.") + val resourceDirs = resourceNamespaceDir.listFiles() + assert(resourceDirs.length === 1, s"Resource root directory did not have exactly one" + + s" subdirectory. 
Got: ${resourceDirs.map(_.getAbsolutePath).mkString(",")}") + val resourceTgz = new File(resourceDirs(0), "resources.tgz") + assert(resourceTgz.isFile, + s"Resources written to ${resourceTgz.getAbsolutePath} does not exist or is not a file.") + val resourceTgzBytes = Files.toByteArray(resourceTgz) + assert(resourceTgzBytes.toSeq === resourceBytes.toSeq, "Incorrect resource bytes were written.") } - } From 24452ece52017c106b4f56d8f40cba3eeeee01ac Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 12 Apr 2017 13:49:23 -0700 Subject: [PATCH 06/14] Update code documentation --- .../rest/kubernetes/v2/ResourceStagingService.scala | 13 +++++++------ .../v2/ResourceStagingServiceImplSuite.scala | 3 ++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala index 4d841c42af6ba..b9f373190d8e7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala @@ -25,19 +25,20 @@ import org.glassfish.jersey.media.multipart.FormDataParam import org.apache.spark.deploy.rest.KubernetesCredentials /** - * Service that receives application data that can be received later on. This is primarily used + * Service that receives application data that can be retrieved later on. This is primarily used * in the context of Spark, but the concept is generic enough to be used for arbitrary applications. * The use case is to have a place for Kubernetes application submitters to bootstrap dynamic, * heavyweight application data for pods. Application submitters may have data stored on their * local disks that they want to provide to the pods they create through the API server. ConfigMaps * are one way to provide this data, but the data in ConfigMaps are stored in etcd which cannot - * maintain data in the hundreds of megabytes in size.
- *
+ * maintain data in the hundreds of megabytes in size. + *

* The general use case is for an application submitter to ship the dependencies to the server via - * {@link uploadDependencies}; the application submitter will then receive a unique secure token. + * {@link uploadResources}; the application submitter will then receive a unique secure token. * The application submitter then ought to convert the token into a secret, and use this secret in - * a pod that fetches the uploaded dependencies via {@link downloadJars}, {@link downloadFiles}, and - * {@link getKubernetesCredentials}. + * a pod that fetches the uploaded dependencies via {@link downloadResources}. An application can + * provide multiple resource bundles simply by hitting the upload endpoint multiple times and + * downloading each bundle with the appropriate secret. */ @Path("/") private[spark] trait ResourceStagingService { diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala index 6616d0fbe680b..86a45d0f1fe5b 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala @@ -30,7 +30,8 @@ import org.apache.spark.util.Utils /** * Unit, scala-level tests for KubernetesSparkDependencyServiceImpl. The coverage here * differs from that of KubernetesSparkDependencyServerSuite as here we invoke the - * implementation methods directly as opposed to over HTTP. + * implementation methods directly as opposed to over HTTP, as well as check the + * data written to the underlying disk. 
*/ class ResourceStagingServiceImplSuite extends SparkFunSuite with BeforeAndAfter { From f59717156f49980e0df1ea8eef3bae8a046f2ccd Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 12 Apr 2017 13:57:22 -0700 Subject: [PATCH 07/14] Val instead of var --- .../rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala index 86a45d0f1fe5b..c124fce77cbf5 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.deploy.rest.kubernetes.v2 import java.io.{ByteArrayInputStream, File} import java.nio.file.Paths -import java.util.UUID import com.google.common.io.Files import org.scalatest.BeforeAndAfter @@ -40,7 +39,7 @@ class ResourceStagingServiceImplSuite extends SparkFunSuite with BeforeAndAfter private val resourceBytes = Array[Byte](1, 2, 3, 4) private val kubernetesCredentials = KubernetesCredentials( Some("token"), Some("caCert"), Some("key"), Some("cert")) - private var namespace = "namespace" + private val namespace = "namespace" private val labels = Map("label1" -> "label1value", "label2" -> "label2value") test("Uploads should write data to the underlying disk") { From e5f26aa6028cc1398f4f0adb262a61f5f74a32fd Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 12 Apr 2017 17:51:42 -0700 Subject: [PATCH 08/14] Fix naming, remove unused import --- .../rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala | 4 +--- .../rest/kubernetes/v2/ResourceStagingServerSuite.scala | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala index 6967b2a0f9a31..6ab118a44b7ed 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala @@ -20,8 +20,6 @@ import okhttp3.{RequestBody, ResponseBody} import retrofit2.Call import retrofit2.http.{Multipart, Streaming} -import org.apache.spark.deploy.rest.KubernetesCredentials - /** * Retrofit-compatible variant of {@link ResourceStagingService}. For documentation on * how to use this service, see the aforementioned JAX-RS based interface. 
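As a usage illustration (mirroring what ResourceStagingServerSuite exercises, rather than anything this patch adds), a caller of this interface performs the upload/download round trip roughly as follows once the rename in the next hunk lands. This is a minimal sketch that assumes a server is already listening; the address, byte payload, labels, namespace, and credential values are all placeholder stand-ins:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    import javax.ws.rs.core.MediaType
    import okhttp3.RequestBody

    import org.apache.spark.deploy.rest.KubernetesCredentials

    val mapper = new ObjectMapper().registerModule(new DefaultScalaModule)
    val service = RetrofitUtils.createRetrofitClient(
      "http://localhost:10021/", classOf[ResourceStagingServiceRetrofit])
    // Each multipart part is an okhttp3 RequestBody; the resources part carries the tar.gz bytes.
    val resources = RequestBody.create(
      okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA), Array[Byte](1, 2, 3, 4))
    val labels = RequestBody.create(
      okhttp3.MediaType.parse(MediaType.APPLICATION_JSON),
      mapper.writer().writeValueAsString(Map("label1" -> "label1Value")))
    val namespace = RequestBody.create(
      okhttp3.MediaType.parse(MediaType.TEXT_PLAIN), "default")
    val credentials = RequestBody.create(
      okhttp3.MediaType.parse(MediaType.APPLICATION_JSON),
      mapper.writer().writeValueAsString(KubernetesCredentials(Some("token"), None, None, None)))
    // Calls execute synchronously here; the upload returns the secret that authorizes the download.
    val secret = service.uploadResources(labels, namespace, resources, credentials)
      .execute().body()
    val downloadedBytes = service.downloadResources(secret).execute().body().bytes()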
@@ -30,7 +28,7 @@ private[spark] trait ResourceStagingServiceRetrofit { @Multipart @retrofit2.http.PUT("/api/resources/upload") - def uploadDependencies( + def uploadResources( @retrofit2.http.Part("podLabels") podLabels: RequestBody, @retrofit2.http.Part("podNamespace") podNamespace: RequestBody, @retrofit2.http.Part("resources") resources: RequestBody, diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala index b9c2ca6f87dda..db06ee8b6d650 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala @@ -72,7 +72,7 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfterAll { .writeValueAsString(kubernetesCredentials) val kubernetesCredentialsBody = RequestBody.create( okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), kubernetesCredentialsString) - val uploadResponse = retrofitService.uploadDependencies( + val uploadResponse = retrofitService.uploadResources( labelsRequestBody, namespaceRequestBody, resourcesRequestBody, kubernetesCredentialsBody) val secret = getTypedResponseResult(uploadResponse) checkResponseBodyBytesMatches(retrofitService.downloadResources(secret), resourcesBytes) From c408ff9a46a2f079bab6049ba60139c676286f7d Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 12 Apr 2017 17:55:37 -0700 Subject: [PATCH 09/14] Move suites from integration test package to core --- .../deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala | 0 .../rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename resource-managers/kubernetes/{integration-tests => core}/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala (100%) rename resource-managers/kubernetes/{integration-tests => core}/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala (100%) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala similarity index 100% rename from resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala rename to resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala similarity index 100% rename from resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala rename to resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala From 64eddc1da37eff926f953d542ee9a9b29afe1653 Mon Sep 17 
00:00:00 2001 From: mcheah Date: Wed, 19 Apr 2017 13:39:37 -0700 Subject: [PATCH 10/14] Use TrieMap instead of locks --- .../v2/ResourceStagingServiceImpl.scala | 34 +++++++------------ 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala index 3e0c0906a574b..74e766b6a84f6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala @@ -22,7 +22,7 @@ import java.util.UUID import javax.ws.rs.core.StreamingOutput import com.google.common.io.{BaseEncoding, ByteStreams, Files} -import scala.collection.mutable +import scala.collection.concurrent.TrieMap import org.apache.spark.SparkException import org.apache.spark.deploy.rest.KubernetesCredentials @@ -32,11 +32,9 @@ import org.apache.spark.util.Utils private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) extends ResourceStagingService with Logging { - private val DIRECTORIES_LOCK = new Object - private val SPARK_APPLICATION_DEPENDENCIES_LOCK = new Object private val SECURE_RANDOM = new SecureRandom() // TODO clean up these resources based on the driver's lifecycle - private val sparkApplicationDependencies = mutable.Map.empty[String, SparkApplicationDependencies] + private val sparkApplicationDependencies = TrieMap.empty[String, ApplicationResources] override def uploadResources( podLabels: Map[String, String], @@ -51,24 +49,20 @@ private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) val namespaceDir = new File(dependenciesRootDir, podNamespace) val resourcesDir = new File(namespaceDir, resourcesId) try { - DIRECTORIES_LOCK.synchronized { - if (!resourcesDir.exists()) { - if (!resourcesDir.mkdirs()) { - throw new SparkException("Failed to create dependencies directory for application" + - s" at ${resourcesDir.getAbsolutePath}") - } + if (!resourcesDir.exists()) { + if (!resourcesDir.mkdirs()) { + throw new SparkException("Failed to create dependencies directory for application" + + s" at ${resourcesDir.getAbsolutePath}") } } // TODO encrypt the written data with the secret. 
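// A sketch of one way the TODO above could be satisfied (not what this patch implements):
// derive an AES key from the freshly generated secret bytes and wrap the file stream below in
// a javax.crypto CipherOutputStream before copying, e.g.
//   val cipher = Cipher.getInstance("AES")
//   cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(secretBytes.take(16), "AES"))
//   Utils.tryWithResource(new CipherOutputStream(new FileOutputStream(resourcesTgz), cipher)) {
//     ByteStreams.copy(resources, _)
//   }
// The download path would then need the same key, decrypting with a matching CipherInputStream.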
val resourcesTgz = new File(resourcesDir, "resources.tgz") Utils.tryWithResource(new FileOutputStream(resourcesTgz)) { ByteStreams.copy(resources, _) } - SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized { - sparkApplicationDependencies(applicationSecret) = SparkApplicationDependencies( - podLabels, - podNamespace, - resourcesTgz, - kubernetesCredentials) - } + sparkApplicationDependencies(applicationSecret) = ApplicationResources( + podLabels, + podNamespace, + resourcesTgz, + kubernetesCredentials) applicationSecret } catch { case e: Throwable => @@ -80,11 +74,9 @@ private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) } override def downloadResources(applicationSecret: String): StreamingOutput = { - val applicationDependencies = SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized { - sparkApplicationDependencies + val applicationDependencies = sparkApplicationDependencies .get(applicationSecret) .getOrElse(throw new SparkException("No application found for the provided token.")) - } new StreamingOutput { override def write(outputStream: OutputStream) = { Files.copy(applicationDependencies.resourcesTgz, outputStream) @@ -93,7 +85,7 @@ private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) } } -private case class SparkApplicationDependencies( +private case class ApplicationResources( podLabels: Map[String, String], podNamespace: String, resourcesTgz: File, From 8f798022e115c316522da1cb460f71ae6827e7f6 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 19 Apr 2017 13:41:57 -0700 Subject: [PATCH 11/14] Address comments --- .../rest/kubernetes/v2/ResourceStagingServiceImpl.scala | 2 +- .../rest/kubernetes/v2/ResourceStagingServerSuite.scala | 2 +- .../rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala index 74e766b6a84f6..df5f83e9b4373 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala @@ -56,7 +56,7 @@ private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) } } // TODO encrypt the written data with the secret. 
- val resourcesTgz = new File(resourcesDir, "resources.tgz") + val resourcesTgz = new File(resourcesDir, "resources.data") Utils.tryWithResource(new FileOutputStream(resourcesTgz)) { ByteStreams.copy(resources, _) } sparkApplicationDependencies(applicationSecret) = ApplicationResources( podLabels, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala index db06ee8b6d650..098f30a2ea9da 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.deploy.rest.KubernetesCredentials import org.apache.spark.util.Utils /** - * Tests for KubernetesSparkDependencyServer and its APIs. Note that this is not an end-to-end + * Tests for {@link ResourceStagingServer} and its APIs. Note that this is not an end-to-end * integration test, and as such does not upload and download files in tar.gz as would be done * in production. Thus we use the retrofit clients directly despite the fact that in practice * we would likely want to create an opinionated abstraction on top of the retrofit client; we diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala index c124fce77cbf5..5bc37a54161b2 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.util.Utils * implementation methods directly as opposed to over HTTP, as well as check the * data written to the underlying disk. */ -class ResourceStagingServiceImplSuite extends SparkFunSuite with BeforeAndAfter { +class ResourceStagingServiceImplSuite extends SparkFunSuite { private val dependencyRootDir = Utils.createTempDir() private val serviceImpl = new ResourceStagingServiceImpl(dependencyRootDir) @@ -52,7 +52,7 @@ class ResourceStagingServiceImplSuite extends SparkFunSuite with BeforeAndAfter val resourceDirs = resourceNamespaceDir.listFiles() assert(resourceDirs.length === 1, s"Resource root directory did not have exactly one" + s" subdirectory. 
Got: ${resourceDirs.map(_.getAbsolutePath).mkString(",")}") - val resourceTgz = new File(resourceDirs(0), "resources.tgz") + val resourceTgz = new File(resourceDirs(0), "resources.data") assert(resourceTgz.isFile, s"Resources written to ${resourceTgz.getAbsolutePath} does not exist or is not a file.") val resourceTgzBytes = Files.toByteArray(resourceTgz) From 04099d61a87e3a6886950aa68944f2cbe4a80145 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 19 Apr 2017 13:49:16 -0700 Subject: [PATCH 12/14] Fix imports --- .../rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala index 5bc37a54161b2..b92257005d5df 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala @@ -20,7 +20,6 @@ import java.io.{ByteArrayInputStream, File} import java.nio.file.Paths import com.google.common.io.Files -import org.scalatest.BeforeAndAfter import org.apache.spark.SparkFunSuite import org.apache.spark.deploy.rest.KubernetesCredentials From d713c270e87a4552d733f18a08cdb32b6e2647b1 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 20 Apr 2017 15:06:36 -0700 Subject: [PATCH 13/14] Change paths, use POST instead of PUT --- .../rest/kubernetes/v2/ResourceStagingService.scala | 8 ++++---- .../kubernetes/v2/ResourceStagingServiceRetrofit.scala | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala index b9f373190d8e7..4f79f244289d2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.rest.kubernetes.v2 import java.io.InputStream -import javax.ws.rs.{Consumes, GET, HeaderParam, Path, Produces, PUT, QueryParam} +import javax.ws.rs.{Consumes, GET, HeaderParam, Path, POST, Produces} import javax.ws.rs.core.{MediaType, StreamingOutput} import org.glassfish.jersey.media.multipart.FormDataParam @@ -60,10 +60,10 @@ private[spark] trait ResourceStagingService { * the data uploaded through this endpoint is cleared. * @return A unique token that should be provided when retrieving these dependencies later. 
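 *
 * To illustrate the expected resources format, a submitter could build the tar + gzip stream
 * with commons-compress along these lines (a sketch, not part of this patch; it assumes
 * commons-compress is on the classpath, and rawOutputStream/localFiles are placeholders):
 * {{{
 * import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveOutputStream}
 * import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream
 *
 * val tarOut = new TarArchiveOutputStream(new GzipCompressorOutputStream(rawOutputStream))
 * localFiles.foreach { file =>
 *   // Each file becomes one archive entry, written through the gzip layer.
 *   tarOut.putArchiveEntry(new TarArchiveEntry(file, file.getName))
 *   com.google.common.io.Files.copy(file, tarOut)
 *   tarOut.closeArchiveEntry()
 * }
 * tarOut.finish()
 * }}}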
*/ - @PUT + @POST @Consumes(Array(MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN)) @Produces(Array(MediaType.TEXT_PLAIN)) - @Path("/resources/upload") + @Path("/resources/") def uploadResources( @FormDataParam("podLabels") podLabels: Map[String, String], @FormDataParam("podNamespace") podNamespace: String, @@ -78,7 +78,7 @@ private[spark] trait ResourceStagingService { @GET @Consumes(Array(MediaType.APPLICATION_JSON)) @Produces(Array(MediaType.APPLICATION_OCTET_STREAM)) - @Path("/resources/download") + @Path("/resources/") def downloadResources( @HeaderParam("Authorization") applicationSecret: String): StreamingOutput } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala index 6ab118a44b7ed..04746d238957a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala @@ -27,7 +27,7 @@ import retrofit2.http.{Multipart, Streaming} private[spark] trait ResourceStagingServiceRetrofit { @Multipart - @retrofit2.http.PUT("/api/resources/upload") + @retrofit2.http.POST("/api/resources/") def uploadResources( @retrofit2.http.Part("podLabels") podLabels: RequestBody, @retrofit2.http.Part("podNamespace") podNamespace: RequestBody, @@ -36,7 +36,7 @@ private[spark] trait ResourceStagingServiceRetrofit { kubernetesCredentials: RequestBody): Call[String] @Streaming - @retrofit2.http.GET("/api/resources/download") + @retrofit2.http.GET("/api/resources/") def downloadResources( @retrofit2.http.Header("Authorization") applicationSecret: String): Call[ResponseBody] } From 720c38d03b4c8d6c4afdf85bc8845aa6de55e21a Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 20 Apr 2017 16:21:17 -0700 Subject: [PATCH 14/14] Use a resource identifier as well as a resource secret --- .../v2/ResourceStagingService.scala | 17 +++++----- .../v2/ResourceStagingServiceImpl.scala | 34 +++++++++++-------- .../v2/ResourceStagingServiceRetrofit.scala | 12 +++---- .../v2/StagedResourceIdentifier.scala | 19 +++++++++++ .../v2/ResourceStagingServerSuite.scala | 6 ++-- 5 files changed, 58 insertions(+), 30 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/StagedResourceIdentifier.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala index 4f79f244289d2..5f7ceb461615e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.rest.kubernetes.v2 import java.io.InputStream -import javax.ws.rs.{Consumes, GET, HeaderParam, Path, POST, Produces} +import javax.ws.rs.{Consumes, GET, HeaderParam, Path, PathParam, POST, Produces} import javax.ws.rs.core.{MediaType, StreamingOutput} import org.glassfish.jersey.media.multipart.FormDataParam @@ -40,11 +40,11 @@ 
import org.apache.spark.deploy.rest.KubernetesCredentials * provide multiple resource bundles simply by hitting the upload endpoint multiple times and * downloading each bundle with the appropriate secret. */ -@Path("/") +@Path("/v0") private[spark] trait ResourceStagingService { /** - * Register an application with the dependency service, so that pods with the given labels can + * Register a resource with the dependency service, so that pods with the given labels can * retrieve them when they run. * * @param resources Application resources to upload, compacted together in tar + gzip format. @@ -62,14 +62,14 @@ private[spark] trait ResourceStagingService { */ @POST @Consumes(Array(MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN)) - @Produces(Array(MediaType.TEXT_PLAIN)) - @Path("/resources/") + @Produces(Array(MediaType.APPLICATION_JSON)) + @Path("/resources") def uploadResources( @FormDataParam("podLabels") podLabels: Map[String, String], @FormDataParam("podNamespace") podNamespace: String, @FormDataParam("resources") resources: InputStream, @FormDataParam("kubernetesCredentials") kubernetesCredentials: KubernetesCredentials) - : String + : StagedResourceIdentifier /** * Download an application's resources. The resources are provided as a stream, where the stream's @@ -78,7 +78,8 @@ private[spark] trait ResourceStagingService { @GET @Consumes(Array(MediaType.APPLICATION_JSON)) @Produces(Array(MediaType.APPLICATION_OCTET_STREAM)) - @Path("/resources/") + @Path("/resources/{resourceId}") def downloadResources( - @HeaderParam("Authorization") applicationSecret: String): StreamingOutput + @PathParam("resourceId") resourceId: String, + @HeaderParam("Authorization") resourceSecret: String): StreamingOutput } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala index df5f83e9b4373..bb338dacdf511 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.rest.kubernetes.v2 import java.io.{File, FileOutputStream, InputStream, OutputStream} import java.security.SecureRandom import java.util.UUID +import javax.ws.rs.{NotAuthorizedException, NotFoundException} import javax.ws.rs.core.StreamingOutput import com.google.common.io.{BaseEncoding, ByteStreams, Files} @@ -34,20 +35,20 @@ private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) private val SECURE_RANDOM = new SecureRandom() // TODO clean up these resources based on the driver's lifecycle - private val sparkApplicationDependencies = TrieMap.empty[String, ApplicationResources] + private val stagedResources = TrieMap.empty[String, StagedResources] override def uploadResources( podLabels: Map[String, String], podNamespace: String, resources: InputStream, - kubernetesCredentials: KubernetesCredentials): String = { - val resourcesId = UUID.randomUUID().toString + kubernetesCredentials: KubernetesCredentials): StagedResourceIdentifier = { + val resourceId = UUID.randomUUID().toString val secretBytes = new Array[Byte](1024) SECURE_RANDOM.nextBytes(secretBytes) - val applicationSecret = resourcesId + "-" + 
BaseEncoding.base64().encode(secretBytes) + val resourceSecret = resourceId + "-" + BaseEncoding.base64().encode(secretBytes) val namespaceDir = new File(dependenciesRootDir, podNamespace) - val resourcesDir = new File(namespaceDir, resourcesId) + val resourcesDir = new File(namespaceDir, resourceId) try { if (!resourcesDir.exists()) { if (!resourcesDir.mkdirs()) { @@ -58,12 +59,13 @@ private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) // TODO encrypt the written data with the secret. val resourcesTgz = new File(resourcesDir, "resources.data") Utils.tryWithResource(new FileOutputStream(resourcesTgz)) { ByteStreams.copy(resources, _) } - sparkApplicationDependencies(applicationSecret) = ApplicationResources( + stagedResources(resourceId) = StagedResources( + resourceSecret, podLabels, podNamespace, resourcesTgz, kubernetesCredentials) - applicationSecret + StagedResourceIdentifier(resourceId, resourceSecret) } catch { case e: Throwable => if (!resourcesDir.delete()) { @@ -73,20 +75,24 @@ private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) } } - override def downloadResources(applicationSecret: String): StreamingOutput = { - val applicationDependencies = sparkApplicationDependencies - .get(applicationSecret) - .getOrElse(throw new SparkException("No application found for the provided token.")) + override def downloadResources(resourceId: String, resourceSecret: String): StreamingOutput = { + val resource = stagedResources + .get(resourceId) + .getOrElse(throw new NotFoundException(s"No resource bundle found with id $resourceId")) + if (!resource.resourceSecret.equals(resourceSecret)) { + throw new NotAuthorizedException(s"Unauthorized to download resource with id $resourceId") + } new StreamingOutput { override def write(outputStream: OutputStream) = { - Files.copy(applicationDependencies.resourcesTgz, outputStream) + Files.copy(resource.resourcesFile, outputStream) } } } } -private case class ApplicationResources( +private case class StagedResources( + resourceSecret: String, podLabels: Map[String, String], podNamespace: String, - resourcesTgz: File, + resourcesFile: File, kubernetesCredentials: KubernetesCredentials) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala index 04746d238957a..daf03f764b35a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.rest.kubernetes.v2 import okhttp3.{RequestBody, ResponseBody} import retrofit2.Call -import retrofit2.http.{Multipart, Streaming} +import retrofit2.http.{Multipart, Path, Streaming} /** * Retrofit-compatible variant of {@link ResourceStagingService}. 
For documentation on @@ -27,16 +27,16 @@ import retrofit2.http.{Multipart, Streaming} private[spark] trait ResourceStagingServiceRetrofit { @Multipart - @retrofit2.http.POST("/api/resources/") + @retrofit2.http.POST("/api/v0/resources/") def uploadResources( @retrofit2.http.Part("podLabels") podLabels: RequestBody, @retrofit2.http.Part("podNamespace") podNamespace: RequestBody, @retrofit2.http.Part("resources") resources: RequestBody, @retrofit2.http.Part("kubernetesCredentials") - kubernetesCredentials: RequestBody): Call[String] + kubernetesCredentials: RequestBody): Call[StagedResourceIdentifier] @Streaming - @retrofit2.http.GET("/api/resources/") - def downloadResources( - @retrofit2.http.Header("Authorization") applicationSecret: String): Call[ResponseBody] + @retrofit2.http.GET("/api/v0/resources/{resourceId}") + def downloadResources(@Path("resourceId") resourceId: String, + @retrofit2.http.Header("Authorization") resourceSecret: String): Call[ResponseBody] } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/StagedResourceIdentifier.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/StagedResourceIdentifier.scala new file mode 100644 index 0000000000000..65bc9bc17dae9 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/StagedResourceIdentifier.scala @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.rest.kubernetes.v2 + +case class StagedResourceIdentifier(resourceId: String, resourceSecret: String) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala index 098f30a2ea9da..70ba5be395042 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala @@ -74,8 +74,10 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfterAll { okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), kubernetesCredentialsString) val uploadResponse = retrofitService.uploadResources( labelsRequestBody, namespaceRequestBody, resourcesRequestBody, kubernetesCredentialsBody) - val secret = getTypedResponseResult(uploadResponse) - checkResponseBodyBytesMatches(retrofitService.downloadResources(secret), resourcesBytes) + val resourceIdentifier = getTypedResponseResult(uploadResponse) + checkResponseBodyBytesMatches( + retrofitService.downloadResources( + resourceIdentifier.resourceId, resourceIdentifier.resourceSecret), resourcesBytes) } private def getTypedResponseResult[T](call: Call[T]): T = {