Skip to content

Commit 6aeaad1

Browse files
[FEATURE][ML] Add cluster setting that enables/disables config migration
This commit adds a cluster settings called `xpack.ml.enable_config_migration`. The setting is `true` by default. When set to `false`, no config migration will be attempted and non-migrated resources (e.g. jobs, datafeeds) will be able to be updated normally. Relates #32905
1 parent 28b99f0 commit 6aeaad1

15 files changed

+559
-231
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
268268
public static final Setting<Integer> MAX_MACHINE_MEMORY_PERCENT =
269269
Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 90, Property.Dynamic, Property.NodeScope);
270270
public static final Setting<Integer> MAX_LAZY_ML_NODES =
271-
Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope);
271+
Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope);
272272

273273
private static final Logger logger = LogManager.getLogger(XPackPlugin.class);
274274

@@ -308,7 +308,8 @@ public List<Setting<?>> getSettings() {
308308
AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC,
309309
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE,
310310
AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE,
311-
AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP));
311+
AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP,
312+
MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION));
312313
}
313314

314315
public Settings additionalSettings() {
@@ -444,7 +445,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
444445
jobDataCountsPersister,
445446
datafeedManager,
446447
auditor,
447-
new MlAssignmentNotifier(auditor, threadPool, client, clusterService),
448+
new MlAssignmentNotifier(settings, auditor, threadPool, client, clusterService),
448449
memoryTracker
449450
);
450451
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.cluster.LocalNodeMasterListener;
1616
import org.elasticsearch.cluster.node.DiscoveryNode;
1717
import org.elasticsearch.cluster.service.ClusterService;
18+
import org.elasticsearch.common.settings.Settings;
1819
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
1920
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
2021
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
@@ -36,10 +37,10 @@ public class MlAssignmentNotifier implements ClusterStateListener, LocalNodeMast
3637
private final ThreadPool threadPool;
3738
private final AtomicBoolean enabled = new AtomicBoolean(false);
3839

