Skip to content

Commit b88bdfe

Browse files
authored
Add dispatching to HandledTransportAction (#38050)
This commit allows implementors of the `HandledTransportAction` to specify what thread the action should be executed on. The motivation for this commit is that certain CCR requests should be performed on the generic threadpool.
1 parent 54dbf94 commit b88bdfe

File tree

3 files changed

+30
-34
lines changed

3 files changed

+30
-34
lines changed

server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ protected HandledTransportAction(String actionName, TransportService transportSe
4545
this(actionName, true, transportService, actionFilters, requestReader);
4646
}
4747

48+
protected HandledTransportAction(String actionName, TransportService transportService,
49+
ActionFilters actionFilters, Writeable.Reader<Request> requestReader, String executor) {
50+
this(actionName, true, transportService, actionFilters, requestReader, executor);
51+
}
52+
4853
protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker,
4954
TransportService transportService, ActionFilters actionFilters, Supplier<Request> request) {
5055
super(actionName, actionFilters, transportService.getTaskManager());
@@ -55,8 +60,14 @@ protected HandledTransportAction(String actionName, boolean canTripCircuitBreake
5560
protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker,
5661
TransportService transportService, ActionFilters actionFilters,
5762
Writeable.Reader<Request> requestReader) {
63+
this(actionName, canTripCircuitBreaker, transportService, actionFilters, requestReader, ThreadPool.Names.SAME);
64+
}
65+
66+
protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker,
67+
TransportService transportService, ActionFilters actionFilters,
68+
Writeable.Reader<Request> requestReader, String executor) {
5869
super(actionName, actionFilters, transportService.getTaskManager());
59-
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, false, canTripCircuitBreaker, requestReader,
70+
transportService.registerRequestHandler(actionName, executor, false, canTripCircuitBreaker, requestReader,
6071
new TransportHandler());
6172
}
6273

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,26 +43,20 @@ public static class TransportDeleteCcrRestoreSessionAction
4343
extends HandledTransportAction<ClearCcrRestoreSessionRequest, ClearCcrRestoreSessionResponse> {
4444

4545
private final CcrRestoreSourceService ccrRestoreService;
46-
private final ThreadPool threadPool;
4746

4847
@Inject
4948
public TransportDeleteCcrRestoreSessionAction(ActionFilters actionFilters, TransportService transportService,
5049
CcrRestoreSourceService ccrRestoreService) {
51-
super(NAME, transportService, actionFilters, ClearCcrRestoreSessionRequest::new);
50+
super(NAME, transportService, actionFilters, ClearCcrRestoreSessionRequest::new, ThreadPool.Names.GENERIC);
5251
TransportActionProxy.registerProxyAction(transportService, NAME, ClearCcrRestoreSessionResponse::new);
5352
this.ccrRestoreService = ccrRestoreService;
54-
this.threadPool = transportService.getThreadPool();
5553
}
5654

5755
@Override
5856
protected void doExecute(Task task, ClearCcrRestoreSessionRequest request,
5957
ActionListener<ClearCcrRestoreSessionResponse> listener) {
60-
// TODO: Currently blocking actions might occur in the session closed callbacks. This dispatch
61-
// may be unnecessary when we remove these callbacks.
62-
threadPool.generic().execute(() -> {
63-
ccrRestoreService.closeSession(request.getSessionUUID());
64-
listener.onResponse(new ClearCcrRestoreSessionResponse());
65-
});
58+
ccrRestoreService.closeSession(request.getSessionUUID());
59+
listener.onResponse(new ClearCcrRestoreSessionResponse());
6660
}
6761
}
6862

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.common.io.stream.Writeable;
2020
import org.elasticsearch.common.util.BigArrays;
2121
import org.elasticsearch.common.util.ByteArray;
22-
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2322
import org.elasticsearch.tasks.Task;
2423
import org.elasticsearch.threadpool.ThreadPool;
2524
import org.elasticsearch.transport.TransportActionProxy;
@@ -58,7 +57,7 @@ public static class TransportGetCcrRestoreFileChunkAction
5857
@Inject
5958
public TransportGetCcrRestoreFileChunkAction(BigArrays bigArrays, TransportService transportService, ActionFilters actionFilters,
6059
CcrRestoreSourceService restoreSourceService) {
61-
super(NAME, transportService, actionFilters, GetCcrRestoreFileChunkRequest::new);
60+
super(NAME, transportService, actionFilters, GetCcrRestoreFileChunkRequest::new, ThreadPool.Names.GENERIC);
6261
TransportActionProxy.registerProxyAction(transportService, NAME, GetCcrRestoreFileChunkResponse::new);
6362
this.threadPool = transportService.getThreadPool();
6463
this.restoreSourceService = restoreSourceService;
@@ -68,29 +67,21 @@ public TransportGetCcrRestoreFileChunkAction(BigArrays bigArrays, TransportServi
6867
@Override
6968
protected void doExecute(Task task, GetCcrRestoreFileChunkRequest request,
7069
ActionListener<GetCcrRestoreFileChunkResponse> listener) {
71-
threadPool.generic().execute(new AbstractRunnable() {
72-
@Override
73-
public void onFailure(Exception e) {
74-
listener.onFailure(e);
70+
int bytesRequested = request.getSize();
71+
ByteArray array = bigArrays.newByteArray(bytesRequested, false);
72+
String fileName = request.getFileName();
73+
String sessionUUID = request.getSessionUUID();
74+
// This is currently safe to do because calling `onResponse` will serialize the bytes to the network layer data
75+
// structure on the same thread. So the bytes will be copied before the reference is released.
76+
try (ReleasablePagedBytesReference reference = new ReleasablePagedBytesReference(array, bytesRequested, array)) {
77+
try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) {
78+
long offsetAfterRead = sessionReader.readFileBytes(fileName, reference);
79+
long offsetBeforeRead = offsetAfterRead - reference.length();
80+
listener.onResponse(new GetCcrRestoreFileChunkResponse(offsetBeforeRead, reference));
7581
}
76-
77-
@Override
78-
protected void doRun() throws Exception {
79-
int bytesRequested = request.getSize();
80-
ByteArray array = bigArrays.newByteArray(bytesRequested, false);
81-
String fileName = request.getFileName();
82-
String sessionUUID = request.getSessionUUID();
83-
// This is currently safe to do because calling `onResponse` will serialize the bytes to the network layer data
84-
// structure on the same thread. So the bytes will be copied before the reference is released.
85-
try (ReleasablePagedBytesReference reference = new ReleasablePagedBytesReference(array, bytesRequested, array)) {
86-
try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) {
87-
long offsetAfterRead = sessionReader.readFileBytes(fileName, reference);
88-
long offsetBeforeRead = offsetAfterRead - reference.length();
89-
listener.onResponse(new GetCcrRestoreFileChunkResponse(offsetBeforeRead, reference));
90-
}
91-
}
92-
}
93-
});
82+
} catch (IOException e) {
83+
listener.onFailure(e);
84+
}
9485
}
9586
}
9687

0 commit comments

Comments
 (0)