Skip to content

Commit 39ed76c

Browse files
authored
Add parsing method to bulk response (#23234)
This commit adds the `fromXContent()` parsing method to BulkResponse.
1 parent c88eb00 commit 39ed76c

File tree

3 files changed

+185
-34
lines changed

3 files changed

+185
-34
lines changed

core/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,32 @@
2323
import org.elasticsearch.common.io.stream.StreamInput;
2424
import org.elasticsearch.common.io.stream.StreamOutput;
2525
import org.elasticsearch.common.unit.TimeValue;
26+
import org.elasticsearch.common.xcontent.StatusToXContentObject;
27+
import org.elasticsearch.common.xcontent.XContentBuilder;
28+
import org.elasticsearch.common.xcontent.XContentParser;
29+
import org.elasticsearch.rest.RestStatus;
2630

2731
import java.io.IOException;
32+
import java.util.ArrayList;
2833
import java.util.Arrays;
2934
import java.util.Iterator;
35+
import java.util.List;
36+
37+
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
38+
import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownField;
39+
import static org.elasticsearch.common.xcontent.XContentParserUtils.throwUnknownToken;
3040

3141
/**
3242
* A response of a bulk execution. Holding a response for each item responding (in order) of the
3343
* bulk requests. Each item holds the index/type/id is operated on, and if it failed or not (with the
3444
* failure message).
3545
*/
36-
public class BulkResponse extends ActionResponse implements Iterable<BulkItemResponse> {
46+
public class BulkResponse extends ActionResponse implements Iterable<BulkItemResponse>, StatusToXContentObject {
47+
48+
private static final String ITEMS = "items";
49+
private static final String ERRORS = "errors";
50+
private static final String TOOK = "took";
51+
private static final String INGEST_TOOK = "ingest_took";
3752

3853
public static final long NO_INGEST_TOOK = -1L;
3954

@@ -141,4 +156,61 @@ public void writeTo(StreamOutput out) throws IOException {
141156
out.writeVLong(tookInMillis);
142157
out.writeZLong(ingestTookInMillis);
143158
}
159+
160+
@Override
161+
public RestStatus status() {
162+
return RestStatus.OK;
163+
}
164+
165+
@Override
166+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
167+
builder.startObject();
168+
builder.field(TOOK, tookInMillis);
169+
if (ingestTookInMillis != BulkResponse.NO_INGEST_TOOK) {
170+
builder.field(INGEST_TOOK, ingestTookInMillis);
171+
}
172+
builder.field(ERRORS, hasFailures());
173+
builder.startArray(ITEMS);
174+
for (BulkItemResponse item : this) {
175+
item.toXContent(builder, params);
176+
}
177+
builder.endArray();
178+
builder.endObject();
179+
return builder;
180+
}
181+
182+
public static BulkResponse fromXContent(XContentParser parser) throws IOException {
183+
XContentParser.Token token = parser.nextToken();
184+
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser::getTokenLocation);
185+
186+
long took = -1L;
187+
long ingestTook = NO_INGEST_TOOK;
188+
List<BulkItemResponse> items = new ArrayList<>();
189+
190+
String currentFieldName = parser.currentName();
191+
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
192+
if (token == XContentParser.Token.FIELD_NAME) {
193+
currentFieldName = parser.currentName();
194+
} else if (token.isValue()) {
195+
if (TOOK.equals(currentFieldName)) {
196+
took = parser.longValue();
197+
} else if (INGEST_TOOK.equals(currentFieldName)) {
198+
ingestTook = parser.longValue();
199+
} else if (ERRORS.equals(currentFieldName) == false) {
200+
throwUnknownField(currentFieldName, parser.getTokenLocation());
201+
}
202+
} else if (token == XContentParser.Token.START_ARRAY) {
203+
if (ITEMS.equals(currentFieldName)) {
204+
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
205+
items.add(BulkItemResponse.fromXContent(parser, items.size()));
206+
}
207+
} else {
208+
throwUnknownField(currentFieldName, parser.getTokenLocation());
209+
}
210+
} else {
211+
throwUnknownToken(token, parser.getTokenLocation());
212+
}
213+
}
214+
return new BulkResponse(items.toArray(new BulkItemResponse[items.size()]), took, ingestTook);
215+
}
144216
}

