17 changes: 4 additions & 13 deletions core/pom.xml
@@ -185,19 +185,6 @@
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
<artifactId>akka-remote_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
<artifactId>akka-slf4j_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
@@ -224,6 +211,10 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</dependency>
<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -86,8 +86,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
* is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
*
* Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
* workaround for the issue, which is ultimately caused by the way the BlockManager actors
* issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
* workaround for the issue, which is ultimately caused by the way the BlockManager endpoints
* issue inter-dependent blocking RPC messages to each other at high frequencies. This happens,
* for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
* longer in scope.
*/
@@ -101,7 +101,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
* exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this
* parameter by default disables blocking on shuffle cleanups. Note that this does not affect
* the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround,
* until the real Akka issue (referred to in the comment above `blockOnCleanupTasks`) is
* until the real RPC issue (referred to in the comment above `blockOnCleanupTasks`) is
* resolved.
*/
private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
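For readers of the reworded comments above: both cleaner flags are plain boolean entries in SparkConf. A minimal, REPL-style sketch of setting them explicitly at the defaults the comments describe (blocking on RDD/broadcast cleanup defaults to true per the SPARK-3015 workaround; blocking on shuffle cleanup defaults to false per the SPARK-3139 workaround):

```scala
import org.apache.spark.SparkConf

// Sketch only: the two cleaner flags discussed in the updated comments,
// spelled out at their documented defaults.
val conf = new SparkConf()
  .set("spark.cleaner.referenceTracking.blocking", "true")          // default: true (SPARK-3015)
  .set("spark.cleaner.referenceTracking.blocking.shuffle", "false") // default: false (SPARK-3139)
```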
91 changes: 0 additions & 91 deletions core/src/main/scala/org/apache/spark/HttpFileServer.scala

This file was deleted.

7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -40,17 +40,18 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
private[spark] class MapOutputTrackerMasterEndpoint(
override val rpcEnv: RpcEnv, tracker: MapOutputTrackerMaster, conf: SparkConf)
extends RpcEndpoint with Logging {
val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case GetMapOutputStatuses(shuffleId: Int) =>
val hostPort = context.senderAddress.hostPort
logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
val serializedSize = mapOutputStatuses.length
if (serializedSize > maxAkkaFrameSize) {
if (serializedSize > maxRpcMessageSize) {

val msg = s"Map output statuses were $serializedSize bytes which " +
s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes)."
s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes)."

/* For SPARK-1244 we'll opt for just logging an error and then sending it to the sender.
* A bigger refactoring (SPARK-1239) will ultimately remove this entire code path. */
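The renamed limit keeps the old shape: a per-message cap, now read from `spark.rpc.message.maxSize` rather than `spark.akka.frameSize`. A hedged, REPL-style sketch of the guard above, assuming the setting is denominated in MB (128 by default); `toBytes` and `checkedReply` are illustrative stand-ins, not Spark's `RpcUtils` implementation:

```scala
import org.apache.spark.SparkConf

// Illustrative only: convert the MB-denominated setting to bytes, then
// refuse payloads that exceed it, mirroring the check in receiveAndReply.
def toBytes(maxSizeInMB: Int): Int = maxSizeInMB * 1024 * 1024

val conf = new SparkConf().set("spark.rpc.message.maxSize", "256")
val maxRpcMessageSize = toBytes(conf.getInt("spark.rpc.message.maxSize", 128))

def checkedReply(payload: Array[Byte]): Either[String, Array[Byte]] =
  if (payload.length > maxRpcMessageSize) {
    Left(s"Map output statuses were ${payload.length} bytes which " +
      s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes).")
  } else {
    Right(payload)
  }
```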
43 changes: 1 addition & 42 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -21,18 +21,14 @@ import java.io.File
import java.security.NoSuchAlgorithmException
import javax.net.ssl.SSLContext

import scala.collection.JavaConverters._

import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.eclipse.jetty.util.ssl.SslContextFactory

/**
* SSLOptions class is a common container for SSL configuration options. It offers methods to
* generate specific objects to configure SSL for different communication protocols.
*
* SSLOptions is intended to provide the maximum common set of SSL settings, which are supported
* by the protocol, which it can generate the configuration for. Since Akka doesn't support client
* authentication with SSL, SSLOptions cannot support it either.
* by the protocol, which it can generate the configuration for.
*
* @param enabled enables or disables SSL; if it is set to false, the rest of the
* settings are disregarded
@@ -88,43 +84,6 @@ private[spark] case class SSLOptions(
}
}

/**
* Creates an Akka configuration object which contains all the SSL settings represented by this
* object. It can be used then to compose the ultimate Akka configuration.
*/
def createAkkaConfig: Option[Config] = {
if (enabled) {
if (keyStoreType.isDefined) {
logWarning("Akka configuration does not support key store type.");
}
if (trustStoreType.isDefined) {
logWarning("Akka configuration does not support trust store type.");
}

Some(ConfigFactory.empty()
.withValue("akka.remote.netty.tcp.security.key-store",
ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
.withValue("akka.remote.netty.tcp.security.key-store-password",
ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.trust-store",
ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse("")))
.withValue("akka.remote.netty.tcp.security.trust-store-password",
ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.key-password",
ConfigValueFactory.fromAnyRef(keyPassword.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.random-number-generator",
ConfigValueFactory.fromAnyRef(""))
.withValue("akka.remote.netty.tcp.security.protocol",
ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
.withValue("akka.remote.netty.tcp.security.enabled-algorithms",
ConfigValueFactory.fromIterable(supportedAlgorithms.asJava))
.withValue("akka.remote.netty.tcp.enable-ssl",
ConfigValueFactory.fromAnyRef(true)))
} else {
None
}
}

/*
* The supportedAlgorithms set is a subset of the enabledAlgorithms that
* are supported by the current Java security provider for this protocol.
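With `createAkkaConfig` removed, Jetty is the remaining consumer of these options. A hedged, REPL-style sketch of that generation step, assuming a trimmed-down holder type (`SslSettings` is hypothetical; the real `SSLOptions` carries more fields, such as store types and enabled algorithms, and targets the Jetty version Spark ships):

```scala
import java.io.File
import org.eclipse.jetty.util.ssl.SslContextFactory

// Hypothetical, simplified stand-in for SSLOptions.
case class SslSettings(
    enabled: Boolean,
    keyStore: Option[File],
    keyStorePassword: Option[String],
    keyPassword: Option[String],
    protocol: Option[String])

// Sketch of a Jetty factory builder: produce a factory only when SSL is
// enabled, and set only the values that were actually provided.
def createJettyFactory(s: SslSettings): Option[SslContextFactory] =
  if (!s.enabled) {
    None
  } else {
    val factory = new SslContextFactory()
    s.keyStore.foreach(f => factory.setKeyStorePath(f.getAbsolutePath))
    s.keyStorePassword.foreach(factory.setKeyStorePassword)
    s.keyPassword.foreach(factory.setKeyManagerPassword)
    s.protocol.foreach(factory.setProtocol)
    Some(factory)
  }
```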
19 changes: 4 additions & 15 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -67,17 +67,6 @@ import org.apache.spark.util.Utils
* At this point spark has multiple communication protocols that need to be secured and
* different underlying mechanisms are used depending on the protocol:
*
* - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
* Akka remoting allows you to specify a secure cookie that will be exchanged
* and ensured to be identical in the connection handshake between the client
* and the server. If they are not identical then the client will be refused
* to connect to the server. There is no control of the underlying
* authentication mechanism so its not clear if the password is passed in
* plaintext or uses DIGEST-MD5 or some other mechanism.
*
* Akka also has an option to turn on SSL, this option is currently supported (see
* the details below).
*
* - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
* for the HttpServer. Jetty supports multiple authentication mechanisms -
* Basic, Digest, Form, Spengo, etc. It also supports multiple different login
@@ -168,16 +157,16 @@ import org.apache.spark.util.Utils
* denote the global configuration for all the supported protocols. In order to override the global
* configuration for the particular protocol, the properties must be overwritten in the
* protocol-specific namespace. Use `spark.ssl.yyy.xxx` settings to overwrite the global
* configuration for particular protocol denoted by `yyy`. Currently `yyy` can be either `akka` for
* Akka based connections or `fs` for broadcast and file server.
* configuration for particular protocol denoted by `yyy`. Currently `yyy` can be only `fs` for
* broadcast and file server.
*
* Refer to [[org.apache.spark.SSLOptions]] documentation for the list of
* options that can be specified.
*
* SecurityManager initializes SSLOptions objects for different protocols separately. SSLOptions
* object parses Spark configuration at a given namespace and builds the common representation
* of SSL settings. SSLOptions is then used to provide protocol-specific configuration like
* TypeSafe configuration for Akka or SSLContextFactory for Jetty.
* of SSL settings. SSLOptions is then used to provide protocol-specific SSLContextFactory for
* Jetty.
*
* SSL must be configured on each node and configured for each component involved in
* communication using the particular protocol. In YARN clusters, the key-store can be prepared on
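The namespace rule described above can be made concrete: a global `spark.ssl.xxx` value applies to every protocol unless a `spark.ssl.fs.xxx` value shadows it. A REPL-style sketch; the `resolve` helper is illustrative, not Spark's actual `SSLOptions.parse`:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.protocol", "TLSv1.2")    // global default for all protocols
  .set("spark.ssl.fs.protocol", "TLSv1.1") // override for the `fs` protocol only

// Prefer the protocol-specific key; fall back to the global one.
def resolve(conf: SparkConf, ns: String, key: String): Option[String] =
  conf.getOption(s"spark.ssl.$ns.$key").orElse(conf.getOption(s"spark.ssl.$key"))

assert(resolve(conf, "fs", "protocol").contains("TLSv1.1"))
```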
32 changes: 11 additions & 21 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -344,17 +344,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
.map{case (k, v) => (k.substring(prefix.length), v)}
}

/** Get all akka conf variables set on this SparkConf */
def getAkkaConf: Seq[(String, String)] =
/* This is currently undocumented. If we want to make this public we should consider
* nesting options under the spark namespace to avoid conflicts with user akka options.
* Otherwise users configuring their own akka code via system properties could mess up
* spark's akka options.
*
* E.g. spark.akka.option.x.y.x = "value"
*/
getAll.filter { case (k, _) => isAkkaConf(k) }

/**
* Returns the Spark application id, valid in the Driver after TaskScheduler registration and
* from the start in the Executor.
@@ -600,7 +589,9 @@ private[spark] object SparkConf extends Logging {
"spark.yarn.max.executor.failures" -> Seq(
AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
"spark.memory.offHeap.enabled" -> Seq(
AlternateConfig("spark.unsafe.offHeap", "1.6"))
AlternateConfig("spark.unsafe.offHeap", "1.6")),
"spark.rpc.message.maxSize" -> Seq(
AlternateConfig("spark.akka.frameSize", "1.6"))
)

/**
@@ -615,21 +606,13 @@
}.toMap
}

/**
* Return whether the given config is an akka config (e.g. akka.actor.provider).
* Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
*/
def isAkkaConf(name: String): Boolean = name.startsWith("akka.")

/**
* Return whether the given config should be passed to an executor on start-up.
*
* Certain akka and authentication configs are required from the executor when it connects to
* Certain authentication configs are required from the executor when it connects to
* the scheduler, while the rest of the spark configs can be inherited from the driver later.
*/
def isExecutorStartupConf(name: String): Boolean = {
isAkkaConf(name) ||
name.startsWith("spark.akka") ||
(name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
name.startsWith("spark.ssl") ||
name.startsWith("spark.rpc") ||
@@ -664,12 +647,19 @@ private[spark] object SparkConf extends Logging {
logWarning(
s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
s"may be removed in the future. ${cfg.deprecationMessage}")
return
}

allAlternatives.get(key).foreach { case (newKey, cfg) =>
logWarning(
s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
s"and may be removed in the future. Please use the new key '$newKey' instead.")
return
}
if (key.startsWith("spark.akka") || key.startsWith("spark.ssl.akka")) {
logWarning(
s"The configuration key $key is not supported any more " +
s"because Spark doesn't use Akka since 2.0")
}
}
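Taken together, the new `configsWithAlternatives` entry and the warning above give `spark.akka.frameSize` a soft landing: the old key still resolves, but to `spark.rpc.message.maxSize`, and with a deprecation notice. A REPL-style sketch of the reverse lookup (simplified; the real `SparkConf` also tracks the deprecating version and a message per alternate):

```scala
// Illustrative model of the alternate-key tables used above.
val configsWithAlternatives: Map[String, Seq[String]] = Map(
  "spark.rpc.message.maxSize" -> Seq("spark.akka.frameSize"))

// Reverse index, mirroring SparkConf.allAlternatives: old key -> new key.
val allAlternatives: Map[String, String] = configsWithAlternatives.flatMap {
  case (newKey, oldKeys) => oldKeys.map(oldKey => (oldKey, newKey))
}

// A deprecated key is read and written as its replacement.
def translate(key: String): String = allAlternatives.getOrElse(key, key)

assert(translate("spark.akka.frameSize") == "spark.rpc.message.maxSize")
```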
