From 631a3c1887e6e24d4f76e60d2e9934f33891375c Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Fri, 4 Sep 2020 14:15:03 +0100 Subject: [PATCH 1/2] [ML] Refactor foreground persistence Refactor foreground persistence to allow for using supplied descriptors when snapshotting model state. This paves the way for upgrading model state. --- include/api/CAnomalyJob.h | 11 +- lib/api/CAnomalyJob.cc | 44 +++++--- lib/api/unittest/CPersistenceManagerTest.cc | 108 +++++++++++++++++++- 3 files changed, 144 insertions(+), 19 deletions(-) diff --git a/include/api/CAnomalyJob.h b/include/api/CAnomalyJob.h index f4974aa5d6..4ec27fd94e 100644 --- a/include/api/CAnomalyJob.h +++ b/include/api/CAnomalyJob.h @@ -168,6 +168,13 @@ class API_EXPORT CAnomalyJob : public CDataProcessor { bool persistStateInForeground(core::CDataAdder& persister, const std::string& descriptionPrefix) override; + //! Persist the current model state regardless of whether + //! any results have been output. + bool doPersistStateInForeground(core::CDataAdder& persister, + const std::string& description, + const std::string& snapshotId, + core_t::TTime snapshotTimestamp); + //! Persist state of the residual models only bool persistModelsState(core::CDataAdder& persister, core_t::TTime timestamp, @@ -250,7 +257,9 @@ class API_EXPORT CAnomalyJob : public CDataProcessor { bool runForegroundPersist(core::CDataAdder& persister); //! Persist the detectors to a stream. - bool persistCopiedState(const std::string& descriptionPrefix, + bool persistCopiedState(const std::string& description, + const std::string& snapshotId, + core_t::TTime snapshotTimestamp, core_t::TTime time, const TKeyCRefAnomalyDetectorPtrPrVec& detectors, const model::CResourceMonitor::SModelSizeStats& modelSizeStats, diff --git a/lib/api/CAnomalyJob.cc b/lib/api/CAnomalyJob.cc index ee14b054b6..373ad342ec 100644 --- a/lib/api/CAnomalyJob.cc +++ b/lib/api/CAnomalyJob.cc @@ -990,6 +990,22 @@ bool CAnomalyJob::persistModelsState(core::CDataAdder& persister, bool CAnomalyJob::persistStateInForeground(core::CDataAdder& persister, const std::string& descriptionPrefix) { + if (m_LastFinalisedBucketEndTime == 0) { + LOG_INFO(<< "Will not persist detectors as no results have been output"); + return true; + } + + core_t::TTime snapshotTimestamp{core::CTimeUtils::now()}; + const std::string snapshotId{core::CStringUtils::typeToString(snapshotTimestamp)}; + const std::string description{descriptionPrefix + + core::CTimeUtils::toIso8601(snapshotTimestamp)}; + return this->doPersistStateInForeground(persister, description, snapshotId, snapshotTimestamp); +} + +bool CAnomalyJob::doPersistStateInForeground(core::CDataAdder& persister, + const std::string& description, + const std::string& snapshotId, + core_t::TTime snapshotTimestamp) { if (m_PersistenceManager != nullptr) { // This will not happen if finalise() was called before persisting state if (m_PersistenceManager->isBusy()) { @@ -999,11 +1015,6 @@ bool CAnomalyJob::persistStateInForeground(core::CDataAdder& persister, } } - if (m_LastFinalisedBucketEndTime == 0) { - LOG_INFO(<< "Will not persist detectors as no results have been output"); - return true; - } - TKeyCRefAnomalyDetectorPtrPrVec detectors; this->sortedDetectors(detectors); std::string normaliserState; @@ -1015,7 +1026,7 @@ bool CAnomalyJob::persistStateInForeground(core::CDataAdder& persister, core::CProgramCounters::cacheCounters(); return this->persistCopiedState( - descriptionPrefix, m_LastFinalisedBucketEndTime, detectors, + description, snapshotId, snapshotTimestamp, m_LastFinalisedBucketEndTime, detectors, m_Limits.resourceMonitor().createMemoryUsageReport( m_LastFinalisedBucketEndTime - m_ModelConfig.bucketLength()), m_ModelConfig.interimBucketCorrector(), m_Aggregator, normaliserState, @@ -1094,9 +1105,13 @@ bool CAnomalyJob::runBackgroundPersist(TBackgroundPersistArgsPtr args, return false; } + core_t::TTime snapshotTimestamp(core::CTimeUtils::now()); + const std::string snapshotId(core::CStringUtils::typeToString(snapshotTimestamp)); + const std::string description{"Periodic background persist at " + snapshotId}; + return this->persistCopiedState( - "Periodic background persist at ", args->s_Time, args->s_Detectors, - args->s_ModelSizeStats, args->s_InterimBucketCorrector, + description, snapshotId, snapshotTimestamp, args->s_Time, + args->s_Detectors, args->s_ModelSizeStats, args->s_InterimBucketCorrector, args->s_Aggregator, args->s_NormalizerState, args->s_LatestRecordTime, args->s_LastResultsTime, persister); } @@ -1147,7 +1162,9 @@ bool CAnomalyJob::persistModelsState(const TKeyCRefAnomalyDetectorPtrPrVec& dete return true; } -bool CAnomalyJob::persistCopiedState(const std::string& descriptionPrefix, +bool CAnomalyJob::persistCopiedState(const std::string& description, + const std::string& snapshotId, + core_t::TTime snapshotTimestamp, core_t::TTime time, const TKeyCRefAnomalyDetectorPtrPrVec& detectors, const model::CResourceMonitor::SModelSizeStats& modelSizeStats, @@ -1161,10 +1178,8 @@ bool CAnomalyJob::persistCopiedState(const std::string& descriptionPrefix, try { core::CStateCompressor compressor(persister); - core_t::TTime snapshotTimestamp(core::CTimeUtils::now()); - const std::string snapShotId(core::CStringUtils::typeToString(snapshotTimestamp)); core::CDataAdder::TOStreamP strm = compressor.addStreamed( - ML_STATE_INDEX, m_JobId + '_' + STATE_TYPE + '_' + snapShotId); + ML_STATE_INDEX, m_JobId + '_' + STATE_TYPE + '_' + snapshotId); if (strm != nullptr) { // IMPORTANT - this method can run in a background thread while the // analytics carries on processing new buckets in the main thread. @@ -1212,9 +1227,8 @@ bool CAnomalyJob::persistCopiedState(const std::string& descriptionPrefix, if (m_PersistCompleteFunc) { CModelSnapshotJsonWriter::SModelSnapshotReport modelSnapshotReport{ - MODEL_SNAPSHOT_MIN_VERSION, snapshotTimestamp, - descriptionPrefix + core::CTimeUtils::toIso8601(snapshotTimestamp), - snapShotId, compressor.numCompressedDocs(), modelSizeStats, + MODEL_SNAPSHOT_MIN_VERSION, snapshotTimestamp, description, + snapshotId, compressor.numCompressedDocs(), modelSizeStats, normalizerState, latestRecordTime, // This needs to be the last final result time as it serves // as the time after which all results are deleted when a diff --git a/lib/api/unittest/CPersistenceManagerTest.cc b/lib/api/unittest/CPersistenceManagerTest.cc index f8b40c8f4d..4bc4b88ce7 100644 --- a/lib/api/unittest/CPersistenceManagerTest.cc +++ b/lib/api/unittest/CPersistenceManagerTest.cc @@ -35,9 +35,11 @@ BOOST_AUTO_TEST_SUITE(CPersistenceManagerTest) namespace { void reportPersistComplete(ml::api::CModelSnapshotJsonWriter::SModelSnapshotReport modelSnapshotReport, + std::string& description, std::string& snapshotIdOut, std::size_t& numDocsOut) { LOG_DEBUG(<< "Persist complete with description: " << modelSnapshotReport.s_Description); + description = modelSnapshotReport.s_Description; snapshotIdOut = modelSnapshotReport.s_SnapshotId; numDocsOut = modelSnapshotReport.s_NumDocs; } @@ -71,10 +73,10 @@ class CTestFixture { ml::api::CSingleStreamDataAdder::TOStreamP backgroundStreamPtr{ backgroundStream = new std::ostringstream()}; ml::api::CSingleStreamDataAdder backgroundDataAdder{backgroundStreamPtr}; + std::ostringstream* foregroundStream{nullptr}; ml::api::CSingleStreamDataAdder::TOStreamP foregroundStreamPtr{ foregroundStream = new std::ostringstream()}; - ml::api::CSingleStreamDataAdder foregroundDataAdder{foregroundStreamPtr}; // The 30000 second persist interval is set large enough that the timer will @@ -83,6 +85,7 @@ class CTestFixture { ml::api::CPersistenceManager persistenceManager{ 30000, false, backgroundDataAdder, foregroundDataAdder}; + std::string description; std::string snapshotId; std::size_t numDocs{0}; @@ -101,7 +104,8 @@ class CTestFixture { fieldConfig, modelConfig, wrappedOutputStream, - std::bind(&reportPersistComplete, std::placeholders::_1, + std::bind(&reportPersistComplete, + std::placeholders::_1, std::ref(description), std::ref(snapshotId), std::ref(numDocs)), &persistenceManager, -1, @@ -217,6 +221,7 @@ class CTestFixture { ml::api::CPersistenceManager persistenceManager{ 30000, false, backgroundDataAdder, foregroundDataAdder}; + std::string description; std::string snapshotId; std::size_t numDocs{0}; @@ -231,7 +236,8 @@ class CTestFixture { fieldConfig, modelConfig, wrappedOutputStream, - std::bind(&reportPersistComplete, std::placeholders::_1, + std::bind(&reportPersistComplete, + std::placeholders::_1, std::ref(description), std::ref(snapshotId), std::ref(numDocs)), &persistenceManager, -1, @@ -287,8 +293,104 @@ class CTestFixture { BOOST_REQUIRE_EQUAL(backgroundState, foregroundState); } + + void foregroundPersistWithGivenSnapshotDescriptors(const std::string& configFileName) { + // Start by creating processors with non-trivial state + + static const ml::core_t::TTime BUCKET_SIZE{3600}; + static const std::string JOB_ID{"job"}; + + std::string inputFilename{"testfiles/big_ascending.txt"}; + + // Open the input and output files + std::ifstream inputStrm{inputFilename}; + BOOST_TEST_REQUIRE(inputStrm.is_open()); + + std::ofstream outputStrm{ml::core::COsFileFuncs::NULL_FILENAME}; + BOOST_TEST_REQUIRE(outputStrm.is_open()); + + ml::model::CLimits limits; + ml::api::CFieldConfig fieldConfig; + BOOST_TEST_REQUIRE(fieldConfig.initFromFile(configFileName)); + + ml::model::CAnomalyDetectorModelConfig modelConfig{ + ml::model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE)}; + + std::ostringstream* dataStream{nullptr}; + ml::api::CSingleStreamDataAdder::TOStreamP dataStreamPtr{ + dataStream = new std::ostringstream()}; + + // Persist the processors' state + ml::api::CSingleStreamDataAdder dataAdder{dataStreamPtr}; + + // The 30000 second persist interval is set large enough that the timer + // will not trigger during the test - we bypass the timer in this test + // and kick off the background persistence chain explicitly + ml::api::CPersistenceManager persistenceManager{30000, false, dataAdder}; + + std::string description_; + std::string snapshotId_; + std::size_t numDocs_{0}; + + std::string backgroundSnapshotId; + std::string foregroundSnapshotId; + + { + ml::core::CJsonOutputStreamWrapper wrappedOutputStream{outputStrm}; + + CTestAnomalyJob job{JOB_ID, + limits, + fieldConfig, + modelConfig, + wrappedOutputStream, + std::bind(&reportPersistComplete, + std::placeholders::_1, std::ref(description_), + std::ref(snapshotId_), std::ref(numDocs_)), + &persistenceManager, + -1, + "time", + "%d/%b/%Y:%T %z"}; + + ml::api::CDataProcessor* firstProcessor{&job}; + + ml::api::CNdJsonInputParser parser{ + {CTestFieldDataCategorizer::MLCATEGORY_NAME}, inputStrm}; + + BOOST_TEST_REQUIRE(parser.readStreamIntoMaps( + [firstProcessor](const ml::api::CDataProcessor::TStrStrUMap& dataRowFields) { + return firstProcessor->handleRecord( + dataRowFields, ml::api::CDataProcessor::TOptionalTime{}); + })); + + // Ensure the model size stats are up to date + job.finalise(); + + ml::core_t::TTime snapshotTimestamp{1283524206}; + const std::string snapshotId{ml::core::CStringUtils::typeToString(snapshotTimestamp)}; + const std::string description{"Supplied description for snapshot at " + + ml::core::CTimeUtils::toIso8601(snapshotTimestamp)}; + BOOST_TEST_REQUIRE(job.doPersistStateInForeground( + dataAdder, description, snapshotId, snapshotTimestamp)); + + // Check that the snapshot description and Id reported by the "persist complete" + // handler match those supplied to the persist function + BOOST_REQUIRE_EQUAL(description, description_); + BOOST_REQUIRE_EQUAL(snapshotId, snapshotId_); + + std::string state{dataStream->str()}; + + // Compare snapshot ID embedded in the state string with the supplied value. + const std::string expectedId{"job_model_state_" + + std::to_string(snapshotTimestamp) + "#1"}; + BOOST_TEST_REQUIRE(state.find(expectedId) != std::string::npos); + } + } }; +BOOST_FIXTURE_TEST_CASE(testDetectorPersistByWithGivenSnapshotDescriptors, CTestFixture) { + this->foregroundPersistWithGivenSnapshotDescriptors("testfiles/new_mlfields.conf"); +} + BOOST_FIXTURE_TEST_CASE(testDetectorPersistBy, CTestFixture) { this->foregroundBackgroundCompCategorizationAndAnomalyDetection("testfiles/new_mlfields.conf"); } From bcb45b728b8d2459c23b35945eae891251aed30f Mon Sep 17 00:00:00 2001 From: Ed Savage Date: Fri, 4 Sep 2020 16:06:50 +0100 Subject: [PATCH 2/2] Attending to code review comments --- lib/api/CAnomalyJob.cc | 3 +- lib/api/unittest/CPersistenceManagerTest.cc | 44 ++++++++++++--------- 2 files changed, 27 insertions(+), 20 deletions(-) diff --git a/lib/api/CAnomalyJob.cc b/lib/api/CAnomalyJob.cc index 373ad342ec..1d85435cf4 100644 --- a/lib/api/CAnomalyJob.cc +++ b/lib/api/CAnomalyJob.cc @@ -1107,7 +1107,8 @@ bool CAnomalyJob::runBackgroundPersist(TBackgroundPersistArgsPtr args, core_t::TTime snapshotTimestamp(core::CTimeUtils::now()); const std::string snapshotId(core::CStringUtils::typeToString(snapshotTimestamp)); - const std::string description{"Periodic background persist at " + snapshotId}; + const std::string description{"Periodic background persist at " + + core::CTimeUtils::toIso8601(snapshotTimestamp)}; return this->persistCopiedState( description, snapshotId, snapshotTimestamp, args->s_Time, diff --git a/lib/api/unittest/CPersistenceManagerTest.cc b/lib/api/unittest/CPersistenceManagerTest.cc index 4bc4b88ce7..7e04881d23 100644 --- a/lib/api/unittest/CPersistenceManagerTest.cc +++ b/lib/api/unittest/CPersistenceManagerTest.cc @@ -35,10 +35,12 @@ BOOST_AUTO_TEST_SUITE(CPersistenceManagerTest) namespace { void reportPersistComplete(ml::api::CModelSnapshotJsonWriter::SModelSnapshotReport modelSnapshotReport, + ml::core_t::TTime& snapshotTimestamp, std::string& description, std::string& snapshotIdOut, std::size_t& numDocsOut) { LOG_DEBUG(<< "Persist complete with description: " << modelSnapshotReport.s_Description); + snapshotTimestamp = modelSnapshotReport.s_SnapshotTimestamp; description = modelSnapshotReport.s_Description; snapshotIdOut = modelSnapshotReport.s_SnapshotId; numDocsOut = modelSnapshotReport.s_NumDocs; @@ -85,6 +87,7 @@ class CTestFixture { ml::api::CPersistenceManager persistenceManager{ 30000, false, backgroundDataAdder, foregroundDataAdder}; + ml::core_t::TTime snapshotTimestamp; std::string description; std::string snapshotId; std::size_t numDocs{0}; @@ -104,8 +107,8 @@ class CTestFixture { fieldConfig, modelConfig, wrappedOutputStream, - std::bind(&reportPersistComplete, - std::placeholders::_1, std::ref(description), + std::bind(&reportPersistComplete, std::placeholders::_1, + std::ref(snapshotTimestamp), std::ref(description), std::ref(snapshotId), std::ref(numDocs)), &persistenceManager, -1, @@ -221,6 +224,7 @@ class CTestFixture { ml::api::CPersistenceManager persistenceManager{ 30000, false, backgroundDataAdder, foregroundDataAdder}; + ml::core_t::TTime snapshotTimestamp; std::string description; std::string snapshotId; std::size_t numDocs{0}; @@ -236,8 +240,8 @@ class CTestFixture { fieldConfig, modelConfig, wrappedOutputStream, - std::bind(&reportPersistComplete, - std::placeholders::_1, std::ref(description), + std::bind(&reportPersistComplete, std::placeholders::_1, + std::ref(snapshotTimestamp), std::ref(description), std::ref(snapshotId), std::ref(numDocs)), &persistenceManager, -1, @@ -328,6 +332,7 @@ class CTestFixture { // and kick off the background persistence chain explicitly ml::api::CPersistenceManager persistenceManager{30000, false, dataAdder}; + ml::core_t::TTime snapshotTimestamp_; std::string description_; std::string snapshotId_; std::size_t numDocs_{0}; @@ -338,18 +343,19 @@ class CTestFixture { { ml::core::CJsonOutputStreamWrapper wrappedOutputStream{outputStrm}; - CTestAnomalyJob job{JOB_ID, - limits, - fieldConfig, - modelConfig, - wrappedOutputStream, - std::bind(&reportPersistComplete, - std::placeholders::_1, std::ref(description_), - std::ref(snapshotId_), std::ref(numDocs_)), - &persistenceManager, - -1, - "time", - "%d/%b/%Y:%T %z"}; + CTestAnomalyJob job{ + JOB_ID, + limits, + fieldConfig, + modelConfig, + wrappedOutputStream, + std::bind(&reportPersistComplete, std::placeholders::_1, + std::ref(snapshotTimestamp_), std::ref(description_), + std::ref(snapshotId_), std::ref(numDocs_)), + &persistenceManager, + -1, + "time", + "%d/%b/%Y:%T %z"}; ml::api::CDataProcessor* firstProcessor{&job}; @@ -366,7 +372,7 @@ class CTestFixture { job.finalise(); ml::core_t::TTime snapshotTimestamp{1283524206}; - const std::string snapshotId{ml::core::CStringUtils::typeToString(snapshotTimestamp)}; + const std::string snapshotId{"my_special_snapshot"}; const std::string description{"Supplied description for snapshot at " + ml::core::CTimeUtils::toIso8601(snapshotTimestamp)}; BOOST_TEST_REQUIRE(job.doPersistStateInForeground( @@ -374,14 +380,14 @@ class CTestFixture { // Check that the snapshot description and Id reported by the "persist complete" // handler match those supplied to the persist function + BOOST_REQUIRE_EQUAL(snapshotTimestamp, snapshotTimestamp_); BOOST_REQUIRE_EQUAL(description, description_); BOOST_REQUIRE_EQUAL(snapshotId, snapshotId_); std::string state{dataStream->str()}; // Compare snapshot ID embedded in the state string with the supplied value. - const std::string expectedId{"job_model_state_" + - std::to_string(snapshotTimestamp) + "#1"}; + const std::string expectedId{"job_model_state_" + snapshotId + "#1"}; BOOST_TEST_REQUIRE(state.find(expectedId) != std::string::npos); } }