Skip to content

Commit 59e7c64

Browse files
authored
Core: Combine messageRecieved methods in TransportRequestHandler (#31519)
TransportRequestHandler currently contains 2 messageReceived methods, one which takes a Task, and one that does not. The first just delegates to the second. This commit changes all existing implementors of TransportRequestHandler to implement the version which takes Task, thus allowing the class to be a functional interface, and eliminating the need to throw exceptions when a task needs to be ensured.
1 parent f023e95 commit 59e7c64

File tree

41 files changed

+177
-273
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+177
-273
lines changed

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.util.BigArrays;
2727
import org.elasticsearch.indices.breaker.CircuitBreakerService;
2828
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
29+
import org.elasticsearch.tasks.Task;
2930
import org.elasticsearch.test.ESTestCase;
3031
import org.elasticsearch.test.transport.MockTransportService;
3132
import org.elasticsearch.threadpool.TestThreadPool;
@@ -91,7 +92,7 @@ public void testScheduledPing() throws Exception {
9192
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
9293
new TransportRequestHandler<TransportRequest.Empty>() {
9394
@Override
94-
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
95+
public void messageReceived(TransportRequest.Empty request, TransportChannel channel, Task task) {
9596
try {
9697
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
9798
} catch (IOException e) {

qa/ccs-unavailable-clusters/src/test/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,12 @@ private static MockTransportService startTransport(
103103
MockTransportService newService = MockTransportService.createNewService(s, version, threadPool, null);
104104
try {
105105
newService.registerRequestHandler(ClusterSearchShardsAction.NAME, ThreadPool.Names.SAME, ClusterSearchShardsRequest::new,
106-
(request, channel) -> {
106+
(request, channel, task) -> {
107107
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
108108
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
109109
});
110110
newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new,
111-
(request, channel) -> {
111+
(request, channel, task) -> {
112112
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
113113
for (DiscoveryNode node : knownNodes) {
114114
builder.add(node);

server/src/main/java/org/elasticsearch/action/admin/cluster/node/liveness/TransportLivenessAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.cluster.service.ClusterService;
2323
import org.elasticsearch.common.inject.Inject;
24+
import org.elasticsearch.tasks.Task;
2425
import org.elasticsearch.threadpool.ThreadPool;
2526
import org.elasticsearch.transport.TransportChannel;
2627
import org.elasticsearch.transport.TransportRequestHandler;
@@ -39,7 +40,7 @@ public TransportLivenessAction(ClusterService clusterService, TransportService t
3940
}
4041

4142
@Override
42-
public void messageReceived(LivenessRequest request, TransportChannel channel) throws Exception {
43+
public void messageReceived(LivenessRequest request, TransportChannel channel, Task task) throws Exception {
4344
channel.sendResponse(new LivenessResponse(clusterService.getClusterName(), clusterService.localNode()));
4445
}
4546
}

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.common.io.stream.StreamOutput;
3535
import org.elasticsearch.common.settings.Settings;
3636
import org.elasticsearch.tasks.CancellableTask;
37+
import org.elasticsearch.tasks.Task;
3738
import org.elasticsearch.tasks.TaskId;
3839
import org.elasticsearch.tasks.TaskInfo;
3940
import org.elasticsearch.threadpool.ThreadPool;
@@ -285,7 +286,7 @@ public void writeTo(StreamOutput out) throws IOException {
285286

286287
class BanParentRequestHandler implements TransportRequestHandler<BanParentTaskRequest> {
287288
@Override
288-
public void messageReceived(final BanParentTaskRequest request, final TransportChannel channel) throws Exception {
289+
public void messageReceived(final BanParentTaskRequest request, final TransportChannel channel, Task task) throws Exception {
289290
if (request.ban) {
290291
logger.debug("Received ban for the parent [{}] on the node [{}], reason: [{}]", request.parentTaskId,
291292
clusterService.localNode().getId(), request.reason);

server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

Lines changed: 66 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,10 @@
4545
import org.elasticsearch.search.query.QuerySearchRequest;
4646
import org.elasticsearch.search.query.QuerySearchResult;
4747
import org.elasticsearch.search.query.ScrollQuerySearchResult;
48-
import org.elasticsearch.tasks.Task;
4948
import org.elasticsearch.threadpool.ThreadPool;
5049
import org.elasticsearch.transport.RemoteClusterService;
51-
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
5250
import org.elasticsearch.transport.Transport;
5351
import org.elasticsearch.transport.TransportActionProxy;
54-
import org.elasticsearch.transport.TransportChannel;
5552
import org.elasticsearch.transport.TransportException;
5653
import org.elasticsearch.transport.TransportRequest;
5754
import org.elasticsearch.transport.TransportRequestOptions;
@@ -314,150 +311,116 @@ public void writeTo(StreamOutput out) throws IOException {
314311

315312
public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
316313
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ScrollFreeContextRequest::new,
317-
new TaskAwareTransportRequestHandler<ScrollFreeContextRequest>() {
318-
@Override
319-
public void messageReceived(ScrollFreeContextRequest request, TransportChannel channel, Task task) throws Exception {
320-
boolean freed = searchService.freeContext(request.id());
321-
channel.sendResponse(new SearchFreeContextResponse(freed));
322-
}
323-
});
314+
(request, channel, task) -> {
315+
boolean freed = searchService.freeContext(request.id());
316+
channel.sendResponse(new SearchFreeContextResponse(freed));
317+
});
324318
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME,
325319
(Supplier<TransportResponse>) SearchFreeContextResponse::new);
326320
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, SearchFreeContextRequest::new,
327-
new TaskAwareTransportRequestHandler<SearchFreeContextRequest>() {
328-
@Override
329-
public void messageReceived(SearchFreeContextRequest request, TransportChannel channel, Task task) throws Exception {
330-
boolean freed = searchService.freeContext(request.id());
331-
channel.sendResponse(new SearchFreeContextResponse(freed));
332-
}
333-
});
321+
(request, channel, task) -> {
322+
boolean freed = searchService.freeContext(request.id());
323+
channel.sendResponse(new SearchFreeContextResponse(freed));
324+
});
334325
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME,
335326
(Supplier<TransportResponse>) SearchFreeContextResponse::new);
336327
transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportRequest.Empty.INSTANCE,
337-
ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler<TransportRequest.Empty>() {
338-
@Override
339-
public void messageReceived(TransportRequest.Empty request, TransportChannel channel, Task task) throws Exception {
340-
searchService.freeAllScrollContexts();
341-
channel.sendResponse(TransportResponse.Empty.INSTANCE);
342-
}
343-
});
328+
ThreadPool.Names.SAME, (request, channel, task) -> {
329+
searchService.freeAllScrollContexts();
330+
channel.sendResponse(TransportResponse.Empty.INSTANCE);
331+
});
344332
TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
345333
() -> TransportResponse.Empty.INSTANCE);
346334

347335
transportService.registerRequestHandler(DFS_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
348-
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
349-
@Override
350-
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
351-
searchService.executeDfsPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
352-
@Override
353-
public void onResponse(SearchPhaseResult searchPhaseResult) {
354-
try {
355-
channel.sendResponse(searchPhaseResult);
356-
} catch (IOException e) {
357-
throw new UncheckedIOException(e);
358-
}
336+
(request, channel, task) -> {
337+
searchService.executeDfsPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
338+
@Override
339+
public void onResponse(SearchPhaseResult searchPhaseResult) {
340+
try {
341+
channel.sendResponse(searchPhaseResult);
342+
} catch (IOException e) {
343+
throw new UncheckedIOException(e);
359344
}
360-
361-
@Override
362-
public void onFailure(Exception e) {
363-
try {
364-
channel.sendResponse(e);
365-
} catch (IOException e1) {
366-
throw new UncheckedIOException(e1);
367-
}
345+
}
346+
347+
@Override
348+
public void onFailure(Exception e) {
349+
try {
350+
channel.sendResponse(e);
351+
} catch (IOException e1) {
352+
throw new UncheckedIOException(e1);
368353
}
369-
});
370-
371-
}
354+
}
355+
});
372356
});
373357
TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new);
374358

375359
transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
376-
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
377-
@Override
378-
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
379-
searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
380-
@Override
381-
public void onResponse(SearchPhaseResult searchPhaseResult) {
382-
try {
383-
channel.sendResponse(searchPhaseResult);
384-
} catch (IOException e) {
385-
throw new UncheckedIOException(e);
386-
}
360+
(request, channel, task) -> {
361+
searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
362+
@Override
363+
public void onResponse(SearchPhaseResult searchPhaseResult) {
364+
try {
365+
channel.sendResponse(searchPhaseResult);
366+
} catch (IOException e) {
367+
throw new UncheckedIOException(e);
387368
}
388-
389-
@Override
390-
public void onFailure(Exception e) {
391-
try {
392-
channel.sendResponse(e);
393-
} catch (IOException e1) {
394-
throw new UncheckedIOException(e1);
395-
}
369+
}
370+
371+
@Override
372+
public void onFailure(Exception e) {
373+
try {
374+
channel.sendResponse(e);
375+
} catch (IOException e1) {
376+
throw new UncheckedIOException(e1);
396377
}
397-
});
398-
}
378+
}
379+
});
399380
});
400381
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME,
401382
(request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new);
402383

