Skip to content

Commit 98ccd94

Browse files
authored
Factor out a ChannelActionListener (#33819)
We use similar / same concepts in SerachTransportService and HandledTransportAction but both duplicate the efforts with slightly different implementation details. This streamlines sending responses / exceptions back to a channel in an ActionListener with appropriate logging.
1 parent 241c74e commit 98ccd94

File tree

2 files changed

+41
-39
lines changed

2 files changed

+41
-39
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.ActionListenerResponseHandler;
2424
import org.elasticsearch.action.IndicesRequest;
2525
import org.elasticsearch.action.OriginalIndices;
26+
import org.elasticsearch.action.support.HandledTransportAction;
2627
import org.elasticsearch.action.support.IndicesOptions;
2728
import org.elasticsearch.cluster.node.DiscoveryNode;
2829
import org.elasticsearch.common.component.AbstractComponent;
@@ -348,25 +349,8 @@ public void onFailure(Exception e) {
348349

349350
transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
350351
(request, channel, task) -> {
351-
searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
352-
@Override
353-
public void onResponse(SearchPhaseResult searchPhaseResult) {
354-
try {
355-
channel.sendResponse(searchPhaseResult);
356-
} catch (IOException e) {
357-
throw new UncheckedIOException(e);
358-
}
359-
}
360-
361-
@Override
362-
public void onFailure(Exception e) {
363-
try {
364-
channel.sendResponse(e);
365-
} catch (IOException e1) {
366-
throw new UncheckedIOException(e1);
367-
}
368-
}
369-
});
352+
searchService.executeQueryPhase(request, (SearchTask) task, new HandledTransportAction.ChannelActionListener<>(
353+
channel, QUERY_ACTION_NAME, request));
370354
});
371355
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME,
372356
(request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new);

server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.elasticsearch.action.support;
2020

21+
import org.apache.logging.log4j.LogManager;
22+
import org.apache.logging.log4j.Logger;
2123
import org.apache.logging.log4j.message.ParameterizedMessage;
2224
import org.elasticsearch.action.ActionListener;
2325
import org.elasticsearch.action.ActionRequest;
@@ -27,7 +29,9 @@
2729
import org.elasticsearch.tasks.Task;
2830
import org.elasticsearch.threadpool.ThreadPool;
2931
import org.elasticsearch.transport.TransportChannel;
32+
import org.elasticsearch.transport.TransportRequest;
3033
import org.elasticsearch.transport.TransportRequestHandler;
34+
import org.elasticsearch.transport.TransportResponse;
3135
import org.elasticsearch.transport.TransportService;
3236

3337
import java.util.function.Supplier;
@@ -63,30 +67,44 @@ protected HandledTransportAction(Settings settings, String actionName, boolean c
6367
}
6468

6569
class TransportHandler implements TransportRequestHandler<Request> {
66-
6770
@Override
6871
public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
6972
// We already got the task created on the network layer - no need to create it again on the transport layer
70-
execute(task, request, new ActionListener<Response>() {
71-
@Override
72-
public void onResponse(Response response) {
73-
try {
74-
channel.sendResponse(response);
75-
} catch (Exception e) {
76-
onFailure(e);
77-
}
78-
}
73+
Logger logger = HandledTransportAction.this.logger;
74+
execute(task, request, new ChannelActionListener<>(channel, actionName, request));
75+
}
76+
}
77+
78+
public static final class ChannelActionListener<Response extends TransportResponse, Request extends TransportRequest> implements
79+
ActionListener<Response> {
80+
private final Logger logger = LogManager.getLogger(getClass());
81+
private final TransportChannel channel;
82+
private final Request request;
83+
private final String actionName;
84+
85+
public ChannelActionListener(TransportChannel channel, String actionName, Request request) {
86+
this.channel = channel;
87+
this.request = request;
88+
this.actionName = actionName;
89+
}
7990

80-
@Override
81-
public void onFailure(Exception e) {
82-
try {
83-
channel.sendResponse(e);
84-
} catch (Exception e1) {
85-
logger.warn(() -> new ParameterizedMessage(
86-
"Failed to send error response for action [{}] and request [{}]", actionName, request), e1);
87-
}
88-
}
89-
});
91+
@Override
92+
public void onResponse(Response response) {
93+
try {
94+
channel.sendResponse(response);
95+
} catch (Exception e) {
96+
onFailure(e);
97+
}
98+
}
99+
100+
@Override
101+
public void onFailure(Exception e) {
102+
try {
103+
channel.sendResponse(e);
104+
} catch (Exception e1) {
105+
logger.warn(() -> new ParameterizedMessage(
106+
"Failed to send error response for action [{}] and request [{}]", actionName, request), e1);
107+
}
90108
}
91109
}
92110

0 commit comments

Comments
 (0)