Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 36b6775

Browse files
committed
Address some missed comments
1 parent 0c198cb commit 36b6775

File tree

2 files changed

+95
-81
lines changed

2 files changed

+95
-81
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ private[spark] class Client(
161161
driverServiceManager.handleSubmissionError(
162162
new SparkException("Submission shutting down early...")))
163163
try {
164-
val sslConfigurationProvider = new SslConfigurationProvider(
164+
val sslConfigurationProvider = new DriverSubmitSslConfigurationProvider(
165165
sparkConf, kubernetesAppId, kubernetesClient, kubernetesResourceCleaner)
166166
val submitServerSecret = kubernetesClient.secrets().createNew()
167167
.withNewMetadata()
@@ -182,7 +182,7 @@ private[spark] class Client(
182182
configureOwnerReferences(
183183
kubernetesClient,
184184
submitServerSecret,
185-
sslConfiguration.sslSecrets,
185+
sslConfiguration.sslSecret,
186186
driverPod,
187187
driverService)
188188
submitApplicationToDriverServer(
@@ -233,13 +233,13 @@ private[spark] class Client(
233233
}
234234

235235
private def submitApplicationToDriverServer(
236-
kubernetesClient: KubernetesClient,
237-
driverServiceManager: DriverServiceManager,
238-
sslConfiguration: SslConfiguration,
239-
driverService: Service,
240-
submitterLocalFiles: Iterable[String],
241-
submitterLocalJars: Iterable[String],
242-
driverPodKubernetesCredentials: KubernetesCredentials): Unit = {
236+
kubernetesClient: KubernetesClient,
237+
driverServiceManager: DriverServiceManager,
238+
sslConfiguration: DriverSubmitSslConfiguration,
239+
driverService: Service,
240+
submitterLocalFiles: Iterable[String],
241+
submitterLocalJars: Iterable[String],
242+
driverPodKubernetesCredentials: KubernetesCredentials): Unit = {
243243
sparkConf.getOption("spark.app.id").foreach { id =>
244244
logWarning(s"Warning: Provided app id in spark.app.id as $id will be" +
245245
s" overridden as $kubernetesAppId")
@@ -297,7 +297,7 @@ private[spark] class Client(
297297
customLabels: Map[String, String],
298298
customAnnotations: Map[String, String],
299299
submitServerSecret: Secret,
300-
sslConfiguration: SslConfiguration): (Pod, Service) = {
300+
sslConfiguration: DriverSubmitSslConfiguration): (Pod, Service) = {
301301
val driverKubernetesSelectors = (Map(
302302
SPARK_DRIVER_LABEL -> kubernetesAppId,
303303
SPARK_APP_ID_LABEL -> kubernetesAppId,
@@ -348,7 +348,7 @@ private[spark] class Client(
348348
private def configureOwnerReferences(
349349
kubernetesClient: KubernetesClient,
350350
submitServerSecret: Secret,
351-
sslSecrets: Option[Secret],
351+
sslSecret: Option[Secret],
352352
driverPod: Pod,
353353
driverService: Service): Service = {
354354
val driverPodOwnerRef = new OwnerReferenceBuilder()
@@ -358,7 +358,7 @@ private[spark] class Client(
358358
.withKind(driverPod.getKind)
359359
.withController(true)
360360
.build()
361-
sslSecrets.foreach(secret => {
361+
sslSecret.foreach(secret => {
362362
val updatedSecret = kubernetesClient.secrets().withName(secret.getMetadata.getName).edit()
363363
.editMetadata()
364364
.addToOwnerReferences(driverPodOwnerRef)
@@ -424,7 +424,7 @@ private[spark] class Client(
424424
driverKubernetesSelectors: Map[String, String],
425425
customAnnotations: Map[String, String],
426426
submitServerSecret: Secret,
427-
sslConfiguration: SslConfiguration): Pod = {
427+
sslConfiguration: DriverSubmitSslConfiguration): Pod = {
428428
val containerPorts = buildContainerPorts()
429429
val probePingHttpGet = new HTTPGetActionBuilder()
430430
.withScheme(if (sslConfiguration.enabled) "HTTPS" else "HTTP")
@@ -660,7 +660,7 @@ private[spark] class Client(
660660
kubernetesClient: KubernetesClient,
661661
driverServiceManager: DriverServiceManager,
662662
service: Service,
663-
sslConfiguration: SslConfiguration): KubernetesSparkRestApi = {
663+
sslConfiguration: DriverSubmitSslConfiguration): KubernetesSparkRestApi = {
664664
val serviceUris = driverServiceManager.getDriverServiceSubmissionServerUris(service)
665665
require(serviceUris.nonEmpty, "No uris found to contact the driver!")
666666
HttpClientUtil.createClient[KubernetesSparkRestApi](
Lines changed: 81 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -33,32 +33,44 @@ import org.apache.spark.deploy.rest.kubernetes.{KubernetesFileUtils, PemsToKeySt
3333
import org.apache.spark.util.Utils
3434

3535
/**
36-
* Raw SSL configuration as the user specified in SparkConf.
36+
* Raw SSL configuration as the user specified in SparkConf for setting up the driver
37+
* submission server.
3738
*/
38-
private case class SslConfigurationParameters(
39-
storeBasedSslOptions: SSLOptions,
40-
isKeyStoreLocalFile: Boolean,
41-
keyPem: Option[File],
42-
isKeyPemLocalFile: Boolean,
43-
serverCertPem: Option[File],
44-
isServerCertPemLocalFile: Boolean,
45-
clientCertPem: Option[File])
39+
private case class DriverSubmitSslConfigurationParameters(
40+
storeBasedSslOptions: SSLOptions,
41+
isKeyStoreLocalFile: Boolean,
42+
driverSubmitServerKeyPem: Option[File],
43+
isDriverSubmitKeyPemLocalFile: Boolean,
44+
driverSubmitServerCertPem: Option[File],
45+
isDriverSubmitServerCertPemLocalFile: Boolean,
46+
submissionClientCertPem: Option[File])
4647

4748
/**
48-
* Resolved from translating options provided in {@link SslConfigurationParameters} into
49-
* Kubernetes volumes, environment variables for the driver pod, client-side trust managers,
50-
* and the client-side SSL context.
49+
* Resolved from translating options provided in
50+
* {@link DriverSubmitSslConfigurationParameters} into Kubernetes volumes, environment variables
51+
* for the driver pod, client-side trust managers, and the client-side SSL context. This is used
52+
* for setting up the SSL connection for the submission server where the application local
53+
* dependencies and configuration is provided from.
5154
*/
52-
private[spark] case class SslConfiguration(
53-
enabled: Boolean,
54-
sslPodEnvVars: Array[EnvVar],
55-
sslPodVolume: Option[Volume],
56-
sslPodVolumeMount: Option[VolumeMount],
57-
sslSecrets: Option[Secret],
58-
driverSubmitClientTrustManager: Option[X509TrustManager],
59-
driverSubmitClientSslContext: SSLContext)
55+
private[spark] case class DriverSubmitSslConfiguration(
56+
enabled: Boolean,
57+
sslPodEnvVars: Array[EnvVar],
58+
sslPodVolume: Option[Volume],
59+
sslPodVolumeMount: Option[VolumeMount],
60+
sslSecret: Option[Secret],
61+
driverSubmitClientTrustManager: Option[X509TrustManager],
62+
driverSubmitClientSslContext: SSLContext)
6063

61-
private[spark] class SslConfigurationProvider(
64+
/**
65+
* Provides the SSL configuration for bootstrapping the driver pod to listen for the driver
66+
* submission over SSL, and then supply the client-side configuration for establishing the
67+
* SSL connection. This is done in two phases: first, interpreting the raw configuration
68+
* values from the SparkConf object; then second, converting the configuration into the
69+
* appropriate Kubernetes constructs, namely the volume and volume mount to add to the
70+
* driver pod, and the secret to create at the API server; and finally, constructing the
71+
* client-side trust manager and SSL context for sending the local dependencies.
72+
*/
73+
private[spark] class DriverSubmitSslConfigurationProvider(
6274
sparkConf: SparkConf,
6375
kubernetesAppId: String,
6476
kubernetesClient: KubernetesClient,
@@ -68,17 +80,17 @@ private[spark] class SslConfigurationProvider(
6880
private val sslSecretsDirectory = DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR +
6981
s"/$kubernetesAppId-ssl"
7082

71-
def getSslConfiguration(): SslConfiguration = {
72-
val driverSubmitSslOptions = parseDriverSubmitSslOptions()
73-
if (driverSubmitSslOptions.storeBasedSslOptions.enabled) {
74-
val storeBasedSslOptions = driverSubmitSslOptions.storeBasedSslOptions
83+
def getSslConfiguration(): DriverSubmitSslConfiguration = {
84+
val sslConfigurationParameters = parseSslConfigurationParameters()
85+
if (sslConfigurationParameters.storeBasedSslOptions.enabled) {
86+
val storeBasedSslOptions = sslConfigurationParameters.storeBasedSslOptions
7587
val keyStoreSecret = resolveFileToSecretMapping(
76-
driverSubmitSslOptions.isKeyStoreLocalFile,
88+
sslConfigurationParameters.isKeyStoreLocalFile,
7789
SUBMISSION_SSL_KEYSTORE_SECRET_NAME,
7890
storeBasedSslOptions.keyStore,
7991
"KeyStore")
8092
val keyStorePathEnv = resolveFilePathEnv(
81-
driverSubmitSslOptions.isKeyStoreLocalFile,
93+
sslConfigurationParameters.isKeyStoreLocalFile,
8294
ENV_SUBMISSION_KEYSTORE_FILE,
8395
SUBMISSION_SSL_KEYSTORE_SECRET_NAME,
8496
storeBasedSslOptions.keyStore)
@@ -109,25 +121,25 @@ private[spark] class SslConfigurationProvider(
109121
.build()
110122
})
111123
val keyPemSecret = resolveFileToSecretMapping(
112-
driverSubmitSslOptions.isKeyPemLocalFile,
124+
sslConfigurationParameters.isDriverSubmitKeyPemLocalFile,
113125
secretName = SUBMISSION_SSL_KEY_PEM_SECRET_NAME,
114126
secretType = "Key pem",
115-
secretFile = driverSubmitSslOptions.keyPem)
127+
secretFile = sslConfigurationParameters.driverSubmitServerKeyPem)
116128
val keyPemLocationEnv = resolveFilePathEnv(
117-
driverSubmitSslOptions.isKeyPemLocalFile,
129+
sslConfigurationParameters.isDriverSubmitKeyPemLocalFile,
118130
envName = ENV_SUBMISSION_KEY_PEM_FILE,
119131
secretName = SUBMISSION_SSL_KEY_PEM_SECRET_NAME,
120-
maybeFile = driverSubmitSslOptions.keyPem)
132+
maybeFile = sslConfigurationParameters.driverSubmitServerKeyPem)
121133
val certPemSecret = resolveFileToSecretMapping(
122-
driverSubmitSslOptions.isServerCertPemLocalFile,
134+
sslConfigurationParameters.isDriverSubmitServerCertPemLocalFile,
123135
secretName = SUBMISSION_SSL_CERT_PEM_SECRET_NAME,
124136
secretType = "Cert pem",
125-
secretFile = driverSubmitSslOptions.serverCertPem)
137+
secretFile = sslConfigurationParameters.driverSubmitServerCertPem)
126138
val certPemLocationEnv = resolveFilePathEnv(
127-
driverSubmitSslOptions.isServerCertPemLocalFile,
139+
sslConfigurationParameters.isDriverSubmitServerCertPemLocalFile,
128140
envName = ENV_SUBMISSION_CERT_PEM_FILE,
129141
secretName = SUBMISSION_SSL_CERT_PEM_SECRET_NAME,
130-
maybeFile = driverSubmitSslOptions.serverCertPem)
142+
maybeFile = sslConfigurationParameters.driverSubmitServerCertPem)
131143
val useSslEnv = new EnvVarBuilder()
132144
.withName(ENV_SUBMISSION_USE_SSL)
133145
.withValue("true")
@@ -164,8 +176,8 @@ private[spark] class SslConfigurationProvider(
164176
Array(useSslEnv) ++
165177
certPemLocationEnv
166178
val (driverSubmitClientTrustManager, driverSubmitClientSslContext) =
167-
buildSslConnectionConfiguration(driverSubmitSslOptions)
168-
SslConfiguration(
179+
buildSslConnectionConfiguration(sslConfigurationParameters)
180+
DriverSubmitSslConfiguration(
169181
true,
170182
allSslEnvs.toArray,
171183
Some(sslVolume),
@@ -174,7 +186,7 @@ private[spark] class SslConfigurationProvider(
174186
driverSubmitClientTrustManager,
175187
driverSubmitClientSslContext)
176188
} else {
177-
SslConfiguration(
189+
DriverSubmitSslConfiguration(
178190
false,
179191
Array[EnvVar](),
180192
None,
@@ -218,47 +230,48 @@ private[spark] class SslConfigurationProvider(
218230
}).toMap
219231
}
220232

221-
private def parseDriverSubmitSslOptions(): SslConfigurationParameters = {
233+
private def parseSslConfigurationParameters(): DriverSubmitSslConfigurationParameters = {
222234
val maybeKeyStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE)
223235
val maybeTrustStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE)
224236
val maybeKeyPem = sparkConf.get(DRIVER_SUBMIT_SSL_KEY_PEM)
225-
val maybeServerCertPem = sparkConf.get(DRIVER_SUBMIT_SSL_SERVER_CERT_PEM)
226-
val maybeClientCertPem = sparkConf.get(DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM)
237+
val maybeDriverSubmitServerCertPem = sparkConf.get(DRIVER_SUBMIT_SSL_SERVER_CERT_PEM)
238+
val maybeDriverSubmitClientCertPem = sparkConf.get(DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM)
227239
validatePemsDoNotConflictWithStores(
228240
maybeKeyStore,
229241
maybeTrustStore,
230242
maybeKeyPem,
231-
maybeServerCertPem,
232-
maybeClientCertPem)
243+
maybeDriverSubmitServerCertPem,
244+
maybeDriverSubmitClientCertPem)
233245
val resolvedSparkConf = sparkConf.clone()
234246
val (isLocalKeyStore, resolvedKeyStore) = resolveLocalFile(maybeKeyStore, "keyStore")
235247
resolvedKeyStore.foreach {
236248
resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE, _)
237249
}
238-
val (isLocalServerCertPem, resolvedServerCertPem): (Boolean, Option[String]) =
239-
resolveLocalFile(maybeServerCertPem, "server cert PEM")
250+
val (isLocalDriverSubmitServerCertPem, resolvedDriverSubmitServerCertPem) =
251+
resolveLocalFile(maybeDriverSubmitServerCertPem, "server cert PEM")
240252
val (isLocalKeyPem, resolvedKeyPem) = resolveLocalFile(maybeKeyPem, "key PEM")
241253
maybeTrustStore.foreach { trustStore =>
242254
require(KubernetesFileUtils.isUriLocalFile(trustStore), s"Invalid trustStore URI" +
243-
s"$trustStore; trustStore URI for submit server must have no scheme, or scheme file://")
255+
s" $trustStore; trustStore URI for submit server must have no scheme, or scheme file://")
244256
resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE,
245257
Utils.resolveURI(trustStore).getPath)
246258
}
247-
val clientCertPem: Option[String] = maybeClientCertPem.map { clientCert =>
248-
require(KubernetesFileUtils.isUriLocalFile(clientCert), "Invalid client certificate PEM URI" +
249-
s" $clientCert: client certificate URI must have no scheme, or scheme file://")
250-
Utils.resolveURI(clientCert).getPath
259+
val driverSubmitClientCertPem = maybeDriverSubmitClientCertPem.map { driverSubmitClientCert =>
260+
require(KubernetesFileUtils.isUriLocalFile(driverSubmitClientCert),
261+
"Invalid client certificate PEM URI $driverSubmitClientCert: client certificate URI must" +
262+
" have no scheme, or scheme file://")
263+
Utils.resolveURI(driverSubmitClientCert).getPath
251264
}
252265
val securityManager = new SparkSecurityManager(resolvedSparkConf)
253266
val storeBasedSslOptions = securityManager.getSSLOptions(DRIVER_SUBMIT_SSL_NAMESPACE)
254-
SslConfigurationParameters(
267+
DriverSubmitSslConfigurationParameters(
255268
storeBasedSslOptions,
256269
isLocalKeyStore,
257270
resolvedKeyPem.map(new File(_)),
258271
isLocalKeyPem,
259-
resolvedServerCertPem.map(new File(_)),
260-
isLocalServerCertPem,
261-
clientCertPem.map(new File(_)))
272+
resolvedDriverSubmitServerCertPem.map(new File(_)),
273+
isLocalDriverSubmitServerCertPem,
274+
driverSubmitClientCertPem.map(new File(_)))
262275
}
263276

264277
private def resolveLocalFile(file: Option[String],
@@ -275,22 +288,22 @@ private[spark] class SslConfigurationProvider(
275288
maybeKeyStore: Option[String],
276289
maybeTrustStore: Option[String],
277290
maybeKeyPem: Option[String],
278-
maybeServerCertPem: Option[String],
279-
maybeClientCertPem: Option[String]) = {
280-
maybeKeyPem.orElse(maybeServerCertPem).foreach { _ =>
291+
maybeDriverSubmitServerCertPem: Option[String],
292+
maybeSubmitClientCertPem: Option[String]) = {
293+
maybeKeyPem.orElse(maybeDriverSubmitServerCertPem).foreach { _ =>
281294
require(maybeKeyStore.isEmpty,
282295
"Cannot specify server PEM files and key store files; must specify only one or the other.")
283296
}
284297
maybeKeyPem.foreach { _ =>
285-
require(maybeServerCertPem.isDefined,
298+
require(maybeDriverSubmitServerCertPem.isDefined,
286299
"When specifying the key PEM file, the server certificate PEM file must also be provided.")
287300
}
288-
maybeServerCertPem.foreach { _ =>
301+
maybeDriverSubmitServerCertPem.foreach { _ =>
289302
require(maybeKeyPem.isDefined,
290303
"When specifying the server certificate PEM file, the key PEM file must also be provided.")
291304
}
292305
maybeTrustStore.foreach { _ =>
293-
require(maybeClientCertPem.isEmpty,
306+
require(maybeSubmitClientCertPem.isEmpty,
294307
"Cannot specify client cert file and truststore file; must specify only one or the other.")
295308
}
296309
}
@@ -300,24 +313,25 @@ private[spark] class SslConfigurationProvider(
300313
resolvedScheme == "file" || resolvedScheme == "local"
301314
}
302315

303-
private def buildSslConnectionConfiguration(driverSubmitSslOptions: SslConfigurationParameters):
304-
(Option[X509TrustManager], SSLContext) = {
305-
val maybeTrustStore = driverSubmitSslOptions.clientCertPem.map { certPem =>
316+
private def buildSslConnectionConfiguration(
317+
sslConfigurationParameters: DriverSubmitSslConfigurationParameters)
318+
: (Option[X509TrustManager], SSLContext) = {
319+
val maybeTrustStore = sslConfigurationParameters.submissionClientCertPem.map { certPem =>
306320
PemsToKeyStoreConverter.convertCertPemToTrustStore(
307321
certPem,
308-
driverSubmitSslOptions.storeBasedSslOptions.trustStoreType)
309-
}.orElse(driverSubmitSslOptions.storeBasedSslOptions.trustStore.map { trustStoreFile =>
322+
sslConfigurationParameters.storeBasedSslOptions.trustStoreType)
323+
}.orElse(sslConfigurationParameters.storeBasedSslOptions.trustStore.map { trustStoreFile =>
310324
if (!trustStoreFile.isFile) {
311325
throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" +
312326
s" does not exist or is not a file.")
313327
}
314328
val trustStore = KeyStore.getInstance(
315-
driverSubmitSslOptions
329+
sslConfigurationParameters
316330
.storeBasedSslOptions
317331
.trustStoreType
318332
.getOrElse(KeyStore.getDefaultType))
319333
Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream =>
320-
val trustStorePassword = driverSubmitSslOptions
334+
val trustStorePassword = sslConfigurationParameters
321335
.storeBasedSslOptions
322336
.trustStorePassword
323337
.map(_.toCharArray)

0 commit comments

Comments
 (0)