1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #define TBB_PREVIEW_WAITING_FOR_WORKERS 1 |
18 | #include "tbb/global_control.h" |
19 | #include "harness.h" |
20 | #include "tbb/task_scheduler_observer.h" |
21 | |
22 | const size_t MB = 1024*1024; |
23 | const double BARRIER_TIMEOUT = 10.; |
24 | |
25 | void TestStackSizeSimpleControl() |
26 | { |
27 | { |
28 | tbb::global_control s0(tbb::global_control::thread_stack_size, 1*MB); |
29 | |
30 | { |
31 | tbb::global_control s1(tbb::global_control::thread_stack_size, 8*MB); |
32 | |
33 | ASSERT(8*MB == tbb::global_control::active_value(tbb::global_control::thread_stack_size), NULL); |
34 | } |
35 | ASSERT(1*MB == tbb::global_control::active_value(tbb::global_control::thread_stack_size), NULL); |
36 | } |
37 | } |
38 | |
39 | #include "harness_concurrency_tracker.h" |
40 | #include "tbb/task_scheduler_init.h" |
41 | #include <limits.h> // for UINT_MAX |
42 | |
43 | struct StackSizeRun: NoAssign { |
44 | int num_threads; |
45 | Harness::SpinBarrier *barr1, *barr2; |
46 | |
47 | StackSizeRun(int threads, Harness::SpinBarrier *b1, Harness::SpinBarrier *b2) : |
48 | num_threads(threads), barr1(b1), barr2(b2) {} |
49 | void operator()( int id ) const { |
50 | tbb::global_control s1(tbb::global_control::thread_stack_size, (1+id)*MB); |
51 | |
52 | barr1->timed_wait(BARRIER_TIMEOUT); |
53 | |
54 | ASSERT(num_threads*MB == tbb::global_control::active_value(tbb::global_control::thread_stack_size), NULL); |
55 | barr2->timed_wait(BARRIER_TIMEOUT); |
56 | } |
57 | }; |
58 | |
59 | void TestStackSizeThreadsControl() |
60 | { |
61 | int threads = 4; |
62 | Harness::SpinBarrier barr1(threads), barr2(threads); |
63 | NativeParallelFor( threads, StackSizeRun(threads, &barr1, &barr2) ); |
64 | } |
65 | |
66 | void RunWorkersLimited(int tsi_max_threads, size_t parallelism, bool wait) |
67 | { |
68 | tbb::global_control s(tbb::global_control::max_allowed_parallelism, parallelism); |
69 | // try both configuration with already sleeping workers and with not yet sleeping |
70 | if (wait) |
71 | Harness::Sleep(100); |
72 | const size_t expected_threads = tsi_max_threads>0? |
73 | min( (unsigned)tsi_max_threads, parallelism ) |
74 | : ( tbb::tbb_thread::hardware_concurrency()==1? 1 : parallelism ); |
75 | Harness::ExactConcurrencyLevel::check(expected_threads); |
76 | } |
77 | |
78 | class blocking_task_scheduler_init { |
79 | tbb::task_scheduler_init init; |
80 | public: |
81 | blocking_task_scheduler_init(int num_threads = tbb::task_scheduler_init::automatic) : init(num_threads) {} |
82 | ~blocking_task_scheduler_init() { |
83 | bool ok = init.blocking_terminate(std::nothrow); |
84 | ASSERT(ok, "blocking_terminate has failed" ); |
85 | } |
86 | }; |
87 | |
88 | void TSI_and_RunWorkers(int tsi_max_threads, size_t parallelism, size_t max_value) |
89 | { |
90 | blocking_task_scheduler_init tsi(tsi_max_threads); |
91 | size_t active = tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism); |
92 | ASSERT(active == max(2U, max_value), "active_value must not be changed by task_scheduler_init" ); |
93 | RunWorkersLimited(tsi_max_threads, parallelism, /*wait=*/false); |
94 | } |
95 | |
96 | #include "tbb/tbb_thread.h" |
97 | |
98 | void TestWorkers(size_t curr_par) |
99 | { |
100 | const size_t max_parallelism = |
101 | tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism); |
102 | ASSERT(max(2U, tbb::tbb_thread::hardware_concurrency()) == max_parallelism, NULL); |
103 | { |
104 | const unsigned h_c = tbb::tbb_thread::hardware_concurrency(); |
105 | tbb::global_control c(tbb::global_control::max_allowed_parallelism, curr_par); |
106 | size_t v = tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism); |
107 | ASSERT(!curr_par || max((size_t)2, curr_par) == v, NULL); |
108 | if (h_c > 1) |
109 | TSI_and_RunWorkers(tbb::task_scheduler_init::automatic, min(h_c, curr_par), curr_par); |
110 | if (curr_par) // do not call task_scheduler_init t(0); |
111 | TSI_and_RunWorkers((int)curr_par, curr_par, curr_par); |
112 | if (curr_par > 2) { // check that min(tsi, parallelism) is active |
113 | TSI_and_RunWorkers((int)curr_par-1, curr_par, curr_par); |
114 | TSI_and_RunWorkers((int)curr_par, curr_par-1, curr_par); |
115 | } |
116 | // check constrains on control's value: it can't be increased |
117 | tbb::global_control c1(tbb::global_control::max_allowed_parallelism, curr_par+1); |
118 | v = tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism); |
119 | if (curr_par) |
120 | ASSERT(max(2U, curr_par) == v, "It's impossible to increase maximal parallelism." ); |
121 | else |
122 | ASSERT(2 == v, NULL); |
123 | } |
124 | ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism) |
125 | == max_parallelism, |
126 | "max parallelism has been restored successfully after decreasing/increasing" ); |
127 | } |
128 | |
129 | void TestWorkersConstraints() { |
130 | const size_t max_parallelism = |
131 | tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism); |
132 | blocking_task_scheduler_init tsi; |
133 | if (max_parallelism > 3) { |
134 | tbb::global_control c(tbb::global_control::max_allowed_parallelism, max_parallelism-1); |
135 | ASSERT(max_parallelism-1 == |
136 | tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism), |
137 | "Allowed parallelism must be decreasable." ); |
138 | tbb::global_control c1(tbb::global_control::max_allowed_parallelism, max_parallelism-2); |
139 | ASSERT(max_parallelism-2 == |
140 | tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism), |
141 | "Allowed parallelism must be decreasable." ); |
142 | } |
143 | const size_t limit_par = min(max_parallelism, 4U); |
144 | // check that constrains are really met |
145 | for (int wait=0; wait<2; wait++) { |
146 | for (size_t num=2; num<limit_par; num++) |
147 | RunWorkersLimited(tbb::task_scheduler_init::automatic, num, wait==1); |
148 | for (size_t num=limit_par; num>1; num--) |
149 | RunWorkersLimited(tbb::task_scheduler_init::automatic, num, wait==1); |
150 | } |
151 | } |
152 | |
153 | struct DummyBody { |
154 | void operator()(int) const { |
155 | __TBB_Pause(1); |
156 | } |
157 | }; |
158 | |
159 | void RunParallelWork() { |
160 | const int LOOP_ITERS = 10*1000; |
161 | tbb::parallel_for(0, LOOP_ITERS, DummyBody(), tbb::simple_partitioner()); |
162 | } |
163 | |
164 | struct SetUseRun: NoAssign { |
165 | Harness::SpinBarrier *barr; |
166 | |
167 | SetUseRun(Harness::SpinBarrier *b) : barr(b) {} |
168 | void operator()( int id ) const { |
169 | if (id == 0) { |
170 | for (int i=0; i<10; i++) { |
171 | blocking_task_scheduler_init tsi; |
172 | RunParallelWork(); |
173 | barr->timed_wait(BARRIER_TIMEOUT); |
174 | } |
175 | } else { |
176 | for (int i=0; i<10; i++) { |
177 | tbb::global_control c(tbb::global_control::max_allowed_parallelism, 8); |
178 | barr->timed_wait(BARRIER_TIMEOUT); |
179 | } |
180 | } |
181 | } |
182 | }; |
183 | |
184 | void TestConcurrentSetUseConcurrency() |
185 | { |
186 | Harness::SpinBarrier barr(2); |
187 | NativeParallelFor( 2, SetUseRun(&barr) ); |
188 | } |
189 | |
190 | // check number of workers after autoinitialization |
191 | void TestAutoInit() |
192 | { |
193 | const size_t max_parallelism = |
194 | tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism); |
195 | const unsigned expected_threads = tbb::tbb_thread::hardware_concurrency()==1? |
196 | 1 : (unsigned)max_parallelism; |
197 | Harness::ExactConcurrencyLevel::check(expected_threads); |
198 | ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism) |
199 | == max_parallelism, "max_allowed_parallelism must not be changed after auto init" ); |
200 | if (max_parallelism > 2) { |
201 | // after autoinit it's possible to decrease workers number |
202 | tbb::global_control s(tbb::global_control::max_allowed_parallelism, max_parallelism-1); |
203 | Harness::ExactConcurrencyLevel::check(max_parallelism-1); |
204 | } |
205 | } |
206 | |
207 | // need this to use TRY_BAD_EXPR_ENABLED when TBB_USE_ASSERT is not defined |
208 | #undef TBB_USE_ASSERT |
209 | #define TBB_USE_ASSERT 1 |
210 | |
211 | #include "harness_bad_expr.h" |
212 | |
213 | void TestInvalidParallelism() |
214 | { |
215 | #if TRY_BAD_EXPR_ENABLED |
216 | const size_t max_parallelism = |
217 | tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism); |
218 | { |
219 | tbb::set_assertion_handler( AssertionFailureHandler ); |
220 | TRY_BAD_EXPR( tbb::global_control c(tbb::global_control::max_allowed_parallelism, 0), |
221 | "max_allowed_parallelism cannot be 0." ); |
222 | tbb::set_assertion_handler( ReportError ); |
223 | ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism) |
224 | == max_parallelism, NULL); |
225 | } |
226 | { |
227 | const size_t P = 2; |
228 | tbb::global_control c(tbb::global_control::max_allowed_parallelism, P); |
229 | ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism) |
230 | == P, NULL); |
231 | tbb::set_assertion_handler( AssertionFailureHandler ); |
232 | TRY_BAD_EXPR( tbb::global_control cZ(tbb::global_control::max_allowed_parallelism, 0), |
233 | "max_allowed_parallelism cannot be 0." ); |
234 | tbb::set_assertion_handler( ReportError ); |
235 | ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism) |
236 | == P, NULL); |
237 | } |
238 | ASSERT(tbb::global_control::active_value(tbb::global_control::max_allowed_parallelism) |
239 | == max_parallelism, NULL); |
240 | #endif /* TRY_BAD_EXPR_ENABLED */ |
241 | } |
242 | |
243 | void TestTooBigStack() |
244 | { |
245 | #if __TBB_x86_32 |
246 | const size_t stack_sizes[] = {512*MB, 2*1024*MB, UINT_MAX}; |
247 | #else |
248 | const size_t stack_sizes[] = {512*MB, 2*1024*MB, UINT_MAX, 10LU*1024*MB}; |
249 | #endif |
250 | |
251 | #if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00) |
252 | size_t default_ss = tbb::global_control::active_value(tbb::global_control::thread_stack_size); |
253 | #endif |
254 | for (unsigned i = 0; i<Harness::array_length(stack_sizes); i++) { |
255 | // No stack size setting for Windows 8 Store* apps, skip it |
256 | #if TRY_BAD_EXPR_ENABLED && __TBB_x86_64 && (_WIN32 || _WIN64) && !(__TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)) |
257 | if (stack_sizes[i] != (unsigned)stack_sizes[i]) { |
258 | size_t curr_ss = tbb::global_control::active_value(tbb::global_control::thread_stack_size); |
259 | tbb::set_assertion_handler( AssertionFailureHandler ); |
260 | TRY_BAD_EXPR( tbb::global_control s1(tbb::global_control::thread_stack_size, stack_sizes[i]), "Stack size is limited to unsigned int range" ); |
261 | tbb::set_assertion_handler( ReportError ); |
262 | ASSERT(curr_ss == tbb::global_control::active_value(tbb::global_control::thread_stack_size), "Changing of stack size is not expected." ); |
263 | continue; |
264 | } |
265 | #endif |
266 | tbb::global_control s1(tbb::global_control::thread_stack_size, stack_sizes[i]); |
267 | size_t actual_stack_sz = tbb::global_control::active_value(tbb::global_control::thread_stack_size); |
268 | #if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00) |
269 | ASSERT(actual_stack_sz == default_ss, "It's ignored for Windows 8.x Store* apps" ); |
270 | #else |
271 | ASSERT(actual_stack_sz==stack_sizes[i], NULL); |
272 | #endif |
273 | } |
274 | } |
275 | |
276 | struct ParallelForRun: NoAssign { |
277 | int num_threads; |
278 | Harness::SpinBarrier *barr1, *barr2; |
279 | |
280 | ParallelForRun(Harness::SpinBarrier *b1, Harness::SpinBarrier *b2) : |
281 | barr1(b1), barr2(b2) {} |
282 | void operator()( int /*id*/ ) const { |
283 | barr1->timed_wait(BARRIER_TIMEOUT); |
284 | RunParallelWork(); |
285 | barr2->timed_wait(BARRIER_TIMEOUT); |
286 | } |
287 | }; |
288 | |
289 | class FFTask: public tbb::task { |
290 | tbb::atomic<int> *counter; |
291 | tbb::task* execute() __TBB_override { |
292 | (*counter)++; |
293 | return NULL; |
294 | } |
295 | public: |
296 | FFTask(tbb::atomic<int> *counter_) : counter(counter_) {} |
297 | }; |
298 | |
299 | class WaiterTask: public tbb::task { |
300 | tbb::atomic<bool> *flag; |
301 | tbb::task* execute() __TBB_override { |
302 | while(!*flag) |
303 | __TBB_Yield(); |
304 | return NULL; |
305 | } |
306 | public: |
307 | WaiterTask(tbb::atomic<bool> *flag_) : flag(flag_) {} |
308 | }; |
309 | |
310 | class WorkAndEnqueueTask: public tbb::task { |
311 | tbb::atomic<int> *counter; |
312 | tbb::atomic<bool> *signalToLeave; |
313 | tbb::task* execute() __TBB_override { |
314 | RunParallelWork(); |
315 | *signalToLeave = true; |
316 | for (int i=0; i<ENQUEUE_TASKS; i++) { |
317 | FFTask* t = new( tbb::task::allocate_root() ) FFTask(counter); |
318 | tbb::task::enqueue(*t); |
319 | } |
320 | |
321 | return NULL; |
322 | } |
323 | public: |
324 | static const int ENQUEUE_TASKS = 10; |
325 | WorkAndEnqueueTask(tbb::atomic<int> *counter_, tbb::atomic<bool> *signal_) |
326 | : counter(counter_), signalToLeave(signal_) {} |
327 | }; |
328 | |
329 | #if __TBB_TASK_PRIORITY |
330 | tbb::priority_t getPriorityByInt(int i) { |
331 | return i%3==0? tbb::priority_low : (i%3==1? tbb::priority_normal : |
332 | tbb::priority_high); |
333 | } |
334 | #endif |
335 | |
336 | class FFTasksRun: NoAssign { |
337 | void enqTasks(int id) const { |
338 | for (int i=0; i<ITERS; i++) { |
339 | FFTask* t = new( tbb::task::allocate_root() ) FFTask(cnt); |
340 | #if __TBB_TASK_PRIORITY |
341 | tbb::priority_t p = getPriorityByInt(i+id); |
342 | tbb::task::enqueue(*t, p); |
343 | #else |
344 | tbb::internal::suppress_unused_warning(id); |
345 | tbb::task::enqueue(*t); |
346 | #endif |
347 | } |
348 | } |
349 | public: |
350 | static const int ITERS = 20; |
351 | Harness::SpinBarrier *barr; |
352 | tbb::atomic<int> *cnt; |
353 | |
354 | FFTasksRun(Harness::SpinBarrier *b, tbb::atomic<int> *c) : |
355 | barr(b), cnt(c) {} |
356 | void operator()(int id) const { |
357 | if (id) |
358 | enqTasks(id); |
359 | barr->wait(); |
360 | if (!id) |
361 | enqTasks(id); |
362 | } |
363 | }; |
364 | |
365 | void TestTaskEnqueue() |
366 | { |
367 | { |
368 | blocking_task_scheduler_init tsi(20); |
369 | tbb::atomic<int> flag; |
370 | tbb::atomic<bool> taskDoneFlag; |
371 | flag = 0; |
372 | taskDoneFlag = false; |
373 | |
374 | for (int i=0; i<10; i++) { |
375 | WaiterTask* w = new( tbb::task::allocate_root() ) WaiterTask(&taskDoneFlag); |
376 | tbb::task::enqueue(*w); |
377 | } |
378 | tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1); |
379 | taskDoneFlag = true; |
380 | |
381 | FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag); |
382 | tbb::task::enqueue(*t); |
383 | while(!flag) |
384 | __TBB_Yield(); |
385 | } |
386 | { |
387 | blocking_task_scheduler_init tsi(1); |
388 | tbb::atomic<int> flag; |
389 | tbb::atomic<bool> taskDoneFlag; |
390 | flag = 0; |
391 | taskDoneFlag = false; |
392 | |
393 | WaiterTask* w = new( tbb::task::allocate_root() ) WaiterTask(&taskDoneFlag); |
394 | tbb::task::enqueue(*w); |
395 | taskDoneFlag = true; |
396 | |
397 | tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1); |
398 | |
399 | FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag); |
400 | tbb::task::enqueue(*t); |
401 | while(!flag) |
402 | __TBB_Yield(); |
403 | } |
404 | { |
405 | blocking_task_scheduler_init tsi(2); |
406 | tbb::atomic<int> flag; |
407 | flag = 0; |
408 | |
409 | tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1); |
410 | |
411 | FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag); |
412 | tbb::task::enqueue(*t); |
413 | while(!flag) |
414 | __TBB_Yield(); |
415 | } |
416 | { |
417 | blocking_task_scheduler_init tsi(2); |
418 | tbb::atomic<int> flag; |
419 | flag = 0; |
420 | |
421 | FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag); |
422 | tbb::task::enqueue(*t); |
423 | tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1); |
424 | |
425 | while(!flag) |
426 | __TBB_Yield(); |
427 | } |
428 | |
429 | tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1); |
430 | |
431 | { // check that enqueue() guarantee mandatory parallelism |
432 | blocking_task_scheduler_init tsi(1); |
433 | tbb::atomic<int> flag; |
434 | flag = 0; |
435 | |
436 | FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag); |
437 | tbb::task::enqueue(*t); |
438 | while(!flag) |
439 | __TBB_Yield(); |
440 | } |
441 | { |
442 | tbb::atomic<int> flag; |
443 | flag = 0; |
444 | { |
445 | blocking_task_scheduler_init tsi(1); |
446 | |
447 | for (int i=0; i<10; i++) { |
448 | FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag); |
449 | #if __TBB_TASK_PRIORITY |
450 | const tbb::priority_t p = getPriorityByInt(i); |
451 | tbb::task::enqueue(*t, p); |
452 | #else |
453 | tbb::task::enqueue(*t); |
454 | #endif |
455 | } |
456 | } |
457 | ASSERT(flag==10, "The tasks must be terminated when task_scheduler_init destroyed." ); |
458 | } |
459 | const unsigned threads = 2; |
460 | { |
461 | blocking_task_scheduler_init tsi(1); |
462 | Harness::SpinBarrier barr1(threads), barr2(threads); |
463 | RunWorkersLimited(1, 1, false); |
464 | |
465 | NativeParallelFor( threads, ParallelForRun(&barr1, &barr2) ); |
466 | } |
467 | |
468 | tbb::atomic<int> counter; |
469 | counter = 0; |
470 | { |
471 | blocking_task_scheduler_init tsi(1); |
472 | Harness::SpinBarrier barr(threads); |
473 | RunWorkersLimited(1, 1, false); |
474 | |
475 | NativeParallelFor( threads, FFTasksRun(&barr, &counter) ); |
476 | } |
477 | ASSERT(counter == threads*FFTasksRun::ITERS, "All tasks must be done when task_scheduler_init destroyed." ); |
478 | counter = 0; |
479 | { // an enqueued task can enqueue other tasks and calls parallel_for |
480 | tbb::atomic<bool> signalToLeave; |
481 | blocking_task_scheduler_init tsi(1); |
482 | |
483 | signalToLeave = false; |
484 | WorkAndEnqueueTask *t = new( tbb::task::allocate_root() ) |
485 | WorkAndEnqueueTask(&counter, &signalToLeave); |
486 | tbb::task::enqueue(*t); |
487 | RunParallelWork(); |
488 | |
489 | while (!signalToLeave) |
490 | __TBB_Yield(); |
491 | } |
492 | ASSERT(counter == WorkAndEnqueueTask::ENQUEUE_TASKS, "All tasks must be done when task_scheduler_init destroyed." ); |
493 | } |
494 | |
495 | class CountWorkersTask: public tbb::task { |
496 | tbb::atomic<bool> *flag; |
497 | // count unique worker threads |
498 | static tbb::combinable<size_t> uniqThreads; |
499 | |
500 | tbb::task* execute() __TBB_override { |
501 | uniqThreads.local() = 1; |
502 | Harness::Sleep(10); |
503 | *flag = 1; |
504 | return NULL; |
505 | } |
506 | public: |
507 | CountWorkersTask(tbb::atomic<bool> *flag_) : flag(flag_) {} |
508 | static size_t observedThreads() { |
509 | return uniqThreads.combine(std::plus<size_t>()); |
510 | } |
511 | }; |
512 | |
513 | tbb::combinable<size_t> CountWorkersTask::uniqThreads; |
514 | |
515 | tbb::atomic<int> activeArenas; |
516 | |
517 | class ArenaObserver: public tbb::task_scheduler_observer { |
518 | public: |
519 | ArenaObserver() : tbb::task_scheduler_observer(/*local=*/true) { |
520 | } |
521 | void on_scheduler_entry( bool worker ) __TBB_override { |
522 | if (worker) { |
523 | ++activeArenas; |
524 | } |
525 | } |
526 | void on_scheduler_exit( bool worker ) __TBB_override { |
527 | if (worker) { |
528 | --activeArenas; |
529 | } |
530 | } |
531 | }; |
532 | |
533 | ArenaObserver observers[2]; |
534 | |
535 | struct ArenasObserveRun: NoAssign { |
536 | Harness::SpinBarrier *barr; |
537 | |
538 | ArenasObserveRun(Harness::SpinBarrier *b) : barr(b) {} |
539 | void operator()( int id ) const { |
540 | observers[id].observe(true); |
541 | ArenaObserver o; |
542 | tbb::atomic<bool> flag; |
543 | flag = false; |
544 | |
545 | CountWorkersTask* t = new( tbb::task::allocate_root() ) |
546 | CountWorkersTask(&flag); |
547 | barr->wait(); |
548 | tbb::task::enqueue(*t); |
549 | while(!flag) |
550 | __TBB_Yield(); |
551 | } |
552 | }; |
553 | |
554 | struct ArenaRun: NoAssign { |
555 | tbb::atomic<int> *counter; |
556 | |
557 | ArenaRun(tbb::atomic<int> *counter_) : counter(counter_) {} |
558 | void operator()() const { |
559 | (*counter)++; |
560 | } |
561 | }; |
562 | |
563 | struct ArenaUserRun: NoAssign { |
564 | static const int ENQUEUE_TASKS = 10; |
565 | tbb::task_arena *arena; |
566 | Harness::SpinBarrier *barr; |
567 | tbb::atomic<int> *counter; |
568 | |
569 | ArenaUserRun(tbb::task_arena *a, Harness::SpinBarrier *b, tbb::atomic<int> *c) : |
570 | arena(a), barr(b), counter(c) {} |
571 | void operator()( int id ) const { |
572 | |
573 | for (int i=0; i<ENQUEUE_TASKS; i++) |
574 | arena->enqueue(ArenaRun(counter)); |
575 | barr->wait(); |
576 | if (!id) |
577 | arena->terminate(); |
578 | } |
579 | }; |
580 | |
581 | void TestConcurrentArenas() |
582 | { |
583 | Harness::SpinBarrier barrier(2); |
584 | tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1); |
585 | { |
586 | blocking_task_scheduler_init tsi(2); |
587 | ArenaObserver observer; |
588 | observer.observe(true); |
589 | |
590 | Harness::ExactConcurrencyLevel::check(1); // must have 0 worker threads |
591 | |
592 | NativeParallelFor( 2, ArenasObserveRun(&barrier) ); |
593 | ASSERT(1 == CountWorkersTask::observedThreads(), |
594 | "Single worker is expecting to serve mandatory parallelism." ); |
595 | while(activeArenas) // wait till single worker termination |
596 | __TBB_Yield(); |
597 | |
598 | // check that without mandatory parallelism, still have 0 worker threads |
599 | Harness::ExactConcurrencyLevel::check(1); |
600 | } |
601 | tbb::atomic<int> counter; |
602 | counter = 0; |
603 | { |
604 | blocking_task_scheduler_init tsi(1); |
605 | tbb::task_arena arena(2); |
606 | |
607 | NativeParallelFor( 2, ArenaUserRun(&arena, &barrier, &counter) ); |
608 | } |
609 | ASSERT(counter == 2*ArenaUserRun::ENQUEUE_TASKS, "All tasks must be done." ); |
610 | } |
611 | |
612 | void TestParallelismRestored() |
613 | { |
614 | const int TASKS = 5; |
615 | tbb::atomic<int> counter; |
616 | counter = 0; |
617 | { |
618 | const int P = 4; |
619 | blocking_task_scheduler_init tsi(P); |
620 | { |
621 | tbb::global_control s(tbb::global_control::max_allowed_parallelism, 1); |
622 | Harness::ExactConcurrencyLevel::check(1); |
623 | // create enforced concurrency in the arena |
624 | for (int i=0; i<TASKS; i++) { |
625 | FFTask* t = new( tbb::task::allocate_root() ) FFTask(&counter); |
626 | tbb::task::enqueue(*t); |
627 | } |
628 | } |
629 | // global control is off, check that concurrency P is available |
630 | Harness::ExactConcurrencyLevel::check(P); |
631 | } |
632 | ASSERT(counter==TASKS, "The tasks must be executed at this point." ); |
633 | } |
634 | |
635 | class NoUnwantedEnforcedRun { |
636 | Harness::SpinBarrier *globalBarrier; |
637 | public: |
638 | NoUnwantedEnforcedRun(Harness::SpinBarrier *b) : globalBarrier(b) {} |
639 | void operator()( int id ) const { |
640 | Harness::SpinBarrier barr(1); |
641 | |
642 | tbb::combinable<size_t> uniqThreads; |
643 | Harness::ExactConcurrencyLevel::check(1); |
644 | globalBarrier->wait(); |
645 | if (id) { |
646 | for (int i=0; i<20; i++) { |
647 | Harness::ExactConcurrencyLevel::check(1); // no workers expected in the thread |
648 | } |
649 | } else { |
650 | // create enforced concurrency in a separate thread, thus provoke enforced worker without |
651 | // work to do to join arena with parallel_for |
652 | for (int i=0; i<10; i++) { |
653 | tbb::atomic<int> flag; |
654 | flag = 0; |
655 | FFTask* t = new( tbb::task::allocate_root() ) FFTask(&flag); |
656 | tbb::task::enqueue(*t); |
657 | Harness::ExactConcurrencyLevel::checkLessOrEqual(2, &uniqThreads); |
658 | size_t seen = uniqThreads.combine(std::plus<size_t>()); |
659 | ASSERT(seen==1 || seen==2, NULL); |
660 | while(!flag) |
661 | __TBB_Yield(); |
662 | } |
663 | } |
664 | } |
665 | }; |
666 | |
667 | // test that enforced concurrency from one thread doesn't affect another |
668 | void TestNoUnwantedEnforced() |
669 | { |
670 | Harness::SpinBarrier barrier(2); |
671 | tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1); |
672 | blocking_task_scheduler_init tsi(4); |
673 | NativeParallelFor( 2, NoUnwantedEnforcedRun(&barrier) ); |
674 | } |
675 | |
676 | class TestMultipleControlsRun { |
677 | Harness::SpinBarrier *barrier; |
678 | public: |
679 | TestMultipleControlsRun(Harness::SpinBarrier *b) : barrier(b) {} |
680 | void operator()( int id ) const { |
681 | barrier->wait(); |
682 | if (id) { |
683 | { |
684 | tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1); |
685 | Harness::ExactConcurrencyLevel::check(1); |
686 | barrier->wait(); |
687 | } |
688 | Harness::ExactConcurrencyLevel::check(1); |
689 | barrier->wait(); |
690 | { |
691 | tbb::global_control c(tbb::global_control::max_allowed_parallelism, 2); |
692 | Harness::ExactConcurrencyLevel::check(1); |
693 | barrier->wait(); |
694 | Harness::ExactConcurrencyLevel::check(2); |
695 | barrier->wait(); |
696 | } |
697 | } else { |
698 | { |
699 | Harness::ExactConcurrencyLevel::check(1); |
700 | tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1); |
701 | barrier->wait(); |
702 | Harness::ExactConcurrencyLevel::check(1); |
703 | barrier->wait(); |
704 | Harness::ExactConcurrencyLevel::check(1); |
705 | barrier->wait(); |
706 | } |
707 | Harness::ExactConcurrencyLevel::check(2); |
708 | barrier->wait(); |
709 | } |
710 | } |
711 | }; |
712 | |
713 | // test that global controls from different thread with overlapping lifetime |
714 | // still keep parallelism under control |
715 | void TestMultipleControls() |
716 | { |
717 | blocking_task_scheduler_init tsi(2); // to prevent autoinitialization |
718 | Harness::SpinBarrier barrier(2); |
719 | NativeParallelFor( 2, TestMultipleControlsRun(&barrier) ); |
720 | } |
721 | |
722 | // enqueued tasks with priority below current must not be forgotten, |
723 | // when enqueue enforced priority is enabled |
724 | void TestForgottenEnqueuedTasks() |
725 | { |
726 | tbb::task_scheduler_init tsi(2); |
727 | tbb::atomic<int> counter; |
728 | tbb::atomic<bool> waitFlag; |
729 | |
730 | waitFlag = false; |
731 | counter = 0; |
732 | tbb::task &r = *new( tbb::task::allocate_root() ) tbb::empty_task; |
733 | r.set_ref_count(3); |
734 | for (int i=0; i<2; i++) { |
735 | tbb::task &t = *new( r.allocate_child() ) WaiterTask(&waitFlag); |
736 | tbb::task::spawn(t); |
737 | } |
738 | // all workers are occupied by blocked WaiterTask() |
739 | FFTask* t = new( tbb::task::allocate_root() ) FFTask(&counter); |
740 | tbb::task::enqueue(*t, tbb::priority_low); |
741 | { |
742 | tbb::global_control c(tbb::global_control::max_allowed_parallelism, 1); |
743 | waitFlag = true; // WaiterTask() done, workers ready to use |
744 | while (!counter) // wait till FFTask() executed |
745 | __TBB_Yield(); |
746 | } |
747 | r.wait_for_all(); |
748 | tbb::task::destroy(r); |
749 | } |
750 | |
751 | int TestMain() |
752 | { |
753 | TestTaskEnqueue(); |
754 | TestConcurrentArenas(); |
755 | TestMultipleControls(); |
756 | TestNoUnwantedEnforced(); |
757 | const unsigned h_c = tbb::tbb_thread::hardware_concurrency(); |
758 | bool excessHC; |
759 | { |
760 | tbb::task_scheduler_init t(h_c+1); |
761 | excessHC = Harness::ExactConcurrencyLevel::isEqual(h_c+1); |
762 | } |
763 | if (h_c>2) |
764 | TestWorkers(h_c-1); |
765 | if (excessHC) // requires hardware concurrency +1, otherwise hangs |
766 | TestWorkers(h_c+1); |
767 | if (excessHC || h_c >= 2) |
768 | TestWorkers(2); |
769 | if (excessHC || h_c >= 3) |
770 | TestWorkers(3); |
771 | TestWorkersConstraints(); |
772 | TestConcurrentSetUseConcurrency(); |
773 | TestInvalidParallelism(); |
774 | TestAutoInit(); // auto-initialization done at this point |
775 | |
776 | size_t default_ss = tbb::global_control::active_value(tbb::global_control::thread_stack_size); |
777 | ASSERT(default_ss, NULL); |
778 | |
779 | // it's impossible to change stack size for Windows 8 Store* apps, so skip the tests |
780 | #if !(__TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)) |
781 | TestStackSizeSimpleControl(); |
782 | TestStackSizeThreadsControl(); |
783 | #endif |
784 | TestTooBigStack(); |
785 | ASSERT(default_ss == tbb::global_control::active_value(tbb::global_control::thread_stack_size), NULL); |
786 | return Harness::Done; |
787 | } |
788 | |