
Commit 5f8f91c

Author: David Roberts
[ML] Use scaling thread pool and xpack.ml.max_open_jobs cluster-wide dynamic (#39736)
This change does the following:

1. Makes the per-node setting xpack.ml.max_open_jobs into a cluster-wide dynamic setting.
2. Changes the job node selection to continue to use the per-node attributes storing the maximum number of open jobs if any node in the cluster is older than 7.1, and to use the dynamic cluster-wide setting if all nodes are on 7.1 or later.
3. Changes the docs to reflect this.
4. Changes the thread pools for native process communication from fixed size to scaling, to support the dynamic nature of xpack.ml.max_open_jobs.
5. Renames the autodetect thread pool to the job comms thread pool to make clear that it will be used for other types of ML jobs (data frame analytics in particular).

Backport of #39320
1 parent 52fd102 commit 5f8f91c
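
For context (editorial addition, not part of the commit): because xpack.ml.max_open_jobs is now a cluster-wide dynamic setting, it can be changed at runtime through the cluster update settings API once every node is on 7.1 or later, instead of being fixed per node at startup. A minimal Java sketch follows; the wrapper class and the client variable are hypothetical, while the setting key and the update-settings request come from the standard Elasticsearch client API.

// Illustrative sketch, not part of this commit: bump the now-dynamic job limit at runtime.
// Assumes an existing org.elasticsearch.client.Client instance is available as "client".
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

public class UpdateMaxOpenJobsSketch {
    public static boolean updateMaxOpenJobs(Client client, int newMax) {
        ClusterUpdateSettingsResponse response = client.admin().cluster()
            .prepareUpdateSettings()
            // Persistent so the value survives a full cluster restart; a transient setting would also work.
            .setPersistentSettings(Settings.builder().put("xpack.ml.max_open_jobs", newMax).build())
            .get();
        // Nodes older than 7.1 keep using the ml.max_open_jobs attribute they published at startup.
        return response.isAcknowledged();
    }
}

On a mixed-version cluster the node selection code changed below still falls back to the per-node attribute, so a runtime change only takes full effect once all nodes are on 7.1 or later.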

11 files changed: +154 additions, -118 deletions


docs/reference/ml/apis/datafeedresource.asciidoc

Lines changed: 1 addition & 1 deletion
@@ -134,7 +134,7 @@ update their values:
 `ephemeral_id`::: The node ephemeral ID.
 `transport_address`::: The host and port where transport HTTP connections are
 accepted. For example, `127.0.0.1:9300`.
-`attributes`::: For example, `{"ml.max_open_jobs": "10"}`.
+`attributes`::: For example, `{"ml.machine_memory": "17179869184"}`.
 
 `state`::
 (string) The status of the {dfeed}, which can be one of the following values: +

docs/reference/ml/apis/get-datafeed-stats.asciidoc

Lines changed: 1 addition & 2 deletions
@@ -86,8 +86,7 @@ The API returns the following results:
     "transport_address": "127.0.0.1:9300",
     "attributes": {
       "ml.machine_memory": "17179869184",
-      "ml.max_open_jobs": "20",
-      "ml.enabled": "true"
+      "ml.max_open_jobs": "20"
     }
   },
   "assignment_explanation": ""

docs/reference/ml/apis/jobcounts.asciidoc

Lines changed: 1 addition & 1 deletion
@@ -229,4 +229,4 @@ This information is available only for open jobs.
 (string) The host and port where transport HTTP connections are accepted.
 
 `attributes`::
-(object) For example, {"ml.max_open_jobs": "10"}.
+(object) For example, {"ml.machine_memory": "17179869184"}.

docs/reference/settings/ml-settings.asciidoc

Lines changed: 14 additions & 8 deletions
@@ -1,3 +1,4 @@
+
 [role="xpack"]
 [[ml-settings]]
 === Machine learning settings in Elasticsearch
@@ -44,27 +45,32 @@ IMPORTANT: If you want to use {ml} features in your cluster, you must have
 `xpack.ml.enabled` set to `true` on all master-eligible nodes. This is the
 default behavior.
 
-`xpack.ml.max_machine_memory_percent`::
+`xpack.ml.max_machine_memory_percent` (<<cluster-update-settings,Dynamic>>)::
 The maximum percentage of the machine's memory that {ml} may use for running
 analytics processes. (These processes are separate to the {es} JVM.) Defaults to
 `30` percent. The limit is based on the total memory of the machine, not current
 free memory. Jobs will not be allocated to a node if doing so would cause the
 estimated memory use of {ml} jobs to exceed the limit.
 
-`xpack.ml.max_model_memory_limit`::
+`xpack.ml.max_model_memory_limit` (<<cluster-update-settings,Dynamic>>)::
 The maximum `model_memory_limit` property value that can be set for any job on
 this node. If you try to create a job with a `model_memory_limit` property value
 that is greater than this setting value, an error occurs. Existing jobs are not
 affected when you update this setting. For more information about the
 `model_memory_limit` property, see <<ml-apilimits>>.
 
-`xpack.ml.max_open_jobs`::
-The maximum number of jobs that can run on a node. Defaults to `20`.
-The maximum number of jobs is also constrained by memory usage, so fewer
-jobs than specified by this setting will run on a node if the estimated
-memory use of the jobs would be higher than allowed.
+`xpack.ml.max_open_jobs` (<<cluster-update-settings,Dynamic>>)::
+The maximum number of jobs that can run simultaneously on a node. Defaults to
+`20`. In this context, jobs include both anomaly detector jobs and data frame
+analytics jobs. The maximum number of jobs is also constrained by memory usage.
+Thus if the estimated memory usage of the jobs would be higher than allowed,
+fewer jobs will run on a node. Prior to version 7.1, this setting was a per-node
+non-dynamic setting. It became a cluster-wide dynamic
+setting in version 7.1. As a result, changes to its value after node startup
+are used only after every node in the cluster is running version 7.1 or higher.
+The maximum permitted value is `512`.
 
-`xpack.ml.node_concurrent_job_allocations`::
+`xpack.ml.node_concurrent_job_allocations` (<<cluster-update-settings,Dynamic>>)::
 The maximum number of jobs that can concurrently be in the `opening` state on
 each node. Typically, jobs spend a small amount of time in this state before
 they move to `open` state. Jobs that must restore large models when they are
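
A small editorial illustration of the documented bounds, not part of the commit: the setting is registered with a minimum of 1 and a maximum of 512, so an out-of-range value is rejected when the setting is read. The class name and sample values below are hypothetical; the Setting.intSetting call mirrors the registration added to MachineLearning.java further down, so it needs the Elasticsearch core jar on the classpath.

// Illustrative only, not part of the commit: the documented bounds are enforced by the Setting itself.
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;

public class MaxOpenJobsBoundsSketch {
    public static void main(String[] args) {
        Setting<Integer> maxOpenJobs =
            Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, 512, Property.Dynamic, Property.NodeScope);

        // A value inside the documented range parses normally.
        System.out.println(maxOpenJobs.get(Settings.builder().put("xpack.ml.max_open_jobs", 40).build()));

        // A value above the maximum of 512 is rejected with an IllegalArgumentException.
        try {
            maxOpenJobs.get(Settings.builder().put("xpack.ml.max_open_jobs", 1000).build());
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}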

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 37 additions & 22 deletions
@@ -51,7 +51,7 @@
 import org.elasticsearch.rest.RestHandler;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.threadpool.ExecutorBuilder;
-import org.elasticsearch.threadpool.FixedExecutorBuilder;
+import org.elasticsearch.threadpool.ScalingExecutorBuilder;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.watcher.ResourceWatcherService;
 import org.elasticsearch.xpack.core.XPackPlugin;
@@ -256,7 +256,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
     public static final String BASE_PATH = "/_ml/";
     public static final String PRE_V7_BASE_PATH = "/_xpack/ml/";
     public static final String DATAFEED_THREAD_POOL_NAME = NAME + "_datafeed";
-    public static final String AUTODETECT_THREAD_POOL_NAME = NAME + "_autodetect";
+    public static final String JOB_COMMS_THREAD_POOL_NAME = NAME + "_job_comms";
     public static final String UTILITY_THREAD_POOL_NAME = NAME + "_utility";
 
     // This is for performance testing. It's not exposed to the end user.
@@ -276,6 +276,17 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
     public static final Setting<Integer> MAX_LAZY_ML_NODES =
             Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope);
 
+    // Before 8.0.0 this needs to match the max allowed value for xpack.ml.max_open_jobs,
+    // as the current node could be running in a cluster where some nodes are still using
+    // that setting. From 8.0.0 onwards we have the flexibility to increase it...
+    private static final int MAX_MAX_OPEN_JOBS_PER_NODE = 512;
+    // This setting is cluster-wide and can be set dynamically. However, prior to version 7.1 it was
+    // a non-dynamic per-node setting. In a mixed version cluster containing 6.7 or 7.0 nodes those
+    // older nodes will not react to the dynamic changes. Therefore, in such mixed version clusters
+    // allocation will be based on the value first read at node startup rather than the current value.
+    public static final Setting<Integer> MAX_OPEN_JOBS_PER_NODE =
+            Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, MAX_MAX_OPEN_JOBS_PER_NODE, Property.Dynamic, Property.NodeScope);
+
     private static final Logger logger = LogManager.getLogger(XPackPlugin.class);
 
     private final Settings settings;
@@ -315,7 +326,7 @@ public List<Setting<?>> getSettings() {
                 MAX_MACHINE_MEMORY_PERCENT,
                 AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING,
                 AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC,
-                AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE,
+                MAX_OPEN_JOBS_PER_NODE,
                 AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP,
                 MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION));
     }
@@ -333,8 +344,10 @@ public Settings additionalSettings() {
         Settings.Builder additionalSettings = Settings.builder();
         Boolean allocationEnabled = ML_ENABLED.get(settings);
         if (allocationEnabled != null && allocationEnabled) {
+            // TODO: stop setting this attribute in 8.0.0 but disallow it (like mlEnabledNodeAttrName below)
+            // The ML UI will need to be changed to check machineMemoryAttrName instead before this is done
             addMlNodeAttribute(additionalSettings, maxOpenJobsPerNodeNodeAttrName,
-                    String.valueOf(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings)));
+                    String.valueOf(MAX_OPEN_JOBS_PER_NODE.get(settings)));
             addMlNodeAttribute(additionalSettings, machineMemoryAttrName,
                     Long.toString(machineMemoryFromStats(OsProbe.getInstance().osStats())));
             // This is not used in v7 and higher, but users are still prevented from setting it directly to avoid confusion
@@ -608,35 +621,37 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
             new ActionHandler<>(SetUpgradeModeAction.INSTANCE, TransportSetUpgradeModeAction.class)
         );
     }
+
     @Override
     public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
         if (false == enabled || transportClientMode) {
             return emptyList();
         }
-        int maxNumberOfJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings);
-        // 4 threads per job: for cpp logging, result processing, state processing and
-        // AutodetectProcessManager worker thread:
-        FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_THREAD_POOL_NAME,
-                maxNumberOfJobs * 4, maxNumberOfJobs * 4, "xpack.ml.autodetect_thread_pool");
-
-        // 4 threads per job: processing logging, result and state of the renormalization process.
-        // Renormalization does't run for the entire lifetime of a job, so additionally autodetect process
-        // based operation (open, close, flush, post data), datafeed based operations (start and stop)
-        // and deleting expired data use this threadpool too and queue up if all threads are busy.
-        FixedExecutorBuilder renormalizer = new FixedExecutorBuilder(settings, UTILITY_THREAD_POOL_NAME,
-                maxNumberOfJobs * 4, 500, "xpack.ml.utility_thread_pool");
-
-        // TODO: if datafeed and non datafeed jobs are considered more equal and the datafeed and
-        // autodetect process are created at the same time then these two different TPs can merge.
-        FixedExecutorBuilder datafeed = new FixedExecutorBuilder(settings, DATAFEED_THREAD_POOL_NAME,
-                maxNumberOfJobs, 200, "xpack.ml.datafeed_thread_pool");
-        return Arrays.asList(autoDetect, renormalizer, datafeed);
+
+        // These thread pools scale such that they can accommodate the maximum number of jobs per node
+        // that is permitted to be configured. It is up to other code to enforce the configured maximum
+        // number of jobs per node.
+
+        // 4 threads per job process: for input, c++ logger output, result processing and state processing.
+        ScalingExecutorBuilder jobComms = new ScalingExecutorBuilder(JOB_COMMS_THREAD_POOL_NAME,
+            4, MAX_MAX_OPEN_JOBS_PER_NODE * 4, TimeValue.timeValueMinutes(1), "xpack.ml.job_comms_thread_pool");
+
+        // This pool is used by renormalization, plus some other parts of ML that
+        // need to kick off non-trivial activities that mustn't block other threads.
+        ScalingExecutorBuilder utility = new ScalingExecutorBuilder(UTILITY_THREAD_POOL_NAME,
+            1, MAX_MAX_OPEN_JOBS_PER_NODE * 4, TimeValue.timeValueMinutes(10), "xpack.ml.utility_thread_pool");
+
+        ScalingExecutorBuilder datafeed = new ScalingExecutorBuilder(DATAFEED_THREAD_POOL_NAME,
+            1, MAX_MAX_OPEN_JOBS_PER_NODE, TimeValue.timeValueMinutes(1), "xpack.ml.datafeed_thread_pool");
+
+        return Arrays.asList(jobComms, utility, datafeed);
     }
 
     @Override
     public Map<String, AnalysisProvider<TokenizerFactory>> getTokenizers() {
         return Collections.singletonMap(MlClassicTokenizer.NAME, MlClassicTokenizerFactory::new);
     }
+
     @Override
     public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDataUpgrader() {
         return templates -> {
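
Editorial note on the switch above from fixed to scaling executors, not part of the commit: a fixed pool had to be pre-sized from the value of xpack.ml.max_open_jobs read at startup, which no longer works once the setting can change at runtime, whereas a scaling pool grows on demand up to a ceiling derived from the hard maximum of 512 jobs and reaps idle threads after the keep-alive. The sketch below is a rough analogue using plain java.util.concurrent, not Elasticsearch's EsExecutors, so saturation behaviour is simplified (caller-runs instead of ES's force-queue handling); it only illustrates the core/max/keep-alive idea.

// Rough, self-contained analogue of a scaling pool; illustrative only.
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScalingPoolSketch {
    public static ThreadPoolExecutor newScalingPool(int core, int max, long keepAliveMinutes) {
        // Threads beyond "core" are created on demand up to "max" and reclaimed after the
        // keep-alive, so the pool follows actual load rather than a size fixed at startup.
        return new ThreadPoolExecutor(
            core, max, keepAliveMinutes, TimeUnit.MINUTES,
            new SynchronousQueue<>(),                     // direct hand-off, no unbounded backlog
            new ThreadPoolExecutor.CallerRunsPolicy());   // simplification of ES's behaviour when saturated
    }

    public static void main(String[] args) throws InterruptedException {
        // Mirrors the job comms pool parameters above: core 4, max 512 * 4, keep-alive 1 minute.
        ThreadPoolExecutor jobComms = newScalingPool(4, 512 * 4, 1);
        jobComms.execute(() -> System.out.println("ran on " + Thread.currentThread().getName()));
        jobComms.shutdown();
        jobComms.awaitTermination(10, TimeUnit.SECONDS);
    }
}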

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 26 additions & 17 deletions
@@ -10,6 +10,7 @@
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.IndicesOptions;
@@ -68,7 +69,7 @@
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
-import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE;
+import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE;
 
 /*
     This class extends from TransportMasterNodeAction for cluster state observing purposes.
@@ -131,11 +132,14 @@ static void validate(String jobId, Job job) {
 
     static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String jobId, Job job,
                                                                             ClusterState clusterState,
+                                                                            int dynamicMaxOpenJobs,
                                                                             int maxConcurrentJobAllocations,
                                                                             int maxMachineMemoryPercent,
                                                                             MlMemoryTracker memoryTracker,
                                                                             boolean isMemoryTrackerRecentlyRefreshed,
                                                                             Logger logger) {
+        // TODO: remove in 8.0.0
+        boolean allNodesHaveDynamicMaxWorkers = clusterState.getNodes().getMinNodeVersion().onOrAfter(Version.V_7_1_0);
 
         // Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe
         // because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs
@@ -223,16 +227,19 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j
             }
 
             Map<String, String> nodeAttributes = node.getAttributes();
-            String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR);
-            int maxNumberOfOpenJobs;
-            try {
-                maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr);
-            } catch (NumberFormatException e) {
-                String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " +
-                    MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer";
-                logger.trace(reason);
-                reasons.add(reason);
-                continue;
+            int maxNumberOfOpenJobs = dynamicMaxOpenJobs;
+            // TODO: remove this in 8.0.0
+            if (allNodesHaveDynamicMaxWorkers == false) {
+                String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR);
+                try {
+                    maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr);
+                } catch (NumberFormatException e) {
+                    String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " +
+                        MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer";
+                    logger.trace(reason);
+                    reasons.add(reason);
+                    continue;
+                }
             }
             long availableCount = maxNumberOfOpenJobs - numberOfAssignedJobs;
             if (availableCount == 0) {
@@ -538,6 +545,7 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut
         private volatile int maxConcurrentJobAllocations;
         private volatile int maxMachineMemoryPercent;
         private volatile int maxLazyMLNodes;
+        private volatile int maxOpenJobs;
         private volatile ClusterState clusterState;
 
         public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService,
@@ -550,12 +558,14 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS
             this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
             this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
             this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
+            this.maxOpenJobs = MAX_OPEN_JOBS_PER_NODE.get(settings);
            this.clusterService = clusterService;
            clusterService.getClusterSettings()
                .addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations);
            clusterService.getClusterSettings()
                .addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent);
            clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
+            clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_OPEN_JOBS_PER_NODE, this::setMaxOpenJobs);
             clusterService.addListener(event -> clusterState = event.state());
         }
 
@@ -596,6 +606,7 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobP
             PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(jobId,
                 params.getJob(),
                 clusterState,
+                maxOpenJobs,
                 maxConcurrentJobAllocations,
                 maxMachineMemoryPercent,
                 memoryTracker,
@@ -672,22 +683,20 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
         }
 
         void setMaxConcurrentJobAllocations(int maxConcurrentJobAllocations) {
-            logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.CONCURRENT_JOB_ALLOCATIONS.getKey(),
-                    this.maxConcurrentJobAllocations, maxConcurrentJobAllocations);
             this.maxConcurrentJobAllocations = maxConcurrentJobAllocations;
         }
 
         void setMaxMachineMemoryPercent(int maxMachineMemoryPercent) {
-            logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.MAX_MACHINE_MEMORY_PERCENT.getKey(),
-                    this.maxMachineMemoryPercent, maxMachineMemoryPercent);
             this.maxMachineMemoryPercent = maxMachineMemoryPercent;
         }
 
         void setMaxLazyMLNodes(int maxLazyMLNodes) {
-            logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.MAX_LAZY_ML_NODES.getKey(),
-                    this.maxLazyMLNodes, maxLazyMLNodes);
             this.maxLazyMLNodes = maxLazyMLNodes;
         }
+
+        void setMaxOpenJobs(int maxOpenJobs) {
+            this.maxOpenJobs = maxOpenJobs;
+        }
     }
 
     public static class JobTask extends AllocatedPersistentTask implements OpenJobAction.JobTaskMatcher {
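
To summarise the mixed-version handling above (editorial addition, not part of the commit): once every node is on 7.1 or later the dynamic cluster-wide value is authoritative; otherwise the ml.max_open_jobs attribute each node published at startup is parsed instead. The condensed helper below is hypothetical; unlike the real selectLeastLoadedMlNode, it defaults a missing attribute to 20 rather than recording a reason and skipping the node.

// Hypothetical condensed view of the fallback decision; not the actual TransportOpenJobAction code.
import java.util.Map;

public class MaxOpenJobsFallbackSketch {
    static int effectiveMaxOpenJobs(boolean allNodesOnOrAfter71,
                                    Map<String, String> nodeAttributes,
                                    int dynamicMaxOpenJobs) {
        if (allNodesOnOrAfter71) {
            // Every node understands the dynamic cluster-wide setting, so use it directly.
            return dynamicMaxOpenJobs;
        }
        // Mixed-version cluster: fall back to the attribute the node published at startup.
        // The "20" default is a stand-in; the real code skips nodes with a missing or bad attribute.
        return Integer.parseInt(nodeAttributes.getOrDefault("ml.max_open_jobs", "20"));
    }

    public static void main(String[] args) {
        System.out.println(effectiveMaxOpenJobs(true, Map.of(), 40));                           // 40
        System.out.println(effectiveMaxOpenJobs(false, Map.of("ml.max_open_jobs", "20"), 40));  // 20
    }
}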
