Skip to content

Commit d9a3e72

Browse files
committed
Add a new Receiver scheduling mechanism
1 parent 1cb2629 commit d9a3e72

File tree

5 files changed

+286
-37
lines changed

5 files changed

+286
-37
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1859,6 +1859,29 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
18591859
new SimpleFutureAction(waiter, resultFunc)
18601860
}
18611861

1862+
/**
1863+
* Submit a job for execution and return a FutureJob holding the result.
1864+
*/
1865+
private[spark] def submitAsyncJob[T, U, R](
1866+
rdd: RDD[T],
1867+
processPartition: (TaskContext, Iterator[T]) => U,
1868+
resultHandler: (Int, U) => Unit,
1869+
resultFunc: => R): SimpleFutureAction[R] =
1870+
{
1871+
assertNotStopped()
1872+
val cleanF = clean(processPartition)
1873+
val callSite = getCallSite
1874+
val waiter = dagScheduler.submitJob(
1875+
rdd,
1876+
(context: TaskContext, iter: Iterator[T]) => cleanF(context, iter),
1877+
0 until rdd.partitions.size,
1878+
callSite,
1879+
allowLocal = false,
1880+
resultHandler,
1881+
localProperties.get)
1882+
new SimpleFutureAction(waiter, resultFunc)
1883+
}
1884+
18621885
/**
18631886
* Cancel active jobs for the specified group. See [[org.apache.spark.SparkContext.setJobGroup]]
18641887
* for more information.

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import scala.concurrent._
2525

2626
import org.apache.spark.{Logging, SparkConf}
2727
import org.apache.spark.storage.StreamBlockId
28-
import org.apache.spark.util.ThreadUtils
28+
import org.apache.spark.util.{Utils, ThreadUtils}
2929

3030
/**
3131
* Abstract class that is responsible for supervising a Receiver in the worker.
@@ -64,6 +64,8 @@ private[streaming] abstract class ReceiverSupervisor(
6464
/** State of the receiver */
6565
@volatile private[streaming] var receiverState = Initialized
6666

67+
val host = Utils.localHostName()
68+
6769
/** Push a single data item to backend data store. */
6870
def pushSingle(data: Any)
6971

@@ -161,12 +163,19 @@ private[streaming] abstract class ReceiverSupervisor(
161163
stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
162164
logDebug("Sleeping for " + delay)
163165
Thread.sleep(delay)
164-
logInfo("Starting receiver again")
165-
startReceiver()
166-
logInfo("Receiver started again")
166+
if (rescheduleReceiver().contains(host)) {
167+
logInfo("Starting receiver again")
168+
startReceiver()
169+
logInfo("Receiver started again")
170+
} else {
171+
stop("Receiver is scheduled to another executor", None)
172+
}
167173
}(futureExecutionContext)
168174
}
169175

176+
/** Reschedule this receiver and return a candidate executor list */
177+
def rescheduleReceiver(): Seq[String]
178+
170179
/** Check if receiver has been marked for stopping */
171180
def isReceiverStarted(): Boolean = {
172181
logDebug("state = " + receiverState)

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.apache.spark.storage.StreamBlockId
3030
import org.apache.spark.streaming.Time
3131
import org.apache.spark.streaming.scheduler._
3232
import org.apache.spark.streaming.util.WriteAheadLogUtils
33-
import org.apache.spark.util.{RpcUtils, Utils}
33+
import org.apache.spark.util.RpcUtils
3434
import org.apache.spark.{Logging, SparkEnv, SparkException}
3535

3636
/**
@@ -164,7 +164,7 @@ private[streaming] class ReceiverSupervisorImpl(
164164

165165
override protected def onReceiverStart() {
166166
val msg = RegisterReceiver(
167-
streamId, receiver.getClass.getSimpleName, Utils.localHostName(), endpoint)
167+
streamId, receiver.getClass.getSimpleName, host, endpoint)
168168
trackerEndpoint.askWithRetry[Boolean](msg)
169169
}
170170

@@ -182,4 +182,8 @@ private[streaming] class ReceiverSupervisorImpl(
182182
logDebug(s"Cleaning up blocks older then $cleanupThreshTime")
183183
receivedBlockHandler.cleanupOldBlocks(cleanupThreshTime.milliseconds)
184184
}
185+
186+
override def rescheduleReceiver(): Seq[String] = {
187+
trackerEndpoint.askWithRetry[Seq[String]](ScheduleReceiver(streamId))
188+
}
185189
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming.scheduler
19+
20+
import scala.collection.mutable
21+
import scala.util.Random
22+
23+
import org.apache.spark.streaming.scheduler.ReceiverState._
24+
25+
private[streaming] case class ReceiverTrackingInfo(
26+
receiverId: Int,
27+
state: ReceiverState,
28+
preferredLocation: Option[String],
29+
scheduledLocations: Option[Seq[String]],
30+
runningLocation: Option[String])
31+
32+
private[streaming] trait ReceiverScheduler {
33+
34+
def scheduleReceiver(
35+
receiverId: Int,
36+
preferredLocation: Option[String],
37+
receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
38+
executors: Seq[String]): Seq[String]
39+
}
40+
41+
/**
42+
* A ReceiverScheduler trying to balance executors' load.
43+
*/
44+
private[streaming] class LoadBalanceReceiverSchedulerImpl extends ReceiverScheduler {
45+
46+
def scheduleReceiver(
47+
receiverId: Int,
48+
preferredLocation: Option[String],
49+
receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
50+
executors: Seq[String]): Seq[String] = {
51+
if (executors.isEmpty) {
52+
return Seq.empty
53+
}
54+
55+
// Always try to schedule to the preferred locations
56+
val locations = mutable.Set[String]()
57+
locations ++= preferredLocation
58+
59+
val executorWeights = receiverTrackingInfoMap.filter { case (id, _) =>
60+
// Ignore the receiver to be scheduled. It may be still running.
61+
id != receiverId
62+
}.values.flatMap { receiverTrackingInfo =>
63+
receiverTrackingInfo.state match {
64+
case ReceiverState.INACTIVE => Nil
65+
case ReceiverState.SCHEDULED =>
66+
val scheduledLocations = receiverTrackingInfo.scheduledLocations.get
67+
// The probability that a scheduled receiver will run in an executor is
68+
// 1.0 / scheduledLocations.size
69+
scheduledLocations.map(location => (location, 1.0 / scheduledLocations.size))
70+
case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningLocation.get -> 1.0)
71+
}
72+
}.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor
73+
74+
val idleExecutors = (executors.toSet -- executorWeights.keys).toSeq
75+
if (idleExecutors.nonEmpty) {
76+
// If there are idle executors, randomly select one
77+
locations += idleExecutors(Random.nextInt(idleExecutors.size))
78+
} else {
79+
// Use the executor that runs the least receivers
80+
locations += executorWeights.minBy(_._2)._1
81+
}
82+
locations.toSeq
83+
}
84+
}

0 commit comments

Comments
 (0)