Skip to content

Commit 73b5564

Browse files
Fix ActionListener.map exception handling (#50886)
ActionListener.map would call listener.onFailure for exceptions from listener.onResponse, but this means we could double trigger some listeners which is generally unexpected. Instead, we should assume that a listener's onResponse (and onFailure) implementation is responsible for its own exception handling.
1 parent 65ad269 commit 73b5564

File tree

3 files changed

+93
-25
lines changed

3 files changed

+93
-25
lines changed

server/src/main/java/org/elasticsearch/action/ActionListener.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,14 +136,50 @@ static <Response> ActionListener<Response> wrap(Runnable runnable) {
136136
* Creates a listener that wraps another listener, mapping response values via the given mapping function and passing along
137137
* exceptions to the delegate.
138138
*
139-
* @param listener Listener to delegate to
139+
* Notice that it is considered a bug if the listener's onResponse or onFailure fails. onResponse failures will not call onFailure.
140+
*
141+
* If the function fails, the listener's onFailure handler will be called. The principle is that the mapped listener will handle
142+
* exceptions from the mapping function {@code fn} but it is the responsibility of {@code delegate} to handle its own exceptions
143+
* inside `onResponse` and `onFailure`.
144+
*
145+
* @param delegate Listener to delegate to
140146
* @param fn Function to apply to listener response
141147
* @param <Response> Response type of the new listener
142148
* @param <T> Response type of the wrapped listener
143149
* @return a listener that maps the received response and then passes it to its delegate listener
144150
*/
145-
static <T, Response> ActionListener<Response> map(ActionListener<T> listener, CheckedFunction<Response, T, Exception> fn) {
146-
return wrap(r -> listener.onResponse(fn.apply(r)), listener::onFailure);
151+
static <T, Response> ActionListener<Response> map(ActionListener<T> delegate, CheckedFunction<Response, T, Exception> fn) {
152+
return new ActionListener<>() {
153+
@Override
154+
public void onResponse(Response response) {
155+
T mapped;
156+
try {
157+
mapped = fn.apply(response);
158+
} catch (Exception e) {
159+
onFailure(e);
160+
return;
161+
}
162+
try {
163+
delegate.onResponse(mapped);
164+
} catch (RuntimeException e) {
165+
assert false : new AssertionError("map: listener.onResponse failed", e);
166+
throw e;
167+
}
168+
}
169+
170+
@Override
171+
public void onFailure(Exception e) {
172+
try {
173+
delegate.onFailure(e);
174+
} catch (RuntimeException ex) {
175+
if (ex != e) {
176+
ex.addSuppressed(e);
177+
}
178+
assert false : new AssertionError("map: listener.onFailure failed", ex);
179+
throw ex;
180+
}
181+
}
182+
};
147183
}
148184

149185
/**

server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import org.elasticsearch.transport.TransportService;
5656

5757
import java.io.IOException;
58-
import java.io.UncheckedIOException;
5958
import java.util.HashMap;
6059
import java.util.Map;
6160
import java.util.function.BiFunction;
@@ -306,27 +305,11 @@ public static void registerRequestHandler(TransportService transportService, Sea
306305
(in) -> TransportResponse.Empty.INSTANCE);
307306

308307
transportService.registerRequestHandler(DFS_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchRequest::new,
309-
(request, channel, task) -> {
310-
searchService.executeDfsPhase(request, (SearchShardTask) task, new ActionListener<SearchPhaseResult>() {
311-
@Override
312-
public void onResponse(SearchPhaseResult searchPhaseResult) {
313-
try {
314-
channel.sendResponse(searchPhaseResult);
315-
} catch (IOException e) {
316-
throw new UncheckedIOException(e);
317-
}
318-
}
319-
320-
@Override
321-
public void onFailure(Exception e) {
322-
try {
323-
channel.sendResponse(e);
324-
} catch (IOException e1) {
325-
throw new UncheckedIOException(e1);
326-
}
327-
}
328-
});
329-
});
308+
(request, channel, task) ->
309+
searchService.executeDfsPhase(request, (SearchShardTask) task,
310+
new ChannelActionListener<>(channel, DFS_ACTION_NAME, request))
311+
);
312+
330313
TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new);
331314

332315
transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchRequest::new,

server/src/test/java/org/elasticsearch/action/ActionListenerTests.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,4 +234,53 @@ public void testCompleteWith() {
234234
assertThat(onFailureListener.isDone(), equalTo(true));
235235
assertThat(expectThrows(ExecutionException.class, onFailureListener::get).getCause(), instanceOf(IOException.class));
236236
}
237+
238+
/**
239+
* Test that map passes the output of the function to its delegate listener and that exceptions in the function are propagated to the
240+
* onFailure handler. Also verify that exceptions from ActionListener.onResponse does not invoke onFailure, since it is the
241+
* responsibility of the ActionListener implementation (the client of the API) to handle exceptions in onResponse and onFailure.
242+
*/
243+
public void testMap() {
244+
AtomicReference<Exception> exReference = new AtomicReference<>();
245+
246+
ActionListener<String> listener = new ActionListener<>() {
247+
@Override
248+
public void onResponse(String s) {
249+
if (s == null) {
250+
throw new IllegalArgumentException("simulate onResponse exception");
251+
}
252+
}
253+
254+
@Override
255+
public void onFailure(Exception e) {
256+
exReference.set(e);
257+
if (e instanceof IllegalArgumentException) {
258+
throw (IllegalArgumentException) e;
259+
}
260+
}
261+
};
262+
ActionListener<Boolean> mapped = ActionListener.map(listener, b -> {
263+
if (b == null) {
264+
return null;
265+
} else if (b) {
266+
throw new IllegalStateException("simulate map function exception");
267+
} else {
268+
return b.toString();
269+
}
270+
});
271+
272+
AssertionError assertionError = expectThrows(AssertionError.class, () -> mapped.onResponse(null));
273+
assertThat(assertionError.getCause().getCause(), instanceOf(IllegalArgumentException.class));
274+
assertNull(exReference.get());
275+
mapped.onResponse(false);
276+
assertNull(exReference.get());
277+
mapped.onResponse(true);
278+
assertThat(exReference.get(), instanceOf(IllegalStateException.class));
279+
280+
assertionError = expectThrows(AssertionError.class, () -> mapped.onFailure(new IllegalArgumentException()));
281+
assertThat(assertionError.getCause().getCause(), instanceOf(IllegalArgumentException.class));
282+
assertThat(exReference.get(), instanceOf(IllegalArgumentException.class));
283+
mapped.onFailure(new IllegalStateException());
284+
assertThat(exReference.get(), instanceOf(IllegalStateException.class));
285+
}
237286
}

0 commit comments

Comments
 (0)