Skip to content

Commit b67ebb7

Browse files
committed
[ML] Clearing JSON memory allocators (#30)
Use scoped JSON memory allocators to avoid excessive memory accumulation wherever the JSON output writer is persisted. Fixes #26
1 parent 641ea1e commit b67ebb7

File tree

4 files changed

+55
-22
lines changed

4 files changed

+55
-22
lines changed

include/core/CRapidJsonWriterBase.h

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -71,28 +71,40 @@ template<typename OUTPUT_STREAM,
7171
template<typename, typename, typename, typename, unsigned> class JSON_WRITER = rapidjson::Writer>
7272
class CRapidJsonWriterBase : public JSON_WRITER<OUTPUT_STREAM, SOURCE_ENCODING, TARGET_ENCODING, STACK_ALLOCATOR, WRITE_FLAGS> {
7373
public:
74-
using TTimeVec = std::vector<core_t::TTime>;
75-
using TStrVec = std::vector<std::string>;
76-
using TDoubleVec = std::vector<double>;
77-
using TDoubleDoublePr = std::pair<double, double>;
78-
using TDoubleDoublePrVec = std::vector<TDoubleDoublePr>;
79-
using TDoubleDoubleDoublePrPr = std::pair<double, TDoubleDoublePr>;
80-
using TDoubleDoubleDoublePrPrVec = std::vector<TDoubleDoubleDoublePrPr>;
81-
using TStrUSet = boost::unordered_set<std::string>;
82-
using TDocument = rapidjson::Document;
83-
using TValue = rapidjson::Value;
84-
using TDocumentWeakPtr = boost::weak_ptr<TDocument>;
85-
using TValuePtr = boost::shared_ptr<TValue>;
86-
87-
using TPoolAllocatorPtr = boost::shared_ptr<CRapidJsonPoolAllocator>;
88-
using TPoolAllocatorPtrStack = std::stack<TPoolAllocatorPtr>;
89-
using TStrPoolAllocatorPtrMap = boost::unordered_map<std::string, TPoolAllocatorPtr>;
90-
using TStrPoolAllocatorPtrMapItr = TStrPoolAllocatorPtrMap::iterator;
91-
using TStrPoolAllocatorPtrMapItrBoolPr = std::pair<TStrPoolAllocatorPtrMapItr, bool>;
74+
typedef std::vector<core_t::TTime> TTimeVec;
75+
typedef std::vector<std::string> TStrVec;
76+
typedef std::vector<double> TDoubleVec;
77+
typedef std::pair<double, double> TDoubleDoublePr;
78+
typedef std::vector<TDoubleDoublePr> TDoubleDoublePrVec;
79+
typedef std::pair<double, TDoubleDoublePr> TDoubleDoubleDoublePrPr;
80+
typedef std::vector<TDoubleDoubleDoublePrPr> TDoubleDoubleDoublePrPrVec;
81+
typedef boost::unordered_set<std::string> TStrUSet;
82+
typedef rapidjson::Document TDocument;
83+
typedef rapidjson::Value TValue;
84+
typedef boost::weak_ptr<TDocument> TDocumentWeakPtr;
85+
typedef boost::shared_ptr<TValue> TValuePtr;
86+
87+
typedef boost::shared_ptr<CRapidJsonPoolAllocator> TPoolAllocatorPtr;
88+
typedef std::stack<TPoolAllocatorPtr> TPoolAllocatorPtrStack;
89+
typedef boost::unordered_map<std::string, TPoolAllocatorPtr> TStrPoolAllocatorPtrMap;
90+
typedef TStrPoolAllocatorPtrMap::iterator TStrPoolAllocatorPtrMapItr;
91+
typedef std::pair<TStrPoolAllocatorPtrMapItr, bool> TStrPoolAllocatorPtrMapItrBoolPr;
9292

9393
public:
9494
using TRapidJsonWriterBase = JSON_WRITER<OUTPUT_STREAM, SOURCE_ENCODING, TARGET_ENCODING, STACK_ALLOCATOR, WRITE_FLAGS>;
9595

96+
//! Instances of this class may very well be long-lived, potentially for the lifetime of the application.
97+
//! Over the course of that lifetime resources will accumulate in the underlying rapidjson memory
98+
//! allocator. To prevent excessive memory expansion these resources will need to be cleaned regularly.
99+
//!
100+
//! In preference to clients of this class explicitly clearing the allocator, a helper/wrapper class -
101+
//! \p CScopedRapidJsonPoolAllocator - is provided. This helper has an RAII-style interface that clears the
102+
//! allocator when it goes out of scope, which requires that the writer provides the push/popAllocator
103+
//! functions. The intent of this approach is to make it possible to use one or two separate allocators
104+
//! for the writer at nested scope.
105+
//!
106+
//! Note that allocators are not destroyed by the pop operation; they persist for the lifetime of the
107+
//! writer in a cache for swift retrieval.
96108
CRapidJsonWriterBase(OUTPUT_STREAM& os) : TRapidJsonWriterBase(os) {
97109
// push a default rapidjson allocator onto our stack
98110
m_JsonPoolAllocators.push(boost::make_shared<CRapidJsonPoolAllocator>());
@@ -105,10 +117,9 @@ class CRapidJsonWriterBase : public JSON_WRITER<OUTPUT_STREAM, SOURCE_ENCODING,
105117

106118
CRapidJsonWriterBase(CRapidJsonWriterBase&& rhs) : TRapidJsonWriterBase(std::move(rhs)) {}
107119

108-
virtual ~CRapidJsonWriterBase() {
109-
// clean up resources
110-
m_JsonPoolAllocators.pop();
111-
}
120+
// No need for an explicit destructor here as the allocators clear themselves
121+
// on destruction.
122+
virtual ~CRapidJsonWriterBase() = default;
112123

113124
//! Push a named allocator on to the stack
114125
//! Look in the cache for the allocator - creating it if not present

lib/api/CJsonOutputWriter.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
#include <api/CJsonOutputWriter.h>
1717

18+
#include <core/CScopedRapidJsonPoolAllocator.h>
1819
#include <core/CStringUtils.h>
1920
#include <core/CTimeUtils.h>
2021

@@ -345,6 +346,10 @@ const CJsonOutputWriter::TStrVec& CJsonOutputWriter::fieldNames() const {
345346
}
346347

347348
bool CJsonOutputWriter::writeRow(const TStrStrUMap& dataRowFields, const TStrStrUMap& overrideDataRowFields) {
349+
using TScopedAllocator = core::CScopedRapidJsonPoolAllocator<core::CRapidJsonConcurrentLineWriter>;
350+
351+
TScopedAllocator scopedAllocator("CJsonOutputWriter::writeRow", m_Writer);
352+
348353
rapidjson::Document doc = m_Writer.makeDoc();
349354

350355
// Write all the fields to the document as strings

lib/api/CLineifiedJsonOutputWriter.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
*/
1515
#include <api/CLineifiedJsonOutputWriter.h>
1616

17+
#include <core/CScopedRapidJsonPoolAllocator.h>
1718
#include <core/CSleep.h>
1819
#include <core/CStringUtils.h>
1920

@@ -58,6 +59,9 @@ const CLineifiedJsonOutputWriter::TStrVec& CLineifiedJsonOutputWriter::fieldName
5859
}
5960

6061
bool CLineifiedJsonOutputWriter::writeRow(const TStrStrUMap& dataRowFields, const TStrStrUMap& overrideDataRowFields) {
62+
using TScopedAllocator = core::CScopedRapidJsonPoolAllocator<TGenericLineWriter>;
63+
TScopedAllocator scopedAllocator("CLineifiedJsonOutputWriter::writeRow", m_Writer);
64+
6165
rapidjson::Document doc = m_Writer.makeDoc();
6266

6367
// Write all the fields to the document as strings

lib/model/CForecastDataSink.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <model/CForecastDataSink.h>
1717

1818
#include <core/CLogger.h>
19+
#include <core/CScopedRapidJsonPoolAllocator.h>
1920

2021
#include <vector>
2122

@@ -60,6 +61,8 @@ const std::string CForecastDataSink::PROCESSING_TIME_MS("processing_time_ms");
6061
const std::string CForecastDataSink::PROGRESS("forecast_progress");
6162
const std::string CForecastDataSink::STATUS("forecast_status");
6263

64+
using TScopedAllocator = core::CScopedRapidJsonPoolAllocator<core::CRapidJsonConcurrentLineWriter>;
65+
6366
CForecastDataSink::SForecastModelWrapper::SForecastModelWrapper(model_t::EFeature feature,
6467
TMathsModelPtr&& forecastModel,
6568
const std::string& byFieldValue)
@@ -102,6 +105,8 @@ CForecastDataSink::CForecastDataSink(const std::string& jobId,
102105
}
103106

104107
void CForecastDataSink::writeStats(const double progress, uint64_t runtime, const TStrUMap& messages, bool successful) {
108+
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
109+
105110
rapidjson::Document doc = m_Writer.makeDoc();
106111

107112
this->writeCommonStatsFields(doc);
@@ -134,6 +139,8 @@ void CForecastDataSink::writeScheduledMessage() {
134139
}
135140

136141
void CForecastDataSink::writeErrorMessage(const std::string& message) {
142+
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
143+
137144
rapidjson::Document doc = m_Writer.makeDoc();
138145
this->writeCommonStatsFields(doc);
139146
TStrVec messages{message};
@@ -143,6 +150,8 @@ void CForecastDataSink::writeErrorMessage(const std::string& message) {
143150
}
144151

145152
void CForecastDataSink::writeFinalMessage(const std::string& message) {
153+
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
154+
146155
rapidjson::Document doc = m_Writer.makeDoc();
147156
this->writeCommonStatsFields(doc);
148157
TStrVec messages{message};
@@ -165,6 +174,8 @@ void CForecastDataSink::writeCommonStatsFields(rapidjson::Value& doc) {
165174
}
166175

167176
void CForecastDataSink::push(bool flush, rapidjson::Value& doc) {
177+
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
178+
168179
rapidjson::Document wrapper = m_Writer.makeDoc();
169180

170181
m_Writer.addMember(MODEL_FORECAST_STATS, doc, wrapper);
@@ -186,6 +197,8 @@ void CForecastDataSink::push(const maths::SErrorBar errorBar,
186197
const std::string& byFieldName,
187198
const std::string& byFieldValue,
188199
int detectorIndex) {
200+
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);
201+
189202
++m_NumRecordsWritten;
190203
rapidjson::Document doc = m_Writer.makeDoc();
191204

0 commit comments

Comments
 (0)