Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/reference/ml/apis/datafeedresource.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ update their values:
`ephemeral_id`::: The node ephemeral ID.
`transport_address`::: The host and port where transport HTTP connections are
accepted. For example, `127.0.0.1:9300`.
`attributes`::: For example, `{"ml.max_open_jobs": "10"}`.
`attributes`::: For example, `{"ml.machine_memory": "17179869184"}`.

`state`::
(string) The status of the {dfeed}, which can be one of the following values: +
Expand Down
3 changes: 1 addition & 2 deletions docs/reference/ml/apis/get-datafeed-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ The API returns the following results:
"transport_address": "127.0.0.1:9300",
"attributes": {
"ml.machine_memory": "17179869184",
"ml.max_open_jobs": "20",
"ml.enabled": "true"
"ml.max_open_jobs": "20"
}
},
"assignment_explanation": ""
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/ml/apis/jobcounts.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -229,4 +229,4 @@ This information is available only for open jobs.
(string) The host and port where transport HTTP connections are accepted.

`attributes`::
(object) For example, {"ml.max_open_jobs": "10"}.
(object) For example, {"ml.machine_memory": "17179869184"}.
22 changes: 14 additions & 8 deletions docs/reference/settings/ml-settings.asciidoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

[role="xpack"]
[[ml-settings]]
=== Machine learning settings in Elasticsearch
Expand Down Expand Up @@ -44,27 +45,32 @@ IMPORTANT: If you want to use {ml} features in your cluster, you must have
`xpack.ml.enabled` set to `true` on all master-eligible nodes. This is the
default behavior.

`xpack.ml.max_machine_memory_percent`::
`xpack.ml.max_machine_memory_percent` (<<cluster-update-settings,Dynamic>>)::
The maximum percentage of the machine's memory that {ml} may use for running
analytics processes. (These processes are separate from the {es} JVM.) Defaults to
`30` percent. The limit is based on the total memory of the machine, not current
free memory. Jobs will not be allocated to a node if doing so would cause the
estimated memory use of {ml} jobs to exceed the limit.

`xpack.ml.max_model_memory_limit`::
`xpack.ml.max_model_memory_limit` (<<cluster-update-settings,Dynamic>>)::
The maximum `model_memory_limit` property value that can be set for any job on
this node. If you try to create a job with a `model_memory_limit` property value
that is greater than this setting value, an error occurs. Existing jobs are not
affected when you update this setting. For more information about the
`model_memory_limit` property, see <<ml-apilimits>>.

`xpack.ml.max_open_jobs`::
The maximum number of jobs that can run on a node. Defaults to `20`.
The maximum number of jobs is also constrained by memory usage, so fewer
jobs than specified by this setting will run on a node if the estimated
memory use of the jobs would be higher than allowed.
`xpack.ml.max_open_jobs` (<<cluster-update-settings,Dynamic>>)::
The maximum number of jobs that can run simultaneously on a node. Defaults to
`20`. In this context, jobs include both anomaly detector jobs and data frame
analytics jobs. The maximum number of jobs is also constrained by memory usage.
Thus if the estimated memory usage of the jobs would be higher than allowed,
fewer jobs will run on a node. Prior to version 7.1, this setting was a per-node
non-dynamic setting. It became a cluster-wide dynamic
setting in version 7.1. As a result, changes to its value after node startup
are used only after every node in the cluster is running version 7.1 or higher.
The maximum permitted value is `512`.

`xpack.ml.node_concurrent_job_allocations`::
`xpack.ml.node_concurrent_job_allocations` (<<cluster-update-settings,Dynamic>>)::
The maximum number of jobs that can concurrently be in the `opening` state on
each node. Typically, jobs spend a small amount of time in this state before
they move to `open` state. Jobs that must restore large models when they are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackPlugin;
Expand Down Expand Up @@ -256,7 +256,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
public static final String BASE_PATH = "/_ml/";
public static final String PRE_V7_BASE_PATH = "/_xpack/ml/";
public static final String DATAFEED_THREAD_POOL_NAME = NAME + "_datafeed";
public static final String AUTODETECT_THREAD_POOL_NAME = NAME + "_autodetect";
public static final String JOB_COMMS_THREAD_POOL_NAME = NAME + "_job_comms";
public static final String UTILITY_THREAD_POOL_NAME = NAME + "_utility";

// This is for performance testing. It's not exposed to the end user.
Expand All @@ -276,6 +276,17 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
public static final Setting<Integer> MAX_LAZY_ML_NODES =
Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope);

