File tree Expand file tree Collapse file tree 2 files changed +1
-13
lines changed
main/scala/org/apache/spark/streaming/receiver
test/scala/org/apache/spark/streaming Expand file tree Collapse file tree 2 files changed +1
-13
lines changed Original file line number Diff line number Diff line change @@ -101,7 +101,7 @@ private[streaming] abstract class ReceiverSupervisor(
101101 protected def onStop (message : String , error : Option [Throwable ]) { }
102102
103103 /** Called when receiver is started. Return true if the driver accepts us */
104- protected def onReceiverStart (): Boolean = true
104+ protected def onReceiverStart (): Boolean
105105
106106 /** Called when receiver is stopped */
107107 protected def onReceiverStop (message : String , error : Option [Throwable ]) { }
Original file line number Diff line number Diff line change @@ -129,18 +129,6 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
129129 }
130130 }
131131
132- test(" restart receiver should consider the scheduled locations" ) {
133- val receiver = new FakeReceiver
134- val executor = new FakeReceiverSupervisor (receiver)
135- executor.start()
136- receiver.restart(" force the receiver restart" )
137- eventually(timeout(30000 millis), interval(10 millis)) {
138- // Since the scheduled location is not the current host, the receiver should exit
139- assert(receiver.onStopCalled)
140- assert(receiver.isStopped)
141- }
142- }
143-
144132 test(" block generator" ) {
145133 val blockGeneratorListener = new FakeBlockGeneratorListener
146134 val blockIntervalMs = 200
You can’t perform that action at this time.
0 commit comments