Skip to content

Commit a550498

Browse files
committed
Add primary term to doc write response
This commit adds the primary term to the doc write response.
1 parent 8f666a7 commit a550498

25 files changed

+214
-147
lines changed

core/src/main/java/org/elasticsearch/action/DocWriteResponse.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
5757
private static final String _ID = "_id";
5858
private static final String _VERSION = "_version";
5959
private static final String _SEQ_NO = "_seq_no";
60+
private static final String _PRIMARY_TERM = "_primary_term";
6061
private static final String RESULT = "result";
6162
private static final String FORCED_REFRESH = "forced_refresh";
6263

@@ -116,14 +117,16 @@ public void writeTo(StreamOutput out) throws IOException {
116117
private String type;
117118
private long version;
118119
private long seqNo;
120+
private long primaryTerm;
119121
private boolean forcedRefresh;
120122
protected Result result;
121123

122-
public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long version, Result result) {
124+
public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, Result result) {
123125
this.shardId = shardId;
124126
this.type = type;
125127
this.id = id;
126128
this.seqNo = seqNo;
129+
this.primaryTerm = primaryTerm;
127130
this.version = version;
128131
this.result = result;
129132
}
@@ -182,6 +185,15 @@ public long getSeqNo() {
182185
return seqNo;
183186
}
184187

