Skip to content

Commit 3fba579

Browse files
committed
Cross Cluster Search: make remote clusters optional (#27182)
Today Cross Cluster Search requires at least one node in each remote cluster to be up once the cross cluster search is run. Otherwise the whole search request fails even though some of the data (local and/or remote) is available. This happens when performing the _search/shards calls to find out which remote shards the query has to be executed on. This scenario is different from shard failures that may happen later on when the query is actually executed, in case e.g. remote shards are missing, which is not going to fail the whole request but rather yield partial results, and the _shards section in the response will indicate that. This commit introduces a boolean setting per cluster called search.remote.$cluster_alias.skip_unavailable, set to false by default, which allows certain clusters to be skipped if they are down when trying to reach them through a cross cluster search request. By default all clusters are mandatory. Scroll requests support this setting too when they are first initiated (first search request with scroll parameter), but subsequent scroll rounds (_search/scroll endpoint) will fail if some of the remote clusters went down in the meantime. The search API response now contains a new _clusters section, similar to the _shards section, that gets returned whenever one or more clusters were disconnected and got skipped: "_clusters" : { "total" : 3, "successful" : 2, "skipped" : 1 } This section won't be part of the response if no clusters have been skipped. The per cluster skip_unavailable setting value has also been added to the output of the remote/info API.
1 parent 586cdb1 commit 3fba579

File tree

35 files changed

+1458
-164
lines changed

35 files changed

+1458
-164
lines changed

client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ protected void doExecute(SearchRequest request, ActionListener<SearchResponse> l
5353
new SearchHit[0], 0L, 0.0f),
5454
new InternalAggregations(Collections.emptyList()),
5555
new Suggest(Collections.emptyList()),
56-
new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, 0, new ShardSearchFailure[0]));
56+
new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY,
57+
SearchResponse.Clusters.EMPTY));
5758
}
5859
}

client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,8 @@ public void testInfo() throws IOException {
166166
public void testSearchScroll() throws IOException {
167167
Header[] headers = randomHeaders(random(), "Header");
168168
SearchResponse mockSearchResponse = new SearchResponse(new SearchResponseSections(SearchHits.empty(), InternalAggregations.EMPTY,
169-
null, false, false, null, 1), randomAlphaOfLengthBetween(5, 10), 5, 5, 0, 100, new ShardSearchFailure[0]);
169+
null, false, false, null, 1), randomAlphaOfLengthBetween(5, 10), 5, 5, 0, 100, ShardSearchFailure.EMPTY_ARRAY,
170+
SearchResponse.Clusters.EMPTY);
170171
mockResponse(mockSearchResponse);
171172
SearchResponse searchResponse = restHighLevelClient.searchScroll(new SearchScrollRequest(randomAlphaOfLengthBetween(5, 10)),
172173
headers);

client/rest-high-level/src/test/java/org/elasticsearch/client/SearchIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,5 +470,6 @@ private static void assertSearchHeader(SearchResponse searchResponse) {
470470
assertThat(searchResponse.getTotalShards(), greaterThan(0));
471471
assertEquals(searchResponse.getTotalShards(), searchResponse.getSuccessfulShards());
472472
assertEquals(0, searchResponse.getShardFailures().length);
473+
assertEquals(SearchResponse.Clusters.EMPTY, searchResponse.getClusters());
473474
}
474475
}

core/src/main/java/org/elasticsearch/action/admin/cluster/shards/ClusterSearchShardsResponse.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,15 @@
3030

3131
import java.io.IOException;
3232
import java.util.Arrays;
33+
import java.util.Collections;
3334
import java.util.HashMap;
3435
import java.util.Map;
3536

3637
public class ClusterSearchShardsResponse extends ActionResponse implements ToXContentObject {
3738

39+
public static final ClusterSearchShardsResponse EMPTY = new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
40+
new DiscoveryNode[0], Collections.emptyMap());
41+
3842
private ClusterSearchShardsGroup[] groups;
3943
private DiscoveryNode[] nodes;
4044
private Map<String, AliasFilter> indicesAndFilters;

core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,16 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
6868
private final AtomicInteger successfulOps = new AtomicInteger();
6969
private final AtomicInteger skippedOps = new AtomicInteger();
7070
private final TransportSearchAction.SearchTimeProvider timeProvider;
71-
71+
private final SearchResponse.Clusters clusters;
7272

