Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope);
public static final Setting<Integer> MAX_MACHINE_MEMORY_PERCENT =
Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 90, Property.Dynamic, Property.NodeScope);
public static final Setting<Integer> MAX_LAZY_ML_NODES =
Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope);

private static final Logger logger = Loggers.getLogger(XPackPlugin.class);

Expand Down Expand Up @@ -288,6 +290,7 @@ public List<Setting<?>> getSettings() {
ML_ENABLED,
CONCURRENT_JOB_ALLOCATIONS,
MachineLearningField.MAX_MODEL_MEMORY_LIMIT,
MAX_LAZY_ML_NODES,
MAX_MACHINE_MEMORY_PERCENT,
AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING,
AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
private final PersistentTasksService persistentTasksService;
private final Client client;
private final JobResultsProvider jobResultsProvider;
private static final PersistentTasksCustomMetaData.Assignment AWAITING_LAZY_ASSIGNMENT =
new PersistentTasksCustomMetaData.Assignment(null, "persistent task is awaiting node assignment.");


@Inject
public TransportOpenJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
Expand Down Expand Up @@ -683,6 +686,7 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut
private final int fallbackMaxNumberOfOpenJobs;
private volatile int maxConcurrentJobAllocations;
private volatile int maxMachineMemoryPercent;
private volatile int maxLazyMLNodes;

public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService,
AutodetectProcessManager autodetectProcessManager) {
Expand All @@ -691,16 +695,35 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS
this.fallbackMaxNumberOfOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings);
this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
}

@Override
public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobParams params, ClusterState clusterState) {
return selectLeastLoadedMlNode(params.getJobId(), clusterState, maxConcurrentJobAllocations, fallbackMaxNumberOfOpenJobs,
maxMachineMemoryPercent, logger);
PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(),
clusterState,
maxConcurrentJobAllocations,
fallbackMaxNumberOfOpenJobs,
maxMachineMemoryPercent,
logger);
if (assignment.getExecutorNode() == null) {
int numMlNodes = 0;
for(DiscoveryNode node : clusterState.getNodes()) {
if (Boolean.valueOf(node.getAttributes().get(MachineLearning.ML_ENABLED_NODE_ATTR))) {
numMlNodes++;
}
}

if (numMlNodes < maxLazyMLNodes) { // Means we have lazy nodes left to allocate
assignment = AWAITING_LAZY_ASSIGNMENT;
}
}
return assignment;
}

@Override
Expand All @@ -710,9 +733,9 @@ public void validate(OpenJobAction.JobParams params, ClusterState clusterState)

// If we already know that we can't find an ml node because all ml nodes are running at capacity or
// simply because there are no ml nodes in the cluster then we fail quickly here:
PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(), clusterState,
maxConcurrentJobAllocations, fallbackMaxNumberOfOpenJobs, maxMachineMemoryPercent, logger);
if (assignment.getExecutorNode() == null) {

PersistentTasksCustomMetaData.Assignment assignment = getAssignment(params, clusterState);
if (assignment.getExecutorNode() == null && assignment.equals(AWAITING_LAZY_ASSIGNMENT) == false) {
throw makeNoSuitableNodesException(logger, params.getJobId(), assignment.getExplanation());
}
}
Expand Down Expand Up @@ -756,6 +779,12 @@ void setMaxMachineMemoryPercent(int maxMachineMemoryPercent) {
this.maxMachineMemoryPercent, maxMachineMemoryPercent);
this.maxMachineMemoryPercent = maxMachineMemoryPercent;
}

void setMaxLazyMLNodes(int maxLazyMLNodes) {
logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.MAX_LAZY_ML_NODES.getKey(),
this.maxLazyMLNodes, maxLazyMLNodes);
this.maxLazyMLNodes = maxLazyMLNodes;
}
}

public static class JobTask extends AllocatedPersistentTask implements OpenJobAction.JobTaskMatcher {
Expand Down Expand Up @@ -813,6 +842,12 @@ public boolean test(PersistentTasksCustomMetaData.PersistentTask<?> persistentTa
jobState = jobTaskState == null ? JobState.OPENING : jobTaskState.getState();

PersistentTasksCustomMetaData.Assignment assignment = persistentTask.getAssignment();

// This means we are awaiting a new node to be spun up, ok to return back to the user to await node creation
if (assignment != null && assignment.equals(AWAITING_LAZY_ASSIGNMENT)) {
return true;
}

// This logic is only appropriate when opening a job, not when reallocating following a failure,
// and this is why this class must only be used when opening a job
if (assignment != null && assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,68 @@ public void testCloseFailedJob() throws Exception {
assertEquals(JobState.OPENED, ((JobTaskState) task.getState()).getState());
}

public void testLazyNodeValidation() throws Exception {
int numNodes = 1;
int maxNumberOfJobsPerNode = 1;
int maxNumberOfLazyNodes = 2;
internalCluster().ensureAtMostNumDataNodes(0);
logger.info("[{}] is [{}]", AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode);
for (int i = 0; i < numNodes; i++) {
internalCluster().startNode(Settings.builder()
.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode));
}
logger.info("Started [{}] nodes", numNodes);
ensureStableCluster(numNodes);
logger.info("[{}] is [{}]", MachineLearning.MAX_LAZY_ML_NODES.getKey(), maxNumberOfLazyNodes);
// Set our lazy node number
assertTrue(client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder()
.put(MachineLearning.MAX_LAZY_ML_NODES.getKey(), maxNumberOfLazyNodes))
.get()
.isAcknowledged());
// create and open first job, which succeeds:
Job.Builder job = createJob("lazy-node-validation-job-1", new ByteSizeValue(2, ByteSizeUnit.MB));
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
client().execute(PutJobAction.INSTANCE, putJobRequest).get();
client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get();
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE,
new GetJobsStatsAction.Request("lazy-node-validation-job-1")).actionGet();
assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
});

// create and try to open second job, which succeeds due to lazy node number:
job = createJob("lazy-node-validation-job-2", new ByteSizeValue(2, ByteSizeUnit.MB));
putJobRequest = new PutJobAction.Request(job);
client().execute(PutJobAction.INSTANCE, putJobRequest).get();
client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId())).get(); // Should return while job is opening

assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE,
new GetJobsStatsAction.Request("lazy-node-validation-job-2")).actionGet();
// Should get to opening state w/o a node
assertEquals(JobState.OPENING, statsResponse.getResponse().results().get(0).getState());
});

// Add another Node so we can get allocated
internalCluster().startNode(Settings.builder()
.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode));
ensureStableCluster(numNodes+1);

// We should automatically get allocated and opened to new node
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE,
new GetJobsStatsAction.Request("lazy-node-validation-job-2")).actionGet();
assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState());
});
}

public void testSingleNode() throws Exception {
verifyMaxNumberOfJobsLimit(1, randomIntBetween(1, 100));
}
Expand Down