1#include "duckdb/parallel/task_scheduler.hpp"
2
3#include "duckdb/common/exception.hpp"
4#include "duckdb/main/client_context.hpp"
5#include "duckdb/main/database.hpp"
6
7#ifndef DUCKDB_NO_THREADS
8#include "concurrentqueue.h"
9#include "lightweightsemaphore.h"
10#include "duckdb/common/thread.hpp"
11#else
12#include <queue>
13#endif
14
15namespace duckdb {
16
17struct SchedulerThread {
18#ifndef DUCKDB_NO_THREADS
19 explicit SchedulerThread(unique_ptr<thread> thread_p) : internal_thread(std::move(thread_p)) {
20 }
21
22 unique_ptr<thread> internal_thread;
23#endif
24};
25
26#ifndef DUCKDB_NO_THREADS
27typedef duckdb_moodycamel::ConcurrentQueue<shared_ptr<Task>> concurrent_queue_t;
28typedef duckdb_moodycamel::LightweightSemaphore lightweight_semaphore_t;
29
30struct ConcurrentQueue {
31 concurrent_queue_t q;
32 lightweight_semaphore_t semaphore;
33
34 void Enqueue(ProducerToken &token, shared_ptr<Task> task);
35 bool DequeueFromProducer(ProducerToken &token, shared_ptr<Task> &task);
36};
37
38struct QueueProducerToken {
39 explicit QueueProducerToken(ConcurrentQueue &queue) : queue_token(queue.q) {
40 }
41
42 duckdb_moodycamel::ProducerToken queue_token;
43};
44
45void ConcurrentQueue::Enqueue(ProducerToken &token, shared_ptr<Task> task) {
46 lock_guard<mutex> producer_lock(token.producer_lock);
47 if (q.enqueue(token: token.token->queue_token, item: std::move(task))) {
48 semaphore.signal();
49 } else {
50 throw InternalException("Could not schedule task!");
51 }
52}
53
54bool ConcurrentQueue::DequeueFromProducer(ProducerToken &token, shared_ptr<Task> &task) {
55 lock_guard<mutex> producer_lock(token.producer_lock);
56 return q.try_dequeue_from_producer(producer: token.token->queue_token, item&: task);
57}
58
59#else
60struct ConcurrentQueue {
61 std::queue<shared_ptr<Task>> q;
62 mutex qlock;
63
64 void Enqueue(ProducerToken &token, shared_ptr<Task> task);
65 bool DequeueFromProducer(ProducerToken &token, shared_ptr<Task> &task);
66};
67
68void ConcurrentQueue::Enqueue(ProducerToken &token, shared_ptr<Task> task) {
69 lock_guard<mutex> lock(qlock);
70 q.push(std::move(task));
71}
72
73bool ConcurrentQueue::DequeueFromProducer(ProducerToken &token, shared_ptr<Task> &task) {
74 lock_guard<mutex> lock(qlock);
75 if (q.empty()) {
76 return false;
77 }
78 task = std::move(q.front());
79 q.pop();
80 return true;
81}
82
83struct QueueProducerToken {
84 QueueProducerToken(ConcurrentQueue &queue) {
85 }
86};
87#endif
88
89ProducerToken::ProducerToken(TaskScheduler &scheduler, unique_ptr<QueueProducerToken> token)
90 : scheduler(scheduler), token(std::move(token)) {
91}
92
93ProducerToken::~ProducerToken() {
94}
95
96TaskScheduler::TaskScheduler(DatabaseInstance &db) : db(db), queue(make_uniq<ConcurrentQueue>()) {
97}
98
99TaskScheduler::~TaskScheduler() {
100#ifndef DUCKDB_NO_THREADS
101 SetThreadsInternal(1);
102#endif
103}
104
105TaskScheduler &TaskScheduler::GetScheduler(ClientContext &context) {
106 return TaskScheduler::GetScheduler(db&: DatabaseInstance::GetDatabase(context));
107}
108
109TaskScheduler &TaskScheduler::GetScheduler(DatabaseInstance &db) {
110 return db.GetScheduler();
111}
112
113unique_ptr<ProducerToken> TaskScheduler::CreateProducer() {
114 auto token = make_uniq<QueueProducerToken>(args&: *queue);
115 return make_uniq<ProducerToken>(args&: *this, args: std::move(token));
116}
117
118void TaskScheduler::ScheduleTask(ProducerToken &token, shared_ptr<Task> task) {
119 // Enqueue a task for the given producer token and signal any sleeping threads
120 queue->Enqueue(token, task: std::move(task));
121}
122
123bool TaskScheduler::GetTaskFromProducer(ProducerToken &token, shared_ptr<Task> &task) {
124 return queue->DequeueFromProducer(token, task);
125}
126
127void TaskScheduler::ExecuteForever(atomic<bool> *marker) {
128#ifndef DUCKDB_NO_THREADS
129 shared_ptr<Task> task;
130 // loop until the marker is set to false
131 while (*marker) {
132 // wait for a signal with a timeout
133 queue->semaphore.wait();
134 if (queue->q.try_dequeue(item&: task)) {
135 auto execute_result = task->Execute(mode: TaskExecutionMode::PROCESS_ALL);
136
137 switch (execute_result) {
138 case TaskExecutionResult::TASK_FINISHED:
139 case TaskExecutionResult::TASK_ERROR:
140 task.reset();
141 break;
142 case TaskExecutionResult::TASK_NOT_FINISHED:
143 throw InternalException("Task should not return TASK_NOT_FINISHED in PROCESS_ALL mode");
144 case TaskExecutionResult::TASK_BLOCKED:
145 task->Deschedule();
146 task.reset();
147 break;
148 }
149 }
150 }
151#else
152 throw NotImplementedException("DuckDB was compiled without threads! Background thread loop is not allowed.");
153#endif
154}
155
156idx_t TaskScheduler::ExecuteTasks(atomic<bool> *marker, idx_t max_tasks) {
157#ifndef DUCKDB_NO_THREADS
158 idx_t completed_tasks = 0;
159 // loop until the marker is set to false
160 while (*marker && completed_tasks < max_tasks) {
161 shared_ptr<Task> task;
162 if (!queue->q.try_dequeue(item&: task)) {
163 return completed_tasks;
164 }
165 auto execute_result = task->Execute(mode: TaskExecutionMode::PROCESS_ALL);
166
167 switch (execute_result) {
168 case TaskExecutionResult::TASK_FINISHED:
169 case TaskExecutionResult::TASK_ERROR:
170 task.reset();
171 completed_tasks++;
172 break;
173 case TaskExecutionResult::TASK_NOT_FINISHED:
174 throw InternalException("Task should not return TASK_NOT_FINISHED in PROCESS_ALL mode");
175 case TaskExecutionResult::TASK_BLOCKED:
176 task->Deschedule();
177 task.reset();
178 break;
179 }
180 }
181 return completed_tasks;
182#else
183 throw NotImplementedException("DuckDB was compiled without threads! Background thread loop is not allowed.");
184#endif
185}
186
187void TaskScheduler::ExecuteTasks(idx_t max_tasks) {
188#ifndef DUCKDB_NO_THREADS
189 shared_ptr<Task> task;
190 for (idx_t i = 0; i < max_tasks; i++) {
191 queue->semaphore.wait(timeout_usecs: TASK_TIMEOUT_USECS);
192 if (!queue->q.try_dequeue(item&: task)) {
193 return;
194 }
195 try {
196 auto execute_result = task->Execute(mode: TaskExecutionMode::PROCESS_ALL);
197 switch (execute_result) {
198 case TaskExecutionResult::TASK_FINISHED:
199 case TaskExecutionResult::TASK_ERROR:
200 task.reset();
201 break;
202 case TaskExecutionResult::TASK_NOT_FINISHED:
203 throw InternalException("Task should not return TASK_NOT_FINISHED in PROCESS_ALL mode");
204 case TaskExecutionResult::TASK_BLOCKED:
205 task->Deschedule();
206 task.reset();
207 break;
208 }
209 } catch (...) {
210 return;
211 }
212 }
213#else
214 throw NotImplementedException("DuckDB was compiled without threads! Background thread loop is not allowed.");
215#endif
216}
217
218#ifndef DUCKDB_NO_THREADS
219static void ThreadExecuteTasks(TaskScheduler *scheduler, atomic<bool> *marker) {
220 scheduler->ExecuteForever(marker);
221}
222#endif
223
224int32_t TaskScheduler::NumberOfThreads() {
225 lock_guard<mutex> t(thread_lock);
226 auto &config = DBConfig::GetConfig(db);
227 return threads.size() + config.options.external_threads + 1;
228}
229
230void TaskScheduler::SetThreads(int32_t n) {
231#ifndef DUCKDB_NO_THREADS
232 lock_guard<mutex> t(thread_lock);
233 if (n < 1) {
234 throw SyntaxException("Must have at least 1 thread!");
235 }
236 SetThreadsInternal(n);
237#else
238 if (n != 1) {
239 throw NotImplementedException("DuckDB was compiled without threads! Setting threads > 1 is not allowed.");
240 }
241#endif
242}
243
244void TaskScheduler::Signal(idx_t n) {
245#ifndef DUCKDB_NO_THREADS
246 queue->semaphore.signal(count: n);
247#endif
248}
249
250void TaskScheduler::SetThreadsInternal(int32_t n) {
251#ifndef DUCKDB_NO_THREADS
252 if (threads.size() == idx_t(n - 1)) {
253 return;
254 }
255 idx_t new_thread_count = n - 1;
256 if (threads.size() > new_thread_count) {
257 // we are reducing the number of threads: clear all threads first
258 for (idx_t i = 0; i < threads.size(); i++) {
259 *markers[i] = false;
260 }
261 Signal(n: threads.size());
262 // now join the threads to ensure they are fully stopped before erasing them
263 for (idx_t i = 0; i < threads.size(); i++) {
264 threads[i]->internal_thread->join();
265 }
266 // erase the threads/markers
267 threads.clear();
268 markers.clear();
269 }
270 if (threads.size() < new_thread_count) {
271 // we are increasing the number of threads: launch them and run tasks on them
272 idx_t create_new_threads = new_thread_count - threads.size();
273 for (idx_t i = 0; i < create_new_threads; i++) {
274 // launch a thread and assign it a cancellation marker
275 auto marker = unique_ptr<atomic<bool>>(new atomic<bool>(true));
276 auto worker_thread = make_uniq<thread>(args&: ThreadExecuteTasks, args: this, args: marker.get());
277 auto thread_wrapper = make_uniq<SchedulerThread>(args: std::move(worker_thread));
278
279 threads.push_back(x: std::move(thread_wrapper));
280 markers.push_back(x: std::move(marker));
281 }
282 }
283#endif
284}
285
286} // namespace duckdb
287