1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include "harness.h"
18#include "harness_graph.h"
19#include "harness_barrier.h"
20#include "tbb/concurrent_queue.h"
21#include "tbb/flow_graph.h"
22#include "tbb/task.h"
23#include "tbb/tbb_thread.h"
24#include "tbb/mutex.h"
25#include "tbb/compat/condition_variable"
26
27#include <string>
28
// Message type with the smallest interface the tests need: it can be
// default-constructed, built from an int, copied and assigned. The payload is
// private; only place_wrapper instantiations are granted access to it.
class minimal_type {
public:
    // A default-constructed object carries the payload -1.
    minimal_type() : value(-1) {}

    // Left implicit so that an int can be supplied where a minimal_type is expected.
    minimal_type(int initial) : value(initial) {}

    minimal_type(const minimal_type &other) : value(other.value) {}

    minimal_type &operator=(const minimal_type &other) {
        value = other.value;
        return *this;
    }

private:
    template<typename T>
    friend struct place_wrapper;

    int value;
};
41
42template <typename T>
43struct place_wrapper {
44 typedef T wrapped_type;
45 T value;
46 tbb::tbb_thread::id thread_id;
47 tbb::task* task_ptr;
48
49 place_wrapper( ) : value(0) {
50 thread_id = tbb::this_tbb_thread::get_id();
51 task_ptr = &tbb::task::self();
52 }
53 place_wrapper( int v ) : value(v) {
54 thread_id = tbb::this_tbb_thread::get_id();
55 task_ptr = &tbb::task::self();
56 }
57
58 place_wrapper( const place_wrapper<int> &v ) : value(v.value), thread_id(v.thread_id), task_ptr(v.task_ptr) { }
59
60 place_wrapper( const place_wrapper<minimal_type> &v ) : value(v.value), thread_id(v.thread_id), task_ptr(v.task_ptr) { }
61};
62
// Generic helper used for message types that are not place_wrapper:
// there is nothing to verify, and copying is a plain assignment.
template<typename T1, typename T2>
struct wrapper_helper {
    // No origin information is available for plain types, so this is a no-op.
    static void check(const T1 &, const T2 &) {}

