From 989f65c3237601cd703126a590b9c1c29291bee9 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 6 Mar 2019 09:45:13 +0000 Subject: [PATCH] [ML] Use scaling thread pool and xpack.ml.max_open_jobs cluster-wide dynamic This change does the following: 1. Makes the per-node setting xpack.ml.max_open_jobs into a cluster-wide dynamic setting 2. Changes the job node selection to continue to use the per-node attributes storing the maximum number of open jobs if any node in the cluster is older than 7.1, and use the dynamic cluster-wide setting if all nodes are on 7.1 or later 3. Changes the docs to reflect this 4. Changes the thread pools for native process communication from fixed size to scaling, to support the dynamic nature of xpack.ml.max_open_jobs 5. Renames the autodetect thread pool to the job comms thread pool to make clear that it will be used for other types of ML jobs (data frame analytics in particular) Backport of #39320 --- .../ml/apis/datafeedresource.asciidoc | 2 +- .../ml/apis/get-datafeed-stats.asciidoc | 3 +- docs/reference/ml/apis/jobcounts.asciidoc | 2 +- docs/reference/settings/ml-settings.asciidoc | 22 ++++--- .../xpack/ml/MachineLearning.java | 59 ++++++++++++------- .../ml/action/TransportOpenJobAction.java | 43 ++++++++------ .../autodetect/AutodetectProcessManager.java | 32 +++++----- .../xpack/ml/MachineLearningTests.java | 12 ++++ .../action/TransportOpenJobActionTests.java | 44 +++++++------- .../xpack/ml/integration/TooManyJobsIT.java | 28 +++++---- .../AutodetectProcessManagerTests.java | 25 +++----- 11 files changed, 154 insertions(+), 118 deletions(-) diff --git a/docs/reference/ml/apis/datafeedresource.asciidoc b/docs/reference/ml/apis/datafeedresource.asciidoc index 8e1251067dd9f..33fce3dbf7c9d 100644 --- a/docs/reference/ml/apis/datafeedresource.asciidoc +++ b/docs/reference/ml/apis/datafeedresource.asciidoc @@ -134,7 +134,7 @@ update their values: `ephemeral_id`::: The node ephemeral ID. `transport_address`::: The host and port where transport HTTP connections are accepted. For example, `127.0.0.1:9300`. - `attributes`::: For example, `{"ml.max_open_jobs": "10"}`. + `attributes`::: For example, `{"ml.machine_memory": "17179869184"}`. `state`:: (string) The status of the {dfeed}, which can be one of the following values: + diff --git a/docs/reference/ml/apis/get-datafeed-stats.asciidoc b/docs/reference/ml/apis/get-datafeed-stats.asciidoc index 34c27d3dae962..cb2ca19a60b43 100644 --- a/docs/reference/ml/apis/get-datafeed-stats.asciidoc +++ b/docs/reference/ml/apis/get-datafeed-stats.asciidoc @@ -86,8 +86,7 @@ The API returns the following results: "transport_address": "127.0.0.1:9300", "attributes": { "ml.machine_memory": "17179869184", - "ml.max_open_jobs": "20", - "ml.enabled": "true" + "ml.max_open_jobs": "20" } }, "assignment_explanation": "" diff --git a/docs/reference/ml/apis/jobcounts.asciidoc b/docs/reference/ml/apis/jobcounts.asciidoc index c2e3aebb1a0a1..ed69a8b0a796b 100644 --- a/docs/reference/ml/apis/jobcounts.asciidoc +++ b/docs/reference/ml/apis/jobcounts.asciidoc @@ -229,4 +229,4 @@ This information is available only for open jobs. (string) The host and port where transport HTTP connections are accepted. `attributes`:: - (object) For example, {"ml.max_open_jobs": "10"}. + (object) For example, {"ml.machine_memory": "17179869184"}. diff --git a/docs/reference/settings/ml-settings.asciidoc b/docs/reference/settings/ml-settings.asciidoc index 113f264a31331..dbc11223f40ce 100644 --- a/docs/reference/settings/ml-settings.asciidoc +++ b/docs/reference/settings/ml-settings.asciidoc @@ -1,3 +1,4 @@ + [role="xpack"] [[ml-settings]] === Machine learning settings in Elasticsearch @@ -44,27 +45,32 @@ IMPORTANT: If you want to use {ml} features in your cluster, you must have `xpack.ml.enabled` set to `true` on all master-eligible nodes. This is the default behavior. -`xpack.ml.max_machine_memory_percent`:: +`xpack.ml.max_machine_memory_percent` (<>):: The maximum percentage of the machine's memory that {ml} may use for running analytics processes. (These processes are separate to the {es} JVM.) Defaults to `30` percent. The limit is based on the total memory of the machine, not current free memory. Jobs will not be allocated to a node if doing so would cause the estimated memory use of {ml} jobs to exceed the limit. -`xpack.ml.max_model_memory_limit`:: +`xpack.ml.max_model_memory_limit` (<>):: The maximum `model_memory_limit` property value that can be set for any job on this node. If you try to create a job with a `model_memory_limit` property value that is greater than this setting value, an error occurs. Existing jobs are not affected when you update this setting. For more information about the `model_memory_limit` property, see <>. -`xpack.ml.max_open_jobs`:: -The maximum number of jobs that can run on a node. Defaults to `20`. -The maximum number of jobs is also constrained by memory usage, so fewer -jobs than specified by this setting will run on a node if the estimated -memory use of the jobs would be higher than allowed. +`xpack.ml.max_open_jobs` (<>):: +The maximum number of jobs that can run simultaneously on a node. Defaults to +`20`. In this context, jobs include both anomaly detector jobs and data frame +analytics jobs. The maximum number of jobs is also constrained by memory usage. +Thus if the estimated memory usage of the jobs would be higher than allowed, +fewer jobs will run on a node. Prior to version 7.1, this setting was a per-node +non-dynamic setting. It became a cluster-wide dynamic +setting in version 7.1. As a result, changes to its value after node startup +are used only after every node in the cluster is running version 7.1 or higher. +The maximum permitted value is `512`. -`xpack.ml.node_concurrent_job_allocations`:: +`xpack.ml.node_concurrent_job_allocations` (<>):: The maximum number of jobs that can concurrently be in the `opening` state on each node. Typically, jobs spend a small amount of time in this state before they move to `open` state. Jobs that must restore large models when they are diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 434305aac341f..4cc5857bad6af 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -51,7 +51,7 @@ import org.elasticsearch.rest.RestHandler; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ExecutorBuilder; -import org.elasticsearch.threadpool.FixedExecutorBuilder; +import org.elasticsearch.threadpool.ScalingExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.XPackPlugin; @@ -256,7 +256,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu public static final String BASE_PATH = "/_ml/"; public static final String PRE_V7_BASE_PATH = "/_xpack/ml/"; public static final String DATAFEED_THREAD_POOL_NAME = NAME + "_datafeed"; - public static final String AUTODETECT_THREAD_POOL_NAME = NAME + "_autodetect"; + public static final String JOB_COMMS_THREAD_POOL_NAME = NAME + "_job_comms"; public static final String UTILITY_THREAD_POOL_NAME = NAME + "_utility"; // This is for performance testing. It's not exposed to the end user. @@ -276,6 +276,17 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu public static final Setting MAX_LAZY_ML_NODES = Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope); + // Before 8.0.0 this needs to match the max allowed value for xpack.ml.max_open_jobs, + // as the current node could be running in a cluster where some nodes are still using + // that setting. From 8.0.0 onwards we have the flexibility to increase it... + private static final int MAX_MAX_OPEN_JOBS_PER_NODE = 512; + // This setting is cluster-wide and can be set dynamically. However, prior to version 7.1 it was + // a non-dynamic per-node setting. n a mixed version cluster containing 6.7 or 7.0 nodes those + // older nodes will not react to the dynamic changes. Therefore, in such mixed version clusters + // allocation will be based on the value first read at node startup rather than the current value. + public static final Setting MAX_OPEN_JOBS_PER_NODE = + Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, MAX_MAX_OPEN_JOBS_PER_NODE, Property.Dynamic, Property.NodeScope); + private static final Logger logger = LogManager.getLogger(XPackPlugin.class); private final Settings settings; @@ -315,7 +326,7 @@ public List> getSettings() { MAX_MACHINE_MEMORY_PERCENT, AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC, - AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE, + MAX_OPEN_JOBS_PER_NODE, AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP, MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION)); } @@ -333,8 +344,10 @@ public Settings additionalSettings() { Settings.Builder additionalSettings = Settings.builder(); Boolean allocationEnabled = ML_ENABLED.get(settings); if (allocationEnabled != null && allocationEnabled) { + // TODO: stop setting this attribute in 8.0.0 but disallow it (like mlEnabledNodeAttrName below) + // The ML UI will need to be changed to check machineMemoryAttrName instead before this is done addMlNodeAttribute(additionalSettings, maxOpenJobsPerNodeNodeAttrName, - String.valueOf(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings))); + String.valueOf(MAX_OPEN_JOBS_PER_NODE.get(settings))); addMlNodeAttribute(additionalSettings, machineMemoryAttrName, Long.toString(machineMemoryFromStats(OsProbe.getInstance().osStats()))); // This is not used in v7 and higher, but users are still prevented from setting it directly to avoid confusion @@ -608,35 +621,37 @@ public List getRestHandlers(Settings settings, RestController restC new ActionHandler<>(SetUpgradeModeAction.INSTANCE, TransportSetUpgradeModeAction.class) ); } + @Override public List> getExecutorBuilders(Settings settings) { if (false == enabled || transportClientMode) { return emptyList(); } - int maxNumberOfJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings); - // 4 threads per job: for cpp logging, result processing, state processing and - // AutodetectProcessManager worker thread: - FixedExecutorBuilder autoDetect = new FixedExecutorBuilder(settings, AUTODETECT_THREAD_POOL_NAME, - maxNumberOfJobs * 4, maxNumberOfJobs * 4, "xpack.ml.autodetect_thread_pool"); - - // 4 threads per job: processing logging, result and state of the renormalization process. - // Renormalization does't run for the entire lifetime of a job, so additionally autodetect process - // based operation (open, close, flush, post data), datafeed based operations (start and stop) - // and deleting expired data use this threadpool too and queue up if all threads are busy. - FixedExecutorBuilder renormalizer = new FixedExecutorBuilder(settings, UTILITY_THREAD_POOL_NAME, - maxNumberOfJobs * 4, 500, "xpack.ml.utility_thread_pool"); - - // TODO: if datafeed and non datafeed jobs are considered more equal and the datafeed and - // autodetect process are created at the same time then these two different TPs can merge. - FixedExecutorBuilder datafeed = new FixedExecutorBuilder(settings, DATAFEED_THREAD_POOL_NAME, - maxNumberOfJobs, 200, "xpack.ml.datafeed_thread_pool"); - return Arrays.asList(autoDetect, renormalizer, datafeed); + + // These thread pools scale such that they can accommodate the maximum number of jobs per node + // that is permitted to be configured. It is up to other code to enforce the configured maximum + // number of jobs per node. + + // 4 threads per job process: for input, c++ logger output, result processing and state processing. + ScalingExecutorBuilder jobComms = new ScalingExecutorBuilder(JOB_COMMS_THREAD_POOL_NAME, + 4, MAX_MAX_OPEN_JOBS_PER_NODE * 4, TimeValue.timeValueMinutes(1), "xpack.ml.job_comms_thread_pool"); + + // This pool is used by renormalization, plus some other parts of ML that + // need to kick off non-trivial activities that mustn't block other threads. + ScalingExecutorBuilder utility = new ScalingExecutorBuilder(UTILITY_THREAD_POOL_NAME, + 1, MAX_MAX_OPEN_JOBS_PER_NODE * 4, TimeValue.timeValueMinutes(10), "xpack.ml.utility_thread_pool"); + + ScalingExecutorBuilder datafeed = new ScalingExecutorBuilder(DATAFEED_THREAD_POOL_NAME, + 1, MAX_MAX_OPEN_JOBS_PER_NODE, TimeValue.timeValueMinutes(1), "xpack.ml.datafeed_thread_pool"); + + return Arrays.asList(jobComms, utility, datafeed); } @Override public Map> getTokenizers() { return Collections.singletonMap(MlClassicTokenizer.NAME, MlClassicTokenizerFactory::new); } + @Override public UnaryOperator> getIndexTemplateMetaDataUpgrader() { return templates -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index ac4f435da130d..162ed33657479 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.IndicesOptions; @@ -68,7 +69,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE; -import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE; +import static org.elasticsearch.xpack.ml.MachineLearning.MAX_OPEN_JOBS_PER_NODE; /* This class extends from TransportMasterNodeAction for cluster state observing purposes. @@ -131,11 +132,14 @@ static void validate(String jobId, Job job) { static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String jobId, Job job, ClusterState clusterState, + int dynamicMaxOpenJobs, int maxConcurrentJobAllocations, int maxMachineMemoryPercent, MlMemoryTracker memoryTracker, boolean isMemoryTrackerRecentlyRefreshed, Logger logger) { + // TODO: remove in 8.0.0 + boolean allNodesHaveDynamicMaxWorkers = clusterState.getNodes().getMinNodeVersion().onOrAfter(Version.V_7_1_0); // Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe // because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs @@ -223,16 +227,19 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j } Map nodeAttributes = node.getAttributes(); - String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR); - int maxNumberOfOpenJobs; - try { - maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr); - } catch (NumberFormatException e) { - String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " + - MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer"; - logger.trace(reason); - reasons.add(reason); - continue; + int maxNumberOfOpenJobs = dynamicMaxOpenJobs; + // TODO: remove this in 8.0.0 + if (allNodesHaveDynamicMaxWorkers == false) { + String maxNumberOfOpenJobsStr = nodeAttributes.get(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR); + try { + maxNumberOfOpenJobs = Integer.parseInt(maxNumberOfOpenJobsStr); + } catch (NumberFormatException e) { + String reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) + "], because " + + MachineLearning.MAX_OPEN_JOBS_NODE_ATTR + " attribute [" + maxNumberOfOpenJobsStr + "] is not an integer"; + logger.trace(reason); + reasons.add(reason); + continue; + } } long availableCount = maxNumberOfOpenJobs - numberOfAssignedJobs; if (availableCount == 0) { @@ -538,6 +545,7 @@ public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecut private volatile int maxConcurrentJobAllocations; private volatile int maxMachineMemoryPercent; private volatile int maxLazyMLNodes; + private volatile int maxOpenJobs; private volatile ClusterState clusterState; public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterService, @@ -550,12 +558,14 @@ public OpenJobPersistentTasksExecutor(Settings settings, ClusterService clusterS this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings); this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings); this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings); + this.maxOpenJobs = MAX_OPEN_JOBS_PER_NODE.get(settings); this.clusterService = clusterService; clusterService.getClusterSettings() .addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MachineLearning.MAX_MACHINE_MEMORY_PERCENT, this::setMaxMachineMemoryPercent); clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_OPEN_JOBS_PER_NODE, this::setMaxOpenJobs); clusterService.addListener(event -> clusterState = event.state()); } @@ -596,6 +606,7 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobP PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(jobId, params.getJob(), clusterState, + maxOpenJobs, maxConcurrentJobAllocations, maxMachineMemoryPercent, memoryTracker, @@ -672,22 +683,20 @@ protected AllocatedPersistentTask createTask(long id, String type, String action } void setMaxConcurrentJobAllocations(int maxConcurrentJobAllocations) { - logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.CONCURRENT_JOB_ALLOCATIONS.getKey(), - this.maxConcurrentJobAllocations, maxConcurrentJobAllocations); this.maxConcurrentJobAllocations = maxConcurrentJobAllocations; } void setMaxMachineMemoryPercent(int maxMachineMemoryPercent) { - logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.MAX_MACHINE_MEMORY_PERCENT.getKey(), - this.maxMachineMemoryPercent, maxMachineMemoryPercent); this.maxMachineMemoryPercent = maxMachineMemoryPercent; } void setMaxLazyMLNodes(int maxLazyMLNodes) { - logger.info("Changing [{}] from [{}] to [{}]", MachineLearning.MAX_LAZY_ML_NODES.getKey(), - this.maxLazyMLNodes, maxLazyMLNodes); this.maxLazyMLNodes = maxLazyMLNodes; } + + void setMaxOpenJobs(int maxOpenJobs) { + this.maxOpenJobs = maxOpenJobs; + } } public static class JobTask extends AllocatedPersistentTask implements OpenJobAction.JobTaskMatcher { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 6b8eada7406f6..4620cc9af3b5c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -100,17 +100,6 @@ public class AutodetectProcessManager implements ClusterStateListener { - // We should be able from the job config to estimate the memory/cpu a job needs to have, - // and if we know that then we can prior to assigning a job to a node fail based on the - // available resources on that node: https://github.com/elastic/x-pack-elasticsearch/issues/546 - // However, it is useful to also be able to apply a hard limit. - - // WARNING: This setting cannot be made DYNAMIC, because it is tied to several threadpools - // and a threadpool's size can't be changed at runtime. - // See MachineLearning#getExecutorBuilders(...) - public static final Setting MAX_OPEN_JOBS_PER_NODE = - Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, 512, Property.NodeScope); - // Undocumented setting for integration test purposes public static final Setting MIN_DISK_SPACE_OFF_HEAP = Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Property.NodeScope); @@ -134,7 +123,7 @@ public class AutodetectProcessManager implements ClusterStateListener { // a map that manages the allocation of temporary space to jobs private final ConcurrentMap nativeTmpStorage = new ConcurrentHashMap<>(); - private final int maxAllowedRunningJobs; + private volatile int maxAllowedRunningJobs; private final NamedXContentRegistry xContentRegistry; @@ -151,7 +140,7 @@ public AutodetectProcessManager(Environment environment, Settings settings, Clie this.client = client; this.threadPool = threadPool; this.xContentRegistry = xContentRegistry; - this.maxAllowedRunningJobs = MAX_OPEN_JOBS_PER_NODE.get(settings); + this.maxAllowedRunningJobs = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(settings); this.autodetectProcessFactory = autodetectProcessFactory; this.normalizerFactory = normalizerFactory; this.jobManager = jobManager; @@ -161,6 +150,12 @@ public AutodetectProcessManager(Environment environment, Settings settings, Clie this.auditor = auditor; this.nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings)); clusterService.addListener(this); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(MachineLearning.MAX_OPEN_JOBS_PER_NODE, this::setMaxAllowedRunningJobs); + } + + void setMaxAllowedRunningJobs(int maxAllowedRunningJobs) { + this.maxAllowedRunningJobs = maxAllowedRunningJobs; } public void onNodeStartup() { @@ -522,11 +517,14 @@ private void createProcessAndSetRunning(ProcessContext processContext, } AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodetectParams, BiConsumer handler) { - // Closing jobs can still be using some or all threads in MachineLearning.AUTODETECT_THREAD_POOL_NAME + // Copy for consistency within a single method call + int localMaxAllowedRunningJobs = maxAllowedRunningJobs; + // Closing jobs can still be using some or all threads in MachineLearning.JOB_COMMS_THREAD_POOL_NAME // that an open job uses, so include them too when considering if enough threads are available. int currentRunningJobs = processByAllocation.size(); - if (currentRunningJobs > maxAllowedRunningJobs) { - throw new ElasticsearchStatusException("max running job capacity [" + maxAllowedRunningJobs + "] reached", + // TODO: in future this will also need to consider jobs that are not anomaly detector jobs + if (currentRunningJobs > localMaxAllowedRunningJobs) { + throw new ElasticsearchStatusException("max running job capacity [" + localMaxAllowedRunningJobs + "] reached", RestStatus.TOO_MANY_REQUESTS); } @@ -547,7 +545,7 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet } // A TP with no queue, so that we fail immediately if there are no threads available - ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME); + ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME); DataCountsReporter dataCountsReporter = new DataCountsReporter(job, autodetectParams.dataCounts(), jobDataCountsPersister); ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobResultsProvider, new JobRenormalizedResultsPersister(job.getId(), client), normalizerFactory); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java index 2f1aa29d919d9..2c296691c249d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java @@ -19,6 +19,18 @@ public class MachineLearningTests extends ESTestCase { + public void testMaxOpenWorkersSetting_givenDefault() { + int maxOpenWorkers = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(Settings.EMPTY); + assertEquals(20, maxOpenWorkers); + } + + public void testMaxOpenWorkersSetting_givenSetting() { + Settings.Builder settings = Settings.builder(); + settings.put(MachineLearning.MAX_OPEN_JOBS_PER_NODE.getKey(), 7); + int maxOpenWorkers = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(settings.build()); + assertEquals(7, maxOpenWorkers); + } + public void testNoAttributes_givenNoClash() { Settings.Builder builder = Settings.builder(); if (randomBoolean()) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index e489a6a9a7c4c..040ed5e1d0ed4 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -75,6 +75,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +// TODO: in 8.0.0 remove all instances of MAX_OPEN_JOBS_NODE_ATTR from this file public class TransportOpenJobActionTests extends ESTestCase { private MlMemoryTracker memoryTracker; @@ -142,12 +143,11 @@ public void testSelectLeastLoadedMlNode_byCount() { jobBuilder.setJobVersion(Version.CURRENT); Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id4", jobBuilder.build(), - cs.build(), 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger); + cs.build(), 10, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger); assertEquals("", result.getExplanation()); assertEquals("_node_id3", result.getExecutorNode()); } - public void testSelectLeastLoadedMlNode_maxCapacity() { int numNodes = randomIntBetween(1, 10); int maxRunningJobsPerNode = randomIntBetween(1, 100); @@ -178,11 +178,11 @@ public void testSelectLeastLoadedMlNode_maxCapacity() { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id0", new ByteSizeValue(150, ByteSizeUnit.MB)).build(new Date()); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id0", job, cs.build(), 2, + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id0", job, cs.build(), maxRunningJobsPerNode, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger); assertNull(result.getExecutorNode()); - assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode - + "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]")); + assertTrue(result.getExplanation(), result.getExplanation().contains("because this node is full. Number of opened jobs [" + + maxRunningJobsPerNode + "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]")); } public void testSelectLeastLoadedMlNode_noMlNodes() { @@ -205,7 +205,7 @@ public void testSelectLeastLoadedMlNode_noMlNodes() { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id2", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 30, memoryTracker, + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 20, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger); assertTrue(result.getExplanation().contains("because this node isn't a ml node")); assertNull(result.getExecutorNode()); @@ -241,7 +241,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id6", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); ClusterState cs = csBuilder.build(); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 30, memoryTracker, + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 10, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger); assertEquals("_node_id3", result.getExecutorNode()); @@ -252,8 +252,8 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, - logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 10, 2, 30, memoryTracker, + isMemoryTrackerRecentlyRefreshed, logger); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -264,8 +264,8 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, - logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 10, 2, 30, memoryTracker, + isMemoryTrackerRecentlyRefreshed, logger); assertNull("no node selected, because stale task", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -276,8 +276,8 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, - logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 10, 2, 30, memoryTracker, + isMemoryTrackerRecentlyRefreshed, logger); assertNull("no node selected, because null state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -316,7 +316,7 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id7", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); // Allocation won't be possible if the stale failed job is treated as opening - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 10, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger); assertEquals("_node_id1", result.getExecutorNode()); @@ -327,8 +327,8 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob() csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, - logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 10, 2, 30, memoryTracker, + isMemoryTrackerRecentlyRefreshed, logger); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -360,7 +360,7 @@ public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() { cs.nodes(nodes); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 30, + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 10, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger); assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]")); assertNull(result.getExecutorNode()); @@ -391,7 +391,7 @@ public void testSelectLeastLoadedMlNode_noNodesMatchingModelSnapshotMinVersion() cs.nodes(nodes); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_incompatible_model_snapshot", job, cs.build(), + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_incompatible_model_snapshot", job, cs.build(), 10, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger); assertThat(result.getExplanation(), containsString( "because the job's model snapshot requires a node of version [6.3.0] or higher")); @@ -420,7 +420,7 @@ public void testSelectLeastLoadedMlNode_jobWithRulesButNoNodeMeetsRequiredVersio cs.metaData(metaData); Job job = jobWithRules("job_with_rules"); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 30, memoryTracker, + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 10, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger); assertThat(result.getExplanation(), containsString( "because jobs using custom_rules require a node of version [6.4.0] or higher")); @@ -449,7 +449,7 @@ public void testSelectLeastLoadedMlNode_jobWithRulesAndNodeMeetsRequiredVersion( cs.metaData(metaData); Job job = jobWithRules("job_with_rules"); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 30, memoryTracker, + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 10, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger); assertNotNull(result.getExecutorNode()); } @@ -539,7 +539,7 @@ public void testGetAssignment_GivenJobThatRequiresMigration() { ClusterService clusterService = mock(ClusterService.class); ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT, - MachineLearning.MAX_LAZY_ML_NODES) + MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE) ); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); @@ -556,7 +556,7 @@ public void testGetAssignment_GivenUnavailableIndicesWithLazyNode() { ClusterService clusterService = mock(ClusterService.class); ClusterSettings clusterSettings = new ClusterSettings(settings, Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT, - MachineLearning.MAX_LAZY_ML_NODES) + MachineLearning.MAX_LAZY_ML_NODES, MachineLearning.MAX_OPEN_JOBS_PER_NODE) ); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java index f79e9f1e4e945..95106385a9e22 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.settings.Settings; @@ -23,7 +25,6 @@ import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; public class TooManyJobsIT extends BaseMlIntegTestCase { @@ -66,10 +67,10 @@ public void testLazyNodeValidation() throws Exception { int maxNumberOfJobsPerNode = 1; int maxNumberOfLazyNodes = 2; internalCluster().ensureAtMostNumDataNodes(0); - logger.info("[{}] is [{}]", AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode); + logger.info("[{}] is [{}]", MachineLearning.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode); for (int i = 0; i < numNodes; i++) { internalCluster().startNode(Settings.builder() - .put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode)); + .put(MachineLearning.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode)); } logger.info("Started [{}] nodes", numNodes); ensureStableCluster(numNodes); @@ -111,7 +112,7 @@ public void testLazyNodeValidation() throws Exception { // Add another Node so we can get allocated internalCluster().startNode(Settings.builder() - .put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode)); + .put(MachineLearning.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode)); ensureStableCluster(numNodes+1); // We should automatically get allocated and opened to new node @@ -124,15 +125,15 @@ public void testLazyNodeValidation() throws Exception { } public void testSingleNode() throws Exception { - verifyMaxNumberOfJobsLimit(1, randomIntBetween(1, 20)); + verifyMaxNumberOfJobsLimit(1, randomIntBetween(1, 20), randomBoolean()); } public void testMultipleNodes() throws Exception { - verifyMaxNumberOfJobsLimit(3, randomIntBetween(1, 20)); + verifyMaxNumberOfJobsLimit(3, randomIntBetween(1, 20), randomBoolean()); } - private void verifyMaxNumberOfJobsLimit(int numNodes, int maxNumberOfJobsPerNode) throws Exception { - startMlCluster(numNodes, maxNumberOfJobsPerNode); + private void verifyMaxNumberOfJobsLimit(int numNodes, int maxNumberOfJobsPerNode, boolean testDynamicChange) throws Exception { + startMlCluster(numNodes, testDynamicChange ? 1 : maxNumberOfJobsPerNode); long maxMlMemoryPerNode = calculateMaxMlMemory(); ByteSizeValue jobModelMemoryLimit = new ByteSizeValue(2, ByteSizeUnit.MB); long memoryFootprintPerJob = jobModelMemoryLimit.getBytes() + Job.PROCESS_MEMORY_OVERHEAD.getBytes(); @@ -140,6 +141,11 @@ private void verifyMaxNumberOfJobsLimit(int numNodes, int maxNumberOfJobsPerNode int clusterWideMaxNumberOfJobs = numNodes * maxNumberOfJobsPerNode; boolean expectMemoryLimitBeforeCountLimit = maxJobsPerNodeDueToMemoryLimit < maxNumberOfJobsPerNode; for (int i = 1; i <= (clusterWideMaxNumberOfJobs + 1); i++) { + if (i == 2 && testDynamicChange) { + ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings( + Settings.builder().put(MachineLearning.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode).build()); + client().execute(ClusterUpdateSettingsAction.INSTANCE, clusterUpdateSettingsRequest).actionGet(); + } Job.Builder job = createJob("max-number-of-jobs-limit-job-" + Integer.toString(i), jobModelMemoryLimit); PutJobAction.Request putJobRequest = new PutJobAction.Request(job); client().execute(PutJobAction.INSTANCE, putJobRequest).get(); @@ -192,13 +198,13 @@ private void verifyMaxNumberOfJobsLimit(int numNodes, int maxNumberOfJobsPerNode fail("shouldn't be able to add more than [" + clusterWideMaxNumberOfJobs + "] jobs"); } - private void startMlCluster(int numNodes, int maxNumberOfJobsPerNode) throws Exception { + private void startMlCluster(int numNodes, int maxNumberOfWorkersPerNode) throws Exception { // clear all nodes, so that we can set xpack.ml.max_open_jobs setting: internalCluster().ensureAtMostNumDataNodes(0); - logger.info("[{}] is [{}]", AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode); + logger.info("[{}] is [{}]", MachineLearning.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfWorkersPerNode); for (int i = 0; i < numNodes; i++) { internalCluster().startNode(Settings.builder() - .put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfJobsPerNode)); + .put(MachineLearning.MAX_OPEN_JOBS_PER_NODE.getKey(), maxNumberOfWorkersPerNode)); } logger.info("Started [{}] nodes", numNodes); ensureStableCluster(numNodes); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 14b6d08514fad..ee02e5237c605 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -42,6 +43,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests; @@ -144,6 +146,9 @@ public void setup() throws Exception { normalizerFactory = mock(NormalizerFactory.class); auditor = mock(Auditor.class); clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = + new ClusterSettings(Settings.EMPTY, Collections.singleton(MachineLearning.MAX_OPEN_JOBS_PER_NODE)); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); MetaData metaData = mock(MetaData.class); SortedMap aliasOrIndexSortedMap = new TreeMap<>(); aliasOrIndexSortedMap.put(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), mock(AliasOrIndex.Alias.class)); @@ -170,23 +175,10 @@ public void setup() throws Exception { } @After - public void stopThreadPool() throws InterruptedException { + public void stopThreadPool() { terminate(threadPool); } - - public void testMaxOpenJobsSetting_givenDefault() { - int maxOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(Settings.EMPTY); - assertEquals(20, maxOpenJobs); - } - - public void testMaxOpenJobsSetting_givenNewSettingOnly() { - Settings.Builder settings = Settings.builder(); - settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 7); - int maxOpenJobs = AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.get(settings.build()); - assertEquals(7, maxOpenJobs); - } - public void testOpenJob() { Client client = mock(Client.class); AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); @@ -207,7 +199,6 @@ public void testOpenJob() { verify(jobTask).updatePersistentTaskState(eq(new JobTaskState(JobState.OPENED, 1L, null)), any()); } - public void testOpenJob_withoutVersion() { Client client = mock(Client.class); AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); @@ -259,7 +250,7 @@ public void testOpenJob_exceedMaxNumJobs() { AutodetectProcessFactory autodetectProcessFactory = (j, autodetectParams, e, onProcessCrash) -> autodetectProcess; Settings.Builder settings = Settings.builder(); - settings.put(AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE.getKey(), 3); + settings.put(MachineLearning.MAX_OPEN_JOBS_PER_NODE.getKey(), 3); AutodetectProcessManager manager = spy(new AutodetectProcessManager(environment, settings.build(), client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService)); @@ -571,7 +562,7 @@ public void testKillKillsAutodetectProcess() throws IOException { verify(communicator).killProcess(false, false, true); } - public void testKillingAMissingJobFinishesTheTask() throws IOException { + public void testKillingAMissingJobFinishesTheTask() { AutodetectCommunicator communicator = mock(AutodetectCommunicator.class); AutodetectProcessManager manager = createManager(communicator); JobTask jobTask = mock(JobTask.class);