1/*
2Copyright (c) 2012 Jakob Progsch, Václav Zeman
3
4This software is provided 'as-is', without any express or implied
5warranty. In no event will the authors be held liable for any damages
6arising from the use of this software.
7
8Permission is granted to anyone to use this software for any purpose,
9including commercial applications, and to alter it and redistribute it
10freely, subject to the following restrictions:
11
12 1. The origin of this software must not be misrepresented; you must not
13 claim that you wrote the original software. If you use this software
14 in a product, an acknowledgment in the product documentation would be
15 appreciated but is not required.
16
17 2. Altered source versions must be plainly marked as such, and must not be
18 misrepresented as being the original software.
19
20 3. This notice may not be removed or altered from any source
21distribution.
22*/
23
24#ifndef THREAD_POOL_H
25#define THREAD_POOL_H
26
27#include <condition_variable>
28#include <functional>
29#include <future>
30#include <memory>
31#include <mutex>
32#include <queue>
33#include <stdexcept>
34#include <thread>
35#include <vector>
36
37namespace jsonrpc {
38 class ThreadPool {
39 public:
40 ThreadPool(size_t);
41 template <class F, class... Args>
42 auto enqueue(F &&f, Args &&...args) -> std::future<typename std::result_of<F(Args...)>::type>;
43 ~ThreadPool();
44
45 private:
46 // need to keep track of threads so we can join them
47 std::vector<std::thread> workers;
48 // the task queue
49 std::queue<std::function<void()>> tasks;
50
51 // synchronization
52 std::mutex queue_mutex;
53 std::condition_variable condition;
54 bool stop;
55 };
56
57 // the constructor just launches some amount of workers
58 inline ThreadPool::ThreadPool(size_t threads) : stop(false) {
59 for (size_t i = 0; i < threads; ++i)
60 workers.emplace_back([this] {
61 for (;;) {
62 std::function<void()> task;
63
64 {
65 std::unique_lock<std::mutex> lock(this->queue_mutex);
66 this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
67 if (this->stop && this->tasks.empty())
68 return;
69 task = std::move(this->tasks.front());
70 this->tasks.pop();
71 }
72
73 task();
74 }
75 });
76 }
77
78 // add new work item to the pool
79 template <class F, class... Args>
80 auto ThreadPool::enqueue(F &&f, Args &&...args) -> std::future<typename std::result_of<F(Args...)>::type> {
81 using return_type = typename std::result_of<F(Args...)>::type;
82
83 auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
84
85 std::future<return_type> res = task->get_future();
86 {
87 std::unique_lock<std::mutex> lock(queue_mutex);
88
89 // don't allow enqueueing after stopping the pool
90 if (stop)
91 throw std::runtime_error("enqueue on stopped ThreadPool");
92
93 tasks.emplace([task]() { (*task)(); });
94 }
95 condition.notify_one();
96 return res;
97 }
98
99 // the destructor joins all threads
100 inline ThreadPool::~ThreadPool() {
101 {
102 std::unique_lock<std::mutex> lock(queue_mutex);
103 stop = true;
104 }
105 condition.notify_all();
106 for (std::thread &worker : workers)
107 worker.join();
108 }
109} // namespace jsonrpc
110
111#endif
112