Skip to content

Commit 96bcb67

Browse files
authored
[FEATURE][ML] Store and retrieve doc id (32 bit hashes) for data frame analyses (#358)
1 parent b1db773 commit 96bcb67

File tree

11 files changed

+414
-233
lines changed

11 files changed

+414
-233
lines changed

include/api/CDataFrameAnalyzer.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ class API_EXPORT CDataFrameAnalyzer {
5454
using TDataFrameUPtr = std::unique_ptr<core::CDataFrame>;
5555

5656
private:
57-
static const std::ptrdiff_t CONTROL_FIELD_UNSET{-2};
58-
static const std::ptrdiff_t CONTROL_FIELD_MISSING{-1};
57+
static const std::ptrdiff_t FIELD_UNSET{-2};
58+
static const std::ptrdiff_t FIELD_MISSING{-1};
5959

6060
private:
6161
bool sufficientFieldValues(const TStrVec& fieldNames) const;
@@ -68,10 +68,12 @@ class API_EXPORT CDataFrameAnalyzer {
6868

6969
private:
7070
// This has values: -2 (unset), -1 (missing), >= 0 (control field index).
71-
std::ptrdiff_t m_ControlFieldIndex = CONTROL_FIELD_UNSET;
72-
std::ptrdiff_t m_BeginDataFieldValues = -1;
73-
std::ptrdiff_t m_EndDataFieldValues = -1;
71+
std::ptrdiff_t m_ControlFieldIndex = FIELD_UNSET;
72+
std::ptrdiff_t m_BeginDataFieldValues = FIELD_UNSET;
73+
std::ptrdiff_t m_EndDataFieldValues = FIELD_UNSET;
74+
std::ptrdiff_t m_DocIdFieldIndex = FIELD_UNSET;
7475
std::uint64_t m_BadValueCount;
76+
std::uint64_t m_BadDocIdCount;
7577
TDataFrameAnalysisSpecificationUPtr m_AnalysisSpecification;
7678
TDataFrameUPtr m_DataFrame;
7779
TJsonOutputStreamWrapperUPtrSupplier m_OutStreamSupplier;

include/core/CDataFrame.h

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ namespace data_frame_detail {
2828

2929
using TFloatVec = std::vector<CFloatStorage>;
3030
using TFloatVecItr = TFloatVec::iterator;
31+
using TInt32Vec = std::vector<std::int32_t>;
32+
using TInt32VecCItr = TInt32Vec::const_iterator;
3133

3234
//! \brief A lightweight wrapper around a single row of the data frame.
3335
//!
@@ -45,7 +47,8 @@ class CORE_EXPORT CRowRef {
4547
//! \param[in] beginColumns The iterator for the columns of row \p index.
4648
//! \param[in] endColumns The iterator for the end of the columns of row
4749
//! \p index.
48-
CRowRef(std::size_t index, TFloatVecItr beginColumns, TFloatVecItr endColumns);
50+
//! \param[in] docId The row's document identifier.
51+
CRowRef(std::size_t index, TFloatVecItr beginColumns, TFloatVecItr endColumns, std::int32_t docId);
4952

5053
//! Get column \p i value.
5154
CFloatStorage operator[](std::size_t i) const;
@@ -71,10 +74,14 @@ class CORE_EXPORT CRowRef {
7174
std::copy(m_BeginColumns, m_EndColumns, output);
7275
}
7376

77+
//! Get the row's document identifier.
78+
std::int32_t docId() const;
79+
7480
private:
7581
std::size_t m_Index;
7682
TFloatVecItr m_BeginColumns;
7783
TFloatVecItr m_EndColumns;
84+
std::int32_t m_DocId;
7885
};
7986

8087
//! \brief Decorates CRowRef to give it pointer semantics.
@@ -100,8 +107,15 @@ class CORE_EXPORT CRowIterator
100107
//! \param[in] numberColumns The number of columns in the data frame.
101108
//! \param[in] rowCapacity The capacity of each row in the data frame.
102109
//! \param[in] index The row index.
103-
//! \param[in] base The iterator for the columns of row \p index.
104-
CRowIterator(std::size_t numberColumns, std::size_t rowCapacity, std::size_t index, TFloatVecItr base);
110+
//! \param[in] rowItr The iterator for the columns of the rows starting
111+
//! at \p index.
112+
//! \param[in] docIdItr The iterator for the document identifiers of rows
113+
//! starting at \p index.
114+
CRowIterator(std::size_t numberColumns,
115+
std::size_t rowCapacity,
116+
std::size_t index,
117+
TFloatVecItr rowItr,
118+
TInt32VecCItr docIdItr);
105119

106120
//! \name Forward Iterator Contract
107121
//@{
@@ -113,13 +127,12 @@ class CORE_EXPORT CRowIterator
113127
CRowIterator operator++(int);
114128
//@}
115129

116-
TFloatVecItr base() const;
117-
118130
private:
119131
std::size_t m_NumberColumns = 0;
120132
std::size_t m_RowCapacity = 0;
121133
std::size_t m_Index = 0;
122-
TFloatVecItr m_Base;
134+
TFloatVecItr m_RowItr;
135+
TInt32VecCItr m_DocIdItr;
123136
};
124137
}
125138

@@ -169,16 +182,17 @@ class CORE_EXPORT CDataFrame final {
169182
public:
170183
using TFloatVec = std::vector<CFloatStorage>;
171184
using TFloatVecItr = TFloatVec::iterator;
172-
using TSizeFloatVecPr = std::pair<std::size_t, TFloatVec>;
185+
using TInt32Vec = std::vector<std::int32_t>;
173186
using TRowItr = data_frame_detail::CRowIterator;
174187
using TRowFunc = std::function<void(TRowItr, TRowItr)>;
175188
using TRowFuncVec = std::vector<TRowFunc>;
176189
using TRowFuncVecBoolPr = std::pair<TRowFuncVec, bool>;
177-
using TWriteFunc = std::function<void(TFloatVecItr)>;
190+
using TWriteFunc = std::function<void(TFloatVecItr, std::int32_t&)>;
178191
using TRowSlicePtr = std::shared_ptr<CDataFrameRowSlice>;
179192
using TRowSlicePtrVec = std::vector<TRowSlicePtr>;
180193
using TSizeRowSliceHandlePr = std::pair<std::size_t, CDataFrameRowSliceHandle>;
181-
using TWriteSliceToStoreFunc = std::function<TRowSlicePtr(std::size_t, TFloatVec)>;
194+
using TWriteSliceToStoreFunc =
195+
std::function<TRowSlicePtr(std::size_t, TFloatVec, TInt32Vec)>;
182196

183197
//! Controls whether to read and write to storage asynchronously.
184198
enum class EReadWriteToStorage { E_Async, E_Sync };
@@ -359,7 +373,8 @@ class CORE_EXPORT CDataFrame final {
359373
std::size_t m_SliceCapacityInRows;
360374
EReadWriteToStorage m_WriteToStoreSyncStrategy;
361375
TWriteSliceToStoreFunc m_WriteSliceToStore;
362-
TFloatVec m_SliceBeingWritten;
376+
TFloatVec m_RowsOfSliceBeingWritten;
377+
TInt32Vec m_DocIdsOfSliceBeingWritten;
363378
future<TRowSlicePtr> m_SliceWrittenAsyncToStore;
364379
TRowSlicePtrVec m_SlicesWrittenToStore;
365380
};

include/core/CDataFrameRowSlice.h

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ namespace data_frame_row_slice_detail {
2525
class CORE_EXPORT CDataFrameRowSliceHandleImpl {
2626
public:
2727
using TFloatVec = std::vector<CFloatStorage>;
28+
using TInt32Vec = std::vector<std::int32_t>;
2829
using TImplPtr = std::unique_ptr<CDataFrameRowSliceHandleImpl>;
2930

3031
public:
3132
virtual ~CDataFrameRowSliceHandleImpl() = default;
3233
virtual TImplPtr clone() const = 0;
33-
virtual TFloatVec& values() const = 0;
34+
virtual TFloatVec& rows() const = 0;
35+
virtual const TInt32Vec& docIds() const = 0;
3436
virtual bool bad() const = 0;
3537
};
3638
}
@@ -41,6 +43,8 @@ class CORE_EXPORT CDataFrameRowSliceHandle {
4143
public:
4244
using TFloatVec = std::vector<CFloatStorage>;
4345
using TFloatVecItr = TFloatVec::iterator;
46+
using TInt32Vec = std::vector<std::int32_t>;
47+
using TInt32VecCItr = TInt32Vec::const_iterator;
4448
using TImplPtr = std::unique_ptr<data_frame_row_slice_detail::CDataFrameRowSliceHandleImpl>;
4549

4650
public:
@@ -53,9 +57,12 @@ class CORE_EXPORT CDataFrameRowSliceHandle {
5357
CDataFrameRowSliceHandle& operator=(CDataFrameRowSliceHandle&& other);
5458

5559
std::size_t size() const;
56-
TFloatVecItr begin() const;
57-
TFloatVecItr end() const;
58-
const TFloatVec& values() const;
60+
TFloatVecItr beginRows() const;
61+
TFloatVecItr endRows() const;
62+
TInt32VecCItr beginDocIds() const;
63+
TInt32VecCItr endDocIds() const;
64+
const TFloatVec& rows() const;
65+
const TInt32Vec& docIds() const;
5966
bool bad() const;
6067

6168
private:
@@ -67,12 +74,13 @@ class CORE_EXPORT CDataFrameRowSlice {
6774
public:
6875
using TFloatVec = std::vector<CFloatStorage>;
6976
using TSizeHandlePr = std::pair<std::size_t, CDataFrameRowSliceHandle>;
77+
using TInt32Vec = std::vector<std::int32_t>;
7078

7179
public:
7280
virtual ~CDataFrameRowSlice() = default;
7381
virtual bool reserve(std::size_t numberColumns, std::size_t extraColumns) = 0;
7482
virtual TSizeHandlePr read() = 0;
75-
virtual void write(const TFloatVec& values) = 0;
83+
virtual void write(const TFloatVec& rows, const TInt32Vec& docIds) = 0;
7684
virtual std::size_t staticSize() const = 0;
7785
virtual std::size_t memoryUsage() const = 0;
7886
virtual std::uint64_t checksum() const = 0;
@@ -90,17 +98,18 @@ class CORE_EXPORT CDataFrameRowSlice {
9098
//! rows to adapt it for use by the data frame.
9199
class CORE_EXPORT CMainMemoryDataFrameRowSlice final : public CDataFrameRowSlice {
92100
public:
93-
CMainMemoryDataFrameRowSlice(std::size_t firstRow, TFloatVec state);
101+
CMainMemoryDataFrameRowSlice(std::size_t firstRow, TFloatVec rows, TInt32Vec docIds);
94102
virtual bool reserve(std::size_t numberColumns, std::size_t extraColumns);
95103
virtual TSizeHandlePr read();
96-
virtual void write(const TFloatVec& values);
104+
virtual void write(const TFloatVec& rows, const TInt32Vec& docIds);
97105
virtual std::size_t staticSize() const;
98106
virtual std::size_t memoryUsage() const;
99107
virtual std::uint64_t checksum() const;
100108

101109
private:
102110
std::size_t m_FirstRow;
103-
TFloatVec m_State;
111+
TFloatVec m_Rows;
112+
TInt32Vec m_DocIds;
104113
};
105114

106115
//! \brief On disk CDataFrame slice storage.
@@ -151,28 +160,30 @@ class CORE_EXPORT COnDiskDataFrameRowSlice final : public CDataFrameRowSlice {
151160
public:
152161
COnDiskDataFrameRowSlice(const TTemporaryDirectoryPtr& directory,
153162
std::size_t firstRow,
154-
TFloatVec state);
163+
TFloatVec rows,
164+
TInt32Vec docIds);
155165
virtual bool reserve(std::size_t numberColumns, std::size_t extraColumns);
156166
virtual TSizeHandlePr read();
157-
virtual void write(const TFloatVec& values);
167+
virtual void write(const TFloatVec& rows, const TInt32Vec& docIds);
158168
virtual std::size_t staticSize() const;
159169
virtual std::size_t memoryUsage() const;
160170
virtual std::uint64_t checksum() const;
161171

162172
private:
163-
void writeToDisk(const TFloatVec& state);
164-
bool readFromDisk(TFloatVec& result) const;
173+
void writeToDisk(const TFloatVec& rows, const TInt32Vec& docIds);
174+
bool readFromDisk(TFloatVec& rows, TInt32Vec& docIds) const;
165175

166176
private:
167177
using TByteVec = CCompressUtil::TByteVec;
168178

169179
private:
170180
mutable bool m_StateIsBad = false;
171181
std::size_t m_FirstRow;
172-
std::size_t m_Capacity;
182+
std::size_t m_RowsCapacity;
183+
std::size_t m_DocIdsCapacity;
173184
TTemporaryDirectoryPtr m_Directory;
174185
boost::filesystem::path m_FileName;
175-
uint64_t m_Checksum;
186+
std::uint64_t m_Checksum;
176187
};
177188
}
178189
}

lib/api/CDataFrameAnalysisSpecification.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,11 @@ CDataFrameAnalysisSpecification::makeDataFrame() const {
172172
}
173173

174174
// TODO Remove this hack once the directory is passed in the config.
175+
////
175176
if (m_Runner->storeDataFrameInMainMemory() == false) {
176177
return {};
177178
}
179+
////
178180

179181
TDataFrameUPtr result{m_Runner->storeDataFrameInMainMemory()
180182
? core::makeMainStorageDataFrame(m_NumberColumns)

0 commit comments

Comments
 (0)