Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 3f6e5ea

Browse files
mccheah authored and ash211 committed
Staging server for receiving application dependencies. (#212)
* Staging server for receiving application dependencies. * Add unit test for file writing * Minor fixes * Remove getting credentials from the API We still want to post them because in the future we can use these credentials to monitor the API server and handle cleaning up the data accordingly. * Generalize to resource staging server outside of Spark * Update code documentation * Val instead of var * Fix naming, remove unused import * Move suites from integration test package to core * Use TrieMap instead of locks * Address comments * Fix imports * Change paths, use POST instead of PUT * Use a resource identifier as well as a resource secret
1 parent 1388e0a commit 3f6e5ea

File tree

10 files changed

+544
-0
lines changed

10 files changed

+544
-0
lines changed

pom.xml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@
137137
<parquet.version>1.8.1</parquet.version>
138138
<hive.parquet.version>1.6.0</hive.parquet.version>
139139
<feign.version>8.18.0</feign.version>
140+
<retrofit.version>2.2.0</retrofit.version>
140141
<bouncycastle.version>1.54</bouncycastle.version>
141142
<jetty.version>9.2.16.v20160414</jetty.version>
142143
<javaxservlet.version>3.1.0</javaxservlet.version>
@@ -327,6 +328,21 @@
327328
<artifactId>feign-jaxrs</artifactId>
328329
<version>${feign.version}</version>
329330
</dependency>
331+
<dependency>
332+
<groupId>com.squareup.retrofit2</groupId>
333+
<artifactId>retrofit</artifactId>
334+
<version>${retrofit.version}</version>
335+
</dependency>
336+
<dependency>
337+
<groupId>com.squareup.retrofit2</groupId>
338+
<artifactId>converter-jackson</artifactId>
339+
<version>${retrofit.version}</version>
340+
</dependency>
341+
<dependency>
342+
<groupId>com.squareup.retrofit2</groupId>
343+
<artifactId>converter-scalars</artifactId>
344+
<version>${retrofit.version}</version>
345+
</dependency>
330346
<dependency>
331347
<groupId>org.bouncycastle</groupId>
332348
<artifactId>bcpkix-jdk15on</artifactId>
@@ -686,6 +702,11 @@
686702
<artifactId>jersey-client</artifactId>
687703
<version>${jersey.version}</version>
688704
</dependency>
705+
<dependency>
706+
<groupId>org.glassfish.jersey.media</groupId>
707+
<artifactId>jersey-media-multipart</artifactId>
708+
<version>${jersey.version}</version>
709+
</dependency>
689710
<dependency>
690711
<groupId>javax.ws.rs</groupId>
691712
<artifactId>javax.ws.rs-api</artifactId>

resource-managers/kubernetes/core/pom.xml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,31 @@
6060
<groupId>com.netflix.feign</groupId>
6161
<artifactId>feign-okhttp</artifactId>
6262
</dependency>
63+
<dependency>
64+
<groupId>org.glassfish.jersey.containers</groupId>
65+
<artifactId>jersey-container-servlet</artifactId>
66+
</dependency>
67+
<dependency>
68+
<groupId>org.glassfish.jersey.media</groupId>
69+
<artifactId>jersey-media-multipart</artifactId>
70+
</dependency>
6371
<dependency>
6472
<groupId>com.netflix.feign</groupId>
6573
<artifactId>feign-jackson</artifactId>
6674
</dependency>
75+
<dependency>
76+
<groupId>com.squareup.retrofit2</groupId>
77+
<artifactId>retrofit</artifactId>
78+
</dependency>
79+
<dependency>
80+
<groupId>com.squareup.retrofit2</groupId>
81+
<artifactId>converter-jackson</artifactId>
82+
</dependency>
83+
<dependency>
84+
<groupId>com.squareup.retrofit2</groupId>
85+
<artifactId>converter-scalars</artifactId>
86+
</dependency>
87+
6788
<dependency>
6889
<groupId>com.netflix.feign</groupId>
6990
<artifactId>feign-jaxrs</artifactId>
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.rest.kubernetes.v2
18+
19+
import com.fasterxml.jackson.databind.ObjectMapper
20+
import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider
21+
import com.fasterxml.jackson.module.scala.DefaultScalaModule
22+
import org.eclipse.jetty.server.{Server, ServerConnector}
23+
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
24+
import org.eclipse.jetty.util.thread.QueuedThreadPool
25+
import org.glassfish.jersey.media.multipart.MultiPartFeature
26+
import org.glassfish.jersey.server.ResourceConfig
27+
import org.glassfish.jersey.servlet.ServletContainer
28+
29+
/**
 * Embedded Jetty HTTP server that exposes a [[ResourceStagingService]] instance
 * as a JAX-RS application under the "/api" context path.
 *
 * @param port TCP port the server binds to.
 * @param serviceInstance JAX-RS resource implementing the staging endpoints.
 */
private[spark] class ResourceStagingServer(
    port: Int,
    serviceInstance: ResourceStagingService) {

  // Guarded by this object's monitor; both start() and stop() are synchronized.
  private var jettyServer: Option[Server] = None

  /**
   * Starts the server. Idempotent: a second call while the server is already
   * running is a no-op. (Previously a repeated call would build and start a
   * second Jetty instance - doomed to fail binding the same port - and
   * overwrite the reference to the first, leaking its thread pool.)
   */
  def start(): Unit = synchronized {
    if (jettyServer.isEmpty) {
      val threadPool = new QueuedThreadPool
      val contextHandler = new ServletContextHandler()
      // Register Jackson with the Scala module so Scala case classes and
      // collections serialize to/from JSON correctly.
      val jsonProvider = new JacksonJaxbJsonProvider()
      jsonProvider.setMapper(new ObjectMapper().registerModule(new DefaultScalaModule))
      val resourceConfig = new ResourceConfig().registerInstances(
        serviceInstance,
        jsonProvider,
        new MultiPartFeature)
      val servletHolder = new ServletHolder("main", new ServletContainer(resourceConfig))
      contextHandler.setContextPath("/api/")
      contextHandler.addServlet(servletHolder, "/*")
      // Daemon threads so an un-stopped server does not keep the JVM alive.
      threadPool.setDaemon(true)
      val server = new Server(threadPool)
      val connector = new ServerConnector(server)
      connector.setPort(port)
      server.addConnector(connector)
      server.setHandler(contextHandler)
      server.start()
      jettyServer = Some(server)
    }
  }

  /** Stops the server if it is running; safe to call multiple times. */
  def stop(): Unit = synchronized {
    jettyServer.foreach(_.stop())
    jettyServer = None
  }
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.rest.kubernetes.v2
18+
19+
import java.io.InputStream
20+
import javax.ws.rs.{Consumes, GET, HeaderParam, Path, PathParam, POST, Produces}
21+
import javax.ws.rs.core.{MediaType, StreamingOutput}
22+
23+
import org.glassfish.jersey.media.multipart.FormDataParam
24+
25+
import org.apache.spark.deploy.rest.KubernetesCredentials
26+
27+
/**
 * Service that receives application data that can be retrieved later on. This is primarily used
 * in the context of Spark, but the concept is generic enough to be used for arbitrary applications.
 * The use case is to have a place for Kubernetes application submitters to bootstrap dynamic,
 * heavyweight application data for pods. Application submitters may have data stored on their
 * local disks that they want to provide to the pods they create through the API server. ConfigMaps
 * are one way to provide this data, but the data in ConfigMaps are stored in etcd which cannot
 * maintain data in the hundreds of megabytes in size.
 * <p>
 * The general use case is for an application submitter to ship the dependencies to the server via
 * [[uploadResources]]; the application submitter will then receive a unique secure token.
 * The application submitter then ought to convert the token into a secret, and use this secret in
 * a pod that fetches the uploaded dependencies via [[downloadResources]]. An application can
 * provide multiple resource bundles simply by hitting the upload endpoint multiple times and
 * downloading each bundle with the appropriate secret.
 */
@Path("/v0")
private[spark] trait ResourceStagingService {

  /**
   * Register a resource with the dependency service, so that pods with the given labels can
   * retrieve them when they run.
   *
   * @param resources Application resources to upload, compacted together in tar + gzip format.
   *                  The tarball should contain the files laid out in a flat hierarchy, without
   *                  any directories. We take a stream here to avoid holding these entirely in
   *                  memory.
   * @param podLabels Labels of pods to monitor. When no more pods are running with the given label,
   *                  after some period of time, these dependencies will be cleared.
   * @param podNamespace Namespace of pods to monitor.
   * @param kubernetesCredentials These credentials are primarily used to monitor the progress of
   *                              the application. When the application shuts down normally, shuts
   *                              down abnormally and does not restart, or fails to start entirely,
   *                              the data uploaded through this endpoint is cleared.
   * @return A unique resource identifier paired with a secret token; both must be provided when
   *         retrieving these dependencies later via [[downloadResources]].
   */
  @POST
  @Consumes(Array(MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN))
  @Produces(Array(MediaType.APPLICATION_JSON))
  @Path("/resources")
  def uploadResources(
      @FormDataParam("podLabels") podLabels: Map[String, String],
      @FormDataParam("podNamespace") podNamespace: String,
      @FormDataParam("resources") resources: InputStream,
      @FormDataParam("kubernetesCredentials") kubernetesCredentials: KubernetesCredentials)
    : StagedResourceIdentifier

  /**
   * Download an application's resources. The resources are provided as a stream, where the stream's
   * underlying data matches the stream that was uploaded in uploadResources.
   *
   * @param resourceId Identifier returned from [[uploadResources]], given as a path segment.
   * @param resourceSecret Secret returned from [[uploadResources]], carried in the
   *                       "Authorization" header.
   */
  @GET
  @Consumes(Array(MediaType.APPLICATION_JSON))
  @Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
  @Path("/resources/{resourceId}")
  def downloadResources(
      @PathParam("resourceId") resourceId: String,
      @HeaderParam("Authorization") resourceSecret: String): StreamingOutput
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.rest.kubernetes.v2
18+
19+
import java.io.{File, FileOutputStream, InputStream, OutputStream}
20+
import java.security.SecureRandom
21+
import java.util.UUID
22+
import javax.ws.rs.{NotAuthorizedException, NotFoundException}
23+
import javax.ws.rs.core.StreamingOutput
24+
25+
import com.google.common.io.{BaseEncoding, ByteStreams, Files}
26+
import scala.collection.concurrent.TrieMap
27+
28+
import org.apache.spark.SparkException
29+
import org.apache.spark.deploy.rest.KubernetesCredentials
30+
import org.apache.spark.internal.Logging
31+
import org.apache.spark.util.Utils
32+
33+
/**
 * In-memory/on-disk implementation of [[ResourceStagingService]]. Uploaded bundles are
 * written under {{{dependenciesRootDir/<namespace>/<resourceId>/resources.data}}} and
 * tracked in a concurrent map keyed by resource id.
 *
 * @param dependenciesRootDir Root directory under which staged resources are stored.
 */
private[spark] class ResourceStagingServiceImpl(dependenciesRootDir: File)
    extends ResourceStagingService with Logging {

  private val SECURE_RANDOM = new SecureRandom()
  // TODO clean up these resources based on the driver's lifecycle
  // TrieMap gives lock-free thread-safe access for concurrent upload/download requests.
  private val stagedResources = TrieMap.empty[String, StagedResources]

  override def uploadResources(
      podLabels: Map[String, String],
      podNamespace: String,
      resources: InputStream,
      kubernetesCredentials: KubernetesCredentials): StagedResourceIdentifier = {
    val resourceId = UUID.randomUUID().toString
    // 1024 random bytes, base64-encoded, prefixed with the resource id to make the
    // secret self-describing.
    val secretBytes = new Array[Byte](1024)
    SECURE_RANDOM.nextBytes(secretBytes)
    val resourceSecret = resourceId + "-" + BaseEncoding.base64().encode(secretBytes)

    val namespaceDir = new File(dependenciesRootDir, podNamespace)
    val resourcesDir = new File(namespaceDir, resourceId)
    try {
      if (!resourcesDir.exists()) {
        if (!resourcesDir.mkdirs()) {
          throw new SparkException("Failed to create dependencies directory for application" +
            s" at ${resourcesDir.getAbsolutePath}")
        }
      }
      // TODO encrypt the written data with the secret.
      val resourcesTgz = new File(resourcesDir, "resources.data")
      Utils.tryWithResource(new FileOutputStream(resourcesTgz)) { ByteStreams.copy(resources, _) }
      stagedResources(resourceId) = StagedResources(
        resourceSecret,
        podLabels,
        podNamespace,
        resourcesTgz,
        kubernetesCredentials)
      StagedResourceIdentifier(resourceId, resourceSecret)
    } catch {
      case e: Throwable =>
        // File#delete() cannot remove a non-empty directory, so a partially written
        // resources.data would previously leave the whole directory behind. Delete
        // recursively, and never let a cleanup failure mask the original error.
        try {
          Utils.deleteRecursively(resourcesDir)
        } catch {
          case scala.util.control.NonFatal(cleanupFailure) =>
            logWarning(s"Failed to delete application directory $resourcesDir.", cleanupFailure)
        }
        throw e
    }
  }

  override def downloadResources(resourceId: String, resourceSecret: String): StreamingOutput = {
    val resource = stagedResources
      .get(resourceId)
      .getOrElse(throw new NotFoundException(s"No resource bundle found with id $resourceId"))
    // NOTE(review): String#equals is not constant-time; a timing side channel could in
    // principle leak the secret. Consider MessageDigest.isEqual on the UTF-8 bytes.
    if (!resource.resourceSecret.equals(resourceSecret)) {
      throw new NotAuthorizedException(s"Unauthorized to download resource with id $resourceId")
    }
    new StreamingOutput {
      override def write(outputStream: OutputStream) = {
        Files.copy(resource.resourcesFile, outputStream)
      }
    }
  }
}
92+
93+
/**
 * Internal bookkeeping record for one uploaded resource bundle.
 *
 * @param resourceSecret Secret token the caller must present to download this bundle.
 * @param podLabels Labels of the pods associated with this bundle (intended for
 *                  lifecycle-based cleanup; see the TODO in the service implementation).
 * @param podNamespace Kubernetes namespace of the monitored pods.
 * @param resourcesFile Location on local disk where the uploaded data was written.
 * @param kubernetesCredentials Credentials posted alongside the upload, kept so the
 *                              server can later monitor the application and clean up.
 */
private case class StagedResources(
    resourceSecret: String,
    podLabels: Map[String, String],
    podNamespace: String,
    resourcesFile: File,
    kubernetesCredentials: KubernetesCredentials)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.rest.kubernetes.v2
18+
19+
import okhttp3.{RequestBody, ResponseBody}
20+
import retrofit2.Call
21+
import retrofit2.http.{Multipart, Path, Streaming}
22+
23+
/**
 * Retrofit-compatible variant of [[ResourceStagingService]]. For documentation on
 * how to use this service, see the aforementioned JAX-RS based interface.
 */
private[spark] trait ResourceStagingServiceRetrofit {

  /**
   * Uploads a resource bundle as a multipart POST; mirrors
   * [[ResourceStagingService.uploadResources]].
   * NOTE(review): this URL has a trailing slash ("/api/v0/resources/") while the JAX-RS
   * side declares "/resources" without one - presumably the server treats them the same;
   * verify against the Jersey routing configuration.
   */
  @Multipart
  @retrofit2.http.POST("/api/v0/resources/")
  def uploadResources(
      @retrofit2.http.Part("podLabels") podLabels: RequestBody,
      @retrofit2.http.Part("podNamespace") podNamespace: RequestBody,
      @retrofit2.http.Part("resources") resources: RequestBody,
      @retrofit2.http.Part("kubernetesCredentials")
      kubernetesCredentials: RequestBody): Call[StagedResourceIdentifier]

  /**
   * Streams back a previously uploaded bundle; mirrors
   * [[ResourceStagingService.downloadResources]]. The secret is sent in the
   * "Authorization" header, matching the JAX-RS interface.
   */
  @Streaming
  @retrofit2.http.GET("/api/v0/resources/{resourceId}")
  def downloadResources(@Path("resourceId") resourceId: String,
      @retrofit2.http.Header("Authorization") resourceSecret: String): Call[ResponseBody]
}

0 commit comments

Comments
 (0)