From 56b05172bc0bdc210737855ad398f7295240315a Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Wed, 22 Feb 2017 15:00:16 -0500 Subject: [PATCH 01/18] Replicate write failures Currently, when a primary write operation fails after generating a sequence number, the failure is not communicated to the replicas. Ideally, every operation which generates a sequence number on primary should be recorded in all replicas. In this change, a sequence number is associated with a write operation failure. When a failure with an assigned sequence number arrives at a replica, the failure cause and sequence number are recorded in the translog and the sequence number is marked as completed via executing `Engine.noOp` on the replica engine. --- .../action/bulk/BulkItemResponse.java | 38 ++++++++-- .../action/bulk/TransportShardBulkAction.java | 72 ++++++++++++------- .../elasticsearch/index/engine/Engine.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 34 ++++++--- 4 files changed, 104 insertions(+), 42 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index 2e2a7f1540108..396894ab2d29e 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -37,6 +37,8 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -171,17 +173,26 @@ public static class Failure implements Writeable, ToXContent { private final String id; private final Exception cause; private final RestStatus status; + private final long seqNo; - Failure(String index, String type, String id, 
Exception cause, RestStatus status) { + public Failure(String index, String type, String id, Exception cause) { + this(index, type, id, cause, ExceptionsHelper.status(cause), SequenceNumbersService.UNASSIGNED_SEQ_NO); + } + + public Failure(String index, String type, String id, Exception cause, RestStatus status) { + this(index, type, id, cause, status, SequenceNumbersService.UNASSIGNED_SEQ_NO); + } + public Failure(String index, String type, String id, Exception cause, long seqNo) { + this(index, type, id, cause, ExceptionsHelper.status(cause), seqNo); + } + + public Failure(String index, String type, String id, Exception cause, RestStatus status, long seqNo) { this.index = index; this.type = type; this.id = id; this.cause = cause; this.status = status; - } - - public Failure(String index, String type, String id, Exception cause) { - this(index, type, id, cause, ExceptionsHelper.status(cause)); + this.seqNo = seqNo; } /** @@ -193,6 +204,11 @@ public Failure(StreamInput in) throws IOException { id = in.readOptionalString(); cause = in.readException(); status = ExceptionsHelper.status(cause); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + seqNo = in.readLong(); + } else { + seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } } @Override @@ -201,6 +217,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(getType()); out.writeOptionalString(getId()); out.writeException(getCause()); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeLong(getSeqNo()); + } } @@ -246,6 +265,15 @@ public Exception getCause() { return cause; } + /** + * The operation sequence number generated by primary + * NOTE: {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} + * indicates sequence number was not generated by primary + */ + public long getSeqNo() { + return seqNo; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(INDEX_FIELD, 
index); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 697f4c2f9938e..38abb97f4a194 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -208,7 +208,8 @@ static BulkItemResponse createPrimaryResponse(BulkItemResultHolder bulkItemResul // Make sure to use request.index() here, if you // use docWriteRequest.index() it will use the // concrete index instead of an alias if used! - new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), failure)); + new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), + failure, operationResult.getSeqNo())); } else { assert replicaRequest.getPrimaryResponse() != null : "replica request must have a primary response"; return null; @@ -361,8 +362,7 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq static boolean shouldExecuteReplicaItem(final BulkItemRequest request, final int index) { final BulkItemResponse primaryResponse = request.getPrimaryResponse(); assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request ["+ request.request() +"]"; - return primaryResponse.isFailed() == false && - primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP; + return request.getPrimaryResponse().getResponse().getResult() != DocWriteResponse.Result.NOOP; } @Override @@ -372,33 +372,45 @@ public WriteReplicaResult shardOperationOnReplica(BulkShardReq BulkItemRequest item = request.items()[i]; if (shouldExecuteReplicaItem(item, i)) { DocWriteRequest docWriteRequest = item.request(); - DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse(); final Engine.Result operationResult; try { - switch 
(docWriteRequest.opType()) { - case CREATE: - case INDEX: - operationResult = executeIndexRequestOnReplica(primaryResponse, (IndexRequest) docWriteRequest, replica); - break; - case DELETE: - operationResult = executeDeleteRequestOnReplica(primaryResponse, (DeleteRequest) docWriteRequest, replica); - break; - default: - throw new IllegalStateException("Unexpected request operation type on replica: " - + docWriteRequest.opType().getLowercase()); + if (item.getPrimaryResponse().isFailed()) { + // execution on primary resulted in a failure + // if primary execution generated a sequence no, execute a noop on the replica engine to record it in the translog + final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure(); + operationResult = failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO + ? executeNoOpRequestOnReplica(failure, docWriteRequest, replica) + : null; + } else { + final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse(); + switch (docWriteRequest.opType()) { + case CREATE: + case INDEX: + operationResult = executeIndexRequestOnReplica(primaryResponse, (IndexRequest) docWriteRequest, replica); + break; + case DELETE: + operationResult = executeDeleteRequestOnReplica(primaryResponse, (DeleteRequest) docWriteRequest, replica); + break; + default: + throw new IllegalStateException("Unexpected request operation type on replica: " + + docWriteRequest.opType().getLowercase()); + } + assert operationResult != null : "operation result must never be null when primary response has no failure"; } - if (operationResult.hasFailure()) { - // check if any transient write operation failures should be bubbled up - Exception failure = operationResult.getFailure(); - assert failure instanceof VersionConflictEngineException - || failure instanceof MapperParsingException - : "expected any one of [version conflict, mapper parsing, engine closed, index shard closed]" + - " failures. 
got " + failure; - if (!TransportActions.isShardNotAvailableException(failure)) { - throw failure; + if (operationResult != null) { + if (operationResult.hasFailure()) { + // check if any transient write operation failures should be bubbled up + Exception failure = operationResult.getFailure(); + assert failure instanceof VersionConflictEngineException + || failure instanceof MapperParsingException + : "expected any one of [version conflict, mapper parsing, engine closed, index shard closed]" + + " failures. got " + failure; + if (!TransportActions.isShardNotAvailableException(failure)) { + throw failure; + } + } else { + location = locationToSync(location, operationResult.getTranslogLocation()); } - } else { - location = locationToSync(location, operationResult.getTranslogLocation()); } } catch (Exception e) { // if its not an ignore replica failure, we need to make sure to bubble up the failure @@ -533,6 +545,14 @@ private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteRespons return replica.delete(delete); } + private Engine.NoOpResult executeNoOpRequestOnReplica(BulkItemResponse.Failure primaryFailure, DocWriteRequest docWriteRequest, IndexShard replica) throws IOException { + final long version = docWriteRequest.version(); + final VersionType versionType = docWriteRequest.versionType().versionTypeForReplicationAndRecovery(); + final Engine.NoOp noOp = replica.prepareNoOpOnReplica(docWriteRequest.type(), docWriteRequest.id(), + primaryFailure.getSeqNo(), version, versionType, primaryFailure.getMessage()); + return replica.noOp(noOp); + } + class ConcreteMappingUpdatePerformer implements MappingUpdatePerformer { public void updateMappings(final Mapping update, final ShardId shardId, diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 59655abf2894c..43c0b2bdd61e8 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ 
b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -432,7 +432,7 @@ public boolean isFound() { } - static class NoOpResult extends Result { + public static class NoOpResult extends Result { NoOpResult(long seqNo) { super(Operation.TYPE.NO_OP, 0, seqNo); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 1dee58ced002b..a9dd647853f06 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -569,12 +569,22 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc return result; } + public Engine.NoOp prepareNoOpOnReplica(String type, String id, long seqNo, long version, VersionType versionType, String reason) { + verifyReplicationTarget(); + final Term uid = extractUid(type, id); + long startTime = System.nanoTime(); + return new Engine.NoOp(uid, seqNo, primaryTerm, version, versionType, Engine.Operation.Origin.REPLICA, startTime, reason); + } + + public Engine.NoOpResult noOp(Engine.NoOp noOp) throws IOException { + ensureWriteAllowed(noOp); + Engine engine = getEngine(); + return engine.noOp(noOp); + } + public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) { verifyPrimary(); - final DocumentMapper documentMapper = docMapper(type).getDocumentMapper(); - final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType(); - final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null); - final Term uid = MappedFieldType.extractTerm(uidQuery); + final Term uid = extractUid(type, id); return prepareDelete(type, id, uid, SequenceNumbersService.UNASSIGNED_SEQ_NO, primaryTerm, version, versionType, Engine.Operation.Origin.PRIMARY); } @@ -582,15 +592,12 @@ public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version public Engine.Delete 
prepareDeleteOnReplica(String type, String id, long seqNo, long primaryTerm, long version, VersionType versionType) { verifyReplicationTarget(); - final DocumentMapper documentMapper = docMapper(type).getDocumentMapper(); - final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType(); - final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null); - final Term uid = MappedFieldType.extractTerm(uidQuery); + final Term uid = extractUid(type, id); return prepareDelete(type, id, uid, seqNo, primaryTerm, version, versionType, Engine.Operation.Origin.REPLICA); } - static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, - VersionType versionType, Engine.Operation.Origin origin) { + private static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, + VersionType versionType, Engine.Operation.Origin origin) { long startTime = System.nanoTime(); return new Engine.Delete(type, id, uid, seqNo, primaryTerm, version, versionType, origin, startTime); } @@ -601,6 +608,13 @@ public Engine.DeleteResult delete(Engine.Delete delete) throws IOException { return delete(engine, delete); } + private Term extractUid(String type, String id) { + final DocumentMapper documentMapper = docMapper(type).getDocumentMapper(); + final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType(); + final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null); + return MappedFieldType.extractTerm(uidQuery); + } + private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws IOException { active.set(true); final Engine.DeleteResult result; From e8e999d2d3c9298e5425018f776951df5ada5b81 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Thu, 2 Mar 2017 13:26:21 -0500 Subject: [PATCH 02/18] use zlong to serialize seq_no --- .../java/org/elasticsearch/action/bulk/BulkItemResponse.java | 4 ++-- 1 file changed, 2 insertions(+), 
2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index 396894ab2d29e..18ff938da12d2 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -205,7 +205,7 @@ public Failure(StreamInput in) throws IOException { cause = in.readException(); status = ExceptionsHelper.status(cause); if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { - seqNo = in.readLong(); + seqNo = in.readZLong(); } else { seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; } @@ -218,7 +218,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(getId()); out.writeException(getCause()); if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { - out.writeLong(getSeqNo()); + out.writeZLong(getSeqNo()); } } From 02b7edacd19ed922082cb01853f449ca64c24c08 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Mon, 27 Mar 2017 13:46:53 -0400 Subject: [PATCH 03/18] Incorporate feedback --- .../action/bulk/TransportShardBulkAction.java | 34 ++++++--------- .../elasticsearch/index/engine/Engine.java | 26 ++++++++---- .../elasticsearch/index/shard/IndexShard.java | 4 +- .../shard/TranslogRecoveryPerformer.java | 4 +- .../index/engine/InternalEngineTests.java | 4 +- .../elasticsearch/backwards/IndexingIT.java | 41 +++++++++++-------- 6 files changed, 56 insertions(+), 57 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 38abb97f4a194..eb5923e85ea5a 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -71,7 +71,6 @@ import java.io.IOException; import java.util.Map; 
-import java.util.Objects; import java.util.function.LongSupplier; /** Performs shard-level bulk (index, delete or update) operations */ @@ -362,7 +361,7 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq static boolean shouldExecuteReplicaItem(final BulkItemRequest request, final int index) { final BulkItemResponse primaryResponse = request.getPrimaryResponse(); assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request ["+ request.request() +"]"; - return request.getPrimaryResponse().getResponse().getResult() != DocWriteResponse.Result.NOOP; + return primaryResponse.isFailed() == false && primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP; } @Override @@ -378,9 +377,7 @@ public WriteReplicaResult shardOperationOnReplica(BulkShardReq // execution on primary resulted in a failure // if primary execution generated a sequence no, execute a noop on the replica engine to record it in the translog final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure(); - operationResult = failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO - ? 
executeNoOpRequestOnReplica(failure, docWriteRequest, replica) - : null; + operationResult = executeFailedSeqNoOnReplica(failure, docWriteRequest, replica); } else { final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse(); switch (docWriteRequest.opType()) { @@ -397,20 +394,15 @@ public WriteReplicaResult shardOperationOnReplica(BulkShardReq } assert operationResult != null : "operation result must never be null when primary response has no failure"; } - if (operationResult != null) { - if (operationResult.hasFailure()) { - // check if any transient write operation failures should be bubbled up - Exception failure = operationResult.getFailure(); - assert failure instanceof VersionConflictEngineException - || failure instanceof MapperParsingException - : "expected any one of [version conflict, mapper parsing, engine closed, index shard closed]" + - " failures. got " + failure; - if (!TransportActions.isShardNotAvailableException(failure)) { - throw failure; - } - } else { - location = locationToSync(location, operationResult.getTranslogLocation()); + if (operationResult.hasFailure()) { + // check if any transient write operation failures should be bubbled up + Exception failure = operationResult.getFailure(); + assert failure instanceof MapperParsingException : "expected mapper parsing failures. 
got " + failure; + if (!TransportActions.isShardNotAvailableException(failure)) { + throw failure; } + } else { + location = locationToSync(location, operationResult.getTranslogLocation()); } } catch (Exception e) { // if its not an ignore replica failure, we need to make sure to bubble up the failure @@ -545,11 +537,9 @@ private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteRespons return replica.delete(delete); } - private Engine.NoOpResult executeNoOpRequestOnReplica(BulkItemResponse.Failure primaryFailure, DocWriteRequest docWriteRequest, IndexShard replica) throws IOException { - final long version = docWriteRequest.version(); - final VersionType versionType = docWriteRequest.versionType().versionTypeForReplicationAndRecovery(); + private Engine.NoOpResult executeFailedSeqNoOnReplica(BulkItemResponse.Failure primaryFailure, DocWriteRequest docWriteRequest, IndexShard replica) throws IOException { final Engine.NoOp noOp = replica.prepareNoOpOnReplica(docWriteRequest.type(), docWriteRequest.id(), - primaryFailure.getSeqNo(), version, versionType, primaryFailure.getMessage()); + primaryFailure.getSeqNo(), primaryFailure.getMessage()); return replica.noOp(noOp); } diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 43c0b2bdd61e8..9876c0fdb4b97 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1155,15 +1155,13 @@ public String reason() { } public NoOp( - final Term uid, - final long seqNo, - final long primaryTerm, - final long version, - final VersionType versionType, - final Origin origin, - final long startTime, - final String reason) { - super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); + final Term uid, + final long seqNo, + final long primaryTerm, + final Origin origin, + final long startTime, + final String reason) { + super(uid, 
seqNo, primaryTerm, 0, null, origin, startTime); this.reason = reason; } @@ -1172,6 +1170,16 @@ public String type() { throw new UnsupportedOperationException(); } + @Override + public long version() { + throw new UnsupportedOperationException(); + } + + @Override + public VersionType versionType() { + throw new UnsupportedOperationException(); + } + @Override String id() { throw new UnsupportedOperationException(); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index a9dd647853f06..a97cf52efa360 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -569,11 +569,11 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc return result; } - public Engine.NoOp prepareNoOpOnReplica(String type, String id, long seqNo, long version, VersionType versionType, String reason) { + public Engine.NoOp prepareNoOpOnReplica(String type, String id, long seqNo, String reason) { verifyReplicationTarget(); final Term uid = extractUid(type, id); long startTime = System.nanoTime(); - return new Engine.NoOp(uid, seqNo, primaryTerm, version, versionType, Engine.Operation.Origin.REPLICA, startTime, reason); + return new Engine.NoOp(uid, seqNo, primaryTerm, Engine.Operation.Origin.REPLICA, startTime, reason); } public Engine.NoOpResult noOp(Engine.NoOp noOp) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java index d5aadc1664ea4..9801df664a986 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import 
org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException; import org.elasticsearch.index.mapper.DocumentMapperForType; @@ -31,7 +30,6 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.rest.RestStatus; @@ -182,7 +180,7 @@ private void performRecoveryOperation(Engine engine, Translog.Operation operatio final String reason = noOp.reason(); logger.trace("[translog] recover [no_op] op [({}, {})] of [{}]", seqNo, primaryTerm, reason); final Engine.NoOp engineNoOp = - new Engine.NoOp(null, seqNo, primaryTerm, 0, VersionType.INTERNAL, origin, System.nanoTime(), reason); + new Engine.NoOp(null, seqNo, primaryTerm, origin, System.nanoTime(), reason); noOp(engine, engineNoOp); break; default: diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 71d754ddfb6ca..cbce67b21618c 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3675,9 +3675,7 @@ public long generateSeqNo() { null, maxSeqNo + 1, primaryTerm, - 0, - VersionType.INTERNAL, - randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY), + randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY), System.nanoTime(), reason)); assertThat(noOpEngine.seqNoService().getLocalCheckpoint(), equalTo((long) (maxSeqNo + 1))); diff --git a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java 
b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java index f0be775306725..3a8a288df1ce5 100644 --- a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -76,7 +76,7 @@ private int indexDocs(String index, final int idStart, final int numDocs) throws for (int i = 0; i < numDocs; i++) { final int id = idStart + i; assertOK(client().performRequest("PUT", index + "/test/" + id, emptyMap(), - new StringEntity("{\"test\": \"test_" + id + "\"}", ContentType.APPLICATION_JSON))); + new StringEntity("{\"test\": \"test_" + randomAsciiOfLength(2) + "\"}", ContentType.APPLICATION_JSON))); } return numDocs; } @@ -116,7 +116,7 @@ public void testIndexVersionPropagation() throws Exception { .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2) .put("index.routing.allocation.include._name", bwcNames); - final String index = "test"; + final String index = "indexversionprop"; final int minUpdates = 5; final int maxUpdates = 10; createIndex(index, settings.build()); @@ -130,7 +130,9 @@ public void testIndexVersionPropagation() throws Exception { updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); ensureGreen(); assertOK(client().performRequest("POST", index + "/_refresh")); - List shards = buildShards(nodes, newNodeClient); + List shards = buildShards(index, nodes, newNodeClient); + Shard primary = buildShards(index, nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); + logger.info("primary resolved to: " + primary.getNode().getNodeName()); for (Shard shard : shards) { assertVersion(index, 1, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc1); assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 1); @@ -140,13 +142,15 @@ public void testIndexVersionPropagation() throws Exception { 
logger.info("indexing docs with [{}] concurrent updates after allowing shards on all nodes", nUpdates); final int finalVersionForDoc2 = indexDocWithConcurrentUpdates(index, 2, nUpdates); assertOK(client().performRequest("POST", index + "/_refresh")); - shards = buildShards(nodes, newNodeClient); + shards = buildShards(index, nodes, newNodeClient); + primary = shards.stream().filter(Shard::isPrimary).findFirst().get(); + logger.info("primary resolved to: " + primary.getNode().getNodeName()); for (Shard shard : shards) { assertVersion(index, 2, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc2); assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 2); } - Shard primary = buildShards(nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); + primary = shards.stream().filter(Shard::isPrimary).findFirst().get(); logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName()); updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName())); ensureGreen(); @@ -154,7 +158,7 @@ public void testIndexVersionPropagation() throws Exception { logger.info("indexing docs with [{}] concurrent updates after moving primary", nUpdates); final int finalVersionForDoc3 = indexDocWithConcurrentUpdates(index, 3, nUpdates); assertOK(client().performRequest("POST", index + "/_refresh")); - shards = buildShards(nodes, newNodeClient); + shards = buildShards(index, nodes, newNodeClient); for (Shard shard : shards) { assertVersion(index, 3, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc3); assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 3); @@ -167,7 +171,7 @@ public void testIndexVersionPropagation() throws Exception { logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 0", nUpdates); final int finalVersionForDoc4 = indexDocWithConcurrentUpdates(index, 4, nUpdates); 
assertOK(client().performRequest("POST", index + "/_refresh")); - shards = buildShards(nodes, newNodeClient); + shards = buildShards(index, nodes, newNodeClient); for (Shard shard : shards) { assertVersion(index, 4, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc4); assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 4); @@ -180,7 +184,7 @@ public void testIndexVersionPropagation() throws Exception { logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 1", nUpdates); final int finalVersionForDoc5 = indexDocWithConcurrentUpdates(index, 5, nUpdates); assertOK(client().performRequest("POST", index + "/_refresh")); - shards = buildShards(nodes, newNodeClient); + shards = buildShards(index, nodes, newNodeClient); for (Shard shard : shards) { assertVersion(index, 5, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc5); assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 5); @@ -216,7 +220,7 @@ public void testSeqNoCheckpoints() throws Exception { final int numberOfInitialDocs = 1 + randomInt(5); logger.info("indexing [{}] docs initially", numberOfInitialDocs); numDocs += indexDocs(index, 0, numberOfInitialDocs); - assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient); + assertSeqNoOnShards(index, nodes, checkGlobalCheckpoints, 0, newNodeClient); logger.info("allowing shards on all nodes"); updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); ensureGreen(); @@ -227,8 +231,8 @@ public void testSeqNoCheckpoints() throws Exception { final int numberOfDocsAfterAllowingShardsOnAllNodes = 1 + randomInt(5); logger.info("indexing [{}] docs after allowing shards on all nodes", numberOfDocsAfterAllowingShardsOnAllNodes); numDocs += indexDocs(index, numDocs, numberOfDocsAfterAllowingShardsOnAllNodes); - assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient); - Shard primary = buildShards(nodes, 
newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); + assertSeqNoOnShards(index, nodes, checkGlobalCheckpoints, 0, newNodeClient); + Shard primary = buildShards(index, nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName()); updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName())); ensureGreen(); @@ -237,7 +241,7 @@ public void testSeqNoCheckpoints() throws Exception { logger.info("indexing [{}] docs after moving primary", numberOfDocsAfterMovingPrimary); numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterMovingPrimary); numDocs += numberOfDocsAfterMovingPrimary; - assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient); + assertSeqNoOnShards(index, nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient); /* * Dropping the number of replicas to zero, and then increasing it to one triggers a recovery thus exercising any BWC-logic in * the recovery code. 
@@ -255,7 +259,7 @@ public void testSeqNoCheckpoints() throws Exception { // the number of documents on the primary and on the recovered replica should match the number of indexed documents assertCount(index, "_primary", numDocs); assertCount(index, "_replica", numDocs); - assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient); + assertSeqNoOnShards(index, nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient); } } @@ -274,10 +278,11 @@ private void assertVersion(final String index, final int docId, final String pre assertThat("version mismatch for doc [" + docId + "] preference [" + preference + "]", actualVersion, equalTo(expectedVersion)); } - private void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) throws Exception { + private void assertSeqNoOnShards(String index, Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) + throws Exception { assertBusy(() -> { try { - List shards = buildShards(nodes, client); + List shards = buildShards(index, nodes, client); Shard primaryShard = shards.stream().filter(Shard::isPrimary).findFirst().get(); assertNotNull("failed to find primary shard", primaryShard); final long expectedGlobalCkp; @@ -311,9 +316,9 @@ private void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, in }); } - private List buildShards(Nodes nodes, RestClient client) throws IOException { - Response response = client.performRequest("GET", "test/_stats", singletonMap("level", "shards")); - List shardStats = ObjectPath.createFromResponse(response).evaluate("indices.test.shards.0"); + private List buildShards(String index, Nodes nodes, RestClient client) throws IOException { + Response response = client.performRequest("GET", index + "/_stats", singletonMap("level", "shards")); + List shardStats = ObjectPath.createFromResponse(response).evaluate("indices." 
+ index + ".shards.0"); ArrayList shards = new ArrayList<>(); for (Object shard : shardStats) { final String nodeId = ObjectPath.evaluate(shard, "routing.node"); From 5e829bbe99ba61e3e2d0addced54fafb0bcc04dc Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Tue, 28 Mar 2017 12:11:46 -0400 Subject: [PATCH 04/18] track write failures in translog as a noop in primary --- .../elasticsearch/index/engine/Engine.java | 1 - .../index/engine/InternalEngine.java | 25 +++++++++++++------ 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 9876c0fdb4b97..34854dc294057 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -363,7 +363,6 @@ public Operation.TYPE getOperationType() { void setTranslogLocation(Translog.Location translogLocation) { if (freeze.get() == null) { - assert failure == null : "failure has to be null to set translog location"; this.translogLocation = translogLocation; } else { throw new IllegalStateException("result is already frozen"); diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0bed51e0e24a1..fef5f39cfa5c7 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -614,10 +614,14 @@ public IndexResult index(Index index) throws IOException { indexResult = new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); } - if (indexResult.hasFailure() == false && - index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { - Translog.Location location = - translog.add(new Translog.Index(index, indexResult)); + if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY + && 
indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + final Translog.Location location; + if (indexResult.hasFailure() == false) { + location = translog.add(new Translog.Index(index, indexResult)); + } else { + location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage())); + } indexResult.setTranslogLocation(location); } if (indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { @@ -900,10 +904,15 @@ public DeleteResult delete(Delete delete) throws IOException { deleteResult = new DeleteResult(plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false); } - if (!deleteResult.hasFailure() && - delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { - Translog.Location location = - translog.add(new Translog.Delete(delete, deleteResult)); + if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY + && deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + final Translog.Location location; + if (deleteResult.hasFailure() == false) { + location = translog.add(new Translog.Delete(delete, deleteResult)); + } else { + location = translog.add(new Translog.NoOp(deleteResult.getSeqNo(), + delete.primaryTerm(), deleteResult.getFailure().getMessage())); + } deleteResult.setTranslogLocation(location); } if (deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { From 3b67b88937ffd1564eb3f07c00baeaa8056ae6bf Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Wed, 29 Mar 2017 12:25:35 -0400 Subject: [PATCH 05/18] Add tests for replicating write failures. 
Test that document failures (w/ seq no generated) are recorded as no-ops in the translog for primary and replica shards --- .../action/bulk/BulkItemRequest.java | 9 +- .../action/bulk/TransportShardBulkAction.java | 52 +++++++-- .../index/engine/InternalEngine.java | 7 +- .../index/engine/InternalEngineTests.java | 5 +- .../ESIndexLevelReplicationTestCase.java | 110 ++++++++---------- .../IndexLevelReplicationTests.java | 79 ++++++++++++- .../RecoveryDuringReplicationTests.java | 6 +- 7 files changed, 184 insertions(+), 84 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java index 3023ecb1856a4..50da1476f49f3 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java @@ -38,7 +38,8 @@ public class BulkItemRequest implements Streamable { } - protected BulkItemRequest(int id, DocWriteRequest request) { + // NOTE: public for testing only + public BulkItemRequest(int id, DocWriteRequest request) { this.id = id; this.request = request; } @@ -56,13 +57,11 @@ public String index() { return request.indices()[0]; } - // NOTE: protected for testing only - protected BulkItemResponse getPrimaryResponse() { + BulkItemResponse getPrimaryResponse() { return primaryResponse; } - // NOTE: protected for testing only - protected void setPrimaryResponse(BulkItemResponse primaryResponse) { + void setPrimaryResponse(BulkItemResponse primaryResponse) { this.primaryResponse = primaryResponse; } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index eb5923e85ea5a..3a75f5df33d80 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ 
-112,12 +112,36 @@ protected boolean resolveIndex() { @Override public WritePrimaryResult shardOperationOnPrimary( BulkShardRequest request, IndexShard primary) throws Exception { + final BulkShardResult shardResult = performOnPrimary(request, primary, updateHelper, + threadPool::absoluteTimeInMillis, new ConcreteMappingUpdatePerformer()); + return new WritePrimaryResult<>(shardResult.request, shardResult.response, + shardResult.location, null, primary, logger); + } + + /** Result holder of executing shard bulk on primary */ + public static class BulkShardResult { + public final BulkShardRequest request; + public final BulkShardResponse response; + public final Translog.Location location; + + private BulkShardResult(BulkShardRequest request, BulkShardResponse response, Translog.Location location) { + this.request = request; + this.response = response; + this.location = location; + } + } + + public static BulkShardResult performOnPrimary( + BulkShardRequest request, + IndexShard primary, + UpdateHelper updateHelper, + LongSupplier nowInMillisSupplier, + MappingUpdatePerformer mappingUpdater) throws Exception { final IndexMetaData metaData = primary.indexSettings().getIndexMetaData(); Translog.Location location = null; - final MappingUpdatePerformer mappingUpdater = new ConcreteMappingUpdatePerformer(); for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) { location = executeBulkItemRequest(metaData, primary, request, location, requestIndex, - updateHelper, threadPool::absoluteTimeInMillis, mappingUpdater); + updateHelper, nowInMillisSupplier, mappingUpdater); } BulkItemResponse[] responses = new BulkItemResponse[request.items().length]; BulkItemRequest[] items = request.items(); @@ -125,10 +149,9 @@ public WritePrimaryResult shardOperationOnP responses[i] = items[i].getPrimaryResponse(); } BulkShardResponse response = new BulkShardResponse(request.shardId(), responses); - return new WritePrimaryResult<>(request, response, location, null, 
primary, logger); + return new BulkShardResult(request, response, location); } - private static BulkItemResultHolder executeIndexRequest(final IndexRequest indexRequest, final BulkItemRequest bulkItemRequest, final IndexShard primary, @@ -221,7 +244,7 @@ static Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSha BulkShardRequest request, Translog.Location location, int requestIndex, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, - final MappingUpdatePerformer mappingUpdater) throws Exception { + final MappingUpdatePerformer mappingUpdater) throws Exception { final DocWriteRequest itemRequest = request.items()[requestIndex].request(); final DocWriteRequest.OpType opType = itemRequest.opType(); final BulkItemResultHolder responseHolder; @@ -360,12 +383,22 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq static boolean shouldExecuteReplicaItem(final BulkItemRequest request, final int index) { final BulkItemResponse primaryResponse = request.getPrimaryResponse(); - assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request ["+ request.request() +"]"; - return primaryResponse.isFailed() == false && primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP; + assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request [" + request.request() + "]"; + if (primaryResponse.isFailed()) { + return primaryResponse.getFailure().getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO; + } else { + // NOTE: for bwc as pre-6.0 write requests has unassigned seq no + return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP; + } } @Override public WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { + final Translog.Location location = performOnReplica(request, replica); + return new WriteReplicaResult<>(request, location, null, replica, 
logger); + } + + public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { Translog.Location location = null; for (int i = 0; i < request.items().length; i++) { BulkItemRequest item = request.items()[i]; @@ -377,6 +410,7 @@ public WriteReplicaResult shardOperationOnReplica(BulkShardReq // execution on primary resulted in a failure // if primary execution generated a sequence no, execute a noop on the replica engine to record it in the translog final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure(); + assert failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "seq no must be assigned"; operationResult = executeFailedSeqNoOnReplica(failure, docWriteRequest, replica); } else { final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse(); @@ -413,7 +447,7 @@ public WriteReplicaResult shardOperationOnReplica(BulkShardReq } } } - return new WriteReplicaResult<>(request, location, null, replica, logger); + return location; } private static Translog.Location locationToSync(Translog.Location current, @@ -537,7 +571,7 @@ private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteRespons return replica.delete(delete); } - private Engine.NoOpResult executeFailedSeqNoOnReplica(BulkItemResponse.Failure primaryFailure, DocWriteRequest docWriteRequest, IndexShard replica) throws IOException { + private static Engine.NoOpResult executeFailedSeqNoOnReplica(BulkItemResponse.Failure primaryFailure, DocWriteRequest docWriteRequest, IndexShard replica) throws IOException { final Engine.NoOp noOp = replica.prepareNoOpOnReplica(docWriteRequest.type(), docWriteRequest.id(), primaryFailure.getSeqNo(), primaryFailure.getMessage()); return replica.noOp(noOp); diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index fef5f39cfa5c7..dfd85359ae250 100644 --- 
a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -620,7 +620,8 @@ public IndexResult index(Index index) throws IOException { if (indexResult.hasFailure() == false) { location = translog.add(new Translog.Index(index, indexResult)); } else { - location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage())); + // if we have document failure, record it as a no-op in the translog with the generated seq_no + location = translog.add(new Translog.NoOp(plan.seqNoForIndexing, index.primaryTerm(), indexResult.getFailure().getMessage())); } indexResult.setTranslogLocation(location); } @@ -753,7 +754,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) * we return a `MATCH_ANY` version to indicate no document was index. The value is * not used anyway */ - return new IndexResult(ex, Versions.MATCH_ANY, index.seqNo()); + return new IndexResult(ex, Versions.MATCH_ANY, plan.seqNoForIndexing); } else { throw ex; } @@ -910,7 +911,7 @@ public DeleteResult delete(Delete delete) throws IOException { if (deleteResult.hasFailure() == false) { location = translog.add(new Translog.Delete(delete, deleteResult)); } else { - location = translog.add(new Translog.NoOp(deleteResult.getSeqNo(), + location = translog.add(new Translog.NoOp(plan.seqNoOfDeletion, delete.primaryTerm(), deleteResult.getFailure().getMessage())); } deleteResult.setTranslogLocation(location); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index cbce67b21618c..5408834349f86 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2857,10 +2857,13 @@ public void testHandleDocumentFailure() throws Exception { } 
Engine.IndexResult indexResult = engine.index(indexForDoc(doc1)); assertNotNull(indexResult.getFailure()); - + // document failures should be recorded in translog + assertNotNull(indexResult.getTranslogLocation()); throwingIndexWriter.get().clearFailure(); indexResult = engine.index(indexForDoc(doc1)); assertNull(indexResult.getFailure()); + // document failures should be recorded in translog + assertNotNull(indexResult.getTranslogLocation()); engine.index(indexForDoc(doc2)); // test failure while deleting diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index c35f72d208533..95d070b38db30 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -22,16 +22,16 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.bulk.BulkItemRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.bulk.BulkShardResponse; +import org.elasticsearch.action.bulk.TransportShardBulkAction; +import org.elasticsearch.action.bulk.TransportShardBulkAction.BulkShardResult; import org.elasticsearch.action.bulk.TransportShardBulkActionTests; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.ReplicationOperation; import 
org.elasticsearch.action.support.replication.ReplicationRequest; @@ -50,7 +50,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; @@ -58,6 +57,7 @@ import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; @@ -77,8 +77,6 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static org.elasticsearch.action.bulk.TransportShardBulkAction.executeIndexRequestOnPrimary; -import static org.elasticsearch.action.bulk.TransportShardBulkAction.executeIndexRequestOnReplica; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -147,9 +145,13 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { public int indexDocs(final int numOfDoc) throws Exception { for (int doc = 0; doc < numOfDoc; doc++) { final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", Integer.toString(docId.incrementAndGet())) - .source("{}", XContentType.JSON); - final IndexResponse response = index(indexRequest); - assertEquals(DocWriteResponse.Result.CREATED, response.getResult()); + .source("{}", XContentType.JSON); + final BulkItemResponse response = index(indexRequest); + if (response.isFailed()) { + throw response.getFailure().getCause(); + } else if (response.isFailed() == false) { + assertEquals(DocWriteResponse.Result.CREATED, response.getResponse().getResult()); + } } primary.updateGlobalCheckpointOnPrimary(); return numOfDoc; @@ -158,43 +160,29 
@@ public int indexDocs(final int numOfDoc) throws Exception { public int appendDocs(final int numOfDoc) throws Exception { for (int doc = 0; doc < numOfDoc; doc++) { final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON); - final IndexResponse response = index(indexRequest); - assertEquals(DocWriteResponse.Result.CREATED, response.getResult()); + final BulkItemResponse response = index(indexRequest); + if (response.isFailed()) { + throw response.getFailure().getCause(); + } else if (response.isFailed() == false) { + assertEquals(DocWriteResponse.Result.CREATED, response.getResponse().getResult()); + } } primary.updateGlobalCheckpointOnPrimary(); return numOfDoc; } - public IndexResponse index(IndexRequest indexRequest) throws Exception { - PlainActionFuture listener = new PlainActionFuture<>(); + public BulkItemResponse index(IndexRequest indexRequest) throws Exception { + PlainActionFuture listener = new PlainActionFuture<>(); final ActionListener wrapBulkListener = ActionListener.wrap( - bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0].getResponse()), + bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]), listener::onFailure); BulkItemRequest[] items = new BulkItemRequest[1]; - items[0] = new TestBulkItemRequest(0, indexRequest); + items[0] = new BulkItemRequest(0, indexRequest); BulkShardRequest request = new BulkShardRequest(shardId, indexRequest.getRefreshPolicy(), items); new IndexingAction(request, wrapBulkListener, this).execute(); return listener.get(); } - /** BulkItemRequest exposing get/set primary response */ - public class TestBulkItemRequest extends BulkItemRequest { - - TestBulkItemRequest(int id, DocWriteRequest request) { - super(id, request); - } - - @Override - protected void setPrimaryResponse(BulkItemResponse primaryResponse) { - super.setPrimaryResponse(primaryResponse); - } - - @Override - protected BulkItemResponse 
getPrimaryResponse() { - return super.getPrimaryResponse(); - } - } - public synchronized void startAll() throws IOException { startReplicas(replicas.size()); } @@ -442,7 +430,7 @@ protected Set getInSyncAllocationIds(ShardId shardId, ClusterState clust protected abstract PrimaryResult performOnPrimary(IndexShard primary, Request request) throws Exception; - protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica) throws IOException; + protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica) throws Exception; class PrimaryRef implements ReplicationOperation.Primary { @@ -539,46 +527,50 @@ class IndexingAction extends ReplicationAction mappings = - Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}"); + Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}"); try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(2, mappings))) { shards.startAll(); IndexShard replica1 = shards.getReplicas().get(0); @@ -180,4 +188,67 @@ public void testConflictingOpsOnReplica() throws Exception { } } } + + /** + * test document failures (failures after seq_no generation) are added as noop operation to the translog + * for primary and replica shards + */ + public void testDocumentFailureReplication() throws Exception { + IndexMetaData metaData = buildIndexMetaData(1); + final ReplicationGroup replicationGroupWithDocumentFailureOnPrimary = new ReplicationGroup(metaData) { + @Override + protected EngineFactory getEngineFactory(ShardRouting routing) { + if (routing.primary()) { + return config -> InternalEngineTests.createInternalEngine((directory, writerConfig) -> + new IndexWriter(directory, writerConfig) { + @Override + public long addDocument(Iterable doc) throws IOException { + throw new IOException("simulated document failure"); + } + }, null, config); + } else { + return null; + } + } + }; + 
replicationGroupWithDocumentFailureOnPrimary.startAll(); + final BulkItemResponse response = replicationGroupWithDocumentFailureOnPrimary.index( + new IndexRequest(index.getName(), "testDocumentFailureReplication", "1") + .source("{}", XContentType.JSON) + ); + assertTrue(response.isFailed()); + for (IndexShard indexShard : replicationGroupWithDocumentFailureOnPrimary) { + try(Translog.View view = indexShard.acquireTranslogView()) { + assertThat(view.totalOperations(), equalTo(1)); + Translog.Operation op = view.snapshot().next(); + assertThat(op.opType(), equalTo(Translog.Operation.Type.NO_OP)); + assertThat(op.seqNo(), equalTo(0L)); + assertThat(((Translog.NoOp) op).reason(), containsString("simulated document failure")); + } + } + replicationGroupWithDocumentFailureOnPrimary.assertAllEqual(0); + replicationGroupWithDocumentFailureOnPrimary.close(); + } + + /** + * test request failures (failures before seq_no generation) are not added as a noop to translog + */ + public void testRequestFailureReplication() throws Exception { + try (ReplicationGroup shards = createGroup(1)) { + shards.startAll(); + final BulkItemResponse response = shards.index( + new IndexRequest(index.getName(), "testRequestFailureException", "1") + .source("{}", XContentType.JSON) + .version(2) + ); + assertTrue(response.isFailed()); + assertThat(response.getFailure().getCause(), instanceOf(VersionConflictEngineException.class)); + shards.assertAllEqual(0); + for (IndexShard indexShard : shards) { + try(Translog.View view = indexShard.acquireTranslogView()) { + assertThat(view.totalOperations(), equalTo(0)); + } + } + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 12f749e681918..ea504b5bd13f9 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ 
b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -24,9 +24,9 @@ import org.apache.lucene.index.IndexableField; import org.apache.lucene.util.IOUtils; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.index.engine.Engine; @@ -168,8 +168,8 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { for (int i = 0; i < rollbackDocs; i++) { final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "rollback_" + i) .source("{}", XContentType.JSON); - final IndexResponse primaryResponse = indexOnPrimary(indexRequest, oldPrimary); - indexOnReplica(primaryResponse, indexRequest, replica); + final TransportShardBulkAction.BulkShardResult bulkShardResult = indexOnPrimary(indexRequest, oldPrimary); + indexOnReplica(bulkShardResult, replica); } if (randomBoolean()) { oldPrimary.flush(new FlushRequest(index.getName())); From c6384c83fc4a1e57ffdc388dcc28a1f8a6c526cc Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Thu, 30 Mar 2017 15:06:41 -0400 Subject: [PATCH 06/18] Update to master --- .../action/bulk/TransportShardBulkAction.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 3a75f5df33d80..9f2e659c1185a 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -43,7 +43,6 @@ import 
org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; @@ -467,7 +466,7 @@ private static Translog.Location locationToSync(Translog.Location current, * Execute the given {@link IndexRequest} on a replica shard, throwing a * {@link RetryOnReplicaException} if the operation needs to be re-tried. */ - public static Engine.IndexResult executeIndexRequestOnReplica( + private static Engine.IndexResult executeIndexRequestOnReplica( DocWriteResponse primaryResponse, IndexRequest request, IndexShard replica) throws IOException { @@ -510,7 +509,7 @@ static Engine.Index prepareIndexOperationOnReplica( } /** Utility method to prepare an index operation on primary shards */ - static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) { + private static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) { final SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source(), request.getContentType()) @@ -520,8 +519,8 @@ static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexSh } /** Executes index operation on primary shard after updates mapping if dynamic mappings are found */ - public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary, - MappingUpdatePerformer mappingUpdater) throws Exception { + private static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary, + MappingUpdatePerformer mappingUpdater) throws Exception { // Update the mappings if parsing the documents includes new dynamic updates final Engine.Index 
preUpdateOperation; final Mapping mappingUpdate; From 65c1a2a4d2ea22dc5bc967643071422a814bb7be Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Tue, 4 Apr 2017 10:11:52 -0400 Subject: [PATCH 07/18] update shouldExecuteOnReplica comment --- .../elasticsearch/action/bulk/TransportShardBulkAction.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 9f2e659c1185a..9c9ba882f0a8d 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -386,7 +386,9 @@ static boolean shouldExecuteReplicaItem(final BulkItemRequest request, final int if (primaryResponse.isFailed()) { return primaryResponse.getFailure().getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO; } else { - // NOTE: for bwc as pre-6.0 write requests has unassigned seq no + // NOTE: pre-6.0 write requests has unassigned seq no + // and in case of failure, requests don't reach the replica + // so we execute on replica when the primary execution is not a noop return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP; } } From d4642852ae908880a44e8e1554f2f495f35308de Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Tue, 4 Apr 2017 10:15:35 -0400 Subject: [PATCH 08/18] rename indexshard noop to markSeqNoAsNoOp --- .../action/bulk/TransportShardBulkAction.java | 7 ++----- .../java/org/elasticsearch/index/shard/IndexShard.java | 4 ++-- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 9c9ba882f0a8d..ebe0ced5c3ada 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ 
b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -64,9 +64,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.index.translog.Translog.Location; -import org.elasticsearch.action.bulk.BulkItemResultHolder; -import org.elasticsearch.action.bulk.BulkItemResponse; import java.io.IOException; import java.util.Map; @@ -573,9 +570,9 @@ private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteRespons } private static Engine.NoOpResult executeFailedSeqNoOnReplica(BulkItemResponse.Failure primaryFailure, DocWriteRequest docWriteRequest, IndexShard replica) throws IOException { - final Engine.NoOp noOp = replica.prepareNoOpOnReplica(docWriteRequest.type(), docWriteRequest.id(), + final Engine.NoOp noOp = replica.preparingMarkingSeqNoAsNoOp(docWriteRequest.type(), docWriteRequest.id(), primaryFailure.getSeqNo(), primaryFailure.getMessage()); - return replica.noOp(noOp); + return replica.markSeqNoAsNoOp(noOp); } class ConcreteMappingUpdatePerformer implements MappingUpdatePerformer { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index a97cf52efa360..f7871894a2320 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -569,14 +569,14 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc return result; } - public Engine.NoOp prepareNoOpOnReplica(String type, String id, long seqNo, String reason) { + public Engine.NoOp preparingMarkingSeqNoAsNoOp(String type, String id, long seqNo, String reason) { verifyReplicationTarget(); final Term uid = extractUid(type, id); long startTime = System.nanoTime(); return new Engine.NoOp(uid, seqNo, primaryTerm, 
Engine.Operation.Origin.REPLICA, startTime, reason); } - public Engine.NoOpResult noOp(Engine.NoOp noOp) throws IOException { + public Engine.NoOpResult markSeqNoAsNoOp(Engine.NoOp noOp) throws IOException { ensureWriteAllowed(noOp); Engine engine = getEngine(); return engine.noOp(noOp); From d096474eb022c90967cc6b3d2ab73ee21377b3c4 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Tue, 4 Apr 2017 10:17:19 -0400 Subject: [PATCH 09/18] remove redundant conditional --- .../index/replication/ESIndexLevelReplicationTestCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 95d070b38db30..e9d28624033e6 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -149,7 +149,7 @@ public int indexDocs(final int numOfDoc) throws Exception { final BulkItemResponse response = index(indexRequest); if (response.isFailed()) { throw response.getFailure().getCause(); - } else if (response.isFailed() == false) { + } else { assertEquals(DocWriteResponse.Result.CREATED, response.getResponse().getResult()); } } From 8d4d6469d176dd0f6de4dac8d3c807f4c8d90ebe Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Wed, 5 Apr 2017 12:48:35 -0400 Subject: [PATCH 10/18] Consolidate possible replica action for bulk item request depending on its primary execution --- .../action/bulk/TransportShardBulkAction.java | 117 ++++++++++++------ .../bulk/TransportShardBulkActionTests.java | 27 ++-- 2 files changed, 91 insertions(+), 53 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index ebe0ced5c3ada..569fa4a04bb1f 
100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -377,16 +377,46 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq return new BulkItemResultHolder(updateResponse, updateOperationResult, replicaRequest); } - static boolean shouldExecuteReplicaItem(final BulkItemRequest request, final int index) { + /** Result Enum for executing bulk item request on replica */ + public enum ShouldExecuteOnReplicaResult { + + /** + * When primary execution succeeded + */ + NORMAL, + + /** + * When primary execution failed before sequence no was generated + * or primary execution was a noop (only possible when request is originating from pre-6.0 nodes) + */ + NOOP, + + /** + * When primary execution failed after sequence no was generated + */ + FAILURE + } + + /** + * Determines whether a bulk item request should be executed on the replica. + * @return {@link ShouldExecuteOnReplicaResult#NORMAL} upon normal primary execution with no failures + * {@link ShouldExecuteOnReplicaResult#FAILURE} upon primary execution failure after sequence no generation + * {@link ShouldExecuteOnReplicaResult#NOOP} upon primary execution failure before sequence no generation or + * when primary execution resulted in noop (only possible for write requests from pre-6.0 nodes) + */ + static ShouldExecuteOnReplicaResult shouldExecuteOnReplica(final BulkItemRequest request, final int index) { final BulkItemResponse primaryResponse = request.getPrimaryResponse(); assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request [" + request.request() + "]"; if (primaryResponse.isFailed()) { - return primaryResponse.getFailure().getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO; + return primaryResponse.getFailure().getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO + ? 
ShouldExecuteOnReplicaResult.FAILURE // we have a seq no generated with the failure, replicate as no-op + : ShouldExecuteOnReplicaResult.NOOP; // no seq no generated, ignore replication } else { - // NOTE: pre-6.0 write requests has unassigned seq no - // and in case of failure, requests don't reach the replica - // so we execute on replica when the primary execution is not a noop - return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP; + // NOTE: write requests originating from pre-6.0 nodes can send a no-op operation to + // the replica; we ignore replicatio + return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP + ? ShouldExecuteOnReplicaResult.NORMAL // execution successful on primary + : ShouldExecuteOnReplicaResult.NOOP; // ignore replication } } @@ -400,17 +430,12 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index Translog.Location location = null; for (int i = 0; i < request.items().length; i++) { BulkItemRequest item = request.items()[i]; - if (shouldExecuteReplicaItem(item, i)) { - DocWriteRequest docWriteRequest = item.request(); - final Engine.Result operationResult; - try { - if (item.getPrimaryResponse().isFailed()) { - // execution on primary resulted in a failure - // if primary execution generated a sequence no, execute a noop on the replica engine to record it in the translog - final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure(); - assert failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "seq no must be assigned"; - operationResult = executeFailedSeqNoOnReplica(failure, docWriteRequest, replica); - } else { + final ShouldExecuteOnReplicaResult shouldExecuteOnReplicaResult = shouldExecuteOnReplica(item, i); + final Engine.Result operationResult; + DocWriteRequest docWriteRequest = item.request(); + try { + switch (shouldExecuteOnReplicaResult) { + case NORMAL: final DocWriteResponse primaryResponse = 
item.getPrimaryResponse().getResponse(); switch (docWriteRequest.opType()) { case CREATE: @@ -422,32 +447,50 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index break; default: throw new IllegalStateException("Unexpected request operation type on replica: " - + docWriteRequest.opType().getLowercase()); + + docWriteRequest.opType().getLowercase()); } assert operationResult != null : "operation result must never be null when primary response has no failure"; - } - if (operationResult.hasFailure()) { - // check if any transient write operation failures should be bubbled up - Exception failure = operationResult.getFailure(); - assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure; - if (!TransportActions.isShardNotAvailableException(failure)) { - throw failure; - } - } else { - location = locationToSync(location, operationResult.getTranslogLocation()); - } - } catch (Exception e) { - // if its not an ignore replica failure, we need to make sure to bubble up the failure - // so we will fail the shard - if (!TransportActions.isShardNotAvailableException(e)) { - throw e; - } + location = handleOperationResult(operationResult, location); + break; + case NOOP: + break; + case FAILURE: + final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure(); + assert failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "seq no must be assigned"; + operationResult = executeFailedSeqNoOnReplica(failure, docWriteRequest, replica); + assert operationResult != null : "operation result must never be null when primary response has no failure"; + location = handleOperationResult(operationResult, location); + break; + } + } catch (Exception e) { + // if its not an ignore replica failure, we need to make sure to bubble up the failure + // so we will fail the shard + if (!TransportActions.isShardNotAvailableException(e)) { + throw e; } } } return location; } + private static 
Translog.Location handleOperationResult(final Engine.Result operationResult, + final Translog.Location currentLocation) throws Exception { + final Translog.Location location; + if (operationResult.hasFailure()) { + // check if any transient write operation failures should be bubbled up + Exception failure = operationResult.getFailure(); + assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure; + if (!TransportActions.isShardNotAvailableException(failure)) { + throw failure; + } else { + location = currentLocation; + } + } else { + location = locationToSync(currentLocation, operationResult.getTranslogLocation()); + } + return location; + } + private static Translog.Location locationToSync(Translog.Location current, Translog.Location next) { /* here we are moving forward in the translog with each operation. Under the hood this might @@ -518,8 +561,8 @@ private static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, } /** Executes index operation on primary shard after updates mapping if dynamic mappings are found */ - private static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary, - MappingUpdatePerformer mappingUpdater) throws Exception { + static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary, + MappingUpdatePerformer mappingUpdater) throws Exception { // Update the mappings if parsing the documents includes new dynamic updates final Engine.Index preUpdateOperation; final Mapping mappingUpdate; diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 355b3978cbf46..b9db203db5ab6 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -23,7 +23,6 @@ import 
org.elasticsearch.Version; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; @@ -34,14 +33,9 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Requests; -import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.Index; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; @@ -52,13 +46,8 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.action.bulk.TransportShardBulkAction; -import org.elasticsearch.action.bulk.MappingUpdatePerformer; -import org.elasticsearch.action.bulk.BulkItemResultHolder; import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.CoreMatchers.equalTo; @@ -96,26 +85,32 @@ public void testShouldExecuteReplicaItem() throws Exception { DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 1, randomBoolean()); BulkItemRequest request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse(new BulkItemResponse(0, 
DocWriteRequest.OpType.INDEX, response)); - assertTrue(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0)); + assertThat(TransportShardBulkAction.shouldExecuteOnReplica(request, 0), equalTo(TransportShardBulkAction.ShouldExecuteOnReplicaResult.NORMAL)); - // Failed index requests should not be replicated (for now!) + // Failed index requests without sequence no should not be replicated writeRequest = new IndexRequest("index", "type", "id") .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); - response = new IndexResponse(shardId, "type", "id", 1, 1, randomBoolean()); request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse( new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, new BulkItemResponse.Failure("index", "type", "id", new IllegalArgumentException("i died")))); - assertFalse(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0)); + assertThat(TransportShardBulkAction.shouldExecuteOnReplica(request, 0), equalTo(TransportShardBulkAction.ShouldExecuteOnReplicaResult.NOOP)); + // Failed index requests with sequence no should be replicated + request = new BulkItemRequest(0, writeRequest); + request.setPrimaryResponse( + new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, + new BulkItemResponse.Failure("index", "type", "id", + new IllegalArgumentException("i died after sequence no was generated"), 1))); + assertThat(TransportShardBulkAction.shouldExecuteOnReplica(request, 0), equalTo(TransportShardBulkAction.ShouldExecuteOnReplicaResult.FAILURE)); // NOOP requests should not be replicated writeRequest = new UpdateRequest("index", "type", "id"); response = new UpdateResponse(shardId, "type", "id", 1, DocWriteResponse.Result.NOOP); request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE, response)); - assertFalse(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0)); + assertThat(TransportShardBulkAction.shouldExecuteOnReplica(request, 0), 
equalTo(TransportShardBulkAction.ShouldExecuteOnReplicaResult.NOOP)); } From 41807279e1e0bef10cbad47a7bfca30468cbaa56 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Wed, 5 Apr 2017 14:56:25 -0400 Subject: [PATCH 11/18] remove bulk shard result abstraction --- .../action/bulk/TransportShardBulkAction.java | 22 +++--------------- .../replication/TransportWriteAction.java | 4 +++- .../bulk/TransportShardBulkActionTests.java | 18 +++++++++++---- .../ESIndexLevelReplicationTestCase.java | 23 +++++++++++-------- .../RecoveryDuringReplicationTests.java | 6 ++--- .../elasticsearch/backwards/IndexingIT.java | 1 + 6 files changed, 36 insertions(+), 38 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 569fa4a04bb1f..bed35703de81c 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -108,26 +108,10 @@ protected boolean resolveIndex() { @Override public WritePrimaryResult shardOperationOnPrimary( BulkShardRequest request, IndexShard primary) throws Exception { - final BulkShardResult shardResult = performOnPrimary(request, primary, updateHelper, - threadPool::absoluteTimeInMillis, new ConcreteMappingUpdatePerformer()); - return new WritePrimaryResult<>(shardResult.request, shardResult.response, - shardResult.location, null, primary, logger); + return performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, new ConcreteMappingUpdatePerformer()); } - /** Result holder of executing shard bulk on primary */ - public static class BulkShardResult { - public final BulkShardRequest request; - public final BulkShardResponse response; - public final Translog.Location location; - - private BulkShardResult(BulkShardRequest request, BulkShardResponse response, Translog.Location location) { - 
this.request = request; - this.response = response; - this.location = location; - } - } - - public static BulkShardResult performOnPrimary( + public static WritePrimaryResult performOnPrimary( BulkShardRequest request, IndexShard primary, UpdateHelper updateHelper, @@ -145,7 +129,7 @@ public static BulkShardResult performOnPrimary( responses[i] = items[i].getPrimaryResponse(); } BulkShardResponse response = new BulkShardResponse(request.shardId(), responses); - return new BulkShardResult(request, response, location); + return new WritePrimaryResult<>(request, response, location, null, primary, logger); } private static BulkItemResultHolder executeIndexRequest(final IndexRequest indexRequest, diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index ae4ae78c03386..938e90b82b2fb 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -94,8 +94,10 @@ protected abstract WriteReplicaResult shardOperationOnReplica( /** * Result of taking the action on the primary. 
+ * + * NOTE: public for testing */ - protected static class WritePrimaryResult, + public static class WritePrimaryResult, Response extends ReplicationResponse & WriteResponse> extends PrimaryResult implements RespondingWriteResult { boolean finishedAsyncActions; diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index b9db203db5ab6..fdd96ab6e52ad 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.TransportShardBulkAction.ShouldExecuteOnReplicaResult; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; @@ -50,6 +51,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; +import static org.elasticsearch.action.bulk.TransportShardBulkAction.shouldExecuteOnReplica; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.containsString; @@ -85,7 +87,8 @@ public void testShouldExecuteReplicaItem() throws Exception { DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 1, randomBoolean()); BulkItemRequest request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response)); - assertThat(TransportShardBulkAction.shouldExecuteOnReplica(request, 0), equalTo(TransportShardBulkAction.ShouldExecuteOnReplicaResult.NORMAL)); + assertThat(shouldExecuteOnReplica(request, 0), + equalTo(ShouldExecuteOnReplicaResult.NORMAL)); // Failed index 
requests without sequence no should not be replicated writeRequest = new IndexRequest("index", "type", "id") @@ -95,22 +98,27 @@ public void testShouldExecuteReplicaItem() throws Exception { new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, new BulkItemResponse.Failure("index", "type", "id", new IllegalArgumentException("i died")))); - assertThat(TransportShardBulkAction.shouldExecuteOnReplica(request, 0), equalTo(TransportShardBulkAction.ShouldExecuteOnReplicaResult.NOOP)); + assertThat(shouldExecuteOnReplica(request, 0), + equalTo(ShouldExecuteOnReplicaResult.NOOP)); // Failed index requests with sequence no should be replicated request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse( new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, new BulkItemResponse.Failure("index", "type", "id", - new IllegalArgumentException("i died after sequence no was generated"), 1))); - assertThat(TransportShardBulkAction.shouldExecuteOnReplica(request, 0), equalTo(TransportShardBulkAction.ShouldExecuteOnReplicaResult.FAILURE)); + new IllegalArgumentException( + "i died after sequence no was generated"), + 1))); + assertThat(shouldExecuteOnReplica(request, 0), + equalTo(ShouldExecuteOnReplicaResult.FAILURE)); // NOOP requests should not be replicated writeRequest = new UpdateRequest("index", "type", "id"); response = new UpdateResponse(shardId, "type", "id", 1, DocWriteResponse.Result.NOOP); request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE, response)); - assertThat(TransportShardBulkAction.shouldExecuteOnReplica(request, 0), equalTo(TransportShardBulkAction.ShouldExecuteOnReplicaResult.NOOP)); + assertThat(shouldExecuteOnReplica(request, 0), + equalTo(ShouldExecuteOnReplicaResult.NOOP)); } diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java 
b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index e9d28624033e6..2243a5769b99a 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -29,7 +29,6 @@ import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.bulk.BulkShardResponse; import org.elasticsearch.action.bulk.TransportShardBulkAction; -import org.elasticsearch.action.bulk.TransportShardBulkAction.BulkShardResult; import org.elasticsearch.action.bulk.TransportShardBulkActionTests; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.PlainActionFuture; @@ -37,6 +36,7 @@ import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction.ReplicaResponse; +import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.action.support.replication.TransportWriteActionTestHelper; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -527,8 +527,8 @@ class IndexingAction extends ReplicationAction result = executeShardBulkOnPrimary(primary, request); + return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful); } @Override @@ -537,16 +537,17 @@ protected void performOnReplica(BulkShardRequest request, IndexShard replica) th } } - private BulkShardResult executeShardBulkOnPrimary(IndexShard primary, BulkShardRequest request) throws Exception { + private TransportWriteAction.WritePrimaryResult executeShardBulkOnPrimary(IndexShard primary, BulkShardRequest request) throws Exception { for (BulkItemRequest itemRequest : request.items()) { if (itemRequest.request() instanceof IndexRequest) { 
((IndexRequest) itemRequest.request()).process(null, index.getName()); } } - final BulkShardResult result = TransportShardBulkAction.performOnPrimary(request, primary, null, + final TransportWriteAction.WritePrimaryResult result = + TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, new TransportShardBulkActionTests.NoopMappingUpdatePerformer()); request.primaryTerm(primary.getPrimaryTerm()); - TransportWriteActionTestHelper.performPostWriteActions(primary, result.request, result.location, logger); + TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.location, logger); return result; } @@ -558,19 +559,21 @@ private void executeShardBulkOnReplica(IndexShard replica, BulkShardRequest requ /** * indexes the given requests on the supplied primary, modifying it for replicas */ - BulkShardResult indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception { + BulkShardRequest indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception { final BulkItemRequest bulkItemRequest = new BulkItemRequest(0, request); BulkItemRequest[] bulkItemRequests = new BulkItemRequest[1]; bulkItemRequests[0] = bulkItemRequest; final BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, request.getRefreshPolicy(), bulkItemRequests); - return executeShardBulkOnPrimary(primary, bulkShardRequest); + final TransportWriteAction.WritePrimaryResult result = + executeShardBulkOnPrimary(primary, bulkShardRequest); + return result.replicaRequest(); } /** * indexes the given requests on the supplied replica shard */ - void indexOnReplica(BulkShardResult primaryResult, IndexShard replica) throws Exception { - executeShardBulkOnReplica(replica, primaryResult.request); + void indexOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { + executeShardBulkOnReplica(replica, request); } class GlobalCheckpointSync extends ReplicationAction Date: Thu, 6 Apr 2017 14:05:15 -0400 
Subject: [PATCH 12/18] fix failure handling logic for bwc --- .../action/bulk/TransportShardBulkAction.java | 11 ++++++----- .../index/engine/InternalEngine.java | 18 ++++++++++-------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index bed35703de81c..20a517da03c14 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -397,7 +397,7 @@ static ShouldExecuteOnReplicaResult shouldExecuteOnReplica(final BulkItemRequest : ShouldExecuteOnReplicaResult.NOOP; // no seq no generated, ignore replication } else { // NOTE: write requests originating from pre-6.0 nodes can send a no-op operation to - // the replica; we ignore replicatio + // the replica; we ignore replication return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP ? 
ShouldExecuteOnReplicaResult.NORMAL // execution successful on primary : ShouldExecuteOnReplicaResult.NOOP; // ignore replication @@ -434,7 +434,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index + docWriteRequest.opType().getLowercase()); } assert operationResult != null : "operation result must never be null when primary response has no failure"; - location = handleOperationResult(operationResult, location); + location = syncOperationResultOrThrow(operationResult, location); break; case NOOP: break; @@ -443,7 +443,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index assert failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "seq no must be assigned"; operationResult = executeFailedSeqNoOnReplica(failure, docWriteRequest, replica); assert operationResult != null : "operation result must never be null when primary response has no failure"; - location = handleOperationResult(operationResult, location); + location = syncOperationResultOrThrow(operationResult, location); break; } } catch (Exception e) { @@ -457,8 +457,9 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index return location; } - private static Translog.Location handleOperationResult(final Engine.Result operationResult, - final Translog.Location currentLocation) throws Exception { + /** Syncs operation result to the translog or throws a shard not available failure */ + private static Translog.Location syncOperationResultOrThrow(final Engine.Result operationResult, + final Translog.Location currentLocation) throws Exception { final Translog.Location location; if (operationResult.hasFailure()) { // check if any transient write operation failures should be bubbled up diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index dfd85359ae250..544b68add136f 100644 --- 
a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -614,14 +614,15 @@ public IndexResult index(Index index) throws IOException { indexResult = new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); } - if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY - && indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { final Translog.Location location; if (indexResult.hasFailure() == false) { location = translog.add(new Translog.Index(index, indexResult)); - } else { + } else if (indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { // if we have document failure, record it as a no-op in the translog with the generated seq_no - location = translog.add(new Translog.NoOp(plan.seqNoForIndexing, index.primaryTerm(), indexResult.getFailure().getMessage())); + location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage())); + } else { + location = null; } indexResult.setTranslogLocation(location); } @@ -905,14 +906,15 @@ public DeleteResult delete(Delete delete) throws IOException { deleteResult = new DeleteResult(plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false); } - if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY - && deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { final Translog.Location location; if (deleteResult.hasFailure() == false) { location = translog.add(new Translog.Delete(delete, deleteResult)); - } else { - location = translog.add(new Translog.NoOp(plan.seqNoOfDeletion, + } else if (deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + location = translog.add(new Translog.NoOp(deleteResult.getSeqNo(), 
delete.primaryTerm(), deleteResult.getFailure().getMessage())); + } else { + location = null; } deleteResult.setTranslogLocation(location); } From 4878565fd37788e7b786d1af19003a548365a275 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Thu, 6 Apr 2017 15:40:11 -0400 Subject: [PATCH 13/18] add more tests --- .../bulk/TransportShardBulkActionTests.java | 25 ++++ .../IndexLevelReplicationTests.java | 130 +++++++++++++----- 2 files changed, 120 insertions(+), 35 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index fdd96ab6e52ad..fa34f174fca0c 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -47,6 +47,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.rest.RestStatus; +import org.mockito.ArgumentCaptor; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; @@ -518,6 +519,30 @@ public void testCalculateTranslogLocation() throws Exception { } + public void testNoopReplicationOnPrimaryFailure() throws Exception { + final IndexShard shard = spy(newStartedShard(false)); + BulkItemRequest itemRequest = new BulkItemRequest(0, + new IndexRequest("index", "type") + .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar") + ); + final String failureMessage = "simulated primary failure"; + itemRequest.setPrimaryResponse(new BulkItemResponse(0, + randomFrom(DocWriteRequest.OpType.CREATE, DocWriteRequest.OpType.DELETE, DocWriteRequest.OpType.INDEX), + new BulkItemResponse.Failure("index", "type", "1", + new IOException(failureMessage), 1L) + )); + BulkItemRequest[] itemRequests = new BulkItemRequest[1]; + itemRequests[0] = itemRequest; + BulkShardRequest bulkShardRequest = new 
BulkShardRequest(shard.shardId(), RefreshPolicy.NONE, itemRequests); + TransportShardBulkAction.performOnReplica(bulkShardRequest, shard); + ArgumentCaptor noOp = ArgumentCaptor.forClass(Engine.NoOp.class); + verify(shard, times(1)).markSeqNoAsNoOp(noOp.capture()); + final Engine.NoOp noOpValue = noOp.getValue(); + assertThat(noOpValue.seqNo(), equalTo(1L)); + assertThat(noOpValue.reason(), containsString(failureMessage)); + closeShards(shard); + } + public void testMappingUpdateParsesCorrectNumberOfTimes() throws Exception { IndexMetaData metaData = indexMetaData(); logger.info("--> metadata.getIndex(): {}", metaData.getIndex()); diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index e40e7919d4ec7..36d775f93a173 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -26,12 +26,12 @@ import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngineTests; @@ -95,7 +95,8 @@ public void run() { }; thread.start(); Future future = shards.asyncRecoverReplica(replica, (indexShard, node) - -> new RecoveryTarget(indexShard, node, recoveryListener, version -> {}) { + -> new 
RecoveryTarget(indexShard, node, recoveryListener, version -> { + }) { @Override public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException { super.cleanFiles(totalTranslogOps, sourceMetaData); @@ -155,7 +156,7 @@ public void testCheckpointsAdvance() throws Exception { final SeqNoStats shardStats = shard.seqNoStats(); final ShardRouting shardRouting = shard.routingEntry(); logger.debug("seq_no stats for {}: {}", shardRouting, XContentHelper.toString(shardStats, - new ToXContent.MapParams(Collections.singletonMap("pretty", "false")))); + new ToXContent.MapParams(Collections.singletonMap("pretty", "false")))); assertThat(shardRouting + " local checkpoint mismatch", shardStats.getLocalCheckpoint(), equalTo(numDocs - 1L)); assertThat(shardRouting + " global checkpoint mismatch", shardStats.getGlobalCheckpoint(), equalTo(numDocs - 1L)); @@ -194,49 +195,68 @@ public void testConflictingOpsOnReplica() throws Exception { * for primary and replica shards */ public void testDocumentFailureReplication() throws Exception { - IndexMetaData metaData = buildIndexMetaData(1); - final ReplicationGroup replicationGroupWithDocumentFailureOnPrimary = new ReplicationGroup(metaData) { + final String failureMessage = "simulated document failure"; + final ThrowingDocumentFailureEngineFactory throwingDocumentFailureEngineFactory = + new ThrowingDocumentFailureEngineFactory(failureMessage); + try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) { @Override protected EngineFactory getEngineFactory(ShardRouting routing) { - if (routing.primary()) { - return config -> InternalEngineTests.createInternalEngine((directory, writerConfig) -> - new IndexWriter(directory, writerConfig) { - @Override - public long addDocument(Iterable doc) throws IOException { - throw new IOException("simulated document failure"); - } - }, null, config); - } else { - return null; - } - } - }; - replicationGroupWithDocumentFailureOnPrimary.startAll(); - 
final BulkItemResponse response = replicationGroupWithDocumentFailureOnPrimary.index( - new IndexRequest(index.getName(), "testDocumentFailureReplication", "1") - .source("{}", XContentType.JSON) - ); - assertTrue(response.isFailed()); - for (IndexShard indexShard : replicationGroupWithDocumentFailureOnPrimary) { - try(Translog.View view = indexShard.acquireTranslogView()) { - assertThat(view.totalOperations(), equalTo(1)); - Translog.Operation op = view.snapshot().next(); - assertThat(op.opType(), equalTo(Translog.Operation.Type.NO_OP)); - assertThat(op.seqNo(), equalTo(0L)); - assertThat(((Translog.NoOp) op).reason(), containsString("simulated document failure")); + return throwingDocumentFailureEngineFactory; + }}) { + + // test only primary + shards.startPrimary(); + BulkItemResponse response = shards.index( + new IndexRequest(index.getName(), "testDocumentFailureReplication", "1") + .source("{}", XContentType.JSON) + ); + assertTrue(response.isFailed()); + assertNoopTranslogOperationForDocumentFailure(shards, 1, failureMessage); + shards.assertAllEqual(0); + + // add some replicas + int nReplica = randomIntBetween(1, 3); + for (int i = 0; i < nReplica; i++) { + shards.addReplica(); } + shards.startReplicas(nReplica); + response = shards.index( + new IndexRequest(index.getName(), "testDocumentFailureReplication", "1") + .source("{}", XContentType.JSON) + ); + assertTrue(response.isFailed()); + assertNoopTranslogOperationForDocumentFailure(shards, 2, failureMessage); + shards.assertAllEqual(0); } - replicationGroupWithDocumentFailureOnPrimary.assertAllEqual(0); - replicationGroupWithDocumentFailureOnPrimary.close(); } /** * test request failures (failures before seq_no generation) are not added as a noop to translog */ public void testRequestFailureReplication() throws Exception { - try (ReplicationGroup shards = createGroup(1)) { + try (ReplicationGroup shards = createGroup(0)) { shards.startAll(); - final BulkItemResponse response = shards.index( + 
BulkItemResponse response = shards.index( + new IndexRequest(index.getName(), "testRequestFailureException", "1") + .source("{}", XContentType.JSON) + .version(2) + ); + assertTrue(response.isFailed()); + assertThat(response.getFailure().getCause(), instanceOf(VersionConflictEngineException.class)); + shards.assertAllEqual(0); + for (IndexShard indexShard : shards) { + try(Translog.View view = indexShard.acquireTranslogView()) { + assertThat(view.totalOperations(), equalTo(0)); + } + } + + // add some replicas + int nReplica = randomIntBetween(1, 3); + for (int i = 0; i < nReplica; i++) { + shards.addReplica(); + } + shards.startReplicas(nReplica); + response = shards.index( new IndexRequest(index.getName(), "testRequestFailureException", "1") .source("{}", XContentType.JSON) .version(2) @@ -251,4 +271,44 @@ public void testRequestFailureReplication() throws Exception { } } } + + /** Throws documentFailure on every indexing operation */ + static class ThrowingDocumentFailureEngineFactory implements EngineFactory { + final String documentFailureMessage; + + ThrowingDocumentFailureEngineFactory(String documentFailureMessage) { + this.documentFailureMessage = documentFailureMessage; + } + + @Override + public Engine newReadWriteEngine(EngineConfig config) { + return InternalEngineTests.createInternalEngine((directory, writerConfig) -> + new IndexWriter(directory, writerConfig) { + @Override + public long addDocument(Iterable doc) throws IOException { + assert documentFailureMessage != null; + throw new IOException(documentFailureMessage); + } + }, null, config); + } + } + + private static void assertNoopTranslogOperationForDocumentFailure(ReplicationGroup replicationGroup, int expectedOperation, + String failureMessage) throws IOException { + for (IndexShard indexShard : replicationGroup) { + try(Translog.View view = indexShard.acquireTranslogView()) { + assertThat(view.totalOperations(), equalTo(expectedOperation)); + final Translog.Snapshot snapshot = 
view.snapshot(); + long expectedSeqNo = 0L; + Translog.Operation op = snapshot.next(); + do { + assertThat(op.opType(), equalTo(Translog.Operation.Type.NO_OP)); + assertThat(op.seqNo(), equalTo(expectedSeqNo)); + assertThat(((Translog.NoOp) op).reason(), containsString(failureMessage)); + op = snapshot.next(); + expectedSeqNo++; + } while (op != null); + } + } + } } From 2f33df855a3d5d7210c355f43e571368bd5cb709 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Thu, 6 Apr 2017 15:44:16 -0400 Subject: [PATCH 14/18] minor fix --- .../action/bulk/TransportShardBulkAction.java | 24 +++++++++---------- .../bulk/TransportShardBulkActionTests.java | 10 ++++---- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 20a517da03c14..dd6bc073effec 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -361,8 +361,8 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq return new BulkItemResultHolder(updateResponse, updateOperationResult, replicaRequest); } - /** Result Enum for executing bulk item request on replica */ - public enum ShouldExecuteOnReplicaResult { + /** Mode for executing bulk item request on replica depending on corresponding primary execution */ + public enum ReplicaExecutionMode { /** * When primary execution succeeded @@ -383,24 +383,24 @@ public enum ShouldExecuteOnReplicaResult { /** * Determines whether a bulk item request should be executed on the replica. 
- * @return {@link ShouldExecuteOnReplicaResult#NORMAL} upon normal primary execution with no failures - * {@link ShouldExecuteOnReplicaResult#FAILURE} upon primary execution failure after sequence no generation - * {@link ShouldExecuteOnReplicaResult#NOOP} upon primary execution failure before sequence no generation or + * @return {@link ReplicaExecutionMode#NORMAL} upon normal primary execution with no failures + * {@link ReplicaExecutionMode#FAILURE} upon primary execution failure after sequence no generation + * {@link ReplicaExecutionMode#NOOP} upon primary execution failure before sequence no generation or * when primary execution resulted in noop (only possible for write requests from pre-6.0 nodes) */ - static ShouldExecuteOnReplicaResult shouldExecuteOnReplica(final BulkItemRequest request, final int index) { + static ReplicaExecutionMode shouldExecuteOnReplica(final BulkItemRequest request, final int index) { final BulkItemResponse primaryResponse = request.getPrimaryResponse(); assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request [" + request.request() + "]"; if (primaryResponse.isFailed()) { return primaryResponse.getFailure().getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO - ? ShouldExecuteOnReplicaResult.FAILURE // we have a seq no generated with the failure, replicate as no-op - : ShouldExecuteOnReplicaResult.NOOP; // no seq no generated, ignore replication + ? ReplicaExecutionMode.FAILURE // we have a seq no generated with the failure, replicate as no-op + : ReplicaExecutionMode.NOOP; // no seq no generated, ignore replication } else { // NOTE: write requests originating from pre-6.0 nodes can send a no-op operation to // the replica; we ignore replication return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP - ? ShouldExecuteOnReplicaResult.NORMAL // execution successful on primary - : ShouldExecuteOnReplicaResult.NOOP; // ignore replication + ? 
ReplicaExecutionMode.NORMAL // execution successful on primary + : ReplicaExecutionMode.NOOP; // ignore replication } } @@ -414,11 +414,11 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index Translog.Location location = null; for (int i = 0; i < request.items().length; i++) { BulkItemRequest item = request.items()[i]; - final ShouldExecuteOnReplicaResult shouldExecuteOnReplicaResult = shouldExecuteOnReplica(item, i); + final ReplicaExecutionMode replicaExecutionMode = shouldExecuteOnReplica(item, i); final Engine.Result operationResult; DocWriteRequest docWriteRequest = item.request(); try { - switch (shouldExecuteOnReplicaResult) { + switch (replicaExecutionMode) { case NORMAL: final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse(); switch (docWriteRequest.opType()) { diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index fa34f174fca0c..774efce51f582 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -23,7 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.bulk.TransportShardBulkAction.ShouldExecuteOnReplicaResult; +import org.elasticsearch.action.bulk.TransportShardBulkAction.ReplicaExecutionMode; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; @@ -89,7 +89,7 @@ public void testShouldExecuteReplicaItem() throws Exception { BulkItemRequest request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response)); 
assertThat(shouldExecuteOnReplica(request, 0), - equalTo(ShouldExecuteOnReplicaResult.NORMAL)); + equalTo(ReplicaExecutionMode.NORMAL)); // Failed index requests without sequence no should not be replicated writeRequest = new IndexRequest("index", "type", "id") @@ -100,7 +100,7 @@ public void testShouldExecuteReplicaItem() throws Exception { new BulkItemResponse.Failure("index", "type", "id", new IllegalArgumentException("i died")))); assertThat(shouldExecuteOnReplica(request, 0), - equalTo(ShouldExecuteOnReplicaResult.NOOP)); + equalTo(ReplicaExecutionMode.NOOP)); // Failed index requests with sequence no should be replicated request = new BulkItemRequest(0, writeRequest); @@ -111,7 +111,7 @@ public void testShouldExecuteReplicaItem() throws Exception { "i died after sequence no was generated"), 1))); assertThat(shouldExecuteOnReplica(request, 0), - equalTo(ShouldExecuteOnReplicaResult.FAILURE)); + equalTo(ReplicaExecutionMode.FAILURE)); // NOOP requests should not be replicated writeRequest = new UpdateRequest("index", "type", "id"); response = new UpdateResponse(shardId, "type", "id", 1, DocWriteResponse.Result.NOOP); @@ -119,7 +119,7 @@ public void testShouldExecuteReplicaItem() throws Exception { request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE, response)); assertThat(shouldExecuteOnReplica(request, 0), - equalTo(ShouldExecuteOnReplicaResult.NOOP)); + equalTo(ReplicaExecutionMode.NOOP)); } From 7f6b97a3fe6c95118e3b13260d6cd902ca137a75 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Thu, 6 Apr 2017 15:53:43 -0400 Subject: [PATCH 15/18] cleanup --- .../action/bulk/BulkItemResponse.java | 8 +++++ .../action/bulk/TransportShardBulkAction.java | 27 ++++++++-------- .../bulk/TransportShardBulkActionTests.java | 31 +++++++++++-------- .../IndexLevelReplicationTests.java | 10 +++--- 4 files changed, 45 insertions(+), 31 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java 
b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index 18ff938da12d2..68cede5d25178 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -175,6 +175,12 @@ public static class Failure implements Writeable, ToXContent { private final RestStatus status; private final long seqNo; + /** + * For write failures before operation was assigned a sequence number. + * + * Use {@link #Failure(String, String, String, Exception, long)} + * to record the operation sequence number with the failure. + */ public Failure(String index, String type, String id, Exception cause) { this(index, type, id, cause, ExceptionsHelper.status(cause), SequenceNumbersService.UNASSIGNED_SEQ_NO); } @@ -182,6 +188,8 @@ public Failure(String index, String type, String id, Exception cause) { public Failure(String index, String type, String id, Exception cause, RestStatus status) { this(index, type, id, cause, status, SequenceNumbersService.UNASSIGNED_SEQ_NO); } + + /** For write failures after operation was assigned a sequence number.
*/ public Failure(String index, String type, String id, Exception cause, long seqNo) { this(index, type, id, cause, ExceptionsHelper.status(cause), seqNo); } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index dd6bc073effec..ecb7eee1a39b8 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -361,8 +361,8 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq return new BulkItemResultHolder(updateResponse, updateOperationResult, replicaRequest); } - /** Mode for executing bulk item request on replica depending on corresponding primary execution */ - public enum ReplicaExecutionMode { + /** Modes for executing item request on replica depending on corresponding primary execution result */ + public enum ReplicaItemExecutionMode { /** * When primary execution succeeded @@ -383,24 +383,24 @@ public enum ReplicaExecutionMode { /** * Determines whether a bulk item request should be executed on the replica. 
- * @return {@link ReplicaExecutionMode#NORMAL} upon normal primary execution with no failures - * {@link ReplicaExecutionMode#FAILURE} upon primary execution failure after sequence no generation - * {@link ReplicaExecutionMode#NOOP} upon primary execution failure before sequence no generation or + * @return {@link ReplicaItemExecutionMode#NORMAL} upon normal primary execution with no failures + * {@link ReplicaItemExecutionMode#FAILURE} upon primary execution failure after sequence no generation + * {@link ReplicaItemExecutionMode#NOOP} upon primary execution failure before sequence no generation or * when primary execution resulted in noop (only possible for write requests from pre-6.0 nodes) */ - static ReplicaExecutionMode shouldExecuteOnReplica(final BulkItemRequest request, final int index) { + static ReplicaItemExecutionMode replicaItemExecutionMode(final BulkItemRequest request, final int index) { final BulkItemResponse primaryResponse = request.getPrimaryResponse(); assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request [" + request.request() + "]"; if (primaryResponse.isFailed()) { return primaryResponse.getFailure().getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO - ? ReplicaExecutionMode.FAILURE // we have a seq no generated with the failure, replicate as no-op - : ReplicaExecutionMode.NOOP; // no seq no generated, ignore replication + ? ReplicaItemExecutionMode.FAILURE // we have a seq no generated with the failure, replicate as no-op + : ReplicaItemExecutionMode.NOOP; // no seq no generated, ignore replication } else { // NOTE: write requests originating from pre-6.0 nodes can send a no-op operation to // the replica; we ignore replication return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP - ? ReplicaExecutionMode.NORMAL // execution successful on primary - : ReplicaExecutionMode.NOOP; // ignore replication + ? 
ReplicaItemExecutionMode.NORMAL // execution successful on primary + : ReplicaItemExecutionMode.NOOP; // ignore replication } } @@ -414,11 +414,10 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index Translog.Location location = null; for (int i = 0; i < request.items().length; i++) { BulkItemRequest item = request.items()[i]; - final ReplicaExecutionMode replicaExecutionMode = shouldExecuteOnReplica(item, i); final Engine.Result operationResult; DocWriteRequest docWriteRequest = item.request(); try { - switch (replicaExecutionMode) { + switch (replicaItemExecutionMode(item, i)) { case NORMAL: final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse(); switch (docWriteRequest.opType()) { @@ -444,7 +443,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index operationResult = executeFailedSeqNoOnReplica(failure, docWriteRequest, replica); assert operationResult != null : "operation result must never be null when primary response has no failure"; location = syncOperationResultOrThrow(operationResult, location); - break; + break; } } catch (Exception e) { // if its not an ignore replica failure, we need to make sure to bubble up the failure @@ -454,7 +453,7 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index } } } - return location; + return location; } /** Syncs operation result to the translog or throws a shard not available failure */ diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 774efce51f582..4016c2cbdef2b 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -23,7 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteRequest; import 
org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.bulk.TransportShardBulkAction.ReplicaExecutionMode; +import org.elasticsearch.action.bulk.TransportShardBulkAction.ReplicaItemExecutionMode; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; @@ -52,7 +52,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; -import static org.elasticsearch.action.bulk.TransportShardBulkAction.shouldExecuteOnReplica; +import static org.elasticsearch.action.bulk.TransportShardBulkAction.replicaItemExecutionMode; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.containsString; @@ -88,8 +88,8 @@ public void testShouldExecuteReplicaItem() throws Exception { DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 1, randomBoolean()); BulkItemRequest request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response)); - assertThat(shouldExecuteOnReplica(request, 0), - equalTo(ReplicaExecutionMode.NORMAL)); + assertThat(replicaItemExecutionMode(request, 0), + equalTo(ReplicaItemExecutionMode.NORMAL)); // Failed index requests without sequence no should not be replicated writeRequest = new IndexRequest("index", "type", "id") @@ -99,8 +99,8 @@ public void testShouldExecuteReplicaItem() throws Exception { new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, new BulkItemResponse.Failure("index", "type", "id", new IllegalArgumentException("i died")))); - assertThat(shouldExecuteOnReplica(request, 0), - equalTo(ReplicaExecutionMode.NOOP)); + assertThat(replicaItemExecutionMode(request, 0), + equalTo(ReplicaItemExecutionMode.NOOP)); // Failed index requests with sequence no should be replicated request = new BulkItemRequest(0, writeRequest); @@ -110,16 +110,16 @@ 
public void testShouldExecuteReplicaItem() throws Exception { new IllegalArgumentException( "i died after sequence no was generated"), 1))); - assertThat(shouldExecuteOnReplica(request, 0), - equalTo(ReplicaExecutionMode.FAILURE)); + assertThat(replicaItemExecutionMode(request, 0), + equalTo(ReplicaItemExecutionMode.FAILURE)); // NOOP requests should not be replicated writeRequest = new UpdateRequest("index", "type", "id"); response = new UpdateResponse(shardId, "type", "id", 1, DocWriteResponse.Result.NOOP); request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE, response)); - assertThat(shouldExecuteOnReplica(request, 0), - equalTo(ReplicaExecutionMode.NOOP)); + assertThat(replicaItemExecutionMode(request, 0), + equalTo(ReplicaItemExecutionMode.NOOP)); } @@ -519,7 +519,7 @@ public void testCalculateTranslogLocation() throws Exception { } - public void testNoopReplicationOnPrimaryFailure() throws Exception { + public void testNoOpReplicationOnPrimaryDocumentFailure() throws Exception { final IndexShard shard = spy(newStartedShard(false)); BulkItemRequest itemRequest = new BulkItemRequest(0, new IndexRequest("index", "type") @@ -527,13 +527,18 @@ public void testNoopReplicationOnPrimaryFailure() throws Exception { ); final String failureMessage = "simulated primary failure"; itemRequest.setPrimaryResponse(new BulkItemResponse(0, - randomFrom(DocWriteRequest.OpType.CREATE, DocWriteRequest.OpType.DELETE, DocWriteRequest.OpType.INDEX), + randomFrom( + DocWriteRequest.OpType.CREATE, + DocWriteRequest.OpType.DELETE, + DocWriteRequest.OpType.INDEX + ), new BulkItemResponse.Failure("index", "type", "1", new IOException(failureMessage), 1L) )); BulkItemRequest[] itemRequests = new BulkItemRequest[1]; itemRequests[0] = itemRequest; - BulkShardRequest bulkShardRequest = new BulkShardRequest(shard.shardId(), RefreshPolicy.NONE, itemRequests); + BulkShardRequest bulkShardRequest = new BulkShardRequest( + 
shard.shardId(), RefreshPolicy.NONE, itemRequests); TransportShardBulkAction.performOnReplica(bulkShardRequest, shard); ArgumentCaptor noOp = ArgumentCaptor.forClass(Engine.NoOp.class); verify(shard, times(1)).markSeqNoAsNoOp(noOp.capture()); diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 36d775f93a173..6a347034a0c55 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -211,7 +211,7 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { .source("{}", XContentType.JSON) ); assertTrue(response.isFailed()); - assertNoopTranslogOperationForDocumentFailure(shards, 1, failureMessage); + assertNoOpTranslogOperationForDocumentFailure(shards, 1, failureMessage); shards.assertAllEqual(0); // add some replicas @@ -225,7 +225,7 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { .source("{}", XContentType.JSON) ); assertTrue(response.isFailed()); - assertNoopTranslogOperationForDocumentFailure(shards, 2, failureMessage); + assertNoOpTranslogOperationForDocumentFailure(shards, 2, failureMessage); shards.assertAllEqual(0); } } @@ -293,8 +293,10 @@ public long addDocument(Iterable doc) throws IOExcepti } } - private static void assertNoopTranslogOperationForDocumentFailure(ReplicationGroup replicationGroup, int expectedOperation, - String failureMessage) throws IOException { + private static void assertNoOpTranslogOperationForDocumentFailure( + Iterable replicationGroup, + int expectedOperation, + String failureMessage) throws IOException { for (IndexShard indexShard : replicationGroup) { try(Translog.View view = indexShard.acquireTranslogView()) { assertThat(view.totalOperations(), equalTo(expectedOperation)); From 8905c949ee07a520508727c32ef5e6ce4125ad00 Mon Sep 
17 00:00:00 2001 From: Areek Zillur Date: Thu, 13 Apr 2017 12:36:56 -0400 Subject: [PATCH 16/18] incorporate feedback --- .../action/bulk/TransportShardBulkAction.java | 2 +- .../org/elasticsearch/index/engine/Engine.java | 15 +++++++-------- .../org/elasticsearch/index/shard/IndexShard.java | 5 ++--- .../index/shard/TranslogRecoveryPerformer.java | 2 +- .../index/engine/InternalEngineTests.java | 3 +-- 5 files changed, 12 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index ecb7eee1a39b8..6bed12ee40b60 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -597,7 +597,7 @@ private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteRespons } private static Engine.NoOpResult executeFailedSeqNoOnReplica(BulkItemResponse.Failure primaryFailure, DocWriteRequest docWriteRequest, IndexShard replica) throws IOException { - final Engine.NoOp noOp = replica.preparingMarkingSeqNoAsNoOp(docWriteRequest.type(), docWriteRequest.id(), + final Engine.NoOp noOp = replica.prepareMarkingSeqNoAsNoOp( primaryFailure.getSeqNo(), primaryFailure.getMessage()); return replica.markSeqNoAsNoOp(noOp); } diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 34854dc294057..45b731cd9cff1 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1153,17 +1153,16 @@ public String reason() { return reason; } - public NoOp( - final Term uid, - final long seqNo, - final long primaryTerm, - final Origin origin, - final long startTime, - final String reason) { - super(uid, seqNo, primaryTerm, 0, null, origin, startTime); + public 
NoOp(final long seqNo, final long primaryTerm, final Origin origin, final long startTime, final String reason) { + super(null, seqNo, primaryTerm, Versions.NOT_FOUND, null, origin, startTime); this.reason = reason; } + @Override + public Term uid() { + throw new UnsupportedOperationException(); + } + @Override public String type() { throw new UnsupportedOperationException(); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f7871894a2320..589572fff3fe6 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -569,11 +569,10 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc return result; } - public Engine.NoOp preparingMarkingSeqNoAsNoOp(String type, String id, long seqNo, String reason) { + public Engine.NoOp prepareMarkingSeqNoAsNoOp(long seqNo, String reason) { verifyReplicationTarget(); - final Term uid = extractUid(type, id); long startTime = System.nanoTime(); - return new Engine.NoOp(uid, seqNo, primaryTerm, Engine.Operation.Origin.REPLICA, startTime, reason); + return new Engine.NoOp(seqNo, primaryTerm, Engine.Operation.Origin.REPLICA, startTime, reason); } public Engine.NoOpResult markSeqNoAsNoOp(Engine.NoOp noOp) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java index 9801df664a986..8842cbf3c0bd4 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java @@ -180,7 +180,7 @@ private void performRecoveryOperation(Engine engine, Translog.Operation operatio final String reason = noOp.reason(); logger.trace("[translog] recover [no_op] op [({}, {})] of [{}]", seqNo, 
primaryTerm, reason); final Engine.NoOp engineNoOp = - new Engine.NoOp(null, seqNo, primaryTerm, origin, System.nanoTime(), reason); + new Engine.NoOp(seqNo, primaryTerm, origin, System.nanoTime(), reason); noOp(engine, engineNoOp); break; default: diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5408834349f86..af53c4997fde0 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3675,8 +3675,7 @@ public long generateSeqNo() { final String reason = randomAlphaOfLength(16); noOpEngine.noOp( new Engine.NoOp( - null, - maxSeqNo + 1, + maxSeqNo + 1, primaryTerm, randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY), System.nanoTime(), From 8f325dfb77952e64b4b9e183b59122753a5f4868 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Tue, 18 Apr 2017 10:53:01 -0400 Subject: [PATCH 17/18] incorporate feedback --- .../elasticsearch/action/bulk/TransportShardBulkAction.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 6bed12ee40b60..e0dbea74f2358 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -440,10 +440,12 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index case FAILURE: final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure(); assert failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "seq no must be assigned"; - operationResult = executeFailedSeqNoOnReplica(failure, docWriteRequest, replica); + operationResult = 
executeFailureNoOpOnReplica(failure, replica); assert operationResult != null : "operation result must never be null when primary response has no failure"; location = syncOperationResultOrThrow(operationResult, location); break; + default: + throw new IllegalStateException("illegal replica item execution mode for: " + item.request()); } } catch (Exception e) { // if its not an ignore replica failure, we need to make sure to bubble up the failure @@ -596,7 +598,7 @@ private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteRespons return replica.delete(delete); } - private static Engine.NoOpResult executeFailedSeqNoOnReplica(BulkItemResponse.Failure primaryFailure, DocWriteRequest docWriteRequest, IndexShard replica) throws IOException { + private static Engine.NoOpResult executeFailureNoOpOnReplica(BulkItemResponse.Failure primaryFailure, IndexShard replica) throws IOException { final Engine.NoOp noOp = replica.prepareMarkingSeqNoAsNoOp( primaryFailure.getSeqNo(), primaryFailure.getMessage()); return replica.markSeqNoAsNoOp(noOp); From d153fdcdd25a739fb98982daa63971a0d5c49c91 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Tue, 18 Apr 2017 17:53:31 -0400 Subject: [PATCH 18/18] add assert to remove handling noop primary response when 5.0 nodes are not supported --- .../action/bulk/TransportShardBulkAction.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index e0dbea74f2358..170f2d3053627 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import 
org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteRequest; @@ -381,6 +382,13 @@ public enum ReplicaItemExecutionMode { FAILURE } + static { + assert Version.CURRENT.minimumCompatibilityVersion().after(Version.V_5_0_0) == false: + "Remove logic handling NoOp result from primary response; see TODO in replicaItemExecutionMode" + + " as the current minimum compatible version [" + + Version.CURRENT.minimumCompatibilityVersion() + "] is after 5.0"; + } + /** * Determines whether a bulk item request should be executed on the replica. * @return {@link ReplicaItemExecutionMode#NORMAL} upon normal primary execution with no failures @@ -398,6 +406,8 @@ static ReplicaItemExecutionMode replicaItemExecutionMode(final BulkItemRequest r } else { // NOTE: write requests originating from pre-6.0 nodes can send a no-op operation to // the replica; we ignore replication + // TODO: remove noOp result check from primary response, when pre-6.0 nodes are not supported + // we should return ReplicaItemExecutionMode.NORMAL instead return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP ? ReplicaItemExecutionMode.NORMAL // execution successful on primary : ReplicaItemExecutionMode.NOOP; // ignore replication