Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
private static final String _ID = "_id";
private static final String _VERSION = "_version";
private static final String _SEQ_NO = "_seq_no";
private static final String _PRIMARY_TERM = "_primary_term";
private static final String RESULT = "result";
private static final String FORCED_REFRESH = "forced_refresh";

Expand Down Expand Up @@ -116,14 +117,16 @@ public void writeTo(StreamOutput out) throws IOException {
private String type;
private long version;
private long seqNo;
private long primaryTerm;
private boolean forcedRefresh;
protected Result result;

public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long version, Result result) {
public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, Result result) {
this.shardId = shardId;
this.type = type;
this.id = id;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
this.result = result;
}
Expand Down Expand Up @@ -182,6 +185,15 @@ public long getSeqNo() {
return seqNo;
}

/**
* The primary term for this change.
*
* @return the primary term
*/
public long getPrimaryTerm() {
return primaryTerm;
}

/**
* Did this request force a refresh? Requests that set {@link WriteRequest#setRefreshPolicy(RefreshPolicy)} to
* {@link RefreshPolicy#IMMEDIATE} will always return true for this. Requests that set it to {@link RefreshPolicy#WAIT_UNTIL} will
Expand Down Expand Up @@ -251,8 +263,10 @@ public void readFrom(StreamInput in) throws IOException {
version = in.readZLong();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
seqNo = in.readZLong();
primaryTerm = in.readVLong();
} else {
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
primaryTerm = 0;
}
forcedRefresh = in.readBoolean();
result = Result.readFrom(in);
Expand All @@ -267,6 +281,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeZLong(version);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeZLong(seqNo);
out.writeVLong(primaryTerm);
}
out.writeBoolean(forcedRefresh);
result.writeTo(out);
Expand All @@ -293,6 +308,7 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
builder.field(_SHARDS, shardInfo);
if (getSeqNo() >= 0) {
builder.field(_SEQ_NO, getSeqNo());
builder.field(_PRIMARY_TERM, getPrimaryTerm());
}
return builder;
}
Expand Down Expand Up @@ -333,6 +349,8 @@ protected static void parseInnerToXContent(XContentParser parser, Builder contex
context.setForcedRefresh(parser.booleanValue());
} else if (_SEQ_NO.equals(currentFieldName)) {
context.setSeqNo(parser.longValue());
} else if (_PRIMARY_TERM.equals(currentFieldName)) {
context.setPrimaryTerm(parser.longValue());
} else {
throwUnknownField(currentFieldName, parser.getTokenLocation());
}
Expand Down Expand Up @@ -362,6 +380,7 @@ public abstract static class Builder {
protected boolean forcedRefresh;
protected ShardInfo shardInfo = null;
protected Long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
protected Long primaryTerm = 0L;

public ShardId getShardId() {
return shardId;
Expand Down Expand Up @@ -407,6 +426,10 @@ public void setSeqNo(Long seqNo) {
this.seqNo = seqNo;
}

public void setPrimaryTerm(Long primaryTerm) {
this.primaryTerm = primaryTerm;
}

public abstract DocWriteResponse build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.SourceToParse;
Expand Down Expand Up @@ -142,7 +143,7 @@ private static BulkItemResultHolder executeIndexRequest(final IndexRequest index
return new BulkItemResultHolder(null, indexResult, bulkItemRequest);
} else {
IndexResponse response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
indexResult.getSeqNo(), indexResult.getVersion(), indexResult.isCreated());
indexResult.getSeqNo(), primary.getPrimaryTerm(), indexResult.getVersion(), indexResult.isCreated());
return new BulkItemResultHolder(response, indexResult, bulkItemRequest);
}
}
Expand All @@ -155,7 +156,7 @@ private static BulkItemResultHolder executeDeleteRequest(final DeleteRequest del
return new BulkItemResultHolder(null, deleteResult, bulkItemRequest);
} else {
DeleteResponse response = new DeleteResponse(primary.shardId(), deleteRequest.type(), deleteRequest.id(),
deleteResult.getSeqNo(), deleteResult.getVersion(), deleteResult.isFound());
deleteResult.getSeqNo(), primary.getPrimaryTerm(), deleteResult.getVersion(), deleteResult.isFound());
return new BulkItemResultHolder(response, deleteResult, bulkItemRequest);
}
}
Expand Down Expand Up @@ -276,7 +277,7 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq
int requestIndex, UpdateHelper updateHelper,
LongSupplier nowInMillis,
final MappingUpdatePerformer mappingUpdater) throws Exception {
Engine.Result updateOperationResult = null;
Engine.Result result = null;
UpdateResponse updateResponse = null;
BulkItemRequest replicaRequest = request.items()[requestIndex];
int maxAttempts = updateRequest.retryOnConflict();
Expand All @@ -288,7 +289,7 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq
} catch (Exception failure) {
// we may fail translating a update to index or delete operation
// we use index result to communicate failure while translating update request
updateOperationResult = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
break; // out of retry loop
}
// execute translated update request
Expand All @@ -298,34 +299,46 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq
IndexRequest indexRequest = translate.action();
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
indexRequest.process(mappingMd, request.index());
updateOperationResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater);
result = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater);
break;
case DELETED:
DeleteRequest deleteRequest = translate.action();
updateOperationResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
result = executeDeleteRequestOnPrimary(deleteRequest, primary);
break;
case NOOP:
primary.noopUpdate(updateRequest.type());
break;
default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult());
}
if (updateOperationResult == null) {
if (result == null) {
// this is a noop operation
updateResponse = translate.action();
break; // out of retry loop
} else if (updateOperationResult.hasFailure() == false) {
} else if (result.hasFailure() == false) {
// enrich update response and
// set translated update (index/delete) request for replica execution in bulk items
switch (updateOperationResult.getOperationType()) {
switch (result.getOperationType()) {
case INDEX:
assert result instanceof Engine.IndexResult : result.getClass();
IndexRequest updateIndexRequest = translate.action();
final IndexResponse indexResponse = new IndexResponse(primary.shardId(),
updateIndexRequest.type(), updateIndexRequest.id(), updateOperationResult.getSeqNo(),
updateOperationResult.getVersion(), ((Engine.IndexResult) updateOperationResult).isCreated());
final IndexResponse indexResponse = new IndexResponse(
primary.shardId(),
updateIndexRequest.type(),
updateIndexRequest.id(),
result.getSeqNo(),
primary.getPrimaryTerm(),
result.getVersion(),
((Engine.IndexResult) result).isCreated());
BytesReference indexSourceAsBytes = updateIndexRequest.source();
updateResponse = new UpdateResponse(indexResponse.getShardInfo(),
indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getSeqNo(),
indexResponse.getVersion(), indexResponse.getResult());
updateResponse = new UpdateResponse(
indexResponse.getShardInfo(),
indexResponse.getShardId(),
indexResponse.getType(),
indexResponse.getId(),
indexResponse.getSeqNo(),
indexResponse.getPrimaryTerm(),
indexResponse.getVersion(),
indexResponse.getResult());
if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) ||
(updateRequest.fields() != null && updateRequest.fields().length > 0)) {
Tuple<XContentType, Map<String, Object>> sourceAndContent =
Expand All @@ -337,29 +350,46 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateIndexRequest);
break;
case DELETE:
assert result instanceof Engine.DeleteResult : result.getClass();
DeleteRequest updateDeleteRequest = translate.action();
DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(),
updateDeleteRequest.type(), updateDeleteRequest.id(), updateOperationResult.getSeqNo(),
updateOperationResult.getVersion(), ((Engine.DeleteResult) updateOperationResult).isFound());
updateResponse = new UpdateResponse(deleteResponse.getShardInfo(),
deleteResponse.getShardId(), deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(),
deleteResponse.getVersion(), deleteResponse.getResult());
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest,
request.index(), deleteResponse.getVersion(), translate.updatedSourceAsMap(),
translate.updateSourceContentType(), null));
DeleteResponse deleteResponse = new DeleteResponse(
primary.shardId(),
updateDeleteRequest.type(),
updateDeleteRequest.id(),
result.getSeqNo(),
primary.getPrimaryTerm(),
result.getVersion(),
((Engine.DeleteResult) result).isFound());
updateResponse = new UpdateResponse(
deleteResponse.getShardInfo(),
deleteResponse.getShardId(),
deleteResponse.getType(),
deleteResponse.getId(),
deleteResponse.getSeqNo(),
deleteResponse.getPrimaryTerm(),
deleteResponse.getVersion(),
deleteResponse.getResult());
final GetResult getResult = updateHelper.extractGetResult(
updateRequest,
request.index(),
deleteResponse.getVersion(),
translate.updatedSourceAsMap(),
translate.updateSourceContentType(),
null);
updateResponse.setGetResult(getResult);
// set translated request as replica request
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest);
break;
}
assert updateOperationResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO;
assert result.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO;
// successful operation
break; // out of retry loop
} else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) {
} else if (result.getFailure() instanceof VersionConflictEngineException == false) {
// not a version conflict exception
break; // out of retry loop
}
}
return new BulkItemResultHolder(updateResponse, updateOperationResult, replicaRequest);
return new BulkItemResultHolder(updateResponse, result, replicaRequest);
}

/** Modes for executing item request on replica depending on corresponding primary execution result */
Expand Down Expand Up @@ -513,8 +543,7 @@ private static Engine.IndexResult executeIndexRequestOnReplica(
try {
operation = prepareIndexOperationOnReplica(primaryResponse, request, replica);
} catch (MapperParsingException e) {
return new Engine.IndexResult(e, primaryResponse.getVersion(),
primaryResponse.getSeqNo());
return new Engine.IndexResult(e, primaryResponse.getVersion(), primaryResponse.getSeqNo());
}

Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public class DeleteResponse extends DocWriteResponse {
public DeleteResponse() {
}

public DeleteResponse(ShardId shardId, String type, String id, long seqNo, long version, boolean found) {
super(shardId, type, id, seqNo, version, found ? Result.DELETED : Result.NOT_FOUND);
public DeleteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, boolean found) {
super(shardId, type, id, seqNo, primaryTerm, version, found ? Result.DELETED : Result.NOT_FOUND);
}

@Override
Expand Down Expand Up @@ -112,7 +112,7 @@ public void setFound(boolean found) {

@Override
public DeleteResponse build() {
DeleteResponse deleteResponse = new DeleteResponse(shardId, type, id, seqNo, version, found);
DeleteResponse deleteResponse = new DeleteResponse(shardId, type, id, seqNo, primaryTerm, version, found);
deleteResponse.setForcedRefresh(forcedRefresh);
if (shardInfo != null) {
deleteResponse.setShardInfo(shardInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public class IndexResponse extends DocWriteResponse {
public IndexResponse() {
}

public IndexResponse(ShardId shardId, String type, String id, long seqNo, long version, boolean created) {
super(shardId, type, id, seqNo, version, created ? Result.CREATED : Result.UPDATED);
public IndexResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, boolean created) {
super(shardId, type, id, seqNo, primaryTerm, version, created ? Result.CREATED : Result.UPDATED);
}

@Override
Expand All @@ -62,6 +62,7 @@ public String toString() {
builder.append(",version=").append(getVersion());
builder.append(",result=").append(getResult().getLowercase());
builder.append(",seqNo=").append(getSeqNo());
builder.append(",primaryTerm=").append(getPrimaryTerm());
builder.append(",shards=").append(Strings.toString(getShardInfo()));
return builder.append("]").toString();
}
Expand Down Expand Up @@ -114,7 +115,7 @@ public void setCreated(boolean created) {

@Override
public IndexResponse build() {
IndexResponse indexResponse = new IndexResponse(shardId, type, id, seqNo, version, created);
IndexResponse indexResponse = new IndexResponse(shardId, type, id, seqNo, primaryTerm, version, created);
indexResponse.setForcedRefresh(forcedRefresh);
if (shardInfo != null) {
indexResponse.setShardInfo(shardInfo);
Expand Down
Loading