Skip to content

Commit 77685b2

Browse files
committed
Bulk processor#awaitClose to close scheduler (#29263)
When the `BulkProcessor` is used with the high-level REST client, a scheduler is created internally to allow tasks to be scheduled. This scheduler is not exposed to users and needs to be closed once the `BulkProcessor` is closed. There are two ways to close the `BulkProcessor`, though: one is the ordinary `close` method and the other is `awaitClose`. The former closes the scheduler while the latter doesn't, leaving threads lingering.
1 parent 046f84b commit 77685b2

File tree

3 files changed

+382
-2
lines changed

3 files changed

+382
-2
lines changed
Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.client;
21+
22+
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
23+
import org.apache.http.entity.ContentType;
24+
import org.apache.http.nio.entity.NStringEntity;
25+
import org.elasticsearch.action.bulk.BulkItemResponse;
26+
import org.elasticsearch.action.bulk.BulkProcessor;
27+
import org.elasticsearch.action.bulk.BulkRequest;
28+
import org.elasticsearch.action.bulk.BulkResponse;
29+
import org.elasticsearch.action.get.MultiGetItemResponse;
30+
import org.elasticsearch.action.get.MultiGetRequest;
31+
import org.elasticsearch.action.get.MultiGetResponse;
32+
import org.elasticsearch.action.index.IndexRequest;
33+
import org.elasticsearch.common.Strings;
34+
import org.elasticsearch.common.bytes.BytesArray;
35+
import org.elasticsearch.common.unit.ByteSizeUnit;
36+
import org.elasticsearch.common.unit.ByteSizeValue;
37+
import org.elasticsearch.common.unit.TimeValue;
38+
import org.elasticsearch.common.xcontent.XContentType;
39+
import org.elasticsearch.common.xcontent.json.JsonXContent;
40+
41+
import java.util.Arrays;
42+
import java.util.Collections;
43+
import java.util.HashSet;
44+
import java.util.List;
45+
import java.util.Set;
46+
import java.util.concurrent.CopyOnWriteArrayList;
47+
import java.util.concurrent.CountDownLatch;
48+
import java.util.concurrent.TimeUnit;
49+
import java.util.concurrent.atomic.AtomicInteger;
50+
51+
import static org.hamcrest.Matchers.both;
52+
import static org.hamcrest.Matchers.either;
53+
import static org.hamcrest.Matchers.equalTo;
54+
import static org.hamcrest.Matchers.greaterThan;
55+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
56+
import static org.hamcrest.Matchers.is;
57+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
58+
59+
public class BulkProcessorIT extends ESRestHighLevelClientTestCase {
60+
61+
private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.Listener listener) {
62+
return BulkProcessor.builder(highLevelClient()::bulkAsync, listener);
63+
}
64+
65+
public void testThatBulkProcessorCountIsCorrect() throws Exception {
66+
final CountDownLatch latch = new CountDownLatch(1);
67+
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
68+
69+
int numDocs = randomIntBetween(10, 100);
70+
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
71+
//let's make sure that the bulk action limit trips, one single execution will index all the documents
72+
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
73+
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
74+
.build()) {
75+
76+
MultiGetRequest multiGetRequest = indexDocs(processor, numDocs);
77+
78+
latch.await();
79+
80+
assertThat(listener.beforeCounts.get(), equalTo(1));
81+
assertThat(listener.afterCounts.get(), equalTo(1));
82+
assertThat(listener.bulkFailures.size(), equalTo(0));
83+
assertResponseItems(listener.bulkItems, numDocs);
84+
assertMultiGetResponse(highLevelClient().multiGet(multiGetRequest), numDocs);
85+
}
86+
}
87+
88+
public void testBulkProcessorFlush() throws Exception {
89+
final CountDownLatch latch = new CountDownLatch(1);
90+
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
91+
92+
int numDocs = randomIntBetween(10, 100);
93+
94+
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
95+
//let's make sure that this bulk won't be automatically flushed
96+
.setConcurrentRequests(randomIntBetween(0, 10)).setBulkActions(numDocs + randomIntBetween(1, 100))
97+
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
98+
99+
MultiGetRequest multiGetRequest = indexDocs(processor, numDocs);
100+
101+
assertThat(latch.await(randomInt(500), TimeUnit.MILLISECONDS), equalTo(false));
102+
//we really need an explicit flush as none of the bulk thresholds was reached
103+
processor.flush();
104+
latch.await();
105+
106+
assertThat(listener.beforeCounts.get(), equalTo(1));
107+
assertThat(listener.afterCounts.get(), equalTo(1));
108+
assertThat(listener.bulkFailures.size(), equalTo(0));
109+
assertResponseItems(listener.bulkItems, numDocs);
110+
assertMultiGetResponse(highLevelClient().multiGet(multiGetRequest), numDocs);
111+
}
112+
}
113+
114+
public void testBulkProcessorConcurrentRequests() throws Exception {
115+
int bulkActions = randomIntBetween(10, 100);
116+
int numDocs = randomIntBetween(bulkActions, bulkActions + 100);
117+
int concurrentRequests = randomIntBetween(0, 7);
118+
119+
int expectedBulkActions = numDocs / bulkActions;
120+
121+
final CountDownLatch latch = new CountDownLatch(expectedBulkActions);
122+
int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1;
123+
final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions);
124+
125+
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch);
126+
127+
MultiGetRequest multiGetRequest;
128+
129+
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
130+
.setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
131+
//set interval and size to high values
132+
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
133+
134+
multiGetRequest = indexDocs(processor, numDocs);
135+
136+
latch.await();
137+
138+
assertThat(listener.beforeCounts.get(), equalTo(expectedBulkActions));
139+
assertThat(listener.afterCounts.get(), equalTo(expectedBulkActions));
140+
assertThat(listener.bulkFailures.size(), equalTo(0));
141+
assertThat(listener.bulkItems.size(), equalTo(numDocs - numDocs % bulkActions));
142+
}
143+
144+
closeLatch.await();
145+
146+
assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions));
147+
assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions));
148+
assertThat(listener.bulkFailures.size(), equalTo(0));
149+
assertThat(listener.bulkItems.size(), equalTo(numDocs));
150+
151+
Set<String> ids = new HashSet<>();
152+
for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
153+
assertThat(bulkItemResponse.getFailureMessage(), bulkItemResponse.isFailed(), equalTo(false));
154+
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
155+
assertThat(bulkItemResponse.getType(), equalTo("test"));
156+
//with concurrent requests > 1 we can't rely on the order of the bulk requests
157+
assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(numDocs)));
158+
//we do want to check that we don't get duplicate ids back
159+
assertThat(ids.add(bulkItemResponse.getId()), equalTo(true));
160+
}
161+
162+
assertMultiGetResponse(highLevelClient().multiGet(multiGetRequest), numDocs);
163+
}
164+
165+
public void testBulkProcessorWaitOnClose() throws Exception {
166+
BulkProcessorTestListener listener = new BulkProcessorTestListener();
167+
168+
int numDocs = randomIntBetween(10, 100);
169+
BulkProcessor processor = initBulkProcessorBuilder(listener)
170+
//let's make sure that the bulk action limit trips, one single execution will index all the documents
171+
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
172+
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(randomIntBetween(1, 10),
173+
RandomPicks.randomFrom(random(), ByteSizeUnit.values())))
174+
.build();
175+
176+
MultiGetRequest multiGetRequest = indexDocs(processor, numDocs);
177+
assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true));
178+
if (randomBoolean()) { // check if we can call it multiple times
179+
if (randomBoolean()) {
180+
assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true));
181+
} else {
182+
processor.close();
183+
}
184+
}
185+
186+
assertThat(listener.beforeCounts.get(), greaterThanOrEqualTo(1));
187+
assertThat(listener.afterCounts.get(), greaterThanOrEqualTo(1));
188+
for (Throwable bulkFailure : listener.bulkFailures) {
189+
logger.error("bulk failure", bulkFailure);
190+
}
191+
assertThat(listener.bulkFailures.size(), equalTo(0));
192+
assertResponseItems(listener.bulkItems, numDocs);
193+
assertMultiGetResponse(highLevelClient().multiGet(multiGetRequest), numDocs);
194+
}
195+
196+
public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception {
197+
198+
String createIndexBody = "{\n" +
199+
" \"settings\" : {\n" +
200+
" \"index\" : {\n" +
201+
" \"blocks.write\" : true\n" +
202+
" }\n" +
203+
" }\n" +
204+
" \n" +
205+
"}";
206+
207+
NStringEntity entity = new NStringEntity(createIndexBody, ContentType.APPLICATION_JSON);
208+
Response response = client().performRequest("PUT", "/test-ro", Collections.emptyMap(), entity);
209+
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
210+
211+
int bulkActions = randomIntBetween(10, 100);
212+
int numDocs = randomIntBetween(bulkActions, bulkActions + 100);
213+
int concurrentRequests = randomIntBetween(0, 10);
214+
215+
int expectedBulkActions = numDocs / bulkActions;
216+
217+
final CountDownLatch latch = new CountDownLatch(expectedBulkActions);
218+
int totalExpectedBulkActions = numDocs % bulkActions == 0 ? expectedBulkActions : expectedBulkActions + 1;
219+
final CountDownLatch closeLatch = new CountDownLatch(totalExpectedBulkActions);
220+
221+
int testDocs = 0;
222+
int testReadOnlyDocs = 0;
223+
MultiGetRequest multiGetRequest = new MultiGetRequest();
224+
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch);
225+
226+
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
227+
.setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
228+
//set interval and size to high values
229+
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
230+
231+
for (int i = 1; i <= numDocs; i++) {
232+
if (randomBoolean()) {
233+
testDocs++;
234+
processor.add(new IndexRequest("test", "test", Integer.toString(testDocs))
235+
.source(XContentType.JSON, "field", "value"));
236+
multiGetRequest.add("test", "test", Integer.toString(testDocs));
237+
} else {
238+
testReadOnlyDocs++;
239+
processor.add(new IndexRequest("test-ro", "test", Integer.toString(testReadOnlyDocs))
240+
.source(XContentType.JSON, "field", "value"));
241+
}
242+
}
243+
}
244+
245+
closeLatch.await();
246+
247+
assertThat(listener.beforeCounts.get(), equalTo(totalExpectedBulkActions));
248+
assertThat(listener.afterCounts.get(), equalTo(totalExpectedBulkActions));
249+
assertThat(listener.bulkFailures.size(), equalTo(0));
250+
assertThat(listener.bulkItems.size(), equalTo(testDocs + testReadOnlyDocs));
251+
252+
Set<String> ids = new HashSet<>();
253+
Set<String> readOnlyIds = new HashSet<>();
254+
for (BulkItemResponse bulkItemResponse : listener.bulkItems) {
255+
assertThat(bulkItemResponse.getIndex(), either(equalTo("test")).or(equalTo("test-ro")));
256+
assertThat(bulkItemResponse.getType(), equalTo("test"));
257+
if (bulkItemResponse.getIndex().equals("test")) {
258+
assertThat(bulkItemResponse.isFailed(), equalTo(false));
259+
//with concurrent requests > 1 we can't rely on the order of the bulk requests
260+
assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testDocs)));
261+
//we do want to check that we don't get duplicate ids back
262+
assertThat(ids.add(bulkItemResponse.getId()), equalTo(true));
263+
} else {
264+
assertThat(bulkItemResponse.isFailed(), equalTo(true));
265+
//with concurrent requests > 1 we can't rely on the order of the bulk requests
266+
assertThat(Integer.valueOf(bulkItemResponse.getId()), both(greaterThan(0)).and(lessThanOrEqualTo(testReadOnlyDocs)));
267+
//we do want to check that we don't get duplicate ids back
268+
assertThat(readOnlyIds.add(bulkItemResponse.getId()), equalTo(true));
269+
}
270+
}
271+
272+
assertMultiGetResponse(highLevelClient().multiGet(multiGetRequest), testDocs);
273+
}
274+
275+
private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
276+
MultiGetRequest multiGetRequest = new MultiGetRequest();
277+
for (int i = 1; i <= numDocs; i++) {
278+
if (randomBoolean()) {
279+
processor.add(new IndexRequest("test", "test", Integer.toString(i))
280+
.source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
281+
} else {
282+
final String source = "{ \"index\":{\"_index\":\"test\",\"_type\":\"test\",\"_id\":\"" + Integer.toString(i) + "\"} }\n"
283+
+ Strings.toString(JsonXContent.contentBuilder()
284+
.startObject().field("field", randomRealisticUnicodeOfLengthBetween(1, 30)).endObject()) + "\n";
285+
processor.add(new BytesArray(source), null, null, XContentType.JSON);
286+
}
287+
multiGetRequest.add("test", "test", Integer.toString(i));
288+
}
289+
return multiGetRequest;
290+
}
291+
292+
private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs) {
293+
assertThat(bulkItemResponses.size(), is(numDocs));
294+
int i = 1;
295+
for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
296+
assertThat(bulkItemResponse.getIndex(), equalTo("test"));
297+
assertThat(bulkItemResponse.getType(), equalTo("test"));
298+
assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++)));
299+
assertThat("item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(),
300+
bulkItemResponse.isFailed(), equalTo(false));
301+
}
302+
}
303+
304+
private static void assertMultiGetResponse(MultiGetResponse multiGetResponse, int numDocs) {
305+
assertThat(multiGetResponse.getResponses().length, equalTo(numDocs));
306+
int i = 1;
307+
for (MultiGetItemResponse multiGetItemResponse : multiGetResponse) {
308+
assertThat(multiGetItemResponse.getIndex(), equalTo("test"));
309+
assertThat(multiGetItemResponse.getType(), equalTo("test"));
310+
assertThat(multiGetItemResponse.getId(), equalTo(Integer.toString(i++)));
311+
}
312+
}
313+
314+
private static class BulkProcessorTestListener implements BulkProcessor.Listener {
315+
316+
private final CountDownLatch[] latches;
317+
private final AtomicInteger beforeCounts = new AtomicInteger();
318+
private final AtomicInteger afterCounts = new AtomicInteger();
319+
private final List<BulkItemResponse> bulkItems = new CopyOnWriteArrayList<>();
320+
private final List<Throwable> bulkFailures = new CopyOnWriteArrayList<>();
321+
322+
private BulkProcessorTestListener(CountDownLatch... latches) {
323+
this.latches = latches;
324+
}
325+
326+
@Override
327+
public void beforeBulk(long executionId, BulkRequest request) {
328+
beforeCounts.incrementAndGet();
329+
}
330+
331+
@Override
332+
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
333+
bulkItems.addAll(Arrays.asList(response.getItems()));
334+
afterCounts.incrementAndGet();
335+
for (CountDownLatch latch : latches) {
336+
latch.countDown();
337+
}
338+
}
339+
340+
@Override
341+
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
342+
bulkFailures.add(failure);
343+
afterCounts.incrementAndGet();
344+
for (CountDownLatch latch : latches) {
345+
latch.countDown();
346+
}
347+
}
348+
}
349+
350+
}

server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,6 @@ public void close() {
213213
} catch (InterruptedException exc) {
214214
Thread.currentThread().interrupt();
215215
}
216-
onClose.run();
217216
}
218217

219218
/**
@@ -239,7 +238,11 @@ public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws Inter
239238
if (bulkRequest.numberOfActions() > 0) {
240239
execute();
241240
}
242-
return this.bulkRequestHandler.awaitClose(timeout, unit);
241+
try {
242+
return this.bulkRequestHandler.awaitClose(timeout, unit);
243+
} finally {
244+
onClose.run();
245+
}
243246
}
244247

245248
/**

0 commit comments

Comments
 (0)