1 | #include "cpr/threadpool.h" |
---|---|
2 | |
3 | namespace cpr { |
4 | |
5 | ThreadPool::ThreadPool(size_t min_threads, size_t max_threads, std::chrono::milliseconds max_idle_ms) : min_thread_num(min_threads), max_thread_num(max_threads), max_idle_time(max_idle_ms), status(STOP), cur_thread_num(0), idle_thread_num(0) {} |
6 | |
7 | ThreadPool::~ThreadPool() { |
8 | Stop(); |
9 | } |
10 | |
11 | int ThreadPool::Start(size_t start_threads) { |
12 | if (status != STOP) { |
13 | return -1; |
14 | } |
15 | status = RUNNING; |
16 | if (start_threads < min_thread_num) { |
17 | start_threads = min_thread_num; |
18 | } |
19 | if (start_threads > max_thread_num) { |
20 | start_threads = max_thread_num; |
21 | } |
22 | for (size_t i = 0; i < start_threads; ++i) { |
23 | CreateThread(); |
24 | } |
25 | return 0; |
26 | } |
27 | |
28 | int ThreadPool::Stop() { |
29 | if (status == STOP) { |
30 | return -1; |
31 | } |
32 | status = STOP; |
33 | task_cond.notify_all(); |
34 | for (auto& i : threads) { |
35 | if (i.thread->joinable()) { |
36 | i.thread->join(); |
37 | } |
38 | } |
39 | threads.clear(); |
40 | cur_thread_num = 0; |
41 | idle_thread_num = 0; |
42 | return 0; |
43 | } |
44 | |
45 | int ThreadPool::Pause() { |
46 | if (status == RUNNING) { |
47 | status = PAUSE; |
48 | } |
49 | return 0; |
50 | } |
51 | |
52 | int ThreadPool::Resume() { |
53 | if (status == PAUSE) { |
54 | status = RUNNING; |
55 | } |
56 | return 0; |
57 | } |
58 | |
59 | int ThreadPool::Wait() { |
60 | while (true) { |
61 | if (status == STOP || (tasks.empty() && idle_thread_num == cur_thread_num)) { |
62 | break; |
63 | } |
64 | std::this_thread::yield(); |
65 | } |
66 | return 0; |
67 | } |
68 | |
69 | bool ThreadPool::CreateThread() { |
70 | if (cur_thread_num >= max_thread_num) { |
71 | return false; |
72 | } |
73 | std::thread* thread = new std::thread([this] { |
74 | bool initialRun = true; |
75 | while (status != STOP) { |
76 | while (status == PAUSE) { |
77 | std::this_thread::yield(); |
78 | } |
79 | |
80 | Task task; |
81 | { |
82 | std::unique_lock<std::mutex> locker(task_mutex); |
83 | task_cond.wait_for(lock&: locker, rtime: std::chrono::milliseconds(max_idle_time), p: [this]() { return status == STOP || !tasks.empty(); }); |
84 | if (status == STOP) { |
85 | return; |
86 | } |
87 | if (tasks.empty()) { |
88 | if (cur_thread_num > min_thread_num) { |
89 | DelThread(id: std::this_thread::get_id()); |
90 | return; |
91 | } |
92 | continue; |
93 | } |
94 | if (!initialRun) { |
95 | --idle_thread_num; |
96 | } |
97 | task = std::move(tasks.front()); |
98 | tasks.pop(); |
99 | } |
100 | if (task) { |
101 | task(); |
102 | ++idle_thread_num; |
103 | if (initialRun) { |
104 | initialRun = false; |
105 | } |
106 | } |
107 | } |
108 | }); |
109 | AddThread(thread); |
110 | return true; |
111 | } |
112 | |
113 | void ThreadPool::AddThread(std::thread* thread) { |
114 | thread_mutex.lock(); |
115 | ++cur_thread_num; |
116 | ThreadData data; |
117 | data.thread = std::shared_ptr<std::thread>(thread); |
118 | data.id = thread->get_id(); |
119 | data.status = RUNNING; |
120 | data.start_time = time(timer: nullptr); |
121 | data.stop_time = 0; |
122 | threads.emplace_back(args&: data); |
123 | thread_mutex.unlock(); |
124 | } |
125 | |
126 | void ThreadPool::DelThread(std::thread::id id) { |
127 | const time_t now = time(timer: nullptr); |
128 | thread_mutex.lock(); |
129 | --cur_thread_num; |
130 | --idle_thread_num; |
131 | auto iter = threads.begin(); |
132 | while (iter != threads.end()) { |
133 | if (iter->status == STOP && now > iter->stop_time) { |
134 | if (iter->thread->joinable()) { |
135 | iter->thread->join(); |
136 | iter = threads.erase(position: iter); |
137 | continue; |
138 | } |
139 | } else if (iter->id == id) { |
140 | iter->status = STOP; |
141 | iter->stop_time = time(timer: nullptr); |
142 | } |
143 | ++iter; |
144 | } |
145 | thread_mutex.unlock(); |
146 | } |
147 | |
148 | } // namespace cpr |
149 |