Skip to content

Commit 075cc7c

Browse files
committed
[ML Data Frame] Persist data frame after state changes (#42347)
1 parent f696769 commit 075cc7c

File tree

1 file changed

+10
-18
lines changed

1 file changed

+10
-18
lines changed

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,6 @@ static class ClientDataFrameIndexer extends DataFrameIndexer {
444444
private final DataFrameTransformsCheckpointService transformsCheckpointService;
445445
private final String transformId;
446446
private final DataFrameTransformTask transformTask;
447-
private volatile DataFrameIndexerTransformStats previouslyPersistedStats = null;
448447
private final AtomicInteger failureCount;
449448
// Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
450449
private volatile String lastAuditedExceptionMessage = null;
@@ -552,25 +551,18 @@ protected void doSaveState(IndexerState indexerState, Map<String, Object> positi
552551
// only every-so-often when doing the bulk indexing calls. See AsyncTwoPhaseIndexer#onBulkResponse for current periodicity
553552
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> updateClusterStateListener = ActionListener.wrap(
554553
task -> {
555-
// Only persist the stats if something has actually changed
556-
if (previouslyPersistedStats == null || previouslyPersistedStats.equals(getStats()) == false) {
557-
transformsConfigManager.putOrUpdateTransformStats(
558-
new DataFrameTransformStateAndStats(transformId, state, getStats(),
559-
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
554+
transformsConfigManager.putOrUpdateTransformStats(
555+
new DataFrameTransformStateAndStats(transformId, state, getStats(),
556+
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
560557
ActionListener.wrap(
561-
r -> {
562-
previouslyPersistedStats = getStats();
563-
next.run();
564-
},
565-
statsExc -> {
566-
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
567-
next.run();
568-
}
558+
r -> {
559+
next.run();
560+
},
561+
statsExc -> {
562+
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
563+
next.run();
564+
}
569565
));
570-
// The stats that we have previously written to the doc is the same as as it is now, no need to update it
571-
} else {
572-
next.run();
573-
}
574566
},
575567
exc -> {
576568
logger.error("Updating persistent state of transform [" + transformConfig.getId() + "] failed", exc);

0 commit comments

Comments
 (0)