From 7c18389dffaa9672ffc9449c9803156b8f48dbce Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 11 Feb 2020 09:59:18 +0200 Subject: [PATCH] [7.x][ML] Prepare to hold additional stats in DF Analytics task (#52134) Refactors `DataFrameAnalyticsTask` to hold a `StatsHolder` object. That just has a `ProgressTracker` for now but this is paving the way to add additional stats like memory usage, analysis stats, etc. Backport #52134 --- ...sportGetDataFrameAnalyticsStatsAction.java | 7 ++-- .../ml/dataframe/DataFrameAnalyticsTask.java | 33 ++++-------------- .../process/AnalyticsProcessManager.java | 7 ++-- .../process/AnalyticsResultProcessor.java | 14 ++++---- .../ml/dataframe/stats/ProgressTracker.java | 34 +++++++++++++++++++ .../xpack/ml/dataframe/stats/StatsHolder.java | 19 +++++++++++ .../process/AnalyticsProcessManagerTests.java | 9 ++--- .../AnalyticsResultProcessorTests.java | 14 ++++---- 8 files changed, 85 insertions(+), 52 deletions(-) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/ProgressTracker.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolder.java diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java index 4b35b092285e2..08edea3329813 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java @@ -48,6 +48,7 @@ import org.elasticsearch.xpack.core.ml.utils.PhaseProgress; import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask; import org.elasticsearch.xpack.ml.dataframe.StoredProgress; +import org.elasticsearch.xpack.ml.dataframe.stats.ProgressTracker; import java.io.IOException; import java.io.InputStream; @@ -106,9 +107,7 @@ protected void taskOperation(GetDataFrameAnalyticsStatsAction.Request request, D ); ActionListener reindexingProgressListener = ActionListener.wrap( - aVoid -> { - progressListener.onResponse(task.getProgressTracker().report()); - }, + aVoid -> progressListener.onResponse(task.getStatsHolder().getProgressTracker().report()), listener::onFailure ); @@ -201,7 +200,7 @@ private void searchStoredProgresses(List configIds, ActionListener headers, Client client, ClusterService clusterService, DataFrameAnalyticsManager analyticsManager, @@ -98,8 +98,8 @@ public boolean isStopping() { return isStopping; } - public ProgressTracker getProgressTracker() { - return progressTracker; + public StatsHolder getStatsHolder() { + return statsHolder; } @Override @@ -197,7 +197,7 @@ public void updateReindexTaskProgress(ActionListener listener) { // We set reindexing progress at least to 1 for a running process to be able to // distinguish a job that is running for the first time against a job that is restarting. reindexTaskProgress -> { - progressTracker.reindexingPercent.set(Math.max(1, reindexTaskProgress)); + statsHolder.getProgressTracker().reindexingPercent.set(Math.max(1, reindexTaskProgress)); listener.onResponse(null); }, listener::onFailure @@ -353,25 +353,4 @@ public static StartingState determineStartingState(String jobId, List report() { - return Arrays.asList( - new PhaseProgress(REINDEXING, reindexingPercent.get()), - new PhaseProgress(LOADING_DATA, loadingDataPercent.get()), - new PhaseProgress(ANALYZING, analyzingPercent.get()), - new PhaseProgress(WRITING_RESULTS, writingResultsPercent.get()) - ); - } - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java index d66973bb7777f..c1c8d95bd7157 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManager.java @@ -35,6 +35,7 @@ import org.elasticsearch.xpack.ml.dataframe.process.customprocessing.CustomProcessor; import org.elasticsearch.xpack.ml.dataframe.process.customprocessing.CustomProcessorFactory; import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult; +import org.elasticsearch.xpack.ml.dataframe.stats.ProgressTracker; import org.elasticsearch.xpack.ml.extractor.ExtractedFields; import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider; import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; @@ -152,7 +153,7 @@ private void processData(DataFrameAnalyticsTask task, ProcessContext processCont AnalyticsResultProcessor resultProcessor = processContext.resultProcessor.get(); try { writeHeaderRecord(dataExtractor, process); - writeDataRows(dataExtractor, process, config.getAnalysis(), task.getProgressTracker()); + writeDataRows(dataExtractor, process, config.getAnalysis(), task.getStatsHolder().getProgressTracker()); process.writeEndOfDataMessage(); process.flushStream(); @@ -199,7 +200,7 @@ private void processData(DataFrameAnalyticsTask task, ProcessContext processCont } private void writeDataRows(DataFrameDataExtractor dataExtractor, AnalyticsProcess process, - DataFrameAnalysis analysis, DataFrameAnalyticsTask.ProgressTracker progressTracker) throws IOException { + DataFrameAnalysis analysis, ProgressTracker progressTracker) throws IOException { CustomProcessor customProcessor = new CustomProcessorFactory(dataExtractor.getFieldNames()).create(analysis); @@ -427,7 +428,7 @@ private AnalyticsResultProcessor createResultProcessor(DataFrameAnalyticsTask ta DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), dataExtractorFactory.newExtractor(true), resultsPersisterService); return new AnalyticsResultProcessor( - config, dataFrameRowsJoiner, task.getProgressTracker(), trainedModelProvider, auditor, dataExtractor.get().getFieldNames()); + config, dataFrameRowsJoiner, task.getStatsHolder(), trainedModelProvider, auditor, dataExtractor.get().getFieldNames()); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java index 9ce7a16084461..636502dbe0058 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessor.java @@ -23,9 +23,9 @@ import org.elasticsearch.xpack.core.ml.inference.TrainedModelInput; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.security.user.XPackUser; -import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask.ProgressTracker; import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult; import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults; +import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder; import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider; import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; @@ -57,7 +57,7 @@ public class AnalyticsResultProcessor { private final DataFrameAnalyticsConfig analytics; private final DataFrameRowsJoiner dataFrameRowsJoiner; - private final ProgressTracker progressTracker; + private final StatsHolder statsHolder; private final TrainedModelProvider trainedModelProvider; private final DataFrameAnalyticsAuditor auditor; private final List fieldNames; @@ -66,11 +66,11 @@ public class AnalyticsResultProcessor { private volatile boolean isCancelled; public AnalyticsResultProcessor(DataFrameAnalyticsConfig analytics, DataFrameRowsJoiner dataFrameRowsJoiner, - ProgressTracker progressTracker, TrainedModelProvider trainedModelProvider, + StatsHolder statsHolder, TrainedModelProvider trainedModelProvider, DataFrameAnalyticsAuditor auditor, List fieldNames) { this.analytics = Objects.requireNonNull(analytics); this.dataFrameRowsJoiner = Objects.requireNonNull(dataFrameRowsJoiner); - this.progressTracker = Objects.requireNonNull(progressTracker); + this.statsHolder = Objects.requireNonNull(statsHolder); this.trainedModelProvider = Objects.requireNonNull(trainedModelProvider); this.auditor = Objects.requireNonNull(auditor); this.fieldNames = Collections.unmodifiableList(Objects.requireNonNull(fieldNames)); @@ -128,11 +128,11 @@ public void process(AnalyticsProcess process) { } private void updateResultsProgress(int progress) { - progressTracker.writingResultsPercent.set(Math.min(progress, MAX_PROGRESS_BEFORE_COMPLETION)); + statsHolder.getProgressTracker().writingResultsPercent.set(Math.min(progress, MAX_PROGRESS_BEFORE_COMPLETION)); } private void completeResultsProgress() { - progressTracker.writingResultsPercent.set(100); + statsHolder.getProgressTracker().writingResultsPercent.set(100); } private void processResult(AnalyticsResult result, DataFrameRowsJoiner resultsJoiner) { @@ -142,7 +142,7 @@ private void processResult(AnalyticsResult result, DataFrameRowsJoiner resultsJo } Integer progressPercent = result.getProgressPercent(); if (progressPercent != null) { - progressTracker.analyzingPercent.set(progressPercent); + statsHolder.getProgressTracker().analyzingPercent.set(progressPercent); } TrainedModelDefinition.Builder inferenceModelBuilder = result.getInferenceModelBuilder(); if (inferenceModelBuilder != null) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/ProgressTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/ProgressTracker.java new file mode 100644 index 0000000000000..0c62707210519 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/ProgressTracker.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.dataframe.stats; + +import org.elasticsearch.xpack.core.ml.utils.PhaseProgress; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class ProgressTracker { + + public static final String REINDEXING = "reindexing"; + public static final String LOADING_DATA = "loading_data"; + public static final String ANALYZING = "analyzing"; + public static final String WRITING_RESULTS = "writing_results"; + + public final AtomicInteger reindexingPercent = new AtomicInteger(0); + public final AtomicInteger loadingDataPercent = new AtomicInteger(0); + public final AtomicInteger analyzingPercent = new AtomicInteger(0); + public final AtomicInteger writingResultsPercent = new AtomicInteger(0); + + public List report() { + return Arrays.asList( + new PhaseProgress(REINDEXING, reindexingPercent.get()), + new PhaseProgress(LOADING_DATA, loadingDataPercent.get()), + new PhaseProgress(ANALYZING, analyzingPercent.get()), + new PhaseProgress(WRITING_RESULTS, writingResultsPercent.get()) + ); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolder.java new file mode 100644 index 0000000000000..ac0396b3e81ca --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/stats/StatsHolder.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.dataframe.stats; + +/** + * Holds data frame analytics stats in memory so that they may be retrieved + * from the get stats api for started jobs efficiently. + */ +public class StatsHolder { + + private final ProgressTracker progressTracker = new ProgressTracker(); + + public ProgressTracker getProgressTracker() { + return progressTracker; + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java index 5d0e136a50013..185042ba5c13a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsProcessManagerTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractor; import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory; import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult; +import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder; import org.elasticsearch.xpack.ml.extractor.ExtractedFields; import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider; import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; @@ -89,7 +90,7 @@ public void setUpMocks() { task = mock(DataFrameAnalyticsTask.class); when(task.getAllocationId()).thenReturn(TASK_ALLOCATION_ID); - when(task.getProgressTracker()).thenReturn(mock(DataFrameAnalyticsTask.ProgressTracker.class)); + when(task.getStatsHolder()).thenReturn(new StatsHolder()); dataFrameAnalyticsConfig = DataFrameAnalyticsConfigTests.createRandomBuilder(CONFIG_ID, false, OutlierDetectionTests.createRandom()).build(); @@ -127,7 +128,7 @@ public void testRunJob_ProcessContextAlreadyExists() { inOrder.verify(task).isStopping(); inOrder.verify(task).getAllocationId(); inOrder.verify(task).isStopping(); - inOrder.verify(task).getProgressTracker(); + inOrder.verify(task).getStatsHolder(); inOrder.verify(task).isStopping(); inOrder.verify(task).getAllocationId(); inOrder.verify(task).updateState(DataFrameAnalyticsState.FAILED, "[config-id] Could not create process as one already exists"); @@ -162,7 +163,7 @@ public void testRunJob_Ok() { inOrder.verify(dataExtractor).collectDataSummary(); inOrder.verify(dataExtractor).getCategoricalFields(dataFrameAnalyticsConfig.getAnalysis()); inOrder.verify(process).isProcessAlive(); - inOrder.verify(task).getProgressTracker(); + inOrder.verify(task).getStatsHolder(); inOrder.verify(dataExtractor).getFieldNames(); inOrder.verify(executorServiceForProcess, times(2)).execute(any()); // 'processData' and 'processResults' threads verifyNoMoreInteractions(dataExtractor, executorServiceForProcess, process, task); @@ -220,7 +221,7 @@ public void testProcessContext_StartAndStop() throws Exception { inOrder.verify(dataExtractor).collectDataSummary(); inOrder.verify(dataExtractor).getCategoricalFields(dataFrameAnalyticsConfig.getAnalysis()); inOrder.verify(process).isProcessAlive(); - inOrder.verify(task).getProgressTracker(); + inOrder.verify(task).getStatsHolder(); inOrder.verify(dataExtractor).getFieldNames(); // stop inOrder.verify(dataExtractor).cancel(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java index 09969817374cd..a93e3f4b0f126 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/process/AnalyticsResultProcessorTests.java @@ -21,9 +21,9 @@ import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinition; import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinitionTests; import org.elasticsearch.xpack.core.security.user.XPackUser; -import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask.ProgressTracker; import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult; import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults; +import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder; import org.elasticsearch.xpack.ml.extractor.ExtractedFields; import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider; import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; @@ -58,7 +58,7 @@ public class AnalyticsResultProcessorTests extends ESTestCase { private AnalyticsProcess process; private DataFrameRowsJoiner dataFrameRowsJoiner; - private ProgressTracker progressTracker = new ProgressTracker(); + private StatsHolder statsHolder = new StatsHolder(); private TrainedModelProvider trainedModelProvider; private DataFrameAnalyticsAuditor auditor; private DataFrameAnalyticsConfig analyticsConfig; @@ -101,7 +101,7 @@ public void testProcess_GivenEmptyResults() { verify(dataFrameRowsJoiner).close(); Mockito.verifyNoMoreInteractions(dataFrameRowsJoiner); - assertThat(progressTracker.writingResultsPercent.get(), equalTo(100)); + assertThat(statsHolder.getProgressTracker().writingResultsPercent.get(), equalTo(100)); } public void testProcess_GivenRowResults() { @@ -118,7 +118,7 @@ public void testProcess_GivenRowResults() { inOrder.verify(dataFrameRowsJoiner).processRowResults(rowResults1); inOrder.verify(dataFrameRowsJoiner).processRowResults(rowResults2); - assertThat(progressTracker.writingResultsPercent.get(), equalTo(100)); + assertThat(statsHolder.getProgressTracker().writingResultsPercent.get(), equalTo(100)); } public void testProcess_GivenDataFrameRowsJoinerFails() { @@ -140,7 +140,7 @@ public void testProcess_GivenDataFrameRowsJoinerFails() { verify(auditor).error(eq(JOB_ID), auditCaptor.capture()); assertThat(auditCaptor.getValue(), containsString("Error processing results; some failure")); - assertThat(progressTracker.writingResultsPercent.get(), equalTo(0)); + assertThat(statsHolder.getProgressTracker().writingResultsPercent.get(), equalTo(0)); } @SuppressWarnings("unchecked") @@ -212,7 +212,7 @@ public void testProcess_GivenInferenceModelFailedToStore() { Mockito.verifyNoMoreInteractions(auditor); assertThat(resultProcessor.getFailure(), startsWith("error processing results; error storing trained model with id [" + JOB_ID)); - assertThat(progressTracker.writingResultsPercent.get(), equalTo(0)); + assertThat(statsHolder.getProgressTracker().writingResultsPercent.get(), equalTo(0)); } private void givenProcessResults(List results) { @@ -232,6 +232,6 @@ private AnalyticsResultProcessor createResultProcessor() { private AnalyticsResultProcessor createResultProcessor(List fieldNames) { return new AnalyticsResultProcessor( - analyticsConfig, dataFrameRowsJoiner, progressTracker, trainedModelProvider, auditor, fieldNames); + analyticsConfig, dataFrameRowsJoiner, statsHolder, trainedModelProvider, auditor, fieldNames); } }