Skip to content

Commit 454a561

Browse files
authored
[FEATURE][ML] Shallow copy memory mapped vector and matrix types (#340)
1 parent 295f9c3 commit 454a561

File tree

7 files changed

+143
-21
lines changed

7 files changed

+143
-21
lines changed

include/core/CDataFrame.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,9 @@ class CORE_EXPORT CDataFrame final {
320320
//! Get the memory used by the data frame.
321321
std::size_t memoryUsage() const;
322322

323+
//! Get a checksum of all the data stored in the data frame.
324+
std::uint64_t checksum() const;
325+
323326
// TODO Better error case diagnostics.
324327

325328
// TODO We may want an architecture agnostic check pointing mechanism for long

include/core/CDataFrameRowSlice.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class CORE_EXPORT CDataFrameRowSlice {
7575
virtual void write(const TFloatVec& values) = 0;
7676
virtual std::size_t staticSize() const = 0;
7777
virtual std::size_t memoryUsage() const = 0;
78+
virtual std::uint64_t checksum() const = 0;
7879
};
7980

8081
//! \brief In main memory CDataFrame slice storage.
@@ -95,6 +96,7 @@ class CORE_EXPORT CMainMemoryDataFrameRowSlice final : public CDataFrameRowSlice
9596
virtual void write(const TFloatVec& values);
9697
virtual std::size_t staticSize() const;
9798
virtual std::size_t memoryUsage() const;
99+
virtual std::uint64_t checksum() const;
98100

99101
private:
100102
std::size_t m_FirstRow;
@@ -155,6 +157,7 @@ class CORE_EXPORT COnDiskDataFrameRowSlice final : public CDataFrameRowSlice {
155157
virtual void write(const TFloatVec& values);
156158
virtual std::size_t staticSize() const;
157159
virtual std::size_t memoryUsage() const;
160+
virtual std::uint64_t checksum() const;
158161

159162
private:
160163
void writeToDisk(const TFloatVec& state);

include/maths/CLinearAlgebraEigen.h

Lines changed: 92 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,16 @@ struct SConstant<CDenseVector<SCALAR>> {
281281
}
282282
};
283283

284-
//! \brief Decorates an Eigen::Map of a dense matrix with some useful methods.
284+
//! \brief Decorates an Eigen::Map of a dense matrix with some useful methods
285+
//! and changes default copy semantics to shallow copy.
286+
//!
287+
//! IMPLEMENTATION:\n
288+
//! This effectively acts like a std::reference_wrapper of an Eigen::Map for
289+
//! an Eigen matrix. In particular, all copying is shallow, unlike Eigen::Map
290+
//! whose assignment writes to the referenced memory. This is to match the
291+
//! behaviour of CMemoryMappedDenseVector.
292+
//!
293+
//! \sa CMemoryMappedDenseVector for more information.
285294
template<typename SCALAR>
286295
class CMemoryMappedDenseMatrix
287296
: public Eigen::Map<typename CDenseMatrix<SCALAR>::TBase> {
@@ -302,11 +311,32 @@ class CMemoryMappedDenseMatrix
302311
CMemoryMappedDenseMatrix(CMemoryMappedDenseMatrix& other)
303312
: CMemoryMappedDenseMatrix{static_cast<const CMemoryMappedDenseMatrix&>(other)} {}
304313
CMemoryMappedDenseMatrix(const CMemoryMappedDenseMatrix& other)
305-
: TBase{static_cast<const TBase&>(other)} {}
306-
CMemoryMappedDenseMatrix(CMemoryMappedDenseMatrix&& other) = default;
307-
CMemoryMappedDenseMatrix& operator=(const CMemoryMappedDenseMatrix& other) = default;
308-
CMemoryMappedDenseMatrix& operator=(CMemoryMappedDenseMatrix&& other) = default;
314+
: TBase{nullptr, 1, 1} {
315+
this->reseat(other);
316+
}
317+
CMemoryMappedDenseMatrix(CMemoryMappedDenseMatrix&& other)
318+
: TBase{nullptr, 1, 1} {
319+
this->reseat(other);
320+
}
321+
CMemoryMappedDenseMatrix& operator=(const CMemoryMappedDenseMatrix& other) {
322+
if (this != &other) {
323+
this->reseat(other);
324+
}
325+
return *this;
326+
}
327+
CMemoryMappedDenseMatrix& operator=(CMemoryMappedDenseMatrix&& other) {
328+
if (this != &other) {
329+
this->reseat(other);
330+
}
331+
return *this;
332+
}
309333
//@}
334+
335+
private:
336+
void reseat(const CMemoryMappedDenseMatrix& other) {
337+
TBase* base{static_cast<TBase*>(this)};
338+
new (base) TBase{const_cast<SCALAR*>(other.data()), other.rows(), other.cols()};
339+
}
310340
};
311341

312342
//! \brief Gets a constant square dense matrix with specified dimension or with
@@ -323,7 +353,37 @@ struct SConstant<CMemoryMappedDenseMatrix<SCALAR>> {
323353
}
324354
};
325355

326-
//! \brief Decorates an Eigen::Map of a dense vector with some useful methods.
356+
//! \brief Decorates an Eigen::Map of a dense vector with some useful methods
357+
//! and changes default copy semantics to shallow.
358+
//!
359+
//! IMPLEMENTATION:\n
360+
//! This effectively acts like a std::reference_wrapper of an Eigen::Map for
361+
//! an Eigen vector. In particular, all copying is shallow, unlike Eigen::Map
362+
//! whose assignment writes to the referenced memory, i.e.
363+
//! \code{.cpp}
364+
//! double values1[]{1.0, 1.0};
365+
//! double values2[]{2.0, 2.0};
366+
//!
367+
//! CMemoryMappedDenseVector<double> mm1{values1, 2};
368+
//! CMemoryMappedDenseVector<double> mm2{values2, 2};
369+
//!
370+
//! mm1 = mm2;
371+
//! std::cout << mm1(0) << "," << mm1(1) << "," << values1[0] << "," << values1[1] << std::endl;
372+
//!
373+
//! Eigen::Map<Eigen::VectorXd> map1{values1, 2};
374+
//! Eigen::Map<Eigen::VectorXd> map2{values2, 2};
375+
//!
376+
//! map1 = map2;
377+
//! std::cout << map1(0) << "," << map1(1) << "," << values1[0] << "," << values1[1] << std::endl;
378+
//! \endcode
379+
//!
380+
//! Outputs:\n
381+
//! 2,2,1,1\n
382+
//! 2,2,2,2
383+
//!
384+
//! This better fits our needs with data frames where we want to reference the
385+
//! memory stored in the data frame rows, but never modify it directly through
386+
//! this vector type.
327387
template<typename SCALAR>
328388
class CMemoryMappedDenseVector
329389
: public Eigen::Map<typename CDenseVector<SCALAR>::TBase> {
@@ -337,17 +397,32 @@ class CMemoryMappedDenseVector
337397
//! Forwarding constructor.
338398
template<typename... ARGS>
339399
CMemoryMappedDenseVector(ARGS&&... args)
340-
: TBase(std::forward<ARGS>(args)...) {}
400+
: TBase{std::forward<ARGS>(args)...} {}
341401

342402
//! \name Copy and Move Semantics
343403
//@{
344404
CMemoryMappedDenseVector(CMemoryMappedDenseVector& other)
345405
: CMemoryMappedDenseVector{static_cast<const CMemoryMappedDenseVector&>(other)} {}
346406
CMemoryMappedDenseVector(const CMemoryMappedDenseVector& other)
347-
: TBase{static_cast<const TBase&>(other)} {}
348-
CMemoryMappedDenseVector(CMemoryMappedDenseVector&& other) = default;
349-
CMemoryMappedDenseVector& operator=(const CMemoryMappedDenseVector& other) = default;
350-
CMemoryMappedDenseVector& operator=(CMemoryMappedDenseVector&& other) = default;
407+
: TBase{nullptr, 1} {
408+
this->reseat(other);
409+
}
410+
CMemoryMappedDenseVector(CMemoryMappedDenseVector&& other)
411+
: TBase{nullptr, 1} {
412+
this->reseat(other);
413+
}
414+
CMemoryMappedDenseVector& operator=(const CMemoryMappedDenseVector& other) {
415+
if (this != &other) {
416+
this->reseat(other);
417+
}
418+
return *this;
419+
}
420+
CMemoryMappedDenseVector& operator=(CMemoryMappedDenseVector&& other) {
421+
if (this != &other) {
422+
this->reseat(other);
423+
}
424+
return *this;
425+
}
351426
//@}
352427

353428
//! Get a checksum of this object.
@@ -357,6 +432,12 @@ class CMemoryMappedDenseVector
357432
}
358433
return seed;
359434
}
435+
436+
private:
437+
void reseat(const CMemoryMappedDenseVector& other) {
438+
TBase* base{static_cast<TBase*>(this)};
439+
new (base) TBase{const_cast<SCALAR*>(other.data()), other.size()};
440+
}
360441
};
361442

