
Commit 829cd7b

jerryshao authored and Marcelo Vanzin committed
[SPARK-20605][CORE][YARN][MESOS] Deprecate not used AM and executor port configuration
## What changes were proposed in this pull request?

After SPARK-10997, the client-mode Netty RpcEnv no longer needs to start a server, so these port configurations are no longer used. This change deprecates and removes the use of the two configurations "spark.executor.port" and "spark.yarn.am.port".

## How was this patch tested?

Existing UTs.

Author: jerryshao <[email protected]>

Closes #17866 from jerryshao/SPARK-20605.
1 parent aeb2ecc commit 829cd7b

File tree

10 files changed (+22, −57 lines)
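As the description above notes, the two keys are now only registered as deprecated in SparkConf. The following is a minimal sketch (not part of the patch, values arbitrary) of what that means for user code:

    import org.apache.spark.SparkConf

    object DeprecatedPortKeysExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        // Setting either key still succeeds, but SparkConf recognizes the key as deprecated
        // and logs a deprecation warning; nothing in Spark reads the value any more.
        conf.set("spark.executor.port", "16000")
        conf.set("spark.yarn.am.port", "10000")
      }
    }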


core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 3 additions & 1 deletion

@@ -579,7 +579,9 @@ private[spark] object SparkConf extends Logging {
         "are no longer accepted. To specify the equivalent now, one may use '64k'."),
       DeprecatedConfig("spark.rpc", "2.0", "Not used any more."),
       DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
-        "Please use the new blacklisting options, spark.blacklist.*")
+        "Please use the new blacklisting options, spark.blacklist.*"),
+      DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"),
+      DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more")
     )
 
     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 4 additions & 10 deletions

@@ -177,7 +177,7 @@ object SparkEnv extends Logging {
       SparkContext.DRIVER_IDENTIFIER,
       bindAddress,
       advertiseAddress,
-      port,
+      Option(port),
       isLocal,
       numCores,
       ioEncryptionKey,
@@ -194,7 +194,6 @@ object SparkEnv extends Logging {
       conf: SparkConf,
       executorId: String,
       hostname: String,
-      port: Int,
       numCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
       isLocal: Boolean): SparkEnv = {
@@ -203,7 +202,7 @@ object SparkEnv extends Logging {
       executorId,
       hostname,
       hostname,
-      port,
+      None,
       isLocal,
       numCores,
       ioEncryptionKey
@@ -220,7 +219,7 @@ object SparkEnv extends Logging {
       executorId: String,
       bindAddress: String,
       advertiseAddress: String,
-      port: Int,
+      port: Option[Int],
       isLocal: Boolean,
       numUsableCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
@@ -243,17 +242,12 @@ object SparkEnv extends Logging {
     }
 
     val systemName = if (isDriver) driverSystemName else executorSystemName
-    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
+    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
       securityManager, clientMode = !isDriver)
 
     // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
-    // In the non-driver case, the RPC env's address may be null since it may not be listening
-    // for incoming connections.
     if (isDriver) {
       conf.set("spark.driver.port", rpcEnv.address.port.toString)
-    } else if (rpcEnv.address != null) {
-      conf.set("spark.executor.port", rpcEnv.address.port.toString)
-      logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
     }
 
     // Create an instance of the class with the given name, possibly initializing it with our conf
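The net effect of the SparkEnv change is that only the driver ever asks RpcEnv to bind a port; executors create a client-mode RpcEnv with no port at all. Below is a short sketch of that convention, with placeholder values for the pieces (systemName, bindAddress, advertiseAddress, conf, securityManager) normally supplied inside SparkEnv.create:

    // Sketch only: mirrors the diff above, not a drop-in replacement for SparkEnv.create.
    val isDriver = false                                         // an executor in this example
    val port: Option[Int] = if (isDriver) Some(7078) else None   // 7078 is an assumed driver port
    // With clientMode = true the RpcEnv never starts a server, so the -1 passed when no
    // port is configured is never used to bind anything.
    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress,
      port.getOrElse(-1), conf, securityManager, clientMode = !isDriver)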

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 2 additions & 3 deletions

@@ -191,11 +191,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
       // Bootstrap to fetch the driver's Spark properties.
       val executorConf = new SparkConf
-      val port = executorConf.getInt("spark.executor.port", 0)
       val fetcher = RpcEnv.create(
         "driverPropsFetcher",
         hostname,
-        port,
+        -1,
         executorConf,
         new SecurityManager(executorConf),
         clientMode = true)
@@ -221,7 +220,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       }
 
       val env = SparkEnv.createExecutorEnv(
-        driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
+        driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
 
       env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
         env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))

docs/running-on-mesos.md

Lines changed: 1 addition & 1 deletion

@@ -209,7 +209,7 @@ provide such guarantees on the offer stream.
 
 In this mode spark executors will honor port allocation if such is
 provided from the user. Specifically if the user defines
-`spark.executor.port` or `spark.blockManager.port` in Spark configuration,
+`spark.blockManager.port` in Spark configuration,
 the mesos scheduler will check the available offers for a valid port
 range containing the port numbers. If no such range is available it will
 not launch any task. If no restriction is imposed on port numbers by the
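With `spark.executor.port` gone, the only port the Mesos scheduler still reserves from an offer is the block manager port. A minimal, assumed application configuration illustrating this:

    import org.apache.spark.SparkConf

    // Hypothetical application conf: the Mesos scheduler will only launch the task if an
    // offer contains a port range covering 4000; spark.executor.port no longer participates.
    val conf = new SparkConf()
      .setAppName("port-reservation-example")     // name is illustrative only
      .set("spark.blockManager.port", "4000")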

docs/running-on-yarn.md

Lines changed: 0 additions & 7 deletions

@@ -239,13 +239,6 @@ To use a custom metrics.properties for the application master and executors, upd
   Same as <code>spark.yarn.driver.memoryOverhead</code>, but for the YARN Application Master in client mode.
   </td>
 </tr>
-<tr>
-  <td><code>spark.yarn.am.port</code></td>
-  <td>(random)</td>
-  <td>
-  Port for the YARN Application Master to listen on. In YARN client mode, this is used to communicate between the Spark driver running on a gateway and the YARN Application Master running on YARN. In YARN cluster mode, this is used for the dynamic executor feature, where it handles the kill from the scheduler backend.
-  </td>
-</tr>
 <tr>
   <td><code>spark.yarn.queue</code></td>
   <td><code>default</code></td>

resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala

Lines changed: 1 addition & 2 deletions

@@ -74,9 +74,8 @@ private[spark] class MesosExecutorBackend
     val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
       Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
     val conf = new SparkConf(loadDefaults = true).setAll(properties)
-    val port = conf.getInt("spark.executor.port", 0)
     val env = SparkEnv.createExecutorEnv(
-      conf, executorId, slaveInfo.getHostname, port, cpusPerTask, None, isLocal = false)
+      conf, executorId, slaveInfo.getHostname, cpusPerTask, None, isLocal = false)
 
     executor = new Executor(
       executorId,

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala

Lines changed: 1 addition & 1 deletion

@@ -438,7 +438,7 @@ trait MesosSchedulerUtils extends Logging {
     }
   }
 
-  val managedPortNames = List("spark.executor.port", BLOCK_MANAGER_PORT.key)
+  val managedPortNames = List(BLOCK_MANAGER_PORT.key)
 
   /**
   * The values of the non-zero ports to be used by the executor process.
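For context, the managed port names are matched against the user's configuration before offers are partitioned. The following is a hypothetical helper, for illustration only (not the actual Spark method), showing that lookup:

    import org.apache.spark.SparkConf

    // Illustration only: collect the non-zero values of the managed port keys so they can
    // be checked against a Mesos offer's port ranges. After this change the list contains
    // just the block manager port key.
    def nonZeroManagedPortValues(conf: SparkConf, managedPortNames: List[String]): List[Long] =
      managedPortNames
        .flatMap(conf.getOption)   // only ports the user actually configured
        .map(_.toLong)
        .filter(_ != 0L)           // 0 means "pick any port", so there is nothing to reserve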

resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala

Lines changed: 9 additions & 25 deletions

@@ -179,40 +179,25 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
 
   test("Port reservation is done correctly with user specified ports only") {
     val conf = new SparkConf()
-    conf.set("spark.executor.port", "3000" )
     conf.set(BLOCK_MANAGER_PORT, 4000)
     val portResource = createTestPortResource((3000, 5000), Some("my_role"))
 
     val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(3000, 4000), List(portResource))
-    resourcesToBeUsed.length shouldBe 2
+      .partitionPortResources(List(4000), List(portResource))
+    resourcesToBeUsed.length shouldBe 1
 
     val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray
 
-    portsToUse.length shouldBe 2
-    arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true
+    portsToUse.length shouldBe 1
+    arePortsEqual(portsToUse, Array(4000L)) shouldBe true
 
     val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
 
-    val expectedUSed = Array((3000L, 3000L), (4000L, 4000L))
+    val expectedUSed = Array((4000L, 4000L))
 
     arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true
   }
 
-  test("Port reservation is done correctly with some user specified ports (spark.executor.port)") {
-    val conf = new SparkConf()
-    conf.set("spark.executor.port", "3100" )
-    val portResource = createTestPortResource((3000, 5000), Some("my_role"))
-
-    val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(3100), List(portResource))
-
-    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-
-    portsToUse.length shouldBe 1
-    portsToUse.contains(3100) shouldBe true
-  }
-
   test("Port reservation is done correctly with all random ports") {
     val conf = new SparkConf()
     val portResource = createTestPortResource((3000L, 5000L), Some("my_role"))
@@ -226,21 +211,20 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
 
   test("Port reservation is done correctly with user specified ports only - multiple ranges") {
     val conf = new SparkConf()
-    conf.set("spark.executor.port", "2100" )
     conf.set("spark.blockManager.port", "4000")
     val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
       createTestPortResource((2000, 2500), Some("other_role")))
     val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(2100, 4000), portResourceList)
+      .partitionPortResources(List(4000), portResourceList)
     val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
 
-    portsToUse.length shouldBe 2
+    portsToUse.length shouldBe 1
     val portsRangesLeft = rangesResourcesToTuple(resourcesLeft)
     val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
 
-    val expectedUsed = Array((2100L, 2100L), (4000L, 4000L))
+    val expectedUsed = Array((4000L, 4000L))
 
-    arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true
+    arePortsEqual(portsToUse.toArray, Array(4000L)) shouldBe true
     arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
   }
 

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 1 addition & 2 deletions

@@ -429,8 +429,7 @@ private[spark] class ApplicationMaster(
   }
 
   private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
-    val port = sparkConf.get(AM_PORT)
-    rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr,
+    rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, -1, sparkConf, securityMgr,
       clientMode = true)
     val driverRef = waitForSparkDriver()
     addAmIpFilter()

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala

Lines changed: 0 additions & 5 deletions

@@ -40,11 +40,6 @@ package object config {
     .timeConf(TimeUnit.MILLISECONDS)
     .createOptional
 
-  private[spark] val AM_PORT =
-    ConfigBuilder("spark.yarn.am.port")
-      .intConf
-      .createWithDefault(0)
-
   private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
     ConfigBuilder("spark.yarn.executor.failuresValidityInterval")
       .doc("Interval after which Executor failures will be considered independent and not " +
