1#pragma once
2
#include <condition_variable>
#include <cstdint>
#include <exception>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <tuple>
#include <utility>

#include <Poco/Event.h>
#include <Common/ThreadStatus.h>
14
15
/** Very simple thread pool similar to boost::threadpool.
  * Advantages:
  * - catches exceptions and rethrows on wait.
  *
  * This thread pool can be used as a task queue.
  * For example, you can create a thread pool with 10 threads (and queue of size 10) and schedule 1000 tasks
  * - in this case you will be blocked to keep 10 tasks in flight.
  *
  * Thread: std::thread or something with identical interface.
  */
template <typename Thread>
class ThreadPoolImpl
{
public:
    using Job = std::function<void()>;

    /// Size is constant. Up to max_threads_ threads are created on demand and then run until shutdown.
    explicit ThreadPoolImpl(size_t max_threads_);

    /// queue_size_ - maximum number of running plus scheduled jobs. It can be greater than max_threads_.
    /// Zero means unlimited.
    ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_);

    /// Add a new job. Blocks until the number of scheduled jobs is less than the maximum
    /// or until an exception has been thrown in one of the threads.
    /// If any thread has thrown an exception, the first exception will be rethrown from this method,
    /// and the exception will be cleared.
    /// Also throws an exception if a thread cannot be created.
    /// Priority: greater is higher.
    /// NOTE: Probably you should call wait() if an exception was thrown. If some previously scheduled jobs
    /// are using objects located on the stack of the current thread, the stack must not be unwound
    /// until all jobs have finished. However, if ThreadPool is a local object, it will wait
    /// for all scheduled jobs in its own destructor.
    void scheduleOrThrowOnError(Job job, int priority = 0);

    /// Similar to scheduleOrThrowOnError(...). Wait for the specified amount of time and schedule a job or return false.
    bool trySchedule(Job job, int priority = 0, uint64_t wait_microseconds = 0) noexcept;

    /// Similar to scheduleOrThrowOnError(...). Wait for the specified amount of time and schedule a job or throw an exception.
    void scheduleOrThrow(Job job, int priority = 0, uint64_t wait_microseconds = 0);

    /// Wait for all currently active jobs to be done.
    /// You may call schedule and wait many times in arbitrary order.
    /// If any thread has thrown an exception, the first exception will be rethrown from this method,
    /// and the exception will be cleared.
    void wait();

    /// Waits for all threads. Doesn't rethrow exceptions (use the 'wait' method to rethrow exceptions).
    /// You should not destroy the object while schedule or wait methods are being called from other threads.
    ~ThreadPoolImpl();

    /// Returns the number of running and scheduled jobs.
    size_t active() const;

    /// Setters for the limits given to the constructor.
    /// NOTE(review): presumably they update max_threads / max_free_threads / queue_size below;
    /// the definitions are not in this header.
    void setMaxThreads(size_t value);
    void setMaxFreeThreads(size_t value);
    void setQueueSize(size_t value);