403384
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SEARCH, QuerySearchRequest::new,
404-
new TaskAwareTransportRequestHandler<QuerySearchRequest>() {
405-
@Override
406-
public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception {
407-
QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
408-
channel.sendResponse(result);
409-
}
385+
(request, channel, task) -> {
386+
QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
387+
channel.sendResponse(result);
410388
});
411389
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new);
412390

413391
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
414-
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
415-
@Override
416-
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
417-
ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
418-
channel.sendResponse(result);
419-
}
392+
(request, channel, task) -> {
393+
ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
394+
channel.sendResponse(result);
420395
});
421396
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new);
422397

423398
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
424-
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
425-
@Override
426-
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
427-
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
428-
channel.sendResponse(result);
429-
}
399+
(request, channel, task) -> {
400+
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
401+
channel.sendResponse(result);
430402
});
431403
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new);
432404

433405
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchRequest::new,
434-
new TaskAwareTransportRequestHandler<ShardFetchRequest>() {
435-
@Override
436-
public void messageReceived(ShardFetchRequest request, TransportChannel channel, Task task) throws Exception {
437-
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
438-
channel.sendResponse(result);
439-
}
406+
(request, channel, task) -> {
407+
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
408+
channel.sendResponse(result);
440409
});
441410
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new);
442411

