|
16 | 16 | #define INCLUDED_ml_api_CJsonOutputWriter_h |
17 | 17 |
|
18 | 18 | #include <core/CJsonOutputStreamWrapper.h> |
19 | | -#include <core/CMutex.h> |
20 | 19 | #include <core/CoreTypes.h> |
21 | 20 | #include <core/CRapidJsonConcurrentLineWriter.h> |
22 | 21 | #include <core/CSmallVector.h> |
|
36 | 35 |
|
37 | 36 | #include <iosfwd> |
38 | 37 | #include <map> |
39 | | -#include <queue> |
40 | 38 | #include <sstream> |
41 | 39 | #include <string> |
42 | 40 | #include <utility> |
@@ -190,98 +188,10 @@ class API_EXPORT CJsonOutputWriter : public COutputHandler |
190 | 188 | typedef TTimeBucketDataMap::iterator TTimeBucketDataMapItr; |
191 | 189 | typedef TTimeBucketDataMap::const_iterator TTimeBucketDataMapCItr; |
192 | 190 |
|
193 | | - |
194 | | - static const std::string JOB_ID; |
195 | | - static const std::string TIMESTAMP; |
196 | | - static const std::string BUCKET; |
197 | | - static const std::string LOG_TIME; |
198 | | - static const std::string DETECTOR_INDEX; |
199 | | - static const std::string RECORDS; |
200 | | - static const std::string EVENT_COUNT; |
201 | | - static const std::string IS_INTERIM; |
202 | | - static const std::string PROBABILITY; |
203 | | - static const std::string RAW_ANOMALY_SCORE; |
204 | | - static const std::string ANOMALY_SCORE; |
205 | | - static const std::string RECORD_SCORE; |
206 | | - static const std::string INITIAL_RECORD_SCORE; |
207 | | - static const std::string INFLUENCER_SCORE; |
208 | | - static const std::string INITIAL_INFLUENCER_SCORE; |
209 | | - static const std::string FIELD_NAME; |
210 | | - static const std::string BY_FIELD_NAME; |
211 | | - static const std::string BY_FIELD_VALUE; |
212 | | - static const std::string CORRELATED_BY_FIELD_VALUE; |
213 | | - static const std::string TYPICAL; |
214 | | - static const std::string ACTUAL; |
215 | | - static const std::string CAUSES; |
216 | | - static const std::string FUNCTION; |
217 | | - static const std::string FUNCTION_DESCRIPTION; |
218 | | - static const std::string OVER_FIELD_NAME; |
219 | | - static const std::string OVER_FIELD_VALUE; |
220 | | - static const std::string PARTITION_FIELD_NAME; |
221 | | - static const std::string PARTITION_FIELD_VALUE; |
222 | | - static const std::string INITIAL_SCORE; |
223 | | - static const std::string BUCKET_INFLUENCERS; |
224 | | - static const std::string INFLUENCERS; |
225 | | - static const std::string INFLUENCER_FIELD_NAME; |
226 | | - static const std::string INFLUENCER_FIELD_VALUE; |
227 | | - static const std::string INFLUENCER_FIELD_VALUES; |
228 | | - static const std::string FLUSH; |
229 | | - static const std::string ID; |
230 | | - static const std::string LAST_FINALIZED_BUCKET_END; |
231 | | - static const std::string QUANTILE_STATE; |
232 | | - static const std::string QUANTILES; |
233 | | - static const std::string MODEL_SIZE_STATS; |
234 | | - static const std::string MODEL_BYTES; |
235 | | - static const std::string TOTAL_BY_FIELD_COUNT; |
236 | | - static const std::string TOTAL_OVER_FIELD_COUNT; |
237 | | - static const std::string TOTAL_PARTITION_FIELD_COUNT; |
238 | | - static const std::string BUCKET_ALLOCATION_FAILURES_COUNT; |
239 | | - static const std::string MEMORY_STATUS; |
240 | | - static const std::string CATEGORY_DEFINITION; |
241 | | - static const std::string CATEGORY_ID; |
242 | | - static const std::string TERMS; |
243 | | - static const std::string REGEX; |
244 | | - static const std::string MAX_MATCHING_LENGTH; |
245 | | - static const std::string EXAMPLES; |
246 | | - static const std::string MODEL_SNAPSHOT; |
247 | | - static const std::string SNAPSHOT_ID; |
248 | | - static const std::string SNAPSHOT_DOC_COUNT; |
249 | | - static const std::string DESCRIPTION; |
250 | | - static const std::string LATEST_RECORD_TIME; |
251 | | - static const std::string BUCKET_SPAN; |
252 | | - static const std::string LATEST_RESULT_TIME; |
253 | | - static const std::string PROCESSING_TIME; |
254 | | - static const std::string TIME_INFLUENCER; |
255 | | - static const std::string PARTITION_SCORES; |
256 | | - static const std::string SCHEDULED_EVENTS; |
257 | | - |
258 | 191 | private: |
259 | 192 | typedef CCategoryExamplesCollector::TStrSet TStrSet; |
260 | 193 | typedef TStrSet::const_iterator TStrSetCItr; |
261 | 194 |
|
262 | | - struct SModelSnapshotReport |
263 | | - { |
264 | | - SModelSnapshotReport(core_t::TTime snapshotTimestamp, |
265 | | - const std::string &description, |
266 | | - const std::string &snapshotId, |
267 | | - size_t numDocs, |
268 | | - const model::CResourceMonitor::SResults &modelSizeStats, |
269 | | - const std::string &normalizerState, |
270 | | - core_t::TTime latestRecordTime, |
271 | | - core_t::TTime latestFinalResultTime); |
272 | | - |
273 | | - core_t::TTime s_SnapshotTimestamp; |
274 | | - std::string s_Description; |
275 | | - std::string s_SnapshotId; |
276 | | - size_t s_NumDocs; |
277 | | - model::CResourceMonitor::SResults s_ModelSizeStats; |
278 | | - std::string s_NormalizerState; |
279 | | - core_t::TTime s_LatestRecordTime; |
280 | | - core_t::TTime s_LatestFinalResultTime; |
281 | | - }; |
282 | | - |
283 | | - typedef std::queue<SModelSnapshotReport> TModelSnapshotReportQueue; |
284 | | - |
285 | 195 | public: |
286 | 196 | //! Constructor that causes output to be written to the specified wrapped stream |
287 | 197 | CJsonOutputWriter(const std::string &jobId, |
@@ -352,19 +262,6 @@ class API_EXPORT CJsonOutputWriter : public COutputHandler |
352 | 262 | //! from the CResourceMonitor via a callback |
353 | 263 | void reportMemoryUsage(const model::CResourceMonitor::SResults &results); |
354 | 264 |
|
355 | | - //! Report information about completion of model persistence. |
356 | | - //! This method can be called in a thread other than the one |
357 | | - //! receiving the majority of results, so reporting is done |
358 | | - //! asynchronously. |
359 | | - void reportPersistComplete(core_t::TTime snapshotTimestamp, |
360 | | - const std::string &description, |
361 | | - const std::string &snapshotId, |
362 | | - size_t numDocs, |
363 | | - const model::CResourceMonitor::SResults &modelSizeStats, |
364 | | - const std::string &normalizerState, |
365 | | - core_t::TTime latestRecordTime, |
366 | | - core_t::TTime latestFinalResultTime); |
367 | | - |
368 | 265 | //! Acknowledge a flush request by echoing back the flush ID |
369 | 266 | void acknowledgeFlush(const std::string &flushId, core_t::TTime lastFinalizedBucketEnd); |
370 | 267 |
|
@@ -427,49 +324,33 @@ class API_EXPORT CJsonOutputWriter : public COutputHandler |
427 | 324 | void addPartitionScores(const CHierarchicalResultsWriter::TResults &results, |
428 | 325 | TDocumentWeakPtr weakDoc); |
429 | 326 |
|
430 | | - //! Write any model snapshot reports that are queuing up. |
431 | | - void writeModelSnapshotReports(void); |
432 | | - |
433 | | - //! Write the JSON object showing current levels of resource usage, as |
434 | | - //! given to us from the CResourceMonitor via a callback |
435 | | - void writeMemoryUsageObject(const model::CResourceMonitor::SResults &results); |
436 | | - |
437 | | - //! Write the quantile's state |
438 | | - void writeQuantileState(const std::string &state, core_t::TTime timestamp); |
439 | | - |
440 | 327 | private: |
441 | 328 | //! The job ID |
442 | | - std::string m_JobId; |
| 329 | + std::string m_JobId; |
443 | 330 |
|
444 | 331 | //! JSON line writer |
445 | | - core::CRapidJsonConcurrentLineWriter m_Writer; |
| 332 | + core::CRapidJsonConcurrentLineWriter m_Writer; |
446 | 333 |
|
447 | 334 | //! Time of last non-interim bucket written to output |
448 | | - core_t::TTime m_LastNonInterimBucketTime; |
| 335 | + core_t::TTime m_LastNonInterimBucketTime; |
449 | 336 |
|
450 | 337 | //! Has the output been finalised? |
451 | | - bool m_Finalised; |
| 338 | + bool m_Finalised; |
452 | 339 |
|
453 | 340 | //! Max number of records to write for each bucket/detector |
454 | | - size_t m_RecordOutputLimit; |
| 341 | + size_t m_RecordOutputLimit; |
455 | 342 |
|
456 | 343 | //! Vector for building up documents representing nested sub-results. |
457 | 344 | //! The documents in this vector will reference memory owned by |
458 | 345 | //! m_JsonPoolAllocator. (Hence this is declared after the memory pool |
459 | 346 | //! so that it's destroyed first when the destructor runs.) |
460 | | - TDocumentWeakPtrVec m_NestedDocs; |
| 347 | + TDocumentWeakPtrVec m_NestedDocs; |
461 | 348 |
|
462 | 349 | //! Bucket data waiting to be written. The map is keyed on bucket time. |
463 | 350 | //! The documents in this map will reference memory owned by |
464 | 351 | //! m_JsonPoolAllocator. (Hence this is declared after the memory pool |
465 | 352 | //! so that it's destroyed first when the destructor runs.) |
466 | | - TTimeBucketDataMap m_BucketDataByTime; |
467 | | - |
468 | | - //! Protects the m_ModelSnapshotReports from concurrent access. |
469 | | - core::CMutex m_ModelSnapshotReportsQueueMutex; |
470 | | - |
471 | | - //! Queue of model snapshot reports waiting to be output. |
472 | | - TModelSnapshotReportQueue m_ModelSnapshotReports; |
| 353 | + TTimeBucketDataMap m_BucketDataByTime; |
473 | 354 | }; |
474 | 355 |
|
475 | 356 |
|
|
0 commit comments