7373
protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
7474
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
7575
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
7676
Executor executor, SearchRequest request,
7777
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
7878
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
79-
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentShardRequests) {
79+
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentShardRequests,
80+
SearchResponse.Clusters clusters) {
8081
super(name, request, shardsIts, logger, maxConcurrentShardRequests, executor);
8182
this.timeProvider = timeProvider;
8283
this.logger = logger;
@@ -90,6 +91,7 @@ protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportS
9091
this.concreteIndexBoosts = concreteIndexBoosts;
9192
this.aliasFilter = aliasFilter;
9293
this.results = resultConsumer;
94+
this.clusters = clusters;
9395
}
9496

9597
/**
@@ -108,7 +110,7 @@ public final void start() {
108110
//no search shards to search on, bail with empty response
109111
//(it happens with search across _all with no indices around and consistent with broadcast operations)
110112
listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, 0, buildTookInMillis(),
111-
ShardSearchFailure.EMPTY_ARRAY));
113+
ShardSearchFailure.EMPTY_ARRAY, clusters));
112114
return;
113115
}
114116
executePhase(this);
@@ -264,7 +266,7 @@ public final SearchRequest getRequest() {
264266
@Override
265267
public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
266268
return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
267-
skippedOps.get(), buildTookInMillis(), buildShardFailures());
269+
skippedOps.get(), buildTookInMillis(), buildShardFailures(), clusters);
268270
}
269271

270272
@Override

core/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import java.util.stream.Stream;
3434

3535
/**
36-
* This search phrase can be used as an initial search phase to pre-filter search shards based on query rewriting.
36+
* This search phase can be used as an initial search phase to pre-filter search shards based on query rewriting.
3737
* The queries are rewritten against the shards and based on the rewrite result shards might be able to be excluded
3838
* from the search. The extra round trip to the search shards is very cheap and is not subject to rejections
3939
* which allows to fan out to more shards at the same time without running into rejections even if we are hitting a
@@ -50,13 +50,15 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
5050
Executor executor, SearchRequest request,
5151
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
5252
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
53-
SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory) {
53+
SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
54+
SearchResponse.Clusters clusters) {
5455
/*
5556
* We set max concurrent shard requests to the number of shards to otherwise avoid deep recursing that would occur if the local node
5657
* is the coordinating node for the query, holds all the shards for the request, and there are a lot of shards.
5758
*/
5859
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request,
59-
listener, shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size());
60+
listener, shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size(),
61+
clusters);
6062
this.phaseFactory = phaseFactory;
6163
this.shardsIts = shardsIts;
6264
}

core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
4040
final Map<String, Float> concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor,
4141
final SearchRequest request, final ActionListener<SearchResponse> listener,
4242
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
43-
final long clusterStateVersion, final SearchTask task) {
43+
final long clusterStateVersion, final SearchTask task, SearchResponse.Clusters clusters) {
4444
super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
4545
shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size()),
46-
request.getMaxConcurrentShardRequests());
46+
request.getMaxConcurrentShardRequests(), clusters);
4747
this.searchPhaseController = searchPhaseController;
4848
}
4949

core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se
4040
final Map<String, Float> concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor,
4141
final SearchRequest request, final ActionListener<SearchResponse> listener,
4242
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
43-
long clusterStateVersion, SearchTask task) {
43+
long clusterStateVersion, SearchTask task, SearchResponse.Clusters clusters) {
4444
super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
4545
shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()),
46-
request.getMaxConcurrentShardRequests());
46+
request.getMaxConcurrentShardRequests(), clusters);
4747
this.searchPhaseController = searchPhaseController;
4848
}
4949

core/src/main/java/org/elasticsearch/action/search/SearchResponse.java

Lines changed: 146 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@
2626
import org.elasticsearch.common.Strings;
2727
import org.elasticsearch.common.io.stream.StreamInput;
2828
import org.elasticsearch.common.io.stream.StreamOutput;
29+
import org.elasticsearch.common.io.stream.Writeable;
2930
import org.elasticsearch.common.unit.TimeValue;
3031
import org.elasticsearch.common.xcontent.StatusToXContentObject;
32+
import org.elasticsearch.common.xcontent.ToXContent;
3133
import org.elasticsearch.common.xcontent.XContentBuilder;
3234
import org.elasticsearch.common.xcontent.XContentParser;
3335
import org.elasticsearch.rest.RestStatus;
@@ -43,6 +45,7 @@
4345
import java.util.ArrayList;
4446
import java.util.List;
4547
import java.util.Map;
48+
import java.util.Objects;
4649