// Before 8.0.0 this needs to match the max allowed value for xpack.ml.max_open_jobs,
// as the current node could be running in a cluster where some nodes are still using
// that setting. From 8.0.0 onwards we have the flexibility to increase it...
private static final int MAX_MAX_OPEN_JOBS_PER_NODE = 512;
// This setting is cluster-wide and can be set dynamically. However, prior to version 7.1 it was
    // a non-dynamic per-node setting. In a mixed version cluster containing 6.7 or 7.0 nodes those
// older nodes will not react to the dynamic changes. Therefore, in such mixed version clusters
// allocation will be based on the value first read at node startup rather than the current value.
public static final Setting<Integer> MAX_OPEN_JOBS_PER_NODE =
Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, MAX_MAX_OPEN_JOBS_PER_NODE, Property.Dynamic, Property.NodeScope);

private static final Logger logger = LogManager.getLogger(XPackPlugin.class);

private final Settings settings;
Expand Down Expand Up @@ -315,7 +326,7 @@ public List<Setting<?>> getSettings() {
MAX_MACHINE_MEMORY_PERCENT,
AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING,
AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC,
AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE,
MAX_OPEN_JOBS_PER_NODE,
AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP,
MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION));
}
Expand All @@ -333,8 +344,10 @@ public Settings additionalSettings() {
Settings.Builder additionalSettings = Settings.builder();
Boolean allocationEnabled = ML_ENABLED.get(settings);
if (allocationEnabled != null && allocationEnabled) {
// TODO: stop setting this attribute in 8.0.0 but disallow it (like mlEnabledNodeAttrName below)
// The ML UI will need to be changed to check machineMemoryAttrName instead before this is done
addMlNodeAttribute(additionalSettings, maxOpenJobsPerNodeNodeAttrName,
String.valueOf(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings)));
String.valueOf(MAX_OPEN_JOBS_PER_NODE.get(settings)));
addMlNodeAttribute(additionalSettings, machineMemoryAttrName,
Long.toString(machineMemoryFromStats(OsProbe.getInstance().osStats())));
// This is not used in v7 and higher, but users are still prevented from setting it directly to avoid confusion
Expand Down Expand Up @@ -608,35 +621,37 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new ActionHandler<>(SetUpgradeModeAction.INSTANCE, TransportSetUpgradeModeAction.class)
);
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
if (false == enabled || transportClientMode) {
return emptyList();
}
int maxNumberOfJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings);
// 4 threads per job: for cpp logging, result processing, state processing and
// AutodetectProcessManager worker thread:
FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_THREAD_POOL_NAME,
maxNumberOfJobs * 4, maxNumberOfJobs * 4, "xpack.ml.autodetect_thread_pool");

// 4 threads per job: processing logging, result and state of the renormalization process.
        // Renormalization doesn't run for the entire lifetime of a job, so additionally autodetect process
// based operation (open, close, flush, post data), datafeed based operations (start and stop)
// and deleting expired data use this threadpool too and queue up if all threads are busy.
FixedExecutorBuilder renormalizer = new FixedExecutorBuilder(settings, UTILITY_THREAD_POOL_NAME,
maxNumberOfJobs * 4, 500, "xpack.ml.utility_thread_pool");

// TODO: if datafeed and non datafeed jobs are considered more equal and the datafeed and
// autodetect process are created at the same time then these two different TPs can merge.
FixedExecutorBuilder datafeed = new FixedExecutorBuilder(settings, DATAFEED_THREAD_POOL_NAME,
maxNumberOfJobs, 200, "xpack.ml.datafeed_thread_pool");
return Arrays.asList(autoDetect, renormalizer, datafeed);

// These thread pools scale such that they can accommodate the maximum number of jobs per node
// that is permitted to be configured. It is up to other code to enforce the configured maximum
// number of jobs per node.

// 4 threads per job process: for input, c++ logger output, result processing and state processing.
ScalingExecutorBuilder jobComms = new ScalingExecutorBuilder(JOB_COMMS_THREAD_POOL_NAME,
4, MAX_MAX_OPEN_JOBS_PER_NODE * 4, TimeValue.timeValueMinutes(1), "xpack.ml.job_comms_thread_pool");

// This pool is used by renormalization, plus some other parts of ML that
// need to kick off non-trivial activities that mustn't block other threads.
ScalingExecutorBuilder utility = new ScalingExecutorBuilder(UTILITY_THREAD_POOL_NAME,
1, MAX_MAX_OPEN_JOBS_PER_NODE * 4, TimeValue.timeValueMinutes(10), "xpack.ml.utility_thread_pool");

