Skip to content

Commit 5cdd02b

Browse files
authored
[ML] Add internal flag to flush API determining whether to refresh indices or not. (#2532)
When datafeeds send flush requests, they don't require the indices to be refreshed. Round-trip the "should_refresh" flag from ES to the ml-cpp code and back again.
1 parent f858c0a commit 5cdd02b

File tree

5 files changed

+26
-5
lines changed

5 files changed

+26
-5
lines changed

include/api/CAnomalyJob.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,9 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
504504
//! Flag indicating whether or not time has been advanced.
505505
bool m_TimeAdvanced{false};
506506

507+
//! Flag indicating whether or not a flush control message should trigger a refresh of the indices
508+
bool m_RefreshRequired{true};
509+
507510
//! Introduced in version 8.6
508511
//! The initial value of the end time of the last bucket
509512
//! out of latency window we've seen, i.e. this member records

include/api/CJsonOutputWriter.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,8 +244,10 @@ class API_EXPORT CJsonOutputWriter {
244244
const model::SCategorizerStats& categorizerStats,
245245
const TOptionalTime& timestamp);
246246

247-
//! Acknowledge a flush request by echoing back the flush ID
248-
void acknowledgeFlush(const std::string& flushId, core_t::TTime lastFinalizedBucketEnd);
247+
//! Acknowledge a flush request by echoing back the flush ID and the "refreshRequired" flag
248+
void acknowledgeFlush(const std::string& flushId,
249+
core_t::TTime lastFinalizedBucketEnd,
250+
bool refreshRequired = true);
249251

250252
//! Write a category definition
251253
void writeCategoryDefinition(const std::string& partitionFieldName,

lib/api/CAnomalyJob.cc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ bool CAnomalyJob::handleControlMessage(const std::string& controlMessage) {
323323
return false;
324324
}
325325

326+
bool refreshRequired{true};
326327
switch (controlMessage[0]) {
327328
case ' ':
328329
// Spaces are just used to fill the buffers and force prior messages
@@ -358,6 +359,17 @@ bool CAnomalyJob::handleControlMessage(const std::string& controlMessage) {
358359
break;
359360
case 'w':
360361
this->processPersistControlMessage(controlMessage.substr(1));
362+
break;
363+
case 'z':
364+
LOG_TRACE(<< "Received control message '" << controlMessage << "'");
365+
// "refreshRequired" parameter comes after the initial z.
366+
if (core::CStringUtils::stringToType(controlMessage.substr(1), refreshRequired) == false) {
367+
LOG_ERROR(<< "Received request to flush with invalid control message '"
368+
<< controlMessage << "'");
369+
} else {
370+
m_RefreshRequired = refreshRequired;
371+
}
372+
361373
break;
362374
default:
363375
LOG_WARN(<< "Ignoring unknown control message of length "
@@ -454,7 +466,7 @@ void CAnomalyJob::acknowledgeFlush(const std::string& flushId) {
454466
} else {
455467
LOG_TRACE(<< "Received flush control message with ID " << flushId);
456468
}
457-
m_JsonOutputWriter.acknowledgeFlush(flushId, m_LastFinalisedBucketEndTime);
469+
m_JsonOutputWriter.acknowledgeFlush(flushId, m_LastFinalisedBucketEndTime, m_RefreshRequired);
458470
}
459471

460472
void CAnomalyJob::updateConfig(const std::string& config) {

lib/api/CJsonOutputWriter.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ const std::string INFLUENCERS("influencers");
6767
const std::string FLUSH("flush");
6868
const std::string ID("id");
6969
const std::string LAST_FINALIZED_BUCKET_END("last_finalized_bucket_end");
70+
const std::string REFRESH_REQUIRED("refresh_required");
7071
const std::string CATEGORY_ID("category_id");
7172
const std::string CATEGORY_DEFINITION("category_definition");
7273
const std::string TERMS("terms");
@@ -874,7 +875,8 @@ void CJsonOutputWriter::writeCategorizerStats(const std::string& partitionFieldN
874875
}
875876

876877
void CJsonOutputWriter::acknowledgeFlush(const std::string& flushId,
877-
core_t::TTime lastFinalizedBucketEnd) {
878+
core_t::TTime lastFinalizedBucketEnd,
879+
bool refreshRequired) {
878880
m_Writer.StartObject();
879881
m_Writer.Key(FLUSH);
880882
m_Writer.StartObject();
@@ -883,6 +885,8 @@ void CJsonOutputWriter::acknowledgeFlush(const std::string& flushId,
883885
m_Writer.String(flushId);
884886
m_Writer.Key(LAST_FINALIZED_BUCKET_END);
885887
m_Writer.Time(lastFinalizedBucketEnd);
888+
m_Writer.Key(REFRESH_REQUIRED);
889+
m_Writer.Bool(refreshRequired);
886890

887891
m_Writer.EndObject();
888892
m_Writer.EndObject();

lib/api/unittest/CFieldDataCategorizerTest.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ BOOST_AUTO_TEST_CASE(testHandleControlMessages) {
573573

574574
const std::string& output{outputStrm.str()};
575575
LOG_DEBUG(<< "Output is: " << output);
576-
BOOST_REQUIRE_EQUAL(0, output.find("[{\"flush\":{\"id\":\"7\",\"last_finalized_bucket_end\":0}}"));
576+
BOOST_REQUIRE_EQUAL(0, output.find("[{\"flush\":{\"id\":\"7\",\"last_finalized_bucket_end\":0,\"refresh_required\":true}}"));
577577
}
578578

579579
BOOST_AUTO_TEST_CASE(testRestoreStateFailsWithEmptyState) {

0 commit comments

Comments
 (0)