Commit e990325

Merge branch 'master' into stratified

2 parents: 555a3f9 + 28dbae8

9 files changed: 54 additions, 44 deletions

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 3 additions & 2 deletions

@@ -136,7 +136,7 @@ private[spark] class Worker(
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-    webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
+    webUi = new WorkerWebUI(this, workDir, webUiPort)
     webUi.bind()
     registerWithMaster()

@@ -373,7 +373,8 @@ private[spark] class Worker(
 private[spark] object Worker extends Logging {
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)
-    val args = new WorkerArguments(argStrings)
+    val conf = new SparkConf
+    val args = new WorkerArguments(argStrings, conf)
     val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
       args.memory, args.masters, args.workDir)
     actorSystem.awaitTermination()

core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala

Lines changed: 5 additions & 1 deletion

@@ -20,11 +20,12 @@ package org.apache.spark.deploy.worker
 import java.lang.management.ManagementFactory

 import org.apache.spark.util.{IntParam, MemoryParam, Utils}
+import org.apache.spark.SparkConf

 /**
  * Command-line parser for the worker.
  */
-private[spark] class WorkerArguments(args: Array[String]) {
+private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
   var host = Utils.localHostName()
   var port = 0
   var webUiPort = 8081

@@ -46,6 +47,9 @@ private[spark] class WorkerArguments(args: Array[String]) {
   if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
     webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
   }
+  if (conf.contains("spark.worker.ui.port")) {
+    webUiPort = conf.get("spark.worker.ui.port").toInt
+  }
   if (System.getenv("SPARK_WORKER_DIR") != null) {
     workDir = System.getenv("SPARK_WORKER_DIR")
   }
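Taken together with the Worker.scala change above (main now builds a SparkConf and threads it into WorkerArguments), the worker UI port can come from three places. A minimal sketch of the effective precedence, mirroring the parser's lookup order (the 9090 value is illustrative):

```scala
import org.apache.spark.SparkConf

// Precedence: built-in default, then SPARK_WORKER_WEBUI_PORT, then
// spark.worker.ui.port. The conf key is checked last, so it wins.
val conf = new SparkConf().set("spark.worker.ui.port", "9090")
var webUiPort = 8081  // built-in default from WorkerArguments
if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
  webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
}
if (conf.contains("spark.worker.ui.port")) {
  webUiPort = conf.get("spark.worker.ui.port").toInt
}
assert(webUiPort == 9090)
```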

core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala

Lines changed: 2 additions & 7 deletions

@@ -34,8 +34,8 @@ private[spark]
 class WorkerWebUI(
     val worker: Worker,
     val workDir: File,
-    port: Option[Int] = None)
-  extends WebUI(worker.securityMgr, getUIPort(port, worker.conf), worker.conf, name = "WorkerUI")
+    requestedPort: Int)
+  extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
   with Logging {

   val timeout = AkkaUtils.askTimeout(worker.conf)

@@ -55,10 +55,5 @@ class WorkerWebUI(
 }

 private[spark] object WorkerWebUI {
-  val DEFAULT_PORT = 8081
   val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
-
-  def getUIPort(requestedPort: Option[Int], conf: SparkConf): Int = {
-    requestedPort.getOrElse(conf.getInt("spark.worker.ui.port", WorkerWebUI.DEFAULT_PORT))
-  }
 }
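The design choice: with DEFAULT_PORT and getUIPort gone, port resolution happens exactly once, in WorkerArguments, and the UI layer just binds whatever Int it is handed. A toy sketch of that simplified shape (UISketch is a stand-in for illustration, not a Spark class):

```scala
// Stand-in: the real WorkerWebUI extends WebUI and binds via its web server.
class UISketch(requestedPort: Int) {
  def bind(): Unit = println(s"worker UI would bind on port $requestedPort")
}

new UISketch(8081).bind()  // the caller supplies the already-resolved port
```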

core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala

Lines changed: 1 addition & 2 deletions

@@ -83,8 +83,7 @@ class UnionRDD[T: ClassTag](

   override def compute(s: Partition, context: TaskContext): Iterator[T] = {
     val part = s.asInstanceOf[UnionPartition[T]]
-    val parentRdd = dependencies(part.parentRddIndex).rdd.asInstanceOf[RDD[T]]
-    parentRdd.iterator(part.parentPartition, context)
+    parent[T](part.parentRddIndex).iterator(part.parentPartition, context)
   }

   override def getPreferredLocations(s: Partition): Seq[String] =
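For context, `parent[T](i)` is a thin, typed accessor on RDD over its `dependencies` sequence, so this is a pure refactor. Its approximate shape, inferred from the inline version removed above (a sketch, not the verbatim Spark source):

```scala
import scala.reflect.ClassTag

// Member of RDD[T] (shown out of context): fetch the j-th parent RDD with the
// expected element type, replacing the manual dependencies(...).rdd cast.
protected[spark] def parent[U: ClassTag](j: Int): RDD[U] =
  dependencies(j).rdd.asInstanceOf[RDD[U]]
```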

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 14 additions & 16 deletions

@@ -47,19 +47,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
-  var totalExpectedExecutors = new AtomicInteger(0)
+  var totalRegisteredExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered executors / total expected executors)
+  // Submit tasks only after (registered resources / total expected resources)
   // is equal to at least this value, that is double between 0 and 1.
-  var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
-  if (minRegisteredRatio > 1) minRegisteredRatio = 1
-  // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
+  var minRegisteredRatio =
+    math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
+  // Submit tasks after maxRegisteredWaitingTime milliseconds
+  // if minRegisteredRatio has not yet been reached
   val maxRegisteredWaitingTime =
-    conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
+    conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
   val createTime = System.currentTimeMillis()
-  var ready = if (minRegisteredRatio <= 0) true else false

   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
     private val executorActor = new HashMap[String, ActorRef]

@@ -94,12 +94,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         executorAddress(executorId) = sender.path.address
         addressToExecutorId(sender.path.address) = executorId
         totalCoreCount.addAndGet(cores)
-        if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) {
-          ready = true
-          logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " +
-            executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() +
-            ", minRegisteredExecutorsRatio: " + minRegisteredRatio)
-        }
+        totalRegisteredExecutors.addAndGet(1)
         makeOffers()
       }

@@ -268,14 +263,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     }
   }

+  def sufficientResourcesRegistered(): Boolean = true
+
   override def isReady(): Boolean = {
-    if (ready) {
+    if (sufficientResourcesRegistered) {
+      logInfo("SchedulerBackend is ready for scheduling beginning after " +
+        s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
       return true
     }
     if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
-      ready = true
       logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
-        "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
+        s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTime(ms)")
       return true
     }
     false
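The new readiness protocol, reduced to a self-contained sketch (names mirror the diff; the logging and actor machinery are omitted, and the numbers are illustrative):

```scala
import java.util.concurrent.atomic.AtomicInteger

// Minimal model of the new check: a backend is ready once its resource
// criterion holds, or once the waiting-time budget is exhausted.
class BackendSketch(expected: Int, ratio: Double, maxWaitMs: Long) {
  val registered = new AtomicInteger(0)       // bumped on each executor registration
  private val createTime = System.currentTimeMillis()

  // In the real base class this returns true; subclasses override it with a
  // cluster-manager-specific criterion, as the later files in this commit do.
  def sufficientResourcesRegistered(): Boolean =
    registered.get() >= expected * ratio

  def isReady(): Boolean =
    sufficientResourcesRegistered() ||
      (System.currentTimeMillis() - createTime) >= maxWaitMs
}

// With 10 expected executors and a 0.8 ratio, 8 registrations suffice,
// and the 30 s timeout is the fallback if they never arrive.
val b = new BackendSketch(expected = 10, ratio = 0.8, maxWaitMs = 30000)
(1 to 8).foreach(_ => b.registered.incrementAndGet())
assert(b.isReady())
```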

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 5 additions & 1 deletion

@@ -36,6 +36,7 @@ private[spark] class SparkDeploySchedulerBackend(
   var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _

   val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
+  val totalExpectedCores = maxCores.getOrElse(0)

   override def start() {
     super.start()

@@ -97,7 +98,6 @@ private[spark] class SparkDeploySchedulerBackend(

   override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
     memory: Int) {
-    totalExpectedExecutors.addAndGet(1)
     logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
       fullId, hostPort, cores, Utils.megabytesToString(memory)))
   }

@@ -110,4 +110,8 @@ private[spark] class SparkDeploySchedulerBackend(
     logInfo("Executor %s removed: %s".format(fullId, message))
     removeExecutor(fullId.split("/")(1), reason.toString)
   }
+
+  override def sufficientResourcesRegistered(): Boolean = {
+    totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
+  }
 }
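In standalone mode the registered "resources" are CPU cores. A worked example under assumed settings (spark.cores.max = 8, ratio = 0.75): scheduling may begin once 6 cores have registered. Note that when spark.cores.max is unset, totalExpectedCores is 0 and the criterion is trivially satisfied.

```scala
// Illustrative numbers only: 8 expected cores at a 0.75 ratio need 6 registered.
val totalExpectedCores = 8     // from spark.cores.max (assumed set)
val minRegisteredRatio = 0.75  // from spark.scheduler.minRegisteredResourcesRatio
val registeredCores = 6        // what totalCoreCount would hold so far
assert(registeredCores >= totalExpectedCores * minRegisteredRatio)  // 6 >= 6.0
```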

docs/configuration.md

Lines changed: 7 additions & 6 deletions

@@ -825,21 +825,22 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 </tr>
-  <td><code>spark.scheduler.minRegisteredExecutorsRatio</code></td>
+  <td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
   <td>0</td>
   <td>
-    The minimum ratio of registered executors (registered executors / total expected executors)
+    The minimum ratio of registered resources (registered resources / total expected resources)
+    (resources are executors in yarn mode, CPU cores in standalone mode)
     to wait for before scheduling begins. Specified as a double between 0 and 1.
-    Regardless of whether the minimum ratio of executors has been reached,
+    Regardless of whether the minimum ratio of resources has been reached,
     the maximum amount of time it will wait before scheduling begins is controlled by config
-    <code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code>
+    <code>spark.scheduler.maxRegisteredResourcesWaitingTime</code>
   </td>
 </tr>
 <tr>
-  <td><code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code></td>
+  <td><code>spark.scheduler.maxRegisteredResourcesWaitingTime</code></td>
   <td>30000</td>
   <td>
-    Maximum amount of time to wait for executors to register before scheduling begins
+    Maximum amount of time to wait for resources to register before scheduling begins
     (in milliseconds).
   </td>
 </tr>
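The renamed keys are set like any other Spark property; a sketch of typical usage (values illustrative):

```scala
import org.apache.spark.SparkConf

// Wait for 80% of expected resources before scheduling, but cap the wait at 60 s.
val conf = new SparkConf()
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "60000")
```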

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

Lines changed: 6 additions & 3 deletions

@@ -30,15 +30,15 @@ private[spark] class YarnClientSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with Logging {

-  if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+  if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
     minRegisteredRatio = 0.8
-    ready = false
   }

   var client: Client = null
   var appId: ApplicationId = null
   var checkerThread: Thread = null
   var stopping: Boolean = false
+  var totalExpectedExecutors = 0

   private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
       arrayBuf: ArrayBuffer[String]) {

@@ -84,7 +84,7 @@ private[spark] class YarnClientSchedulerBackend(

     logDebug("ClientArguments called with: " + argsArrayBuf)
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
-    totalExpectedExecutors.set(args.numExecutors)
+    totalExpectedExecutors = args.numExecutors
     client = new Client(args, conf)
     appId = client.runApp()
     waitForApp()

@@ -150,4 +150,7 @@ private[spark] class YarnClientSchedulerBackend(
     logInfo("Stopped")
   }

+  override def sufficientResourcesRegistered(): Boolean = {
+    totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
+  }
 }
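Easy to miss in the diff: the 0.8 here applies only when the user has left the key unset, so an explicit setting (including 0) is respected. A sketch of the effective fallback both YARN backends implement:

```scala
import org.apache.spark.SparkConf

// Sketch: bump the ratio to 0.8 only when the key is absent; otherwise keep
// the user's value, clamped to 1 as in the base class.
def effectiveMinRegisteredRatio(conf: SparkConf): Double =
  conf.getOption("spark.scheduler.minRegisteredResourcesRatio")
    .map(v => math.min(1, v.toDouble))
    .getOrElse(0.8)
```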

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala

Lines changed: 11 additions & 6 deletions

@@ -27,19 +27,24 @@ private[spark] class YarnClusterSchedulerBackend(
     sc: SparkContext)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {

-  if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+  var totalExpectedExecutors = 0
+
+  if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
     minRegisteredRatio = 0.8
-    ready = false
   }

   override def start() {
     super.start()
-    var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+    totalExpectedExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
     if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
-      numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors)
+      totalExpectedExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES"))
+        .getOrElse(totalExpectedExecutors)
     }
     // System property can override environment variable.
-    numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
-    totalExpectedExecutors.set(numExecutors)
+    totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
+  }
+
+  override def sufficientResourcesRegistered(): Boolean = {
+    totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 }
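The start() rewrite keeps the same three-way precedence as before, just writing into a plain var instead of an AtomicInteger. A sketch of that chain (the default of 2 for DEFAULT_NUMBER_EXECUTORS is an assumption; check ApplicationMasterArguments in your tree):

```scala
import org.apache.spark.SparkConf

// Precedence: built-in default < SPARK_EXECUTOR_INSTANCES env var <
// spark.executor.instances conf entry (checked last, so it wins).
def expectedExecutors(conf: SparkConf, default: Int = 2): Int = {
  val fromEnv = Option(System.getenv("SPARK_EXECUTOR_INSTANCES"))
    .flatMap(s => scala.util.Try(s.toInt).toOption)
    .getOrElse(default)
  conf.getInt("spark.executor.instances", fromEnv)
}
```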
