Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchPhaseController.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]ShardSearchFailure.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]TransportClearScrollAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]TransportSearchAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]DelegatingActionListener.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]IndicesOptions.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]ToXContentToBytes.java" checks="LineLength" />
Expand Down Expand Up @@ -454,7 +453,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ingest[/\\]SimulatePipelineRequestParsingTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ingest[/\\]SimulatePipelineResponseTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ingest[/\\]WriteableIngestDocumentTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]RemoteClusterServiceTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchRequestBuilderTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]TransportActionFilterChainTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]WaitActiveShardCountIT.java" checks="LineLength" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.elasticsearch.action.admin.cluster.remote;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.search.RemoteConnectionInfo;
import org.elasticsearch.transport.RemoteConnectionInfo;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.elasticsearch.action.admin.cluster.remote;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.RemoteClusterService;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
Expand All @@ -30,8 +30,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;

public final class TransportRemoteInfoAction extends HandledTransportAction<RemoteInfoRequest, RemoteInfoResponse> {

private final RemoteClusterService remoteClusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
Expand All @@ -46,6 +46,7 @@
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
Expand All @@ -62,7 +63,7 @@
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
* transport.
*/
public class SearchTransportService extends AbstractLifecycleComponent {
public class SearchTransportService extends AbstractComponent {

public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]";
public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]";
Expand All @@ -77,17 +78,10 @@ public class SearchTransportService extends AbstractLifecycleComponent {
public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]";

private final TransportService transportService;
private final RemoteClusterService remoteClusterService;
private final boolean connectToRemote;

public SearchTransportService(Settings settings, ClusterSettings clusterSettings, TransportService transportService) {
super(settings);
this.connectToRemote = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
this.transportService = transportService;
this.remoteClusterService = new RemoteClusterService(settings, transportService);
if (connectToRemote) {
remoteClusterService.listenForUpdates(clusterSettings);
}
}

public void sendFreeContext(Transport.Connection connection, final long contextId, OriginalIndices originalIndices) {
Expand Down Expand Up @@ -181,7 +175,7 @@ void sendExecuteMultiSearch(final MultiSearchRequest request, SearchTask task,
}

public RemoteClusterService getRemoteClusterService() {
return remoteClusterService;
return transportService.getRemoteClusterService();
}

static class ScrollFreeContextRequest extends TransportRequest {
Expand Down Expand Up @@ -399,20 +393,4 @@ public void messageReceived(ShardFetchSearchRequest request, TransportChannel ch
Transport.Connection getConnection(DiscoveryNode node) {
return transportService.getConnection(node);
}

@Override
protected void doStart() {
if (connectToRemote) {
// here we start to connect to the remote clusters
remoteClusterService.initializeRemoteClusters();
}
}

@Override
protected void doStop() {}

@Override
protected void doClose() throws IOException {
remoteClusterService.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

package org.elasticsearch.action.search;

import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -37,15 +40,19 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -203,14 +210,59 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<
ActionListener.wrap((searchShardsResponses) -> {
List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
Function<String, Transport.Connection> connectionFunction = remoteClusterService.processRemoteShards(
Function<String, Transport.Connection> connectionFunction = processRemoteShards(remoteClusterService,
searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteShardIterators,
connectionFunction, clusterState, remoteAliasFilters, listener);
}, listener::onFailure));
}
}

static Function<String, Transport.Connection> processRemoteShards(RemoteClusterService remoteClusterService,
Map<String, ClusterSearchShardsResponse> searchShardsResponses,
Map<String, OriginalIndices> remoteIndicesByCluster,
List<SearchShardIterator> remoteShardIterators,
Map<String, AliasFilter> aliasFilterMap) {
Map<String, Supplier<Transport.Connection>> nodeToCluster = new HashMap<>();
for (Map.Entry<String, ClusterSearchShardsResponse> entry : searchShardsResponses.entrySet()) {
String clusterAlias = entry.getKey();
ClusterSearchShardsResponse searchShardsResponse = entry.getValue();
for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) {
nodeToCluster.put(remoteNode.getId(), () -> remoteClusterService.getConnection(remoteNode, clusterAlias));
}
Map<String, AliasFilter> indicesAndFilters = searchShardsResponse.getIndicesAndFilters();
for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
//add the cluster name to the remote index names for indices disambiguation
//this ends up in the hits returned with the search response
ShardId shardId = clusterSearchShardsGroup.getShardId();
Index remoteIndex = shardId.getIndex();
Index index = new Index(clusterAlias + RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR + remoteIndex.getName(),
remoteIndex.getUUID());
OriginalIndices originalIndices = remoteIndicesByCluster.get(clusterAlias);
assert originalIndices != null;
SearchShardIterator shardIterator = new SearchShardIterator(new ShardId(index, shardId.getId()),
Arrays.asList(clusterSearchShardsGroup.getShards()), originalIndices);
remoteShardIterators.add(shardIterator);
AliasFilter aliasFilter;
if (indicesAndFilters == null) {
aliasFilter = new AliasFilter(null, Strings.EMPTY_ARRAY);
} else {
aliasFilter = indicesAndFilters.get(shardId.getIndexName());
assert aliasFilter != null;
}
// here we have to map the filters to the UUID since from now on we use the uuid for the lookup
aliasFilterMap.put(remoteIndex.getUUID(), aliasFilter);
}
}
return (nodeId) -> {
Supplier<Transport.Connection> supplier = nodeToCluster.get(nodeId);
if (supplier == null) {
throw new IllegalArgumentException("unknown remote node: " + nodeId);
}
return supplier.get();
};
}

private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
List<SearchShardIterator> remoteShardIterators, Function<String, Transport.Connection> remoteConnections,
ClusterState clusterState, Map<String, AliasFilter> remoteAliasMap,
Expand All @@ -234,9 +286,10 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
for (int i = 0; i < indices.length; i++) {
concreteIndices[i] = indices[i].getName();
}
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap,
searchRequest.preference());
GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices, remoteShardIterators);
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
concreteIndices, routingMap, searchRequest.preference());
GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
remoteShardIterators);