362443
//! \brief Gets a constant dense vector with specified dimension.

lib/api/CDataFrameAnalyzer.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,8 @@ bool CDataFrameAnalyzer::handleControlMessage(const TStrVec& fieldValues) {
158158
case ' ':
159159
// Spaces are just used to fill the buffers and force prior messages
160160
// through the system - we don't need to do anything else.
161-
LOG_TRACE(<< "Received pad of length " << controlMessage.length());
161+
LOG_TRACE(<< "Received pad of length "
162+
<< fieldValues[m_ControlFieldIndex].length());
162163
return true;
163164
case FINISHED_DATA_CONTROL_MESSAGE_FIELD_VALUE:
164165
this->receivedAllRows();

lib/core/CDataFrame.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
#include <core/CConcurrentWrapper.h>
1010
#include <core/CDataFrameRowSlice.h>
11+
#include <core/CHashing.h>
1112
#include <core/CLogger.h>
1213
#include <core/CMemory.h>
1314
#include <core/Concurrency.h>
@@ -250,6 +251,19 @@ std::size_t CDataFrame::memoryUsage() const {
250251
return CMemory::dynamicSize(m_Slices) + CMemory::dynamicSize(m_Writer);
251252
}
252253

254+
std::uint64_t CDataFrame::checksum() const {
255+
std::vector<std::uint64_t> checksums(m_Slices.size(), 0);
256+
parallel_for_each(0, m_Slices.size(), [&](std::size_t index) {
257+
checksums[index] = m_Slices[index]->checksum();
258+
});
259+
260+
std::uint64_t result{0};
261+
for (auto checksum : checksums) {
262+
result = CHashing::hashCombine(result, checksum);
263+
}
264+
return result;
265+
}
266+
253267
CDataFrame::CDataFrameRowSliceWriter::CDataFrameRowSliceWriter(
254268
std::size_t numberRows,
255269
std::size_t rowCapacity,

lib/core/CDataFrameRowSlice.cc

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ class CBadDataFrameRowSliceHandle : public CDataFrameRowSliceHandleImpl {
8686
//! Stub for the values.
8787
mutable TFloatVec m_Empty;
8888
};
89+
90+
//! Checksum \p slice by computing a 64 bit murmur hash of its raw bytes.
91+
uint64_t computeChecksum(const TFloatVec& slice) {
92+
return CHashing::murmurHash64(
93+
slice.data(), static_cast<int>(sizeof(CFloatStorage) * slice.size()), 0);
94+
}
8995
}
9096

9197
//////// CDataFrameRowSliceHandle ////////
@@ -181,6 +187,10 @@ std::size_t CMainMemoryDataFrameRowSlice::memoryUsage() const {
181187
return CMemory::dynamicSize(m_State);
182188
}
183189

190+
std::uint64_t CMainMemoryDataFrameRowSlice::checksum() const {
191+
return computeChecksum(m_State);
192+
}
193+
184194
//////// COnDiskDataFrameRowSlice ////////
185195

186196
namespace {
@@ -201,12 +211,6 @@ bool sufficientDiskSpaceAvailable(const boost::filesystem::path& path, std::size
201211
}
202212
return true;
203213
}
204-
205-
//! Checksum \p slice.
206-
uint64_t checksum(const TFloatVec& slice) {
207-
return CHashing::murmurHash64(
208-
slice.data(), static_cast<int>(sizeof(CFloatStorage) * slice.size()), 0);
209-
}
210214
}
211215

