3232import org .elasticsearch .search .SearchShardTarget ;
3333import org .elasticsearch .search .internal .InternalScrollSearchRequest ;
3434import org .elasticsearch .search .internal .InternalSearchResponse ;
35+ import org .elasticsearch .transport .RemoteClusterService ;
36+ import org .elasticsearch .transport .Transport ;
3537
3638import java .io .IOException ;
3739import java .util .ArrayList ;
40+ import java .util .Arrays ;
41+ import java .util .HashSet ;
3842import java .util .List ;
43+ import java .util .Set ;
3944import java .util .concurrent .atomic .AtomicInteger ;
45+ import java .util .function .BiFunction ;
4046
4147import static org .elasticsearch .action .search .TransportSearchHelper .internalScrollSearchRequest ;
4248
@@ -67,13 +73,15 @@ abstract class SearchScrollAsyncAction<T extends SearchPhaseResult> implements R
6773 protected final DiscoveryNodes nodes ;
6874 protected final SearchPhaseController searchPhaseController ;
6975 protected final SearchScrollRequest request ;
76+ protected final SearchTransportService searchTransportService ;
7077 private final long startTime ;
7178 private final List <ShardSearchFailure > shardFailures = new ArrayList <>();
7279 private final AtomicInteger successfulOps ;
7380
7481 protected SearchScrollAsyncAction (ParsedScrollId scrollId , Logger logger , DiscoveryNodes nodes ,
7582 ActionListener <SearchResponse > listener , SearchPhaseController searchPhaseController ,
76- SearchScrollRequest request ) {
83+ SearchScrollRequest request ,
84+ SearchTransportService searchTransportService ) {
7785 this .startTime = System .currentTimeMillis ();
7886 this .scrollId = scrollId ;
7987 this .successfulOps = new AtomicInteger (scrollId .getContext ().length );
@@ -82,6 +90,7 @@ protected SearchScrollAsyncAction(ParsedScrollId scrollId, Logger logger, Discov
8290 this .nodes = nodes ;
8391 this .searchPhaseController = searchPhaseController ;
8492 this .request = request ;
93+ this .searchTransportService = searchTransportService ;
8594 }
8695
8796 /**
@@ -97,57 +106,104 @@ public final void run() {
97106 final ScrollIdForNode [] context = scrollId .getContext ();
98107 if (context .length == 0 ) {
99108 listener .onFailure (new SearchPhaseExecutionException ("query" , "no nodes to search on" , ShardSearchFailure .EMPTY_ARRAY ));
100- return ;
109+ } else {
110+ collectNodesAndRun (Arrays .asList (context ), nodes , searchTransportService , ActionListener .wrap (lookup -> run (lookup , context ),
111+ listener ::onFailure ));
101112 }
113+ }
114+
115+ /**
116+ * This method collects nodes from the remote clusters asynchronously if any of the scroll IDs references a remote cluster.
117+ * Otherwise the action listener will be invoked immediately with a function based on the given discovery nodes.
118+ */
119+ static void collectNodesAndRun (final Iterable <ScrollIdForNode > scrollIds , DiscoveryNodes nodes ,
120+ SearchTransportService searchTransportService ,
121+ ActionListener <BiFunction <String , String , DiscoveryNode >> listener ) {
122+ Set <String > clusters = new HashSet <>();
123+ for (ScrollIdForNode target : scrollIds ) {
124+ if (target .getClusterAlias () != null ) {
125+ clusters .add (target .getClusterAlias ());
126+ }
127+ }
128+ if (clusters .isEmpty ()) { // no remote clusters
129+ listener .onResponse ((cluster , node ) -> nodes .get (node ));
130+ } else {
131+ RemoteClusterService remoteClusterService = searchTransportService .getRemoteClusterService ();
132+ remoteClusterService .collectNodes (clusters , ActionListener .wrap (nodeFunction -> {
133+ final BiFunction <String , String , DiscoveryNode > clusterNodeLookup = (clusterAlias , node ) -> {
134+ if (clusterAlias == null ) {
135+ return nodes .get (node );
136+ } else {
137+ return nodeFunction .apply (clusterAlias , node );
138+ }
139+ };
140+ listener .onResponse (clusterNodeLookup );
141+ }, listener ::onFailure ));
142+ }
143+ }
144+
145+ private void run (BiFunction <String , String , DiscoveryNode > clusterNodeLookup , final ScrollIdForNode [] context ) {
102146 final CountDown counter = new CountDown (scrollId .getContext ().length );
103147 for (int i = 0 ; i < context .length ; i ++) {
104148 ScrollIdForNode target = context [i ];
105- DiscoveryNode node = nodes .get (target .getNode ());
106149 final int shardIndex = i ;
107- if (node != null ) { // it might happen that a node is going down in-between scrolls...
108- InternalScrollSearchRequest internalRequest = internalScrollSearchRequest (target .getScrollId (), request );
109- // we can't create a SearchShardTarget here since we don't know the index and shard ID we are talking to
110- // we only know the node and the search context ID. Yet, the response will contain the SearchShardTarget
111- // from the target node instead...that's why we pass null here
112- SearchActionListener <T > searchActionListener = new SearchActionListener <T >(null , shardIndex ) {
113-
114- @ Override
115- protected void setSearchShardTarget (T response ) {
116- // don't do this - it's part of the response...
117- assert response .getSearchShardTarget () != null : "search shard target must not be null" ;
150+ final Transport .Connection connection ;
151+ try {
152+ DiscoveryNode node = clusterNodeLookup .apply (target .getClusterAlias (), target .getNode ());
153+ if (node == null ) {
154+ throw new IllegalStateException ("node [" + target .getNode () + "] is not available" );
155+ }
156+ connection = getConnection (target .getClusterAlias (), node );
157+ } catch (Exception ex ) {
158+ onShardFailure ("query" , counter , target .getScrollId (),
159+ ex , null , () -> SearchScrollAsyncAction .this .moveToNextPhase (clusterNodeLookup ));
160+ continue ;
161+ }
162+ final InternalScrollSearchRequest internalRequest = internalScrollSearchRequest (target .getScrollId (), request );
163+ // we can't create a SearchShardTarget here since we don't know the index and shard ID we are talking to
164+ // we only know the node and the search context ID. Yet, the response will contain the SearchShardTarget
165+ // from the target node instead...that's why we pass null here
166+ SearchActionListener <T > searchActionListener = new SearchActionListener <T >(null , shardIndex ) {
167+
168+ @ Override
169+ protected void setSearchShardTarget (T response ) {
170+ // don't do this - it's part of the response...
171+ assert response .getSearchShardTarget () != null : "search shard target must not be null" ;
172+ if (target .getClusterAlias () != null ) {
173+ // re-create the search target and add the cluster alias if there is any,
174+ // we need this down the road for subseq. phases
175+ SearchShardTarget searchShardTarget = response .getSearchShardTarget ();
176+ response .setSearchShardTarget (new SearchShardTarget (searchShardTarget .getNodeId (), searchShardTarget .getShardId (),
177+ target .getClusterAlias (), null ));
118178 }
179+ }
119180
120- @ Override
121- protected void innerOnResponse (T result ) {
122- assert shardIndex == result .getShardIndex () : "shard index mismatch: " + shardIndex + " but got: "
123- + result .getShardIndex ();
124- onFirstPhaseResult (shardIndex , result );
125- if (counter .countDown ()) {
126- SearchPhase phase = moveToNextPhase ();
127- try {
128- phase .run ();
129- } catch (Exception e ) {
130- // we need to fail the entire request here - the entire phase just blew up
131- // don't call onShardFailure or onFailure here since otherwise we'd countDown the counter
132- // again which would result in an exception
133- listener .onFailure (new SearchPhaseExecutionException (phase .getName (), "Phase failed" , e ,
134- ShardSearchFailure .EMPTY_ARRAY ));
135- }
181+ @ Override
182+ protected void innerOnResponse (T result ) {
183+ assert shardIndex == result .getShardIndex () : "shard index mismatch: " + shardIndex + " but got: "
184+ + result .getShardIndex ();
185+ onFirstPhaseResult (shardIndex , result );
186+ if (counter .countDown ()) {
187+ SearchPhase phase = moveToNextPhase (clusterNodeLookup );
188+ try {
189+ phase .run ();
190+ } catch (Exception e ) {
191+ // we need to fail the entire request here - the entire phase just blew up
192+ // don't call onShardFailure or onFailure here since otherwise we'd countDown the counter
193+ // again which would result in an exception
194+ listener .onFailure (new SearchPhaseExecutionException (phase .getName (), "Phase failed" , e ,
195+ ShardSearchFailure .EMPTY_ARRAY ));
136196 }
137197 }
198+ }
138199
139- @ Override
140- public void onFailure (Exception t ) {
141- onShardFailure ("query" , shardIndex , counter , target .getScrollId (), t , null ,
142- SearchScrollAsyncAction .this ::moveToNextPhase );
143- }
144- };
145- executeInitialPhase (node , internalRequest , searchActionListener );
146- } else { // the node is not available we treat this as a shard failure here
147- onShardFailure ("query" , shardIndex , counter , target .getScrollId (),
148- new IllegalStateException ("node [" + target .getNode () + "] is not available" ), null ,
149- SearchScrollAsyncAction .this ::moveToNextPhase );
150- }
200+ @ Override
201+ public void onFailure (Exception t ) {
202+ onShardFailure ("query" , counter , target .getScrollId (), t , null ,
203+ () -> SearchScrollAsyncAction .this .moveToNextPhase (clusterNodeLookup ));
204+ }
205+ };
206+ executeInitialPhase (connection , internalRequest , searchActionListener );
151207 }
152208 }
153209
@@ -164,10 +220,10 @@ private synchronized void addShardFailure(ShardSearchFailure failure) {
164220 shardFailures .add (failure );
165221 }
166222
167- protected abstract void executeInitialPhase (DiscoveryNode node , InternalScrollSearchRequest internalRequest ,
223+ protected abstract void executeInitialPhase (Transport . Connection connection , InternalScrollSearchRequest internalRequest ,
168224 SearchActionListener <T > searchActionListener );
169225
170- protected abstract SearchPhase moveToNextPhase ();
226+ protected abstract SearchPhase moveToNextPhase (BiFunction < String , String , DiscoveryNode > clusterNodeLookup );
171227
172228 protected abstract void onFirstPhaseResult (int shardId , T result );
173229
@@ -199,9 +255,9 @@ protected final void sendResponse(SearchPhaseController.ReducedQueryPhase queryP
199255 }
200256 }
201257
202- protected void onShardFailure (String phaseName , final int shardIndex , final CountDown counter , final long searchId , Exception failure ,
203- @ Nullable SearchShardTarget searchShardTarget ,
204- Supplier <SearchPhase > nextPhaseSupplier ) {
258+ protected void onShardFailure (String phaseName , final CountDown counter , final long searchId , Exception failure ,
259+ @ Nullable SearchShardTarget searchShardTarget ,
260+ Supplier <SearchPhase > nextPhaseSupplier ) {
205261 if (logger .isDebugEnabled ()) {
206262 logger .debug ((Supplier <?>) () -> new ParameterizedMessage ("[{}] Failed to execute {} phase" , searchId , phaseName ), failure );
207263 }
@@ -223,4 +279,8 @@ protected void onShardFailure(String phaseName, final int shardIndex, final Coun
223279 }
224280 }
225281 }
282+
283+ protected Transport .Connection getConnection (String clusterAlias , DiscoveryNode node ) {
284+ return searchTransportService .getConnection (clusterAlias , node );
285+ }
226286}
0 commit comments