From 93050f4876aeb97727fe95bb780e6e03eb112414 Mon Sep 17 00:00:00 2001 From: Jacek Lewandowski Date: Thu, 9 Oct 2014 01:41:05 +0200 Subject: [PATCH 1/5] SPARK-3883: SSL support for HttpServer and Akka - Introduced SSLOptions object - SSLOptions is created by SecurityManager - SSLOptions configures Akka and Jetty to use SSL - Provided utility methods to determine the proper Akka protocol for Akka requests and to configure SSL socket factory for URL connections - Added tests cases for AkkaUtils, FileServer, SSLOptions and SecurityManager - Added a way to use node local SSL configuration by executors and driver - Make CoarseGrainedExecutorBackend not overwrite the settings which are executor startup configuration - they are passed anyway from Worker --- .../scala/org/apache/spark/HttpServer.scala | 11 +- .../scala/org/apache/spark/SSLOptions.scala | 124 +++++++++++ .../org/apache/spark/SecurityManager.scala | 50 ++++- .../scala/org/apache/spark/SparkConf.scala | 1 + .../spark/broadcast/HttpBroadcast.scala | 1 + .../spark/deploy/ApplicationDescription.scala | 9 + .../org/apache/spark/deploy/Client.scala | 4 +- .../spark/deploy/DriverDescription.scala | 8 + .../spark/deploy/client/AppClient.scala | 6 +- .../apache/spark/deploy/master/Master.scala | 8 +- .../spark/deploy/worker/ExecutorRunner.scala | 2 +- .../apache/spark/deploy/worker/Worker.scala | 61 +++++- .../CoarseGrainedExecutorBackend.scala | 16 +- .../cluster/SimrSchedulerBackend.scala | 6 +- .../cluster/SparkDeploySchedulerBackend.scala | 7 +- .../mesos/CoarseMesosSchedulerBackend.scala | 7 +- .../org/apache/spark/util/AkkaUtils.scala | 29 ++- .../scala/org/apache/spark/util/Utils.scala | 20 +- core/src/test/resources/keystore | Bin 0 -> 2247 bytes core/src/test/resources/truststore | Bin 0 -> 957 bytes core/src/test/resources/untrusted-keystore | Bin 0 -> 2246 bytes .../org/apache/spark/FileServerSuite.scala | 90 ++++++++ .../apache/spark/MapOutputTrackerSuite.scala | 2 +- .../org/apache/spark/SSLOptionsSuite.scala | 58 ++++++ .../org/apache/spark/SSLSampleConfigs.scala | 55 +++++ .../apache/spark/SecurityManagerSuite.scala | 36 +++- .../org/apache/spark/deploy/ClientSuite.scala | 1 + .../spark/deploy/master/MasterSuite.scala | 28 ++- .../spark/deploy/worker/WorkerSuite.scala | 57 +++++ .../apache/spark/util/AkkaUtilsSuite.scala | 197 ++++++++++++++++-- .../examples/streaming/ActorWordCount.scala | 4 +- .../spark/repl/ExecutorClassLoader.scala | 11 +- .../receiver/ReceiverSupervisorImpl.scala | 4 +- .../spark/deploy/yarn/ApplicationMaster.scala | 6 +- .../spark/deploy/yarn/YarnAllocator.scala | 9 +- 35 files changed, 852 insertions(+), 76 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/SSLOptions.scala create mode 100644 core/src/test/resources/keystore create mode 100644 core/src/test/resources/truststore create mode 100644 core/src/test/resources/untrusted-keystore create mode 100644 core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala create mode 100644 core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala create mode 100644 core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index fa22787ce7ea3..8466d99615698 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -19,6 +19,7 @@ package org.apache.spark import java.io.File +import org.eclipse.jetty.server.ssl.SslSocketConnector import 
org.eclipse.jetty.util.security.{Constraint, Password} import org.eclipse.jetty.security.authentication.DigestAuthenticator import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService} @@ -72,7 +73,10 @@ private[spark] class HttpServer( */ private def doStart(startPort: Int): (Server, Int) = { val server = new Server() - val connector = new SocketConnector + + val connector = securityManager.sslOptions.createJettySslContextFactory() + .map(new SslSocketConnector(_)).getOrElse(new SocketConnector) + connector.setMaxIdleTime(60 * 1000) connector.setSoLingerTime(-1) connector.setPort(startPort) @@ -149,13 +153,14 @@ private[spark] class HttpServer( } /** - * Get the URI of this HTTP server (http://host:port) + * Get the URI of this HTTP server (http://host:port or https://host:port) */ def uri: String = { if (server == null) { throw new ServerStateException("Server is not started") } else { - "http://" + Utils.localIpAddress + ":" + port + val scheme = if (securityManager.sslOptions.enabled) "https" else "http" + s"$scheme://${Utils.localIpAddress}:$port" } } } diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala new file mode 100644 index 0000000000000..2479911dd6237 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.io.File + +import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} +import org.eclipse.jetty.util.ssl.SslContextFactory + +private[spark] case class SSLOptions( + enabled: Boolean = false, + keyStore: Option[File] = None, + keyStorePassword: Option[String] = None, + keyPassword: Option[String] = None, + trustStore: Option[File] = None, + trustStorePassword: Option[String] = None, + protocol: Option[String] = None, + enabledAlgorithms: Set[String] = Set.empty) { + + /** + * Creates a Jetty SSL context factory according to the SSL settings represented by this object. 
+ */ + def createJettySslContextFactory(): Option[SslContextFactory] = { + if (enabled) { + val sslContextFactory = new SslContextFactory() + + keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath)) + trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath)) + keyStorePassword.foreach(sslContextFactory.setKeyStorePassword) + trustStorePassword.foreach(sslContextFactory.setTrustStorePassword) + keyPassword.foreach(sslContextFactory.setKeyManagerPassword) + protocol.foreach(sslContextFactory.setProtocol) + sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*) + + Some(sslContextFactory) + } else { + None + } + } + + /** + * Creates an Akka configuration object which contains all the SSL settings represented by this + * object. It can be used then to compose the ultimate Akka configuration. + */ + def createAkkaConfig: Option[Config] = { + import scala.collection.JavaConversions._ + if (enabled) { + Some(ConfigFactory.empty() + .withValue("akka.remote.netty.tcp.security.key-store", + ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.key-store-password", + ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.trust-store", + ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.trust-store-password", + ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.key-password", + ConfigValueFactory.fromAnyRef(keyPassword.getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.random-number-generator", + ConfigValueFactory.fromAnyRef("")) + .withValue("akka.remote.netty.tcp.security.protocol", + ConfigValueFactory.fromAnyRef(protocol.getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.enabled-algorithms", + ConfigValueFactory.fromIterable(enabledAlgorithms.toSeq)) + .withValue("akka.remote.netty.tcp.enable-ssl", + ConfigValueFactory.fromAnyRef(true))) + } else { + None + } + } + + override def toString: String = s"SSLOptions{enabled=$enabled, " + + s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " + + s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " + + s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}" + +} + +private[spark] object SSLOptions extends Logging { + + /** + * Resolves SSLOptions settings from a given Spark configuration object at a given namespace. + * The parent directory of that location is used as a base directory to resolve relative paths + * to keystore and truststore. 
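To make the new SSLOptions API concrete, here is a minimal usage sketch (illustrative only: the paths, passwords and protocol value are placeholders, and SSLOptions is private[spark], so this only compiles from within the org.apache.spark package):

    val conf = new SparkConf()
      .set("spark.ssl.enabled", "true")
      .set("spark.ssl.keyStore", "/path/to/keystore")
      .set("spark.ssl.keyStorePassword", "ks-password")
      .set("spark.ssl.trustStore", "/path/to/truststore")
      .set("spark.ssl.trustStorePassword", "ts-password")
      .set("spark.ssl.protocol", "TLSv1")

    // resolve all spark.ssl.* keys into a single immutable object
    val opts = SSLOptions.parse(conf, "spark.ssl")
    // Jetty: Some(SslContextFactory) when enabled, None otherwise
    val jettyFactory = opts.createJettySslContextFactory()
    // Akka: Some(Config) carrying akka.remote.netty.tcp.security.* and enable-ssl = true
    val akkaSslConfig = opts.createAkkaConfig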
+ */ + def parse(conf: SparkConf, ns: String): SSLOptions = { + val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = false) + val keyStore = conf.getOption(s"$ns.keyStore").map(new File(_)) + val keyStorePassword = conf.getOption(s"$ns.keyStorePassword") + val keyPassword = conf.getOption(s"$ns.keyPassword") + val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_)) + val trustStorePassword = conf.getOption(s"$ns.trustStorePassword") + val protocol = conf.getOption(s"$ns.protocol") + val enabledAlgorithms = conf.get(s"$ns.enabledAlgorithms", defaultValue = "") + .split(",").map(_.trim).filter(_.nonEmpty).toSet + + new SSLOptions( + enabled, + keyStore, + keyStorePassword, + keyPassword, + trustStore, + trustStorePassword, + protocol, + enabledAlgorithms) + } + +} + diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index ec82d09cd079b..d3ab0790e5e95 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -18,7 +18,11 @@ package org.apache.spark import java.net.{Authenticator, PasswordAuthentication} +import java.security.KeyStore +import java.security.cert.X509Certificate +import javax.net.ssl._ +import com.google.common.io.Files import org.apache.hadoop.io.Text import org.apache.spark.deploy.SparkHadoopUtil @@ -144,7 +148,8 @@ import org.apache.spark.network.sasl.SecretKeyHolder * can take place. */ -private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder { +private[spark] class SecurityManager(sparkConf: SparkConf) + extends Logging with SecretKeyHolder { // key used to store the spark secret in the Hadoop UGI private val sparkSecretLookupKey = "sparkCookie" @@ -196,6 +201,49 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with ) } + val sslOptions = SSLOptions.parse(sparkConf, "spark.ssl") + logDebug(s"SSLConfiguration: $sslOptions") + + val (sslSocketFactory, hostnameVerifier) = if (sslOptions.enabled) { + val trustStoreManagers = + for (trustStore <- sslOptions.trustStore) yield { + val input = Files.asByteSource(sslOptions.trustStore.get).openStream() + + try { + val ks = KeyStore.getInstance(KeyStore.getDefaultType) + ks.load(input, sslOptions.trustStorePassword.get.toCharArray) + + val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) + tmf.init(ks) + tmf.getTrustManagers + } finally { + input.close() + } + } + + lazy val credulousTrustStoreManagers = Array({ + logWarning("Using 'accept-all' trust manager for SSL connections.") + new X509TrustManager { + override def getAcceptedIssuers: Array[X509Certificate] = null + + override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String) {} + + override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String) {} + }: TrustManager + }) + + val sslContext = SSLContext.getInstance(sslOptions.protocol.getOrElse("Default")) + sslContext.init(null, trustStoreManagers.getOrElse(credulousTrustStoreManagers), null) + + val hostVerifier = new HostnameVerifier { + override def verify(s: String, sslSession: SSLSession): Boolean = true + } + + (Some(sslContext.getSocketFactory), Some(hostVerifier)) + } else { + (None, None) + } + /** * Split a comma separated String, filter out any empty items, and return a Set of strings */ diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala index cd91c8f87547b..0512a8b0c4532 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -369,6 +369,7 @@ private[spark] object SparkConf { isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth") || + name.startsWith("spark.ssl") || isSparkPortConf(name) } diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 31d6958c403b3..428a3455b4eca 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -199,6 +199,7 @@ private[broadcast] object HttpBroadcast extends Logging { uc = new URL(url).openConnection() uc.setConnectTimeout(httpReadTimeout) } + Utils.setupSecureURLConnection(uc, securityManager) val in = { uc.setReadTimeout(httpReadTimeout) diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 65a1a8fd7e929..ae55b4ff40b74 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -28,5 +28,14 @@ private[spark] class ApplicationDescription( val user = System.getProperty("user.name", "") + def copy( + name: String = name, + maxCores: Option[Int] = maxCores, + memoryPerSlave: Int = memoryPerSlave, + command: Command = command, + appUiUrl: String = appUiUrl, + eventLogDir: Option[String] = eventLogDir): ApplicationDescription = + new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir) + override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 7c1c831c248fc..542635fada6e8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -39,7 +39,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) val timeout = AkkaUtils.askTimeout(conf) override def preStart() = { - masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master)) + masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master, conf)) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) @@ -161,7 +161,7 @@ object Client { "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf)) // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely - Master.toAkkaUrl(driverArgs.master) + Master.toAkkaUrl(driverArgs.master, conf) actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf)) actorSystem.awaitTermination() diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala index 58c95dc4f9116..b056a19ce6598 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala @@ -25,5 +25,13 @@ private[spark] class DriverDescription( val command: Command) extends Serializable { + def copy( + jarUrl: String = jarUrl, + mem: Int = mem, + cores: Int = cores, + supervise: Boolean = supervise, + command: Command = command): 
DriverDescription = + new DriverDescription(jarUrl, mem, cores, supervise, command) + override def toString: String = s"DriverDescription (${command.mainClass})" } diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 39a7b0319b6a1..a711922f5cd82 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -47,7 +47,7 @@ private[spark] class AppClient( conf: SparkConf) extends Logging { - val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl) + val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, conf)) val REGISTRATION_TIMEOUT = 20.seconds val REGISTRATION_RETRIES = 3 @@ -107,8 +107,8 @@ private[spark] class AppClient( def changeMaster(url: String) { // activeMasterUrl is a valid Spark url since we receive it from master. activeMasterUrl = url - master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) - masterAddress = Master.toAkkaAddress(activeMasterUrl) + master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl, conf)) + masterAddress = Master.toAkkaAddress(activeMasterUrl, conf) } private def isPossibleMaster(remoteUrl: Address) = { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index d92d99310a583..a8294ecd51e3e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -860,9 +860,9 @@ private[spark] object Master extends Logging { * * @throws SparkException if the url is invalid */ - def toAkkaUrl(sparkUrl: String): String = { + def toAkkaUrl(sparkUrl: String, conf: SparkConf): String = { val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl) - "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName) + AkkaUtils.address(systemName, host, port, actorName, conf) } /** @@ -870,9 +870,9 @@ private[spark] object Master extends Logging { * * @throws SparkException if the url is invalid */ - def toAkkaAddress(sparkUrl: String): Address = { + def toAkkaAddress(sparkUrl: String, conf: SparkConf): Address = { val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl) - Address("akka.tcp", systemName, host, port) + Address(AkkaUtils.protocol(conf), systemName, host, port) } def startSystemAndActor( diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index acbdf0d8bd7bc..bc9f78b9e5c77 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -26,7 +26,7 @@ import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.spark.{SparkConf, Logging} -import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState} +import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged import org.apache.spark.util.logging.FileAppender diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 13599830123d0..79fce0961369f 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
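Worth noting before the Worker changes below: Master.toAkkaUrl and Master.toAkkaAddress (above) now take the SparkConf so that the actor URL scheme can switch between the plain and SSL transports. They delegate to AkkaUtils.protocol and AkkaUtils.address, which are added further down in this patch. A rough illustration, with placeholder host/port and the Master's system/actor names:

    val conf = new SparkConf().set("spark.ssl.enabled", "true")
    AkkaUtils.protocol(conf)                                     // "akka.ssl.tcp"
    AkkaUtils.address("sparkMaster", "host", 7077, "Master", conf)
    // => "akka.ssl.tcp://sparkMaster@host:7077/user/Master"
    // with spark.ssl.enabled left at its default (false) the scheme stays "akka.tcp"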
@@ -31,8 +31,8 @@ import scala.util.Random import akka.actor._ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} -import org.apache.spark.deploy.{ExecutorDescription, ExecutorState} +import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.worker.ui.WorkerWebUI @@ -93,7 +93,7 @@ private[spark] class Worker( var masterAddress: Address = null var activeMasterUrl: String = "" var activeMasterWebUiUrl : String = "" - val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName) + val akkaUrl = AkkaUtils.address(actorSystemName, host, port, actorName, conf) @volatile var registered = false @volatile var connected = false val workerId = generateWorkerId() @@ -174,8 +174,8 @@ private[spark] class Worker( // activeMasterUrl it's a valid Spark url since we receive it from master. activeMasterUrl = url activeMasterWebUiUrl = uiUrl - master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) - masterAddress = Master.toAkkaAddress(activeMasterUrl) + master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl, conf)) + masterAddress = Master.toAkkaAddress(activeMasterUrl, conf) connected = true // Cancel any outstanding re-registration attempts because we found a new master registrationRetryTimer.foreach(_.cancel()) @@ -347,10 +347,20 @@ private[spark] class Worker( }.toSeq } appDirectories(appId) = appLocalDirs - - val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, - self, workerId, host, sparkHome, executorDir, akkaUrl, conf, appLocalDirs, - ExecutorState.LOADING) + val manager = new ExecutorRunner( + appId, + execId, + appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)), + cores_, + memory_, + self, + workerId, + host, + sparkHome, + executorDir, + akkaUrl, + conf, + appLocalDirs, ExecutorState.LOADING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ @@ -406,7 +416,14 @@ private[spark] class Worker( case LaunchDriver(driverId, driverDesc) => { logInfo(s"Asked to launch driver $driverId") - val driver = new DriverRunner(conf, driverId, workDir, sparkHome, driverDesc, self, akkaUrl) + val driver = new DriverRunner( + conf, + driverId, + workDir, + sparkHome, + driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)), + self, + akkaUrl) drivers(driverId) = driver driver.start() @@ -523,10 +540,32 @@ private[spark] object Worker extends Logging { val securityMgr = new SecurityManager(conf) val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf, securityManager = securityMgr) - val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl) + val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, conf)) actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory, masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName) (actorSystem, boundPort) } + private[spark] def isUseLocalNodeSSLConfig(cmd: Command): Boolean = { + val pattern = """\-Dspark\.ssl\.useNodeLocalConf\=(.+)""".r + val result = cmd.javaOpts.collectFirst { + case pattern(_result) => _result.toBoolean + } + result.getOrElse(false) + } + + private[spark] def maybeUpdateSSLSettings(cmd: Command, conf: 
SparkConf): Command = { + val prefix = "spark.ssl." + val useNLC = "spark.ssl.useNodeLocalConf" + if (isUseLocalNodeSSLConfig(cmd)) { + val newJavaOpts = cmd.javaOpts + .filter(opt => !opt.startsWith(s"-D$prefix")) ++ + conf.getAll.collect { case (key, value) if key.startsWith(prefix) => s"-D$key=$value" } :+ + s"-D$useNLC=true" + cmd.copy(javaOpts = newJavaOpts) + } else { + cmd + } + } + } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 823825302658c..bc72c8970319c 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -123,7 +123,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { val executorConf = new SparkConf val port = executorConf.getInt("spark.executor.port", 0) val (fetcher, _) = AkkaUtils.createActorSystem( - "driverPropsFetcher", hostname, port, executorConf, new SecurityManager(executorConf)) + "driverPropsFetcher", + hostname, + port, + executorConf, + new SecurityManager(executorConf)) val driver = fetcher.actorSelection(driverUrl) val timeout = AkkaUtils.askTimeout(executorConf) val fut = Patterns.ask(driver, RetrieveSparkProps, timeout) @@ -132,7 +136,15 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { fetcher.shutdown() // Create SparkEnv using properties we fetched from the driver. - val driverConf = new SparkConf().setAll(props) + val driverConf = new SparkConf() + for ((key, value) <- props) { + // this is required for SSL in standalone mode + if (SparkConf.isExecutorStartupConf(key)) { + driverConf.setIfMissing(key, value) + } else { + driverConf.set(key, value) + } + } val env = SparkEnv.createExecutorEnv( driverConf, executorId, hostname, port, cores, isLocal = false) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index ee10aa061f4e9..162058c328b2d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.spark.{Logging, SparkContext, SparkEnv} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.AkkaUtils private[spark] class SimrSchedulerBackend( scheduler: TaskSchedulerImpl, @@ -38,11 +39,12 @@ private[spark] class SimrSchedulerBackend( override def start() { super.start() - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + val driverUrl = AkkaUtils.address( SparkEnv.driverActorSystemName, sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"), - CoarseGrainedSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME, + sc.conf) val conf = SparkHadoopUtil.get.newConfiguration(sc.conf) val fs = FileSystem.get(conf) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 7eb87a564d6f5..238ae462fefdd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -21,7 +21,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv} import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.client.{AppClient, AppClientListener} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl} -import org.apache.spark.util.Utils +import org.apache.spark.util.{AkkaUtils, Utils} private[spark] class SparkDeploySchedulerBackend( scheduler: TaskSchedulerImpl, @@ -46,11 +46,12 @@ private[spark] class SparkDeploySchedulerBackend( super.start() // The endpoint for executors to talk to us - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + val driverUrl = AkkaUtils.address( SparkEnv.driverActorSystemName, conf.get("spark.driver.host"), conf.get("spark.driver.port"), - CoarseGrainedSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME, + conf) val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}", "{{WORKER_URL}}") val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 5289661eb896b..23d9c81438eb8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -31,7 +31,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTas import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException} import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend -import org.apache.spark.util.Utils +import org.apache.spark.util.{Utils, AkkaUtils} /** * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds @@ -143,11 +143,12 @@ private[spark] class CoarseMesosSchedulerBackend( } val command = CommandInfo.newBuilder() .setEnvironment(environment) - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + val driverUrl = AkkaUtils.address( SparkEnv.driverActorSystemName, conf.get("spark.driver.host"), conf.get("spark.driver.port"), - CoarseGrainedSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME, + conf) val uri = conf.get("spark.executor.uri", null) if (uri == null) { diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 4c9b1e3c46f0f..77f9182ed79e6 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -27,7 +27,7 @@ import akka.pattern.ask import com.typesafe.config.ConfigFactory import org.apache.log4j.{Level, Logger} -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException, SSLOptions} /** * Various utility classes for working with Akka. 
@@ -91,8 +91,10 @@ private[spark] object AkkaUtils extends Logging { val secureCookie = if (isAuthOn) secretKey else "" logDebug(s"In createActorSystem, requireCookie is: $requireCookie") - val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback( - ConfigFactory.parseString( + val akkaSslConfig = securityManager.sslOptions.createAkkaConfig.getOrElse(ConfigFactory.empty()) + + val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]) + .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString( s""" |akka.daemonic = on |akka.loggers = [""akka.event.slf4j.Slf4jLogger""] @@ -214,7 +216,7 @@ private[spark] object AkkaUtils extends Logging { val driverHost: String = conf.get("spark.driver.host", "localhost") val driverPort: Int = conf.getInt("spark.driver.port", 7077) Utils.checkHost(driverHost, "Expected hostname") - val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name" + val url = address(driverActorSystemName, driverHost, driverPort, name, conf) val timeout = AkkaUtils.lookupTimeout(conf) logInfo(s"Connecting to $name: $url") Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) @@ -228,9 +230,26 @@ private[spark] object AkkaUtils extends Logging { actorSystem: ActorSystem): ActorRef = { val executorActorSystemName = SparkEnv.executorActorSystemName Utils.checkHost(host, "Expected hostname") - val url = s"akka.tcp://$executorActorSystemName@$host:$port/user/$name" + val url = address(executorActorSystemName, host, port, name, conf) val timeout = AkkaUtils.lookupTimeout(conf) logInfo(s"Connecting to $name: $url") Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) } + + def protocol(conf: SparkConf): String = { + if (conf.getBoolean("spark.ssl.enabled", defaultValue = false)) { + "akka.ssl.tcp" + } else { + "akka.tcp" + } + } + + def address( + systemName: String, + host: String, + port: Any, + actorName: String, + conf: SparkConf): String = { + s"${protocol(conf)}://$systemName@$host:$port/user/$actorName" + } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 86ac307fc84ba..65eabd7c13e2f 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -21,8 +21,9 @@ import java.io._ import java.lang.management.ManagementFactory import java.net._ import java.nio.ByteBuffer -import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor} -import java.util.{Locale, Properties, Random, UUID} +import java.util.{Properties, Locale, Random, UUID} +import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor} +import javax.net.ssl.HttpsURLConnection import scala.collection.JavaConversions._ import scala.collection.Map @@ -549,6 +550,7 @@ private[spark] object Utils extends Logging { logDebug("fetchFile not using security") uc = new URL(url).openConnection() } + Utils.setupSecureURLConnection(uc, securityMgr) val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000 uc.setConnectTimeout(timeout) @@ -1780,6 +1782,20 @@ private[spark] object Utils extends Logging { PropertyConfigurator.configure(pro) } + /** + * If the given URL connection is HttpsURLConnection, it sets the SSL socket factory and + * the host verifier from the given security manager. 
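The setupSecureURLConnection helper documented above (its body follows) is what HttpBroadcast, Utils.fetchFile and, later in this patch, the REPL's ExecutorClassLoader route their URL connections through. A minimal sketch of the intended call pattern, assuming a conf that carries valid spark.ssl.* settings and a placeholder URL:

    val sm = new SecurityManager(conf)        // builds sslSocketFactory and hostnameVerifier when SSL is enabled
    val conn = new URL("https://driver-host:50000/jars/app.jar").openConnection()
    Utils.setupSecureURLConnection(conn, sm)  // no-op for a plain HttpURLConnection
    val in = conn.getInputStream              // the HTTPS handshake now uses the configured trust store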
+ */ + def setupSecureURLConnection(urlConnection: URLConnection, sm: SecurityManager): URLConnection = { + urlConnection match { + case https: HttpsURLConnection => + sm.sslSocketFactory.foreach(https.setSSLSocketFactory) + sm.hostnameVerifier.foreach(https.setHostnameVerifier) + https + case connection => connection + } + } + def invoke( clazz: Class[_], obj: AnyRef, diff --git a/core/src/test/resources/keystore b/core/src/test/resources/keystore new file mode 100644 index 0000000000000000000000000000000000000000..f8310e39ba1e07c7559e5724f522c06ca47f5787 GIT binary patch literal 2247 zcmchYc{J1uAI9f5Gt8i|P1&wBlPn{@Y0#AI+9uMFEKS|9bCtJ0{)TM zX}X`!>67%Z@GWAuh_Ft8Kt!M*6TS&bii+!r0&qYVDgyu_5I7UgZ~dLDFFKbbe4NJ> z8h*OOS9N;OWepcs>OGoE=zuk2V{*wI)Z_e}>Z@^h>_v>&7v=eK%-(UR|F2)6ifSZm zB{$HW8@=O<)XbNH;_wk=FeM$Bjfx=?s>^D9Zr#6WTzln-!DYFW$=EZVD(3K>jS96w z-f`E>K|`eE2uJ#EUlg%5TBw+iWJz5iySD{o5{Ul(C)@&^f0t&B->D-6`JH0%@@;s@ zjMngAG004|Ao+Z!S-#o>U%j;FhYZ-6+Sr%89Pdns-=>tDGRpP7IeZ?->$L-DANQrdWKTa;0 zluk^040v;S@5DL7!>L;M8&vwCZAjLK*vpGqYx~-VYZs+3mb>>RYt{axVJiP>$3Y|} zz*eB_iPWr6Tc->>lU_2dn3A0UZrr!p`Ly9<%d%n&X#yvmACj3LY^dq?e=FaqJ!`fkB(%M!T4bA&bADq%;%h1-L)a{YF#I>*n5; zXt7P-gtHyc-1yawv9;te^CLkar+P2}c`q>)=Cwt62se##oHtS0qXNZJW*YMjnTYe| z1W+lZjRJNxO-J~$3RTEpZS0TIp!w}_nOBXJ>YGGE1!=AA8t_d9vGYswy&Plo@H7BgF?Rr;IN}~I_OZ5AE?Xc>e zduB^$Zl^0#&4F^xR^%KivoAj*R)05F<5T{9#!cLae!D`i$@(i5VdKPs;vD6ow-Pa0 zmfi>Du&L;*jI`*2mxHbvy2C)a@EF^M?KmG1vd1J;hO<00q{Ea0+F@!vpWZUoIem_pk7QGFApU$XWEZyP!S$ zy2#@wW_snjaE*mkb%s`~Ur~W0n&f9pXRDv9^a`mg8=d@ECh_CSHJ#du`ah#SxB9W1 z-pb!}YxjkPKt*mOLDS3IRD~MerwrgiggePgez$MStIAG30Vaz+z znP5Kohc09y*(6S=bZ0;6*@CtY{i164xeX%S_5pQfMC@N!nO0wH%Xtuc`Uu_?pnpJ3cKSbR^Nesz@fkeg8K_ zTJ35i8GW{Bgm&Uy_A)8Ar>TC80v~KUm51AD4|t?pz-*InPU(xFwfe7aK@- z&`sYxAFsV#57|9oUhntf^%Ms6?u7Ik>%Ezi3GO{ccd$K+?Z=*8-c9HP#s zOrXfNhRKwkI)1xzMcTvsXTrA@mGdnR9#$e8BDNj1us(Rjp^I@0JHr=J9h*keYb@4` zEbRm4_$*5QRieRoP)JDy&5{>VVsBkDRg>0s_n`ei%|c)v!>9n^H~H}eLNotr3B^*= z{jjq6yqK``^Vn@D41fS-8lW1evegh3Nw^$*J9JmC0%UznnvdunEUWXJm7jB#`FDt0 zo!SZ!fDlyn__i1(A!8e5JfGHGYq#O59PYAPUd^G9FA4GLhdQU@y3_KivZwp9e#sjZ z1)6E~Py4647bJ-*g&tbSSqXlAo`|VY0-a#1EcqqpmY_MpMl0!c@gNTqd?DPoAC6{F zX#Czw>93?(`H$;NoCL46jICSrt~xR}wNV^~dn<|;)#U(7hP$oGOzs-~AvLGK&g0zM zw^?bP=4NGFJ3;N}QsShyk0XSs?LW@9d~Sxj?AVZ{pOtdaJ7!gP>dq$(AFE$ic(QF_ zMtzQi_+9hZzoOnZ-qFt&6a;ssvcmgs))|WiZVphZ+9n#oo9dzuTg|>3rcC)~XD4Ad ZwPrecI^>>DS9jrYAM6Ze@sCox{su3o&c*-$ literal 0 HcmV?d00001 diff --git a/core/src/test/resources/truststore b/core/src/test/resources/truststore new file mode 100644 index 0000000000000000000000000000000000000000..a6b1d46e1f391995553771665934518114892962 GIT binary patch literal 957 zcmezO_TO6u1_mY|W(3omIr+(nIT`uIB|s66PaJ187+53pObsj<7?`UKnwZNCnwa7j zFf%bSF|h<%`7Sl!W#iOp^Jx3d%gD&h%3zRVC}6bQhKm@;iSrto7#bK@7#SNH7@I@^xuyn&24+w$on6z!sDvCGjI0dIO^o~u z22G4yOihf849&hp=Zrag|9n~Gc#H3$vhv=@pKm=9>UvU(ZZwCjnIky!nB{NDyiMQB zBZO<_+{laCF!_t`)YJQR{_Ib&x!G3fds+PIs-~1XLS?NdeqFL|Ki_tDnb_pb)#>@( zTD|J;>oi2PIZoZR+nId!%3~GY`%wb7osA|uXbd}Tx=!cX%NWDQox39rO%?vMzHf?~ z=1swLR-RoC+D>nNbu4gY&$gL|-rloc_5R@2ZL8S5%Xot9o!zH-pDYd6dHL#Xw%@*+ zy&LU+nmT%>zI)Xlb76*?{++DCc`NfA&lk?~bl&)QW6#lb?w*%6eoUWUYC4NO`N6ZZ z?8-A|%!_eU4)c+*d(ZiH(T7{hnV1hF?DgUa_CL zx_EQkYw>r1g2;ggOjE!>WMr_A)k`{P(!x=_M=);x=K055cF6ji^!W09N=!ueTVn;U z6EA8{^-Nzg`Ng@3($jCUF&@ae8^o zyyq)UdNa4Ognk!%pnCm3|2v5#(^qdW|F_DnC)CMo)7|XjC#6>R9=pT-w&U3U_Jbb| zump?#5z4RO57rKM+MK!exniP|!?*K7M~h9*`B~JdzSntD@o>*t-I;G@<(}y1FS)RE zvl(0Zzf0L`j^5p4uvCF<+hNhw literal 0 HcmV?d00001 diff --git a/core/src/test/resources/untrusted-keystore b/core/src/test/resources/untrusted-keystore new file mode 100644 index 
0000000000000000000000000000000000000000..6015b02caa12817721fca3a83c8e58338b6d9aeb GIT binary patch literal 2246 zcmchYS5VUl7sc~W0)Ye-=~4xxg#Zav38z3~KrxBxU2`W=0g8?MR?KmjE9H~?^gpj7C>T*;?*E%{fv1t#2A zwmzrvHqay1&t+Fi_Z+)w1n4H+Y%AMXJ&rSoCSXIQW|Hr+fhOC{~-z zRlY#ibR-jPTxR#)AT!BEO?ezTUwe>&ocp&K#iX4G4x}&l;Dj))1SC+yocOHPKcyd#4h$$Ysx{PW2wSrMAGjmd{> zmZKc2Dyf?4ujwLhFCx*5Hx~U|Z)4cF@ukC(ds5*mRqTjWZQ!rEzURPjf;fj^#543K+ z!ssm8re)pqzo9BLWxDF++xd2(u`*)aHlZZpG29)-s8jv^L!<=2n_lp*^DBVw#MvPDYjjU59hk-*=5ySF?;vN-zNZGbmhBK9h zT^c^=AHxzXuw5SuT_j1a9=(n&J?~VzvG}qlOYj8&6PtlOG82v{Ls5creeldHDY=n} z6NS98Hi+3%9-T{Nl<}Y`*yL)P*pyGtkkOKUx$aHwg>deSRCRDu3S_Bt*qYqP;^=vW z>Sial$32p`22-|}%Wtf09n6kxR~g&{ z;kf$;?1pyH#legE8IIWJMw1!v9A-Bq$dIyn#d=DTZcOxsMZN<8A29U{XY8FE^^?*t(;Wc9KRLxuFXiP-G z>e>s*`P%O)W#bh}MOrv+{^3tN%a_tNb9D8h>-eF{WYv1B{AL3=1%=#OJh|?KR$7@~ zM1%siVBJLAn~Pz$VnIy*@}LG*a&*qN<%mI`ZQi8>jN6Bzl5(ULw7YAmA5UoEllgUDu` zAewV|((5eVYOO+`<3sL2d)BS_Mok%KAB6q1*Jc7zF$H5nC_yMo`6_U&c?)aPk4b$z zk=~H8I#D$8!fS_4;uOVeZ*ca0M>wc+>X_;202 zht3{zS6QTZ`z}%LI2uqFW?YIRHI*33>s?|kWxkxSkI|mB{+aCe>R$D#3C_eDcm?~4 z04*s~%RJne@gR@YM1zFG)0DX;`R-ZeVAqWJizOR@=|SzH@w)|jt`!(`wOVk zU(Gc+3x6OSUy2;rv#hrK`;koo{3~F+?Lz<_V{4;1#8arvZ&0V8@AqY z%gOfdX610i{YAs|(!9Q+SL{EgzRT#t9JFQIGR2S`6VaKXbv2cJ2H)BOR*n|BK8B>J zu4T{_d4E~Njm&*(N{Lz*K3`qRFrPLYtX(hoRRuEpLT4Gjc z7p!#ttliZeFa!Vrd9r93wA5ijq>e&GphCV$=V%&Tr1x)APN~~@unO~heb4>d#KTM- znh1b2&I@^pv~P&tiq}$Qc2}5{Y1K+f=lfqETn=T3E>iK(`_W8&jcZK}2tO)n$}*DS zW9s~&EeO=mYgrtHq(vr!#h3NAy`7YJ8V0q&`(N)hb2~hoF87PZRGUk~lN{xCT(jsw z;Yp@T1SAT~KqTCb3a(cP)&1DNK4h=JduK2)s(9XTV?-4(76KO@#|t*-6s2Dl=w}?& z`IEOv)DCjYr5eow*?MXh8OBgmy;!t>iQ6;NbaZ0pfS*It8}lB6l1(9$AdKemI=5q_ z{7jO%*zS`ZmPK|=YMq+2r-Np*`16dZu!n^|WkFZrL^nhKO "y", "spark.ssl.opt2" -> "z")) + .javaOpts should contain theSameElementsInOrderAs Seq( + "-Dasdf=dfgh", "-Dspark.ssl.opt1=x") + + Worker.maybeUpdateSSLSettings( + cmd("-Dspark.ssl.useNodeLocalConf=false", "-Dspark.ssl.opt1=x"), + conf("spark.ssl.opt1" -> "y", "spark.ssl.opt2" -> "z")) + .javaOpts should contain theSameElementsInOrderAs Seq( + "-Dspark.ssl.useNodeLocalConf=false", "-Dspark.ssl.opt1=x") + + Worker.maybeUpdateSSLSettings( + cmd("-Dspark.ssl.useNodeLocalConf=true", "-Dspark.ssl.opt1=x"), + conf("spark.ssl.opt1" -> "y", "spark.ssl.opt2" -> "z")) + .javaOpts should contain theSameElementsAs Seq( + "-Dspark.ssl.useNodeLocalConf=true", "-Dspark.ssl.opt1=y", "-Dspark.ssl.opt2=z") + + } +} diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala index 6bbf72e929dcb..38a9341efb500 100644 --- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.util +import java.util.concurrent.TimeoutException + import scala.concurrent.Await import akka.actor._ @@ -26,6 +28,7 @@ import org.scalatest.FunSuite import org.apache.spark._ import org.apache.spark.scheduler.MapStatus import org.apache.spark.storage.BlockManagerId +import org.apache.spark.SSLSampleConfigs._ /** @@ -47,7 +50,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val masterTracker = new MapOutputTrackerMaster(conf) masterTracker.trackerActor = actorSystem.actorOf( - Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") + Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") val badconf = new SparkConf badconf.set("spark.authenticate", "true") @@ -60,7 +63,7 @@ class AkkaUtilsSuite extends 
FunSuite with LocalSparkContext with ResetSystemPro conf = conf, securityManager = securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) val selection = slaveSystem.actorSelection( - s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker") + AkkaUtils.address("spark", "localhost", boundPort, "MapOutputTracker", conf)) val timeout = AkkaUtils.lookupTimeout(conf) intercept[akka.actor.ActorNotFound] { slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) @@ -74,7 +77,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val conf = new SparkConf conf.set("spark.authenticate", "false") conf.set("spark.authenticate.secret", "bad") - val securityManager = new SecurityManager(conf); + val securityManager = new SecurityManager(conf) val hostname = "localhost" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, @@ -85,18 +88,18 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val masterTracker = new MapOutputTrackerMaster(conf) masterTracker.trackerActor = actorSystem.actorOf( - Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") + Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") val badconf = new SparkConf badconf.set("spark.authenticate", "false") badconf.set("spark.authenticate.secret", "good") - val securityManagerBad = new SecurityManager(badconf); + val securityManagerBad = new SecurityManager(badconf) val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = badconf, securityManager = securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) val selection = slaveSystem.actorSelection( - s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker") + AkkaUtils.address("spark", "localhost", boundPort, "MapOutputTracker", conf)) val timeout = AkkaUtils.lookupTimeout(conf) slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) @@ -124,7 +127,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val conf = new SparkConf conf.set("spark.authenticate", "true") conf.set("spark.authenticate.secret", "good") - val securityManager = new SecurityManager(conf); + val securityManager = new SecurityManager(conf) val hostname = "localhost" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, @@ -135,12 +138,12 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val masterTracker = new MapOutputTrackerMaster(conf) masterTracker.trackerActor = actorSystem.actorOf( - Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") + Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") val goodconf = new SparkConf goodconf.set("spark.authenticate", "true") goodconf.set("spark.authenticate.secret", "good") - val securityManagerGood = new SecurityManager(goodconf); + val securityManagerGood = new SecurityManager(goodconf) assert(securityManagerGood.isAuthenticationEnabled() === true) @@ -148,7 +151,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro conf = goodconf, securityManager = securityManagerGood) val slaveTracker = new MapOutputTrackerWorker(conf) val selection = slaveSystem.actorSelection( - s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker") + AkkaUtils.address("spark", "localhost", boundPort, "MapOutputTracker", conf)) val timeout = 
AkkaUtils.lookupTimeout(conf) slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) @@ -175,7 +178,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro conf.set("spark.authenticate", "true") conf.set("spark.authenticate.secret", "good") - val securityManager = new SecurityManager(conf); + val securityManager = new SecurityManager(conf) val hostname = "localhost" val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, @@ -186,12 +189,12 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro val masterTracker = new MapOutputTrackerMaster(conf) masterTracker.trackerActor = actorSystem.actorOf( - Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") + Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") val badconf = new SparkConf badconf.set("spark.authenticate", "false") badconf.set("spark.authenticate.secret", "bad") - val securityManagerBad = new SecurityManager(badconf); + val securityManagerBad = new SecurityManager(badconf) assert(securityManagerBad.isAuthenticationEnabled() === false) @@ -199,7 +202,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro conf = badconf, securityManager = securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) val selection = slaveSystem.actorSelection( - s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker") + AkkaUtils.address("spark", "localhost", boundPort, "MapOutputTracker", conf)) val timeout = AkkaUtils.lookupTimeout(conf) intercept[akka.actor.ActorNotFound] { slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) @@ -209,4 +212,170 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro slaveSystem.shutdown() } + test("remote fetch ssl on") { + val conf = sparkSSLConfig() + val securityManager = new SecurityManager(conf) + + val hostname = "localhost" + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, + conf = conf, securityManager = securityManager) + System.setProperty("spark.hostPort", hostname + ":" + boundPort) + + assert(securityManager.isAuthenticationEnabled() === false) + + val masterTracker = new MapOutputTrackerMaster(conf) + masterTracker.trackerActor = actorSystem.actorOf( + Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") + + val slaveConf = sparkSSLConfig() + val securityManagerBad = new SecurityManager(slaveConf) + + val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, + conf = slaveConf, securityManager = securityManagerBad) + val slaveTracker = new MapOutputTrackerWorker(conf) + val selection = slaveSystem.actorSelection( + AkkaUtils.address("spark", "localhost", boundPort, "MapOutputTracker", conf)) + val timeout = AkkaUtils.lookupTimeout(conf) + slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) + + assert(securityManagerBad.isAuthenticationEnabled() === false) + + masterTracker.registerShuffle(10, 1) + masterTracker.incrementEpoch() + slaveTracker.updateEpoch(masterTracker.getEpoch) + + val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) + masterTracker.registerMapOutput(10, 0, + MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L))) + masterTracker.incrementEpoch() + slaveTracker.updateEpoch(masterTracker.getEpoch) + + // this should succeed since security off + assert(slaveTracker.getServerStatuses(10, 
0).toSeq === + Seq((BlockManagerId("a", "hostA", 1000), size1000))) + + actorSystem.shutdown() + slaveSystem.shutdown() + } + + + test("remote fetch ssl on and security enabled") { + val conf = sparkSSLConfig() + conf.set("spark.authenticate", "true") + conf.set("spark.authenticate.secret", "good") + val securityManager = new SecurityManager(conf) + + val hostname = "localhost" + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, + conf = conf, securityManager = securityManager) + System.setProperty("spark.hostPort", hostname + ":" + boundPort) + + assert(securityManager.isAuthenticationEnabled() === true) + + val masterTracker = new MapOutputTrackerMaster(conf) + masterTracker.trackerActor = actorSystem.actorOf( + Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") + + val slaveConf = sparkSSLConfig() + slaveConf.set("spark.authenticate", "true") + slaveConf.set("spark.authenticate.secret", "good") + val securityManagerBad = new SecurityManager(slaveConf) + + val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, + conf = slaveConf, securityManager = securityManagerBad) + val slaveTracker = new MapOutputTrackerWorker(conf) + val selection = slaveSystem.actorSelection( + AkkaUtils.address("spark", "localhost", boundPort, "MapOutputTracker", conf)) + val timeout = AkkaUtils.lookupTimeout(conf) + slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) + + assert(securityManagerBad.isAuthenticationEnabled() === true) + + masterTracker.registerShuffle(10, 1) + masterTracker.incrementEpoch() + slaveTracker.updateEpoch(masterTracker.getEpoch) + + val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) + masterTracker.registerMapOutput(10, 0, + MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L))) + masterTracker.incrementEpoch() + slaveTracker.updateEpoch(masterTracker.getEpoch) + + assert(slaveTracker.getServerStatuses(10, 0).toSeq === + Seq((BlockManagerId("a", "hostA", 1000), size1000))) + + actorSystem.shutdown() + slaveSystem.shutdown() + } + + + test("remote fetch ssl on and security enabled - bad credentials") { + val conf = sparkSSLConfig() + conf.set("spark.authenticate", "true") + conf.set("spark.authenticate.secret", "good") + val securityManager = new SecurityManager(conf) + + val hostname = "localhost" + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, + conf = conf, securityManager = securityManager) + System.setProperty("spark.hostPort", hostname + ":" + boundPort) + + assert(securityManager.isAuthenticationEnabled() === true) + + val masterTracker = new MapOutputTrackerMaster(conf) + masterTracker.trackerActor = actorSystem.actorOf( + Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") + + val slaveConf = sparkSSLConfig() + slaveConf.set("spark.authenticate", "true") + slaveConf.set("spark.authenticate.secret", "bad") + val securityManagerBad = new SecurityManager(slaveConf) + + val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, + conf = slaveConf, securityManager = securityManagerBad) + val slaveTracker = new MapOutputTrackerWorker(conf) + val selection = slaveSystem.actorSelection( + AkkaUtils.address("spark", "localhost", boundPort, "MapOutputTracker", conf)) + val timeout = AkkaUtils.lookupTimeout(conf) + intercept[akka.actor.ActorNotFound] { + slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) + } + + actorSystem.shutdown() + 
slaveSystem.shutdown() + } + + + test("remote fetch ssl on - untrusted server") { + val conf = sparkSSLConfigUntrusted() + val securityManager = new SecurityManager(conf) + + val hostname = "localhost" + val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, + conf = conf, securityManager = securityManager) + System.setProperty("spark.hostPort", hostname + ":" + boundPort) + + assert(securityManager.isAuthenticationEnabled() === false) + + val masterTracker = new MapOutputTrackerMaster(conf) + masterTracker.trackerActor = actorSystem.actorOf( + Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker") + + val slaveConf = sparkSSLConfig() + val securityManagerBad = new SecurityManager(slaveConf) + + val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, + conf = slaveConf, securityManager = securityManagerBad) + val slaveTracker = new MapOutputTrackerWorker(conf) + val selection = slaveSystem.actorSelection( + AkkaUtils.address("spark", "localhost", boundPort, "MapOutputTracker", conf)) + val timeout = AkkaUtils.lookupTimeout(conf) + intercept[TimeoutException] { + slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) + } + + actorSystem.shutdown() + slaveSystem.shutdown() + } + } diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index b433082dce1a2..23c0161536919 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -162,8 +162,8 @@ object ActorWordCount { */ val lines = ssc.actorStream[String]( - Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format( - host, port.toInt))), "SampleReceiver") + Props(new SampleActorReceiver[String]( + AkkaUtils.address("test", host, port, "FeederActor", sparkConf))), "SampleReceiver") // compute wordcount lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print() diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala index b46df12da86dc..9805609120005 100644 --- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala +++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala @@ -45,7 +45,7 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader // Hadoop FileSystem object for our URI, if it isn't using HTTP var fileSystem: FileSystem = { - if (uri.getScheme() == "http") { + if (Set("http", "https", "ftp").contains(uri.getScheme)) { null } else { FileSystem.get(uri, SparkHadoopUtil.get.newConfiguration(conf)) @@ -78,13 +78,16 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader if (fileSystem != null) { fileSystem.open(new Path(directory, pathInDirectory)) } else { - if (SparkEnv.get.securityManager.isAuthenticationEnabled()) { + val url = if (SparkEnv.get.securityManager.isAuthenticationEnabled()) { val uri = new URI(classUri + "/" + urlEncode(pathInDirectory)) val newuri = Utils.constructURIForAuthentication(uri, SparkEnv.get.securityManager) - newuri.toURL().openStream() + newuri.toURL } else { - new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream() + new URL(classUri + "/" + urlEncode(pathInDirectory)) } + + Utils.setupSecureURLConnection(url.openConnection(), 
SparkEnv.get.securityManager) + .getInputStream } } val bytes = readAndTransformClass(name, inputStream) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 716cf2c7f32fc..07d4818b0f64a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -67,8 +67,8 @@ private[streaming] class ReceiverSupervisorImpl( private val trackerActor = { val ip = env.conf.get("spark.driver.host", "localhost") val port = env.conf.getInt("spark.driver.port", 7077) - val url = "akka.tcp://%s@%s:%s/user/ReceiverTracker".format( - SparkEnv.driverActorSystemName, ip, port) + val url = AkkaUtils.address( + SparkEnv.driverActorSystemName, ip, port, "ReceiverTracker", env.conf) env.actorSystem.actorSelection(url) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index d3e327b2497b7..ebb3fb1ed0463 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -244,11 +244,13 @@ private[spark] class ApplicationMaster( host: String, port: String, isDriver: Boolean): Unit = { - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + + val driverUrl = AkkaUtils.address( SparkEnv.driverActorSystemName, host, port, - YarnSchedulerBackend.ACTOR_NAME) + YarnSchedulerBackend.ACTOR_NAME, + sparkConf) actor = actorSystem.actorOf(Props(new AMActor(driverUrl, isDriver)), name = "YarnAM") } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 3849586c6111e..87199edeb8918 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -34,9 +34,10 @@ import org.apache.hadoop.yarn.util.RackResolver import org.apache.log4j.{Level, Logger} -import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf} import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend +import org.apache.spark.util.AkkaUtils /** * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding @@ -106,10 +107,12 @@ private[yarn] class YarnAllocator( new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build()) launcherPool.allowCoreThreadTimeOut(true) - private val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format( + private val driverUrl = AkkaUtils.address( + SparkEnv.driverActorSystemName, sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), - CoarseGrainedSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME, + sparkConf) // For testing private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true) From 72b2541270b458369e6af382a8bb272e9688d865 Mon Sep 17 00:00:00 2001 From: Jacek Lewandowski Date: Thu, 29 Jan 2015 12:13:23 +0100 Subject: [PATCH 2/5] SPARK-3883: A reference to the fallback SSLOptions can be provided when constructing SSLOptions --- .../scala/org/apache/spark/SSLOptions.scala 
| 23 +++++-- .../org/apache/spark/SecurityManager.scala | 2 +- .../org/apache/spark/SSLOptionsSuite.scala | 65 +++++++++++++++++++ 3 files changed, 85 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala index 2479911dd6237..511b7ba4f74ae 100644 --- a/core/src/main/scala/org/apache/spark/SSLOptions.scala +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -98,16 +98,31 @@ private[spark] object SSLOptions extends Logging { * The parent directory of that location is used as a base directory to resolve relative paths * to keystore and truststore. */ - def parse(conf: SparkConf, ns: String): SSLOptions = { - val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = false) + def parse(conf: SparkConf, ns: String, defaults: Option[SSLOptions] = None): SSLOptions = { + val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled)) + val keyStore = conf.getOption(s"$ns.keyStore").map(new File(_)) + .orElse(defaults.flatMap(_.keyStore)) + val keyStorePassword = conf.getOption(s"$ns.keyStorePassword") + .orElse(defaults.flatMap(_.keyStorePassword)) + val keyPassword = conf.getOption(s"$ns.keyPassword") + .orElse(defaults.flatMap(_.keyPassword)) + val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_)) + .orElse(defaults.flatMap(_.trustStore)) + val trustStorePassword = conf.getOption(s"$ns.trustStorePassword") + .orElse(defaults.flatMap(_.trustStorePassword)) + val protocol = conf.getOption(s"$ns.protocol") - val enabledAlgorithms = conf.get(s"$ns.enabledAlgorithms", defaultValue = "") - .split(",").map(_.trim).filter(_.nonEmpty).toSet + .orElse(defaults.flatMap(_.protocol)) + + val enabledAlgorithms = conf.getOption(s"$ns.enabledAlgorithms") + .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet) + .orElse(defaults.map(_.enabledAlgorithms)) + .getOrElse(Set.empty) new SSLOptions( enabled, diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index d3ab0790e5e95..923fde26c0164 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -201,7 +201,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) ) } - val sslOptions = SSLOptions.parse(sparkConf, "spark.ssl") + val sslOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None) logDebug(s"SSLConfiguration: $sslOptions") val (sslSocketFactory, hostnameVerifier) = if (sslOptions.enabled) { diff --git a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala index 01b20b5deeb7a..444a33371bd71 100644 --- a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala @@ -55,4 +55,69 @@ class SSLOptionsSuite extends FunSuite with BeforeAndAfterAll { assert(opts.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA")) } + test("test resolving property with defaults specified ") { + val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath + val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath + + val conf = new SparkConf + conf.set("spark.ssl.enabled", "true") + conf.set("spark.ssl.keyStore", keyStorePath) + conf.set("spark.ssl.keyStorePassword", "password") + conf.set("spark.ssl.keyPassword", 
"password") + conf.set("spark.ssl.trustStore", trustStorePath) + conf.set("spark.ssl.trustStorePassword", "password") + conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA") + conf.set("spark.ssl.protocol", "SSLv3") + + val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None) + val opts = SSLOptions.parse(conf, "spark.ui.ssl", defaults = Some(defaultOpts)) + + assert(opts.enabled === true) + assert(opts.trustStore.isDefined === true) + assert(opts.trustStore.get.getName === "truststore") + assert(opts.trustStore.get.getAbsolutePath === trustStorePath) + assert(opts.keyStore.isDefined === true) + assert(opts.keyStore.get.getName === "keystore") + assert(opts.keyStore.get.getAbsolutePath === keyStorePath) + assert(opts.trustStorePassword === Some("password")) + assert(opts.keyStorePassword === Some("password")) + assert(opts.keyPassword === Some("password")) + assert(opts.protocol === Some("SSLv3")) + assert(opts.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA")) + } + + test("test whether defaults can be overridden ") { + val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath + val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath + + val conf = new SparkConf + conf.set("spark.ssl.enabled", "true") + conf.set("spark.ui.ssl.enabled", "false") + conf.set("spark.ssl.keyStore", keyStorePath) + conf.set("spark.ssl.keyStorePassword", "password") + conf.set("spark.ui.ssl.keyStorePassword", "12345") + conf.set("spark.ssl.keyPassword", "password") + conf.set("spark.ssl.trustStore", trustStorePath) + conf.set("spark.ssl.trustStorePassword", "password") + conf.set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA") + conf.set("spark.ui.ssl.enabledAlgorithms", "ABC, DEF") + conf.set("spark.ssl.protocol", "SSLv3") + + val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None) + val opts = SSLOptions.parse(conf, "spark.ui.ssl", defaults = Some(defaultOpts)) + + assert(opts.enabled === false) + assert(opts.trustStore.isDefined === true) + assert(opts.trustStore.get.getName === "truststore") + assert(opts.trustStore.get.getAbsolutePath === trustStorePath) + assert(opts.keyStore.isDefined === true) + assert(opts.keyStore.get.getName === "keystore") + assert(opts.keyStore.get.getAbsolutePath === keyStorePath) + assert(opts.trustStorePassword === Some("password")) + assert(opts.keyStorePassword === Some("12345")) + assert(opts.keyPassword === Some("password")) + assert(opts.protocol === Some("SSLv3")) + assert(opts.enabledAlgorithms === Set("ABC", "DEF")) + } + } From 90a8762a065e4a659902a8a346f3ee28ede52998 Mon Sep 17 00:00:00 2001 From: Jacek Lewandowski Date: Thu, 29 Jan 2015 13:50:55 +0100 Subject: [PATCH 3/5] SPARK-3883: Refactored methods to resolve Akka address and made it possible to easily configure multiple communication layers for SSL --- .../scala/org/apache/spark/HttpServer.scala | 4 +-- .../org/apache/spark/SecurityManager.scala | 22 ++++++++---- .../org/apache/spark/deploy/Client.scala | 5 +-- .../spark/deploy/client/AppClient.scala | 7 ++-- .../apache/spark/deploy/master/Master.scala | 8 ++--- .../apache/spark/deploy/worker/Worker.scala | 14 +++++--- .../cluster/SimrSchedulerBackend.scala | 4 +-- .../cluster/SparkDeploySchedulerBackend.scala | 4 +-- .../mesos/CoarseMesosSchedulerBackend.scala | 4 +-- .../org/apache/spark/util/AkkaUtils.scala | 27 +++++++++----- 
.../apache/spark/MapOutputTrackerSuite.scala | 2 +- .../apache/spark/SecurityManagerSuite.scala | 36 +++++++++++++------ .../spark/deploy/master/MasterSuite.scala | 16 ++++----- .../apache/spark/util/AkkaUtilsSuite.scala | 16 ++++----- .../examples/streaming/ActorWordCount.scala | 4 +-- .../receiver/ReceiverSupervisorImpl.scala | 6 +++- .../spark/deploy/yarn/ApplicationMaster.scala | 4 +-- .../spark/deploy/yarn/YarnAllocator.scala | 4 +-- 18 files changed, 114 insertions(+), 73 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 8466d99615698..09a9ccc226721 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -74,7 +74,7 @@ private[spark] class HttpServer( private def doStart(startPort: Int): (Server, Int) = { val server = new Server() - val connector = securityManager.sslOptions.createJettySslContextFactory() + val connector = securityManager.fileServerSSLOptions.createJettySslContextFactory() .map(new SslSocketConnector(_)).getOrElse(new SocketConnector) connector.setMaxIdleTime(60 * 1000) @@ -159,7 +159,7 @@ private[spark] class HttpServer( if (server == null) { throw new ServerStateException("Server is not started") } else { - val scheme = if (securityManager.sslOptions.enabled) "https" else "http" + val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http" s"$scheme://${Utils.localIpAddress}:$port" } } diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 923fde26c0164..322386d5cd35b 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -201,17 +201,25 @@ private[spark] class SecurityManager(sparkConf: SparkConf) ) } - val sslOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None) - logDebug(s"SSLConfiguration: $sslOptions") + // the default SSL configuration - it will be used by all communication layers unless overwritten + private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None) - val (sslSocketFactory, hostnameVerifier) = if (sslOptions.enabled) { + // SSL configuration for different communication layers - they can override the default + // configuration at a specified namespace. The namespace *must* start with spark.ssl. 
+ val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions)) + val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions)) + + logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions") + logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions") + + val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) { val trustStoreManagers = - for (trustStore <- sslOptions.trustStore) yield { - val input = Files.asByteSource(sslOptions.trustStore.get).openStream() + for (trustStore <- fileServerSSLOptions.trustStore) yield { + val input = Files.asByteSource(fileServerSSLOptions.trustStore.get).openStream() try { val ks = KeyStore.getInstance(KeyStore.getDefaultType) - ks.load(input, sslOptions.trustStorePassword.get.toCharArray) + ks.load(input, fileServerSSLOptions.trustStorePassword.get.toCharArray) val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) tmf.init(ks) @@ -232,7 +240,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) }: TrustManager }) - val sslContext = SSLContext.getInstance(sslOptions.protocol.getOrElse("Default")) + val sslContext = SSLContext.getInstance(fileServerSSLOptions.protocol.getOrElse("Default")) sslContext.init(null, trustStoreManagers.getOrElse(credulousTrustStoreManagers), null) val hostVerifier = new HostnameVerifier { diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 542635fada6e8..38b3da0b13756 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -39,7 +39,8 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) val timeout = AkkaUtils.askTimeout(conf) override def preStart() = { - masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master, conf)) + masterActor = context.actorSelection( + Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(context.system))) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) @@ -161,7 +162,7 @@ object Client { "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf)) // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely - Master.toAkkaUrl(driverArgs.master, conf) + Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(actorSystem)) actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf)) actorSystem.awaitTermination() diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index a711922f5cd82..ffe940fbda2fb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -47,7 +47,7 @@ private[spark] class AppClient( conf: SparkConf) extends Logging { - val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, conf)) + val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem))) val REGISTRATION_TIMEOUT = 20.seconds val REGISTRATION_RETRIES = 3 @@ -107,8 +107,9 @@ private[spark] class AppClient( def changeMaster(url: String) { // activeMasterUrl is a valid Spark url since we receive it from master. 
activeMasterUrl = url - master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl, conf)) - masterAddress = Master.toAkkaAddress(activeMasterUrl, conf) + master = context.actorSelection( + Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(actorSystem))) + masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(actorSystem)) } private def isPossibleMaster(remoteUrl: Address) = { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index a8294ecd51e3e..5eeb9fe526248 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -860,9 +860,9 @@ private[spark] object Master extends Logging { * * @throws SparkException if the url is invalid */ - def toAkkaUrl(sparkUrl: String, conf: SparkConf): String = { + def toAkkaUrl(sparkUrl: String, protocol: String): String = { val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl) - AkkaUtils.address(systemName, host, port, actorName, conf) + AkkaUtils.address(protocol, systemName, host, port, actorName) } /** @@ -870,9 +870,9 @@ private[spark] object Master extends Logging { * * @throws SparkException if the url is invalid */ - def toAkkaAddress(sparkUrl: String, conf: SparkConf): Address = { + def toAkkaAddress(sparkUrl: String, protocol: String): Address = { val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl) - Address(AkkaUtils.protocol(conf), systemName, host, port) + Address(protocol, systemName, host, port) } def startSystemAndActor( diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 79fce0961369f..b20f5c0c82895 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -93,7 +93,12 @@ private[spark] class Worker( var masterAddress: Address = null var activeMasterUrl: String = "" var activeMasterWebUiUrl : String = "" - val akkaUrl = AkkaUtils.address(actorSystemName, host, port, actorName, conf) + val akkaUrl = AkkaUtils.address( + AkkaUtils.protocol(context.system), + actorSystemName, + host, + port, + actorName) @volatile var registered = false @volatile var connected = false val workerId = generateWorkerId() @@ -174,8 +179,9 @@ private[spark] class Worker( // activeMasterUrl it's a valid Spark url since we receive it from master. 
activeMasterUrl = url activeMasterWebUiUrl = uiUrl - master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl, conf)) - masterAddress = Master.toAkkaAddress(activeMasterUrl, conf) + master = context.actorSelection( + Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(context.system))) + masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(context.system)) connected = true // Cancel any outstanding re-registration attempts because we found a new master registrationRetryTimer.foreach(_.cancel()) @@ -540,7 +546,7 @@ private[spark] object Worker extends Logging { val securityMgr = new SecurityManager(conf) val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf, securityManager = securityMgr) - val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, conf)) + val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem))) actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory, masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName) (actorSystem, boundPort) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index 162058c328b2d..06786a59524e7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -40,11 +40,11 @@ private[spark] class SimrSchedulerBackend( super.start() val driverUrl = AkkaUtils.address( + AkkaUtils.protocol(actorSystem), SparkEnv.driverActorSystemName, sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"), - CoarseGrainedSchedulerBackend.ACTOR_NAME, - sc.conf) + CoarseGrainedSchedulerBackend.ACTOR_NAME) val conf = SparkHadoopUtil.get.newConfiguration(sc.conf) val fs = FileSystem.get(conf) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 238ae462fefdd..d2e1680a5fd1b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -47,11 +47,11 @@ private[spark] class SparkDeploySchedulerBackend( // The endpoint for executors to talk to us val driverUrl = AkkaUtils.address( + AkkaUtils.protocol(actorSystem), SparkEnv.driverActorSystemName, conf.get("spark.driver.host"), conf.get("spark.driver.port"), - CoarseGrainedSchedulerBackend.ACTOR_NAME, - conf) + CoarseGrainedSchedulerBackend.ACTOR_NAME) val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}", "{{WORKER_URL}}") val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 23d9c81438eb8..0d1c2a916ca7f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -144,11 +144,11 @@ private[spark] class CoarseMesosSchedulerBackend( val command = CommandInfo.newBuilder() .setEnvironment(environment) val driverUrl = AkkaUtils.address( + 
AkkaUtils.protocol(sc.env.actorSystem), SparkEnv.driverActorSystemName, conf.get("spark.driver.host"), conf.get("spark.driver.port"), - CoarseGrainedSchedulerBackend.ACTOR_NAME, - conf) + CoarseGrainedSchedulerBackend.ACTOR_NAME) val uri = conf.get("spark.executor.uri", null) if (uri == null) { diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 77f9182ed79e6..a442539ac1778 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -20,6 +20,7 @@ package org.apache.spark.util import scala.collection.JavaConversions.mapAsJavaMap import scala.concurrent.Await import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.util.Try import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem} import akka.pattern.ask @@ -27,7 +28,7 @@ import akka.pattern.ask import com.typesafe.config.ConfigFactory import org.apache.log4j.{Level, Logger} -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException, SSLOptions} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException} /** * Various utility classes for working with Akka. @@ -91,7 +92,8 @@ private[spark] object AkkaUtils extends Logging { val secureCookie = if (isAuthOn) secretKey else "" logDebug(s"In createActorSystem, requireCookie is: $requireCookie") - val akkaSslConfig = securityManager.sslOptions.createAkkaConfig.getOrElse(ConfigFactory.empty()) + val akkaSslConfig = securityManager.akkaSSLOptions.createAkkaConfig + .getOrElse(ConfigFactory.empty()) val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]) .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString( @@ -216,7 +218,7 @@ private[spark] object AkkaUtils extends Logging { val driverHost: String = conf.get("spark.driver.host", "localhost") val driverPort: Int = conf.getInt("spark.driver.port", 7077) Utils.checkHost(driverHost, "Expected hostname") - val url = address(driverActorSystemName, driverHost, driverPort, name, conf) + val url = address(protocol(actorSystem), driverActorSystemName, driverHost, driverPort, name) val timeout = AkkaUtils.lookupTimeout(conf) logInfo(s"Connecting to $name: $url") Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) @@ -230,14 +232,20 @@ private[spark] object AkkaUtils extends Logging { actorSystem: ActorSystem): ActorRef = { val executorActorSystemName = SparkEnv.executorActorSystemName Utils.checkHost(host, "Expected hostname") - val url = address(executorActorSystemName, host, port, name, conf) + val url = address(protocol(actorSystem), executorActorSystemName, host, port, name) val timeout = AkkaUtils.lookupTimeout(conf) logInfo(s"Connecting to $name: $url") Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) } - def protocol(conf: SparkConf): String = { - if (conf.getBoolean("spark.ssl.enabled", defaultValue = false)) { + def protocol(actorSystem: ActorSystem): String = { + protocol(Try { + actorSystem.settings.config.getBoolean("akka.remote.netty.tcp.enable-ssl") + }.getOrElse(false)) + } + + def protocol(ssl: Boolean = false): String = { + if (ssl) { "akka.ssl.tcp" } else { "akka.tcp" @@ -245,11 +253,12 @@ private[spark] object AkkaUtils extends Logging { } def address( + protocol: String, systemName: String, host: String, port: Any, - actorName: String, - conf: SparkConf): String = { - 
s"${protocol(conf)}://$systemName@$host:$port/user/$actorName" + actorName: String): String = { + s"$protocol://$systemName@$host:$port/user/$actorName" } + } diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 3a5578cb902d0..ccfe0678cb1c3 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -120,7 +120,7 @@ class MapOutputTrackerSuite extends FunSuite { securityManager = new SecurityManager(conf)) val slaveTracker = new MapOutputTrackerWorker(conf) val selection = slaveSystem.actorSelection( - AkkaUtils.address("spark", "localhost", boundPort, "MapOutputTracker", conf)) + AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker")) val timeout = AkkaUtils.lookupTimeout(conf) slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala index 564a1e024671a..43fbd3ff3f756 100644 --- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala @@ -130,19 +130,32 @@ class SecurityManagerSuite extends FunSuite { val securityManager = new SecurityManager(conf) - assert(securityManager.sslOptions.enabled === true) + assert(securityManager.fileServerSSLOptions.enabled === true) + assert(securityManager.akkaSSLOptions.enabled === true) + assert(securityManager.sslSocketFactory.isDefined === true) assert(securityManager.hostnameVerifier.isDefined === true) - assert(securityManager.sslOptions.trustStore.isDefined === true) - assert(securityManager.sslOptions.trustStore.get.getName === "truststore") - assert(securityManager.sslOptions.keyStore.isDefined === true) - assert(securityManager.sslOptions.keyStore.get.getName === "keystore") - assert(securityManager.sslOptions.trustStorePassword === Some("password")) - assert(securityManager.sslOptions.keyStorePassword === Some("password")) - assert(securityManager.sslOptions.keyPassword === Some("password")) - assert(securityManager.sslOptions.protocol === Some("TLSv1")) - assert(securityManager.sslOptions.enabledAlgorithms === + assert(securityManager.fileServerSSLOptions.trustStore.isDefined === true) + assert(securityManager.fileServerSSLOptions.trustStore.get.getName === "truststore") + assert(securityManager.fileServerSSLOptions.keyStore.isDefined === true) + assert(securityManager.fileServerSSLOptions.keyStore.get.getName === "keystore") + assert(securityManager.fileServerSSLOptions.trustStorePassword === Some("password")) + assert(securityManager.fileServerSSLOptions.keyStorePassword === Some("password")) + assert(securityManager.fileServerSSLOptions.keyPassword === Some("password")) + assert(securityManager.fileServerSSLOptions.protocol === Some("TLSv1")) + assert(securityManager.fileServerSSLOptions.enabledAlgorithms === + Set("TLS_RSA_WITH_AES_128_CBC_SHA", "SSL_RSA_WITH_DES_CBC_SHA")) + + assert(securityManager.akkaSSLOptions.trustStore.isDefined === true) + assert(securityManager.akkaSSLOptions.trustStore.get.getName === "truststore") + assert(securityManager.akkaSSLOptions.keyStore.isDefined === true) + assert(securityManager.akkaSSLOptions.keyStore.get.getName === "keystore") + assert(securityManager.akkaSSLOptions.trustStorePassword === Some("password")) + 
assert(securityManager.akkaSSLOptions.keyStorePassword === Some("password")) + assert(securityManager.akkaSSLOptions.keyPassword === Some("password")) + assert(securityManager.akkaSSLOptions.protocol === Some("TLSv1")) + assert(securityManager.akkaSSLOptions.enabledAlgorithms === Set("TLS_RSA_WITH_AES_128_CBC_SHA", "SSL_RSA_WITH_DES_CBC_SHA")) } @@ -155,7 +168,8 @@ class SecurityManagerSuite extends FunSuite { val securityManager = new SecurityManager(conf) - assert(securityManager.sslOptions.enabled === false) + assert(securityManager.fileServerSSLOptions.enabled === false) + assert(securityManager.akkaSSLOptions.enabled === false) assert(securityManager.sslSocketFactory.isDefined === false) assert(securityManager.hostnameVerifier.isDefined === false) } diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala index 9d8a1fcb07b60..34c74d87f0a62 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala @@ -20,48 +20,46 @@ package org.apache.spark.deploy.master import akka.actor.Address import org.scalatest.FunSuite -import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.{SSLOptions, SparkConf, SparkException} class MasterSuite extends FunSuite { test("toAkkaUrl") { val conf = new SparkConf(loadDefaults = false) - val akkaUrl = Master.toAkkaUrl("spark://1.2.3.4:1234", conf) + val akkaUrl = Master.toAkkaUrl("spark://1.2.3.4:1234", "akka.tcp") assert("akka.tcp://sparkMaster@1.2.3.4:1234/user/Master" === akkaUrl) } test("toAkkaUrl with SSL") { val conf = new SparkConf(loadDefaults = false) - conf.set("spark.ssl.enabled", "true") - val akkaUrl = Master.toAkkaUrl("spark://1.2.3.4:1234", conf) + val akkaUrl = Master.toAkkaUrl("spark://1.2.3.4:1234", "akka.ssl.tcp") assert("akka.ssl.tcp://sparkMaster@1.2.3.4:1234/user/Master" === akkaUrl) } test("toAkkaUrl: a typo url") { val conf = new SparkConf(loadDefaults = false) val e = intercept[SparkException] { - Master.toAkkaUrl("spark://1.2. 3.4:1234", conf) + Master.toAkkaUrl("spark://1.2. 3.4:1234", "akka.tcp") } assert("Invalid master URL: spark://1.2. 3.4:1234" === e.getMessage) } test("toAkkaAddress") { val conf = new SparkConf(loadDefaults = false) - val address = Master.toAkkaAddress("spark://1.2.3.4:1234", conf) + val address = Master.toAkkaAddress("spark://1.2.3.4:1234", "akka.tcp") assert(Address("akka.tcp", "sparkMaster", "1.2.3.4", 1234) === address) } test("toAkkaAddress with SSL") { val conf = new SparkConf(loadDefaults = false) - conf.set("spark.ssl.enabled", "true") - val address = Master.toAkkaAddress("spark://1.2.3.4:1234", conf) + val address = Master.toAkkaAddress("spark://1.2.3.4:1234", "akka.ssl.tcp") assert(Address("akka.ssl.tcp", "sparkMaster", "1.2.3.4", 1234) === address) } test("toAkkaAddress: a typo url") { val conf = new SparkConf(loadDefaults = false) val e = intercept[SparkException] { - Master.toAkkaAddress("spark://1.2. 3.4:1234", conf) + Master.toAkkaAddress("spark://1.2. 3.4:1234", "akka.tcp") } assert("Invalid master URL: spark://1.2. 
3.4:1234" === e.getMessage) } diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala index 38a9341efb500..39e5d367d676c 100644 --- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala @@ -63,7 +63,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro conf = conf, securityManager = securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) val selection = slaveSystem.actorSelection( - AkkaUtils.address("spark", "localhost", boundPort, "MapOutputTracker", conf)) + AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker")) val timeout = AkkaUtils.lookupTimeout(conf) intercept[akka.actor.ActorNotFound] { slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) @@ -99,7 +99,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro conf = badconf, securityManager = securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) val selection = slaveSystem.actorSelection( - AkkaUtils.address("spark", "localhost", boundPort, "MapOutputTracker", conf)) + AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker")) val timeout = AkkaUtils.lookupTimeout(conf) slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) @@ -151,7 +151,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro conf = goodconf, securityManager = securityManagerGood) val slaveTracker = new MapOutputTrackerWorker(conf) val selection = slaveSystem.actorSelection( - AkkaUtils.address("spark", "localhost", boundPort, "MapOutputTracker", conf)) + AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker")) val timeout = AkkaUtils.lookupTimeout(conf) slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) @@ -202,7 +202,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro conf = badconf, securityManager = securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) val selection = slaveSystem.actorSelection( - AkkaUtils.address("spark", "localhost", boundPort, "MapOutputTracker", conf)) + AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker")) val timeout = AkkaUtils.lookupTimeout(conf) intercept[akka.actor.ActorNotFound] { slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) @@ -234,7 +234,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro conf = slaveConf, securityManager = securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) val selection = slaveSystem.actorSelection( - AkkaUtils.address("spark", "localhost", boundPort, "MapOutputTracker", conf)) + AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker")) val timeout = AkkaUtils.lookupTimeout(conf) slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) @@ -285,7 +285,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro conf = slaveConf, securityManager = securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) val selection = slaveSystem.actorSelection( - AkkaUtils.address("spark", "localhost", boundPort, 
"MapOutputTracker", conf)) + AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker")) val timeout = AkkaUtils.lookupTimeout(conf) slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) @@ -335,7 +335,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro conf = slaveConf, securityManager = securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) val selection = slaveSystem.actorSelection( - AkkaUtils.address("spark", "localhost", boundPort, "MapOutputTracker", conf)) + AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker")) val timeout = AkkaUtils.lookupTimeout(conf) intercept[akka.actor.ActorNotFound] { slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) @@ -368,7 +368,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro conf = slaveConf, securityManager = securityManagerBad) val slaveTracker = new MapOutputTrackerWorker(conf) val selection = slaveSystem.actorSelection( - AkkaUtils.address("spark", "localhost", boundPort, "MapOutputTracker", conf)) + AkkaUtils.address(AkkaUtils.protocol(slaveSystem), "spark", "localhost", boundPort, "MapOutputTracker")) val timeout = AkkaUtils.lookupTimeout(conf) intercept[TimeoutException] { slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index 23c0161536919..b433082dce1a2 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -162,8 +162,8 @@ object ActorWordCount { */ val lines = ssc.actorStream[String]( - Props(new SampleActorReceiver[String]( - AkkaUtils.address("test", host, port, "FeederActor", sparkConf))), "SampleReceiver") + Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format( + host, port.toInt))), "SampleReceiver") // compute wordcount lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 07d4818b0f64a..7d29ed88cfcb4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -68,7 +68,11 @@ private[streaming] class ReceiverSupervisorImpl( val ip = env.conf.get("spark.driver.host", "localhost") val port = env.conf.getInt("spark.driver.port", 7077) val url = AkkaUtils.address( - SparkEnv.driverActorSystemName, ip, port, "ReceiverTracker", env.conf) + AkkaUtils.protocol(env.actorSystem), + SparkEnv.driverActorSystemName, + ip, + port, + "ReceiverTracker") env.actorSystem.actorSelection(url) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index ebb3fb1ed0463..40f4a0dedaf3e 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -246,11 +246,11 @@ private[spark] class 
ApplicationMaster( isDriver: Boolean): Unit = { val driverUrl = AkkaUtils.address( + AkkaUtils.protocol(actorSystem), SparkEnv.driverActorSystemName, host, port, - YarnSchedulerBackend.ACTOR_NAME, - sparkConf) + YarnSchedulerBackend.ACTOR_NAME) actor = actorSystem.actorOf(Props(new AMActor(driverUrl, isDriver)), name = "YarnAM") } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 87199edeb8918..5a7535623542a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -108,11 +108,11 @@ private[yarn] class YarnAllocator( launcherPool.allowCoreThreadTimeOut(true) private val driverUrl = AkkaUtils.address( + AkkaUtils.protocol(securityMgr.akkaSSLOptions.enabled), SparkEnv.driverActorSystemName, sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), - CoarseGrainedSchedulerBackend.ACTOR_NAME, - sparkConf) + CoarseGrainedSchedulerBackend.ACTOR_NAME) // For testing private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true) From 2532668fbffb6e4b6a8340c4b2748f45083d3f13 Mon Sep 17 00:00:00 2001 From: Jacek Lewandowski Date: Mon, 2 Feb 2015 10:51:37 +0100 Subject: [PATCH 4/5] SPARK-3883: Refactored AkkaUtils.protocol method to not use Try --- core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index a442539ac1778..3d9c6192ff7f7 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -239,9 +239,9 @@ private[spark] object AkkaUtils extends Logging { } def protocol(actorSystem: ActorSystem): String = { - protocol(Try { - actorSystem.settings.config.getBoolean("akka.remote.netty.tcp.enable-ssl") - }.getOrElse(false)) + val akkaConf = actorSystem.settings.config + val sslProp = "akka.remote.netty.tcp.enable-ssl" + protocol(akkaConf.hasPath(sslProp) && akkaConf.getBoolean(sslProp)) } def protocol(ssl: Boolean = false): String = { From fb31b49df7753ae97b071517b9a7959cd7edb354 Mon Sep 17 00:00:00 2001 From: Jacek Lewandowski Date: Mon, 2 Feb 2015 12:14:05 +0100 Subject: [PATCH 5/5] SPARK-3883: Added SSL setup documentation --- .../scala/org/apache/spark/SSLOptions.scala | 60 +++++++++++--- .../org/apache/spark/SecurityManager.scala | 41 ++++++++-- docs/configuration.md | 80 +++++++++++++++++++ docs/security.md | 24 ++++++ 4 files changed, 188 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala index 511b7ba4f74ae..309fac07b908e 100644 --- a/core/src/main/scala/org/apache/spark/SSLOptions.scala +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -22,6 +22,23 @@ import java.io.File import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} import org.eclipse.jetty.util.ssl.SslContextFactory +/** SSLOptions class is a common container for SSL configuration options. It offers methods to + * generate specific objects to configure SSL for different communication protocols. + * + * SSLOptions is intended to provide the maximum common set of SSL settings, which are supported + * by the protocol, which it can generate the configuration for. 
Since Akka doesn't support client + * authentication with SSL, SSLOptions cannot support it either. + * + * @param enabled enables or disables SSL; if it is set to false, the rest of the + * settings are disregarded + * @param keyStore a path to the key-store file + * @param keyStorePassword a password to access the key-store file + * @param keyPassword a password to access the private key in the key-store + * @param trustStore a path to the trust-store file + * @param trustStorePassword a password to access the trust-store file + * @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java + * @param enabledAlgorithms a set of encryption algorithms to use + */ private[spark] case class SSLOptions( enabled: Boolean = false, keyStore: Option[File] = None, @@ -32,9 +49,8 @@ private[spark] case class SSLOptions( protocol: Option[String] = None, enabledAlgorithms: Set[String] = Set.empty) { - /** - * Creates a Jetty SSL context factory according to the SSL settings represented by this object. - */ + /** Creates a Jetty SSL context factory according to the SSL settings represented by this object. + */ def createJettySslContextFactory(): Option[SslContextFactory] = { if (enabled) { val sslContextFactory = new SslContextFactory() @@ -53,10 +69,9 @@ private[spark] case class SSLOptions( } } - /** - * Creates an Akka configuration object which contains all the SSL settings represented by this - * object. It can be used then to compose the ultimate Akka configuration. - */ + /** Creates an Akka configuration object which contains all the SSL settings represented by this + * object. It can be used then to compose the ultimate Akka configuration. + */ def createAkkaConfig: Option[Config] = { import scala.collection.JavaConversions._ if (enabled) { @@ -84,6 +99,7 @@ private[spark] case class SSLOptions( } } + /** Returns a string representation of this SSLOptions with all the passwords masked. */ override def toString: String = s"SSLOptions{enabled=$enabled, " + s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " + s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " + @@ -93,11 +109,31 @@ private[spark] case class SSLOptions( private[spark] object SSLOptions extends Logging { - /** - * Resolves SSLOptions settings from a given Spark configuration object at a given namespace. - * The parent directory of that location is used as a base directory to resolve relative paths - * to keystore and truststore. - */ + /** Resolves SSLOptions settings from a given Spark configuration object at a given namespace. + * + * The following settings are allowed: + * $ - `[ns].enabled` - `true` or `false`, to enable or disable SSL respectively + * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory + * $ - `[ns].keyStorePassword` - a password to the key-store file + * $ - `[ns].keyPassword` - a password to the private key + * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current + * directory + * $ - `[ns].trustStorePassword` - a password to the trust-store file + * $ - `[ns].protocol` - a protocol name supported by a particular Java version + * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers + * + * For a list of protocols and ciphers supported by particular Java versions, you may go to + * [[https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https Oracle + * blog page]]. 
+ * + * You can optionally specify the default configuration. If you do, for each setting which is + * missing in SparkConf, the corresponding setting is used from the default configuration. + * + * @param conf Spark configuration object where the settings are collected from + * @param ns the namespace name + * @param defaults the default configuration + * @return [[org.apache.spark.SSLOptions]] object + */ def parse(conf: SparkConf, ns: String, defaults: Option[SSLOptions] = None): SSLOptions = { val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled)) diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 322386d5cd35b..59f25cc8a18af 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -59,7 +59,7 @@ import org.apache.spark.network.sasl.SecretKeyHolder * Spark also has a set of admin acls (`spark.admin.acls`) which is a set of users/administrators * who always have permission to view or modify the Spark application. * - * Spark does not currently support encryption after authentication. + * Starting from version 1.3, Spark has partial support for encrypted connections with SSL. * * At this point spark has multiple communication protocols that need to be secured and * different underlying mechanisms are used depending on the protocol: @@ -71,8 +71,9 @@ import org.apache.spark.network.sasl.SecretKeyHolder * to connect to the server. There is no control of the underlying * authentication mechanism so its not clear if the password is passed in * plaintext or uses DIGEST-MD5 or some other mechanism. - * Akka also has an option to turn on SSL, this option is not currently supported - * but we could add a configuration option in the future. + * + * Akka also has an option to turn on SSL, this option is currently supported (see + * the details below). * * - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty * for the HttpServer. Jetty supports multiple authentication mechanisms - @@ -81,8 +82,9 @@ import org.apache.spark.network.sasl.SecretKeyHolder * to authenticate using DIGEST-MD5 via a single user and the shared secret. * Since we are using DIGEST-MD5, the shared secret is not passed on the wire * in plaintext. - * We currently do not support SSL (https), but Jetty can be configured to use it - * so we could add a configuration option for this in the future. + * + * We currently support SSL (https) for this communication protocol (see the details + * below). * * The Spark HttpServer installs the HashLoginServer and configures it to DIGEST-MD5. * Any clients must specify the user and password. There is a default @@ -146,6 +148,35 @@ import org.apache.spark.network.sasl.SecretKeyHolder * authentication. Spark will then use that user to compare against the view acls to do * authorization. If not filter is in place the user is generally null and no authorization * can take place. + * + * Connection encryption (SSL) configuration is organized hierarchically. The user can configure + * the default SSL settings which will be used for all the supported communication protocols unless + * they are overwritten by protocol specific settings. This way the user can easily provide the + * common settings for all the protocols without disabling the ability to configure each one + * individually. 
+ * + * All the SSL settings like `spark.ssl.xxx`, where `xxx` is a particular configuration property, + * denote the global configuration for all the supported protocols. In order to override the global + * configuration for a particular protocol, the properties must be overwritten in the + * protocol-specific namespace. Use `spark.ssl.yyy.xxx` settings to overwrite the global + * configuration for a particular protocol denoted by `yyy`. Currently `yyy` can be either `akka` for + * Akka-based connections or `fs` for broadcast and file server. + * + * Refer to the [[org.apache.spark.SSLOptions]] documentation for the list of + * options that can be specified. + * + * SecurityManager initializes SSLOptions objects for different protocols separately. An SSLOptions + * object parses the Spark configuration at a given namespace and builds the common representation + * of SSL settings. SSLOptions is then used to provide protocol-specific configuration, such as the Typesafe + * configuration for Akka or the SslContextFactory for Jetty. + * SSL must be configured on each node and for each component involved in + * communication using the particular protocol. In YARN clusters, the key-store can be prepared on + * the client side, then distributed and used by the executors as part of the application + * (YARN allows the user to deploy files before the application is started). + * In standalone deployment, the user needs to provide key-stores and configuration + * options for the master and workers. In this mode, the user may allow the executors to use the SSL + * settings inherited from the worker which spawned that executor. This can be accomplished by + * setting `spark.ssl.useNodeLocalConf` to `true`. */ private[spark] class SecurityManager(sparkConf: SparkConf) diff --git a/docs/configuration.md b/docs/configuration.md index e4e4b8d516b75..360cb7847e5a3 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1234,6 +1234,86 @@ Apart from these, the following properties are also available, and may be useful +#### Encryption + +
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.ssl.enabled</code></td>
+  <td>false</td>
+  <td>
+    <p>Whether to enable SSL connections on all supported protocols.</p>
+    <p>All the SSL settings like <code>spark.ssl.xxx</code>, where <code>xxx</code> is a
+    particular configuration property, denote the global configuration for all the supported
+    protocols. In order to override the global configuration for a particular protocol,
+    the properties must be overwritten in the protocol-specific namespace.</p>
+    <p>Use <code>spark.ssl.YYY.XXX</code> settings to overwrite the global configuration for
+    a particular protocol denoted by <code>YYY</code>. Currently <code>YYY</code> can be
+    either <code>akka</code> for Akka-based connections or <code>fs</code> for broadcast and
+    file server.</p>
+  </td>
+</tr>
+<tr>
+  <td><code>spark.ssl.keyStore</code></td>
+  <td>None</td>
+  <td>A path to a key-store file. The path can be absolute or relative to the directory in
+  which the component is started.</td>
+</tr>
+<tr>
+  <td><code>spark.ssl.keyStorePassword</code></td>
+  <td>None</td>
+  <td>A password to the key-store.</td>
+</tr>
+<tr>
+  <td><code>spark.ssl.keyPassword</code></td>
+  <td>None</td>
+  <td>A password to the private key in the key-store.</td>
+</tr>
+<tr>
+  <td><code>spark.ssl.trustStore</code></td>
+  <td>None</td>
+  <td>A path to a trust-store file. The path can be absolute or relative to the directory in
+  which the component is started.</td>
+</tr>
+<tr>
+  <td><code>spark.ssl.trustStorePassword</code></td>
+  <td>None</td>
+  <td>A password to the trust-store.</td>
+</tr>
+<tr>
+  <td><code>spark.ssl.protocol</code></td>
+  <td>None</td>
+  <td>A protocol name. The protocol must be supported by the JVM. The reference list of
+  protocols can be found on
+  <a href="https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https">this page</a>.</td>
+</tr>
+<tr>
+  <td><code>spark.ssl.enabledAlgorithms</code></td>
+  <td>Empty</td>
+  <td>A comma-separated list of ciphers. The specified ciphers must be supported by the JVM.
+  The reference list of supported cipher suites can be found on
+  <a href="https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https">this page</a>.</td>
+</tr>
+</table>
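The hierarchical resolution described above can be illustrated with a short, self-contained sketch. This is not part of the patch: the object name `SSLConfigExample`, the key-store paths, passwords, host and actor name are made up, and it only exercises the `SSLOptions.parse` fallback mechanism and the URL-scheme choice that the patch introduces. It is placed in the `org.apache.spark` package because `SSLOptions` is `private[spark]`.

```scala
package org.apache.spark

// A sketch only (not part of the patch). Paths, passwords, host and actor
// name below are placeholders.
object SSLConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.ssl.enabled", "true")
      .set("spark.ssl.keyStore", "/path/to/keystore")   // placeholder path
      .set("spark.ssl.keyStorePassword", "secret")      // placeholder password
      .set("spark.ssl.keyPassword", "secret")           // placeholder password
      .set("spark.ssl.protocol", "TLSv1")
      // Protocol-specific override: only the Akka layer gets its own cipher list.
      .set("spark.ssl.akka.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA")

    // The global namespace is parsed first and passed as the fallback for the
    // per-protocol namespaces, as SecurityManager does after this patch.
    val defaults = SSLOptions.parse(conf, "spark.ssl", defaults = None)
    val akkaOpts = SSLOptions.parse(conf, "spark.ssl.akka", defaults = Some(defaults))
    val fsOpts   = SSLOptions.parse(conf, "spark.ssl.fs", defaults = Some(defaults))

    // akkaOpts inherits keyStore, passwords and protocol from the defaults but keeps
    // its own enabledAlgorithms; fsOpts inherits everything from the defaults.
    println(akkaOpts)  // SSLOptions.toString masks the passwords
    println(fsOpts)

    // With SSL enabled for the Akka layer, actor URLs switch scheme:
    val scheme = if (akkaOpts.enabled) "akka.ssl.tcp" else "akka.tcp"
    println(s"$scheme://sparkDriver@driver-host:7077/user/SomeActor")
  }
}
```

At runtime, the refactored `AkkaUtils.protocol(actorSystem)` makes the same decision by checking whether `akka.remote.netty.tcp.enable-ssl` is set in the actor system's configuration, so call sites no longer need a `SparkConf` to build an actor URL.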
+ + #### Spark Streaming diff --git a/docs/security.md b/docs/security.md index 1e206a139fb72..6e0a54fbc4ad7 100644 --- a/docs/security.md +++ b/docs/security.md @@ -20,6 +20,30 @@ Spark allows for a set of administrators to be specified in the acls who always If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`) should be manually created and have the proper permissions set on it. If you want those log files secured, the permissions should be set to `drwxrwxrwxt` for that directory. The owner of the directory should be the super user who is running the history server and the group permissions should be restricted to super user group. This will allow all users to write to the directory but will prevent unprivileged users from removing or renaming a file unless they own the file or directory. The event log files will be created by Spark with permissions such that only the user and group have read and write access. +## Encryption + +Spark supports SSL for the Akka and HTTP (broadcast and file server) protocols. However, SSL is not yet supported for the WebUI and the block transfer service. + +Connection encryption (SSL) configuration is organized hierarchically. The user can configure the default SSL settings, which will be used for all the supported communication protocols unless they are overwritten by protocol-specific settings. This way the user can easily provide the common settings for all the protocols without disabling the ability to configure each one individually. The common SSL settings are in the `spark.ssl` namespace of the Spark configuration, while the Akka SSL configuration is at `spark.ssl.akka` and the SSL configuration for HTTP broadcast and file server is at `spark.ssl.fs`. The full breakdown can be found on the [configuration page](configuration.html). + +SSL must be configured on each node and for each component involved in communication using the particular protocol. + +### YARN mode +The key-store can be prepared on the client side and then distributed to and used by the executors as part of the application. This is possible because the user is able to deploy files before the application is started in YARN by using the `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encrypting the transfer of these files lies with YARN and is independent of Spark. + +### Standalone mode +The user needs to provide key-stores and configuration options for the master and workers. They have to be set by attaching appropriate Java system properties to the `SPARK_MASTER_OPTS` and `SPARK_WORKER_OPTS` environment variables, or just to `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. This can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by the user on the client side are not used by the executors. + +### Preparing the key-stores +Key-stores can be generated with the `keytool` program. The reference documentation for this tool is +[here](https://docs.oracle.com/javase/7/docs/technotes/tools/solaris/keytool.html). 
The most basic + steps to configure the key-stores and the trust-store for the standalone deployment mode are as + follows: + * Generate a key pair for each node + * Export the public key of the key pair to a file on each node + * Import all exported public keys into a single trust-store + * Distribute the trust-store over the nodes + ## Configuring Ports for Network Security Spark makes heavy use of the network, and some environments have strict requirements for using tight