Skip to content

Commit 7fd250e

Browse files
[FEATURE][ML] Add cluster setting that enables/disables config migration (#36700)
This commit adds a cluster settings called `xpack.ml.enable_config_migration`. The setting is `true` by default. When set to `false`, no config migration will be attempted and non-migrated resources (e.g. jobs, datafeeds) will be able to be updated normally. Relates #32905
1 parent 5eb20d2 commit 7fd250e

15 files changed

+568
-218
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
267267
public static final Setting<Integer> MAX_MACHINE_MEMORY_PERCENT =
268268
Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 90, Property.Dynamic, Property.NodeScope);
269269
public static final Setting<Integer> MAX_LAZY_ML_NODES =
270-
Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope);
270+
Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope);
271271

272272
private static final Logger logger = LogManager.getLogger(XPackPlugin.class);
273273

@@ -303,7 +303,8 @@ public List<Setting<?>> getSettings() {
303303
AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC,
304304
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE,
305305
AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE,
306-
AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP));
306+
AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP,
307+
MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION));
307308
}
308309

309310
public Settings additionalSettings() {
@@ -439,7 +440,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
439440
jobDataCountsPersister,
440441
datafeedManager,
441442
auditor,
442-
new MlAssignmentNotifier(auditor, threadPool, client, clusterService),
443+
new MlAssignmentNotifier(settings, auditor, threadPool, client, clusterService),
443444
memoryTracker
444445
);
445446
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.cluster.LocalNodeMasterListener;
1616
import org.elasticsearch.cluster.node.DiscoveryNode;
1717
import org.elasticsearch.cluster.service.ClusterService;
18+
import org.elasticsearch.common.settings.Settings;
1819
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
1920
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
2021
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
@@ -37,10 +38,10 @@ public class MlAssignmentNotifier implements ClusterStateListener, LocalNodeMast
3738
private final ThreadPool threadPool;
3839
private final AtomicBoolean enabled = new AtomicBoolean(false);
3940

