Skip to content

Commit 7b7df1b

Browse files
committed
[ML] Clearing JSON memory allocators (#30)
Use scoped JSON memory allocators to avoid excessive memory accumulation wherever the JSON output writer is persisted. Fixes #26.
1 parent b77a294 commit 7b7df1b

File tree

4 files changed

+55
-22
lines changed

4 files changed

+55
-22
lines changed

include/core/CRapidJsonWriterBase.h

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -62,28 +62,40 @@ template<typename OUTPUT_STREAM,
6262
template<typename, typename, typename, typename, unsigned> class JSON_WRITER = rapidjson::Writer>
6363
class CRapidJsonWriterBase : public JSON_WRITER<OUTPUT_STREAM, SOURCE_ENCODING, TARGET_ENCODING, STACK_ALLOCATOR, WRITE_FLAGS> {
6464
public:
65-
using TTimeVec = std::vector<core_t::TTime>;
66-
using TStrVec = std::vector<std::string>;
67-
using TDoubleVec = std::vector<double>;
68-
using TDoubleDoublePr = std::pair<double, double>;
69-
using TDoubleDoublePrVec = std::vector<TDoubleDoublePr>;
70-
using TDoubleDoubleDoublePrPr = std::pair<double, TDoubleDoublePr>;
71-
using TDoubleDoubleDoublePrPrVec = std::vector<TDoubleDoubleDoublePrPr>;
72-
using TStrUSet = boost::unordered_set<std::string>;
73-
using TDocument = rapidjson::Document;
74-
using TValue = rapidjson::Value;
75-
using TDocumentWeakPtr = boost::weak_ptr<TDocument>;
76-
using TValuePtr = boost::shared_ptr<TValue>;
77-
78-
using TPoolAllocatorPtr = boost::shared_ptr<CRapidJsonPoolAllocator>;
79-
using TPoolAllocatorPtrStack = std::stack<TPoolAllocatorPtr>;
80-
using TStrPoolAllocatorPtrMap = boost::unordered_map<std::string, TPoolAllocatorPtr>;
81-
using TStrPoolAllocatorPtrMapItr = TStrPoolAllocatorPtrMap::iterator;
82-
using TStrPoolAllocatorPtrMapItrBoolPr = std::pair<TStrPoolAllocatorPtrMapItr, bool>;
65+
typedef std::vector<core_t::TTime> TTimeVec;
66+
typedef std::vector<std::string> TStrVec;
67+
typedef std::vector<double> TDoubleVec;
68+
typedef std::pair<double, double> TDoubleDoublePr;
69+
typedef std::vector<TDoubleDoublePr> TDoubleDoublePrVec;
70+
typedef std::pair<double, TDoubleDoublePr> TDoubleDoubleDoublePrPr;
71+
typedef std::vector<TDoubleDoubleDoublePrPr> TDoubleDoubleDoublePrPrVec;
72+
typedef boost::unordered_set<std::string> TStrUSet;
73+
typedef rapidjson::Document TDocument;
74+
typedef rapidjson::Value TValue;
75+
typedef boost::weak_ptr<TDocument> TDocumentWeakPtr;
76+
typedef boost::shared_ptr<TValue> TValuePtr;
77+
78+
typedef boost::shared_ptr<CRapidJsonPoolAllocator> TPoolAllocatorPtr;
79+
typedef std::stack<TPoolAllocatorPtr> TPoolAllocatorPtrStack;
80+
typedef boost::unordered_map<std::string, TPoolAllocatorPtr> TStrPoolAllocatorPtrMap;
81+
typedef TStrPoolAllocatorPtrMap::iterator TStrPoolAllocatorPtrMapItr;
82+
typedef std::pair<TStrPoolAllocatorPtrMapItr, bool> TStrPoolAllocatorPtrMapItrBoolPr;
8383

8484
public:
8585
using TRapidJsonWriterBase = JSON_WRITER<OUTPUT_STREAM, SOURCE_ENCODING, TARGET_ENCODING, STACK_ALLOCATOR, WRITE_FLAGS>;
8686

87+
//! Instances of this class may very well be long-lived, potentially for the lifetime of the application.
88+
//! Over the course of that lifetime resources will accumulate in the underlying rapidjson memory
89+
//! allocator. To prevent excessive memory expansion these resources will need to be cleaned regularly.
90+
//!
91+
//! In preference to clients of this class explicitly clearing the allocator, a helper/wrapper class -
92+
//! \p CScopedRapidJsonPoolAllocator - is provided. This helper has an RAII style interface that clears the
93+
//! allocator when it goes out of scope, which requires that the writer provides the push/popAllocator
94+
//! functions. The intent of this approach is to make it possible to use one or two separate allocators
95+
//! for the writer at nested scope.
96+
//!
97+
//! Note that allocators are not destroyed by the pop operation, they persist for the lifetime of the
98+
//! writer in a cache for swift retrieval.
8799
CRapidJsonWriterBase(OUTPUT_STREAM& os) : TRapidJsonWriterBase(os) {
88100
// push a default rapidjson allocator onto our stack
89101
m_JsonPoolAllocators.push(boost::make_shared<CRapidJsonPoolAllocator>());
@@ -96,10 +108,9 @@ class CRapidJsonWriterBase : public JSON_WRITER<OUTPUT_STREAM, SOURCE_ENCODING,
96108

97109
CRapidJsonWriterBase(CRapidJsonWriterBase&& rhs) : TRapidJsonWriterBase(std::move(rhs)) {}
98110

99-
virtual ~CRapidJsonWriterBase() {
100-
// clean up resources
101-
m_JsonPoolAllocators.pop();
102-
}
111+
// No need for an explicit destructor here as the allocators clear themselves
112+
// on destruction.
113+
virtual ~CRapidJsonWriterBase() = default;
103114

104115
//! Push a named allocator on to the stack
105116
//! Look in the cache for the allocator - creating it if not present

lib/api/CJsonOutputWriter.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include <api/CJsonOutputWriter.h>
88

9+
#include <core/CScopedRapidJsonPoolAllocator.h>
910
#include <core/CStringUtils.h>
1011
#include <core/CTimeUtils.h>
1112

@@ -336,6 +337,10 @@ const CJsonOutputWriter::TStrVec& CJsonOutputWriter::fieldNames() const {
336337
}
337338

338339
bool CJsonOutputWriter::writeRow(const TStrStrUMap& dataRowFields, const TStrStrUMap& overrideDataRowFields) {
340+
using TScopedAllocator = core::CScopedRapidJsonPoolAllocator<core::CRapidJsonConcurrentLineWriter>;
341+
342+
TScopedAllocator scopedAllocator("CJsonOutputWriter::writeRow", m_Writer);
343+
339344
rapidjson::Document doc = m_Writer.makeDoc();
340345

341346
// Write all the fields to the document as strings

lib/api/CLineifiedJsonOutputWriter.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
#include <api/CLineifiedJsonOutputWriter.h>
77

8+
#include <core/CScopedRapidJsonPoolAllocator.h>
89
#include <core/CSleep.h>
910
#include <core/CStringUtils.h>
1011

@@ -49,6 +50,9 @@ const CLineifiedJsonOutputWriter::TStrVec& CLineifiedJsonOutputWriter::fieldName
4950
}
5051

5152
bool CLineifiedJsonOutputWriter::writeRow(const TStrStrUMap& dataRowFields, const TStrStrUMap& overrideDataRowFields) {
53+
using TScopedAllocator = core::CScopedRapidJsonPoolAllocator<TGenericLineWriter>;
54+
TScopedAllocator scopedAllocator("CLineifiedJsonOutputWriter::writeRow", m_Writer);
55+
5256
rapidjson::Document doc = m_Writer.makeDoc();
5357

5458
// Write all the fields to the document as strings

lib/model/CForecastDataSink.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <model/CForecastDataSink.h>
88

99
#include <core/CLogger.h>
10+
#include <core/CScopedRapidJsonPoolAllocator.h>
1011

1112
#include <vector>
1213

@@ -51,6 +52,8 @@ const std::string CForecastDataSink::PROCESSING_TIME_MS("processing_time_ms");
5152
const std::string CForecastDataSink::PROGRESS("forecast_progress");
5253
const std::string CForecastDataSink::STATUS("forecast_status");
5354

55+
using TScopedAllocator = core::CScopedRapidJsonPoolAllocator<core::CRapidJsonConcurrentLineWriter>;
56+
5457
CForecastDataSink::SForecastModelWrapper::SForecastModelWrapper(model_t::EFeature feature,
5558
TMathsModelPtr&& forecastModel,
5659
const std::string& byFieldValue)
@@ -93,6 +96,8 @@ CForecastDataSink::CForecastDataSink(const std::string& jobId,
9396
}
9497

9598
void CForecastDataSink::writeStats(const double progress, uint64_t runtime, const TStrUMap& messages, bool successful) {
99+
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
100+
96101
rapidjson::Document doc = m_Writer.makeDoc();
97102

98103
this->writeCommonStatsFields(doc);
@@ -125,6 +130,8 @@ void CForecastDataSink::writeScheduledMessage() {
125130
}
126131

127132
void CForecastDataSink::writeErrorMessage(const std::string& message) {
133+
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
134+
128135
rapidjson::Document doc = m_Writer.makeDoc();
129136
this->writeCommonStatsFields(doc);
130137
TStrVec messages{message};
@@ -134,6 +141,8 @@ void CForecastDataSink::writeErrorMessage(const std::string& message) {
134141
}
135142

136143
void CForecastDataSink::writeFinalMessage(const std::string& message) {
144+
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
145+
137146
rapidjson::Document doc = m_Writer.makeDoc();
138147
this->writeCommonStatsFields(doc);
139148
TStrVec messages{message};
@@ -156,6 +165,8 @@ void CForecastDataSink::writeCommonStatsFields(rapidjson::Value& doc) {
156165
}
157166

158167
void CForecastDataSink::push(bool flush, rapidjson::Value& doc) {
168+
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
169+
159170
rapidjson::Document wrapper = m_Writer.makeDoc();
160171

161172
m_Writer.addMember(MODEL_FORECAST_STATS, doc, wrapper);
@@ -177,6 +188,8 @@ void CForecastDataSink::push(const maths::SErrorBar errorBar,
177188
const std::string& byFieldName,
178189
const std::string& byFieldValue,
179190
int detectorIndex) {
191+
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
192+
180193
++m_NumRecordsWritten;
181194
rapidjson::Document doc = m_Writer.makeDoc();
182195

0 commit comments

Comments
 (0)