Skip to content

Commit 715ef9c

Browse files
committed
Rename: scheduledLocations -> scheduledExecutors; locations -> executors
1 parent 05daf9c commit 715ef9c

File tree

4 files changed

+82
-83
lines changed

4 files changed

+82
-83
lines changed

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -42,55 +42,54 @@ private[streaming] class ReceiverSchedulingPolicy {
4242
}
4343

4444
val hostToExecutors = executors.groupBy(_.split(":")(0))
45-
val locations = new Array[mutable.ArrayBuffer[String]](receivers.length)
45+
val scheduledExecutors = Array.fill(receivers.length)(new mutable.ArrayBuffer[String])
4646
val numReceiversOnExecutor = mutable.HashMap[String, Int]()
4747
// Set the initial value to 0
4848
executors.foreach(numReceiversOnExecutor(_) = 0)
4949

5050
// Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation",
51-
// we need to make sure the "preferredLocation" is in the candidate location list.
51+
// we need to make sure the "preferredLocation" is in the candidate scheduled executor list.
5252
for (i <- 0 until receivers.length) {
53-
locations(i) = new mutable.ArrayBuffer[String]()
5453
// Note: preferredLocation is host but executors are host:port
5554
receivers(i).preferredLocation.foreach { host =>
5655
hostToExecutors.get(host) match {
5756
case Some(executorsOnHost) =>
5857
// preferredLocation is a known host. Select an executor that has the least receivers in
5958
// this host
60-
val scheduledLocation =
59+
val leastScheduledExecutor =
6160
executorsOnHost.minBy(executor => numReceiversOnExecutor(executor))
62-
locations(i) += scheduledLocation
63-
numReceiversOnExecutor(scheduledLocation) =
64-
numReceiversOnExecutor(scheduledLocation) + 1
61+
scheduledExecutors(i) += leastScheduledExecutor
62+
numReceiversOnExecutor(leastScheduledExecutor) =
63+
numReceiversOnExecutor(leastScheduledExecutor) + 1
6564
case None =>
6665
// preferredLocation is an unknown host.
6766
// Note: There are two cases:
6867
// 1. This executor is not up. But it may be up later.
6968
// 2. This executor is dead, or it's not a host in the cluster.
70-
// Currently, simply add host to the scheduled locations
71-
locations(i) += host
69+
// Currently, simply add host to the scheduled executors.
70+
scheduledExecutors(i) += host
7271
}
7372
}
7473
}
7574

7675
// For those receivers that don't have preferredLocation, make sure we assign at least one
7776
// executor to them.
78-
for (scheduledLocations <- locations.filter(_.isEmpty)) {
77+
for (scheduledExecutorsForOneReceiver <- scheduledExecutors.filter(_.isEmpty)) {
7978
// Select the executor that has the least receivers
80-
val (executor, numReceivers) = numReceiversOnExecutor.minBy(_._2)
81-
scheduledLocations += executor
82-
numReceiversOnExecutor(executor) = numReceivers + 1
79+
val (leastScheduledExecutor, numReceivers) = numReceiversOnExecutor.minBy(_._2)
80+
scheduledExecutorsForOneReceiver += leastScheduledExecutor
81+
numReceiversOnExecutor(leastScheduledExecutor) = numReceivers + 1
8382
}
8483

8584
// Assign idle executors to receivers that have less executors
8685
val idleExecutors = numReceiversOnExecutor.filter(_._2 == 0).map(_._1)
8786
for (executor <- idleExecutors) {
88-
// Assign an idle executor to the receiver that has least locations.
89-
val scheduledLocations = locations.minBy(_.size)
90-
scheduledLocations += executor
87+
// Assign an idle executor to the receiver that has least candidate executors.
88+
val leastScheduledExecutors = scheduledExecutors.minBy(_.size)
89+
leastScheduledExecutors += executor
9190
}
9291

93-
receivers.map(_.streamId).zip(locations).toMap
92+
receivers.map(_.streamId).zip(scheduledExecutors).toMap
9493
}
9594

