1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#if __TBB_CPF_BUILD
18#define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
19#endif
20
21#include "harness_graph.h"
22
23#include "tbb/flow_graph.h"
24#include "tbb/task_scheduler_init.h"
25#include "tbb/spin_rw_mutex.h"
26
27#if TBB_USE_DEBUG
28#define N 16
29#else
30#define N 100
31#endif
32#define MAX_NODES 4
33
34//! Performs test on function nodes with limited concurrency and buffering
35/** These tests check:
36 1) that the number of executing copies never exceed the concurrency limit
37 2) that the node never rejects
38 3) that no items are lost
39 and 4) all of this happens even if there are multiple predecessors and successors
40*/
41
42template< typename InputType >
43struct parallel_put_until_limit : private NoAssign {
44
45 harness_counting_sender<InputType> *my_senders;
46
47 parallel_put_until_limit( harness_counting_sender<InputType> *senders ) : my_senders(senders) {}
48
49 void operator()( int i ) const {
50 if ( my_senders ) {
51 my_senders[i].try_put_until_limit();
52 }
53 }
54
55};
56
//! exercise buffered multifunction_node.
/** For each concurrency limit lc in [1, concurrency], builds a graph with a
    default-buffered multifunction_node (and a copy of it), wires varying
    numbers of counting senders and mapped receivers to it, and checks that
    every message is pulled, processed, and delivered exactly once. */
template< typename InputType, typename OutputTuple, typename Body >
void buffered_levels( size_t concurrency, Body body ) {
    typedef typename tbb::flow::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;

        // Create the function_node with the appropriate concurrency level, and use default buffering
        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, body );

        //Create a vector of identical exe_nodes
        std::vector< tbb::flow::multifunction_node< InputType, OutputTuple > > exe_vec(2, exe_node);

        // exercise each of the copied nodes
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
                // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
                std::vector< harness_mapped_receiver<OutputType>* > receivers(num_receivers);
                for (size_t i = 0; i < num_receivers; i++) {
                    receivers[i] = new harness_mapped_receiver<OutputType>(g);
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::make_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }

                // Do the test with varying numbers of senders
                harness_counting_sender<InputType> *senders = NULL;
                for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                    // Create num_senders senders, set their message limit each to N, and connect them to the exe_vec[node_idx]
                    senders = new harness_counting_sender<InputType>[num_senders];
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders[s].my_limit = N;
                        tbb::flow::make_edge( senders[s], exe_vec[node_idx] );
                    }

                    // Initialize the receivers so they know how many senders and messages to check for
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->initialize_map( N, num_senders );
                    }

                    // Do the test
                    NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                    g.wait_for_all();

                    // confirm that each sender was requested from N times
                    for (size_t s = 0; s < num_senders; ++s ) {
                        size_t n = senders[s].my_received;
                        ASSERT( n == N, NULL );
                        ASSERT( senders[s].my_receiver == &exe_vec[node_idx], NULL );
                    }
                    // validate the receivers
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->validate();
                    }
                    delete [] senders;
                }
                // Disconnect all receivers before re-testing with the node detached.
                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }
                // A buffered node must accept a put even with no successors attached.
                ASSERT( exe_vec[node_idx].try_put( InputType() ) == true, NULL );
                g.wait_for_all();
                for (size_t r = 0; r < num_receivers; ++r ) {
                    // since it's detached, nothing should have changed
                    receivers[r]->validate();
                }

                for (size_t i = 0; i < num_receivers; i++) {
                    delete receivers[i];
                }
            }
        }
    }
}
139
// Nonzero starting value for the execution counters; makes it detectable if a
// body copy resets its count rather than inheriting it (see buffered_levels_with_copy).
const size_t Offset = 123;
// Total number of inc_functor invocations across all copies of the body.
tbb::atomic<size_t> global_execute_count;
142
//! Body that counts its invocations both globally and per-copy, then forwards
//! the input to output port 0.
struct inc_functor {

    // Invocation count carried by this copy (copies inherit the source's count
    // so copy_body() can report the running total).
    tbb::atomic<size_t> local_execute_count;
    inc_functor( ) { local_execute_count = 0; }
    inc_functor( const inc_functor &f ) { local_execute_count = f.local_execute_count; }

