/**************************************************************************/
/*  worker_thread_pool.cpp                                                */
/**************************************************************************/
/*                         This file is part of:                          */
/*                             GODOT ENGINE                               */
/*                        https://godotengine.org                         */
/**************************************************************************/
/* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */
/* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur.                  */
/*                                                                        */
/* Permission is hereby granted, free of charge, to any person obtaining  */
/* a copy of this software and associated documentation files (the        */
/* "Software"), to deal in the Software without restriction, including    */
/* without limitation the rights to use, copy, modify, merge, publish,    */
/* distribute, sublicense, and/or sell copies of the Software, and to     */
/* permit persons to whom the Software is furnished to do so, subject to  */
/* the following conditions:                                              */
/*                                                                        */
/* The above copyright notice and this permission notice shall be         */
/* included in all copies or substantial portions of the Software.        */
/*                                                                        */
/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,        */
/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF     */
/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */
/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY   */
/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,   */
/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE      */
/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                 */
/**************************************************************************/

#include "worker_thread_pool.h"

#include "core/os/os.h"
#include "core/os/thread_safe.h"

void WorkerThreadPool::Task::free_template_userdata() {
	ERR_FAIL_NULL(template_userdata);
	ERR_FAIL_NULL(native_func_userdata);
	BaseTemplateUserdata *btu = (BaseTemplateUserdata *)native_func_userdata;
	memdelete(btu);
}

WorkerThreadPool *WorkerThreadPool::singleton = nullptr;

void WorkerThreadPool::_process_task_queue() {
	task_mutex.lock();
	Task *task = task_queue.first()->self();
	task_queue.remove(task_queue.first());
	task_mutex.unlock();
	_process_task(task);
}

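// Runs a single task to completion on the calling thread. For group tasks,
// work indices are claimed from the group's atomic counter until the group is
// exhausted; for single tasks, the payload runs once and waiters are signaled
// via the done semaphore. When pool (non-native) threads are used, low-priority
// bookkeeping is also updated here so queued low-priority tasks can be promoted
// as slots free up.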
void WorkerThreadPool::_process_task(Task *p_task) {
	bool low_priority = p_task->low_priority;
	int pool_thread_index = -1;
	Task *prev_low_prio_task = nullptr; // In case this is recursively called.

	if (!use_native_low_priority_threads) {
		// Tasks must start with this unset. They are free to set-and-forget otherwise.
		set_current_thread_safe_for_nodes(false);
		pool_thread_index = thread_ids[Thread::get_caller_id()];
		ThreadData &curr_thread = threads[pool_thread_index];
		task_mutex.lock();
		p_task->pool_thread_index = pool_thread_index;
		if (low_priority) {
			low_priority_tasks_running++;
			prev_low_prio_task = curr_thread.current_low_prio_task;
			curr_thread.current_low_prio_task = p_task;
		} else {
			curr_thread.current_low_prio_task = nullptr;
		}
		task_mutex.unlock();
	}

	if (p_task->group) {
		// Handling a group.
		bool do_post = false;
		Callable::CallError ce;
		Variant ret;
		Variant arg;
		Variant *argptr = &arg;

		while (true) {
			uint32_t work_index = p_task->group->index.postincrement();

			if (work_index >= p_task->group->max) {
				break;
			}
			if (p_task->native_group_func) {
				p_task->native_group_func(p_task->native_func_userdata, work_index);
			} else if (p_task->template_userdata) {
				p_task->template_userdata->callback_indexed(work_index);
			} else {
				arg = work_index;
				p_task->callable.callp((const Variant **)&argptr, 1, ret, ce);
			}

			// This is the only way to ensure posting is done when all tasks are really complete.
			uint32_t completed_amount = p_task->group->completed_index.increment();

			if (completed_amount == p_task->group->max) {
				do_post = true;
			}
		}

		if (do_post && p_task->template_userdata) {
			memdelete(p_task->template_userdata); // This is no longer needed at this point, so get rid of it.
		}

		if (low_priority && use_native_low_priority_threads) {
			p_task->completed = true;
			p_task->done_semaphore.post();
			if (do_post) {
				p_task->group->completed.set_to(true);
			}
		} else {
			if (do_post) {
				p_task->group->done_semaphore.post();
				p_task->group->completed.set_to(true);
			}
			uint32_t max_users = p_task->group->tasks_used + 1; // Add 1 because the thread waiting for it is also a user. Read before the increment to avoid another thread freeing the task right after it.
			uint32_t finished_users = p_task->group->finished.increment();

			if (finished_users == max_users) {
				// Get rid of the group, because nobody else is using it.
				task_mutex.lock();
				group_allocator.free(p_task->group);
				task_mutex.unlock();
			}

			// For groups, tasks get rid of themselves.

			task_mutex.lock();
			task_allocator.free(p_task);
			task_mutex.unlock();
		}
	} else {
		if (p_task->native_func) {
			p_task->native_func(p_task->native_func_userdata);
		} else if (p_task->template_userdata) {
			p_task->template_userdata->callback();
			memdelete(p_task->template_userdata);
		} else {
			Callable::CallError ce;
			Variant ret;
			p_task->callable.callp(nullptr, 0, ret, ce);
		}

		task_mutex.lock();
		p_task->completed = true;
		for (uint8_t i = 0; i < p_task->waiting; i++) {
			p_task->done_semaphore.post();
		}
		if (!use_native_low_priority_threads) {
			p_task->pool_thread_index = -1;
		}
		task_mutex.unlock(); // Hold the mutex until here, since the task may be freed as soon as it is unlocked.
	}

	// Task may have been freed by now (all callers notified).
	p_task = nullptr;

	if (!use_native_low_priority_threads) {
		bool post = false;
		task_mutex.lock();
		ThreadData &curr_thread = threads[pool_thread_index];
		curr_thread.current_low_prio_task = prev_low_prio_task;
		if (low_priority) {
			low_priority_threads_used--;
			low_priority_tasks_running--;
			// A low-priority task was freed, so see if we can move a pending one to the high-priority queue.
			if (_try_promote_low_priority_task()) {
				post = true;
			}

			if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
				_prevent_low_prio_saturation_deadlock();
			}
		}
		task_mutex.unlock();
		if (post) {
			task_available_semaphore.post();
		}
	}
}

void WorkerThreadPool::_thread_function(void *p_user) {
	while (true) {
		singleton->task_available_semaphore.wait();
		if (singleton->exit_threads) {
			break;
		}
		singleton->_process_task_queue();
	}
}

void WorkerThreadPool::_native_low_priority_thread_function(void *p_user) {
	Task *task = (Task *)p_user;
	singleton->_process_task(task);
}

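// Dispatches a task according to its priority and the pool configuration:
// high-priority tasks (and low-priority ones while slots remain) go to the
// main queue and a worker is woken up; with native low-priority threads, a
// dedicated OS thread is started instead; otherwise the task waits in the
// low-priority queue until a slot frees up or it gets promoted.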
void WorkerThreadPool::_post_task(Task *p_task, bool p_high_priority) {
	// Fall back to processing on the calling thread if there are no worker threads.
	// Separated into its own variable to make it easier to extend this logic
	// in custom builds.
	bool process_on_calling_thread = threads.size() == 0;
	if (process_on_calling_thread) {
		_process_task(p_task);
		return;
	}

	task_mutex.lock();
	p_task->low_priority = !p_high_priority;
	if (!p_high_priority && use_native_low_priority_threads) {
		p_task->low_priority_thread = native_thread_allocator.alloc();
		task_mutex.unlock();

		if (p_task->group) {
			p_task->group->low_priority_native_tasks.push_back(p_task);
		}
		p_task->low_priority_thread->start(_native_low_priority_thread_function, p_task); // Pass the task directly to the thread.
	} else if (p_high_priority || low_priority_threads_used < max_low_priority_threads) {
		task_queue.add_last(&p_task->task_elem);
		if (!p_high_priority) {
			low_priority_threads_used++;
		}
		task_mutex.unlock();
		task_available_semaphore.post();
	} else {
		// Too many threads using low priority; it must go to the queue.
		low_priority_task_queue.add_last(&p_task->task_elem);
		task_mutex.unlock();
	}
}

bool WorkerThreadPool::_try_promote_low_priority_task() {
	if (low_priority_task_queue.first()) {
		Task *low_prio_task = low_priority_task_queue.first()->self();
		low_priority_task_queue.remove(low_priority_task_queue.first());
		task_queue.add_last(&low_prio_task->task_elem);
		low_priority_threads_used++;
		return true;
	} else {
		return false;
	}
}

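// Worked example: with max_low_priority_threads == 2, suppose both running
// low-priority tasks call wait_for_task_completion() on low-priority tasks
// that are still sitting in low_priority_task_queue. No slot will ever free
// up, so nothing can progress. Promoting one queued task past the slot limit
// lets it run and breaks the cycle; this repeats until the impasse is gone.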
void WorkerThreadPool::_prevent_low_prio_saturation_deadlock() {
	if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
#ifdef DEV_ENABLED
		print_verbose("WorkerThreadPool: Low-prio slots saturated with tasks all waiting for other low-prio tasks. Attempting to avoid deadlock by scheduling one extra task.");
#endif
		// In order not to create dependency cycles, we can only schedule the next one.
		// We'll keep doing the same until the deadlock is broken.
		SelfList<Task> *to_promote = low_priority_task_queue.first();
		if (to_promote) {
			low_priority_task_queue.remove(to_promote);
			task_queue.add_last(to_promote);
			low_priority_threads_used++;
			task_available_semaphore.post();
		}
	}
}

WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority, const String &p_description) {
	return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description);
}

WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) {
	task_mutex.lock();
	// Get a free task.
	Task *task = task_allocator.alloc();
	TaskID id = last_task++;
	task->callable = p_callable;
	task->native_func = p_func;
	task->native_func_userdata = p_userdata;
	task->description = p_description;
	task->template_userdata = p_template_userdata;
	tasks.insert(id, task);
	task_mutex.unlock();

	_post_task(task, p_high_priority);

	return id;
}

WorkerThreadPool::TaskID WorkerThreadPool::add_task(const Callable &p_action, bool p_high_priority, const String &p_description) {
	return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description);
}
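
// Example usage of add_task() (a minimal sketch; `heavy_work` is a
// hypothetical free function bound with callable_mp_static, and error
// handling is omitted):
//
//   WorkerThreadPool *wtp = WorkerThreadPool::get_singleton();
//   WorkerThreadPool::TaskID tid = wtp->add_task(
//           callable_mp_static(&heavy_work), true, "Heavy work");
//   // ...do other things...
//   wtp->wait_for_task_completion(tid);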

bool WorkerThreadPool::is_task_completed(TaskID p_task_id) const {
	task_mutex.lock();
	const Task *const *taskp = tasks.getptr(p_task_id);
	if (!taskp) {
		task_mutex.unlock();
		ERR_FAIL_V_MSG(false, "Invalid Task ID"); // Invalid task.
	}

	bool completed = (*taskp)->completed;
	task_mutex.unlock();

	return completed;
}

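// Note on the return value: when called from a pool thread, the awaited task
// may be buried in that same thread's stack (see the deadlock-prevention
// comment below), in which case this returns ERR_BUSY and the caller must
// decide how to proceed, e.g. by doing other work and retrying later.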
Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
	task_mutex.lock();
	Task **taskp = tasks.getptr(p_task_id);
	if (!taskp) {
		task_mutex.unlock();
		ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Invalid Task ID"); // Invalid task.
	}
	Task *task = *taskp;

	if (!task->completed) {
		if (!use_native_low_priority_threads && task->pool_thread_index != -1) { // Otherwise, it's not running yet.
			int caller_pool_th_index = thread_ids.has(Thread::get_caller_id()) ? thread_ids[Thread::get_caller_id()] : -1;
			if (caller_pool_th_index == task->pool_thread_index) {
				// Deadlock prevention.
				// Waiting for a task run on this same thread? That means the awaited task started waiting as well,
				// and another task was run to make use of the thread in the meantime, with enough bad luck that
				// the need to wait for the original task arose in turn.
				// In other words, the task we want to wait for is buried in the stack.
				// Let's report the issue to the caller so it can handle it as it sees fit.
				task_mutex.unlock();
				return ERR_BUSY;
			}
		}

		task->waiting++;

		bool is_low_prio_waiting_for_another = false;
		if (!use_native_low_priority_threads) {
			// Deadlock prevention:
			// If all low-prio tasks are waiting for other low-prio tasks and there are no more free low-prio slots,
			// we have a situation in which no progress can be made. We can apply a workaround, consisting of promoting
			// an awaited queued low-prio task to the schedule queue so it can run and break the "impasse".
			// NOTE: A similar reasoning could be made about high-priority tasks, but there are usually many more of
			// those than low-prio ones. Therefore, a deadlock there would only happen when dealing with a very complex
			// task graph or when there are too few worker threads (limited platforms or exotic settings). If that
			// turns out to be an issue in the real world, a further fix can be applied against that.
			if (task->low_priority) {
				bool awaiter_is_a_low_prio_task = thread_ids.has(Thread::get_caller_id()) && threads[thread_ids[Thread::get_caller_id()]].current_low_prio_task;
				if (awaiter_is_a_low_prio_task) {
					is_low_prio_waiting_for_another = true;
					low_priority_tasks_awaiting_others++;
					if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
						_prevent_low_prio_saturation_deadlock();
					}
				}
			}
		}

		task_mutex.unlock();

		if (use_native_low_priority_threads && task->low_priority) {
			task->done_semaphore.wait();
		} else {
			bool current_is_pool_thread = thread_ids.has(Thread::get_caller_id());
			if (current_is_pool_thread) {
				// We are an actual pool thread; we must not block, so keep processing tasks while they are available.
				bool must_exit = false;
				while (true) {
					if (task->done_semaphore.try_wait()) {
						// If done, exit.
						break;
					}
					if (!must_exit) {
						if (task_available_semaphore.try_wait()) {
							if (exit_threads) {
								must_exit = true;
							} else {
								// Solve tasks while they are around.
								bool safe_for_nodes_backup = is_current_thread_safe_for_nodes();
								_process_task_queue();
								set_current_thread_safe_for_nodes(safe_for_nodes_backup);
								continue;
							}
						} else if (!use_native_low_priority_threads && task->low_priority) {
							// A low-priority task started waiting, so see if we can move a pending one to the high-priority queue.
							task_mutex.lock();
							bool post = _try_promote_low_priority_task();
							task_mutex.unlock();
							if (post) {
								task_available_semaphore.post();
							}
						}
					}
					OS::get_singleton()->delay_usec(1); // Microsleep. On supported platforms, this could be converted to a wait for multiple objects for a bit more performance.
				}
			} else {
				task->done_semaphore.wait();
			}
		}

		task_mutex.lock();
		if (is_low_prio_waiting_for_another) {
			low_priority_tasks_awaiting_others--;
		}

		task->waiting--;
	}

	if (task->waiting == 0) {
		if (use_native_low_priority_threads && task->low_priority) {
			task->low_priority_thread->wait_to_finish();
			native_thread_allocator.free(task->low_priority_thread);
		}
		tasks.erase(p_task_id);
		task_allocator.free(task);
	}

	task_mutex.unlock();
	return OK;
}

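// Group tasks split p_elements work indices among up to p_tasks pool tasks.
// Each task repeatedly claims the next unprocessed index from the group's
// atomic counter (see _process_task()), so elements are distributed
// dynamically rather than in fixed contiguous chunks.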
WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
	ERR_FAIL_COND_V(p_elements < 0, INVALID_TASK_ID);
	if (p_tasks < 0) {
		p_tasks = MAX(1u, threads.size());
	}

	task_mutex.lock();
	Group *group = group_allocator.alloc();
	GroupID id = last_task++;
	group->max = p_elements;
	group->self = id;

	Task **tasks_posted = nullptr;
	if (p_elements == 0) {
		// Should really not be called with zero elements, but at least it should work.
		group->completed.set_to(true);
		group->done_semaphore.post();
		group->tasks_used = 0;
		p_tasks = 0;
		if (p_template_userdata) {
			memdelete(p_template_userdata);
		}

	} else {
		group->tasks_used = p_tasks;
		tasks_posted = (Task **)alloca(sizeof(Task *) * p_tasks);
		for (int i = 0; i < p_tasks; i++) {
			Task *task = task_allocator.alloc();
			task->native_group_func = p_func;
			task->native_func_userdata = p_userdata;
			task->description = p_description;
			task->group = group;
			task->callable = p_callable;
			task->template_userdata = p_template_userdata;
			tasks_posted[i] = task;
			// No task ID is used.
		}
	}

	groups[id] = group;
	task_mutex.unlock();

	for (int i = 0; i < p_tasks; i++) {
		_post_task(tasks_posted[i], p_high_priority);
	}

	return id;
}

WorkerThreadPool::GroupID WorkerThreadPool::add_native_group_task(void (*p_func)(void *, uint32_t), void *p_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
	return _add_group_task(Callable(), p_func, p_userdata, nullptr, p_elements, p_tasks, p_high_priority, p_description);
}

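// Example usage of add_group_task() below (a minimal sketch; `process_item`
// is a hypothetical free function taking the element index, bound with
// callable_mp_static):
//
//   WorkerThreadPool *wtp = WorkerThreadPool::get_singleton();
//   WorkerThreadPool::GroupID gid = wtp->add_group_task(
//           callable_mp_static(&process_item), item_count, -1, true, "Process items");
//   wtp->wait_for_group_task_completion(gid);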
WorkerThreadPool::GroupID WorkerThreadPool::add_group_task(const Callable &p_action, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
	return _add_group_task(p_action, nullptr, nullptr, nullptr, p_elements, p_tasks, p_high_priority, p_description);
}

uint32_t WorkerThreadPool::get_group_processed_element_count(GroupID p_group) const {
	task_mutex.lock();
	const Group *const *groupp = groups.getptr(p_group);
	if (!groupp) {
		task_mutex.unlock();
		ERR_FAIL_V_MSG(0, "Invalid Group ID");
	}
	uint32_t elements = (*groupp)->completed_index.get();
	task_mutex.unlock();
	return elements;
}

bool WorkerThreadPool::is_group_task_completed(GroupID p_group) const {
	task_mutex.lock();
	const Group *const *groupp = groups.getptr(p_group);
	if (!groupp) {
		task_mutex.unlock();
		ERR_FAIL_V_MSG(false, "Invalid Group ID");
	}
	bool completed = (*groupp)->completed.is_set();
	task_mutex.unlock();
	return completed;
}

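// The group is freed by whoever finishes last: each of the group's tasks
// increments `finished` when done (in _process_task()), and so does this
// waiter. With N tasks there are N + 1 users in total, so whichever side
// observes the counter reaching N + 1 releases the group to the allocator.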
void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) {
	task_mutex.lock();
	Group **groupp = groups.getptr(p_group);
	task_mutex.unlock();
	if (!groupp) {
		ERR_FAIL_MSG("Invalid Group ID");
	}
	Group *group = *groupp;

	if (group->low_priority_native_tasks.size() > 0) {
		for (Task *task : group->low_priority_native_tasks) {
			task->low_priority_thread->wait_to_finish();
			task_mutex.lock();
			native_thread_allocator.free(task->low_priority_thread);
			task_allocator.free(task);
			task_mutex.unlock();
		}

		task_mutex.lock();
		group_allocator.free(group);
		task_mutex.unlock();
	} else {
		group->done_semaphore.wait();

		uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also a user. Read before the increment to avoid another thread freeing the group right after it.
		uint32_t finished_users = group->finished.increment(); // The fetch happens before the increment, so increment only afterwards.

		if (finished_users == max_users) {
			// All tasks using this group are gone (finished before the group), so clear the group too.
			task_mutex.lock();
			group_allocator.free(group);
			task_mutex.unlock();
		}
	}

	task_mutex.lock(); // This mutex is needed when Physics 2D and/or 3D is selected to run on a separate thread.
	groups.erase(p_group);
	task_mutex.unlock();
}

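// Example of the low-priority slot math below (illustrative numbers only):
// with p_thread_count = 8 and p_low_priority_task_ratio = 0.3,
// CLAMP(8 * 0.3, 1, 7) = 2.4 is truncated to 2 on assignment to the integer
// max_low_priority_threads, so at most 2 of the 8 threads may run
// low-priority tasks at once, keeping the rest free for high-priority work.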
void WorkerThreadPool::init(int p_thread_count, bool p_use_native_threads_low_priority, float p_low_priority_task_ratio) {
	ERR_FAIL_COND(threads.size() > 0);
	if (p_thread_count < 0) {
		p_thread_count = OS::get_singleton()->get_default_thread_pool_size();
	}

	if (p_use_native_threads_low_priority) {
		max_low_priority_threads = 0;
	} else {
		max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1);
	}

	use_native_low_priority_threads = p_use_native_threads_low_priority;

	threads.resize(p_thread_count);

	for (uint32_t i = 0; i < threads.size(); i++) {
		threads[i].index = i;
		threads[i].thread.start(&WorkerThreadPool::_thread_function, &threads[i]);
		thread_ids.insert(threads[i].thread.get_id(), i);
	}
}

void WorkerThreadPool::finish() {
	if (threads.size() == 0) {
		return;
	}

	task_mutex.lock();
	SelfList<Task> *E = low_priority_task_queue.first();
	while (E) {
		print_error("Task waiting was never re-claimed: " + E->self()->description);
		E = E->next();
	}
	task_mutex.unlock();

	exit_threads = true;

	for (uint32_t i = 0; i < threads.size(); i++) {
		task_available_semaphore.post();
	}

	for (ThreadData &data : threads) {
		data.thread.wait_to_finish();
	}

	threads.clear();
}

void WorkerThreadPool::_bind_methods() {
	ClassDB::bind_method(D_METHOD("add_task", "action", "high_priority", "description"), &WorkerThreadPool::add_task, DEFVAL(false), DEFVAL(String()));
	ClassDB::bind_method(D_METHOD("is_task_completed", "task_id"), &WorkerThreadPool::is_task_completed);
	ClassDB::bind_method(D_METHOD("wait_for_task_completion", "task_id"), &WorkerThreadPool::wait_for_task_completion);

	ClassDB::bind_method(D_METHOD("add_group_task", "action", "elements", "tasks_needed", "high_priority", "description"), &WorkerThreadPool::add_group_task, DEFVAL(-1), DEFVAL(false), DEFVAL(String()));
	ClassDB::bind_method(D_METHOD("is_group_task_completed", "group_id"), &WorkerThreadPool::is_group_task_completed);
	ClassDB::bind_method(D_METHOD("get_group_processed_element_count", "group_id"), &WorkerThreadPool::get_group_processed_element_count);
	ClassDB::bind_method(D_METHOD("wait_for_group_task_completion", "group_id"), &WorkerThreadPool::wait_for_group_task_completion);
}

WorkerThreadPool::WorkerThreadPool() {
	singleton = this;
}

WorkerThreadPool::~WorkerThreadPool() {
	finish();
}