/*
    Copyright (c) 2005-2019 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

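// Tests for tbb::global_control: thread_stack_size and max_allowed_parallelism,
// including their interaction with task_scheduler_init, enqueued tasks,
// task arenas, and scheduler observers.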
#define TBB_PREVIEW_WAITING_FOR_WORKERS 1
#include "tbb/global_control.h"
#include "harness.h"
#include "tbb/task_scheduler_observer.h"

const size_t MB = 1024*1024;
const double BARRIER_TIMEOUT = 10.;

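// The active thread_stack_size is the largest value requested by any live
// global_control object; destroying the inner control must restore the
// outer control's value.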
void TestStackSizeSimpleControl()
{
    {
        tbb::global_control s0(tbb::global_control::thread_stack_size, 1*MB);

        {
            tbb::global_control s1(tbb::global_control::thread_stack_size, 8*MB);

            ASSERT(8*MB == tbb::global_control::active_value(tbb::global_control::thread_stack_size), NULL);
        }
        ASSERT(1*MB == tbb::global_control::active_value(tbb::global_control::thread_stack_size), NULL);
    }
}

#include "harness_concurrency_tracker.h"
#include "harness_barrier.h" // for Harness::SpinBarrier
#include "tbb/task_scheduler_init.h"
#include "tbb/tbb_thread.h" // for tbb::tbb_thread::hardware_concurrency()
#include <limits.h> // for UINT_MAX

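// NativeParallelFor body: thread 'id' requests a (1+id)*MB stack size, so once
// every thread holds its control, the active value must be the largest
// request, i.e. num_threads*MB.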
struct StackSizeRun: NoAssign {
    int num_threads;
    Harness::SpinBarrier *barr1, *barr2;

    StackSizeRun(int threads, Harness::SpinBarrier *b1, Harness::SpinBarrier *b2) :
        num_threads(threads), barr1(b1), barr2(b2) {}
    void operator()( int id ) const {
        tbb::global_control s1(tbb::global_control::thread_stack_size, (1+id)*MB);

        barr1->timed_wait(BARRIER_TIMEOUT);

        ASSERT(num_threads*MB == tbb::global_control::active_value(tbb::global_control::thread_stack_size), NULL);
        barr2->timed_wait(BARRIER_TIMEOUT);
    }
};

void TestStackSizeThreadsControl()
{
    int threads = 4;
    Harness::SpinBarrier barr1(threads), barr2(threads);
    NativeParallelFor( threads, StackSizeRun(threads, &barr1, &barr2) );
}

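// Limits parallelism via global_control and verifies that exactly the expected
// number of threads runs: the tighter of the task_scheduler_init and
// global_control limits (or 1 on single-core machines).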
void RunWorkersLimited(int tsi_max_threads, size_t parallelism, bool wait)
{
    tbb::global_control s(tbb::global_control::max_allowed_parallelism, parallelism);
    // try both configurations: with workers already asleep and with workers not yet asleep
    if (wait)
        Harness::Sleep(100);
    const size_t expected_threads = tsi_max_threads>0?
        min( (unsigned)tsi_max_threads, parallelism )
        : ( tbb::tbb_thread::hardware_concurrency()==1? 1 : parallelism );
    Harness::ExactConcurrencyLevel::check(expected_threads);
}

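// RAII wrapper over task_scheduler_init that terminates the scheduler with
// blocking_terminate(), i.e. waits for worker threads to complete
// (enabled by TBB_PREVIEW_WAITING_FOR_WORKERS).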
class blocking_task_scheduler_init {
    tbb::task_scheduler_init init;
public:
    blocking_task_scheduler_init(int num_threads = tbb::task_scheduler_init::automatic) : init(num_threads) {}
    ~blocking_task_scheduler_init() {
        bool ok = init.blocking_terminate(std::nothrow);
        ASSERT(ok, "blocking_terminate has failed");
    }
};

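// Creates a scheduler with the given thread limit, checks that doing so does
// not alter the active max_allowed_parallelism, then runs the concurrency check.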
void TSI_and_RunWorkers(int tsi_max_threads, size_t parallelism, size_t max_value)
{
    blocking_task_scheduler_init tsi(tsi_max_threads);
    size_t active = tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism);
    ASSERT(active == max(2U, max_value), "active_value must not be changed by task_scheduler_init");
    RunWorkersLimited(tsi_max_threads, parallelism, /*wait=*/false);
}

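// Exercises max_allowed_parallelism around curr_par: the active value is
// never below 2, a second control requesting more parallelism cannot raise
// the limit, and the previous value is restored when a control is destroyed.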
void TestWorkers(size_t curr_par)
{
    const size_t max_parallelism =
        tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism);
    ASSERT(max(2U, tbb::tbb_thread::hardware_concurrency()) == max_parallelism, NULL);
    {
        const unsigned h_c = tbb::tbb_thread::hardware_concurrency();
        tbb::global_control c(tbb::global_control::max_allowed_parallelism, curr_par);
        size_t v = tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism);
        ASSERT(!curr_par || max((size_t)2, curr_par) == v, NULL);
        if (h_c > 1)
            TSI_and_RunWorkers(tbb::task_scheduler_init::automatic, min(h_c, curr_par), curr_par);
        if (curr_par) // do not call task_scheduler_init t(0);
            TSI_and_RunWorkers((int)curr_par, curr_par, curr_par);
        if (curr_par > 2) { // check that min(tsi, parallelism) is active
            TSI_and_RunWorkers((int)curr_par-1, curr_par, curr_par);
            TSI_and_RunWorkers((int)curr_par, curr_par-1, curr_par);
        }
        // check the constraint on the control's value: it cannot be increased
        tbb::global_control c1(tbb::global_control::max_allowed_parallelism, curr_par+1);
        v = tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism);
        if (curr_par)
            ASSERT(max(2U, curr_par) == v, "It's impossible to increase maximal parallelism.");
        else
            ASSERT(2 == v, NULL);
    }
    ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)
           == max_parallelism,
           "max parallelism must be restored after decreasing/increasing");
}

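// Checks that allowed parallelism can be lowered step by step while a
// scheduler is active, and that each limit is actually enforced.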
void TestWorkersConstraints() {
    const size_t max_parallelism =
        tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism);
    blocking_task_scheduler_init tsi;
    if (max_parallelism > 3) {
        tbb::global_control c(tbb::global_control::max_allowed_parallelism, max_parallelism-1);
        ASSERT(max_parallelism-1 ==
               tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism),
               "It must be possible to decrease allowed parallelism.");
        tbb::global_control c1(tbb::global_control::max_allowed_parallelism, max_parallelism-2);
        ASSERT(max_parallelism-2 ==
               tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism),
               "It must be possible to decrease allowed parallelism.");
    }
    const size_t limit_par = min(max_parallelism, 4U);
    // check that the constraints are really met
    for (int wait=0; wait<2; wait++) {
        for (size_t num=2; num<limit_par; num++)
            RunWorkersLimited(tbb::task_scheduler_init::automatic, num, wait==1);
        for (size_t num=limit_par; num>1; num--)
            RunWorkersLimited(tbb::task_scheduler_init::automatic, num, wait==1);
    }
}

#include "tbb/parallel_for.h" // for tbb::parallel_for and tbb::simple_partitioner

struct DummyBody {
    void operator()(int) const {
        __TBB_Pause(1);
    }
};

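// A short parallel_for burst used by several tests to occupy worker threads.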
void RunParallelWork() {
    const int LOOP_ITERS = 10*1000;
    tbb::parallel_for(0, LOOP_ITERS, DummyBody(), tbb::simple_partitioner());
}

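// Thread 0 repeatedly creates a scheduler and runs work while thread 1
// repeatedly installs a max_allowed_parallelism control, in lockstep.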
struct SetUseRun: NoAssign {
    Harness::SpinBarrier *barr;

    SetUseRun(Harness::SpinBarrier *b) : barr(b) {}
    void operator()( int id ) const {
        if (id == 0) {
            for (int i=0; i<10; i++) {
                blocking_task_scheduler_init tsi;
                RunParallelWork();
                barr->timed_wait(BARRIER_TIMEOUT);
            }
        } else {
            for (int i=0; i<10; i++) {
                tbb::global_control c(tbb::global_control::max_allowed_parallelism, 8);
                barr->timed_wait(BARRIER_TIMEOUT);
            }
        }
    }
};

void TestConcurrentSetUseConcurrency()
{
    Harness::SpinBarrier barr(2);
    NativeParallelFor( 2, SetUseRun(&barr) );
}

// check the number of workers after auto-initialization
void TestAutoInit()
{
    const size_t max_parallelism =
        tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism);
    const unsigned expected_threads = tbb::tbb_thread::hardware_concurrency()==1?
        1 : (unsigned)max_parallelism;
    Harness::ExactConcurrencyLevel::check(expected_threads);
    ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)
           == max_parallelism, "max_allowed_parallelism must not be changed after auto init");
    if (max_parallelism > 2) {
        // after auto-initialization it is possible to decrease the number of workers
        tbb::global_control s(tbb::global_control::max_allowed_parallelism, max_parallelism-1);
        Harness::ExactConcurrencyLevel::check(max_parallelism-1);
    }
}

// need this to use TRY_BAD_EXPR_ENABLED when TBB_USE_ASSERT is not defined
#undef TBB_USE_ASSERT
#define TBB_USE_ASSERT 1

#include "harness_bad_expr.h"

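// Requesting max_allowed_parallelism of 0 must trigger an assertion failure
// and must leave the previously active value untouched.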
void TestInvalidParallelism()
{
#if TRY_BAD_EXPR_ENABLED
    const size_t max_parallelism =
        tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism);
    {
        tbb::set_assertion_handler( AssertionFailureHandler );
        TRY_BAD_EXPR( tbb::global_control c(tbb::global_control::max_allowed_parallelism, 0),
                      "max_allowed_parallelism cannot be 0." );
        tbb::set_assertion_handler( ReportError );
        ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)
               == max_parallelism, NULL);
    }
    {
        const size_t P = 2;
        tbb::global_control c(tbb::global_control::max_allowed_parallelism, P);
        ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)
               == P, NULL);
        tbb::set_assertion_handler( AssertionFailureHandler );
        TRY_BAD_EXPR( tbb::global_control cZ(tbb::global_control::max_allowed_parallelism, 0),
                      "max_allowed_parallelism cannot be 0." );
        tbb::set_assertion_handler( ReportError );
        ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)
               == P, NULL);
    }
    ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism)
           == max_parallelism, NULL);
#endif /* TRY_BAD_EXPR_ENABLED */
}

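// Very large stack-size requests: on 64-bit Windows, values that do not fit
// into unsigned int must be rejected by an assertion; elsewhere the requested
// value must become active (Windows 8 Store* apps ignore the setting).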
void TestTooBigStack()
{
#if __TBB_x86_32
    const size_t stack_sizes[] = {512*MB, 2*1024*MB, UINT_MAX};
#else
    const size_t stack_sizes[] = {512*MB, 2*1024*MB, UINT_MAX, 10LU*1024*MB};
#endif

#if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
    size_t default_ss = tbb::global_control::active_value(tbb::global_control::thread_stack_size);
#endif
    for (unsigned i = 0; i<Harness::array_length(stack_sizes); i++) {
        // No stack size setting for Windows 8 Store* apps, skip it
#if TRY_BAD_EXPR_ENABLED && __TBB_x86_64 && (_WIN32 || _WIN64) && !(__TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00))
        if (stack_sizes[i] != (unsigned)stack_sizes[i]) {
            size_t curr_ss = tbb::global_control::active_value(tbb::global_control::thread_stack_size);
            tbb::set_assertion_handler( AssertionFailureHandler );
            TRY_BAD_EXPR( tbb::global_control s1(tbb::global_control::thread_stack_size, stack_sizes[i]), "Stack size is limited to unsigned int range" );
            tbb::set_assertion_handler( ReportError );
            ASSERT(curr_ss == tbb::global_control::active_value(tbb::global_control::thread_stack_size), "The stack size must not change.");
            continue;
        }
#endif
        tbb::global_control s1(tbb::global_control::thread_stack_size, stack_sizes[i]);
        size_t actual_stack_sz = tbb::global_control::active_value(tbb::global_control::thread_stack_size);
#if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
        ASSERT(actual_stack_sz == default_ss, "The setting is ignored for Windows 8.x Store* apps");
#else
        ASSERT(actual_stack_sz==stack_sizes[i], NULL);
#endif
    }
}

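// NativeParallelFor body that runs parallel work between two barriers.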
struct ParallelForRun: NoAssign {
    Harness::SpinBarrier *barr1, *barr2;

    ParallelForRun(Harness::SpinBarrier *b1, Harness::SpinBarrier *b2) :
        barr1(b1), barr2(b2) {}
    void operator()( int /*id*/ ) const {
        barr1->timed_wait(BARRIER_TIMEOUT);
        RunParallelWork();
        barr2->timed_wait(BARRIER_TIMEOUT);
    }
};

#include "tbb/task.h"   // for tbb::task and tbb::empty_task
#include "tbb/atomic.h" // for tbb::atomic

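// Minimal fire-and-forget task: increments a shared counter and finishes.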
class FFTask: public tbb::task {
    tbb::atomic<int> *counter;
    tbb::task* execute() __TBB_override {
        (*counter)++;
        return NULL;
    }
public:
    FFTask(tbb::atomic<int> *counter_) : counter(counter_) {}
};

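// Task that spins until the given flag is raised, keeping its worker occupied.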
class WaiterTask: public tbb::task {
    tbb::atomic<bool> *flag;
    tbb::task* execute() __TBB_override {
        while(!*flag)
            __TBB_Yield();
        return NULL;
    }
public:
    WaiterTask(tbb::atomic<bool> *flag_) : flag(flag_) {}
};

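// Task that runs parallel work, signals the waiting master to leave, and then
// enqueues ENQUEUE_TASKS more fire-and-forget tasks.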
class WorkAndEnqueueTask: public tbb::task {
    tbb::atomic<int> *counter;
    tbb::atomic<bool> *signalToLeave;
    tbb::task* execute() __TBB_override {
        RunParallelWork();
        *signalToLeave = true;
        for (int i=0; i<ENQUEUE_TASKS; i++) {
            FFTask* t = new( tbb::task::allocate_root() ) FFTask(counter);
            tbb::task::enqueue(*t);
        }

        return NULL;
    }
public:
    static const int ENQUEUE_TASKS = 10;
    WorkAndEnqueueTask(tbb::atomic<int> *counter_, tbb::atomic<bool> *signal_)
        : counter(counter_), signalToLeave(signal_) {}
};

#if __TBB_TASK_PRIORITY
tbb::priority_t getPriorityByInt(int i) {
    return i%3==0? tbb::priority_low : (i%3==1? tbb::priority_normal :
                                        tbb::priority_high);
}
#endif

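// Two threads enqueue ITERS fire-and-forget tasks each, one before and one
// after a common barrier, rotating priorities when they are available.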
class FFTasksRun: NoAssign {
    void enqTasks(int id) const {
        for (int i=0; i<ITERS; i++) {
            FFTask* t = new( tbb::task::allocate_root() ) FFTask(cnt);
#if __TBB_TASK_PRIORITY
            tbb::priority_t p = getPriorityByInt(i+id);
            tbb::task::enqueue(*t, p);
#else
            tbb::internal::suppress_unused_warning(id);
            tbb::task::enqueue(*t);
#endif
        }
    }
public:
    static const int ITERS = 20;
    Harness::SpinBarrier *barr;
    tbb::atomic<int> *cnt;

    FFTasksRun(Harness::SpinBarrier *b, tbb::atomic<int> *c) :
        barr(b), cnt(c) {}
    void operator()(int id) const {
        if (id)
            enqTasks(id);
        barr->wait();
        if (!id)
            enqTasks(id);
    }
};

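// Enqueued tasks must be executed even when max_allowed_parallelism is 1,
// i.e. enqueue() must provide mandatory parallelism, and all enqueued tasks
// must complete before the scheduler shuts down.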
void TestTaskEnqueue()
{
    {
        blocking_task_scheduler_init tsi(20);
        tbb::atomic<int> flag;
        tbb::atomic<bool> taskDoneFlag;
        flag = 0;
        taskDoneFlag = false;

        for (int i=0; i<10; i++) {
            WaiterTask* w = new( tbb::task::allocate_root() ) WaiterTask(&taskDoneFlag);
            tbb::task::enqueue(*w);
        }
        tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);
        taskDoneFlag = true;

        FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag);
        tbb::task::enqueue(*t);
        while(!flag)
            __TBB_Yield();
    }
    {
        blocking_task_scheduler_init tsi(1);
        tbb::atomic<int> flag;
        tbb::atomic<bool> taskDoneFlag;
        flag = 0;
        taskDoneFlag = false;

        WaiterTask* w = new( tbb::task::allocate_root() ) WaiterTask(&taskDoneFlag);
        tbb::task::enqueue(*w);
        taskDoneFlag = true;

        tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);

        FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag);
        tbb::task::enqueue(*t);
        while(!flag)
            __TBB_Yield();
    }
    {
        blocking_task_scheduler_init tsi(2);
        tbb::atomic<int> flag;
        flag = 0;

        tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);

        FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag);
        tbb::task::enqueue(*t);
        while(!flag)
            __TBB_Yield();
    }
    {
        blocking_task_scheduler_init tsi(2);
        tbb::atomic<int> flag;
        flag = 0;

        FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag);
        tbb::task::enqueue(*t);
        tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);

        while(!flag)
            __TBB_Yield();
    }

    tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);

    { // check that enqueue() guarantees mandatory parallelism
        blocking_task_scheduler_init tsi(1);
        tbb::atomic<int> flag;
        flag = 0;

        FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag);
        tbb::task::enqueue(*t);
        while(!flag)
            __TBB_Yield();
    }
    {
        tbb::atomic<int> flag;
        flag = 0;
        {
            blocking_task_scheduler_init tsi(1);

            for (int i=0; i<10; i++) {
                FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag);
#if __TBB_TASK_PRIORITY
                const tbb::priority_t p = getPriorityByInt(i);
                tbb::task::enqueue(*t, p);
#else
                tbb::task::enqueue(*t);
#endif
            }
        }
        ASSERT(flag==10, "All tasks must be executed by the time task_scheduler_init is destroyed.");
    }
    const unsigned threads = 2;
    {
        blocking_task_scheduler_init tsi(1);
        Harness::SpinBarrier barr1(threads), barr2(threads);
        RunWorkersLimited(1, 1, false);

        NativeParallelFor( threads, ParallelForRun(&barr1, &barr2) );
    }

    tbb::atomic<int> counter;
    counter = 0;
    {
        blocking_task_scheduler_init tsi(1);
        Harness::SpinBarrier barr(threads);
        RunWorkersLimited(1, 1, false);

        NativeParallelFor( threads, FFTasksRun(&barr, &counter) );
    }
    ASSERT(counter == threads*FFTasksRun::ITERS, "All tasks must be executed by the time task_scheduler_init is destroyed.");
    counter = 0;
    { // an enqueued task can enqueue other tasks and call parallel_for
        tbb::atomic<bool> signalToLeave;
        blocking_task_scheduler_init tsi(1);

        signalToLeave = false;
        WorkAndEnqueueTask *t = new( tbb::task::allocate_root() )
            WorkAndEnqueueTask(&counter, &signalToLeave);
        tbb::task::enqueue(*t);
        RunParallelWork();

        while (!signalToLeave)
            __TBB_Yield();
    }
    ASSERT(counter == WorkAndEnqueueTask::ENQUEUE_TASKS, "All tasks must be executed by the time task_scheduler_init is destroyed.");
}

#include "tbb/combinable.h" // for tbb::combinable

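// Task that marks the worker thread executing it, so the number of distinct
// workers that served the enqueued tasks can be counted afterwards.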
class CountWorkersTask: public tbb::task {
    tbb::atomic<bool> *flag;
    // count unique worker threads
    static tbb::combinable<size_t> uniqThreads;

    tbb::task* execute() __TBB_override {
        uniqThreads.local() = 1;
        Harness::Sleep(10);
        *flag = true;
        return NULL;
    }
public:
    CountWorkersTask(tbb::atomic<bool> *flag_) : flag(flag_) {}
    static size_t observedThreads() {
        return uniqThreads.combine(std::plus<size_t>());
    }
};

tbb::combinable<size_t> CountWorkersTask::uniqThreads;

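// Number of workers currently inside observed arenas, maintained by
// ArenaObserver below.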
tbb::atomic<int> activeArenas;

class ArenaObserver: public tbb::task_scheduler_observer {
public:
    ArenaObserver() : tbb::task_scheduler_observer(/*local=*/true) {
    }
    void on_scheduler_entry( bool worker ) __TBB_override {
        if (worker) {
            ++activeArenas;
        }
    }
    void on_scheduler_exit( bool worker ) __TBB_override {
        if (worker) {
            --activeArenas;
        }
    }
};

ArenaObserver observers[2];

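// Each thread activates its per-arena observer and enqueues a
// CountWorkersTask, then waits until the task has run.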
struct ArenasObserveRun: NoAssign {
    Harness::SpinBarrier *barr;

    ArenasObserveRun(Harness::SpinBarrier *b) : barr(b) {}
    void operator()( int id ) const {
        observers[id].observe(true);
        ArenaObserver o;
        tbb::atomic<bool> flag;
        flag = false;

        CountWorkersTask* t = new( tbb::task::allocate_root() )
            CountWorkersTask(&flag);
        barr->wait();
        tbb::task::enqueue(*t);
        while(!flag)
            __TBB_Yield();
    }
};

struct ArenaRun: NoAssign {
    tbb::atomic<int> *counter;

    ArenaRun(tbb::atomic<int> *counter_) : counter(counter_) {}
    void operator()() const {
        (*counter)++;
    }
};

#include "tbb/task_arena.h" // for tbb::task_arena

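// Both threads enqueue functors into a shared task_arena; thread 0 then
// terminates the arena after the barrier.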
struct ArenaUserRun: NoAssign {
    static const int ENQUEUE_TASKS = 10;
    tbb::task_arena *arena;
    Harness::SpinBarrier *barr;
    tbb::atomic<int> *counter;

    ArenaUserRun(tbb::task_arena *a, Harness::SpinBarrier *b, tbb::atomic<int> *c) :
        arena(a), barr(b), counter(c) {}
    void operator()( int id ) const {
        for (int i=0; i<ENQUEUE_TASKS; i++)
            arena->enqueue(ArenaRun(counter));
        barr->wait();
        if (!id)
            arena->terminate();
    }
};

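// With max_allowed_parallelism set to 1, mandatory parallelism in concurrent
// arenas must be served by a single worker thread, and terminating an arena
// from one of its users must not lose enqueued tasks.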
void TestConcurrentArenas()
{
    Harness::SpinBarrier barrier(2);
    tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);
    {
        blocking_task_scheduler_init tsi(2);
        ArenaObserver observer;
        observer.observe(true);

        Harness::ExactConcurrencyLevel::check(1); // must have 0 worker threads

        NativeParallelFor( 2, ArenasObserveRun(&barrier) );
        ASSERT(1 == CountWorkersTask::observedThreads(),
               "A single worker is expected to serve mandatory parallelism.");
        while(activeArenas) // wait until the single worker terminates
            __TBB_Yield();

        // check that without mandatory parallelism there are still 0 worker threads
        Harness::ExactConcurrencyLevel::check(1);
    }
    tbb::atomic<int> counter;
    counter = 0;
    {
        blocking_task_scheduler_init tsi(1);
        tbb::task_arena arena(2);

        NativeParallelFor( 2, ArenaUserRun(&arena, &barrier, &counter) );
    }
    ASSERT(counter == 2*ArenaUserRun::ENQUEUE_TASKS, "All tasks must be done.");
}

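// Once a control limiting parallelism to 1 is destroyed, the full concurrency
// of the surrounding scheduler must become available again.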
void TestParallelismRestored()
{
    const int TASKS = 5;
    tbb::atomic<int> counter;
    counter = 0;
    {
        const int P = 4;
        blocking_task_scheduler_init tsi(P);
        {
            tbb::global_control s(tbb::global_control::max_allowed_parallelism, 1);
            Harness::ExactConcurrencyLevel::check(1);
            // create enforced concurrency in the arena
            for (int i=0; i<TASKS; i++) {
                FFTask* t = new( tbb::task::allocate_root() ) FFTask(&counter);
                tbb::task::enqueue(*t);
            }
        }
        // global control is off, check that concurrency P is available
        Harness::ExactConcurrencyLevel::check(P);
    }
    ASSERT(counter==TASKS, "The tasks must be executed at this point.");
}

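// Thread 0 repeatedly provokes an enforced worker via enqueue(); thread 1
// must keep observing no workers in its own arena.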
class NoUnwantedEnforcedRun {
    Harness::SpinBarrier *globalBarrier;
public:
    NoUnwantedEnforcedRun(Harness::SpinBarrier *b) : globalBarrier(b) {}
    void operator()( int id ) const {
        tbb::combinable<size_t> uniqThreads;
        Harness::ExactConcurrencyLevel::check(1);
        globalBarrier->wait();
        if (id) {
            for (int i=0; i<20; i++) {
                Harness::ExactConcurrencyLevel::check(1); // no workers expected in the thread
            }
        } else {
            // create enforced concurrency in a separate thread, thus provoking an enforced
            // worker with no work to do to join the arena that runs parallel_for
            for (int i=0; i<10; i++) {
                tbb::atomic<int> flag;
                flag = 0;
                FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag);
                tbb::task::enqueue(*t);
                Harness::ExactConcurrencyLevel::checkLessOrEqual(2, &uniqThreads);
                size_t seen = uniqThreads.combine(std::plus<size_t>());
                ASSERT(seen==1 || seen==2, NULL);
                while(!flag)
                    __TBB_Yield();
            }
        }
    }
};

// test that enforced concurrency from one thread doesn't affect another
void TestNoUnwantedEnforced()
{
    Harness::SpinBarrier barrier(2);
    tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);
    blocking_task_scheduler_init tsi(4);
    NativeParallelFor( 2, NoUnwantedEnforcedRun(&barrier) );
}

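// Interleaved controls from two threads: at each barrier step the observed
// concurrency must match the tightest limit currently alive.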
class TestMultipleControlsRun {
    Harness::SpinBarrier *barrier;
public:
    TestMultipleControlsRun(Harness::SpinBarrier *b) : barrier(b) {}
    void operator()( int id ) const {
        barrier->wait();
        if (id) {
            {
                tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);
                Harness::ExactConcurrencyLevel::check(1);
                barrier->wait();
            }
            Harness::ExactConcurrencyLevel::check(1);
            barrier->wait();
            {
                tbb::global_control c(tbb::global_control::max_allowed_parallelism, 2);
                Harness::ExactConcurrencyLevel::check(1);
                barrier->wait();
                Harness::ExactConcurrencyLevel::check(2);
                barrier->wait();
            }
        } else {
            {
                Harness::ExactConcurrencyLevel::check(1);
                tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);
                barrier->wait();
                Harness::ExactConcurrencyLevel::check(1);
                barrier->wait();
                Harness::ExactConcurrencyLevel::check(1);
                barrier->wait();
            }
            Harness::ExactConcurrencyLevel::check(2);
            barrier->wait();
        }
    }
};

// test that global controls from different threads with overlapping lifetimes
// still keep parallelism under control
void TestMultipleControls()
{
    blocking_task_scheduler_init tsi(2); // to prevent auto-initialization
    Harness::SpinBarrier barrier(2);
    NativeParallelFor( 2, TestMultipleControlsRun(&barrier) );
}

#if __TBB_TASK_PRIORITY
// enqueued tasks with priority below the current one must not be forgotten
// when enforced priority for enqueue is enabled
void TestForgottenEnqueuedTasks()
{
    tbb::task_scheduler_init tsi(2);
    tbb::atomic<int> counter;
    tbb::atomic<bool> waitFlag;

    waitFlag = false;
    counter = 0;
    tbb::task &r = *new( tbb::task::allocate_root() ) tbb::empty_task;
    r.set_ref_count(3);
    for (int i=0; i<2; i++) {
        tbb::task &t = *new( r.allocate_child() ) WaiterTask(&waitFlag);
        tbb::task::spawn(t);
    }
    // all workers are occupied by blocked WaiterTask()
    FFTask* t = new( tbb::task::allocate_root() ) FFTask(&counter);
    tbb::task::enqueue(*t, tbb::priority_low);
    {
        tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1);
        waitFlag = true; // WaiterTask() is done, workers are ready for reuse
        while (!counter) // wait until FFTask() has executed
            __TBB_Yield();
    }
    r.wait_for_all();
    tbb::task::destroy(r);
}
#endif /* __TBB_TASK_PRIORITY */

int TestMain()
{
    TestTaskEnqueue();
    TestConcurrentArenas();
    TestMultipleControls();
    TestNoUnwantedEnforced();
    TestParallelismRestored();
#if __TBB_TASK_PRIORITY
    TestForgottenEnqueuedTasks();
#endif
    const unsigned h_c = tbb::tbb_thread::hardware_concurrency();
    bool excessHC;
    {
        tbb::task_scheduler_init t(h_c+1);
        excessHC = Harness::ExactConcurrencyLevel::isEqual(h_c+1);
    }
    if (h_c>2)
        TestWorkers(h_c-1);
    if (excessHC) // requires hardware concurrency +1, otherwise hangs
        TestWorkers(h_c+1);
    if (excessHC || h_c >= 2)
        TestWorkers(2);
    if (excessHC || h_c >= 3)
        TestWorkers(3);
    TestWorkersConstraints();
    TestConcurrentSetUseConcurrency();
    TestInvalidParallelism();
    TestAutoInit(); // auto-initialization is done at this point

    size_t default_ss = tbb::global_control::active_value(tbb::global_control::thread_stack_size);
    ASSERT(default_ss, NULL);

// it's impossible to change the stack size for Windows 8 Store* apps, so skip the tests
#if !(__TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00))
    TestStackSizeSimpleControl();
    TestStackSizeThreadsControl();
#endif
    TestTooBigStack();
    ASSERT(default_ss == tbb::global_control::active_value(tbb::global_control::thread_stack_size), NULL);
    return Harness::Done;
}