Skip to content

Commit 11f33d5

Browse files
committed
Persist DatafeedTiminStats with RefreshPolicy.NONE by default
1 parent 7776f75 commit 11f33d5

File tree

11 files changed

+137
-86
lines changed

11 files changed

+137
-86
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
2222
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
2323
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
24+
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister;
2425
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
2526
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
2627
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
27-
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
2828
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
2929

3030
import java.io.BufferedReader;
@@ -42,21 +42,19 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
4242
private final JobConfigProvider jobConfigProvider;
4343
private final DatafeedConfigProvider datafeedConfigProvider;
4444
private final JobResultsProvider jobResultsProvider;
45-
private final JobResultsPersister jobResultsPersister;
4645
private final NamedXContentRegistry xContentRegistry;
4746

4847
@Inject
4948
public TransportPreviewDatafeedAction(ThreadPool threadPool, TransportService transportService,
5049
ActionFilters actionFilters, Client client, JobConfigProvider jobConfigProvider,
5150
DatafeedConfigProvider datafeedConfigProvider, JobResultsProvider jobResultsProvider,
52-
JobResultsPersister jobResultsPersister, NamedXContentRegistry xContentRegistry) {
51+
NamedXContentRegistry xContentRegistry) {
5352
super(PreviewDatafeedAction.NAME, transportService, actionFilters, PreviewDatafeedAction.Request::new);
5453
this.threadPool = threadPool;
5554
this.client = client;
5655
this.jobConfigProvider = jobConfigProvider;
5756
this.datafeedConfigProvider = datafeedConfigProvider;
5857
this.jobResultsProvider = jobResultsProvider;
59-
this.jobResultsPersister = jobResultsPersister;
6058
this.xContentRegistry = xContentRegistry;
6159
}
6260

