Skip to content

Commit 8f93c8d

Browse files
committed
Use hostPort as the receiver location rather than host; fix comments and unit tests
1 parent 59f8887 commit 8f93c8d

File tree

5 files changed

+63
-56
lines changed

5 files changed

+63
-56
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
2424
import scala.concurrent._
2525
import scala.util.control.NonFatal
2626

27-
import org.apache.spark.{Logging, SparkConf}
27+
import org.apache.spark.{SparkEnv, Logging, SparkConf}
2828
import org.apache.spark.storage.StreamBlockId
2929
import org.apache.spark.util.{Utils, ThreadUtils}
3030

@@ -65,8 +65,6 @@ private[streaming] abstract class ReceiverSupervisor(
6565
/** State of the receiver */
6666
@volatile private[streaming] var receiverState = Initialized
6767

68-
protected val host = Utils.localHostName()
69-
7068
/** Push a single data item to backend data store. */
7169
def pushSingle(data: Any)
7270

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ private[streaming] class ReceiverSupervisorImpl(
4646
checkpointDirOption: Option[String]
4747
) extends ReceiverSupervisor(receiver, env.conf) with Logging {
4848

49+
private val hostPort = SparkEnv.get.blockManager.blockManagerId.hostPort
50+
4951
private val receivedBlockHandler: ReceivedBlockHandler = {
5052
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
5153
if (checkpointDirOption.isEmpty) {
@@ -164,7 +166,7 @@ private[streaming] class ReceiverSupervisorImpl(
164166

165167
override protected def onReceiverStart(): Boolean = {
166168
val msg = RegisterReceiver(
167-
streamId, receiver.getClass.getSimpleName, host, endpoint)
169+
streamId, receiver.getClass.getSimpleName, hostPort, endpoint)
168170
trackerEndpoint.askWithRetry[Boolean](msg)
169171
}
170172

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,30 +22,6 @@ import scala.collection.mutable
2222

2323
import org.apache.spark.streaming.receiver.Receiver
2424

25-
/**
26-
* A ReceiverScheduler trying to balance executors' load. Here is the approach to schedule executors
27-
* for a receiver.
28-
* <ol>
29-
* <li>
30-
* If preferredLocation is set, preferredLocation should be one of the candidate executors.
31-
* </li>
32-
* <li>
33-
* Every executor will be assigned to a weight according to the receivers running or scheduling
34-
* on it.
35-
* <ul>
36-
* <li>
37-
* If a receiver is running on an executor, it contributes 1.0 to the executor's weight.
38-
* </li>
39-
* <li>
40-
* If a receiver is scheduled to an executor but has not yet run, it contributes
41-
* `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.</li>
42-
* </ul>
43-
* At last, if there are more than 3 idle executors (weight = 0), returns all idle executors.
44-
* Otherwise, we only return 3 best options according to the weights.
45-
* </li>
46-
* </ol>
47-
*
48-
*/
4925
private[streaming] class ReceiverSchedulingPolicy {
5026

5127
/**
@@ -61,7 +37,9 @@ private[streaming] class ReceiverSchedulingPolicy {
6137
return Map.empty
6238
}
6339

64-
require(executors.nonEmpty, "There is no executor up")
40+
if (executors.isEmpty) {
41+
return receivers.map(_.streamId -> Seq.empty).toMap
42+
}
6543

6644
val hostToExecutors = executors.groupBy(_.split(":")(0))
6745
val locations = new Array[mutable.ArrayBuffer[String]](receivers.length)
@@ -119,6 +97,28 @@ private[streaming] class ReceiverSchedulingPolicy {
11997
* Return a list of candidate executors to run the receiver. If the list is empty, the caller can
12098
* run this receiver on an arbitrary executor.
12199
*
100+
* This method tries to balance executors' load. Here is the approach to schedule executors
101+
* for a receiver.
102+
* <ol>
103+
* <li>
104+
* If preferredLocation is set, preferredLocation should be one of the candidate executors.
105+
* </li>
106+
* <li>
107+
* Every executor will be assigned a weight according to the receivers running or
108+
* scheduled on it.
109+
* <ul>
110+
* <li>
111+
* If a receiver is running on an executor, it contributes 1.0 to the executor's weight.
112+
* </li>
113+
* <li>
114+
* If a receiver is scheduled to an executor but has not yet run, it contributes
115+
* `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.</li>
116+
* </ul>
117+
* Finally, if there are more than 3 idle executors (weight = 0), return all idle executors.
118+
* Otherwise, return only the 3 best options according to the weights.
119+
* </li>
120+
* </ol>
121+
*
122122
* This method is called when a receiver is registering with ReceiverTracker or is restarting.
123123
*/
124124
def rescheduleReceiver(

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import scala.language.existentials
2525
import scala.util.{Failure, Success}
2626

2727
import org.apache.spark.streaming.util.WriteAheadLogUtils
28-
import org.apache.spark.{TaskContext, Logging, SparkEnv, SparkException}
28+
import org.apache.spark._
2929
import org.apache.spark.rdd.RDD
3030
import org.apache.spark.rpc._
3131
import org.apache.spark.streaming.{StreamingContext, Time}
@@ -47,7 +47,7 @@ private[streaming] sealed trait ReceiverTrackerMessage
4747
private[streaming] case class RegisterReceiver(
4848
streamId: Int,
4949
typ: String,
50-
host: String,
50+
hostPort: String,
5151
receiverEndpoint: RpcEndpointRef
5252
) extends ReceiverTrackerMessage
5353
private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
@@ -130,12 +130,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
130130

131131
/**
132132
* Track all receivers' information. The key is the receiver id, the value is the receiver info.
133+
* It's only accessed in ReceiverTrackerEndpoint.
133134
*/
134135
private val receiverTrackingInfos = new HashMap[Int, ReceiverTrackingInfo]
135136

136137
/**
137138
* Store all preferred locations for all receivers. We need this information to schedule
138-
* receivers
139+
* receivers. It's only accessed in ReceiverTrackerEndpoint.
139140
*/
140141
private val receiverPreferredLocations = new HashMap[Int, Option[String]]
141142

@@ -227,7 +228,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
227228
private def registerReceiver(
228229
streamId: Int,
229230
typ: String,
230-
host: String,
231+
hostPort: String,
231232
receiverEndpoint: RpcEndpointRef,
232233
senderAddress: RpcAddress
233234
): Boolean = {
@@ -237,19 +238,18 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
237238

238239
if (isTrackerStopping || isTrackerStopped) {
239240
false
240-
} else if (!ssc.sparkContext.isLocal && // We don't need to schedule it in the local mode
241-
!scheduleReceiver(streamId).contains(host)) {
241+
} else if (!scheduleReceiver(streamId).contains(hostPort)) {
242242
// Refuse it since it's scheduled to a wrong executor
243243
false
244244
} else {
245245
val name = s"${typ}-${streamId}"
246-
val receiverInfo = ReceiverInfo(streamId, name, true, host)
246+
val receiverInfo = ReceiverInfo(streamId, name, true, hostPort)
247247
receiverTrackingInfos.put(streamId,
248248
ReceiverTrackingInfo(
249249
streamId,
250250
ReceiverState.ACTIVE,
251251
scheduledLocations = None,
252-
runningLocation = Some(host),
252+
runningLocation = Some(hostPort),
253253
name = Some(name),
254254
endpoint = Some(receiverEndpoint)))
255255
listenerBus.post(StreamingListenerReceiverStarted(receiverInfo))
@@ -344,13 +344,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
344344
/**
345345
* Get the list of executors, excluding the driver.
346346
*/
347-
private def getExecutors: List[String] = {
347+
private def getExecutors: Seq[String] = {
348348
if (ssc.sc.isLocal) {
349-
List("localhost")
349+
Seq(ssc.sparkContext.env.blockManager.blockManagerId.hostPort)
350350
} else {
351-
val executors = ssc.sparkContext.getExecutorMemoryStatus.map(_._1.split(":")(0)).toList
352-
val driver = ssc.sparkContext.getConf.get("spark.driver.host")
353-
executors.diff(List(driver))
351+
ssc.sparkContext.env.blockManager.master.getMemoryStatus.filter { case (blockManagerId, _) =>
352+
blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER // Ignore the driver location
353+
}.map { case (blockManagerId, _) => blockManagerId.hostPort }.toSeq
354354
}
355355
}
356356

@@ -383,7 +383,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
383383
runDummySparkJob()
384384

385385
logInfo("Starting " + receivers.length + " receivers")
386-
endpoint.send(StartAllReceivers)
386+
endpoint.send(StartAllReceivers(receivers))
387387
}
388388

389389
/** Check if tracker has been marked for starting */
@@ -429,9 +429,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
429429

430430
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
431431
// Remote messages
432-
case RegisterReceiver(streamId, typ, host, receiverEndpoint) =>
432+
case RegisterReceiver(streamId, typ, hostPort, receiverEndpoint) =>
433433
val successful =
434-
registerReceiver(streamId, typ, host, receiverEndpoint, context.sender.address)
434+
registerReceiver(streamId, typ, hostPort, receiverEndpoint, context.sender.address)
435435
context.reply(successful)
436436
case AddBlock(receivedBlockInfo) =>
437437
context.reply(addBlock(receivedBlockInfo))

streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,21 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
2727

2828
val receiverSchedulingPolicy = new ReceiverSchedulingPolicy
2929

30-
test("empty executors") {
30+
test("rescheduleReceiver: empty executors") {
3131
val scheduledLocations =
3232
receiverSchedulingPolicy.rescheduleReceiver(0, None, Map.empty, executors = Seq.empty)
3333
assert(scheduledLocations === Seq.empty)
3434
}
3535

36-
test("receiver preferredLocation") {
36+
test("rescheduleReceiver: receiver preferredLocation") {
3737
val receiverTrackingInfoMap = Map(
3838
0 -> ReceiverTrackingInfo(0, ReceiverState.INACTIVE, None, None))
3939
val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
4040
0, Some("host1"), receiverTrackingInfoMap, executors = Seq("host2"))
4141
assert(scheduledLocations.toSet === Set("host1", "host2"))
4242
}
4343

44-
test("return all idle executors if more than 3 idle executors") {
44+
test("rescheduleReceiver: return all idle executors if more than 3 idle executors") {
4545
val executors = Seq("host1", "host2", "host3", "host4", "host5")
4646
// host3 is idle
4747
val receiverTrackingInfoMap = Map(
@@ -51,7 +51,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
5151
assert(scheduledLocations.toSet === Set("host2", "host3", "host4", "host5"))
5252
}
5353

54-
test("return 3 best options if less than 3 idle executors") {
54+
test("rescheduleReceiver: return 3 best options if less than 3 idle executors") {
5555
val executors = Seq("host1", "host2", "host3", "host4", "host5")
5656
// Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0
5757
// host4 and host5 are idle
@@ -64,8 +64,8 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
6464
assert(scheduledLocations.toSet === Set("host2", "host4", "host5"))
6565
}
6666

67-
test("scheduleReceivers should schedule receivers evenly " +
68-
"when there are more receivers than executors") {
67+
test("scheduleReceivers: " +
68+
"schedule receivers evenly when there are more receivers than executors") {
6969
val receivers = (0 until 6).map(new DummyReceiver(_))
7070
val executors = (10000 until 10003).map(port => s"localhost:${port}")
7171
val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
@@ -79,8 +79,8 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
7979
}
8080

8181

82-
test("scheduleReceivers should schedule receivers evenly " +
83-
"when there are more executors than receivers") {
82+
test("scheduleReceivers: " +
83+
"schedule receivers evenly when there are more executors than receivers") {
8484
val receivers = (0 until 3).map(new DummyReceiver(_))
8585
val executors = (10000 until 10006).map(port => s"localhost:${port}")
8686
val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
@@ -95,8 +95,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
9595
assert(numReceiversOnExecutor === executors.map(_ -> 1).toMap)
9696
}
9797

98-
test("scheduleReceivers should schedule receivers evenly " +
99-
"when the preferredLocations are even") {
98+
test("scheduleReceivers: schedule receivers evenly when the preferredLocations are even") {
10099
val receivers = (0 until 3).map(new DummyReceiver(_)) ++
101100
(3 until 6).map(new DummyReceiver(_, Some("localhost")))
102101
val executors = (10000 until 10003).map(port => s"localhost:${port}") ++
@@ -119,9 +118,17 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
119118
(10000 until 10003).map(port => s"localhost:${port}").toSet)
120119
}
121120

122-
test("scheduleReceivers should return empty if no receiver") {
121+
test("scheduleReceivers: return empty if no receiver") {
123122
assert(receiverSchedulingPolicy.scheduleReceivers(Seq.empty, Seq("localhost:10000")).isEmpty)
124123
}
124+
125+
test("scheduleReceivers: return empty locations if no executors") {
126+
val receivers = (0 until 3).map(new DummyReceiver(_))
127+
val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty)
128+
scheduledLocations.foreach { case (receiverId, locations) =>
129+
assert(locations.isEmpty)
130+
}
131+
}
125132
}
126133

127134
/**

0 commit comments

Comments
 (0)