From bb28ef9229feee9cdd0f98f462b810c25845b909 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 21 Dec 2020 22:24:12 +0100 Subject: [PATCH 1/7] add last_searched_at field --- .../TransformCheckpointingInfo.java | 24 +++++++-- .../TransformCheckpointingInfoTests.java | 1 + .../hlrc/TransformCheckpointingInfoTests.java | 1 + .../TransformCheckpointingInfo.java | 54 ++++++++++++++++--- .../TransformCheckpointingInfoTests.java | 4 +- .../transforms/TransformStatsTests.java | 2 + .../continuous/ContinuousTestCase.java | 5 +- .../continuous/TransformContinuousIT.java | 15 +++--- .../transforms/TransformContext.java | 9 ++++ .../transforms/TransformIndexer.java | 6 ++- .../transform/transforms/TransformTask.java | 3 ++ 11 files changed, 103 insertions(+), 21 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java index 5edb42779a2c9..c501aeb3ebcb4 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java @@ -35,11 +35,13 @@ public class TransformCheckpointingInfo { public static final ParseField NEXT_CHECKPOINT = new ParseField("next", "in_progress"); public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind"); public static final ParseField CHANGES_LAST_DETECTED_AT = new ParseField("changes_last_detected_at"); + public static final ParseField CHANGES_LAST_SEARCHED_AT = new ParseField("changes_last_searched_at"); private final TransformCheckpointStats last; private final TransformCheckpointStats next; private final long operationsBehind; private final Instant changesLastDetectedAt; + private final Instant changesLastSearchedAt; private static final ConstructingObjectParser LENIENT_PARSER = new ConstructingObjectParser<>( @@ -48,11 +50,13 @@ public class TransformCheckpointingInfo { a -> { long behind = a[2] == null ? 0L : (Long) a[2]; Instant changesLastDetectedAt = (Instant)a[3]; + Instant changesLastSearchedAt = (Instant)a[4]; return new TransformCheckpointingInfo( a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0], a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1], behind, - changesLastDetectedAt); + changesLastDetectedAt, + changesLastSearchedAt); }); static { @@ -65,16 +69,22 @@ public class TransformCheckpointingInfo { p -> TimeUtil.parseTimeFieldToInstant(p, CHANGES_LAST_DETECTED_AT.getPreferredName()), CHANGES_LAST_DETECTED_AT, ObjectParser.ValueType.VALUE); + LENIENT_PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), + p -> TimeUtil.parseTimeFieldToInstant(p, CHANGES_LAST_SEARCHED_AT.getPreferredName()), + CHANGES_LAST_SEARCHED_AT, + ObjectParser.ValueType.VALUE); } public TransformCheckpointingInfo(TransformCheckpointStats last, TransformCheckpointStats next, long operationsBehind, - Instant changesLastDetectedAt) { + Instant changesLastDetectedAt, + Instant changesLastSearchedAt) { this.last = Objects.requireNonNull(last); this.next = Objects.requireNonNull(next); this.operationsBehind = operationsBehind; this.changesLastDetectedAt = changesLastDetectedAt; + this.changesLastSearchedAt = changesLastSearchedAt; } public TransformCheckpointStats getLast() { @@ -94,13 +104,18 @@ public Instant getChangesLastDetectedAt() { return changesLastDetectedAt; } + @Nullable + public Instant getChangesLastSearchedAt() { + return changesLastSearchedAt; + } + public static TransformCheckpointingInfo fromXContent(XContentParser p) { return LENIENT_PARSER.apply(p, null); } @Override public int hashCode() { - return Objects.hash(last, next, operationsBehind, changesLastDetectedAt); + return Objects.hash(last, next, operationsBehind, changesLastDetectedAt, changesLastSearchedAt); } @Override @@ -118,7 +133,8 @@ public boolean equals(Object other) { return Objects.equals(this.last, that.last) && Objects.equals(this.next, that.next) && this.operationsBehind == that.operationsBehind && - Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt); + Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt) && + Objects.equals(this.changesLastSearchedAt, that.changesLastSearchedAt); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java index 326ecd73a152c..92c5652c0f46d 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java @@ -43,6 +43,7 @@ public static TransformCheckpointingInfo randomTransformCheckpointingInfo() { TransformCheckpointStatsTests.randomTransformCheckpointStats(), TransformCheckpointStatsTests.randomTransformCheckpointStats(), randomLongBetween(0, 10000), + randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()), randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong())); } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/TransformCheckpointingInfoTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/TransformCheckpointingInfoTests.java index ca1ca32ede911..d2677bbfb62ec 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/TransformCheckpointingInfoTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/TransformCheckpointingInfoTests.java @@ -38,6 +38,7 @@ public static org.elasticsearch.xpack.core.transform.transforms.TransformCheckpo TransformCheckpointStatsTests.randomTransformCheckpointStats(), TransformCheckpointStatsTests.randomTransformCheckpointStats(), randomNonNegativeLong(), + randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()), randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong())); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java index 8a2f04e1e82b0..0174e5d163738 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java @@ -41,6 +41,7 @@ public static class TransformCheckpointingInfoBuilder { private TransformCheckpoint nextCheckpoint; private TransformCheckpoint sourceCheckpoint; private Instant changesLastDetectedAt; + private Instant changesLastSearchedAt; private long operationsBehind; public TransformCheckpointingInfoBuilder() {} @@ -76,7 +77,8 @@ public TransformCheckpointingInfo build() { nextCheckpoint.getTimeUpperBound() ), operationsBehind, - changesLastDetectedAt + changesLastDetectedAt, + changesLastSearchedAt ); } @@ -122,6 +124,11 @@ public TransformCheckpointingInfoBuilder setChangesLastDetectedAt(Instant change return this; } + public TransformCheckpointingInfoBuilder setChangesLastSearchedAt(Instant changesLastSearchedAt) { + this.changesLastSearchedAt = changesLastSearchedAt; + return this; + } + public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehind) { this.operationsBehind = operationsBehind; return this; @@ -133,6 +140,7 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi TransformCheckpointStats.EMPTY, TransformCheckpointStats.EMPTY, 0L, + null, null ); @@ -140,10 +148,12 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi public static final ParseField NEXT_CHECKPOINT = new ParseField("next"); public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind"); public static final ParseField CHANGES_LAST_DETECTED_AT = new ParseField("changes_last_detected_at"); + public static final ParseField CHANGES_LAST_SEARCHED_AT = new ParseField("changes_last_searched_at"); private final TransformCheckpointStats last; private final TransformCheckpointStats next; private final long operationsBehind; private final Instant changesLastDetectedAt; + private final Instant changesLastSearchedAt; private static final ConstructingObjectParser LENIENT_PARSER = new ConstructingObjectParser<>( "data_frame_transform_checkpointing_info", @@ -151,11 +161,13 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi a -> { long behind = a[2] == null ? 0L : (Long) a[2]; Instant changesLastDetectedAt = (Instant) a[3]; + Instant changesLastSearchedAt = (Instant) a[4]; return new TransformCheckpointingInfo( a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0], a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1], behind, - changesLastDetectedAt + changesLastDetectedAt, + changesLastSearchedAt ); } ); @@ -178,6 +190,12 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi CHANGES_LAST_DETECTED_AT, ObjectParser.ValueType.VALUE ); + LENIENT_PARSER.declareField( + ConstructingObjectParser.optionalConstructorArg(), + p -> TimeUtils.parseTimeFieldToInstant(p, CHANGES_LAST_SEARCHED_AT.getPreferredName()), + CHANGES_LAST_SEARCHED_AT, + ObjectParser.ValueType.VALUE + ); } /** @@ -187,18 +205,22 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi * @param last stats of the last checkpoint * @param next stats of the next checkpoint * @param operationsBehind counter of operations the current checkpoint is behind source - * @param changesLastDetectedAt the last time the source indices were checked for changes + * @param changesLastDetectedAt the last time the source indices changes have been found + * @param changesLastSearchedAt the last time the source indices were checked for changes */ public TransformCheckpointingInfo( TransformCheckpointStats last, TransformCheckpointStats next, long operationsBehind, - Instant changesLastDetectedAt + Instant changesLastDetectedAt, + Instant changesLastSearchedAt ) { this.last = Objects.requireNonNull(last); this.next = Objects.requireNonNull(next); this.operationsBehind = operationsBehind; + // todo: Instant is immutable, so a copy does not seem necessary this.changesLastDetectedAt = changesLastDetectedAt == null ? null : Instant.ofEpochMilli(changesLastDetectedAt.toEpochMilli()); + this.changesLastSearchedAt = changesLastSearchedAt == null ? null : Instant.ofEpochMilli(changesLastSearchedAt.toEpochMilli()); } public TransformCheckpointingInfo(StreamInput in) throws IOException { @@ -210,6 +232,11 @@ public TransformCheckpointingInfo(StreamInput in) throws IOException { } else { changesLastDetectedAt = null; } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_12_0 + changesLastSearchedAt = in.readOptionalInstant(); + } else { + changesLastSearchedAt = null; + } } public TransformCheckpointStats getLast() { @@ -227,6 +254,10 @@ public long getOperationsBehind() { public Instant getChangesLastDetectedAt() { return changesLastDetectedAt; } + + public Instant getChangesLastSearchedAt() { + return changesLastSearchedAt; + } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { @@ -245,6 +276,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws changesLastDetectedAt.toEpochMilli() ); } + if (changesLastSearchedAt != null) { + builder.timeField( + CHANGES_LAST_SEARCHED_AT.getPreferredName(), + CHANGES_LAST_SEARCHED_AT.getPreferredName() + "_string", + changesLastSearchedAt.toEpochMilli() + ); + } builder.endObject(); return builder; } @@ -257,6 +295,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_4_0)) { out.writeOptionalInstant(changesLastDetectedAt); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_12_0 + out.writeOptionalInstant(changesLastSearchedAt); + } } public static TransformCheckpointingInfo fromXContent(XContentParser p) { @@ -265,7 +306,7 @@ public static TransformCheckpointingInfo fromXContent(XContentParser p) { @Override public int hashCode() { - return Objects.hash(last, next, operationsBehind, changesLastDetectedAt); + return Objects.hash(last, next, operationsBehind, changesLastDetectedAt, changesLastSearchedAt); } @Override @@ -283,7 +324,8 @@ public boolean equals(Object other) { return Objects.equals(this.last, that.last) && Objects.equals(this.next, that.next) && this.operationsBehind == that.operationsBehind - && Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt); + && Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt) + && Objects.equals(this.changesLastSearchedAt, that.changesLastSearchedAt); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfoTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfoTests.java index 9bf487589c3a7..33e67947d9690 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfoTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfoTests.java @@ -22,6 +22,7 @@ public static TransformCheckpointingInfo randomTransformCheckpointingInfo() { TransformCheckpointStatsTests.randomTransformCheckpointStats(), TransformCheckpointStatsTests.randomTransformCheckpointStats(), randomNonNegativeLong(), + randomBoolean() ? null : Instant.ofEpochMilli(randomLongBetween(1, 100000)), randomBoolean() ? null : Instant.ofEpochMilli(randomLongBetween(1, 100000)) ); } @@ -46,7 +47,8 @@ public void testBackwardsSerialization() throws IOException { TransformCheckpointStats.EMPTY, TransformCheckpointStats.EMPTY, randomNonNegativeLong(), - // changesLastDetectedAt is not serialized to past values, so when it is pulled back in, it will be null + // changesLastDetectedAt, changesLastSearchedAt is not serialized to past values, so when it is pulled back in, it will be null + null, null ); try (BytesStreamOutput output = new BytesStreamOutput()) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java index cbaacf85277a6..15bd8daf4c1f7 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java @@ -75,6 +75,7 @@ public void testBwcWith73() throws IOException { new TransformCheckpointStats(0, null, null, 100, 1000), // changesLastDetectedAt aren't serialized back 100, + null, null ) ); @@ -103,6 +104,7 @@ public void testBwcWith76() throws IOException { new TransformCheckpointStats(0, null, null, 100, 1000), // changesLastDetectedAt aren't serialized back 100, + null, null ) ); diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/ContinuousTestCase.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/ContinuousTestCase.java index c7e1a0215b395..c20cb5e36e809 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/ContinuousTestCase.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/ContinuousTestCase.java @@ -42,6 +42,7 @@ public abstract class ContinuousTestCase extends ESRestTestCase { + public static final TimeValue SYNC_DELAY = new TimeValue(1, TimeUnit.SECONDS); public static final String CONTINUOUS_EVENTS_SOURCE_INDEX = "test-transform-continuous-events"; public static final String INGEST_PIPELINE = "transform-ingest"; public static final String MAX_RUN_FIELD = "run.max"; @@ -89,7 +90,7 @@ public abstract class ContinuousTestCase extends ESRestTestCase { protected TransformConfig.Builder addCommonBuilderParameters(TransformConfig.Builder builder) { return builder.setSyncConfig(getSyncConfig()) .setSettings(addCommonSetings(new SettingsConfig.Builder()).build()) - .setFrequency(new TimeValue(1, TimeUnit.SECONDS)); + .setFrequency(SYNC_DELAY); } protected AggregatorFactories.Builder addCommonAggregations(AggregatorFactories.Builder builder) { @@ -131,6 +132,6 @@ private static class TestRestHighLevelClient extends RestHighLevelClient { } private SyncConfig getSyncConfig() { - return new TimeSyncConfig("timestamp", new TimeValue(1, TimeUnit.SECONDS)); + return new TimeSyncConfig("timestamp", SYNC_DELAY); } } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java index a19d4a9602239..c96e50986606b 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java @@ -276,7 +276,7 @@ public void testContinousEvents() throws Exception { // at random we added between 0 and 999_999ns == (1ms - 1ns) to every data point, so we add 1ms, so every data point is before // the checkpoint - waitUntilTransformsReachedUpperBound(runDate.toEpochMilli() + 1, run); + waitUntilTransformsProcessedNewData(ContinuousTestCase.SYNC_DELAY, run); stopTransforms(); // TODO: the transform dest index requires a refresh, see gh#51154 @@ -495,11 +495,12 @@ private GetTransformStatsResponse getTransformStats(String id) throws IOExceptio } } - private void waitUntilTransformsReachedUpperBound(long timeStampUpperBoundMillis, int iteration) throws Exception { + private void waitUntilTransformsProcessedNewData(TimeValue delay, int iteration) throws Exception { + Instant waitUntil = Instant.now().plusMillis(delay.getMillis()); logger.info( - "wait until transform reaches timestamp_millis: {} iteration: {}", - ContinuousTestCase.STRICT_DATE_OPTIONAL_TIME_PRINTER_NANOS.withZone(ZoneId.of("UTC")) - .format(Instant.ofEpochMilli(timeStampUpperBoundMillis)), + "wait until transform reaches timestamp_millis: {} (takes into account the delay: {}) iteration: {}", + ContinuousTestCase.STRICT_DATE_OPTIONAL_TIME_PRINTER_NANOS.withZone(ZoneId.of("UTC")).format(waitUntil), + delay, iteration ); for (ContinuousTestCase testCase : transformTestCases) { @@ -512,8 +513,8 @@ private void waitUntilTransformsReachedUpperBound(long timeStampUpperBoundMillis + stats.getState() + ", reason: " + stats.getReason(), - stats.getCheckpointingInfo().getLast().getTimeUpperBoundMillis(), - greaterThan(timeStampUpperBoundMillis) + stats.getCheckpointingInfo().getChangesLastSearchedAt(), + greaterThan(waitUntil) ); }, 20, TimeUnit.SECONDS); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java index a3fd0b6385e86..934be61c8661e 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java @@ -29,6 +29,7 @@ public interface Listener { private volatile int numFailureRetries = Transform.DEFAULT_FAILURE_RETRIES; private final AtomicInteger failureCount; private volatile Instant changesLastDetectedAt; + private volatile Instant changesLastSearchedAt; private volatile boolean shouldStopAtCheckpoint; // the checkpoint of this transform, storing the checkpoint until data indexing from source to dest is _complete_ @@ -106,7 +107,15 @@ void setChangesLastDetectedAt(Instant time) { Instant getChangesLastDetectedAt() { return changesLastDetectedAt; } + + void setChangesLastSearchedAt(Instant time) { + changesLastSearchedAt = time; + } + Instant getChangesLastSearchedAt() { + return changesLastSearchedAt; + } + public boolean shouldStopAtCheckpoint() { return shouldStopAtCheckpoint; } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index e0d75894d860b..29da1657dc138 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -333,13 +333,15 @@ protected void onStart(long now, ActionListener listener) { } }, listener::onFailure); + Instant instantOfTrigger = Instant.ofEpochMilli(now); // If we are not on the initial batch checkpoint and its the first pass of whatever continuous checkpoint we are on, // we should verify if there are local changes based on the sync config. If not, do not proceed further and exit. if (context.getCheckpoint() > 0 && initialRun()) { sourceHasChanged(ActionListener.wrap(hasChanged -> { + context.setChangesLastSearchedAt(instantOfTrigger); hasSourceChanged = hasChanged; if (hasChanged) { - context.setChangesLastDetectedAt(Instant.now()); + context.setChangesLastDetectedAt(instantOfTrigger); logger.debug("[{}] source has changed, triggering new indexer run.", getJobId()); changedSourceListener.onResponse(null); } else { @@ -355,6 +357,8 @@ protected void onStart(long now, ActionListener listener) { })); } else { hasSourceChanged = true; + context.setChangesLastSearchedAt(instantOfTrigger); + context.setChangesLastDetectedAt(instantOfTrigger); changedSourceListener.onResponse(null); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index 0d507b30cb174..69d66976db902 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -174,6 +174,9 @@ public void getCheckpointingInfo( if (context.getChangesLastDetectedAt() != null) { infoBuilder.setChangesLastDetectedAt(context.getChangesLastDetectedAt()); } + if (context.getChangesLastSearchedAt() != null) { + infoBuilder.setChangesLastSearchedAt(context.getChangesLastSearchedAt()); + } listener.onResponse(infoBuilder.build()); }, listener::onFailure); From 28c0fffd8513e65d89564b9e53e85c6415419134 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 21 Dec 2020 22:25:37 +0100 Subject: [PATCH 2/7] spotless --- .../TransformCheckpointingInfo.java | 79 +++++++++++-------- .../TransformCheckpointingInfoTests.java | 11 +-- .../hlrc/TransformCheckpointingInfoTests.java | 16 ++-- .../TransformCheckpointingInfo.java | 4 +- .../continuous/TransformContinuousIT.java | 1 - .../transforms/TransformContext.java | 28 +++---- .../transforms/TransformIndexer.java | 12 +-- 7 files changed, 83 insertions(+), 68 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java index c501aeb3ebcb4..8135541d9012a 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java @@ -43,43 +43,56 @@ public class TransformCheckpointingInfo { private final Instant changesLastDetectedAt; private final Instant changesLastSearchedAt; - private static final ConstructingObjectParser LENIENT_PARSER = - new ConstructingObjectParser<>( - "transform_checkpointing_info", - true, - a -> { - long behind = a[2] == null ? 0L : (Long) a[2]; - Instant changesLastDetectedAt = (Instant)a[3]; - Instant changesLastSearchedAt = (Instant)a[4]; - return new TransformCheckpointingInfo( - a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0], - a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1], - behind, - changesLastDetectedAt, - changesLastSearchedAt); - }); + private static final ConstructingObjectParser LENIENT_PARSER = new ConstructingObjectParser<>( + "transform_checkpointing_info", + true, + a -> { + long behind = a[2] == null ? 0L : (Long) a[2]; + Instant changesLastDetectedAt = (Instant) a[3]; + Instant changesLastSearchedAt = (Instant) a[4]; + return new TransformCheckpointingInfo( + a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0], + a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1], + behind, + changesLastDetectedAt, + changesLastSearchedAt + ); + } + ); static { - LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), - (p, c) -> TransformCheckpointStats.fromXContent(p), LAST_CHECKPOINT); - LENIENT_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), - (p, c) -> TransformCheckpointStats.fromXContent(p), NEXT_CHECKPOINT); + LENIENT_PARSER.declareObject( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> TransformCheckpointStats.fromXContent(p), + LAST_CHECKPOINT + ); + LENIENT_PARSER.declareObject( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> TransformCheckpointStats.fromXContent(p), + NEXT_CHECKPOINT + ); LENIENT_PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), OPERATIONS_BEHIND); - LENIENT_PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), + LENIENT_PARSER.declareField( + ConstructingObjectParser.optionalConstructorArg(), p -> TimeUtil.parseTimeFieldToInstant(p, CHANGES_LAST_DETECTED_AT.getPreferredName()), CHANGES_LAST_DETECTED_AT, - ObjectParser.ValueType.VALUE); - LENIENT_PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), + ObjectParser.ValueType.VALUE + ); + LENIENT_PARSER.declareField( + ConstructingObjectParser.optionalConstructorArg(), p -> TimeUtil.parseTimeFieldToInstant(p, CHANGES_LAST_SEARCHED_AT.getPreferredName()), CHANGES_LAST_SEARCHED_AT, - ObjectParser.ValueType.VALUE); + ObjectParser.ValueType.VALUE + ); } - public TransformCheckpointingInfo(TransformCheckpointStats last, - TransformCheckpointStats next, - long operationsBehind, - Instant changesLastDetectedAt, - Instant changesLastSearchedAt) { + public TransformCheckpointingInfo( + TransformCheckpointStats last, + TransformCheckpointStats next, + long operationsBehind, + Instant changesLastDetectedAt, + Instant changesLastSearchedAt + ) { this.last = Objects.requireNonNull(last); this.next = Objects.requireNonNull(next); this.operationsBehind = operationsBehind; @@ -130,11 +143,11 @@ public boolean equals(Object other) { TransformCheckpointingInfo that = (TransformCheckpointingInfo) other; - return Objects.equals(this.last, that.last) && - Objects.equals(this.next, that.next) && - this.operationsBehind == that.operationsBehind && - Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt) && - Objects.equals(this.changesLastSearchedAt, that.changesLastSearchedAt); + return Objects.equals(this.last, that.last) + && Objects.equals(this.next, that.next) + && this.operationsBehind == that.operationsBehind + && Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt) + && Objects.equals(this.changesLastSearchedAt, that.changesLastSearchedAt); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java index 92c5652c0f46d..60a0281985157 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java @@ -30,12 +30,12 @@ public class TransformCheckpointingInfoTests extends ESTestCase { public void testFromXContent() throws IOException { - xContentTester(this::createParser, + xContentTester( + this::createParser, TransformCheckpointingInfoTests::randomTransformCheckpointingInfo, TransformCheckpointingInfoTests::toXContent, - TransformCheckpointingInfo::fromXContent) - .supportsUnknownFields(false) - .test(); + TransformCheckpointingInfo::fromXContent + ).supportsUnknownFields(false).test(); } public static TransformCheckpointingInfo randomTransformCheckpointingInfo() { @@ -44,7 +44,8 @@ public static TransformCheckpointingInfo randomTransformCheckpointingInfo() { TransformCheckpointStatsTests.randomTransformCheckpointStats(), randomLongBetween(0, 10000), randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()), - randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong())); + randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()) + ); } public static void toXContent(TransformCheckpointingInfo info, XContentBuilder builder) throws IOException { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/TransformCheckpointingInfoTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/TransformCheckpointingInfoTests.java index d2677bbfb62ec..0d8081b23dff0 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/TransformCheckpointingInfoTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/TransformCheckpointingInfoTests.java @@ -31,7 +31,7 @@ public class TransformCheckpointingInfoTests extends AbstractResponseTestCase< org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo, - TransformCheckpointingInfo> { + TransformCheckpointingInfo> { public static org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo randomTransformCheckpointingInfo() { return new org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo( @@ -39,12 +39,14 @@ public static org.elasticsearch.xpack.core.transform.transforms.TransformCheckpo TransformCheckpointStatsTests.randomTransformCheckpointStats(), randomNonNegativeLong(), randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()), - randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong())); + randomBoolean() ? null : Instant.ofEpochMilli(randomNonNegativeLong()) + ); } @Override - protected org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo - createServerTestInstance(XContentType xContentType) { + protected org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo createServerTestInstance( + XContentType xContentType + ) { return randomTransformCheckpointingInfo(); } @@ -54,8 +56,10 @@ protected TransformCheckpointingInfo doParseToClientInstance(XContentParser pars } @Override - protected void assertInstances(org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo serverTestInstance, - TransformCheckpointingInfo clientInstance) { + protected void assertInstances( + org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo serverTestInstance, + TransformCheckpointingInfo clientInstance + ) { assertTransformCheckpointInfo(serverTestInstance, clientInstance); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java index 0174e5d163738..0abcd01393361 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java @@ -128,7 +128,7 @@ public TransformCheckpointingInfoBuilder setChangesLastSearchedAt(Instant change this.changesLastSearchedAt = changesLastSearchedAt; return this; } - + public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehind) { this.operationsBehind = operationsBehind; return this; @@ -254,7 +254,7 @@ public long getOperationsBehind() { public Instant getChangesLastDetectedAt() { return changesLastDetectedAt; } - + public Instant getChangesLastSearchedAt() { return changesLastSearchedAt; } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java index c96e50986606b..97ee59fa178e0 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java @@ -16,7 +16,6 @@ import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.Response; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.core.AcknowledgedResponse; import org.elasticsearch.client.transform.DeleteTransformRequest; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java index 934be61c8661e..2a87284875421 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java @@ -107,15 +107,15 @@ void setChangesLastDetectedAt(Instant time) { Instant getChangesLastDetectedAt() { return changesLastDetectedAt; } - + void setChangesLastSearchedAt(Instant time) { changesLastSearchedAt = time; - } + } Instant getChangesLastSearchedAt() { return changesLastSearchedAt; } - + public boolean shouldStopAtCheckpoint() { return shouldStopAtCheckpoint; } @@ -129,18 +129,16 @@ void shutdown() { } void markAsFailed(String failureMessage) { - taskListener - .fail( - failureMessage, - ActionListener - .wrap( - r -> { - // Successfully marked as failed, reset counter so that task can be restarted - failureCount.set(0); - }, - e -> {} - ) - ); + taskListener.fail( + failureMessage, + ActionListener.wrap( + r -> { + // Successfully marked as failed, reset counter so that task can be restarted + failureCount.set(0); + }, + e -> {} + ) + ); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 29da1657dc138..fb0b0c9bf6271 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -880,15 +880,15 @@ private SearchSourceBuilder buildUpdateQuery(SearchSourceBuilder sourceBuilder) QueryBuilder queryBuilder = config.getSource().getQueryConfig().getQuery(); if (isContinuous()) { - BoolQueryBuilder filteredQuery = - new BoolQueryBuilder() - .filter(queryBuilder) - .filter(config.getSyncConfig().getRangeQuery(nextCheckpoint)); + BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(queryBuilder) + .filter(config.getSyncConfig().getRangeQuery(nextCheckpoint)); // Only apply extra filter if it is the subsequent run of the continuous transform if (nextCheckpoint.getCheckpoint() > 1 && changeCollector != null) { - QueryBuilder filter = - changeCollector.buildFilterQuery(lastCheckpoint.getTimeUpperBound(), nextCheckpoint.getTimeUpperBound()); + QueryBuilder filter = changeCollector.buildFilterQuery( + lastCheckpoint.getTimeUpperBound(), + nextCheckpoint.getTimeUpperBound() + ); if (filter != null) { filteredQuery.filter(filter); } From 2c82086ebde7456bea86043e42015702724a5fb6 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 22 Dec 2020 10:21:53 +0100 Subject: [PATCH 3/7] fix tests and unmute HistogramGroupByIT --- .../transforms/TransformCheckpointingInfoTests.java | 3 +++ .../integration/continuous/HistogramGroupByIT.java | 2 -- .../checkpoint/TransformCheckpointServiceNodeTests.java | 9 ++++++--- .../action/TransportGetTransformStatsActionTests.java | 3 +++ 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java index 60a0281985157..b32da39e129ce 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java @@ -62,6 +62,9 @@ public static void toXContent(TransformCheckpointingInfo info, XContentBuilder b if (info.getChangesLastDetectedAt() != null) { builder.field(TransformCheckpointingInfo.CHANGES_LAST_DETECTED_AT.getPreferredName(), info.getChangesLastDetectedAt()); } + if (info.getChangesLastSearchedAt() != null) { + builder.field(TransformCheckpointingInfo.CHANGES_LAST_SEARCHED_AT.getPreferredName(), info.getChangesLastSearchedAt()); + } builder.endObject(); } } diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/HistogramGroupByIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/HistogramGroupByIT.java index a129179d58794..1ac619dc89bd9 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/HistogramGroupByIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/HistogramGroupByIT.java @@ -1,6 +1,5 @@ package org.elasticsearch.xpack.transform.integration.continuous; -import org.apache.lucene.util.LuceneTestCase.AwaitsFix; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; @@ -27,7 +26,6 @@ import static org.hamcrest.Matchers.equalTo; -@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/66410") public class HistogramGroupByIT extends ContinuousTestCase { private static final String NAME = "continuous-histogram-pivot-test"; diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java index 7529d7d7848e2..72784bf98e326 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java @@ -266,7 +266,8 @@ public void testGetCheckpointStats() throws InterruptedException { new TransformCheckpointStats(1, null, null, timestamp, 0L), new TransformCheckpointStats(2, position, progress, timestamp + 100L, 0L), 30L, - Instant.ofEpochMilli(timestamp) + Instant.ofEpochMilli(timestamp), + null ); assertAsync( @@ -281,7 +282,8 @@ public void testGetCheckpointStats() throws InterruptedException { new TransformCheckpointStats(1, null, null, timestamp, 0L), new TransformCheckpointStats(2, position, progress, timestamp + 100L, 0L), 63L, - Instant.ofEpochMilli(timestamp) + Instant.ofEpochMilli(timestamp), + null ); assertAsync( listener -> getCheckpoint(transformCheckpointService, transformId, 1, position, progress, listener), @@ -296,7 +298,8 @@ public void testGetCheckpointStats() throws InterruptedException { new TransformCheckpointStats(1, null, null, timestamp, 0L), new TransformCheckpointStats(2, position, progress, timestamp + 100L, 0L), 0L, - Instant.ofEpochMilli(timestamp) + Instant.ofEpochMilli(timestamp), + null ); assertAsync( listener -> getCheckpoint(transformCheckpointService, transformId, 1, position, progress, listener), diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java index 28b5d11633afe..fe99825392c4d 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsActionTests.java @@ -45,6 +45,7 @@ public void testDeriveStatsStopped() { new TransformCheckpointStats(1, null, null, 1, 1), new TransformCheckpointStats(2, null, null, 2, 5), 2, + Instant.now(), Instant.now() ); @@ -81,6 +82,7 @@ public void testDeriveStatsFailed() { new TransformCheckpointStats(1, null, null, 1, 1), new TransformCheckpointStats(2, null, null, 2, 5), 2, + Instant.now(), Instant.now() ); @@ -126,6 +128,7 @@ public void testDeriveStats() { new TransformCheckpointStats(1, null, null, 1, 1), new TransformCheckpointStats(2, null, null, 2, 5), 2, + Instant.now(), Instant.now() ); From cd71929aec0929df22a3ef2d15a0e98bd2212dc3 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 19 Jan 2021 11:43:31 +0100 Subject: [PATCH 4/7] change naming to last_search_time --- .../TransformCheckpointingInfo.java | 20 ++++---- .../TransformCheckpointingInfoTests.java | 4 +- .../TransformCheckpointingInfo.java | 47 +++++++++---------- .../TransformCheckpointingInfoTests.java | 2 +- .../continuous/TransformContinuousIT.java | 2 +- .../transform/transforms/TransformTask.java | 2 +- 6 files changed, 38 insertions(+), 39 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java index 8135541d9012a..fea76fbeab09f 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java @@ -35,13 +35,13 @@ public class TransformCheckpointingInfo { public static final ParseField NEXT_CHECKPOINT = new ParseField("next", "in_progress"); public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind"); public static final ParseField CHANGES_LAST_DETECTED_AT = new ParseField("changes_last_detected_at"); - public static final ParseField CHANGES_LAST_SEARCHED_AT = new ParseField("changes_last_searched_at"); + public static final ParseField LAST_SEARCH_TIME = new ParseField("last_search_time"); private final TransformCheckpointStats last; private final TransformCheckpointStats next; private final long operationsBehind; private final Instant changesLastDetectedAt; - private final Instant changesLastSearchedAt; + private final Instant lastSearchTime; private static final ConstructingObjectParser LENIENT_PARSER = new ConstructingObjectParser<>( "transform_checkpointing_info", @@ -80,8 +80,8 @@ public class TransformCheckpointingInfo { ); LENIENT_PARSER.declareField( ConstructingObjectParser.optionalConstructorArg(), - p -> TimeUtil.parseTimeFieldToInstant(p, CHANGES_LAST_SEARCHED_AT.getPreferredName()), - CHANGES_LAST_SEARCHED_AT, + p -> TimeUtil.parseTimeFieldToInstant(p, LAST_SEARCH_TIME.getPreferredName()), + LAST_SEARCH_TIME, ObjectParser.ValueType.VALUE ); } @@ -91,13 +91,13 @@ public TransformCheckpointingInfo( TransformCheckpointStats next, long operationsBehind, Instant changesLastDetectedAt, - Instant changesLastSearchedAt + Instant lastSearchTime ) { this.last = Objects.requireNonNull(last); this.next = Objects.requireNonNull(next); this.operationsBehind = operationsBehind; this.changesLastDetectedAt = changesLastDetectedAt; - this.changesLastSearchedAt = changesLastSearchedAt; + this.lastSearchTime = lastSearchTime; } public TransformCheckpointStats getLast() { @@ -118,8 +118,8 @@ public Instant getChangesLastDetectedAt() { } @Nullable - public Instant getChangesLastSearchedAt() { - return changesLastSearchedAt; + public Instant getLastSearchTime() { + return lastSearchTime; } public static TransformCheckpointingInfo fromXContent(XContentParser p) { @@ -128,7 +128,7 @@ public static TransformCheckpointingInfo fromXContent(XContentParser p) { @Override public int hashCode() { - return Objects.hash(last, next, operationsBehind, changesLastDetectedAt, changesLastSearchedAt); + return Objects.hash(last, next, operationsBehind, changesLastDetectedAt, lastSearchTime); } @Override @@ -147,7 +147,7 @@ public boolean equals(Object other) { && Objects.equals(this.next, that.next) && this.operationsBehind == that.operationsBehind && Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt) - && Objects.equals(this.changesLastSearchedAt, that.changesLastSearchedAt); + && Objects.equals(this.lastSearchTime, that.lastSearchTime); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java index b32da39e129ce..73714088b442e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfoTests.java @@ -62,8 +62,8 @@ public static void toXContent(TransformCheckpointingInfo info, XContentBuilder b if (info.getChangesLastDetectedAt() != null) { builder.field(TransformCheckpointingInfo.CHANGES_LAST_DETECTED_AT.getPreferredName(), info.getChangesLastDetectedAt()); } - if (info.getChangesLastSearchedAt() != null) { - builder.field(TransformCheckpointingInfo.CHANGES_LAST_SEARCHED_AT.getPreferredName(), info.getChangesLastSearchedAt()); + if (info.getLastSearchTime() != null) { + builder.field(TransformCheckpointingInfo.LAST_SEARCH_TIME.getPreferredName(), info.getLastSearchTime()); } builder.endObject(); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java index 0abcd01393361..61b942788cd7e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java @@ -41,7 +41,7 @@ public static class TransformCheckpointingInfoBuilder { private TransformCheckpoint nextCheckpoint; private TransformCheckpoint sourceCheckpoint; private Instant changesLastDetectedAt; - private Instant changesLastSearchedAt; + private Instant lastSearchTime; private long operationsBehind; public TransformCheckpointingInfoBuilder() {} @@ -78,7 +78,7 @@ public TransformCheckpointingInfo build() { ), operationsBehind, changesLastDetectedAt, - changesLastSearchedAt + lastSearchTime ); } @@ -124,8 +124,8 @@ public TransformCheckpointingInfoBuilder setChangesLastDetectedAt(Instant change return this; } - public TransformCheckpointingInfoBuilder setChangesLastSearchedAt(Instant changesLastSearchedAt) { - this.changesLastSearchedAt = changesLastSearchedAt; + public TransformCheckpointingInfoBuilder setLastSearchTime(Instant lastSearchTime) { + this.lastSearchTime = lastSearchTime; return this; } @@ -148,12 +148,12 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi public static final ParseField NEXT_CHECKPOINT = new ParseField("next"); public static final ParseField OPERATIONS_BEHIND = new ParseField("operations_behind"); public static final ParseField CHANGES_LAST_DETECTED_AT = new ParseField("changes_last_detected_at"); - public static final ParseField CHANGES_LAST_SEARCHED_AT = new ParseField("changes_last_searched_at"); + public static final ParseField LAST_SEARCH_TIME = new ParseField("last_search_time"); private final TransformCheckpointStats last; private final TransformCheckpointStats next; private final long operationsBehind; private final Instant changesLastDetectedAt; - private final Instant changesLastSearchedAt; + private final Instant lastSearchTime; private static final ConstructingObjectParser LENIENT_PARSER = new ConstructingObjectParser<>( "data_frame_transform_checkpointing_info", @@ -192,8 +192,8 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi ); LENIENT_PARSER.declareField( ConstructingObjectParser.optionalConstructorArg(), - p -> TimeUtils.parseTimeFieldToInstant(p, CHANGES_LAST_SEARCHED_AT.getPreferredName()), - CHANGES_LAST_SEARCHED_AT, + p -> TimeUtils.parseTimeFieldToInstant(p, LAST_SEARCH_TIME.getPreferredName()), + LAST_SEARCH_TIME, ObjectParser.ValueType.VALUE ); } @@ -206,21 +206,20 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi * @param next stats of the next checkpoint * @param operationsBehind counter of operations the current checkpoint is behind source * @param changesLastDetectedAt the last time the source indices changes have been found - * @param changesLastSearchedAt the last time the source indices were checked for changes + * @param lastSearchTime the last time the source indices were searched */ public TransformCheckpointingInfo( TransformCheckpointStats last, TransformCheckpointStats next, long operationsBehind, Instant changesLastDetectedAt, - Instant changesLastSearchedAt + Instant lastSearchTime ) { this.last = Objects.requireNonNull(last); this.next = Objects.requireNonNull(next); this.operationsBehind = operationsBehind; - // todo: Instant is immutable, so a copy does not seem necessary - this.changesLastDetectedAt = changesLastDetectedAt == null ? null : Instant.ofEpochMilli(changesLastDetectedAt.toEpochMilli()); - this.changesLastSearchedAt = changesLastSearchedAt == null ? null : Instant.ofEpochMilli(changesLastSearchedAt.toEpochMilli()); + this.changesLastDetectedAt = changesLastDetectedAt; + this.lastSearchTime = lastSearchTime; } public TransformCheckpointingInfo(StreamInput in) throws IOException { @@ -233,9 +232,9 @@ public TransformCheckpointingInfo(StreamInput in) throws IOException { changesLastDetectedAt = null; } if (in.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_12_0 - changesLastSearchedAt = in.readOptionalInstant(); + lastSearchTime = in.readOptionalInstant(); } else { - changesLastSearchedAt = null; + lastSearchTime = null; } } @@ -255,8 +254,8 @@ public Instant getChangesLastDetectedAt() { return changesLastDetectedAt; } - public Instant getChangesLastSearchedAt() { - return changesLastSearchedAt; + public Instant getLastSearchTime() { + return lastSearchTime; } @Override @@ -276,11 +275,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws changesLastDetectedAt.toEpochMilli() ); } - if (changesLastSearchedAt != null) { + if (lastSearchTime != null) { builder.timeField( - CHANGES_LAST_SEARCHED_AT.getPreferredName(), - CHANGES_LAST_SEARCHED_AT.getPreferredName() + "_string", - changesLastSearchedAt.toEpochMilli() + LAST_SEARCH_TIME.getPreferredName(), + LAST_SEARCH_TIME.getPreferredName() + "_string", + lastSearchTime.toEpochMilli() ); } builder.endObject(); @@ -296,7 +295,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalInstant(changesLastDetectedAt); } if (out.getVersion().onOrAfter(Version.V_8_0_0)) { // todo: V_7_12_0 - out.writeOptionalInstant(changesLastSearchedAt); + out.writeOptionalInstant(lastSearchTime); } } @@ -306,7 +305,7 @@ public static TransformCheckpointingInfo fromXContent(XContentParser p) { @Override public int hashCode() { - return Objects.hash(last, next, operationsBehind, changesLastDetectedAt, changesLastSearchedAt); + return Objects.hash(last, next, operationsBehind, changesLastDetectedAt, lastSearchTime); } @Override @@ -325,7 +324,7 @@ public boolean equals(Object other) { && Objects.equals(this.next, that.next) && this.operationsBehind == that.operationsBehind && Objects.equals(this.changesLastDetectedAt, that.changesLastDetectedAt) - && Objects.equals(this.changesLastSearchedAt, that.changesLastSearchedAt); + && Objects.equals(this.lastSearchTime, that.lastSearchTime); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfoTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfoTests.java index 33e67947d9690..fa2c3eda335e6 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfoTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfoTests.java @@ -47,7 +47,7 @@ public void testBackwardsSerialization() throws IOException { TransformCheckpointStats.EMPTY, TransformCheckpointStats.EMPTY, randomNonNegativeLong(), - // changesLastDetectedAt, changesLastSearchedAt is not serialized to past values, so when it is pulled back in, it will be null + // changesLastDetectedAt, lastSearchTime is not serialized to past values, so when it is pulled back in, it will be null null, null ); diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java index 97ee59fa178e0..b6ee678b4cd46 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java @@ -512,7 +512,7 @@ private void waitUntilTransformsProcessedNewData(TimeValue delay, int iteration) + stats.getState() + ", reason: " + stats.getReason(), - stats.getCheckpointingInfo().getChangesLastSearchedAt(), + stats.getCheckpointingInfo().getLastSearchTime(), greaterThan(waitUntil) ); }, 20, TimeUnit.SECONDS); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index 69d66976db902..a6c1b90339dce 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -175,7 +175,7 @@ public void getCheckpointingInfo( infoBuilder.setChangesLastDetectedAt(context.getChangesLastDetectedAt()); } if (context.getChangesLastSearchedAt() != null) { - infoBuilder.setChangesLastSearchedAt(context.getChangesLastSearchedAt()); + infoBuilder.setLastSearchTime(context.getChangesLastSearchedAt()); } listener.onResponse(infoBuilder.build()); }, listener::onFailure); From b2bd358dbdb7bebc8eac9ea15b13ba69a4d9b061 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 19 Jan 2021 13:12:09 +0100 Subject: [PATCH 5/7] fix some more namings --- .../transforms/TransformCheckpointingInfo.java | 4 ++-- .../transforms/TransformCheckpointingInfo.java | 4 ++-- .../xpack/transform/transforms/TransformContext.java | 10 +++++----- .../xpack/transform/transforms/TransformIndexer.java | 4 ++-- .../xpack/transform/transforms/TransformTask.java | 4 ++-- 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java index fea76fbeab09f..8a4d0330d76e9 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/TransformCheckpointingInfo.java @@ -49,13 +49,13 @@ public class TransformCheckpointingInfo { a -> { long behind = a[2] == null ? 0L : (Long) a[2]; Instant changesLastDetectedAt = (Instant) a[3]; - Instant changesLastSearchedAt = (Instant) a[4]; + Instant lastSearchTime = (Instant) a[4]; return new TransformCheckpointingInfo( a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0], a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1], behind, changesLastDetectedAt, - changesLastSearchedAt + lastSearchTime ); } ); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java index 61b942788cd7e..35617e0f84271 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java @@ -161,13 +161,13 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi a -> { long behind = a[2] == null ? 0L : (Long) a[2]; Instant changesLastDetectedAt = (Instant) a[3]; - Instant changesLastSearchedAt = (Instant) a[4]; + Instant lastSearchedTime = (Instant) a[4]; return new TransformCheckpointingInfo( a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0], a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1], behind, changesLastDetectedAt, - changesLastSearchedAt + lastSearchedTime ); } ); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java index 2a87284875421..cde96b39f3605 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java @@ -29,7 +29,7 @@ public interface Listener { private volatile int numFailureRetries = Transform.DEFAULT_FAILURE_RETRIES; private final AtomicInteger failureCount; private volatile Instant changesLastDetectedAt; - private volatile Instant changesLastSearchedAt; + private volatile Instant lastSearchedTime; private volatile boolean shouldStopAtCheckpoint; // the checkpoint of this transform, storing the checkpoint until data indexing from source to dest is _complete_ @@ -108,12 +108,12 @@ Instant getChangesLastDetectedAt() { return changesLastDetectedAt; } - void setChangesLastSearchedAt(Instant time) { - changesLastSearchedAt = time; + void setLastSearchedTime(Instant time) { + lastSearchedTime = time; } - Instant getChangesLastSearchedAt() { - return changesLastSearchedAt; + Instant getLastSearchedTime() { + return lastSearchedTime; } public boolean shouldStopAtCheckpoint() { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index fb0b0c9bf6271..b2056d25f949b 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -338,7 +338,7 @@ protected void onStart(long now, ActionListener listener) { // we should verify if there are local changes based on the sync config. If not, do not proceed further and exit. if (context.getCheckpoint() > 0 && initialRun()) { sourceHasChanged(ActionListener.wrap(hasChanged -> { - context.setChangesLastSearchedAt(instantOfTrigger); + context.setLastSearchedTime(instantOfTrigger); hasSourceChanged = hasChanged; if (hasChanged) { context.setChangesLastDetectedAt(instantOfTrigger); @@ -357,7 +357,7 @@ protected void onStart(long now, ActionListener listener) { })); } else { hasSourceChanged = true; - context.setChangesLastSearchedAt(instantOfTrigger); + context.setLastSearchedTime(instantOfTrigger); context.setChangesLastDetectedAt(instantOfTrigger); changedSourceListener.onResponse(null); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index a6c1b90339dce..238cee706d859 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -174,8 +174,8 @@ public void getCheckpointingInfo( if (context.getChangesLastDetectedAt() != null) { infoBuilder.setChangesLastDetectedAt(context.getChangesLastDetectedAt()); } - if (context.getChangesLastSearchedAt() != null) { - infoBuilder.setLastSearchTime(context.getChangesLastSearchedAt()); + if (context.getLastSearchedTime() != null) { + infoBuilder.setLastSearchTime(context.getLastSearchedTime()); } listener.onResponse(infoBuilder.build()); }, listener::onFailure); From a2a11acc2d22c73bfafe859b3a584ce92d2d62cb Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 19 Jan 2021 16:04:46 +0100 Subject: [PATCH 6/7] fix namings again --- .../transforms/TransformCheckpointingInfo.java | 4 ++-- .../xpack/transform/transforms/TransformContext.java | 10 +++++----- .../xpack/transform/transforms/TransformIndexer.java | 4 ++-- .../xpack/transform/transforms/TransformTask.java | 4 ++-- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java index 35617e0f84271..41612bb05e959 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpointingInfo.java @@ -161,13 +161,13 @@ public TransformCheckpointingInfoBuilder setOperationsBehind(long operationsBehi a -> { long behind = a[2] == null ? 0L : (Long) a[2]; Instant changesLastDetectedAt = (Instant) a[3]; - Instant lastSearchedTime = (Instant) a[4]; + Instant lastSearchTime = (Instant) a[4]; return new TransformCheckpointingInfo( a[0] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[0], a[1] == null ? TransformCheckpointStats.EMPTY : (TransformCheckpointStats) a[1], behind, changesLastDetectedAt, - lastSearchedTime + lastSearchTime ); } ); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java index cde96b39f3605..83fa40605d2bb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformContext.java @@ -29,7 +29,7 @@ public interface Listener { private volatile int numFailureRetries = Transform.DEFAULT_FAILURE_RETRIES; private final AtomicInteger failureCount; private volatile Instant changesLastDetectedAt; - private volatile Instant lastSearchedTime; + private volatile Instant lastSearchTime; private volatile boolean shouldStopAtCheckpoint; // the checkpoint of this transform, storing the checkpoint until data indexing from source to dest is _complete_ @@ -108,12 +108,12 @@ Instant getChangesLastDetectedAt() { return changesLastDetectedAt; } - void setLastSearchedTime(Instant time) { - lastSearchedTime = time; + void setLastSearchTime(Instant time) { + lastSearchTime = time; } - Instant getLastSearchedTime() { - return lastSearchedTime; + Instant getLastSearchTime() { + return lastSearchTime; } public boolean shouldStopAtCheckpoint() { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index b2056d25f949b..ccdda9aedf447 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -338,7 +338,7 @@ protected void onStart(long now, ActionListener listener) { // we should verify if there are local changes based on the sync config. If not, do not proceed further and exit. if (context.getCheckpoint() > 0 && initialRun()) { sourceHasChanged(ActionListener.wrap(hasChanged -> { - context.setLastSearchedTime(instantOfTrigger); + context.setLastSearchTime(instantOfTrigger); hasSourceChanged = hasChanged; if (hasChanged) { context.setChangesLastDetectedAt(instantOfTrigger); @@ -357,7 +357,7 @@ protected void onStart(long now, ActionListener listener) { })); } else { hasSourceChanged = true; - context.setLastSearchedTime(instantOfTrigger); + context.setLastSearchTime(instantOfTrigger); context.setChangesLastDetectedAt(instantOfTrigger); changedSourceListener.onResponse(null); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index 238cee706d859..e1fe60dde71c0 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -174,8 +174,8 @@ public void getCheckpointingInfo( if (context.getChangesLastDetectedAt() != null) { infoBuilder.setChangesLastDetectedAt(context.getChangesLastDetectedAt()); } - if (context.getLastSearchedTime() != null) { - infoBuilder.setLastSearchTime(context.getLastSearchedTime()); + if (context.getLastSearchTime() != null) { + infoBuilder.setLastSearchTime(context.getLastSearchTime()); } listener.onResponse(infoBuilder.build()); }, listener::onFailure); From 17854c8c33161d6f6e9760a68a5ad6ce776c8ce3 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 20 Jan 2021 11:09:02 +0100 Subject: [PATCH 7/7] remove outdated comment --- .../transform/integration/continuous/TransformContinuousIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java index b6ee678b4cd46..59393b0c20bec 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/continuous/TransformContinuousIT.java @@ -273,8 +273,6 @@ public void testContinousEvents() throws Exception { // start all transforms, wait until the processed all data and stop them startTransforms(); - // at random we added between 0 and 999_999ns == (1ms - 1ns) to every data point, so we add 1ms, so every data point is before - // the checkpoint waitUntilTransformsProcessedNewData(ContinuousTestCase.SYNC_DELAY, run); stopTransforms();