diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index 9eed9bfd2cd79..524d537d46cf4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -107,7 +107,8 @@ private[spark] class Client(
       .withType("Opaque")
       .done()
     try {
-      val (sslEnvs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(kubernetesClient,
+      val (sslArgs, sslVolumes, sslVolumeMounts, sslSecrets) = configureSsl(
+        kubernetesClient,
         driverSubmitSslOptions,
         isKeyStoreLocalFile)
       try {
@@ -129,7 +130,7 @@ private[spark] class Client(
           sslSecrets,
           sslVolumes,
           sslVolumeMounts,
-          sslEnvs,
+          sslArgs,
           isKeyStoreLocalFile)
         val ownerReferenceConfiguredDriverService = try {
           configureOwnerReferences(
@@ -237,7 +238,7 @@ private[spark] class Client(
      sslSecrets: Array[Secret],
      sslVolumes: Array[Volume],
      sslVolumeMounts: Array[VolumeMount],
-      sslEnvs: Array[EnvVar],
+      sslArgs: Array[String],
      isKeyStoreLocalFile: Boolean): (Pod, Service) = {
    val endpointsReadyFuture = SettableFuture.create[Endpoints]
    val endpointsReadyWatcher = new DriverEndpointsReadyWatcher(endpointsReadyFuture)
@@ -274,7 +275,7 @@ private[spark] class Client(
          driverSubmitSslOptions,
          sslVolumes,
          sslVolumeMounts,
-          sslEnvs)
+          sslArgs)
      } catch {
        case e: Throwable =>
          Utils.tryLogNonFatalError {
@@ -401,13 +402,21 @@ private[spark] class Client(
      driverSubmitSslOptions: SSLOptions,
      sslVolumes: Array[Volume],
      sslVolumeMounts: Array[VolumeMount],
-      sslEnvs: Array[EnvVar]) = {
+      sslArgs: Array[String]) = {
    val containerPorts = buildContainerPorts()
    val probePingHttpGet = new HTTPGetActionBuilder()
      .withScheme(if (driverSubmitSslOptions.enabled) "HTTPS" else "HTTP")
      .withPath("/v1/submissions/ping")
      .withNewPort(SUBMISSION_SERVER_PORT_NAME)
      .build()
+    val args = mutable.Buffer[String]()
+    args ++= Seq(
+      "bin/spark-class", "org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer",
+      "--hostname", "$HOSTNAME",
+      "--port", SUBMISSION_SERVER_PORT.toString,
+      "--secret-file", s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME" // assumes the submission secret is mounted at this path
+    )
+
    kubernetesClient.pods().createNew()
      .withNewMetadata()
        .withName(kubernetesAppId)
@@ -427,6 +436,7 @@ private[spark] class Client(
          .withName(DRIVER_CONTAINER_NAME)
          .withImage(driverDockerImage)
          .withImagePullPolicy("IfNotPresent")
+          .withArgs(args.toArray ++ sslArgs: _*)
          .addNewVolumeMount()
            .withName(SUBMISSION_APP_SECRET_VOLUME_NAME)
            .withMountPath(secretDirectory)
@@ -441,7 +451,6 @@ private[spark] class Client(
            .withName(ENV_SUBMISSION_SERVER_PORT)
            .withValue(SUBMISSION_SERVER_PORT.toString)
            .endEnv()
-          .addToEnv(sslEnvs: _*)
          .withPorts(containerPorts.asJava)
          .withNewReadinessProbe().withHttpGet(probePingHttpGet).endReadinessProbe()
          .endContainer()
@@ -528,12 +537,14 @@ private[spark] class Client(
    (securityManager.getSSLOptions(KUBERNETES_SUBMIT_SSL_NAMESPACE), isLocalKeyStore)
  }

-  private def configureSsl(kubernetesClient: KubernetesClient, driverSubmitSslOptions: SSLOptions,
-    isKeyStoreLocalFile: Boolean):
-    (Array[EnvVar], Array[Volume], Array[VolumeMount], Array[Secret]) = {
+  private def configureSsl(
+      kubernetesClient: KubernetesClient,
+      driverSubmitSslOptions: SSLOptions,
+      isKeyStoreLocalFile: Boolean):
+      (Array[String], Array[Volume], Array[VolumeMount], Array[Secret]) = {
    if (driverSubmitSslOptions.enabled) {
      val sslSecretsMap = mutable.HashMap[String, String]()
-      val sslEnvs = mutable.Buffer[EnvVar]()
+      val sslArgs = mutable.Buffer[String]()
      val secrets = mutable.Buffer[Secret]()
      driverSubmitSslOptions.keyStore.foreach(store => {
        val resolvedKeyStoreFile = if (isKeyStoreLocalFile) {
@@ -548,37 +559,26 @@ private[spark] class Client(
        } else {
          store.getAbsolutePath
        }
-        sslEnvs += new EnvVarBuilder()
-          .withName(ENV_SUBMISSION_KEYSTORE_FILE)
-          .withValue(resolvedKeyStoreFile)
-          .build()
+        sslArgs ++= Seq("--keystore-file", resolvedKeyStoreFile)
      })
      driverSubmitSslOptions.keyStorePassword.foreach(password => {
        val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8))
        sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME -> passwordBase64)
-        sslEnvs += new EnvVarBuilder()
-          .withName(ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE)
-          .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME")
-          .build()
+        sslArgs ++= Seq(
+          "--keystore-password-file",
+          s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME")
      })
      driverSubmitSslOptions.keyPassword.foreach(password => {
        val passwordBase64 = Base64.encodeBase64String(password.getBytes(Charsets.UTF_8))
        sslSecretsMap += (SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME -> passwordBase64)
-        sslEnvs += new EnvVarBuilder()
-          .withName(ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE)
-          .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME")
-          .build()
+        sslArgs ++= Seq(
+          "--keystore-key-password-file",
+          s"$sslSecretsDirectory/$SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME")
      })
      driverSubmitSslOptions.keyStoreType.foreach(storeType => {
-        sslEnvs += new EnvVarBuilder()
-          .withName(ENV_SUBMISSION_KEYSTORE_TYPE)
-          .withValue(storeType)
-          .build()
+        sslArgs ++= Seq("--keystore-type", storeType)
      })
-      sslEnvs += new EnvVarBuilder()
-        .withName(ENV_SUBMISSION_USE_SSL)
-        .withValue("true")
-        .build()
+      sslArgs ++= Seq("--use-ssl", "true")
      val sslVolume = new VolumeBuilder()
        .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
        .withNewSecret()
@@ -598,9 +598,9 @@ private[spark] class Client(
        .withType("Opaque")
        .done()
      secrets += sslSecrets
-      (sslEnvs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray)
+      (sslArgs.toArray, Array(sslVolume), Array(sslVolumeMount), secrets.toArray)
    } else {
-      (Array[EnvVar](), Array[Volume](), Array[VolumeMount](), Array[Secret]())
+      (Array[String](), Array[Volume](), Array[VolumeMount](), Array[Secret]())
    }
  }

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 550ddd113fa42..64e87f4e61ef1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -169,17 +169,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
    val executorCpuQuantity = new QuantityBuilder(false)
      .withAmount(executorCores)
      .build()
-    val requiredEnv = Seq(
-      (ENV_EXECUTOR_PORT, executorPort.toString),
-      (ENV_DRIVER_URL, driverUrl),
-      (ENV_EXECUTOR_CORES, executorCores),
-      (ENV_EXECUTOR_MEMORY, executorMemory),
-      (ENV_APPLICATION_ID, applicationId()),
-      (ENV_EXECUTOR_ID, executorId)
-    ).map(env => new EnvVarBuilder()
-      .withName(env._1)
-      .withValue(env._2)
-      .build())
    val requiredPorts = Seq(
      (EXECUTOR_PORT_NAME, executorPort),
      (BLOCK_MANAGER_PORT_NAME, blockmanagerPort))
@@ -189,6 +178,17 @@ private[spark] class KubernetesClusterSchedulerBackend(
        .withContainerPort(port._2)
        .build()
    })
+    val args = scala.collection.mutable.Buffer[String]()
+    args ++= Seq(
+      "${JAVA_HOME}/bin/java", s"-Dspark.executor.port=$executorPort",
+      s"-Xms$executorMemory", s"-Xmx$executorMemory",
+      "-cp", "${SPARK_HOME}/jars/\\*", "org.apache.spark.executor.CoarseGrainedExecutorBackend",
+      "--driver-url", driverUrl,
+      "--executor-id", executorId,
+      "--cores", executorCores,
+      "--app-id", applicationId(),
+      "--hostname", "$HOSTNAME"
+    )
    try {
      (executorKubernetesId, kubernetesClient.pods().createNew()
        .withNewMetadata()
@@ -208,13 +208,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
          .withName(s"executor")
          .withImage(executorDockerImage)
          .withImagePullPolicy("IfNotPresent")
+          .withArgs(args.toArray: _*)
          .withNewResources()
            .addToRequests("memory", executorMemoryQuantity)
            .addToLimits("memory", executorMemoryLimitQuantity)
            .addToRequests("cpu", executorCpuQuantity)
            .addToLimits("cpu", executorCpuQuantity)
            .endResources()
-          .withEnv(requiredEnv.asJava)
          .withPorts(requiredPorts.asJava)
          .endContainer()
        .endSpec()
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
index 92fdfb8ac5f41..28e43355ce3b6 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
@@ -17,16 +17,4 @@
 ADD conf /opt/spark/conf

 ENV SPARK_HOME /opt/spark
-WORKDIR /opt/spark
-
-CMD SSL_ARGS="" && \
-    if ! [ -z ${SPARK_SUBMISSION_USE_SSL+x} ]; then SSL_ARGS="$SSL_ARGS --use-ssl $SPARK_SUBMISSION_USE_SSL"; fi && \
-    if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-file $SPARK_SUBMISSION_KEYSTORE_FILE"; fi && \
-    if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_TYPE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-type $SPARK_SUBMISSION_KEYSTORE_TYPE"; fi && \
-    if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-password-file $SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE"; fi && \
-    if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-key-password-file $SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"; fi && \
-    exec bin/spark-class org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer \
-      --hostname $HOSTNAME \
-      --port $SPARK_SUBMISSION_SERVER_PORT \
-      --secret-file $SPARK_SUBMISSION_SECRET_LOCATION \
-      ${SSL_ARGS}
+WORKDIR /opt/spark
\ No newline at end of file
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
index a225110d55c14..c7df91ed50431 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
@@ -17,7 +17,4 @@
 ADD conf /opt/spark/conf

 ENV SPARK_HOME /opt/spark
-WORKDIR /opt/spark
-
-# TODO support spark.executor.extraClassPath
-CMD exec ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp ${SPARK_HOME}/jars/\* org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $HOSTNAME
+WORKDIR /opt/spark
\ No newline at end of file
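One behavioral note on moving the shell CMD lines into container args: Kubernetes only substitutes references written as $(VAR), and only for variables declared in the container spec. Bare $HOSTNAME, ${JAVA_HOME}, and ${SPARK_HOME} tokens in withArgs(...) reach the process verbatim unless the image's entrypoint still routes the args through a shell, as the removed CMD lines did. A minimal sketch of the declared-variable pattern with the fabric8 client follows; the pod name, image, container name, and EXECUTOR_HOSTNAME variable are hypothetical, not part of this change:

import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

// Sketch only: $(EXECUTOR_HOSTNAME) is expanded by the kubelet because the
// variable is declared on the container; a bare "$HOSTNAME" would be passed
// through to the process as a literal string.
val pod: Pod = new PodBuilder()
  .withNewMetadata()
    .withName("arg-expansion-demo") // hypothetical pod name
    .endMetadata()
  .withNewSpec()
    .addNewContainer()
      .withName("demo")
      .withImage("example/driver:latest") // hypothetical image
      .addNewEnv()
        .withName("EXECUTOR_HOSTNAME")
        .withNewValueFrom()
          .withNewFieldRef()
            .withFieldPath("metadata.name") // a pod's hostname defaults to its name
            .endFieldRef()
          .endValueFrom()
        .endEnv()
      .withArgs("--hostname", "$(EXECUTOR_HOSTNAME)")
      .endContainer()
    .endSpec()
  .build()

Since the driver and executor Dockerfiles no longer define a CMD, the env-style tokens in the driver and executor args above presumably rely on the images' entrypoints invoking a shell; otherwise they arrive unexpanded.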