Skip to content

Commit 4dde64e

Browse files
[ML] Extract common native process base class (#34856)
We currently have two different native processes: autodetect & normalizer. There are plans for introducing a new process. All these share many things in common. This commit refactors the processes to extend an `AbstractNativeProcess` class that encapsulates those commonalities with the purpose of reusing the code for new processes in the future.
1 parent 359cbf1 commit 4dde64e

File tree

46 files changed

+560
-492
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+560
-492
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Detector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import org.elasticsearch.common.xcontent.XContentBuilder;
1818
import org.elasticsearch.common.xcontent.XContentParser;
1919
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
20-
import org.elasticsearch.xpack.core.ml.job.process.autodetect.writer.RecordWriter;
20+
import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter;
2121
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
2222
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
2323

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* or more contributor license agreements. Licensed under the Elastic License;
44
* you may not use this file except in compliance with the Elastic License.
55
*/
6-
package org.elasticsearch.xpack.core.ml.job.process.autodetect.writer;
6+
package org.elasticsearch.xpack.core.ml.process.writer;
77

88
import java.io.IOException;
99
import java.util.List;

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import org.elasticsearch.test.AbstractSerializingTestCase;
1313
import org.elasticsearch.test.ESTestCase;
1414
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
15-
import org.elasticsearch.xpack.core.ml.job.process.autodetect.writer.RecordWriter;
15+
import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter;
1616

1717
import java.util.ArrayList;
1818
import java.util.Arrays;

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/DetectorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import org.elasticsearch.test.AbstractSerializingTestCase;
1313
import org.elasticsearch.test.ESTestCase;
1414
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
15-
import org.elasticsearch.xpack.core.ml.job.process.autodetect.writer.RecordWriter;
15+
import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter;
1616

1717
import java.util.ArrayList;
1818
import java.util.Arrays;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,6 @@
169169
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
170170
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
171171
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
172-
import org.elasticsearch.xpack.ml.job.process.NativeController;
173-
import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
174172
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectBuilder;
175173
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessFactory;
176174
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
@@ -181,6 +179,8 @@
181179
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
182180
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
183181
import org.elasticsearch.xpack.ml.notifications.Auditor;
182+
import org.elasticsearch.xpack.ml.process.NativeController;
183+
import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
184184
import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction;
185185
import org.elasticsearch.xpack.ml.rest.RestFindFileStructureAction;
186186
import org.elasticsearch.xpack.ml.rest.RestMlInfoAction;
@@ -391,7 +391,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
391391
nativeController,
392392
client,
393393
clusterService);
394-
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, settings, nativeController);
394+
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController);
395395
} catch (IOException e) {
396396
// This also should not happen in production, as the MachineLearningFeatureSet should have
397397
// hit the same error first and brought down the node with a friendlier error message
@@ -401,8 +401,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
401401
autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) ->
402402
new BlackHoleAutodetectProcess(job.getId());
403403
// factor of 1.0 makes renormalization a no-op
404-
normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) ->
405-
new MultiplyingNormalizerProcess(settings, 1.0);
404+
normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0);
406405
}
407406
NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
408407
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
3333
import org.elasticsearch.xpack.core.ml.job.config.Job;
3434
import org.elasticsearch.xpack.core.ml.job.config.JobState;
35-
import org.elasticsearch.xpack.ml.job.process.NativeController;
36-
import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
35+
import org.elasticsearch.xpack.ml.process.NativeController;
36+
import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
3737
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
3838
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
3939
import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
import org.elasticsearch.common.component.LifecycleListener;
1111
import org.elasticsearch.env.Environment;
1212
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
13-
import org.elasticsearch.xpack.ml.job.process.NativeController;
14-
import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
13+
import org.elasticsearch.xpack.ml.process.NativeController;
14+
import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
1515
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
1616

