Skip to content

Commit c8a083f

Browse files
author
David Roberts
committed
[ML-DataFrame] Add a frequency option to transform config, default 1m (#44120)
Previously, a data frame transform checked every 10 seconds whether its source index had changed. Sometimes it is desirable for this check to be done less frequently. This commit increases the default check interval to 60 seconds and also allows it to be overridden by a `frequency` setting in the data frame transform config.
1 parent 6c6742a commit c8a083f

File tree

22 files changed

+201
-52
lines changed

22 files changed

+201
-52
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.common.Nullable;
2626
import org.elasticsearch.common.ParseField;
2727
import org.elasticsearch.common.Strings;
28+
import org.elasticsearch.common.unit.TimeValue;
2829
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
2930
import org.elasticsearch.common.xcontent.ObjectParser;
3031
import org.elasticsearch.common.xcontent.ToXContentObject;
@@ -44,6 +45,7 @@ public class DataFrameTransformConfig implements ToXContentObject {
4445
public static final ParseField ID = new ParseField("id");
4546
public static final ParseField SOURCE = new ParseField("source");
4647
public static final ParseField DEST = new ParseField("dest");
48+
public static final ParseField FREQUENCY = new ParseField("frequency");
4749
public static final ParseField DESCRIPTION = new ParseField("description");
4850
public static final ParseField SYNC = new ParseField("sync");
4951
public static final ParseField VERSION = new ParseField("version");
@@ -54,6 +56,7 @@ public class DataFrameTransformConfig implements ToXContentObject {
5456
private final String id;
5557
private final SourceConfig source;
5658
private final DestConfig dest;
59+
private final TimeValue frequency;
5760
private final SyncConfig syncConfig;
5861
private final PivotConfig pivotConfig;
5962
private final String description;
@@ -66,14 +69,16 @@ public class DataFrameTransformConfig implements ToXContentObject {
6669
String id = (String) args[0];
6770
SourceConfig source = (SourceConfig) args[1];
6871
DestConfig dest = (DestConfig) args[2];
69-
SyncConfig syncConfig = (SyncConfig) args[3];
70-
PivotConfig pivotConfig = (PivotConfig) args[4];
71-
String description = (String)args[5];
72-
Instant createTime = (Instant)args[6];
73-
String transformVersion = (String)args[7];
72+
TimeValue frequency = (TimeValue) args[3];
73+
SyncConfig syncConfig = (SyncConfig) args[4];
74+
PivotConfig pivotConfig = (PivotConfig) args[5];
75+
String description = (String)args[6];
76+
Instant createTime = (Instant)args[7];
77+
String transformVersion = (String)args[8];
7478
return new DataFrameTransformConfig(id,
7579
source,
7680
dest,
81+
frequency,
7782
syncConfig,
7883
pivotConfig,
7984
description,
@@ -85,6 +90,8 @@ public class DataFrameTransformConfig implements ToXContentObject {
8590
PARSER.declareString(constructorArg(), ID);
8691
PARSER.declareObject(constructorArg(), (p, c) -> SourceConfig.PARSER.apply(p, null), SOURCE);
8792
PARSER.declareObject(constructorArg(), (p, c) -> DestConfig.PARSER.apply(p, null), DEST);
93+
PARSER.declareField(optionalConstructorArg(), p -> TimeValue.parseTimeValue(p.text(), FREQUENCY.getPreferredName()),
94+
FREQUENCY, ObjectParser.ValueType.STRING);
8895
PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p), SYNC);
8996
PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM);
9097
PARSER.declareString(optionalConstructorArg(), DESCRIPTION);
@@ -118,12 +125,13 @@ public static DataFrameTransformConfig fromXContent(final XContentParser parser)
118125
* @return A DataFrameTransformConfig to preview, NOTE it will have a {@code null} id, destination and index.
119126
*/
120127
public static DataFrameTransformConfig forPreview(final SourceConfig source, final PivotConfig pivotConfig) {
121-
return new DataFrameTransformConfig(null, source, null, null, pivotConfig, null, null, null);
128+
return new DataFrameTransformConfig(null, source, null, null, null, pivotConfig, null, null, null);
122129
}
123130

124131
DataFrameTransformConfig(final String id,
125132
final SourceConfig source,
126133
final DestConfig dest,
134+
final TimeValue frequency,
127135
final SyncConfig syncConfig,
128136
final PivotConfig pivotConfig,
129137
final String description,
@@ -132,6 +140,7 @@ public static DataFrameTransformConfig forPreview(final SourceConfig source, fin
132140
this.id = id;
133141
this.source = source;
134142
this.dest = dest;
143+
this.frequency = frequency;
135144
this.syncConfig = syncConfig;
136145
this.pivotConfig = pivotConfig;
137146
this.description = description;
@@ -151,6 +160,10 @@ public DestConfig getDestination() {
151160
return dest;
152161
}
153162

163+
public TimeValue getFrequency() {
164+
return frequency;
165+
}
166+
154167
public SyncConfig getSyncConfig() {
155168
return syncConfig;
156169
}
@@ -184,6 +197,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
184197
if (dest != null) {
185198
builder.field(DEST.getPreferredName(), dest);
186199
}
200+
if (frequency != null) {
201+
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
202+
}
187203
if (syncConfig != null) {
188204
builder.startObject(SYNC.getPreferredName());
189205
builder.field(syncConfig.getName(), syncConfig);
@@ -220,6 +236,7 @@ public boolean equals(Object other) {
220236
return Objects.equals(this.id, that.id)
221237
&& Objects.equals(this.source, that.source)
222238
&& Objects.equals(this.dest, that.dest)
239+
&& Objects.equals(this.frequency, that.frequency)
223240
&& Objects.equals(this.description, that.description)
224241
&& Objects.equals(this.syncConfig, that.syncConfig)
225242
&& Objects.equals(this.transformVersion, that.transformVersion)
@@ -229,7 +246,7 @@ public boolean equals(Object other) {
229246

230247
@Override
231248
public int hashCode() {
232-
return Objects.hash(id, source, dest, syncConfig, pivotConfig, description);
249+
return Objects.hash(id, source, dest, frequency, syncConfig, pivotConfig, description);
233250
}
234251

235252
@Override
@@ -246,6 +263,7 @@ public static class Builder {
246263
private String id;
247264
private SourceConfig source;
248265
private DestConfig dest;
266+
private TimeValue frequency;
249267
private SyncConfig syncConfig;
250268
private PivotConfig pivotConfig;
251269
private String description;
@@ -265,6 +283,11 @@ public Builder setDest(DestConfig dest) {
265283
return this;
266284
}
267285

286+
public Builder setFrequency(TimeValue frequency) {
287+
this.frequency = frequency;
288+
return this;
289+
}
290+
268291
public Builder setSyncConfig(SyncConfig syncConfig) {
269292
this.syncConfig = syncConfig;
270293
return this;
@@ -281,7 +304,7 @@ public Builder setDescription(String description) {
281304
}
282305

283306
public DataFrameTransformConfig build() {
284-
return new DataFrameTransformConfig(id, source, dest, syncConfig, pivotConfig, description, null, null);
307+
return new DataFrameTransformConfig(id, source, dest, frequency, syncConfig, pivotConfig, description, null, null);
285308
}
286309
}
287310
}

client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.Version;
2424
import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfigTests;
2525
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.common.unit.TimeValue;
2627
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2728
import org.elasticsearch.common.xcontent.XContentParser;
2829
import org.elasticsearch.search.SearchModule;
@@ -43,6 +44,7 @@ public static DataFrameTransformConfig randomDataFrameTransformConfig() {
4344
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10),
4445
randomSourceConfig(),
4546
randomDestConfig(),
47+
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1000, 1000000)),
4648
randomBoolean() ? null : randomSyncConfig(),
4749
PivotConfigTests.randomPivotConfig(),
4850
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 100),

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,9 @@ public void testPutDataFrameTransform() throws IOException, InterruptedException
156156
.setId("reviewer-avg-rating") // <1>
157157
.setSource(sourceConfig) // <2>
158158
.setDest(destConfig) // <3>
159-
.setPivotConfig(pivotConfig) // <4>
160-
.setDescription("This is my test transform") // <5>
159+
.setFrequency(TimeValue.timeValueSeconds(15)) // <4>
160+
.setPivotConfig(pivotConfig) // <5>
161+
.setDescription("This is my test transform") // <6>
161162
.build();
162163
// end::put-data-frame-transform-config
163164

docs/java-rest/high-level/dataframe/put_data_frame.asciidoc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ include-tagged::{doc-tests-file}[{api}-config]
3434
<1> The {dataframe-transform} ID
3535
<2> The source indices and query from which to gather data
3636
<3> The destination index and optional pipeline
37-
<4> The PivotConfig
38-
<5> Optional free text description of the transform
37+
<4> How often to check for updates to the source indices
38+
<5> The PivotConfig
39+
<6> Optional free text description of the transform
3940

4041
[id="{upid}-{api}-query-config"]
4142

docs/reference/data-frames/apis/put-transform.asciidoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ IMPORTANT: You must use {kib} or this API to create a {dataframe-transform}.
6060
(object) The destination configuration, which consists of `index` and
6161
optionally a `pipeline` id. See <<data-frame-transform-dest>>.
6262

63+
`frequency` (Optional)::
64+
(time units) The interval between checks for changes in the source indices
65+
when the {dataframe-transform} is running continuously. Defaults to `1m`.
66+
The lowest permitted value is `1s`; the highest is `1h`.
67+
6368
`pivot` (Optional)::
6469
(object) Defines the pivot function `group by` fields and the aggregation to
6570
reduce the data. See <<data-frame-transform-pivot>>.
@@ -90,6 +95,7 @@ PUT _data_frame/transforms/ecommerce_transform
9095
"index": "kibana_sample_data_ecommerce_transform",
9196
"pipeline": "add_timestamp_pipeline"
9297
},
98+
"frequency": "5m",
9399
"pivot": {
94100
"group_by": {
95101
"customer_id": {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public final class DataFrameField {
2626
public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type");
2727
public static final ParseField SOURCE = new ParseField("source");
2828
public static final ParseField DESTINATION = new ParseField("dest");
29+
public static final ParseField FREQUENCY = new ParseField("frequency");
2930
public static final ParseField FORCE = new ParseField("force");
3031
public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");
3132
public static final ParseField FIELD = new ParseField("field");

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.common.io.stream.StreamInput;
1414
import org.elasticsearch.common.io.stream.StreamOutput;
1515
import org.elasticsearch.common.io.stream.Writeable;
16+
import org.elasticsearch.common.unit.TimeValue;
1617
import org.elasticsearch.common.xcontent.ToXContentObject;
1718
import org.elasticsearch.common.xcontent.XContentBuilder;
1819
import org.elasticsearch.common.xcontent.XContentParser;
@@ -34,6 +35,9 @@ public class PutDataFrameTransformAction extends ActionType<AcknowledgedResponse
3435
public static final PutDataFrameTransformAction INSTANCE = new PutDataFrameTransformAction();
3536
public static final String NAME = "cluster:admin/data_frame/put";
3637

38+
private static final TimeValue MIN_FREQUENCY = TimeValue.timeValueSeconds(1);
39+
private static final TimeValue MAX_FREQUENCY = TimeValue.timeValueHours(1);
40+
3741
private PutDataFrameTransformAction() {
3842
super(NAME);
3943
}
@@ -93,6 +97,19 @@ public ActionRequestValidationException validate() {
9397
DataFrameMessages.getMessage(DataFrameMessages.ID_TOO_LONG, DataFrameStrings.ID_LENGTH_LIMIT),
9498
validationException);
9599
}
100+
TimeValue frequency = config.getFrequency();
101+
if (frequency != null) {
102+
if (frequency.compareTo(MIN_FREQUENCY) < 0) {
103+
validationException = addValidationError(
104+
"minimum permitted [" + DataFrameField.FREQUENCY + "] is [" + MIN_FREQUENCY.getStringRep() + "]",
105+
validationException);
106+
} else if (frequency.compareTo(MAX_FREQUENCY) > 0) {
107+
validationException = addValidationError(
108+
"highest permitted [" + DataFrameField.FREQUENCY + "] is [" + MAX_FREQUENCY.getStringRep() + "]",
109+
validationException);
110+
}
111+
}
112+
96113
return validationException;
97114
}
98115

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.common.ParseField;
1212
import org.elasticsearch.common.io.stream.StreamInput;
1313
import org.elasticsearch.common.io.stream.StreamOutput;
14+
import org.elasticsearch.common.unit.TimeValue;
1415
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
1516
import org.elasticsearch.common.xcontent.XContentBuilder;
1617
import org.elasticsearch.common.xcontent.XContentParser;
@@ -24,25 +25,30 @@ public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> imp
2425

2526
public static final String NAME = DataFrameField.TASK_NAME;
2627
public static final ParseField VERSION = new ParseField(DataFrameField.VERSION);
28+
public static final ParseField FREQUENCY = DataFrameField.FREQUENCY;
2729

2830
private final String transformId;
2931
private final Version version;
32+
private final TimeValue frequency;
3033

31-
public static final ConstructingObjectParser<DataFrameTransform, Void> PARSER = new ConstructingObjectParser<>(NAME,
32-
a -> new DataFrameTransform((String) a[0], (String) a[1]));
34+
public static final ConstructingObjectParser<DataFrameTransform, Void> PARSER = new ConstructingObjectParser<>(NAME, true,
35+
a -> new DataFrameTransform((String) a[0], (String) a[1], (String) a[2]));
3336

3437
static {
3538
PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameField.ID);
3639
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), VERSION);
40+
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FREQUENCY);
3741
}
3842

39-
private DataFrameTransform(String transformId, String version) {
40-
this(transformId, version == null ? null : Version.fromString(version));
43+
private DataFrameTransform(String transformId, String version, String frequency) {
44+
this(transformId, version == null ? null : Version.fromString(version),
45+
frequency == null ? null : TimeValue.parseTimeValue(frequency, FREQUENCY.getPreferredName()));
4146
}
4247

43-
public DataFrameTransform(String transformId, Version version) {
48+
public DataFrameTransform(String transformId, Version version, TimeValue frequency) {
4449
this.transformId = transformId;
4550
this.version = version == null ? Version.V_7_2_0 : version;
51+
this.frequency = frequency;
4652
}
4753

4854
public DataFrameTransform(StreamInput in) throws IOException {
@@ -52,6 +58,11 @@ public DataFrameTransform(StreamInput in) throws IOException {
5258
} else {
5359
this.version = Version.V_7_2_0;
5460
}
61+
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
62+
this.frequency = in.readOptionalTimeValue();
63+
} else {
64+
this.frequency = null;
65+
}
5566
}
5667

5768
@Override
@@ -70,13 +81,19 @@ public void writeTo(StreamOutput out) throws IOException {
7081
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
7182
Version.writeVersion(version, out);
7283
}
84+
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
85+
out.writeOptionalTimeValue(frequency);
86+
}
7387
}
7488

7589
@Override
7690
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
7791
builder.startObject();
7892
builder.field(DataFrameField.ID.getPreferredName(), transformId);
7993
builder.field(VERSION.getPreferredName(), version);
94+
if (frequency != null) {
95+
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
96+
}
8097
builder.endObject();
8198
return builder;
8299
}
@@ -89,6 +106,10 @@ public Version getVersion() {
89106
return version;
90107
}
91108

109+
public TimeValue getFrequency() {
110+
return frequency;
111+
}
112+
92113
public static DataFrameTransform fromXContent(XContentParser parser) throws IOException {
93114
return PARSER.parse(parser, null);
94115
}
@@ -105,11 +126,13 @@ public boolean equals(Object other) {
105126

106127
DataFrameTransform that = (DataFrameTransform) other;
107128

108-
return Objects.equals(this.transformId, that.transformId) && Objects.equals(this.version, that.version);
129+
return Objects.equals(this.transformId, that.transformId)
130+
&& Objects.equals(this.version, that.version)
131+
&& Objects.equals(this.frequency, that.frequency);
109132
}
110133

111134
@Override
112135
public int hashCode() {
113-
return Objects.hash(transformId, version);
136+
return Objects.hash(transformId, version, frequency);
114137
}
115138
}

0 commit comments

Comments
 (0)