/**************************************************************************/
/*  worker_thread_pool.cpp                                                */
/**************************************************************************/
/*                         This file is part of:                          */
/*                             GODOT ENGINE                               */
/*                        https://godotengine.org                         */
/**************************************************************************/
/* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */
/* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur.                  */
/*                                                                        */
/* Permission is hereby granted, free of charge, to any person obtaining  */
/* a copy of this software and associated documentation files (the        */
/* "Software"), to deal in the Software without restriction, including    */
/* without limitation the rights to use, copy, modify, merge, publish,    */
/* distribute, sublicense, and/or sell copies of the Software, and to     */
/* permit persons to whom the Software is furnished to do so, subject to  */
/* the following conditions:                                              */
/*                                                                        */
/* The above copyright notice and this permission notice shall be         */
/* included in all copies or substantial portions of the Software.        */
/*                                                                        */
/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,        */
/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF     */
/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */
/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY   */
/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,   */
/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE      */
/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                 */
/**************************************************************************/

#include "worker_thread_pool.h"

#include "core/os/os.h"
#include "core/os/thread_safe.h"

void WorkerThreadPool::Task::free_template_userdata() {
	ERR_FAIL_NULL(template_userdata);
	ERR_FAIL_NULL(native_func_userdata);
	BaseTemplateUserdata *btu = (BaseTemplateUserdata *)native_func_userdata;
	memdelete(btu);
}

WorkerThreadPool *WorkerThreadPool::singleton = nullptr;

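// Pops exactly one task from the regular queue and runs it. Every post of
// task_available_semaphore corresponds to one queued task, so the queue is
// guaranteed to be non-empty when a woken thread reaches this point.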
void WorkerThreadPool::_process_task_queue() {
	task_mutex.lock();
	Task *task = task_queue.first()->self();
	task_queue.remove(task_queue.first());
	task_mutex.unlock();
	_process_task(task);
}

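// Runs a task to completion on the calling thread. This is the common path for
// pool threads, native low-priority threads, and the no-worker-threads fallback.
// Group tasks loop here claiming element indices until the group is exhausted;
// single tasks run their payload once and then notify any waiters.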
void WorkerThreadPool::_process_task(Task *p_task) {
	bool low_priority = p_task->low_priority;
	int pool_thread_index = -1;
	Task *prev_low_prio_task = nullptr; // In case this is recursively called.

	if (!use_native_low_priority_threads) {
		// Tasks must start with this unset. They are free to set-and-forget otherwise.
		set_current_thread_safe_for_nodes(false);
		pool_thread_index = thread_ids[Thread::get_caller_id()];
		ThreadData &curr_thread = threads[pool_thread_index];
		task_mutex.lock();
		p_task->pool_thread_index = pool_thread_index;
		if (low_priority) {
			low_priority_tasks_running++;
			prev_low_prio_task = curr_thread.current_low_prio_task;
			curr_thread.current_low_prio_task = p_task;
		} else {
			curr_thread.current_low_prio_task = nullptr;
		}
		task_mutex.unlock();
	}

	if (p_task->group) {
		// Handling a group
		bool do_post = false;
		Callable::CallError ce;
		Variant ret;
		Variant arg;
		Variant *argptr = &arg;

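		// Claim element indices atomically until the whole range has been handed
		// out. Every task of the group runs this same loop, so the work gets
		// distributed dynamically among the participating threads.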
		while (true) {
			uint32_t work_index = p_task->group->index.postincrement();

			if (work_index >= p_task->group->max) {
				break;
			}
			if (p_task->native_group_func) {
				p_task->native_group_func(p_task->native_func_userdata, work_index);
			} else if (p_task->template_userdata) {
				p_task->template_userdata->callback_indexed(work_index);
			} else {
				arg = work_index;
				p_task->callable.callp((const Variant **)&argptr, 1, ret, ce);
			}

			// This is the only way to ensure posting is done when all tasks are really complete.
			uint32_t completed_amount = p_task->group->completed_index.increment();

			if (completed_amount == p_task->group->max) {
				do_post = true;
			}
		}

		if (do_post && p_task->template_userdata) {
			memdelete(p_task->template_userdata); // This is no longer needed at this point, so get rid of it.
		}

		if (low_priority && use_native_low_priority_threads) {
			p_task->completed = true;
			p_task->done_semaphore.post();
			if (do_post) {
				p_task->group->completed.set_to(true);
			}
		} else {
			if (do_post) {
				p_task->group->done_semaphore.post();
				p_task->group->completed.set_to(true);
			}
			uint32_t max_users = p_task->group->tasks_used + 1; // Add 1 because the thread waiting for the group is also a user. Read this before incrementing, to avoid another thread freeing the group right after the increment.
			uint32_t finished_users = p_task->group->finished.increment();

			if (finished_users == max_users) {
				// Get rid of the group, because nobody else is using it.
				task_mutex.lock();
				group_allocator.free(p_task->group);
				task_mutex.unlock();
			}

			// For groups, tasks get rid of themselves.

			task_mutex.lock();
			task_allocator.free(p_task);
			task_mutex.unlock();
		}
	} else {
		if (p_task->native_func) {
			p_task->native_func(p_task->native_func_userdata);
		} else if (p_task->template_userdata) {
			p_task->template_userdata->callback();
			memdelete(p_task->template_userdata);
		} else {
			Callable::CallError ce;
			Variant ret;
			p_task->callable.callp(nullptr, 0, ret, ce);
		}

		task_mutex.lock();
		p_task->completed = true;
		for (uint8_t i = 0; i < p_task->waiting; i++) {
			p_task->done_semaphore.post();
		}
		if (!use_native_low_priority_threads) {
			p_task->pool_thread_index = -1;
		}
		task_mutex.unlock(); // Keep mutex down to here since on unlock the task may be freed.
	}

	// Task may have been freed by now (all callers notified).
	p_task = nullptr;

	if (!use_native_low_priority_threads) {
		bool post = false;
		task_mutex.lock();
		ThreadData &curr_thread = threads[pool_thread_index];
		curr_thread.current_low_prio_task = prev_low_prio_task;
		if (low_priority) {
			low_priority_threads_used--;
			low_priority_tasks_running--;
			// A low priority task was freed, so see if we can move a pending one to the high priority queue.
			if (_try_promote_low_priority_task()) {
				post = true;
			}

			if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
				_prevent_low_prio_saturation_deadlock();
			}
		}
		task_mutex.unlock();
		if (post) {
			task_available_semaphore.post();
		}
	}
}

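// Main loop of a pool thread: sleep until a task is posted (or shutdown is
// requested via exit_threads), then pull one task off the queue and run it.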
void WorkerThreadPool::_thread_function(void *p_user) {
	while (true) {
		singleton->task_available_semaphore.wait();
		if (singleton->exit_threads) {
			break;
		}
		singleton->_process_task_queue();
	}
}

void WorkerThreadPool::_native_low_priority_thread_function(void *p_user) {
	Task *task = (Task *)p_user;
	singleton->_process_task(task);
}

void WorkerThreadPool::_post_task(Task *p_task, bool p_high_priority) {
	// Fall back to processing on the calling thread if there are no worker threads.
	// Separated into its own variable to make it easier to extend this logic
	// in custom builds.
	bool process_on_calling_thread = threads.size() == 0;
	if (process_on_calling_thread) {
		_process_task(p_task);
		return;
	}

	task_mutex.lock();
	p_task->low_priority = !p_high_priority;
	if (!p_high_priority && use_native_low_priority_threads) {
		p_task->low_priority_thread = native_thread_allocator.alloc();
		task_mutex.unlock();

		if (p_task->group) {
			p_task->group->low_priority_native_tasks.push_back(p_task);
		}
		p_task->low_priority_thread->start(_native_low_priority_thread_function, p_task); // Pass the task directly to the thread.
	} else if (p_high_priority || low_priority_threads_used < max_low_priority_threads) {
		task_queue.add_last(&p_task->task_elem);
		if (!p_high_priority) {
			low_priority_threads_used++;
		}
		task_mutex.unlock();
		task_available_semaphore.post();
	} else {
		// Too many threads using low priority, must go to queue.
		low_priority_task_queue.add_last(&p_task->task_elem);
		task_mutex.unlock();
	}
}

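// Moves the oldest pending low-priority task into the regular queue, if there
// is one. Must be called with task_mutex held; the caller is responsible for
// posting task_available_semaphore when this returns true.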
bool WorkerThreadPool::_try_promote_low_priority_task() {
	if (low_priority_task_queue.first()) {
		Task *low_prio_task = low_priority_task_queue.first()->self();
		low_priority_task_queue.remove(low_priority_task_queue.first());
		task_queue.add_last(&low_prio_task->task_elem);
		low_priority_threads_used++;
		return true;
	} else {
		return false;
	}
}

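// Called with task_mutex held. If every running low-priority task is blocked
// waiting on another low-priority task, the pool can no longer make progress;
// promote one queued low-priority task beyond the usual limit to break the tie.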
void WorkerThreadPool::_prevent_low_prio_saturation_deadlock() {
	if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
#ifdef DEV_ENABLED
		print_verbose("WorkerThreadPool: Low-prio slots saturated with tasks all waiting for other low-prio tasks. Attempting to avoid deadlock by scheduling one extra task.");
#endif
		// In order not to create dependency cycles, we can only schedule the next one.
		// We'll keep doing the same until the deadlock is broken.
		SelfList<Task> *to_promote = low_priority_task_queue.first();
		if (to_promote) {
			low_priority_task_queue.remove(to_promote);
			task_queue.add_last(to_promote);
			low_priority_threads_used++;
			task_available_semaphore.post();
		}
	}
}

WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority, const String &p_description) {
	return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description);
}

WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) {
	task_mutex.lock();
	// Get a free task
	Task *task = task_allocator.alloc();
	TaskID id = last_task++;
	task->callable = p_callable;
	task->native_func = p_func;
	task->native_func_userdata = p_userdata;
	task->description = p_description;
	task->template_userdata = p_template_userdata;
	tasks.insert(id, task);
	task_mutex.unlock();

	_post_task(task, p_high_priority);

	return id;
}

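// Example usage from a hypothetical caller (names here are illustrative only):
//
//     WorkerThreadPool::TaskID tid =
//             WorkerThreadPool::get_singleton()->add_task(callable_mp(this, &MyNode::_heavy_work));
//     // ...do other work on this thread...
//     WorkerThreadPool::get_singleton()->wait_for_task_completion(tid);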
WorkerThreadPool::TaskID WorkerThreadPool::add_task(const Callable &p_action, bool p_high_priority, const String &p_description) {
	return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description);
}

bool WorkerThreadPool::is_task_completed(TaskID p_task_id) const {
	task_mutex.lock();
	const Task *const *taskp = tasks.getptr(p_task_id);
	if (!taskp) {
		task_mutex.unlock();
		ERR_FAIL_V_MSG(false, "Invalid Task ID"); // Invalid task
	}

	bool completed = (*taskp)->completed;
	task_mutex.unlock();

	return completed;
}

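// Blocks the caller until the given task has finished, then frees the task if
// this was the last waiter. Pool threads do not truly block here: they keep
// draining the task queue while they wait, so the pool cannot starve itself.
// Returns ERR_BUSY if the awaited task is buried deeper in the calling pool
// thread's own stack, since blocking would then deadlock (see below).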
Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
	task_mutex.lock();
	Task **taskp = tasks.getptr(p_task_id);
	if (!taskp) {
		task_mutex.unlock();
		ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Invalid Task ID"); // Invalid task
	}
	Task *task = *taskp;

	if (!task->completed) {
		if (!use_native_low_priority_threads && task->pool_thread_index != -1) { // Otherwise, it's not running yet.
			int caller_pool_th_index = thread_ids.has(Thread::get_caller_id()) ? thread_ids[Thread::get_caller_id()] : -1;
			if (caller_pool_th_index == task->pool_thread_index) {
				// Deadlock prevention.
				// Waiting for a task that runs on this same thread? That means the awaited task itself started
				// waiting, another task was run to make use of the thread in the meantime, and, with enough bad
				// luck, the need to wait for that original task arose in turn.
				// In other words, the task we want to wait for is buried in the stack.
				// Let's report the issue to the caller so it can handle it as it sees fit.
				task_mutex.unlock();
				return ERR_BUSY;
			}
		}

		task->waiting++;

		bool is_low_prio_waiting_for_another = false;
		if (!use_native_low_priority_threads) {
			// Deadlock prevention:
			// If all low-prio tasks are waiting for other low-prio tasks and there are no more free low-prio
			// slots, the pool can no longer make progress. We can apply a workaround, consisting of promoting
			// an awaited queued low-prio task to the schedule queue so it can run and break the impasse.
			// NOTE: A similar reasoning could be made about high-priority tasks, but there are usually many
			// more high-prio slots than low-prio ones. Therefore, a deadlock there would only happen when
			// dealing with a very complex task graph or when there are too few worker threads (limited
			// platforms or exotic settings). If that turns out to be an issue in the real world, a further
			// fix can be applied against that.
			if (task->low_priority) {
				bool awaiter_is_a_low_prio_task = thread_ids.has(Thread::get_caller_id()) && threads[thread_ids[Thread::get_caller_id()]].current_low_prio_task;
				if (awaiter_is_a_low_prio_task) {
					is_low_prio_waiting_for_another = true;
					low_priority_tasks_awaiting_others++;
					if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
						_prevent_low_prio_saturation_deadlock();
					}
				}
			}
		}

		task_mutex.unlock();

		if (use_native_low_priority_threads && task->low_priority) {
			task->done_semaphore.wait();
		} else {
			bool current_is_pool_thread = thread_ids.has(Thread::get_caller_id());
			if (current_is_pool_thread) {
				// We are an actual pool thread: we must not block, so keep processing tasks while they are available.
				bool must_exit = false;
				while (true) {
					if (task->done_semaphore.try_wait()) {
						// If done, exit
						break;
					}
					if (!must_exit) {
						if (task_available_semaphore.try_wait()) {
							if (exit_threads) {
								must_exit = true;
							} else {
								// Solve tasks while they are around.
								bool safe_for_nodes_backup = is_current_thread_safe_for_nodes();
								_process_task_queue();
								set_current_thread_safe_for_nodes(safe_for_nodes_backup);
								continue;
							}
						} else if (!use_native_low_priority_threads && task->low_priority) {
							// A low priority task started waiting, so see if we can move a pending one to the high priority queue.
							task_mutex.lock();
							bool post = _try_promote_low_priority_task();
							task_mutex.unlock();
							if (post) {
								task_available_semaphore.post();
							}
						}
					}
					OS::get_singleton()->delay_usec(1); // Microsleep; this could be converted to waiting for multiple objects on supported platforms for a bit more performance.
				}
			} else {
				task->done_semaphore.wait();
			}
		}

		task_mutex.lock();
		if (is_low_prio_waiting_for_another) {
			low_priority_tasks_awaiting_others--;
		}

		task->waiting--;
	}

	if (task->waiting == 0) {
		if (use_native_low_priority_threads && task->low_priority) {
			task->low_priority_thread->wait_to_finish();
			native_thread_allocator.free(task->low_priority_thread);
		}
		tasks.erase(p_task_id);
		task_allocator.free(task);
	}

	task_mutex.unlock();
	return OK;
}

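// A group runs the same callback over p_elements indices using up to p_tasks
// pooled tasks. All of them share a single Group object and claim indices
// through its atomic counter, so no per-element task IDs are ever allocated.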
WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
	ERR_FAIL_COND_V(p_elements < 0, INVALID_TASK_ID);
	if (p_tasks < 0) {
		p_tasks = MAX(1u, threads.size());
	}

	task_mutex.lock();
	Group *group = group_allocator.alloc();
	GroupID id = last_task++;
	group->max = p_elements;
	group->self = id;

	Task **tasks_posted = nullptr;
	if (p_elements == 0) {
		// Callers should really not pass zero elements, but at least handle it gracefully.
		group->completed.set_to(true);
		group->done_semaphore.post();
		group->tasks_used = 0;
		p_tasks = 0;
		if (p_template_userdata) {
			memdelete(p_template_userdata);
		}

	} else {
		group->tasks_used = p_tasks;
		tasks_posted = (Task **)alloca(sizeof(Task *) * p_tasks);
		for (int i = 0; i < p_tasks; i++) {
			Task *task = task_allocator.alloc();
			task->native_group_func = p_func;
			task->native_func_userdata = p_userdata;
			task->description = p_description;
			task->group = group;
			task->callable = p_callable;
			task->template_userdata = p_template_userdata;
			tasks_posted[i] = task;
			// No task ID is used.
		}
	}

	groups[id] = group;
	task_mutex.unlock();

	for (int i = 0; i < p_tasks; i++) {
		_post_task(tasks_posted[i], p_high_priority);
	}

	return id;
}

WorkerThreadPool::GroupID WorkerThreadPool::add_native_group_task(void (*p_func)(void *, uint32_t), void *p_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
	return _add_group_task(Callable(), p_func, p_userdata, nullptr, p_elements, p_tasks, p_high_priority, p_description);
}

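// Example usage from a hypothetical caller (names here are illustrative only):
//
//     WorkerThreadPool::GroupID gid = WorkerThreadPool::get_singleton()->add_group_task(
//             callable_mp(this, &MyNode::_process_item), item_count);
//     WorkerThreadPool::get_singleton()->wait_for_group_task_completion(gid);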
WorkerThreadPool::GroupID WorkerThreadPool::add_group_task(const Callable &p_action, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
	return _add_group_task(p_action, nullptr, nullptr, nullptr, p_elements, p_tasks, p_high_priority, p_description);
}

uint32_t WorkerThreadPool::get_group_processed_element_count(GroupID p_group) const {
	task_mutex.lock();
	const Group *const *groupp = groups.getptr(p_group);
	if (!groupp) {
		task_mutex.unlock();
		ERR_FAIL_V_MSG(0, "Invalid Group ID");
	}
	uint32_t elements = (*groupp)->completed_index.get();
	task_mutex.unlock();
	return elements;
}

bool WorkerThreadPool::is_group_task_completed(GroupID p_group) const {
	task_mutex.lock();
	const Group *const *groupp = groups.getptr(p_group);
	if (!groupp) {
		task_mutex.unlock();
		ERR_FAIL_V_MSG(false, "Invalid Group ID");
	}
	bool completed = (*groupp)->completed.is_set();
	task_mutex.unlock();
	return completed;
}

void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) {
	task_mutex.lock();
	Group **groupp = groups.getptr(p_group);
	task_mutex.unlock();
	if (!groupp) {
		ERR_FAIL_MSG("Invalid Group ID");
	}
	Group *group = *groupp;

	if (group->low_priority_native_tasks.size() > 0) {
		for (Task *task : group->low_priority_native_tasks) {
			task->low_priority_thread->wait_to_finish();
			task_mutex.lock();
			native_thread_allocator.free(task->low_priority_thread);
			task_allocator.free(task);
			task_mutex.unlock();
		}

		task_mutex.lock();
		group_allocator.free(group);
		task_mutex.unlock();
	} else {
		group->done_semaphore.wait();

		uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for the group is also a user. Read this before incrementing, to avoid another thread freeing the group right after the increment.
		uint32_t finished_users = group->finished.increment(); // The increment happens after the read above, so the group can't be freed in between.

		if (finished_users == max_users) {
			// All tasks using this group are gone (finished before the group), so clear the group too.
			task_mutex.lock();
			group_allocator.free(group);
			task_mutex.unlock();
		}
	}

	task_mutex.lock(); // This mutex is needed when Physics 2D and/or 3D is selected to run on a separate thread.
	groups.erase(p_group);
	task_mutex.unlock();
}

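// Starts the pool. A negative p_thread_count selects the OS-suggested default.
// When native low-priority threads are not in use, p_low_priority_task_ratio
// decides how many of the pooled threads may run low-priority tasks at once
// (always at least 1, and always leaving at least 1 thread for high priority).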
void WorkerThreadPool::init(int p_thread_count, bool p_use_native_threads_low_priority, float p_low_priority_task_ratio) {
	ERR_FAIL_COND(threads.size() > 0);
	if (p_thread_count < 0) {
		p_thread_count = OS::get_singleton()->get_default_thread_pool_size();
	}

	if (p_use_native_threads_low_priority) {
		max_low_priority_threads = 0;
	} else {
		max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1);
	}

	use_native_low_priority_threads = p_use_native_threads_low_priority;

	threads.resize(p_thread_count);

	for (uint32_t i = 0; i < threads.size(); i++) {
		threads[i].index = i;
		threads[i].thread.start(&WorkerThreadPool::_thread_function, &threads[i]);
		thread_ids.insert(threads[i].thread.get_id(), i);
	}
}

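// Shuts the pool down: reports still-queued low-priority tasks, then posts the
// semaphore once per thread so every worker wakes up, observes exit_threads,
// and terminates before being joined.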
void WorkerThreadPool::finish() {
	if (threads.size() == 0) {
		return;
	}

	task_mutex.lock();
	SelfList<Task> *E = low_priority_task_queue.first();
	while (E) {
		print_error("Task waiting was never re-claimed: " + E->self()->description);
		E = E->next();
	}
	task_mutex.unlock();

	exit_threads = true;

	for (uint32_t i = 0; i < threads.size(); i++) {
		task_available_semaphore.post();
	}

	for (ThreadData &data : threads) {
		data.thread.wait_to_finish();
	}

	threads.clear();
}

void WorkerThreadPool::_bind_methods() {
	ClassDB::bind_method(D_METHOD("add_task", "action", "high_priority", "description"), &WorkerThreadPool::add_task, DEFVAL(false), DEFVAL(String()));
	ClassDB::bind_method(D_METHOD("is_task_completed", "task_id"), &WorkerThreadPool::is_task_completed);
	ClassDB::bind_method(D_METHOD("wait_for_task_completion", "task_id"), &WorkerThreadPool::wait_for_task_completion);

	ClassDB::bind_method(D_METHOD("add_group_task", "action", "elements", "tasks_needed", "high_priority", "description"), &WorkerThreadPool::add_group_task, DEFVAL(-1), DEFVAL(false), DEFVAL(String()));
	ClassDB::bind_method(D_METHOD("is_group_task_completed", "group_id"), &WorkerThreadPool::is_group_task_completed);
	ClassDB::bind_method(D_METHOD("get_group_processed_element_count", "group_id"), &WorkerThreadPool::get_group_processed_element_count);
	ClassDB::bind_method(D_METHOD("wait_for_group_task_completion", "group_id"), &WorkerThreadPool::wait_for_group_task_completion);
}


WorkerThreadPool::WorkerThreadPool() {
	singleton = this;
}

WorkerThreadPool::~WorkerThreadPool() {
	finish();
}