core/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919

2020
package org.elasticsearch.rest.action.document;
2121

22-
import org.elasticsearch.action.bulk.BulkItemResponse;
2322
import org.elasticsearch.action.bulk.BulkRequest;
24-
import org.elasticsearch.action.bulk.BulkResponse;
2523
import org.elasticsearch.action.bulk.BulkShardRequest;
2624
import org.elasticsearch.action.support.ActiveShardCount;
2725
import org.elasticsearch.client.Requests;
@@ -30,20 +28,16 @@
3028
import org.elasticsearch.common.logging.DeprecationLogger;
3129
import org.elasticsearch.common.logging.Loggers;
3230
import org.elasticsearch.common.settings.Settings;
33-
import org.elasticsearch.common.xcontent.XContentBuilder;
3431
import org.elasticsearch.rest.BaseRestHandler;
35-
import org.elasticsearch.rest.BytesRestResponse;
3632
import org.elasticsearch.rest.RestController;
3733
import org.elasticsearch.rest.RestRequest;
38-
import org.elasticsearch.rest.RestResponse;
39-
import org.elasticsearch.rest.action.RestBuilderListener;
34+
import org.elasticsearch.rest.action.RestStatusToXContentListener;
4035
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
4136

4237
import java.io.IOException;
4338

4439
import static org.elasticsearch.rest.RestRequest.Method.POST;
4540
import static org.elasticsearch.rest.RestRequest.Method.PUT;
46-
import static org.elasticsearch.rest.RestStatus.OK;
4741

