From 5190b7d6af216b2b830440adac0d671ffb2c9f24 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 22 Aug 2019 12:49:49 -0500 Subject: [PATCH 1/2] [ML] Adding data frame analytics stats to _usage API (#45820) * [ML] Adding data frame analytics stats to _usage API * making the size of analytics stats 10k --- .../ml/MachineLearningFeatureSetUsage.java | 33 +++++++++++----- .../xpack/ml/MachineLearningFeatureSet.java | 39 ++++++++++++++++--- .../ml/MachineLearningFeatureSetTests.java | 32 +++++++++++++++ 3 files changed, 88 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java index 755d6faef0ba2..ca898f9cbf6a1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java @@ -13,6 +13,7 @@ import org.elasticsearch.xpack.core.XPackField; import java.io.IOException; +import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -27,16 +28,23 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage { public static final String MODEL_SIZE = "model_size"; public static final String CREATED_BY = "created_by"; public static final String NODE_COUNT = "node_count"; + public static final String DATA_FRAME_ANALYTICS_JOBS_FIELD = "data_frame_analytics_jobs"; private final Map jobsUsage; private final Map datafeedsUsage; + private final Map analyticsUsage; private final int nodeCount; - public MachineLearningFeatureSetUsage(boolean available, boolean enabled, Map jobsUsage, - Map datafeedsUsage, int nodeCount) { + public MachineLearningFeatureSetUsage(boolean available, + boolean enabled, + Map jobsUsage, + Map datafeedsUsage, + Map analyticsUsage, + int nodeCount) { super(XPackField.MACHINE_LEARNING, available, enabled); this.jobsUsage = Objects.requireNonNull(jobsUsage); this.datafeedsUsage = Objects.requireNonNull(datafeedsUsage); + this.analyticsUsage = Objects.requireNonNull(analyticsUsage); this.nodeCount = nodeCount; } @@ -44,32 +52,37 @@ public MachineLearningFeatureSetUsage(StreamInput in) throws IOException { super(in); this.jobsUsage = in.readMap(); this.datafeedsUsage = in.readMap(); + if (in.getVersion().onOrAfter(Version.V_7_4_0)) { + this.analyticsUsage = in.readMap(); + } else { + this.analyticsUsage = Collections.emptyMap(); + } if (in.getVersion().onOrAfter(Version.V_6_5_0)) { this.nodeCount = in.readInt(); } else { this.nodeCount = -1; } - } + } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeMap(jobsUsage); out.writeMap(datafeedsUsage); + if (out.getVersion().onOrAfter(Version.V_7_4_0)) { + out.writeMap(analyticsUsage); + } if (out.getVersion().onOrAfter(Version.V_6_5_0)) { out.writeInt(nodeCount); } - } + } @Override protected void innerXContent(XContentBuilder builder, Params params) throws IOException { super.innerXContent(builder, params); - if (jobsUsage != null) { - builder.field(JOBS_FIELD, jobsUsage); - } - if (datafeedsUsage != null) { - builder.field(DATAFEEDS_FIELD, datafeedsUsage); - } + builder.field(JOBS_FIELD, jobsUsage); + builder.field(DATAFEEDS_FIELD, datafeedsUsage); + builder.field(DATA_FRAME_ANALYTICS_JOBS_FIELD, analyticsUsage); if (nodeCount >= 0) { builder.field(NODE_COUNT, nodeCount); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java index d6c15275c6eeb..18914b7d68eaf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java @@ -173,16 +173,29 @@ public void execute(ActionListener listener) { // empty holder means either ML disabled or transport client mode if (jobManagerHolder.isEmpty()) { listener.onResponse( - new MachineLearningFeatureSetUsage(available, enabled, Collections.emptyMap(), Collections.emptyMap(), 0)); + new MachineLearningFeatureSetUsage(available, enabled, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 0)); return; } + // Step 3. Extract usage from data frame analytics and return usage response + ActionListener dataframeAnalyticsListener = ActionListener.wrap( + response -> { + addDataFrameAnalyticsUsage(response, analyticsUsage); + MachineLearningFeatureSetUsage usage = new MachineLearningFeatureSetUsage(licenseState.isMachineLearningAllowed(), + enabled, jobsUsage, datafeedsUsage, analyticsUsage, nodeCount); + listener.onResponse(new XPackUsageFeatureResponse(usage)); + }, + listener::onFailure + ); + // Step 2. Extract usage from datafeeds stats and return usage response ActionListener datafeedStatsListener = ActionListener.wrap(response -> { addDatafeedsUsage(response); - listener.onResponse(new MachineLearningFeatureSetUsage( - available, enabled, jobsUsage, datafeedsUsage, nodeCount)); + GetDataFrameAnalyticsStatsAction.Request dataframeAnalyticsStatsRequest = + new GetDataFrameAnalyticsStatsAction.Request(GetDatafeedsStatsAction.ALL); + dataframeAnalyticsStatsRequest.setPageParams(new PageParams(0, 10_000)); + client.execute(GetDataFrameAnalyticsStatsAction.INSTANCE, dataframeAnalyticsStatsRequest, dataframeAnalyticsListener); }, listener::onFailure ); @@ -283,17 +296,31 @@ private void addDatafeedsUsage(GetDatafeedsStatsAction.Response response) { ds -> Counter.newCounter()).addAndGet(1); } - datafeedsUsage.put(MachineLearningFeatureSetUsage.ALL, createDatafeedUsageEntry(response.getResponse().count())); + datafeedsUsage.put(MachineLearningFeatureSetUsage.ALL, createCountUsageEntry(response.getResponse().count())); for (DatafeedState datafeedState : datafeedCountByState.keySet()) { datafeedsUsage.put(datafeedState.name().toLowerCase(Locale.ROOT), - createDatafeedUsageEntry(datafeedCountByState.get(datafeedState).get())); + createCountUsageEntry(datafeedCountByState.get(datafeedState).get())); } } - private Map createDatafeedUsageEntry(long count) { + private Map createCountUsageEntry(long count) { Map usage = new HashMap<>(); usage.put(MachineLearningFeatureSetUsage.COUNT, count); return usage; } + + private void addDataFrameAnalyticsUsage(GetDataFrameAnalyticsStatsAction.Response response, + Map dataframeAnalyticsUsage) { + Map dataFrameAnalyticsStateCounterMap = new HashMap<>(); + + for(GetDataFrameAnalyticsStatsAction.Response.Stats stats : response.getResponse().results()) { + dataFrameAnalyticsStateCounterMap.computeIfAbsent(stats.getState(), ds -> Counter.newCounter()).addAndGet(1); + } + dataframeAnalyticsUsage.put(MachineLearningFeatureSetUsage.ALL, createCountUsageEntry(response.getResponse().count())); + for (DataFrameAnalyticsState state : dataFrameAnalyticsStateCounterMap.keySet()) { + dataframeAnalyticsUsage.put(state.name().toLowerCase(Locale.ROOT), + createCountUsageEntry(dataFrameAnalyticsStateCounterMap.get(state).get())); + } + } } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java index ca076050578af..8f2ed47794aee 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java @@ -33,10 +33,13 @@ import org.elasticsearch.xpack.core.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage; import org.elasticsearch.xpack.core.ml.MachineLearningField; +import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction; +import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; @@ -94,6 +97,7 @@ public void init() { when(clusterService.state()).thenReturn(clusterState); givenJobs(Collections.emptyList(), Collections.emptyList()); givenDatafeeds(Collections.emptyList()); + givenDataFrameAnalytics(Collections.emptyList()); } public void testIsRunningOnMlPlatform() { @@ -171,6 +175,11 @@ public void testUsage() throws Exception { buildDatafeedStats(DatafeedState.STARTED), buildDatafeedStats(DatafeedState.STOPPED) )); + givenDataFrameAnalytics(Arrays.asList( + buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STOPPED), + buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STOPPED), + buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STARTED) + )); MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(TestEnvironment.newEnvironment(settings.build()), clusterService, client, licenseState, jobManagerHolder); @@ -237,6 +246,10 @@ public void testUsage() throws Exception { assertThat(source.getValue("datafeeds.started.count"), equalTo(2)); assertThat(source.getValue("datafeeds.stopped.count"), equalTo(1)); + assertThat(source.getValue("data_frame_analytics_jobs._all.count"), equalTo(3)); + assertThat(source.getValue("data_frame_analytics_jobs.started.count"), equalTo(1)); + assertThat(source.getValue("data_frame_analytics_jobs.stopped.count"), equalTo(2)); + assertThat(source.getValue("jobs._all.forecasts.total"), equalTo(11)); assertThat(source.getValue("jobs._all.forecasts.forecasted_jobs"), equalTo(2)); @@ -418,6 +431,19 @@ private void givenDatafeeds(List }).when(client).execute(same(GetDatafeedsStatsAction.INSTANCE), any(), any()); } + private void givenDataFrameAnalytics(List dataFrameAnalyticsStats) { + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = + (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(new GetDataFrameAnalyticsStatsAction.Response( + new QueryPage<>(dataFrameAnalyticsStats, + dataFrameAnalyticsStats.size(), + GetDataFrameAnalyticsAction.Response.RESULTS_FIELD))); + return Void.TYPE; + }).when(client).execute(same(GetDataFrameAnalyticsStatsAction.INSTANCE), any(), any()); + } + private static Detector buildMinDetector(String fieldName) { Detector.Builder detectorBuilder = new Detector.Builder(); detectorBuilder.setFunction("min"); @@ -458,6 +484,12 @@ private static GetDatafeedsStatsAction.Response.DatafeedStats buildDatafeedStats return stats; } + private static GetDataFrameAnalyticsStatsAction.Response.Stats buildDataFrameAnalyticsStats(DataFrameAnalyticsState state) { + GetDataFrameAnalyticsStatsAction.Response.Stats stats = mock(GetDataFrameAnalyticsStatsAction.Response.Stats.class); + when(stats.getState()).thenReturn(state); + return stats; + } + private static ForecastStats buildForecastStats(long numberOfForecasts) { return new ForecastStatsTests().createForecastStats(numberOfForecasts, numberOfForecasts); } From e98d696ed4e7965a9b5faf14bdc28d8f64ec72ca Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Thu, 22 Aug 2019 13:22:23 -0500 Subject: [PATCH 2/2] adjusting backport --- .../xpack/ml/MachineLearningFeatureSet.java | 62 +++++++++++-------- 1 file changed, 37 insertions(+), 25 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java index 18914b7d68eaf..e06feb4d6aa66 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java @@ -24,10 +24,13 @@ import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.action.util.PageParams; import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage; +import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; @@ -157,6 +160,7 @@ public static class Retriever { private final boolean enabled; private Map jobsUsage; private Map datafeedsUsage; + private Map analyticsUsage; private int nodeCount; public Retriever(Client client, JobManagerHolder jobManagerHolder, boolean available, boolean enabled, int nodeCount) { @@ -166,6 +170,7 @@ public Retriever(Client client, JobManagerHolder jobManagerHolder, boolean avail this.enabled = enabled; this.jobsUsage = new LinkedHashMap<>(); this.datafeedsUsage = new LinkedHashMap<>(); + this.analyticsUsage = new LinkedHashMap<>(); this.nodeCount = nodeCount; } @@ -173,32 +178,39 @@ public void execute(ActionListener listener) { // empty holder means either ML disabled or transport client mode if (jobManagerHolder.isEmpty()) { listener.onResponse( - new MachineLearningFeatureSetUsage(available, enabled, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), 0)); + new MachineLearningFeatureSetUsage(available, + enabled, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + 0)); return; } - // Step 3. Extract usage from data frame analytics and return usage response - ActionListener dataframeAnalyticsListener = ActionListener.wrap( + // Step 3. Extract usage from data frame analytics and return usage response + ActionListener dataframeAnalyticsListener = ActionListener.wrap( response -> { addDataFrameAnalyticsUsage(response, analyticsUsage); - MachineLearningFeatureSetUsage usage = new MachineLearningFeatureSetUsage(licenseState.isMachineLearningAllowed(), - enabled, jobsUsage, datafeedsUsage, analyticsUsage, nodeCount); - listener.onResponse(new XPackUsageFeatureResponse(usage)); + listener.onResponse(new MachineLearningFeatureSetUsage(available, + enabled, + jobsUsage, + datafeedsUsage, + analyticsUsage, + nodeCount)); }, listener::onFailure ); // Step 2. Extract usage from datafeeds stats and return usage response ActionListener datafeedStatsListener = - ActionListener.wrap(response -> { - addDatafeedsUsage(response); - GetDataFrameAnalyticsStatsAction.Request dataframeAnalyticsStatsRequest = - new GetDataFrameAnalyticsStatsAction.Request(GetDatafeedsStatsAction.ALL); - dataframeAnalyticsStatsRequest.setPageParams(new PageParams(0, 10_000)); - client.execute(GetDataFrameAnalyticsStatsAction.INSTANCE, dataframeAnalyticsStatsRequest, dataframeAnalyticsListener); - }, - listener::onFailure - ); + ActionListener.wrap(response -> { + addDatafeedsUsage(response); + GetDataFrameAnalyticsStatsAction.Request dataframeAnalyticsStatsRequest = + new GetDataFrameAnalyticsStatsAction.Request(GetDatafeedsStatsAction.ALL); + dataframeAnalyticsStatsRequest.setPageParams(new PageParams(0, 10_000)); + client.execute(GetDataFrameAnalyticsStatsAction.INSTANCE, dataframeAnalyticsStatsRequest, dataframeAnalyticsListener); + }, + listener::onFailure); // Step 1. Extract usage from jobs stats and then request stats for all datafeeds GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(MetaData.ALL); @@ -309,18 +321,18 @@ private Map createCountUsageEntry(long count) { return usage; } - private void addDataFrameAnalyticsUsage(GetDataFrameAnalyticsStatsAction.Response response, + private void addDataFrameAnalyticsUsage(GetDataFrameAnalyticsStatsAction.Response response, Map dataframeAnalyticsUsage) { - Map dataFrameAnalyticsStateCounterMap = new HashMap<>(); + Map dataFrameAnalyticsStateCounterMap = new HashMap<>(); - for(GetDataFrameAnalyticsStatsAction.Response.Stats stats : response.getResponse().results()) { - dataFrameAnalyticsStateCounterMap.computeIfAbsent(stats.getState(), ds -> Counter.newCounter()).addAndGet(1); - } - dataframeAnalyticsUsage.put(MachineLearningFeatureSetUsage.ALL, createCountUsageEntry(response.getResponse().count())); - for (DataFrameAnalyticsState state : dataFrameAnalyticsStateCounterMap.keySet()) { - dataframeAnalyticsUsage.put(state.name().toLowerCase(Locale.ROOT), - createCountUsageEntry(dataFrameAnalyticsStateCounterMap.get(state).get())); + for(GetDataFrameAnalyticsStatsAction.Response.Stats stats : response.getResponse().results()) { + dataFrameAnalyticsStateCounterMap.computeIfAbsent(stats.getState(), ds -> Counter.newCounter()).addAndGet(1); + } + dataframeAnalyticsUsage.put(MachineLearningFeatureSetUsage.ALL, createCountUsageEntry(response.getResponse().count())); + for (DataFrameAnalyticsState state : dataFrameAnalyticsStateCounterMap.keySet()) { + dataframeAnalyticsUsage.put(state.name().toLowerCase(Locale.ROOT), + createCountUsageEntry(dataFrameAnalyticsStateCounterMap.get(state).get())); + } } } - } }