| 1 | /* |
| 2 | Copyright (c) 2005-2019 Intel Corporation |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | #include "tbb/global_control.h" // thread_stack_size |
| 18 | |
| 19 | #include "scheduler.h" |
| 20 | #include "governor.h" |
| 21 | #include "arena.h" |
| 22 | #include "itt_notify.h" |
| 23 | #include "semaphore.h" |
| 24 | #include "tbb/internal/_flow_graph_impl.h" |
| 25 | |
| 26 | #include <functional> |
| 27 | |
| 28 | #if __TBB_STATISTICS_STDOUT |
| 29 | #include <cstdio> |
| 30 | #endif |
| 31 | |
| 32 | namespace tbb { |
| 33 | namespace internal { |
| 34 | |
| 35 | // put it here in order to enable compiler to inline it into arena::process and nested_arena_entry |
// Binds this scheduler to slot `index` of arena `a`: links the slot and the
// corresponding mailbox, and sets up per-thread defaults (root-task context
// for workers, priority reference points for masters).
void generic_scheduler::attach_arena( arena* a, size_t index, bool is_master ) {
    __TBB_ASSERT( a->my_market == my_market, NULL );
    my_arena = a;
    my_arena_index = index;
    my_arena_slot = a->my_slots + index;
    // Mailbox numbering is 1-based: slot i uses mailbox i+1.
    attach_mailbox( affinity_id(index+1) );
    if ( is_master && my_inbox.is_idle_state( true ) ) {
        // Master enters an arena with its own task to be executed. It means that master is not
        // going to enter stealing loop and take affinity tasks.
        my_inbox.set_is_idle( false );
    }
#if __TBB_TASK_GROUP_CONTEXT
    // Context to be used by root tasks by default (if the user has not specified one).
    if( !is_master )
        my_dummy_task->prefix().context = a->my_default_ctx;
#endif /* __TBB_TASK_GROUP_CONTEXT */
#if __TBB_TASK_PRIORITY
    // In the current implementation master threads continue processing even when
    // there are other masters with higher priority. Only TBB worker threads are
    // redistributed between arenas based on the latters' priority. Thus master
    // threads use arena's top priority as a reference point (in contrast to workers
    // that use my_market->my_global_top_priority).
    if( is_master ) {
        my_ref_top_priority = &a->my_top_priority;
        my_ref_reload_epoch = &a->my_reload_epoch;
    }
    // Synchronize with the current reload epoch so that priority changes that
    // happened before attaching are not re-processed by this scheduler.
    my_local_reload_epoch = *my_ref_reload_epoch;
    __TBB_ASSERT( !my_offloaded_tasks, NULL );
#endif /* __TBB_TASK_PRIORITY */
}
| 66 | |
| 67 | inline static bool occupy_slot( generic_scheduler*& slot, generic_scheduler& s ) { |
| 68 | return !slot && as_atomic( slot ).compare_and_swap( &s, NULL ) == NULL; |
| 69 | } |
| 70 | |
| 71 | size_t arena::occupy_free_slot_in_range( generic_scheduler& s, size_t lower, size_t upper ) { |
| 72 | if ( lower >= upper ) return out_of_arena; |
| 73 | // Start search for an empty slot from the one we occupied the last time |
| 74 | size_t index = s.my_arena_index; |
| 75 | if ( index < lower || index >= upper ) index = s.my_random.get() % (upper - lower) + lower; |
| 76 | __TBB_ASSERT( index >= lower && index < upper, NULL ); |
| 77 | // Find a free slot |
| 78 | for ( size_t i = index; i < upper; ++i ) |
| 79 | if ( occupy_slot(my_slots[i].my_scheduler, s) ) return i; |
| 80 | for ( size_t i = lower; i < index; ++i ) |
| 81 | if ( occupy_slot(my_slots[i].my_scheduler, s) ) return i; |
| 82 | return out_of_arena; |
| 83 | } |
| 84 | |
| 85 | template <bool as_worker> |
| 86 | size_t arena::occupy_free_slot( generic_scheduler& s ) { |
| 87 | // Firstly, masters try to occupy reserved slots |
| 88 | size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( s, 0, my_num_reserved_slots ); |
| 89 | if ( index == out_of_arena ) { |
| 90 | // Secondly, all threads try to occupy all non-reserved slots |
| 91 | index = occupy_free_slot_in_range( s, my_num_reserved_slots, my_num_slots ); |
| 92 | // Likely this arena is already saturated |
| 93 | if ( index == out_of_arena ) |
| 94 | return out_of_arena; |
| 95 | } |
| 96 | |
| 97 | ITT_NOTIFY(sync_acquired, my_slots + index); |
| 98 | atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() ); |
| 99 | return index; |
| 100 | } |
| 101 | |
// Main body of a worker thread's stay in this arena: occupy a slot, run the
// outermost dispatch loop (stealing/dequeuing tasks) until the arena no longer
// needs this worker, then detach and drop the worker reference.
void arena::process( generic_scheduler& s ) {
    __TBB_ASSERT( is_alive(my_guard), NULL );
    __TBB_ASSERT( governor::is_set(&s), NULL );
    __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
    __TBB_ASSERT( s.worker_outermost_level(), NULL );

    __TBB_ASSERT( my_num_slots > 1, NULL );

    size_t index = occupy_free_slot</*as_worker*/true>( s );
    if ( index == out_of_arena )
        goto quit; // no free slot: leave immediately, releasing our reference below

    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
    s.attach_arena( this, index, /*is_master*/false );

#if !__TBB_FP_CONTEXT
    my_cpu_ctl_env.set_env();
#endif

#if __TBB_ARENA_OBSERVER
    __TBB_ASSERT( !s.my_last_local_observer, "There cannot be notified local observers when entering arena" );
    my_observers.notify_entry_observers( s.my_last_local_observer, /*worker=*/true );
#endif /* __TBB_ARENA_OBSERVER */

    // Task pool can be marked as non-empty if the worker occupies the slot left by a master.
    if ( s.my_arena_slot->task_pool != EmptyTaskPool ) {
        __TBB_ASSERT( s.my_inbox.is_idle_state(false), NULL );
        s.local_wait_for_all( *s.my_dummy_task, NULL );
        __TBB_ASSERT( s.my_inbox.is_idle_state(true), NULL );
    }

    // Outermost dispatch loop: keep stealing/receiving tasks until the arena
    // tells this worker to leave.
    for ( ;; ) {
        __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
        __TBB_ASSERT( s.worker_outermost_level(), NULL );
        __TBB_ASSERT( is_alive(my_guard), NULL );
        __TBB_ASSERT( s.is_quiescent_local_task_pool_reset(),
                      "Worker cannot leave arena while its task pool is not reset" );
        __TBB_ASSERT( s.my_arena_slot->task_pool == EmptyTaskPool, "Empty task pool is not marked appropriately" );
        // This check prevents relinquishing more than necessary workers because
        // of the non-atomicity of the decision making procedure
        if ( num_workers_active() > my_num_workers_allotted
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
             || recall_by_mandatory_request()
#endif
            )
            break;
        // Try to steal a task.
        // Passing reference count is technically unnecessary in this context,
        // but omitting it here would add checks inside the function.
        task* t = s.receive_or_steal_task( __TBB_ISOLATION_ARG( s.my_dummy_task->prefix().ref_count, no_isolation ) );
        if (t) {
            // A side effect of receive_or_steal_task is that my_innermost_running_task can be set.
            // But for the outermost dispatch loop it has to be a dummy task.
            s.my_innermost_running_task = s.my_dummy_task;
            s.local_wait_for_all(*s.my_dummy_task,t);
        }
    }
#if __TBB_ARENA_OBSERVER
    my_observers.notify_exit_observers( s.my_last_local_observer, /*worker=*/true );
    s.my_last_local_observer = NULL;
#endif /* __TBB_ARENA_OBSERVER */
#if __TBB_TASK_PRIORITY
    // Offloaded (lower-priority) tasks must not be lost when the worker leaves:
    // hand them over to the arena's orphaned task list.
    if ( s.my_offloaded_tasks )
        orphan_offloaded_tasks( s );
#endif /* __TBB_TASK_PRIORITY */
#if __TBB_STATISTICS
    ++s.my_counters.arena_roundtrips;
    *my_slots[index].my_counters += s.my_counters;
    s.my_counters.reset();
#endif /* __TBB_STATISTICS */
    // Release the slot (store-with-release so prior slot updates are visible
    // to the next occupant) and detach from the mailbox.
    __TBB_store_with_release( my_slots[index].my_scheduler, (generic_scheduler*)NULL );
    s.my_arena_slot = 0; // detached from slot
    s.my_inbox.detach();
    __TBB_ASSERT( s.my_inbox.is_idle_state(true), NULL );
    __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
    __TBB_ASSERT( s.worker_outermost_level(), NULL );
    __TBB_ASSERT( is_alive(my_guard), NULL );
quit:
    // In contrast to earlier versions of TBB (before 3.0 U5) now it is possible
    // that arena may be temporarily left unpopulated by threads. See comments in
    // arena::on_thread_leaving() for more details.
    on_thread_leaving<ref_worker>();
}
| 185 | |
// Constructs an arena over pre-zeroed storage (see allocate_arena): many
// asserts below rely on the memory having been zero-filled before placement new.
arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots ) {
    __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
    __TBB_ASSERT( sizeof(my_slots[0]) % NFS_GetLineSize()==0, "arena::slot size not multiple of cache line size" );
    __TBB_ASSERT( (uintptr_t)this % NFS_GetLineSize()==0, "arena misaligned" );
#if __TBB_TASK_PRIORITY
    __TBB_ASSERT( !my_reload_epoch && !my_orphaned_tasks && !my_skipped_fifo_priority, "New arena object is not zeroed" );
#endif /* __TBB_TASK_PRIORITY */
    my_market = &m;
    my_limit = 1;
    // Two slots are mandatory: for the master, and for 1 worker (required to support starvation resistant tasks).
    my_num_slots = num_arena_slots(num_slots);
    my_num_reserved_slots = num_reserved_slots;
    // Non-reserved slots are the ones workers may occupy.
    my_max_num_workers = num_slots-num_reserved_slots;
    my_references = ref_external; // accounts for the master
#if __TBB_TASK_PRIORITY
    my_bottom_priority = my_top_priority = normalized_normal_priority;
#endif /* __TBB_TASK_PRIORITY */
    // Remember the market's ABA epoch to detect arena reuse after destruction.
    my_aba_epoch = m.my_arenas_aba_epoch;
#if __TBB_ARENA_OBSERVER
    my_observers.my_arena = this;
#endif
    __TBB_ASSERT ( my_max_num_workers <= my_num_slots, NULL );
    // Construct slots. Mark internal synchronization elements for the tools.
    for( unsigned i = 0; i < my_num_slots; ++i ) {
        __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, NULL );
        __TBB_ASSERT( !my_slots[i].task_pool_ptr, NULL );
        __TBB_ASSERT( !my_slots[i].my_task_pool_size, NULL );
        ITT_SYNC_CREATE(my_slots + i, SyncType_Scheduler, SyncObj_WorkerTaskPool);
        // Mailbox numbering is 1-based: slot i owns mailbox i+1.
        mailbox(i+1).construct();
        ITT_SYNC_CREATE(&mailbox(i+1), SyncType_Scheduler, SyncObj_Mailbox);
        my_slots[i].hint_for_pop = i;
#if __TBB_PREVIEW_CRITICAL_TASKS
        my_slots[i].hint_for_critical = i;
#endif
#if __TBB_STATISTICS
        my_slots[i].my_counters = new ( NFS_Allocate(1, sizeof(statistics_counters), NULL) ) statistics_counters;
#endif /* __TBB_STATISTICS */
    }
    my_task_stream.initialize(my_num_slots);
    ITT_SYNC_CREATE(&my_task_stream, SyncType_Scheduler, SyncObj_TaskStream);
#if __TBB_PREVIEW_CRITICAL_TASKS
    my_critical_task_stream.initialize(my_num_slots);
    ITT_SYNC_CREATE(&my_critical_task_stream, SyncType_Scheduler, SyncObj_CriticalTaskStream);
#endif
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    my_concurrency_mode = cm_normal;
#endif
#if !__TBB_FP_CONTEXT
    // Capture the creating thread's FPU settings so workers can replicate them.
    my_cpu_ctl_env.get_env();
#endif
}
| 237 | |
| 238 | arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots ) { |
| 239 | __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" ); |
| 240 | __TBB_ASSERT( sizeof(base_type) % NFS_GetLineSize() == 0, "arena slots area misaligned: wrong padding" ); |
| 241 | __TBB_ASSERT( sizeof(mail_outbox) == NFS_MaxLineSize, "Mailbox padding is wrong" ); |
| 242 | size_t n = allocation_size(num_arena_slots(num_slots)); |
| 243 | unsigned char* storage = (unsigned char*)NFS_Allocate( 1, n, NULL ); |
| 244 | // Zero all slots to indicate that they are empty |
| 245 | memset( storage, 0, n ); |
| 246 | return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) ) arena(m, num_slots, num_reserved_slots); |
| 247 | } |
| 248 | |
// Destroys a fully quiesced arena: drains slots and mailboxes, releases the
// market reference, runs the destructor, and frees the combined storage block
// allocated by allocate_arena().
void arena::free_arena () {
    __TBB_ASSERT( is_alive(my_guard), NULL );
    __TBB_ASSERT( !my_references, "There are threads in the dying arena" );
    __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
    __TBB_ASSERT( my_pool_state == SNAPSHOT_EMPTY || !my_max_num_workers, "Inconsistent state of a dying arena" );
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    __TBB_ASSERT( my_concurrency_mode != cm_enforced_global, NULL );
#endif
#if !__TBB_STATISTICS_EARLY_DUMP
    GATHER_STATISTIC( dump_arena_statistics() );
#endif
    // Mark the guard so any late access to this arena trips an assertion.
    poison_value( my_guard );
    intptr_t drained = 0;
    for ( unsigned i = 0; i < my_num_slots; ++i ) {
        __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, NULL );
        __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, NULL ); // TODO: replace by is_quiescent_local_task_pool_empty
        my_slots[i].free_task_pool();
#if __TBB_STATISTICS
        NFS_Free( my_slots[i].my_counters );
#endif /* __TBB_STATISTICS */
        // Count tasks still sitting in mailboxes (1-based numbering).
        drained += mailbox(i+1).drain();
    }
    __TBB_ASSERT( my_task_stream.drain()==0, "Not all enqueued tasks were executed" );
#if __TBB_PREVIEW_CRITICAL_TASKS
    __TBB_ASSERT( my_critical_task_stream.drain()==0, "Not all critical tasks were executed" );
#endif
#if __TBB_COUNT_TASK_NODES
    my_market->update_task_node_count( -drained );
#endif /* __TBB_COUNT_TASK_NODES */
    // remove an internal reference
    my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );
#if __TBB_TASK_GROUP_CONTEXT
    __TBB_ASSERT( my_default_ctx, "Master thread never entered the arena?" );
    my_default_ctx->~task_group_context();
    NFS_Free(my_default_ctx);
#endif /* __TBB_TASK_GROUP_CONTEXT */
#if __TBB_ARENA_OBSERVER
    if ( !my_observers.empty() )
        my_observers.clear();
#endif /* __TBB_ARENA_OBSERVER */
    // The allocation made by allocate_arena() starts at the mailbox area,
    // which lies just past the last mailbox index (they grow towards lower
    // addresses of the block) — recover the original pointer before ~arena().
    void* storage  = &mailbox(my_num_slots);
    __TBB_ASSERT( my_references == 0, NULL );
    __TBB_ASSERT( my_pool_state == SNAPSHOT_EMPTY || !my_max_num_workers, NULL );
    this->~arena();
#if TBB_USE_ASSERT > 1
    memset( storage, 0, allocation_size(my_num_slots) );
#endif /* TBB_USE_ASSERT */
    NFS_Free( storage );
}
| 300 | |
| 301 | #if __TBB_STATISTICS |
// Dumps per-slot statistics counters and (optionally) aggregate totals.
// With __TBB_STATISTICS_EARLY_DUMP this may run while schedulers are still
// attached, so live scheduler counters are folded into the slot counters first.
void arena::dump_arena_statistics () {
    statistics_counters total;
    for( unsigned i = 0; i < my_num_slots; ++i ) {
#if __TBB_STATISTICS_EARLY_DUMP
        generic_scheduler* s = my_slots[i].my_scheduler;
        if ( s )
            *my_slots[i].my_counters += s->my_counters;
#else
        __TBB_ASSERT( !my_slots[i].my_scheduler, NULL );
#endif
        if ( i != 0 ) {
            // Accumulate workers' counters; slot 0 (the master) is dumped last.
            total += *my_slots[i].my_counters;
            dump_statistics( *my_slots[i].my_counters, i );
        }
    }
    dump_statistics( *my_slots[0].my_counters, 0 );
#if __TBB_STATISTICS_STDOUT
#if !__TBB_STATISTICS_TOTALS_ONLY
    printf( "----------------------------------------------\n" );
#endif
    // First the workers-only total, then the total including the master's slot.
    dump_statistics( total, workers_counters_total );
    total += *my_slots[0].my_counters;
    dump_statistics( total, arena_counters_total );
#if !__TBB_STATISTICS_TOTALS_ONLY
    printf( "==============================================\n" );
#endif
#endif /* __TBB_STATISTICS_STDOUT */
}
| 330 | #endif /* __TBB_STATISTICS */ |
| 331 | |
| 332 | #if __TBB_TASK_PRIORITY |
| 333 | // The method inspects a scheduler to determine: |
| 334 | // 1. if it has tasks that can be retrieved and executed (via the return value); |
| 335 | // 2. if it has any tasks at all, including those of lower priority (via tasks_present); |
| 336 | // 3. if it is able to work with enqueued tasks (via dequeuing_possible). |
| 337 | inline bool arena::may_have_tasks ( generic_scheduler* s, bool& tasks_present, bool& dequeuing_possible ) { |
| 338 | if ( !s || s->my_arena != this ) |
| 339 | return false; |
| 340 | dequeuing_possible |= s->worker_outermost_level(); |
| 341 | if ( s->my_pool_reshuffling_pending ) { |
| 342 | // This primary task pool is nonempty and may contain tasks at the current |
| 343 | // priority level. Its owner is winnowing lower priority tasks at the moment. |
| 344 | tasks_present = true; |
| 345 | return true; |
| 346 | } |
| 347 | if ( s->my_offloaded_tasks ) { |
| 348 | tasks_present = true; |
| 349 | if ( s->my_local_reload_epoch < *s->my_ref_reload_epoch ) { |
| 350 | // This scheduler's offload area is nonempty and may contain tasks at the |
| 351 | // current priority level. |
| 352 | return true; |
| 353 | } |
| 354 | } |
| 355 | return false; |
| 356 | } |
| 357 | |
// Moves the scheduler's offloaded (lower-priority) task list onto the arena's
// shared orphaned-task list via a lock-free prepend, so the tasks survive the
// scheduler leaving the arena.
void arena::orphan_offloaded_tasks(generic_scheduler& s) {
    __TBB_ASSERT( s.my_offloaded_tasks, NULL );
    GATHER_STATISTIC( ++s.my_counters.prio_orphanings );
    // Bump the epoch so concurrent is_out_of_work() snapshots are invalidated.
    ++my_abandonment_epoch;
    __TBB_ASSERT( s.my_offloaded_task_list_tail_link && !*s.my_offloaded_task_list_tail_link, NULL );
    task* orphans;
    // Classic lock-free list push: splice the current orphan list onto the
    // tail of our list, then CAS our list's head in; retry on interference.
    do {
        orphans = const_cast<task*>(my_orphaned_tasks);
        *s.my_offloaded_task_list_tail_link = orphans;
    } while ( as_atomic(my_orphaned_tasks).compare_and_swap(s.my_offloaded_tasks, orphans) != orphans );
    s.my_offloaded_tasks = NULL;
#if TBB_USE_ASSERT
    s.my_offloaded_task_list_tail_link = NULL;
#endif /* TBB_USE_ASSERT */
}
| 373 | #endif /* __TBB_TASK_PRIORITY */ |
| 374 | |
| 375 | bool arena::has_enqueued_tasks() { |
| 376 | // Look for enqueued tasks at all priority levels |
| 377 | for ( int p = 0; p < num_priority_levels; ++p ) |
| 378 | if ( !my_task_stream.empty(p) ) |
| 379 | return true; |
| 380 | return false; |
| 381 | } |
| 382 | |
// Re-advertises work (and re-raises arena priority) if enqueued tasks slipped
// in while the arena was transitioning to the empty state.
void arena::restore_priority_if_need() {
    // Check for the presence of enqueued tasks "lost" on some of
    // priority levels because updating arena priority and switching
    // arena into "populated" (FULL) state happen non-atomically.
    // Imposing atomicity would require task::enqueue() to use a lock,
    // which is unacceptable.
    if ( has_enqueued_tasks() ) {
        advertise_new_work<work_enqueued>();
#if __TBB_TASK_PRIORITY
        // update_arena_priority() expects non-zero arena::my_num_workers_requested,
        // so must be called after advertise_new_work<work_enqueued>()
        for ( int p = 0; p < num_priority_levels; ++p )
            if ( !my_task_stream.empty(p) ) {
                // Only levels outside the current [bottom, top] band need a
                // priority update.
                if ( p < my_bottom_priority || p > my_top_priority )
                    my_market->update_arena_priority(*this, p);
            }
#endif
    }
}
| 402 | |
// Takes a consistent snapshot of the arena's task population and, if no work
// is found, transitions the pool state SNAPSHOT_FULL -> SNAPSHOT_EMPTY and
// tells the market to withdraw the workers. Returns true only when the arena
// is confirmed (or already known) to be out of work. Any concurrent
// publication of work invalidates the snapshot and makes this return false.
bool arena::is_out_of_work() {
    // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
    for(;;) {
        pool_state_t snapshot = my_pool_state;
        switch( snapshot ) {
            case SNAPSHOT_EMPTY:
                return true;
            case SNAPSHOT_FULL: {
                // Use unique id for "busy" in order to avoid ABA problems.
                const pool_state_t busy = pool_state_t(&busy);
                // Request permission to take snapshot
                if( my_pool_state.compare_and_swap( busy, SNAPSHOT_FULL )==SNAPSHOT_FULL ) {
                    // Got permission. Take the snapshot.
                    // NOTE: This is not a lock, as the state can be set to FULL at
                    // any moment by a thread that spawns/enqueues new task.
                    size_t n = my_limit;
                    // Make local copies of volatile parameters. Their change during
                    // snapshot taking procedure invalidates the attempt, and returns
                    // this thread into the dispatch loop.
#if __TBB_TASK_PRIORITY
                    uintptr_t reload_epoch = __TBB_load_with_acquire( my_reload_epoch );
                    intptr_t top_priority = my_top_priority;
                    // Inspect primary task pools first
#endif /* __TBB_TASK_PRIORITY */
                    size_t k;
                    for( k=0; k<n; ++k ) {
                        if( my_slots[k].task_pool != EmptyTaskPool &&
                            __TBB_load_relaxed(my_slots[k].head) < __TBB_load_relaxed(my_slots[k].tail) )
                        {
                            // k-th primary task pool is nonempty and does contain tasks.
                            break;
                        }
                        if( my_pool_state!=busy )
                            return false; // the work was published
                    }
                    __TBB_ASSERT( k <= n, NULL );
                    bool work_absent = k == n;
#if __TBB_PREVIEW_CRITICAL_TASKS
                    bool no_critical_tasks = my_critical_task_stream.empty(0);
                    work_absent &= no_critical_tasks;
#endif
#if __TBB_TASK_PRIORITY
                    // Variable tasks_present indicates presence of tasks at any priority
                    // level, while work_absent refers only to the current priority.
                    bool tasks_present = !work_absent || my_orphaned_tasks;
                    bool dequeuing_possible = false;
                    if ( work_absent ) {
                        // Check for the possibility that recent priority changes
                        // brought some tasks to the current priority level

                        uintptr_t abandonment_epoch = my_abandonment_epoch;
                        // Master thread's scheduler needs special handling as it
                        // may be destroyed at any moment (workers' schedulers are
                        // guaranteed to be alive while at least one thread is in arena).
                        // The lock below excludes concurrency with task group state change
                        // propagation and guarantees lifetime of the master thread.
                        the_context_state_propagation_mutex.lock();
                        work_absent = !may_have_tasks( my_slots[0].my_scheduler, tasks_present, dequeuing_possible );
                        the_context_state_propagation_mutex.unlock();
                        // The following loop is subject to data races. While k-th slot's
                        // scheduler is being examined, corresponding worker can either
                        // leave to RML or migrate to another arena.
                        // But the races are not prevented because all of them are benign.
                        // First, the code relies on the fact that worker thread's scheduler
                        // object persists until the whole library is deinitialized.
                        // Second, in the worst case the races can only cause another
                        // round of stealing attempts to be undertaken. Introducing complex
                        // synchronization into this coldest part of the scheduler's control
                        // flow does not seem to make sense because it both is unlikely to
                        // ever have any observable performance effect, and will require
                        // additional synchronization code on the hotter paths.
                        for( k = 1; work_absent && k < n; ++k ) {
                            if( my_pool_state!=busy )
                                return false; // the work was published
                            work_absent = !may_have_tasks( my_slots[k].my_scheduler, tasks_present, dequeuing_possible );
                        }
                        // Preclude premature switching arena off because of a race in the previous loop.
                        work_absent = work_absent
                                      && !__TBB_load_with_acquire(my_orphaned_tasks)
                                      && abandonment_epoch == my_abandonment_epoch;
                    }
#endif /* __TBB_TASK_PRIORITY */
                    // Test and test-and-set.
                    if( my_pool_state==busy ) {
#if __TBB_TASK_PRIORITY
                        bool no_fifo_tasks = my_task_stream.empty(top_priority);
                        work_absent = work_absent && (!dequeuing_possible || no_fifo_tasks)
                                      && top_priority == my_top_priority && reload_epoch == my_reload_epoch;
#else
                        bool no_fifo_tasks = my_task_stream.empty(0);
                        work_absent = work_absent && no_fifo_tasks;
#endif /* __TBB_TASK_PRIORITY */
                        if( work_absent ) {
#if __TBB_TASK_PRIORITY
                            if ( top_priority > my_bottom_priority ) {
                                // There may still be lower-priority work: lower the
                                // arena priority instead of emptying the pool.
                                if ( my_market->lower_arena_priority(*this, top_priority - 1, reload_epoch)
                                     && !my_task_stream.empty(top_priority) )
                                {
                                    atomic_update( my_skipped_fifo_priority, top_priority, std::less<intptr_t>());
                                }
                            }
                            else if ( !tasks_present && !my_orphaned_tasks && no_fifo_tasks ) {
#endif /* __TBB_TASK_PRIORITY */
                                // save current demand value before setting SNAPSHOT_EMPTY,
                                // to avoid race with advertise_new_work.
                                int current_demand = (int)my_max_num_workers;
                                if( my_pool_state.compare_and_swap( SNAPSHOT_EMPTY, busy )==busy ) {
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
                                    if( my_concurrency_mode==cm_enforced_global ) {
                                        // adjust_demand() called inside, if needed
                                        my_market->mandatory_concurrency_disable( this );
                                    } else
#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
                                    {
                                        // This thread transitioned pool to empty state, and thus is
                                        // responsible for telling the market that there is no work to do.
                                        my_market->adjust_demand( *this, -current_demand );
                                    }
                                    restore_priority_if_need();
                                    return true;
                                }
                                return false;
#if __TBB_TASK_PRIORITY
                            }
#endif /* __TBB_TASK_PRIORITY */
                        }
                        // Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it.
                        my_pool_state.compare_and_swap( SNAPSHOT_FULL, busy );
                    }
                }
                return false;
            }
            default:
                // Another thread is taking a snapshot.
                return false;
        }
    }
}
| 541 | |
| 542 | #if __TBB_COUNT_TASK_NODES |
| 543 | intptr_t arena::workers_task_node_count() { |
| 544 | intptr_t result = 0; |
| 545 | for( unsigned i = 1; i < my_num_slots; ++i ) { |
| 546 | generic_scheduler* s = my_slots[i].my_scheduler; |
| 547 | if( s ) |
| 548 | result += s->my_task_node_count; |
| 549 | } |
| 550 | return result; |
| 551 | } |
| 552 | #endif /* __TBB_COUNT_TASK_NODES */ |
| 553 | |
// Places task `t` into this arena's FIFO stream (or the critical task stream
// when critical tasks are enabled and the priority demands it), then makes the
// work visible to the market/workers via advertise_new_work.
void arena::enqueue_task( task& t, intptr_t prio, FastRandom &random )
{
#if __TBB_RECYCLE_TO_ENQUEUE
    __TBB_ASSERT( t.state()==task::allocated || t.state()==task::to_enqueue, "attempt to enqueue task with inappropriate state" );
#else
    __TBB_ASSERT( t.state()==task::allocated, "attempt to enqueue task that is not in 'allocated' state" );
#endif
    t.prefix().state = task::ready;
    t.prefix().extra_state |= es_task_enqueued; // enqueued task marker

#if TBB_USE_ASSERT
    if( task* parent = t.parent() ) {
        internal::reference_count ref_count = parent->prefix().ref_count;
        __TBB_ASSERT( ref_count!=0, "attempt to enqueue task whose parent has a ref_count==0 (forgot to set_ref_count?)" );
        __TBB_ASSERT( ref_count>0, "attempt to enqueue task whose parent has a ref_count<0" );
        // Mark the parent's ref_count as live so debug checks track it.
        parent->prefix().extra_state |= es_ref_count_active;
    }
    __TBB_ASSERT(t.prefix().affinity==affinity_id(0), "affinity is ignored for enqueued tasks" );
#endif /* TBB_USE_ASSERT */
#if __TBB_PREVIEW_CRITICAL_TASKS
    if( prio == internal::priority_critical || internal::is_critical( t ) ) {
        // TODO: consider using of 'scheduler::handled_as_critical'
        internal::make_critical( t );
        generic_scheduler* s = governor::local_scheduler_if_initialized();
        ITT_NOTIFY(sync_releasing, &my_critical_task_stream);
        if( s && s->my_arena_slot ) {
            // Scheduler is initialized and it is attached to the arena,
            // propagate isolation level to critical task
#if __TBB_TASK_ISOLATION
            t.prefix().isolation = s->my_innermost_running_task->prefix().isolation;
#endif
            unsigned& lane = s->my_arena_slot->hint_for_critical;
            my_critical_task_stream.push( &t, 0, tbb::internal::subsequent_lane_selector(lane) );
        } else {
            // Either scheduler is not initialized or it is not attached to the arena
            // use random lane for the task
            my_critical_task_stream.push( &t, 0, internal::random_lane_selector(random) );
        }
        advertise_new_work<work_spawned>();
        return;
    }
#endif /* __TBB_PREVIEW_CRITICAL_TASKS */

    ITT_NOTIFY(sync_releasing, &my_task_stream);
#if __TBB_TASK_PRIORITY
    intptr_t p = prio ? normalize_priority(priority_t(prio)) : normalized_normal_priority;
    assert_priority_valid(p);
#if __TBB_PREVIEW_CRITICAL_TASKS && __TBB_CPF_BUILD
    my_task_stream.push( &t, p, internal::random_lane_selector(random) );
#else
    my_task_stream.push( &t, p, random );
#endif
    if ( p != my_top_priority )
        my_market->update_arena_priority( *this, p );
#else /* !__TBB_TASK_PRIORITY */
    __TBB_ASSERT_EX(prio == 0, "the library is not configured to respect the task priority" );
#if __TBB_PREVIEW_CRITICAL_TASKS && __TBB_CPF_BUILD
    my_task_stream.push( &t, 0, internal::random_lane_selector(random) );
#else
    my_task_stream.push( &t, 0, random );
#endif
#endif /* !__TBB_TASK_PRIORITY */
    advertise_new_work<work_enqueued>();
#if __TBB_TASK_PRIORITY
    // The priority update is deliberately repeated after advertise_new_work:
    // presumably the first call can be ineffective while no workers are
    // requested yet (cf. the comment in restore_priority_if_need()) — the
    // second call covers that window. TODO(review): confirm this rationale.
    if ( p != my_top_priority )
        my_market->update_arena_priority( *this, p );
#endif /* __TBB_TASK_PRIORITY */
}
| 622 | |
// RAII helper that temporarily makes a scheduler look like it is at the
// outermost dispatch level of arena `a` (optionally entering a different
// arena), and restores the scheduler's previous state on destruction.
class nested_arena_context : no_copy {
public:
    // `same` indicates the scheduler is already attached to `a`; in that case
    // only the dispatch-level properties are saved/overridden. Otherwise the
    // whole scheduler_state is saved and the scheduler enters the new arena.
    nested_arena_context(generic_scheduler *s, arena* a, size_t slot_index, bool type, bool same)
        : my_scheduler(*s), my_orig_ctx(NULL), same_arena(same) {
        if (same_arena) {
            my_orig_state.my_properties = my_scheduler.my_properties;
            my_orig_state.my_innermost_running_task = my_scheduler.my_innermost_running_task;
            mimic_outermost_level(a, type);
        } else {
            // Save the complete scheduler state (slicing to scheduler_state).
            my_orig_state = *s;
            mimic_outermost_level(a, type);
            s->nested_arena_entry(a, slot_index);
        }
    }
    ~nested_arena_context() {
#if __TBB_TASK_GROUP_CONTEXT
        my_scheduler.my_dummy_task->prefix().context = my_orig_ctx; // restore context of dummy task
#endif
        if (same_arena) {
            my_scheduler.my_properties = my_orig_state.my_properties;
            my_scheduler.my_innermost_running_task = my_orig_state.my_innermost_running_task;
        } else {
            my_scheduler.nested_arena_exit();
            static_cast<scheduler_state&>(my_scheduler) = my_orig_state; // restore arena settings
#if __TBB_TASK_PRIORITY
            // Resynchronize with the restored arena's reload epoch.
            my_scheduler.my_local_reload_epoch = *my_orig_state.my_ref_reload_epoch;
#endif
            governor::assume_scheduler(&my_scheduler);
        }
    }

private:
    generic_scheduler &my_scheduler;   // scheduler whose state is overridden
    scheduler_state my_orig_state;     // saved state to restore on exit
    task_group_context *my_orig_ctx;   // saved context of the dummy task
    const bool same_arena;             // true if no actual arena switch happened

    // Overrides the scheduler's properties so it behaves as if dispatching at
    // the outermost level of arena `a`, and swaps in the arena's default
    // task group context.
    void mimic_outermost_level(arena* a, bool type) {
        my_scheduler.my_properties.outermost = true;
        my_scheduler.my_properties.type = type;
        my_scheduler.my_innermost_running_task = my_scheduler.my_dummy_task;
#if __TBB_PREVIEW_CRITICAL_TASKS
        my_scheduler.my_properties.has_taken_critical_task = false;
#endif
#if __TBB_TASK_GROUP_CONTEXT
        // Save dummy's context and replace it by arena's context
        my_orig_ctx = my_scheduler.my_dummy_task->prefix().context;
        my_scheduler.my_dummy_task->prefix().context = a->my_default_ctx;
#endif
    }
};
| 674 | |
// Re-attaches the calling scheduler to a different arena 'a' at the given slot.
// Used by nested_arena_context when a thread enters a foreign arena
// (e.g. via task_arena::execute).
void generic_scheduler::nested_arena_entry(arena* a, size_t slot_index) {
    __TBB_ASSERT( is_alive(a->my_guard), NULL );
    __TBB_ASSERT( a!=my_arena, NULL);

    // overwrite arena settings
#if __TBB_TASK_PRIORITY
    // Offloaded tasks belong to the arena being left; orphan them there so
    // that arena's threads can still pick them up.
    if ( my_offloaded_tasks )
        my_arena->orphan_offloaded_tasks( *this );
    my_offloaded_tasks = NULL;
#endif /* __TBB_TASK_PRIORITY */
    attach_arena( a, slot_index, /*is_master*/true );
    __TBB_ASSERT( my_arena == a, NULL );
    governor::assume_scheduler( this );
    // TODO? ITT_NOTIFY(sync_acquired, a->my_slots + index);
    // TODO: it requires market to have P workers (not P-1)
    // TODO: a preempted worker should be excluded from assignment to other arenas e.g. my_slack--
    // A master occupying a non-reserved slot takes the place of a worker;
    // lower the arena's worker demand to compensate (restored on exit).
    if( !is_worker() && slot_index >= my_arena->my_num_reserved_slots )
        my_arena->my_market->adjust_demand(*my_arena, -1);
#if __TBB_ARENA_OBSERVER
    my_last_local_observer = 0; // TODO: try optimize number of calls
    my_arena->my_observers.notify_entry_observers( my_last_local_observer, /*worker=*/false );
#endif
}
| 698 | |
// Undoes nested_arena_entry: notifies observers, restores worker demand, and
// releases this scheduler's slot in the nested arena. The caller
// (nested_arena_context's destructor) restores the saved scheduler state.
void generic_scheduler::nested_arena_exit() {
#if __TBB_ARENA_OBSERVER
    my_arena->my_observers.notify_exit_observers( my_last_local_observer, /*worker=*/false );
#endif /* __TBB_ARENA_OBSERVER */
#if __TBB_TASK_PRIORITY
    // Leave any tasks offloaded during the nested execution to this arena.
    if ( my_offloaded_tasks )
        my_arena->orphan_offloaded_tasks( *this );
#endif
    // Restore the worker demand that was reduced on entry for a non-reserved slot.
    if( !is_worker() && my_arena_index >= my_arena->my_num_reserved_slots )
        my_arena->my_market->adjust_demand(*my_arena, 1);
    // Free the master slot.
    __TBB_ASSERT(my_arena->my_slots[my_arena_index].my_scheduler, "A slot is already empty" );
    __TBB_store_with_release(my_arena->my_slots[my_arena_index].my_scheduler, (generic_scheduler*)NULL);
    // Wake up a thread blocked on the exit monitor waiting for a free slot.
    my_arena->my_exit_monitors.notify_one(); // do not relax!
}
| 714 | |
| 715 | void generic_scheduler::wait_until_empty() { |
| 716 | my_dummy_task->prefix().ref_count++; // prevents exit from local_wait_for_all when local work is done enforcing the stealing |
| 717 | while( my_arena->my_pool_state != arena::SNAPSHOT_EMPTY ) |
| 718 | local_wait_for_all(*my_dummy_task, NULL); |
| 719 | my_dummy_task->prefix().ref_count--; |
| 720 | } |
| 721 | |
| 722 | } // namespace internal |
| 723 | } // namespace tbb |
| 724 | |
| 725 | #include "scheduler_utility.h" |
| 726 | #include "tbb/task_arena.h" // task_arena_base |
| 727 | |
| 728 | namespace tbb { |
| 729 | namespace interface7 { |
| 730 | namespace internal { |
| 731 | |
// Lazily creates the arena backing this task_arena. Safe to call from several
// threads concurrently: they race on my_arena with a CAS and losers dispose of
// the arena they created.
void task_arena_base::internal_initialize( ) {
    governor::one_time_init();
    // Any concurrency value below 1 (e.g. 'automatic') maps to the default
    // number of threads.
    if( my_max_concurrency < 1 )
        my_max_concurrency = (int)governor::default_num_threads();
    __TBB_ASSERT( my_master_slots <= (unsigned)my_max_concurrency, "Number of slots reserved for master should not exceed arena concurrency" );
    arena* new_arena = market::create_arena( my_max_concurrency, my_master_slots, 0 );
    // add an internal market reference; a public reference was added in create_arena
    market &m = market::global_market( /*is_public=*/false );
    // allocate default context for task_arena
#if __TBB_TASK_GROUP_CONTEXT
    new_arena->my_default_ctx = new ( NFS_Allocate(1, sizeof(task_group_context), NULL) )
            task_group_context( task_group_context::isolated, task_group_context::default_traits );
#if __TBB_FP_CONTEXT
    new_arena->my_default_ctx->capture_fp_settings();
#endif
#endif /* __TBB_TASK_GROUP_CONTEXT */
    // threads might race to initialize the arena
    if(as_atomic(my_arena).compare_and_swap(new_arena, NULL) != NULL) {
        __TBB_ASSERT(my_arena, NULL); // another thread won the race
        // release public market reference
        m.release( /*is_public=*/true, /*blocking_terminate=*/false );
        new_arena->on_thread_leaving<arena::ref_external>(); // destroy unneeded arena
#if __TBB_TASK_GROUP_CONTEXT
        // The CAS winner publishes my_context last (see the else branch, whose
        // braces are interleaved with this #if); spin until it appears.
        spin_wait_while_eq(my_context, (task_group_context*)NULL);
    } else {
        new_arena->my_default_ctx->my_version_and_traits |= my_version_and_traits & exact_exception_flag;
        // Atomic publication: racing losers spin-wait on my_context above.
        as_atomic(my_context) = new_arena->my_default_ctx;
#endif
    }
    // TODO: should it trigger automatic initialization of this thread?
    governor::local_scheduler_weak();
}
| 764 | |
| 765 | void task_arena_base::internal_terminate( ) { |
| 766 | if( my_arena ) {// task_arena was initialized |
| 767 | my_arena->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false ); |
| 768 | my_arena->on_thread_leaving<arena::ref_external>(); |
| 769 | my_arena = 0; |
| 770 | #if __TBB_TASK_GROUP_CONTEXT |
| 771 | my_context = 0; |
| 772 | #endif |
| 773 | } |
| 774 | } |
| 775 | |
| 776 | void task_arena_base::internal_attach( ) { |
| 777 | __TBB_ASSERT(!my_arena, NULL); |
| 778 | generic_scheduler* s = governor::local_scheduler_if_initialized(); |
| 779 | if( s && s->my_arena ) { |
| 780 | // There is an active arena to attach to. |
| 781 | // It's still used by s, so won't be destroyed right away. |
| 782 | my_arena = s->my_arena; |
| 783 | __TBB_ASSERT( my_arena->my_references > 0, NULL ); |
| 784 | my_arena->my_references += arena::ref_external; |
| 785 | #if __TBB_TASK_GROUP_CONTEXT |
| 786 | my_context = my_arena->my_default_ctx; |
| 787 | my_version_and_traits |= my_context->my_version_and_traits & exact_exception_flag; |
| 788 | #endif |
| 789 | my_master_slots = my_arena->my_num_reserved_slots; |
| 790 | my_max_concurrency = my_master_slots + my_arena->my_max_num_workers; |
| 791 | __TBB_ASSERT(arena::num_arena_slots(my_max_concurrency)==my_arena->my_num_slots, NULL); |
| 792 | // increases market's ref count for task_arena |
| 793 | market::global_market( /*is_public=*/true ); |
| 794 | } |
| 795 | } |
| 796 | |
| 797 | void task_arena_base::internal_enqueue( task& t, intptr_t prio ) const { |
| 798 | __TBB_ASSERT(my_arena, NULL); |
| 799 | generic_scheduler* s = governor::local_scheduler_weak(); // scheduler is only needed for FastRandom instance |
| 800 | __TBB_ASSERT(s, "Scheduler is not initialized" ); // we allocated a task so can expect the scheduler |
| 801 | #if __TBB_TASK_GROUP_CONTEXT |
| 802 | // Is there a better place for checking the state of my_default_ctx? |
| 803 | __TBB_ASSERT(!(my_arena->my_default_ctx == t.prefix().context && my_arena->my_default_ctx->is_group_execution_cancelled()), |
| 804 | "The task will not be executed because default task_group_context of task_arena is cancelled. Has previously enqueued task thrown an exception?" ); |
| 805 | #endif |
| 806 | my_arena->enqueue_task( t, prio, s->my_random ); |
| 807 | } |
| 808 | |
// Task enqueued by task_arena::execute() on behalf of a thread that could not
// occupy an arena slot. It runs the delegate inside the arena while mimicking
// an outermost master, and on destruction wakes the originating thread via the
// arena's exit monitor.
class delegated_task : public task {
    internal::delegate_base & my_delegate; // functor to execute inside the arena
    concurrent_monitor & my_monitor;       // arena's exit monitor used for the wakeup
    task * my_root;                        // root task the originating thread waits on
    task* execute() __TBB_override {
        generic_scheduler& s = *(generic_scheduler*)prefix().owner;
        __TBB_ASSERT(s.outermost_level(), "expected to be enqueued and received on the outermost level" );
        // RAII scope that makes the executing thread look like an outermost
        // master for the duration of the delegate call, and restores the
        // scheduler's dummy task / properties / context afterwards.
        struct outermost_context : internal::no_copy {
            delegated_task * t;
            generic_scheduler & s;
            task * orig_dummy;
            task_group_context * orig_ctx;
            scheduler_properties orig_props;
            outermost_context(delegated_task *_t, generic_scheduler &_s)
                : t(_t), s(_s), orig_dummy(s.my_dummy_task), orig_props(s.my_properties) {
                __TBB_ASSERT(s.my_innermost_running_task == t, NULL);
#if __TBB_TASK_GROUP_CONTEXT
                orig_ctx = t->prefix().context;
                t->prefix().context = s.my_arena->my_default_ctx;
#endif
                // Mimics outermost master
                s.my_dummy_task = t;
                s.my_properties.type = scheduler_properties::master;
            }
            ~outermost_context() {
#if __TBB_TASK_GROUP_CONTEXT
                // Restore context for sake of registering potential exception
                t->prefix().context = orig_ctx;
#endif
                s.my_properties = orig_props;
                s.my_dummy_task = orig_dummy;
            }
        } scope(this, s);
        my_delegate();
        return NULL;
    }
    ~delegated_task() {
        // potential exception was already registered. It must happen before the notification
        __TBB_ASSERT(my_root->ref_count()==2, NULL);
        // Release-store lets the waiter observe completion before the wakeup.
        __TBB_store_with_release(my_root->prefix().ref_count, 1); // must precede the wakeup
        my_monitor.notify(*this); // do not relax, it needs a fence!
    }
public:
    delegated_task( internal::delegate_base & d, concurrent_monitor & s, task * t )
        : my_delegate(d), my_monitor(s), my_root(t) {}
    // predicate for concurrent_monitor notification
    bool operator()(uintptr_t ctx) const { return (void*)ctx == (void*)&my_delegate; }
};
| 857 | |
// Executes delegate d inside this task_arena. If the calling thread already
// works in the arena, the delegate runs directly at a mimicked outermost level.
// Otherwise the thread tries to occupy a free slot; failing that, the delegate
// is wrapped into a delegated_task, enqueued, and the thread waits on the
// arena's exit monitor until the task completes or a slot frees up.
void task_arena_base::internal_execute(internal::delegate_base& d) const {
    __TBB_ASSERT(my_arena, NULL);
    generic_scheduler* s = governor::local_scheduler_weak();
    __TBB_ASSERT(s, "Scheduler is not initialized" );

    bool same_arena = s->my_arena == my_arena;
    size_t index1 = s->my_arena_index;
    if (!same_arena) {
        index1 = my_arena->occupy_free_slot</* as_worker*/false>(*s);
        if (index1 == arena::out_of_arena) {

#if __TBB_USE_OPTIONAL_RTTI
            // Workaround for the bug inside graph. If the thread can not occupy arena slot during task_arena::execute()
            // and all aggregator operations depend on this task completion (all other threads are inside arena already)
            // deadlock appears, because enqueued task will never enter arena.
            // Workaround: check if the task came from graph via RTTI (casting to graph::spawn_functor)
            // and enqueue this task with non-blocking internal_enqueue method.
            // TODO: have to change behaviour later in next GOLD release (maybe to add new library entry point - try_execute)
            typedef tbb::flow::interface10::graph::spawn_functor graph_funct;
            internal::delegated_function< graph_funct, void >* deleg_funct =
                    dynamic_cast< internal::delegated_function< graph_funct, void>* >(&d);

            if (deleg_funct) {
                // Fire-and-forget enqueue of the graph functor; no waiting.
                internal_enqueue(*new(task::allocate_root(*my_context))
                    internal::function_task< internal::strip< graph_funct >::type >
                        (internal::forward< graph_funct >(deleg_funct->my_func)), 0);
                return;
            } else {
#endif /* __TBB_USE_OPTIONAL_RTTI */
                // No free slot: delegate the work to an enqueued task and wait
                // on the exit monitor, retrying to grab a slot in the meantime.
                concurrent_monitor::thread_context waiter;
#if __TBB_TASK_GROUP_CONTEXT
                task_group_context exec_context(task_group_context::isolated, my_version_and_traits & exact_exception_flag);
#if __TBB_FP_CONTEXT
                exec_context.copy_fp_settings(*my_context);
#endif
#endif
                // root's ref_count drops from 2 when the delegated_task is destroyed.
                auto_empty_task root(__TBB_CONTEXT_ARG(s, &exec_context));
                root.prefix().ref_count = 2;
                my_arena->enqueue_task(*new(task::allocate_root(__TBB_CONTEXT_ARG1(exec_context)))
                                        delegated_task(d, my_arena->my_exit_monitors, &root),
                                        0, s->my_random); // TODO: priority?
                size_t index2 = arena::out_of_arena;
                do {
                    my_arena->my_exit_monitors.prepare_wait(waiter, (uintptr_t)&d);
                    // ref_count < 2 means the delegated task has already completed.
                    if (__TBB_load_with_acquire(root.prefix().ref_count) < 2) {
                        my_arena->my_exit_monitors.cancel_wait(waiter);
                        break;
                    }
                    index2 = my_arena->occupy_free_slot</*as_worker*/false>(*s);
                    if (index2 != arena::out_of_arena) {
                        // Got a slot after all: enter the arena and help with
                        // the work until root (and thus the delegate) is done.
                        my_arena->my_exit_monitors.cancel_wait(waiter);
                        nested_arena_context scope(s, my_arena, index2, scheduler_properties::master, same_arena);
                        s->local_wait_for_all(root, NULL);
#if TBB_USE_EXCEPTIONS
                        __TBB_ASSERT(!exec_context.my_exception, NULL); // exception can be thrown above, not deferred
#endif
                        __TBB_ASSERT(root.prefix().ref_count == 0, NULL);
                        break;
                    }
                    my_arena->my_exit_monitors.commit_wait(waiter);
                } while (__TBB_load_with_acquire(root.prefix().ref_count) == 2);
                if (index2 == arena::out_of_arena) {
                    // notify a waiting thread even if this thread did not enter arena,
                    // in case it was woken by a leaving thread but did not need to enter
                    my_arena->my_exit_monitors.notify_one(); // do not relax!
                }
#if TBB_USE_EXCEPTIONS
                // process possible exception
                if (task_group_context::exception_container_type *pe = exec_context.my_exception)
                    TbbRethrowException(pe);
#endif
                return;
#if __TBB_USE_OPTIONAL_RTTI
            } // if task came from graph
#endif
        } // if (index1 == arena::out_of_arena)
    } // if (!same_arena)

    // Direct execution path: the thread owns a slot (or was already in the arena).
    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(__TBB_CONTEXT_ARG1(my_context));
#if TBB_USE_EXCEPTIONS
    try {
#endif
        //TODO: replace dummy tasks for workers as well to avoid using of the_dummy_context
        nested_arena_context scope(s, my_arena, index1, scheduler_properties::master, same_arena);
        d();
#if TBB_USE_EXCEPTIONS
    }
    catch (...) {
        context_guard.restore_default(); // TODO: is it needed on Windows?
        if (my_version_and_traits & exact_exception_flag) throw;
        else {
            // Exact exception propagation is disabled: capture the pending
            // exception in a temporary context and rethrow the captured form.
            task_group_context exception_container(task_group_context::isolated,
                task_group_context::default_traits & ~task_group_context::exact_exception);
            exception_container.register_pending_exception();
            __TBB_ASSERT(exception_container.my_exception, NULL);
            TbbRethrowException(exception_container.my_exception);
        }
    }
#endif
}
| 959 | |
// this wait task is a temporary approach to wait for arena emptiness for masters without slots
// TODO: it will be rather reworked for one source of notification from is_out_of_work
class wait_task : public task {
    binary_semaphore & my_signal; // signaled once this task has been processed
    task* execute() __TBB_override {
        generic_scheduler* s = governor::local_scheduler_if_initialized();
        __TBB_ASSERT( s, NULL );
        __TBB_ASSERT( s->outermost_level(), "The enqueued task can be processed only on outermost level" );
        if ( s->is_worker() ) {
            __TBB_ASSERT( s->my_innermost_running_task == this, NULL );
            // Mimic worker on outermost level to run remaining tasks
            s->my_innermost_running_task = s->my_dummy_task;
            s->local_wait_for_all( *s->my_dummy_task, NULL );
            s->my_innermost_running_task = this;
        } else s->my_arena->is_out_of_work(); // avoids starvation of internal_wait: issuing this task makes arena full
        // Release the thread blocked in internal_wait on the semaphore.
        my_signal.V();
        return NULL;
    }
public:
    wait_task ( binary_semaphore & sema ) : my_signal(sema) {}
};
| 981 | |
// Blocks the calling thread until this arena runs out of work. Not supported
// from a worker of the same arena; an outermost master of the same arena falls
// back to helping with the work instead.
void task_arena_base::internal_wait() const {
    __TBB_ASSERT(my_arena, NULL);
    generic_scheduler* s = governor::local_scheduler_weak();
    __TBB_ASSERT(s, "Scheduler is not initialized" );
    __TBB_ASSERT(s->my_arena != my_arena || s->my_arena_index == 0, "task_arena::wait_until_empty() is not supported within a worker context" );
    if( s->my_arena == my_arena ) {
        //unsupported, but try do something for outermost master
        __TBB_ASSERT(s->master_outermost_level(), "unsupported" );
        if( !s->my_arena_index )
            // Help with the work until no workers remain active in the arena.
            while( my_arena->num_workers_active() )
                s->wait_until_empty();
    } else for(;;) {
        while( my_arena->my_pool_state != arena::SNAPSHOT_EMPTY ) {
            // Try to claim slot 0 via CAS and drain the arena from inside;
            // otherwise enqueue a wait_task and block on its semaphore.
            if( !__TBB_load_with_acquire(my_arena->my_slots[0].my_scheduler) // TODO TEMP: one master, make more masters
                && as_atomic(my_arena->my_slots[0].my_scheduler).compare_and_swap(s, NULL) == NULL ) {
                nested_arena_context a(s, my_arena, 0, scheduler_properties::worker, false);
                s->wait_until_empty();
            } else {
                binary_semaphore waiter; // TODO: replace by a single event notification from is_out_of_work
                internal_enqueue( *new( task::allocate_root(__TBB_CONTEXT_ARG1(*my_context)) ) wait_task(waiter), 0 ); // TODO: priority?
                waiter.P(); // TODO: concurrent_monitor
            }
        }
        if( !my_arena->num_workers_active() && !my_arena->my_slots[0].my_scheduler) // no activity
            break; // spin until workers active but avoid spinning in a worker
        __TBB_Yield(); // wait until workers and master leave
    }
}
| 1010 | |
| 1011 | /*static*/ int task_arena_base::internal_current_slot() { |
| 1012 | generic_scheduler* s = governor::local_scheduler_if_initialized(); |
| 1013 | return s? int(s->my_arena_index) : -1; |
| 1014 | } |
| 1015 | |
| 1016 | #if __TBB_TASK_ISOLATION |
| 1017 | class isolation_guard : tbb::internal::no_copy { |
| 1018 | isolation_tag &guarded; |
| 1019 | isolation_tag previous_value; |
| 1020 | public: |
| 1021 | isolation_guard( isolation_tag &isolation ) : guarded( isolation ), previous_value( isolation ) {} |
| 1022 | ~isolation_guard() { |
| 1023 | guarded = previous_value; |
| 1024 | } |
| 1025 | }; |
| 1026 | |
| 1027 | void isolate_within_arena( delegate_base& d, intptr_t reserved ) { |
| 1028 | __TBB_ASSERT_EX( reserved == 0, NULL ); |
| 1029 | // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it? |
| 1030 | generic_scheduler* s = governor::local_scheduler_weak(); |
| 1031 | __TBB_ASSERT( s, "this_task_arena::isolate() needs an initialized scheduler" ); |
| 1032 | // Theoretically, we can keep the current isolation in the scheduler; however, it makes sense to store it in innermost |
| 1033 | // running task because it can in principle be queried via task::self(). |
| 1034 | isolation_tag& current_isolation = s->my_innermost_running_task->prefix().isolation; |
| 1035 | // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard. |
| 1036 | isolation_guard guard( current_isolation ); |
| 1037 | current_isolation = reinterpret_cast<isolation_tag>(&d); |
| 1038 | d(); |
| 1039 | } |
| 1040 | #endif /* __TBB_TASK_ISOLATION */ |
| 1041 | |
| 1042 | int task_arena_base::internal_max_concurrency(const task_arena *ta) { |
| 1043 | arena* a = NULL; |
| 1044 | if( ta ) // for special cases of ta->max_concurrency() |
| 1045 | a = ta->my_arena; |
| 1046 | else if( generic_scheduler* s = governor::local_scheduler_if_initialized() ) |
| 1047 | a = s->my_arena; // the current arena if any |
| 1048 | |
| 1049 | if( a ) { // Get parameters from the arena |
| 1050 | __TBB_ASSERT( !ta || ta->my_max_concurrency==1, NULL ); |
| 1051 | return a->my_num_reserved_slots + a->my_max_num_workers; |
| 1052 | } else { |
| 1053 | __TBB_ASSERT( !ta || ta->my_max_concurrency==automatic, NULL ); |
| 1054 | return int(governor::default_num_threads()); |
| 1055 | } |
| 1056 | } |
| 1057 | } // tbb::interfaceX::internal |
| 1058 | } // tbb::interfaceX |
| 1059 | } // tbb |
| 1060 | |