Skip to content

Commit dadac0c

Browse files
przemekwitekjkakavas
authored andcommitted
Remove the ability to update datafeed's job_id. (#44752)
1 parent fecde5d commit dadac0c

File tree

9 files changed

+56
-190
lines changed

9 files changed

+56
-190
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.elasticsearch.client.ml.datafeed;
2020

21-
import org.elasticsearch.client.ml.job.config.Job;
2221
import org.elasticsearch.common.ParseField;
2322
import org.elasticsearch.common.bytes.BytesArray;
2423
import org.elasticsearch.common.bytes.BytesReference;
@@ -57,7 +56,6 @@ public class DatafeedUpdate implements ToXContentObject {
5756
static {
5857
PARSER.declareString(ConstructingObjectParser.constructorArg(), DatafeedConfig.ID);
5958

60-
PARSER.declareString(Builder::setJobId, Job.ID);
6159
PARSER.declareStringArray(Builder::setIndices, DatafeedConfig.INDEXES);
6260
PARSER.declareStringArray(Builder::setIndices, DatafeedConfig.INDICES);
6361
PARSER.declareString((builder, val) -> builder.setQueryDelay(
@@ -88,7 +86,6 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti
8886
}
8987

9088
private final String id;
91-
private final String jobId;
9289
private final TimeValue queryDelay;
9390
private final TimeValue frequency;
9491
private final List<String> indices;
@@ -99,11 +96,10 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti
9996
private final ChunkingConfig chunkingConfig;
10097
private final DelayedDataCheckConfig delayedDataCheckConfig;
10198

102-
private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, BytesReference query,
99+
private DatafeedUpdate(String id, TimeValue queryDelay, TimeValue frequency, List<String> indices, BytesReference query,
103100
BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize,
104101
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
105102
this.id = id;
106-
this.jobId = jobId;
107103
this.queryDelay = queryDelay;
108104
this.frequency = frequency;
109105
this.indices = indices;
@@ -126,7 +122,6 @@ public String getId() {
126122
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
127123
builder.startObject();
128124
builder.field(DatafeedConfig.ID.getPreferredName(), id);
129-
addOptionalField(builder, Job.ID, jobId);
130125
if (queryDelay != null) {
131126
builder.field(DatafeedConfig.QUERY_DELAY.getPreferredName(), queryDelay.getStringRep());
132127
}
@@ -162,10 +157,6 @@ private void addOptionalField(XContentBuilder builder, ParseField field, Object
162157
}
163158
}
164159

165-
public String getJobId() {
166-
return jobId;
167-
}
168-
169160
public TimeValue getQueryDelay() {
170161
return queryDelay;
171162
}
@@ -228,7 +219,6 @@ public boolean equals(Object other) {
228219
DatafeedUpdate that = (DatafeedUpdate) other;
229220

230221
return Objects.equals(this.id, that.id)
231-
&& Objects.equals(this.jobId, that.jobId)
232222
&& Objects.equals(this.frequency, that.frequency)
233223
&& Objects.equals(this.queryDelay, that.queryDelay)
234224
&& Objects.equals(this.indices, that.indices)
@@ -247,7 +237,7 @@ public boolean equals(Object other) {
247237
*/
248238
@Override
249239
public int hashCode() {
250-
return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields,
240+
return Objects.hash(id, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields,
251241
chunkingConfig, delayedDataCheckConfig);
252242
}
253243

@@ -258,7 +248,6 @@ public static Builder builder(String id) {
258248
public static class Builder {
259249

260250
private String id;
261-
private String jobId;
262251
private TimeValue queryDelay;
263252
private TimeValue frequency;
264253
private List<String> indices;
@@ -275,7 +264,6 @@ public Builder(String id) {
275264

276265
public Builder(DatafeedUpdate config) {
277266
this.id = config.id;
278-
this.jobId = config.jobId;
279267
this.queryDelay = config.queryDelay;
280268
this.frequency = config.frequency;
281269
this.indices = config.indices;
@@ -287,11 +275,6 @@ public Builder(DatafeedUpdate config) {
287275
this.delayedDataCheckConfig = config.delayedDataCheckConfig;
288276
}
289277

290-
public Builder setJobId(String jobId) {
291-
this.jobId = jobId;
292-
return this;
293-
}
294-
295278
public Builder setIndices(List<String> indices) {
296279
this.indices = indices;
297280
return this;
@@ -364,7 +347,7 @@ public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheck
364347
}
365348

366349
public DatafeedUpdate build() {
367-
return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize,
350+
return new DatafeedUpdate(id, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize,
368351
chunkingConfig, delayedDataCheckConfig);
369352
}
370353

client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -463,31 +463,6 @@ public void testUpdateDatafeed() throws Exception {
463463
assertThat(datafeedUpdate.getScrollSize(), equalTo(updatedDatafeed.getScrollSize()));
464464
}
465465

466-
public void testUpdateDatafeed_UpdatingJobIdIsDeprecated() throws Exception {
467-
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
468-
469-
String jobId = randomValidJobId();
470-
Job job = buildJob(jobId);
471-
execute(new PutJobRequest(job), machineLearningClient::putJob, machineLearningClient::putJobAsync);
472-
473-
String anotherJobId = randomValidJobId();
474-
Job anotherJob = buildJob(anotherJobId);
475-
execute(new PutJobRequest(anotherJob), machineLearningClient::putJob, machineLearningClient::putJobAsync);
476-
477-
String datafeedId = "datafeed-" + jobId;
478-
DatafeedConfig datafeedConfig = DatafeedConfig.builder(datafeedId, jobId).setIndices("some_data_index").build();
479-
execute(new PutDatafeedRequest(datafeedConfig), machineLearningClient::putDatafeed, machineLearningClient::putDatafeedAsync);
480-
481-
DatafeedUpdate datafeedUpdateWithChangedJobId = DatafeedUpdate.builder(datafeedId).setJobId(anotherJobId).build();
482-
WarningFailureException exception = expectThrows(
483-
WarningFailureException.class,
484-
() -> execute(
485-
new UpdateDatafeedRequest(datafeedUpdateWithChangedJobId),
486-
machineLearningClient::updateDatafeed,
487-
machineLearningClient::updateDatafeedAsync));
488-
assertThat(exception.getResponse().getWarnings(), contains("The ability to update a datafeed's job_id is deprecated."));
489-
}
490-
491466
public void testGetDatafeed() throws Exception {
492467
String jobId1 = "test-get-datafeed-job-1";
493468
String jobId2 = "test-get-datafeed-job-2";

client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdateTests.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@ public class DatafeedUpdateTests extends AbstractXContentTestCase<DatafeedUpdate
3434

3535
public static DatafeedUpdate createRandom() {
3636
DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(DatafeedConfigTests.randomValidDatafeedId());
37-
if (randomBoolean()) {
38-
builder.setJobId(randomAlphaOfLength(10));
39-
}
4037
if (randomBoolean()) {
4138
builder.setQueryDelay(TimeValue.timeValueMillis(randomIntBetween(1, Integer.MAX_VALUE)));
4239
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,12 @@
55
*/
66
package org.elasticsearch.xpack.core.ml.datafeed;
77

8-
import org.apache.logging.log4j.LogManager;
98
import org.elasticsearch.Version;
109
import org.elasticsearch.common.ParseField;
1110
import org.elasticsearch.common.Strings;
1211
import org.elasticsearch.common.io.stream.StreamInput;
1312
import org.elasticsearch.common.io.stream.StreamOutput;
1413
import org.elasticsearch.common.io.stream.Writeable;
15-
import org.elasticsearch.common.logging.DeprecationLogger;
1614
import org.elasticsearch.common.unit.TimeValue;
1715
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
1816
import org.elasticsearch.common.xcontent.ObjectParser;
@@ -46,8 +44,7 @@
4644
*/
4745
public class DatafeedUpdate implements Writeable, ToXContentObject {
4846

49-
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(DatafeedUpdate.class));
50-
private static final String DEPRECATION_MESSAGE_ON_JOB_ID_UPDATE = "The ability to update a datafeed's job_id is deprecated.";
47+
static final String ERROR_MESSAGE_ON_JOB_ID_UPDATE = "Datafeed's job_id cannot be changed.";
5148

5249
public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("datafeed_update", Builder::new);
5350

@@ -110,9 +107,6 @@ private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue
110107
this.scrollSize = scrollSize;
111108
this.chunkingConfig = chunkingConfig;
112109
this.delayedDataCheckConfig = delayedDataCheckConfig;
113-
if (jobId != null) {
114-
deprecationLogger.deprecated(DEPRECATION_MESSAGE_ON_JOB_ID_UPDATE);
115-
}
116110
}
117111

118112
public DatafeedUpdate(StreamInput in) throws IOException {
@@ -298,6 +292,9 @@ public DatafeedConfig apply(DatafeedConfig datafeedConfig, Map<String, String> h
298292

299293
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedConfig);
300294
if (jobId != null) {
295+
if (datafeedConfig.getJobId() != null && datafeedConfig.getJobId().equals(jobId) == false) {
296+
throw ExceptionsHelper.badRequestException(ERROR_MESSAGE_ON_JOB_ID_UPDATE);
297+
}
301298
builder.setJobId(jobId);
302299
}
303300
if (queryDelay != null) {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateDatafeedActionRequestTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public void setUpDatafeedId() {
3030

3131
@Override
3232
protected Request createTestInstance() {
33-
return new Request(DatafeedUpdateTests.createRandomized(datafeedId, null, false));
33+
return new Request(DatafeedUpdateTests.createRandomized(datafeedId));
3434
}
3535

3636
@Override

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdateTests.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package org.elasticsearch.xpack.core.ml.datafeed;
77

8+
import org.elasticsearch.ElasticsearchStatusException;
89
import org.elasticsearch.Version;
910
import org.elasticsearch.common.Nullable;
1011
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -23,6 +24,7 @@
2324
import org.elasticsearch.index.query.BoolQueryBuilder;
2425
import org.elasticsearch.index.query.QueryBuilders;
2526
import org.elasticsearch.index.query.TermQueryBuilder;
27+
import org.elasticsearch.rest.RestStatus;
2628
import org.elasticsearch.script.Script;
2729
import org.elasticsearch.search.SearchModule;
2830
import org.elasticsearch.search.aggregations.AggregationBuilders;
@@ -77,14 +79,11 @@ protected DatafeedUpdate createTestInstance() {
7779
}
7880

7981
public static DatafeedUpdate createRandomized(String datafeedId) {
80-
return createRandomized(datafeedId, null, true);
82+
return createRandomized(datafeedId, null);
8183
}
8284

83-
public static DatafeedUpdate createRandomized(String datafeedId, @Nullable DatafeedConfig datafeed, boolean canSetJobId) {
85+
public static DatafeedUpdate createRandomized(String datafeedId, @Nullable DatafeedConfig datafeed) {
8486
DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(datafeedId);
85-
if (randomBoolean() && datafeed == null && canSetJobId) {
86-
builder.setJobId(randomAlphaOfLength(10));
87-
}
8887
if (randomBoolean()) {
8988
builder.setQueryDelay(TimeValue.timeValueMillis(randomIntBetween(1, Integer.MAX_VALUE)));
9089
}
@@ -197,6 +196,24 @@ public void testApply_failBecauseTargetDatafeedHasDifferentId() {
197196
expectThrows(IllegalArgumentException.class, () -> createRandomized(datafeed.getId() + "_2").apply(datafeed, null));
198197
}
199198

199+
public void testApply_failBecauseJobIdChanged() {
200+
DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo");
201+
202+
DatafeedUpdate datafeedUpdateWithUnchangedJobId = new DatafeedUpdate.Builder(datafeed.getId())
203+
.setJobId("foo")
204+
.build();
205+
DatafeedConfig updatedDatafeed = datafeedUpdateWithUnchangedJobId.apply(datafeed, Collections.emptyMap());
206+
assertThat(updatedDatafeed, equalTo(datafeed));
207+
208+
DatafeedUpdate datafeedUpdateWithChangedJobId = new DatafeedUpdate.Builder(datafeed.getId())
209+
.setJobId("bar")
210+
.build();
211+
ElasticsearchStatusException ex = expectThrows(
212+
ElasticsearchStatusException.class, () -> datafeedUpdateWithChangedJobId.apply(datafeed, Collections.emptyMap()));
213+
assertThat(ex.status(), equalTo(RestStatus.BAD_REQUEST));
214+
assertThat(ex.getMessage(), equalTo(DatafeedUpdate.ERROR_MESSAGE_ON_JOB_ID_UPDATE));
215+
}
216+
200217
public void testApply_givenEmptyUpdate() {
201218
DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo");
202219
DatafeedConfig updatedDatafeed = new DatafeedUpdate.Builder(datafeed.getId()).build().apply(datafeed, Collections.emptyMap());
@@ -223,7 +240,6 @@ public void testApply_givenFullUpdateNoAggregations() {
223240
DatafeedConfig datafeed = datafeedBuilder.build();
224241
QueryProvider queryProvider = createRandomValidQueryProvider("a", "b");
225242
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeed.getId());
226-
update.setJobId("bar");
227243
update.setIndices(Collections.singletonList("i_2"));
228244
update.setQueryDelay(TimeValue.timeValueSeconds(42));
229245
update.setFrequency(TimeValue.timeValueSeconds(142));
@@ -235,7 +251,7 @@ public void testApply_givenFullUpdateNoAggregations() {
235251

236252
DatafeedConfig updatedDatafeed = update.build().apply(datafeed, Collections.emptyMap());
237253

238-
assertThat(updatedDatafeed.getJobId(), equalTo("bar"));
254+
assertThat(updatedDatafeed.getJobId(), equalTo("foo-feed"));
239255
assertThat(updatedDatafeed.getIndices(), equalTo(Collections.singletonList("i_2")));
240256
assertThat(updatedDatafeed.getQueryDelay(), equalTo(TimeValue.timeValueSeconds(42)));
241257
assertThat(updatedDatafeed.getFrequency(), equalTo(TimeValue.timeValueSeconds(142)));
@@ -276,9 +292,9 @@ public void testApply_GivenRandomUpdates_AssertImmutability() {
276292
withoutAggs.setAggProvider(null);
277293
datafeed = withoutAggs.build();
278294
}
279-
DatafeedUpdate update = createRandomized(datafeed.getId(), datafeed, true);
295+
DatafeedUpdate update = createRandomized(datafeed.getId(), datafeed);
280296
while (update.isNoop(datafeed)) {
281-
update = createRandomized(datafeed.getId(), datafeed, true);
297+
update = createRandomized(datafeed.getId(), datafeed);
282298
}
283299

284300
DatafeedConfig updatedDatafeed = update.apply(datafeed, Collections.emptyMap());
@@ -339,12 +355,9 @@ public void testSerializationOfComplexAggsBetweenVersions() throws IOException {
339355
@Override
340356
protected DatafeedUpdate mutateInstance(DatafeedUpdate instance) throws IOException {
341357
DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(instance);
342-
switch (between(0, 9)) {
343-
case 0:
344-
builder.setId(instance.getId() + DatafeedConfigTests.randomValidDatafeedId());
345-
break;
358+
switch (between(1, 9)) {
346359
case 1:
347-
builder.setJobId(instance.getJobId() + randomAlphaOfLength(5));
360+
builder.setId(instance.getId() + DatafeedConfigTests.randomValidDatafeedId());
348361
break;
349362
case 2:
350363
if (instance.getQueryDelay() == null) {

x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
1111
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
1212
import org.elasticsearch.action.support.master.AcknowledgedResponse;
13-
import org.elasticsearch.common.CheckedConsumer;
1413
import org.elasticsearch.common.CheckedRunnable;
1514
import org.elasticsearch.common.unit.TimeValue;
1615
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -137,48 +136,6 @@ public void testDatafeedTimingStats_DatafeedRecreated() throws Exception {
137136
openAndRunJob.run();
138137
}
139138

140-
public void testDatafeedTimingStats_DatafeedJobIdUpdated() throws Exception {
141-
client().admin().indices().prepareCreate("data")
142-
.addMapping("type", "time", "type=date")
143-
.get();
144-
long numDocs = randomIntBetween(32, 2048);
145-
Instant now = Instant.now();
146-
indexDocs(logger, "data", numDocs, now.minus(Duration.ofDays(14)).toEpochMilli(), now.toEpochMilli());
147-
148-
Job.Builder jobA = createScheduledJob("lookback-job-jobid-updated");
149-
Job.Builder jobB = createScheduledJob("other-lookback-job-jobid-updated");
150-
for (Job.Builder job : Arrays.asList(jobA, jobB)) {
151-
registerJob(job);
152-
putJob(job);
153-
}
154-
155-
String datafeedId = "lookback-datafeed";
156-
DatafeedConfig datafeedConfig = createDatafeed(datafeedId, jobA.getId(), Arrays.asList("data"));
157-
registerDatafeed(datafeedConfig);
158-
putDatafeed(datafeedConfig);
159-
160-
CheckedConsumer<Job.Builder, Exception> openAndRunJob = job -> {
161-
openJob(job.getId());
162-
assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
163-
// Bind datafeedId to the current job on the list, timing stats are wiped out.
164-
// Datafeed did not do anything yet, hence search_count is equal to 0.
165-
assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), equalTo(0L));
166-
startDatafeed(datafeedId, 0L, now.toEpochMilli());
167-
assertBusy(() -> {
168-
assertThat(getDataCounts(job.getId()).getProcessedRecordCount(), equalTo(numDocs));
169-
// Datafeed processed numDocs documents so search_count must be greater than 0.
170-
assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), greaterThan(0L));
171-
}, 60, TimeUnit.SECONDS);
172-
waitUntilJobIsClosed(job.getId());
173-
};
174-
175-
openAndRunJob.accept(jobA);
176-
updateDatafeed(new DatafeedUpdate.Builder(datafeedId).setJobId(jobB.getId()).build()); // wipes out timing stats
177-
openAndRunJob.accept(jobB);
178-
updateDatafeed(new DatafeedUpdate.Builder(datafeedId).setJobId(jobA.getId()).build()); // wipes out timing stats
179-
openAndRunJob.accept(jobA);
180-
}
181-
182139
public void testDatafeedTimingStats_QueryDelayUpdated_TimingStatsNotReset() throws Exception {
183140
client().admin().indices().prepareCreate("data")
184141
.addMapping("type", "time", "type=date")

0 commit comments

Comments
 (0)