diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java index 5edb42779a2c9..8a4d0330d76e9 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java @@ -35,46 +35,69 @@ public class TransformCheckpointingInfo { public static final ParseField NEXT_CHECKPOINT = new ParseField("next", "in_progress"); public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind"); public static final ParseField CHANGES_LAST_DETECTED_AT = new ParseField("changes_last_detected_at"); + public static final ParseField LAST_SEARCH_TIME = new ParseField("last_search_time"); private final TransformCheckpointStats last; private final TransformCheckpointStats next; private final long operationsBehind; private final Instant changesLastDetectedAt; - - private static final ConstructingObjectParser LENIENT_PARSER = - new ConstructingObjectParser<>( - "transform_checkpointing_info", - true, - a -> { - long behind = a[2] == null ? 0L : (Long) a[2]; - Instant changesLastDetectedAt = (Instant)a[3]; - return new TransformCheckpointingInfo( - a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0], - a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1], - behind, - changesLastDetectedAt); - }); + private final Instant lastSearchTime; + + private static final ConstructingObjectParser LENIENT_PARSER = new ConstructingObjectParser<>( + "transform_checkpointing_info", + true, + a -> { + long behind = a[2] == null ? 0L : (Long) a[2]; + Instant changesLastDetectedAt = (Instant) a[3]; + Instant lastSearchTime = (Instant) a[4]; + return new TransformCheckpointingInfo( + a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0], + a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1], + behind, + changesLastDetectedAt, + lastSearchTime + ); + } + ); static { - LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), - (p, c) -> TransformCheckpointStats.fromXContent(p), LAST_CHECKPOINT); - LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), - (p, c) -> TransformCheckpointStats.fromXContent(p), NEXT_CHECKPOINT); + LENIENT_PARSER.declareObject( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> TransformCheckpointStats.fromXContent(p), + LAST_CHECKPOINT + ); + LENIENT_PARSER.declareObject( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> TransformCheckpointStats.fromXContent(p), + NEXT_CHECKPOINT + ); LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND); - LENIENT_PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), + LENIENT_PARSER.declareField( + ConstructingObjectParser.optionalConstructorArg(), p -> TimeUtil.parseTimeFieldToInstant(p, CHANGES_LAST_DETECTED_AT.getPreferredName()), CHANGES_LAST_DETECTED_AT, - ObjectParser.ValueType.VALUE); + ObjectParser.ValueType.VALUE + ); + LENIENT_PARSER.declareField( + ConstructingObjectParser.optionalConstructorArg(), + p -> TimeUtil.parseTimeFieldToInstant(p, LAST_SEARCH_TIME.getPreferredName()), + LAST_SEARCH_TIME, + ObjectParser.ValueType.VALUE + ); } - public TransformCheckpointingInfo(TransformCheckpointStats last, - TransformCheckpointStats next, - long operationsBehind, - Instant changesLastDetectedAt) { + public TransformCheckpointingInfo( + TransformCheckpointStats last, + TransformCheckpointStats next, + long operationsBehind, + Instant changesLastDetectedAt, + Instant lastSearchTime + ) { this.last = Objects.requireNonNull(last); this.next = Objects.requireNonNull(next); this.operationsBehind = operationsBehind; this.changesLastDetectedAt = changesLastDetectedAt; + this.lastSearchTime = lastSearchTime; } public TransformCheckpointStats getLast() { @@ -94,13 +117,18 @@ public Instant getChangesLastDetectedAt() { return changesLastDetectedAt; } + @Nullable + public Instant getLastSearchTime() { + return lastSearchTime; + } + public static TransformCheckpointingInfo fromXContent(XContentParser p) { return LENIENT_PARSER.apply(p, null); } @Override public int hashCode() { - return Objects.hash(last, next, operationsBehind, changesLastDetectedAt); + return Objects.hash(last, next, operationsBehind, changesLastDetectedAt, lastSearchTime); } @Override @@ -115,10 +143,11 @@ public boolean equals(Object other) { TransformCheckpointingInfo that = (TransformCheckpointingInfo) other; - return Objects.equals(this.last, that.last) && - Objects.equals(this.next, that.next) && - this.operationsBehind == that.operationsBehind && - Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt); + return Objects.equals(this.last, that.last) + && Objects.equals(this.next, that.next) + && this.operationsBehind == that.operationsBehind + && Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt) + && Objects.equals(this.lastSearchTime, that.lastSearchTime); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java index 326ecd73a152c..73714088b442e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java @@ -30,12 +30,12 @@ public class TransformCheckpointingInfoTests extends ESTestCase { public void testFromXContent() throws IOException { - xContentTester(this::createParser, + xContentTester( + this::createParser, TransformCheckpointingInfoTests::randomTransformCheckpointingInfo, TransformCheckpointingInfoTests::toXContent, - TransformCheckpointingInfo::fromXContent) - .supportsUnknownFields(false) - .test(); + TransformCheckpointingInfo::fromXContent + ).supportsUnknownFields(false).test(); } public static TransformCheckpointingInfo randomTransformCheckpointingInfo() { @@ -43,7 +43,9 @@ public static TransformCheckpointingInfo randomTransformCheckpointingInfo() { TransformCheckpointStatsTests.randomTransformCheckpointStats(), TransformCheckpointStatsTests.randomTransformCheckpointStats(), randomLongBetween(0, 10000), - randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong())); + randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()), + randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()) + ); } public static void toXContent(TransformCheckpointingInfo info, XContentBuilder builder) throws IOException { @@ -60,6 +62,9 @@ public static void toXContent(TransformCheckpointingInfo info, XContentBuilder b if (info.getChangesLastDetectedAt() != null) { builder.field(TransformCheckpointingInfo.CHANGES_LAST_DETECTED_AT.getPreferredName(), info.getChangesLastDetectedAt()); } + if (info.getLastSearchTime() != null) { + builder.field(TransformCheckpointingInfo.LAST_SEARCH_TIME.getPreferredName(), info.getLastSearchTime()); + } builder.endObject(); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/TransformCheckpointingInfoTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/TransformCheckpointingInfoTests.java index ca1ca32ede911..0d8081b23dff0 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/TransformCheckpointingInfoTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/TransformCheckpointingInfoTests.java @@ -31,19 +31,22 @@ public class TransformCheckpointingInfoTests extends AbstractResponseTestCase< org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo, - TransformCheckpointingInfo> { + TransformCheckpointingInfo> { public static org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo randomTransformCheckpointingInfo() { return new org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo( TransformCheckpointStatsTests.randomTransformCheckpointStats(), TransformCheckpointStatsTests.randomTransformCheckpointStats(), randomNonNegativeLong(), - randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong())); + randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()), + randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()) + ); } @Override - protected org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo - createServerTestInstance(XContentType xContentType) { + protected org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo createServerTestInstance( + XContentType xContentType + ) { return randomTransformCheckpointingInfo(); } @@ -53,8 +56,10 @@ protected TransformCheckpointingInfo doParseToClientInstance(XContentParser pars } @Override - protected void assertInstances(org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo serverTestInstance, - TransformCheckpointingInfo clientInstance) { + protected void assertInstances( + org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo serverTestInstance, + TransformCheckpointingInfo clientInstance + ) { assertTransformCheckpointInfo(serverTestInstance, clientInstance); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java index 8a2f04e1e82b0..41612bb05e959 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java @@ -41,6 +41,7 @@ public static class TransformCheckpointingInfoBuilder { private TransformCheckpoint nextCheckpoint; private TransformCheckpoint sourceCheckpoint; private Instant changesLastDetectedAt; + private Instant lastSearchTime; private long operationsBehind; public TransformCheckpointingInfoBuilder() {} @@ -76,7 +77,8 @@ public TransformCheckpointingInfo build() { nextCheckpoint.getTimeUpperBound() ), operationsBehind, - changesLastDetectedAt + changesLastDetectedAt, + lastSearchTime ); } @@ -122,6 +124,11 @@ public TransformCheckpointingInfoBuilder setChangesLastDetectedAt(Instant change return this; } + public TransformCheckpointingInfoBuilder setLastSearchTime(Instant lastSearchTime) { + this.lastSearchTime = lastSearchTime; + return this; + } + public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehind) { this.operationsBehind = operationsBehind; return this; @@ -133,6 +140,7 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi TransformCheckpointStats.EMPTY, TransformCheckpointStats.EMPTY, 0L, + null, null ); @@ -140,10 +148,12 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi public static final ParseField NEXT_CHECKPOINT = new ParseField("next"); public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind"); public static final ParseField CHANGES_LAST_DETECTED_AT = new ParseField("changes_last_detected_at"); + public static final ParseField LAST_SEARCH_TIME = new ParseField("last_search_time"); private final TransformCheckpointStats last; private final TransformCheckpointStats next; private final long operationsBehind; private final Instant changesLastDetectedAt; + private final Instant lastSearchTime; private static final ConstructingObjectParser LENIENT_PARSER = new ConstructingObjectParser<>( "data_frame_transform_checkpointing_info", @@ -151,11 +161,13 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi a -> { long behind = a[2] == null ? 0L : (Long) a[2]; Instant changesLastDetectedAt = (Instant) a[3]; + Instant lastSearchTime = (Instant) a[4]; return new TransformCheckpointingInfo( a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0], a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1], behind, - changesLastDetectedAt + changesLastDetectedAt, + lastSearchTime ); } ); @@ -178,6 +190,12 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi CHANGES_LAST_DETECTED_AT, ObjectParser.ValueType.VALUE ); + LENIENT_PARSER.declareField( + ConstructingObjectParser.optionalConstructorArg(), + p -> TimeUtils.parseTimeFieldToInstant(p, LAST_SEARCH_TIME.getPreferredName()), + LAST_SEARCH_TIME, + ObjectParser.ValueType.VALUE + ); } /** @@ -187,18 +205,21 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi * @param last stats of the last checkpoint * @param next stats of the next checkpoint * @param operationsBehind counter of operations the current checkpoint is behind source - * @param changesLastDetectedAt the last time the source indices were checked for changes + * @param changesLastDetectedAt the last time the source indices changes have been found + * @param lastSearchTime the last time the source indices were searched */ public TransformCheckpointingInfo( TransformCheckpointStats last, TransformCheckpointStats next, long operationsBehind, - Instant changesLastDetectedAt + Instant changesLastDetectedAt, + Instant lastSearchTime ) { this.last = Objects.requireNonNull(last); this.next = Objects.requireNonNull(next); this.operationsBehind = operationsBehind; - this.changesLastDetectedAt = changesLastDetectedAt == null ? null : Instant.ofEpochMilli(changesLastDetectedAt.toEpochMilli()); + this.changesLastDetectedAt = changesLastDetectedAt; + this.lastSearchTime = lastSearchTime; } public TransformCheckpointingInfo(StreamInput in) throws IOException { @@ -210,6 +231,11 @@ public TransformCheckpointingInfo(StreamInput in) throws IOException { } else { changesLastDetectedAt = null; } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_12_0 + lastSearchTime = in.readOptionalInstant(); + } else { + lastSearchTime = null; + } } public TransformCheckpointStats getLast() { @@ -228,6 +254,10 @@ public Instant getChangesLastDetectedAt() { return changesLastDetectedAt; } + public Instant getLastSearchTime() { + return lastSearchTime; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -245,6 +275,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws changesLastDetectedAt.toEpochMilli() ); } + if (lastSearchTime != null) { + builder.timeField( + LAST_SEARCH_TIME.getPreferredName(), + LAST_SEARCH_TIME.getPreferredName() + "_string", + lastSearchTime.toEpochMilli() + ); + } builder.endObject(); return builder; } @@ -257,6 +294,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_4_0)) { out.writeOptionalInstant(changesLastDetectedAt); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_12_0 + out.writeOptionalInstant(lastSearchTime); + } } public static TransformCheckpointingInfo fromXContent(XContentParser p) { @@ -265,7 +305,7 @@ public static TransformCheckpointingInfo fromXContent(XContentParser p) { @Override public int hashCode() { - return Objects.hash(last, next, operationsBehind, changesLastDetectedAt); + return Objects.hash(last, next, operationsBehind, changesLastDetectedAt, lastSearchTime); } @Override @@ -283,7 +323,8 @@ public boolean equals(Object other) { return Objects.equals(this.last, that.last) && Objects.equals(this.next, that.next) && this.operationsBehind == that.operationsBehind - && Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt); + && Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt) + && Objects.equals(this.lastSearchTime, that.lastSearchTime); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfoTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfoTests.java index 9bf487589c3a7..fa2c3eda335e6 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfoTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfoTests.java @@ -22,6 +22,7 @@ public static TransformCheckpointingInfo randomTransformCheckpointingInfo() { TransformCheckpointStatsTests.randomTransformCheckpointStats(), TransformCheckpointStatsTests.randomTransformCheckpointStats(), randomNonNegativeLong(), + randomBoolean() ? null : Instant.ofEpochMilli(randomLongBetween(1, 100000)), randomBoolean() ? null : Instant.ofEpochMilli(randomLongBetween(1, 100000)) ); } @@ -46,7 +47,8 @@ public void testBackwardsSerialization() throws IOException { TransformCheckpointStats.EMPTY, TransformCheckpointStats.EMPTY, randomNonNegativeLong(), - // changesLastDetectedAt is not serialized to past values, so when it is pulled back in, it will be null + // changesLastDetectedAt, lastSearchTime is not serialized to past values, so when it is pulled back in, it will be null + null, null ); try (BytesStreamOutput output = new BytesStreamOutput()) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java index cbaacf85277a6..15bd8daf4c1f7 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java @@ -75,6 +75,7 @@ public void testBwcWith73() throws IOException { new TransformCheckpointStats(0, null, null, 100, 1000), // changesLastDetectedAt aren't serialized back 100, + null, null ) ); @@ -103,6 +104,7 @@ public void testBwcWith76() throws IOException { new TransformCheckpointStats(0, null, null, 100, 1000), // changesLastDetectedAt aren't serialized back 100, + null, null ) ); diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/ContinuousTestCase.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/ContinuousTestCase.java index c7e1a0215b395..c20cb5e36e809 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/ContinuousTestCase.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/ContinuousTestCase.java @@ -42,6 +42,7 @@ public abstract class ContinuousTestCase extends ESRestTestCase { + public static final TimeValue SYNC_DELAY = new TimeValue(1, TimeUnit.SECONDS); public static final String CONTINUOUS_EVENTS_SOURCE_INDEX = "test-transform-continuous-events"; public static final String INGEST_PIPELINE = "transform-ingest"; public static final String MAX_RUN_FIELD = "run.max"; @@ -89,7 +90,7 @@ public abstract class ContinuousTestCase extends ESRestTestCase { protected TransformConfig.Builder addCommonBuilderParameters(TransformConfig.Builder builder) { return builder.setSyncConfig(getSyncConfig()) .setSettings(addCommonSetings(new SettingsConfig.Builder()).build()) - .setFrequency(new TimeValue(1, TimeUnit.SECONDS)); + .setFrequency(SYNC_DELAY); } protected AggregatorFactories.Builder addCommonAggregations(AggregatorFactories.Builder builder) { @@ -131,6 +132,6 @@ private static class TestRestHighLevelClient extends RestHighLevelClient { } private SyncConfig getSyncConfig() { - return new TimeSyncConfig("timestamp", new TimeValue(1, TimeUnit.SECONDS)); + return new TimeSyncConfig("timestamp", SYNC_DELAY); } } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/HistogramGroupByIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/HistogramGroupByIT.java index a129179d58794..1ac619dc89bd9 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/HistogramGroupByIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/HistogramGroupByIT.java @@ -1,6 +1,5 @@ package org.elasticsearch.xpack.transform.integration.continuous; -import org.apache.lucene.util.LuceneTestCase.AwaitsFix; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; @@ -27,7 +26,6 @@ import static org.hamcrest.Matchers.equalTo; -@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/66410") public class HistogramGroupByIT extends ContinuousTestCase { private static final String NAME = "continuous-histogram-pivot-test"; diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java index a19d4a9602239..59393b0c20bec 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java @@ -16,7 +16,6 @@ import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.Response; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.core.AcknowledgedResponse; import org.elasticsearch.client.transform.DeleteTransformRequest; @@ -274,9 +273,7 @@ public void testContinousEvents() throws Exception { // start all transforms, wait until the processed all data and stop them startTransforms(); - // at random we added between 0 and 999_999ns == (1ms - 1ns) to every data point, so we add 1ms, so every data point is before - // the checkpoint - waitUntilTransformsReachedUpperBound(runDate.toEpochMilli() + 1, run); + waitUntilTransformsProcessedNewData(ContinuousTestCase.SYNC_DELAY, run); stopTransforms(); // TODO: the transform dest index requires a refresh, see gh#51154 @@ -495,11 +492,12 @@ private GetTransformStatsResponse getTransformStats(String id) throws IOExceptio } } - private void waitUntilTransformsReachedUpperBound(long timeStampUpperBoundMillis, int iteration) throws Exception { + private void waitUntilTransformsProcessedNewData(TimeValue delay, int iteration) throws Exception { + Instant waitUntil = Instant.now().plusMillis(delay.getMillis()); logger.info( - "wait until transform reaches timestamp_millis: {} iteration: {}", - ContinuousTestCase.STRICT_DATE_OPTIONAL_TIME_PRINTER_NANOS.withZone(ZoneId.of("UTC")) - .format(Instant.ofEpochMilli(timeStampUpperBoundMillis)), + "wait until transform reaches timestamp_millis: {} (takes into account the delay: {}) iteration: {}", + ContinuousTestCase.STRICT_DATE_OPTIONAL_TIME_PRINTER_NANOS.withZone(ZoneId.of("UTC")).format(waitUntil), + delay, iteration ); for (ContinuousTestCase testCase : transformTestCases) { @@ -512,8 +510,8 @@ private void waitUntilTransformsReachedUpperBound(long timeStampUpperBoundMillis + stats.getState() + ", reason: " + stats.getReason(), - stats.getCheckpointingInfo().getLast().getTimeUpperBoundMillis(), - greaterThan(timeStampUpperBoundMillis) + stats.getCheckpointingInfo().getLastSearchTime(), + greaterThan(waitUntil) ); }, 20, TimeUnit.SECONDS); } diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java index 7529d7d7848e2..72784bf98e326 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java @@ -266,7 +266,8 @@ public void testGetCheckpointStats() throws InterruptedException { new TransformCheckpointStats(1, null, null, timestamp, 0L), new TransformCheckpointStats(2, position, progress, timestamp + 100L, 0L), 30L, - Instant.ofEpochMilli(timestamp) + Instant.ofEpochMilli(timestamp), + null ); assertAsync( @@ -281,7 +282,8 @@ public void testGetCheckpointStats() throws InterruptedException { new TransformCheckpointStats(1, null, null, timestamp, 0L), new TransformCheckpointStats(2, position, progress, timestamp + 100L, 0L), 63L, - Instant.ofEpochMilli(timestamp) + Instant.ofEpochMilli(timestamp), + null ); assertAsync( listener -> getCheckpoint(transformCheckpointService, transformId, 1, position, progress, listener), @@ -296,7 +298,8 @@ public void testGetCheckpointStats() throws InterruptedException { new TransformCheckpointStats(1, null, null, timestamp, 0L), new TransformCheckpointStats(2, position, progress, timestamp + 100L, 0L), 0L, - Instant.ofEpochMilli(timestamp) + Instant.ofEpochMilli(timestamp), + null ); assertAsync( listener -> getCheckpoint(transformCheckpointService, transformId, 1, position, progress, listener), diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java index a3fd0b6385e86..83fa40605d2bb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java @@ -29,6 +29,7 @@ public interface Listener { private volatile int numFailureRetries = Transform.DEFAULT_FAILURE_RETRIES; private final AtomicInteger failureCount; private volatile Instant changesLastDetectedAt; + private volatile Instant lastSearchTime; private volatile boolean shouldStopAtCheckpoint; // the checkpoint of this transform, storing the checkpoint until data indexing from source to dest is _complete_ @@ -107,6 +108,14 @@ Instant getChangesLastDetectedAt() { return changesLastDetectedAt; } + void setLastSearchTime(Instant time) { + lastSearchTime = time; + } + + Instant getLastSearchTime() { + return lastSearchTime; + } + public boolean shouldStopAtCheckpoint() { return shouldStopAtCheckpoint; } @@ -120,18 +129,16 @@ void shutdown() { } void markAsFailed(String failureMessage) { - taskListener - .fail( - failureMessage, - ActionListener - .wrap( - r -> { - // Successfully marked as failed, reset counter so that task can be restarted - failureCount.set(0); - }, - e -> {} - ) - ); + taskListener.fail( + failureMessage, + ActionListener.wrap( + r -> { + // Successfully marked as failed, reset counter so that task can be restarted + failureCount.set(0); + }, + e -> {} + ) + ); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index e0d75894d860b..ccdda9aedf447 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -333,13 +333,15 @@ protected void onStart(long now, ActionListener listener) { } }, listener::onFailure); + Instant instantOfTrigger = Instant.ofEpochMilli(now); // If we are not on the initial batch checkpoint and its the first pass of whatever continuous checkpoint we are on, // we should verify if there are local changes based on the sync config. If not, do not proceed further and exit. if (context.getCheckpoint() > 0 && initialRun()) { sourceHasChanged(ActionListener.wrap(hasChanged -> { + context.setLastSearchTime(instantOfTrigger); hasSourceChanged = hasChanged; if (hasChanged) { - context.setChangesLastDetectedAt(Instant.now()); + context.setChangesLastDetectedAt(instantOfTrigger); logger.debug("[{}] source has changed, triggering new indexer run.", getJobId()); changedSourceListener.onResponse(null); } else { @@ -355,6 +357,8 @@ protected void onStart(long now, ActionListener listener) { })); } else { hasSourceChanged = true; + context.setLastSearchTime(instantOfTrigger); + context.setChangesLastDetectedAt(instantOfTrigger); changedSourceListener.onResponse(null); } } @@ -876,15 +880,15 @@ private SearchSourceBuilder buildUpdateQuery(SearchSourceBuilder sourceBuilder) QueryBuilder queryBuilder = config.getSource().getQueryConfig().getQuery(); if (isContinuous()) { - BoolQueryBuilder filteredQuery = - new BoolQueryBuilder() - .filter(queryBuilder) - .filter(config.getSyncConfig().getRangeQuery(nextCheckpoint)); + BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(queryBuilder) + .filter(config.getSyncConfig().getRangeQuery(nextCheckpoint)); // Only apply extra filter if it is the subsequent run of the continuous transform if (nextCheckpoint.getCheckpoint() > 1 && changeCollector != null) { - QueryBuilder filter = - changeCollector.buildFilterQuery(lastCheckpoint.getTimeUpperBound(), nextCheckpoint.getTimeUpperBound()); + QueryBuilder filter = changeCollector.buildFilterQuery( + lastCheckpoint.getTimeUpperBound(), + nextCheckpoint.getTimeUpperBound() + ); if (filter != null) { filteredQuery.filter(filter); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index 0d507b30cb174..e1fe60dde71c0 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -174,6 +174,9 @@ public void getCheckpointingInfo( if (context.getChangesLastDetectedAt() != null) { infoBuilder.setChangesLastDetectedAt(context.getChangesLastDetectedAt()); } + if (context.getLastSearchTime() != null) { + infoBuilder.setLastSearchTime(context.getLastSearchTime()); + } listener.onResponse(infoBuilder.build()); }, listener::onFailure); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java index 28b5d11633afe..fe99825392c4d 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java @@ -45,6 +45,7 @@ public void testDeriveStatsStopped() { new TransformCheckpointStats(1, null, null, 1, 1), new TransformCheckpointStats(2, null, null, 2, 5), 2, + Instant.now(), Instant.now() ); @@ -81,6 +82,7 @@ public void testDeriveStatsFailed() { new TransformCheckpointStats(1, null, null, 1, 1), new TransformCheckpointStats(2, null, null, 2, 5), 2, + Instant.now(), Instant.now() ); @@ -126,6 +128,7 @@ public void testDeriveStats() { new TransformCheckpointStats(1, null, null, 1, 1), new TransformCheckpointStats(2, null, null, 2, 5), 2, + Instant.now(), Instant.now() );