158 changes: 143 additions & 15 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -24,6 +24,10 @@ import javax.net.ssl.SSLContext
import scala.collection.JavaConverters._

import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}

import org.apache.spark.network.util.ConfigProvider
import org.apache.spark.network.util.ssl.SSLFactory

import org.eclipse.jetty.util.ssl.SslContextFactory

/**
@@ -34,23 +38,38 @@ import org.eclipse.jetty.util.ssl.SslContextFactory
* by the protocol, which it can generate the configuration for. Since Akka doesn't support client
* authentication with SSL, SSLOptions cannot support it either.
*
* @param enabled enables or disables SSL; if it is set to false, the rest of the
* settings are disregarded
* @param keyStore a path to the key-store file
* @param keyStorePassword a password to access the key-store file
* @param keyPassword a password to access the private key in the key-store
* @param trustStore a path to the trust-store file
* @param trustStorePassword a password to access the trust-store file
* @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
* @param enabledAlgorithms a set of encryption algorithms that may be used
* @param nameSpace the configuration namespace
* @param enabled enables or disables SSL;
* if it is set to false, the rest of the settings are disregarded
* @param keyStore a path to the key-store file
* @param keyStorePassword a password to access the key-store file
* @param privateKey a PKCS#8 private key file in PEM format
* @param keyPassword a password to access the private key in the key-store
* @param certChain an X.509 certificate chain file in PEM format
* @param trustStore a path to the trust-store file
* @param trustStorePassword a password to access the trust-store file
* @param trustStoreReloadingEnabled enables or disables using a trust-store that
*        reloads its configuration when the trust-store file on disk changes
* @param trustStoreReloadInterval the interval, in milliseconds,
*        at which the trust-store will reload its configuration
* @param openSslEnabled enables or disables using an OpenSSL implementation
*        (if available on the host system); requires the certChain and privateKey arguments
* @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
* @param enabledAlgorithms a set of encryption algorithms that may be used
*/
private[spark] case class SSLOptions(
nameSpace: Option[String] = None,
enabled: Boolean = false,
keyStore: Option[File] = None,
keyStorePassword: Option[String] = None,
privateKey: Option[File] = None,
keyPassword: Option[String] = None,
certChain: Option[File] = None,
trustStore: Option[File] = None,
trustStorePassword: Option[String] = None,
trustStoreReloadingEnabled: Boolean = false,
trustStoreReloadInterval: Int = 10000,
openSslEnabled: Boolean = false,
protocol: Option[String] = None,
enabledAlgorithms: Set[String] = Set.empty)
extends Logging {
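For orientation, here is a minimal sketch of constructing the class directly with the new fields. This is an editor's illustration rather than part of the patch; every path, password, and cipher name below is a placeholder.

```scala
import java.io.File

// Hypothetical values; only enabled = true makes the remaining fields relevant.
val opts = SSLOptions(
  nameSpace = Some("spark.ssl.bts"),
  enabled = true,
  keyStore = Some(new File("/path/to/keystore.jks")),
  keyStorePassword = Some("keystore-password"),
  privateKey = Some(new File("/path/to/key.pem")),        // PKCS#8 private key in PEM format
  keyPassword = Some("key-password"),
  certChain = Some(new File("/path/to/cert-chain.pem")),  // X.509 certificate chain in PEM format
  trustStore = Some(new File("/path/to/truststore.jks")),
  trustStorePassword = Some("truststore-password"),
  trustStoreReloadingEnabled = true,
  trustStoreReloadInterval = 10000,
  openSslEnabled = false,
  protocol = Some("TLSv1.2"),
  enabledAlgorithms = Set("TLS_RSA_WITH_AES_128_CBC_SHA"))
```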
@@ -76,6 +95,30 @@ private[spark] case class SSLOptions(
}
}

/**
* Creates an SSLFactory based on the settings represented by this object.
* @return Some(SSLFactory) when SSL is enabled, or None otherwise
*/
def createSSLFactory: Option[SSLFactory] = {
if (enabled) {
Some(new SSLFactory.Builder()
.requestedProtocol(protocol.getOrElse(null))
.requestedCiphers(enabledAlgorithms.toArray)
.keyStore(keyStore.getOrElse(null), keyStorePassword.getOrElse(null))
.privateKey(privateKey.getOrElse(null))
.keyPassword(keyPassword.getOrElse(null))
.certChain(certChain.getOrElse(null))
.trustStore(
trustStore.getOrElse(null),
trustStorePassword.getOrElse(null),
trustStoreReloadingEnabled,
trustStoreReloadInterval)
.build())
} else {
None
}
}

/**
* Creates an Akka configuration object which contains all the SSL settings represented by this
* object. It can then be used to compose the ultimate Akka configuration.
@@ -136,11 +179,69 @@ private[spark] case class SSLOptions(
enabledAlgorithms & providerAlgorithms
}

/**
* Implicit helper class that enables string-interpolation pattern matching
* on configuration keys within a namespace.
* @param sc the StringContext being enriched
*/
implicit private class NsContext (val sc: StringContext) {
object ns {
def apply (args : Any*) : String = sc.s (args : _*)
def unapplySeq (s : String) : Option[Seq[String]] = {
val regexp = sc.parts.mkString ("(.+)").r
regexp.unapplySeq(s)
}
}
}
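To make the `ns` interpolator concrete, here is a small self-contained sketch of the same extractor trick; the demo object and the key being matched are the editor's invention, not part of the patch.

```scala
object NsInterpolatorDemo extends App {
  // Same shape as NsContext above: each `$hole` in an ns"..." pattern becomes a
  // `(.+)` capture group in a regex assembled from the literal parts.
  implicit class DemoContext(val sc: StringContext) {
    object ns {
      def apply(args: Any*): String = sc.s(args: _*)
      def unapplySeq(s: String): Option[Seq[String]] = {
        val regexp = sc.parts.mkString("(.+)").r
        regexp.unapplySeq(s)
      }
    }
  }

  "spark.ssl.bts.trustStoreReloadInterval" match {
    // `$prefix` binds the text captured before ".trustStoreReloadInterval",
    // here "spark.ssl.bts".
    case ns"$prefix.trustStoreReloadInterval" => println(s"namespace prefix: $prefix")
    case other => println(s"no match: $other")
  }
}
```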

/**
* Creates a ConfigProvider that serves the SSL settings held by this object and falls
* back to the given SparkConf for all other configuration keys.
* @param conf the SparkConf used for lookups that this SSLOptions does not cover
* @return a ConfigProvider backed by this SSLOptions instance and the given SparkConf
*/
def createConfigProvider(conf: SparkConf): ConfigProvider = {
val nsp = nameSpace.getOrElse("spark.ssl")
new ConfigProvider() {
override def get(name: String): String = conf.get(name)
override def getBoolean(name: String, defaultValue: Boolean): Boolean = {
name match {
case ns"$nsp.enabled" => enabled
case ns"$nsp.trustStoreReloadingEnabled" => trustStoreReloadingEnabled
case ns"$nsp.openSslEnabled" => openSslEnabled
case _ => conf.getBoolean(name, defaultValue)
}
}

override def getInt(name: String, defaultValue: Int): Int = {
name match {
case ns"$nsp.trustStoreReloadInterval" => trustStoreReloadInterval
case _ => conf.getInt(name, defaultValue)
}
}

override def get(name: String, defaultValue: String): String = {
name match {
case ns"$nsp.keyStore" => keyStore.fold(defaultValue)(_.getAbsolutePath)
case ns"$nsp.keyStorePassword" => keyStorePassword.getOrElse(defaultValue)
case ns"$nsp.privateKey" => privateKey.fold(defaultValue)(_.getAbsolutePath)
case ns"$nsp.keyPassword" => keyPassword.getOrElse(defaultValue)
case ns"$nsp.certChain" => certChain.fold(defaultValue)(_.getAbsolutePath)
case ns"$nsp.trustStore" => trustStore.fold(defaultValue)(_.getAbsolutePath)
case ns"$nsp.trustStorePassword" => trustStorePassword.getOrElse(defaultValue)
case ns"$nsp.protocol" => protocol.getOrElse(defaultValue)
case ns"$nsp.enabledAlgorithms" => enabledAlgorithms.mkString(",")
case _ => conf.get(name, defaultValue)
}
}
}
}
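As a quick usage sketch (an editor's illustration; the `opts` value and the keys queried are assumptions), the returned provider answers the SSL-related keys from this object's fields and falls back to the SparkConf for everything else:

```scala
// Assumes `opts` was parsed for the "spark.ssl.bts" namespace with SSL enabled.
val conf = new SparkConf()
val provider = opts.createConfigProvider(conf)

// Answered from the SSLOptions fields:
val sslEnabled = provider.getBoolean("spark.ssl.bts.enabled", false)
val ciphers    = provider.get("spark.ssl.bts.enabledAlgorithms", "")

// Not an SSL key, so the lookup is delegated to the underlying SparkConf:
val appName = provider.get("spark.app.name", "unknown")
```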

/** Returns a string representation of this SSLOptions with all the passwords masked. */
override def toString: String = s"SSLOptions{enabled=$enabled, " +
s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"
s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
s"keyFile=$privateKey, certChain=$certChain, trustStore=$trustStore, " +
s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
s"openSslEnabled=$openSslEnabled, trustStoreReloadingEnabled=$trustStoreReloadingEnabled, " +
s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"

}

@@ -150,12 +251,21 @@ private[spark] object SSLOptions extends Logging {
*
* The following settings are allowed:
* $ - `[ns].enabled` - `true` or `false`, to enable or disable SSL respectively
* $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
* $ - `[ns].keyStore` - a path to the key-store file; can be relative
* to the current directory
* $ - `[ns].keyStorePassword` - a password to the key-store file
* $ - `[ns].privateKey` - a PKCS#8 private key file in PEM format
* $ - `[ns].keyPassword` - a password to the private key
* $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
* directory
* $ - `[ns].certChain` - an X.509 certificate chain file in PEM format
* $ - `[ns].trustStore` - a path to the trust-store file; can be relative
* to the current directory
* $ - `[ns].trustStorePassword` - a password to the trust-store file
* $ - `[ns].trustStoreReloadingEnabled` - enables or disables using a trust-store
*             that reloads its configuration when the trust-store file on disk changes
* $ - `[ns].trustStoreReloadInterval` - the interval, in milliseconds, at which the
*             trust-store will reload its configuration
* $ - `[ns].openSslEnabled` - enables or disables using an OpenSSL implementation
*             (if available on the host system); requires the certChain and privateKey settings
* $ - `[ns].protocol` - a protocol name supported by a particular Java version
* $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
*
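Tying the list above to code, a hedged sketch (editor's illustration; all values are placeholders) of setting these properties on a SparkConf and parsing them for the default namespace:

```scala
val sparkConf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.keyStore", "/path/to/keystore.jks")
  .set("spark.ssl.keyStorePassword", "keystore-password")
  .set("spark.ssl.privateKey", "/path/to/key.pem")
  .set("spark.ssl.keyPassword", "key-password")
  .set("spark.ssl.certChain", "/path/to/cert-chain.pem")
  .set("spark.ssl.trustStore", "/path/to/truststore.jks")
  .set("spark.ssl.trustStorePassword", "truststore-password")
  .set("spark.ssl.trustStoreReloadingEnabled", "true")
  .set("spark.ssl.trustStoreReloadInterval", "10000")
  .set("spark.ssl.openSslEnabled", "false")
  .set("spark.ssl.protocol", "TLSv1.2")
  .set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA")

// Parse the global namespace; protocol-specific namespaces can then be parsed with
// these options passed as the defaults (see SecurityManager below).
val defaultOpts = SSLOptions.parse(sparkConf, "spark.ssl", None)
```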
@@ -180,15 +290,27 @@ private[spark] object SSLOptions extends Logging {
val keyStorePassword = conf.getOption(s"$ns.keyStorePassword")
.orElse(defaults.flatMap(_.keyStorePassword))

val privateKey = conf.getOption(s"$ns.privateKey").map(new File(_))
.orElse(defaults.flatMap(_.privateKey))

val keyPassword = conf.getOption(s"$ns.keyPassword")
.orElse(defaults.flatMap(_.keyPassword))

val certChain = conf.getOption(s"$ns.certChain").map(new File(_))
.orElse(defaults.flatMap(_.certChain))

val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_))
.orElse(defaults.flatMap(_.trustStore))

val trustStorePassword = conf.getOption(s"$ns.trustStorePassword")
.orElse(defaults.flatMap(_.trustStorePassword))

val trustStoreReloadingEnabled = conf.getBoolean(s"$ns.trustStoreReloadingEnabled", false)

val trustStoreReloadInterval = conf.getInt(s"$ns.trustStoreReloadInterval", 10000)

val openSslEnabled = conf.getBoolean(s"$ns.openSslEnabled", false)

val protocol = conf.getOption(s"$ns.protocol")
.orElse(defaults.flatMap(_.protocol))

@@ -198,12 +320,18 @@
.getOrElse(Set.empty)

new SSLOptions(
Some(ns),
enabled,
keyStore,
keyStorePassword,
privateKey,
keyPassword,
certChain,
trustStore,
trustStorePassword,
trustStoreReloadingEnabled,
trustStoreReloadInterval,
openSslEnabled,
protocol,
enabledAlgorithms)
}
19 changes: 11 additions & 8 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -101,12 +101,12 @@ import org.apache.spark.util.Utils
* over the wire in plaintext.
* Note that SASL is pluggable as to what mechanism it uses. We currently use
* DIGEST-MD5 but this could be changed to use Kerberos or other in the future.
* Spark currently supports "auth" for the quality of protection, which means
* the connection does not support integrity or privacy protection (encryption)
* after authentication. SASL also supports "auth-int" and "auth-conf" which
* SPARK could support in the future to allow the user to specify the quality
* of protection they want. If we support those, the messages will also have to
* be wrapped and unwrapped via the SaslServer/SaslClient.wrap/unwrap API's.
* Spark currently supports "auth" for the quality of protection and, if configured,
* privacy protection (SSL/TLS encryption) for the Netty-based implementation.
* SASL also supports "auth-int" and "auth-conf", which Spark could support
* in the future to allow the user to specify the quality of protection they want.
* If we support those, the messages will also have to be wrapped and unwrapped
* via the SaslServer/SaslClient.wrap/unwrap APIs.
*
* Since the NioBlockTransferService does asynchronous messages passing, the SASL
* authentication is a bit more complex. A ConnectionManager can be both a client
@@ -169,15 +169,16 @@ import org.apache.spark.util.Utils
* configuration for the particular protocol, the properties must be overwritten in the
* protocol-specific namespace. Use `spark.ssl.yyy.xxx` settings to overwrite the global
* configuration for the particular protocol denoted by `yyy`. Currently `yyy` can be `akka` for
* Akka based connections or `fs` for broadcast and file server.
* Akka-based connections, `fs` for broadcast and file server, or `bts` for the Netty-based
* block transfer service.
*
* Refer to [[org.apache.spark.SSLOptions]] documentation for the list of
* options that can be specified.
*
* SecurityManager initializes SSLOptions objects for different protocols separately. SSLOptions
* object parses Spark configuration at a given namespace and builds the common representation
* of SSL settings. SSLOptions is then used to provide protocol-specific configuration like
* TypeSafe configuration for Akka or SSLContextFactory for Jetty.
* TypeSafe configuration for Akka, SSLContextFactory for Jetty, or SSLContext for Netty.
*
* SSL must be configured on each node and configured for each component involved in
* communication using the particular protocol. In YARN clusters, the key-store can be prepared on
@@ -248,9 +249,11 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
// configuration at a specified namespace. The namespace *must* start with spark.ssl.
val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions))
val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions))
val btsSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.bts", Some(defaultSSLOptions))

logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions")
logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions")
logDebug(s"SSLConfiguration for block transfer service (Netty): $btsSSLOptions")

val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
val trustStoreManagers =
@@ -45,8 +45,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
private val useSasl: Boolean = securityManager.isAuthenticationEnabled()

private val transportConf =
SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
private val transportConf = SparkTransportConf.fromSparkConf(
sparkConf, "shuffle", numUsableCores = 0, Some(securityManager.btsSSLOptions))
private val blockHandler = newShuffleBlockHandler(transportConf)
private val transportContext: TransportContext =
new TransportContext(transportConf, blockHandler, true)
@@ -44,7 +44,8 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
// TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
private val serializer = new JavaSerializer(conf)
private val authEnabled = securityManager.isAuthenticationEnabled()
private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numCores)
private val transportConf = SparkTransportConf.fromSparkConf(
conf, "shuffle", numCores, Some(securityManager.btsSSLOptions))

private[this] var transportContext: TransportContext = _
private[this] var server: TransportServer = _
@@ -17,7 +17,7 @@

package org.apache.spark.network.netty

import org.apache.spark.SparkConf
import org.apache.spark.{SSLOptions, SparkConf}
import org.apache.spark.network.util.{TransportConf, ConfigProvider}

/**
@@ -45,8 +45,13 @@ object SparkTransportConf {
* @param numUsableCores if nonzero, this will restrict the server and client threads to only
* use the given number of cores, rather than all of the machine's cores.
* This restriction will only occur if these properties are not already set.
* @param sslOptions optional SSL configuration used to build the transport's ConfigProvider
*/
def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
def fromSparkConf(
_conf: SparkConf,
module: String,
numUsableCores: Int = 0,
sslOptions: Option[SSLOptions] = None): TransportConf = {
val conf = _conf.clone

// Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
@@ -56,9 +61,9 @@ object SparkTransportConf {
conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)

new TransportConf(module, new ConfigProvider {
new TransportConf(module, sslOptions.fold(new ConfigProvider {
override def get(name: String): String = conf.get(name)
})
})(_.createConfigProvider(conf)))
}
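The `fold` above simply chooses between two providers. As an equivalent hedged sketch (editor's illustration; it reuses `conf`, `module`, and `sslOptions` from the enclosing `fromSparkConf`), the same choice written as a pattern match:

```scala
// When no SSLOptions are supplied, fall back to a plain SparkConf-backed provider;
// otherwise let the SSLOptions build a provider that also serves the SSL settings.
val provider: ConfigProvider = sslOptions match {
  case Some(options) => options.createConfigProvider(conf)
  case None => new ConfigProvider {
    override def get(name: String): String = conf.get(name)
  }
}
new TransportConf(module, provider)
```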

/**
@@ -122,7 +122,8 @@ private[spark] class BlockManager(
// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
val transConf = SparkTransportConf.fromSparkConf(
conf, "shuffle", numUsableCores, Some(securityManager.btsSSLOptions))
new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
securityManager.isSaslEncryptionEnabled())
} else {