Skip to content

Commit 2f4dafa

Browse files
authored
Add support for providing absolute start time to SearchRequest (#37142)
We have recently added support for providing a local cluster alias to a SearchRequest through a package protected constructor. When executing cross-cluster search requests with local reduction on each cluster, the CCS coordinating node will have to provide such cluster alias to each remote cluster, as well as the absolute start time of the search action in milliseconds from the time epoch, to be used when evaluating date math expressions both while executing queries / scripts as well as when resolving index names. This commit adds support for providing the start time together with the cluster alias. It is a final member in the search request, which will only be set when using cross-cluster search with local reduction (also known as alternate execution mode). When not provided, the coordinating node will determine the current time and pass it through (by calling `System.currentTimeMillis`). Relates to #32125
1 parent 6347461 commit 2f4dafa

File tree

4 files changed

+125
-30
lines changed

4 files changed

+125
-30
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,10 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
6262
public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;
6363
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512;
6464

65+
private static final long DEFAULT_ABSOLUTE_START_MILLIS = -1;
66+
6567
private final String localClusterAlias;
68+
private final long absoluteStartMillis;
6669

6770
private SearchType searchType = SearchType.DEFAULT;
6871

@@ -95,6 +98,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
9598

9699
public SearchRequest() {
97100
this.localClusterAlias = null;
101+
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
98102
}
99103

100104
/**
@@ -115,6 +119,7 @@ public SearchRequest(SearchRequest searchRequest) {
115119
this.source = searchRequest.source;
116120
this.types = searchRequest.types;
117121
this.localClusterAlias = searchRequest.localClusterAlias;
122+
this.absoluteStartMillis = searchRequest.absoluteStartMillis;
118123
}
119124

120125
/**
@@ -138,12 +143,17 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
138143
}
139144

140145
/**
141-
* Creates a new search request by providing the alias of the cluster where it will be executed. Used when a {@link SearchRequest}
142-
* is created and executed as part of a cross-cluster search request performing local reduction on each cluster.
143-
* The coordinating CCS node provides the alias to prefix index names with in the returned search results.
146+
* Creates a new search request by providing the alias of the cluster where it will be executed, as well as the current time in
147+
* milliseconds from the epoch time. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search
148+
* request performing local reduction on each cluster. The coordinating CCS node provides the alias to prefix index names with in
149+
* the returned search results, and the current time to be used on the remote clusters to ensure that the same value is used.
144150
*/
145-
SearchRequest(String localClusterAlias) {
151+
SearchRequest(String localClusterAlias, long absoluteStartMillis) {
146152
this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
153+
if (absoluteStartMillis < 0) {
154+
throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]");
155+
}
156+
this.absoluteStartMillis = absoluteStartMillis;
147157
}
148158

149159
/**
@@ -155,10 +165,7 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
155165
public SearchRequest(StreamInput in) throws IOException {
156166
super(in);
157167
searchType = SearchType.fromId(in.readByte());
158-
indices = new String[in.readVInt()];
159-
for (int i = 0; i < indices.length; i++) {
160-
indices[i] = in.readString();
161-
}
168+
indices = in.readStringArray();
162169
routing = in.readOptionalString();
163170
preference = in.readOptionalString();
164171
scroll = in.readOptionalWriteable(Scroll::new);
@@ -175,19 +182,22 @@ public SearchRequest(StreamInput in) throws IOException {
175182
//TODO update version after backport
176183
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
177184
localClusterAlias = in.readOptionalString();
185+
if (localClusterAlias != null) {
186+
absoluteStartMillis = in.readVLong();
187+
} else {
188+
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
189+
}
178190
} else {
179191
localClusterAlias = null;
192+
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
180193
}
181194
}
182195

183196
@Override
184197
public void writeTo(StreamOutput out) throws IOException {
185198
super.writeTo(out);
186199
out.writeByte(searchType.id());
187-
out.writeVInt(indices.length);
188-
for (String index : indices) {
189-
out.writeString(index);
190-
}
200+
out.writeStringArray(indices);
191201
out.writeOptionalString(routing);
192202
out.writeOptionalString(preference);
193203
out.writeOptionalWriteable(scroll);
@@ -204,6 +214,9 @@ public void writeTo(StreamOutput out) throws IOException {
204214
//TODO update version after backport
205215
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
206216
out.writeOptionalString(localClusterAlias);
217+
if (localClusterAlias != null) {
218+
out.writeVLong(absoluteStartMillis);
219+
}
207220
}
208221
}
209222

@@ -243,6 +256,17 @@ String getLocalClusterAlias() {
243256
return localClusterAlias;
244257
}
245258

259+
/**
260+
* Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to
261+
* ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search
262+
* request. When created through {@link #SearchRequest(String, long)}, this method returns the provided current time, otherwise
263+
* it will return {@link System#currentTimeMillis()}.
264+
*
265+
*/
266+
long getOrCreateAbsoluteStartMillis() {
267+
return absoluteStartMillis == DEFAULT_ABSOLUTE_START_MILLIS ? System.currentTimeMillis() : absoluteStartMillis;
268+
}
269+
246270
/**
247271
* Sets the indices the search will be executed on.
248272
*/
@@ -435,7 +459,6 @@ public Boolean allowPartialSearchResults() {
435459
return this.allowPartialSearchResults;
436460
}
437461

438-
439462
/**
440463
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
441464
* mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
@@ -498,13 +521,6 @@ public int getPreFilterShardSize() {
498521
return preFilterShardSize;
499522
}
500523

501-
/**
502-
* Returns <code>true</code> iff the maxConcurrentShardRequest is set.
503-
*/
504-
boolean isMaxConcurrentShardRequestsSet() {
505-
return maxConcurrentShardRequests != 0;
506-
}
507-
508524
/**
509525
* @return true if the request only has suggest
510526
*/
@@ -538,7 +554,7 @@ public String getDescription() {
538554
}
539555

540556
@Override
541-
public void readFrom(StreamInput in) throws IOException {
557+
public void readFrom(StreamInput in) {
542558
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
543559
}
544560

@@ -564,14 +580,15 @@ public boolean equals(Object o) {
564580
Objects.equals(preFilterShardSize, that.preFilterShardSize) &&
565581
Objects.equals(indicesOptions, that.indicesOptions) &&
566582
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) &&
567-
Objects.equals(localClusterAlias, that.localClusterAlias);
583+
Objects.equals(localClusterAlias, that.localClusterAlias) &&
584+
absoluteStartMillis == that.absoluteStartMillis;
568585
}
569586

570587
@Override
571588
public int hashCode() {
572589
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
573590
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
574-
allowPartialSearchResults, localClusterAlias);
591+
allowPartialSearchResults, localClusterAlias, absoluteStartMillis);
575592
}
576593

