Skip to content

Commit 27acd45

Browse files
committed
Add unit tests for LoadBalanceReceiverSchedulerImplSuite
1 parent cc76142 commit 27acd45

File tree

3 files changed

+79
-5
lines changed

3 files changed

+79
-5
lines changed

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverScheduler.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,11 @@ package org.apache.spark.streaming.scheduler
2020
import scala.collection.mutable
2121
import scala.util.Random
2222

23-
import org.apache.spark.streaming.StreamingContext
2423
import org.apache.spark.streaming.scheduler.ReceiverState._
2524

2625
private[streaming] case class ReceiverTrackingInfo(
2726
receiverId: Int,
2827
state: ReceiverState,
29-
preferredLocation: Option[String],
3028
scheduledLocations: Option[Seq[String]],
3129
runningLocation: Option[String])
3230

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
450450
receiverTrackingInfos.put(receiver.streamId, ReceiverTrackingInfo(
451451
receiver.streamId,
452452
ReceiverState.INACTIVE,
453-
receiver.preferredLocation,
454453
None,
455454
None))
456455
}
@@ -468,7 +467,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
468467
receiverTrackingInfos.put(receiverId, ReceiverTrackingInfo(
469468
receiverId,
470469
ReceiverState.SCHEDULED,
471-
receiverPreferredLocations(receiverId),
472470
Some(scheduledLocations),
473471
None))
474472
}
@@ -478,7 +476,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
478476
receiverTrackingInfos.put(receiverId, ReceiverTrackingInfo(
479477
receiverId,
480478
ReceiverState.ACTIVE,
481-
receiverPreferredLocations(receiverId),
482479
None,
483480
Some(runningLocation)))
484481
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming.scheduler
19+
20+
import org.apache.spark.SparkFunSuite
21+
22+
class LoadBalanceReceiverSchedulerImplSuite extends SparkFunSuite {
23+
24+
val receiverScheduler = new LoadBalanceReceiverSchedulerImpl
25+
26+
test("empty executors") {
27+
val scheduledLocations =
28+
receiverScheduler.scheduleReceiver(0, None, Map.empty, executors = Seq.empty)
29+
assert(scheduledLocations === Seq.empty)
30+
}
31+
32+
test("receiver preferredLocation") {
33+
val receiverTrackingInfoMap = Map(
34+
0 -> ReceiverTrackingInfo(0, ReceiverState.INACTIVE, None, None))
35+
val scheduledLocations = receiverScheduler.scheduleReceiver(
36+
0, Some("host1"), receiverTrackingInfoMap, executors = Seq("host2"))
37+
assert(scheduledLocations.toSet === Set("host1", "host2"))
38+
}
39+
40+
test("choose the idle executor") {
41+
val executors = Seq("host1", "host2", "host3")
42+
// host3 is idle
43+
val receiverTrackingInfoMap = Map(
44+
0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")),
45+
1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2")), None))
46+
val scheduledLocations = receiverScheduler.scheduleReceiver(
47+
2, None, receiverTrackingInfoMap, executors)
48+
assert(scheduledLocations.toSet === Set("host3"))
49+
}
50+
51+
test("all executors are busy") {
52+
val executors = Seq("host1", "host2", "host3")
53+
// Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0
54+
val receiverTrackingInfoMap = Map(
55+
0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")),
56+
1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2", "host3")), None),
57+
2 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None))
58+
val scheduledLocations = receiverScheduler.scheduleReceiver(
59+
3, None, receiverTrackingInfoMap, executors)
60+
assert(scheduledLocations.toSet === Set("host2"))
61+
}
62+
63+
test("ignore the receiver's info") {
64+
val executors = Seq("host1", "host2", "host3")
65+
// Weights: host1 = 1.0, host2 = 1.5, host3 = 1.5
66+
// But since we are scheduling the receiver 1, we should ignore
67+
// receiver 1's ReceiverTrackingInfo
68+
// So the new weights are host1 = 1.0, host2 = 0.5, host3 = 1.5
69+
// Then the scheduled location should be "host2"
70+
val receiverTrackingInfoMap = Map(
71+
0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")),
72+
1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2")), None),
73+
2 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host3")), None),
74+
3 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2", "host3")), None))
75+
val scheduledLocations = receiverScheduler.scheduleReceiver(
76+
1, None, receiverTrackingInfoMap, executors)
77+
assert(scheduledLocations.toSet === Set("host2"))
78+
}
79+
}

0 commit comments

Comments
 (0)