Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
Expand All @@ -29,6 +30,7 @@
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.action.update.UpdateHelper;
Expand Down Expand Up @@ -65,9 +67,6 @@

import java.util.Map;

import static org.elasticsearch.action.support.replication.ReplicationOperation.ignoreReplicaException;
import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException;

/** Performs shard-level bulk (index, delete or update) operations */
public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {

Expand Down Expand Up @@ -235,6 +234,10 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
return location;
}

/**
 * Tells whether the given failure is, once unwrapped, a version conflict.
 *
 * @param e the failure to inspect
 * @return {@code true} if the cause obtained from {@code ExceptionsHelper.unwrapCause(e)} is a
 *         {@code VersionConflictEngineException}, {@code false} otherwise
 */
private static boolean isConflictException(final Exception e) {
    final Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
    return unwrapped instanceof VersionConflictEngineException;
}

private static class UpdateResultHolder {
final BulkItemRequest replicaRequest;
final Engine.Result operationResult;
Expand Down Expand Up @@ -392,7 +395,7 @@ public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardReq
|| failure instanceof IndexShardClosedException
: "expected any one of [version conflict, mapper parsing, engine closed, index shard closed]" +
" failures. got " + failure;
if (!ignoreReplicaException(failure)) {
if (!TransportActions.isShardNotAvailableException(failure)) {
throw failure;
}
} else {
Expand All @@ -401,7 +404,7 @@ public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardReq
} catch (Exception e) {
// if it's not a shard-not-available failure, we need to make sure to bubble up the failure
// so we will fail the shard
if (!ignoreReplicaException(e)) {
if (!TransportActions.isShardNotAvailableException(e)) {
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void onFailure(Exception replicaException) {
shard,
replicaRequest),
replicaException);
if (ignoreReplicaException(replicaException)) {
if (TransportActions.isShardNotAvailableException(replicaException)) {
decPendingAndFinishIfNeeded();
} else {
RestStatus restStatus = ExceptionsHelper.status(replicaException);
Expand Down Expand Up @@ -314,30 +314,6 @@ private void finishAsFailed(Exception exception) {
}
}


/**
 * Decides whether a failure raised while performing an operation on a replica can be ignored.
 *
 * @param e the failure reported for the replica operation
 * @return {@code true} if the failure is a shard-not-available exception or a version conflict,
 *         {@code false} otherwise
 */
public static boolean ignoreReplicaException(Exception e) {
    // a version conflict means that a newer change has already crept into the replica, which is fine
    return TransportActions.isShardNotAvailableException(e) || isConflictException(e);
}

/**
 * Tells whether the given throwable is, once unwrapped, a version conflict.
 *
 * @param t the throwable to inspect
 * @return {@code true} if the unwrapped cause is a {@code VersionConflictEngineException}
 */
public static boolean isConflictException(Throwable t) {
    // a version conflict means that a newer change has already crept into the replica, which is fine
    return ExceptionsHelper.unwrapCause(t) instanceof VersionConflictEngineException;
}


public interface Primary<
Request extends ReplicationRequest<Request>,
ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,10 +477,7 @@ private <T extends Result> Optional<T> checkVersionConflict(
}

if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
if (op.origin().isRecovery()) {
// version conflict, but okay
result = onSuccess.get();
} else {
if (op.origin() == Operation.Origin.PRIMARY) {
// fatal version conflict
final VersionConflictEngineException e =
new VersionConflictEngineException(
Expand All @@ -489,8 +486,13 @@ private <T extends Result> Optional<T> checkVersionConflict(
op.id(),
op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
result = onFailure.apply(e);
} else {
/*
* Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should return a
* successful result.
*/
result = onSuccess.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment here? Something like: "Version conflicts during recovery and replica operations are normal and occur due to out-of-order delivery; we should return a successful result."

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed bbffd87.

}

return Optional.of(result);
} else {
return Optional.empty();
Expand Down Expand Up @@ -672,7 +674,7 @@ private IndexResult innerIndex(Index index) throws IOException {
}
}
final long expectedVersion = index.version();
final Optional<IndexResult> checkVersionConflictResult =
final Optional<IndexResult> resultOnVersionConflict =
checkVersionConflict(
index,
currentVersion,
Expand All @@ -682,15 +684,15 @@ private IndexResult innerIndex(Index index) throws IOException {
e -> new IndexResult(e, currentVersion, index.seqNo()));

final IndexResult indexResult;
if (checkVersionConflictResult.isPresent()) {
indexResult = checkVersionConflictResult.get();
if (resultOnVersionConflict.isPresent()) {
indexResult = resultOnVersionConflict.get();
} else {
// no version conflict
if (index.origin() == Operation.Origin.PRIMARY) {
seqNo = seqNoService().generateSeqNo();
}

/**
/*
* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
Expand All @@ -707,10 +709,12 @@ private IndexResult innerIndex(Index index) throws IOException {
update(index.uid(), index.docs(), indexWriter);
}
indexResult = new IndexResult(updatedVersion, seqNo, deleted);
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
}
if (!indexResult.hasFailure()) {
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
? translog.add(new Translog.Index(index, indexResult))
: null;
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
indexResult.setTranslogLocation(location);
}
indexResult.setTook(System.nanoTime() - index.startTime());
Expand Down Expand Up @@ -804,18 +808,17 @@ private DeleteResult innerDelete(Delete delete) throws IOException {

final long expectedVersion = delete.version();

final Optional<DeleteResult> result =
final Optional<DeleteResult> resultOnVersionConflict =
checkVersionConflict(
delete,
currentVersion,
expectedVersion,
deleted,
() -> new DeleteResult(expectedVersion, delete.seqNo(), true),
e -> new DeleteResult(e, expectedVersion, delete.seqNo()));

final DeleteResult deleteResult;
if (result.isPresent()) {
deleteResult = result.get();
if (resultOnVersionConflict.isPresent()) {
deleteResult = resultOnVersionConflict.get();
} else {
if (delete.origin() == Operation.Origin.PRIMARY) {
seqNo = seqNoService().generateSeqNo();
Expand All @@ -824,11 +827,14 @@ private DeleteResult innerDelete(Delete delete) throws IOException {
updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion);
found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue);
deleteResult = new DeleteResult(updatedVersion, seqNo, found);

versionMap.putUnderLock(delete.uid().bytes(),
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
}
if (!deleteResult.hasFailure()) {
location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
? translog.add(new Translog.Delete(delete, deleteResult))
: null;
versionMap.putUnderLock(delete.uid().bytes(),
new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));
deleteResult.setTranslogLocation(location);
}
deleteResult.setTook(System.nanoTime() - delete.startTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1478,76 +1478,121 @@ public void testVersioningCreateExistsExceptionWithFlush() {
}

/**
 * Indexes a document twice on the primary engine, then applies the resulting operations to the
 * replica engine out of order and more than once, checking the result of each replica operation.
 */
public void testVersioningReplicaConflict1() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc);
Engine.IndexResult indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));
final ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null);
final Engine.Index v1Index = new Engine.Index(newUid("1"), doc);
final Engine.IndexResult v1Result = engine.index(v1Index);
assertThat(v1Result.getVersion(), equalTo(1L));

index = new Engine.Index(newUid("1"), doc);
indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(2L));
final Engine.Index v2Index = new Engine.Index(newUid("1"), doc);
final Engine.IndexResult v2Result = engine.index(v2Index);
assertThat(v2Result.getVersion(), equalTo(2L));

// apply the second index to the replica, should work fine
index = new Engine.Index(newUid("1"), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
indexResult = replicaEngine.index(index);
assertThat(indexResult.getVersion(), equalTo(2L));

long seqNo = indexResult.getSeqNo();
long primaryTerm = index.primaryTerm();
// now, the old one should not work
index = new Engine.Index(newUid("1"), doc, seqNo, primaryTerm, 1L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
indexResult = replicaEngine.index(index);
assertTrue(indexResult.hasFailure());
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
final Engine.Index replicaV2Index = new Engine.Index(
newUid("1"),
doc,
v2Result.getSeqNo(),
v2Index.primaryTerm(),
v2Result.getVersion(),
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(),
REPLICA,
0,
-1,
false);
final Engine.IndexResult replicaV2Result = replicaEngine.index(replicaV2Index);
assertThat(replicaV2Result.getVersion(), equalTo(2L));

// now, the old one should produce an indexing result
final Engine.Index replicaV1Index = new Engine.Index(
newUid("1"),
doc,
v1Result.getSeqNo(),
v1Index.primaryTerm(),
v1Result.getVersion(),
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(),
REPLICA,
0,
-1,
false);
final Engine.IndexResult replicaV1Result = replicaEngine.index(replicaV1Index);
assertFalse(replicaV1Result.hasFailure());
assertFalse(replicaV1Result.isCreated());
assertThat(replicaV1Result.getVersion(), equalTo(2L));

// replaying the second version on the replica should not fail and should report the current version
index = new Engine.Index(newUid("1"), doc, seqNo, primaryTerm, 2L
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
indexResult = replicaEngine.index(index);
assertThat(indexResult.getVersion(), equalTo(2L));
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
final Engine.IndexResult replicaV2ReplayResult = replicaEngine.index(replicaV2Index);
// assert on the replay result itself, not on the results of the earlier replica operations
assertFalse(replicaV2ReplayResult.hasFailure());
assertFalse(replicaV2ReplayResult.isCreated());
assertThat(replicaV2ReplayResult.getVersion(), equalTo(2L));
}

/**
 * Indexes a document twice and then deletes it on the primary engine, and applies the first index,
 * the delete, a repeated delete and finally the skipped second index to the replica engine,
 * checking the version reported by each replica operation.
 */
public void testVersioningReplicaConflict2() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc);
Engine.IndexResult indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));
final ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null);
final Engine.Index v1Index = new Engine.Index(newUid("1"), doc);
final Engine.IndexResult v1Result = engine.index(v1Index);
assertThat(v1Result.getVersion(), equalTo(1L));

// apply the first index to the replica, should work fine
index = new Engine.Index(newUid("1"), doc, indexResult.getSeqNo(), index.primaryTerm(), 1L,
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
indexResult = replicaEngine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));
final Engine.Index replicaV1Index = new Engine.Index(
newUid("1"),
doc,
v1Result.getSeqNo(),
v1Index.primaryTerm(),
v1Result.getVersion(),
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(),
REPLICA,
0,
-1,
false);
Engine.IndexResult replicaV1Result = replicaEngine.index(replicaV1Index);
assertThat(replicaV1Result.getVersion(), equalTo(1L));

// index it again
index = new Engine.Index(newUid("1"), doc);
indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(2L));
final Engine.Index v2Index = new Engine.Index(newUid("1"), doc);
final Engine.IndexResult v2Result = engine.index(v2Index);
assertThat(v2Result.getVersion(), equalTo(2L));

