1 | #include <Common/ThreadPool.h> |
2 | #include <Common/Exception.h> |
3 | |
4 | #include <type_traits> |
5 | |
6 | |
/// Forward declaration of the error code thrown by this file when a task
/// cannot be scheduled. The constant is only declared here (`extern`).
namespace DB::ErrorCodes
{
    extern const int CANNOT_SCHEDULE_TASK;
}
14 | |
/// Metrics referenced by the worker loop in this file. They are only declared
/// here (`extern`).
namespace CurrentMetrics
{
    /// Incremented for the whole lifetime of a worker when Thread is std::thread.
    extern const Metric GlobalThread;
    /// Incremented while a job is being executed when Thread is std::thread.
    extern const Metric GlobalThreadActive;
    /// Incremented for the whole lifetime of a worker for any other Thread type.
    extern const Metric LocalThread;
    /// Incremented while a job is being executed for any other Thread type.
    extern const Metric LocalThreadActive;
}
22 | |
23 | |
24 | template <typename Thread> |
25 | ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads_) |
26 | : ThreadPoolImpl(max_threads_, max_threads_, max_threads_) |
27 | { |
28 | } |
29 | |
30 | template <typename Thread> |
31 | ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_) |
32 | : max_threads(max_threads_), max_free_threads(max_free_threads_), queue_size(queue_size_) |
33 | { |
34 | } |
35 | |
36 | template <typename Thread> |
37 | void ThreadPoolImpl<Thread>::setMaxThreads(size_t value) |
38 | { |
39 | std::lock_guard lock(mutex); |
40 | max_threads = value; |
41 | } |
42 | |
43 | template <typename Thread> |
44 | void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value) |
45 | { |
46 | std::lock_guard lock(mutex); |
47 | max_free_threads = value; |
48 | } |
49 | |
50 | template <typename Thread> |
51 | void ThreadPoolImpl<Thread>::setQueueSize(size_t value) |
52 | { |
53 | std::lock_guard lock(mutex); |
54 | queue_size = value; |
55 | } |
56 | |
57 | |
58 | template <typename Thread> |
59 | template <typename ReturnType> |
60 | ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds) |
61 | { |
62 | auto on_error = [&] |
63 | { |
64 | if constexpr (std::is_same_v<ReturnType, void>) |
65 | { |
66 | if (first_exception) |
67 | { |
68 | std::exception_ptr exception; |
69 | std::swap(exception, first_exception); |
70 | std::rethrow_exception(exception); |
71 | } |
72 | throw DB::Exception("Cannot schedule a task" , DB::ErrorCodes::CANNOT_SCHEDULE_TASK); |
73 | } |
74 | else |
75 | return false; |
76 | }; |
77 | |
78 | { |
79 | std::unique_lock lock(mutex); |
80 | |
81 | auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; }; |
82 | |
83 | if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero. |
84 | { |
85 | if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred)) |
86 | return on_error(); |
87 | } |
88 | else |
89 | job_finished.wait(lock, pred); |
90 | |
91 | if (shutdown) |
92 | return on_error(); |
93 | |
94 | jobs.emplace(std::move(job), priority); |
95 | ++scheduled_jobs; |
96 | |
97 | if (threads.size() < std::min(max_threads, scheduled_jobs)) |
98 | { |
99 | threads.emplace_front(); |
100 | try |
101 | { |
102 | threads.front() = Thread([this, it = threads.begin()] { worker(it); }); |
103 | } |
104 | catch (...) |
105 | { |
106 | threads.pop_front(); |
107 | |
108 | /// Remove the job and return error to caller. |
109 | /// Note that if we have allocated at least one thread, we may continue |
110 | /// (one thread is enough to process all jobs). |
111 | /// But this condition indicate an error nevertheless and better to refuse. |
112 | |
113 | jobs.pop(); |
114 | --scheduled_jobs; |
115 | return on_error(); |
116 | } |
117 | } |
118 | } |
119 | new_job_or_shutdown.notify_one(); |
120 | return ReturnType(true); |
121 | } |
122 | |
123 | template <typename Thread> |
124 | void ThreadPoolImpl<Thread>::scheduleOrThrowOnError(Job job, int priority) |
125 | { |
126 | scheduleImpl<void>(std::move(job), priority, std::nullopt); |
127 | } |
128 | |
129 | template <typename Thread> |
130 | bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_microseconds) noexcept |
131 | { |
132 | return scheduleImpl<bool>(std::move(job), priority, wait_microseconds); |
133 | } |
134 | |
135 | template <typename Thread> |
136 | void ThreadPoolImpl<Thread>::scheduleOrThrow(Job job, int priority, uint64_t wait_microseconds) |
137 | { |
138 | scheduleImpl<void>(std::move(job), priority, wait_microseconds); |
139 | } |
140 | |
141 | template <typename Thread> |
142 | void ThreadPoolImpl<Thread>::wait() |
143 | { |
144 | { |
145 | std::unique_lock lock(mutex); |
146 | job_finished.wait(lock, [this] { return scheduled_jobs == 0; }); |
147 | |
148 | if (first_exception) |
149 | { |
150 | std::exception_ptr exception; |
151 | std::swap(exception, first_exception); |
152 | std::rethrow_exception(exception); |
153 | } |
154 | } |
155 | } |
156 | |
157 | template <typename Thread> |
158 | ThreadPoolImpl<Thread>::~ThreadPoolImpl() |
159 | { |
160 | finalize(); |
161 | } |
162 | |
163 | template <typename Thread> |
164 | void ThreadPoolImpl<Thread>::finalize() |
165 | { |
166 | { |
167 | std::unique_lock lock(mutex); |
168 | shutdown = true; |
169 | } |
170 | |
171 | new_job_or_shutdown.notify_all(); |
172 | |
173 | for (auto & thread : threads) |
174 | thread.join(); |
175 | |
176 | threads.clear(); |
177 | } |
178 | |
179 | template <typename Thread> |
180 | size_t ThreadPoolImpl<Thread>::active() const |
181 | { |
182 | std::unique_lock lock(mutex); |
183 | return scheduled_jobs; |
184 | } |
185 | |
186 | template <typename Thread> |
187 | void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it) |
188 | { |
189 | CurrentMetrics::Increment metric_all_threads( |
190 | std::is_same_v<Thread, std::thread> ? CurrentMetrics::GlobalThread : CurrentMetrics::LocalThread); |
191 | |
192 | while (true) |
193 | { |
194 | Job job; |
195 | bool need_shutdown = false; |
196 | |
197 | { |
198 | std::unique_lock lock(mutex); |
199 | new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); }); |
200 | need_shutdown = shutdown; |
201 | |
202 | if (!jobs.empty()) |
203 | { |
204 | job = jobs.top().job; |
205 | jobs.pop(); |
206 | } |
207 | else |
208 | { |
209 | /// shutdown is true, simply finish the thread. |
210 | return; |
211 | } |
212 | } |
213 | |
214 | if (!need_shutdown) |
215 | { |
216 | try |
217 | { |
218 | CurrentMetrics::Increment metric_active_threads( |
219 | std::is_same_v<Thread, std::thread> ? CurrentMetrics::GlobalThreadActive : CurrentMetrics::LocalThreadActive); |
220 | |
221 | job(); |
222 | } |
223 | catch (...) |
224 | { |
225 | { |
226 | std::unique_lock lock(mutex); |
227 | if (!first_exception) |
228 | first_exception = std::current_exception(); |
229 | shutdown = true; |
230 | --scheduled_jobs; |
231 | } |
232 | job_finished.notify_all(); |
233 | new_job_or_shutdown.notify_all(); |
234 | return; |
235 | } |
236 | } |
237 | |
238 | { |
239 | std::unique_lock lock(mutex); |
240 | --scheduled_jobs; |
241 | |
242 | if (threads.size() > scheduled_jobs + max_free_threads) |
243 | { |
244 | thread_it->detach(); |
245 | threads.erase(thread_it); |
246 | job_finished.notify_all(); |
247 | return; |
248 | } |
249 | } |
250 | |
251 | job_finished.notify_all(); |
252 | } |
253 | } |
254 | |
255 | |
/// Explicit instantiations of the pool for the two thread types used with it,
/// so that the member definitions above are instantiated in this file.
template class ThreadPoolImpl<std::thread>;
template class ThreadPoolImpl<ThreadFromGlobalPool>;
258 | |
259 | |
260 | void ExceptionHandler::setException(std::exception_ptr && exception) |
261 | { |
262 | std::unique_lock lock(mutex); |
263 | if (!first_exception) |
264 | first_exception = std::move(exception); |
265 | } |
266 | |
267 | void ExceptionHandler::throwIfException() |
268 | { |
269 | std::unique_lock lock(mutex); |
270 | if (first_exception) |
271 | std::rethrow_exception(first_exception); |
272 | } |
273 | |
274 | |
275 | ThreadPool::Job createExceptionHandledJob(ThreadPool::Job job, ExceptionHandler & handler) |
276 | { |
277 | return [job{std::move(job)}, &handler] () |
278 | { |
279 | try |
280 | { |
281 | job(); |
282 | } |
283 | catch (...) |
284 | { |
285 | handler.setException(std::current_exception()); |
286 | } |
287 | }; |
288 | } |
289 | |
290 | GlobalThreadPool & GlobalThreadPool::instance() |
291 | { |
292 | static GlobalThreadPool ret; |
293 | return ret; |
294 | } |
295 | |