188+
/**
189+
* The primary term for this change.
190+
*
191+
* @return the primary term
192+
*/
193+
public long getPrimaryTerm() {
194+
return primaryTerm;
195+
}
196+
185197
/**
186198
* Did this request force a refresh? Requests that set {@link WriteRequest#setRefreshPolicy(RefreshPolicy)} to
187199
* {@link RefreshPolicy#IMMEDIATE} will always return true for this. Requests that set it to {@link RefreshPolicy#WAIT_UNTIL} will
@@ -251,8 +263,10 @@ public void readFrom(StreamInput in) throws IOException {
251263
version = in.readZLong();
252264
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
253265
seqNo = in.readZLong();
266+
primaryTerm = in.readVLong();
254267
} else {
255268
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
269+
primaryTerm = 0;
256270
}
257271
forcedRefresh = in.readBoolean();
258272
result = Result.readFrom(in);
@@ -267,6 +281,7 @@ public void writeTo(StreamOutput out) throws IOException {
267281
out.writeZLong(version);
268282
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
269283
out.writeZLong(seqNo);
284+
out.writeVLong(primaryTerm);
270285
}
271286
out.writeBoolean(forcedRefresh);
272287
result.writeTo(out);
@@ -293,6 +308,7 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
293308
builder.field(_SHARDS, shardInfo);
294309
if (getSeqNo() >= 0) {
295310
builder.field(_SEQ_NO, getSeqNo());
311+
builder.field(_PRIMARY_TERM, getPrimaryTerm());
296312
}
297313
return builder;
298314
}
@@ -333,7 +349,9 @@ protected static void parseInnerToXContent(XContentParser parser, Builder contex
333349
context.setForcedRefresh(parser.booleanValue());
334350
} else if (_SEQ_NO.equals(currentFieldName)) {
335351
context.setSeqNo(parser.longValue());
336-
} else {
352+
} else if (_PRIMARY_TERM.equals(currentFieldName)) {
353+
context.setPrimaryTerm(parser.longValue());
354+
} else{
337355
throwUnknownField(currentFieldName, parser.getTokenLocation());
338356
}
339357
} else if (token == XContentParser.Token.START_OBJECT) {
@@ -362,6 +380,7 @@ public abstract static class Builder {
362380
protected boolean forcedRefresh;
363381
protected ShardInfo shardInfo = null;
364382
protected Long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
383+
protected Long primaryTerm = 0L;
365384

366385
public ShardId getShardId() {
367386
return shardId;
@@ -407,6 +426,10 @@ public void setSeqNo(Long seqNo) {
407426
this.seqNo = seqNo;
408427
}
409428

429+
public void setPrimaryTerm(Long primaryTerm) {
430+
this.primaryTerm = primaryTerm;
431+
}
432+
410433
public abstract DocWriteResponse build();
411434
}
412435
}

core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ private static BulkItemResultHolder executeIndexRequest(final IndexRequest index
139139
return new BulkItemResultHolder(null, indexResult, bulkItemRequest);
140140
} else {
141141
IndexResponse response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
142-
indexResult.getSeqNo(), indexResult.getVersion(), indexResult.isCreated());
142+
indexResult.getSeqNo(), indexResult.getPrimaryTerm(), indexResult.getVersion(), indexResult.isCreated());
143143
return new BulkItemResultHolder(response, indexResult, bulkItemRequest);
144144
}
145145
}
@@ -152,7 +152,7 @@ private static BulkItemResultHolder executeDeleteRequest(final DeleteRequest del
152152
return new BulkItemResultHolder(null, deleteResult, bulkItemRequest);
153153
} else {
154154
DeleteResponse response = new DeleteResponse(primary.shardId(), deleteRequest.type(), deleteRequest.id(),
155-
deleteResult.getSeqNo(), deleteResult.getVersion(), deleteResult.isFound());
155+
deleteResult.getSeqNo(), deleteResult.getPrimaryTerm(), deleteResult.getVersion(), deleteResult.isFound());
156156
return new BulkItemResultHolder(response, deleteResult, bulkItemRequest);
157157
}
158158
}
@@ -272,7 +272,7 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq
272272
int requestIndex, UpdateHelper updateHelper,
273273
LongSupplier nowInMillis,
274274
final MappingUpdatePerformer mappingUpdater) throws Exception {
275-
Engine.Result updateOperationResult = null;
275+
Engine.Result result = null;
276276
UpdateResponse updateResponse = null;
277277
BulkItemRequest replicaRequest = request.items()[requestIndex];
278278
int maxAttempts = updateRequest.retryOnConflict();
@@ -284,7 +284,7 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq
284284
} catch (Exception failure) {
285285
// we may fail translating a update to index or delete operation
286286
// we use index result to communicate failure while translating update request
287-
updateOperationResult = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
287+
result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
288288
break; // out of retry loop
289289
}
290290
// execute translated update request
@@ -294,34 +294,41 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq
294294
IndexRequest indexRequest = translate.action();
295295
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
296296
indexRequest.process(mappingMd, request.index());
297-
updateOperationResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater);
297+
result = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater);
298298
break;
299299
case DELETED:
300300
DeleteRequest deleteRequest = translate.action();
301-
updateOperationResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
301+
result = executeDeleteRequestOnPrimary(deleteRequest, primary);
302302
break;
303303
case NOOP:
304304
primary.noopUpdate(updateRequest.type());
305305
break;
306306
default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult());
307307
}
308-
if (updateOperationResult == null) {
308+
if (result == null) {
309309
// this is a noop operation
310310
updateResponse = translate.action();
311311
break; // out of retry loop
312-
} else if (updateOperationResult.hasFailure() == false) {
312+
} else if (result.hasFailure() == false) {
313313
// enrich update response and
314314
// set translated update (index/delete) request for replica execution in bulk items
315-
switch (updateOperationResult.getOperationType()) {
315+
switch (result.getOperationType()) {
316316
case INDEX:
317+
assert result instanceof Engine.IndexResult : result.getClass();
317318
IndexRequest updateIndexRequest = translate.action();
318-
final IndexResponse indexResponse = new IndexResponse(primary.shardId(),
319-
updateIndexRequest.type(), updateIndexRequest.id(), updateOperationResult.getSeqNo(),
320-
updateOperationResult.getVersion(), ((Engine.IndexResult) updateOperationResult).isCreated());
319+
final IndexResponse indexResponse =
320+
new IndexResponse(primary.shardId(), updateIndexRequest.type(), updateIndexRequest.id(), result.getSeqNo(), result.getPrimaryTerm(),
321+
result.getVersion(), ((Engine.IndexResult) result).isCreated());
321322
BytesReference indexSourceAsBytes = updateIndexRequest.source();
322-
updateResponse = new UpdateResponse(indexResponse.getShardInfo(),
323-
indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getSeqNo(),
324-
indexResponse.getVersion(), indexResponse.getResult());
323+
updateResponse = new UpdateResponse(
324+
indexResponse.getShardInfo(),
325+
indexResponse.getShardId(),
326+
indexResponse.getType(),
327+
indexResponse.getId(),
328+
indexResponse.getSeqNo(),
329+
indexResponse.getPrimaryTerm(),
330+
indexResponse.getVersion(),
331+
indexResponse.getResult());
325332
if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) ||
326333
(updateRequest.fields() != null && updateRequest.fields().length > 0)) {
327334
Tuple<XContentType, Map<String, Object>> sourceAndContent =
@@ -333,29 +340,36 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq
333340
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateIndexRequest);
334341
break;
335342
case DELETE:
343+
assert result instanceof Engine.DeleteResult : result.getClass();
336344
DeleteRequest updateDeleteRequest = translate.action();
337345
DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(),
338-
updateDeleteRequest.type(), updateDeleteRequest.id(), updateOperationResult.getSeqNo(),
339-
updateOperationResult.getVersion(), ((Engine.DeleteResult) updateOperationResult).isFound());
340-
updateResponse = new UpdateResponse(deleteResponse.getShardInfo(),
341-
deleteResponse.getShardId(), deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(),
342-
deleteResponse.getVersion(), deleteResponse.getResult());
346+
updateDeleteRequest.type(), updateDeleteRequest.id(), result.getSeqNo(), result.getPrimaryTerm(),
347+
result.getVersion(), ((Engine.DeleteResult) result).isFound());
348+
updateResponse = new UpdateResponse(
349+
deleteResponse.getShardInfo(),
350+
deleteResponse.getShardId(),
351+
deleteResponse.getType(),
352+
deleteResponse.getId(),
353+
deleteResponse.getSeqNo(),
354+
deleteResponse.getPrimaryTerm(),
355+
deleteResponse.getVersion(),
356+
deleteResponse.getResult());
343357
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest,
344358
request.index(), deleteResponse.getVersion(), translate.updatedSourceAsMap(),
345359
translate.updateSourceContentType(), null));
346360
// set translated request as replica request
347361
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest);
348362
break;
349363
}
350-
assert updateOperationResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO;
364+
assert result.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO;
351365
// successful operation
352366
break; // out of retry loop
353-
} else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) {
367+
} else if (result.getFailure() instanceof VersionConflictEngineException == false) {
354368
// not a version conflict exception
355369
break; // out of retry loop
356370
}
357371
}
358-
return new BulkItemResultHolder(updateResponse, updateOperationResult, replicaRequest);
372+
return new BulkItemResultHolder(updateResponse, result, replicaRequest);
359373
}
360374

361375
static boolean shouldExecuteReplicaItem(final BulkItemRequest request, final int index) {
@@ -496,7 +510,7 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque
496510
mappingUpdater.updateMappings(mappingUpdate, primary.shardId(), request.type());
497511
}
498512
} catch (MapperParsingException | IllegalArgumentException failure) {
499-
return new Engine.IndexResult(failure, request.version());
513+
return new Engine.IndexResult(failure, request.version(), primary.getPrimaryTerm());
500514
}
501515

502516
// Verify that there are no more mappings that need to be applied. If there are failures, a
@@ -509,7 +523,7 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque
509523
} catch (MapperParsingException | IllegalStateException e) {
510524
// there was an error in parsing the document that was not because
511525
// of pending mapping updates, so return a failure for the result
512-
return new Engine.IndexResult(e, request.version());
526+
return new Engine.IndexResult(e, request.version(), primary.getPrimaryTerm());
513527
}
514528
} else {
515529
// There was no mapping update, the operation is the same as the pre-update version.

core/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ public class DeleteResponse extends DocWriteResponse {
4242
public DeleteResponse() {
4343
}
4444

45-
public DeleteResponse(ShardId shardId, String type, String id, long seqNo, long version, boolean found) {
46-
super(shardId, type, id, seqNo, version, found ? Result.DELETED : Result.NOT_FOUND);
45+
public DeleteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, boolean found) {
46+
super(shardId, type, id, seqNo, primaryTerm, version, found ? Result.DELETED : Result.NOT_FOUND);
4747
}
4848

4949
@Override
@@ -112,7 +112,7 @@ public void setFound(boolean found) {
112112

113113
@Override
114114
public DeleteResponse build() {
115-
DeleteResponse deleteResponse = new DeleteResponse(shardId, type, id, seqNo, version, found);
115+
DeleteResponse deleteResponse = new DeleteResponse(shardId, type, id, seqNo, primaryTerm, version, found);
116116
deleteResponse.setForcedRefresh(forcedRefresh);
117117
if (shardInfo != null) {
118118
deleteResponse.setShardInfo(shardInfo);

core/src/main/java/org/elasticsearch/action/index/IndexResponse.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ public class IndexResponse extends DocWriteResponse {
4343
public IndexResponse() {
4444
}
4545

46-
public IndexResponse(ShardId shardId, String type, String id, long seqNo, long version, boolean created) {
47-
super(shardId, type, id, seqNo, version, created ? Result.CREATED : Result.UPDATED);
46+
public IndexResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, boolean created) {
47+
super(shardId, type, id, seqNo, primaryTerm, version, created ? Result.CREATED : Result.UPDATED);
4848
}
4949

5050
@Override
@@ -62,6 +62,7 @@ public String toString() {
6262
builder.append(",version=").append(getVersion());
6363
builder.append(",result=").append(getResult().getLowercase());
6464
builder.append(",seqNo=").append(getSeqNo());
65+
builder.append(",primaryTerm=").append(getPrimaryTerm());
6566
builder.append(",shards=").append(Strings.toString(getShardInfo()));
6667
return builder.append("]").toString();
6768
}
@@ -114,7 +115,7 @@ public void setCreated(boolean created) {
114115

115116
@Override
116117
public IndexResponse build() {
117-
IndexResponse indexResponse = new IndexResponse(shardId, type, id, seqNo, version, created);
118+
IndexResponse indexResponse = new IndexResponse(shardId, type, id, seqNo, primaryTerm, version, created);
118119
indexResponse.setForcedRefresh(forcedRefresh);
119120
if (shardInfo != null) {
120121
indexResponse.setShardInfo(shardInfo);

0 commit comments

Comments
 (0)