|
7 | 7 | #define INCLUDED_ml_api_CJsonOutputWriter_h |
8 | 8 |
|
9 | 9 | #include <core/CJsonOutputStreamWrapper.h> |
10 | | -#include <core/CMutex.h> |
11 | 10 | #include <core/CoreTypes.h> |
12 | 11 | #include <core/CRapidJsonConcurrentLineWriter.h> |
13 | 12 | #include <core/CSmallVector.h> |
|
27 | 26 |
|
28 | 27 | #include <iosfwd> |
29 | 28 | #include <map> |
30 | | -#include <queue> |
31 | 29 | #include <sstream> |
32 | 30 | #include <string> |
33 | 31 | #include <utility> |
@@ -181,98 +179,10 @@ class API_EXPORT CJsonOutputWriter : public COutputHandler |
181 | 179 | typedef TTimeBucketDataMap::iterator TTimeBucketDataMapItr; |
182 | 180 | typedef TTimeBucketDataMap::const_iterator TTimeBucketDataMapCItr; |
183 | 181 |
|
184 | | - |
185 | | - static const std::string JOB_ID; |
186 | | - static const std::string TIMESTAMP; |
187 | | - static const std::string BUCKET; |
188 | | - static const std::string LOG_TIME; |
189 | | - static const std::string DETECTOR_INDEX; |
190 | | - static const std::string RECORDS; |
191 | | - static const std::string EVENT_COUNT; |
192 | | - static const std::string IS_INTERIM; |
193 | | - static const std::string PROBABILITY; |
194 | | - static const std::string RAW_ANOMALY_SCORE; |
195 | | - static const std::string ANOMALY_SCORE; |
196 | | - static const std::string RECORD_SCORE; |
197 | | - static const std::string INITIAL_RECORD_SCORE; |
198 | | - static const std::string INFLUENCER_SCORE; |
199 | | - static const std::string INITIAL_INFLUENCER_SCORE; |
200 | | - static const std::string FIELD_NAME; |
201 | | - static const std::string BY_FIELD_NAME; |
202 | | - static const std::string BY_FIELD_VALUE; |
203 | | - static const std::string CORRELATED_BY_FIELD_VALUE; |
204 | | - static const std::string TYPICAL; |
205 | | - static const std::string ACTUAL; |
206 | | - static const std::string CAUSES; |
207 | | - static const std::string FUNCTION; |
208 | | - static const std::string FUNCTION_DESCRIPTION; |
209 | | - static const std::string OVER_FIELD_NAME; |
210 | | - static const std::string OVER_FIELD_VALUE; |
211 | | - static const std::string PARTITION_FIELD_NAME; |
212 | | - static const std::string PARTITION_FIELD_VALUE; |
213 | | - static const std::string INITIAL_SCORE; |
214 | | - static const std::string BUCKET_INFLUENCERS; |
215 | | - static const std::string INFLUENCERS; |
216 | | - static const std::string INFLUENCER_FIELD_NAME; |
217 | | - static const std::string INFLUENCER_FIELD_VALUE; |
218 | | - static const std::string INFLUENCER_FIELD_VALUES; |
219 | | - static const std::string FLUSH; |
220 | | - static const std::string ID; |
221 | | - static const std::string LAST_FINALIZED_BUCKET_END; |
222 | | - static const std::string QUANTILE_STATE; |
223 | | - static const std::string QUANTILES; |
224 | | - static const std::string MODEL_SIZE_STATS; |
225 | | - static const std::string MODEL_BYTES; |
226 | | - static const std::string TOTAL_BY_FIELD_COUNT; |
227 | | - static const std::string TOTAL_OVER_FIELD_COUNT; |
228 | | - static const std::string TOTAL_PARTITION_FIELD_COUNT; |
229 | | - static const std::string BUCKET_ALLOCATION_FAILURES_COUNT; |
230 | | - static const std::string MEMORY_STATUS; |
231 | | - static const std::string CATEGORY_DEFINITION; |
232 | | - static const std::string CATEGORY_ID; |
233 | | - static const std::string TERMS; |
234 | | - static const std::string REGEX; |
235 | | - static const std::string MAX_MATCHING_LENGTH; |
236 | | - static const std::string EXAMPLES; |
237 | | - static const std::string MODEL_SNAPSHOT; |
238 | | - static const std::string SNAPSHOT_ID; |
239 | | - static const std::string SNAPSHOT_DOC_COUNT; |
240 | | - static const std::string DESCRIPTION; |
241 | | - static const std::string LATEST_RECORD_TIME; |
242 | | - static const std::string BUCKET_SPAN; |
243 | | - static const std::string LATEST_RESULT_TIME; |
244 | | - static const std::string PROCESSING_TIME; |
245 | | - static const std::string TIME_INFLUENCER; |
246 | | - static const std::string PARTITION_SCORES; |
247 | | - static const std::string SCHEDULED_EVENTS; |
248 | | - |
249 | 182 | private: |
250 | 183 | typedef CCategoryExamplesCollector::TStrSet TStrSet; |
251 | 184 | typedef TStrSet::const_iterator TStrSetCItr; |
252 | 185 |
|
253 | | - struct SModelSnapshotReport |
254 | | - { |
255 | | - SModelSnapshotReport(core_t::TTime snapshotTimestamp, |
256 | | - const std::string &description, |
257 | | - const std::string &snapshotId, |
258 | | - size_t numDocs, |
259 | | - const model::CResourceMonitor::SResults &modelSizeStats, |
260 | | - const std::string &normalizerState, |
261 | | - core_t::TTime latestRecordTime, |
262 | | - core_t::TTime latestFinalResultTime); |
263 | | - |
264 | | - core_t::TTime s_SnapshotTimestamp; |
265 | | - std::string s_Description; |
266 | | - std::string s_SnapshotId; |
267 | | - size_t s_NumDocs; |
268 | | - model::CResourceMonitor::SResults s_ModelSizeStats; |
269 | | - std::string s_NormalizerState; |
270 | | - core_t::TTime s_LatestRecordTime; |
271 | | - core_t::TTime s_LatestFinalResultTime; |
272 | | - }; |
273 | | - |
274 | | - typedef std::queue<SModelSnapshotReport> TModelSnapshotReportQueue; |
275 | | - |
276 | 186 | public: |
277 | 187 | //! Constructor that causes output to be written to the specified wrapped stream |
278 | 188 | CJsonOutputWriter(const std::string &jobId, |
@@ -343,19 +253,6 @@ class API_EXPORT CJsonOutputWriter : public COutputHandler |
343 | 253 | //! from the CResourceMonitor via a callback |
344 | 254 | void reportMemoryUsage(const model::CResourceMonitor::SResults &results); |
345 | 255 |
|
346 | | - //! Report information about completion of model persistence. |
347 | | - //! This method can be called in a thread other than the one |
348 | | - //! receiving the majority of results, so reporting is done |
349 | | - //! asynchronously. |
350 | | - void reportPersistComplete(core_t::TTime snapshotTimestamp, |
351 | | - const std::string &description, |
352 | | - const std::string &snapshotId, |
353 | | - size_t numDocs, |
354 | | - const model::CResourceMonitor::SResults &modelSizeStats, |
355 | | - const std::string &normalizerState, |
356 | | - core_t::TTime latestRecordTime, |
357 | | - core_t::TTime latestFinalResultTime); |
358 | | - |
359 | 256 | //! Acknowledge a flush request by echoing back the flush ID |
360 | 257 | void acknowledgeFlush(const std::string &flushId, core_t::TTime lastFinalizedBucketEnd); |
361 | 258 |
|
@@ -418,49 +315,33 @@ class API_EXPORT CJsonOutputWriter : public COutputHandler |
418 | 315 | void addPartitionScores(const CHierarchicalResultsWriter::TResults &results, |
419 | 316 | TDocumentWeakPtr weakDoc); |
420 | 317 |
|
421 | | - //! Write any model snapshot reports that are queuing up. |
422 | | - void writeModelSnapshotReports(void); |
423 | | - |
424 | | - //! Write the JSON object showing current levels of resource usage, as |
425 | | - //! given to us from the CResourceMonitor via a callback |
426 | | - void writeMemoryUsageObject(const model::CResourceMonitor::SResults &results); |
427 | | - |
428 | | - //! Write the quantile's state |
429 | | - void writeQuantileState(const std::string &state, core_t::TTime timestamp); |
430 | | - |
431 | 318 | private: |
432 | 319 | //! The job ID |
433 | | - std::string m_JobId; |
| 320 | + std::string m_JobId; |
434 | 321 |
|
435 | 322 | //! JSON line writer |
436 | | - core::CRapidJsonConcurrentLineWriter m_Writer; |
| 323 | + core::CRapidJsonConcurrentLineWriter m_Writer; |
437 | 324 |
|
438 | 325 | //! Time of last non-interim bucket written to output |
439 | | - core_t::TTime m_LastNonInterimBucketTime; |
| 326 | + core_t::TTime m_LastNonInterimBucketTime; |
440 | 327 |
|
441 | 328 | //! Has the output been finalised? |
442 | | - bool m_Finalised; |
| 329 | + bool m_Finalised; |
443 | 330 |
|
444 | 331 | //! Max number of records to write for each bucket/detector |
445 | | - size_t m_RecordOutputLimit; |
| 332 | + size_t m_RecordOutputLimit; |
446 | 333 |
|
447 | 334 | //! Vector for building up documents representing nested sub-results. |
448 | 335 | //! The documents in this vector will reference memory owned by |
449 | 336 | //! m_JsonPoolAllocator. (Hence this is declared after the memory pool |
450 | 337 | //! so that it's destroyed first when the destructor runs.) |
451 | | - TDocumentWeakPtrVec m_NestedDocs; |
| 338 | + TDocumentWeakPtrVec m_NestedDocs; |
452 | 339 |
|
453 | 340 | //! Bucket data waiting to be written. The map is keyed on bucket time. |
454 | 341 | //! The documents in this map will reference memory owned by |
455 | 342 | //! m_JsonPoolAllocator. (Hence this is declared after the memory pool |
456 | 343 | //! so that it's destroyed first when the destructor runs.) |
457 | | - TTimeBucketDataMap m_BucketDataByTime; |
458 | | - |
459 | | - //! Protects the m_ModelSnapshotReports from concurrent access. |
460 | | - core::CMutex m_ModelSnapshotReportsQueueMutex; |
461 | | - |
462 | | - //! Queue of model snapshot reports waiting to be output. |
463 | | - TModelSnapshotReportQueue m_ModelSnapshotReports; |
| 344 | + TTimeBucketDataMap m_BucketDataByTime; |
464 | 345 | }; |
465 | 346 |
|
466 | 347 |
|
|
0 commit comments