diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index f182c2985815d..325820532be6e 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -92,7 +92,7 @@ public TransportShardBulkAction(Settings settings, TransportService transportSer MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE); + indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false); this.threadPool = threadPool; this.updateHelper = updateHelper; this.mappingUpdatedAction = mappingUpdatedAction; diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 80a55e8ab2d41..3bf9abecf435c 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -47,7 +47,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.function.Supplier; public class TransportResyncReplicationAction extends TransportWriteAction implements PrimaryReplicaSyncer.SyncAction { @@ -60,22 +59,8 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran ShardStateAction shardStateAction, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE); - } - - @Override - protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, - Supplier replicaRequest, String executor) { - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); - // we should never reject resync because of thread pool capacity on primary - transportService.registerRequestHandler(transportPrimaryAction, - () -> new ConcreteShardRequest<>(request), - executor, true, true, - this::handlePrimaryRequest); - transportService.registerRequestHandler(transportReplicaAction, - () -> new ConcreteReplicaRequest<>(replicaRequest), - executor, true, true, - this::handleReplicaRequest); + indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE, + true /* we should never reject resync because of thread pool capacity on primary */); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index ef294b52803de..42745aa0e15dc 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -122,7 +122,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, Supplier replicaRequest, String executor) { this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, request, replicaRequest, executor, false); + indexNameExpressionResolver, request, replicaRequest, executor, false, false); } @@ -132,7 +132,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, Supplier replicaRequest, String executor, - boolean syncGlobalCheckpointAfterOperation) { + boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) { super(actionName, actionFilters, transportService.getTaskManager()); this.threadPool = threadPool; this.transportService = transportService; @@ -144,21 +144,19 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans this.transportPrimaryAction = actionName + "[p]"; this.transportReplicaAction = actionName + "[r]"; - registerRequestHandlers(actionName, transportService, request, replicaRequest, executor); - this.transportOptions = transportOptions(settings); + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); - this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation; - } + transportService.registerRequestHandler(transportPrimaryAction, + () -> new ConcreteShardRequest<>(request), executor, forceExecutionOnPrimary, true, this::handlePrimaryRequest); - protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, - Supplier replicaRequest, String executor) { - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); - transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, - this::handlePrimaryRequest); // we must never reject on because of thread pool capacity on replicas - transportService.registerRequestHandler( - transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest); + transportService.registerRequestHandler(transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), + executor, true, true, this::handleReplicaRequest); + + this.transportOptions = transportOptions(settings); + + this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 4781682437545..619beab57932c 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -60,11 +60,12 @@ public abstract class TransportWriteAction< > extends TransportReplicationAction { protected TransportWriteAction(Settings settings, String actionName, TransportService transportService, - ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, - Supplier replicaRequest, String executor) { + ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, + ShardStateAction shardStateAction, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, + Supplier replicaRequest, String executor, boolean forceExecutionOnPrimary) { super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, request, replicaRequest, executor, true); + indexNameExpressionResolver, request, replicaRequest, executor, true, forceExecutionOnPrimary); } /** Syncs operation result to the translog or throws a shard not available failure */ diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index d4845d92a3a6f..26eb32a9f1860 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -88,7 +88,7 @@ public RetentionLeaseSyncAction( indexNameExpressionResolver, RetentionLeaseSyncAction.Request::new, RetentionLeaseSyncAction.Request::new, - ThreadPool.Names.MANAGEMENT); + ThreadPool.Names.MANAGEMENT, false); } /** diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index f540374a56c20..3c2df0b59b24e 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -402,7 +402,7 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()), null, null, null, null, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), TestRequest::new, - TestRequest::new, ThreadPool.Names.SAME); + TestRequest::new, ThreadPool.Names.SAME, false); this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary; this.withDocumentFailureOnReplica = withDocumentFailureOnReplica; } @@ -412,7 +412,7 @@ protected TestAction(Settings settings, String actionName, TransportService tran super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, shardStateAction, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), - TestRequest::new, TestRequest::new, ThreadPool.Names.SAME); + TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false); this.withDocumentFailureOnPrimary = false; this.withDocumentFailureOnReplica = false; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 7c3b2da32e88f..74e9ceda67026 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -57,7 +57,7 @@ public TransportBulkShardOperationsAction( indexNameExpressionResolver, BulkShardOperationsRequest::new, BulkShardOperationsRequest::new, - ThreadPool.Names.WRITE); + ThreadPool.Names.WRITE, false); } @Override