Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;

import java.io.IOException;
Expand All @@ -38,7 +39,7 @@ public class DatafeedTimingStats implements ToXContentObject, Writeable {
private static ConstructingObjectParser<DatafeedTimingStats, Void> createParser() {
ConstructingObjectParser<DatafeedTimingStats, Void> parser =
new ConstructingObjectParser<>(
"datafeed_timing_stats",
TYPE.getPreferredName(),
true,
args -> {
String jobId = (String) args[0];
Expand Down Expand Up @@ -128,6 +129,9 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
if (params.paramAsBoolean(ToXContentParams.FOR_INTERNAL_STORAGE, false)) {
builder.field(Result.RESULT_TYPE.getPreferredName(), TYPE.getPreferredName());
}
builder.field(JOB_ID.getPreferredName(), jobId);
builder.field(SEARCH_COUNT.getPreferredName(), searchCount);
builder.field(BUCKET_COUNT.getPreferredName(), bucketCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.results.Result;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;

import java.io.IOException;
Expand Down Expand Up @@ -195,6 +196,9 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (params.paramAsBoolean(ToXContentParams.FOR_INTERNAL_STORAGE, false)) {
builder.field(Result.RESULT_TYPE.getPreferredName(), TYPE.getPreferredName());
}
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(BUCKET_COUNT.getPreferredName(), bucketCount);
if (params.paramAsBoolean(ToXContentParams.INCLUDE_CALCULATED_FIELDS, false)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.job.results.Influencer;
import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -130,7 +131,11 @@ private void persistBucketInfluencersStandalone(String jobId, List<BucketInfluen
* @return this
*/
public Builder persistTimingStats(TimingStats timingStats) {
indexResult(TimingStats.documentId(timingStats.getJobId()), timingStats, TimingStats.TYPE.getPreferredName());
indexResult(
TimingStats.documentId(timingStats.getJobId()),
timingStats,
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
TimingStats.TYPE.getPreferredName());
return this;
}

Expand Down Expand Up @@ -185,7 +190,11 @@ public Builder persistForecastRequestStats(ForecastRequestStats forecastRequestS
}

private void indexResult(String id, ToXContent resultDoc, String resultType) {
try (XContentBuilder content = toXContentBuilder(resultDoc)) {
indexResult(id, resultDoc, ToXContent.EMPTY_PARAMS, resultType);
}

private void indexResult(String id, ToXContent resultDoc, ToXContent.Params params, String resultType) {
try (XContentBuilder content = toXContentBuilder(resultDoc, params)) {
bulkRequest.add(new IndexRequest(indexName).id(id).source(content));
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Error serialising {}", jobId, resultType), e);
Expand Down Expand Up @@ -335,27 +344,37 @@ public void commitStateWrites(String jobId) {
public IndexResponse persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy) {
String jobId = timingStats.getJobId();
logger.trace("[{}] Persisting datafeed timing stats", jobId);
Persistable persistable = new Persistable(jobId, timingStats, DatafeedTimingStats.documentId(timingStats.getJobId()));
Persistable persistable = new Persistable(
jobId,
timingStats,
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
DatafeedTimingStats.documentId(timingStats.getJobId()));
persistable.setRefreshPolicy(refreshPolicy);
return persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId)).actionGet();
}

private XContentBuilder toXContentBuilder(ToXContent obj) throws IOException {
private static XContentBuilder toXContentBuilder(ToXContent obj, ToXContent.Params params) throws IOException {
XContentBuilder builder = jsonBuilder();
obj.toXContent(builder, ToXContent.EMPTY_PARAMS);
obj.toXContent(builder, params);
return builder;
}

private class Persistable {

private final String jobId;
private final ToXContent object;
private final ToXContent.Params params;
private final String id;
private WriteRequest.RefreshPolicy refreshPolicy;

Persistable(String jobId, ToXContent object, String id) {
this(jobId, object, ToXContent.EMPTY_PARAMS, id);
}

Persistable(String jobId, ToXContent object, ToXContent.Params params, String id) {
this.jobId = jobId;
this.object = object;
this.params = params;
this.id = id;
this.refreshPolicy = WriteRequest.RefreshPolicy.NONE;
}
Expand All @@ -373,7 +392,7 @@ ActionFuture<IndexResponse> persist(String indexName) {
void persist(String indexName, ActionListener<IndexResponse> listener) {
logCall(indexName);

try (XContentBuilder content = toXContentBuilder(object)) {
try (XContentBuilder content = toXContentBuilder(object, params)) {
IndexRequest indexRequest = new IndexRequest(indexName).id(id).source(content).setRefreshPolicy(refreshPolicy);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest, listener, client::index);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ public void testPersistTimingStats() {
assertThat(indexRequest.index(), equalTo(".ml-anomalies-.write-foo"));
assertThat(indexRequest.id(), equalTo("foo_timing_stats"));
Map<String, Object> expectedSourceAsMap = new HashMap<>();
expectedSourceAsMap.put("result_type", "timing_stats");
expectedSourceAsMap.put("job_id", "foo");
expectedSourceAsMap.put("bucket_count", 7);
expectedSourceAsMap.put("minimum_bucket_processing_time_ms", 1.0);
Expand Down Expand Up @@ -255,6 +256,7 @@ public void testPersistDatafeedTimingStats() {
assertThat(indexRequest.id(), equalTo("foo_datafeed_timing_stats"));
assertThat(indexRequest.getRefreshPolicy(), equalTo(WriteRequest.RefreshPolicy.IMMEDIATE));
Map<String, Object> expectedSourceAsMap = new HashMap<>();
expectedSourceAsMap.put("result_type", "datafeed_timing_stats");
expectedSourceAsMap.put("job_id", "foo");
expectedSourceAsMap.put("search_count", 6);
expectedSourceAsMap.put("bucket_count", 66);
Expand Down