577594
@Override
@@ -590,6 +607,7 @@ public String toString() {
590607
", preFilterShardSize=" + preFilterShardSize +
591608
", allowPartialSearchResults=" + allowPartialSearchResults +
592609
", localClusterAlias=" + localClusterAlias +
610+
", getOrCreateAbsoluteStartMillis=" + absoluteStartMillis +
593611
", source=" + source + '}';
594612
}
595613
}

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,10 +180,9 @@ long getRelativeCurrentNanos() {
180180

181181
@Override
182182
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
183-
final long absoluteStartMillis = System.currentTimeMillis();
184183
final long relativeStartNanos = System.nanoTime();
185184
final SearchTimeProvider timeProvider =
186-
new SearchTimeProvider(absoluteStartMillis, relativeStartNanos, System::nanoTime);
185+
new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
187186
ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
188187
if (source != searchRequest.source()) {
189188
// only set it if it changed - we don't allow null values to be set but it might be already null be we want to catch

server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@
4040
import java.util.List;
4141

4242
import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode;
43+
import static org.hamcrest.Matchers.allOf;
44+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
45+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
4346

4447
public class SearchRequestTests extends AbstractSearchTestCase {
4548

@@ -48,12 +51,19 @@ protected SearchRequest createSearchRequest() throws IOException {
4851
if (randomBoolean()) {
4952
return super.createSearchRequest();
5053
}
51-
//clusterAlias does not have public getter/setter hence we randomize it only in this test specifically.
52-
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10));
54+
//clusterAlias and absoluteStartMillis do not have public getters/setters hence we randomize them only in this test specifically.
55+
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong());
5356
RandomSearchRequestGenerator.randomSearchRequest(searchRequest, this::createSearchSourceBuilder);
5457
return searchRequest;
5558
}
5659

