diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java index 0f4835de2e9ea..81d5e874440df 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java @@ -19,11 +19,10 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.IndicesRequest; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -38,7 +37,6 @@ public class BulkItemRequest implements Streamable { private int id; private DocWriteRequest request; private volatile BulkItemResponse primaryResponse; - private volatile boolean ignoreOnReplica; BulkItemRequest() { @@ -71,15 +69,9 @@ void setPrimaryResponse(BulkItemResponse primaryResponse) { this.primaryResponse = primaryResponse; } - /** - * Marks this request to be ignored and *not* execute on a replica. - */ - void setIgnoreOnReplica() { - this.ignoreOnReplica = true; - } - boolean isIgnoreOnReplica() { - return ignoreOnReplica; + return primaryResponse != null && + (primaryResponse.isFailed() || primaryResponse.getResponse().getResult() == DocWriteResponse.Result.NOOP); } public static BulkItemRequest readBulkItem(StreamInput in) throws IOException { @@ -94,8 +86,18 @@ public void readFrom(StreamInput in) throws IOException { request = DocWriteRequest.readDocumentRequest(in); if (in.readBoolean()) { primaryResponse = BulkItemResponse.readBulkItem(in); + // This is a bwc layer for 6.0 which no longer mutates the requests with these + // Since 5.x still requires it we do it here. Note that these are harmless + // as both operations are idempotent. This is something we rely on and assert on + // in InternalEngine.planIndexingAsNonPrimary() + request.version(primaryResponse.getVersion()); + request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); + } + if (in.getVersion().before(Version.V_5_6_0_UNRELEASED)) { + boolean ignoreOnReplica = in.readBoolean(); + assert ignoreOnReplica == isIgnoreOnReplica() : + "ignoreOnReplica mismatch. wire [" + ignoreOnReplica + "], ours [" + isIgnoreOnReplica() + "]"; } - ignoreOnReplica = in.readBoolean(); } @Override @@ -103,6 +105,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(id); DocWriteRequest.writeDocumentRequest(out, request); out.writeOptionalStreamable(primaryResponse); - out.writeBoolean(ignoreOnReplica); + if (out.getVersion().before(Version.V_5_6_0_UNRELEASED)) { + out.writeBoolean(isIgnoreOnReplica()); + } } } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index b4a188578b3b7..95c3d4b69301e 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -29,8 +29,8 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.action.update.UpdateHelper; @@ -186,8 +186,8 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh if (operationResult == null) { // in case of noop update operation assert response.getResult() == DocWriteResponse.Result.NOOP : "only noop update can have null operation"; - replicaRequest.setIgnoreOnReplica(); replicaRequest.setPrimaryResponse(new BulkItemResponse(replicaRequest.id(), opType, response)); + assert replicaRequest.isIgnoreOnReplica(); } else if (operationResult.hasFailure() == false) { location = locationToSync(location, operationResult.getTranslogLocation()); BulkItemResponse primaryResponse = new BulkItemResponse(replicaRequest.id(), opType, response); @@ -195,6 +195,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh // set an empty ShardInfo to indicate no shards participated in the request execution // so we can safely send it to the replicas. We won't use it in the real response though. primaryResponse.getResponse().setShardInfo(new ShardInfo()); + assert replicaRequest.isIgnoreOnReplica() == false; } else { DocWriteRequest docWriteRequest = replicaRequest.request(); Exception failure = operationResult.getFailure(); @@ -209,9 +210,9 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh // again, due to primary relocation and only processing up to N bulk items when the shard gets closed) // then just use the response we got from the successful execution if (replicaRequest.getPrimaryResponse() == null || isConflictException(failure) == false) { - replicaRequest.setIgnoreOnReplica(); replicaRequest.setPrimaryResponse(new BulkItemResponse(replicaRequest.id(), docWriteRequest.opType(), new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), failure))); + assert replicaRequest.isIgnoreOnReplica(); } } assert replicaRequest.getPrimaryResponse() != null;