1#include <Common/ThreadPool.h>
2#include <Common/Exception.h>
3
4#include <type_traits>
5
6
/// Error codes used in this file; the constants themselves are defined elsewhere.
namespace DB::ErrorCodes
{
    extern const int CANNOT_SCHEDULE_TASK;
}
14
15namespace CurrentMetrics
16{
17 extern const Metric GlobalThread;
18 extern const Metric GlobalThreadActive;
19 extern const Metric LocalThread;
20 extern const Metric LocalThreadActive;
21}
22
23
24template <typename Thread>
25ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads_)
26 : ThreadPoolImpl(max_threads_, max_threads_, max_threads_)
27{
28}
29
30template <typename Thread>
31ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_)
32 : max_threads(max_threads_), max_free_threads(max_free_threads_), queue_size(queue_size_)
33{
34}
35
36template <typename Thread>
37void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
38{
39 std::lock_guard lock(mutex);
40 max_threads = value;
41}
42
43template <typename Thread>
44void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
45{
46 std::lock_guard lock(mutex);
47 max_free_threads = value;
48}
49
50template <typename Thread>
51void ThreadPoolImpl<Thread>::setQueueSize(size_t value)
52{
53 std::lock_guard lock(mutex);
54 queue_size = value;
55}
56
57
58template <typename Thread>
59template <typename ReturnType>
60ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds)
61{
62 auto on_error = [&]
63 {
64 if constexpr (std::is_same_v<ReturnType, void>)
65 {
66 if (first_exception)
67 {
68 std::exception_ptr exception;
69 std::swap(exception, first_exception);
70 std::rethrow_exception(exception);
71 }
72 throw DB::Exception("Cannot schedule a task", DB::ErrorCodes::CANNOT_SCHEDULE_TASK);
73 }
74 else
75 return false;
76 };
77
78 {
79 std::unique_lock lock(mutex);
80
81 auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };
82
83 if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero.
84 {
85 if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred))
86 return on_error();
87 }
88 else
89 job_finished.wait(lock, pred);
90
91 if (shutdown)
92 return on_error();
93
94 jobs.emplace(std::move(job), priority);
95 ++scheduled_jobs;
96
97 if (threads.size() < std::min(max_threads, scheduled_jobs))
98 {
99 threads.emplace_front();
100 try
101 {
102 threads.front() = Thread([this, it = threads.begin()] { worker(it); });
103 }
104 catch (...)
105 {
106 threads.pop_front();
107
108 /// Remove the job and return error to caller.
109 /// Note that if we have allocated at least one thread, we may continue
110 /// (one thread is enough to process all jobs).
111 /// But this condition indicate an error nevertheless and better to refuse.
112
113 jobs.pop();
114 --scheduled_jobs;
115 return on_error();
116 }
117 }
118 }
119 new_job_or_shutdown.notify_one();
120 return ReturnType(true);
121}
122
123template <typename Thread>
124void ThreadPoolImpl<Thread>::scheduleOrThrowOnError(Job job, int priority)
125{
126 scheduleImpl<void>(std::move(job), priority, std::nullopt);
127}
128
129template <typename Thread>
130bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_microseconds) noexcept
131{
132 return scheduleImpl<bool>(std::move(job), priority, wait_microseconds);
133}
134
135template <typename Thread>
136void ThreadPoolImpl<Thread>::scheduleOrThrow(Job job, int priority, uint64_t wait_microseconds)
137{
138 scheduleImpl<void>(std::move(job), priority, wait_microseconds);
139}
140
141template <typename Thread>
142void ThreadPoolImpl<Thread>::wait()
143{
144 {
145 std::unique_lock lock(mutex);
146 job_finished.wait(lock, [this] { return scheduled_jobs == 0; });
147
148 if (first_exception)
149 {
150 std::exception_ptr exception;
151 std::swap(exception, first_exception);
152 std::rethrow_exception(exception);
153 }
154 }
155}
156
157template <typename Thread>
158ThreadPoolImpl<Thread>::~ThreadPoolImpl()
159{
160 finalize();
161}
162
163template <typename Thread>
164void ThreadPoolImpl<Thread>::finalize()
165{
166 {
167 std::unique_lock lock(mutex);
168 shutdown = true;
169 }
170
171 new_job_or_shutdown.notify_all();
172
173 for (auto & thread : threads)
174 thread.join();
175
176 threads.clear();
177}
178
179template <typename Thread>
180size_t ThreadPoolImpl<Thread>::active() const
181{
182 std::unique_lock lock(mutex);
183 return scheduled_jobs;
184}
185
186template <typename Thread>
187void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)
188{
189 CurrentMetrics::Increment metric_all_threads(
190 std::is_same_v<Thread, std::thread> ? CurrentMetrics::GlobalThread : CurrentMetrics::LocalThread);
191
192 while (true)
193 {
194 Job job;
195 bool need_shutdown = false;
196
197 {
198 std::unique_lock lock(mutex);
199 new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); });
200 need_shutdown = shutdown;
201
202 if (!jobs.empty())
203 {
204 job = jobs.top().job;
205 jobs.pop();
206 }
207 else
208 {
209 /// shutdown is true, simply finish the thread.
210 return;
211 }
212 }
213
214 if (!need_shutdown)
215 {
216 try
217 {
218 CurrentMetrics::Increment metric_active_threads(
219 std::is_same_v<Thread, std::thread> ? CurrentMetrics::GlobalThreadActive : CurrentMetrics::LocalThreadActive);
220
221 job();
222 }
223 catch (...)
224 {
225 {
226 std::unique_lock lock(mutex);
227 if (!first_exception)
228 first_exception = std::current_exception();
229 shutdown = true;
230 --scheduled_jobs;
231 }
232 job_finished.notify_all();
233 new_job_or_shutdown.notify_all();
234 return;
235 }
236 }
237
238 {
239 std::unique_lock lock(mutex);
240 --scheduled_jobs;
241
242 if (threads.size() > scheduled_jobs + max_free_threads)
243 {
244 thread_it->detach();
245 threads.erase(thread_it);
246 job_finished.notify_all();
247 return;
248 }
249 }
250
251 job_finished.notify_all();
252 }
253}
254
255
/// Explicit instantiations of the pool for the two thread types used with it.
template class ThreadPoolImpl<std::thread>;
template class ThreadPoolImpl<ThreadFromGlobalPool>;
258
259
260void ExceptionHandler::setException(std::exception_ptr && exception)
261{
262 std::unique_lock lock(mutex);
263 if (!first_exception)
264 first_exception = std::move(exception);
265}
266
267void ExceptionHandler::throwIfException()
268{
269 std::unique_lock lock(mutex);
270 if (first_exception)
271 std::rethrow_exception(first_exception);
272}
273
274
275ThreadPool::Job createExceptionHandledJob(ThreadPool::Job job, ExceptionHandler & handler)
276{
277 return [job{std::move(job)}, &handler] ()
278 {
279 try
280 {
281 job();
282 }
283 catch (...)
284 {
285 handler.setException(std::current_exception());
286 }
287 };
288}
289
290GlobalThreadPool & GlobalThreadPool::instance()
291{
292 static GlobalThreadPool ret;
293 return ret;
294}
295