60+
public void testClusterAliasValidation() {
61+
expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0));
62+
expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1));
63+
SearchRequest searchRequest = new SearchRequest("", 0);
64+
assertNull(searchRequest.validate());
65+
}
66+
5767
public void testSerialization() throws Exception {
5868
SearchRequest searchRequest = createSearchRequest();
5969
SearchRequest deserializedRequest = copyWriteable(searchRequest, namedWriteableRegistry, SearchRequest::new);
@@ -69,22 +79,32 @@ public void testClusterAliasSerialization() throws IOException {
6979
//TODO update version after backport
7080
if (version.before(Version.V_7_0_0)) {
7181
assertNull(deserializedRequest.getLocalClusterAlias());
82+
assertAbsoluteStartMillisIsCurrentTime(deserializedRequest);
7283
} else {
7384
assertEquals(searchRequest.getLocalClusterAlias(), deserializedRequest.getLocalClusterAlias());
85+
assertEquals(searchRequest.getOrCreateAbsoluteStartMillis(), deserializedRequest.getOrCreateAbsoluteStartMillis());
7486
}
7587
}
7688

7789
//TODO rename and update version after backport
7890
public void testReadFromPre7_0_0() throws IOException {
7991
String msg = "AAEBBWluZGV4AAAAAQACAAAA/////w8AAAAAAAAA/////w8AAAAAAAACAAAAAAABAAMCBAUBAAKABACAAQIAAA==";
8092
try (StreamInput in = StreamInput.wrap(Base64.getDecoder().decode(msg))) {
81-
in.setVersion(Version.V_6_6_0);
93+
in.setVersion(VersionUtils.randomVersionBetween(random(), Version.V_6_4_0, VersionUtils.getPreviousVersion(Version.V_7_0_0)));
8294
SearchRequest searchRequest = new SearchRequest(in);
8395
assertArrayEquals(new String[]{"index"}, searchRequest.indices());
8496
assertNull(searchRequest.getLocalClusterAlias());
97+
assertAbsoluteStartMillisIsCurrentTime(searchRequest);
8598
}
8699
}
87100

101+
private static void assertAbsoluteStartMillisIsCurrentTime(SearchRequest searchRequest) {
102+
long before = System.currentTimeMillis();
103+
long absoluteStartMillis = searchRequest.getOrCreateAbsoluteStartMillis();
104+
long after = System.currentTimeMillis();
105+
assertThat(absoluteStartMillis, allOf(greaterThanOrEqualTo(before), lessThanOrEqualTo(after)));
106+
}
107+
88108
public void testIllegalArguments() {
89109
SearchRequest searchRequest = new SearchRequest();
90110
assertNotNull(searchRequest.indices());

server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,18 @@
2121

2222
import org.elasticsearch.action.index.IndexRequest;
2323
import org.elasticsearch.action.index.IndexResponse;
24+
import org.elasticsearch.action.support.IndicesOptions;
2425
import org.elasticsearch.action.support.WriteRequest;
26+
import org.elasticsearch.index.query.RangeQueryBuilder;
2527
import org.elasticsearch.rest.RestStatus;
2628
import org.elasticsearch.search.SearchHit;
29+
import org.elasticsearch.search.builder.SearchSourceBuilder;
2730
import org.elasticsearch.test.ESSingleNodeTestCase;
2831

2932
public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
3033

3134
public void testLocalClusterAlias() {
35+
long nowInMillis = System.currentTimeMillis();
3236
IndexRequest indexRequest = new IndexRequest("test");
3337
indexRequest.id("1");
3438
indexRequest.source("field", "value");
@@ -37,7 +41,7 @@ public void testLocalClusterAlias() {
3741
assertEquals(RestStatus.CREATED, indexResponse.status());
3842

3943
{
40-
SearchRequest searchRequest = new SearchRequest("local");
44+
SearchRequest searchRequest = new SearchRequest("local", nowInMillis);
4145
SearchResponse searchResponse = client().search(searchRequest).actionGet();
4246
assertEquals(1, searchResponse.getHits().getTotalHits().value);
4347
SearchHit[] hits = searchResponse.getHits().getHits();
@@ -48,7 +52,7 @@ public void testLocalClusterAlias() {
4852
assertEquals("1", hit.getId());
4953
}
5054
{
51-
SearchRequest searchRequest = new SearchRequest("");
55+
SearchRequest searchRequest = new SearchRequest("", nowInMillis);
5256
SearchResponse searchResponse = client().search(searchRequest).actionGet();
5357
assertEquals(1, searchResponse.getHits().getTotalHits().value);
5458
SearchHit[] hits = searchResponse.getHits().getHits();
@@ -59,4 +63,58 @@ public void testLocalClusterAlias() {
5963
assertEquals("1", hit.getId());
6064
}
6165
}
66+
67+
public void testAbsoluteStartMillis() {
68+
{
69+
IndexRequest indexRequest = new IndexRequest("test-1970.01.01");
70+
indexRequest.id("1");
71+
indexRequest.source("date", "1970-01-01");
72+
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
73+
IndexResponse indexResponse = client().index(indexRequest).actionGet();
74+
assertEquals(RestStatus.CREATED, indexResponse.status());
75+
}
76+
{
77+
IndexRequest indexRequest = new IndexRequest("test-1982.01.01");
78+
indexRequest.id("1");
79+
indexRequest.source("date", "1982-01-01");
80+
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
81+
IndexResponse indexResponse = client().index(indexRequest).actionGet();
82+
assertEquals(RestStatus.CREATED, indexResponse.status());
83+
}
84+
{
85+
SearchRequest searchRequest = new SearchRequest();
86+
SearchResponse searchResponse = client().search(searchRequest).actionGet();
87+
assertEquals(2, searchResponse.getHits().getTotalHits().value);
88+
}
89+
{
90+
SearchRequest searchRequest = new SearchRequest("<test-{now/d}>");
91+
searchRequest.indicesOptions(IndicesOptions.fromOptions(true, true, true, true));
92+
SearchResponse searchResponse = client().search(searchRequest).actionGet();
93+
assertEquals(0, searchResponse.getTotalShards());
94+
}
95+
{
96+
SearchRequest searchRequest = new SearchRequest("", 0);
97+
SearchResponse searchResponse = client().search(searchRequest).actionGet();
98+
assertEquals(2, searchResponse.getHits().getTotalHits().value);
99+
}
100+
{
101+
SearchRequest searchRequest = new SearchRequest("", 0);
102+
searchRequest.indices("<test-{now/d}>");
103+
SearchResponse searchResponse = client().search(searchRequest).actionGet();
104+
assertEquals(1, searchResponse.getHits().getTotalHits().value);
105+
assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
106+
}
107+
{
108+
SearchRequest searchRequest = new SearchRequest("", 0);
109+
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
110+
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("date");
111+
rangeQuery.gte("1970-01-01");
112+
rangeQuery.lt("1982-01-01");
113+
sourceBuilder.query(rangeQuery);
114+
searchRequest.source(sourceBuilder);
115+
SearchResponse searchResponse = client().search(searchRequest).actionGet();
116+
assertEquals(1, searchResponse.getHits().getTotalHits().value);
117+
assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
118+
}
119+
}
62120
}

0 commit comments

Comments
 (0)