Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.xpack.core.XPackField;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

Expand All @@ -27,49 +28,61 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage {
public static final String MODEL_SIZE = "model_size";
public static final String CREATED_BY = "created_by";
public static final String NODE_COUNT = "node_count";
public static final String DATA_FRAME_ANALYTICS_JOBS_FIELD = "data_frame_analytics_jobs";

private final Map<String, Object> jobsUsage;
private final Map<String, Object> datafeedsUsage;
private final Map<String, Object> analyticsUsage;
private final int nodeCount;

public MachineLearningFeatureSetUsage(boolean available, boolean enabled, Map<String, Object> jobsUsage,
Map<String, Object> datafeedsUsage, int nodeCount) {
public MachineLearningFeatureSetUsage(boolean available,
boolean enabled,
Map<String, Object> jobsUsage,
Map<String, Object> datafeedsUsage,
Map<String, Object> analyticsUsage,
int nodeCount) {
super(XPackField.MACHINE_LEARNING, available, enabled);
this.jobsUsage = Objects.requireNonNull(jobsUsage);
this.datafeedsUsage = Objects.requireNonNull(datafeedsUsage);
this.analyticsUsage = Objects.requireNonNull(analyticsUsage);
this.nodeCount = nodeCount;
}

public MachineLearningFeatureSetUsage(StreamInput in) throws IOException {
super(in);
this.jobsUsage = in.readMap();
this.datafeedsUsage = in.readMap();
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
this.analyticsUsage = in.readMap();
} else {
this.analyticsUsage = Collections.emptyMap();
}
if (in.getVersion().onOrAfter(Version.V_6_5_0)) {
this.nodeCount = in.readInt();
} else {
this.nodeCount = -1;
}
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(jobsUsage);
out.writeMap(datafeedsUsage);
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeMap(analyticsUsage);
}
if (out.getVersion().onOrAfter(Version.V_6_5_0)) {
out.writeInt(nodeCount);
}
}
}

