Skip to content

Commit cd3d228

Browse files
committed
Add support for providing absolute start time to SearchRequest (#37142)
We have recently added support for providing a local cluster alias to a SearchRequest through a package protected constructor. When executing cross-cluster search requests with local reduction on each cluster, the CCS coordinating node will have to provide such cluster alias to each remote cluster, as well as the absolute start time of the search action in milliseconds from the time epoch, to be used when evaluating date math expressions both while executing queries / scripts as well as when resolving index names. This commit adds support for providing the start time together with the cluster alias. It is a final member in the search request, which will only be set when using cross-cluster search with local reduction (also known as alternate execution mode). When not provided, the coordinating node will determine the current time and pass it through (by calling `System.currentTimeMillis`). Relates to #32125
1 parent ea90c96 commit cd3d228

File tree

4 files changed

+121
-14
lines changed

4 files changed

+121
-14
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,10 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
6565
public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;
6666
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512;
6767

68+
private static final long DEFAULT_ABSOLUTE_START_MILLIS = -1;
69+
6870
private String localClusterAlias;
71+
private long absoluteStartMillis;
6972

7073
private SearchType searchType = SearchType.DEFAULT;
7174

@@ -98,6 +101,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
98101

99102
public SearchRequest() {
100103
this.localClusterAlias = null;
104+
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
101105
}
102106

103107
/**
@@ -118,6 +122,7 @@ public SearchRequest(SearchRequest searchRequest) {
118122
this.source = searchRequest.source;
119123
this.types = searchRequest.types;
120124
this.localClusterAlias = searchRequest.localClusterAlias;
125+
this.absoluteStartMillis = searchRequest.absoluteStartMillis;
121126
}
122127

123128
/**
@@ -141,12 +146,17 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
141146
}
142147

143148
/**
144-
* Creates a new search request by providing the alias of the cluster where it will be executed. Used when a {@link SearchRequest}
145-
* is created and executed as part of a cross-cluster search request performing local reduction on each cluster.
146-
* The coordinating CCS node provides the alias to prefix index names with in the returned search results.
149+
* Creates a new search request by providing the alias of the cluster where it will be executed, as well as the current time in
150+
* milliseconds from the epoch time. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search
151+
* request performing local reduction on each cluster. The coordinating CCS node provides the alias to prefix index names with in
152+
* the returned search results, and the current time to be used on the remote clusters to ensure that the same value is used.
147153
*/
148-
SearchRequest(String localClusterAlias) {
154+
SearchRequest(String localClusterAlias, long absoluteStartMillis) {
149155
this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
156+
if (absoluteStartMillis < 0) {
157+
throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]");
158+
}
159+
this.absoluteStartMillis = absoluteStartMillis;
150160
}
151161

152162
@Override
@@ -185,6 +195,17 @@ String getLocalClusterAlias() {
185195
return localClusterAlias;
186196
}
187197

198+
/**
199+
* Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to
200+
* ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search
201+
* request. When created through {@link #SearchRequest(String, long)}, this method returns the provided current time, otherwise
202+
* it will return {@link System#currentTimeMillis()}.
203+
*
204+
*/
205+
long getOrCreateAbsoluteStartMillis() {
206+
return absoluteStartMillis == DEFAULT_ABSOLUTE_START_MILLIS ? System.currentTimeMillis() : absoluteStartMillis;
207+
}
208+
188209
/**
189210
* Sets the indices the search will be executed on.
190211
*/
@@ -367,8 +388,7 @@ public SearchRequest allowPartialSearchResults(boolean allowPartialSearchResults
367388

368389
public Boolean allowPartialSearchResults() {
369390
return this.allowPartialSearchResults;
370-
}
371-
391+
}
372392

373393
/**
374394
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
@@ -496,8 +516,14 @@ public void readFrom(StreamInput in) throws IOException {
496516
}
497517
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
498518
localClusterAlias = in.readOptionalString();
519+
if (localClusterAlias != null) {
520+
absoluteStartMillis = in.readVLong();
521+
} else {
522+
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
523+
}
499524
} else {
500525
localClusterAlias = null;
526+
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
501527
}
502528
}
503529

@@ -526,6 +552,9 @@ public void writeTo(StreamOutput out) throws IOException {
526552
}
527553
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
528554
out.writeOptionalString(localClusterAlias);
555+
if (localClusterAlias != null) {
556+
out.writeVLong(absoluteStartMillis);
557+
}
529558
}
530559
}
531560

@@ -551,14 +580,15 @@ public boolean equals(Object o) {
551580
Objects.equals(preFilterShardSize, that.preFilterShardSize) &&
552581
Objects.equals(indicesOptions, that.indicesOptions) &&
553582
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) &&
554-
Objects.equals(localClusterAlias, that.localClusterAlias);
583+
Objects.equals(localClusterAlias, that.localClusterAlias) &&
584+
absoluteStartMillis == that.absoluteStartMillis;
555585
}
556586

557587
@Override
558588
public int hashCode() {
559589
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
560590
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
561-
allowPartialSearchResults, localClusterAlias);
591+
allowPartialSearchResults, localClusterAlias, absoluteStartMillis);
562592
}
563593

564594
@Override
@@ -577,6 +607,7 @@ public String toString() {
577607
", preFilterShardSize=" + preFilterShardSize +
578608
", allowPartialSearchResults=" + allowPartialSearchResults +
579609
", localClusterAlias=" + localClusterAlias +
610+
", getOrCreateAbsoluteStartMillis=" + absoluteStartMillis +
580611
", source=" + source + '}';
581612
}
582613
}

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,9 @@ long getRelativeCurrentNanos() {
177177

178178
@Override
179179
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
180-
final long absoluteStartMillis = System.currentTimeMillis();
181180
final long relativeStartNanos = System.nanoTime();
182181
final SearchTimeProvider timeProvider =
183-
new SearchTimeProvider(absoluteStartMillis, relativeStartNanos, System::nanoTime);
182+
new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
184183
ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
185184
if (source != searchRequest.source()) {
186185
// only set it if it changed - we don't allow null values to be set but it might be already null be we want to catch

server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@
4040
import java.util.List;
4141

4242
import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode;
43+
import static org.hamcrest.Matchers.allOf;
44+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
45+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
4346

4447
public class SearchRequestTests extends AbstractSearchTestCase {
4548

@@ -48,12 +51,19 @@ protected SearchRequest createSearchRequest() throws IOException {
4851
if (randomBoolean()) {
4952
return super.createSearchRequest();
5053
}
51-
//clusterAlias does not have public getter/setter hence we randomize it only in this test specifically.
52-
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10));
54+
//clusterAlias and absoluteStartMillis do not have public getters/setters hence we randomize them only in this test specifically.
55+
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong());
5356
RandomSearchRequestGenerator.randomSearchRequest(searchRequest, this::createSearchSourceBuilder);
5457
return searchRequest;
5558
}
5659

60+
public void testClusterAliasValidation() {
61+
expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0));
62+
expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1));
63+
SearchRequest searchRequest = new SearchRequest("", 0);
64+
assertNull(searchRequest.validate());
65+
}
66+
5767
public void testSerialization() throws Exception {
5868
SearchRequest searchRequest = createSearchRequest();
5969
SearchRequest deserializedRequest = copyStreamable(searchRequest, namedWriteableRegistry, SearchRequest::new, Version.CURRENT);
@@ -68,21 +78,32 @@ public void testClusterAliasSerialization() throws IOException {
6878
SearchRequest deserializedRequest = copyStreamable(searchRequest, namedWriteableRegistry, SearchRequest::new, version);
6979
if (version.before(Version.V_6_7_0)) {
7080
assertNull(deserializedRequest.getLocalClusterAlias());
81+
assertAbsoluteStartMillisIsCurrentTime(deserializedRequest);
7182
} else {
7283
assertEquals(searchRequest.getLocalClusterAlias(), deserializedRequest.getLocalClusterAlias());
84+
assertEquals(searchRequest.getOrCreateAbsoluteStartMillis(), deserializedRequest.getOrCreateAbsoluteStartMillis());
7385
}
7486
}
7587

7688
public void testReadFromPre6_7_0() throws IOException {
7789
String msg = "AAEBBWluZGV4AAAAAQACAAAA/////w8AAAAAAAAA/////w8AAAAAAAACAAAAAAABAAMCBAUBAAKABACAAQIAAA==";
7890
try (StreamInput in = StreamInput.wrap(Base64.getDecoder().decode(msg))) {
91+
in.setVersion(VersionUtils.randomVersionBetween(random(), Version.V_6_4_0, VersionUtils.getPreviousVersion(Version.V_6_7_0)));
7992
SearchRequest searchRequest = new SearchRequest();
8093
searchRequest.readFrom(in);
8194
assertArrayEquals(new String[]{"index"}, searchRequest.indices());
8295
assertNull(searchRequest.getLocalClusterAlias());
96+
assertAbsoluteStartMillisIsCurrentTime(searchRequest);
8397
}
8498
}
8599

100+
private static void assertAbsoluteStartMillisIsCurrentTime(SearchRequest searchRequest) {
101+
long before = System.currentTimeMillis();
102+
long absoluteStartMillis = searchRequest.getOrCreateAbsoluteStartMillis();
103+
long after = System.currentTimeMillis();
104+
assertThat(absoluteStartMillis, allOf(greaterThanOrEqualTo(before), lessThanOrEqualTo(after)));
105+
}
106+
86107
public void testIllegalArguments() {
87108
SearchRequest searchRequest = new SearchRequest();
88109
assertNotNull(searchRequest.indices());

server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,26 @@
2121

2222
import org.elasticsearch.action.index.IndexRequest;
2323
import org.elasticsearch.action.index.IndexResponse;
24+
import org.elasticsearch.action.support.IndicesOptions;
2425
import org.elasticsearch.action.support.WriteRequest;
26+
import org.elasticsearch.index.query.RangeQueryBuilder;
2527
import org.elasticsearch.rest.RestStatus;
2628
import org.elasticsearch.search.SearchHit;
29+
import org.elasticsearch.search.builder.SearchSourceBuilder;
2730
import org.elasticsearch.test.ESSingleNodeTestCase;
2831

2932
public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
3033

3134
public void testLocalClusterAlias() {
35+
long nowInMillis = System.currentTimeMillis();
3236
IndexRequest indexRequest = new IndexRequest("test", "type", "1");
3337
indexRequest.source("field", "value");
3438
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
3539
IndexResponse indexResponse = client().index(indexRequest).actionGet();
3640
assertEquals(RestStatus.CREATED, indexResponse.status());
3741

3842
{
39-
SearchRequest searchRequest = new SearchRequest("local");
43+
SearchRequest searchRequest = new SearchRequest("local", nowInMillis);
4044
SearchResponse searchResponse = client().search(searchRequest).actionGet();
4145
assertEquals(1, searchResponse.getHits().getTotalHits());
4246
SearchHit[] hits = searchResponse.getHits().getHits();
@@ -47,7 +51,7 @@ public void testLocalClusterAlias() {
4751
assertEquals("1", hit.getId());
4852
}
4953
{
50-
SearchRequest searchRequest = new SearchRequest("");
54+
SearchRequest searchRequest = new SearchRequest("", nowInMillis);
5155
SearchResponse searchResponse = client().search(searchRequest).actionGet();
5256
assertEquals(1, searchResponse.getHits().getTotalHits());
5357
SearchHit[] hits = searchResponse.getHits().getHits();
@@ -58,4 +62,56 @@ public void testLocalClusterAlias() {
5862
assertEquals("1", hit.getId());
5963
}
6064
}
65+
66+
public void testAbsoluteStartMillis() {
67+
{
68+
IndexRequest indexRequest = new IndexRequest("test-1970.01.01", "type", "1");
69+
indexRequest.source("date", "1970-01-01");
70+
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
71+
IndexResponse indexResponse = client().index(indexRequest).actionGet();
72+
assertEquals(RestStatus.CREATED, indexResponse.status());
73+
}
74+
{
75+
IndexRequest indexRequest = new IndexRequest("test-1982.01.01", "type", "1");
76+
indexRequest.source("date", "1982-01-01");
77+
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
78+
IndexResponse indexResponse = client().index(indexRequest).actionGet();
79+
assertEquals(RestStatus.CREATED, indexResponse.status());
80+
}
81+
{
82+
SearchRequest searchRequest = new SearchRequest();
83+
SearchResponse searchResponse = client().search(searchRequest).actionGet();
84+
assertEquals(2, searchResponse.getHits().getTotalHits());
85+
}
86+
{
87+
SearchRequest searchRequest = new SearchRequest("<test-{now/d}>");
88+
searchRequest.indicesOptions(IndicesOptions.fromOptions(true, true, true, true));
89+
SearchResponse searchResponse = client().search(searchRequest).actionGet();
90+
assertEquals(0, searchResponse.getTotalShards());
91+
}
92+
{
93+
SearchRequest searchRequest = new SearchRequest("", 0);
94+
SearchResponse searchResponse = client().search(searchRequest).actionGet();
95+
assertEquals(2, searchResponse.getHits().getTotalHits());
96+
}
97+
{
98+
SearchRequest searchRequest = new SearchRequest("", 0);
99+
searchRequest.indices("<test-{now/d}>");
100+
SearchResponse searchResponse = client().search(searchRequest).actionGet();
101+
assertEquals(1, searchResponse.getHits().getTotalHits());
102+
assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
103+
}
104+
{
105+
SearchRequest searchRequest = new SearchRequest("", 0);
106+
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
107+
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("date");
108+
rangeQuery.gte("1970-01-01");
109+
rangeQuery.lt("1982-01-01");
110+
sourceBuilder.query(rangeQuery);
111+
searchRequest.source(sourceBuilder);
112+
SearchResponse searchResponse = client().search(searchRequest).actionGet();
113+
assertEquals(1, searchResponse.getHits().getTotalHits());
114+
assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
115+
}
116+
}
61117
}

0 commit comments

Comments
 (0)