Prepare ShardFollowNodeTask to bootstrap when it falls behind leader shard #37562
Conversation
* Changed the shard changes api to include special metadata in the exception being thrown to indicate that the ops are no longer there.
* Changed ShardFollowNodeTask to handle this exception with the special metadata and mark a shard as fallen behind its leader shard. The shard follow task will then abort its ongoing replication.

The code that does the restore from the ccr repository still needs to be added. This change should make that change a bit easier.

Relates to elastic#35975
Pinging @elastic/es-distributed
    String message = "Operations are no longer available for replicating. Maybe increase the retention setting [" +
        IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey() + "]?";
    listener.onFailure(new ElasticsearchException(message, e));
    // Make it easy to detect this error in ShardFollowNodeTask:
Maybe introduce a new exception (extending IllegalStateException) which indicates that the requested history is no longer available. We can throw that exception in the LuceneChangesSnapshot. WDYT?
Yes, that makes sense! I will make sure we throw that new exception in LuceneChangesSnapshot, and instead of the message check that happens here, just do an instanceof check for this new exception.
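A minimal sketch of the exception type being discussed, using only JDK types; the class name follows the suggestion in the thread, but everything else here is a stand-in rather than the actual Elasticsearch code:

```java
public class Demo {
    // Hypothetical sketch of the proposed exception; the real class lives in
    // the Elasticsearch codebase and may differ in visibility and package.
    static final class MissingHistoryOperationsException extends IllegalStateException {
        MissingHistoryOperationsException(String message) {
            super(message);
        }
    }

    public static void main(String[] args) {
        Exception e = new MissingHistoryOperationsException(
            "Not all operations between from_seqno [17] and to_seqno [34] found");
        // An instanceof check replaces the brittle message-content check:
        System.out.println(e instanceof MissingHistoryOperationsException); // prints "true"
    }
}
```

The point of the dedicated type is that callers no longer need to match on substrings of the exception message, which breaks silently if the wording changes.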
Thanks @martijnvg :)
@dnhatn I've updated the PR.
dnhatn
left a comment
Thanks @martijnvg. I left two comments to discuss.
     */
    public final class OperationsMissingException extends IllegalStateException {

        OperationsMissingException(String message) {
Maybe name it MissingHistoryOperationsException?
👍
        failedReadRequests++;
        fetchExceptions.put(from, Tuple.tuple(retryCounter, ExceptionsHelper.convertToElastic(e)));
    }
    if (e instanceof ElasticsearchException) {
Can we serialize the newly added exception and use it here?
But then MissingHistoryOperationsException needs to extend ElasticsearchException, otherwise it cannot be serialized.
Regardless of that, I have always understood that we want to minimize the number of exceptions that are serializable. This is why I added a header to ElasticsearchException instead of creating a new one. Does this still apply? Or do we really want to introduce a new serializable exception here?
Regardless of that I always understood that we want to minimize the number of exceptions that are serializable.
If this is the case, let's use the header or metadata here.
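The header/metadata approach being agreed on here can be sketched with plain JDK types; WrapperException and the key name below are stand-ins for illustration, not the real ElasticsearchException API:

```java
import java.util.HashSet;
import java.util.Set;

public class MetadataDemo {
    // Stand-in for ElasticsearchException's metadata mechanism; the real class
    // carries metadata keys through its own custom serialization.
    static class WrapperException extends RuntimeException {
        private final Set<String> metadataKeys = new HashSet<>();

        WrapperException(String message, Throwable cause) {
            super(message, cause);
        }

        void addMetadata(String key) {
            metadataKeys.add(key);
        }

        Set<String> getMetadataKeys() {
            return metadataKeys;
        }
    }

    // Hypothetical key name for this sketch.
    static final String REQUESTED_OPS_MISSING_METADATA_KEY = "es.requested_operations_missing";

    public static void main(String[] args) {
        WrapperException wrapper = new WrapperException(
            "Operations are no longer available for replicating",
            new IllegalStateException("ops missing"));
        wrapper.addMetadata(REQUESTED_OPS_MISSING_METADATA_KEY);
        // The follower only checks for the key, so no new serializable
        // exception type has to be registered:
        System.out.println(wrapper.getMetadataKeys().contains(REQUESTED_OPS_MISSING_METADATA_KEY)); // prints "true"
    }
}
```

The design trade-off discussed above: a metadata key survives serialization because it rides on an already-serializable wrapper, whereas a brand-new exception class would have to be added to the set of wire-serializable exceptions.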
dnhatn
left a comment
@martijnvg I left two comments.
    // Make it easy to detect this error in ShardFollowNodeTask:
    // (adding a metadata header instead of introducing a new exception that extends ElasticsearchException)
    ElasticsearchException wrapper = new ElasticsearchException(message, e);
    wrapper.addMetadata(Ccr.FALLEN_BEHIND_LEADER_SHARD_METADATA_KEY);
I think we should name this key to indicate that the requested range of changes is no longer available rather than "fallen behind leader shard", because this service should not know anything about the follower.
    }

    void handleFallenBehindLeaderShard(Exception e) {
        if (fallenBehindLeaderShard.compareAndSet(false, true)) {
Let's make this method a no-op for now (i.e., without the fallenBehindLeaderShard state). I feel we need a more robust approach to avoid the scenario where an outstanding request can trigger another restore after the shard has already been restored.
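For reference, the once-only guard in the diff above boils down to an AtomicBoolean compareAndSet; this sketch uses only JDK types. The reviewer's concern is that the flag alone cannot distinguish a stale outstanding request from a genuinely new fall-behind event once the flag is reset after a restore:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class GuardDemo {
    private final AtomicBoolean fallenBehindLeaderShard = new AtomicBoolean(false);

    // Only the first caller wins the compareAndSet; subsequent failures of
    // outstanding requests become no-ops while the flag stays set.
    boolean handleFallenBehindLeaderShard() {
        return fallenBehindLeaderShard.compareAndSet(false, true);
    }

    public static void main(String[] args) {
        GuardDemo task = new GuardDemo();
        System.out.println(task.handleFallenBehindLeaderShard()); // prints "true"
        System.out.println(task.handleFallenBehindLeaderShard()); // prints "false"
    }
}
```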
@dnhatn I've updated the PR.
run the gradle build tests 1
@elasticmachine run elasticsearch-ci/2 |
@elasticmachine run elasticsearch-ci/1 |
This reverts commit 80c0efe.
@dnhatn As discussed, I have removed the …
dnhatn
left a comment
LGTM. Thanks @martijnvg.
    }
    if (e instanceof ElasticsearchException) {
        ElasticsearchException elasticsearchException = (ElasticsearchException) e;
        if (elasticsearchException.getMetadataKeys().contains(Ccr.REQUESTED_OPS_MISSING_METADATA_KEY)) {
I have been working on this PR locally with some other work of mine. When the listener is called, the exception is wrapped in a RemoteTransportException, so the check for whether the metadata is present fails even if the underlying exception has the metadata. I think this needs to be unwrapped further. Blocking on a future unwraps the exception, which is why your test passes (as opposed to the listener infrastructure).
I think also because the MissingHistoryOperationsException was not handled properly in the shard changes action, the ElasticsearchException with the metadata never made it to here.
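The unwrapping issue can be illustrated with a small sketch; RemoteTransportException here is a stand-in class, and unwrapCause mirrors what a helper such as ExceptionsHelper.unwrapCause needs to do:

```java
public class UnwrapDemo {
    // Stand-in for the transport-layer wrapper exception.
    static class RemoteTransportException extends RuntimeException {
        RemoteTransportException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Walk the cause chain until a non-wrapper exception is reached, so that
    // instanceof/metadata checks see the real underlying cause.
    static Throwable unwrapCause(Throwable t) {
        while (t instanceof RemoteTransportException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        Throwable inner = new IllegalStateException("requested ops missing");
        Throwable wrapped = new RemoteTransportException("remote failure", inner);
        // A check against `wrapped` directly would fail; unwrapping fixes it:
        System.out.println(unwrapCause(wrapped) == inner); // prints "true"
    }
}
```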
    ActionListener<Response> wrappedListener = ActionListener.wrap(listener::onResponse, e -> {
        Throwable cause = ExceptionsHelper.unwrapCause(e);
        if (cause instanceof IllegalStateException && cause.getMessage().contains("Not all operations between from_seqno [")) {
        if (cause instanceof MissingHistoryOperationsException) {
I don't think this is serializing properly. I find this exception at a breakpoint here. I think that our exception serialization code is doing an instanceof check, seeing that this is an IllegalStateException, and it is therefore identified as a plain IllegalStateException on deserialization.
java.lang.IllegalStateException: Not all operations between from_seqno [17] and to_seqno [34] found; expected seqno [17]; found [Index{id='6', type='doc', seqNo=18, primaryTerm=1, version=2, autoGeneratedIdTimestamp=-1}]
at org.elasticsearch.index.engine.LuceneChangesSnapshot.rangeCheck(LuceneChangesSnapshot.java:155)
at org.elasticsearch.index.engine.LuceneChangesSnapshot.next(LuceneChangesSnapshot.java:138)
at org.elasticsearch.xpack.ccr.action.ShardChangesAction.getOperations(ShardChangesAction.java:527)
at org.elasticsearch.xpack.ccr.action.ShardChangesAction$TransportAction.shardOperation(ShardChangesAction.java:340)
at org.elasticsearch.xpack.ccr.action.ShardChangesAction$TransportAction.shardOperation(ShardChangesAction.java:319)
at org.elasticsearch.action.support.single.shard.TransportSingleShardAction$1.doRun(TransportSingleShardAction.java:117)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:41)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:732)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.lang.Thread.run(Thread.java:834)
I made a wrong assumption here. I thought that the exception here didn't get serialized, but it does get serialized (when the request gets sent to a different shard copy than the one the local node has).
This means that throwing the MissingHistoryOperationsException here does not work. We need to throw an exception that extends from ElasticsearchException and sets specific metadata in LuceneChangesSnapshot.java. I will make this change now.
…xception with a special header instead. MissingHistoryOperationsException couldn't be used because it ended up getting serialized in certain cases and this exception was then not handled correctly.
@dnhatn @tbrooks8 I've updated the PR. I removed the … @tbrooks8, can you verify whether this change works in your PR?
…otFoundException with" This reverts commit 3025289.
a different place where it never gets serialized and there convert it into the wrapper exception.
@dnhatn and I talked about keeping the … So I reverted my previous commit, and instead of handling the … The only minor downside of this approach is that it will potentially wrap a …
I reran my test and found the …
dnhatn
left a comment
Thanks @martijnvg for an extra iteration.
…hard (#37562)

* Changed `LuceneSnapshot` to throw an `OperationsMissingException` if the requested ops are missing.
* Changed the shard changes api to handle the `OperationsMissingException` and wrap the exception into a `ResourceNotFound` exception, including metadata to indicate that the requested range can no longer be retrieved.
* Changed `ShardFollowNodeTask` to handle this `ResourceNotFound` exception with the included metadata header.

Relates to #35975