1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#if __TBB_CPF_BUILD
18#define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
19#endif
20
21#include "harness_graph.h"
22
23#include "tbb/flow_graph.h"
24#include "tbb/task_scheduler_init.h"
25#include "tbb/spin_rw_mutex.h"
26
27#define N 100
28#define MAX_NODES 4
29
30//! Performs test on function nodes with limited concurrency and buffering
31/** These tests check:
 1) that the number of executing copies never exceeds the concurrency limit
33 2) that the node never rejects
34 3) that no items are lost
35 and 4) all of this happens even if there are multiple predecessors and successors
36*/
37
38template< typename InputType >
39struct parallel_put_until_limit : private NoAssign {
40
41 harness_counting_sender<InputType> *my_senders;
42
43 parallel_put_until_limit( harness_counting_sender<InputType> *senders ) : my_senders(senders) {}
44
45 void operator()( int i ) const {
46 if ( my_senders ) {
47 my_senders[i].try_put_until_limit();
48 }
49 }
50
51};
52
//! Identity body: hands back a copy of whatever message it receives.
template<typename IO>
struct pass_through {
    //! Returns a copy of i; the argument itself is not modified.
    IO operator()(const IO& i) {
        return IO(i);
    }
};
57
58template< typename InputType, typename OutputType, typename Body >
59void buffered_levels( size_t concurrency, Body body ) {
60
61 // Do for lc = 1 to concurrency level
62 for ( size_t lc = 1; lc <= concurrency; ++lc ) {
63 tbb::flow::graph g;
64
65 // Set the execute_counter back to zero in the harness
66 harness_graph_executor<InputType, OutputType>::execute_count = 0;
67 // Set the number of current executors to zero.
68 harness_graph_executor<InputType, OutputType>::current_executors = 0;
69 // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
70 harness_graph_executor<InputType, OutputType>::max_executors = lc;
71
72 // Create the function_node with the appropriate concurrency level, and use default buffering
73 tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, body );
74 tbb::flow::function_node<InputType, InputType> pass_thru( g, tbb::flow::unlimited, pass_through<InputType>());
75
76 // Create a vector of identical exe_nodes and pass_thrus
77 std::vector< tbb::flow::function_node< InputType, OutputType > > exe_vec(2, exe_node);
78 std::vector< tbb::flow::function_node< InputType, InputType > > pass_thru_vec(2, pass_thru);
79 // Attach each pass_thru to its corresponding exe_node
80 for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
81 tbb::flow::make_edge(pass_thru_vec[node_idx], exe_vec[node_idx]);
82 }
83
84 // TODO: why the test is executed serially for the node pairs, not concurrently?
85 for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
86 // For num_receivers = 1 to MAX_NODES
87 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
88 // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
89 std::vector< harness_mapped_receiver<OutputType>* > receivers(num_receivers);
90 for (size_t i = 0; i < num_receivers; i++) {
91 receivers[i] = new harness_mapped_receiver<OutputType>(g);
92 }
93
94 for (size_t r = 0; r < num_receivers; ++r ) {
95 tbb::flow::make_edge( exe_vec[node_idx], *receivers[r] );
96 }
97
98 // Do the test with varying numbers of senders
99 harness_counting_sender<InputType> *senders = NULL;
100 for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
101 // Create num_senders senders, set there message limit each to N, and connect them to pass_thru_vec[node_idx]
102 senders = new harness_counting_sender<InputType>[num_senders];
103 for (size_t s = 0; s < num_senders; ++s ) {
104 senders[s].my_limit = N;
105 senders[s].register_successor(pass_thru_vec[node_idx] );
106 }
107
108 // Initialize the receivers so they know how many senders and messages to check for
109 for (size_t r = 0; r < num_receivers; ++r ) {
110 receivers[r]->initialize_map( N, num_senders );
111 }
112
113 // Do the test
114 NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
115 g.wait_for_all();
116
117 // confirm that each sender was requested from N times
118 for (size_t s = 0; s < num_senders; ++s ) {
119 size_t n = senders[s].my_received;
120 ASSERT( n == N, NULL );
121 ASSERT( senders[s].my_receiver == &pass_thru_vec[node_idx], NULL );
122 }
123 // validate the receivers
124 for (size_t r = 0; r < num_receivers; ++r ) {
125 receivers[r]->validate();
126 }
127 delete [] senders;
128 }
129 for (size_t r = 0; r < num_receivers; ++r ) {
130 tbb::flow::remove_edge( exe_vec[node_idx], *receivers[r] );
131 }
132 ASSERT( exe_vec[node_idx].try_put( InputType() ) == true, NULL );
133 g.wait_for_all();
134 for (size_t r = 0; r < num_receivers; ++r ) {
135 // since it's detached, nothing should have changed
136 receivers[r]->validate();
137 }
138
139 for (size_t i = 0; i < num_receivers; i++) {
140 delete receivers[i];
141 }
142
143 } // for num_receivers
144 } // for node_idx
145 } // for concurrency level lc
146}
147
148const size_t Offset = 123;
149tbb::atomic<size_t> global_execute_count;
150
151struct inc_functor {
152
153 tbb::atomic<size_t> local_execute_count;
154 inc_functor( ) { local_execute_count = 0; }
155 inc_functor( const inc_functor &f ) { local_execute_count = f.local_execute_count; }
156 void operator=( const inc_functor &f ) { local_execute_count = f.local_execute_count; }
157
158 int operator()( int i ) {
159 ++global_execute_count;
160 ++local_execute_count;
161 return i;
162 }
163
164};
165
166template< typename InputType, typename OutputType >
167void buffered_levels_with_copy( size_t concurrency ) {
168
169 // Do for lc = 1 to concurrency level
170 for ( size_t lc = 1; lc <= concurrency; ++lc ) {
171 tbb::flow::graph g;
172
173 inc_functor cf;
174 cf.local_execute_count = Offset;
175 global_execute_count = Offset;
176
177 tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, cf );
178
179 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
180
181 std::vector< harness_mapped_receiver<OutputType>* > receivers(num_receivers);
182 for (size_t i = 0; i < num_receivers; i++) {
183 receivers[i] = new harness_mapped_receiver<OutputType>(g);
184 }
185
186 for (size_t r = 0; r < num_receivers; ++r ) {
187 tbb::flow::make_edge( exe_node, *receivers[r] );
188 }
189
190 harness_counting_sender<InputType> *senders = NULL;
191 for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
192 senders = new harness_counting_sender<InputType>[num_senders];
193 for (size_t s = 0; s < num_senders; ++s ) {
194 senders[s].my_limit = N;
195 tbb::flow::make_edge( senders[s], exe_node );
196 }
197
198 for (size_t r = 0; r < num_receivers; ++r ) {
199 receivers[r]->initialize_map( N, num_senders );
200 }
201
202 NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
203 g.wait_for_all();
204
205 for (size_t s = 0; s < num_senders; ++s ) {
206 size_t n = senders[s].my_received;
207 ASSERT( n == N, NULL );
208 ASSERT( senders[s].my_receiver == &exe_node, NULL );
209 }
210 for (size_t r = 0; r < num_receivers; ++r ) {
211 receivers[r]->validate();
212 }
213 delete [] senders;
214 }
215 for (size_t r = 0; r < num_receivers; ++r ) {
216 tbb::flow::remove_edge( exe_node, *receivers[r] );
217 }
218 ASSERT( exe_node.try_put( InputType() ) == true, NULL );
219 g.wait_for_all();
220 for (size_t r = 0; r < num_receivers; ++r ) {
221 receivers[r]->validate();
222 }
223
224 for (size_t i = 0; i < num_receivers; i++) {
225 delete receivers[i];
226 }
227 }
228
229 // validate that the local body matches the global execute_count and both are correct
230 inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
231 const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
232 size_t global_count = global_execute_count;
233 size_t inc_count = body_copy.local_execute_count;
234 ASSERT( global_count == expected_count && global_count == inc_count, NULL );
235 g.reset(tbb::flow::rf_reset_bodies);
236 body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
237 inc_count = body_copy.local_execute_count;
238 ASSERT( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" );
239 }
240}
241
242template< typename InputType, typename OutputType >
243void run_buffered_levels( int c ) {
244 #if __TBB_CPP11_LAMBDAS_PRESENT
245 buffered_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
246 #endif
247 buffered_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::func );
248 buffered_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::functor() );
249 buffered_levels_with_copy<InputType,OutputType>( c );
250}
251
252
253//! Performs test on executable nodes with limited concurrency
254/** These tests check:
 1) that the nodes will accept puts up to the concurrency limit,
256 2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
 3) the nodes will receive puts from multiple predecessors simultaneously,
 and 4) the nodes will send to multiple successors.
259 There is no checking of the contents of the messages for corruption.
260*/
261
262template< typename InputType, typename OutputType, typename Body >
263void concurrency_levels( size_t concurrency, Body body ) {
264
265 for ( size_t lc = 1; lc <= concurrency; ++lc ) {
266 tbb::flow::graph g;
267
268 // Set the execute_counter back to zero in the harness
269 harness_graph_executor<InputType, OutputType>::execute_count = 0;
270 // Set the number of current executors to zero.
271 harness_graph_executor<InputType, OutputType>::current_executors = 0;
272 // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
273 harness_graph_executor<InputType, OutputType>::max_executors = lc;
274
275 typedef tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > fnode_type;
276 fnode_type exe_node( g, lc, body );
277
278 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
279
280 std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));
281
282#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
283 ASSERT(exe_node.successor_count() == 0, NULL);
284 ASSERT(exe_node.predecessor_count() == 0, NULL);
285#endif
286
287 for (size_t r = 0; r < num_receivers; ++r ) {
288 tbb::flow::make_edge( exe_node, receivers[r] );
289 }
290#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
291 ASSERT(exe_node.successor_count() == num_receivers, NULL);
292 typename fnode_type::successor_list_type my_succs;
293 exe_node.copy_successors(my_succs);
294 ASSERT(my_succs.size() == num_receivers, NULL);
295 typename fnode_type::predecessor_list_type my_preds;
296 exe_node.copy_predecessors(my_preds);
297 ASSERT(my_preds.size() == 0, NULL);
298#endif
299
300 harness_counting_sender<InputType> *senders = NULL;
301
302 for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
303 senders = new harness_counting_sender<InputType>[num_senders];
304 {
305 // Exclusively lock m to prevent exe_node from finishing
306 tbb::spin_rw_mutex::scoped_lock l( harness_graph_executor<InputType, OutputType>::template mutex_holder<tbb::spin_rw_mutex>::mutex );
307
308 // put to lc level, it will accept and then block at m
309 for ( size_t c = 0 ; c < lc ; ++c ) {
310 ASSERT( exe_node.try_put( InputType() ) == true, NULL );
311 }
312 // it only accepts to lc level
313 ASSERT( exe_node.try_put( InputType() ) == false, NULL );
314
315 for (size_t s = 0; s < num_senders; ++s ) {
316 // register a sender
317 senders[s].my_limit = N;
318 exe_node.register_predecessor( senders[s] );
319 }
320
321 } // release lock at end of scope, setting the exe node free to continue
322 // wait for graph to settle down
323 g.wait_for_all();
324
325 // confirm that each sender was requested from N times
326 for (size_t s = 0; s < num_senders; ++s ) {
327 size_t n = senders[s].my_received;
328 ASSERT( n == N, NULL );
329 ASSERT( senders[s].my_receiver == &exe_node, NULL );
330 }
331 // confirm that each receivers got N * num_senders + the initial lc puts
332 for (size_t r = 0; r < num_receivers; ++r ) {
333 size_t n = receivers[r].my_count;
334 ASSERT( n == num_senders*N+lc, NULL );
335 receivers[r].my_count = 0;
336 }
337 delete [] senders;
338 }
339 for (size_t r = 0; r < num_receivers; ++r ) {
340 tbb::flow::remove_edge( exe_node, receivers[r] );
341 }
342 ASSERT( exe_node.try_put( InputType() ) == true, NULL );
343 g.wait_for_all();
344 for (size_t r = 0; r < num_receivers; ++r ) {
345 ASSERT( int(receivers[r].my_count) == 0, NULL );
346 }
347 }
348 }
349}
350
351
352template< typename InputType, typename OutputType >
353void run_concurrency_levels( int c ) {
354 #if __TBB_CPP11_LAMBDAS_PRESENT
355 concurrency_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex>(i); } );
356 #endif
357 concurrency_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex> );
358 concurrency_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::template tfunctor<tbb::spin_rw_mutex>() );
359}
360
361
//! Stateless message type: constructible from nothing or from an int, converts to int 0.
struct empty_no_assign {
    //! Default construction; there is no state to set up.
    empty_no_assign() {}

