1 | #ifndef CPR_THREAD_POOL_H |
2 | #define CPR_THREAD_POOL_H |
3 | |
4 | #include <atomic> |
5 | #include <chrono> |
6 | #include <condition_variable> |
7 | #include <functional> |
8 | #include <future> |
9 | #include <list> |
10 | #include <memory> |
11 | #include <mutex> |
12 | #include <queue> |
13 | #include <thread> |
14 | #include <utility> |
15 | |
// Default upper bound for the number of worker threads.
// std::thread::hardware_concurrency() is only a hint and may return 0 when the
// value is not computable; fall back to a single thread in that case so the
// default maximum is never zero. The expression keeps the type `unsigned int`.
#define CPR_DEFAULT_THREAD_POOL_MAX_THREAD_NUM (std::thread::hardware_concurrency() > 0U ? std::thread::hardware_concurrency() : 1U)

// Default lower bound for the number of worker threads.
constexpr size_t CPR_DEFAULT_THREAD_POOL_MIN_THREAD_NUM = 1;
// Default value for the pool's maximum idle time (60000 ms).
constexpr std::chrono::milliseconds CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME{60000};
20 | |
21 | namespace cpr { |
22 | |
23 | class ThreadPool { |
24 | public: |
25 | using Task = std::function<void()>; |
26 | |
27 | explicit ThreadPool(size_t min_threads = CPR_DEFAULT_THREAD_POOL_MIN_THREAD_NUM, size_t max_threads = CPR_DEFAULT_THREAD_POOL_MAX_THREAD_NUM, std::chrono::milliseconds max_idle_ms = CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME); |
28 | |
29 | virtual ~ThreadPool(); |
30 | |
31 | void SetMinThreadNum(size_t min_threads) { |
32 | min_thread_num = min_threads; |
33 | } |
34 | void SetMaxThreadNum(size_t max_threads) { |
35 | max_thread_num = max_threads; |
36 | } |
37 | void SetMaxIdleTime(std::chrono::milliseconds ms) { |
38 | max_idle_time = ms; |
39 | } |
40 | size_t GetCurrentThreadNum() { |
41 | return cur_thread_num; |
42 | } |
43 | size_t GetIdleThreadNum() { |
44 | return idle_thread_num; |
45 | } |
46 | bool IsStarted() { |
47 | return status != STOP; |
48 | } |
49 | bool IsStopped() { |
50 | return status == STOP; |
51 | } |
52 | |
53 | int Start(size_t start_threads = 0); |
54 | int Stop(); |
55 | int Pause(); |
56 | int Resume(); |
57 | int Wait(); |
58 | |
59 | /** |
60 | * Return a future, calling future.get() will wait task done and return RetType. |
61 | * Submit(fn, args...) |
62 | * Submit(std::bind(&Class::mem_fn, &obj)) |
63 | * Submit(std::mem_fn(&Class::mem_fn, &obj)) |
64 | **/ |
65 | template <class Fn, class... Args> |
66 | auto Submit(Fn&& fn, Args&&... args) { |
67 | if (status == STOP) { |
68 | Start(); |
69 | } |
70 | if (idle_thread_num <= 0 && cur_thread_num < max_thread_num) { |
71 | CreateThread(); |
72 | } |
73 | using RetType = decltype(fn(args...)); |
74 | auto task = std::make_shared<std::packaged_task<RetType()> >(std::bind(std::forward<Fn>(fn), std::forward<Args>(args)...)); |
75 | std::future<RetType> future = task->get_future(); |
76 | { |
77 | std::lock_guard<std::mutex> locker(task_mutex); |
78 | tasks.emplace([task] { (*task)(); }); |
79 | } |
80 | |
81 | task_cond.notify_one(); |
82 | return future; |
83 | } |
84 | |
85 | private: |
86 | bool CreateThread(); |
87 | void AddThread(std::thread* thread); |
88 | void DelThread(std::thread::id id); |
89 | |
90 | public: |
91 | size_t min_thread_num; |
92 | size_t max_thread_num; |
93 | std::chrono::milliseconds max_idle_time; |
94 | |
95 | private: |
96 | enum Status { |
97 | STOP, |
98 | RUNNING, |
99 | PAUSE, |
100 | }; |
101 | |
102 | struct ThreadData { |
103 | std::shared_ptr<std::thread> thread; |
104 | std::thread::id id; |
105 | Status status; |
106 | time_t start_time; |
107 | time_t stop_time; |
108 | }; |
109 | |
110 | std::atomic<Status> status; |
111 | std::atomic<size_t> cur_thread_num; |
112 | std::atomic<size_t> idle_thread_num; |
113 | std::list<ThreadData> threads; |
114 | std::mutex thread_mutex; |
115 | std::queue<Task> tasks; |
116 | std::mutex task_mutex; |
117 | std::condition_variable task_cond; |
118 | }; |
119 | |
120 | } // namespace cpr |
121 | |
122 | #endif |
123 | |