Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions include/core/CRapidJsonWriterBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@ class CRapidJsonWriterBase : public JSON_WRITER<OUTPUT_STREAM, SOURCE_ENCODING,
public:
using TRapidJsonWriterBase = JSON_WRITER<OUTPUT_STREAM, SOURCE_ENCODING, TARGET_ENCODING, STACK_ALLOCATOR, WRITE_FLAGS>;

//! Instances of this class may very well be long lived, potentially for the lifetime of the application.
//! Over the course of that lifetime resources will accumulate in the underlying rapidjson memory
//! allocator. To prevent excessive memory expansion these resources will need to be cleaned regularly.
//!
//! In preference to clients of this class explicitly clearing the allocator a helper/wrapper class -
//! \p CScopedRapidJsonPoolAllocator - is provided. This helper has an RAII style interface that clears the
//! allocator when it goes out of scope which requires that the writer provides the push/popAllocator
//! functions. The intent of this approach is to make it possible to use one or two separate allocators
//! for the writer at nested scope.
//!
//! Note that allocators are not destroyed by the pop operation, they persist for the lifetime of the
//! writer in a cache for swift retrieval.
CRapidJsonWriterBase(OUTPUT_STREAM &os) : TRapidJsonWriterBase(os)
{
// push a default rapidjson allocator onto our stack
Expand All @@ -112,11 +124,9 @@ class CRapidJsonWriterBase : public JSON_WRITER<OUTPUT_STREAM, SOURCE_ENCODING,
m_JsonPoolAllocators.push(boost::make_shared<CRapidJsonPoolAllocator>());
}

virtual ~CRapidJsonWriterBase()
{
// clean up resources
m_JsonPoolAllocators.pop();
}
// No need for an explicit destructor here as the allocators clear themselves
// on destruction.
virtual ~CRapidJsonWriterBase() = default;

//! Push a named allocator on to the stack
//! Look in the cache for the allocator - creating it if not present
Expand Down
5 changes: 5 additions & 0 deletions lib/api/CJsonOutputWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include <api/CJsonOutputWriter.h>

#include <core/CScopedRapidJsonPoolAllocator.h>
#include <core/CStringUtils.h>
#include <core/CTimeUtils.h>

Expand Down Expand Up @@ -430,6 +431,10 @@ const CJsonOutputWriter::TStrVec &CJsonOutputWriter::fieldNames(void) const
bool CJsonOutputWriter::writeRow(const TStrStrUMap &dataRowFields,
const TStrStrUMap &overrideDataRowFields)
{
using TScopedAllocator = core::CScopedRapidJsonPoolAllocator<core::CRapidJsonConcurrentLineWriter>;

TScopedAllocator scopedAllocator("CJsonOutputWriter::writeRow", m_Writer);

rapidjson::Document doc = m_Writer.makeDoc();


Expand Down
4 changes: 4 additions & 0 deletions lib/api/CLineifiedJsonOutputWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
#include <api/CLineifiedJsonOutputWriter.h>

#include <core/CScopedRapidJsonPoolAllocator.h>
#include <core/CSleep.h>
#include <core/CStringUtils.h>

Expand Down Expand Up @@ -82,6 +83,9 @@ const CLineifiedJsonOutputWriter::TStrVec &CLineifiedJsonOutputWriter::fieldName
bool CLineifiedJsonOutputWriter::writeRow(const TStrStrUMap &dataRowFields,
const TStrStrUMap &overrideDataRowFields)
{
using TScopedAllocator = core::CScopedRapidJsonPoolAllocator<TGenericLineWriter>;
TScopedAllocator scopedAllocator("CLineifiedJsonOutputWriter::writeRow", m_Writer);

rapidjson::Document doc = m_Writer.makeDoc();

// Write all the fields to the document as strings
Expand Down
13 changes: 13 additions & 0 deletions lib/model/CForecastDataSink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <model/CForecastDataSink.h>

#include <core/CLogger.h>
#include <core/CScopedRapidJsonPoolAllocator.h>

#include <vector>

Expand Down Expand Up @@ -64,6 +65,8 @@ const std::string CForecastDataSink::PROCESSING_TIME_MS("processing_time_ms");
const std::string CForecastDataSink::PROGRESS("forecast_progress");
const std::string CForecastDataSink::STATUS("forecast_status");

using TScopedAllocator = core::CScopedRapidJsonPoolAllocator<core::CRapidJsonConcurrentLineWriter>;

CForecastDataSink::SForecastModelWrapper::SForecastModelWrapper(model_t::EFeature feature,
TMathsModelPtr &&forecastModel,
const std::string &byFieldValue)
Expand Down Expand Up @@ -121,6 +124,8 @@ CForecastDataSink::CForecastDataSink(const std::string &jobId,

void CForecastDataSink::writeStats(const double progress, uint64_t runtime, const TStrUMap &messages, bool successful)
{
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);

rapidjson::Document doc = m_Writer.makeDoc();

this->writeCommonStatsFields(doc);
Expand Down Expand Up @@ -161,6 +166,8 @@ void CForecastDataSink::writeScheduledMessage()

void CForecastDataSink::writeErrorMessage(const std::string &message)
{
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);

rapidjson::Document doc = m_Writer.makeDoc();
this->writeCommonStatsFields(doc);
TStrVec messages{message};
Expand All @@ -171,6 +178,8 @@ void CForecastDataSink::writeErrorMessage(const std::string &message)

void CForecastDataSink::writeFinalMessage(const std::string &message)
{
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);

rapidjson::Document doc = m_Writer.makeDoc();
this->writeCommonStatsFields(doc);
TStrVec messages{message};
Expand Down Expand Up @@ -200,6 +209,8 @@ void CForecastDataSink::writeCommonStatsFields(rapidjson::Value &doc)

void CForecastDataSink::push(bool flush, rapidjson::Value &doc)
{
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);

rapidjson::Document wrapper = m_Writer.makeDoc();

m_Writer.addMember(MODEL_FORECAST_STATS, doc, wrapper);
Expand All @@ -225,6 +236,8 @@ void CForecastDataSink::push(const maths::SErrorBar errorBar,
const std::string &byFieldValue,
int detectorIndex)
{
TScopedAllocator scopedAllocator("CForecastDataSink", m_Writer);

++m_NumRecordsWritten;
rapidjson::Document doc = m_Writer.makeDoc();

Expand Down