Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
configsWithAlternatives.get(key).toSeq.flatten.exists { alt => contains(alt.key) }
}

private[spark] def contains(entry: ConfigEntry[_]): Boolean = contains(entry.key)

/** Copy this object */
override def clone: SparkConf = {
val cloned = new SparkConf(false)
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
logInfo("Spark configuration:\n" + _conf.toDebugString)
}

// Set Spark driver host and port system properties
_conf.setIfMissing("spark.driver.host", Utils.localHostName())
// Set Spark driver host and port system properties. This explicitly sets the configuration
// instead of relying on the default value of the config constant.
_conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
_conf.setIfMissing("spark.driver.port", "0")

_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
Expand Down
27 changes: 20 additions & 7 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.netty.NettyBlockTransferService
Expand Down Expand Up @@ -158,14 +159,17 @@ object SparkEnv extends Logging {
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
assert(conf.contains(DRIVER_HOST_ADDRESS),
s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
val hostname = conf.get("spark.driver.host")
val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
val port = conf.get("spark.driver.port").toInt
create(
conf,
SparkContext.DRIVER_IDENTIFIER,
hostname,
bindAddress,
advertiseAddress,
port,
isDriver = true,
isLocal = isLocal,
Expand All @@ -190,6 +194,7 @@ object SparkEnv extends Logging {
conf,
executorId,
hostname,
hostname,
port,
isDriver = false,
isLocal = isLocal,
Expand All @@ -205,7 +210,8 @@ object SparkEnv extends Logging {
private def create(
conf: SparkConf,
executorId: String,
hostname: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
isDriver: Boolean,
isLocal: Boolean,
Expand All @@ -221,8 +227,8 @@ object SparkEnv extends Logging {
val securityManager = new SecurityManager(conf)

val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, hostname, port, conf, securityManager,
clientMode = !isDriver)
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
securityManager, clientMode = !isDriver)

// Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
// In the non-driver case, the RPC env's address may be null since it may not be listening
Expand Down Expand Up @@ -309,8 +315,15 @@ object SparkEnv extends Logging {
UnifiedMemoryManager(conf, numUsableCores)
}

val blockManagerPort = if (isDriver) {
conf.get(DRIVER_BLOCK_MANAGER_PORT)
} else {
conf.get(BLOCK_MANAGER_PORT)
}

val blockTransferService =
new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores)
new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
blockManagerPort, numUsableCores)

val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends Con
findEntry(key) match {
case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
case e: FallbackConfigEntry[_] => get(e.fallback.key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to backport this fix to 2.0.1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this code is only in master.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are facing this issue with Spark 1.6 . Are we going to backport this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What issue? The code you're commenting on does not exist in 1.6. If you're having issues, please ask questions on the mailing lists or use the bug tracker.

case _ => None
}
}
Expand Down
20 changes: 20 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.internal

import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.util.Utils

package object config {

Expand Down Expand Up @@ -143,4 +144,23 @@ package object config {
.internal()
.stringConf
.createWithDefaultString("AES/CTR/NoPadding")

private[spark] val DRIVER_HOST_ADDRESS = ConfigBuilder("spark.driver.host")
.doc("Address of driver endpoints.")
.stringConf
.createWithDefault(Utils.localHostName())
Copy link
Member

@zsxwing zsxwing Sep 20, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a broken change. If a user uses spark.driver.host to specify the bind address, it won't work now. Right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should, as long as he doesn't set "spark.driver.bindAddress" - which is a new setting that is being added to override that behavior.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I miss something. val bindAddress = conf.get(DRIVER_BIND_ADDRESS) won't use spark.driver.host. Right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see what you mean. I might have inverted the config resolution order... let me take a look.


private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress")
.doc("Address where to bind network listen sockets on the driver.")
.fallbackConf(DRIVER_HOST_ADDRESS)

private[spark] val BLOCK_MANAGER_PORT = ConfigBuilder("spark.blockManager.port")
.doc("Port to use for the block manager when a more specific setting is not provided.")
.intConf
.createWithDefault(0)

private[spark] val DRIVER_BLOCK_MANAGER_PORT = ConfigBuilder("spark.driver.blockManager.port")
.doc("Port to use for the block managed on the driver.")
.fallbackConf(BLOCK_MANAGER_PORT)

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ import org.apache.spark.util.Utils
private[spark] class NettyBlockTransferService(
conf: SparkConf,
securityManager: SecurityManager,
bindAddress: String,
override val hostName: String,
_port: Int,
numCores: Int)
extends BlockTransferService {

Expand Down Expand Up @@ -75,12 +77,11 @@ private[spark] class NettyBlockTransferService(
/** Creates and binds the TransportServer, possibly trying multiple ports. */
private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
def startService(port: Int): (TransportServer, Int) = {
val server = transportContext.createServer(hostName, port, bootstraps.asJava)
val server = transportContext.createServer(bindAddress, port, bootstraps.asJava)
(server, server.getPort)
}

val portToTry = conf.getInt("spark.blockManager.port", 0)
Utils.startServiceOnPort(portToTry, startService, conf, getClass.getName)._1
Utils.startServiceOnPort(_port, startService, conf, getClass.getName)._1
}

override def fetchBlocks(
Expand Down
17 changes: 15 additions & 2 deletions core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,19 @@ private[spark] object RpcEnv {
conf: SparkConf,
securityManager: SecurityManager,
clientMode: Boolean = false): RpcEnv = {
val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
create(name, host, host, port, conf, securityManager, clientMode)
}

def create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
clientMode: Boolean): RpcEnv = {
val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
clientMode)
new NettyRpcEnvFactory().create(config)
}
}
Expand Down Expand Up @@ -186,7 +198,8 @@ private[spark] trait RpcEnvFileServer {
private[spark] case class RpcEnvConfig(
conf: SparkConf,
name: String,
host: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
securityManager: SecurityManager,
clientMode: Boolean)
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ private[netty] class NettyRpcEnv(
}
}

def startServer(port: Int): Unit = {
def startServer(bindAddress: String, port: Int): Unit = {
val bootstraps: java.util.List[TransportServerBootstrap] =
if (securityManager.isAuthenticationEnabled()) {
java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
} else {
java.util.Collections.emptyList()
}
server = transportContext.createServer(host, port, bootstraps)
server = transportContext.createServer(bindAddress, port, bootstraps)
dispatcher.registerRpcEndpoint(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
Expand Down Expand Up @@ -441,10 +441,11 @@ private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
val javaSerializerInstance =
new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
val nettyEnv =
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
config.securityManager)
if (!config.clientMode) {
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
nettyEnv.startServer(actualPort)
nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
try {
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/WebUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.json4s.JsonAST.{JNothing, JValue}

import org.apache.spark.{SecurityManager, SparkConf, SSLOptions}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils

Expand All @@ -50,8 +51,8 @@ private[spark] abstract class WebUI(
protected val handlers = ArrayBuffer[ServletContextHandler]()
protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
protected var serverInfo: Option[ServerInfo] = None
protected val localHostName = Utils.localHostNameForURI()
protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
conf.get(DRIVER_HOST_ADDRESS))
private val className = Utils.getFormattedClassName(this)

def getBasePath: String = basePath
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2079,9 +2079,9 @@ private[spark] object Utils extends Logging {
case e: Exception if isBindCollision(e) =>
if (offset >= maxRetries) {
val exceptionMessage = s"${e.getMessage}: Service$serviceString failed after " +
s"$maxRetries retries! Consider explicitly setting the appropriate port for the " +
s"service$serviceString (for example spark.ui.port for SparkUI) to an available " +
"port or increasing spark.port.maxRetries."
s"$maxRetries retries (starting from $startPort)! Consider explicitly setting " +
s"the appropriate port for the service$serviceString (for example spark.ui.port " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since you are touching this, could you add a space between service and $serviceString?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The space is actually in $serviceString because the service name is allowed to be empty. (Historical reasons? Don't ask me.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

s"for SparkUI) to an available port or increasing spark.port.maxRetries."
val exception = new BindException(exceptionMessage)
// restore original stack trace
exception.setStackTrace(e.getStackTrace)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,13 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer)

val securityManager0 = new SecurityManager(conf0)
val exec0 = new NettyBlockTransferService(conf0, securityManager0, "localhost", numCores = 1)
val exec0 = new NettyBlockTransferService(conf0, securityManager0, "localhost", "localhost", 0,
1)
exec0.init(blockManager)

val securityManager1 = new SecurityManager(conf1)
val exec1 = new NettyBlockTransferService(conf1, securityManager1, "localhost", numCores = 1)
val exec1 = new NettyBlockTransferService(conf1, securityManager1, "localhost", "localhost", 0,
1)
exec1.init(blockManager)

val result = fetchBlock(exec0, exec1, "1", blockId) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.mockito.Mockito.mock
import org.scalatest._

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.internal.config._
import org.apache.spark.network.BlockDataManager

class NettyBlockTransferServiceSuite
Expand Down Expand Up @@ -86,10 +87,10 @@ class NettyBlockTransferServiceSuite
private def createService(port: Int): NettyBlockTransferService = {
val conf = new SparkConf()
.set("spark.app.id", s"test-${getClass.getName}")
.set("spark.blockManager.port", port.toString)
val securityManager = new SecurityManager(conf)
val blockDataManager = mock(classOf[BlockDataManager])
val service = new NettyBlockTransferService(conf, securityManager, "localhost", numCores = 1)
val service = new NettyBlockTransferService(conf, securityManager, "localhost", "localhost",
port, 1)
service.init(blockDataManager)
service
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class NettyRpcEnvSuite extends RpcEnvSuite {
name: String,
port: Int,
clientMode: Boolean = false): RpcEnv = {
val config = RpcEnvConfig(conf, "test", "localhost", port, new SecurityManager(conf),
clientMode)
val config = RpcEnvConfig(conf, "test", "localhost", "localhost", port,
new SecurityManager(conf), clientMode)
new NettyRpcEnvFactory().create(config)
}

Expand All @@ -41,4 +41,16 @@ class NettyRpcEnvSuite extends RpcEnvSuite {
assert(e.getCause.getMessage.contains(uri))
}

test("advertise address different from bind address") {
val sparkConf = new SparkConf()
val config = RpcEnvConfig(sparkConf, "test", "localhost", "example.com", 0,
new SecurityManager(sparkConf), false)
val env = new NettyRpcEnvFactory().create(config)
try {
assert(env.address.hostPort.startsWith("example.com:"))
} finally {
env.shutdown()
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
conf.set("spark.testing.memory", maxMem.toString)
conf.set("spark.memory.offHeap.size", maxMem.toString)
val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
val memManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(serializer, conf)
val store = new BlockManager(name, rpcEnv, master, serializerManager, conf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.memory.offHeap.size", maxMem.toString)
val serializer = new KryoSerializer(conf)
val transfer = transferService
.getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1))
.getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1))
val memManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(serializer, conf)
val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf,
Expand Down Expand Up @@ -854,7 +854,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
conf.set("spark.testing.memory", "1200")
val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
Expand Down
23 changes: 22 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1067,11 +1067,32 @@ Apart from these, the following properties are also available, and may be useful
Port for all block managers to listen on. These exist on both the driver and the executors.
</td>
</tr>
<tr>
<td><code>spark.driver.blockManager.port</code></td>
<td>(value of spark.blockManager.port)</td>
<td>
Driver-specific port for the block manager to listen on, for cases where it cannot use the same
configuration as executors.
</td>
</tr>
<tr>
<td><code>spark.driver.bindAddress</code></td>
<td>(value of spark.driver.host)</td>
<td>
<p>Hostname or IP address where to bind listening sockets. This config overrides the SPARK_LOCAL_IP
environment variable (see below).</p>

<p>It also allows a different address from the local one to be advertised to executors or external systems.
This is useful, for example, when running containers with bridged networking. For this to properly work,
the different ports used by the driver (RPC, block manager and UI) need to be forwarded from the
container's host.</p>
</td>
</tr>
<tr>
<td><code>spark.driver.host</code></td>
<td>(local hostname)</td>
<td>
Hostname or IP address for the driver to listen on.
Hostname or IP address for the driver.
This is used for communicating with the executors and the standalone Master.
</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.TaskState
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils


Expand Down Expand Up @@ -424,7 +425,7 @@ trait MesosSchedulerUtils extends Logging {
}
}

val managedPortNames = List("spark.executor.port", "spark.blockManager.port")
val managedPortNames = List("spark.executor.port", BLOCK_MANAGER_PORT.key)

/**
* The values of the non-zero ports to be used by the executor process.
Expand Down
Loading