Skip to content

Commit 9e242c8

Browse files
committed
Remove the ScheduleReceiver message because we can refuse it when receiving RegisterReceiver
1 parent a9acfbf commit 9e242c8

File tree

4 files changed

+9
-24
lines changed

4 files changed

+9
-24
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -175,20 +175,12 @@ private[streaming] abstract class ReceiverSupervisor(
175175
stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
176176
logDebug("Sleeping for " + delay)
177177
Thread.sleep(delay)
178-
val scheduledLocations = getAllowedLocations()
179-
if (scheduledLocations.isEmpty || scheduledLocations.contains(host)) {
180-
logInfo("Starting receiver again")
181-
startReceiver()
182-
logInfo("Receiver started again")
183-
} else {
184-
stop("Receiver is scheduled to another executor", None)
185-
}
178+
logInfo("Starting receiver again")
179+
startReceiver()
180+
logInfo("Receiver started again")
186181
}(futureExecutionContext)
187182
}
188183

189-
/** Return a list of candidate executors to run the receiver */
190-
def getAllowedLocations(): Seq[String] = Seq.empty
191-
192184
/** Check if receiver has been marked for starting */
193185
def isReceiverStarted(): Boolean = {
194186
logDebug("state = " + receiverState)

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,4 @@ private[streaming] class ReceiverSupervisorImpl(
183183
receivedBlockHandler.cleanupOldBlocks(cleanupThreshTime.milliseconds)
184184
}
185185

186-
override def getAllowedLocations(): Seq[String] = {
187-
trackerEndpoint.askWithRetry[Seq[String]](ScheduleReceiver(streamId))
188-
}
189186
}

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,6 @@ private[streaming] case class ReportError(streamId: Int, message: String, error:
5858
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String, error: String)
5959
extends ReceiverTrackerMessage
6060

61-
/** It's used to ask ReceiverTracker to return a candidate executor list to run the receiver */
62-
private[streaming] case class ScheduleReceiver(streamId: Int) extends ReceiverTrackerMessage
63-
6461
/**
6562
* This class manages the execution of the receivers of ReceiverInputDStreams. Instance of
6663
* this class must be created after all input streams have been added and StreamingContext.start()
@@ -122,12 +119,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
122119
private def isTrackerStarted(): Boolean = trackerStateLock.synchronized {
123120
trackerState == Started
124121
}
125-
122+
126123
/** Check if tracker has been marked for stopping */
127124
private def isTrackerStopping(): Boolean = trackerStateLock.synchronized {
128125
trackerState == Stopping
129126
}
130-
127+
131128
/** Check if tracker has been marked for stopped */
132129
private def isTrackerStopped(): Boolean = trackerStateLock.synchronized {
133130
trackerState == Stopped
@@ -219,6 +216,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
219216
trackerStateLock.synchronized {
220217
if (isTrackerStopping || isTrackerStopped) {
221218
false
219+
} else if (!ssc.sparkContext.isLocal && // We don't need to schedule it in the local mode
220+
!scheduleReceiver(streamId).contains(host)) {
221+
false
222222
} else {
223223
// When updating "receiverInfo", we should make sure "trackerState" won't be changed at the
224224
// same time. Therefore the following line should be in "trackerStateLock.synchronized".
@@ -305,8 +305,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
305305
case DeregisterReceiver(streamId, message, error) =>
306306
deregisterReceiver(streamId, message, error)
307307
context.reply(true)
308-
case ScheduleReceiver(streamId) =>
309-
context.reply(scheduleReceiver(streamId))
310308
}
311309
}
312310

streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
131131

132132
test("restart receiver should consider the scheduled locations") {
133133
val receiver = new FakeReceiver
134-
val executor = new FakeReceiverSupervisor(receiver) {
135-
override def getAllowedLocations: Seq[String] = Seq("unknown-host")
136-
}
134+
val executor = new FakeReceiverSupervisor(receiver)
137135
executor.start()
138136
receiver.restart("force the receiver restart")
139137
eventually(timeout(30000 millis), interval(10 millis)) {

0 commit comments

Comments
 (0)