From 811f5cb090d73489aedad42fc0300879d42eadbc Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 4 Apr 2019 18:43:20 +0200 Subject: [PATCH 1/2] Handle Bulk Requests on Generic Threadpool * Bulk requests can be thousands of items large and take more than O(10ms) time to handle => we should not handle them on the transport threadpool to not block select loops * relates #39128 * closes #39658 --- .../action/bulk/TransportBulkAction.java | 5 +++-- .../action/support/HandledTransportAction.java | 11 ++++++++--- ...portBulkActionIndicesThatCannotBeCreatedTests.java | 9 ++++++++- .../action/bulk/TransportBulkActionIngestTests.java | 10 +++++++++- 4 files changed, 28 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 9adc92e02bedb..7ccc2e1f3ea1a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -117,7 +117,7 @@ public TransportBulkAction(ThreadPool threadPool, TransportService transportServ TransportShardBulkAction shardBulkAction, NodeClient client, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) { - super(BulkAction.NAME, transportService, actionFilters, (Supplier) BulkRequest::new); + super(BulkAction.NAME, transportService, actionFilters, (Supplier) BulkRequest::new, ThreadPool.Names.GENERIC); Objects.requireNonNull(relativeTimeProvider); this.threadPool = threadPool; this.clusterService = clusterService; @@ -258,7 +258,8 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated)); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java b/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java index c0bc0af839967..ca10583ce248a 100644 --- a/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.action.support; -import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.Writeable; @@ -57,6 +56,13 @@ protected HandledTransportAction(String actionName, boolean canTripCircuitBreake new TransportHandler()); } + protected HandledTransportAction(String actionName, TransportService transportService, ActionFilters actionFilters, + Supplier request, String executor) { + super(actionName, actionFilters, transportService.getTaskManager()); + transportService.registerRequestHandler(actionName, request, executor, false, true, + new TransportHandler()); + } + protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker, TransportService transportService, ActionFilters actionFilters, Writeable.Reader requestReader) { @@ -73,9 +79,8 @@ protected HandledTransportAction(String actionName, boolean canTripCircuitBreake class TransportHandler implements TransportRequestHandler { @Override - public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception { + public final void messageReceived(final Request request, final TransportChannel channel, Task task) { // We already got the task created on the network layer - no need to create it again on the transport layer - Logger logger = HandledTransportAction.this.logger; execute(task, request, new ChannelActionListener<>(channel, actionName, request)); } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index 10014c6fb3f56..f213b523fbfaf 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -30,20 +30,24 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.VersionType; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.util.Arrays; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.function.Function; import static java.util.Collections.emptySet; import static java.util.Collections.singleton; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -102,7 +106,10 @@ private void indicesThatCannotBeCreatedTestCase(Set expected, ClusterState state = mock(ClusterState.class); when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA); when(clusterService.state()).thenReturn(state); - TransportBulkAction action = new TransportBulkAction(null, mock(TransportService.class), clusterService, + final ThreadPool threadPool = mock(ThreadPool.class); + final ExecutorService direct = EsExecutors.newDirectExecutorService(); + when(threadPool.executor(anyString())).thenReturn(direct); + TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService, null, null, null, mock(ActionFilters.class), null, null) { @Override void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener listener, diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index b3ecc59076759..b570ec8f781a6 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -45,11 +45,13 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.junit.Before; @@ -61,6 +63,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -68,6 +71,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -92,6 +96,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { TransportService transportService; ClusterService clusterService; IngestService ingestService; + ThreadPool threadPool; /** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */ @Captor @@ -126,7 +131,7 @@ class TestTransportBulkAction extends TransportBulkAction { boolean indexCreated = true; // If set to false, will be set to true by call to createIndex TestTransportBulkAction() { - super(null, transportService, clusterService, ingestService, + super(threadPool, transportService, clusterService, ingestService, null, null, new ActionFilters(Collections.emptySet()), null, new AutoCreateIndex( SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), @@ -163,6 +168,9 @@ class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction Date: Fri, 12 Apr 2019 16:15:58 +0200 Subject: [PATCH 2/2] CR: move bulk request initial processing to WRITE pool --- .../action/bulk/TransportBulkAction.java | 4 ++-- .../action/bulk/BulkProcessorRetryIT.java | 12 +++++++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index dbca38ed43087..eae849407666e 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -117,7 +117,7 @@ public TransportBulkAction(ThreadPool threadPool, TransportService transportServ TransportShardBulkAction shardBulkAction, NodeClient client, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) { - super(BulkAction.NAME, transportService, actionFilters, (Supplier) BulkRequest::new, ThreadPool.Names.GENERIC); + super(BulkAction.NAME, transportService, actionFilters, (Supplier) BulkRequest::new, ThreadPool.Names.WRITE); Objects.requireNonNull(relativeTimeProvider); this.threadPool = threadPool; this.clusterService = clusterService; @@ -258,7 +258,7 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated)); } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java index 515c539a884fc..e4b6fff9fc353 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.transport.RemoteTransportException; import java.util.Collections; import java.util.Iterator; @@ -133,9 +134,14 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) } } } else { - Throwable t = (Throwable) response; - // we're not expecting any other errors - throw new AssertionError("Unexpected failure", t); + if (response instanceof RemoteTransportException + && ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS && rejectedExecutionExpected) { + // ignored, we exceeded the write queue size with dispatching the initial bulk request + } else { + Throwable t = (Throwable) response; + // we're not expecting any other errors + throw new AssertionError("Unexpected failure", t); + } } }