@@ -58,6 +58,11 @@ public static ElasticsearchStatusException badRequestException(String msg, Objec
return new ElasticsearchStatusException(msg, RestStatus.BAD_REQUEST, args);
}

public static ElasticsearchStatusException configHasNotBeenMigrated(String verb, String id) {
return new ElasticsearchStatusException("cannot {} as the configuration [{}] is temporarily pending migration",
RestStatus.SERVICE_UNAVAILABLE, verb, id);
}
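// Editorial illustration, not part of the original change: ElasticsearchStatusException fills the
// {} placeholders from the trailing args (LoggerMessageFormat), so, for a hypothetical call such as
// configHasNotBeenMigrated("open job", "my-job"), the resulting 503 SERVICE_UNAVAILABLE error message
// would read "cannot open job as the configuration [my-job] is temporarily pending migration".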

/**
* Creates an error message that explains there are shard failures, displays info
* for the first failure (shard/reason) and kindly asks to see more info in the logs
@@ -7,7 +7,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -82,29 +81,23 @@ public void clusterChanged(ClusterChangedEvent event) {
}
PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (Objects.equals(previous, current)) {
return;
}

Version minNodeVersion = event.state().nodes().getMinNodeVersion();
if (minNodeVersion.onOrAfter(Version.V_6_6_0)) {
// ok to migrate
mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap(
response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())),
e -> {
logger.error("error migrating ml configurations", e);
threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state()));
}
));
} else {
threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state()));
}

mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap(
response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())),
e -> {
logger.error("error migrating ml configurations", e);
threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state()));
}
));
}

private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, PersistentTasksCustomMetaData previous,
ClusterState state) {

if (Objects.equals(previous, current)) {
return;
}

for (PersistentTask<?> currentTask : current.tasks()) {
Assignment currentAssignment = currentTask.getAssignment();
PersistentTask<?> previousTask = previous != null ? previous.getTask(currentTask.getId()) : null;
@@ -75,6 +75,7 @@ public class MlConfigMigrator {
private static final Logger logger = LogManager.getLogger(MlConfigMigrator.class);

public static final String MIGRATED_FROM_VERSION = "migrated from version";
public static final Version MIN_NODE_VERSION = Version.V_6_6_0;

private final Client client;
private final ClusterService clusterService;
@@ -106,11 +107,19 @@ public MlConfigMigrator(Client client, ClusterService clusterService) {
*/
public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener<Boolean> listener) {

Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
if (minNodeVersion.before(MIN_NODE_VERSION)) {
listener.onResponse(Boolean.FALSE);
return;
}

if (migrationInProgress.compareAndSet(false, true) == false) {
listener.onResponse(Boolean.FALSE);
return;
}

logger.debug("migrating ml configurations");

Collection<DatafeedConfig> datafeedsToMigrate = stoppedDatafeedConfigs(clusterState);
List<Job> jobsToMigrate = nonDeletingJobs(closedJobConfigs(clusterState)).stream()
.map(MlConfigMigrator::updateJobForMigration)
@@ -381,4 +390,60 @@ static List<String> filterFailedDatafeedConfigWrites(Set<String> failedDocumentI
.filter(id -> failedDocumentIds.contains(DatafeedConfig.documentId(id)) == false)
.collect(Collectors.toList());
}

/**
* Is the job eligible for migration? Returns:
* False if the min node version of the cluster is before {@link #MIN_NODE_VERSION}
* False if the job is not in the cluster state
* False if the job is being deleted, i.e. {@link Job#isDeleting()} is true
* False if the job has a persistent task
* True otherwise, i.e. the job is present, is not being deleted
* and does not have a persistent task.
*
* @param jobId The job Id
* @param clusterState The cluster state
* @return A boolean depending on the conditions listed above
*/
public static boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState) {
Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
if (minNodeVersion.before(MIN_NODE_VERSION)) {
return false;
}

MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
Job job = mlMetadata.getJobs().get(jobId);

if (job == null || job.isDeleting()) {
return false;
}

PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
return MlTasks.openJobIds(persistentTasks).contains(jobId) == false;
}

/**
* Is the datafeed eligible for migration? Returns:
* False if the min node version of the cluster is before {@link #MIN_NODE_VERSION}
* False if the datafeed is not in the cluster state
* False if the datafeed has a persistent task
* True otherwise, i.e. the datafeed is present and does not have a persistent task.
*
* @param datafeedId The datafeed Id
* @param clusterState The cluster state
* @return A boolean depending on the conditions listed above
*/
public static boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState clusterState) {
Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
if (minNodeVersion.before(MIN_NODE_VERSION)) {
return false;
}

MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
if (mlMetadata.getDatafeeds().containsKey(datafeedId) == false) {
return false;
}

PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false;
}
}
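A brief editorial sketch (not part of the change set) of how the two new eligibility helpers are meant to be read: a config that is still eligible for migration has not yet been moved out of the cluster state, so the transport actions below refuse the request with the new "pending migration" error until the migrator has run, and the whole mechanism stays dormant while any node in the cluster is older than MIN_NODE_VERSION. The 6.5.4 value below is purely illustrative.

import org.elasticsearch.Version;
import org.elasticsearch.xpack.ml.MlConfigMigrator;

// Hypothetical illustration of the shared version gate (values are made up):
Version minNodeVersion = Version.fromString("6.5.4"); // e.g. a mixed 6.5/6.6 cluster
boolean canMigrate = minNodeVersion.onOrAfter(MlConfigMigrator.MIN_NODE_VERSION); // false while any 6.5.x node remains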
@@ -33,6 +33,7 @@
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MlConfigMigrator;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@@ -72,6 +73,12 @@ protected AcknowledgedResponse newResponse() {
@Override
protected void masterOperation(DeleteDatafeedAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {

if (MlConfigMigrator.datafeedIsEligibleForMigration(request.getDatafeedId(), state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("delete datafeed", request.getDatafeedId()));
return;
}

if (request.isForce()) {
forceDeleteDatafeed(request, state, listener);
} else {
@@ -64,6 +64,7 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MlConfigMigrator;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
@@ -147,6 +148,12 @@ protected void masterOperation(DeleteJobAction.Request request, ClusterState sta
@Override
protected void masterOperation(Task task, DeleteJobAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {

if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobId(), state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("delete job", request.getJobId()));
return;
}

logger.debug("Deleting job '{}'", request.getJobId());

if (request.isForce() == false) {
@@ -70,6 +70,7 @@
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlConfigMigrator;
import org.elasticsearch.xpack.ml.job.ClusterStateJobUpdate;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
@@ -528,6 +529,11 @@ protected ClusterBlockException checkBlock(OpenJobAction.Request request, Cluste

@Override
protected void masterOperation(OpenJobAction.Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobParams().getJobId(), state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("open job", request.getJobParams().getJobId()));
return;
}

OpenJobAction.JobParams jobParams = request.getJobParams();
if (licenseState.isMachineLearningAllowed()) {

@@ -24,6 +24,7 @@
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.MlConfigMigrator;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -68,6 +69,11 @@ protected RevertModelSnapshotAction.Response newResponse() {
@Override
protected void masterOperation(RevertModelSnapshotAction.Request request, ClusterState state,
ActionListener<RevertModelSnapshotAction.Response> listener) {
if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobId(), state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("revert model snapshot", request.getJobId()));
return;
}

logger.debug("Received request to revert to snapshot id '{}' for job '{}', deleting intervening results: {}",
request.getSnapshotId(), request.getJobId(), request.getDeleteInterveningResults());

@@ -45,6 +45,7 @@
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlConfigMigrator;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfigReader;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector;
@@ -139,6 +140,11 @@ protected void masterOperation(StartDatafeedAction.Request request, ClusterState
return;
}

if (MlConfigMigrator.datafeedIsEligibleForMigration(request.getParams().getDatafeedId(), state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("start datafeed", request.getParams().getDatafeedId()));
return;
}

AtomicReference<DatafeedConfig> datafeedConfigHolder = new AtomicReference<>();
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);

@@ -32,6 +32,7 @@
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MlConfigMigrator;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;

@@ -69,6 +70,11 @@ protected PutDatafeedAction.Response newResponse() {
protected void masterOperation(UpdateDatafeedAction.Request request, ClusterState state,
ActionListener<PutDatafeedAction.Response> listener) throws Exception {

if (MlConfigMigrator.datafeedIsEligibleForMigration(request.getUpdate().getId(), state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("update datafeed", request.getUpdate().getId()));
return;
}

MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
boolean datafeedConfigIsInClusterState = mlMetadata.getDatafeed(request.getUpdate().getId()) != null;
if (datafeedConfigIsInClusterState) {
@@ -51,6 +51,7 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlConfigMigrator;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
import org.elasticsearch.xpack.ml.job.persistence.ExpandedIdsMatcher;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
@@ -436,7 +437,13 @@ public void onFailure(Exception e) {
}

public void updateJob(UpdateJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) {
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state());
ClusterState clusterState = clusterService.state();
if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobId(), clusterState)) {
actionListener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("update job", request.getJobId()));
return;
}

MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);

if (request.getJobUpdate().getGroups() != null && request.getJobUpdate().getGroups().isEmpty() == false) {

@@ -466,6 +473,7 @@ public void updateJob(UpdateJobAction.Request request, ActionListener<PutJobActi

private void updateJobPostInitialChecks(UpdateJobAction.Request request, MlMetadata mlMetadata,
ActionListener<PutJobAction.Response> actionListener) {

if (ClusterStateJobUpdate.jobIsInMlMetadata(mlMetadata, request.getJobId())) {
updateJobClusterState(request, actionListener);
} else {
@@ -139,38 +139,4 @@ public void testClusterChanged_noPersistentTaskChanges() {
notifier.offMaster();
verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any());
}

public void testMigrateNotTriggered_GivenPre66Nodes() {
MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService);
notifier.onMaster();

ClusterState previous = ClusterState.builder(new ClusterName("_name"))
.build();

PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", null, null, tasksBuilder);
MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build();

// mixed 6.5 and 6.6 nodes
ClusterState current = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_5_0))
.add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0)))
.metaData(metaData)
.build();

notifier.clusterChanged(new ClusterChangedEvent("_test", current, previous));
verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any());

current = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("node_id1", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.V_6_6_0))
.add(new DiscoveryNode("node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301), Version.V_6_6_0)))
.metaData(metaData)
.build();

// all 6.6 nodes
notifier.clusterChanged(new ClusterChangedEvent("_test", current, previous));
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(any(), any());
}
}