Skip to content

Commit 7e3c06c

Browse files
authored
Add BulkRequest support to High Level Rest client (#23312)
This commit adds support for BulkRequest execution in the High Level Rest client.
1 parent a4afc22 commit 7e3c06c

File tree

4 files changed

+480
-3
lines changed

4 files changed

+480
-3
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/Request.java

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,26 @@
2828
import org.apache.http.entity.ContentType;
2929
import org.apache.lucene.util.BytesRef;
3030
import org.elasticsearch.action.DocWriteRequest;
31+
import org.elasticsearch.action.bulk.BulkRequest;
3132
import org.elasticsearch.action.get.GetRequest;
3233
import org.elasticsearch.action.index.IndexRequest;
3334
import org.elasticsearch.action.support.ActiveShardCount;
3435
import org.elasticsearch.action.support.WriteRequest;
3536
import org.elasticsearch.action.update.UpdateRequest;
37+
import org.elasticsearch.common.Nullable;
3638
import org.elasticsearch.common.Strings;
39+
import org.elasticsearch.common.bytes.BytesReference;
3740
import org.elasticsearch.common.lucene.uid.Versions;
3841
import org.elasticsearch.common.unit.TimeValue;
42+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
43+
import org.elasticsearch.common.xcontent.XContentBuilder;
3944
import org.elasticsearch.common.xcontent.XContentHelper;
45+
import org.elasticsearch.common.xcontent.XContentParser;
4046
import org.elasticsearch.common.xcontent.XContentType;
4147
import org.elasticsearch.index.VersionType;
4248
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
4349

50+
import java.io.ByteArrayOutputStream;
4451
import java.io.IOException;
4552
import java.util.Collections;
4653
import java.util.HashMap;
@@ -77,6 +84,127 @@ static Request ping() {
7784
return new Request("HEAD", "/", Collections.emptyMap(), null);
7885
}
7986

87+
static Request bulk(BulkRequest bulkRequest) throws IOException {
88+
Params parameters = Params.builder();
89+
parameters.withTimeout(bulkRequest.timeout());
90+
parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy());
91+
92+
// Bulk API only supports newline delimited JSON or Smile. Before executing
93+
// the bulk, we need to check that all requests have the same content-type
94+
// and this content-type is supported by the Bulk API.
95+
XContentType bulkContentType = null;
96+
for (int i = 0; i < bulkRequest.numberOfActions(); i++) {
97+
DocWriteRequest<?> request = bulkRequest.requests().get(i);
98+
99+
DocWriteRequest.OpType opType = request.opType();
100+
if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
101+
bulkContentType = enforceSameContentType((IndexRequest) request, bulkContentType);
102+
103+
} else if (opType == DocWriteRequest.OpType.UPDATE) {
104+
UpdateRequest updateRequest = (UpdateRequest) request;
105+
if (updateRequest.doc() != null) {
106+
bulkContentType = enforceSameContentType(updateRequest.doc(), bulkContentType);
107+
}
108+
if (updateRequest.upsertRequest() != null) {
109+
bulkContentType = enforceSameContentType(updateRequest.upsertRequest(), bulkContentType);
110+
}
111+
}
112+
}
113+
114+
if (bulkContentType == null) {
115+
bulkContentType = XContentType.JSON;
116+
}
117+
118+
byte separator = bulkContentType.xContent().streamSeparator();
119+
ContentType requestContentType = ContentType.create(bulkContentType.mediaType());
120+
121+
ByteArrayOutputStream content = new ByteArrayOutputStream();
122+
for (DocWriteRequest<?> request : bulkRequest.requests()) {
123+
DocWriteRequest.OpType opType = request.opType();
124+
125+
try (XContentBuilder metadata = XContentBuilder.builder(bulkContentType.xContent())) {
126+
metadata.startObject();
127+
{
128+
metadata.startObject(opType.getLowercase());
129+
if (Strings.hasLength(request.index())) {
130+
metadata.field("_index", request.index());
131+
}
132+
if (Strings.hasLength(request.type())) {
133+
metadata.field("_type", request.type());
134+
}
135+
if (Strings.hasLength(request.id())) {
136+
metadata.field("_id", request.id());
137+
}
138+
if (Strings.hasLength(request.routing())) {
139+
metadata.field("_routing", request.routing());
140+
}
141+
if (Strings.hasLength(request.parent())) {
142+
metadata.field("_parent", request.parent());
143+
}
144+
if (request.version() != Versions.MATCH_ANY) {
145+
metadata.field("_version", request.version());
146+
}
147+
148+
VersionType versionType = request.versionType();
149+
if (versionType != VersionType.INTERNAL) {
150+
if (versionType == VersionType.EXTERNAL) {
151+
metadata.field("_version_type", "external");
152+
} else if (versionType == VersionType.EXTERNAL_GTE) {
153+
metadata.field("_version_type", "external_gte");
154+
} else if (versionType == VersionType.FORCE) {
155+
metadata.field("_version_type", "force");
156+
}
157+
}
158+
159+
if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
160+
IndexRequest indexRequest = (IndexRequest) request;
161+
if (Strings.hasLength(indexRequest.getPipeline())) {
162+
metadata.field("pipeline", indexRequest.getPipeline());
163+
}
164+
} else if (opType == DocWriteRequest.OpType.UPDATE) {
165+
UpdateRequest updateRequest = (UpdateRequest) request;
166+
if (updateRequest.retryOnConflict() > 0) {
167+
metadata.field("_retry_on_conflict", updateRequest.retryOnConflict());
168+
}
169+
if (updateRequest.fetchSource() != null) {
170+
metadata.field("_source", updateRequest.fetchSource());
171+
}
172+
}
173+
metadata.endObject();
174+
}
175+
metadata.endObject();
176+
177+
BytesRef metadataSource = metadata.bytes().toBytesRef();
178+
content.write(metadataSource.bytes, metadataSource.offset, metadataSource.length);
179+
content.write(separator);
180+
}
181+
182+
BytesRef source = null;
183+
if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
184+
IndexRequest indexRequest = (IndexRequest) request;
185+
BytesReference indexSource = indexRequest.source();
186+
XContentType indexXContentType = indexRequest.getContentType();
187+
188+
try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, indexSource, indexXContentType)) {
189+
try (XContentBuilder builder = XContentBuilder.builder(bulkContentType.xContent())) {
190+
builder.copyCurrentStructure(parser);
191+
source = builder.bytes().toBytesRef();
192+
}
193+
}
194+
} else if (opType == DocWriteRequest.OpType.UPDATE) {
195+
source = XContentHelper.toXContent((UpdateRequest) request, bulkContentType, false).toBytesRef();
196+
}
197+
198+
if (source != null) {
199+
content.write(source.bytes, source.offset, source.length);
200+
content.write(separator);
201+
}
202+
}
203+
204+
HttpEntity entity = new ByteArrayEntity(content.toByteArray(), 0, content.size(), requestContentType);
205+
return new Request(HttpPost.METHOD_NAME, "/_bulk", parameters.getParams(), entity);
206+
}
207+
80208
static Request exists(GetRequest getRequest) {
81209
Request request = get(getRequest);
82210
return new Request(HttpHead.METHOD_NAME, request.endpoint, request.params, null);
@@ -312,4 +440,26 @@ static Params builder() {
312440
return new Params();
313441
}
314442
}
443+
444+
/**
445+
* Ensure that the {@link IndexRequest}'s content type is supported by the Bulk API and that it conforms
446+
* to the current {@link BulkRequest}'s content type (if it's known at the time of this method get called).
447+
*
448+
* @return the {@link IndexRequest}'s content type
449+
*/
450+
static XContentType enforceSameContentType(IndexRequest indexRequest, @Nullable XContentType xContentType) {
451+
XContentType requestContentType = indexRequest.getContentType();
452+
if (requestContentType != XContentType.JSON && requestContentType != XContentType.SMILE) {
453+
throw new IllegalArgumentException("Unsupported content-type found for request with content-type [" + requestContentType
454+
+ "], only JSON and SMILE are supported");
455+
}
456+
if (xContentType == null) {
457+
return requestContentType;
458+
}
459+
if (requestContentType != xContentType) {
460+
throw new IllegalArgumentException("Mismatching content-type found for request with content-type [" + requestContentType
461+
+ "], previous requests have content-type [" + xContentType + "]");
462+
}
463+
return xContentType;
464+
}
315465
}

