Skip to content

Commit a1a49f2

Browse files
committed
Tie break search shard iterator comparisons on cluster alias (#38853)
`SearchShardIterator` inherits its `compareTo` implementation from `PlainShardIterator`. That is good in most of the cases, as such comparisons are based on the shard id which is unique, even when searching against indices with same names across multiple clusters (thanks to the index uuid being different). In case though the same cluster is registered multiple times with different aliases, the shard id is exactly the same, hence remote results will be returned before local ones with same shard id objects. That is because remote iterators are added before local ones, and we use a stable sorting method in `GroupShardIterators` constructor. This PR enhances `compareTo` for `SearchShardIterator` to tie break on cluster alias and introduces consistent `equals` and `hashcode` methods. This allows to remove a TODO in `SearchResponseMerger` which otherwise has to handle this special case specifically. Also, while at it I added missing tests around equals/hashcode and compareTo and expanded existing ones.
1 parent 7e20a92 commit a1a49f2

File tree

9 files changed

+380
-144
lines changed

9 files changed

+380
-144
lines changed

docs/reference/modules/cross-cluster-search.asciidoc

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,10 @@ will be prefixed with their remote cluster name:
150150
"max_score": 1,
151151
"hits": [
152152
{
153-
"_index": "cluster_one:twitter",
153+
"_index": "twitter",
154154
"_type": "_doc",
155155
"_id": "0",
156-
"_score": 1,
156+
"_score": 2,
157157
"_source": {
158158
"user": "kimchy",
159159
"date": "2009-11-15T14:12:12",
@@ -162,10 +162,10 @@ will be prefixed with their remote cluster name:
162162
}
163163
},
164164
{
165-
"_index": "twitter",
165+
"_index": "cluster_one:twitter",
166166
"_type": "_doc",
167167
"_id": "0",
168-
"_score": 2,
168+
"_score": 1,
169169
"_source": {
170170
"user": "kimchy",
171171
"date": "2009-11-15T14:12:12",
@@ -243,10 +243,10 @@ GET /cluster_one:twitter,cluster_two:twitter,twitter/_search <1>
243243
"max_score": 1,
244244
"hits": [
245245
{
246-
"_index": "cluster_one:twitter",
246+
"_index": "twitter",
247247
"_type": "_doc",
248248
"_id": "0",
249-
"_score": 1,
249+
"_score": 2,
250250
"_source": {
251251
"user": "kimchy",
252252
"date": "2009-11-15T14:12:12",
@@ -255,10 +255,10 @@ GET /cluster_one:twitter,cluster_two:twitter,twitter/_search <1>
255255
}
256256
},
257257
{
258-
"_index": "twitter",
258+
"_index": "cluster_one:twitter",
259259
"_type": "_doc",
260260
"_id": "0",
261-
"_score": 2,
261+
"_score": 1,
262262
"_source": {
263263
"user": "kimchy",
264264
"date": "2009-11-15T14:12:12",

server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.elasticsearch.search.profile.ProfileShardResult;
4040
import org.elasticsearch.search.profile.SearchProfileShardResults;
4141
import org.elasticsearch.search.suggest.Suggest;
42-
import org.elasticsearch.transport.RemoteClusterAware;
4342

4443
import java.util.ArrayList;
4544
import java.util.Arrays;
@@ -368,17 +367,7 @@ public int compareTo(ShardIdAndClusterAlias o) {
368367
if (shardIdCompareTo != 0) {
369368
return shardIdCompareTo;
370369
}
371-
int clusterAliasCompareTo = clusterAlias.compareTo(o.clusterAlias);
372-
if (clusterAliasCompareTo != 0) {
373-
//TODO we may want to fix this, CCS returns remote results before local ones (TransportSearchAction#mergeShardsIterators)
374-
if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
375-
return 1;
376-
}
377-
if (o.clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
378-
return -1;
379-
}
380-
}
381-
return clusterAliasCompareTo;
370+
return clusterAlias.compareTo(o.clusterAlias);
382371
}
383372
}
384373
}

server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@
2121

2222
import org.elasticsearch.action.OriginalIndices;
2323
import org.elasticsearch.cluster.routing.PlainShardIterator;
24+
import org.elasticsearch.cluster.routing.ShardIterator;
2425
import org.elasticsearch.cluster.routing.ShardRouting;
2526
import org.elasticsearch.common.Nullable;
2627
import org.elasticsearch.index.shard.ShardId;
2728
import org.elasticsearch.search.SearchShardTarget;
2829

2930
import java.util.List;
31+
import java.util.Objects;
3032

3133
/**
3234
* Extension of {@link PlainShardIterator} used in the search api, which also holds the {@link OriginalIndices}
@@ -93,4 +95,43 @@ void resetAndSkip() {
9395
boolean skip() {
9496
return skip;
9597
}
98+
99+
@Override
100+
public boolean equals(Object o) {
101+
if (this == o) {
102+
return true;
103+
}
104+
if (o == null || getClass() != o.getClass()) {
105+
return false;
106+
}
107+
if (super.equals(o) == false) {
108+
return false;
109+
}
110+
SearchShardIterator that = (SearchShardIterator) o;
111+
return Objects.equals(clusterAlias, that.clusterAlias);
112+
}
113+
114+
@Override
115+
public int hashCode() {
116+
return Objects.hash(super.hashCode(), clusterAlias);
117+
}
118+
119+
@Override
120+
public int compareTo(ShardIterator o) {
121+
int superCompareTo = super.compareTo(o);
122+
if (superCompareTo != 0 || (o instanceof SearchShardIterator == false)) {
123+
return superCompareTo;
124+
}
125+
SearchShardIterator searchShardIterator = (SearchShardIterator)o;
126+
if (clusterAlias == null && searchShardIterator.getClusterAlias() == null) {
127+
return 0;
128+
}
129+
if (clusterAlias == null) {
130+
return -1;
131+
}
132+
if (searchShardIterator.getClusterAlias() == null) {
133+
return 1;
134+
}
135+
return clusterAlias.compareTo(searchShardIterator.getClusterAlias());
136+
}
96137
}

server/src/test/java/org/elasticsearch/action/OriginalIndicesTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public void testOriginalIndicesSerialization() throws IOException {
5353
}
5454
}
5555

56-
private static OriginalIndices randomOriginalIndices() {
56+
public static OriginalIndices randomOriginalIndices() {
5757
int numIndices = randomInt(10);
5858
String[] indices = new String[numIndices];
5959
for (int j = 0; j < indices.length; j++) {

server/src/test/java/org/elasticsearch/action/search/SearchResponseMergerTests.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -632,12 +632,6 @@ public int compare(SearchHit a, SearchHit b) {
632632
}
633633
int clusterAliasCompareTo = aShard.getClusterAlias().compareTo(bShard.getClusterAlias());
634634
if (clusterAliasCompareTo != 0) {
635-
if (aShard.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
636-
return 1;
637-
}
638-
if (bShard.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
639-
return -1;
640-
}
641635
return clusterAliasCompareTo;
642636
}
643637
return Integer.compare(a.docId(), b.docId());

server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,19 @@
2020
package org.elasticsearch.action.search;
2121

2222
import org.elasticsearch.action.OriginalIndices;
23+
import org.elasticsearch.action.OriginalIndicesTests;
2324
import org.elasticsearch.action.support.IndicesOptions;
25+
import org.elasticsearch.cluster.routing.GroupShardsIteratorTests;
2426
import org.elasticsearch.index.shard.ShardId;
2527
import org.elasticsearch.search.SearchShardTarget;
2628
import org.elasticsearch.test.ESTestCase;
29+
import org.elasticsearch.test.EqualsHashCodeTestUtils;
30+
import org.hamcrest.Matchers;
2731

32+
import java.util.ArrayList;
33+
import java.util.Arrays;
2834
import java.util.Collections;
35+
import java.util.List;
2936

3037
public class SearchShardIteratorTests extends ESTestCase {
3138

@@ -64,4 +71,79 @@ public void testNewSearchShardTarget() {
6471
assertEquals(nodeId, searchShardTarget.getNodeId());
6572
assertSame(originalIndices, searchShardTarget.getOriginalIndices());
6673
}
74+
75+
public void testEqualsAndHashcode() {
76+
EqualsHashCodeTestUtils.checkEqualsAndHashCode(randomSearchShardIterator(), s -> new SearchShardIterator(s.getClusterAlias(),
77+
s.shardId(), s.getShardRoutings(), s.getOriginalIndices()), s -> {
78+
if (randomBoolean()) {
79+
String clusterAlias;
80+
if (s.getClusterAlias() == null) {
81+
clusterAlias = randomAlphaOfLengthBetween(5, 10);
82+
} else {
83+
clusterAlias = randomBoolean() ? null : s.getClusterAlias() + randomAlphaOfLength(3);
84+
}
85+
return new SearchShardIterator(clusterAlias, s.shardId(), s.getShardRoutings(), s.getOriginalIndices());
86+
} else {
87+
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10),
88+
randomIntBetween(0, Integer.MAX_VALUE));
89+
return new SearchShardIterator(s.getClusterAlias(), shardId, s.getShardRoutings(), s.getOriginalIndices());
90+
}
91+
});
92+
}
93+
94+
public void testCompareTo() {
95+
String[] clusters = generateRandomStringArray(2, 10, false, false);
96+
Arrays.sort(clusters);
97+
String[] indices = generateRandomStringArray(3, 10, false, false);
98+
Arrays.sort(indices);
99+
String[] uuids = generateRandomStringArray(3, 10, false, false);
100+
Arrays.sort(uuids);
101+
List<SearchShardIterator> shardIterators = new ArrayList<>();
102+
int numShards = randomIntBetween(1, 5);
103+
for (int i = 0; i < numShards; i++) {
104+
for (String index : indices) {
105+
for (String uuid : uuids) {
106+
ShardId shardId = new ShardId(index, uuid, i);
107+
shardIterators.add(new SearchShardIterator(null, shardId, GroupShardsIteratorTests.randomShardRoutings(shardId),
108+
OriginalIndicesTests.randomOriginalIndices()));
109+
for (String cluster : clusters) {
110+
shardIterators.add(new SearchShardIterator(cluster, shardId, GroupShardsIteratorTests.randomShardRoutings(shardId),
111+
OriginalIndicesTests.randomOriginalIndices()));
112+
}
113+
114+
}
115+
}
116+
}
117+
for (int i = 0; i < shardIterators.size(); i++) {
118+
SearchShardIterator currentIterator = shardIterators.get(i);
119+
for (int j = i + 1; j < shardIterators.size(); j++) {
120+
SearchShardIterator greaterIterator = shardIterators.get(j);
121+
assertThat(currentIterator, Matchers.lessThan(greaterIterator));
122+
assertThat(greaterIterator, Matchers.greaterThan(currentIterator));
123+
assertNotEquals(currentIterator, greaterIterator);
124+
}
125+
for (int j = i - 1; j >= 0; j--) {
126+
SearchShardIterator smallerIterator = shardIterators.get(j);
127+
assertThat(smallerIterator, Matchers.lessThan(currentIterator));
128+
assertThat(currentIterator, Matchers.greaterThan(smallerIterator));
129+
assertNotEquals(currentIterator, smallerIterator);
130+
}
131+
}
132+
}
133+
134+
public void testCompareToEqualItems() {
135+
SearchShardIterator shardIterator1 = randomSearchShardIterator();
136+
SearchShardIterator shardIterator2 = new SearchShardIterator(shardIterator1.getClusterAlias(), shardIterator1.shardId(),
137+
shardIterator1.getShardRoutings(), shardIterator1.getOriginalIndices());
138+
assertEquals(shardIterator1, shardIterator2);
139+
assertEquals(0, shardIterator1.compareTo(shardIterator2));
140+
assertEquals(0, shardIterator2.compareTo(shardIterator1));
141+
}
142+
143+
private static SearchShardIterator randomSearchShardIterator() {
144+
String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10);
145+
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomIntBetween(0, Integer.MAX_VALUE));
146+
return new SearchShardIterator(clusterAlias, shardId, GroupShardsIteratorTests.randomShardRoutings(shardId),
147+
OriginalIndicesTests.randomOriginalIndices());
148+
}
67149
}

0 commit comments

Comments
 (0)