Skip to content

Commit c47f24d

Browse files
authored
BulkProcessor flush runnable preserves the thread context from creation time (#26718)
When using a bulk processor, the thread context was not preserved for the flush runnable which is executed in another thread in the thread pool. This change wraps the flush runnable in a context preserving runnable so that the headers and transients from the creation time of the bulk processor are available during the execution of the flush. Closes #26596
1 parent b9c0d44 commit c47f24d

File tree

2 files changed

+102
-1
lines changed

2 files changed

+102
-1
lines changed

core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,8 @@ public boolean isCancelled() {
302302
};
303303
}
304304

305-
return threadPool.scheduleWithFixedDelay(new Flush(), flushInterval, ThreadPool.Names.GENERIC);
305+
final Runnable flushRunnable = threadPool.getThreadContext().preserveContext(new Flush());
306+
return threadPool.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
306307
}
307308

308309
private void executeIfNeeded() {
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.bulk;
21+
22+
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.action.index.IndexRequest;
24+
import org.elasticsearch.common.unit.ByteSizeUnit;
25+
import org.elasticsearch.common.unit.ByteSizeValue;
26+
import org.elasticsearch.common.unit.TimeValue;
27+
import org.elasticsearch.common.util.concurrent.ThreadContext;
28+
import org.elasticsearch.test.ESTestCase;
29+
import org.elasticsearch.threadpool.TestThreadPool;
30+
import org.elasticsearch.threadpool.ThreadPool;
31+
import org.junit.After;
32+
import org.junit.Before;
33+
34+
import java.util.concurrent.CountDownLatch;
35+
import java.util.function.BiConsumer;
36+
37+
public class BulkProcessorTests extends ESTestCase {
38+
39+
private ThreadPool threadPool;
40+
41+
@Before
42+
public void startThreadPool() {
43+
threadPool = new TestThreadPool("BulkProcessorTests");
44+
}
45+
46+
@After
47+
public void stopThreadPool() throws InterruptedException {
48+
terminate(threadPool);
49+
}
50+
51+
public void testBulkProcessorFlushPreservesContext() throws InterruptedException {
52+
final CountDownLatch latch = new CountDownLatch(1);
53+
final String headerKey = randomAlphaOfLengthBetween(1, 8);
54+
final String transientKey = randomAlphaOfLengthBetween(1, 8);
55+
final String headerValue = randomAlphaOfLengthBetween(1, 32);
56+
final Object transientValue = new Object();
57+
58+
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {
59+
ThreadContext threadContext = threadPool.getThreadContext();
60+
assertEquals(headerValue, threadContext.getHeader(headerKey));
61+
assertSame(transientValue, threadContext.getTransient(transientKey));
62+
latch.countDown();
63+
};
64+
65+
final int bulkSize = randomIntBetween(2, 32);
66+
final TimeValue flushInterval = TimeValue.timeValueSeconds(1L);
67+
final BulkProcessor bulkProcessor;
68+
assertNull(threadPool.getThreadContext().getHeader(headerKey));
69+
assertNull(threadPool.getThreadContext().getTransient(transientKey));
70+
try (ThreadContext.StoredContext ctx = threadPool.getThreadContext().stashContext()) {
71+
threadPool.getThreadContext().putHeader(headerKey, headerValue);
72+
threadPool.getThreadContext().putTransient(transientKey, transientValue);
73+
bulkProcessor = new BulkProcessor(consumer, BackoffPolicy.noBackoff(), new BulkProcessor.Listener() {
74+
@Override
75+
public void beforeBulk(long executionId, BulkRequest request) {
76+
}
77+
78+
@Override
79+
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
80+
}
81+
82+
@Override
83+
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
84+
}
85+
}, 1, bulkSize, new ByteSizeValue(5, ByteSizeUnit.MB), flushInterval, threadPool);
86+
}
87+
assertNull(threadPool.getThreadContext().getHeader(headerKey));
88+
assertNull(threadPool.getThreadContext().getTransient(transientKey));
89+
90+
// add a single item which won't be over the size or number of items
91+
bulkProcessor.add(new IndexRequest());
92+
93+
// wait for flush to execute
94+
latch.await();
95+
96+
assertNull(threadPool.getThreadContext().getHeader(headerKey));
97+
assertNull(threadPool.getThreadContext().getTransient(transientKey));
98+
bulkProcessor.close();
99+
}
100+
}

0 commit comments

Comments
 (0)