Skip to content

Conversation

@areek
Copy link
Contributor

@areek areek commented Jun 27, 2016

Currently, any write (e.g. index, delete) operation failure can be categorized as:

  • request failure (e.g. analysis, parsing error, version conflict)
  • transient operation failure (e.g. due to shard initializing, relocation)
  • environment failure (e.g. out of disk, corruption, lucene tragic event)

The main motivation of the PR is to handle these failure types appropriately for a
write request. Each failure type needs to be handled differently:

  • request failure (being request specific) should be replicated and then failed
  • transient failure should be retried (eventually succeeding)
  • environment failure (persistent primary shard failure) should fail the request
    immediately.

Currently, transient operation failures are retried in replication action but no distinction
is made between request and environment failures, both fails write request immediately.

In this PR, we distinguish between request and environment failures for a write operation.
In case of environment failures, the exception is bubbled up failing the request and in case
of request failures, the exception is captured and replication continues (we ignore performing
on replicas when such failures occur in primary). Transient operation failures are bubbled up
to be retried by the replication operation, as before.

Note: #20109 simplifies bulk execution code, which should clean up error handling for shard bulk requests.

@areek areek force-pushed the enhancement/replicate_primary_write_failures branch 3 times, most recently from 3b56c4d to 280d18a Compare June 29, 2016 21:21
@areek areek changed the title Replicate primary write operation failures Make primary write operation failure a valid result Jun 29, 2016
@areek
Copy link
Contributor Author

areek commented Jun 29, 2016

After discussions with @bleskes, I changed the scope of this PR. Now, the PR focuses on making primary write operation failures a valid write result, so replication operation can handle them explicitly. We can add operation failure replication, as needed in the feature/seq_no branch

@areek areek force-pushed the enhancement/replicate_primary_write_failures branch from 280d18a to 5fbd801 Compare June 29, 2016 21:31
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nite: maybe just put this in a variable?

@areek areek force-pushed the enhancement/replicate_primary_write_failures branch from 5fbd801 to 938e043 Compare July 4, 2016 22:22
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do we throw anything here? is this when the parsing doesn't match the mapping?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we throw error while parsing index request source here, currently we bubble up the exception as this is not engine level exception (though it is operation specific) .

@areek
Copy link
Contributor Author

areek commented Oct 27, 2016

Thanks @bleskes for the feedback. I updated the PR, addressing all your comments, including adding tests for failure handling in TransportWriteAction and InternalEngine

Copy link
Contributor

@s1monw s1monw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just looked at the exception handling for now and left a single comment

