@@ -7,19 +7,14 @@
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
-import org.elasticsearch.xpack.core.ml.MlTasks;
-import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
-import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
-import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
-import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
 
-import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -57,6 +52,7 @@ public class JobNodeSelector {
     private final ClusterState clusterState;
     private final MlMemoryTracker memoryTracker;
     private final Function<DiscoveryNode, String> nodeFilter;
+    private final NodeLoadDetector nodeLoadDetector;
     private final int maxLazyNodes;
 
     /**
@@ -70,6 +66,7 @@ public JobNodeSelector(ClusterState clusterState, String jobId, String taskName,
         this.taskName = Objects.requireNonNull(taskName);
         this.clusterState = Objects.requireNonNull(clusterState);
         this.memoryTracker = Objects.requireNonNull(memoryTracker);
+        this.nodeLoadDetector = new NodeLoadDetector(Objects.requireNonNull(memoryTracker));
         this.maxLazyNodes = maxLazyNodes;
         this.nodeFilter = node -> {
             if (MachineLearning.isMlNode(node)) {
@@ -105,26 +102,42 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
                 continue;
             }
 
-            // Assuming the node is eligible at all, check loading
-            CurrentLoad currentLoad = calculateCurrentLoadForNode(node, persistentTasks, allocateByMemory);
-            allocateByMemory = currentLoad.allocateByMemory;
+            NodeLoadDetector.NodeLoad currentLoad = nodeLoadDetector.detectNodeLoad(
+                clusterState,
+                true, // Remove in 8.0.0
+                node,
+                dynamicMaxOpenJobs,
+                maxMachineMemoryPercent,
+                allocateByMemory
+            );
+            if (currentLoad.getError() != null) {
+                reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
+                    + "], because [" + currentLoad.getError() + "]";
+                logger.trace(reason);
+                reasons.add(reason);
+                continue;
+            }
 
-            if (currentLoad.numberOfAllocatingJobs >= maxConcurrentJobAllocations) {
-                reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because node exceeds ["
-                    + currentLoad.numberOfAllocatingJobs + "] the maximum number of jobs [" + maxConcurrentJobAllocations
+            // Assuming the node is eligible at all, check loading
+            allocateByMemory = currentLoad.isUseMemory();
+            int maxNumberOfOpenJobs = currentLoad.getMaxJobs();
+
+            if (currentLoad.getNumAllocatingJobs() >= maxConcurrentJobAllocations) {
+                reason = "Not opening job ["
+                    + jobId
+                    + "] on node [" + nodeNameAndMlAttributes(node) + "], because node exceeds ["
+                    + currentLoad.getNumAllocatingJobs()
+                    + "] the maximum number of jobs [" + maxConcurrentJobAllocations
                     + "] in opening state";
                 logger.trace(reason);
                 reasons.add(reason);
                 continue;
             }
 
-            Map<String, String> nodeAttributes = node.getAttributes();
-            int maxNumberOfOpenJobs = dynamicMaxOpenJobs;
-
-            long availableCount = maxNumberOfOpenJobs - currentLoad.numberOfAssignedJobs;
+            long availableCount = maxNumberOfOpenJobs - currentLoad.getNumAssignedJobs();
             if (availableCount == 0) {
                 reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
-                    + "], because this node is full. Number of opened jobs [" + currentLoad.numberOfAssignedJobs
+                    + "], because this node is full. Number of opened jobs [" + currentLoad.getNumAssignedJobs()
                     + "], " + MAX_OPEN_JOBS_PER_NODE.getKey() + " [" + maxNumberOfOpenJobs + "]";
                 logger.trace(reason);
                 reasons.add(reason);
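
In the rewritten hunk above, the count-based gates now read from `NodeLoadDetector.NodeLoad` instead of fields recomputed inline: a node is rejected when its number of jobs still in the opening state reaches `max_concurrent_job_allocations`, or when `getMaxJobs() - getNumAssignedJobs()` hits zero. A standalone sketch of that arithmetic, using made-up numbers and plain variables in place of the real `NodeLoad` getters:

```java
// Illustrative sketch only: plain values stand in for NodeLoadDetector.NodeLoad.
public class CountGateSketch {
    public static void main(String[] args) {
        int maxConcurrentJobAllocations = 2; // cluster-wide setting
        int maxJobs = 20;                    // node's max open jobs (NodeLoad.getMaxJobs())
        int numAllocatingJobs = 1;           // jobs still in OPENING state on the node
        int numAssignedJobs = 20;            // jobs already assigned to the node

        if (numAllocatingJobs >= maxConcurrentJobAllocations) {
            System.out.println("skip node: too many jobs currently opening");
        } else if (maxJobs - numAssignedJobs == 0) {
            System.out.println("skip node: node is full");
        } else {
            System.out.println("node is a candidate by job count");
        }
    }
}
```

With these numbers the node is rejected as full even though only one job is currently opening.
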
@@ -136,33 +149,21 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
                 minLoadedNodeByCount = node;
             }
 
-            String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR);
-            long machineMemory;
-            try {
-                machineMemory = Long.parseLong(machineMemoryStr);
-            } catch (NumberFormatException e) {
-                reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " +
-                    MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long";
-                logger.trace(reason);
-                reasons.add(reason);
-                continue;
-            }
-
             if (allocateByMemory) {
-                if (machineMemory > 0) {
-                    long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100;
+                if (currentLoad.getMaxMlMemory() > 0) {
                     Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(taskName, jobId);
                     if (estimatedMemoryFootprint != null) {
                         // If this will be the first job assigned to the node then it will need to
                         // load the native code shared libraries, so add the overhead for this
-                        if (currentLoad.numberOfAssignedJobs == 0) {
+                        if (currentLoad.getNumAssignedJobs() == 0) {
                             estimatedMemoryFootprint += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
                         }
-                        long availableMemory = maxMlMemory - currentLoad.assignedJobMemory;
+                        long availableMemory = currentLoad.getMaxMlMemory() - currentLoad.getAssignedJobMemory();
                         if (estimatedMemoryFootprint > availableMemory) {
                             reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
-                                + "], because this node has insufficient available memory. Available memory for ML [" + maxMlMemory
-                                + "], memory required by existing jobs [" + currentLoad.assignedJobMemory
+                                + "], because this node has insufficient available memory. Available memory for ML ["
+                                + currentLoad.getMaxMlMemory()
+                                + "], memory required by existing jobs [" + currentLoad.getAssignedJobMemory()
                                 + "], estimated memory required for this job [" + estimatedMemoryFootprint + "]";
                             logger.trace(reason);
                             reasons.add(reason);
@@ -177,15 +178,20 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
                         // If we cannot get the job memory requirement,
                         // fall back to simply allocating by job count
                         allocateByMemory = false;
-                        logger.debug("Falling back to allocating job [{}] by job counts because its memory requirement was not available",
-                            jobId);
+                        logger.debug(
+                            () -> new ParameterizedMessage(
+                                "Falling back to allocating job [{}] by job counts because its memory requirement was not available",
+                                jobId));
                     }
                 } else {
                     // If we cannot get the available memory on any machine in
                     // the cluster, fall back to simply allocating by job count
                     allocateByMemory = false;
-                    logger.debug("Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]",
-                        jobId, nodeNameAndMlAttributes(node));
+                    logger.debug(
+                        () -> new ParameterizedMessage(
+                            "Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]",
+                            jobId,
+                            nodeNameAndMlAttributes(node)));
                 }
             }
         }
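
The memory gate in the two hunks above is unchanged in substance, but it now sources the ML memory ceiling from `NodeLoad.getMaxMlMemory()` rather than parsing the `MachineLearning.MACHINE_MEMORY_NODE_ATTR` node attribute inline: the candidate job's estimated footprint (plus the native-code overhead when it would be the node's first job) must fit into `getMaxMlMemory() - getAssignedJobMemory()`. A minimal, self-contained sketch of that arithmetic with assumed sizes (the 30 MB overhead constant is an assumption for illustration, not read from the source):

```java
// Illustrative sketch only: assumed numbers, not the real NodeLoad API.
public class MemoryGateSketch {
    static final long NATIVE_CODE_OVERHEAD = 30L * 1024 * 1024; // ~30 MB, assumed value

    public static void main(String[] args) {
        long machineMemory = 32L * 1024 * 1024 * 1024;          // 32 GB node
        int maxMachineMemoryPercent = 30;                       // percent of the machine ML may use
        long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100; // ~9.6 GB for ML

        long assignedJobMemory = 8L * 1024 * 1024 * 1024;       // memory already promised to jobs
        long numAssignedJobs = 3;

        long estimatedFootprint = 2L * 1024 * 1024 * 1024;      // candidate job's requirement
        if (numAssignedJobs == 0) {
            // the first job on a node must also account for loading the native code
            estimatedFootprint += NATIVE_CODE_OVERHEAD;
        }

        long availableMemory = maxMlMemory - assignedJobMemory;
        if (estimatedFootprint > availableMemory) {
            System.out.println("skip node: insufficient ML memory, " + availableMemory + " bytes free");
        } else {
            System.out.println("node is a candidate by memory");
        }
    }
}
```

With these numbers ML may use roughly 9.6 GB of the 32 GB machine, 8 GB is already promised to existing jobs, and the 2 GB candidate no longer fits.
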
@@ -220,67 +226,6 @@ PersistentTasksCustomMetadata.Assignment considerLazyAssignment(PersistentTasksC
         return currentAssignment;
     }
 
-    private CurrentLoad calculateCurrentLoadForNode(DiscoveryNode node, PersistentTasksCustomMetadata persistentTasks,
-                                                    final boolean allocateByMemory) {
-        CurrentLoad result = new CurrentLoad(allocateByMemory);
-
-        if (persistentTasks != null) {
-            // find all the anomaly detector job tasks assigned to this node
-            Collection<PersistentTasksCustomMetadata.PersistentTask<?>> assignedAnomalyDetectorTasks = persistentTasks.findTasks(
-                MlTasks.JOB_TASK_NAME, task -> node.getId().equals(task.getExecutorNode()));
-            for (PersistentTasksCustomMetadata.PersistentTask<?> assignedTask : assignedAnomalyDetectorTasks) {
-                JobState jobState = MlTasks.getJobStateModifiedForReassignments(assignedTask);
-                if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
-                    // Don't count CLOSED or FAILED jobs, as they don't consume native memory
-                    ++result.numberOfAssignedJobs;
-                    if (jobState == JobState.OPENING) {
-                        ++result.numberOfAllocatingJobs;
-                    }
-                    OpenJobAction.JobParams params = (OpenJobAction.JobParams) assignedTask.getParams();
-                    Long jobMemoryRequirement = memoryTracker.getAnomalyDetectorJobMemoryRequirement(params.getJobId());
-                    if (jobMemoryRequirement == null) {
-                        result.allocateByMemory = false;
-                        logger.debug("Falling back to allocating job [{}] by job counts because " +
-                            "the memory requirement for job [{}] was not available", jobId, params.getJobId());
-                    } else {
-                        logger.debug("adding " + jobMemoryRequirement);
-                        result.assignedJobMemory += jobMemoryRequirement;
-                    }
-                }
-            }
-            // find all the data frame analytics job tasks assigned to this node
-            Collection<PersistentTasksCustomMetadata.PersistentTask<?>> assignedAnalyticsTasks = persistentTasks.findTasks(
-                MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> node.getId().equals(task.getExecutorNode()));
-            for (PersistentTasksCustomMetadata.PersistentTask<?> assignedTask : assignedAnalyticsTasks) {
-                DataFrameAnalyticsState dataFrameAnalyticsState = MlTasks.getDataFrameAnalyticsState(assignedTask);
-
-                // Don't count stopped and failed df-analytics tasks as they don't consume native memory
-                if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) {
-                    // The native process is only running in the ANALYZING and STOPPING states, but in the STARTED
-                    // and REINDEXING states we're committed to using the memory soon, so account for it here
-                    ++result.numberOfAssignedJobs;
-                    StartDataFrameAnalyticsAction.TaskParams params =
-                        (StartDataFrameAnalyticsAction.TaskParams) assignedTask.getParams();
-                    Long jobMemoryRequirement = memoryTracker.getDataFrameAnalyticsJobMemoryRequirement(params.getId());
-                    if (jobMemoryRequirement == null) {
-                        result.allocateByMemory = false;
-                        logger.debug("Falling back to allocating job [{}] by job counts because " +
-                            "the memory requirement for job [{}] was not available", jobId, params.getId());
-                    } else {
-                        result.assignedJobMemory += jobMemoryRequirement;
-                    }
-                }
-            }
-            // if any jobs are running then the native code will be loaded, but shared between all jobs,
-            // so increase the total memory usage of the assigned jobs to account for this
-            if (result.numberOfAssignedJobs > 0) {
-                result.assignedJobMemory += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
-            }
-        }
-
-        return result;
-    }
-
     static String nodeNameOrId(DiscoveryNode node) {
         String nodeNameOrID = node.getName();
         if (Strings.isNullOrEmpty(nodeNameOrID)) {
@@ -308,15 +253,4 @@ static String nodeNameAndMlAttributes(DiscoveryNode node) {
         return builder.toString();
     }
 
-    private static class CurrentLoad {
-
-        long numberOfAssignedJobs = 0;
-        long numberOfAllocatingJobs = 0;
-        long assignedJobMemory = 0;
-        boolean allocateByMemory;
-
-        CurrentLoad(boolean allocateByMemory) {
-            this.allocateByMemory = allocateByMemory;
-        }
-    }
 }
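
Taken together, the diff replaces JobNodeSelector's private `CurrentLoad` bookkeeping (deleted above) with a reusable `NodeLoadDetector`. A condensed view of the new call pattern, assembled only from the hunks above; this is a fragment rather than a runnable program, and it assumes the Elasticsearch ML sources on the classpath:

```java
// Summary of the pattern introduced by this diff, using only names visible in the hunks.
NodeLoadDetector nodeLoadDetector = new NodeLoadDetector(memoryTracker);

NodeLoadDetector.NodeLoad load = nodeLoadDetector.detectNodeLoad(
    clusterState,
    true,                   // legacy flag, marked "Remove in 8.0.0" in the diff
    node,
    dynamicMaxOpenJobs,
    maxMachineMemoryPercent,
    allocateByMemory
);

if (load.getError() != null) {
    // node state could not be interpreted; the caller skips this node
}
// Per-node state previously computed inline by calculateCurrentLoadForNode:
boolean useMemory = load.isUseMemory();        // false once any job's requirement is unknown
int maxJobs = load.getMaxJobs();               // replaces the inline dynamicMaxOpenJobs copy
long allocating = load.getNumAllocatingJobs(); // jobs still in OPENING state
long assigned = load.getNumAssignedJobs();     // open AD jobs plus active DF analytics tasks
long maxMlMemory = load.getMaxMlMemory();      // replaces the inline machine-memory calculation
long usedMemory = load.getAssignedJobMemory(); // memory promised to the assigned jobs
```

One design consequence visible in the diff: because the detector now owns both the attribute parsing and the per-task accounting, a malformed machine-memory attribute surfaces as `NodeLoad.getError()` instead of the `NumberFormatException` that was previously caught inline in `selectNode`.
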