Skip to content

Commit c3eebc9

Browse files
authored
[ML] Refactor foreground persistence (#1473)
Refactor foreground persistence to allow for using supplied descriptors when snapshotting model state. This paves the way for upgrading model state.
1 parent bba9007 commit c3eebc9

File tree

3 files changed

+149
-17
lines changed

3 files changed

+149
-17
lines changed

include/api/CAnomalyJob.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,13 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
168168
bool persistStateInForeground(core::CDataAdder& persister,
169169
const std::string& descriptionPrefix) override;
170170

171+
//! Persist the current model state regardless of whether
172+
//! any results have been output.
173+
bool doPersistStateInForeground(core::CDataAdder& persister,
174+
const std::string& description,
175+
const std::string& snapshotId,
176+
core_t::TTime snapshotTimestamp);
177+
171178
//! Persist state of the residual models only
172179
bool persistModelsState(core::CDataAdder& persister,
173180
core_t::TTime timestamp,
@@ -250,7 +257,9 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
250257
bool runForegroundPersist(core::CDataAdder& persister);
251258

252259
//! Persist the detectors to a stream.
253-
bool persistCopiedState(const std::string& descriptionPrefix,
260+
bool persistCopiedState(const std::string& description,
261+
const std::string& snapshotId,
262+
core_t::TTime snapshotTimestamp,
254263
core_t::TTime time,
255264
const TKeyCRefAnomalyDetectorPtrPrVec& detectors,
256265
const model::CResourceMonitor::SModelSizeStats& modelSizeStats,

lib/api/CAnomalyJob.cc

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -990,6 +990,22 @@ bool CAnomalyJob::persistModelsState(core::CDataAdder& persister,
990990

991991
bool CAnomalyJob::persistStateInForeground(core::CDataAdder& persister,
992992
const std::string& descriptionPrefix) {
993+
if (m_LastFinalisedBucketEndTime == 0) {
994+
LOG_INFO(<< "Will not persist detectors as no results have been output");
995+
return true;
996+
}
997+
998+
core_t::TTime snapshotTimestamp{core::CTimeUtils::now()};
999+
const std::string snapshotId{core::CStringUtils::typeToString(snapshotTimestamp)};
1000+
const std::string description{descriptionPrefix +
1001+
core::CTimeUtils::toIso8601(snapshotTimestamp)};
1002+
return this->doPersistStateInForeground(persister, description, snapshotId, snapshotTimestamp);
1003+
}
1004+
1005+
bool CAnomalyJob::doPersistStateInForeground(core::CDataAdder& persister,
1006+
const std::string& description,
1007+
const std::string& snapshotId,
1008+
core_t::TTime snapshotTimestamp) {
9931009
if (m_PersistenceManager != nullptr) {
9941010
// This will not happen if finalise() was called before persisting state
9951011
if (m_PersistenceManager->isBusy()) {
@@ -999,11 +1015,6 @@ bool CAnomalyJob::persistStateInForeground(core::CDataAdder& persister,
9991015
}
10001016
}
10011017

1002-
if (m_LastFinalisedBucketEndTime == 0) {
1003-
LOG_INFO(<< "Will not persist detectors as no results have been output");
1004-
return true;
1005-
}
1006-
10071018
TKeyCRefAnomalyDetectorPtrPrVec detectors;
10081019
this->sortedDetectors(detectors);
10091020
std::string normaliserState;
@@ -1015,7 +1026,7 @@ bool CAnomalyJob::persistStateInForeground(core::CDataAdder& persister,
10151026
core::CProgramCounters::cacheCounters();
10161027

10171028
return this->persistCopiedState(
1018-
descriptionPrefix, m_LastFinalisedBucketEndTime, detectors,
1029+
description, snapshotId, snapshotTimestamp, m_LastFinalisedBucketEndTime, detectors,
10191030
m_Limits.resourceMonitor().createMemoryUsageReport(
10201031
m_LastFinalisedBucketEndTime - m_ModelConfig.bucketLength()),
10211032
m_ModelConfig.interimBucketCorrector(), m_Aggregator, normaliserState,
@@ -1094,9 +1105,14 @@ bool CAnomalyJob::runBackgroundPersist(TBackgroundPersistArgsPtr args,
10941105
return false;
10951106
}
10961107

1108+
core_t::TTime snapshotTimestamp(core::CTimeUtils::now());
1109+
const std::string snapshotId(core::CStringUtils::typeToString(snapshotTimestamp));
1110+
const std::string description{"Periodic background persist at " +
1111+
core::CTimeUtils::toIso8601(snapshotTimestamp)};
1112+
10971113
return this->persistCopiedState(
1098-
"Periodic background persist at ", args->s_Time, args->s_Detectors,
1099-
args->s_ModelSizeStats, args->s_InterimBucketCorrector,
1114+
description, snapshotId, snapshotTimestamp, args->s_Time,
1115+
args->s_Detectors, args->s_ModelSizeStats, args->s_InterimBucketCorrector,
11001116
args->s_Aggregator, args->s_NormalizerState, args->s_LatestRecordTime,
11011117
args->s_LastResultsTime, persister);
11021118
}
@@ -1147,7 +1163,9 @@ bool CAnomalyJob::persistModelsState(const TKeyCRefAnomalyDetectorPtrPrVec& dete
11471163
return true;
11481164
}
11491165

1150-
bool CAnomalyJob::persistCopiedState(const std::string& descriptionPrefix,
1166+
bool CAnomalyJob::persistCopiedState(const std::string& description,
1167+
const std::string& snapshotId,
1168+
core_t::TTime snapshotTimestamp,
11511169
core_t::TTime time,
11521170
const TKeyCRefAnomalyDetectorPtrPrVec& detectors,
11531171
const model::CResourceMonitor::SModelSizeStats& modelSizeStats,
@@ -1161,10 +1179,8 @@ bool CAnomalyJob::persistCopiedState(const std::string& descriptionPrefix,
11611179
try {
11621180
core::CStateCompressor compressor(persister);
11631181

1164-
core_t::TTime snapshotTimestamp(core::CTimeUtils::now());
1165-
const std::string snapShotId(core::CStringUtils::typeToString(snapshotTimestamp));
11661182
core::CDataAdder::TOStreamP strm = compressor.addStreamed(
1167-
ML_STATE_INDEX, m_JobId + '_' + STATE_TYPE + '_' + snapShotId);
1183+
ML_STATE_INDEX, m_JobId + '_' + STATE_TYPE + '_' + snapshotId);
11681184
if (strm != nullptr) {
11691185
// IMPORTANT - this method can run in a background thread while the
11701186
// analytics carries on processing new buckets in the main thread.
@@ -1212,9 +1228,8 @@ bool CAnomalyJob::persistCopiedState(const std::string& descriptionPrefix,
12121228

12131229
if (m_PersistCompleteFunc) {
12141230
CModelSnapshotJsonWriter::SModelSnapshotReport modelSnapshotReport{
1215-
MODEL_SNAPSHOT_MIN_VERSION, snapshotTimestamp,
1216-
descriptionPrefix + core::CTimeUtils::toIso8601(snapshotTimestamp),
1217-
snapShotId, compressor.numCompressedDocs(), modelSizeStats,
1231+
MODEL_SNAPSHOT_MIN_VERSION, snapshotTimestamp, description,
1232+
snapshotId, compressor.numCompressedDocs(), modelSizeStats,
12181233
normalizerState, latestRecordTime,
12191234
// This needs to be the last final result time as it serves
12201235
// as the time after which all results are deleted when a

lib/api/unittest/CPersistenceManagerTest.cc

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,13 @@ BOOST_AUTO_TEST_SUITE(CPersistenceManagerTest)
3535
namespace {
3636

3737
void reportPersistComplete(ml::api::CModelSnapshotJsonWriter::SModelSnapshotReport modelSnapshotReport,
38+
ml::core_t::TTime& snapshotTimestamp,
39+
std::string& description,
3840
std::string& snapshotIdOut,
3941
std::size_t& numDocsOut) {
4042
LOG_DEBUG(<< "Persist complete with description: " << modelSnapshotReport.s_Description);
43+
snapshotTimestamp = modelSnapshotReport.s_SnapshotTimestamp;
44+
description = modelSnapshotReport.s_Description;
4145
snapshotIdOut = modelSnapshotReport.s_SnapshotId;
4246
numDocsOut = modelSnapshotReport.s_NumDocs;
4347
}
@@ -71,10 +75,10 @@ class CTestFixture {
7175
ml::api::CSingleStreamDataAdder::TOStreamP backgroundStreamPtr{
7276
backgroundStream = new std::ostringstream()};
7377
ml::api::CSingleStreamDataAdder backgroundDataAdder{backgroundStreamPtr};
78+
7479
std::ostringstream* foregroundStream{nullptr};
7580
ml::api::CSingleStreamDataAdder::TOStreamP foregroundStreamPtr{
7681
foregroundStream = new std::ostringstream()};
77-
7882
ml::api::CSingleStreamDataAdder foregroundDataAdder{foregroundStreamPtr};
7983

8084
// The 30000 second persist interval is set large enough that the timer will
@@ -83,6 +87,8 @@ class CTestFixture {
8387
ml::api::CPersistenceManager persistenceManager{
8488
30000, false, backgroundDataAdder, foregroundDataAdder};
8589

90+
ml::core_t::TTime snapshotTimestamp;
91+
std::string description;
8692
std::string snapshotId;
8793
std::size_t numDocs{0};
8894

@@ -102,6 +108,7 @@ class CTestFixture {
102108
modelConfig,
103109
wrappedOutputStream,
104110
std::bind(&reportPersistComplete, std::placeholders::_1,
111+
std::ref(snapshotTimestamp), std::ref(description),
105112
std::ref(snapshotId), std::ref(numDocs)),
106113
&persistenceManager,
107114
-1,
@@ -217,6 +224,8 @@ class CTestFixture {
217224
ml::api::CPersistenceManager persistenceManager{
218225
30000, false, backgroundDataAdder, foregroundDataAdder};
219226

227+
ml::core_t::TTime snapshotTimestamp;
228+
std::string description;
220229
std::string snapshotId;
221230
std::size_t numDocs{0};
222231

@@ -232,6 +241,7 @@ class CTestFixture {
232241
modelConfig,
233242
wrappedOutputStream,
234243
std::bind(&reportPersistComplete, std::placeholders::_1,
244+
std::ref(snapshotTimestamp), std::ref(description),
235245
std::ref(snapshotId), std::ref(numDocs)),
236246
&persistenceManager,
237247
-1,
@@ -287,8 +297,106 @@ class CTestFixture {
287297

288298
BOOST_REQUIRE_EQUAL(backgroundState, foregroundState);
289299
}
300+
301+
void foregroundPersistWithGivenSnapshotDescriptors(const std::string& configFileName) {
302+
// Start by creating processors with non-trivial state
303+
304+
static const ml::core_t::TTime BUCKET_SIZE{3600};
305+
static const std::string JOB_ID{"job"};
306+
307+
std::string inputFilename{"testfiles/big_ascending.txt"};
308+
309+
// Open the input and output files
310+
std::ifstream inputStrm{inputFilename};
311+
BOOST_TEST_REQUIRE(inputStrm.is_open());
312+
313+
std::ofstream outputStrm{ml::core::COsFileFuncs::NULL_FILENAME};
314+
BOOST_TEST_REQUIRE(outputStrm.is_open());
315+
316+
ml::model::CLimits limits;
317+
ml::api::CFieldConfig fieldConfig;
318+
BOOST_TEST_REQUIRE(fieldConfig.initFromFile(configFileName));
319+
320+
ml::model::CAnomalyDetectorModelConfig modelConfig{
321+
ml::model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE)};
322+
323+
std::ostringstream* dataStream{nullptr};
324+
ml::api::CSingleStreamDataAdder::TOStreamP dataStreamPtr{
325+
dataStream = new std::ostringstream()};
326+
327+
// Data adder that receives the persisted state of the processors
328+
ml::api::CSingleStreamDataAdder dataAdder{dataStreamPtr};
329+
330+
// The 30000 second persist interval is set large enough that the timer
331+
// will not trigger during the test - we bypass the timer in this test
332+
// and invoke the foreground persistence explicitly
333+
ml::api::CPersistenceManager persistenceManager{30000, false, dataAdder};
334+
335+
ml::core_t::TTime snapshotTimestamp_;
336+
std::string description_;
337+
std::string snapshotId_;
338+
std::size_t numDocs_{0};
339+
340+
std::string backgroundSnapshotId;
341+
std::string foregroundSnapshotId;
342+
343+
{
344+
ml::core::CJsonOutputStreamWrapper wrappedOutputStream{outputStrm};
345+
346+
CTestAnomalyJob job{
347+
JOB_ID,
348+
limits,
349+
fieldConfig,
350+
modelConfig,
351+
wrappedOutputStream,
352+
std::bind(&reportPersistComplete, std::placeholders::_1,
353+
std::ref(snapshotTimestamp_), std::ref(description_),
354+
std::ref(snapshotId_), std::ref(numDocs_)),
355+
&persistenceManager,
356+
-1,
357+
"time",
358+
"%d/%b/%Y:%T %z"};
359+
360+
ml::api::CDataProcessor* firstProcessor{&job};
361+
362+
ml::api::CNdJsonInputParser parser{
363+
{CTestFieldDataCategorizer::MLCATEGORY_NAME}, inputStrm};
364+
365+
BOOST_TEST_REQUIRE(parser.readStreamIntoMaps(
366+
[firstProcessor](const ml::api::CDataProcessor::TStrStrUMap& dataRowFields) {
367+
return firstProcessor->handleRecord(
368+
dataRowFields, ml::api::CDataProcessor::TOptionalTime{});
369+
}));
370+
371+
// Ensure the model size stats are up to date
372+
job.finalise();
373+
374+
ml::core_t::TTime snapshotTimestamp{1283524206};
375+
const std::string snapshotId{"my_special_snapshot"};
376+
const std::string description{"Supplied description for snapshot at " +
377+
ml::core::CTimeUtils::toIso8601(snapshotTimestamp)};
378+
BOOST_TEST_REQUIRE(job.doPersistStateInForeground(
379+
dataAdder, description, snapshotId, snapshotTimestamp));
380+
381+
// Check that the snapshot timestamp, description and Id reported by the "persist complete"
382+
// handler match those supplied to the persist function
383+
BOOST_REQUIRE_EQUAL(snapshotTimestamp, snapshotTimestamp_);
384+
BOOST_REQUIRE_EQUAL(description, description_);
385+
BOOST_REQUIRE_EQUAL(snapshotId, snapshotId_);
386+
387+
std::string state{dataStream->str()};
388+
389+
// Compare snapshot ID embedded in the state string with the supplied value.
390+
const std::string expectedId{"job_model_state_" + snapshotId + "#1"};
391+
BOOST_TEST_REQUIRE(state.find(expectedId) != std::string::npos);
392+
}
393+
}
290394
};
291395

396+
BOOST_FIXTURE_TEST_CASE(testDetectorPersistByWithGivenSnapshotDescriptors, CTestFixture) {
397+
this->foregroundPersistWithGivenSnapshotDescriptors("testfiles/new_mlfields.conf");
398+
}
399+
292400
BOOST_FIXTURE_TEST_CASE(testDetectorPersistBy, CTestFixture) {
293401
this->foregroundBackgroundCompCategorizationAndAnomalyDetection("testfiles/new_mlfields.conf");
294402
}

0 commit comments

Comments
 (0)