[SPARK-5681][Streaming] Move 'stopReceivers' to the event loop to resolve the race condition #6294
…meout Conflicts: streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
…meout Conflicts: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
…d registering receivers happen at the same time.
Test build #33170 has finished for PR 6294 at commit
Do you need explicit locking? Wouldn't |
Using |
Test build #33172 has finished for PR 6294 at commit
Looks good. |
How is this test deterministic? I don't see how it is guaranteed to test the situation where the context is stopped before the receiver has started.
This test cannot always reproduce the issue. But the probability is high.
Please add a comment at the top of this unit test stating that it is not deterministic. If this unit test is flaky, then there is definitely something wrong. Point to the JIRA.
Also, could you run this unit test locally repeatedly, like 1000 times, to see if it ever fails?
I ran StreamingContextSuite 100 times locally and it didn't fail. StreamingContextSuite takes about 40 seconds per run, so I only ran it 100 times.
Test build #33286 has finished for PR 6294 at commit
Can you also convert this case Throwable to case NonFatal(e)? I had caught Throwable in a number of places but later learned that we should not catch every Throwable, because Scala uses throwables for some control logic. Best to always catch NonFatal(e).
+1. I learnt that the hard way too. See https://www.sumologic.com/2014/05/05/why-you-should-never-catch-throwable-in-scala/ and http://www.scala-lang.org/api/current/scala/util/control/ControlThrowable.html
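To make the point about control-flow throwables concrete, here is a minimal sketch, not code from this PR. The names `NonFatalExample`, `runSafely`, and `firstNegative` are hypothetical. It relies on the fact that `scala.util.control.Breaks.break` is implemented by throwing a `ControlThrowable`, which `NonFatal` deliberately does not match.

```scala
import scala.util.control.Breaks.{break, breakable}
import scala.util.control.NonFatal

object NonFatalExample {
  // Hypothetical helper: runs `body` and reports whether it completed.
  // Only non-fatal errors are swallowed; ControlThrowable, OutOfMemoryError,
  // InterruptedException and similar are left to propagate.
  def runSafely(body: => Unit): Boolean =
    try {
      body
      true
    } catch {
      case NonFatal(_) => false
    }

  // `break()` throws a ControlThrowable that must reach `breakable`.
  // Because runSafely catches NonFatal rather than Throwable, the break
  // passes through it and the loop stops at the first negative element.
  def firstNegative(xs: Seq[Int]): Option[Int] = {
    var found: Option[Int] = None
    breakable {
      for (x <- xs) {
        runSafely {
          if (x < 0) {
            found = Some(x)
            break()
          }
        }
      }
    }
    found
  }
}
```

If `runSafely` caught `Throwable` instead, the break would be swallowed inside the helper, the loop would keep going, and `firstNegative(Seq(1, -2, -3))` would end up reporting the last negative element rather than the first.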
Merge conflicts in this PR. |
Fixed |
Test build #33351 has finished for PR 6294 at commit
@tdas updated as per our discussion offline. |
@tdas moved |
Test build #37496 has finished for PR 6294 at commit
@harishreedharan Yeah, my bad. I had an offline discussion with @zsxwing about this. Our initial attempt to merge this PR into the receiver scheduling PR #7276 made that PR very complex and hard to reason about, with a lot of locks. So we decided to split the PRs apart again and solve each one separately. The plan is to first solve this PR without adding any locks. Then it will be easier to reason about when #7276 is applied, hopefully without introducing locks there either.
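The lock-free idea described above can be sketched roughly as follows. This is an illustrative toy, not Spark's ReceiverTracker: `EventLoopSketch`, `MiniTracker`, `Register`, and `StopAll` are hypothetical names, and a plain queue plus one thread stands in for whatever event loop the real code uses. The point is only that when "register" and "stop" are both handled as messages on a single thread, they cannot interleave, so the tracker state needs no lock.

```scala
import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingQueue}
import scala.collection.mutable

object EventLoopSketch {
  sealed trait TrackerEvent
  // `reply` is completed with true if the registration was accepted.
  final case class Register(streamId: Int, reply: CompletableFuture[Boolean]) extends TrackerEvent
  // `done` is counted down once the stop has been processed.
  final case class StopAll(done: CountDownLatch) extends TrackerEvent

  final class MiniTracker {
    private val queue = new LinkedBlockingQueue[TrackerEvent]()
    // Both fields are touched only by the loop thread, so no lock is needed.
    private val registered = mutable.Set.empty[Int]
    private var stopping = false

    private val loop = new Thread(new Runnable {
      override def run(): Unit = {
        while (true) {
          queue.take() match {
            case Register(id, reply) =>
              // A registration that arrives after StopAll is refused.
              val accepted = !stopping
              if (accepted) registered += id
              reply.complete(accepted)
            case StopAll(done) =>
              stopping = true
              registered.clear()
              done.countDown()
          }
        }
      }
    }, "mini-tracker-event-loop")
    // Daemon thread: the sketch keeps the loop alive so late registrations
    // still get an answer; a real implementation would shut it down.
    loop.setDaemon(true)
    loop.start()

    def register(streamId: Int): Boolean = {
      val reply = new CompletableFuture[Boolean]()
      queue.put(Register(streamId, reply))
      reply.get()
    }

    def stop(): Unit = {
      val done = new CountDownLatch(1)
      queue.put(StopAll(done))
      done.await()
    }
  }
}
```

Because every event is processed in queue order, a registration either lands entirely before the stop (and is cleaned up by it) or entirely after it (and is refused); there is no window in which a receiver registers while the stop is half done.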
Thanks! Makes sense |
No reason to return true as default. Keep this abstract.
retest this please |
Test build #37579 has finished for PR 6294 at commit
Test build #30 has finished for PR 6294 at commit
This needs to be fixed; it could get stuck indefinitely. But that belongs in a different PR.
Test build #37628 has finished for PR 6294 at commit
LGTM. Merging this to master. Thanks! |
The design doc: https://docs.google.com/document/d/1ZsoRvHjpISPrDmSjsGzuSu8UjwgbtmoCTzmhgTurHJw/edit?usp=sharing
Author: zsxwing <[email protected]>
Closes #7276 from zsxwing/receiver-scheduling and squashes the following commits:
137b257 [zsxwing] Add preferredNumExecutors to rescheduleReceiver
61a6c3f [zsxwing] Set state to ReceiverState.INACTIVE in deregisterReceiver
5e1fa48 [zsxwing] Fix the code style
7451498 [zsxwing] Move DummyReceiver back to ReceiverTrackerSuite
715ef9c [zsxwing] Rename: scheduledLocations -> scheduledExecutors; locations -> executors
05daf9c [zsxwing] Use receiverTrackingInfo.toReceiverInfo
1d6d7c8 [zsxwing] Merge branch 'master' into receiver-scheduling
8f93c8d [zsxwing] Use hostPort as the receiver location rather than host; fix comments and unit tests
59f8887 [zsxwing] Schedule all receivers at the same time when launching them
075e0a3 [zsxwing] Add receiver RDD name; use '!isTrackerStarted' instead
276a4ac [zsxwing] Remove "ReceiverLauncher" and move codes to "launchReceivers"
fab9a01 [zsxwing] Move methods back to the outer class
4e639c4 [zsxwing] Fix unintentional changes
f60d021 [zsxwing] Reorganize ReceiverTracker to use an event loop for lock free
105037e [zsxwing] Merge branch 'master' into receiver-scheduling
5fee132 [zsxwing] Update tha scheduling algorithm to avoid to keep restarting Receiver
9e242c8 [zsxwing] Remove the ScheduleReceiver message because we can refuse it when receiving RegisterReceiver
a9acfbf [zsxwing] Merge branch 'squash-pr-6294' into receiver-scheduling
881edb9 [zsxwing] ReceiverScheduler -> ReceiverSchedulingPolicy
e530bcc [zsxwing] [SPARK-5681][Streaming] Use a lock to eliminate the race condition when stopping receivers and registering receivers happen at the same time #6294
3b87e4a [zsxwing] Revert SparkContext.scala
a86850c [zsxwing] Remove submitAsyncJob and revert JobWaiter
f549595 [zsxwing] Add comments for the scheduling approach
9ecc08e [zsxwing] Fix comments and code style
28d1bee [zsxwing] Make 'host' protected; rescheduleReceiver -> getAllowedLocations
2c86a9e [zsxwing] Use tryFailure to support calling jobFailed multiple times
ca6fe35 [zsxwing] Add a test for Receiver.restart
27acd45 [zsxwing] Add unit tests for LoadBalanceReceiverSchedulerImplSuite
cc76142 [zsxwing] Add JobWaiter.toFuture to avoid blocking threads
d9a3e72 [zsxwing] Add a new Receiver scheduling mechanism
Hi @tdas and @zsxwing, is it feasible to backport this patch to branch 1.4? This patch fixes a restart issue here by updating the state before calling
if (onReceiverStart()) {
  logInfo("Starting receiver")
  receiverState = Started
  receiver.onStart()
  logInfo("Called receiver onStart")
Previously this state was updated after calling |
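The ordering in the quoted snippet (assign `receiverState` before invoking `receiver.onStart()`) matters whenever code reached from `onStart` looks at the supervisor's state. The following is a simplified sketch of that effect only, under the assumption that the receiver consults the state during startup; it is not the actual ReceiverSupervisor code, and `StartOrderSketch`, `ProbeReceiver`, `MiniSupervisor`, and `setStateFirst` are hypothetical names.

```scala
object StartOrderSketch {
  sealed trait State
  case object Initialized extends State
  case object Started extends State

  // Hypothetical receiver that inspects the supervisor's state from
  // inside onStart and remembers what it saw.
  final class ProbeReceiver {
    var sawStarted: Boolean = false
    def onStart(supervisor: MiniSupervisor): Unit =
      sawStarted = supervisor.isStarted
  }

  final class MiniSupervisor(receiver: ProbeReceiver, setStateFirst: Boolean) {
    @volatile private var state: State = Initialized

    def isStarted: Boolean = state == Started

    def start(): Unit = synchronized {
      if (setStateFirst) {
        // Order shown in the quoted snippet: state first, then onStart.
        state = Started
        receiver.onStart(this)
      } else {
        // Reverse order: anything that checks the state from within
        // onStart still observes Initialized.
        receiver.onStart(this)
        state = Started
      }
    }
  }
}
```

With `setStateFirst = true` the probe observes `Started`; with `false` it observes the stale `Initialized` state, which is the kind of window in which a stop or restart request issued during startup could be skipped.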
I think the patch should apply cleanly on branch-1.4. But this is a non-trivial patch, as it changes how state transitions are handled in the ReceiverTracker. So it is slightly scary. What do you think @zsxwing?
…ks.jira.com/browse/BUG-40311) [SPARK-5681] [STREAMING] Move 'stopReceivers' to the event loop to resolve the race condition
This is an alternative way to fix `SPARK-5681`. It minimizes the changes.
Closes apache#4467
Author: zsxwing <[email protected]>
Author: Liang-Chi Hsieh <[email protected]>
Closes apache#6294 from zsxwing/pr4467 and squashes the following commits:
709ac1f [zsxwing] Fix the comment
e103e8a [zsxwing] Move ReceiverTracker.stop into ReceiverTracker.stop
f637142 [zsxwing] Address minor code style comments
a178d37 [zsxwing] Move 'stopReceivers' to the event looop to resolve the race condition
51fb07e [zsxwing] Fix the code style
3cb19a3 [zsxwing] Merge branch 'master' into pr4467
b4c29e7 [zsxwing] Stop receiver only if we start it
c41ee94 [zsxwing] Make stopReceivers private
7c73c1f [zsxwing] Use trackerStateLock to protect trackerState
a8120c0 [zsxwing] Merge branch 'master' into pr4467
7b1d9af [zsxwing] "case Throwable" => "case NonFatal"
15ed4a1 [zsxwing] Register before starting the receiver
fff63f9 [zsxwing] Use a lock to eliminate the race condition when stopping receivers and registering receivers happen at the same time.
e0ef72a [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into tracker_status_timeout
19b76d9 [Liang-Chi Hsieh] Remove timeout.
34c18dc [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into tracker_status_timeout
c419677 [Liang-Chi Hsieh] Fix style.
9e1a760 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into tracker_status_timeout
355f9ce [Liang-Chi Hsieh] Separate register and start events for receivers.
3d568e8 [Liang-Chi Hsieh] Let receivers get registered first before going started.
ae0d9fd [Liang-Chi Hsieh] Merge branch 'master' into tracker_status_timeout
77983f3 [Liang-Chi Hsieh] Add tracker status and stop to receive messages when stopping tracker.
This is an alternative way to fix SPARK-5681. It minimizes the changes.
Closes #4467