Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d9a3e72
Add a new Receiver scheduling mechanism
zsxwing Jun 30, 2015
cc76142
Add JobWaiter.toFuture to avoid blocking threads
zsxwing Jul 8, 2015
27acd45
Add unit tests for LoadBalanceReceiverSchedulerImplSuite
zsxwing Jul 8, 2015
ca6fe35
Add a test for Receiver.restart
zsxwing Jul 8, 2015
2c86a9e
Use tryFailure to support calling jobFailed multiple times
zsxwing Jul 9, 2015
28d1bee
Make 'host' protected; rescheduleReceiver -> getAllowedLocations
zsxwing Jul 11, 2015
9ecc08e
Fix comments and code style
zsxwing Jul 14, 2015
f549595
Add comments for the scheduling approach
zsxwing Jul 14, 2015
a86850c
Remove submitAsyncJob and revert JobWaiter
zsxwing Jul 14, 2015
3b87e4a
Revert SparkContext.scala
zsxwing Jul 14, 2015
e530bcc
[SPARK-5681][Streaming] Use a lock to eliminate the race condition wh…
zsxwing May 25, 2015
881edb9
ReceiverScheduler -> ReceiverSchedulingPolicy
zsxwing Jul 15, 2015
a9acfbf
Merge branch 'squash-pr-6294' into receiver-scheduling
zsxwing Jul 15, 2015
9e242c8
Remove the ScheduleReceiver message because we can refuse it when rec…
zsxwing Jul 15, 2015
5fee132
Update tha scheduling algorithm to avoid to keep restarting Receiver
zsxwing Jul 15, 2015
105037e
Merge branch 'master' into receiver-scheduling
zsxwing Jul 20, 2015
f60d021
Reorganize ReceiverTracker to use an event loop for lock free
zsxwing Jul 21, 2015
4e639c4
Fix unintentional changes
zsxwing Jul 21, 2015
fab9a01
Move methods back to the outer class
zsxwing Jul 22, 2015
276a4ac
Remove "ReceiverLauncher" and move codes to "launchReceivers"
zsxwing Jul 22, 2015
075e0a3
Add receiver RDD name; use '!isTrackerStarted' instead
zsxwing Jul 22, 2015
59f8887
Schedule all receivers at the same time when launching them
zsxwing Jul 22, 2015
8f93c8d
Use hostPort as the receiver location rather than host; fix comments …
zsxwing Jul 22, 2015
1d6d7c8
Merge branch 'master' into receiver-scheduling
zsxwing Jul 23, 2015
05daf9c
Use receiverTrackingInfo.toReceiverInfo
zsxwing Jul 23, 2015
715ef9c
Rename: scheduledLocations -> scheduledExecutors; locations -> executors
zsxwing Jul 23, 2015
7451498
Move DummyReceiver back to ReceiverTrackerSuite
zsxwing Jul 23, 2015
5e1fa48
Fix the code style
zsxwing Jul 23, 2015
61a6c3f
Set state to ReceiverState.INACTIVE in deregisterReceiver
zsxwing Jul 23, 2015
137b257
Add preferredNumExecutors to rescheduleReceiver
zsxwing Jul 23, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import scala.collection.mutable.ArrayBuffer
import scala.concurrent._
import scala.util.control.NonFatal

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.{SparkEnv, Logging, SparkConf}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.util.ThreadUtils
import org.apache.spark.util.{Utils, ThreadUtils}

