diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Detector.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Detector.java index b6275c6e0579a..d53e4cb74126d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Detector.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Detector.java @@ -16,7 +16,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.core.ml.job.messages.Messages; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.writer.RecordWriter; +import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/writer/RecordWriter.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/process/writer/RecordWriter.java similarity index 93% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/writer/RecordWriter.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/process/writer/RecordWriter.java index 61b904246d50f..b66fd948a5a83 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/process/autodetect/writer/RecordWriter.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/process/writer/RecordWriter.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.core.ml.job.process.autodetect.writer; +package org.elasticsearch.xpack.core.ml.process.writer; import java.io.IOException; import java.util.List; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java index d691124a90a43..8843a336bde3d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java @@ -12,7 +12,7 @@ import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.job.messages.Messages; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.writer.RecordWriter; +import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter; import java.util.ArrayList; import java.util.Arrays; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/DetectorTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/DetectorTests.java index 2f7eab0e97c70..fe546a371816d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/DetectorTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/DetectorTests.java @@ -12,7 +12,7 @@ import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.job.messages.Messages; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.writer.RecordWriter; +import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter; import java.util.ArrayList; import java.util.Arrays; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index cdd3af133f6dc..2e90e678351c4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -168,8 +168,6 @@ import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; -import org.elasticsearch.xpack.ml.job.process.NativeController; -import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectBuilder; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessFactory; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -180,6 +178,8 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.ml.process.NativeController; +import org.elasticsearch.xpack.ml.process.NativeControllerHolder; import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction; import org.elasticsearch.xpack.ml.rest.RestFindFileStructureAction; import org.elasticsearch.xpack.ml.rest.RestMlInfoAction; @@ -386,7 +386,7 @@ public Collection createComponents(Client client, ClusterService cluster nativeController, client, clusterService); - normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, settings, nativeController); + normalizerProcessFactory = new NativeNormalizerProcessFactory(environment, nativeController); } catch (IOException e) { // This also should not happen in production, as the MachineLearningFeatureSet should have // hit the same error first and brought down the node with a friendlier error message @@ -396,8 +396,7 @@ public Collection createComponents(Client client, ClusterService cluster autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) -> new BlackHoleAutodetectProcess(job.getId()); // factor of 1.0 makes renormalization a no-op - normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> - new MultiplyingNormalizerProcess(settings, 1.0); + normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0); } NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory, threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java index b5ff2e2a7de6e..d9b8ea7cd4226 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java @@ -31,8 +31,8 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.ml.job.process.NativeController; -import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder; +import org.elasticsearch.xpack.ml.process.NativeController; +import org.elasticsearch.xpack.ml.process.NativeControllerHolder; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.stats.ForecastStats; import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java index b0a0eebc49df3..efc0517900ec4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java @@ -10,8 +10,8 @@ import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; -import org.elasticsearch.xpack.ml.job.process.NativeController; -import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder; +import org.elasticsearch.xpack.ml.process.NativeController; +import org.elasticsearch.xpack.ml.process.NativeControllerHolder; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import java.io.IOException; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java index 4942200606dba..dbc565fc50c12 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilder.java @@ -19,9 +19,9 @@ import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles; -import org.elasticsearch.xpack.ml.job.process.NativeController; +import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.job.process.ProcessBuilderUtils; -import org.elasticsearch.xpack.ml.job.process.ProcessPipes; +import org.elasticsearch.xpack.ml.process.ProcessPipes; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AnalysisLimitsWriter; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.FieldConfigWriter; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ModelPlotConfigWriter; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 0206bd88245b3..3f93d46b72737 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -264,7 +264,7 @@ public void forecastJob(ForecastParams params, BiConsumer handl public void persistJob(BiConsumer handler) { submitOperation(() -> { - autodetectProcess.persistJob(); + autodetectProcess.persistState(); return null; }, handler); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java index 21be815d561a8..dab0c5aa49872 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java @@ -10,23 +10,22 @@ import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; +import org.elasticsearch.xpack.ml.process.NativeProcess; -import java.io.Closeable; import java.io.IOException; -import java.time.ZonedDateTime; import java.util.Iterator; import java.util.List; /** * Interface representing the native C++ autodetect process */ -public interface AutodetectProcess extends Closeable { +public interface AutodetectProcess extends NativeProcess { /** * Restore state from the given {@link ModelSnapshot} @@ -35,22 +34,6 @@ public interface AutodetectProcess extends Closeable { */ void restoreState(StateStreamer stateStreamer, ModelSnapshot modelSnapshot); - /** - * Is the process ready to receive data? - * @return {@code true} if the process is ready to receive data - */ - boolean isReady(); - - /** - * Write the record to autodetect. The record parameter should not be encoded - * (i.e. length encoded) the implementation will appy the corrrect encoding. - * - * @param record Plain array of strings, implementors of this class should - * encode the record appropriately - * @throws IOException If the write failed - */ - void writeRecord(String[] record) throws IOException; - /** * Write the reset buckets control message * @@ -115,60 +98,8 @@ void writeUpdateDetectorRulesMessage(int detectorIndex, List rule */ void forecastJob(ForecastParams params) throws IOException; - /** - * Ask the job to start persisting model state in the background - * @throws IOException If writing the request fails - */ - void persistJob() throws IOException; - - /** - * Flush the output data stream - */ - void flushStream() throws IOException; - - /** - * Kill the process. Do not wait for it to stop gracefully. - */ - void kill() throws IOException; - /** * @return stream of autodetect results. */ Iterator readAutodetectResults(); - - /** - * The time the process was started - * @return Process start time - */ - ZonedDateTime getProcessStartTime(); - - /** - * Returns true if the process still running. - * Methods such as {@link #flushJob(FlushJobParams)} are essentially - * asynchronous the command will be continue to execute in the process after - * the call has returned. This method tests whether something catastrophic - * occurred in the process during its execution. - * @return True if the process is still running - */ - boolean isProcessAlive(); - - /** - * Check whether autodetect terminated given maximum 45ms for termination - * - * Processing errors are highly likely caused by autodetect being unexpectedly - * terminated. - * - * Workaround: As we can not easily check if autodetect is alive, we rely on - * the logPipe being ended. As the loghandler runs in another thread which - * might fall behind this one, we give it a grace period of 45ms. - * - * @return false if process has ended for sure, true if it probably still runs - */ - boolean isProcessAliveAfterWaiting(); - - /** - * Read any content in the error output buffer. - * @return An error message or empty String if no error. - */ - String readError(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index ec6b67da1dca8..8dbc13038c7f7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -50,7 +50,7 @@ import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; -import org.elasticsearch.xpack.ml.job.process.NativeStorageProvider; +import org.elasticsearch.xpack.ml.process.NativeStorageProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java index 8ff54e80785c5..e1b69d78894db 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/BlackHoleAutodetectProcess.java @@ -96,7 +96,7 @@ public String flushJob(FlushJobParams params) throws IOException { } @Override - public void persistJob() { + public void persistState() { } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java index faae29fd1eb56..112805b2f7414 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcess.java @@ -5,300 +5,116 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.job.config.DetectionRule; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; -import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter; -import org.elasticsearch.xpack.ml.job.process.autodetect.writer.LengthEncodedWriter; -import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.process.AbstractNativeProcess; -import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.file.Files; import java.nio.file.Path; -import java.time.Duration; -import java.time.ZonedDateTime; import java.util.Iterator; import java.util.List; -import java.util.Objects; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * Autodetect process using native code. */ -class NativeAutodetectProcess implements AutodetectProcess { - private static final Logger LOGGER = Loggers.getLogger(NativeAutodetectProcess.class); +class NativeAutodetectProcess extends AbstractNativeProcess implements AutodetectProcess { - private static final Duration WAIT_FOR_KILL_TIMEOUT = Duration.ofMillis(1000); + private static final Logger LOGGER = LogManager.getLogger(NativeAutodetectProcess.class); + + private static final String NAME = "autodetect"; - private final String jobId; - private final CppLogMessageHandler cppLogHandler; - private final OutputStream processInStream; - private final InputStream processOutStream; - private final OutputStream processRestoreStream; - private final LengthEncodedWriter recordWriter; - private final ZonedDateTime startTime; - private final int numberOfFields; - private final List filesToDelete; - private final Runnable onProcessCrash; - private volatile Future logTailFuture; - private volatile Future stateProcessorFuture; - private volatile boolean processCloseInitiated; - private volatile boolean processKilled; - private volatile boolean isReady; private final AutodetectResultsParser resultsParser; NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, List filesToDelete, AutodetectResultsParser resultsParser, Runnable onProcessCrash) { - this.jobId = jobId; - cppLogHandler = new CppLogMessageHandler(jobId, logStream); - this.processInStream = new BufferedOutputStream(processInStream); - this.processOutStream = processOutStream; - this.processRestoreStream = processRestoreStream; - this.recordWriter = new LengthEncodedWriter(this.processInStream); - startTime = ZonedDateTime.now(); - this.numberOfFields = numberOfFields; - this.filesToDelete = filesToDelete; + super(jobId, logStream, processInStream, processOutStream, processRestoreStream, numberOfFields, filesToDelete, onProcessCrash); this.resultsParser = resultsParser; - this.onProcessCrash = Objects.requireNonNull(onProcessCrash); } - public void start(ExecutorService executorService, StateProcessor stateProcessor, InputStream persistStream) { - logTailFuture = executorService.submit(() -> { - try (CppLogMessageHandler h = cppLogHandler) { - h.tailStream(); - } catch (IOException e) { - if (processKilled == false) { - LOGGER.error(new ParameterizedMessage("[{}] Error tailing autodetect process logs", jobId), e); - } - } finally { - if (processCloseInitiated == false && processKilled == false) { - // The log message doesn't say "crashed", as the process could have been killed - // by a user or other process (e.g. the Linux OOM killer) - - String errors = cppLogHandler.getErrors(); - LOGGER.error("[{}] autodetect process stopped unexpectedly: {}", jobId, errors); - onProcessCrash.run(); - } - } - }); - stateProcessorFuture = executorService.submit(() -> { - try (InputStream in = persistStream) { - stateProcessor.process(jobId, in); - if (processKilled == false) { - LOGGER.info("[{}] State output finished", jobId); - } - } catch (IOException e) { - if (processKilled == false) { - LOGGER.error(new ParameterizedMessage("[{}] Error reading autodetect state output", jobId), e); - } - } - }); + @Override + public String getName() { + return NAME; } @Override public void restoreState(StateStreamer stateStreamer, ModelSnapshot modelSnapshot) { if (modelSnapshot != null) { - try (OutputStream r = processRestoreStream) { - stateStreamer.restoreStateToStream(jobId, modelSnapshot, r); + try (OutputStream r = processRestoreStream()) { + stateStreamer.restoreStateToStream(jobId(), modelSnapshot, r); } catch (Exception e) { // TODO: should we fail to start? - if (processKilled == false) { - LOGGER.error("Error restoring model state for job " + jobId, e); + if (isProcessKilled() == false) { + LOGGER.error("Error restoring model state for job " + jobId(), e); } } } - isReady = true; - } - - @Override - public boolean isReady() { - return isReady; - } - - @Override - public void writeRecord(String[] record) throws IOException { - recordWriter.writeRecord(record); + setReady(); } @Override public void writeResetBucketsControlMessage(DataLoadParams params) throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields); - writer.writeResetBucketsMessage(params); + newMessageWriter().writeResetBucketsMessage(params); } @Override public void writeUpdateModelPlotMessage(ModelPlotConfig modelPlotConfig) throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields); - writer.writeUpdateModelPlotMessage(modelPlotConfig); + newMessageWriter().writeUpdateModelPlotMessage(modelPlotConfig); } @Override public void writeUpdateDetectorRulesMessage(int detectorIndex, List rules) throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields); - writer.writeUpdateDetectorRulesMessage(detectorIndex, rules); + newMessageWriter().writeUpdateDetectorRulesMessage(detectorIndex, rules); } @Override public void writeUpdateFiltersMessage(List filters) throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields); - writer.writeUpdateFiltersMessage(filters); + newMessageWriter().writeUpdateFiltersMessage(filters); } @Override public void writeUpdateScheduledEventsMessage(List events, TimeValue bucketSpan) throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields); - writer.writeUpdateScheduledEventsMessage(events, bucketSpan); + newMessageWriter().writeUpdateScheduledEventsMessage(events, bucketSpan); } @Override public String flushJob(FlushJobParams params) throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields); + ControlMsgToProcessWriter writer = newMessageWriter(); writer.writeFlushControlMessage(params); return writer.writeFlushMessage(); } @Override public void forecastJob(ForecastParams params) throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields); - writer.writeForecastMessage(params); - } - - @Override - public void persistJob() throws IOException { - ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(recordWriter, numberOfFields); - writer.writeStartBackgroundPersistMessage(); - } - - @Override - public void flushStream() throws IOException { - recordWriter.flush(); - } - - @Override - public void close() throws IOException { - try { - processCloseInitiated = true; - // closing its input causes the process to exit - processInStream.close(); - // wait for the process to exit by waiting for end-of-file on the named pipe connected - // to the state processor - it may take a long time for all the model state to be - // indexed - if (stateProcessorFuture != null) { - stateProcessorFuture.get(MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT.getMinutes(), TimeUnit.MINUTES); - } - // the log processor should have stopped by now too - assume processing the logs will - // take no more than 5 seconds longer than processing the state (usually it should - // finish first) - if (logTailFuture != null) { - logTailFuture.get(5, TimeUnit.SECONDS); - } - - if (cppLogHandler.seenFatalError()) { - throw ExceptionsHelper.serverError(cppLogHandler.getErrors()); - } - LOGGER.debug("[{}] Autodetect process exited", jobId); - } catch (ExecutionException | TimeoutException e) { - LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running autodetect process", jobId), e); - } catch (InterruptedException e) { - LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running autodetect process", jobId), e); - Thread.currentThread().interrupt(); - } finally { - deleteAssociatedFiles(); - } + newMessageWriter().writeForecastMessage(params); } @Override - public void kill() throws IOException { - processKilled = true; - try { - // The PID comes via the processes log stream. We don't wait for it to arrive here, - // but if the wait times out it implies the process has only just started, in which - // case it should die very quickly when we close its input stream. - NativeControllerHolder.getNativeController().killProcess(cppLogHandler.getPid(Duration.ZERO)); - - // Wait for the process to die before closing processInStream as if the process - // is still alive when processInStream is closed autodetect will start persisting state - cppLogHandler.waitForLogStreamClose(WAIT_FOR_KILL_TIMEOUT); - } catch (TimeoutException e) { - LOGGER.warn("[{}] Failed to get PID of autodetect process to kill", jobId); - } finally { - try { - processInStream.close(); - } catch (IOException e) { - // Ignore it - we're shutting down and the method itself has logged a warning - } - try { - deleteAssociatedFiles(); - } catch (IOException e) { - // Ignore it - we're shutting down and the method itself has logged a warning - } - } - } - - private synchronized void deleteAssociatedFiles() throws IOException { - if (filesToDelete == null) { - return; - } - - for (Path fileToDelete : filesToDelete) { - if (Files.deleteIfExists(fileToDelete)) { - LOGGER.debug("[{}] Deleted file {}", jobId, fileToDelete.toString()); - } else { - LOGGER.warn("[{}] Failed to delete file {}", jobId, fileToDelete.toString()); - } - } - - filesToDelete.clear(); + public void persistState() throws IOException { + newMessageWriter().writeStartBackgroundPersistMessage(); } @Override public Iterator readAutodetectResults() { - return resultsParser.parseResults(processOutStream); + return resultsParser.parseResults(processOutStream()); } - @Override - public ZonedDateTime getProcessStartTime() { - return startTime; - } - - @Override - public boolean isProcessAlive() { - // Sanity check: make sure the process hasn't terminated already - return !cppLogHandler.hasLogStreamEnded(); - } - - @Override - public boolean isProcessAliveAfterWaiting() { - cppLogHandler.waitForLogStreamClose(Duration.ofMillis(45)); - return isProcessAlive(); - } - - @Override - public String readError() { - return cppLogHandler.getErrors(); + private ControlMsgToProcessWriter newMessageWriter() { + return new ControlMsgToProcessWriter(recordWriter(), numberOfFields()); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java index 06055476f7642..ea31c5de4dffa 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessFactory.java @@ -16,10 +16,10 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.job.process.NativeController; -import org.elasticsearch.xpack.ml.job.process.ProcessPipes; +import org.elasticsearch.xpack.ml.process.NativeController; +import org.elasticsearch.xpack.ml.process.ProcessPipes; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor; +import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectStateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; @@ -67,7 +67,7 @@ public AutodetectProcess createAutodetectProcess(Job job, // The extra 1 is the control field int numberOfFields = job.allInputFields().size() + (includeTokensField ? 1 : 0) + 1; - StateProcessor stateProcessor = new StateProcessor(settings, client); + AutodetectStateProcessor stateProcessor = new AutodetectStateProcessor(client, job.getId()); AutodetectResultsParser resultsParser = new AutodetectResultsParser(settings); NativeAutodetectProcess autodetect = new NativeAutodetectProcess( job.getId(), processPipes.getLogStream().get(), processPipes.getProcessInStream().get(), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessor.java similarity index 82% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessor.java index ec62901d65a6e..63a496f0503bc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessor.java @@ -5,17 +5,18 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect.output; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; +import org.elasticsearch.xpack.ml.process.StateProcessor; import java.io.IOException; import java.io.InputStream; @@ -28,17 +29,22 @@ /** * Reads the autodetect state and persists via a bulk request */ -public class StateProcessor extends AbstractComponent { +public class AutodetectStateProcessor implements StateProcessor { + + private static final Logger LOGGER = LogManager.getLogger(AutodetectStateProcessor.class); private static final int READ_BUF_SIZE = 8192; + private final Client client; + private final String jobId; - public StateProcessor(Settings settings, Client client) { - super(settings); + public AutodetectStateProcessor(Client client, String jobId) { this.client = client; + this.jobId = jobId; } - public void process(String jobId, InputStream in) throws IOException { + @Override + public void process(InputStream in) throws IOException { BytesReference bytesToDate = null; List newBlocks = new ArrayList<>(); byte[] readBuf = new byte[READ_BUF_SIZE]; @@ -56,7 +62,7 @@ public void process(String jobId, InputStream in) throws IOException { } else { BytesReference newBytes = new CompositeBytesReference(newBlocks.toArray(new BytesReference[0])); bytesToDate = (bytesToDate == null) ? newBytes : new CompositeBytesReference(bytesToDate, newBytes); - bytesToDate = splitAndPersist(jobId, bytesToDate, searchFrom); + bytesToDate = splitAndPersist(bytesToDate, searchFrom); searchFrom = (bytesToDate == null) ? 0 : bytesToDate.length(); newBlocks.clear(); } @@ -69,7 +75,7 @@ public void process(String jobId, InputStream in) throws IOException { * data is expected to be a series of Elasticsearch bulk requests in UTF-8 JSON * (as would be uploaded to the public REST API) separated by zero bytes ('\0'). */ - private BytesReference splitAndPersist(String jobId, BytesReference bytesRef, int searchFrom) throws IOException { + private BytesReference splitAndPersist(BytesReference bytesRef, int searchFrom) throws IOException { int splitFrom = 0; while (true) { int nextZeroByte = findNextZeroByte(bytesRef, searchFrom, splitFrom); @@ -80,7 +86,7 @@ private BytesReference splitAndPersist(String jobId, BytesReference bytesRef, in // Ignore completely empty chunks if (nextZeroByte > splitFrom) { // No validation - assume the native process has formatted the state correctly - persist(jobId, bytesRef.slice(splitFrom, nextZeroByte - splitFrom)); + persist(bytesRef.slice(splitFrom, nextZeroByte - splitFrom)); } splitFrom = nextZeroByte + 1; } @@ -90,11 +96,11 @@ private BytesReference splitAndPersist(String jobId, BytesReference bytesRef, in return bytesRef.slice(splitFrom, bytesRef.length() - splitFrom); } - void persist(String jobId, BytesReference bytes) throws IOException { + void persist(BytesReference bytes) throws IOException { BulkRequest bulkRequest = new BulkRequest(); bulkRequest.add(bytes, AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, XContentType.JSON); if (bulkRequest.numberOfActions() > 0) { - logger.trace("[{}] Persisting job state document", jobId); + LOGGER.trace("[{}] Persisting job state document", jobId); try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) { client.bulk(bulkRequest).actionGet(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java index 7961fec449774..dc9d77cd68784 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java @@ -12,6 +12,7 @@ import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; +import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter; import org.supercsv.encoder.CsvEncoder; import org.supercsv.encoder.DefaultCsvEncoder; import org.supercsv.prefs.CsvPreference; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java index 2c026ec15506e..fc98990d8d61f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; +import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter; import java.io.IOException; import java.io.OutputStream; @@ -168,7 +169,7 @@ public void writeForecastMessage(ForecastParams params) throws IOException { builder.field("tmp_storage", params.getTmpStorage()); } builder.endObject(); - + writeMessage(FORECAST_MESSAGE_CODE + Strings.toString(builder)); fillCommandBuffer(); lengthEncodedWriter.flush(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvRecordWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvRecordWriter.java index 2228835bea2a6..57bbb69c5d0de 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvRecordWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/CsvRecordWriter.java @@ -5,7 +5,7 @@ */ package org.elasticsearch.xpack.ml.job.process.autodetect.writer; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.writer.RecordWriter; +import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter; import org.supercsv.io.CsvListWriter; import org.supercsv.prefs.CsvPreference; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/MultiplyingNormalizerProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/MultiplyingNormalizerProcess.java index 8aa266e15d22e..5d320a1bd715c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/MultiplyingNormalizerProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/MultiplyingNormalizerProcess.java @@ -5,9 +5,8 @@ */ package org.elasticsearch.xpack.ml.job.process.normalizer; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -17,6 +16,7 @@ import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; +import java.time.ZonedDateTime; /** * Normalizer process that doesn't use native code. @@ -27,16 +27,15 @@ * - It can be used to produce results in testing that do not vary based on changes to the real normalization algorithms */ public class MultiplyingNormalizerProcess implements NormalizerProcess { - private static final Logger LOGGER = Loggers.getLogger(MultiplyingNormalizerProcess.class); - private final Settings settings; + private static final Logger LOGGER = LogManager.getLogger(MultiplyingNormalizerProcess.class); + private final double factor; private final PipedInputStream processOutStream; private XContentBuilder builder; private boolean shouldIgnoreHeader; - public MultiplyingNormalizerProcess(Settings settings, double factor) { - this.settings = settings; + public MultiplyingNormalizerProcess(double factor) { this.factor = factor; processOutStream = new PipedInputStream(); try { @@ -49,6 +48,11 @@ public MultiplyingNormalizerProcess(Settings settings, double factor) { shouldIgnoreHeader = true; } + @Override + public boolean isReady() { + return true; + } + @Override public void writeRecord(String[] record) throws IOException { if (shouldIgnoreHeader) { @@ -77,13 +81,33 @@ public void writeRecord(String[] record) throws IOException { } @Override - public void close() throws IOException { + public void persistState() { + // Nothing to do + } + + @Override + public void flushStream() { + // Nothing to do + } + + @Override + public void kill() { + // Nothing to do + } + + @Override + public ZonedDateTime getProcessStartTime() { + return null; + } + + @Override + public void close() { builder.close(); } @Override public NormalizerResultHandler createNormalizedResultsHandler() { - return new NormalizerResultHandler(settings, processOutStream); + return new NormalizerResultHandler(processOutStream); } @Override @@ -92,6 +116,11 @@ public boolean isProcessAlive() { return true; } + @Override + public boolean isProcessAliveAfterWaiting() { + return true; + } + @Override public String readError() { return ""; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java index ee6c7818b38ec..6b67ffa6acb6f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcess.java @@ -5,104 +5,41 @@ */ package org.elasticsearch.xpack.ml.job.process.normalizer; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.xpack.ml.job.process.autodetect.writer.LengthEncodedWriter; -import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler; import org.elasticsearch.xpack.ml.job.process.normalizer.output.NormalizerResultHandler; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.process.AbstractNativeProcess; -import java.io.BufferedOutputStream; -import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.Collections; /** * Normalizer process using native code. */ -class NativeNormalizerProcess implements NormalizerProcess { - private static final Logger LOGGER = Loggers.getLogger(NativeNormalizerProcess.class); +class NativeNormalizerProcess extends AbstractNativeProcess implements NormalizerProcess { - private final String jobId; - private final Settings settings; - private final CppLogMessageHandler cppLogHandler; - private final OutputStream processInStream; - private final InputStream processOutStream; - private final LengthEncodedWriter recordWriter; - private volatile boolean processCloseInitiated; - private Future logTailThread; + private static final String NAME = "normalizer"; - NativeNormalizerProcess(String jobId, Settings settings, InputStream logStream, OutputStream processInStream, - InputStream processOutStream, ExecutorService executorService) throws EsRejectedExecutionException { - this.jobId = jobId; - this.settings = settings; - cppLogHandler = new CppLogMessageHandler(jobId, logStream); - this.processInStream = new BufferedOutputStream(processInStream); - this.processOutStream = processOutStream; - this.recordWriter = new LengthEncodedWriter(this.processInStream); - logTailThread = executorService.submit(() -> { - try (CppLogMessageHandler h = cppLogHandler) { - h.tailStream(); - } catch (IOException e) { - LOGGER.error(new ParameterizedMessage("[{}] Error tailing normalizer process logs", - new Object[] { jobId }), e); - } finally { - if (processCloseInitiated == false) { - // The log message doesn't say "crashed", as the process could have been killed - // by a user or other process (e.g. the Linux OOM killer) - LOGGER.error("[{}] normalizer process stopped unexpectedly", jobId); - } - } - }); + NativeNormalizerProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream) { + super(jobId, logStream, processInStream, processOutStream, null, 0, Collections.emptyList(), () -> {}); } @Override - public void writeRecord(String[] record) throws IOException { - recordWriter.writeRecord(record); + public String getName() { + return NAME; } @Override - public void close() throws IOException { - try { - processCloseInitiated = true; - // closing its input causes the process to exit - processInStream.close(); - // wait for the process to exit by waiting for end-of-file on the named pipe connected to its logger - // this may take a long time as it persists the model state - logTailThread.get(5, TimeUnit.MINUTES); - if (cppLogHandler.seenFatalError()) { - throw ExceptionsHelper.serverError(cppLogHandler.getErrors()); - } - LOGGER.debug("[{}] Normalizer process exited", jobId); - } catch (ExecutionException | TimeoutException e) { - LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running normalizer process", new Object[] { jobId }), e); - } catch (InterruptedException e) { - LOGGER.warn("[{}] Exception closing the running normalizer process", jobId); - Thread.currentThread().interrupt(); - } + public boolean isReady() { + return true; } @Override - public NormalizerResultHandler createNormalizedResultsHandler() { - return new NormalizerResultHandler(settings, processOutStream); + public void persistState() { + // nothing to persist } @Override - public boolean isProcessAlive() { - // Sanity check: make sure the process hasn't terminated already - return !cppLogHandler.hasLogStreamEnded(); - } - - @Override - public String readError() { - return cppLogHandler.getErrors(); + public NormalizerResultHandler createNormalizedResultsHandler() { + return new NormalizerResultHandler(processOutStream()); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java index 60f52d3f44288..21f7229aef123 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NativeNormalizerProcessFactory.java @@ -5,13 +5,14 @@ */ package org.elasticsearch.xpack.ml.job.process.normalizer; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.xpack.ml.job.process.NativeController; -import org.elasticsearch.xpack.ml.job.process.ProcessPipes; +import org.elasticsearch.xpack.ml.process.NativeController; +import org.elasticsearch.xpack.ml.process.ProcessPipes; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; import java.io.IOException; @@ -22,17 +23,15 @@ public class NativeNormalizerProcessFactory implements NormalizerProcessFactory { - private static final Logger LOGGER = Loggers.getLogger(NativeNormalizerProcessFactory.class); + private static final Logger LOGGER = LogManager.getLogger(NativeNormalizerProcessFactory.class); private static final NamedPipeHelper NAMED_PIPE_HELPER = new NamedPipeHelper(); private static final Duration PROCESS_STARTUP_TIMEOUT = Duration.ofSeconds(10); private final Environment env; - private final Settings settings; private final NativeController nativeController; - public NativeNormalizerProcessFactory(Environment env, Settings settings, NativeController nativeController) { + public NativeNormalizerProcessFactory(Environment env, NativeController nativeController) { this.env = Objects.requireNonNull(env); - this.settings = Objects.requireNonNull(settings); this.nativeController = Objects.requireNonNull(nativeController); } @@ -43,8 +42,20 @@ public NormalizerProcess createNormalizerProcess(String jobId, String quantilesS true, false, true, true, false, false); createNativeProcess(jobId, quantilesState, processPipes, bucketSpan); - return new NativeNormalizerProcess(jobId, settings, processPipes.getLogStream().get(), - processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get(), executorService); + NativeNormalizerProcess normalizerProcess = new NativeNormalizerProcess(jobId, processPipes.getLogStream().get(), + processPipes.getProcessInStream().get(), processPipes.getProcessOutStream().get()); + + try { + normalizerProcess.start(executorService); + return normalizerProcess; + } catch (EsRejectedExecutionException e) { + try { + IOUtils.close(normalizerProcess); + } catch (IOException ioe) { + LOGGER.error("Can't close normalizer", ioe); + } + throw e; + } } private void createNativeProcess(String jobId, String quantilesState, ProcessPipes processPipes, Integer bucketSpan) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerProcess.java index d0ce62612bb69..230048c5b4d2e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerProcess.java @@ -6,40 +6,16 @@ package org.elasticsearch.xpack.ml.job.process.normalizer; import org.elasticsearch.xpack.ml.job.process.normalizer.output.NormalizerResultHandler; - -import java.io.Closeable; -import java.io.IOException; +import org.elasticsearch.xpack.ml.process.NativeProcess; /** * Interface representing the native C++ normalizer process */ -public interface NormalizerProcess extends Closeable { - - /** - * Write the record to normalizer. The record parameter should not be encoded - * (i.e. length encoded) the implementation will appy the corrrect encoding. - * - * @param record Plain array of strings, implementors of this class should - * encode the record appropriately - * @throws IOException If the write failed - */ - void writeRecord(String[] record) throws IOException; +public interface NormalizerProcess extends NativeProcess { /** * Create a result handler for this process's results. * @return results handler */ NormalizerResultHandler createNormalizedResultsHandler(); - - /** - * Returns true if the process still running. - * @return True if the process is still running - */ - boolean isProcessAlive(); - - /** - * Read any content in the error output buffer. - * @return An error message or empty String if no error. - */ - String readError(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java index dcadef7a24b53..3b65a739e82a9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandler.java @@ -8,8 +8,6 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContent; @@ -26,15 +24,14 @@ /** * Reads normalizer output. */ -public class NormalizerResultHandler extends AbstractComponent { +public class NormalizerResultHandler { private static final int READ_BUF_SIZE = 1024; private final InputStream inputStream; private final List normalizedResults; - public NormalizerResultHandler(Settings settings, InputStream inputStream) { - super(settings); + public NormalizerResultHandler(InputStream inputStream) { this.inputStream = inputStream; normalizedResults = new ArrayList<>(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java new file mode 100644 index 0000000000000..b84bfdd38e19a --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/AbstractNativeProcess.java @@ -0,0 +1,265 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.process; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.xpack.core.ml.MachineLearningField; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler; +import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Abstract class for implementing a native process. + */ +public abstract class AbstractNativeProcess implements NativeProcess { + + private static final Logger LOGGER = LogManager.getLogger(AbstractNativeProcess.class); + + private static final Duration WAIT_FOR_KILL_TIMEOUT = Duration.ofMillis(1000); + + private final String jobId; + private final CppLogMessageHandler cppLogHandler; + private final OutputStream processInStream; + private final InputStream processOutStream; + private final OutputStream processRestoreStream; + private final LengthEncodedWriter recordWriter; + private final ZonedDateTime startTime; + private final int numberOfFields; + private final List filesToDelete; + private final Runnable onProcessCrash; + private volatile Future logTailFuture; + private volatile Future stateProcessorFuture; + private volatile boolean processCloseInitiated; + private volatile boolean processKilled; + private volatile boolean isReady; + + protected AbstractNativeProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, + OutputStream processRestoreStream, int numberOfFields, List filesToDelete, + Runnable onProcessCrash) { + this.jobId = jobId; + cppLogHandler = new CppLogMessageHandler(jobId, logStream); + this.processInStream = new BufferedOutputStream(processInStream); + this.processOutStream = processOutStream; + this.processRestoreStream = processRestoreStream; + this.recordWriter = new LengthEncodedWriter(this.processInStream); + startTime = ZonedDateTime.now(); + this.numberOfFields = numberOfFields; + this.filesToDelete = filesToDelete; + this.onProcessCrash = Objects.requireNonNull(onProcessCrash); + } + + public abstract String getName(); + + /** + * Starts a process that does not persist any state + * @param executorService the executor service to run on + */ + public void start(ExecutorService executorService) { + logTailFuture = executorService.submit(() -> { + try (CppLogMessageHandler h = cppLogHandler) { + h.tailStream(); + } catch (IOException e) { + if (processKilled == false) { + LOGGER.error(new ParameterizedMessage("[{}] Error tailing {} process logs", jobId, getName()), e); + } + } finally { + if (processCloseInitiated == false && processKilled == false) { + // The log message doesn't say "crashed", as the process could have been killed + // by a user or other process (e.g. the Linux OOM killer) + + String errors = cppLogHandler.getErrors(); + LOGGER.error("[{}] {} process stopped unexpectedly: {}", jobId, getName(), errors); + onProcessCrash.run(); + } + } + }); + } + + /** + * Starts a process that may persist its state + * @param executorService the executor service to run on + * @param stateProcessor the state processor + * @param persistStream the stream where the state is persisted + */ + public void start(ExecutorService executorService, StateProcessor stateProcessor, InputStream persistStream) { + start(executorService); + + stateProcessorFuture = executorService.submit(() -> { + try (InputStream in = persistStream) { + stateProcessor.process(in); + if (processKilled == false) { + LOGGER.info("[{}] State output finished", jobId); + } + } catch (IOException e) { + if (processKilled == false) { + LOGGER.error(new ParameterizedMessage("[{}] Error reading {} state output", jobId, getName()), e); + } + } + }); + } + + @Override + public boolean isReady() { + return isReady; + } + + protected void setReady() { + isReady = true; + } + + @Override + public void writeRecord(String[] record) throws IOException { + recordWriter.writeRecord(record); + } + + @Override + public void flushStream() throws IOException { + recordWriter.flush(); + } + + @Override + public void close() throws IOException { + try { + processCloseInitiated = true; + // closing its input causes the process to exit + processInStream.close(); + // wait for the process to exit by waiting for end-of-file on the named pipe connected + // to the state processor - it may take a long time for all the model state to be + // indexed + if (stateProcessorFuture != null) { + stateProcessorFuture.get(MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT.getMinutes(), TimeUnit.MINUTES); + } + // the log processor should have stopped by now too - assume processing the logs will + // take no more than 5 seconds longer than processing the state (usually it should + // finish first) + if (logTailFuture != null) { + logTailFuture.get(5, TimeUnit.SECONDS); + } + + if (cppLogHandler.seenFatalError()) { + throw ExceptionsHelper.serverError(cppLogHandler.getErrors()); + } + LOGGER.debug("[{}] {} process exited", jobId, getName()); + } catch (ExecutionException | TimeoutException e) { + LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running {} process", jobId, getName()), e); + } catch (InterruptedException e) { + LOGGER.warn(new ParameterizedMessage("[{}] Exception closing the running {} process", jobId, getName()), e); + Thread.currentThread().interrupt(); + } finally { + deleteAssociatedFiles(); + } + } + + @Override + public void kill() throws IOException { + processKilled = true; + try { + // The PID comes via the processes log stream. We don't wait for it to arrive here, + // but if the wait times out it implies the process has only just started, in which + // case it should die very quickly when we close its input stream. + NativeControllerHolder.getNativeController().killProcess(cppLogHandler.getPid(Duration.ZERO)); + + // Wait for the process to die before closing processInStream as if the process + // is still alive when processInStream is closed it may start persisting state + cppLogHandler.waitForLogStreamClose(WAIT_FOR_KILL_TIMEOUT); + } catch (TimeoutException e) { + LOGGER.warn("[{}] Failed to get PID of {} process to kill", jobId, getName()); + } finally { + try { + processInStream.close(); + } catch (IOException e) { + // Ignore it - we're shutting down and the method itself has logged a warning + } + try { + deleteAssociatedFiles(); + } catch (IOException e) { + // Ignore it - we're shutting down and the method itself has logged a warning + } + } + } + + private synchronized void deleteAssociatedFiles() throws IOException { + if (filesToDelete == null) { + return; + } + + for (Path fileToDelete : filesToDelete) { + if (Files.deleteIfExists(fileToDelete)) { + LOGGER.debug("[{}] Deleted file {}", jobId, fileToDelete.toString()); + } else { + LOGGER.warn("[{}] Failed to delete file {}", jobId, fileToDelete.toString()); + } + } + + filesToDelete.clear(); + } + + @Override + public ZonedDateTime getProcessStartTime() { + return startTime; + } + + @Override + public boolean isProcessAlive() { + // Sanity check: make sure the process hasn't terminated already + return !cppLogHandler.hasLogStreamEnded(); + } + + @Override + public boolean isProcessAliveAfterWaiting() { + cppLogHandler.waitForLogStreamClose(Duration.ofMillis(45)); + return isProcessAlive(); + } + + @Override + public String readError() { + return cppLogHandler.getErrors(); + } + + protected String jobId() { + return jobId; + } + + protected InputStream processOutStream() { + return processOutStream; + } + + @Nullable + protected OutputStream processRestoreStream() { + return processRestoreStream; + } + + protected int numberOfFields() { + return numberOfFields; + } + + protected LengthEncodedWriter recordWriter() { + return recordWriter; + } + + protected boolean isProcessKilled() { + return processKilled; + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeController.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java similarity index 98% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeController.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java index 0b9cb833c8980..747074028953c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeController.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeController.java @@ -3,13 +3,13 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process; +package org.elasticsearch.xpack.ml.process; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.env.Environment; -import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler; +import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler; import org.elasticsearch.xpack.ml.utils.NamedPipeHelper; import java.io.BufferedOutputStream; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeControllerHolder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeControllerHolder.java similarity index 97% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeControllerHolder.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeControllerHolder.java index 9bcb6e787290d..67e24b44a8494 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeControllerHolder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeControllerHolder.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process; +package org.elasticsearch.xpack.ml.process; import org.elasticsearch.env.Environment; import org.elasticsearch.xpack.core.ml.MachineLearningField; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeProcess.java new file mode 100644 index 0000000000000..c4f2b4a463185 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeProcess.java @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.process; + +import java.io.Closeable; +import java.io.IOException; +import java.time.ZonedDateTime; + +/** + * Interface representing a native C++ process + */ +public interface NativeProcess extends Closeable { + + /** + * Is the process ready to receive data? + * @return {@code true} if the process is ready to receive data + */ + boolean isReady(); + + /** + * Write the record to the process. The record parameter should not be encoded + * (i.e. length encoded) the implementation will apply the correct encoding. + * + * @param record Plain array of strings, implementors of this class should + * encode the record appropriately + * @throws IOException If the write failed + */ + void writeRecord(String[] record) throws IOException; + + /** + * Ask the process to persist its state in the background + * @throws IOException If writing the request fails + */ + void persistState() throws IOException; + + /** + * Flush the output data stream + */ + void flushStream() throws IOException; + + /** + * Kill the process. Do not wait for it to stop gracefully. + */ + void kill() throws IOException; + + /** + * The time the process was started + * @return Process start time + */ + ZonedDateTime getProcessStartTime(); + + /** + * Returns true if the process still running. + * Methods instructing the process are essentially + * asynchronous; the command will be continue to execute in the process after + * the call has returned. + * This method tests whether something catastrophic + * occurred in the process during its execution. + * @return True if the process is still running + */ + boolean isProcessAlive(); + + /** + * Check whether the process terminated given a grace period. + * + * Processing errors are highly likely caused by the process being unexpectedly + * terminated. + * + * Workaround: As we can not easily check if the process is alive, we rely on + * the logPipe being ended. As the loghandler runs in another thread which + * might fall behind this one, we give it a grace period. + * + * @return false if process has ended for sure, true if it probably still runs + */ + boolean isProcessAliveAfterWaiting(); + + /** + * Read any content in the error output buffer. + * @return An error message or empty String if no error. + */ + String readError(); +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeStorageProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeStorageProvider.java similarity index 98% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeStorageProvider.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeStorageProvider.java index 8a0268a8d0793..9670fadfefff3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeStorageProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeStorageProvider.java @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process; +package org.elasticsearch.xpack.ml.process; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.logging.Loggers; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/ProcessPipes.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java similarity index 99% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/ProcessPipes.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java index 41a7df348b103..4d468f80176f9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/ProcessPipes.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/ProcessPipes.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process; +package org.elasticsearch.xpack.ml.process; import org.elasticsearch.common.Strings; import org.elasticsearch.env.Environment; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/StateProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/StateProcessor.java new file mode 100644 index 0000000000000..e3937d7199131 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/StateProcessor.java @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.process; + +import java.io.IOException; +import java.io.InputStream; + +public interface StateProcessor { + + void process(InputStream in) throws IOException; +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessage.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessage.java similarity index 99% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessage.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessage.java index 6064cfef31b18..c3310b6b1b5b8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessage.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessage.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process.logging; +package org.elasticsearch.xpack.ml.process.logging; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandler.java similarity index 99% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandler.java index af0f199dd0c58..341b9ae371b82 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandler.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandler.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process.logging; +package org.elasticsearch.xpack.ml.process.logging; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/LengthEncodedWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java similarity index 95% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/LengthEncodedWriter.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java index 34f9d8dc469fc..e82c963b5ed6c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/LengthEncodedWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriter.java @@ -3,9 +3,9 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process.autodetect.writer; +package org.elasticsearch.xpack.ml.process.writer; -import org.elasticsearch.xpack.core.ml.job.process.autodetect.writer.RecordWriter; +import org.elasticsearch.xpack.core.ml.process.writer.RecordWriter; import java.io.IOException; import java.io.OutputStream; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java index 325ad52864bfa..9ef56d927f553 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectBuilderTests.java @@ -15,8 +15,8 @@ import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.ml.job.process.NativeController; -import org.elasticsearch.xpack.ml.job.process.ProcessPipes; +import org.elasticsearch.xpack.ml.process.NativeController; +import org.elasticsearch.xpack.ml.process.ProcessPipes; import org.junit.Before; import java.nio.file.Path; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java index 93e79c8b97078..6d5adeb3fdbf1 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/NativeAutodetectProcessTests.java @@ -9,7 +9,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser; -import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor; +import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectStateProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; @@ -56,7 +56,7 @@ public void testProcessStartTime() throws Exception { mock(OutputStream.class), mock(InputStream.class), mock(OutputStream.class), NUMBER_FIELDS, null, new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) { - process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); + process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class)); ZonedDateTime startTime = process.getProcessStartTime(); Thread.sleep(500); @@ -76,7 +76,7 @@ public void testWriteRecord() throws IOException { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, bos, mock(InputStream.class), mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) { - process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); + process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class)); process.writeRecord(record); process.flushStream(); @@ -108,7 +108,7 @@ public void testFlush() throws IOException { try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, bos, mock(InputStream.class), mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) { - process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); + process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class)); FlushJobParams params = FlushJobParams.builder().build(); process.flushJob(params); @@ -128,7 +128,7 @@ public void testWriteUpdateConfigMessage() throws IOException { } public void testPersistJob() throws IOException { - testWriteMessage(p -> p.persistJob(), ControlMsgToProcessWriter.BACKGROUND_PERSIST_MESSAGE_CODE); + testWriteMessage(p -> p.persistState(), ControlMsgToProcessWriter.BACKGROUND_PERSIST_MESSAGE_CODE); } public void testWriteMessage(CheckedConsumer writeFunction, String expectedMessageCode) throws IOException { @@ -138,7 +138,7 @@ public void testWriteMessage(CheckedConsumer writeFunct try (NativeAutodetectProcess process = new NativeAutodetectProcess("foo", logStream, bos, mock(InputStream.class), mock(OutputStream.class), NUMBER_FIELDS, Collections.emptyList(), new AutodetectResultsParser(Settings.EMPTY), mock(Runnable.class))) { - process.start(executorService, mock(StateProcessor.class), mock(InputStream.class)); + process.start(executorService, mock(AutodetectStateProcessor.class), mock(InputStream.class)); writeFunction.accept(process); process.writeUpdateModelPlotMessage(new ModelPlotConfig()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessorTests.java similarity index 88% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessorTests.java index 31b96d8393d12..e4fb5a7f07456 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/StateProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutodetectStateProcessorTests.java @@ -26,7 +26,6 @@ import java.util.List; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -37,7 +36,7 @@ /** * Tests for reading state from the native process. */ -public class StateProcessorTests extends ESTestCase { +public class AutodetectStateProcessorTests extends ESTestCase { private static final String STATE_SAMPLE = "" + "{\"index\": {\"_index\": \"test\", \"_type\": \"type1\", \"_id\": \"1\"}}\n" @@ -50,18 +49,20 @@ public class StateProcessorTests extends ESTestCase { + "{ \"field\" : \"value3\" }\n" + "\0"; + private static final String JOB_ID = "state-processor-test-job"; + private static final int NUM_LARGE_DOCS = 2; private static final int LARGE_DOC_SIZE = 1000000; private Client client; - private StateProcessor stateProcessor; + private AutodetectStateProcessor stateProcessor; @Before public void initialize() throws IOException { client = mock(Client.class); @SuppressWarnings("unchecked") ActionFuture bulkResponseFuture = mock(ActionFuture.class); - stateProcessor = spy(new StateProcessor(Settings.EMPTY, client)); + stateProcessor = spy(new AutodetectStateProcessor(client, JOB_ID)); when(client.bulk(any(BulkRequest.class))).thenReturn(bulkResponseFuture); ThreadPool threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); @@ -75,9 +76,9 @@ public void verifyNoMoreClientInteractions() { public void testStateRead() throws IOException { ByteArrayInputStream stream = new ByteArrayInputStream(STATE_SAMPLE.getBytes(StandardCharsets.UTF_8)); - stateProcessor.process("_id", stream); + stateProcessor.process(stream); ArgumentCaptor bytesRefCaptor = ArgumentCaptor.forClass(BytesReference.class); - verify(stateProcessor, times(3)).persist(eq("_id"), bytesRefCaptor.capture()); + verify(stateProcessor, times(3)).persist(bytesRefCaptor.capture()); String[] threeStates = STATE_SAMPLE.split("\0"); List capturedBytes = bytesRefCaptor.getAllValues(); @@ -92,9 +93,9 @@ public void testStateReadGivenConsecutiveZeroBytes() throws IOException { String zeroBytes = "\0\0\0\0\0\0"; ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8)); - stateProcessor.process("_id", stream); + stateProcessor.process(stream); - verify(stateProcessor, never()).persist(eq("_id"), any()); + verify(stateProcessor, never()).persist(any()); Mockito.verifyNoMoreInteractions(client); } @@ -102,9 +103,9 @@ public void testStateReadGivenConsecutiveSpacesFollowedByZeroByte() throws IOExc String zeroBytes = " \n\0"; ByteArrayInputStream stream = new ByteArrayInputStream(zeroBytes.getBytes(StandardCharsets.UTF_8)); - stateProcessor.process("_id", stream); + stateProcessor.process(stream); - verify(stateProcessor, times(1)).persist(eq("_id"), any()); + verify(stateProcessor, times(1)).persist(any()); Mockito.verifyNoMoreInteractions(client); } @@ -125,8 +126,8 @@ public void testLargeStateRead() throws Exception { } ByteArrayInputStream stream = new ByteArrayInputStream(builder.toString().getBytes(StandardCharsets.UTF_8)); - stateProcessor.process("_id", stream); - verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(eq("_id"), any()); + stateProcessor.process(stream); + verify(stateProcessor, times(NUM_LARGE_DOCS)).persist(any()); verify(client, times(NUM_LARGE_DOCS)).bulk(any(BulkRequest.class)); verify(client, times(NUM_LARGE_DOCS)).threadPool(); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriterTests.java index 38bef42f800cf..01bdd6a999f26 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriterTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; import org.elasticsearch.xpack.ml.job.process.autodetect.writer.AbstractDataToProcessWriter.InputOutputMap; +import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter; import org.junit.Before; import org.mockito.Mockito; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java index 3d08f5a1c25fb..57554227e9ad3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriterTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; +import org.elasticsearch.xpack.ml.process.writer.LengthEncodedWriter; import org.junit.Before; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerTests.java index 661eeca98db8f..04ea8f2c70ef3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/NormalizerTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.job.process.normalizer; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.job.results.BucketInfluencer; @@ -32,7 +31,7 @@ public class NormalizerTests extends ESTestCase { private static final double INITIAL_SCORE = 3.0; private static final double FACTOR = 2.0; - private Bucket generateBucket(Date timestamp) throws IOException { + private Bucket generateBucket(Date timestamp) { return new Bucket(JOB_ID, timestamp, BUCKET_SPAN); } @@ -49,8 +48,8 @@ public void testNormalize() throws IOException, InterruptedException { ExecutorService threadpool = Executors.newScheduledThreadPool(1); try { NormalizerProcessFactory processFactory = mock(NormalizerProcessFactory.class); - when(processFactory.createNormalizerProcess(eq(JOB_ID), eq(QUANTILES_STATE), eq(BUCKET_SPAN), - any())).thenReturn(new MultiplyingNormalizerProcess(Settings.EMPTY, FACTOR)); + when(processFactory.createNormalizerProcess(eq(JOB_ID), eq(QUANTILES_STATE), eq(BUCKET_SPAN), any())) + .thenReturn(new MultiplyingNormalizerProcess(FACTOR)); Normalizer normalizer = new Normalizer(JOB_ID, processFactory, threadpool); Bucket bucket = generateBucket(new Date(0)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandlerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandlerTests.java index 9e6a4afc4e318..cc0234df39ed7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandlerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/normalizer/output/NormalizerResultHandlerTests.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.ml.job.process.normalizer.output; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerResult; @@ -32,7 +31,7 @@ public void testParse() throws IOException { + "\"value_field_name\":\"x\",\"probability\":0.03,\"normalized_score\":22.22}\n"; InputStream is = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); - NormalizerResultHandler handler = new NormalizerResultHandler(Settings.EMPTY, is); + NormalizerResultHandler handler = new NormalizerResultHandler(is); handler.process(); List results = handler.getNormalizedResults(); assertEquals(3, results.size()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java similarity index 99% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java index 08c73cdd9c7e9..ac00e8a24e1cf 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeControllerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeControllerTests.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process; +package org.elasticsearch.xpack.ml.process; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.settings.Settings; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeStorageProviderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeStorageProviderTests.java similarity index 99% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeStorageProviderTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeStorageProviderTests.java index 3103e76c82bde..fd87e29387e0b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeStorageProviderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeStorageProviderTests.java @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process; +package org.elasticsearch.xpack.ml.process; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -21,12 +21,11 @@ import java.util.HashMap; import java.util.Map; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import static org.mockito.Mockito.any; - -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; public class NativeStorageProviderTests extends ESTestCase { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/ProcessPipesTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java similarity index 99% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/ProcessPipesTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java index 708d7af152014..fa703e778c49d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/ProcessPipesTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/ProcessPipesTests.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process; +package org.elasticsearch.xpack.ml.process; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandlerTests.java similarity index 99% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandlerTests.java index af2691d6f3575..d490d58c3ab52 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageHandlerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageHandlerTests.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process.logging; +package org.elasticsearch.xpack.ml.process.logging; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; @@ -203,7 +203,7 @@ public void testParseFatalError() throws IOException, IllegalAccessException { } } - private static void executeLoggingTest(InputStream is, MockLogAppender mockAppender, Level level, String jobId) + private static void executeLoggingTest(InputStream is, MockLogAppender mockAppender, Level level, String jobId) throws IOException { Logger cppMessageLogger = Loggers.getLogger(CppLogMessageHandler.class); Loggers.addAppender(cppMessageLogger, mockAppender); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageTests.java similarity index 98% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageTests.java index d3145bb9f6c6b..c6a0bdf151a48 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/logging/CppLogMessageTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/logging/CppLogMessageTests.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process.logging; +package org.elasticsearch.xpack.ml.process.logging; import org.elasticsearch.common.io.stream.Writeable.Reader; import org.elasticsearch.common.xcontent.DeprecationHandler; @@ -72,4 +72,4 @@ protected Reader instanceReader() { protected CppLogMessage doParseInstance(XContentParser parser) { return CppLogMessage.PARSER.apply(parser, null); } -} \ No newline at end of file +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/LengthEncodedWriterTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriterTests.java similarity index 99% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/LengthEncodedWriterTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriterTests.java index 36f8c8f003050..0e9aff1fb2caf 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/LengthEncodedWriterTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/writer/LengthEncodedWriterTests.java @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.job.process.autodetect.writer; +package org.elasticsearch.xpack.ml.process.writer; import org.elasticsearch.test.ESTestCase; import org.junit.Assert;