failIfOverShardCountLimit(clusterService, shardIterators.size());

Expand Down Expand Up @@ -297,7 +350,8 @@ protected final void doExecute(SearchRequest searchRequest, ActionListener<Searc

private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest,
GroupShardsIterator<SearchShardIterator> shardIterators,
SearchTimeProvider timeProvider, Function<String, Transport.Connection> connectionLookup,
SearchTimeProvider timeProvider,
Function<String, Transport.Connection> connectionLookup,
long clusterStateVersion, Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
ActionListener<SearchResponse> listener) {
Expand All @@ -306,13 +360,13 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque
switch(searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, timeProvider,
clusterStateVersion, task);
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task);
break;
case QUERY_THEN_FETCH:
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, timeProvider,
clusterStateVersion, task);
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
timeProvider, clusterStateVersion, task);
break;
default:
throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.elasticsearch.common.settings;

import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.search.RemoteClusterAware;
import org.elasticsearch.action.search.RemoteClusterService;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.DestructiveOperations;
Expand Down
7 changes: 0 additions & 7 deletions core/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -736,10 +736,6 @@ public void onTimeout(TimeValue timeout) {

// start nodes now, after the http server, because it may take some time
tribeService.startNodes();
// starts connecting to remote clusters if any cluster is configured
SearchTransportService searchTransportService = injector.getInstance(SearchTransportService.class);
searchTransportService.start();

logger.info("started");

return this;
Expand Down Expand Up @@ -773,7 +769,6 @@ private Node stop() {
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(TransportService.class).stop();
injector.getInstance(SearchTransportService.class).stop();

pluginLifecycleComponents.forEach(LifecycleComponent::stop);
// we should stop this last since it waits for resources to get released
Expand Down Expand Up @@ -835,8 +830,6 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(SearchService.class));
toClose.add(() -> stopWatch.stop().start("transport"));
toClose.add(injector.getInstance(TransportService.class));
toClose.add(() -> stopWatch.stop().start("search_transport_service"));
toClose.add(injector.getInstance(SearchTransportService.class));

for (LifecycleComponent plugin : pluginLifecycleComponents) {
toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
package org.elasticsearch.transport;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
Expand Down Expand Up @@ -51,8 +51,8 @@ public abstract class RemoteClusterAware extends AbstractComponent {
public static final Setting.AffixSetting<List<InetSocketAddress>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.",
"seeds", (key) -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterAware::parseSeedAddress,
Setting.Property.NodeScope, Setting.Property.Dynamic));
protected static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
protected static final String LOCAL_CLUSTER_GROUP_KEY = "";
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
public static final String LOCAL_CLUSTER_GROUP_KEY = "";
protected final ClusterNameExpressionResolver clusterNameResolver;

/**
Expand Down Expand Up @@ -91,7 +91,7 @@ protected static Map<String, List<DiscoveryNode>> buildRemoteClustersSeeds(Setti
*
* @return a map of grouped remote and local indices
*/
protected Map<String, List<String>> groupClusterIndices(String[] requestIndices, Predicate<String> indexExists) {
public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Predicate<String> indexExists) {
Map<String, List<String>> perClusterIndices = new HashMap<>();
Set<String> remoteClusterNames = getRemoteClusterNames();
for (String index : requestIndices) {
Expand Down
Loading