    // Assigns the source value to the destination.
    static void copy_value(const T1 &source, T2 &destination) {
        destination = source;
    }
};
71
72template<typename T1, typename T2>
73struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > {
74 static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) {
75 REMARK("a.task_ptr == %p != b.task_ptr == %p\n", a.task_ptr, b.task_ptr);
76 ASSERT( (a.thread_id != b.thread_id), "same thread used to execute adjacent nodes");
77 ASSERT( (a.task_ptr != b.task_ptr), "same task used to execute adjacent nodes");
78 return;
79 }
80 static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) {
81 out.value = in.value;
82 }
83};
84
85const int NUMBER_OF_MSGS = 10;
86const int UNKNOWN_NUMBER_OF_ITEMS = -1;
87tbb::atomic<int> async_body_exec_count;
88tbb::atomic<int> async_activity_processed_msg_count;
89tbb::atomic<int> end_body_exec_count;
90
91// queueing required in test_reset for testing of cancellation
92typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type;
93typedef counting_async_node_type::gateway_type counting_gateway_type;
94
95struct counting_async_body {
96 tbb::atomic<int> my_async_body_exec_count;
97
98 counting_async_body() {
99 my_async_body_exec_count = 0;
100 }
101
102 void operator()( const int &input, counting_gateway_type& gateway) {
103 REMARK( "Body execution with input == %d\n", input);
104 ++my_async_body_exec_count;
105 ++async_body_exec_count;
106 if ( input == -1 ) {
107 bool result = tbb::task::self().group()->cancel_group_execution();
108 REMARK( "Canceling graph execution\n" );
109 ASSERT( result == true, "attempted to cancel graph twice" );
110 Harness::Sleep(50);
111 }
112 gateway.try_put(input);
113 }
114};
115
116void test_reset() {
117 const int N = NUMBER_OF_MSGS;
118 async_body_exec_count = 0;
119
120 tbb::flow::graph g;
121 counting_async_node_type a(g, tbb::flow::serial, counting_async_body() );
122
123 const int R = 3;
124 std::vector< harness_counting_receiver<int> > r(R, harness_counting_receiver<int>(g));
125
126 for (int i = 0; i < R; ++i) {
127#if __TBB_FLOW_GRAPH_CPP11_FEATURES
128 tbb::flow::make_edge(a, r[i]);
129#else
130 tbb::flow::make_edge( tbb::flow::output_port<0>(a), r[i] );
131#endif
132 }
133
134 REMARK( "One body execution\n" );
135 a.try_put(-1);
136 for (int i = 0; i < N; ++i) {
137 a.try_put(i);
138 }
139 g.wait_for_all();
140 // should be canceled with only 1 item reaching the async_body and the counting receivers
141 // and N items left in the node's queue
142 ASSERT( g.is_cancelled() == true, "task group not canceled" );
143
144 counting_async_body b1 = tbb::flow::copy_body<counting_async_body>(a);
145 ASSERT( int(async_body_exec_count) == int(b1.my_async_body_exec_count), "body and global body counts are different" );
146 ASSERT( int(async_body_exec_count) == 1, "global body execution count not 1" );
147 for (int i = 0; i < R; ++i) {
148 ASSERT( int(r[i].my_count) == 1, "counting receiver count not 1" );
149 }
150
151 // should clear the async_node queue, but retain its local count at 1 and keep all edges
152 g.reset(tbb::flow::rf_reset_protocol);
153
154 REMARK( "N body executions\n" );
155 for (int i = 0; i < N; ++i) {
156 a.try_put(i);
157 }
158 g.wait_for_all();
159 ASSERT( g.is_cancelled() == false, "task group not canceled" );
160
161 // a total of N+1 items should have passed through the node body
162 // the local body count should also be N+1
163 // and the counting receivers should all have a count of N+1
164 counting_async_body b2 = tbb::flow::copy_body<counting_async_body>(a);
165 ASSERT( int(async_body_exec_count) == int(b2.my_async_body_exec_count), "local and global body execution counts are different" );
166 REMARK( "async_body_exec_count==%d\n", int(async_body_exec_count) );
167 ASSERT( int(async_body_exec_count) == N+1, "globcal body execution count not N+1" );
168 for (int i = 0; i < R; ++i) {
169 ASSERT( int(r[i].my_count) == N+1, "counting receiver has not received N+1 items" );
170 }
171
172 REMARK( "N body executions with new bodies\n" );
173 // should clear the async_node queue and reset its local count to 0, but keep all edges
174 g.reset(tbb::flow::rf_reset_bodies);
175 for (int i = 0; i < N; ++i) {
176 a.try_put(i);
177 }
178 g.wait_for_all();
179 ASSERT( g.is_cancelled() == false, "task group not canceled" );
180
181 // a total of 2N+1 items should have passed through the node body
182 // the local body count should be N
183 // and the counting receivers should all have a count of 2N+1
184 counting_async_body b3 = tbb::flow::copy_body<counting_async_body>(a);
185 ASSERT( int(async_body_exec_count) == 2*N+1, "global body execution count not 2N+1" );
186 ASSERT( int(b3.my_async_body_exec_count) == N, "local body execution count not N" );
187 for (int i = 0; i < R; ++i) {
188 ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
189 }
190
191 // should clear the async_node queue and keep its local count at N and remove all edges
192 REMARK( "N body executions with no edges\n" );
193 g.reset(tbb::flow::rf_clear_edges);
194 for (int i = 0; i < N; ++i) {
195 a.try_put(i);
196 }
197 g.wait_for_all();
198 ASSERT( g.is_cancelled() == false, "task group not canceled" );
199
200 // a total of 3N+1 items should have passed through the node body
201 // the local body count should now be 2*N
202 // and the counting receivers should remain at a count of 2N+1
203 counting_async_body b4 = tbb::flow::copy_body<counting_async_body>(a);
204 ASSERT( int(async_body_exec_count) == 3*N+1, "global body execution count not 3N+1" );
205 ASSERT( int(b4.my_async_body_exec_count) == 2*N, "local body execution count not 2N" );
206 for (int i = 0; i < R; ++i) {
207 ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
208 }
209
210 // put back 1 edge to receiver 0
211 REMARK( "N body executions with 1 edge\n" );
212#if __TBB_FLOW_GRAPH_CPP11_FEATURES
213 tbb::flow::make_edge(a, r[0]);
214#else
215 tbb::flow::make_edge( tbb::flow::output_port<0>(a), r[0] );
216#endif
217 for (int i = 0; i < N; ++i) {
218 a.try_put(i);
219 }
220 g.wait_for_all();
221 ASSERT( g.is_cancelled() == false, "task group not canceled" );
222
223 // a total of 4N+1 items should have passed through the node body
224 // the local body count should now be 3*N
225 // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
226 counting_async_body b5 = tbb::flow::copy_body<counting_async_body>(a);
227 ASSERT( int(async_body_exec_count) == 4*N+1, "global body execution count not 4N+1" );
228 ASSERT( int(b5.my_async_body_exec_count) == 3*N, "local body execution count not 3N" );
229 ASSERT( int(r[0].my_count) == 3*N+1, "counting receiver has not received 3N+1 items" );
230 for (int i = 1; i < R; ++i) {
231 ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
232 }
233
234 // should clear the async_node queue and keep its local count at N and remove all edges
235 REMARK( "N body executions with no edges and new body\n" );
236 g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges));
237 for (int i = 0; i < N; ++i) {
238 a.try_put(i);
239 }
240 g.wait_for_all();
241 ASSERT( g.is_cancelled() == false, "task group not canceled" );
242
243 // a total of 4N+1 items should have passed through the node body
244 // the local body count should now be 3*N
245 // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
246 counting_async_body b6 = tbb::flow::copy_body<counting_async_body>(a);
247 ASSERT( int(async_body_exec_count) == 5*N+1, "global body execution count not 5N+1" );
248 ASSERT( int(b6.my_async_body_exec_count) == N, "local body execution count not N" );
249 ASSERT( int(r[0].my_count) == 3*N+1, "counting receiver has not received 3N+1 items" );
250 for (int i = 1; i < R; ++i) {
251 ASSERT( int(r[i].my_count) == 2*N+1, "counting receiver has not received 2N+1 items" );
252 }
253}
254
255template< typename Input, typename Output >
256class async_activity : NoAssign {
257public:
258 typedef Input input_type;
259 typedef Output output_type;
260 typedef tbb::flow::async_node< input_type, output_type > async_node_type;
261 typedef typename async_node_type::gateway_type gateway_type;
262
263 struct work_type {
264 input_type input;
265 gateway_type* gateway;
266 };
267
268 class ServiceThreadBody {
269 public:
270 ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {}
271
272 void operator()() {
273 my_activity->process();
274 }
275 private:
276 async_activity* my_activity;
277 };
278
279 async_activity(int expected_items, bool deferred = false, int sleep_time = 50)
280 : my_expected_items(expected_items), my_sleep_time(sleep_time) {
281 is_active = !deferred;
282 my_quit = false;
283 tbb::tbb_thread( ServiceThreadBody( this ) ).swap( my_service_thread );
284 }
285
286private:
287
288 async_activity( const async_activity& )
289 : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0) {
290 is_active = true;
291 }
292
293public:
294 ~async_activity() {
295 stop();
296 my_service_thread.join();
297 }
298
299 void submit( const input_type &input, gateway_type& gateway ) {
300 work_type work = { input, &gateway};
301 my_work_queue.push( work );
302 }
303
304 void process() {
305 do {
306 work_type work;
307 if( is_active && my_work_queue.try_pop( work ) ) {
308 Harness::Sleep(my_sleep_time);
309 ++async_activity_processed_msg_count;
310 output_type output;
311 wrapper_helper<output_type, output_type>::copy_value(work.input, output);
312 wrapper_helper<output_type, output_type>::check(work.input, output);
313 work.gateway->try_put(output);
314 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS ||
315 int(async_activity_processed_msg_count) == my_expected_items ) {
316 work.gateway->release_wait();
317 }
318 }
319 } while( my_quit == false || !my_work_queue.empty());
320 }
321
322 void stop() {
323 my_quit = true;
324 }
325
326 void activate() {
327 is_active = true;
328 }
329
330 bool should_reserve_each_time() {
331 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS )
332 return true;
333 else
334 return false;
335 }
336
337private:
338
339 const int my_expected_items;
340 const int my_sleep_time;
341 tbb::atomic< bool > is_active;
342
343 tbb::concurrent_queue< work_type > my_work_queue;
344
345 tbb::atomic< bool > my_quit;
346
347 tbb::tbb_thread my_service_thread;
348};
349
350template<typename Input, typename Output>
351struct basic_test {
352 typedef Input input_type;
353 typedef Output output_type;
354 typedef tbb::flow::async_node< input_type, output_type > async_node_type;
355 typedef typename async_node_type::gateway_type gateway_type;
356
357 class start_body_type {
358 typedef Input input_type;
359 public:
360 input_type operator()( int input ) {
361 return input_type(input);
362 }
363 };
364
365#if !__TBB_CPP11_LAMBDAS_PRESENT
366 class async_body_type {
367 typedef Input input_type;
368 typedef Output output_type;
369 typedef tbb::flow::async_node< input_type, output_type > async_node_type;
370 typedef typename async_node_type::gateway_type gateway_type;
371 public:
372 typedef async_activity<input_type, output_type> async_activity_type;
373
374 async_body_type( async_activity_type* aa ) : my_async_activity( aa ) { }
375
376 async_body_type( const async_body_type& other ) : my_async_activity( other.my_async_activity ) { }
377
378 void operator()( const input_type &input, gateway_type& gateway ) {
379 ++async_body_exec_count;
380 my_async_activity->submit( input, gateway);
381 if ( my_async_activity->should_reserve_each_time() )
382 gateway.reserve_wait();
383 }
384
385 private:
386 async_activity_type* my_async_activity;
387 };
388#endif
389
390 class end_body_type {
391 typedef Output output_type;
392 public:
393 void operator()( const output_type &input ) {
394 ++end_body_exec_count;
395 output_type output;
396 wrapper_helper<output_type, output_type>::check(input, output);
397 }
398 };
399
400 basic_test() {}
401
402public:
403
404 static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
405 async_activity<input_type, output_type> my_async_activity(async_expected_items);
406 tbb::flow::graph g;
407 tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() );
408#if __TBB_CPP11_LAMBDAS_PRESENT
409 async_node_type offload_node(g, tbb::flow::unlimited, [&] (const input_type &input, gateway_type& gateway) {
410 ++async_body_exec_count;
411 my_async_activity.submit(input, gateway);
412 if(my_async_activity.should_reserve_each_time())
413 gateway.reserve_wait();
414 } );
415#else
416 async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( &my_async_activity ) );
417#endif
418
419 tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type() );
420
421 tbb::flow::make_edge( start_node, offload_node );
422#if __TBB_FLOW_GRAPH_CPP11_FEATURES
423 tbb::flow::make_edge( offload_node, end_node );
424#else
425 tbb::flow::make_edge( tbb::flow::output_port<0>(offload_node), end_node );
426#endif
427 async_body_exec_count = 0;
428 async_activity_processed_msg_count = 0;
429 end_body_exec_count = 0;
430
431 if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
432 offload_node.gateway().reserve_wait();
433 }
434 for (int i = 0; i < NUMBER_OF_MSGS; ++i) {
435 start_node.try_put(i);
436 }
437 g.wait_for_all();
438 ASSERT( async_body_exec_count == NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
439 ASSERT( async_activity_processed_msg_count == NUMBER_OF_MSGS, "AsyncActivity processed wrong number of signals" );
440 ASSERT( end_body_exec_count == NUMBER_OF_MSGS, "EndBody processed wrong number of signals");
441 REMARK("async_body_exec_count == %d == async_activity_processed_msg_count == %d == end_body_exec_count == %d\n",
442 int(async_body_exec_count), int(async_activity_processed_msg_count), int(end_body_exec_count));
443 return Harness::Done;
444 }
445
446};
447
448int test_copy_ctor() {
449 const int N = NUMBER_OF_MSGS;
450 async_body_exec_count = 0;
451
452 tbb::flow::graph g;
453
454 harness_counting_receiver<int> r1(g);
455 harness_counting_receiver<int> r2(g);
456
457 counting_async_node_type a(g, tbb::flow::unlimited, counting_async_body() );
458 counting_async_node_type b(a);
459#if __TBB_FLOW_GRAPH_CPP11_FEATURES
460 tbb::flow::make_edge(a, r1);
461 tbb::flow::make_edge(b, r2);
462#else
463 tbb::flow::make_edge(tbb::flow::output_port<0>(a), r1);
464 tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2);
465#endif
466
467 for (int i = 0; i < N; ++i) {
468 a.try_put(i);
469 }
470 g.wait_for_all();
471
472 REMARK("async_body_exec_count = %d\n", int(async_body_exec_count));
473 REMARK("r1.my_count == %d and r2.my_count = %d\n", int(r1.my_count), int(r2.my_count));
474 ASSERT( int(async_body_exec_count) == NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
475 ASSERT( int(r1.my_count) == N, "counting receiver r1 has not received N items" );
476 ASSERT( int(r2.my_count) == 0, "counting receiver r2 has not received 0 items" );
477
478 for (int i = 0; i < N; ++i) {
479 b.try_put(i);
480 }
481 g.wait_for_all();
482
483 REMARK("async_body_exec_count = %d\n", int(async_body_exec_count));
484 REMARK("r1.my_count == %d and r2.my_count = %d\n", int(r1.my_count), int(r2.my_count));
485 ASSERT( int(async_body_exec_count) == 2*NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
486 ASSERT( int(r1.my_count) == N, "counting receiver r1 has not received N items" );
487 ASSERT( int(r2.my_count) == N, "counting receiver r2 has not received N items" );
488 return Harness::Done;
489}
490
491tbb::atomic<int> main_tid_count;
492
493template<typename Input, typename Output>
494struct spin_test {
495 typedef Input input_type;
496 typedef Output output_type;
497 typedef tbb::flow::async_node< input_type, output_type > async_node_type;
498 typedef typename async_node_type::gateway_type gateway_type;
499
500 class start_body_type {
501 typedef Input input_type;
502 public:
503 input_type operator()( int input ) {
504 return input_type(input);
505 }
506 };
507
508#if !__TBB_CPP11_LAMBDAS_PRESENT
509 class async_body_type {
510 typedef Input input_type;
511 typedef Output output_type;
512 typedef tbb::flow::async_node< input_type, output_type > async_node_type;
513 typedef typename async_node_type::gateway_type gateway_type;
514 public:
515 typedef async_activity<input_type, output_type> async_activity_type;
516
517 async_body_type( async_activity_type* aa ) : my_async_activity( aa ) { }
518
519 async_body_type( const async_body_type& other ) : my_async_activity( other.my_async_activity ) { }
520
521 void operator()(const input_type &input, gateway_type& gateway) {
522 ++async_body_exec_count;
523 my_async_activity->submit(input, gateway);
524 if(my_async_activity->should_reserve_each_time())
525 gateway.reserve_wait();
526 }
527
528 private:
529 async_activity_type* my_async_activity;
530 };
531#endif
532
533 class end_body_type {
534 typedef Output output_type;
535 tbb::tbb_thread::id my_main_tid;
536 Harness::SpinBarrier *my_barrier;
537 public:
538 end_body_type(tbb::tbb_thread::id t, Harness::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { }
539
540 void operator()( const output_type & ) {
541 ++end_body_exec_count;
542 if (tbb::this_tbb_thread::get_id() == my_main_tid) {
543 ++main_tid_count;
544 }
545 my_barrier->timed_wait_noerror(10);
546 }
547 };
548
549 spin_test() {}
550
551 static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
552 async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0);
553 Harness::SpinBarrier spin_barrier(nthreads);
554 tbb::flow::graph g;
555 tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() );
556#if __TBB_CPP11_LAMBDAS_PRESENT
557 async_node_type offload_node(g, tbb::flow::unlimited, [&](const input_type &input, gateway_type& gateway) {
558 ++async_body_exec_count;
559 my_async_activity.submit(input, gateway);
560 if(my_async_activity.should_reserve_each_time())
561 gateway.reserve_wait();
562 });
563#else
564 async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( &my_async_activity ) );
565#endif
566 tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type(tbb::this_tbb_thread::get_id(), spin_barrier) );
567 tbb::flow::make_edge( start_node, offload_node );
568#if __TBB_FLOW_GRAPH_CPP11_FEATURES
569 tbb::flow::make_edge( offload_node, end_node );
570#else
571 tbb::flow::make_edge( tbb::flow::output_port<0>(offload_node), end_node );
572#endif
573 async_body_exec_count = 0;
574 async_activity_processed_msg_count = 0;
575 end_body_exec_count = 0;
576 main_tid_count = 0;
577
578 if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
579 offload_node.gateway().reserve_wait();
580 }
581 for (int i = 0; i < nthreads*NUMBER_OF_MSGS; ++i) {
582 start_node.try_put(i);
583 }
584 g.wait_for_all();
585 ASSERT( async_body_exec_count == nthreads*NUMBER_OF_MSGS, "AsyncBody processed wrong number of signals" );
586 ASSERT( async_activity_processed_msg_count == nthreads*NUMBER_OF_MSGS, "AsyncActivity processed wrong number of signals" );
587 ASSERT( end_body_exec_count == nthreads*NUMBER_OF_MSGS, "EndBody processed wrong number of signals");
588 ASSERT_WARNING( main_tid_count != 0, "Main thread did not participate in end_body tasks");
589 REMARK("async_body_exec_count == %d == async_activity_processed_msg_count == %d == end_body_exec_count == %d\n",
590 int(async_body_exec_count), int(async_activity_processed_msg_count), int(end_body_exec_count));
591 return Harness::Done;
592 }
593
594};
595
596void test_for_spin_avoidance() {
597 spin_test<int, int>::run(4);
598}
599
600template< typename Input, typename Output >
601int run_tests() {
602 basic_test<Input, Output>::run();
603 basic_test<Input, Output>::run(NUMBER_OF_MSGS);
604 basic_test<place_wrapper<Input>, place_wrapper<Output> >::run();
605 basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS);
606 return Harness::Done;
607}
608
609#include "tbb/parallel_for.h"
610template<typename Input, typename Output>
611class equeueing_on_inner_level {
612 typedef Input input_type;
613 typedef Output output_type;
614 typedef async_activity<input_type, output_type> async_activity_type;
615 typedef tbb::flow::async_node<Input, Output> async_node_type;
616 typedef typename async_node_type::gateway_type gateway_type;
617
618 class start_body_type {
619 public:
620 input_type operator() ( int input ) {
621 return input_type( input);
622 }
623 };
624
625 class async_body_type {
626 public:
627 async_body_type( async_activity_type& activity ) : my_async_activity(&activity) {}
628
629 void operator() ( const input_type &input, gateway_type& gateway ) {
630 gateway.reserve_wait();
631 my_async_activity->submit( input, gateway );
632 }
633 private:
634 async_activity_type* my_async_activity;
635 };
636
637 class end_body_type {
638 public:
639 void operator()( output_type ) {}
640 };
641
642 class body_graph_with_async {
643 public:
644 body_graph_with_async( Harness::SpinBarrier& barrier, async_activity_type& activity )
645 : spin_barrier(&barrier), my_async_activity(&activity) {}
646
647 void operator()(int) const {
648 tbb::flow::graph g;
649 tbb::flow::function_node< int, input_type > start_node( g, tbb::flow::unlimited, start_body_type() );
650
651 async_node_type offload_node( g, tbb::flow::unlimited, async_body_type( *my_async_activity ) );
652
653 tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, end_body_type() );
654
655 tbb::flow::make_edge( start_node, offload_node );
656 tbb::flow::make_edge( offload_node, end_node );
657
658 start_node.try_put(1);
659
660 spin_barrier->wait();
661
662 my_async_activity->activate();
663
664 g.wait_for_all();
665 }
666
667 private:
668 Harness::SpinBarrier* spin_barrier;
669 async_activity_type* my_async_activity;
670 };
671
672
673public:
674 static int run ()
675 {
676 const int nthreads = tbb::this_task_arena::max_concurrency();
677 Harness::SpinBarrier spin_barrier( nthreads );
678
679 async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true );
680
681 tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) );
682 return Harness::Done;
683 }
684};
685
686int run_test_equeueing_on_inner_level() {
687 equeueing_on_inner_level<int, int>::run();
688 return Harness::Done;
689}
690
691int TestMain() {
692 tbb::task_scheduler_init init(4);
693 run_tests<int, int>();
694 run_tests<minimal_type, minimal_type>();
695 run_tests<int, minimal_type>();
696
697 lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS);
698
699 test_reset();
700 test_copy_ctor();
701 test_for_spin_avoidance();
702 run_test_equeueing_on_inner_level();
703 return Harness::Done;
704}
705
706