client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.elasticsearch.action.ActionListener;
2727
import org.elasticsearch.action.ActionRequest;
2828
import org.elasticsearch.action.ActionRequestValidationException;
29+
import org.elasticsearch.action.bulk.BulkRequest;
30+
import org.elasticsearch.action.bulk.BulkResponse;
2931
import org.elasticsearch.action.get.GetRequest;
3032
import org.elasticsearch.action.get.GetResponse;
3133
import org.elasticsearch.action.index.IndexRequest;
@@ -59,6 +61,24 @@ public RestHighLevelClient(RestClient client) {
5961
this.client = Objects.requireNonNull(client);
6062
}
6163

64+
/**
65+
* Executes a bulk request using the Bulk API
66+
*
67+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk API on elastic.co</a>
68+
*/
69+
public BulkResponse bulk(BulkRequest bulkRequest, Header... headers) throws IOException {
70+
return performRequestAndParseEntity(bulkRequest, Request::bulk, BulkResponse::fromXContent, emptySet(), headers);
71+
}
72+
73+
/**
74+
* Asynchronously executes a bulk request using the Bulk API
75+
*
76+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk API on elastic.co</a>
77+
*/
78+
public void bulkAsync(BulkRequest bulkRequest, ActionListener<BulkResponse> listener, Header... headers) {
79+
performRequestAsyncAndParseEntity(bulkRequest, Request::bulk, BulkResponse::fromXContent, listener, emptySet(), headers);
80+
}
81+
6282
/**
6383
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
6484
*/

