Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -33,10 +34,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
Expand All @@ -49,7 +46,6 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
private final ClusterService clusterService;
private final TransportFieldCapabilitiesIndexAction shardAction;
private final RemoteClusterService remoteClusterService;
private final TransportService transportService;

@Inject
public TransportFieldCapabilitiesAction(Settings settings, TransportService transportService,
Expand All @@ -62,7 +58,6 @@ public TransportFieldCapabilitiesAction(Settings settings, TransportService tran
actionFilters, indexNameExpressionResolver, FieldCapabilitiesRequest::new);
this.clusterService = clusterService;
this.remoteClusterService = transportService.getRemoteClusterService();
this.transportService = transportService;
this.shardAction = shardAction;
}

Expand Down Expand Up @@ -118,47 +113,20 @@ public void onFailure(Exception e) {
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
// if we are connected this is basically a no-op, if we are not we try to connect in parallel in a non-blocking fashion
remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(v -> {
Transport.Connection connection = remoteClusterService.getConnection(clusterAlias);
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
remoteRequest.setMergeResults(false); // we need to merge on this node
remoteRequest.indicesOptions(originalIndices.indicesOptions());
remoteRequest.indices(originalIndices.indices());
remoteRequest.fields(request.fields());
transportService.sendRequest(connection, FieldCapabilitiesAction.NAME, remoteRequest, TransportRequestOptions.EMPTY,
new TransportResponseHandler<FieldCapabilitiesResponse>() {

@Override
public FieldCapabilitiesResponse newInstance() {
return new FieldCapabilitiesResponse();
}

@Override
public void handleResponse(FieldCapabilitiesResponse response) {
try {
for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) {
indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware.
buildRemoteIndexName(clusterAlias, res.getIndexName()), res.get()));
}
} finally {
onResponse.run();
}
}

@Override
public void handleException(TransportException exp) {
onResponse.run();
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}, e -> onResponse.run()));
Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
remoteRequest.setMergeResults(false); // we need to merge on this node
remoteRequest.indicesOptions(originalIndices.indicesOptions());
remoteRequest.indices(originalIndices.indices());
remoteRequest.fields(request.fields());
remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> {
for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) {
indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware.
buildRemoteIndexName(clusterAlias, res.getIndexName()), res.get()));
}
onResponse.run();
}, failure -> onResponse.run()));
}

}
}

Expand Down