Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

/**
 * Validation helpers for pairs of interdependent optional configuration values.
 *
 * Each helper raises an `IllegalArgumentException` (via `require`) carrying the
 * supplied message when the expected relationship between the two options does
 * not hold; otherwise it is a no-op.
 */
private[spark] object OptionRequirements {

  /** Requires that the two options are either both defined or both empty. */
  def requireBothOrNeitherDefined(
      opt1: Option[_],
      opt2: Option[_],
      errMessageWhenFirstIsMissing: String,
      errMessageWhenSecondIsMissing: String): Unit = {
    requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
    requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
  }

  /** If `opt1` is defined, `opt2` must be defined too; no-op when `opt1` is empty. */
  def requireSecondIfFirstIsDefined(
      opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = {
    if (opt1.isDefined) {
      require(opt2.isDefined, errMessageWhenSecondIsMissing)
    }
  }

  /** Requires that at most one of the two options is defined (logical NAND). */
  def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
    if (opt1.isDefined) {
      require(opt2.isEmpty, errMessage)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ package object config extends Logging {
.createOptional

private[spark] val RESOURCE_STAGING_SERVER_SSL_NAMESPACE = "kubernetes.resourceStagingServer"
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE =
"kubernetes.resourceStagingServer.internal"
private[spark] val RESOURCE_STAGING_SERVER_CERT_PEM =
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.serverCertPem")
.doc("Certificate PEM file to use when having the resource staging server" +
Expand All @@ -370,47 +372,98 @@ package object config extends Logging {
.createOptional
// Client-side certificate for contacting the resource staging server from the
// submission client (external) and from the init-container (internal). The
// internal entry falls back to the external one when unset.
private[spark] val RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM =
  ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.clientCertPem")
    .doc("Certificate PEM file to use when the client contacts the resource staging server." +
      " This must strictly be a path to a file on the submitting machine's disk.")
    .stringConf
    .createOptional
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_CLIENT_CERT_PEM =
  ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.clientCertPem")
    .doc("Certificate PEM file to use when the init-container contacts the resource staging" +
      " server. If this is not provided, it defaults to the value of" +
      " spark.ssl.kubernetes.resourceStagingServer.clientCertPem. This can be a URI with" +
      " a scheme of local:// which denotes that the file is pre-mounted on the init-container's" +
      " disk. A URI without a scheme or a scheme of file:// will result in this file being" +
      " mounted from the submitting machine's disk as a secret into the pods.")
    .stringConf
    .createOptional

// Keystore credentials and SSL toggle for the resource staging server. The
// "internal" SSL toggle governs init-container traffic and falls back to the
// external setting when unset.
private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE =
  ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyStorePasswordFile")
    .doc("File containing the keystore password for the Kubernetes resource staging server.")
    .stringConf
    .createOptional

private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_KEY_PASSWORD_FILE =
  ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyPasswordFile")
    .doc("File containing the key password for the Kubernetes resource staging server.")
    .stringConf
    .createOptional

private[spark] val RESOURCE_STAGING_SERVER_SSL_ENABLED =
  ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.enabled")
    .doc("Whether or not to use SSL when communicating with the resource staging server.")
    .booleanConf
    .createOptional
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_SSL_ENABLED =
  ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.enabled")
    .doc("Whether or not to use SSL when communicating with the resource staging server from" +
      " the init-container. If this is not provided, defaults to" +
      " the value of spark.ssl.kubernetes.resourceStagingServer.enabled")
    .booleanConf
    .createOptional
// TrustStore settings for the resource staging server. Each "internal"
// (init-container) entry falls back to its external counterpart when unset.
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE =
  ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStore")
    .doc("File containing the trustStore to communicate with the Kubernetes dependency server." +
      " This must strictly be a path on the submitting machine's disk.")
    .stringConf
    .createOptional
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_FILE =
  ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.trustStore")
    .doc("File containing the trustStore to communicate with the Kubernetes dependency server" +
      " from the init-container. If this is not provided, defaults to the value of" +
      " spark.ssl.kubernetes.resourceStagingServer.trustStore. This can be a URI with a scheme" +
      " of local:// indicating that the trustStore is pre-mounted on the init-container's" +
      " disk. If no scheme, or a scheme of file:// is provided, this file is mounted from the" +
      " submitting machine's disk as a Kubernetes secret into the pods.")
    .stringConf
    .createOptional
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD =
  ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStorePassword")
    .doc("Password for the trustStore for communicating to the dependency server.")
    .stringConf
    .createOptional
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_PASSWORD =
  ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.trustStorePassword")
    .doc("Password for the trustStore for communicating to the dependency server from the" +
      " init-container. If this is not provided, defaults to" +
      " spark.ssl.kubernetes.resourceStagingServer.trustStorePassword.")
    .stringConf
    .createOptional
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE =
  ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStoreType")
    .doc("Type of trustStore for communicating with the dependency server.")
    .stringConf
    .createOptional
private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_TYPE =
  ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_INTERNAL_SSL_NAMESPACE.trustStoreType")
    .doc("Type of trustStore for communicating with the dependency server from the" +
      " init-container. If this is not provided, defaults to" +
      " spark.ssl.kubernetes.resourceStagingServer.trustStoreType")
    .stringConf
    .createOptional

// Driver and Init-Container parameters for submission v2
private[spark] val RESOURCE_STAGING_SERVER_URI =
  ConfigBuilder("spark.kubernetes.resourceStagingServer.uri")
    .doc("Base URI for the Spark resource staging server.")
    .stringConf
    .createOptional

private[spark] val RESOURCE_STAGING_SERVER_INTERNAL_URI =
  ConfigBuilder("spark.kubernetes.resourceStagingServer.internal.uri")
    .doc("Base URI for the Spark resource staging server when the init-containers access it for" +
      " downloading resources. If this is not provided, it defaults to the value provided in" +
      " spark.kubernetes.resourceStagingServer.uri, the URI that the submission client uses to" +
      " upload the resources from outside the cluster.")
    .stringConf
    .createOptional

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ package object constants {
private[spark] val INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY =
"downloadSubmittedFilesSecret"
private[spark] val INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY = "trustStore"
private[spark] val INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY = "ssl-certificate"
private[spark] val INIT_CONTAINER_CONFIG_MAP_KEY = "download-submitted-files"
private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME = "download-jars-volume"
private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME = "download-files"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
package org.apache.spark.deploy.kubernetes.submit.v2

import org.apache.spark.{SparkConf, SSLOptions}
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.rest.kubernetes.v2.RetrofitClientFactoryImpl
import org.apache.spark.util.Utils

/**
* Interface that wraps the provision of everything the submission client needs to set up the
Expand All @@ -47,10 +48,51 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
kubernetesAppId: String,
sparkJars: Seq[String],
sparkFiles: Seq[String],
resourceStagingServerSslOptions: SSLOptions)
resourceStagingServerExternalSslOptions: SSLOptions)
extends DriverInitContainerComponentsProvider {

private val maybeResourceStagingServerUri = sparkConf.get(RESOURCE_STAGING_SERVER_URI)
private val maybeResourceStagingServerInternalUri =
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_URI)
private val maybeResourceStagingServerInternalTrustStore =
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_FILE)
.orElse(sparkConf.get(RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE))
private val maybeResourceStagingServerInternalTrustStorePassword =
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_PASSWORD)
.orElse(sparkConf.get(RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD))
private val maybeResourceStagingServerInternalTrustStoreType =
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_TRUSTSTORE_TYPE)
.orElse(sparkConf.get(RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE))
private val maybeResourceStagingServerInternalClientCert =
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_CLIENT_CERT_PEM)
.orElse(sparkConf.get(RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM))
private val resourceStagingServerInternalSslEnabled =
sparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_SSL_ENABLED)
.orElse(sparkConf.get(RESOURCE_STAGING_SERVER_SSL_ENABLED))
.getOrElse(false)

OptionRequirements.requireNandDefined(
  maybeResourceStagingServerInternalClientCert,
  maybeResourceStagingServerInternalTrustStore,
  "Cannot provide both a certificate file and a trustStore file for init-containers to" +
    " use for contacting the resource staging server over TLS.")

// TLS files handed to the init-container may either be mounted from the submitting
// machine (no scheme or file://) or be pre-mounted on the init-container's disk
// (local://); any other scheme is rejected up front.
private def hasValidInitContainerScheme(uri: String): Boolean = {
  Option(Utils.resolveURI(uri).getScheme).getOrElse("file") match {
    case "file" | "local" => true
    case _ => false
  }
}

require(maybeResourceStagingServerInternalTrustStore.forall(hasValidInitContainerScheme),
  "TrustStore URI used for contacting the resource staging server from init containers must" +
  " have no scheme, or scheme file://, or scheme local://.")

require(maybeResourceStagingServerInternalClientCert.forall(hasValidInitContainerScheme),
  "Client cert file URI used for contacting the resource staging server from init containers" +
  " must have no scheme, or scheme file://, or scheme local://.")

private val jarsDownloadPath = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
private val filesDownloadPath = sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION)
private val maybeSecretName = maybeResourceStagingServerUri.map { _ =>
Expand All @@ -71,14 +113,20 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
filesResourceId <- maybeSubmittedResourceIds.map(_.filesResourceId)
} yield {
new SubmittedDependencyInitContainerConfigPluginImpl(
stagingServerUri,
// Configure the init-container with the internal URI over the external URI.
maybeResourceStagingServerInternalUri.getOrElse(stagingServerUri),
jarsResourceId,
filesResourceId,
INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH,
resourceStagingServerSslOptions)
INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
resourceStagingServerInternalSslEnabled,
maybeResourceStagingServerInternalTrustStore,
maybeResourceStagingServerInternalClientCert,
maybeResourceStagingServerInternalTrustStorePassword,
maybeResourceStagingServerInternalTrustStoreType,
INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH)
}
new SparkInitContainerConfigMapBuilderImpl(
sparkJars,
Expand Down Expand Up @@ -113,7 +161,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
stagingServerUri,
sparkJars,
sparkFiles,
resourceStagingServerSslOptions,
resourceStagingServerExternalSslOptions,
RetrofitClientFactoryImpl)
}
}
Expand All @@ -133,7 +181,9 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
resourceStagingServerSslOptions)
INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
maybeResourceStagingServerInternalTrustStore,
maybeResourceStagingServerInternalClientCert)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
*/
package org.apache.spark.deploy.kubernetes.submit.v2

import org.apache.spark.SSLOptions
import org.apache.spark.SparkException
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.internal.config.OptionalConfigEntry
import org.apache.spark.util.Utils

private[spark] trait SubmittedDependencyInitContainerConfigPlugin {
/**
Expand All @@ -34,36 +35,62 @@ private[spark] trait SubmittedDependencyInitContainerConfigPlugin {
}

/**
 * Builds the Spark configuration entries that tell the init-container how to fetch
 * submitted jars and files from the resource staging server, including where the
 * secret-mounted resource identifiers and TLS files live inside the pod.
 *
 * TLS file URIs with scheme local:// are treated as paths already present on the
 * init-container's disk; URIs with no scheme or scheme file:// resolve to the
 * secret volume mount path. Any other scheme raises a SparkException.
 */
private[spark] class SubmittedDependencyInitContainerConfigPluginImpl(
    internalResourceStagingServerUri: String,
    jarsResourceId: String,
    filesResourceId: String,
    jarsSecretKey: String,
    filesSecretKey: String,
    trustStoreSecretKey: String,
    clientCertSecretKey: String,
    resourceStagingServerSslEnabled: Boolean,
    maybeInternalTrustStoreUri: Option[String],
    maybeInternalClientCertUri: Option[String],
    maybeInternalTrustStorePassword: Option[String],
    maybeInternalTrustStoreType: Option[String],
    secretsVolumeMountPath: String)
  extends SubmittedDependencyInitContainerConfigPlugin {

  override def configurationsToFetchSubmittedDependencies(): Map[String, String] = {
    Map[String, String](
      RESOURCE_STAGING_SERVER_URI.key -> internalResourceStagingServerUri,
      INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER.key -> jarsResourceId,
      INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION.key ->
        s"$secretsVolumeMountPath/$jarsSecretKey",
      INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER.key -> filesResourceId,
      INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION.key ->
        s"$secretsVolumeMountPath/$filesSecretKey",
      RESOURCE_STAGING_SERVER_SSL_ENABLED.key -> resourceStagingServerSslEnabled.toString) ++
      resolveSecretPath(
        maybeInternalTrustStoreUri,
        trustStoreSecretKey,
        RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE,
        "TrustStore URI") ++
      resolveSecretPath(
        maybeInternalClientCertUri,
        clientCertSecretKey,
        RESOURCE_STAGING_SERVER_CLIENT_CERT_PEM,
        "Client certificate URI") ++
      maybeInternalTrustStorePassword.map { password =>
        (RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD.key, password)
      }.toMap ++
      maybeInternalTrustStoreType.map { storeType =>
        (RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE.key, storeType)
      }.toMap
  }

  /**
   * Maps an optional TLS file URI to the config entry pointing at its in-pod path.
   *
   * @param maybeUri URI of the file on the submitting machine or init-container disk
   * @param secretKey key of the secret entry holding the file when it is mounted
   * @param configEntry config entry to populate with the resolved in-pod path
   * @param uriType human-readable description of the URI, used in error messages
   * @return an empty map when `maybeUri` is empty, otherwise a single-entry map
   * @throws SparkException if the URI's scheme is not empty, file, or local
   */
  private def resolveSecretPath(
      maybeUri: Option[String],
      secretKey: String,
      configEntry: OptionalConfigEntry[String],
      uriType: String): Map[String, String] = {
    maybeUri.map(Utils.resolveURI).map { uri =>
      val resolvedPath = Option(uri.getScheme).getOrElse("file") match {
        case "file" => s"$secretsVolumeMountPath/$secretKey"
        case "local" => uri.getPath
        case invalid => throw new SparkException(s"$uriType has invalid scheme $invalid;" +
          s" must be local://, file://, or empty.")
      }
      (configEntry.key, resolvedPath)
    }.toMap
  }
}
Loading