1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
// __TBB_EXTRA_DEBUG enables task_arena::debug_wait_until_empty(), which is used throughout this test.
#define __TBB_EXTRA_DEBUG 1
18 | |
19 | #include <stdexcept> |
20 | #include <cstdlib> |
21 | #include <cstdio> |
22 | #include <vector> |
23 | #include <set> |
24 | |
25 | #include "harness_fp.h" |
26 | |
27 | #if __TBB_TASK_ISOLATION |
28 | // Whitebox stuff for TestIsolatedExecuteNS::ContinuationTest(). |
// TODO: Consider a better approach than the whitebox one.
30 | #define private public |
31 | #include "tbb/task.h" |
32 | #undef private |
33 | #endif /* __TBB_TASK_ISOLATION */ |
34 | |
35 | #include "tbb/task_arena.h" |
36 | #include "tbb/atomic.h" |
37 | #include "tbb/task_scheduler_observer.h" |
38 | #include "tbb/task_scheduler_init.h" |
39 | #include "tbb/parallel_for.h" |
40 | #include "tbb/blocked_range.h" |
41 | #include "tbb/enumerable_thread_specific.h" |
42 | |
43 | #include "harness_assert.h" |
44 | #include "harness.h" |
45 | #include "harness_barrier.h" |
46 | |
47 | #include "tbb/tbb_thread.h" |
48 | |
49 | #if _MSC_VER |
// Works around __TBB_NO_IMPLICIT_LINKAGE; __TBB_LIB_NAME should be defined (in makefiles)
51 | #pragma comment(lib, __TBB_STRING(__TBB_LIB_NAME)) |
52 | #endif |
53 | |
54 | #include "tbb/global_control.h" |
55 | //--------------------------------------------------// |
56 | // Test that task_arena::initialize and task_arena::terminate work when doing nothing else. |
57 | /* maxthread is treated as the biggest possible concurrency level. */ |
58 | void InitializeAndTerminate( int maxthread ) { |
59 | __TBB_TRY { |
60 | for( int i=0; i<200; ++i ) { |
61 | switch( i&3 ) { |
62 | // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions. |
63 | // Explicit initialization can either keep the original values or change those. |
64 | // Arena termination can be explicit or implicit (in the destructor). |
65 | // TODO: extend with concurrency level checks if such a method is added. |
66 | default: { |
67 | tbb::task_arena arena( std::rand() % maxthread + 1 ); |
68 | ASSERT(!arena.is_active(), "arena should not be active until initialized" ); |
69 | arena.initialize(); |
70 | ASSERT(arena.is_active(), NULL); |
71 | arena.terminate(); |
72 | ASSERT(!arena.is_active(), "arena should not be active; it was terminated" ); |
73 | break; |
74 | } |
75 | case 0: { |
76 | tbb::task_arena arena( 1 ); |
77 | ASSERT(!arena.is_active(), "arena should not be active until initialized" ); |
78 | arena.initialize( std::rand() % maxthread + 1 ); // change the parameters |
79 | ASSERT(arena.is_active(), NULL); |
80 | break; |
81 | } |
82 | case 1: { |
83 | tbb::task_arena arena( tbb::task_arena::automatic ); |
84 | ASSERT(!arena.is_active(), NULL); |
85 | arena.initialize(); |
86 | ASSERT(arena.is_active(), NULL); |
87 | break; |
88 | } |
89 | case 2: { |
90 | tbb::task_arena arena; |
91 | ASSERT(!arena.is_active(), "arena should not be active until initialized" ); |
92 | arena.initialize( std::rand() % maxthread + 1 ); |
93 | ASSERT(arena.is_active(), NULL); |
94 | arena.terminate(); |
95 | ASSERT(!arena.is_active(), "arena should not be active; it was terminated" ); |
96 | break; |
97 | } |
98 | } |
99 | } |
100 | } __TBB_CATCH( std::runtime_error& error ) { |
101 | #if TBB_USE_EXCEPTIONS |
102 | REPORT("ERROR: %s\n" , error.what() ); |
103 | #endif /* TBB_USE_EXCEPTIONS */ |
104 | } |
105 | } |
106 | |
107 | //--------------------------------------------------// |
108 | // Definitions used in more than one test |
109 | typedef tbb::blocked_range<int> Range; |
110 | |
// slot_id values: -1 is what current_thread_index() returns outside an arena, -2 is set in on_scheduler_exit() below, -3 means "never set"
112 | static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3); |
113 | |
114 | void ResetTLS() { |
115 | local_id.clear(); |
116 | old_id.clear(); |
117 | slot_id.clear(); |
118 | } |
119 | |
120 | class ArenaObserver : public tbb::task_scheduler_observer { |
121 | int myId; // unique observer/arena id within a test |
122 | int myMaxConcurrency; // concurrency of the associated arena |
123 | int myNumReservedSlots; // reserved slots in the associated arena |
124 | void on_scheduler_entry( bool is_worker ) __TBB_override { |
125 | int current_index = tbb::this_task_arena::current_thread_index(); |
126 | REMARK("a %s #%p is entering arena %d from %d on slot %d\n" , is_worker?"worker" :"master" , |
127 | &local_id.local(), myId, local_id.local(), current_index ); |
128 | ASSERT(current_index<(myMaxConcurrency>1?myMaxConcurrency:2), NULL); |
129 | if(is_worker) ASSERT(current_index>=myNumReservedSlots, NULL); |
130 | |
131 | ASSERT(!old_id.local(), "double call to on_scheduler_entry" ); |
132 | old_id.local() = local_id.local(); |
133 | ASSERT(old_id.local() != myId, "double entry to the same arena" ); |
134 | local_id.local() = myId; |
135 | slot_id.local() = current_index; |
136 | } |
137 | void on_scheduler_exit( bool is_worker ) __TBB_override { |
138 | REMARK("a %s #%p is leaving arena %d to %d\n" , is_worker?"worker" :"master" , |
139 | &local_id.local(), myId, old_id.local()); |
140 | ASSERT(local_id.local() == myId, "nesting of arenas is broken" ); |
141 | ASSERT(slot_id.local() == tbb::this_task_arena::current_thread_index(), NULL); |
142 | //!deprecated, remove when tbb::task_arena::current_thread_index is removed. |
143 | ASSERT(slot_id.local() == tbb::task_arena::current_thread_index(), NULL); |
144 | slot_id.local() = -2; |
145 | local_id.local() = old_id.local(); |
146 | old_id.local() = 0; |
147 | } |
148 | public: |
149 | ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id) |
150 | : tbb::task_scheduler_observer(a) |
151 | , myId(id) |
152 | , myMaxConcurrency(maxConcurrency) |
153 | , myNumReservedSlots(numReservedSlots) { |
154 | ASSERT(myId, NULL); |
155 | observe(true); |
156 | } |
157 | ~ArenaObserver () { |
158 | ASSERT(!old_id.local(), "inconsistent observer state" ); |
159 | } |
160 | }; |
161 | |
162 | struct IndexTrackingBody { // Must be used together with ArenaObserver |
163 | void operator() ( const Range& ) const { |
164 | ASSERT(slot_id.local() == tbb::this_task_arena::current_thread_index(), NULL); |
165 | //!deprecated, remove when tbb::task_arena::current_thread_index is removed. |
166 | ASSERT(slot_id.local() == tbb::task_arena::current_thread_index(), NULL); |
167 | for ( volatile int i = 0; i < 50000; ++i ) |
168 | ; |
169 | } |
170 | }; |
171 | |
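// Functor for task_arena::execute()/enqueue(): runs a nested parallel_for to keep the arena busy,
// then either waits on the barrier (asynchronously to the master thread) or just signals it.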
172 | struct AsynchronousWork : NoAssign { |
173 | Harness::SpinBarrier &my_barrier; |
174 | bool my_is_blocking; |
175 | AsynchronousWork(Harness::SpinBarrier &a_barrier, bool blocking = true) |
176 | : my_barrier(a_barrier), my_is_blocking(blocking) {} |
177 | void operator()() const { |
178 | ASSERT(local_id.local() != 0, "not in explicit arena" ); |
179 | tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner(), *tbb::task::self().group()); |
180 | if(my_is_blocking) my_barrier.timed_wait(10); // must be asynchronous to master thread |
181 | else my_barrier.signal_nowait(); |
182 | } |
183 | }; |
184 | |
185 | //--------------------------------------------------// |
// Test that task_arenas can be created and used concurrently from multiple application threads.
// Also tests arena observers. The idx argument is the index of the application thread running the test body.
188 | void TestConcurrentArenasFunc(int idx) { |
189 | // A regression test for observer activation order: |
190 | // check that arena observer can be activated before local observer |
191 | struct LocalObserver : public tbb::task_scheduler_observer { |
192 | LocalObserver() : tbb::task_scheduler_observer(/*local=*/true) { observe(true); } |
193 | }; |
194 | tbb::task_arena a1; |
195 | a1.initialize(1,0); |
196 | ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test |
197 | ASSERT(o1.is_observing(), "Arena observer has not been activated" ); |
198 | LocalObserver lo; |
199 | ASSERT(lo.is_observing(), "Local observer has not been activated" ); |
200 | tbb::task_arena a2(2,1); |
201 | ArenaObserver o2(a2, 2, 1, idx*2+2); |
202 | ASSERT(o2.is_observing(), "Arena observer has not been activated" ); |
203 | Harness::SpinBarrier barrier(2); |
204 | AsynchronousWork work(barrier); |
205 | a1.enqueue(work); // put async work |
206 | barrier.timed_wait(10); |
207 | a2.enqueue(work); // another work |
208 | a2.execute(work); // my_barrier.timed_wait(10) inside |
209 | a1.debug_wait_until_empty(); |
210 | a2.debug_wait_until_empty(); |
211 | } |
212 | |
213 | void TestConcurrentArenas(int p) { |
214 | ResetTLS(); |
215 | NativeParallelFor( p, &TestConcurrentArenasFunc ); |
216 | } |
217 | |
218 | //--------------------------------------------------// |
219 | // Test multiple application threads working with a single arena at the same time. |
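// Part 1 mixes execute() and enqueue() from many threads (regression test for bugs 1954 & 1971);
// part 2 does the same with a worker kept busy (regression test for bug 1981); part 3 calls
// wait_for_all() for enqueued tasks inside execute(); part 4 checks task_group_context
// propagation across execute(), enqueue() and nested arenas.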
220 | class MultipleMastersPart1 : NoAssign { |
221 | tbb::task_arena &my_a; |
222 | Harness::SpinBarrier &my_b1, &my_b2; |
223 | public: |
224 | MultipleMastersPart1( tbb::task_arena &a, Harness::SpinBarrier &b1, Harness::SpinBarrier &b2) |
225 | : my_a(a), my_b1(b1), my_b2(b2) {} |
226 | void operator()(int) const { |
227 | my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false)); |
228 | my_b1.timed_wait(10); |
229 | // A regression test for bugs 1954 & 1971 |
230 | my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false)); |
231 | } |
232 | }; |
233 | |
234 | class MultipleMastersPart2 : NoAssign { |
235 | tbb::task_arena &my_a; |
236 | Harness::SpinBarrier &my_b; |
237 | public: |
238 | MultipleMastersPart2( tbb::task_arena &a, Harness::SpinBarrier &b) : my_a(a), my_b(b) {} |
239 | void operator()(int) const { |
240 | my_a.execute(AsynchronousWork(my_b, /*blocking=*/false)); |
241 | } |
242 | }; |
243 | |
244 | class MultipleMastersPart3 : NoAssign { |
245 | tbb::task_arena &my_a; |
246 | Harness::SpinBarrier &my_b; |
247 | |
248 | struct Runner : NoAssign { |
249 | tbb::task* const a_task; |
250 | Runner(tbb::task* const t) : a_task(t) {} |
251 | void operator()() const { |
252 | for ( volatile int i = 0; i < 10000; ++i ) |
253 | ; |
254 | a_task->decrement_ref_count(); |
255 | } |
256 | }; |
257 | |
258 | struct Waiter : NoAssign { |
259 | tbb::task* const a_task; |
260 | Waiter(tbb::task* const t) : a_task(t) {} |
261 | void operator()() const { |
262 | a_task->wait_for_all(); |
263 | } |
264 | }; |
265 | |
266 | public: |
267 | MultipleMastersPart3(tbb::task_arena &a, Harness::SpinBarrier &b) |
268 | : my_a(a), my_b(b) {} |
269 | void operator()(int idx) const { |
270 | tbb::empty_task* root_task = new(tbb::task::allocate_root()) tbb::empty_task; |
271 | my_b.timed_wait(10); // increases chances for task_arena initialization contention |
272 | for( int i=0; i<100; ++i) { |
273 | root_task->set_ref_count(2); |
274 | my_a.enqueue(Runner(root_task)); |
275 | my_a.execute(Waiter(root_task)); |
276 | } |
277 | tbb::task::destroy(*root_task); |
278 | REMARK("Master #%d: job completed, wait for others\n" , idx); |
279 | my_b.timed_wait(10); |
280 | } |
281 | }; |
282 | |
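// MultipleMastersPart4 captures the arena's default task_group_context (via Getter) and then
// checks that execute(), enqueue() and nested arenas/algorithms run in the expected context:
// a nested arena gets its own context, while re-entering the original arena restores the
// captured one.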
283 | class MultipleMastersPart4 : NoAssign { |
284 | tbb::task_arena &my_a; |
285 | Harness::SpinBarrier &my_b; |
286 | tbb::task_group_context *my_ag; |
287 | |
288 | struct Getter : NoAssign { |
289 | tbb::task_group_context *& my_g; |
290 | Getter(tbb::task_group_context *&a_g) : my_g(a_g) {} |
291 | void operator()() const { |
292 | my_g = tbb::task::self().group(); |
293 | } |
294 | }; |
295 | struct Checker : NoAssign { |
296 | tbb::task_group_context *my_g; |
297 | Checker(tbb::task_group_context *a_g) : my_g(a_g) {} |
298 | void operator()() const { |
299 | ASSERT(my_g == tbb::task::self().group(), NULL); |
300 | tbb::task *t = new( tbb::task::allocate_root() ) tbb::empty_task; |
301 | ASSERT(my_g == t->group(), NULL); |
302 | tbb::task::destroy(*t); |
303 | } |
304 | }; |
305 | struct NestedChecker : NoAssign { |
306 | const MultipleMastersPart4 &my_body; |
307 | NestedChecker(const MultipleMastersPart4 &b) : my_body(b) {} |
308 | void operator()() const { |
309 | tbb::task_group_context *nested_g = tbb::task::self().group(); |
310 | ASSERT(my_body.my_ag != nested_g, NULL); |
311 | tbb::task *t = new( tbb::task::allocate_root() ) tbb::empty_task; |
312 | ASSERT(nested_g == t->group(), NULL); |
313 | tbb::task::destroy(*t); |
314 | my_body.my_a.enqueue(Checker(my_body.my_ag)); |
315 | } |
316 | }; |
317 | public: |
318 | MultipleMastersPart4( tbb::task_arena &a, Harness::SpinBarrier &b) : my_a(a), my_b(b) { |
319 | my_a.execute(Getter(my_ag)); |
320 | } |
321 | // NativeParallelFor's functor |
322 | void operator()(int) const { |
323 | my_a.execute(*this); |
324 | } |
325 | // Arena's functor |
326 | void operator()() const { |
327 | Checker check(my_ag); |
328 | check(); |
329 | tbb::task_arena nested(1,1); |
330 | nested.execute(NestedChecker(*this)); // change arena |
331 | tbb::parallel_for(Range(0,1),*this); // change group context only |
332 | my_b.timed_wait(10); |
333 | my_a.execute(check); |
334 | check(); |
335 | } |
336 | // parallel_for's functor |
337 | void operator()(const Range &) const { |
338 | NestedChecker(*this)(); |
339 | my_a.execute(Checker(my_ag)); // restore arena context |
340 | } |
341 | }; |
342 | |
343 | void TestMultipleMasters(int p) { |
344 | { |
345 | REMARK("multiple masters, part 1\n" ); |
346 | ResetTLS(); |
347 | tbb::task_arena a(1,0); |
348 | a.initialize(); |
349 | ArenaObserver o(a, 1, 0, 1); |
350 | Harness::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier |
351 | NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) ); |
352 | barrier2.timed_wait(10); |
353 | a.debug_wait_until_empty(); |
354 | } { |
355 | REMARK("multiple masters, part 2\n" ); |
356 | ResetTLS(); |
357 | tbb::task_arena a(2,1); |
358 | ArenaObserver o(a, 2, 1, 2); |
359 | Harness::SpinBarrier barrier(p+2); |
360 | a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981 |
361 | NativeParallelFor( p, MultipleMastersPart2(a, barrier) ); |
362 | barrier.timed_wait(10); |
363 | a.debug_wait_until_empty(); |
364 | } { |
365 | // Regression test for the bug 1981 part 2 (task_arena::execute() with wait_for_all for an enqueued task) |
366 | REMARK("multiple masters, part 3: wait_for_all() in execute()\n" ); |
367 | tbb::task_arena a(p,1); |
368 | Harness::SpinBarrier barrier(p+1); // for masters to avoid endless waiting at least in some runs |
369 | // "Oversubscribe" the arena by 1 master thread |
370 | NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) ); |
371 | a.debug_wait_until_empty(); |
372 | } { |
373 | int c = p%3? (p%2? p : 2) : 3; |
374 | REMARK("multiple masters, part 4: contexts, arena(%d)\n" , c); |
375 | ResetTLS(); |
376 | tbb::task_arena a(c, 1); |
377 | ArenaObserver o(a, c, 1, c); |
378 | Harness::SpinBarrier barrier(c); |
379 | MultipleMastersPart4 test(a, barrier); |
380 | NativeParallelFor(p, test); |
381 | a.debug_wait_until_empty(); |
382 | } |
383 | } |
384 | |
385 | //--------------------------------------------------// |
// TestArenaEntryConsistency checks every way a functor can enter task_arena::execute():
// direct execution by the calling thread, delegation to a worker, delegation to the arena's
// master slot, and nested calls in the same or an alien task_group_context. For each entry it
// verifies that an exception thrown inside execute() reaches the caller and that the FP
// settings captured at arena initialization are in effect inside the arena.
387 | #include <sstream> |
388 | #if TBB_USE_EXCEPTIONS |
389 | #include <stdexcept> |
390 | #include "tbb/tbb_exception.h" |
391 | #endif |
392 | |
393 | struct TestArenaEntryBody : FPModeContext { |
394 | tbb::atomic<int> &my_stage; // each execute increases it |
395 | std::stringstream my_id; |
396 | bool is_caught, is_expected; |
397 | enum { arenaFPMode = 1 }; |
398 | |
399 | TestArenaEntryBody(tbb::atomic<int> &s, int idx, int i) // init thread-specific instance |
400 | : FPModeContext(idx+i) |
401 | , my_stage(s) |
402 | , is_caught(false) |
403 | , is_expected( (idx&(1<<i)) != 0 && (TBB_USE_EXCEPTIONS) != 0 ) |
404 | { |
405 | my_id << idx << ':' << i << '@'; |
406 | } |
407 | void operator()() { // inside task_arena::execute() |
408 | // synchronize with other stages |
409 | int stage = my_stage++; |
410 | int slot = tbb::this_task_arena::current_thread_index(); |
411 | ASSERT(slot >= 0 && slot <= 1, "master or the only worker" ); |
412 | // wait until the third stage is delegated and then starts on slot 0 |
413 | while(my_stage < 2+slot) __TBB_Yield(); |
// deduce the entry type and append it to the id; this helps to find the source of a problem
415 | my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()? |
416 | "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master" ) |
417 | : stage == 3? "nested_same_ctx" : "nested_alien_ctx" ); |
418 | REMARK("running %s\n" , my_id.str().c_str()); |
419 | AssertFPMode(arenaFPMode); |
420 | if(is_expected) |
421 | __TBB_THROW(std::logic_error(my_id.str())); |
422 | // no code can be put here since exceptions can be thrown |
423 | } |
424 | void on_exception(const char *e) { // outside arena, in catch block |
425 | is_caught = true; |
426 | REMARK("caught %s\n" , e); |
427 | ASSERT(my_id.str() == e, NULL); |
428 | assertFPMode(); |
429 | } |
430 | void after_execute() { // outside arena and catch block |
431 | REMARK("completing %s\n" , my_id.str().c_str() ); |
432 | ASSERT(is_caught == is_expected, NULL); |
433 | assertFPMode(); |
434 | } |
435 | }; |
436 | |
437 | class ForEachArenaEntryBody : NoAssign { |
438 | tbb::task_arena &my_a; // expected task_arena(2,1) |
439 | tbb::atomic<int> &my_stage; // each execute increases it |
440 | int my_idx; |
441 | |
442 | public: |
443 | ForEachArenaEntryBody(tbb::task_arena &a, tbb::atomic<int> &c) |
444 | : my_a(a), my_stage(c), my_idx(0) {} |
445 | |
446 | void test(int idx) { |
447 | my_idx = idx; |
448 | my_stage = 0; |
449 | NativeParallelFor(3, *this); // test cross-arena calls |
450 | ASSERT(my_stage == 3, NULL); |
451 | my_a.execute(*this); // test nested calls |
452 | ASSERT(my_stage == 5, NULL); |
453 | } |
454 | |
455 | // task_arena functor for nested tests |
456 | void operator()() const { |
457 | test_arena_entry(3); // in current task group context |
458 | tbb::parallel_for(4, 5, *this); // in different context |
459 | } |
460 | |
461 | // NativeParallelFor & parallel_for functor |
462 | void operator()(int i) const { |
463 | test_arena_entry(i); |
464 | } |
465 | |
466 | private: |
467 | void test_arena_entry(int i) const { |
468 | TestArenaEntryBody scoped_functor(my_stage, my_idx, i); |
469 | __TBB_TRY { |
470 | my_a.execute(scoped_functor); |
471 | } |
472 | #if TBB_USE_EXCEPTIONS |
473 | catch(tbb::captured_exception &e) { |
474 | scoped_functor.on_exception(e.what()); |
475 | ASSERT_WARNING(TBB_USE_CAPTURED_EXCEPTION, "Caught captured_exception while expecting exact one" ); |
476 | } catch(std::logic_error &e) { |
477 | scoped_functor.on_exception(e.what()); |
478 | ASSERT(!TBB_USE_CAPTURED_EXCEPTION, "Caught exception of wrong type" ); |
479 | } catch(...) { ASSERT(false, "Unexpected exception type" ); } |
480 | #endif //TBB_USE_EXCEPTIONS |
481 | scoped_functor.after_execute(); |
482 | } |
483 | }; |
484 | |
485 | void TestArenaEntryConsistency() { |
486 | REMARK("test arena entry consistency\n" ); |
487 | |
488 | tbb::task_arena a(2, 1); |
489 | tbb::atomic<int> c; |
490 | ForEachArenaEntryBody body(a, c); |
491 | |
492 | FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode); |
493 | a.initialize(); // capture FP settings to arena |
494 | fp_scope.setNextFPMode(); |
495 | |
for (int i = 0; i < 100; i++) // at least 32 = 2^5 combinations of entry types
497 | body.test(i); |
498 | } |
499 | |
500 | //-------------------------------------------------- |
501 | // Test that the requested degree of concurrency for task_arena is achieved in various conditions |
502 | class TestArenaConcurrencyBody : NoAssign { |
503 | tbb::task_arena &my_a; |
504 | int my_max_concurrency; |
505 | int my_reserved_slots; |
506 | Harness::SpinBarrier *my_barrier; |
507 | Harness::SpinBarrier *my_worker_barrier; |
508 | public: |
509 | TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, Harness::SpinBarrier *b = NULL, Harness::SpinBarrier *wb = NULL ) |
510 | : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {} |
511 | // NativeParallelFor's functor |
512 | void operator()( int ) const { |
513 | ASSERT( local_id.local() == 0, "TLS was not cleaned?" ); |
514 | local_id.local() = 1; |
515 | my_a.execute( *this ); |
516 | } |
517 | // Arena's functor |
518 | void operator()() const { |
519 | ASSERT( tbb::task_arena::current_thread_index() == tbb::this_task_arena::current_thread_index(), NULL ); |
520 | int idx = tbb::this_task_arena::current_thread_index(); |
521 | ASSERT( idx < (my_max_concurrency > 1 ? my_max_concurrency : 2), NULL ); |
522 | ASSERT( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency(), NULL ); |
523 | int max_arena_concurrency = tbb::this_task_arena::max_concurrency(); |
524 | ASSERT( max_arena_concurrency == my_max_concurrency, NULL ); |
525 | if ( my_worker_barrier ) { |
526 | if ( local_id.local() == 1 ) { |
527 | // Master thread in a reserved slot |
528 | ASSERT( idx < my_reserved_slots, "Masters are supposed to use only reserved slots in this test" ); |
529 | } else { |
530 | // Worker thread |
531 | ASSERT( idx >= my_reserved_slots, NULL ); |
532 | my_worker_barrier->timed_wait( 10 ); |
533 | } |
534 | } else if ( my_barrier ) |
535 | ASSERT( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" ); |
536 | if ( my_barrier ) my_barrier->timed_wait( 10 ); |
537 | else Harness::Sleep( 10 ); |
538 | } |
539 | }; |
540 | |
541 | void TestArenaConcurrency( int p, int reserved = 0, int step = 1) { |
542 | for (; reserved <= p; reserved += step) { |
543 | REMARK("TestArenaConcurrency: %d slots, %d reserved\n" , p, reserved); |
544 | tbb::task_arena a( p, reserved ); |
545 | { // Check concurrency with worker & reserved master threads. |
546 | ResetTLS(); |
547 | Harness::SpinBarrier b( p ); |
548 | Harness::SpinBarrier wb( p-reserved ); |
549 | TestArenaConcurrencyBody test( a, p, reserved, &b, &wb ); |
550 | for ( int i = reserved; i < p; ++i ) |
551 | a.enqueue( test ); |
552 | if ( reserved==1 ) |
553 | test( 0 ); // calls execute() |
554 | else |
555 | NativeParallelFor( reserved, test ); |
556 | a.debug_wait_until_empty(); |
557 | } { // Check if multiple masters alone can achieve maximum concurrency. |
558 | ResetTLS(); |
559 | Harness::SpinBarrier b( p ); |
560 | NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) ); |
561 | a.debug_wait_until_empty(); |
562 | } { // Check oversubscription by masters. |
563 | ResetTLS(); |
564 | NativeParallelFor( 2*p, TestArenaConcurrencyBody( a, p, reserved ) ); |
565 | a.debug_wait_until_empty(); |
566 | } |
567 | } |
568 | } |
569 | |
570 | //--------------------------------------------------// |
571 | // Test creation/initialization of a task_arena that references an existing arena (aka attach). |
572 | // This part of the test uses the knowledge of task_arena internals |
573 | |
574 | typedef tbb::interface7::internal::task_arena_base task_arena_internals; |
575 | |
576 | struct TaskArenaValidator : public task_arena_internals { |
577 | int my_slot_at_construction; |
578 | TaskArenaValidator( const task_arena_internals& other ) |
579 | : task_arena_internals(other) /*copies the internal state of other*/ { |
580 | my_slot_at_construction = tbb::this_task_arena::current_thread_index(); |
581 | } |
582 | // Inspect the internal state |
583 | int concurrency() { return my_max_concurrency; } |
584 | int reserved_for_masters() { return (int)my_master_slots; } |
585 | |
586 | // This method should be called in task_arena::execute() for a captured arena |
587 | // by the same thread that created the validator. |
588 | void operator()() { |
589 | ASSERT( tbb::this_task_arena::current_thread_index()==my_slot_at_construction, |
590 | "Current thread index has changed since the validator construction" ); |
591 | //!deprecated |
592 | ASSERT( tbb::task_arena::current_thread_index()==my_slot_at_construction, |
593 | "Current thread index has changed since the validator construction" ); |
594 | } |
595 | }; |
596 | |
597 | void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated, |
598 | int expect_concurrency, int expect_masters ) { |
599 | ASSERT( arena.is_active()==expect_activated, "Unexpected activation state" ); |
600 | if( arena.is_active() ) { |
601 | TaskArenaValidator validator( arena ); |
602 | ASSERT( validator.concurrency()==expect_concurrency, "Unexpected arena size" ); |
603 | ASSERT( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" ); |
604 | if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) { |
605 | ASSERT( tbb::task_arena::current_thread_index() >= 0 && |
606 | tbb::this_task_arena::current_thread_index() >= 0, NULL); |
607 | // for threads already in arena, check that the thread index remains the same |
608 | arena.execute( validator ); |
609 | } else { // not_initialized |
610 | // Test the deprecated method |
611 | ASSERT( tbb::task_arena::current_thread_index()==-1, NULL); |
612 | } |
613 | |
614 | // Ideally, there should be a check for having the same internal arena object, |
615 | // but that object is not easily accessible for implicit arenas. |
616 | } |
617 | } |
618 | |
619 | struct TestAttachBody : NoAssign { |
620 | mutable int my_idx; // safe to modify and use within the NativeParallelFor functor |
621 | const int maxthread; |
622 | TestAttachBody( int max_thr ) : maxthread(max_thr) {} |
623 | |
624 | // The functor body for NativeParallelFor |
625 | void operator()( int idx ) const { |
626 | my_idx = idx; |
627 | int default_threads = tbb::task_scheduler_init::default_num_threads(); |
628 | |
629 | tbb::task_arena arena = tbb::task_arena( tbb::task_arena::attach() ); |
630 | ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to |
631 | |
632 | { // attach to an arena created via task_scheduler_init |
633 | tbb::task_scheduler_init init( idx+1 ); |
634 | |
635 | tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() ); |
636 | ValidateAttachedArena( arena2, true, idx+1, 1 ); |
637 | |
638 | arena.initialize( tbb::task_arena::attach() ); |
639 | } |
640 | ValidateAttachedArena( arena, true, idx+1, 1 ); |
641 | |
642 | arena.terminate(); |
643 | ValidateAttachedArena( arena, false, -1, -1 ); |
644 | |
645 | // Check default behavior when attach cannot succeed |
646 | switch (idx%2) { |
647 | case 0: |
648 | { // construct as attached, then initialize |
649 | tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() ); |
650 | ValidateAttachedArena( arena2, false, -1, -1 ); |
651 | arena2.initialize(); // must be initialized with default parameters |
652 | ValidateAttachedArena( arena2, true, default_threads, 1 ); |
653 | } |
654 | break; |
655 | case 1: |
656 | { // default-construct, then initialize as attached |
657 | tbb::task_arena arena2; |
658 | ValidateAttachedArena( arena2, false, -1, -1 ); |
659 | arena2.initialize( tbb::task_arena::attach() ); // must use default parameters |
660 | ValidateAttachedArena( arena2, true, default_threads, 1 ); |
661 | } |
662 | break; |
663 | } // switch |
664 | |
665 | // attach to an auto-initialized arena |
666 | tbb::empty_task& tsk = *new (tbb::task::allocate_root()) tbb::empty_task; |
667 | tbb::task::spawn_root_and_wait(tsk); |
668 | tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() ); |
669 | ValidateAttachedArena( arena2, true, default_threads, 1 ); |
670 | |
671 | // attach to another task_arena |
672 | arena.initialize( maxthread, min(maxthread,idx) ); |
673 | arena.execute( *this ); |
674 | } |
675 | |
676 | // The functor body for task_arena::execute above |
677 | void operator()() const { |
678 | tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() ); |
679 | ValidateAttachedArena( arena2, true, maxthread, min(maxthread,my_idx) ); |
680 | } |
681 | |
682 | // The functor body for tbb::parallel_for |
683 | void operator()( const Range& r ) const { |
684 | for( int i = r.begin(); i<r.end(); ++i ) { |
685 | tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() ); |
686 | ValidateAttachedArena( arena2, true, maxthread+1, 1 ); // +1 to match initialization in TestMain |
687 | } |
688 | } |
689 | }; |
690 | |
691 | void TestAttach( int maxthread ) { |
692 | REMARK( "Testing attached task_arenas\n" ); |
693 | // Externally concurrent, but no concurrency within a thread |
694 | NativeParallelFor( max(maxthread,4), TestAttachBody( maxthread ) ); |
695 | // Concurrent within the current arena; may also serve as a stress test |
696 | tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) ); |
697 | } |
698 | |
699 | //--------------------------------------------------// |
700 | // Test that task_arena::enqueue does not tolerate a non-const functor. |
701 | // TODO: can it be reworked as SFINAE-based compile-time check? |
702 | struct TestFunctor { |
703 | void operator()() { ASSERT( false, "Non-const operator called" ); } |
704 | void operator()() const { /* library requires this overload only */ } |
705 | }; |
706 | |
707 | void TestConstantFunctorRequirement() { |
708 | tbb::task_arena a; |
709 | TestFunctor tf; |
710 | a.enqueue( tf ); |
711 | #if __TBB_TASK_PRIORITY |
712 | a.enqueue( tf, tbb::priority_normal ); |
713 | #endif |
714 | } |
715 | //--------------------------------------------------// |
716 | #if __TBB_TASK_ISOLATION |
717 | #include "tbb/parallel_reduce.h" |
718 | #include "tbb/parallel_invoke.h" |
719 | // Test this_task_arena::isolate |
720 | namespace TestIsolatedExecuteNS { |
721 | //--------------------------------------------------// |
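// TwoLoopsTest: an outer parallel_for runs nested parallel_for calls. When only the nested
// level is isolated, a thread must never pick up outer-level iterations while inside the
// nested loop; when the outer call itself is isolated (and the nested one is not), such
// stealing must remain possible.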
722 | template <typename NestedPartitioner> |
723 | class NestedParFor : NoAssign { |
724 | public: |
725 | NestedParFor() {} |
726 | void operator()() const { |
727 | NestedPartitioner p; |
728 | tbb::parallel_for( 0, 10, Harness::DummyBody( 10 ), p ); |
729 | } |
730 | }; |
731 | |
732 | template <typename NestedPartitioner> |
733 | class ParForBody : NoAssign { |
734 | bool myOuterIsolation; |
735 | tbb::enumerable_thread_specific<int> &myEts; |
736 | tbb::atomic<bool> &myIsStolen; |
737 | public: |
738 | ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, tbb::atomic<bool> &is_stolen ) |
739 | : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {} |
740 | void operator()( int ) const { |
741 | int &e = myEts.local(); |
742 | if ( e++ > 0 ) myIsStolen = true; |
743 | if ( myOuterIsolation ) |
744 | NestedParFor<NestedPartitioner>()(); |
745 | else |
746 | tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() ); |
747 | --e; |
748 | } |
749 | }; |
750 | |
751 | template <typename OuterPartitioner, typename NestedPartitioner> |
752 | class OuterParFor : NoAssign { |
753 | bool myOuterIsolation; |
754 | tbb::atomic<bool> &myIsStolen; |
755 | public: |
756 | OuterParFor( bool outer_isolation, tbb::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {} |
757 | void operator()() const { |
758 | tbb::enumerable_thread_specific<int> ets( 0 ); |
759 | OuterPartitioner p; |
760 | tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p ); |
761 | } |
762 | }; |
763 | |
764 | template <typename OuterPartitioner, typename NestedPartitioner> |
765 | void TwoLoopsTest( bool outer_isolation ) { |
766 | tbb::atomic<bool> is_stolen; |
767 | is_stolen = false; |
768 | const int max_repeats = 100; |
769 | if ( outer_isolation ) { |
770 | for ( int i = 0; i <= max_repeats; ++i ) { |
771 | tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) ); |
772 | if ( is_stolen ) break; |
773 | } |
774 | ASSERT_WARNING( is_stolen, "isolate() should not block stealing on nested levels without isolation" ); |
775 | } else { |
776 | for ( int i = 0; i <= max_repeats; ++i ) { |
777 | OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )(); |
778 | } |
ASSERT( !is_stolen, "isolate() on nested levels should prevent stealing from outer levels" );
780 | } |
781 | } |
782 | |
783 | void TwoLoopsTest( bool outer_isolation ) { |
784 | TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation ); |
785 | TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation ); |
786 | TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation ); |
787 | TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation ); |
788 | } |
789 | |
790 | void TwoLoopsTest() { |
791 | TwoLoopsTest( true ); |
792 | TwoLoopsTest( false ); |
793 | } |
794 | //--------------------------------------------------// |
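// HeavyMixTest: randomly nests parallel_for/parallel_invoke up to 20 levels deep, randomly
// mixing partitioners, high-priority contexts and isolation, and checks that a thread never
// picks up a task from a level outer than its current isolation level.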
795 | class HeavyMixTestBody : NoAssign { |
796 | tbb::enumerable_thread_specific<Harness::FastRandom>& myRandom; |
797 | tbb::enumerable_thread_specific<int>& myIsolatedLevel; |
798 | int myNestedLevel; |
799 | bool myHighPriority; |
800 | |
801 | template <typename Partitioner, typename Body> |
802 | static void RunTwoBodies( Harness::FastRandom& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = NULL ) { |
803 | if ( rnd.get() % 2 ) |
804 | if (ctx ) |
805 | tbb::parallel_for( 0, 2, body, p, *ctx ); |
806 | else |
807 | tbb::parallel_for( 0, 2, body, p ); |
808 | else |
809 | tbb::parallel_invoke( body, body ); |
810 | } |
811 | |
812 | template <typename Partitioner> |
813 | class IsolatedBody : NoAssign { |
814 | const HeavyMixTestBody &myHeavyMixTestBody; |
815 | Partitioner &myPartitioner; |
816 | public: |
817 | IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner ) |
818 | : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {} |
819 | void operator()() const { |
820 | RunTwoBodies( myHeavyMixTestBody.myRandom.local(), |
821 | HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel, |
822 | myHeavyMixTestBody.myNestedLevel + 1, myHeavyMixTestBody.myHighPriority ), |
823 | myPartitioner ); |
824 | } |
825 | }; |
826 | |
827 | template <typename Partitioner> |
828 | void RunNextLevel( Harness::FastRandom& rnd, int &isolated_level ) const { |
829 | Partitioner p; |
830 | switch ( rnd.get() % 3 ) { |
831 | case 0: { |
832 | // No features |
833 | tbb::task_group_context ctx; |
834 | if ( myHighPriority ) |
835 | ctx.set_priority( tbb::priority_high ); |
836 | RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1, myHighPriority), p, &ctx ); |
837 | break; |
838 | } |
839 | case 1: { |
840 | // High priority |
841 | tbb::task_group_context ctx; |
842 | ctx.set_priority( tbb::priority_high ); |
843 | RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1, true), p, &ctx ); |
844 | break; |
845 | } |
846 | case 2: { |
847 | // Isolation |
848 | int previous_isolation = isolated_level; |
849 | isolated_level = myNestedLevel; |
850 | tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) ); |
851 | isolated_level = previous_isolation; |
852 | break; |
853 | } |
854 | } |
855 | } |
856 | public: |
857 | HeavyMixTestBody( tbb::enumerable_thread_specific<Harness::FastRandom>& random, |
858 | tbb::enumerable_thread_specific<int>& isolated_level, int nested_level, bool high_priority ) |
859 | : myRandom( random ), myIsolatedLevel( isolated_level ) |
860 | , myNestedLevel( nested_level ), myHighPriority( high_priority ) {} |
861 | void operator()() const { |
862 | int &isolated_level = myIsolatedLevel.local(); |
863 | ASSERT( myNestedLevel > isolated_level, "The outer-level task should not be stolen on isolated level" ); |
864 | if ( myNestedLevel == 20 ) |
865 | return; |
866 | Harness::FastRandom &rnd = myRandom.local(); |
867 | if ( rnd.get() % 2 == 1 ) { |
868 | RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level ); |
869 | } else { |
870 | RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level ); |
871 | } |
872 | } |
873 | void operator()(int) const { |
874 | this->operator()(); |
875 | } |
876 | }; |
877 | |
878 | struct RandomInitializer { |
879 | Harness::FastRandom operator()() { |
880 | return Harness::FastRandom( tbb::this_task_arena::current_thread_index() ); |
881 | } |
882 | }; |
883 | |
884 | void HeavyMixTest() { |
885 | tbb::task_scheduler_init init( tbb::task_scheduler_init::default_num_threads() < 3 ? 3 : tbb::task_scheduler_init::automatic ); |
886 | RandomInitializer init_random; |
887 | tbb::enumerable_thread_specific<Harness::FastRandom> random( init_random ); |
888 | tbb::enumerable_thread_specific<int> isolated_level( 0 ); |
889 | for ( int i = 0; i < 5; ++i ) { |
890 | HeavyMixTestBody b( random, isolated_level, 1, false ); |
891 | b( 0 ); |
892 | REMARK( "." ); |
893 | } |
894 | } |
895 | //--------------------------------------------------// |
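// ContinuationTest: uses whitebox access to the task prefix isolation tag to check that
// continuation tasks created by parallel_deterministic_reduce preserve the isolation of
// their children when run inside this_task_arena::isolate().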
896 | struct ContinuationTestReduceBody : NoAssign { |
897 | tbb::internal::isolation_tag myIsolation; |
898 | ContinuationTestReduceBody( tbb::internal::isolation_tag isolation ) : myIsolation( isolation ) {} |
899 | ContinuationTestReduceBody( ContinuationTestReduceBody& b, tbb::split ) : myIsolation( b.myIsolation ) {} |
900 | void operator()( tbb::blocked_range<int> ) {} |
901 | void join( ContinuationTestReduceBody& ) { |
902 | tbb::internal::isolation_tag isolation = tbb::task::self().prefix().isolation; |
903 | ASSERT( isolation == myIsolation, "The continuations should preserve children's isolation" ); |
904 | } |
905 | }; |
906 | struct ContinuationTestIsolated { |
907 | void operator()() const { |
908 | ContinuationTestReduceBody b( tbb::task::self().prefix().isolation ); |
909 | tbb::parallel_deterministic_reduce( tbb::blocked_range<int>( 0, 100 ), b ); |
910 | } |
911 | }; |
912 | struct ContinuationTestParForBody : NoAssign { |
913 | tbb::enumerable_thread_specific<int> &myEts; |
914 | public: |
915 | ContinuationTestParForBody( tbb::enumerable_thread_specific<int> &ets ) : myEts( ets ){} |
916 | void operator()( int ) const { |
917 | int &e = myEts.local(); |
918 | ++e; |
919 | ASSERT( e==1, "The task is stolen on isolated level" ); |
920 | tbb::this_task_arena::isolate( ContinuationTestIsolated() ); |
921 | --e; |
922 | } |
923 | }; |
924 | void ContinuationTest() { |
925 | for ( int i = 0; i < 5; ++i ) { |
926 | tbb::enumerable_thread_specific<int> myEts; |
927 | tbb::parallel_for( 0, 100, ContinuationTestParForBody( myEts ), tbb::simple_partitioner() ); |
928 | } |
929 | } |
930 | //--------------------------------------------------// |
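// ExceptionTest: checks that an exception thrown inside this_task_arena::isolate() propagates
// to the caller, and that isolation does not prevent stealing of non-isolated outer-level work.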
931 | #if TBB_USE_EXCEPTIONS |
932 | struct MyException {}; |
933 | struct IsolatedBodyThrowsException { |
934 | void operator()() const { |
935 | __TBB_THROW( MyException() ); |
936 | } |
937 | }; |
938 | struct ExceptionTestBody : NoAssign { |
939 | tbb::enumerable_thread_specific<int>& myEts; |
940 | tbb::atomic<bool>& myIsStolen; |
941 | ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, tbb::atomic<bool>& is_stolen ) |
942 | : myEts( ets ), myIsStolen( is_stolen ) {} |
943 | void operator()( int i ) const { |
944 | try { |
945 | tbb::this_task_arena::isolate( IsolatedBodyThrowsException() ); |
946 | ASSERT( false, "The exception has been lost" ); |
947 | } |
948 | catch ( MyException ) {} |
949 | catch ( ... ) { |
950 | ASSERT( false, "Unexpected exception" ); |
951 | } |
952 | // Check that nested algorithms can steal outer-level tasks |
953 | int &e = myEts.local(); |
954 | if ( e++ > 0 ) myIsStolen = true; |
955 | // work imbalance increases chances for stealing |
956 | tbb::parallel_for( 0, 10+i, Harness::DummyBody( 100 ) ); |
957 | --e; |
958 | } |
959 | }; |
960 | |
961 | #endif /* TBB_USE_EXCEPTIONS */ |
962 | void ExceptionTest() { |
963 | #if TBB_USE_EXCEPTIONS |
964 | tbb::enumerable_thread_specific<int> ets; |
965 | tbb::atomic<bool> is_stolen; |
966 | is_stolen = false; |
967 | for ( int i = 0; i<10; ++i ) { |
968 | tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) ); |
969 | if ( is_stolen ) break; |
970 | } |
971 | ASSERT( is_stolen, "isolate should not affect non-isolated work" ); |
972 | #endif /* TBB_USE_EXCEPTIONS */ |
973 | } |
974 | |
975 | struct NonConstBody { |
976 | unsigned int state; |
977 | void operator()() { |
978 | state ^= ~0u; |
979 | } |
980 | }; |
981 | |
982 | void TestNonConstBody() { |
983 | NonConstBody body; |
984 | body.state = 0x6c97d5ed; |
985 | tbb::this_task_arena::isolate(body); |
986 | ASSERT(body.state == 0x93682a12, "The wrong state" ); |
987 | } |
988 | |
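// TestEnqueue: checks the interaction of task::enqueue with isolation. A task enqueued outside
// an isolated region must not be executed inside this_task_arena::isolate(), while the thread
// should still be able to process enqueued tasks outside of isolation.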
989 | class TestEnqueueTask : public tbb::task { |
990 | bool enqueued; |
991 | tbb::enumerable_thread_specific<bool>& executed; |
992 | tbb::atomic<int>& completed; |
993 | public: |
994 | static const int N = 100; |
995 | |
996 | TestEnqueueTask(bool enq, tbb::enumerable_thread_specific<bool>& exe, tbb::atomic<int>& c) |
997 | : enqueued(enq), executed(exe), completed(c) {} |
998 | tbb::task* execute() __TBB_override { |
999 | if (enqueued) { |
1000 | executed.local() = true; |
1001 | ++completed; |
1002 | __TBB_Yield(); |
1003 | } else { |
1004 | parent()->add_ref_count(N); |
1005 | for (int i = 0; i < N; ++i) |
1006 | tbb::task::enqueue(*new (parent()->allocate_child()) TestEnqueueTask(true, executed, completed)); |
1007 | } |
1008 | return NULL; |
1009 | } |
1010 | }; |
1011 | |
1012 | class TestEnqueueIsolateBody : NoCopy { |
1013 | tbb::enumerable_thread_specific<bool>& executed; |
1014 | tbb::atomic<int>& completed; |
1015 | public: |
1016 | TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, tbb::atomic<int>& c) |
1017 | : executed(exe), completed(c) {} |
1018 | void operator()() { |
1019 | tbb::task::spawn_root_and_wait(*new (tbb::task::allocate_root()) TestEnqueueTask(false, executed, completed)); |
1020 | } |
1021 | }; |
1022 | |
1023 | void TestEnqueue() { |
1024 | tbb::enumerable_thread_specific<bool> executed(false); |
1025 | tbb::atomic<int> completed; |
1026 | |
1027 | // Check that the main thread can process enqueued tasks. |
1028 | completed = 0; |
1029 | TestEnqueueIsolateBody b1(executed, completed); |
1030 | b1(); |
1031 | if (!executed.local()) |
1032 | REPORT("Warning: No one enqueued task has executed by the main thread.\n" ); |
1033 | |
1034 | executed.local() = false; |
1035 | completed = 0; |
1036 | const int N = 100; |
1037 | // Create enqueued tasks out of isolation. |
1038 | for (int i = 0; i < N; ++i) |
1039 | tbb::task::enqueue(*new (tbb::task::allocate_root()) TestEnqueueTask(true, executed, completed)); |
1040 | TestEnqueueIsolateBody b2(executed, completed); |
1041 | tbb::this_task_arena::isolate(b2); |
1042 | ASSERT(executed.local() == false, "An enqueued task was executed within isolate." ); |
1043 | |
1044 | while (completed < TestEnqueueTask::N + N) __TBB_Yield(); |
1045 | } |
1046 | } |
1047 | |
1048 | void TestIsolatedExecute() { |
1049 | REMARK("TestIsolatedExecute" ); |
// At least 3 threads (owner + 2 thieves) are required to reproduce a situation where the owner
// steals an outer-level task on a nested level. With only one thief, the thief executes the
// outer-level tasks first and the owner gets no opportunity to steal them.
1053 | int num_threads = min( tbb::task_scheduler_init::default_num_threads(), 3 ); |
1054 | { |
// Too many threads require too much work to reproduce stealing from the outer level.
1056 | tbb::task_scheduler_init init( max(num_threads, 7) ); |
1057 | REMARK("." ); TestIsolatedExecuteNS::TwoLoopsTest(); |
1058 | REMARK("." ); TestIsolatedExecuteNS::HeavyMixTest(); |
1059 | REMARK("." ); TestIsolatedExecuteNS::ContinuationTest(); |
1060 | REMARK("." ); TestIsolatedExecuteNS::ExceptionTest(); |
1061 | } |
1062 | tbb::task_scheduler_init init(num_threads); |
1063 | REMARK("." ); TestIsolatedExecuteNS::HeavyMixTest(); |
1064 | REMARK("." ); TestIsolatedExecuteNS::ContinuationTest(); |
1065 | REMARK("." ); TestIsolatedExecuteNS::TestNonConstBody(); |
1066 | REMARK("." ); TestIsolatedExecuteNS::TestEnqueue(); |
1067 | REMARK("\rTestIsolatedExecute: done \n" ); |
1068 | } |
1069 | #endif /* __TBB_TASK_ISOLATION */ |
1070 | //--------------------------------------------------// |
1071 | //--------------------------------------------------// |
1072 | |
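// TestDelegatedSpawnWaitBody: thread 0 first syncs with two arena workers via enqueued tasks,
// then delegates 100000 spawns into the arena through execute() and finally delegates a
// wait_for_all(); a missed wakeup notification from a delegated task would make this hang
// (see TestDelegatedSpawnWait below).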
1073 | class TestDelegatedSpawnWaitBody : NoAssign { |
1074 | tbb::task_arena &my_a; |
1075 | Harness::SpinBarrier &my_b1, &my_b2; |
1076 | |
1077 | struct Spawner : NoAssign { |
1078 | tbb::task* const a_task; |
1079 | Spawner(tbb::task* const t) : a_task(t) {} |
1080 | void operator()() const { |
1081 | tbb::task::spawn( *new(a_task->allocate_child()) tbb::empty_task ); |
1082 | } |
1083 | }; |
1084 | |
1085 | struct Waiter : NoAssign { |
1086 | tbb::task* const a_task; |
1087 | Waiter(tbb::task* const t) : a_task(t) {} |
1088 | void operator()() const { |
1089 | a_task->wait_for_all(); |
1090 | } |
1091 | }; |
1092 | |
1093 | public: |
1094 | TestDelegatedSpawnWaitBody( tbb::task_arena &a, Harness::SpinBarrier &b1, Harness::SpinBarrier &b2) |
1095 | : my_a(a), my_b1(b1), my_b2(b2) {} |
1096 | // NativeParallelFor's functor |
1097 | void operator()(int idx) const { |
1098 | if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent test hang) |
1099 | for( int i=0; i<2; ++i ) my_a.enqueue(*this); // tasks to sync with workers |
1100 | tbb::empty_task* root_task = new(tbb::task::allocate_root()) tbb::empty_task; |
1101 | root_task->set_ref_count(100001); |
1102 | my_b1.timed_wait(10); // sync with the workers |
1103 | for( int i=0; i<100000; ++i) { |
1104 | my_a.execute(Spawner(root_task)); |
1105 | } |
1106 | my_a.execute(Waiter(root_task)); |
1107 | tbb::task::destroy(*root_task); |
1108 | } |
1109 | my_b2.timed_wait(10); // sync both threads |
1110 | } |
1111 | // Arena's functor |
1112 | void operator()() const { |
1113 | my_b1.timed_wait(10); // sync with the arena master |
1114 | } |
1115 | }; |
1116 | |
1117 | void TestDelegatedSpawnWait() { |
1118 | // Regression test for a bug with missed wakeup notification from a delegated task |
1119 | REMARK( "Testing delegated spawn & wait\n" ); |
1120 | tbb::task_arena a(2,0); |
1121 | a.initialize(); |
1122 | Harness::SpinBarrier barrier1(3), barrier2(2); |
1123 | NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) ); |
1124 | a.debug_wait_until_empty(); |
1125 | } |
1126 | |
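// TestMultipleWaits: many native threads call task_arena::execute() concurrently; each call
// blocks in wait_for_all() on its own root task until a thread from the next bunch decrements
// that task's reference count. This checks that multiple concurrent waits inside one arena
// complete without deadlocks or lost wakeups.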
1127 | class TestMultipleWaitsArenaWait : NoAssign { |
1128 | public: |
1129 | TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, tbb::task** waiters, tbb::atomic<int>& processed ) |
1130 | : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ) {} |
1131 | void operator()() const { |
1132 | ++my_processed; |
1133 | // Wait for all tasks |
1134 | if ( my_idx < my_num_tasks ) |
1135 | my_waiters[my_idx]->wait_for_all(); |
1136 | // Signal waiting tasks |
1137 | if ( my_idx >= my_bunch_size ) |
1138 | my_waiters[my_idx-my_bunch_size]->decrement_ref_count(); |
1139 | } |
1140 | private: |
1141 | int my_idx; |
1142 | int my_bunch_size; |
1143 | int my_num_tasks; |
1144 | tbb::task** my_waiters; |
1145 | tbb::atomic<int>& my_processed; |
1146 | }; |
1147 | |
1148 | class TestMultipleWaitsThreadBody : NoAssign { |
1149 | public: |
1150 | TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, tbb::task** waiters, tbb::atomic<int>& processed ) |
1151 | : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ) {} |
1152 | void operator()( int idx ) const { |
1153 | my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed ) ); |
1154 | --my_processed; |
1155 | } |
1156 | private: |
1157 | int my_bunch_size; |
1158 | int my_num_tasks; |
1159 | tbb::task_arena& my_arena; |
1160 | tbb::task** my_waiters; |
1161 | tbb::atomic<int>& my_processed; |
1162 | }; |
1163 | |
1164 | #include "tbb/tbb_thread.h" |
1165 | |
1166 | void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) { |
1167 | tbb::task_arena a( num_threads ); |
1168 | const int num_tasks = (num_bunches-1)*bunch_size; |
1169 | tbb::task** tasks = new tbb::task*[num_tasks]; |
1170 | for ( int i = 0; i<num_tasks; ++i ) |
1171 | tasks[i] = new (tbb::task::allocate_root()) tbb::empty_task(); |
1172 | tbb::atomic<int> processed; |
1173 | processed = 0; |
1174 | for ( int repeats = 0; repeats<10; ++repeats ) { |
1175 | int idx = 0; |
1176 | for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) { |
// Sync with the previous bunch of tasks to prevent "false" nested dependencies (when a nested task waits for an outer task).
1178 | while ( processed < bunch*bunch_size ) __TBB_Yield(); |
1179 | // Run the bunch of threads/tasks that depend on the next bunch of threads/tasks. |
1180 | for ( int i = 0; i<bunch_size; ++i ) { |
1181 | tasks[idx]->set_ref_count( 2 ); |
1182 | tbb::tbb_thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, tasks, processed ), idx++ ).detach(); |
1183 | } |
1184 | } |
1185 | // No sync because the threads of the last bunch do not call wait_for_all. |
1186 | // Run the last bunch of threads. |
1187 | for ( int i = 0; i<bunch_size; ++i ) |
1188 | tbb::tbb_thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, tasks, processed ), idx++ ).detach(); |
1189 | while ( processed ) __TBB_Yield(); |
1190 | } |
1191 | for ( int i = 0; i<num_tasks; ++i ) |
1192 | tbb::task::destroy( *tasks[i] ); |
1193 | delete[] tasks; |
1194 | } |
1195 | |
1196 | void TestMultipleWaits() { |
1197 | REMARK( "Testing multiple waits\n" ); |
1198 | // Limit the number of threads to prevent heavy oversubscription. |
1199 | const int max_threads = min( 16, tbb::task_scheduler_init::default_num_threads() ); |
1200 | |
1201 | Harness::FastRandom rnd(1234); |
1202 | for ( int threads = 1; threads <= max_threads; threads += max( threads/2, 1 ) ) { |
1203 | for ( int i = 0; i<3; ++i ) { |
1204 | const int num_bunches = 3 + rnd.get()%3; |
1205 | const int bunch_size = max_threads + rnd.get()%max_threads; |
1206 | TestMultipleWaits( threads, num_bunches, bunch_size ); |
1207 | } |
1208 | } |
1209 | } |
1210 | //--------------------------------------------------// |
1211 | #include "tbb/global_control.h" |
1212 | |
1213 | void TestSmallStackSize() { |
1214 | tbb::task_scheduler_init init(tbb::task_scheduler_init::automatic, |
1215 | tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 ); |
// The test only produces a warning (not an error) if it fails, so it is run many times
// to make the warning flood the log and thus be treated as an error.
1218 | for (int i = 0; i < 100; ++i) { |
1219 | tbb::task_arena a; |
1220 | a.initialize(); |
1221 | } |
1222 | } |
1223 | //--------------------------------------------------// |
1224 | #if __TBB_CPP11_RVALUE_REF_PRESENT |
1225 | namespace TestMoveSemanticsNS { |
1226 | struct TestFunctor { |
1227 | void operator()() const {}; |
1228 | }; |
1229 | |
1230 | struct MoveOnlyFunctor : MoveOnly, TestFunctor { |
1231 | MoveOnlyFunctor() : MoveOnly() {}; |
1232 | MoveOnlyFunctor(MoveOnlyFunctor&& other) : MoveOnly(std::move(other)) {}; |
1233 | }; |
1234 | |
1235 | struct MovePreferableFunctor : Movable, TestFunctor { |
1236 | MovePreferableFunctor() : Movable() {}; |
1237 | MovePreferableFunctor(MovePreferableFunctor&& other) : Movable( std::move(other) ) {}; |
1238 | MovePreferableFunctor(const MovePreferableFunctor& other) : Movable(other) {}; |
1239 | }; |
1240 | |
1241 | struct NoMoveNoCopyFunctor : NoCopy, TestFunctor { |
1242 | NoMoveNoCopyFunctor() : NoCopy() {}; |
// the move constructor is disallowed, just like the copy constructor inherited from NoCopy
1244 | private: |
1245 | NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&); |
1246 | }; |
1247 | |
1248 | |
1249 | void TestFunctors() { |
1250 | tbb::task_arena ta; |
1251 | MovePreferableFunctor mpf; |
1252 | // execute() doesn't have any copies or moves of arguments inside the impl |
1253 | ta.execute( NoMoveNoCopyFunctor() ); |
1254 | |
1255 | ta.enqueue( MoveOnlyFunctor() ); |
1256 | ta.enqueue( mpf ); |
1257 | ASSERT(mpf.alive, "object was moved when was passed by lval" ); |
1258 | mpf.Reset(); |
1259 | ta.enqueue( std::move(mpf) ); |
1260 | ASSERT(!mpf.alive, "object was copied when was passed by rval" ); |
1261 | mpf.Reset(); |
1262 | #if __TBB_TASK_PRIORITY |
1263 | ta.enqueue( MoveOnlyFunctor(), tbb::priority_normal ); |
1264 | ta.enqueue( mpf, tbb::priority_normal ); |
1265 | ASSERT(mpf.alive, "object was moved when was passed by lval" ); |
1266 | mpf.Reset(); |
1267 | ta.enqueue( std::move(mpf), tbb::priority_normal ); |
1268 | ASSERT(!mpf.alive, "object was copied when was passed by rval" ); |
1269 | mpf.Reset(); |
1270 | #endif |
1271 | } |
1272 | } |
1273 | #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */ |
1274 | |
1275 | void TestMoveSemantics() { |
1276 | #if __TBB_CPP11_RVALUE_REF_PRESENT |
1277 | TestMoveSemanticsNS::TestFunctors(); |
1278 | #else |
1279 | REPORT("Known issue: move support tests are skipped.\n" ); |
1280 | #endif |
1281 | } |
1282 | //--------------------------------------------------// |
1283 | #if __TBB_CPP11_DECLTYPE_PRESENT && !__TBB_CPP11_DECLTYPE_OF_FUNCTION_RETURN_TYPE_BROKEN |
1284 | #include <vector> |
1285 | #include "harness_state_trackable.h" |
1286 | |
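// Check that task_arena::execute() and this_task_arena::isolate() forward the functor's return
// value to the caller without unnecessary copies (only moves are expected when rvalue
// references are available).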
1287 | namespace TestReturnValueNS { |
1288 | struct noDefaultTag {}; |
1289 | class ReturnType : public Harness::StateTrackable<> { |
1290 | static const int SIZE = 42; |
1291 | std::vector<int> data; |
1292 | public: |
1293 | ReturnType(noDefaultTag) : Harness::StateTrackable<>(0) {} |
1294 | #if !__TBB_IMPLICIT_MOVE_PRESENT |
1295 | // Define copy constructor to test that it is never called |
1296 | ReturnType(const ReturnType& r) : Harness::StateTrackable<>(r), data(r.data) {} |
1297 | ReturnType(ReturnType&& r) : Harness::StateTrackable<>(std::move(r)), data(std::move(r.data)) {} |
1298 | #endif |
1299 | void fill() { |
1300 | for (int i = 0; i < SIZE; ++i) |
1301 | data.push_back(i); |
1302 | } |
1303 | void check() { |
1304 | ASSERT(data.size() == unsigned(SIZE), NULL); |
1305 | for (int i = 0; i < SIZE; ++i) |
1306 | ASSERT(data[i] == i, NULL); |
1307 | Harness::StateTrackableCounters::counters_t& cnts = Harness::StateTrackableCounters::counters; |
1308 | ASSERT((cnts[Harness::StateTrackableBase::DefaultInitialized] == 0), NULL); |
1309 | ASSERT(cnts[Harness::StateTrackableBase::DirectInitialized] == 1, NULL); |
1310 | std::size_t copied = cnts[Harness::StateTrackableBase::CopyInitialized]; |
1311 | std::size_t moved = cnts[Harness::StateTrackableBase::MoveInitialized]; |
1312 | ASSERT(cnts[Harness::StateTrackableBase::Destroyed] == copied + moved, NULL); |
1313 | // The number of copies/moves should not exceed 3: function return, store to an internal storage, |
1314 | // acquire internal storage. |
1315 | #if __TBB_CPP11_RVALUE_REF_PRESENT |
1316 | ASSERT(copied == 0 && moved <=3, NULL); |
1317 | #else |
1318 | ASSERT(copied <= 3 && moved == 0, NULL); |
1319 | #endif |
1320 | } |
1321 | }; |
1322 | |
1323 | template <typename R> |
1324 | R function() { |
1325 | noDefaultTag tag; |
1326 | R r(tag); |
1327 | r.fill(); |
1328 | return r; |
1329 | } |
1330 | |
1331 | template <> |
1332 | void function<void>() {} |
1333 | |
1334 | template <typename R> |
1335 | struct Functor { |
1336 | R operator()() const { |
1337 | return function<R>(); |
1338 | } |
1339 | }; |
1340 | |
1341 | tbb::task_arena& arena() { |
1342 | static tbb::task_arena a; |
1343 | return a; |
1344 | } |
1345 | |
1346 | template <typename F> |
1347 | void TestExecute(F &f) { |
1348 | Harness::StateTrackableCounters::reset(); |
1349 | ReturnType r = arena().execute(f); |
1350 | r.check(); |
1351 | } |
1352 | |
1353 | template <typename F> |
1354 | void TestExecute(const F &f) { |
1355 | Harness::StateTrackableCounters::reset(); |
1356 | ReturnType r = arena().execute(f); |
1357 | r.check(); |
1358 | } |
1359 | #if TBB_PREVIEW_TASK_ISOLATION |
1360 | template <typename F> |
1361 | void TestIsolate(F &f) { |
1362 | Harness::StateTrackableCounters::reset(); |
1363 | ReturnType r = tbb::this_task_arena::isolate(f); |
1364 | r.check(); |
1365 | } |
1366 | |
1367 | template <typename F> |
1368 | void TestIsolate(const F &f) { |
1369 | Harness::StateTrackableCounters::reset(); |
1370 | ReturnType r = tbb::this_task_arena::isolate(f); |
1371 | r.check(); |
1372 | } |
1373 | #endif |
1374 | |
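    // Exercise both void and non-void return types through execute() and isolate(),
    // passing the callable as a temporary, as an lvalue functor, and as a function pointer.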
1375 | void Test() { |
1376 | TestExecute(Functor<ReturnType>()); |
1377 | Functor<ReturnType> f1; |
1378 | TestExecute(f1); |
1379 | TestExecute(function<ReturnType>); |
1380 | |
1381 | arena().execute(Functor<void>()); |
1382 | Functor<void> f2; |
1383 | arena().execute(f2); |
1384 | arena().execute(function<void>); |
1385 | #if TBB_PREVIEW_TASK_ISOLATION |
1386 | TestIsolate(Functor<ReturnType>()); |
1387 | TestIsolate(f1); |
1388 | TestIsolate(function<ReturnType>); |
1389 | tbb::this_task_arena::isolate(Functor<void>()); |
1390 | tbb::this_task_arena::isolate(f2); |
1391 | tbb::this_task_arena::isolate(function<void>); |
1392 | #endif |
1393 | } |
1394 | } |
#endif /* __TBB_CPP11_DECLTYPE_PRESENT && !__TBB_CPP11_DECLTYPE_OF_FUNCTION_RETURN_TYPE_BROKEN */
1396 | |
1397 | void TestReturnValue() { |
1398 | #if __TBB_CPP11_DECLTYPE_PRESENT && !__TBB_CPP11_DECLTYPE_OF_FUNCTION_RETURN_TYPE_BROKEN |
1399 | TestReturnValueNS::Test(); |
1400 | #endif |
1401 | } |
1402 | //--------------------------------------------------// |
1403 | void TestConcurrentFunctionality(int min_thread_num = MinThread, int max_thread_num = MaxThread) { |
1404 | InitializeAndTerminate(max_thread_num); |
1405 | for (int p = min_thread_num; p <= max_thread_num; ++p) { |
1406 | REMARK("testing with %d threads\n" , p); |
1407 | TestConcurrentArenas(p); |
1408 | TestMultipleMasters(p); |
1409 | TestArenaConcurrency(p); |
1410 | } |
1411 | } |
1412 | //--------------------------------------------------// |
1413 | struct DefaultCreatedWorkersAmountBody { |
1414 | int my_threadnum; |
1415 | DefaultCreatedWorkersAmountBody(int threadnum) : my_threadnum(threadnum) {} |
1416 | void operator()(int) const { |
        ASSERT(my_threadnum == tbb::this_task_arena::max_concurrency(), "concurrency level is not equal to the specified thread number" );
        ASSERT(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "more threads were created than the default concurrency allows" );
1419 | local_id.local() = 1; |
1420 | Harness::Sleep(1); |
1421 | } |
1422 | }; |
1423 | |
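// Resets the TLS and runs a parallel_for long enough to draw in all workers, then checks
// that exactly my_thread_num distinct threads have participated.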
1424 | struct NativeParallelForBody { |
1425 | int my_thread_num; |
1426 | int iterations; |
1427 | NativeParallelForBody(int thread_num, int multiplier = 100) : my_thread_num(thread_num), iterations(multiplier * thread_num) {} |
1428 | void operator()(int idx) const { |
1429 | ASSERT(idx == 0, "more than 1 thread is going to reset TLS" ); |
1430 | ResetTLS(); |
1431 | tbb::parallel_for(0, iterations, DefaultCreatedWorkersAmountBody(my_thread_num), tbb::simple_partitioner()); |
        ASSERT(local_id.size() == size_t(my_thread_num), "number of participating threads is not equal to the expected thread count" );
1433 | } |
1434 | }; |
1435 | |
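// Check that by default exactly task_scheduler_init::default_num_threads() threads take part in the work.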
1436 | void TestDefaultCreatedWorkersAmount() { |
1437 | NativeParallelFor(1, NativeParallelForBody(tbb::task_scheduler_init::default_num_threads())); |
1438 | } |
1439 | |
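// Check that a market initialized for thread_num threads can populate arenas
// with different numbers of reserved master slots.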
1440 | void TestAbilityToCreateWorkers(int thread_num) { |
1441 | tbb::task_scheduler_init init_market_with_necessary_amount_plus_one(thread_num); |
    // Check only a subset of reserved-master thread counts:
    // 0 and 1 reserved threads are the important cases, but a few other counts are
    // sampled as well to gather statistics without spending the whole test session
    // checking every possible amount.
1446 | TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72)); |
1447 | TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14)); |
1448 | } |
1449 | |
1450 | void TestDefaultWorkersLimit() { |
1451 | TestDefaultCreatedWorkersAmount(); |
    // Shared RML may limit the number of workers regardless of the requested limits,
    // because default_concurrency == max_concurrency for shared RML.
1454 | #ifndef RML_USE_WCRM |
1455 | TestAbilityToCreateWorkers(256); |
1456 | #endif |
1457 | } |
1458 | //--------------------------------------------------// |
1459 | |
// MyObserver checks whether worker threads keep joining the same arena
1461 | struct MyObserver: public tbb::task_scheduler_observer { |
1462 | tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls; |
1463 | tbb::task_arena& my_arena; |
1464 | tbb::atomic<int>& my_failure_counter; |
1465 | tbb::atomic<int>& my_counter; |
1466 | |
1467 | MyObserver(tbb::task_arena& a, |
1468 | tbb::enumerable_thread_specific<tbb::task_arena*>& tls, |
1469 | tbb::atomic<int>& failure_counter, |
1470 | tbb::atomic<int>& counter) |
1471 | : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a), |
1472 | my_failure_counter(failure_counter), my_counter(counter) { |
1473 | observe(true); |
1474 | } |
1475 | void on_scheduler_entry(bool worker) __TBB_override { |
1476 | if (worker) { |
1477 | ++my_counter; |
1478 | tbb::task_arena*& cur_arena = my_tls.local(); |
1479 | if (cur_arena != 0 && cur_arena != &my_arena) { |
1480 | ++my_failure_counter; |
1481 | } |
1482 | cur_arena = &my_arena; |
1483 | } |
1484 | } |
1485 | }; |
1486 | |
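// Loop body that blocks on a barrier shared by all arenas, so each iteration holds its
// thread until every arena has all of its slots occupied.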
1487 | struct MyLoopBody { |
1488 | Harness::SpinBarrier& m_barrier; |
1489 | MyLoopBody(Harness::SpinBarrier& b):m_barrier(b) { } |
1490 | void operator()(int) const { |
1491 | m_barrier.wait(); |
1492 | } |
1493 | }; |
1494 | |
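// Functor passed to task_arena::execute(): runs a parallel_for across the arena's full concurrency.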
1495 | struct TaskForArenaExecute { |
1496 | Harness::SpinBarrier& m_barrier; |
1497 | TaskForArenaExecute(Harness::SpinBarrier& b):m_barrier(b) { } |
1498 | void operator()() const { |
1499 | tbb::parallel_for(0, tbb::this_task_arena::max_concurrency(), |
1500 | MyLoopBody(m_barrier), tbb::simple_partitioner() |
1501 | ); |
1502 | } |
1503 | }; |
1504 | |
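// Per-master body for NativeParallelFor: repeatedly executes the barrier-synchronized
// parallel_for in its own arena, spins to let the workers fall asleep, and then
// synchronizes with the other masters before the next repetition.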
1505 | struct ExecuteParallelFor { |
1506 | int n_per_thread; |
1507 | int n_repetitions; |
1508 | std::vector<tbb::task_arena>& arenas; |
1509 | Harness::SpinBarrier& arena_barrier; |
1510 | Harness::SpinBarrier& master_barrier; |
1511 | ExecuteParallelFor(const int n_per_thread_, const int n_repetitions_, |
1512 | std::vector<tbb::task_arena>& arenas_, |
1513 | Harness::SpinBarrier& arena_barrier_, Harness::SpinBarrier& master_barrier_) |
1514 | : n_per_thread(n_per_thread_), n_repetitions(n_repetitions_), arenas(arenas_), |
1515 | arena_barrier(arena_barrier_), master_barrier(master_barrier_){ } |
1516 | void operator()(int i) const { |
1517 | for (int j = 0; j < n_repetitions; ++j) { |
1518 | arenas[i].execute(TaskForArenaExecute(arena_barrier)); |
1519 | for(volatile int k = 0; k < n_per_thread; ++k){/* waiting until workers fall asleep */} |
1520 | master_barrier.wait(); |
1521 | } |
1522 | } |
1523 | }; |
1524 | |
// If n_threads == 0, the default number of threads is used.
1526 | void TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) { |
1527 | if (n_threads == 0) { |
1528 | n_threads = tbb::task_scheduler_init::default_num_threads(); |
1529 | } |
1530 | const int max_n_arenas = 8; |
1531 | int n_arenas = 2; |
1532 | if(n_threads >= 16) |
1533 | n_arenas = max_n_arenas; |
1534 | else if (n_threads >= 8) |
1535 | n_arenas = 4; |
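    // Round n_threads down to a multiple of n_arenas so that all arenas get equal concurrency.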
1536 | n_threads = n_arenas * (n_threads / n_arenas); |
1537 | const int n_per_thread = 10000000; |
1538 | const int n_repetitions = 100; |
1539 | const int n_outer_repetitions = 20; |
1540 | std::multiset<float> failure_ratio; // for median calculating |
1541 | tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads - (n_arenas - 1)); |
1542 | Harness::SpinBarrier master_barrier(n_arenas); |
1543 | Harness::SpinBarrier arena_barrier(n_threads); |
1544 | MyObserver* observer[max_n_arenas]; |
1545 | std::vector<tbb::task_arena> arenas(n_arenas); |
1546 | tbb::atomic<int> failure_counter; |
1547 | tbb::atomic<int> counter; |
1548 | tbb::enumerable_thread_specific<tbb::task_arena*> tls; |
1549 | for (int i = 0; i < n_arenas; ++i) { |
1550 | arenas[i].initialize(n_threads / n_arenas); |
1551 | observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter); |
1552 | } |
1553 | int ii = 0; |
1554 | for (; ii < n_outer_repetitions; ++ii) { |
1555 | failure_counter = 0; |
1556 | counter = 0; |
        // Run the workload: each master thread executes its parallel_for in its own arena.
1558 | NativeParallelFor(n_arenas, ExecuteParallelFor(n_per_thread, n_repetitions, |
1559 | arenas, arena_barrier, master_barrier)); |
1560 | // TODO: get rid of check below by setting ratio between n_threads and n_arenas |
1561 | failure_ratio.insert((counter != 0 ? float(failure_counter) / counter : 1.0f)); |
1562 | tls.clear(); |
        // collect at least 3 elements in failure_ratio before calculating the median
1564 | if (ii > 1) { |
1565 | std::multiset<float>::iterator it = failure_ratio.begin(); |
1566 | std::advance(it, failure_ratio.size() / 2); |
1567 | if (*it < 0.02) |
1568 | break; |
1569 | } |
1570 | } |
1571 | for (int i = 0; i < n_arenas; ++i) { |
1572 | delete observer[i]; |
1573 | } |
    // check whether the median is too large
1575 | std::multiset<float>::iterator it = failure_ratio.begin(); |
1576 | std::advance(it, failure_ratio.size() / 2); |
1577 | // TODO: decrease constants 0.05 and 0.3 by setting ratio between n_threads and n_arenas |
1578 | if (*it > 0.05) { |
        REPORT("Warning: too many cases where threads joined a different arena.\n" );
        ASSERT(*it <= 0.3, "Too many cases where threads joined a different arena.\n" );
1581 | } |
1582 | } |
1583 | |
1584 | void TestArenaWorkersMigration() { |
1585 | TestArenaWorkersMigrationWithNumThreads(4); |
1586 | if (tbb::task_scheduler_init::default_num_threads() != 4) { |
1587 | TestArenaWorkersMigrationWithNumThreads(); |
1588 | } |
1589 | } |
1590 | |
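// Task that checks, from inside the arena it was enqueued to, that the arena concurrency is
// as expected and that the worker does not occupy a slot reserved for master threads.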
1591 | class CheckArenaNumThreads : public tbb::task { |
1592 | public: |
1593 | static Harness::SpinBarrier m_barrier; |
1594 | |
1595 | CheckArenaNumThreads(int nt, int rm): num_threads(nt), reserved_for_masters(rm) { |
1596 | m_barrier.initialize(2); |
1597 | } |
1598 | |
1599 | tbb::task* execute() __TBB_override { |
1600 | ASSERT( tbb::this_task_arena::max_concurrency() == num_threads, "Wrong concurrency of current arena" ); |
        ASSERT( tbb::this_task_arena::current_thread_index() >= reserved_for_masters, "Worker thread should not occupy a slot reserved for masters" );
1602 | m_barrier.wait(); |
1603 | return NULL; |
1604 | } |
1605 | |
1606 | private: |
1607 | const int num_threads; |
1608 | const int reserved_for_masters; |
1609 | }; |
1610 | |
1611 | Harness::SpinBarrier CheckArenaNumThreads::m_barrier; |
1612 | |
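// Functor run on a separate tbb_thread (which has no implicit scheduler) to enqueue a task
// into the given arena.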
1613 | class EnqueueTaskIntoTaskArena |
1614 | { |
1615 | public: |
1616 | EnqueueTaskIntoTaskArena(tbb::task& t, tbb::task_arena& a) : my_task(t), my_arena(a) {} |
1617 | void operator() () |
1618 | { |
1619 | tbb::task::enqueue(my_task, my_arena); |
1620 | } |
1621 | private: |
1622 | tbb::task& my_task; |
1623 | tbb::task_arena& my_arena; |
1624 | }; |
1625 | |
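// Enqueue CheckArenaNumThreads into arenas where all slots but one are reserved for masters,
// both from the master thread and from a thread without a scheduler.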
1626 | void TestTaskEnqueueInArena() |
1627 | { |
1628 | int pp[8]={3, 4, 5, 7, 8, 11, 13, 17}; |
1629 | for(int i = 0; i < 8; ++i) |
1630 | { |
1631 | int p = pp[i]; |
1632 | int reserved_for_masters = p - 1; |
1633 | tbb::task_arena a(p, reserved_for_masters); |
1634 | a.initialize(); |
        // Enqueue from the master thread
1636 | { |
1637 | CheckArenaNumThreads& t = *new( tbb::task::allocate_root() ) CheckArenaNumThreads(p, reserved_for_masters); |
1638 | tbb::task::enqueue(t, a); |
1639 | CheckArenaNumThreads::m_barrier.wait(); |
1640 | a.debug_wait_until_empty(); |
1641 | } |
        // Enqueue from a thread without a scheduler
1643 | { |
1644 | CheckArenaNumThreads& t = *new( tbb::task::allocate_root() ) CheckArenaNumThreads(p, reserved_for_masters); |
1645 | tbb::tbb_thread thr(EnqueueTaskIntoTaskArena(t, a)); |
1646 | CheckArenaNumThreads::m_barrier.wait(); |
1647 | a.debug_wait_until_empty(); |
1648 | thr.join(); |
1649 | } |
1650 | } |
1651 | } |
1652 | |
1653 | //--------------------------------------------------// |
1654 | |
1655 | int TestMain() { |
1656 | #if __TBB_TASK_ISOLATION |
1657 | TestIsolatedExecute(); |
1658 | #endif /* __TBB_TASK_ISOLATION */ |
1659 | TestSmallStackSize(); |
1660 | TestDefaultWorkersLimit(); |
1661 | // The test uses up to MaxThread workers (in arenas with no master thread), |
1662 | // so the runtime should be initialized appropriately. |
1663 | tbb::task_scheduler_init init_market_p_plus_one(MaxThread + 1); |
1664 | TestConcurrentFunctionality(); |
1665 | TestArenaEntryConsistency(); |
1666 | TestAttach(MaxThread); |
1667 | TestConstantFunctorRequirement(); |
1668 | TestDelegatedSpawnWait(); |
1669 | TestMultipleWaits(); |
1670 | TestMoveSemantics(); |
1671 | TestReturnValue(); |
1672 | TestArenaWorkersMigration(); |
1673 | TestTaskEnqueueInArena(); |
1674 | return Harness::Done; |
1675 | } |
1676 | |