@viirya
Member

@viirya viirya commented Feb 9, 2015

Related to #4364.

A receiver is sometimes registered with the tracker after ssc.stop() has been called, especially when stop() is called immediately after start(). In that case the receiver never gets the StopReceiver message from the tracker, and a graceful stop() hangs indefinitely.

This PR adds a status to ReceiverTracker and makes ReceiverTracker stop accepting messages once it is stopping.

This also adds a timeout check to ReceiverLauncher.stop.

@viirya
Member Author

viirya commented Feb 9, 2015

cc @tdas.

@SparkQA

SparkQA commented Feb 9, 2015

Test build #27091 has finished for PR 4467 at commit 77983f3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Feb 10, 2015

@tdas Please take a look at this when you have time.

Contributor

Why is this visible to anything outside the class?

@tdas
Contributor

tdas commented Feb 11, 2015

I realized that this is tricky to fix while maintaining the stopGracefully semantics. A graceful stop must ensure that receivers that have already started are stopped and that all received data is processed before stopping completely. But what happens to receivers that are still starting and have not registered yet? We have to wait for all of them to start, because if we don't, they may have started and pulled data already, which may lead to losing data. This is not good.

So to solve this correctly, we probably need a Starting state as well. stopGracefully must start the stopping process only after the system has reached the Started state. It has to wait for all the receivers to have started; otherwise it is hard to guarantee that all the receivers are correctly stopped.

Also, this behavior must be properly unit tested with different state transitions, etc. Even before that, I would like to see what the ideal state behavior is:

  • if state = X, then allow register,
  • if state = Y, do not start stopping,
    etc.

@viirya
Member Author

viirya commented Feb 11, 2015

The state behavior should be:

  • If state = Initialized, then disallow all actions. (actor is not initialized yet)
  • If state = Started, then allow all actions {register, addblock, report, deregister}.
  • If state = Stopping, then disallow {register, addblock}, allow {report, deregister}.
  • If state = Stopped, then disallow all actions. (actor is destroyed)

For the receivers that are still starting and have not registered yet, we have two options.

  1. As you said, we have to wait for them to all be started.
  2. We ignore their register messages and let them time out. They simply fail to start and don't process any data.

I think both options guarantee that no data is lost. I was thinking of using option 2 in this PR, because it is simpler and, semantically, we should not allow receivers to register and then process data after stop is called.

I just realized that the current implementation of ReceiverSupervisor calls receiver.onStart before sending the RegisterReceiver message. Thus it is possible that the receiver has already started and is processing data. I think this is incorrect behavior. The receiver should get registered first and only then start. Otherwise, it may process and store data before it has registered successfully, which could cause problems that are hard to detect. We should change the order so that it registers with the tracker first and then begins the starting procedure.

The main reason I don't think we should choose option 1 (waiting for all receivers to start) is that the tracker has no idea which receivers have started. It just waits asynchronously for them to register and deregister. Receivers are visible to the tracker only once they are registered with it. When the tracker is about to stop, it doesn't know whether there are receivers that have started but not yet registered, so it doesn't know how long it should wait for them. Thus it is safer to make sure that receivers register before they start.
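
To make the proposed ordering concrete, here is a minimal sketch, assuming a stand-in receiver and a plain function in place of the RegisterReceiver round trip. The names RegisterFirstSketch, FakeReceiver and startReceiver are hypothetical and are not the actual ReceiverSupervisor code; the only point is that onStart is reached after a successful registration and never otherwise.

```scala
// Illustrative only: hypothetical names, not Spark's ReceiverSupervisor.
object RegisterFirstSketch {
  /** Stand-in for a receiver; records whether it was ever started. */
  final class FakeReceiver(val id: Int) {
    @volatile var started = false
    // A real receiver would begin pulling data here.
    def onStart(): Unit = { started = true }
  }

  /**
   * Starts `receiver` only after `register` (a stand-in for the
   * RegisterReceiver round trip to the tracker) has accepted it.
   * Returns true if the receiver was started.
   */
  def startReceiver(receiver: FakeReceiver, register: Int => Boolean): Boolean = {
    if (register(receiver.id)) {
      receiver.onStart()
      true
    } else {
      // Rejected: the receiver never started, so it holds no data that could be lost.
      false
    }
  }
}
```

With this ordering, a receiver that the tracker refuses has never pulled anything, which is the property option 2 relies on.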

@viirya
Member Author

viirya commented Feb 11, 2015

It should accept addblock even while it is stopping, because there might be receivers still processing data.

Modified state behavior should be:

  • If state = Initialized, then disallow all actions. (actor is not initialized yet)
  • If state = Started, then allow all actions {register, addblock, report, deregister}.
  • If state = Stopping, then disallow {register}, allow {addblock, report, deregister}.
  • If state = Stopped, then disallow all actions. (actor is destroyed)
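
The modified table can be written down as a small pure function. This is only a sketch of the rules listed above; StatePolicySketch, isAllowed and the case-object names are assumptions made for illustration and do not mirror the identifiers used in ReceiverTracker.

```scala
// Illustrative only: hypothetical names encoding the state table above.
object StatePolicySketch {
  sealed trait TrackerState
  case object Initialized extends TrackerState
  case object Started extends TrackerState
  case object Stopping extends TrackerState
  case object Stopped extends TrackerState

  sealed trait Action
  case object Register extends Action
  case object AddBlock extends Action
  case object Report extends Action
  case object Deregister extends Action

  /** Returns true if the tracker should handle `action` while in `state`. */
  def isAllowed(state: TrackerState, action: Action): Boolean = (state, action) match {
    case (Started, _)         => true  // everything is accepted while running
    case (Stopping, Register) => false // no new receivers once stop has begun
    case (Stopping, _)        => true  // addblock, report and deregister still go through
    case (Initialized, _)     => false // actor is not initialized yet
    case (Stopped, _)         => false // actor is destroyed
  }
}
```

Keeping the rules in one match expression makes each row of the table easy to unit test on its own.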

@SparkQA

SparkQA commented Feb 11, 2015

Test build #27273 has finished for PR 4467 at commit 3d568e8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class TaskCommitDenied(
    • class CommitDeniedException(
    • class OutputCommitCoordinatorActor(outputCommitCoordinator: OutputCommitCoordinator)

@SparkQA

SparkQA commented Feb 11, 2015

Test build #27277 has finished for PR 4467 at commit 355f9ce.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class TaskCommitDenied(
    • class CommitDeniedException(
    • class OutputCommitCoordinatorActor(outputCommitCoordinator: OutputCommitCoordinator)
    • case class StreamingListenerReceiverRegistered(receiverInfo: ReceiverInfo)

@tdas
Contributor

tdas commented Feb 11, 2015

I don't think that's correct. If the state becomes Stopping before a receiver has registered, it may happen that the receiver starts up, receives some data, and puts the data in its buffer/block manager. Since it was not registered, it won't get a stop receiver message. So it will continue receiving messages and will never be stopped. Isn't that so?

That's why I think option 1 is cleanest. Wait for everything to start up, so that we cleanly and correctly stop all of them.


@viirya
Member Author

viirya commented Feb 12, 2015

Let's analyze it clearly. The following is a simplified view of the state transitions in the problem:

time | tracker | receivers
t = 1 | started | registered:{A, B}; starting, not registered: {C}
t = 2 | stopping | got stop msg:{A, B}; starting, not registered: {C}
t = 3 | stopping | stopped:{A, B}; registered: {C}

The above causes potential data loss. We want to avoid that. I agree.

If we implement option 1, the state transitions become:

time | tracker | receivers
t = 1 | started | registered:{A, B}; starting, not registered: {C}
t = 2 | stopping | got stop msg:{A, B}; starting, not registered: {C}

*we are going to wait for receivers that have started but not registered yet.
*suppose we wait for a fixed time period n.
*however, we can't guarantee when receiver C will be registered.
*so, after waiting for time n, the system state can be:

t = n+2 | stopping | stopped:{A, B}; registered: {C}
*or still
t = n+2 | stopping | stopped:{A, B}; starting, not registered: {C}

As you can see, it is still possible to end up with an unregistered receiver C that processes data.

This PR implements another approach. The receivers register first and then start:

time | tracker | receivers
t = 1 | started | registered, started:{A, B}; registered, starting: {C}
t = 2 | stopping | got stop msg:{A, B, C}; **D wants to register -> timeout
t = 3 | stopping | stopped:{A, B, C}

@viirya
Member Author

viirya commented Feb 13, 2015

@tdas, do you have time to take a look at the analysis and the current implementation?

@tdas
Contributor

tdas commented Feb 13, 2015

Sorry, I am a bit tied up with stuff. I will definitely take a look as soon as I get a chance. :)


@tdas
Contributor

tdas commented Mar 7, 2015

Hey ... let's continue the discussion. I took a quick look at the logic, and it sounds good. Let me think a bit more and look at the code.

@viirya
Member Author

viirya commented Mar 17, 2015

@tdas Any updated ideas or comments?

@viirya
Member Author

viirya commented Mar 23, 2015

/cc @tdas Still busy?

@tdas
Contributor

tdas commented Apr 3, 2015

@viirya Sorry for slacking on this, I've been busy. I think I understand your explanation. But I also spent some more time thinking about this from the ground up.

Correct me if I am wrong, but the thing was getting stuck because of this line.
What happens in this line? The receiverInfo should be empty, because whatever had registered would deregister and make the map empty. So that cannot be the reason for the hang. The only reason it would stay stuck on that line is that running = true. running is set to false only when the job that runs the receiver completes (this line). The fact that it is stuck indefinitely means that this job never completes.

That can happen only if the receiver (C in your examples) that had not registered by the time "stop gracefully" was called is somehow running indefinitely. That is because it registered and started running even though the system was stopping gracefully. Ideally, it should never have been allowed to register at all! That is, the ReceiverTracker should prevent any further registration as soon as it gets a stop signal.

In this solution, the sequence of events will be:

time | tracker | receivers
t = 1 | started | registered:{A, B}; starting, not registered: {C}
t = 2 | stopping | got stop msg:{A, B}; starting, not registered: {C}
t = 3 | stopping | stopped:{A, B}; attempts to register but denied by tracker, never starts itself: {C}

Since C is not allowed to start, it stops itself and the task completes. The job completes, running = false, and the tracker stops.

t = 4 | stopped | stopped:{A, B}; stopped: {C}

No data is lost in this case, because C is never allowed to be started.

Isn't this a viable solution? If so, I think this is simpler than introducing another state.
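
The tracker-side rule described here (refuse any registration once a stop signal has arrived) could look roughly like the sketch below. It assumes a single lock guarding both the stopping flag and the set of registered receivers, so that a registration and the start of a stop cannot interleave. StoppingTrackerSketch, Tracker, register and beginStop are hypothetical names, and the sketch covers only the tracker side; it says nothing about what a refused receiver has already done before asking.

```scala
import scala.collection.mutable

// Illustrative only: hypothetical names, not Spark's ReceiverTracker.
object StoppingTrackerSketch {
  final class Tracker {
    private val lock = new Object
    private var stopping = false
    private val registered = mutable.Set.empty[Int]

    /** Returns true if the receiver was registered, false once stopping has begun. */
    def register(receiverId: Int): Boolean = lock.synchronized {
      if (stopping) {
        false
      } else {
        registered += receiverId
        true
      }
    }

    /** Switches to stopping and returns the receivers that must be sent a stop message. */
    def beginStop(): Set[Int] = lock.synchronized {
      stopping = true
      registered.toSet
    }
  }
}
```

Because both methods take the same lock, every receiver that registered successfully is in the set returned by beginStop, and every later attempt gets false.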

@viirya
Member Author

viirya commented Apr 7, 2015

Receiver.onStart() is called before the RegisterReceiver message is sent to the tracker for registration.

In Receiver.onStart(), it already begins to pull data. So in your case, receiver {C} may lose its data.

We know that the thing was getting stuck because a receiver sometimes registers after the tracker has begun stopping, so the receiver does not get the stop message.

As you said, we can disallow its registration. However, because it is already pulling data, it may lose received data.

The solution I proposed also disallows the receiver from registering. In addition, it moves registration ahead of starting, so a receiver only begins pulling data after it is registered with the tracker, and all registered receivers properly receive stop messages. By doing that, we guarantee no data loss for receivers.

…meout

Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala
	streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@SparkQA

SparkQA commented Apr 7, 2015

Test build #29785 has finished for PR 4467 at commit 9e1a760.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StreamingListenerReceiverRegistered(receiverInfo: ReceiverInfo)
  • This patch does not change any dependencies.

@SparkQA

SparkQA commented Apr 7, 2015

Test build #29786 has finished for PR 4467 at commit c419677.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StreamingListenerReceiverRegistered(receiverInfo: ReceiverInfo)
  • This patch does not change any dependencies.

Member

You need to send something back, or trackerActor.ask(msg)(askTimeout) will wait until it times out.

Member

I propose returning false if isTrackerStopping == true. If onReceiverRegister receives false, it just throws an exception.

Member Author

I originally intended to let it time out and throw an exception. Returning false and throwing an exception is good too. I will update it.
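
The difference between the two behaviours discussed in this review thread can be sketched with plain Scala futures standing in for the actor ask. Everything here is an assumption for illustration: RegistrationReplySketch, silentTracker, replyingTracker and this onReceiverRegister are hypothetical and are not the methods in the patch. A tracker that drops the request leaves the caller waiting for the full timeout, while a tracker that answers false lets the caller fail immediately.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Illustrative only: futures stand in for trackerActor.ask(msg)(askTimeout).
object RegistrationReplySketch {
  /** Tracker that ignores the request while stopping: the reply never arrives. */
  def silentTracker(stopping: Boolean): Future[Boolean] =
    if (stopping) Promise[Boolean]().future else Future.successful(true)

  /** Tracker that always answers: false while stopping, true otherwise. */
  def replyingTracker(stopping: Boolean): Future[Boolean] =
    Future.successful(!stopping)

  /** Caller side: wait for the reply and fail fast if registration was refused. */
  def onReceiverRegister(reply: Future[Boolean], askTimeout: FiniteDuration): Unit = {
    // Await.result throws a TimeoutException if no reply arrives within askTimeout.
    val accepted = Await.result(reply, askTimeout)
    if (!accepted) {
      throw new IllegalStateException("Registration refused: tracker is stopping")
    }
  }
}
```

Either way the receiver ends up with an exception, but the explicit false reply gets there without blocking for the whole ask timeout.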

@zsxwing
Member

zsxwing commented May 4, 2015

Could you resolve the conflicts?

…meout

Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@SparkQA

SparkQA commented May 5, 2015

Test build #31869 has finished for PR 4467 at commit 34c18dc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StreamingListenerReceiverRegistered(receiverInfo: ReceiverInfo)

Member

The timeout logic will change the stopGracefully semantics. The semantics should be: if ssc.stop(..., stopGracefully = true) returns normally, no data loss has happened. But after your change, if ssc.stop(..., stopGracefully = true) returns normally, the user won't know whether everything went smoothly. There is no signal here to help the user understand what happened internally.

I vote for keeping the original code unchanged.

@SparkQA

SparkQA commented May 10, 2015

Test build #32342 has finished for PR 4467 at commit e0ef72a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StreamingListenerReceiverRegistered(receiverInfo: ReceiverInfo)

@viirya viirya closed this May 21, 2015
asfgit pushed a commit that referenced this pull request Jul 17, 2015
…solve the race condition

This is an alternative way to fix `SPARK-5681`. It minimizes the changes.

Closes #4467

Author: zsxwing <[email protected]>
Author: Liang-Chi Hsieh <[email protected]>

Closes #6294 from zsxwing/pr4467 and squashes the following commits:

709ac1f [zsxwing] Fix the comment
e103e8a [zsxwing] Move ReceiverTracker.stop into ReceiverTracker.stop
f637142 [zsxwing] Address minor code style comments
a178d37 [zsxwing] Move 'stopReceivers' to the event looop to resolve the race condition
51fb07e [zsxwing] Fix the code style
3cb19a3 [zsxwing] Merge branch 'master' into pr4467
b4c29e7 [zsxwing] Stop receiver only if we start it
c41ee94 [zsxwing] Make stopReceivers private
7c73c1f [zsxwing] Use trackerStateLock to protect trackerState
a8120c0 [zsxwing] Merge branch 'master' into pr4467
7b1d9af [zsxwing] "case Throwable" => "case NonFatal"
15ed4a1 [zsxwing] Register before starting the receiver
fff63f9 [zsxwing] Use a lock to eliminate the race condition when stopping receivers and registering receivers happen at the same time.
e0ef72a [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into tracker_status_timeout
19b76d9 [Liang-Chi Hsieh] Remove timeout.
34c18dc [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into tracker_status_timeout
c419677 [Liang-Chi Hsieh] Fix style.
9e1a760 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into tracker_status_timeout
355f9ce [Liang-Chi Hsieh] Separate register and start events for receivers.
3d568e8 [Liang-Chi Hsieh] Let receivers get registered first before going started.
ae0d9fd [Liang-Chi Hsieh] Merge branch 'master' into tracker_status_timeout
77983f3 [Liang-Chi Hsieh] Add tracker status and stop to receive messages when stopping tracker.
jerryshao pushed a commit to jerryshao/apache-spark that referenced this pull request Sep 8, 2015
…ks.jira.com/browse/BUG-40311)

[SPARK-5681] [STREAMING] Move 'stopReceivers' to the event loop to resolve the race condition

@viirya viirya deleted the tracker_status_timeout branch December 27, 2023 18:17