Skip to content

Commit 2931e9c

Browse files
Handle Bulk Requests on Write Threadpool (#40866)
* Bulk requests can be thousands of items large and take more than O(10ms) time to handle => we should not handle them on the transport threadpool to not block select loops * relates #39128 * relates #39658
1 parent 1f51f20 commit 2931e9c

File tree

5 files changed

+37
-10
lines changed

5 files changed

+37
-10
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public TransportBulkAction(ThreadPool threadPool, TransportService transportServ
117117
TransportShardBulkAction shardBulkAction, NodeClient client,
118118
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
119119
AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) {
120-
super(BulkAction.NAME, transportService, actionFilters, (Supplier<BulkRequest>) BulkRequest::new);
120+
super(BulkAction.NAME, transportService, actionFilters, (Supplier<BulkRequest>) BulkRequest::new, ThreadPool.Names.WRITE);
121121
Objects.requireNonNull(relativeTimeProvider);
122122
this.threadPool = threadPool;
123123
this.clusterService = clusterService;
@@ -258,7 +258,8 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
258258
@Override
259259
public void onResponse(CreateIndexResponse result) {
260260
if (counter.decrementAndGet() == 0) {
261-
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
261+
threadPool.executor(ThreadPool.Names.WRITE).execute(
262+
() -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated));
262263
}
263264
}
264265

server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.elasticsearch.action.support;
2020

21-
import org.apache.logging.log4j.Logger;
2221
import org.elasticsearch.action.ActionRequest;
2322
import org.elasticsearch.action.ActionResponse;
2423
import org.elasticsearch.common.io.stream.Writeable;
@@ -57,6 +56,13 @@ protected HandledTransportAction(String actionName, boolean canTripCircuitBreake
5756
new TransportHandler());
5857
}
5958

59+
protected HandledTransportAction(String actionName, TransportService transportService, ActionFilters actionFilters,
60+
Supplier<Request> request, String executor) {
61+
super(actionName, actionFilters, transportService.getTaskManager());
62+
transportService.registerRequestHandler(actionName, request, executor, false, true,
63+
new TransportHandler());
64+
}
65+
6066
protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker,
6167
TransportService transportService, ActionFilters actionFilters,
6268
Writeable.Reader<Request> requestReader) {
@@ -73,9 +79,8 @@ protected HandledTransportAction(String actionName, boolean canTripCircuitBreake
7379

7480
class TransportHandler implements TransportRequestHandler<Request> {
7581
@Override
76-
public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
82+
public final void messageReceived(final Request request, final TransportChannel channel, Task task) {
7783
// We already got the task created on the network layer - no need to create it again on the transport layer
78-
Logger logger = HandledTransportAction.this.logger;
7984
execute(task, request, new ChannelActionListener<>(channel, actionName, request));
8085
}
8186
}

server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.index.query.QueryBuilders;
2626
import org.elasticsearch.rest.RestStatus;
2727
import org.elasticsearch.test.ESIntegTestCase;
28+
import org.elasticsearch.transport.RemoteTransportException;
2829

2930
import java.util.Collections;
3031
import java.util.Iterator;
@@ -133,9 +134,14 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
133134
}
134135
}
135136
} else {
136-
Throwable t = (Throwable) response;
137-
// we're not expecting any other errors
138-
throw new AssertionError("Unexpected failure", t);
137+
if (response instanceof RemoteTransportException
138+
&& ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS && rejectedExecutionExpected) {
139+
// ignored, we exceeded the write queue size with dispatching the initial bulk request
140+
} else {
141+
Throwable t = (Throwable) response;
142+
// we're not expecting any other errors
143+
throw new AssertionError("Unexpected failure", t);
144+
}
139145
}
140146
}
141147

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,24 @@
3030
import org.elasticsearch.cluster.service.ClusterService;
3131
import org.elasticsearch.common.unit.TimeValue;
3232
import org.elasticsearch.common.util.concurrent.AtomicArray;
33+
import org.elasticsearch.common.util.concurrent.EsExecutors;
3334
import org.elasticsearch.index.IndexNotFoundException;
3435
import org.elasticsearch.index.VersionType;
3536
import org.elasticsearch.tasks.Task;
3637
import org.elasticsearch.test.ESTestCase;
38+
import org.elasticsearch.threadpool.ThreadPool;
3739
import org.elasticsearch.transport.TransportService;
3840