39-
MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) {
40+
MlAssignmentNotifier(Settings settings, Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) {
4041
this.auditor = auditor;
4142
this.clusterService = clusterService;
42-
this.mlConfigMigrator = new MlConfigMigrator(client, clusterService);
43+
this.mlConfigMigrator = new MlConfigMigrator(settings, client, clusterService);
4344
this.threadPool = threadPool;
4445
clusterService.addLocalNodeMasterListener(this);
4546
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ml;
7+
8+
import org.elasticsearch.Version;
9+
import org.elasticsearch.cluster.ClusterState;
10+
import org.elasticsearch.cluster.service.ClusterService;
11+
import org.elasticsearch.common.settings.Setting;
12+
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
14+
import org.elasticsearch.xpack.core.ml.MlMetadata;
15+
import org.elasticsearch.xpack.core.ml.MlTasks;
16+
import org.elasticsearch.xpack.core.ml.job.config.Job;
17+
18+
/**
19+
* Checks whether migration can start and whether ML resources (e.g. jobs, datafeeds)
20+
* are eligible to be migrated from the cluster state into the config index
21+
*/
22+
public class MlConfigMigrationEligibilityCheck {
23+
24+
private static final Version MIN_NODE_VERSION = Version.V_6_6_0;
25+
26+
public static final Setting<Boolean> ENABLE_CONFIG_MIGRATION = Setting.boolSetting(
27+
"xpack.ml.enable_config_migration", true, Setting.Property.Dynamic, Setting.Property.NodeScope);
28+
29+
private volatile boolean isConfigMigrationEnabled;
30+
31+
public MlConfigMigrationEligibilityCheck(Settings settings, ClusterService clusterService) {
32+
isConfigMigrationEnabled = ENABLE_CONFIG_MIGRATION.get(settings);
33+
clusterService.getClusterSettings().addSettingsUpdateConsumer(ENABLE_CONFIG_MIGRATION, this::setConfigMigrationEnabled);
34+
}
35+
36+
private void setConfigMigrationEnabled(boolean configMigrationEnabled) {
37+
this.isConfigMigrationEnabled = configMigrationEnabled;
38+
}
39+
40+
/**
41+
* Can migration start? Returns:
42+
* False if config migration is disabled via the setting {@link #ENABLE_CONFIG_MIGRATION}
43+
* False if the min node version of the cluster is before {@link #MIN_NODE_VERSION}
44+
* True otherwise
45+
* @param clusterState The cluster state
46+
* @return A boolean that dictates if config migration can start
47+
*/
48+
public boolean canStartMigration(ClusterState clusterState) {
49+
if (isConfigMigrationEnabled == false) {
50+
return false;
51+
}
52+
53+
Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
54+
if (minNodeVersion.before(MIN_NODE_VERSION)) {
55+
return false;
56+
}
57+
return true;
58+
}
59+
60+
/**
61+
* Is the job a eligible for migration? Returns:
62+
* False if {@link #canStartMigration(ClusterState)} returns {@code false}
63+
* False if the {@link Job#isDeleting()}
64+
* False if the job has a persistent task
65+
* True otherwise i.e. the job is present, not deleting
66+
* and does not have a persistent task.
67+
*
68+
* @param jobId The job Id
69+
* @param clusterState The cluster state
70+
* @return A boolean depending on the conditions listed above
71+
*/
72+
public boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState) {
73+
if (canStartMigration(clusterState) == false) {
74+
return false;
75+
}
76+
77+
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
78+
Job job = mlMetadata.getJobs().get(jobId);
79+
80+
if (job == null || job.isDeleting()) {
81+
return false;
82+
}
83+
84+
PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
85+
return MlTasks.openJobIds(persistentTasks).contains(jobId) == false;
86+
}
87+
88+
/**
89+
* Is the datafeed a eligible for migration? Returns:
90+
* False if {@link #canStartMigration(ClusterState)} returns {@code false}
91+
* False if the datafeed is not in the cluster state
92+
* False if the datafeed has a persistent task
93+
* True otherwise i.e. the datafeed is present and does not have a persistent task.
94+
*
95+
* @param datafeedId The datafeed Id
96+
* @param clusterState The cluster state
97+
* @return A boolean depending on the conditions listed above
98+
*/
99+
public boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState clusterState) {
100+
if (canStartMigration(clusterState) == false) {
101+
return false;
102+
}
103+
104+
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
105+
if (mlMetadata.getDatafeeds().containsKey(datafeedId) == false) {
106+
return false;
107+
}
108+
109+
PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
110+
return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false;
111+
}
112+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java

Lines changed: 9 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.cluster.ClusterStateUpdateTask;
2020
import org.elasticsearch.cluster.metadata.MetaData;
2121
import org.elasticsearch.cluster.service.ClusterService;
22+
import org.elasticsearch.common.settings.Settings;
2223
import org.elasticsearch.common.xcontent.ToXContent;
2324
import org.elasticsearch.common.xcontent.ToXContentObject;
2425
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -41,6 +42,7 @@
4142
import java.util.Iterator;
4243
import java.util.List;
4344
import java.util.Map;
45+
import java.util.Objects;
4446
import java.util.Set;
4547
import java.util.concurrent.atomic.AtomicBoolean;
4648
import java.util.concurrent.atomic.AtomicReference;
@@ -80,18 +82,19 @@ public class MlConfigMigrator {
8082
private static final Logger logger = LogManager.getLogger(MlConfigMigrator.class);
8183

8284
public static final String MIGRATED_FROM_VERSION = "migrated from version";
83-
public static final Version MIN_NODE_VERSION = Version.V_6_6_0;
8485

8586
static final int MAX_BULK_WRITE_SIZE = 100;
8687

8788
private final Client client;
8889
private final ClusterService clusterService;
90+
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
8991

9092
private final AtomicBoolean migrationInProgress;
9193

92-
public MlConfigMigrator(Client client, ClusterService clusterService) {
93-
this.client = client;
94-
this.clusterService = clusterService;
94+
public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) {
95+
this.client = Objects.requireNonNull(client);
96+
this.clusterService = Objects.requireNonNull(clusterService);
97+
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
9598
this.migrationInProgress = new AtomicBoolean(false);
9699
}
97100

@@ -114,9 +117,8 @@ public MlConfigMigrator(Client client, ClusterService clusterService) {
114117
*/
115118
public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener<Boolean> listener) {
116119

117-
Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
118-
if (minNodeVersion.before(MIN_NODE_VERSION)) {
119-
listener.onResponse(Boolean.FALSE);
120+
if (migrationEligibilityCheck.canStartMigration(clusterState) == false) {
121+
listener.onResponse(false);
120122
return;
121123
}
122124

@@ -455,60 +457,4 @@ static List<String> filterFailedDatafeedConfigWrites(Set<String> failedDocumentI
455457
.filter(id -> failedDocumentIds.contains(DatafeedConfig.documentId(id)) == false)
456458
.collect(Collectors.toList());
457459
}
458-
459-
/**
460-
* Is the job a eligible for migration? Returns:
461-
* False if the min node version of the cluster is before {@link #MIN_NODE_VERSION}
462-
* False if the job is not in the cluster state
463-
* False if the {@link Job#isDeleting()}
464-
* False if the job has a persistent task
465-
* True otherwise i.e. the job is present, not deleting
466-
* and does not have a persistent task.
467-
*
468-
* @param jobId The job Id
469-
* @param clusterState clusterstate
470-
* @return A boolean depending on the conditions listed above
471-
*/
472-
public static boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState) {
473-
Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
474-
if (minNodeVersion.before(MIN_NODE_VERSION)) {
475-
return false;
476-
}
477-
478-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
479-
Job job = mlMetadata.getJobs().get(jobId);
480-
481-
if (job == null || job.isDeleting()) {
482-
return false;
483-
}
484-
485-
PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
486-
return MlTasks.openJobIds(persistentTasks).contains(jobId) == false;
487-
}
488-
489-
/**
490-
* Is the datafeed a eligible for migration? Returns:
491-
* False if the min node version of the cluster is before {@link #MIN_NODE_VERSION}
492-
* False if the datafeed is not in the cluster state
493-
* False if the datafeed has a persistent task
494-
* True otherwise i.e. the datafeed is present and does not have a persistent task.
495-
*
496-
* @param datafeedId The datafeed Id
497-
* @param clusterState clusterstate
498-
* @return A boolean depending on the conditions listed above
499-
*/
500-
public static boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState clusterState) {
501-
Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
502-
if (minNodeVersion.before(MIN_NODE_VERSION)) {
503-
return false;
504-
}
505-
506-
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
507-
if (mlMetadata.getDatafeeds().containsKey(datafeedId) == false) {
508-
return false;
509-
}
510-
511-
PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
512-
return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false;
513-
}
514460
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
import org.elasticsearch.cluster.service.ClusterService;
2121
import org.elasticsearch.common.inject.Inject;
2222
import org.elasticsearch.common.settings.Settings;
23+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2324
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
2425
import org.elasticsearch.persistent.PersistentTasksService;
25-
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2626
import org.elasticsearch.threadpool.ThreadPool;
2727
import org.elasticsearch.transport.TransportService;
2828
import org.elasticsearch.xpack.core.XPackPlugin;
@@ -33,7 +33,7 @@
3333
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
3434
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
3535
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
36-
import org.elasticsearch.xpack.ml.MlConfigMigrator;
36+
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
3737
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
3838

3939
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@@ -45,6 +45,7 @@ public class TransportDeleteDatafeedAction extends TransportMasterNodeAction<Del
4545
private final DatafeedConfigProvider datafeedConfigProvider;
4646
private final ClusterService clusterService;
4747
private final PersistentTasksService persistentTasksService;
48+
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
4849

4950
@Inject
5051
public TransportDeleteDatafeedAction(Settings settings, TransportService transportService, ClusterService clusterService,
@@ -58,6 +59,7 @@ public TransportDeleteDatafeedAction(Settings settings, TransportService transpo
5859
this.datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
5960
this.persistentTasksService = persistentTasksService;
6061
this.clusterService = clusterService;
62+
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
6163
}
6264

6365
@Override
@@ -74,7 +76,7 @@ protected AcknowledgedResponse newResponse() {
7476
protected void masterOperation(DeleteDatafeedAction.Request request, ClusterState state,
7577
ActionListener<AcknowledgedResponse> listener) {
7678

77-
if (MlConfigMigrator.datafeedIsEligibleForMigration(request.getDatafeedId(), state)) {
79+
if (migrationEligibilityCheck.datafeedIsEligibleForMigration(request.getDatafeedId(), state)) {
7880
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("delete datafeed", request.getDatafeedId()));
7981
return;
8082
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
6565
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
6666
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
67-
import org.elasticsearch.xpack.ml.MlConfigMigrator;
67+
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
6868
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
6969
import org.elasticsearch.xpack.ml.job.JobManager;
7070
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
@@ -97,6 +97,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
9797
private final JobManager jobManager;
9898
private final DatafeedConfigProvider datafeedConfigProvider;
9999
private final MlMemoryTracker memoryTracker;
100+
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
100101

101102
/**
102103
* A map of task listeners by job_id.
@@ -122,6 +123,7 @@ public TransportDeleteJobAction(Settings settings, TransportService transportSer
122123
this.jobManager = jobManager;
123124
this.datafeedConfigProvider = datafeedConfigProvider;
124125
this.memoryTracker = memoryTracker;
126+
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
125127
this.listenersByJobId = new HashMap<>();
126128
}
127129

@@ -149,7 +151,7 @@ protected void masterOperation(DeleteJobAction.Request request, ClusterState sta
149151
protected void masterOperation(Task task, DeleteJobAction.Request request, ClusterState state,
150152
ActionListener<AcknowledgedResponse> listener) {
151153

152-
if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobId(), state)) {
154+
if (migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobId(), state)) {
153155
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("delete job", request.getJobId()));
154156
return;
155157
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
7171
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
7272
import org.elasticsearch.xpack.ml.MachineLearning;
73-
import org.elasticsearch.xpack.ml.MlConfigMigrator;
73+
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
7474
import org.elasticsearch.xpack.ml.job.ClusterStateJobUpdate;
7575
import org.elasticsearch.xpack.ml.job.JobManager;
7676
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
@@ -113,6 +113,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
113113
private final JobResultsProvider jobResultsProvider;
114114
private final JobManager jobManager;
115115
private final MlMemoryTracker memoryTracker;
116+
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
116117

117118
@Inject
118119
public TransportOpenJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
@@ -130,6 +131,7 @@ public TransportOpenJobAction(Settings settings, TransportService transportServi
130131
this.jobConfigProvider = jobConfigProvider;
131132
this.jobManager = jobManager;
132133
this.memoryTracker = memoryTracker;
134+
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
133135
}
134136

135137
/**
@@ -520,7 +522,7 @@ protected ClusterBlockException checkBlock(OpenJobAction.Request request, Cluste
520522

521523
@Override
522524
protected void masterOperation(OpenJobAction.Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
523-
if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobParams().getJobId(), state)) {
525+
if (migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobParams().getJobId(), state)) {
524526
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("open job", request.getJobParams().getJobId()));
525527
return;
526528
}

0 commit comments

Comments
 (0)