-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Add BulkRequest support to High Level Rest client #23312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,19 +28,26 @@ | |
| import org.apache.http.entity.ContentType; | ||
| import org.apache.lucene.util.BytesRef; | ||
| import org.elasticsearch.action.DocWriteRequest; | ||
| import org.elasticsearch.action.bulk.BulkRequest; | ||
| import org.elasticsearch.action.get.GetRequest; | ||
| import org.elasticsearch.action.index.IndexRequest; | ||
| import org.elasticsearch.action.support.ActiveShardCount; | ||
| import org.elasticsearch.action.support.WriteRequest; | ||
| import org.elasticsearch.action.update.UpdateRequest; | ||
| import org.elasticsearch.common.Nullable; | ||
| import org.elasticsearch.common.Strings; | ||
| import org.elasticsearch.common.bytes.BytesReference; | ||
| import org.elasticsearch.common.lucene.uid.Versions; | ||
| import org.elasticsearch.common.unit.TimeValue; | ||
| import org.elasticsearch.common.xcontent.NamedXContentRegistry; | ||
| import org.elasticsearch.common.xcontent.XContentBuilder; | ||
| import org.elasticsearch.common.xcontent.XContentHelper; | ||
| import org.elasticsearch.common.xcontent.XContentParser; | ||
| import org.elasticsearch.common.xcontent.XContentType; | ||
| import org.elasticsearch.index.VersionType; | ||
| import org.elasticsearch.search.fetch.subphase.FetchSourceContext; | ||
|
|
||
| import java.io.ByteArrayOutputStream; | ||
| import java.io.IOException; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
|
|
@@ -77,6 +84,127 @@ static Request ping() { | |
| return new Request("HEAD", "/", Collections.emptyMap(), null); | ||
| } | ||
|
|
||
| static Request bulk(BulkRequest bulkRequest) throws IOException { | ||
| Params parameters = Params.builder(); | ||
| parameters.withTimeout(bulkRequest.timeout()); | ||
| parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy()); | ||
|
|
||
| // Bulk API only supports newline delimited JSON or Smile. Before executing | ||
| // the bulk, we need to check that all requests have the same content-type | ||
| // and this content-type is supported by the Bulk API. | ||
| XContentType bulkContentType = null; | ||
| for (int i = 0; i < bulkRequest.numberOfActions(); i++) { | ||
| DocWriteRequest<?> request = bulkRequest.requests().get(i); | ||
|
|
||
| DocWriteRequest.OpType opType = request.opType(); | ||
| if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { | ||
| bulkContentType = enforceSameContentType((IndexRequest) request, bulkContentType); | ||
|
|
||
| } else if (opType == DocWriteRequest.OpType.UPDATE) { | ||
| UpdateRequest updateRequest = (UpdateRequest) request; | ||
| if (updateRequest.doc() != null) { | ||
| bulkContentType = enforceSameContentType(updateRequest.doc(), bulkContentType); | ||
| } | ||
| if (updateRequest.upsertRequest() != null) { | ||
| bulkContentType = enforceSameContentType(updateRequest.upsertRequest(), bulkContentType); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if (bulkContentType == null) { | ||
| bulkContentType = XContentType.JSON; | ||
| } | ||
|
|
||
| byte separator = bulkContentType.xContent().streamSeparator(); | ||
| ContentType requestContentType = ContentType.create(bulkContentType.mediaType()); | ||
|
|
||
| ByteArrayOutputStream content = new ByteArrayOutputStream(); | ||
| for (DocWriteRequest<?> request : bulkRequest.requests()) { | ||
| DocWriteRequest.OpType opType = request.opType(); | ||
|
|
||
| try (XContentBuilder metadata = XContentBuilder.builder(bulkContentType.xContent())) { | ||
| metadata.startObject(); | ||
| { | ||
| metadata.startObject(opType.getLowercase()); | ||
| if (Strings.hasLength(request.index())) { | ||
| metadata.field("_index", request.index()); | ||
| } | ||
| if (Strings.hasLength(request.type())) { | ||
| metadata.field("_type", request.type()); | ||
| } | ||
| if (Strings.hasLength(request.id())) { | ||
| metadata.field("_id", request.id()); | ||
| } | ||
| if (Strings.hasLength(request.routing())) { | ||
| metadata.field("_routing", request.routing()); | ||
| } | ||
| if (Strings.hasLength(request.parent())) { | ||
| metadata.field("_parent", request.parent()); | ||
| } | ||
| if (request.version() != Versions.MATCH_ANY) { | ||
| metadata.field("_version", request.version()); | ||
| } | ||
|
|
||
| VersionType versionType = request.versionType(); | ||
| if (versionType != VersionType.INTERNAL) { | ||
| if (versionType == VersionType.EXTERNAL) { | ||
| metadata.field("_version_type", "external"); | ||
| } else if (versionType == VersionType.EXTERNAL_GTE) { | ||
| metadata.field("_version_type", "external_gte"); | ||
| } else if (versionType == VersionType.FORCE) { | ||
| metadata.field("_version_type", "force"); | ||
| } | ||
| } | ||
|
|
||
| if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { | ||
| IndexRequest indexRequest = (IndexRequest) request; | ||
| if (Strings.hasLength(indexRequest.getPipeline())) { | ||
| metadata.field("pipeline", indexRequest.getPipeline()); | ||
| } | ||
| } else if (opType == DocWriteRequest.OpType.UPDATE) { | ||
| UpdateRequest updateRequest = (UpdateRequest) request; | ||
| if (updateRequest.retryOnConflict() > 0) { | ||
| metadata.field("_retry_on_conflict", updateRequest.retryOnConflict()); | ||
| } | ||
| if (updateRequest.fetchSource() != null) { | ||
| metadata.field("_source", updateRequest.fetchSource()); | ||
| } | ||
| } | ||
| metadata.endObject(); | ||
| } | ||
| metadata.endObject(); | ||
|
|
||
| BytesRef metadataSource = metadata.bytes().toBytesRef(); | ||
| content.write(metadataSource.bytes, metadataSource.offset, metadataSource.length); | ||
| content.write(separator); | ||
| } | ||
|
|
||
| BytesRef source = null; | ||
| if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { | ||
| IndexRequest indexRequest = (IndexRequest) request; | ||
| BytesReference indexSource = indexRequest.source(); | ||
| XContentType indexXContentType = indexRequest.getContentType(); | ||
|
|
||
| try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, indexSource, indexXContentType)) { | ||
| try (XContentBuilder builder = XContentBuilder.builder(bulkContentType.xContent())) { | ||
| builder.copyCurrentStructure(parser); | ||
| source = builder.bytes().toBytesRef(); | ||
| } | ||
| } | ||
| } else if (opType == DocWriteRequest.OpType.UPDATE) { | ||
| source = XContentHelper.toXContent((UpdateRequest) request, bulkContentType, false).toBytesRef(); | ||
| } | ||
|
|
||
| if (source != null) { | ||
| content.write(source.bytes, source.offset, source.length); | ||
| content.write(separator); | ||
| } | ||
| } | ||
|
|
||
| HttpEntity entity = new ByteArrayEntity(content.toByteArray(), 0, content.size(), requestContentType); | ||
| return new Request(HttpPost.METHOD_NAME, "/_bulk", parameters.getParams(), entity); | ||
| } | ||
|
|
||
| static Request exists(GetRequest getRequest) { | ||
| Request request = get(getRequest); | ||
| return new Request(HttpHead.METHOD_NAME, request.endpoint, request.params, null); | ||
|
|
@@ -312,4 +440,26 @@ static Params builder() { | |
| return new Params(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Ensure that the {@link IndexRequest}'s content type is supported by the Bulk API and that it conforms | ||
| * to the current {@link BulkRequest}'s content type (if it's known at the time of this method get called). | ||
|
||
| * | ||
| * @return the {@link IndexRequest}'s content type | ||
| */ | ||
| static XContentType enforceSameContentType(IndexRequest indexRequest, @Nullable XContentType xContentType) { | ||
| XContentType requestContentType = indexRequest.getContentType(); | ||
| if (requestContentType != XContentType.JSON && requestContentType != XContentType.SMILE) { | ||
| throw new IllegalArgumentException("Unsupported content-type found for request with content-type [" + requestContentType | ||
| + "], only JSON and SMILE are supported"); | ||
| } | ||
| if (xContentType == null) { | ||
| return requestContentType; | ||
| } | ||
| if (requestContentType != xContentType) { | ||
| throw new IllegalArgumentException("Mismatching content-type found for request with content-type [" + requestContentType | ||
| + "], previous requests have content-type [" + xContentType + "]"); | ||
| } | ||
| return xContentType; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,13 +25,18 @@ | |
| import org.elasticsearch.ElasticsearchStatusException; | ||
| import org.elasticsearch.action.DocWriteRequest; | ||
| import org.elasticsearch.action.DocWriteResponse; | ||
| import org.elasticsearch.action.bulk.BulkItemResponse; | ||
| import org.elasticsearch.action.bulk.BulkRequest; | ||
| import org.elasticsearch.action.bulk.BulkResponse; | ||
| import org.elasticsearch.action.delete.DeleteRequest; | ||
| import org.elasticsearch.action.get.GetRequest; | ||
| import org.elasticsearch.action.get.GetResponse; | ||
| import org.elasticsearch.action.index.IndexRequest; | ||
| import org.elasticsearch.action.index.IndexResponse; | ||
| import org.elasticsearch.action.update.UpdateRequest; | ||
| import org.elasticsearch.action.update.UpdateResponse; | ||
| import org.elasticsearch.common.Strings; | ||
| import org.elasticsearch.common.bytes.BytesReference; | ||
| import org.elasticsearch.common.xcontent.XContentBuilder; | ||
| import org.elasticsearch.common.xcontent.XContentType; | ||
| import org.elasticsearch.index.VersionType; | ||
|
|
@@ -440,4 +445,80 @@ public void testUpdate() throws IOException { | |
| exception.getMessage()); | ||
| } | ||
| } | ||
|
|
||
| public void testBulk() throws IOException { | ||
| int nbItems = randomIntBetween(10, 100); | ||
| boolean[] errors = new boolean[nbItems]; | ||
|
|
||
| XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE); | ||
|
|
||
| BulkRequest bulkRequest = new BulkRequest(); | ||
| for (int i = 0; i < nbItems; i++) { | ||
| String id = String.valueOf(i); | ||
| boolean erroneous = randomBoolean(); | ||
| errors[i] = erroneous; | ||
|
|
||
| DocWriteRequest.OpType opType = randomFrom(DocWriteRequest.OpType.values()); | ||
| if (opType == DocWriteRequest.OpType.DELETE) { | ||
| if (erroneous == false) { | ||
| assertEquals(RestStatus.CREATED, | ||
| highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status()); | ||
| } | ||
| DeleteRequest deleteRequest = new DeleteRequest("index", "test", id); | ||
| bulkRequest.add(deleteRequest); | ||
|
|
||
| } else { | ||
| BytesReference source = XContentBuilder.builder(xContentType.xContent()).startObject().field("id", i).endObject().bytes(); | ||
| if (opType == DocWriteRequest.OpType.INDEX) { | ||
| IndexRequest indexRequest = new IndexRequest("index", "test", id).source(source, xContentType); | ||
| if (erroneous) { | ||
| indexRequest.version(12L); | ||
| } | ||
| bulkRequest.add(indexRequest); | ||
|
|
||
| } else if (opType == DocWriteRequest.OpType.CREATE) { | ||
| IndexRequest createRequest = new IndexRequest("index", "test", id).source(source, xContentType).create(true); | ||
| if (erroneous) { | ||
| assertEquals(RestStatus.CREATED, highLevelClient().index(createRequest).status()); | ||
| } | ||
| bulkRequest.add(createRequest); | ||
|
|
||
| } else if (opType == DocWriteRequest.OpType.UPDATE) { | ||
| UpdateRequest updateRequest = new UpdateRequest("index", "test", id) | ||
| .doc(new IndexRequest().source(source, xContentType)); | ||
| if (erroneous == false) { | ||
| assertEquals(RestStatus.CREATED, | ||
| highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status()); | ||
| } | ||
| bulkRequest.add(updateRequest); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| BulkResponse bulkResponse = execute(bulkRequest, highLevelClient()::bulk, highLevelClient()::bulkAsync); | ||
| assertEquals(RestStatus.OK, bulkResponse.status()); | ||
| assertTrue(bulkResponse.getTookInMillis() > 0); | ||
| assertEquals(nbItems, bulkResponse.getItems().length); | ||
|
|
||
| for (int i = 0; i < nbItems; i++) { | ||
| BulkItemResponse bulkItemResponse = bulkResponse.getItems()[i]; | ||
|
|
||
| assertEquals(i, bulkItemResponse.getItemId()); | ||
| assertEquals("index", bulkItemResponse.getIndex()); | ||
| assertEquals("test", bulkItemResponse.getType()); | ||
| assertEquals(String.valueOf(i), bulkItemResponse.getId()); | ||
|
|
||
| DocWriteRequest.OpType requestOpType = bulkRequest.requests().get(i).opType(); | ||
| if (requestOpType == DocWriteRequest.OpType.INDEX || requestOpType == DocWriteRequest.OpType.CREATE) { | ||
| assertEquals(errors[i], bulkItemResponse.isFailed()); | ||
| assertEquals(errors[i] ? RestStatus.INTERNAL_SERVER_ERROR : RestStatus.CREATED, bulkItemResponse.status()); | ||
| } else if (requestOpType == DocWriteRequest.OpType.UPDATE) { | ||
| assertEquals(errors[i], bulkItemResponse.isFailed()); | ||
| assertEquals(errors[i] ? RestStatus.INTERNAL_SERVER_ERROR : RestStatus.OK, bulkItemResponse.status()); | ||
|
||
| } else if (requestOpType == DocWriteRequest.OpType.DELETE) { | ||
| assertFalse(bulkItemResponse.isFailed()); | ||
| assertEquals(errors[i] ? RestStatus.NOT_FOUND : RestStatus.OK, bulkItemResponse.status()); | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
interesting idea, to convert all the index requests to the target format, that way we can choose to always use either json or smile, without validating anything. Maybe that's better than what I suggested above. Although maybe tricky cause if a user sends cbor or smile, maybe they really expect cbor or smile to be stored in lucene rather than the json obtained from the conversion, but that is not possible when going through REST. I am on the fence. Maybe we should rather do some validation before we run the request, make sure that all of the requests have the same content-type and that can only be either json or smile. That way if something is not expected users are notified with an error. Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I always forgot about Smile... I think it makes sense to check that all requests have the same content type and that it's either JSON or Smile. I updated the code and added tests for that, the first content type found dictates what's expected for the other requests.