Skip to content

Commit 440d1ed

Browse files
authored
Fix failures in BulkProcessorIT#testGlobalParametersAndBulkProcessor. (#38129)
This PR fixes a couple test issues: * It narrows an assertWarnings call that was too broad, and wasn't always applicable with certain random sequences. * Previously, we could send a typeless bulk request containing '_type: 'null'. Now we omit the _type key altogether for typeless requests.
1 parent c9701be commit 440d1ed

File tree

1 file changed

+35
-27
lines changed

1 file changed

+35
-27
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.common.unit.ByteSizeUnit;
3535
import org.elasticsearch.common.unit.ByteSizeValue;
3636
import org.elasticsearch.common.unit.TimeValue;
37+
import org.elasticsearch.common.xcontent.XContentBuilder;
3738
import org.elasticsearch.common.xcontent.XContentType;
3839
import org.elasticsearch.index.mapper.MapperService;
3940
import org.elasticsearch.rest.action.document.RestBulkAction;
@@ -75,12 +76,12 @@ private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.List
7576
(request, bulkListener) -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT,
7677
bulkListener), listener);
7778
}
78-
79+
7980
private static BulkProcessor.Builder initBulkProcessorBuilderUsingTypes(BulkProcessor.Listener listener) {
8081
return BulkProcessor.builder(
8182
(request, bulkListener) -> highLevelClient().bulkAsync(request, expectWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE),
8283
bulkListener), listener);
83-
}
84+
}
8485

8586
public void testThatBulkProcessorCountIsCorrect() throws Exception {
8687
final CountDownLatch latch = new CountDownLatch(1);
@@ -383,22 +384,22 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
383384
.build()) {
384385
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
385386
latch.await();
386-
387+
387388
assertThat(listener.beforeCounts.get(), equalTo(1));
388389
assertThat(listener.afterCounts.get(), equalTo(1));
389390
assertThat(listener.bulkFailures.size(), equalTo(0));
390391
assertResponseItems(listener.bulkItems, numDocs, localType);
391-
392+
392393
Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));
393-
394+
394395
assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
395396
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(localType))));
396397
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
397398
}
398399
}
399400
{
400401
//Check that untyped document additions and untyped global inherit the established custom type
401-
// (the custom document type introduced to the mapping by the earlier code in this test)
402+
// (the custom document type introduced to the mapping by the earlier code in this test)
402403
String globalType = null;
403404
String localType = null;
404405
final CountDownLatch latch = new CountDownLatch(1);
@@ -414,20 +415,19 @@ public void testGlobalParametersAndBulkProcessor() throws Exception {
414415
.build()) {
415416
indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id");
416417
latch.await();
417-
418+
418419
assertThat(listener.beforeCounts.get(), equalTo(1));
419420
assertThat(listener.afterCounts.get(), equalTo(1));
420421
assertThat(listener.bulkFailures.size(), equalTo(0));
421422
assertResponseItems(listener.bulkItems, numDocs, MapperService.SINGLE_MAPPING_NAME);
422-
423+
423424
Iterable<SearchHit> hits = searchAll(new SearchRequest("test").routing("routing"));
424-
425+
425426
assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
426427
assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(customType))));
427428
assertThat(hits, containsInAnyOrder(expectedIds(numDocs)));
428429
}
429-
}
430-
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
430+
}
431431
}
432432

433433
@SuppressWarnings("unchecked")
@@ -438,8 +438,8 @@ private Matcher<SearchHit>[] expectedIds(int numDocs) {
438438
.<Matcher<SearchHit>>toArray(Matcher[]::new);
439439
}
440440

441-
private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
442-
String globalIndex, String globalType, String globalPipeline) throws Exception {
441+
private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
442+
String globalIndex, String globalType, String globalPipeline) throws Exception {
443443
MultiGetRequest multiGetRequest = new MultiGetRequest();
444444
for (int i = 1; i <= numDocs; i++) {
445445
if (randomBoolean()) {
@@ -448,33 +448,41 @@ private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, S
448448
} else {
449449
BytesArray data = bytesBulkRequest(localIndex, localType, i);
450450
processor.add(data, globalIndex, globalType, globalPipeline, null, XContentType.JSON);
451+
452+
if (localType != null) {
453+
// If the payload contains types, parsing it into a bulk request results in a warning.
454+
assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE);
455+
}
451456
}
452457
multiGetRequest.add(localIndex, Integer.toString(i));
453458
}
454459
return multiGetRequest;
455460
}
456461

457462
private static BytesArray bytesBulkRequest(String localIndex, String localType, int id) throws IOException {
458-
String action = Strings.toString(jsonBuilder()
459-
.startObject()
460-
.startObject("index")
461-
.field("_index", localIndex)
462-
.field("_type", localType)
463-
.field("_id", Integer.toString(id))
464-
.endObject()
465-
.endObject()
466-
);
467-
String source = Strings.toString(jsonBuilder()
463+
XContentBuilder action = jsonBuilder().startObject().startObject("index");
464+
465+
if (localIndex != null) {
466+
action.field("_index", localIndex);
467+
}
468+
469+
if (localType != null) {
470+
action.field("_type", localType);
471+
}
472+
473+
action.field("_id", Integer.toString(id));
474+
action.endObject().endObject();
475+
476+
XContentBuilder source = jsonBuilder()
468477
.startObject()
469478
.field("field", randomRealisticUnicodeOfLengthBetween(1, 30))
470-
.endObject()
471-
);
479+
.endObject();
472480

473-
String request = action + "\n" + source + "\n";
481+
String request = Strings.toString(action) + "\n" + Strings.toString(source) + "\n";
474482
return new BytesArray(request);
475483
}
476484

477-
private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
485+
private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
478486
return indexDocs(processor, numDocs, "test", null, null, null, null);
479487
}
480488

0 commit comments

Comments
 (0)