@@ -8,19 +8,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
+ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
- import org.elasticsearch.xpack.core.ml.MlTasks;
- import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
- import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
- import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
- import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;

- import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -58,6 +53,7 @@ public class JobNodeSelector {
    private final ClusterState clusterState;
    private final MlMemoryTracker memoryTracker;
    private final Function<DiscoveryNode, String> nodeFilter;
+     private final NodeLoadDetector nodeLoadDetector;
    private final int maxLazyNodes;

    /**
@@ -71,6 +67,7 @@ public JobNodeSelector(ClusterState clusterState, String jobId, String taskName,
        this.taskName = Objects.requireNonNull(taskName);
        this.clusterState = Objects.requireNonNull(clusterState);
        this.memoryTracker = Objects.requireNonNull(memoryTracker);
+         this.nodeLoadDetector = new NodeLoadDetector(Objects.requireNonNull(memoryTracker));
        this.maxLazyNodes = maxLazyNodes;
        this.nodeFilter = node -> {
            if (MachineLearning.isMlNode(node)) {
@@ -109,38 +106,42 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
                continue;
            }

-             // Assuming the node is eligible at all, check loading
-             CurrentLoad currentLoad = calculateCurrentLoadForNode(node, persistentTasks, allocateByMemory);
-             allocateByMemory = currentLoad.allocateByMemory;
+             NodeLoadDetector.NodeLoad currentLoad = nodeLoadDetector.detectNodeLoad(
+                 clusterState,
+                 allNodesHaveDynamicMaxWorkers, // Remove in 8.0.0
+                 node,
+                 dynamicMaxOpenJobs,
+                 maxMachineMemoryPercent,
+                 allocateByMemory
+             );
+             if (currentLoad.getError() != null) {
+                 reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
+                     + "], because [" + currentLoad.getError() + "]";
+                 logger.trace(reason);
+                 reasons.add(reason);
+                 continue;
+             }

-             if (currentLoad.numberOfAllocatingJobs >= maxConcurrentJobAllocations) {
-                 reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because node exceeds ["
-                     + currentLoad.numberOfAllocatingJobs + "] the maximum number of jobs [" + maxConcurrentJobAllocations
+             // Assuming the node is eligible at all, check loading
+             allocateByMemory = currentLoad.isUseMemory();
+             int maxNumberOfOpenJobs = currentLoad.getMaxJobs();
+ 
+             if (currentLoad.getNumAllocatingJobs() >= maxConcurrentJobAllocations) {
+                 reason = "Not opening job ["
+                     + jobId
+                     + "] on node [" + nodeNameAndMlAttributes(node) + "], because node exceeds ["
+                     + currentLoad.getNumAllocatingJobs()
+                     + "] the maximum number of jobs [" + maxConcurrentJobAllocations
                    + "] in opening state";
                logger.trace(reason);
                reasons.add(reason);
                continue;
            }

-             Map<String, String> nodeAttributes = node.getAttributes();
-             int maxNumberOfOpenJobs = dynamicMaxOpenJobs;
-             // TODO: remove this in 8.0.0
-             if (allNodesHaveDynamicMaxWorkers == false) {
-                 String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR);
-                 try {
-                     maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr);
-                 } catch (NumberFormatException e) {
-                     reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " +
-                         MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer";
-                     logger.trace(reason);
-                     reasons.add(reason);
-                     continue;
-                 }
-             }
-             long availableCount = maxNumberOfOpenJobs - currentLoad.numberOfAssignedJobs;
+             long availableCount = maxNumberOfOpenJobs - currentLoad.getNumAssignedJobs();
            if (availableCount == 0) {
                reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
-                     + "], because this node is full. Number of opened jobs [" + currentLoad.numberOfAssignedJobs
+                     + "], because this node is full. Number of opened jobs [" + currentLoad.getNumAssignedJobs()
                    + "], " + MAX_OPEN_JOBS_PER_NODE.getKey() + " [" + maxNumberOfOpenJobs + "]";
                logger.trace(reason);
                reasons.add(reason);
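
Summing up the hunk above: node selection now applies three gates in order. It skips a node whose NodeLoad snapshot carries an error (the attribute-parsing failures that the deleted inline code handled now surface this way), skips it when too many jobs are still in the opening state, and skips it when the job-count capacity is exhausted. A minimal self-contained sketch of that gating order, assuming invented Load and Verdict types (only the accessor names mirror the diff):

// Sketch of the count-based gates in selectNode; not Elasticsearch code.
final class CountGateSketch {

    // Mirrors the NodeLoad accessors visible in the diff; the record itself is invented.
    record Load(String error, long numAllocatingJobs, long numAssignedJobs, int maxJobs) {}

    enum Verdict { REJECT_ERROR, REJECT_TOO_MANY_OPENING, REJECT_FULL, CANDIDATE }

    static Verdict gate(Load load, int maxConcurrentJobAllocations) {
        if (load.error() != null) {
            return Verdict.REJECT_ERROR;            // e.g. unparsable node attributes
        }
        if (load.numAllocatingJobs() >= maxConcurrentJobAllocations) {
            return Verdict.REJECT_TOO_MANY_OPENING; // too many jobs still opening on this node
        }
        long availableCount = load.maxJobs() - load.numAssignedJobs();
        if (availableCount == 0) {
            return Verdict.REJECT_FULL;             // node is at its job-count capacity
        }
        return Verdict.CANDIDATE;                   // eligible; the memory gate runs next
    }

    public static void main(String[] args) {
        // 20 job slots, 19 assigned, 1 still opening, limit of 2 concurrent allocations:
        System.out.println(gate(new Load(null, 1, 19, 20), 2)); // CANDIDATE
        // Same node once the 20th job is assigned:
        System.out.println(gate(new Load(null, 1, 20, 20), 2)); // REJECT_FULL
    }
}
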
@@ -152,33 +153,21 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
                minLoadedNodeByCount = node;
            }

-             String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR);
-             long machineMemory;
-             try {
-                 machineMemory = Long.parseLong(machineMemoryStr);
-             } catch (NumberFormatException e) {
-                 reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " +
-                     MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long";
-                 logger.trace(reason);
-                 reasons.add(reason);
-                 continue;
-             }
- 
            if (allocateByMemory) {
-                 if (machineMemory > 0) {
-                     long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100;
+                 if (currentLoad.getMaxMlMemory() > 0) {
                    Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(taskName, jobId);
                    if (estimatedMemoryFootprint != null) {
                        // If this will be the first job assigned to the node then it will need to
                        // load the native code shared libraries, so add the overhead for this
-                         if (currentLoad.numberOfAssignedJobs == 0) {
+                         if (currentLoad.getNumAssignedJobs() == 0) {
                            estimatedMemoryFootprint += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
                        }
-                         long availableMemory = maxMlMemory - currentLoad.assignedJobMemory;
+                         long availableMemory = currentLoad.getMaxMlMemory() - currentLoad.getAssignedJobMemory();
                        if (estimatedMemoryFootprint > availableMemory) {
                            reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
-                                 + "], because this node has insufficient available memory. Available memory for ML [" + maxMlMemory
-                                 + "], memory required by existing jobs [" + currentLoad.assignedJobMemory
+                                 + "], because this node has insufficient available memory. Available memory for ML ["
+                                 + currentLoad.getMaxMlMemory()
+                                 + "], memory required by existing jobs [" + currentLoad.getAssignedJobMemory()
                                + "], estimated memory required for this job [" + estimatedMemoryFootprint + "]";
                            logger.trace(reason);
                            reasons.add(reason);
@@ -193,15 +182,20 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
                        // If we cannot get the job memory requirement,
                        // fall back to simply allocating by job count
                        allocateByMemory = false;
-                         logger.debug("Falling back to allocating job [{}] by job counts because its memory requirement was not available",
-                             jobId);
+                         logger.debug(
+                             () -> new ParameterizedMessage(
+                                 "Falling back to allocating job [{}] by job counts because its memory requirement was not available",
+                                 jobId));
                    }
                } else {
                    // If we cannot get the available memory on any machine in
                    // the cluster, fall back to simply allocating by job count
                    allocateByMemory = false;
-                     logger.debug("Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]",
-                         jobId, nodeNameAndMlAttributes(node));
+                     logger.debug(
+                         () -> new ParameterizedMessage(
+                             "Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]",
+                             jobId,
+                             nodeNameAndMlAttributes(node)));
                }
            }
        }
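
The memory gate in this hunk is unchanged in substance; only its inputs moved. The removed inline formula, maxMlMemory = machineMemory * maxMachineMemoryPercent / 100, is now computed inside NodeLoadDetector and read back through NodeLoad.getMaxMlMemory(). A standalone sketch of the arithmetic, with a made-up value for the native-code overhead constant:

// Sketch of the memory check; the overhead value is hypothetical,
// the formula matches the removed inline code.
final class MemoryGateSketch {

    static final long NATIVE_CODE_OVERHEAD_BYTES = 30L * 1024 * 1024; // assumed value

    // Returns true if a job with the given footprint fits on the node.
    static boolean fits(long machineMemoryBytes, int maxMachineMemoryPercent,
                        long assignedJobMemoryBytes, long numAssignedJobs,
                        long estimatedJobFootprintBytes) {
        long maxMlMemory = machineMemoryBytes * maxMachineMemoryPercent / 100;
        // The first job on a node also pays for loading the native shared libraries.
        if (numAssignedJobs == 0) {
            estimatedJobFootprintBytes += NATIVE_CODE_OVERHEAD_BYTES;
        }
        long availableMemory = maxMlMemory - assignedJobMemoryBytes;
        return estimatedJobFootprintBytes <= availableMemory;
    }

    public static void main(String[] args) {
        long gb = 1024L * 1024 * 1024;
        // 64 GB machine, 30% for ML (~19.2 GB), 18 GB already assigned:
        System.out.println(fits(64 * gb, 30, 18 * gb, 3, 1 * gb)); // true, ~1.2 GB free
        System.out.println(fits(64 * gb, 30, 18 * gb, 3, 2 * gb)); // false
    }
}
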
@@ -236,67 +230,6 @@ PersistentTasksCustomMetadata.Assignment considerLazyAssignment(PersistentTasksC
        return currentAssignment;
    }

-     private CurrentLoad calculateCurrentLoadForNode(DiscoveryNode node, PersistentTasksCustomMetadata persistentTasks,
-                                                     final boolean allocateByMemory) {
-         CurrentLoad result = new CurrentLoad(allocateByMemory);
- 
-         if (persistentTasks != null) {
-             // find all the anomaly detector job tasks assigned to this node
-             Collection<PersistentTasksCustomMetadata.PersistentTask<?>> assignedAnomalyDetectorTasks = persistentTasks.findTasks(
-                 MlTasks.JOB_TASK_NAME, task -> node.getId().equals(task.getExecutorNode()));
-             for (PersistentTasksCustomMetadata.PersistentTask<?> assignedTask : assignedAnomalyDetectorTasks) {
-                 JobState jobState = MlTasks.getJobStateModifiedForReassignments(assignedTask);
-                 if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
-                     // Don't count CLOSED or FAILED jobs, as they don't consume native memory
-                     ++result.numberOfAssignedJobs;
-                     if (jobState == JobState.OPENING) {
-                         ++result.numberOfAllocatingJobs;
-                     }
-                     OpenJobAction.JobParams params = (OpenJobAction.JobParams) assignedTask.getParams();
-                     Long jobMemoryRequirement = memoryTracker.getAnomalyDetectorJobMemoryRequirement(params.getJobId());
-                     if (jobMemoryRequirement == null) {
-                         result.allocateByMemory = false;
-                         logger.debug("Falling back to allocating job [{}] by job counts because " +
-                             "the memory requirement for job [{}] was not available", jobId, params.getJobId());
-                     } else {
-                         logger.debug("adding " + jobMemoryRequirement);
-                         result.assignedJobMemory += jobMemoryRequirement;
-                     }
-                 }
-             }
-             // find all the data frame analytics job tasks assigned to this node
-             Collection<PersistentTasksCustomMetadata.PersistentTask<?>> assignedAnalyticsTasks = persistentTasks.findTasks(
-                 MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> node.getId().equals(task.getExecutorNode()));
-             for (PersistentTasksCustomMetadata.PersistentTask<?> assignedTask : assignedAnalyticsTasks) {
-                 DataFrameAnalyticsState dataFrameAnalyticsState = MlTasks.getDataFrameAnalyticsState(assignedTask);
- 
-                 // Don't count stopped and failed df-analytics tasks as they don't consume native memory
-                 if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) {
-                     // The native process is only running in the ANALYZING and STOPPING states, but in the STARTED
-                     // and REINDEXING states we're committed to using the memory soon, so account for it here
-                     ++result.numberOfAssignedJobs;
-                     StartDataFrameAnalyticsAction.TaskParams params =
-                         (StartDataFrameAnalyticsAction.TaskParams) assignedTask.getParams();
-                     Long jobMemoryRequirement = memoryTracker.getDataFrameAnalyticsJobMemoryRequirement(params.getId());
-                     if (jobMemoryRequirement == null) {
-                         result.allocateByMemory = false;
-                         logger.debug("Falling back to allocating job [{}] by job counts because " +
-                             "the memory requirement for job [{}] was not available", jobId, params.getId());
-                     } else {
-                         result.assignedJobMemory += jobMemoryRequirement;
-                     }
-                 }
-             }
-             // if any jobs are running then the native code will be loaded, but shared between all jobs,
-             // so increase the total memory usage of the assigned jobs to account for this
-             if (result.numberOfAssignedJobs > 0) {
-                 result.assignedJobMemory += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
-             }
-         }
- 
-         return result;
-     }
- 
    static String nodeNameOrId(DiscoveryNode node) {
        String nodeNameOrID = node.getName();
        if (Strings.isNullOrEmpty(nodeNameOrID)) {
@@ -324,15 +257,4 @@ static String nodeNameAndMlAttributes(DiscoveryNode node) {
        return builder.toString();
    }

-     private static class CurrentLoad {
- 
-         long numberOfAssignedJobs = 0;
-         long numberOfAllocatingJobs = 0;
-         long assignedJobMemory = 0;
-         boolean allocateByMemory;
- 
-         CurrentLoad(boolean allocateByMemory) {
-             this.allocateByMemory = allocateByMemory;
-         }
-     }
}
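
The deleted CurrentLoad inner class and calculateCurrentLoadForNode method are not dropped outright: the commit moves that bookkeeping into the new NodeLoadDetector, whose NodeLoad result this file now consumes. Judging only from the accessors called in the hunks above, the result type looks roughly like this (field names and the constructor are guesses; the real class ships alongside NodeLoadDetector):

// Inferred shape of NodeLoadDetector.NodeLoad; every detail here is a guess
// except the accessor names, which appear verbatim in the diff.
final class NodeLoadSketch {
    private final String error;           // non-null means: skip this node
    private final boolean useMemory;      // whether memory-based allocation is still viable
    private final int maxJobs;            // job-count capacity of the node
    private final long numAllocatingJobs; // jobs still in the opening state
    private final long numAssignedJobs;   // jobs counted against capacity
    private final long maxMlMemory;       // bytes usable by ML on this node
    private final long assignedJobMemory; // bytes already claimed by assigned jobs

    NodeLoadSketch(String error, boolean useMemory, int maxJobs, long numAllocatingJobs,
                   long numAssignedJobs, long maxMlMemory, long assignedJobMemory) {
        this.error = error;
        this.useMemory = useMemory;
        this.maxJobs = maxJobs;
        this.numAllocatingJobs = numAllocatingJobs;
        this.numAssignedJobs = numAssignedJobs;
        this.maxMlMemory = maxMlMemory;
        this.assignedJobMemory = assignedJobMemory;
    }

    String getError() { return error; }
    boolean isUseMemory() { return useMemory; }
    int getMaxJobs() { return maxJobs; }
    long getNumAllocatingJobs() { return numAllocatingJobs; }
    long getNumAssignedJobs() { return numAssignedJobs; }
    long getMaxMlMemory() { return maxMlMemory; }
    long getAssignedJobMemory() { return assignedJobMemory; }
}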