40-
MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) {
41+
MlAssignmentNotifier(Settings settings, Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) {
4142
this.auditor = auditor;
4243
this.clusterService = clusterService;
43-
this.mlConfigMigrator = new MlConfigMigrator(client, clusterService);
44+
this.mlConfigMigrator = new MlConfigMigrator(settings, client, clusterService);
4445
this.threadPool = threadPool;
4546
clusterService.addLocalNodeMasterListener(this);
4647
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml;
7+
8+
import org.elasticsearch.Version;
9+
import org.elasticsearch.cluster.ClusterState;
10+
import org.elasticsearch.cluster.service.ClusterService;
11+
import org.elasticsearch.common.settings.Setting;
12+
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
14+
import org.elasticsearch.xpack.core.ml.MlMetadata;
15+
import org.elasticsearch.xpack.core.ml.MlTasks;
16+
import org.elasticsearch.xpack.core.ml.job.config.Job;
17+
18+
/**
19+
* Checks whether migration can start and whether ML resources (e.g. jobs, datafeeds)
20+
* are eligible to be migrated from the cluster state into the config index
21+
*/
22+
public class MlConfigMigrationEligibilityCheck {
23+
24+
private static final Version MIN_NODE_VERSION = Version.V_6_6_0;
25+
26+
public static final Setting<Boolean> ENABLE_CONFIG_MIGRATION = Setting.boolSetting(
27+
"xpack.ml.enable_config_migration", true, Setting.Property.Dynamic, Setting.Property.NodeScope);
28+
29+
private volatile boolean isConfigMigrationEnabled;
30+
31+
public MlConfigMigrationEligibilityCheck(Settings settings, ClusterService clusterService) {
32+
isConfigMigrationEnabled = ENABLE_CONFIG_MIGRATION.get(settings);
33+
clusterService.getClusterSettings().addSettingsUpdateConsumer(ENABLE_CONFIG_MIGRATION, this::setConfigMigrationEnabled);
34+
}
35+
36+
private void setConfigMigrationEnabled(boolean configMigrationEnabled) {
37+
this.isConfigMigrationEnabled = configMigrationEnabled;
38+
}
39+
40+
/**
41+
* Can migration start? Returns:
42+
* False if config migration is disabled via the setting {@link #ENABLE_CONFIG_MIGRATION}
43+
* False if the min node version of the cluster is before {@link #MIN_NODE_VERSION}
44+
* True otherwise
45+
* @param clusterState The cluster state
46+
* @return A boolean that dictates if config migration can start
47+
*/
48+
public boolean canStartMigration(ClusterState clusterState) {
49+
if (isConfigMigrationEnabled == false) {
50+
return false;
51+
}
52+
53+
Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
54+
if (minNodeVersion.before(MIN_NODE_VERSION)) {
55+
return false;
56+
}
57+
return true;
58+
}
59+
60+
/**
61+
* Is the job a eligible for migration? Returns:
62+
* False if {@link #canStartMigration(ClusterState)} returns {@code false}
63+
* False if the {@link Job#isDeleting()}
64+
* False if the job has a persistent task
65+
* True otherwise i.e. the job is present, not deleting
66+
* and does not have a persistent task.
67+
*
68+
* @param jobId The job Id
69+
* @param clusterState The cluster state
70+
* @return A boolean depending on the conditions listed above
71+
*/
72+
public boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState) {
73+
if (canStartMigration(clusterState) == false) {
74+
return false;
75+
}
76+
77+
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
78+
Job job = mlMetadata.getJobs().get(jobId);
79+
80+
if (job == null || job.isDeleting()) {
81+
return false;
82+
}
83+
84+
PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
85+
return MlTasks.openJobIds(persistentTasks).contains(jobId) == false;
86+
}
87+
88+
/**
89+
* Is the datafeed a eligible for migration? Returns:
90+
* False if {@link #canStartMigration(ClusterState)} returns {@code false}
91+
* False if the datafeed is not in the cluster state
92+
* False if the datafeed has a persistent task
93+
* True otherwise i.e. the datafeed is present and does not have a persistent task.
94+
*
95+
* @param datafeedId The datafeed Id
96+
* @param clusterState The cluster state
97+
* @return A boolean depending on the conditions listed above
98+
*/
99+
public boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState clusterState) {
100+
if (canStartMigration(clusterState) == false) {
101+
return false;
102+
}
103+
104+
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
105+
if (mlMetadata.getDatafeeds().containsKey(datafeedId) == false) {
106+
return false;
107+
}
108+
109+
PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
110+
return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false;
111+
}
112+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java

Lines changed: 9 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2020
import org.elasticsearch.cluster.metadata.MetaData;
2121
import org.elasticsearch.cluster.service.ClusterService;
22+
import org.elasticsearch.common.settings.Settings;
2223
import org.elasticsearch.common.xcontent.ToXContent;
2324
import org.elasticsearch.common.xcontent.ToXContentObject;
2425
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -41,6 +42,7 @@
4142
import java.util.Iterator;
4243
import java.util.List;
4344
import java.util.Map;
45+
import java.util.Objects;
4446
import java.util.Set;
4547
import java.util.concurrent.atomic.AtomicBoolean;
4648
import java.util.concurrent.atomic.AtomicReference;
@@ -80,18 +82,19 @@ public class MlConfigMigrator {
8082
private static final Logger logger = LogManager.getLogger(MlConfigMigrator.class);
8183

8284
public static final String MIGRATED_FROM_VERSION = "migrated from version";
83-
public static final Version MIN_NODE_VERSION = Version.V_6_6_0;
8485

8586
static final int MAX_BULK_WRITE_SIZE = 100;
8687

8788
private final Client client;
8889
private final ClusterService clusterService;
90+
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
8991

9092
private final AtomicBoolean migrationInProgress;
9193

92-
public MlConfigMigrator(Client client, ClusterService clusterService) {
93-
this.client = client;
94-
this.clusterService = clusterService;
94+
public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) {
95+
this.client = Objects.requireNonNull(client);
96+
this.clusterService = Objects.requireNonNull(clusterService);
97+
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
9598
this.migrationInProgress = new AtomicBoolean(false);
9699
}
97100

@@ -114,9 +117,8 @@ public MlConfigMigrator(Client client, ClusterService clusterService) {
114117
*/
115118
public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener<Boolean> listener) {
116119

117-
Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
118-
if (minNodeVersion.before(MIN_NODE_VERSION)) {
119-
listener.onResponse(Boolean.FALSE);
120+
if (migrationEligibilityCheck.canStartMigration(clusterState) == false) {
121+
listener.onResponse(false);
120122
return;
121123
}
122124

@@ -454,60 +456,4 @@ static List<String> filterFailedDatafeedConfigWrites(Set<String> failedDocumentI
454456
.filter(id -> failedDocumentIds.contains(DatafeedConfig.documentId(id)) == false)
455457
.collect(Collectors.toList());
456458
}
457-
458-
/**
459-
* Is the job a eligible for migration? Returns:
460-
* False if the min node version of the cluster is before {@link #MIN_NODE_VERSION}
461-
* False if the job is not in the cluster state
462-
* False if the {@link Job#isDeleting()}
463-
* False if the job has a persistent task
464-
* True otherwise i.e. the job is present, not deleting
465-
* and does not have a persistent task.
466-
*
467-
* @param jobId The job Id
468-
* @param clusterState clusterstate
469-
* @return A boolean depending on the conditions listed above
470-
*/
471-
public static boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState) {
472-
Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
473-
if (minNodeVersion.before(MIN_NODE_VERSION)) {
474-
return false;
475-
}
476-
477-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
478-
Job job = mlMetadata.getJobs().get(jobId);
479-
480-
if (job == null || job.isDeleting()) {
481-
return false;
482-
}
483-
484-
PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
485-
return MlTasks.openJobIds(persistentTasks).contains(jobId) == false;
486-
}
487-
488-
/**
489-
* Is the datafeed a eligible for migration? Returns:
490-
* False if the min node version of the cluster is before {@link #MIN_NODE_VERSION}
491-
* False if the datafeed is not in the cluster state
492-
* False if the datafeed has a persistent task
493-
* True otherwise i.e. the datafeed is present and does not have a persistent task.
494-
*
495-
* @param datafeedId The datafeed Id
496-
* @param clusterState clusterstate
497-
* @return A boolean depending on the conditions listed above
498-
*/
499-
public static boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState clusterState) {
500-
Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
501-
if (minNodeVersion.before(MIN_NODE_VERSION)) {
502-
return false;
503-
}
504-
505-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
506-
if (mlMetadata.getDatafeeds().containsKey(datafeedId) == false) {
507-
return false;
508-
}
509-
510-
PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
511-
return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false;
512-
}
513459
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1818
import org.elasticsearch.cluster.service.ClusterService;
1919
import org.elasticsearch.common.inject.Inject;
20+
import org.elasticsearch.common.settings.Settings;
2021
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2122
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
2223
import org.elasticsearch.persistent.PersistentTasksService;
@@ -28,7 +29,7 @@
2829
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
2930
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
3031
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
31-
import org.elasticsearch.xpack.ml.MlConfigMigrator;
32+
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
3233
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
3334

3435
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@@ -40,9 +41,10 @@ public class TransportDeleteDatafeedAction extends TransportMasterNodeAction<Del
4041
private final DatafeedConfigProvider datafeedConfigProvider;
4142
private final ClusterService clusterService;
4243
private final PersistentTasksService persistentTasksService;
44+
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
4345

4446
@Inject
45-
public TransportDeleteDatafeedAction(TransportService transportService, ClusterService clusterService,
47+
public TransportDeleteDatafeedAction(Settings settings, TransportService transportService, ClusterService clusterService,
4648
ThreadPool threadPool, ActionFilters actionFilters,
4749
IndexNameExpressionResolver indexNameExpressionResolver,
4850
Client client, PersistentTasksService persistentTasksService,
@@ -53,6 +55,7 @@ public TransportDeleteDatafeedAction(TransportService transportService, ClusterS
5355
this.datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
5456
this.persistentTasksService = persistentTasksService;
5557
this.clusterService = clusterService;
58+
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
5659
}
5760

5861
@Override
@@ -69,7 +72,7 @@ protected AcknowledgedResponse newResponse() {
6972
protected void masterOperation(DeleteDatafeedAction.Request request, ClusterState state,
7073
ActionListener<AcknowledgedResponse> listener) {
7174

72-
if (MlConfigMigrator.datafeedIsEligibleForMigration(request.getDatafeedId(), state)) {
75+
if (migrationEligibilityCheck.datafeedIsEligibleForMigration(request.getDatafeedId(), state)) {
7376
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("delete datafeed", request.getDatafeedId()));
7477
return;
7578
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.common.CheckedConsumer;
3434
import org.elasticsearch.common.Nullable;
3535
import org.elasticsearch.common.inject.Inject;
36+
import org.elasticsearch.common.settings.Settings;
3637
import org.elasticsearch.index.IndexNotFoundException;
3738
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
3839
import org.elasticsearch.index.query.IdsQueryBuilder;
@@ -63,7 +64,7 @@
6364
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
6465
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
6566
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
66-
import org.elasticsearch.xpack.ml.MlConfigMigrator;
67+
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
6768
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
6869
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
6970
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
@@ -96,6 +97,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
9697
private final JobConfigProvider jobConfigProvider;
9798
private final DatafeedConfigProvider datafeedConfigProvider;
9899
private final MlMemoryTracker memoryTracker;
100+
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
99101

100102
/**
101103
* A map of task listeners by job_id.
@@ -106,7 +108,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
106108
private final Map<String, List<ActionListener<AcknowledgedResponse>>> listenersByJobId;
107109

108110
@Inject
109-
public TransportDeleteJobAction(TransportService transportService, ClusterService clusterService,
111+
public TransportDeleteJobAction(Settings settings, TransportService transportService, ClusterService clusterService,
110112
ThreadPool threadPool, ActionFilters actionFilters,
111113
IndexNameExpressionResolver indexNameExpressionResolver, PersistentTasksService persistentTasksService,
112114
Client client, Auditor auditor, JobResultsProvider jobResultsProvider,
@@ -121,6 +123,7 @@ public TransportDeleteJobAction(TransportService transportService, ClusterServic
121123
this.jobConfigProvider = jobConfigProvider;
122124
this.datafeedConfigProvider = datafeedConfigProvider;
123125
this.memoryTracker = memoryTracker;
126+
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
124127
this.listenersByJobId = new HashMap<>();
125128
}
126129

@@ -148,7 +151,7 @@ protected void masterOperation(DeleteJobAction.Request request, ClusterState sta
148151
protected void masterOperation(Task task, DeleteJobAction.Request request, ClusterState state,
149152
ActionListener<AcknowledgedResponse> listener) {
150153

151-
if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobId(), state)) {
154+
if (migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobId(), state)) {
152155
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("delete job", request.getJobId()));
153156
return;
154157
}

0 commit comments

Comments
 (0)