4750
import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure;
4851
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
@@ -71,15 +74,18 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
7174

7275
private ShardSearchFailure[] shardFailures;
7376

77+
private Clusters clusters;
78+
7479
private long tookInMillis;
7580

7681
public SearchResponse() {
7782
}
7883

7984
public SearchResponse(SearchResponseSections internalResponse, String scrollId, int totalShards, int successfulShards,
80-
int skippedShards, long tookInMillis, ShardSearchFailure[] shardFailures) {
85+
int skippedShards, long tookInMillis, ShardSearchFailure[] shardFailures, Clusters clusters) {
8186
this.internalResponse = internalResponse;
8287
this.scrollId = scrollId;
88+
this.clusters = clusters;
8389
this.totalShards = totalShards;
8490
this.successfulShards = successfulShards;
8591
this.skippedShards = skippedShards;
@@ -199,6 +205,15 @@ public Map<String, ProfileShardResult> getProfileResults() {
199205
return internalResponse.profile();
200206
}
201207

208+
/**
209+
* Returns info about what clusters the search was executed against. Available only in responses obtained
210+
* from a Cross Cluster Search request, otherwise {@link Clusters#EMPTY}
211+
* @see Clusters
212+
*/
213+
public Clusters getClusters() {
214+
return clusters;
215+
}
216+
202217
@Override
203218
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
204219
builder.startObject();
@@ -221,6 +236,7 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
221236
}
222237
RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getSkippedShards(),
223238
getFailedShards(), getShardFailures());
239+
clusters.toXContent(builder, params);
224240
internalResponse.toXContent(builder, params);
225241
return builder;
226242
}
@@ -242,6 +258,7 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept
242258
int skippedShards = 0; // 0 for BWC
243259
String scrollId = null;
244260
List<ShardSearchFailure> failures = new ArrayList<>();
261+
Clusters clusters = Clusters.EMPTY;
245262
while((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
246263
if (token == XContentParser.Token.FIELD_NAME) {
247264
currentFieldName = parser.currentName();
@@ -296,6 +313,28 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept
296313
parser.skipChildren();
297314
}
298315
}
316+
} else if (Clusters._CLUSTERS_FIELD.match(currentFieldName)) {
317+
int successful = -1;
318+
int total = -1;
319+
int skipped = -1;
320+
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
321+
if (token == XContentParser.Token.FIELD_NAME) {
322+
currentFieldName = parser.currentName();
323+
} else if (token.isValue()) {
324+
if (Clusters.SUCCESSFUL_FIELD.match(currentFieldName)) {
325+
successful = parser.intValue();
326+
} else if (Clusters.TOTAL_FIELD.match(currentFieldName)) {
327+
total = parser.intValue();
328+
} else if (Clusters.SKIPPED_FIELD.match(currentFieldName)) {
329+
skipped = parser.intValue();
330+
} else {
331+
parser.skipChildren();
332+
}
333+
} else {
334+
parser.skipChildren();
335+
}
336+
}
337+
clusters = new Clusters(total, successful, skipped);
299338
} else {
300339
parser.skipChildren();
301340
}
@@ -304,7 +343,7 @@ public static SearchResponse fromXContent(XContentParser parser) throws IOExcept
304343
SearchResponseSections searchResponseSections = new SearchResponseSections(hits, aggs, suggest, timedOut, terminatedEarly,
305344
profile, numReducePhases);
306345
return new SearchResponse(searchResponseSections, scrollId, totalShards, successfulShards, skippedShards, tookInMillis,
307-
failures.toArray(new ShardSearchFailure[failures.size()]));
346+
failures.toArray(new ShardSearchFailure[failures.size()]), clusters);
308347
}
309348

