From 71b825af3390248d43919fe669f11cabb50f5636 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Mon, 3 Apr 2017 13:02:04 -0700
Subject: [PATCH 01/22] Staging server for receiving application dependencies.

---
 pom.xml                                       |  21 +++
 resource-managers/kubernetes/core/pom.xml     |  21 +++
 .../v2/KubernetesSparkDependencyServer.scala  |  61 ++++++++
 .../v2/KubernetesSparkDependencyService.scala | 106 ++++++++++++++
 ...KubernetesSparkDependencyServiceImpl.scala | 130 ++++++++++++++++++
 ...rnetesSparkDependencyServiceRetrofit.scala |  55 ++++++++
 .../rest/kubernetes/v2/RetrofitUtils.scala    |  38 +++++
 ...KubernetesSparkDependencyServerSuite.scala | 101 ++++++++++++++
 ...netesSparkDependencyServiceImplSuite.scala |  49 +++++++
 9 files changed, 582 insertions(+)
 create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala
 create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala
 create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala
 create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala
 create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/RetrofitUtils.scala
 create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala
 create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala

diff --git a/pom.xml b/pom.xml
index 9cfaf6eb6532..0149f802e461 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,6 +137,7 @@
     <parquet.version>1.8.1</parquet.version>
     <hive.parquet.version>1.6.0</hive.parquet.version>
     <feign.version>8.18.0</feign.version>
+    <retrofit.version>2.2.0</retrofit.version>
     <bouncycastle.version>1.52</bouncycastle.version>
     <jetty.version>9.2.16.v20160414</jetty.version>
     <javaxservlet.version>3.1.0</javaxservlet.version>
@@ -327,6 +328,21 @@
         <artifactId>feign-jaxrs</artifactId>
         <version>${feign.version}</version>
       </dependency>
+      <dependency>
+        <groupId>com.squareup.retrofit2</groupId>
+        <artifactId>retrofit</artifactId>
+        <version>${retrofit.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.squareup.retrofit2</groupId>
+        <artifactId>converter-jackson</artifactId>
+        <version>${retrofit.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.squareup.retrofit2</groupId>
+        <artifactId>converter-scalars</artifactId>
+        <version>${retrofit.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.bouncycastle</groupId>
         <artifactId>bcpkix-jdk15on</artifactId>
@@ -681,6 +697,11 @@
         <artifactId>jersey-client</artifactId>
         <version>${jersey.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.glassfish.jersey.media</groupId>
+        <artifactId>jersey-media-multipart</artifactId>
+        <version>${jersey.version}</version>
+      </dependency>
       <dependency>
         <groupId>javax.ws.rs</groupId>
         <artifactId>javax.ws.rs-api</artifactId>
diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index 6d2f1d0fd276..9e8b3436ab36 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -60,10 +60,31 @@
       <groupId>com.netflix.feign</groupId>
       <artifactId>feign-okhttp</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.containers</groupId>
+      <artifactId>jersey-container-servlet</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.media</groupId>
+      <artifactId>jersey-media-multipart</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.netflix.feign</groupId>
       <artifactId>feign-jackson</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.squareup.retrofit2</groupId>
+      <artifactId>retrofit</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.squareup.retrofit2</groupId>
+      <artifactId>converter-jackson</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.squareup.retrofit2</groupId>
+      <artifactId>converter-scalars</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>com.netflix.feign</groupId>
       <artifactId>feign-jaxrs</artifactId>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala
new file mode 100644
index 000000000000..ace750f1927f
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes.v2 + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.jaxrs.json.{JacksonJaxbJsonProvider, JacksonJsonProvider} +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.eclipse.jetty.server.{Server, ServerConnector} +import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} +import org.eclipse.jetty.util.thread.QueuedThreadPool +import org.glassfish.jersey.media.multipart.MultiPartFeature +import org.glassfish.jersey.server.ResourceConfig +import org.glassfish.jersey.servlet.ServletContainer + +private[spark] class KubernetesSparkDependencyServer( + port: Int, + serviceInstance: KubernetesSparkDependencyService) { + + private var jettyServer: Option[Server] = None + + def start(): Unit = synchronized { + val threadPool = new QueuedThreadPool + val contextHandler = new ServletContextHandler() + val jsonProvider = new JacksonJaxbJsonProvider() + jsonProvider.setMapper(new ObjectMapper().registerModule(new DefaultScalaModule)) + val resourceConfig = new ResourceConfig().registerInstances( + serviceInstance, + jsonProvider, + new MultiPartFeature) + val servletHolder = new ServletHolder("main", new ServletContainer(resourceConfig)) + contextHandler.setContextPath("/api/") + contextHandler.addServlet(servletHolder, "/*") + threadPool.setDaemon(true) + val server = new Server(threadPool) + val connector = new ServerConnector(server) + connector.setPort(port) + server.addConnector(connector) + server.setHandler(contextHandler) + server.start() + jettyServer = Some(server) + } + + def stop(): Unit = synchronized { + jettyServer.foreach(_.stop()) + jettyServer = None + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala new file mode 100644 index 000000000000..a1e879231a7e --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.rest.kubernetes.v2
+
+import java.io.InputStream
+import javax.ws.rs.{Consumes, GET, HeaderParam, Path, Produces, PUT, QueryParam}
+import javax.ws.rs.core.{MediaType, StreamingOutput}
+
+import org.glassfish.jersey.media.multipart.FormDataParam
+
+import org.apache.spark.deploy.rest.KubernetesCredentials
+
+/**
+ * Service that receives application data that can be retrieved later on. This is primarily used
+ * in the context of Spark, but the concept is generic enough to be used for arbitrary applications.
+ * The use case is to have a place for Kubernetes application submitters to bootstrap dynamic,
+ * heavyweight application data for pods. Application submitters may have data stored on their
+ * local disks that they want to provide to the pods they create through the API server. ConfigMaps
+ * are one way to provide this data, but the data in ConfigMaps is stored in etcd, which cannot
+ * maintain data in the hundreds of megabytes in size.
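+ *
+ * A sketch of the flow described in the next paragraph (the `service` handle and the
+ * streams below are illustrative, not part of this interface):
+ *
+ * {{{
+ *   // Submitter: upload the tar+gzipped dependencies and keep the returned secret token.
+ *   val secret = service.uploadDependencies(
+ *     "driver-pod-name", "driver-pod-namespace", jarsTgzStream, filesTgzStream, credentials)
+ *   // Driver pod: present the secret to stream back what was uploaded.
+ *   val jars: StreamingOutput = service.downloadJars(secret)
+ * }}}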
+ *
+ * The general use case is for an application submitter to ship the dependencies to the server via
+ * {@link uploadDependencies}; the application submitter will then receive a unique secure token.
+ * The application submitter should then convert the token into a secret, and use this secret in
+ * a pod that fetches the uploaded dependencies via {@link downloadJars}, {@link downloadFiles}, and
+ * {@link getKubernetesCredentials}.
+ */
+@Path("/")
+private[spark] trait KubernetesSparkDependencyService {
+
+  /**
+   * Register an application with the dependency service, so that the driver pod can retrieve its
+   * dependencies when it runs.
+   *
+   * @param driverPodName Name of the driver pod.
+   * @param driverPodNamespace Namespace for the driver pod.
+   * @param jars Application jars to upload, compacted together in tar + gzip format. The tarball
+   *             should contain the jar files laid out in a flat hierarchy, without any directories.
+   *             We take a stream here to avoid holding these entirely in memory.
+   * @param files Application files to upload, compacted together in tar + gzip format. The tarball
+   *              should contain the files laid out in a flat hierarchy, without any directories.
+   *              We take a stream here to avoid holding these entirely in memory.
+   * @param kubernetesCredentials These credentials are primarily used to monitor the progress of
+   *                              the application. When the application shuts down normally, shuts
+   *                              down abnormally and does not restart, or fails to start entirely,
+   *                              the data uploaded through this endpoint is cleared.
+   * @return A unique token that should be provided when retrieving these dependencies later.
+   */
+  @PUT
+  @Consumes(Array(MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON))
+  @Produces(Array(MediaType.TEXT_PLAIN))
+  @Path("/dependencies")
+  def uploadDependencies(
+      @QueryParam("driverPodName") driverPodName: String,
+      @QueryParam("driverPodNamespace") driverPodNamespace: String,
+      @FormDataParam("jars") jars: InputStream,
+      @FormDataParam("files") files: InputStream,
+      @FormDataParam("kubernetesCredentials") kubernetesCredentials: KubernetesCredentials)
+    : String
+
+  /**
+   * Download an application's jars. The jars are provided as a stream, where the stream's
+   * underlying data matches the stream that was uploaded in {@link uploadDependencies}.
+   */
+  @GET
+  @Consumes(Array(MediaType.APPLICATION_JSON))
+  @Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
+  @Path("/dependencies/jars")
+  def downloadJars(
+      @HeaderParam("Authorization") applicationSecret: String): StreamingOutput
+
+  /**
+   * Download an application's files. The files are provided as a stream, where the stream's
+   * underlying data matches the stream that was uploaded in {@link uploadDependencies}.
+   */
+  @GET
+  @Consumes(Array(MediaType.APPLICATION_JSON))
+  @Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
+  @Path("/dependencies/files")
+  def downloadFiles(
+      @HeaderParam("Authorization") applicationSecret: String): StreamingOutput
+
+  /**
+   * Retrieve the Kubernetes credentials being used to monitor the Spark application.
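+   *
+   * A sketch of the request shape (assuming the "/api" context path configured in
+   * KubernetesSparkDependencyServer, with the token from {@link uploadDependencies} sent
+   * as-is in the Authorization header):
+   *
+   * {{{
+   *   GET /api/dependencies/credentials HTTP/1.1
+   *   Authorization: <application secret token>
+   * }}}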
+ */ + @GET + @Consumes(Array(MediaType.APPLICATION_JSON)) + @Produces(Array(MediaType.APPLICATION_JSON)) + @Path("/dependencies/credentials") + def getKubernetesCredentials( + @HeaderParam("Authorization") applicationSecret: String): KubernetesCredentials +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala new file mode 100644 index 000000000000..d1892f0f455c --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes.v2 + +import java.io.{File, FileOutputStream, InputStream, OutputStream} +import java.security.SecureRandom +import java.util.UUID +import javax.ws.rs.core.StreamingOutput + +import com.google.common.io.{BaseEncoding, ByteStreams, Files} +import scala.collection.mutable + +import org.apache.spark.SparkException +import org.apache.spark.deploy.rest.KubernetesCredentials +import org.apache.spark.util.Utils + +private[spark] class KubernetesSparkDependencyServiceImpl(dependenciesRootDir: File) + extends KubernetesSparkDependencyService { + + private val DIRECTORIES_LOCK = new Object + private val REGISTERED_DRIVERS_LOCK = new Object + private val SPARK_APPLICATION_DEPENDENCIES_LOCK = new Object + private val SECURE_RANDOM = new SecureRandom() + // TODO clean up these resources based on the driver's lifecycle + private val registeredDrivers = mutable.Set.empty[PodNameAndNamespace] + private val sparkApplicationDependencies = mutable.Map.empty[String, SparkApplicationDependencies] + + override def uploadDependencies( + driverPodName: String, + driverPodNamespace: String, + jars: InputStream, + files: InputStream, + kubernetesCredentials: KubernetesCredentials): String = { + val podNameAndNamespace = PodNameAndNamespace( + name = driverPodName, + namespace = driverPodNamespace) + REGISTERED_DRIVERS_LOCK.synchronized { + if (registeredDrivers.contains(podNameAndNamespace)) { + throw new SparkException(s"Spark application with driver pod named $driverPodName" + + s" and namespace $driverPodNamespace already uploaded its dependencies.") + } + registeredDrivers.add(podNameAndNamespace) + } + try { + val secretBytes = new Array[Byte](1024) + SECURE_RANDOM.nextBytes(secretBytes) + val applicationSecret = UUID.randomUUID() + "-" + BaseEncoding.base64().encode(secretBytes) + val namespaceDir = new File(dependenciesRootDir, podNameAndNamespace.namespace) + val applicationDir = new File(namespaceDir, 
podNameAndNamespace.name)
+      DIRECTORIES_LOCK.synchronized {
+        if (!applicationDir.exists()) {
+          if (!applicationDir.mkdirs()) {
+            throw new SparkException("Failed to create dependencies directory for application"
+              + s" at ${applicationDir.getAbsolutePath}")
+          }
+        }
+      }
+      val jarsTgz = new File(applicationDir, "jars.tgz")
+      // TODO encrypt the written data with the secret.
+      Utils.tryWithResource(new FileOutputStream(jarsTgz)) { ByteStreams.copy(jars, _) }
+      val filesTgz = new File(applicationDir, "files.tgz")
+      Utils.tryWithResource(new FileOutputStream(filesTgz)) { ByteStreams.copy(files, _) }
+      SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized {
+        sparkApplicationDependencies(applicationSecret) = SparkApplicationDependencies(
+          podNameAndNamespace,
+          jarsTgz,
+          filesTgz,
+          kubernetesCredentials)
+      }
+      applicationSecret
+    } catch {
+      case e: Throwable =>
+        // Revert the registered driver if we failed for any reason, most likely from disk ops
+        REGISTERED_DRIVERS_LOCK.synchronized { registeredDrivers.remove(podNameAndNamespace) }
+        throw e
+    }
+  }
+
+  override def downloadJars(applicationSecret: String): StreamingOutput = {
+    appFileToStreamingOutput(applicationSecret, _.jarsTgz)
+  }
+
+  override def downloadFiles(applicationSecret: String): StreamingOutput = {
+    appFileToStreamingOutput(applicationSecret, _.filesTgz)
+  }
+
+  override def getKubernetesCredentials(applicationSecret: String): KubernetesCredentials = {
+    SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized {
+      sparkApplicationDependencies.get(applicationSecret)
+        .map(_.kubernetesCredentials)
+        .getOrElse(throw new SparkException("No application found for the provided token."))
+    }
+  }
+
+  private def appFileToStreamingOutput(
+      applicationSecret: String,
+      dependency: (SparkApplicationDependencies => File)): StreamingOutput = {
+    val applicationDependencies = SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized {
+      sparkApplicationDependencies
+        .get(applicationSecret)
+        .getOrElse(throw new SparkException("No application found for the provided token."))
+    }
+    new StreamingOutput {
+      override def write(outputStream: OutputStream): Unit = {
+        Files.copy(dependency(applicationDependencies), outputStream)
+      }
+    }
+  }
+}
+
+private case class PodNameAndNamespace(name: String, namespace: String)
+private case class SparkApplicationDependencies(
+    driverPodNameAndNamespace: PodNameAndNamespace,
+    jarsTgz: File,
+    filesTgz: File,
+    kubernetesCredentials: KubernetesCredentials)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala
new file mode 100644
index 000000000000..e181d240fd8d
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes.v2 + +import okhttp3.{RequestBody, ResponseBody} +import retrofit2.Call +import retrofit2.http.{Multipart, Streaming} + +import org.apache.spark.deploy.rest.KubernetesCredentials + +/** + * Retrofit-compatible variant of {@link KubernetesSparkDependencyService}. For documentation on + * how to use this service, see the aforementioned JAX-RS based interface. + */ +private[spark] trait KubernetesSparkDependencyServiceRetrofit { + + @Multipart + @retrofit2.http.PUT("/api/dependencies") + def uploadDependencies( + @retrofit2.http.Query("driverPodName") driverPodName: String, + @retrofit2.http.Query("driverPodNamespace") driverPodNamespace: String, + @retrofit2.http.Part("jars") jars: RequestBody, + @retrofit2.http.Part("files") files: RequestBody, + @retrofit2.http.Part("kubernetesCredentials") + kubernetesCredentials: RequestBody): Call[String] + + @Streaming + @retrofit2.http.GET("/api/dependencies/jars") + def downloadJars( + @retrofit2.http.Header("Authorization") applicationSecret: String): Call[ResponseBody] + + @Streaming + @retrofit2.http.GET("/api/dependencies/files") + def downloadFiles( + @retrofit2.http.Header("Authorization") applicationSecret: String): Call[ResponseBody] + + @retrofit2.http.GET("/api/dependencies/credentials") + def getKubernetesCredentials( + @retrofit2.http.Header("Authorization") applicationSecret: String) + : Call[KubernetesCredentials] +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/RetrofitUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/RetrofitUtils.scala new file mode 100644 index 000000000000..c5c5c0d35b7c --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/RetrofitUtils.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.rest.kubernetes.v2 + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import retrofit2.Retrofit +import retrofit2.converter.jackson.JacksonConverterFactory +import retrofit2.converter.scalars.ScalarsConverterFactory + +private[spark] object RetrofitUtils { + + private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule) + + def createRetrofitClient[T](baseUrl: String, serviceType: Class[T]): T = { + new Retrofit.Builder() + .baseUrl(baseUrl) + .addConverterFactory(ScalarsConverterFactory.create()) + .addConverterFactory(JacksonConverterFactory.create(OBJECT_MAPPER)) + .build() + .create(serviceType) + } + +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala new file mode 100644 index 000000000000..bdf46f0d1ff2 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes.v2 + +import javax.ws.rs.core.MediaType + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.google.common.io.ByteStreams +import okhttp3.{RequestBody, ResponseBody} +import org.scalatest.BeforeAndAfterAll +import retrofit2.Call + +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.rest.KubernetesCredentials +import org.apache.spark.util.Utils + +/** + * Tests for KubernetesSparkDependencyServer and its APIs. Note that this is not an end-to-end + * integration test, and as such does not upload and download files in tar.gz as would be done + * in production. Thus we use the retrofit clients directly despite the fact that in practice + * we would likely want to create an opinionated abstraction on top of the retrofit client; we + * can test this abstraction layer separately, however. This test is mainly for checking that + * we've configured the Jetty server correctly and that the endpoints reached over HTTP can + * receive streamed uploads and can stream downloads. 
+ */ +class KubernetesSparkDependencyServerSuite extends SparkFunSuite with BeforeAndAfterAll { + + private val serviceImpl = new KubernetesSparkDependencyServiceImpl(Utils.createTempDir()) + private val server = new KubernetesSparkDependencyServer(10021, serviceImpl) + private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule) + + override def beforeAll(): Unit = { + server.start() + } + + override def afterAll(): Unit = { + server.stop() + } + + test("Accept file and jar uploads and downloads") { + val retrofitService = RetrofitUtils.createRetrofitClient("http://localhost:10021/", + classOf[KubernetesSparkDependencyServiceRetrofit]) + val jarsBytes = Array[Byte](1, 2, 3, 4) + val filesBytes = Array[Byte](5, 6, 7) + val jarsRequestBody = RequestBody.create( + okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA), jarsBytes) + val filesRequestBody = RequestBody.create( + okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA), filesBytes) + val kubernetesCredentials = KubernetesCredentials(Some("token"), Some("ca-cert"), None, None) + val kubernetesCredentialsString = OBJECT_MAPPER.writer() + .writeValueAsString(kubernetesCredentials) + val readKubernetesCredentials: KubernetesCredentials = OBJECT_MAPPER + .readerFor(classOf[KubernetesCredentials]) + .readValue(kubernetesCredentialsString) + val kubernetesCredentialsBody = RequestBody.create( + okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), kubernetesCredentialsString) + val uploadResponse = retrofitService.uploadDependencies("podName", "podNamespace", + jarsRequestBody, filesRequestBody, kubernetesCredentialsBody) + val secret = getTypedResponseResult(uploadResponse) + + checkResponseBodyBytesMatches(retrofitService.downloadJars(secret), + jarsBytes) + checkResponseBodyBytesMatches(retrofitService.downloadFiles(secret), + filesBytes) + val downloadedCredentials = getTypedResponseResult( + retrofitService.getKubernetesCredentials(secret)) + assert(downloadedCredentials === kubernetesCredentials) + } + + private def getTypedResponseResult[T](call: Call[T]): T = { + val response = call.execute() + assert(response.code() >= 200 && response.code() < 300, Option(response.errorBody()) + .map(_.string()) + .getOrElse("Error executing HTTP request, but error body was not provided.")) + val callResult = response.body() + assert(callResult != null) + callResult + } + + private def checkResponseBodyBytesMatches(call: Call[ResponseBody], bytes: Array[Byte]): Unit = { + val responseBody = getTypedResponseResult(call) + val downloadedBytes = ByteStreams.toByteArray(responseBody.byteStream()) + assert(downloadedBytes.toSeq === bytes) + } + +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala new file mode 100644 index 000000000000..65c928ab8e83 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes.v2 + +import java.util.UUID + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.SparkFunSuite +import org.apache.spark.util.Utils + +/** + * Unit, scala-level tests for KubernetesSparkDependencyServiceImpl. The coverage here + * differs from that of KubernetesSparkDependencyServerSuite as here we invoke the + * implementation methods directly as opposed to over HTTP. + */ +class KubernetesSparkDependencyServiceImplSuite extends SparkFunSuite with BeforeAndAfter { + + private val dependencyRootDir = Utils.createTempDir() + private val serviceImpl = new KubernetesSparkDependencyServiceImpl(dependencyRootDir) + private val jarsBytes = Array[Byte](1, 2, 3, 4) + private val filesBytes = Array[Byte](5, 6, 7) + private var podName: String = _ + private var podNamespace: String = _ + + before { + podName = UUID.randomUUID().toString + podNamespace = UUID.randomUUID().toString + } + + test("Uploads should write to a directory in the underlying disk") { + // TODO + } + +} From 929d8809e3e162d55c118d943b4fcd67474009e4 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 3 Apr 2017 17:30:36 -0700 Subject: [PATCH 02/22] Move packages around to split between v1 work and v2 work --- dev/.rat-excludes | 2 +- ....kubernetes.submit.v1.DriverServiceManager | 2 ++ ...eploy.rest.kubernetes.DriverServiceManager | 2 -- .../spark/deploy/kubernetes/config.scala | 2 +- .../kubernetes/{ => submit/v1}/Client.scala | 5 ++-- .../submit/v1}/CompressionUtils.scala | 4 +-- ...iverPodKubernetesCredentialsProvider.scala | 4 +-- .../submit/v1}/DriverServiceManager.scala | 19 +------------ ...DriverSubmitSslConfigurationProvider.scala | 4 +-- ...rnalSuppliedUrisDriverServiceManager.scala | 4 +-- .../v1}/KubernetesResourceCleaner.scala | 2 +- .../v1}/LoggingPodStatusWatcher.scala | 5 ++-- .../NodePortUrisDriverServiceManager.scala | 2 +- .../kubernetes/{ => v1}/HttpClientUtil.scala | 2 +- .../{ => v1}/KubernetesFileUtils.scala | 2 +- .../v1}/KubernetesRestProtocolMessages.scala | 3 +- .../{ => v1}/KubernetesSparkRestApi.scala | 4 +-- .../{ => v1}/KubernetesSparkRestServer.scala | 5 ++-- .../{ => v1}/MultiServerFeignTarget.scala | 2 +- .../{ => v1}/PemsToKeyStoreConverter.scala | 2 +- .../v2/KubernetesSparkDependencyServer.scala | 6 ++++ .../v2/KubernetesSparkDependencyService.scala | 4 +-- ...KubernetesSparkDependencyServiceImpl.scala | 2 +- ...rnetesSparkDependencyServiceRetrofit.scala | 2 +- .../kubernetes/KubernetesClientBuilder.scala | 2 +- .../KubernetesClusterSchedulerBackend.scala | 1 - .../src/main/docker/driver/Dockerfile | 2 +- .../integrationtest/KubernetesSuite.scala | 3 +- .../integrationtest/minikube/Minikube.scala | 2 +- ...KubernetesSparkDependencyServerSuite.scala | 5 +--- ...netesSparkDependencyServiceImplSuite.scala | 28 +++++++++++++++++-- 31 files changed, 70 insertions(+), 64 deletions(-) create mode 100644 
resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.kubernetes.submit.v1.DriverServiceManager delete mode 100644 resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.rest.kubernetes.DriverServiceManager rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/{ => submit/v1}/Client.scala (99%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/{rest/kubernetes => kubernetes/submit/v1}/CompressionUtils.scala (98%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/{ => submit/v1}/DriverPodKubernetesCredentialsProvider.scala (96%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/{rest/kubernetes => kubernetes/submit/v1}/DriverServiceManager.scala (77%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/{ => submit/v1}/DriverSubmitSslConfigurationProvider.scala (99%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/{rest/kubernetes => kubernetes/submit/v1}/ExternalSuppliedUrisDriverServiceManager.scala (98%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/{ => submit/v1}/KubernetesResourceCleaner.scala (97%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/{ => submit/v1}/LoggingPodStatusWatcher.scala (98%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/{rest/kubernetes => kubernetes/submit/v1}/NodePortUrisDriverServiceManager.scala (98%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/{ => v1}/HttpClientUtil.scala (98%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/{ => v1}/KubernetesFileUtils.scala (96%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/{ => kubernetes/v1}/KubernetesRestProtocolMessages.scala (94%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/{ => v1}/KubernetesSparkRestApi.scala (89%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/{ => v1}/KubernetesSparkRestServer.scala (98%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/{ => v1}/MultiServerFeignTarget.scala (98%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/{ => v1}/PemsToKeyStoreConverter.scala (99%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/{deploy => scheduler/cluster}/kubernetes/KubernetesClientBuilder.scala (98%) diff --git a/dev/.rat-excludes b/dev/.rat-excludes index f69567d8f675..6a805b3293a6 100644 --- a/dev/.rat-excludes +++ b/dev/.rat-excludes @@ -103,4 +103,4 @@ org.apache.spark.scheduler.ExternalClusterManager org.apache.spark.deploy.yarn.security.ServiceCredentialProvider spark-warehouse structured-streaming/* -org.apache.spark.deploy.rest.kubernetes.DriverServiceManager +org.apache.spark.deploy.kubernetes.submit.v1.DriverServiceManager diff --git a/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.kubernetes.submit.v1.DriverServiceManager b/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.kubernetes.submit.v1.DriverServiceManager new file mode 100644 index 
000000000000..2ed0387c51bc --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.kubernetes.submit.v1.DriverServiceManager @@ -0,0 +1,2 @@ +org.apache.spark.deploy.kubernetes.submit.v1.ExternalSuppliedUrisDriverServiceManager +org.apache.spark.deploy.kubernetes.submit.v1.NodePortUrisDriverServiceManager diff --git a/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.rest.kubernetes.DriverServiceManager b/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.rest.kubernetes.DriverServiceManager deleted file mode 100644 index 56203ee38ac9..000000000000 --- a/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.rest.kubernetes.DriverServiceManager +++ /dev/null @@ -1,2 +0,0 @@ -org.apache.spark.deploy.rest.kubernetes.ExternalSuppliedUrisDriverServiceManager -org.apache.spark.deploy.rest.kubernetes.NodePortUrisDriverServiceManager diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 3328809e186e..3538c7273cef 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -18,8 +18,8 @@ package org.apache.spark.deploy.kubernetes import java.util.concurrent.TimeUnit +import org.apache.spark.deploy.kubernetes.submit.v1.NodePortUrisDriverServiceManager import org.apache.spark.{SPARK_VERSION => sparkVersion} -import org.apache.spark.deploy.rest.kubernetes.NodePortUrisDriverServiceManager import org.apache.spark.internal.config.ConfigBuilder import org.apache.spark.network.util.ByteUnit diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala similarity index 99% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala index e628464aa620..951bf7731783 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.deploy.kubernetes +package org.apache.spark.deploy.kubernetes.submit.v1 import java.io.File import java.security.SecureRandom @@ -32,8 +32,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, KubernetesCredentials, RemoteAppResource, UploadedAppResource} -import org.apache.spark.deploy.rest.kubernetes._ +import org.apache.spark.deploy.rest.kubernetes.v1.{AppResource, ContainerAppResource, HttpClientUtil, KubernetesCreateSubmissionRequest, KubernetesCredentials, KubernetesFileUtils, KubernetesSparkRestApi, RemoteAppResource, UploadedAppResource} import org.apache.spark.internal.Logging import org.apache.spark.util.{ShutdownHookManager, Utils} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/CompressionUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/CompressionUtils.scala similarity index 98% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/CompressionUtils.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/CompressionUtils.scala index 7204cb874aae..8296218ba1f7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/CompressionUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/CompressionUtils.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.rest.kubernetes +package org.apache.spark.deploy.kubernetes.submit.v1 import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream} import java.util.zip.{GZIPInputStream, GZIPOutputStream} @@ -26,7 +26,7 @@ import org.apache.commons.compress.utils.CharsetNames import org.apache.commons.io.IOUtils import scala.collection.mutable -import org.apache.spark.deploy.rest.TarGzippedData +import org.apache.spark.deploy.rest.kubernetes.v1.TarGzippedData import org.apache.spark.internal.Logging import org.apache.spark.util.{ByteBufferOutputStream, Utils} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverPodKubernetesCredentialsProvider.scala similarity index 96% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverPodKubernetesCredentialsProvider.scala index cee47aad7939..bc7490ef9ec4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverPodKubernetesCredentialsProvider.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.deploy.kubernetes +package org.apache.spark.deploy.kubernetes.submit.v1 import java.io.File @@ -22,7 +22,7 @@ import com.google.common.io.{BaseEncoding, Files} import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.config._ -import org.apache.spark.deploy.rest.KubernetesCredentials +import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials import org.apache.spark.internal.config.OptionalConfigEntry private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf) { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverServiceManager.scala similarity index 77% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverServiceManager.scala index d92c0247e2a3..f21fceb861ba 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverServiceManager.scala @@ -1,21 +1,4 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.rest.kubernetes +package org.apache.spark.deploy.kubernetes.submit.v1 import io.fabric8.kubernetes.api.model.{Service, ServiceBuilder} import io.fabric8.kubernetes.client.KubernetesClient diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverSubmitSslConfigurationProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverSubmitSslConfigurationProvider.scala similarity index 99% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverSubmitSslConfigurationProvider.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverSubmitSslConfigurationProvider.scala index a83c9a9896a0..10ffddcd7e7f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverSubmitSslConfigurationProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverSubmitSslConfigurationProvider.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.deploy.kubernetes +package org.apache.spark.deploy.kubernetes.submit.v1 import java.io.{File, FileInputStream} import java.security.{KeyStore, SecureRandom} @@ -29,7 +29,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SparkException, SSLOptions} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.rest.kubernetes.{KubernetesFileUtils, PemsToKeyStoreConverter} +import org.apache.spark.deploy.rest.kubernetes.v1.{KubernetesFileUtils, PemsToKeyStoreConverter} import org.apache.spark.util.Utils /** diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ExternalSuppliedUrisDriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/ExternalSuppliedUrisDriverServiceManager.scala similarity index 98% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ExternalSuppliedUrisDriverServiceManager.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/ExternalSuppliedUrisDriverServiceManager.scala index 257571b5a9d3..39beaa853e8e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ExternalSuppliedUrisDriverServiceManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/ExternalSuppliedUrisDriverServiceManager.scala @@ -14,14 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.rest.kubernetes +package org.apache.spark.deploy.kubernetes.submit.v1 import java.util.concurrent.TimeUnit import com.google.common.util.concurrent.SettableFuture import io.fabric8.kubernetes.api.model.{Service, ServiceBuilder} -import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher} import io.fabric8.kubernetes.client.Watcher.Action +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher} import scala.collection.JavaConverters._ import org.apache.spark.SparkConf diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/KubernetesResourceCleaner.scala similarity index 97% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/KubernetesResourceCleaner.scala index 6329bb135951..266ec652ed8a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/KubernetesResourceCleaner.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.deploy.kubernetes +package org.apache.spark.deploy.kubernetes.submit.v1 import io.fabric8.kubernetes.api.model.HasMetadata import io.fabric8.kubernetes.client.KubernetesClient diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala similarity index 98% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala index 17c3db8331ac..7be334194d9d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala @@ -14,15 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.kubernetes +package org.apache.spark.deploy.kubernetes.submit.v1 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} -import scala.collection.JavaConverters._ - import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action +import scala.collection.JavaConverters._ import org.apache.spark.internal.Logging diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/NodePortUrisDriverServiceManager.scala similarity index 98% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/NodePortUrisDriverServiceManager.scala index 141647682479..965d71917403 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/NodePortUrisDriverServiceManager.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.deploy.rest.kubernetes +package org.apache.spark.deploy.kubernetes.submit.v1 import io.fabric8.kubernetes.api.model.{Service, ServiceBuilder} import scala.collection.JavaConverters._ diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/HttpClientUtil.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/HttpClientUtil.scala similarity index 98% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/HttpClientUtil.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/HttpClientUtil.scala index 576f7058f20e..45eede8bf8e5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/HttpClientUtil.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/HttpClientUtil.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.rest.kubernetes +package org.apache.spark.deploy.rest.kubernetes.v1 import javax.net.ssl.{SSLContext, SSLSocketFactory, X509TrustManager} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesFileUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesFileUtils.scala similarity index 96% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesFileUtils.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesFileUtils.scala index f30be1535f81..b8e644219097 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesFileUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesFileUtils.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.rest.kubernetes +package org.apache.spark.deploy.rest.kubernetes.v1 import org.apache.spark.util.Utils diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesRestProtocolMessages.scala similarity index 94% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesRestProtocolMessages.scala index 1ea44109c5f5..0cc178c9f5ab 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesRestProtocolMessages.scala @@ -14,11 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.deploy.rest +package org.apache.spark.deploy.rest.kubernetes.v1 import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo} import org.apache.spark.SPARK_VERSION +import org.apache.spark.deploy.rest.{SubmitRestProtocolRequest, SubmitRestProtocolResponse} case class KubernetesCredentials( oauthToken: Option[String], diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestApi.scala similarity index 89% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestApi.scala index 18eb9b7a12ca..270e7ea0e77b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestApi.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestApi.scala @@ -14,12 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.rest.kubernetes +package org.apache.spark.deploy.rest.kubernetes.v1 import javax.ws.rs.{Consumes, GET, Path, POST, Produces} import javax.ws.rs.core.MediaType -import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KubernetesCreateSubmissionRequest, PingResponse} +import org.apache.spark.deploy.rest.CreateSubmissionResponse @Path("/v1/submissions/") trait KubernetesSparkRestApi { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala similarity index 98% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala index 4ca01b2f6bd3..70499a7f26f2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.deploy.rest.kubernetes +package org.apache.spark.deploy.rest.kubernetes.v1 import java.io.{File, FileOutputStream, StringReader} import java.net.URI @@ -31,9 +31,10 @@ import org.apache.commons.lang3.RandomStringUtils import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SparkException, SSLOptions} +import org.apache.spark.{SecurityManager, SparkConf, SparkException, SPARK_VERSION => sparkVersion, SSLOptions} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.submit.v1.CompressionUtils import org.apache.spark.deploy.rest._ import org.apache.spark.internal.config.OptionalConfigEntry import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/MultiServerFeignTarget.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/MultiServerFeignTarget.scala similarity index 98% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/MultiServerFeignTarget.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/MultiServerFeignTarget.scala index 51313e00ce2d..56ff82ea2fc3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/MultiServerFeignTarget.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/MultiServerFeignTarget.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.rest.kubernetes +package org.apache.spark.deploy.rest.kubernetes.v1 import feign.{Request, RequestTemplate, RetryableException, Retryer, Target} import scala.reflect.ClassTag diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/PemsToKeyStoreConverter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/PemsToKeyStoreConverter.scala similarity index 99% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/PemsToKeyStoreConverter.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/PemsToKeyStoreConverter.scala index e5c43560eccb..da863a9fb48e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/PemsToKeyStoreConverter.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/PemsToKeyStoreConverter.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.deploy.rest.kubernetes +package org.apache.spark.deploy.rest.kubernetes.v1 import java.io.{File, FileInputStream, FileOutputStream, InputStreamReader} import java.nio.file.Paths diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala index ace750f1927f..7f2a1db822d2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala @@ -59,3 +59,9 @@ private[spark] class KubernetesSparkDependencyServer( jettyServer = None } } + +object KubernetesSparkDependencyServer { + def main(args: Array[String]): Unit = { + + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala index a1e879231a7e..5179d0e1fcf5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala @@ -17,12 +17,12 @@ package org.apache.spark.deploy.rest.kubernetes.v2 import java.io.InputStream -import javax.ws.rs.{Consumes, GET, HeaderParam, Path, Produces, PUT, QueryParam} +import javax.ws.rs.{Consumes, GET, HeaderParam, PUT, Path, Produces, QueryParam} import javax.ws.rs.core.{MediaType, StreamingOutput} import org.glassfish.jersey.media.multipart.FormDataParam -import org.apache.spark.deploy.rest.KubernetesCredentials +import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials /** * Service that receives application data that can be received later on. 
This is primarily used diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala index d1892f0f455c..6c8fd050325e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala @@ -25,7 +25,7 @@ import com.google.common.io.{BaseEncoding, ByteStreams, Files} import scala.collection.mutable import org.apache.spark.SparkException -import org.apache.spark.deploy.rest.KubernetesCredentials +import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials import org.apache.spark.util.Utils private[spark] class KubernetesSparkDependencyServiceImpl(dependenciesRootDir: File) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala index e181d240fd8d..107f578c620e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala @@ -20,7 +20,7 @@ import okhttp3.{RequestBody, ResponseBody} import retrofit2.Call import retrofit2.http.{Multipart, Streaming} -import org.apache.spark.deploy.rest.KubernetesCredentials +import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials /** * Retrofit-compatible variant of {@link KubernetesSparkDependencyService}. For documentation on diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala similarity index 98% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala index 554ed17ff25c..6725992aae97 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.deploy.kubernetes +package org.apache.spark.scheduler.cluster.kubernetes import java.io.File diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 234829a541c3..362a7d685c5c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -23,7 +23,6 @@ import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.{SparkContext, SparkException} -import org.apache.spark.deploy.kubernetes.KubernetesClientBuilder import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.rpc.RpcEndpointAddress diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile index 1f35e7e5eb20..8ab7a5870450 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile @@ -44,7 +44,7 @@ CMD SSL_ARGS="" && \ if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-key-password-file $SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"; fi && \ if ! [ -z ${SPARK_SUBMISSION_KEY_PEM_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --key-pem-file $SPARK_SUBMISSION_KEY_PEM_FILE"; fi && \ if ! 
[ -z ${SPARK_SUBMISSION_CERT_PEM_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --cert-pem-file $SPARK_SUBMISSION_CERT_PEM_FILE"; fi && \ - exec bin/spark-class org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer \ + exec bin/spark-class org.apache.spark.deploy.rest.kubernetes.v1.KubernetesSparkRestServer \ --hostname $HOSTNAME \ --port $SPARK_SUBMISSION_SERVER_PORT \ --secret-file $SPARK_SUBMISSION_SECRET_LOCATION \ diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index 0e55e64fd1d7..8deb790f4b7a 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -35,14 +35,13 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.deploy.SparkSubmit -import org.apache.spark.deploy.kubernetes.Client import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1 import org.apache.spark.deploy.kubernetes.integrationtest.sslutil.SSLUtils -import org.apache.spark.deploy.rest.kubernetes.ExternalSuppliedUrisDriverServiceManager +import org.apache.spark.deploy.kubernetes.submit.v1.{Client, ExternalSuppliedUrisDriverServiceManager} import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus} import org.apache.spark.util.Utils diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala index 07274bf962dd..81491be944d3 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala @@ -27,7 +27,7 @@ import io.fabric8.kubernetes.client.internal.SSLUtils import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag -import org.apache.spark.deploy.rest.kubernetes.HttpClientUtil +import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil import org.apache.spark.internal.Logging import org.apache.spark.util.Utils diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala index bdf46f0d1ff2..d827b133b283 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala +++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala @@ -26,7 +26,7 @@ import org.scalatest.BeforeAndAfterAll import retrofit2.Call import org.apache.spark.SparkFunSuite -import org.apache.spark.deploy.rest.KubernetesCredentials +import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials import org.apache.spark.util.Utils /** @@ -64,9 +64,6 @@ class KubernetesSparkDependencyServerSuite extends SparkFunSuite with BeforeAndA val kubernetesCredentials = KubernetesCredentials(Some("token"), Some("ca-cert"), None, None) val kubernetesCredentialsString = OBJECT_MAPPER.writer() .writeValueAsString(kubernetesCredentials) - val readKubernetesCredentials: KubernetesCredentials = OBJECT_MAPPER - .readerFor(classOf[KubernetesCredentials]) - .readValue(kubernetesCredentialsString) val kubernetesCredentialsBody = RequestBody.create( okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), kubernetesCredentialsString) val uploadResponse = retrofitService.uploadDependencies("podName", "podNamespace", diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala index 65c928ab8e83..11327c0ce435 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala @@ -16,11 +16,15 @@ */ package org.apache.spark.deploy.rest.kubernetes.v2 +import java.io.ByteArrayInputStream +import java.nio.file.Paths import java.util.UUID +import com.google.common.io.Files import org.scalatest.BeforeAndAfter import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials import org.apache.spark.util.Utils /** @@ -34,6 +38,8 @@ class KubernetesSparkDependencyServiceImplSuite extends SparkFunSuite with Befor private val serviceImpl = new KubernetesSparkDependencyServiceImpl(dependencyRootDir) private val jarsBytes = Array[Byte](1, 2, 3, 4) private val filesBytes = Array[Byte](5, 6, 7) + private val kubernetesCredentials = KubernetesCredentials( + Some("token"), Some("caCert"), Some("key"), Some("cert")) private var podName: String = _ private var podNamespace: String = _ @@ -42,8 +48,24 @@ class KubernetesSparkDependencyServiceImplSuite extends SparkFunSuite with Befor podNamespace = UUID.randomUUID().toString } - test("Uploads should write to a directory in the underlying disk") { - // TODO + test("Uploads should write data to the underlying disk") { + Utils.tryWithResource(new ByteArrayInputStream(jarsBytes)) { jarsStream => + Utils.tryWithResource(new ByteArrayInputStream(filesBytes)) { filesStream => + serviceImpl.uploadDependencies( + "name", "namespace", jarsStream, filesStream, kubernetesCredentials) + } + } + val jarsTgz = Paths.get(dependencyRootDir.getAbsolutePath, "namespace", "name", "jars.tgz") + .toFile + assert(jarsTgz.isFile, + s"Jars written to ${jarsTgz.getAbsolutePath} does not exist or is not a file.") + val jarsTgzBytes = Files.toByteArray(jarsTgz) + assert(jarsBytes.toSeq === jarsTgzBytes.toSeq, "Incorrect jars bytes were written.") + val filesTgz = 
Paths.get(dependencyRootDir.getAbsolutePath, "namespace", "name", "files.tgz") + .toFile + assert(filesTgz.isFile, + s"Files written to ${filesTgz.getAbsolutePath} does not exist or is not a file.") + val filesTgzBytes = Files.toByteArray(filesTgz) + assert(filesBytes.toSeq === filesTgzBytes.toSeq, "Incorrect files bytes were written.") } - } From 6e40c4c9b4d5e4d40949026193deebab1bb8ae11 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 3 Apr 2017 18:41:56 -0700 Subject: [PATCH 03/22] Add unit test for file writing --- ...netesSparkDependencyServiceImplSuite.scala | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala index 65c928ab8e83..ab91fa2db6fd 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala @@ -16,11 +16,15 @@ */ package org.apache.spark.deploy.rest.kubernetes.v2 +import java.io.ByteArrayInputStream +import java.nio.file.Paths import java.util.UUID +import com.google.common.io.Files import org.scalatest.BeforeAndAfter import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.rest.KubernetesCredentials import org.apache.spark.util.Utils /** @@ -34,6 +38,8 @@ class KubernetesSparkDependencyServiceImplSuite extends SparkFunSuite with Befor private val serviceImpl = new KubernetesSparkDependencyServiceImpl(dependencyRootDir) private val jarsBytes = Array[Byte](1, 2, 3, 4) private val filesBytes = Array[Byte](5, 6, 7) + private val kubernetesCredentials = KubernetesCredentials( + Some("token"), Some("caCert"), Some("key"), Some("cert")) private var podName: String = _ private var podNamespace: String = _ @@ -42,8 +48,25 @@ class KubernetesSparkDependencyServiceImplSuite extends SparkFunSuite with Befor podNamespace = UUID.randomUUID().toString } - test("Uploads should write to a directory in the underlying disk") { - // TODO + test("Uploads should write data to the underlying disk") { + Utils.tryWithResource(new ByteArrayInputStream(jarsBytes)) { jarsStream => + Utils.tryWithResource(new ByteArrayInputStream(filesBytes)) { filesStream => + serviceImpl.uploadDependencies( + "name", "namespace", jarsStream, filesStream, kubernetesCredentials) + } + } + val jarsTgz = Paths.get(dependencyRootDir.getAbsolutePath, "namespace", "name", "jars.tgz") + .toFile + assert(jarsTgz.isFile, + s"Jars written to ${jarsTgz.getAbsolutePath} does not exist or is not a file.") + val jarsTgzBytes = Files.toByteArray(jarsTgz) + assert(jarsBytes.toSeq === jarsTgzBytes.toSeq, "Incorrect jars bytes were written.") + val filesTgz = Paths.get(dependencyRootDir.getAbsolutePath, "namespace", "name", "files.tgz") + .toFile + assert(filesTgz.isFile, + s"Files written to ${filesTgz.getAbsolutePath} does not exist or is not a file.") + val filesTgzBytes = Files.toByteArray(filesTgz) + assert(filesBytes.toSeq === filesTgzBytes.toSeq, "Incorrect files bytes were written.") } } From 89c295e042aed4f944d209497615baaf96b4f0fe Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 5 Apr 2017 13:53:10 -0700 Subject: 
[PATCH 04/22] Remove unnecessary main --- .../kubernetes/v2/KubernetesSparkDependencyServer.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala index 7f2a1db822d2..ace750f1927f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala @@ -59,9 +59,3 @@ private[spark] class KubernetesSparkDependencyServer( jettyServer = None } } - -object KubernetesSparkDependencyServer { - def main(args: Array[String]): Unit = { - - } -} From 876bb4a85e629e4e7d6789f72d025797ac809fed Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 5 Apr 2017 17:17:22 -0700 Subject: [PATCH 05/22] Add back license header --- .../submit/v1/DriverServiceManager.scala | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverServiceManager.scala index f21fceb861ba..c7d394fcf00a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverServiceManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverServiceManager.scala @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.spark.deploy.kubernetes.submit.v1 import io.fabric8.kubernetes.api.model.{Service, ServiceBuilder} From 7d00f0736a8e45e6f838112e0e66417d0def79c7 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 5 Apr 2017 17:18:20 -0700 Subject: [PATCH 06/22] Minor fixes --- .../rest/kubernetes/v2/KubernetesSparkDependencyServer.scala | 2 +- .../kubernetes/v2/KubernetesSparkDependencyServerSuite.scala | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala index ace750f1927f..2b1022a7dcc9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.rest.kubernetes.v2 import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.jaxrs.json.{JacksonJaxbJsonProvider, JacksonJsonProvider} +import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.eclipse.jetty.server.{Server, ServerConnector} import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala index bdf46f0d1ff2..ad86a29bb308 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala @@ -64,9 +64,6 @@ class KubernetesSparkDependencyServerSuite extends SparkFunSuite with BeforeAndA val kubernetesCredentials = KubernetesCredentials(Some("token"), Some("ca-cert"), None, None) val kubernetesCredentialsString = OBJECT_MAPPER.writer() .writeValueAsString(kubernetesCredentials) - val readKubernetesCredentials: KubernetesCredentials = OBJECT_MAPPER - .readerFor(classOf[KubernetesCredentials]) - .readValue(kubernetesCredentialsString) val kubernetesCredentialsBody = RequestBody.create( okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), kubernetesCredentialsString) val uploadResponse = retrofitService.uploadDependencies("podName", "podNamespace", From baf10aa6f460a170bfbedad867a8bc3aba30b499 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 6 Apr 2017 13:42:46 -0700 Subject: [PATCH 07/22] Fix integration test with renamed package for client. Fix scalastyle. 
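A note on the JSON handling exercised in the suite edits above: both test suites build their mapper as new ObjectMapper().registerModule(new DefaultScalaModule), which is what lets Jackson serialize Option-bearing Scala case classes such as KubernetesCredentials. The readerFor(...).readValue(...) pair dropped by the "Minor fixes" patch performed the reverse half of that round trip, but its result was never asserted against, which is presumably why it could go. A minimal, self-contained sketch of the full round trip (only oauthToken is a confirmed field name; the other three are illustrative stand-ins):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object CredentialsRoundTripSketch {
  // Stand-in for the real case class; the field names after oauthToken are assumed.
  case class KubernetesCredentials(
    oauthToken: Option[String],
    caCertDataBase64: Option[String],
    clientKeyDataBase64: Option[String],
    clientCertDataBase64: Option[String])

  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(new DefaultScalaModule)
    val credentials = KubernetesCredentials(Some("token"), Some("ca-cert"), None, None)
    val json = mapper.writer().writeValueAsString(credentials)
    // DefaultScalaModule writes None as null and reads null back as None,
    // so the round trip is lossless for Option fields.
    val parsed: KubernetesCredentials = mapper
      .readerFor(classOf[KubernetesCredentials])
      .readValue(json)
    assert(parsed == credentials, "Round-tripped credentials should be identical.")
  }
}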
--- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 +- .../main/scala/org/apache/spark/deploy/kubernetes/config.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 002b29d5564e..aeccd0088d76 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -619,7 +619,7 @@ object SparkSubmit { } if (isKubernetesCluster) { - childMainClass = "org.apache.spark.deploy.kubernetes.Client" + childMainClass = "org.apache.spark.deploy.kubernetes.submit.v1.Client" childArgs += args.primaryResource childArgs += args.mainClass childArgs ++= args.childArgs diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 3538c7273cef..e403a6e8b927 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -18,8 +18,8 @@ package org.apache.spark.deploy.kubernetes import java.util.concurrent.TimeUnit -import org.apache.spark.deploy.kubernetes.submit.v1.NodePortUrisDriverServiceManager import org.apache.spark.{SPARK_VERSION => sparkVersion} +import org.apache.spark.deploy.kubernetes.submit.v1.NodePortUrisDriverServiceManager import org.apache.spark.internal.config.ConfigBuilder import org.apache.spark.network.util.ByteUnit From d20537d0e21342b0bc3030ed401d41460e3f71dc Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 10 Apr 2017 12:32:36 -0700 Subject: [PATCH 08/22] Force json serialization to consider the different package. 
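For context on the serialization fix that follows: Spark's REST submission protocol writes an action field into every message and, on the receiving side, resolves the concrete message class from that field relative to the base org.apache.spark.deploy.rest package. Once these messages moved into the kubernetes.v1 subpackage, the default class-derived action would no longer resolve, so the patch pins messageType (and action) to a subpackage-qualified name. A simplified sketch of the assumed lookup (the real logic lives in SubmitRestProtocolMessage in Spark core and may differ in detail):

object ActionResolutionSketch {
  // With action = "kubernetes.v1.PingResponse", this yields
  // org.apache.spark.deploy.rest.kubernetes.v1.PingResponse, matching the
  // overridden messageType in the diff below.
  def resolveMessageClass(action: String): Class[_] = {
    val basePackage = "org.apache.spark.deploy.rest"
    Class.forName(s"$basePackage.$action")
  }
}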
--- .../kubernetes/v1/KubernetesRestProtocolMessages.scala | 9 ++++++++- .../rest/kubernetes/v1/KubernetesSparkRestServer.scala | 8 +++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesRestProtocolMessages.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesRestProtocolMessages.scala index 0cc178c9f5ab..cd1f9dcdf587 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesRestProtocolMessages.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesRestProtocolMessages.scala @@ -16,10 +16,11 @@ */ package org.apache.spark.deploy.rest.kubernetes.v1 -import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo} +import com.fasterxml.jackson.annotation.{JsonIgnore, JsonSubTypes, JsonTypeInfo} import org.apache.spark.SPARK_VERSION import org.apache.spark.deploy.rest.{SubmitRestProtocolRequest, SubmitRestProtocolResponse} +import org.apache.spark.util.Utils case class KubernetesCredentials( oauthToken: Option[String], @@ -36,6 +37,9 @@ case class KubernetesCreateSubmissionRequest( driverPodKubernetesCredentials: KubernetesCredentials, uploadedJarsBase64Contents: TarGzippedData, uploadedFilesBase64Contents: TarGzippedData) extends SubmitRestProtocolRequest { + @JsonIgnore + override val messageType: String = s"kubernetes.v1.${Utils.getFormattedClassName(this)}" + override val action = messageType message = "create" clientSparkVersion = SPARK_VERSION } @@ -69,5 +73,8 @@ class PingResponse extends SubmitRestProtocolResponse { val text = "pong" message = "pong" serverSparkVersion = SPARK_VERSION + @JsonIgnore + override val messageType: String = s"kubernetes.v1.${Utils.getFormattedClassName(this)}" + override val action: String = messageType } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala index 70499a7f26f2..2ab3d596cf6d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala @@ -138,7 +138,13 @@ private[spark] class KubernetesSparkRestServer( protected override def doGet( request: HttpServletRequest, response: HttpServletResponse): Unit = { - sendResponse(new PingResponse, response) + try { + sendResponse(new PingResponse, response) + } catch { + case e: Throwable => + logError("Failed to create ping.", e) + throw e + } } } From 2ca73a38da1676e664f3470006603b58e96e61dc Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 10 Apr 2017 12:34:11 -0700 Subject: [PATCH 09/22] Revert extraneous log --- .../rest/kubernetes/v1/KubernetesSparkRestServer.scala | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala index 2ab3d596cf6d..70499a7f26f2 100644 --- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala @@ -138,13 +138,7 @@ private[spark] class KubernetesSparkRestServer( protected override def doGet( request: HttpServletRequest, response: HttpServletResponse): Unit = { - try { - sendResponse(new PingResponse, response) - } catch { - case e: Throwable => - logError("Failed to create ping.", e) - throw e - } + sendResponse(new PingResponse, response) } } From ffe8c6c45b36948777d80f52ed36f4c9c8daddbe Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 10 Apr 2017 13:25:15 -0700 Subject: [PATCH 10/22] Fix scalastyle --- .../submit/v1/ExternalSuppliedUrisDriverServiceManager.scala | 2 +- .../deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala | 2 +- .../rest/kubernetes/v2/KubernetesSparkDependencyService.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/ExternalSuppliedUrisDriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/ExternalSuppliedUrisDriverServiceManager.scala index 39beaa853e8e..4c784aeb5692 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/ExternalSuppliedUrisDriverServiceManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/ExternalSuppliedUrisDriverServiceManager.scala @@ -20,8 +20,8 @@ import java.util.concurrent.TimeUnit import com.google.common.util.concurrent.SettableFuture import io.fabric8.kubernetes.api.model.{Service, ServiceBuilder} -import io.fabric8.kubernetes.client.Watcher.Action import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action import scala.collection.JavaConverters._ import org.apache.spark.SparkConf diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala index 70499a7f26f2..048427fa4ec2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala @@ -31,7 +31,7 @@ import org.apache.commons.lang3.RandomStringUtils import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{SecurityManager, SparkConf, SparkException, SPARK_VERSION => sparkVersion, SSLOptions} +import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SparkException, SSLOptions} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.submit.v1.CompressionUtils diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala index 5179d0e1fcf5..9eaea18bf2ea 100644 --- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.rest.kubernetes.v2 import java.io.InputStream -import javax.ws.rs.{Consumes, GET, HeaderParam, PUT, Path, Produces, QueryParam} +import javax.ws.rs.{Consumes, GET, HeaderParam, Path, Produces, PUT, QueryParam} import javax.ws.rs.core.{MediaType, StreamingOutput} import org.glassfish.jersey.media.multipart.FormDataParam From 3dd3504da1da80c1e22c8f0512f78f92e5228ccb Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 11 Apr 2017 15:33:34 -0700 Subject: [PATCH 11/22] Remove getting credentials from the API We still want to post them because in the future we can use these credentials to monitor the API server and handle cleaning up the data accordingly. --- .../v2/KubernetesSparkDependencyService.scala | 10 ---------- .../v2/KubernetesSparkDependencyServiceImpl.scala | 8 -------- .../v2/KubernetesSparkDependencyServiceRetrofit.scala | 5 ----- .../v2/KubernetesSparkDependencyServerSuite.scala | 3 --- 4 files changed, 26 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala index a1e879231a7e..83d704c398b6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala @@ -93,14 +93,4 @@ private[spark] trait KubernetesSparkDependencyService { @Path("/dependencies/files") def downloadFiles( @HeaderParam("Authorization") applicationSecret: String): StreamingOutput - - /** - * Retrieve the Kubernetes credentials being used to monitor the Spark application. 
- */ - @GET - @Consumes(Array(MediaType.APPLICATION_JSON)) - @Produces(Array(MediaType.APPLICATION_JSON)) - @Path("/dependencies/credentials") - def getKubernetesCredentials( - @HeaderParam("Authorization") applicationSecret: String): KubernetesCredentials } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala index d1892f0f455c..58067d5d7a23 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala @@ -98,14 +98,6 @@ private[spark] class KubernetesSparkDependencyServiceImpl(dependenciesRootDir: F appFileToStreamingOutput(applicationSecret, _.filesTgz) } - override def getKubernetesCredentials(applicationSecret: String): KubernetesCredentials = { - SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized { - sparkApplicationDependencies.get(applicationSecret) - .map(_.kubernetesCredentials) - .getOrElse(throw new SparkException("No application found for the provided token.")) - } - } - private def appFileToStreamingOutput( applicationSecret: String, dependency: (SparkApplicationDependencies => File)): StreamingOutput = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala index e181d240fd8d..86809ff695d6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala @@ -47,9 +47,4 @@ private[spark] trait KubernetesSparkDependencyServiceRetrofit { @retrofit2.http.GET("/api/dependencies/files") def downloadFiles( @retrofit2.http.Header("Authorization") applicationSecret: String): Call[ResponseBody] - - @retrofit2.http.GET("/api/dependencies/credentials") - def getKubernetesCredentials( - @retrofit2.http.Header("Authorization") applicationSecret: String) - : Call[KubernetesCredentials] } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala index ad86a29bb308..5fceb5664c00 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala @@ -74,9 +74,6 @@ class KubernetesSparkDependencyServerSuite extends SparkFunSuite with BeforeAndA jarsBytes) checkResponseBodyBytesMatches(retrofitService.downloadFiles(secret), filesBytes) - val downloadedCredentials = getTypedResponseResult( - retrofitService.getKubernetesCredentials(secret)) - assert(downloadedCredentials === 
kubernetesCredentials) } private def getTypedResponseResult[T](call: Call[T]): T = { From df8e0c86f5f2e1824cee252d7d40704e07e577b7 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 12 Apr 2017 13:47:16 -0700 Subject: [PATCH 12/22] Generalize to resource staging server outside of Spark --- ...rver.scala => ResourceStagingServer.scala} | 4 +- ...ice.scala => ResourceStagingService.scala} | 53 +++++------- ...scala => ResourceStagingServiceImpl.scala} | 82 +++++++------------ ...a => ResourceStagingServiceRetrofit.scala} | 22 ++--- ...scala => ResourceStagingServerSuite.scala} | 38 +++++---- ... => ResourceStagingServiceImplSuite.scala} | 49 +++++------ 6 files changed, 99 insertions(+), 149 deletions(-) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/{KubernetesSparkDependencyServer.scala => ResourceStagingServer.scala} (95%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/{KubernetesSparkDependencyService.scala => ResourceStagingService.scala} (64%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/{KubernetesSparkDependencyServiceImpl.scala => ResourceStagingServiceImpl.scala} (50%) rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/{KubernetesSparkDependencyServiceRetrofit.scala => ResourceStagingServiceRetrofit.scala} (65%) rename resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/{KubernetesSparkDependencyServerSuite.scala => ResourceStagingServerSuite.scala} (72%) rename resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/{KubernetesSparkDependencyServiceImplSuite.scala => ResourceStagingServiceImplSuite.scala} (51%) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServer.scala similarity index 95% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServer.scala index 2b1022a7dcc9..e09a788c4532 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServer.scala @@ -26,9 +26,9 @@ import org.glassfish.jersey.media.multipart.MultiPartFeature import org.glassfish.jersey.server.ResourceConfig import org.glassfish.jersey.servlet.ServletContainer -private[spark] class KubernetesSparkDependencyServer( +private[spark] class ResourceStagingServer( port: Int, - serviceInstance: KubernetesSparkDependencyService) { + serviceInstance: ResourceStagingService) { private var jettyServer: Option[Server] = None diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala similarity index 64% rename from 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala index 83d704c398b6..4d841c42af6b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala @@ -40,20 +40,19 @@ import org.apache.spark.deploy.rest.KubernetesCredentials * {@link getKubernetesCredentials}. */ @Path("/") -private[spark] trait KubernetesSparkDependencyService { +private[spark] trait ResourceStagingService { /** - * Register an application with the dependency service, so that the driver pod can retrieve them - * when it runs. + * Register an application with the dependency service, so that pods with the given labels can + * retrieve them when they run. * - * @param driverPodName Name of the driver pod. - * @param driverPodNamespace Namespace for the driver pod. - * @param jars Application jars to upload, compacted together in tar + gzip format. The tarball - * should contain the jar files laid out in a flat hierarchy, without any directories. - * We take a stream here to avoid holding these entirely in memory. - * @param files Application files to upload, compacted together in tar + gzip format. THe tarball - * should contain the files laid out in a flat hierarchy, without any directories. - * We take a stream here to avoid holding these entirely in memory. + * @param resources Application resources to upload, compacted together in tar + gzip format. + * The tarball should contain the files laid out in a flat hierarchy, without + * any directories. We take a stream here to avoid holding these entirely in + * memory. + * @param podLabels Labels of pods to monitor. When no more pods are running with the given label, + * after some period of time, these dependencies will be cleared. + * @param podNamespace Namespace of pods to monitor. * @param kubernetesCredentials These credentials are primarily used to monitor the progress of * the application. When the application shuts down normally, shuts * down abnormally and does not restart, or fails to start entirely, @@ -61,36 +60,24 @@ private[spark] trait KubernetesSparkDependencyService { * @return A unique token that should be provided when retrieving these dependencies later. */ @PUT - @Consumes(Array(MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON)) + @Consumes(Array(MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN)) @Produces(Array(MediaType.TEXT_PLAIN)) - @Path("/dependencies") - def uploadDependencies( - @QueryParam("driverPodName") driverPodName: String, - @QueryParam("driverPodNamespace") driverPodNamespace: String, - @FormDataParam("jars") jars: InputStream, - @FormDataParam("files") files: InputStream, + @Path("/resources/upload") + def uploadResources( + @FormDataParam("podLabels") podLabels: Map[String, String], + @FormDataParam("podNamespace") podNamespace: String, + @FormDataParam("resources") resources: InputStream, @FormDataParam("kubernetesCredentials") kubernetesCredentials: KubernetesCredentials) : String /** - * Download an application's jars. The jars are provided as a stream, where the stream's - * underlying data matches the stream that was uploaded in {@link uploadDependencies}. 
+ * Download an application's resources. The resources are provided as a stream, where the stream's + * underlying data matches the stream that was uploaded in uploadResources. */ @GET @Consumes(Array(MediaType.APPLICATION_JSON)) @Produces(Array(MediaType.APPLICATION_OCTET_STREAM)) - @Path("/dependencies/jars") - def downloadJars( - @HeaderParam("Authorization") applicationSecret: String): StreamingOutput - - /** - * Download an application's files. The jars are provided as a stream, where the stream's - * underlying data matches the stream that was uploaded in {@link uploadDependencies}. - */ - @GET - @Consumes(Array(MediaType.APPLICATION_JSON)) - @Produces(Array(MediaType.APPLICATION_OCTET_STREAM)) - @Path("/dependencies/files") - def downloadFiles( + @Path("/resources/download") + def downloadResources( @HeaderParam("Authorization") applicationSecret: String): StreamingOutput } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala similarity index 50% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala index 58067d5d7a23..3e0c0906a574 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala @@ -26,81 +26,60 @@ import scala.collection.mutable import org.apache.spark.SparkException import org.apache.spark.deploy.rest.KubernetesCredentials +import org.apache.spark.internal.Logging import org.apache.spark.util.Utils -private[spark] class KubernetesSparkDependencyServiceImpl(dependenciesRootDir: File) - extends KubernetesSparkDependencyService { +private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) + extends ResourceStagingService with Logging { private val DIRECTORIES_LOCK = new Object - private val REGISTERED_DRIVERS_LOCK = new Object private val SPARK_APPLICATION_DEPENDENCIES_LOCK = new Object private val SECURE_RANDOM = new SecureRandom() // TODO clean up these resources based on the driver's lifecycle - private val registeredDrivers = mutable.Set.empty[PodNameAndNamespace] private val sparkApplicationDependencies = mutable.Map.empty[String, SparkApplicationDependencies] - override def uploadDependencies( - driverPodName: String, - driverPodNamespace: String, - jars: InputStream, - files: InputStream, + override def uploadResources( + podLabels: Map[String, String], + podNamespace: String, + resources: InputStream, kubernetesCredentials: KubernetesCredentials): String = { - val podNameAndNamespace = PodNameAndNamespace( - name = driverPodName, - namespace = driverPodNamespace) - REGISTERED_DRIVERS_LOCK.synchronized { - if (registeredDrivers.contains(podNameAndNamespace)) { - throw new SparkException(s"Spark application with driver pod named $driverPodName" + - s" and namespace $driverPodNamespace already uploaded its dependencies.") - } - registeredDrivers.add(podNameAndNamespace) - } + val resourcesId = UUID.randomUUID().toString + val secretBytes = new Array[Byte](1024) + 
SECURE_RANDOM.nextBytes(secretBytes) + val applicationSecret = resourcesId + "-" + BaseEncoding.base64().encode(secretBytes) + + val namespaceDir = new File(dependenciesRootDir, podNamespace) + val resourcesDir = new File(namespaceDir, resourcesId) try { - val secretBytes = new Array[Byte](1024) - SECURE_RANDOM.nextBytes(secretBytes) - val applicationSecret = UUID.randomUUID() + "-" + BaseEncoding.base64().encode(secretBytes) - val namespaceDir = new File(dependenciesRootDir, podNameAndNamespace.namespace) - val applicationDir = new File(namespaceDir, podNameAndNamespace.name) DIRECTORIES_LOCK.synchronized { - if (!applicationDir.exists()) { - if (!applicationDir.mkdirs()) { + if (!resourcesDir.exists()) { + if (!resourcesDir.mkdirs()) { throw new SparkException("Failed to create dependencies directory for application" + - s" at ${applicationDir.getAbsolutePath}") + s" at ${resourcesDir.getAbsolutePath}") } } } - val jarsTgz = new File(applicationDir, "jars.tgz") // TODO encrypt the written data with the secret. - Utils.tryWithResource(new FileOutputStream(jarsTgz)) { ByteStreams.copy(jars, _) } - val filesTgz = new File(applicationDir, "files.tgz") - Utils.tryWithResource(new FileOutputStream(filesTgz)) { ByteStreams.copy(files, _) } + val resourcesTgz = new File(resourcesDir, "resources.tgz") + Utils.tryWithResource(new FileOutputStream(resourcesTgz)) { ByteStreams.copy(resources, _) } SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized { sparkApplicationDependencies(applicationSecret) = SparkApplicationDependencies( - podNameAndNamespace, - jarsTgz, - filesTgz, + podLabels, + podNamespace, + resourcesTgz, kubernetesCredentials) } applicationSecret } catch { case e: Throwable => - // Revert the registered driver if we failed for any reason, most likely from disk ops - registeredDrivers.remove(podNameAndNamespace) + if (!resourcesDir.delete()) { + logWarning(s"Failed to delete application directory $resourcesDir.") + } throw e } } - override def downloadJars(applicationSecret: String): StreamingOutput = { - appFileToStreamingOutput(applicationSecret, _.jarsTgz) - } - - override def downloadFiles(applicationSecret: String): StreamingOutput = { - appFileToStreamingOutput(applicationSecret, _.filesTgz) - } - - private def appFileToStreamingOutput( - applicationSecret: String, - dependency: (SparkApplicationDependencies => File)): StreamingOutput = { + override def downloadResources(applicationSecret: String): StreamingOutput = { val applicationDependencies = SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized { sparkApplicationDependencies .get(applicationSecret) @@ -108,15 +87,14 @@ private[spark] class KubernetesSparkDependencyServiceImpl(dependenciesRootDir: F } new StreamingOutput { override def write(outputStream: OutputStream) = { - Files.copy(dependency(applicationDependencies), outputStream) + Files.copy(applicationDependencies.resourcesTgz, outputStream) } } } } -private case class PodNameAndNamespace(name: String, namespace: String) private case class SparkApplicationDependencies( - driverPodNameAndNamespace: PodNameAndNamespace, - jarsTgz: File, - filesTgz: File, + podLabels: Map[String, String], + podNamespace: String, + resourcesTgz: File, kubernetesCredentials: KubernetesCredentials) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala similarity index 65% 
rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala index 86809ff695d6..6967b2a0f9a3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceRetrofit.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala @@ -23,28 +23,22 @@ import retrofit2.http.{Multipart, Streaming} import org.apache.spark.deploy.rest.KubernetesCredentials /** - * Retrofit-compatible variant of {@link KubernetesSparkDependencyService}. For documentation on + * Retrofit-compatible variant of {@link ResourceStagingService}. For documentation on * how to use this service, see the aforementioned JAX-RS based interface. */ -private[spark] trait KubernetesSparkDependencyServiceRetrofit { +private[spark] trait ResourceStagingServiceRetrofit { @Multipart - @retrofit2.http.PUT("/api/dependencies") + @retrofit2.http.PUT("/api/resources/upload") def uploadDependencies( - @retrofit2.http.Query("driverPodName") driverPodName: String, - @retrofit2.http.Query("driverPodNamespace") driverPodNamespace: String, - @retrofit2.http.Part("jars") jars: RequestBody, - @retrofit2.http.Part("files") files: RequestBody, + @retrofit2.http.Part("podLabels") podLabels: RequestBody, + @retrofit2.http.Part("podNamespace") podNamespace: RequestBody, + @retrofit2.http.Part("resources") resources: RequestBody, @retrofit2.http.Part("kubernetesCredentials") kubernetesCredentials: RequestBody): Call[String] @Streaming - @retrofit2.http.GET("/api/dependencies/jars") - def downloadJars( - @retrofit2.http.Header("Authorization") applicationSecret: String): Call[ResponseBody] - - @Streaming - @retrofit2.http.GET("/api/dependencies/files") - def downloadFiles( + @retrofit2.http.GET("/api/resources/download") + def downloadResources( @retrofit2.http.Header("Authorization") applicationSecret: String): Call[ResponseBody] } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala similarity index 72% rename from resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala rename to resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala index 5fceb5664c00..b9c2ca6f87dd 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServerSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala @@ -16,6 +16,7 @@ */ package org.apache.spark.deploy.rest.kubernetes.v2 +import java.net.ServerSocket import javax.ws.rs.core.MediaType import com.fasterxml.jackson.databind.ObjectMapper @@ -38,10 +39,11 @@ import org.apache.spark.util.Utils * we've configured the Jetty server correctly and that the endpoints reached over HTTP can * receive streamed uploads and can stream downloads. 
*/ -class KubernetesSparkDependencyServerSuite extends SparkFunSuite with BeforeAndAfterAll { +class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfterAll { - private val serviceImpl = new KubernetesSparkDependencyServiceImpl(Utils.createTempDir()) - private val server = new KubernetesSparkDependencyServer(10021, serviceImpl) + private val serverPort = new ServerSocket(0).getLocalPort + private val serviceImpl = new ResourceStagingServiceImpl(Utils.createTempDir()) + private val server = new ResourceStagingServer(serverPort, serviceImpl) private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule) override def beforeAll(): Unit = { @@ -53,27 +55,27 @@ class KubernetesSparkDependencyServerSuite extends SparkFunSuite with BeforeAndA } test("Accept file and jar uploads and downloads") { - val retrofitService = RetrofitUtils.createRetrofitClient("http://localhost:10021/", - classOf[KubernetesSparkDependencyServiceRetrofit]) - val jarsBytes = Array[Byte](1, 2, 3, 4) - val filesBytes = Array[Byte](5, 6, 7) - val jarsRequestBody = RequestBody.create( - okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA), jarsBytes) - val filesRequestBody = RequestBody.create( - okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA), filesBytes) + val retrofitService = RetrofitUtils.createRetrofitClient(s"http://localhost:$serverPort/", + classOf[ResourceStagingServiceRetrofit]) + val resourcesBytes = Array[Byte](1, 2, 3, 4) + val labels = Map("label1" -> "label1Value", "label2" -> "label2value") + val namespace = "namespace" + val labelsJson = OBJECT_MAPPER.writer().writeValueAsString(labels) + val resourcesRequestBody = RequestBody.create( + okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA), resourcesBytes) + val labelsRequestBody = RequestBody.create( + okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), labelsJson) + val namespaceRequestBody = RequestBody.create( + okhttp3.MediaType.parse(MediaType.TEXT_PLAIN), namespace) val kubernetesCredentials = KubernetesCredentials(Some("token"), Some("ca-cert"), None, None) val kubernetesCredentialsString = OBJECT_MAPPER.writer() .writeValueAsString(kubernetesCredentials) val kubernetesCredentialsBody = RequestBody.create( okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), kubernetesCredentialsString) - val uploadResponse = retrofitService.uploadDependencies("podName", "podNamespace", - jarsRequestBody, filesRequestBody, kubernetesCredentialsBody) + val uploadResponse = retrofitService.uploadDependencies( + labelsRequestBody, namespaceRequestBody, resourcesRequestBody, kubernetesCredentialsBody) val secret = getTypedResponseResult(uploadResponse) - - checkResponseBodyBytesMatches(retrofitService.downloadJars(secret), - jarsBytes) - checkResponseBodyBytesMatches(retrofitService.downloadFiles(secret), - filesBytes) + checkResponseBodyBytesMatches(retrofitService.downloadResources(secret), resourcesBytes) } private def getTypedResponseResult[T](call: Call[T]): T = { diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala similarity index 51% rename from resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala rename to 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala index ab91fa2db6fd..6616d0fbe680 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/KubernetesSparkDependencyServiceImplSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.deploy.rest.kubernetes.v2 -import java.io.ByteArrayInputStream +import java.io.{ByteArrayInputStream, File} import java.nio.file.Paths import java.util.UUID @@ -32,41 +32,30 @@ import org.apache.spark.util.Utils * differs from that of KubernetesSparkDependencyServerSuite as here we invoke the * implementation methods directly as opposed to over HTTP. */ -class KubernetesSparkDependencyServiceImplSuite extends SparkFunSuite with BeforeAndAfter { +class ResourceStagingServiceImplSuite extends SparkFunSuite with BeforeAndAfter { private val dependencyRootDir = Utils.createTempDir() - private val serviceImpl = new KubernetesSparkDependencyServiceImpl(dependencyRootDir) - private val jarsBytes = Array[Byte](1, 2, 3, 4) - private val filesBytes = Array[Byte](5, 6, 7) + private val serviceImpl = new ResourceStagingServiceImpl(dependencyRootDir) + private val resourceBytes = Array[Byte](1, 2, 3, 4) private val kubernetesCredentials = KubernetesCredentials( Some("token"), Some("caCert"), Some("key"), Some("cert")) - private var podName: String = _ - private var podNamespace: String = _ - - before { - podName = UUID.randomUUID().toString - podNamespace = UUID.randomUUID().toString - } + private var namespace = "namespace" + private val labels = Map("label1" -> "label1value", "label2" -> "label2value") test("Uploads should write data to the underlying disk") { - Utils.tryWithResource(new ByteArrayInputStream(jarsBytes)) { jarsStream => - Utils.tryWithResource(new ByteArrayInputStream(filesBytes)) { filesStream => - serviceImpl.uploadDependencies( - "name", "namespace", jarsStream, filesStream, kubernetesCredentials) - } + Utils.tryWithResource(new ByteArrayInputStream(resourceBytes)) { resourceStream => + serviceImpl.uploadResources(labels, namespace, resourceStream, kubernetesCredentials) } - val jarsTgz = Paths.get(dependencyRootDir.getAbsolutePath, "namespace", "name", "jars.tgz") - .toFile - assert(jarsTgz.isFile, - s"Jars written to ${jarsTgz.getAbsolutePath} does not exist or is not a file.") - val jarsTgzBytes = Files.toByteArray(jarsTgz) - assert(jarsBytes.toSeq === jarsTgzBytes.toSeq, "Incorrect jars bytes were written.") - val filesTgz = Paths.get(dependencyRootDir.getAbsolutePath, "namespace", "name", "files.tgz") - .toFile - assert(filesTgz.isFile, - s"Files written to ${filesTgz.getAbsolutePath} does not exist or is not a file.") - val filesTgzBytes = Files.toByteArray(filesTgz) - assert(filesBytes.toSeq === filesTgzBytes.toSeq, "Incorrect files bytes were written.") + val resourceNamespaceDir = Paths.get(dependencyRootDir.getAbsolutePath, "namespace").toFile + assert(resourceNamespaceDir.isDirectory, s"Resource namespace dir was not created at" + + s" ${resourceNamespaceDir.getAbsolutePath} or is not a directory.") + val resourceDirs = resourceNamespaceDir.listFiles() + assert(resourceDirs.length === 1, s"Resource root directory did not have exactly one" + + s" subdirectory. 
Got: ${resourceDirs.map(_.getAbsolutePath).mkString(",")}") + val resourceTgz = new File(resourceDirs(0), "resources.tgz") + assert(resourceTgz.isFile, + s"Resources written to ${resourceTgz.getAbsolutePath} does not exist or is not a file.") + val resourceTgzBytes = Files.toByteArray(resourceTgz) + assert(resourceTgzBytes.toSeq === resourceBytes.toSeq, "Incorrect resource bytes were written.") } - } From 24452ece52017c106b4f56d8f40cba3eeeee01ac Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 12 Apr 2017 13:49:23 -0700 Subject: [PATCH 13/22] Update code documentation --- .../rest/kubernetes/v2/ResourceStagingService.scala | 13 +++++++------ .../v2/ResourceStagingServiceImplSuite.scala | 3 ++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala index 4d841c42af6b..b9f373190d8e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala @@ -25,19 +25,20 @@ import org.glassfish.jersey.media.multipart.FormDataParam import org.apache.spark.deploy.rest.KubernetesCredentials /** - * Service that receives application data that can be received later on. This is primarily used + * Service that receives application data that can be retrieved later on. This is primarily used * in the context of Spark, but the concept is generic enough to be used for arbitrary applications. * The use case is to have a place for Kubernetes application submitters to bootstrap dynamic, * heavyweight application data for pods. Application submitters may have data stored on their * local disks that they want to provide to the pods they create through the API server. ConfigMaps * are one way to provide this data, but the data in ConfigMaps are stored in etcd which cannot - * maintain data in the hundreds of megabytes in size.
- *
+ * maintain data in the hundreds of megabytes in size. + *

* The general use case is for an application submitter to ship the dependencies to the server via - * {@link uploadDependencies}; the application submitter will then receive a unique secure token. + * {@link uploadResources}; the application submitter will then receive a unique secure token. * The application submitter then ought to convert the token into a secret, and use this secret in - * a pod that fetches the uploaded dependencies via {@link downloadJars}, {@link downloadFiles}, and - * {@link getKubernetesCredentials}. + * a pod that fetches the uploaded dependencies via {@link downloadResources}. An application can + * provide multiple resource bundles simply by hitting the upload endpoint multiple times and + * downloading each bundle with the appropriate secret. */ @Path("/") private[spark] trait ResourceStagingService { diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala index 6616d0fbe680..86a45d0f1fe5 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala @@ -30,7 +30,8 @@ import org.apache.spark.util.Utils /** * Unit, scala-level tests for KubernetesSparkDependencyServiceImpl. The coverage here * differs from that of KubernetesSparkDependencyServerSuite as here we invoke the - * implementation methods directly as opposed to over HTTP. + * implementation methods directly as opposed to over HTTP, as well as check the + * data written to the underlying disk.
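 *
 * A minimal sketch of that direct invocation (values hypothetical; signatures as
 * exercised by this suite):
 * {{{
 * val serviceImpl = new ResourceStagingServiceImpl(Utils.createTempDir())
 * val secret = Utils.tryWithResource(new ByteArrayInputStream(Array[Byte](1, 2, 3))) { in =>
 *   serviceImpl.uploadResources(
 *     Map("label1" -> "label1value"), "namespace", in,
 *     KubernetesCredentials(Some("token"), Some("caCert"), None, None))
 * }
 * // The returned secret is later exchanged for the uploaded bytes.
 * val download: StreamingOutput = serviceImpl.downloadResources(secret)
 * }}}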
*/ class ResourceStagingServiceImplSuite extends SparkFunSuite with BeforeAndAfter { From f59717156f49980e0df1ea8eef3bae8a046f2ccd Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 12 Apr 2017 13:57:22 -0700 Subject: [PATCH 14/22] Val instead of var --- .../rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala index 86a45d0f1fe5..c124fce77cbf 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.deploy.rest.kubernetes.v2 import java.io.{ByteArrayInputStream, File} import java.nio.file.Paths -import java.util.UUID import com.google.common.io.Files import org.scalatest.BeforeAndAfter @@ -40,7 +39,7 @@ class ResourceStagingServiceImplSuite extends SparkFunSuite with BeforeAndAfter private val resourceBytes = Array[Byte](1, 2, 3, 4) private val kubernetesCredentials = KubernetesCredentials( Some("token"), Some("caCert"), Some("key"), Some("cert")) - private var namespace = "namespace" + private val namespace = "namespace" private val labels = Map("label1" -> "label1value", "label2" -> "label2value") test("Uploads should write data to the underlying disk") { From 2e6fe070bee73c556d74ac73583994c78f74b6cc Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 12 Apr 2017 16:10:26 -0700 Subject: [PATCH 15/22] Fix build --- .../rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala index c124fce77cbf..a6792e32ce90 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala @@ -23,7 +23,7 @@ import com.google.common.io.Files import org.scalatest.BeforeAndAfter import org.apache.spark.SparkFunSuite -import org.apache.spark.deploy.rest.KubernetesCredentials +import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials import org.apache.spark.util.Utils /** From e5f26aa6028cc1398f4f0adb262a61f5f74a32fd Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 12 Apr 2017 17:51:42 -0700 Subject: [PATCH 16/22] Fix naming, remove unused import --- .../rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala | 4 +--- .../rest/kubernetes/v2/ResourceStagingServerSuite.scala | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala index 6967b2a0f9a3..6ab118a44b7e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala @@ -20,8 +20,6 @@ import okhttp3.{RequestBody, ResponseBody} import retrofit2.Call import retrofit2.http.{Multipart, Streaming} -import org.apache.spark.deploy.rest.KubernetesCredentials - /** * Retrofit-compatible variant of {@link ResourceStagingService}. For documentation on * how to use this service, see the aforementioned JAX-RS based interface. @@ -30,7 +28,7 @@ private[spark] trait ResourceStagingServiceRetrofit { @Multipart @retrofit2.http.PUT("/api/resources/upload") - def uploadDependencies( + def uploadResources( @retrofit2.http.Part("podLabels") podLabels: RequestBody, @retrofit2.http.Part("podNamespace") podNamespace: RequestBody, @retrofit2.http.Part("resources") resources: RequestBody, diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala index b9c2ca6f87dd..db06ee8b6d65 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala @@ -72,7 +72,7 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfterAll { .writeValueAsString(kubernetesCredentials) val kubernetesCredentialsBody = RequestBody.create( okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), kubernetesCredentialsString) - val uploadResponse = retrofitService.uploadDependencies( + val uploadResponse = retrofitService.uploadResources( labelsRequestBody, namespaceRequestBody, resourcesRequestBody, kubernetesCredentialsBody) val secret = getTypedResponseResult(uploadResponse) checkResponseBodyBytesMatches(retrofitService.downloadResources(secret), resourcesBytes) From c408ff9a46a2f079bab6049ba60139c676286f7d Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 12 Apr 2017 17:55:37 -0700 Subject: [PATCH 17/22] Move suites from integration test package to core --- .../deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala | 0 .../rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename resource-managers/kubernetes/{integration-tests => core}/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala (100%) rename resource-managers/kubernetes/{integration-tests => core}/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala (100%) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala similarity index 100% rename from resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala 
rename to resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala similarity index 100% rename from resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala rename to resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala From 64eddc1da37eff926f953d542ee9a9b29afe1653 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 19 Apr 2017 13:39:37 -0700 Subject: [PATCH 18/22] Use TrieMap instead of locks --- .../v2/ResourceStagingServiceImpl.scala | 34 +++++++------------ 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala index 3e0c0906a574..74e766b6a84f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala @@ -22,7 +22,7 @@ import java.util.UUID import javax.ws.rs.core.StreamingOutput import com.google.common.io.{BaseEncoding, ByteStreams, Files} -import scala.collection.mutable +import scala.collection.concurrent.TrieMap import org.apache.spark.SparkException import org.apache.spark.deploy.rest.KubernetesCredentials @@ -32,11 +32,9 @@ import org.apache.spark.util.Utils private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) extends ResourceStagingService with Logging { - private val DIRECTORIES_LOCK = new Object - private val SPARK_APPLICATION_DEPENDENCIES_LOCK = new Object private val SECURE_RANDOM = new SecureRandom() // TODO clean up these resources based on the driver's lifecycle - private val sparkApplicationDependencies = mutable.Map.empty[String, SparkApplicationDependencies] + private val sparkApplicationDependencies = TrieMap.empty[String, ApplicationResources] override def uploadResources( podLabels: Map[String, String], @@ -51,24 +49,20 @@ private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) val namespaceDir = new File(dependenciesRootDir, podNamespace) val resourcesDir = new File(namespaceDir, resourcesId) try { - DIRECTORIES_LOCK.synchronized { - if (!resourcesDir.exists()) { - if (!resourcesDir.mkdirs()) { - throw new SparkException("Failed to create dependencies directory for application" + - s" at ${resourcesDir.getAbsolutePath}") - } + if (!resourcesDir.exists()) { + if (!resourcesDir.mkdirs()) { + throw new SparkException("Failed to create dependencies directory for application" + + s" at ${resourcesDir.getAbsolutePath}") } } // TODO encrypt the written data with the secret. 
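// Illustrative aside, not part of this patch: scala.collection.concurrent.TrieMap is a
// lock-free concurrent map whose individual operations are atomic, which is what allows
// the DIRECTORIES_LOCK and SPARK_APPLICATION_DEPENDENCIES_LOCK objects to be removed.
// A standalone sketch of the pattern, with hypothetical names:
//   val registry = TrieMap.empty[String, File]
//   registry("id") = file                  // atomic insert or update
//   registry.get("id")                     // atomic read, returns Option[File]
//   registry.putIfAbsent("id", file)       // atomic check-then-insert
// Single-key operations need no external lock; only compound invariants spanning
// multiple operations would still require coordination.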
val resourcesTgz = new File(resourcesDir, "resources.tgz") Utils.tryWithResource(new FileOutputStream(resourcesTgz)) { ByteStreams.copy(resources, _) } - SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized { - sparkApplicationDependencies(applicationSecret) = SparkApplicationDependencies( - podLabels, - podNamespace, - resourcesTgz, - kubernetesCredentials) - } + sparkApplicationDependencies(applicationSecret) = ApplicationResources( + podLabels, + podNamespace, + resourcesTgz, + kubernetesCredentials) applicationSecret } catch { case e: Throwable => @@ -80,11 +74,9 @@ private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) } override def downloadResources(applicationSecret: String): StreamingOutput = { - val applicationDependencies = SPARK_APPLICATION_DEPENDENCIES_LOCK.synchronized { - sparkApplicationDependencies + val applicationDependencies = sparkApplicationDependencies .get(applicationSecret) .getOrElse(throw new SparkException("No application found for the provided token.")) - } new StreamingOutput { override def write(outputStream: OutputStream) = { Files.copy(applicationDependencies.resourcesTgz, outputStream) @@ -93,7 +85,7 @@ private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) } } -private case class SparkApplicationDependencies( +private case class ApplicationResources( podLabels: Map[String, String], podNamespace: String, resourcesTgz: File, From 8f798022e115c316522da1cb460f71ae6827e7f6 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 19 Apr 2017 13:41:57 -0700 Subject: [PATCH 19/22] Address comments --- .../rest/kubernetes/v2/ResourceStagingServiceImpl.scala | 2 +- .../rest/kubernetes/v2/ResourceStagingServerSuite.scala | 2 +- .../rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala index 74e766b6a84f..df5f83e9b437 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala @@ -56,7 +56,7 @@ private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) } } // TODO encrypt the written data with the secret. 
- val resourcesTgz = new File(resourcesDir, "resources.tgz") + val resourcesTgz = new File(resourcesDir, "resources.data") Utils.tryWithResource(new FileOutputStream(resourcesTgz)) { ByteStreams.copy(resources, _) } sparkApplicationDependencies(applicationSecret) = ApplicationResources( podLabels, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala index db06ee8b6d65..098f30a2ea9d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.deploy.rest.KubernetesCredentials import org.apache.spark.util.Utils /** - * Tests for KubernetesSparkDependencyServer and its APIs. Note that this is not an end-to-end + * Tests for {@link ResourceStagingServer} and its APIs. Note that this is not an end-to-end * integration test, and as such does not upload and download files in tar.gz as would be done * in production. Thus we use the retrofit clients directly despite the fact that in practice * we would likely want to create an opinionated abstraction on top of the retrofit client; we diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala index c124fce77cbf..5bc37a54161b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala @@ -32,7 +32,7 @@ import org.apache.spark.util.Utils * implementation methods directly as opposed to over HTTP, as well as check the * data written to the underlying disk. */ -class ResourceStagingServiceImplSuite extends SparkFunSuite with BeforeAndAfter { +class ResourceStagingServiceImplSuite extends SparkFunSuite { private val dependencyRootDir = Utils.createTempDir() private val serviceImpl = new ResourceStagingServiceImpl(dependencyRootDir) @@ -52,7 +52,7 @@ class ResourceStagingServiceImplSuite extends SparkFunSuite with BeforeAndAfter val resourceDirs = resourceNamespaceDir.listFiles() assert(resourceDirs.length === 1, s"Resource root directory did not have exactly one" + s" subdirectory. 
Got: ${resourceDirs.map(_.getAbsolutePath).mkString(",")}") - val resourceTgz = new File(resourceDirs(0), "resources.tgz") + val resourceTgz = new File(resourceDirs(0), "resources.data") assert(resourceTgz.isFile, s"Resources written to ${resourceTgz.getAbsolutePath} does not exist or is not a file.") val resourceTgzBytes = Files.toByteArray(resourceTgz) From 04099d61a87e3a6886950aa68944f2cbe4a80145 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 19 Apr 2017 13:49:16 -0700 Subject: [PATCH 20/22] Fix imports --- .../rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala index 5bc37a54161b..b92257005d5d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala @@ -20,7 +20,6 @@ import java.io.{ByteArrayInputStream, File} import java.nio.file.Paths import com.google.common.io.Files -import org.scalatest.BeforeAndAfter import org.apache.spark.SparkFunSuite import org.apache.spark.deploy.rest.KubernetesCredentials From d713c270e87a4552d733f18a08cdb32b6e2647b1 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 20 Apr 2017 15:06:36 -0700 Subject: [PATCH 21/22] Change paths, use POST instead of PUT --- .../rest/kubernetes/v2/ResourceStagingService.scala | 8 ++++---- .../kubernetes/v2/ResourceStagingServiceRetrofit.scala | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala index b9f373190d8e..4f79f244289d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.rest.kubernetes.v2 import java.io.InputStream -import javax.ws.rs.{Consumes, GET, HeaderParam, Path, Produces, PUT, QueryParam} +import javax.ws.rs.{Consumes, GET, HeaderParam, Path, POST, Produces} import javax.ws.rs.core.{MediaType, StreamingOutput} import org.glassfish.jersey.media.multipart.FormDataParam @@ -60,10 +60,10 @@ private[spark] trait ResourceStagingService { * the data uploaded through this endpoint is cleared. * @return A unique token that should be provided when retrieving these dependencies later. 
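 *
 * Switching from PUT to POST matches the semantics described above: each upload
 * creates a new resource bundle rather than replacing one at a caller-chosen URI.
 * A hedged sketch of the resulting client calls (request bodies hypothetical; the
 * annotated client is ResourceStagingServiceRetrofit):
 * {{{
 * val token = retrofitService.uploadResources(
 *   labelsBody, namespaceBody, resourcesBody, credentialsBody).execute().body()
 * val bundle = retrofitService.downloadResources(token).execute().body()
 * }}}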
*/ - @PUT + @POST @Consumes(Array(MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN)) @Produces(Array(MediaType.TEXT_PLAIN)) - @Path("/resources/upload") + @Path("/resources/") def uploadResources( @FormDataParam("podLabels") podLabels: Map[String, String], @FormDataParam("podNamespace") podNamespace: String, @@ -78,7 +78,7 @@ private[spark] trait ResourceStagingService { @GET @Consumes(Array(MediaType.APPLICATION_JSON)) @Produces(Array(MediaType.APPLICATION_OCTET_STREAM)) - @Path("/resources/download") + @Path("/resources/") def downloadResources( @HeaderParam("Authorization") applicationSecret: String): StreamingOutput } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala index 6ab118a44b7e..04746d238957 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala @@ -27,7 +27,7 @@ import retrofit2.http.{Multipart, Streaming} private[spark] trait ResourceStagingServiceRetrofit { @Multipart - @retrofit2.http.PUT("/api/resources/upload") + @retrofit2.http.POST("/api/resources/") def uploadResources( @retrofit2.http.Part("podLabels") podLabels: RequestBody, @retrofit2.http.Part("podNamespace") podNamespace: RequestBody, @@ -36,7 +36,7 @@ private[spark] trait ResourceStagingServiceRetrofit { kubernetesCredentials: RequestBody): Call[String] @Streaming - @retrofit2.http.GET("/api/resources/download") + @retrofit2.http.GET("/api/resources/") def downloadResources( @retrofit2.http.Header("Authorization") applicationSecret: String): Call[ResponseBody] } From 720c38d03b4c8d6c4afdf85bc8845aa6de55e21a Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 20 Apr 2017 16:21:17 -0700 Subject: [PATCH 22/22] Use a resource identifier as well as a resource secret --- .../v2/ResourceStagingService.scala | 17 +++++----- .../v2/ResourceStagingServiceImpl.scala | 34 +++++++++++-------- .../v2/ResourceStagingServiceRetrofit.scala | 12 +++---- .../v2/StagedResourceIdentifier.scala | 19 +++++++++++ .../v2/ResourceStagingServerSuite.scala | 6 ++-- 5 files changed, 58 insertions(+), 30 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/StagedResourceIdentifier.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala index 4f79f244289d..5f7ceb461615 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.rest.kubernetes.v2 import java.io.InputStream -import javax.ws.rs.{Consumes, GET, HeaderParam, Path, POST, Produces} +import javax.ws.rs.{Consumes, GET, HeaderParam, Path, PathParam, POST, Produces} import javax.ws.rs.core.{MediaType, StreamingOutput} import org.glassfish.jersey.media.multipart.FormDataParam @@ -40,11 +40,11 @@ import 
org.apache.spark.deploy.rest.KubernetesCredentials * provide multiple resource bundles simply by hitting the upload endpoint multiple times and * downloading each bundle with the appropriate secret. */ -@Path("/") +@Path("/v0") private[spark] trait ResourceStagingService { /** - * Register an application with the dependency service, so that pods with the given labels can + * Register a resource with the dependency service, so that pods with the given labels can * retrieve them when they run. * * @param resources Application resources to upload, compacted together in tar + gzip format. @@ -62,14 +62,14 @@ private[spark] trait ResourceStagingService { */ @POST @Consumes(Array(MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN)) - @Produces(Array(MediaType.TEXT_PLAIN)) - @Path("/resources/") + @Produces(Array(MediaType.APPLICATION_JSON)) + @Path("/resources") def uploadResources( @FormDataParam("podLabels") podLabels: Map[String, String], @FormDataParam("podNamespace") podNamespace: String, @FormDataParam("resources") resources: InputStream, @FormDataParam("kubernetesCredentials") kubernetesCredentials: KubernetesCredentials) - : String + : StagedResourceIdentifier /** * Download an application's resources. The resources are provided as a stream, where the stream's @@ -78,7 +78,8 @@ private[spark] trait ResourceStagingService { @GET @Consumes(Array(MediaType.APPLICATION_JSON)) @Produces(Array(MediaType.APPLICATION_OCTET_STREAM)) - @Path("/resources/") + @Path("/resources/{resourceId}") def downloadResources( - @HeaderParam("Authorization") applicationSecret: String): StreamingOutput + @PathParam("resourceId") resourceId: String, + @HeaderParam("Authorization") resourceSecret: String): StreamingOutput } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala index df5f83e9b437..bb338dacdf51 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.rest.kubernetes.v2 import java.io.{File, FileOutputStream, InputStream, OutputStream} import java.security.SecureRandom import java.util.UUID +import javax.ws.rs.{NotAuthorizedException, NotFoundException} import javax.ws.rs.core.StreamingOutput import com.google.common.io.{BaseEncoding, ByteStreams, Files} @@ -34,20 +35,20 @@ private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) private val SECURE_RANDOM = new SecureRandom() // TODO clean up these resources based on the driver's lifecycle - private val sparkApplicationDependencies = TrieMap.empty[String, ApplicationResources] + private val stagedResources = TrieMap.empty[String, StagedResources] override def uploadResources( podLabels: Map[String, String], podNamespace: String, resources: InputStream, - kubernetesCredentials: KubernetesCredentials): String = { - val resourcesId = UUID.randomUUID().toString + kubernetesCredentials: KubernetesCredentials): StagedResourceIdentifier = { + val resourceId = UUID.randomUUID().toString val secretBytes = new Array[Byte](1024) SECURE_RANDOM.nextBytes(secretBytes) - val applicationSecret = resourcesId + "-" + BaseEncoding.base64().encode(secretBytes) 
+ val resourceSecret = resourceId + "-" + BaseEncoding.base64().encode(secretBytes) val namespaceDir = new File(dependenciesRootDir, podNamespace) - val resourcesDir = new File(namespaceDir, resourcesId) + val resourcesDir = new File(namespaceDir, resourceId) try { if (!resourcesDir.exists()) { if (!resourcesDir.mkdirs()) { @@ -58,12 +59,13 @@ private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) // TODO encrypt the written data with the secret. val resourcesTgz = new File(resourcesDir, "resources.data") Utils.tryWithResource(new FileOutputStream(resourcesTgz)) { ByteStreams.copy(resources, _) } - sparkApplicationDependencies(applicationSecret) = ApplicationResources( + stagedResources(resourceId) = StagedResources( + resourceSecret, podLabels, podNamespace, resourcesTgz, kubernetesCredentials) - applicationSecret + StagedResourceIdentifier(resourceId, resourceSecret) } catch { case e: Throwable => if (!resourcesDir.delete()) { @@ -73,20 +75,24 @@ private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File) } } - override def downloadResources(applicationSecret: String): StreamingOutput = { - val applicationDependencies = sparkApplicationDependencies - .get(applicationSecret) - .getOrElse(throw new SparkException("No application found for the provided token.")) + override def downloadResources(resourceId: String, resourceSecret: String): StreamingOutput = { + val resource = stagedResources + .get(resourceId) + .getOrElse(throw new NotFoundException(s"No resource bundle found with id $resourceId")) + if (!resource.resourceSecret.equals(resourceSecret)) { + throw new NotAuthorizedException(s"Unauthorized to download resource with id $resourceId") + } new StreamingOutput { override def write(outputStream: OutputStream) = { - Files.copy(applicationDependencies.resourcesTgz, outputStream) + Files.copy(resource.resourcesFile, outputStream) } } } } -private case class ApplicationResources( +private case class StagedResources( + resourceSecret: String, podLabels: Map[String, String], podNamespace: String, - resourcesTgz: File, + resourcesFile: File, kubernetesCredentials: KubernetesCredentials) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala index 04746d238957..daf03f764b35 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceRetrofit.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.rest.kubernetes.v2 import okhttp3.{RequestBody, ResponseBody} import retrofit2.Call -import retrofit2.http.{Multipart, Streaming} +import retrofit2.http.{Multipart, Path, Streaming} /** * Retrofit-compatible variant of {@link ResourceStagingService}. 
For documentation on @@ -27,16 +27,16 @@ import retrofit2.http.{Multipart, Streaming} private[spark] trait ResourceStagingServiceRetrofit { @Multipart - @retrofit2.http.POST("/api/resources/") + @retrofit2.http.POST("/api/v0/resources/") def uploadResources( @retrofit2.http.Part("podLabels") podLabels: RequestBody, @retrofit2.http.Part("podNamespace") podNamespace: RequestBody, @retrofit2.http.Part("resources") resources: RequestBody, @retrofit2.http.Part("kubernetesCredentials") - kubernetesCredentials: RequestBody): Call[String] + kubernetesCredentials: RequestBody): Call[StagedResourceIdentifier] @Streaming - @retrofit2.http.GET("/api/resources/") - def downloadResources( - @retrofit2.http.Header("Authorization") applicationSecret: String): Call[ResponseBody] + @retrofit2.http.GET("/api/v0/resources/{resourceId}") + def downloadResources(@Path("resourceId") resourceId: String, + @retrofit2.http.Header("Authorization") resourceSecret: String): Call[ResponseBody] } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/StagedResourceIdentifier.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/StagedResourceIdentifier.scala new file mode 100644 index 000000000000..65bc9bc17dae --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/StagedResourceIdentifier.scala @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.rest.kubernetes.v2 + +case class StagedResourceIdentifier(resourceId: String, resourceSecret: String) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala index 098f30a2ea9d..70ba5be39504 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala @@ -74,8 +74,10 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfterAll { okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), kubernetesCredentialsString) val uploadResponse = retrofitService.uploadResources( labelsRequestBody, namespaceRequestBody, resourcesRequestBody, kubernetesCredentialsBody) - val secret = getTypedResponseResult(uploadResponse) - checkResponseBodyBytesMatches(retrofitService.downloadResources(secret), resourcesBytes) + val resourceIdentifier = getTypedResponseResult(uploadResponse) + checkResponseBodyBytesMatches( + retrofitService.downloadResources( + resourceIdentifier.resourceId, resourceIdentifier.resourceSecret), resourcesBytes) } private def getTypedResponseResult[T](call: Call[T]): T = {