From f048cd47520118b5f427e27f9832cdaf75de2e2b Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 18 Oct 2018 13:48:50 +0100 Subject: [PATCH] Get datafeed and job stats --- .../TransportGetDatafeedsStatsAction.java | 33 +++++++----- .../action/TransportGetJobsStatsAction.java | 52 +++++++++++-------- .../TransportGetJobsStatsActionTests.java | 34 +++--------- 3 files changed, 55 insertions(+), 64 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java index 4d11e857c4c8a..ae1b42d2e3083 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDatafeedsStatsAction.java @@ -16,30 +16,33 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import java.util.List; -import java.util.Set; import java.util.stream.Collectors; public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAction { + private final DatafeedConfigProvider datafeedConfigProvider; + @Inject public TransportGetDatafeedsStatsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver) { + IndexNameExpressionResolver indexNameExpressionResolver, + DatafeedConfigProvider datafeedConfigProvider) { super(settings, GetDatafeedsStatsAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, GetDatafeedsStatsAction.Request::new); + this.datafeedConfigProvider = datafeedConfigProvider; } @Override @@ -57,16 +60,18 @@ protected void masterOperation(GetDatafeedsStatsAction.Request request, ClusterS ActionListener listener) throws Exception { logger.debug("Get stats for datafeed '{}'", request.getDatafeedId()); - MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); - Set expandedDatafeedIds = mlMetadata.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds()); - - PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - List results = expandedDatafeedIds.stream() - .map(datafeedId -> getDatafeedStats(datafeedId, state, tasksInProgress)) - .collect(Collectors.toList()); - QueryPage statsPage = new QueryPage<>(results, results.size(), - DatafeedConfig.RESULTS_FIELD); - listener.onResponse(new GetDatafeedsStatsAction.Response(statsPage)); + datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap( + expandedDatafeedIds -> { + PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + List results = expandedDatafeedIds.stream() + .map(datafeedId -> getDatafeedStats(datafeedId, state, tasksInProgress)) + .collect(Collectors.toList()); + QueryPage statsPage = new QueryPage<>(results, results.size(), + DatafeedConfig.RESULTS_FIELD); + listener.onResponse(new GetDatafeedsStatsAction.Response(statsPage)); + }, + listener::onFailure + )); } private static GetDatafeedsStatsAction.Response.DatafeedStats getDatafeedStats( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java index f7a1cf5573733..6104dcd9db64b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java @@ -20,10 +20,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; @@ -32,7 +32,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.stats.ForecastStats; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -54,28 +54,37 @@ public class TransportGetJobsStatsAction extends TransportTasksAction listener) { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterService.state()); - request.setExpandedJobsIds(new ArrayList<>(mlMetadata.expandJobIds(request.getJobId(), request.allowNoJobs()))); - ActionListener finalListener = listener; - listener = ActionListener.wrap(response -> gatherStatsForClosedJobs(mlMetadata, - request, response, finalListener), listener::onFailure); - super.doExecute(task, request, listener); + protected void doExecute(Task task, GetJobsStatsAction.Request request, ActionListener finalListener) { + + jobConfigProvider.expandJobsIds(request.getJobId(), request.allowNoJobs(), ActionListener.wrap( + expandedIds -> { + request.setExpandedJobsIds(new ArrayList<>(expandedIds)); + ActionListener jobStatsListener = ActionListener.wrap( + response -> gatherStatsForClosedJobs(request, response, finalListener), + finalListener::onFailure + ); + super.doExecute(task, request, jobStatsListener); + }, + finalListener::onFailure + )); } @Override @@ -123,21 +132,20 @@ protected void taskOperation(GetJobsStatsAction.Request request, TransportOpenJo // Up until now we gathered the stats for jobs that were open, // This method will fetch the stats for missing jobs, that was stored in the jobs index - void gatherStatsForClosedJobs(MlMetadata mlMetadata, GetJobsStatsAction.Request request, GetJobsStatsAction.Response response, + void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAction.Response response, ActionListener listener) { - List jobIds = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, - request.getExpandedJobsIds(), response.getResponse().results()); - if (jobIds.isEmpty()) { + List closedJobIds = determineJobIdsWithoutLiveStats(request.getExpandedJobsIds(), response.getResponse().results()); + if (closedJobIds.isEmpty()) { listener.onResponse(response); return; } - AtomicInteger counter = new AtomicInteger(jobIds.size()); - AtomicArray jobStats = new AtomicArray<>(jobIds.size()); + AtomicInteger counter = new AtomicInteger(closedJobIds.size()); + AtomicArray jobStats = new AtomicArray<>(closedJobIds.size()); PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - for (int i = 0; i < jobIds.size(); i++) { + for (int i = 0; i < closedJobIds.size(); i++) { int slot = i; - String jobId = jobIds.get(i); + String jobId = closedJobIds.get(i); gatherForecastStats(jobId, forecastStats -> { gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> { JobState jobState = MlTasks.getJobState(jobId, tasks); @@ -180,11 +188,9 @@ static TimeValue durationToTimeValue(Optional duration) { } } - static List determineNonDeletedJobIdsWithoutLiveStats(MlMetadata mlMetadata, - List requestedJobIds, - List stats) { + static List determineJobIdsWithoutLiveStats(List requestedJobIds, + List stats) { Set excludeJobIds = stats.stream().map(GetJobsStatsAction.Response.JobStats::getJobId).collect(Collectors.toSet()); - return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId) && - !mlMetadata.isJobDeleted(jobId)).collect(Collectors.toList()); + return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId)).collect(Collectors.toList()); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java index 2e00ad71251db..2ee184ec877ed 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java @@ -8,7 +8,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; @@ -18,37 +17,27 @@ import java.util.List; import java.util.Optional; -import static org.elasticsearch.xpack.ml.action.TransportGetJobsStatsAction.determineNonDeletedJobIdsWithoutLiveStats; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.elasticsearch.xpack.ml.action.TransportGetJobsStatsAction.determineJobIdsWithoutLiveStats; public class TransportGetJobsStatsActionTests extends ESTestCase { public void testDetermineJobIds() { - MlMetadata mlMetadata = mock(MlMetadata.class); - when(mlMetadata.isJobDeleted(eq("id4"))).thenReturn(true); - - List result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, - Collections.singletonList("id1"), Collections.emptyList()); + List result = determineJobIdsWithoutLiveStats(Collections.singletonList("id1"), Collections.emptyList()); assertEquals(1, result.size()); assertEquals("id1", result.get(0)); - result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, - Collections.singletonList("id1"), Collections.singletonList( + result = determineJobIdsWithoutLiveStats(Collections.singletonList("id1"), Collections.singletonList( new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null))); assertEquals(0, result.size()); - result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, - Arrays.asList("id1", "id2", "id3"), Collections.emptyList()); + result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Collections.emptyList()); assertEquals(3, result.size()); assertEquals("id1", result.get(0)); assertEquals("id2", result.get(1)); assertEquals("id3", result.get(2)); - result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, - Arrays.asList("id1", "id2", "id3"), + result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.CLOSED, null, null, null)) ); @@ -56,27 +45,18 @@ public void testDetermineJobIds() { assertEquals("id2", result.get(0)); assertEquals("id3", result.get(1)); - result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, - Arrays.asList("id1", "id2", "id3"), Arrays.asList( + result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Arrays.asList( new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null), new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null) )); assertEquals(1, result.size()); assertEquals("id2", result.get(0)); - result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, Arrays.asList("id1", "id2", "id3"), Arrays.asList( + result = determineJobIdsWithoutLiveStats(Arrays.asList("id1", "id2", "id3"), Arrays.asList( new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null), new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, null, JobState.OPENED, null, null, null), new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null))); assertEquals(0, result.size()); - - // No jobs running, but job 4 is being deleted - result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, - Arrays.asList("id1", "id2", "id3", "id4"), Collections.emptyList()); - assertEquals(3, result.size()); - assertEquals("id1", result.get(0)); - assertEquals("id2", result.get(1)); - assertEquals("id3", result.get(2)); } public void testDurationToTimeValue() {