Retry failed replication due to transient errors #55633
Conversation
Currently a failed replication action will fail an entire replica. This includes when replication fails due to potentially short-lived transient issues such as network disruptions or circuit breaking errors. This commit adds the concept of a retryable action. A retryable action will be retried in the face of certain errors. The action will be retried after an exponentially increasing backoff period. After a defined time, the action will time out.
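To illustrate the idea described above, here is a minimal, self-contained sketch of retrying an action with exponential backoff until an overall timeout elapses. It is not the PR's actual RetryableAction API; the class, method, and parameter names below are assumptions.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustrative sketch only: retry an action after exponentially increasing
// delays until it succeeds or the overall timeout elapses.
final class ExponentialRetrySketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    void run(Runnable action, Consumer<Exception> onTimeout, long initialDelayMillis, long timeoutMillis) {
        attempt(action, onTimeout, initialDelayMillis, System.currentTimeMillis(), timeoutMillis);
    }

    private void attempt(Runnable action, Consumer<Exception> onTimeout,
                         long delayMillis, long startMillis, long timeoutMillis) {
        try {
            action.run(); // success: no further retries needed
        } catch (Exception e) {
            long elapsedMillis = System.currentTimeMillis() - startMillis;
            if (elapsedMillis + delayMillis >= timeoutMillis) {
                onTimeout.accept(e); // give up: the next retry would exceed the timeout
            } else {
                // back off, then retry with a doubled delay
                scheduler.schedule(
                    () -> attempt(action, onTimeout, delayMillis * 2, startMillis, timeoutMillis),
                    delayMillis, TimeUnit.MILLISECONDS);
            }
        }
    }
}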
Pinging @elastic/es-distributed (:Distributed/CRUD)
Hey @ywelsch this is a POC for handling the replication group logic. I know that we discussed the cluster state listener approach. But as I was working on that I felt like I was replicating the …
ywelsch
left a comment
Thanks Tim. Going with the ReplicationGroup listener is a viable option I think. I have left some comments on the structure and ways to possibly simplify things a bit.
server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
private final Map<String, Map<Object, RetryableAction<?>>> onGoingReplicationActions = ConcurrentCollections.newConcurrentMap();

public void addPendingAction(String nodeId, Object actionKey, RetryableAction<?> replicationAction) {
should we just use the RetryableAction as key?
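For context, a rough sketch of what keying the per-node pending actions by the action object itself (as suggested above) could look like; the class and method names here are assumptions, not the PR's actual code.

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: track pending retryable actions per target node, keyed
// by the action object itself rather than by a separate key object.
final class PendingActionsSketch<A> {
    private final Map<String, Set<A>> pendingByNode = new ConcurrentHashMap<>();

    void addPendingAction(String nodeId, A action) {
        pendingByNode.computeIfAbsent(nodeId, n -> ConcurrentHashMap.newKeySet()).add(action);
    }

    void removePendingAction(String nodeId, A action) {
        Set<A> actions = pendingByNode.get(nodeId);
        if (actions != null) {
            actions.remove(action);
        }
    }

    // when a node leaves the replication group, everything still pending for
    // it can be cancelled by the caller
    Set<A> removeNode(String nodeId) {
        return pendingByNode.remove(nodeId);
    }
}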
...er/src/main/java/org/elasticsearch/action/support/replication/PendingReplicationActions.java
...er/src/main/java/org/elasticsearch/action/support/replication/PendingReplicationActions.java
server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
}
}
}
this.replicationGroup = newReplicationGroup;
I think that I would prefer for this to be set before we call the methods on pendingReplication, just to be sure that an exception there does not mess up the state in this class.
This was to prevent the race where the replication tracker returns a ReplicationGroup containing a new node to a ReplicationOperation. The operation attempts to start a replication request, but the listener to PendingReplicationActions has not yet been called, so the request is immediately cancelled.
I made your change. But I think spurious cancellations are possible.
ok, I see now. One option is to turn it around then, but use try/finally to make sure that the state is updated even when the listener throws.
Another option (currently preferring this one for its generality) is that we have a versioning concept on ReplicationGroup (i.e. knowing which one is newer than another one) and that we explicitly update PendingReplicationActions whenever we capture the ReplicationGroup in IndexShard (most times the update will be a NOOP and should not need any locking on PendingReplicationActions).
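A rough sketch of that versioning idea, under the assumption that the replication group carries a monotonically increasing version; all names here are illustrative, not the PR's actual code.

import java.util.function.Consumer;

// Hypothetical sketch: every capture of the current replication group also
// pushes it to the pending-actions tracker, so the tracker can never be older
// than a group a caller is holding. Most pushes are no-ops (same version).
final class GroupCaptureSketch {
    static final class Group {
        final long version;
        Group(long version) { this.version = version; }
    }

    private volatile Group currentGroup = new Group(0);
    private final Consumer<Group> pendingActionsTracker;

    GroupCaptureSketch(Consumer<Group> pendingActionsTracker) {
        this.pendingActionsTracker = pendingActionsTracker;
    }

    Group captureReplicationGroup() {
        Group group = currentGroup;
        pendingActionsTracker.accept(group); // usually a no-op for an unchanged version
        return group;
    }
}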
server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
...er/src/main/java/org/elasticsearch/action/support/replication/PendingReplicationActions.java
@ywelsch - As an FYI I plan to add tests once we decide on the specific approach.
ywelsch
left a comment
I see two options:
- tracking replication group changes during the whole replication (i.e. from the beginning, where we send the replication request for the first time, to eventually completing successfully or unsuccessfully after X retries). The advantage of this approach is that we can react more quickly to replication group changes (and don't even need to wait for the respective replication request to complete before we can reach a conclusion about its success). The disadvantage is that we will have to track every outgoing request in the system, which comes with a certain overhead (listener registration / condition checking / deregistration).
- tracking replication group changes only while retries are locally scheduled. As soon as the replication request is resent, we would remove replication group tracking (and re-add it before rescheduling). The advantage of this approach is that no tracking and listener registration would be required under normal operation when there are no retries. The disadvantage is that we are less reactive to nodes that might have long GCs and which the master has already failed (i.e. thrown out of the replication group).
Currently I'm leaning towards option 1, if we can make it fast (have minimal overhead with the listener registration stuff). Let's discuss this when you're online.
server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java
server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
One test that is failing here is …
This test is now inconsistently failing because the assertBusy times out after 1 minute, but our retries for the resync also time out after 1 minute (defined by the timeout for the request, which is the default). This raises the question of what we want the timeout to be. I set this to be the request timeout, which is configurable for external bulk shard requests. But I assume this is not configurable for internal operations? Do we want some type of setting like we introduced with peer recovery? I could fix the test by introducing a different non-retryable error for the failure reason. But I assume the timeout conversation is something we wanted to have anyway.
@ywelsch - In the most recent test run I got a test failure related to an issue I described here.
I can get this to consistently fail by adding a sleep in … Thoughts on the correct approach? I assume we have to fix this at this point since it is failing tests.
yes, a cluster-level setting like for peer recoveries would be best here I think. Given that the timeout determines how failure-resilient the cluster will be (i.e. how quickly it will start to fail shards), it's probably best not to mix it with the request-level timeout, which is more about how long the cluster should try to bring the given request to execution. For our integration tests (ESIntegTestCase), we should then inject a random timeout in all tests (and explicitly use a higher timeout for those tests where we care about the added resilience).
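Sketched below is what a cluster-level timeout setting in the style of the peer-recovery settings might look like; the setting key and default value are assumptions, not necessarily what the PR ended up adding.

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.TimeValue;

// Hypothetical setting definition; the key and default here are assumptions.
public final class ReplicationRetrySettingsSketch {
    public static final Setting<TimeValue> RETRY_TIMEOUT_SETTING =
        Setting.timeSetting(
            "indices.replication.retry_timeout",   // assumed key
            TimeValue.timeValueSeconds(60),          // assumed default
            Setting.Property.Dynamic,
            Setting.Property.NodeScope);

    private ReplicationRetrySettingsSketch() {}
}

A dynamic, node-scoped setting would also make it straightforward for ESIntegTestCase to inject a random timeout as suggested above.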
@ywelsch I have updated this PR with the versioning approach and added the timeout setting.
test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
ywelsch
left a comment
LGTM. Thanks Tim! As a follow-up, we can look at possibly exposing some information in the tasks API about a replication request that is pending replication (after indexing into the primary has completed), just so that we can see that we're possibly waiting for a retry (see TransportReplication.setPhase).
dnhatn
left a comment
Nice work! I left some minor comments. Thanks Tim!
}
}

threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> toCancel.stream()
Maybe share this logic with close?
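One way to share that logic, as a sketch (names are assumptions): both the node-removal path and close() collect the actions to cancel and hand them to a single helper that cancels them off the calling thread.

import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical shared cancellation helper: cancels collected actions on a
// generic executor so cluster-state or close callers are not blocked by
// whatever the actions' listeners do.
final class CancellationSketch<A> {
    private final Executor genericExecutor;
    private final Consumer<A> canceller;

    CancellationSketch(Executor genericExecutor, Consumer<A> canceller) {
        this.genericExecutor = genericExecutor;
        this.canceller = canceller;
    }

    void cancelAsync(Collection<A> toCancel) {
        genericExecutor.execute(() -> toCancel.forEach(canceller));
    }
}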
@Override
public void accept(ReplicationGroup replicationGroup) {
    if (replicationGroup.getVersion() - replicationGroupVersion > 0) {
maybe just replicationGroup.getVersion() > replicationGroupVersion?
public void accept(ReplicationGroup replicationGroup) {
    if (replicationGroup.getVersion() - replicationGroupVersion > 0) {
        synchronized (this) {
            if (replicationGroup.getVersion() - replicationGroupVersion > 0) {
maybe just replicationGroup.getVersion() > replicationGroupVersion ?
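For reference, a self-contained sketch of the double-checked version comparison from the snippet above, written with the simpler comparison suggested here; the field and method names are assumptions.

// Hypothetical sketch: cheap unsynchronized check first, then re-check under
// the lock before acting on the newer replication group version.
final class VersionCheckSketch {
    private volatile long replicationGroupVersion = -1;

    void accept(long incomingVersion) {
        if (incomingVersion > replicationGroupVersion) {
            synchronized (this) {
                if (incomingVersion > replicationGroupVersion) {
                    replicationGroupVersion = incomingVersion;
                    // ... cancel pending actions for nodes no longer in the group ...
                }
            }
        }
    }
}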
Currently a failed replication action will fail an entire replica. This includes when replication fails due to potentially short-lived transient issues such as network disruptions or circuit breaking errors. This commit implements retries using the retryable action.