Skip to content

Commit eaf672f

Browse files
author
David Roberts
authored
[ML] Don't install empty ML metadata on startup (#30751)
This change is to support rolling upgrade from a pre-6.3 default distribution (i.e. without X-Pack) to a 6.3+ default distribution (i.e. with X-Pack). The ML metadata is no longer eagerly added to the cluster state as soon as the master node has X-Pack available. Instead, it is added when the first ML job is created. As a result all methods that get the ML metadata need to be able to handle the situation where there is no ML metadata in the current cluster state. They do this by behaving as though an empty ML metadata was present. This logic is encapsulated by always asking for the current ML metadata using a static method on the MlMetadata class. Relates #30731
1 parent e639036 commit eaf672f

34 files changed

+99
-256
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.ResourceNotFoundException;
1010
import org.elasticsearch.Version;
1111
import org.elasticsearch.cluster.AbstractDiffable;
12+
import org.elasticsearch.cluster.ClusterState;
1213
import org.elasticsearch.cluster.Diff;
1314
import org.elasticsearch.cluster.DiffableUtils;
1415
import org.elasticsearch.cluster.NamedDiff;
@@ -467,6 +468,14 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis
467468
}
468469
}
469470

471+
public static MlMetadata getMlMetadata(ClusterState state) {
472+
MlMetadata mlMetadata = (state == null) ? null : state.getMetaData().custom(MLMetadataField.TYPE);
473+
if (mlMetadata == null) {
474+
return EMPTY_METADATA;
475+
}
476+
return mlMetadata;
477+
}
478+
470479
public static class JobAlreadyMarkedAsDeletedException extends RuntimeException {
471480
}
472481
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package org.elasticsearch.xpack.core.ml.job.persistence;
77

88
import org.elasticsearch.cluster.ClusterState;
9-
import org.elasticsearch.xpack.core.ml.MLMetadataField;
109
import org.elasticsearch.xpack.core.ml.MlMetadata;
1110

1211
/**
@@ -47,8 +46,7 @@ public static String resultsWriteAlias(String jobId) {
4746
* @return The index name
4847
*/
4948
public static String getPhysicalIndexFromState(ClusterState state, String jobId) {
50-
MlMetadata meta = state.getMetaData().custom(MLMetadataField.TYPE);
51-
return meta.getJobs().get(jobId).getResultsIndexName();
49+
return MlMetadata.getMlMetadata(state).getJobs().get(jobId).getResultsIndexName();
5250
}
5351