3941
import java.util.Arrays;
4042
import java.util.HashSet;
4143
import java.util.Map;
4244
import java.util.Set;
45+
import java.util.concurrent.ExecutorService;
4346
import java.util.function.Function;
4447

4548
import static java.util.Collections.emptySet;
4649
import static java.util.Collections.singleton;
50+
import static org.mockito.Matchers.anyString;
4751
import static org.mockito.Mockito.mock;
4852
import static org.mockito.Mockito.when;
4953

@@ -102,7 +106,10 @@ private void indicesThatCannotBeCreatedTestCase(Set<String> expected,
102106
ClusterState state = mock(ClusterState.class);
103107
when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
104108
when(clusterService.state()).thenReturn(state);
105-
TransportBulkAction action = new TransportBulkAction(null, mock(TransportService.class), clusterService,
109+
final ThreadPool threadPool = mock(ThreadPool.class);
110+
final ExecutorService direct = EsExecutors.newDirectExecutorService();
111+
when(threadPool.executor(anyString())).thenReturn(direct);
112+
TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService,
106113
null, null, null, mock(ActionFilters.class), null, null) {
107114
@Override
108115
void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,13 @@
4545
import org.elasticsearch.common.settings.Settings;
4646
import org.elasticsearch.common.unit.TimeValue;
4747
import org.elasticsearch.common.util.concurrent.AtomicArray;
48+
import org.elasticsearch.common.util.concurrent.EsExecutors;
4849
import org.elasticsearch.index.IndexNotFoundException;
4950
import org.elasticsearch.index.IndexSettings;
5051
import org.elasticsearch.ingest.IngestService;
5152
import org.elasticsearch.tasks.Task;
5253
import org.elasticsearch.test.ESTestCase;
54+
import org.elasticsearch.threadpool.ThreadPool;
5355
import org.elasticsearch.transport.TransportResponseHandler;
5456
import org.elasticsearch.transport.TransportService;
5557
import org.junit.Before;
@@ -61,13 +63,15 @@
6163
import java.util.Collections;
6264
import java.util.Iterator;
6365
import java.util.Map;
66+
import java.util.concurrent.ExecutorService;
6467
import java.util.concurrent.atomic.AtomicBoolean;
6568
import java.util.function.BiConsumer;
6669
import java.util.function.Consumer;
6770

6871
import static org.hamcrest.Matchers.containsString;
6972
import static org.hamcrest.Matchers.sameInstance;
7073
import static org.mockito.Matchers.any;
74+
import static org.mockito.Matchers.anyString;
7175
import static org.mockito.Matchers.eq;
7276
import static org.mockito.Mockito.doAnswer;
7377
import static org.mockito.Mockito.mock;
@@ -92,6 +96,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
9296
TransportService transportService;
9397
ClusterService clusterService;
9498
IngestService ingestService;
99+
ThreadPool threadPool;
95100

96101
/** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */
97102
@Captor
@@ -126,7 +131,7 @@ class TestTransportBulkAction extends TransportBulkAction {
126131
boolean indexCreated = true; // If set to false, will be set to true by call to createIndex
127132

128133
TestTransportBulkAction() {
129-
super(null, transportService, clusterService, ingestService,
134+
super(threadPool, transportService, clusterService, ingestService,
130135
null, null, new ActionFilters(Collections.emptySet()), null,
131136
new AutoCreateIndex(
132137
SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
@@ -163,6 +168,9 @@ class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction<I
163168
@Before
164169
public void setupAction() {
165170
// initialize captors, which must be members to use @Capture because of generics
171+
threadPool = mock(ThreadPool.class);
172+
final ExecutorService direct = EsExecutors.newDirectExecutorService();
173+
when(threadPool.executor(anyString())).thenReturn(direct);
166174
MockitoAnnotations.initMocks(this);
167175
// setup services that will be called by action
168176
transportService = mock(TransportService.class);

0 commit comments

Comments
 (0)