Skip to content

Commit 28d1bee

Browse files
committed
Make 'host' protected; rescheduleReceiver -> getAllowedLocations
1 parent 2c86a9e commit 28d1bee

File tree

3 files changed

+5
-5
lines changed

3 files changed

+5
-5
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ private[streaming] abstract class ReceiverSupervisor(
6464
/** State of the receiver */
6565
@volatile private[streaming] var receiverState = Initialized
6666

67-
val host = Utils.localHostName()
67+
protected val host = Utils.localHostName()
6868

6969
/** Push a single data item to backend data store. */
7070
def pushSingle(data: Any)
@@ -163,7 +163,7 @@ private[streaming] abstract class ReceiverSupervisor(
163163
stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
164164
logDebug("Sleeping for " + delay)
165165
Thread.sleep(delay)
166-
val scheduledLocations = rescheduleReceiver()
166+
val scheduledLocations = getAllowedLocations()
167167
if (scheduledLocations.isEmpty || scheduledLocations.contains(host)) {
168168
logInfo("Starting receiver again")
169169
startReceiver()
@@ -175,7 +175,7 @@ private[streaming] abstract class ReceiverSupervisor(
175175
}
176176

177177
/** Reschedule this receiver and return a candidate executor list */
178-
def rescheduleReceiver(): Seq[String] = Seq.empty
178+
def getAllowedLocations(): Seq[String] = Seq.empty
179179

180180
/** Check if receiver has been marked for stopping */
181181
def isReceiverStarted(): Boolean = {

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ private[streaming] class ReceiverSupervisorImpl(
183183
receivedBlockHandler.cleanupOldBlocks(cleanupThreshTime.milliseconds)
184184
}
185185

186-
override def rescheduleReceiver(): Seq[String] = {
186+
override def getAllowedLocations(): Seq[String] = {
187187
trackerEndpoint.askWithRetry[Seq[String]](ScheduleReceiver(streamId))
188188
}
189189
}

streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
132132
test("restart receiver should consider the scheduled locations") {
133133
val receiver = new FakeReceiver
134134
val executor = new FakeReceiverSupervisor(receiver) {
135-
override def rescheduleReceiver: Seq[String] = Seq("unknown-host")
135+
override def getAllowedLocations: Seq[String] = Seq("unknown-host")
136136
}
137137
executor.start()
138138
receiver.restart("force the receiver restart")

0 commit comments

Comments
 (0)