From 910f5c96fb8dc51abbe19f684f2ee9caa37f9a97 Mon Sep 17 00:00:00 2001 From: haorenfsa Date: Thu, 27 Apr 2017 22:34:56 +0800 Subject: [PATCH 1/2] job can have param, add a func to fail when queue is full --- ThreadPool.h | 46 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/ThreadPool.h b/ThreadPool.h index 730dd60..3429a33 100644 --- a/ThreadPool.h +++ b/ThreadPool.h @@ -11,6 +11,15 @@ namespace nbsdx { namespace concurrent { +template +class CallObject{ +public: + CallObject(){func = [](DataType){};}; + CallObject(std::function in1, DataType in2): + func(in1),data(in2){}; + std::function func; + DataType data; +}; /** * Simple ThreadPool that creates `ThreadCount` threads upon its creation, @@ -18,11 +27,11 @@ namespace concurrent { * * This class requires a number of c++11 features be present in your compiler. */ -template +template class ThreadPool { - +protected: std::array threads; - std::list> queue; + std::list> queue; std::atomic_int jobs_left; std::atomic_bool bailout; @@ -38,7 +47,8 @@ class ThreadPool { */ void Task() { while( !bailout ) { - next_job()(); + CallObject next = next_job(); + next.func(next.data); --jobs_left; wait_var.notify_one(); } @@ -48,8 +58,8 @@ class ThreadPool { * Get the next job; pop the first item in the queue, * otherwise wait for a signal from the main thread. */ - std::function next_job() { - std::function res; + CallObject next_job() { + CallObject res; std::unique_lock job_lock( queue_mutex ); // Wait for a job if we don't have any. @@ -61,7 +71,7 @@ class ThreadPool { queue.pop_front(); } else { // If we're bailing out, 'inject' a job into the queue to keep jobs_left accurate. - res = []{}; + res = CallObject(); ++jobs_left; } return res; @@ -104,13 +114,33 @@ class ThreadPool { * a thread is woken up to take the job. If all threads are busy, * the job is added to the end of the queue. */ - void AddJob( std::function job ) { + void AddJob( CallObject job ) { std::lock_guard guard( queue_mutex ); queue.emplace_back( job ); ++jobs_left; job_available_var.notify_one(); } + + /** + * Add a new job to the pool. If there are no jobs in the queue, + * a thread is woken up to take the job. If all threads are busy, + * the job is added to the end of the queue. + * Return: -1 if queue is full, 0 if ok + */ + int AddJobNoMoreThanMax( CallObject job ) + { + std::lock_guard guard( queue_mutex ); + if(jobs_left <= Max) + { + queue.emplace_back( job ); + ++jobs_left; + job_available_var.notify_one(); + return 0; + } + else return -1; + } + /** * Join with all threads. Block until all threads have completed. * Params: WaitForAll: If true, will wait for the queue to empty From 675fea130bcd0dfb93187eba8842f8bf6b7e74bf Mon Sep 17 00:00:00 2001 From: haorenfsa Date: Thu, 27 Apr 2017 22:47:28 +0800 Subject: [PATCH 2/2] update readme for new version --- README.md | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 36ee1f0..533d9bd 100644 --- a/README.md +++ b/README.md @@ -13,12 +13,22 @@ Public Domain. If my licensing is wrong, please let me know. Use at your own ris Below is a quick overview, but ThreadPool.h is documented, so just read that. It's less than 200 lines with comments. ```c++ -template +class CallObject{ +public: + CallObject(){func = [](DataType){};}; + CallObject(std::function in1, DataType in2): + func(in1),data(in2){}; + std::function func; + DataType data; +}; + +template class ThreadPool { public: ThreadPool(); ~ThreadPool(); - void AddJob( std::function ); + void AddJob( CallObject job ); + int AddJobNoMoreThanMax( CallObject job ); unsigned Size() const; unsigned JobsRemaining(); void JoinAll( bool WaitForAll = true ); @@ -36,16 +46,17 @@ public: int main() { using nbsdx::concurrent::ThreadPool; - ThreadPool pool; // Defaults to 10 threads. - int JOB_COUNT = 100; + ThreadPool pool; // Defaults to 20 threads,10000 queue size. + int JOB_COUNT = 20000; for( int i = 0; i < JOB_COUNT; ++i ) - pool.AddJob( []() { - std::this_thread::sleep_for( std::chrono::seconds( 1 ) ); - } ); + { + if(pool.AddJobNoMoreThanMax(CallObject job) < 0 ) + break; + } pool.JoinAll(); - std::cout << "Expected runtime: 10 seconds." << std::endl; + std::cout << "Expected runtime: x seconds." << std::endl; } ``` @@ -53,7 +64,7 @@ Convience Function for running a list of jobs in a pool, assuming the type being ```c++ template void RunInPool( Iter begin, Iter end ) { - ThreadPool pool; + ThreadPool> pool; for( ; begin != end; begin = std::next( begin ) ) pool.AddJob( *begin ); pool.JoinAll();