MlConfigMigrator.java
@@ -38,11 +38,13 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@@ -68,7 +70,10 @@
* If there was an error in step 3 and the config is in both the clusterstate and
* index then when the migrator retries it must not overwrite an existing job config
* document as once the index document is present all update operations will function
* on that rather than the clusterstate
* on that rather than the clusterstate.
*
* The number of configs indexed in each bulk operation is limited by {@link #MAX_BULK_WRITE_SIZE};
* pairs of datafeeds and jobs are migrated together.
*/
public class MlConfigMigrator {

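As a rough illustration of the cap described in the Javadoc above (and assuming, which the surrounding migration logic suggests but this hunk does not show, that the migrator is simply re-triggered on later attempts until nothing is left to migrate), the number of bulk rounds needed for a given backlog is a ceiling division:

```java
// Back-of-the-envelope sketch only. MAX_BULK_WRITE_SIZE mirrors the constant added in this PR;
// the "rounds" idea assumes migration keeps being re-attempted until the backlog is empty.
public class MigrationRoundsSketch {
    static final int MAX_BULK_WRITE_SIZE = 100;

    static int roundsNeeded(int stoppedDatafeeds, int eligibleJobs) {
        int totalConfigs = stoppedDatafeeds + eligibleJobs;
        return (totalConfigs + MAX_BULK_WRITE_SIZE - 1) / MAX_BULK_WRITE_SIZE; // ceiling division
    }

    public static void main(String[] args) {
        System.out.println(roundsNeeded(50, 50)); // 1 (exactly one full batch of 100 configs)
        System.out.println(roundsNeeded(60, 60)); // 2 (120 configs need a second pass)
    }
}
```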
@@ -77,6 +82,8 @@ public class MlConfigMigrator {
public static final String MIGRATED_FROM_VERSION = "migrated from version";
public static final Version MIN_NODE_VERSION = Version.V_6_6_0;

static final int MAX_BULK_WRITE_SIZE = 100;

private final Client client;
private final ClusterService clusterService;

@@ -118,12 +125,15 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
return;
}


logger.debug("migrating ml configurations");

Collection<DatafeedConfig> datafeedsToMigrate = stoppedDatafeedConfigs(clusterState);
List<Job> jobsToMigrate = nonDeletingJobs(closedJobConfigs(clusterState)).stream()
Collection<DatafeedConfig> stoppedDatafeeds = stoppedDatafeedConfigs(clusterState);
Map<String, Job> eligibleJobs = nonDeletingJobs(closedJobConfigs(clusterState)).stream()
.map(MlConfigMigrator::updateJobForMigration)
.collect(Collectors.toList());
.collect(Collectors.toMap(Job::getId, Function.identity(), (a, b) -> a));

JobsAndDatafeeds jobsAndDatafeedsToMigrate = limitWrites(stoppedDatafeeds, eligibleJobs);

ActionListener<Boolean> unMarkMigrationInProgress = ActionListener.wrap(
response -> {
Expand All @@ -136,16 +146,16 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
}
);

if (datafeedsToMigrate.isEmpty() && jobsToMigrate.isEmpty()) {
if (jobsAndDatafeedsToMigrate.totalCount() == 0) {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
return;
}

writeConfigToIndex(datafeedsToMigrate, jobsToMigrate, ActionListener.wrap(
writeConfigToIndex(jobsAndDatafeedsToMigrate.datafeedConfigs, jobsAndDatafeedsToMigrate.jobs, ActionListener.wrap(
failedDocumentIds -> {
List<String> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, jobsToMigrate);
List<String> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.jobs);
List<String> successfulDatafeedWrites =
filterFailedDatafeedConfigWrites(failedDocumentIds, datafeedsToMigrate);
filterFailedDatafeedConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.datafeedConfigs);
removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, unMarkMigrationInProgress);
},
unMarkMigrationInProgress::onFailure
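The eligible jobs in this hunk are now gathered into a map keyed by job id via Collectors.toMap, with Function.identity() as the value mapper and (a, b) -> a as the merge function, which keeps the first entry if two jobs ever shared an id instead of throwing. A minimal, standalone sketch of that collector (the ids are made up for the example):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ToMapMergeSketch {
    public static void main(String[] args) {
        List<String> jobIds = Arrays.asList("job-1", "job-2", "job-1");
        // Key each element by itself; on a duplicate key, (a, b) -> a keeps the first value
        // rather than throwing an IllegalStateException.
        Map<String, String> byId = jobIds.stream()
            .collect(Collectors.toMap(Function.identity(), Function.identity(), (a, b) -> a));
        System.out.println(byId.keySet()); // [job-1, job-2] (iteration order of a HashMap is unspecified)
    }
}
```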
@@ -350,6 +360,62 @@ public static List<DatafeedConfig> stoppedDatafeedConfigs(ClusterState clusterSt
.collect(Collectors.toList());
}

public static class JobsAndDatafeeds {
List<Job> jobs;
List<DatafeedConfig> datafeedConfigs;

private JobsAndDatafeeds() {
jobs = new ArrayList<>();
datafeedConfigs = new ArrayList<>();
}

public int totalCount() {
return jobs.size() + datafeedConfigs.size();
}
}

/**
* Return at most {@link #MAX_BULK_WRITE_SIZE} configs, favouring
* datafeed and job pairs so that if a datafeed is chosen its job is chosen too.
*
* @param datafeedsToMigrate Datafeed configs
* @param jobsToMigrate Job configs
* @return Job and datafeed configs
*/
public static JobsAndDatafeeds limitWrites(Collection<DatafeedConfig> datafeedsToMigrate, Map<String, Job> jobsToMigrate) {
JobsAndDatafeeds jobsAndDatafeeds = new JobsAndDatafeeds();

if (datafeedsToMigrate.size() + jobsToMigrate.size() <= MAX_BULK_WRITE_SIZE) {
jobsAndDatafeeds.jobs.addAll(jobsToMigrate.values());
jobsAndDatafeeds.datafeedConfigs.addAll(datafeedsToMigrate);
return jobsAndDatafeeds;
}

int count = 0;

// prioritise datafeed and job pairs
for (DatafeedConfig datafeedConfig : datafeedsToMigrate) {
if (count < MAX_BULK_WRITE_SIZE) {
jobsAndDatafeeds.datafeedConfigs.add(datafeedConfig);
count++;
Job datafeedsJob = jobsToMigrate.remove(datafeedConfig.getJobId());
if (datafeedsJob != null) {
[Review comment, Member Author] This situation where there is a datafeed but we cannot find the job it is using should never happen unless the job is open when the datafeed is stopped or the job is deleting. See https://github.com/elastic/elasticsearch/blob/3cb04d825bdc83da8064fed02b29245b45a2146e/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java#L122, where the list of jobs eligible for migration is constructed.
jobsAndDatafeeds.jobs.add(datafeedsJob);
count++;
}
}
}

// are there jobs without datafeeds left to migrate?
Iterator<Job> iter = jobsToMigrate.values().iterator();
while (iter.hasNext() && count < MAX_BULK_WRITE_SIZE) {
jobsAndDatafeeds.jobs.add(iter.next());
count++;
}

return jobsAndDatafeeds;
}
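To make the selection order concrete, here is a toy, self-contained rendering of the same pair-first policy using plain strings and a deliberately tiny cap (the ids and the cap of 4 are invented for the example; the real method works on DatafeedConfig and Job objects and caps at MAX_BULK_WRITE_SIZE):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PairFirstSelectionSketch {
    public static void main(String[] args) {
        int cap = 4; // stand-in for MAX_BULK_WRITE_SIZE
        List<String> datafeeds = Arrays.asList("df-1", "df-2");
        Map<String, String> jobs = new LinkedHashMap<>();
        jobs.put("job-1", "job-1");
        jobs.put("job-2", "job-2");
        jobs.put("job-3", "job-3"); // a job with no datafeed

        List<String> selected = new ArrayList<>();
        // Pairs first: a chosen datafeed always brings its job along.
        for (String datafeed : datafeeds) {
            if (selected.size() < cap) {
                selected.add(datafeed);
                String pairedJob = jobs.remove("job-" + datafeed.substring(3));
                if (pairedJob != null) {
                    selected.add(pairedJob);
                }
            }
        }
        // Then fill any remaining capacity with jobs that have no datafeed.
        Iterator<String> leftover = jobs.values().iterator();
        while (leftover.hasNext() && selected.size() < cap) {
            selected.add(leftover.next());
        }
        System.out.println(selected); // [df-1, job-1, df-2, job-2]; job-3 waits for a later batch
    }
}
```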

/**
* Check for failures in the bulk response and return the
* Ids of any documents not written to the index
MlConfigMigratorTests.java
Expand Up @@ -28,8 +28,11 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
Expand Down Expand Up @@ -333,6 +336,83 @@ public void testDatafeedIsEligibleForMigration_givenStoppedDatafeed() {
assertTrue(MlConfigMigrator.datafeedIsEligibleForMigration(datafeedId, clusterState));
}

public void testLimitWrites_GivenBelowLimit() {
MlConfigMigrator.JobsAndDatafeeds jobsAndDatafeeds = MlConfigMigrator.limitWrites(Collections.emptyList(), Collections.emptyMap());
assertThat(jobsAndDatafeeds.datafeedConfigs, empty());
assertThat(jobsAndDatafeeds.jobs, empty());

List<DatafeedConfig> datafeeds = new ArrayList<>();
Map<String, Job> jobs = new HashMap<>();

int numDatafeeds = MlConfigMigrator.MAX_BULK_WRITE_SIZE / 2;
for (int i=0; i<numDatafeeds; i++) {
String jobId = "job" + i;
jobs.put(jobId, JobTests.buildJobBuilder(jobId).build());
datafeeds.add(createCompatibleDatafeed(jobId));
}

jobsAndDatafeeds = MlConfigMigrator.limitWrites(datafeeds, jobs);
assertThat(jobsAndDatafeeds.datafeedConfigs, hasSize(numDatafeeds));
assertThat(jobsAndDatafeeds.jobs, hasSize(numDatafeeds));
}

public void testLimitWrites_GivenAboveLimit() {
List<DatafeedConfig> datafeeds = new ArrayList<>();
Map<String, Job> jobs = new HashMap<>();

int numDatafeeds = MlConfigMigrator.MAX_BULK_WRITE_SIZE / 2 + 10;
for (int i=0; i<numDatafeeds; i++) {
String jobId = "job" + i;
jobs.put(jobId, JobTests.buildJobBuilder(jobId).build());
datafeeds.add(createCompatibleDatafeed(jobId));
}

MlConfigMigrator.JobsAndDatafeeds jobsAndDatafeeds = MlConfigMigrator.limitWrites(datafeeds, jobs);
assertEquals(MlConfigMigrator.MAX_BULK_WRITE_SIZE, jobsAndDatafeeds.totalCount());
assertThat(jobsAndDatafeeds.datafeedConfigs, hasSize(MlConfigMigrator.MAX_BULK_WRITE_SIZE / 2));
assertThat(jobsAndDatafeeds.jobs, hasSize(MlConfigMigrator.MAX_BULK_WRITE_SIZE / 2));

// assert that for each datafeed its corresponding job is selected
Set<String> selectedJobIds = jobsAndDatafeeds.jobs.stream().map(Job::getId).collect(Collectors.toSet());
Set<String> datafeedJobIds = jobsAndDatafeeds.datafeedConfigs.stream().map(DatafeedConfig::getJobId).collect(Collectors.toSet());
assertEquals(selectedJobIds, datafeedJobIds);
}

public void testLimitWrites_GivenMoreJobsThanDatafeeds() {
List<DatafeedConfig> datafeeds = new ArrayList<>();
Map<String, Job> jobs = new HashMap<>();

int numDatafeeds = MlConfigMigrator.MAX_BULK_WRITE_SIZE / 2 - 10;
for (int i=0; i<numDatafeeds; i++) {
String jobId = "job" + i;
jobs.put(jobId, JobTests.buildJobBuilder(jobId).build());
datafeeds.add(createCompatibleDatafeed(jobId));
}

for (int i=numDatafeeds; i<numDatafeeds + 40; i++) {
String jobId = "job" + i;
jobs.put(jobId, JobTests.buildJobBuilder(jobId).build());
}

MlConfigMigrator.JobsAndDatafeeds jobsAndDatafeeds = MlConfigMigrator.limitWrites(datafeeds, jobs);
assertEquals(MlConfigMigrator.MAX_BULK_WRITE_SIZE, jobsAndDatafeeds.totalCount());
assertThat(jobsAndDatafeeds.datafeedConfigs, hasSize(numDatafeeds));
assertThat(jobsAndDatafeeds.jobs, hasSize(MlConfigMigrator.MAX_BULK_WRITE_SIZE - numDatafeeds));

// assert that for each datafeed its corresponding job is selected
Set<String> selectedJobIds = jobsAndDatafeeds.jobs.stream().map(Job::getId).collect(Collectors.toSet());
Set<String> datafeedJobIds = jobsAndDatafeeds.datafeedConfigs.stream().map(DatafeedConfig::getJobId).collect(Collectors.toSet());
assertTrue(selectedJobIds.containsAll(datafeedJobIds));
}

public void testLimitWrites_GivenNullJob() {
List<DatafeedConfig> datafeeds = Collections.singletonList(createCompatibleDatafeed("no-job-for-this-datafeed"));
MlConfigMigrator.JobsAndDatafeeds jobsAndDatafeeds = MlConfigMigrator.limitWrites(datafeeds, Collections.emptyMap());

assertThat(jobsAndDatafeeds.datafeedConfigs, hasSize(1));
assertThat(jobsAndDatafeeds.jobs, empty());
}

private DatafeedConfig createCompatibleDatafeed(String jobId) {
// create a datafeed without aggregations or anything
// else that may cause validation errors
Expand Down