diff --git a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java index a6796c76f9279..95ff8a0809295 100644 --- a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java +++ b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java @@ -53,6 +53,7 @@ protected void doExecute(SearchRequest request, ActionListener l new SearchHit[0], 0L, 0.0f), new InternalAggregations(Collections.emptyList()), new Suggest(Collections.emptyList()), - new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, 0, new ShardSearchFailure[0])); + new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY)); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index 43a21856f149c..8e0c13aea474e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -166,7 +166,8 @@ public void testInfo() throws IOException { public void testSearchScroll() throws IOException { Header[] headers = randomHeaders(random(), "Header"); SearchResponse mockSearchResponse = new SearchResponse(new SearchResponseSections(SearchHits.empty(), InternalAggregations.EMPTY, - null, false, false, null, 1), randomAlphaOfLengthBetween(5, 10), 5, 5, 0, 100, new ShardSearchFailure[0]); + null, false, false, null, 1), randomAlphaOfLengthBetween(5, 10), 5, 5, 0, 100, 
ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY); mockResponse(mockSearchResponse); SearchResponse searchResponse = restHighLevelClient.searchScroll(new SearchScrollRequest(randomAlphaOfLengthBetween(5, 10)), headers); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/SearchIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/SearchIT.java index d73e746528dc6..55fe1ce6942cb 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/SearchIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/SearchIT.java @@ -470,5 +470,6 @@ private static void assertSearchHeader(SearchResponse searchResponse) { assertThat(searchResponse.getTotalShards(), greaterThan(0)); assertEquals(searchResponse.getTotalShards(), searchResponse.getSuccessfulShards()); assertEquals(0, searchResponse.getShardFailures().length); + assertEquals(SearchResponse.Clusters.EMPTY, searchResponse.getClusters()); } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java index b5b28e2b8f79a..28c7903efde81 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java @@ -30,11 +30,15 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; public class ClusterSearchShardsResponse extends ActionResponse implements ToXContentObject { + public static final ClusterSearchShardsResponse EMPTY = new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0], + new DiscoveryNode[0], Collections.emptyMap()); + private ClusterSearchShardsGroup[] groups; private DiscoveryNode[] nodes; private Map indicesAndFilters; diff --git 
a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 1c25cd7ac37a2..4632ef63174a2 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -68,7 +68,7 @@ abstract class AbstractSearchAsyncAction exten private final AtomicInteger successfulOps = new AtomicInteger(); private final AtomicInteger skippedOps = new AtomicInteger(); private final TransportSearchAction.SearchTimeProvider timeProvider; - + private final SearchResponse.Clusters clusters; protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService, BiFunction nodeIdToConnection, @@ -76,7 +76,8 @@ protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportS Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, - SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentShardRequests) { + SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentShardRequests, + SearchResponse.Clusters clusters) { super(name, request, shardsIts, logger, maxConcurrentShardRequests, executor); this.timeProvider = timeProvider; this.logger = logger; @@ -90,6 +91,7 @@ protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportS this.concreteIndexBoosts = concreteIndexBoosts; this.aliasFilter = aliasFilter; this.results = resultConsumer; + this.clusters = clusters; } /** @@ -108,7 +110,7 @@ public final void start() { //no search shards to search on, bail with empty response //(it happens with search across _all with no indices around and consistent with broadcast operations) listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 
0, 0, buildTookInMillis(), - ShardSearchFailure.EMPTY_ARRAY)); + ShardSearchFailure.EMPTY_ARRAY, clusters)); return; } executePhase(this); @@ -264,7 +266,7 @@ public final SearchRequest getRequest() { @Override public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) { return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(), - skippedOps.get(), buildTookInMillis(), buildShardFailures()); + skippedOps.get(), buildTookInMillis(), buildShardFailures(), clusters); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/core/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index 49575125f68d6..fe42d50393635 100644 --- a/core/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/core/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -33,7 +33,7 @@ import java.util.stream.Stream; /** - * This search phrase can be used as an initial search phase to pre-filter search shards based on query rewriting. + * This search phase can be used as an initial search phase to pre-filter search shards based on query rewriting. * The queries are rewritten against the shards and based on the rewrite result shards might be able to be excluded * from the search. 
The extra round trip to the search shards is very cheap and is not subject to rejections * which allows to fan out to more shards at the same time without running into rejections even if we are hitting a @@ -50,13 +50,15 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, - SearchTask task, Function, SearchPhase> phaseFactory) { + SearchTask task, Function, SearchPhase> phaseFactory, + SearchResponse.Clusters clusters) { /* * We set max concurrent shard requests to the number of shards to otherwise avoid deep recursing that would occur if the local node * is the coordinating node for the query, holds all the shards for the request, and there are a lot of shards. */ super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, - listener, shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size()); + listener, shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size(), + clusters); this.phaseFactory = phaseFactory; this.shardsIts = shardsIts; } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index ec055dfec8df6..9bcbe1c8e6760 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -40,10 +40,10 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction final Map concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor, final SearchRequest request, final ActionListener listener, final 
GroupShardsIterator shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, - final long clusterStateVersion, final SearchTask task) { + final long clusterStateVersion, final SearchTask task, SearchResponse.Clusters clusters) { super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size()), - request.getMaxConcurrentShardRequests()); + request.getMaxConcurrentShardRequests(), clusters); this.searchPhaseController = searchPhaseController; } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 5ddd1df231d17..b7669312b0088 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -40,10 +40,10 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor, final SearchRequest request, final ActionListener listener, final GroupShardsIterator shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, - long clusterStateVersion, SearchTask task) { + long clusterStateVersion, SearchTask task, SearchResponse.Clusters clusters) { super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()), - request.getMaxConcurrentShardRequests()); + request.getMaxConcurrentShardRequests(), clusters); this.searchPhaseController = searchPhaseController; } diff --git 
a/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java b/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java index 9e4b9382ab0e2..839abf1444412 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchResponse.java @@ -26,8 +26,10 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.StatusToXContentObject; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.RestStatus; @@ -43,6 +45,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; @@ -71,15 +74,18 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb private ShardSearchFailure[] shardFailures; + private Clusters clusters; + private long tookInMillis; public SearchResponse() { } public SearchResponse(SearchResponseSections internalResponse, String scrollId, int totalShards, int successfulShards, - int skippedShards, long tookInMillis, ShardSearchFailure[] shardFailures) { + int skippedShards, long tookInMillis, ShardSearchFailure[] shardFailures, Clusters clusters) { this.internalResponse = internalResponse; this.scrollId = scrollId; + this.clusters = clusters; this.totalShards = totalShards; this.successfulShards = successfulShards; this.skippedShards = skippedShards; @@ -199,6 +205,15 @@ public Map getProfileResults() { return 
internalResponse.profile(); } + /** + * Returns info about what clusters the search was executed against. Available only in responses obtained + * from a Cross Cluster Search request, otherwise {@link Clusters#EMPTY} + * @see Clusters + */ + public Clusters getClusters() { + return clusters; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -221,6 +236,7 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t } RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getSkippedShards(), getFailedShards(), getShardFailures()); + clusters.toXContent(builder, params); internalResponse.toXContent(builder, params); return builder; } @@ -242,6 +258,7 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept int skippedShards = 0; // 0 for BWC String scrollId = null; List failures = new ArrayList<>(); + Clusters clusters = Clusters.EMPTY; while((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { currentFieldName = parser.currentName(); @@ -296,6 +313,28 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept parser.skipChildren(); } } + } else if (Clusters._CLUSTERS_FIELD.match(currentFieldName)) { + int successful = -1; + int total = -1; + int skipped = -1; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if (Clusters.SUCCESSFUL_FIELD.match(currentFieldName)) { + successful = parser.intValue(); + } else if (Clusters.TOTAL_FIELD.match(currentFieldName)) { + total = parser.intValue(); + } else if (Clusters.SKIPPED_FIELD.match(currentFieldName)) { + skipped = parser.intValue(); + } else { + parser.skipChildren(); + } + } else { + parser.skipChildren(); + } + } + clusters = 
new Clusters(total, successful, skipped); } else { parser.skipChildren(); } @@ -304,7 +343,7 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept SearchResponseSections searchResponseSections = new SearchResponseSections(hits, aggs, suggest, timedOut, terminatedEarly, profile, numReducePhases); return new SearchResponse(searchResponseSections, scrollId, totalShards, successfulShards, skippedShards, tookInMillis, - failures.toArray(new ShardSearchFailure[failures.size()])); + failures.toArray(new ShardSearchFailure[failures.size()]), clusters); } @Override @@ -322,6 +361,12 @@ public void readFrom(StreamInput in) throws IOException { shardFailures[i] = readShardSearchFailure(in); } } + //TODO update version once backported + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + clusters = new Clusters(in); + } else { + clusters = Clusters.EMPTY; + } scrollId = in.readOptionalString(); tookInMillis = in.readVLong(); if (in.getVersion().onOrAfter(Version.V_5_6_0)) { @@ -340,7 +385,10 @@ public void writeTo(StreamOutput out) throws IOException { for (ShardSearchFailure shardSearchFailure : shardFailures) { shardSearchFailure.writeTo(out); } - + //TODO update version once backported + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + clusters.writeTo(out); + } out.writeOptionalString(scrollId); out.writeVLong(tookInMillis); if(out.getVersion().onOrAfter(Version.V_5_6_0)) { @@ -353,4 +401,101 @@ public String toString() { return Strings.toString(this); } + /** + * Holds info about the clusters that the search was executed on: how many in total, how many of them were successful + * and how many of them were skipped. 
+ */ + public static class Clusters implements ToXContent, Writeable { + + public static final Clusters EMPTY = new Clusters(0, 0, 0); + + static final ParseField _CLUSTERS_FIELD = new ParseField("_clusters"); + static final ParseField SUCCESSFUL_FIELD = new ParseField("successful"); + static final ParseField SKIPPED_FIELD = new ParseField("skipped"); + static final ParseField TOTAL_FIELD = new ParseField("total"); + + private final int total; + private final int successful; + private final int skipped; + + Clusters(int total, int successful, int skipped) { + assert total >= 0 && successful >= 0 && skipped >= 0 + : "total: " + total + " successful: " + successful + " skipped: " + skipped; + assert successful <= total && skipped == total - successful + : "total: " + total + " successful: " + successful + " skipped: " + skipped; + this.total = total; + this.successful = successful; + this.skipped = skipped; + } + + private Clusters(StreamInput in) throws IOException { + this.total = in.readVInt(); + this.successful = in.readVInt(); + this.skipped = in.readVInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(total); + out.writeVInt(successful); + out.writeVInt(skipped); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + if (this != EMPTY) { + builder.startObject(_CLUSTERS_FIELD.getPreferredName()); + builder.field(TOTAL_FIELD.getPreferredName(), total); + builder.field(SUCCESSFUL_FIELD.getPreferredName(), successful); + builder.field(SKIPPED_FIELD.getPreferredName(), skipped); + builder.endObject(); + } + return builder; + } + + /** + * Returns how many total clusters the search was requested to be executed on + */ + public int getTotal() { + return total; + } + + /** + * Returns how many total clusters the search was executed successfully on + */ + public int getSuccessful() { + return successful; + } + + /** + * Returns how many total clusters were 
skipped during the execution of the search request + */ + public int getSkipped() { + return skipped; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Clusters clusters = (Clusters) o; + return total == clusters.total && + successful == clusters.successful && + skipped == clusters.skipped; + } + + @Override + public int hashCode() { + return Objects.hash(total, successful, skipped); + } + + @Override + public String toString() { + return "Clusters{total=" + total + ", successful=" + successful + ", skipped=" + skipped + '}'; + } + } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java index 109a1f30ffc8e..388a7679d2b8c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java @@ -249,7 +249,7 @@ protected final void sendResponse(SearchPhaseController.ReducedQueryPhase queryP scrollId = request.scrollId(); } listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(), - 0, buildTookInMillis(), buildShardFailures())); + 0, buildTookInMillis(), buildShardFailures(), SearchResponse.Clusters.EMPTY)); } catch (Exception e) { listener.onFailure(new ReduceSearchPhaseException("fetch", "inner finish failed", e, buildShardFailures())); } diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 8400707e370d1..ab0a88cc59498 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -192,7 +192,7 @@ protected void doExecute(Task task, 
SearchRequest searchRequest, ActionListener< if (remoteClusterIndices.isEmpty()) { executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(), (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, clusterState.getNodes() - .getDataNodes().size()); + .getDataNodes().size(), SearchResponse.Clusters.EMPTY); } else { remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(), remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> { @@ -200,10 +200,12 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< Map remoteAliasFilters = new HashMap<>(); BiFunction clusterNodeLookup = processRemoteShards(searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters); - int numNodesInvovled = searchShardsResponses.values().stream().mapToInt(r -> r.getNodes().length).sum() + int numNodesInvolved = searchShardsResponses.values().stream().mapToInt(r -> r.getNodes().length).sum() + clusterState.getNodes().getDataNodes().size(); + SearchResponse.Clusters clusters = buildClusters(localIndices, remoteClusterIndices, searchShardsResponses); executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices, - remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, numNodesInvovled); + remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, numNodesInvolved, + clusters); }, listener::onFailure)); } }, listener::onFailure); @@ -215,6 +217,20 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener< } } + static SearchResponse.Clusters buildClusters(OriginalIndices localIndices, Map remoteIndices, + Map searchShardsResponses) { + int localClusters = Math.min(localIndices.indices().length, 1); + int totalClusters = remoteIndices.size() + localClusters; + int 
successfulClusters = localClusters; + for (ClusterSearchShardsResponse searchShardsResponse : searchShardsResponses.values()) { + if (searchShardsResponse != ClusterSearchShardsResponse.EMPTY) { + successfulClusters++; + } + } + int skippedClusters = totalClusters - successfulClusters; + return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters); + } + static BiFunction processRemoteShards(Map searchShardsResponses, Map remoteIndicesByCluster, List remoteShardIterators, @@ -264,7 +280,8 @@ static BiFunction processRemoteShards(Map remoteClusterIndices, List remoteShardIterators, BiFunction remoteConnections, ClusterState clusterState, - Map remoteAliasMap, ActionListener listener, int nodeCount) { + Map remoteAliasMap, ActionListener listener, int nodeCount, + SearchResponse.Clusters clusters) { clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name @@ -329,7 +346,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea } boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators); searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(), - Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener, preFilterSearchShards).start(); + Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener, preFilterSearchShards, clusters).start(); } private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupShardsIterator shardIterators) { @@ -343,9 +360,7 @@ static GroupShardsIterator mergeShardsIterators(GroupShards OriginalIndices localIndices, List remoteShardIterators) { List shards = new ArrayList<>(); - for (SearchShardIterator shardIterator : remoteShardIterators) { - shards.add(shardIterator); - } + shards.addAll(remoteShardIterators); for (ShardIterator 
shardIterator : localShardsIterator) { shards.add(new SearchShardIterator(null, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices)); } @@ -363,34 +378,35 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque BiFunction connectionLookup, long clusterStateVersion, Map aliasFilter, Map concreteIndexBoosts, - ActionListener listener, boolean preFilter) { + ActionListener listener, boolean preFilter, + SearchResponse.Clusters clusters) { Executor executor = threadPool.executor(ThreadPool.Names.SEARCH); if (preFilter) { return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup, aliasFilter, concreteIndexBoosts, executor, searchRequest, listener, shardIterators, timeProvider, clusterStateVersion, task, (iter) -> { AbstractSearchAsyncAction action = searchAsyncAction(task, searchRequest, iter, timeProvider, connectionLookup, - clusterStateVersion, aliasFilter, concreteIndexBoosts, listener, false); + clusterStateVersion, aliasFilter, concreteIndexBoosts, listener, false, clusters); return new SearchPhase(action.getName()) { @Override public void run() throws IOException { action.start(); } }; - }); + }, clusters); } else { AbstractSearchAsyncAction searchAsyncAction; switch (searchRequest.searchType()) { case DFS_QUERY_THEN_FETCH: searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup, aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, - timeProvider, clusterStateVersion, task); + timeProvider, clusterStateVersion, task, clusters); break; case QUERY_AND_FETCH: case QUERY_THEN_FETCH: searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup, aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, - timeProvider, clusterStateVersion, task); + timeProvider, clusterStateVersion, task, 
clusters); break; default: throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]"); diff --git a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 1ade10e4c7dd1..12a95a688ae88 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -263,6 +263,7 @@ public void apply(Settings value, Settings current, Settings previous) { ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, + RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE, RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER, RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING, RemoteClusterService.REMOTE_NODE_ATTRIBUTE, diff --git a/core/src/main/java/org/elasticsearch/common/settings/Setting.java b/core/src/main/java/org/elasticsearch/common/settings/Setting.java index abc589aedafc3..522bbdc9f35f7 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/core/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -664,7 +664,7 @@ public void diff(Settings.Builder builder, Settings source, Settings defaultSett } /** - * Returns the namespace for a concrete settting. Ie. an affix setting with prefix: search. and suffix: username + * Returns the namespace for a concrete setting. Ie. an affix setting with prefix: search. 
and suffix: username * will return remote as a namespace for the setting search.remote.username */ public String getNamespace(Setting concreteSetting) { @@ -1220,9 +1220,7 @@ private static AffixSetting affixKeySetting(AffixKey key, Function delegate = delegateFactory.apply("_na_"); return new AffixSetting<>(key, delegate, delegateFactory, dependencies); - }; - - + } public interface Key { boolean match(String key); diff --git a/core/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java index 18a8ebfa75082..107a4b32d89f9 100644 --- a/core/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java @@ -53,6 +53,7 @@ public abstract class RemoteClusterAware extends AbstractComponent { Setting.Property.NodeScope, Setting.Property.Dynamic)); public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':'; public static final String LOCAL_CLUSTER_GROUP_KEY = ""; + protected final ClusterNameExpressionResolver clusterNameResolver; /** @@ -160,7 +161,7 @@ private static InetSocketAddress parseSeedAddress(String remoteHost) { } } - public static final String buildRemoteIndexName(String clusterAlias, String indexName) { + public static String buildRemoteIndexName(String clusterAlias, String indexName) { return clusterAlias != null ? 
clusterAlias + REMOTE_CLUSTER_INDEX_SEPARATOR + indexName : indexName; } } diff --git a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index c08bf9b737e95..55edd0c86ec29 100644 --- a/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -61,6 +61,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -87,6 +88,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo private final int maxNumRemoteConnections; private final Predicate nodePredicate; private volatile List seedNodes; + private volatile boolean skipUnavailable; private final ConnectHandler connectHandler; private SetOnce remoteClusterName = new SetOnce<>(); @@ -117,6 +119,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo remoteProfile = builder.build(); connectedNodes = new ConnectedNodes(clusterAlias); this.seedNodes = Collections.unmodifiableList(seedNodes); + this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE + .getConcreteSettingForNamespace(clusterAlias).get(settings); this.connectHandler = new ConnectHandler(); transportService.addConnectionListener(this); } @@ -129,6 +133,13 @@ synchronized void updateSeedNodes(List seedNodes, ActionListener< connectHandler.connect(connectListener); } + /** + * Updates the skipUnavailable flag that can be dynamically set for each remote cluster + */ + void updateSkipUnavailable(boolean skipUnavailable) { + this.skipUnavailable = skipUnavailable; + } + @Override public void onNodeDisconnected(DiscoveryNode node) { boolean 
remove = connectedNodes.remove(node); @@ -143,16 +154,19 @@ public void onNodeDisconnected(DiscoveryNode node) { */ public void fetchSearchShards(ClusterSearchShardsRequest searchRequest, ActionListener listener) { - if (connectedNodes.size() == 0) { - // just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener - // this will cause some back pressure on the search end and eventually will cause rejections but that's fine - // we can't proceed with a search on a cluster level. - // in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the caller - // end since they provide the listener. - ensureConnected(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, listener), listener::onFailure)); + + final ActionListener searchShardsListener; + final Consumer onConnectFailure; + if (skipUnavailable) { + onConnectFailure = (exception) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY); + searchShardsListener = ActionListener.wrap(listener::onResponse, (e) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY)); } else { - fetchShardsInternal(searchRequest, listener); + onConnectFailure = listener::onFailure; + searchShardsListener = listener; } + // in case we have no connected nodes we try to connect and if we fail we either notify the listener or not depending on + // the skip_unavailable setting + ensureConnected(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, searchShardsListener), onConnectFailure)); } /** @@ -231,16 +245,12 @@ public String executor() { }); }; try { - if (connectedNodes.size() == 0) { - // just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener - // this will cause some back pressure on the search end and eventually will cause rejections but that's fine - // we can't proceed with a search on a cluster level. 
- // in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the - // caller end since they provide the listener. - ensureConnected(ActionListener.wrap((x) -> runnable.run(), listener::onFailure)); - } else { - runnable.run(); - } + // just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener + // this will cause some back pressure on the search end and eventually will cause rejections but that's fine + // we can't proceed with a search on a cluster level. + // in the future we might want to just skip the remote nodes in such a case but that can already be implemented on the + // caller end since they provide the listener. + ensureConnected(ActionListener.wrap((x) -> runnable.run(), listener::onFailure)); } catch (Exception ex) { listener.onFailure(ex); } @@ -600,7 +610,7 @@ public void getConnectionInfo(ActionListener listener) { // not connected we return immediately RemoteConnectionInfo remoteConnectionStats = new RemoteConnectionInfo(clusterAlias, Collections.emptyList(), Collections.emptyList(), maxNumRemoteConnections, 0, - RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings)); + RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings), skipUnavailable); listener.onResponse(remoteConnectionStats); } else { NodesInfoRequest request = new NodesInfoRequest(); @@ -634,9 +644,9 @@ public void handleResponse(NodesInfoResponse response) { } } RemoteConnectionInfo remoteConnectionInfo = new RemoteConnectionInfo(clusterAlias, - seedNodes.stream().map(n -> n.getAddress()).collect(Collectors.toList()), new ArrayList<>(httpAddresses), + seedNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toList()), new ArrayList<>(httpAddresses), maxNumRemoteConnections, connectedNodes.size(), - RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings)); + 
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings), skipUnavailable); listener.onResponse(remoteConnectionInfo); } diff --git a/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index c4b64e860b2b5..8f3a24cd70bfc 100644 --- a/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -54,6 +55,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.elasticsearch.common.settings.Setting.boolSetting; + /** * Basic service for accessing remote clusters via gateway nodes */ @@ -89,6 +92,10 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl public static final Setting ENABLE_REMOTE_CLUSTERS = Setting.boolSetting("search.remote.connect", true, Setting.Property.NodeScope); + public static final Setting.AffixSetting REMOTE_CLUSTER_SKIP_UNAVAILABLE = + Setting.affixKeySetting("search.remote.", "skip_unavailable", + key -> boolSetting(key, false, Setting.Property.NodeScope, Setting.Property.Dynamic), REMOTE_CLUSTERS_SEEDS); + private final TransportService transportService; private final int numRemoteConnections; private volatile Map remoteClusters = Collections.emptyMap(); @@ -231,7 +238,7 @@ public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) @Override public void onFailure(Exception e) { TransportException exception = new TransportException("unable to communicate with remote cluster [" + - 
clusterName + "]", e); + clusterName + "]", e); if (transportException.compareAndSet(null, exception) == false) { exception = transportException.accumulateAndGet(exception, (previous, current) -> { current.addSuppressed(previous); @@ -283,6 +290,20 @@ protected Set getRemoteClusterNames() { return this.remoteClusters.keySet(); } + @Override + public void listenForUpdates(ClusterSettings clusterSettings) { + super.listenForUpdates(clusterSettings); + clusterSettings.addAffixUpdateConsumer(REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable, + (clusterAlias, value) -> {}); + } + + synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) { + RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias); + if (remote != null) { + remote.updateSkipUnavailable(skipUnavailable); + } + } + protected void updateRemoteCluster(String clusterAlias, List addresses) { updateRemoteCluster(clusterAlias, addresses, ActionListener.wrap((x) -> {}, (x) -> {})); } diff --git a/core/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java b/core/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java index a05da795a81ef..20680a712859e 100644 --- a/core/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java +++ b/core/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java @@ -18,12 +18,12 @@ */ package org.elasticsearch.transport; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -42,17 +42,19 @@ public final class RemoteConnectionInfo implements 
ToXContentFragment, Writeable final TimeValue initialConnectionTimeout; final int numNodesConnected; final String clusterAlias; + final boolean skipUnavailable; RemoteConnectionInfo(String clusterAlias, List seedNodes, List httpAddresses, int connectionsPerCluster, int numNodesConnected, - TimeValue initialConnectionTimeout) { + TimeValue initialConnectionTimeout, boolean skipUnavailable) { this.clusterAlias = clusterAlias; this.seedNodes = seedNodes; this.httpAddresses = httpAddresses; this.connectionsPerCluster = connectionsPerCluster; this.numNodesConnected = numNodesConnected; this.initialConnectionTimeout = initialConnectionTimeout; + this.skipUnavailable = skipUnavailable; } public RemoteConnectionInfo(StreamInput input) throws IOException { @@ -62,6 +64,12 @@ public RemoteConnectionInfo(StreamInput input) throws IOException { initialConnectionTimeout = new TimeValue(input); numNodesConnected = input.readVInt(); clusterAlias = input.readString(); + //TODO update version once backported + if (input.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + skipUnavailable = input.readBoolean(); + } else { + skipUnavailable = false; + } } @Override @@ -82,6 +90,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("num_nodes_connected", numNodesConnected); builder.field("max_connections_per_cluster", connectionsPerCluster); builder.field("initial_connect_timeout", initialConnectionTimeout); + builder.field("skip_unavailable", skipUnavailable); } builder.endObject(); return builder; @@ -95,6 +104,10 @@ public void writeTo(StreamOutput out) throws IOException { initialConnectionTimeout.writeTo(out); out.writeVInt(numNodesConnected); out.writeString(clusterAlias); + //TODO update version once backported + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeBoolean(skipUnavailable); + } } @Override @@ -107,11 +120,13 @@ public boolean equals(Object o) { Objects.equals(seedNodes, that.seedNodes) && 
Objects.equals(httpAddresses, that.httpAddresses) && Objects.equals(initialConnectionTimeout, that.initialConnectionTimeout) && - Objects.equals(clusterAlias, that.clusterAlias); + Objects.equals(clusterAlias, that.clusterAlias) && + skipUnavailable == that.skipUnavailable; } @Override public int hashCode() { - return Objects.hash(seedNodes, httpAddresses, connectionsPerCluster, initialConnectionTimeout, numNodesConnected, clusterAlias); + return Objects.hash(seedNodes, httpAddresses, connectionsPerCluster, initialConnectionTimeout, + numNodesConnected, clusterAlias, skipUnavailable); } } diff --git a/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index 8f413eb436421..6cc1dae0468b2 100644 --- a/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -65,7 +65,8 @@ private AbstractSearchAsyncAction createAction( Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), null, request, null, new GroupShardsIterator<>(Collections.singletonList( new SearchShardIterator(null, null, Collections.emptyList(), null))), timeProvider, 0, null, - new InitialSearchPhase.ArraySearchPhaseResults<>(10), request.getMaxConcurrentShardRequests()) { + new InitialSearchPhase.ArraySearchPhaseResults<>(10), request.getMaxConcurrentShardRequests(), + SearchResponse.Clusters.EMPTY) { @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, final SearchPhaseContext context) { return null; diff --git a/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 9e0b4f7fee9ba..2bad34d20a635 100644 --- 
a/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -87,7 +87,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRe public void run() throws IOException { result.set(iter); latch.countDown(); - }}); + }}, SearchResponse.Clusters.EMPTY); canMatchPhase.start(); latch.await(); @@ -164,7 +164,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchTransportRe public void run() throws IOException { result.set(iter); latch.countDown(); - }}); + }}, SearchResponse.Clusters.EMPTY); canMatchPhase.start(); latch.await(); @@ -247,11 +247,10 @@ protected void executePhaseOnShard( listener.onFailure(new Exception("failure")); } } - }); + }, SearchResponse.Clusters.EMPTY); canMatchPhase.start(); latch.await(); executor.shutdown(); } - } diff --git a/core/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java b/core/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java index 2f06dd873215e..80fbd4cc43ddf 100644 --- a/core/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java +++ b/core/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java @@ -84,7 +84,7 @@ public SearchRequest getRequest() { @Override public SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) { return new SearchResponse(internalSearchResponse, scrollId, numShards, numSuccess.get(), 0, 0, - failures.toArray(new ShardSearchFailure[0])); + failures.toArray(new ShardSearchFailure[failures.size()]), SearchResponse.Clusters.EMPTY); } @Override diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 8a9c98395d76f..7d4ea9c9592f0 100644 --- 
a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -113,7 +113,8 @@ public void onFailure(Exception e) { 0, null, new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()), - request.getMaxConcurrentShardRequests()) { + request.getMaxConcurrentShardRequests(), + SearchResponse.Clusters.EMPTY) { @Override protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, @@ -203,7 +204,8 @@ public void onFailure(Exception e) { 0, null, new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()), - request.getMaxConcurrentShardRequests()) { + request.getMaxConcurrentShardRequests(), + SearchResponse.Clusters.EMPTY) { @Override protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, @@ -306,7 +308,8 @@ public void sendFreeContext(Transport.Connection connection, long contextId, Ori 0, null, new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()), - request.getMaxConcurrentShardRequests()) { + request.getMaxConcurrentShardRequests(), + SearchResponse.Clusters.EMPTY) { TestSearchResponse response = new TestSearchResponse(); @Override diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java index 999c348b57580..50a6b7ee31d36 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java @@ -19,8 +19,14 @@ package org.elasticsearch.action.search; +import org.elasticsearch.Version; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import 
org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.text.Text; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContent; @@ -31,6 +37,7 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchHitsTests; +import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.aggregations.AggregationsTests; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.internal.InternalSearchResponse; @@ -40,14 +47,17 @@ import org.elasticsearch.search.suggest.SuggestTests; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalAggregationTestCase; +import org.elasticsearch.test.VersionUtils; import org.junit.After; import org.junit.Before; import java.io.IOException; import java.util.ArrayList; +import java.util.Base64; import java.util.Collections; import java.util.List; +import static java.util.Collections.emptyList; import static java.util.Collections.singletonMap; import static org.elasticsearch.test.XContentTestUtils.insertRandomFields; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent; @@ -61,6 +71,8 @@ public class SearchResponseTests extends ESTestCase { xContentRegistry = new NamedXContentRegistry(namedXContents); } + private final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry( + new SearchModule(Settings.EMPTY, false, emptyList()).getNamedWriteables()); private AggregationsTests aggregationsTests = new AggregationsTests(); @Before @@ -112,8 +124,16 @@ private SearchResponse createTestItem(boolean minimal, ShardSearchFailure... 
sha } else { internalSearchResponse = InternalSearchResponse.empty(); } + return new SearchResponse(internalSearchResponse, null, totalShards, successfulShards, skippedShards, tookInMillis, - shardSearchFailures); + shardSearchFailures, randomBoolean() ? randomClusters() : SearchResponse.Clusters.EMPTY); + } + + private static SearchResponse.Clusters randomClusters() { + int totalClusters = randomIntBetween(0, 10); + int successfulClusters = randomIntBetween(0, totalClusters); + int skippedClusters = totalClusters - successfulClusters; + return new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters); } /** @@ -193,29 +213,104 @@ public void testToXContent() { SearchHit hit = new SearchHit(1, "id1", new Text("type"), Collections.emptyMap()); hit.score(2.0f); SearchHit[] hits = new SearchHit[] { hit }; - SearchResponse response = new SearchResponse( - new InternalSearchResponse(new SearchHits(hits, 100, 1.5f), null, null, null, false, null, 1), null, 0, 0, 0, 0, - new ShardSearchFailure[0]); - StringBuilder expectedString = new StringBuilder(); - expectedString.append("{"); { - expectedString.append("\"took\":0,"); - expectedString.append("\"timed_out\":false,"); - expectedString.append("\"_shards\":"); + SearchResponse response = new SearchResponse( + new InternalSearchResponse(new SearchHits(hits, 100, 1.5f), null, null, null, false, null, 1), null, 0, 0, 0, 0, + ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY); + StringBuilder expectedString = new StringBuilder(); + expectedString.append("{"); { - expectedString.append("{\"total\":0,"); - expectedString.append("\"successful\":0,"); - expectedString.append("\"skipped\":0,"); - expectedString.append("\"failed\":0},"); + expectedString.append("\"took\":0,"); + expectedString.append("\"timed_out\":false,"); + expectedString.append("\"_shards\":"); + { + expectedString.append("{\"total\":0,"); + expectedString.append("\"successful\":0,"); + expectedString.append("\"skipped\":0,"); 
+ expectedString.append("\"failed\":0},"); + } + expectedString.append("\"hits\":"); + { + expectedString.append("{\"total\":100,"); + expectedString.append("\"max_score\":1.5,"); + expectedString.append("\"hits\":[{\"_type\":\"type\",\"_id\":\"id1\",\"_score\":2.0}]}"); + } } - expectedString.append("\"hits\":"); + expectedString.append("}"); + assertEquals(expectedString.toString(), Strings.toString(response)); + } + { + SearchResponse response = new SearchResponse( + new InternalSearchResponse(new SearchHits(hits, 100, 1.5f), null, null, null, false, null, 1), null, 0, 0, 0, 0, + ShardSearchFailure.EMPTY_ARRAY, new SearchResponse.Clusters(5, 3, 2)); + StringBuilder expectedString = new StringBuilder(); + expectedString.append("{"); { - expectedString.append("{\"total\":100,"); - expectedString.append("\"max_score\":1.5,"); - expectedString.append("\"hits\":[{\"_type\":\"type\",\"_id\":\"id1\",\"_score\":2.0}]}"); + expectedString.append("\"took\":0,"); + expectedString.append("\"timed_out\":false,"); + expectedString.append("\"_shards\":"); + { + expectedString.append("{\"total\":0,"); + expectedString.append("\"successful\":0,"); + expectedString.append("\"skipped\":0,"); + expectedString.append("\"failed\":0},"); + } + expectedString.append("\"_clusters\":"); + { + expectedString.append("{\"total\":5,"); + expectedString.append("\"successful\":3,"); + expectedString.append("\"skipped\":2},"); + } + expectedString.append("\"hits\":"); + { + expectedString.append("{\"total\":100,"); + expectedString.append("\"max_score\":1.5,"); + expectedString.append("\"hits\":[{\"_type\":\"type\",\"_id\":\"id1\",\"_score\":2.0}]}"); + } + } + expectedString.append("}"); + assertEquals(expectedString.toString(), Strings.toString(response)); + } + } + + public void testSerialization() throws IOException { + SearchResponse searchResponse = createTestItem(false); + BytesStreamOutput bytesStreamOutput = new BytesStreamOutput(); + searchResponse.writeTo(bytesStreamOutput); + try 
(StreamInput in = new NamedWriteableAwareStreamInput( + StreamInput.wrap(bytesStreamOutput.bytes().toBytesRef().bytes), namedWriteableRegistry)) { + SearchResponse serialized = new SearchResponse(); + serialized.readFrom(in); + assertEquals(searchResponse.getHits().totalHits, serialized.getHits().totalHits); + assertEquals(searchResponse.getHits().getHits().length, serialized.getHits().getHits().length); + assertEquals(searchResponse.getNumReducePhases(), serialized.getNumReducePhases()); + assertEquals(searchResponse.getFailedShards(), serialized.getFailedShards()); + assertEquals(searchResponse.getTotalShards(), serialized.getTotalShards()); + assertEquals(searchResponse.getSkippedShards(), serialized.getSkippedShards()); + assertEquals(searchResponse.getClusters(), serialized.getClusters()); + } + } + + public void testSerializationBwc() throws IOException { + final byte[] data = Base64.getDecoder().decode("AAAAAAAAAAAAAgABBQUAAAoAAAAAAAAA"); + final Version version = VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, Version.V_6_0_0_rc2); + try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(data), namedWriteableRegistry)) { + in.setVersion(version); + SearchResponse deserialized = new SearchResponse(); + deserialized.readFrom(in); + assertSame(SearchResponse.Clusters.EMPTY, deserialized.getClusters()); + + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(version); + deserialized.writeTo(out); + try (StreamInput in2 = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytesRef().bytes), + namedWriteableRegistry)) { + in2.setVersion(version); + SearchResponse deserialized2 = new SearchResponse(); + deserialized2.readFrom(in2); + assertSame(SearchResponse.Clusters.EMPTY, deserialized2.getClusters()); + } } } - expectedString.append("}"); - assertEquals(expectedString.toString(), Strings.toString(response)); } } diff --git 
a/core/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/core/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index a275129e7610d..902a8ad97a0c4 100644 --- a/core/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -238,7 +238,43 @@ public void testProcessRemoteShards() throws IOException { assertEquals(new MatchAllQueryBuilder(), remoteAliases.get("bar_id").getQueryBuilder()); assertNull(remoteAliases.get("xyz_id").getQueryBuilder()); } + } + public void testBuildClusters() { + OriginalIndices localIndices = randomOriginalIndices(); + Map remoteIndices = new HashMap<>(); + Map searchShardsResponses = new HashMap<>(); + int numRemoteClusters = randomIntBetween(0, 10); + boolean onlySuccessful = randomBoolean(); + int localClusters = localIndices.indices().length == 0 ? 0 : 1; + int total = numRemoteClusters + localClusters; + int successful = localClusters; + int skipped = 0; + for (int i = 0; i < numRemoteClusters; i++) { + String cluster = randomAlphaOfLengthBetween(5, 10); + remoteIndices.put(cluster, randomOriginalIndices()); + if (onlySuccessful || randomBoolean()) { + //whatever response counts as successful as long as it's not the empty placeholder + searchShardsResponses.put(cluster, new ClusterSearchShardsResponse()); + successful++; + } else { + searchShardsResponses.put(cluster, ClusterSearchShardsResponse.EMPTY); + skipped++; + } + } + SearchResponse.Clusters clusters = TransportSearchAction.buildClusters(localIndices, remoteIndices, searchShardsResponses); + assertEquals(total, clusters.getTotal()); + assertEquals(successful, clusters.getSuccessful()); + assertEquals(skipped, clusters.getSkipped()); } + private static OriginalIndices randomOriginalIndices() { + int numLocalIndices = randomIntBetween(0, 5); + String[] localIndices = new String[numLocalIndices]; + for (int i = 0; 
i < numLocalIndices; i++) { + localIndices[i] = randomAlphaOfLengthBetween(3, 10); + } + return new OriginalIndices(localIndices, IndicesOptions.fromOptions(randomBoolean(), + randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())); + } } diff --git a/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 856385531d7ec..60d80f72b0cbd 100644 --- a/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -53,10 +53,10 @@ import org.elasticsearch.http.HttpInfo; import org.elasticsearch.mocksocket.MockServerSocket; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.hamcrest.Matchers; import java.io.IOException; import java.net.InetAddress; @@ -65,6 +65,7 @@ import java.net.Socket; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; import java.util.List; import java.util.concurrent.BrokenBarrierException; @@ -80,6 +81,8 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.startsWith; public class RemoteClusterConnectionTests extends ESTestCase { @@ -162,14 +165,14 @@ public void testDiscoverSingleNode() throws Exception { public void testDiscoverSingleNodeWithIncompatibleSeed() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); - MockTransportService incomaptibleTransport = 
startTransport("incompat_seed_node", knownNodes, Version.fromString("2.0.0")); + MockTransportService incompatibleTransport = startTransport("incompat_seed_node", knownNodes, Version.fromString("2.0.0")); MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) { DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode(); - DiscoveryNode incompatibleSeedNode = incomaptibleTransport.getLocalDiscoNode(); + DiscoveryNode incompatibleSeedNode = incompatibleTransport.getLocalDiscoNode(); knownNodes.add(seedTransport.getLocalDiscoNode()); knownNodes.add(discoverableTransport.getLocalDiscoNode()); - knownNodes.add(incomaptibleTransport.getLocalDiscoNode()); + knownNodes.add(incompatibleTransport.getLocalDiscoNode()); Collections.shuffle(knownNodes, random()); List seedNodes = Arrays.asList(incompatibleSeedNode, seedNode); Collections.shuffle(seedNodes, random()); @@ -366,29 +369,24 @@ public void testFetchShards() throws Exception { try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { service.start(); service.acceptIncomingRequests(); + List nodes = Collections.singletonList(seedNode); try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", - Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + nodes, service, Integer.MAX_VALUE, n -> true)) { if (randomBoolean()) { - updateSeedNodes(connection, Arrays.asList(seedNode)); + updateSeedNodes(connection, nodes); + } + if (randomBoolean()) { + connection.updateSkipUnavailable(randomBoolean()); } - SearchRequest request = new SearchRequest("test-index"); CountDownLatch responseLatch = new CountDownLatch(1); AtomicReference reference = new AtomicReference<>(); AtomicReference failReference = new AtomicReference<>(); - ActionListener shardsListener = 
ActionListener.wrap( - x -> { - reference.set(x); - responseLatch.countDown(); - }, - x -> { - failReference.set(x); - responseLatch.countDown(); - }); - ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(new String[]{"test-index"}) + ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) .routing(request.routing()); - connection.fetchSearchShards(searchShardsRequest, shardsListener); + connection.fetchSearchShards(searchShardsRequest, + new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch)); responseLatch.await(); assertNull(failReference.get()); assertNotNull(reference.get()); @@ -400,6 +398,104 @@ public void testFetchShards() throws Exception { } } + public void testFetchShardsSkipUnavailable() throws Exception { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + knownNodes.add(seedNode); + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster", + Collections.singletonList(seedNode), service, Integer.MAX_VALUE, n -> true)) { + + SearchRequest request = new SearchRequest("test-index"); + ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest("test-index") + .indicesOptions(request.indicesOptions()).local(true).preference(request.preference()) + .routing(request.routing()); + { + CountDownLatch responseLatch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + AtomicReference failReference = new AtomicReference<>(); + 
connection.fetchSearchShards(searchShardsRequest, + new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch)); + assertTrue(responseLatch.await(1, TimeUnit.SECONDS)); + assertNull(failReference.get()); + assertNotNull(reference.get()); + ClusterSearchShardsResponse response = reference.get(); + assertTrue(response != ClusterSearchShardsResponse.EMPTY); + assertEquals(knownNodes, Arrays.asList(response.getNodes())); + } + + CountDownLatch disconnectedLatch = new CountDownLatch(1); + service.addConnectionListener(new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + if (node.equals(seedNode)) { + disconnectedLatch.countDown(); + } + } + }); + + service.addFailToSendNoConnectRule(seedTransport); + + if (randomBoolean()) { + connection.updateSkipUnavailable(false); + } + { + CountDownLatch responseLatch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + AtomicReference failReference = new AtomicReference<>(); + connection.fetchSearchShards(searchShardsRequest, + new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch)); + assertTrue(responseLatch.await(1, TimeUnit.SECONDS)); + assertNotNull(failReference.get()); + assertNull(reference.get()); + assertThat(failReference.get(), instanceOf(TransportException.class)); + } + + connection.updateSkipUnavailable(true); + { + CountDownLatch responseLatch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + AtomicReference failReference = new AtomicReference<>(); + connection.fetchSearchShards(searchShardsRequest, + new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch)); + assertTrue(responseLatch.await(1, TimeUnit.SECONDS)); + assertNull(failReference.get()); + assertNotNull(reference.get()); + ClusterSearchShardsResponse response = reference.get(); + assertTrue(response == 
ClusterSearchShardsResponse.EMPTY); + } + + //give transport service enough time to realize that the node is down, and to notify the connection listeners + //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next + assertTrue(disconnectedLatch.await(1, TimeUnit.SECONDS)); + + if (randomBoolean()) { + connection.updateSkipUnavailable(false); + } + + service.clearAllRules(); + //check that we reconnect once the node is back up + { + CountDownLatch responseLatch = new CountDownLatch(1); + AtomicReference reference = new AtomicReference<>(); + AtomicReference failReference = new AtomicReference<>(); + connection.fetchSearchShards(searchShardsRequest, + new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch)); + assertTrue(responseLatch.await(1, TimeUnit.SECONDS)); + assertNull(failReference.get()); + assertNotNull(reference.get()); + ClusterSearchShardsResponse response = reference.get(); + assertTrue(response != ClusterSearchShardsResponse.EMPTY); + assertEquals(knownNodes, Arrays.asList(response.getNodes())); + } + } + } + } + } + public void testTriggerUpdatesConcurrently() throws IOException, InterruptedException { List knownNodes = new CopyOnWriteArrayList<>(); try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT); @@ -621,53 +717,53 @@ public void testRemoteConnectionInfo() throws IOException { RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), - 4, 3, TimeValue.timeValueMinutes(30)); + 4, 3, TimeValue.timeValueMinutes(30), false); assertSerialization(stats); RemoteConnectionInfo stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), - 
4, 4, TimeValue.timeValueMinutes(30)); + 4, 4, TimeValue.timeValueMinutes(30), true); assertSerialization(stats1); assertNotEquals(stats, stats1); stats1 = new RemoteConnectionInfo("test_cluster_1", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), - 4, 3, TimeValue.timeValueMinutes(30)); + 4, 3, TimeValue.timeValueMinutes(30), false); assertSerialization(stats1); assertNotEquals(stats, stats1); stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 15)), Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), - 4, 3, TimeValue.timeValueMinutes(30)); + 4, 3, TimeValue.timeValueMinutes(30), false); assertSerialization(stats1); assertNotEquals(stats, stats1); stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 87)), - 4, 3, TimeValue.timeValueMinutes(30)); + 4, 3, TimeValue.timeValueMinutes(30), true); assertSerialization(stats1); assertNotEquals(stats, stats1); stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), - 4, 3, TimeValue.timeValueMinutes(325)); + 4, 3, TimeValue.timeValueMinutes(325), true); assertSerialization(stats1); assertNotEquals(stats, stats1); stats1 = new RemoteConnectionInfo("test_cluster", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), - 5, 3, TimeValue.timeValueMinutes(30)); + 5, 3, TimeValue.timeValueMinutes(30), false); assertSerialization(stats1); assertNotEquals(stats, stats1); } - private RemoteConnectionInfo assertSerialization(RemoteConnectionInfo info) throws IOException { + private static 
RemoteConnectionInfo assertSerialization(RemoteConnectionInfo info) throws IOException { try (BytesStreamOutput out = new BytesStreamOutput()) { out.setVersion(Version.CURRENT); info.writeTo(out); @@ -680,31 +776,59 @@ private RemoteConnectionInfo assertSerialization(RemoteConnectionInfo info) thro } } + public void testRemoteConnectionInfoBwComp() throws IOException { + final Version version = VersionUtils.randomVersionBetween(random(), Version.V_5_6_0, Version.V_6_0_0_rc2); + RemoteConnectionInfo expected = new RemoteConnectionInfo("test_cluster", + Collections.singletonList(new TransportAddress(TransportAddress.META_ADDRESS, 1)), + Collections.singletonList(new TransportAddress(TransportAddress.META_ADDRESS, 80)), + 4, 4, new TimeValue(30, TimeUnit.MINUTES), false); + + String encoded = "AQQAAAAABzAuMC4wLjAAAAABAQQAAAAABzAuMC4wLjAAAABQBDwEBAx0ZXN0X2NsdXN0ZXIAAAAAAAAAAAAAAA=="; + final byte[] data = Base64.getDecoder().decode(encoded); + + try (StreamInput in = StreamInput.wrap(data)) { + in.setVersion(version); + RemoteConnectionInfo deserialized = new RemoteConnectionInfo(in); + assertEquals(expected, deserialized); + + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.setVersion(version); + deserialized.writeTo(out); + try (StreamInput in2 = StreamInput.wrap(out.bytes().toBytesRef().bytes)) { + in2.setVersion(version); + RemoteConnectionInfo deserialized2 = new RemoteConnectionInfo(in2); + assertEquals(expected, deserialized2); + } + } + } + } + public void testRenderConnectionInfoXContent() throws IOException { RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1)), Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,80)), - 4, 3, TimeValue.timeValueMinutes(30)); + 4, 3, TimeValue.timeValueMinutes(30), true); stats = assertSerialization(stats); XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); stats.toXContent(builder, 
null); builder.endObject(); assertEquals("{\"test_cluster\":{\"seeds\":[\"0.0.0.0:1\"],\"http_addresses\":[\"0.0.0.0:80\"],\"connected\":true," + - "\"num_nodes_connected\":3,\"max_connections_per_cluster\":4,\"initial_connect_timeout\":\"30m\"}}", builder.string()); + "\"num_nodes_connected\":3,\"max_connections_per_cluster\":4,\"initial_connect_timeout\":\"30m\"," + + "\"skip_unavailable\":true}}", builder.string()); stats = new RemoteConnectionInfo("some_other_cluster", Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1), new TransportAddress(TransportAddress.META_ADDRESS,2)), Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,80), new TransportAddress(TransportAddress.META_ADDRESS,81)), - 2, 0, TimeValue.timeValueSeconds(30)); + 2, 0, TimeValue.timeValueSeconds(30), false); stats = assertSerialization(stats); builder = XContentFactory.jsonBuilder(); builder.startObject(); stats.toXContent(builder, null); builder.endObject(); assertEquals("{\"some_other_cluster\":{\"seeds\":[\"0.0.0.0:1\",\"0.0.0.0:2\"],\"http_addresses\":[\"0.0.0.0:80\",\"0.0.0.0:81\"]," - + "\"connected\":false,\"num_nodes_connected\":0,\"max_connections_per_cluster\":2,\"initial_connect_timeout\":\"30s\"}}", - builder.string()); + + "\"connected\":false,\"num_nodes_connected\":0,\"max_connections_per_cluster\":2,\"initial_connect_timeout\":\"30s\"," + + "\"skip_unavailable\":false}}", builder.string()); } private RemoteConnectionInfo getRemoteConnectionInfo(RemoteClusterConnection connection) throws Exception { @@ -950,11 +1074,10 @@ public void testClusterNameIsChecked() throws Exception { IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, () -> updateSeedNodes(connection, Arrays.asList(otherClusterTransport.getLocalDiscoNode()))); assertThat(illegalStateException.getMessage(), - Matchers.startsWith("handshake failed, mismatched cluster name [Cluster [otherCluster]]" + + startsWith("handshake failed, mismatched cluster name 
[Cluster [otherCluster]]" + " - {other_cluster_discoverable_node}")); } } } } - } diff --git a/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 8e0c039176207..91a245dc5491d 100644 --- a/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -21,7 +21,12 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.AbstractScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; @@ -29,22 +34,29 @@ import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.hamcrest.Matchers; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.anyOf; 
+import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.startsWith; + public class RemoteClusterServiceTests extends ESTestCase { private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @@ -70,6 +82,7 @@ private MockTransportService startTransport( public void testSettingsAreRegistered() { assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_NODE_ATTRIBUTE)); @@ -147,7 +160,7 @@ public void testGroupClusterIndices() throws IOException { IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar", - "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, i -> "cluster_1:bar".equals(i))); + "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, "cluster_1:bar"::equals)); assertEquals("Can not filter indices; index cluster_1:bar exists but there is also a remote cluster named:" + " cluster_1", iae.getMessage()); @@ -414,7 +427,7 @@ public void onFailure(Exception e) { failLatch.await(); assertNotNull(ex.get()); if (ex.get() instanceof IllegalStateException) { - assertThat(ex.get().getMessage(), Matchers.anyOf(Matchers.equalTo("no seed node left"), Matchers.startsWith + assertThat(ex.get().getMessage(), anyOf(equalTo("no seed node left"), startsWith ("No node available for cluster:"))); } else { if (ex.get() instanceof TransportException == false) { @@ -429,4 +442,192 @@ public void onFailure(Exception e) { } } } 
+ + public void testCollectSearchShards() throws Exception { + int numClusters = randomIntBetween(2, 10); + MockTransportService[] mockTransportServices = new MockTransportService[numClusters]; + DiscoveryNode[] nodes = new DiscoveryNode[numClusters]; + Map remoteIndicesByCluster = new HashMap<>(); + Settings.Builder builder = Settings.builder(); + for (int i = 0; i < numClusters; i++) { + List knownNodes = new CopyOnWriteArrayList<>(); + MockTransportService remoteSeedTransport = startTransport("node_remote" + i, knownNodes, Version.CURRENT); + mockTransportServices[i] = remoteSeedTransport; + DiscoveryNode remoteSeedNode = remoteSeedTransport.getLocalDiscoNode(); + knownNodes.add(remoteSeedNode); + nodes[i] = remoteSeedNode; + builder.put("search.remote.remote" + i + ".seeds", remoteSeedNode.getAddress().toString()); + remoteIndicesByCluster.put("remote" + i, new OriginalIndices(new String[]{"index"}, IndicesOptions.lenientExpandOpen())); + } + Settings settings = builder.build(); + + try { + try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) { + service.start(); + service.acceptIncomingRequests(); + try (RemoteClusterService remoteClusterService = new RemoteClusterService(settings, service)) { + assertFalse(remoteClusterService.isCrossClusterSearchEnabled()); + remoteClusterService.initializeRemoteClusters(); + assertTrue(remoteClusterService.isCrossClusterSearchEnabled()); + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicReference> response = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster, + new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); + assertTrue(latch.await(1, TimeUnit.SECONDS)); + assertNull(failure.get()); + assertNotNull(response.get()); + Map map = response.get(); + 
assertEquals(numClusters, map.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + assertTrue(map.containsKey(clusterAlias)); + ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); + assertEquals(1, shardsResponse.getNodes().length); + } + } + + int numDisconnectedClusters = randomIntBetween(1, numClusters); + Set disconnectedNodes = new HashSet<>(numDisconnectedClusters); + Set disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters); + while(disconnectedNodes.size() < numDisconnectedClusters) { + int i = randomIntBetween(0, numClusters - 1); + if (disconnectedNodes.add(nodes[i])) { + assertTrue(disconnectedNodesIndices.add(i)); + } + } + + CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); + service.addConnectionListener(new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node) { + if (disconnectedNodes.remove(node)) { + disconnectedLatch.countDown(); + } + } + }); + + for (DiscoveryNode disconnectedNode : disconnectedNodes) { + service.addFailToSendNoConnectRule(disconnectedNode.getAddress()); + } + + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicReference> response = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster, + new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); + assertTrue(latch.await(1, TimeUnit.SECONDS)); + assertNull(response.get()); + assertNotNull(failure.get()); + assertThat(failure.get(), instanceOf(TransportException.class)); + assertThat(failure.get().getMessage(), containsString("unable to communicate with remote cluster")); + } + + //setting skip_unavailable to true for all the disconnected clusters will make the request succeed again + for (int i : disconnectedNodesIndices) { + 
remoteClusterService.updateSkipUnavailable("remote" + i, true); + } + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicReference> response = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster, + new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); + assertTrue(latch.await(1, TimeUnit.SECONDS)); + assertNull(failure.get()); + assertNotNull(response.get()); + Map map = response.get(); + assertEquals(numClusters, map.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + assertTrue(map.containsKey(clusterAlias)); + ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); + if (disconnectedNodesIndices.contains(i)) { + assertTrue(shardsResponse == ClusterSearchShardsResponse.EMPTY); + } else { + assertTrue(shardsResponse != ClusterSearchShardsResponse.EMPTY); + } + } + } + + //give transport service enough time to realize that the node is down, and to notify the connection listeners + //so that RemoteClusterConnection is left with no connected nodes, hence it will retry connecting next + assertTrue(disconnectedLatch.await(1, TimeUnit.SECONDS)); + + service.clearAllRules(); + if (randomBoolean()) { + for (int i : disconnectedNodesIndices) { + if (randomBoolean()) { + remoteClusterService.updateSkipUnavailable("remote" + i, true); + } + + } + } + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicReference> response = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), null, null, remoteIndicesByCluster, + new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); + assertTrue(latch.await(1, TimeUnit.SECONDS)); + assertNull(failure.get()); + assertNotNull(response.get()); + Map map = 
response.get(); + assertEquals(numClusters, map.size()); + for (int i = 0; i < numClusters; i++) { + String clusterAlias = "remote" + i; + assertTrue(map.containsKey(clusterAlias)); + ClusterSearchShardsResponse shardsResponse = map.get(clusterAlias); + assertTrue(shardsResponse != ClusterSearchShardsResponse.EMPTY); + } + } + } + } + } finally { + for (MockTransportService mockTransportService : mockTransportServices) { + mockTransportService.close(); + } + } + } + + public void testRemoteClusterSkipIfDisconnectedSetting() { + { + Settings settings = Settings.builder() + .put("search.remote.foo.skip_unavailable", true) + .put("search.remote.bar.skip_unavailable", false).build(); + RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings)); + } + { + Settings brokenSettings = Settings.builder() + .put("search.remote.foo.skip_unavailable", "broken").build(); + expectThrows(IllegalArgumentException.class, () -> + RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getAllConcreteSettings(brokenSettings) + .forEach(setting -> setting.get(brokenSettings))); + } + + AbstractScopedSettings service = new ClusterSettings(Settings.EMPTY, + new HashSet<>(Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, + RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE))); + { + Settings settings = Settings.builder().put("search.remote.foo.skip_unavailable", randomBoolean()).build(); + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> service.validate(settings, true)); + assertEquals("Missing required setting [search.remote.foo.seeds] for setting [search.remote.foo.skip_unavailable]", + iae.getMessage()); + } + { + try (MockTransportService remoteSeedTransport = startTransport("seed", new CopyOnWriteArrayList<>(), Version.CURRENT)) { + String seed = remoteSeedTransport.getLocalDiscoNode().getAddress().toString(); + 
service.validate(Settings.builder().put("search.remote.foo.skip_unavailable", randomBoolean()) + .put("search.remote.foo.seeds", seed).build(), true); + service.validate(Settings.builder().put("search.remote.foo.seeds", seed).build(), true); + + AbstractScopedSettings service2 = new ClusterSettings(Settings.builder().put("search.remote.foo.seeds", seed).build(), + new HashSet<>(Arrays.asList(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS, + RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE))); + service2.validate(Settings.builder().put("search.remote.foo.skip_unavailable", randomBoolean()).build(), false); + } + } + } } diff --git a/docs/build.gradle b/docs/build.gradle index 4cb82b97152f8..6084561833a48 100644 --- a/docs/build.gradle +++ b/docs/build.gradle @@ -110,9 +110,10 @@ buildRestTests.setups['host'] = ''' - set: { master_node: master } - do: nodes.info: - metric: [ http ] + metric: [ http, transport ] - is_true: nodes.$master.http.publish_address - set: {nodes.$master.http.publish_address: host} + - set: {nodes.$master.transport.publish_address: transport_host} ''' buildRestTests.setups['node'] = ''' diff --git a/docs/reference/cluster/remote-info.asciidoc b/docs/reference/cluster/remote-info.asciidoc index 304e7c8de0b4c..d044f4dcad221 100644 --- a/docs/reference/cluster/remote-info.asciidoc +++ b/docs/reference/cluster/remote-info.asciidoc @@ -33,3 +33,7 @@ the configured remote cluster alias. `initial_connect_timeout`:: The initial connect timeout for remote cluster connections. + +`skip_unavailable`:: + Whether the remote cluster is skipped in case it is searched through + a cross cluster search request but none of its nodes are available. 
diff --git a/docs/reference/modules/cross-cluster-search.asciidoc b/docs/reference/modules/cross-cluster-search.asciidoc index 315149941f444..6bdbffdfe88ee 100644 --- a/docs/reference/modules/cross-cluster-search.asciidoc +++ b/docs/reference/modules/cross-cluster-search.asciidoc @@ -69,6 +69,11 @@ PUT _cluster/settings "seeds": [ "127.0.0.1:9301" ] + }, + "cluster_three": { + "seeds": [ + "127.0.0.1:9302" + ] } } } @@ -76,6 +81,46 @@ PUT _cluster/settings } -------------------------------- // CONSOLE +// TEST[setup:host] +// TEST[s/127.0.0.1:9300/\${transport_host}/] + +////////////////////////// + +We want to be sure that settings have been updated, +because we'll use them later. + +[source,js] +-------------------------------------------------- +{ + "acknowledged" : true, + "persistent": { + "search": { + "remote": { + "cluster_one": { + "seeds": [ + "127.0.0.1:9300" + ] + }, + "cluster_two": { + "seeds": [ + "127.0.0.1:9301" + ] + }, + "cluster_three": { + "seeds": [ + "127.0.0.1:9302" + ] + } + } + } + }, + "transient" : {} +} +-------------------------------------------------- +// TESTRESPONSE[s/127.0.0.1:9300/\${transport_host}/] + +////////////////////////// + A remote cluster can be deleted from the cluster settings by setting its seeds to `null`: @@ -86,7 +131,7 @@ PUT _cluster/settings "persistent": { "search": { "remote": { - "cluster_one": { + "cluster_three": { "seeds": null <1> } } @@ -95,41 +140,104 @@ PUT _cluster/settings } -------------------------------- // CONSOLE -<1> `cluster_one` would be removed from the cluster settings, leaving `cluster_two` intact. +// TEST[continued] +<1> `cluster_three` would be removed from the cluster settings, leaving `cluster_one` and `cluster_two` intact. + +////////////////////////// + +We want to be sure that settings have been updated, +because we'll use them later. 
+ +[source,js] +-------------------------------------------------- +{ + "acknowledged" : true, + "persistent" : {}, + "transient" : {} +} +-------------------------------------------------- +// TESTRESPONSE +////////////////////////// [float] === Using cross cluster search -To search the `twitter` index on remote cluster `cluster_1` the index name must be prefixed with the cluster alias -separated by a `:` character: +To search the `twitter` index on remote cluster `cluster_one` the index name +must be prefixed with the cluster alias separated by a `:` character: [source,js] -------------------------------------------------- -POST /cluster_one:twitter/tweet/_search +GET /cluster_one:twitter/tweet/_search { "query": { - "match_all": {} + "match": { + "user": "kimchy" + } } } -------------------------------------------------- // CONSOLE -// TEST[skip:we don't have two clusters set up during docs testing] +// TEST[continued] +// TEST[setup:twitter] + +[source,js] +-------------------------------------------------- +{ + "took": 150, + "timed_out": false, + "_shards": { + "total": 1, + "successful": 1, + "failed": 0, + "skipped": 0 + }, + "_clusters": { + "total": 1, + "successful": 1, + "skipped": 0 + }, + "hits": { + "total": 1, + "max_score": 1, + "hits": [ + { + "_index": "cluster_one:twitter", + "_type": "tweet", + "_id": "0", + "_score": 1, + "_source": { + "user": "kimchy", + "date": "2009-11-15T14:12:12", + "message": "trying out Elasticsearch", + "likes": 0 + } + } + ] + } +} +-------------------------------------------------- +// TESTRESPONSE[s/"took": 150/"took": "$body.took"/] +// TESTRESPONSE[s/"max_score": 1/"max_score": "$body.hits.max_score"/] +// TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/] + In contrast to the `tribe` feature cross cluster search can also search indices with the same name on different clusters: [source,js] -------------------------------------------------- -POST /cluster_one:twitter,twitter/tweet/_search +GET 
/cluster_one:twitter,twitter/tweet/_search { "query": { - "match_all": {} + "match": { + "user": "kimchy" + } } } -------------------------------------------------- // CONSOLE -// TEST[skip:we don't have two clusters set up during docs testing] +// TEST[continued] Search results are disambiguated the same way as the indices are disambiguated in the request. Even if index names are identical these indices will be treated as different indices when results are merged. All results retrieved from a @@ -139,44 +247,146 @@ will be prefixed with their remote cluster name: [source,js] -------------------------------------------------- { - "took" : 89, - "timed_out" : false, - "_shards" : { - "total" : 10, - "successful" : 10, - "failed" : 0 + "took": 150, + "timed_out": false, + "_shards": { + "total": 2, + "successful": 2, + "failed": 0, + "skipped": 0 }, - "hits" : { - "total" : 2, - "max_score" : 1.0, - "hits" : [ + "_clusters": { + "total": 2, + "successful": 2, + "skipped": 0 + }, + "hits": { + "total": 2, + "max_score": 1, + "hits": [ { - "_index" : "cluster_one:twitter", - "_type" : "tweet", - "_id" : "1", - "_score" : 1.0, - "_source" : { - "user" : "kimchy", - "postDate" : "2009-11-15T14:12:12", - "message" : "trying out Elasticsearch" + "_index": "cluster_one:twitter", + "_type": "tweet", + "_id": "0", + "_score": 1, + "_source": { + "user": "kimchy", + "date": "2009-11-15T14:12:12", + "message": "trying out Elasticsearch", + "likes": 0 } }, { - "_index" : "twitter", - "_type" : "tweet", - "_id" : "1", - "_score" : 1.0, - "_source" : { - "user" : "kimchy", - "postDate" : "2009-11-15T14:12:12", - "message" : "trying out Elasticsearch" + "_index": "twitter", + "_type": "tweet", + "_id": "0", + "_score": 2, + "_source": { + "user": "kimchy", + "date": "2009-11-15T14:12:12", + "message": "trying out Elasticsearch", + "likes": 0 } } ] } } -------------------------------------------------- -// TESTRESPONSE +// TESTRESPONSE[s/"took": 150/"took": "$body.took"/] +// 
TESTRESPONSE[s/"max_score": 1/"max_score": "$body.hits.max_score"/] +// TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/] +// TESTRESPONSE[s/"_score": 2/"_score": "$body.hits.hits.1._score"/] + +[float] +=== Skipping disconnected clusters + +By default all remote clusters that are searched via Cross Cluster Search need to be available when +the search request is executed, otherwise the whole request fails and no search results are returned +even if some of the clusters are available. Remote clusters can be made optional through the +boolean `skip_unavailable` setting, set to `false` by default. + +[source,js] +-------------------------------- +PUT _cluster/settings +{ + "persistent": { + "search.remote.cluster_two.skip_unavailable": true <1> + } +} +-------------------------------- +// CONSOLE +// TEST[continued] +<1> `cluster_two` is made optional + +[source,js] +-------------------------------------------------- +GET /cluster_one:twitter,cluster_two:twitter,twitter/tweet/_search <1> +{ + "query": { + "match": { + "user": "kimchy" + } + } +} +-------------------------------------------------- +// CONSOLE +// TEST[continued] +<1> Search against the `twitter` index in `cluster_one`, `cluster_two` and also locally + +[source,js] +-------------------------------------------------- +{ + "took": 150, + "timed_out": false, + "_shards": { + "total": 2, + "successful": 2, + "failed": 0, + "skipped": 0 + }, + "_clusters": { <1> + "total": 3, + "successful": 2, + "skipped": 1 + }, + "hits": { + "total": 2, + "max_score": 1, + "hits": [ + { + "_index": "cluster_one:twitter", + "_type": "tweet", + "_id": "0", + "_score": 1, + "_source": { + "user": "kimchy", + "date": "2009-11-15T14:12:12", + "message": "trying out Elasticsearch", + "likes": 0 + } + }, + { + "_index": "twitter", + "_type": "tweet", + "_id": "0", + "_score": 2, + "_source": { + "user": "kimchy", + "date": "2009-11-15T14:12:12", + "message": "trying out Elasticsearch", + "likes": 0 + } + } + ] + 
} +} +-------------------------------------------------- +// TESTRESPONSE[s/"took": 150/"took": "$body.took"/] +// TESTRESPONSE[s/"max_score": 1/"max_score": "$body.hits.max_score"/] +// TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/] +// TESTRESPONSE[s/"_score": 2/"_score": "$body.hits.hits.1._score"/] +<1> The `clusters` section indicates that one cluster was unavailable and got skipped + [float] [[cross-cluster-search-settings]] @@ -205,6 +415,14 @@ will be prefixed with their remote cluster name: remote clusters. Cross-cluster search requests must be sent to a node that is allowed to act as a cross-cluster client. +`search.remote.${cluster_alias}.skip_unavailable`:: + + Per cluster boolean setting that allows skipping specific clusters when no + nodes belonging to them are available and they are searched as part of a + cross cluster search request. Default is `false`, meaning that all clusters + are mandatory, but they can selectively be made optional by + setting `skip_unavailable` to `true`. 
+ [float] [[retrieve-remote-clusters-info]] === Retrieving remote clusters info diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index fe754b3881727..a13bdea0ef2f4 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -455,7 +455,8 @@ public ScheduledFuture schedule(TimeValue delay, String name, Runnable comman SearchHit hit = new SearchHit(0, "id", new Text("type"), emptyMap()); SearchHits hits = new SearchHits(new SearchHit[] { hit }, 0, 0); InternalSearchResponse internalResponse = new InternalSearchResponse(hits, null, null, null, false, false, 1); - SearchResponse searchResponse = new SearchResponse(internalResponse, scrollId(), 5, 4, 0, randomLong(), null); + SearchResponse searchResponse = new SearchResponse(internalResponse, scrollId(), 5, 4, 0, randomLong(), null, + SearchResponse.Clusters.EMPTY); if (randomBoolean()) { client.lastScroll.get().listener.onResponse(searchResponse); diff --git a/qa/ccs-unavailable-clusters/build.gradle b/qa/ccs-unavailable-clusters/build.gradle new file mode 100644 index 0000000000000..86d0cb64f65a2 --- /dev/null +++ b/qa/ccs-unavailable-clusters/build.gradle @@ -0,0 +1,25 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +apply plugin: 'elasticsearch.standalone-rest-test' +apply plugin: 'elasticsearch.rest-test' +apply plugin: 'elasticsearch.test-with-dependencies' + +dependencies { + testCompile project(path: ':client:rest-high-level', configuration: 'runtime') +} \ No newline at end of file diff --git a/qa/ccs-unavailable-clusters/src/test/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java b/qa/ccs-unavailable-clusters/src/test/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java new file mode 100644 index 0000000000000..4835c881e034c --- /dev/null +++ b/qa/ccs-unavailable-clusters/src/test/java/org/elasticsearch/search/CrossClusterSearchUnavailableClusterIT.java @@ -0,0 +1,319 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search; + +import org.apache.http.HttpEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.nio.entity.NStringEntity; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; +import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.AfterClass; +import org.junit.Before; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import 
java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.containsString; + +public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase { + + private static RestHighLevelClient restHighLevelClient; + + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Before + public void initHighLevelClient() throws IOException { + super.initClient(); + if (restHighLevelClient == null) { + restHighLevelClient = new HighLevelClient(client()); + } + } + + @AfterClass + public static void cleanupClient() throws IOException { + restHighLevelClient.close(); + restHighLevelClient = null; + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + private static MockTransportService startTransport( + final String id, + final List knownNodes, + final Version version, + final ThreadPool threadPool) { + boolean success = false; + final Settings s = Settings.builder().put("node.name", id).build(); + ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(s); + MockTransportService newService = MockTransportService.createNewService(s, version, threadPool, null); + try { + newService.registerRequestHandler(ClusterSearchShardsAction.NAME, ThreadPool.Names.SAME, ClusterSearchShardsRequest::new, + (request, channel) -> { + channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0], + knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap())); + }); + newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new, + (request, channel) -> { + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + for (DiscoveryNode node : knownNodes) { + builder.add(node); + } + ClusterState build = ClusterState.builder(clusterName).nodes(builder.build()).build(); + channel.sendResponse(new 
ClusterStateResponse(clusterName, build, 0L)); + }); + newService.start(); + newService.acceptIncomingRequests(); + success = true; + return newService; + } finally { + if (success == false) { + newService.close(); + } + } + } + + public void testSearchSkipUnavailable() throws IOException { + try (MockTransportService remoteTransport = startTransport("node0", new CopyOnWriteArrayList<>(), Version.CURRENT, threadPool)) { + DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode(); + + updateRemoteClusterSettings(Collections.singletonMap("seeds", remoteNode.getAddress().toString())); + + for (int i = 0; i < 10; i++) { + restHighLevelClient.index(new IndexRequest("index", "doc", String.valueOf(i)).source("field", "value")); + } + Response refreshResponse = client().performRequest("POST", "/index/_refresh"); + assertEquals(200, refreshResponse.getStatusLine().getStatusCode()); + + { + SearchResponse response = restHighLevelClient.search(new SearchRequest("index")); + assertSame(SearchResponse.Clusters.EMPTY, response.getClusters()); + assertEquals(10, response.getHits().totalHits); + assertEquals(10, response.getHits().getHits().length); + } + { + SearchResponse response = restHighLevelClient.search(new SearchRequest("index", "remote1:index")); + assertEquals(2, response.getClusters().getTotal()); + assertEquals(2, response.getClusters().getSuccessful()); + assertEquals(0, response.getClusters().getSkipped()); + assertEquals(10, response.getHits().totalHits); + assertEquals(10, response.getHits().getHits().length); + } + { + SearchResponse response = restHighLevelClient.search(new SearchRequest("remote1:index")); + assertEquals(1, response.getClusters().getTotal()); + assertEquals(1, response.getClusters().getSuccessful()); + assertEquals(0, response.getClusters().getSkipped()); + assertEquals(0, response.getHits().totalHits); + } + + { + SearchResponse response = restHighLevelClient.search(new SearchRequest("index", "remote1:index").scroll("1m")); + 
assertEquals(2, response.getClusters().getTotal()); + assertEquals(2, response.getClusters().getSuccessful()); + assertEquals(0, response.getClusters().getSkipped()); + assertEquals(10, response.getHits().totalHits); + assertEquals(10, response.getHits().getHits().length); + String scrollId = response.getScrollId(); + SearchResponse scrollResponse = restHighLevelClient.searchScroll(new SearchScrollRequest(scrollId)); + assertSame(SearchResponse.Clusters.EMPTY, scrollResponse.getClusters()); + assertEquals(10, scrollResponse.getHits().totalHits); + assertEquals(0, scrollResponse.getHits().getHits().length); + } + + remoteTransport.close(); + + updateRemoteClusterSettings(Collections.singletonMap("skip_unavailable", true)); + + { + SearchResponse response = restHighLevelClient.search(new SearchRequest("index", "remote1:index")); + assertEquals(2, response.getClusters().getTotal()); + assertEquals(1, response.getClusters().getSuccessful()); + assertEquals(1, response.getClusters().getSkipped()); + assertEquals(10, response.getHits().totalHits); + assertEquals(10, response.getHits().getHits().length); + } + { + SearchResponse response = restHighLevelClient.search(new SearchRequest("remote1:index")); + assertEquals(1, response.getClusters().getTotal()); + assertEquals(0, response.getClusters().getSuccessful()); + assertEquals(1, response.getClusters().getSkipped()); + assertEquals(0, response.getHits().totalHits); + } + + { + SearchResponse response = restHighLevelClient.search(new SearchRequest("index", "remote1:index").scroll("1m")); + assertEquals(2, response.getClusters().getTotal()); + assertEquals(1, response.getClusters().getSuccessful()); + assertEquals(1, response.getClusters().getSkipped()); + assertEquals(10, response.getHits().totalHits); + assertEquals(10, response.getHits().getHits().length); + String scrollId = response.getScrollId(); + SearchResponse scrollResponse = restHighLevelClient.searchScroll(new SearchScrollRequest(scrollId)); + 
assertSame(SearchResponse.Clusters.EMPTY, scrollResponse.getClusters()); + assertEquals(10, scrollResponse.getHits().totalHits); + assertEquals(0, scrollResponse.getHits().getHits().length); + } + + updateRemoteClusterSettings(Collections.singletonMap("skip_unavailable", false)); + assertSearchConnectFailure(); + + Map map = new HashMap<>(); + map.put("seeds", null); + map.put("skip_unavailable", null); + updateRemoteClusterSettings(map); + } + } + + public void testSkipUnavailableDependsOnSeeds() throws IOException { + try (MockTransportService remoteTransport = startTransport("node0", new CopyOnWriteArrayList<>(), Version.CURRENT, threadPool)) { + DiscoveryNode remoteNode = remoteTransport.getLocalDiscoNode(); + + { + //check that skip_unavailable alone cannot be set + HttpEntity clusterSettingsEntity = buildUpdateSettingsRequestBody( + Collections.singletonMap("skip_unavailable", randomBoolean())); + ResponseException responseException = expectThrows(ResponseException.class, + () -> client().performRequest("PUT", "/_cluster/settings", Collections.emptyMap(), clusterSettingsEntity)); + assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode()); + assertThat(responseException.getMessage(), + containsString("Missing required setting [search.remote.remote1.seeds] " + + "for setting [search.remote.remote1.skip_unavailable]")); + } + + Map settingsMap = new HashMap<>(); + settingsMap.put("seeds", remoteNode.getAddress().toString()); + settingsMap.put("skip_unavailable", randomBoolean()); + updateRemoteClusterSettings(settingsMap); + + { + //check that seeds cannot be reset alone if skip_unavailable is set + HttpEntity clusterSettingsEntity = buildUpdateSettingsRequestBody(Collections.singletonMap("seeds", null)); + ResponseException responseException = expectThrows(ResponseException.class, + () -> client().performRequest("PUT", "/_cluster/settings", Collections.emptyMap(), clusterSettingsEntity)); + assertEquals(400, 
responseException.getResponse().getStatusLine().getStatusCode()); + assertThat(responseException.getMessage(), containsString("Missing required setting [search.remote.remote1.seeds] " + + "for setting [search.remote.remote1.skip_unavailable]")); + } + + if (randomBoolean()) { + updateRemoteClusterSettings(Collections.singletonMap("skip_unavailable", null)); + updateRemoteClusterSettings(Collections.singletonMap("seeds", null)); + } else { + Map nullMap = new HashMap<>(); + nullMap.put("seeds", null); + nullMap.put("skip_unavailable", null); + updateRemoteClusterSettings(nullMap); + } + } + } + + private static void assertSearchConnectFailure() { + { + ElasticsearchException exception = expectThrows(ElasticsearchException.class, + () -> restHighLevelClient.search(new SearchRequest("index", "remote1:index"))); + ElasticsearchException rootCause = (ElasticsearchException)exception.getRootCause(); + assertThat(rootCause.getMessage(), containsString("connect_exception")); + } + { + ElasticsearchException exception = expectThrows(ElasticsearchException.class, + () -> restHighLevelClient.search(new SearchRequest("remote1:index"))); + ElasticsearchException rootCause = (ElasticsearchException)exception.getRootCause(); + assertThat(rootCause.getMessage(), containsString("connect_exception")); + } + { + ElasticsearchException exception = expectThrows(ElasticsearchException.class, + () -> restHighLevelClient.search(new SearchRequest("remote1:index").scroll("1m"))); + ElasticsearchException rootCause = (ElasticsearchException)exception.getRootCause(); + assertThat(rootCause.getMessage(), containsString("connect_exception")); + } + } + + + + private static void updateRemoteClusterSettings(Map settings) throws IOException { + HttpEntity clusterSettingsEntity = buildUpdateSettingsRequestBody(settings); + Response response = client().performRequest("PUT", "/_cluster/settings", Collections.emptyMap(), clusterSettingsEntity); + assertEquals(200, 
response.getStatusLine().getStatusCode()); + } + + private static HttpEntity buildUpdateSettingsRequestBody(Map settings) throws IOException { + String requestBody; + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + { + builder.startObject("persistent"); + { + builder.startObject("search.remote.remote1"); + { + for (Map.Entry entry : settings.entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + } + builder.endObject(); + } + builder.endObject(); + } + builder.endObject(); + requestBody = builder.string(); + } + return new NStringEntity(requestBody, ContentType.APPLICATION_JSON); + } + + private static class HighLevelClient extends RestHighLevelClient { + private HighLevelClient(RestClient restClient) { + super(restClient, (client) -> {}, Collections.emptyList()); + } + } +} diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml index ad1a9441231d1..95835785dffc6 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/20_info.yml @@ -56,3 +56,52 @@ - match: { my_remote_cluster.initial_connect_timeout: "30s" } - match: { test_remote_cluster.initial_connect_timeout: "30s" } +--- +"skip_unavailable is returned as part of _remote/info response": + - skip: + #TODO update versions once backported + version: " - 7.0.0" + reason: "skip_unavailable is only returned from 7.0.0 on" + + - do: + remote.info: {} + - is_false: my_remote_cluster.skip_unavailable + + - do: + cluster.put_settings: + body: + transient: + search.remote.my_remote_cluster.skip_unavailable: true + + - is_true: transient.search.remote.my_remote_cluster.skip_unavailable + + - do: + remote.info: {} + + - is_true: my_remote_cluster.skip_unavailable + + - do: + cluster.put_settings: + body: + 
transient: + search.remote.my_remote_cluster.skip_unavailable: false + + - is_false: transient.search.remote.my_remote_cluster.skip_unavailable + + - do: + remote.info: {} + + - is_false: my_remote_cluster.skip_unavailable + + - do: + cluster.put_settings: + body: + transient: + search.remote.my_remote_cluster.skip_unavailable: null + + - match: {transient: {}} + + - do: + remote.info: {} + + - is_false: my_remote_cluster.skip_unavailable diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml index aac5c438c323c..df266bb1be9fc 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml @@ -11,6 +11,9 @@ query: match_all: {} + - match: {_clusters.total: 1} + - match: {_clusters.successful: 1} + - match: {_clusters.skipped: 0} - set: {_scroll_id: scroll_id} - match: {hits.total: 6 } - length: {hits.hits: 4 } @@ -23,6 +26,7 @@ scroll: body: { "scroll_id": "$scroll_id", "scroll": "1m"} + - is_false: _clusters - match: {hits.total: 6 } - length: {hits.hits: 2 } - match: {hits.hits.0._source.filter_field: 1 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search/70_response_filtering.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search/70_response_filtering.yml index 71eb5665bea43..ef60d48258cd4 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search/70_response_filtering.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search/70_response_filtering.yml @@ -28,6 +28,7 @@ body: "{ \"query\": { \"match_all\": {} } }" - gte: { took: 0 } + - is_false: _clusters - is_true: _shards.total - is_true: hits.total - is_true: hits.hits.0._index diff --git a/settings.gradle b/settings.gradle index 3a4afaee2be15..e1b31d71b0cc7 100644 --- 
a/settings.gradle +++ b/settings.gradle @@ -66,6 +66,7 @@ List projects = [ 'plugins:jvm-example', 'plugins:store-smb', 'qa:auto-create-index', + 'qa:ccs-unavailable-clusters', 'qa:evil-tests', 'qa:full-cluster-restart', 'qa:integration-bwc',