Skip to content

Commit c95d181

Browse files
committed
Make certain ML node settings dynamic (#33565) (#33961)
* Make certain ML node settings dynamic (#33565) * Changing to pull in updating settings and pass to constructor * adding note about only newly opened jobs getting updated value
1 parent 8233e31 commit c95d181

File tree

9 files changed

+166
-44
lines changed

9 files changed

+166
-44
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,8 +297,11 @@ public List<Setting<?>> getSettings() {
297297
MAX_MACHINE_MEMORY_PERCENT,
298298
AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING,
299299
AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING,
300+
AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC,
300301
DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING,
302+
DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING,
301303
DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING,
304+
DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING,
302305
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE,
303306
AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE,
304307
AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP));
@@ -384,7 +387,12 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
384387
// This will only happen when path.home is not set, which is disallowed in production
385388
throw new ElasticsearchException("Failed to create native process controller for Machine Learning");
386389
}
387-
autodetectProcessFactory = new NativeAutodetectProcessFactory(environment, settings, nativeController, client);
390+
autodetectProcessFactory = new NativeAutodetectProcessFactory(
391+
environment,
392+
settings,
393+
nativeController,
394+
client,
395+
clusterService);
388396
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, settings, nativeController);
389397
} catch (IOException e) {
390398
// This also should not happen in production, as the MachineLearningFeatureSet should have
@@ -402,7 +410,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
402410
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
403411
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(env, settings, client, threadPool,
404412
jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
405-
normalizerFactory, xContentRegistry, auditor);
413+
normalizerFactory, xContentRegistry, auditor, clusterService);
406414
this.autodetectProcessManager.set(autodetectProcessManager);
407415
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis);
408416
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/DataCountsReporter.java

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.elasticsearch.xpack.ml.job.process;
77

88
import org.elasticsearch.action.ActionListener;
9+
import org.elasticsearch.cluster.service.ClusterService;
910
import org.elasticsearch.common.component.AbstractComponent;
1011
import org.elasticsearch.common.settings.Setting;
1112
import org.elasticsearch.common.settings.Setting.Property;
@@ -42,15 +43,28 @@ public class DataCountsReporter extends AbstractComponent {
4243
* The max percentage of date parse errors allowed before
4344
* an exception is thrown.
4445
*/
46+
@Deprecated
4547
public static final Setting<Integer> ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING = Setting.intSetting("max.percent.date.errors", 25,
46-
Property.NodeScope);
47-
48+
Property.NodeScope, Property.Deprecated);
49+
public static final Setting<Integer> MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING = Setting.intSetting(
50+
"xpack.ml.max_percent_date_errors",
51+
ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING,
52+
0,
53+
Property.Dynamic,
54+
Property.NodeScope);
4855
/**
4956
* The max percentage of out of order records allowed before
5057
* an exception is thrown.
5158
*/
59+
@Deprecated
5260
public static final Setting<Integer> ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING = Setting
53-
.intSetting("max.percent.outoforder.errors", 25, Property.NodeScope);
61+
.intSetting("max.percent.outoforder.errors", 25, Property.NodeScope, Property.Deprecated);
62+
public static final Setting<Integer> MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING = Setting.intSetting(
63+
"xpack.ml.max_percent_out_of_order_errors",
64+
ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING,
65+
0,
66+
Property.Dynamic,
67+
Property.NodeScope);
5468

5569
private static final TimeValue PERSIST_INTERVAL = TimeValue.timeValueMillis(10_000L);
5670

@@ -66,14 +80,15 @@ public class DataCountsReporter extends AbstractComponent {
6680
private long logEvery = 1;
6781
private long logCount = 0;
6882

69-
private final int acceptablePercentDateParseErrors;
70-
private final int acceptablePercentOutOfOrderErrors;
83+
private volatile int acceptablePercentDateParseErrors;
84+
private volatile int acceptablePercentOutOfOrderErrors;
7185

7286
private Function<Long, Boolean> reportingBoundaryFunction;
7387

7488
private DataStreamDiagnostics diagnostics;
7589

76-
public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobDataCountsPersister dataCountsPersister) {
90+
public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobDataCountsPersister dataCountsPersister,
91+
ClusterService clusterService) {
7792

7893
super(settings);
7994

@@ -84,9 +99,12 @@ public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobData
8499
incrementalRecordStats = new DataCounts(job.getId());
85100
diagnostics = new DataStreamDiagnostics(job, counts);
86101

87-
acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings);
88-
acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings);
89-
102+
acceptablePercentDateParseErrors = MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.get(settings);
103+
acceptablePercentOutOfOrderErrors = MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.get(settings);
104+
clusterService.getClusterSettings()
105+
.addSettingsUpdateConsumer(MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING, this::setAcceptablePercentDateParseErrors);
106+
clusterService.getClusterSettings()
107+
.addSettingsUpdateConsumer(MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING, this::setAcceptablePercentOutOfOrderErrors);
90108
reportingBoundaryFunction = this::reportEvery10000Records;
91109
}
92110