// now delete it
Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"));
Engine.DeleteResult deleteResult = engine.delete(delete);
final Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"));
final Engine.DeleteResult deleteResult = engine.delete(delete);
assertThat(deleteResult.getVersion(), equalTo(3L));

// apply the delete on the replica (skipping the second index)
delete = new Engine.Delete("test", "1", newUid("1"), deleteResult.getSeqNo(), delete.primaryTerm(), 3L
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
deleteResult = replicaEngine.delete(delete);
assertThat(deleteResult.getVersion(), equalTo(3L));

// second time delete with same version should fail
delete = new Engine.Delete("test", "1", newUid("1"), deleteResult.getSeqNo(), delete.primaryTerm(), 3L
, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
deleteResult = replicaEngine.delete(delete);
assertTrue(deleteResult.hasFailure());
assertThat(deleteResult.getFailure(), instanceOf(VersionConflictEngineException.class));

// now do the second index on the replica, it should fail
index = new Engine.Index(newUid("1"), doc, deleteResult.getSeqNo(), delete.primaryTerm(), 2L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
indexResult = replicaEngine.index(index);
assertTrue(indexResult.hasFailure());
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
final Engine.Delete replicaDelete = new Engine.Delete(
"test",
"1",
newUid("1"),
deleteResult.getSeqNo(),
delete.primaryTerm(),
deleteResult.getVersion(),
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(),
REPLICA,
0);
final Engine.DeleteResult replicaDeleteResult = replicaEngine.delete(replicaDelete);
assertThat(replicaDeleteResult.getVersion(), equalTo(3L));

// second time delete with same version should just produce the same version
final Engine.DeleteResult deleteReplayResult = replicaEngine.delete(replicaDelete);
assertFalse(deleteReplayResult.hasFailure());
assertTrue(deleteReplayResult.isFound());
assertThat(deleteReplayResult.getVersion(), equalTo(3L));

// now do the second index on the replica, it should result in the current version
final Engine.Index replicaV2Index = new Engine.Index(
newUid("1"),
doc,
v2Result.getSeqNo(),
v2Index.primaryTerm(),
v2Result.getVersion(),
VersionType.INTERNAL.versionTypeForReplicationAndRecovery(),
REPLICA,
0,
-1,
false);
final Engine.IndexResult replicaV2Result = replicaEngine.index(replicaV2Index);
assertFalse(replicaV2Result.hasFailure());
assertFalse(replicaV2Result.isCreated());
assertThat(replicaV2Result.getVersion(), equalTo(3L));
}

public void testBasicCreatedFlag() {
Expand Down