Skip to content
This repository was archived by the owner on Feb 9, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,22 @@ Public Domain. If my licensing is wrong, please let me know. Use at your own ris
Below is a quick overview, but ThreadPool.h is documented, so just read that. It's less than 200 lines with comments.

```c++
template <unsigned ThreadCount = 10>
class CallObject{
public:
CallObject(){func = [](DataType){};};
CallObject(std::function<void(DataType)> in1, DataType in2):
func(in1),data(in2){};
std::function<void(DataType)> func;
DataType data;
};

template <typename DataType, unsigned ThreadCount = 20, unsigned Max = 10000>
class ThreadPool {
public:
ThreadPool();
~ThreadPool();
void AddJob( std::function<void(void)> );
void AddJob( CallObject<DataType> job );
int AddJobNoMoreThanMax( CallObject<DataType> job );
unsigned Size() const;
unsigned JobsRemaining();
void JoinAll( bool WaitForAll = true );
Expand All @@ -36,24 +46,25 @@ public:
int main() {
using nbsdx::concurrent::ThreadPool;

ThreadPool pool; // Defaults to 10 threads.
int JOB_COUNT = 100;
ThreadPool<int> pool; // Defaults to 20 threads,10000 queue size.
int JOB_COUNT = 20000;

for( int i = 0; i < JOB_COUNT; ++i )
pool.AddJob( []() {
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
} );
{
if(pool.AddJobNoMoreThanMax(CallObject<int> job) < 0 )
break;
}

pool.JoinAll();
std::cout << "Expected runtime: 10 seconds." << std::endl;
std::cout << "Expected runtime: x seconds." << std::endl;
}
```

Convience Function for running a list of jobs in a pool, assuming the type being iterated is of `std::function<void(void)>`:
```c++
template <typename Iter, unsigned Count = 10>
void RunInPool( Iter begin, Iter end ) {
ThreadPool<Count> pool;
ThreadPool<CallObject<your_type_name>> pool;
for( ; begin != end; begin = std::next( begin ) )
pool.AddJob( *begin );
pool.JoinAll();
Expand Down
46 changes: 38 additions & 8 deletions ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,27 @@

namespace nbsdx {
namespace concurrent {
template<typename DataType>
class CallObject{
public:
CallObject(){func = [](DataType){};};
CallObject(std::function<void(DataType)> in1, DataType in2):
func(in1),data(in2){};
std::function<void(DataType)> func;
DataType data;
};

/**
* Simple ThreadPool that creates `ThreadCount` threads upon its creation,
* and pulls from a queue to get new jobs. The default is 10 threads.
*
* This class requires a number of c++11 features be present in your compiler.
*/
template <unsigned ThreadCount = 10>
template <typename DataType, unsigned ThreadCount = 20, unsigned Max = 10000>
class ThreadPool {
protected:
std::array<std::thread, ThreadCount> threads;
std::list<std::function<void(void)>> queue;
std::list<CallObject<DataType>> queue;

std::atomic_int jobs_left;
std::atomic_bool bailout;
Expand All @@ -38,7 +47,8 @@ class ThreadPool {
*/
void Task() {
while( !bailout ) {
next_job()();
CallObject<DataType> next = next_job();
next.func(next.data);
--jobs_left;
wait_var.notify_one();
}
Expand All @@ -48,8 +58,8 @@ class ThreadPool {
* Get the next job; pop the first item in the queue,
* otherwise wait for a signal from the main thread.
*/
std::function<void(void)> next_job() {
std::function<void(void)> res;
CallObject<DataType> next_job() {
CallObject<DataType> res;
std::unique_lock<std::mutex> job_lock( queue_mutex );

// Wait for a job if we don't have any.
Expand All @@ -61,7 +71,7 @@ class ThreadPool {
queue.pop_front();
}
else { // If we're bailing out, 'inject' a job into the queue to keep jobs_left accurate.
res = []{};
res = CallObject<DataType>();
++jobs_left;
}
return res;
Expand Down Expand Up @@ -104,13 +114,33 @@ class ThreadPool {
* a thread is woken up to take the job. If all threads are busy,
* the job is added to the end of the queue.
*/
void AddJob( std::function<void(void)> job ) {
void AddJob( CallObject<DataType> job ) {
std::lock_guard<std::mutex> guard( queue_mutex );
queue.emplace_back( job );
++jobs_left;
job_available_var.notify_one();
}


/**
* Add a new job to the pool. If there are no jobs in the queue,
* a thread is woken up to take the job. If all threads are busy,
* the job is added to the end of the queue.
* Return: -1 if queue is full, 0 if ok
*/
int AddJobNoMoreThanMax( CallObject<DataType> job )
{
std::lock_guard<std::mutex> guard( queue_mutex );
if(jobs_left <= Max)
{
queue.emplace_back( job );
++jobs_left;
job_available_var.notify_one();
return 0;
}
else return -1;
}

/**
* Join with all threads. Block until all threads have completed.
* Params: WaitForAll: If true, will wait for the queue to empty
Expand Down