From fcc3d29866b73f1c3553ae1861ff8cc9c7829258 Mon Sep 17 00:00:00 2001 From: Jacek Lewandowski Date: Thu, 9 Oct 2014 01:41:05 +0200 Subject: [PATCH 1/3] SPARK-3883 SSL support for HttpServer and Akka - Introduced SSLOptions object - SSLOptions is created by SecurityManager - SSLOptions configures Akka and Jetty to use SSL - SSLOptions uses a node-local property file to set SSL settings - Provided utility methods to determine the proper Akka protocol for Akka requests and to configure the SSL socket factory for URL connections - Added test cases for AkkaUtils, FileServer, SSLOptions and SecurityManager --- conf/ssl.conf.template | 27 +++ .../org/apache/spark/HttpFileServer.scala | 5 +- .../scala/org/apache/spark/HttpServer.scala | 14 +- .../scala/org/apache/spark/SSLOptions.scala | 188 ++++++++++++++++ .../org/apache/spark/SecurityManager.scala | 42 ++++ .../scala/org/apache/spark/SparkConf.scala | 5 + .../scala/org/apache/spark/SparkEnv.scala | 2 +- .../spark/broadcast/HttpBroadcast.scala | 4 +- .../org/apache/spark/deploy/Client.scala | 2 +- .../spark/deploy/client/AppClient.scala | 8 +- .../apache/spark/deploy/master/Master.scala | 4 +- .../apache/spark/deploy/worker/Worker.scala | 8 +- .../CoarseGrainedSchedulerBackend.scala | 2 +- .../cluster/SimrSchedulerBackend.scala | 2 +- .../cluster/SparkDeploySchedulerBackend.scala | 7 +- .../mesos/CoarseMesosSchedulerBackend.scala | 7 +- .../org/apache/spark/util/AkkaUtils.scala | 23 +- .../scala/org/apache/spark/util/Utils.scala | 16 ++ core/src/test/resources/bad-ssl.conf | 27 +++ core/src/test/resources/good-ssl.conf | 27 +++ core/src/test/resources/keystore | Bin 0 -> 2247 bytes core/src/test/resources/truststore | Bin 0 -> 957 bytes core/src/test/resources/untrusted-keystore | Bin 0 -> 2246 bytes .../org/apache/spark/FileServerSuite.scala | 100 +++++++++ .../org/apache/spark/LocalSparkContext.scala | 1 + .../apache/spark/MapOutputTrackerSuite.scala | 2 +- .../org/apache/spark/SSLOptionsSuite.scala | 115 ++++++++++ .../apache/spark/SecurityManagerSuite.scala | 51 ++++- .../org/apache/spark/deploy/ClientSuite.scala | 1 + .../apache/spark/util/AkkaUtilsSuite.scala | 203 +++++++++++++++++- .../examples/streaming/ActorWordCount.scala | 4 +- .../spark/repl/ExecutorClassLoader.scala | 11 +- .../org/apache/spark/repl/SparkIMain.scala | 2 +- .../receiver/ReceiverSupervisorImpl.scala | 4 +- .../spark/deploy/yarn/ExecutorLauncher.scala | 5 +- .../deploy/yarn/YarnAllocationHandler.scala | 7 +- .../spark/deploy/yarn/ExecutorLauncher.scala | 5 +- .../deploy/yarn/YarnAllocationHandler.scala | 7 +- 38 files changed, 877 insertions(+), 61 deletions(-) create mode 100644 conf/ssl.conf.template create mode 100644 core/src/main/scala/org/apache/spark/SSLOptions.scala create mode 100644 core/src/test/resources/bad-ssl.conf create mode 100644 core/src/test/resources/good-ssl.conf create mode 100644 core/src/test/resources/keystore create mode 100644 core/src/test/resources/truststore create mode 100644 core/src/test/resources/untrusted-keystore create mode 100644 core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala diff --git a/conf/ssl.conf.template b/conf/ssl.conf.template new file mode 100644 index 000000000000..5b5d32eaa22b --- /dev/null +++ b/conf/ssl.conf.template @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Spark SSL settings + +# ssl.enabled true +# ssl.keyStore /path/to/your/keyStore +# ssl.keyStorePassword password +# ssl.keyPassword password +# ssl.trustStore /path/to/your/trustStore +# ssl.trustStorePassword password +# ssl.enabledAlgorithms [TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA] +# ssl.protocol SSLv3 diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala index edc3889c9ae5..13e79531ed62 100644 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala @@ -25,7 +25,8 @@ import org.apache.spark.util.Utils private[spark] class HttpFileServer( securityManager: SecurityManager, - requestedPort: Int = 0) + requestedPort: Int = 0, + conf: SparkConf) extends Logging { var baseDir : File = null @@ -41,7 +42,7 @@ private[spark] class HttpFileServer( fileDir.mkdir() jarDir.mkdir() logInfo("HTTP File server directory is " + baseDir) - httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server") + httpServer = new HttpServer(baseDir, securityManager, requestedPort, "HTTP file server", conf) httpServer.start() serverUri = httpServer.uri logDebug("HTTP file server started at: " + serverUri) diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala index 912558d0cab7..459628bd8cb5 100644 --- a/core/src/main/scala/org/apache/spark/HttpServer.scala +++ b/core/src/main/scala/org/apache/spark/HttpServer.scala @@ -19,6 +19,7 @@ package org.apache.spark import java.io.File +import org.eclipse.jetty.server.ssl.SslSocketConnector import org.eclipse.jetty.util.security.{Constraint, Password} import org.eclipse.jetty.security.authentication.DigestAuthenticator import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService} @@ -45,7 +46,8 @@ private[spark] class HttpServer( resourceBase: File, securityManager: SecurityManager, requestedPort: Int = 0, - serverName: String = "HTTP server") + serverName: String = "HTTP server", + conf: SparkConf) extends Logging { private var server: Server = null @@ -71,7 +73,10 @@ private[spark] class HttpServer( */ private def doStart(startPort: Int): (Server, Int) = { val server = new Server() - val connector = new SocketConnector + + val connector = securityManager.sslOptions.createJettySslContextFactory() + .map(new SslSocketConnector(_)).getOrElse(new SocketConnector) + connector.setMaxIdleTime(60 * 1000) connector.setSoLingerTime(-1) connector.setPort(startPort) @@ -148,13 +153,14 @@ private[spark] class HttpServer( } /** - * Get the URI of this HTTP server (http://host:port) + * Get the URI of this HTTP server (http://host:port or https://host:port) */ def uri: String = { if (server == null) { throw new ServerStateException("Server is not started") } else { - "http://" + Utils.localIpAddress + ":" + port + val 
scheme = if (securityManager.sslOptions.enabled) "https" else "http" + s"$scheme://${Utils.localIpAddress}:$port" } } } diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala new file mode 100644 index 000000000000..0fb2ddd74230 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.io.{FileReader, File} +import java.nio.file.Paths +import java.util.Properties + +import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} +import org.apache.commons.io.FilenameUtils +import org.eclipse.jetty.util.ssl.SslContextFactory + +import scala.util.Try + +case class SSLOptions(enabled: Boolean = false, + keyStore: Option[File] = None, + keyStorePassword: Option[String] = None, + keyPassword: Option[String] = None, + trustStore: Option[File] = None, + trustStorePassword: Option[String] = None, + protocol: Option[String] = None, + enabledAlgorithms: Set[String] = Set.empty) { + + /** + * Creates a Jetty SSL context factory according to the SSL settings represented by this object. + */ + def createJettySslContextFactory(): Option[SslContextFactory] = { + if (enabled) { + val sslContextFactory = new SslContextFactory() + + keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath)) + trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath)) + keyStorePassword.foreach(sslContextFactory.setKeyStorePassword) + trustStorePassword.foreach(sslContextFactory.setTrustStorePassword) + keyPassword.foreach(sslContextFactory.setKeyManagerPassword) + protocol.foreach(sslContextFactory.setProtocol) + sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*) + + Some(sslContextFactory) + } else { + None + } + } + + /** + * Creates an Akka configuration object which contains all the SSL settings represented by this + * object. It can then be used to compose the final Akka configuration.
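+   *
+   * A hypothetical usage sketch (``sslOptions`` and ``baseAkkaConfig`` are assumed values,
+   * not part of this file):
+   * {{{
+   *   val akkaConf = sslOptions.createAkkaConfig
+   *     .map(_.withFallback(baseAkkaConfig))
+   *     .getOrElse(baseAkkaConfig)
+   * }}}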
+ */ + def createAkkaConfig: Option[Config] = { + import scala.collection.JavaConversions._ + if (enabled) { + Some(ConfigFactory.empty() + .withValue("akka.remote.netty.tcp.security.key-store", + ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.key-store-password", + ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.trust-store", + ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.trust-store-password", + ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.key-password", + ConfigValueFactory.fromAnyRef(keyPassword.getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.random-number-generator", + ConfigValueFactory.fromAnyRef("")) + .withValue("akka.remote.netty.tcp.security.protocol", + ConfigValueFactory.fromAnyRef(protocol.getOrElse(""))) + .withValue("akka.remote.netty.tcp.security.enabled-algorithms", + ConfigValueFactory.fromIterable(enabledAlgorithms.toSeq)) + .withValue("akka.remote.netty.tcp.enable-ssl", + ConfigValueFactory.fromAnyRef(true))) + } else { + None + } + } + +} + +object SSLOptions extends Logging { + + /** + * Resolves the SSL configuration file location by checking: + * - SPARK_SSL_CONFIG_FILE env variable + * - SPARK_CONF_DIR/ssl.conf + * - SPARK_HOME/conf/ssl.conf + */ + def defaultConfigFile: Option[File] = { + val specifiedFile = Option(System.getenv("SPARK_SSL_CONFIG_FILE")).map(new File(_)) + val sparkConfDir = Option(System.getenv("SPARK_CONF_DIR")).map(new File(_)) + val sparkHomeConfDir = Option(System.getenv("SPARK_HOME")) + .map(new File(_, "conf")) + val defaultFile = (sparkConfDir orElse sparkHomeConfDir).map(new File(_, "ssl.conf")) + + specifiedFile orElse defaultFile + } + + /** + * Loads the given properties file, falling back to an empty Properties object on failure. + */ + def load(configFile: File): Properties = { + logInfo(s"Loading SSL configuration from $configFile") + try { + val props = new Properties() + val reader = new FileReader(configFile) + try { + props.load(reader) + props.put("sslConfigurationFileLocation", configFile.getAbsolutePath) + props + } finally { + reader.close() + } + } catch { + case ex: Throwable => + logWarning(s"The SSL configuration file ${configFile.getAbsolutePath} " + + s"could not be loaded. The underlying exception was: ${ex.getMessage}") + new Properties + } + } + + /** + * Resolves SSLOptions settings from a given Spark configuration object at a given namespace. + * If SSL settings were loaded from the configuration file, the ``sslConfigurationFileLocation`` + * property is present in the Spark configuration. The parent directory of that location is used + * as a base directory to resolve relative paths to keystore and truststore.
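+   *
+   * For example, ``parse(conf, "ssl")`` reads ``ssl.enabled``, ``ssl.keyStore``,
+   * ``ssl.keyStorePassword``, ``ssl.keyPassword``, ``ssl.trustStore``, ``ssl.trustStorePassword``,
+   * ``ssl.protocol`` and ``ssl.enabledAlgorithms`` from the given configuration.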
+ */ + def parse(conf: SparkConf, ns: String): SSLOptions = { + val configFilePath = conf.getOption("sslConfigurationFileLocation") + + def makeFile(pathString: String): File = { + val path = Paths.get(pathString) + + if (path.isAbsolute || configFilePath.isEmpty) { + path.toFile + } else { + new File(FilenameUtils.concat(new File(configFilePath.get).getParent, pathString)) + } + } + + val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = false) + val keyStore = Try(conf.get(s"$ns.keyStore")).toOption.map(makeFile) + val keyStorePassword = Try(conf.get(s"$ns.keyStorePassword")).toOption + val keyPassword = Try(conf.get(s"$ns.keyPassword")).toOption + val trustStore = Try(conf.get(s"$ns.trustStore")).toOption.map(makeFile) + val trustStorePassword = Try(conf.get(s"$ns.trustStorePassword")).toOption + val protocol = Try(conf.get(s"$ns.protocol")).toOption + val enabledAlgorithms = Try(conf.get(s"$ns.enabledAlgorithms")).toOption + .map(_.trim.dropWhile(_ == '[') + .takeWhile(_ != ']')).map(_.split(",").map(_.trim).toSet) + .getOrElse(Set.empty) + + new SSLOptions(enabled, keyStore, keyStorePassword, keyPassword, trustStore, trustStorePassword, + protocol, enabledAlgorithms) + } + + /** + * Loads the SSL configuration file. If the ``spark.ssl.configFile`` system property is set, + * it is assumed to contain the location of the SSL configuration file to use. + * Otherwise, it uses the location returned by [[SSLOptions.defaultConfigFile]]. + */ + def load(): Properties = { + val file = Option(System.getProperty("spark.ssl.configFile")) + .map(new File(_)) orElse defaultConfigFile + + file.fold { + logWarning("SSL configuration file not found. SSL will be disabled.") + new Properties() + } { file => + logInfo(s"Loading SSL configuration from ${file.getAbsolutePath}") + load(file) + } + } + +} + diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 48c4e515885e..b76dbba08af6 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -18,7 +18,11 @@ package org.apache.spark import java.net.{Authenticator, PasswordAuthentication} +import java.security.KeyStore +import java.security.cert.X509Certificate +import javax.net.ssl._ +import org.apache.commons.io.FileUtils import org.apache.hadoop.io.Text import org.apache.spark.deploy.SparkHadoopUtil @@ -192,6 +196,44 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging { ) } + private[spark] val sslOptions = SSLOptions.parse(ns = "ssl", conf = sparkConf) + + private[spark] val (sslSocketFactory, hostnameVerifier) = if (sslOptions.enabled) { + val trustStoreManagers = + for (trustStore <- sslOptions.trustStore) yield { + val ks = KeyStore.getInstance(KeyStore.getDefaultType) + ks.load(FileUtils.openInputStream(sslOptions.trustStore.get), + sslOptions.trustStorePassword.get.toCharArray) + + val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) + tmf.init(ks) + tmf.getTrustManagers + } + + lazy val credulousTrustStoreManagers = Array({ + logWarning("Using 'accept-all' trust manager for SSL connections.") + new X509TrustManager { + override def getAcceptedIssuers: Array[X509Certificate] = null + + override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String) {} + + override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String) {} + }: TrustManager + }) + + val sslContext =
SSLContext.getInstance(sslOptions.protocol.getOrElse("Default")) + sslContext.init(null, trustStoreManagers getOrElse credulousTrustStoreManagers, null) + + val hostVerifier = new HostnameVerifier { + override def verify(s: String, sslSession: SSLSession): Boolean = true + } + + (Some(sslContext.getSocketFactory), Some(hostVerifier)) + } else { + val sslContext = SSLContext.getDefault + (None, None) + } + /** * Split a comma separated String, filter out any empty items, and return a Set of strings */ diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 605df0e929fa..15099636031d 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -48,6 +48,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { private[spark] val settings = new HashMap[String, String]() if (loadDefaults) { + // Load SSL settings from SSL configuration file + for ((k, v) <- SSLOptions.load().asScala) { + settings(k) = v + } + // Load any spark.* system properties for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) { settings(k) = v diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 72716567ca99..f86759dc9326 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -239,7 +239,7 @@ object SparkEnv extends Logging { val httpFileServer = if (isDriver) { val fileServerPort = conf.getInt("spark.fileserver.port", 0) - val server = new HttpFileServer(securityManager, fileServerPort) + val server = new HttpFileServer(securityManager, fileServerPort, conf) server.initialize() conf.set("spark.fileserver.uri", server.serverUri) server diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 4cd4f4f96fd1..9ffa85953892 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -153,7 +153,8 @@ private[broadcast] object HttpBroadcast extends Logging { private def createServer(conf: SparkConf) { broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf)) val broadcastPort = conf.getInt("spark.broadcast.port", 0) - server = new HttpServer(broadcastDir, securityManager, broadcastPort, "HTTP broadcast server") + server = new HttpServer(broadcastDir, securityManager, + broadcastPort, "HTTP broadcast server", conf) server.start() serverUri = server.uri logInfo("Broadcast server started at " + serverUri) @@ -196,6 +197,7 @@ private[broadcast] object HttpBroadcast extends Logging { logDebug("broadcast not using security") uc = new URL(url).openConnection() } + Utils.setupSecureURLConnection(uc, securityManager) val in = { uc.setReadTimeout(httpReadTimeout) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 065ddda50e65..3b1eb880525c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -39,7 +39,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) val timeout = AkkaUtils.askTimeout(conf) override def preStart() = { - masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master)) + masterActor = 
context.actorSelection(Master.toAkkaUrl(driverArgs.master, conf)) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 32790053a6be..5e438369f280 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -77,7 +77,7 @@ private[spark] class AppClient( def tryRegisterAllMasters() { for (masterUrl <- masterUrls) { logInfo("Connecting to master " + masterUrl + "...") - val actor = context.actorSelection(Master.toAkkaUrl(masterUrl)) + val actor = context.actorSelection(Master.toAkkaUrl(masterUrl, conf)) actor ! RegisterApplication(appDescription) } } @@ -104,17 +104,17 @@ private[spark] class AppClient( def changeMaster(url: String) { activeMasterUrl = url - master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) + master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl, conf)) masterAddress = activeMasterUrl match { case Master.sparkUrlRegex(host, port) => - Address("akka.tcp", Master.systemName, host, port.toInt) + Address(AkkaUtils.protocol(conf), Master.systemName, host, port.toInt) case x => throw new SparkException("Invalid spark URL: " + x) } } private def isPossibleMaster(remoteUrl: Address) = { - masterUrls.map(s => Master.toAkkaUrl(s)) + masterUrls.map(s => Master.toAkkaUrl(s, conf)) .map(u => AddressFromURIString(u).hostPort) .contains(remoteUrl.hostPort) } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 8d99ed442604..b4d181a7d42e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -798,10 +798,10 @@ private[spark] object Master extends Logging { } /** Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:ip`. 
*/ - def toAkkaUrl(sparkUrl: String): String = { + def toAkkaUrl(sparkUrl: String, conf: SparkConf): String = { sparkUrl match { case sparkUrlRegex(host, port) => - "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName) + AkkaUtils.address(systemName, host, port, actorName, conf) case _ => throw new SparkException("Invalid master URL: " + sparkUrl) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index acc83d52ce98..f801978d7a2a 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -77,7 +77,7 @@ private[spark] class Worker( var masterAddress: Address = null var activeMasterUrl: String = "" var activeMasterWebUiUrl : String = "" - val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName) + val akkaUrl = AkkaUtils.address(actorSystemName, host, port, actorName, conf) @volatile var registered = false @volatile var connected = false val workerId = generateWorkerId() @@ -148,10 +148,10 @@ private[spark] class Worker( masterLock.synchronized { activeMasterUrl = url activeMasterWebUiUrl = uiUrl - master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) + master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl, conf)) masterAddress = activeMasterUrl match { case Master.sparkUrlRegex(_host, _port) => - Address("akka.tcp", Master.systemName, _host, _port.toInt) + Address(AkkaUtils.protocol(conf), Master.systemName, _host, _port.toInt) case x => throw new SparkException("Invalid spark URL: " + x) } @@ -162,7 +162,7 @@ private[spark] class Worker( def tryRegisterAllMasters() { for (masterUrl <- masterUrls) { logInfo("Connecting to master " + masterUrl + "...") - val actor = context.actorSelection(Master.toAkkaUrl(masterUrl)) + val actor = context.actorSelection(Master.toAkkaUrl(masterUrl, conf)) actor ! 
RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 04046e2e5d11..0e1a247179a4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -27,7 +27,7 @@ import akka.actor._ import akka.pattern.ask import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} -import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState} +import org.apache.spark._ import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index c1b0da4b99cf..b3c6e0fc9a54 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -38,7 +38,7 @@ private[spark] class SimrSchedulerBackend( override def start() { super.start() - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + val driverUrl = "%s://%s@%s:%s/user/%s".format(AkkaUtils.protocol(sc.conf), SparkEnv.driverActorSystemName, sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port"), diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index c1d5ce0a3607..5eb01c6b7be4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -21,7 +21,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv} import org.apache.spark.deploy.{ApplicationDescription, Command} import org.apache.spark.deploy.client.{AppClient, AppClientListener} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl} -import org.apache.spark.util.Utils +import org.apache.spark.util.{AkkaUtils, Utils} private[spark] class SparkDeploySchedulerBackend( scheduler: TaskSchedulerImpl, @@ -42,11 +42,12 @@ private[spark] class SparkDeploySchedulerBackend( super.start() // The endpoint for executors to talk to us - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + val driverUrl = AkkaUtils.address( SparkEnv.driverActorSystemName, conf.get("spark.driver.host"), conf.get("spark.driver.port"), - CoarseGrainedSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME, + conf) val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}") val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") .map(Utils.splitCommandString).getOrElse(Seq.empty) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 037fea5854ca..f321da133983 ---
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -21,6 +21,8 @@ import java.io.File import java.util.{List => JList} import java.util.Collections +import org.apache.spark.util.AkkaUtils + import scala.collection.JavaConversions._ import scala.collection.mutable.{HashMap, HashSet} @@ -138,11 +140,12 @@ private[spark] class CoarseMesosSchedulerBackend( } val command = CommandInfo.newBuilder() .setEnvironment(environment) - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + val driverUrl = AkkaUtils.address( SparkEnv.driverActorSystemName, conf.get("spark.driver.host"), conf.get("spark.driver.port"), - CoarseGrainedSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME, + conf) val uri = conf.get("spark.executor.uri", null) if (uri == null) { diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index e2d32c859bbd..71bf3b16d288 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -27,7 +27,7 @@ import akka.pattern.ask import com.typesafe.config.ConfigFactory import org.apache.log4j.{Level, Logger} -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException, SSLOptions} /** * Various utility classes for working with Akka. @@ -91,8 +91,10 @@ private[spark] object AkkaUtils extends Logging { val secureCookie = if (isAuthOn) secretKey else "" logDebug("In createActorSystem, requireCookie is: " + requireCookie) - val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback( - ConfigFactory.parseString( + val akkaSslConfig = securityManager.sslOptions.createAkkaConfig.getOrElse(ConfigFactory.empty()) + + val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]) + .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString( s""" |akka.daemonic = on |akka.loggers = [""akka.event.slf4j.Slf4jLogger""] @@ -196,9 +198,22 @@ private[spark] object AkkaUtils extends Logging { val driverHost: String = conf.get("spark.driver.host", "localhost") val driverPort: Int = conf.getInt("spark.driver.port", 7077) Utils.checkHost(driverHost, "Expected hostname") - val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name" + val url = address(driverActorSystemName, driverHost, driverPort, name, conf) val timeout = AkkaUtils.lookupTimeout(conf) logInfo(s"Connecting to $name: $url") Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) } + + def protocol(conf: SparkConf): String = { + if (conf.getBoolean("ssl.enabled", defaultValue = false)) { + "akka.ssl.tcp" + } else { + "akka.tcp" + } + } + + def address(systemName: String, host: String, port: Any, actorName: String, + conf: SparkConf): String = { + s"${protocol(conf)}://$systemName@$host:$port/user/$actorName" + } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 3f0a80b95649..5cb73fcbabd1 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -22,6 +22,7 @@ import java.net._ import java.nio.ByteBuffer import java.util.{Properties, Locale, Random, UUID} import 
java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor} +import javax.net.ssl.HttpsURLConnection import org.apache.log4j.PropertyConfigurator @@ -376,6 +377,7 @@ private[spark] object Utils extends Logging { logDebug("fetchFile not using security") uc = new URL(url).openConnection() } + Utils.setupSecureURLConnection(uc, securityMgr) val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000 uc.setConnectTimeout(timeout) @@ -1516,6 +1518,20 @@ private[spark] object Utils extends Logging { PropertyConfigurator.configure(pro) } + /** + * If the given URL connection is an HttpsURLConnection, sets the SSL socket factory and + * the hostname verifier from the given security manager. + */ + def setupSecureURLConnection(urlConnection: URLConnection, sm: SecurityManager): URLConnection = { + urlConnection match { + case https: HttpsURLConnection => + sm.sslSocketFactory.foreach(https.setSSLSocketFactory) + sm.hostnameVerifier.foreach(https.setHostnameVerifier) + https + case connection => connection + } + } + } /** diff --git a/core/src/test/resources/bad-ssl.conf b/core/src/test/resources/bad-ssl.conf new file mode 100644 index 000000000000..36ec81dff39a --- /dev/null +++ b/core/src/test/resources/bad-ssl.conf @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Spark SSL settings + +ssl.enabled true +ssl.keyStore untrusted-keystore +ssl.keyStorePassword password +ssl.keyPassword password +ssl.trustStore truststore +ssl.trustStorePassword password +ssl.enabledAlgorithms [TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA] +ssl.protocol SSLv3 diff --git a/core/src/test/resources/good-ssl.conf b/core/src/test/resources/good-ssl.conf new file mode 100644 index 000000000000..0f07476251c9 --- /dev/null +++ b/core/src/test/resources/good-ssl.conf @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# + +# Spark SSL settings + +ssl.enabled true +ssl.keyStore keystore +ssl.keyStorePassword password +ssl.keyPassword password +ssl.trustStore truststore +ssl.trustStorePassword password +ssl.enabledAlgorithms [TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA] +ssl.protocol SSLv3 diff --git a/core/src/test/resources/keystore b/core/src/test/resources/keystore new file mode 100644 index 0000000000000000000000000000000000000000..f8310e39ba1e07c7559e5724f522c06ca47f5787 GIT binary patch literal 2247 zcmchYc{J1uAI9f5Gt8i|P1&wBlPn{@Y0#AI+9uMFEKS|9bCtJ0{)TM zX}X`!>67%Z@GWAuh_Ft8Kt!M*6TS&bii+!r0&qYVDgyu_5I7UgZ~dLDFFKbbe4NJ> z8h*OOS9N;OWepcs>OGoE=zuk2V{*wI)Z_e}>Z@^h>_v>&7v=eK%-(UR|F2)6ifSZm zB{$HW8@=O<)XbNH;_wk=FeM$Bjfx=?s>^D9Zr#6WTzln-!DYFW$=EZVD(3K>jS96w z-f`E>K|`eE2uJ#EUlg%5TBw+iWJz5iySD{o5{Ul(C)@&^f0t&B->D-6`JH0%@@;s@ zjMngAG004|Ao+Z!S-#o>U%j;FhYZ-6+Sr%89Pdns-=>tDGRpP7IeZ?->$L-DANQrdWKTa;0 zluk^040v;S@5DL7!>L;M8&vwCZAjLK*vpGqYx~-VYZs+3mb>>RYt{axVJiP>$3Y|} zz*eB_iPWr6Tc->>lU_2dn3A0UZrr!p`Ly9<%d%n&X#yvmACj3LY^dq?e=FaqJ!`fkB(%M!T4bA&bADq%;%h1-L)a{YF#I>*n5; zXt7P-gtHyc-1yawv9;te^CLkar+P2}c`q>)=Cwt62se##oHtS0qXNZJW*YMjnTYe| z1W+lZjRJNxO-J~$3RTEpZS0TIp!w}_nOBXJ>YGGE1!=AA8t_d9vGYswy&Plo@H7BgF?Rr;IN}~I_OZ5AE?Xc>e zduB^$Zl^0#&4F^xR^%KivoAj*R)05F<5T{9#!cLae!D`i$@(i5VdKPs;vD6ow-Pa0 zmfi>Du&L;*jI`*2mxHbvy2C)a@EF^M?KmG1vd1J;hO<00q{Ea0+F@!vpWZUoIem_pk7QGFApU$XWEZyP!S$ zy2#@wW_snjaE*mkb%s`~Ur~W0n&f9pXRDv9^a`mg8=d@ECh_CSHJ#du`ah#SxB9W1 z-pb!}YxjkPKt*mOLDS3IRD~MerwrgiggePgez$MStIAG30Vaz+z znP5Kohc09y*(6S=bZ0;6*@CtY{i164xeX%S_5pQfMC@N!nO0wH%Xtuc`Uu_?pnpJ3cKSbR^Nesz@fkeg8K_ zTJ35i8GW{Bgm&Uy_A)8Ar>TC80v~KUm51AD4|t?pz-*InPU(xFwfe7aK@- z&`sYxAFsV#57|9oUhntf^%Ms6?u7Ik>%Ezi3GO{ccd$K+?Z=*8-c9HP#s zOrXfNhRKwkI)1xzMcTvsXTrA@mGdnR9#$e8BDNj1us(Rjp^I@0JHr=J9h*keYb@4` zEbRm4_$*5QRieRoP)JDy&5{>VVsBkDRg>0s_n`ei%|c)v!>9n^H~H}eLNotr3B^*= z{jjq6yqK``^Vn@D41fS-8lW1evegh3Nw^$*J9JmC0%UznnvdunEUWXJm7jB#`FDt0 zo!SZ!fDlyn__i1(A!8e5JfGHGYq#O59PYAPUd^G9FA4GLhdQU@y3_KivZwp9e#sjZ z1)6E~Py4647bJ-*g&tbSSqXlAo`|VY0-a#1EcqqpmY_MpMl0!c@gNTqd?DPoAC6{F zX#Czw>93?(`H$;NoCL46jICSrt~xR}wNV^~dn<|;)#U(7hP$oGOzs-~AvLGK&g0zM zw^?bP=4NGFJ3;N}QsShyk0XSs?LW@9d~Sxj?AVZ{pOtdaJ7!gP>dq$(AFE$ic(QF_ zMtzQi_+9hZzoOnZ-qFt&6a;ssvcmgs))|WiZVphZ+9n#oo9dzuTg|>3rcC)~XD4Ad ZwPrecI^>>DS9jrYAM6Ze@sCox{su3o&c*-$ literal 0 HcmV?d00001 diff --git a/core/src/test/resources/truststore b/core/src/test/resources/truststore new file mode 100644 index 0000000000000000000000000000000000000000..a6b1d46e1f391995553771665934518114892962 GIT binary patch literal 957 zcmezO_TO6u1_mY|W(3omIr+(nIT`uIB|s66PaJ187+53pObsj<7?`UKnwZNCnwa7j zFf%bSF|h<%`7Sl!W#iOp^Jx3d%gD&h%3zRVC}6bQhKm@;iSrto7#bK@7#SNH7@I@^xuyn&24+w$on6z!sDvCGjI0dIO^o~u z22G4yOihf849&hp=Zrag|9n~Gc#H3$vhv=@pKm=9>UvU(ZZwCjnIky!nB{NDyiMQB zBZO<_+{laCF!_t`)YJQR{_Ib&x!G3fds+PIs-~1XLS?NdeqFL|Ki_tDnb_pb)#>@( zTD|J;>oi2PIZoZR+nId!%3~GY`%wb7osA|uXbd}Tx=!cX%NWDQox39rO%?vMzHf?~ z=1swLR-RoC+D>nNbu4gY&$gL|-rloc_5R@2ZL8S5%Xot9o!zH-pDYd6dHL#Xw%@*+ zy&LU+nmT%>zI)Xlb76*?{++DCc`NfA&lk?~bl&)QW6#lb?w*%6eoUWUYC4NO`N6ZZ z?8-A|%!_eU4)c+*d(ZiH(T7{hnV1hF?DgUa_CL zx_EQkYw>r1g2;ggOjE!>WMr_A)k`{P(!x=_M=);x=K055cF6ji^!W09N=!ueTVn;U z6EA8{^-Nzg`Ng@3($jCUF&@ae8^o zyyq)UdNa4Ognk!%pnCm3|2v5#(^qdW|F_DnC)CMo)7|XjC#6>R9=pT-w&U3U_Jbb| zump?#5z4RO57rKM+MK!exniP|!?*K7M~h9*`B~JdzSntD@o>*t-I;G@<(}y1FS)RE zvl(0Zzf0L`j^5p4uvCF<+hNhw literal 0 HcmV?d00001 diff --git a/core/src/test/resources/untrusted-keystore b/core/src/test/resources/untrusted-keystore new file mode 100644 index 
0000000000000000000000000000000000000000..6015b02caa12817721fca3a83c8e58338b6d9aeb GIT binary patch literal 2246 zcmchYS5VUl7sc~W0)Ye-=~4xxg#Zav38z3~KrxBxU2`W=0g8?MR?KmjE9H~?^gpj7C>T*;?*E%{fv1t#2A zwmzrvHqay1&t+Fi_Z+)w1n4H+Y%AMXJ&rSoCSXIQW|Hr+fhOC{~-z zRlY#ibR-jPTxR#)AT!BEO?ezTUwe>&ocp&K#iX4G4x}&l;Dj))1SC+yocOHPKcyd#4h$$Ysx{PW2wSrMAGjmd{> zmZKc2Dyf?4ujwLhFCx*5Hx~U|Z)4cF@ukC(ds5*mRqTjWZQ!rEzURPjf;fj^#543K+ z!ssm8re)pqzo9BLWxDF++xd2(u`*)aHlZZpG29)-s8jv^L!<=2n_lp*^DBVw#MvPDYjjU59hk-*=5ySF?;vN-zNZGbmhBK9h zT^c^=AHxzXuw5SuT_j1a9=(n&J?~VzvG}qlOYj8&6PtlOG82v{Ls5creeldHDY=n} z6NS98Hi+3%9-T{Nl<}Y`*yL)P*pyGtkkOKUx$aHwg>deSRCRDu3S_Bt*qYqP;^=vW z>Sial$32p`22-|}%Wtf09n6kxR~g&{ z;kf$;?1pyH#legE8IIWJMw1!v9A-Bq$dIyn#d=DTZcOxsMZN<8A29U{XY8FE^^?*t(;Wc9KRLxuFXiP-G z>e>s*`P%O)W#bh}MOrv+{^3tN%a_tNb9D8h>-eF{WYv1B{AL3=1%=#OJh|?KR$7@~ zM1%siVBJLAn~Pz$VnIy*@}LG*a&*qN<%mI`ZQi8>jN6Bzl5(ULw7YAmA5UoEllgUDu` zAewV|((5eVYOO+`<3sL2d)BS_Mok%KAB6q1*Jc7zF$H5nC_yMo`6_U&c?)aPk4b$z zk=~H8I#D$8!fS_4;uOVeZ*ca0M>wc+>X_;202 zht3{zS6QTZ`z}%LI2uqFW?YIRHI*33>s?|kWxkxSkI|mB{+aCe>R$D#3C_eDcm?~4 z04*s~%RJne@gR@YM1zFG)0DX;`R-ZeVAqWJizOR@=|SzH@w)|jt`!(`wOVk zU(Gc+3x6OSUy2;rv#hrK`;koo{3~F+?Lz<_V{4;1#8arvZ&0V8@AqY z%gOfdX610i{YAs|(!9Q+SL{EgzRT#t9JFQIGR2S`6VaKXbv2cJ2H)BOR*n|BK8B>J zu4T{_d4E~Njm&*(N{Lz*K3`qRFrPLYtX(hoRRuEpLT4Gjc z7p!#ttliZeFa!Vrd9r93wA5ijq>e&GphCV$=V%&Tr1x)APN~~@unO~heb4>d#KTM- znh1b2&I@^pv~P&tiq}$Qc2}5{Y1K+f=lfqETn=T3E>iK(`_W8&jcZK}2tO)n$}*DS zW9s~&EeO=mYgrtHq(vr!#h3NAy`7YJ8V0q&`(N)hb2~hoF87PZRGUk~lN{xCT(jsw z;Yp@T1SAT~KqTCb3a(cP)&1DNK4h=JduK2)s(9XTV?-4(76KO@#|t*-6s2Dl=w}?& z`IEOv)DCjYr5eow*?MXh8OBgmy;!t>iQ6;NbaZ0pfS*It8}lB6l1(9$AdKemI=5q_ z{7jO%*zS`ZmPK|=YMq+2r-Np*`16dZu!n^|WkFZrL^nhKO (x, 1)).reduceByKey(_ + _).print() diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala index 687e85ca94d3..8dbe5c6627e1 100644 --- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala +++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala @@ -45,7 +45,7 @@ class ExecutorClassLoader(classUri: String, parent: ClassLoader, // Hadoop FileSystem object for our URI, if it isn't using HTTP var fileSystem: FileSystem = { - if (uri.getScheme() == "http") { + if (Set("http", "https", "ftp").contains(uri.getScheme)) { null } else { FileSystem.get(uri, new Configuration()) @@ -78,13 +78,16 @@ class ExecutorClassLoader(classUri: String, parent: ClassLoader, if (fileSystem != null) { fileSystem.open(new Path(directory, pathInDirectory)) } else { - if (SparkEnv.get.securityManager.isAuthenticationEnabled()) { + val url = if (SparkEnv.get.securityManager.isAuthenticationEnabled()) { val uri = new URI(classUri + "/" + urlEncode(pathInDirectory)) val newuri = Utils.constructURIForAuthentication(uri, SparkEnv.get.securityManager) - newuri.toURL().openStream() + newuri.toURL } else { - new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream() + new URL(classUri + "/" + urlEncode(pathInDirectory)) } + + Utils.setupSecureURLConnection(url.openConnection(), SparkEnv.get.securityManager) + .getInputStream } } val bytes = readAndTransformClass(name, inputStream) diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala index 84b57cd2dc1a..7fba4e5538d5 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkIMain.scala @@ -103,7 +103,7 @@ import org.apache.spark.util.Utils val 
virtualDirectory = new PlainFile(outputDir) // "directory" for classfiles /** Jetty server that will serve our classes to worker nodes */ val classServerPort = conf.getInt("spark.replClassServer.port", 0) - val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerPort, "HTTP class server") + val classServer = new HttpServer(outputDir, new SecurityManager(conf), classServerPort, "HTTP class server", conf) private var currentSettings: Settings = initialSettings var printResults = true // whether to print result lines var totalSilence = false // whether to print anything diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 53a3e6200e34..51601ed6880a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -55,8 +55,8 @@ private[streaming] class ReceiverSupervisorImpl( private val trackerActor = { val ip = env.conf.get("spark.driver.host", "localhost") val port = env.conf.getInt("spark.driver.port", 7077) - val url = "akka.tcp://%s@%s:%s/user/ReceiverTracker".format( - SparkEnv.driverActorSystemName, ip, port) + val url = AkkaUtils.address( + SparkEnv.driverActorSystemName, ip, port, "ReceiverTracker", env.conf) env.actorSystem.actorSelection(url) } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index 155dd88aa2b8..35ed341c13fd 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -210,11 +210,12 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp sparkConf.set("spark.driver.host", driverHost) sparkConf.set("spark.driver.port", driverPort.toString) - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + val driverUrl = AkkaUtils.address( SparkEnv.driverActorSystemName, driverHost, driverPort.toString, - CoarseGrainedSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME, + sparkConf) actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM") } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 568a6ef932bb..6e5483e73d17 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -29,7 +29,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import org.apache.spark.{Logging, SparkConf, SparkEnv} import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend -import org.apache.spark.util.Utils +import org.apache.spark.util.{AkkaUtils, Utils} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.AMRMProtocol @@ -245,11 +245,12 @@ private[yarn] class YarnAllocationHandler( // Deallocate + allocate can result in reusing id's wrongly - so use a different counter // (executorIdCounter) val executorId = executorIdCounter.incrementAndGet().toString - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + 
val driverUrl = AkkaUtils.address( SparkEnv.driverActorSystemName, sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), - CoarseGrainedSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME, + sparkConf) logInfo("launching container on " + containerId + " host " + executorHostname) // Just to be safe, simply remove it from pendingReleaseContainers. diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index e093fe4ae6ff..5d769ccc8649 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -174,11 +174,12 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp sparkConf.set("spark.driver.host", driverHost) sparkConf.set("spark.driver.port", driverPort.toString) - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + val driverUrl = AkkaUtils.address( SparkEnv.driverActorSystemName, driverHost, driverPort.toString, - CoarseGrainedSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME, + sparkConf) actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM") } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 0a461749c819..ffa77d178307 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -29,7 +29,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import org.apache.spark.{Logging, SparkConf, SparkEnv} import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend -import org.apache.spark.util.Utils +import org.apache.spark.util.{AkkaUtils, Utils} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.ApplicationMasterProtocol @@ -262,11 +262,12 @@ private[yarn] class YarnAllocationHandler( numExecutorsRunning.decrementAndGet() } else { val executorId = executorIdCounter.incrementAndGet().toString - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + val driverUrl = AkkaUtils.address( SparkEnv.driverActorSystemName, sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), - CoarseGrainedSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME, + sparkConf) logInfo("Launching container %s for on host %s".format(containerId, executorHostname)) From bc1b6a8ba19af8d87aa6b76dd3e52b46d02a4eed Mon Sep 17 00:00:00 2001 From: Jacek Lewandowski Date: Tue, 21 Oct 2014 12:02:35 +0200 Subject: [PATCH 2/3] SPARK-3883: Fixes after the review - Removed license header from ssl.conf.template - Fixed parameter order in the SSLOptions.parse invocation in SecurityManager at L199 - Replaced org.apache.commons.io usages with alternative approaches - Removed an unused line of code from SecurityManager - Changed the default protocol from SSLv3 to TLSv1.2 (due to the POODLE attack) --- conf/ssl.conf.template | 19 +------------------ .../scala/org/apache/spark/SSLOptions.scala | 19 ++++--------------- .../org/apache/spark/SecurityManager.scala | 7 +++---- core/src/test/resources/bad-ssl.conf | 2 +- core/src/test/resources/good-ssl.conf | 2 +- 5 files changed, 10 insertions(+), 39
deletions(-) diff --git a/conf/ssl.conf.template b/conf/ssl.conf.template index 5b5d32eaa22b..403c18c00a2a 100644 --- a/conf/ssl.conf.template +++ b/conf/ssl.conf.template @@ -1,20 +1,3 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - # Spark SSL settings # ssl.enabled true @@ -24,4 +7,4 @@ # ssl.trustStore /path/to/your/trustStore # ssl.trustStorePassword password # ssl.enabledAlgorithms [TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA] -# ssl.protocol SSLv3 +# ssl.protocol TLSv1.2 diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala index 0fb2ddd74230..1804359766ab 100644 --- a/core/src/main/scala/org/apache/spark/SSLOptions.scala +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -18,11 +18,9 @@ package org.apache.spark import java.io.{FileReader, File} -import java.nio.file.Paths import java.util.Properties import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} -import org.apache.commons.io.FilenameUtils import org.eclipse.jetty.util.ssl.SslContextFactory import scala.util.Try @@ -138,23 +136,14 @@ object SSLOptions extends Logging { * as a base directory to resolve relative paths to keystore and truststore. 
*/ def parse(conf: SparkConf, ns: String): SSLOptions = { - val configFilePath = conf.getOption("sslConfigurationFileLocation") - - def makeFile(pathString: String): File = { - val path = Paths.get(pathString) - - if (path.isAbsolute || configFilePath.isEmpty) { - path.toFile - } else { - new File(FilenameUtils.concat(new File(configFilePath.get).getParent, pathString)) - } - } + val parentDir = conf.getOption("sslConfigurationFileLocation").map(new File(_).getParentFile) + .getOrElse(new File(".")).toPath val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = false) - val keyStore = Try(conf.get(s"$ns.keyStore")).toOption.map(makeFile) + val keyStore = Try(conf.get(s"$ns.keyStore")).toOption.map(parentDir.resolve(_).toFile) val keyStorePassword = Try(conf.get(s"$ns.keyStorePassword")).toOption val keyPassword = Try(conf.get(s"$ns.keyPassword")).toOption - val trustStore = Try(conf.get(s"$ns.trustStore")).toOption.map(makeFile) + val trustStore = Try(conf.get(s"$ns.trustStore")).toOption.map(parentDir.resolve(_).toFile) val trustStorePassword = Try(conf.get(s"$ns.trustStorePassword")).toOption val protocol = Try(conf.get(s"$ns.protocol")).toOption val enabledAlgorithms = Try(conf.get(s"$ns.enabledAlgorithms")).toOption diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index b76dbba08af6..8c4fe1f7b2d5 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -22,7 +22,7 @@ import java.security.KeyStore import java.security.cert.X509Certificate import javax.net.ssl._ -import org.apache.commons.io.FileUtils +import com.google.common.io.Files import org.apache.hadoop.io.Text import org.apache.spark.deploy.SparkHadoopUtil @@ -196,13 +196,13 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging { ) } - private[spark] val sslOptions = SSLOptions.parse(ns = "ssl", conf = sparkConf) + private[spark] val sslOptions = SSLOptions.parse(sparkConf, "ssl") private[spark] val (sslSocketFactory, hostnameVerifier) = if (sslOptions.enabled) { val trustStoreManagers = for (trustStore <- sslOptions.trustStore) yield { val ks = KeyStore.getInstance(KeyStore.getDefaultType) - ks.load(FileUtils.openInputStream(sslOptions.trustStore.get), + ks.load(Files.asByteSource(sslOptions.trustStore.get).openStream(), sslOptions.trustStorePassword.get.toCharArray) val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) @@ -230,7 +230,6 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging { (Some(sslContext.getSocketFactory), Some(hostVerifier)) } else { - val sslContext = SSLContext.getDefault (None, None) } diff --git a/core/src/test/resources/bad-ssl.conf b/core/src/test/resources/bad-ssl.conf index 36ec81dff39a..968f974632a8 100644 --- a/core/src/test/resources/bad-ssl.conf +++ b/core/src/test/resources/bad-ssl.conf @@ -24,4 +24,4 @@ ssl.keyPassword password ssl.trustStore truststore ssl.trustStorePassword password ssl.enabledAlgorithms [TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA] -ssl.protocol SSLv3 +ssl.protocol TLSv1.2 diff --git a/core/src/test/resources/good-ssl.conf b/core/src/test/resources/good-ssl.conf index 0f07476251c9..2e5057620d80 100644 --- a/core/src/test/resources/good-ssl.conf +++ b/core/src/test/resources/good-ssl.conf @@ -24,4 +24,4 @@ ssl.keyPassword password ssl.trustStore truststore ssl.trustStorePassword password ssl.enabledAlgorithms 
[TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA] -ssl.protocol SSLv3 +ssl.protocol TLSv1.2 From f24d854704101d781ab6b15c49caf51c9b4ba9ce Mon Sep 17 00:00:00 2001 From: Jacek Lewandowski Date: Thu, 23 Oct 2014 22:12:31 +0200 Subject: [PATCH 3/3] SPARK-3883: Added ssl.conf.template to .rat-excludes --- .rat-excludes | 1 + 1 file changed, 1 insertion(+) diff --git a/.rat-excludes b/.rat-excludes index eaefef1b0aa2..22b38a335fa6 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -57,3 +57,4 @@ dist/* .*iws logs .*scalastyle-output.xml +ssl.conf.template
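For reference, the AkkaUtils.protocol/AkkaUtils.address pair introduced in this series reduces to the following self-contained Scala sketch. It is an illustration only: the object name SslUrlSketch, the conf/ssl.conf path and the example system/actor names are assumptions, not part of the patches.

import java.io.{File, FileReader}
import java.util.Properties

object SslUrlSketch {
  // Mirrors AkkaUtils.protocol: pick the Akka transport from the ssl.enabled flag.
  def protocol(sslEnabled: Boolean): String =
    if (sslEnabled) "akka.ssl.tcp" else "akka.tcp"

  // Mirrors AkkaUtils.address: compose the URL of a remote actor.
  def address(systemName: String, host: String, port: Int, actorName: String,
      sslEnabled: Boolean): String =
    s"${protocol(sslEnabled)}://$systemName@$host:$port/user/$actorName"

  def main(args: Array[String]): Unit = {
    // Load ssl.conf-style properties, the way SSLOptions.load does.
    val props = new Properties()
    val confFile = new File("conf/ssl.conf") // assumed location
    if (confFile.isFile) {
      val reader = new FileReader(confFile)
      try props.load(reader) finally reader.close()
    }
    val sslEnabled = props.getProperty("ssl.enabled", "false").toBoolean

    // Prints akka.ssl.tcp://sparkMaster@localhost:7077/user/Master when SSL is enabled,
    // and akka.tcp://... otherwise.
    println(address("sparkMaster", "localhost", 7077, "Master", sslEnabled))
  }
}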