Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;

import java.io.BufferedReader;
Expand All @@ -42,21 +41,19 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
private final JobConfigProvider jobConfigProvider;
private final DatafeedConfigProvider datafeedConfigProvider;
private final JobResultsProvider jobResultsProvider;
private final JobResultsPersister jobResultsPersister;
private final NamedXContentRegistry xContentRegistry;

@Inject
public TransportPreviewDatafeedAction(ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, Client client, JobConfigProvider jobConfigProvider,
DatafeedConfigProvider datafeedConfigProvider, JobResultsProvider jobResultsProvider,
JobResultsPersister jobResultsPersister, NamedXContentRegistry xContentRegistry) {
NamedXContentRegistry xContentRegistry) {
super(PreviewDatafeedAction.NAME, transportService, actionFilters, PreviewDatafeedAction.Request::new);
this.threadPool = threadPool;
this.client = client;
this.jobConfigProvider = jobConfigProvider;
this.datafeedConfigProvider = datafeedConfigProvider;
this.jobResultsProvider = jobResultsProvider;
this.jobResultsPersister = jobResultsPersister;
this.xContentRegistry = xContentRegistry;
}

Expand All @@ -83,7 +80,8 @@ protected void doExecute(Task task, PreviewDatafeedAction.Request request, Actio
previewDatafeed.build(),
jobBuilder.build(),
xContentRegistry,
new DatafeedTimingStatsReporter(timingStats, jobResultsPersister),
// Fake DatafeedTimingStatsReporter that does not have access to results index
new DatafeedTimingStatsReporter(timingStats, (ts, refreshPolicy) -> {}),
new ActionListener<>() {
@Override
public void onResponse(DataExtractorFactory dataExtractorFactory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.notifications.Auditor;

import java.io.IOException;
Expand All @@ -83,7 +82,6 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
private final PersistentTasksService persistentTasksService;
private final JobConfigProvider jobConfigProvider;
private final DatafeedConfigProvider datafeedConfigProvider;
private final JobResultsPersister jobResultsPersister;
private final Auditor auditor;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
private final NamedXContentRegistry xContentRegistry;
Expand All @@ -94,15 +92,14 @@ public TransportStartDatafeedAction(Settings settings, TransportService transpor
PersistentTasksService persistentTasksService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Client client, JobConfigProvider jobConfigProvider, DatafeedConfigProvider datafeedConfigProvider,
JobResultsPersister jobResultsPersister, Auditor auditor, NamedXContentRegistry xContentRegistry) {
Auditor auditor, NamedXContentRegistry xContentRegistry) {
super(StartDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters, StartDatafeedAction.Request::new,
indexNameExpressionResolver);
this.licenseState = licenseState;
this.persistentTasksService = persistentTasksService;
this.client = client;
this.jobConfigProvider = jobConfigProvider;
this.datafeedConfigProvider = datafeedConfigProvider;
this.jobResultsPersister = jobResultsPersister;
this.auditor = auditor;
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
this.xContentRegistry = xContentRegistry;
Expand Down Expand Up @@ -251,8 +248,8 @@ private void createDataExtractor(Job job, DatafeedConfig datafeed, StartDatafeed
datafeed,
job,
xContentRegistry,
// Creating fake DatafeedTimingStatsReporter so that search API call is not needed.
new DatafeedTimingStatsReporter(new DatafeedTimingStats(job.getId()), jobResultsPersister),
// Fake DatafeedTimingStatsReporter that does not have access to results index
new DatafeedTimingStatsReporter(new DatafeedTimingStats(job.getId()), (ts, refreshPolicy) -> {}),
ActionListener.wrap(
unused ->
persistentTasksService.sendStartRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ public String getJobId() {
return jobId;
}

public void finishReportingTimingStats() {
timingStatsReporter.finishReporting();
}

Long runLookBack(long startTime, Long endTime) throws Exception {
lookbackStartTimeMs = skipToStartTime(startTime);
Optional<Long> endMs = Optional.ofNullable(endTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ void build(String datafeedId, ActionListener<DatafeedJob> listener) {
);

// Create data extractor factory
Consumer<DatafeedTimingStats> datafeedTimingStatsHandler = timingStats -> {
context.timingStatsReporter = new DatafeedTimingStatsReporter(timingStats, jobResultsPersister);
Consumer<DatafeedTimingStats> datafeedTimingStatsHandler = initialTimingStats -> {
context.timingStatsReporter =
new DatafeedTimingStatsReporter(initialTimingStats, jobResultsPersister::persistDatafeedTimingStats);
DataExtractorFactory.create(
client,
datafeedConfigHolder.get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ public void stop(String source, TimeValue timeout, Exception e) {
}
auditor.info(datafeedJob.getJobId(),
Messages.getMessage(isIsolated() ? Messages.JOB_AUDIT_DATAFEED_ISOLATED : Messages.JOB_AUDIT_DATAFEED_STOPPED));
datafeedJob.finishReportingTimingStats();
finishHandler.accept(e);
logger.info("[{}] datafeed [{}] for job [{}] has been stopped{}", source, datafeedId, datafeedJob.getJobId(),
acquired ? "" : ", but there may be pending tasks as the timeout [" + timeout.getStringRep() + "] expired");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;

import java.util.Objects;

Expand All @@ -21,20 +20,28 @@
*/
public class DatafeedTimingStatsReporter {

/** Interface used for persisting current timing stats to the results index. */
@FunctionalInterface
public interface DatafeedTimingStatsPersister {
/** Does nothing by default. This behavior is useful when creating fake {@link DatafeedTimingStatsReporter} objects. */
void persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy);
}

/** Persisted timing stats. May be stale. */
private DatafeedTimingStats persistedTimingStats;
/** Current timing stats. */
private volatile DatafeedTimingStats currentTimingStats;
/** Object used to persist current timing stats. */
private final JobResultsPersister jobResultsPersister;
private final DatafeedTimingStatsPersister persister;

public DatafeedTimingStatsReporter(DatafeedTimingStats timingStats, JobResultsPersister jobResultsPersister) {
public DatafeedTimingStatsReporter(DatafeedTimingStats timingStats, DatafeedTimingStatsPersister persister) {
Objects.requireNonNull(timingStats);
this.persistedTimingStats = new DatafeedTimingStats(timingStats);
this.currentTimingStats = new DatafeedTimingStats(timingStats);
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
this.persister = Objects.requireNonNull(persister);
}

/** Gets current timing stats. */
public DatafeedTimingStats getCurrentTimingStats() {
return new DatafeedTimingStats(currentTimingStats);
}
Expand Down Expand Up @@ -64,16 +71,23 @@ public void reportDataCounts(DataCounts dataCounts) {
flushIfDifferSignificantly();
}

/** Finishes reporting of timing stats. Makes timing stats persisted immediately. */
public void finishReporting() {
// Don't flush if current timing stats are identical to the persisted ones
if (currentTimingStats.equals(persistedTimingStats) == false) {
flush(WriteRequest.RefreshPolicy.IMMEDIATE);
}
}

private void flushIfDifferSignificantly() {
if (differSignificantly(currentTimingStats, persistedTimingStats)) {
flush();
flush(WriteRequest.RefreshPolicy.NONE);
}
}

private void flush() {
private void flush(WriteRequest.RefreshPolicy refreshPolicy) {
persistedTimingStats = new DatafeedTimingStats(currentTimingStats);
// TODO: Consider changing refresh policy to NONE here and only do IMMEDIATE on datafeed _stop action
jobResultsPersister.persistDatafeedTimingStats(persistedTimingStats, WriteRequest.RefreshPolicy.IMMEDIATE);
persister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy);
}

/**
Expand Down
Loading