11 changes: 9 additions & 2 deletions include/core/CConcurrentQueue.h
@@ -58,9 +58,11 @@ class CConcurrentQueue final : private CNonCopyable {
}

//! Pop an item out of the queue, this returns none if an item isn't available
TOptional tryPop() {
//! or the pop isn't allowed
template<typename PREDICATE>
TOptional tryPop(PREDICATE allowed) {
std::unique_lock<std::mutex> lock(m_Mutex);
if (m_Queue.empty()) {
if (m_Queue.empty() || allowed(m_Queue.front()) == false) {
return boost::none;
}

@@ -72,6 +74,9 @@ class CConcurrentQueue final : private CNonCopyable {
return result;
}

//! Pop an item out of the queue, this returns none if an item isn't available
TOptional tryPop() { return this->tryPop(always); }

//! Push a copy of \p item onto the queue, this blocks if the queue is full which
//! means it can deadlock if no one consumes items (implementor's responsibility)
void push(const T& item) {
@@ -150,6 +155,8 @@ class CConcurrentQueue final : private CNonCopyable {
}
}

static bool always(const T&) { return true; }

private:
//! The internal queue
boost::circular_buffer<T> m_Queue;
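
For reference, a minimal sketch of the predicate-gated tryPop pattern this hunk introduces. It is not the ml-cpp class: std::optional and std::deque stand in for boost::optional and boost::circular_buffer, the MiniQueue name is made up, and the blocking push/pop paths are omitted.

```cpp
#include <deque>
#include <iostream>
#include <mutex>
#include <optional>
#include <utility>

template<typename T>
class MiniQueue {
public:
    // Pop only if the queue is non-empty and the front item satisfies the predicate.
    template<typename PREDICATE>
    std::optional<T> tryPop(PREDICATE allowed) {
        std::unique_lock<std::mutex> lock{m_Mutex};
        if (m_Queue.empty() || allowed(m_Queue.front()) == false) {
            return std::nullopt;
        }
        T result{std::move(m_Queue.front())};
        m_Queue.pop_front();
        return result;
    }

    // Unconditional variant, mirroring tryPop() delegating to tryPop(always).
    std::optional<T> tryPop() {
        return this->tryPop([](const T&) { return true; });
    }

    void push(T item) {
        std::unique_lock<std::mutex> lock{m_Mutex};
        m_Queue.push_back(std::move(item));
    }

private:
    std::mutex m_Mutex;
    std::deque<T> m_Queue;
};

int main() {
    MiniQueue<int> queue;
    queue.push(3);
    // The pop is refused because the front item does not satisfy the predicate,
    // and the item is left in place for another consumer.
    auto rejected = queue.tryPop([](int i) { return i % 2 == 0; });
    std::cout << "allowed pop: " << rejected.has_value() << '\n';       // prints 0
    std::cout << "plain pop:   " << queue.tryPop().has_value() << '\n'; // prints 1
    return 0;
}
```

The point of gating on the front item is that a consumer can decline work it must not take while leaving it available to another consumer, which is what the thread pool changes below rely on.
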
23 changes: 18 additions & 5 deletions include/core/CStaticThreadPool.h
@@ -49,7 +49,7 @@ class CORE_EXPORT CStaticThreadPool {
and is suitable for our use case where we don't need to guarantee that this
//! always returns immediately and instead want to exert back pressure on the
//! thread scheduling tasks if the pool can't keep up.
void schedule(TTask&& task);
void schedule(std::packaged_task<boost::any()>&& task);

//! Executes the specified function in the thread pool.
void schedule(std::function<void()>&& f);
@@ -61,14 +61,27 @@
void busy(bool busy);

private:
using TOptionalTask = boost::optional<TTask>;
using TTaskQueue = CConcurrentQueue<TTask, 50>;
using TTaskQueueVec = std::vector<TTaskQueue>;
using TOptionalSize = boost::optional<std::size_t>;
class CWrappedTask {
public:
explicit CWrappedTask(TTask&& task, TOptionalSize threadId = boost::none);

bool executableOnThread(std::size_t id) const;
void operator()();

private:
TTask m_Task;
TOptionalSize m_ThreadId;
};
using TOptionalTask = boost::optional<CWrappedTask>;
using TWrappedTaskQueue = CConcurrentQueue<CWrappedTask, 50>;
using TWrappedTaskQueueVec = std::vector<TWrappedTaskQueue>;
using TThreadVec = std::vector<std::thread>;

private:
void shutdown();
void worker(std::size_t id);
void drainQueuesWithoutBlocking();

private:
// This doesn't have to be atomic because it is always only set to true,
@@ -77,7 +90,7 @@
bool m_Done = false;
std::atomic_bool m_Busy;
std::atomic<std::uint64_t> m_Cursor;
TTaskQueueVec m_TaskQueues;
TWrappedTaskQueueVec m_TaskQueues;
TThreadVec m_Pool;
};
}
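
A hedged sketch of the wrapper idea this header now uses: a task plus an optional thread id, where an unset id means any worker may run it. WrappedTask, std::function<void()> and std::optional are illustrative stand-ins for the pool's CWrappedTask, TTask (std::packaged_task<boost::any()>) and boost::optional, not the real API.

```cpp
#include <cstddef>
#include <functional>
#include <iostream>
#include <optional>
#include <utility>

class WrappedTask {
public:
    using Task = std::function<void()>;

    explicit WrappedTask(Task&& task, std::optional<std::size_t> threadId = std::nullopt)
        : m_Task{std::move(task)}, m_ThreadId{threadId} {}

    // A worker with index `id` may execute the task if it is unbound or bound to `id`.
    bool executableOnThread(std::size_t id) const {
        return m_ThreadId == std::nullopt || *m_ThreadId == id;
    }

    void operator()() { m_Task(); }

private:
    Task m_Task;
    std::optional<std::size_t> m_ThreadId;
};

int main() {
    WrappedTask anyThread{[] { std::cout << "runs anywhere\n"; }};
    WrappedTask onlyThread2{[] { std::cout << "runs on worker 2\n"; }, 2};

    std::cout << anyThread.executableOnThread(0) << '\n';   // prints 1
    std::cout << onlyThread2.executableOnThread(0) << '\n'; // prints 0
    std::cout << onlyThread2.executableOnThread(2) << '\n'; // prints 1
    return 0;
}
```

Combined with the predicate-gated tryPop above, a worker can pass its own executableOnThread check as the predicate so it skips tasks bound to other workers instead of stealing them.
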
85 changes: 64 additions & 21 deletions lib/core/CStaticThreadPool.cc
@@ -6,6 +6,8 @@

#include <core/CStaticThreadPool.h>

#include <chrono>

namespace ml {
namespace core {
namespace {
@@ -21,7 +23,7 @@ CStaticThreadPool::CStaticThreadPool(std::size_t size)
m_Pool.reserve(m_TaskQueues.size());
for (std::size_t id = 0; id < m_TaskQueues.size(); ++id) {
try {
m_Pool.emplace_back([&, id] { worker(id); });
m_Pool.emplace_back([this, id] { this->worker(id); });
} catch (...) {
this->shutdown();
throw;
@@ -33,18 +35,19 @@ CStaticThreadPool::~CStaticThreadPool() {
this->shutdown();
}

void CStaticThreadPool::schedule(TTask&& task) {
void CStaticThreadPool::schedule(TTask&& task_) {
// Only block if every queue is full.
std::size_t size{m_TaskQueues.size()};
std::size_t i{m_Cursor.load()};
std::size_t end{i + size};
CWrappedTask task{std::forward<TTask>(task_)};
for (/**/; i < end; ++i) {
if (m_TaskQueues[i % size].tryPush(std::forward<TTask>(task))) {
if (m_TaskQueues[i % size].tryPush(std::move(task))) {
break;
}
}
if (i == end) {
m_TaskQueues[i % size].push(std::forward<TTask>(task));
m_TaskQueues[i % size].push(std::move(task));
}
m_Cursor.store(i + 1);
}
@@ -65,33 +68,38 @@ void CStaticThreadPool::busy(bool value) {
}

void CStaticThreadPool::shutdown() {
// Signal to each thread that it is finished.
for (auto& queue : m_TaskQueues) {
queue.push(TTask{[&] {

// Drain the queues before starting to shut down in order to maximise throughput.
this->drainQueuesWithoutBlocking();

// Signal to each thread that it is finished. We bind each task to a thread so
// that each thread executes exactly one shutdown task.
for (std::size_t id = 0; id < m_TaskQueues.size(); ++id) {
TTask done{[&] {
m_Done = true;
return boost::any{};
}});
}};
m_TaskQueues[id].push(CWrappedTask{std::move(done), id});
}

for (auto& thread : m_Pool) {
if (thread.joinable()) {
thread.join();
}
}

m_TaskQueues.clear();
m_Pool.clear();
}

void CStaticThreadPool::worker(std::size_t id) {

auto noThrowExecute = [](TOptionalTask& task) {
try {
(*task)();
} catch (const std::future_error& e) {
LOG_ERROR(<< "Failed executing packaged task: '" << e.code() << "' "
<< "with error '" << e.what() << "'");
}
auto ifAllowed = [id](const CWrappedTask& task) {
return task.executableOnThread(id);
};

TOptionalTask task;

while (m_Done == false) {
// We maintain "worker count" queues and each worker has an affinity to a
// different queue. We don't immediately block if the worker's "queue" is
@@ -101,9 +109,8 @@ void CStaticThreadPool::worker(std::size_t id) {
// workers on queue reads.

std::size_t size{m_TaskQueues.size()};
TOptionalTask task;
for (std::size_t i = 0; i < size; ++i) {
task = m_TaskQueues[(id + i) % size].tryPop();
task = m_TaskQueues[(id + i) % size].tryPop(ifAllowed);
if (task != boost::none) {
break;
}
@@ -112,12 +119,48 @@ void CStaticThreadPool::worker(std::size_t id) {
task = m_TaskQueues[id].pop();
}

noThrowExecute(task);
(*task)();

// In the typical situation that the thread(s) adding tasks to the queues can
// do this much faster than the threads consuming them, all queues will be full
// and the producer(s) will be waiting to add a task as each one is consumed.
// By switching to work on a new queue here we minimise contention between the
// producers and consumers. Testing on bare metal (OSX) the overhead per task
// dropped from around 2.2 microseconds to 1.5 microseconds by yielding here.
std::this_thread::yield();
}
}

void CStaticThreadPool::drainQueuesWithoutBlocking() {
TOptionalTask task;
auto popTask = [&] {
for (auto& queue : m_TaskQueues) {
task = queue.tryPop();
if (task != boost::none) {
(*task)();
return true;
}
}
return false;
};
while (popTask()) {
}
}

CStaticThreadPool::CWrappedTask::CWrappedTask(TTask&& task, TOptionalSize threadId)
: m_Task{std::forward<TTask>(task)}, m_ThreadId{threadId} {
}

bool CStaticThreadPool::CWrappedTask::executableOnThread(std::size_t id) const {
return m_ThreadId == boost::none || *m_ThreadId == id;
}

// Drain this thread's queue before exiting.
for (auto task = m_TaskQueues[id].tryPop(); task; task = m_TaskQueues[id].tryPop()) {
noThrowExecute(task);
void CStaticThreadPool::CWrappedTask::operator()() {
try {
m_Task();
} catch (const std::future_error& e) {
LOG_ERROR(<< "Failed executing packaged task: '" << e.code() << "' "
<< "with error '" << e.what() << "'");
}
}
}
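
A single-threaded sketch of the scan-with-affinity logic in worker(), which is also what makes the per-thread shutdown tasks work: each worker starts at its own queue and only takes a task it is allowed to run, so a task bound to worker N can only ever be consumed by worker N. The Task/scanForTask names and the std types are simplified stand-ins, not the ml-cpp implementation.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <iostream>
#include <optional>
#include <utility>
#include <vector>

struct Task {
    std::function<void()> body;
    std::optional<std::size_t> boundTo; // unset => any worker may run it
};

using TaskQueue = std::deque<Task>;

// Try to find one runnable task for worker `id`, preferring its own queue and
// checking only the front of each queue, as the pool's tryPop does.
std::optional<Task> scanForTask(std::vector<TaskQueue>& queues, std::size_t id) {
    std::size_t size{queues.size()};
    for (std::size_t i = 0; i < size; ++i) {
        TaskQueue& queue{queues[(id + i) % size]};
        if (queue.empty() == false &&
            (queue.front().boundTo == std::nullopt || *queue.front().boundTo == id)) {
            Task task{std::move(queue.front())};
            queue.pop_front();
            return task;
        }
    }
    return std::nullopt;
}

int main() {
    std::vector<TaskQueue> queues(2);
    // A shutdown-style task bound to worker 1 sits at the front of queue 1.
    queues[1].push_back({[] { std::cout << "worker 1 only\n"; }, 1});
    queues[1].push_back({[] { std::cout << "anyone\n"; }, std::nullopt});

    // Worker 0 refuses the bound task even though its own queue is empty...
    auto forWorker0 = scanForTask(queues, 0);
    std::cout << "worker 0 got a task: " << forWorker0.has_value() << '\n'; // prints 0
    // ...while worker 1 picks it up.
    auto forWorker1 = scanForTask(queues, 1);
    if (forWorker1) {
        (*forWorker1).body();
    }
    return 0;
}
```

Because only the front of each queue is inspected, a task bound to another worker temporarily blocks that queue for everyone else, which is why shutdown() drains the queues first and only then pushes one bound task per queue.
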
32 changes: 26 additions & 6 deletions lib/core/unittest/CStaticThreadPoolTest.cc
@@ -105,12 +105,10 @@ void CStaticThreadPoolTest::testThroughputStability() {

CPPUNIT_ASSERT_EQUAL(2000u, counter.load());

// The best we can achieve is 2000ms ignoring all overheads. In fact, there will
// be imbalance in the queues when the pool shuts down which is then performed
// single threaded. Also there are other overheads.
// The best we can achieve is 2000ms ignoring all overheads.
std::uint64_t totalTime{totalTimeWatch.stop()};
LOG_DEBUG(<< "Total time = " << totalTime);
//CPPUNIT_ASSERT(totalTime <= 2600);
//CPPUNIT_ASSERT(totalTime <= 2400);
}

void CStaticThreadPoolTest::testManyTasksThroughput() {
@@ -149,7 +147,26 @@ void CStaticThreadPoolTest::testManyTasksThroughput() {
//CPPUNIT_ASSERT(totalTime <= 780);
}

void CStaticThreadPoolTest::testExceptions() {
void CStaticThreadPoolTest::testSchedulingOverhead() {

// Test the overhead per task is less than 1.6 microseconds.

core::CStaticThreadPool pool{4};

core::CStopWatch watch{true};
for (std::size_t i = 0; i < 1000000; ++i) {
if (i % 100000 == 0) {
LOG_DEBUG(<< i);
}
pool.schedule([]() {});
}

double overhead{static_cast<double>(watch.stop()) / 1000.0};
LOG_DEBUG(<< "Total time = " << overhead);
//CPPUNIT_ASSERT(overhead < 1.6);
}

void CStaticThreadPoolTest::testWithExceptions() {

// Check we don't deadlock and don't kill worker threads if we do stupid things.

@@ -184,7 +201,10 @@ CppUnit::Test* CStaticThreadPoolTest::suite() {
"CStaticThreadPoolTest::testManyTasksThroughput",
&CStaticThreadPoolTest::testManyTasksThroughput));
suiteOfTests->addTest(new CppUnit::TestCaller<CStaticThreadPoolTest>(
"CStaticThreadPoolTest::testExceptions", &CStaticThreadPoolTest::testExceptions));
"CStaticThreadPoolTest::testSchedulingOverhead",
&CStaticThreadPoolTest::testSchedulingOverhead));
suiteOfTests->addTest(new CppUnit::TestCaller<CStaticThreadPoolTest>(
"CStaticThreadPoolTest::testWithExceptions", &CStaticThreadPoolTest::testWithExceptions));

return suiteOfTests;
}
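
A hedged sketch of the measurement made in testSchedulingOverhead, using std::chrono rather than core::CStopWatch. InlinePool and averageOverheadMicroseconds are placeholders invented here; the 1.6 microsecond budget is the figure quoted in the test comment and is hardware dependent, so the sketch reports the number rather than asserting on it.

```cpp
#include <chrono>
#include <cstddef>
#include <functional>
#include <iostream>

// Time `tasks` no-op submissions and return the mean cost per submission in
// microseconds; with an empty task body the whole cost is scheduling overhead.
template<typename POOL>
double averageOverheadMicroseconds(POOL& pool, std::size_t tasks) {
    auto start = std::chrono::steady_clock::now();
    for (std::size_t i = 0; i < tasks; ++i) {
        pool.schedule([] {});
    }
    auto elapsed = std::chrono::duration<double, std::micro>(
        std::chrono::steady_clock::now() - start);
    return elapsed.count() / static_cast<double>(tasks);
}

// Trivial stand-in pool that runs tasks inline; any pool exposing a
// schedule(std::function<void()>&&) method could be measured the same way.
struct InlinePool {
    void schedule(std::function<void()>&& f) { f(); }
};

int main() {
    InlinePool pool;
    std::cout << "overhead/task (us): "
              << averageOverheadMicroseconds(pool, 1000000) << '\n';
    return 0;
}
```
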
3 changes: 2 additions & 1 deletion lib/core/unittest/CStaticThreadPoolTest.h
@@ -14,7 +14,8 @@ class CStaticThreadPoolTest : public CppUnit::TestFixture {
void testScheduleDelayMinimisation();
void testThroughputStability();
void testManyTasksThroughput();
void testExceptions();
void testSchedulingOverhead();
void testWithExceptions();

static CppUnit::Test* suite();
};