 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
-import org.elasticsearch.xpack.core.ml.MlTasks;
-import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
-import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
-import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
-import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.ml.MachineLearning;
 import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
 
-import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -57,6 +52,7 @@ public class JobNodeSelector {
     private final ClusterState clusterState;
     private final MlMemoryTracker memoryTracker;
     private final Function<DiscoveryNode, String> nodeFilter;
+    private final NodeLoadDetector nodeLoadDetector;
     private final int maxLazyNodes;
 
     /**
@@ -70,6 +66,7 @@ public JobNodeSelector(ClusterState clusterState, String jobId, String taskName,
         this.taskName = Objects.requireNonNull(taskName);
         this.clusterState = Objects.requireNonNull(clusterState);
         this.memoryTracker = Objects.requireNonNull(memoryTracker);
+        this.nodeLoadDetector = new NodeLoadDetector(Objects.requireNonNull(memoryTracker));
         this.maxLazyNodes = maxLazyNodes;
         this.nodeFilter = node -> {
             if (MachineLearning.isMlNode(node)) {
@@ -105,26 +102,42 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
                 continue;
             }
 
-            // Assuming the node is eligible at all, check loading
-            CurrentLoad currentLoad = calculateCurrentLoadForNode(node, persistentTasks, allocateByMemory);
-            allocateByMemory = currentLoad.allocateByMemory;
+            NodeLoadDetector.NodeLoad currentLoad = nodeLoadDetector.detectNodeLoad(
+                clusterState,
+                true, // Remove in 8.0.0
+                node,
+                dynamicMaxOpenJobs,
+                maxMachineMemoryPercent,
+                allocateByMemory
+            );
+            if (currentLoad.getError() != null) {
+                reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
+                    + "], because [" + currentLoad.getError() + "]";
+                logger.trace(reason);
+                reasons.add(reason);
+                continue;
+            }
 
-            if (currentLoad.numberOfAllocatingJobs >= maxConcurrentJobAllocations) {
-                reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because node exceeds ["
-                    + currentLoad.numberOfAllocatingJobs + "] the maximum number of jobs [" + maxConcurrentJobAllocations
+            // Assuming the node is eligible at all, check loading
+            allocateByMemory = currentLoad.isUseMemory();
+            int maxNumberOfOpenJobs = currentLoad.getMaxJobs();
+
+            if (currentLoad.getNumAllocatingJobs() >= maxConcurrentJobAllocations) {
+                reason = "Not opening job ["
+                    + jobId
+                    + "] on node [" + nodeNameAndMlAttributes(node) + "], because node exceeds ["
+                    + currentLoad.getNumAllocatingJobs()
+                    + "] the maximum number of jobs [" + maxConcurrentJobAllocations
                     + "] in opening state";
                 logger.trace(reason);
                 reasons.add(reason);
                 continue;
             }
 
-            Map<String, String> nodeAttributes = node.getAttributes();
-            int maxNumberOfOpenJobs = dynamicMaxOpenJobs;
-
-            long availableCount = maxNumberOfOpenJobs - currentLoad.numberOfAssignedJobs;
+            long availableCount = maxNumberOfOpenJobs - currentLoad.getNumAssignedJobs();
             if (availableCount == 0) {
                 reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
-                    + "], because this node is full. Number of opened jobs [" + currentLoad.numberOfAssignedJobs
+                    + "], because this node is full. Number of opened jobs [" + currentLoad.getNumAssignedJobs()
                     + "], " + MAX_OPEN_JOBS_PER_NODE.getKey() + " [" + maxNumberOfOpenJobs + "]";
                 logger.trace(reason);
                 reasons.add(reason);
@@ -136,33 +149,21 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
                 minLoadedNodeByCount = node;
             }
 
-            String machineMemoryStr = nodeAttributes.get(MachineLearning.MACHINE_MEMORY_NODE_ATTR);
-            long machineMemory;
-            try {
-                machineMemory = Long.parseLong(machineMemoryStr);
-            } catch (NumberFormatException e) {
-                reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " +
-                    MachineLearning.MACHINE_MEMORY_NODE_ATTR + " attribute [" + machineMemoryStr + "] is not a long";
-                logger.trace(reason);
-                reasons.add(reason);
-                continue;
-            }
-
             if (allocateByMemory) {
-                if (machineMemory > 0) {
-                    long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100;
+                if (currentLoad.getMaxMlMemory() > 0) {
                     Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(taskName, jobId);
                     if (estimatedMemoryFootprint != null) {
                         // If this will be the first job assigned to the node then it will need to
                         // load the native code shared libraries, so add the overhead for this
-                        if (currentLoad.numberOfAssignedJobs == 0) {
+                        if (currentLoad.getNumAssignedJobs() == 0) {
                             estimatedMemoryFootprint += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
                         }
-                        long availableMemory = maxMlMemory - currentLoad.assignedJobMemory;
+                        long availableMemory = currentLoad.getMaxMlMemory() - currentLoad.getAssignedJobMemory();
                         if (estimatedMemoryFootprint > availableMemory) {
                             reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
-                                + "], because this node has insufficient available memory. Available memory for ML [" + maxMlMemory
-                                + "], memory required by existing jobs [" + currentLoad.assignedJobMemory
+                                + "], because this node has insufficient available memory. Available memory for ML ["
+                                + currentLoad.getMaxMlMemory()
+                                + "], memory required by existing jobs [" + currentLoad.getAssignedJobMemory()
                                 + "], estimated memory required for this job [" + estimatedMemoryFootprint + "]";
                             logger.trace(reason);
                             reasons.add(reason);
@@ -177,15 +178,20 @@ public PersistentTasksCustomMetadata.Assignment selectNode(int dynamicMaxOpenJob
                         // If we cannot get the job memory requirement,
                         // fall back to simply allocating by job count
                         allocateByMemory = false;
-                        logger.debug("Falling back to allocating job [{}] by job counts because its memory requirement was not available",
-                            jobId);
+                        logger.debug(
+                            () -> new ParameterizedMessage(
+                                "Falling back to allocating job [{}] by job counts because its memory requirement was not available",
+                                jobId));
                     }
                 } else {
                     // If we cannot get the available memory on any machine in
                     // the cluster, fall back to simply allocating by job count
                     allocateByMemory = false;
-                    logger.debug("Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]",
-                        jobId, nodeNameAndMlAttributes(node));
+                    logger.debug(
+                        () -> new ParameterizedMessage(
+                            "Falling back to allocating job [{}] by job counts because machine memory was not available for node [{}]",
+                            jobId,
+                            nodeNameAndMlAttributes(node)));
                 }
             }
         }
@@ -220,67 +226,6 @@ PersistentTasksCustomMetadata.Assignment considerLazyAssignment(PersistentTasksC
         return currentAssignment;
     }
 
-    private CurrentLoad calculateCurrentLoadForNode(DiscoveryNode node, PersistentTasksCustomMetadata persistentTasks,
-                                                    final boolean allocateByMemory) {
-        CurrentLoad result = new CurrentLoad(allocateByMemory);
-
-        if (persistentTasks != null) {
-            // find all the anomaly detector job tasks assigned to this node
-            Collection<PersistentTasksCustomMetadata.PersistentTask<?>> assignedAnomalyDetectorTasks = persistentTasks.findTasks(
-                MlTasks.JOB_TASK_NAME, task -> node.getId().equals(task.getExecutorNode()));
-            for (PersistentTasksCustomMetadata.PersistentTask<?> assignedTask : assignedAnomalyDetectorTasks) {
-                JobState jobState = MlTasks.getJobStateModifiedForReassignments(assignedTask);
-                if (jobState.isAnyOf(JobState.CLOSED, JobState.FAILED) == false) {
-                    // Don't count CLOSED or FAILED jobs, as they don't consume native memory
-                    ++result.numberOfAssignedJobs;
-                    if (jobState == JobState.OPENING) {
-                        ++result.numberOfAllocatingJobs;
-                    }
-                    OpenJobAction.JobParams params = (OpenJobAction.JobParams) assignedTask.getParams();
-                    Long jobMemoryRequirement = memoryTracker.getAnomalyDetectorJobMemoryRequirement(params.getJobId());
-                    if (jobMemoryRequirement == null) {
-                        result.allocateByMemory = false;
-                        logger.debug("Falling back to allocating job [{}] by job counts because " +
-                            "the memory requirement for job [{}] was not available", jobId, params.getJobId());
-                    } else {
-                        logger.debug("adding " + jobMemoryRequirement);
-                        result.assignedJobMemory += jobMemoryRequirement;
-                    }
-                }
-            }
-            // find all the data frame analytics job tasks assigned to this node
-            Collection<PersistentTasksCustomMetadata.PersistentTask<?>> assignedAnalyticsTasks = persistentTasks.findTasks(
-                MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, task -> node.getId().equals(task.getExecutorNode()));
-            for (PersistentTasksCustomMetadata.PersistentTask<?> assignedTask : assignedAnalyticsTasks) {
-                DataFrameAnalyticsState dataFrameAnalyticsState = MlTasks.getDataFrameAnalyticsState(assignedTask);
-
-                // Don't count stopped and failed df-analytics tasks as they don't consume native memory
-                if (dataFrameAnalyticsState.isAnyOf(DataFrameAnalyticsState.STOPPED, DataFrameAnalyticsState.FAILED) == false) {
-                    // The native process is only running in the ANALYZING and STOPPING states, but in the STARTED
-                    // and REINDEXING states we're committed to using the memory soon, so account for it here
-                    ++result.numberOfAssignedJobs;
-                    StartDataFrameAnalyticsAction.TaskParams params =
-                        (StartDataFrameAnalyticsAction.TaskParams) assignedTask.getParams();
-                    Long jobMemoryRequirement = memoryTracker.getDataFrameAnalyticsJobMemoryRequirement(params.getId());
-                    if (jobMemoryRequirement == null) {
-                        result.allocateByMemory = false;
-                        logger.debug("Falling back to allocating job [{}] by job counts because " +
-                            "the memory requirement for job [{}] was not available", jobId, params.getId());
-                    } else {
-                        result.assignedJobMemory += jobMemoryRequirement;
-                    }
-                }
-            }
-            // if any jobs are running then the native code will be loaded, but shared between all jobs,
-            // so increase the total memory usage of the assigned jobs to account for this
-            if (result.numberOfAssignedJobs > 0) {
-                result.assignedJobMemory += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
-            }
-        }
-
-        return result;
-    }
-
     static String nodeNameOrId(DiscoveryNode node) {
         String nodeNameOrID = node.getName();
         if (Strings.isNullOrEmpty(nodeNameOrID)) {
@@ -308,15 +253,4 @@ static String nodeNameAndMlAttributes(DiscoveryNode node) {
         return builder.toString();
     }
 
-    private static class CurrentLoad {
-
-        long numberOfAssignedJobs = 0;
-        long numberOfAllocatingJobs = 0;
-        long assignedJobMemory = 0;
-        boolean allocateByMemory;
-
-        CurrentLoad(boolean allocateByMemory) {
-            this.allocateByMemory = allocateByMemory;
-        }
-    }
 }
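
For readers following the diff, here is a minimal sketch of the NodeLoadDetector.NodeLoad accessor surface the new selectNode() code relies on. Only the getter names (getError, isUseMemory, getMaxJobs, getNumAllocatingJobs, getNumAssignedJobs, getMaxMlMemory, getAssignedJobMemory) come from the call sites above; the fields, constructor, and standalone layout are assumptions for illustration, not the actual class introduced by this PR.

// A hypothetical sketch of the NodeLoad value object consumed by selectNode().
// Getter names match the diff's call sites; everything else is assumed.
public class NodeLoadDetector {

    public static class NodeLoad {
        private final int maxJobs;            // effective max open jobs for this node
        private final long maxMlMemory;       // bytes of node memory usable by ML
        private final long numAssignedJobs;   // assigned jobs (excluding CLOSED/FAILED)
        private final long numAllocatingJobs; // jobs still in the OPENING state
        private final long assignedJobMemory; // bytes required by the assigned jobs
        private final boolean useMemory;      // false => fall back to allocating by count
        private final String error;           // non-null when load could not be computed

        NodeLoad(int maxJobs, long maxMlMemory, long numAssignedJobs, long numAllocatingJobs,
                 long assignedJobMemory, boolean useMemory, String error) {
            this.maxJobs = maxJobs;
            this.maxMlMemory = maxMlMemory;
            this.numAssignedJobs = numAssignedJobs;
            this.numAllocatingJobs = numAllocatingJobs;
            this.assignedJobMemory = assignedJobMemory;
            this.useMemory = useMemory;
            this.error = error;
        }

        public int getMaxJobs() { return maxJobs; }
        public long getMaxMlMemory() { return maxMlMemory; }
        public long getNumAssignedJobs() { return numAssignedJobs; }
        public long getNumAllocatingJobs() { return numAllocatingJobs; }
        public long getAssignedJobMemory() { return assignedJobMemory; }
        public boolean isUseMemory() { return useMemory; }
        public String getError() { return error; }
    }
}

This shape explains the control flow in the diff: selectNode() first bails out on getError(), then uses the count-based fields for the allocation-limit and node-full checks, and only consults getMaxMlMemory()/getAssignedJobMemory() when isUseMemory() reports that memory-based allocation is viable.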