ScalingExecutorBuilder datafeed = new ScalingExecutorBuilder(DATAFEED_THREAD_POOL_NAME,
1, MAX_MAX_OPEN_JOBS_PER_NODE, TimeValue.timeValueMinutes(1), "xpack.ml.datafeed_thread_pool");

return Arrays.asList(jobComms, utility, datafeed);
}

@Override
public Map<String, AnalysisProvider<TokenizerFactory>> getTokenizers() {
return Collections.singletonMap(MlClassicTokenizer.NAME, MlClassicTokenizerFactory::new);
}

@Override
public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDataUpgrader() {
return templates -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
Expand Down Expand Up @@ -68,7 +69,7 @@
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE;
import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE;

/*
This class extends from TransportMasterNodeAction for cluster state observing purposes.
Expand Down Expand Up @@ -131,11 +132,14 @@ static void validate(String jobId, Job job) {

static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String jobId, Job job,
ClusterState clusterState,
int dynamicMaxOpenJobs,
int maxConcurrentJobAllocations,
int maxMachineMemoryPercent,
MlMemoryTracker memoryTracker,
boolean isMemoryTrackerRecentlyRefreshed,
Logger logger) {
// TODO: remove in 8.0.0
boolean allNodesHaveDynamicMaxWorkers = clusterState.getNodes().getMinNodeVersion().onOrAfter(Version.V_7_1_0);

// Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe
// because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs
Expand Down Expand Up @@ -223,16 +227,19 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j
}

Map<String, String> nodeAttributes = node.getAttributes();
String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR);
int maxNumberOfOpenJobs;
try {
maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr);
} catch (NumberFormatException e) {
String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " +
MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer";
logger.trace(reason);
reasons.add(reason);
continue;
int maxNumberOfOpenJobs = dynamicMaxOpenJobs;
// TODO: remove this in 8.0.0
if (allNodesHaveDynamicMaxWorkers == false) {
String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR);
try {
maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr);
} catch (NumberFormatException e) {
String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " +
MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer";
logger.trace(reason);
reasons.add(reason);
continue;
}
}
long availableCount = maxNumberOfOpenJobs - numberOfAssignedJobs;
if (availableCount == 0) {
Expand Down Expand Up @@ -538,6 +545,7 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut
private volatile int maxConcurrentJobAllocations;
private volatile int maxMachineMemoryPercent;
private volatile int maxLazyMLNodes;
private volatile int maxOpenJobs;
private volatile ClusterState clusterState;

public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService,
Expand All @@ -550,12 +558,14 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS
this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings);
this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
this.maxOpenJobs = MAX_OPEN_JOBS_PER_NODE.get(settings);
this.clusterService = clusterService;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_OPEN_JOBS_PER_NODE, this::setMaxOpenJobs);
clusterService.addListener(event -> clusterState = event.state());
}

Expand Down Expand Up @@ -596,6 +606,7 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobP
PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(jobId,
params.getJob(),
clusterState,
maxOpenJobs,
maxConcurrentJobAllocations,
maxMachineMemoryPercent,
memoryTracker,
Expand Down Expand Up @@ -672,22 +683,20 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
}

void setMaxConcurrentJobAllocations(int maxConcurrentJobAllocations) {
logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.CONCURRENT_JOB_ALLOCATIONS.getKey(),
this.maxConcurrentJobAllocations, maxConcurrentJobAllocations);
this.maxConcurrentJobAllocations = maxConcurrentJobAllocations;
}

void setMaxMachineMemoryPercent(int maxMachineMemoryPercent) {
logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.MAX_MACHINE_MEMORY_PERCENT.getKey(),
this.maxMachineMemoryPercent, maxMachineMemoryPercent);
this.maxMachineMemoryPercent = maxMachineMemoryPercent;
}

void setMaxLazyMLNodes(int maxLazyMLNodes) {
logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.MAX_LAZY_ML_NODES.getKey(),
this.maxLazyMLNodes, maxLazyMLNodes);
this.maxLazyMLNodes = maxLazyMLNodes;
}

void setMaxOpenJobs(int maxOpenJobs) {
this.maxOpenJobs = maxOpenJobs;
}
}

public static class JobTask extends AllocatedPersistentTask implements OpenJobAction.JobTaskMatcher {
Expand Down
Loading