@@ -352,4 +370,17 @@ private void retrieveDiagnosticsIntermediateResults() {
352370

353371
diagnostics.resetCounts();
354372
}
373+
374+
private void setAcceptablePercentDateParseErrors(int acceptablePercentDateParseErrors) {
375+
logger.info("Changing [{}] from [{}] to [{}]", MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.getKey(),
376+
this.acceptablePercentDateParseErrors, acceptablePercentDateParseErrors);
377+
this.acceptablePercentDateParseErrors = acceptablePercentDateParseErrors;
378+
}
379+
380+
private void setAcceptablePercentOutOfOrderErrors(int acceptablePercentOutOfOrderErrors) {
381+
logger.info("Changing [{}] from [{}] to [{}]", MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.getKey(),
382+
this.acceptablePercentOutOfOrderErrors, acceptablePercentOutOfOrderErrors);
383+
this.acceptablePercentOutOfOrderErrors = acceptablePercentOutOfOrderErrors;
384+
}
385+
355386
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,16 @@ public class AutodetectBuilder {
8383
/**
8484
* The maximum number of anomaly records that will be written each bucket
8585
*/
86+
@Deprecated
8687
public static final Setting<Integer> MAX_ANOMALY_RECORDS_SETTING = Setting.intSetting("max.anomaly.records", DEFAULT_MAX_NUM_RECORDS,
87-
Setting.Property.NodeScope);
88+
Setting.Property.NodeScope, Setting.Property.Deprecated);
89+
// Though this setting is dynamic, it is only set when a new job is opened. So, already running jobs will not get the updated value.
90+
public static final Setting<Integer> MAX_ANOMALY_RECORDS_SETTING_DYNAMIC = Setting.intSetting(
91+
"xpack.ml.max_anomaly_records",
92+
MAX_ANOMALY_RECORDS_SETTING,
93+
1,
94+
Setting.Property.NodeScope,
95+
Setting.Property.Dynamic);
8896

8997
/**
9098
* Config setting storing the flag that disables model persistence
@@ -244,9 +252,8 @@ List<String> buildAutodetectCommand() {
244252
return command;
245253
}
246254

247-
248255
static String maxAnomalyRecordsArg(Settings settings) {
249-
return "--maxAnomalyRecords=" + MAX_ANOMALY_RECORDS_SETTING.get(settings);
256+
return "--maxAnomalyRecords=" + MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.get(settings);
250257
}
251258

252259
private static String getTimeFieldOrDefault(Job job) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.ElasticsearchStatusException;
1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.client.Client;
12+
import org.elasticsearch.cluster.service.ClusterService;
1213
import org.elasticsearch.common.CheckedConsumer;
1314
import org.elasticsearch.common.collect.Tuple;
1415
import org.elasticsearch.common.component.AbstractComponent;
@@ -130,12 +131,13 @@ public class AutodetectProcessManager extends AbstractComponent {
130131
private final NamedXContentRegistry xContentRegistry;
131132

132133
private final Auditor auditor;
134+
private final ClusterService clusterService;
133135

134136
public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool,
135137
JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister,
136138
JobDataCountsPersister jobDataCountsPersister,
137139
AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory,
138-
NamedXContentRegistry xContentRegistry, Auditor auditor) {
140+
NamedXContentRegistry xContentRegistry, Auditor auditor, ClusterService clusterService) {
139141
super(settings);
140142
this.environment = environment;
141143
this.client = client;
@@ -150,6 +152,7 @@ public AutodetectProcessManager(Environment environment, Settings settings, Clie
150152
this.jobDataCountsPersister = jobDataCountsPersister;
151153
this.auditor = auditor;
152154
this.nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings));
155+
this.clusterService = clusterService;
153156
}
154157

155158
public void onNodeStartup() {
@@ -493,8 +496,11 @@ AutodetectCommunicator create(JobTask jobTask, AutodetectParams autodetectParams
493496
Job job = jobManager.getJobOrThrowIfUnknown(jobId);
494497
// A TP with no queue, so that we fail immediately if there are no threads available
495498
ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.AUTODETECT_THREAD_POOL_NAME);
496-
DataCountsReporter dataCountsReporter = new DataCountsReporter(settings, job, autodetectParams.dataCounts(),
497-
jobDataCountsPersister);
499+
DataCountsReporter dataCountsReporter = new DataCountsReporter(settings,
500+
job,
501+
autodetectParams.dataCounts(),
502+
jobDataCountsPersister,
503+
clusterService);
498504
ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobResultsProvider,
499505
new JobRenormalizedResultsPersister(job.getId(), settings, client), normalizerFactory);
500506
ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.apache.logging.log4j.Logger;
99
import org.elasticsearch.client.Client;
10+
import org.elasticsearch.cluster.service.ClusterService;
1011
import org.elasticsearch.common.logging.Loggers;
1112
import org.elasticsearch.common.settings.Settings;
1213
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
@@ -40,12 +41,15 @@ public class NativeAutodetectProcessFactory implements AutodetectProcessFactory
4041
private final Environment env;
4142
private final Settings settings;
4243
private final NativeController nativeController;
44+
private final ClusterService clusterService;
4345

44-
public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client) {
46+
public NativeAutodetectProcessFactory(Environment env, Settings settings, NativeController nativeController, Client client,
47+
ClusterService clusterService) {
4548
this.env = Objects.requireNonNull(env);
4649
this.settings = Objects.requireNonNull(settings);
4750
this.nativeController = Objects.requireNonNull(nativeController);
4851
this.client = client;
52+
this.clusterService = clusterService;
4953
}
5054

5155
@Override
@@ -85,8 +89,15 @@ public AutodetectProcess createAutodetectProcess(Job job,
8589
private void createNativeProcess(Job job, AutodetectParams autodetectParams, ProcessPipes processPipes,
8690
List<Path> filesToDelete) {
8791
try {
92+
93+
Settings updatedSettings = Settings.builder()
94+
.put(settings)
95+
.put(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC.getKey(),
96+
clusterService.getClusterSettings().get(AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC))
97+
.build();
98+
8899
AutodetectBuilder autodetectBuilder = new AutodetectBuilder(job, filesToDelete, LOGGER, env,
89-
settings, nativeController, processPipes)
100+
updatedSettings, nativeController, processPipes)
90101
.referencedFilters(autodetectParams.filters())
91102
.scheduledEvents(autodetectParams.scheduledEvents());
92103

@@ -95,7 +106,6 @@ private void createNativeProcess(Job job, AutodetectParams autodetectParams, Pro
95106
if (autodetectParams.quantiles() != null) {
96107
autodetectBuilder.quantiles(autodetectParams.quantiles());
97108
}
98-
99109
autodetectBuilder.build();
100110
processPipes.connectStreams(PROCESS_STARTUP_TIMEOUT);
101111
} catch (IOException e) {
@@ -104,5 +114,6 @@ private void createNativeProcess(Job job, AutodetectParams autodetectParams, Pro
104114
throw ExceptionsHelper.serverError(msg, e);
105115
}
106116
}
117+
107118
}
108119

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/CountingInputStreamTests.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,44 @@
55
*/
66
package org.elasticsearch.xpack.ml.job.process;
77

