1#ifndef CPR_THREAD_POOL_H
2#define CPR_THREAD_POOL_H
3
4#include <atomic>
5#include <chrono>
6#include <condition_variable>
7#include <functional>
8#include <future>
9#include <list>
10#include <memory>
11#include <mutex>
12#include <queue>
13#include <thread>
14#include <utility>
15
16#define CPR_DEFAULT_THREAD_POOL_MAX_THREAD_NUM std::thread::hardware_concurrency()
17
18constexpr size_t CPR_DEFAULT_THREAD_POOL_MIN_THREAD_NUM = 1;
19constexpr std::chrono::milliseconds CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME{60000};
20
21namespace cpr {
22
23class ThreadPool {
24 public:
25 using Task = std::function<void()>;
26
27 explicit ThreadPool(size_t min_threads = CPR_DEFAULT_THREAD_POOL_MIN_THREAD_NUM, size_t max_threads = CPR_DEFAULT_THREAD_POOL_MAX_THREAD_NUM, std::chrono::milliseconds max_idle_ms = CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME);
28
29 virtual ~ThreadPool();
30
31 void SetMinThreadNum(size_t min_threads) {
32 min_thread_num = min_threads;
33 }
34 void SetMaxThreadNum(size_t max_threads) {
35 max_thread_num = max_threads;
36 }
37 void SetMaxIdleTime(std::chrono::milliseconds ms) {
38 max_idle_time = ms;
39 }
40 size_t GetCurrentThreadNum() {
41 return cur_thread_num;
42 }
43 size_t GetIdleThreadNum() {
44 return idle_thread_num;
45 }
46 bool IsStarted() {
47 return status != STOP;
48 }
49 bool IsStopped() {
50 return status == STOP;
51 }
52
53 int Start(size_t start_threads = 0);
54 int Stop();
55 int Pause();
56 int Resume();
57 int Wait();
58
59 /**
60 * Return a future, calling future.get() will wait task done and return RetType.
61 * Submit(fn, args...)
62 * Submit(std::bind(&Class::mem_fn, &obj))
63 * Submit(std::mem_fn(&Class::mem_fn, &obj))
64 **/
65 template <class Fn, class... Args>
66 auto Submit(Fn&& fn, Args&&... args) {
67 if (status == STOP) {
68 Start();
69 }
70 if (idle_thread_num <= 0 && cur_thread_num < max_thread_num) {
71 CreateThread();
72 }
73 using RetType = decltype(fn(args...));
74 auto task = std::make_shared<std::packaged_task<RetType()> >(std::bind(std::forward<Fn>(fn), std::forward<Args>(args)...));
75 std::future<RetType> future = task->get_future();
76 {
77 std::lock_guard<std::mutex> locker(task_mutex);
78 tasks.emplace([task] { (*task)(); });
79 }
80
81 task_cond.notify_one();
82 return future;
83 }
84
85 private:
86 bool CreateThread();
87 void AddThread(std::thread* thread);
88 void DelThread(std::thread::id id);
89
90 public:
91 size_t min_thread_num;
92 size_t max_thread_num;
93 std::chrono::milliseconds max_idle_time;
94
95 private:
96 enum Status {
97 STOP,
98 RUNNING,
99 PAUSE,
100 };
101
102 struct ThreadData {
103 std::shared_ptr<std::thread> thread;
104 std::thread::id id;
105 Status status;
106 time_t start_time;
107 time_t stop_time;
108 };
109
110 std::atomic<Status> status;
111 std::atomic<size_t> cur_thread_num;
112 std::atomic<size_t> idle_thread_num;
113 std::list<ThreadData> threads;
114 std::mutex thread_mutex;
115 std::queue<Task> tasks;
116 std::mutex task_mutex;
117 std::condition_variable task_cond;
118};
119
120} // namespace cpr
121
122#endif
123