diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/SettingsConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/SettingsConfig.java index 0378d27669f4b..9b0054ac78f9c 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/SettingsConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/transform/transforms/SettingsConfig.java @@ -25,20 +25,25 @@ public class SettingsConfig implements ToXContentObject { private static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size"); private static final ParseField DOCS_PER_SECOND = new ParseField("docs_per_second"); private static final ParseField DATES_AS_EPOCH_MILLIS = new ParseField("dates_as_epoch_millis"); + private static final ParseField INTERIM_RESULTS = new ParseField("interim_results"); private static final int DEFAULT_MAX_PAGE_SEARCH_SIZE = -1; private static final float DEFAULT_DOCS_PER_SECOND = -1F; // use an integer as we need to code 4 states: true, false, null (unchanged), default (defined server side) private static final int DEFAULT_DATES_AS_EPOCH_MILLIS = -1; + // use an integer as we need to code 4 states: true, false, null (unchanged), default (defined server side) + private static final int DEFAULT_INTERIM_RESULTS = -1; + private final Integer maxPageSearchSize; private final Float docsPerSecond; private final Integer datesAsEpochMillis; + private final Integer interimResults; private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "settings_config", true, - args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2]) + args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2], (Integer) args[3]) ); static { @@ -51,16 +56,24 @@ public class SettingsConfig implements ToXContentObject { DATES_AS_EPOCH_MILLIS, ValueType.BOOLEAN_OR_NULL ); + // this boolean requires 4 possible values: true, false, not_specified, default, therefore using a custom parser + PARSER.declareField( + optionalConstructorArg(), + p -> p.currentToken() == XContentParser.Token.VALUE_NULL ? DEFAULT_INTERIM_RESULTS : p.booleanValue() ? 1 : 0, + INTERIM_RESULTS, + ValueType.BOOLEAN_OR_NULL + ); } public static SettingsConfig fromXContent(final XContentParser parser) { return PARSER.apply(parser, null); } - SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis) { + SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis, Integer interimResults) { this.maxPageSearchSize = maxPageSearchSize; this.docsPerSecond = docsPerSecond; this.datesAsEpochMillis = datesAsEpochMillis; + this.interimResults = interimResults; } @Override @@ -87,6 +100,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(DATES_AS_EPOCH_MILLIS.getPreferredName(), datesAsEpochMillis > 0 ? true : false); } } + if (interimResults != null) { + if (interimResults.equals(DEFAULT_INTERIM_RESULTS)) { + builder.field(INTERIM_RESULTS.getPreferredName(), (Boolean) null); + } else { + builder.field(INTERIM_RESULTS.getPreferredName(), interimResults > 0 ? true : false); + } + } builder.endObject(); return builder; } @@ -103,6 +123,10 @@ public Boolean getDatesAsEpochMillis() { return datesAsEpochMillis != null ? datesAsEpochMillis > 0 : null; } + public Boolean getInterimResults() { + return interimResults != null ? interimResults > 0 : null; + } + @Override public boolean equals(Object other) { if (other == this) { @@ -115,12 +139,13 @@ public boolean equals(Object other) { SettingsConfig that = (SettingsConfig) other; return Objects.equals(maxPageSearchSize, that.maxPageSearchSize) && Objects.equals(docsPerSecond, that.docsPerSecond) - && Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis); + && Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis) + && Objects.equals(interimResults, that.interimResults); } @Override public int hashCode() { - return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis); + return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, interimResults); } public static Builder builder() { @@ -131,6 +156,7 @@ public static class Builder { private Integer maxPageSearchSize; private Float docsPerSecond; private Integer datesAsEpochMillis; + private Integer interimResults; /** * Sets the paging maximum paging maxPageSearchSize that transform can use when @@ -176,8 +202,21 @@ public Builder setDatesAsEpochMillis(Boolean datesAsEpochMillis) { return this; } + /** + * Whether to write interim results in transform checkpoints. + * + * An explicit `null` resets to default. + * + * @param interimResults true if interim results should be written. + * @return the {@link Builder} with interimResults set. + */ + public Builder setInterimResults(Boolean interimResults) { + this.interimResults = interimResults == null ? DEFAULT_INTERIM_RESULTS : interimResults ? 1 : 0; + return this; + } + public SettingsConfig build() { - return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis); + return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, interimResults); } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/SettingsConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/SettingsConfigTests.java index d4ab886b625ca..9d2776415b432 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/SettingsConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/SettingsConfigTests.java @@ -30,6 +30,7 @@ public static SettingsConfig randomSettingsConfig() { return new SettingsConfig( randomBoolean() ? null : randomIntBetween(10, 10_000), randomBoolean() ? null : randomFloat(), + randomBoolean() ? null : randomIntBetween(-1, 1), randomBoolean() ? null : randomIntBetween(-1, 1) ); } @@ -72,6 +73,7 @@ public void testExplicitNullOnWriteParser() throws IOException { assertThat(settingsAsMap.getOrDefault("max_page_search_size", "not_set"), equalTo("not_set")); assertNull(settingsAsMap.getOrDefault("docs_per_second", "not_set")); assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set")); + assertThat(settingsAsMap.getOrDefault("interim_results", "not_set"), equalTo("not_set")); config = fromString("{\"dates_as_epoch_millis\" : null}"); assertFalse(config.getDatesAsEpochMillis()); @@ -80,6 +82,16 @@ public void testExplicitNullOnWriteParser() throws IOException { assertThat(settingsAsMap.getOrDefault("max_page_search_size", "not_set"), equalTo("not_set")); assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set")); assertNull(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set")); + assertThat(settingsAsMap.getOrDefault("interim_results", "not_set"), equalTo("not_set")); + + config = fromString("{\"interim_results\" : null}"); + assertFalse(config.getInterimResults()); + + settingsAsMap = xContentToMap(config); + assertThat(settingsAsMap.getOrDefault("max_page_search_size", "not_set"), equalTo("not_set")); + assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set")); + assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set")); + assertNull(settingsAsMap.getOrDefault("interim_results", "not_set")); } public void testExplicitNullOnWriteBuilder() throws IOException { @@ -91,10 +103,12 @@ public void testExplicitNullOnWriteBuilder() throws IOException { assertNull(settingsAsMap.getOrDefault("max_page_search_size", "not_set")); assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set")); assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set")); + assertThat(settingsAsMap.getOrDefault("interim_results", "not_set"), equalTo("not_set")); SettingsConfig emptyConfig = new SettingsConfig.Builder().build(); assertNull(emptyConfig.getMaxPageSearchSize()); assertNull(emptyConfig.getDatesAsEpochMillis()); + assertNull(emptyConfig.getInterimResults()); settingsAsMap = xContentToMap(emptyConfig); assertTrue(settingsAsMap.isEmpty()); @@ -106,6 +120,7 @@ public void testExplicitNullOnWriteBuilder() throws IOException { assertThat(settingsAsMap.getOrDefault("max_page_search_size", "not_set"), equalTo("not_set")); assertNull(settingsAsMap.getOrDefault("docs_per_second", "not_set")); assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set")); + assertThat(settingsAsMap.getOrDefault("interim_results", "not_set"), equalTo("not_set")); config = new SettingsConfig.Builder().setDatesAsEpochMillis(null).build(); // returns false, however it's `null` as in "use default", checked next @@ -115,6 +130,17 @@ public void testExplicitNullOnWriteBuilder() throws IOException { assertThat(settingsAsMap.getOrDefault("max_page_search_size", "not_set"), equalTo("not_set")); assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set")); assertNull(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set")); + assertThat(settingsAsMap.getOrDefault("interim_results", "not_set"), equalTo("not_set")); + + config = new SettingsConfig.Builder().setInterimResults(null).build(); + // returns false, however it's `null` as in "use default", checked next + assertFalse(config.getInterimResults()); + + settingsAsMap = xContentToMap(config); + assertThat(settingsAsMap.getOrDefault("max_page_search_size", "not_set"), equalTo("not_set")); + assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set")); + assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set")); + assertNull(settingsAsMap.getOrDefault("interim_results", "not_set")); } private Map xContentToMap(ToXContent xcontent) throws IOException { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/SettingsConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/SettingsConfigTests.java index b808538cc7566..6ac3ee81238aa 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/SettingsConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/transform/transforms/hlrc/SettingsConfigTests.java @@ -23,6 +23,7 @@ public static org.elasticsearch.xpack.core.transform.transforms.SettingsConfig r return new org.elasticsearch.xpack.core.transform.transforms.SettingsConfig( randomBoolean() ? null : randomIntBetween(10, 10_000), randomBoolean() ? null : randomFloat(), + randomBoolean() ? null : randomIntBetween(0, 1), randomBoolean() ? null : randomIntBetween(0, 1) ); } @@ -34,6 +35,7 @@ public static void assertHlrcEquals( assertEquals(serverTestInstance.getMaxPageSearchSize(), clientInstance.getMaxPageSearchSize()); assertEquals(serverTestInstance.getDocsPerSecond(), clientInstance.getDocsPerSecond()); assertEquals(serverTestInstance.getDatesAsEpochMillis(), clientInstance.getDatesAsEpochMillis()); + assertEquals(serverTestInstance.getInterimResults(), clientInstance.getInterimResults()); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java index ffebe61e30c16..90c9e244d0e01 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java @@ -37,6 +37,7 @@ public final class TransformField { public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size"); public static final ParseField DOCS_PER_SECOND = new ParseField("docs_per_second"); public static final ParseField DATES_AS_EPOCH_MILLIS = new ParseField("dates_as_epoch_millis"); + public static final ParseField INTERIM_RESULTS = new ParseField("interim_results"); public static final ParseField FIELD = new ParseField("field"); public static final ParseField SYNC = new ParseField("sync"); public static final ParseField TIME = new ParseField("time"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfig.java index 0de9f49b1978d..7bf5ef6edfb7d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfig.java @@ -34,12 +34,13 @@ public class SettingsConfig implements Writeable, ToXContentObject { private static final int DEFAULT_MAX_PAGE_SEARCH_SIZE = -1; private static final float DEFAULT_DOCS_PER_SECOND = -1F; private static final int DEFAULT_DATES_AS_EPOCH_MILLIS = -1; + private static final int DEFAULT_INTERIM_RESULTS = -1; private static ConstructingObjectParser createParser(boolean lenient) { ConstructingObjectParser parser = new ConstructingObjectParser<>( "transform_config_settings", lenient, - args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2]) + args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2], (Integer) args[3]) ); parser.declareIntOrNull(optionalConstructorArg(), DEFAULT_MAX_PAGE_SEARCH_SIZE, TransformField.MAX_PAGE_SEARCH_SIZE); parser.declareFloatOrNull(optionalConstructorArg(), DEFAULT_DOCS_PER_SECOND, TransformField.DOCS_PER_SECOND); @@ -50,25 +51,39 @@ private static ConstructingObjectParser createParser(boole TransformField.DATES_AS_EPOCH_MILLIS, ValueType.BOOLEAN_OR_NULL ); + // this boolean requires 4 possible values: true, false, not_specified, default, therefore using a custom parser + parser.declareField( + optionalConstructorArg(), + p -> p.currentToken() == XContentParser.Token.VALUE_NULL ? DEFAULT_INTERIM_RESULTS : p.booleanValue() ? 1 : 0, + TransformField.INTERIM_RESULTS, + ValueType.BOOLEAN_OR_NULL + ); return parser; } private final Integer maxPageSearchSize; private final Float docsPerSecond; private final Integer datesAsEpochMillis; + private final Integer interimResults; public SettingsConfig() { - this(null, null, (Integer) null); + this(null, null, (Integer) null, (Integer) null); } - public SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Boolean datesAsEpochMillis) { - this(maxPageSearchSize, docsPerSecond, datesAsEpochMillis == null ? null : datesAsEpochMillis ? 1 : 0); + public SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Boolean datesAsEpochMillis, Boolean interimResults) { + this( + maxPageSearchSize, + docsPerSecond, + datesAsEpochMillis == null ? null : datesAsEpochMillis ? 1 : 0, + interimResults == null ? null : interimResults ? 1 : 0 + ); } - public SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis) { + public SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis, Integer interimResults) { this.maxPageSearchSize = maxPageSearchSize; this.docsPerSecond = docsPerSecond; this.datesAsEpochMillis = datesAsEpochMillis; + this.interimResults = interimResults; } public SettingsConfig(final StreamInput in) throws IOException { @@ -79,6 +94,11 @@ public SettingsConfig(final StreamInput in) throws IOException { } else { this.datesAsEpochMillis = DEFAULT_DATES_AS_EPOCH_MILLIS; } + if (in.getVersion().onOrAfter(Version.CURRENT)) { // TODO: 7.15 + this.interimResults = in.readOptionalInt(); + } else { + this.interimResults = DEFAULT_INTERIM_RESULTS; + } } public Integer getMaxPageSearchSize() { @@ -97,6 +117,14 @@ public Integer getDatesAsEpochMillisForUpdate() { return datesAsEpochMillis; } + public Boolean getInterimResults() { + return interimResults != null ? interimResults > 0 : null; + } + + public Integer getInterimResultsForUpdate() { + return interimResults; + } + public ActionRequestValidationException validate(ActionRequestValidationException validationException) { if (maxPageSearchSize != null && (maxPageSearchSize < 10 || maxPageSearchSize > MultiBucketConsumerService.DEFAULT_MAX_BUCKETS)) { validationException = addValidationError( @@ -118,6 +146,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_11_0)) { out.writeOptionalInt(datesAsEpochMillis); } + if (out.getVersion().onOrAfter(Version.CURRENT)) { // TODO: 7.15 + out.writeOptionalInt(interimResults); + } } @Override @@ -133,6 +164,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (datesAsEpochMillis != null && (datesAsEpochMillis.equals(DEFAULT_DATES_AS_EPOCH_MILLIS) == false)) { builder.field(TransformField.DATES_AS_EPOCH_MILLIS.getPreferredName(), datesAsEpochMillis > 0 ? true : false); } + if (interimResults != null && (interimResults.equals(DEFAULT_INTERIM_RESULTS) == false)) { + builder.field(TransformField.INTERIM_RESULTS.getPreferredName(), interimResults > 0 ? true : false); + } builder.endObject(); return builder; } @@ -149,12 +183,13 @@ public boolean equals(Object other) { SettingsConfig that = (SettingsConfig) other; return Objects.equals(maxPageSearchSize, that.maxPageSearchSize) && Objects.equals(docsPerSecond, that.docsPerSecond) - && Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis); + && Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis) + && Objects.equals(interimResults, that.interimResults); } @Override public int hashCode() { - return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis); + return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, interimResults); } @Override @@ -170,6 +205,7 @@ public static class Builder { private Integer maxPageSearchSize; private Float docsPerSecond; private Integer datesAsEpochMillis; + private Integer interimResults; /** * Default builder @@ -185,6 +221,7 @@ public Builder(SettingsConfig base) { this.maxPageSearchSize = base.maxPageSearchSize; this.docsPerSecond = base.docsPerSecond; this.datesAsEpochMillis = base.datesAsEpochMillis; + this.interimResults = base.interimResults; } /** @@ -231,6 +268,19 @@ public Builder setDatesAsEpochMillis(Boolean datesAsEpochMillis) { return this; } + /** + * Whether to write interim results in transform checkpoints. + * + * An explicit `null` resets to default. + * + * @param interimResults true if interim results should be written. + * @return the {@link Builder} with interimResults set. + */ + public Builder setInterimResults(Boolean interimResults) { + this.interimResults = interimResults == null ? DEFAULT_INTERIM_RESULTS : interimResults ? 1 : 0; + return this; + } + /** * Update settings according to given settings config. * @@ -253,12 +303,17 @@ public Builder update(SettingsConfig update) { ? null : update.getDatesAsEpochMillisForUpdate(); } + if (update.getInterimResultsForUpdate() != null) { + this.interimResults = update.getInterimResultsForUpdate().equals(DEFAULT_INTERIM_RESULTS) + ? null + : update.getInterimResultsForUpdate(); + } return this; } public SettingsConfig build() { - return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis); + return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, interimResults); } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java index dae3eb86966d3..dab1b31b1d1a7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformCheckpoint.java @@ -8,12 +8,12 @@ package org.elasticsearch.xpack.core.transform.transforms; import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java index 7117fdec1f6f4..1e4c77b000e00 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java @@ -538,7 +538,8 @@ private static TransformConfig applyRewriteForUpdate(Builder builder) { new SettingsConfig( maxPageSearchSize, builder.getSettings().getDocsPerSecond(), - builder.getSettings().getDatesAsEpochMillis() + builder.getSettings().getDatesAsEpochMillis(), + builder.getSettings().getInterimResults() ) ); } @@ -546,7 +547,22 @@ private static TransformConfig applyRewriteForUpdate(Builder builder) { // 2. set dates_as_epoch_millis to true for transforms < 7.11 to keep BWC if (builder.getVersion() != null && builder.getVersion().before(Version.V_7_11_0)) { builder.setSettings( - new SettingsConfig(builder.getSettings().getMaxPageSearchSize(), builder.getSettings().getDocsPerSecond(), true) + new SettingsConfig( + builder.getSettings().getMaxPageSearchSize(), + builder.getSettings().getDocsPerSecond(), + true, + builder.getSettings().getInterimResults()) + ); + } + + // 3. set interim_results to true for transforms < 7.15 to keep BWC + if (builder.getVersion() != null && builder.getVersion().before(Version.CURRENT)) { // TODO: 7.15 + builder.setSettings( + new SettingsConfig( + builder.getSettings().getMaxPageSearchSize(), + builder.getSettings().getDocsPerSecond(), + builder.getSettings().getDatesAsEpochMillis(), + true) ); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfigTests.java index 273a5c817bc1d..684c2950ea5f2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfigTests.java @@ -33,12 +33,13 @@ public static SettingsConfig randomSettingsConfig() { return new SettingsConfig( randomBoolean() ? null : randomIntBetween(10, 10_000), randomBoolean() ? null : randomFloat(), + randomBoolean() ? null : randomIntBetween(0, 1), randomBoolean() ? null : randomIntBetween(0, 1) ); } public static SettingsConfig randomNonEmptySettingsConfig() { - return new SettingsConfig(randomIntBetween(10, 10_000), randomFloat(), randomIntBetween(0, 1)); + return new SettingsConfig(randomIntBetween(10, 10_000), randomFloat(), randomIntBetween(0, 1), randomIntBetween(0, 1)); } @Before @@ -78,6 +79,9 @@ public void testExplicitNullParsing() throws IOException { assertThat(fromString("{\"dates_as_epoch_millis\" : null}").getDatesAsEpochMillisForUpdate(), equalTo(-1)); assertNull(fromString("{}").getDatesAsEpochMillisForUpdate()); + + assertThat(fromString("{\"interim_results\" : null}").getInterimResultsForUpdate(), equalTo(-1)); + assertNull(fromString("{}").getInterimResultsForUpdate()); } public void testUpdateUsingBuilder() throws IOException { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java index 59786fd5d80d0..7e4266bd966e7 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java @@ -106,7 +106,7 @@ public void testApply() { TimeValue frequency = TimeValue.timeValueSeconds(10); SyncConfig syncConfig = new TimeSyncConfig("time_field", TimeValue.timeValueSeconds(30)); String newDescription = "new description"; - SettingsConfig settings = new SettingsConfig(4_000, 4_000.400F, true); + SettingsConfig settings = new SettingsConfig(4_000, 4_000.400F, true, true); RetentionPolicyConfig retentionPolicyConfig = new TimeRetentionPolicyConfig("time_field", new TimeValue(60_000)); update = new TransformConfigUpdate( sourceConfig, @@ -150,37 +150,38 @@ public void testApplySettings() { randomBoolean() ? null : Version.V_7_2_0.toString() ); - TransformConfigUpdate update = new TransformConfigUpdate( - null, - null, - null, - null, - null, - new SettingsConfig(4_000, null, (Boolean) null), - null - ); + TransformConfigUpdate update = + new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(4_000, null, (Boolean) null, null), null); TransformConfig updatedConfig = update.apply(config); // for settings we allow partial updates, so changing 1 setting should not overwrite the other // the parser handles explicit nulls, tested in @link{SettingsConfigTests} assertThat(updatedConfig.getSettings().getMaxPageSearchSize(), equalTo(4_000)); assertThat(updatedConfig.getSettings().getDocsPerSecond(), equalTo(config.getSettings().getDocsPerSecond())); + assertThat(updatedConfig.getSettings().getDatesAsEpochMillis(), equalTo(config.getSettings().getDatesAsEpochMillis())); + assertThat(updatedConfig.getSettings().getInterimResults(), equalTo(config.getSettings().getInterimResults())); - update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(null, 43.244F, (Boolean) null), null); + update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(null, 43.244F, (Boolean) null, null), null); updatedConfig = update.apply(updatedConfig); assertThat(updatedConfig.getSettings().getMaxPageSearchSize(), equalTo(4_000)); assertThat(updatedConfig.getSettings().getDocsPerSecond(), equalTo(43.244F)); + assertThat(updatedConfig.getSettings().getDatesAsEpochMillis(), equalTo(config.getSettings().getDatesAsEpochMillis())); + assertThat(updatedConfig.getSettings().getInterimResults(), equalTo(config.getSettings().getInterimResults())); // now reset to default using the magic -1 - update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(-1, null, (Boolean) null), null); + update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(-1, null, (Boolean) null, null), null); updatedConfig = update.apply(updatedConfig); assertNull(updatedConfig.getSettings().getMaxPageSearchSize()); assertThat(updatedConfig.getSettings().getDocsPerSecond(), equalTo(43.244F)); + assertThat(updatedConfig.getSettings().getDatesAsEpochMillis(), equalTo(config.getSettings().getDatesAsEpochMillis())); + assertThat(updatedConfig.getSettings().getInterimResults(), equalTo(config.getSettings().getInterimResults())); - update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(-1, -1F, (Boolean) null), null); + update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(-1, -1F, (Boolean) null, null), null); updatedConfig = update.apply(updatedConfig); assertNull(updatedConfig.getSettings().getMaxPageSearchSize()); assertNull(updatedConfig.getSettings().getDocsPerSecond()); + assertThat(updatedConfig.getSettings().getDatesAsEpochMillis(), equalTo(config.getSettings().getDatesAsEpochMillis())); + assertThat(updatedConfig.getSettings().getInterimResults(), equalTo(config.getSettings().getInterimResults())); } public void testApplyWithSyncChange() { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GroupConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GroupConfigTests.java index 505462b4b9b6f..70d8cf840b9ae 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GroupConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/pivot/GroupConfigTests.java @@ -66,7 +66,7 @@ public static GroupConfig randomGroupConfig(Supplier singleGr return new GroupConfig(source, groups); } - private static SingleGroupSource randomSingleGroupSource(Version version) { + public static SingleGroupSource randomSingleGroupSource(Version version) { Type type = randomFrom(SingleGroupSource.Type.values()); switch (type) { case TERMS: diff --git a/x-pack/plugin/core/src/test/resources/rest-api-spec/schema/transform_config.schema.json b/x-pack/plugin/core/src/test/resources/rest-api-spec/schema/transform_config.schema.json index 9653e593ab449..67df081561618 100644 --- a/x-pack/plugin/core/src/test/resources/rest-api-spec/schema/transform_config.schema.json +++ b/x-pack/plugin/core/src/test/resources/rest-api-spec/schema/transform_config.schema.json @@ -180,6 +180,12 @@ "title": "docs per second", "type": "number" }, + "interim_results": { + "$id": "#root/settings/interim_results", + "title": "interim results", + "type": "boolean", + "default": false + }, "max_page_search_size": { "$id": "#root/settings/max_page_search_size", "title": "max page search size", @@ -243,4 +249,4 @@ } } } -} \ No newline at end of file +} diff --git a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java index e3682c8f8fc06..c99ee97ee9a3b 100644 --- a/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java +++ b/x-pack/plugin/transform/src/internalClusterTest/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointServiceNodeTests.java @@ -59,6 +59,7 @@ import org.junit.Before; import java.nio.file.Path; +import java.time.Clock; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; @@ -143,6 +144,7 @@ public void createComponents() { // use a mock for the checkpoint service TransformAuditor mockAuditor = mock(TransformAuditor.class); transformCheckpointService = new TransformCheckpointService( + Clock.systemUTC(), Settings.EMPTY, new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null), transformsConfigManager, diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 982a564a2b91c..458fdd4d8d7fd 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -264,7 +264,8 @@ public Collection createComponents( ) { TransformConfigManager configManager = new IndexBasedTransformConfigManager(client, xContentRegistry); TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName(), clusterService); - TransformCheckpointService checkpointService = new TransformCheckpointService(settings, clusterService, configManager, auditor); + TransformCheckpointService checkpointService = + new TransformCheckpointService(Clock.systemUTC(), settings, clusterService, configManager, auditor); SchedulerEngine scheduler = new SchedulerEngine(settings, Clock.systemUTC()); transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index 5a4acfea022f8..4893d59c4ccb3 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -33,6 +33,7 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; +import java.time.Clock; import java.time.Instant; import java.util.Arrays; import java.util.Collection; @@ -44,26 +45,29 @@ import java.util.TreeMap; import java.util.stream.Collectors; -public class DefaultCheckpointProvider implements CheckpointProvider { +class DefaultCheckpointProvider implements CheckpointProvider { // threshold when to audit concrete index names, above this threshold we only report the number of changes private static final int AUDIT_CONCRETED_SOURCE_INDEX_CHANGES = 10; private static final Logger logger = LogManager.getLogger(DefaultCheckpointProvider.class); + protected final Clock clock; protected final Client client; protected final RemoteClusterResolver remoteClusterResolver; protected final TransformConfigManager transformConfigManager; protected final TransformAuditor transformAuditor; protected final TransformConfig transformConfig; - public DefaultCheckpointProvider( + DefaultCheckpointProvider( + final Clock clock, final Client client, final RemoteClusterResolver remoteClusterResolver, final TransformConfigManager transformConfigManager, final TransformAuditor transformAuditor, final TransformConfig transformConfig ) { + this.clock = clock; this.client = client; this.remoteClusterResolver = remoteClusterResolver; this.transformConfigManager = transformConfigManager; @@ -78,19 +82,23 @@ public void sourceHasChanged(final TransformCheckpoint lastCheckpoint, final Act @Override public void createNextCheckpoint(final TransformCheckpoint lastCheckpoint, final ActionListener listener) { - final long timestamp = System.currentTimeMillis(); + final long timestamp = clock.millis(); final long checkpoint = TransformCheckpoint.isNullOrEmpty(lastCheckpoint) ? 1 : lastCheckpoint.getCheckpoint() + 1; - getIndexCheckpoints(ActionListener.wrap(checkpointsByIndex -> { - reportSourceIndexChanges( - TransformCheckpoint.isNullOrEmpty(lastCheckpoint) - ? Collections.emptySet() - : lastCheckpoint.getIndicesCheckpoints().keySet(), - checkpointsByIndex.keySet() - ); + getIndexCheckpoints( + ActionListener.wrap( + checkpointsByIndex -> { + reportSourceIndexChanges( + TransformCheckpoint.isNullOrEmpty(lastCheckpoint) + ? Collections.emptySet() + : lastCheckpoint.getIndicesCheckpoints().keySet(), + checkpointsByIndex.keySet() + ); - listener.onResponse(new TransformCheckpoint(transformConfig.getId(), timestamp, checkpoint, checkpointsByIndex, 0L)); - }, listener::onFailure)); + listener.onResponse(new TransformCheckpoint(transformConfig.getId(), timestamp, checkpoint, checkpointsByIndex, 0L)); + }, + listener::onFailure) + ); } protected void getIndexCheckpoints(ActionListener> listener) { @@ -257,7 +265,7 @@ public void getCheckpointingInfo( .setNextCheckpointPosition(nextCheckpointPosition) .setNextCheckpointProgress(nextCheckpointProgress); - long timestamp = System.currentTimeMillis(); + long timestamp = clock.millis(); getIndexCheckpoints(ActionListener.wrap(checkpointsByIndex -> { TransformCheckpoint sourceCheckpoint = new TransformCheckpoint(transformConfig.getId(), timestamp, -1L, checkpointsByIndex, 0L); @@ -280,7 +288,7 @@ public void getCheckpointingInfo( checkpointingInfoBuilder.setNextCheckpointPosition(nextCheckpointPosition).setNextCheckpointProgress(nextCheckpointProgress); checkpointingInfoBuilder.setLastCheckpoint(TransformCheckpoint.EMPTY); - long timestamp = System.currentTimeMillis(); + long timestamp = clock.millis(); // <3> got the source checkpoint, notify the user ActionListener> checkpointsByIndexListener = ActionListener.wrap(checkpointsByIndex -> { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java index 83e525a7a7fba..d9e59ce9ed3f4 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/RemoteClusterResolver.java @@ -33,15 +33,15 @@ static class ResolvedIndices { this.remoteIndicesPerClusterAlias = remoteIndicesPerClusterAlias; } - public Map> getRemoteIndicesPerClusterAlias() { + Map> getRemoteIndicesPerClusterAlias() { return remoteIndicesPerClusterAlias; } - public List getLocalIndices() { + List getLocalIndices() { return localIndices; } - public int numClusters() { + int numClusters() { return remoteIndicesPerClusterAlias.size() + (localIndices.isEmpty() ? 0 : 1); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProvider.java index 4316c8354aad9..40c63907773f6 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProvider.java @@ -15,54 +15,68 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.pivot.DateHistogramGroupSource; +import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; -public class TimeBasedCheckpointProvider extends DefaultCheckpointProvider { +import java.time.Clock; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; +import java.util.Map; + +import static java.util.function.Function.identity; + +class TimeBasedCheckpointProvider extends DefaultCheckpointProvider { private static final Logger logger = LogManager.getLogger(TimeBasedCheckpointProvider.class); private final TimeSyncConfig timeSyncConfig; + // function aligning the given timestamp with date histogram interval or identity function is aligning is not possible + private final Function alignTimestamp; TimeBasedCheckpointProvider( + final Clock clock, final Client client, final RemoteClusterResolver remoteClusterResolver, final TransformConfigManager transformConfigManager, final TransformAuditor transformAuditor, final TransformConfig transformConfig ) { - super(client, remoteClusterResolver, transformConfigManager, transformAuditor, transformConfig); + super(clock, client, remoteClusterResolver, transformConfigManager, transformAuditor, transformConfig); timeSyncConfig = (TimeSyncConfig) transformConfig.getSyncConfig(); + alignTimestamp = createAlignTimestampFunction(transformConfig); } @Override public void sourceHasChanged(TransformCheckpoint lastCheckpoint, ActionListener listener) { + final long timestamp = clock.millis(); + final long timeUpperBound = alignTimestamp.apply(timestamp - timeSyncConfig.getDelay().millis()); - final long timestamp = getTime(); - - SearchRequest searchRequest = new SearchRequest(transformConfig.getSource().getIndex()).allowPartialSearchResults(false) - .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().size(0) - // we only want to know if there is at least 1 new document - .trackTotalHitsUpTo(1); - - QueryBuilder queryBuilder = transformConfig.getSource().getQueryConfig().getQuery(); - BoolQueryBuilder filteredQuery = new BoolQueryBuilder().filter(queryBuilder) + BoolQueryBuilder queryBuilder = new BoolQueryBuilder() + .filter(transformConfig.getSource().getQueryConfig().getQuery()) .filter( - new RangeQueryBuilder(timeSyncConfig.getField()).gte(lastCheckpoint.getTimeUpperBound()) - .lt(timestamp - timeSyncConfig.getDelay().millis()) + new RangeQueryBuilder(timeSyncConfig.getField()) + .gte(lastCheckpoint.getTimeUpperBound()) + .lt(timeUpperBound) .format("epoch_millis") ); - - sourceBuilder.query(filteredQuery); - searchRequest.source(sourceBuilder); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder() + .size(0) + // we only want to know if there is at least 1 new document + .trackTotalHitsUpTo(1) + .query(queryBuilder); + SearchRequest searchRequest = new SearchRequest(transformConfig.getSource().getIndex()) + .allowPartialSearchResults(false) + .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .source(sourceBuilder); logger.trace("query for changes based on time: {}", sourceBuilder); @@ -72,17 +86,20 @@ public void sourceHasChanged(TransformCheckpoint lastCheckpoint, ActionListener< client, SearchAction.INSTANCE, searchRequest, - ActionListener.wrap(r -> { listener.onResponse(r.getHits().getTotalHits().value > 0L); }, listener::onFailure) + ActionListener.wrap( + r -> listener.onResponse(r.getHits().getTotalHits().value > 0L), + listener::onFailure + ) ); } @Override public void createNextCheckpoint(final TransformCheckpoint lastCheckpoint, final ActionListener listener) { - final long timestamp = getTime(); + final long timestamp = clock.millis(); final long checkpoint = TransformCheckpoint.isNullOrEmpty(lastCheckpoint) ? 1 : lastCheckpoint.getCheckpoint() + 1; // for time based synchronization - long timeUpperBound = timestamp - timeSyncConfig.getDelay().millis(); + final long timeUpperBound = alignTimestamp.apply(timestamp - timeSyncConfig.getDelay().millis()); getIndexCheckpoints( ActionListener.wrap( @@ -96,8 +113,35 @@ public void createNextCheckpoint(final TransformCheckpoint lastCheckpoint, final ); } - // for the purpose of testing - long getTime() { - return System.currentTimeMillis(); + /** + * Aligns the timestamp with date histogram group source interval (if it is provided). + * + * @param transformConfig transform configuration + * @return function aligning the given timestamp with date histogram interval + */ + private static Function createAlignTimestampFunction(TransformConfig transformConfig) { + if (Boolean.FALSE.equals(transformConfig.getSettings().getInterimResults()) == false) { + return identity(); + } + if (transformConfig.getPivotConfig() == null) { + return identity(); + } + if (transformConfig.getPivotConfig().getGroupConfig() == null) { + return identity(); + } + Map groups = transformConfig.getPivotConfig().getGroupConfig().getGroups(); + if (groups == null || groups.isEmpty()) { + return identity(); + } + Optional dateHistogramGroupSource = + groups.values().stream() + .filter(DateHistogramGroupSource.class::isInstance) + .map(DateHistogramGroupSource.class::cast) + .filter(group -> Objects.equals(group.getField(), transformConfig.getSyncConfig().getField())) + .findFirst(); + if (dateHistogramGroupSource.isEmpty()) { + return identity(); + } + return dateHistogramGroupSource.get().getRounding()::round; } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java index a2819851e21f5..4eebaf4fbde7b 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TransformCheckpointService.java @@ -21,6 +21,8 @@ import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; +import java.time.Clock; + /** * Transform Checkpoint Service * @@ -33,16 +35,19 @@ public class TransformCheckpointService { private static final Logger logger = LogManager.getLogger(TransformCheckpointService.class); + private final Clock clock; private final TransformConfigManager transformConfigManager; private final TransformAuditor transformAuditor; private final RemoteClusterResolver remoteClusterResolver; public TransformCheckpointService( + final Clock clock, final Settings settings, final ClusterService clusterService, final TransformConfigManager transformConfigManager, TransformAuditor transformAuditor ) { + this.clock = clock; this.transformConfigManager = transformConfigManager; this.transformAuditor = transformAuditor; this.remoteClusterResolver = new RemoteClusterResolver(settings, clusterService.getClusterSettings()); @@ -51,6 +56,7 @@ public TransformCheckpointService( public CheckpointProvider getCheckpointProvider(final Client client, final TransformConfig transformConfig) { if (transformConfig.getSyncConfig() instanceof TimeSyncConfig) { return new TimeBasedCheckpointProvider( + clock, client, remoteClusterResolver, transformConfigManager, @@ -59,7 +65,8 @@ public CheckpointProvider getCheckpointProvider(final Client client, final Trans ); } - return new DefaultCheckpointProvider(client, remoteClusterResolver, transformConfigManager, transformAuditor, transformConfig); + return new DefaultCheckpointProvider( + clock, client, remoteClusterResolver, transformConfigManager, transformAuditor, transformConfig); } /** diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java index 05c61cd233b54..bb87e28675ea0 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProviderTests.java @@ -17,6 +17,9 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; @@ -28,6 +31,7 @@ import org.elasticsearch.test.MockLogAppender.LoggingExpectation; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests; import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; @@ -36,6 +40,9 @@ import org.junit.Before; import org.mockito.stubbing.Answer; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; import java.util.Collections; import java.util.HashSet; import java.util.concurrent.CountDownLatch; @@ -46,19 +53,24 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class DefaultCheckpointProviderTests extends ESTestCase { - private Client client; + private static Logger checkpointProviderLogger = LogManager.getLogger(DefaultCheckpointProvider.class); - private MockTransformAuditor transformAuditor; + private Clock clock; + private Client client; private IndexBasedTransformConfigManager transformConfigManager; - private Logger checkpointProviderlogger = LogManager.getLogger(DefaultCheckpointProvider.class); + private MockTransformAuditor transformAuditor; @Before public void setUpMocks() { + clock = Clock.fixed(Instant.now(), ZoneId.systemDefault()); ThreadPool threadPool = mock(ThreadPool.class); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); client = mock(Client.class); @@ -70,19 +82,12 @@ public void setUpMocks() { public void testReportSourceIndexChangesRunsEmpty() throws Exception { String transformId = getTestName(); TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId); - - DefaultCheckpointProvider provider = new DefaultCheckpointProvider( - client, - new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), - transformConfigManager, - transformAuditor, - transformConfig - ); + DefaultCheckpointProvider provider = newCheckpointProvider(transformConfig); assertExpectation( new MockLogAppender.SeenEventExpectation( "warn when source is empty", - checkpointProviderlogger.getName(), + checkpointProviderLogger.getName(), Level.WARN, "[" + transformId + "] Source did not resolve to any open indexes" ), @@ -98,7 +103,7 @@ public void testReportSourceIndexChangesRunsEmpty() throws Exception { assertExpectation( new MockLogAppender.UnseenEventExpectation( "do not warn if empty again", - checkpointProviderlogger.getName(), + checkpointProviderLogger.getName(), Level.WARN, "Source did not resolve to any concrete indexes" ), @@ -115,19 +120,12 @@ public void testReportSourceIndexChangesRunsEmpty() throws Exception { public void testReportSourceIndexChangesAddDelete() throws Exception { String transformId = getTestName(); TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId); - - DefaultCheckpointProvider provider = new DefaultCheckpointProvider( - client, - new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), - transformConfigManager, - transformAuditor, - transformConfig - ); + DefaultCheckpointProvider provider = newCheckpointProvider(transformConfig); assertExpectation( new MockLogAppender.SeenEventExpectation( "info about adds/removal", - checkpointProviderlogger.getName(), + checkpointProviderLogger.getName(), Level.DEBUG, "[" + transformId + "] Source index resolve found changes, removedIndexes: [index], new indexes: [other_index]" ), @@ -143,7 +141,7 @@ public void testReportSourceIndexChangesAddDelete() throws Exception { assertExpectation( new MockLogAppender.SeenEventExpectation( "info about adds/removal", - checkpointProviderlogger.getName(), + checkpointProviderLogger.getName(), Level.DEBUG, "[" + transformId + "] Source index resolve found changes, removedIndexes: [index], new indexes: []" ), @@ -158,7 +156,7 @@ public void testReportSourceIndexChangesAddDelete() throws Exception { assertExpectation( new MockLogAppender.SeenEventExpectation( "info about adds/removal", - checkpointProviderlogger.getName(), + checkpointProviderLogger.getName(), Level.DEBUG, "[" + transformId + "] Source index resolve found changes, removedIndexes: [], new indexes: [other_index]" ), @@ -175,14 +173,7 @@ public void testReportSourceIndexChangesAddDelete() throws Exception { public void testReportSourceIndexChangesAddDeleteMany() throws Exception { String transformId = getTestName(); TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId); - - DefaultCheckpointProvider provider = new DefaultCheckpointProvider( - client, - new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), - transformConfigManager, - transformAuditor, - transformConfig - ); + DefaultCheckpointProvider provider = newCheckpointProvider(transformConfig); HashSet oldSet = new HashSet<>(); for (int i = 0; i < 100; ++i) { @@ -196,7 +187,7 @@ public void testReportSourceIndexChangesAddDeleteMany() throws Exception { assertExpectation( new MockLogAppender.SeenEventExpectation( "info about adds/removal", - checkpointProviderlogger.getName(), + checkpointProviderLogger.getName(), Level.DEBUG, "[" + transformId + "] Source index resolve found more than 10 changes, [50] removed indexes, [50] new indexes" ), @@ -234,6 +225,7 @@ public void testHandlingShardFailures() throws Exception { doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); DefaultCheckpointProvider provider = new DefaultCheckpointProvider( + clock, client, remoteClusterResolver, transformConfigManager, @@ -259,24 +251,52 @@ public void testHandlingShardFailures() throws Exception { latch.await(10, TimeUnit.SECONDS); } + public void testSourceHasChanged() throws InterruptedException { + String transformId = getTestName(); + TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId); + DefaultCheckpointProvider provider = newCheckpointProvider(transformConfig); + + SetOnce hasChangedHolder = new SetOnce<>(); + SetOnce exceptionHolder = new SetOnce<>(); + CountDownLatch latch = new CountDownLatch(1); + provider.sourceHasChanged( + TransformCheckpoint.EMPTY, + new LatchedActionListener<>(ActionListener.wrap(hasChangedHolder::set, exceptionHolder::set), latch) + ); + assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true)); + assertThat(hasChangedHolder.get(), is(equalTo(false))); + assertThat(exceptionHolder.get(), is(nullValue())); + } + + private DefaultCheckpointProvider newCheckpointProvider(TransformConfig transformConfig) { + return new DefaultCheckpointProvider( + clock, + client, + new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), + transformConfigManager, + transformAuditor, + transformConfig + ); + } + private void assertExpectation(LoggingExpectation loggingExpectation, AuditExpectation auditExpectation, Runnable codeBlock) throws IllegalAccessException { MockLogAppender mockLogAppender = new MockLogAppender(); mockLogAppender.start(); - Loggers.setLevel(checkpointProviderlogger, Level.DEBUG); + Loggers.setLevel(checkpointProviderLogger, Level.DEBUG); mockLogAppender.addExpectation(loggingExpectation); // always start fresh transformAuditor.reset(); transformAuditor.addExpectation(auditExpectation); try { - Loggers.addAppender(checkpointProviderlogger, mockLogAppender); + Loggers.addAppender(checkpointProviderLogger, mockLogAppender); codeBlock.run(); mockLogAppender.assertAllExpectationsMatched(); transformAuditor.assertAllExpectationsMatched(); } finally { - Loggers.removeAppender(checkpointProviderlogger, mockLogAppender); + Loggers.removeAppender(checkpointProviderLogger, mockLogAppender); mockLogAppender.stop(); } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java new file mode 100644 index 0000000000000..e2aca8d8fe8aa --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java @@ -0,0 +1,328 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.checkpoint; + +import org.apache.lucene.search.TotalHits; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.action.admin.indices.get.GetIndexAction; +import org.elasticsearch.action.admin.indices.get.GetIndexResponse; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchResponseSections; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; +import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests; +import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfigTests; +import org.elasticsearch.xpack.core.transform.transforms.pivot.DateHistogramGroupSource; +import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests; +import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig; +import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource; +import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; +import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager; +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.mockito.stubbing.Answer; + +import java.time.Clock; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import static java.util.Collections.emptyMap; +import static org.elasticsearch.core.Tuple.tuple; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TimeBasedCheckpointProviderTests extends ESTestCase { + + private static final String TIMESTAMP_FIELD = "@timestamp"; + + private Clock clock; + private Client client; + private IndexBasedTransformConfigManager transformConfigManager; + private MockTransformAuditor transformAuditor; + + @Before + public void setUpMocks() { + clock = mock(Clock.class); + when(clock.millis()).thenReturn(123456789L); + client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + when(client.threadPool()).thenReturn(threadPool); + transformConfigManager = mock(IndexBasedTransformConfigManager.class); + transformAuditor = MockTransformAuditor.createMockAuditor(); + } + + public void testSourceHasChanged_NotChanged() throws InterruptedException { + testSourceHasChanged( + 0, + false, + TransformCheckpoint.EMPTY, + TIMESTAMP_FIELD, + TimeValue.timeValueMinutes(10), + TimeValue.ZERO, + tuple(0L, 123000000L)); + } + + public void testSourceHasChanged_Changed() throws InterruptedException { + testSourceHasChanged( + 1, + true, + TransformCheckpoint.EMPTY, + TIMESTAMP_FIELD, + TimeValue.timeValueMinutes(10), + TimeValue.ZERO, + tuple(0L, 123000000L) + ); + } + + public void testSourceHasChanged_UnfinishedCheckpoint() throws InterruptedException { + testSourceHasChanged( + 0, + false, + new TransformCheckpoint("", 100000000L, 7, emptyMap(), null), + TIMESTAMP_FIELD, + TimeValue.timeValueMinutes(10), + TimeValue.ZERO, + tuple(0L, 123000000L) + ); + } + + public void testSourceHasChanged_SubsequentCheckpoint() throws InterruptedException { + testSourceHasChanged( + 0, + false, + new TransformCheckpoint("", 100000000L, 7, emptyMap(), 120000000L), + TIMESTAMP_FIELD, + TimeValue.timeValueMinutes(10), + TimeValue.ZERO, + tuple(120000000L, 123000000L) + ); + } + + public void testSourceHasChanged_WithDelay() throws InterruptedException { + testSourceHasChanged( + 0, + false, + new TransformCheckpoint("", 100000000L, 7, emptyMap(), 120000000L), + TIMESTAMP_FIELD, + TimeValue.timeValueMinutes(10), + TimeValue.timeValueMinutes(5), + tuple(120000000L, 123000000L) + ); + } + + private void testSourceHasChanged(long totalHits, + boolean expectedHasChangedValue, + TransformCheckpoint lastCheckpoint, + String dateHistogramField, + TimeValue dateHistogramInterval, + TimeValue delay, + Tuple expectedRangeQueryBounds) throws InterruptedException { + doAnswer(withResponse(newSearchResponse(totalHits))).when(client).execute(eq(SearchAction.INSTANCE), any(), any()); + String transformId = getTestName(); + TransformConfig transformConfig = + newTransformConfigWithDateHistogram(transformId, dateHistogramField, dateHistogramInterval, delay); + TimeBasedCheckpointProvider provider = newCheckpointProvider(transformConfig); + + SetOnce hasChangedHolder = new SetOnce<>(); + SetOnce exceptionHolder = new SetOnce<>(); + CountDownLatch latch = new CountDownLatch(1); + provider.sourceHasChanged( + lastCheckpoint, + new LatchedActionListener<>(ActionListener.wrap(hasChangedHolder::set, exceptionHolder::set), latch) + ); + assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true)); + + ArgumentCaptor searchRequestArgumentCaptor = ArgumentCaptor.forClass(SearchRequest.class); + verify(client).execute(eq(SearchAction.INSTANCE), searchRequestArgumentCaptor.capture(), any()); + SearchRequest searchRequest = searchRequestArgumentCaptor.getValue(); + BoolQueryBuilder boolQuery = (BoolQueryBuilder) searchRequest.source().query(); + RangeQueryBuilder rangeQuery = (RangeQueryBuilder) boolQuery.filter().get(1); + assertThat(rangeQuery.from(), is(equalTo(expectedRangeQueryBounds.v1()))); + assertThat(rangeQuery.to(), is(equalTo(expectedRangeQueryBounds.v2()))); + + assertThat(hasChangedHolder.get(), is(equalTo(expectedHasChangedValue))); + assertThat(exceptionHolder.get(), is(nullValue())); + } + + public void testCreateNextCheckpoint_NoDelay() throws InterruptedException { + String transformId = getTestName(); + testCreateNextCheckpoint( + transformId, + TIMESTAMP_FIELD, + TimeValue.timeValueMinutes(10), + TimeValue.ZERO, + new TransformCheckpoint(transformId, 100000000L, 7, emptyMap(), 120000000L), + new TransformCheckpoint(transformId, 123456789L, 8, emptyMap(), 123000000L)); + } + + public void testCreateNextCheckpoint_SmallDelay() throws InterruptedException { + String transformId = getTestName(); + testCreateNextCheckpoint( + transformId, + TIMESTAMP_FIELD, + TimeValue.timeValueMinutes(10), + TimeValue.timeValueMinutes(5), + new TransformCheckpoint(transformId, 100000000L, 7, emptyMap(), 120000000L), + new TransformCheckpoint(transformId, 123456789L, 8, emptyMap(), 123000000L)); + } + + public void testCreateNextCheckpoint_BigDelay() throws InterruptedException { + String transformId = getTestName(); + testCreateNextCheckpoint( + transformId, + TIMESTAMP_FIELD, + TimeValue.timeValueMinutes(10), + TimeValue.timeValueMinutes(10), + new TransformCheckpoint(transformId, 100000000L, 7, emptyMap(), 120000000L), + new TransformCheckpoint(transformId, 123456789L, 8, emptyMap(), 122400000L)); + } + + private void testCreateNextCheckpoint(String transformId, + String dateHistogramField, + TimeValue dateHistogramInterval, + TimeValue delay, + TransformCheckpoint lastCheckpoint, + TransformCheckpoint expectedNextCheckpoint) throws InterruptedException { + GetIndexResponse getIndexResponse = + new GetIndexResponse( + new String[] { "some-index" }, + ImmutableOpenMap.of(), + ImmutableOpenMap.of(), + ImmutableOpenMap.of(), + ImmutableOpenMap.of(), + ImmutableOpenMap.of()); + doAnswer(withResponse(getIndexResponse)).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any()); + IndicesStatsResponse indicesStatsResponse = mock(IndicesStatsResponse.class); + when(indicesStatsResponse.getShards()).thenReturn(new ShardStats[0]); + when(indicesStatsResponse.getFailedShards()).thenReturn(0); + doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); + + TransformConfig transformConfig = + newTransformConfigWithDateHistogram(transformId, dateHistogramField, dateHistogramInterval, delay); + TimeBasedCheckpointProvider provider = newCheckpointProvider(transformConfig); + + SetOnce checkpointHolder = new SetOnce<>(); + SetOnce exceptionHolder = new SetOnce<>(); + CountDownLatch latch = new CountDownLatch(1); + provider.createNextCheckpoint( + lastCheckpoint, + new LatchedActionListener<>(ActionListener.wrap(checkpointHolder::set, exceptionHolder::set), latch) + ); + assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true)); + assertThat(checkpointHolder.get(), is(equalTo(expectedNextCheckpoint))); + assertThat(exceptionHolder.get(), is(nullValue())); + } + + private TimeBasedCheckpointProvider newCheckpointProvider(TransformConfig transformConfig) { + return new TimeBasedCheckpointProvider( + clock, + client, + new RemoteClusterResolver(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), + transformConfigManager, + transformAuditor, + transformConfig + ); + } + + private static TransformConfig newTransformConfigWithDateHistogram(String transformId, + String dateHistogramField, + TimeValue dateHistogramInterval, + TimeValue delay) { + DateHistogramGroupSource dateHistogramGroupSource = new DateHistogramGroupSource( + dateHistogramField, + null, + false, + new DateHistogramGroupSource.FixedInterval(new DateHistogramInterval(dateHistogramInterval.getStringRep())), + null + ); + Supplier singleGroupSourceSupplier = + new Supplier<>() { + int groupCount = 0; + @Override + public SingleGroupSource get() { + return ++groupCount == 1 + ? dateHistogramGroupSource + : GroupConfigTests.randomSingleGroupSource(Version.CURRENT); + } + }; + PivotConfig pivotConfigWithDateHistogramSource = + new PivotConfig( + GroupConfigTests.randomGroupConfig(singleGroupSourceSupplier), + AggregationConfigTests.randomAggregationConfig(), + null // deprecated + ); + return new TransformConfig.Builder(TransformConfigTests.randomTransformConfig(transformId)) + .setSettings(new SettingsConfig.Builder().setInterimResults(false).build()) + .setPivotConfig(pivotConfigWithDateHistogramSource) + .setSyncConfig(new TimeSyncConfig(TIMESTAMP_FIELD, delay)) + .build(); + } + + private static SearchResponse newSearchResponse(long totalHits) { + return new SearchResponse( + new SearchResponseSections( + new SearchHits(SearchHits.EMPTY, new TotalHits(totalHits, TotalHits.Relation.EQUAL_TO), 0), + null, + null, + false, + false, + null, + 0 + ), + null, + 1, + 1, + 0, + 0, + ShardSearchFailure.EMPTY_ARRAY, + null + ); + } + + @SuppressWarnings("unchecked") + private static Answer withResponse(Response response) { + return invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(response); + return null; + }; + } +} diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index c1902b9631c01..9c32e6a5a3f07 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -309,7 +309,7 @@ public void testPageSizeAdapt() throws Exception { randomPivotConfig(), null, randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), - new SettingsConfig(pageSize, null, (Boolean) null), + new SettingsConfig(pageSize, null, (Boolean) null, null), null, null, null @@ -383,7 +383,7 @@ public void testDoProcessAggNullCheck() { randomPivotConfig(), null, randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), - new SettingsConfig(pageSize, null, (Boolean) null), + new SettingsConfig(pageSize, null, (Boolean) null, null), null, null, null @@ -446,7 +446,7 @@ public void testScriptError() throws Exception { randomPivotConfig(), null, randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), - new SettingsConfig(pageSize, null, (Boolean) null), + new SettingsConfig(pageSize, null, (Boolean) null, null), null, null, null diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java index 65d846323b06b..3091c4f62c089 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java @@ -505,7 +505,7 @@ public void testStopAtCheckpointForThrottledTransform() throws Exception { randomPivotConfig(), null, randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), - new SettingsConfig(null, Float.valueOf(1.0f), (Boolean) null), + new SettingsConfig(null, Float.valueOf(1.0f), (Boolean) null, (Boolean) null), null, null, null diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java index 06a1a0baba6eb..1df06e3329440 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager; import org.elasticsearch.xpack.transform.persistence.TransformInternalIndexTests; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -401,6 +402,7 @@ public TransformPersistentTasksExecutor buildTaskExecutor() { TransformAuditor mockAuditor = mock(TransformAuditor.class); IndexBasedTransformConfigManager transformsConfigManager = new IndexBasedTransformConfigManager(client, xContentRegistry()); TransformCheckpointService transformCheckpointService = new TransformCheckpointService( + Clock.systemUTC(), Settings.EMPTY, new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null), transformsConfigManager, diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index db8d3338a75a7..d6a1bc8d1efa9 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -39,6 +39,7 @@ import org.junit.After; import org.junit.Before; +import java.time.Clock; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; @@ -76,6 +77,7 @@ public void testStopOnFailedTaskWithStoppedIndexer() { TransformAuditor auditor = MockTransformAuditor.createMockAuditor(); TransformConfigManager transformsConfigManager = new InMemoryTransformConfigManager(); TransformCheckpointService transformsCheckpointService = new TransformCheckpointService( + Clock.systemUTC(), Settings.EMPTY, new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null), transformsConfigManager,