private:
    /// Declared mutable so that it can be locked from const methods such as active().
    mutable std::mutex mutex;
    std::condition_variable job_finished;
    std::condition_variable new_job_or_shutdown;

    size_t max_threads;
    size_t max_free_threads;
    size_t queue_size;

    size_t scheduled_jobs = 0;
    bool shutdown = false;

    /// A job together with its scheduling priority.
    struct JobWithPriority
    {
        Job job;
        int priority;

        /// job_ is a by-value sink parameter: it is moved into place so that
        /// the std::function is not copied a second time.
        JobWithPriority(Job job_, int priority_)
            : job(std::move(job_)), priority(priority_) {}

        /// Compares by priority only. std::priority_queue keeps the greatest element
        /// (according to operator<) on top, so a job with greater priority comes first.
        bool operator< (const JobWithPriority & rhs) const
        {
            return priority < rhs.priority;
        }
    };

    std::priority_queue<JobWithPriority> jobs;
    std::list<Thread> threads;
    std::exception_ptr first_exception;


    /// Common implementation behind the schedule* methods above.
    /// NOTE(review): presumably ReturnType selects between returning a flag and throwing,
    /// and an empty wait_microseconds means waiting without a timeout - confirm in the definition.
    template <typename ReturnType>
    ReturnType scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds);

    /// Body of a pool thread; thread_it points to the thread's own element in `threads`.
    void worker(typename std::list<Thread>::iterator thread_it);

    void finalize();
};
109
110
/// Thread pool whose threads are plain std::thread objects (ThreadPoolImpl with Thread = std::thread).
using FreeThreadPool = ThreadPoolImpl<std::thread>;
113
114
115/** Global ThreadPool that can be used as a singleton.
116 * Why it is needed?
117 *
118 * Linux can create and destroy about 100 000 threads per second (quite good).
119 * With simple ThreadPool (based on mutex and condvar) you can assign about 200 000 tasks per second
120 * - not much difference comparing to not using a thread pool at all.
121 *
122 * But if you reuse OS threads instead of creating and destroying them, several benefits exist:
123 * - allocator performance will usually be better due to reuse of thread local caches, especially for jemalloc:
124 * https://github.com/jemalloc/jemalloc/issues/1347
125 * - address sanitizer and thread sanitizer will not fail due to global limit on number of created threads.
126 * - program will work faster in gdb;
127 */
128class GlobalThreadPool : public FreeThreadPool, private boost::noncopyable
129{
130public:
131 GlobalThreadPool() : FreeThreadPool(10000, 1000, 10000) {}
132 static GlobalThreadPool & instance();
133};
134
135
136/** Looks like std::thread but allocates threads in GlobalThreadPool.
137 * Also holds ThreadStatus for ClickHouse.
138 */
139class ThreadFromGlobalPool
140{
141public:
142 ThreadFromGlobalPool() {}
143
144 template <typename Function, typename... Args>
145 explicit ThreadFromGlobalPool(Function && func, Args &&... args)
146 : state(std::make_shared<Poco::Event>())
147 {
148 /// NOTE: If this will throw an exception, the destructor won't be called.
149 GlobalThreadPool::instance().scheduleOrThrow([
150 state = state,
151 func = std::forward<Function>(func),
152 args = std::make_tuple(std::forward<Args>(args)...)]
153 {
154 {
155 DB::ThreadStatus thread_status;
156 std::apply(func, args);
157 }
158 state->set();
159 });
160 }
161
162 ThreadFromGlobalPool(ThreadFromGlobalPool && rhs)
163 {
164 *this = std::move(rhs);
165 }
166
167 ThreadFromGlobalPool & operator=(ThreadFromGlobalPool && rhs)
168 {
169 if (joinable())
170 std::terminate();
171 state = std::move(rhs.state);
172 return *this;
173 }
174
175 ~ThreadFromGlobalPool()
176 {
177 if (joinable())
178 std::terminate();
179 }
180
181 void join()
182 {
183 if (!joinable())
184 std::terminate();
185
186 state->wait();
187 state.reset();
188 }
189
190 void detach()
191 {
192 if (!joinable())
193 std::terminate();
194 state.reset();
195 }
196
197 bool joinable() const
198 {
199 return state != nullptr;
200 }
201
202private:
203 /// The state used in this object and inside the thread job.
204 std::shared_ptr<Poco::Event> state;
205};
206
207
/// Recommended thread pool for the case when multiple thread pools are created and destroyed.
/// Its threads are ThreadFromGlobalPool objects (ThreadPoolImpl with Thread = ThreadFromGlobalPool).
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPool>;
210
211
/// Allows saving the first caught exception in jobs and postponing its rethrow.
class ExceptionHandler
{
public:
    /// Hands an exception over to the handler.
    /// NOTE(review): judging by the class comment and `first_exception`, presumably only the first
    /// exception is kept - confirm in the definition, which is not in this header.
    void setException(std::exception_ptr && exception);

    /// NOTE(review): presumably rethrows the stored exception, if there is one - confirm in the definition.
    void throwIfException();

private:
    std::exception_ptr first_exception;
    /// NOTE(review): presumably protects first_exception against concurrent access from jobs - confirm in the definition.
    std::mutex mutex;
};
223
/// NOTE(review): presumably returns a job that runs `job` and hands any exception it throws to `handler`
/// instead of letting it escape - the definition is not in this header, confirm there.
ThreadPool::Job createExceptionHandledJob(ThreadPool::Job job, ExceptionHandler & handler);
225