/*
    Copyright (c) 2005-2019 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#define TBB_PREVIEW_WAITING_FOR_WORKERS 1
#include "tbb/global_control.h"
#include "harness.h"
#include "tbb/task_scheduler_observer.h"

const size_t MB = 1024*1024;
const double BARRIER_TIMEOUT = 10.;

void TestStackSizeSimpleControl()
{
    {
        tbb::global_control s0(tbb::global_control::thread_stack_size, 1*MB);

        {
            tbb::global_control s1(tbb::global_control::thread_stack_size, 8*MB);

            ASSERT(8*MB == tbb::global_control::active_value(tbb::global_control::thread_stack_size), NULL);
        }
        ASSERT(1*MB == tbb::global_control::active_value(tbb::global_control::thread_stack_size), NULL);
    }
}
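
// A minimal illustrative sketch (our example, not part of the original test
// plan): while the largest requested value wins for thread_stack_size above,
// the most restrictive (smallest) value wins when several
// max_allowed_parallelism controls are alive at once. The function name is ours.
void ExampleStackedParallelismControls()
{
    tbb::global_control outer(tbb::global_control::max_allowed_parallelism, 4);
    {
        tbb::global_control inner(tbb::global_control::max_allowed_parallelism, 2);
        // while both controls are alive, the smaller limit is in effect
        ASSERT(2 == tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism), NULL);
    }
    // once `inner` is destroyed, the outer limit becomes active again
    ASSERT(4 == tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism), NULL);
}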

#include "harness_concurrency_tracker.h"
#include "tbb/task_scheduler_init.h"
#include <limits.h> // for UINT_MAX

struct StackSizeRun: NoAssign {
    int num_threads;
    Harness::SpinBarrier *barr1, *barr2;

    StackSizeRun(int threads, Harness::SpinBarrier *b1, Harness::SpinBarrier *b2) :
        num_threads(threads), barr1(b1), barr2(b2) {}
    void operator()( int id ) const {
        tbb::global_control s1(tbb::global_control::thread_stack_size, (1+id)*MB);

        barr1->timed_wait(BARRIER_TIMEOUT);
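
        // every thread now holds a live control; the largest request,
        // num_threads*MB, must be reported as the active stack size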
        ASSERT(num_threads*MB == tbb::global_control::active_value(tbb::global_control::thread_stack_size), NULL);
        barr2->timed_wait(BARRIER_TIMEOUT);
    }
};

void TestStackSizeThreadsControl()
{
    int threads = 4;
    Harness::SpinBarrier barr1(threads), barr2(threads);
    NativeParallelFor( threads, StackSizeRun(threads, &barr1, &barr2) );
}

void RunWorkersLimited(int tsi_max_threads, size_t parallelism, bool wait)
{
    tbb::global_control s(tbb::global_control::max_allowed_parallelism, parallelism);
    // try both configurations: with workers already asleep and with workers not yet asleep
    if (wait)
        Harness::Sleep(100);
    const size_t expected_threads = tsi_max_threads>0?
        min( (size_t)tsi_max_threads, parallelism )
        : ( tbb::tbb_thread::hardware_concurrency()==1? 1 : parallelism );
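    // worked example: tsi_max_threads=4 with parallelism=2 should give 2 threads,
    // while automatic (non-positive) tsi_max_threads gives `parallelism` threads
    // on multicore hardware and a single thread on a single-core machine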
    Harness::ExactConcurrencyLevel::check(expected_threads);
}

class blocking_task_scheduler_init {
    tbb::task_scheduler_init init;
public:
    blocking_task_scheduler_init(int num_threads = tbb::task_scheduler_init::automatic) : init(num_threads) {}
    ~blocking_task_scheduler_init() {
        bool ok = init.blocking_terminate(std::nothrow);
        ASSERT(ok, "blocking_terminate has failed" );
    }
};
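
// Illustrative usage of the wrapper above (our sketch, not part of the test
// plan): by the end of the scope, blocking_terminate() has been called, so
// every worker thread is guaranteed to have left the scheduler.
void ExampleBlockingSchedulerScope()
{
    blocking_task_scheduler_init guard(2); // waits for workers on destruction
    // ... TBB work would go here ...
}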

void TSI_and_RunWorkers(int tsi_max_threads, size_t parallelism, size_t max_value)
{
    blocking_task_scheduler_init tsi(tsi_max_threads);
    size_t active = tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism);
    ASSERT(active == max((size_t)2, max_value), "active_value must not be changed by task_scheduler_init" );
    RunWorkersLimited(tsi_max_threads, parallelism, /*wait=*/false);
}

#include "tbb/tbb_thread.h"

void TestWorkers(size_t curr_par)
{
    const size_t max_parallelism =
        tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism);
    ASSERT(max(2U, tbb::tbb_thread::hardware_concurrency()) == max_parallelism, NULL);
    {
        const unsigned h_c = tbb::tbb_thread::hardware_concurrency();
        tbb::global_control c(tbb::global_control::max_allowed_parallelism, curr_par);
        size_t v = tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism);
        ASSERT(!curr_par || max((size_t)2, curr_par) == v, NULL);
        if (h_c > 1)
            TSI_and_RunWorkers(tbb::task_scheduler_init::automatic, min((size_t)h_c, curr_par), curr_par);
        if (curr_par) // do not call task_scheduler_init t(0);
            TSI_and_RunWorkers((int)curr_par, curr_par, curr_par);
        if (curr_par > 2) { // check that min(tsi, parallelism) is active
            TSI_and_RunWorkers((int)curr_par-1, curr_par, curr_par);
            TSI_and_RunWorkers((int)curr_par, curr_par-1, curr_par);
        }
        // check constraints on the control's value: it cannot be increased
        tbb::global_control c1(tbb::global_control::max_allowed_parallelism, curr_par+1);
        v = tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism);
        if (curr_par)
            ASSERT(max((size_t)2, curr_par) == v, "Maximal parallelism cannot be increased while a lower control is active." );
        else
            ASSERT(2 == v, NULL);
    }
    ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)
           == max_parallelism,
           "max parallelism must be restored after the controls are destroyed" );
}

void TestWorkersConstraints() {
    const size_t max_parallelism =
        tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism);
    blocking_task_scheduler_init tsi;
    if (max_parallelism > 3) {
        tbb::global_control c(tbb::global_control::max_allowed_parallelism, max_parallelism-1);
        ASSERT(max_parallelism-1 ==
               tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism),
               "Allowed parallelism must be decreasable." );
        tbb::global_control c1(tbb::global_control::max_allowed_parallelism, max_parallelism-2);
        ASSERT(max_parallelism-2 ==
               tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism),
               "Allowed parallelism must be decreasable." );
    }
    const size_t limit_par = min(max_parallelism, (size_t)4);
    // check that the constraints are really met
    for (int wait=0; wait<2; wait++) {
        for (size_t num=2; num<limit_par; num++)
            RunWorkersLimited(tbb::task_scheduler_init::automatic, num, wait==1);
        for (size_t num=limit_par; num>1; num--)
            RunWorkersLimited(tbb::task_scheduler_init::automatic, num, wait==1);
    }
}

#include "tbb/parallel_for.h"

struct DummyBody {
    void operator()(int) const {
        __TBB_Pause(1);
    }
};

void RunParallelWork() {
    const int LOOP_ITERS = 10*1000;
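    // simple_partitioner with the default grain size of 1 yields many small
    // stealable tasks, so every available worker gets something to do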
    tbb::parallel_for(0, LOOP_ITERS, DummyBody(), tbb::simple_partitioner());
}

struct SetUseRun: NoAssign {
    Harness::SpinBarrier *barr;

    SetUseRun(Harness::SpinBarrier *b) : barr(b) {}
    void operator()( int id ) const {
        if (id == 0) {
            for (int i=0; i<10; i++) {
                blocking_task_scheduler_init tsi;
                RunParallelWork();
                barr->timed_wait(BARRIER_TIMEOUT);
            }
        } else {
            for (int i=0; i<10; i++) {
                tbb::global_control c(tbb::global_control::max_allowed_parallelism, 8);
                barr->timed_wait(BARRIER_TIMEOUT);
            }
        }
    }
};

void TestConcurrentSetUseConcurrency()
{
    Harness::SpinBarrier barr(2);
    NativeParallelFor( 2, SetUseRun(&barr) );
}

// check the number of workers after auto-initialization
void TestAutoInit()
{
    const size_t max_parallelism =
        tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism);
    const unsigned expected_threads = tbb::tbb_thread::hardware_concurrency()==1?
        1 : (unsigned)max_parallelism;
    Harness::ExactConcurrencyLevel::check(expected_threads);
    ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)
           == max_parallelism, "max_allowed_parallelism must not be changed after auto init" );
    if (max_parallelism > 2) {
        // after auto-initialization the number of workers can still be decreased
        tbb::global_control s(tbb::global_control::max_allowed_parallelism, max_parallelism-1);
        Harness::ExactConcurrencyLevel::check(max_parallelism-1);
    }
}

// need this to use TRY_BAD_EXPR_ENABLED when TBB_USE_ASSERT is not defined
#undef TBB_USE_ASSERT
#define TBB_USE_ASSERT 1

#include "harness_bad_expr.h"

void TestInvalidParallelism()
{
#if TRY_BAD_EXPR_ENABLED
    const size_t max_parallelism =
        tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism);
    {
        tbb::set_assertion_handler( AssertionFailureHandler );
        TRY_BAD_EXPR( tbb::global_control c(tbb::global_control::max_allowed_parallelism, 0),
                      "max_allowed_parallelism cannot be 0." );
        tbb::set_assertion_handler( ReportError );
        ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)
               == max_parallelism, NULL);
    }
    {
        const size_t P = 2;
        tbb::global_control c(tbb::global_control::max_allowed_parallelism, P);
        ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)
               == P, NULL);
        tbb::set_assertion_handler( AssertionFailureHandler );
        TRY_BAD_EXPR( tbb::global_control cZ(tbb::global_control::max_allowed_parallelism, 0),
                      "max_allowed_parallelism cannot be 0." );
        tbb::set_assertion_handler( ReportError );
        ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)
               == P, NULL);
    }
    ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)
           == max_parallelism, NULL);
#endif /* TRY_BAD_EXPR_ENABLED */
}

void TestTooBigStack()
{
#if __TBB_x86_32
    const size_t stack_sizes[] = {512*MB, 2*1024*MB, UINT_MAX};
#else
    const size_t stack_sizes[] = {512*MB, 2*1024*MB, UINT_MAX, 10LU*1024*MB};
#endif

#if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
    size_t default_ss = tbb::global_control::active_value(tbb::global_control::thread_stack_size);
#endif
    for (unsigned i = 0; i<Harness::array_length(stack_sizes); i++) {
        // No stack size setting for Windows 8 Store* apps, skip it
#if TRY_BAD_EXPR_ENABLED && __TBB_x86_64 && (_WIN32 || _WIN64) && !(__TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00))
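        // a value that does not round-trip through unsigned does not fit in
        // 32 bits; on 64-bit Windows such stack sizes must be rejected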
        if (stack_sizes[i] != (unsigned)stack_sizes[i]) {
            size_t curr_ss = tbb::global_control::active_value(tbb::global_control::thread_stack_size);
            tbb::set_assertion_handler( AssertionFailureHandler );
            TRY_BAD_EXPR( tbb::global_control s1(tbb::global_control::thread_stack_size, stack_sizes[i]), "Stack size is limited to unsigned int range" );
            tbb::set_assertion_handler( ReportError );
            ASSERT(curr_ss == tbb::global_control::active_value(tbb::global_control::thread_stack_size), "The stack size is not expected to change." );
            continue;
        }
#endif
        tbb::global_control s1(tbb::global_control::thread_stack_size, stack_sizes[i]);
        size_t actual_stack_sz = tbb::global_control::active_value(tbb::global_control::thread_stack_size);
#if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
        ASSERT(actual_stack_sz == default_ss, "The setting is ignored for Windows 8.x Store* apps" );
#else
        ASSERT(actual_stack_sz==stack_sizes[i], NULL);
#endif
    }
}

struct ParallelForRun: NoAssign {
    Harness::SpinBarrier *barr1, *barr2;

    ParallelForRun(Harness::SpinBarrier *b1, Harness::SpinBarrier *b2) :
        barr1(b1), barr2(b2) {}
    void operator()( int /*id*/ ) const {
        barr1->timed_wait(BARRIER_TIMEOUT);
        RunParallelWork();
        barr2->timed_wait(BARRIER_TIMEOUT);
    }
};
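
// fire-and-forget task: each execution just bumps the shared counter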
class FFTask: public tbb::task {
    tbb::atomic<int> *counter;
    tbb::task* execute() __TBB_override {
        (*counter)++;
        return NULL;
    }
public:
    FFTask(tbb::atomic<int> *counter_) : counter(counter_) {}
};
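
// busy-waits until another thread raises the flag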
class WaiterTask: public tbb::task {
    tbb::atomic<bool> *flag;
    tbb::task* execute() __TBB_override {
        while(!*flag)
            __TBB_Yield();
        return NULL;
    }
public:
    WaiterTask(tbb::atomic<bool> *flag_) : flag(flag_) {}
};
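
// runs a parallel loop, signals the waiter to leave, then enqueues follow-up tasks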
class WorkAndEnqueueTask: public tbb::task {
    tbb::atomic<int> *counter;
    tbb::atomic<bool> *signalToLeave;
    tbb::task* execute() __TBB_override {
        RunParallelWork();
        *signalToLeave = true;
        for (int i=0; i<ENQUEUE_TASKS; i++) {
            FFTask* t = new( tbb::task::allocate_root() ) FFTask(counter);
            tbb::task::enqueue(*t);
        }

        return NULL;
    }
public:
    static const int ENQUEUE_TASKS = 10;
    WorkAndEnqueueTask(tbb::atomic<int> *counter_, tbb::atomic<bool> *signal_)
        : counter(counter_), signalToLeave(signal_) {}
};

#if __TBB_TASK_PRIORITY
tbb::priority_t getPriorityByInt(int i) {
    return i%3==0? tbb::priority_low : (i%3==1? tbb::priority_normal :
                                        tbb::priority_high);
}
#endif

class FFTasksRun: NoAssign {
    void enqTasks(int id) const {
        for (int i=0; i<ITERS; i++) {
            FFTask* t = new( tbb::task::allocate_root() ) FFTask(cnt);
#if __TBB_TASK_PRIORITY
            tbb::priority_t p = getPriorityByInt(i+id);
            tbb::task::enqueue(*t, p);
#else
            tbb::internal::suppress_unused_warning(id);
            tbb::task::enqueue(*t);
#endif
        }
    }
public:
    static const int ITERS = 20;
    Harness::SpinBarrier *barr;
    tbb::atomic<int> *cnt;

    FFTasksRun(Harness::SpinBarrier *b, tbb::atomic<int> *c) :
        barr(b), cnt(c) {}
    void operator()(int id) const {
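        // threads with nonzero ids enqueue before the barrier and thread 0
        // after it, mixing enqueue timing relative to the other threads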
        if (id)
            enqTasks(id);
        barr->wait();
        if (!id)
            enqTasks(id);
    }
};

void TestTaskEnqueue()
{
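    // exercise combinations of scheduler size, global_control limit, and their
    // creation order; enqueued tasks must run even with parallelism limited to 1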
    {
        blocking_task_scheduler_init tsi(20);
        tbb::atomic<int> flag;
        tbb::atomic<bool> taskDoneFlag;
        flag = 0;
        taskDoneFlag = false;

        for (int i=0; i<10; i++) {
            WaiterTask* w = new( tbb::task::allocate_root() ) WaiterTask(&taskDoneFlag);
            tbb::task::enqueue(*w);
        }
        tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);
        taskDoneFlag = true;

        FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag);
        tbb::task::enqueue(*t);
        while(!flag)
            __TBB_Yield();
    }
    {
        blocking_task_scheduler_init tsi(1);
        tbb::atomic<int> flag;
        tbb::atomic<bool> taskDoneFlag;
        flag = 0;
        taskDoneFlag = false;

        WaiterTask* w = new( tbb::task::allocate_root() ) WaiterTask(&taskDoneFlag);
        tbb::task::enqueue(*w);
        taskDoneFlag = true;

        tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);

        FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag);
        tbb::task::enqueue(*t);
        while(!flag)
            __TBB_Yield();
    }
    {
        blocking_task_scheduler_init tsi(2);
        tbb::atomic<int> flag;
        flag = 0;

        tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);

        FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag);
        tbb::task::enqueue(*t);
        while(!flag)
            __TBB_Yield();
    }
    {
        blocking_task_scheduler_init tsi(2);
        tbb::atomic<int> flag;
        flag = 0;

        FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag);
        tbb::task::enqueue(*t);
        tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);

        while(!flag)
            __TBB_Yield();
    }

    tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);

    { // check that enqueue() guarantees mandatory parallelism
        blocking_task_scheduler_init tsi(1);
        tbb::atomic<int> flag;
        flag = 0;

        FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag);
        tbb::task::enqueue(*t);
        while(!flag)
            __TBB_Yield();
    }
    {
        tbb::atomic<int> flag;
        flag = 0;
        {
            blocking_task_scheduler_init tsi(1);

            for (int i=0; i<10; i++) {
                FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag);
#if __TBB_TASK_PRIORITY
                const tbb::priority_t p = getPriorityByInt(i);
                tbb::task::enqueue(*t, p);
#else
                tbb::task::enqueue(*t);
#endif
            }
        }
        ASSERT(flag==10, "The tasks must be completed when the task_scheduler_init is destroyed." );
    }
    const unsigned threads = 2;
    {
        blocking_task_scheduler_init tsi(1);
        Harness::SpinBarrier barr1(threads), barr2(threads);
        RunWorkersLimited(1, 1, false);

        NativeParallelFor( threads, ParallelForRun(&barr1, &barr2) );
    }

    tbb::atomic<int> counter;
    counter = 0;
    {
        blocking_task_scheduler_init tsi(1);
        Harness::SpinBarrier barr(threads);
        RunWorkersLimited(1, 1, false);

        NativeParallelFor( threads, FFTasksRun(&barr, &counter) );
    }
    ASSERT(counter == threads*FFTasksRun::ITERS, "All tasks must be done when the task_scheduler_init is destroyed." );
    counter = 0;
    { // an enqueued task can enqueue other tasks and call parallel_for
        tbb::atomic<bool> signalToLeave;
        blocking_task_scheduler_init tsi(1);

        signalToLeave = false;
        WorkAndEnqueueTask *t = new( tbb::task::allocate_root() )
            WorkAndEnqueueTask(&counter, &signalToLeave);
        tbb::task::enqueue(*t);
        RunParallelWork();

        while (!signalToLeave)
            __TBB_Yield();
    }
    ASSERT(counter == WorkAndEnqueueTask::ENQUEUE_TASKS, "All tasks must be done when the task_scheduler_init is destroyed." );
}

#include "tbb/combinable.h"
#include <functional> // for std::plus

class CountWorkersTask: public tbb::task {
    tbb::atomic<bool> *flag;
    // count unique worker threads
    static tbb::combinable<size_t> uniqThreads;

    tbb::task* execute() __TBB_override {
        uniqThreads.local() = 1;
        Harness::Sleep(10);
        *flag = true;
        return NULL;
    }
public:
    CountWorkersTask(tbb::atomic<bool> *flag_) : flag(flag_) {}
    static size_t observedThreads() {
        return uniqThreads.combine(std::plus<size_t>());
    }
};

tbb::combinable<size_t> CountWorkersTask::uniqThreads;

tbb::atomic<int> activeArenas;

class ArenaObserver: public tbb::task_scheduler_observer {
public:
    ArenaObserver() : tbb::task_scheduler_observer(/*local=*/true) {
    }
    void on_scheduler_entry( bool worker ) __TBB_override {
        if (worker) {
            ++activeArenas;
        }
    }
    void on_scheduler_exit( bool worker ) __TBB_override {
        if (worker) {
            --activeArenas;
        }
    }
};
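
// activeArenas counts the workers currently inside observed arenas; the test
// below spins on it to detect when the last worker has left its arena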

ArenaObserver observers[2];

struct ArenasObserveRun: NoAssign {
    Harness::SpinBarrier *barr;

    ArenasObserveRun(Harness::SpinBarrier *b) : barr(b) {}
    void operator()( int id ) const {
        observers[id].observe(true);
        ArenaObserver o;
        tbb::atomic<bool> flag;
        flag = false;

        CountWorkersTask* t = new( tbb::task::allocate_root() )
            CountWorkersTask(&flag);
        barr->wait();
        tbb::task::enqueue(*t);
        while(!flag)
            __TBB_Yield();
    }
};

#include "tbb/task_arena.h"

struct ArenaRun: NoAssign {
    tbb::atomic<int> *counter;

    ArenaRun(tbb::atomic<int> *counter_) : counter(counter_) {}
    void operator()() const {
        (*counter)++;
    }
};

struct ArenaUserRun: NoAssign {
    static const int ENQUEUE_TASKS = 10;
    tbb::task_arena *arena;
    Harness::SpinBarrier *barr;
    tbb::atomic<int> *counter;

    ArenaUserRun(tbb::task_arena *a, Harness::SpinBarrier *b, tbb::atomic<int> *c) :
        arena(a), barr(b), counter(c) {}
    void operator()( int id ) const {
        for (int i=0; i<ENQUEUE_TASKS; i++)
            arena->enqueue(ArenaRun(counter));
        barr->wait();
        if (!id)
            arena->terminate();
    }
};

void TestConcurrentArenas()
{
    Harness::SpinBarrier barrier(2);
    tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);
    {
        blocking_task_scheduler_init tsi(2);
        ArenaObserver observer;
        observer.observe(true);

        Harness::ExactConcurrencyLevel::check(1); // must have 0 worker threads

        NativeParallelFor( 2, ArenasObserveRun(&barrier) );
        ASSERT(1 == CountWorkersTask::observedThreads(),
               "A single worker is expected to serve mandatory parallelism." );
        while(activeArenas) // wait until the single worker terminates
            __TBB_Yield();

        // check that without mandatory parallelism there are still 0 worker threads
        Harness::ExactConcurrencyLevel::check(1);
    }
    tbb::atomic<int> counter;
    counter = 0;
    {
        blocking_task_scheduler_init tsi(1);
        tbb::task_arena arena(2);

        NativeParallelFor( 2, ArenaUserRun(&arena, &barrier, &counter) );
    }
    ASSERT(counter == 2*ArenaUserRun::ENQUEUE_TASKS, "All tasks must be done." );
}

void TestParallelismRestored()
{
    const int TASKS = 5;
    tbb::atomic<int> counter;
    counter = 0;
    {
        const int P = 4;
        blocking_task_scheduler_init tsi(P);
        {
            tbb::global_control s(tbb::global_control::max_allowed_parallelism, 1);
            Harness::ExactConcurrencyLevel::check(1);
            // create enforced concurrency in the arena
            for (int i=0; i<TASKS; i++) {
                FFTask* t = new( tbb::task::allocate_root() ) FFTask(&counter);
                tbb::task::enqueue(*t);
            }
        }
        // global control is off, check that concurrency P is available
        Harness::ExactConcurrencyLevel::check(P);
    }
    ASSERT(counter==TASKS, "The tasks must be executed at this point." );
}

class NoUnwantedEnforcedRun {
    Harness::SpinBarrier *globalBarrier;
public:
    NoUnwantedEnforcedRun(Harness::SpinBarrier *b) : globalBarrier(b) {}
    void operator()( int id ) const {
        tbb::combinable<size_t> uniqThreads;
        Harness::ExactConcurrencyLevel::check(1);
        globalBarrier->wait();
        if (id) {
            for (int i=0; i<20; i++) {
                Harness::ExactConcurrencyLevel::check(1); // no workers expected in this thread
            }
        } else {
            // create enforced concurrency in a separate thread, provoking an enforced
            // worker with no work to do to join the arena running parallel_for
            for (int i=0; i<10; i++) {
                tbb::atomic<int> flag;
                flag = 0;
                FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag);
                tbb::task::enqueue(*t);
                Harness::ExactConcurrencyLevel::checkLessOrEqual(2, &uniqThreads);
                size_t seen = uniqThreads.combine(std::plus<size_t>());
                ASSERT(seen==1 || seen==2, NULL);
                while(!flag)
                    __TBB_Yield();
            }
        }
    }
};

// test that enforced concurrency from one thread doesn't affect another
void TestNoUnwantedEnforced()
{
    Harness::SpinBarrier barrier(2);
    tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);
    blocking_task_scheduler_init tsi(4);
    NativeParallelFor( 2, NoUnwantedEnforcedRun(&barrier) );
}

class TestMultipleControlsRun {
    Harness::SpinBarrier *barrier;
public:
    TestMultipleControlsRun(Harness::SpinBarrier *b) : barrier(b) {}
    void operator()( int id ) const {
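        // the two threads advance through the phases in lockstep via the barrier;
        // in every phase the smallest live control value bounds the concurrency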
        barrier->wait();
        if (id) {
            {
                tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);
                Harness::ExactConcurrencyLevel::check(1);
                barrier->wait();
            }
            Harness::ExactConcurrencyLevel::check(1);
            barrier->wait();
            {
                tbb::global_control c(tbb::global_control::max_allowed_parallelism, 2);
                Harness::ExactConcurrencyLevel::check(1);
                barrier->wait();
                Harness::ExactConcurrencyLevel::check(2);
                barrier->wait();
            }
        } else {
            {
                Harness::ExactConcurrencyLevel::check(1);
                tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);
                barrier->wait();
                Harness::ExactConcurrencyLevel::check(1);
                barrier->wait();
                Harness::ExactConcurrencyLevel::check(1);
                barrier->wait();
            }
            Harness::ExactConcurrencyLevel::check(2);
            barrier->wait();
        }
    }
};

// test that global controls created in different threads with overlapping
// lifetimes still keep parallelism under control
void TestMultipleControls()
{
    blocking_task_scheduler_init tsi(2); // to prevent auto-initialization
    Harness::SpinBarrier barrier(2);
    NativeParallelFor( 2, TestMultipleControlsRun(&barrier) );
}

#if __TBB_TASK_PRIORITY
// enqueued tasks with priority below the current one must not be forgotten
// when priorities of enqueued tasks are enforced
void TestForgottenEnqueuedTasks()
{
    tbb::task_scheduler_init tsi(2);
    tbb::atomic<int> counter;
    tbb::atomic<bool> waitFlag;

    waitFlag = false;
    counter = 0;
    tbb::task &r = *new( tbb::task::allocate_root() ) tbb::empty_task;
    r.set_ref_count(3);
    for (int i=0; i<2; i++) {
        tbb::task &t = *new( r.allocate_child() ) WaiterTask(&waitFlag);
        tbb::task::spawn(t);
    }
    // all workers are occupied by the blocked WaiterTasks
    FFTask* t = new( tbb::task::allocate_root() ) FFTask(&counter);
    tbb::task::enqueue(*t, tbb::priority_low);
    {
        tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);
        waitFlag = true; // the WaiterTasks are done, workers are ready for reuse
        while (!counter) // wait until FFTask has executed
            __TBB_Yield();
    }
    r.wait_for_all();
    tbb::task::destroy(r);
}
#endif /* __TBB_TASK_PRIORITY */

int TestMain()
{
    TestTaskEnqueue();
    TestConcurrentArenas();
    TestMultipleControls();
    TestNoUnwantedEnforced();
    TestParallelismRestored();
#if __TBB_TASK_PRIORITY
    TestForgottenEnqueuedTasks();
#endif
    const unsigned h_c = tbb::tbb_thread::hardware_concurrency();
    bool excessHC;
    {
        tbb::task_scheduler_init t(h_c+1);
        excessHC = Harness::ExactConcurrencyLevel::isEqual(h_c+1);
    }
    if (h_c>2)
        TestWorkers(h_c-1);
    if (excessHC) // requires hardware concurrency +1, otherwise hangs
        TestWorkers(h_c+1);
    if (excessHC || h_c >= 2)
        TestWorkers(2);
    if (excessHC || h_c >= 3)
        TestWorkers(3);
    TestWorkersConstraints();
    TestConcurrentSetUseConcurrency();
    TestInvalidParallelism();
    TestAutoInit(); // auto-initialization done at this point

    size_t default_ss = tbb::global_control::active_value(tbb::global_control::thread_stack_size);
    ASSERT(default_ss, NULL);

    // the stack size cannot be changed for Windows 8 Store* apps, so skip those tests
#if !(__TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00))
    TestStackSizeSimpleControl();
    TestStackSizeThreadsControl();
#endif
    TestTooBigStack();
    ASSERT(default_ss == tbb::global_control::active_value(tbb::global_control::thread_stack_size), NULL);
    return Harness::Done;
}