Skip to content
Merged
9 changes: 5 additions & 4 deletions bin/data_frame_analyzer/Main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,14 @@ int main(int argc, char** argv) {
return EXIT_FAILURE;
}

ml::api::CDataFrameAnalysisSpecification analysisSpecification{analysisSpecificationJson};
if (analysisSpecification.bad()) {
auto analysisSpecification =
std::make_unique<ml::api::CDataFrameAnalysisSpecification>(analysisSpecificationJson);
if (analysisSpecification->bad()) {
LOG_FATAL("Failed to parse analysis specification");
return EXIT_FAILURE;
}
if (analysisSpecification.threads() > 1) {
ml::core::startDefaultAsyncExecutor(analysisSpecification.threads());
if (analysisSpecification->numberThreads() > 1) {
ml::core::startDefaultAsyncExecutor(analysisSpecification->numberThreads());
}

ml::api::CDataFrameAnalyzer dataFrameAnalyzer{
Expand Down
27 changes: 24 additions & 3 deletions include/api/CDataFrameAnalysisRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
namespace ml {
namespace core {
class CDataFrame;
class CRapidJsonConcurrentLineWriter;
namespace data_frame_detail {
class CRowRef;
}
}
namespace api {
class CDataFrameAnalysisSpecification;
Expand Down Expand Up @@ -53,6 +57,7 @@ class CDataFrameAnalysisSpecification;
class API_EXPORT CDataFrameAnalysisRunner {
public:
using TStrVec = std::vector<std::string>;
using TRowRef = core::data_frame_detail::CRowRef;

public:
CDataFrameAnalysisRunner(const CDataFrameAnalysisSpecification& spec);
Expand All @@ -67,9 +72,25 @@ class API_EXPORT CDataFrameAnalysisRunner {
//! into main memory during an analysis.
virtual std::size_t numberOfPartitions() const = 0;

//! \return The number of columns this analysis requires. This includes
//! the columns of the input frame plus any that the analysis will append.
virtual std::size_t requiredFrameColumns() const = 0;
//! \return The number of columns this analysis appends.
virtual std::size_t numberExtraColumns() const = 0;

//! Write the extra columns of \p row added by the analysis to \p writer.
//!
//! This should create a new object of the form:
//! <pre>
//! {
//! "name of column n": "value of column n",
//! "name of column n+1": "value of column n+1",
//! ...
//! }
//! </pre>
//! with one named member for each column added.
//!
//! \param[in] row The row to write the columns added by this analysis.
//! \param[in,out] writer The stream to which to write the extra columns.
virtual void writeOneRow(TRowRef row,
core::CRapidJsonConcurrentLineWriter& writer) const = 0;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can the writer change per call? if not, can it be set at construction?

Copy link
Contributor Author

@tveasey tveasey Dec 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the object created here so not currently available at construction time of CDataFrameAnalysisRunner. We could refactor so it is a member of CDataFrameAnalyzer and only created once (rather than once per analysis, although currently this is equivalent). I think this would be a good idea, but I don't want to make that refactor now.

That aside I'm not sure I'd want to make this a member of the CDataFrameAnalysisRunner. Really, this function shouldn't require state from any implementation object. It isn't static in order to be virtual, because different analyses will want to extract different column values. However, all the information is coming from the row passed in and some local statics. So this feels extrinsic to a particular instance of the class and being forced to always create a row writer whenever you create a CDataFrameAnalysisRunner object feels undesirable to me.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

understood, I am asking because this looks like the other extreme: supplying the write for every row. Looks like the reason for this is the extra wrapping required in writeResultsOf.

What about just accessing the data and doing the json'ification in writeResultsOf , which would also get rid of forward declarations.

Just suggestions.

Copy link
Contributor Author

@tveasey tveasey Dec 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is we'd then need logic in this function for every single type of analysis. I quite like that the CDataFrameAnalyzer class is agnostic to the type of analyses it is running. To keep this it has to somehow pass off writing analysis specific parts of the results to the CDataFrameAnalysisRunner.

If I understand your suggestion correctly, we'd have to have a something like a big switch statement in writeResultsOf and some sort of identifier returned by each analyser.

Equally if all the logic in writeResultsOf gets pushed down into CDataFrameAnalysisRunner implementations we'd duplicate it for each different analysis.

Note as well we capture the writer by reference in the lambda, so we only use one writer to write all rows (not one per row).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand your suggestion correctly, we'd have to have a something like a big switch statement in writeResultsOf and some sort of identifier returned by each analyser.

sorry, I do not get that. All I see at the moment is writing 1 key and 1 value. No need for a big switch statement.

Copy link
Contributor Author

@tveasey tveasey Dec 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean down the line when we have additional analysis types (regression, etc) and these will all have their own runners (say CDataFrameXGBoostRunner, etc). Each type of analysis will have its own column names and numbers of columns they want to write. This information can all be in the writeResultsOf, but not without doing something like a switch statement. Additional analysis types are definitely coming, so this isn't premature generalisation.


//! Checks whether the analysis is already running and if not launches it
//! in the background.
Expand Down
21 changes: 15 additions & 6 deletions include/api/CDataFrameAnalysisSpecification.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,29 @@ class API_EXPORT CDataFrameAnalysisSpecification {
CDataFrameAnalysisSpecification(TRunnerFactoryUPtrVec runnerFactories,
const std::string& jsonSpecification);

CDataFrameAnalysisSpecification(const CDataFrameAnalysisSpecification&) = delete;
CDataFrameAnalysisSpecification& operator=(const CDataFrameAnalysisSpecification&) = delete;
CDataFrameAnalysisSpecification(CDataFrameAnalysisSpecification&&) = delete;
CDataFrameAnalysisSpecification& operator=(CDataFrameAnalysisSpecification&&) = delete;

//! Check if the specification is bad.
bool bad() const;

//! \return The number of rows in the frame.
std::size_t rows() const;
std::size_t numberRows() const;

//! \return The number of columns in the input frame.
std::size_t cols() const;
std::size_t numberColumns() const;

//! \return The number of columns the analysis configured to run will append
//! to the data frame.
std::size_t numberExtraColumns() const;

//! \return The memory limit for the process.
std::size_t memoryLimit() const;

//! \return The number of threads the analysis can use.
std::size_t threads() const;
std::size_t numberThreads() const;

//! Run the analysis in a background thread.
//!
Expand All @@ -107,10 +116,10 @@ class API_EXPORT CDataFrameAnalysisSpecification {

private:
bool m_Bad = false;
std::size_t m_Rows = 0;
std::size_t m_Cols = 0;
std::size_t m_NumberRows = 0;
std::size_t m_NumberColumns = 0;
std::size_t m_MemoryLimit = 0;
std::size_t m_Threads = 0;
std::size_t m_NumberThreads = 0;
// TODO Sparse table support
// double m_TableLoadFactor = 0.0;
TRunnerFactoryUPtrVec m_RunnerFactories;
Expand Down
29 changes: 20 additions & 9 deletions include/api/CDataFrameAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

#ifndef INCLUDED_ml_api_CDataFrameAnalyzer_h
#define INCLUDED_ml_api_CDataFrameAnalyzer_h

#include <core/CDataFrame.h>

#include <api/CDataFrameAnalysisSpecification.h>
#include <api/ImportExport.h>

#include <cinttypes>
Expand All @@ -19,9 +17,12 @@

namespace ml {
namespace core {
class CDataFrame;
class CJsonOutputStreamWrapper;
}
namespace api {
class CDataFrameAnalysisRunner;
class CDataFrameAnalysisSpecification;

//! \brief Handles input to the data_frame_analyzer command.
class API_EXPORT CDataFrameAnalyzer {
Expand All @@ -30,39 +31,49 @@ class API_EXPORT CDataFrameAnalyzer {
using TJsonOutputStreamWrapperUPtr = std::unique_ptr<core::CJsonOutputStreamWrapper>;
using TJsonOutputStreamWrapperUPtrSupplier =
std::function<TJsonOutputStreamWrapperUPtr()>;
using TDataFrameAnalysisSpecificationUPtr = std::unique_ptr<CDataFrameAnalysisSpecification>;

public:
explicit CDataFrameAnalyzer(CDataFrameAnalysisSpecification analysisSpecification,
TJsonOutputStreamWrapperUPtrSupplier outStreamSupplier);
CDataFrameAnalyzer(TDataFrameAnalysisSpecificationUPtr analysisSpecification,
TJsonOutputStreamWrapperUPtrSupplier outStreamSupplier);
~CDataFrameAnalyzer();

//! This is true if the analyzer is receiving control messages.
bool usingControlMessages() const;

//! Handle adding a row of the data frame or a control message.
//! Handle receiving a row of the data frame or a control message.
bool handleRecord(const TStrVec& fieldNames, const TStrVec& fieldValues);

//! Call when all row have been received.
void receivedAllRows();

//! Run the configured analysis.
void run();

private:
using TDataFrameUPtr = std::unique_ptr<core::CDataFrame>;

private:
static const std::ptrdiff_t CONTROL_FIELD_UNSET{-2};
static const std::ptrdiff_t CONTROL_FIELD_MISSING{-1};

private:
bool sufficientFieldValues(const TStrVec& fieldNames) const;
bool readyToReceiveControlMessages() const;
bool prepareToReceiveControlMessages(const TStrVec& fieldNames);
bool isControlMessage(const TStrVec& fieldValues) const;
bool handleControlMessage(const TStrVec& fieldValues);
void addRowToDataFrame(const TStrVec& fieldValues);
void writeResultsOf(const CDataFrameAnalysisRunner& analysis) const;

private:
// This has values: -2 (unset), -1 (missing), >= 0 (control field index).
std::ptrdiff_t m_ControlFieldValue = CONTROL_FIELD_UNSET;
std::ptrdiff_t m_ControlFieldIndex = CONTROL_FIELD_UNSET;
std::ptrdiff_t m_BeginDataFieldValues = -1;
std::ptrdiff_t m_EndDataFieldValues = -1;
std::uint64_t m_BadValueCount;
CDataFrameAnalysisSpecification m_AnalysisSpecification;
core::CDataFrame m_DataFrame;
TDataFrameAnalysisSpecificationUPtr m_AnalysisSpecification;
TDataFrameUPtr m_DataFrame;
TJsonOutputStreamWrapperUPtrSupplier m_OutStreamSupplier;
};
}
Expand Down
12 changes: 10 additions & 2 deletions include/api/CDataFrameOutliersRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
* you may not use this file except in compliance with the Elastic License.
*/

#ifndef INCLUDED_ml_api_CDataFrameOutliersRunner_h
#define INCLUDED_ml_api_CDataFrameOutliersRunner_h

#include <api/CDataFrameAnalysisRunner.h>

#include <api/ImportExport.h>
Expand All @@ -26,8 +29,11 @@ class API_EXPORT CDataFrameOutliersRunner : public CDataFrameAnalysisRunner {
//! \sa CDataFrameAnalysisRunner::run.
virtual std::size_t numberOfPartitions() const;

//! \return The number of columns of the output frame.
virtual std::size_t requiredFrameColumns() const;
//! \return The number of columns this adds to the data frame.
virtual std::size_t numberExtraColumns() const;

//! Write the extra columns of \p row added by outlier analysis to \p writer.
virtual void writeOneRow(TRowRef row, core::CRapidJsonConcurrentLineWriter& writer) const;

private:
using TOptionalSize = boost::optional<std::size_t>;
Expand Down Expand Up @@ -62,3 +68,5 @@ class API_EXPORT CDataFrameOutliersRunnerFactory : public CDataFrameAnalysisRunn
};
}
}

#endif // INCLUDED_ml_api_CDataFrameOutliersRunner_h
25 changes: 14 additions & 11 deletions include/core/CDataFrame.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,10 @@ class CORE_EXPORT CDataFrame final {
EReadWriteToStorage readAndWriteToStoreSyncStrategy,
const TWriteSliceToStoreFunc& writeSliceToStore);

~CDataFrame();

CDataFrame(const CDataFrame&) = delete;
CDataFrame& operator=(const CDataFrame&) = delete;

CDataFrame(CDataFrame&&) = default;
CDataFrame& operator=(CDataFrame&&) = default;

Expand Down Expand Up @@ -415,10 +416,11 @@ class CORE_EXPORT CDataFrame final {
//! \param[in] readWriteToStoreSyncStrategy Controls whether reads and writes
//! from slice storage are synchronous or asynchronous.
CORE_EXPORT
CDataFrame makeMainStorageDataFrame(std::size_t numberColumns,
boost::optional<std::size_t> sliceCapacity = boost::none,
CDataFrame::EReadWriteToStorage readWriteToStoreSyncStrategy =
CDataFrame::EReadWriteToStorage::E_Sync);
std::unique_ptr<CDataFrame>
makeMainStorageDataFrame(std::size_t numberColumns,
boost::optional<std::size_t> sliceCapacity = boost::none,
CDataFrame::EReadWriteToStorage readWriteToStoreSyncStrategy =
CDataFrame::EReadWriteToStorage::E_Sync);

//! Make a data frame which uses disk storage for its slices.
//!
Expand All @@ -431,12 +433,13 @@ CDataFrame makeMainStorageDataFrame(std::size_t numberColumns,
//! \param[in] readWriteToStoreSyncStrategy Controls whether reads and writes
//! from slice storage are synchronous or asynchronous.
CORE_EXPORT
CDataFrame makeDiskStorageDataFrame(const std::string& rootDirectory,
std::size_t numberColumns,
std::size_t numberRows,
boost::optional<std::size_t> sliceCapacity = boost::none,
CDataFrame::EReadWriteToStorage readWriteToStoreSyncStrategy =
CDataFrame::EReadWriteToStorage::E_Async);
std::unique_ptr<CDataFrame>
makeDiskStorageDataFrame(const std::string& rootDirectory,
std::size_t numberColumns,
std::size_t numberRows,
boost::optional<std::size_t> sliceCapacity = boost::none,
CDataFrame::EReadWriteToStorage readWriteToStoreSyncStrategy =
CDataFrame::EReadWriteToStorage::E_Async);
}
}

Expand Down
9 changes: 5 additions & 4 deletions include/core/CRapidJsonConcurrentLineWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ class CORE_EXPORT CRapidJsonConcurrentLineWriter
std::size_t memoryUsage() const;

//! Write JSON document to outputstream
//! Note this non-virtual overwrite is needed to avoid slicing of the writer
//! and hence ensure the correct EndObject is called
//! \p doc reference to rapidjson document value
void write(rapidjson::Value& doc) { doc.Accept(*this); }
//! \note This overwrite is needed because the members of rapidjson::Writer
//! are not virtual and we need to avoid "slicing" the writer to ensure that
//! that the correct StartObject/EndObject functions are called when this is
//! passed to \p doc Accept.
void write(const rapidjson::Value& doc) { doc.Accept(*this); }

private:
//! The stream object
Expand Down
9 changes: 5 additions & 4 deletions include/core/CRapidJsonLineWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ class CRapidJsonLineWriter
}

//! Write JSON document to outputstream
//! Note this non-virtual overwrite is needed to avoid slicing of the writer
//! and hence ensure the correct StartObject/EndObject functions are called
//! \p doc reference to rapidjson document value
void write(rapidjson::Value& doc) { doc.Accept(*this); }
//! \note This overwrite is needed because the members of rapidjson::Writer
//! are not virtual and we need to avoid "slicing" the writer to ensure that
//! that the correct StartObject/EndObject functions are called when this is
//! passed to \p doc Accept.
void write(const rapidjson::Value& doc) { doc.Accept(*this); }

private:
size_t m_ObjectCount = 0;
Expand Down
2 changes: 1 addition & 1 deletion include/core/CRapidJsonWriterBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ class CRapidJsonWriterBase

//! write the rapidjson value document to the output stream
//! \p[in] doc rapidjson document value to write out
virtual void write(TValue& doc) { doc.Accept(*this); }
virtual void write(const TValue& doc) { doc.Accept(*this); }

//! Return a new rapidjson document
TDocument makeDoc() const {
Expand Down
5 changes: 3 additions & 2 deletions include/core/CStaticThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ class CORE_EXPORT CStaticThreadPool {
void worker(std::size_t id);

private:
// This doesn't have to be atomic because it is always only set to true and
// always set straight before it is checked on each worker in the pool.
// This doesn't have to be atomic because it is always only set to true,
// always set straight before it is checked on each worker in the pool
// and tearing can't happen for single byte writes.
bool m_Done = false;
std::atomic_bool m_Busy;
std::atomic<std::uint64_t> m_Cursor;
Expand Down
Loading