Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 4940eae

Browse files
mccheah authored and ash211 committed
Support SSL on the file staging server (#221)
* Staging server for receiving application dependencies.
* Move packages around to split between v1 work and v2 work
* Add unit test for file writing
* Remove unnecessary main
* Allow the file staging server to be secured with TLS.
* Add back license header
* Minor fixes
* Fix integration test with renamed package for client. Fix scalastyle.
* Remove unused import
* Force json serialization to consider the different package.
* Revert extraneous log
* Fix scalastyle
* Remove getting credentials from the API. We still want to post them because in the future we can use these credentials to monitor the API server and handle cleaning up the data accordingly.
* Fix build
* Randomize name and namespace in test to prevent collisions
* Generalize to resource staging server outside of Spark
* Update code documentation
* Val instead of var
* Fix unit tests.
* Fix build
* Fix naming, remove unused import
* Move suites from integration test package to core
* Fix unit test
* Use TrieMap instead of locks
* Address comments
* Fix imports
* Address comments
* Change main object name
* Change config variable names
* Change paths, use POST instead of PUT
* Use a resource identifier as well as a resource secret
1 parent e24c4af commit 4940eae

File tree

10 files changed

+442
-22
lines changed

10 files changed

+442
-22
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,4 +288,37 @@ package object config {
288288
.doc("Interval between reports of the current app status in cluster mode.")
289289
.timeConf(TimeUnit.MILLISECONDS)
290290
.createWithDefaultString("1s")
291+
292+
// Spark dependency server for submission v2
293+
294+
private[spark] val RESOURCE_STAGING_SERVER_PORT =
295+
ConfigBuilder("spark.kubernetes.resourceStagingServer.port")
296+
.doc("Port for the Kubernetes resource staging server to listen on.")
297+
.intConf
298+
.createWithDefault(10000)
299+
300+
private[spark] val RESOURCE_STAGING_SERVER_KEY_PEM =
301+
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.keyPem")
302+
.doc("Key PEM file to use when having the Kubernetes dependency server listen on TLS.")
303+
.stringConf
304+
.createOptional
305+
306+
private[spark] val RESOURCE_STAGING_SERVER_CERT_PEM =
307+
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.serverCertPem")
308+
.doc("Certificate PEM file to use when having the Kubernetes dependency server" +
309+
" listen on TLS.")
310+
.stringConf
311+
.createOptional
312+
313+
private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE =
314+
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.keyStorePasswordFile")
315+
.doc("File containing the keystore password for the Kubernetes dependency server.")
316+
.stringConf
317+
.createOptional
318+
319+
private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_KEY_PASSWORD_FILE =
320+
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.keyPasswordFile")
321+
.doc("File containing the key password for the Kubernetes dependency server.")
322+
.stringConf
323+
.createOptional
291324
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/PemsToKeyStoreConverter.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,12 @@ private[spark] object PemsToKeyStoreConverter {
6060
privateKey,
6161
keyPassword.map(_.toCharArray).orNull,
6262
certificates)
63-
val keyStoreOutputPath = Paths.get(s"keystore-${UUID.randomUUID()}.$resolvedKeyStoreType")
64-
Utils.tryWithResource(new FileOutputStream(keyStoreOutputPath.toFile)) { storeStream =>
63+
val keyStoreDir = Utils.createTempDir("temp-keystores")
64+
val keyStoreFile = new File(keyStoreDir, s"keystore-${UUID.randomUUID()}.$resolvedKeyStoreType")
65+
Utils.tryWithResource(new FileOutputStream(keyStoreFile)) { storeStream =>
6566
keyStore.store(storeStream, keyStorePassword.map(_.toCharArray).orNull)
6667
}
67-
keyStoreOutputPath.toFile
68+
keyStoreFile
6869
}
6970

7071
def convertCertPemToTrustStore(

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServer.scala

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,32 @@
1616
*/
1717
package org.apache.spark.deploy.rest.kubernetes.v2
1818

19+
import java.io.{File, FileInputStream}
20+
import java.util.Properties
21+
1922
import com.fasterxml.jackson.databind.ObjectMapper
2023
import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider
2124
import com.fasterxml.jackson.module.scala.DefaultScalaModule
22-
import org.eclipse.jetty.server.{Server, ServerConnector}
25+
import com.google.common.collect.Maps
26+
import org.eclipse.jetty.http.HttpVersion
27+
import org.eclipse.jetty.server.{HttpConfiguration, HttpConnectionFactory, Server, ServerConnector, SslConnectionFactory}
2328
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
24-
import org.eclipse.jetty.util.thread.QueuedThreadPool
29+
import org.eclipse.jetty.util.thread.{QueuedThreadPool, ScheduledExecutorScheduler}
2530
import org.glassfish.jersey.media.multipart.MultiPartFeature
2631
import org.glassfish.jersey.server.ResourceConfig
2732
import org.glassfish.jersey.servlet.ServletContainer
33+
import scala.collection.JavaConverters._
34+
35+
import org.apache.spark.SparkConf
36+
import org.apache.spark.deploy.kubernetes.config._
37+
import org.apache.spark.internal.Logging
38+
import org.apache.spark.internal.config.{ConfigReader, SparkConfigProvider}
39+
import org.apache.spark.util.Utils
2840

2941
private[spark] class ResourceStagingServer(
3042
port: Int,
31-
serviceInstance: ResourceStagingService) {
43+
serviceInstance: ResourceStagingService,
44+
sslOptionsProvider: ResourceStagingServerSslOptionsProvider) extends Logging {
3245

3346
private var jettyServer: Option[Server] = None
3447

@@ -45,17 +58,72 @@ private[spark] class ResourceStagingServer(
4558
contextHandler.setContextPath("/api/")
4659
contextHandler.addServlet(servletHolder, "/*")
4760
threadPool.setDaemon(true)
61+
val resolvedConnectionFactories = sslOptionsProvider.getSslOptions
62+
.createJettySslContextFactory()
63+
.map(sslFactory => {
64+
val sslConnectionFactory = new SslConnectionFactory(
65+
sslFactory, HttpVersion.HTTP_1_1.asString())
66+
val rawHttpConfiguration = new HttpConfiguration()
67+
rawHttpConfiguration.setSecureScheme("https")
68+
rawHttpConfiguration.setSecurePort(port)
69+
val rawHttpConnectionFactory = new HttpConnectionFactory(rawHttpConfiguration)
70+
Array(sslConnectionFactory, rawHttpConnectionFactory)
71+
}).getOrElse(Array(new HttpConnectionFactory()))
4872
val server = new Server(threadPool)
49-
val connector = new ServerConnector(server)
73+
val connector = new ServerConnector(
74+
server,
75+
null,
76+
// Call this full constructor to set this, which forces daemon threads:
77+
new ScheduledExecutorScheduler("DependencyServer-Executor", true),
78+
null,
79+
-1,
80+
-1,
81+
resolvedConnectionFactories: _*)
5082
connector.setPort(port)
5183
server.addConnector(connector)
5284
server.setHandler(contextHandler)
5385
server.start()
5486
jettyServer = Some(server)
87+
logInfo(s"Resource staging server started on port $port.")
5588
}
5689

90+
def join(): Unit = jettyServer.foreach(_.join())
91+
5792
def stop(): Unit = synchronized {
5893
jettyServer.foreach(_.stop())
5994
jettyServer = None
6095
}
6196
}
97+
98+
object ResourceStagingServer {
99+
def main(args: Array[String]): Unit = {
100+
val sparkConf = new SparkConf(true)
101+
if (args.nonEmpty) {
102+
val propertiesFile = new File(args(0))
103+
if (!propertiesFile.isFile) {
104+
throw new IllegalArgumentException(s"Server properties file given at" +
105+
s" ${propertiesFile.getAbsoluteFile} does not exist or is not a file.")
106+
}
107+
val properties = new Properties
108+
Utils.tryWithResource(new FileInputStream(propertiesFile))(properties.load)
109+
val propertiesMap = Maps.fromProperties(properties)
110+
val configReader = new ConfigReader(new SparkConfigProvider(propertiesMap))
111+
propertiesMap.asScala.keys.foreach { key =>
112+
configReader.get(key).foreach(sparkConf.set(key, _))
113+
}
114+
}
115+
val dependenciesRootDir = Utils.createTempDir(namePrefix = "local-application-dependencies")
116+
val serviceInstance = new ResourceStagingServiceImpl(dependenciesRootDir)
117+
val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf)
118+
val server = new ResourceStagingServer(
119+
port = sparkConf.get(RESOURCE_STAGING_SERVER_PORT),
120+
serviceInstance = serviceInstance,
121+
sslOptionsProvider = sslOptionsProvider)
122+
server.start()
123+
try {
124+
server.join()
125+
} finally {
126+
server.stop()
127+
}
128+
}
129+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.rest.kubernetes.v2
18+
19+
import java.io.File
20+
21+
import com.google.common.base.Charsets
22+
import com.google.common.io.Files
23+
24+
import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SparkException, SSLOptions}
25+
import org.apache.spark.deploy.kubernetes.config._
26+
import org.apache.spark.deploy.rest.kubernetes.v1.PemsToKeyStoreConverter
27+
import org.apache.spark.internal.Logging
28+
29+
private[spark] trait ResourceStagingServerSslOptionsProvider {
30+
def getSslOptions: SSLOptions
31+
}
32+
33+
private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: SparkConf)
34+
extends ResourceStagingServerSslOptionsProvider with Logging {
35+
def getSslOptions: SSLOptions = {
36+
val baseSslOptions = new SparkSecurityManager(sparkConf)
37+
.getSSLOptions("kubernetes.resourceStagingServer")
38+
val maybeKeyPem = sparkConf.get(RESOURCE_STAGING_SERVER_KEY_PEM)
39+
val maybeCertPem = sparkConf.get(RESOURCE_STAGING_SERVER_CERT_PEM)
40+
val maybeKeyStorePasswordFile = sparkConf.get(RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE)
41+
val maybeKeyPasswordFile = sparkConf.get(RESOURCE_STAGING_SERVER_KEYSTORE_KEY_PASSWORD_FILE)
42+
43+
logSslConfigurations(
44+
baseSslOptions, maybeKeyPem, maybeCertPem, maybeKeyStorePasswordFile, maybeKeyPasswordFile)
45+
46+
requireNandDefined(baseSslOptions.keyStore, maybeKeyPem,
47+
"Shouldn't provide both key PEM and keyStore files for TLS.")
48+
requireNandDefined(baseSslOptions.keyStore, maybeCertPem,
49+
"Shouldn't provide both certificate PEM and keyStore files for TLS.")
50+
requireNandDefined(baseSslOptions.keyStorePassword, maybeKeyStorePasswordFile,
51+
"Shouldn't provide both the keyStore password value and the keyStore password file.")
52+
requireNandDefined(baseSslOptions.keyPassword, maybeKeyPasswordFile,
53+
"Shouldn't provide both the keyStore key password value and the keyStore key password file.")
54+
requireBothOrNeitherDefined(
55+
maybeKeyPem,
56+
maybeCertPem,
57+
"When providing a certificate PEM file, the key PEM file must also be provided.",
58+
"When providing a key PEM file, the certificate PEM file must also be provided.")
59+
60+
val resolvedKeyStorePassword = baseSslOptions.keyStorePassword
61+
.orElse(maybeKeyStorePasswordFile.map { keyStorePasswordFile =>
62+
safeFileToString(keyStorePasswordFile, "KeyStore password file")
63+
})
64+
val resolvedKeyStoreKeyPassword = baseSslOptions.keyPassword
65+
.orElse(maybeKeyPasswordFile.map { keyPasswordFile =>
66+
safeFileToString(keyPasswordFile, "KeyStore key password file")
67+
})
68+
val resolvedKeyStore = baseSslOptions.keyStore
69+
.orElse(maybeKeyPem.map { keyPem =>
70+
val keyPemFile = new File(keyPem)
71+
val certPemFile = new File(maybeCertPem.get)
72+
PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile(
73+
keyPemFile,
74+
certPemFile,
75+
"key",
76+
resolvedKeyStorePassword,
77+
resolvedKeyStoreKeyPassword,
78+
baseSslOptions.keyStoreType)
79+
})
80+
baseSslOptions.copy(
81+
keyStore = resolvedKeyStore,
82+
keyStorePassword = resolvedKeyStorePassword,
83+
keyPassword = resolvedKeyStoreKeyPassword)
84+
}
85+
86+
private def logSslConfigurations(
87+
baseSslOptions: SSLOptions,
88+
maybeKeyPem: Option[String],
89+
maybeCertPem: Option[String],
90+
maybeKeyStorePasswordFile: Option[String],
91+
maybeKeyPasswordFile: Option[String]) = {
92+
logDebug("The following SSL configurations were provided for the resource staging server:")
93+
logDebug(s"KeyStore File: ${baseSslOptions.keyStore.map(_.getAbsolutePath).getOrElse("N/A")}")
94+
logDebug("KeyStore Password: " +
95+
baseSslOptions.keyStorePassword.map(_ => "<present_but_redacted>").getOrElse("N/A"))
96+
logDebug(s"KeyStore Password File: ${maybeKeyStorePasswordFile.getOrElse("N/A")}")
97+
logDebug("Key Password: " +
98+
baseSslOptions.keyPassword.map(_ => "<present_but_redacted>").getOrElse("N/A"))
99+
logDebug(s"Key Password File: ${maybeKeyPasswordFile.getOrElse("N/A")}")
100+
logDebug(s"KeyStore Type: ${baseSslOptions.keyStoreType.getOrElse("N/A")}")
101+
logDebug(s"Key PEM: ${maybeKeyPem.getOrElse("N/A")}")
102+
logDebug(s"Certificate PEM: ${maybeCertPem.getOrElse("N/A")}")
103+
}
104+
105+
private def requireBothOrNeitherDefined(
106+
opt1: Option[_],
107+
opt2: Option[_],
108+
errMessageWhenFirstIsMissing: String,
109+
errMessageWhenSecondIsMissing: String): Unit = {
110+
requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
111+
requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
112+
}
113+
114+
private def requireSecondIfFirstIsDefined(
115+
opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = {
116+
opt1.foreach { _ =>
117+
require(opt2.isDefined, errMessageWhenSecondIsMissing)
118+
}
119+
}
120+
121+
private def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
122+
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
123+
}
124+
125+
private def safeFileToString(filePath: String, fileType: String): String = {
126+
val file = new File(filePath)
127+
if (!file.isFile) {
128+
throw new SparkException(s"$fileType provided at ${file.getAbsolutePath} does not exist or"
129+
+ s" is not a file.")
130+
}
131+
Files.toString(file, Charsets.UTF_8)
132+
}
133+
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/RetrofitUtils.scala

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,50 @@
1616
*/
1717
package org.apache.spark.deploy.rest.kubernetes.v2
1818

19+
import java.io.FileInputStream
20+
import java.security.{KeyStore, SecureRandom}
21+
import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}
22+
1923
import com.fasterxml.jackson.databind.ObjectMapper
2024
import com.fasterxml.jackson.module.scala.DefaultScalaModule
25+
import okhttp3.OkHttpClient
2126
import retrofit2.Retrofit
2227
import retrofit2.converter.jackson.JacksonConverterFactory
2328
import retrofit2.converter.scalars.ScalarsConverterFactory
2429

30+
import org.apache.spark.SSLOptions
31+
import org.apache.spark.util.Utils
32+
2533
private[spark] object RetrofitUtils {
2634

2735
private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule)
36+
private val SECURE_RANDOM = new SecureRandom()
2837

29-
def createRetrofitClient[T](baseUrl: String, serviceType: Class[T]): T = {
38+
def createRetrofitClient[T](baseUrl: String, serviceType: Class[T], sslOptions: SSLOptions): T = {
39+
val okHttpClientBuilder = new OkHttpClient.Builder()
40+
sslOptions.trustStore.foreach { trustStoreFile =>
41+
require(trustStoreFile.isFile, s"TrustStore provided at ${trustStoreFile.getAbsolutePath}"
42+
+ " does not exist, or is not a file.")
43+
val trustStoreType = sslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType)
44+
val trustStore = KeyStore.getInstance(trustStoreType)
45+
val trustStorePassword = sslOptions.trustStorePassword.map(_.toCharArray).orNull
46+
Utils.tryWithResource(new FileInputStream(trustStoreFile)) {
47+
trustStore.load(_, trustStorePassword)
48+
}
49+
val trustManagerFactory = TrustManagerFactory.getInstance(
50+
TrustManagerFactory.getDefaultAlgorithm)
51+
trustManagerFactory.init(trustStore)
52+
val trustManagers = trustManagerFactory.getTrustManagers
53+
val sslContext = SSLContext.getInstance("TLSv1.2")
54+
sslContext.init(null, trustManagers, SECURE_RANDOM)
55+
okHttpClientBuilder.sslSocketFactory(sslContext.getSocketFactory,
56+
trustManagers(0).asInstanceOf[X509TrustManager])
57+
}
3058
new Retrofit.Builder()
3159
.baseUrl(baseUrl)
3260
.addConverterFactory(ScalarsConverterFactory.create())
3361
.addConverterFactory(JacksonConverterFactory.create(OBJECT_MAPPER))
62+
.client(okHttpClientBuilder.build())
3463
.build()
3564
.create(serviceType)
3665
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.spark.deploy.kubernetes.integrationtest.sslutil
17+
package org.apache.spark.deploy.kubernetes
1818

1919
import java.io.{File, FileOutputStream, OutputStreamWriter}
2020
import java.math.BigInteger

0 commit comments

Comments
 (0)