1#include "cpr/threadpool.h"
2
3namespace cpr {
4
5ThreadPool::ThreadPool(size_t min_threads, size_t max_threads, std::chrono::milliseconds max_idle_ms) : min_thread_num(min_threads), max_thread_num(max_threads), max_idle_time(max_idle_ms), status(STOP), cur_thread_num(0), idle_thread_num(0) {}
6
7ThreadPool::~ThreadPool() {
8 Stop();
9}
10
11int ThreadPool::Start(size_t start_threads) {
12 if (status != STOP) {
13 return -1;
14 }
15 status = RUNNING;
16 if (start_threads < min_thread_num) {
17 start_threads = min_thread_num;
18 }
19 if (start_threads > max_thread_num) {
20 start_threads = max_thread_num;
21 }
22 for (size_t i = 0; i < start_threads; ++i) {
23 CreateThread();
24 }
25 return 0;
26}
27
28int ThreadPool::Stop() {
29 if (status == STOP) {
30 return -1;
31 }
32 status = STOP;
33 task_cond.notify_all();
34 for (auto& i : threads) {
35 if (i.thread->joinable()) {
36 i.thread->join();
37 }
38 }
39 threads.clear();
40 cur_thread_num = 0;
41 idle_thread_num = 0;
42 return 0;
43}
44
45int ThreadPool::Pause() {
46 if (status == RUNNING) {
47 status = PAUSE;
48 }
49 return 0;
50}
51
52int ThreadPool::Resume() {
53 if (status == PAUSE) {
54 status = RUNNING;
55 }
56 return 0;
57}
58
59int ThreadPool::Wait() {
60 while (true) {
61 if (status == STOP || (tasks.empty() && idle_thread_num == cur_thread_num)) {
62 break;
63 }
64 std::this_thread::yield();
65 }
66 return 0;
67}
68
69bool ThreadPool::CreateThread() {
70 if (cur_thread_num >= max_thread_num) {
71 return false;
72 }
73 std::thread* thread = new std::thread([this] {
74 bool initialRun = true;
75 while (status != STOP) {
76 while (status == PAUSE) {
77 std::this_thread::yield();
78 }
79
80 Task task;
81 {
82 std::unique_lock<std::mutex> locker(task_mutex);
83 task_cond.wait_for(lock&: locker, rtime: std::chrono::milliseconds(max_idle_time), p: [this]() { return status == STOP || !tasks.empty(); });
84 if (status == STOP) {
85 return;
86 }
87 if (tasks.empty()) {
88 if (cur_thread_num > min_thread_num) {
89 DelThread(id: std::this_thread::get_id());
90 return;
91 }
92 continue;
93 }
94 if (!initialRun) {
95 --idle_thread_num;
96 }
97 task = std::move(tasks.front());
98 tasks.pop();
99 }
100 if (task) {
101 task();
102 ++idle_thread_num;
103 if (initialRun) {
104 initialRun = false;
105 }
106 }
107 }
108 });
109 AddThread(thread);
110 return true;
111}
112
113void ThreadPool::AddThread(std::thread* thread) {
114 thread_mutex.lock();
115 ++cur_thread_num;
116 ThreadData data;
117 data.thread = std::shared_ptr<std::thread>(thread);
118 data.id = thread->get_id();
119 data.status = RUNNING;
120 data.start_time = time(timer: nullptr);
121 data.stop_time = 0;
122 threads.emplace_back(args&: data);
123 thread_mutex.unlock();
124}
125
126void ThreadPool::DelThread(std::thread::id id) {
127 const time_t now = time(timer: nullptr);
128 thread_mutex.lock();
129 --cur_thread_num;
130 --idle_thread_num;
131 auto iter = threads.begin();
132 while (iter != threads.end()) {
133 if (iter->status == STOP && now > iter->stop_time) {
134 if (iter->thread->joinable()) {
135 iter->thread->join();
136 iter = threads.erase(position: iter);
137 continue;
138 }
139 } else if (iter->id == id) {
140 iter->status = STOP;
141 iter->stop_time = time(timer: nullptr);
142 }
143 ++iter;
144 }
145 thread_mutex.unlock();
146}
147
148} // namespace cpr
149