    template<typename output_ports_type>
    void operator()( int i, output_ports_type &p ) {
        ++global_execute_count;
        ++local_execute_count;
        // Forward the input unchanged; result of try_put is deliberately ignored.
        (void)tbb::flow::get<0>(p).try_put(i);
    }

};
157
//! Same exercise as buffered_levels, but with a stateful body (inc_functor),
//! so that at the end copy_body() can be used to verify the node kept (a copy
//! of) the body and that its invocation count matches the global count.
template< typename InputType, typename OutputTuple >
void buffered_levels_with_copy( size_t concurrency ) {
    typedef typename tbb::flow::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Start both counters at Offset so a reset of either would be detected.
        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< harness_mapped_receiver<OutputType>* > receivers(num_receivers);
            for (size_t i = 0; i < num_receivers; i++) {
                receivers[i] = new harness_mapped_receiver<OutputType>(g);
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            harness_counting_sender<InputType> *senders = NULL;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                // num_senders senders, each limited to N messages, feed the node.
                senders = new harness_counting_sender<InputType>[num_senders];
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders[s].my_limit = N;
                    tbb::flow::make_edge( senders[s], exe_node );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                // Each sender must have been drained exactly N times by this node.
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s].my_received;
                    ASSERT( n == N, NULL );
                    ASSERT( senders[s].my_receiver == &exe_node, NULL );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
                delete [] senders;
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
            // One extra put per num_receivers iteration; accepted even though detached.
            ASSERT( exe_node.try_put( InputType() ) == true, NULL );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }

            for (size_t i = 0; i < num_receivers; i++) {
                delete receivers[i];
            }
        }

        // validate that the local body matches the global execute_count and both are correct
        // Expected = sum over num_receivers,num_senders of num_senders*N executions
        //          = N * MAX_NODES * MAX_NODES*(MAX_NODES+1)/2, plus one detached
        //          put per num_receivers iteration (MAX_NODES), plus the Offset.
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        ASSERT( global_count == expected_count && global_count == inc_count, NULL );
    }
}
229
//! Runs buffered_levels with every supported body kind (lambda where
//! available, function pointer, functor object) plus the stateful-copy variant.
template< typename InputType, typename OutputTuple >
void run_buffered_levels( int c ) {
    #if __TBB_CPP11_LAMBDAS_PRESENT
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    buffered_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
    #endif
    buffered_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    buffered_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
    buffered_levels_with_copy<InputType,OutputTuple>( c );
}
240
241
242//! Performs test on executable nodes with limited concurrency
243/** These tests check:
244 1) that the nodes will accepts puts up to the concurrency limit,
245 2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
246 3) the nodes will receive puts from multiple successors simultaneously,
247 and 4) the nodes will send to multiple predecessors.
248 There is no checking of the contents of the messages for corruption.
249*/
250
//! Exercises a rejecting multifunction_node at each concurrency limit lc:
//! fills the node to exactly lc in-flight items while the body is blocked on a
//! write-locked mutex, verifies the (lc+1)-th put is rejected, then releases
//! the lock and checks pull counts from senders and delivery counts at receivers.
template< typename InputType, typename OutputTuple, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {
    typedef typename tbb::flow::tuple_element<0,OutputTuple>::type OutputType;
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;


        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), receivers[r] );
            }

            harness_counting_sender<InputType> *senders = NULL;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l( harness_graph_multifunction_executor< InputType, OutputTuple>::template mutex_holder<tbb::spin_rw_mutex>::mutex );

                    // put to lc level, it will accept and then block at m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        ASSERT( exe_node.try_put( InputType() ) == true, NULL );
                    }
                    // it only accepts to lc level
                    ASSERT( exe_node.try_put( InputType() ) == false, NULL );

                    senders = new harness_counting_sender<InputType>[num_senders];
                    for (size_t s = 0; s < num_senders; ++s ) {
                        // register a sender
                        senders[s].my_limit = N;
                        exe_node.register_predecessor( senders[s] );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that each sender was requested from N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s].my_received;
                    ASSERT( n == N, NULL );
                    ASSERT( senders[s].my_receiver == &exe_node, NULL );
                }
                // confirm that each receivers got N * num_senders + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r].my_count;
                    ASSERT( n == num_senders*N+lc, NULL );
                    receivers[r].my_count = 0;
                }
                delete [] senders;
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), receivers[r] );
            }
            // With no successors attached the put is still accepted (and the
            // output simply goes nowhere), so receiver counts must stay zero.
            ASSERT( exe_node.try_put( InputType() ) == true, NULL );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                ASSERT( int(receivers[r].my_count) == 0, NULL );
            }
        }
    }
}
325
//! Runs concurrency_levels with every supported body kind; all bodies block on
//! the harness's spin_rw_mutex so the test can hold the node at its limit.
template< typename InputType, typename OutputTuple >
void run_concurrency_levels( int c ) {
    #if __TBB_CPP11_LAMBDAS_PRESENT
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    concurrency_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex>(i,p); } );
    #endif
    concurrency_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunctor<tbb::spin_rw_mutex>() );
}
335
336
//! Stateless message type used to test nodes with non-trivial payloads:
//! constructible by default or from int, convertible to int (always 0),
//! with no assignment requirement placed on it by the tests.
struct empty_no_assign {
    empty_no_assign() {}
    // Accepts (and discards) an int so it can be built from loop counters.
    empty_no_assign( int ) {}
    // Both const and non-const conversions are provided so either kind of
    // reference can be counted/compared as an int.
    operator int() { return 0; }
    operator int() const { return 0; }
};
343
344template< typename InputType >
345struct parallel_puts : private NoAssign {
346
347 tbb::flow::receiver< InputType > * const my_exe_node;
348
349 parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
350
351 void operator()( int ) const {
352 for ( int i = 0; i < N; ++i ) {
353 // the nodes will accept all puts
354 ASSERT( my_exe_node->try_put( InputType() ) == true, NULL );
355 }
356 }
357
358};
359
360//! Performs test on executable nodes with unlimited concurrency
361/** These tests check:
362 1) that the nodes will accept all puts
363 2) the nodes will receive puts from multiple predecessors simultaneously,
364 and 3) the nodes will send to multiple successors.
365 There is no checking of the contents of the messages for corruption.
366*/
367
//! Exercises an unlimited-concurrency rejecting multifunction_node: p threads
//! each put N items; every put must be accepted, the body must run p*N times,
//! and every receiver must see all p*N outputs.
template< typename InputType, typename OutputTuple, typename Body >
void unlimited_concurrency( Body body ) {
    typedef typename tbb::flow::tuple_element<0,OutputTuple>::type OutputType;

    for (int p = 1; p < 2*MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
            std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));

            // Reset the harness's body-invocation counter for this round.
            harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), receivers[r] );
            }

            NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count;
            ASSERT( (int)ec == p*N, NULL );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r].my_count;
                // 3) the nodes will send to multiple successors.
                ASSERT( (int)c == p*N, NULL );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), receivers[r] );
            }
        }
    }
}
402
//! Runs unlimited_concurrency with every supported body kind.
//! max_executors = 0 disables the harness's concurrency-limit check, since
//! an unlimited node may legitimately run any number of bodies at once.
template< typename InputType, typename OutputTuple >
void run_unlimited_concurrency() {
    harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0;
    #if __TBB_CPP11_LAMBDAS_PRESENT
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    unlimited_concurrency<InputType,OutputTuple>( []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
    #endif
    unlimited_concurrency<InputType,OutputTuple>( &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    unlimited_concurrency<InputType,OutputTuple>( typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
}
413
414template<typename InputType, typename OutputTuple>
415struct oddEvenBody {
416 typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
417 typedef typename tbb::flow::tuple_element<0,OutputTuple>::type EvenType;
418 typedef typename tbb::flow::tuple_element<1,OutputTuple>::type OddType;
419 void operator() (const InputType &i, output_ports_type &p) {
420 if((int)i % 2) {
421 (void)tbb::flow::get<1>(p).try_put(OddType(i));
422 }
423 else {
424 (void)tbb::flow::get<0>(p).try_put(EvenType(i));
425 }
426 }
427};
428
//! Verifies that a multifunction_node routes each input to the correct output
//! port: evens to queue q0 via port 0, odds to queue q1 via port 1.
template<typename InputType, typename OutputTuple >
void run_multiport_test(int num_threads) {
    typedef typename tbb::flow::multifunction_node<InputType, OutputTuple> mo_node_type;
    typedef typename tbb::flow::tuple_element<0,OutputTuple>::type EvenType;
    typedef typename tbb::flow::tuple_element<1,OutputTuple>::type OddType;
    tbb::task_scheduler_init init(num_threads);
    tbb::flow::graph g;
    mo_node_type mo_node(g, tbb::flow::unlimited, oddEvenBody<InputType, OutputTuple>() );

    tbb::flow::queue_node<EvenType> q0(g);
    tbb::flow::queue_node<OddType> q1(g);

    tbb::flow::make_edge(tbb::flow::output_port<0>(mo_node), q0);
    tbb::flow::make_edge(tbb::flow::output_port<1>(mo_node), q1);

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Check the introspection API: no predecessors yet, one successor per port.
    ASSERT(mo_node.predecessor_count() == 0, NULL);
    ASSERT(tbb::flow::output_port<0>(mo_node).successor_count() == 1, NULL);
    typedef typename mo_node_type::output_ports_type oports_type;
    typedef typename tbb::flow::tuple_element<0,oports_type>::type port0_type;
    typename port0_type::successor_list_type my_0succs;
    tbb::flow::output_port<0>(mo_node).copy_successors(my_0succs);
    ASSERT(my_0succs.size() == 1, NULL);
    typename mo_node_type::predecessor_list_type my_preds;
    mo_node.copy_predecessors(my_preds);
    ASSERT(my_preds.size() == 0, NULL);
#endif

    for(InputType i = 0; i < N; ++i) {
        mo_node.try_put(i);
    }

    g.wait_for_all();
    // N is even (16 or 100), so exactly N/2 items went to each queue;
    // drain both and check the parity routing.
    for(int i = 0; i < N/2; ++i) {
        EvenType e;
        OddType o;
        ASSERT(q0.try_get(e) && (int)e % 2 == 0, NULL);
        ASSERT(q1.try_get(o) && (int)o % 2 == 1, NULL);
    }
}
469
//! Tests limited concurrency cases for nodes that accept data messages
/** Runs the full battery — limited, buffered, and unlimited concurrency plus
    the multiport routing test — over several input/output type combinations. */
void test_concurrency(int num_threads) {
    tbb::task_scheduler_init init(num_threads);
    run_concurrency_levels<int,tbb::flow::tuple<int> >(num_threads);
    run_concurrency_levels<int,tbb::flow::tuple<tbb::flow::continue_msg> >(num_threads);
    run_buffered_levels<int, tbb::flow::tuple<int> >(num_threads);
    run_unlimited_concurrency<int, tbb::flow::tuple<int> >();
    run_unlimited_concurrency<int,tbb::flow::tuple<empty_no_assign> >();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::tuple<int> >();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::tuple<empty_no_assign> >();
    run_unlimited_concurrency<int,tbb::flow::tuple<tbb::flow::continue_msg> >();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::tuple<tbb::flow::continue_msg> >();
    run_multiport_test<int, tbb::flow::tuple<int, int> >(num_threads);
    run_multiport_test<float, tbb::flow::tuple<int, double> >(num_threads);
}
485
//! Checks that output_port<>() returns a reference into the node itself
//! (not a copy) for the given buffering Policy; the actual assertion lives
//! in the harness's test_output_ports_return_ref.
template<typename Policy>
void test_ports_return_references() {
    tbb::flow::graph g;
    typedef int InputType;
    typedef tbb::flow::tuple<int> OutputTuple;
    tbb::flow::multifunction_node<InputType, OutputTuple, Policy> mf_node(
        g, tbb::flow::unlimited,
        &harness_graph_multifunction_executor<InputType, OutputTuple>::empty_func );
    test_output_ports_return_ref(mf_node);
}
496
497#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
498// the integer received indicates which output ports should succeed and which should fail
499// on try_put().
typedef tbb::flow::multifunction_node<int, tbb::flow::tuple<int, int> > mf_node_type;

//! Body for the extract tests: counts invocations (both in the shared counter
//! it points to and per-copy) and uses bit 0 / bit 1 of the input to assert
//! which output ports currently have a connected successor.
struct add_to_counter {
    int my_invocations;  // per-copy invocation count (survives copy_body)
    int *counter;        // shared counter owned by the caller
    add_to_counter(int& var):counter(&var){ my_invocations = 0;}
    void operator()(const int &i, mf_node_type::output_ports_type &outports) {
        *counter+=1;
        ++my_invocations;
        if(i & 0x1) {
            ASSERT(tbb::flow::get<0>(outports).try_put(i), "port 0 expected to succeed");
        }
        else {
            ASSERT(!tbb::flow::get<0>(outports).try_put(i), "port 0 expected to fail");
        }
        if(i & 0x2) {
            ASSERT(tbb::flow::get<1>(outports).try_put(i), "port 1 expected to succeed");
        }
        else {
            ASSERT(!tbb::flow::get<1>(outports).try_put(i), "port 1 expected to fail");
        }
    }
    // Accessor used after copy_body() to read the copy's invocation count.
    int my_inner() { return my_invocations; }
};
524
//! Exercises the deprecated extract() API on a multifunction_node and its
//! neighbors: edges are removed one node at a time, and after each step the
//! predecessor/successor counts and message flow are re-verified.  The loop
//! runs twice: iteration 0 leaves the body's state alone after mf0.extract(),
//! iteration 1 additionally resets bodies via g.reset(rf_reset_bodies) —
//! the final copy_body check distinguishes the two.
template<class FTYPE>
void test_extract() {
    int my_count = 0;
    int cm;
    tbb::flow::graph g;
    tbb::flow::broadcast_node<int> b0(g);
    tbb::flow::broadcast_node<int> b1(g);
    tbb::flow::multifunction_node<int, tbb::flow::tuple<int,int>, FTYPE> mf0(g, tbb::flow::unlimited, add_to_counter(my_count));
    tbb::flow::queue_node<int> q0(g);
    tbb::flow::queue_node<int> q1(g);

    tbb::flow::make_edge(b0, mf0);
    tbb::flow::make_edge(b1, mf0);
    tbb::flow::make_edge(tbb::flow::output_port<0>(mf0), q0);
    tbb::flow::make_edge(tbb::flow::output_port<1>(mf0), q1);
    for( int i = 0; i < 2; ++i ) {

        /* b0 */
        /* \ |--q0 */
        /* mf0+ */
        /* / |--q1 */
        /* b1 */

        // Fully connected: both broadcasters feed mf0, both ports feed a queue.
        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts");
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts");
        ASSERT(mf0.predecessor_count() == 2
               && tbb::flow::output_port<0>(mf0).successor_count() == 1
               && tbb::flow::output_port<1>(mf0).successor_count() == 1
               , "mf0 has incorrect counts");
        ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts");
        ASSERT(q1.predecessor_count() == 1 && q1.successor_count() == 0, "q0 has incorrect counts");
        // Input 3 (bits 0 and 1 set) means both ports are expected to succeed.
        b0.try_put(3);
        g.wait_for_all();
        ASSERT(my_count == 1, "multifunction_node didn't fire");
        ASSERT(q0.try_get(cm), "multifunction_node didn't forward to 0");
        ASSERT(q1.try_get(cm), "multifunction_node didn't forward to 1");
        b1.try_put(3);
        g.wait_for_all();
        ASSERT(my_count == 2, "multifunction_node didn't fire");
        ASSERT(q0.try_get(cm), "multifunction_node didn't forward to 0");
        ASSERT(q1.try_get(cm), "multifunction_node didn't forward to 1");

        // Detach b0 from everything; b1 and the output edges remain.
        b0.extract();


        /* b0 */
        /* |--q0 */
        /* mf0+ */
        /* / |--q1 */
        /* b1 */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts");
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts");
        ASSERT(mf0.predecessor_count() == 1
               && tbb::flow::output_port<0>(mf0).successor_count() == 1
               && tbb::flow::output_port<1>(mf0).successor_count() == 1
               , "mf0 has incorrect counts");
        ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts");
        ASSERT(q1.predecessor_count() == 1 && q1.successor_count() == 0, "q0 has incorrect counts");
        b0.try_put(1);
        b0.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 2, "b0 messages being forwarded to multifunction_node even though it is disconnected");
        b1.try_put(3);
        g.wait_for_all();
        ASSERT(my_count == 3, "multifunction_node didn't fire though it has only one predecessor");
        ASSERT(q0.try_get(cm), "multifunction_node didn't forward second time");
        ASSERT(q1.try_get(cm), "multifunction_node didn't forward second time");

        // Detach q0; port 0 of mf0 now has no successor.
        q0.extract();

        /* b0 */
        /* | q0 */
        /* mf0+ */
        /* / |--q1 */
        /* b1 */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts");
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts");
        ASSERT(mf0.predecessor_count() == 1
               && tbb::flow::output_port<0>(mf0).successor_count() == 0
               && tbb::flow::output_port<1>(mf0).successor_count() == 1
               , "mf0 has incorrect counts");
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts");
        ASSERT(q1.predecessor_count() == 1 && q1.successor_count() == 0, "q0 has incorrect counts");
        b0.try_put(1);
        b0.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 3, "b0 messages being forwarded to multifunction_node even though it is disconnected");
        // Input 2 (only bit 1 set): port 0 must fail (no successor), port 1 succeed.
        b1.try_put(2);
        g.wait_for_all();
        ASSERT(my_count == 4, "multifunction_node didn't fire though it has one predecessor");
        ASSERT(!q0.try_get(cm), "multifunction_node forwarded");
        ASSERT(q1.try_get(cm), "multifunction_node forwarded");
        // Detach mf0 itself from all predecessors and successors.
        mf0.extract();

        // On the second iteration also reset the body state; iteration 0
        // deliberately leaves the body's invocation count intact.
        if(i == 0) {
        }
        else {
            g.reset(tbb::flow::rf_reset_bodies);
        }


        /* b0 */
        /* | q0 */
        /* mf0+ */
        /* | q1 */
        /* b1 */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts");
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts");
        ASSERT(mf0.predecessor_count() == 0
               && tbb::flow::output_port<0>(mf0).successor_count() == 0
               && tbb::flow::output_port<1>(mf0).successor_count() == 0
               , "mf0 has incorrect counts");
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts");
        ASSERT(q1.predecessor_count() == 0 && q1.successor_count() == 0, "q0 has incorrect counts");
        b0.try_put(1);
        b0.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 4, "b0 messages being forwarded to multifunction_node even though it is disconnected");
        b1.try_put(2);
        g.wait_for_all();
        ASSERT(my_count == 4, "b1 messages being forwarded to multifunction_node even though it is disconnected");
        ASSERT(!q0.try_get(cm), "multifunction_node forwarded");
        ASSERT(!q1.try_get(cm), "multifunction_node forwarded");
        // Reconnect b0 only; both output ports are still disconnected.
        make_edge(b0, mf0);

        /* b0 */
        /* \ | q0 */
        /* mf0+ */
        /* | q1 */
        /* b1 */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts");
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts");
        ASSERT(mf0.predecessor_count() == 1
               && tbb::flow::output_port<0>(mf0).successor_count() == 0
               && tbb::flow::output_port<1>(mf0).successor_count() == 0
               , "mf0 has incorrect counts");
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts");
        ASSERT(q1.predecessor_count() == 0 && q1.successor_count() == 0, "q0 has incorrect counts");
        // Input 0: both ports are expected to fail (no successors attached).
        b0.try_put(0);
        g.wait_for_all();
        ASSERT(my_count == 5, "multifunction_node didn't fire though it has one predecessor");
        b1.try_put(2);
        g.wait_for_all();
        ASSERT(my_count == 5, "multifunction_node fired though it has only one predecessor");
        ASSERT(!q0.try_get(cm), "multifunction_node forwarded");
        ASSERT(!q1.try_get(cm), "multifunction_node forwarded");

        // Restore the full topology for the next iteration, then verify the
        // body copy's count: 5 without reset (i==0), 1 after reset (i==1).
        tbb::flow::make_edge(b1, mf0);
        tbb::flow::make_edge(tbb::flow::output_port<0>(mf0), q0);
        tbb::flow::make_edge(tbb::flow::output_port<1>(mf0), q1);
        ASSERT( ( i == 0 && tbb::flow::copy_body<add_to_counter>(mf0).my_inner() == 5 ) ||
                ( i == 1 && tbb::flow::copy_body<add_to_counter>(mf0).my_inner() == 1 ) , "reset_bodies failed");
        my_count = 0;
    }
}
684#endif
685
//! Harness entry point: runs the full test battery for each thread count in
//! [MinThread, MaxThread], then the port-reference, lightweight, and
//! (when enabled) extraction tests.
int TestMain() {
    if( MinThread<1 ) {
        REPORT("number of threads must be positive\n");
        exit(1);
    }
    for( int p=MinThread; p<=MaxThread; ++p ) {
        test_concurrency(p);
    }
    test_ports_return_references<tbb::flow::queueing>();
    test_ports_return_references<tbb::flow::rejecting>();
    lightweight_testing::test<tbb::flow::multifunction_node>(10);
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    test_extract<tbb::flow::rejecting>();
    test_extract<tbb::flow::queueing>();
#endif
    return Harness::Done;
}
703