1717
import java.io.IOException;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
2020
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
2121
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
22-
import org.elasticsearch.xpack.ml.job.process.NativeController;
22+
import org.elasticsearch.xpack.ml.process.NativeController;
2323
import org.elasticsearch.xpack.ml.job.process.ProcessBuilderUtils;
24-
import org.elasticsearch.xpack.ml.job.process.ProcessPipes;
24+
import org.elasticsearch.xpack.ml.process.ProcessPipes;
2525
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AnalysisLimitsWriter;
2626
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.FieldConfigWriter;
2727
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ModelPlotConfigWriter;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ public void forecastJob(ForecastParams params, BiConsumer<Void, Exception> handl
263263

264264
public void persistJob(BiConsumer<Void, Exception> handler) {
265265
submitOperation(() -> {
266-
autodetectProcess.persistJob();
266+
autodetectProcess.persistState();
267267
return null;
268268
}, handler);
269269
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java

Lines changed: 3 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,22 @@
1010
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
1111
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
1212
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
13+
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
1314
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
1415
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
1516
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
1617
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
17-
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
1818
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
19+
import org.elasticsearch.xpack.ml.process.NativeProcess;
1920

20-
import java.io.Closeable;
2121
import java.io.IOException;
22-
import java.time.ZonedDateTime;
2322
import java.util.Iterator;
2423
import java.util.List;
2524

2625
/**
2726
* Interface representing the native C++ autodetect process
2827
*/
29-
public interface AutodetectProcess extends Closeable {
28+
public interface AutodetectProcess extends NativeProcess {
3029

3130
/**
3231
* Restore state from the given {@link ModelSnapshot}
@@ -35,22 +34,6 @@ public interface AutodetectProcess extends Closeable {
3534
*/
3635
void restoreState(StateStreamer stateStreamer, ModelSnapshot modelSnapshot);
3736

38-
/**
39-
* Is the process ready to receive data?
40-
* @return {@code true} if the process is ready to receive data
41-
*/
42-
boolean isReady();
43-
44-
/**
45-
* Write the record to autodetect. The record parameter should not be encoded
46-
* (i.e. length encoded); the implementation will apply the correct encoding.
47-
*
48-
* @param record Plain array of strings, implementors of this class should
49-
* encode the record appropriately
50-
* @throws IOException If the write failed
51-
*/
52-
void writeRecord(String[] record) throws IOException;
53-
5437
/**
5538
* Write the reset buckets control message
5639
*
@@ -115,60 +98,8 @@ void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rule
11598
*/
11699
void forecastJob(ForecastParams params) throws IOException;
117100

118-
/**
119-
* Ask the job to start persisting model state in the background
120-
* @throws IOException If writing the request fails
121-
*/
122-
void persistJob() throws IOException;
123-
124-
/**
125-
* Flush the output data stream
126-
*/
127-
void flushStream() throws IOException;
128-
129-
/**
130-
* Kill the process. Do not wait for it to stop gracefully.
131-
*/
132-
void kill() throws IOException;
133-
134101
/**
135102
* @return stream of autodetect results.
136103
*/
137104
Iterator<AutodetectResult> readAutodetectResults();
138-
139-
/**
140-
* The time the process was started
141-
* @return Process start time
142-
*/
143-
ZonedDateTime getProcessStartTime();
144-
145-
/**
146-
* Returns true if the process still running.
147-
* Methods such as {@link #flushJob(FlushJobParams)} are essentially
148-
* asynchronous: the command will continue to execute in the process after
149-
* the call has returned. This method tests whether something catastrophic
150-
* occurred in the process during its execution.
151-
* @return True if the process is still running
152-
*/
153-
boolean isProcessAlive();
154-
155-
/**
156-
* Check whether autodetect has terminated, allowing a maximum of 45ms for termination
157-
*
158-
* Processing errors are highly likely caused by autodetect being unexpectedly
159-
* terminated.
160-
*
161-
* Workaround: As we can not easily check if autodetect is alive, we rely on
162-
* the logPipe being ended. As the loghandler runs in another thread which
163-
* might fall behind this one, we give it a grace period of 45ms.
164-
*
165-
* @return false if process has ended for sure, true if it probably still runs
166-
*/
167-
boolean isProcessAliveAfterWaiting();
168-
169-
/**
170-
* Read any content in the error output buffer.
171-
* @return An error message or empty String if no error.
172-
*/
173-
String readError();
174105
}

0 commit comments

Comments
 (0)