Skip to content

Commit 8f327f3

Browse files
authored
Add dispatching to HandledTransportAction (#38050) (#38066)
This commit allows implementors of the `HandledTransportAction` to specify what thread the action should be executed on. The motivation for this commit is that certain CCR requests should be performed on the generic threadpool.
1 parent 63c57b3 commit 8f327f3

File tree

3 files changed

+33
-34
lines changed

3 files changed

+33
-34
lines changed

server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ protected HandledTransportAction(Settings settings, String actionName, ThreadPoo
4242
this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
4343
}
4444

45+
protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService,
46+
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
47+
Supplier<Request> request, String executor) {
48+
this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, request, executor);
49+
}
50+
4551
protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService,
4652
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
4753
IndexNameExpressionResolver indexNameExpressionResolver) {
@@ -51,8 +57,15 @@ protected HandledTransportAction(Settings settings, String actionName, ThreadPoo
5157
protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,
5258
TransportService transportService, ActionFilters actionFilters,
5359
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
60+
this(settings, actionName, canTripCircuitBreaker, threadPool, transportService, actionFilters, indexNameExpressionResolver,
61+
request, ThreadPool.Names.SAME);
62+
}
63+
64+
protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,
65+
TransportService transportService, ActionFilters actionFilters,
66+
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request, String executor) {
5467
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
55-
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, false, canTripCircuitBreaker,
68+
transportService.registerRequestHandler(actionName, request, executor, false, canTripCircuitBreaker,
5669
new TransportHandler());
5770
}
5871

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,26 +47,21 @@ public static class TransportDeleteCcrRestoreSessionAction
4747
extends HandledTransportAction<ClearCcrRestoreSessionRequest, ClearCcrRestoreSessionResponse> {
4848

4949
private final CcrRestoreSourceService ccrRestoreService;
50-
private final ThreadPool threadPool;
5150

5251
@Inject
5352
public TransportDeleteCcrRestoreSessionAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
5453
IndexNameExpressionResolver resolver,
5554
TransportService transportService, CcrRestoreSourceService ccrRestoreService) {
56-
super(settings, NAME, threadPool, transportService, actionFilters, resolver, ClearCcrRestoreSessionRequest::new);
55+
super(settings, NAME, threadPool, transportService, actionFilters, resolver, ClearCcrRestoreSessionRequest::new,
56+
ThreadPool.Names.GENERIC);
5757
TransportActionProxy.registerProxyAction(transportService, NAME, ClearCcrRestoreSessionResponse::new);
5858
this.ccrRestoreService = ccrRestoreService;
59-
this.threadPool = transportService.getThreadPool();
6059
}
6160

6261
@Override
6362
protected void doExecute(ClearCcrRestoreSessionRequest request, ActionListener<ClearCcrRestoreSessionResponse> listener) {
64-
// TODO: Currently blocking actions might occur in the session closed callbacks. This dispatch
65-
// may be unnecessary when we remove these callbacks.
66-
threadPool.generic().execute(() -> {
67-
ccrRestoreService.closeSession(request.getSessionUUID());
68-
listener.onResponse(new ClearCcrRestoreSessionResponse());
69-
});
63+
ccrRestoreService.closeSession(request.getSessionUUID());
64+
listener.onResponse(new ClearCcrRestoreSessionResponse());
7065
}
7166
}
7267

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.common.settings.Settings;
2323
import org.elasticsearch.common.util.BigArrays;
2424
import org.elasticsearch.common.util.ByteArray;
25-
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2625
import org.elasticsearch.threadpool.ThreadPool;
2726
import org.elasticsearch.transport.TransportActionProxy;
2827
import org.elasticsearch.transport.TransportService;
@@ -68,7 +67,7 @@ public TransportGetCcrRestoreFileChunkAction(Settings settings, BigArrays bigArr
6867
IndexNameExpressionResolver resolver,
6968
ActionFilters actionFilters, CcrRestoreSourceService restoreSourceService) {
7069
super(settings, NAME, transportService.getThreadPool(), transportService, actionFilters, resolver,
71-
GetCcrRestoreFileChunkRequest::new);
70+
GetCcrRestoreFileChunkRequest::new, ThreadPool.Names.GENERIC);
7271
TransportActionProxy.registerProxyAction(transportService, NAME, GetCcrRestoreFileChunkResponse::new);
7372
this.threadPool = transportService.getThreadPool();
7473
this.restoreSourceService = restoreSourceService;
@@ -77,29 +76,21 @@ public TransportGetCcrRestoreFileChunkAction(Settings settings, BigArrays bigArr
7776

7877
@Override
7978
protected void doExecute(GetCcrRestoreFileChunkRequest request, ActionListener<GetCcrRestoreFileChunkResponse> listener) {
80-
threadPool.generic().execute(new AbstractRunnable() {
81-
@Override
82-
public void onFailure(Exception e) {
83-
listener.onFailure(e);
79+
int bytesRequested = request.getSize();
80+
ByteArray array = bigArrays.newByteArray(bytesRequested, false);
81+
String fileName = request.getFileName();
82+
String sessionUUID = request.getSessionUUID();
83+
// This is currently safe to do because calling `onResponse` will serialize the bytes to the network layer data
84+
// structure on the same thread. So the bytes will be copied before the reference is released.
85+
try (ReleasablePagedBytesReference reference = new ReleasablePagedBytesReference(array, bytesRequested, array)) {
86+
try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) {
87+
long offsetAfterRead = sessionReader.readFileBytes(fileName, reference);
88+
long offsetBeforeRead = offsetAfterRead - reference.length();
89+
listener.onResponse(new GetCcrRestoreFileChunkResponse(offsetBeforeRead, reference));
8490
}
85-
86-
@Override
87-
protected void doRun() throws Exception {
88-
int bytesRequested = request.getSize();
89-
ByteArray array = bigArrays.newByteArray(bytesRequested, false);
90-
String fileName = request.getFileName();
91-
String sessionUUID = request.getSessionUUID();
92-
// This is currently safe to do because calling `onResponse` will serialize the bytes to the network layer data
93-
// structure on the same thread. So the bytes will be copied before the reference is released.
94-
try (ReleasablePagedBytesReference reference = new ReleasablePagedBytesReference(array, bytesRequested, array)) {
95-
try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) {
96-
long offsetAfterRead = sessionReader.readFileBytes(fileName, reference);
97-
long offsetBeforeRead = offsetAfterRead - reference.length();
98-
listener.onResponse(new GetCcrRestoreFileChunkResponse(offsetBeforeRead, reference));
99-
}
100-
}
101-
}
102-
});
91+
} catch (IOException e) {
92+
listener.onFailure(e);
93+
}
10394
}
10495
}
10596

0 commit comments

Comments
 (0)