Skip to content

Commit 161ec69

Browse files
authored
Support point in time cross cluster search (#61827)
This commit integrates point in time into cross cluster search. Relates #61062 Closes #61790
1 parent 8a95e3c commit 161ec69

File tree

11 files changed

+371
-78
lines changed

11 files changed

+371
-78
lines changed

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 94 additions & 57 deletions
Large diffs are not rendered by default.

server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,16 +168,17 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r
168168
if (scroll != null) {
169169
searchRequest.scroll(new Scroll(parseTimeValue(scroll, null, "scroll")));
170170
}
171-
172171
searchRequest.routing(request.param("routing"));
173172
searchRequest.preference(request.param("preference"));
174173
searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));
175-
searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));
176174

177175
checkRestTotalHits(request, searchRequest);
178176

179177
if (searchRequest.pointInTimeBuilder() != null) {
180-
preparePointInTime(searchRequest, namedWriteableRegistry);
178+
preparePointInTime(searchRequest, request, namedWriteableRegistry);
179+
} else {
180+
searchRequest.setCcsMinimizeRoundtrips(
181+
request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));
181182
}
182183
}
183184

@@ -293,7 +294,7 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil
293294
}
294295
}
295296

296-
static void preparePointInTime(SearchRequest request, NamedWriteableRegistry namedWriteableRegistry) {
297+
static void preparePointInTime(SearchRequest request, RestRequest restRequest, NamedWriteableRegistry namedWriteableRegistry) {
297298
assert request.pointInTimeBuilder() != null;
298299
ActionRequestValidationException validationException = null;
299300
if (request.indices().length > 0) {
@@ -308,6 +309,11 @@ static void preparePointInTime(SearchRequest request, NamedWriteableRegistry nam
308309
if (request.preference() != null) {
309310
validationException = addValidationError("[preference] cannot be used with point in time", validationException);
310311
}
312+
if (restRequest.paramAsBoolean("ccs_minimize_roundtrips", false)) {
313+
validationException =
314+
addValidationError("[ccs_minimize_roundtrips] cannot be used with point in time", validationException);
315+
request.setCcsMinimizeRoundtrips(false);
316+
}
311317
ExceptionsHelper.reThrowIfNotNull(validationException);
312318

313319
final IndicesOptions indicesOptions = request.indicesOptions();

server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,6 @@ public void testProcessRemoteShards() {
208208
null)) {
209209
RemoteClusterService service = transportService.getRemoteClusterService();
210210
assertFalse(service.isCrossClusterSearchEnabled());
211-
List<SearchShardIterator> iteratorList = new ArrayList<>();
212211
Map<String, ClusterSearchShardsResponse> searchShardsResponseMap = new HashMap<>();
213212
DiscoveryNode[] nodes = new DiscoveryNode[] {
214213
new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT),
@@ -246,9 +245,9 @@ public void testProcessRemoteShards() {
246245
new OriginalIndices(new String[]{"fo*", "ba*"}, SearchRequest.DEFAULT_INDICES_OPTIONS));
247246
remoteIndicesByCluster.put("test_cluster_2",
248247
new OriginalIndices(new String[]{"x*"}, SearchRequest.DEFAULT_INDICES_OPTIONS));
249-
Map<String, AliasFilter> remoteAliases = new HashMap<>();
250-
TransportSearchAction.processRemoteShards(searchShardsResponseMap, remoteIndicesByCluster, iteratorList,
251-
remoteAliases);
248+
Map<String, AliasFilter> remoteAliases = TransportSearchAction.getRemoteAliasFilters(searchShardsResponseMap);
249+
List<SearchShardIterator> iteratorList =
250+
TransportSearchAction.getRemoteShardsIterator(searchShardsResponseMap, remoteIndicesByCluster, remoteAliases);
252251
assertEquals(4, iteratorList.size());
253252
for (SearchShardIterator iterator : iteratorList) {
254253
if (iterator.shardId().getIndexName().endsWith("foo")) {

x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ public void setupSuiteScopeCluster() throws InterruptedException {
8989
indexRandom(true, true, reqs);
9090
}
9191

92-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/61790")
9392
public void testMaxMinAggregation() throws Exception {
9493
int step = numShards > 2 ? randomIntBetween(2, numShards) : 2;
9594
int numFailures = randomBoolean() ? randomIntBetween(0, numShards) : 0;
@@ -134,7 +133,6 @@ public void testMaxMinAggregation() throws Exception {
134133
}
135134
}
136135

137-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/61790")
138136
public void testTermsAggregation() throws Exception {
139137
int step = numShards > 2 ? randomIntBetween(2, numShards) : 2;
140138
int numFailures = randomBoolean() ? randomIntBetween(0, numShards) : 0;
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.core.search;
8+
9+
import org.elasticsearch.action.search.SearchResponse;
10+
import org.elasticsearch.client.Client;
11+
import org.elasticsearch.common.unit.TimeValue;
12+
import org.elasticsearch.index.query.MatchAllQueryBuilder;
13+
import org.elasticsearch.plugins.Plugin;
14+
import org.elasticsearch.test.AbstractMultiClustersTestCase;
15+
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
16+
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
17+
import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
18+
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction;
19+
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest;
20+
import org.elasticsearch.xpack.core.search.action.OpenPointInTimeResponse;
21+
22+
import java.util.ArrayList;
23+
import java.util.Collection;
24+
import java.util.List;
25+
26+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
27+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
28+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
29+
30+
public class CCSPointInTimeIT extends AbstractMultiClustersTestCase {
31+
32+
@Override
33+
protected Collection<String> remoteClusterAlias() {
34+
return List.of("remote_cluster");
35+
}
36+
37+
@Override
38+
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
39+
final List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
40+
plugins.add(LocalStateCompositeXPackPlugin.class);
41+
return plugins;
42+
}
43+
44+
void indexDocs(Client client, String index, int numDocs) {
45+
for (int i = 0; i < numDocs; i++) {
46+
String id = Integer.toString(i);
47+
client.prepareIndex(index).setId(id).setSource("value", i).get();
48+
}
49+
client.admin().indices().prepareRefresh(index).get();
50+
}
51+
52+
public void testBasic() {
53+
final Client localClient = client(LOCAL_CLUSTER);
54+
final Client remoteClient = client("remote_cluster");
55+
int localNumDocs = randomIntBetween(10, 50);
56+
assertAcked(localClient.admin().indices().prepareCreate("local_test"));
57+
indexDocs(localClient, "local_test", localNumDocs);
58+
59+
int remoteNumDocs = randomIntBetween(10, 50);
60+
assertAcked(remoteClient.admin().indices().prepareCreate("remote_test"));
61+
indexDocs(remoteClient, "remote_test", remoteNumDocs);
62+
boolean includeLocalIndex = randomBoolean();
63+
List<String> indices = new ArrayList<>();
64+
if (includeLocalIndex) {
65+
indices.add( randomFrom("*", "local_*", "local_test"));
66+
}
67+
indices.add(randomFrom("*:*", "remote_cluster:*", "remote_cluster:remote_test"));
68+
String pitId = openPointInTime(indices.toArray(new String[0]), TimeValue.timeValueMinutes(2));
69+
try {
70+
if (randomBoolean()) {
71+
localClient.prepareIndex("local_test").setId("local_new").setSource().get();
72+
localClient.admin().indices().prepareRefresh().get();
73+
}
74+
if (randomBoolean()) {
75+
remoteClient.prepareIndex("remote_test").setId("remote_new").setSource().get();
76+
remoteClient.admin().indices().prepareRefresh().get();
77+
}
78+
SearchResponse resp = localClient.prepareSearch()
79+
.setPreference(null)
80+
.setQuery(new MatchAllQueryBuilder())
81+
.setSearchContext(pitId, TimeValue.timeValueMinutes(2))
82+
.setSize(1000)
83+
.get();
84+
assertNoFailures(resp);
85+
assertHitCount(resp, (includeLocalIndex ? localNumDocs : 0) + remoteNumDocs);
86+
} finally {
87+
closePointInTime(pitId);
88+
}
89+
}
90+
91+
private String openPointInTime(String[] indices, TimeValue keepAlive) {
92+
OpenPointInTimeRequest request = new OpenPointInTimeRequest(
93+
indices,
94+
OpenPointInTimeRequest.DEFAULT_INDICES_OPTIONS,
95+
keepAlive,
96+
null,
97+
null
98+
);
99+
final OpenPointInTimeResponse response = client().execute(OpenPointInTimeAction.INSTANCE, request).actionGet();
100+
return response.getSearchContextId();
101+
}
102+
103+
private void closePointInTime(String readerId) {
104+
client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(readerId)).actionGet();
105+
}
106+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/OpenPointInTimeResponse.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@
1414
import org.elasticsearch.common.xcontent.XContentBuilder;
1515

1616
import java.io.IOException;
17+
import java.util.Objects;
1718

1819
public final class OpenPointInTimeResponse extends ActionResponse implements ToXContentObject {
1920
private static final ParseField ID = new ParseField("id");
2021

2122
private final String searchContextId;
2223

2324
public OpenPointInTimeResponse(String searchContextId) {
24-
this.searchContextId = searchContextId;
25+
this.searchContextId = Objects.requireNonNull(searchContextId);
2526
}
2627

2728
public OpenPointInTimeResponse(StreamInput in) throws IOException {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/TransportOpenPointInTimeAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen
7272
.preference(request.preference())
7373
.routing(request.routing())
7474
.allowPartialSearchResults(false);
75+
searchRequest.setCcsMinimizeRoundtrips(false);
7576
transportSearchAction.executeRequest(
7677
task,
7778
searchRequest,
@@ -91,7 +92,10 @@ protected void doExecute(Task task, OpenPointInTimeRequest request, ActionListen
9192
new ActionListenerResponseHandler<SearchPhaseResult>(phaseListener, ShardOpenReaderResponse::new)
9293
);
9394
},
94-
ActionListener.map(listener, r -> new OpenPointInTimeResponse(r.pointInTimeId()))
95+
ActionListener.map(listener, r -> {
96+
assert r.pointInTimeId() != null : r;
97+
return new OpenPointInTimeResponse(r.pointInTimeId());
98+
})
9599
);
96100
}
97101

x-pack/qa/multi-cluster-search-security/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ dependencies {
1010

1111
restResources {
1212
restApi {
13-
includeXpack 'security', 'async_search', 'indices'
13+
includeXpack 'security', 'async_search', 'indices', 'open_point_in_time', 'close_point_in_time'
1414
}
1515
}
1616

x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/100_resolve_index.yml

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,15 @@
3030
- match: {indices.5.attributes.0: open}
3131
- match: {indices.6.name: my_remote_cluster:field_caps_index_3}
3232
- match: {indices.6.attributes.0: open}
33-
- match: {indices.7.name: my_remote_cluster:secured_via_alias}
34-
- match: {indices.7.attributes.0: open}
35-
- match: {indices.8.name: my_remote_cluster:single_doc_index}
33+
- match: {indices.7.name: my_remote_cluster:point_in_time_index }
34+
- match: {indices.7.attributes.0: open }
35+
- match: {indices.8.name: my_remote_cluster:secured_via_alias}
3636
- match: {indices.8.attributes.0: open}
37-
- match: {indices.9.name: my_remote_cluster:test_index}
38-
- match: {indices.9.aliases.0: aliased_test_index}
39-
- match: {indices.9.attributes.0: open}
37+
- match: {indices.9.name: my_remote_cluster:single_doc_index}
38+
- match: {indices.10.attributes.0: open}
39+
- match: {indices.10.name: my_remote_cluster:test_index}
40+
- match: {indices.10.aliases.0: aliased_test_index}
41+
- match: {indices.10.attributes.0: open}
4042
- match: {aliases.0.name: my_remote_cluster:.security}
4143
- match: {aliases.0.indices.0: .security-7}
4244
- match: {aliases.1.name: my_remote_cluster:aliased_closed_index}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
---
2+
setup:
3+
- skip:
4+
features: headers
5+
6+
- do:
7+
cluster.health:
8+
wait_for_status: yellow
9+
- do:
10+
security.put_user:
11+
username: "joe"
12+
body: >
13+
{
14+
"password": "s3krit",
15+
"roles" : [ "x_cluster_role" ]
16+
}
17+
- do:
18+
security.put_role:
19+
name: "x_cluster_role"
20+
body: >
21+
{
22+
"cluster": [],
23+
"indices": [
24+
{
25+
"names": ["local_pit", "my_remote_cluster:point_in_time_index"],
26+
"privileges": ["read"]
27+
}
28+
]
29+
}
30+
31+
- do:
32+
security.put_user:
33+
username: "remote"
34+
body: >
35+
{
36+
"password": "s3krit",
37+
"roles" : [ "remote_ccs" ]
38+
}
39+
- do:
40+
security.put_role:
41+
name: "remote_ccs"
42+
body: >
43+
{
44+
}
45+
---
46+
teardown:
47+
- do:
48+
security.delete_user:
49+
username: "joe"
50+
ignore: 404
51+
- do:
52+
security.delete_role:
53+
name: "x_cluster_role"
54+
ignore: 404
55+
---
56+
"Search with point in time":
57+
58+
- do:
59+
indices.create:
60+
index: local_pit
61+
body:
62+
settings:
63+
index:
64+
number_of_shards: 2
65+
number_of_replicas: 0
66+
mappings:
67+
properties:
68+
created_at:
69+
type: date
70+
format: "yyyy-MM-dd"
71+
- do:
72+
bulk:
73+
refresh: true
74+
body:
75+
- '{"index": {"_index": "local_pit"}}'
76+
- '{"f": "l1", "created_at" : "2020-01-01"}'
77+
- '{"index": {"_index": "local_pit"}}'
78+
- '{"f": "l2", "created_at" : "2021-01-02"}'
79+
80+
- do:
81+
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
82+
open_point_in_time:
83+
index: my_remote_cluster:point_in_time_index,local_pit
84+
keep_alive: 5m
85+
- set: {id: pit_id}
86+
87+
- do:
88+
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
89+
search:
90+
rest_total_hits_as_int: true
91+
sort: created_at
92+
body:
93+
query:
94+
range:
95+
created_at:
96+
gte: "2020-01-03"
97+
pit:
98+
id: "$pit_id"
99+
keep_alive: 1m
100+
101+
- match: { hits.total: 3 }
102+
- match: { hits.hits.0._index: "my_remote_cluster:point_in_time_index" }
103+
- match: { hits.hits.0._source.f: "r3" }
104+
- match: { hits.hits.1._index: "my_remote_cluster:point_in_time_index" }
105+
- match: { hits.hits.1._source.f: "r4" }
106+
- match: { hits.hits.2._index: "local_pit" }
107+
- match: { hits.hits.2._source.f: "l2" }
108+
109+
- do:
110+
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
111+
close_point_in_time:
112+
body:
113+
id: "$pit_id"

0 commit comments

Comments
 (0)