@Override
protected void innerXContent(XContentBuilder builder, Params params) throws IOException {
super.innerXContent(builder, params);
if (jobsUsage != null) {
builder.field(JOBS_FIELD, jobsUsage);
}
if (datafeedsUsage != null) {
builder.field(DATAFEEDS_FIELD, datafeedsUsage);
}
builder.field(JOBS_FIELD, jobsUsage);
builder.field(DATAFEEDS_FIELD, datafeedsUsage);
builder.field(DATA_FRAME_ANALYTICS_JOBS_FIELD, analyticsUsage);
if (nodeCount >= 0) {
builder.field(NODE_COUNT, nodeCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
Expand Down Expand Up @@ -157,6 +160,7 @@ public static class Retriever {
private final boolean enabled;
private Map<String, Object> jobsUsage;
private Map<String, Object> datafeedsUsage;
private Map<String, Object> analyticsUsage;
private int nodeCount;

public Retriever(Client client, JobManagerHolder jobManagerHolder, boolean available, boolean enabled, int nodeCount) {
Expand All @@ -166,26 +170,47 @@ public Retriever(Client client, JobManagerHolder jobManagerHolder, boolean avail
this.enabled = enabled;
this.jobsUsage = new LinkedHashMap<>();
this.datafeedsUsage = new LinkedHashMap<>();
this.analyticsUsage = new LinkedHashMap<>();
this.nodeCount = nodeCount;
}

public void execute(ActionListener<Usage> listener) {
// empty holder means either ML disabled or transport client mode
if (jobManagerHolder.isEmpty()) {
listener.onResponse(
new MachineLearningFeatureSetUsage(available, enabled, Collections.emptyMap(), Collections.emptyMap(), 0));
new MachineLearningFeatureSetUsage(available,
enabled,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
0));
return;
}

// Step 3. Extract usage from data frame analytics and return usage response
ActionListener<GetDataFrameAnalyticsStatsAction.Response> dataframeAnalyticsListener = ActionListener.wrap(
response -> {
addDataFrameAnalyticsUsage(response, analyticsUsage);
listener.onResponse(new MachineLearningFeatureSetUsage(available,
enabled,
jobsUsage,
datafeedsUsage,
analyticsUsage,
nodeCount));
},
listener::onFailure
);

// Step 2. Extract usage from datafeeds stats and return usage response
ActionListener<GetDatafeedsStatsAction.Response> datafeedStatsListener =
ActionListener.wrap(response -> {
addDatafeedsUsage(response);
listener.onResponse(new MachineLearningFeatureSetUsage(
available, enabled, jobsUsage, datafeedsUsage, nodeCount));
},
listener::onFailure
);
ActionListener.wrap(response -> {
addDatafeedsUsage(response);
GetDataFrameAnalyticsStatsAction.Request dataframeAnalyticsStatsRequest =
new GetDataFrameAnalyticsStatsAction.Request(GetDatafeedsStatsAction.ALL);
dataframeAnalyticsStatsRequest.setPageParams(new PageParams(0, 10_000));
client.execute(GetDataFrameAnalyticsStatsAction.INSTANCE, dataframeAnalyticsStatsRequest, dataframeAnalyticsListener);
},
listener::onFailure);

// Step 1. Extract usage from jobs stats and then request stats for all datafeeds
GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(MetaData.ALL);
Expand Down Expand Up @@ -283,17 +308,31 @@ private void addDatafeedsUsage(GetDatafeedsStatsAction.Response response) {
ds -> Counter.newCounter()).addAndGet(1);
}

datafeedsUsage.put(MachineLearningFeatureSetUsage.ALL, createDatafeedUsageEntry(response.getResponse().count()));
datafeedsUsage.put(MachineLearningFeatureSetUsage.ALL, createCountUsageEntry(response.getResponse().count()));
for (DatafeedState datafeedState : datafeedCountByState.keySet()) {
datafeedsUsage.put(datafeedState.name().toLowerCase(Locale.ROOT),
createDatafeedUsageEntry(datafeedCountByState.get(datafeedState).get()));
createCountUsageEntry(datafeedCountByState.get(datafeedState).get()));
}
}

private Map<String, Object> createDatafeedUsageEntry(long count) {
private Map<String, Object> createCountUsageEntry(long count) {
Map<String, Object> usage = new HashMap<>();
usage.put(MachineLearningFeatureSetUsage.COUNT, count);
return usage;
}

private void addDataFrameAnalyticsUsage(GetDataFrameAnalyticsStatsAction.Response response,
Map<String, Object> dataframeAnalyticsUsage) {
Map<DataFrameAnalyticsState, Counter> dataFrameAnalyticsStateCounterMap = new HashMap<>();

for(GetDataFrameAnalyticsStatsAction.Response.Stats stats : response.getResponse().results()) {
dataFrameAnalyticsStateCounterMap.computeIfAbsent(stats.getState(), ds -> Counter.newCounter()).addAndGet(1);
}
dataframeAnalyticsUsage.put(MachineLearningFeatureSetUsage.ALL, createCountUsageEntry(response.getResponse().count()));
for (DataFrameAnalyticsState state : dataFrameAnalyticsStateCounterMap.keySet()) {
dataframeAnalyticsUsage.put(state.name().toLowerCase(Locale.ROOT),
createCountUsageEntry(dataFrameAnalyticsStateCounterMap.get(state).get()));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
Expand Down Expand Up @@ -94,6 +97,7 @@ public void init() {
when(clusterService.state()).thenReturn(clusterState);
givenJobs(Collections.emptyList(), Collections.emptyList());
givenDatafeeds(Collections.emptyList());
givenDataFrameAnalytics(Collections.emptyList());
}

public void testIsRunningOnMlPlatform() {
Expand Down Expand Up @@ -171,6 +175,11 @@ public void testUsage() throws Exception {
buildDatafeedStats(DatafeedState.STARTED),
buildDatafeedStats(DatafeedState.STOPPED)
));
givenDataFrameAnalytics(Arrays.asList(
buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STOPPED),
buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STOPPED),
buildDataFrameAnalyticsStats(DataFrameAnalyticsState.STARTED)
));

MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(TestEnvironment.newEnvironment(settings.build()),
clusterService, client, licenseState, jobManagerHolder);
Expand Down Expand Up @@ -237,6 +246,10 @@ public void testUsage() throws Exception {
assertThat(source.getValue("datafeeds.started.count"), equalTo(2));
assertThat(source.getValue("datafeeds.stopped.count"), equalTo(1));

assertThat(source.getValue("data_frame_analytics_jobs._all.count"), equalTo(3));
assertThat(source.getValue("data_frame_analytics_jobs.started.count"), equalTo(1));
assertThat(source.getValue("data_frame_analytics_jobs.stopped.count"), equalTo(2));

assertThat(source.getValue("jobs._all.forecasts.total"), equalTo(11));
assertThat(source.getValue("jobs._all.forecasts.forecasted_jobs"), equalTo(2));

Expand Down Expand Up @@ -418,6 +431,19 @@ private void givenDatafeeds(List<GetDatafeedsStatsAction.Response.DatafeedStats>
}).when(client).execute(same(GetDatafeedsStatsAction.INSTANCE), any(), any());
}

private void givenDataFrameAnalytics(List<GetDataFrameAnalyticsStatsAction.Response.Stats> dataFrameAnalyticsStats) {
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<GetDataFrameAnalyticsStatsAction.Response> listener =
(ActionListener<GetDataFrameAnalyticsStatsAction.Response>) invocationOnMock.getArguments()[2];
listener.onResponse(new GetDataFrameAnalyticsStatsAction.Response(
new QueryPage<>(dataFrameAnalyticsStats,
dataFrameAnalyticsStats.size(),
GetDataFrameAnalyticsAction.Response.RESULTS_FIELD)));
return Void.TYPE;
}).when(client).execute(same(GetDataFrameAnalyticsStatsAction.INSTANCE), any(), any());
}

private static Detector buildMinDetector(String fieldName) {
Detector.Builder detectorBuilder = new Detector.Builder();
detectorBuilder.setFunction("min");
Expand Down Expand Up @@ -458,6 +484,12 @@ private static GetDatafeedsStatsAction.Response.DatafeedStats buildDatafeedStats
return stats;
}

private static GetDataFrameAnalyticsStatsAction.Response.Stats buildDataFrameAnalyticsStats(DataFrameAnalyticsState state) {
GetDataFrameAnalyticsStatsAction.Response.Stats stats = mock(GetDataFrameAnalyticsStatsAction.Response.Stats.class);
when(stats.getState()).thenReturn(state);
return stats;
}

private static ForecastStats buildForecastStats(long numberOfForecasts) {
return new ForecastStatsTests().createForecastStats(numberOfForecasts, numberOfForecasts);
}
Expand Down