diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index bb5714d3c3a1d..b4c3daee08f9d 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteRequest; @@ -29,6 +30,7 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationOperation; +import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.action.update.UpdateHelper; @@ -65,9 +67,6 @@ import java.util.Map; -import static org.elasticsearch.action.support.replication.ReplicationOperation.ignoreReplicaException; -import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException; - /** Performs shard-level bulk (index, delete or update) operations */ public class TransportShardBulkAction extends TransportWriteAction { @@ -235,6 +234,10 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh return location; } + private static boolean isConflictException(final Exception e) { + return ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException; + } + private static class UpdateResultHolder { final BulkItemRequest replicaRequest; final Engine.Result operationResult; @@ -392,7 +395,7 @@ public WriteReplicaResult shardOperationOnReplica(BulkShardReq || failure instanceof IndexShardClosedException : "expected any one of [version conflict, mapper parsing, engine closed, index shard closed]" + " failures. got " + failure; - if (!ignoreReplicaException(failure)) { + if (!TransportActions.isShardNotAvailableException(failure)) { throw failure; } } else { @@ -401,7 +404,7 @@ public WriteReplicaResult shardOperationOnReplica(BulkShardReq } catch (Exception e) { // if its not an ignore replica failure, we need to make sure to bubble up the failure // so we will fail the shard - if (!ignoreReplicaException(e)) { + if (!TransportActions.isShardNotAvailableException(e)) { throw e; } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 25dcc29a5c3a3..6a3d217fcf6c9 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -202,7 +202,7 @@ public void onFailure(Exception replicaException) { shard, replicaRequest), replicaException); - if (ignoreReplicaException(replicaException)) { + if (TransportActions.isShardNotAvailableException(replicaException)) { decPendingAndFinishIfNeeded(); } else { RestStatus restStatus = ExceptionsHelper.status(replicaException); @@ -314,30 +314,6 @@ private void finishAsFailed(Exception exception) { } } - - /** - * Should an exception be ignored when the operation is performed on the replica. - */ - public static boolean ignoreReplicaException(Exception e) { - if (TransportActions.isShardNotAvailableException(e)) { - return true; - } - // on version conflict or document missing, it means - // that a new change has crept into the replica, and it's fine - if (isConflictException(e)) { - return true; - } - return false; - } - - public static boolean isConflictException(Throwable t) { - final Throwable cause = ExceptionsHelper.unwrapCause(t); - // on version conflict or document missing, it means - // that a new change has crept into the replica, and it's fine - return cause instanceof VersionConflictEngineException; - } - - public interface Primary< Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a18ca7f280ee0..362a4c9a48c66 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -477,10 +477,7 @@ private Optional checkVersionConflict( } if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { - if (op.origin().isRecovery()) { - // version conflict, but okay - result = onSuccess.get(); - } else { + if (op.origin() == Operation.Origin.PRIMARY) { // fatal version conflict final VersionConflictEngineException e = new VersionConflictEngineException( @@ -489,8 +486,13 @@ private Optional checkVersionConflict( op.id(), op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); result = onFailure.apply(e); + } else { + /* + * Version conflicts during recovery and on replicas are normal due to asynchronous execution; as such, we should return a + * successful result. + */ + result = onSuccess.get(); } - return Optional.of(result); } else { return Optional.empty(); @@ -672,7 +674,7 @@ private IndexResult innerIndex(Index index) throws IOException { } } final long expectedVersion = index.version(); - final Optional checkVersionConflictResult = + final Optional resultOnVersionConflict = checkVersionConflict( index, currentVersion, @@ -682,15 +684,15 @@ private IndexResult innerIndex(Index index) throws IOException { e -> new IndexResult(e, currentVersion, index.seqNo())); final IndexResult indexResult; - if (checkVersionConflictResult.isPresent()) { - indexResult = checkVersionConflictResult.get(); + if (resultOnVersionConflict.isPresent()) { + indexResult = resultOnVersionConflict.get(); } else { // no version conflict if (index.origin() == Operation.Origin.PRIMARY) { seqNo = seqNoService().generateSeqNo(); } - /** + /* * Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence * number service if this is on the primary, or the existing document's sequence number if this is on the replica. The * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created. @@ -707,10 +709,12 @@ private IndexResult innerIndex(Index index) throws IOException { update(index.uid(), index.docs(), indexWriter); } indexResult = new IndexResult(updatedVersion, seqNo, deleted); + versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion)); + } + if (!indexResult.hasFailure()) { location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY ? translog.add(new Translog.Index(index, indexResult)) : null; - versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion)); indexResult.setTranslogLocation(location); } indexResult.setTook(System.nanoTime() - index.startTime()); @@ -804,7 +808,7 @@ private DeleteResult innerDelete(Delete delete) throws IOException { final long expectedVersion = delete.version(); - final Optional result = + final Optional resultOnVersionConflict = checkVersionConflict( delete, currentVersion, @@ -812,10 +816,9 @@ private DeleteResult innerDelete(Delete delete) throws IOException { deleted, () -> new DeleteResult(expectedVersion, delete.seqNo(), true), e -> new DeleteResult(e, expectedVersion, delete.seqNo())); - final DeleteResult deleteResult; - if (result.isPresent()) { - deleteResult = result.get(); + if (resultOnVersionConflict.isPresent()) { + deleteResult = resultOnVersionConflict.get(); } else { if (delete.origin() == Operation.Origin.PRIMARY) { seqNo = seqNoService().generateSeqNo(); @@ -824,11 +827,14 @@ private DeleteResult innerDelete(Delete delete) throws IOException { updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue); deleteResult = new DeleteResult(updatedVersion, seqNo, found); + + versionMap.putUnderLock(delete.uid().bytes(), + new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis())); + } + if (!deleteResult.hasFailure()) { location = delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY ? translog.add(new Translog.Delete(delete, deleteResult)) : null; - versionMap.putUnderLock(delete.uid().bytes(), - new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis())); deleteResult.setTranslogLocation(location); } deleteResult.setTook(System.nanoTime() - delete.startTime()); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f0ca8292f4f23..6f85d65bc913a 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1478,76 +1478,121 @@ public void testVersioningCreateExistsExceptionWithFlush() { } public void testVersioningReplicaConflict1() { - ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null); - Engine.Index index = new Engine.Index(newUid("1"), doc); - Engine.IndexResult indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(1L)); + final ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null); + final Engine.Index v1Index = new Engine.Index(newUid("1"), doc); + final Engine.IndexResult v1Result = engine.index(v1Index); + assertThat(v1Result.getVersion(), equalTo(1L)); - index = new Engine.Index(newUid("1"), doc); - indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(2L)); + final Engine.Index v2Index = new Engine.Index(newUid("1"), doc); + final Engine.IndexResult v2Result = engine.index(v2Index); + assertThat(v2Result.getVersion(), equalTo(2L)); // apply the second index to the replica, should work fine - index = new Engine.Index(newUid("1"), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - indexResult = replicaEngine.index(index); - assertThat(indexResult.getVersion(), equalTo(2L)); - - long seqNo = indexResult.getSeqNo(); - long primaryTerm = index.primaryTerm(); - // now, the old one should not work - index = new Engine.Index(newUid("1"), doc, seqNo, primaryTerm, 1L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - indexResult = replicaEngine.index(index); - assertTrue(indexResult.hasFailure()); - assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); + final Engine.Index replicaV2Index = new Engine.Index( + newUid("1"), + doc, + v2Result.getSeqNo(), + v2Index.primaryTerm(), + v2Result.getVersion(), + VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), + REPLICA, + 0, + -1, + false); + final Engine.IndexResult replicaV2Result = replicaEngine.index(replicaV2Index); + assertThat(replicaV2Result.getVersion(), equalTo(2L)); + + // now, the old one should produce an indexing result + final Engine.Index replicaV1Index = new Engine.Index( + newUid("1"), + doc, + v1Result.getSeqNo(), + v1Index.primaryTerm(), + v1Result.getVersion(), + VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), + REPLICA, + 0, + -1, + false); + final Engine.IndexResult replicaV1Result = replicaEngine.index(replicaV1Index); + assertFalse(replicaV1Result.hasFailure()); + assertFalse(replicaV1Result.isCreated()); + assertThat(replicaV1Result.getVersion(), equalTo(2L)); // second version on replica should fail as well - index = new Engine.Index(newUid("1"), doc, seqNo, primaryTerm, 2L - , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - indexResult = replicaEngine.index(index); - assertThat(indexResult.getVersion(), equalTo(2L)); - assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); + final Engine.IndexResult replicaV2ReplayResult = replicaEngine.index(replicaV2Index); + assertFalse(replicaV2Result.hasFailure()); + assertFalse(replicaV1Result.isCreated()); + assertThat(replicaV2ReplayResult.getVersion(), equalTo(2L)); } public void testVersioningReplicaConflict2() { - ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null); - Engine.Index index = new Engine.Index(newUid("1"), doc); - Engine.IndexResult indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(1L)); + final ParsedDocument doc = testParsedDocument("1", "1", "test", null, testDocument(), B_1, null); + final Engine.Index v1Index = new Engine.Index(newUid("1"), doc); + final Engine.IndexResult v1Result = engine.index(v1Index); + assertThat(v1Result.getVersion(), equalTo(1L)); // apply the first index to the replica, should work fine - index = new Engine.Index(newUid("1"), doc, indexResult.getSeqNo(), index.primaryTerm(), 1L, - VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - indexResult = replicaEngine.index(index); - assertThat(indexResult.getVersion(), equalTo(1L)); + final Engine.Index replicaV1Index = new Engine.Index( + newUid("1"), + doc, + v1Result.getSeqNo(), + v1Index.primaryTerm(), + v1Result.getVersion(), + VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), + REPLICA, + 0, + -1, + false); + Engine.IndexResult replicaV1Result = replicaEngine.index(replicaV1Index); + assertThat(replicaV1Result.getVersion(), equalTo(1L)); // index it again - index = new Engine.Index(newUid("1"), doc); - indexResult = engine.index(index); - assertThat(indexResult.getVersion(), equalTo(2L)); + final Engine.Index v2Index = new Engine.Index(newUid("1"), doc); + final Engine.IndexResult v2Result = engine.index(v2Index); + assertThat(v2Result.getVersion(), equalTo(2L)); // now delete it - Engine.Delete delete = new Engine.Delete("test", "1", newUid("1")); - Engine.DeleteResult deleteResult = engine.delete(delete); + final Engine.Delete delete = new Engine.Delete("test", "1", newUid("1")); + final Engine.DeleteResult deleteResult = engine.delete(delete); assertThat(deleteResult.getVersion(), equalTo(3L)); // apply the delete on the replica (skipping the second index) - delete = new Engine.Delete("test", "1", newUid("1"), deleteResult.getSeqNo(), delete.primaryTerm(), 3L - , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0); - deleteResult = replicaEngine.delete(delete); - assertThat(deleteResult.getVersion(), equalTo(3L)); - - // second time delete with same version should fail - delete = new Engine.Delete("test", "1", newUid("1"), deleteResult.getSeqNo(), delete.primaryTerm(), 3L - , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0); - deleteResult = replicaEngine.delete(delete); - assertTrue(deleteResult.hasFailure()); - assertThat(deleteResult.getFailure(), instanceOf(VersionConflictEngineException.class)); - - // now do the second index on the replica, it should fail - index = new Engine.Index(newUid("1"), doc, deleteResult.getSeqNo(), delete.primaryTerm(), 2L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - indexResult = replicaEngine.index(index); - assertTrue(indexResult.hasFailure()); - assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); + final Engine.Delete replicaDelete = new Engine.Delete( + "test", + "1", + newUid("1"), + deleteResult.getSeqNo(), + delete.primaryTerm(), + deleteResult.getVersion(), + VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), + REPLICA, + 0); + final Engine.DeleteResult replicaDeleteResult = replicaEngine.delete(replicaDelete); + assertThat(replicaDeleteResult.getVersion(), equalTo(3L)); + + // second time delete with same version should just produce the same version + final Engine.DeleteResult deleteReplayResult = replicaEngine.delete(replicaDelete); + assertFalse(deleteReplayResult.hasFailure()); + assertTrue(deleteReplayResult.isFound()); + assertThat(deleteReplayResult.getVersion(), equalTo(3L)); + + // now do the second index on the replica, it should result in the current version + final Engine.Index replicaV2Index = new Engine.Index( + newUid("1"), + doc, + v2Result.getSeqNo(), + v2Index.primaryTerm(), + v2Result.getVersion(), + VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), + REPLICA, + 0, + -1, + false); + final Engine.IndexResult replicaV2Result = replicaEngine.index(replicaV2Index); + assertFalse(replicaV2Result.hasFailure()); + assertFalse(replicaV2Result.isCreated()); + assertThat(replicaV2Result.getVersion(), equalTo(3L)); } public void testBasicCreatedFlag() {