Skip to content

Commit 01c46c2

Browse files
committed
Add BulkProcessor methods with XContentType parameter (#23078)
This commit adds methods to the BulkProcessor that accept bytes and a XContentType to avoid content type detection. The methods that do not accept XContentType with bytes have been deprecated by this commit. Relates #22691
1 parent 45c4c02 commit 01c46c2

File tree

2 files changed

+49
-7
lines changed

2 files changed

+49
-7
lines changed

core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.unit.TimeValue;
3131
import org.elasticsearch.common.util.concurrent.EsExecutors;
3232
import org.elasticsearch.common.util.concurrent.FutureUtils;
33+
import org.elasticsearch.common.xcontent.XContentType;
3334

3435
import java.io.Closeable;
3536
import java.util.Objects;
@@ -288,16 +289,46 @@ private synchronized void internalAdd(DocWriteRequest request, @Nullable Object
288289
executeIfNeeded();
289290
}
290291

292+
/**
293+
* Adds the data from the bytes to be processed by the bulk processor
294+
* @deprecated use {@link #add(BytesReference, String, String, XContentType)} instead to avoid content type auto-detection
295+
*/
296+
@Deprecated
291297
public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception {
292298
return add(data, defaultIndex, defaultType, null, null);
293299
}
294300

295-
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultPipeline, @Nullable Object payload) throws Exception {
301+
/**
302+
* Adds the data from the bytes to be processed by the bulk processor
303+
*/
304+
public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
305+
XContentType xContentType) throws Exception {
306+
return add(data, defaultIndex, defaultType, null, null, xContentType);
307+
}
308+
309+
/**
310+
* Adds the data from the bytes to be processed by the bulk processor
311+
* @deprecated use {@link #add(BytesReference, String, String, String, Object, XContentType)} instead to avoid content type
312+
* auto-detection
313+
*/
314+
@Deprecated
315+
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
316+
@Nullable String defaultPipeline, @Nullable Object payload) throws Exception {
296317
bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true);
297318
executeIfNeeded();
298319
return this;
299320
}
300321

322+
/**
323+
* Adds the data from the bytes to be processed by the bulk processor
324+
*/
325+
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
326+
@Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {
327+
bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true, xContentType);
328+
executeIfNeeded();
329+
return this;
330+
}
331+
301332
private void executeIfNeeded() {
302333
ensureOpen();
303334
if (!isOverTheLimit()) {

core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,13 @@
2727
import org.elasticsearch.client.Client;
2828
import org.elasticsearch.client.Requests;
2929
import org.elasticsearch.cluster.metadata.IndexMetaData;
30+
import org.elasticsearch.common.bytes.BytesArray;
3031
import org.elasticsearch.common.settings.Settings;
3132
import org.elasticsearch.common.unit.ByteSizeUnit;
3233
import org.elasticsearch.common.unit.ByteSizeValue;
3334
import org.elasticsearch.common.unit.TimeValue;
35+
import org.elasticsearch.common.xcontent.XContentType;
36+
import org.elasticsearch.common.xcontent.json.JsonXContent;
3437
import org.elasticsearch.env.Environment;
3538
import org.elasticsearch.test.ESIntegTestCase;
3639
import org.elasticsearch.transport.MockTransportClient;
@@ -54,7 +57,7 @@
5457
import static org.hamcrest.Matchers.lessThanOrEqualTo;
5558

5659
public class BulkProcessorIT extends ESIntegTestCase {
57-
public void testThatBulkProcessorCountIsCorrect() throws InterruptedException {
60+
public void testThatBulkProcessorCountIsCorrect() throws Exception {
5861
final CountDownLatch latch = new CountDownLatch(1);
5962
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
6063

@@ -77,7 +80,7 @@ public void testThatBulkProcessorCountIsCorrect() throws InterruptedException {
7780
}
7881
}
7982

80-
public void testBulkProcessorFlush() throws InterruptedException {
83+
public void testBulkProcessorFlush() throws Exception {
8184
final CountDownLatch latch = new CountDownLatch(1);
8285
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
8386

@@ -296,11 +299,18 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
296299
assertMultiGetResponse(multiGetRequestBuilder.get(), testDocs);
297300
}
298301

299-
private static MultiGetRequestBuilder indexDocs(Client client, BulkProcessor processor, int numDocs) {
302+
private static MultiGetRequestBuilder indexDocs(Client client, BulkProcessor processor, int numDocs) throws Exception {
300303
MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet();
301304
for (int i = 1; i <= numDocs; i++) {
302-
processor.add(new IndexRequest("test", "test", Integer.toString(i))
303-
.source(Requests.INDEX_CONTENT_TYPE, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
305+
if (randomBoolean()) {
306+
processor.add(new IndexRequest("test", "test", Integer.toString(i))
307+
.source(Requests.INDEX_CONTENT_TYPE, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
308+
} else {
309+
final String source = "{ \"index\":{\"_index\":\"test\",\"_type\":\"test\",\"_id\":\"" + Integer.toString(i) + "\"} }\n"
310+
+ JsonXContent.contentBuilder()
311+
.startObject().field("field", randomRealisticUnicodeOfLengthBetween(1, 30)).endObject().string() + "\n";
312+
processor.add(new BytesArray(source), null, null, XContentType.JSON);
313+
}
304314
multiGetRequestBuilder.add("test", "test", Integer.toString(i));
305315
}
306316
return multiGetRequestBuilder;
@@ -313,7 +323,8 @@ private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses
313323
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
314324
assertThat(bulkItemResponse.getType(), equalTo("test"));
315325
assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++)));
316-
assertThat(bulkItemResponse.isFailed(), equalTo(false));
326+
assertThat("item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(),
327+
bulkItemResponse.isFailed(), equalTo(false));
317328
}
318329
}
319330

0 commit comments

Comments
 (0)