Skip to content

Commit f60d021

Browse files
committed
Reorganize ReceiverTracker to use an event loop so that it is lock-free
1. Add fields to ReceiverTrackingInfo so that it contains all the information needed for ReceiverInfo. When sending ReceiverInfo to StreamingListener, create ReceiverInfo from ReceiverTrackingInfo directly. 2. Move methods that use ReceiverTrackingInfo into ReceiverTrackerEndpoint so that we can maintain all ReceiverTrackingInfos in an event loop. 3. Move ReceiverTrackingInfo to a separate file ReceiverTrackingInfo.scala. 4. Address some code style issues.
1 parent 105037e commit f60d021

File tree

8 files changed

+362
-415
lines changed

8 files changed

+362
-415
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,5 +182,4 @@ private[streaming] class ReceiverSupervisorImpl(
182182
logDebug(s"Cleaning up blocks older then $cleanupThreshTime")
183183
receivedBlockHandler.cleanupOldBlocks(cleanupThreshTime.milliseconds)
184184
}
185-
186185
}

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import org.apache.spark.rpc.RpcEndpointRef
2828
case class ReceiverInfo(
2929
streamId: Int,
3030
name: String,
31-
private[streaming] val endpoint: RpcEndpointRef,
3231
active: Boolean,
3332
location: String,
3433
lastErrorMessage: String = "",

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala

Lines changed: 8 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,8 @@
1717

1818
package org.apache.spark.streaming.scheduler
1919

20+
import scala.collection.Map
2021
import scala.collection.mutable
21-
import scala.util.Random
22-
23-
import org.apache.spark.streaming.scheduler.ReceiverState._
24-
25-
private[streaming] case class ReceiverTrackingInfo(
26-
receiverId: Int,
27-
state: ReceiverState,
28-
scheduledLocations: Option[Seq[String]],
29-
runningLocation: Option[String])
30-
31-
private[streaming] trait ReceiverSchedulingPolicy {
32-
33-
/**
34-
* Return a list of candidate executors to run the receiver. If the list is empty, the caller can
35-
* run this receiver in an arbitrary executor.
36-
*/
37-
def scheduleReceiver(
38-
receiverId: Int,
39-
preferredLocation: Option[String],
40-
receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
41-
executors: Seq[String]): Seq[String]
42-
}
4322

4423
/**
4524
* A ReceiverScheduler trying to balance executors' load. Here is the approach to schedule executors
@@ -65,8 +44,12 @@ private[streaming] trait ReceiverSchedulingPolicy {
6544
* </ol>
6645
*
6746
*/
68-
private[streaming] class LoadBalanceReceiverSchedulingPolicyImpl extends ReceiverSchedulingPolicy {
47+
private[streaming] class ReceiverSchedulingPolicy {
6948

49+
/**
50+
* Return a list of candidate executors to run the receiver. If the list is empty, the caller can
51+
* run this receiver in arbitrary executor.
52+
*/
7053
def scheduleReceiver(
7154
receiverId: Int,
7255
preferredLocation: Option[String],
@@ -80,17 +63,14 @@ private[streaming] class LoadBalanceReceiverSchedulingPolicyImpl extends Receive
8063
val locations = mutable.Set[String]()
8164
locations ++= preferredLocation
8265

83-
val executorWeights = receiverTrackingInfoMap.filter { case (id, _) =>
84-
// Ignore the receiver to be scheduled. It may be still running.
85-
id != receiverId
86-
}.values.flatMap { receiverTrackingInfo =>
66+
val executorWeights = receiverTrackingInfoMap.values.flatMap { receiverTrackingInfo =>
8767
receiverTrackingInfo.state match {
8868
case ReceiverState.INACTIVE => Nil
8969
case ReceiverState.SCHEDULED =>
9070
val scheduledLocations = receiverTrackingInfo.scheduledLocations.get
9171
// The probability that a scheduled receiver will run in an executor is
9272
// 1.0 / scheduledLocations.size
93-
scheduledLocations.map(location => location -> 1.0 / scheduledLocations.size)
73+
scheduledLocations.map(location => location -> (1.0 / scheduledLocations.size))
9474
case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningLocation.get -> 1.0)
9575
}
9676
}.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor

0 commit comments

Comments
 (0)