@@ -100,6 +100,9 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
100100 private final PersistentTasksService persistentTasksService ;
101101 private final Client client ;
102102 private final JobResultsProvider jobResultsProvider ;
103+ private static final PersistentTasksCustomMetaData .Assignment AWAITING_LAZY_ASSIGNMENT =
104+ new PersistentTasksCustomMetaData .Assignment (null , "persistent task is awaiting node assignment." );
105+
103106
104107 @ Inject
105108 public TransportOpenJobAction (Settings settings , TransportService transportService , ThreadPool threadPool ,
@@ -700,6 +703,7 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut
700703 private final int fallbackMaxNumberOfOpenJobs ;
701704 private volatile int maxConcurrentJobAllocations ;
702705 private volatile int maxMachineMemoryPercent ;
706+ private volatile int maxLazyMLNodes ;
703707
704708 public OpenJobPersistentTasksExecutor (Settings settings , ClusterService clusterService ,
705709 AutodetectProcessManager autodetectProcessManager ) {
@@ -708,16 +712,35 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS
708712 this .fallbackMaxNumberOfOpenJobs = AutodetectProcessManager .MAX_OPEN_JOBS_PER_NODE .get (settings );
709713 this .maxConcurrentJobAllocations = MachineLearning .CONCURRENT_JOB_ALLOCATIONS .get (settings );
710714 this .maxMachineMemoryPercent = MachineLearning .MAX_MACHINE_MEMORY_PERCENT .get (settings );
715+ this .maxLazyMLNodes = MachineLearning .MAX_LAZY_ML_NODES .get (settings );
711716 clusterService .getClusterSettings ()
712717 .addSettingsUpdateConsumer (MachineLearning .CONCURRENT_JOB_ALLOCATIONS , this ::setMaxConcurrentJobAllocations );
713718 clusterService .getClusterSettings ()
714719 .addSettingsUpdateConsumer (MachineLearning .MAX_MACHINE_MEMORY_PERCENT , this ::setMaxMachineMemoryPercent );
720+ clusterService .getClusterSettings ().addSettingsUpdateConsumer (MachineLearning .MAX_LAZY_ML_NODES , this ::setMaxLazyMLNodes );
715721 }
716722
717723 @ Override
718724 public PersistentTasksCustomMetaData .Assignment getAssignment (OpenJobAction .JobParams params , ClusterState clusterState ) {
719- return selectLeastLoadedMlNode (params .getJobId (), clusterState , maxConcurrentJobAllocations , fallbackMaxNumberOfOpenJobs ,
720- maxMachineMemoryPercent , logger );
725+ PersistentTasksCustomMetaData .Assignment assignment = selectLeastLoadedMlNode (params .getJobId (),
726+ clusterState ,
727+ maxConcurrentJobAllocations ,
728+ fallbackMaxNumberOfOpenJobs ,
729+ maxMachineMemoryPercent ,
730+ logger );
731+ if (assignment .getExecutorNode () == null ) {
732+ int numMlNodes = 0 ;
733+ for (DiscoveryNode node : clusterState .getNodes ()) {
734+ if (Boolean .valueOf (node .getAttributes ().get (MachineLearning .ML_ENABLED_NODE_ATTR ))) {
735+ numMlNodes ++;
736+ }
737+ }
738+
739+ if (numMlNodes < maxLazyMLNodes ) { // Means we have lazy nodes left to allocate
740+ assignment = AWAITING_LAZY_ASSIGNMENT ;
741+ }
742+ }
743+ return assignment ;
721744 }
722745
723746 @ Override
@@ -727,9 +750,9 @@ public void validate(OpenJobAction.JobParams params, ClusterState clusterState)
727750
728751 // If we already know that we can't find an ml node because all ml nodes are running at capacity or
729752 // simply because there are no ml nodes in the cluster then we fail quickly here:
730- PersistentTasksCustomMetaData . Assignment assignment = selectLeastLoadedMlNode ( params . getJobId (), clusterState ,
731- maxConcurrentJobAllocations , fallbackMaxNumberOfOpenJobs , maxMachineMemoryPercent , logger );
732- if (assignment .getExecutorNode () == null ) {
753+
754+ PersistentTasksCustomMetaData . Assignment assignment = getAssignment ( params , clusterState );
755+ if (assignment .getExecutorNode () == null && assignment . equals ( AWAITING_LAZY_ASSIGNMENT ) == false ) {
733756 throw makeNoSuitableNodesException (logger , params .getJobId (), assignment .getExplanation ());
734757 }
735758 }
@@ -773,6 +796,12 @@ void setMaxMachineMemoryPercent(int maxMachineMemoryPercent) {
773796 this .maxMachineMemoryPercent , maxMachineMemoryPercent );
774797 this .maxMachineMemoryPercent = maxMachineMemoryPercent ;
775798 }
799+
800+ void setMaxLazyMLNodes (int maxLazyMLNodes ) {
801+ logger .info ("Changing [{}] from [{}] to [{}]" , MachineLearning .MAX_LAZY_ML_NODES .getKey (),
802+ this .maxLazyMLNodes , maxLazyMLNodes );
803+ this .maxLazyMLNodes = maxLazyMLNodes ;
804+ }
776805 }
777806
778807 public static class JobTask extends AllocatedPersistentTask implements OpenJobAction .JobTaskMatcher {
@@ -830,6 +859,12 @@ public boolean test(PersistentTasksCustomMetaData.PersistentTask<?> persistentTa
830859 jobState = jobTaskState == null ? JobState .OPENING : jobTaskState .getState ();
831860
832861 PersistentTasksCustomMetaData .Assignment assignment = persistentTask .getAssignment ();
862+
863+ // This means we are awaiting a new node to be spun up, ok to return back to the user to await node creation
864+ if (assignment != null && assignment .equals (AWAITING_LAZY_ASSIGNMENT )) {
865+ return true ;
866+ }
867+
833868 // This logic is only appropriate when opening a job, not when reallocating following a failure,
834869 // and this is why this class must only be used when opening a job
835870 if (assignment != null && assignment .equals (PersistentTasksCustomMetaData .INITIAL_ASSIGNMENT ) == false &&
0 commit comments