Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion include/api/CAnomalyJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
bool persistStateInForeground(core::CDataAdder& persister,
const std::string& descriptionPrefix) override;

//! Persist the current model state regardless of whether
//! any results have been output.
//!
//! Unlike persistStateInForeground(), the caller supplies the snapshot
//! descriptors rather than having them derived from the current time.
//!
//! \param[in] persister Destination the state is written to.
//! \param[in] description Full snapshot description that is reported
//! in the model snapshot report.
//! \param[in] snapshotId Identifier of the snapshot; it forms the last
//! component of the persisted state document ID.
//! \param[in] snapshotTimestamp Timestamp reported for the snapshot.
//! \return The success flag of the persist operation.
bool doPersistStateInForeground(core::CDataAdder& persister,
const std::string& description,
const std::string& snapshotId,
core_t::TTime snapshotTimestamp);

//! Persist state of the residual models only
bool persistModelsState(core::CDataAdder& persister,
core_t::TTime timestamp,
Expand Down Expand Up @@ -250,7 +257,9 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
bool runForegroundPersist(core::CDataAdder& persister);

//! Persist the detectors to a stream.
bool persistCopiedState(const std::string& descriptionPrefix,
bool persistCopiedState(const std::string& description,
const std::string& snapshotId,
core_t::TTime snapshotTimestamp,
core_t::TTime time,
const TKeyCRefAnomalyDetectorPtrPrVec& detectors,
const model::CResourceMonitor::SModelSizeStats& modelSizeStats,
Expand Down
45 changes: 30 additions & 15 deletions lib/api/CAnomalyJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,22 @@ bool CAnomalyJob::persistModelsState(core::CDataAdder& persister,

bool CAnomalyJob::persistStateInForeground(core::CDataAdder& persister,
                                           const std::string& descriptionPrefix) {
    // With no finalised bucket there are no results yet, so persistence
    // is skipped and the call is reported as successful.
    if (m_LastFinalisedBucketEndTime == 0) {
        LOG_INFO(<< "Will not persist detectors as no results have been output");
        return true;
    }

    // One clock reading drives the snapshot timestamp, the snapshot ID and
    // the time suffix of the description, so the three always agree.
    core_t::TTime now{core::CTimeUtils::now()};
    const std::string idFromTime{core::CStringUtils::typeToString(now)};

    std::string fullDescription{descriptionPrefix};
    fullDescription += core::CTimeUtils::toIso8601(now);

    return this->doPersistStateInForeground(persister, fullDescription, idFromTime, now);
}

bool CAnomalyJob::doPersistStateInForeground(core::CDataAdder& persister,
const std::string& description,
const std::string& snapshotId,
core_t::TTime snapshotTimestamp) {
if (m_PersistenceManager != nullptr) {
// This will not happen if finalise() was called before persisting state
if (m_PersistenceManager->isBusy()) {
Expand All @@ -999,11 +1015,6 @@ bool CAnomalyJob::persistStateInForeground(core::CDataAdder& persister,
}
}

if (m_LastFinalisedBucketEndTime == 0) {
LOG_INFO(<< "Will not persist detectors as no results have been output");
return true;
}

TKeyCRefAnomalyDetectorPtrPrVec detectors;
this->sortedDetectors(detectors);
std::string normaliserState;
Expand All @@ -1015,7 +1026,7 @@ bool CAnomalyJob::persistStateInForeground(core::CDataAdder& persister,
core::CProgramCounters::cacheCounters();

return this->persistCopiedState(
descriptionPrefix, m_LastFinalisedBucketEndTime, detectors,
description, snapshotId, snapshotTimestamp, m_LastFinalisedBucketEndTime, detectors,
m_Limits.resourceMonitor().createMemoryUsageReport(
m_LastFinalisedBucketEndTime - m_ModelConfig.bucketLength()),
m_ModelConfig.interimBucketCorrector(), m_Aggregator, normaliserState,
Expand Down Expand Up @@ -1094,9 +1105,14 @@ bool CAnomalyJob::runBackgroundPersist(TBackgroundPersistArgsPtr args,
return false;
}

core_t::TTime snapshotTimestamp(core::CTimeUtils::now());
const std::string snapshotId(core::CStringUtils::typeToString(snapshotTimestamp));
const std::string description{"Periodic background persist at " +
core::CTimeUtils::toIso8601(snapshotTimestamp)};

return this->persistCopiedState(
"Periodic background persist at ", args->s_Time, args->s_Detectors,
args->s_ModelSizeStats, args->s_InterimBucketCorrector,
description, snapshotId, snapshotTimestamp, args->s_Time,
args->s_Detectors, args->s_ModelSizeStats, args->s_InterimBucketCorrector,
args->s_Aggregator, args->s_NormalizerState, args->s_LatestRecordTime,
args->s_LastResultsTime, persister);
}
Expand Down Expand Up @@ -1147,7 +1163,9 @@ bool CAnomalyJob::persistModelsState(const TKeyCRefAnomalyDetectorPtrPrVec& dete
return true;
}

bool CAnomalyJob::persistCopiedState(const std::string& descriptionPrefix,
bool CAnomalyJob::persistCopiedState(const std::string& description,
const std::string& snapshotId,
core_t::TTime snapshotTimestamp,
core_t::TTime time,
const TKeyCRefAnomalyDetectorPtrPrVec& detectors,
const model::CResourceMonitor::SModelSizeStats& modelSizeStats,
Expand All @@ -1161,10 +1179,8 @@ bool CAnomalyJob::persistCopiedState(const std::string& descriptionPrefix,
try {
core::CStateCompressor compressor(persister);

core_t::TTime snapshotTimestamp(core::CTimeUtils::now());
const std::string snapShotId(core::CStringUtils::typeToString(snapshotTimestamp));
core::CDataAdder::TOStreamP strm = compressor.addStreamed(
ML_STATE_INDEX, m_JobId + '_' + STATE_TYPE + '_' + snapShotId);
ML_STATE_INDEX, m_JobId + '_' + STATE_TYPE + '_' + snapshotId);
if (strm != nullptr) {
// IMPORTANT - this method can run in a background thread while the
// analytics carries on processing new buckets in the main thread.
Expand Down Expand Up @@ -1212,9 +1228,8 @@ bool CAnomalyJob::persistCopiedState(const std::string& descriptionPrefix,

if (m_PersistCompleteFunc) {
CModelSnapshotJsonWriter::SModelSnapshotReport modelSnapshotReport{
MODEL_SNAPSHOT_MIN_VERSION, snapshotTimestamp,
descriptionPrefix + core::CTimeUtils::toIso8601(snapshotTimestamp),
snapShotId, compressor.numCompressedDocs(), modelSizeStats,
MODEL_SNAPSHOT_MIN_VERSION, snapshotTimestamp, description,
snapshotId, compressor.numCompressedDocs(), modelSizeStats,
normalizerState, latestRecordTime,
// This needs to be the last final result time as it serves
// as the time after which all results are deleted when a
Expand Down
110 changes: 109 additions & 1 deletion lib/api/unittest/CPersistenceManagerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ BOOST_AUTO_TEST_SUITE(CPersistenceManagerTest)
namespace {

//! Test "persist complete" handler: logs the reported description and
//! copies selected fields of the model snapshot report into the caller's
//! variables so that tests can assert on them afterwards.
//!
//! \param[in] modelSnapshotReport The report describing the snapshot.
//! \param[out] snapshotTimestamp Receives the report's s_SnapshotTimestamp.
//! \param[out] description Receives the report's s_Description.
//! \param[out] snapshotIdOut Receives the report's s_SnapshotId.
//! \param[out] numDocsOut Receives the report's s_NumDocs.
void reportPersistComplete(ml::api::CModelSnapshotJsonWriter::SModelSnapshotReport modelSnapshotReport,
ml::core_t::TTime& snapshotTimestamp,
std::string& description,
std::string& snapshotIdOut,
std::size_t& numDocsOut) {
LOG_DEBUG(<< "Persist complete with description: " << modelSnapshotReport.s_Description);
snapshotTimestamp = modelSnapshotReport.s_SnapshotTimestamp;
description = modelSnapshotReport.s_Description;

// NOTE(review): the following non-code lines are review-page text captured
// with this listing; they are preserved verbatim.
Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also capture the snapshot timestamp so tests can assert that is as expected too.

snapshotIdOut = modelSnapshotReport.s_SnapshotId;
numDocsOut = modelSnapshotReport.s_NumDocs;
}
Expand Down Expand Up @@ -71,10 +75,10 @@ class CTestFixture {
ml::api::CSingleStreamDataAdder::TOStreamP backgroundStreamPtr{
backgroundStream = new std::ostringstream()};
ml::api::CSingleStreamDataAdder backgroundDataAdder{backgroundStreamPtr};

std::ostringstream* foregroundStream{nullptr};
ml::api::CSingleStreamDataAdder::TOStreamP foregroundStreamPtr{
foregroundStream = new std::ostringstream()};

ml::api::CSingleStreamDataAdder foregroundDataAdder{foregroundStreamPtr};

// The 30000 second persist interval is set large enough that the timer will
Expand All @@ -83,6 +87,8 @@ class CTestFixture {
ml::api::CPersistenceManager persistenceManager{
30000, false, backgroundDataAdder, foregroundDataAdder};

ml::core_t::TTime snapshotTimestamp;
std::string description;
std::string snapshotId;
std::size_t numDocs{0};

Expand All @@ -102,6 +108,7 @@ class CTestFixture {
modelConfig,
wrappedOutputStream,
std::bind(&reportPersistComplete, std::placeholders::_1,
std::ref(snapshotTimestamp), std::ref(description),
std::ref(snapshotId), std::ref(numDocs)),
&persistenceManager,
-1,
Expand Down Expand Up @@ -217,6 +224,8 @@ class CTestFixture {
ml::api::CPersistenceManager persistenceManager{
30000, false, backgroundDataAdder, foregroundDataAdder};

ml::core_t::TTime snapshotTimestamp;
std::string description;
std::string snapshotId;
std::size_t numDocs{0};

Expand All @@ -232,6 +241,7 @@ class CTestFixture {
modelConfig,
wrappedOutputStream,
std::bind(&reportPersistComplete, std::placeholders::_1,
std::ref(snapshotTimestamp), std::ref(description),
std::ref(snapshotId), std::ref(numDocs)),
&persistenceManager,
-1,
Expand Down Expand Up @@ -287,8 +297,106 @@ class CTestFixture {

BOOST_REQUIRE_EQUAL(backgroundState, foregroundState);
}

//! Feed a data file through an anomaly job, persist its state in the
//! foreground with caller-supplied snapshot descriptors, and check that
//! the "persist complete" handler reports exactly those descriptors and
//! that the supplied snapshot ID appears in the persisted state.
//!
//! \param[in] configFileName Field configuration file used to initialise
//! the job's field config.
void foregroundPersistWithGivenSnapshotDescriptors(const std::string& configFileName) {
// Start by creating processors with non-trivial state

static const ml::core_t::TTime BUCKET_SIZE{3600};
static const std::string JOB_ID{"job"};

std::string inputFilename{"testfiles/big_ascending.txt"};

// Open the input and output files
std::ifstream inputStrm{inputFilename};
BOOST_TEST_REQUIRE(inputStrm.is_open());

// Results output is not inspected by this test, so it is sent to the
// null file
std::ofstream outputStrm{ml::core::COsFileFuncs::NULL_FILENAME};
BOOST_TEST_REQUIRE(outputStrm.is_open());

ml::model::CLimits limits;
ml::api::CFieldConfig fieldConfig;
BOOST_TEST_REQUIRE(fieldConfig.initFromFile(configFileName));

ml::model::CAnomalyDetectorModelConfig modelConfig{
ml::model::CAnomalyDetectorModelConfig::defaultConfig(BUCKET_SIZE)};

// The raw pointer is kept so the persisted state can be read back from
// the string stream after it has been handed to the data adder
std::ostringstream* dataStream{nullptr};
ml::api::CSingleStreamDataAdder::TOStreamP dataStreamPtr{
dataStream = new std::ostringstream()};

// Data adder through which the job's state is persisted into dataStream
ml::api::CSingleStreamDataAdder dataAdder{dataStreamPtr};

// The 30000 second persist interval is set large enough that the timer
// will not trigger during the test - we bypass the timer in this test
// and invoke foreground persistence on the job directly
ml::api::CPersistenceManager persistenceManager{30000, false, dataAdder};

// Values captured by reportPersistComplete when persistence completes;
// snapshotTimestamp_ has no initialiser and is only meaningful after
// the handler has run
ml::core_t::TTime snapshotTimestamp_;
std::string description_;
std::string snapshotId_;
std::size_t numDocs_{0};

// NOTE(review): these two strings are never referenced in this method -
// candidates for removal
std::string backgroundSnapshotId;
std::string foregroundSnapshotId;

{
ml::core::CJsonOutputStreamWrapper wrappedOutputStream{outputStrm};

CTestAnomalyJob job{
JOB_ID,
limits,
fieldConfig,
modelConfig,
wrappedOutputStream,
std::bind(&reportPersistComplete, std::placeholders::_1,
std::ref(snapshotTimestamp_), std::ref(description_),
std::ref(snapshotId_), std::ref(numDocs_)),
&persistenceManager,
-1,
"time",
"%d/%b/%Y:%T %z"};

ml::api::CDataProcessor* firstProcessor{&job};

ml::api::CNdJsonInputParser parser{
{CTestFieldDataCategorizer::MLCATEGORY_NAME}, inputStrm};

// Pass every parsed input record to the job
BOOST_TEST_REQUIRE(parser.readStreamIntoMaps(
[firstProcessor](const ml::api::CDataProcessor::TStrStrUMap& dataRowFields) {
return firstProcessor->handleRecord(
dataRowFields, ml::api::CDataProcessor::TOptionalTime{});
}));

// Ensure the model size stats are up to date
job.finalise();

// Fixed, recognisable descriptors so that what is reported back can be
// compared exactly with what was supplied
ml::core_t::TTime snapshotTimestamp{1283524206};
const std::string snapshotId{"my_special_snapshot"};
const std::string description{"Supplied description for snapshot at " +
ml::core::CTimeUtils::toIso8601(snapshotTimestamp)};
BOOST_TEST_REQUIRE(job.doPersistStateInForeground(
dataAdder, description, snapshotId, snapshotTimestamp));

// Check that the snapshot timestamp, description and Id reported by the
// "persist complete" handler match those supplied to the persist function
BOOST_REQUIRE_EQUAL(snapshotTimestamp, snapshotTimestamp_);
BOOST_REQUIRE_EQUAL(description, description_);
BOOST_REQUIRE_EQUAL(snapshotId, snapshotId_);

// NOTE(review): the following non-code lines are review-page text captured
// with this listing; they are preserved verbatim.
Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also assert that the snapshot timestamp is as expected.


std::string state{dataStream->str()};

// Compare snapshot ID embedded in the state string with the supplied value.
// NOTE(review): the "#1" suffix is presumably the number of the first
// persisted state document - confirm against the state compressor
const std::string expectedId{"job_model_state_" + snapshotId + "#1"};
BOOST_TEST_REQUIRE(state.find(expectedId) != std::string::npos);
}
}
};

// Runs the fixture's foreground persist check with caller-supplied snapshot
// descriptors, using the field configuration in testfiles/new_mlfields.conf.
BOOST_FIXTURE_TEST_CASE(testDetectorPersistByWithGivenSnapshotDescriptors, CTestFixture) {
this->foregroundPersistWithGivenSnapshotDescriptors("testfiles/new_mlfields.conf");
}

// Runs the fixture's foreground/background comparison helper with the field
// configuration in testfiles/new_mlfields.conf.
// NOTE(review): the helper's body is not fully visible here; from its name it
// presumably compares state persisted in the background with state persisted
// in the foreground - confirm against the fixture.
BOOST_FIXTURE_TEST_CASE(testDetectorPersistBy, CTestFixture) {
this->foregroundBackgroundCompCategorizationAndAnomalyDetection("testfiles/new_mlfields.conf");
}
Expand Down