212216
COnDiskDataFrameRowSlice::COnDiskDataFrameRowSlice(const TTemporaryDirectoryPtr& directory,
@@ -282,7 +286,7 @@ COnDiskDataFrameRowSlice::TSizeHandlePr COnDiskDataFrameRowSlice::read() {
282286
return {0, {boost::make_unique<CBadDataFrameRowSliceHandle>()}};
283287
}
284288

285-
if (checksum(result) != m_Checksum) {
289+
if (computeChecksum(result) != m_Checksum) {
286290
LOG_ERROR(<< "Corrupt from row " << m_FirstRow);
287291
m_StateIsBad = true;
288292
return {0, {boost::make_unique<CBadDataFrameRowSliceHandle>()}};
@@ -313,7 +317,7 @@ std::size_t COnDiskDataFrameRowSlice::memoryUsage() const {
313317

314318
void COnDiskDataFrameRowSlice::writeToDisk(const TFloatVec& state) {
315319
m_Capacity = state.size();
316-
m_Checksum = checksum(state);
320+
m_Checksum = computeChecksum(state);
317321
LOG_TRACE(<< "Checksum = " << m_Checksum);
318322

319323
std::size_t bytes{sizeof(CFloatStorage) * state.size()};
@@ -323,6 +327,10 @@ void COnDiskDataFrameRowSlice::writeToDisk(const TFloatVec& state) {
323327
file.write(reinterpret_cast<const char*>(state.data()), bytes);
324328
}
325329

330+
std::uint64_t COnDiskDataFrameRowSlice::checksum() const {
331+
return m_Checksum;
332+
}
333+
326334
bool COnDiskDataFrameRowSlice::readFromDisk(TFloatVec& result) const {
327335
result.resize(m_Capacity);
328336

lib/maths/CLocalOutlierFactors.cc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,13 @@ bool computeOutliersNoPartitions(std::size_t numberThreads, core::CDataFrame& fr
7979

8080
auto rowsToPoints = [&points](TRowItr beginRows, TRowItr endRows) {
8181
for (auto row = beginRows; row != endRows; ++row) {
82-
new (&points[row->index()]) TVector{row->data(), row->numberColumns()};
82+
points[row->index()] =
83+
TVector{row->data(), static_cast<long>(row->numberColumns())};
8384
}
8485
};
8586

87+
std::uint64_t checksum{frame.checksum()};
88+
8689
bool successful;
8790
std::tie(std::ignore, successful) = frame.readRows(numberThreads, rowsToPoints);
8891

@@ -94,6 +97,15 @@ bool computeOutliersNoPartitions(std::size_t numberThreads, core::CDataFrame& fr
9497
TDoubleVec scores;
9598
CLocalOutlierFactors::ensemble(std::move(points), scores);
9699

100+
// This never happens now, but it is a sanity check against someone
101+
// changing CLocalOutlierFactors to accidentally write to the data
102+
// frame via one of the memory mapped vectors. All bets are off as to
103+
// whether we generate anything meaningful if this happens.
104+
if (checksum != frame.checksum()) {
105+
LOG_ERROR(<< "Accidentally modified the data frame");
106+
return false;
107+
}
108+
97109
auto writeScores = [&scores](TRowItr beginRows, TRowItr endRows) {
98110
for (auto row = beginRows; row != endRows; ++row) {
99111
row->writeColumn(row->numberColumns() - 1, scores[row->index()]);

0 commit comments

Comments
 (0)