    //! Construction from an int; the value is ignored.
    empty_no_assign( int ) {}

    //! Conversion to int always yields zero (a value-initialized int).
    operator int() {
        return int();
    }
};
367
368template< typename InputType >
369struct parallel_puts : private NoAssign {
370
371 tbb::flow::receiver< InputType > * const my_exe_node;
372
373 parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
374
375 void operator()( int ) const {
376 for ( int i = 0; i < N; ++i ) {
377 // the nodes will accept all puts
378 ASSERT( my_exe_node->try_put( InputType() ) == true, NULL );
379 }
380 }
381
382};
383
384//! Performs test on executable nodes with unlimited concurrency
385/** These tests check:
386 1) that the nodes will accept all puts
387 2) the nodes will receive puts from multiple predecessors simultaneously,
388 and 3) the nodes will send to multiple successors.
389 There is no checking of the contents of the messages for corruption.
390*/
391
392template< typename InputType, typename OutputType, typename Body >
393void unlimited_concurrency( Body body ) {
394
395 for (int p = 1; p < 2*MaxThread; ++p) {
396 tbb::flow::graph g;
397 tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );
398
399 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
400
401 std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));
402 harness_graph_executor<InputType, OutputType>::execute_count = 0;
403
404 for (size_t r = 0; r < num_receivers; ++r ) {
405 tbb::flow::make_edge( exe_node, receivers[r] );
406 }
407
408 NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
409 g.wait_for_all();
410
411 // 2) the nodes will receive puts from multiple predecessors simultaneously,
412 size_t ec = harness_graph_executor<InputType, OutputType>::execute_count;
413 ASSERT( (int)ec == p*N, NULL );
414 for (size_t r = 0; r < num_receivers; ++r ) {
415 size_t c = receivers[r].my_count;
416 // 3) the nodes will send to multiple successors.
417 ASSERT( (int)c == p*N, NULL );
418 }
419 for (size_t r = 0; r < num_receivers; ++r ) {
420 tbb::flow::remove_edge( exe_node, receivers[r] );
421 }
422 }
423 }
424 }
425
426template< typename InputType, typename OutputType >
427void run_unlimited_concurrency() {
428 harness_graph_executor<InputType, OutputType>::max_executors = 0;
429 #if __TBB_CPP11_LAMBDAS_PRESENT
430 unlimited_concurrency<InputType,OutputType>( []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
431 #endif
432 unlimited_concurrency<InputType,OutputType>( &harness_graph_executor<InputType, OutputType>::func );
433 unlimited_concurrency<InputType,OutputType>( typename harness_graph_executor<InputType, OutputType>::functor() );
434}
435
436struct continue_msg_to_int {
437 int my_int;
438 continue_msg_to_int(int x) : my_int(x) {}
439 int operator()(tbb::flow::continue_msg) { return my_int; }
440};
441
442void test_function_node_with_continue_msg_as_input() {
443 // If this function terminates, then this test is successful
444 tbb::flow::graph g;
445
446 tbb::flow::broadcast_node<tbb::flow::continue_msg> Start(g);
447
448 tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN1( g, tbb::flow::serial, continue_msg_to_int(42));
449 tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN2( g, tbb::flow::serial, continue_msg_to_int(43));
450
451 tbb::flow::make_edge( Start, FN1 );
452 tbb::flow::make_edge( Start, FN2 );
453
454 Start.try_put( tbb::flow::continue_msg() );
455 g.wait_for_all();
456}
457
458//! Tests limited concurrency cases for nodes that accept data messages
459void test_concurrency(int num_threads) {
460 tbb::task_scheduler_init init(num_threads);
461 run_concurrency_levels<int,int>(num_threads);
462 run_concurrency_levels<int,tbb::flow::continue_msg>(num_threads);
463 run_buffered_levels<int, int>(num_threads);
464 run_unlimited_concurrency<int,int>();
465 run_unlimited_concurrency<int,empty_no_assign>();
466 run_unlimited_concurrency<empty_no_assign,int>();
467 run_unlimited_concurrency<empty_no_assign,empty_no_assign>();
468 run_unlimited_concurrency<int,tbb::flow::continue_msg>();
469 run_unlimited_concurrency<empty_no_assign,tbb::flow::continue_msg>();
470 test_function_node_with_continue_msg_as_input();
471}
472
473#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
//! Body that increments an external int on every call and returns its input plus one.
struct add_to_counter {
    //! Non-owning pointer to the counter supplied at construction.
    int* counter;

    add_to_counter(int& var)
        : counter(&var)
    {}

    int operator()(int i) {
        ++*counter;
        return i + 1;
    }
};
479
480template<typename FTYPE>
481void test_extract() {
482 int my_count = 0;
483 int cm;
484 tbb::flow::graph g;
485 tbb::flow::broadcast_node<int> b0(g);
486 tbb::flow::broadcast_node<int> b1(g);
487 tbb::flow::function_node<int, int, FTYPE> f0(g, tbb::flow::unlimited, add_to_counter(my_count));
488 tbb::flow::queue_node<int> q0(g);
489
490 tbb::flow::make_edge(b0, f0);
491 tbb::flow::make_edge(b1, f0);
492 tbb::flow::make_edge(f0, q0);
493 for( int i = 0; i < 2; ++i ) {
494 ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts");
495 ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts");
496 ASSERT(f0.predecessor_count() == 2 && f0.successor_count() == 1, "f0 has incorrect counts");
497 ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts");
498
499 /* b0 */
500 /* \ */
501 /* f0 - q0 */
502 /* / */
503 /* b1 */
504
505 b0.try_put(1);
506 g.wait_for_all();
507 ASSERT(my_count == 1, "function_node didn't fire");
508 ASSERT(q0.try_get(cm), "function_node didn't forward");
509 b1.try_put(1);
510 g.wait_for_all();
511 ASSERT(my_count == 2, "function_node didn't fire");
512 ASSERT(q0.try_get(cm), "function_node didn't forward");
513
514 b0.extract();
515
516 /* b0 */
517 /* */
518 /* f0 - q0 */
519 /* / */
520 /* b1 */
521
522 ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts");
523 ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts");
524 ASSERT(f0.predecessor_count() == 1 && f0.successor_count() == 1, "f0 has incorrect counts");
525 ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts");
526 b0.try_put(1);
527 b0.try_put(1);
528 g.wait_for_all();
529 ASSERT(my_count == 2, "b0 messages being forwarded to function_node even though it is disconnected");
530 b1.try_put(1);
531 g.wait_for_all();
532 ASSERT(my_count == 3, "function_node didn't fire though it has only one predecessor");
533 ASSERT(q0.try_get(cm), "function_node didn't forward second time");
534
535 f0.extract();
536
537 /* b0 */
538 /* */
539 /* f0 q0 */
540 /* */
541 /* b1 */
542
543 ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts");
544 ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts");
545 ASSERT(f0.predecessor_count() == 0 && f0.successor_count() == 0, "f0 has incorrect counts");
546 ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts");
547 b0.try_put(1);
548 b0.try_put(1);
549 b1.try_put(1);
550 b1.try_put(1);
551 g.wait_for_all();
552 ASSERT(my_count == 3, "function_node didn't fire though it has only one predecessor");
553 ASSERT(!q0.try_get(cm), "function_node forwarded though it shouldn't");
554 make_edge(b0, f0);
555
556 /* b0 */
557 /* \ */
558 /* f0 q0 */
559 /* */
560 /* b1 */
561
562 ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts");
563 ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts");
564 ASSERT(f0.predecessor_count() == 1 && f0.successor_count() == 0, "f0 has incorrect counts");
565 ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts");
566
567 b0.try_put(int());
568 g.wait_for_all();
569
570 ASSERT(my_count == 4, "function_node didn't fire though it has only one predecessor");
571 ASSERT(!q0.try_get(cm), "function_node forwarded though it shouldn't");
572
573 tbb::flow::make_edge(b1, f0);
574 tbb::flow::make_edge(f0, q0);
575 my_count = 0;
576 }
577}
578#endif
579
580int TestMain() {
581 if( MinThread<1 ) {
582 REPORT("number of threads must be positive\n");
583 exit(1);
584 }
585 for( int p=MinThread; p<=MaxThread; ++p ) {
586 test_concurrency(p);
587 }
588 lightweight_testing::test<tbb::flow::function_node>(10);
589#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
590 test_extract<tbb::flow::rejecting>();
591 test_extract<tbb::flow::queueing>();
592#endif
593 return Harness::Done;
594}
595
596