diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml
index 5deb52e53fdba..82c0ce3b77c86 100644
--- a/buildSrc/src/main/resources/checkstyle_suppressions.xml
+++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -147,7 +147,6 @@
-
@@ -454,7 +453,6 @@
-
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java
index 6d79e23092291..8e9360bdb1238 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java
@@ -20,7 +20,7 @@
package org.elasticsearch.action.admin.cluster.remote;
import org.elasticsearch.action.ActionResponse;
-import org.elasticsearch.action.search.RemoteConnectionInfo;
+import org.elasticsearch.transport.RemoteConnectionInfo;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java
index cdb79a825834b..33254a9aed9ab 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java
@@ -20,7 +20,7 @@
package org.elasticsearch.action.admin.cluster.remote;
import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.search.RemoteClusterService;
+import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
@@ -30,8 +30,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
-import java.util.ArrayList;
-
public final class TransportRemoteInfoAction extends HandledTransportAction {
private final RemoteClusterService remoteClusterService;
diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
index a221c6001a58a..9e858a4ccafdd 100644
--- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
+++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java
@@ -26,7 +26,7 @@
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
@@ -46,6 +46,7 @@
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
@@ -62,7 +63,7 @@
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
* transport.
*/
-public class SearchTransportService extends AbstractLifecycleComponent {
+public class SearchTransportService extends AbstractComponent {
public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]";
public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]";
@@ -77,17 +78,10 @@ public class SearchTransportService extends AbstractLifecycleComponent {
public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]";
private final TransportService transportService;
- private final RemoteClusterService remoteClusterService;
- private final boolean connectToRemote;
public SearchTransportService(Settings settings, ClusterSettings clusterSettings, TransportService transportService) {
super(settings);
- this.connectToRemote = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
this.transportService = transportService;
- this.remoteClusterService = new RemoteClusterService(settings, transportService);
- if (connectToRemote) {
- remoteClusterService.listenForUpdates(clusterSettings);
- }
}
public void sendFreeContext(Transport.Connection connection, final long contextId, OriginalIndices originalIndices) {
@@ -181,7 +175,7 @@ void sendExecuteMultiSearch(final MultiSearchRequest request, SearchTask task,
}
public RemoteClusterService getRemoteClusterService() {
- return remoteClusterService;
+ return transportService.getRemoteClusterService();
}
static class ScrollFreeContextRequest extends TransportRequest {
@@ -399,20 +393,4 @@ public void messageReceived(ShardFetchSearchRequest request, TransportChannel ch
Transport.Connection getConnection(DiscoveryNode node) {
return transportService.getConnection(node);
}
-
- @Override
- protected void doStart() {
- if (connectToRemote) {
- // here we start to connect to the remote clusters
- remoteClusterService.initializeRemoteClusters();
- }
- }
-
- @Override
- protected void doStop() {}
-
- @Override
- protected void doClose() throws IOException {
- remoteClusterService.close();
- }
}
diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
index 94803d771ebaf..ae18caa50f0ba 100644
--- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
+++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java
@@ -19,8 +19,11 @@
package org.elasticsearch.action.search;
+import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
+import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
+import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
@@ -37,15 +40,19 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.RemoteClusterAware;
+import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -203,7 +210,7 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<
ActionListener.wrap((searchShardsResponses) -> {
List remoteShardIterators = new ArrayList<>();
Map remoteAliasFilters = new HashMap<>();
- Function connectionFunction = remoteClusterService.processRemoteShards(
+ Function connectionFunction = processRemoteShards(remoteClusterService,
searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteShardIterators,
connectionFunction, clusterState, remoteAliasFilters, listener);
@@ -211,6 +218,51 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<
}
}
+ static Function processRemoteShards(RemoteClusterService remoteClusterService,
+ Map searchShardsResponses,
+ Map remoteIndicesByCluster,
+ List remoteShardIterators,
+ Map aliasFilterMap) {
+ Map> nodeToCluster = new HashMap<>();
+ for (Map.Entry entry : searchShardsResponses.entrySet()) {
+ String clusterAlias = entry.getKey();
+ ClusterSearchShardsResponse searchShardsResponse = entry.getValue();
+ for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) {
+ nodeToCluster.put(remoteNode.getId(), () -> remoteClusterService.getConnection(remoteNode, clusterAlias));
+ }
+ Map indicesAndFilters = searchShardsResponse.getIndicesAndFilters();
+ for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
+ //add the cluster name to the remote index names for indices disambiguation
+ //this ends up in the hits returned with the search response
+ ShardId shardId = clusterSearchShardsGroup.getShardId();
+ Index remoteIndex = shardId.getIndex();
+ Index index = new Index(clusterAlias + RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR + remoteIndex.getName(),
+ remoteIndex.getUUID());
+ OriginalIndices originalIndices = remoteIndicesByCluster.get(clusterAlias);
+ assert originalIndices != null;
+ SearchShardIterator shardIterator = new SearchShardIterator(new ShardId(index, shardId.getId()),
+ Arrays.asList(clusterSearchShardsGroup.getShards()), originalIndices);
+ remoteShardIterators.add(shardIterator);
+ AliasFilter aliasFilter;
+ if (indicesAndFilters == null) {
+ aliasFilter = new AliasFilter(null, Strings.EMPTY_ARRAY);
+ } else {
+ aliasFilter = indicesAndFilters.get(shardId.getIndexName());
+ assert aliasFilter != null;
+ }
+ // here we have to map the filters to the UUID since from now on we use the uuid for the lookup
+ aliasFilterMap.put(remoteIndex.getUUID(), aliasFilter);
+ }
+ }
+ return (nodeId) -> {
+ Supplier supplier = nodeToCluster.get(nodeId);
+ if (supplier == null) {
+ throw new IllegalArgumentException("unknown remote node: " + nodeId);
+ }
+ return supplier.get();
+ };
+ }
+
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
List remoteShardIterators, Function remoteConnections,
ClusterState clusterState, Map remoteAliasMap,
@@ -234,9 +286,10 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
for (int i = 0; i < indices.length; i++) {
concreteIndices[i] = indices[i].getName();
}
- GroupShardsIterator localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap,
- searchRequest.preference());
- GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, localIndices, remoteShardIterators);
+ GroupShardsIterator localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
+ concreteIndices, routingMap, searchRequest.preference());
+ GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
+ remoteShardIterators);
failIfOverShardCountLimit(clusterService, shardIterators.size());
@@ -297,7 +350,8 @@ protected final void doExecute(SearchRequest searchRequest, ActionListener shardIterators,
- SearchTimeProvider timeProvider, Function connectionLookup,
+ SearchTimeProvider timeProvider,
+ Function connectionLookup,
long clusterStateVersion, Map aliasFilter,
Map concreteIndexBoosts,
ActionListener listener) {
@@ -306,13 +360,13 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque
switch(searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
- aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, timeProvider,
- clusterStateVersion, task);
+ aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
+ timeProvider, clusterStateVersion, task);
break;
case QUERY_THEN_FETCH:
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
- aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, timeProvider,
- clusterStateVersion, task);
+ aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
+ timeProvider, clusterStateVersion, task);
break;
default:
throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
index 18ca8ef7ad624..778b02dd7915a 100644
--- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
+++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
@@ -19,8 +19,8 @@
package org.elasticsearch.common.settings;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
-import org.elasticsearch.action.search.RemoteClusterAware;
-import org.elasticsearch.action.search.RemoteClusterService;
+import org.elasticsearch.transport.RemoteClusterService;
+import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.DestructiveOperations;
diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java
index 0b10b16863b07..c89f31fbe749c 100644
--- a/core/src/main/java/org/elasticsearch/node/Node.java
+++ b/core/src/main/java/org/elasticsearch/node/Node.java
@@ -736,10 +736,6 @@ public void onTimeout(TimeValue timeout) {
// start nodes now, after the http server, because it may take some time
tribeService.startNodes();
- // starts connecting to remote clusters if any cluster is configured
- SearchTransportService searchTransportService = injector.getInstance(SearchTransportService.class);
- searchTransportService.start();
-
logger.info("started");
return this;
@@ -773,7 +769,6 @@ private Node stop() {
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(TransportService.class).stop();
- injector.getInstance(SearchTransportService.class).stop();
pluginLifecycleComponents.forEach(LifecycleComponent::stop);
// we should stop this last since it waits for resources to get released
@@ -835,8 +830,6 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(SearchService.class));
toClose.add(() -> stopWatch.stop().start("transport"));
toClose.add(injector.getInstance(TransportService.class));
- toClose.add(() -> stopWatch.stop().start("search_transport_service"));
- toClose.add(injector.getInstance(SearchTransportService.class));
for (LifecycleComponent plugin : pluginLifecycleComponents) {
toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));
diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterAware.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java
similarity index 96%
rename from core/src/main/java/org/elasticsearch/action/search/RemoteClusterAware.java
rename to core/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java
index 2785a8efdb6f0..42ab7315234bc 100644
--- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterAware.java
+++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.elasticsearch.action.search;
+package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
@@ -51,8 +51,8 @@ public abstract class RemoteClusterAware extends AbstractComponent {
public static final Setting.AffixSetting> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.",
"seeds", (key) -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterAware::parseSeedAddress,
Setting.Property.NodeScope, Setting.Property.Dynamic));
- protected static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
- protected static final String LOCAL_CLUSTER_GROUP_KEY = "";
+ public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
+ public static final String LOCAL_CLUSTER_GROUP_KEY = "";
protected final ClusterNameExpressionResolver clusterNameResolver;
/**
@@ -91,7 +91,7 @@ protected static Map> buildRemoteClustersSeeds(Setti
*
* @return a map of grouped remote and local indices
*/
- protected Map> groupClusterIndices(String[] requestIndices, Predicate indexExists) {
+ public Map> groupClusterIndices(String[] requestIndices, Predicate indexExists) {
Map> perClusterIndices = new HashMap<>();
Set remoteClusterNames = getRemoteClusterNames();
for (String index : requestIndices) {
diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
similarity index 97%
rename from core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java
rename to core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
index a3f3f3a9612b5..5c7e072f65063 100644
--- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterConnection.java
+++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.elasticsearch.action.search;
+package org.elasticsearch.transport;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
@@ -33,6 +33,7 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
@@ -42,17 +43,6 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.ConnectTransportException;
-import org.elasticsearch.transport.ConnectionProfile;
-import org.elasticsearch.transport.TcpTransport;
-import org.elasticsearch.transport.Transport;
-import org.elasticsearch.transport.TransportActionProxy;
-import org.elasticsearch.transport.TransportConnectionListener;
-import org.elasticsearch.transport.TransportException;
-import org.elasticsearch.transport.TransportRequest;
-import org.elasticsearch.transport.TransportRequestOptions;
-import org.elasticsearch.transport.TransportResponseHandler;
-import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException;
diff --git a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java
similarity index 81%
rename from core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java
rename to core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java
index cf2be61ed0526..92dce9d53f13a 100644
--- a/core/src/main/java/org/elasticsearch/action/search/RemoteClusterService.java
+++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.elasticsearch.action.search;
+package org.elasticsearch.transport;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.IOUtils;
@@ -25,6 +25,8 @@
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchShardIterator;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -38,9 +40,6 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.internal.AliasFilter;
-import org.elasticsearch.transport.Transport;
-import org.elasticsearch.transport.TransportException;
-import org.elasticsearch.transport.TransportService;
import java.io.Closeable;
import java.io.IOException;
@@ -169,7 +168,7 @@ private synchronized void updateRemoteClusters(Map>
/**
* Returns true if at least one remote cluster is configured
*/
- boolean isCrossClusterSearchEnabled() {
+ public boolean isCrossClusterSearchEnabled() {
return remoteClusters.isEmpty() == false;
}
@@ -184,7 +183,7 @@ boolean isRemoteClusterRegistered(String clusterName) {
return remoteClusters.containsKey(clusterName);
}
- void collectSearchShards(SearchRequest searchRequest, Map remoteIndicesByCluster,
+ public void collectSearchShards(SearchRequest searchRequest, Map remoteIndicesByCluster,
ActionListener