Skip to content

Commit 52a8a73

Browse files
committed
[ML] Prevent config snapshot failure blocking migration (#37493)
1 parent 464a970 commit 52a8a73

File tree

2 files changed

+65
-1
lines changed

2 files changed

+65
-1
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.common.xcontent.XContentBuilder;
3434
import org.elasticsearch.common.xcontent.XContentFactory;
3535
import org.elasticsearch.index.IndexSettings;
36+
import org.elasticsearch.index.engine.VersionConflictEngineException;
3637
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
3738
import org.elasticsearch.xpack.core.ml.MlMetadata;
3839
import org.elasticsearch.xpack.core.ml.MlTasks;
@@ -370,7 +371,14 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listen
370371
indexResponse -> {
371372
listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED);
372373
},
373-
listener::onFailure),
374+
e -> {
375+
if (e instanceof VersionConflictEngineException) {
376+
// the snapshot already exists
377+
listener.onResponse(Boolean.TRUE);
378+
} else {
379+
listener.onFailure(e);
380+
}
381+
}),
374382
client::index
375383
);
376384
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@
66
package org.elasticsearch.xpack.ml.integration;
77

88
import org.elasticsearch.Version;
9+
import org.elasticsearch.action.DocWriteRequest;
10+
import org.elasticsearch.action.index.IndexRequestBuilder;
911
import org.elasticsearch.action.index.IndexResponse;
1012
import org.elasticsearch.action.search.SearchResponse;
13+
import org.elasticsearch.action.support.WriteRequest;
1114
import org.elasticsearch.cluster.ClusterName;
1215
import org.elasticsearch.cluster.ClusterState;
1316
import org.elasticsearch.cluster.ClusterStateUpdateTask;
@@ -180,6 +183,59 @@ public void testMigrateConfigs() throws InterruptedException, IOException {
180183
assertEquals("df-1", datafeedsHolder.get().get(0).getId());
181184
}
182185

186+
public void testExistingSnapshotDoesNotBlockMigration() throws InterruptedException {
187+
// index a doc with the same Id as the config snapshot
188+
IndexRequestBuilder indexRequest = client().prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(),
189+
ElasticsearchMappings.DOC_TYPE, "ml-config")
190+
.setSource(Collections.singletonMap("a_field", "a_value"))
191+
.setOpType(DocWriteRequest.OpType.CREATE)
192+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
193+
194+
indexRequest.execute().actionGet();
195+
196+
// define the configs
197+
MlMetadata.Builder mlMetadata = new MlMetadata.Builder();
198+
mlMetadata.putJob(buildJobBuilder("job-foo").build(), false);
199+
200+
MetaData.Builder metaData = MetaData.builder();
201+
RoutingTable.Builder routingTable = RoutingTable.builder();
202+
addMlConfigIndex(metaData, routingTable);
203+
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
204+
.metaData(metaData.putCustom(MlMetadata.TYPE, mlMetadata.build()))
205+
.routingTable(routingTable.build())
206+
.build();
207+
208+
doAnswer(invocation -> {
209+
ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1];
210+
listener.clusterStateProcessed("source", mock(ClusterState.class), mock(ClusterState.class));
211+
return null;
212+
}).when(clusterService).submitStateUpdateTask(eq("remove-migrated-ml-configs"), any());
213+
214+
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
215+
AtomicReference<Boolean> responseHolder = new AtomicReference<>();
216+
217+
// do the migration
218+
MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService);
219+
// writing the snapshot should fail because the doc already exists
220+
// in which case the migration should continue
221+
blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener),
222+
responseHolder, exceptionHolder);
223+
224+
assertNull(exceptionHolder.get());
225+
assertTrue(responseHolder.get());
226+
227+
// check the jobs have been migrated
228+
AtomicReference<List<Job.Builder>> jobsHolder = new AtomicReference<>();
229+
JobConfigProvider jobConfigProvider = new JobConfigProvider(client());
230+
blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, actionListener),
231+
jobsHolder, exceptionHolder);
232+
233+
assertNull(exceptionHolder.get());
234+
assertThat(jobsHolder.get(), hasSize(1));
235+
assertTrue(jobsHolder.get().get(0).build().getCustomSettings().containsKey(MlConfigMigrator.MIGRATED_FROM_VERSION));
236+
assertEquals("job-foo", jobsHolder.get().get(0).build().getId());
237+
}
238+
183239
public void testMigrateConfigs_GivenLargeNumberOfJobsAndDatafeeds() throws InterruptedException {
184240
int jobCount = randomIntBetween(150, 201);
185241
int datafeedCount = randomIntBetween(150, jobCount);

0 commit comments

Comments
 (0)