[SPARK-3736] Workers reconnect when disassociated from the master. #2828
Conversation
Before, if the master node was killed and restarted, the worker nodes would not attempt to reconnect to the Master. Therefore, when the Master node was restarted, the worker nodes needed to be restarted as well. Now, when the Master node is disconnected, the worker nodes will continuously ping the master node in an attempt to reconnect to it. Once the master node restarts, it will detect one of the registration requests from its former workers, and the cluster re-enters a healthy state.

In addition, when the master did not receive a heartbeat from a worker, the worker was removed; however, when the worker then sent a heartbeat to the master, the master used to ignore it. Now, a master that receives a heartbeat from a worker that had been disconnected will request the worker to re-attempt the registration process, at which point the worker will send a RegisterWorker request and be re-connected accordingly.

Re-connection attempts per worker are submitted every N seconds, where N is configured by the property spark.worker.reconnect.interval - this has a default of 60 seconds right now.
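As a minimal, self-contained sketch of the worker-side behavior described above (using a plain ScheduledExecutorService rather than the actual Spark/Akka machinery; the names ReconnectingWorker, onMasterDisconnected, and onRegistered are illustrative, not the real Worker API):

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

// Illustrative sketch only: once the worker notices the master is gone, it
// keeps re-sending its registration every N seconds (the behavior configured
// by spark.worker.reconnect.interval above) until an attempt succeeds.
class ReconnectingWorker(registerWithMaster: () => Unit,
                         reconnectIntervalSeconds: Long = 60L) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  @volatile private var reconnectTask: Option[ScheduledFuture[_]] = None

  /** Called when the worker is disassociated from the master. */
  def onMasterDisconnected(): Unit = synchronized {
    if (reconnectTask.isEmpty) {
      val task = scheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = registerWithMaster() // re-send RegisterWorker
      }, 0L, reconnectIntervalSeconds, TimeUnit.SECONDS)
      reconnectTask = Some(task)
    }
  }

  /** Called once the master acknowledges the registration again. */
  def onRegistered(): Unit = synchronized {
    reconnectTask.foreach(_.cancel(false))
    reconnectTask = None
  }
}
```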
|
Can one of the admins verify this patch? |
|
One remark is that there are no automated tests in this commit for now. I was unsuccessful in setting up TestKit to emulate a worker and master sending messages to each other. I also have not seen any other unit tests that test message passing. |
scheduledReconnectTask? When I looked at this variable, I expected it to be some case class representing the message itself.
Should this have a not in it?
@ash211 what he seems to be doing is allowing the reconnect only before we decide this worker is DEAD
The above observation is correct - only workers that have previously registered with the master are allowed to reconnect. Workers that are connecting for the first time shouldn't be allowed to spawn a heartbeat and have the master send back a reconnection message. I've updated the log message on an else case to make this more explicit.
- scheduledReconnectMessage --> scheduledReconnectTask
- A log statement in the master is printed if a worker that was unregistered and not in its worker set sends a heartbeat.
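To make the control flow being discussed concrete, here is a rough, hypothetical sketch of the master-side heartbeat handling; the names (ReconnectWorker, WorkerInfo, everRegisteredIds, replyToWorker) are placeholders chosen for illustration and not necessarily the actual Spark identifiers:

```scala
import scala.collection.mutable

object MasterHeartbeatSketch {
  // Placeholder types standing in for the real deploy messages / bookkeeping.
  case class ReconnectWorker(masterUrl: String)
  class WorkerInfo(val id: String) { var lastHeartbeat: Long = System.currentTimeMillis() }

  def handleHeartbeat(
      workerId: String,
      idToWorker: mutable.Map[String, WorkerInfo],
      everRegisteredIds: Set[String],
      masterUrl: String,
      replyToWorker: Any => Unit): Unit = {
    idToWorker.get(workerId) match {
      case Some(info) =>
        // Normal case: the worker is registered; just refresh its heartbeat.
        info.lastHeartbeat = System.currentTimeMillis()
      case None if everRegisteredIds.contains(workerId) =>
        // The worker registered in the past but has since been dropped
        // (e.g. it timed out): ask it to re-run the registration process.
        replyToWorker(ReconnectWorker(masterUrl))
      case None =>
        // A worker we have never seen must not be able to trigger reconnection.
        println(s"Got heartbeat from unregistered worker $workerId; ignoring it.")
    }
  }
}
```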
Should this method be private? Do we call it somewhere else?
Is it possible to reuse registrationRetryTimer in Worker?
The logic would need to be refactored a bit, but it might be doable. It uses the registered flag to determine if it should stop attempts to re-register, and otherwise attempts to reconnect.
If we toggle the registered flag upon disassociation as well, we might be able to just call registerWithMaster(). The main question is: do we necessarily want the worker to give up reconnection after a certain number of retries in this case?
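A very rough sketch of the idea being floated here, under the assumption of hypothetical names (registered, registerWithMaster, onDisassociated) rather than the actual Worker internals: clear the registered flag on disassociation so the existing startup retry path can be reused.

```scala
object WorkerReuseSketch {
  // The flag the startup retry loop checks to decide whether to keep
  // re-sending RegisterWorker.
  @volatile var registered = false

  def registerWithMaster(): Unit = {
    // Existing startup logic (elided): schedule retries that keep sending
    // RegisterWorker until `registered` flips to true or the retry budget
    // runs out.
  }

  def onDisassociated(): Unit = {
    registered = false   // treat the worker as unregistered again...
    registerWithMaster() // ...and re-enter the same retry loop used at startup
  }
}
```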
according to @ash211 , "The preferred alternative is to follow what Hadoop does – when there's a disconnect, attempt to reconnect at a particular interval until successful (I think it repeats indefinitely every 10sec).", I think we can do the same thing...just let the thread try infinitely
In that case we can't directly use registrationRetryTimer, as that explicitly kills the worker after a certain number of retries.
I see... then I will vote to do something different from Hadoop by reusing registrationRetryTimer... otherwise the inconsistency of the logic in the two similar code blocks makes the program a bit fishy.
Hmm... now I think exiting after several retries might be better.
In your case, not restarting the worker after the master restarts may bring some problems, especially when the user didn't set RECOVERY_MODE: all application information is lost. For instance, an application whose resource requirements haven't been filled will not be served anymore... the complete system will run in a weird state, so you eventually need to restart the applications (i.e. kill executors -> restart, which is equivalent to restarting all workers).
Not sure about the motivation for Hadoop letting the TaskTracker retry forever... it might be different from our case.
@ash211 ?
I dug into Hadoop source and actually found out that the default policy for Hadoop reconnects is to retry every 10 seconds for 6 attempts, and then every 60 seconds for 10 attempts. Each attempt also has a fuzz factor applied of [0.5t, 1.5t] to prevent a thundering herd of reconnect attempts across the cluster.
I don't have a strong opinion on infinite vs ~10min of retries -- I'd vote for following Hadoop's lead unless presented with compelling arguments to do something different.
|
add to whitelist |
|
QA tests have started for PR 2828 at commit
|
Why Warning? Seems more natural to me for the disconnect/failure to reply to be a WARN, but the subsequent reconnect request and related actions to just be INFO-level events.
|
QA tests have started for PR 2828 at commit
|
|
QA tests have finished for PR 2828 at commit
|
|
Test PASSed. |
|
QA tests have finished for PR 2828 at commit
|
|
Test PASSed. |
The logic of the worker reconnecting to the master is now shared with the logic of attempting to connect to the master on the worker's startup. Connection is attempted in certain intervals of time:
- The first six attempts are in 5 to 15 second intervals, and
- the ten attempts after that are in 30 to 90 second intervals.

The exact intervals between attempts are randomized in that range, in order to introduce some jitter and prevent the master from being hit with giant bursts of registration requests. This model is the same as Hadoop's reconnection model.
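A minimal sketch of the schedule described above, using a plain ScheduledExecutorService; only the interval bounds come from this comment, and everything else (the names and the give-up behavior after 16 attempts) is an illustrative assumption:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.util.Random

object ReconnectScheduleSketch {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private val rand = new Random()

  /** Schedule attempt number `attempt` (0-based); stop once registration succeeds. */
  def scheduleNextAttempt(attempt: Int)(tryRegister: () => Boolean): Unit = {
    val delayMs: Option[Long] =
      if (attempt < 6) Some(5000L + rand.nextInt(10000))        // 5-15 s, jittered
      else if (attempt < 16) Some(30000L + rand.nextInt(60000)) // 30-90 s, jittered
      else None                                                 // give up after 16 tries

    delayMs.foreach { delay =>
      scheduler.schedule(new Runnable {
        override def run(): Unit = {
          if (!tryRegister()) scheduleNextAttempt(attempt + 1)(tryRegister)
        }
      }, delay, TimeUnit.MILLISECONDS)
    }
  }
}
```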
|
QA tests have started for PR 2828 at commit
|
|
QA tests have finished for PR 2828 at commit
|
|
Test PASSed. |
|
tl;dr: this patch looks pretty good to me based on the testing that I've done so far. For my own interest / fun, I'd like to find a way to extend my test coverage to include the "worker-initiated reconnect" and "master restart" cases, but my tests shouldn't necessarily block the merging / review of this patch. To summarize my understanding of the failure scenarios that this PR addresses:
- The master is killed and restarted, so it forgets all of its previously-registered workers.
- The master stays alive but deregisters a worker (e.g. because it stopped receiving that worker's heartbeats), while the worker may not realize that it has been deregistered.

These scenarios are similar but there's one distinction: In the first scenario, the master forgets all previously-registered workers; in the second scenario, the master can remember that a worker was previously-registered even though it may now be disassociated. In some of these scenarios, a disconnection may be reflected at the master, worker, or both (perhaps at different times). For example, a master might deregister a worker if it has not received Spark-level heartbeats from it, or a worker might disassociate from a master due to the Akka failure detector being triggered. After this PR, there are two paths that can lead to a worker reconnection:
- The worker notices that it has been disassociated from the master and re-attempts registration itself, or
- the master receives a heartbeat from a worker it no longer has registered and explicitly asks that worker to re-register.
I've been working on building a Docker-based integration testing framework for testing these sorts of Spark Standalone fault-tolerance issues (to hopefully be released publicly sometime soon). I thought it would be interesting to test the "master stays alive but deregisters workers due to not receiving heartbeats" case by simulating network issues. In my testing framework, I added a Jepsen-inspired network fault-injector, which I used to write the following test:

```scala
test("workers should reconnect to master if disconnected due to transient network issues") {
// Regression test for SPARK-3736
val env = Seq(
"SPARK_MASTER_OPTS" -> "-Dspark.worker.timeout=2",
"SPARK_WORKER_OPTS" -> "-Dspark.worker.timeout=2 -Dspark.akka.timeout=1 -Dspark.akka.failure-detector.threshold=1 -Dspark.akka.heartbeat.interval=1"
)
cluster = SparkClusters.createStandaloneCluster(env, numWorkers = 1)
val master = cluster.masters.head
val worker = cluster.workers.head
master.getState.liveWorkerIPs.size should be (1)
println("Cluster launched with one worker")
networkFaultInjector.dropTraffic(master.container, worker.container)
networkFaultInjector.dropTraffic(worker.container, master.container)
eventually(timeout(30 seconds), interval(1 seconds)) {
master.getState.liveWorkerIPs.size should be (0)
}
println("Master shows that zero workers are registered after network connection fails")
networkFaultInjector.restore()
eventually(timeout(30 seconds), interval(1 seconds)) {
master.getState.liveWorkerIPs.size should be (1)
}
println("Master shows one worker after network connection is restored")
}
```

While running this against the current Spark master: after I kill the network connection between the master and worker, the master more-or-less immediately times out the worker and disconnects it. However, the worker doesn't realize that it has become deregistered from the master. This happens because the master detects worker liveness using our own heartbeat mechanism, whereas the worker detects master liveness using Akka's failure-detection mechanisms (to see this, note that the worker's …). As a result, we end up in a scenario where the master receives a heartbeat from the de-registered worker, which does not realize that it has been deregistered. Prior to this PR, the worker would never become re-registered. In this PR, the master explicitly asks the worker to reconnect (via the …).

I'm still working on testing the case where the worker receives a DisassociationEvent and initiates the reconnection itself. To do this, I'll need to figure out how to configure the Akka failure detector so that it quickly fails in my testing suite. I'll also need to add a way to query the worker to ask whether it has become disconnected from the master, so that I can drop packets for long enough to cause a disassociation. For completeness, I should also test the case where I kill the master and bring it back up using the same hostname. This may require a bit of extra scaffolding in my framework (which currently uses container IPs rather than hostnames that I control), but I think it's doable.

That said, though, the code here seems reasonable. Don't block on me here 😄 |
|
This is EXCELLENT work @JoshRosen ! Looking forward to future integration tests that cover these sorts of behaviors. |
|
@JoshRosen agreed with @ash211, this is really good. You are correct about the cases that my fix is addressing. Are there any actual comments on the PR, or can it be merged? =) |
|
@JoshRosen, this is an awesome way to test Spark integration with Docker. @mccheah, this PR is LGTM now, except that we expose too many should-be-private members in Worker (not your fault, it exists in the current code)... not sure about the reason. @pwendell @markhamstra, do you have some insights about this? |
|
@CodingCat, Worker is private[spark], so what is the nature of your concern? In fact, I'm wondering whether we really want the changes in this PR that make some methods inaccessible from the rest of spark. I haven't looked at the accessibility of Worker's methods in detail to say for certain what the correct modifier should be in each case; but if we want to change them, that's a refactoring that can and should be addressed in another PR. |
|
@markhamstra, yeah, my concern is just this: even though Worker is marked as private[spark], is it good practice to expose every detail of the implementation to the other components? |
This log message could be more informative. I'd say something like
logInfo(s"Attempting to connect to master (attempt # $connectionAttemptCount)")I'd also move this into the Utils.tryOrExit block so that we print the incremented connectionAttemptCount.
|
@CodingCat, A legitimate concern, and certainly something that could be worked up into a JIRA issue and separate pull request. But it's not a very pressing issue since nothing is in the public API, and a larger refactoring of Worker shouldn't be conflated with this PR. |
|
As a general principle, you should use the most private access modifiers that are sufficient. We can always make methods / fields more visible, but it's much harder to remove / change functionality once it's been exposed to other components. W.r.t. refactoring, I agree with Mark: a large-scale refactoring of access modifiers should happen in a separate PR, not here. |
Maybe add a comment above this line to say that this is modeled after Hadoop's design. This will help future maintainers to understand this code.
|
sure, I created the JIRA: https://issues.apache.org/jira/browse/SPARK-4011 |
|
QA tests have started for PR 2828 at commit
|
|
This looks good to me. Thanks! I'm going to merge this into |
|
QA tests have finished for PR 2828 at commit
|
|
Test FAILed. |
|
The PR doesn't seem to be related to the unit tests that failed. How shall we tackle this issue? |
|
Don't worry about it. This test is a little flaky and will be fixed shortly. I highly doubt that the test failure is caused by this PR. |
|
It looks like this patch may have introduced a race-condition / bug during multi-master failover: https://issues.apache.org/jira/browse/SPARK-4592. I'm working on a fix, but thought I'd mention the JIRA here in case any of this patch's reviewers would be interested in providing feedback. |
|
Andrew's got a patch for this: #3447 |
…lass
https://issues.apache.org/jira/browse/SPARK-4011
Currently, most of the members in Master/Worker are with public accessibility. We might wish to tighten the accessibility of them.
A bit more discussion is here: #2828
Author: CodingCat <[email protected]>
Closes #4844 from CodingCat/SPARK-4011 and squashes the following commits:
1a64175 [CodingCat] fix compilation issue
e7fd375 [CodingCat] Sean is right....
f5034a4 [CodingCat] fix rebase mistake
8d5b0c0 [CodingCat] loose more fields
0072f96 [CodingCat] lose some restrictions based on the possible design intention
de77286 [CodingCat] tighten accessibility of deploy package
12b4fd3 [CodingCat] tighten accessibility of deploy.worker
1243bc7 [CodingCat] tighten accessibility of deploy.rest
c5f622c [CodingCat] tighten the accessibility of deploy.history
d441e20 [CodingCat] tighten accessibility of deploy.client
4e0ce4a [CodingCat] tighten the accessibility of the members of classes in master
23cddbb [CodingCat] stylistic fix
9a3a340 [CodingCat] tighten the access of worker class
67a0559 [CodingCat] tighten the access permission in Master
Before, if the master node is killed and restarted, the worker nodes would not attempt to reconnect to the Master. Therefore, when the Master node was restarted, the worker nodes needed to be restarted as well. Now, when the Master node is disconnected, the worker nodes will continuously ping the master node in attempts to reconnect to it. Once the master node restarts, it will detect one of the registration requests from its former workers. The result is that the cluster re-enters a healthy state.
In addition, when the master does not receive a heartbeat from the worker, the worker was removed; however, when the worker sent a heartbeat to the master, the master used to ignore the heartbeat. Now, a master that receives a heartbeat from a worker that had been disconnected will request the worker to re-attempt the registration process, at which point the worker will send a RegisterWorker request and be re-connected accordingly.
Re-connection attempts per worker are submitted every N seconds, where N is configured by the property spark.worker.reconnect.interval - this has a default of 60 seconds right now.
Author: mcheah <[email protected]>
Closes apache#2828 from mccheah/reconnect-dead-workers and squashes the following commits:
83f8bc9 [mcheah] [SPARK-3736] More informative log message, and fixing some indentation.
fe0e02f [mcheah] [SPARK-3736] Moving reconnection logic to registerWithMaster().
94ddeca [mcheah] [SPARK-3736] Changing a log warning to a log info.
a698e35 [mcheah] [SPARK-3736] Addressing PR comment to make some defs private.
b9a3077 [mcheah] [SPARK-3736] Addressing PR comments related to reconnection.
2ad5ed5 [mcheah] [SPARK-3736] Cancel attempts to reconnect if the master changes.
b5b34af [mcheah] [SPARK-3736] Workers reconnect when disassociated from the master.