Skip to content

Commit 5a78d3c

Browse files
authored
Persist DatafeedTimingStats with RefreshPolicy.NONE by default (#44940)
1 parent c5eed7e commit 5a78d3c

File tree

11 files changed

+129
-86
lines changed

11 files changed

+129
-86
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
2525
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
2626
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
27-
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
2827
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
2928

3029
import java.io.BufferedReader;
@@ -42,21 +41,19 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
4241
private final JobConfigProvider jobConfigProvider;
4342
private final DatafeedConfigProvider datafeedConfigProvider;
4443
private final JobResultsProvider jobResultsProvider;
45-
private final JobResultsPersister jobResultsPersister;
4644
private final NamedXContentRegistry xContentRegistry;
4745

4846
@Inject
4947
public TransportPreviewDatafeedAction(ThreadPool threadPool, TransportService transportService,
5048
ActionFilters actionFilters, Client client, JobConfigProvider jobConfigProvider,
5149
DatafeedConfigProvider datafeedConfigProvider, JobResultsProvider jobResultsProvider,
52-
JobResultsPersister jobResultsPersister, NamedXContentRegistry xContentRegistry) {
50+
NamedXContentRegistry xContentRegistry) {
5351
super(PreviewDatafeedAction.NAME, transportService, actionFilters, PreviewDatafeedAction.Request::new);
5452
this.threadPool = threadPool;
5553
this.client = client;
5654
this.jobConfigProvider = jobConfigProvider;
5755
this.datafeedConfigProvider = datafeedConfigProvider;
5856
this.jobResultsProvider = jobResultsProvider;
59-
this.jobResultsPersister = jobResultsPersister;
6057
this.xContentRegistry = xContentRegistry;
6158
}
6259

@@ -83,7 +80,8 @@ protected void doExecute(Task task, PreviewDatafeedAction.Request request, Actio
8380
previewDatafeed.build(),
8481
jobBuilder.build(),
8582
xContentRegistry,
86-
new DatafeedTimingStatsReporter(timingStats, jobResultsPersister),
83+
// Fake DatafeedTimingStatsReporter that does not have access to results index
84+
new DatafeedTimingStatsReporter(timingStats, (ts, refreshPolicy) -> {}),
8785
new ActionListener<>() {
8886
@Override
8987
public void onResponse(DataExtractorFactory dataExtractorFactory) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
5757
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
5858
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
59-
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
6059
import org.elasticsearch.xpack.ml.notifications.Auditor;
6160

6261
import java.io.IOException;
@@ -83,7 +82,6 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
8382
private final PersistentTasksService persistentTasksService;
8483
private final JobConfigProvider jobConfigProvider;
8584
private final DatafeedConfigProvider datafeedConfigProvider;
86-
private final JobResultsPersister jobResultsPersister;
8785
private final Auditor auditor;
8886
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
8987
private final NamedXContentRegistry xContentRegistry;
@@ -94,15 +92,14 @@ public TransportStartDatafeedAction(Settings settings, TransportService transpor
9492
PersistentTasksService persistentTasksService,
9593
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
9694
Client client, JobConfigProvider jobConfigProvider, DatafeedConfigProvider datafeedConfigProvider,
97-
JobResultsPersister jobResultsPersister, Auditor auditor, NamedXContentRegistry xContentRegistry) {
95+
Auditor auditor, NamedXContentRegistry xContentRegistry) {
9896
super(StartDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters, StartDatafeedAction.Request::new,
9997
indexNameExpressionResolver);
10098
this.licenseState = licenseState;
10199
this.persistentTasksService = persistentTasksService;
102100
this.client = client;
103101
this.jobConfigProvider = jobConfigProvider;
104102
this.datafeedConfigProvider = datafeedConfigProvider;
105-
this.jobResultsPersister = jobResultsPersister;
106103
this.auditor = auditor;
107104
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
108105
this.xContentRegistry = xContentRegistry;
@@ -251,8 +248,8 @@ private void createDataExtractor(Job job, DatafeedConfig datafeed, StartDatafeed
251248
datafeed,
252249
job,
253250
xContentRegistry,
254-
// Creating fake DatafeedTimingStatsReporter so that search API call is not needed.
255-
new DatafeedTimingStatsReporter(new DatafeedTimingStats(job.getId()), jobResultsPersister),
251+
// Fake DatafeedTimingStatsReporter that does not have access to results index
252+
new DatafeedTimingStatsReporter(new DatafeedTimingStats(job.getId()), (ts, refreshPolicy) -> {}),
256253
ActionListener.wrap(
257254
unused ->
258255
persistentTasksService.sendStartRequest(

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ public String getJobId() {
107107
return jobId;
108108
}
109109

110+
public void finishReportingTimingStats() {
111+
timingStatsReporter.finishReporting();
112+
}
113+
110114
Long runLookBack(long startTime, Long endTime) throws Exception {
111115
lookbackStartTimeMs = skipToStartTime(startTime);
112116
Optional<Long> endMs = Optional.ofNullable(endTime);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,9 @@ void build(String datafeedId, ActionListener<DatafeedJob> listener) {
101101
);
102102

103103
// Create data extractor factory
104-
Consumer<DatafeedTimingStats> datafeedTimingStatsHandler = timingStats -> {
105-
context.timingStatsReporter = new DatafeedTimingStatsReporter(timingStats, jobResultsPersister);
104+
Consumer<DatafeedTimingStats> datafeedTimingStatsHandler = initialTimingStats -> {
105+
context.timingStatsReporter =
106+
new DatafeedTimingStatsReporter(initialTimingStats, jobResultsPersister::persistDatafeedTimingStats);
106107
DataExtractorFactory.create(
107108
client,
108109
datafeedConfigHolder.get(),

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ public void stop(String source, TimeValue timeout, Exception e) {
347347
}
348348
auditor.info(datafeedJob.getJobId(),
349349
Messages.getMessage(isIsolated() ? Messages.JOB_AUDIT_DATAFEED_ISOLATED : Messages.JOB_AUDIT_DATAFEED_STOPPED));
350+
datafeedJob.finishReportingTimingStats();
350351
finishHandler.accept(e);
351352
logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(),
352353
acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired");

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedTimingStatsReporter.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import org.elasticsearch.common.unit.TimeValue;
1010
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
1111
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
12-
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
1312

1413
import java.util.Objects;
1514

@@ -21,20 +20,28 @@
2120
*/
2221
public class DatafeedTimingStatsReporter {
2322

23+
/** Interface used for persisting current timing stats to the results index. */
24+
@FunctionalInterface
25+
public interface DatafeedTimingStatsPersister {
26+
/** Does nothing by default. This behavior is useful when creating fake {@link DatafeedTimingStatsReporter} objects. */
27+
void persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy);
28+
}
29+
2430
/** Persisted timing stats. May be stale. */
2531
private DatafeedTimingStats persistedTimingStats;
2632
/** Current timing stats. */
2733
private volatile DatafeedTimingStats currentTimingStats;
2834
/** Object used to persist current timing stats. */
29-
private final JobResultsPersister jobResultsPersister;
35+
private final DatafeedTimingStatsPersister persister;
3036

31-
public DatafeedTimingStatsReporter(DatafeedTimingStats timingStats, JobResultsPersister jobResultsPersister) {
37+
public DatafeedTimingStatsReporter(DatafeedTimingStats timingStats, DatafeedTimingStatsPersister persister) {
3238
Objects.requireNonNull(timingStats);
3339
this.persistedTimingStats = new DatafeedTimingStats(timingStats);
3440
this.currentTimingStats = new DatafeedTimingStats(timingStats);
35-
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
41+
this.persister = Objects.requireNonNull(persister);
3642
}
3743

44+
/** Gets current timing stats. */
3845
public DatafeedTimingStats getCurrentTimingStats() {
3946
return new DatafeedTimingStats(currentTimingStats);
4047
}
@@ -64,16 +71,23 @@ public void reportDataCounts(DataCounts dataCounts) {
6471
flushIfDifferSignificantly();
6572
}
6673

74+
/** Finishes reporting of timing stats. Makes timing stats persisted immediately. */
75+
public void finishReporting() {
76+
// Don't flush if current timing stats are identical to the persisted ones
77+
if (currentTimingStats.equals(persistedTimingStats) == false) {
78+
flush(WriteRequest.RefreshPolicy.IMMEDIATE);
79+
}
80+
}
81+
6782
private void flushIfDifferSignificantly() {
6883
if (differSignificantly(currentTimingStats, persistedTimingStats)) {
69-
flush();
84+
flush(WriteRequest.RefreshPolicy.NONE);
7085
}
7186
}
7287

73-
private void flush() {
88+
private void flush(WriteRequest.RefreshPolicy refreshPolicy) {
7489
persistedTimingStats = new DatafeedTimingStats(currentTimingStats);
75-
// TODO: Consider changing refresh policy to NONE here and only do IMMEDIATE on datafeed _stop action
76-
jobResultsPersister.persistDatafeedTimingStats(persistedTimingStats, WriteRequest.RefreshPolicy.IMMEDIATE);
90+
persister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy);
7791
}
7892

7993
/**

0 commit comments

Comments
 (0)