/*
    Copyright (c) 2005-2019 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#define __TBB_EXTRA_DEBUG 1 // enables task_arena::debug_wait_until_empty() used throughout this test

#include <stdexcept>
#include <cstdlib>
#include <cstdio>
#include <vector>
#include <set>

#include "harness_fp.h"

#if __TBB_TASK_ISOLATION
// Whitebox stuff for TestIsolatedExecuteNS::ContinuationTest().
// TODO: Consider better approach instead of the whitebox approach.
#define private public
#include "tbb/task.h"
#undef private
#endif /* __TBB_TASK_ISOLATION */

#include "tbb/task_arena.h"
#include "tbb/atomic.h"
#include "tbb/task_scheduler_observer.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/parallel_for.h"
#include "tbb/blocked_range.h"
#include "tbb/enumerable_thread_specific.h"

#include "harness_assert.h"
#include "harness.h"
#include "harness_barrier.h"

#include "tbb/tbb_thread.h"

#if _MSC_VER
// plays around __TBB_NO_IMPLICIT_LINKAGE. __TBB_LIB_NAME should be defined (in makefiles)
#pragma comment(lib, __TBB_STRING(__TBB_LIB_NAME))
#endif

#include "tbb/global_control.h"
//--------------------------------------------------//
// Test that task_arena::initialize and task_arena::terminate work when doing nothing else.
/* maxthread is treated as the biggest possible concurrency level. */
void InitializeAndTerminate( int maxthread ) {
    __TBB_TRY {
        for( int i=0; i<200; ++i ) {
            switch( i&3 ) {
                // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions.
                // Explicit initialization can either keep the original values or change those.
                // Arena termination can be explicit or implicit (in the destructor).
                // TODO: extend with concurrency level checks if such a method is added.
                default: {
                    tbb::task_arena arena( std::rand() % maxthread + 1 );
                    ASSERT(!arena.is_active(), "arena should not be active until initialized");
                    arena.initialize();
                    ASSERT(arena.is_active(), NULL);
                    arena.terminate();
                    ASSERT(!arena.is_active(), "arena should not be active; it was terminated");
                    break;
                }
                case 0: {
                    tbb::task_arena arena( 1 );
                    ASSERT(!arena.is_active(), "arena should not be active until initialized");
                    arena.initialize( std::rand() % maxthread + 1 ); // change the parameters
                    ASSERT(arena.is_active(), NULL);
                    break;
                }
                case 1: {
                    tbb::task_arena arena( tbb::task_arena::automatic );
                    ASSERT(!arena.is_active(), NULL);
                    arena.initialize();
                    ASSERT(arena.is_active(), NULL);
                    break;
                }
                case 2: {
                    tbb::task_arena arena;
                    ASSERT(!arena.is_active(), "arena should not be active until initialized");
                    arena.initialize( std::rand() % maxthread + 1 );
                    ASSERT(arena.is_active(), NULL);
                    arena.terminate();
                    ASSERT(!arena.is_active(), "arena should not be active; it was terminated");
                    break;
                }
            }
        }
    } __TBB_CATCH( std::runtime_error& error ) {
#if TBB_USE_EXCEPTIONS
        REPORT("ERROR: %s\n", error.what());
#endif /* TBB_USE_EXCEPTIONS */
    }
}
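
// For reference, the lifecycle exercised above in user terms (illustrative sketch only,
// not part of the test):
//
//     tbb::task_arena arena( 4 ); // constructed inactive: no threads or resources yet
//     arena.initialize();         // becomes active explicitly (first use would do it lazily)
//     arena.terminate();          // inactive again; the destructor would also terminate it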

//--------------------------------------------------//
// Definitions used in more than one test
typedef tbb::blocked_range<int> Range;

// slot_id value: -1 is reserved by current_thread_index() (returned when a thread is not in any arena), -2 is set in on_scheduler_exit() below
static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3);

void ResetTLS() {
    local_id.clear();
    old_id.clear();
    slot_id.clear();
}

class ArenaObserver : public tbb::task_scheduler_observer {
    int myId;               // unique observer/arena id within a test
    int myMaxConcurrency;   // concurrency of the associated arena
    int myNumReservedSlots; // reserved slots in the associated arena
    void on_scheduler_entry( bool is_worker ) __TBB_override {
        int current_index = tbb::this_task_arena::current_thread_index();
        REMARK("a %s #%p is entering arena %d from %d on slot %d\n", is_worker?"worker":"master",
               &local_id.local(), myId, local_id.local(), current_index);
        ASSERT(current_index<(myMaxConcurrency>1?myMaxConcurrency:2), NULL);
        if(is_worker) ASSERT(current_index>=myNumReservedSlots, NULL);

        ASSERT(!old_id.local(), "double call to on_scheduler_entry");
        old_id.local() = local_id.local();
        ASSERT(old_id.local() != myId, "double entry to the same arena");
        local_id.local() = myId;
        slot_id.local() = current_index;
    }
    void on_scheduler_exit( bool is_worker ) __TBB_override {
        REMARK("a %s #%p is leaving arena %d to %d\n", is_worker?"worker":"master",
               &local_id.local(), myId, old_id.local());
        ASSERT(local_id.local() == myId, "nesting of arenas is broken");
        ASSERT(slot_id.local() == tbb::this_task_arena::current_thread_index(), NULL);
        //!deprecated, remove when tbb::task_arena::current_thread_index is removed.
        ASSERT(slot_id.local() == tbb::task_arena::current_thread_index(), NULL);
        slot_id.local() = -2;
        local_id.local() = old_id.local();
        old_id.local() = 0;
    }
public:
    ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id)
        : tbb::task_scheduler_observer(a)
        , myId(id)
        , myMaxConcurrency(maxConcurrency)
        , myNumReservedSlots(numReservedSlots) {
        ASSERT(myId, NULL);
        observe(true);
    }
    ~ArenaObserver () {
        ASSERT(!old_id.local(), "inconsistent observer state");
    }
};
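
// For reference, a minimal arena observer outside this test machinery would look like
// the following (illustrative sketch only):
//
//     struct MyObserver : tbb::task_scheduler_observer {
//         MyObserver( tbb::task_arena &a ) : tbb::task_scheduler_observer(a) { observe(true); }
//         void on_scheduler_entry( bool is_worker ) __TBB_override { /* a thread joined the arena */ }
//         void on_scheduler_exit( bool is_worker ) __TBB_override { /* a thread left the arena */ }
//     };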

struct IndexTrackingBody { // Must be used together with ArenaObserver
    void operator() ( const Range& ) const {
        ASSERT(slot_id.local() == tbb::this_task_arena::current_thread_index(), NULL);
        //!deprecated, remove when tbb::task_arena::current_thread_index is removed.
        ASSERT(slot_id.local() == tbb::task_arena::current_thread_index(), NULL);
        for ( volatile int i = 0; i < 50000; ++i )
            ;
    }
};

struct AsynchronousWork : NoAssign {
    Harness::SpinBarrier &my_barrier;
    bool my_is_blocking;
    AsynchronousWork(Harness::SpinBarrier &a_barrier, bool blocking = true)
        : my_barrier(a_barrier), my_is_blocking(blocking) {}
    void operator()() const {
        ASSERT(local_id.local() != 0, "not in explicit arena");
        tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner(), *tbb::task::self().group());
        if(my_is_blocking) my_barrier.timed_wait(10); // must be asynchronous to master thread
        else my_barrier.signal_nowait();
    }
};

//--------------------------------------------------//
// Test that task_arenas might be created and used from multiple application threads.
// Also tests arena observers. The parameter idx is the index of the application thread running this test.
void TestConcurrentArenasFunc(int idx) {
    // A regression test for observer activation order:
    // check that arena observer can be activated before local observer
    struct LocalObserver : public tbb::task_scheduler_observer {
        LocalObserver() : tbb::task_scheduler_observer(/*local=*/true) { observe(true); }
    };
    tbb::task_arena a1;
    a1.initialize(1,0);
    ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test
    ASSERT(o1.is_observing(), "Arena observer has not been activated");
    LocalObserver lo;
    ASSERT(lo.is_observing(), "Local observer has not been activated");
    tbb::task_arena a2(2,1);
    ArenaObserver o2(a2, 2, 1, idx*2+2);
    ASSERT(o2.is_observing(), "Arena observer has not been activated");
    Harness::SpinBarrier barrier(2);
    AsynchronousWork work(barrier);
    a1.enqueue(work); // put async work
    barrier.timed_wait(10);
    a2.enqueue(work); // another work
    a2.execute(work); // my_barrier.timed_wait(10) inside
    a1.debug_wait_until_empty();
    a2.debug_wait_until_empty();
}

void TestConcurrentArenas(int p) {
    ResetTLS();
    NativeParallelFor( p, &TestConcurrentArenasFunc );
}
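
// A note on the two submission paths exercised above (illustrative comment; 'a' and
// 'work' stand for any arena and functor):
//
//     a.enqueue( work ); // asynchronous: returns immediately, the arena runs work later
//     a.execute( work ); // synchronous: the caller joins the arena (or blocks) until
//                        // work() finishes; its exceptions propagate to the caller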

//--------------------------------------------------//
// Test multiple application threads working with a single arena at the same time.
class MultipleMastersPart1 : NoAssign {
    tbb::task_arena &my_a;
    Harness::SpinBarrier &my_b1, &my_b2;
public:
    MultipleMastersPart1( tbb::task_arena &a, Harness::SpinBarrier &b1, Harness::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    void operator()(int) const {
        my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false));
        my_b1.timed_wait(10);
        // A regression test for bugs 1954 & 1971
        my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false));
    }
};

class MultipleMastersPart2 : NoAssign {
    tbb::task_arena &my_a;
    Harness::SpinBarrier &my_b;
public:
    MultipleMastersPart2( tbb::task_arena &a, Harness::SpinBarrier &b) : my_a(a), my_b(b) {}
    void operator()(int) const {
        my_a.execute(AsynchronousWork(my_b, /*blocking=*/false));
    }
};

class MultipleMastersPart3 : NoAssign {
    tbb::task_arena &my_a;
    Harness::SpinBarrier &my_b;

    struct Runner : NoAssign {
        tbb::task* const a_task;
        Runner(tbb::task* const t) : a_task(t) {}
        void operator()() const {
            for ( volatile int i = 0; i < 10000; ++i )
                ;
            a_task->decrement_ref_count();
        }
    };

    struct Waiter : NoAssign {
        tbb::task* const a_task;
        Waiter(tbb::task* const t) : a_task(t) {}
        void operator()() const {
            a_task->wait_for_all();
        }
    };

public:
    MultipleMastersPart3(tbb::task_arena &a, Harness::SpinBarrier &b)
        : my_a(a), my_b(b) {}
    void operator()(int idx) const {
        tbb::empty_task* root_task = new(tbb::task::allocate_root()) tbb::empty_task;
        my_b.timed_wait(10); // increases chances for task_arena initialization contention
        for( int i=0; i<100; ++i) {
            root_task->set_ref_count(2);
            my_a.enqueue(Runner(root_task));
            my_a.execute(Waiter(root_task));
        }
        tbb::task::destroy(*root_task);
        REMARK("Master #%d: job completed, wait for others\n", idx);
        my_b.timed_wait(10);
    }
};

class MultipleMastersPart4 : NoAssign {
    tbb::task_arena &my_a;
    Harness::SpinBarrier &my_b;
    tbb::task_group_context *my_ag;

    struct Getter : NoAssign {
        tbb::task_group_context *& my_g;
        Getter(tbb::task_group_context *&a_g) : my_g(a_g) {}
        void operator()() const {
            my_g = tbb::task::self().group();
        }
    };
    struct Checker : NoAssign {
        tbb::task_group_context *my_g;
        Checker(tbb::task_group_context *a_g) : my_g(a_g) {}
        void operator()() const {
            ASSERT(my_g == tbb::task::self().group(), NULL);
            tbb::task *t = new( tbb::task::allocate_root() ) tbb::empty_task;
            ASSERT(my_g == t->group(), NULL);
            tbb::task::destroy(*t);
        }
    };
    struct NestedChecker : NoAssign {
        const MultipleMastersPart4 &my_body;
        NestedChecker(const MultipleMastersPart4 &b) : my_body(b) {}
        void operator()() const {
            tbb::task_group_context *nested_g = tbb::task::self().group();
            ASSERT(my_body.my_ag != nested_g, NULL);
            tbb::task *t = new( tbb::task::allocate_root() ) tbb::empty_task;
            ASSERT(nested_g == t->group(), NULL);
            tbb::task::destroy(*t);
            my_body.my_a.enqueue(Checker(my_body.my_ag));
        }
    };
public:
    MultipleMastersPart4( tbb::task_arena &a, Harness::SpinBarrier &b) : my_a(a), my_b(b) {
        my_a.execute(Getter(my_ag));
    }
    // NativeParallelFor's functor
    void operator()(int) const {
        my_a.execute(*this);
    }
    // Arena's functor
    void operator()() const {
        Checker check(my_ag);
        check();
        tbb::task_arena nested(1,1);
        nested.execute(NestedChecker(*this)); // change arena
        tbb::parallel_for(Range(0,1),*this); // change group context only
        my_b.timed_wait(10);
        my_a.execute(check);
        check();
    }
    // parallel_for's functor
    void operator()(const Range &) const {
        NestedChecker(*this)();
        my_a.execute(Checker(my_ag)); // restore arena context
    }
};

void TestMultipleMasters(int p) {
    {
        REMARK("multiple masters, part 1\n");
        ResetTLS();
        tbb::task_arena a(1,0);
        a.initialize();
        ArenaObserver o(a, 1, 0, 1);
        Harness::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier
        NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) );
        barrier2.timed_wait(10);
        a.debug_wait_until_empty();
    } {
        REMARK("multiple masters, part 2\n");
        ResetTLS();
        tbb::task_arena a(2,1);
        ArenaObserver o(a, 2, 1, 2);
        Harness::SpinBarrier barrier(p+2);
        a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981
        NativeParallelFor( p, MultipleMastersPart2(a, barrier) );
        barrier.timed_wait(10);
        a.debug_wait_until_empty();
    } {
        // Regression test for the bug 1981 part 2 (task_arena::execute() with wait_for_all for an enqueued task)
        REMARK("multiple masters, part 3: wait_for_all() in execute()\n");
        tbb::task_arena a(p,1);
        Harness::SpinBarrier barrier(p+1); // for masters to avoid endless waiting at least in some runs
        // "Oversubscribe" the arena by 1 master thread
        NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) );
        a.debug_wait_until_empty();
    } {
        int c = p%3? (p%2? p : 2) : 3;
        REMARK("multiple masters, part 4: contexts, arena(%d)\n", c);
        ResetTLS();
        tbb::task_arena a(c, 1);
        ArenaObserver o(a, c, 1, c);
        Harness::SpinBarrier barrier(c);
        MultipleMastersPart4 test(a, barrier);
        NativeParallelFor(p, test);
        a.debug_wait_until_empty();
    }
}

//--------------------------------------------------//
// TestArenaEntryConsistency checks that all ways of entering an arena via execute() - direct,
// delegated to a worker or to another master, and nested - consistently observe the FP settings
// captured by the arena and propagate exceptions back to the caller.
#include <sstream>
#if TBB_USE_EXCEPTIONS
#include <stdexcept>
#include "tbb/tbb_exception.h"
#endif

struct TestArenaEntryBody : FPModeContext {
    tbb::atomic<int> &my_stage; // each execute increases it
    std::stringstream my_id;
    bool is_caught, is_expected;
    enum { arenaFPMode = 1 };

    TestArenaEntryBody(tbb::atomic<int> &s, int idx, int i) // init thread-specific instance
        : FPModeContext(idx+i)
        , my_stage(s)
        , is_caught(false)
        , is_expected( (idx&(1<<i)) != 0 && (TBB_USE_EXCEPTIONS) != 0 )
    {
        my_id << idx << ':' << i << '@';
    }
    void operator()() { // inside task_arena::execute()
        // synchronize with other stages
        int stage = my_stage++;
        int slot = tbb::this_task_arena::current_thread_index();
        ASSERT(slot >= 0 && slot <= 1, "master or the only worker");
        // wait until the third stage is delegated and then starts on slot 0
        while(my_stage < 2+slot) __TBB_Yield();
        // deduce its entry type and put it into my_id; this helps to find the source of a problem
        my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()?
                  "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master")
                  : stage == 3? "nested_same_ctx" : "nested_alien_ctx");
        REMARK("running %s\n", my_id.str().c_str());
        AssertFPMode(arenaFPMode);
        if(is_expected)
            __TBB_THROW(std::logic_error(my_id.str()));
        // no code can be put here since exceptions can be thrown
    }
    void on_exception(const char *e) { // outside arena, in catch block
        is_caught = true;
        REMARK("caught %s\n", e);
        ASSERT(my_id.str() == e, NULL);
        assertFPMode();
    }
    void after_execute() { // outside arena and catch block
        REMARK("completing %s\n", my_id.str().c_str());
        ASSERT(is_caught == is_expected, NULL);
        assertFPMode();
    }
};

class ForEachArenaEntryBody : NoAssign {
    tbb::task_arena &my_a;      // expected task_arena(2,1)
    tbb::atomic<int> &my_stage; // each execute increases it
    int my_idx;

public:
    ForEachArenaEntryBody(tbb::task_arena &a, tbb::atomic<int> &c)
        : my_a(a), my_stage(c), my_idx(0) {}

    void test(int idx) {
        my_idx = idx;
        my_stage = 0;
        NativeParallelFor(3, *this); // test cross-arena calls
        ASSERT(my_stage == 3, NULL);
        my_a.execute(*this); // test nested calls
        ASSERT(my_stage == 5, NULL);
    }

    // task_arena functor for nested tests
    void operator()() const {
        test_arena_entry(3); // in current task group context
        tbb::parallel_for(4, 5, *this); // in different context
    }

    // NativeParallelFor & parallel_for functor
    void operator()(int i) const {
        test_arena_entry(i);
    }

private:
    void test_arena_entry(int i) const {
        TestArenaEntryBody scoped_functor(my_stage, my_idx, i);
        __TBB_TRY {
            my_a.execute(scoped_functor);
        }
#if TBB_USE_EXCEPTIONS
        catch(tbb::captured_exception &e) {
            scoped_functor.on_exception(e.what());
            ASSERT_WARNING(TBB_USE_CAPTURED_EXCEPTION, "Caught captured_exception while expecting exact one");
        } catch(std::logic_error &e) {
            scoped_functor.on_exception(e.what());
            ASSERT(!TBB_USE_CAPTURED_EXCEPTION, "Caught exception of wrong type");
        } catch(...) { ASSERT(false, "Unexpected exception type"); }
#endif //TBB_USE_EXCEPTIONS
        scoped_functor.after_execute();
    }
};

void TestArenaEntryConsistency() {
    REMARK("test arena entry consistency\n");

    tbb::task_arena a(2, 1);
    tbb::atomic<int> c;
    ForEachArenaEntryBody body(a, c);

    FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode);
    a.initialize(); // capture FP settings to arena
    fp_scope.setNextFPMode();

    for (int i = 0; i < 100; i++) // covers no fewer than 32 = 2^5 combinations of the 5 entry types
        body.test(i);
}
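
// A note on the FP manipulation above (illustrative summary, relying on harness_fp.h):
// task_arena::initialize() captures the FP control state of the calling thread (e.g.,
// rounding mode and denormal handling), and each task_arena::execute() then runs its
// functor under the captured state, restoring the caller's own FP state on return.
// FPModeContext is a test helper that sets and later reverts a distinguishable FP mode.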

//--------------------------------------------------//
// Test that the requested degree of concurrency for task_arena is achieved in various conditions
class TestArenaConcurrencyBody : NoAssign {
    tbb::task_arena &my_a;
    int my_max_concurrency;
    int my_reserved_slots;
    Harness::SpinBarrier *my_barrier;
    Harness::SpinBarrier *my_worker_barrier;
public:
    TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, Harness::SpinBarrier *b = NULL, Harness::SpinBarrier *wb = NULL )
        : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {}
    // NativeParallelFor's functor
    void operator()( int ) const {
        ASSERT( local_id.local() == 0, "TLS was not cleaned?" );
        local_id.local() = 1;
        my_a.execute( *this );
    }
    // Arena's functor
    void operator()() const {
        ASSERT( tbb::task_arena::current_thread_index() == tbb::this_task_arena::current_thread_index(), NULL );
        int idx = tbb::this_task_arena::current_thread_index();
        ASSERT( idx < (my_max_concurrency > 1 ? my_max_concurrency : 2), NULL );
        ASSERT( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency(), NULL );
        int max_arena_concurrency = tbb::this_task_arena::max_concurrency();
        ASSERT( max_arena_concurrency == my_max_concurrency, NULL );
        if ( my_worker_barrier ) {
            if ( local_id.local() == 1 ) {
                // Master thread in a reserved slot
                ASSERT( idx < my_reserved_slots, "Masters are supposed to use only reserved slots in this test" );
            } else {
                // Worker thread
                ASSERT( idx >= my_reserved_slots, NULL );
                my_worker_barrier->timed_wait( 10 );
            }
        } else if ( my_barrier )
            ASSERT( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" );
        if ( my_barrier ) my_barrier->timed_wait( 10 );
        else Harness::Sleep( 10 );
    }
};

void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
    for (; reserved <= p; reserved += step) {
        REMARK("TestArenaConcurrency: %d slots, %d reserved\n", p, reserved);
        tbb::task_arena a( p, reserved );
        { // Check concurrency with worker & reserved master threads.
            ResetTLS();
            Harness::SpinBarrier b( p );
            Harness::SpinBarrier wb( p-reserved );
            TestArenaConcurrencyBody test( a, p, reserved, &b, &wb );
            for ( int i = reserved; i < p; ++i )
                a.enqueue( test );
            if ( reserved==1 )
                test( 0 ); // calls execute()
            else
                NativeParallelFor( reserved, test );
            a.debug_wait_until_empty();
        } { // Check if multiple masters alone can achieve maximum concurrency.
            ResetTLS();
            Harness::SpinBarrier b( p );
            NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) );
            a.debug_wait_until_empty();
        } { // Check oversubscription by masters.
            ResetTLS();
            NativeParallelFor( 2*p, TestArenaConcurrencyBody( a, p, reserved ) );
            a.debug_wait_until_empty();
        }
    }
}

//--------------------------------------------------//
// Test creation/initialization of a task_arena that references an existing arena (aka attach).
// This part of the test uses the knowledge of task_arena internals
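
// For reference, the attach functionality under test looks like this from user code
// (illustrative sketch only):
//
//     tbb::task_scheduler_init init( 8 );              // implicit arena with 8 slots
//     tbb::task_arena a( tbb::task_arena::attach() );  // refers to that same arena
//     // ... or, equivalently, on an existing task_arena object:
//     tbb::task_arena b;
//     b.initialize( tbb::task_arena::attach() );       // attaches if the thread is in an arena,
//                                                      // otherwise falls back to default parameters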

typedef tbb::interface7::internal::task_arena_base task_arena_internals;

struct TaskArenaValidator : public task_arena_internals {
    int my_slot_at_construction;
    TaskArenaValidator( const task_arena_internals& other )
        : task_arena_internals(other) /*copies the internal state of other*/ {
        my_slot_at_construction = tbb::this_task_arena::current_thread_index();
    }
    // Inspect the internal state
    int concurrency() { return my_max_concurrency; }
    int reserved_for_masters() { return (int)my_master_slots; }

    // This method should be called in task_arena::execute() for a captured arena
    // by the same thread that created the validator.
    void operator()() {
        ASSERT( tbb::this_task_arena::current_thread_index()==my_slot_at_construction,
                "Current thread index has changed since the validator construction" );
        //!deprecated
        ASSERT( tbb::task_arena::current_thread_index()==my_slot_at_construction,
                "Current thread index has changed since the validator construction" );
    }
};

void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated,
                            int expect_concurrency, int expect_masters ) {
    ASSERT( arena.is_active()==expect_activated, "Unexpected activation state" );
    if( arena.is_active() ) {
        TaskArenaValidator validator( arena );
        ASSERT( validator.concurrency()==expect_concurrency, "Unexpected arena size" );
        ASSERT( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" );
        if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) {
            ASSERT( tbb::task_arena::current_thread_index() >= 0 &&
                    tbb::this_task_arena::current_thread_index() >= 0, NULL);
            // for threads already in arena, check that the thread index remains the same
            arena.execute( validator );
        } else { // not_initialized
            // Test the deprecated method
            ASSERT( tbb::task_arena::current_thread_index()==-1, NULL);
        }

        // Ideally, there should be a check for having the same internal arena object,
        // but that object is not easily accessible for implicit arenas.
    }
}

struct TestAttachBody : NoAssign {
    mutable int my_idx; // safe to modify and use within the NativeParallelFor functor
    const int maxthread;
    TestAttachBody( int max_thr ) : maxthread(max_thr) {}

    // The functor body for NativeParallelFor
    void operator()( int idx ) const {
        my_idx = idx;
        int default_threads = tbb::task_scheduler_init::default_num_threads();

        tbb::task_arena arena = tbb::task_arena( tbb::task_arena::attach() );
        ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to

        { // attach to an arena created via task_scheduler_init
            tbb::task_scheduler_init init( idx+1 );

            tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
            ValidateAttachedArena( arena2, true, idx+1, 1 );

            arena.initialize( tbb::task_arena::attach() );
        }
        ValidateAttachedArena( arena, true, idx+1, 1 );

        arena.terminate();
        ValidateAttachedArena( arena, false, -1, -1 );

        // Check default behavior when attach cannot succeed
        switch (idx%2) {
        case 0:
            { // construct as attached, then initialize
                tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
                ValidateAttachedArena( arena2, false, -1, -1 );
                arena2.initialize(); // must be initialized with default parameters
                ValidateAttachedArena( arena2, true, default_threads, 1 );
            }
            break;
        case 1:
            { // default-construct, then initialize as attached
                tbb::task_arena arena2;
                ValidateAttachedArena( arena2, false, -1, -1 );
                arena2.initialize( tbb::task_arena::attach() ); // must use default parameters
                ValidateAttachedArena( arena2, true, default_threads, 1 );
            }
            break;
        } // switch

        // attach to an auto-initialized arena
        tbb::empty_task& tsk = *new (tbb::task::allocate_root()) tbb::empty_task;
        tbb::task::spawn_root_and_wait(tsk);
        tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
        ValidateAttachedArena( arena2, true, default_threads, 1 );

        // attach to another task_arena
        arena.initialize( maxthread, min(maxthread,idx) );
        arena.execute( *this );
    }

    // The functor body for task_arena::execute above
    void operator()() const {
        tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
        ValidateAttachedArena( arena2, true, maxthread, min(maxthread,my_idx) );
    }

    // The functor body for tbb::parallel_for
    void operator()( const Range& r ) const {
        for( int i = r.begin(); i<r.end(); ++i ) {
            tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
            ValidateAttachedArena( arena2, true, maxthread+1, 1 ); // +1 to match initialization in TestMain
        }
    }
};

void TestAttach( int maxthread ) {
    REMARK( "Testing attached task_arenas\n" );
    // Externally concurrent, but no concurrency within a thread
    NativeParallelFor( max(maxthread,4), TestAttachBody( maxthread ) );
    // Concurrent within the current arena; may also serve as a stress test
    tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) );
}

//--------------------------------------------------//
// Test that task_arena::enqueue does not tolerate a non-const functor.
// TODO: can it be reworked as SFINAE-based compile-time check?
struct TestFunctor {
    void operator()() { ASSERT( false, "Non-const operator called" ); }
    void operator()() const { /* library requires this overload only */ }
};

void TestConstantFunctorRequirement() {
    tbb::task_arena a;
    TestFunctor tf;
    a.enqueue( tf );
#if __TBB_TASK_PRIORITY
    a.enqueue( tf, tbb::priority_normal );
#endif
}
//--------------------------------------------------//
#if __TBB_TASK_ISOLATION
#include "tbb/parallel_reduce.h"
#include "tbb/parallel_invoke.h"
// Test this_task_arena::isolate
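// The contract under test, sketched in user terms (illustrative only; assumes a C++11
// compiler for the lambda, and 'inner_body' and N are placeholders):
//
//     tbb::this_task_arena::isolate( []{
//         tbb::parallel_for( 0, N, inner_body );
//         // while waiting here, the thread may take only tasks spawned inside this
//         // isolated region; it never steals unrelated outer-level tasks
//     } );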
namespace TestIsolatedExecuteNS {
    //--------------------------------------------------//
    template <typename NestedPartitioner>
    class NestedParFor : NoAssign {
    public:
        NestedParFor() {}
        void operator()() const {
            NestedPartitioner p;
            tbb::parallel_for( 0, 10, Harness::DummyBody( 10 ), p );
        }
    };

    template <typename NestedPartitioner>
    class ParForBody : NoAssign {
        bool myOuterIsolation;
        tbb::enumerable_thread_specific<int> &myEts;
        tbb::atomic<bool> &myIsStolen;
    public:
        ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, tbb::atomic<bool> &is_stolen )
            : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {}
        void operator()( int ) const {
            int &e = myEts.local();
            if ( e++ > 0 ) myIsStolen = true;
            if ( myOuterIsolation )
                NestedParFor<NestedPartitioner>()();
            else
                tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() );
            --e;
        }
    };

    template <typename OuterPartitioner, typename NestedPartitioner>
    class OuterParFor : NoAssign {
        bool myOuterIsolation;
        tbb::atomic<bool> &myIsStolen;
    public:
        OuterParFor( bool outer_isolation, tbb::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {}
        void operator()() const {
            tbb::enumerable_thread_specific<int> ets( 0 );
            OuterPartitioner p;
            tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p );
        }
    };

    template <typename OuterPartitioner, typename NestedPartitioner>
    void TwoLoopsTest( bool outer_isolation ) {
        tbb::atomic<bool> is_stolen;
        is_stolen = false;
        const int max_repeats = 100;
        if ( outer_isolation ) {
            for ( int i = 0; i <= max_repeats; ++i ) {
                tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) );
                if ( is_stolen ) break;
            }
            ASSERT_WARNING( is_stolen, "isolate() should not block stealing on nested levels without isolation" );
        } else {
            for ( int i = 0; i <= max_repeats; ++i ) {
                OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )();
            }
            ASSERT( !is_stolen, "isolate() on nested levels should prevent stealing from outer levels" );
        }
    }

    void TwoLoopsTest( bool outer_isolation ) {
        TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation );
    }

    void TwoLoopsTest() {
        TwoLoopsTest( true );
        TwoLoopsTest( false );
    }
    //--------------------------------------------------//
    class HeavyMixTestBody : NoAssign {
        tbb::enumerable_thread_specific<Harness::FastRandom>& myRandom;
        tbb::enumerable_thread_specific<int>& myIsolatedLevel;
        int myNestedLevel;
        bool myHighPriority;

        template <typename Partitioner, typename Body>
        static void RunTwoBodies( Harness::FastRandom& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = NULL ) {
            if ( rnd.get() % 2 )
                if ( ctx )
                    tbb::parallel_for( 0, 2, body, p, *ctx );
                else
                    tbb::parallel_for( 0, 2, body, p );
            else
                tbb::parallel_invoke( body, body );
        }

        template <typename Partitioner>
        class IsolatedBody : NoAssign {
            const HeavyMixTestBody &myHeavyMixTestBody;
            Partitioner &myPartitioner;
        public:
            IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner )
                : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {}
            void operator()() const {
                RunTwoBodies( myHeavyMixTestBody.myRandom.local(),
                    HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel,
                        myHeavyMixTestBody.myNestedLevel + 1, myHeavyMixTestBody.myHighPriority ),
                    myPartitioner );
            }
        };

        template <typename Partitioner>
        void RunNextLevel( Harness::FastRandom& rnd, int &isolated_level ) const {
            Partitioner p;
            switch ( rnd.get() % 3 ) {
            case 0: {
                // No features
                tbb::task_group_context ctx;
                if ( myHighPriority )
                    ctx.set_priority( tbb::priority_high );
                RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1, myHighPriority), p, &ctx );
                break;
            }
            case 1: {
                // High priority
                tbb::task_group_context ctx;
                ctx.set_priority( tbb::priority_high );
                RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1, true), p, &ctx );
                break;
            }
            case 2: {
                // Isolation
                int previous_isolation = isolated_level;
                isolated_level = myNestedLevel;
                tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) );
                isolated_level = previous_isolation;
                break;
            }
            }
        }
    public:
        HeavyMixTestBody( tbb::enumerable_thread_specific<Harness::FastRandom>& random,
                tbb::enumerable_thread_specific<int>& isolated_level, int nested_level, bool high_priority )
            : myRandom( random ), myIsolatedLevel( isolated_level )
            , myNestedLevel( nested_level ), myHighPriority( high_priority ) {}
        void operator()() const {
            int &isolated_level = myIsolatedLevel.local();
            ASSERT( myNestedLevel > isolated_level, "The outer-level task should not be stolen on isolated level" );
            if ( myNestedLevel == 20 )
                return;
            Harness::FastRandom &rnd = myRandom.local();
            if ( rnd.get() % 2 == 1 ) {
                RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level );
            } else {
                RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level );
            }
        }
        void operator()(int) const {
            this->operator()();
        }
    };

    struct RandomInitializer {
        Harness::FastRandom operator()() {
            return Harness::FastRandom( tbb::this_task_arena::current_thread_index() );
        }
    };

    void HeavyMixTest() {
        tbb::task_scheduler_init init( tbb::task_scheduler_init::default_num_threads() < 3 ? 3 : tbb::task_scheduler_init::automatic );
        RandomInitializer init_random;
        tbb::enumerable_thread_specific<Harness::FastRandom> random( init_random );
        tbb::enumerable_thread_specific<int> isolated_level( 0 );
        for ( int i = 0; i < 5; ++i ) {
            HeavyMixTestBody b( random, isolated_level, 1, false );
            b( 0 );
            REMARK( "." );
        }
    }
    //--------------------------------------------------//
    struct ContinuationTestReduceBody : NoAssign {
        tbb::internal::isolation_tag myIsolation;
        ContinuationTestReduceBody( tbb::internal::isolation_tag isolation ) : myIsolation( isolation ) {}
        ContinuationTestReduceBody( ContinuationTestReduceBody& b, tbb::split ) : myIsolation( b.myIsolation ) {}
        void operator()( tbb::blocked_range<int> ) {}
        void join( ContinuationTestReduceBody& ) {
            tbb::internal::isolation_tag isolation = tbb::task::self().prefix().isolation;
            ASSERT( isolation == myIsolation, "The continuations should preserve children's isolation" );
        }
    };
    struct ContinuationTestIsolated {
        void operator()() const {
            ContinuationTestReduceBody b( tbb::task::self().prefix().isolation );
            tbb::parallel_deterministic_reduce( tbb::blocked_range<int>( 0, 100 ), b );
        }
    };
    struct ContinuationTestParForBody : NoAssign {
        tbb::enumerable_thread_specific<int> &myEts;
    public:
        ContinuationTestParForBody( tbb::enumerable_thread_specific<int> &ets ) : myEts( ets ) {}
        void operator()( int ) const {
            int &e = myEts.local();
            ++e;
            ASSERT( e==1, "The task is stolen on isolated level" );
            tbb::this_task_arena::isolate( ContinuationTestIsolated() );
            --e;
        }
    };
    void ContinuationTest() {
        for ( int i = 0; i < 5; ++i ) {
            tbb::enumerable_thread_specific<int> myEts;
            tbb::parallel_for( 0, 100, ContinuationTestParForBody( myEts ), tbb::simple_partitioner() );
        }
    }
    //--------------------------------------------------//
#if TBB_USE_EXCEPTIONS
    struct MyException {};
    struct IsolatedBodyThrowsException {
        void operator()() const {
            __TBB_THROW( MyException() );
        }
    };
    struct ExceptionTestBody : NoAssign {
        tbb::enumerable_thread_specific<int>& myEts;
        tbb::atomic<bool>& myIsStolen;
        ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, tbb::atomic<bool>& is_stolen )
            : myEts( ets ), myIsStolen( is_stolen ) {}
        void operator()( int i ) const {
            try {
                tbb::this_task_arena::isolate( IsolatedBodyThrowsException() );
                ASSERT( false, "The exception has been lost" );
            }
            catch ( MyException ) {}
            catch ( ... ) {
                ASSERT( false, "Unexpected exception" );
            }
            // Check that nested algorithms can steal outer-level tasks
            int &e = myEts.local();
            if ( e++ > 0 ) myIsStolen = true;
            // work imbalance increases chances for stealing
            tbb::parallel_for( 0, 10+i, Harness::DummyBody( 100 ) );
            --e;
        }
    };

#endif /* TBB_USE_EXCEPTIONS */
    void ExceptionTest() {
#if TBB_USE_EXCEPTIONS
        tbb::enumerable_thread_specific<int> ets;
        tbb::atomic<bool> is_stolen;
        is_stolen = false;
        for ( int i = 0; i<10; ++i ) {
            tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) );
            if ( is_stolen ) break;
        }
        ASSERT( is_stolen, "isolate should not affect non-isolated work" );
#endif /* TBB_USE_EXCEPTIONS */
    }

    struct NonConstBody {
        unsigned int state;
        void operator()() {
            state ^= ~0u;
        }
    };

    void TestNonConstBody() {
        NonConstBody body;
        body.state = 0x6c97d5ed;
        tbb::this_task_arena::isolate(body);
        ASSERT(body.state == 0x93682a12, "The wrong state");
    }

    class TestEnqueueTask : public tbb::task {
        bool enqueued;
        tbb::enumerable_thread_specific<bool>& executed;
        tbb::atomic<int>& completed;
    public:
        static const int N = 100;

        TestEnqueueTask(bool enq, tbb::enumerable_thread_specific<bool>& exe, tbb::atomic<int>& c)
            : enqueued(enq), executed(exe), completed(c) {}
        tbb::task* execute() __TBB_override {
            if (enqueued) {
                executed.local() = true;
                ++completed;
                __TBB_Yield();
            } else {
                parent()->add_ref_count(N);
                for (int i = 0; i < N; ++i)
                    tbb::task::enqueue(*new (parent()->allocate_child()) TestEnqueueTask(true, executed, completed));
            }
            return NULL;
        }
    };

    class TestEnqueueIsolateBody : NoCopy {
        tbb::enumerable_thread_specific<bool>& executed;
        tbb::atomic<int>& completed;
    public:
        TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, tbb::atomic<int>& c)
            : executed(exe), completed(c) {}
        void operator()() {
            tbb::task::spawn_root_and_wait(*new (tbb::task::allocate_root()) TestEnqueueTask(false, executed, completed));
        }
    };

    void TestEnqueue() {
        tbb::enumerable_thread_specific<bool> executed(false);
        tbb::atomic<int> completed;

        // Check that the main thread can process enqueued tasks.
        completed = 0;
        TestEnqueueIsolateBody b1(executed, completed);
        b1();
        if (!executed.local())
            REPORT("Warning: none of the enqueued tasks was executed by the main thread.\n");

        executed.local() = false;
        completed = 0;
        const int N = 100;
        // Create enqueued tasks out of isolation.
        for (int i = 0; i < N; ++i)
            tbb::task::enqueue(*new (tbb::task::allocate_root()) TestEnqueueTask(true, executed, completed));
        TestEnqueueIsolateBody b2(executed, completed);
        tbb::this_task_arena::isolate(b2);
        ASSERT(executed.local() == false, "An enqueued task was executed within isolate.");

        while (completed < TestEnqueueTask::N + N) __TBB_Yield();
    }
}

void TestIsolatedExecute() {
    REMARK("TestIsolatedExecute");
    // At least 3 threads (owner + 2 thieves) are required to reproduce a situation where the owner steals an
    // outer-level task on a nested level. If we have only one thief, it will execute the outer-level tasks first,
    // and the owner will get no chance to steal them.
    int num_threads = min( tbb::task_scheduler_init::default_num_threads(), 3 );
    {
        // Too many threads require too much work to reproduce the stealing from the outer level.
        tbb::task_scheduler_init init( max(num_threads, 7) );
        REMARK("."); TestIsolatedExecuteNS::TwoLoopsTest();
        REMARK("."); TestIsolatedExecuteNS::HeavyMixTest();
        REMARK("."); TestIsolatedExecuteNS::ContinuationTest();
        REMARK("."); TestIsolatedExecuteNS::ExceptionTest();
    }
    tbb::task_scheduler_init init(num_threads);
    REMARK("."); TestIsolatedExecuteNS::HeavyMixTest();
    REMARK("."); TestIsolatedExecuteNS::ContinuationTest();
    REMARK("."); TestIsolatedExecuteNS::TestNonConstBody();
    REMARK("."); TestIsolatedExecuteNS::TestEnqueue();
    REMARK("\rTestIsolatedExecute: done \n");
}
#endif /* __TBB_TASK_ISOLATION */
//--------------------------------------------------//
//--------------------------------------------------//

class TestDelegatedSpawnWaitBody : NoAssign {
    tbb::task_arena &my_a;
    Harness::SpinBarrier &my_b1, &my_b2;

    struct Spawner : NoAssign {
        tbb::task* const a_task;
        Spawner(tbb::task* const t) : a_task(t) {}
        void operator()() const {
            tbb::task::spawn( *new(a_task->allocate_child()) tbb::empty_task );
        }
    };

    struct Waiter : NoAssign {
        tbb::task* const a_task;
        Waiter(tbb::task* const t) : a_task(t) {}
        void operator()() const {
            a_task->wait_for_all();
        }
    };

public:
    TestDelegatedSpawnWaitBody( tbb::task_arena &a, Harness::SpinBarrier &b1, Harness::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    // NativeParallelFor's functor
    void operator()(int idx) const {
        if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent test hang)
            for( int i=0; i<2; ++i ) my_a.enqueue(*this); // tasks to sync with workers
            tbb::empty_task* root_task = new(tbb::task::allocate_root()) tbb::empty_task;
            root_task->set_ref_count(100001);
            my_b1.timed_wait(10); // sync with the workers
            for( int i=0; i<100000; ++i ) {
                my_a.execute(Spawner(root_task));
            }
            my_a.execute(Waiter(root_task));
            tbb::task::destroy(*root_task);
        }
        my_b2.timed_wait(10); // sync both threads
    }
    // Arena's functor
    void operator()() const {
        my_b1.timed_wait(10); // sync with the arena master
    }
};

void TestDelegatedSpawnWait() {
    // Regression test for a bug with missed wakeup notification from a delegated task
    REMARK( "Testing delegated spawn & wait\n" );
    tbb::task_arena a(2,0);
    a.initialize();
    Harness::SpinBarrier barrier1(3), barrier2(2);
    NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) );
    a.debug_wait_until_empty();
}

class TestMultipleWaitsArenaWait : NoAssign {
public:
    TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, tbb::task** waiters, tbb::atomic<int>& processed )
        : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ) {}
    void operator()() const {
        ++my_processed;
        // Wait for all tasks
        if ( my_idx < my_num_tasks )
            my_waiters[my_idx]->wait_for_all();
        // Signal waiting tasks
        if ( my_idx >= my_bunch_size )
            my_waiters[my_idx-my_bunch_size]->decrement_ref_count();
    }
private:
    int my_idx;
    int my_bunch_size;
    int my_num_tasks;
    tbb::task** my_waiters;
    tbb::atomic<int>& my_processed;
};

class TestMultipleWaitsThreadBody : NoAssign {
public:
    TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, tbb::task** waiters, tbb::atomic<int>& processed )
        : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ) {}
    void operator()( int idx ) const {
        my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed ) );
        --my_processed;
    }
private:
    int my_bunch_size;
    int my_num_tasks;
    tbb::task_arena& my_arena;
    tbb::task** my_waiters;
    tbb::atomic<int>& my_processed;
};

#include "tbb/tbb_thread.h"

void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) {
    tbb::task_arena a( num_threads );
    const int num_tasks = (num_bunches-1)*bunch_size;
    tbb::task** tasks = new tbb::task*[num_tasks];
    for ( int i = 0; i<num_tasks; ++i )
        tasks[i] = new (tbb::task::allocate_root()) tbb::empty_task();
    tbb::atomic<int> processed;
    processed = 0;
    for ( int repeats = 0; repeats<10; ++repeats ) {
        int idx = 0;
        for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) {
            // Sync with the previous bunch of tasks to prevent "false" nested dependencies (when a nested task waits for an outer task).
            while ( processed < bunch*bunch_size ) __TBB_Yield();
            // Run the bunch of threads/tasks that depend on the next bunch of threads/tasks.
            for ( int i = 0; i<bunch_size; ++i ) {
                tasks[idx]->set_ref_count( 2 );
                tbb::tbb_thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, tasks, processed ), idx++ ).detach();
            }
        }
        // No sync because the threads of the last bunch do not call wait_for_all.
        // Run the last bunch of threads.
        for ( int i = 0; i<bunch_size; ++i )
            tbb::tbb_thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, tasks, processed ), idx++ ).detach();
        while ( processed ) __TBB_Yield();
    }
    for ( int i = 0; i<num_tasks; ++i )
        tbb::task::destroy( *tasks[i] );
    delete[] tasks;
}

void TestMultipleWaits() {
    REMARK( "Testing multiple waits\n" );
    // Limit the number of threads to prevent heavy oversubscription.
    const int max_threads = min( 16, tbb::task_scheduler_init::default_num_threads() );

    Harness::FastRandom rnd(1234);
    for ( int threads = 1; threads <= max_threads; threads += max( threads/2, 1 ) ) {
        for ( int i = 0; i<3; ++i ) {
            const int num_bunches = 3 + rnd.get()%3;
            const int bunch_size = max_threads + rnd.get()%max_threads;
            TestMultipleWaits( threads, num_bunches, bunch_size );
        }
    }
}
//--------------------------------------------------//
#include "tbb/global_control.h"

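// For reference (illustrative sketch): global_control adjusts scheduler-wide parameters
// for its lifetime, and active_value() reads the current effective setting.
//
//     {
//         tbb::global_control c( tbb::global_control::thread_stack_size, 1<<20 ); // 1 MB stacks
//         std::size_t cur = tbb::global_control::active_value( tbb::global_control::thread_stack_size );
//         // ... worker threads created while 'c' is alive use the adjusted stack size
//     }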
void TestSmallStackSize() {
    tbb::task_scheduler_init init(tbb::task_scheduler_init::automatic,
        tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2);
    // The test produces a warning (not an error) if it fails. So it is run many times
    // to make the log annoying enough that the warning is treated as an error.
    for (int i = 0; i < 100; ++i) {
        tbb::task_arena a;
        a.initialize();
    }
}
//--------------------------------------------------//
#if __TBB_CPP11_RVALUE_REF_PRESENT
namespace TestMoveSemanticsNS {
    struct TestFunctor {
        void operator()() const {};
    };

    struct MoveOnlyFunctor : MoveOnly, TestFunctor {
        MoveOnlyFunctor() : MoveOnly() {};
        MoveOnlyFunctor(MoveOnlyFunctor&& other) : MoveOnly(std::move(other)) {};
    };

    struct MovePreferableFunctor : Movable, TestFunctor {
        MovePreferableFunctor() : Movable() {};
        MovePreferableFunctor(MovePreferableFunctor&& other) : Movable( std::move(other) ) {};
        MovePreferableFunctor(const MovePreferableFunctor& other) : Movable(other) {};
    };

    struct NoMoveNoCopyFunctor : NoCopy, TestFunctor {
        NoMoveNoCopyFunctor() : NoCopy() {};
        // the move ctor is disallowed, just like the copy ctor inherited from NoCopy
    private:
        NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&);
    };


    void TestFunctors() {
        tbb::task_arena ta;
        MovePreferableFunctor mpf;
        // execute() doesn't have any copies or moves of arguments inside the impl
        ta.execute( NoMoveNoCopyFunctor() );

        ta.enqueue( MoveOnlyFunctor() );
        ta.enqueue( mpf );
        ASSERT(mpf.alive, "object was moved when it was passed by lvalue");
        mpf.Reset();
        ta.enqueue( std::move(mpf) );
        ASSERT(!mpf.alive, "object was copied when it was passed by rvalue");
        mpf.Reset();
#if __TBB_TASK_PRIORITY
        ta.enqueue( MoveOnlyFunctor(), tbb::priority_normal );
        ta.enqueue( mpf, tbb::priority_normal );
        ASSERT(mpf.alive, "object was moved when it was passed by lvalue");
        mpf.Reset();
        ta.enqueue( std::move(mpf), tbb::priority_normal );
        ASSERT(!mpf.alive, "object was copied when it was passed by rvalue");
        mpf.Reset();
#endif
    }
}
#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */

void TestMoveSemantics() {
#if __TBB_CPP11_RVALUE_REF_PRESENT
    TestMoveSemanticsNS::TestFunctors();
#else
    REPORT("Known issue: move support tests are skipped.\n");
#endif
}
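
// In short, the forwarding contract exercised above (illustrative comment):
//
//     ta.enqueue( mpf );             // lvalue argument: the functor is copied, mpf stays alive
//     ta.enqueue( std::move(mpf) );  // rvalue argument: the functor is moved from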
| 1282 | //--------------------------------------------------// |
| 1283 | #if __TBB_CPP11_DECLTYPE_PRESENT && !__TBB_CPP11_DECLTYPE_OF_FUNCTION_RETURN_TYPE_BROKEN |
| 1284 | #include <vector> |
| 1285 | #include "harness_state_trackable.h" |
| 1286 | |
| 1287 | namespace TestReturnValueNS { |
| 1288 | struct noDefaultTag {}; |
| 1289 | class ReturnType : public Harness::StateTrackable<> { |
| 1290 | static const int SIZE = 42; |
| 1291 | std::vector<int> data; |
| 1292 | public: |
| 1293 | ReturnType(noDefaultTag) : Harness::StateTrackable<>(0) {} |
| 1294 | #if !__TBB_IMPLICIT_MOVE_PRESENT |
| 1295 | // Define copy constructor to test that it is never called |
| 1296 | ReturnType(const ReturnType& r) : Harness::StateTrackable<>(r), data(r.data) {} |
| 1297 | ReturnType(ReturnType&& r) : Harness::StateTrackable<>(std::move(r)), data(std::move(r.data)) {} |
| 1298 | #endif |
| 1299 | void fill() { |
| 1300 | for (int i = 0; i < SIZE; ++i) |
| 1301 | data.push_back(i); |
| 1302 | } |
| 1303 | void check() { |
| 1304 | ASSERT(data.size() == unsigned(SIZE), NULL); |
| 1305 | for (int i = 0; i < SIZE; ++i) |
| 1306 | ASSERT(data[i] == i, NULL); |
| 1307 | Harness::StateTrackableCounters::counters_t& cnts = Harness::StateTrackableCounters::counters; |
| 1308 | ASSERT((cnts[Harness::StateTrackableBase::DefaultInitialized] == 0), NULL); |
| 1309 | ASSERT(cnts[Harness::StateTrackableBase::DirectInitialized] == 1, NULL); |
| 1310 | std::size_t copied = cnts[Harness::StateTrackableBase::CopyInitialized]; |
| 1311 | std::size_t moved = cnts[Harness::StateTrackableBase::MoveInitialized]; |
| 1312 | ASSERT(cnts[Harness::StateTrackableBase::Destroyed] == copied + moved, NULL); |
            // The number of copies/moves should not exceed 3: one for the function return,
            // one to store into internal storage, and one to retrieve from internal storage.
| 1315 | #if __TBB_CPP11_RVALUE_REF_PRESENT |
            ASSERT(copied == 0 && moved <= 3, NULL);
| 1317 | #else |
| 1318 | ASSERT(copied <= 3 && moved == 0, NULL); |
| 1319 | #endif |
| 1320 | } |
| 1321 | }; |
| 1322 | |
| 1323 | template <typename R> |
| 1324 | R function() { |
| 1325 | noDefaultTag tag; |
| 1326 | R r(tag); |
| 1327 | r.fill(); |
| 1328 | return r; |
| 1329 | } |
| 1330 | |
| 1331 | template <> |
| 1332 | void function<void>() {} |
| 1333 | |
| 1334 | template <typename R> |
| 1335 | struct Functor { |
| 1336 | R operator()() const { |
| 1337 | return function<R>(); |
| 1338 | } |
| 1339 | }; |
| 1340 | |
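    // Shared arena, lazily constructed on first use (function-local static).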
| 1341 | tbb::task_arena& arena() { |
| 1342 | static tbb::task_arena a; |
| 1343 | return a; |
| 1344 | } |
| 1345 | |
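    // The F& and const F& overloads below exercise both execute() signatures.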
| 1346 | template <typename F> |
| 1347 | void TestExecute(F &f) { |
| 1348 | Harness::StateTrackableCounters::reset(); |
| 1349 | ReturnType r = arena().execute(f); |
| 1350 | r.check(); |
| 1351 | } |
| 1352 | |
| 1353 | template <typename F> |
| 1354 | void TestExecute(const F &f) { |
| 1355 | Harness::StateTrackableCounters::reset(); |
| 1356 | ReturnType r = arena().execute(f); |
| 1357 | r.check(); |
| 1358 | } |
| 1359 | #if TBB_PREVIEW_TASK_ISOLATION |
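    // The same return-value checks, applied to this_task_arena::isolate().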
| 1360 | template <typename F> |
| 1361 | void TestIsolate(F &f) { |
| 1362 | Harness::StateTrackableCounters::reset(); |
| 1363 | ReturnType r = tbb::this_task_arena::isolate(f); |
| 1364 | r.check(); |
| 1365 | } |
| 1366 | |
| 1367 | template <typename F> |
| 1368 | void TestIsolate(const F &f) { |
| 1369 | Harness::StateTrackableCounters::reset(); |
| 1370 | ReturnType r = tbb::this_task_arena::isolate(f); |
| 1371 | r.check(); |
| 1372 | } |
| 1373 | #endif |
| 1374 | |
| 1375 | void Test() { |
| 1376 | TestExecute(Functor<ReturnType>()); |
| 1377 | Functor<ReturnType> f1; |
| 1378 | TestExecute(f1); |
| 1379 | TestExecute(function<ReturnType>); |
| 1380 | |
| 1381 | arena().execute(Functor<void>()); |
| 1382 | Functor<void> f2; |
| 1383 | arena().execute(f2); |
| 1384 | arena().execute(function<void>); |
| 1385 | #if TBB_PREVIEW_TASK_ISOLATION |
| 1386 | TestIsolate(Functor<ReturnType>()); |
| 1387 | TestIsolate(f1); |
| 1388 | TestIsolate(function<ReturnType>); |
| 1389 | tbb::this_task_arena::isolate(Functor<void>()); |
| 1390 | tbb::this_task_arena::isolate(f2); |
| 1391 | tbb::this_task_arena::isolate(function<void>); |
| 1392 | #endif |
| 1393 | } |
| 1394 | } |
#endif /* __TBB_CPP11_DECLTYPE_PRESENT && !__TBB_CPP11_DECLTYPE_OF_FUNCTION_RETURN_TYPE_BROKEN */
| 1396 | |
| 1397 | void TestReturnValue() { |
| 1398 | #if __TBB_CPP11_DECLTYPE_PRESENT && !__TBB_CPP11_DECLTYPE_OF_FUNCTION_RETURN_TYPE_BROKEN |
| 1399 | TestReturnValueNS::Test(); |
| 1400 | #endif |
| 1401 | } |
| 1402 | //--------------------------------------------------// |
| 1403 | void TestConcurrentFunctionality(int min_thread_num = MinThread, int max_thread_num = MaxThread) { |
| 1404 | InitializeAndTerminate(max_thread_num); |
| 1405 | for (int p = min_thread_num; p <= max_thread_num; ++p) { |
| 1406 | REMARK("testing with %d threads\n" , p); |
| 1407 | TestConcurrentArenas(p); |
| 1408 | TestMultipleMasters(p); |
| 1409 | TestArenaConcurrency(p); |
| 1410 | } |
| 1411 | } |
| 1412 | //--------------------------------------------------// |
| 1413 | struct DefaultCreatedWorkersAmountBody { |
| 1414 | int my_threadnum; |
| 1415 | DefaultCreatedWorkersAmountBody(int threadnum) : my_threadnum(threadnum) {} |
| 1416 | void operator()(int) const { |
        ASSERT(my_threadnum == tbb::this_task_arena::max_concurrency(), "concurrency level is not equal to the specified thread number" );
        ASSERT(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "number of created threads exceeds the specified default" );
| 1419 | local_id.local() = 1; |
| 1420 | Harness::Sleep(1); |
| 1421 | } |
| 1422 | }; |
| 1423 | |
| 1424 | struct NativeParallelForBody { |
| 1425 | int my_thread_num; |
| 1426 | int iterations; |
| 1427 | NativeParallelForBody(int thread_num, int multiplier = 100) : my_thread_num(thread_num), iterations(multiplier * thread_num) {} |
| 1428 | void operator()(int idx) const { |
        ASSERT(idx == 0, "more than one thread attempted to reset the TLS" );
| 1430 | ResetTLS(); |
| 1431 | tbb::parallel_for(0, iterations, DefaultCreatedWorkersAmountBody(my_thread_num), tbb::simple_partitioner()); |
        ASSERT(local_id.size() == size_t(my_thread_num), "number of created threads is not equal to the default number" );
| 1433 | } |
| 1434 | }; |
| 1435 | |
| 1436 | void TestDefaultCreatedWorkersAmount() { |
| 1437 | NativeParallelFor(1, NativeParallelForBody(tbb::task_scheduler_init::default_num_threads())); |
| 1438 | } |
| 1439 | |
| 1440 | void TestAbilityToCreateWorkers(int thread_num) { |
| 1441 | tbb::task_scheduler_init init_market_with_necessary_amount_plus_one(thread_num); |
    // Check only a subset of possible reserved-for-masters counts:
    // 0 and 1 reserved threads are the important cases, but a couple of other
    // counts are sampled to collect statistics without spending the whole
    // test session checking every value.
| 1446 | TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72)); |
| 1447 | TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14)); |
| 1448 | } |
| 1449 | |
| 1450 | void TestDefaultWorkersLimit() { |
| 1451 | TestDefaultCreatedWorkersAmount(); |
    // Shared RML may limit the number of workers regardless of the requested
    // limits, because default_concurrency == max_concurrency for shared RML.
| 1454 | #ifndef RML_USE_WCRM |
| 1455 | TestAbilityToCreateWorkers(256); |
| 1456 | #endif |
| 1457 | } |
| 1458 | //--------------------------------------------------// |
| 1459 | |
// MyObserver checks whether worker threads keep joining the same arena or migrate between arenas.
| 1461 | struct MyObserver: public tbb::task_scheduler_observer { |
| 1462 | tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls; |
| 1463 | tbb::task_arena& my_arena; |
| 1464 | tbb::atomic<int>& my_failure_counter; |
| 1465 | tbb::atomic<int>& my_counter; |
| 1466 | |
| 1467 | MyObserver(tbb::task_arena& a, |
| 1468 | tbb::enumerable_thread_specific<tbb::task_arena*>& tls, |
| 1469 | tbb::atomic<int>& failure_counter, |
| 1470 | tbb::atomic<int>& counter) |
| 1471 | : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a), |
| 1472 | my_failure_counter(failure_counter), my_counter(counter) { |
| 1473 | observe(true); |
| 1474 | } |
| 1475 | void on_scheduler_entry(bool worker) __TBB_override { |
| 1476 | if (worker) { |
| 1477 | ++my_counter; |
| 1478 | tbb::task_arena*& cur_arena = my_tls.local(); |
| 1479 | if (cur_arena != 0 && cur_arena != &my_arena) { |
| 1480 | ++my_failure_counter; |
| 1481 | } |
| 1482 | cur_arena = &my_arena; |
| 1483 | } |
| 1484 | } |
| 1485 | }; |
| 1486 | |
| 1487 | struct MyLoopBody { |
| 1488 | Harness::SpinBarrier& m_barrier; |
| 1489 | MyLoopBody(Harness::SpinBarrier& b):m_barrier(b) { } |
| 1490 | void operator()(int) const { |
| 1491 | m_barrier.wait(); |
| 1492 | } |
| 1493 | }; |
| 1494 | |
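// Occupies every slot of the enclosing arena with a parallel_for; the barrier is
// shared across arenas, so no thread proceeds until all arenas are fully populated.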
| 1495 | struct TaskForArenaExecute { |
| 1496 | Harness::SpinBarrier& m_barrier; |
| 1497 | TaskForArenaExecute(Harness::SpinBarrier& b):m_barrier(b) { } |
| 1498 | void operator()() const { |
| 1499 | tbb::parallel_for(0, tbb::this_task_arena::max_concurrency(), |
| 1500 | MyLoopBody(m_barrier), tbb::simple_partitioner() |
| 1501 | ); |
| 1502 | } |
| 1503 | }; |
| 1504 | |
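// Each native thread drives its own arena: it executes the barrier task, then
// busy-waits to let the workers fall asleep before the next repetition.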
| 1505 | struct ExecuteParallelFor { |
| 1506 | int n_per_thread; |
| 1507 | int n_repetitions; |
| 1508 | std::vector<tbb::task_arena>& arenas; |
| 1509 | Harness::SpinBarrier& arena_barrier; |
| 1510 | Harness::SpinBarrier& master_barrier; |
| 1511 | ExecuteParallelFor(const int n_per_thread_, const int n_repetitions_, |
| 1512 | std::vector<tbb::task_arena>& arenas_, |
| 1513 | Harness::SpinBarrier& arena_barrier_, Harness::SpinBarrier& master_barrier_) |
| 1514 | : n_per_thread(n_per_thread_), n_repetitions(n_repetitions_), arenas(arenas_), |
| 1515 | arena_barrier(arena_barrier_), master_barrier(master_barrier_){ } |
| 1516 | void operator()(int i) const { |
| 1517 | for (int j = 0; j < n_repetitions; ++j) { |
| 1518 | arenas[i].execute(TaskForArenaExecute(arena_barrier)); |
            for (volatile int k = 0; k < n_per_thread; ++k) { /* busy-wait until the workers fall asleep */ }
| 1520 | master_barrier.wait(); |
| 1521 | } |
| 1522 | } |
| 1523 | }; |
| 1524 | |
// If n_threads == 0, the default number of threads is used.
| 1526 | void TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) { |
| 1527 | if (n_threads == 0) { |
| 1528 | n_threads = tbb::task_scheduler_init::default_num_threads(); |
| 1529 | } |
| 1530 | const int max_n_arenas = 8; |
| 1531 | int n_arenas = 2; |
    if (n_threads >= 16)
| 1533 | n_arenas = max_n_arenas; |
| 1534 | else if (n_threads >= 8) |
| 1535 | n_arenas = 4; |
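    // Round n_threads down to a multiple of n_arenas so each arena gets an equal share.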
| 1536 | n_threads = n_arenas * (n_threads / n_arenas); |
| 1537 | const int n_per_thread = 10000000; |
| 1538 | const int n_repetitions = 100; |
| 1539 | const int n_outer_repetitions = 20; |
    std::multiset<float> failure_ratio; // for median calculation
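    // Allow fewer workers than the combined arena demand, presumably so that
    // workers have to migrate between arenas during the test.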
| 1541 | tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads - (n_arenas - 1)); |
| 1542 | Harness::SpinBarrier master_barrier(n_arenas); |
| 1543 | Harness::SpinBarrier arena_barrier(n_threads); |
| 1544 | MyObserver* observer[max_n_arenas]; |
| 1545 | std::vector<tbb::task_arena> arenas(n_arenas); |
| 1546 | tbb::atomic<int> failure_counter; |
| 1547 | tbb::atomic<int> counter; |
| 1548 | tbb::enumerable_thread_specific<tbb::task_arena*> tls; |
| 1549 | for (int i = 0; i < n_arenas; ++i) { |
| 1550 | arenas[i].initialize(n_threads / n_arenas); |
| 1551 | observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter); |
| 1552 | } |
| 1553 | int ii = 0; |
| 1554 | for (; ii < n_outer_repetitions; ++ii) { |
| 1555 | failure_counter = 0; |
| 1556 | counter = 0; |
| 1557 | // Main code |
| 1558 | NativeParallelFor(n_arenas, ExecuteParallelFor(n_per_thread, n_repetitions, |
| 1559 | arenas, arena_barrier, master_barrier)); |
| 1560 | // TODO: get rid of check below by setting ratio between n_threads and n_arenas |
| 1561 | failure_ratio.insert((counter != 0 ? float(failure_counter) / counter : 1.0f)); |
| 1562 | tls.clear(); |
        // Collect at least 3 samples in failure_ratio before checking the median.
| 1564 | if (ii > 1) { |
| 1565 | std::multiset<float>::iterator it = failure_ratio.begin(); |
| 1566 | std::advance(it, failure_ratio.size() / 2); |
| 1567 | if (*it < 0.02) |
| 1568 | break; |
| 1569 | } |
| 1570 | } |
| 1571 | for (int i = 0; i < n_arenas; ++i) { |
| 1572 | delete observer[i]; |
| 1573 | } |
    // Check whether the median failure ratio is too large.
| 1575 | std::multiset<float>::iterator it = failure_ratio.begin(); |
| 1576 | std::advance(it, failure_ratio.size() / 2); |
| 1577 | // TODO: decrease constants 0.05 and 0.3 by setting ratio between n_threads and n_arenas |
| 1578 | if (*it > 0.05) { |
| 1579 | REPORT("Warning: So many cases when threads join to different arenas.\n" ); |
| 1580 | ASSERT(*it <= 0.3, "A lot of cases when threads join to different arenas.\n" ); |
| 1581 | } |
| 1582 | } |
| 1583 | |
| 1584 | void TestArenaWorkersMigration() { |
| 1585 | TestArenaWorkersMigrationWithNumThreads(4); |
| 1586 | if (tbb::task_scheduler_init::default_num_threads() != 4) { |
| 1587 | TestArenaWorkersMigrationWithNumThreads(); |
| 1588 | } |
| 1589 | } |
| 1590 | |
| 1591 | class CheckArenaNumThreads : public tbb::task { |
| 1592 | public: |
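    // Synchronizes the worker that executes the task with the thread that enqueued it.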
| 1593 | static Harness::SpinBarrier m_barrier; |
| 1594 | |
| 1595 | CheckArenaNumThreads(int nt, int rm): num_threads(nt), reserved_for_masters(rm) { |
| 1596 | m_barrier.initialize(2); |
| 1597 | } |
| 1598 | |
| 1599 | tbb::task* execute() __TBB_override { |
| 1600 | ASSERT( tbb::this_task_arena::max_concurrency() == num_threads, "Wrong concurrency of current arena" ); |
        ASSERT( tbb::this_task_arena::current_thread_index() >= reserved_for_masters, "Worker thread should not occupy a slot reserved for masters" );
| 1602 | m_barrier.wait(); |
| 1603 | return NULL; |
| 1604 | } |
| 1605 | |
| 1606 | private: |
| 1607 | const int num_threads; |
| 1608 | const int reserved_for_masters; |
| 1609 | }; |
| 1610 | |
| 1611 | Harness::SpinBarrier CheckArenaNumThreads::m_barrier; |
| 1612 | |
| 1613 | class EnqueueTaskIntoTaskArena |
| 1614 | { |
| 1615 | public: |
| 1616 | EnqueueTaskIntoTaskArena(tbb::task& t, tbb::task_arena& a) : my_task(t), my_arena(a) {} |
| 1617 | void operator() () |
| 1618 | { |
| 1619 | tbb::task::enqueue(my_task, my_arena); |
| 1620 | } |
| 1621 | private: |
| 1622 | tbb::task& my_task; |
| 1623 | tbb::task_arena& my_arena; |
| 1624 | }; |
| 1625 | |
| 1626 | void TestTaskEnqueueInArena() |
| 1627 | { |
    int pp[8] = {3, 4, 5, 7, 8, 11, 13, 17};
| 1629 | for(int i = 0; i < 8; ++i) |
| 1630 | { |
| 1631 | int p = pp[i]; |
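        // Reserve all but one slot for masters so exactly one worker slot remains.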
| 1632 | int reserved_for_masters = p - 1; |
| 1633 | tbb::task_arena a(p, reserved_for_masters); |
| 1634 | a.initialize(); |
        // Enqueue from the master thread
| 1636 | { |
| 1637 | CheckArenaNumThreads& t = *new( tbb::task::allocate_root() ) CheckArenaNumThreads(p, reserved_for_masters); |
| 1638 | tbb::task::enqueue(t, a); |
| 1639 | CheckArenaNumThreads::m_barrier.wait(); |
| 1640 | a.debug_wait_until_empty(); |
| 1641 | } |
        // Enqueue from a thread that has no scheduler
| 1643 | { |
| 1644 | CheckArenaNumThreads& t = *new( tbb::task::allocate_root() ) CheckArenaNumThreads(p, reserved_for_masters); |
| 1645 | tbb::tbb_thread thr(EnqueueTaskIntoTaskArena(t, a)); |
| 1646 | CheckArenaNumThreads::m_barrier.wait(); |
| 1647 | a.debug_wait_until_empty(); |
| 1648 | thr.join(); |
| 1649 | } |
| 1650 | } |
| 1651 | } |
| 1652 | |
| 1653 | //--------------------------------------------------// |
| 1654 | |
| 1655 | int TestMain() { |
| 1656 | #if __TBB_TASK_ISOLATION |
| 1657 | TestIsolatedExecute(); |
| 1658 | #endif /* __TBB_TASK_ISOLATION */ |
| 1659 | TestSmallStackSize(); |
| 1660 | TestDefaultWorkersLimit(); |
| 1661 | // The test uses up to MaxThread workers (in arenas with no master thread), |
| 1662 | // so the runtime should be initialized appropriately. |
| 1663 | tbb::task_scheduler_init init_market_p_plus_one(MaxThread + 1); |
| 1664 | TestConcurrentFunctionality(); |
| 1665 | TestArenaEntryConsistency(); |
| 1666 | TestAttach(MaxThread); |
| 1667 | TestConstantFunctorRequirement(); |
| 1668 | TestDelegatedSpawnWait(); |
| 1669 | TestMultipleWaits(); |
| 1670 | TestMoveSemantics(); |
| 1671 | TestReturnValue(); |
| 1672 | TestArenaWorkersMigration(); |
| 1673 | TestTaskEnqueueInArena(); |
| 1674 | return Harness::Done; |
| 1675 | } |
| 1676 | |