2323import org .elasticsearch .action .OriginalIndices ;
2424import org .elasticsearch .action .support .ActionFilters ;
2525import org .elasticsearch .action .support .HandledTransportAction ;
26+ import org .elasticsearch .client .Client ;
2627import org .elasticsearch .cluster .ClusterState ;
2728import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
2829import org .elasticsearch .cluster .service .ClusterService ;
3334import org .elasticsearch .threadpool .ThreadPool ;
3435import org .elasticsearch .transport .RemoteClusterAware ;
3536import org .elasticsearch .transport .RemoteClusterService ;
36- import org .elasticsearch .transport .Transport ;
37- import org .elasticsearch .transport .TransportException ;
38- import org .elasticsearch .transport .TransportRequestOptions ;
39- import org .elasticsearch .transport .TransportResponseHandler ;
4037import org .elasticsearch .transport .TransportService ;
4138
4239import java .util .ArrayList ;
@@ -49,7 +46,6 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
4946 private final ClusterService clusterService ;
5047 private final TransportFieldCapabilitiesIndexAction shardAction ;
5148 private final RemoteClusterService remoteClusterService ;
52- private final TransportService transportService ;
5349
5450 @ Inject
5551 public TransportFieldCapabilitiesAction (Settings settings , TransportService transportService ,
@@ -62,7 +58,6 @@ public TransportFieldCapabilitiesAction(Settings settings, TransportService tran
6258 actionFilters , indexNameExpressionResolver , FieldCapabilitiesRequest ::new );
6359 this .clusterService = clusterService ;
6460 this .remoteClusterService = transportService .getRemoteClusterService ();
65- this .transportService = transportService ;
6661 this .shardAction = shardAction ;
6762 }
6863
@@ -118,47 +113,20 @@ public void onFailure(Exception e) {
118113 for (Map .Entry <String , OriginalIndices > remoteIndices : remoteClusterIndices .entrySet ()) {
119114 String clusterAlias = remoteIndices .getKey ();
120115 OriginalIndices originalIndices = remoteIndices .getValue ();
121- // if we are connected this is basically a no-op, if we are not we try to connect in parallel in a non-blocking fashion
122- remoteClusterService .ensureConnected (clusterAlias , ActionListener .wrap (v -> {
123- Transport .Connection connection = remoteClusterService .getConnection (clusterAlias );
124- FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest ();
125- remoteRequest .setMergeResults (false ); // we need to merge on this node
126- remoteRequest .indicesOptions (originalIndices .indicesOptions ());
127- remoteRequest .indices (originalIndices .indices ());
128- remoteRequest .fields (request .fields ());
129- transportService .sendRequest (connection , FieldCapabilitiesAction .NAME , remoteRequest , TransportRequestOptions .EMPTY ,
130- new TransportResponseHandler <FieldCapabilitiesResponse >() {
131-
132- @ Override
133- public FieldCapabilitiesResponse newInstance () {
134- return new FieldCapabilitiesResponse ();
135- }
136-
137- @ Override
138- public void handleResponse (FieldCapabilitiesResponse response ) {
139- try {
140- for (FieldCapabilitiesIndexResponse res : response .getIndexResponses ()) {
141- indexResponses .add (new FieldCapabilitiesIndexResponse (RemoteClusterAware .
142- buildRemoteIndexName (clusterAlias , res .getIndexName ()), res .get ()));
143- }
144- } finally {
145- onResponse .run ();
146- }
147- }
148-
149- @ Override
150- public void handleException (TransportException exp ) {
151- onResponse .run ();
152- }
153-
154- @ Override
155- public String executor () {
156- return ThreadPool .Names .SAME ;
157- }
158- });
159- }, e -> onResponse .run ()));
116+ Client remoteClusterClient = remoteClusterService .getRemoteClusterClient (threadPool , clusterAlias );
117+ FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest ();
118+ remoteRequest .setMergeResults (false ); // we need to merge on this node
119+ remoteRequest .indicesOptions (originalIndices .indicesOptions ());
120+ remoteRequest .indices (originalIndices .indices ());
121+ remoteRequest .fields (request .fields ());
122+ remoteClusterClient .fieldCaps (remoteRequest , ActionListener .wrap (response -> {
123+ for (FieldCapabilitiesIndexResponse res : response .getIndexResponses ()) {
124+ indexResponses .add (new FieldCapabilitiesIndexResponse (RemoteClusterAware .
125+ buildRemoteIndexName (clusterAlias , res .getIndexName ()), res .get ()));
126+ }
127+ onResponse .run ();
128+ }, failure -> onResponse .run ()));
160129 }
161-
162130 }
163131 }
164132
0 commit comments