9695
/**
@@ -131,31 +130,31 @@ private[streaming] class ReceiverSchedulingPolicy {
131130
}
132131

133132
// Always try to schedule to the preferred locations
134-
val locations = mutable.Set[String]()
135-
locations ++= preferredLocation
133+
val scheduledExecutors = mutable.Set[String]()
134+
scheduledExecutors ++= preferredLocation
136135

137136
val executorWeights = receiverTrackingInfoMap.values.flatMap { receiverTrackingInfo =>
138137
receiverTrackingInfo.state match {
139138
case ReceiverState.INACTIVE => Nil
140139
case ReceiverState.SCHEDULED =>
141-
val scheduledLocations = receiverTrackingInfo.scheduledLocations.get
140+
val scheduledExecutors = receiverTrackingInfo.scheduledExecutors.get
142141
// The probability that a scheduled receiver will run in an executor is
143142
// 1.0 / scheduledLocations.size
144-
scheduledLocations.map(location => location -> (1.0 / scheduledLocations.size))
145-
case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningLocation.get -> 1.0)
143+
scheduledExecutors.map(location => location -> (1.0 / scheduledExecutors.size))
144+
case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0)
146145
}
147146
}.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor
148147

149148
val idleExecutors = (executors.toSet -- executorWeights.keys).toSeq
150149
if (idleExecutors.size >= 3) {
151150
// If there are more than 3 idle executors, return all of them
152-
locations ++= idleExecutors
151+
scheduledExecutors ++= idleExecutors
153152
} else {
154153
// If there are less than 3 idle executors, return 3 best options
155-
locations ++= idleExecutors
154+
scheduledExecutors ++= idleExecutors
156155
val sortedExecutors = executorWeights.toSeq.sortBy(_._2).map(_._1)
157-
locations ++= (idleExecutors ++ sortedExecutors).take(3)
156+
scheduledExecutors ++= (idleExecutors ++ sortedExecutors).take(3)
158157
}
159-
locations.toSeq
158+
scheduledExecutors.toSeq
160159
}
161160
}

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -249,8 +249,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
249249
val receiverTrackingInfo = ReceiverTrackingInfo(
250250
streamId,
251251
ReceiverState.ACTIVE,
252-
scheduledLocations = None,
253-
runningLocation = Some(hostPort),
252+
scheduledExecutors = None,
253+
runningExecutor = Some(hostPort),
254254
name = Some(name),
255255
endpoint = Some(receiverEndpoint))
256256
receiverTrackingInfos.put(streamId, receiverTrackingInfo)
@@ -321,23 +321,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
321321

322322
private def scheduleReceiver(receiverId: Int): Seq[String] = {
323323
val preferredLocation = receiverPreferredLocations.getOrElse(receiverId, None)
324-
val scheduledLocations = schedulingPolicy.rescheduleReceiver(
324+
val scheduledExecutors = schedulingPolicy.rescheduleReceiver(
325325
receiverId, preferredLocation, receiverTrackingInfos, getExecutors)
326-
updateReceiverScheduledLocations(receiverId, scheduledLocations)
327-
scheduledLocations
326+
updateReceiverScheduledExecutors(receiverId, scheduledExecutors)
327+
scheduledExecutors
328328
}
329329

330-
private def updateReceiverScheduledLocations(
331-
receiverId: Int, scheduledLocations: Seq[String]): Unit = {
330+
private def updateReceiverScheduledExecutors(
331+
receiverId: Int, scheduledExecutors: Seq[String]): Unit = {
332332
val newReceiverTrackingInfo = receiverTrackingInfos.get(receiverId) match {
333333
case Some(oldInfo) =>
334334
oldInfo.copy(state = ReceiverState.SCHEDULED,
335-
scheduledLocations = Some(scheduledLocations))
335+
scheduledExecutors = Some(scheduledExecutors))
336336
case None =>
337337
ReceiverTrackingInfo(
338338
receiverId,
339339
ReceiverState.SCHEDULED,
340-
Some(scheduledLocations),
340+
Some(scheduledExecutors),
341341
None)
342342
}
343343
receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo)
@@ -412,21 +412,21 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
412412
override def receive: PartialFunction[Any, Unit] = {
413413
// Local messages
414414
case StartAllReceivers(receivers) =>
415-
val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
415+
val scheduledExecutors = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
416416
for (receiver <- receivers) {
417-
val locations = scheduledLocations(receiver.streamId)
418-
updateReceiverScheduledLocations(receiver.streamId, locations)
417+
val executors = scheduledExecutors(receiver.streamId)
418+
updateReceiverScheduledExecutors(receiver.streamId, executors)
419419
receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
420-
startReceiver(receiver, locations)
420+
startReceiver(receiver, executors)
421421
}
422422
case RestartReceiver(receiver) =>
423-
val scheduledLocations = schedulingPolicy.rescheduleReceiver(
423+
val scheduledExecutors = schedulingPolicy.rescheduleReceiver(
424424
receiver.streamId,
425425
receiver.preferredLocation,
426426
receiverTrackingInfos,
427427
getExecutors)
428-
updateReceiverScheduledLocations(receiver.streamId, scheduledLocations)
429-
startReceiver(receiver, scheduledLocations)
428+
updateReceiverScheduledExecutors(receiver.streamId, scheduledExecutors)
429+
startReceiver(receiver, scheduledExecutors)
430430
case c: CleanupOldBlocks =>
431431
receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
432432
case UpdateReceiverRateLimit(streamUID, newRate) =>
@@ -459,9 +459,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
459459
}
460460

461461
/**
462-
* Start a receiver along with its scheduled locations
462+
* Start a receiver along with its scheduled executors
463463
*/
464-
private def startReceiver(receiver: Receiver[_], scheduledLocations: Seq[String]): Unit = {
464+
private def startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]): Unit = {
465465
val receiverId = receiver.streamId
466466
if (!isTrackerStarted) {
467467
onReceiverJobFinish(receiverId)
@@ -475,12 +475,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
475475
// Function to start the receiver on the worker node
476476
val startReceiverFunc = new StartReceiverFunc(checkpointDirOption, serializableHadoopConf)
477477

478-
// Create the RDD using the scheduledLocations to run the receiver in a Spark job
478+
// Create the RDD using the scheduledExecutors to run the receiver in a Spark job
479479
val receiverRDD: RDD[Receiver[_]] =
480-
if (scheduledLocations.isEmpty) {
480+
if (scheduledExecutors.isEmpty) {
481481
ssc.sc.makeRDD(Seq(receiver), 1)
482482
} else {
483-
ssc.sc.makeRDD(Seq(receiver -> scheduledLocations))
483+
ssc.sc.makeRDD(Seq(receiver -> scheduledExecutors))
484484
}
485485
receiverRDD.setName(s"Receiver $receiverId")
486486
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,17 @@ private[streaming] case class ReceiverErrorInfo(
2828
*
2929
* @param receiverId the unique receiver id
3030
* @param state the current Receiver state
31-
* @param scheduledLocations the scheduled locations provided by ReceiverSchedulingPolicy
32-
* @param runningLocation the running location if the receiver is active
31+
* @param scheduledExecutors the scheduled executors provided by ReceiverSchedulingPolicy
32+
* @param runningExecutor the running executor if the receiver is active
3333
* @param name the receiver name
3434
* @param endpoint the receiver endpoint. It can be used to send messages to the receiver
3535
* @param errorInfo the receiver error information if it fails
3636
*/
3737
private[streaming] case class ReceiverTrackingInfo(
3838
receiverId: Int,
3939
state: ReceiverState,
40-
scheduledLocations: Option[Seq[String]],
41-
runningLocation: Option[String],
40+
scheduledExecutors: Option[Seq[String]],
41+
runningExecutor: Option[String],
4242
name: Option[String] = None,
4343
endpoint: Option[RpcEndpointRef] = None,
4444
errorInfo: Option[ReceiverErrorInfo] = None) {
@@ -47,7 +47,7 @@ private[streaming] case class ReceiverTrackingInfo(
4747
receiverId,
4848
name.getOrElse(""),
4949
state == ReceiverState.ACTIVE,
50-
location = runningLocation.getOrElse(""),
50+
location = runningExecutor.getOrElse(""),
5151
lastErrorMessage = errorInfo.map(_.lastErrorMessage).getOrElse(""),
5252
lastError = errorInfo.map(_.lastError).getOrElse(""),
5353
lastErrorTime = errorInfo.map(_.lastErrorTime).getOrElse(-1L)

streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,27 +28,27 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
2828
val receiverSchedulingPolicy = new ReceiverSchedulingPolicy
2929

3030
test("rescheduleReceiver: empty executors") {
31-
val scheduledLocations =
31+
val scheduledExecutors =
3232
receiverSchedulingPolicy.rescheduleReceiver(0, None, Map.empty, executors = Seq.empty)
33-
assert(scheduledLocations === Seq.empty)
33+
assert(scheduledExecutors === Seq.empty)
3434
}
3535

3636
test("rescheduleReceiver: receiver preferredLocation") {
3737
val receiverTrackingInfoMap = Map(
3838
0 -> ReceiverTrackingInfo(0, ReceiverState.INACTIVE, None, None))
39-
val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
39+
val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
4040
0, Some("host1"), receiverTrackingInfoMap, executors = Seq("host2"))
41-
assert(scheduledLocations.toSet === Set("host1", "host2"))
41+
assert(scheduledExecutors.toSet === Set("host1", "host2"))
4242
}
4343

4444
test("rescheduleReceiver: return all idle executors if more than 3 idle executors") {
4545
val executors = Seq("host1", "host2", "host3", "host4", "host5")
4646
// host3 is idle
4747
val receiverTrackingInfoMap = Map(
4848
0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")))
49-
val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
49+
val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
5050
1, None, receiverTrackingInfoMap, executors)
51-
assert(scheduledLocations.toSet === Set("host2", "host3", "host4", "host5"))
51+
assert(scheduledExecutors.toSet === Set("host2", "host3", "host4", "host5"))
5252
}
5353

5454
test("rescheduleReceiver: return 3 best options if less than 3 idle executors") {
@@ -59,21 +59,21 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
5959
0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")),
6060
1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2", "host3")), None),
6161
2 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None))
62-
val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
62+
val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
6363
3, None, receiverTrackingInfoMap, executors)
64-
assert(scheduledLocations.toSet === Set("host2", "host4", "host5"))
64+
assert(scheduledExecutors.toSet === Set("host2", "host4", "host5"))
6565
}
6666

6767
test("scheduleReceivers: " +
6868
"schedule receivers evenly when there are more receivers than executors") {
6969
val receivers = (0 until 6).map(new DummyReceiver(_))
7070
val executors = (10000 until 10003).map(port => s"localhost:${port}")
71-
val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
71+
val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
7272
val numReceiversOnExecutor = mutable.HashMap[String, Int]()
73-
// There should be 2 receivers running on each executor and each receiver has one location
74-
scheduledLocations.foreach { case (receiverId, locations) =>
75-
assert(locations.size == 1)
76-
numReceiversOnExecutor(locations(0)) = numReceiversOnExecutor.getOrElse(locations(0), 0) + 1
73+
// There should be 2 receivers running on each executor and each receiver has one executor
74+
scheduledExecutors.foreach { case (receiverId, executors) =>
75+
assert(executors.size == 1)
76+
numReceiversOnExecutor(executors(0)) = numReceiversOnExecutor.getOrElse(executors(0), 0) + 1
7777
}
7878
assert(numReceiversOnExecutor === executors.map(_ -> 2).toMap)
7979
}
@@ -83,12 +83,12 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
8383
"schedule receivers evenly when there are more executors than receivers") {
8484
val receivers = (0 until 3).map(new DummyReceiver(_))
8585
val executors = (10000 until 10006).map(port => s"localhost:${port}")
86-
val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
86+
val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
8787
val numReceiversOnExecutor = mutable.HashMap[String, Int]()
88-
// There should be 1 receiver running on each executor and each receiver has two locations
89-
scheduledLocations.foreach { case (receiverId, locations) =>
90-
assert(locations.size == 2)
91-
locations.foreach { l =>
88+
// There should be 1 receiver running on each executor and each receiver has two executors
89+
scheduledExecutors.foreach { case (receiverId, executors) =>
90+
assert(executors.size == 2)
91+
executors.foreach { l =>
9292
numReceiversOnExecutor(l) = numReceiversOnExecutor.getOrElse(l, 0) + 1
9393
}
9494
}
@@ -100,33 +100,33 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
100100
(3 until 6).map(new DummyReceiver(_, Some("localhost")))
101101
val executors = (10000 until 10003).map(port => s"localhost:${port}") ++
102102
(10003 until 10006).map(port => s"localhost2:${port}")
103-
val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
103+
val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
104104
val numReceiversOnExecutor = mutable.HashMap[String, Int]()
105-
// There should be 1 receiver running on each executor and each receiver has 1 location
106-
scheduledLocations.foreach { case (receiverId, locations) =>
107-
assert(locations.size == 1)
108-
locations.foreach { l =>
105+
// There should be 1 receiver running on each executor and each receiver has 1 executor
106+
scheduledExecutors.foreach { case (receiverId, executors) =>
107+
assert(executors.size == 1)
108+
executors.foreach { l =>
109109
numReceiversOnExecutor(l) = numReceiversOnExecutor.getOrElse(l, 0) + 1
110110
}
111111
}
112112
assert(numReceiversOnExecutor === executors.map(_ -> 1).toMap)
113113
// Make sure we schedule the receivers to their preferredLocations
114-
val locationsForReceiversWithPreferredLocation =
115-
scheduledLocations.filter { case (receiverId, locations) => receiverId >= 3 }.flatMap(_._2)
116-
// We can simply check the location set because we only know each receiver only has 1 location
117-
assert(locationsForReceiversWithPreferredLocation.toSet ===
114+
val executorsForReceiversWithPreferredLocation =
115+
scheduledExecutors.filter { case (receiverId, executors) => receiverId >= 3 }.flatMap(_._2)
116+
// We can simply check the executor set because we only know each receiver only has 1 executor
117+
assert(executorsForReceiversWithPreferredLocation.toSet ===
118118
(10000 until 10003).map(port => s"localhost:${port}").toSet)
119119
}
120120

121121
test("scheduleReceivers: return empty if no receiver") {
122122
assert(receiverSchedulingPolicy.scheduleReceivers(Seq.empty, Seq("localhost:10000")).isEmpty)
123123
}
124124

125-
test("scheduleReceivers: return empty locations if no executors") {
125+
test("scheduleReceivers: return empty scheduled executors if no executors") {
126126
val receivers = (0 until 3).map(new DummyReceiver(_))
127-
val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty)
128-
scheduledLocations.foreach { case (receiverId, locations) =>
129-
assert(locations.isEmpty)
127+
val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty)
128+
scheduledExecutors.foreach { case (receiverId, executors) =>
129+
assert(executors.isEmpty)
130130
}
131131
}
132132
}

0 commit comments

Comments
 (0)