5452
/**

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.xpack.core.XPackPlugin;
2424
import org.elasticsearch.xpack.core.XPackSettings;
2525
import org.elasticsearch.xpack.core.XPackField;
26-
import org.elasticsearch.xpack.core.ml.MLMetadataField;
2726
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
2827
import org.elasticsearch.xpack.core.ml.MlMetadata;
2928
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
@@ -132,15 +131,7 @@ public Map<String, Object> nativeCodeInfo() {
132131
@Override
133132
public void usage(ActionListener<XPackFeatureSet.Usage> listener) {
134133
ClusterState state = clusterService.state();
135-
MlMetadata mlMetadata = state.getMetaData().custom(MLMetadataField.TYPE);
136-
137-
// Handle case when usage is called but MlMetadata has not been installed yet
138-
if (mlMetadata == null) {
139-
listener.onResponse(new MachineLearningFeatureSetUsage(available(), enabled,
140-
Collections.emptyMap(), Collections.emptyMap()));
141-
} else {
142-
new Retriever(client, mlMetadata, available(), enabled()).execute(listener);
143-
}
134+
new Retriever(client, MlMetadata.getMlMetadata(state), available(), enabled()).execute(listener);
144135
}
145136

146137
public static class Retriever {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.common.component.AbstractComponent;
1414
import org.elasticsearch.common.settings.Settings;
1515
import org.elasticsearch.threadpool.ThreadPool;
16-
import org.elasticsearch.xpack.core.ml.MLMetadataField;
1716
import org.elasticsearch.xpack.core.ml.MlMetadata;
1817
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
1918
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
@@ -90,8 +89,7 @@ public void clusterChanged(ClusterChangedEvent event) {
9089
}
9190
} else if (StartDatafeedAction.TASK_NAME.equals(currentTask.getTaskName())) {
9291
String datafeedId = ((StartDatafeedAction.DatafeedParams) currentTask.getParams()).getDatafeedId();
93-
MlMetadata mlMetadata = event.state().getMetaData().custom(MLMetadataField.TYPE);
94-
DatafeedConfig datafeedConfig = mlMetadata.getDatafeed(datafeedId);
92+
DatafeedConfig datafeedConfig = MlMetadata.getMlMetadata(event.state()).getDatafeed(datafeedId);
9593
if (currentAssignment.getExecutorNode() == null) {
9694
String msg = "No node found to start datafeed [" + datafeedId +"]. Reasons [" +
9795
currentAssignment.getExplanation() + "]";

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,20 @@
77

88
import org.elasticsearch.client.Client;
99
import org.elasticsearch.cluster.ClusterChangedEvent;
10-
import org.elasticsearch.cluster.ClusterState;
1110
import org.elasticsearch.cluster.ClusterStateListener;
12-
import org.elasticsearch.cluster.ClusterStateUpdateTask;
13-
import org.elasticsearch.cluster.metadata.MetaData;
1411
import org.elasticsearch.cluster.service.ClusterService;
1512
import org.elasticsearch.common.component.AbstractComponent;
1613
import org.elasticsearch.common.component.LifecycleListener;
1714
import org.elasticsearch.common.settings.Settings;
1815
import org.elasticsearch.gateway.GatewayService;
1916
import org.elasticsearch.threadpool.ThreadPool;
20-
import org.elasticsearch.xpack.core.ml.MLMetadataField;
21-
import org.elasticsearch.xpack.core.ml.MlMetadata;
22-
23-
import java.util.concurrent.atomic.AtomicBoolean;
2417

2518
class MlInitializationService extends AbstractComponent implements ClusterStateListener {
2619

2720
private final ThreadPool threadPool;
2821
private final ClusterService clusterService;
2922
private final Client client;
3023

31-
private final AtomicBoolean installMlMetadataCheck = new AtomicBoolean(false);
32-
3324
private volatile MlDailyMaintenanceService mlDailyMaintenanceService;
3425

3526
MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client) {
@@ -48,45 +39,12 @@ public void clusterChanged(ClusterChangedEvent event) {
4839
}
4940

5041
if (event.localNodeMaster()) {
51-
MetaData metaData = event.state().metaData();
52-
installMlMetadata(metaData);
5342
installDailyMaintenanceService();
5443
} else {
5544
uninstallDailyMaintenanceService();
5645
}
5746
}
5847

59-
private void installMlMetadata(MetaData metaData) {
60-
if (metaData.custom(MLMetadataField.TYPE) == null) {
61-
if (installMlMetadataCheck.compareAndSet(false, true)) {
62-
threadPool.executor(ThreadPool.Names.GENERIC).execute(() ->
63-
clusterService.submitStateUpdateTask("install-ml-metadata", new ClusterStateUpdateTask() {
64-
@Override
65-
public ClusterState execute(ClusterState currentState) throws Exception {
66-
// If the metadata has been added already don't try to update
67-
if (currentState.metaData().custom(MLMetadataField.TYPE) != null) {
68-
return currentState;
69-
}
70-
ClusterState.Builder builder = new ClusterState.Builder(currentState);
71-
MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
72-
metadataBuilder.putCustom(MLMetadataField.TYPE, MlMetadata.EMPTY_METADATA);
73-
builder.metaData(metadataBuilder.build());
74-
return builder.build();
75-
}
76-
77-
@Override
78-
public void onFailure(String source, Exception e) {
79-
installMlMetadataCheck.set(false);
80-
logger.error("unable to install ml metadata", e);
81-
}
82-
})
83-
);
84-
}
85-
} else {
86-
installMlMetadataCheck.set(false);
87-
}
88-
}
89-
9048
private void installDailyMaintenanceService() {
9149
if (mlDailyMaintenanceService == null) {
9250
mlDailyMaintenanceService = new MlDailyMaintenanceService(clusterService.getClusterName(), threadPool, client);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.elasticsearch.tasks.Task;
2828
import org.elasticsearch.threadpool.ThreadPool;
2929
import org.elasticsearch.transport.TransportService;
30-
import org.elasticsearch.xpack.core.ml.MLMetadataField;
3130
import org.elasticsearch.xpack.core.ml.MlMetadata;
3231
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
3332
import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
@@ -92,8 +91,7 @@ public TransportCloseJobAction(Settings settings, TransportService transportServ
9291
static void resolveAndValidateJobId(CloseJobAction.Request request, ClusterState state, List<String> openJobIds,
9392
List<String> closingJobIds) {
9493
PersistentTasksCustomMetaData tasksMetaData = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
95-
MlMetadata maybeNull = state.metaData().custom(MLMetadataField.TYPE);
96-
final MlMetadata mlMetadata = (maybeNull == null) ? MlMetadata.EMPTY_METADATA : maybeNull;
94+
final MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
9795

9896
List<String> failedJobs = new ArrayList<>();
9997

@@ -107,7 +105,7 @@ static void resolveAndValidateJobId(CloseJobAction.Request request, ClusterState
107105
};
108106

109107
Set<String> expandedJobIds = mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs());
110-
expandedJobIds.stream().forEach(jobIdProcessor::accept);
108+
expandedJobIds.forEach(jobIdProcessor::accept);
111109
if (request.isForce() == false && failedJobs.size() > 0) {
112110
if (expandedJobIds.size() == 1) {
113111
throw ExceptionsHelper.conflictStatusException("cannot close job [{}] because it failed, use force close",

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,8 @@ protected DeleteDatafeedAction.Response newResponse(boolean acknowledged) {
119119
}
120120

121121
@Override
122-
public ClusterState execute(ClusterState currentState) throws Exception {
123-
MlMetadata currentMetadata = currentState.getMetaData().custom(MLMetadataField.TYPE);
122+
public ClusterState execute(ClusterState currentState) {
123+
MlMetadata currentMetadata = MlMetadata.getMlMetadata(currentState);
124124
PersistentTasksCustomMetaData persistentTasks =
125125
currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
126126
MlMetadata newMetadata = new MlMetadata.Builder(currentMetadata)

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteFilterAction.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.threadpool.ThreadPool;
2525
import org.elasticsearch.transport.TransportService;
2626
import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction;
27-
import org.elasticsearch.xpack.core.ml.MLMetadataField;
2827
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
2928
import org.elasticsearch.xpack.core.ml.MlMetadata;
3029
import org.elasticsearch.xpack.core.ml.job.config.Detector;
@@ -60,8 +59,7 @@ protected void doExecute(DeleteFilterAction.Request request, ActionListener<Dele
6059

6160
final String filterId = request.getFilterId();
6261
ClusterState state = clusterService.state();
63-
MlMetadata currentMlMetadata = state.metaData().custom(MLMetadataField.TYPE);
64-
Map<String, Job> jobs = currentMlMetadata.getJobs();
62+
Map<String, Job> jobs = MlMetadata.getMlMetadata(state).getJobs();
6563
List<String> currentlyUsedBy = new ArrayList<>();
6664
for (Job job : jobs.values()) {
6765
List<Detector> detectors = job.getAnalysisConfig().getDetectors();

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -200,10 +200,9 @@ public void onFailure(Exception e) {
200200
void markJobAsDeleting(String jobId, ActionListener<Boolean> listener, boolean force) {
201201
clusterService.submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() {
202202
@Override
203-
public ClusterState execute(ClusterState currentState) throws Exception {
204-
MlMetadata currentMlMetadata = currentState.metaData().custom(MLMetadataField.TYPE);
203+
public ClusterState execute(ClusterState currentState) {
205204
PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
206-
MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata);
205+
MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState));
207206
builder.markJobAsDeleted(jobId, tasks, force);
208207
return buildNewClusterState(currentState, builder);
209208
}
@@ -248,11 +247,7 @@ public void onTimeout(TimeValue timeout) {
248247
}
249248

250249
static boolean jobIsDeletedFromState(String jobId, ClusterState clusterState) {
251-
MlMetadata metadata = clusterState.metaData().custom(MLMetadataField.TYPE);
252-
if (metadata == null) {
253-
return true;
254-
}
255-
return !metadata.getJobs().containsKey(jobId);
250+
return !MlMetadata.getMlMetadata(clusterState).getJobs().containsKey(jobId);
256251
}
257252

258253
private static ClusterState buildNewClusterState(ClusterState currentState, MlMetadata.Builder builder) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFinalizeJobExecutionAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ protected void masterOperation(FinalizeJobExecutionAction.Request request, Clust
5656
logger.debug("finalizing jobs [{}]", jobIdString);
5757
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask() {
5858
@Override
59-
public ClusterState execute(ClusterState currentState) throws Exception {
60-
MlMetadata mlMetadata = currentState.metaData().custom(MLMetadataField.TYPE);
59+
public ClusterState execute(ClusterState currentState) {
60+
MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState);
6161
MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(mlMetadata);
6262
Date finishedTime = new Date();
6363

0 commit comments

Comments
 (0)