1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #include "tbb/global_control.h" // thread_stack_size |
18 | |
19 | #include "scheduler.h" |
20 | #include "governor.h" |
21 | #include "arena.h" |
22 | #include "itt_notify.h" |
23 | #include "semaphore.h" |
24 | #include "tbb/internal/_flow_graph_impl.h" |
25 | |
26 | #include <functional> |
27 | |
28 | #if __TBB_STATISTICS_STDOUT |
29 | #include <cstdio> |
30 | #endif |
31 | |
32 | namespace tbb { |
33 | namespace internal { |
34 | |
// Defined here to let the compiler inline it into arena::process and nested_arena_entry.
36 | void generic_scheduler::attach_arena( arena* a, size_t index, bool is_master ) { |
37 | __TBB_ASSERT( a->my_market == my_market, NULL ); |
38 | my_arena = a; |
39 | my_arena_index = index; |
40 | my_arena_slot = a->my_slots + index; |
41 | attach_mailbox( affinity_id(index+1) ); |
42 | if ( is_master && my_inbox.is_idle_state( true ) ) { |
// A master enters the arena with its own task to execute, which means it is not
// going to enter the stealing loop and take affinity tasks.
45 | my_inbox.set_is_idle( false ); |
46 | } |
47 | #if __TBB_TASK_GROUP_CONTEXT |
48 | // Context to be used by root tasks by default (if the user has not specified one). |
49 | if( !is_master ) |
50 | my_dummy_task->prefix().context = a->my_default_ctx; |
51 | #endif /* __TBB_TASK_GROUP_CONTEXT */ |
52 | #if __TBB_TASK_PRIORITY |
// In the current implementation master threads continue processing even when
// there are other masters with higher priority. Only TBB worker threads are
// redistributed between arenas based on the arenas' priority. Thus master
// threads use the arena's top priority as a reference point (in contrast to
// workers, which use my_market->my_global_top_priority).
58 | if( is_master ) { |
59 | my_ref_top_priority = &a->my_top_priority; |
60 | my_ref_reload_epoch = &a->my_reload_epoch; |
61 | } |
62 | my_local_reload_epoch = *my_ref_reload_epoch; |
63 | __TBB_ASSERT( !my_offloaded_tasks, NULL ); |
64 | #endif /* __TBB_TASK_PRIORITY */ |
65 | } |
66 | |
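// Helper: atomically claim an arena slot for scheduler s via compare-and-swap.
// Returns true only if the slot was empty and &s was successfully installed.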
67 | inline static bool occupy_slot( generic_scheduler*& slot, generic_scheduler& s ) { |
68 | return !slot && as_atomic( slot ).compare_and_swap( &s, NULL ) == NULL; |
69 | } |
70 | |
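// Search the half-open range [lower, upper) for a free slot, starting from the slot
// this scheduler occupied last time (or a random in-range index) and wrapping around.
// Returns the index of the claimed slot, or out_of_arena if the whole range is taken.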
71 | size_t arena::occupy_free_slot_in_range( generic_scheduler& s, size_t lower, size_t upper ) { |
72 | if ( lower >= upper ) return out_of_arena; |
73 | // Start search for an empty slot from the one we occupied the last time |
74 | size_t index = s.my_arena_index; |
75 | if ( index < lower || index >= upper ) index = s.my_random.get() % (upper - lower) + lower; |
76 | __TBB_ASSERT( index >= lower && index < upper, NULL ); |
77 | // Find a free slot |
78 | for ( size_t i = index; i < upper; ++i ) |
79 | if ( occupy_slot(my_slots[i].my_scheduler, s) ) return i; |
80 | for ( size_t i = lower; i < index; ++i ) |
81 | if ( occupy_slot(my_slots[i].my_scheduler, s) ) return i; |
82 | return out_of_arena; |
83 | } |
84 | |
85 | template <bool as_worker> |
86 | size_t arena::occupy_free_slot( generic_scheduler& s ) { |
87 | // Firstly, masters try to occupy reserved slots |
88 | size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( s, 0, my_num_reserved_slots ); |
89 | if ( index == out_of_arena ) { |
90 | // Secondly, all threads try to occupy all non-reserved slots |
91 | index = occupy_free_slot_in_range( s, my_num_reserved_slots, my_num_slots ); |
92 | // Likely this arena is already saturated |
93 | if ( index == out_of_arena ) |
94 | return out_of_arena; |
95 | } |
96 | |
97 | ITT_NOTIFY(sync_acquired, my_slots + index); |
98 | atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() ); |
99 | return index; |
100 | } |
101 | |
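// A worker's visit to the arena: claim a free non-reserved slot, attach the scheduler,
// then repeatedly receive/steal and execute tasks until the arena's worker allotment no
// longer accommodates this worker (or a mandatory-concurrency recall is requested),
// and finally detach from the slot and drop the worker reference.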
102 | void arena::process( generic_scheduler& s ) { |
103 | __TBB_ASSERT( is_alive(my_guard), NULL ); |
104 | __TBB_ASSERT( governor::is_set(&s), NULL ); |
105 | __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL ); |
106 | __TBB_ASSERT( s.worker_outermost_level(), NULL ); |
107 | |
108 | __TBB_ASSERT( my_num_slots > 1, NULL ); |
109 | |
110 | size_t index = occupy_free_slot</*as_worker*/true>( s ); |
111 | if ( index == out_of_arena ) |
112 | goto quit; |
113 | |
114 | __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" ); |
115 | s.attach_arena( this, index, /*is_master*/false ); |
116 | |
117 | #if !__TBB_FP_CONTEXT |
118 | my_cpu_ctl_env.set_env(); |
119 | #endif |
120 | |
121 | #if __TBB_ARENA_OBSERVER |
122 | __TBB_ASSERT( !s.my_last_local_observer, "There cannot be notified local observers when entering arena" ); |
123 | my_observers.notify_entry_observers( s.my_last_local_observer, /*worker=*/true ); |
124 | #endif /* __TBB_ARENA_OBSERVER */ |
125 | |
126 | // Task pool can be marked as non-empty if the worker occupies the slot left by a master. |
127 | if ( s.my_arena_slot->task_pool != EmptyTaskPool ) { |
128 | __TBB_ASSERT( s.my_inbox.is_idle_state(false), NULL ); |
129 | s.local_wait_for_all( *s.my_dummy_task, NULL ); |
130 | __TBB_ASSERT( s.my_inbox.is_idle_state(true), NULL ); |
131 | } |
132 | |
133 | for ( ;; ) { |
134 | __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL ); |
135 | __TBB_ASSERT( s.worker_outermost_level(), NULL ); |
136 | __TBB_ASSERT( is_alive(my_guard), NULL ); |
137 | __TBB_ASSERT( s.is_quiescent_local_task_pool_reset(), |
138 | "Worker cannot leave arena while its task pool is not reset" ); |
139 | __TBB_ASSERT( s.my_arena_slot->task_pool == EmptyTaskPool, "Empty task pool is not marked appropriately" ); |
// This check prevents relinquishing more workers than necessary because
// the decision-making procedure is not atomic.
142 | if ( num_workers_active() > my_num_workers_allotted |
143 | #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY |
144 | || recall_by_mandatory_request() |
145 | #endif |
146 | ) |
147 | break; |
148 | // Try to steal a task. |
// Passing the reference count is technically unnecessary in this context,
// but omitting it here would add extra checks inside the function.
151 | task* t = s.receive_or_steal_task( __TBB_ISOLATION_ARG( s.my_dummy_task->prefix().ref_count, no_isolation ) ); |
152 | if (t) { |
// A side effect of receive_or_steal_task is that my_innermost_running_task can be set,
// but in the outermost dispatch loop it has to be the dummy task.
155 | s.my_innermost_running_task = s.my_dummy_task; |
156 | s.local_wait_for_all(*s.my_dummy_task,t); |
157 | } |
158 | } |
159 | #if __TBB_ARENA_OBSERVER |
160 | my_observers.notify_exit_observers( s.my_last_local_observer, /*worker=*/true ); |
161 | s.my_last_local_observer = NULL; |
162 | #endif /* __TBB_ARENA_OBSERVER */ |
163 | #if __TBB_TASK_PRIORITY |
164 | if ( s.my_offloaded_tasks ) |
165 | orphan_offloaded_tasks( s ); |
166 | #endif /* __TBB_TASK_PRIORITY */ |
167 | #if __TBB_STATISTICS |
168 | ++s.my_counters.arena_roundtrips; |
169 | *my_slots[index].my_counters += s.my_counters; |
170 | s.my_counters.reset(); |
171 | #endif /* __TBB_STATISTICS */ |
172 | __TBB_store_with_release( my_slots[index].my_scheduler, (generic_scheduler*)NULL ); |
173 | s.my_arena_slot = 0; // detached from slot |
174 | s.my_inbox.detach(); |
175 | __TBB_ASSERT( s.my_inbox.is_idle_state(true), NULL ); |
176 | __TBB_ASSERT( s.my_innermost_running_task == s.my_dummy_task, NULL ); |
177 | __TBB_ASSERT( s.worker_outermost_level(), NULL ); |
178 | __TBB_ASSERT( is_alive(my_guard), NULL ); |
179 | quit: |
// In contrast to earlier versions of TBB (before 3.0 U5), it is now possible
// for the arena to be left temporarily unpopulated by threads. See comments in
// arena::on_thread_leaving() for more details.
183 | on_thread_leaving<ref_worker>(); |
184 | } |
185 | |
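// Arena constructor: assumes zero-initialized storage (see allocate_arena), sets up
// market and slot bookkeeping, constructs the per-slot mailboxes and the task stream(s),
// and registers the internal synchronization objects with the profiling tools.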
186 | arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots ) { |
187 | __TBB_ASSERT( !my_guard, "improperly allocated arena?" ); |
188 | __TBB_ASSERT( sizeof(my_slots[0]) % NFS_GetLineSize()==0, "arena::slot size not multiple of cache line size" ); |
189 | __TBB_ASSERT( (uintptr_t)this % NFS_GetLineSize()==0, "arena misaligned" ); |
190 | #if __TBB_TASK_PRIORITY |
191 | __TBB_ASSERT( !my_reload_epoch && !my_orphaned_tasks && !my_skipped_fifo_priority, "New arena object is not zeroed" ); |
192 | #endif /* __TBB_TASK_PRIORITY */ |
193 | my_market = &m; |
194 | my_limit = 1; |
// Two slots are mandatory: one for the master and one for a worker (required to support starvation-resistant tasks).
196 | my_num_slots = num_arena_slots(num_slots); |
197 | my_num_reserved_slots = num_reserved_slots; |
198 | my_max_num_workers = num_slots-num_reserved_slots; |
199 | my_references = ref_external; // accounts for the master |
200 | #if __TBB_TASK_PRIORITY |
201 | my_bottom_priority = my_top_priority = normalized_normal_priority; |
202 | #endif /* __TBB_TASK_PRIORITY */ |
203 | my_aba_epoch = m.my_arenas_aba_epoch; |
204 | #if __TBB_ARENA_OBSERVER |
205 | my_observers.my_arena = this; |
206 | #endif |
207 | __TBB_ASSERT ( my_max_num_workers <= my_num_slots, NULL ); |
208 | // Construct slots. Mark internal synchronization elements for the tools. |
209 | for( unsigned i = 0; i < my_num_slots; ++i ) { |
210 | __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, NULL ); |
211 | __TBB_ASSERT( !my_slots[i].task_pool_ptr, NULL ); |
212 | __TBB_ASSERT( !my_slots[i].my_task_pool_size, NULL ); |
213 | ITT_SYNC_CREATE(my_slots + i, SyncType_Scheduler, SyncObj_WorkerTaskPool); |
214 | mailbox(i+1).construct(); |
215 | ITT_SYNC_CREATE(&mailbox(i+1), SyncType_Scheduler, SyncObj_Mailbox); |
216 | my_slots[i].hint_for_pop = i; |
217 | #if __TBB_PREVIEW_CRITICAL_TASKS |
218 | my_slots[i].hint_for_critical = i; |
219 | #endif |
220 | #if __TBB_STATISTICS |
221 | my_slots[i].my_counters = new ( NFS_Allocate(1, sizeof(statistics_counters), NULL) ) statistics_counters; |
222 | #endif /* __TBB_STATISTICS */ |
223 | } |
224 | my_task_stream.initialize(my_num_slots); |
225 | ITT_SYNC_CREATE(&my_task_stream, SyncType_Scheduler, SyncObj_TaskStream); |
226 | #if __TBB_PREVIEW_CRITICAL_TASKS |
227 | my_critical_task_stream.initialize(my_num_slots); |
228 | ITT_SYNC_CREATE(&my_critical_task_stream, SyncType_Scheduler, SyncObj_CriticalTaskStream); |
229 | #endif |
230 | #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY |
231 | my_concurrency_mode = cm_normal; |
232 | #endif |
233 | #if !__TBB_FP_CONTEXT |
234 | my_cpu_ctl_env.get_env(); |
235 | #endif |
236 | } |
237 | |
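// Allocate cache-aligned, zeroed storage for a new arena. The mail outboxes are laid out
// in front of the arena object itself (note the placement-new offset), and zeroing the
// storage marks all slots as empty.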
238 | arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots ) { |
239 | __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" ); |
240 | __TBB_ASSERT( sizeof(base_type) % NFS_GetLineSize() == 0, "arena slots area misaligned: wrong padding" ); |
241 | __TBB_ASSERT( sizeof(mail_outbox) == NFS_MaxLineSize, "Mailbox padding is wrong" ); |
242 | size_t n = allocation_size(num_arena_slots(num_slots)); |
243 | unsigned char* storage = (unsigned char*)NFS_Allocate( 1, n, NULL ); |
244 | // Zero all slots to indicate that they are empty |
245 | memset( storage, 0, n ); |
246 | return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) ) arena(m, num_slots, num_reserved_slots); |
247 | } |
248 | |
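// Tear down an arena that has no references left: drain mailboxes and task streams,
// free the per-slot task pools, release the internal market reference, destroy the
// default context, and return the storage obtained in allocate_arena.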
249 | void arena::free_arena () { |
250 | __TBB_ASSERT( is_alive(my_guard), NULL ); |
251 | __TBB_ASSERT( !my_references, "There are threads in the dying arena" ); |
252 | __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" ); |
253 | __TBB_ASSERT( my_pool_state == SNAPSHOT_EMPTY || !my_max_num_workers, "Inconsistent state of a dying arena" ); |
254 | #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY |
255 | __TBB_ASSERT( my_concurrency_mode != cm_enforced_global, NULL ); |
256 | #endif |
257 | #if !__TBB_STATISTICS_EARLY_DUMP |
258 | GATHER_STATISTIC( dump_arena_statistics() ); |
259 | #endif |
260 | poison_value( my_guard ); |
261 | intptr_t drained = 0; |
262 | for ( unsigned i = 0; i < my_num_slots; ++i ) { |
263 | __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" ); |
264 | // TODO: understand the assertion and modify |
265 | // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, NULL ); |
266 | __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, NULL ); // TODO: replace by is_quiescent_local_task_pool_empty |
267 | my_slots[i].free_task_pool(); |
268 | #if __TBB_STATISTICS |
269 | NFS_Free( my_slots[i].my_counters ); |
270 | #endif /* __TBB_STATISTICS */ |
271 | drained += mailbox(i+1).drain(); |
272 | } |
273 | __TBB_ASSERT( my_task_stream.drain()==0, "Not all enqueued tasks were executed" ); |
274 | #if __TBB_PREVIEW_CRITICAL_TASKS |
275 | __TBB_ASSERT( my_critical_task_stream.drain()==0, "Not all critical tasks were executed" ); |
276 | #endif |
277 | #if __TBB_COUNT_TASK_NODES |
278 | my_market->update_task_node_count( -drained ); |
279 | #endif /* __TBB_COUNT_TASK_NODES */ |
280 | // remove an internal reference |
281 | my_market->release( /*is_public=*/false, /*blocking_terminate=*/false ); |
282 | #if __TBB_TASK_GROUP_CONTEXT |
283 | __TBB_ASSERT( my_default_ctx, "Master thread never entered the arena?" ); |
284 | my_default_ctx->~task_group_context(); |
285 | NFS_Free(my_default_ctx); |
286 | #endif /* __TBB_TASK_GROUP_CONTEXT */ |
287 | #if __TBB_ARENA_OBSERVER |
288 | if ( !my_observers.empty() ) |
289 | my_observers.clear(); |
290 | #endif /* __TBB_ARENA_OBSERVER */ |
291 | void* storage = &mailbox(my_num_slots); |
292 | __TBB_ASSERT( my_references == 0, NULL ); |
293 | __TBB_ASSERT( my_pool_state == SNAPSHOT_EMPTY || !my_max_num_workers, NULL ); |
294 | this->~arena(); |
295 | #if TBB_USE_ASSERT > 1 |
296 | memset( storage, 0, allocation_size(my_num_slots) ); |
297 | #endif /* TBB_USE_ASSERT */ |
298 | NFS_Free( storage ); |
299 | } |
300 | |
301 | #if __TBB_STATISTICS |
302 | void arena::dump_arena_statistics () { |
303 | statistics_counters total; |
304 | for( unsigned i = 0; i < my_num_slots; ++i ) { |
305 | #if __TBB_STATISTICS_EARLY_DUMP |
306 | generic_scheduler* s = my_slots[i].my_scheduler; |
307 | if ( s ) |
308 | *my_slots[i].my_counters += s->my_counters; |
309 | #else |
310 | __TBB_ASSERT( !my_slots[i].my_scheduler, NULL ); |
311 | #endif |
312 | if ( i != 0 ) { |
313 | total += *my_slots[i].my_counters; |
314 | dump_statistics( *my_slots[i].my_counters, i ); |
315 | } |
316 | } |
317 | dump_statistics( *my_slots[0].my_counters, 0 ); |
318 | #if __TBB_STATISTICS_STDOUT |
319 | #if !__TBB_STATISTICS_TOTALS_ONLY |
320 | printf( "----------------------------------------------\n" ); |
321 | #endif |
322 | dump_statistics( total, workers_counters_total ); |
323 | total += *my_slots[0].my_counters; |
324 | dump_statistics( total, arena_counters_total ); |
325 | #if !__TBB_STATISTICS_TOTALS_ONLY |
326 | printf( "==============================================\n" ); |
327 | #endif |
328 | #endif /* __TBB_STATISTICS_STDOUT */ |
329 | } |
330 | #endif /* __TBB_STATISTICS */ |
331 | |
332 | #if __TBB_TASK_PRIORITY |
333 | // The method inspects a scheduler to determine: |
334 | // 1. if it has tasks that can be retrieved and executed (via the return value); |
335 | // 2. if it has any tasks at all, including those of lower priority (via tasks_present); |
336 | // 3. if it is able to work with enqueued tasks (via dequeuing_possible). |
337 | inline bool arena::may_have_tasks ( generic_scheduler* s, bool& tasks_present, bool& dequeuing_possible ) { |
338 | if ( !s || s->my_arena != this ) |
339 | return false; |
340 | dequeuing_possible |= s->worker_outermost_level(); |
341 | if ( s->my_pool_reshuffling_pending ) { |
342 | // This primary task pool is nonempty and may contain tasks at the current |
343 | // priority level. Its owner is winnowing lower priority tasks at the moment. |
344 | tasks_present = true; |
345 | return true; |
346 | } |
347 | if ( s->my_offloaded_tasks ) { |
348 | tasks_present = true; |
349 | if ( s->my_local_reload_epoch < *s->my_ref_reload_epoch ) { |
350 | // This scheduler's offload area is nonempty and may contain tasks at the |
351 | // current priority level. |
352 | return true; |
353 | } |
354 | } |
355 | return false; |
356 | } |
357 | |
358 | void arena::orphan_offloaded_tasks(generic_scheduler& s) { |
359 | __TBB_ASSERT( s.my_offloaded_tasks, NULL ); |
360 | GATHER_STATISTIC( ++s.my_counters.prio_orphanings ); |
361 | ++my_abandonment_epoch; |
362 | __TBB_ASSERT( s.my_offloaded_task_list_tail_link && !*s.my_offloaded_task_list_tail_link, NULL ); |
363 | task* orphans; |
364 | do { |
365 | orphans = const_cast<task*>(my_orphaned_tasks); |
366 | *s.my_offloaded_task_list_tail_link = orphans; |
367 | } while ( as_atomic(my_orphaned_tasks).compare_and_swap(s.my_offloaded_tasks, orphans) != orphans ); |
368 | s.my_offloaded_tasks = NULL; |
369 | #if TBB_USE_ASSERT |
370 | s.my_offloaded_task_list_tail_link = NULL; |
371 | #endif /* TBB_USE_ASSERT */ |
372 | } |
373 | #endif /* __TBB_TASK_PRIORITY */ |
374 | |
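// Returns true if at least one enqueued (FIFO) task is present at any priority level.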
375 | bool arena::has_enqueued_tasks() { |
376 | // Look for enqueued tasks at all priority levels |
377 | for ( int p = 0; p < num_priority_levels; ++p ) |
378 | if ( !my_task_stream.empty(p) ) |
379 | return true; |
380 | return false; |
381 | } |
382 | |
383 | void arena::restore_priority_if_need() { |
// Check for enqueued tasks "lost" at some priority levels because
// updating the arena priority and switching the arena into the
// "populated" (FULL) state happen non-atomically. Imposing atomicity
// would require task::enqueue() to use a lock, which is unacceptable.
389 | if ( has_enqueued_tasks() ) { |
390 | advertise_new_work<work_enqueued>(); |
391 | #if __TBB_TASK_PRIORITY |
392 | // update_arena_priority() expects non-zero arena::my_num_workers_requested, |
393 | // so must be called after advertise_new_work<work_enqueued>() |
394 | for ( int p = 0; p < num_priority_levels; ++p ) |
395 | if ( !my_task_stream.empty(p) ) { |
396 | if ( p < my_bottom_priority || p > my_top_priority ) |
397 | my_market->update_arena_priority(*this, p); |
398 | } |
399 | #endif |
400 | } |
401 | } |
402 | |
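// Decide whether the arena has run out of work using a test-and-test-and-set protocol
// on my_pool_state: a FULL->busy transition grants permission to take a snapshot of all
// task pools (and, with __TBB_TASK_PRIORITY, of offloaded/orphaned tasks). If no work is
// found, the state becomes SNAPSHOT_EMPTY and the worker demand is withdrawn; otherwise
// the FULL state is restored. Any concurrent publication of work invalidates the attempt.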
403 | bool arena::is_out_of_work() { |
404 | // TODO: rework it to return at least a hint about where a task was found; better if the task itself. |
405 | for(;;) { |
406 | pool_state_t snapshot = my_pool_state; |
407 | switch( snapshot ) { |
408 | case SNAPSHOT_EMPTY: |
409 | return true; |
410 | case SNAPSHOT_FULL: { |
411 | // Use unique id for "busy" in order to avoid ABA problems. |
412 | const pool_state_t busy = pool_state_t(&busy); |
413 | // Request permission to take snapshot |
414 | if( my_pool_state.compare_and_swap( busy, SNAPSHOT_FULL )==SNAPSHOT_FULL ) { |
415 | // Got permission. Take the snapshot. |
416 | // NOTE: This is not a lock, as the state can be set to FULL at |
417 | // any moment by a thread that spawns/enqueues new task. |
418 | size_t n = my_limit; |
// Make local copies of volatile parameters. A change to any of them during
// the snapshot-taking procedure invalidates the attempt and returns this
// thread to the dispatch loop.
422 | #if __TBB_TASK_PRIORITY |
423 | uintptr_t reload_epoch = __TBB_load_with_acquire( my_reload_epoch ); |
424 | intptr_t top_priority = my_top_priority; |
425 | // Inspect primary task pools first |
426 | #endif /* __TBB_TASK_PRIORITY */ |
427 | size_t k; |
428 | for( k=0; k<n; ++k ) { |
429 | if( my_slots[k].task_pool != EmptyTaskPool && |
430 | __TBB_load_relaxed(my_slots[k].head) < __TBB_load_relaxed(my_slots[k].tail) ) |
431 | { |
432 | // k-th primary task pool is nonempty and does contain tasks. |
433 | break; |
434 | } |
435 | if( my_pool_state!=busy ) |
436 | return false; // the work was published |
437 | } |
438 | __TBB_ASSERT( k <= n, NULL ); |
439 | bool work_absent = k == n; |
440 | #if __TBB_PREVIEW_CRITICAL_TASKS |
441 | bool no_critical_tasks = my_critical_task_stream.empty(0); |
442 | work_absent &= no_critical_tasks; |
443 | #endif |
444 | #if __TBB_TASK_PRIORITY |
445 | // Variable tasks_present indicates presence of tasks at any priority |
446 | // level, while work_absent refers only to the current priority. |
447 | bool tasks_present = !work_absent || my_orphaned_tasks; |
448 | bool dequeuing_possible = false; |
449 | if ( work_absent ) { |
450 | // Check for the possibility that recent priority changes |
451 | // brought some tasks to the current priority level |
452 | |
453 | uintptr_t abandonment_epoch = my_abandonment_epoch; |
// The master thread's scheduler needs special handling as it
// may be destroyed at any moment (workers' schedulers are
// guaranteed to be alive while at least one thread is in the arena).
// The lock below excludes concurrency with task group state change
// propagation and guarantees the lifetime of the master thread.
459 | the_context_state_propagation_mutex.lock(); |
460 | work_absent = !may_have_tasks( my_slots[0].my_scheduler, tasks_present, dequeuing_possible ); |
461 | the_context_state_propagation_mutex.unlock(); |
// The following loop is subject to data races. While the k-th slot's
// scheduler is being examined, the corresponding worker can either
// leave for RML or migrate to another arena.
// These races are not prevented because all of them are benign.
// First, the code relies on the fact that a worker thread's scheduler
// object persists until the whole library is deinitialized.
// Second, in the worst case the races can only cause another
// round of stealing attempts to be undertaken. Introducing complex
// synchronization into this coldest part of the scheduler's control
// flow does not seem worthwhile because it is both unlikely to
// ever have any observable performance effect and would require
// additional synchronization code on the hotter paths.
474 | for( k = 1; work_absent && k < n; ++k ) { |
475 | if( my_pool_state!=busy ) |
476 | return false; // the work was published |
477 | work_absent = !may_have_tasks( my_slots[k].my_scheduler, tasks_present, dequeuing_possible ); |
478 | } |
// Preclude prematurely switching the arena off because of a race in the previous loop.
480 | work_absent = work_absent |
481 | && !__TBB_load_with_acquire(my_orphaned_tasks) |
482 | && abandonment_epoch == my_abandonment_epoch; |
483 | } |
484 | #endif /* __TBB_TASK_PRIORITY */ |
485 | // Test and test-and-set. |
486 | if( my_pool_state==busy ) { |
487 | #if __TBB_TASK_PRIORITY |
488 | bool no_fifo_tasks = my_task_stream.empty(top_priority); |
489 | work_absent = work_absent && (!dequeuing_possible || no_fifo_tasks) |
490 | && top_priority == my_top_priority && reload_epoch == my_reload_epoch; |
491 | #else |
492 | bool no_fifo_tasks = my_task_stream.empty(0); |
493 | work_absent = work_absent && no_fifo_tasks; |
494 | #endif /* __TBB_TASK_PRIORITY */ |
495 | if( work_absent ) { |
496 | #if __TBB_TASK_PRIORITY |
497 | if ( top_priority > my_bottom_priority ) { |
498 | if ( my_market->lower_arena_priority(*this, top_priority - 1, reload_epoch) |
499 | && !my_task_stream.empty(top_priority) ) |
500 | { |
501 | atomic_update( my_skipped_fifo_priority, top_priority, std::less<intptr_t>()); |
502 | } |
503 | } |
504 | else if ( !tasks_present && !my_orphaned_tasks && no_fifo_tasks ) { |
505 | #endif /* __TBB_TASK_PRIORITY */ |
// Save the current demand value before setting SNAPSHOT_EMPTY
// to avoid a race with advertise_new_work.
508 | int current_demand = (int)my_max_num_workers; |
509 | if( my_pool_state.compare_and_swap( SNAPSHOT_EMPTY, busy )==busy ) { |
510 | #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY |
511 | if( my_concurrency_mode==cm_enforced_global ) { |
512 | // adjust_demand() called inside, if needed |
513 | my_market->mandatory_concurrency_disable( this ); |
514 | } else |
515 | #endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */ |
516 | { |
517 | // This thread transitioned pool to empty state, and thus is |
518 | // responsible for telling the market that there is no work to do. |
519 | my_market->adjust_demand( *this, -current_demand ); |
520 | } |
521 | restore_priority_if_need(); |
522 | return true; |
523 | } |
524 | return false; |
525 | #if __TBB_TASK_PRIORITY |
526 | } |
527 | #endif /* __TBB_TASK_PRIORITY */ |
528 | } |
529 | // Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it. |
530 | my_pool_state.compare_and_swap( SNAPSHOT_FULL, busy ); |
531 | } |
532 | } |
533 | return false; |
534 | } |
535 | default: |
536 | // Another thread is taking a snapshot. |
537 | return false; |
538 | } |
539 | } |
540 | } |
541 | |
542 | #if __TBB_COUNT_TASK_NODES |
543 | intptr_t arena::workers_task_node_count() { |
544 | intptr_t result = 0; |
545 | for( unsigned i = 1; i < my_num_slots; ++i ) { |
546 | generic_scheduler* s = my_slots[i].my_scheduler; |
547 | if( s ) |
548 | result += s->my_task_node_count; |
549 | } |
550 | return result; |
551 | } |
552 | #endif /* __TBB_COUNT_TASK_NODES */ |
553 | |
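// Place task t into the arena's FIFO task stream (or, with __TBB_PREVIEW_CRITICAL_TASKS,
// into the critical task stream for critical-priority tasks), normalize the requested
// priority when priorities are enabled, and advertise the new work to the market.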
554 | void arena::enqueue_task( task& t, intptr_t prio, FastRandom &random ) |
555 | { |
556 | #if __TBB_RECYCLE_TO_ENQUEUE |
557 | __TBB_ASSERT( t.state()==task::allocated || t.state()==task::to_enqueue, "attempt to enqueue task with inappropriate state" ); |
558 | #else |
559 | __TBB_ASSERT( t.state()==task::allocated, "attempt to enqueue task that is not in 'allocated' state" ); |
560 | #endif |
561 | t.prefix().state = task::ready; |
562 | t.prefix().extra_state |= es_task_enqueued; // enqueued task marker |
563 | |
564 | #if TBB_USE_ASSERT |
565 | if( task* parent = t.parent() ) { |
566 | internal::reference_count ref_count = parent->prefix().ref_count; |
567 | __TBB_ASSERT( ref_count!=0, "attempt to enqueue task whose parent has a ref_count==0 (forgot to set_ref_count?)" ); |
568 | __TBB_ASSERT( ref_count>0, "attempt to enqueue task whose parent has a ref_count<0" ); |
569 | parent->prefix().extra_state |= es_ref_count_active; |
570 | } |
571 | __TBB_ASSERT(t.prefix().affinity==affinity_id(0), "affinity is ignored for enqueued tasks" ); |
572 | #endif /* TBB_USE_ASSERT */ |
573 | #if __TBB_PREVIEW_CRITICAL_TASKS |
574 | if( prio == internal::priority_critical || internal::is_critical( t ) ) { |
// TODO: consider using 'scheduler::handled_as_critical'
576 | internal::make_critical( t ); |
577 | generic_scheduler* s = governor::local_scheduler_if_initialized(); |
578 | ITT_NOTIFY(sync_releasing, &my_critical_task_stream); |
579 | if( s && s->my_arena_slot ) { |
// The scheduler is initialized and attached to the arena;
// propagate the isolation level to the critical task.
582 | #if __TBB_TASK_ISOLATION |
583 | t.prefix().isolation = s->my_innermost_running_task->prefix().isolation; |
584 | #endif |
585 | unsigned& lane = s->my_arena_slot->hint_for_critical; |
586 | my_critical_task_stream.push( &t, 0, tbb::internal::subsequent_lane_selector(lane) ); |
587 | } else { |
// Either the scheduler is not initialized or it is not attached to the arena;
// use a random lane for the task.
590 | my_critical_task_stream.push( &t, 0, internal::random_lane_selector(random) ); |
591 | } |
592 | advertise_new_work<work_spawned>(); |
593 | return; |
594 | } |
595 | #endif /* __TBB_PREVIEW_CRITICAL_TASKS */ |
596 | |
597 | ITT_NOTIFY(sync_releasing, &my_task_stream); |
598 | #if __TBB_TASK_PRIORITY |
599 | intptr_t p = prio ? normalize_priority(priority_t(prio)) : normalized_normal_priority; |
600 | assert_priority_valid(p); |
601 | #if __TBB_PREVIEW_CRITICAL_TASKS && __TBB_CPF_BUILD |
602 | my_task_stream.push( &t, p, internal::random_lane_selector(random) ); |
603 | #else |
604 | my_task_stream.push( &t, p, random ); |
605 | #endif |
606 | if ( p != my_top_priority ) |
607 | my_market->update_arena_priority( *this, p ); |
608 | #else /* !__TBB_TASK_PRIORITY */ |
609 | __TBB_ASSERT_EX(prio == 0, "the library is not configured to respect the task priority" ); |
610 | #if __TBB_PREVIEW_CRITICAL_TASKS && __TBB_CPF_BUILD |
611 | my_task_stream.push( &t, 0, internal::random_lane_selector(random) ); |
612 | #else |
613 | my_task_stream.push( &t, 0, random ); |
614 | #endif |
615 | #endif /* !__TBB_TASK_PRIORITY */ |
616 | advertise_new_work<work_enqueued>(); |
617 | #if __TBB_TASK_PRIORITY |
618 | if ( p != my_top_priority ) |
619 | my_market->update_arena_priority( *this, p ); |
620 | #endif /* __TBB_TASK_PRIORITY */ |
621 | } |
622 | |
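// RAII helper for executing work on behalf of another (or the same) arena: it makes the
// calling scheduler look like it is on the outermost level of the target arena and, when
// the target differs from the scheduler's current arena, performs the full
// nested_arena_entry/nested_arena_exit sequence, restoring the original state on destruction.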
623 | class nested_arena_context : no_copy { |
624 | public: |
625 | nested_arena_context(generic_scheduler *s, arena* a, size_t slot_index, bool type, bool same) |
626 | : my_scheduler(*s), my_orig_ctx(NULL), same_arena(same) { |
627 | if (same_arena) { |
628 | my_orig_state.my_properties = my_scheduler.my_properties; |
629 | my_orig_state.my_innermost_running_task = my_scheduler.my_innermost_running_task; |
630 | mimic_outermost_level(a, type); |
631 | } else { |
632 | my_orig_state = *s; |
633 | mimic_outermost_level(a, type); |
634 | s->nested_arena_entry(a, slot_index); |
635 | } |
636 | } |
637 | ~nested_arena_context() { |
638 | #if __TBB_TASK_GROUP_CONTEXT |
639 | my_scheduler.my_dummy_task->prefix().context = my_orig_ctx; // restore context of dummy task |
640 | #endif |
641 | if (same_arena) { |
642 | my_scheduler.my_properties = my_orig_state.my_properties; |
643 | my_scheduler.my_innermost_running_task = my_orig_state.my_innermost_running_task; |
644 | } else { |
645 | my_scheduler.nested_arena_exit(); |
646 | static_cast<scheduler_state&>(my_scheduler) = my_orig_state; // restore arena settings |
647 | #if __TBB_TASK_PRIORITY |
648 | my_scheduler.my_local_reload_epoch = *my_orig_state.my_ref_reload_epoch; |
649 | #endif |
650 | governor::assume_scheduler(&my_scheduler); |
651 | } |
652 | } |
653 | |
654 | private: |
655 | generic_scheduler &my_scheduler; |
656 | scheduler_state my_orig_state; |
657 | task_group_context *my_orig_ctx; |
658 | const bool same_arena; |
659 | |
660 | void mimic_outermost_level(arena* a, bool type) { |
661 | my_scheduler.my_properties.outermost = true; |
662 | my_scheduler.my_properties.type = type; |
663 | my_scheduler.my_innermost_running_task = my_scheduler.my_dummy_task; |
664 | #if __TBB_PREVIEW_CRITICAL_TASKS |
665 | my_scheduler.my_properties.has_taken_critical_task = false; |
666 | #endif |
667 | #if __TBB_TASK_GROUP_CONTEXT |
// Save the dummy task's context and replace it with the arena's context
669 | my_orig_ctx = my_scheduler.my_dummy_task->prefix().context; |
670 | my_scheduler.my_dummy_task->prefix().context = a->my_default_ctx; |
671 | #endif |
672 | } |
673 | }; |
674 | |
675 | void generic_scheduler::nested_arena_entry(arena* a, size_t slot_index) { |
676 | __TBB_ASSERT( is_alive(a->my_guard), NULL ); |
677 | __TBB_ASSERT( a!=my_arena, NULL); |
678 | |
679 | // overwrite arena settings |
680 | #if __TBB_TASK_PRIORITY |
681 | if ( my_offloaded_tasks ) |
682 | my_arena->orphan_offloaded_tasks( *this ); |
683 | my_offloaded_tasks = NULL; |
684 | #endif /* __TBB_TASK_PRIORITY */ |
685 | attach_arena( a, slot_index, /*is_master*/true ); |
686 | __TBB_ASSERT( my_arena == a, NULL ); |
687 | governor::assume_scheduler( this ); |
688 | // TODO? ITT_NOTIFY(sync_acquired, a->my_slots + index); |
689 | // TODO: it requires market to have P workers (not P-1) |
690 | // TODO: a preempted worker should be excluded from assignment to other arenas e.g. my_slack-- |
691 | if( !is_worker() && slot_index >= my_arena->my_num_reserved_slots ) |
692 | my_arena->my_market->adjust_demand(*my_arena, -1); |
693 | #if __TBB_ARENA_OBSERVER |
my_last_local_observer = 0; // TODO: try to optimize the number of calls
695 | my_arena->my_observers.notify_entry_observers( my_last_local_observer, /*worker=*/false ); |
696 | #endif |
697 | } |
698 | |
699 | void generic_scheduler::nested_arena_exit() { |
700 | #if __TBB_ARENA_OBSERVER |
701 | my_arena->my_observers.notify_exit_observers( my_last_local_observer, /*worker=*/false ); |
702 | #endif /* __TBB_ARENA_OBSERVER */ |
703 | #if __TBB_TASK_PRIORITY |
704 | if ( my_offloaded_tasks ) |
705 | my_arena->orphan_offloaded_tasks( *this ); |
706 | #endif |
707 | if( !is_worker() && my_arena_index >= my_arena->my_num_reserved_slots ) |
708 | my_arena->my_market->adjust_demand(*my_arena, 1); |
709 | // Free the master slot. |
710 | __TBB_ASSERT(my_arena->my_slots[my_arena_index].my_scheduler, "A slot is already empty" ); |
711 | __TBB_store_with_release(my_arena->my_slots[my_arena_index].my_scheduler, (generic_scheduler*)NULL); |
712 | my_arena->my_exit_monitors.notify_one(); // do not relax! |
713 | } |
714 | |
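// Keep the master dispatching (and stealing) until the arena's pool state becomes
// SNAPSHOT_EMPTY; the extra reference on the dummy task prevents local_wait_for_all
// from returning as soon as the local work is exhausted.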
715 | void generic_scheduler::wait_until_empty() { |
my_dummy_task->prefix().ref_count++; // prevents exit from local_wait_for_all when local work is done, enforcing stealing
717 | while( my_arena->my_pool_state != arena::SNAPSHOT_EMPTY ) |
718 | local_wait_for_all(*my_dummy_task, NULL); |
719 | my_dummy_task->prefix().ref_count--; |
720 | } |
721 | |
722 | } // namespace internal |
723 | } // namespace tbb |
724 | |
725 | #include "scheduler_utility.h" |
726 | #include "tbb/task_arena.h" // task_arena_base |
727 | |
728 | namespace tbb { |
729 | namespace interface7 { |
730 | namespace internal { |
731 | |
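// Lazily create the arena behind a task_arena object. Concurrent initializers may race:
// the loser releases its freshly created arena together with its public market reference
// and adopts the winner's arena (and, if enabled, its default context).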
732 | void task_arena_base::internal_initialize( ) { |
733 | governor::one_time_init(); |
734 | if( my_max_concurrency < 1 ) |
735 | my_max_concurrency = (int)governor::default_num_threads(); |
736 | __TBB_ASSERT( my_master_slots <= (unsigned)my_max_concurrency, "Number of slots reserved for master should not exceed arena concurrency" ); |
737 | arena* new_arena = market::create_arena( my_max_concurrency, my_master_slots, 0 ); |
738 | // add an internal market reference; a public reference was added in create_arena |
739 | market &m = market::global_market( /*is_public=*/false ); |
740 | // allocate default context for task_arena |
741 | #if __TBB_TASK_GROUP_CONTEXT |
742 | new_arena->my_default_ctx = new ( NFS_Allocate(1, sizeof(task_group_context), NULL) ) |
743 | task_group_context( task_group_context::isolated, task_group_context::default_traits ); |
744 | #if __TBB_FP_CONTEXT |
745 | new_arena->my_default_ctx->capture_fp_settings(); |
746 | #endif |
747 | #endif /* __TBB_TASK_GROUP_CONTEXT */ |
748 | // threads might race to initialize the arena |
749 | if(as_atomic(my_arena).compare_and_swap(new_arena, NULL) != NULL) { |
750 | __TBB_ASSERT(my_arena, NULL); // another thread won the race |
751 | // release public market reference |
752 | m.release( /*is_public=*/true, /*blocking_terminate=*/false ); |
753 | new_arena->on_thread_leaving<arena::ref_external>(); // destroy unneeded arena |
754 | #if __TBB_TASK_GROUP_CONTEXT |
755 | spin_wait_while_eq(my_context, (task_group_context*)NULL); |
756 | } else { |
757 | new_arena->my_default_ctx->my_version_and_traits |= my_version_and_traits & exact_exception_flag; |
758 | as_atomic(my_context) = new_arena->my_default_ctx; |
759 | #endif |
760 | } |
761 | // TODO: should it trigger automatic initialization of this thread? |
762 | governor::local_scheduler_weak(); |
763 | } |
764 | |
765 | void task_arena_base::internal_terminate( ) { |
766 | if( my_arena ) {// task_arena was initialized |
767 | my_arena->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false ); |
768 | my_arena->on_thread_leaving<arena::ref_external>(); |
769 | my_arena = 0; |
770 | #if __TBB_TASK_GROUP_CONTEXT |
771 | my_context = 0; |
772 | #endif |
773 | } |
774 | } |
775 | |
776 | void task_arena_base::internal_attach( ) { |
777 | __TBB_ASSERT(!my_arena, NULL); |
778 | generic_scheduler* s = governor::local_scheduler_if_initialized(); |
779 | if( s && s->my_arena ) { |
780 | // There is an active arena to attach to. |
781 | // It's still used by s, so won't be destroyed right away. |
782 | my_arena = s->my_arena; |
783 | __TBB_ASSERT( my_arena->my_references > 0, NULL ); |
784 | my_arena->my_references += arena::ref_external; |
785 | #if __TBB_TASK_GROUP_CONTEXT |
786 | my_context = my_arena->my_default_ctx; |
787 | my_version_and_traits |= my_context->my_version_and_traits & exact_exception_flag; |
788 | #endif |
789 | my_master_slots = my_arena->my_num_reserved_slots; |
790 | my_max_concurrency = my_master_slots + my_arena->my_max_num_workers; |
791 | __TBB_ASSERT(arena::num_arena_slots(my_max_concurrency)==my_arena->my_num_slots, NULL); |
792 | // increases market's ref count for task_arena |
793 | market::global_market( /*is_public=*/true ); |
794 | } |
795 | } |
796 | |
797 | void task_arena_base::internal_enqueue( task& t, intptr_t prio ) const { |
798 | __TBB_ASSERT(my_arena, NULL); |
799 | generic_scheduler* s = governor::local_scheduler_weak(); // scheduler is only needed for FastRandom instance |
__TBB_ASSERT(s, "Scheduler is not initialized" ); // we allocated a task, so we can expect the scheduler to be initialized
801 | #if __TBB_TASK_GROUP_CONTEXT |
802 | // Is there a better place for checking the state of my_default_ctx? |
__TBB_ASSERT(!(my_arena->my_default_ctx == t.prefix().context && my_arena->my_default_ctx->is_group_execution_cancelled()),
"The task will not be executed because the default task_group_context of the task_arena is cancelled. Has a previously enqueued task thrown an exception?" );
805 | #endif |
806 | my_arena->enqueue_task( t, prio, s->my_random ); |
807 | } |
808 | |
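// Task used by task_arena::execute() when the calling thread cannot occupy an arena slot:
// it runs the delegated functor on a thread inside the arena while mimicking an outermost
// master, and its destructor releases the root's reference and signals the exit monitor.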
809 | class delegated_task : public task { |
810 | internal::delegate_base & my_delegate; |
811 | concurrent_monitor & my_monitor; |
812 | task * my_root; |
813 | task* execute() __TBB_override { |
814 | generic_scheduler& s = *(generic_scheduler*)prefix().owner; |
815 | __TBB_ASSERT(s.outermost_level(), "expected to be enqueued and received on the outermost level" ); |
816 | struct outermost_context : internal::no_copy { |
817 | delegated_task * t; |
818 | generic_scheduler & s; |
819 | task * orig_dummy; |
820 | task_group_context * orig_ctx; |
821 | scheduler_properties orig_props; |
822 | outermost_context(delegated_task *_t, generic_scheduler &_s) |
823 | : t(_t), s(_s), orig_dummy(s.my_dummy_task), orig_props(s.my_properties) { |
824 | __TBB_ASSERT(s.my_innermost_running_task == t, NULL); |
825 | #if __TBB_TASK_GROUP_CONTEXT |
826 | orig_ctx = t->prefix().context; |
827 | t->prefix().context = s.my_arena->my_default_ctx; |
828 | #endif |
829 | // Mimics outermost master |
830 | s.my_dummy_task = t; |
831 | s.my_properties.type = scheduler_properties::master; |
832 | } |
833 | ~outermost_context() { |
834 | #if __TBB_TASK_GROUP_CONTEXT |
835 | // Restore context for sake of registering potential exception |
836 | t->prefix().context = orig_ctx; |
837 | #endif |
838 | s.my_properties = orig_props; |
839 | s.my_dummy_task = orig_dummy; |
840 | } |
841 | } scope(this, s); |
842 | my_delegate(); |
843 | return NULL; |
844 | } |
845 | ~delegated_task() { |
// A potential exception has already been registered. This must happen before the notification.
847 | __TBB_ASSERT(my_root->ref_count()==2, NULL); |
848 | __TBB_store_with_release(my_root->prefix().ref_count, 1); // must precede the wakeup |
849 | my_monitor.notify(*this); // do not relax, it needs a fence! |
850 | } |
851 | public: |
852 | delegated_task( internal::delegate_base & d, concurrent_monitor & s, task * t ) |
853 | : my_delegate(d), my_monitor(s), my_root(t) {} |
854 | // predicate for concurrent_monitor notification |
855 | bool operator()(uintptr_t ctx) const { return (void*)ctx == (void*)&my_delegate; } |
856 | }; |
857 | |
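// Execute the delegate inside the arena. Fast path: occupy a slot (or reuse the current
// one when already attached to this arena) and run the delegate under nested_arena_context.
// Slow path (no free slot): enqueue a delegated_task and wait on the exit monitor until
// either the task completes or a slot becomes available.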
858 | void task_arena_base::internal_execute(internal::delegate_base& d) const { |
859 | __TBB_ASSERT(my_arena, NULL); |
860 | generic_scheduler* s = governor::local_scheduler_weak(); |
861 | __TBB_ASSERT(s, "Scheduler is not initialized" ); |
862 | |
863 | bool same_arena = s->my_arena == my_arena; |
864 | size_t index1 = s->my_arena_index; |
865 | if (!same_arena) { |
866 | index1 = my_arena->occupy_free_slot</* as_worker*/false>(*s); |
867 | if (index1 == arena::out_of_arena) { |
868 | |
869 | #if __TBB_USE_OPTIONAL_RTTI |
// Workaround for a bug inside the flow graph. If the thread cannot occupy an arena slot during task_arena::execute()
// and all aggregator operations depend on this task's completion (all other threads are already inside the arena),
// a deadlock appears because the enqueued task will never enter the arena.
// Workaround: check via RTTI whether the task came from the flow graph (by casting to graph::spawn_functor)
// and enqueue such a task with the non-blocking internal_enqueue method.
// TODO: change this behaviour in the next GOLD release (maybe by adding a new library entry point - try_execute)
876 | typedef tbb::flow::interface10::graph::spawn_functor graph_funct; |
877 | internal::delegated_function< graph_funct, void >* deleg_funct = |
878 | dynamic_cast< internal::delegated_function< graph_funct, void>* >(&d); |
879 | |
880 | if (deleg_funct) { |
881 | internal_enqueue(*new(task::allocate_root(*my_context)) |
882 | internal::function_task< internal::strip< graph_funct >::type > |
883 | (internal::forward< graph_funct >(deleg_funct->my_func)), 0); |
884 | return; |
885 | } else { |
886 | #endif /* __TBB_USE_OPTIONAL_RTTI */ |
887 | concurrent_monitor::thread_context waiter; |
888 | #if __TBB_TASK_GROUP_CONTEXT |
889 | task_group_context exec_context(task_group_context::isolated, my_version_and_traits & exact_exception_flag); |
890 | #if __TBB_FP_CONTEXT |
891 | exec_context.copy_fp_settings(*my_context); |
892 | #endif |
893 | #endif |
894 | auto_empty_task root(__TBB_CONTEXT_ARG(s, &exec_context)); |
895 | root.prefix().ref_count = 2; |
896 | my_arena->enqueue_task(*new(task::allocate_root(__TBB_CONTEXT_ARG1(exec_context))) |
897 | delegated_task(d, my_arena->my_exit_monitors, &root), |
898 | 0, s->my_random); // TODO: priority? |
899 | size_t index2 = arena::out_of_arena; |
900 | do { |
901 | my_arena->my_exit_monitors.prepare_wait(waiter, (uintptr_t)&d); |
902 | if (__TBB_load_with_acquire(root.prefix().ref_count) < 2) { |
903 | my_arena->my_exit_monitors.cancel_wait(waiter); |
904 | break; |
905 | } |
906 | index2 = my_arena->occupy_free_slot</*as_worker*/false>(*s); |
907 | if (index2 != arena::out_of_arena) { |
908 | my_arena->my_exit_monitors.cancel_wait(waiter); |
909 | nested_arena_context scope(s, my_arena, index2, scheduler_properties::master, same_arena); |
910 | s->local_wait_for_all(root, NULL); |
911 | #if TBB_USE_EXCEPTIONS |
912 | __TBB_ASSERT(!exec_context.my_exception, NULL); // exception can be thrown above, not deferred |
913 | #endif |
914 | __TBB_ASSERT(root.prefix().ref_count == 0, NULL); |
915 | break; |
916 | } |
917 | my_arena->my_exit_monitors.commit_wait(waiter); |
918 | } while (__TBB_load_with_acquire(root.prefix().ref_count) == 2); |
919 | if (index2 == arena::out_of_arena) { |
920 | // notify a waiting thread even if this thread did not enter arena, |
921 | // in case it was woken by a leaving thread but did not need to enter |
922 | my_arena->my_exit_monitors.notify_one(); // do not relax! |
923 | } |
924 | #if TBB_USE_EXCEPTIONS |
925 | // process possible exception |
926 | if (task_group_context::exception_container_type *pe = exec_context.my_exception) |
927 | TbbRethrowException(pe); |
928 | #endif |
929 | return; |
930 | #if __TBB_USE_OPTIONAL_RTTI |
931 | } // if task came from graph |
932 | #endif |
933 | } // if (index1 == arena::out_of_arena) |
934 | } // if (!same_arena) |
935 | |
936 | context_guard_helper</*report_tasks=*/false> context_guard; |
937 | context_guard.set_ctx(__TBB_CONTEXT_ARG1(my_context)); |
938 | #if TBB_USE_EXCEPTIONS |
939 | try { |
940 | #endif |
// TODO: replace dummy tasks for workers as well to avoid using the_dummy_context
942 | nested_arena_context scope(s, my_arena, index1, scheduler_properties::master, same_arena); |
943 | d(); |
944 | #if TBB_USE_EXCEPTIONS |
945 | } |
946 | catch (...) { |
947 | context_guard.restore_default(); // TODO: is it needed on Windows? |
948 | if (my_version_and_traits & exact_exception_flag) throw; |
949 | else { |
950 | task_group_context exception_container(task_group_context::isolated, |
951 | task_group_context::default_traits & ~task_group_context::exact_exception); |
952 | exception_container.register_pending_exception(); |
953 | __TBB_ASSERT(exception_container.my_exception, NULL); |
954 | TbbRethrowException(exception_container.my_exception); |
955 | } |
956 | } |
957 | #endif |
958 | } |
959 | |
// This wait task is a temporary approach to waiting for arena emptiness on behalf of masters without slots.
// TODO: rework it to use a single source of notification from is_out_of_work
962 | class wait_task : public task { |
963 | binary_semaphore & my_signal; |
964 | task* execute() __TBB_override { |
965 | generic_scheduler* s = governor::local_scheduler_if_initialized(); |
966 | __TBB_ASSERT( s, NULL ); |
967 | __TBB_ASSERT( s->outermost_level(), "The enqueued task can be processed only on outermost level" ); |
968 | if ( s->is_worker() ) { |
969 | __TBB_ASSERT( s->my_innermost_running_task == this, NULL ); |
970 | // Mimic worker on outermost level to run remaining tasks |
971 | s->my_innermost_running_task = s->my_dummy_task; |
972 | s->local_wait_for_all( *s->my_dummy_task, NULL ); |
973 | s->my_innermost_running_task = this; |
974 | } else s->my_arena->is_out_of_work(); // avoids starvation of internal_wait: issuing this task makes arena full |
975 | my_signal.V(); |
976 | return NULL; |
977 | } |
978 | public: |
979 | wait_task ( binary_semaphore & sema ) : my_signal(sema) {} |
980 | }; |
981 | |
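// Block until the arena runs out of work. A master already attached to this arena simply
// keeps dispatching; an external thread either borrows master slot 0 to help, or enqueues
// a wait_task and sleeps on a binary semaphore, repeating until no workers remain active.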
982 | void task_arena_base::internal_wait() const { |
983 | __TBB_ASSERT(my_arena, NULL); |
984 | generic_scheduler* s = governor::local_scheduler_weak(); |
985 | __TBB_ASSERT(s, "Scheduler is not initialized" ); |
986 | __TBB_ASSERT(s->my_arena != my_arena || s->my_arena_index == 0, "task_arena::wait_until_empty() is not supported within a worker context" ); |
987 | if( s->my_arena == my_arena ) { |
// unsupported, but try to do something for the outermost master
989 | __TBB_ASSERT(s->master_outermost_level(), "unsupported" ); |
990 | if( !s->my_arena_index ) |
991 | while( my_arena->num_workers_active() ) |
992 | s->wait_until_empty(); |
993 | } else for(;;) { |
994 | while( my_arena->my_pool_state != arena::SNAPSHOT_EMPTY ) { |
995 | if( !__TBB_load_with_acquire(my_arena->my_slots[0].my_scheduler) // TODO TEMP: one master, make more masters |
996 | && as_atomic(my_arena->my_slots[0].my_scheduler).compare_and_swap(s, NULL) == NULL ) { |
997 | nested_arena_context a(s, my_arena, 0, scheduler_properties::worker, false); |
998 | s->wait_until_empty(); |
999 | } else { |
1000 | binary_semaphore waiter; // TODO: replace by a single event notification from is_out_of_work |
1001 | internal_enqueue( *new( task::allocate_root(__TBB_CONTEXT_ARG1(*my_context)) ) wait_task(waiter), 0 ); // TODO: priority? |
1002 | waiter.P(); // TODO: concurrent_monitor |
1003 | } |
1004 | } |
1005 | if( !my_arena->num_workers_active() && !my_arena->my_slots[0].my_scheduler) // no activity |
1006 | break; // spin until workers active but avoid spinning in a worker |
1007 | __TBB_Yield(); // wait until workers and master leave |
1008 | } |
1009 | } |
1010 | |
1011 | /*static*/ int task_arena_base::internal_current_slot() { |
1012 | generic_scheduler* s = governor::local_scheduler_if_initialized(); |
1013 | return s? int(s->my_arena_index) : -1; |
1014 | } |
1015 | |
1016 | #if __TBB_TASK_ISOLATION |
1017 | class isolation_guard : tbb::internal::no_copy { |
1018 | isolation_tag &guarded; |
1019 | isolation_tag previous_value; |
1020 | public: |
1021 | isolation_guard( isolation_tag &isolation ) : guarded( isolation ), previous_value( isolation ) {} |
1022 | ~isolation_guard() { |
1023 | guarded = previous_value; |
1024 | } |
1025 | }; |
1026 | |
1027 | void isolate_within_arena( delegate_base& d, intptr_t reserved ) { |
1028 | __TBB_ASSERT_EX( reserved == 0, NULL ); |
1029 | // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it? |
1030 | generic_scheduler* s = governor::local_scheduler_weak(); |
1031 | __TBB_ASSERT( s, "this_task_arena::isolate() needs an initialized scheduler" ); |
// Theoretically, we could keep the current isolation in the scheduler; however, it makes sense to store it in the
// innermost running task because it can in principle be queried via task::self().
1034 | isolation_tag& current_isolation = s->my_innermost_running_task->prefix().isolation; |
1035 | // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard. |
1036 | isolation_guard guard( current_isolation ); |
1037 | current_isolation = reinterpret_cast<isolation_tag>(&d); |
1038 | d(); |
1039 | } |
1040 | #endif /* __TBB_TASK_ISOLATION */ |
1041 | |
1042 | int task_arena_base::internal_max_concurrency(const task_arena *ta) { |
1043 | arena* a = NULL; |
1044 | if( ta ) // for special cases of ta->max_concurrency() |
1045 | a = ta->my_arena; |
1046 | else if( generic_scheduler* s = governor::local_scheduler_if_initialized() ) |
1047 | a = s->my_arena; // the current arena if any |
1048 | |
1049 | if( a ) { // Get parameters from the arena |
1050 | __TBB_ASSERT( !ta || ta->my_max_concurrency==1, NULL ); |
1051 | return a->my_num_reserved_slots + a->my_max_num_workers; |
1052 | } else { |
1053 | __TBB_ASSERT( !ta || ta->my_max_concurrency==automatic, NULL ); |
1054 | return int(governor::default_num_threads()); |
1055 | } |
1056 | } |
1057 | } // tbb::interfaceX::internal |
1058 | } // tbb::interfaceX |
1059 | } // tbb |
1060 | |