diff --git a/include/core/CConcurrentQueue.h b/include/core/CConcurrentQueue.h index 6452394a03..8e48f6f8b2 100644 --- a/include/core/CConcurrentQueue.h +++ b/include/core/CConcurrentQueue.h @@ -58,9 +58,11 @@ class CConcurrentQueue final : private CNonCopyable { } //! Pop an item out of the queue, this returns none if an item isn't available - TOptional tryPop() { + //! or the pop isn't allowed + template + TOptional tryPop(PREDICATE allowed) { std::unique_lock lock(m_Mutex); - if (m_Queue.empty()) { + if (m_Queue.empty() || allowed(m_Queue.front()) == false) { return boost::none; } @@ -72,6 +74,9 @@ class CConcurrentQueue final : private CNonCopyable { return result; } + //! Pop an item out of the queue, this returns none if an item isn't available + TOptional tryPop() { return this->tryPop(always); } + //! Push a copy of \p item onto the queue, this blocks if the queue is full which //! means it can deadlock if no one consumes items (implementor's responsibility) void push(const T& item) { @@ -150,6 +155,8 @@ class CConcurrentQueue final : private CNonCopyable { } } + static bool always(const T&) { return true; } + private: //! The internal queue boost::circular_buffer m_Queue; diff --git a/include/core/CStaticThreadPool.h b/include/core/CStaticThreadPool.h index c20b3a4a1a..266e1c09d8 100644 --- a/include/core/CStaticThreadPool.h +++ b/include/core/CStaticThreadPool.h @@ -49,7 +49,7 @@ class CORE_EXPORT CStaticThreadPool { //! and is suitable for our use case where we don't need to guaranty that this //! always returns immediately and instead want to exert back pressure on the //! thread scheduling tasks if the pool can't keep up. - void schedule(TTask&& task); + void schedule(std::packaged_task&& task); //! Executes the specified function in the thread pool. 
void schedule(std::function&& f); @@ -61,14 +61,27 @@ class CORE_EXPORT CStaticThreadPool { void busy(bool busy); private: - using TOptionalTask = boost::optional; - using TTaskQueue = CConcurrentQueue; - using TTaskQueueVec = std::vector; + using TOptionalSize = boost::optional; + class CWrappedTask { + public: + explicit CWrappedTask(TTask&& task, TOptionalSize threadId = boost::none); + + bool executableOnThread(std::size_t id) const; + void operator()(); + + private: + TTask m_Task; + TOptionalSize m_ThreadId; + }; + using TOptionalTask = boost::optional; + using TWrappedTaskQueue = CConcurrentQueue; + using TWrappedTaskQueueVec = std::vector; using TThreadVec = std::vector; private: void shutdown(); void worker(std::size_t id); + void drainQueuesWithoutBlocking(); private: // This doesn't have to be atomic because it is always only set to true, @@ -77,7 +90,7 @@ class CORE_EXPORT CStaticThreadPool { bool m_Done = false; std::atomic_bool m_Busy; std::atomic m_Cursor; - TTaskQueueVec m_TaskQueues; + TWrappedTaskQueueVec m_TaskQueues; TThreadVec m_Pool; }; } diff --git a/lib/core/CStaticThreadPool.cc b/lib/core/CStaticThreadPool.cc index b4ef7021e7..16afc65d32 100644 --- a/lib/core/CStaticThreadPool.cc +++ b/lib/core/CStaticThreadPool.cc @@ -6,6 +6,8 @@ #include +#include + namespace ml { namespace core { namespace { @@ -21,7 +23,7 @@ CStaticThreadPool::CStaticThreadPool(std::size_t size) m_Pool.reserve(m_TaskQueues.size()); for (std::size_t id = 0; id < m_TaskQueues.size(); ++id) { try { - m_Pool.emplace_back([&, id] { worker(id); }); + m_Pool.emplace_back([this, id] { this->worker(id); }); } catch (...) { this->shutdown(); throw; @@ -33,18 +35,19 @@ CStaticThreadPool::~CStaticThreadPool() { this->shutdown(); } -void CStaticThreadPool::schedule(TTask&& task) { +void CStaticThreadPool::schedule(TTask&& task_) { // Only block if every queue is full. 
std::size_t size{m_TaskQueues.size()}; std::size_t i{m_Cursor.load()}; std::size_t end{i + size}; + CWrappedTask task{std::forward(task_)}; for (/**/; i < end; ++i) { - if (m_TaskQueues[i % size].tryPush(std::forward(task))) { + if (m_TaskQueues[i % size].tryPush(std::move(task))) { break; } } if (i == end) { - m_TaskQueues[i % size].push(std::forward(task)); + m_TaskQueues[i % size].push(std::move(task)); } m_Cursor.store(i + 1); } @@ -65,33 +68,38 @@ void CStaticThreadPool::busy(bool value) { } void CStaticThreadPool::shutdown() { - // Signal to each thread that it is finished. - for (auto& queue : m_TaskQueues) { - queue.push(TTask{[&] { + + // Drain the queues before starting to shut down in order to maximise throughput. + this->drainQueuesWithoutBlocking(); + + // Signal to each thread that it is finished. We bind each task to a thread + // so each thread executes exactly one shutdown task. + for (std::size_t id = 0; id < m_TaskQueues.size(); ++id) { + TTask done{[&] { m_Done = true; return boost::any{}; - }}); + }}; + m_TaskQueues[id].push(CWrappedTask{std::move(done), id}); } + for (auto& thread : m_Pool) { if (thread.joinable()) { thread.join(); } } + m_TaskQueues.clear(); m_Pool.clear(); } void CStaticThreadPool::worker(std::size_t id) { - auto noThrowExecute = [](TOptionalTask& task) { - try { - (*task)(); - } catch (const std::future_error& e) { - LOG_ERROR(<< "Failed executing packaged task: '" << e.code() << "' " - << "with error '" << e.what() << "'"); - } + auto ifAllowed = [id](const CWrappedTask& task) { + return task.executableOnThread(id); }; + TOptionalTask task; + while (m_Done == false) { // We maintain "worker count" queues and each worker has an affinity to a // different queue. We don't immediately block if the worker's "queue" is @@ -101,9 +109,8 @@ void CStaticThreadPool::worker(std::size_t id) { // workers on queue reads. 
std::size_t size{m_TaskQueues.size()}; - TOptionalTask task; for (std::size_t i = 0; i < size; ++i) { - task = m_TaskQueues[(id + i) % size].tryPop(); + task = m_TaskQueues[(id + i) % size].tryPop(ifAllowed); if (task != boost::none) { break; } @@ -112,12 +119,48 @@ void CStaticThreadPool::worker(std::size_t id) { task = m_TaskQueues[id].pop(); } - noThrowExecute(task); + (*task)(); + + // In the typical situation that the thread(s) adding tasks to the queues can + // do this much faster than the threads consuming them, all queues will be full + // and the producer(s) will be waiting to add a task as each one is consumed. + // By switching to work on a new queue here we minimise contention between the + // producers and consumers. Testing on bare metal (OSX) the overhead per task + // dropped from around 2.2 microseconds to 1.5 microseconds by yielding here. + std::this_thread::yield(); + } +} + +void CStaticThreadPool::drainQueuesWithoutBlocking() { + TOptionalTask task; + auto popTask = [&] { + for (auto& queue : m_TaskQueues) { + task = queue.tryPop(); + if (task != boost::none) { + (*task)(); + return true; + } + } + return false; + }; + while (popTask()) { } +} + +CStaticThreadPool::CWrappedTask::CWrappedTask(TTask&& task, TOptionalSize threadId) + : m_Task{std::forward(task)}, m_ThreadId{threadId} { +} + +bool CStaticThreadPool::CWrappedTask::executableOnThread(std::size_t id) const { + return m_ThreadId == boost::none || *m_ThreadId == id; +} - // Drain this thread's queue before exiting. 
- for (auto task = m_TaskQueues[id].tryPop(); task; task = m_TaskQueues[id].tryPop()) { - noThrowExecute(task); +void CStaticThreadPool::CWrappedTask::operator()() { + try { + m_Task(); + } catch (const std::future_error& e) { + LOG_ERROR(<< "Failed executing packaged task: '" << e.code() << "' " + << "with error '" << e.what() << "'"); } } } diff --git a/lib/core/unittest/CStaticThreadPoolTest.cc b/lib/core/unittest/CStaticThreadPoolTest.cc index 16eaa257ed..3b34640f7d 100644 --- a/lib/core/unittest/CStaticThreadPoolTest.cc +++ b/lib/core/unittest/CStaticThreadPoolTest.cc @@ -105,12 +105,10 @@ void CStaticThreadPoolTest::testThroughputStability() { CPPUNIT_ASSERT_EQUAL(2000u, counter.load()); - // The best we can achieve is 2000ms ignoring all overheads. In fact, there will - // be imbalance in the queues when the pool shuts down which is then performed - // single threaded. Also there are other overheads. + // The best we can achieve is 2000ms ignoring all overheads. std::uint64_t totalTime{totalTimeWatch.stop()}; LOG_DEBUG(<< "Total time = " << totalTime); - //CPPUNIT_ASSERT(totalTime <= 2600); + //CPPUNIT_ASSERT(totalTime <= 2400); } void CStaticThreadPoolTest::testManyTasksThroughput() { @@ -149,7 +147,26 @@ void CStaticThreadPoolTest::testManyTasksThroughput() { //CPPUNIT_ASSERT(totalTime <= 780); } -void CStaticThreadPoolTest::testExceptions() { +void CStaticThreadPoolTest::testSchedulingOverhead() { + + // Test the overhead per task is less than 1.6 microseconds. + + core::CStaticThreadPool pool{4}; + + core::CStopWatch watch{true}; + for (std::size_t i = 0; i < 1000000; ++i) { + if (i % 100000 == 0) { + LOG_DEBUG(<< i); + } + pool.schedule([]() {}); + } + + double overhead{static_cast(watch.stop()) / 1000.0}; + LOG_DEBUG(<< "Total time = " << overhead); + //CPPUNIT_ASSERT(overhead < 1.6); +} + +void CStaticThreadPoolTest::testWithExceptions() { // Check we don't deadlock we don't kill worker threads if we do stupid things. 
@@ -184,7 +201,10 @@ CppUnit::Test* CStaticThreadPoolTest::suite() { "CStaticThreadPoolTest::testManyTasksThroughput", &CStaticThreadPoolTest::testManyTasksThroughput)); suiteOfTests->addTest(new CppUnit::TestCaller( - "CStaticThreadPoolTest::testExceptions", &CStaticThreadPoolTest::testExceptions)); + "CStaticThreadPoolTest::testSchedulingOverhead", + &CStaticThreadPoolTest::testSchedulingOverhead)); + suiteOfTests->addTest(new CppUnit::TestCaller( + "CStaticThreadPoolTest::testWithExceptions", &CStaticThreadPoolTest::testWithExceptions)); return suiteOfTests; } diff --git a/lib/core/unittest/CStaticThreadPoolTest.h b/lib/core/unittest/CStaticThreadPoolTest.h index 50fe901895..3ef31adbba 100644 --- a/lib/core/unittest/CStaticThreadPoolTest.h +++ b/lib/core/unittest/CStaticThreadPoolTest.h @@ -14,7 +14,8 @@ class CStaticThreadPoolTest : public CppUnit::TestFixture { void testScheduleDelayMinimisation(); void testThroughputStability(); void testManyTasksThroughput(); - void testExceptions(); + void testSchedulingOverhead(); + void testWithExceptions(); static CppUnit::Test* suite(); };