#pragma once /* A simple C++11 Thread Pool implementation(https://github.com/progschj/ThreadPool) modified by bab2min to have additional parameter threadId */ #include #include #include #include #include #include #include #include #include namespace tomoto { class ThreadPool { public: ThreadPool(size_t threads = 0, size_t maxQueued = 0); template auto enqueue(F&& f, Args&&... args) ->std::future::type>; template auto enqueueToAll(F&& f, Args&&... args) ->std::vector::type>>; ~ThreadPool(); size_t getNumWorkers() const { return workers.size(); } size_t getNumEnqued() const { return tasks.size(); } private: // need to keep track of threads so we can join them std::vector< std::thread > workers; // the task queue std::queue< std::function > shared_task; std::vector< std::queue< std::function > > tasks; // synchronization std::mutex queue_mutex; std::condition_variable condition, inputCnd; size_t maxQueued; bool stop; }; // the constructor just launches some amount of workers inline ThreadPool::ThreadPool(size_t threads, size_t _maxQueued) : tasks(threads), maxQueued(_maxQueued), stop(false) { for (size_t i = 0; i < threads; ++i) { workers.emplace_back([this, i] { while (1) { std::function task; { std::unique_lock lock(this->queue_mutex); this->condition.wait(lock, [this, i] { return this->stop || !this->shared_task.empty() || !this->tasks[i].empty(); }); if (this->stop && this->shared_task.empty() && this->tasks[i].empty()) return; if (this->tasks[i].empty()) { task = std::move(this->shared_task.front()); this->shared_task.pop(); } else { task = std::move(this->tasks[i].front()); this->tasks[i].pop(); } if (this->maxQueued) this->inputCnd.notify_all(); } //std::cout << "Start #" << i << std::endl; task(i); //std::cout << "End #" << i << std::endl; } }); } } // add new work item to the pool template auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future::type> { using return_type = typename std::result_of::type; auto task = std::make_shared< std::packaged_task >( std::bind(std::forward(f), std::placeholders::_1, std::forward(args)...)); std::future res = task->get_future(); { std::unique_lock lock(queue_mutex); // don't allow enqueueing after stopping the pool if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); if (maxQueued && shared_task.size() >= maxQueued) { inputCnd.wait(lock, [&]() { return shared_task.size() < maxQueued; }); } shared_task.emplace([task](size_t id) { (*task)(id); }); } condition.notify_one(); return res; } template auto ThreadPool::enqueueToAll(F&& f, Args&&... args) ->std::vector::type> > { using return_type = typename std::result_of::type; std::vector > ret; std::unique_lock lock(queue_mutex); for (size_t i = 0; i < workers.size(); ++i) { auto task = std::make_shared< std::packaged_task >( std::bind(f, std::placeholders::_1, args...)); ret.emplace_back(task->get_future()); { // don't allow enqueueing after stopping the pool if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks[i].emplace([task](size_t id) { (*task)(id); }); } } condition.notify_all(); return ret; } // the destructor joins all threads inline ThreadPool::~ThreadPool() { { std::unique_lock lock(queue_mutex); stop = true; } condition.notify_all(); for (std::thread &worker : workers) worker.join(); } }