From 1f49c57f75058e038dac0e15913841da89bf9981 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 10 Aug 2020 14:56:34 +0100 Subject: [PATCH 1/2] DFA Get stats can return multiple responses if there is an error --- .../TransportGetDataFrameAnalyticsStatsAction.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java index 06a79c44ec6b4..65441ecfed1f3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java @@ -63,6 +63,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -167,6 +168,7 @@ void gatherStatsForStoppedTasks(List<DataFrameAnalyticsConfig> configs, GetDataF AtomicInteger counter = new AtomicInteger(stoppedConfigs.size()); AtomicArray<Stats> jobStats = new AtomicArray<>(stoppedConfigs.size()); + AtomicReference<Exception> searchException = new AtomicReference<>(); for (int i = 0; i < stoppedConfigs.size(); i++) { final int slot = i; DataFrameAnalyticsConfig config = stoppedConfigs.get(i); @@ -174,6 +176,10 @@ void gatherStatsForStoppedTasks(List<DataFrameAnalyticsConfig> configs, GetDataF stats -> { jobStats.set(slot, stats); if (counter.decrementAndGet() == 0) { + if (searchException.get() != null) { + listener.onFailure(searchException.get()); + return; + } List<Stats> allTasksStats = new ArrayList<>(runningTasksResponse.getResponse().results()); allTasksStats.addAll(jobStats.asList()); Collections.sort(allTasksStats, Comparator.comparing(Stats::getId)); @@ -181,7 +187,13 @@ void
gatherStatsForStoppedTasks(List<DataFrameAnalyticsConfig> configs, GetDataF allTasksStats, allTasksStats.size(), GetDataFrameAnalyticsAction.Response.RESULTS_FIELD))); } }, - listener::onFailure) + e -> { + // take the first error + searchException.compareAndSet(null, e); + if (counter.decrementAndGet() == 0) { + listener.onFailure(e); + } + }) ); } } From b09b33d36862d0254b25b88bf38adeef43cd8bf6 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 10 Aug 2020 15:48:02 +0100 Subject: [PATCH 2/2] And for Job Stats --- .../action/TransportGetJobsStatsAction.java | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java index b96e6ca99c8b0..bab56bcc2753c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java @@ -46,6 +46,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -143,7 +144,17 @@ void gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAc } AtomicInteger counter = new AtomicInteger(closedJobIds.size()); + AtomicReference<Exception> searchException = new AtomicReference<>(); AtomicArray<GetJobsStatsAction.Response.JobStats> jobStats = new AtomicArray<>(closedJobIds.size()); + + Consumer<Exception> errorHandler = e -> { + // take the first error + searchException.compareAndSet(null, e); + if (counter.decrementAndGet() == 0) { + listener.onFailure(e); + } + }; + PersistentTasksCustomMetadata tasks = clusterService.state().getMetadata().custom(PersistentTasksCustomMetadata.TYPE); for (int i = 0; i < closedJobIds.size(); i++) { int slot = i; @@ -159,14 +170,19 @@ void
gatherStatsForClosedJobs(GetJobsStatsAction.Request request, GetJobsStatsAc jobStats.set(slot, new JobStats(jobId, dataCounts, modelSizeStats, forecastStats, jobState, null, assignmentExplanation, null, timingStats)); if (counter.decrementAndGet() == 0) { + if (searchException.get() != null) { + // there was an error + listener.onFailure(searchException.get()); + return; + } List<JobStats> results = response.getResponse().results(); results.addAll(jobStats.asList()); Collections.sort(results, Comparator.comparing(GetJobsStatsAction.Response.JobStats::getJobId)); listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(), new QueryPage<>(results, results.size(), Job.RESULTS_FIELD))); } - }, listener::onFailure); - }, listener::onFailure); + }, errorHandler); + }, errorHandler); } }