@@ -83,7 +81,8 @@ protected void doExecute(Task task, PreviewDatafeedAction.Request request, Actio
8381
previewDatafeed.build(),
8482
jobBuilder.build(),
8583
xContentRegistry,
86-
new DatafeedTimingStatsReporter(timingStats, jobResultsPersister),
84+
// Fake DatafeedTimingStatsReporter that does not have access to results index
85+
new DatafeedTimingStatsReporter(timingStats, new DatafeedTimingStatsPersister() {}),
8786
new ActionListener<>() {
8887
@Override
8988
public void onResponse(DataExtractorFactory dataExtractorFactory) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@
5353
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
5454
import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector;
5555
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
56+
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister;
5657
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
5758
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
5859
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
59-
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
6060
import org.elasticsearch.xpack.ml.notifications.Auditor;
6161

6262
import java.io.IOException;
@@ -83,7 +83,6 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
8383
private final PersistentTasksService persistentTasksService;
8484
private final JobConfigProvider jobConfigProvider;
8585
private final DatafeedConfigProvider datafeedConfigProvider;
86-
private final JobResultsPersister jobResultsPersister;
8786
private final Auditor auditor;
8887
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
8988
private final NamedXContentRegistry xContentRegistry;
@@ -94,15 +93,14 @@ public TransportStartDatafeedAction(Settings settings, TransportService transpor
9493
PersistentTasksService persistentTasksService,
9594
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
9695
Client client, JobConfigProvider jobConfigProvider, DatafeedConfigProvider datafeedConfigProvider,
97-
JobResultsPersister jobResultsPersister, Auditor auditor, NamedXContentRegistry xContentRegistry) {
96+
Auditor auditor, NamedXContentRegistry xContentRegistry) {
9897
super(StartDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters, StartDatafeedAction.Request::new,
9998
indexNameExpressionResolver);
10099
this.licenseState = licenseState;
101100
this.persistentTasksService = persistentTasksService;
102101
this.client = client;
103102
this.jobConfigProvider = jobConfigProvider;
104103
this.datafeedConfigProvider = datafeedConfigProvider;
105-
this.jobResultsPersister = jobResultsPersister;
106104
this.auditor = auditor;
107105
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
108106
this.xContentRegistry = xContentRegistry;
@@ -251,8 +249,8 @@ private void createDataExtractor(Job job, DatafeedConfig datafeed, StartDatafeed
251249
datafeed,
252250
job,
253251
xContentRegistry,
254-
// Creating fake DatafeedTimingStatsReporter so that search API call is not needed.
255-
new DatafeedTimingStatsReporter(new DatafeedTimingStats(job.getId()), jobResultsPersister),
252+
// Fake DatafeedTimingStatsReporter that does not have access to results index
253+
new DatafeedTimingStatsReporter(new DatafeedTimingStats(job.getId()), new DatafeedTimingStatsPersister() {}),
256254
ActionListener.wrap(
257255
unused ->
258256
persistentTasksService.sendStartRequest(

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ public String getJobId() {
107107
return jobId;
108108
}
109109

110+
public void finishReportingTimingStats() {
111+
timingStatsReporter.finishReporting();
112+
}
113+
110114
Long runLookBack(long startTime, Long endTime) throws Exception {
111115
lookbackStartTimeMs = skipToStartTime(startTime);
112116
Optional<Long> endMs = Optional.ofNullable(endTime);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.elasticsearch.ResourceNotFoundException;
99
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.support.WriteRequest;
1011
import org.elasticsearch.client.Client;
1112
import org.elasticsearch.common.unit.TimeValue;
1213
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -19,6 +20,7 @@
1920
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
2021
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
2122
import org.elasticsearch.xpack.core.ml.job.results.Result;
23+
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter.DatafeedTimingStatsPersister;
2224
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector;
2325
import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory;
2426
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
@@ -101,8 +103,14 @@ void build(String datafeedId, ActionListener<DatafeedJob> listener) {
101103
);
102104

103105
// Create data extractor factory
104-
Consumer<DatafeedTimingStats> datafeedTimingStatsHandler = timingStats -> {
105-
context.timingStatsReporter = new DatafeedTimingStatsReporter(timingStats, jobResultsPersister);
106+
Consumer<DatafeedTimingStats> datafeedTimingStatsHandler = initialTimingStats -> {
107+
DatafeedTimingStatsPersister timingStatsPersister = new DatafeedTimingStatsPersister() {
108+
@Override
109+
public void persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy) {
110+
jobResultsPersister.persistDatafeedTimingStats(timingStats, refreshPolicy);
111+
}
112+
};
113+
context.timingStatsReporter = new DatafeedTimingStatsReporter(initialTimingStats, timingStatsPersister);
106114
DataExtractorFactory.create(
107115
client,
108116
datafeedConfigHolder.get(),

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ public void stop(String source, TimeValue timeout, Exception e) {
347347
}
348348
auditor.info(datafeedJob.getJobId(),
349349
Messages.getMessage(isIsolated() ? Messages.JOB_AUDIT_DATAFEED_ISOLATED : Messages.JOB_AUDIT_DATAFEED_STOPPED));
350+
datafeedJob.finishReportingTimingStats();
350351
finishHandler.accept(e);
351352
logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(),
352353
acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired");

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import org.elasticsearch.common.unit.TimeValue;
1010
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
1111
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
12-
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
1312

1413
import java.util.Objects;
1514

@@ -21,20 +20,27 @@
2120
*/
2221
public class DatafeedTimingStatsReporter {
2322

23+
/** Interface used for persisting current timing stats to the results index. */
24+
public interface DatafeedTimingStatsPersister {
25+
/** Does nothing by default. This behavior is useful when creating fake {@link DatafeedTimingStatsReporter} objects. */
26+
default void persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy) {}
27+
}
28+
2429
/** Persisted timing stats. May be stale. */
2530
private DatafeedTimingStats persistedTimingStats;
2631
/** Current timing stats. */
2732
private volatile DatafeedTimingStats currentTimingStats;
2833
/** Object used to persist current timing stats. */
29-
private final JobResultsPersister jobResultsPersister;
34+
private final DatafeedTimingStatsPersister persister;
3035

31-
public DatafeedTimingStatsReporter(DatafeedTimingStats timingStats, JobResultsPersister jobResultsPersister) {
36+
public DatafeedTimingStatsReporter(DatafeedTimingStats timingStats, DatafeedTimingStatsPersister persister) {
3237
Objects.requireNonNull(timingStats);
3338
this.persistedTimingStats = new DatafeedTimingStats(timingStats);
3439
this.currentTimingStats = new DatafeedTimingStats(timingStats);
35-
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
40+
this.persister = Objects.requireNonNull(persister);
3641
}
3742

43+
/** Gets current timing stats. */
3844
public DatafeedTimingStats getCurrentTimingStats() {
3945
return new DatafeedTimingStats(currentTimingStats);
4046
}
@@ -64,16 +70,23 @@ public void reportDataCounts(DataCounts dataCounts) {
6470
flushIfDifferSignificantly();
6571
}
6672

73+
/** Finishes reporting of timing stats. Makes timing stats persisted immediately. */
74+
public void finishReporting() {
75+
// Don't flush if current timing stats are identical to the persisted ones
76+
if (currentTimingStats.equals(persistedTimingStats) == false) {
77+
flush(WriteRequest.RefreshPolicy.IMMEDIATE);
78+
}
79+
}
80+
6781
private void flushIfDifferSignificantly() {
6882
if (differSignificantly(currentTimingStats, persistedTimingStats)) {
69-
flush();
83+
flush(WriteRequest.RefreshPolicy.NONE);
7084
}
7185
}
7286

73-
private void flush() {
87+
private void flush(WriteRequest.RefreshPolicy refreshPolicy) {
7488
persistedTimingStats = new DatafeedTimingStats(currentTimingStats);
75-
// TODO: Consider changing refresh policy to NONE here and only do IMMEDIATE on datafeed _stop action
76-
jobResultsPersister.persistDatafeedTimingStats(persistedTimingStats, WriteRequest.RefreshPolicy.IMMEDIATE);
89+
persister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy);
7790
}
7891

7992
/**

0 commit comments

Comments
 (0)