client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,18 @@
2525
import org.elasticsearch.ElasticsearchStatusException;
2626
import org.elasticsearch.action.DocWriteRequest;
2727
import org.elasticsearch.action.DocWriteResponse;
28+
import org.elasticsearch.action.bulk.BulkItemResponse;
29+
import org.elasticsearch.action.bulk.BulkRequest;
30+
import org.elasticsearch.action.bulk.BulkResponse;
31+
import org.elasticsearch.action.delete.DeleteRequest;
2832
import org.elasticsearch.action.get.GetRequest;
2933
import org.elasticsearch.action.get.GetResponse;
3034
import org.elasticsearch.action.index.IndexRequest;
3135
import org.elasticsearch.action.index.IndexResponse;
3236
import org.elasticsearch.action.update.UpdateRequest;
3337
import org.elasticsearch.action.update.UpdateResponse;
3438
import org.elasticsearch.common.Strings;
39+
import org.elasticsearch.common.bytes.BytesReference;
3540
import org.elasticsearch.common.xcontent.XContentBuilder;
3641
import org.elasticsearch.common.xcontent.XContentType;
3742
import org.elasticsearch.index.VersionType;
@@ -440,4 +445,80 @@ public void testUpdate() throws IOException {
440445
exception.getMessage());
441446
}
442447
}
448+
449+
public void testBulk() throws IOException {
450+
int nbItems = randomIntBetween(10, 100);
451+
boolean[] errors = new boolean[nbItems];
452+
453+
XContentType xContentType = randomFrom(XContentType.JSON, XContentType.SMILE);
454+
455+
BulkRequest bulkRequest = new BulkRequest();
456+
for (int i = 0; i < nbItems; i++) {
457+
String id = String.valueOf(i);
458+
boolean erroneous = randomBoolean();
459+
errors[i] = erroneous;
460+
461+
DocWriteRequest.OpType opType = randomFrom(DocWriteRequest.OpType.values());
462+
if (opType == DocWriteRequest.OpType.DELETE) {
463+
if (erroneous == false) {
464+
assertEquals(RestStatus.CREATED,
465+
highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status());
466+
}
467+
DeleteRequest deleteRequest = new DeleteRequest("index", "test", id);
468+
bulkRequest.add(deleteRequest);
469+
470+
} else {
471+
BytesReference source = XContentBuilder.builder(xContentType.xContent()).startObject().field("id", i).endObject().bytes();
472+
if (opType == DocWriteRequest.OpType.INDEX) {
473+
IndexRequest indexRequest = new IndexRequest("index", "test", id).source(source, xContentType);
474+
if (erroneous) {
475+
indexRequest.version(12L);
476+
}
477+
bulkRequest.add(indexRequest);
478+
479+
} else if (opType == DocWriteRequest.OpType.CREATE) {
480+
IndexRequest createRequest = new IndexRequest("index", "test", id).source(source, xContentType).create(true);
481+
if (erroneous) {
482+
assertEquals(RestStatus.CREATED, highLevelClient().index(createRequest).status());
483+
}
484+
bulkRequest.add(createRequest);
485+
486+
} else if (opType == DocWriteRequest.OpType.UPDATE) {
487+
UpdateRequest updateRequest = new UpdateRequest("index", "test", id)
488+
.doc(new IndexRequest().source(source, xContentType));
489+
if (erroneous == false) {
490+
assertEquals(RestStatus.CREATED,
491+
highLevelClient().index(new IndexRequest("index", "test", id).source("field", -1)).status());
492+
}
493+
bulkRequest.add(updateRequest);
494+
}
495+
}
496+
}
497+
498+
BulkResponse bulkResponse = execute(bulkRequest, highLevelClient()::bulk, highLevelClient()::bulkAsync);
499+
assertEquals(RestStatus.OK, bulkResponse.status());
500+
assertTrue(bulkResponse.getTookInMillis() > 0);
501+
assertEquals(nbItems, bulkResponse.getItems().length);
502+
503+
for (int i = 0; i < nbItems; i++) {
504+
BulkItemResponse bulkItemResponse = bulkResponse.getItems()[i];
505+
506+
assertEquals(i, bulkItemResponse.getItemId());
507+
assertEquals("index", bulkItemResponse.getIndex());
508+
assertEquals("test", bulkItemResponse.getType());
509+
assertEquals(String.valueOf(i), bulkItemResponse.getId());
510+
511+
DocWriteRequest.OpType requestOpType = bulkRequest.requests().get(i).opType();
512+
if (requestOpType == DocWriteRequest.OpType.INDEX || requestOpType == DocWriteRequest.OpType.CREATE) {
513+
assertEquals(errors[i], bulkItemResponse.isFailed());
514+
assertEquals(errors[i] ? RestStatus.INTERNAL_SERVER_ERROR : RestStatus.CREATED, bulkItemResponse.status());
515+
} else if (requestOpType == DocWriteRequest.OpType.UPDATE) {
516+
assertEquals(errors[i], bulkItemResponse.isFailed());
517+
assertEquals(errors[i] ? RestStatus.INTERNAL_SERVER_ERROR : RestStatus.OK, bulkItemResponse.status());
518+
} else if (requestOpType == DocWriteRequest.OpType.DELETE) {
519+
assertFalse(bulkItemResponse.isFailed());
520+
assertEquals(errors[i] ? RestStatus.NOT_FOUND : RestStatus.OK, bulkItemResponse.status());
521+
}
522+
}
523+
}
443524
}

0 commit comments

Comments
 (0)