4842
/**
4943
* <pre>
@@ -95,36 +89,11 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
9589
bulkRequest.add(request.content(), defaultIndex, defaultType, defaultRouting, defaultFields,
9690
defaultFetchSourceContext, defaultPipeline, null, allowExplicitIndex, request.getXContentType());
9791

98-
return channel -> client.bulk(bulkRequest, new RestBuilderListener<BulkResponse>(channel) {
99-
@Override
100-
public RestResponse buildResponse(BulkResponse response, XContentBuilder builder) throws Exception {
101-
builder.startObject();
102-
builder.field(Fields.TOOK, response.getTookInMillis());
103-
if (response.getIngestTookInMillis() != BulkResponse.NO_INGEST_TOOK) {
104-
builder.field(Fields.INGEST_TOOK, response.getIngestTookInMillis());
105-
}
106-
builder.field(Fields.ERRORS, response.hasFailures());
107-
builder.startArray(Fields.ITEMS);
108-
for (BulkItemResponse itemResponse : response) {
109-
itemResponse.toXContent(builder, request);
110-
}
111-
builder.endArray();
112-
113-
builder.endObject();
114-
return new BytesRestResponse(OK, builder);
115-
}
116-
});
92+
return channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel));
11793
}
11894

11995
@Override
12096
public boolean supportsContentStream() {
12197
return true;
12298
}
123-
124-
static final class Fields {
125-
static final String ITEMS = "items";
126-
static final String ERRORS = "errors";
127-
static final String TOOK = "took";
128-
static final String INGEST_TOOK = "ingest_took";
129-
}
13099
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.bulk;
21+
22+
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.action.DocWriteRequest;
24+
import org.elasticsearch.action.DocWriteResponse;
25+
import org.elasticsearch.action.delete.DeleteResponseTests;
26+
import org.elasticsearch.action.index.IndexResponseTests;
27+
import org.elasticsearch.action.update.UpdateResponseTests;
28+
import org.elasticsearch.common.bytes.BytesReference;
29+
import org.elasticsearch.common.collect.Tuple;
30+
import org.elasticsearch.common.xcontent.XContentParser;
31+
import org.elasticsearch.common.xcontent.XContentType;
32+
import org.elasticsearch.test.ESTestCase;
33+
34+
import java.io.IOException;
35+
36+
import static org.elasticsearch.ElasticsearchExceptionTests.randomExceptions;
37+
import static org.elasticsearch.action.bulk.BulkItemResponseTests.assertBulkItemResponse;
38+
import static org.elasticsearch.action.bulk.BulkResponse.NO_INGEST_TOOK;
39+
import static org.elasticsearch.common.xcontent.XContentHelper.toXContent;
40+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
41+
42+
public class BulkResponseTests extends ESTestCase {
43+
44+
public void testToAndFromXContent() throws IOException {
45+
XContentType xContentType = randomFrom(XContentType.values());
46+
boolean humanReadable = randomBoolean();
47+
48+
long took = randomFrom(randomNonNegativeLong(), -1L);
49+
long ingestTook = randomFrom(randomNonNegativeLong(), NO_INGEST_TOOK);
50+
int nbBulkItems = randomIntBetween(1, 10);
51+
52+
BulkItemResponse[] bulkItems = new BulkItemResponse[nbBulkItems];
53+
BulkItemResponse[] expectedBulkItems = new BulkItemResponse[nbBulkItems];
54+
55+
for (int i = 0; i < nbBulkItems; i++) {
56+
DocWriteRequest.OpType opType = randomFrom(DocWriteRequest.OpType.values());
57+
58+
if (frequently()) {
59+
Tuple<? extends DocWriteResponse, ? extends DocWriteResponse> randomDocWriteResponses = null;
60+
if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
61+
randomDocWriteResponses = IndexResponseTests.randomIndexResponse();
62+
} else if (opType == DocWriteRequest.OpType.DELETE) {
63+
randomDocWriteResponses = DeleteResponseTests.randomDeleteResponse();
64+
} else if (opType == DocWriteRequest.OpType.UPDATE) {
65+
randomDocWriteResponses = UpdateResponseTests.randomUpdateResponse(xContentType);
66+
} else {
67+
fail("Test does not support opType [" + opType + "]");
68+
}
69+
70+
bulkItems[i] = new BulkItemResponse(i, opType, randomDocWriteResponses.v1());
71+
expectedBulkItems[i] = new BulkItemResponse(i, opType, randomDocWriteResponses.v2());
72+
} else {
73+
String index = randomAsciiOfLength(5);
74+
String type = randomAsciiOfLength(5);
75+
String id = randomAsciiOfLength(5);
76+
77+
Tuple<Throwable, ElasticsearchException> failures = randomExceptions();
78+
bulkItems[i] = new BulkItemResponse(i, opType, new BulkItemResponse.Failure(index, type, id, (Exception) failures.v1()));
79+
expectedBulkItems[i] = new BulkItemResponse(i, opType, new BulkItemResponse.Failure(index, type, id, failures.v2()));
80+
}
81+
}
82+
83+
BulkResponse bulkResponse = new BulkResponse(bulkItems, took, ingestTook);
84+
BytesReference originalBytes = toXContent(bulkResponse, xContentType, humanReadable);
85+
86+
if (randomBoolean()) {
87+
try (XContentParser parser = createParser(xContentType.xContent(), originalBytes)) {
88+
originalBytes = shuffleXContent(parser, randomBoolean()).bytes();
89+
}
90+
}
91+
92+
BulkResponse parsedBulkResponse;
93+
try (XContentParser parser = createParser(xContentType.xContent(), originalBytes)) {
94+
parsedBulkResponse = BulkResponse.fromXContent(parser);
95+
assertNull(parser.nextToken());
96+
}
97+
98+
assertEquals(took, parsedBulkResponse.getTookInMillis());
99+
assertEquals(ingestTook, parsedBulkResponse.getIngestTookInMillis());
100+
assertEquals(expectedBulkItems.length, parsedBulkResponse.getItems().length);
101+
102+
for (int i = 0; i < expectedBulkItems.length; i++) {
103+
assertBulkItemResponse(expectedBulkItems[i], parsedBulkResponse.getItems()[i]);
104+
}
105+
106+
BytesReference finalBytes = toXContent(parsedBulkResponse, xContentType, humanReadable);
107+
BytesReference expectedFinalBytes = toXContent(parsedBulkResponse, xContentType, humanReadable);
108+
assertToXContentEquivalent(expectedFinalBytes, finalBytes, xContentType);
109+
}
110+
}

0 commit comments

Comments
 (0)