310349
@Override
@@ -322,6 +361,11 @@ public void readFrom(StreamInput in) throws IOException {
322361
shardFailures[i] = readShardSearchFailure(in);
323362
}
324363
}
364+
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
365+
clusters = new Clusters(in);
366+
} else {
367+
clusters = Clusters.EMPTY;
368+
}
325369
scrollId = in.readOptionalString();
326370
tookInMillis = in.readVLong();
327371
if (in.getVersion().onOrAfter(Version.V_5_6_0)) {
@@ -340,7 +384,9 @@ public void writeTo(StreamOutput out) throws IOException {
340384
for (ShardSearchFailure shardSearchFailure : shardFailures) {
341385
shardSearchFailure.writeTo(out);
342386
}
343-
387+
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
388+
clusters.writeTo(out);
389+
}
344390
out.writeOptionalString(scrollId);
345391
out.writeVLong(tookInMillis);
346392
if(out.getVersion().onOrAfter(Version.V_5_6_0)) {
@@ -353,4 +399,101 @@ public String toString() {
353399
return Strings.toString(this);
354400
}
355401

402+
/**
403+
* Holds info about the clusters that the search was executed on: how many in total, how many of them were successful
404+
* and how many of them were skipped.
405+
*/
406+
public static class Clusters implements ToXContent, Writeable {
407+
408+
public static final Clusters EMPTY = new Clusters(0, 0, 0);
409+
410+
static final ParseField _CLUSTERS_FIELD = new ParseField("_clusters");
411+
static final ParseField SUCCESSFUL_FIELD = new ParseField("successful");
412+
static final ParseField SKIPPED_FIELD = new ParseField("skipped");
413+
static final ParseField TOTAL_FIELD = new ParseField("total");
414+
415+
private final int total;
416+
private final int successful;
417+
private final int skipped;
418+
419+
Clusters(int total, int successful, int skipped) {
420+
assert total >= 0 && successful >= 0 && skipped >= 0
421+
: "total: " + total + " successful: " + successful + " skipped: " + skipped;
422+
assert successful <= total && skipped == total - successful
423+
: "total: " + total + " successful: " + successful + " skipped: " + skipped;
424+
this.total = total;
425+
this.successful = successful;
426+
this.skipped = skipped;
427+
}
428+
429+
private Clusters(StreamInput in) throws IOException {
430+
this.total = in.readVInt();
431+
this.successful = in.readVInt();
432+
this.skipped = in.readVInt();
433+
}
434+
435+
@Override
436+
public void writeTo(StreamOutput out) throws IOException {
437+
out.writeVInt(total);
438+
out.writeVInt(successful);
439+
out.writeVInt(skipped);
440+
}
441+
442+
@Override
443+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
444+
if (this != EMPTY) {
445+
builder.startObject(_CLUSTERS_FIELD.getPreferredName());
446+
builder.field(TOTAL_FIELD.getPreferredName(), total);
447+
builder.field(SUCCESSFUL_FIELD.getPreferredName(), successful);
448+
builder.field(SKIPPED_FIELD.getPreferredName(), skipped);
449+
builder.endObject();
450+
}
451+
return builder;
452+
}
453+
454+
/**
455+
* Returns how many total clusters the search was requested to be executed on
456+
*/
457+
public int getTotal() {
458+
return total;
459+
}
460+
461+
/**
462+
* Returns how many total clusters the search was executed successfully on
463+
*/
464+
public int getSuccessful() {
465+
return successful;
466+
}
467+
468+
/**
469+
* Returns how many total clusters were skipped during the execution of the search request
470+
*/
471+
public int getSkipped() {
472+
return skipped;
473+
}
474+
475+
@Override
476+
public boolean equals(Object o) {
477+
if (this == o) {
478+
return true;
479+
}
480+
if (o == null || getClass() != o.getClass()) {
481+
return false;
482+
}
483+
Clusters clusters = (Clusters) o;
484+
return total == clusters.total &&
485+
successful == clusters.successful &&
486+
skipped == clusters.skipped;
487+
}
488+
489+
@Override
490+
public int hashCode() {
491+
return Objects.hash(total, successful, skipped);
492+
}
493+
494+
@Override
495+
public String toString() {
496+
return "Clusters{total=" + total + ", successful=" + successful + ", skipped=" + skipped + '}';
497+
}
498+
}
356499
}

core/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ protected final void sendResponse(SearchPhaseController.ReducedQueryPhase queryP
249249
scrollId = request.scrollId();
250250
}
251251
listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
252-
0, buildTookInMillis(), buildShardFailures()));
252+
0, buildTookInMillis(), buildShardFailures(), SearchResponse.Clusters.EMPTY));
253253
} catch (Exception e) {
254254
listener.onFailure(new ReduceSearchPhaseException("fetch", "inner finish failed", e, buildShardFailures()));
255255
}

0 commit comments

Comments
 (0)