Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.Version;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
Expand All @@ -38,7 +37,6 @@ public class BulkItemRequest implements Streamable {
private int id;
private DocWriteRequest request;
private volatile BulkItemResponse primaryResponse;
private volatile boolean ignoreOnReplica;

BulkItemRequest() {

Expand Down Expand Up @@ -71,15 +69,9 @@ void setPrimaryResponse(BulkItemResponse primaryResponse) {
this.primaryResponse = primaryResponse;
}

/**
* Marks this request to be ignored and *not* execute on a replica.
*/
void setIgnoreOnReplica() {
this.ignoreOnReplica = true;
}

boolean isIgnoreOnReplica() {
return ignoreOnReplica;
return primaryResponse != null &&
(primaryResponse.isFailed() || primaryResponse.getResponse().getResult() == DocWriteResponse.Result.NOOP);
}

public static BulkItemRequest readBulkItem(StreamInput in) throws IOException {
Expand All @@ -94,15 +86,27 @@ public void readFrom(StreamInput in) throws IOException {
request = DocWriteRequest.readDocumentRequest(in);
if (in.readBoolean()) {
primaryResponse = BulkItemResponse.readBulkItem(in);
// This is a bwc layer for 6.0 which no longer mutates the requests with these
// Since 5.x still requires it we do it here. Note that these are harmless
// as both operations are idempotent. This is something we rely on and assert on
// in InternalEngine.planIndexingAsNonPrimary()
request.version(primaryResponse.getVersion());
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
}
if (in.getVersion().before(Version.V_5_6_0_UNRELEASED)) {
boolean ignoreOnReplica = in.readBoolean();
assert ignoreOnReplica == isIgnoreOnReplica() :
"ignoreOnReplica mismatch. wire [" + ignoreOnReplica + "], ours [" + isIgnoreOnReplica() + "]";
}
ignoreOnReplica = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
DocWriteRequest.writeDocumentRequest(out, request);
out.writeOptionalStreamable(primaryResponse);
out.writeBoolean(ignoreOnReplica);
if (out.getVersion().before(Version.V_5_6_0_UNRELEASED)) {
out.writeBoolean(isIgnoreOnReplica());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.action.update.UpdateHelper;
Expand Down Expand Up @@ -186,15 +186,16 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
if (operationResult == null) { // in case of noop update operation
assert response.getResult() == DocWriteResponse.Result.NOOP
: "only noop update can have null operation";
replicaRequest.setIgnoreOnReplica();
replicaRequest.setPrimaryResponse(new BulkItemResponse(replicaRequest.id(), opType, response));
assert replicaRequest.isIgnoreOnReplica();
} else if (operationResult.hasFailure() == false) {
location = locationToSync(location, operationResult.getTranslogLocation());
BulkItemResponse primaryResponse = new BulkItemResponse(replicaRequest.id(), opType, response);
replicaRequest.setPrimaryResponse(primaryResponse);
// set an empty ShardInfo to indicate no shards participated in the request execution
// so we can safely send it to the replicas. We won't use it in the real response though.
primaryResponse.getResponse().setShardInfo(new ShardInfo());
assert replicaRequest.isIgnoreOnReplica() == false;
} else {
DocWriteRequest docWriteRequest = replicaRequest.request();
Exception failure = operationResult.getFailure();
Expand All @@ -209,9 +210,9 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
// then just use the response we got from the successful execution
if (replicaRequest.getPrimaryResponse() == null || isConflictException(failure) == false) {
replicaRequest.setIgnoreOnReplica();
replicaRequest.setPrimaryResponse(new BulkItemResponse(replicaRequest.id(), docWriteRequest.opType(),
new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), failure)));
assert replicaRequest.isIgnoreOnReplica();
}
}
assert replicaRequest.getPrimaryResponse() != null;
Expand Down