From dd60098fdf6ca61118cdefea2ad8e9ca119ee214 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 3 Apr 2019 08:56:37 +0100 Subject: [PATCH] Inline TransportReplAction#registerRequestHandlers It is important that resync actions are not rejected on the primary even if its `write` threadpool is overloaded. Today we do this by exposing `registerRequestHandlers` to subclasses and overriding it in `TransportResyncReplicationAction`. This isn't ideal because it obscures the difference between this action and other replication actions, and also might allow subclasses to try and use some state before they are properly initialised. This change replaces this override with a constructor parameter to solve these issues. Relates #40706 --- .../action/bulk/TransportShardBulkAction.java | 2 +- .../TransportResyncReplicationAction.java | 19 ++------------- .../TransportReplicationAction.java | 24 +++++++++---------- .../replication/TransportWriteAction.java | 9 +++---- .../index/seqno/RetentionLeaseSyncAction.java | 2 +- .../TransportWriteActionTests.java | 4 ++-- .../TransportBulkShardOperationsAction.java | 2 +- 7 files changed, 23 insertions(+), 39 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index f182c2985815d..325820532be6e 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -92,7 +92,7 @@ public TransportShardBulkAction(Settings settings, TransportService transportSer MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE); + indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false); this.threadPool = threadPool; this.updateHelper = updateHelper; this.mappingUpdatedAction = mappingUpdatedAction; diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 80a55e8ab2d41..3bf9abecf435c 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -47,7 +47,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.function.Supplier; public class TransportResyncReplicationAction extends TransportWriteAction implements PrimaryReplicaSyncer.SyncAction { @@ -60,22 +59,8 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran ShardStateAction shardStateAction, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE); - } - - @Override - protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, - Supplier replicaRequest, String executor) { - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); - // we should never reject resync because of thread pool capacity on primary - transportService.registerRequestHandler(transportPrimaryAction, - () -> new ConcreteShardRequest<>(request), - executor, true, true, - this::handlePrimaryRequest); - transportService.registerRequestHandler(transportReplicaAction, - () -> new ConcreteReplicaRequest<>(replicaRequest), - executor, true, true, - this::handleReplicaRequest); + indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE, + true /* we should never reject resync because of thread pool capacity on primary */); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index ef294b52803de..42745aa0e15dc 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -122,7 +122,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, Supplier replicaRequest, String executor) { this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, request, replicaRequest, executor, false); + indexNameExpressionResolver, request, replicaRequest, executor, false, false); } @@ -132,7 +132,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, Supplier replicaRequest, String executor, - boolean syncGlobalCheckpointAfterOperation) { + boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) { super(actionName, actionFilters, transportService.getTaskManager()); this.threadPool = threadPool; this.transportService = transportService; @@ -144,21 +144,19 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans this.transportPrimaryAction = actionName + "[p]"; this.transportReplicaAction = actionName + "[r]"; - registerRequestHandlers(actionName, transportService, request, replicaRequest, executor); - this.transportOptions = transportOptions(settings); + transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); - this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation; - } + transportService.registerRequestHandler(transportPrimaryAction, + () -> new ConcreteShardRequest<>(request), executor, forceExecutionOnPrimary, true, this::handlePrimaryRequest); - protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier request, - Supplier replicaRequest, String executor) { - transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest); - transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor, - this::handlePrimaryRequest); // we must never reject on because of thread pool capacity on replicas - transportService.registerRequestHandler( - transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest); + transportService.registerRequestHandler(transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), + executor, true, true, this::handleReplicaRequest); + + this.transportOptions = transportOptions(settings); + + this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation; } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 4781682437545..619beab57932c 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -60,11 +60,12 @@ public abstract class TransportWriteAction< > extends TransportReplicationAction { protected TransportWriteAction(Settings settings, String actionName, TransportService transportService, - ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, - Supplier replicaRequest, String executor) { + ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, + ShardStateAction shardStateAction, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, Supplier request, + Supplier replicaRequest, String executor, boolean forceExecutionOnPrimary) { super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, - indexNameExpressionResolver, request, replicaRequest, executor, true); + indexNameExpressionResolver, request, replicaRequest, executor, true, forceExecutionOnPrimary); } /** Syncs operation result to the translog or throws a shard not available failure */ diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index d4845d92a3a6f..26eb32a9f1860 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -88,7 +88,7 @@ public RetentionLeaseSyncAction( indexNameExpressionResolver, RetentionLeaseSyncAction.Request::new, RetentionLeaseSyncAction.Request::new, - ThreadPool.Names.MANAGEMENT); + ThreadPool.Names.MANAGEMENT, false); } /** diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index f540374a56c20..3c2df0b59b24e 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -402,7 +402,7 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null, Collections.emptySet()), null, null, null, null, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), TestRequest::new, - TestRequest::new, ThreadPool.Names.SAME); + TestRequest::new, ThreadPool.Names.SAME, false); this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary; this.withDocumentFailureOnReplica = withDocumentFailureOnReplica; } @@ -412,7 +412,7 @@ protected TestAction(Settings settings, String actionName, TransportService tran super(settings, actionName, transportService, clusterService, mockIndicesService(clusterService), threadPool, shardStateAction, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), - TestRequest::new, TestRequest::new, ThreadPool.Names.SAME); + TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false); this.withDocumentFailureOnPrimary = false; this.withDocumentFailureOnReplica = false; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 7c3b2da32e88f..74e9ceda67026 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -57,7 +57,7 @@ public TransportBulkShardOperationsAction( indexNameExpressionResolver, BulkShardOperationsRequest::new, BulkShardOperationsRequest::new, - ThreadPool.Names.WRITE); + ThreadPool.Names.WRITE, false); } @Override