/**
* Abstract class that is responsible for supervising a Receiver in the worker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.util.{RpcUtils, Utils}
import org.apache.spark.util.RpcUtils
import org.apache.spark.{Logging, SparkEnv, SparkException}

/**
Expand All @@ -46,6 +46,8 @@ private[streaming] class ReceiverSupervisorImpl(
checkpointDirOption: Option[String]
) extends ReceiverSupervisor(receiver, env.conf) with Logging {

private val hostPort = SparkEnv.get.blockManager.blockManagerId.hostPort

private val receivedBlockHandler: ReceivedBlockHandler = {
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
if (checkpointDirOption.isEmpty) {
Expand Down Expand Up @@ -170,7 +172,7 @@ private[streaming] class ReceiverSupervisorImpl(

override protected def onReceiverStart(): Boolean = {
val msg = RegisterReceiver(
streamId, receiver.getClass.getSimpleName, Utils.localHostName(), endpoint)
streamId, receiver.getClass.getSimpleName, hostPort, endpoint)
trackerEndpoint.askWithRetry[Boolean](msg)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import org.apache.spark.rpc.RpcEndpointRef
case class ReceiverInfo(
streamId: Int,
name: String,
private[streaming] val endpoint: RpcEndpointRef,
active: Boolean,
location: String,
lastErrorMessage: String = "",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.scheduler

import scala.collection.Map
import scala.collection.mutable

import org.apache.spark.streaming.receiver.Receiver

private[streaming] class ReceiverSchedulingPolicy {

/**
* Try our best to distribute receivers evenly. However, if the
* `preferredLocation`s of receivers are not even, we may not be able to schedule them evenly
* because we have to respect them.
*
* Here is the approach to schedule executors:
* <ol>
* <li>First, schedule all the receivers with preferred locations (hosts), evenly among the
* executors running on those hosts.</li>
* <li>Then, schedule all other receivers evenly among all the executors such that overall
* distribution over all the receivers is even.</li>
* </ol>
*
* This method is called when we start to launch receivers for the first time.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to elaborate the algorithm in the comments. .. things like

  1. First, schedule all the receivers with preferred locations (hosts), evenly among the executors running on those hosts
  2. Then, schedule all other receivers evenly among all the executors such that overall distribution over all the receivers is even.

*/
def scheduleReceivers(
    receivers: Seq[Receiver[_]], executors: Seq[String]): Map[Int, Seq[String]] = {
  if (receivers.isEmpty) {
    // Nothing to schedule.
    Map.empty
  } else if (executors.isEmpty) {
    // No executor is known: every receiver gets an empty candidate list.
    receivers.map(_.streamId -> Seq.empty).toMap
  } else {
    // Executors are "host:port" strings; index them by their host part.
    val executorsByHost = executors.groupBy(_.split(":")(0))
    // candidates(i) accumulates the candidate executors of receivers(i).
    val candidates = Array.fill(receivers.length)(new mutable.ArrayBuffer[String])
    // Number of receivers assigned so far to each executor; every executor starts at 0.
    val load = mutable.HashMap[String, Int]()
    executors.foreach(e => load(e) = 0)

    // Pass 1: honour "preferredLocation". A receiver that has one must get that location in
    // its candidate list. Note: preferredLocation is a host while executors are host:port.
    receivers.zipWithIndex.foreach { case (receiver, index) =>
      receiver.preferredLocation.foreach { host =>
        executorsByHost.get(host) match {
          case Some(executorsOnHost) =>
            // Known host: take the executor on that host with the fewest receivers so far.
            val target = executorsOnHost.minBy(load)
            candidates(index) += target
            load(target) += 1
          case None =>
            // Unknown host. Either its executor is not up yet (it may come up later), or it
            // is dead / not part of the cluster. For now the bare host is recorded as is.
            candidates(index) += host
        }
      }
    }

    // Pass 2: receivers that still have no candidate (no preferredLocation) each get the
    // executor that currently carries the fewest receivers.
    candidates.filter(_.isEmpty).foreach { unassigned =>
      val (target, count) = load.minBy(_._2)
      unassigned += target
      load(target) = count + 1
    }

    // Pass 3: hand every executor that is still idle to whichever receiver currently has the
    // shortest candidate list.
    val idle = load.filter(_._2 == 0).map(_._1)
    idle.foreach { executor =>
      candidates.minBy(_.size) += executor
    }

    receivers.map(_.streamId).zip(candidates).toMap
  }
}

