1 | #include "duckdb/parallel/task_scheduler.hpp" |
2 | |
3 | #include "duckdb/common/exception.hpp" |
4 | #include "duckdb/main/client_context.hpp" |
5 | #include "duckdb/main/database.hpp" |
6 | |
7 | #ifndef DUCKDB_NO_THREADS |
8 | #include "concurrentqueue.h" |
9 | #include "lightweightsemaphore.h" |
10 | #include "duckdb/common/thread.hpp" |
11 | #else |
12 | #include <queue> |
13 | #endif |
14 | |
15 | namespace duckdb { |
16 | |
17 | struct SchedulerThread { |
18 | #ifndef DUCKDB_NO_THREADS |
19 | explicit SchedulerThread(unique_ptr<thread> thread_p) : internal_thread(std::move(thread_p)) { |
20 | } |
21 | |
22 | unique_ptr<thread> internal_thread; |
23 | #endif |
24 | }; |
25 | |
26 | #ifndef DUCKDB_NO_THREADS |
27 | typedef duckdb_moodycamel::ConcurrentQueue<shared_ptr<Task>> concurrent_queue_t; |
28 | typedef duckdb_moodycamel::LightweightSemaphore lightweight_semaphore_t; |
29 | |
30 | struct ConcurrentQueue { |
31 | concurrent_queue_t q; |
32 | lightweight_semaphore_t semaphore; |
33 | |
34 | void Enqueue(ProducerToken &token, shared_ptr<Task> task); |
35 | bool DequeueFromProducer(ProducerToken &token, shared_ptr<Task> &task); |
36 | }; |
37 | |
38 | struct QueueProducerToken { |
39 | explicit QueueProducerToken(ConcurrentQueue &queue) : queue_token(queue.q) { |
40 | } |
41 | |
42 | duckdb_moodycamel::ProducerToken queue_token; |
43 | }; |
44 | |
45 | void ConcurrentQueue::Enqueue(ProducerToken &token, shared_ptr<Task> task) { |
46 | lock_guard<mutex> producer_lock(token.producer_lock); |
47 | if (q.enqueue(token: token.token->queue_token, item: std::move(task))) { |
48 | semaphore.signal(); |
49 | } else { |
50 | throw InternalException("Could not schedule task!" ); |
51 | } |
52 | } |
53 | |
54 | bool ConcurrentQueue::DequeueFromProducer(ProducerToken &token, shared_ptr<Task> &task) { |
55 | lock_guard<mutex> producer_lock(token.producer_lock); |
56 | return q.try_dequeue_from_producer(producer: token.token->queue_token, item&: task); |
57 | } |
58 | |
59 | #else |
60 | struct ConcurrentQueue { |
61 | std::queue<shared_ptr<Task>> q; |
62 | mutex qlock; |
63 | |
64 | void Enqueue(ProducerToken &token, shared_ptr<Task> task); |
65 | bool DequeueFromProducer(ProducerToken &token, shared_ptr<Task> &task); |
66 | }; |
67 | |
68 | void ConcurrentQueue::Enqueue(ProducerToken &token, shared_ptr<Task> task) { |
69 | lock_guard<mutex> lock(qlock); |
70 | q.push(std::move(task)); |
71 | } |
72 | |
73 | bool ConcurrentQueue::DequeueFromProducer(ProducerToken &token, shared_ptr<Task> &task) { |
74 | lock_guard<mutex> lock(qlock); |
75 | if (q.empty()) { |
76 | return false; |
77 | } |
78 | task = std::move(q.front()); |
79 | q.pop(); |
80 | return true; |
81 | } |
82 | |
83 | struct QueueProducerToken { |
84 | QueueProducerToken(ConcurrentQueue &queue) { |
85 | } |
86 | }; |
87 | #endif |
88 | |
89 | ProducerToken::ProducerToken(TaskScheduler &scheduler, unique_ptr<QueueProducerToken> token) |
90 | : scheduler(scheduler), token(std::move(token)) { |
91 | } |
92 | |
93 | ProducerToken::~ProducerToken() { |
94 | } |
95 | |
96 | TaskScheduler::TaskScheduler(DatabaseInstance &db) : db(db), queue(make_uniq<ConcurrentQueue>()) { |
97 | } |
98 | |
99 | TaskScheduler::~TaskScheduler() { |
100 | #ifndef DUCKDB_NO_THREADS |
101 | SetThreadsInternal(1); |
102 | #endif |
103 | } |
104 | |
105 | TaskScheduler &TaskScheduler::GetScheduler(ClientContext &context) { |
106 | return TaskScheduler::GetScheduler(db&: DatabaseInstance::GetDatabase(context)); |
107 | } |
108 | |
109 | TaskScheduler &TaskScheduler::GetScheduler(DatabaseInstance &db) { |
110 | return db.GetScheduler(); |
111 | } |
112 | |
113 | unique_ptr<ProducerToken> TaskScheduler::CreateProducer() { |
114 | auto token = make_uniq<QueueProducerToken>(args&: *queue); |
115 | return make_uniq<ProducerToken>(args&: *this, args: std::move(token)); |
116 | } |
117 | |
118 | void TaskScheduler::ScheduleTask(ProducerToken &token, shared_ptr<Task> task) { |
119 | // Enqueue a task for the given producer token and signal any sleeping threads |
120 | queue->Enqueue(token, task: std::move(task)); |
121 | } |
122 | |
123 | bool TaskScheduler::GetTaskFromProducer(ProducerToken &token, shared_ptr<Task> &task) { |
124 | return queue->DequeueFromProducer(token, task); |
125 | } |
126 | |
127 | void TaskScheduler::ExecuteForever(atomic<bool> *marker) { |
128 | #ifndef DUCKDB_NO_THREADS |
129 | shared_ptr<Task> task; |
130 | // loop until the marker is set to false |
131 | while (*marker) { |
132 | // wait for a signal with a timeout |
133 | queue->semaphore.wait(); |
134 | if (queue->q.try_dequeue(item&: task)) { |
135 | auto execute_result = task->Execute(mode: TaskExecutionMode::PROCESS_ALL); |
136 | |
137 | switch (execute_result) { |
138 | case TaskExecutionResult::TASK_FINISHED: |
139 | case TaskExecutionResult::TASK_ERROR: |
140 | task.reset(); |
141 | break; |
142 | case TaskExecutionResult::TASK_NOT_FINISHED: |
143 | throw InternalException("Task should not return TASK_NOT_FINISHED in PROCESS_ALL mode" ); |
144 | case TaskExecutionResult::TASK_BLOCKED: |
145 | task->Deschedule(); |
146 | task.reset(); |
147 | break; |
148 | } |
149 | } |
150 | } |
151 | #else |
152 | throw NotImplementedException("DuckDB was compiled without threads! Background thread loop is not allowed." ); |
153 | #endif |
154 | } |
155 | |
156 | idx_t TaskScheduler::ExecuteTasks(atomic<bool> *marker, idx_t max_tasks) { |
157 | #ifndef DUCKDB_NO_THREADS |
158 | idx_t completed_tasks = 0; |
159 | // loop until the marker is set to false |
160 | while (*marker && completed_tasks < max_tasks) { |
161 | shared_ptr<Task> task; |
162 | if (!queue->q.try_dequeue(item&: task)) { |
163 | return completed_tasks; |
164 | } |
165 | auto execute_result = task->Execute(mode: TaskExecutionMode::PROCESS_ALL); |
166 | |
167 | switch (execute_result) { |
168 | case TaskExecutionResult::TASK_FINISHED: |
169 | case TaskExecutionResult::TASK_ERROR: |
170 | task.reset(); |
171 | completed_tasks++; |
172 | break; |
173 | case TaskExecutionResult::TASK_NOT_FINISHED: |
174 | throw InternalException("Task should not return TASK_NOT_FINISHED in PROCESS_ALL mode" ); |
175 | case TaskExecutionResult::TASK_BLOCKED: |
176 | task->Deschedule(); |
177 | task.reset(); |
178 | break; |
179 | } |
180 | } |
181 | return completed_tasks; |
182 | #else |
183 | throw NotImplementedException("DuckDB was compiled without threads! Background thread loop is not allowed." ); |
184 | #endif |
185 | } |
186 | |
187 | void TaskScheduler::ExecuteTasks(idx_t max_tasks) { |
188 | #ifndef DUCKDB_NO_THREADS |
189 | shared_ptr<Task> task; |
190 | for (idx_t i = 0; i < max_tasks; i++) { |
191 | queue->semaphore.wait(timeout_usecs: TASK_TIMEOUT_USECS); |
192 | if (!queue->q.try_dequeue(item&: task)) { |
193 | return; |
194 | } |
195 | try { |
196 | auto execute_result = task->Execute(mode: TaskExecutionMode::PROCESS_ALL); |
197 | switch (execute_result) { |
198 | case TaskExecutionResult::TASK_FINISHED: |
199 | case TaskExecutionResult::TASK_ERROR: |
200 | task.reset(); |
201 | break; |
202 | case TaskExecutionResult::TASK_NOT_FINISHED: |
203 | throw InternalException("Task should not return TASK_NOT_FINISHED in PROCESS_ALL mode" ); |
204 | case TaskExecutionResult::TASK_BLOCKED: |
205 | task->Deschedule(); |
206 | task.reset(); |
207 | break; |
208 | } |
209 | } catch (...) { |
210 | return; |
211 | } |
212 | } |
213 | #else |
214 | throw NotImplementedException("DuckDB was compiled without threads! Background thread loop is not allowed." ); |
215 | #endif |
216 | } |
217 | |
218 | #ifndef DUCKDB_NO_THREADS |
219 | static void ThreadExecuteTasks(TaskScheduler *scheduler, atomic<bool> *marker) { |
220 | scheduler->ExecuteForever(marker); |
221 | } |
222 | #endif |
223 | |
224 | int32_t TaskScheduler::NumberOfThreads() { |
225 | lock_guard<mutex> t(thread_lock); |
226 | auto &config = DBConfig::GetConfig(db); |
227 | return threads.size() + config.options.external_threads + 1; |
228 | } |
229 | |
230 | void TaskScheduler::SetThreads(int32_t n) { |
231 | #ifndef DUCKDB_NO_THREADS |
232 | lock_guard<mutex> t(thread_lock); |
233 | if (n < 1) { |
234 | throw SyntaxException("Must have at least 1 thread!" ); |
235 | } |
236 | SetThreadsInternal(n); |
237 | #else |
238 | if (n != 1) { |
239 | throw NotImplementedException("DuckDB was compiled without threads! Setting threads > 1 is not allowed." ); |
240 | } |
241 | #endif |
242 | } |
243 | |
244 | void TaskScheduler::Signal(idx_t n) { |
245 | #ifndef DUCKDB_NO_THREADS |
246 | queue->semaphore.signal(count: n); |
247 | #endif |
248 | } |
249 | |
250 | void TaskScheduler::SetThreadsInternal(int32_t n) { |
251 | #ifndef DUCKDB_NO_THREADS |
252 | if (threads.size() == idx_t(n - 1)) { |
253 | return; |
254 | } |
255 | idx_t new_thread_count = n - 1; |
256 | if (threads.size() > new_thread_count) { |
257 | // we are reducing the number of threads: clear all threads first |
258 | for (idx_t i = 0; i < threads.size(); i++) { |
259 | *markers[i] = false; |
260 | } |
261 | Signal(n: threads.size()); |
262 | // now join the threads to ensure they are fully stopped before erasing them |
263 | for (idx_t i = 0; i < threads.size(); i++) { |
264 | threads[i]->internal_thread->join(); |
265 | } |
266 | // erase the threads/markers |
267 | threads.clear(); |
268 | markers.clear(); |
269 | } |
270 | if (threads.size() < new_thread_count) { |
271 | // we are increasing the number of threads: launch them and run tasks on them |
272 | idx_t create_new_threads = new_thread_count - threads.size(); |
273 | for (idx_t i = 0; i < create_new_threads; i++) { |
274 | // launch a thread and assign it a cancellation marker |
275 | auto marker = unique_ptr<atomic<bool>>(new atomic<bool>(true)); |
276 | auto worker_thread = make_uniq<thread>(args&: ThreadExecuteTasks, args: this, args: marker.get()); |
277 | auto thread_wrapper = make_uniq<SchedulerThread>(args: std::move(worker_thread)); |
278 | |
279 | threads.push_back(x: std::move(thread_wrapper)); |
280 | markers.push_back(x: std::move(marker)); |
281 | } |
282 | } |
283 | #endif |
284 | } |
285 | |
286 | } // namespace duckdb |
287 | |