diff --git a/bin/data_frame_analyzer/Main.cc b/bin/data_frame_analyzer/Main.cc index fcdee15fa4..dfc783cda6 100644 --- a/bin/data_frame_analyzer/Main.cc +++ b/bin/data_frame_analyzer/Main.cc @@ -103,13 +103,14 @@ int main(int argc, char** argv) { return EXIT_FAILURE; } - ml::api::CDataFrameAnalysisSpecification analysisSpecification{analysisSpecificationJson}; - if (analysisSpecification.bad()) { + auto analysisSpecification = + std::make_unique(analysisSpecificationJson); + if (analysisSpecification->bad()) { LOG_FATAL("Failed to parse analysis specification"); return EXIT_FAILURE; } - if (analysisSpecification.threads() > 1) { - ml::core::startDefaultAsyncExecutor(analysisSpecification.threads()); + if (analysisSpecification->numberThreads() > 1) { + ml::core::startDefaultAsyncExecutor(analysisSpecification->numberThreads()); } ml::api::CDataFrameAnalyzer dataFrameAnalyzer{ diff --git a/include/api/CDataFrameAnalysisRunner.h b/include/api/CDataFrameAnalysisRunner.h index 6f3033587c..16d1cbce33 100644 --- a/include/api/CDataFrameAnalysisRunner.h +++ b/include/api/CDataFrameAnalysisRunner.h @@ -23,6 +23,10 @@ namespace ml { namespace core { class CDataFrame; +class CRapidJsonConcurrentLineWriter; +namespace data_frame_detail { +class CRowRef; +} } namespace api { class CDataFrameAnalysisSpecification; @@ -53,6 +57,7 @@ class CDataFrameAnalysisSpecification; class API_EXPORT CDataFrameAnalysisRunner { public: using TStrVec = std::vector; + using TRowRef = core::data_frame_detail::CRowRef; public: CDataFrameAnalysisRunner(const CDataFrameAnalysisSpecification& spec); @@ -67,9 +72,25 @@ class API_EXPORT CDataFrameAnalysisRunner { //! into main memory during an analysis. virtual std::size_t numberOfPartitions() const = 0; - //! \return The number of columns this analysis requires. This includes - //! the columns of the input frame plus any that the analysis will append. - virtual std::size_t requiredFrameColumns() const = 0; + //! \return The number of columns this analysis appends. + virtual std::size_t numberExtraColumns() const = 0; + + //! Write the extra columns of \p row added by the analysis to \p writer. + //! + //! This should create a new object of the form: + //!
+    //! {
+    //!   "name of column n":   "value of column n",
+    //!   "name of column n+1": "value of column n+1",
+    //!   ...
+    //! }
+    //! 
+ //! with one named member for each column added. + //! + //! \param[in] row The row to write the columns added by this analysis. + //! \param[in,out] writer The stream to which to write the extra columns. + virtual void writeOneRow(TRowRef row, + core::CRapidJsonConcurrentLineWriter& writer) const = 0; //! Checks whether the analysis is already running and if not launches it //! in the background. diff --git a/include/api/CDataFrameAnalysisSpecification.h b/include/api/CDataFrameAnalysisSpecification.h index 4ec00e513b..cc6d65b2fd 100644 --- a/include/api/CDataFrameAnalysisSpecification.h +++ b/include/api/CDataFrameAnalysisSpecification.h @@ -74,20 +74,29 @@ class API_EXPORT CDataFrameAnalysisSpecification { CDataFrameAnalysisSpecification(TRunnerFactoryUPtrVec runnerFactories, const std::string& jsonSpecification); + CDataFrameAnalysisSpecification(const CDataFrameAnalysisSpecification&) = delete; + CDataFrameAnalysisSpecification& operator=(const CDataFrameAnalysisSpecification&) = delete; + CDataFrameAnalysisSpecification(CDataFrameAnalysisSpecification&&) = delete; + CDataFrameAnalysisSpecification& operator=(CDataFrameAnalysisSpecification&&) = delete; + //! Check if the specification is bad. bool bad() const; //! \return The number of rows in the frame. - std::size_t rows() const; + std::size_t numberRows() const; //! \return The number of columns in the input frame. - std::size_t cols() const; + std::size_t numberColumns() const; + + //! \return The number of columns the analysis configured to run will append + //! to the data frame. + std::size_t numberExtraColumns() const; //! \return The memory limit for the process. std::size_t memoryLimit() const; //! \return The number of threads the analysis can use. - std::size_t threads() const; + std::size_t numberThreads() const; //! Run the analysis in a background thread. //! @@ -107,10 +116,10 @@ class API_EXPORT CDataFrameAnalysisSpecification { private: bool m_Bad = false; - std::size_t m_Rows = 0; - std::size_t m_Cols = 0; + std::size_t m_NumberRows = 0; + std::size_t m_NumberColumns = 0; std::size_t m_MemoryLimit = 0; - std::size_t m_Threads = 0; + std::size_t m_NumberThreads = 0; // TODO Sparse table support // double m_TableLoadFactor = 0.0; TRunnerFactoryUPtrVec m_RunnerFactories; diff --git a/include/api/CDataFrameAnalyzer.h b/include/api/CDataFrameAnalyzer.h index 2094d1f7b8..d177344663 100644 --- a/include/api/CDataFrameAnalyzer.h +++ b/include/api/CDataFrameAnalyzer.h @@ -3,12 +3,10 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ + #ifndef INCLUDED_ml_api_CDataFrameAnalyzer_h #define INCLUDED_ml_api_CDataFrameAnalyzer_h -#include - -#include #include #include @@ -19,9 +17,12 @@ namespace ml { namespace core { +class CDataFrame; class CJsonOutputStreamWrapper; } namespace api { +class CDataFrameAnalysisRunner; +class CDataFrameAnalysisSpecification; //! \brief Handles input to the data_frame_analyzer command. 
class API_EXPORT CDataFrameAnalyzer { @@ -30,39 +31,49 @@ class API_EXPORT CDataFrameAnalyzer { using TJsonOutputStreamWrapperUPtr = std::unique_ptr; using TJsonOutputStreamWrapperUPtrSupplier = std::function; + using TDataFrameAnalysisSpecificationUPtr = std::unique_ptr; public: - explicit CDataFrameAnalyzer(CDataFrameAnalysisSpecification analysisSpecification, - TJsonOutputStreamWrapperUPtrSupplier outStreamSupplier); + CDataFrameAnalyzer(TDataFrameAnalysisSpecificationUPtr analysisSpecification, + TJsonOutputStreamWrapperUPtrSupplier outStreamSupplier); + ~CDataFrameAnalyzer(); //! This is true if the analyzer is receiving control messages. bool usingControlMessages() const; - //! Handle adding a row of the data frame or a control message. + //! Handle receiving a row of the data frame or a control message. bool handleRecord(const TStrVec& fieldNames, const TStrVec& fieldValues); + //! Call when all row have been received. + void receivedAllRows(); + //! Run the configured analysis. void run(); +private: + using TDataFrameUPtr = std::unique_ptr; + private: static const std::ptrdiff_t CONTROL_FIELD_UNSET{-2}; static const std::ptrdiff_t CONTROL_FIELD_MISSING{-1}; private: + bool sufficientFieldValues(const TStrVec& fieldNames) const; bool readyToReceiveControlMessages() const; bool prepareToReceiveControlMessages(const TStrVec& fieldNames); bool isControlMessage(const TStrVec& fieldValues) const; bool handleControlMessage(const TStrVec& fieldValues); void addRowToDataFrame(const TStrVec& fieldValues); + void writeResultsOf(const CDataFrameAnalysisRunner& analysis) const; private: // This has values: -2 (unset), -1 (missing), >= 0 (control field index). - std::ptrdiff_t m_ControlFieldValue = CONTROL_FIELD_UNSET; + std::ptrdiff_t m_ControlFieldIndex = CONTROL_FIELD_UNSET; std::ptrdiff_t m_BeginDataFieldValues = -1; std::ptrdiff_t m_EndDataFieldValues = -1; std::uint64_t m_BadValueCount; - CDataFrameAnalysisSpecification m_AnalysisSpecification; - core::CDataFrame m_DataFrame; + TDataFrameAnalysisSpecificationUPtr m_AnalysisSpecification; + TDataFrameUPtr m_DataFrame; TJsonOutputStreamWrapperUPtrSupplier m_OutStreamSupplier; }; } diff --git a/include/api/CDataFrameOutliersRunner.h b/include/api/CDataFrameOutliersRunner.h index bfe3f959ac..42c6960a62 100644 --- a/include/api/CDataFrameOutliersRunner.h +++ b/include/api/CDataFrameOutliersRunner.h @@ -4,6 +4,9 @@ * you may not use this file except in compliance with the Elastic License. */ +#ifndef INCLUDED_ml_api_CDataFrameOutliersRunner_h +#define INCLUDED_ml_api_CDataFrameOutliersRunner_h + #include #include @@ -26,8 +29,11 @@ class API_EXPORT CDataFrameOutliersRunner : public CDataFrameAnalysisRunner { //! \sa CDataFrameAnalysisRunner::run. virtual std::size_t numberOfPartitions() const; - //! \return The number of columns of the output frame. - virtual std::size_t requiredFrameColumns() const; + //! \return The number of columns this adds to the data frame. + virtual std::size_t numberExtraColumns() const; + + //! Write the extra columns of \p row added by outlier analysis to \p writer. 
+ virtual void writeOneRow(TRowRef row, core::CRapidJsonConcurrentLineWriter& writer) const; private: using TOptionalSize = boost::optional; @@ -62,3 +68,5 @@ class API_EXPORT CDataFrameOutliersRunnerFactory : public CDataFrameAnalysisRunn }; } } + +#endif // INCLUDED_ml_api_CDataFrameOutliersRunner_h diff --git a/include/core/CDataFrame.h b/include/core/CDataFrame.h index 8cfcd133ec..ca9b80d1ad 100644 --- a/include/core/CDataFrame.h +++ b/include/core/CDataFrame.h @@ -207,9 +207,10 @@ class CORE_EXPORT CDataFrame final { EReadWriteToStorage readAndWriteToStoreSyncStrategy, const TWriteSliceToStoreFunc& writeSliceToStore); + ~CDataFrame(); + CDataFrame(const CDataFrame&) = delete; CDataFrame& operator=(const CDataFrame&) = delete; - CDataFrame(CDataFrame&&) = default; CDataFrame& operator=(CDataFrame&&) = default; @@ -415,10 +416,11 @@ class CORE_EXPORT CDataFrame final { //! \param[in] readWriteToStoreSyncStrategy Controls whether reads and writes //! from slice storage are synchronous or asynchronous. CORE_EXPORT -CDataFrame makeMainStorageDataFrame(std::size_t numberColumns, - boost::optional sliceCapacity = boost::none, - CDataFrame::EReadWriteToStorage readWriteToStoreSyncStrategy = - CDataFrame::EReadWriteToStorage::E_Sync); +std::unique_ptr +makeMainStorageDataFrame(std::size_t numberColumns, + boost::optional sliceCapacity = boost::none, + CDataFrame::EReadWriteToStorage readWriteToStoreSyncStrategy = + CDataFrame::EReadWriteToStorage::E_Sync); //! Make a data frame which uses disk storage for its slices. //! @@ -431,12 +433,13 @@ CDataFrame makeMainStorageDataFrame(std::size_t numberColumns, //! \param[in] readWriteToStoreSyncStrategy Controls whether reads and writes //! from slice storage are synchronous or asynchronous. CORE_EXPORT -CDataFrame makeDiskStorageDataFrame(const std::string& rootDirectory, - std::size_t numberColumns, - std::size_t numberRows, - boost::optional sliceCapacity = boost::none, - CDataFrame::EReadWriteToStorage readWriteToStoreSyncStrategy = - CDataFrame::EReadWriteToStorage::E_Async); +std::unique_ptr +makeDiskStorageDataFrame(const std::string& rootDirectory, + std::size_t numberColumns, + std::size_t numberRows, + boost::optional sliceCapacity = boost::none, + CDataFrame::EReadWriteToStorage readWriteToStoreSyncStrategy = + CDataFrame::EReadWriteToStorage::E_Async); } } diff --git a/include/core/CRapidJsonConcurrentLineWriter.h b/include/core/CRapidJsonConcurrentLineWriter.h index 407752f80b..3b6e774a71 100644 --- a/include/core/CRapidJsonConcurrentLineWriter.h +++ b/include/core/CRapidJsonConcurrentLineWriter.h @@ -49,10 +49,11 @@ class CORE_EXPORT CRapidJsonConcurrentLineWriter std::size_t memoryUsage() const; //! Write JSON document to outputstream - //! Note this non-virtual overwrite is needed to avoid slicing of the writer - //! and hence ensure the correct EndObject is called - //! \p doc reference to rapidjson document value - void write(rapidjson::Value& doc) { doc.Accept(*this); } + //! \note This overwrite is needed because the members of rapidjson::Writer + //! are not virtual and we need to avoid "slicing" the writer to ensure that + //! that the correct StartObject/EndObject functions are called when this is + //! passed to \p doc Accept. + void write(const rapidjson::Value& doc) { doc.Accept(*this); } private: //! 
The stream object diff --git a/include/core/CRapidJsonLineWriter.h b/include/core/CRapidJsonLineWriter.h index b4b73553ff..ddc7e7e60f 100644 --- a/include/core/CRapidJsonLineWriter.h +++ b/include/core/CRapidJsonLineWriter.h @@ -55,10 +55,11 @@ class CRapidJsonLineWriter } //! Write JSON document to outputstream - //! Note this non-virtual overwrite is needed to avoid slicing of the writer - //! and hence ensure the correct StartObject/EndObject functions are called - //! \p doc reference to rapidjson document value - void write(rapidjson::Value& doc) { doc.Accept(*this); } + //! \note This overwrite is needed because the members of rapidjson::Writer + //! are not virtual and we need to avoid "slicing" the writer to ensure that + //! that the correct StartObject/EndObject functions are called when this is + //! passed to \p doc Accept. + void write(const rapidjson::Value& doc) { doc.Accept(*this); } private: size_t m_ObjectCount = 0; diff --git a/include/core/CRapidJsonWriterBase.h b/include/core/CRapidJsonWriterBase.h index 729ba4c95c..722f642fab 100644 --- a/include/core/CRapidJsonWriterBase.h +++ b/include/core/CRapidJsonWriterBase.h @@ -218,7 +218,7 @@ class CRapidJsonWriterBase //! write the rapidjson value document to the output stream //! \p[in] doc rapidjson document value to write out - virtual void write(TValue& doc) { doc.Accept(*this); } + virtual void write(const TValue& doc) { doc.Accept(*this); } //! Return a new rapidjson document TDocument makeDoc() const { diff --git a/include/core/CStaticThreadPool.h b/include/core/CStaticThreadPool.h index 21c2caf9b3..c20b3a4a1a 100644 --- a/include/core/CStaticThreadPool.h +++ b/include/core/CStaticThreadPool.h @@ -71,8 +71,9 @@ class CORE_EXPORT CStaticThreadPool { void worker(std::size_t id); private: - // This doesn't have to be atomic because it is always only set to true and - // always set straight before it is checked on each worker in the pool. + // This doesn't have to be atomic because it is always only set to true, + // always set straight before it is checked on each worker in the pool + // and tearing can't happen for single byte writes. 
bool m_Done = false; std::atomic_bool m_Busy; std::atomic m_Cursor; diff --git a/lib/api/CDataFrameAnalysisSpecification.cc b/lib/api/CDataFrameAnalysisSpecification.cc index ee91350a8c..af4acc5cfc 100644 --- a/lib/api/CDataFrameAnalysisSpecification.cc +++ b/lib/api/CDataFrameAnalysisSpecification.cc @@ -6,13 +6,14 @@ #include +#include #include +#include #include #include -#include -#include +#include #include @@ -52,10 +53,11 @@ bool isValidMember(const MEMBER& member) { } std::string toString(const rapidjson::Value& value) { - rapidjson::StringBuffer valueAsString; - rapidjson::Writer writer(valueAsString); - value.Accept(writer); - return valueAsString.GetString(); + std::ostringstream valueAsString; + rapidjson::OStreamWrapper shim{valueAsString}; + ml::core::CRapidJsonLineWriter writer{shim}; + writer.write(value); + return valueAsString.str(); } } @@ -85,12 +87,12 @@ CDataFrameAnalysisSpecification::CDataFrameAnalysisSpecification(TRunnerFactoryU }; if (document.HasMember(ROWS) && isPositiveInteger(document[ROWS])) { - m_Rows = document[ROWS].GetUint(); + m_NumberRows = document[ROWS].GetUint(); } else { registerFailure(ROWS); } if (document.HasMember(COLS) && isPositiveInteger(document[COLS])) { - m_Cols = document[COLS].GetUint(); + m_NumberColumns = document[COLS].GetUint(); } else { registerFailure(COLS); } @@ -100,7 +102,7 @@ CDataFrameAnalysisSpecification::CDataFrameAnalysisSpecification(TRunnerFactoryU registerFailure(MEMORY_LIMIT); } if (document.HasMember(THREADS) && isPositiveInteger(document[THREADS])) { - m_Threads = document[THREADS].GetUint(); + m_NumberThreads = document[THREADS].GetUint(); } else { registerFailure(THREADS); } @@ -130,20 +132,24 @@ bool CDataFrameAnalysisSpecification::bad() const { return m_Bad || m_Runner->bad(); } -std::size_t CDataFrameAnalysisSpecification::rows() const { - return m_Rows; +std::size_t CDataFrameAnalysisSpecification::numberRows() const { + return m_NumberRows; } -std::size_t CDataFrameAnalysisSpecification::cols() const { - return m_Cols; +std::size_t CDataFrameAnalysisSpecification::numberColumns() const { + return m_NumberColumns; +} + +std::size_t CDataFrameAnalysisSpecification::numberExtraColumns() const { + return m_Runner != nullptr ? m_Runner->numberExtraColumns() : 0; } std::size_t CDataFrameAnalysisSpecification::memoryLimit() const { return m_MemoryLimit; } -std::size_t CDataFrameAnalysisSpecification::threads() const { - return m_Threads; +std::size_t CDataFrameAnalysisSpecification::numberThreads() const { + return m_NumberThreads; } CDataFrameAnalysisRunner* CDataFrameAnalysisSpecification::run(core::CDataFrame& frame) const { diff --git a/lib/api/CDataFrameAnalyzer.cc b/lib/api/CDataFrameAnalyzer.cc index 0417984c13..aeb51b3483 100644 --- a/lib/api/CDataFrameAnalyzer.cc +++ b/lib/api/CDataFrameAnalyzer.cc @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -13,6 +14,8 @@ #include +#include + #include namespace ml { @@ -25,23 +28,29 @@ double truncateToFloatRange(double value) { const std::string CONTROL_MESSAGE_FIELD_NAME{"."}; // Control message types: -const char RUN_ANALYSIS_CONTROL_MESSAGE_FIELD_VALUE{'r'}; +const char FINISHED_DATA_CONTROL_MESSAGE_FIELD_VALUE{'$'}; const std::string ID_HASH{"id_hash"}; const std::string RESULTS{"results"}; -const std::string OUTLIER_SCORE{"outlier_score"}; } -// TODO memory calculations plus choose data frame storage strategy. 
-CDataFrameAnalyzer::CDataFrameAnalyzer(CDataFrameAnalysisSpecification analysisSpecification, +CDataFrameAnalyzer::CDataFrameAnalyzer(TDataFrameAnalysisSpecificationUPtr analysisSpecification, TJsonOutputStreamWrapperUPtrSupplier outStreamSupplier) - : m_AnalysisSpecification{std::move(analysisSpecification)}, - m_DataFrame{core::makeMainStorageDataFrame(m_AnalysisSpecification.cols())}, - m_OutStreamSupplier{outStreamSupplier} { + : m_AnalysisSpecification{std::move(analysisSpecification)}, m_OutStreamSupplier{outStreamSupplier} { + + // TODO memory calculations plus choose data frame storage strategy. + if (m_AnalysisSpecification != nullptr && m_AnalysisSpecification->bad() == false) { + m_DataFrame = core::makeMainStorageDataFrame(m_AnalysisSpecification->numberColumns()); + m_DataFrame->reserve(m_AnalysisSpecification->numberThreads(), + m_AnalysisSpecification->numberColumns() + + m_AnalysisSpecification->numberExtraColumns()); + } } +CDataFrameAnalyzer::~CDataFrameAnalyzer() = default; + bool CDataFrameAnalyzer::usingControlMessages() const { - return m_ControlFieldValue >= 0; + return m_ControlFieldIndex >= 0; } bool CDataFrameAnalyzer::handleRecord(const TStrVec& fieldNames, const TStrVec& fieldValues) { @@ -53,9 +62,24 @@ bool CDataFrameAnalyzer::handleRecord(const TStrVec& fieldNames, const TStrVec& // Note if the the control message field name is missing the analysis must // be triggered to run by calling run explicitly. - bool result{true}; - if (this->readyToReceiveControlMessages() == false) { - result = this->prepareToReceiveControlMessages(fieldNames); + if (m_AnalysisSpecification == nullptr || m_AnalysisSpecification->bad()) { + // TODO We need to communicate an error but don't want one for each row. + // Revisit this when we have finalized our monitoring strategy. + LOG_TRACE(<< "Specification is bad"); + return false; + } + + if (this->readyToReceiveControlMessages() == false && + this->prepareToReceiveControlMessages(fieldNames) == false) { + return false; + } + + if (this->sufficientFieldValues(fieldValues) == false) { + // TODO We need to communicate an error but don't want one for each row. + // Revisit this when we have finalized our monitoring strategy. 
+ LOG_TRACE(<< "Expected " << m_AnalysisSpecification->numberColumns() + << " field values and got " << fieldValues.size()); + return false; } if (this->isControlMessage(fieldValues)) { @@ -63,44 +87,49 @@ bool CDataFrameAnalyzer::handleRecord(const TStrVec& fieldNames, const TStrVec& } this->addRowToDataFrame(fieldValues); + return false; +} - return result; +void CDataFrameAnalyzer::receivedAllRows() { + m_DataFrame->finishWritingRows(); + LOG_DEBUG(<< "Received " << m_DataFrame->numberRows() << " rows"); } void CDataFrameAnalyzer::run() { - // TODO - // This is writing fake results just to enable testing - // the parsing and merging of results in the java side - auto outStream = m_OutStreamSupplier(); - core::CRapidJsonConcurrentLineWriter outputWriter{*outStream}; - for (std::size_t i = 0; i < m_AnalysisSpecification.rows(); ++i) { - outputWriter.StartObject(); - outputWriter.String(ID_HASH); - outputWriter.String(ID_HASH); // This should be the actual hash - outputWriter.String(RESULTS); - outputWriter.StartObject(); - outputWriter.String(OUTLIER_SCORE); - outputWriter.Double(i); - outputWriter.EndObject(); - outputWriter.EndObject(); + if (m_AnalysisSpecification == nullptr || m_AnalysisSpecification->bad() || + m_DataFrame == nullptr) { + return; } - outputWriter.flush(); + + LOG_TRACE(<< "Running analysis..."); + + CDataFrameAnalysisRunner* analysis{m_AnalysisSpecification->run(*m_DataFrame)}; + + if (analysis == nullptr) { + return; + } + + // TODO progress monitoring, etc. + + analysis->waitToFinish(); + + this->writeResultsOf(*analysis); } bool CDataFrameAnalyzer::readyToReceiveControlMessages() const { - return m_ControlFieldValue != CONTROL_FIELD_UNSET; + return m_ControlFieldIndex != CONTROL_FIELD_UNSET; } bool CDataFrameAnalyzer::prepareToReceiveControlMessages(const TStrVec& fieldNames) { if (fieldNames.empty() || fieldNames.back() != CONTROL_MESSAGE_FIELD_NAME) { - m_ControlFieldValue = CONTROL_FIELD_MISSING; + m_ControlFieldIndex = CONTROL_FIELD_MISSING; m_BeginDataFieldValues = 0; m_EndDataFieldValues = static_cast(fieldNames.size()); } else { - m_ControlFieldValue = static_cast(fieldNames.size() - 1); + m_ControlFieldIndex = static_cast(fieldNames.size() - 1); m_BeginDataFieldValues = 0; - m_EndDataFieldValues = m_ControlFieldValue; + m_EndDataFieldValues = m_ControlFieldIndex; auto pos = std::find(fieldNames.begin(), fieldNames.end(), CONTROL_MESSAGE_FIELD_NAME); if (pos != fieldNames.end() - 1) { LOG_ERROR(<< "Unexpected possible control field: ignoring"); @@ -111,38 +140,51 @@ bool CDataFrameAnalyzer::prepareToReceiveControlMessages(const TStrVec& fieldNam } bool CDataFrameAnalyzer::isControlMessage(const TStrVec& fieldValues) const { - return m_ControlFieldValue >= 0 && fieldValues[m_ControlFieldValue].size() > 0; + return m_ControlFieldIndex >= 0 && fieldValues[m_ControlFieldIndex].size() > 0; +} + +bool CDataFrameAnalyzer::sufficientFieldValues(const TStrVec& fieldValues) const { + if (m_ControlFieldIndex >= 0) { + return fieldValues.size() == m_AnalysisSpecification->numberColumns() + 1; + } + return fieldValues.size() == m_AnalysisSpecification->numberColumns(); } bool CDataFrameAnalyzer::handleControlMessage(const TStrVec& fieldValues) { + LOG_TRACE(<< "Control message: '" << fieldValues[m_ControlFieldIndex] << "'"); bool unrecognised{false}; - switch (fieldValues[m_ControlFieldValue][0]) { + switch (fieldValues[m_ControlFieldIndex][0]) { case ' ': // Spaces are just used to fill the buffers and force prior messages // through the system - we don't need to do 
anything else. LOG_TRACE(<< "Received pad of length " << controlMessage.length()); return true; - case RUN_ANALYSIS_CONTROL_MESSAGE_FIELD_VALUE: + case FINISHED_DATA_CONTROL_MESSAGE_FIELD_VALUE: + this->receivedAllRows(); this->run(); break; default: unrecognised = true; break; } - if (unrecognised || fieldValues[m_ControlFieldValue].size() > 1) { + if (unrecognised || fieldValues[m_ControlFieldIndex].size() > 1) { LOG_ERROR(<< "Invalid control message value '" - << fieldValues[m_ControlFieldValue] << "'"); + << fieldValues[m_ControlFieldIndex] << "'"); return false; } return true; } void CDataFrameAnalyzer::addRowToDataFrame(const TStrVec& fieldValues) { + if (m_DataFrame == nullptr) { + return; + } + using TFloatVec = std::vector; using TFloatVecItr = TFloatVec::iterator; - m_DataFrame.writeRow([&](TFloatVecItr output) { + m_DataFrame->writeRow([&](TFloatVecItr output) { for (std::ptrdiff_t i = m_BeginDataFieldValues; i != m_EndDataFieldValues; ++i, ++output) { double value; @@ -164,5 +206,31 @@ void CDataFrameAnalyzer::addRowToDataFrame(const TStrVec& fieldValues) { } }); } + +void CDataFrameAnalyzer::writeResultsOf(const CDataFrameAnalysisRunner& analysis) const { + // TODO Revisit this can probably be core::CRapidJsonLineWriter. + // TODO This should probably be a member variable so it is only created once. + auto outStream = m_OutStreamSupplier(); + core::CRapidJsonConcurrentLineWriter outputWriter{*outStream}; + + // We write results single threaded because we need to write the rows to + // Java in the order they were written to the data_frame_analyzer so it + // can join the extra columns with the original data frame. + std::size_t numberThreads{1}; + + using TRowItr = core::CDataFrame::TRowItr; + m_DataFrame->readRows(numberThreads, [&](TRowItr beginRows, TRowItr endRows) { + for (auto row = beginRows; row != endRows; ++row) { + outputWriter.StartObject(); + outputWriter.Key(ID_HASH); + outputWriter.String(ID_HASH); // TODO this should be the actual hash + outputWriter.Key(RESULTS); + analysis.writeOneRow(*row, outputWriter); + outputWriter.EndObject(); + } + }); + + outputWriter.flush(); +} } } diff --git a/lib/api/CDataFrameOutliersRunner.cc b/lib/api/CDataFrameOutliersRunner.cc index ccba1b9319..fb9ec61d78 100644 --- a/lib/api/CDataFrameOutliersRunner.cc +++ b/lib/api/CDataFrameOutliersRunner.cc @@ -6,12 +6,13 @@ #include +#include #include +#include #include #include - #include #include #include @@ -25,15 +26,18 @@ namespace ml { namespace api { namespace { +// Configuration const char* NUMBER_NEIGHBOURS{"number_neighbours"}; const char* METHOD{"method"}; const char* LOF{"lof"}; const char* LDOF{"ldof"}; const char* DISTANCE_KTH_NN{"distance_kth_nn"}; const char* DISTANCE_KNN{"distance_knn"}; - const char* VALID_MEMBER_NAMES[]{NUMBER_NEIGHBOURS, METHOD}; +// Output +const std::string OUTLIER_SCORE{"outlier_score"}; + template bool isValidMember(const MEMBER& member) { return std::find_if(std::begin(VALID_MEMBER_NAMES), @@ -98,13 +102,22 @@ std::size_t CDataFrameOutliersRunner::numberOfPartitions() const { return m_NumberPartitions; } -std::size_t CDataFrameOutliersRunner::requiredFrameColumns() const { - // This is number of columns + outlier score + explaining features TBD. - return this->spec().cols() + 1; +std::size_t CDataFrameOutliersRunner::numberExtraColumns() const { + // Column for outlier score + explaining features TBD. 
+ return 1; +} + +void CDataFrameOutliersRunner::writeOneRow(TRowRef row, + core::CRapidJsonConcurrentLineWriter& outputWriter) const { + std::size_t lastColumn{row.numberColumns() - 1}; + outputWriter.StartObject(); + outputWriter.Key(OUTLIER_SCORE); + outputWriter.Double(row[lastColumn]); + outputWriter.EndObject(); } -void CDataFrameOutliersRunner::runImpl(core::CDataFrame& /*frame*/) { - // TODO +void CDataFrameOutliersRunner::runImpl(core::CDataFrame& frame) { + maths::computeOutliers(this->spec().numberThreads(), frame); } const char* CDataFrameOutliersRunnerFactory::name() const { diff --git a/lib/api/unittest/CDataFrameAnalysisSpecificationTest.cc b/lib/api/unittest/CDataFrameAnalysisSpecificationTest.cc index b2d8172963..ae78eaa092 100644 --- a/lib/api/unittest/CDataFrameAnalysisSpecificationTest.cc +++ b/lib/api/unittest/CDataFrameAnalysisSpecificationTest.cc @@ -37,8 +37,8 @@ class CDataFrameTestAnalysisRunner : public api::CDataFrameAnalysisRunner { : api::CDataFrameAnalysisRunner{spec} {} virtual std::size_t numberOfPartitions() const { return 1; } - - std::size_t requiredFrameColumns() const { return this->spec().cols() + 2; } + virtual std::size_t numberExtraColumns() const { return 2; } + virtual void writeOneRow(TRowRef, core::CRapidJsonConcurrentLineWriter&) const {} protected: void runImpl(core::CDataFrame&) { @@ -129,10 +129,10 @@ void CDataFrameAnalysisSpecificationTest::testCreate() { api::CDataFrameAnalysisSpecification spec{ outliersFactory(), jsonSpec("1000", "20", "100000", "2", "outliers")}; CPPUNIT_ASSERT_EQUAL(false, spec.bad()); - CPPUNIT_ASSERT_EQUAL(std::size_t{1000}, spec.rows()); - CPPUNIT_ASSERT_EQUAL(std::size_t{20}, spec.cols()); + CPPUNIT_ASSERT_EQUAL(std::size_t{1000}, spec.numberRows()); + CPPUNIT_ASSERT_EQUAL(std::size_t{20}, spec.numberColumns()); CPPUNIT_ASSERT_EQUAL(std::size_t{100000}, spec.memoryLimit()); - CPPUNIT_ASSERT_EQUAL(std::size_t{2}, spec.threads()); + CPPUNIT_ASSERT_EQUAL(std::size_t{2}, spec.numberThreads()); } LOG_DEBUG(<< "Bad input"); { @@ -234,9 +234,9 @@ void CDataFrameAnalysisSpecificationTest::testRunAnalysis() { api::CDataFrameAnalysisSpecification spec{testFactory(), jsonSpec}; CPPUNIT_ASSERT_EQUAL(false, spec.bad()); - core::CDataFrame frame{core::makeMainStorageDataFrame(10)}; + std::unique_ptr frame{core::makeMainStorageDataFrame(10)}; - api::CDataFrameAnalysisRunner* runner = spec.run(frame); + api::CDataFrameAnalysisRunner* runner = spec.run(*frame); CPPUNIT_ASSERT(runner != nullptr); std::string possibleErrors[]{"[]", "[error 0]", "[error 0, error 10]", @@ -272,7 +272,7 @@ CppUnit::Test* CDataFrameAnalysisSpecificationTest::suite() { new CppUnit::TestSuite("CDataFrameAnalysisSpecificationTest"); suiteOfTests->addTest(new CppUnit::TestCaller( - "CDataFrameAnalysisSpecificationTest::testAdd", + "CDataFrameAnalysisSpecificationTest::testCreate", &CDataFrameAnalysisSpecificationTest::testCreate)); suiteOfTests->addTest(new CppUnit::TestCaller( "CDataFrameAnalysisSpecificationTest::testRunAnalysis", diff --git a/lib/api/unittest/CDataFrameAnalyzerTest.cc b/lib/api/unittest/CDataFrameAnalyzerTest.cc new file mode 100644 index 0000000000..80ab040a47 --- /dev/null +++ b/lib/api/unittest/CDataFrameAnalyzerTest.cc @@ -0,0 +1,226 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +#include "CDataFrameAnalyzerTest.h" + +#include +#include + +#include + +#include +#include + +#include + +#include +#include + +#include +#include +#include +#include +#include + +using namespace ml; + +namespace { +using TDoubleVec = std::vector; +using TDoubleVecVec = std::vector; +using TStrVec = std::vector; +using TPoint = maths::CDenseVector; +using TPointVec = std::vector; + +std::unique_ptr outlierSpec() { + std::string spec{"{\n" + " \"rows\": 100,\n" + " \"cols\": 5,\n" + " \"memory_limit\": 100000,\n" + " \"threads\": 1,\n" + " \"analysis\": {\n" + " \"name\": \"outliers\"" + " }" + "}"}; + return std::make_unique(spec); +} + +void addTestData(TStrVec fieldNames, + TStrVec fieldValues, + api::CDataFrameAnalyzer& analyzer, + TDoubleVec& expectedScores) { + + std::size_t numberInliers{100}; + std::size_t numberOutliers{10}; + + test::CRandomNumbers rng; + + TDoubleVec mean{1.0, 10.0, 4.0, 8.0, 3.0}; + TDoubleVecVec covariance{{1.0, 0.1, -0.1, 0.3, 0.2}, + {0.1, 1.3, -0.3, 0.1, 0.1}, + {-0.1, -0.3, 2.1, 0.1, 0.2}, + {0.3, 0.1, 0.1, 0.8, 0.2}, + {0.2, 0.1, 0.2, 0.2, 2.2}}; + + TDoubleVecVec inliers; + rng.generateMultivariateNormalSamples(mean, covariance, numberInliers, inliers); + + TDoubleVec outliers; + rng.generateUniformSamples(0.0, 10.0, numberOutliers * 5, outliers); + + TPointVec points(numberInliers + numberOutliers, TPoint(5)); + for (std::size_t i = 0; i < inliers.size(); ++i) { + for (std::size_t j = 0; j < 5; ++j) { + fieldValues[j] = std::to_string(inliers[i][j]); + points[i](j) = inliers[i][j]; + } + analyzer.handleRecord(fieldNames, fieldValues); + } + for (std::size_t i = 0, j = numberInliers; i < outliers.size(); ++j) { + for (std::size_t k = 0; k < 5; ++i, ++k) { + fieldValues[k] = std::to_string(outliers[i]); + points[j](k) = outliers[i]; + } + analyzer.handleRecord(fieldNames, fieldValues); + } + + maths::CLocalOutlierFactors::ensemble(points, expectedScores); +} +} + +void CDataFrameAnalyzerTest::testWithoutControlMessages() { + + std::stringstream output; + auto outputWriterFactory = [&output]() { + return std::make_unique(output); + }; + + api::CDataFrameAnalyzer analyzer{outlierSpec(), outputWriterFactory}; + + TDoubleVec expectedScores; + + TStrVec fieldNames{"c1", "c2", "c3", "c4", "c5"}; + TStrVec fieldValues(fieldNames.size()); + addTestData(fieldNames, fieldValues, analyzer, expectedScores); + + analyzer.receivedAllRows(); + analyzer.run(); + + rapidjson::Document results; + rapidjson::ParseResult ok(results.Parse(output.str().c_str())); + CPPUNIT_ASSERT(static_cast(ok) == true); + + auto expectedScore = expectedScores.begin(); + for (const auto& result : results.GetArray()) { + CPPUNIT_ASSERT(expectedScore != expectedScores.end()); + CPPUNIT_ASSERT_DOUBLES_EQUAL( + *(expectedScore++), result["results"]["outlier_score"].GetDouble(), 1e-6); + } + CPPUNIT_ASSERT(expectedScore == expectedScores.end()); +} + +void CDataFrameAnalyzerTest::testRunOutlierDetection() { + + std::stringstream output; + auto outputWriterFactory = [&output]() { + return std::make_unique(output); + }; + + api::CDataFrameAnalyzer analyzer{outlierSpec(), outputWriterFactory}; + + TDoubleVec expectedScores; + + TStrVec fieldNames{"c1", "c2", "c3", "c4", "c5", "."}; + TStrVec fieldValues(fieldNames.size()); + addTestData(fieldNames, fieldValues, analyzer, expectedScores); + + analyzer.handleRecord(fieldNames, {"", "", "", "", "", "$"}); + + rapidjson::Document results; + rapidjson::ParseResult ok(results.Parse(output.str().c_str())); + CPPUNIT_ASSERT(static_cast(ok) == true); 
+ + auto expectedScore = expectedScores.begin(); + for (const auto& result : results.GetArray()) { + CPPUNIT_ASSERT(expectedScore != expectedScores.end()); + CPPUNIT_ASSERT_DOUBLES_EQUAL( + *(expectedScore++), result["results"]["outlier_score"].GetDouble(), 1e-6); + } + CPPUNIT_ASSERT(expectedScore == expectedScores.end()); +} + +void CDataFrameAnalyzerTest::testFlushMessage() { + + // Test that white space is just ignored. + + std::stringstream output; + auto outputWriterFactory = [&output]() { + return std::make_unique(output); + }; + + api::CDataFrameAnalyzer analyzer{outlierSpec(), outputWriterFactory}; + CPPUNIT_ASSERT_EQUAL( + true, analyzer.handleRecord({"c1", "c2", "c3", "c4", "c5", "."}, + {"", "", "", "", "", " "})); +} + +void CDataFrameAnalyzerTest::testErrors() { + + std::stringstream output; + auto outputWriterFactory = [&output]() { + return std::make_unique(output); + }; + + // Test with bad analysis specification. + { + api::CDataFrameAnalyzer analyzer{ + std::make_unique(std::string{"junk"}), + outputWriterFactory}; + CPPUNIT_ASSERT_EQUAL(false, + analyzer.handleRecord({"c1", "c2", "c3", "c4", "c5"}, + {"10", "10", "10", "10", "10"})); + } + + // Test control message in the wrong position + { + api::CDataFrameAnalyzer analyzer{outlierSpec(), outputWriterFactory}; + CPPUNIT_ASSERT_EQUAL( + false, analyzer.handleRecord({"c1", "c2", "c3", ".", "c4", "c5"}, + {"10", "10", "10", "", "10", "10"})); + } + + // Test bad control message + { + api::CDataFrameAnalyzer analyzer{outlierSpec(), outputWriterFactory}; + CPPUNIT_ASSERT_EQUAL( + false, analyzer.handleRecord({"c1", "c2", "c3", "c4", "c5", "."}, + {"10", "10", "10", "10", "10", "foo"})); + } + + // Test bad input + { + api::CDataFrameAnalyzer analyzer{outlierSpec(), outputWriterFactory}; + CPPUNIT_ASSERT_EQUAL( + false, analyzer.handleRecord({"c1", "c2", "c3", "c4", "c5", "."}, + {"10", "10", "10", "10", "10"})); + } +} + +CppUnit::Test* CDataFrameAnalyzerTest::suite() { + CppUnit::TestSuite* suiteOfTests = new CppUnit::TestSuite("CDataFrameAnalyzerTest"); + + suiteOfTests->addTest(new CppUnit::TestCaller( + "CDataFrameAnalyzerTest::testWithoutControlMessages", + &CDataFrameAnalyzerTest::testWithoutControlMessages)); + suiteOfTests->addTest(new CppUnit::TestCaller( + "CDataFrameAnalyzerTest::testRunOutlierDetection", + &CDataFrameAnalyzerTest::testRunOutlierDetection)); + suiteOfTests->addTest(new CppUnit::TestCaller( + "CDataFrameAnalyzerTest::testFlushMessage", &CDataFrameAnalyzerTest::testFlushMessage)); + suiteOfTests->addTest(new CppUnit::TestCaller( + "CDataFrameAnalyzerTest::testErrors", &CDataFrameAnalyzerTest::testErrors)); + + return suiteOfTests; +} diff --git a/lib/api/unittest/CDataFrameAnalyzerTest.h b/lib/api/unittest/CDataFrameAnalyzerTest.h new file mode 100644 index 0000000000..f2e0444225 --- /dev/null +++ b/lib/api/unittest/CDataFrameAnalyzerTest.h @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +#ifndef INCLUDED_CDataFrameAnalyzerTest_h +#define INCLUDED_CDataFrameAnalyzerTest_h + +#include + +class CDataFrameAnalyzerTest : public CppUnit::TestFixture { +public: + void testWithoutControlMessages(); + void testRunOutlierDetection(); + void testFlushMessage(); + void testErrors(); + + static CppUnit::Test* suite(); +}; + +#endif // INCLUDED_CDataFrameAnalyzerTest_h diff --git a/lib/api/unittest/Main.cc b/lib/api/unittest/Main.cc index ba03a70490..062f208f27 100644 --- a/lib/api/unittest/Main.cc +++ b/lib/api/unittest/Main.cc @@ -14,6 +14,7 @@ #include "CCsvInputParserTest.h" #include "CCsvOutputWriterTest.h" #include "CDataFrameAnalysisSpecificationTest.h" +#include "CDataFrameAnalyzerTest.h" #include "CDetectionRulesJsonParserTest.h" #include "CFieldConfigTest.h" #include "CFieldDataTyperTest.h" @@ -47,6 +48,7 @@ int main(int argc, const char** argv) { runner.addTest(CCsvInputParserTest::suite()); runner.addTest(CCsvOutputWriterTest::suite()); runner.addTest(CDataFrameAnalysisSpecificationTest::suite()); + runner.addTest(CDataFrameAnalyzerTest::suite()); runner.addTest(CDetectionRulesJsonParserTest::suite()); runner.addTest(CFieldConfigTest::suite()); runner.addTest(CFieldDataTyperTest::suite()); diff --git a/lib/api/unittest/Makefile b/lib/api/unittest/Makefile index 35e99478ff..616070ae29 100644 --- a/lib/api/unittest/Makefile +++ b/lib/api/unittest/Makefile @@ -28,6 +28,7 @@ SRCS=\ CCsvInputParserTest.cc \ CCsvOutputWriterTest.cc \ CDataFrameAnalysisSpecificationTest.cc \ + CDataFrameAnalyzerTest.cc \ CDetectionRulesJsonParserTest.cc \ CFieldConfigTest.cc \ CFieldDataTyperTest.cc \ diff --git a/lib/core/CDataFrame.cc b/lib/core/CDataFrame.cc index aa2ddfda8f..f629752f72 100644 --- a/lib/core/CDataFrame.cc +++ b/lib/core/CDataFrame.cc @@ -156,6 +156,8 @@ CDataFrame::CDataFrame(bool inMainMemory, readAndWriteToStoreSyncStrategy, writeSliceToStore} { } +CDataFrame::~CDataFrame() = default; + bool CDataFrame::inMainMemory() const { return m_InMainMemory; } @@ -169,7 +171,7 @@ std::size_t CDataFrame::numberColumns() const { } bool CDataFrame::reserve(std::size_t numberThreads, std::size_t rowCapacity) { - if (m_NumberColumns >= rowCapacity) { + if (m_RowCapacity >= rowCapacity) { return true; } @@ -425,7 +427,7 @@ void CDataFrame::applyToRowsOfOneSlice(TRowFunc& func, std::size_t firstRow, const CDataFrameRowSliceHandle& slice) const { LOG_TRACE(<< "Applying function to slice starting at row " << firstRow); - std::size_t rows{slice.size() / m_NumberColumns}; + std::size_t rows{slice.size() / m_RowCapacity}; std::size_t lastRow{firstRow + rows}; func(CRowIterator{m_NumberColumns, m_RowCapacity, firstRow, slice.begin()}, CRowIterator{m_NumberColumns, m_RowCapacity, lastRow, slice.end()}); @@ -443,28 +445,32 @@ void CDataFrame::CDataFrameRowSliceWriter::finishAsyncWriteToStore() { } } -CDataFrame makeMainStorageDataFrame(std::size_t numberColumns, - boost::optional sliceCapacity, - CDataFrame::EReadWriteToStorage readWriteToStoreSyncStrategy) { +std::unique_ptr +makeMainStorageDataFrame(std::size_t numberColumns, + boost::optional sliceCapacity, + CDataFrame::EReadWriteToStorage readWriteToStoreSyncStrategy) { // The return copy is elided so we never need to call the explicitly // deleted data frame copy constructor. 
auto writer = [](std::size_t firstRow, TFloatVec slice) { - return boost::make_unique(firstRow, std::move(slice)); + return std::make_unique(firstRow, std::move(slice)); }; if (sliceCapacity != boost::none) { - return {true, numberColumns, *sliceCapacity, readWriteToStoreSyncStrategy, writer}; + return std::make_unique(true, numberColumns, *sliceCapacity, + readWriteToStoreSyncStrategy, writer); } - return {true, numberColumns, readWriteToStoreSyncStrategy, writer}; + return std::make_unique(true, numberColumns, + readWriteToStoreSyncStrategy, writer); } -CDataFrame makeDiskStorageDataFrame(const std::string& rootDirectory, - std::size_t numberColumns, - std::size_t numberRows, - boost::optional sliceCapacity, - CDataFrame::EReadWriteToStorage readWriteToStoreSyncStrategy) { +std::unique_ptr +makeDiskStorageDataFrame(const std::string& rootDirectory, + std::size_t numberColumns, + std::size_t numberRows, + boost::optional sliceCapacity, + CDataFrame::EReadWriteToStorage readWriteToStoreSyncStrategy) { // The return copy is elided so we never need to call the explicitly // deleted data frame copy constructor. @@ -479,14 +485,16 @@ CDataFrame makeDiskStorageDataFrame(const std::string& rootDirectory, // the folder cleaned up, until the data frame itself is destroyed. auto writer = [directory](std::size_t firstRow, TFloatVec slice) { - return boost::make_unique(directory, firstRow, - std::move(slice)); + return std::make_unique(directory, firstRow, + std::move(slice)); }; if (sliceCapacity != boost::none) { - return {false, numberColumns, *sliceCapacity, readWriteToStoreSyncStrategy, writer}; + return std::make_unique(false, numberColumns, *sliceCapacity, + readWriteToStoreSyncStrategy, writer); } - return {false, numberColumns, readWriteToStoreSyncStrategy, writer}; + return std::make_unique(false, numberColumns, + readWriteToStoreSyncStrategy, writer); } } } diff --git a/lib/core/CStaticThreadPool.cc b/lib/core/CStaticThreadPool.cc index b54919ca99..b4ef7021e7 100644 --- a/lib/core/CStaticThreadPool.cc +++ b/lib/core/CStaticThreadPool.cc @@ -73,8 +73,12 @@ void CStaticThreadPool::shutdown() { }}); } for (auto& thread : m_Pool) { - thread.join(); + if (thread.joinable()) { + thread.join(); + } } + m_TaskQueues.clear(); + m_Pool.clear(); } void CStaticThreadPool::worker(std::size_t id) { diff --git a/lib/core/unittest/CDataFrameTest.cc b/lib/core/unittest/CDataFrameTest.cc index d034a56cd4..22c0ccba63 100644 --- a/lib/core/unittest/CDataFrameTest.cc +++ b/lib/core/unittest/CDataFrameTest.cc @@ -31,6 +31,7 @@ using TFloatVecCItr = TFloatVec::const_iterator; using TSizeFloatVecUMap = boost::unordered_map; using TRowItr = core::CDataFrame::TRowItr; using TReadFunc = std::function; +using TFactoryFunc = std::function()>; TFloatVec testData(std::size_t numberRows, std::size_t numberColumns) { test::CRandomNumbers rng; @@ -113,8 +114,7 @@ void CDataFrameTest::testInMainMemoryBasicReadWrite() { << sync[static_cast(readWriteToStoreAsync)]); for (auto end : {500 * cols, 2000 * cols, components.size()}) { - core::CDataFrame frame{core::makeMainStorageDataFrame( - cols, capacity, readWriteToStoreAsync)}; + auto frame = core::makeMainStorageDataFrame(cols, capacity, readWriteToStoreAsync); for (std::size_t i = 0; i < end; i += cols) { auto writer = [&components, cols, i](TFloatVecItr output) mutable { @@ -122,14 +122,14 @@ void CDataFrameTest::testInMainMemoryBasicReadWrite() { *output = components[i]; } }; - frame.writeRow(writer); + frame->writeRow(writer); } - frame.finishWritingRows(); + 
frame->finishWritingRows(); bool successful; bool passed{true}; std::size_t i{0}; - std::tie(std::ignore, successful) = frame.readRows( + std::tie(std::ignore, successful) = frame->readRows( 1, std::bind(makeReader(components, cols, passed), std::ref(i), std::placeholders::_1, std::placeholders::_2)); CPPUNIT_ASSERT(successful); @@ -151,20 +151,20 @@ void CDataFrameTest::testInMainMemoryParallelRead() { TFloatVec components{testData(rows, cols)}; - core::CDataFrame frame{core::makeMainStorageDataFrame(cols, capacity)}; + auto frame = core::makeMainStorageDataFrame(cols, capacity); for (std::size_t i = 0; i < components.size(); i += cols) { auto writer = [&components, cols, i](TFloatVecItr output) mutable { for (std::size_t end = i + cols; i < end; ++i, ++output) { *output = components[i]; } }; - frame.writeRow(writer); + frame->writeRow(writer); } - frame.finishWritingRows(); + frame->finishWritingRows(); std::vector readers; bool successful; - std::tie(readers, successful) = frame.readRows(3, CThreadReader{}); + std::tie(readers, successful) = frame->readRows(3, CThreadReader{}); CPPUNIT_ASSERT(successful); CPPUNIT_ASSERT_EQUAL(std::size_t{3}, readers.size()); @@ -194,8 +194,8 @@ void CDataFrameTest::testOnDiskBasicReadWrite() { std::size_t capacity{1000}; TFloatVec components{testData(rows, cols)}; - core::CDataFrame frame{core::makeDiskStorageDataFrame( - boost::filesystem::current_path().string(), cols, rows, capacity)}; + auto frame = core::makeDiskStorageDataFrame( + boost::filesystem::current_path().string(), cols, rows, capacity); for (std::size_t i = 0; i < components.size(); i += cols) { auto writer = [&components, cols, i](TFloatVecItr output) mutable { @@ -203,14 +203,14 @@ void CDataFrameTest::testOnDiskBasicReadWrite() { *output = components[i]; } }; - frame.writeRow(writer); + frame->writeRow(writer); } - frame.finishWritingRows(); + frame->finishWritingRows(); bool successful; bool passed{true}; std::size_t i{0}; - std::tie(std::ignore, successful) = frame.readRows( + std::tie(std::ignore, successful) = frame->readRows( 1, std::bind(makeReader(components, cols, passed), std::ref(i), std::placeholders::_1, std::placeholders::_2)); CPPUNIT_ASSERT(successful); @@ -224,8 +224,8 @@ void CDataFrameTest::testOnDiskParallelRead() { std::size_t capacity{1000}; TFloatVec components{testData(rows, cols)}; - core::CDataFrame frame{core::makeDiskStorageDataFrame( - boost::filesystem::current_path().string(), cols, rows, capacity)}; + auto frame = core::makeDiskStorageDataFrame( + boost::filesystem::current_path().string(), cols, rows, capacity); for (std::size_t i = 0; i < components.size(); i += cols) { auto writer = [&components, cols, i](TFloatVecItr output) mutable { @@ -233,13 +233,13 @@ void CDataFrameTest::testOnDiskParallelRead() { *output = components[i]; } }; - frame.writeRow(writer); + frame->writeRow(writer); } - frame.finishWritingRows(); + frame->finishWritingRows(); std::vector readers; bool successful; - std::tie(readers, successful) = frame.readRows(3, CThreadReader{}); + std::tie(readers, successful) = frame->readRows(3, CThreadReader{}); CPPUNIT_ASSERT(successful); CPPUNIT_ASSERT_EQUAL(std::size_t{(rows + 1999) / 2000}, readers.size()); @@ -270,8 +270,6 @@ void CDataFrameTest::testMemoryUsage() { std::size_t capacity{5000}; TFloatVec components{testData(rows, cols)}; - using TFactoryFunc = std::function; - TFactoryFunc makeOnDisk = std::bind( &core::makeDiskStorageDataFrame, boost::filesystem::current_path().string(), cols, rows, capacity, 
core::CDataFrame::EReadWriteToStorage::E_Async); @@ -287,7 +285,7 @@ void CDataFrameTest::testMemoryUsage() { for (const auto& factory : {makeOnDisk, makeMainMemory}) { LOG_DEBUG(<< "Test memory usage " << type[t]); - core::CDataFrame frame{factory()}; + auto frame = factory(); for (std::size_t i = 0; i < components.size(); i += cols) { auto writer = [&components, cols, i](TFloatVecItr output) mutable { @@ -295,12 +293,12 @@ void CDataFrameTest::testMemoryUsage() { *output = components[i]; } }; - frame.writeRow(writer); + frame->writeRow(writer); } - frame.finishWritingRows(); + frame->finishWritingRows(); - LOG_DEBUG(<< "Memory = " << frame.memoryUsage()); - CPPUNIT_ASSERT(frame.memoryUsage() < maximumMemory[t++]); + LOG_DEBUG(<< "Memory = " << frame->memoryUsage()); + CPPUNIT_ASSERT(frame->memoryUsage() < maximumMemory[t++]); } } @@ -313,8 +311,6 @@ void CDataFrameTest::testReserve() { std::size_t capacity{1000}; TFloatVec components{testData(rows, cols)}; - using TFactoryFunc = std::function; - TFactoryFunc makeOnDisk = std::bind( &core::makeDiskStorageDataFrame, boost::filesystem::current_path().string(), cols, rows, capacity, core::CDataFrame::EReadWriteToStorage::E_Async); @@ -329,8 +325,8 @@ void CDataFrameTest::testReserve() { for (const auto& factory : {makeOnDisk, makeMainMemory}) { LOG_DEBUG(<< "Test reserve " << type[t++]); - core::CDataFrame frame{factory()}; - frame.reserve(1, 20); + auto frame = factory(); + frame->reserve(1, 20); for (std::size_t i = 0; i < components.size(); i += cols) { auto writer = [&components, cols, i](TFloatVecItr output) mutable { @@ -338,14 +334,14 @@ void CDataFrameTest::testReserve() { *output = components[i]; } }; - frame.writeRow(writer); + frame->writeRow(writer); } - frame.finishWritingRows(); + frame->finishWritingRows(); bool successful; bool passed{true}; std::size_t i{0}; - std::tie(std::ignore, successful) = frame.readRows( + std::tie(std::ignore, successful) = frame->readRows( 1, std::bind(makeReader(components, cols, passed), std::ref(i), std::placeholders::_1, std::placeholders::_2)); CPPUNIT_ASSERT(successful); @@ -358,7 +354,7 @@ void CDataFrameTest::testReserve() { for (const auto& factory : {makeOnDisk, makeMainMemory}) { LOG_DEBUG(<< "Test reserve " << type[t++]); - core::CDataFrame frame{factory()}; + auto frame = factory(); for (std::size_t i = 0; i < components.size(); i += cols) { auto writer = [&components, cols, i](TFloatVecItr output) mutable { @@ -366,16 +362,16 @@ void CDataFrameTest::testReserve() { *output = components[i]; } }; - frame.writeRow(writer); + frame->writeRow(writer); } - frame.finishWritingRows(); + frame->finishWritingRows(); - frame.reserve(2, 20); + frame->reserve(2, 20); bool successful; bool passed{true}; std::size_t i{0}; - std::tie(std::ignore, successful) = frame.readRows( + std::tie(std::ignore, successful) = frame->readRows( 1, std::bind(makeReader(components, cols, passed), std::ref(i), std::placeholders::_1, std::placeholders::_2)); CPPUNIT_ASSERT(successful); @@ -393,8 +389,6 @@ void CDataFrameTest::testResizeColumns() { std::size_t capacity{1000}; TFloatVec components{testData(rows, cols)}; - using TFactoryFunc = std::function; - TFactoryFunc makeOnDisk = std::bind( &core::makeDiskStorageDataFrame, boost::filesystem::current_path().string(), cols, rows, capacity, core::CDataFrame::EReadWriteToStorage::E_Async); @@ -407,7 +401,7 @@ void CDataFrameTest::testResizeColumns() { for (const auto& factory : {makeOnDisk, makeMainMemory}) { LOG_DEBUG(<< "Test resize " << type[t++]); - core::CDataFrame 
frame{factory()}; + auto frame = factory(); for (std::size_t i = 0; i < components.size(); i += cols) { auto writer = [&components, cols, i](TFloatVecItr output) mutable { @@ -415,16 +409,16 @@ void CDataFrameTest::testResizeColumns() { *output = components[i]; } }; - frame.writeRow(writer); + frame->writeRow(writer); } - frame.finishWritingRows(); + frame->finishWritingRows(); - frame.resizeColumns(2, 18); + frame->resizeColumns(2, 18); bool successful; bool passed{true}; std::tie(std::ignore, successful) = - frame.readRows(1, [&passed](TRowItr beginRows, TRowItr endRows) { + frame->readRows(1, [&passed](TRowItr beginRows, TRowItr endRows) { for (auto row = beginRows; row != endRows; ++row) { if (passed && row->numberColumns() != 18) { LOG_DEBUG(<< "got " << row->numberColumns() << " columns"); @@ -454,8 +448,6 @@ void CDataFrameTest::testWriteColumns() { std::size_t capacity{1000}; TFloatVec components{testData(rows, cols + extraCols)}; - using TFactoryFunc = std::function; - TFactoryFunc makeOnDisk = std::bind( &core::makeDiskStorageDataFrame, boost::filesystem::current_path().string(), cols, rows, capacity, core::CDataFrame::EReadWriteToStorage::E_Async); @@ -468,7 +460,7 @@ void CDataFrameTest::testWriteColumns() { for (const auto& factory : {makeOnDisk, makeMainMemory}) { LOG_DEBUG(<< "Test write columns " << type[t++]); - core::CDataFrame frame{factory()}; + auto frame = factory(); for (std::size_t i = 0; i < components.size(); i += cols + extraCols) { auto writer = [&components, cols, i](TFloatVecItr output) mutable { @@ -476,12 +468,12 @@ void CDataFrameTest::testWriteColumns() { *output = components[i]; } }; - frame.writeRow(writer); + frame->writeRow(writer); } - frame.finishWritingRows(); + frame->finishWritingRows(); - frame.resizeColumns(2, 18); - frame.writeColumns(2, [&](TRowItr beginRows, TRowItr endRows) mutable { + frame->resizeColumns(2, 18); + frame->writeColumns(2, [&](TRowItr beginRows, TRowItr endRows) mutable { for (auto row = beginRows; row != endRows; ++row) { for (std::size_t j = 15; j < 18; ++j) { std::size_t index{row->index() * (cols + extraCols) + j}; @@ -493,7 +485,7 @@ void CDataFrameTest::testWriteColumns() { bool successful; bool passed{true}; std::size_t i{0}; - std::tie(std::ignore, successful) = frame.readRows( + std::tie(std::ignore, successful) = frame->readRows( 1, std::bind(makeReader(components, cols + extraCols, passed), std::ref(i), std::placeholders::_1, std::placeholders::_2)); CPPUNIT_ASSERT(successful); diff --git a/lib/maths/CLocalOutlierFactors.cc b/lib/maths/CLocalOutlierFactors.cc index 8dcad59bc8..161363d59a 100644 --- a/lib/maths/CLocalOutlierFactors.cc +++ b/lib/maths/CLocalOutlierFactors.cc @@ -75,12 +75,11 @@ bool computeOutliersNoPartitions(std::size_t numberThreads, core::CDataFrame& fr // value is not important. This is presized so that rowsToPoints only // needs to access and write to each element. Since it does this once // per element it is thread safe. 
-    CFloatStorage initial;
-    TVectorVec points(frame.numberRows(), TVector{&initial, 1});
+    TVectorVec points(frame.numberRows(), TVector{nullptr, 1});
     auto rowsToPoints = [&points](TRowItr beginRows, TRowItr endRows) {
         for (auto row = beginRows; row != endRows; ++row) {
-            points[row->index()] = TVector{row->data(), row->numberColumns()};
+            new (&points[row->index()]) TVector{row->data(), row->numberColumns()};
         }
     };
@@ -104,7 +103,7 @@
     frame.resizeColumns(numberThreads, frame.numberColumns() + 1);
     successful = frame.writeColumns(numberThreads, writeScores);
     if (successful == false) {
-        LOG_ERROR(<< "Failed to write scores");
+        LOG_ERROR(<< "Failed to write scores to the data frame");
         return false;
     }
     return true;
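// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the change set above: one way a caller
// might drive the CDataFrameAnalyzer control-field protocol introduced in
// this diff (trailing "." field, ' ' pad messages, '$' end-of-data). The
// specification JSON and the stream supplier mirror the new unit test; the
// two-column toy data and the exampleDrive() wrapper are made up purely for
// illustration.

#include <api/CDataFrameAnalysisSpecification.h>
#include <api/CDataFrameAnalyzer.h>

#include <core/CJsonOutputStreamWrapper.h>

#include <memory>
#include <sstream>
#include <string>
#include <utility>

void exampleDrive() {
    std::stringstream output;
    auto outStreamSupplier = [&output]() {
        return std::make_unique<ml::core::CJsonOutputStreamWrapper>(output);
    };

    auto spec = std::make_unique<ml::api::CDataFrameAnalysisSpecification>(std::string{
        "{\"rows\": 3, \"cols\": 2, \"memory_limit\": 100000,"
        " \"threads\": 1, \"analysis\": {\"name\": \"outliers\"}}"});

    ml::api::CDataFrameAnalyzer analyzer{std::move(spec), outStreamSupplier};

    // Data rows: the last field is the control field and is left empty.
    analyzer.handleRecord({"c1", "c2", "."}, {"1.0", "2.0", ""});
    analyzer.handleRecord({"c1", "c2", "."}, {"1.1", "2.1", ""});
    analyzer.handleRecord({"c1", "c2", "."}, {"9.0", "0.5", ""});

    // '$' signals that all rows have been received: the analyzer finishes
    // writing the data frame, runs the configured analysis and writes one
    // result line per row of the form
    //   {"id_hash": "...", "results": {"outlier_score": <score>}}
    // where the "results" object is produced by the runner's writeOneRow().
    analyzer.handleRecord({"c1", "c2", "."}, {"", "", "$"});
}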