Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public TransportBulkAction(ThreadPool threadPool, TransportService transportServ
TransportShardBulkAction shardBulkAction, NodeClient client,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) {
super(BulkAction.NAME, transportService, actionFilters, (Supplier<BulkRequest>) BulkRequest::new);
super(BulkAction.NAME, transportService, actionFilters, (Supplier<BulkRequest>) BulkRequest::new, ThreadPool.Names.WRITE);
Objects.requireNonNull(relativeTimeProvider);
this.threadPool = threadPool;
this.clusterService = clusterService;
Expand Down Expand Up @@ -258,7 +258,8 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
threadPool.executor(ThreadPool.Names.WRITE).execute(
() -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.elasticsearch.action.support;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -57,6 +56,13 @@ protected HandledTransportAction(String actionName, boolean canTripCircuitBreake
new TransportHandler());
}

protected HandledTransportAction(String actionName, TransportService transportService, ActionFilters actionFilters,
Supplier<Request> request, String executor) {
super(actionName, actionFilters, transportService.getTaskManager());
transportService.registerRequestHandler(actionName, request, executor, false, true,
new TransportHandler());
}

protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker,
TransportService transportService, ActionFilters actionFilters,
Writeable.Reader<Request> requestReader) {
Expand All @@ -73,9 +79,8 @@ protected HandledTransportAction(String actionName, boolean canTripCircuitBreake

class TransportHandler implements TransportRequestHandler<Request> {
@Override
public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
public final void messageReceived(final Request request, final TransportChannel channel, Task task) {
// We already got the task created on the network layer - no need to create it again on the transport layer
Logger logger = HandledTransportAction.this.logger;
execute(task, request, new ChannelActionListener<>(channel, actionName, request));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.RemoteTransportException;

import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -133,9 +134,14 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}
}
} else {
Throwable t = (Throwable) response;
// we're not expecting any other errors
throw new AssertionError("Unexpected failure", t);
if (response instanceof RemoteTransportException
&& ((RemoteTransportException) response).status() == RestStatus.TOO_MANY_REQUESTS && rejectedExecutionExpected) {
// ignored, we exceeded the write queue size with dispatching the initial bulk request
} else {
Throwable t = (Throwable) response;
// we're not expecting any other errors
throw new AssertionError("Unexpected failure", t);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,24 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -102,7 +106,10 @@ private void indicesThatCannotBeCreatedTestCase(Set<String> expected,
ClusterState state = mock(ClusterState.class);
when(state.getMetaData()).thenReturn(MetaData.EMPTY_META_DATA);
when(clusterService.state()).thenReturn(state);
TransportBulkAction action = new TransportBulkAction(null, mock(TransportService.class), clusterService,
final ThreadPool threadPool = mock(ThreadPool.class);
final ExecutorService direct = EsExecutors.newDirectExecutorService();
when(threadPool.executor(anyString())).thenReturn(direct);
TransportBulkAction action = new TransportBulkAction(threadPool, mock(TransportService.class), clusterService,
null, null, null, mock(ActionFilters.class), null, null) {
@Override
void executeBulk(Task task, BulkRequest bulkRequest, long startTimeNanos, ActionListener<BulkResponse> listener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
Expand All @@ -61,13 +63,15 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
Expand All @@ -92,6 +96,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
TransportService transportService;
ClusterService clusterService;
IngestService ingestService;
ThreadPool threadPool;

/** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */
@Captor
Expand Down Expand Up @@ -126,7 +131,7 @@ class TestTransportBulkAction extends TransportBulkAction {
boolean indexCreated = true; // If set to false, will be set to true by call to createIndex

TestTransportBulkAction() {
super(null, transportService, clusterService, ingestService,
super(threadPool, transportService, clusterService, ingestService,
null, null, new ActionFilters(Collections.emptySet()), null,
new AutoCreateIndex(
SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
Expand Down Expand Up @@ -163,6 +168,9 @@ class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction<I
@Before
public void setupAction() {
// initialize captors, which must be members to use @Capture because of generics
threadPool = mock(ThreadPool.class);
final ExecutorService direct = EsExecutors.newDirectExecutorService();
when(threadPool.executor(anyString())).thenReturn(direct);
MockitoAnnotations.initMocks(this);
// setup services that will be called by action
transportService = mock(TransportService.class);
Expand Down