From 3dbefa6e3ae33963b4797dcd8c5028967c1c8957 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 9 Jul 2019 09:08:17 +0100 Subject: [PATCH 1/2] [ML-DataFrame] Add a frequency option to transform config, default 1m Previously a data frame transform would check whether the source index was changed every 10 seconds. Sometimes it may be desirable for the check to be done less frequently. This commit increases the default to 60 seconds but also allows the frequency to be overridden by a setting in the data frame transform config. --- .../transforms/DataFrameTransformConfig.java | 39 +++++++++--- .../DataFrameTransformConfigTests.java | 2 + .../DataFrameTransformDocumentationIT.java | 5 +- .../dataframe/put_data_frame.asciidoc | 5 +- .../data-frames/apis/put-transform.asciidoc | 6 ++ .../xpack/core/dataframe/DataFrameField.java | 1 + .../transforms/DataFrameTransform.java | 37 ++++++++--- .../transforms/DataFrameTransformConfig.java | 62 +++++++++++++++---- ...wDataFrameTransformActionRequestTests.java | 1 + .../DataFrameTransformConfigTests.java | 17 ++--- .../transforms/DataFrameTransformTests.java | 4 +- .../integration/DataFrameIntegTestCase.java | 1 + .../DataFrameGetAndGetStatsIT.java | 1 + .../integration/DataFramePivotRestIT.java | 1 + .../DataFrameTransformProgressIT.java | 3 + ...ransportStartDataFrameTransformAction.java | 6 +- .../transforms/DataFrameTransformTask.java | 8 ++- .../dataframe/action/DataFrameNodesTests.java | 4 +- .../transforms/DataFrameIndexerTests.java | 1 + ...TransformPersistentTasksExecutorTests.java | 10 +-- .../test/data_frame/transforms_crud.yml | 34 ++++++++++ 21 files changed, 196 insertions(+), 52 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java index 355e3ad9bbc0f..2810d6a8cfad2 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; @@ -44,6 +45,7 @@ public class DataFrameTransformConfig implements ToXContentObject { public static final ParseField ID = new ParseField("id"); public static final ParseField SOURCE = new ParseField("source"); public static final ParseField DEST = new ParseField("dest"); + public static final ParseField FREQUENCY = new ParseField("frequency"); public static final ParseField DESCRIPTION = new ParseField("description"); public static final ParseField SYNC = new ParseField("sync"); public static final ParseField VERSION = new ParseField("version"); @@ -54,6 +56,7 @@ public class DataFrameTransformConfig implements ToXContentObject { private final String id; private final SourceConfig source; private final DestConfig dest; + private final TimeValue frequency; private final SyncConfig syncConfig; private final PivotConfig pivotConfig; private final String description; @@ -66,14 +69,16 @@ public class DataFrameTransformConfig implements ToXContentObject { String id = (String) args[0]; SourceConfig source = (SourceConfig) args[1]; DestConfig dest = (DestConfig) args[2]; - SyncConfig syncConfig = (SyncConfig) args[3]; - PivotConfig pivotConfig = (PivotConfig) args[4]; - String description = (String)args[5]; - Instant createTime = (Instant)args[6]; - String transformVersion = (String)args[7]; + TimeValue frequency = (TimeValue) args[3]; + SyncConfig syncConfig = (SyncConfig) args[4]; + PivotConfig pivotConfig = (PivotConfig) args[5]; + String description = (String)args[6]; + Instant createTime = (Instant)args[7]; + String transformVersion = (String)args[8]; return new DataFrameTransformConfig(id, source, dest, + frequency, syncConfig, pivotConfig, description, @@ -85,6 +90,8 @@ public class DataFrameTransformConfig implements ToXContentObject { PARSER.declareString(constructorArg(), ID); PARSER.declareObject(constructorArg(), (p, c) -> SourceConfig.PARSER.apply(p, null), SOURCE); PARSER.declareObject(constructorArg(), (p, c) -> DestConfig.PARSER.apply(p, null), DEST); + PARSER.declareField(optionalConstructorArg(), p -> TimeValue.parseTimeValue(p.text(), FREQUENCY.getPreferredName()), + FREQUENCY, ObjectParser.ValueType.STRING); PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p), SYNC); PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM); PARSER.declareString(optionalConstructorArg(), DESCRIPTION); @@ -118,12 +125,13 @@ public static DataFrameTransformConfig fromXContent(final XContentParser parser) * @return A DataFrameTransformConfig to preview, NOTE it will have a {@code null} id, destination and index. */ public static DataFrameTransformConfig forPreview(final SourceConfig source, final PivotConfig pivotConfig) { - return new DataFrameTransformConfig(null, source, null, null, pivotConfig, null, null, null); + return new DataFrameTransformConfig(null, source, null, null, null, pivotConfig, null, null, null); } DataFrameTransformConfig(final String id, final SourceConfig source, final DestConfig dest, + final TimeValue frequency, final SyncConfig syncConfig, final PivotConfig pivotConfig, final String description, @@ -132,6 +140,7 @@ public static DataFrameTransformConfig forPreview(final SourceConfig source, fin this.id = id; this.source = source; this.dest = dest; + this.frequency = frequency; this.syncConfig = syncConfig; this.pivotConfig = pivotConfig; this.description = description; @@ -151,6 +160,10 @@ public DestConfig getDestination() { return dest; } + public TimeValue getFrequency() { + return frequency; + } + public SyncConfig getSyncConfig() { return syncConfig; } @@ -184,6 +197,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa if (dest != null) { builder.field(DEST.getPreferredName(), dest); } + if (frequency != null) { + builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep()); + } if (syncConfig != null) { builder.startObject(SYNC.getPreferredName()); builder.field(syncConfig.getName(), syncConfig); @@ -220,6 +236,7 @@ public boolean equals(Object other) { return Objects.equals(this.id, that.id) && Objects.equals(this.source, that.source) && Objects.equals(this.dest, that.dest) + && Objects.equals(this.frequency, that.frequency) && Objects.equals(this.description, that.description) && Objects.equals(this.syncConfig, that.syncConfig) && Objects.equals(this.transformVersion, that.transformVersion) @@ -229,7 +246,7 @@ public boolean equals(Object other) { @Override public int hashCode() { - return Objects.hash(id, source, dest, syncConfig, pivotConfig, description); + return Objects.hash(id, source, dest, frequency, syncConfig, pivotConfig, description); } @Override @@ -246,6 +263,7 @@ public static class Builder { private String id; private SourceConfig source; private DestConfig dest; + private TimeValue frequency; private SyncConfig syncConfig; private PivotConfig pivotConfig; private String description; @@ -265,6 +283,11 @@ public Builder setDest(DestConfig dest) { return this; } + public Builder setFrequency(TimeValue frequency) { + this.frequency = frequency; + return this; + } + public Builder setSyncConfig(SyncConfig syncConfig) { this.syncConfig = syncConfig; return this; @@ -281,7 +304,7 @@ public Builder setDescription(String description) { } public DataFrameTransformConfig build() { - return new DataFrameTransformConfig(id, source, dest, syncConfig, pivotConfig, description, null, null); + return new DataFrameTransformConfig(id, source, dest, frequency, syncConfig, pivotConfig, description, null, null); } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java index 79b7e85098e04..7a42a6f70d950 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfigTests; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.SearchModule; @@ -43,6 +44,7 @@ public static DataFrameTransformConfig randomDataFrameTransformConfig() { return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomSourceConfig(), randomDestConfig(), + randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1000, 1000000)), randomBoolean() ? null : randomSyncConfig(), PivotConfigTests.randomPivotConfig(), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 100), diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java index 731d42f902c50..048f383ce3a4f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java @@ -156,8 +156,9 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException .setId("reviewer-avg-rating") // <1> .setSource(sourceConfig) // <2> .setDest(destConfig) // <3> - .setPivotConfig(pivotConfig) // <4> - .setDescription("This is my test transform") // <5> + .setFrequency(TimeValue.timeValueSeconds(15)) // <4> + .setPivotConfig(pivotConfig) // <5> + .setDescription("This is my test transform") // <6> .build(); // end::put-data-frame-transform-config diff --git a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc index 19c7fe443dbcd..3ba16a987f98c 100644 --- a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc +++ b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc @@ -34,8 +34,9 @@ include-tagged::{doc-tests-file}[{api}-config] <1> The {dataframe-transform} ID <2> The source indices and query from which to gather data <3> The destination index and optional pipeline -<4> The PivotConfig -<5> Optional free text description of the transform +<4> How often to check for updates to the source indices +<5> The PivotConfig +<6> Optional free text description of the transform [id="{upid}-{api}-query-config"] diff --git a/docs/reference/data-frames/apis/put-transform.asciidoc b/docs/reference/data-frames/apis/put-transform.asciidoc index abc5779e12a88..d8fe652639d62 100644 --- a/docs/reference/data-frames/apis/put-transform.asciidoc +++ b/docs/reference/data-frames/apis/put-transform.asciidoc @@ -60,6 +60,11 @@ IMPORTANT: You must use {kib} or this API to create a {dataframe-transform}. (object) The destination configuration, which consists of `index` and optionally a `pipeline` id. See <>. +`frequency` (Optional):: + (time units) The interval between checks for changes in the source indices + when the {dataframe-transform} is running continuously. Defaults to `1m`. + The lowest permitted value is `1s`; the highest `1h`. + `pivot` (Optional):: (object) Defines the pivot function `group by` fields and the aggregation to reduce the data. See <>. @@ -90,6 +95,7 @@ PUT _data_frame/transforms/ecommerce_transform "index": "kibana_sample_data_ecommerce_transform", "pipeline": "add_timestamp_pipeline" }, + "frequency": "5m", "pivot": { "group_by": { "customer_id": { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java index 2dedef82eb3cf..9d5db1e5022e3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java @@ -26,6 +26,7 @@ public final class DataFrameField { public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type"); public static final ParseField SOURCE = new ParseField("source"); public static final ParseField DESTINATION = new ParseField("dest"); + public static final ParseField FREQUENCY = new ParseField("frequency"); public static final ParseField FORCE = new ParseField("force"); public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size"); public static final ParseField FIELD = new ParseField("field"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java index e620e4f859543..8ee56269c3cda 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -24,25 +25,30 @@ public class DataFrameTransform extends AbstractDiffable imp public static final String NAME = DataFrameField.TASK_NAME; public static final ParseField VERSION = new ParseField(DataFrameField.VERSION); + public static final ParseField FREQUENCY = DataFrameField.FREQUENCY; private final String transformId; private final Version version; + private final TimeValue frequency; - public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, - a -> new DataFrameTransform((String) a[0], (String) a[1])); + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, + a -> new DataFrameTransform((String) a[0], (String) a[1], (String) a[2])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameField.ID); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), VERSION); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FREQUENCY); } - private DataFrameTransform(String transformId, String version) { - this(transformId, version == null ? null : Version.fromString(version)); + private DataFrameTransform(String transformId, String version, String frequency) { + this(transformId, version == null ? null : Version.fromString(version), + frequency == null ? null : TimeValue.parseTimeValue(frequency, FREQUENCY.getPreferredName())); } - public DataFrameTransform(String transformId, Version version) { + public DataFrameTransform(String transformId, Version version, TimeValue frequency) { this.transformId = transformId; this.version = version == null ? Version.V_7_2_0 : version; + this.frequency = frequency; } public DataFrameTransform(StreamInput in) throws IOException { @@ -52,6 +58,11 @@ public DataFrameTransform(StreamInput in) throws IOException { } else { this.version = Version.V_7_2_0; } + if (in.getVersion().onOrAfter(Version.CURRENT)) { + this.frequency = in.readOptionalTimeValue(); + } else { + this.frequency = null; + } } @Override @@ -70,6 +81,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_3_0)) { Version.writeVersion(version, out); } + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalTimeValue(frequency); + } } @Override @@ -77,6 +91,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field(DataFrameField.ID.getPreferredName(), transformId); builder.field(VERSION.getPreferredName(), version); + if (frequency != null) { + builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep()); + } builder.endObject(); return builder; } @@ -89,6 +106,10 @@ public Version getVersion() { return version; } + public TimeValue getFrequency() { + return frequency; + } + public static DataFrameTransform fromXContent(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } @@ -105,11 +126,13 @@ public boolean equals(Object other) { DataFrameTransform that = (DataFrameTransform) other; - return Objects.equals(this.transformId, that.transformId) && Objects.equals(this.version, that.version); + return Objects.equals(this.transformId, that.transformId) + && Objects.equals(this.version, that.version) + && Objects.equals(this.frequency, that.frequency); } @Override public int hashCode() { - return Objects.hash(transformId, version); + return Objects.hash(transformId, version, frequency); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java index e3ad50d9b889e..5de93540dc116 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; @@ -52,10 +53,13 @@ public class DataFrameTransformConfig extends AbstractDiffable STRICT_PARSER = createParser(false); private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); private static final int MAX_DESCRIPTION_LENGTH = 1_000; + private static final TimeValue MIN_FREQUENCY = TimeValue.timeValueSeconds(1); + private static final TimeValue MAX_FREQUENCY = TimeValue.timeValueHours(1); private final String id; private final SourceConfig source; private final DestConfig dest; + private final TimeValue frequency; private final SyncConfig syncConfig; private final String description; // headers store the user context from the creating user, which allows us to run the transform as this user @@ -88,35 +92,50 @@ private static ConstructingObjectParser create SourceConfig source = (SourceConfig) args[1]; DestConfig dest = (DestConfig) args[2]; - SyncConfig syncConfig = (SyncConfig) args[3]; - // ignored, only for internal storage: String docType = (String) args[4]; + TimeValue frequency = + args[3] == null ? null : TimeValue.parseTimeValue((String) args[3], DataFrameField.FREQUENCY.getPreferredName()); + if (lenient == false && frequency != null) { + if (frequency.compareTo(MIN_FREQUENCY) < 0) { + throw new IllegalArgumentException( + "minimum permitted [" + DataFrameField.FREQUENCY + "] is [" + MIN_FREQUENCY + "]"); + } + if (frequency.compareTo(MAX_FREQUENCY) > 0) { + throw new IllegalArgumentException( + "highest permitted [" + DataFrameField.FREQUENCY + "] is [" + MAX_FREQUENCY + "]"); + } + } + + SyncConfig syncConfig = (SyncConfig) args[4]; + // ignored, only for internal storage: String docType = (String) args[5]; // on strict parsing do not allow injection of headers, transform version, or create time if (lenient == false) { - validateStrictParsingParams(args[5], HEADERS.getPreferredName()); - validateStrictParsingParams(args[8], CREATE_TIME.getPreferredName()); - validateStrictParsingParams(args[9], VERSION.getPreferredName()); + validateStrictParsingParams(args[6], HEADERS.getPreferredName()); + validateStrictParsingParams(args[9], CREATE_TIME.getPreferredName()); + validateStrictParsingParams(args[10], VERSION.getPreferredName()); } @SuppressWarnings("unchecked") - Map headers = (Map) args[5]; + Map headers = (Map) args[6]; - PivotConfig pivotConfig = (PivotConfig) args[6]; - String description = (String)args[7]; + PivotConfig pivotConfig = (PivotConfig) args[7]; + String description = (String)args[8]; return new DataFrameTransformConfig(id, source, dest, + frequency, syncConfig, headers, pivotConfig, description, - (Instant)args[8], - (String)args[9]); + (Instant)args[9], + (String)args[10]); }); parser.declareString(optionalConstructorArg(), DataFrameField.ID); parser.declareObject(constructorArg(), (p, c) -> SourceConfig.fromXContent(p, lenient), DataFrameField.SOURCE); parser.declareObject(constructorArg(), (p, c) -> DestConfig.fromXContent(p, lenient), DataFrameField.DESTINATION); + parser.declareString(optionalConstructorArg(), DataFrameField.FREQUENCY); parser.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p, lenient), DataFrameField.SYNC); @@ -146,6 +165,7 @@ public static String documentId(String transformId) { DataFrameTransformConfig(final String id, final SourceConfig source, final DestConfig dest, + final TimeValue frequency, final SyncConfig syncConfig, final Map headers, final PivotConfig pivotConfig, @@ -155,6 +175,7 @@ public static String documentId(String transformId) { this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName()); this.source = ExceptionsHelper.requireNonNull(source, DataFrameField.SOURCE.getPreferredName()); this.dest = ExceptionsHelper.requireNonNull(dest, DataFrameField.DESTINATION.getPreferredName()); + this.frequency = frequency; this.syncConfig = syncConfig; this.setHeaders(headers == null ? Collections.emptyMap() : headers); this.pivotConfig = pivotConfig; @@ -174,17 +195,23 @@ public static String documentId(String transformId) { public DataFrameTransformConfig(final String id, final SourceConfig source, final DestConfig dest, + final TimeValue frequency, final SyncConfig syncConfig, final Map headers, final PivotConfig pivotConfig, final String description) { - this(id, source, dest, syncConfig, headers, pivotConfig, description, null, null); + this(id, source, dest, frequency, syncConfig, headers, pivotConfig, description, null, null); } public DataFrameTransformConfig(final StreamInput in) throws IOException { id = in.readString(); source = new SourceConfig(in); dest = new DestConfig(in); + if (in.getVersion().onOrAfter(Version.CURRENT)) { + frequency = in.readOptionalTimeValue(); + } else { + frequency = null; + } setHeaders(in.readMap(StreamInput::readString, StreamInput::readString)); pivotConfig = in.readOptionalWriteable(PivotConfig::new); description = in.readOptionalString(); @@ -211,6 +238,10 @@ public DestConfig getDestination() { return dest; } + public TimeValue getFrequency() { + return frequency; + } + public SyncConfig getSyncConfig() { return syncConfig; } @@ -269,6 +300,9 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeString(id); source.writeTo(out); dest.writeTo(out); + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalTimeValue(frequency); + } out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); out.writeOptionalWriteable(pivotConfig); out.writeOptionalString(description); @@ -290,6 +324,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa builder.field(DataFrameField.ID.getPreferredName(), id); builder.field(DataFrameField.SOURCE.getPreferredName(), source); builder.field(DataFrameField.DESTINATION.getPreferredName(), dest); + if (frequency != null) { + builder.field(DataFrameField.FREQUENCY.getPreferredName(), frequency.getStringRep()); + } if (syncConfig != null) { builder.startObject(DataFrameField.SYNC.getPreferredName()); builder.field(syncConfig.getWriteableName(), syncConfig); @@ -332,6 +369,7 @@ public boolean equals(Object other) { return Objects.equals(this.id, that.id) && Objects.equals(this.source, that.source) && Objects.equals(this.dest, that.dest) + && Objects.equals(this.frequency, that.frequency) && Objects.equals(this.syncConfig, that.syncConfig) && Objects.equals(this.headers, that.headers) && Objects.equals(this.pivotConfig, that.pivotConfig) @@ -342,7 +380,7 @@ public boolean equals(Object other) { @Override public int hashCode(){ - return Objects.hash(id, source, dest, syncConfig, headers, pivotConfig, description, createTime, transformVersion); + return Objects.hash(id, source, dest, frequency, syncConfig, headers, pivotConfig, description, createTime, transformVersion); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java index ea6f2a47f4692..5eaf955249214 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java @@ -44,6 +44,7 @@ protected Request createTestInstance() { "transform-preview", randomSourceConfig(), new DestConfig("unused-transform-preview-index", null), + null, randomBoolean() ? DataFrameTransformConfigTests.randomSyncConfig() : null, null, PivotConfigTests.randomPivotConfig(), diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java index dd5b5c9ff8841..849ff1629c755 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.Version; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -46,6 +47,7 @@ public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHead return new DataFrameTransformConfig(id, randomSourceConfig(), randomDestConfig(), + randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)), randomBoolean() ? null : randomSyncConfig(), null, PivotConfigTests.randomPivotConfig(), @@ -58,6 +60,7 @@ public static DataFrameTransformConfig randomDataFrameTransformConfig(String id) return new DataFrameTransformConfig(id, randomSourceConfig(), randomDestConfig(), + randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)), randomBoolean() ? null : randomSyncConfig(), randomHeaders(), PivotConfigTests.randomPivotConfig(), @@ -69,11 +72,11 @@ public static DataFrameTransformConfig randomDataFrameTransformConfig(String id) public static DataFrameTransformConfig randomInvalidDataFrameTransformConfig() { if (randomBoolean()) { return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomInvalidSourceConfig(), randomDestConfig(), - randomBoolean() ? randomSyncConfig() : null, randomHeaders(), PivotConfigTests.randomPivotConfig(), + null, randomBoolean() ? randomSyncConfig() : null, randomHeaders(), PivotConfigTests.randomPivotConfig(), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)); } // else return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomSourceConfig(), randomDestConfig(), - randomBoolean() ? randomSyncConfig() : null, randomHeaders(), PivotConfigTests.randomInvalidPivotConfig(), + null, randomBoolean() ? randomSyncConfig() : null, randomHeaders(), PivotConfigTests.randomInvalidPivotConfig(), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)); } @@ -146,7 +149,7 @@ public void testDefaultMatchAll() throws IOException { } } - public void testPreventHeaderInjection() throws IOException { + public void testPreventHeaderInjection() { String pivotTransform = "{" + " \"headers\" : {\"key\" : \"value\" }," + " \"source\" : {\"index\":\"src\"}," @@ -167,7 +170,7 @@ public void testPreventHeaderInjection() throws IOException { () -> createDataFrameTransformConfigFromString(pivotTransform, "test_header_injection")); } - public void testPreventCreateTimeInjection() throws IOException { + public void testPreventCreateTimeInjection() { String pivotTransform = "{" + " \"create_time\" : " + Instant.now().toEpochMilli() + " }," + " \"source\" : {\"index\":\"src\"}," @@ -188,7 +191,7 @@ public void testPreventCreateTimeInjection() throws IOException { () -> createDataFrameTransformConfigFromString(pivotTransform, "test_createTime_injection")); } - public void testPreventVersionInjection() throws IOException { + public void testPreventVersionInjection() { String pivotTransform = "{" + " \"version\" : \"7.3.0\"," + " \"source\" : {\"index\":\"src\"}," @@ -229,11 +232,11 @@ public void testXContentForInternalStorage() throws IOException { public void testMaxLengthDescription() { IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformConfig("id", - randomSourceConfig(), randomDestConfig(), null, null, PivotConfigTests.randomPivotConfig(), randomAlphaOfLength(1001))); + randomSourceConfig(), randomDestConfig(), null, null, null, PivotConfigTests.randomPivotConfig(), randomAlphaOfLength(1001))); assertThat(exception.getMessage(), equalTo("[description] must be less than 1000 characters in length.")); String description = randomAlphaOfLength(1000); DataFrameTransformConfig config = new DataFrameTransformConfig("id", - randomSourceConfig(), randomDestConfig(), null, null, PivotConfigTests.randomPivotConfig(), description); + randomSourceConfig(), randomDestConfig(), null, null, null, PivotConfigTests.randomPivotConfig(), description); assertThat(description, equalTo(config.getDescription())); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTests.java index d7463a6df7139..ce830240c63f9 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; @@ -25,7 +26,8 @@ protected DataFrameTransform doParseInstance(XContentParser parser) throws IOExc @Override protected DataFrameTransform createTestInstance() { - return new DataFrameTransform(randomAlphaOfLength(10), randomBoolean() ? null : Version.CURRENT); + return new DataFrameTransform(randomAlphaOfLength(10), randomBoolean() ? null : Version.CURRENT, + randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000))); } @Override diff --git a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java index a1d8aca86ba16..09b34f7ed7b20 100644 --- a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java +++ b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java @@ -210,6 +210,7 @@ protected DataFrameTransformConfig.Builder createTransformConfigBuilder(String i .setId(id) .setSource(SourceConfig.builder().setIndex(sourceIndices).setQueryConfig(createQueryConfig(queryBuilder)).build()) .setDest(DestConfig.builder().setIndex(destinationIndex).build()) + .setFrequency(TimeValue.timeValueSeconds(10)) .setPivotConfig(createPivotConfig(groups, aggregations)) .setDescription("Test data frame transform config id: " + id); } diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java index a45fee6d9666f..fd6d21db045b5 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java @@ -214,6 +214,7 @@ public void testGetProgressResetWithContinuous() throws Exception { final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId, null); String config = "{ \"dest\": {\"index\":\"" + transformDest + "\"}," + " \"source\": {\"index\":\"" + transformSrc + "\"}," + + " \"frequency\": \"1s\"," + " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"1s\"}}," + " \"pivot\": {" + " \"group_by\": {" diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index f2cd95ed1a9c7..4fb8ea6fafdd6 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -141,6 +141,7 @@ public void testContinuousPivot() throws Exception { String config = "{" + " \"source\": {\"index\":\"" + indexName + "\"}," + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"}," + + " \"frequency\": \"1s\"," + " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}}," + " \"pivot\": {" + " \"group_by\": {" diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java index 002126e5b60e8..5c1efb48875e7 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java @@ -131,6 +131,7 @@ public void testGetProgress() throws Exception { destConfig, null, null, + null, pivotConfig, null); @@ -155,6 +156,7 @@ public void testGetProgress() throws Exception { destConfig, null, null, + null, pivotConfig, null); @@ -174,6 +176,7 @@ public void testGetProgress() throws Exception { destConfig, null, null, + null, pivotConfig, null); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java index 4b2a11a7d0258..21f53080ad596 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java @@ -186,7 +186,7 @@ protected void masterOperation(Task ignoredTask, StartDataFrameTransformAction.R return; } - transformTaskHolder.set(createDataFrameTransform(config.getId(), config.getVersion())); + transformTaskHolder.set(createDataFrameTransform(config.getId(), config.getVersion(), config.getFrequency())); final String destinationIndex = config.getDestination().getIndex(); String[] dest = indexNameExpressionResolver.concreteIndexNames(state, IndicesOptions.lenientExpandOpen(), @@ -255,8 +255,8 @@ protected ClusterBlockException checkBlock(StartDataFrameTransformAction.Request return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); } - private static DataFrameTransform createDataFrameTransform(String transformId, Version transformVersion) { - return new DataFrameTransform(transformId, transformVersion); + private static DataFrameTransform createDataFrameTransform(String transformId, Version transformVersion, TimeValue frequency) { + return new DataFrameTransform(transformId, transformVersion, frequency); } @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index ea67da79620c4..57cb468fdd830 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -59,8 +60,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener { - // interval the scheduler sends an event - private static final int SCHEDULER_NEXT_MILLISECONDS = 10000; + // Default interval the scheduler sends an event if the config does not specify a frequency + private static final long SCHEDULER_NEXT_MILLISECONDS = 60000; private static final Logger logger = LogManager.getLogger(DataFrameTransformTask.class); // TODO consider moving to dynamic cluster setting private static final int MAX_CONTINUOUS_FAILURES = 10; @@ -363,7 +364,8 @@ private String schedulerJobName() { private SchedulerEngine.Schedule next() { return (startTime, now) -> { - return now + SCHEDULER_NEXT_MILLISECONDS; + TimeValue frequency = transform.getFrequency(); + return now + (frequency == null ? SCHEDULER_NEXT_MILLISECONDS : frequency.getMillis()); }; } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodesTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodesTests.java index 276d3af39ef13..d13c33fe9aabf 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodesTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodesTests.java @@ -31,10 +31,10 @@ public void testDataframeNodes() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); tasksBuilder.addTask(dataFrameIdFoo, - DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdFoo, Version.CURRENT), + DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdFoo, Version.CURRENT, null), new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); tasksBuilder.addTask(dataFrameIdBar, - DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdBar, Version.CURRENT), + DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdBar, Version.CURRENT, null), new PersistentTasksCustomMetaData.Assignment("node-2", "test assignment")); tasksBuilder.addTask("test-task1", "testTasks", new PersistentTaskParams() { @Override diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java index e3e9ff81eb653..03a34b013347b 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java @@ -186,6 +186,7 @@ public void testPageSizeAdapt() throws InterruptedException { randomDestConfig(), null, null, + null, new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig(), pageSize), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java index 49cdabfdf0601..a58334242f105 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java @@ -51,15 +51,15 @@ public void testNodeVersionAssignment() { PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder() .addTask("data-frame-task-1", DataFrameTransform.NAME, - new DataFrameTransform("data-frame-task-1", Version.CURRENT), + new DataFrameTransform("data-frame-task-1", Version.CURRENT, null), new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", "")) .addTask("data-frame-task-2", DataFrameTransform.NAME, - new DataFrameTransform("data-frame-task-2", Version.CURRENT), + new DataFrameTransform("data-frame-task-2", Version.CURRENT, null), new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", "")) .addTask("data-frame-task-3", DataFrameTransform.NAME, - new DataFrameTransform("data-frame-task-3", Version.CURRENT), + new DataFrameTransform("data-frame-task-3", Version.CURRENT, null), new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", "")); PersistentTasksCustomMetaData pTasks = pTasksBuilder.build(); @@ -105,9 +105,9 @@ dataFrameTransformsCheckpointService, mock(SchedulerEngine.class), new DataFrameAuditor(client, ""), mock(ThreadPool.class)); - assertThat(executor.getAssignment(new DataFrameTransform("new-task-id", Version.CURRENT), cs).getExecutorNode(), + assertThat(executor.getAssignment(new DataFrameTransform("new-task-id", Version.CURRENT, null), cs).getExecutorNode(), equalTo("current-data-node-with-1-tasks")); - assertThat(executor.getAssignment(new DataFrameTransform("new-old-task-id", Version.V_7_2_0), cs).getExecutorNode(), + assertThat(executor.getAssignment(new DataFrameTransform("new-old-task-id", Version.V_7_2_0, null), cs).getExecutorNode(), equalTo("past-data-node-1")); } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml index bfde8128b491c..4d207d2750a76 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml @@ -42,6 +42,40 @@ setup: data_frame.delete_data_frame_transform: transform_id: "missing transform" +--- +"Test put transform with frequency too low": + - do: + catch: /minimum permitted \[frequency\] is \[1s\]/ + data_frame.put_data_frame_transform: + transform_id: "frequency-too-low" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-dest" }, + "frequency": "999ms", + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + +--- +"Test put transform with frequency too high": + - do: + catch: /highest permitted \[frequency\] is \[1h\]/ + data_frame.put_data_frame_transform: + transform_id: "frequency-too-low" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-dest" }, + "frequency": "3600001ms", + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + --- "Test put transform with invalid source index": - do: From 35b32c2351e5347cfe53bec2d3f047bd7837ab79 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 9 Jul 2019 17:33:57 +0100 Subject: [PATCH 2/2] Move validation to validate method --- .../action/PutDataFrameTransformAction.java | 17 +++++++++++++++++ .../transforms/DataFrameTransformConfig.java | 12 ------------ 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java index 00873d7630754..b58c1154a597f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -34,6 +35,9 @@ public class PutDataFrameTransformAction extends ActionType 0) { + validationException = addValidationError( + "highest permitted [" + DataFrameField.FREQUENCY + "] is [" + MAX_FREQUENCY.getStringRep() + "]", + validationException); + } + } + return validationException; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java index 5de93540dc116..af02125f6cbed 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java @@ -53,8 +53,6 @@ public class DataFrameTransformConfig extends AbstractDiffable STRICT_PARSER = createParser(false); private static final ConstructingObjectParser LENIENT_PARSER = createParser(true); private static final int MAX_DESCRIPTION_LENGTH = 1_000; - private static final TimeValue MIN_FREQUENCY = TimeValue.timeValueSeconds(1); - private static final TimeValue MAX_FREQUENCY = TimeValue.timeValueHours(1); private final String id; private final SourceConfig source; @@ -94,16 +92,6 @@ private static ConstructingObjectParser create TimeValue frequency = args[3] == null ? null : TimeValue.parseTimeValue((String) args[3], DataFrameField.FREQUENCY.getPreferredName()); - if (lenient == false && frequency != null) { - if (frequency.compareTo(MIN_FREQUENCY) < 0) { - throw new IllegalArgumentException( - "minimum permitted [" + DataFrameField.FREQUENCY + "] is [" + MIN_FREQUENCY + "]"); - } - if (frequency.compareTo(MAX_FREQUENCY) > 0) { - throw new IllegalArgumentException( - "highest permitted [" + DataFrameField.FREQUENCY + "] is [" + MAX_FREQUENCY + "]"); - } - } SyncConfig syncConfig = (SyncConfig) args[4]; // ignored, only for internal storage: String docType = (String) args[5];