Skip to content

Commit 5e1fa48

Browse files
committed
Fix the code style
1 parent 7451498 commit 5e1fa48

File tree

2 files changed

+10
-2
lines changed

2 files changed

+10
-2
lines changed

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,14 @@ private[streaming] class ReceiverSchedulingPolicy {
2929
* `preferredLocation`s of receivers are not even, we may not be able to schedule them evenly
3030
* because we have to respect them.
3131
*
32+
* Here is the approach to schedule executors:
33+
* <ol>
34+
* <li>First, schedule all the receivers with preferred locations (hosts), evenly among the
35+
* executors running on those host.</li>
36+
* <li>Then, schedule all other receivers evenly among all the executors such that overall
37+
* distribution over all the receivers is even.</li>
38+
* </ol>
39+
*
3240
* This method is called when we start to launch receivers at the first time.
3341
*/
3442
def scheduleReceivers(
@@ -45,7 +53,7 @@ private[streaming] class ReceiverSchedulingPolicy {
4553
val scheduledExecutors = Array.fill(receivers.length)(new mutable.ArrayBuffer[String])
4654
val numReceiversOnExecutor = mutable.HashMap[String, Int]()
4755
// Set the initial value to 0
48-
executors.foreach(numReceiversOnExecutor(_) = 0)
56+
executors.foreach(e => numReceiversOnExecutor(e) = 0)
4957

5058
// Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation",
5159
// we need to make sure the "preferredLocation" is in the candidate scheduled executor list.

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
338338
receiverId,
339339
ReceiverState.SCHEDULED,
340340
Some(scheduledExecutors),
341-
None)
341+
runningExecutor = None)
342342
}
343343
receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo)
344344
}

0 commit comments

Comments
 (0)