8+
import org.elasticsearch.cluster.service.ClusterService;
9+
import org.elasticsearch.common.settings.ClusterSettings;
10+
import org.elasticsearch.common.settings.Setting;
11+
import org.elasticsearch.common.settings.Settings;
812
import org.elasticsearch.test.ESTestCase;
13+
import org.junit.Before;
914

1015
import java.io.ByteArrayInputStream;
1116
import java.io.IOException;
1217
import java.io.InputStream;
1318
import java.nio.charset.StandardCharsets;
19+
import java.util.HashSet;
20+
import java.util.Set;
21+
22+
import static org.elasticsearch.mock.orig.Mockito.when;
23+
import static org.mockito.Mockito.mock;
1424

1525
public class CountingInputStreamTests extends ESTestCase {
1626

27+
private ClusterService clusterService;
28+
29+
@Before
30+
public void setUpMocks() {
31+
Settings settings = Settings.builder().put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING.getKey(), 10)
32+
.put(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING.getKey(), 10)
33+
.build();
34+
Set<Setting<?>> setOfSettings = new HashSet<>();
35+
setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_DATE_PARSE_ERRORS_SETTING);
36+
setOfSettings.add(DataCountsReporter.MAX_ACCEPTABLE_PERCENT_OF_OUT_OF_ORDER_ERRORS_SETTING);
37+
ClusterSettings clusterSettings = new ClusterSettings(settings, setOfSettings);
38+
39+
clusterService = mock(ClusterService.class);
40+
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
41+
}
42+
1743
public void testRead_OneByteAtATime() throws IOException {
1844

19-
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
45+
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);
2046

2147
final String TEXT = "123";
2248
InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));
@@ -30,7 +56,7 @@ public void testRead_OneByteAtATime() throws IOException {
3056
public void testRead_WithBuffer() throws IOException {
3157
final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail.";
3258

33-
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
59+
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);
3460

3561
InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));
3662

@@ -44,7 +70,7 @@ public void testRead_WithBuffer() throws IOException {
4470
public void testRead_WithTinyBuffer() throws IOException {
4571
final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail.";
4672

47-
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
73+
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);
4874

4975
InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));
5076

@@ -57,7 +83,7 @@ public void testRead_WithTinyBuffer() throws IOException {
5783

5884
public void testRead_WithResets() throws IOException {
5985

60-
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter();
86+
DummyDataCountsReporter dataCountsReporter = new DummyDataCountsReporter(clusterService);
6187

6288
final String TEXT = "To the man who only has a hammer, everything he encounters begins to look like a nail.";
6389
InputStream source = new ByteArrayInputStream(TEXT.getBytes(StandardCharsets.UTF_8));

0 commit comments

Comments
 (0)