| 1 | /**************************************************************************/ | 
|---|
| 2 | /*  worker_thread_pool.cpp                                                */ | 
|---|
| 3 | /**************************************************************************/ | 
|---|
| 4 | /*                         This file is part of:                          */ | 
|---|
| 5 | /*                             GODOT ENGINE                               */ | 
|---|
| 6 | /*                        https://godotengine.org                         */ | 
|---|
| 7 | /**************************************************************************/ | 
|---|
| 8 | /* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */ | 
|---|
| 9 | /* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur.                  */ | 
|---|
| 10 | /*                                                                        */ | 
|---|
| 11 | /* Permission is hereby granted, free of charge, to any person obtaining  */ | 
|---|
| 12 | /* a copy of this software and associated documentation files (the        */ | 
|---|
| 13 | /* "Software"), to deal in the Software without restriction, including    */ | 
|---|
| 14 | /* without limitation the rights to use, copy, modify, merge, publish,    */ | 
|---|
| 15 | /* distribute, sublicense, and/or sell copies of the Software, and to     */ | 
|---|
| 16 | /* permit persons to whom the Software is furnished to do so, subject to  */ | 
|---|
| 17 | /* the following conditions:                                              */ | 
|---|
| 18 | /*                                                                        */ | 
|---|
| 19 | /* The above copyright notice and this permission notice shall be         */ | 
|---|
| 20 | /* included in all copies or substantial portions of the Software.        */ | 
|---|
| 21 | /*                                                                        */ | 
|---|
| 22 | /* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,        */ | 
|---|
| 23 | /* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF     */ | 
|---|
| 24 | /* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */ | 
|---|
| 25 | /* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY   */ | 
|---|
| 26 | /* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,   */ | 
|---|
| 27 | /* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE      */ | 
|---|
| 28 | /* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                 */ | 
|---|
| 29 | /**************************************************************************/ | 
|---|
| 30 |  | 
|---|
| 31 | #include "worker_thread_pool.h" | 
|---|
| 32 |  | 
|---|
| 33 | #include "core/os/os.h" | 
|---|
| 34 | #include "core/os/thread_safe.h" | 
|---|
| 35 |  | 
|---|
| 36 | void WorkerThreadPool::Task::free_template_userdata() { | 
|---|
| 37 | ERR_FAIL_NULL(template_userdata); | 
|---|
| 38 | ERR_FAIL_NULL(native_func_userdata); | 
|---|
| 39 | BaseTemplateUserdata *btu = (BaseTemplateUserdata *)native_func_userdata; | 
|---|
| 40 | memdelete(btu); | 
|---|
| 41 | } | 
|---|
| 42 |  | 
|---|
WorkerThreadPool *WorkerThreadPool::singleton = nullptr; // Set by the constructor; there is a single global pool instance.
|---|
| 44 |  | 
|---|
| 45 | void WorkerThreadPool::_process_task_queue() { | 
|---|
| 46 | task_mutex.lock(); | 
|---|
| 47 | Task *task = task_queue.first()->self(); | 
|---|
| 48 | task_queue.remove(task_queue.first()); | 
|---|
| 49 | task_mutex.unlock(); | 
|---|
| 50 | _process_task(task); | 
|---|
| 51 | } | 
|---|
| 52 |  | 
|---|
// Runs one task (single or group) on the calling thread. This is the core
// worker body; it also maintains the per-thread low-priority bookkeeping used
// for deadlock avoidance. May be called recursively when a waiting pool thread
// picks up more work (see wait_for_task_completion).
void WorkerThreadPool::_process_task(Task *p_task) {
	bool low_priority = p_task->low_priority;
	int pool_thread_index = -1;
	Task *prev_low_prio_task = nullptr; // In case this is recursively called.

	if (!use_native_low_priority_threads) {
		// Tasks must start with this unset. They are free to set-and-forget otherwise.
		set_current_thread_safe_for_nodes(false);
		pool_thread_index = thread_ids[Thread::get_caller_id()];
		ThreadData &curr_thread = threads[pool_thread_index];
		task_mutex.lock();
		// Record which pool thread runs this task so waiters can detect
		// same-thread waits (see wait_for_task_completion).
		p_task->pool_thread_index = pool_thread_index;
		if (low_priority) {
			low_priority_tasks_running++;
			prev_low_prio_task = curr_thread.current_low_prio_task;
			curr_thread.current_low_prio_task = p_task;
		} else {
			curr_thread.current_low_prio_task = nullptr;
		}
		task_mutex.unlock();
	}

	if (p_task->group) {
		// Handling a group: keep claiming work indices from the shared atomic
		// counter until the whole range [0, max) is exhausted.
		bool do_post = false;
		Callable::CallError ce;
		Variant ret;
		Variant arg;
		Variant *argptr = &arg;

		while (true) {
			uint32_t work_index = p_task->group->index.postincrement();

			if (work_index >= p_task->group->max) {
				break;
			}
			if (p_task->native_group_func) {
				p_task->native_group_func(p_task->native_func_userdata, work_index);
			} else if (p_task->template_userdata) {
				p_task->template_userdata->callback_indexed(work_index);
			} else {
				arg = work_index;
				p_task->callable.callp((const Variant **)&argptr, 1, ret, ce);
			}

			// This is the only way to ensure posting is done when all tasks are really complete.
			uint32_t completed_amount = p_task->group->completed_index.increment();

			if (completed_amount == p_task->group->max) {
				do_post = true;
			}
		}

		if (do_post && p_task->template_userdata) {
			memdelete(p_task->template_userdata); // This is no longer needed at this point, so get rid of it.
		}

		if (low_priority && use_native_low_priority_threads) {
			// Dedicated native thread: signal completion; the waiter joins and frees.
			p_task->completed = true;
			p_task->done_semaphore.post();
			if (do_post) {
				p_task->group->completed.set_to(true);
			}
		} else {
			if (do_post) {
				p_task->group->done_semaphore.post();
				p_task->group->completed.set_to(true);
			}
			uint32_t max_users = p_task->group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
			uint32_t finished_users = p_task->group->finished.increment();

			if (finished_users == max_users) {
				// Get rid of the group, because nobody else is using it.
				task_mutex.lock();
				group_allocator.free(p_task->group);
				task_mutex.unlock();
			}

			// For groups, tasks get rid of themselves.

			task_mutex.lock();
			task_allocator.free(p_task);
			task_mutex.unlock();
		}
	} else {
		// Single task: run whichever callback form was provided.
		if (p_task->native_func) {
			p_task->native_func(p_task->native_func_userdata);
		} else if (p_task->template_userdata) {
			p_task->template_userdata->callback();
			memdelete(p_task->template_userdata);
		} else {
			Callable::CallError ce;
			Variant ret;
			p_task->callable.callp(nullptr, 0, ret, ce);
		}

		task_mutex.lock();
		p_task->completed = true;
		// Wake every thread currently blocked in wait_for_task_completion().
		for (uint8_t i = 0; i < p_task->waiting; i++) {
			p_task->done_semaphore.post();
		}
		if (!use_native_low_priority_threads) {
			p_task->pool_thread_index = -1;
		}
		task_mutex.unlock(); // Keep mutex down to here since on unlock the task may be freed.
	}

	// Task may have been freed by now (all callers notified).
	p_task = nullptr;

	if (!use_native_low_priority_threads) {
		bool post = false;
		task_mutex.lock();
		// Restore the low-prio task that was running before a possible recursion.
		ThreadData &curr_thread = threads[pool_thread_index];
		curr_thread.current_low_prio_task = prev_low_prio_task;
		if (low_priority) {
			low_priority_threads_used--;
			low_priority_tasks_running--;
			// A low priority task was freed, so see if we can move a pending one to the high priority queue.
			if (_try_promote_low_priority_task()) {
				post = true;
			}

			if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
				_prevent_low_prio_saturation_deadlock();
			}
		}
		task_mutex.unlock();
		if (post) {
			// Post outside the lock so the woken worker doesn't immediately block on it.
			task_available_semaphore.post();
		}
	}
}
|---|
| 186 |  | 
|---|
| 187 | void WorkerThreadPool::_thread_function(void *p_user) { | 
|---|
| 188 | while (true) { | 
|---|
| 189 | singleton->task_available_semaphore.wait(); | 
|---|
| 190 | if (singleton->exit_threads) { | 
|---|
| 191 | break; | 
|---|
| 192 | } | 
|---|
| 193 | singleton->_process_task_queue(); | 
|---|
| 194 | } | 
|---|
| 195 | } | 
|---|
| 196 |  | 
|---|
| 197 | void WorkerThreadPool::_native_low_priority_thread_function(void *p_user) { | 
|---|
| 198 | Task *task = (Task *)p_user; | 
|---|
| 199 | singleton->_process_task(task); | 
|---|
| 200 | } | 
|---|
| 201 |  | 
|---|
// Schedules a freshly created task: either on a dedicated native thread
// (native low-priority mode), directly on the run queue, or parked on the
// low-priority queue when all low-priority slots are busy.
void WorkerThreadPool::_post_task(Task *p_task, bool p_high_priority) {
	// Fall back to processing on the calling thread if there are no worker threads.
	// Separated into its own variable to make it easier to extend this logic
	// in custom builds.
	bool process_on_calling_thread = threads.size() == 0;
	if (process_on_calling_thread) {
		_process_task(p_task);
		return;
	}

	task_mutex.lock();
	p_task->low_priority = !p_high_priority;
	if (!p_high_priority && use_native_low_priority_threads) {
		// Native mode: each low-priority task gets its own OS thread.
		p_task->low_priority_thread = native_thread_allocator.alloc();
		task_mutex.unlock();

		if (p_task->group) {
			p_task->group->low_priority_native_tasks.push_back(p_task);
		}
		p_task->low_priority_thread->start(_native_low_priority_thread_function, p_task); // Pass task directly to thread.
	} else if (p_high_priority || low_priority_threads_used < max_low_priority_threads) {
		// High priority, or a low-priority slot is free: schedule right away.
		task_queue.add_last(&p_task->task_elem);
		if (!p_high_priority) {
			low_priority_threads_used++;
		}
		task_mutex.unlock();
		task_available_semaphore.post();
	} else {
		// Too many threads using low priority, must go to queue.
		low_priority_task_queue.add_last(&p_task->task_elem);
		task_mutex.unlock();
	}
}
|---|
| 235 |  | 
|---|
| 236 | bool WorkerThreadPool::_try_promote_low_priority_task() { | 
|---|
| 237 | if (low_priority_task_queue.first()) { | 
|---|
| 238 | Task *low_prio_task = low_priority_task_queue.first()->self(); | 
|---|
| 239 | low_priority_task_queue.remove(low_priority_task_queue.first()); | 
|---|
| 240 | task_queue.add_last(&low_prio_task->task_elem); | 
|---|
| 241 | low_priority_threads_used++; | 
|---|
| 242 | return true; | 
|---|
| 243 | } else { | 
|---|
| 244 | return false; | 
|---|
| 245 | } | 
|---|
| 246 | } | 
|---|
| 247 |  | 
|---|
| 248 | void WorkerThreadPool::_prevent_low_prio_saturation_deadlock() { | 
|---|
| 249 | if (low_priority_tasks_awaiting_others == low_priority_tasks_running) { | 
|---|
| 250 | #ifdef DEV_ENABLED | 
|---|
| 251 | print_verbose( "WorkerThreadPool: Low-prio slots saturated with tasks all waiting for other low-prio tasks. Attempting to avoid deadlock by scheduling one extra task."); | 
|---|
| 252 | #endif | 
|---|
| 253 | // In order not to create dependency cycles, we can only schedule the next one. | 
|---|
| 254 | // We'll keep doing the same until the deadlock is broken, | 
|---|
| 255 | SelfList<Task> *to_promote = low_priority_task_queue.first(); | 
|---|
| 256 | if (to_promote) { | 
|---|
| 257 | low_priority_task_queue.remove(to_promote); | 
|---|
| 258 | task_queue.add_last(to_promote); | 
|---|
| 259 | low_priority_threads_used++; | 
|---|
| 260 | task_available_semaphore.post(); | 
|---|
| 261 | } | 
|---|
| 262 | } | 
|---|
| 263 | } | 
|---|
| 264 |  | 
|---|
// C++-side convenience wrapper: schedules a plain function-pointer task.
WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority, const String &p_description) {
	return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description);
}
|---|
| 268 |  | 
|---|
| 269 | WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description) { | 
|---|
| 270 | task_mutex.lock(); | 
|---|
| 271 | // Get a free task | 
|---|
| 272 | Task *task = task_allocator.alloc(); | 
|---|
| 273 | TaskID id = last_task++; | 
|---|
| 274 | task->callable = p_callable; | 
|---|
| 275 | task->native_func = p_func; | 
|---|
| 276 | task->native_func_userdata = p_userdata; | 
|---|
| 277 | task->description = p_description; | 
|---|
| 278 | task->template_userdata = p_template_userdata; | 
|---|
| 279 | tasks.insert(id, task); | 
|---|
| 280 | task_mutex.unlock(); | 
|---|
| 281 |  | 
|---|
| 282 | _post_task(task, p_high_priority); | 
|---|
| 283 |  | 
|---|
| 284 | return id; | 
|---|
| 285 | } | 
|---|
| 286 |  | 
|---|
// Script-facing entry point: schedules a Callable as a single task.
WorkerThreadPool::TaskID WorkerThreadPool::add_task(const Callable &p_action, bool p_high_priority, const String &p_description) {
	return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description);
}
|---|
| 290 |  | 
|---|
| 291 | bool WorkerThreadPool::is_task_completed(TaskID p_task_id) const { | 
|---|
| 292 | task_mutex.lock(); | 
|---|
| 293 | const Task *const *taskp = tasks.getptr(p_task_id); | 
|---|
| 294 | if (!taskp) { | 
|---|
| 295 | task_mutex.unlock(); | 
|---|
| 296 | ERR_FAIL_V_MSG(false, "Invalid Task ID"); // Invalid task | 
|---|
| 297 | } | 
|---|
| 298 |  | 
|---|
| 299 | bool completed = (*taskp)->completed; | 
|---|
| 300 | task_mutex.unlock(); | 
|---|
| 301 |  | 
|---|
| 302 | return completed; | 
|---|
| 303 | } | 
|---|
| 304 |  | 
|---|
// Blocks until the given task completes, then reclaims it once no other
// waiter remains. Returns ERR_INVALID_PARAMETER for an unknown ID, and
// ERR_BUSY when waiting would deadlock (the awaited task is buried in the
// caller's own thread stack). Pool threads that wait here keep processing
// other queued tasks instead of blocking outright.
Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
	task_mutex.lock();
	Task **taskp = tasks.getptr(p_task_id);
	if (!taskp) {
		task_mutex.unlock();
		ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Invalid Task ID"); // Invalid task
	}
	Task *task = *taskp;

	if (!task->completed) {
		if (!use_native_low_priority_threads && task->pool_thread_index != -1) { // Otherwise, it's not running yet.
			int caller_pool_th_index = thread_ids.has(Thread::get_caller_id()) ? thread_ids[Thread::get_caller_id()] : -1;
			if (caller_pool_th_index == task->pool_thread_index) {
				// Deadlock prevention.
				// Waiting for a task run on this same thread? That means the task to be awaited started waiting as well
				// and another task was run to make use of the thread in the meantime, with enough bad luck as to
				// the need to wait for the original task arose in turn.
				// In other words, the task we want to wait for is buried in the stack.
				// Let's report the issue to the caller so it can handle it as it sees fit.
				task_mutex.unlock();
				return ERR_BUSY;
			}
		}

		task->waiting++; // One more done_semaphore.post() will be issued on completion.

		bool is_low_prio_waiting_for_another = false;
		if (!use_native_low_priority_threads) {
			// Deadlock prevention:
			// If all low-prio tasks are waiting for other low-prio tasks and there are no more free low-prio slots,
			// we have a no progressable situation. We can apply a workaround, consisting in promoting an awaited queued
			// low-prio task to the schedule queue so it can run and break the "impasse".
			// NOTE: A similar reasoning could be made about high priority tasks, but there are usually much more
			// than low-prio. Therefore, a deadlock there would only happen when dealing with a very complex task graph
			// or when there are too few worker threads (limited platforms or exotic settings). If that turns out to be
			// an issue in the real world, a further fix can be applied against that.
			if (task->low_priority) {
				bool awaiter_is_a_low_prio_task = thread_ids.has(Thread::get_caller_id()) && threads[thread_ids[Thread::get_caller_id()]].current_low_prio_task;
				if (awaiter_is_a_low_prio_task) {
					is_low_prio_waiting_for_another = true;
					low_priority_tasks_awaiting_others++;
					if (low_priority_tasks_awaiting_others == low_priority_tasks_running) {
						_prevent_low_prio_saturation_deadlock();
					}
				}
			}
		}

		task_mutex.unlock();

		if (use_native_low_priority_threads && task->low_priority) {
			// The task runs on its own dedicated thread; just wait for it.
			task->done_semaphore.wait();
		} else {
			bool current_is_pool_thread = thread_ids.has(Thread::get_caller_id());
			if (current_is_pool_thread) {
				// We are an actual process thread, we must not be blocked so continue processing stuff if available.
				bool must_exit = false;
				while (true) {
					if (task->done_semaphore.try_wait()) {
						// If done, exit
						break;
					}
					if (!must_exit) {
						if (task_available_semaphore.try_wait()) {
							if (exit_threads) {
								must_exit = true;
							} else {
								// Solve tasks while they are around.
								// Preserve the node-safety flag across the nested task run.
								bool safe_for_nodes_backup = is_current_thread_safe_for_nodes();
								_process_task_queue();
								set_current_thread_safe_for_nodes(safe_for_nodes_backup);
								continue;
							}
						} else if (!use_native_low_priority_threads && task->low_priority) {
							// A low priority task started waiting, so see if we can move a pending one to the high priority queue.
							task_mutex.lock();
							bool post = _try_promote_low_priority_task();
							task_mutex.unlock();
							if (post) {
								task_available_semaphore.post();
							}
						}
					}
					OS::get_singleton()->delay_usec(1); // Microsleep, this could be converted to waiting for multiple objects in supported platforms for a bit more performance.
				}
			} else {
				task->done_semaphore.wait();
			}
		}

		task_mutex.lock();
		if (is_low_prio_waiting_for_another) {
			low_priority_tasks_awaiting_others--;
		}

		task->waiting--;
	}

	if (task->waiting == 0) {
		// Last waiter is responsible for reclaiming the task (and its native thread, if any).
		if (use_native_low_priority_threads && task->low_priority) {
			task->low_priority_thread->wait_to_finish();
			native_thread_allocator.free(task->low_priority_thread);
		}
		tasks.erase(p_task_id);
		task_allocator.free(task);
	}

	task_mutex.unlock();
	return OK;
}
|---|
| 415 |  | 
|---|
// Common backend for group (parallel-for style) tasks: creates one Group with
// p_elements work indices and up to p_tasks tasks that cooperatively drain it.
// Returns the new GroupID, or INVALID_TASK_ID for a negative element count.
WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
	ERR_FAIL_COND_V(p_elements < 0, INVALID_TASK_ID);
	if (p_tasks < 0) {
		// Negative means "one task per pool thread" (at least one).
		p_tasks = MAX(1u, threads.size());
	}

	task_mutex.lock();
	Group *group = group_allocator.alloc();
	GroupID id = last_task++; // Group IDs share the task ID counter.
	group->max = p_elements;
	group->self = id;

	Task **tasks_posted = nullptr;
	if (p_elements == 0) {
		// Should really not call it with zero Elements, but at least it should work.
		// Mark the group finished right away so waiters don't block.
		group->completed.set_to(true);
		group->done_semaphore.post();
		group->tasks_used = 0;
		p_tasks = 0;
		if (p_template_userdata) {
			memdelete(p_template_userdata);
		}

	} else {
		group->tasks_used = p_tasks;
		// Stack-allocated scratch list; only needed until the posting loop below.
		tasks_posted = (Task **)alloca(sizeof(Task *) * p_tasks);
		for (int i = 0; i < p_tasks; i++) {
			Task *task = task_allocator.alloc();
			task->native_group_func = p_func;
			task->native_func_userdata = p_userdata;
			task->description = p_description;
			task->group = group;
			task->callable = p_callable;
			task->template_userdata = p_template_userdata;
			tasks_posted[i] = task;
			// No task ID is used.
		}
	}

	groups[id] = group;
	task_mutex.unlock();

	// Post outside the lock; each task pulls work indices from the group's shared counter.
	for (int i = 0; i < p_tasks; i++) {
		_post_task(tasks_posted[i], p_high_priority);
	}

	return id;
}
|---|
| 464 |  | 
|---|
// C++-side convenience wrapper: schedules a function-pointer group task.
WorkerThreadPool::GroupID WorkerThreadPool::add_native_group_task(void (*p_func)(void *, uint32_t), void *p_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
	return _add_group_task(Callable(), p_func, p_userdata, nullptr, p_elements, p_tasks, p_high_priority, p_description);
}
|---|
| 468 |  | 
|---|
// Script-facing entry point: schedules a Callable as a group task.
WorkerThreadPool::GroupID WorkerThreadPool::add_group_task(const Callable &p_action, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {
	return _add_group_task(p_action, nullptr, nullptr, nullptr, p_elements, p_tasks, p_high_priority, p_description);
}
|---|
| 472 |  | 
|---|
| 473 | uint32_t WorkerThreadPool::get_group_processed_element_count(GroupID p_group) const { | 
|---|
| 474 | task_mutex.lock(); | 
|---|
| 475 | const Group *const *groupp = groups.getptr(p_group); | 
|---|
| 476 | if (!groupp) { | 
|---|
| 477 | task_mutex.unlock(); | 
|---|
| 478 | ERR_FAIL_V_MSG(0, "Invalid Group ID"); | 
|---|
| 479 | } | 
|---|
| 480 | uint32_t elements = (*groupp)->completed_index.get(); | 
|---|
| 481 | task_mutex.unlock(); | 
|---|
| 482 | return elements; | 
|---|
| 483 | } | 
|---|
| 484 | bool WorkerThreadPool::is_group_task_completed(GroupID p_group) const { | 
|---|
| 485 | task_mutex.lock(); | 
|---|
| 486 | const Group *const *groupp = groups.getptr(p_group); | 
|---|
| 487 | if (!groupp) { | 
|---|
| 488 | task_mutex.unlock(); | 
|---|
| 489 | ERR_FAIL_V_MSG(false, "Invalid Group ID"); | 
|---|
| 490 | } | 
|---|
| 491 | bool completed = (*groupp)->completed.is_set(); | 
|---|
| 492 | task_mutex.unlock(); | 
|---|
| 493 | return completed; | 
|---|
| 494 | } | 
|---|
| 495 |  | 
|---|
// Blocks until all elements of the group have been processed, then frees the
// group's tasks/threads and unregisters the GroupID.
void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) {
	task_mutex.lock();
	Group **groupp = groups.getptr(p_group);
	task_mutex.unlock();
	if (!groupp) {
		ERR_FAIL_MSG("Invalid Group ID");
	}
	Group *group = *groupp;

	if (group->low_priority_native_tasks.size() > 0) {
		// Native low-priority mode: join each dedicated thread and free its task.
		for (Task *task : group->low_priority_native_tasks) {
			task->low_priority_thread->wait_to_finish();
			task_mutex.lock();
			native_thread_allocator.free(task->low_priority_thread);
			task_allocator.free(task);
			task_mutex.unlock();
		}

		// Tasks are all joined above, so this waiter frees the group itself.
		task_mutex.lock();
		group_allocator.free(group);
		task_mutex.unlock();
	} else {
		group->done_semaphore.wait();

		uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.
		uint32_t finished_users = group->finished.increment(); // fetch happens before inc, so increment later.

		if (finished_users == max_users) {
			// All tasks using this group are gone (finished before the group), so clear the group too.
			task_mutex.lock();
			group_allocator.free(group);
			task_mutex.unlock();
		}
	}

	task_mutex.lock(); // This mutex is needed when Physics 2D and/or 3D is selected to run on a separate thread.
	groups.erase(p_group);
	task_mutex.unlock();
}
|---|
| 535 |  | 
|---|
| 536 | void WorkerThreadPool::init(int p_thread_count, bool p_use_native_threads_low_priority, float p_low_priority_task_ratio) { | 
|---|
| 537 | ERR_FAIL_COND(threads.size() > 0); | 
|---|
| 538 | if (p_thread_count < 0) { | 
|---|
| 539 | p_thread_count = OS::get_singleton()->get_default_thread_pool_size(); | 
|---|
| 540 | } | 
|---|
| 541 |  | 
|---|
| 542 | if (p_use_native_threads_low_priority) { | 
|---|
| 543 | max_low_priority_threads = 0; | 
|---|
| 544 | } else { | 
|---|
| 545 | max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1); | 
|---|
| 546 | } | 
|---|
| 547 |  | 
|---|
| 548 | use_native_low_priority_threads = p_use_native_threads_low_priority; | 
|---|
| 549 |  | 
|---|
| 550 | threads.resize(p_thread_count); | 
|---|
| 551 |  | 
|---|
| 552 | for (uint32_t i = 0; i < threads.size(); i++) { | 
|---|
| 553 | threads[i].index = i; | 
|---|
| 554 | threads[i].thread.start(&WorkerThreadPool::_thread_function, &threads[i]); | 
|---|
| 555 | thread_ids.insert(threads[i].thread.get_id(), i); | 
|---|
| 556 | } | 
|---|
| 557 | } | 
|---|
| 558 |  | 
|---|
| 559 | void WorkerThreadPool::finish() { | 
|---|
| 560 | if (threads.size() == 0) { | 
|---|
| 561 | return; | 
|---|
| 562 | } | 
|---|
| 563 |  | 
|---|
| 564 | task_mutex.lock(); | 
|---|
| 565 | SelfList<Task> *E = low_priority_task_queue.first(); | 
|---|
| 566 | while (E) { | 
|---|
| 567 | print_error( "Task waiting was never re-claimed: "+ E->self()->description); | 
|---|
| 568 | E = E->next(); | 
|---|
| 569 | } | 
|---|
| 570 | task_mutex.unlock(); | 
|---|
| 571 |  | 
|---|
| 572 | exit_threads = true; | 
|---|
| 573 |  | 
|---|
| 574 | for (uint32_t i = 0; i < threads.size(); i++) { | 
|---|
| 575 | task_available_semaphore.post(); | 
|---|
| 576 | } | 
|---|
| 577 |  | 
|---|
| 578 | for (ThreadData &data : threads) { | 
|---|
| 579 | data.thread.wait_to_finish(); | 
|---|
| 580 | } | 
|---|
| 581 |  | 
|---|
| 582 | threads.clear(); | 
|---|
| 583 | } | 
|---|
| 584 |  | 
|---|
// Exposes the scripting API: single-task methods first, then group-task methods.
void WorkerThreadPool::_bind_methods() {
	ClassDB::bind_method(D_METHOD("add_task", "action", "high_priority", "description"), &WorkerThreadPool::add_task, DEFVAL(false), DEFVAL(String()));
	ClassDB::bind_method(D_METHOD("is_task_completed", "task_id"), &WorkerThreadPool::is_task_completed);
	ClassDB::bind_method(D_METHOD("wait_for_task_completion", "task_id"), &WorkerThreadPool::wait_for_task_completion);

	ClassDB::bind_method(D_METHOD("add_group_task", "action", "elements", "tasks_needed", "high_priority", "description"), &WorkerThreadPool::add_group_task, DEFVAL(-1), DEFVAL(false), DEFVAL(String()));
	ClassDB::bind_method(D_METHOD("is_group_task_completed", "group_id"), &WorkerThreadPool::is_group_task_completed);
	ClassDB::bind_method(D_METHOD("get_group_processed_element_count", "group_id"), &WorkerThreadPool::get_group_processed_element_count);
	ClassDB::bind_method(D_METHOD("wait_for_group_task_completion", "group_id"), &WorkerThreadPool::wait_for_group_task_completion);
}
|---|
| 595 |  | 
|---|
WorkerThreadPool::WorkerThreadPool() {
	// Register this instance as the global singleton; threads are started later via init().
	singleton = this;
}
|---|
| 599 |  | 
|---|
WorkerThreadPool::~WorkerThreadPool() {
	// Ensure all worker threads are signaled and joined before destruction.
	finish();
}
|---|
| 603 |  | 
|---|