443412
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchSearchRequest::new,
444-
new TaskAwareTransportRequestHandler<ShardFetchSearchRequest>() {
445-
@Override
446-
public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception {
447-
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
448-
channel.sendResponse(result);
449-
}
413+
(request, channel, task) -> {
414+
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
415+
channel.sendResponse(result);
450416
});
451417
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);
452418

453419
// this is cheap, it does not fetch during the rewrite phase, so we can let it quickly execute on a networking thread
454420
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
455-
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
456-
@Override
457-
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
458-
boolean canMatch = searchService.canMatch(request);
459-
channel.sendResponse(new CanMatchResponse(canMatch));
460-
}
421+
(request, channel, task) -> {
422+
boolean canMatch = searchService.canMatch(request);
423+
channel.sendResponse(new CanMatchResponse(canMatch));
461424
});
462425
TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME,
463426
(Supplier<TransportResponse>) CanMatchResponse::new);

server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,6 @@ protected HandledTransportAction(Settings settings, String actionName, boolean c
6464

6565
class TransportHandler implements TransportRequestHandler<Request> {
6666

67-
@Override
68-
public final void messageReceived(Request request, TransportChannel channel) throws Exception {
69-
throw new UnsupportedOperationException("the task parameter is required for this operation");
70-
}
71-
7267
@Override
7368
public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
7469
// We already got the task created on the network layer - no need to create it again on the transport layer

server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -284,10 +284,5 @@ class ShardTransportHandler implements TransportRequestHandler<ShardRequest> {
284284
public void messageReceived(ShardRequest request, TransportChannel channel, Task task) throws Exception {
285285
channel.sendResponse(shardOperation(request, task));
286286
}
287-
288-
@Override
289-
public final void messageReceived(final ShardRequest request, final TransportChannel channel) throws Exception {
290-
throw new UnsupportedOperationException("the task parameter is required");
291-
}
292287
}
293288
}

server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ protected void onCompletion() {
393393

394394
class BroadcastByNodeTransportRequestHandler implements TransportRequestHandler<NodeRequest> {
395395
@Override
396-
public void messageReceived(final NodeRequest request, TransportChannel channel) throws Exception {
396+
public void messageReceived(final NodeRequest request, TransportChannel channel, Task task) throws Exception {
397397
List<ShardRouting> shards = request.getShards();
398398
final int totalShards = shards.size();
399399
if (logger.isTraceEnabled()) {

server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -258,12 +258,6 @@ class NodeTransportHandler implements TransportRequestHandler<NodeRequest> {
258258
public void messageReceived(NodeRequest request, TransportChannel channel, Task task) throws Exception {
259259
channel.sendResponse(nodeOperation(request, task));
260260
}
261-
262-
@Override
263-
public void messageReceived(NodeRequest request, TransportChannel channel) throws Exception {
264-
channel.sendResponse(nodeOperation(request));
265-
}
266-
267261
}
268262

269263
}

0 commit comments

Comments
 (0)