This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Closed pull request: changes from all commits.
@@ -107,7 +107,8 @@ private[spark] class Client(
       .withType("Opaque")
       .done()
     try {
-      val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(kubernetesClient,
+      val (sslArgs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(
+        kubernetesClient,
         driverSubmitSslOptions,
         isKeyStoreLocalFile)
       try {
@@ -129,7 +130,7 @@
         sslSecrets,
         sslVolumes,
         sslVolumeMounts,
-        sslEnvs,
+        sslArgs,
         isKeyStoreLocalFile)
       val ownerReferenceConfiguredDriverService = try {
         configureOwnerReferences(
@@ -237,7 +238,7 @@ private[spark] class Client(
       sslSecrets: Array[Secret],
       sslVolumes: Array[Volume],
       sslVolumeMounts: Array[VolumeMount],
-      sslEnvs: Array[EnvVar],
+      sslArgs: Array[String],
       isKeyStoreLocalFile: Boolean): (Pod, Service) = {
     val endpointsReadyFuture = SettableFuture.create[Endpoints]
     val endpointsReadyWatcher = new DriverEndpointsReadyWatcher(endpointsReadyFuture)
@@ -274,7 +275,7 @@ private[spark] class Client(
           driverSubmitSslOptions,
           sslVolumes,
           sslVolumeMounts,
-          sslEnvs)
+          sslArgs)
       } catch {
         case e: Throwable =>
           Utils.tryLogNonFatalError {
@@ -401,13 +402,21 @@ private[spark] class Client(
       driverSubmitSslOptions: SSLOptions,
       sslVolumes: Array[Volume],
       sslVolumeMounts: Array[VolumeMount],
-      sslEnvs: Array[EnvVar]) = {
+      sslArgs: Array[String]) = {
     val containerPorts = buildContainerPorts()
     val probePingHttpGet = new HTTPGetActionBuilder()
       .withScheme(if (driverSubmitSslOptions.enabled) "HTTPS" else "HTTP")
       .withPath("/v1/submissions/ping")
       .withNewPort(SUBMISSION_SERVER_PORT_NAME)
       .build()
+    val args = mutable.Buffer[String]()
+    args ++= Seq(
+      "bin/spark-class", "org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer",

Review thread on the "bin/spark-class" line:
  Reviewer: The command bin/spark-class should be in the Dockerfile, yes?
  Author: Yes, that's similar to how Mesos works as well (although there is a bit more path wrangling).
"--hostname", "$HOSTNAME",
"--port", SUBMISSION_SERVER_PORT.toString,
"--secret-file", SUBMISSION_SERVER_PORT.toString
)

kubernetesClient.pods().createNew()
.withNewMetadata()
.withName(kubernetesAppId)
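
A minimal sketch of the reviewer's suggestion above, as a hypothetical driver Dockerfile; the base image and ADD lines are illustrative, not taken from this change:

    # Hypothetical driver image: the launcher and main class are pinned here.
    FROM openjdk:8-jre
    ADD bin /opt/spark/bin
    ADD jars /opt/spark/jars
    ADD conf /opt/spark/conf
    ENV SPARK_HOME /opt/spark
    WORKDIR /opt/spark
    # Exec-form ENTRYPOINT: Kubernetes appends the container args after it, so
    # the submission client would pass only the dynamic flags (--hostname,
    # --port, --secret-file, and the SSL arguments).
    ENTRYPOINT ["bin/spark-class", \
        "org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer"]

Under that split, the args buffer built above would start at "--hostname" rather than at "bin/spark-class".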
@@ -427,6 +436,7 @@
         .withName(DRIVER_CONTAINER_NAME)
         .withImage(driverDockerImage)
         .withImagePullPolicy("IfNotPresent")
+        .withArgs(args.toArray ++ sslArgs: _*)
         .addNewVolumeMount()
           .withName(SUBMISSION_APP_SECRET_VOLUME_NAME)
           .withMountPath(secretDirectory)
@@ -441,7 +451,6 @@
             .withName(ENV_SUBMISSION_SERVER_PORT)
             .withValue(SUBMISSION_SERVER_PORT.toString)
             .endEnv()
-          .addToEnv(sslEnvs: _*)
           .withPorts(containerPorts.asJava)
           .withNewReadinessProbe().withHttpGet(probePingHttpGet).endReadinessProbe()
           .endContainer()
@@ -528,12 +537,14 @@ private[spark] class Client(
     (securityManager.getSSLOptions(KUBERNETES_SUBMIT_SSL_NAMESPACE), isLocalKeyStore)
   }

-  private def configureSsl(kubernetesClient: KubernetesClient, driverSubmitSslOptions: SSLOptions,
-      isKeyStoreLocalFile: Boolean):
-      (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
+  private def configureSsl(
+      kubernetesClient: KubernetesClient,
+      driverSubmitSslOptions: SSLOptions,
+      isKeyStoreLocalFile: Boolean):
+      (Array[String], Array[Volume], Array[VolumeMount], Array[Secret]) = {
     if (driverSubmitSslOptions.enabled) {
       val sslSecretsMap = mutable.HashMap[String, String]()
-      val sslEnvs = mutable.Buffer[EnvVar]()
+      val sslArgs = mutable.Buffer[String]()
       val secrets = mutable.Buffer[Secret]()
       driverSubmitSslOptions.keyStore.foreach(store => {
         val resolvedKeyStoreFile = if (isKeyStoreLocalFile) {
@@ -548,37 +559,26 @@
         } else {
           store.getAbsolutePath
         }
-        sslEnvs += new EnvVarBuilder()
-          .withName(ENV_SUBMISSION_KEYSTORE_FILE)
-          .withValue(resolvedKeyStoreFile)
-          .build()
+        sslArgs ++= Seq("--keystore-file", resolvedKeyStoreFile)
       })
       driverSubmitSslOptions.keyStorePassword.foreach(password => {
         val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8))
         sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME -> passwordBase64)
-        sslEnvs += new EnvVarBuilder()
-          .withName(ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE)
-          .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME")
-          .build()
+        sslArgs ++= Seq(
+          "--keystore-password-file",
+          s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME")
       })
       driverSubmitSslOptions.keyPassword.foreach(password => {
         val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8))
         sslSecretsMap += (SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME -> passwordBase64)
-        sslEnvs += new EnvVarBuilder()
-          .withName(ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE)
-          .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME")
-          .build()
+        sslArgs ++= Seq(
+          "--keystore-key-password-file",
+          s"$sslSecretsDirectory/$SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME")
       })
       driverSubmitSslOptions.keyStoreType.foreach(storeType => {
-        sslEnvs += new EnvVarBuilder()
-          .withName(ENV_SUBMISSION_KEYSTORE_TYPE)
-          .withValue(storeType)
-          .build()
+        sslArgs ++= Seq("--keystore-type", storeType)
       })
-      sslEnvs += new EnvVarBuilder()
-        .withName(ENV_SUBMISSION_USE_SSL)
-        .withValue("true")
-        .build()
+      sslArgs ++= Seq("--use-ssl", "true")
       val sslVolume = new VolumeBuilder()
         .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
         .withNewSecret()
@@ -598,9 +598,9 @@
         .withType("Opaque")
         .done()
       secrets += sslSecrets
-      (sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray)
+      (sslArgs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray)
     } else {
-      (Array[EnvVar](), Array[Volume](), Array[VolumeMount](), Array[Secret]())
+      (Array[String](), Array[Volume](), Array[VolumeMount](), Array[Secret]())
     }
   }

@@ -169,17 +169,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
     val executorCpuQuantity = new QuantityBuilder(false)
       .withAmount(executorCores)
       .build()
-    val requiredEnv = Seq(
-      (ENV_EXECUTOR_PORT, executorPort.toString),
-      (ENV_DRIVER_URL, driverUrl),
-      (ENV_EXECUTOR_CORES, executorCores),
-      (ENV_EXECUTOR_MEMORY, executorMemory),
-      (ENV_APPLICATION_ID, applicationId()),
-      (ENV_EXECUTOR_ID, executorId)
-    ).map(env => new EnvVarBuilder()
-      .withName(env._1)
-      .withValue(env._2)
-      .build())
     val requiredPorts = Seq(
       (EXECUTOR_PORT_NAME, executorPort),
       (BLOCK_MANAGER_PORT_NAME, blockmanagerPort))
@@ -189,6 +178,17 @@
         .withContainerPort(port._2)
         .build()
       })
+    val args = scala.collection.mutable.Buffer[String]()
+    args ++= Seq(
+      "${JAVA_HOME}/bin/java", s"-Dspark.executor.port=${executorPort.toString}",

Review thread on the "${JAVA_HOME}/bin/java" line:
  Reviewer: Similarly here - at least JAVA_HOME/bin/java, but I would like the Dockerfile to handle at least the specification of the main class as well. That being said, it's tricky to separate out the various parts of the java command in this context, seeing as the VM options have to come before the main class.
  Author: That's exactly what I wanted to avoid: if the Dockerfile specifies the main class, it becomes coupled to a particular Spark version.

+      s"-Xms$executorMemory", s"-Xmx$executorMemory",
+      "-cp", "${SPARK_HOME}/jars/\\*", "org.apache.spark.executor.CoarseGrainedExecutorBackend",
+      "--driver-url", driverUrl,
+      "--executor-id", executorId,
+      "--cores", executorCores,
+      "--app-id", applicationId(),
+      "--hostname", "$HOSTNAME"
+    )
     try {
       (executorKubernetesId, kubernetesClient.pods().createNew()
         .withNewMetadata()
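
For contrast, a sketch of the Dockerfile-side alternative the reviewer floats, assuming a hypothetical wrapper script baked into the executor image; the script name and the SPARK_EXECUTOR_JAVA_OPTS variable are invented for illustration:

    #!/bin/sh
    # entrypoint.sh (hypothetical): the image pins the main class, and the
    # scheduler injects VM options through one env var, so those options still
    # land before the main class on the java command line. Container args
    # (--driver-url, --executor-id, ...) arrive as "$@".
    exec "${JAVA_HOME}/bin/java" ${SPARK_EXECUTOR_JAVA_OPTS} \
        -cp "${SPARK_HOME}/jars/*" \
        org.apache.spark.executor.CoarseGrainedExecutorBackend "$@"

This would sidestep the ordering problem the reviewer raises, but, as the author replies, it couples the image to one main class and therefore to a particular Spark version; passing the whole command as container args keeps that choice in the scheduler code.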
@@ -208,13 +208,13 @@
           .withName(s"executor")
           .withImage(executorDockerImage)
           .withImagePullPolicy("IfNotPresent")
+          .withArgs(args.toArray: _*)
           .withNewResources()
             .addToRequests("memory", executorMemoryQuantity)
             .addToLimits("memory", executorMemoryLimitQuantity)
             .addToRequests("cpu", executorCpuQuantity)
             .addToLimits("cpu", executorCpuQuantity)
             .endResources()
-          .withEnv(requiredEnv.asJava)
           .withPorts(requiredPorts.asJava)
           .endContainer()
         .endSpec()
@@ -17,16 +17,4 @@ ADD conf /opt/spark/conf

 ENV SPARK_HOME /opt/spark

-WORKDIR /opt/spark
-
-CMD SSL_ARGS="" && \
-    if ! [ -z ${SPARK_SUBMISSION_USE_SSL+x} ]; then SSL_ARGS="$SSL_ARGS --use-ssl $SPARK_SUBMISSION_USE_SSL"; fi && \
-    if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-file $SPARK_SUBMISSION_KEYSTORE_FILE"; fi && \
-    if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_TYPE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-type $SPARK_SUBMISSION_KEYSTORE_TYPE"; fi && \
-    if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-password-file $SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE"; fi && \
-    if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-key-password-file $SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"; fi && \
-    exec bin/spark-class org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer \
-      --hostname $HOSTNAME \
-      --port $SPARK_SUBMISSION_SERVER_PORT \
-      --secret-file $SPARK_SUBMISSION_SECRET_LOCATION \
-      ${SSL_ARGS}
+WORKDIR /opt/spark
@@ -17,7 +17,4 @@ ADD conf /opt/spark/conf

 ENV SPARK_HOME /opt/spark

-WORKDIR /opt/spark
-
-# TODO support spark.executor.extraClassPath
-CMD exec ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp ${SPARK_HOME}/jars/\* org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $HOSTNAME
+WORKDIR /opt/spark