1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#define __TBB_EXTRA_DEBUG 1
18
19#include <stdexcept>
20#include <cstdlib>
21#include <cstdio>
22#include <vector>
23#include <set>
24
25#include "harness_fp.h"
26
27#if __TBB_TASK_ISOLATION
28// Whitebox stuff for TestIsolatedExecuteNS::ContinuationTest().
// TODO: Consider a better approach than the whitebox one.
30#define private public
31#include "tbb/task.h"
32#undef private
33#endif /* __TBB_TASK_ISOLATION */
34
35#include "tbb/task_arena.h"
36#include "tbb/atomic.h"
37#include "tbb/task_scheduler_observer.h"
38#include "tbb/task_scheduler_init.h"
39#include "tbb/parallel_for.h"
40#include "tbb/blocked_range.h"
41#include "tbb/enumerable_thread_specific.h"
42
43#include "harness_assert.h"
44#include "harness.h"
45#include "harness_barrier.h"
46
47#include "tbb/tbb_thread.h"
48
49#if _MSC_VER
// works around __TBB_NO_IMPLICIT_LINKAGE; __TBB_LIB_NAME should be defined (in makefiles)
51#pragma comment(lib, __TBB_STRING(__TBB_LIB_NAME))
52#endif
53
54#include "tbb/global_control.h"
55//--------------------------------------------------//
56// Test that task_arena::initialize and task_arena::terminate work when doing nothing else.
57/* maxthread is treated as the biggest possible concurrency level. */
58void InitializeAndTerminate( int maxthread ) {
59 __TBB_TRY {
60 for( int i=0; i<200; ++i ) {
61 switch( i&3 ) {
62 // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions.
63 // Explicit initialization can either keep the original values or change those.
64 // Arena termination can be explicit or implicit (in the destructor).
65 // TODO: extend with concurrency level checks if such a method is added.
66 default: {
67 tbb::task_arena arena( std::rand() % maxthread + 1 );
68 ASSERT(!arena.is_active(), "arena should not be active until initialized");
69 arena.initialize();
70 ASSERT(arena.is_active(), NULL);
71 arena.terminate();
72 ASSERT(!arena.is_active(), "arena should not be active; it was terminated");
73 break;
74 }
75 case 0: {
76 tbb::task_arena arena( 1 );
77 ASSERT(!arena.is_active(), "arena should not be active until initialized");
78 arena.initialize( std::rand() % maxthread + 1 ); // change the parameters
79 ASSERT(arena.is_active(), NULL);
80 break;
81 }
82 case 1: {
83 tbb::task_arena arena( tbb::task_arena::automatic );
84 ASSERT(!arena.is_active(), NULL);
85 arena.initialize();
86 ASSERT(arena.is_active(), NULL);
87 break;
88 }
89 case 2: {
90 tbb::task_arena arena;
91 ASSERT(!arena.is_active(), "arena should not be active until initialized");
92 arena.initialize( std::rand() % maxthread + 1 );
93 ASSERT(arena.is_active(), NULL);
94 arena.terminate();
95 ASSERT(!arena.is_active(), "arena should not be active; it was terminated");
96 break;
97 }
98 }
99 }
100 } __TBB_CATCH( std::runtime_error& error ) {
101#if TBB_USE_EXCEPTIONS
102 REPORT("ERROR: %s\n", error.what() );
103#endif /* TBB_USE_EXCEPTIONS */
104 }
105}
106
107//--------------------------------------------------//
108// Definitions used in more than one test
109typedef tbb::blocked_range<int> Range;
110
// slot_id values: -1 is what current_thread_index() reports outside an arena, -2 is set in on_scheduler_exit() below
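// local_id holds the observer id of the arena a thread is currently working in (the test treats 0 as "not in
// an observed arena"); old_id remembers the id of the enclosing arena so that nesting is checked on exit.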
112static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3);
113
114void ResetTLS() {
115 local_id.clear();
116 old_id.clear();
117 slot_id.clear();
118}
119
120class ArenaObserver : public tbb::task_scheduler_observer {
121 int myId; // unique observer/arena id within a test
122 int myMaxConcurrency; // concurrency of the associated arena
123 int myNumReservedSlots; // reserved slots in the associated arena
124 void on_scheduler_entry( bool is_worker ) __TBB_override {
125 int current_index = tbb::this_task_arena::current_thread_index();
126 REMARK("a %s #%p is entering arena %d from %d on slot %d\n", is_worker?"worker":"master",
127 &local_id.local(), myId, local_id.local(), current_index );
128 ASSERT(current_index<(myMaxConcurrency>1?myMaxConcurrency:2), NULL);
129 if(is_worker) ASSERT(current_index>=myNumReservedSlots, NULL);
130
131 ASSERT(!old_id.local(), "double call to on_scheduler_entry");
132 old_id.local() = local_id.local();
133 ASSERT(old_id.local() != myId, "double entry to the same arena");
134 local_id.local() = myId;
135 slot_id.local() = current_index;
136 }
137 void on_scheduler_exit( bool is_worker ) __TBB_override {
138 REMARK("a %s #%p is leaving arena %d to %d\n", is_worker?"worker":"master",
139 &local_id.local(), myId, old_id.local());
140 ASSERT(local_id.local() == myId, "nesting of arenas is broken");
141 ASSERT(slot_id.local() == tbb::this_task_arena::current_thread_index(), NULL);
142 //!deprecated, remove when tbb::task_arena::current_thread_index is removed.
143 ASSERT(slot_id.local() == tbb::task_arena::current_thread_index(), NULL);
144 slot_id.local() = -2;
145 local_id.local() = old_id.local();
146 old_id.local() = 0;
147 }
148public:
149 ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id)
150 : tbb::task_scheduler_observer(a)
151 , myId(id)
152 , myMaxConcurrency(maxConcurrency)
153 , myNumReservedSlots(numReservedSlots) {
154 ASSERT(myId, NULL);
155 observe(true);
156 }
157 ~ArenaObserver () {
158 ASSERT(!old_id.local(), "inconsistent observer state");
159 }
160};
161
162struct IndexTrackingBody { // Must be used together with ArenaObserver
163 void operator() ( const Range& ) const {
164 ASSERT(slot_id.local() == tbb::this_task_arena::current_thread_index(), NULL);
165 //!deprecated, remove when tbb::task_arena::current_thread_index is removed.
166 ASSERT(slot_id.local() == tbb::task_arena::current_thread_index(), NULL);
167 for ( volatile int i = 0; i < 50000; ++i )
168 ;
169 }
170};
171
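// AsynchronousWork runs an inner parallel_for in the submitter's task group context and then either blocks
// on the barrier or just signals it; it is used to keep an explicit arena busy with work.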
172struct AsynchronousWork : NoAssign {
173 Harness::SpinBarrier &my_barrier;
174 bool my_is_blocking;
175 AsynchronousWork(Harness::SpinBarrier &a_barrier, bool blocking = true)
176 : my_barrier(a_barrier), my_is_blocking(blocking) {}
177 void operator()() const {
178 ASSERT(local_id.local() != 0, "not in explicit arena");
179 tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner(), *tbb::task::self().group());
180 if(my_is_blocking) my_barrier.timed_wait(10); // must be asynchronous to master thread
181 else my_barrier.signal_nowait();
182 }
183};
184
185//--------------------------------------------------//
// Test that task_arenas can be created and used from multiple application threads.
// Also tests arena observers. The parameter idx is the index of the application thread running this test.
188void TestConcurrentArenasFunc(int idx) {
189 // A regression test for observer activation order:
190 // check that arena observer can be activated before local observer
191 struct LocalObserver : public tbb::task_scheduler_observer {
192 LocalObserver() : tbb::task_scheduler_observer(/*local=*/true) { observe(true); }
193 };
194 tbb::task_arena a1;
195 a1.initialize(1,0);
196 ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test
197 ASSERT(o1.is_observing(), "Arena observer has not been activated");
198 LocalObserver lo;
199 ASSERT(lo.is_observing(), "Local observer has not been activated");
200 tbb::task_arena a2(2,1);
201 ArenaObserver o2(a2, 2, 1, idx*2+2);
202 ASSERT(o2.is_observing(), "Arena observer has not been activated");
203 Harness::SpinBarrier barrier(2);
204 AsynchronousWork work(barrier);
205 a1.enqueue(work); // put async work
206 barrier.timed_wait(10);
207 a2.enqueue(work); // another work
208 a2.execute(work); // my_barrier.timed_wait(10) inside
209 a1.debug_wait_until_empty();
210 a2.debug_wait_until_empty();
211}
212
213void TestConcurrentArenas(int p) {
214 ResetTLS();
215 NativeParallelFor( p, &TestConcurrentArenasFunc );
216}
217
218//--------------------------------------------------//
219// Test multiple application threads working with a single arena at the same time.
220class MultipleMastersPart1 : NoAssign {
221 tbb::task_arena &my_a;
222 Harness::SpinBarrier &my_b1, &my_b2;
223public:
224 MultipleMastersPart1( tbb::task_arena &a, Harness::SpinBarrier &b1, Harness::SpinBarrier &b2)
225 : my_a(a), my_b1(b1), my_b2(b2) {}
226 void operator()(int) const {
227 my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false));
228 my_b1.timed_wait(10);
229 // A regression test for bugs 1954 & 1971
230 my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false));
231 }
232};
233
234class MultipleMastersPart2 : NoAssign {
235 tbb::task_arena &my_a;
236 Harness::SpinBarrier &my_b;
237public:
238 MultipleMastersPart2( tbb::task_arena &a, Harness::SpinBarrier &b) : my_a(a), my_b(b) {}
239 void operator()(int) const {
240 my_a.execute(AsynchronousWork(my_b, /*blocking=*/false));
241 }
242};
243
244class MultipleMastersPart3 : NoAssign {
245 tbb::task_arena &my_a;
246 Harness::SpinBarrier &my_b;
247
248 struct Runner : NoAssign {
249 tbb::task* const a_task;
250 Runner(tbb::task* const t) : a_task(t) {}
251 void operator()() const {
252 for ( volatile int i = 0; i < 10000; ++i )
253 ;
254 a_task->decrement_ref_count();
255 }
256 };
257
258 struct Waiter : NoAssign {
259 tbb::task* const a_task;
260 Waiter(tbb::task* const t) : a_task(t) {}
261 void operator()() const {
262 a_task->wait_for_all();
263 }
264 };
265
266public:
267 MultipleMastersPart3(tbb::task_arena &a, Harness::SpinBarrier &b)
268 : my_a(a), my_b(b) {}
269 void operator()(int idx) const {
270 tbb::empty_task* root_task = new(tbb::task::allocate_root()) tbb::empty_task;
271 my_b.timed_wait(10); // increases chances for task_arena initialization contention
272 for( int i=0; i<100; ++i) {
273 root_task->set_ref_count(2);
274 my_a.enqueue(Runner(root_task));
275 my_a.execute(Waiter(root_task));
276 }
277 tbb::task::destroy(*root_task);
278 REMARK("Master #%d: job completed, wait for others\n", idx);
279 my_b.timed_wait(10);
280 }
281};
282
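// Part 4 checks task_group_context consistency: every task_arena::execute() on the same arena is expected
// to run in the same default context (captured via Getter), while a nested arena or a nested parallel_for
// switches to a different context.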
283class MultipleMastersPart4 : NoAssign {
284 tbb::task_arena &my_a;
285 Harness::SpinBarrier &my_b;
286 tbb::task_group_context *my_ag;
287
288 struct Getter : NoAssign {
289 tbb::task_group_context *& my_g;
290 Getter(tbb::task_group_context *&a_g) : my_g(a_g) {}
291 void operator()() const {
292 my_g = tbb::task::self().group();
293 }
294 };
295 struct Checker : NoAssign {
296 tbb::task_group_context *my_g;
297 Checker(tbb::task_group_context *a_g) : my_g(a_g) {}
298 void operator()() const {
299 ASSERT(my_g == tbb::task::self().group(), NULL);
300 tbb::task *t = new( tbb::task::allocate_root() ) tbb::empty_task;
301 ASSERT(my_g == t->group(), NULL);
302 tbb::task::destroy(*t);
303 }
304 };
305 struct NestedChecker : NoAssign {
306 const MultipleMastersPart4 &my_body;
307 NestedChecker(const MultipleMastersPart4 &b) : my_body(b) {}
308 void operator()() const {
309 tbb::task_group_context *nested_g = tbb::task::self().group();
310 ASSERT(my_body.my_ag != nested_g, NULL);
311 tbb::task *t = new( tbb::task::allocate_root() ) tbb::empty_task;
312 ASSERT(nested_g == t->group(), NULL);
313 tbb::task::destroy(*t);
314 my_body.my_a.enqueue(Checker(my_body.my_ag));
315 }
316 };
317public:
318 MultipleMastersPart4( tbb::task_arena &a, Harness::SpinBarrier &b) : my_a(a), my_b(b) {
319 my_a.execute(Getter(my_ag));
320 }
321 // NativeParallelFor's functor
322 void operator()(int) const {
323 my_a.execute(*this);
324 }
325 // Arena's functor
326 void operator()() const {
327 Checker check(my_ag);
328 check();
329 tbb::task_arena nested(1,1);
330 nested.execute(NestedChecker(*this)); // change arena
331 tbb::parallel_for(Range(0,1),*this); // change group context only
332 my_b.timed_wait(10);
333 my_a.execute(check);
334 check();
335 }
336 // parallel_for's functor
337 void operator()(const Range &) const {
338 NestedChecker(*this)();
339 my_a.execute(Checker(my_ag)); // restore arena context
340 }
341};
342
343void TestMultipleMasters(int p) {
344 {
345 REMARK("multiple masters, part 1\n");
346 ResetTLS();
347 tbb::task_arena a(1,0);
348 a.initialize();
349 ArenaObserver o(a, 1, 0, 1);
350 Harness::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier
351 NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) );
352 barrier2.timed_wait(10);
353 a.debug_wait_until_empty();
354 } {
355 REMARK("multiple masters, part 2\n");
356 ResetTLS();
357 tbb::task_arena a(2,1);
358 ArenaObserver o(a, 2, 1, 2);
359 Harness::SpinBarrier barrier(p+2);
360 a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981
361 NativeParallelFor( p, MultipleMastersPart2(a, barrier) );
362 barrier.timed_wait(10);
363 a.debug_wait_until_empty();
364 } {
365 // Regression test for the bug 1981 part 2 (task_arena::execute() with wait_for_all for an enqueued task)
366 REMARK("multiple masters, part 3: wait_for_all() in execute()\n");
367 tbb::task_arena a(p,1);
368 Harness::SpinBarrier barrier(p+1); // for masters to avoid endless waiting at least in some runs
369 // "Oversubscribe" the arena by 1 master thread
370 NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) );
371 a.debug_wait_until_empty();
372 } {
373 int c = p%3? (p%2? p : 2) : 3;
374 REMARK("multiple masters, part 4: contexts, arena(%d)\n", c);
375 ResetTLS();
376 tbb::task_arena a(c, 1);
377 ArenaObserver o(a, c, 1, c);
378 Harness::SpinBarrier barrier(c);
379 MultipleMastersPart4 test(a, barrier);
380 NativeParallelFor(p, test);
381 a.debug_wait_until_empty();
382 }
383}
384
385//--------------------------------------------------//
386// TODO: explain what TestArenaEntryConsistency does
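// In short: several native threads plus nested parallel constructs repeatedly call execute() on one
// task_arena(2,1); every entry records its kind (direct, delegated to worker/master, nested) in an id string,
// checks the FP settings captured by the arena, and optionally throws to verify exception propagation.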
387#include <sstream>
388#if TBB_USE_EXCEPTIONS
389#include <stdexcept>
390#include "tbb/tbb_exception.h"
391#endif
392
393struct TestArenaEntryBody : FPModeContext {
394 tbb::atomic<int> &my_stage; // each execute increases it
395 std::stringstream my_id;
396 bool is_caught, is_expected;
397 enum { arenaFPMode = 1 };
398
399 TestArenaEntryBody(tbb::atomic<int> &s, int idx, int i) // init thread-specific instance
400 : FPModeContext(idx+i)
401 , my_stage(s)
402 , is_caught(false)
403 , is_expected( (idx&(1<<i)) != 0 && (TBB_USE_EXCEPTIONS) != 0 )
404 {
405 my_id << idx << ':' << i << '@';
406 }
407 void operator()() { // inside task_arena::execute()
408 // synchronize with other stages
409 int stage = my_stage++;
410 int slot = tbb::this_task_arena::current_thread_index();
411 ASSERT(slot >= 0 && slot <= 1, "master or the only worker");
412 // wait until the third stage is delegated and then starts on slot 0
413 while(my_stage < 2+slot) __TBB_Yield();
        // deduce the entry type and append it to the id; this helps to find the source of a problem
415 my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()?
416 "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master")
417 : stage == 3? "nested_same_ctx" : "nested_alien_ctx");
418 REMARK("running %s\n", my_id.str().c_str());
419 AssertFPMode(arenaFPMode);
420 if(is_expected)
421 __TBB_THROW(std::logic_error(my_id.str()));
422 // no code can be put here since exceptions can be thrown
423 }
424 void on_exception(const char *e) { // outside arena, in catch block
425 is_caught = true;
426 REMARK("caught %s\n", e);
427 ASSERT(my_id.str() == e, NULL);
428 assertFPMode();
429 }
430 void after_execute() { // outside arena and catch block
431 REMARK("completing %s\n", my_id.str().c_str() );
432 ASSERT(is_caught == is_expected, NULL);
433 assertFPMode();
434 }
435};
436
437class ForEachArenaEntryBody : NoAssign {
438 tbb::task_arena &my_a; // expected task_arena(2,1)
439 tbb::atomic<int> &my_stage; // each execute increases it
440 int my_idx;
441
442public:
443 ForEachArenaEntryBody(tbb::task_arena &a, tbb::atomic<int> &c)
444 : my_a(a), my_stage(c), my_idx(0) {}
445
446 void test(int idx) {
447 my_idx = idx;
448 my_stage = 0;
449 NativeParallelFor(3, *this); // test cross-arena calls
450 ASSERT(my_stage == 3, NULL);
451 my_a.execute(*this); // test nested calls
452 ASSERT(my_stage == 5, NULL);
453 }
454
455 // task_arena functor for nested tests
456 void operator()() const {
457 test_arena_entry(3); // in current task group context
458 tbb::parallel_for(4, 5, *this); // in different context
459 }
460
461 // NativeParallelFor & parallel_for functor
462 void operator()(int i) const {
463 test_arena_entry(i);
464 }
465
466private:
467 void test_arena_entry(int i) const {
468 TestArenaEntryBody scoped_functor(my_stage, my_idx, i);
469 __TBB_TRY {
470 my_a.execute(scoped_functor);
471 }
472#if TBB_USE_EXCEPTIONS
473 catch(tbb::captured_exception &e) {
474 scoped_functor.on_exception(e.what());
            ASSERT_WARNING(TBB_USE_CAPTURED_EXCEPTION, "Caught captured_exception while expecting the exact exception type");
476 } catch(std::logic_error &e) {
477 scoped_functor.on_exception(e.what());
478 ASSERT(!TBB_USE_CAPTURED_EXCEPTION, "Caught exception of wrong type");
479 } catch(...) { ASSERT(false, "Unexpected exception type"); }
480#endif //TBB_USE_EXCEPTIONS
481 scoped_functor.after_execute();
482 }
483};
484
485void TestArenaEntryConsistency() {
486 REMARK("test arena entry consistency\n");
487
488 tbb::task_arena a(2, 1);
489 tbb::atomic<int> c;
490 ForEachArenaEntryBody body(a, c);
491
492 FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode);
493 a.initialize(); // capture FP settings to arena
494 fp_scope.setNextFPMode();
495
    for (int i = 0; i < 100; i++) // at least 32 = 2^5 combinations of entry types
497 body.test(i);
498}
499
500//--------------------------------------------------
501// Test that the requested degree of concurrency for task_arena is achieved in various conditions
502class TestArenaConcurrencyBody : NoAssign {
503 tbb::task_arena &my_a;
504 int my_max_concurrency;
505 int my_reserved_slots;
506 Harness::SpinBarrier *my_barrier;
507 Harness::SpinBarrier *my_worker_barrier;
508public:
509 TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, Harness::SpinBarrier *b = NULL, Harness::SpinBarrier *wb = NULL )
510 : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {}
511 // NativeParallelFor's functor
512 void operator()( int ) const {
513 ASSERT( local_id.local() == 0, "TLS was not cleaned?" );
514 local_id.local() = 1;
515 my_a.execute( *this );
516 }
517 // Arena's functor
518 void operator()() const {
519 ASSERT( tbb::task_arena::current_thread_index() == tbb::this_task_arena::current_thread_index(), NULL );
520 int idx = tbb::this_task_arena::current_thread_index();
521 ASSERT( idx < (my_max_concurrency > 1 ? my_max_concurrency : 2), NULL );
522 ASSERT( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency(), NULL );
523 int max_arena_concurrency = tbb::this_task_arena::max_concurrency();
524 ASSERT( max_arena_concurrency == my_max_concurrency, NULL );
525 if ( my_worker_barrier ) {
526 if ( local_id.local() == 1 ) {
527 // Master thread in a reserved slot
528 ASSERT( idx < my_reserved_slots, "Masters are supposed to use only reserved slots in this test" );
529 } else {
530 // Worker thread
531 ASSERT( idx >= my_reserved_slots, NULL );
532 my_worker_barrier->timed_wait( 10 );
533 }
534 } else if ( my_barrier )
535 ASSERT( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" );
536 if ( my_barrier ) my_barrier->timed_wait( 10 );
537 else Harness::Sleep( 10 );
538 }
539};
540
541void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
542 for (; reserved <= p; reserved += step) {
543 REMARK("TestArenaConcurrency: %d slots, %d reserved\n", p, reserved);
544 tbb::task_arena a( p, reserved );
545 { // Check concurrency with worker & reserved master threads.
546 ResetTLS();
547 Harness::SpinBarrier b( p );
548 Harness::SpinBarrier wb( p-reserved );
549 TestArenaConcurrencyBody test( a, p, reserved, &b, &wb );
550 for ( int i = reserved; i < p; ++i )
551 a.enqueue( test );
552 if ( reserved==1 )
553 test( 0 ); // calls execute()
554 else
555 NativeParallelFor( reserved, test );
556 a.debug_wait_until_empty();
557 } { // Check if multiple masters alone can achieve maximum concurrency.
558 ResetTLS();
559 Harness::SpinBarrier b( p );
560 NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) );
561 a.debug_wait_until_empty();
562 } { // Check oversubscription by masters.
563 ResetTLS();
564 NativeParallelFor( 2*p, TestArenaConcurrencyBody( a, p, reserved ) );
565 a.debug_wait_until_empty();
566 }
567 }
568}
569
570//--------------------------------------------------//
571// Test creation/initialization of a task_arena that references an existing arena (aka attach).
572// This part of the test uses the knowledge of task_arena internals
573
574typedef tbb::interface7::internal::task_arena_base task_arena_internals;
575
576struct TaskArenaValidator : public task_arena_internals {
577 int my_slot_at_construction;
578 TaskArenaValidator( const task_arena_internals& other )
579 : task_arena_internals(other) /*copies the internal state of other*/ {
580 my_slot_at_construction = tbb::this_task_arena::current_thread_index();
581 }
582 // Inspect the internal state
583 int concurrency() { return my_max_concurrency; }
584 int reserved_for_masters() { return (int)my_master_slots; }
585
586 // This method should be called in task_arena::execute() for a captured arena
587 // by the same thread that created the validator.
588 void operator()() {
589 ASSERT( tbb::this_task_arena::current_thread_index()==my_slot_at_construction,
590 "Current thread index has changed since the validator construction" );
591 //!deprecated
592 ASSERT( tbb::task_arena::current_thread_index()==my_slot_at_construction,
593 "Current thread index has changed since the validator construction" );
594 }
595};
596
597void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated,
598 int expect_concurrency, int expect_masters ) {
599 ASSERT( arena.is_active()==expect_activated, "Unexpected activation state" );
600 if( arena.is_active() ) {
601 TaskArenaValidator validator( arena );
602 ASSERT( validator.concurrency()==expect_concurrency, "Unexpected arena size" );
603 ASSERT( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" );
604 if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) {
605 ASSERT( tbb::task_arena::current_thread_index() >= 0 &&
606 tbb::this_task_arena::current_thread_index() >= 0, NULL);
607 // for threads already in arena, check that the thread index remains the same
608 arena.execute( validator );
609 } else { // not_initialized
610 // Test the deprecated method
611 ASSERT( tbb::task_arena::current_thread_index()==-1, NULL);
612 }
613
614 // Ideally, there should be a check for having the same internal arena object,
615 // but that object is not easily accessible for implicit arenas.
616 }
617}
618
619struct TestAttachBody : NoAssign {
620 mutable int my_idx; // safe to modify and use within the NativeParallelFor functor
621 const int maxthread;
622 TestAttachBody( int max_thr ) : maxthread(max_thr) {}
623
624 // The functor body for NativeParallelFor
625 void operator()( int idx ) const {
626 my_idx = idx;
627 int default_threads = tbb::task_scheduler_init::default_num_threads();
628
629 tbb::task_arena arena = tbb::task_arena( tbb::task_arena::attach() );
630 ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to
631
632 { // attach to an arena created via task_scheduler_init
633 tbb::task_scheduler_init init( idx+1 );
634
635 tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
636 ValidateAttachedArena( arena2, true, idx+1, 1 );
637
638 arena.initialize( tbb::task_arena::attach() );
639 }
640 ValidateAttachedArena( arena, true, idx+1, 1 );
641
642 arena.terminate();
643 ValidateAttachedArena( arena, false, -1, -1 );
644
645 // Check default behavior when attach cannot succeed
646 switch (idx%2) {
647 case 0:
648 { // construct as attached, then initialize
649 tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
650 ValidateAttachedArena( arena2, false, -1, -1 );
651 arena2.initialize(); // must be initialized with default parameters
652 ValidateAttachedArena( arena2, true, default_threads, 1 );
653 }
654 break;
655 case 1:
656 { // default-construct, then initialize as attached
657 tbb::task_arena arena2;
658 ValidateAttachedArena( arena2, false, -1, -1 );
659 arena2.initialize( tbb::task_arena::attach() ); // must use default parameters
660 ValidateAttachedArena( arena2, true, default_threads, 1 );
661 }
662 break;
663 } // switch
664
665 // attach to an auto-initialized arena
666 tbb::empty_task& tsk = *new (tbb::task::allocate_root()) tbb::empty_task;
667 tbb::task::spawn_root_and_wait(tsk);
668 tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
669 ValidateAttachedArena( arena2, true, default_threads, 1 );
670
671 // attach to another task_arena
672 arena.initialize( maxthread, min(maxthread,idx) );
673 arena.execute( *this );
674 }
675
676 // The functor body for task_arena::execute above
677 void operator()() const {
678 tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
679 ValidateAttachedArena( arena2, true, maxthread, min(maxthread,my_idx) );
680 }
681
682 // The functor body for tbb::parallel_for
683 void operator()( const Range& r ) const {
684 for( int i = r.begin(); i<r.end(); ++i ) {
685 tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
686 ValidateAttachedArena( arena2, true, maxthread+1, 1 ); // +1 to match initialization in TestMain
687 }
688 }
689};
690
691void TestAttach( int maxthread ) {
692 REMARK( "Testing attached task_arenas\n" );
693 // Externally concurrent, but no concurrency within a thread
694 NativeParallelFor( max(maxthread,4), TestAttachBody( maxthread ) );
695 // Concurrent within the current arena; may also serve as a stress test
696 tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) );
697}
698
699//--------------------------------------------------//
// Test that task_arena::enqueue invokes only the const overload of the functor's call operator.
// TODO: can it be reworked as an SFINAE-based compile-time check?
702struct TestFunctor {
703 void operator()() { ASSERT( false, "Non-const operator called" ); }
704 void operator()() const { /* library requires this overload only */ }
705};
706
707void TestConstantFunctorRequirement() {
708 tbb::task_arena a;
709 TestFunctor tf;
710 a.enqueue( tf );
711#if __TBB_TASK_PRIORITY
712 a.enqueue( tf, tbb::priority_normal );
713#endif
714}
715//--------------------------------------------------//
716#if __TBB_TASK_ISOLATION
717#include "tbb/parallel_reduce.h"
718#include "tbb/parallel_invoke.h"
719// Test this_task_arena::isolate
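// While a thread runs inside this_task_arena::isolate(), it may only take tasks spawned within that isolated
// region, so waiting inside it cannot pick up ("steal") unrelated outer-level work.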
720namespace TestIsolatedExecuteNS {
721 //--------------------------------------------------//
722 template <typename NestedPartitioner>
723 class NestedParFor : NoAssign {
724 public:
725 NestedParFor() {}
726 void operator()() const {
727 NestedPartitioner p;
728 tbb::parallel_for( 0, 10, Harness::DummyBody( 10 ), p );
729 }
730 };
731
732 template <typename NestedPartitioner>
733 class ParForBody : NoAssign {
734 bool myOuterIsolation;
735 tbb::enumerable_thread_specific<int> &myEts;
736 tbb::atomic<bool> &myIsStolen;
737 public:
738 ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, tbb::atomic<bool> &is_stolen )
739 : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {}
740 void operator()( int ) const {
741 int &e = myEts.local();
742 if ( e++ > 0 ) myIsStolen = true;
743 if ( myOuterIsolation )
744 NestedParFor<NestedPartitioner>()();
745 else
746 tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() );
747 --e;
748 }
749 };
750
751 template <typename OuterPartitioner, typename NestedPartitioner>
752 class OuterParFor : NoAssign {
753 bool myOuterIsolation;
754 tbb::atomic<bool> &myIsStolen;
755 public:
756 OuterParFor( bool outer_isolation, tbb::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {}
757 void operator()() const {
758 tbb::enumerable_thread_specific<int> ets( 0 );
759 OuterPartitioner p;
760 tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p );
761 }
762 };
763
764 template <typename OuterPartitioner, typename NestedPartitioner>
765 void TwoLoopsTest( bool outer_isolation ) {
766 tbb::atomic<bool> is_stolen;
767 is_stolen = false;
768 const int max_repeats = 100;
769 if ( outer_isolation ) {
770 for ( int i = 0; i <= max_repeats; ++i ) {
771 tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) );
772 if ( is_stolen ) break;
773 }
774 ASSERT_WARNING( is_stolen, "isolate() should not block stealing on nested levels without isolation" );
775 } else {
776 for ( int i = 0; i <= max_repeats; ++i ) {
777 OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )();
778 }
            ASSERT( !is_stolen, "isolate() on nested levels should prevent stealing from outer levels" );
780 }
781 }
782
783 void TwoLoopsTest( bool outer_isolation ) {
784 TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation );
785 TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation );
786 TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation );
787 TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation );
788 }
789
790 void TwoLoopsTest() {
791 TwoLoopsTest( true );
792 TwoLoopsTest( false );
793 }
794 //--------------------------------------------------//
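    // HeavyMixTest randomly nests parallel_for/parallel_invoke up to 20 levels deep, sometimes raising the
    // priority and sometimes entering isolate(); the per-thread isolated_level records the innermost isolation,
    // and the body asserts that a thread never executes a task from a level not deeper than its current isolation.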
795 class HeavyMixTestBody : NoAssign {
796 tbb::enumerable_thread_specific<Harness::FastRandom>& myRandom;
797 tbb::enumerable_thread_specific<int>& myIsolatedLevel;
798 int myNestedLevel;
799 bool myHighPriority;
800
801 template <typename Partitioner, typename Body>
802 static void RunTwoBodies( Harness::FastRandom& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = NULL ) {
803 if ( rnd.get() % 2 )
804 if (ctx )
805 tbb::parallel_for( 0, 2, body, p, *ctx );
806 else
807 tbb::parallel_for( 0, 2, body, p );
808 else
809 tbb::parallel_invoke( body, body );
810 }
811
812 template <typename Partitioner>
813 class IsolatedBody : NoAssign {
814 const HeavyMixTestBody &myHeavyMixTestBody;
815 Partitioner &myPartitioner;
816 public:
817 IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner )
818 : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {}
819 void operator()() const {
820 RunTwoBodies( myHeavyMixTestBody.myRandom.local(),
821 HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel,
822 myHeavyMixTestBody.myNestedLevel + 1, myHeavyMixTestBody.myHighPriority ),
823 myPartitioner );
824 }
825 };
826
827 template <typename Partitioner>
828 void RunNextLevel( Harness::FastRandom& rnd, int &isolated_level ) const {
829 Partitioner p;
830 switch ( rnd.get() % 3 ) {
831 case 0: {
832 // No features
833 tbb::task_group_context ctx;
834 if ( myHighPriority )
835 ctx.set_priority( tbb::priority_high );
836 RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1, myHighPriority), p, &ctx );
837 break;
838 }
839 case 1: {
840 // High priority
841 tbb::task_group_context ctx;
842 ctx.set_priority( tbb::priority_high );
843 RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1, true), p, &ctx );
844 break;
845 }
846 case 2: {
847 // Isolation
848 int previous_isolation = isolated_level;
849 isolated_level = myNestedLevel;
850 tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) );
851 isolated_level = previous_isolation;
852 break;
853 }
854 }
855 }
856 public:
857 HeavyMixTestBody( tbb::enumerable_thread_specific<Harness::FastRandom>& random,
858 tbb::enumerable_thread_specific<int>& isolated_level, int nested_level, bool high_priority )
859 : myRandom( random ), myIsolatedLevel( isolated_level )
860 , myNestedLevel( nested_level ), myHighPriority( high_priority ) {}
861 void operator()() const {
862 int &isolated_level = myIsolatedLevel.local();
            ASSERT( myNestedLevel > isolated_level, "The outer-level task should not be stolen on an isolated level" );
864 if ( myNestedLevel == 20 )
865 return;
866 Harness::FastRandom &rnd = myRandom.local();
867 if ( rnd.get() % 2 == 1 ) {
868 RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level );
869 } else {
870 RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level );
871 }
872 }
873 void operator()(int) const {
874 this->operator()();
875 }
876 };
877
878 struct RandomInitializer {
879 Harness::FastRandom operator()() {
880 return Harness::FastRandom( tbb::this_task_arena::current_thread_index() );
881 }
882 };
883
884 void HeavyMixTest() {
885 tbb::task_scheduler_init init( tbb::task_scheduler_init::default_num_threads() < 3 ? 3 : tbb::task_scheduler_init::automatic );
886 RandomInitializer init_random;
887 tbb::enumerable_thread_specific<Harness::FastRandom> random( init_random );
888 tbb::enumerable_thread_specific<int> isolated_level( 0 );
889 for ( int i = 0; i < 5; ++i ) {
890 HeavyMixTestBody b( random, isolated_level, 1, false );
891 b( 0 );
892 REMARK( "." );
893 }
894 }
895 //--------------------------------------------------//
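    // ContinuationTest (whitebox): reads task_prefix::isolation directly to check that continuation tasks
    // created by parallel_deterministic_reduce keep the isolation tag of the child tasks they join.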
896 struct ContinuationTestReduceBody : NoAssign {
897 tbb::internal::isolation_tag myIsolation;
898 ContinuationTestReduceBody( tbb::internal::isolation_tag isolation ) : myIsolation( isolation ) {}
899 ContinuationTestReduceBody( ContinuationTestReduceBody& b, tbb::split ) : myIsolation( b.myIsolation ) {}
900 void operator()( tbb::blocked_range<int> ) {}
901 void join( ContinuationTestReduceBody& ) {
902 tbb::internal::isolation_tag isolation = tbb::task::self().prefix().isolation;
903 ASSERT( isolation == myIsolation, "The continuations should preserve children's isolation" );
904 }
905 };
906 struct ContinuationTestIsolated {
907 void operator()() const {
908 ContinuationTestReduceBody b( tbb::task::self().prefix().isolation );
909 tbb::parallel_deterministic_reduce( tbb::blocked_range<int>( 0, 100 ), b );
910 }
911 };
912 struct ContinuationTestParForBody : NoAssign {
913 tbb::enumerable_thread_specific<int> &myEts;
914 public:
915 ContinuationTestParForBody( tbb::enumerable_thread_specific<int> &ets ) : myEts( ets ){}
916 void operator()( int ) const {
917 int &e = myEts.local();
918 ++e;
            ASSERT( e==1, "The task was stolen on an isolated level" );
920 tbb::this_task_arena::isolate( ContinuationTestIsolated() );
921 --e;
922 }
923 };
924 void ContinuationTest() {
925 for ( int i = 0; i < 5; ++i ) {
926 tbb::enumerable_thread_specific<int> myEts;
927 tbb::parallel_for( 0, 100, ContinuationTestParForBody( myEts ), tbb::simple_partitioner() );
928 }
929 }
930 //--------------------------------------------------//
931#if TBB_USE_EXCEPTIONS
932 struct MyException {};
933 struct IsolatedBodyThrowsException {
934 void operator()() const {
935 __TBB_THROW( MyException() );
936 }
937 };
938 struct ExceptionTestBody : NoAssign {
939 tbb::enumerable_thread_specific<int>& myEts;
940 tbb::atomic<bool>& myIsStolen;
941 ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, tbb::atomic<bool>& is_stolen )
942 : myEts( ets ), myIsStolen( is_stolen ) {}
943 void operator()( int i ) const {
944 try {
945 tbb::this_task_arena::isolate( IsolatedBodyThrowsException() );
946 ASSERT( false, "The exception has been lost" );
947 }
948 catch ( MyException ) {}
949 catch ( ... ) {
950 ASSERT( false, "Unexpected exception" );
951 }
952 // Check that nested algorithms can steal outer-level tasks
953 int &e = myEts.local();
954 if ( e++ > 0 ) myIsStolen = true;
955 // work imbalance increases chances for stealing
956 tbb::parallel_for( 0, 10+i, Harness::DummyBody( 100 ) );
957 --e;
958 }
959 };
960
961#endif /* TBB_USE_EXCEPTIONS */
962 void ExceptionTest() {
963#if TBB_USE_EXCEPTIONS
964 tbb::enumerable_thread_specific<int> ets;
965 tbb::atomic<bool> is_stolen;
966 is_stolen = false;
967 for ( int i = 0; i<10; ++i ) {
968 tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) );
969 if ( is_stolen ) break;
970 }
971 ASSERT( is_stolen, "isolate should not affect non-isolated work" );
972#endif /* TBB_USE_EXCEPTIONS */
973 }
974
975 struct NonConstBody {
976 unsigned int state;
977 void operator()() {
978 state ^= ~0u;
979 }
980 };
981
982 void TestNonConstBody() {
983 NonConstBody body;
984 body.state = 0x6c97d5ed;
985 tbb::this_task_arena::isolate(body);
986 ASSERT(body.state == 0x93682a12, "The wrong state");
987 }
988
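    // TestEnqueue checks two things: that the main thread can take part in processing enqueued tasks, and
    // that tasks enqueued outside an isolated region are never executed inside this_task_arena::isolate().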
989 class TestEnqueueTask : public tbb::task {
990 bool enqueued;
991 tbb::enumerable_thread_specific<bool>& executed;
992 tbb::atomic<int>& completed;
993 public:
994 static const int N = 100;
995
996 TestEnqueueTask(bool enq, tbb::enumerable_thread_specific<bool>& exe, tbb::atomic<int>& c)
997 : enqueued(enq), executed(exe), completed(c) {}
998 tbb::task* execute() __TBB_override {
999 if (enqueued) {
1000 executed.local() = true;
1001 ++completed;
1002 __TBB_Yield();
1003 } else {
1004 parent()->add_ref_count(N);
1005 for (int i = 0; i < N; ++i)
1006 tbb::task::enqueue(*new (parent()->allocate_child()) TestEnqueueTask(true, executed, completed));
1007 }
1008 return NULL;
1009 }
1010 };
1011
1012 class TestEnqueueIsolateBody : NoCopy {
1013 tbb::enumerable_thread_specific<bool>& executed;
1014 tbb::atomic<int>& completed;
1015 public:
1016 TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, tbb::atomic<int>& c)
1017 : executed(exe), completed(c) {}
1018 void operator()() {
1019 tbb::task::spawn_root_and_wait(*new (tbb::task::allocate_root()) TestEnqueueTask(false, executed, completed));
1020 }
1021 };
1022
1023 void TestEnqueue() {
1024 tbb::enumerable_thread_specific<bool> executed(false);
1025 tbb::atomic<int> completed;
1026
1027 // Check that the main thread can process enqueued tasks.
1028 completed = 0;
1029 TestEnqueueIsolateBody b1(executed, completed);
1030 b1();
1031 if (!executed.local())
            REPORT("Warning: None of the enqueued tasks was executed by the main thread.\n");
1033
1034 executed.local() = false;
1035 completed = 0;
1036 const int N = 100;
1037 // Create enqueued tasks out of isolation.
1038 for (int i = 0; i < N; ++i)
1039 tbb::task::enqueue(*new (tbb::task::allocate_root()) TestEnqueueTask(true, executed, completed));
1040 TestEnqueueIsolateBody b2(executed, completed);
1041 tbb::this_task_arena::isolate(b2);
1042 ASSERT(executed.local() == false, "An enqueued task was executed within isolate.");
1043
1044 while (completed < TestEnqueueTask::N + N) __TBB_Yield();
1045 }
1046}
1047
1048void TestIsolatedExecute() {
1049 REMARK("TestIsolatedExecute");
    // At least 3 threads (owner + 2 thieves) are required to reproduce a situation where the owner steals an
    // outer-level task on a nested level. If there is only one thief, it will execute the outer-level tasks
    // first and the owner will not have a chance to steal them.
1053 int num_threads = min( tbb::task_scheduler_init::default_num_threads(), 3 );
1054 {
        // Too many threads require too much work to reproduce stealing from the outer level.
1056 tbb::task_scheduler_init init( max(num_threads, 7) );
1057 REMARK("."); TestIsolatedExecuteNS::TwoLoopsTest();
1058 REMARK("."); TestIsolatedExecuteNS::HeavyMixTest();
1059 REMARK("."); TestIsolatedExecuteNS::ContinuationTest();
1060 REMARK("."); TestIsolatedExecuteNS::ExceptionTest();
1061 }
1062 tbb::task_scheduler_init init(num_threads);
1063 REMARK("."); TestIsolatedExecuteNS::HeavyMixTest();
1064 REMARK("."); TestIsolatedExecuteNS::ContinuationTest();
1065 REMARK("."); TestIsolatedExecuteNS::TestNonConstBody();
1066 REMARK("."); TestIsolatedExecuteNS::TestEnqueue();
1067 REMARK("\rTestIsolatedExecute: done \n");
1068}
1069#endif /* __TBB_TASK_ISOLATION */
1070//--------------------------------------------------//
1071//--------------------------------------------------//
1072
1073class TestDelegatedSpawnWaitBody : NoAssign {
1074 tbb::task_arena &my_a;
1075 Harness::SpinBarrier &my_b1, &my_b2;
1076
1077 struct Spawner : NoAssign {
1078 tbb::task* const a_task;
1079 Spawner(tbb::task* const t) : a_task(t) {}
1080 void operator()() const {
1081 tbb::task::spawn( *new(a_task->allocate_child()) tbb::empty_task );
1082 }
1083 };
1084
1085 struct Waiter : NoAssign {
1086 tbb::task* const a_task;
1087 Waiter(tbb::task* const t) : a_task(t) {}
1088 void operator()() const {
1089 a_task->wait_for_all();
1090 }
1091 };
1092
1093public:
1094 TestDelegatedSpawnWaitBody( tbb::task_arena &a, Harness::SpinBarrier &b1, Harness::SpinBarrier &b2)
1095 : my_a(a), my_b1(b1), my_b2(b2) {}
1096 // NativeParallelFor's functor
1097 void operator()(int idx) const {
1098 if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent test hang)
1099 for( int i=0; i<2; ++i ) my_a.enqueue(*this); // tasks to sync with workers
1100 tbb::empty_task* root_task = new(tbb::task::allocate_root()) tbb::empty_task;
1101 root_task->set_ref_count(100001);
1102 my_b1.timed_wait(10); // sync with the workers
1103 for( int i=0; i<100000; ++i) {
1104 my_a.execute(Spawner(root_task));
1105 }
1106 my_a.execute(Waiter(root_task));
1107 tbb::task::destroy(*root_task);
1108 }
1109 my_b2.timed_wait(10); // sync both threads
1110 }
1111 // Arena's functor
1112 void operator()() const {
1113 my_b1.timed_wait(10); // sync with the arena master
1114 }
1115};
1116
1117void TestDelegatedSpawnWait() {
1118 // Regression test for a bug with missed wakeup notification from a delegated task
1119 REMARK( "Testing delegated spawn & wait\n" );
1120 tbb::task_arena a(2,0);
1121 a.initialize();
1122 Harness::SpinBarrier barrier1(3), barrier2(2);
1123 NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) );
1124 a.debug_wait_until_empty();
1125}
1126
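// TestMultipleWaits: many native threads block in wait_for_all() inside task_arena::execute() at the same
// time; each bunch of waiting threads is released by the next bunch, which decrements the corresponding
// reference counts.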
1127class TestMultipleWaitsArenaWait : NoAssign {
1128public:
1129 TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, tbb::task** waiters, tbb::atomic<int>& processed )
1130 : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ) {}
1131 void operator()() const {
1132 ++my_processed;
1133 // Wait for all tasks
1134 if ( my_idx < my_num_tasks )
1135 my_waiters[my_idx]->wait_for_all();
1136 // Signal waiting tasks
1137 if ( my_idx >= my_bunch_size )
1138 my_waiters[my_idx-my_bunch_size]->decrement_ref_count();
1139 }
1140private:
1141 int my_idx;
1142 int my_bunch_size;
1143 int my_num_tasks;
1144 tbb::task** my_waiters;
1145 tbb::atomic<int>& my_processed;
1146};
1147
1148class TestMultipleWaitsThreadBody : NoAssign {
1149public:
1150 TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, tbb::task** waiters, tbb::atomic<int>& processed )
1151 : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ) {}
1152 void operator()( int idx ) const {
1153 my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed ) );
1154 --my_processed;
1155 }
1156private:
1157 int my_bunch_size;
1158 int my_num_tasks;
1159 tbb::task_arena& my_arena;
1160 tbb::task** my_waiters;
1161 tbb::atomic<int>& my_processed;
1162};
1163
1164#include "tbb/tbb_thread.h"
1165
1166void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) {
1167 tbb::task_arena a( num_threads );
1168 const int num_tasks = (num_bunches-1)*bunch_size;
1169 tbb::task** tasks = new tbb::task*[num_tasks];
1170 for ( int i = 0; i<num_tasks; ++i )
1171 tasks[i] = new (tbb::task::allocate_root()) tbb::empty_task();
1172 tbb::atomic<int> processed;
1173 processed = 0;
1174 for ( int repeats = 0; repeats<10; ++repeats ) {
1175 int idx = 0;
1176 for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) {
            // Sync with the previous bunch of tasks to prevent "false" nested dependencies (when a nested task waits for an outer task).
1178 while ( processed < bunch*bunch_size ) __TBB_Yield();
1179 // Run the bunch of threads/tasks that depend on the next bunch of threads/tasks.
1180 for ( int i = 0; i<bunch_size; ++i ) {
1181 tasks[idx]->set_ref_count( 2 );
1182 tbb::tbb_thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, tasks, processed ), idx++ ).detach();
1183 }
1184 }
1185 // No sync because the threads of the last bunch do not call wait_for_all.
1186 // Run the last bunch of threads.
1187 for ( int i = 0; i<bunch_size; ++i )
1188 tbb::tbb_thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, tasks, processed ), idx++ ).detach();
1189 while ( processed ) __TBB_Yield();
1190 }
1191 for ( int i = 0; i<num_tasks; ++i )
1192 tbb::task::destroy( *tasks[i] );
1193 delete[] tasks;
1194}
1195
1196void TestMultipleWaits() {
1197 REMARK( "Testing multiple waits\n" );
1198 // Limit the number of threads to prevent heavy oversubscription.
1199 const int max_threads = min( 16, tbb::task_scheduler_init::default_num_threads() );
1200
1201 Harness::FastRandom rnd(1234);
1202 for ( int threads = 1; threads <= max_threads; threads += max( threads/2, 1 ) ) {
1203 for ( int i = 0; i<3; ++i ) {
1204 const int num_bunches = 3 + rnd.get()%3;
1205 const int bunch_size = max_threads + rnd.get()%max_threads;
1206 TestMultipleWaits( threads, num_bunches, bunch_size );
1207 }
1208 }
1209}
1210//--------------------------------------------------//
1211#include "tbb/global_control.h"
1212
1213void TestSmallStackSize() {
1214 tbb::task_scheduler_init init(tbb::task_scheduler_init::automatic,
1215 tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 );
    // The test produces a warning (not an error) if it fails, so it is run many times
    // to make the log annoying enough for the warning to be treated as an error.
1218 for (int i = 0; i < 100; ++i) {
1219 tbb::task_arena a;
1220 a.initialize();
1221 }
1222}
1223//--------------------------------------------------//
1224#if __TBB_CPP11_RVALUE_REF_PRESENT
1225namespace TestMoveSemanticsNS {
1226 struct TestFunctor {
1227 void operator()() const {};
1228 };
1229
1230 struct MoveOnlyFunctor : MoveOnly, TestFunctor {
1231 MoveOnlyFunctor() : MoveOnly() {};
1232 MoveOnlyFunctor(MoveOnlyFunctor&& other) : MoveOnly(std::move(other)) {};
1233 };
1234
1235 struct MovePreferableFunctor : Movable, TestFunctor {
1236 MovePreferableFunctor() : Movable() {};
1237 MovePreferableFunctor(MovePreferableFunctor&& other) : Movable( std::move(other) ) {};
1238 MovePreferableFunctor(const MovePreferableFunctor& other) : Movable(other) {};
1239 };
1240
1241 struct NoMoveNoCopyFunctor : NoCopy, TestFunctor {
1242 NoMoveNoCopyFunctor() : NoCopy() {};
        // the move constructor is disallowed, just like the copy constructor inherited from NoCopy
1244 private:
1245 NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&);
1246 };
1247
1248
1249 void TestFunctors() {
1250 tbb::task_arena ta;
1251 MovePreferableFunctor mpf;
        // execute() does not make any copies or moves of its argument inside the implementation
1253 ta.execute( NoMoveNoCopyFunctor() );
1254
1255 ta.enqueue( MoveOnlyFunctor() );
1256 ta.enqueue( mpf );
        ASSERT(mpf.alive, "object was moved when passed by lvalue");
1258 mpf.Reset();
1259 ta.enqueue( std::move(mpf) );
        ASSERT(!mpf.alive, "object was copied when passed by rvalue");
1261 mpf.Reset();
1262#if __TBB_TASK_PRIORITY
1263 ta.enqueue( MoveOnlyFunctor(), tbb::priority_normal );
1264 ta.enqueue( mpf, tbb::priority_normal );
        ASSERT(mpf.alive, "object was moved when passed by lvalue");
1266 mpf.Reset();
1267 ta.enqueue( std::move(mpf), tbb::priority_normal );
        ASSERT(!mpf.alive, "object was copied when passed by rvalue");
1269 mpf.Reset();
1270#endif
1271 }
1272}
1273#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
1274
1275void TestMoveSemantics() {
1276#if __TBB_CPP11_RVALUE_REF_PRESENT
1277 TestMoveSemanticsNS::TestFunctors();
1278#else
1279 REPORT("Known issue: move support tests are skipped.\n");
1280#endif
1281}
1282//--------------------------------------------------//
1283#if __TBB_CPP11_DECLTYPE_PRESENT && !__TBB_CPP11_DECLTYPE_OF_FUNCTION_RETURN_TYPE_BROKEN
1284#include <vector>
1285#include "harness_state_trackable.h"
1286
1287namespace TestReturnValueNS {
1288 struct noDefaultTag {};
1289 class ReturnType : public Harness::StateTrackable<> {
1290 static const int SIZE = 42;
1291 std::vector<int> data;
1292 public:
1293 ReturnType(noDefaultTag) : Harness::StateTrackable<>(0) {}
1294#if !__TBB_IMPLICIT_MOVE_PRESENT
1295 // Define copy constructor to test that it is never called
1296 ReturnType(const ReturnType& r) : Harness::StateTrackable<>(r), data(r.data) {}
1297 ReturnType(ReturnType&& r) : Harness::StateTrackable<>(std::move(r)), data(std::move(r.data)) {}
1298#endif
1299 void fill() {
1300 for (int i = 0; i < SIZE; ++i)
1301 data.push_back(i);
1302 }
1303 void check() {
1304 ASSERT(data.size() == unsigned(SIZE), NULL);
1305 for (int i = 0; i < SIZE; ++i)
1306 ASSERT(data[i] == i, NULL);
1307 Harness::StateTrackableCounters::counters_t& cnts = Harness::StateTrackableCounters::counters;
1308 ASSERT((cnts[Harness::StateTrackableBase::DefaultInitialized] == 0), NULL);
1309 ASSERT(cnts[Harness::StateTrackableBase::DirectInitialized] == 1, NULL);
1310 std::size_t copied = cnts[Harness::StateTrackableBase::CopyInitialized];
1311 std::size_t moved = cnts[Harness::StateTrackableBase::MoveInitialized];
1312 ASSERT(cnts[Harness::StateTrackableBase::Destroyed] == copied + moved, NULL);
            // The number of copies/moves should not exceed 3: returning from the function, storing to the
            // internal storage, and retrieving from the internal storage.
1315#if __TBB_CPP11_RVALUE_REF_PRESENT
1316 ASSERT(copied == 0 && moved <=3, NULL);
1317#else
1318 ASSERT(copied <= 3 && moved == 0, NULL);
1319#endif
1320 }
1321 };
1322
1323 template <typename R>
1324 R function() {
1325 noDefaultTag tag;
1326 R r(tag);
1327 r.fill();
1328 return r;
1329 }
1330
1331 template <>
1332 void function<void>() {}
1333
1334 template <typename R>
1335 struct Functor {
1336 R operator()() const {
1337 return function<R>();
1338 }
1339 };
1340
1341 tbb::task_arena& arena() {
1342 static tbb::task_arena a;
1343 return a;
1344 }
1345
1346 template <typename F>
1347 void TestExecute(F &f) {
1348 Harness::StateTrackableCounters::reset();
1349 ReturnType r = arena().execute(f);
1350 r.check();
1351 }
1352
1353 template <typename F>
1354 void TestExecute(const F &f) {
1355 Harness::StateTrackableCounters::reset();
1356 ReturnType r = arena().execute(f);
1357 r.check();
1358 }
1359#if TBB_PREVIEW_TASK_ISOLATION
1360 template <typename F>
1361 void TestIsolate(F &f) {
1362 Harness::StateTrackableCounters::reset();
1363 ReturnType r = tbb::this_task_arena::isolate(f);
1364 r.check();
1365 }
1366
1367 template <typename F>
1368 void TestIsolate(const F &f) {
1369 Harness::StateTrackableCounters::reset();
1370 ReturnType r = tbb::this_task_arena::isolate(f);
1371 r.check();
1372 }
1373#endif
1374
1375 void Test() {
1376 TestExecute(Functor<ReturnType>());
1377 Functor<ReturnType> f1;
1378 TestExecute(f1);
1379 TestExecute(function<ReturnType>);
1380
1381 arena().execute(Functor<void>());
1382 Functor<void> f2;
1383 arena().execute(f2);
1384 arena().execute(function<void>);
1385#if TBB_PREVIEW_TASK_ISOLATION
1386 TestIsolate(Functor<ReturnType>());
1387 TestIsolate(f1);
1388 TestIsolate(function<ReturnType>);
1389 tbb::this_task_arena::isolate(Functor<void>());
1390 tbb::this_task_arena::isolate(f2);
1391 tbb::this_task_arena::isolate(function<void>);
1392#endif
1393 }
1394}
1395#endif /* __TBB_CPP11_DECLTYPE_PRESENT */
1396
1397void TestReturnValue() {
1398#if __TBB_CPP11_DECLTYPE_PRESENT && !__TBB_CPP11_DECLTYPE_OF_FUNCTION_RETURN_TYPE_BROKEN
1399 TestReturnValueNS::Test();
1400#endif
1401}
1402//--------------------------------------------------//
1403void TestConcurrentFunctionality(int min_thread_num = MinThread, int max_thread_num = MaxThread) {
1404 InitializeAndTerminate(max_thread_num);
1405 for (int p = min_thread_num; p <= max_thread_num; ++p) {
1406 REMARK("testing with %d threads\n", p);
1407 TestConcurrentArenas(p);
1408 TestMultipleMasters(p);
1409 TestArenaConcurrency(p);
1410 }
1411}
1412//--------------------------------------------------//
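// TestDefaultCreatedWorkersAmount checks that the implicit arena used by parallel_for is populated by exactly
// task_scheduler_init::default_num_threads() distinct threads (counted via the local_id TLS) and that no
// thread index exceeds the concurrency limit.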
1413struct DefaultCreatedWorkersAmountBody {
1414 int my_threadnum;
1415 DefaultCreatedWorkersAmountBody(int threadnum) : my_threadnum(threadnum) {}
1416 void operator()(int) const {
        ASSERT(my_threadnum == tbb::this_task_arena::max_concurrency(), "concurrency level is not equal to the specified threadnum");
        ASSERT(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "more threads were created than specified by default");
1419 local_id.local() = 1;
1420 Harness::Sleep(1);
1421 }
1422};
1423
1424struct NativeParallelForBody {
1425 int my_thread_num;
1426 int iterations;
1427 NativeParallelForBody(int thread_num, int multiplier = 100) : my_thread_num(thread_num), iterations(multiplier * thread_num) {}
1428 void operator()(int idx) const {
1429 ASSERT(idx == 0, "more than 1 thread is going to reset TLS");
1430 ResetTLS();
1431 tbb::parallel_for(0, iterations, DefaultCreatedWorkersAmountBody(my_thread_num), tbb::simple_partitioner());
        ASSERT(local_id.size() == size_t(my_thread_num), "the number of created threads is not equal to the default number");
1433 }
1434};
1435
1436void TestDefaultCreatedWorkersAmount() {
1437 NativeParallelFor(1, NativeParallelForBody(tbb::task_scheduler_init::default_num_threads()));
1438}
1439
1440void TestAbilityToCreateWorkers(int thread_num) {
1441 tbb::task_scheduler_init init_market_with_necessary_amount_plus_one(thread_num);
    // Check only a subset of the reserved-master slot counts:
    // 0 and 1 reserved threads are the important cases, but it is also useful
    // to collect some statistics for other counts without consuming
    // the whole test session time on checking each one
1446 TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72));
1447 TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14));
1448}
1449
1450void TestDefaultWorkersLimit() {
1451 TestDefaultCreatedWorkersAmount();
    // Shared RML might limit the number of workers even if larger limits are specified,
    // because default_concurrency==max_concurrency for shared RML
1454#ifndef RML_USE_WCRM
1455 TestAbilityToCreateWorkers(256);
1456#endif
1457}
1458//--------------------------------------------------//
1459
// MyObserver checks whether worker threads keep joining the same arena
1461struct MyObserver: public tbb::task_scheduler_observer {
1462 tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls;
1463 tbb::task_arena& my_arena;
1464 tbb::atomic<int>& my_failure_counter;
1465 tbb::atomic<int>& my_counter;
1466
1467 MyObserver(tbb::task_arena& a,
1468 tbb::enumerable_thread_specific<tbb::task_arena*>& tls,
1469 tbb::atomic<int>& failure_counter,
1470 tbb::atomic<int>& counter)
1471 : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a),
1472 my_failure_counter(failure_counter), my_counter(counter) {
1473 observe(true);
1474 }
1475 void on_scheduler_entry(bool worker) __TBB_override {
1476 if (worker) {
1477 ++my_counter;
1478 tbb::task_arena*& cur_arena = my_tls.local();
1479 if (cur_arena != 0 && cur_arena != &my_arena) {
1480 ++my_failure_counter;
1481 }
1482 cur_arena = &my_arena;
1483 }
1484 }
1485};
1486
1487struct MyLoopBody {
1488 Harness::SpinBarrier& m_barrier;
1489 MyLoopBody(Harness::SpinBarrier& b):m_barrier(b) { }
1490 void operator()(int) const {
1491 m_barrier.wait();
1492 }
1493};
1494
1495struct TaskForArenaExecute {
1496 Harness::SpinBarrier& m_barrier;
1497 TaskForArenaExecute(Harness::SpinBarrier& b):m_barrier(b) { }
1498 void operator()() const {
1499 tbb::parallel_for(0, tbb::this_task_arena::max_concurrency(),
1500 MyLoopBody(m_barrier), tbb::simple_partitioner()
1501 );
1502 }
1503};
1504
1505struct ExecuteParallelFor {
1506 int n_per_thread;
1507 int n_repetitions;
1508 std::vector<tbb::task_arena>& arenas;
1509 Harness::SpinBarrier& arena_barrier;
1510 Harness::SpinBarrier& master_barrier;
1511 ExecuteParallelFor(const int n_per_thread_, const int n_repetitions_,
1512 std::vector<tbb::task_arena>& arenas_,
1513 Harness::SpinBarrier& arena_barrier_, Harness::SpinBarrier& master_barrier_)
1514 : n_per_thread(n_per_thread_), n_repetitions(n_repetitions_), arenas(arenas_),
1515 arena_barrier(arena_barrier_), master_barrier(master_barrier_){ }
1516 void operator()(int i) const {
1517 for (int j = 0; j < n_repetitions; ++j) {
1518 arenas[i].execute(TaskForArenaExecute(arena_barrier));
1519 for(volatile int k = 0; k < n_per_thread; ++k){/* waiting until workers fall asleep */}
1520 master_barrier.wait();
1521 }
1522 }
1523};
1524
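// TestArenaWorkersMigration: several equal-sized arenas run parallel_for repeatedly while a global_control
// limit keeps workers slightly scarce; MyObserver counts how often a worker enters a different arena than the
// one it served last time, and the median of that failure ratio must stay small.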
// if n_threads == 0 then the default number of threads is used
1526void TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) {
1527 if (n_threads == 0) {
1528 n_threads = tbb::task_scheduler_init::default_num_threads();
1529 }
1530 const int max_n_arenas = 8;
1531 int n_arenas = 2;
1532 if(n_threads >= 16)
1533 n_arenas = max_n_arenas;
1534 else if (n_threads >= 8)
1535 n_arenas = 4;
1536 n_threads = n_arenas * (n_threads / n_arenas);
1537 const int n_per_thread = 10000000;
1538 const int n_repetitions = 100;
1539 const int n_outer_repetitions = 20;
    std::multiset<float> failure_ratio; // for median calculation
1541 tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads - (n_arenas - 1));
1542 Harness::SpinBarrier master_barrier(n_arenas);
1543 Harness::SpinBarrier arena_barrier(n_threads);
1544 MyObserver* observer[max_n_arenas];
1545 std::vector<tbb::task_arena> arenas(n_arenas);
1546 tbb::atomic<int> failure_counter;
1547 tbb::atomic<int> counter;
1548 tbb::enumerable_thread_specific<tbb::task_arena*> tls;
1549 for (int i = 0; i < n_arenas; ++i) {
1550 arenas[i].initialize(n_threads / n_arenas);
1551 observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter);
1552 }
1553 int ii = 0;
1554 for (; ii < n_outer_repetitions; ++ii) {
1555 failure_counter = 0;
1556 counter = 0;
1557 // Main code
1558 NativeParallelFor(n_arenas, ExecuteParallelFor(n_per_thread, n_repetitions,
1559 arenas, arena_barrier, master_barrier));
1560 // TODO: get rid of check below by setting ratio between n_threads and n_arenas
1561 failure_ratio.insert((counter != 0 ? float(failure_counter) / counter : 1.0f));
1562 tls.clear();
1563 // collect 3 elements in failure_ratio before calculating median
1564 if (ii > 1) {
1565 std::multiset<float>::iterator it = failure_ratio.begin();
1566 std::advance(it, failure_ratio.size() / 2);
1567 if (*it < 0.02)
1568 break;
1569 }
1570 }
1571 for (int i = 0; i < n_arenas; ++i) {
1572 delete observer[i];
1573 }
    // check whether the median is too big
1575 std::multiset<float>::iterator it = failure_ratio.begin();
1576 std::advance(it, failure_ratio.size() / 2);
1577 // TODO: decrease constants 0.05 and 0.3 by setting ratio between n_threads and n_arenas
1578 if (*it > 0.05) {
        REPORT("Warning: Too many cases of threads joining different arenas.\n");
        ASSERT(*it <= 0.3, "Too many cases of threads joining different arenas.\n");
1581 }
1582}
1583
1584void TestArenaWorkersMigration() {
1585 TestArenaWorkersMigrationWithNumThreads(4);
1586 if (tbb::task_scheduler_init::default_num_threads() != 4) {
1587 TestArenaWorkersMigrationWithNumThreads();
1588 }
1589}
1590
1591class CheckArenaNumThreads : public tbb::task {
1592public:
1593 static Harness::SpinBarrier m_barrier;
1594
1595 CheckArenaNumThreads(int nt, int rm): num_threads(nt), reserved_for_masters(rm) {
1596 m_barrier.initialize(2);
1597 }
1598
1599 tbb::task* execute() __TBB_override {
1600 ASSERT( tbb::this_task_arena::max_concurrency() == num_threads, "Wrong concurrency of current arena" );
1601 ASSERT( tbb::this_task_arena::current_thread_index() >= reserved_for_masters, "Thread shouldn't attach to master's slots" );
1602 m_barrier.wait();
1603 return NULL;
1604 }
1605
1606private:
1607 const int num_threads;
1608 const int reserved_for_masters;
1609};
1610
1611Harness::SpinBarrier CheckArenaNumThreads::m_barrier;
1612
1613class EnqueueTaskIntoTaskArena
1614{
1615public:
1616 EnqueueTaskIntoTaskArena(tbb::task& t, tbb::task_arena& a) : my_task(t), my_arena(a) {}
1617 void operator() ()
1618 {
1619 tbb::task::enqueue(my_task, my_arena);
1620 }
1621private:
1622 tbb::task& my_task;
1623 tbb::task_arena& my_arena;
1624};
1625
1626void TestTaskEnqueueInArena()
1627{
1628 int pp[8]={3, 4, 5, 7, 8, 11, 13, 17};
1629 for(int i = 0; i < 8; ++i)
1630 {
1631 int p = pp[i];
1632 int reserved_for_masters = p - 1;
1633 tbb::task_arena a(p, reserved_for_masters);
1634 a.initialize();
1635 //Enqueue on master thread
1636 {
1637 CheckArenaNumThreads& t = *new( tbb::task::allocate_root() ) CheckArenaNumThreads(p, reserved_for_masters);
1638 tbb::task::enqueue(t, a);
1639 CheckArenaNumThreads::m_barrier.wait();
1640 a.debug_wait_until_empty();
1641 }
1642 //Enqueue on thread without scheduler
1643 {
1644 CheckArenaNumThreads& t = *new( tbb::task::allocate_root() ) CheckArenaNumThreads(p, reserved_for_masters);
1645 tbb::tbb_thread thr(EnqueueTaskIntoTaskArena(t, a));
1646 CheckArenaNumThreads::m_barrier.wait();
1647 a.debug_wait_until_empty();
1648 thr.join();
1649 }
1650 }
1651}
1652
1653//--------------------------------------------------//
1654
1655int TestMain() {
1656#if __TBB_TASK_ISOLATION
1657 TestIsolatedExecute();
1658#endif /* __TBB_TASK_ISOLATION */
1659 TestSmallStackSize();
1660 TestDefaultWorkersLimit();
1661 // The test uses up to MaxThread workers (in arenas with no master thread),
1662 // so the runtime should be initialized appropriately.
1663 tbb::task_scheduler_init init_market_p_plus_one(MaxThread + 1);
1664 TestConcurrentFunctionality();
1665 TestArenaEntryConsistency();
1666 TestAttach(MaxThread);
1667 TestConstantFunctorRequirement();
1668 TestDelegatedSpawnWait();
1669 TestMultipleWaits();
1670 TestMoveSemantics();
1671 TestReturnValue();
1672 TestArenaWorkersMigration();
1673 TestTaskEnqueueInArena();
1674 return Harness::Done;
1675}
1676