1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #include "harness.h" |
18 | #include "harness_graph.h" |
19 | #include "harness_barrier.h" |
20 | #include "tbb/concurrent_queue.h" |
21 | #include "tbb/flow_graph.h" |
22 | #include "tbb/task.h" |
23 | #include "tbb/tbb_thread.h" |
24 | #include "tbb/mutex.h" |
25 | #include "tbb/compat/condition_variable" |
26 | |
27 | #include <string> |
28 | |
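// Minimal copyable message type used to instantiate the nodes under test.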
29 | class minimal_type { |
30 | template<typename T> |
31 | friend struct place_wrapper; |
32 | |
33 | int value; |
34 | |
35 | public: |
36 | minimal_type() : value(-1) {} |
37 | minimal_type(int v) : value(v) {} |
38 | minimal_type(const minimal_type &m) : value(m.value) { } |
39 | minimal_type &operator=(const minimal_type &m) { value = m.value; return *this; } |
40 | }; |
41 | |
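// Wraps a value together with the id of the thread and the address of the task that
// constructed it, so that wrapper_helper can check that adjacent pipeline stages were
// executed by different threads and different tasks.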
42 | template <typename T> |
43 | struct place_wrapper { |
44 | typedef T wrapped_type; |
45 | T value; |
46 | tbb::tbb_thread::id thread_id; |
47 | tbb::task* task_ptr; |
48 | |
49 | place_wrapper( ) : value(0) { |
50 | thread_id = tbb::this_tbb_thread::get_id(); |
51 | task_ptr = &tbb::task::self(); |
52 | } |
53 | place_wrapper( int v ) : value(v) { |
54 | thread_id = tbb::this_tbb_thread::get_id(); |
55 | task_ptr = &tbb::task::self(); |
56 | } |
57 | |
58 | place_wrapper( const place_wrapper<int> &v ) : value(v.value), thread_id(v.thread_id), task_ptr(v.task_ptr) { } |
59 | |
60 | place_wrapper( const place_wrapper<minimal_type> &v ) : value(v.value), thread_id(v.thread_id), task_ptr(v.task_ptr) { } |
61 | }; |
62 | |
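// Copies values between message types; the place_wrapper specialization additionally
// verifies that the two wrappers were created by different threads and tasks.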
63 | template<typename T1, typename T2> |
64 | struct wrapper_helper { |
65 | static void check(const T1 &, const T2 &) { } |
66 | |
67 | static void copy_value(const T1 &in, T2 &out) { |
68 | out = in; |
69 | } |
70 | }; |
71 | |
72 | template<typename T1, typename T2> |
73 | struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > { |
74 | static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) { |
75 | REMARK("a.task_ptr == %p != b.task_ptr == %p\n" , a.task_ptr, b.task_ptr); |
76 | ASSERT( (a.thread_id != b.thread_id), "same thread used to execute adjacent nodes" ); |
77 | ASSERT( (a.task_ptr != b.task_ptr), "same task used to execute adjacent nodes" ); |
78 | return; |
79 | } |
80 | static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) { |
81 | out.value = in.value; |
82 | } |
83 | }; |
84 | |
85 | const int NUMBER_OF_MSGS = 10; |
86 | const int UNKNOWN_NUMBER_OF_ITEMS = -1; |
87 | tbb::atomic<int> async_body_exec_count; |
88 | tbb::atomic<int> async_activity_processed_msg_count; |
89 | tbb::atomic<int> end_body_exec_count; |
90 | |
// The queueing policy is required so that test_reset can leave items buffered in the
// node's input queue when the graph is cancelled.
92 | typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type; |
93 | typedef counting_async_node_type::gateway_type counting_gateway_type; |
94 | |
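// Async body that forwards each input to the gateway and counts executions both locally
// and in the global async_body_exec_count; an input of -1 cancels the graph's task group.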
95 | struct counting_async_body { |
96 | tbb::atomic<int> my_async_body_exec_count; |
97 | |
98 | counting_async_body() { |
99 | my_async_body_exec_count = 0; |
100 | } |
101 | |
102 | void operator()( const int &input, counting_gateway_type& gateway) { |
103 | REMARK( "Body execution with input == %d\n" , input); |
104 | ++my_async_body_exec_count; |
105 | ++async_body_exec_count; |
106 | if ( input == -1 ) { |
107 | bool result = tbb::task::self().group()->cancel_group_execution(); |
108 | REMARK( "Canceling graph execution\n" ); |
109 | ASSERT( result == true, "attempted to cancel graph twice" ); |
110 | Harness::Sleep(50); |
111 | } |
112 | gateway.try_put(input); |
113 | } |
114 | }; |
115 | |
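// Exercises graph::reset() with the various reset_flags values and verifies their effect
// on the async_node's input queue, its body state, and its edges.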
116 | void test_reset() { |
117 | const int N = NUMBER_OF_MSGS; |
118 | async_body_exec_count = 0; |
119 | |
120 | tbb::flow::graph g; |
121 | counting_async_node_type a(g, tbb::flow::serial, counting_async_body() ); |
122 | |
123 | const int R = 3; |
124 | std::vector< harness_counting_receiver<int> > r(R, harness_counting_receiver<int>(g)); |
125 | |
126 | for (int i = 0; i < R; ++i) { |
127 | #if __TBB_FLOW_GRAPH_CPP11_FEATURES |
128 | tbb::flow::make_edge(a, r[i]); |
129 | #else |
130 | tbb::flow::make_edge( tbb::flow::output_port<0>(a), r[i] ); |
131 | #endif |
132 | } |
133 | |
134 | REMARK( "One body execution\n" ); |
135 | a.try_put(-1); |
136 | for (int i = 0; i < N; ++i) { |
137 | a.try_put(i); |
138 | } |
139 | g.wait_for_all(); |
    // the graph should be canceled after only 1 item (the -1) has reached the async body
    // and the counting receivers, leaving the remaining N items in the node's queue
142 | ASSERT( g.is_cancelled() == true, "task group not canceled" ); |
143 | |
144 | counting_async_body b1 = tbb::flow::copy_body<counting_async_body>(a); |
145 | ASSERT( int(async_body_exec_count) == int(b1.my_async_body_exec_count), "body and global body counts are different" ); |
146 | ASSERT( int(async_body_exec_count) == 1, "global body execution count not 1" ); |
147 | for (int i = 0; i < R; ++i) { |
148 | ASSERT( int(r[i].my_count) == 1, "counting receiver count not 1" ); |
149 | } |
150 | |
151 | // should clear the async_node queue, but retain its local count at 1 and keep all edges |
152 | g.reset(tbb::flow::rf_reset_protocol); |
153 | |
154 | REMARK( "N body executions\n" ); |
155 | for (int i = 0; i < N; ++i) { |
156 | a.try_put(i); |
157 | } |
158 | g.wait_for_all(); |
    ASSERT( g.is_cancelled() == false, "task group unexpectedly canceled" );
160 | |
161 | // a total of N+1 items should have passed through the node body |
162 | // the local body count should also be N+1 |
163 | // and the counting receivers should all have a count of N+1 |
164 | counting_async_body b2 = tbb::flow::copy_body<counting_async_body>(a); |
165 | ASSERT( int(async_body_exec_count) == int(b2.my_async_body_exec_count), "local and global body execution counts are different" ); |
166 | REMARK( "async_body_exec_count==%d\n" , int(async_body_exec_count) ); |
    ASSERT( int(async_body_exec_count) == N+1, "global body execution count not N+1" );
168 | for (int i = 0; i < R; ++i) { |
169 | ASSERT( int(r[i].my_count) == N+1, "counting receiver has not received N+1 items" ); |
170 | } |
171 | |
172 | REMARK( "N body executions with new bodies\n" ); |
173 | // should clear the async_node queue and reset its local count to 0, but keep all edges |
174 | g.reset(tbb::flow::rf_reset_bodies); |
175 | for (int i = 0; i < N; ++i) { |
176 | a.try_put(i); |
177 | } |
178 | g.wait_for_all(); |
    ASSERT( g.is_cancelled() == false, "task group unexpectedly canceled" );
180 | |
181 | // a total of 2N+1 items should have passed through the node body |
182 | // the local body count should be N |
183 | // and the counting receivers should all have a count of 2N+1 |
184 | counting_async_body b3 = tbb::flow::copy_body<counting_async_body>(a); |
185 | ASSERT( int(async_body_exec_count) == 2*N+1, "global body execution count not 2N+1" ); |
186 | ASSERT( int(b3.my_async_body_exec_count) == N, "local body execution count not N" ); |
187 | for (int i = 0; i < R; ++i) { |
188 | ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" ); |
189 | } |
190 | |
191 | // should clear the async_node queue and keep its local count at N and remove all edges |
192 | REMARK( "N body executions with no edges\n" ); |
193 | g.reset(tbb::flow::rf_clear_edges); |
194 | for (int i = 0; i < N; ++i) { |
195 | a.try_put(i); |
196 | } |
197 | g.wait_for_all(); |
    ASSERT( g.is_cancelled() == false, "task group unexpectedly canceled" );
199 | |
200 | // a total of 3N+1 items should have passed through the node body |
201 | // the local body count should now be 2*N |
202 | // and the counting receivers should remain at a count of 2N+1 |
203 | counting_async_body b4 = tbb::flow::copy_body<counting_async_body>(a); |
204 | ASSERT( int(async_body_exec_count) == 3*N+1, "global body execution count not 3N+1" ); |
205 | ASSERT( int(b4.my_async_body_exec_count) == 2*N, "local body execution count not 2N" ); |
206 | for (int i = 0; i < R; ++i) { |
207 | ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" ); |
208 | } |
209 | |
210 | // put back 1 edge to receiver 0 |
211 | REMARK( "N body executions with 1 edge\n" ); |
212 | #if __TBB_FLOW_GRAPH_CPP11_FEATURES |
213 | tbb::flow::make_edge(a, r[0]); |
214 | #else |
215 | tbb::flow::make_edge( tbb::flow::output_port<0>(a), r[0] ); |
216 | #endif |
217 | for (int i = 0; i < N; ++i) { |
218 | a.try_put(i); |
219 | } |
220 | g.wait_for_all(); |
    ASSERT( g.is_cancelled() == false, "task group unexpectedly canceled" );
222 | |
223 | // a total of 4N+1 items should have passed through the node body |
224 | // the local body count should now be 3*N |
225 | // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1 |
226 | counting_async_body b5 = tbb::flow::copy_body<counting_async_body>(a); |
227 | ASSERT( int(async_body_exec_count) == 4*N+1, "global body execution count not 4N+1" ); |
228 | ASSERT( int(b5.my_async_body_exec_count) == 3*N, "local body execution count not 3N" ); |
229 | ASSERT( int(r[0].my_count) == 3*N+1, "counting receiver has not received 3N+1 items" ); |
230 | for (int i = 1; i < R; ++i) { |
231 | ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" ); |
232 | } |
233 | |
    // should clear the async_node queue, replace the body (resetting its local count to 0),
    // and remove all edges (they were already removed above)
235 | REMARK( "N body executions with no edges and new body\n" ); |
236 | g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges)); |
237 | for (int i = 0; i < N; ++i) { |
238 | a.try_put(i); |
239 | } |
240 | g.wait_for_all(); |
    ASSERT( g.is_cancelled() == false, "task group unexpectedly canceled" );
242 | |
    // a total of 5N+1 items should have passed through the node body
    // the local body count should be N, since the body was replaced before these N executions
    // and the counting receivers should be unchanged: r[0] remains at 3N+1, the others at 2N+1
246 | counting_async_body b6 = tbb::flow::copy_body<counting_async_body>(a); |
247 | ASSERT( int(async_body_exec_count) == 5*N+1, "global body execution count not 5N+1" ); |
248 | ASSERT( int(b6.my_async_body_exec_count) == N, "local body execution count not N" ); |
249 | ASSERT( int(r[0].my_count) == 3*N+1, "counting receiver has not received 3N+1 items" ); |
250 | for (int i = 1; i < R; ++i) { |
251 | ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" ); |
252 | } |
253 | } |
254 | |
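// Simulates a user-managed asynchronous activity: a dedicated service thread pops submitted
// work from a concurrent queue, copies the input to an output, puts the result back through
// the node's gateway, and releases the graph's wait (per item when the total is unknown, or
// once when the expected number of items has been processed).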
255 | template< typename Input, typename Output > |
256 | class async_activity : NoAssign { |
257 | public: |
258 | typedef Input input_type; |
259 | typedef Output output_type; |
260 | typedef tbb::flow::async_node< input_type, output_type > async_node_type; |
261 | typedef typename async_node_type::gateway_type gateway_type; |
262 | |
263 | struct work_type { |
264 | input_type input; |
265 | gateway_type* gateway; |
266 | }; |
267 | |
268 | class ServiceThreadBody { |
269 | public: |
270 | ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {} |
271 | |
272 | void operator()() { |
273 | my_activity->process(); |
274 | } |
275 | private: |
276 | async_activity* my_activity; |
277 | }; |
278 | |
279 | async_activity(int expected_items, bool deferred = false, int sleep_time = 50) |
280 | : my_expected_items(expected_items), my_sleep_time(sleep_time) { |
281 | is_active = !deferred; |
282 | my_quit = false; |
283 | tbb::tbb_thread( ServiceThreadBody( this ) ).swap( my_service_thread ); |
284 | } |
285 | |
286 | private: |
287 | |
288 | async_activity( const async_activity& ) |
289 | : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0) { |
290 | is_active = true; |
291 | } |
292 | |
293 | public: |
294 | ~async_activity() { |
295 | stop(); |
296 | my_service_thread.join(); |
297 | } |
298 | |
299 | void submit( const input_type &input, gateway_type& gateway ) { |
300 | work_type work = { input, &gateway}; |
301 | my_work_queue.push( work ); |
302 | } |
303 | |
304 | void process() { |
305 | do { |
306 | work_type work; |
307 | if( is_active && my_work_queue.try_pop( work ) ) { |
308 | Harness::Sleep(my_sleep_time); |
309 | ++async_activity_processed_msg_count; |
310 | output_type output; |
311 | wrapper_helper<output_type, output_type>::copy_value(work.input, output); |
312 | wrapper_helper<output_type, output_type>::check(work.input, output); |
313 | work.gateway->try_put(output); |
314 | if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS || |
315 | int(async_activity_processed_msg_count) == my_expected_items ) { |
316 | work.gateway->release_wait(); |
317 | } |
318 | } |
319 | } while( my_quit == false || !my_work_queue.empty()); |
320 | } |
321 | |
322 | void stop() { |
323 | my_quit = true; |
324 | } |
325 | |
326 | void activate() { |
327 | is_active = true; |
328 | } |
329 | |
    bool should_reserve_each_time() {
        return my_expected_items == UNKNOWN_NUMBER_OF_ITEMS;
    }
336 | |
337 | private: |
338 | |
339 | const int my_expected_items; |
340 | const int my_sleep_time; |
341 | tbb::atomic< bool > is_active; |
342 | |
343 | tbb::concurrent_queue< work_type > my_work_queue; |
344 | |
345 | tbb::atomic< bool > my_quit; |
346 | |
347 | tbb::tbb_thread my_service_thread; |
348 | }; |
349 | |
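// End-to-end test: function_node -> async_node -> function_node, with the asynchronous
// work performed by a separate async_activity service thread.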
350 | template<typename Input, typename Output> |
351 | struct basic_test { |
352 | typedef Input input_type; |
353 | typedef Output output_type; |
354 | typedef tbb::flow::async_node< input_type, output_type > async_node_type; |
355 | typedef typename async_node_type::gateway_type gateway_type; |
356 | |
357 | class start_body_type { |
358 | typedef Input input_type; |
359 | public: |
360 | input_type operator()( int input ) { |
361 | return input_type(input); |
362 | } |
363 | }; |
364 | |
365 | #if !__TBB_CPP11_LAMBDAS_PRESENT |
366 | class async_body_type { |
367 | typedef Input input_type; |
368 | typedef Output output_type; |
369 | typedef tbb::flow::async_node< input_type, output_type > async_node_type; |
370 | typedef typename async_node_type::gateway_type gateway_type; |
371 | public: |
372 | typedef async_activity<input_type, output_type> async_activity_type; |
373 | |
374 | async_body_type( async_activity_type* aa ) : my_async_activity( aa ) { } |
375 | |
376 | async_body_type( const async_body_type& other ) : my_async_activity( other.my_async_activity ) { } |
377 | |
378 | void operator()( const input_type &input, gateway_type& gateway ) { |
379 | ++async_body_exec_count; |
380 | my_async_activity->submit( input, gateway); |
381 | if ( my_async_activity->should_reserve_each_time() ) |
382 | gateway.reserve_wait(); |
383 | } |
384 | |
385 | private: |
386 | async_activity_type* my_async_activity; |
387 | }; |
388 | #endif |
389 | |
390 | class end_body_type { |
391 | typedef Output output_type; |
392 | public: |
393 | void operator()( const output_type &input ) { |
394 | ++end_body_exec_count; |
395 | output_type output; |
396 | wrapper_helper<output_type, output_type>::check(input, output); |
397 | } |
398 | }; |
399 | |
400 | basic_test() {} |
401 | |
402 | public: |
403 | |
404 | static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) { |
405 | async_activity<input_type, output_type> my_async_activity(async_expected_items); |
406 | tbb::flow::graph g; |
407 | tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() ); |
408 | #if __TBB_CPP11_LAMBDAS_PRESENT |
409 | async_node_type offload_node(g, tbb::flow::unlimited, [&] (const input_type &input, gateway_type& gateway) { |
410 | ++async_body_exec_count; |
411 | my_async_activity.submit(input, gateway); |
412 | if(my_async_activity.should_reserve_each_time()) |
413 | gateway.reserve_wait(); |
414 | } ); |
415 | #else |
416 | async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( &my_async_activity ) ); |
417 | #endif |
418 | |
419 | tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type() ); |
420 | |
421 | tbb::flow::make_edge( start_node, offload_node ); |
422 | #if __TBB_FLOW_GRAPH_CPP11_FEATURES |
423 | tbb::flow::make_edge( offload_node, end_node ); |
424 | #else |
425 | tbb::flow::make_edge( tbb::flow::output_port<0>(offload_node), end_node ); |
426 | #endif |
427 | async_body_exec_count = 0; |
428 | async_activity_processed_msg_count = 0; |
429 | end_body_exec_count = 0; |
430 | |
431 | if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) { |
432 | offload_node.gateway().reserve_wait(); |
433 | } |
434 | for (int i = 0; i < NUMBER_OF_MSGS; ++i) { |
435 | start_node.try_put(i); |
436 | } |
437 | g.wait_for_all(); |
438 | ASSERT( async_body_exec_count == NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" ); |
439 | ASSERT( async_activity_processed_msg_count == NUMBER_OF_MSGS, "AsyncActivity processed wrong number of signals" ); |
440 | ASSERT( end_body_exec_count == NUMBER_OF_MSGS, "EndBody processed wrong number of signals" ); |
441 | REMARK("async_body_exec_count == %d == async_activity_processed_msg_count == %d == end_body_exec_count == %d\n" , |
442 | int(async_body_exec_count), int(async_activity_processed_msg_count), int(end_body_exec_count)); |
443 | return Harness::Done; |
444 | } |
445 | |
446 | }; |
447 | |
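// Verifies that a copy-constructed async_node does not inherit the original node's
// successors and operates correctly once its own edges have been made.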
448 | int test_copy_ctor() { |
449 | const int N = NUMBER_OF_MSGS; |
450 | async_body_exec_count = 0; |
451 | |
452 | tbb::flow::graph g; |
453 | |
454 | harness_counting_receiver<int> r1(g); |
455 | harness_counting_receiver<int> r2(g); |
456 | |
457 | counting_async_node_type a(g, tbb::flow::unlimited, counting_async_body() ); |
458 | counting_async_node_type b(a); |
459 | #if __TBB_FLOW_GRAPH_CPP11_FEATURES |
460 | tbb::flow::make_edge(a, r1); |
461 | tbb::flow::make_edge(b, r2); |
462 | #else |
463 | tbb::flow::make_edge(tbb::flow::output_port<0>(a), r1); |
464 | tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2); |
465 | #endif |
466 | |
467 | for (int i = 0; i < N; ++i) { |
468 | a.try_put(i); |
469 | } |
470 | g.wait_for_all(); |
471 | |
472 | REMARK("async_body_exec_count = %d\n" , int(async_body_exec_count)); |
473 | REMARK("r1.my_count == %d and r2.my_count = %d\n" , int(r1.my_count), int(r2.my_count)); |
474 | ASSERT( int(async_body_exec_count) == NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" ); |
475 | ASSERT( int(r1.my_count) == N, "counting receiver r1 has not received N items" ); |
    ASSERT( int(r2.my_count) == 0, "counting receiver r2 should not have received any items" );
477 | |
478 | for (int i = 0; i < N; ++i) { |
479 | b.try_put(i); |
480 | } |
481 | g.wait_for_all(); |
482 | |
483 | REMARK("async_body_exec_count = %d\n" , int(async_body_exec_count)); |
484 | REMARK("r1.my_count == %d and r2.my_count = %d\n" , int(r1.my_count), int(r2.my_count)); |
485 | ASSERT( int(async_body_exec_count) == 2*NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" ); |
486 | ASSERT( int(r1.my_count) == N, "counting receiver r1 has not received N items" ); |
487 | ASSERT( int(r2.my_count) == N, "counting receiver r2 has not received N items" ); |
488 | return Harness::Done; |
489 | } |
490 | |
491 | tbb::atomic<int> main_tid_count; |
492 | |
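// Checks that threads blocked in wait_for_all() take part in executing end_body tasks
// (the main thread is expected to reach the barrier inside end_body) instead of spinning
// while the async activity produces results.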
493 | template<typename Input, typename Output> |
494 | struct spin_test { |
495 | typedef Input input_type; |
496 | typedef Output output_type; |
497 | typedef tbb::flow::async_node< input_type, output_type > async_node_type; |
498 | typedef typename async_node_type::gateway_type gateway_type; |
499 | |
500 | class start_body_type { |
501 | typedef Input input_type; |
502 | public: |
503 | input_type operator()( int input ) { |
504 | return input_type(input); |
505 | } |
506 | }; |
507 | |
508 | #if !__TBB_CPP11_LAMBDAS_PRESENT |
509 | class async_body_type { |
510 | typedef Input input_type; |
511 | typedef Output output_type; |
512 | typedef tbb::flow::async_node< input_type, output_type > async_node_type; |
513 | typedef typename async_node_type::gateway_type gateway_type; |
514 | public: |
515 | typedef async_activity<input_type, output_type> async_activity_type; |
516 | |
517 | async_body_type( async_activity_type* aa ) : my_async_activity( aa ) { } |
518 | |
519 | async_body_type( const async_body_type& other ) : my_async_activity( other.my_async_activity ) { } |
520 | |
521 | void operator()(const input_type &input, gateway_type& gateway) { |
522 | ++async_body_exec_count; |
523 | my_async_activity->submit(input, gateway); |
524 | if(my_async_activity->should_reserve_each_time()) |
525 | gateway.reserve_wait(); |
526 | } |
527 | |
528 | private: |
529 | async_activity_type* my_async_activity; |
530 | }; |
531 | #endif |
532 | |
533 | class end_body_type { |
534 | typedef Output output_type; |
535 | tbb::tbb_thread::id my_main_tid; |
536 | Harness::SpinBarrier *my_barrier; |
537 | public: |
538 | end_body_type(tbb::tbb_thread::id t, Harness::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { } |
539 | |
540 | void operator()( const output_type & ) { |
541 | ++end_body_exec_count; |
542 | if (tbb::this_tbb_thread::get_id() == my_main_tid) { |
543 | ++main_tid_count; |
544 | } |
545 | my_barrier->timed_wait_noerror(10); |
546 | } |
547 | }; |
548 | |
549 | spin_test() {} |
550 | |
551 | static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) { |
552 | async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0); |
553 | Harness::SpinBarrier spin_barrier(nthreads); |
554 | tbb::flow::graph g; |
555 | tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() ); |
556 | #if __TBB_CPP11_LAMBDAS_PRESENT |
557 | async_node_type offload_node(g, tbb::flow::unlimited, [&](const input_type &input, gateway_type& gateway) { |
558 | ++async_body_exec_count; |
559 | my_async_activity.submit(input, gateway); |
560 | if(my_async_activity.should_reserve_each_time()) |
561 | gateway.reserve_wait(); |
562 | }); |
563 | #else |
564 | async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( &my_async_activity ) ); |
565 | #endif |
566 | tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type(tbb::this_tbb_thread::get_id(), spin_barrier) ); |
567 | tbb::flow::make_edge( start_node, offload_node ); |
568 | #if __TBB_FLOW_GRAPH_CPP11_FEATURES |
569 | tbb::flow::make_edge( offload_node, end_node ); |
570 | #else |
571 | tbb::flow::make_edge( tbb::flow::output_port<0>(offload_node), end_node ); |
572 | #endif |
573 | async_body_exec_count = 0; |
574 | async_activity_processed_msg_count = 0; |
575 | end_body_exec_count = 0; |
576 | main_tid_count = 0; |
577 | |
578 | if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) { |
579 | offload_node.gateway().reserve_wait(); |
580 | } |
581 | for (int i = 0; i < nthreads*NUMBER_OF_MSGS; ++i) { |
582 | start_node.try_put(i); |
583 | } |
584 | g.wait_for_all(); |
585 | ASSERT( async_body_exec_count == nthreads*NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" ); |
586 | ASSERT( async_activity_processed_msg_count == nthreads*NUMBER_OF_MSGS, "AsyncActivity processed wrong number of signals" ); |
587 | ASSERT( end_body_exec_count == nthreads*NUMBER_OF_MSGS, "EndBody processed wrong number of signals" ); |
588 | ASSERT_WARNING( main_tid_count != 0, "Main thread did not participate in end_body tasks" ); |
589 | REMARK("async_body_exec_count == %d == async_activity_processed_msg_count == %d == end_body_exec_count == %d\n" , |
590 | int(async_body_exec_count), int(async_activity_processed_msg_count), int(end_body_exec_count)); |
591 | return Harness::Done; |
592 | } |
593 | |
594 | }; |
595 | |
596 | void test_for_spin_avoidance() { |
597 | spin_test<int, int>::run(4); |
598 | } |
599 | |
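// Runs the basic test for the given message types, both with an unknown and with a known
// number of items, and repeats it with place_wrapper-wrapped payloads to check thread and
// task placement.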
600 | template< typename Input, typename Output > |
601 | int run_tests() { |
602 | basic_test<Input, Output>::run(); |
603 | basic_test<Input, Output>::run(NUMBER_OF_MSGS); |
604 | basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(); |
605 | basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS); |
606 | return Harness::Done; |
607 | } |
608 | |
609 | #include "tbb/parallel_for.h" |
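// Builds an independent graph containing an async_node inside each parallel_for iteration.
// The shared async activity starts deferred; every iteration reserves the gateway and submits
// work, all threads meet at a barrier, and only then is the activity activated, so that
// wait_for_all() is exercised on an inner task level.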
610 | template<typename Input, typename Output> |
class enqueueing_on_inner_level {
612 | typedef Input input_type; |
613 | typedef Output output_type; |
614 | typedef async_activity<input_type, output_type> async_activity_type; |
615 | typedef tbb::flow::async_node<Input, Output> async_node_type; |
616 | typedef typename async_node_type::gateway_type gateway_type; |
617 | |
618 | class start_body_type { |
619 | public: |
620 | input_type operator() ( int input ) { |
621 | return input_type( input); |
622 | } |
623 | }; |
624 | |
625 | class async_body_type { |
626 | public: |
627 | async_body_type( async_activity_type& activity ) : my_async_activity(&activity) {} |
628 | |
629 | void operator() ( const input_type &input, gateway_type& gateway ) { |
630 | gateway.reserve_wait(); |
631 | my_async_activity->submit( input, gateway ); |
632 | } |
633 | private: |
634 | async_activity_type* my_async_activity; |
635 | }; |
636 | |
637 | class end_body_type { |
638 | public: |
639 | void operator()( output_type ) {} |
640 | }; |
641 | |
642 | class body_graph_with_async { |
643 | public: |
644 | body_graph_with_async( Harness::SpinBarrier& barrier, async_activity_type& activity ) |
645 | : spin_barrier(&barrier), my_async_activity(&activity) {} |
646 | |
647 | void operator()(int) const { |
648 | tbb::flow::graph g; |
649 | tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() ); |
650 | |
651 | async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( *my_async_activity ) ); |
652 | |
653 | tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type() ); |
654 | |
655 | tbb::flow::make_edge( start_node, offload_node ); |
656 | tbb::flow::make_edge( offload_node, end_node ); |
657 | |
658 | start_node.try_put(1); |
659 | |
660 | spin_barrier->wait(); |
661 | |
662 | my_async_activity->activate(); |
663 | |
664 | g.wait_for_all(); |
665 | } |
666 | |
667 | private: |
668 | Harness::SpinBarrier* spin_barrier; |
669 | async_activity_type* my_async_activity; |
670 | }; |
671 | |
672 | |
673 | public: |
    static int run() {
676 | const int nthreads = tbb::this_task_arena::max_concurrency(); |
677 | Harness::SpinBarrier spin_barrier( nthreads ); |
678 | |
679 | async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true ); |
680 | |
681 | tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) ); |
682 | return Harness::Done; |
683 | } |
684 | }; |
685 | |
int run_test_enqueueing_on_inner_level() {
    enqueueing_on_inner_level<int, int>::run();
688 | return Harness::Done; |
689 | } |
690 | |
691 | int TestMain() { |
692 | tbb::task_scheduler_init init(4); |
693 | run_tests<int, int>(); |
694 | run_tests<minimal_type, minimal_type>(); |
695 | run_tests<int, minimal_type>(); |
696 | |
697 | lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS); |
698 | |
699 | test_reset(); |
700 | test_copy_ctor(); |
701 | test_for_spin_avoidance(); |
    run_test_enqueueing_on_inner_level();
703 | return Harness::Done; |
704 | } |
705 | |
706 | |