diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index e403a6e8b927f..15f7a17857f1f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -288,4 +288,37 @@ package object config {
       .doc("Interval between reports of the current app status in cluster mode.")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("1s")
+
+  // Spark dependency server for submission v2
+
+  private[spark] val RESOURCE_STAGING_SERVER_PORT =
+    ConfigBuilder("spark.kubernetes.resourceStagingServer.port")
+      .doc("Port for the Kubernetes resource staging server to listen on.")
+      .intConf
+      .createWithDefault(10000)
+
+  private[spark] val RESOURCE_STAGING_SERVER_KEY_PEM =
+    ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.keyPem")
+      .doc("Key PEM file to use when having the Kubernetes dependency server listen on TLS.")
+      .stringConf
+      .createOptional
+
+  private[spark] val RESOURCE_STAGING_SERVER_CERT_PEM =
+    ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.serverCertPem")
+      .doc("Certificate PEM file to use when having the Kubernetes dependency server" +
+        " listen on TLS.")
+      .stringConf
+      .createOptional
+
+  private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE =
+    ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.keyStorePasswordFile")
+      .doc("File containing the keystore password for the Kubernetes dependency server.")
+      .stringConf
+      .createOptional
+
+  private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_KEY_PASSWORD_FILE =
+    ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.keyPasswordFile")
+      .doc("File containing the key password for the Kubernetes dependency server.")
+      .stringConf
+      .createOptional
 }
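The new entries use ConfigBuilder's typed accessors, so call sites read them without manual parsing. A minimal consumption sketch (not part of the patch; values shown are the default or purely illustrative):

    val conf = new SparkConf(true)
    val port: Int = conf.get(RESOURCE_STAGING_SERVER_PORT)                  // 10000 unless overridden
    val keyPem: Option[String] = conf.get(RESOURCE_STAGING_SERVER_KEY_PEM)  // None unless configured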
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/PemsToKeyStoreConverter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/PemsToKeyStoreConverter.scala
index da863a9fb48e2..2c68b150baf91 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/PemsToKeyStoreConverter.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/PemsToKeyStoreConverter.scala
@@ -60,11 +60,12 @@ private[spark] object PemsToKeyStoreConverter {
       privateKey,
       keyPassword.map(_.toCharArray).orNull,
       certificates)
-    val keyStoreOutputPath = Paths.get(s"keystore-${UUID.randomUUID()}.$resolvedKeyStoreType")
-    Utils.tryWithResource(new FileOutputStream(keyStoreOutputPath.toFile)) { storeStream =>
+    val keyStoreDir = Utils.createTempDir("temp-keystores")
+    val keyStoreFile = new File(keyStoreDir, s"keystore-${UUID.randomUUID()}.$resolvedKeyStoreType")
+    Utils.tryWithResource(new FileOutputStream(keyStoreFile)) { storeStream =>
       keyStore.store(storeStream, keyStorePassword.map(_.toCharArray).orNull)
     }
-    keyStoreOutputPath.toFile
+    keyStoreFile
   }
 
   def convertCertPemToTrustStore(
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServer.scala
index e09a788c45321..8ca13da545d5d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServer.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServer.scala
@@ -16,19 +16,32 @@
  */
 package org.apache.spark.deploy.rest.kubernetes.v2
 
+import java.io.{File, FileInputStream}
+import java.util.Properties
+
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.eclipse.jetty.server.{Server, ServerConnector}
+import com.google.common.collect.Maps
+import org.eclipse.jetty.http.HttpVersion
+import org.eclipse.jetty.server.{HttpConfiguration, HttpConnectionFactory, Server, ServerConnector, SslConnectionFactory}
 import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
-import org.eclipse.jetty.util.thread.QueuedThreadPool
+import org.eclipse.jetty.util.thread.{QueuedThreadPool, ScheduledExecutorScheduler}
 import org.glassfish.jersey.media.multipart.MultiPartFeature
 import org.glassfish.jersey.server.ResourceConfig
 import org.glassfish.jersey.servlet.ServletContainer
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.kubernetes.config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{ConfigReader, SparkConfigProvider}
+import org.apache.spark.util.Utils
 
 private[spark] class ResourceStagingServer(
     port: Int,
-    serviceInstance: ResourceStagingService) {
+    serviceInstance: ResourceStagingService,
+    sslOptionsProvider: ResourceStagingServerSslOptionsProvider) extends Logging {
 
   private var jettyServer: Option[Server] = None
 
@@ -45,17 +58,72 @@ private[spark] class ResourceStagingServer(
     contextHandler.setContextPath("/api/")
     contextHandler.addServlet(servletHolder, "/*")
     threadPool.setDaemon(true)
+    val resolvedConnectionFactories = sslOptionsProvider.getSslOptions
+      .createJettySslContextFactory()
+      .map(sslFactory => {
+        val sslConnectionFactory = new SslConnectionFactory(
+          sslFactory, HttpVersion.HTTP_1_1.asString())
+        val rawHttpConfiguration = new HttpConfiguration()
+        rawHttpConfiguration.setSecureScheme("https")
+        rawHttpConfiguration.setSecurePort(port)
+        val rawHttpConnectionFactory = new HttpConnectionFactory(rawHttpConfiguration)
+        Array(sslConnectionFactory, rawHttpConnectionFactory)
+      }).getOrElse(Array(new HttpConnectionFactory()))
     val server = new Server(threadPool)
-    val connector = new ServerConnector(server)
+    val connector = new ServerConnector(
+      server,
+      null,
+      // Use the full constructor so we can pass a scheduler, which forces daemon threads:
+      new ScheduledExecutorScheduler("DependencyServer-Executor", true),
+      null,
+      -1,
+      -1,
+      resolvedConnectionFactories: _*)
     connector.setPort(port)
     server.addConnector(connector)
     server.setHandler(contextHandler)
     server.start()
     jettyServer = Some(server)
+    logInfo(s"Resource staging server started on port $port.")
   }
 
+  def join(): Unit = jettyServer.foreach(_.join())
+
   def stop(): Unit = synchronized {
     jettyServer.foreach(_.stop())
     jettyServer = None
   }
 }
+
+object ResourceStagingServer {
+  def main(args: Array[String]): Unit = {
+    val sparkConf = new SparkConf(true)
+    if (args.nonEmpty) {
+      val propertiesFile = new File(args(0))
+      if (!propertiesFile.isFile) {
+        throw new IllegalArgumentException(s"Server properties file given at" +
+          s" ${propertiesFile.getAbsoluteFile} does not exist or is not a file.")
+      }
+      val properties = new Properties
+      Utils.tryWithResource(new FileInputStream(propertiesFile))(properties.load)
+      val propertiesMap = Maps.fromProperties(properties)
+      val configReader = new ConfigReader(new SparkConfigProvider(propertiesMap))
+      propertiesMap.asScala.keys.foreach { key =>
+        configReader.get(key).foreach(sparkConf.set(key, _))
+      }
+    }
+    val dependenciesRootDir = Utils.createTempDir(namePrefix = "local-application-dependencies")
+    val serviceInstance = new ResourceStagingServiceImpl(dependenciesRootDir)
+    val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf)
+    val server = new ResourceStagingServer(
+      port = sparkConf.get(RESOURCE_STAGING_SERVER_PORT),
+      serviceInstance = serviceInstance,
+      sslOptionsProvider = sslOptionsProvider)
+    server.start()
+    try {
+      server.join()
+    } finally {
+      server.stop()
+    }
+  }
+}
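The new main() reduces to: load an optional properties file given as args(0) into a SparkConf, then build and run the server until it stops. A condensed sketch of that lifecycle (assuming no properties-file argument; this is not additional patch code):

    val conf = new SparkConf(true)
    val server = new ResourceStagingServer(
      port = conf.get(RESOURCE_STAGING_SERVER_PORT),
      serviceInstance = new ResourceStagingServiceImpl(Utils.createTempDir()),
      sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(conf))
    server.start()
    try server.join() finally server.stop()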
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProvider.scala
new file mode 100644
index 0000000000000..2744ed0a74616
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProvider.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.rest.kubernetes.v2
+
+import java.io.File
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+
+import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SparkException, SSLOptions}
+import org.apache.spark.deploy.kubernetes.config._
+import org.apache.spark.deploy.rest.kubernetes.v1.PemsToKeyStoreConverter
+import org.apache.spark.internal.Logging
+
+private[spark] trait ResourceStagingServerSslOptionsProvider {
+  def getSslOptions: SSLOptions
+}
+
+private[spark] class ResourceStagingServerSslOptionsProviderImpl(sparkConf: SparkConf)
+  extends ResourceStagingServerSslOptionsProvider with Logging {
+  def getSslOptions: SSLOptions = {
+    val baseSslOptions = new SparkSecurityManager(sparkConf)
+      .getSSLOptions("kubernetes.resourceStagingServer")
+    val maybeKeyPem = sparkConf.get(RESOURCE_STAGING_SERVER_KEY_PEM)
+    val maybeCertPem = sparkConf.get(RESOURCE_STAGING_SERVER_CERT_PEM)
+    val maybeKeyStorePasswordFile = sparkConf.get(RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE)
+    val maybeKeyPasswordFile = sparkConf.get(RESOURCE_STAGING_SERVER_KEYSTORE_KEY_PASSWORD_FILE)
+
+    logSslConfigurations(
+      baseSslOptions, maybeKeyPem, maybeCertPem, maybeKeyStorePasswordFile, maybeKeyPasswordFile)
+
+    requireNandDefined(baseSslOptions.keyStore, maybeKeyPem,
+      "Shouldn't provide both key PEM and keyStore files for TLS.")
+    requireNandDefined(baseSslOptions.keyStore, maybeCertPem,
+      "Shouldn't provide both certificate PEM and keyStore files for TLS.")
+    requireNandDefined(baseSslOptions.keyStorePassword, maybeKeyStorePasswordFile,
+      "Shouldn't provide both the keyStore password value and the keyStore password file.")
+    requireNandDefined(baseSslOptions.keyPassword, maybeKeyPasswordFile,
+      "Shouldn't provide both the keyStore key password value and the keyStore key password file.")
+    requireBothOrNeitherDefined(
+      maybeKeyPem,
+      maybeCertPem,
+      "When providing a certificate PEM file, the key PEM file must also be provided.",
+      "When providing a key PEM file, the certificate PEM file must also be provided.")
+
+    val resolvedKeyStorePassword = baseSslOptions.keyStorePassword
+      .orElse(maybeKeyStorePasswordFile.map { keyStorePasswordFile =>
+        safeFileToString(keyStorePasswordFile, "KeyStore password file")
+      })
+    val resolvedKeyStoreKeyPassword = baseSslOptions.keyPassword
+      .orElse(maybeKeyPasswordFile.map { keyPasswordFile =>
+        safeFileToString(keyPasswordFile, "KeyStore key password file")
+      })
+    val resolvedKeyStore = baseSslOptions.keyStore
+      .orElse(maybeKeyPem.map { keyPem =>
+        val keyPemFile = new File(keyPem)
+        val certPemFile = new File(maybeCertPem.get)
+        PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile(
+          keyPemFile,
+          certPemFile,
+          "key",
+          resolvedKeyStorePassword,
+          resolvedKeyStoreKeyPassword,
+          baseSslOptions.keyStoreType)
+      })
+    baseSslOptions.copy(
+      keyStore = resolvedKeyStore,
+      keyStorePassword = resolvedKeyStorePassword,
+      keyPassword = resolvedKeyStoreKeyPassword)
+  }
+
+  private def logSslConfigurations(
+      baseSslOptions: SSLOptions,
+      maybeKeyPem: Option[String],
+      maybeCertPem: Option[String],
+      maybeKeyStorePasswordFile: Option[String],
+      maybeKeyPasswordFile: Option[String]) = {
+    logDebug("The following SSL configurations were provided for the resource staging server:")
+    logDebug(s"KeyStore File: ${baseSslOptions.keyStore.map(_.getAbsolutePath).getOrElse("N/A")}")
+    logDebug("KeyStore Password: " +
+      baseSslOptions.keyStorePassword.map(_ => "<present_but_redacted>").getOrElse("N/A"))
+    logDebug(s"KeyStore Password File: ${maybeKeyStorePasswordFile.getOrElse("N/A")}")
+    logDebug("Key Password: " +
+      baseSslOptions.keyPassword.map(_ => "<present_but_redacted>").getOrElse("N/A"))
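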
+    logDebug(s"Key Password File: ${maybeKeyPasswordFile.getOrElse("N/A")}")
+    logDebug(s"KeyStore Type: ${baseSslOptions.keyStoreType.getOrElse("N/A")}")
+    logDebug(s"Key PEM: ${maybeKeyPem.getOrElse("N/A")}")
+    logDebug(s"Certificate PEM: ${maybeCertPem.getOrElse("N/A")}")
+  }
+
+  private def requireBothOrNeitherDefined(
+      opt1: Option[_],
+      opt2: Option[_],
+      errMessageWhenFirstIsMissing: String,
+      errMessageWhenSecondIsMissing: String): Unit = {
+    requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
+    requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
+  }
+
+  private def requireSecondIfFirstIsDefined(
+      opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = {
+    opt1.foreach { _ =>
+      require(opt2.isDefined, errMessageWhenSecondIsMissing)
+    }
+  }
+
+  private def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
+    opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
+  }
+
+  private def safeFileToString(filePath: String, fileType: String): String = {
+    val file = new File(filePath)
+    if (!file.isFile) {
+      throw new SparkException(s"$fileType provided at ${file.getAbsolutePath} does not exist or" +
+        s" is not a file.")
+    }
+    Files.toString(file, Charsets.UTF_8)
+  }
+}
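Resolution precedence in getSslOptions: a keyStore configured through the standard spark.ssl options wins outright; otherwise a key/cert PEM pair is converted into a temporary keyStore via PemsToKeyStoreConverter, and the requireNandDefined checks reject configurations that supply both. A sketch of the PEM path (file paths are hypothetical, not from the patch):

    val conf = new SparkConf(true)
      .set("spark.ssl.kubernetes.resourceStagingServer.enabled", "true")
      .set("spark.ssl.kubernetes.resourceStagingServer.keyPem", "/opt/ssl/key.pem")
      .set("spark.ssl.kubernetes.resourceStagingServer.serverCertPem", "/opt/ssl/cert.pem")
    val sslOptions = new ResourceStagingServerSslOptionsProviderImpl(conf).getSslOptions
    // sslOptions.keyStore now points at a generated temporary keyStore holding the "key" alias

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/RetrofitUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/RetrofitUtils.scala
index c5c5c0d35b7cb..7416c624e97f6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/RetrofitUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/RetrofitUtils.scala
@@ -16,21 +16,50 @@
  */
 package org.apache.spark.deploy.rest.kubernetes.v2
 
+import java.io.FileInputStream
+import java.security.{KeyStore, SecureRandom}
+import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}
+
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import okhttp3.OkHttpClient
 import retrofit2.Retrofit
 import retrofit2.converter.jackson.JacksonConverterFactory
 import retrofit2.converter.scalars.ScalarsConverterFactory
 
+import org.apache.spark.SSLOptions
+import org.apache.spark.util.Utils
+
 private[spark] object RetrofitUtils {
 
   private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule)
+  private val SECURE_RANDOM = new SecureRandom()
 
-  def createRetrofitClient[T](baseUrl: String, serviceType: Class[T]): T = {
+  def createRetrofitClient[T](baseUrl: String, serviceType: Class[T], sslOptions: SSLOptions): T = {
+    val okHttpClientBuilder = new OkHttpClient.Builder()
+    sslOptions.trustStore.foreach { trustStoreFile =>
+      require(trustStoreFile.isFile, s"TrustStore provided at ${trustStoreFile.getAbsolutePath}" +
+        " does not exist, or is not a file.")
+      val trustStoreType = sslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType)
+      val trustStore = KeyStore.getInstance(trustStoreType)
+      val trustStorePassword = sslOptions.trustStorePassword.map(_.toCharArray).orNull
+      Utils.tryWithResource(new FileInputStream(trustStoreFile)) {
+        trustStore.load(_, trustStorePassword)
+      }
+      val trustManagerFactory = TrustManagerFactory.getInstance(
+        TrustManagerFactory.getDefaultAlgorithm)
+      trustManagerFactory.init(trustStore)
+      val trustManagers = trustManagerFactory.getTrustManagers
+      val sslContext = SSLContext.getInstance("TLSv1.2")
+      sslContext.init(null, trustManagers, SECURE_RANDOM)
+      okHttpClientBuilder.sslSocketFactory(sslContext.getSocketFactory,
+        trustManagers(0).asInstanceOf[X509TrustManager])
+    }
     new Retrofit.Builder()
       .baseUrl(baseUrl)
       .addConverterFactory(ScalarsConverterFactory.create())
       .addConverterFactory(JacksonConverterFactory.create(OBJECT_MAPPER))
+      .client(okHttpClientBuilder.build())
       .build()
       .create(serviceType)
   }

On the client side, callers now pass SSLOptions: when a trustStore is present, the OkHttp client gets a TLSv1.2 context built from it; otherwise the client speaks plain HTTP. A hypothetical usage sketch mirroring the test suite below (trustStore path and password are placeholders):

    val sslOptions = SSLOptions(
      enabled = true,
      trustStore = Some(new File("/opt/ssl/trustStore.jks")),
      trustStorePassword = Some("trustStorePassword"))
    val stagingServiceClient = RetrofitUtils.createRetrofitClient(
      "https://127.0.0.1:10000/",
      classOf[ResourceStagingServiceRetrofit],
      sslOptions)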
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/SSLUtils.scala
similarity index 98%
rename from resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala
rename to resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/SSLUtils.scala
index 2078e0585e8f0..dacb017d8a513 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/SSLUtils.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.kubernetes.integrationtest.sslutil
+package org.apache.spark.deploy.kubernetes
 
 import java.io.{File, FileOutputStream, OutputStreamWriter}
 import java.math.BigInteger
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProviderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProviderSuite.scala
new file mode 100644
index 0000000000000..290b46a537bf3
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSslOptionsProviderSuite.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.rest.kubernetes.v2
+
+import java.io.{File, FileInputStream, StringWriter}
+import java.security.KeyStore
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import org.bouncycastle.openssl.jcajce.JcaPEMWriter
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite, SSLOptions}
+import org.apache.spark.deploy.kubernetes.SSLUtils
+import org.apache.spark.util.Utils
+
+class ResourceStagingServerSslOptionsProviderSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private var sslTempDir: File = _
+  private var keyStoreFile: File = _
+
+  private var sparkConf: SparkConf = _
+  private var sslOptionsProvider: ResourceStagingServerSslOptionsProvider = _
+
+  before {
+    sslTempDir = Utils.createTempDir(namePrefix = "resource-staging-server-ssl-test")
+    keyStoreFile = new File(sslTempDir, "keyStore.jks")
+    sparkConf = new SparkConf(true)
+    sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf)
+  }
+
+  test("Default SparkConf does not have TLS enabled.") {
+    assert(sslOptionsProvider.getSslOptions === SSLOptions())
+    assert(!sslOptionsProvider.getSslOptions.enabled)
+    keyStoreFile.delete()
+    sslTempDir.delete()
+  }
+
+  test("Setting keyStore, key password, and key field directly.") {
+    sparkConf.set("spark.ssl.kubernetes.resourceStagingServer.enabled", "true")
+      .set("spark.ssl.kubernetes.resourceStagingServer.keyStore", keyStoreFile.getAbsolutePath)
+      .set("spark.ssl.kubernetes.resourceStagingServer.keyStorePassword", "keyStorePassword")
+      .set("spark.ssl.kubernetes.resourceStagingServer.keyPassword", "keyPassword")
+    val sslOptions = sslOptionsProvider.getSslOptions
+    assert(sslOptions.enabled, "SSL should be enabled.")
+    assert(sslOptions.keyStore.map(_.getAbsolutePath) === Some(keyStoreFile.getAbsolutePath),
+      "Incorrect keyStore path or it was not set.")
+    assert(sslOptions.keyStorePassword === Some("keyStorePassword"),
+      "Incorrect keyStore password or it was not set.")
+    assert(sslOptions.keyPassword === Some("keyPassword"),
+      "Incorrect key password or it was not set.")
+  }
+
+  test("Setting key and certificate pem files should write an appropriate keyStore.") {
+    val (keyPemFile, certPemFile) = SSLUtils.generateKeyCertPemPair("127.0.0.1")
+    sparkConf.set("spark.ssl.kubernetes.resourceStagingServer.enabled", "true")
+      .set("spark.ssl.kubernetes.resourceStagingServer.keyPem", keyPemFile.getAbsolutePath)
+      .set("spark.ssl.kubernetes.resourceStagingServer.serverCertPem", certPemFile.getAbsolutePath)
+      .set("spark.ssl.kubernetes.resourceStagingServer.keyStorePassword", "keyStorePassword")
+      .set("spark.ssl.kubernetes.resourceStagingServer.keyPassword", "keyPassword")
+    val sslOptions = sslOptionsProvider.getSslOptions
+    assert(sslOptions.enabled, "SSL should be enabled.")
+    assert(sslOptions.keyStore.isDefined, "KeyStore should be defined.")
+    sslOptions.keyStore.foreach { keyStoreFile =>
+      val keyStore = KeyStore.getInstance(KeyStore.getDefaultType)
+      Utils.tryWithResource(new FileInputStream(keyStoreFile)) {
+        keyStore.load(_, "keyStorePassword".toCharArray)
+      }
+      val key = keyStore.getKey("key", "keyPassword".toCharArray)
+      compareJcaPemObjectToFileString(key, keyPemFile)
+      val certificate = keyStore.getCertificateChain("key")(0)
+      compareJcaPemObjectToFileString(certificate, certPemFile)
+    }
+  }
+
+  test("Using password files should read from the appropriate locations.") {
+    val keyStorePasswordFile = new File(sslTempDir, "keyStorePassword.txt")
Files.write("keyStorePassword", keyStorePasswordFile, Charsets.UTF_8) + val keyPasswordFile = new File(sslTempDir, "keyPassword.txt") + Files.write("keyPassword", keyPasswordFile, Charsets.UTF_8) + sparkConf.set("spark.ssl.kubernetes.resourceStagingServer.enabled", "true") + .set("spark.ssl.kubernetes.resourceStagingServer.keyStore", keyStoreFile.getAbsolutePath) + .set("spark.ssl.kubernetes.resourceStagingServer.keyStorePasswordFile", + keyStorePasswordFile.getAbsolutePath) + .set("spark.ssl.kubernetes.resourceStagingServer.keyPasswordFile", keyPasswordFile.getAbsolutePath) + val sslOptions = sslOptionsProvider.getSslOptions + assert(sslOptions.keyStorePassword === Some("keyStorePassword"), + "Incorrect keyStore password or it was not set.") + assert(sslOptions.keyPassword === Some("keyPassword"), + "Incorrect key password or it was not set.") + } + + private def compareJcaPemObjectToFileString(pemObject: Any, pemFile: File): Unit = { + Utils.tryWithResource(new StringWriter()) { stringWriter => + Utils.tryWithResource(new JcaPEMWriter(stringWriter)) { pemWriter => + pemWriter.writeObject(pemObject) + } + val pemFileAsString = Files.toString(pemFile, Charsets.UTF_8) + assert(stringWriter.toString === pemFileAsString) + } + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala index babc0994d25dc..51c5e43af1124 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala @@ -23,10 +23,11 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.google.common.io.ByteStreams import okhttp3.{RequestBody, ResponseBody} -import org.scalatest.BeforeAndAfterAll +import org.scalatest.BeforeAndAfter import retrofit2.Call -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkFunSuite, SSLOptions} +import org.apache.spark.deploy.kubernetes.SSLUtils import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials import org.apache.spark.util.Utils @@ -39,30 +40,53 @@ import org.apache.spark.util.Utils * we've configured the Jetty server correctly and that the endpoints reached over HTTP can * receive streamed uploads and can stream downloads. 
  */
-class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfterAll {
+class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule)
   private val serverPort = new ServerSocket(0).getLocalPort
   private val serviceImpl = new ResourceStagingServiceImpl(Utils.createTempDir())
-  private val server = new ResourceStagingServer(serverPort, serviceImpl)
-  private val OBJECT_MAPPER = new ObjectMapper().registerModule(new DefaultScalaModule)
+  private val sslOptionsProvider = new SettableReferenceSslOptionsProvider()
+  private val server = new ResourceStagingServer(serverPort, serviceImpl, sslOptionsProvider)
 
-  override def beforeAll(): Unit = {
+  after {
+    server.stop()
+  }
+
+  test("Accept file and jar uploads and downloads") {
     server.start()
+    runUploadAndDownload(SSLOptions())
   }
 
-  override def afterAll(): Unit = {
-    server.stop()
+  test("Enable SSL on the server") {
+    val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair(
+      ipAddress = "127.0.0.1",
+      keyStorePassword = "keyStore",
+      keyPassword = "key",
+      trustStorePassword = "trustStore")
+    val sslOptions = SSLOptions(
+      enabled = true,
+      keyStore = Some(keyStore),
+      keyStorePassword = Some("keyStore"),
+      keyPassword = Some("key"),
+      trustStore = Some(trustStore),
+      trustStorePassword = Some("trustStore"))
+    sslOptionsProvider.setOptions(sslOptions)
+    server.start()
+    runUploadAndDownload(sslOptions)
   }
 
-  test("Accept file and jar uploads and downloads") {
-    val retrofitService = RetrofitUtils.createRetrofitClient(s"http://localhost:$serverPort/",
-      classOf[ResourceStagingServiceRetrofit])
+  private def runUploadAndDownload(sslOptions: SSLOptions): Unit = {
+    val scheme = if (sslOptions.enabled) "https" else "http"
+    val retrofitService = RetrofitUtils.createRetrofitClient(
+      s"$scheme://127.0.0.1:$serverPort/",
+      classOf[ResourceStagingServiceRetrofit],
+      sslOptions)
     val resourcesBytes = Array[Byte](1, 2, 3, 4)
     val labels = Map("label1" -> "label1Value", "label2" -> "label2value")
     val namespace = "namespace"
     val labelsJson = OBJECT_MAPPER.writer().writeValueAsString(labels)
     val resourcesRequestBody = RequestBody.create(
-      okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA), resourcesBytes)
+        okhttp3.MediaType.parse(MediaType.MULTIPART_FORM_DATA), resourcesBytes)
     val labelsRequestBody = RequestBody.create(
       okhttp3.MediaType.parse(MediaType.APPLICATION_JSON), labelsJson)
     val namespaceRequestBody = RequestBody.create(
@@ -95,5 +119,14 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfterAll {
     val downloadedBytes = ByteStreams.toByteArray(responseBody.byteStream())
     assert(downloadedBytes.toSeq === bytes)
   }
+}
+
+private class SettableReferenceSslOptionsProvider extends ResourceStagingServerSslOptionsProvider {
+  private var options = SSLOptions()
+
+  def setOptions(newOptions: SSLOptions): Unit = {
+    this.options = newOptions
+  }
+
+  override def getSslOptions: SSLOptions = options
 }
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index da78e783cac1b..5418afa25ca85 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -35,6 +35,13 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index 8deb790f4b7a0..750e7668b9912 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -35,12 +35,12 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.deploy.SparkSubmit
+import org.apache.spark.deploy.kubernetes.SSLUtils
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
 import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
 import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1
-import org.apache.spark.deploy.kubernetes.integrationtest.sslutil.SSLUtils
 import org.apache.spark.deploy.kubernetes.submit.v1.{Client, ExternalSuppliedUrisDriverServiceManager}
 import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus}
 import org.apache.spark.util.Utils