Skip to content

Commit efa2fb5

Browse files
authored
[ML] Clearing JSON memory allocators (#30)
[ML] Clearing JSON memory allocators. Use scoped JSON memory allocators to avoid excessive memory accumulation wherever the JSON output writer is persisted. Fixes #26
1 parent 306bae4 commit efa2fb5

File tree

4 files changed

+55
-22
lines changed

4 files changed

+55
-22
lines changed

include/core/CRapidJsonWriterBase.h

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -71,28 +71,40 @@ template<typename OUTPUT_STREAM,
7171
template<typename, typename, typename, typename, unsigned> class JSON_WRITER = rapidjson::Writer>
7272
class CRapidJsonWriterBase : public JSON_WRITER<OUTPUT_STREAM, SOURCE_ENCODING, TARGET_ENCODING, STACK_ALLOCATOR, WRITE_FLAGS> {
7373
public:
74-
using TTimeVec = std::vector<core_t::TTime>;
75-
using TStrVec = std::vector<std::string>;
76-
using TDoubleVec = std::vector<double>;
77-
using TDoubleDoublePr = std::pair<double, double>;
78-
using TDoubleDoublePrVec = std::vector<TDoubleDoublePr>;
79-
using TDoubleDoubleDoublePrPr = std::pair<double, TDoubleDoublePr>;
80-
using TDoubleDoubleDoublePrPrVec = std::vector<TDoubleDoubleDoublePrPr>;
81-
using TStrUSet = boost::unordered_set<std::string>;
82-
using TDocument = rapidjson::Document;
83-
using TValue = rapidjson::Value;
84-
using TDocumentWeakPtr = boost::weak_ptr<TDocument>;
85-
using TValuePtr = boost::shared_ptr<TValue>;
86-
87-
using TPoolAllocatorPtr = boost::shared_ptr<CRapidJsonPoolAllocator>;
88-
using TPoolAllocatorPtrStack = std::stack<TPoolAllocatorPtr>;
89-
using TStrPoolAllocatorPtrMap = boost::unordered_map<std::string, TPoolAllocatorPtr>;
90-
using TStrPoolAllocatorPtrMapItr = TStrPoolAllocatorPtrMap::iterator;
91-
using TStrPoolAllocatorPtrMapItrBoolPr = std::pair<TStrPoolAllocatorPtrMapItr, bool>;
74+
typedef std::vector<core_t::TTime> TTimeVec;
75+
typedef std::vector<std::string> TStrVec;
76+
typedef std::vector<double> TDoubleVec;
77+
typedef std::pair<double, double> TDoubleDoublePr;
78+
typedef std::vector<TDoubleDoublePr> TDoubleDoublePrVec;
79+
typedef std::pair<double, TDoubleDoublePr> TDoubleDoubleDoublePrPr;
80+
typedef std::vector<TDoubleDoubleDoublePrPr> TDoubleDoubleDoublePrPrVec;
81+
typedef boost::unordered_set<std::string> TStrUSet;
82+
typedef rapidjson::Document TDocument;
83+
typedef rapidjson::Value TValue;
84+
typedef boost::weak_ptr<TDocument> TDocumentWeakPtr;
85+
typedef boost::shared_ptr<TValue> TValuePtr;
86+
87+
typedef boost::shared_ptr<CRapidJsonPoolAllocator> TPoolAllocatorPtr;
88+
typedef std::stack<TPoolAllocatorPtr> TPoolAllocatorPtrStack;
89+
typedef boost::unordered_map<std::string, TPoolAllocatorPtr> TStrPoolAllocatorPtrMap;
90+
typedef TStrPoolAllocatorPtrMap::iterator TStrPoolAllocatorPtrMapItr;
91+
typedef std::pair<TStrPoolAllocatorPtrMapItr, bool> TStrPoolAllocatorPtrMapItrBoolPr;
9292

9393
public:
9494
using TRapidJsonWriterBase = JSON_WRITER<OUTPUT_STREAM, SOURCE_ENCODING, TARGET_ENCODING, STACK_ALLOCATOR, WRITE_FLAGS>;
9595

96+
//! Instances of this class may very well be long-lived, potentially for the lifetime of the application.
97+
//! Over the course of that lifetime resources will accumulate in the underlying rapidjson memory
98+
//! allocator. To prevent excessive memory expansion these resources will need to be cleaned regularly.
99+
//!
100+
//! In preference to clients of this class explicitly clearing the allocator, a helper/wrapper class -
101+
//! \p CScopedRapidJsonPoolAllocator - is provided. This helper has an RAII style interface that clears the
102+
//! allocator when it goes out of scope, which requires that the writer provides the push/popAllocator
103+
//! functions. The intent of this approach is to make it possible to use one or two separate allocators
104+
//! for the writer at nested scope.
105+
//!
106+
//! Note that allocators are not destroyed by the pop operation; they persist for the lifetime of the
107+
//! writer in a cache for swift retrieval.
96108
CRapidJsonWriterBase(OUTPUT_STREAM& os) : TRapidJsonWriterBase(os) {
97109
// push a default rapidjson allocator onto our stack
98110
m_JsonPoolAllocators.push(boost::make_shared<CRapidJsonPoolAllocator>());
@@ -103,10 +115,9 @@ class CRapidJsonWriterBase : public JSON_WRITER<OUTPUT_STREAM, SOURCE_ENCODING,
103115
m_JsonPoolAllocators.push(boost::make_shared<CRapidJsonPoolAllocator>());
104116
}
105117

106-
virtual ~CRapidJsonWriterBase() {
107-
// clean up resources
108-
m_JsonPoolAllocators.pop();
109-
}
118+
// No need for an explicit destructor here as the allocators clear themselves
119+
// on destruction.
120+
virtual ~CRapidJsonWriterBase() = default;
110121

111122
//! Push a named allocator on to the stack
112123
//! Look in the cache for the allocator - creating it if not present

lib/api/CJsonOutputWriter.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
#include <api/CJsonOutputWriter.h>
1717

18+
#include <core/CScopedRapidJsonPoolAllocator.h>
1819
#include <core/CStringUtils.h>
1920
#include <core/CTimeUtils.h>
2021

@@ -345,6 +346,10 @@ const CJsonOutputWriter::TStrVec& CJsonOutputWriter::fieldNames() const {
345346
}
346347

347348
bool CJsonOutputWriter::writeRow(const TStrStrUMap& dataRowFields, const TStrStrUMap& overrideDataRowFields) {
349+
using TScopedAllocator = core::CScopedRapidJsonPoolAllocator<core::CRapidJsonConcurrentLineWriter>;
350+
351+
TScopedAllocator scopedAllocator("CJsonOutputWriter::writeRow", m_Writer);
352+
348353
rapidjson::Document doc = m_Writer.makeDoc();
349354

350355
// Write all the fields to the document as strings

lib/api/CLineifiedJsonOutputWriter.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
*/
1515
#include <api/CLineifiedJsonOutputWriter.h>
1616

17+
#include <core/CScopedRapidJsonPoolAllocator.h>
1718
#include <core/CSleep.h>
1819
#include <core/CStringUtils.h>
1920

@@ -58,6 +59,9 @@ const CLineifiedJsonOutputWriter::TStrVec& CLineifiedJsonOutputWriter::fieldName
5859
}
5960

6061
bool CLineifiedJsonOutputWriter::writeRow(const TStrStrUMap& dataRowFields, const TStrStrUMap& overrideDataRowFields) {
62+
using TScopedAllocator = core::CScopedRapidJsonPoolAllocator<TGenericLineWriter>;
63+
TScopedAllocator scopedAllocator("CLineifiedJsonOutputWriter::writeRow", m_Writer);
64+
6165
rapidjson::Document doc = m_Writer.makeDoc();
6266

6367
// Write all the fields to the document as strings

lib/model/CForecastDataSink.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <model/CForecastDataSink.h>
1717

1818
#include <core/CLogger.h>
19+
#include <core/CScopedRapidJsonPoolAllocator.h>
1920

2021
#include <vector>
2122

@@ -61,6 +62,8 @@ const std::string CForecastDataSink::PROCESSING_TIME_MS("processing_time_ms");
6162
const std::string CForecastDataSink::PROGRESS("forecast_progress");
6263
const std::string CForecastDataSink::STATUS("forecast_status");
6364

65+
using TScopedAllocator = core::CScopedRapidJsonPoolAllocator<core::CRapidJsonConcurrentLineWriter>;
66+
6467
CForecastDataSink::SForecastModelWrapper::SForecastModelWrapper(model_t::EFeature feature,
6568
TMathsModelPtr&& forecastModel,
6669
const std::string& byFieldValue)
@@ -105,6 +108,8 @@ CForecastDataSink::CForecastDataSink(const std::string& jobId,
105108
}
106109

107110
void CForecastDataSink::writeStats(const double progress, uint64_t runtime, const TStrUMap& messages, bool successful) {
111+
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
112+
108113
rapidjson::Document doc = m_Writer.makeDoc();
109114

110115
this->writeCommonStatsFields(doc);
@@ -137,6 +142,8 @@ void CForecastDataSink::writeScheduledMessage() {
137142
}
138143

139144
void CForecastDataSink::writeErrorMessage(const std::string& message) {
145+
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
146+
140147
rapidjson::Document doc = m_Writer.makeDoc();
141148
this->writeCommonStatsFields(doc);
142149
TStrVec messages{message};
@@ -146,6 +153,8 @@ void CForecastDataSink::writeErrorMessage(const std::string& message) {
146153
}
147154

148155
void CForecastDataSink::writeFinalMessage(const std::string& message) {
156+
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
157+
149158
rapidjson::Document doc = m_Writer.makeDoc();
150159
this->writeCommonStatsFields(doc);
151160
TStrVec messages{message};
@@ -171,6 +180,8 @@ void CForecastDataSink::writeCommonStatsFields(rapidjson::Value& doc) {
171180
}
172181

173182
void CForecastDataSink::push(bool flush, rapidjson::Value& doc) {
183+
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
184+
174185
rapidjson::Document wrapper = m_Writer.makeDoc();
175186

176187
m_Writer.addMember(MODEL_FORECAST_STATS, doc, wrapper);
@@ -192,6 +203,8 @@ void CForecastDataSink::push(const maths::SErrorBar errorBar,
192203
const std::string& byFieldName,
193204
const std::string& byFieldValue,
194205
int detectorIndex) {
206+
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
207+
195208
++m_NumRecordsWritten;
196209
rapidjson::Document doc = m_Writer.makeDoc();
197210

0 commit comments

Comments
 (0)