diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index a30fec41b0bf3..860788e0157a5 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -108,6 +108,8 @@ static Request delete(DeleteRequest deleteRequest) { parameters.withTimeout(deleteRequest.timeout()); parameters.withVersion(deleteRequest.version()); parameters.withVersionType(deleteRequest.versionType()); + parameters.withIfSeqNo(deleteRequest.ifSeqNo()); + parameters.withIfPrimaryTerm(deleteRequest.ifPrimaryTerm()); parameters.withRefreshPolicy(deleteRequest.getRefreshPolicy()); parameters.withWaitForActiveShards(deleteRequest.waitForActiveShards()); return request; @@ -191,6 +193,11 @@ static Request bulk(BulkRequest bulkRequest) throws IOException { } } + if (action.ifSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { + metadata.field("if_seq_no", action.ifSeqNo()); + metadata.field("if_primary_term", action.ifPrimaryTerm()); + } + if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { IndexRequest indexRequest = (IndexRequest) action; if (Strings.hasLength(indexRequest.getPipeline())) { @@ -319,6 +326,8 @@ static Request index(IndexRequest indexRequest) { parameters.withTimeout(indexRequest.timeout()); parameters.withVersion(indexRequest.version()); parameters.withVersionType(indexRequest.versionType()); + parameters.withIfSeqNo(indexRequest.ifSeqNo()); + parameters.withIfPrimaryTerm(indexRequest.ifPrimaryTerm()); parameters.withPipeline(indexRequest.getPipeline()); parameters.withRefreshPolicy(indexRequest.getRefreshPolicy()); parameters.withWaitForActiveShards(indexRequest.waitForActiveShards()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index 3bd3c79072dc9..e2102236cc422 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -104,11 +104,13 @@ public void testDelete() throws IOException { { // Testing deletion String docId = "id"; - highLevelClient().index( + IndexResponse indexResponse = highLevelClient().index( new IndexRequest("index").id(docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT); + assertThat(indexResponse.getSeqNo(), greaterThanOrEqualTo(0L)); DeleteRequest deleteRequest = new DeleteRequest("index", docId); if (randomBoolean()) { - deleteRequest.version(1L); + deleteRequest.setIfSeqNo(indexResponse.getSeqNo()); + deleteRequest.setIfPrimaryTerm(indexResponse.getPrimaryTerm()); } DeleteResponse deleteResponse = execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync); assertEquals("index", deleteResponse.getIndex()); @@ -131,12 +133,13 @@ public void testDelete() throws IOException { String docId = "version_conflict"; highLevelClient().index( new IndexRequest("index").id( docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT); - DeleteRequest deleteRequest = new DeleteRequest("index", docId).version(2); + DeleteRequest deleteRequest = new DeleteRequest("index", docId).setIfSeqNo(2).setIfPrimaryTerm(2); ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync)); assertEquals(RestStatus.CONFLICT, exception.status()); assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][" + docId + "]: " + - "version conflict, current version [1] is different than the one provided [2]]", exception.getMessage()); + "version conflict, required seqNo [2], primary term [2]. current document has seqNo [3] and primary term [1]]", + exception.getMessage()); assertEquals("index", exception.getMetadata("es.index").get(0)); } { @@ -519,13 +522,14 @@ public void testIndex() throws IOException { ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> { IndexRequest wrongRequest = new IndexRequest("index").id("id"); wrongRequest.source(XContentBuilder.builder(xContentType.xContent()).startObject().field("field", "test").endObject()); - wrongRequest.version(5L); + wrongRequest.setIfSeqNo(1L).setIfPrimaryTerm(5L); execute(wrongRequest, highLevelClient()::index, highLevelClient()::indexAsync); }); assertEquals(RestStatus.CONFLICT, exception.status()); assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][id]: " + - "version conflict, current version [2] is different than the one provided [5]]", exception.getMessage()); + "version conflict, required seqNo [1], primary term [5]. current document has seqNo [2] and primary term [1]]", + exception.getMessage()); assertEquals("index", exception.getMetadata("es.index").get(0)); } { @@ -820,7 +824,8 @@ public void testBulk() throws IOException { if (opType == DocWriteRequest.OpType.INDEX) { IndexRequest indexRequest = new IndexRequest("index").id(id).source(source, xContentType); if (erroneous) { - indexRequest.version(12L); + indexRequest.setIfSeqNo(12L); + indexRequest.setIfPrimaryTerm(12L); } bulkRequest.add(indexRequest); @@ -1130,7 +1135,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) if (opType == DocWriteRequest.OpType.INDEX) { IndexRequest indexRequest = new IndexRequest("index").id(id).source(xContentType, "id", i); if (erroneous) { - indexRequest.version(12L); + indexRequest.setIfSeqNo(12L); + indexRequest.setIfPrimaryTerm(12L); } processor.add(indexRequest); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index b58e5ae8852d3..9364e2ce2d57c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -281,6 +281,7 @@ public void testDelete() { setRandomRefreshPolicy(deleteRequest::setRefreshPolicy, expectedParams); setRandomVersion(deleteRequest, expectedParams); setRandomVersionType(deleteRequest::versionType, expectedParams); + setRandomIfSeqNoAndTerm(deleteRequest, expectedParams); if (frequently()) { if (randomBoolean()) { @@ -631,6 +632,7 @@ public void testIndex() throws IOException { } else { setRandomVersion(indexRequest, expectedParams); setRandomVersionType(indexRequest::versionType, expectedParams); + setRandomIfSeqNoAndTerm(indexRequest, expectedParams); } if (frequently()) { @@ -768,6 +770,7 @@ public void testUpdate() throws IOException { setRandomWaitForActiveShards(updateRequest::waitForActiveShards, expectedParams); setRandomVersion(updateRequest, expectedParams); setRandomVersionType(updateRequest::versionType, expectedParams); + setRandomIfSeqNoAndTerm(updateRequest, new HashMap<>()); // if* params are passed in the body if (randomBoolean()) { int retryOnConflict = randomIntBetween(0, 5); updateRequest.retryOnConflict(retryOnConflict); @@ -798,6 +801,7 @@ public void testUpdate() throws IOException { assertEquals(updateRequest.docAsUpsert(), parsedUpdateRequest.docAsUpsert()); assertEquals(updateRequest.detectNoop(), parsedUpdateRequest.detectNoop()); assertEquals(updateRequest.fetchSource(), parsedUpdateRequest.fetchSource()); + assertIfSeqNoAndTerm(updateRequest, parsedUpdateRequest); assertEquals(updateRequest.script(), parsedUpdateRequest.script()); if (updateRequest.doc() != null) { assertToXContentEquivalent(updateRequest.doc().source(), parsedUpdateRequest.doc().source(), xContentType); @@ -811,6 +815,22 @@ public void testUpdate() throws IOException { } } + private static void assertIfSeqNoAndTerm(DocWriteRequestrequest, DocWriteRequest parsedRequest) { + assertEquals(request.ifSeqNo(), parsedRequest.ifSeqNo()); + assertEquals(request.ifPrimaryTerm(), parsedRequest.ifPrimaryTerm()); + } + + private static void setRandomIfSeqNoAndTerm(DocWriteRequest request, Map expectedParams) { + if (randomBoolean()) { + final long seqNo = randomNonNegativeLong(); + request.setIfSeqNo(seqNo); + expectedParams.put("if_seq_no", Long.toString(seqNo)); + final long primaryTerm = randomLongBetween(1, 200); + request.setIfPrimaryTerm(primaryTerm); + expectedParams.put("if_primary_term", Long.toString(primaryTerm)); + } + } + public void testUpdateWithType() throws IOException { String index = randomAlphaOfLengthBetween(3, 10); String type = randomAlphaOfLengthBetween(3, 10); @@ -892,10 +912,15 @@ public void testBulk() throws IOException { docWriteRequest.routing(randomAlphaOfLength(10)); } if (randomBoolean()) { - docWriteRequest.version(randomNonNegativeLong()); - } - if (randomBoolean()) { - docWriteRequest.versionType(randomFrom(VersionType.values())); + if (randomBoolean()) { + docWriteRequest.version(randomNonNegativeLong()); + } + if (randomBoolean()) { + docWriteRequest.versionType(randomFrom(VersionType.values())); + } + } else if (randomBoolean()) { + docWriteRequest.setIfSeqNo(randomNonNegativeLong()); + docWriteRequest.setIfPrimaryTerm(randomLongBetween(1, 200)); } bulkRequest.add(docWriteRequest); } @@ -925,6 +950,8 @@ public void testBulk() throws IOException { assertEquals(originalRequest.routing(), parsedRequest.routing()); assertEquals(originalRequest.version(), parsedRequest.version()); assertEquals(originalRequest.versionType(), parsedRequest.versionType()); + assertEquals(originalRequest.ifSeqNo(), parsedRequest.ifSeqNo()); + assertEquals(originalRequest.ifPrimaryTerm(), parsedRequest.ifPrimaryTerm()); DocWriteRequest.OpType opType = originalRequest.opType(); if (opType == DocWriteRequest.OpType.INDEX) { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/80_cas.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/80_cas.yml new file mode 100644 index 0000000000000..902621cfba578 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/80_cas.yml @@ -0,0 +1,42 @@ +--- +"Compare And Swap Sequence Numbers": + + - skip: + version: " - 6.99.99" + reason: typeless API are add in 7.0.0 + + - do: + index: + index: test_1 + id: 1 + body: { foo: bar } + - match: { _version: 1} + - set: { _seq_no: seqno } + - set: { _primary_term: primary_term } + + - do: + bulk: + body: + - index: + _index: test_1 + _id: 1 + if_seq_no: 10000 + if_primary_term: $primary_term + - foo: bar2 + + - match: { errors: true } + - match: { items.0.index.status: 409 } + - match: { items.0.index.error.type: version_conflict_engine_exception } + + - do: + bulk: + body: + - index: + _index: test_1 + _id: 1 + if_seq_no: $seqno + if_primary_term: $primary_term + - foo: bar2 + + - match: { errors: false} + - match: { items.0.index.status: 200 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/81_cas_with_types.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/81_cas_with_types.yml new file mode 100644 index 0000000000000..101316e7bf504 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/81_cas_with_types.yml @@ -0,0 +1,45 @@ +--- +"Compare And Swap Sequence Numbers": + + - skip: + version: " - 6.6.99" + reason: cas operations with sequence numbers was added in 6.7 + + - do: + index: + index: test_1 + type: _doc + id: 1 + body: { foo: bar } + - match: { _version: 1} + - set: { _seq_no: seqno } + - set: { _primary_term: primary_term } + + - do: + bulk: + body: + - index: + _index: test_1 + _type: _doc + _id: 1 + if_seq_no: 10000 + if_primary_term: $primary_term + - foo: bar2 + + - match: { errors: true } + - match: { items.0.index.status: 409 } + - match: { items.0.index.error.type: version_conflict_engine_exception } + + - do: + bulk: + body: + - index: + _index: test_1 + _type: _doc + _id: 1 + if_seq_no: $seqno + if_primary_term: $primary_term + - foo: bar2 + + - match: { errors: false} + - match: { items.0.index.status: 200 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/index/30_cas.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/index/30_cas.yml index a43ec1437a50b..550582e9816eb 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/index/30_cas.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/index/30_cas.yml @@ -3,7 +3,7 @@ - skip: version: " - 6.99.99" - reason: cas ops are introduced in 7.0.0 + reason: typesless api was introduces in 7.0 - do: index: