From 3104453ee4174da9d0248d9cb1fd0a90002cb1b6 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 22 Feb 2017 21:12:36 +0100 Subject: [PATCH 1/4] Add BulkRequest support to High Level Rest client This commit adds support for BulkRequest execution in the High Level Rest client. --- .../org/elasticsearch/client/Request.java | 103 +++++++++++++ .../client/RestHighLevelClient.java | 20 +++ .../java/org/elasticsearch/client/CrudIT.java | 79 ++++++++++ .../elasticsearch/client/RequestTests.java | 143 ++++++++++++++++++ 4 files changed, 345 insertions(+) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java index ebd0d17215ecc..b080e56047a83 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java @@ -28,19 +28,25 @@ import org.apache.http.entity.ContentType; import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -77,6 +83,103 @@ static Request ping() { return new Request("HEAD", "/", Collections.emptyMap(), null); } + static Request bulk(BulkRequest bulkRequest) throws IOException { + Params parameters = Params.builder(); + parameters.withTimeout(bulkRequest.timeout()); + parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy()); + + // Bulk API only supports newline delimited JSON + XContentType xContentType = XContentType.JSON; + byte separator = xContentType.xContent().streamSeparator(); + ContentType requestContentType = ContentType.create("application/x-ndjson"); + + ByteArrayOutputStream content = new ByteArrayOutputStream(); + for (DocWriteRequest request : bulkRequest.requests()) { + DocWriteRequest.OpType opType = request.opType(); + + try (XContentBuilder metadata = XContentBuilder.builder(xContentType.xContent())) { + metadata.startObject(); + { + metadata.startObject(opType.getLowercase()); + if (Strings.hasLength(request.index())) { + metadata.field("_index", request.index()); + } + if (Strings.hasLength(request.type())) { + metadata.field("_type", request.type()); + } + if (Strings.hasLength(request.id())) { + metadata.field("_id", request.id()); + } + if (Strings.hasLength(request.routing())) { + metadata.field("_routing", request.routing()); + } + if (Strings.hasLength(request.parent())) { + metadata.field("_parent", request.parent()); + } + if (request.version() != Versions.MATCH_ANY) { + metadata.field("_version", request.version()); + } + + VersionType versionType = request.versionType(); + if (versionType != VersionType.INTERNAL) { + if (versionType == VersionType.EXTERNAL) { + metadata.field("_version_type", "external"); + } else if (versionType == VersionType.EXTERNAL_GTE) { + metadata.field("_version_type", "external_gte"); + } else if (versionType == VersionType.FORCE) { + metadata.field("_version_type", "force"); + } + } + + if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { + IndexRequest indexRequest = (IndexRequest) request; + if (Strings.hasLength(indexRequest.getPipeline())) { + metadata.field("pipeline", indexRequest.getPipeline()); + } + } else if (opType == DocWriteRequest.OpType.UPDATE) { + UpdateRequest updateRequest = (UpdateRequest) request; + if (updateRequest.retryOnConflict() > 0) { + metadata.field("_retry_on_conflict", updateRequest.retryOnConflict()); + } + if (updateRequest.fetchSource() != null) { + metadata.field("_source", updateRequest.fetchSource()); + } + } + metadata.endObject(); + } + metadata.endObject(); + + BytesRef metadataSource = metadata.bytes().toBytesRef(); + content.write(metadataSource.bytes, metadataSource.offset, metadataSource.length); + content.write(separator); + } + + BytesRef source = null; + if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { + IndexRequest indexRequest = (IndexRequest) request; + BytesReference indexSource = indexRequest.source(); + XContentType indexXContentType = indexRequest.getContentType(); + + try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, indexSource, indexXContentType)) { + try (XContentBuilder builder = XContentBuilder.builder(xContentType.xContent())) { + builder.copyCurrentStructure(parser); + source = builder.bytes().toBytesRef(); + } + } + } else if (opType == DocWriteRequest.OpType.UPDATE) { + source = XContentHelper.toXContent((UpdateRequest) request, xContentType, false).toBytesRef(); + } + + if (source != null) { + content.write(source.bytes, source.offset, source.length); + content.write(separator); + } + } + + HttpEntity entity = new ByteArrayEntity(content.toByteArray(), 0, content.size(), requestContentType); + return new Request(HttpPost.METHOD_NAME, "/_bulk", parameters.getParams(), entity); + } + static Request exists(GetRequest getRequest) { Request request = get(getRequest); return new Request(HttpHead.METHOD_NAME, request.endpoint, request.params, null); diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index 26d3a4e7e87cc..e174e2fffe6c0 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -26,6 +26,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; @@ -59,6 +61,24 @@ public RestHighLevelClient(RestClient client) { this.client = Objects.requireNonNull(client); } + /** + * Executes a bulk request using the Bulk API + * + * See Bulk API on elastic.co + */ + public BulkResponse bulk(BulkRequest bulkRequest, Header... headers) throws IOException { + return performRequestAndParseEntity(bulkRequest, Request::bulk, BulkResponse::fromXContent, emptySet(), headers); + } + + /** + * Asynchronously executes a bulk request using the Bulk API + * + * See Bulk API on elastic.co + */ + public void bulkAsync(BulkRequest bulkRequest, ActionListener listener, Header... headers) { + performRequestAsyncAndParseEntity(bulkRequest, Request::bulk, BulkResponse::fromXContent, listener, emptySet(), headers); + } + /** * Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise */ diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index 8c9689e04122c..e00d8316640a6 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -25,6 +25,10 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; @@ -32,6 +36,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; @@ -440,4 +445,78 @@ public void testUpdate() throws IOException { exception.getMessage()); } } + + public void testBulk() throws IOException { + int nbItems = randomIntBetween(10, 100); + boolean[] errors = new boolean[nbItems]; + + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < nbItems; i++) { + String id = String.valueOf(i); + boolean erroneous = randomBoolean(); + errors[i] = erroneous; + + DocWriteRequest.OpType opType = randomFrom(DocWriteRequest.OpType.values()); + if (opType == DocWriteRequest.OpType.DELETE) { + if (erroneous == false) { + assertEquals(RestStatus.CREATED, + highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status()); + } + DeleteRequest deleteRequest = new DeleteRequest("index", "test", id); + bulkRequest.add(deleteRequest); + + } else { + XContentType xContentType = randomFrom(XContentType.values()); + BytesReference source = XContentBuilder.builder(xContentType.xContent()).startObject().field("id", i).endObject().bytes(); + if (opType == DocWriteRequest.OpType.INDEX) { + IndexRequest indexRequest = new IndexRequest("index", "test", id).source(source, xContentType); + if (erroneous) { + indexRequest.version(12L); + } + bulkRequest.add(indexRequest); + + } else if (opType == DocWriteRequest.OpType.CREATE) { + IndexRequest createRequest = new IndexRequest("index", "test", id).source(source, xContentType).create(true); + if (erroneous) { + assertEquals(RestStatus.CREATED, highLevelClient().index(createRequest).status()); + } + bulkRequest.add(createRequest); + + } else if (opType == DocWriteRequest.OpType.UPDATE) { + UpdateRequest updateRequest = new UpdateRequest("index", "test", id).doc(source, xContentType); + if (erroneous == false) { + assertEquals(RestStatus.CREATED, + highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status()); + } + bulkRequest.add(updateRequest); + } + } + } + + BulkResponse bulkResponse = execute(bulkRequest, highLevelClient()::bulk, highLevelClient()::bulkAsync); + assertEquals(RestStatus.OK, bulkResponse.status()); + assertTrue(bulkResponse.getTookInMillis() > 0); + assertEquals(nbItems, bulkResponse.getItems().length); + + for (int i = 0; i < nbItems; i++) { + BulkItemResponse bulkItemResponse = bulkResponse.getItems()[i]; + + assertEquals(i, bulkItemResponse.getItemId()); + assertEquals("index", bulkItemResponse.getIndex()); + assertEquals("test", bulkItemResponse.getType()); + assertEquals(String.valueOf(i), bulkItemResponse.getId()); + + DocWriteRequest.OpType requestOpType = bulkRequest.requests().get(i).opType(); + if (requestOpType == DocWriteRequest.OpType.INDEX || requestOpType == DocWriteRequest.OpType.CREATE) { + assertEquals(errors[i], bulkItemResponse.isFailed()); + assertEquals(errors[i] ? RestStatus.INTERNAL_SERVER_ERROR : RestStatus.CREATED, bulkItemResponse.status()); + } else if (requestOpType == DocWriteRequest.OpType.UPDATE) { + assertEquals(errors[i], bulkItemResponse.isFailed()); + assertEquals(errors[i] ? RestStatus.INTERNAL_SERVER_ERROR : RestStatus.OK, bulkItemResponse.status()); + } else if (requestOpType == DocWriteRequest.OpType.DELETE) { + assertFalse(bulkItemResponse.isFailed()); + assertEquals(errors[i] ? RestStatus.NOT_FOUND : RestStatus.OK, bulkItemResponse.status()); + } + } + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java index fb1a738873218..f42d677cab26c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java @@ -22,6 +22,9 @@ import org.apache.http.HttpEntity; import org.apache.http.entity.ByteArrayEntity; import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkShardRequest; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; @@ -29,7 +32,9 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; @@ -40,6 +45,7 @@ import org.elasticsearch.test.RandomObjects; import java.io.IOException; +import java.io.InputStream; import java.util.Collections; import java.util.HashMap; import java.util.Locale; @@ -369,6 +375,142 @@ public void testUpdateWithDifferentContentTypes() throws IOException { exception.getMessage()); } + public void testBulk() throws IOException { + Map expectedParams = new HashMap<>(); + + BulkRequest bulkRequest = new BulkRequest(); + if (randomBoolean()) { + String timeout = randomTimeValue(); + bulkRequest.timeout(timeout); + expectedParams.put("timeout", timeout); + } else { + expectedParams.put("timeout", BulkShardRequest.DEFAULT_TIMEOUT.getStringRep()); + } + + if (randomBoolean()) { + WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values()); + bulkRequest.setRefreshPolicy(refreshPolicy); + if (refreshPolicy != WriteRequest.RefreshPolicy.NONE) { + expectedParams.put("refresh", refreshPolicy.getValue()); + } + } + + int nbItems = randomIntBetween(10, 100); + for (int i = 0; i < nbItems; i++) { + String index = randomAsciiOfLength(5); + String type = randomAsciiOfLength(5); + String id = randomAsciiOfLength(5); + + XContentType xContentType = randomFrom(XContentType.values()); + BytesReference source = RandomObjects.randomSource(random(), xContentType); + DocWriteRequest.OpType opType = randomFrom(DocWriteRequest.OpType.values()); + + DocWriteRequest docWriteRequest = null; + if (opType == DocWriteRequest.OpType.INDEX) { + IndexRequest indexRequest = new IndexRequest(index, type, id).source(source, xContentType); + docWriteRequest = indexRequest; + if (randomBoolean()) { + indexRequest.setPipeline(randomAsciiOfLength(5)); + } + if (randomBoolean()) { + indexRequest.parent(randomAsciiOfLength(5)); + } + } else if (opType == DocWriteRequest.OpType.CREATE) { + IndexRequest createRequest = new IndexRequest(index, type, id).source(source, xContentType).create(true); + docWriteRequest = createRequest; + if (randomBoolean()) { + createRequest.parent(randomAsciiOfLength(5)); + } + } else if (opType == DocWriteRequest.OpType.UPDATE) { + final UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(source, xContentType); + docWriteRequest = updateRequest; + if (randomBoolean()) { + updateRequest.retryOnConflict(randomIntBetween(1, 5)); + } + if (randomBoolean()) { + randomizeFetchSourceContextParams(updateRequest::fetchSource, new HashMap<>()); + } + if (randomBoolean()) { + updateRequest.parent(randomAsciiOfLength(5)); + } + } else if (opType == DocWriteRequest.OpType.DELETE) { + docWriteRequest = new DeleteRequest(index, type, id); + } + + if (randomBoolean()) { + docWriteRequest.routing(randomAsciiOfLength(10)); + } + if (randomBoolean()) { + docWriteRequest.version(randomNonNegativeLong()); + } + if (randomBoolean()) { + docWriteRequest.versionType(randomFrom(VersionType.values())); + } + bulkRequest.add(docWriteRequest); + } + + Request request = Request.bulk(bulkRequest); + assertEquals("/_bulk", request.endpoint); + assertEquals(expectedParams, request.params); + assertEquals("POST", request.method); + + byte[] content = new byte[(int) request.entity.getContentLength()]; + try (InputStream inputStream = request.entity.getContent()) { + Streams.readFully(inputStream, content); + } + + BulkRequest parsedBulkRequest = new BulkRequest(); + parsedBulkRequest.add(content, 0, content.length, XContentType.JSON); + assertEquals(bulkRequest.numberOfActions(), parsedBulkRequest.numberOfActions()); + + for (int i = 0; i < bulkRequest.numberOfActions(); i++) { + DocWriteRequest originalRequest = bulkRequest.requests().get(i); + DocWriteRequest parsedRequest = parsedBulkRequest.requests().get(i); + + assertEquals(originalRequest.opType(), parsedRequest.opType()); + assertEquals(originalRequest.index(), parsedRequest.index()); + assertEquals(originalRequest.type(), parsedRequest.type()); + assertEquals(originalRequest.id(), parsedRequest.id()); + assertEquals(originalRequest.routing(), parsedRequest.routing()); + assertEquals(originalRequest.parent(), parsedRequest.parent()); + assertEquals(originalRequest.version(), parsedRequest.version()); + assertEquals(originalRequest.versionType(), parsedRequest.versionType()); + + DocWriteRequest.OpType opType = originalRequest.opType(); + if (opType == DocWriteRequest.OpType.INDEX) { + IndexRequest indexRequest = (IndexRequest) originalRequest; + IndexRequest parsedIndexRequest = (IndexRequest) parsedRequest; + + assertEquals(indexRequest.getPipeline(), parsedIndexRequest.getPipeline()); + } else if (opType == DocWriteRequest.OpType.UPDATE) { + UpdateRequest updateRequest = (UpdateRequest) originalRequest; + UpdateRequest parsedUpdateRequest = (UpdateRequest) parsedRequest; + + assertEquals(updateRequest.retryOnConflict(), parsedUpdateRequest.retryOnConflict()); + assertEquals(updateRequest.fetchSource(), parsedUpdateRequest.fetchSource()); + if (updateRequest.doc() != null) { + BytesReference originalBytes = updateRequest.doc().source(); + XContentType originalContentType = updateRequest.doc().getContentType(); + + BytesReference finalBytes = parsedUpdateRequest.doc().source(); + XContentType finalContentType = parsedUpdateRequest.doc().getContentType(); + + if (finalContentType != originalContentType) { + try (XContentParser parser = finalContentType.xContent().createParser(NamedXContentRegistry.EMPTY, finalBytes)) { + try (XContentBuilder builder = XContentBuilder.builder(originalContentType.xContent())) { + builder.copyCurrentStructure(parser); + finalBytes = builder.bytes(); + } + } + } + assertToXContentEquivalent(originalBytes, finalBytes, originalContentType); + } else { + assertNull(parsedUpdateRequest.doc()); + } + } + } + } + public void testParams() { final int nbParams = randomIntBetween(0, 10); Request.Params params = Request.Params.builder(); @@ -404,6 +546,7 @@ public void testEndpoint() { assertEquals("/a/b", Request.endpoint("a", "b")); assertEquals("/a/b/_create", Request.endpoint("a", "b", "_create")); assertEquals("/a/b/c/_create", Request.endpoint("a", "b", "c", "_create")); + assertEquals("/a/_create", Request.endpoint("a", null, null, "_create")); } /** From 121553ec458d6014b0b97fce0662a152987507a7 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 23 Feb 2017 11:21:39 +0100 Subject: [PATCH 2/4] Update after Luca review --- .../org/elasticsearch/client/Request.java | 76 +++++++++++-- .../java/org/elasticsearch/client/CrudIT.java | 6 +- .../elasticsearch/client/RequestTests.java | 101 ++++++++++++++---- 3 files changed, 152 insertions(+), 31 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java index b080e56047a83..977cd761d227b 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java @@ -48,8 +48,10 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.StringJoiner; @@ -88,16 +90,76 @@ static Request bulk(BulkRequest bulkRequest) throws IOException { parameters.withTimeout(bulkRequest.timeout()); parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy()); - // Bulk API only supports newline delimited JSON - XContentType xContentType = XContentType.JSON; - byte separator = xContentType.xContent().streamSeparator(); - ContentType requestContentType = ContentType.create("application/x-ndjson"); + // Bulk API only supports newline delimited JSON or Smile. Before executing + // the bulk, we need to check that all requests have the same content-type + // and this content-type is supported by the Bulk API. + List allowedContentTypes = Arrays.asList(XContentType.JSON, XContentType.SMILE); + + XContentType bulkContentType = null; + for (int i = 0; i < bulkRequest.numberOfActions(); i++) { + DocWriteRequest request = bulkRequest.requests().get(i); + XContentType requestContentType = null; + + boolean supported = true; + boolean match = true; + + DocWriteRequest.OpType opType = request.opType(); + if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { + IndexRequest indexRequest = (IndexRequest) request; + requestContentType = indexRequest.getContentType(); + + supported = allowedContentTypes.contains(requestContentType); + if (bulkContentType == null) { + bulkContentType = requestContentType; + } else { + match = (requestContentType == bulkContentType); + } + + } else if (opType == DocWriteRequest.OpType.UPDATE) { + UpdateRequest updateRequest = (UpdateRequest) request; + if (updateRequest.doc() != null) { + requestContentType = updateRequest.doc().getContentType(); + + supported = allowedContentTypes.contains(requestContentType); + if (bulkContentType == null) { + bulkContentType = requestContentType; + } else { + match = (requestContentType == bulkContentType); + } + } + if (updateRequest.upsertRequest() != null && supported && match) { + requestContentType = updateRequest.upsertRequest().getContentType(); + + supported = allowedContentTypes.contains(requestContentType); + if (bulkContentType == null) { + bulkContentType = requestContentType; + } else { + match = (requestContentType == bulkContentType); + } + } + } + + if (supported == false) { + throw new IllegalStateException("Unsupported content-type found for " + opType.getLowercase() + " request at [" + i + + "] with content-type [" + requestContentType + "], only " + allowedContentTypes + " are supported"); + } else if (match == false) { + throw new IllegalStateException("Mismatching content-type found for " + opType.getLowercase() + " request at [" + i + + "] with content-type [" + requestContentType + "], previous requests have content-type [" + bulkContentType + "]"); + } + } + + if (bulkContentType == null) { + bulkContentType = XContentType.JSON; + } + + byte separator = bulkContentType.xContent().streamSeparator(); + ContentType requestContentType = ContentType.create(bulkContentType.mediaType()); ByteArrayOutputStream content = new ByteArrayOutputStream(); for (DocWriteRequest request : bulkRequest.requests()) { DocWriteRequest.OpType opType = request.opType(); - try (XContentBuilder metadata = XContentBuilder.builder(xContentType.xContent())) { + try (XContentBuilder metadata = XContentBuilder.builder(bulkContentType.xContent())) { metadata.startObject(); { metadata.startObject(opType.getLowercase()); @@ -161,13 +223,13 @@ static Request bulk(BulkRequest bulkRequest) throws IOException { XContentType indexXContentType = indexRequest.getContentType(); try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, indexSource, indexXContentType)) { - try (XContentBuilder builder = XContentBuilder.builder(xContentType.xContent())) { + try (XContentBuilder builder = XContentBuilder.builder(bulkContentType.xContent())) { builder.copyCurrentStructure(parser); source = builder.bytes().toBytesRef(); } } } else if (opType == DocWriteRequest.OpType.UPDATE) { - source = XContentHelper.toXContent((UpdateRequest) request, xContentType, false).toBytesRef(); + source = XContentHelper.toXContent((UpdateRequest) request, bulkContentType, false).toBytesRef(); } if (source != null) { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index e00d8316640a6..4686a23b86851 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -450,6 +450,8 @@ public void testBulk() throws IOException { int nbItems = randomIntBetween(10, 100); boolean[] errors = new boolean[nbItems]; + XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE); + BulkRequest bulkRequest = new BulkRequest(); for (int i = 0; i < nbItems; i++) { String id = String.valueOf(i); @@ -466,7 +468,6 @@ public void testBulk() throws IOException { bulkRequest.add(deleteRequest); } else { - XContentType xContentType = randomFrom(XContentType.values()); BytesReference source = XContentBuilder.builder(xContentType.xContent()).startObject().field("id", i).endObject().bytes(); if (opType == DocWriteRequest.OpType.INDEX) { IndexRequest indexRequest = new IndexRequest("index", "test", id).source(source, xContentType); @@ -483,7 +484,8 @@ public void testBulk() throws IOException { bulkRequest.add(createRequest); } else if (opType == DocWriteRequest.OpType.UPDATE) { - UpdateRequest updateRequest = new UpdateRequest("index", "test", id).doc(source, xContentType); + UpdateRequest updateRequest = new UpdateRequest("index", "test", id) + .doc(new IndexRequest().source(source, xContentType)); if (erroneous == false) { assertEquals(RestStatus.CREATED, highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java index f42d677cab26c..9f9b2e1173838 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java @@ -34,7 +34,6 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; @@ -46,13 +45,13 @@ import java.io.IOException; import java.io.InputStream; -import java.util.Collections; import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.function.Consumer; import java.util.function.Function; +import static java.util.Collections.singletonMap; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent; public class RequestTests extends ESTestCase { @@ -367,8 +366,8 @@ public void testUpdate() throws IOException { public void testUpdateWithDifferentContentTypes() throws IOException { IllegalStateException exception = expectThrows(IllegalStateException.class, () -> { UpdateRequest updateRequest = new UpdateRequest(); - updateRequest.doc(new IndexRequest().source(Collections.singletonMap("field", "doc"), XContentType.JSON)); - updateRequest.upsert(new IndexRequest().source(Collections.singletonMap("field", "upsert"), XContentType.YAML)); + updateRequest.doc(new IndexRequest().source(singletonMap("field", "doc"), XContentType.JSON)); + updateRequest.upsert(new IndexRequest().source(singletonMap("field", "upsert"), XContentType.YAML)); Request.update(updateRequest); }); assertEquals("Update request cannot have different content types for doc [JSON] and upsert [YAML] documents", @@ -395,13 +394,14 @@ public void testBulk() throws IOException { } } + XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE); + int nbItems = randomIntBetween(10, 100); for (int i = 0; i < nbItems; i++) { String index = randomAsciiOfLength(5); String type = randomAsciiOfLength(5); String id = randomAsciiOfLength(5); - XContentType xContentType = randomFrom(XContentType.values()); BytesReference source = RandomObjects.randomSource(random(), xContentType); DocWriteRequest.OpType opType = randomFrom(DocWriteRequest.OpType.values()); @@ -422,7 +422,7 @@ public void testBulk() throws IOException { createRequest.parent(randomAsciiOfLength(5)); } } else if (opType == DocWriteRequest.OpType.UPDATE) { - final UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(source, xContentType); + final UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(new IndexRequest().source(source, xContentType)); docWriteRequest = updateRequest; if (randomBoolean()) { updateRequest.retryOnConflict(randomIntBetween(1, 5)); @@ -460,7 +460,7 @@ public void testBulk() throws IOException { } BulkRequest parsedBulkRequest = new BulkRequest(); - parsedBulkRequest.add(content, 0, content.length, XContentType.JSON); + parsedBulkRequest.add(content, 0, content.length, xContentType); assertEquals(bulkRequest.numberOfActions(), parsedBulkRequest.numberOfActions()); for (int i = 0; i < bulkRequest.numberOfActions(); i++) { @@ -482,6 +482,7 @@ public void testBulk() throws IOException { IndexRequest parsedIndexRequest = (IndexRequest) parsedRequest; assertEquals(indexRequest.getPipeline(), parsedIndexRequest.getPipeline()); + assertToXContentEquivalent(indexRequest.source(), parsedIndexRequest.source(), xContentType); } else if (opType == DocWriteRequest.OpType.UPDATE) { UpdateRequest updateRequest = (UpdateRequest) originalRequest; UpdateRequest parsedUpdateRequest = (UpdateRequest) parsedRequest; @@ -489,21 +490,7 @@ public void testBulk() throws IOException { assertEquals(updateRequest.retryOnConflict(), parsedUpdateRequest.retryOnConflict()); assertEquals(updateRequest.fetchSource(), parsedUpdateRequest.fetchSource()); if (updateRequest.doc() != null) { - BytesReference originalBytes = updateRequest.doc().source(); - XContentType originalContentType = updateRequest.doc().getContentType(); - - BytesReference finalBytes = parsedUpdateRequest.doc().source(); - XContentType finalContentType = parsedUpdateRequest.doc().getContentType(); - - if (finalContentType != originalContentType) { - try (XContentParser parser = finalContentType.xContent().createParser(NamedXContentRegistry.EMPTY, finalBytes)) { - try (XContentBuilder builder = XContentBuilder.builder(originalContentType.xContent())) { - builder.copyCurrentStructure(parser); - finalBytes = builder.bytes(); - } - } - } - assertToXContentEquivalent(originalBytes, finalBytes, originalContentType); + assertToXContentEquivalent(updateRequest.doc().source(), parsedUpdateRequest.doc().source(), xContentType); } else { assertNull(parsedUpdateRequest.doc()); } @@ -511,6 +498,76 @@ public void testBulk() throws IOException { } } + public void testBulkWithDifferentContentTypes() throws IOException { + { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new DeleteRequest("index", "type", "0")); + bulkRequest.add(new UpdateRequest("index", "type", "1").script(new Script("test"))); + bulkRequest.add(new DeleteRequest("index", "type", "2")); + + Request request = Request.bulk(bulkRequest); + assertEquals(XContentType.JSON.mediaType(), request.entity.getContentType().getValue()); + } + { + XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE); + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new DeleteRequest("index", "type", "0")); + bulkRequest.add(new IndexRequest("index", "type", "0").source(singletonMap("field", "value"), xContentType)); + bulkRequest.add(new DeleteRequest("index", "type", "2")); + + Request request = Request.bulk(bulkRequest); + assertEquals(xContentType.mediaType(), request.entity.getContentType().getValue()); + } + { + XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE); + UpdateRequest updateRequest = new UpdateRequest("index", "type", "0"); + if (randomBoolean()) { + updateRequest.doc(new IndexRequest().source(singletonMap("field", "value"), xContentType)); + } + if (randomBoolean()) { + updateRequest.upsert(new IndexRequest().source(singletonMap("field", "value"), xContentType)); + } + + Request request = Request.bulk(new BulkRequest().add(updateRequest)); + assertEquals(xContentType.mediaType(), request.entity.getContentType().getValue()); + } + { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest("index", "type", "0").source(singletonMap("field", "value"), XContentType.SMILE)); + bulkRequest.add(new IndexRequest("index", "type", "1").source(singletonMap("field", "value"), XContentType.JSON)); + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> Request.bulk(bulkRequest)); + assertEquals("Mismatching content-type found for index request at [1] with content-type [JSON], " + + "previous requests have content-type [SMILE]", exception.getMessage()); + } + { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest("index", "type", "0") + .source(singletonMap("field", "value"), XContentType.JSON)); + bulkRequest.add(new IndexRequest("index", "type", "1") + .source(singletonMap("field", "value"), XContentType.JSON)); + bulkRequest.add(new UpdateRequest("index", "type", "2") + .doc(new IndexRequest().source(singletonMap("field", "value"), XContentType.JSON)) + .upsert(new IndexRequest().source(singletonMap("field", "value"), XContentType.SMILE)) + ); + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> Request.bulk(bulkRequest)); + assertEquals("Mismatching content-type found for update request at [2] with content-type [SMILE], " + + "previous requests have content-type [JSON]", exception.getMessage()); + } + { + XContentType xContentType = randomFrom(XContentType.CBOR, XContentType.YAML); + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new DeleteRequest("index", "type", "0")); + bulkRequest.add(new IndexRequest("index", "type", "1").source(singletonMap("field", "value"), XContentType.JSON)); + bulkRequest.add(new DeleteRequest("index", "type", "2")); + bulkRequest.add(new DeleteRequest("index", "type", "3")); + bulkRequest.add(new IndexRequest("index", "type", "4").source(singletonMap("field", "value"), XContentType.JSON)); + bulkRequest.add(new IndexRequest("index", "type", "1").source(singletonMap("field", "value"), xContentType)); + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> Request.bulk(bulkRequest)); + assertEquals("Unsupported content-type found for index request at [5] with content-type [CBOR], " + + "only [JSON, SMILE] are supported", exception.getMessage()); + } + } + public void testParams() { final int nbParams = randomIntBetween(0, 10); Request.Params params = Request.Params.builder(); From bce46bffe8dbdc5948d00238af0e02a79bc1ae0c Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 23 Feb 2017 13:45:37 +0100 Subject: [PATCH 3/4] Update after Luca (2) --- .../org/elasticsearch/client/Request.java | 67 +++++++------------ .../elasticsearch/client/RequestTests.java | 38 +++++++++-- 2 files changed, 57 insertions(+), 48 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java index 977cd761d227b..0e5ff181cd317 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java @@ -34,6 +34,7 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.uid.Versions; @@ -48,10 +49,8 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Locale; import java.util.Map; import java.util.StringJoiner; @@ -93,59 +92,23 @@ static Request bulk(BulkRequest bulkRequest) throws IOException { // Bulk API only supports newline delimited JSON or Smile. Before executing // the bulk, we need to check that all requests have the same content-type // and this content-type is supported by the Bulk API. - List allowedContentTypes = Arrays.asList(XContentType.JSON, XContentType.SMILE); - XContentType bulkContentType = null; for (int i = 0; i < bulkRequest.numberOfActions(); i++) { DocWriteRequest request = bulkRequest.requests().get(i); - XContentType requestContentType = null; - - boolean supported = true; - boolean match = true; DocWriteRequest.OpType opType = request.opType(); if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { - IndexRequest indexRequest = (IndexRequest) request; - requestContentType = indexRequest.getContentType(); - - supported = allowedContentTypes.contains(requestContentType); - if (bulkContentType == null) { - bulkContentType = requestContentType; - } else { - match = (requestContentType == bulkContentType); - } + bulkContentType = ensureBulkContentType((IndexRequest) request, bulkContentType); } else if (opType == DocWriteRequest.OpType.UPDATE) { UpdateRequest updateRequest = (UpdateRequest) request; if (updateRequest.doc() != null) { - requestContentType = updateRequest.doc().getContentType(); - - supported = allowedContentTypes.contains(requestContentType); - if (bulkContentType == null) { - bulkContentType = requestContentType; - } else { - match = (requestContentType == bulkContentType); - } + bulkContentType = ensureBulkContentType(updateRequest.doc(), bulkContentType); } - if (updateRequest.upsertRequest() != null && supported && match) { - requestContentType = updateRequest.upsertRequest().getContentType(); - - supported = allowedContentTypes.contains(requestContentType); - if (bulkContentType == null) { - bulkContentType = requestContentType; - } else { - match = (requestContentType == bulkContentType); - } + if (updateRequest.upsertRequest() != null) { + bulkContentType = ensureBulkContentType(updateRequest.upsertRequest(), bulkContentType); } } - - if (supported == false) { - throw new IllegalStateException("Unsupported content-type found for " + opType.getLowercase() + " request at [" + i + - "] with content-type [" + requestContentType + "], only " + allowedContentTypes + " are supported"); - } else if (match == false) { - throw new IllegalStateException("Mismatching content-type found for " + opType.getLowercase() + " request at [" + i + - "] with content-type [" + requestContentType + "], previous requests have content-type [" + bulkContentType + "]"); - } } if (bulkContentType == null) { @@ -477,4 +440,24 @@ static Params builder() { return new Params(); } } + + /** + * Ensure that the {@link IndexRequest}'s content type is supported by the Bulk API and that it conforms + * to the current {@link BulkRequest}'s content type (if it's known at the time of this method get called). + */ + static XContentType ensureBulkContentType(IndexRequest indexRequest, @Nullable XContentType xContentType) { + XContentType requestContentType = indexRequest.getContentType(); + if (requestContentType != XContentType.JSON && requestContentType != XContentType.SMILE) { + throw new IllegalStateException("Unsupported content-type found for request with content-type [" + requestContentType + + "], only JSON and SMILE are supported"); + } + if (xContentType == null) { + return requestContentType; + } + if (requestContentType != xContentType) { + throw new IllegalStateException("Mismatching content-type found for request with content-type [" + requestContentType + + "], previous requests have content-type [" + xContentType + "]"); + } + return xContentType; + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java index 9f9b2e1173838..65aa51f75619e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java @@ -52,6 +52,7 @@ import java.util.function.Function; import static java.util.Collections.singletonMap; +import static org.elasticsearch.client.Request.ensureBulkContentType; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent; public class RequestTests extends ESTestCase { @@ -523,8 +524,7 @@ public void testBulkWithDifferentContentTypes() throws IOException { UpdateRequest updateRequest = new UpdateRequest("index", "type", "0"); if (randomBoolean()) { updateRequest.doc(new IndexRequest().source(singletonMap("field", "value"), xContentType)); - } - if (randomBoolean()) { + } else { updateRequest.upsert(new IndexRequest().source(singletonMap("field", "value"), xContentType)); } @@ -536,7 +536,7 @@ public void testBulkWithDifferentContentTypes() throws IOException { bulkRequest.add(new IndexRequest("index", "type", "0").source(singletonMap("field", "value"), XContentType.SMILE)); bulkRequest.add(new IndexRequest("index", "type", "1").source(singletonMap("field", "value"), XContentType.JSON)); IllegalStateException exception = expectThrows(IllegalStateException.class, () -> Request.bulk(bulkRequest)); - assertEquals("Mismatching content-type found for index request at [1] with content-type [JSON], " + + assertEquals("Mismatching content-type found for request with content-type [JSON], " + "previous requests have content-type [SMILE]", exception.getMessage()); } { @@ -550,7 +550,7 @@ public void testBulkWithDifferentContentTypes() throws IOException { .upsert(new IndexRequest().source(singletonMap("field", "value"), XContentType.SMILE)) ); IllegalStateException exception = expectThrows(IllegalStateException.class, () -> Request.bulk(bulkRequest)); - assertEquals("Mismatching content-type found for update request at [2] with content-type [SMILE], " + + assertEquals("Mismatching content-type found for request with content-type [SMILE], " + "previous requests have content-type [JSON]", exception.getMessage()); } { @@ -563,8 +563,8 @@ public void testBulkWithDifferentContentTypes() throws IOException { bulkRequest.add(new IndexRequest("index", "type", "4").source(singletonMap("field", "value"), XContentType.JSON)); bulkRequest.add(new IndexRequest("index", "type", "1").source(singletonMap("field", "value"), xContentType)); IllegalStateException exception = expectThrows(IllegalStateException.class, () -> Request.bulk(bulkRequest)); - assertEquals("Unsupported content-type found for index request at [5] with content-type [CBOR], " + - "only [JSON, SMILE] are supported", exception.getMessage()); + assertEquals("Unsupported content-type found for request with content-type [" + xContentType + + "], only JSON and SMILE are supported", exception.getMessage()); } } @@ -606,6 +606,32 @@ public void testEndpoint() { assertEquals("/a/_create", Request.endpoint("a", null, null, "_create")); } + public void testEnsureBulkContentType() { + XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE); + IndexRequest indexRequest = new IndexRequest().source(singletonMap("field", "value"), xContentType); + assertEquals(xContentType, ensureBulkContentType(indexRequest, null)); + assertEquals(xContentType, ensureBulkContentType(indexRequest, xContentType)); + + XContentType bulkContentType = randomBoolean() ? xContentType : null; + + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> + ensureBulkContentType(new IndexRequest().source(singletonMap("field", "value"), XContentType.CBOR), bulkContentType)); + assertEquals("Unsupported content-type found for request with content-type [CBOR], only JSON and SMILE are supported", + exception.getMessage()); + + exception = expectThrows(IllegalStateException.class, () -> + ensureBulkContentType(new IndexRequest().source(singletonMap("field", "value"), XContentType.YAML), bulkContentType)); + assertEquals("Unsupported content-type found for request with content-type [YAML], only JSON and SMILE are supported", + exception.getMessage()); + + XContentType requestContentType = xContentType == XContentType.JSON ? XContentType.SMILE : XContentType.JSON; + + exception = expectThrows(IllegalStateException.class, () -> + ensureBulkContentType(new IndexRequest().source(singletonMap("field", "value"), requestContentType), xContentType)); + assertEquals("Mismatching content-type found for request with content-type [" + requestContentType + "], " + + "previous requests have content-type [" + xContentType + "]", exception.getMessage()); + } + /** * Randomize the {@link FetchSourceContext} request parameters. */ From d823ca728cd570304447cab6f2998aaf8842b8ee Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 23 Feb 2017 16:33:05 +0100 Subject: [PATCH 4/4] Rename method and use IllegalArgumentException instead --- .../org/elasticsearch/client/Request.java | 14 +++++----- .../elasticsearch/client/RequestTests.java | 26 +++++++++---------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java index 0e5ff181cd317..ecba2953c9499 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java @@ -98,15 +98,15 @@ static Request bulk(BulkRequest bulkRequest) throws IOException { DocWriteRequest.OpType opType = request.opType(); if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { - bulkContentType = ensureBulkContentType((IndexRequest) request, bulkContentType); + bulkContentType = enforceSameContentType((IndexRequest) request, bulkContentType); } else if (opType == DocWriteRequest.OpType.UPDATE) { UpdateRequest updateRequest = (UpdateRequest) request; if (updateRequest.doc() != null) { - bulkContentType = ensureBulkContentType(updateRequest.doc(), bulkContentType); + bulkContentType = enforceSameContentType(updateRequest.doc(), bulkContentType); } if (updateRequest.upsertRequest() != null) { - bulkContentType = ensureBulkContentType(updateRequest.upsertRequest(), bulkContentType); + bulkContentType = enforceSameContentType(updateRequest.upsertRequest(), bulkContentType); } } } @@ -444,18 +444,20 @@ static Params builder() { /** * Ensure that the {@link IndexRequest}'s content type is supported by the Bulk API and that it conforms * to the current {@link BulkRequest}'s content type (if it's known at the time of this method get called). + * + * @return the {@link IndexRequest}'s content type */ - static XContentType ensureBulkContentType(IndexRequest indexRequest, @Nullable XContentType xContentType) { + static XContentType enforceSameContentType(IndexRequest indexRequest, @Nullable XContentType xContentType) { XContentType requestContentType = indexRequest.getContentType(); if (requestContentType != XContentType.JSON && requestContentType != XContentType.SMILE) { - throw new IllegalStateException("Unsupported content-type found for request with content-type [" + requestContentType + throw new IllegalArgumentException("Unsupported content-type found for request with content-type [" + requestContentType + "], only JSON and SMILE are supported"); } if (xContentType == null) { return requestContentType; } if (requestContentType != xContentType) { - throw new IllegalStateException("Mismatching content-type found for request with content-type [" + requestContentType + throw new IllegalArgumentException("Mismatching content-type found for request with content-type [" + requestContentType + "], previous requests have content-type [" + xContentType + "]"); } return xContentType; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java index 65aa51f75619e..1d61ef87c485e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestTests.java @@ -52,7 +52,7 @@ import java.util.function.Function; import static java.util.Collections.singletonMap; -import static org.elasticsearch.client.Request.ensureBulkContentType; +import static org.elasticsearch.client.Request.enforceSameContentType; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent; public class RequestTests extends ESTestCase { @@ -535,7 +535,7 @@ public void testBulkWithDifferentContentTypes() throws IOException { BulkRequest bulkRequest = new BulkRequest(); bulkRequest.add(new IndexRequest("index", "type", "0").source(singletonMap("field", "value"), XContentType.SMILE)); bulkRequest.add(new IndexRequest("index", "type", "1").source(singletonMap("field", "value"), XContentType.JSON)); - IllegalStateException exception = expectThrows(IllegalStateException.class, () -> Request.bulk(bulkRequest)); + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> Request.bulk(bulkRequest)); assertEquals("Mismatching content-type found for request with content-type [JSON], " + "previous requests have content-type [SMILE]", exception.getMessage()); } @@ -549,7 +549,7 @@ public void testBulkWithDifferentContentTypes() throws IOException { .doc(new IndexRequest().source(singletonMap("field", "value"), XContentType.JSON)) .upsert(new IndexRequest().source(singletonMap("field", "value"), XContentType.SMILE)) ); - IllegalStateException exception = expectThrows(IllegalStateException.class, () -> Request.bulk(bulkRequest)); + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> Request.bulk(bulkRequest)); assertEquals("Mismatching content-type found for request with content-type [SMILE], " + "previous requests have content-type [JSON]", exception.getMessage()); } @@ -562,7 +562,7 @@ public void testBulkWithDifferentContentTypes() throws IOException { bulkRequest.add(new DeleteRequest("index", "type", "3")); bulkRequest.add(new IndexRequest("index", "type", "4").source(singletonMap("field", "value"), XContentType.JSON)); bulkRequest.add(new IndexRequest("index", "type", "1").source(singletonMap("field", "value"), xContentType)); - IllegalStateException exception = expectThrows(IllegalStateException.class, () -> Request.bulk(bulkRequest)); + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> Request.bulk(bulkRequest)); assertEquals("Unsupported content-type found for request with content-type [" + xContentType + "], only JSON and SMILE are supported", exception.getMessage()); } @@ -606,28 +606,28 @@ public void testEndpoint() { assertEquals("/a/_create", Request.endpoint("a", null, null, "_create")); } - public void testEnsureBulkContentType() { + public void testEnforceSameContentType() { XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE); IndexRequest indexRequest = new IndexRequest().source(singletonMap("field", "value"), xContentType); - assertEquals(xContentType, ensureBulkContentType(indexRequest, null)); - assertEquals(xContentType, ensureBulkContentType(indexRequest, xContentType)); + assertEquals(xContentType, enforceSameContentType(indexRequest, null)); + assertEquals(xContentType, enforceSameContentType(indexRequest, xContentType)); XContentType bulkContentType = randomBoolean() ? xContentType : null; - IllegalStateException exception = expectThrows(IllegalStateException.class, () -> - ensureBulkContentType(new IndexRequest().source(singletonMap("field", "value"), XContentType.CBOR), bulkContentType)); + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> + enforceSameContentType(new IndexRequest().source(singletonMap("field", "value"), XContentType.CBOR), bulkContentType)); assertEquals("Unsupported content-type found for request with content-type [CBOR], only JSON and SMILE are supported", exception.getMessage()); - exception = expectThrows(IllegalStateException.class, () -> - ensureBulkContentType(new IndexRequest().source(singletonMap("field", "value"), XContentType.YAML), bulkContentType)); + exception = expectThrows(IllegalArgumentException.class, () -> + enforceSameContentType(new IndexRequest().source(singletonMap("field", "value"), XContentType.YAML), bulkContentType)); assertEquals("Unsupported content-type found for request with content-type [YAML], only JSON and SMILE are supported", exception.getMessage()); XContentType requestContentType = xContentType == XContentType.JSON ? XContentType.SMILE : XContentType.JSON; - exception = expectThrows(IllegalStateException.class, () -> - ensureBulkContentType(new IndexRequest().source(singletonMap("field", "value"), requestContentType), xContentType)); + exception = expectThrows(IllegalArgumentException.class, () -> + enforceSameContentType(new IndexRequest().source(singletonMap("field", "value"), requestContentType), xContentType)); assertEquals("Mismatching content-type found for request with content-type [" + requestContentType + "], " + "previous requests have content-type [" + xContentType + "]", exception.getMessage()); }