From 5fba8d46db62118711ff965efc6c6772ba1ddd1b Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 20 Aug 2015 18:25:54 +0800 Subject: [PATCH 1/8] Avoid to restart receivers if scheduleReceivers returns balanced results --- .../scheduler/ReceiverSchedulingPolicy.scala | 49 ++++++++++--------- .../ReceiverSchedulingPolicySuite.scala | 34 ++++++++++--- 2 files changed, 53 insertions(+), 30 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala index ef5b687b5831..ff8c2123f804 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala @@ -102,8 +102,7 @@ private[streaming] class ReceiverSchedulingPolicy { /** * Return a list of candidate executors to run the receiver. If the list is empty, the caller can - * run this receiver in arbitrary executor. The caller can use `preferredNumExecutors` to require - * returning `preferredNumExecutors` executors if possible. + * run this receiver in arbitrary executor. * * This method tries to balance executors' load. Here is the approach to schedule executors * for a receiver. @@ -122,9 +121,8 @@ private[streaming] class ReceiverSchedulingPolicy { * If a receiver is scheduled to an executor but has not yet run, it contributes * `1.0 / #candidate_executors_of_this_receiver` to the executor's weight. * - * At last, if there are more than `preferredNumExecutors` idle executors (weight = 0), - * returns all idle executors. Otherwise, we only return `preferredNumExecutors` best options - * according to the weights. + * At last, if there are any idle executors (weight = 0), returns all idle executors. + * Otherwise, returns the executors that have the minimum weight. * * * @@ -134,8 +132,7 @@ private[streaming] class ReceiverSchedulingPolicy { receiverId: Int, preferredLocation: Option[String], receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo], - executors: Seq[String], - preferredNumExecutors: Int = 3): Seq[String] = { + executors: Seq[String]): Seq[String] = { if (executors.isEmpty) { return Seq.empty } @@ -144,27 +141,31 @@ private[streaming] class ReceiverSchedulingPolicy { val scheduledExecutors = mutable.Set[String]() scheduledExecutors ++= preferredLocation - val executorWeights = receiverTrackingInfoMap.values.flatMap { receiverTrackingInfo => - receiverTrackingInfo.state match { - case ReceiverState.INACTIVE => Nil - case ReceiverState.SCHEDULED => - val scheduledExecutors = receiverTrackingInfo.scheduledExecutors.get - // The probability that a scheduled receiver will run in an executor is - // 1.0 / scheduledLocations.size - scheduledExecutors.map(location => location -> (1.0 / scheduledExecutors.size)) - case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0) - } + val executorWeights = receiverTrackingInfoMap.filter(_._1 != receiverId).values.flatMap { + receiverTrackingInfo => + receiverTrackingInfo.state match { + case ReceiverState.INACTIVE => Nil + case ReceiverState.SCHEDULED => + val scheduledExecutors = receiverTrackingInfo.scheduledExecutors.get + // The probability that a scheduled receiver will run in an executor is + // 1.0 / scheduledLocations.size + scheduledExecutors.map(location => location -> (1.0 / scheduledExecutors.size)) + case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0) + } }.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor - val idleExecutors = (executors.toSet -- executorWeights.keys).toSeq - if (idleExecutors.size >= preferredNumExecutors) { - // If there are more than `preferredNumExecutors` idle executors, return all of them + val idleExecutors = executors.toSet -- executorWeights.keys + if (idleExecutors.nonEmpty) { scheduledExecutors ++= idleExecutors } else { - // If there are less than `preferredNumExecutors` idle executors, return 3 best options - scheduledExecutors ++= idleExecutors - val sortedExecutors = executorWeights.toSeq.sortBy(_._2).map(_._1) - scheduledExecutors ++= (idleExecutors ++ sortedExecutors).take(preferredNumExecutors) + // There is no idle executor. So select all executors that have the minimum weight. + val sortedExecutors = executorWeights.toSeq.sortBy(_._2) + if (sortedExecutors.nonEmpty) { + val minWeight = sortedExecutors(0)._2 + scheduledExecutors ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1) + } else { + // This should not happen since "executors" is not empty + } } scheduledExecutors.toSeq } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala index 0418d776ecc9..ea63a357062a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala @@ -39,7 +39,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite { assert(scheduledExecutors.toSet === Set("host1", "host2")) } - test("rescheduleReceiver: return all idle executors if more than 3 idle executors") { + test("rescheduleReceiver: return all idle executors if there are any idle executors") { val executors = Seq("host1", "host2", "host3", "host4", "host5") // host3 is idle val receiverTrackingInfoMap = Map( @@ -49,16 +49,16 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite { assert(scheduledExecutors.toSet === Set("host2", "host3", "host4", "host5")) } - test("rescheduleReceiver: return 3 best options if less than 3 idle executors") { + test("rescheduleReceiver: return executors that have minimum weight if no idle executors") { val executors = Seq("host1", "host2", "host3", "host4", "host5") - // Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0 - // host4 and host5 are idle + // Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0, host4 = 0.5, host5 = 0.5 val receiverTrackingInfoMap = Map( 0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")), 1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2", "host3")), None), - 2 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None)) + 2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None), + 3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED, Some(Seq("host4", "host5")), None)) val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver( - 3, None, receiverTrackingInfoMap, executors) + 4, None, receiverTrackingInfoMap, executors) assert(scheduledExecutors.toSet === Set("host2", "host4", "host5")) } @@ -127,4 +127,26 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite { assert(executors.isEmpty) } } + + test("when scheduleReceivers return a balanced result, we should not restart receivers") { + val receivers = (0 until 6).map(new RateTestReceiver(_)) + val executors = (10000 until 10005).map(port => s"localhost:${port}") + val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors) + val receiverTrackingInfoMap = scheduledExecutors.map { case (receiverId, executors) => + assert(executors.size === 1) // Each receiver has been assigned one executor + receiverId -> ReceiverTrackingInfo(receiverId, ReceiverState.SCHEDULED, Some(executors), None) + } + // "scheduledExecutors" has already been balanced, assume we launch receivers as + // "scheduledExecutors" suggested + for (receiverId <- receivers.map(_.streamId)) { + val scheduledExecutorsWhenRegistering = receiverSchedulingPolicy.rescheduleReceiver( + receiverId, None, receiverTrackingInfoMap, executors) + // Assume the receiver has been launched in the exact scheduled location + val runningLocation = scheduledExecutors(receiverId)(0) + // Since all receivers are launched as "scheduledExecutors" suggested, we should allow it to + // run in its current runningLocation, so "scheduledExecutorsWhenRegistering" should contains + // "runningLocation" + assert(scheduledExecutorsWhenRegistering.contains(runningLocation)) + } + } } From 7283bf7f53aa1d69cd8c60514c520e8937aefd15 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 21 Aug 2015 15:02:11 +0800 Subject: [PATCH 2/8] When the receiver registers at the first time, use the stored scheduled executors to check it and don't call "rescheduleReceiver" --- .../streaming/scheduler/ReceiverTracker.scala | 52 +++++++++++++------ 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index aae3acf7aba3..fd385660be1b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -244,24 +244,43 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false } if (isTrackerStopping || isTrackerStopped) { - false - } else if (!scheduleReceiver(streamId).contains(hostPort)) { - // Refuse it since it's scheduled to a wrong executor - false + return false + } + + val scheduledExecutors = receiverTrackingInfos(streamId).scheduledExecutors + if (scheduledExecutors.nonEmpty) { + // This receiver is registering at the first time, it's scheduled by + // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledExecutors" to check it. + if (!scheduledExecutors.get.contains(hostPort)) { + // Refuse it since it's scheduled to a wrong executor + val oldReceiverInfo = receiverTrackingInfos(streamId) + // Clear "scheduledExecutors" to mark this receiver has already tried to register. + val newReceiverInfo = oldReceiverInfo.copy( + state = ReceiverState.INACTIVE, scheduledExecutors = None) + receiverTrackingInfos(streamId) = newReceiverInfo + return false + } } else { - val name = s"${typ}-${streamId}" - val receiverTrackingInfo = ReceiverTrackingInfo( - streamId, - ReceiverState.ACTIVE, - scheduledExecutors = None, - runningExecutor = Some(hostPort), - name = Some(name), - endpoint = Some(receiverEndpoint)) - receiverTrackingInfos.put(streamId, receiverTrackingInfo) - listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo)) - logInfo("Registered receiver for stream " + streamId + " from " + senderAddress) - true + // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling + // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it. + if (!scheduleReceiver(streamId).contains(hostPort)) { + // Refuse it since it's scheduled to a wrong executor + return false + } } + + val name = s"${typ}-${streamId}" + val receiverTrackingInfo = ReceiverTrackingInfo( + streamId, + ReceiverState.ACTIVE, + scheduledExecutors = None, + runningExecutor = Some(hostPort), + name = Some(name), + endpoint = Some(receiverEndpoint)) + receiverTrackingInfos.put(streamId, receiverTrackingInfo) + listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo)) + logInfo("Registered receiver for stream " + streamId + " from " + senderAddress) + true } /** Deregister a receiver */ @@ -431,7 +450,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false receiver.preferredLocation, receiverTrackingInfos, getExecutors) - updateReceiverScheduledExecutors(receiver.streamId, scheduledExecutors) startReceiver(receiver, scheduledExecutors) case c: CleanupOldBlocks => receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c)) From f0d1f6e9992afa2be6f9efc74abf155b9dccbcf2 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 21 Aug 2015 23:10:59 +0800 Subject: [PATCH 3/8] Don't need to filter the receiver itself --- .../scheduler/ReceiverSchedulingPolicy.scala | 21 ++++++++--------- .../streaming/scheduler/ReceiverTracker.scala | 2 ++ .../ReceiverSchedulingPolicySuite.scala | 23 +------------------ 3 files changed, 13 insertions(+), 33 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala index ff8c2123f804..9a3598b5a670 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala @@ -141,17 +141,16 @@ private[streaming] class ReceiverSchedulingPolicy { val scheduledExecutors = mutable.Set[String]() scheduledExecutors ++= preferredLocation - val executorWeights = receiverTrackingInfoMap.filter(_._1 != receiverId).values.flatMap { - receiverTrackingInfo => - receiverTrackingInfo.state match { - case ReceiverState.INACTIVE => Nil - case ReceiverState.SCHEDULED => - val scheduledExecutors = receiverTrackingInfo.scheduledExecutors.get - // The probability that a scheduled receiver will run in an executor is - // 1.0 / scheduledLocations.size - scheduledExecutors.map(location => location -> (1.0 / scheduledExecutors.size)) - case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0) - } + val executorWeights = receiverTrackingInfoMap.values.flatMap { receiverTrackingInfo => + receiverTrackingInfo.state match { + case ReceiverState.INACTIVE => Nil + case ReceiverState.SCHEDULED => + val scheduledExecutors = receiverTrackingInfo.scheduledExecutors.get + // The probability that a scheduled receiver will run in an executor is + // 1.0 / scheduledLocations.size + scheduledExecutors.map(location => location -> (1.0 / scheduledExecutors.size)) + case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0) + } }.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor val idleExecutors = executors.toSet -- executorWeights.keys diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index fd385660be1b..f6d71105ffec 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -450,6 +450,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false receiver.preferredLocation, receiverTrackingInfos, getExecutors) + // Assume there is one receiver restarting at one time, so we don't need to update + // receiverTrackingInfos startReceiver(receiver, scheduledExecutors) case c: CleanupOldBlocks => receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c)) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala index ea63a357062a..b2a51d72bac2 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala @@ -49,7 +49,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite { assert(scheduledExecutors.toSet === Set("host2", "host3", "host4", "host5")) } - test("rescheduleReceiver: return executors that have minimum weight if no idle executors") { + test("rescheduleReceiver: return all executors that have minimum weight if no idle executors") { val executors = Seq("host1", "host2", "host3", "host4", "host5") // Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0, host4 = 0.5, host5 = 0.5 val receiverTrackingInfoMap = Map( @@ -128,25 +128,4 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite { } } - test("when scheduleReceivers return a balanced result, we should not restart receivers") { - val receivers = (0 until 6).map(new RateTestReceiver(_)) - val executors = (10000 until 10005).map(port => s"localhost:${port}") - val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors) - val receiverTrackingInfoMap = scheduledExecutors.map { case (receiverId, executors) => - assert(executors.size === 1) // Each receiver has been assigned one executor - receiverId -> ReceiverTrackingInfo(receiverId, ReceiverState.SCHEDULED, Some(executors), None) - } - // "scheduledExecutors" has already been balanced, assume we launch receivers as - // "scheduledExecutors" suggested - for (receiverId <- receivers.map(_.streamId)) { - val scheduledExecutorsWhenRegistering = receiverSchedulingPolicy.rescheduleReceiver( - receiverId, None, receiverTrackingInfoMap, executors) - // Assume the receiver has been launched in the exact scheduled location - val runningLocation = scheduledExecutors(receiverId)(0) - // Since all receivers are launched as "scheduledExecutors" suggested, we should allow it to - // run in its current runningLocation, so "scheduledExecutorsWhenRegistering" should contains - // "runningLocation" - assert(scheduledExecutorsWhenRegistering.contains(runningLocation)) - } - } } From 55d99574cae446e9b33602b9f076cbfdd50c11d5 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 24 Aug 2015 21:41:34 +0800 Subject: [PATCH 4/8] Update the scheduling policy and also add the policy to the docs --- .../scheduler/ReceiverSchedulingPolicy.scala | 30 ++++++++++++ .../streaming/scheduler/ReceiverTracker.scala | 46 ++++++++++++++----- 2 files changed, 65 insertions(+), 11 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala index 9a3598b5a670..10b5a7f57a80 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala @@ -22,6 +22,36 @@ import scala.collection.mutable import org.apache.spark.streaming.receiver.Receiver +/** + * A class that tries to schedule receivers with evenly distributed. There are two phases for + * scheduling receivers. + * + * - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule + * all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase. + * It will try to schedule receivers with evenly distributed. ReceiverTracker should update its + * receiverTrackingInfoMap according to the results of `scheduleReceivers`. + * `ReceiverTrackingInfo.scheduledExecutors` for each receiver will set to an executor list that + * contains the scheduled locations. Then when a receiver is starting, it will send a register + * request and `ReceiverTracker.registerReceiver` will be called. In + * `ReceiverTracker.registerReceiver`, if a receiver's scheduled executors is set, it should check + * if the location of this receiver is one of the scheduled executors, if not, the register will + * be rejected. + * - The second phase is local scheduling when a receiver is restarting. There are two cases of + * receiver restarting: + * - If a receiver is restarting because it's rejected due to the real location and the scheduled + * executors mismatching, in other words, it fails to start in one of the locations that + * `scheduleReceivers` suggested, `ReceiverTracker` should firstly choose the executors that are + * still alive in the list of scheduled executors, then use them to launch the receiver job. + * - If a receiver is restarting without a scheduled executors list, or the executors in the list + * are dead, `ReceiverTracker` should call `rescheduleReceiver`. If so, `ReceiverTracker` should + * not set `ReceiverTrackingInfo.scheduledExecutors` for this executor, instead, it should clear + * it. Then when this receiver is registering, we can know this is a local scheduling, and + * `ReceiverTrackingInfo` should call `rescheduleReceiver` again to check if the launching + * location is matching. + * + * In conclusion, we should make a global schedule, try to achieve that exactly as long as possible, + * otherwise do local scheduling. + */ private[streaming] class ReceiverSchedulingPolicy { /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index f6d71105ffec..9a6b3cb318c3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -249,15 +249,10 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false val scheduledExecutors = receiverTrackingInfos(streamId).scheduledExecutors if (scheduledExecutors.nonEmpty) { - // This receiver is registering at the first time, it's scheduled by + // This receiver is registering and it's scheduled by // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledExecutors" to check it. if (!scheduledExecutors.get.contains(hostPort)) { // Refuse it since it's scheduled to a wrong executor - val oldReceiverInfo = receiverTrackingInfos(streamId) - // Clear "scheduledExecutors" to mark this receiver has already tried to register. - val newReceiverInfo = oldReceiverInfo.copy( - state = ReceiverState.INACTIVE, scheduledExecutors = None) - receiverTrackingInfos(streamId) = newReceiverInfo return false } } else { @@ -445,11 +440,22 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false startReceiver(receiver, executors) } case RestartReceiver(receiver) => - val scheduledExecutors = schedulingPolicy.rescheduleReceiver( - receiver.streamId, - receiver.preferredLocation, - receiverTrackingInfos, - getExecutors) + val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId) + val scheduledExecutors = if (oldScheduledExecutors.nonEmpty) { + // Try global scheduling again + oldScheduledExecutors + } else { + val oldReceiverInfo = receiverTrackingInfos(receiver.streamId) + // Clear "scheduledExecutors" to indicate we are going to do local scheduling + val newReceiverInfo = oldReceiverInfo.copy( + state = ReceiverState.INACTIVE, scheduledExecutors = None) + receiverTrackingInfos(receiver.streamId) = newReceiverInfo + schedulingPolicy.rescheduleReceiver( + receiver.streamId, + receiver.preferredLocation, + receiverTrackingInfos, + getExecutors) + } // Assume there is one receiver restarting at one time, so we don't need to update // receiverTrackingInfos startReceiver(receiver, scheduledExecutors) @@ -484,6 +490,24 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false context.reply(true) } + /** + * Return the stored scheduled executors that are still alive. + */ + private def getStoredScheduledExecutors(receiverId: Int): Seq[String] = { + if (receiverTrackingInfos.contains(receiverId)) { + val scheduledExecutors = receiverTrackingInfos(receiverId).scheduledExecutors + if (scheduledExecutors.nonEmpty) { + val executors = getExecutors.toSet + // Only return the alive executors + scheduledExecutors.get.filter(executors) + } else { + Nil + } + } else { + Nil + } + } + /** * Start a receiver along with its scheduled executors */ From c70690fbe1f3e94534b4bf4d0cb5ff9dd626f2b3 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 24 Aug 2015 23:43:30 +0800 Subject: [PATCH 5/8] Fix ClosureCleaner warning --- .../apache/spark/streaming/scheduler/ReceiverTracker.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 9a6b3cb318c3..906e4eb29028 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -528,7 +528,10 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration) // Function to start the receiver on the worker node - val startReceiverFunc = new StartReceiverFunc(checkpointDirOption, serializableHadoopConf) + val startReceiverFunc: Iterator[Receiver[_]] => Unit = + (i: Iterator[Receiver[_]]) => { + new StartReceiverFunc(checkpointDirOption, serializableHadoopConf)(i) + } // Create the RDD using the scheduledExecutors to run the receiver in a Spark job val receiverRDD: RDD[Receiver[_]] = From f2954ebdfeb83aa8adbed804a5534bdf69cb8072 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 25 Aug 2015 09:17:44 +0800 Subject: [PATCH 6/8] Refactor the codes in registerReceiver --- .../streaming/scheduler/ReceiverTracker.scala | 49 +++++++++---------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 7b34f33cfb0f..606239af3952 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -248,34 +248,33 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false } val scheduledExecutors = receiverTrackingInfos(streamId).scheduledExecutors - if (scheduledExecutors.nonEmpty) { - // This receiver is registering and it's scheduled by - // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledExecutors" to check it. - if (!scheduledExecutors.get.contains(hostPort)) { - // Refuse it since it's scheduled to a wrong executor - return false + val accetableExecutors = if (scheduledExecutors.nonEmpty) { + // This receiver is registering and it's scheduled by + // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledExecutors" to check it. + scheduledExecutors.get + } else { + // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling + // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it. + scheduleReceiver(streamId) } + + if (!accetableExecutors.contains(hostPort)) { + // Refuse it since it's scheduled to a wrong executor + false } else { - // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling - // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it. - if (!scheduleReceiver(streamId).contains(hostPort)) { - // Refuse it since it's scheduled to a wrong executor - return false - } + val name = s"${typ}-${streamId}" + val receiverTrackingInfo = ReceiverTrackingInfo( + streamId, + ReceiverState.ACTIVE, + scheduledExecutors = None, + runningExecutor = Some(hostPort), + name = Some(name), + endpoint = Some(receiverEndpoint)) + receiverTrackingInfos.put(streamId, receiverTrackingInfo) + listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo)) + logInfo("Registered receiver for stream " + streamId + " from " + senderAddress) + true } - - val name = s"${typ}-${streamId}" - val receiverTrackingInfo = ReceiverTrackingInfo( - streamId, - ReceiverState.ACTIVE, - scheduledExecutors = None, - runningExecutor = Some(hostPort), - name = Some(name), - endpoint = Some(receiverEndpoint)) - receiverTrackingInfos.put(streamId, receiverTrackingInfo) - listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo)) - logInfo("Registered receiver for stream " + streamId + " from " + senderAddress) - true } /** Deregister a receiver */ From b8faa3a010066d54e3194178396a8e9950c6717f Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 25 Aug 2015 09:18:56 +0800 Subject: [PATCH 7/8] Add comments --- .../org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 606239af3952..978d3ab02a36 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -439,6 +439,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false startReceiver(receiver, executors) } case RestartReceiver(receiver) => + // Old scheduled executors minus the ones that are not active any more val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId) val scheduledExecutors = if (oldScheduledExecutors.nonEmpty) { // Try global scheduling again From 55fe99e4d0ef19e26f70374ac356a9058d03c5e0 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 25 Aug 2015 09:28:44 +0800 Subject: [PATCH 8/8] Change StartReceiverFunc to a closure --- .../streaming/scheduler/ReceiverTracker.scala | 45 +++++++------------ 1 file changed, 15 insertions(+), 30 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 978d3ab02a36..3d532a675db0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -529,8 +529,21 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // Function to start the receiver on the worker node val startReceiverFunc: Iterator[Receiver[_]] => Unit = - (i: Iterator[Receiver[_]]) => { - new StartReceiverFunc(checkpointDirOption, serializableHadoopConf)(i) + (iterator: Iterator[Receiver[_]]) => { + if (!iterator.hasNext) { + throw new SparkException( + "Could not start receiver as object not found.") + } + if (TaskContext.get().attemptNumber() == 0) { + val receiver = iterator.next() + assert(iterator.hasNext == false) + val supervisor = new ReceiverSupervisorImpl( + receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption) + supervisor.start() + supervisor.awaitTermination() + } else { + // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it. + } } // Create the RDD using the scheduledExecutors to run the receiver in a Spark job @@ -588,31 +601,3 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false } } - -/** - * Function to start the receiver on the worker node. Use a class instead of closure to avoid - * the serialization issue. - */ -private[streaming] class StartReceiverFunc( - checkpointDirOption: Option[String], - serializableHadoopConf: SerializableConfiguration) - extends (Iterator[Receiver[_]] => Unit) with Serializable { - - override def apply(iterator: Iterator[Receiver[_]]): Unit = { - if (!iterator.hasNext) { - throw new SparkException( - "Could not start receiver as object not found.") - } - if (TaskContext.get().attemptNumber() == 0) { - val receiver = iterator.next() - assert(iterator.hasNext == false) - val supervisor = new ReceiverSupervisorImpl( - receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption) - supervisor.start() - supervisor.awaitTermination() - } else { - // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it. - } - } - -}