File tree Expand file tree Collapse file tree 3 files changed +5
-5
lines changed
main/scala/org/apache/spark/streaming/receiver
test/scala/org/apache/spark/streaming Expand file tree Collapse file tree 3 files changed +5
-5
lines changed Original file line number Diff line number Diff line change @@ -64,7 +64,7 @@ private[streaming] abstract class ReceiverSupervisor(
6464 /** State of the receiver */
6565 @ volatile private [streaming] var receiverState = Initialized
6666
67- val host = Utils .localHostName()
67+ protected val host = Utils .localHostName()
6868
6969 /** Push a single data item to backend data store. */
7070 def pushSingle (data : Any )
@@ -163,7 +163,7 @@ private[streaming] abstract class ReceiverSupervisor(
163163 stopReceiver(" Restarting receiver with delay " + delay + " ms: " + message, error)
164164 logDebug(" Sleeping for " + delay)
165165 Thread .sleep(delay)
166- val scheduledLocations = rescheduleReceiver ()
166+ val scheduledLocations = getAllowedLocations ()
167167 if (scheduledLocations.isEmpty || scheduledLocations.contains(host)) {
168168 logInfo(" Starting receiver again" )
169169 startReceiver()
@@ -175,7 +175,7 @@ private[streaming] abstract class ReceiverSupervisor(
175175 }
176176
177177 /** Reschedule this receiver and return a candidate executor list */
178- def rescheduleReceiver (): Seq [String ] = Seq .empty
178+ def getAllowedLocations (): Seq [String ] = Seq .empty
179179
180180 /** Check if receiver has been marked for stopping */
181181 def isReceiverStarted (): Boolean = {
Original file line number Diff line number Diff line change @@ -183,7 +183,7 @@ private[streaming] class ReceiverSupervisorImpl(
183183 receivedBlockHandler.cleanupOldBlocks(cleanupThreshTime.milliseconds)
184184 }
185185
186- override def rescheduleReceiver (): Seq [String ] = {
186+ override def getAllowedLocations (): Seq [String ] = {
187187 trackerEndpoint.askWithRetry[Seq [String ]](ScheduleReceiver (streamId))
188188 }
189189}
Original file line number Diff line number Diff line change @@ -132,7 +132,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
132132 test(" restart receiver should consider the scheduled locations" ) {
133133 val receiver = new FakeReceiver
134134 val executor = new FakeReceiverSupervisor (receiver) {
135- override def rescheduleReceiver : Seq [String ] = Seq (" unknown-host" )
135+ override def getAllowedLocations : Seq [String ] = Seq (" unknown-host" )
136136 }
137137 executor.start()
138138 receiver.restart(" force the receiver restart" )
You can’t perform that action at this time.
0 commit comments