Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.writer.RecordWriter;
import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.job.process.autodetect.writer;
package org.elasticsearch.xpack.core.ml.process.writer;

import java.io.IOException;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.writer.RecordWriter;
import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.writer.RecordWriter;
import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,6 @@
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.process.NativeController;
import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessFactory;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
Expand All @@ -180,6 +178,8 @@
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.rest.RestFindFileStructureAction;
import org.elasticsearch.xpack.ml.rest.RestMlInfoAction;
Expand Down Expand Up @@ -386,7 +386,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
nativeController,
client,
clusterService);
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, settings, nativeController);
normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController);
} catch (IOException e) {
// This also should not happen in production, as the MachineLearningFeatureSet should have
// hit the same error first and brought down the node with a friendlier error message
Expand All @@ -396,8 +396,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) ->
new BlackHoleAutodetectProcess(job.getId());
// factor of 1.0 makes renormalization a no-op
normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) ->
new MultiplyingNormalizerProcess(settings, 1.0);
normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0);
}
NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.NativeController;
import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.env.Environment;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.job.process.NativeController;
import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.ml.job.process.NativeController;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.job.process.ProcessBuilderUtils;
import org.elasticsearch.xpack.ml.job.process.ProcessPipes;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AnalysisLimitsWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.FieldConfigWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ModelPlotConfigWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ public void forecastJob(ForecastParams params, BiConsumer<Void, Exception> handl

public void persistJob(BiConsumer<Void, Exception> handler) {
submitOperation(() -> {
autodetectProcess.persistJob();
autodetectProcess.persistState();
return null;
}, handler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,22 @@
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.process.NativeProcess;

import java.io.Closeable;
import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.Iterator;
import java.util.List;

/**
* Interface representing the native C++ autodetect process
*/
public interface AutodetectProcess extends Closeable {
public interface AutodetectProcess extends NativeProcess {

/**
* Restore state from the given {@link ModelSnapshot}
Expand All @@ -35,22 +34,6 @@ public interface AutodetectProcess extends Closeable {
*/
void restoreState(StateStreamer stateStreamer, ModelSnapshot modelSnapshot);

/**
* Is the process ready to receive data?
* @return {@code true} if the process is ready to receive data
*/
boolean isReady();

/**
* Write the record to autodetect. The record parameter should not be encoded
* (i.e. length encoded) the implementation will appy the corrrect encoding.
*
* @param record Plain array of strings, implementors of this class should
* encode the record appropriately
* @throws IOException If the write failed
*/
void writeRecord(String[] record) throws IOException;

/**
* Write the reset buckets control message
*
Expand Down Expand Up @@ -115,60 +98,8 @@ void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rule
*/
void forecastJob(ForecastParams params) throws IOException;

/**
* Ask the job to start persisting model state in the background
* @throws IOException If writing the request fails
*/
void persistJob() throws IOException;

/**
* Flush the output data stream
*/
void flushStream() throws IOException;

/**
* Kill the process. Do not wait for it to stop gracefully.
*/
void kill() throws IOException;

/**
* @return stream of autodetect results.
*/
Iterator<AutodetectResult> readAutodetectResults();

/**
* The time the process was started
* @return Process start time
*/
ZonedDateTime getProcessStartTime();

/**
* Returns true if the process still running.
* Methods such as {@link #flushJob(FlushJobParams)} are essentially
* asynchronous the command will be continue to execute in the process after
* the call has returned. This method tests whether something catastrophic
* occurred in the process during its execution.
* @return True if the process is still running
*/
boolean isProcessAlive();

/**
* Check whether autodetect terminated given maximum 45ms for termination
*
* Processing errors are highly likely caused by autodetect being unexpectedly
* terminated.
*
* Workaround: As we can not easily check if autodetect is alive, we rely on
* the logPipe being ended. As the loghandler runs in another thread which
* might fall behind this one, we give it a grace period of 45ms.
*
* @return false if process has ended for sure, true if it probably still runs
*/
boolean isProcessAliveAfterWaiting();

/**
* Read any content in the error output buffer.
* @return An error message or empty String if no error.
*/
String readError();
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.NativeStorageProvider;
import org.elasticsearch.xpack.ml.process.NativeStorageProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public String flushJob(FlushJobParams params) throws IOException {
}

@Override
public void persistJob() {
public void persistState() {
}

@Override
Expand Down
Loading