diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java index ebd0d17215ecc..ecba2953c9499 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java @@ -28,19 +28,26 @@ import org.apache.http.entity.ContentType; import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -77,6 +84,127 @@ static Request ping() { return new Request("HEAD", "/", Collections.emptyMap(), null); } + static Request bulk(BulkRequest bulkRequest) throws IOException { + Params parameters = Params.builder(); + parameters.withTimeout(bulkRequest.timeout()); + parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy()); + + // Bulk API only supports newline delimited JSON or Smile. Before executing + // the bulk, we need to check that all requests have the same content-type + // and this content-type is supported by the Bulk API. + XContentType bulkContentType = null; + for (int i = 0; i < bulkRequest.numberOfActions(); i++) { + DocWriteRequest request = bulkRequest.requests().get(i); + + DocWriteRequest.OpType opType = request.opType(); + if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { + bulkContentType = enforceSameContentType((IndexRequest) request, bulkContentType); + + } else if (opType == DocWriteRequest.OpType.UPDATE) { + UpdateRequest updateRequest = (UpdateRequest) request; + if (updateRequest.doc() != null) { + bulkContentType = enforceSameContentType(updateRequest.doc(), bulkContentType); + } + if (updateRequest.upsertRequest() != null) { + bulkContentType = enforceSameContentType(updateRequest.upsertRequest(), bulkContentType); + } + } + } + + if (bulkContentType == null) { + bulkContentType = XContentType.JSON; + } + + byte separator = bulkContentType.xContent().streamSeparator(); + ContentType requestContentType = ContentType.create(bulkContentType.mediaType()); + + ByteArrayOutputStream content = new ByteArrayOutputStream(); + for (DocWriteRequest request : bulkRequest.requests()) { + DocWriteRequest.OpType opType = request.opType(); + + try (XContentBuilder metadata = XContentBuilder.builder(bulkContentType.xContent())) { + metadata.startObject(); + { + metadata.startObject(opType.getLowercase()); + if (Strings.hasLength(request.index())) { + metadata.field("_index", request.index()); + } + if (Strings.hasLength(request.type())) { + metadata.field("_type", request.type()); + } + if (Strings.hasLength(request.id())) { + metadata.field("_id", request.id()); + } + if (Strings.hasLength(request.routing())) { + metadata.field("_routing", request.routing()); + } + if (Strings.hasLength(request.parent())) { + metadata.field("_parent", request.parent()); + } + if (request.version() != Versions.MATCH_ANY) { + metadata.field("_version", request.version()); + } + + VersionType versionType = request.versionType(); + if (versionType != VersionType.INTERNAL) { + if (versionType == VersionType.EXTERNAL) { + metadata.field("_version_type", "external"); + } else if (versionType == VersionType.EXTERNAL_GTE) { + metadata.field("_version_type", "external_gte"); + } else if (versionType == VersionType.FORCE) { + metadata.field("_version_type", "force"); + } + } + + if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { + IndexRequest indexRequest = (IndexRequest) request; + if (Strings.hasLength(indexRequest.getPipeline())) { + metadata.field("pipeline", indexRequest.getPipeline()); + } + } else if (opType == DocWriteRequest.OpType.UPDATE) { + UpdateRequest updateRequest = (UpdateRequest) request; + if (updateRequest.retryOnConflict() > 0) { + metadata.field("_retry_on_conflict", updateRequest.retryOnConflict()); + } + if (updateRequest.fetchSource() != null) { + metadata.field("_source", updateRequest.fetchSource()); + } + } + metadata.endObject(); + } + metadata.endObject(); + + BytesRef metadataSource = metadata.bytes().toBytesRef(); + content.write(metadataSource.bytes, metadataSource.offset, metadataSource.length); + content.write(separator); + } + + BytesRef source = null; + if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { + IndexRequest indexRequest = (IndexRequest) request; + BytesReference indexSource = indexRequest.source(); + XContentType indexXContentType = indexRequest.getContentType(); + + try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, indexSource, indexXContentType)) { + try (XContentBuilder builder = XContentBuilder.builder(bulkContentType.xContent())) { + builder.copyCurrentStructure(parser); + source = builder.bytes().toBytesRef(); + } + } + } else if (opType == DocWriteRequest.OpType.UPDATE) { + source = XContentHelper.toXContent((UpdateRequest) request, bulkContentType, false).toBytesRef(); + } + + if (source != null) { + content.write(source.bytes, source.offset, source.length); + content.write(separator); + } + } + + HttpEntity entity = new ByteArrayEntity(content.toByteArray(), 0, content.size(), requestContentType); + return new Request(HttpPost.METHOD_NAME, "/_bulk", parameters.getParams(), entity); + } + static Request exists(GetRequest getRequest) { Request request = get(getRequest); return new Request(HttpHead.METHOD_NAME, request.endpoint, request.params, null); @@ -312,4 +440,26 @@ static Params builder() { return new Params(); } } + + /** + * Ensure that the {@link IndexRequest}'s content type is supported by the Bulk API and that it conforms + * to the current {@link BulkRequest}'s content type (if it's known at the time of this method get called). + * + * @return the {@link IndexRequest}'s content type + */ + static XContentType enforceSameContentType(IndexRequest indexRequest, @Nullable XContentType xContentType) { + XContentType requestContentType = indexRequest.getContentType(); + if (requestContentType != XContentType.JSON && requestContentType != XContentType.SMILE) { + throw new IllegalArgumentException("Unsupported content-type found for request with content-type [" + requestContentType + + "], only JSON and SMILE are supported"); + } + if (xContentType == null) { + return requestContentType; + } + if (requestContentType != xContentType) { + throw new IllegalArgumentException("Mismatching content-type found for request with content-type [" + requestContentType + + "], previous requests have content-type [" + xContentType + "]"); + } + return xContentType; + } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index 26d3a4e7e87cc..e174e2fffe6c0 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -26,6 +26,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; @@ -59,6 +61,24 @@ public RestHighLevelClient(RestClient client) { this.client = Objects.requireNonNull(client); } + /** + * Executes a bulk request using the Bulk API + * + * See Bulk API on elastic.co + */ + public BulkResponse bulk(BulkRequest bulkRequest, Header... headers) throws IOException { + return performRequestAndParseEntity(bulkRequest, Request::bulk, BulkResponse::fromXContent, emptySet(), headers); + } + + /** + * Asynchronously executes a bulk request using the Bulk API + * + * See Bulk API on elastic.co + */ + public void bulkAsync(BulkRequest bulkRequest, ActionListener listener, Header... headers) { + performRequestAsyncAndParseEntity(bulkRequest, Request::bulk, BulkResponse::fromXContent, listener, emptySet(), headers); + } + /** * Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise */ diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index 8c9689e04122c..4686a23b86851 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -25,6 +25,10 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; @@ -32,6 +36,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; @@ -440,4 +445,80 @@ public void testUpdate() throws IOException { exception.getMessage()); } } + + public void testBulk() throws IOException { + int nbItems = randomIntBetween(10, 100); + boolean[] errors = new boolean[nbItems]; + + XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE); + + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < nbItems; i++) { + String id = String.valueOf(i); + boolean erroneous = randomBoolean(); + errors[i] = erroneous; + + DocWriteRequest.OpType opType = randomFrom(DocWriteRequest.OpType.values()); + if (opType == DocWriteRequest.OpType.DELETE) { + if (erroneous == false) { + assertEquals(RestStatus.CREATED, + highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status()); + } + DeleteRequest deleteRequest = new DeleteRequest("index", "test", id); + bulkRequest.add(deleteRequest); + + } else { + BytesReference source = XContentBuilder.builder(xContentType.xContent()).startObject().field("id", i).endObject().bytes(); + if (opType == DocWriteRequest.OpType.INDEX) { + IndexRequest indexRequest = new IndexRequest("index", "test", id).source(source, xContentType); + if (erroneous) { + indexRequest.version(12L); + } + bulkRequest.add(indexRequest); + + } else if (opType == DocWriteRequest.OpType.CREATE) { + IndexRequest createRequest = new IndexRequest("index", "test", id).source(source, xContentType).create(true); + if (erroneous) { + assertEquals(RestStatus.CREATED, highLevelClient().index(createRequest).status()); + } + bulkRequest.add(createRequest); + + } else if (opType == DocWriteRequest.OpType.UPDATE) { + UpdateRequest updateRequest = new UpdateRequest("index", "test", id) + .doc(new IndexRequest().source(source, xContentType)); + if (erroneous == false) { + assertEquals(RestStatus.CREATED, + highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status()); + } + bulkRequest.add(updateRequest); + } + } + } + + BulkResponse bulkResponse = execute(bulkRequest, highLevelClient()::bulk, highLevelClient()::bulkAsync); + assertEquals(RestStatus.OK, bulkResponse.status()); + assertTrue(bulkResponse.getTookInMillis() > 0); + assertEquals(nbItems, bulkResponse.getItems().length); + + for (int i = 0; i < nbItems; i++) { + BulkItemResponse bulkItemResponse = bulkResponse.getItems()[i]; + + assertEquals(i, bulkItemResponse.getItemId()); + assertEquals("index", bulkItemResponse.getIndex()); + assertEquals("test", bulkItemResponse.getType()); + assertEquals(String.valueOf(i), bulkItemResponse.getId()); + + DocWriteRequest.OpType requestOpType = bulkRequest.requests().get(i).opType(); + if (requestOpType == DocWriteRequest.OpType.INDEX || requestOpType == DocWriteRequest.OpType.CREATE) { + assertEquals(errors[i], bulkItemResponse.isFailed()); + assertEquals(errors[i] ? RestStatus.INTERNAL_SERVER_ERROR : RestStatus.CREATED, bulkItemResponse.status()); + } else if (requestOpType == DocWriteRequest.OpType.UPDATE) { + assertEquals(errors[i], bulkItemResponse.isFailed()); + assertEquals(errors[i] ? RestStatus.INTERNAL_SERVER_ERROR : RestStatus.OK, bulkItemResponse.status()); + } else if (requestOpType == DocWriteRequest.OpType.DELETE) { + assertFalse(bulkItemResponse.isFailed()); + assertEquals(errors[i] ? RestStatus.NOT_FOUND : RestStatus.OK, bulkItemResponse.status()); + } + } + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java index fb1a738873218..1d61ef87c485e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java @@ -22,6 +22,9 @@ import org.apache.http.HttpEntity; import org.apache.http.entity.ByteArrayEntity; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkShardRequest; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; @@ -29,6 +32,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -40,13 +44,15 @@ import org.elasticsearch.test.RandomObjects; import java.io.IOException; -import java.util.Collections; +import java.io.InputStream; import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.function.Consumer; import java.util.function.Function; +import static java.util.Collections.singletonMap; +import static org.elasticsearch.client.Request.enforceSameContentType; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent; public class RequestTests extends ESTestCase { @@ -361,14 +367,207 @@ public void testUpdate() throws IOException { public void testUpdateWithDifferentContentTypes() throws IOException { IllegalStateException exception = expectThrows(IllegalStateException.class, () -> { UpdateRequest updateRequest = new UpdateRequest(); - updateRequest.doc(new IndexRequest().source(Collections.singletonMap("field", "doc"), XContentType.JSON)); - updateRequest.upsert(new IndexRequest().source(Collections.singletonMap("field", "upsert"), XContentType.YAML)); + updateRequest.doc(new IndexRequest().source(singletonMap("field", "doc"), XContentType.JSON)); + updateRequest.upsert(new IndexRequest().source(singletonMap("field", "upsert"), XContentType.YAML)); Request.update(updateRequest); }); assertEquals("Update request cannot have different content types for doc [JSON] and upsert [YAML] documents", exception.getMessage()); } + public void testBulk() throws IOException { + Map expectedParams = new HashMap<>(); + + BulkRequest bulkRequest = new BulkRequest(); + if (randomBoolean()) { + String timeout = randomTimeValue(); + bulkRequest.timeout(timeout); + expectedParams.put("timeout", timeout); + } else { + expectedParams.put("timeout", BulkShardRequest.DEFAULT_TIMEOUT.getStringRep()); + } + + if (randomBoolean()) { + WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values()); + bulkRequest.setRefreshPolicy(refreshPolicy); + if (refreshPolicy != WriteRequest.RefreshPolicy.NONE) { + expectedParams.put("refresh", refreshPolicy.getValue()); + } + } + + XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE); + + int nbItems = randomIntBetween(10, 100); + for (int i = 0; i < nbItems; i++) { + String index = randomAsciiOfLength(5); + String type = randomAsciiOfLength(5); + String id = randomAsciiOfLength(5); + + BytesReference source = RandomObjects.randomSource(random(), xContentType); + DocWriteRequest.OpType opType = randomFrom(DocWriteRequest.OpType.values()); + + DocWriteRequest docWriteRequest = null; + if (opType == DocWriteRequest.OpType.INDEX) { + IndexRequest indexRequest = new IndexRequest(index, type, id).source(source, xContentType); + docWriteRequest = indexRequest; + if (randomBoolean()) { + indexRequest.setPipeline(randomAsciiOfLength(5)); + } + if (randomBoolean()) { + indexRequest.parent(randomAsciiOfLength(5)); + } + } else if (opType == DocWriteRequest.OpType.CREATE) { + IndexRequest createRequest = new IndexRequest(index, type, id).source(source, xContentType).create(true); + docWriteRequest = createRequest; + if (randomBoolean()) { + createRequest.parent(randomAsciiOfLength(5)); + } + } else if (opType == DocWriteRequest.OpType.UPDATE) { + final UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(new IndexRequest().source(source, xContentType)); + docWriteRequest = updateRequest; + if (randomBoolean()) { + updateRequest.retryOnConflict(randomIntBetween(1, 5)); + } + if (randomBoolean()) { + randomizeFetchSourceContextParams(updateRequest::fetchSource, new HashMap<>()); + } + if (randomBoolean()) { + updateRequest.parent(randomAsciiOfLength(5)); + } + } else if (opType == DocWriteRequest.OpType.DELETE) { + docWriteRequest = new DeleteRequest(index, type, id); + } + + if (randomBoolean()) { + docWriteRequest.routing(randomAsciiOfLength(10)); + } + if (randomBoolean()) { + docWriteRequest.version(randomNonNegativeLong()); + } + if (randomBoolean()) { + docWriteRequest.versionType(randomFrom(VersionType.values())); + } + bulkRequest.add(docWriteRequest); + } + + Request request = Request.bulk(bulkRequest); + assertEquals("/_bulk", request.endpoint); + assertEquals(expectedParams, request.params); + assertEquals("POST", request.method); + + byte[] content = new byte[(int) request.entity.getContentLength()]; + try (InputStream inputStream = request.entity.getContent()) { + Streams.readFully(inputStream, content); + } + + BulkRequest parsedBulkRequest = new BulkRequest(); + parsedBulkRequest.add(content, 0, content.length, xContentType); + assertEquals(bulkRequest.numberOfActions(), parsedBulkRequest.numberOfActions()); + + for (int i = 0; i < bulkRequest.numberOfActions(); i++) { + DocWriteRequest originalRequest = bulkRequest.requests().get(i); + DocWriteRequest parsedRequest = parsedBulkRequest.requests().get(i); + + assertEquals(originalRequest.opType(), parsedRequest.opType()); + assertEquals(originalRequest.index(), parsedRequest.index()); + assertEquals(originalRequest.type(), parsedRequest.type()); + assertEquals(originalRequest.id(), parsedRequest.id()); + assertEquals(originalRequest.routing(), parsedRequest.routing()); + assertEquals(originalRequest.parent(), parsedRequest.parent()); + assertEquals(originalRequest.version(), parsedRequest.version()); + assertEquals(originalRequest.versionType(), parsedRequest.versionType()); + + DocWriteRequest.OpType opType = originalRequest.opType(); + if (opType == DocWriteRequest.OpType.INDEX) { + IndexRequest indexRequest = (IndexRequest) originalRequest; + IndexRequest parsedIndexRequest = (IndexRequest) parsedRequest; + + assertEquals(indexRequest.getPipeline(), parsedIndexRequest.getPipeline()); + assertToXContentEquivalent(indexRequest.source(), parsedIndexRequest.source(), xContentType); + } else if (opType == DocWriteRequest.OpType.UPDATE) { + UpdateRequest updateRequest = (UpdateRequest) originalRequest; + UpdateRequest parsedUpdateRequest = (UpdateRequest) parsedRequest; + + assertEquals(updateRequest.retryOnConflict(), parsedUpdateRequest.retryOnConflict()); + assertEquals(updateRequest.fetchSource(), parsedUpdateRequest.fetchSource()); + if (updateRequest.doc() != null) { + assertToXContentEquivalent(updateRequest.doc().source(), parsedUpdateRequest.doc().source(), xContentType); + } else { + assertNull(parsedUpdateRequest.doc()); + } + } + } + } + + public void testBulkWithDifferentContentTypes() throws IOException { + { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new DeleteRequest("index", "type", "0")); + bulkRequest.add(new UpdateRequest("index", "type", "1").script(new Script("test"))); + bulkRequest.add(new DeleteRequest("index", "type", "2")); + + Request request = Request.bulk(bulkRequest); + assertEquals(XContentType.JSON.mediaType(), request.entity.getContentType().getValue()); + } + { + XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE); + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new DeleteRequest("index", "type", "0")); + bulkRequest.add(new IndexRequest("index", "type", "0").source(singletonMap("field", "value"), xContentType)); + bulkRequest.add(new DeleteRequest("index", "type", "2")); + + Request request = Request.bulk(bulkRequest); + assertEquals(xContentType.mediaType(), request.entity.getContentType().getValue()); + } + { + XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE); + UpdateRequest updateRequest = new UpdateRequest("index", "type", "0"); + if (randomBoolean()) { + updateRequest.doc(new IndexRequest().source(singletonMap("field", "value"), xContentType)); + } else { + updateRequest.upsert(new IndexRequest().source(singletonMap("field", "value"), xContentType)); + } + + Request request = Request.bulk(new BulkRequest().add(updateRequest)); + assertEquals(xContentType.mediaType(), request.entity.getContentType().getValue()); + } + { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest("index", "type", "0").source(singletonMap("field", "value"), XContentType.SMILE)); + bulkRequest.add(new IndexRequest("index", "type", "1").source(singletonMap("field", "value"), XContentType.JSON)); + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> Request.bulk(bulkRequest)); + assertEquals("Mismatching content-type found for request with content-type [JSON], " + + "previous requests have content-type [SMILE]", exception.getMessage()); + } + { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest("index", "type", "0") + .source(singletonMap("field", "value"), XContentType.JSON)); + bulkRequest.add(new IndexRequest("index", "type", "1") + .source(singletonMap("field", "value"), XContentType.JSON)); + bulkRequest.add(new UpdateRequest("index", "type", "2") + .doc(new IndexRequest().source(singletonMap("field", "value"), XContentType.JSON)) + .upsert(new IndexRequest().source(singletonMap("field", "value"), XContentType.SMILE)) + ); + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> Request.bulk(bulkRequest)); + assertEquals("Mismatching content-type found for request with content-type [SMILE], " + + "previous requests have content-type [JSON]", exception.getMessage()); + } + { + XContentType xContentType = randomFrom(XContentType.CBOR, XContentType.YAML); + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new DeleteRequest("index", "type", "0")); + bulkRequest.add(new IndexRequest("index", "type", "1").source(singletonMap("field", "value"), XContentType.JSON)); + bulkRequest.add(new DeleteRequest("index", "type", "2")); + bulkRequest.add(new DeleteRequest("index", "type", "3")); + bulkRequest.add(new IndexRequest("index", "type", "4").source(singletonMap("field", "value"), XContentType.JSON)); + bulkRequest.add(new IndexRequest("index", "type", "1").source(singletonMap("field", "value"), xContentType)); + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> Request.bulk(bulkRequest)); + assertEquals("Unsupported content-type found for request with content-type [" + xContentType + + "], only JSON and SMILE are supported", exception.getMessage()); + } + } + public void testParams() { final int nbParams = randomIntBetween(0, 10); Request.Params params = Request.Params.builder(); @@ -404,6 +603,33 @@ public void testEndpoint() { assertEquals("/a/b", Request.endpoint("a", "b")); assertEquals("/a/b/_create", Request.endpoint("a", "b", "_create")); assertEquals("/a/b/c/_create", Request.endpoint("a", "b", "c", "_create")); + assertEquals("/a/_create", Request.endpoint("a", null, null, "_create")); + } + + public void testEnforceSameContentType() { + XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE); + IndexRequest indexRequest = new IndexRequest().source(singletonMap("field", "value"), xContentType); + assertEquals(xContentType, enforceSameContentType(indexRequest, null)); + assertEquals(xContentType, enforceSameContentType(indexRequest, xContentType)); + + XContentType bulkContentType = randomBoolean() ? xContentType : null; + + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> + enforceSameContentType(new IndexRequest().source(singletonMap("field", "value"), XContentType.CBOR), bulkContentType)); + assertEquals("Unsupported content-type found for request with content-type [CBOR], only JSON and SMILE are supported", + exception.getMessage()); + + exception = expectThrows(IllegalArgumentException.class, () -> + enforceSameContentType(new IndexRequest().source(singletonMap("field", "value"), XContentType.YAML), bulkContentType)); + assertEquals("Unsupported content-type found for request with content-type [YAML], only JSON and SMILE are supported", + exception.getMessage()); + + XContentType requestContentType = xContentType == XContentType.JSON ? XContentType.SMILE : XContentType.JSON; + + exception = expectThrows(IllegalArgumentException.class, () -> + enforceSameContentType(new IndexRequest().source(singletonMap("field", "value"), requestContentType), xContentType)); + assertEquals("Mismatching content-type found for request with content-type [" + requestContentType + "], " + + "previous requests have content-type [" + xContentType + "]", exception.getMessage()); } /**