diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java index 355e3ad9bbc0f..2810d6a8cfad2 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; @@ -44,6 +45,7 @@ public class DataFrameTransformConfig implements ToXContentObject { public static final ParseField ID = new ParseField("id"); public static final ParseField SOURCE = new ParseField("source"); public static final ParseField DEST = new ParseField("dest"); + public static final ParseField FREQUENCY = new ParseField("frequency"); public static final ParseField DESCRIPTION = new ParseField("description"); public static final ParseField SYNC = new ParseField("sync"); public static final ParseField VERSION = new ParseField("version"); @@ -54,6 +56,7 @@ public class DataFrameTransformConfig implements ToXContentObject { private final String id; private final SourceConfig source; private final DestConfig dest; + private final TimeValue frequency; private final SyncConfig syncConfig; private final PivotConfig pivotConfig; private final String description; @@ -66,14 +69,16 @@ public class DataFrameTransformConfig implements ToXContentObject { String id = (String) args[0]; SourceConfig source = (SourceConfig) args[1]; DestConfig dest = (DestConfig) args[2]; - SyncConfig syncConfig = (SyncConfig) args[3]; - PivotConfig pivotConfig = (PivotConfig) args[4]; - String description = (String)args[5]; - Instant createTime = (Instant)args[6]; - String transformVersion = (String)args[7]; + TimeValue frequency = (TimeValue) args[3]; + SyncConfig syncConfig = (SyncConfig) args[4]; + PivotConfig pivotConfig = (PivotConfig) args[5]; + String description = (String)args[6]; + Instant createTime = (Instant)args[7]; + String transformVersion = (String)args[8]; return new DataFrameTransformConfig(id, source, dest, + frequency, syncConfig, pivotConfig, description, @@ -85,6 +90,8 @@ public class DataFrameTransformConfig implements ToXContentObject { PARSER.declareString(constructorArg(), ID); PARSER.declareObject(constructorArg(), (p, c) -> SourceConfig.PARSER.apply(p, null), SOURCE); PARSER.declareObject(constructorArg(), (p, c) -> DestConfig.PARSER.apply(p, null), DEST); + PARSER.declareField(optionalConstructorArg(), p -> TimeValue.parseTimeValue(p.text(), FREQUENCY.getPreferredName()), + FREQUENCY, ObjectParser.ValueType.STRING); PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p), SYNC); PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM); PARSER.declareString(optionalConstructorArg(), DESCRIPTION); @@ -118,12 +125,13 @@ public static DataFrameTransformConfig fromXContent(final XContentParser parser) * @return A DataFrameTransformConfig to preview, NOTE it will have a {@code null} id, destination and index. */ public static DataFrameTransformConfig forPreview(final SourceConfig source, final PivotConfig pivotConfig) { - return new DataFrameTransformConfig(null, source, null, null, pivotConfig, null, null, null); + return new DataFrameTransformConfig(null, source, null, null, null, pivotConfig, null, null, null); } DataFrameTransformConfig(final String id, final SourceConfig source, final DestConfig dest, + final TimeValue frequency, final SyncConfig syncConfig, final PivotConfig pivotConfig, final String description, @@ -132,6 +140,7 @@ public static DataFrameTransformConfig forPreview(final SourceConfig source, fin this.id = id; this.source = source; this.dest = dest; + this.frequency = frequency; this.syncConfig = syncConfig; this.pivotConfig = pivotConfig; this.description = description; @@ -151,6 +160,10 @@ public DestConfig getDestination() { return dest; } + public TimeValue getFrequency() { + return frequency; + } + public SyncConfig getSyncConfig() { return syncConfig; } @@ -184,6 +197,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa if (dest != null) { builder.field(DEST.getPreferredName(), dest); } + if (frequency != null) { + builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep()); + } if (syncConfig != null) { builder.startObject(SYNC.getPreferredName()); builder.field(syncConfig.getName(), syncConfig); @@ -220,6 +236,7 @@ public boolean equals(Object other) { return Objects.equals(this.id, that.id) && Objects.equals(this.source, that.source) && Objects.equals(this.dest, that.dest) + && Objects.equals(this.frequency, that.frequency) && Objects.equals(this.description, that.description) && Objects.equals(this.syncConfig, that.syncConfig) && Objects.equals(this.transformVersion, that.transformVersion) @@ -229,7 +246,7 @@ public boolean equals(Object other) { @Override public int hashCode() { - return Objects.hash(id, source, dest, syncConfig, pivotConfig, description); + return Objects.hash(id, source, dest, frequency, syncConfig, pivotConfig, description); } @Override @@ -246,6 +263,7 @@ public static class Builder { private String id; private SourceConfig source; private DestConfig dest; + private TimeValue frequency; private SyncConfig syncConfig; private PivotConfig pivotConfig; private String description; @@ -265,6 +283,11 @@ public Builder setDest(DestConfig dest) { return this; } + public Builder setFrequency(TimeValue frequency) { + this.frequency = frequency; + return this; + } + public Builder setSyncConfig(SyncConfig syncConfig) { this.syncConfig = syncConfig; return this; @@ -281,7 +304,7 @@ public Builder setDescription(String description) { } public DataFrameTransformConfig build() { - return new DataFrameTransformConfig(id, source, dest, syncConfig, pivotConfig, description, null, null); + return new DataFrameTransformConfig(id, source, dest, frequency, syncConfig, pivotConfig, description, null, null); } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java index 79b7e85098e04..7a42a6f70d950 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfigTests; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.SearchModule; @@ -43,6 +44,7 @@ public static DataFrameTransformConfig randomDataFrameTransformConfig() { return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomSourceConfig(), randomDestConfig(), + randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1000, 1000000)), randomBoolean() ? null : randomSyncConfig(), PivotConfigTests.randomPivotConfig(), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 100), diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java index 731d42f902c50..048f383ce3a4f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java @@ -156,8 +156,9 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException .setId("reviewer-avg-rating") // <1> .setSource(sourceConfig) // <2> .setDest(destConfig) // <3> - .setPivotConfig(pivotConfig) // <4> - .setDescription("This is my test transform") // <5> + .setFrequency(TimeValue.timeValueSeconds(15)) // <4> + .setPivotConfig(pivotConfig) // <5> + .setDescription("This is my test transform") // <6> .build(); // end::put-data-frame-transform-config diff --git a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc index 19c7fe443dbcd..3ba16a987f98c 100644 --- a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc +++ b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc @@ -34,8 +34,9 @@ include-tagged::{doc-tests-file}[{api}-config] <1> The {dataframe-transform} ID <2> The source indices and query from which to gather data <3> The destination index and optional pipeline -<4> The PivotConfig -<5> Optional free text description of the transform +<4> How often to check for updates to the source indices +<5> The PivotConfig +<6> Optional free text description of the transform [id="{upid}-{api}-query-config"] diff --git a/docs/reference/data-frames/apis/put-transform.asciidoc b/docs/reference/data-frames/apis/put-transform.asciidoc index abc5779e12a88..d8fe652639d62 100644 --- a/docs/reference/data-frames/apis/put-transform.asciidoc +++ b/docs/reference/data-frames/apis/put-transform.asciidoc @@ -60,6 +60,11 @@ IMPORTANT: You must use {kib} or this API to create a {dataframe-transform}. (object) The destination configuration, which consists of `index` and optionally a `pipeline` id. See <>. +`frequency` (Optional):: + (time units) The interval between checks for changes in the source indices + when the {dataframe-transform} is running continuously. Defaults to `1m`. + The lowest permitted value is `1s`; the highest `1h`. + `pivot` (Optional):: (object) Defines the pivot function `group by` fields and the aggregation to reduce the data. See <>. @@ -90,6 +95,7 @@ PUT _data_frame/transforms/ecommerce_transform "index": "kibana_sample_data_ecommerce_transform", "pipeline": "add_timestamp_pipeline" }, + "frequency": "5m", "pivot": { "group_by": { "customer_id": { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java index 2dedef82eb3cf..9d5db1e5022e3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java @@ -26,6 +26,7 @@ public final class DataFrameField { public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type"); public static final ParseField SOURCE = new ParseField("source"); public static final ParseField DESTINATION = new ParseField("dest"); + public static final ParseField FREQUENCY = new ParseField("frequency"); public static final ParseField FORCE = new ParseField("force"); public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size"); public static final ParseField FIELD = new ParseField("field"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java index 00873d7630754..b58c1154a597f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -34,6 +35,9 @@ public class PutDataFrameTransformAction extends ActionType 0) { + validationException = addValidationError( + "highest permitted [" + DataFrameField.FREQUENCY + "] is [" + MAX_FREQUENCY.getStringRep() + "]", + validationException); + } + } + return validationException; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java index e620e4f859543..8ee56269c3cda 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -24,25 +25,30 @@ public class DataFrameTransform extends AbstractDiffable imp public static final String NAME = DataFrameField.TASK_NAME; public static final ParseField VERSION = new ParseField(DataFrameField.VERSION); + public static final ParseField FREQUENCY = DataFrameField.FREQUENCY; private final String transformId; private final Version version; + private final TimeValue frequency; - public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, - a -> new DataFrameTransform((String) a[0], (String) a[1])); + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, + a -> new DataFrameTransform((String) a[0], (String) a[1], (String) a[2])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameField.ID); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), VERSION); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FREQUENCY); } - private DataFrameTransform(String transformId, String version) { - this(transformId, version == null ? null : Version.fromString(version)); + private DataFrameTransform(String transformId, String version, String frequency) { + this(transformId, version == null ? null : Version.fromString(version), + frequency == null ? null : TimeValue.parseTimeValue(frequency, FREQUENCY.getPreferredName())); } - public DataFrameTransform(String transformId, Version version) { + public DataFrameTransform(String transformId, Version version, TimeValue frequency) { this.transformId = transformId; this.version = version == null ? Version.V_7_2_0 : version; + this.frequency = frequency; } public DataFrameTransform(StreamInput in) throws IOException { @@ -52,6 +58,11 @@ public DataFrameTransform(StreamInput in) throws IOException { } else { this.version = Version.V_7_2_0; } + if (in.getVersion().onOrAfter(Version.CURRENT)) { + this.frequency = in.readOptionalTimeValue(); + } else { + this.frequency = null; + } } @Override @@ -70,6 +81,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_7_3_0)) { Version.writeVersion(version, out); } + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalTimeValue(frequency); + } } @Override @@ -77,6 +91,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field(DataFrameField.ID.getPreferredName(), transformId); builder.field(VERSION.getPreferredName(), version); + if (frequency != null) { + builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep()); + } builder.endObject(); return builder; } @@ -89,6 +106,10 @@ public Version getVersion() { return version; } + public TimeValue getFrequency() { + return frequency; + } + public static DataFrameTransform fromXContent(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } @@ -105,11 +126,13 @@ public boolean equals(Object other) { DataFrameTransform that = (DataFrameTransform) other; - return Objects.equals(this.transformId, that.transformId) && Objects.equals(this.version, that.version); + return Objects.equals(this.transformId, that.transformId) + && Objects.equals(this.version, that.version) + && Objects.equals(this.frequency, that.frequency); } @Override public int hashCode() { - return Objects.hash(transformId, version); + return Objects.hash(transformId, version, frequency); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java index e3ad50d9b889e..af02125f6cbed 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; @@ -56,6 +57,7 @@ public class DataFrameTransformConfig extends AbstractDiffable create SourceConfig source = (SourceConfig) args[1]; DestConfig dest = (DestConfig) args[2]; - SyncConfig syncConfig = (SyncConfig) args[3]; - // ignored, only for internal storage: String docType = (String) args[4]; + TimeValue frequency = + args[3] == null ? null : TimeValue.parseTimeValue((String) args[3], DataFrameField.FREQUENCY.getPreferredName()); + + SyncConfig syncConfig = (SyncConfig) args[4]; + // ignored, only for internal storage: String docType = (String) args[5]; // on strict parsing do not allow injection of headers, transform version, or create time if (lenient == false) { - validateStrictParsingParams(args[5], HEADERS.getPreferredName()); - validateStrictParsingParams(args[8], CREATE_TIME.getPreferredName()); - validateStrictParsingParams(args[9], VERSION.getPreferredName()); + validateStrictParsingParams(args[6], HEADERS.getPreferredName()); + validateStrictParsingParams(args[9], CREATE_TIME.getPreferredName()); + validateStrictParsingParams(args[10], VERSION.getPreferredName()); } @SuppressWarnings("unchecked") - Map headers = (Map) args[5]; + Map headers = (Map) args[6]; - PivotConfig pivotConfig = (PivotConfig) args[6]; - String description = (String)args[7]; + PivotConfig pivotConfig = (PivotConfig) args[7]; + String description = (String)args[8]; return new DataFrameTransformConfig(id, source, dest, + frequency, syncConfig, headers, pivotConfig, description, - (Instant)args[8], - (String)args[9]); + (Instant)args[9], + (String)args[10]); }); parser.declareString(optionalConstructorArg(), DataFrameField.ID); parser.declareObject(constructorArg(), (p, c) -> SourceConfig.fromXContent(p, lenient), DataFrameField.SOURCE); parser.declareObject(constructorArg(), (p, c) -> DestConfig.fromXContent(p, lenient), DataFrameField.DESTINATION); + parser.declareString(optionalConstructorArg(), DataFrameField.FREQUENCY); parser.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p, lenient), DataFrameField.SYNC); @@ -146,6 +153,7 @@ public static String documentId(String transformId) { DataFrameTransformConfig(final String id, final SourceConfig source, final DestConfig dest, + final TimeValue frequency, final SyncConfig syncConfig, final Map headers, final PivotConfig pivotConfig, @@ -155,6 +163,7 @@ public static String documentId(String transformId) { this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName()); this.source = ExceptionsHelper.requireNonNull(source, DataFrameField.SOURCE.getPreferredName()); this.dest = ExceptionsHelper.requireNonNull(dest, DataFrameField.DESTINATION.getPreferredName()); + this.frequency = frequency; this.syncConfig = syncConfig; this.setHeaders(headers == null ? Collections.emptyMap() : headers); this.pivotConfig = pivotConfig; @@ -174,17 +183,23 @@ public static String documentId(String transformId) { public DataFrameTransformConfig(final String id, final SourceConfig source, final DestConfig dest, + final TimeValue frequency, final SyncConfig syncConfig, final Map headers, final PivotConfig pivotConfig, final String description) { - this(id, source, dest, syncConfig, headers, pivotConfig, description, null, null); + this(id, source, dest, frequency, syncConfig, headers, pivotConfig, description, null, null); } public DataFrameTransformConfig(final StreamInput in) throws IOException { id = in.readString(); source = new SourceConfig(in); dest = new DestConfig(in); + if (in.getVersion().onOrAfter(Version.CURRENT)) { + frequency = in.readOptionalTimeValue(); + } else { + frequency = null; + } setHeaders(in.readMap(StreamInput::readString, StreamInput::readString)); pivotConfig = in.readOptionalWriteable(PivotConfig::new); description = in.readOptionalString(); @@ -211,6 +226,10 @@ public DestConfig getDestination() { return dest; } + public TimeValue getFrequency() { + return frequency; + } + public SyncConfig getSyncConfig() { return syncConfig; } @@ -269,6 +288,9 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeString(id); source.writeTo(out); dest.writeTo(out); + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalTimeValue(frequency); + } out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); out.writeOptionalWriteable(pivotConfig); out.writeOptionalString(description); @@ -290,6 +312,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa builder.field(DataFrameField.ID.getPreferredName(), id); builder.field(DataFrameField.SOURCE.getPreferredName(), source); builder.field(DataFrameField.DESTINATION.getPreferredName(), dest); + if (frequency != null) { + builder.field(DataFrameField.FREQUENCY.getPreferredName(), frequency.getStringRep()); + } if (syncConfig != null) { builder.startObject(DataFrameField.SYNC.getPreferredName()); builder.field(syncConfig.getWriteableName(), syncConfig); @@ -332,6 +357,7 @@ public boolean equals(Object other) { return Objects.equals(this.id, that.id) && Objects.equals(this.source, that.source) && Objects.equals(this.dest, that.dest) + && Objects.equals(this.frequency, that.frequency) && Objects.equals(this.syncConfig, that.syncConfig) && Objects.equals(this.headers, that.headers) && Objects.equals(this.pivotConfig, that.pivotConfig) @@ -342,7 +368,7 @@ public boolean equals(Object other) { @Override public int hashCode(){ - return Objects.hash(id, source, dest, syncConfig, headers, pivotConfig, description, createTime, transformVersion); + return Objects.hash(id, source, dest, frequency, syncConfig, headers, pivotConfig, description, createTime, transformVersion); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java index ea6f2a47f4692..5eaf955249214 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/PreviewDataFrameTransformActionRequestTests.java @@ -44,6 +44,7 @@ protected Request createTestInstance() { "transform-preview", randomSourceConfig(), new DestConfig("unused-transform-preview-index", null), + null, randomBoolean() ? DataFrameTransformConfigTests.randomSyncConfig() : null, null, PivotConfigTests.randomPivotConfig(), diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java index dd5b5c9ff8841..849ff1629c755 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfigTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.Version; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -46,6 +47,7 @@ public static DataFrameTransformConfig randomDataFrameTransformConfigWithoutHead return new DataFrameTransformConfig(id, randomSourceConfig(), randomDestConfig(), + randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)), randomBoolean() ? null : randomSyncConfig(), null, PivotConfigTests.randomPivotConfig(), @@ -58,6 +60,7 @@ public static DataFrameTransformConfig randomDataFrameTransformConfig(String id) return new DataFrameTransformConfig(id, randomSourceConfig(), randomDestConfig(), + randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)), randomBoolean() ? null : randomSyncConfig(), randomHeaders(), PivotConfigTests.randomPivotConfig(), @@ -69,11 +72,11 @@ public static DataFrameTransformConfig randomDataFrameTransformConfig(String id) public static DataFrameTransformConfig randomInvalidDataFrameTransformConfig() { if (randomBoolean()) { return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomInvalidSourceConfig(), randomDestConfig(), - randomBoolean() ? randomSyncConfig() : null, randomHeaders(), PivotConfigTests.randomPivotConfig(), + null, randomBoolean() ? randomSyncConfig() : null, randomHeaders(), PivotConfigTests.randomPivotConfig(), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)); } // else return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomSourceConfig(), randomDestConfig(), - randomBoolean() ? randomSyncConfig() : null, randomHeaders(), PivotConfigTests.randomInvalidPivotConfig(), + null, randomBoolean() ? randomSyncConfig() : null, randomHeaders(), PivotConfigTests.randomInvalidPivotConfig(), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)); } @@ -146,7 +149,7 @@ public void testDefaultMatchAll() throws IOException { } } - public void testPreventHeaderInjection() throws IOException { + public void testPreventHeaderInjection() { String pivotTransform = "{" + " \"headers\" : {\"key\" : \"value\" }," + " \"source\" : {\"index\":\"src\"}," @@ -167,7 +170,7 @@ public void testPreventHeaderInjection() throws IOException { () -> createDataFrameTransformConfigFromString(pivotTransform, "test_header_injection")); } - public void testPreventCreateTimeInjection() throws IOException { + public void testPreventCreateTimeInjection() { String pivotTransform = "{" + " \"create_time\" : " + Instant.now().toEpochMilli() + " }," + " \"source\" : {\"index\":\"src\"}," @@ -188,7 +191,7 @@ public void testPreventCreateTimeInjection() throws IOException { () -> createDataFrameTransformConfigFromString(pivotTransform, "test_createTime_injection")); } - public void testPreventVersionInjection() throws IOException { + public void testPreventVersionInjection() { String pivotTransform = "{" + " \"version\" : \"7.3.0\"," + " \"source\" : {\"index\":\"src\"}," @@ -229,11 +232,11 @@ public void testXContentForInternalStorage() throws IOException { public void testMaxLengthDescription() { IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformConfig("id", - randomSourceConfig(), randomDestConfig(), null, null, PivotConfigTests.randomPivotConfig(), randomAlphaOfLength(1001))); + randomSourceConfig(), randomDestConfig(), null, null, null, PivotConfigTests.randomPivotConfig(), randomAlphaOfLength(1001))); assertThat(exception.getMessage(), equalTo("[description] must be less than 1000 characters in length.")); String description = randomAlphaOfLength(1000); DataFrameTransformConfig config = new DataFrameTransformConfig("id", - randomSourceConfig(), randomDestConfig(), null, null, PivotConfigTests.randomPivotConfig(), description); + randomSourceConfig(), randomDestConfig(), null, null, null, PivotConfigTests.randomPivotConfig(), description); assertThat(description, equalTo(config.getDescription())); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTests.java index d7463a6df7139..ce830240c63f9 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; @@ -25,7 +26,8 @@ protected DataFrameTransform doParseInstance(XContentParser parser) throws IOExc @Override protected DataFrameTransform createTestInstance() { - return new DataFrameTransform(randomAlphaOfLength(10), randomBoolean() ? null : Version.CURRENT); + return new DataFrameTransform(randomAlphaOfLength(10), randomBoolean() ? null : Version.CURRENT, + randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000))); } @Override diff --git a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java index a1d8aca86ba16..09b34f7ed7b20 100644 --- a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java +++ b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java @@ -210,6 +210,7 @@ protected DataFrameTransformConfig.Builder createTransformConfigBuilder(String i .setId(id) .setSource(SourceConfig.builder().setIndex(sourceIndices).setQueryConfig(createQueryConfig(queryBuilder)).build()) .setDest(DestConfig.builder().setIndex(destinationIndex).build()) + .setFrequency(TimeValue.timeValueSeconds(10)) .setPivotConfig(createPivotConfig(groups, aggregations)) .setDescription("Test data frame transform config id: " + id); } diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java index a45fee6d9666f..fd6d21db045b5 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java @@ -214,6 +214,7 @@ public void testGetProgressResetWithContinuous() throws Exception { final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId, null); String config = "{ \"dest\": {\"index\":\"" + transformDest + "\"}," + " \"source\": {\"index\":\"" + transformSrc + "\"}," + + " \"frequency\": \"1s\"," + " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"1s\"}}," + " \"pivot\": {" + " \"group_by\": {" diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index f2cd95ed1a9c7..4fb8ea6fafdd6 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -141,6 +141,7 @@ public void testContinuousPivot() throws Exception { String config = "{" + " \"source\": {\"index\":\"" + indexName + "\"}," + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"}," + + " \"frequency\": \"1s\"," + " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}}," + " \"pivot\": {" + " \"group_by\": {" diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java index 002126e5b60e8..5c1efb48875e7 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java @@ -131,6 +131,7 @@ public void testGetProgress() throws Exception { destConfig, null, null, + null, pivotConfig, null); @@ -155,6 +156,7 @@ public void testGetProgress() throws Exception { destConfig, null, null, + null, pivotConfig, null); @@ -174,6 +176,7 @@ public void testGetProgress() throws Exception { destConfig, null, null, + null, pivotConfig, null); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java index 4b2a11a7d0258..21f53080ad596 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java @@ -186,7 +186,7 @@ protected void masterOperation(Task ignoredTask, StartDataFrameTransformAction.R return; } - transformTaskHolder.set(createDataFrameTransform(config.getId(), config.getVersion())); + transformTaskHolder.set(createDataFrameTransform(config.getId(), config.getVersion(), config.getFrequency())); final String destinationIndex = config.getDestination().getIndex(); String[] dest = indexNameExpressionResolver.concreteIndexNames(state, IndicesOptions.lenientExpandOpen(), @@ -255,8 +255,8 @@ protected ClusterBlockException checkBlock(StartDataFrameTransformAction.Request return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); } - private static DataFrameTransform createDataFrameTransform(String transformId, Version transformVersion) { - return new DataFrameTransform(transformId, transformVersion); + private static DataFrameTransform createDataFrameTransform(String transformId, Version transformVersion, TimeValue frequency) { + return new DataFrameTransform(transformId, transformVersion, frequency); } @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index ea67da79620c4..57cb468fdd830 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -59,8 +60,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener { - // interval the scheduler sends an event - private static final int SCHEDULER_NEXT_MILLISECONDS = 10000; + // Default interval the scheduler sends an event if the config does not specify a frequency + private static final long SCHEDULER_NEXT_MILLISECONDS = 60000; private static final Logger logger = LogManager.getLogger(DataFrameTransformTask.class); // TODO consider moving to dynamic cluster setting private static final int MAX_CONTINUOUS_FAILURES = 10; @@ -363,7 +364,8 @@ private String schedulerJobName() { private SchedulerEngine.Schedule next() { return (startTime, now) -> { - return now + SCHEDULER_NEXT_MILLISECONDS; + TimeValue frequency = transform.getFrequency(); + return now + (frequency == null ? SCHEDULER_NEXT_MILLISECONDS : frequency.getMillis()); }; } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodesTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodesTests.java index 276d3af39ef13..d13c33fe9aabf 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodesTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodesTests.java @@ -31,10 +31,10 @@ public void testDataframeNodes() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); tasksBuilder.addTask(dataFrameIdFoo, - DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdFoo, Version.CURRENT), + DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdFoo, Version.CURRENT, null), new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); tasksBuilder.addTask(dataFrameIdBar, - DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdBar, Version.CURRENT), + DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdBar, Version.CURRENT, null), new PersistentTasksCustomMetaData.Assignment("node-2", "test assignment")); tasksBuilder.addTask("test-task1", "testTasks", new PersistentTaskParams() { @Override diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java index e3e9ff81eb653..03a34b013347b 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java @@ -186,6 +186,7 @@ public void testPageSizeAdapt() throws InterruptedException { randomDestConfig(), null, null, + null, new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig(), pageSize), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java index 49cdabfdf0601..a58334242f105 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java @@ -51,15 +51,15 @@ public void testNodeVersionAssignment() { PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder() .addTask("data-frame-task-1", DataFrameTransform.NAME, - new DataFrameTransform("data-frame-task-1", Version.CURRENT), + new DataFrameTransform("data-frame-task-1", Version.CURRENT, null), new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", "")) .addTask("data-frame-task-2", DataFrameTransform.NAME, - new DataFrameTransform("data-frame-task-2", Version.CURRENT), + new DataFrameTransform("data-frame-task-2", Version.CURRENT, null), new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", "")) .addTask("data-frame-task-3", DataFrameTransform.NAME, - new DataFrameTransform("data-frame-task-3", Version.CURRENT), + new DataFrameTransform("data-frame-task-3", Version.CURRENT, null), new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", "")); PersistentTasksCustomMetaData pTasks = pTasksBuilder.build(); @@ -105,9 +105,9 @@ dataFrameTransformsCheckpointService, mock(SchedulerEngine.class), new DataFrameAuditor(client, ""), mock(ThreadPool.class)); - assertThat(executor.getAssignment(new DataFrameTransform("new-task-id", Version.CURRENT), cs).getExecutorNode(), + assertThat(executor.getAssignment(new DataFrameTransform("new-task-id", Version.CURRENT, null), cs).getExecutorNode(), equalTo("current-data-node-with-1-tasks")); - assertThat(executor.getAssignment(new DataFrameTransform("new-old-task-id", Version.V_7_2_0), cs).getExecutorNode(), + assertThat(executor.getAssignment(new DataFrameTransform("new-old-task-id", Version.V_7_2_0, null), cs).getExecutorNode(), equalTo("past-data-node-1")); } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml index bfde8128b491c..4d207d2750a76 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml @@ -42,6 +42,40 @@ setup: data_frame.delete_data_frame_transform: transform_id: "missing transform" +--- +"Test put transform with frequency too low": + - do: + catch: /minimum permitted \[frequency\] is \[1s\]/ + data_frame.put_data_frame_transform: + transform_id: "frequency-too-low" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-dest" }, + "frequency": "999ms", + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + +--- +"Test put transform with frequency too high": + - do: + catch: /highest permitted \[frequency\] is \[1h\]/ + data_frame.put_data_frame_transform: + transform_id: "frequency-too-low" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-dest" }, + "frequency": "3600001ms", + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + --- "Test put transform with invalid source index": - do: