Commit daa1964

zsxwing authored and tdas committed
[SPARK-8882] [STREAMING] Add a new Receiver scheduling mechanism
The design doc: https://docs.google.com/document/d/1ZsoRvHjpISPrDmSjsGzuSu8UjwgbtmoCTzmhgTurHJw/edit?usp=sharing

Author: zsxwing <[email protected]>

Closes apache#7276 from zsxwing/receiver-scheduling and squashes the following commits:

137b257 [zsxwing] Add preferredNumExecutors to rescheduleReceiver
61a6c3f [zsxwing] Set state to ReceiverState.INACTIVE in deregisterReceiver
5e1fa48 [zsxwing] Fix the code style
7451498 [zsxwing] Move DummyReceiver back to ReceiverTrackerSuite
715ef9c [zsxwing] Rename: scheduledLocations -> scheduledExecutors; locations -> executors
05daf9c [zsxwing] Use receiverTrackingInfo.toReceiverInfo
1d6d7c8 [zsxwing] Merge branch 'master' into receiver-scheduling
8f93c8d [zsxwing] Use hostPort as the receiver location rather than host; fix comments and unit tests
59f8887 [zsxwing] Schedule all receivers at the same time when launching them
075e0a3 [zsxwing] Add receiver RDD name; use '!isTrackerStarted' instead
276a4ac [zsxwing] Remove "ReceiverLauncher" and move code to "launchReceivers"
fab9a01 [zsxwing] Move methods back to the outer class
4e639c4 [zsxwing] Fix unintentional changes
f60d021 [zsxwing] Reorganize ReceiverTracker to use an event loop to be lock free
105037e [zsxwing] Merge branch 'master' into receiver-scheduling
5fee132 [zsxwing] Update the scheduling algorithm to avoid repeatedly restarting Receiver
9e242c8 [zsxwing] Remove the ScheduleReceiver message because we can refuse it when receiving RegisterReceiver
a9acfbf [zsxwing] Merge branch 'squash-pr-6294' into receiver-scheduling
881edb9 [zsxwing] ReceiverScheduler -> ReceiverSchedulingPolicy
e530bcc [zsxwing] [SPARK-5681][Streaming] Use a lock to eliminate the race condition when stopping receivers and registering receivers happen at the same time apache#6294
3b87e4a [zsxwing] Revert SparkContext.scala
a86850c [zsxwing] Remove submitAsyncJob and revert JobWaiter
f549595 [zsxwing] Add comments for the scheduling approach
9ecc08e [zsxwing] Fix comments and code style
28d1bee [zsxwing] Make 'host' protected; rescheduleReceiver -> getAllowedLocations
2c86a9e [zsxwing] Use tryFailure to support calling jobFailed multiple times
ca6fe35 [zsxwing] Add a test for Receiver.restart
27acd45 [zsxwing] Add unit tests for LoadBalanceReceiverSchedulerImplSuite
cc76142 [zsxwing] Add JobWaiter.toFuture to avoid blocking threads
d9a3e72 [zsxwing] Add a new Receiver scheduling mechanism
1 parent: ce89ff4

File tree

9 files changed, +674 -233 lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala

Lines changed: 2 additions & 2 deletions

@@ -24,9 +24,9 @@ import scala.collection.mutable.ArrayBuffer
 import scala.concurrent._
 import scala.util.control.NonFatal
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{SparkEnv, Logging, SparkConf}
 import org.apache.spark.storage.StreamBlockId
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{Utils, ThreadUtils}
 
 /**
  * Abstract class that is responsible for supervising a Receiver in the worker.

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 4 additions & 2 deletions

@@ -30,7 +30,7 @@ import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.Time
 import org.apache.spark.streaming.scheduler._
 import org.apache.spark.streaming.util.WriteAheadLogUtils
-import org.apache.spark.util.{RpcUtils, Utils}
+import org.apache.spark.util.RpcUtils
 import org.apache.spark.{Logging, SparkEnv, SparkException}
 
 /**
@@ -46,6 +46,8 @@ private[streaming] class ReceiverSupervisorImpl(
     checkpointDirOption: Option[String]
   ) extends ReceiverSupervisor(receiver, env.conf) with Logging {
 
+  private val hostPort = SparkEnv.get.blockManager.blockManagerId.hostPort
+
   private val receivedBlockHandler: ReceivedBlockHandler = {
     if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
       if (checkpointDirOption.isEmpty) {
@@ -170,7 +172,7 @@ private[streaming] class ReceiverSupervisorImpl(
 
   override protected def onReceiverStart(): Boolean = {
     val msg = RegisterReceiver(
-      streamId, receiver.getClass.getSimpleName, Utils.localHostName(), endpoint)
+      streamId, receiver.getClass.getSimpleName, hostPort, endpoint)
     trackerEndpoint.askWithRetry[Boolean](msg)
   }
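The registration change above reports the executor's host:port (taken from the block manager's BlockManagerId) instead of the bare local host name, so two executors on the same machine remain distinguishable, while a receiver's preferredLocation (a bare host) can still be matched by splitting on ':'. A minimal standalone sketch of that matching, using toy executor strings rather than Spark classes:

object HostPortSketch {
  def main(args: Array[String]): Unit = {
    // Toy host:port identifiers, as an executor would now register itself
    val executors = Seq("worker1:34567", "worker1:45678", "worker2:34567")
    // Group executors by bare host, as ReceiverSchedulingPolicy.scheduleReceivers does
    val hostToExecutors = executors.groupBy(_.split(":")(0))
    // Both executors on worker1 are candidates for a receiver preferring host "worker1"
    println(hostToExecutors("worker1")) // List(worker1:34567, worker1:45678)
  }
}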

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala

Lines changed: 0 additions & 1 deletion

@@ -28,7 +28,6 @@ import org.apache.spark.rpc.RpcEndpointRef
 case class ReceiverInfo(
     streamId: Int,
     name: String,
-    private[streaming] val endpoint: RpcEndpointRef,
     active: Boolean,
     location: String,
     lastErrorMessage: String = "",
streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala

Lines changed: 171 additions & 0 deletions

@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import scala.collection.Map
+import scala.collection.mutable
+
+import org.apache.spark.streaming.receiver.Receiver
+
+private[streaming] class ReceiverSchedulingPolicy {
+
+  /**
+   * Try our best to schedule receivers so that they are evenly distributed. However, if the
+   * `preferredLocation`s of receivers are not evenly distributed, we may not be able to
+   * schedule them evenly because we have to respect them.
+   *
+   * Here is the approach to schedule executors:
+   * <ol>
+   *   <li>First, schedule all the receivers that have preferred locations (hosts) evenly among
+   *   the executors running on those hosts.</li>
+   *   <li>Then, schedule all other receivers evenly among all the executors such that the
+   *   overall distribution over all the receivers is even.</li>
+   * </ol>
+   *
+   * This method is called when we start to launch receivers for the first time.
+   */
+  def scheduleReceivers(
+      receivers: Seq[Receiver[_]], executors: Seq[String]): Map[Int, Seq[String]] = {
+    if (receivers.isEmpty) {
+      return Map.empty
+    }
+
+    if (executors.isEmpty) {
+      return receivers.map(_.streamId -> Seq.empty).toMap
+    }
+
+    val hostToExecutors = executors.groupBy(_.split(":")(0))
+    val scheduledExecutors = Array.fill(receivers.length)(new mutable.ArrayBuffer[String])
+    val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+    // Set the initial value to 0
+    executors.foreach(e => numReceiversOnExecutor(e) = 0)
+
+    // First, we need to respect "preferredLocation". So if a receiver has "preferredLocation",
+    // we need to make sure the "preferredLocation" is in the candidate scheduled executor list.
+    for (i <- 0 until receivers.length) {
+      // Note: preferredLocation is host but executors are host:port
+      receivers(i).preferredLocation.foreach { host =>
+        hostToExecutors.get(host) match {
+          case Some(executorsOnHost) =>
+            // preferredLocation is a known host. Select the executor that has the fewest
+            // receivers on this host
+            val leastScheduledExecutor =
+              executorsOnHost.minBy(executor => numReceiversOnExecutor(executor))
+            scheduledExecutors(i) += leastScheduledExecutor
+            numReceiversOnExecutor(leastScheduledExecutor) =
+              numReceiversOnExecutor(leastScheduledExecutor) + 1
+          case None =>
+            // preferredLocation is an unknown host.
+            // Note: There are two cases:
+            // 1. This executor is not up. But it may be up later.
+            // 2. This executor is dead, or it's not a host in the cluster.
+            // Currently, simply add the host to the scheduled executors.
+            scheduledExecutors(i) += host
+        }
+      }
+    }
+
+    // For those receivers that don't have a preferredLocation, make sure we assign at least one
+    // executor to them.
+    for (scheduledExecutorsForOneReceiver <- scheduledExecutors.filter(_.isEmpty)) {
+      // Select the executor that has the fewest receivers
+      val (leastScheduledExecutor, numReceivers) = numReceiversOnExecutor.minBy(_._2)
+      scheduledExecutorsForOneReceiver += leastScheduledExecutor
+      numReceiversOnExecutor(leastScheduledExecutor) = numReceivers + 1
+    }
+
+    // Assign idle executors to receivers that have fewer candidate executors
+    val idleExecutors = numReceiversOnExecutor.filter(_._2 == 0).map(_._1)
+    for (executor <- idleExecutors) {
+      // Assign an idle executor to the receiver that has the fewest candidate executors.
+      val leastScheduledExecutors = scheduledExecutors.minBy(_.size)
+      leastScheduledExecutors += executor
+    }
+
+    receivers.map(_.streamId).zip(scheduledExecutors).toMap
+  }
+
+  /**
+   * Return a list of candidate executors to run the receiver. If the list is empty, the caller
+   * can run this receiver in an arbitrary executor. The caller can use `preferredNumExecutors`
+   * to require returning `preferredNumExecutors` executors if possible.
+   *
+   * This method tries to balance executors' load. Here is the approach to schedule executors
+   * for a receiver.
+   * <ol>
+   *   <li>
+   *     If preferredLocation is set, preferredLocation should be one of the candidate executors.
+   *   </li>
+   *   <li>
+   *     Every executor will be assigned a weight according to the receivers running or
+   *     scheduled on it.
+   *     <ul>
+   *       <li>
+   *         If a receiver is running on an executor, it contributes 1.0 to the executor's
+   *         weight.
+   *       </li>
+   *       <li>
+   *         If a receiver is scheduled to an executor but has not yet run, it contributes
+   *         `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.</li>
+   *     </ul>
+   *     Finally, if there are at least `preferredNumExecutors` idle executors (weight = 0),
+   *     return all idle executors. Otherwise, return only the `preferredNumExecutors` best
+   *     options according to the weights.
+   *   </li>
+   * </ol>
+   *
+   * This method is called when a receiver is registering with ReceiverTracker or is restarting.
+   */
+  def rescheduleReceiver(
+      receiverId: Int,
+      preferredLocation: Option[String],
+      receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
+      executors: Seq[String],
+      preferredNumExecutors: Int = 3): Seq[String] = {
+    if (executors.isEmpty) {
+      return Seq.empty
+    }
+
+    // Always try to schedule to the preferred locations
+    val scheduledExecutors = mutable.Set[String]()
+    scheduledExecutors ++= preferredLocation
+
+    val executorWeights = receiverTrackingInfoMap.values.flatMap { receiverTrackingInfo =>
+      receiverTrackingInfo.state match {
+        case ReceiverState.INACTIVE => Nil
+        case ReceiverState.SCHEDULED =>
+          val scheduledExecutors = receiverTrackingInfo.scheduledExecutors.get
+          // The probability that a scheduled receiver will run in an executor is
+          // 1.0 / scheduledExecutors.size
+          scheduledExecutors.map(location => location -> (1.0 / scheduledExecutors.size))
+        case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0)
+      }
+    }.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor
+
+    val idleExecutors = (executors.toSet -- executorWeights.keys).toSeq
+    if (idleExecutors.size >= preferredNumExecutors) {
+      // If there are at least `preferredNumExecutors` idle executors, return all of them
+      scheduledExecutors ++= idleExecutors
+    } else {
+      // Otherwise, return the `preferredNumExecutors` best options, preferring idle executors
+      scheduledExecutors ++= idleExecutors
+      val sortedExecutors = executorWeights.toSeq.sortBy(_._2).map(_._1)
+      scheduledExecutors ++= (idleExecutors ++ sortedExecutors).take(preferredNumExecutors)
+    }
+    scheduledExecutors.toSeq
+  }
+}
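To make the weighting in rescheduleReceiver concrete, here is a self-contained sketch that mirrors the weight computation on toy data; Active and Scheduled below are illustrative stand-ins for ReceiverTrackingInfo states, not Spark classes:

object ReceiverWeightSketch {
  sealed trait ReceiverState
  case class Active(runningExecutor: String) extends ReceiverState
  case class Scheduled(candidateExecutors: Seq[String]) extends ReceiverState

  // A running receiver adds 1.0 to its executor's weight; a scheduled receiver
  // spreads 1.0 evenly across its candidate executors.
  def executorWeights(receivers: Seq[ReceiverState]): Map[String, Double] =
    receivers.flatMap {
      case Active(executor) => Seq(executor -> 1.0)
      case Scheduled(candidates) => candidates.map(_ -> (1.0 / candidates.size))
    }.groupBy(_._1).map { case (executor, ws) => executor -> ws.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val receivers = Seq(
      Active("worker1:1"),                      // contributes 1.0 to worker1:1
      Scheduled(Seq("worker1:1", "worker2:1"))  // contributes 0.5 to each candidate
    )
    // Map(worker1:1 -> 1.5, worker2:1 -> 0.5); an executor absent from the map
    // is idle (weight 0) and is preferred when rescheduling a receiver.
    println(executorWeights(receivers))
  }
}

Under this weighting, a receiver being rescheduled gravitates toward idle or lightly loaded executors, which is what keeps restarted receivers from repeatedly piling onto the same node.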
