2424import org .elasticsearch .action .ActionListenerResponseHandler ;
2525import org .elasticsearch .action .IndicesRequest ;
2626import org .elasticsearch .action .OriginalIndices ;
27- import org .elasticsearch .action .support .HandledTransportAction ;
27+ import org .elasticsearch .action .support .HandledTransportAction . ChannelActionListener ;
2828import org .elasticsearch .action .support .IndicesOptions ;
2929import org .elasticsearch .cluster .node .DiscoveryNode ;
3030import org .elasticsearch .common .component .AbstractComponent ;
3434import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
3535import org .elasticsearch .search .SearchPhaseResult ;
3636import org .elasticsearch .search .SearchService ;
37+ import org .elasticsearch .search .SearchService .CanMatchResponse ;
3738import org .elasticsearch .search .dfs .DfsSearchResult ;
3839import org .elasticsearch .search .fetch .FetchSearchResult ;
3940import org .elasticsearch .search .fetch .QueryFetchSearchResult ;
6061import org .elasticsearch .transport .TransportService ;
6162
6263import java .io .IOException ;
63- import java .io .UncheckedIOException ;
6464import java .util .HashMap ;
6565import java .util .Map ;
6666import java .util .function .BiFunction ;
@@ -340,26 +340,9 @@ public void messageReceived(TransportRequest.Empty request, TransportChannel cha
340340 transportService .registerRequestHandler (DFS_ACTION_NAME , ShardSearchTransportRequest ::new , ThreadPool .Names .SAME ,
341341 new TaskAwareTransportRequestHandler <ShardSearchTransportRequest >() {
342342 @ Override
343- public void messageReceived (ShardSearchTransportRequest request , TransportChannel channel , Task task ) throws Exception {
344- searchService .executeDfsPhase (request , (SearchTask ) task , new ActionListener <SearchPhaseResult >() {
345- @ Override
346- public void onResponse (SearchPhaseResult searchPhaseResult ) {
347- try {
348- channel .sendResponse (searchPhaseResult );
349- } catch (IOException e ) {
350- throw new UncheckedIOException (e );
351- }
352- }
353-
354- @ Override
355- public void onFailure (Exception e ) {
356- try {
357- channel .sendResponse (e );
358- } catch (IOException e1 ) {
359- throw new UncheckedIOException (e1 );
360- }
361- }
362- });
343+ public void messageReceived (ShardSearchTransportRequest request , TransportChannel channel , Task task ) {
344+ searchService .executeDfsPhase (request , (SearchTask ) task ,
345+ new ChannelActionListener <>(channel , DFS_ACTION_NAME , request ));
363346
364347 }
365348 });
@@ -369,8 +352,8 @@ public void onFailure(Exception e) {
369352 new TaskAwareTransportRequestHandler <ShardSearchTransportRequest >() {
370353 @ Override
371354 public void messageReceived (ShardSearchTransportRequest request , TransportChannel channel , Task task ) {
372- searchService .executeQueryPhase (request , (SearchTask ) task , new HandledTransportAction . ChannelActionListener <>
373- (channel , QUERY_ACTION_NAME , request ));
355+ searchService .executeQueryPhase (request , (SearchTask ) task ,
356+ new ChannelActionListener <> (channel , QUERY_ACTION_NAME , request ));
374357 }
375358 });
376359 TransportActionProxy .registerProxyAction (transportService , QUERY_ACTION_NAME ,
@@ -379,49 +362,49 @@ public void messageReceived(ShardSearchTransportRequest request, TransportChanne
379362 transportService .registerRequestHandler (QUERY_ID_ACTION_NAME , QuerySearchRequest ::new , ThreadPool .Names .SEARCH ,
380363 new TaskAwareTransportRequestHandler <QuerySearchRequest >() {
381364 @ Override
382- public void messageReceived (QuerySearchRequest request , TransportChannel channel , Task task ) throws Exception {
383- QuerySearchResult result = searchService .executeQueryPhase (request , (SearchTask )task );
384- channel . sendResponse ( result );
365+ public void messageReceived (QuerySearchRequest request , TransportChannel channel , Task task ) {
366+ searchService .executeQueryPhase (request , (SearchTask )task ,
367+ new ChannelActionListener <>( channel , QUERY_ID_ACTION_NAME , request ) );
385368 }
386369 });
387370 TransportActionProxy .registerProxyAction (transportService , QUERY_ID_ACTION_NAME , QuerySearchResult ::new );
388371
389372 transportService .registerRequestHandler (QUERY_SCROLL_ACTION_NAME , InternalScrollSearchRequest ::new , ThreadPool .Names .SEARCH ,
390373 new TaskAwareTransportRequestHandler <InternalScrollSearchRequest >() {
391374 @ Override
392- public void messageReceived (InternalScrollSearchRequest request , TransportChannel channel , Task task ) throws Exception {
393- ScrollQuerySearchResult result = searchService .executeQueryPhase (request , (SearchTask )task );
394- channel . sendResponse ( result );
375+ public void messageReceived (InternalScrollSearchRequest request , TransportChannel channel , Task task ) {
376+ searchService .executeQueryPhase (request , (SearchTask )task ,
377+ new ChannelActionListener <>( channel , QUERY_SCROLL_ACTION_NAME , request ) );
395378 }
396379 });
397380 TransportActionProxy .registerProxyAction (transportService , QUERY_SCROLL_ACTION_NAME , ScrollQuerySearchResult ::new );
398381
399382 transportService .registerRequestHandler (QUERY_FETCH_SCROLL_ACTION_NAME , InternalScrollSearchRequest ::new , ThreadPool .Names .SEARCH ,
400383 new TaskAwareTransportRequestHandler <InternalScrollSearchRequest >() {
401384 @ Override
402- public void messageReceived (InternalScrollSearchRequest request , TransportChannel channel , Task task ) throws Exception {
403- ScrollQueryFetchSearchResult result = searchService .executeFetchPhase (request , (SearchTask )task );
404- channel . sendResponse ( result );
385+ public void messageReceived (InternalScrollSearchRequest request , TransportChannel channel , Task task ) {
386+ searchService .executeFetchPhase (request , (SearchTask )task ,
387+ new ChannelActionListener <>( channel , QUERY_FETCH_SCROLL_ACTION_NAME , request ) );
405388 }
406389 });
407390 TransportActionProxy .registerProxyAction (transportService , QUERY_FETCH_SCROLL_ACTION_NAME , ScrollQueryFetchSearchResult ::new );
408391
409392 transportService .registerRequestHandler (FETCH_ID_SCROLL_ACTION_NAME , ShardFetchRequest ::new , ThreadPool .Names .SEARCH ,
410393 new TaskAwareTransportRequestHandler <ShardFetchRequest >() {
411394 @ Override
412- public void messageReceived (ShardFetchRequest request , TransportChannel channel , Task task ) throws Exception {
413- FetchSearchResult result = searchService .executeFetchPhase (request , (SearchTask )task );
414- channel . sendResponse ( result );
395+ public void messageReceived (ShardFetchRequest request , TransportChannel channel , Task task ){
396+ searchService .executeFetchPhase (request , (SearchTask )task ,
397+ new ChannelActionListener <>( channel , FETCH_ID_SCROLL_ACTION_NAME , request ) );
415398 }
416399 });
417400 TransportActionProxy .registerProxyAction (transportService , FETCH_ID_SCROLL_ACTION_NAME , FetchSearchResult ::new );
418401
419402 transportService .registerRequestHandler (FETCH_ID_ACTION_NAME , ShardFetchSearchRequest ::new , ThreadPool .Names .SEARCH , true , true ,
420403 new TaskAwareTransportRequestHandler <ShardFetchSearchRequest >() {
421404 @ Override
422- public void messageReceived (ShardFetchSearchRequest request , TransportChannel channel , Task task ) throws Exception {
423- FetchSearchResult result = searchService .executeFetchPhase (request , (SearchTask )task );
424- channel . sendResponse ( result );
405+ public void messageReceived (ShardFetchSearchRequest request , TransportChannel channel , Task task ) {
406+ searchService .executeFetchPhase (request , (SearchTask )task ,
407+ new ChannelActionListener <>( channel , FETCH_ID_ACTION_NAME , request ) );
425408 }
426409 });
427410 TransportActionProxy .registerProxyAction (transportService , FETCH_ID_ACTION_NAME , FetchSearchResult ::new );
@@ -439,35 +422,6 @@ public void messageReceived(ShardSearchTransportRequest request, TransportChanne
439422 (Supplier <TransportResponse >) CanMatchResponse ::new );
440423 }
441424
442- public static final class CanMatchResponse extends SearchPhaseResult {
443- private boolean canMatch ;
444-
445- public CanMatchResponse () {
446- }
447-
448- public CanMatchResponse (boolean canMatch ) {
449- this .canMatch = canMatch ;
450- }
451-
452-
453- @ Override
454- public void readFrom (StreamInput in ) throws IOException {
455- super .readFrom (in );
456- canMatch = in .readBoolean ();
457- }
458-
459- @ Override
460- public void writeTo (StreamOutput out ) throws IOException {
461- super .writeTo (out );
462- out .writeBoolean (canMatch );
463- }
464-
465- public boolean canMatch () {
466- return canMatch ;
467- }
468- }
469-
470-
471425 /**
472426 * Returns a connection to the given node on the provided cluster. If the cluster alias is <code>null</code> the node will be resolved
473427 * against the local cluster.
0 commit comments