Skip to content

Commit 1d2bc85

Browse files
committed
Inline TransportReplAction#registerRequestHandlers (#40762)
It is important that resync actions are not rejected on the primary even if its `write` threadpool is overloaded. Today we do this by exposing `registerRequestHandlers` to subclasses and overriding it in `TransportResyncReplicationAction`. This isn't ideal because it obscures the difference between this action and other replication actions, and also might allow subclasses to try and use some state before they are properly initialised. This change replaces this override with a constructor parameter to solve these issues. Relates #40706
1 parent 31e79a7 commit 1d2bc85

File tree

7 files changed

+23
-39
lines changed

7 files changed

+23
-39
lines changed

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public TransportShardBulkAction(Settings settings, TransportService transportSer
9292
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper, ActionFilters actionFilters,
9393
IndexNameExpressionResolver indexNameExpressionResolver) {
9494
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
95-
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE);
95+
indexNameExpressionResolver, BulkShardRequest::new, BulkShardRequest::new, ThreadPool.Names.WRITE, false);
9696
this.threadPool = threadPool;
9797
this.updateHelper = updateHelper;
9898
this.mappingUpdatedAction = mappingUpdatedAction;

server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import org.elasticsearch.transport.TransportService;
4949

5050
import java.io.IOException;
51-
import java.util.function.Supplier;
5251

5352
public class TransportResyncReplicationAction extends TransportWriteAction<ResyncReplicationRequest,
5453
ResyncReplicationRequest, ResyncReplicationResponse> implements PrimaryReplicaSyncer.SyncAction {
@@ -61,22 +60,8 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran
6160
ShardStateAction shardStateAction, ActionFilters actionFilters,
6261
IndexNameExpressionResolver indexNameExpressionResolver) {
6362
super(settings, ACTION_NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
64-
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE);
65-
}
66-
67-
@Override
68-
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<ResyncReplicationRequest> request,
69-
Supplier<ResyncReplicationRequest> replicaRequest, String executor) {
70-
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
71-
// we should never reject resync because of thread pool capacity on primary
72-
transportService.registerRequestHandler(transportPrimaryAction,
73-
() -> new ConcreteShardRequest<>(request),
74-
executor, true, true,
75-
this::handlePrimaryRequest);
76-
transportService.registerRequestHandler(transportReplicaAction,
77-
() -> new ConcreteReplicaRequest<>(replicaRequest),
78-
executor, true, true,
79-
this::handleReplicaRequest);
63+
indexNameExpressionResolver, ResyncReplicationRequest::new, ResyncReplicationRequest::new, ThreadPool.Names.WRITE,
64+
true /* we should never reject resync because of thread pool capacity on primary */);
8065
}
8166

8267
@Override

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
123123
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
124124
Supplier<ReplicaRequest> replicaRequest, String executor) {
125125
this(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
126-
indexNameExpressionResolver, request, replicaRequest, executor, false);
126+
indexNameExpressionResolver, request, replicaRequest, executor, false, false);
127127
}
128128

129129

@@ -133,7 +133,7 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
133133
ActionFilters actionFilters,
134134
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
135135
Supplier<ReplicaRequest> replicaRequest, String executor,
136-
boolean syncGlobalCheckpointAfterOperation) {
136+
boolean syncGlobalCheckpointAfterOperation, boolean forceExecutionOnPrimary) {
137137
super(actionName, actionFilters, transportService.getTaskManager());
138138
this.threadPool = threadPool;
139139
this.transportService = transportService;
@@ -145,21 +145,19 @@ protected TransportReplicationAction(Settings settings, String actionName, Trans
145145

146146
this.transportPrimaryAction = actionName + "[p]";
147147
this.transportReplicaAction = actionName + "[r]";
148-
registerRequestHandlers(actionName, transportService, request, replicaRequest, executor);
149148

150-
this.transportOptions = transportOptions(settings);
149+
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
151150

152-
this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation;
153-
}
151+
transportService.registerRequestHandler(transportPrimaryAction,
152+
() -> new ConcreteShardRequest<>(request), executor, forceExecutionOnPrimary, true, this::handlePrimaryRequest);
154153

155-
protected void registerRequestHandlers(String actionName, TransportService transportService, Supplier<Request> request,
156-
Supplier<ReplicaRequest> replicaRequest, String executor) {
157-
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, this::handleOperationRequest);
158-
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
159-
this::handlePrimaryRequest);
160154
// we must never reject on because of thread pool capacity on replicas
161-
transportService.registerRequestHandler(
162-
transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest), executor, true, true, this::handleReplicaRequest);
155+
transportService.registerRequestHandler(transportReplicaAction, () -> new ConcreteReplicaRequest<>(replicaRequest),
156+
executor, true, true, this::handleReplicaRequest);
157+
158+
this.transportOptions = transportOptions(settings);
159+
160+
this.syncGlobalCheckpointAfterOperation = syncGlobalCheckpointAfterOperation;
163161
}
164162

165163
@Override

server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,12 @@ public abstract class TransportWriteAction<
6060
> extends TransportReplicationAction<Request, ReplicaRequest, Response> {
6161

6262
protected TransportWriteAction(Settings settings, String actionName, TransportService transportService,
63-
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
64-
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
65-
Supplier<ReplicaRequest> replicaRequest, String executor) {
63+
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
64+
ShardStateAction shardStateAction, ActionFilters actionFilters,
65+
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
66+
Supplier<ReplicaRequest> replicaRequest, String executor, boolean forceExecutionOnPrimary) {
6667
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
67-
indexNameExpressionResolver, request, replicaRequest, executor, true);
68+
indexNameExpressionResolver, request, replicaRequest, executor, true, forceExecutionOnPrimary);
6869
}
6970

7071
/** Syncs operation result to the translog or throws a shard not available failure */

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public RetentionLeaseSyncAction(
8888
indexNameExpressionResolver,
8989
RetentionLeaseSyncAction.Request::new,
9090
RetentionLeaseSyncAction.Request::new,
91-
ThreadPool.Names.MANAGEMENT);
91+
ThreadPool.Names.MANAGEMENT, false);
9292
}
9393

9494
/**

server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ protected TestAction(boolean withDocumentFailureOnPrimary, boolean withDocumentF
402402
new TransportService(Settings.EMPTY, mock(Transport.class), null, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
403403
x -> null, null, Collections.emptySet()), null, null, null, null,
404404
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), TestRequest::new,
405-
TestRequest::new, ThreadPool.Names.SAME);
405+
TestRequest::new, ThreadPool.Names.SAME, false);
406406
this.withDocumentFailureOnPrimary = withDocumentFailureOnPrimary;
407407
this.withDocumentFailureOnReplica = withDocumentFailureOnReplica;
408408
}
@@ -412,7 +412,7 @@ protected TestAction(Settings settings, String actionName, TransportService tran
412412
super(settings, actionName, transportService, clusterService,
413413
mockIndicesService(clusterService), threadPool, shardStateAction,
414414
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(),
415-
TestRequest::new, TestRequest::new, ThreadPool.Names.SAME);
415+
TestRequest::new, TestRequest::new, ThreadPool.Names.SAME, false);
416416
this.withDocumentFailureOnPrimary = false;
417417
this.withDocumentFailureOnReplica = false;
418418
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public TransportBulkShardOperationsAction(
5757
indexNameExpressionResolver,
5858
BulkShardOperationsRequest::new,
5959
BulkShardOperationsRequest::new,
60-
ThreadPool.Names.WRITE);
60+
ThreadPool.Names.WRITE, false);
6161
}
6262

6363
@Override

0 commit comments

Comments
 (0)