diff --git a/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java b/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java index c3479ecc0cfff..aacb0ff17e744 100644 --- a/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java +++ b/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java @@ -57,6 +57,7 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr private static final String _ID = "_id"; private static final String _VERSION = "_version"; private static final String _SEQ_NO = "_seq_no"; + private static final String _PRIMARY_TERM = "_primary_term"; private static final String RESULT = "result"; private static final String FORCED_REFRESH = "forced_refresh"; @@ -116,14 +117,16 @@ public void writeTo(StreamOutput out) throws IOException { private String type; private long version; private long seqNo; + private long primaryTerm; private boolean forcedRefresh; protected Result result; - public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long version, Result result) { + public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, Result result) { this.shardId = shardId; this.type = type; this.id = id; this.seqNo = seqNo; + this.primaryTerm = primaryTerm; this.version = version; this.result = result; } @@ -182,6 +185,15 @@ public long getSeqNo() { return seqNo; } + /** + * The primary term for this change. + * + * @return the primary term + */ + public long getPrimaryTerm() { + return primaryTerm; + } + /** * Did this request force a refresh? Requests that set {@link WriteRequest#setRefreshPolicy(RefreshPolicy)} to * {@link RefreshPolicy#IMMEDIATE} will always return true for this. Requests that set it to {@link RefreshPolicy#WAIT_UNTIL} will @@ -251,8 +263,10 @@ public void readFrom(StreamInput in) throws IOException { version = in.readZLong(); if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { seqNo = in.readZLong(); + primaryTerm = in.readVLong(); } else { seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + primaryTerm = 0; } forcedRefresh = in.readBoolean(); result = Result.readFrom(in); @@ -267,6 +281,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeZLong(version); if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { out.writeZLong(seqNo); + out.writeVLong(primaryTerm); } out.writeBoolean(forcedRefresh); result.writeTo(out); @@ -293,6 +308,7 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t builder.field(_SHARDS, shardInfo); if (getSeqNo() >= 0) { builder.field(_SEQ_NO, getSeqNo()); + builder.field(_PRIMARY_TERM, getPrimaryTerm()); } return builder; } @@ -333,6 +349,8 @@ protected static void parseInnerToXContent(XContentParser parser, Builder contex context.setForcedRefresh(parser.booleanValue()); } else if (_SEQ_NO.equals(currentFieldName)) { context.setSeqNo(parser.longValue()); + } else if (_PRIMARY_TERM.equals(currentFieldName)) { + context.setPrimaryTerm(parser.longValue()); } else { throwUnknownField(currentFieldName, parser.getTokenLocation()); } @@ -362,6 +380,7 @@ public abstract static class Builder { protected boolean forcedRefresh; protected ShardInfo shardInfo = null; protected Long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + protected Long primaryTerm = 0L; public ShardId getShardId() { return shardId; @@ -407,6 +426,10 @@ public void setSeqNo(Long seqNo) { this.seqNo = seqNo; } + public void setPrimaryTerm(Long primaryTerm) { + this.primaryTerm = primaryTerm; + } + public abstract DocWriteResponse build(); } } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 170f2d3053627..30f38230bc94f 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -54,6 +54,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.SourceToParse; @@ -142,7 +143,7 @@ private static BulkItemResultHolder executeIndexRequest(final IndexRequest index return new BulkItemResultHolder(null, indexResult, bulkItemRequest); } else { IndexResponse response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), - indexResult.getSeqNo(), indexResult.getVersion(), indexResult.isCreated()); + indexResult.getSeqNo(), primary.getPrimaryTerm(), indexResult.getVersion(), indexResult.isCreated()); return new BulkItemResultHolder(response, indexResult, bulkItemRequest); } } @@ -155,7 +156,7 @@ private static BulkItemResultHolder executeDeleteRequest(final DeleteRequest del return new BulkItemResultHolder(null, deleteResult, bulkItemRequest); } else { DeleteResponse response = new DeleteResponse(primary.shardId(), deleteRequest.type(), deleteRequest.id(), - deleteResult.getSeqNo(), deleteResult.getVersion(), deleteResult.isFound()); + deleteResult.getSeqNo(), primary.getPrimaryTerm(), deleteResult.getVersion(), deleteResult.isFound()); return new BulkItemResultHolder(response, deleteResult, bulkItemRequest); } } @@ -276,7 +277,7 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq int requestIndex, UpdateHelper updateHelper, LongSupplier nowInMillis, final MappingUpdatePerformer mappingUpdater) throws Exception { - Engine.Result updateOperationResult = null; + Engine.Result result = null; UpdateResponse updateResponse = null; BulkItemRequest replicaRequest = request.items()[requestIndex]; int maxAttempts = updateRequest.retryOnConflict(); @@ -288,7 +289,7 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq } catch (Exception failure) { // we may fail translating a update to index or delete operation // we use index result to communicate failure while translating update request - updateOperationResult = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO); + result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO); break; // out of retry loop } // execute translated update request @@ -298,34 +299,46 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq IndexRequest indexRequest = translate.action(); MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type()); indexRequest.process(mappingMd, request.index()); - updateOperationResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater); + result = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater); break; case DELETED: DeleteRequest deleteRequest = translate.action(); - updateOperationResult = executeDeleteRequestOnPrimary(deleteRequest, primary); + result = executeDeleteRequestOnPrimary(deleteRequest, primary); break; case NOOP: primary.noopUpdate(updateRequest.type()); break; default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult()); } - if (updateOperationResult == null) { + if (result == null) { // this is a noop operation updateResponse = translate.action(); break; // out of retry loop - } else if (updateOperationResult.hasFailure() == false) { + } else if (result.hasFailure() == false) { // enrich update response and // set translated update (index/delete) request for replica execution in bulk items - switch (updateOperationResult.getOperationType()) { + switch (result.getOperationType()) { case INDEX: + assert result instanceof Engine.IndexResult : result.getClass(); IndexRequest updateIndexRequest = translate.action(); - final IndexResponse indexResponse = new IndexResponse(primary.shardId(), - updateIndexRequest.type(), updateIndexRequest.id(), updateOperationResult.getSeqNo(), - updateOperationResult.getVersion(), ((Engine.IndexResult) updateOperationResult).isCreated()); + final IndexResponse indexResponse = new IndexResponse( + primary.shardId(), + updateIndexRequest.type(), + updateIndexRequest.id(), + result.getSeqNo(), + primary.getPrimaryTerm(), + result.getVersion(), + ((Engine.IndexResult) result).isCreated()); BytesReference indexSourceAsBytes = updateIndexRequest.source(); - updateResponse = new UpdateResponse(indexResponse.getShardInfo(), - indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getSeqNo(), - indexResponse.getVersion(), indexResponse.getResult()); + updateResponse = new UpdateResponse( + indexResponse.getShardInfo(), + indexResponse.getShardId(), + indexResponse.getType(), + indexResponse.getId(), + indexResponse.getSeqNo(), + indexResponse.getPrimaryTerm(), + indexResponse.getVersion(), + indexResponse.getResult()); if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) || (updateRequest.fields() != null && updateRequest.fields().length > 0)) { Tuple> sourceAndContent = @@ -337,29 +350,46 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateIndexRequest); break; case DELETE: + assert result instanceof Engine.DeleteResult : result.getClass(); DeleteRequest updateDeleteRequest = translate.action(); - DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(), - updateDeleteRequest.type(), updateDeleteRequest.id(), updateOperationResult.getSeqNo(), - updateOperationResult.getVersion(), ((Engine.DeleteResult) updateOperationResult).isFound()); - updateResponse = new UpdateResponse(deleteResponse.getShardInfo(), - deleteResponse.getShardId(), deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(), - deleteResponse.getVersion(), deleteResponse.getResult()); - updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, - request.index(), deleteResponse.getVersion(), translate.updatedSourceAsMap(), - translate.updateSourceContentType(), null)); + DeleteResponse deleteResponse = new DeleteResponse( + primary.shardId(), + updateDeleteRequest.type(), + updateDeleteRequest.id(), + result.getSeqNo(), + primary.getPrimaryTerm(), + result.getVersion(), + ((Engine.DeleteResult) result).isFound()); + updateResponse = new UpdateResponse( + deleteResponse.getShardInfo(), + deleteResponse.getShardId(), + deleteResponse.getType(), + deleteResponse.getId(), + deleteResponse.getSeqNo(), + deleteResponse.getPrimaryTerm(), + deleteResponse.getVersion(), + deleteResponse.getResult()); + final GetResult getResult = updateHelper.extractGetResult( + updateRequest, + request.index(), + deleteResponse.getVersion(), + translate.updatedSourceAsMap(), + translate.updateSourceContentType(), + null); + updateResponse.setGetResult(getResult); // set translated request as replica request replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest); break; } - assert updateOperationResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO; + assert result.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO; // successful operation break; // out of retry loop - } else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) { + } else if (result.getFailure() instanceof VersionConflictEngineException == false) { // not a version conflict exception break; // out of retry loop } } - return new BulkItemResultHolder(updateResponse, updateOperationResult, replicaRequest); + return new BulkItemResultHolder(updateResponse, result, replicaRequest); } /** Modes for executing item request on replica depending on corresponding primary execution result */ @@ -513,8 +543,7 @@ private static Engine.IndexResult executeIndexRequestOnReplica( try { operation = prepareIndexOperationOnReplica(primaryResponse, request, replica); } catch (MapperParsingException e) { - return new Engine.IndexResult(e, primaryResponse.getVersion(), - primaryResponse.getSeqNo()); + return new Engine.IndexResult(e, primaryResponse.getVersion(), primaryResponse.getSeqNo()); } Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); diff --git a/core/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java b/core/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java index 3680d09d39b2a..1e42537395f7b 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java +++ b/core/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java @@ -42,8 +42,8 @@ public class DeleteResponse extends DocWriteResponse { public DeleteResponse() { } - public DeleteResponse(ShardId shardId, String type, String id, long seqNo, long version, boolean found) { - super(shardId, type, id, seqNo, version, found ? Result.DELETED : Result.NOT_FOUND); + public DeleteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, boolean found) { + super(shardId, type, id, seqNo, primaryTerm, version, found ? Result.DELETED : Result.NOT_FOUND); } @Override @@ -112,7 +112,7 @@ public void setFound(boolean found) { @Override public DeleteResponse build() { - DeleteResponse deleteResponse = new DeleteResponse(shardId, type, id, seqNo, version, found); + DeleteResponse deleteResponse = new DeleteResponse(shardId, type, id, seqNo, primaryTerm, version, found); deleteResponse.setForcedRefresh(forcedRefresh); if (shardInfo != null) { deleteResponse.setShardInfo(shardInfo); diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexResponse.java b/core/src/main/java/org/elasticsearch/action/index/IndexResponse.java index 6310a2aac1868..f3b71d590ff88 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexResponse.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexResponse.java @@ -43,8 +43,8 @@ public class IndexResponse extends DocWriteResponse { public IndexResponse() { } - public IndexResponse(ShardId shardId, String type, String id, long seqNo, long version, boolean created) { - super(shardId, type, id, seqNo, version, created ? Result.CREATED : Result.UPDATED); + public IndexResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, boolean created) { + super(shardId, type, id, seqNo, primaryTerm, version, created ? Result.CREATED : Result.UPDATED); } @Override @@ -62,6 +62,7 @@ public String toString() { builder.append(",version=").append(getVersion()); builder.append(",result=").append(getResult().getLowercase()); builder.append(",seqNo=").append(getSeqNo()); + builder.append(",primaryTerm=").append(getPrimaryTerm()); builder.append(",shards=").append(Strings.toString(getShardInfo())); return builder.append("]").toString(); } @@ -114,7 +115,7 @@ public void setCreated(boolean created) { @Override public IndexResponse build() { - IndexResponse indexResponse = new IndexResponse(shardId, type, id, seqNo, version, created); + IndexResponse indexResponse = new IndexResponse(shardId, type, id, seqNo, primaryTerm, version, created); indexResponse.setForcedRefresh(forcedRefresh); if (shardInfo != null) { indexResponse.setShardInfo(shardInfo); diff --git a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index 67d62113062a7..189803f818fcd 100644 --- a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -179,7 +179,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< final BytesReference upsertSourceBytes = upsertRequest.source(); bulkAction.execute(toSingleItemBulkRequest(upsertRequest), wrapBulkResponse( ActionListener.wrap(response -> { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult()); + UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult()); if ((request.fetchSource() != null && request.fetchSource().fetchSource()) || (request.fields() != null && request.fields().length > 0)) { Tuple> sourceAndContent = @@ -200,7 +200,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< final BytesReference indexSourceBytes = indexRequest.source(); bulkAction.execute(toSingleItemBulkRequest(indexRequest), wrapBulkResponse( ActionListener.wrap(response -> { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult()); + UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult()); update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes)); update.setForcedRefresh(response.forcedRefresh()); listener.onResponse(update); @@ -211,7 +211,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< DeleteRequest deleteRequest = result.action(); bulkAction.execute(toSingleItemBulkRequest(deleteRequest), wrapBulkResponse( ActionListener.wrap(response -> { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getVersion(), response.getResult()); + UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult()); update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null)); update.setForcedRefresh(response.forcedRefresh()); listener.onResponse(update); diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateResponse.java b/core/src/main/java/org/elasticsearch/action/update/UpdateResponse.java index 6f736a024eb39..672b190d91130 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateResponse.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateResponse.java @@ -47,11 +47,12 @@ public UpdateResponse() { * For example: update script with operation set to none */ public UpdateResponse(ShardId shardId, String type, String id, long version, Result result) { - this(new ShardInfo(0, 0), shardId, type, id, SequenceNumbersService.UNASSIGNED_SEQ_NO, version, result); + this(new ShardInfo(0, 0), shardId, type, id, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, version, result); } - public UpdateResponse(ShardInfo shardInfo, ShardId shardId, String type, String id, long seqNo, long version, Result result) { - super(shardId, type, id, seqNo, version, result); + public UpdateResponse( + ShardInfo shardInfo, ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, Result result) { + super(shardId, type, id, seqNo, primaryTerm, version, result); setShardInfo(shardInfo); } @@ -106,6 +107,8 @@ public String toString() { builder.append(",type=").append(getType()); builder.append(",id=").append(getId()); builder.append(",version=").append(getVersion()); + builder.append(",seqNo=").append(getSeqNo()); + builder.append(",primaryTerm=").append(getPrimaryTerm()); builder.append(",result=").append(getResult().getLowercase()); builder.append(",shards=").append(getShardInfo()); return builder.append("]").toString(); @@ -154,7 +157,7 @@ public void setGetResult(GetResult getResult) { public UpdateResponse build() { UpdateResponse update; if (shardInfo != null && seqNo != null) { - update = new UpdateResponse(shardInfo, shardId, type, id, seqNo, version, result); + update = new UpdateResponse(shardInfo, shardId, type, id, seqNo, primaryTerm, version, result); } else { update = new UpdateResponse(shardId, type, id, version, result); } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 75e5294525d15..3e5d3453cacf9 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -611,8 +611,8 @@ public IndexResult index(Index index) throws IOException { } else if (plan.indexIntoLucene) { indexResult = indexIntoLucene(index, plan); } else { - indexResult = new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, - plan.currentNotFoundOrDeleted); + indexResult = new IndexResult( + plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); } if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { final Translog.Location location; @@ -704,10 +704,9 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { } if (index.versionType().isVersionConflictForWrites( currentVersion, index.version(), currentNotFoundOrDeleted)) { - plan = IndexingStrategy.skipDueToVersionConflict( - new VersionConflictEngineException(shardId, index, currentVersion, - currentNotFoundOrDeleted), - currentNotFoundOrDeleted, currentVersion); + final VersionConflictEngineException e = + new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted); + plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion); } else { plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted, seqNoService().generateSeqNo(), @@ -828,12 +827,11 @@ static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) { return new IndexingStrategy(true, false, true, seqNoForIndexing, 1, null); } - static IndexingStrategy skipDueToVersionConflict(VersionConflictEngineException e, - boolean currentNotFoundOrDeleted, - long currentVersion) { - return new IndexingStrategy(currentNotFoundOrDeleted, false, - false, SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, - new IndexResult(e, currentVersion)); + static IndexingStrategy skipDueToVersionConflict( + VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) { + final IndexResult result = new IndexResult(e, currentVersion); + return new IndexingStrategy( + currentNotFoundOrDeleted, false, false, SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result); } static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, @@ -903,8 +901,8 @@ public DeleteResult delete(Delete delete) throws IOException { } else if (plan.deleteFromLucene) { deleteResult = deleteInLucene(delete, plan); } else { - deleteResult = new DeleteResult(plan.versionOfDeletion, plan.seqNoOfDeletion, - plan.currentlyDeleted == false); + deleteResult = new DeleteResult( + plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false); } if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { final Translog.Location location; @@ -982,9 +980,8 @@ private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException } final DeletionStrategy plan; if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) { - plan = DeletionStrategy.skipDueToVersionConflict( - new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted), - currentVersion, currentlyDeleted); + final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted); + plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted); } else { plan = DeletionStrategy.processNormally(currentlyDeleted, seqNoService().generateSeqNo(), @@ -1009,8 +1006,8 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) } catch (Exception ex) { if (indexWriter.getTragicException() == null) { // there is no tragic event and such it must be a document level failure - return new DeleteResult(ex, plan.versionOfDeletion, plan.versionOfDeletion, - plan.currentlyDeleted == false); + return new DeleteResult( + ex, plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false); } else { throw ex; } @@ -1040,26 +1037,20 @@ private DeletionStrategy(boolean deleteFromLucene, boolean currentlyDeleted, Optional.empty() : Optional.of(earlyResultOnPreflightError); } - static DeletionStrategy skipDueToVersionConflict(VersionConflictEngineException e, - long currentVersion, boolean currentlyDeleted) { - return new DeletionStrategy(false, currentlyDeleted, - SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, - new DeleteResult(e, currentVersion, SequenceNumbersService.UNASSIGNED_SEQ_NO, - currentlyDeleted == false)); + static DeletionStrategy skipDueToVersionConflict( + VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) { + final long unassignedSeqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + final DeleteResult deleteResult = new DeleteResult(e, currentVersion, unassignedSeqNo, currentlyDeleted == false); + return new DeletionStrategy(false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, deleteResult); } - static DeletionStrategy processNormally(boolean currentlyDeleted, - long seqNoOfDeletion, long versionOfDeletion) { - return new DeletionStrategy(true, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, - null); + static DeletionStrategy processNormally(boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) { + return new DeletionStrategy(true, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); } - public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, - long seqNoOfDeletion, - long versionOfDeletion) { - return new DeletionStrategy(false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, - null); + public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) { + return new DeletionStrategy(false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); } } diff --git a/core/src/test/java/org/elasticsearch/action/DocWriteResponseTests.java b/core/src/test/java/org/elasticsearch/action/DocWriteResponseTests.java index 52eb8a82743c6..bb1f2d2a637f5 100644 --- a/core/src/test/java/org/elasticsearch/action/DocWriteResponseTests.java +++ b/core/src/test/java/org/elasticsearch/action/DocWriteResponseTests.java @@ -43,6 +43,7 @@ public void testGetLocation() { "type", "id", SequenceNumbersService.UNASSIGNED_SEQ_NO, + 17, 0, Result.CREATED) {}; assertEquals("/index/type/id", response.getLocation(null)); @@ -56,6 +57,7 @@ public void testGetLocationNonAscii() { "type", "❤", SequenceNumbersService.UNASSIGNED_SEQ_NO, + 17, 0, Result.CREATED) {}; assertEquals("/index/type/%E2%9D%A4", response.getLocation(null)); @@ -69,6 +71,7 @@ public void testGetLocationWithSpaces() { "type", "a b", SequenceNumbersService.UNASSIGNED_SEQ_NO, + 17, 0, Result.CREATED) {}; assertEquals("/index/type/a+b", response.getLocation(null)); @@ -86,6 +89,7 @@ public void testToXContentDoesntIncludeForcedRefreshUnlessForced() throws IOExce "type", "id", SequenceNumbersService.UNASSIGNED_SEQ_NO, + 17, 0, Result.CREATED) { // DocWriteResponse is abstract so we have to sneak a subclass in here to test it. diff --git a/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java b/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java index b6242e6d5fcd4..e7bd34e76ef3a 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/BulkRequestModifierTests.java @@ -117,7 +117,7 @@ public void onFailure(Exception e) { for (DocWriteRequest actionRequest : bulkRequest.requests()) { IndexRequest indexRequest = (IndexRequest) actionRequest; IndexResponse indexResponse = new IndexResponse(new ShardId("index", "_na_", 0), indexRequest.type(), - indexRequest.id(), 1, 1, true); + indexRequest.id(), 1, 17, 1, true); originalResponses.add(new BulkItemResponse(Integer.parseInt(indexRequest.id()), indexRequest.opType(), indexResponse)); } bulkResponseListener.onResponse(new BulkResponse(originalResponses.toArray(new BulkItemResponse[originalResponses.size()]), 0)); diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 4016c2cbdef2b..941cdbf995752 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -85,7 +85,7 @@ public void testShouldExecuteReplicaItem() throws Exception { // Successful index request should be replicated DocWriteRequest writeRequest = new IndexRequest("index", "type", "id") .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); - DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 1, randomBoolean()); + DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 17, 1, randomBoolean()); BulkItemRequest request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response)); assertThat(replicaItemExecutionMode(request, 0), @@ -471,7 +471,7 @@ public void testUpdateReplicaRequestWithSuccess() throws Exception { boolean created = randomBoolean(); Translog.Location resultLocation = new Translog.Location(42, 42, 42); Engine.IndexResult indexResult = new FakeResult(1, 1, created, resultLocation); - DocWriteResponse indexResponse = new IndexResponse(shardId, "index", "id", 1, 1, created); + DocWriteResponse indexResponse = new IndexResponse(shardId, "index", "id", 1, 17, 1, created); BulkItemResultHolder goodResults = new BulkItemResultHolder(indexResponse, indexResult, replicaRequest); @@ -509,10 +509,12 @@ public void testCalculateTranslogLocation() throws Exception { equalTo(original)); boolean created = randomBoolean(); - DocWriteResponse indexResponse = new IndexResponse(shardId, "index", "id", 1, 1, created); + DocWriteResponse indexResponse = new IndexResponse(shardId, "index", "id", 1, 17, 1, created); Translog.Location newLocation = new Translog.Location(1, 1, 1); - Engine.IndexResult indexResult = new IndexResultWithLocation(randomNonNegativeLong(), - randomNonNegativeLong(), created, newLocation); + final long version = randomNonNegativeLong(); + final long seqNo = randomNonNegativeLong(); + final long primaryTerm = randomIntBetween(1, 16); + Engine.IndexResult indexResult = new IndexResultWithLocation(version, seqNo, primaryTerm, created, newLocation); results = new BulkItemResultHolder(indexResponse, indexResult, replicaRequest); assertThat(TransportShardBulkAction.calculateTranslogLocation(original, results), equalTo(newLocation)); @@ -614,8 +616,7 @@ public void verifyMappings(Engine.Index operation, public class IndexResultWithLocation extends Engine.IndexResult { private final Translog.Location location; - public IndexResultWithLocation(long version, long seqNo, boolean created, - Translog.Location newLocation) { + public IndexResultWithLocation(long version, long seqNo, long primaryTerm, boolean created, Translog.Location newLocation) { super(version, seqNo, created); this.location = newLocation; } @@ -630,8 +631,7 @@ public void testPrepareIndexOpOnReplica() throws Exception { IndexMetaData metaData = indexMetaData(); IndexShard shard = newStartedShard(false); - DocWriteResponse primaryResponse = new IndexResponse(shardId, "index", "id", - 1, 1, randomBoolean()); + DocWriteResponse primaryResponse = new IndexResponse(shardId, "index", "id", 1, 17, 1, randomBoolean()); IndexRequest request = new IndexRequest("index", "type", "id") .source(Requests.INDEX_CONTENT_TYPE, "field", "value"); @@ -652,8 +652,7 @@ private static class FakeResult extends Engine.IndexResult { private final Translog.Location location; - protected FakeResult(long version, long seqNo, boolean created, - Translog.Location location) { + protected FakeResult(long version, long seqNo, boolean created, Translog.Location location) { super(version, seqNo, created); this.location = location; } diff --git a/core/src/test/java/org/elasticsearch/action/bulk/byscroll/AsyncBulkByScrollActionTests.java b/core/src/test/java/org/elasticsearch/action/bulk/byscroll/AsyncBulkByScrollActionTests.java index 5786482e79e00..fa42573e439ee 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/byscroll/AsyncBulkByScrollActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/byscroll/AsyncBulkByScrollActionTests.java @@ -290,10 +290,11 @@ public void testBulkResponseSetsLotsOfStatus() { default: throw new RuntimeException("Bad scenario"); } - responses[i] = new BulkItemResponse( - i, - opType, - new IndexResponse(shardId, "type", "id" + i, randomInt(20), randomInt(), createdResponse)); + final int seqNo = randomInt(20); + final int primaryTerm = randomIntBetween(1, 16); + final IndexResponse response = + new IndexResponse(shardId, "type", "id" + i, seqNo, primaryTerm, randomInt(), createdResponse); + responses[i] = new BulkItemResponse(i, opType, response); } new DummyAsyncBulkByScrollAction().onBulkResponse(timeValueNanos(System.nanoTime()), new BulkResponse(responses, 0)); assertEquals(versionConflicts, testTask.getStatus().getVersionConflicts()); @@ -799,6 +800,7 @@ RequestBuilder extends ActionRequestBuilder> index.type(), index.id(), randomInt(20), + randomIntBetween(1, 16), randomIntBetween(0, Integer.MAX_VALUE), true); } else if (item instanceof UpdateRequest) { @@ -813,6 +815,7 @@ RequestBuilder extends ActionRequestBuilder> delete.type(), delete.id(), randomInt(20), + randomIntBetween(1, 16), randomIntBetween(0, Integer.MAX_VALUE), true); } else { diff --git a/core/src/test/java/org/elasticsearch/action/delete/DeleteResponseTests.java b/core/src/test/java/org/elasticsearch/action/delete/DeleteResponseTests.java index 2b13b2b8b4e8f..95fbbe8ed1466 100644 --- a/core/src/test/java/org/elasticsearch/action/delete/DeleteResponseTests.java +++ b/core/src/test/java/org/elasticsearch/action/delete/DeleteResponseTests.java @@ -40,13 +40,13 @@ public class DeleteResponseTests extends ESTestCase { public void testToXContent() { { - DeleteResponse response = new DeleteResponse(new ShardId("index", "index_uuid", 0), "type", "id", 3, 5, true); + DeleteResponse response = new DeleteResponse(new ShardId("index", "index_uuid", 0), "type", "id", 3, 17, 5, true); String output = Strings.toString(response); assertEquals("{\"found\":true,\"_index\":\"index\",\"_type\":\"type\",\"_id\":\"id\",\"_version\":5,\"result\":\"deleted\"," + - "\"_shards\":null,\"_seq_no\":3}", output); + "\"_shards\":null,\"_seq_no\":3,\"_primary_term\":17}", output); } { - DeleteResponse response = new DeleteResponse(new ShardId("index", "index_uuid", 0), "type", "id", -1, 7, true); + DeleteResponse response = new DeleteResponse(new ShardId("index", "index_uuid", 0), "type", "id", -1, 0, 7, true); response.setForcedRefresh(true); response.setShardInfo(new ReplicationResponse.ShardInfo(10, 5)); String output = Strings.toString(response); @@ -89,17 +89,19 @@ public static Tuple randomDeleteResponse() { String type = randomAlphaOfLength(5); String id = randomAlphaOfLength(5); long seqNo = randomFrom(SequenceNumbersService.UNASSIGNED_SEQ_NO, randomNonNegativeLong(), (long) randomIntBetween(0, 10000)); + long primaryTerm = seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO ? 0 : randomIntBetween(1, 10000); long version = randomBoolean() ? randomNonNegativeLong() : randomIntBetween(0, 10000); boolean found = randomBoolean(); boolean forcedRefresh = randomBoolean(); Tuple shardInfos = RandomObjects.randomShardInfo(random()); - DeleteResponse actual = new DeleteResponse(new ShardId(index, indexUUid, shardId), type, id, seqNo, version, found); + DeleteResponse actual = new DeleteResponse(new ShardId(index, indexUUid, shardId), type, id, seqNo, primaryTerm, version, found); actual.setForcedRefresh(forcedRefresh); actual.setShardInfo(shardInfos.v1()); - DeleteResponse expected = new DeleteResponse(new ShardId(index, INDEX_UUID_NA_VALUE, -1), type, id, seqNo, version, found); + DeleteResponse expected = + new DeleteResponse(new ShardId(index, INDEX_UUID_NA_VALUE, -1), type, id, seqNo, primaryTerm, version, found); expected.setForcedRefresh(forcedRefresh); expected.setShardInfo(shardInfos.v2()); diff --git a/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java b/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java index b4836496f885f..4fb1d0c648ea2 100644 --- a/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java @@ -135,7 +135,7 @@ public void testIndexResponse() { String id = randomAlphaOfLengthBetween(3, 10); long version = randomLong(); boolean created = randomBoolean(); - IndexResponse indexResponse = new IndexResponse(shardId, type, id, SequenceNumbersService.UNASSIGNED_SEQ_NO, version, created); + IndexResponse indexResponse = new IndexResponse(shardId, type, id, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, version, created); int total = randomIntBetween(1, 10); int successful = randomIntBetween(1, 10); ReplicationResponse.ShardInfo shardInfo = new ReplicationResponse.ShardInfo(total, successful); @@ -156,6 +156,7 @@ public void testIndexResponse() { assertEquals("IndexResponse[index=" + shardId.getIndexName() + ",type=" + type + ",id="+ id + ",version=" + version + ",result=" + (created ? "created" : "updated") + ",seqNo=" + SequenceNumbersService.UNASSIGNED_SEQ_NO + + ",primaryTerm=" + 0 + ",shards={\"total\":" + total + ",\"successful\":" + successful + ",\"failed\":0}]", indexResponse.toString()); } diff --git a/core/src/test/java/org/elasticsearch/action/index/IndexResponseTests.java b/core/src/test/java/org/elasticsearch/action/index/IndexResponseTests.java index c222d2d8964da..58947a7173e3d 100644 --- a/core/src/test/java/org/elasticsearch/action/index/IndexResponseTests.java +++ b/core/src/test/java/org/elasticsearch/action/index/IndexResponseTests.java @@ -41,13 +41,13 @@ public class IndexResponseTests extends ESTestCase { public void testToXContent() { { - IndexResponse indexResponse = new IndexResponse(new ShardId("index", "index_uuid", 0), "type", "id", 3, 5, true); + IndexResponse indexResponse = new IndexResponse(new ShardId("index", "index_uuid", 0), "type", "id", 3, 17, 5, true); String output = Strings.toString(indexResponse); assertEquals("{\"_index\":\"index\",\"_type\":\"type\",\"_id\":\"id\",\"_version\":5,\"result\":\"created\",\"_shards\":null," + - "\"_seq_no\":3,\"created\":true}", output); + "\"_seq_no\":3,\"_primary_term\":17,\"created\":true}", output); } { - IndexResponse indexResponse = new IndexResponse(new ShardId("index", "index_uuid", 0), "type", "id", -1, 7, true); + IndexResponse indexResponse = new IndexResponse(new ShardId("index", "index_uuid", 0), "type", "id", -1, 17, 7, true); indexResponse.setForcedRefresh(true); indexResponse.setShardInfo(new ReplicationResponse.ShardInfo(10, 5)); String output = Strings.toString(indexResponse); @@ -102,17 +102,19 @@ public static Tuple randomIndexResponse() { String type = randomAlphaOfLength(5); String id = randomAlphaOfLength(5); long seqNo = randomFrom(SequenceNumbersService.UNASSIGNED_SEQ_NO, randomNonNegativeLong(), (long) randomIntBetween(0, 10000)); + long primaryTerm = seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO ? 0 : randomIntBetween(1, 10000); long version = randomBoolean() ? randomNonNegativeLong() : randomIntBetween(0, 10000); boolean created = randomBoolean(); boolean forcedRefresh = randomBoolean(); Tuple shardInfos = RandomObjects.randomShardInfo(random()); - IndexResponse actual = new IndexResponse(new ShardId(index, indexUUid, shardId), type, id, seqNo, version, created); + IndexResponse actual = new IndexResponse(new ShardId(index, indexUUid, shardId), type, id, seqNo, primaryTerm, version, created); actual.setForcedRefresh(forcedRefresh); actual.setShardInfo(shardInfos.v1()); - IndexResponse expected = new IndexResponse(new ShardId(index, INDEX_UUID_NA_VALUE, -1), type, id, seqNo, version, created); + IndexResponse expected = + new IndexResponse(new ShardId(index, INDEX_UUID_NA_VALUE, -1), type, id, seqNo, primaryTerm, version, created); expected.setForcedRefresh(forcedRefresh); expected.setShardInfo(shardInfos.v2()); diff --git a/core/src/test/java/org/elasticsearch/action/update/UpdateResponseTests.java b/core/src/test/java/org/elasticsearch/action/update/UpdateResponseTests.java index 153c27b370363..1c80ddca1c533 100644 --- a/core/src/test/java/org/elasticsearch/action/update/UpdateResponseTests.java +++ b/core/src/test/java/org/elasticsearch/action/update/UpdateResponseTests.java @@ -57,10 +57,10 @@ public void testToXContent() throws IOException { } { UpdateResponse updateResponse = new UpdateResponse(new ReplicationResponse.ShardInfo(10, 6), - new ShardId("index", "index_uuid", 1), "type", "id", 3, 1, DELETED); + new ShardId("index", "index_uuid", 1), "type", "id", 3, 17, 1, DELETED); String output = Strings.toString(updateResponse); assertEquals("{\"_index\":\"index\",\"_type\":\"type\",\"_id\":\"id\",\"_version\":1,\"result\":\"deleted\"," + - "\"_shards\":{\"total\":10,\"successful\":6,\"failed\":0},\"_seq_no\":3}", output); + "\"_shards\":{\"total\":10,\"successful\":6,\"failed\":0},\"_seq_no\":3,\"_primary_term\":17}", output); } { BytesReference source = new BytesArray("{\"title\":\"Book title\",\"isbn\":\"ABC-123\"}"); @@ -69,12 +69,12 @@ public void testToXContent() throws IOException { fields.put("isbn", new GetField("isbn", Collections.singletonList("ABC-123"))); UpdateResponse updateResponse = new UpdateResponse(new ReplicationResponse.ShardInfo(3, 2), - new ShardId("books", "books_uuid", 2), "book", "1", 7, 2, UPDATED); + new ShardId("books", "books_uuid", 2), "book", "1", 7, 17, 2, UPDATED); updateResponse.setGetResult(new GetResult("books", "book", "1", 2, true, source, fields)); String output = Strings.toString(updateResponse); assertEquals("{\"_index\":\"books\",\"_type\":\"book\",\"_id\":\"1\",\"_version\":2,\"result\":\"updated\"," + - "\"_shards\":{\"total\":3,\"successful\":2,\"failed\":0},\"_seq_no\":7,\"get\":{\"found\":true," + + "\"_shards\":{\"total\":3,\"successful\":2,\"failed\":0},\"_seq_no\":7,\"_primary_term\":17,\"get\":{\"found\":true," + "\"_source\":{\"title\":\"Book title\",\"isbn\":\"ABC-123\"},\"fields\":{\"isbn\":[\"ABC-123\"],\"title\":[\"Book " + "title\"]}}}", output); } @@ -128,6 +128,7 @@ public static Tuple randomUpdateResponse(XConten // We also want small number values (randomNonNegativeLong() tend to generate high numbers) // in order to catch some conversion error that happen between int/long after parsing. Long seqNo = randomFrom(randomNonNegativeLong(), (long) randomIntBetween(0, 10_000), null); + long primaryTerm = seqNo == null ? 0 : randomIntBetween(1, 16); ShardId actualShardId = new ShardId(index, indexUUid, shardId); ShardId expectedShardId = new ShardId(index, INDEX_UUID_NA_VALUE, -1); @@ -136,8 +137,8 @@ public static Tuple randomUpdateResponse(XConten if (seqNo != null) { Tuple shardInfos = RandomObjects.randomShardInfo(random()); - actual = new UpdateResponse(shardInfos.v1(), actualShardId, type, id, seqNo, version, result); - expected = new UpdateResponse(shardInfos.v2(), expectedShardId, type, id, seqNo, version, result); + actual = new UpdateResponse(shardInfos.v1(), actualShardId, type, id, seqNo, primaryTerm, version, result); + expected = new UpdateResponse(shardInfos.v2(), expectedShardId, type, id, seqNo, primaryTerm, version, result); } else { actual = new UpdateResponse(actualShardId, type, id, version, result); expected = new UpdateResponse(expectedShardId, type, id, version, result); diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 4f572f81f33a0..dfb8efb9ab943 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2055,6 +2055,7 @@ public void testTranslogOpSerialization() throws Exception { SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID(); assert Version.CURRENT.major <= 6 : "Using UNASSIGNED_SEQ_NO can be removed in 7.0, because 6.0+ nodes have actual sequence numbers"; long randomSeqNum = randomBoolean() ? SequenceNumbersService.UNASSIGNED_SEQ_NO : randomNonNegativeLong(); + long primaryTerm = randomSeqNum == SequenceNumbersService.UNASSIGNED_SEQ_NO ? 0 : randomIntBetween(1, 16); long randomPrimaryTerm = randomBoolean() ? 0 : randomNonNegativeLong(); seqID.seqNo.setLongValue(randomSeqNum); seqID.seqNoDocValue.setLongValue(randomSeqNum); diff --git a/docs/reference/docs/bulk.asciidoc b/docs/reference/docs/bulk.asciidoc index d25641cacbd9e..0c2d5fd447312 100644 --- a/docs/reference/docs/bulk.asciidoc +++ b/docs/reference/docs/bulk.asciidoc @@ -104,7 +104,8 @@ The result of this bulk operation is: }, "created": true, "status": 201, - "_seq_no" : 0 + "_seq_no" : 0, + "_primary_term": 1 } }, { @@ -121,7 +122,8 @@ The result of this bulk operation is: "failed": 0 }, "status": 404, - "_seq_no" : 1 + "_seq_no" : 1, + "_primary_term" : 2 } }, { @@ -138,7 +140,8 @@ The result of this bulk operation is: }, "created": true, "status": 201, - "_seq_no" : 2 + "_seq_no" : 2, + "_primary_term" : 3 } }, { @@ -154,13 +157,23 @@ The result of this bulk operation is: "failed": 0 }, "status": 200, - "_seq_no" : 3 + "_seq_no" : 3, + "_primary_term" : 4 } } ] } -------------------------------------------------- -// TESTRESPONSE[s/"took": 30/"took": $body.took/ s/"index_uuid": .../"index_uuid": $body.items.3.update.error.index_uuid/ s/"_seq_no" : 0/"_seq_no" : $body.items.0.index._seq_no/ s/"_seq_no" : 1/"_seq_no" : $body.items.1.delete._seq_no/ s/"_seq_no" : 2/"_seq_no" : $body.items.2.create._seq_no/ s/"_seq_no" : 3/"_seq_no" : $body.items.3.update._seq_no/] +// TESTRESPONSE[s/"took": 30/"took": $body.took/] +// TESTRESPONSE[s/"index_uuid": .../"index_uuid": $body.items.3.update.error.index_uuid/] +// TESTRESPONSE[s/"_seq_no" : 0/"_seq_no" : $body.items.0.index._seq_no/] +// TESTRESPONSE[s/"_primary_term" : 1/"_primary_term" : $body.items.0.index._primary_term/] +// TESTRESPONSE[s/"_seq_no" : 1/"_seq_no" : $body.items.1.delete._seq_no/] +// TESTRESPONSE[s/"_primary_term" : 2/"_primary_term" : $body.items.1.delete._primary_term/] +// TESTRESPONSE[s/"_seq_no" : 2/"_seq_no" : $body.items.2.create._seq_no/] +// TESTRESPONSE[s/"_primary_term" : 3/"_primary_term" : $body.items.2.create._primary_term/] +// TESTRESPONSE[s/"_seq_no" : 3/"_seq_no" : $body.items.3.update._seq_no/] +// TESTRESPONSE[s/"_primary_term" : 4/"_primary_term" : $body.items.3.update._primary_term/] The endpoints are `/_bulk`, `/{index}/_bulk`, and `{index}/{type}/_bulk`. When the index or the index/type are provided, they will be used by diff --git a/docs/reference/docs/index_.asciidoc b/docs/reference/docs/index_.asciidoc index fb904e121756d..2af9cf0a0c529 100644 --- a/docs/reference/docs/index_.asciidoc +++ b/docs/reference/docs/index_.asciidoc @@ -32,6 +32,7 @@ The result of the above index operation is: "_version" : 1, "created" : true, "_seq_no" : 0, + "_primary_term" : 1, "result" : created } -------------------------------------------------- @@ -230,6 +231,7 @@ The result of the above index operation is: "_version" : 1, "created" : true, "_seq_no" : 0, + "_primary_term" : 1, "result": "created" } -------------------------------------------------- diff --git a/docs/reference/getting-started.asciidoc b/docs/reference/getting-started.asciidoc index 87d20b35221c7..608a462e1f3ea 100755 --- a/docs/reference/getting-started.asciidoc +++ b/docs/reference/getting-started.asciidoc @@ -323,10 +323,11 @@ And the response: "failed" : 0 }, "created" : true, - "_seq_no" : 0 + "_seq_no" : 0, + "_primary_term" : 1 } -------------------------------------------------- -// TESTRESPONSE[s/"_seq_no" : 0/"_seq_no" : $body._seq_no/] +// TESTRESPONSE[s/"_seq_no" : 0/"_seq_no" : $body._seq_no/ s/"_primary_term" : 1/"_primary_term" : $body._primary_term/] From the above, we can see that a new customer document was successfully created inside the customer index and the external type. The document also has an internal id of 1 which we specified at index time. diff --git a/docs/reference/ingest/ingest-node.asciidoc b/docs/reference/ingest/ingest-node.asciidoc index 8009f67e8e90c..f195ee1f2fd75 100644 --- a/docs/reference/ingest/ingest-node.asciidoc +++ b/docs/reference/ingest/ingest-node.asciidoc @@ -906,7 +906,8 @@ PUT /myindex/type/1?pipeline=monthlyindex "failed" : 0 }, "created" : true, - "_seq_no" : 0 + "_seq_no" : 0, + "_primary_term" : 1 } -------------------------------------------------- // TESTRESPONSE diff --git a/docs/reference/query-dsl/percolate-query.asciidoc b/docs/reference/query-dsl/percolate-query.asciidoc index 0e4ce7a8d4e76..1d43bff06a136 100644 --- a/docs/reference/query-dsl/percolate-query.asciidoc +++ b/docs/reference/query-dsl/percolate-query.asciidoc @@ -182,7 +182,8 @@ Index response: }, "created": true, "result": "created", - "_seq_no" : 1 + "_seq_no" : 1, + "_primary_term" : 1 } -------------------------------------------------- // TESTRESPONSE