diff --git a/README.md b/README.md index be3d25a..4c771f5 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,12 @@ ThreadPool ========== -A simple C++11 Thread Pool implementation. +A simple C++20 Thread Pool implementation. Basic usage: ```c++ // create thread pool with 4 worker threads -ThreadPool pool(4); +ThreadPool pool{4}; // enqueue and store future auto result = pool.enqueue([](int answer) { return answer; }, 42); diff --git a/ThreadPool.h b/ThreadPool.h index 4183203..4ca0d6b 100644 --- a/ThreadPool.h +++ b/ThreadPool.h @@ -10,89 +10,77 @@ #include #include #include +constexpr size_t maxQueueSize = 100; -class ThreadPool { +class ThreadPool +{ public: - ThreadPool(size_t); - template - auto enqueue(F&& f, Args&&... args) - -> std::future::type>; - ~ThreadPool(); + ThreadPool(size_t threads = std::thread::hardware_concurrency() - 1) + { + workers.reserve(threads); + for (size_t i{}; i < threads; ++i) + makeThread(); + } + + template + auto enqueue(F &&f, Args &&... args) + { + using return_type = std::invoke_result_t; + //...v This need C++20! + std::packaged_task task{[f = std::forward(f), ... args = std::forward(args)]() mutable { return f(std::forward(args)...); }}; + auto result = task.get_future(); + { + std::unique_lock lock(queue_mutex); + + // don't allow enqueueing after stopping the pool + if (stop) + throw std::runtime_error("enqueue on stopped ThreadPool"); + + if (tasks.size() >= maxQueueSize) + queueStatus.wait(lock, [&]() { return tasks.size() < maxQueueSize; }); + + tasks.emplace(std::move(task));//No need for std::shared_ptr anymore! + } + hasNewTask.notify_one(); + return result; + } + + ~ThreadPool() + { + stop = true; + hasNewTask.notify_all(); + for (auto &worker : workers) + worker.join(); + } + private: - // need to keep track of threads so we can join them - std::vector< std::thread > workers; - // the task queue - std::queue< std::function > tasks; - - // synchronization + std::vector workers; + std::queue> tasks; std::mutex queue_mutex; - std::condition_variable condition; - bool stop; -}; - -// the constructor just launches some amount of workers -inline ThreadPool::ThreadPool(size_t threads) - : stop(false) -{ - for(size_t i = 0;i task; - + std::packaged_task task; { std::unique_lock lock(this->queue_mutex); - this->condition.wait(lock, - [this]{ return this->stop || !this->tasks.empty(); }); - if(this->stop && this->tasks.empty()) + this->hasNewTask.wait(lock, [&] { return this->stop || !this->tasks.empty(); }); + if (this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); + queueStatus.notify_one(); } - task(); } - } - ); -} - -// add new work item to the pool -template -auto ThreadPool::enqueue(F&& f, Args&&... args) - -> std::future::type> -{ - using return_type = typename std::result_of::type; - - auto task = std::make_shared< std::packaged_task >( - std::bind(std::forward(f), std::forward(args)...) - ); - - std::future res = task->get_future(); - { - std::unique_lock lock(queue_mutex); - - // don't allow enqueueing after stopping the pool - if(stop) - throw std::runtime_error("enqueue on stopped ThreadPool"); - - tasks.emplace([task](){ (*task)(); }); - } - condition.notify_one(); - return res; -} - -// the destructor joins all threads -inline ThreadPool::~ThreadPool() -{ - { - std::unique_lock lock(queue_mutex); - stop = true; + }); } - condition.notify_all(); - for(std::thread &worker: workers) - worker.join(); -} +}; #endif