/**
* Return a list of candidate executors to run the receiver. If the list is empty, the caller can
* run this receiver on an arbitrary executor. The caller can use `preferredNumExecutors` to
* request that `preferredNumExecutors` executors be returned if possible.
*
* This method tries to balance executors' load. Here is the approach to schedule executors
* for a receiver.
* <ol>
* <li>
* If preferredLocation is set, preferredLocation should be one of the candidate executors.
* </li>
* <li>
* Every executor is assigned a weight according to the receivers running on it or
* scheduled to it.
* <ul>
* <li>
* If a receiver is running on an executor, it contributes 1.0 to the executor's weight.
* </li>
* <li>
* If a receiver is scheduled to an executor but has not yet run, it contributes
* `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.</li>
* </ul>
* Finally, if there are at least `preferredNumExecutors` idle executors (weight = 0),
* return all idle executors. Otherwise, return only the `preferredNumExecutors` best options
* according to the weights.
* </li>
* </ol>
*
* This method is called when a receiver is registering with ReceiverTracker or is restarting.
*/
def rescheduleReceiver(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method was called scheduleReceiver. I renamed it to rescheduleReceiver to avoid confusing with scheduleReceivers.

receiverId: Int,
preferredLocation: Option[String],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be more consistent with scheduleReceivers to pass the whole Receiver object, rather than receiverId and preferredLocatons.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using receiverId and preferredLocatons is because we don't store the Receiver object currently. The Receiver object is only available when launching it.

receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
executors: Seq[String],
preferredNumExecutors: Int = 3): Seq[String] = {
// Note: `receiverId` is not read anywhere in this method body.
// Without any known executor there is nothing to choose from.
if (executors.isEmpty) {
return Seq.empty
}

// Always try to schedule to the preferred locations
// (`++=` adds the value of the Option, if it is defined).
val scheduledExecutors = mutable.Set[String]()
scheduledExecutors ++= preferredLocation

// Build (executor -> weight) pairs from every tracked receiver, then sum them per executor.
// NOTE(review): the keys come from the tracking info, not from `executors`, so they may
// include entries that are not in `executors` — confirm callers tolerate that.
val executorWeights = receiverTrackingInfoMap.values.flatMap { receiverTrackingInfo =>
receiverTrackingInfo.state match {
// An inactive receiver contributes no weight.
case ReceiverState.INACTIVE => Nil
case ReceiverState.SCHEDULED =>
// Note: this local value shadows the outer `scheduledExecutors` set within this case.
// NOTE(review): `.get` presumes `scheduledExecutors` is defined for a SCHEDULED
// receiver — confirm.
val scheduledExecutors = receiverTrackingInfo.scheduledExecutors.get
// The probability that a scheduled receiver will run in an executor is
// 1.0 / scheduledExecutors.size
scheduledExecutors.map(location => location -> (1.0 / scheduledExecutors.size))
// A running receiver contributes a full 1.0 to the executor it runs on.
// NOTE(review): `.get` presumes `runningExecutor` is defined for an ACTIVE receiver — confirm.
case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0)
}
}.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor

// Idle executors are the known executors that do not appear in the weights at all.
val idleExecutors = (executors.toSet -- executorWeights.keys).toSeq
if (idleExecutors.size >= preferredNumExecutors) {
// If there are at least `preferredNumExecutors` idle executors, return all of them
scheduledExecutors ++= idleExecutors
} else {
// If there are fewer than `preferredNumExecutors` idle executors, return the
// `preferredNumExecutors` best options: all idle executors first, then the weighted
// executors in ascending order of weight.
// The next statement is redundant: in this branch all idle executors are also part of the
// `take(preferredNumExecutors)` prefix added two statements below.
scheduledExecutors ++= idleExecutors
val sortedExecutors = executorWeights.toSeq.sortBy(_._2).map(_._1)
scheduledExecutors ++= (idleExecutors ++ sortedExecutors).take(preferredNumExecutors)
}
// The result also contains `preferredLocation` (if defined), in addition to the executors
// selected above.
scheduledExecutors.toSeq
}
}
Loading