if (isDocumentFailure) {
return failure;
} else {
ElasticsearchException exception = new ElasticsearchException(failure);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry but this is an absolute no-go. We worked so hard to get rid of this in so many places. We should just rethrow the original exception instead of hiding it in and ElasticsearchException. Yet, I think we can use a littel hack to make it work in this case:

    @SuppressWarnings("unchecked")
    static <T extends Throwable> void rethrow(Throwable t) throws T {
        throw (T) t;
    }

should allow you to just retrhow failure without declaring it. I think in this case it's OK

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @s1monw for proposing a solution to this. Now we rethrow the original exception, if it failed the engine instead of wrapping it in ElasticsearchException.

* expects <code>finalResponseIfSuccessful</code> or <code>finalFailure</code> to be not-null
*/
public PrimaryResult(ReplicaRequest replicaRequest, Response finalResponseIfSuccessful, Exception finalFailure) {
assert finalFailure != null ^ finalResponseIfSuccessful != null : "either a response or a failure has to be not null";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add some info to the string message to let us know which whether both were null or both were set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added finalFailure and finalResponseIfSuccessful as part of the assert message

// execute item request
final Engine.Result operationResult;
final DocWriteResponse response;
BulkItemRequest replicaRequest = request.items()[requestIndex];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering whether we should make it final, so each path will have to set it. wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this makes sense, made replicaRequest final and explicitly set it for all the cases

}
// set update response
item.setPrimaryResponse(new BulkItemResponse(item.id(), opType, response));
request.items()[requestIndex] = replicaRequest;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm... strictly speaking we should restore this on retry, like we do with preVersions & preVersionTypes. This is an existing bug. can you open an issue about it so we can think about a proper solution in a follow up? This one is hairy enough :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened #21221 for this, I agree we should restore the original request, but it is non-trivial because we update the request in place.
In reality though, it should not be a problem, because upon retrying the shard bulk request, the shard will just execute the translated request instead of translating the update request again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not completely true as without the indexing into the replicas, we don't know if the way we resolved the update is valid.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when could a resolved update request that successfully executed on the primary not be valid for a replica?

// then just use the response we got from the successful execution
if (item.getPrimaryResponse() == null || isConflictException(failure) == false) {
item.setPrimaryResponse(new BulkItemResponse(item.id(), docWriteRequest.opType(),
if (replicaRequest.getPrimaryResponse() == null || isConflictException(failure) == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a mess - there is no guarantee from the coe that the replica request wasn't just replaced when we did request.items()[requestIndex] = replicaRequest; . In practice I think it's OK, but I don't like it. Also, now that we have a proper primary relocation (where we don't shut down the shards half way operations), I don't think it's needed. Can you open another follow up issue so we can evaluate carefully?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will open a followup issue, for some reason github is erroring out on creating issues now :0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opened #21230 for this

void setTranslogLocation(Translog.Location translogLocation) {
if (freeze == false) {
this.location = location;
this.translogLocation = translogLocation;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we also want to assert failure is null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added an assert for a null failure

org.elasticsearch.action.RoutingMissingException::new, 79),
OPERATION_FAILED_ENGINE_EXCEPTION(OperationFailedEngineException.class,
OperationFailedEngineException::new, 80),
// 80 used to be for IndexFailedEngineException, removed in 6.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need a bwc here no? or is it somewhere else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bleskes we need bwc here if we care about rolling restarts from 5.x. I am not sure how to add bwc for this (i.e. might be missing something). We could simply add the exceptions back (but not use them anywhere) so they can be serialized/deserialized when a 6.0 node is acting as a coordinating node?

Another question: The index/delete operation failures were communicated as exceptions, so do we even need a bwc for these failures for serialization/deserialization or can we just rely on generic exception serialization/deserialization like we currently do for persistent engine failures during index/delete operations?

statusChecker.bytesWritten(result.getSizeInBytes());
private void recordOperationBytes(Engine.Operation operation, Engine.Result result) {
final int sizeInBytes;
if (result.getTranslogLocation() != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I talked this one with @mikemccand and since this whole thing is a heuristic to approximate the memory usage of the IW, we think it's OK to always use the operation.estimatedSizeInBytes() . No need to look at the translog location.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to just use operation.estimatedSizeInBytes(), just a note: before the change, we did used to account for tranlogLocation.size when translog location was set.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ sorry for not mentioning that it was already the case before - I know you just ported what was there 🎉

TestAction.WriteReplicaResult::respond);
}

private <Result, Response> void handleDocumentFailure(TestAction testAction,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you unpack it? it saves on 10 lines of code at the expense of making it much harder to understand what it does - you end going back and forth - I think it's not worth it?

default void postIndex(Engine.Index index, Engine.IndexResult result) {}

/**
* Called after the indexing operation occurred with exception.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we document the difference with the other postIndex and how failures are treated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added java docs for this

@Nullable Location location, @Nullable Exception operationFailure,
IndexShard primary) {
super(request, finalResponse, operationFailure);
assert operationFailure != null ^ finalResponse != null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but still if the location is != null, there must be no failure?

Copy link
Contributor

@bleskes bleskes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a few minor comments and some requests for new issues. I like how this looks.

I would be great to get @s1monw LGTM.

Also, @jpountz can you sanity check https://github.com/elastic/elasticsearch/pull/19105/files#diff-7cdd93f7b049567dc8e2ffc37300852eR169 ? It would be great to have one exception type

@areek
Copy link
Contributor Author

areek commented Nov 1, 2016

@bleskes thanks again for the review, I addressed all the comments and had
one question regarding bwc for the removed exceptions in #19105 (comment)

@bleskes
Copy link
Contributor

bleskes commented Nov 1, 2016

@areek because the github ui sucks, I'm responding here so it will be easy to see:

if we care about rolling restarts from 5.x

We care! and yeah, I think we can just keep them in there and doc that they can be removed in 7.0 (assertion?)

The index/delete operation failures were communicated as exceptions, so do we even need a bwc for these failures for serialization/deserialization or can we just rely on generic exception serialization/deserialization like we currently do for persistent engine failures during index/delete operations?

I don't think we can rely on generic exceptions for incoming responses from old nodes. That will ask for a specific exception ID and we won't have it -> boom.

IndexShard primary) {
super(request, finalResponse, operationFailure);
if (location != null) {
assert operationFailure == null : "expected no failures when translog location is not null";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: assertion location == null || operationFailure == null ?

/**
* Called after the indexing operation occurred.
* Called after the indexing operation occurred. Implementations should
* check {@link Engine.IndexResult#hasFailure()} for operation failures
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to say this? how about "note that this method is also called when indexing a document didn't succeed because of document related failures. See {@link #postIndex(..)} for engine level failures.

Copy link
Contributor

@s1monw s1monw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left some minor comments, I did review the engine changes and glanced on the replciation action stuff. I think it LGTM except of the one or two commetns I gave. The one with the maybeFail is important


import java.io.IOException;

public class DeleteFailedEngineException extends EngineException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}


private void postIndexing(ParsedDocument doc, long tookInNanos) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

public abstract IndexResult index(Index operation);

public abstract void delete(Delete delete) throws EngineException;
public abstract DeleteResult delete(Delete delete);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a java doc commetn would be great here while we are at it

private final Exception failure;
private Translog.Location translogLocation;
private long took;
private boolean freeze;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe use private final SetOnce<Boolean> frozen here it will barf if you freeze multiple times?

import java.io.IOException;
import java.util.Objects;

public class IndexFailedEngineException extends EngineException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// and set the error in operation.setFailure. In case of environment related errors, the failure
// is bubbled up
isDocumentFailure = !((failure instanceof IllegalStateException || failure instanceof IOException)
&& maybeFailEngine(operation.operationType().getLowercase(), failure));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we please move the maybeFailEngine(operation.operationType().getLowercase(), failure)) one line above and assign it to a variable. I really wanna make sure no short circuit logic here prevents it from being called


@SuppressWarnings("unchecked")
static <T extends Throwable> void rethrow(Throwable t) throws T {
throw (T) t;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe document that this is a hack to retrhow the original

if (result.hasFailure() == false) {
if (!index.origin().isRecovery()) {
long took = result.getTook();
totalStats.indexMetric.inc(took);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we have a write failures statistic too? @bleskes

@areek areek merged commit 03abf4a into elastic:master Nov 1, 2016
dakrone added a commit to dakrone/elasticsearch that referenced this pull request Jan 23, 2017
This is a bespoke backport of elastic#20109 for 5.x:

Currently, bulk item requests can be any ActionRequest, this PR restricts bulk
item requests to DocumentRequest. This simplifies handling failures during bulk
requests. Additionally, a new enum is added to DocumentRequest to represent the
intended operation to be performed by a document request (create, index, update
and delete), which was previously represented with a mix of strings and index
request operation type.

Now, index request operation type reuses the new enum to specify whether the
request should create or index a document. Restricting bulk requests to
DocumentRequest further simplifies execution of shard-level bulk operations to
use the same failure handling for index, delete and update operations. This PR
also fixes a bug which executed delete operations twice for replica copies while
executing bulk requests.

Relates to elastic#19105 and elastic#20109
areek added a commit to areek/elasticsearch that referenced this pull request Jan 25, 2017
Currently, any write (e.g. `index`, `delete`) operation failure can be categorized as:
- request failure (e.g. analysis, parsing error, version conflict)
- transient operation failure (e.g. due to shard initializing, relocation)
- environment failure (e.g. out of disk, corruption, lucene tragic event)

The main motivation of the PR is to handle these failure types appropriately for a
write request. Each failure type needs to be handled differently:
- request failure (being request specific) should be replicated and then failed
- transient failure should be retried (eventually succeeding)
- environment failure (persistent primary shard failure) should fail the request
  immediately.

Currently, transient operation failures are retried in replication action but no distinction
is made between request and environment failures, both fails write request immediately.

In this PR, we distinguish between request and environment failures for a write operation.
In case of environment failures, the exception is bubbled up failing the request and in case
of request failures, the exception is captured and replication continues (we ignore performing
on replicas when such failures occur in primary). Transient operation failures are bubbled up
to be retried by the replication operation, as before.
areek added a commit that referenced this pull request Jan 25, 2017
* Simplify write failure handling (backport of #19105)

Currently, any write (e.g. `index`, `delete`) operation failure can be categorized as:
- request failure (e.g. analysis, parsing error, version conflict)
- transient operation failure (e.g. due to shard initializing, relocation)
- environment failure (e.g. out of disk, corruption, lucene tragic event)

The main motivation of the PR is to handle these failure types appropriately for a
write request. Each failure type needs to be handled differently:
- request failure (being request specific) should be replicated and then failed
- transient failure should be retried (eventually succeeding)
- environment failure (persistent primary shard failure) should fail the request
  immediately.

Currently, transient operation failures are retried in replication action but no distinction
is made between request and environment failures, both fails write request immediately.

In this PR, we distinguish between request and environment failures for a write operation.
In case of environment failures, the exception is bubbled up failing the request and in case
of request failures, the exception is captured and replication continues (we ignore performing
on replicas when such failures occur in primary). Transient operation failures are bubbled up
to be retried by the replication operation, as before.

* incorporate feedback
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants