1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include "tbb/global_control.h" // thread_stack_size
18
19#include "scheduler.h"
20#include "governor.h"
21#include "arena.h"
22#include "itt_notify.h"
23#include "semaphore.h"
24#include "tbb/internal/_flow_graph_impl.h"
25
26#include <functional>
27
28#if __TBB_STATISTICS_STDOUT
29#include <cstdio>
30#endif
31
32namespace tbb {
33namespace internal {
34
35// put it here in order to enable compiler to inline it into arena::process and nested_arena_entry
36void generic_scheduler::attach_arena( arena* a, size_t index, bool is_master ) {
37 __TBB_ASSERT( a->my_market == my_market, NULL );
38 my_arena = a;
39 my_arena_index = index;
40 my_arena_slot = a->my_slots + index;
41 attach_mailbox( affinity_id(index+1) );
42 if ( is_master && my_inbox.is_idle_state( true ) ) {
43 // Master enters an arena with its own task to be executed. It means that master is not
44 // going to enter stealing loop and take affinity tasks.
45 my_inbox.set_is_idle( false );
46 }
47#if __TBB_TASK_GROUP_CONTEXT
48 // Context to be used by root tasks by default (if the user has not specified one).
49 if( !is_master )
50 my_dummy_task->prefix().context = a->my_default_ctx;
51#endif /* __TBB_TASK_GROUP_CONTEXT */
52#if __TBB_TASK_PRIORITY
53 // In the current implementation master threads continue processing even when
54 // there are other masters with higher priority. Only TBB worker threads are
55 // redistributed between arenas based on the latters' priority. Thus master
56 // threads use arena's top priority as a reference point (in contrast to workers
57 // that use my_market->my_global_top_priority).
58 if( is_master ) {
59 my_ref_top_priority = &a->my_top_priority;
60 my_ref_reload_epoch = &a->my_reload_epoch;
61 }
62 my_local_reload_epoch = *my_ref_reload_epoch;
63 __TBB_ASSERT( !my_offloaded_tasks, NULL );
64#endif /* __TBB_TASK_PRIORITY */
65}
66
67inline static bool occupy_slot( generic_scheduler*& slot, generic_scheduler& s ) {
68 return !slot && as_atomic( slot ).compare_and_swap( &s, NULL ) == NULL;
69}
70
71size_t arena::occupy_free_slot_in_range( generic_scheduler& s, size_t lower, size_t upper ) {
72 if ( lower >= upper ) return out_of_arena;
73 // Start search for an empty slot from the one we occupied the last time
74 size_t index = s.my_arena_index;
75 if ( index < lower || index >= upper ) index = s.my_random.get() % (upper - lower) + lower;
76 __TBB_ASSERT( index >= lower && index < upper, NULL );
77 // Find a free slot
78 for ( size_t i = index; i < upper; ++i )
79 if ( occupy_slot(my_slots[i].my_scheduler, s) ) return i;
80 for ( size_t i = lower; i < index; ++i )
81 if ( occupy_slot(my_slots[i].my_scheduler, s) ) return i;
82 return out_of_arena;
83}
84
85template <bool as_worker>
86size_t arena::occupy_free_slot( generic_scheduler& s ) {
87 // Firstly, masters try to occupy reserved slots
88 size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( s, 0, my_num_reserved_slots );
89 if ( index == out_of_arena ) {
90 // Secondly, all threads try to occupy all non-reserved slots
91 index = occupy_free_slot_in_range( s, my_num_reserved_slots, my_num_slots );
92 // Likely this arena is already saturated
93 if ( index == out_of_arena )
94 return out_of_arena;
95 }
96
97 ITT_NOTIFY(sync_acquired, my_slots + index);
98 atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
99 return index;
100}
101
102void arena::process( generic_scheduler& s ) {
103 __TBB_ASSERT( is_alive(my_guard), NULL );
104 __TBB_ASSERT( governor::is_set(&s), NULL );
105 __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
106 __TBB_ASSERT( s.worker_outermost_level(), NULL );
107
108 __TBB_ASSERT( my_num_slots > 1, NULL );
109
110 size_t index = occupy_free_slot</*as_worker*/true>( s );
111 if ( index == out_of_arena )
112 goto quit;
113
114 __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
115 s.attach_arena( this, index, /*is_master*/false );
116
117#if !__TBB_FP_CONTEXT
118 my_cpu_ctl_env.set_env();
119#endif
120
121#if __TBB_ARENA_OBSERVER
122 __TBB_ASSERT( !s.my_last_local_observer, "There cannot be notified local observers when entering arena" );
123 my_observers.notify_entry_observers( s.my_last_local_observer, /*worker=*/true );
124#endif /* __TBB_ARENA_OBSERVER */
125
126 // Task pool can be marked as non-empty if the worker occupies the slot left by a master.
127 if ( s.my_arena_slot->task_pool != EmptyTaskPool ) {
128 __TBB_ASSERT( s.my_inbox.is_idle_state(false), NULL );
129 s.local_wait_for_all( *s.my_dummy_task, NULL );
130 __TBB_ASSERT( s.my_inbox.is_idle_state(true), NULL );
131 }
132
133 for ( ;; ) {
134 __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
135 __TBB_ASSERT( s.worker_outermost_level(), NULL );
136 __TBB_ASSERT( is_alive(my_guard), NULL );
137 __TBB_ASSERT( s.is_quiescent_local_task_pool_reset(),
138 "Worker cannot leave arena while its task pool is not reset" );
139 __TBB_ASSERT( s.my_arena_slot->task_pool == EmptyTaskPool, "Empty task pool is not marked appropriately" );
140 // This check prevents relinquishing more than necessary workers because
141 // of the non-atomicity of the decision making procedure
142 if ( num_workers_active() > my_num_workers_allotted
143#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
144 || recall_by_mandatory_request()
145#endif
146 )
147 break;
148 // Try to steal a task.
149 // Passing reference count is technically unnecessary in this context,
150 // but omitting it here would add checks inside the function.
151 task* t = s.receive_or_steal_task( __TBB_ISOLATION_ARG( s.my_dummy_task->prefix().ref_count, no_isolation ) );
152 if (t) {
153 // A side effect of receive_or_steal_task is that my_innermost_running_task can be set.
154 // But for the outermost dispatch loop it has to be a dummy task.
155 s.my_innermost_running_task = s.my_dummy_task;
156 s.local_wait_for_all(*s.my_dummy_task,t);
157 }
158 }
159#if __TBB_ARENA_OBSERVER
160 my_observers.notify_exit_observers( s.my_last_local_observer, /*worker=*/true );
161 s.my_last_local_observer = NULL;
162#endif /* __TBB_ARENA_OBSERVER */
163#if __TBB_TASK_PRIORITY
164 if ( s.my_offloaded_tasks )
165 orphan_offloaded_tasks( s );
166#endif /* __TBB_TASK_PRIORITY */
167#if __TBB_STATISTICS
168 ++s.my_counters.arena_roundtrips;
169 *my_slots[index].my_counters += s.my_counters;
170 s.my_counters.reset();
171#endif /* __TBB_STATISTICS */
172 __TBB_store_with_release( my_slots[index].my_scheduler, (generic_scheduler*)NULL );
173 s.my_arena_slot = 0; // detached from slot
174 s.my_inbox.detach();
175 __TBB_ASSERT( s.my_inbox.is_idle_state(true), NULL );
176 __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL );
177 __TBB_ASSERT( s.worker_outermost_level(), NULL );
178 __TBB_ASSERT( is_alive(my_guard), NULL );
179quit:
180 // In contrast to earlier versions of TBB (before 3.0 U5) now it is possible
181 // that arena may be temporarily left unpopulated by threads. See comments in
182 // arena::on_thread_leaving() for more details.
183 on_thread_leaving<ref_worker>();
184}
185
186arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots ) {
187 __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
188 __TBB_ASSERT( sizeof(my_slots[0]) % NFS_GetLineSize()==0, "arena::slot size not multiple of cache line size" );
189 __TBB_ASSERT( (uintptr_t)this % NFS_GetLineSize()==0, "arena misaligned" );
190#if __TBB_TASK_PRIORITY
191 __TBB_ASSERT( !my_reload_epoch && !my_orphaned_tasks && !my_skipped_fifo_priority, "New arena object is not zeroed" );
192#endif /* __TBB_TASK_PRIORITY */
193 my_market = &m;
194 my_limit = 1;
195 // Two slots are mandatory: for the master, and for 1 worker (required to support starvation resistant tasks).
196 my_num_slots = num_arena_slots(num_slots);
197 my_num_reserved_slots = num_reserved_slots;
198 my_max_num_workers = num_slots-num_reserved_slots;
199 my_references = ref_external; // accounts for the master
200#if __TBB_TASK_PRIORITY
201 my_bottom_priority = my_top_priority = normalized_normal_priority;
202#endif /* __TBB_TASK_PRIORITY */
203 my_aba_epoch = m.my_arenas_aba_epoch;
204#if __TBB_ARENA_OBSERVER
205 my_observers.my_arena = this;
206#endif
207 __TBB_ASSERT ( my_max_num_workers <= my_num_slots, NULL );
208 // Construct slots. Mark internal synchronization elements for the tools.
209 for( unsigned i = 0; i < my_num_slots; ++i ) {
210 __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, NULL );
211 __TBB_ASSERT( !my_slots[i].task_pool_ptr, NULL );
212 __TBB_ASSERT( !my_slots[i].my_task_pool_size, NULL );
213 ITT_SYNC_CREATE(my_slots + i, SyncType_Scheduler, SyncObj_WorkerTaskPool);
214 mailbox(i+1).construct();
215 ITT_SYNC_CREATE(&mailbox(i+1), SyncType_Scheduler, SyncObj_Mailbox);
216 my_slots[i].hint_for_pop = i;
217#if __TBB_PREVIEW_CRITICAL_TASKS
218 my_slots[i].hint_for_critical = i;
219#endif
220#if __TBB_STATISTICS
221 my_slots[i].my_counters = new ( NFS_Allocate(1, sizeof(statistics_counters), NULL) ) statistics_counters;
222#endif /* __TBB_STATISTICS */
223 }
224 my_task_stream.initialize(my_num_slots);
225 ITT_SYNC_CREATE(&my_task_stream, SyncType_Scheduler, SyncObj_TaskStream);
226#if __TBB_PREVIEW_CRITICAL_TASKS
227 my_critical_task_stream.initialize(my_num_slots);
228 ITT_SYNC_CREATE(&my_critical_task_stream, SyncType_Scheduler, SyncObj_CriticalTaskStream);
229#endif
230#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
231 my_concurrency_mode = cm_normal;
232#endif
233#if !__TBB_FP_CONTEXT
234 my_cpu_ctl_env.get_env();
235#endif
236}
237
238arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots ) {
239 __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
240 __TBB_ASSERT( sizeof(base_type) % NFS_GetLineSize() == 0, "arena slots area misaligned: wrong padding" );
241 __TBB_ASSERT( sizeof(mail_outbox) == NFS_MaxLineSize, "Mailbox padding is wrong" );
242 size_t n = allocation_size(num_arena_slots(num_slots));
243 unsigned char* storage = (unsigned char*)NFS_Allocate( 1, n, NULL );
244 // Zero all slots to indicate that they are empty
245 memset( storage, 0, n );
246 return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) ) arena(m, num_slots, num_reserved_slots);
247}
248
249void arena::free_arena () {
250 __TBB_ASSERT( is_alive(my_guard), NULL );
251 __TBB_ASSERT( !my_references, "There are threads in the dying arena" );
252 __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
253 __TBB_ASSERT( my_pool_state == SNAPSHOT_EMPTY || !my_max_num_workers, "Inconsistent state of a dying arena" );
254#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
255 __TBB_ASSERT( my_concurrency_mode != cm_enforced_global, NULL );
256#endif
257#if !__TBB_STATISTICS_EARLY_DUMP
258 GATHER_STATISTIC( dump_arena_statistics() );
259#endif
260 poison_value( my_guard );
261 intptr_t drained = 0;
262 for ( unsigned i = 0; i < my_num_slots; ++i ) {
263 __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
264 // TODO: understand the assertion and modify
265 // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, NULL );
266 __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, NULL ); // TODO: replace by is_quiescent_local_task_pool_empty
267 my_slots[i].free_task_pool();
268#if __TBB_STATISTICS
269 NFS_Free( my_slots[i].my_counters );
270#endif /* __TBB_STATISTICS */
271 drained += mailbox(i+1).drain();
272 }
273 __TBB_ASSERT( my_task_stream.drain()==0, "Not all enqueued tasks were executed");
274#if __TBB_PREVIEW_CRITICAL_TASKS
275 __TBB_ASSERT( my_critical_task_stream.drain()==0, "Not all critical tasks were executed");
276#endif
277#if __TBB_COUNT_TASK_NODES
278 my_market->update_task_node_count( -drained );
279#endif /* __TBB_COUNT_TASK_NODES */
280 // remove an internal reference
281 my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );
282#if __TBB_TASK_GROUP_CONTEXT
283 __TBB_ASSERT( my_default_ctx, "Master thread never entered the arena?" );
284 my_default_ctx->~task_group_context();
285 NFS_Free(my_default_ctx);
286#endif /* __TBB_TASK_GROUP_CONTEXT */
287#if __TBB_ARENA_OBSERVER
288 if ( !my_observers.empty() )
289 my_observers.clear();
290#endif /* __TBB_ARENA_OBSERVER */
291 void* storage = &mailbox(my_num_slots);
292 __TBB_ASSERT( my_references == 0, NULL );
293 __TBB_ASSERT( my_pool_state == SNAPSHOT_EMPTY || !my_max_num_workers, NULL );
294 this->~arena();
295#if TBB_USE_ASSERT > 1
296 memset( storage, 0, allocation_size(my_num_slots) );
297#endif /* TBB_USE_ASSERT */
298 NFS_Free( storage );
299}
300
301#if __TBB_STATISTICS
302void arena::dump_arena_statistics () {
303 statistics_counters total;
304 for( unsigned i = 0; i < my_num_slots; ++i ) {
305#if __TBB_STATISTICS_EARLY_DUMP
306 generic_scheduler* s = my_slots[i].my_scheduler;
307 if ( s )
308 *my_slots[i].my_counters += s->my_counters;
309#else
310 __TBB_ASSERT( !my_slots[i].my_scheduler, NULL );
311#endif
312 if ( i != 0 ) {
313 total += *my_slots[i].my_counters;
314 dump_statistics( *my_slots[i].my_counters, i );
315 }
316 }
317 dump_statistics( *my_slots[0].my_counters, 0 );
318#if __TBB_STATISTICS_STDOUT
319#if !__TBB_STATISTICS_TOTALS_ONLY
320 printf( "----------------------------------------------\n" );
321#endif
322 dump_statistics( total, workers_counters_total );
323 total += *my_slots[0].my_counters;
324 dump_statistics( total, arena_counters_total );
325#if !__TBB_STATISTICS_TOTALS_ONLY
326 printf( "==============================================\n" );
327#endif
328#endif /* __TBB_STATISTICS_STDOUT */
329}
330#endif /* __TBB_STATISTICS */
331
332#if __TBB_TASK_PRIORITY
333// The method inspects a scheduler to determine:
334// 1. if it has tasks that can be retrieved and executed (via the return value);
335// 2. if it has any tasks at all, including those of lower priority (via tasks_present);
336// 3. if it is able to work with enqueued tasks (via dequeuing_possible).
337inline bool arena::may_have_tasks ( generic_scheduler* s, bool& tasks_present, bool& dequeuing_possible ) {
338 if ( !s || s->my_arena != this )
339 return false;
340 dequeuing_possible |= s->worker_outermost_level();
341 if ( s->my_pool_reshuffling_pending ) {
342 // This primary task pool is nonempty and may contain tasks at the current
343 // priority level. Its owner is winnowing lower priority tasks at the moment.
344 tasks_present = true;
345 return true;
346 }
347 if ( s->my_offloaded_tasks ) {
348 tasks_present = true;
349 if ( s->my_local_reload_epoch < *s->my_ref_reload_epoch ) {
350 // This scheduler's offload area is nonempty and may contain tasks at the
351 // current priority level.
352 return true;
353 }
354 }
355 return false;
356}
357
358void arena::orphan_offloaded_tasks(generic_scheduler& s) {
359 __TBB_ASSERT( s.my_offloaded_tasks, NULL );
360 GATHER_STATISTIC( ++s.my_counters.prio_orphanings );
361 ++my_abandonment_epoch;
362 __TBB_ASSERT( s.my_offloaded_task_list_tail_link && !*s.my_offloaded_task_list_tail_link, NULL );
363 task* orphans;
364 do {
365 orphans = const_cast<task*>(my_orphaned_tasks);
366 *s.my_offloaded_task_list_tail_link = orphans;
367 } while ( as_atomic(my_orphaned_tasks).compare_and_swap(s.my_offloaded_tasks, orphans) != orphans );
368 s.my_offloaded_tasks = NULL;
369#if TBB_USE_ASSERT
370 s.my_offloaded_task_list_tail_link = NULL;
371#endif /* TBB_USE_ASSERT */
372}
373#endif /* __TBB_TASK_PRIORITY */
374
375bool arena::has_enqueued_tasks() {
376 // Look for enqueued tasks at all priority levels
377 for ( int p = 0; p < num_priority_levels; ++p )
378 if ( !my_task_stream.empty(p) )
379 return true;
380 return false;
381}
382
383void arena::restore_priority_if_need() {
384 // Check for the presence of enqueued tasks "lost" on some of
385 // priority levels because updating arena priority and switching
386 // arena into "populated" (FULL) state happen non-atomically.
387 // Imposing atomicity would require task::enqueue() to use a lock,
388 // which is unacceptable.
389 if ( has_enqueued_tasks() ) {
390 advertise_new_work<work_enqueued>();
391#if __TBB_TASK_PRIORITY
392 // update_arena_priority() expects non-zero arena::my_num_workers_requested,
393 // so must be called after advertise_new_work<work_enqueued>()
394 for ( int p = 0; p < num_priority_levels; ++p )
395 if ( !my_task_stream.empty(p) ) {
396 if ( p < my_bottom_priority || p > my_top_priority )
397 my_market->update_arena_priority(*this, p);
398 }
399#endif
400 }
401}
402
403bool arena::is_out_of_work() {
404 // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
405 for(;;) {
406 pool_state_t snapshot = my_pool_state;
407 switch( snapshot ) {
408 case SNAPSHOT_EMPTY:
409 return true;
410 case SNAPSHOT_FULL: {
411 // Use unique id for "busy" in order to avoid ABA problems.
412 const pool_state_t busy = pool_state_t(&busy);
413 // Request permission to take snapshot
414 if( my_pool_state.compare_and_swap( busy, SNAPSHOT_FULL )==SNAPSHOT_FULL ) {
415 // Got permission. Take the snapshot.
416 // NOTE: This is not a lock, as the state can be set to FULL at
417 // any moment by a thread that spawns/enqueues new task.
418 size_t n = my_limit;
419 // Make local copies of volatile parameters. Their change during
420 // snapshot taking procedure invalidates the attempt, and returns
421 // this thread into the dispatch loop.
422#if __TBB_TASK_PRIORITY
423 uintptr_t reload_epoch = __TBB_load_with_acquire( my_reload_epoch );
424 intptr_t top_priority = my_top_priority;
425 // Inspect primary task pools first
426#endif /* __TBB_TASK_PRIORITY */
427 size_t k;
428 for( k=0; k<n; ++k ) {
429 if( my_slots[k].task_pool != EmptyTaskPool &&
430 __TBB_load_relaxed(my_slots[k].head) < __TBB_load_relaxed(my_slots[k].tail) )
431 {
432 // k-th primary task pool is nonempty and does contain tasks.
433 break;
434 }
435 if( my_pool_state!=busy )
436 return false; // the work was published
437 }
438 __TBB_ASSERT( k <= n, NULL );
439 bool work_absent = k == n;
440#if __TBB_PREVIEW_CRITICAL_TASKS
441 bool no_critical_tasks = my_critical_task_stream.empty(0);
442 work_absent &= no_critical_tasks;
443#endif
444#if __TBB_TASK_PRIORITY
445 // Variable tasks_present indicates presence of tasks at any priority
446 // level, while work_absent refers only to the current priority.
447 bool tasks_present = !work_absent || my_orphaned_tasks;
448 bool dequeuing_possible = false;
449 if ( work_absent ) {
450 // Check for the possibility that recent priority changes
451 // brought some tasks to the current priority level
452
453 uintptr_t abandonment_epoch = my_abandonment_epoch;
454 // Master thread's scheduler needs special handling as it
455 // may be destroyed at any moment (workers' schedulers are
456 // guaranteed to be alive while at least one thread is in arena).
457 // The lock below excludes concurrency with task group state change
458 // propagation and guarantees lifetime of the master thread.
459 the_context_state_propagation_mutex.lock();
460 work_absent = !may_have_tasks( my_slots[0].my_scheduler, tasks_present, dequeuing_possible );
461 the_context_state_propagation_mutex.unlock();
462 // The following loop is subject to data races. While k-th slot's
463 // scheduler is being examined, corresponding worker can either
464 // leave to RML or migrate to another arena.
465 // But the races are not prevented because all of them are benign.
466 // First, the code relies on the fact that worker thread's scheduler
467 // object persists until the whole library is deinitialized.
468 // Second, in the worst case the races can only cause another
469 // round of stealing attempts to be undertaken. Introducing complex
470 // synchronization into this coldest part of the scheduler's control
471 // flow does not seem to make sense because it both is unlikely to
472 // ever have any observable performance effect, and will require
473 // additional synchronization code on the hotter paths.
474 for( k = 1; work_absent && k < n; ++k ) {
475 if( my_pool_state!=busy )
476 return false; // the work was published
477 work_absent = !may_have_tasks( my_slots[k].my_scheduler, tasks_present, dequeuing_possible );
478 }
479 // Preclude premature switching arena off because of a race in the previous loop.
480 work_absent = work_absent
481 && !__TBB_load_with_acquire(my_orphaned_tasks)
482 && abandonment_epoch == my_abandonment_epoch;
483 }
484#endif /* __TBB_TASK_PRIORITY */
485 // Test and test-and-set.
486 if( my_pool_state==busy ) {
487#if __TBB_TASK_PRIORITY
488 bool no_fifo_tasks = my_task_stream.empty(top_priority);
489 work_absent = work_absent && (!dequeuing_possible || no_fifo_tasks)
490 && top_priority == my_top_priority && reload_epoch == my_reload_epoch;
491#else
492 bool no_fifo_tasks = my_task_stream.empty(0);
493 work_absent = work_absent && no_fifo_tasks;
494#endif /* __TBB_TASK_PRIORITY */
495 if( work_absent ) {
496#if __TBB_TASK_PRIORITY
497 if ( top_priority > my_bottom_priority ) {
498 if ( my_market->lower_arena_priority(*this, top_priority - 1, reload_epoch)
499 && !my_task_stream.empty(top_priority) )
500 {
501 atomic_update( my_skipped_fifo_priority, top_priority, std::less<intptr_t>());
502 }
503 }
504 else if ( !tasks_present && !my_orphaned_tasks && no_fifo_tasks ) {
505#endif /* __TBB_TASK_PRIORITY */
506 // save current demand value before setting SNAPSHOT_EMPTY,
507 // to avoid race with advertise_new_work.
508 int current_demand = (int)my_max_num_workers;
509 if( my_pool_state.compare_and_swap( SNAPSHOT_EMPTY, busy )==busy ) {
510#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
511 if( my_concurrency_mode==cm_enforced_global ) {
512 // adjust_demand() called inside, if needed
513 my_market->mandatory_concurrency_disable( this );
514 } else
515#endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
516 {
517 // This thread transitioned pool to empty state, and thus is
518 // responsible for telling the market that there is no work to do.
519 my_market->adjust_demand( *this, -current_demand );
520 }
521 restore_priority_if_need();
522 return true;
523 }
524 return false;
525#if __TBB_TASK_PRIORITY
526 }
527#endif /* __TBB_TASK_PRIORITY */
528 }
529 // Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it.
530 my_pool_state.compare_and_swap( SNAPSHOT_FULL, busy );
531 }
532 }
533 return false;
534 }
535 default:
536 // Another thread is taking a snapshot.
537 return false;
538 }
539 }
540}
541
542#if __TBB_COUNT_TASK_NODES
543intptr_t arena::workers_task_node_count() {
544 intptr_t result = 0;
545 for( unsigned i = 1; i < my_num_slots; ++i ) {
546 generic_scheduler* s = my_slots[i].my_scheduler;
547 if( s )
548 result += s->my_task_node_count;
549 }
550 return result;
551}
552#endif /* __TBB_COUNT_TASK_NODES */
553
554void arena::enqueue_task( task& t, intptr_t prio, FastRandom &random )
555{
556#if __TBB_RECYCLE_TO_ENQUEUE
557 __TBB_ASSERT( t.state()==task::allocated || t.state()==task::to_enqueue, "attempt to enqueue task with inappropriate state" );
558#else
559 __TBB_ASSERT( t.state()==task::allocated, "attempt to enqueue task that is not in 'allocated' state" );
560#endif
561 t.prefix().state = task::ready;
562 t.prefix().extra_state |= es_task_enqueued; // enqueued task marker
563
564#if TBB_USE_ASSERT
565 if( task* parent = t.parent() ) {
566 internal::reference_count ref_count = parent->prefix().ref_count;
567 __TBB_ASSERT( ref_count!=0, "attempt to enqueue task whose parent has a ref_count==0 (forgot to set_ref_count?)" );
568 __TBB_ASSERT( ref_count>0, "attempt to enqueue task whose parent has a ref_count<0" );
569 parent->prefix().extra_state |= es_ref_count_active;
570 }
571 __TBB_ASSERT(t.prefix().affinity==affinity_id(0), "affinity is ignored for enqueued tasks");
572#endif /* TBB_USE_ASSERT */
573#if __TBB_PREVIEW_CRITICAL_TASKS
574 if( prio == internal::priority_critical || internal::is_critical( t ) ) {
575 // TODO: consider using of 'scheduler::handled_as_critical'
576 internal::make_critical( t );
577 generic_scheduler* s = governor::local_scheduler_if_initialized();
578 ITT_NOTIFY(sync_releasing, &my_critical_task_stream);
579 if( s && s->my_arena_slot ) {
580 // Scheduler is initialized and it is attached to the arena,
581 // propagate isolation level to critical task
582#if __TBB_TASK_ISOLATION
583 t.prefix().isolation = s->my_innermost_running_task->prefix().isolation;
584#endif
585 unsigned& lane = s->my_arena_slot->hint_for_critical;
586 my_critical_task_stream.push( &t, 0, tbb::internal::subsequent_lane_selector(lane) );
587 } else {
588 // Either scheduler is not initialized or it is not attached to the arena
589 // use random lane for the task
590 my_critical_task_stream.push( &t, 0, internal::random_lane_selector(random) );
591 }
592 advertise_new_work<work_spawned>();
593 return;
594 }
595#endif /* __TBB_PREVIEW_CRITICAL_TASKS */
596
597 ITT_NOTIFY(sync_releasing, &my_task_stream);
598#if __TBB_TASK_PRIORITY
599 intptr_t p = prio ? normalize_priority(priority_t(prio)) : normalized_normal_priority;
600 assert_priority_valid(p);
601#if __TBB_PREVIEW_CRITICAL_TASKS && __TBB_CPF_BUILD
602 my_task_stream.push( &t, p, internal::random_lane_selector(random) );
603#else
604 my_task_stream.push( &t, p, random );
605#endif
606 if ( p != my_top_priority )
607 my_market->update_arena_priority( *this, p );
608#else /* !__TBB_TASK_PRIORITY */
609 __TBB_ASSERT_EX(prio == 0, "the library is not configured to respect the task priority");
610#if __TBB_PREVIEW_CRITICAL_TASKS && __TBB_CPF_BUILD
611 my_task_stream.push( &t, 0, internal::random_lane_selector(random) );
612#else
613 my_task_stream.push( &t, 0, random );
614#endif
615#endif /* !__TBB_TASK_PRIORITY */
616 advertise_new_work<work_enqueued>();
617#if __TBB_TASK_PRIORITY
618 if ( p != my_top_priority )
619 my_market->update_arena_priority( *this, p );
620#endif /* __TBB_TASK_PRIORITY */
621}
622
623class nested_arena_context : no_copy {
624public:
625 nested_arena_context(generic_scheduler *s, arena* a, size_t slot_index, bool type, bool same)
626 : my_scheduler(*s), my_orig_ctx(NULL), same_arena(same) {
627 if (same_arena) {
628 my_orig_state.my_properties = my_scheduler.my_properties;
629 my_orig_state.my_innermost_running_task = my_scheduler.my_innermost_running_task;
630 mimic_outermost_level(a, type);
631 } else {
632 my_orig_state = *s;
633 mimic_outermost_level(a, type);
634 s->nested_arena_entry(a, slot_index);
635 }
636 }
637 ~nested_arena_context() {
638#if __TBB_TASK_GROUP_CONTEXT
639 my_scheduler.my_dummy_task->prefix().context = my_orig_ctx; // restore context of dummy task
640#endif
641 if (same_arena) {
642 my_scheduler.my_properties = my_orig_state.my_properties;
643 my_scheduler.my_innermost_running_task = my_orig_state.my_innermost_running_task;
644 } else {
645 my_scheduler.nested_arena_exit();
646 static_cast<scheduler_state&>(my_scheduler) = my_orig_state; // restore arena settings
647#if __TBB_TASK_PRIORITY
648 my_scheduler.my_local_reload_epoch = *my_orig_state.my_ref_reload_epoch;
649#endif
650 governor::assume_scheduler(&my_scheduler);
651 }
652 }
653
654private:
655 generic_scheduler &my_scheduler;
656 scheduler_state my_orig_state;
657 task_group_context *my_orig_ctx;
658 const bool same_arena;
659
660 void mimic_outermost_level(arena* a, bool type) {
661 my_scheduler.my_properties.outermost = true;
662 my_scheduler.my_properties.type = type;
663 my_scheduler.my_innermost_running_task = my_scheduler.my_dummy_task;
664#if __TBB_PREVIEW_CRITICAL_TASKS
665 my_scheduler.my_properties.has_taken_critical_task = false;
666#endif
667#if __TBB_TASK_GROUP_CONTEXT
668 // Save dummy's context and replace it by arena's context
669 my_orig_ctx = my_scheduler.my_dummy_task->prefix().context;
670 my_scheduler.my_dummy_task->prefix().context = a->my_default_ctx;
671#endif
672 }
673};
674
675void generic_scheduler::nested_arena_entry(arena* a, size_t slot_index) {
676 __TBB_ASSERT( is_alive(a->my_guard), NULL );
677 __TBB_ASSERT( a!=my_arena, NULL);
678
679 // overwrite arena settings
680#if __TBB_TASK_PRIORITY
681 if ( my_offloaded_tasks )
682 my_arena->orphan_offloaded_tasks( *this );
683 my_offloaded_tasks = NULL;
684#endif /* __TBB_TASK_PRIORITY */
685 attach_arena( a, slot_index, /*is_master*/true );
686 __TBB_ASSERT( my_arena == a, NULL );
687 governor::assume_scheduler( this );
688 // TODO? ITT_NOTIFY(sync_acquired, a->my_slots + index);
689 // TODO: it requires market to have P workers (not P-1)
690 // TODO: a preempted worker should be excluded from assignment to other arenas e.g. my_slack--
691 if( !is_worker() && slot_index >= my_arena->my_num_reserved_slots )
692 my_arena->my_market->adjust_demand(*my_arena, -1);
693#if __TBB_ARENA_OBSERVER
694 my_last_local_observer = 0; // TODO: try optimize number of calls
695 my_arena->my_observers.notify_entry_observers( my_last_local_observer, /*worker=*/false );
696#endif
697}
698
699void generic_scheduler::nested_arena_exit() {
700#if __TBB_ARENA_OBSERVER
701 my_arena->my_observers.notify_exit_observers( my_last_local_observer, /*worker=*/false );
702#endif /* __TBB_ARENA_OBSERVER */
703#if __TBB_TASK_PRIORITY
704 if ( my_offloaded_tasks )
705 my_arena->orphan_offloaded_tasks( *this );
706#endif
707 if( !is_worker() && my_arena_index >= my_arena->my_num_reserved_slots )
708 my_arena->my_market->adjust_demand(*my_arena, 1);
709 // Free the master slot.
710 __TBB_ASSERT(my_arena->my_slots[my_arena_index].my_scheduler, "A slot is already empty");
711 __TBB_store_with_release(my_arena->my_slots[my_arena_index].my_scheduler, (generic_scheduler*)NULL);
712 my_arena->my_exit_monitors.notify_one(); // do not relax!
713}
714
715void generic_scheduler::wait_until_empty() {
716 my_dummy_task->prefix().ref_count++; // prevents exit from local_wait_for_all when local work is done enforcing the stealing
717 while( my_arena->my_pool_state != arena::SNAPSHOT_EMPTY )
718 local_wait_for_all(*my_dummy_task, NULL);
719 my_dummy_task->prefix().ref_count--;
720}
721
722} // namespace internal
723} // namespace tbb
724
725#include "scheduler_utility.h"
726#include "tbb/task_arena.h" // task_arena_base
727
728namespace tbb {
729namespace interface7 {
730namespace internal {
731
732void task_arena_base::internal_initialize( ) {
733 governor::one_time_init();
734 if( my_max_concurrency < 1 )
735 my_max_concurrency = (int)governor::default_num_threads();
736 __TBB_ASSERT( my_master_slots <= (unsigned)my_max_concurrency, "Number of slots reserved for master should not exceed arena concurrency");
737 arena* new_arena = market::create_arena( my_max_concurrency, my_master_slots, 0 );
738 // add an internal market reference; a public reference was added in create_arena
739 market &m = market::global_market( /*is_public=*/false );
740 // allocate default context for task_arena
741#if __TBB_TASK_GROUP_CONTEXT
742 new_arena->my_default_ctx = new ( NFS_Allocate(1, sizeof(task_group_context), NULL) )
743 task_group_context( task_group_context::isolated, task_group_context::default_traits );
744#if __TBB_FP_CONTEXT
745 new_arena->my_default_ctx->capture_fp_settings();
746#endif
747#endif /* __TBB_TASK_GROUP_CONTEXT */
748 // threads might race to initialize the arena
749 if(as_atomic(my_arena).compare_and_swap(new_arena, NULL) != NULL) {
750 __TBB_ASSERT(my_arena, NULL); // another thread won the race
751 // release public market reference
752 m.release( /*is_public=*/true, /*blocking_terminate=*/false );
753 new_arena->on_thread_leaving<arena::ref_external>(); // destroy unneeded arena
754#if __TBB_TASK_GROUP_CONTEXT
755 spin_wait_while_eq(my_context, (task_group_context*)NULL);
756 } else {
757 new_arena->my_default_ctx->my_version_and_traits |= my_version_and_traits & exact_exception_flag;
758 as_atomic(my_context) = new_arena->my_default_ctx;
759#endif
760 }
761 // TODO: should it trigger automatic initialization of this thread?
762 governor::local_scheduler_weak();
763}
764
765void task_arena_base::internal_terminate( ) {
766 if( my_arena ) {// task_arena was initialized
767 my_arena->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
768 my_arena->on_thread_leaving<arena::ref_external>();
769 my_arena = 0;
770#if __TBB_TASK_GROUP_CONTEXT
771 my_context = 0;
772#endif
773 }
774}
775
776void task_arena_base::internal_attach( ) {
777 __TBB_ASSERT(!my_arena, NULL);
778 generic_scheduler* s = governor::local_scheduler_if_initialized();
779 if( s && s->my_arena ) {
780 // There is an active arena to attach to.
781 // It's still used by s, so won't be destroyed right away.
782 my_arena = s->my_arena;
783 __TBB_ASSERT( my_arena->my_references > 0, NULL );
784 my_arena->my_references += arena::ref_external;
785#if __TBB_TASK_GROUP_CONTEXT
786 my_context = my_arena->my_default_ctx;
787 my_version_and_traits |= my_context->my_version_and_traits & exact_exception_flag;
788#endif
789 my_master_slots = my_arena->my_num_reserved_slots;
790 my_max_concurrency = my_master_slots + my_arena->my_max_num_workers;
791 __TBB_ASSERT(arena::num_arena_slots(my_max_concurrency)==my_arena->my_num_slots, NULL);
792 // increases market's ref count for task_arena
793 market::global_market( /*is_public=*/true );
794 }
795}
796
797void task_arena_base::internal_enqueue( task& t, intptr_t prio ) const {
798 __TBB_ASSERT(my_arena, NULL);
799 generic_scheduler* s = governor::local_scheduler_weak(); // scheduler is only needed for FastRandom instance
800 __TBB_ASSERT(s, "Scheduler is not initialized"); // we allocated a task so can expect the scheduler
801#if __TBB_TASK_GROUP_CONTEXT
802 // Is there a better place for checking the state of my_default_ctx?
803 __TBB_ASSERT(!(my_arena->my_default_ctx == t.prefix().context && my_arena->my_default_ctx->is_group_execution_cancelled()),
804 "The task will not be executed because default task_group_context of task_arena is cancelled. Has previously enqueued task thrown an exception?");
805#endif
806 my_arena->enqueue_task( t, prio, s->my_random );
807}
808
809class delegated_task : public task {
810 internal::delegate_base & my_delegate;
811 concurrent_monitor & my_monitor;
812 task * my_root;
813 task* execute() __TBB_override {
814 generic_scheduler& s = *(generic_scheduler*)prefix().owner;
815 __TBB_ASSERT(s.outermost_level(), "expected to be enqueued and received on the outermost level");
816 struct outermost_context : internal::no_copy {
817 delegated_task * t;
818 generic_scheduler & s;
819 task * orig_dummy;
820 task_group_context * orig_ctx;
821 scheduler_properties orig_props;
822 outermost_context(delegated_task *_t, generic_scheduler &_s)
823 : t(_t), s(_s), orig_dummy(s.my_dummy_task), orig_props(s.my_properties) {
824 __TBB_ASSERT(s.my_innermost_running_task == t, NULL);
825#if __TBB_TASK_GROUP_CONTEXT
826 orig_ctx = t->prefix().context;
827 t->prefix().context = s.my_arena->my_default_ctx;
828#endif
829 // Mimics outermost master
830 s.my_dummy_task = t;
831 s.my_properties.type = scheduler_properties::master;
832 }
833 ~outermost_context() {
834#if __TBB_TASK_GROUP_CONTEXT
835 // Restore context for sake of registering potential exception
836 t->prefix().context = orig_ctx;
837#endif
838 s.my_properties = orig_props;
839 s.my_dummy_task = orig_dummy;
840 }
841 } scope(this, s);
842 my_delegate();
843 return NULL;
844 }
845 ~delegated_task() {
846 // potential exception was already registered. It must happen before the notification
847 __TBB_ASSERT(my_root->ref_count()==2, NULL);
848 __TBB_store_with_release(my_root->prefix().ref_count, 1); // must precede the wakeup
849 my_monitor.notify(*this); // do not relax, it needs a fence!
850 }
851public:
852 delegated_task( internal::delegate_base & d, concurrent_monitor & s, task * t )
853 : my_delegate(d), my_monitor(s), my_root(t) {}
854 // predicate for concurrent_monitor notification
855 bool operator()(uintptr_t ctx) const { return (void*)ctx == (void*)&my_delegate; }
856};
857
858void task_arena_base::internal_execute(internal::delegate_base& d) const {
859 __TBB_ASSERT(my_arena, NULL);
860 generic_scheduler* s = governor::local_scheduler_weak();
861 __TBB_ASSERT(s, "Scheduler is not initialized");
862
863 bool same_arena = s->my_arena == my_arena;
864 size_t index1 = s->my_arena_index;
865 if (!same_arena) {
866 index1 = my_arena->occupy_free_slot</* as_worker*/false>(*s);
867 if (index1 == arena::out_of_arena) {
868
869#if __TBB_USE_OPTIONAL_RTTI
870 // Workaround for the bug inside graph. If the thread can not occupy arena slot during task_arena::execute()
871 // and all aggregator operations depend on this task completion (all other threads are inside arena already)
872 // deadlock appears, because enqueued task will never enter arena.
873 // Workaround: check if the task came from graph via RTTI (casting to graph::spawn_functor)
874 // and enqueue this task with non-blocking internal_enqueue method.
875 // TODO: have to change behaviour later in next GOLD release (maybe to add new library entry point - try_execute)
876 typedef tbb::flow::interface10::graph::spawn_functor graph_funct;
877 internal::delegated_function< graph_funct, void >* deleg_funct =
878 dynamic_cast< internal::delegated_function< graph_funct, void>* >(&d);
879
880 if (deleg_funct) {
881 internal_enqueue(*new(task::allocate_root(*my_context))
882 internal::function_task< internal::strip< graph_funct >::type >
883 (internal::forward< graph_funct >(deleg_funct->my_func)), 0);
884 return;
885 } else {
886#endif /* __TBB_USE_OPTIONAL_RTTI */
887 concurrent_monitor::thread_context waiter;
888#if __TBB_TASK_GROUP_CONTEXT
889 task_group_context exec_context(task_group_context::isolated, my_version_and_traits & exact_exception_flag);
890#if __TBB_FP_CONTEXT
891 exec_context.copy_fp_settings(*my_context);
892#endif
893#endif
894 auto_empty_task root(__TBB_CONTEXT_ARG(s, &exec_context));
895 root.prefix().ref_count = 2;
896 my_arena->enqueue_task(*new(task::allocate_root(__TBB_CONTEXT_ARG1(exec_context)))
897 delegated_task(d, my_arena->my_exit_monitors, &root),
898 0, s->my_random); // TODO: priority?
899 size_t index2 = arena::out_of_arena;
900 do {
901 my_arena->my_exit_monitors.prepare_wait(waiter, (uintptr_t)&d);
902 if (__TBB_load_with_acquire(root.prefix().ref_count) < 2) {
903 my_arena->my_exit_monitors.cancel_wait(waiter);
904 break;
905 }
906 index2 = my_arena->occupy_free_slot</*as_worker*/false>(*s);
907 if (index2 != arena::out_of_arena) {
908 my_arena->my_exit_monitors.cancel_wait(waiter);
909 nested_arena_context scope(s, my_arena, index2, scheduler_properties::master, same_arena);
910 s->local_wait_for_all(root, NULL);
911#if TBB_USE_EXCEPTIONS
912 __TBB_ASSERT(!exec_context.my_exception, NULL); // exception can be thrown above, not deferred
913#endif
914 __TBB_ASSERT(root.prefix().ref_count == 0, NULL);
915 break;
916 }
917 my_arena->my_exit_monitors.commit_wait(waiter);
918 } while (__TBB_load_with_acquire(root.prefix().ref_count) == 2);
919 if (index2 == arena::out_of_arena) {
920 // notify a waiting thread even if this thread did not enter arena,
921 // in case it was woken by a leaving thread but did not need to enter
922 my_arena->my_exit_monitors.notify_one(); // do not relax!
923 }
924#if TBB_USE_EXCEPTIONS
925 // process possible exception
926 if (task_group_context::exception_container_type *pe = exec_context.my_exception)
927 TbbRethrowException(pe);
928#endif
929 return;
930#if __TBB_USE_OPTIONAL_RTTI
931 } // if task came from graph
932#endif
933 } // if (index1 == arena::out_of_arena)
934 } // if (!same_arena)
935
936 context_guard_helper</*report_tasks=*/false> context_guard;
937 context_guard.set_ctx(__TBB_CONTEXT_ARG1(my_context));
938#if TBB_USE_EXCEPTIONS
939 try {
940#endif
941 //TODO: replace dummy tasks for workers as well to avoid using of the_dummy_context
942 nested_arena_context scope(s, my_arena, index1, scheduler_properties::master, same_arena);
943 d();
944#if TBB_USE_EXCEPTIONS
945 }
946 catch (...) {
947 context_guard.restore_default(); // TODO: is it needed on Windows?
948 if (my_version_and_traits & exact_exception_flag) throw;
949 else {
950 task_group_context exception_container(task_group_context::isolated,
951 task_group_context::default_traits & ~task_group_context::exact_exception);
952 exception_container.register_pending_exception();
953 __TBB_ASSERT(exception_container.my_exception, NULL);
954 TbbRethrowException(exception_container.my_exception);
955 }
956 }
957#endif
958}
959
960// this wait task is a temporary approach to wait for arena emptiness for masters without slots
961// TODO: it will be rather reworked for one source of notification from is_out_of_work
962class wait_task : public task {
963 binary_semaphore & my_signal;
964 task* execute() __TBB_override {
965 generic_scheduler* s = governor::local_scheduler_if_initialized();
966 __TBB_ASSERT( s, NULL );
967 __TBB_ASSERT( s->outermost_level(), "The enqueued task can be processed only on outermost level" );
968 if ( s->is_worker() ) {
969 __TBB_ASSERT( s->my_innermost_running_task == this, NULL );
970 // Mimic worker on outermost level to run remaining tasks
971 s->my_innermost_running_task = s->my_dummy_task;
972 s->local_wait_for_all( *s->my_dummy_task, NULL );
973 s->my_innermost_running_task = this;
974 } else s->my_arena->is_out_of_work(); // avoids starvation of internal_wait: issuing this task makes arena full
975 my_signal.V();
976 return NULL;
977 }
978public:
979 wait_task ( binary_semaphore & sema ) : my_signal(sema) {}
980};
981
982void task_arena_base::internal_wait() const {
983 __TBB_ASSERT(my_arena, NULL);
984 generic_scheduler* s = governor::local_scheduler_weak();
985 __TBB_ASSERT(s, "Scheduler is not initialized");
986 __TBB_ASSERT(s->my_arena != my_arena || s->my_arena_index == 0, "task_arena::wait_until_empty() is not supported within a worker context" );
987 if( s->my_arena == my_arena ) {
988 //unsupported, but try do something for outermost master
989 __TBB_ASSERT(s->master_outermost_level(), "unsupported");
990 if( !s->my_arena_index )
991 while( my_arena->num_workers_active() )
992 s->wait_until_empty();
993 } else for(;;) {
994 while( my_arena->my_pool_state != arena::SNAPSHOT_EMPTY ) {
995 if( !__TBB_load_with_acquire(my_arena->my_slots[0].my_scheduler) // TODO TEMP: one master, make more masters
996 && as_atomic(my_arena->my_slots[0].my_scheduler).compare_and_swap(s, NULL) == NULL ) {
997 nested_arena_context a(s, my_arena, 0, scheduler_properties::worker, false);
998 s->wait_until_empty();
999 } else {
1000 binary_semaphore waiter; // TODO: replace by a single event notification from is_out_of_work
1001 internal_enqueue( *new( task::allocate_root(__TBB_CONTEXT_ARG1(*my_context)) ) wait_task(waiter), 0 ); // TODO: priority?
1002 waiter.P(); // TODO: concurrent_monitor
1003 }
1004 }
1005 if( !my_arena->num_workers_active() && !my_arena->my_slots[0].my_scheduler) // no activity
1006 break; // spin until workers active but avoid spinning in a worker
1007 __TBB_Yield(); // wait until workers and master leave
1008 }
1009}
1010
1011/*static*/ int task_arena_base::internal_current_slot() {
1012 generic_scheduler* s = governor::local_scheduler_if_initialized();
1013 return s? int(s->my_arena_index) : -1;
1014}
1015
1016#if __TBB_TASK_ISOLATION
1017class isolation_guard : tbb::internal::no_copy {
1018 isolation_tag &guarded;
1019 isolation_tag previous_value;
1020public:
1021 isolation_guard( isolation_tag &isolation ) : guarded( isolation ), previous_value( isolation ) {}
1022 ~isolation_guard() {
1023 guarded = previous_value;
1024 }
1025};
1026
1027void isolate_within_arena( delegate_base& d, intptr_t reserved ) {
1028 __TBB_ASSERT_EX( reserved == 0, NULL );
1029 // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
1030 generic_scheduler* s = governor::local_scheduler_weak();
1031 __TBB_ASSERT( s, "this_task_arena::isolate() needs an initialized scheduler" );
1032 // Theoretically, we can keep the current isolation in the scheduler; however, it makes sense to store it in innermost
1033 // running task because it can in principle be queried via task::self().
1034 isolation_tag& current_isolation = s->my_innermost_running_task->prefix().isolation;
1035 // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
1036 isolation_guard guard( current_isolation );
1037 current_isolation = reinterpret_cast<isolation_tag>(&d);
1038 d();
1039}
1040#endif /* __TBB_TASK_ISOLATION */
1041
1042int task_arena_base::internal_max_concurrency(const task_arena *ta) {
1043 arena* a = NULL;
1044 if( ta ) // for special cases of ta->max_concurrency()
1045 a = ta->my_arena;
1046 else if( generic_scheduler* s = governor::local_scheduler_if_initialized() )
1047 a = s->my_arena; // the current arena if any
1048
1049 if( a ) { // Get parameters from the arena
1050 __TBB_ASSERT( !ta || ta->my_max_concurrency==1, NULL );
1051 return a->my_num_reserved_slots + a->my_max_num_workers;
1052 } else {
1053 __TBB_ASSERT( !ta || ta->my_max_concurrency==automatic, NULL );
1054 return int(governor::default_num_threads());
1055 }
1056}
1057} // tbb::interfaceX::internal
1058} // tbb::interfaceX
1059} // tbb
1060