Skip to content

Commit 4afad9c

Browse files
ilayaperumalgtdas
authored andcommitted
[SPARK-4803] [streaming] Remove duplicate RegisterReceiver message
- The ReceiverTracker receivers `RegisterReceiver` messages two times 1) When the actor at `ReceiverSupervisorImpl`'s preStart is invoked 2) After the receiver is started at the executor `onReceiverStart()` at `ReceiverSupervisorImpl` Though, RegisterReceiver message uses the same streamId and the receiverInfo gets updated everytime the message is processed at the `ReceiverTracker`, it makes sense to call register receiver only after the receiver is started. Author: Ilayaperumal Gopinathan <[email protected]> Closes apache#3648 from ilayaperumalg/RTActor-remove-prestart and squashes the following commits: 868efab [Ilayaperumal Gopinathan] Increase receiverInfo collector timeout to 2 secs 3118e5e [Ilayaperumal Gopinathan] Fix StreamingListenerSuite's startedReceiverStreamIds size 634abde [Ilayaperumal Gopinathan] Remove duplicate RegisterReceiver message
1 parent debc031 commit 4afad9c

File tree

2 files changed

+2
-9
lines changed

2 files changed

+2
-9
lines changed

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,6 @@ private[streaming] class ReceiverSupervisorImpl(
7777
/** Akka actor for receiving messages from the ReceiverTracker in the driver */
7878
private val actor = env.actorSystem.actorOf(
7979
Props(new Actor {
80-
override def preStart() {
81-
logInfo("Registered receiver " + streamId)
82-
val msg = RegisterReceiver(
83-
streamId, receiver.getClass.getSimpleName, Utils.localHostName(), self)
84-
val future = trackerActor.ask(msg)(askTimeout)
85-
Await.result(future, askTimeout)
86-
}
8780

8881
override def receive() = {
8982
case StopReceiver =>

streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
7373

7474
ssc.start()
7575
try {
76-
eventually(timeout(1000 millis), interval(20 millis)) {
77-
collector.startedReceiverStreamIds.size should be >= 1
76+
eventually(timeout(2000 millis), interval(20 millis)) {
77+
collector.startedReceiverStreamIds.size should equal (1)
7878
collector.startedReceiverStreamIds(0) should equal (0)
7979
collector.stoppedReceiverStreamIds should have size 1
8080
collector.stoppedReceiverStreamIds(0) should equal (0)

0 commit comments

Comments
 (0)