1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #if __TBB_CPF_BUILD |
18 | #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1 |
19 | #endif |
20 | |
21 | #include "harness_graph.h" |
22 | |
23 | #include "tbb/flow_graph.h" |
24 | #include "tbb/task_scheduler_init.h" |
25 | #include "tbb/spin_rw_mutex.h" |
26 | |
27 | #if TBB_USE_DEBUG |
28 | #define N 16 |
29 | #else |
30 | #define N 100 |
31 | #endif |
32 | #define MAX_NODES 4 |
33 | |
34 | //! Performs test on function nodes with limited concurrency and buffering |
35 | /** These tests check: |
36 | 1) that the number of executing copies never exceed the concurrency limit |
37 | 2) that the node never rejects |
38 | 3) that no items are lost |
39 | and 4) all of this happens even if there are multiple predecessors and successors |
40 | */ |
41 | |
42 | template< typename InputType > |
43 | struct parallel_put_until_limit : private NoAssign { |
44 | |
45 | harness_counting_sender<InputType> *my_senders; |
46 | |
47 | parallel_put_until_limit( harness_counting_sender<InputType> *senders ) : my_senders(senders) {} |
48 | |
49 | void operator()( int i ) const { |
50 | if ( my_senders ) { |
51 | my_senders[i].try_put_until_limit(); |
52 | } |
53 | } |
54 | |
55 | }; |
56 | |
//! Exercises a buffered multifunction_node at every concurrency level 1..concurrency.
/** For each level lc the test builds a fresh graph, creates a node with
    concurrency lc and default (queueing) buffering, copies it into a vector to
    also exercise the copy constructor, then for every combination of
    1..MAX_NODES receivers and 1..MAX_NODES senders checks that:
    - each sender is drained exactly N times,
    - every receiver observes the full expected message map,
    - after the edges are removed, a put still succeeds but no receiver
      state changes (the node is detached). */
template< typename InputType, typename OutputTuple, typename Body >
void buffered_levels( size_t concurrency, Body body ) {
    typedef typename tbb::flow::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;

        // Create the function_node with the appropriate concurrency level, and use default buffering
        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, body );

        // Create a vector of identical exe_nodes; exercises the node copy constructor
        std::vector< tbb::flow::multifunction_node< InputType, OutputTuple > > exe_vec(2, exe_node);

        // exercise each of the copied nodes
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
                // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
                std::vector< harness_mapped_receiver<OutputType>* > receivers(num_receivers);
                for (size_t i = 0; i < num_receivers; i++) {
                    receivers[i] = new harness_mapped_receiver<OutputType>(g);
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::make_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }

                // Do the test with varying numbers of senders
                harness_counting_sender<InputType> *senders = NULL;
                for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                    // Create num_senders senders, set their message limit each to N, and connect them to the exe_vec[node_idx]
                    senders = new harness_counting_sender<InputType>[num_senders];
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders[s].my_limit = N;
                        tbb::flow::make_edge( senders[s], exe_vec[node_idx] );
                    }

                    // Initialize the receivers so they know how many senders and messages to check for
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->initialize_map( N, num_senders );
                    }

                    // Do the test: one native thread per sender pushes until the limit
                    NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                    g.wait_for_all();

                    // confirm that each sender was requested from N times
                    for (size_t s = 0; s < num_senders; ++s ) {
                        size_t n = senders[s].my_received;
                        ASSERT( n == N, NULL );
                        ASSERT( senders[s].my_receiver == &exe_vec[node_idx], NULL );
                    }
                    // validate the receivers (each saw every message from every sender)
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->validate();
                    }
                    delete [] senders;
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }
                // a buffered node accepts a put even with no successors attached
                ASSERT( exe_vec[node_idx].try_put( InputType() ) == true, NULL );
                g.wait_for_all();
                for (size_t r = 0; r < num_receivers; ++r ) {
                    // since it's detached, nothing should have changed
                    receivers[r]->validate();
                }

                for (size_t i = 0; i < num_receivers; i++) {
                    delete receivers[i];
                }
            }
        }
    }
}
139 | |
// Non-zero starting value for the counters below, so an accidental
// zero-reinitialization cannot be mistaken for a correct count.
const size_t Offset = 123;
// Total invocations of inc_functor across all of its copies.
tbb::atomic<size_t> global_execute_count;
142 | |
//! Node body that counts invocations both per-copy and globally, then
//! forwards its input to output port 0.  Used to verify that copy_body
//! returns a body whose local count matches the global count.
struct inc_functor {

    // invocation count local to this particular functor copy
    tbb::atomic<size_t> local_execute_count;
    inc_functor( ) { local_execute_count = 0; }
    // copies carry over the count so copy_body reflects the original's tally
    inc_functor( const inc_functor &f ) { local_execute_count = f.local_execute_count; }

    template<typename output_ports_type>
    void operator()( int i, output_ports_type &p ) {
        ++global_execute_count;
        ++local_execute_count;
        // forward the message; success/failure of the put is intentionally ignored
        (void)tbb::flow::get<0>(p).try_put(i);
    }

};
157 | |
//! Same fan-in/fan-out exercise as buffered_levels, but with a stateful body
//! (inc_functor) so that tbb::flow::copy_body can be checked: after all the
//! puts, the copied body's local count must equal the global count and both
//! must equal the analytically expected number of invocations.
template< typename InputType, typename OutputTuple >
void buffered_levels_with_copy( size_t concurrency ) {
    typedef typename tbb::flow::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // seed both counters with Offset so zero-reset bugs are detectable
        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< harness_mapped_receiver<OutputType>* > receivers(num_receivers);
            for (size_t i = 0; i < num_receivers; i++) {
                receivers[i] = new harness_mapped_receiver<OutputType>(g);
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            harness_counting_sender<InputType> *senders = NULL;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders = new harness_counting_sender<InputType>[num_senders];
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders[s].my_limit = N;
                    tbb::flow::make_edge( senders[s], exe_node );
                }

                // tell each receiver how many senders/messages to expect
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                // each sender must have been drained exactly N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s].my_received;
                    ASSERT( n == N, NULL );
                    ASSERT( senders[s].my_receiver == &exe_node, NULL );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
                delete [] senders;
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
            // detached buffered node still accepts a put...
            ASSERT( exe_node.try_put( InputType() ) == true, NULL );
            g.wait_for_all();
            // ...but no receiver state may have changed
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }

            for (size_t i = 0; i < num_receivers; i++) {
                delete receivers[i];
            }
        }

        // validate that the local body matches the global execute_count and both are correct.
        // Expected: for each of the MAX_NODES receiver counts, sum over
        // num_senders=1..MAX_NODES of N*num_senders executions, plus the one
        // detached put per receiver count, plus the initial Offset:
        //   MAX_NODES * N * MAX_NODES*(MAX_NODES+1)/2 + MAX_NODES + Offset
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        ASSERT( global_count == expected_count && global_count == inc_count, NULL );
    }
}
229 | |
//! Runs the buffered-levels test with every supported body form:
//! lambda (when available), function pointer, functor object, and finally a
//! stateful body that also validates copy_body behavior.
template< typename InputType, typename OutputTuple >
void run_buffered_levels( int c ) {
#if __TBB_CPP11_LAMBDAS_PRESENT
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    buffered_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
#endif
    buffered_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    buffered_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
    buffered_levels_with_copy<InputType,OutputTuple>( c );
}
240 | |
241 | |
242 | //! Performs test on executable nodes with limited concurrency |
243 | /** These tests check: |
244 | 1) that the nodes will accepts puts up to the concurrency limit, |
245 | 2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor), |
246 | 3) the nodes will receive puts from multiple successors simultaneously, |
247 | and 4) the nodes will send to multiple predecessors. |
248 | There is no checking of the contents of the messages for corruption. |
249 | */ |
250 | |
//! Exercises a rejecting multifunction_node at each concurrency level 1..concurrency.
/** While the harness mutex is held exclusively, the node's body blocks, so the
    node must accept exactly lc puts and reject the (lc+1)-th.  Senders are
    registered while the node is blocked; releasing the lock lets the node
    drain them, after which every sender must have been pulled N times and
    every receiver must have seen num_senders*N + lc messages. */
template< typename InputType, typename OutputTuple, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {
    typedef typename tbb::flow::tuple_element<0,OutputTuple>::type OutputType;
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;


        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), receivers[r] );
            }

            harness_counting_sender<InputType> *senders = NULL;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l( harness_graph_multifunction_executor< InputType, OutputTuple>::template mutex_holder<tbb::spin_rw_mutex>::mutex );

                    // put to lc level, it will accept and then block at m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        ASSERT( exe_node.try_put( InputType() ) == true, NULL );
                    }
                    // it only accepts to lc level
                    ASSERT( exe_node.try_put( InputType() ) == false, NULL );

                    senders = new harness_counting_sender<InputType>[num_senders];
                    for (size_t s = 0; s < num_senders; ++s ) {
                        // register a sender
                        senders[s].my_limit = N;
                        exe_node.register_predecessor( senders[s] );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that each sender was requested from N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s].my_received;
                    ASSERT( n == N, NULL );
                    ASSERT( senders[s].my_receiver == &exe_node, NULL );
                }
                // confirm that each receivers got N * num_senders + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r].my_count;
                    ASSERT( n == num_senders*N+lc, NULL );
                    receivers[r].my_count = 0;
                }
                delete [] senders;
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), receivers[r] );
            }
            // node still accepts after edges are removed...
            ASSERT( exe_node.try_put( InputType() ) == true, NULL );
            g.wait_for_all();
            // ...but no disconnected receiver may have been sent anything
            for (size_t r = 0; r < num_receivers; ++r ) {
                ASSERT( int(receivers[r].my_count) == 0, NULL );
            }
        }
    }
}
325 | |
//! Runs the limited-concurrency test with each supported body form that can
//! block on the harness spin_rw_mutex: lambda (when available), function
//! pointer, and functor object.
template< typename InputType, typename OutputTuple >
void run_concurrency_levels( int c ) {
#if __TBB_CPP11_LAMBDAS_PRESENT
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    concurrency_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex>(i,p); } );
#endif
    concurrency_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunctor<tbb::spin_rw_mutex>() );
}
335 | |
336 | |
//! Trivial message type used to exercise nodes with a non-arithmetic payload.
//! Constructible from nothing or from an int (value discarded); converts to
//! int as 0 in both const and non-const contexts.
struct empty_no_assign {
    empty_no_assign() {}
    empty_no_assign( int ) {}
    operator int() { return 0; }
    operator int() const { return 0; }
};
343 | |
344 | template< typename InputType > |
345 | struct parallel_puts : private NoAssign { |
346 | |
347 | tbb::flow::receiver< InputType > * const my_exe_node; |
348 | |
349 | parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {} |
350 | |
351 | void operator()( int ) const { |
352 | for ( int i = 0; i < N; ++i ) { |
353 | // the nodes will accept all puts |
354 | ASSERT( my_exe_node->try_put( InputType() ) == true, NULL ); |
355 | } |
356 | } |
357 | |
358 | }; |
359 | |
360 | //! Performs test on executable nodes with unlimited concurrency |
361 | /** These tests check: |
362 | 1) that the nodes will accept all puts |
363 | 2) the nodes will receive puts from multiple predecessors simultaneously, |
364 | and 3) the nodes will send to multiple successors. |
365 | There is no checking of the contents of the messages for corruption. |
366 | */ |
367 | |
//! Exercises a rejecting multifunction_node with unlimited concurrency.
/** p native threads each push N items; afterwards the execute count must be
    exactly p*N and every one of the 1..MAX_NODES attached receivers must have
    received all p*N messages. */
template< typename InputType, typename OutputTuple, typename Body >
void unlimited_concurrency( Body body ) {
    typedef typename tbb::flow::tuple_element<0,OutputTuple>::type OutputType;

    for (int p = 1; p < 2*MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
            std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));

            // reset the shared invocation counter for this round
            harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), receivers[r] );
            }

            NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count;
            ASSERT( (int)ec == p*N, NULL );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r].my_count;
                // 3) the nodes will send to multiple successors.
                ASSERT( (int)c == p*N, NULL );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), receivers[r] );
            }
        }
    }
}
402 | |
//! Runs the unlimited-concurrency test with each supported body form.
//! max_executors is set to 0 first — presumably this disables the harness's
//! concurrency-ceiling check for the unlimited case; confirm in harness_graph.h.
template< typename InputType, typename OutputTuple >
void run_unlimited_concurrency() {
    harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0;
#if __TBB_CPP11_LAMBDAS_PRESENT
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    unlimited_concurrency<InputType,OutputTuple>( []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
#endif
    unlimited_concurrency<InputType,OutputTuple>( &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    unlimited_concurrency<InputType,OutputTuple>( typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
}
413 | |
414 | template<typename InputType, typename OutputTuple> |
415 | struct oddEvenBody { |
416 | typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type; |
417 | typedef typename tbb::flow::tuple_element<0,OutputTuple>::type EvenType; |
418 | typedef typename tbb::flow::tuple_element<1,OutputTuple>::type OddType; |
419 | void operator() (const InputType &i, output_ports_type &p) { |
420 | if((int)i % 2) { |
421 | (void)tbb::flow::get<1>(p).try_put(OddType(i)); |
422 | } |
423 | else { |
424 | (void)tbb::flow::get<0>(p).try_put(EvenType(i)); |
425 | } |
426 | } |
427 | }; |
428 | |
//! Verifies that a multiport node routes messages to the correct output port:
//! feeds 0..N-1 through oddEvenBody and checks the even queue holds only even
//! values and the odd queue only odd values (N/2 of each).
template<typename InputType, typename OutputTuple >
void run_multiport_test(int num_threads) {
    typedef typename tbb::flow::multifunction_node<InputType, OutputTuple> mo_node_type;
    typedef typename tbb::flow::tuple_element<0,OutputTuple>::type EvenType;
    typedef typename tbb::flow::tuple_element<1,OutputTuple>::type OddType;
    tbb::task_scheduler_init init(num_threads);
    tbb::flow::graph g;
    mo_node_type mo_node(g, tbb::flow::unlimited, oddEvenBody<InputType, OutputTuple>() );

    tbb::flow::queue_node<EvenType> q0(g);
    tbb::flow::queue_node<OddType> q1(g);

    tbb::flow::make_edge(tbb::flow::output_port<0>(mo_node), q0);
    tbb::flow::make_edge(tbb::flow::output_port<1>(mo_node), q1);

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // when extraction support is built in, also sanity-check the
    // predecessor/successor introspection API on the freshly wired node
    ASSERT(mo_node.predecessor_count() == 0, NULL);
    ASSERT(tbb::flow::output_port<0>(mo_node).successor_count() == 1, NULL);
    typedef typename mo_node_type::output_ports_type oports_type;
    typedef typename tbb::flow::tuple_element<0,oports_type>::type port0_type;
    typename port0_type::successor_list_type my_0succs;
    tbb::flow::output_port<0>(mo_node).copy_successors(my_0succs);
    ASSERT(my_0succs.size() == 1, NULL);
    typename mo_node_type::predecessor_list_type my_preds;
    mo_node.copy_predecessors(my_preds);
    ASSERT(my_preds.size() == 0, NULL);
#endif

    for(InputType i = 0; i < N; ++i) {
        mo_node.try_put(i);
    }

    g.wait_for_all();
    // N inputs split evenly: N/2 evens on q0, N/2 odds on q1
    for(int i = 0; i < N/2; ++i) {
        EvenType e;
        OddType o;
        ASSERT(q0.try_get(e) && (int)e % 2 == 0, NULL);
        ASSERT(q1.try_get(o) && (int)o % 2 == 1, NULL);
    }
}
469 | |
//! Tests limited concurrency cases for nodes that accept data messages.
/** Drives the limited-, buffered-, and unlimited-concurrency suites across a
    mix of input/output type combinations (int, continue_msg, and the
    non-trivial empty_no_assign payload), plus the multiport routing test. */
void test_concurrency(int num_threads) {
    tbb::task_scheduler_init init(num_threads);
    run_concurrency_levels<int,tbb::flow::tuple<int> >(num_threads);
    run_concurrency_levels<int,tbb::flow::tuple<tbb::flow::continue_msg> >(num_threads);
    run_buffered_levels<int, tbb::flow::tuple<int> >(num_threads);
    run_unlimited_concurrency<int, tbb::flow::tuple<int> >();
    run_unlimited_concurrency<int,tbb::flow::tuple<empty_no_assign> >();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::tuple<int> >();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::tuple<empty_no_assign> >();
    run_unlimited_concurrency<int,tbb::flow::tuple<tbb::flow::continue_msg> >();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::tuple<tbb::flow::continue_msg> >();
    run_multiport_test<int, tbb::flow::tuple<int, int> >(num_threads);
    run_multiport_test<float, tbb::flow::tuple<int, double> >(num_threads);
}
485 | |
//! Checks that output_ports() returns references (not copies) for a
//! multifunction_node built with the given buffering Policy.
template<typename Policy>
void test_ports_return_references() {
    tbb::flow::graph g;
    typedef int InputType;
    typedef tbb::flow::tuple<int> OutputTuple;
    tbb::flow::multifunction_node<InputType, OutputTuple, Policy> mf_node(
        g, tbb::flow::unlimited,
        &harness_graph_multifunction_executor<InputType, OutputTuple>::empty_func );
    test_output_ports_return_ref(mf_node);
}
496 | |
497 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION |
// the integer received indicates which output ports should succeed and which should fail
// on try_put(): bit 0 set => port 0 must accept, bit 1 set => port 1 must accept.
typedef tbb::flow::multifunction_node<int, tbb::flow::tuple<int, int> > mf_node_type;

//! Body that bumps an external counter and asserts the expected per-port
//! try_put outcome encoded in the input value's low two bits.
struct add_to_counter {
    // invocations of THIS copy of the body (reset by graph::reset(rf_reset_bodies))
    int my_invocations;
    // shared external counter; pointer so all body copies update the same int
    int *counter;
    add_to_counter(int& var):counter(&var){ my_invocations = 0;}
    void operator()(const int &i, mf_node_type::output_ports_type &outports) {
        *counter+=1;
        ++my_invocations;
        if(i & 0x1) {
            ASSERT(tbb::flow::get<0>(outports).try_put(i), "port 0 expected to succeed" );
        }
        else {
            ASSERT(!tbb::flow::get<0>(outports).try_put(i), "port 0 expected to fail" );
        }
        if(i & 0x2) {
            ASSERT(tbb::flow::get<1>(outports).try_put(i), "port 1 expected to succeed" );
        }
        else {
            ASSERT(!tbb::flow::get<1>(outports).try_put(i), "port 1 expected to fail" );
        }
    }
    // accessor for the per-copy invocation count
    int my_inner() { return my_invocations; }
};
524 | |
525 | template<class FTYPE> |
526 | void test_extract() { |
527 | int my_count = 0; |
528 | int cm; |
529 | tbb::flow::graph g; |
530 | tbb::flow::broadcast_node<int> b0(g); |
531 | tbb::flow::broadcast_node<int> b1(g); |
532 | tbb::flow::multifunction_node<int, tbb::flow::tuple<int,int>, FTYPE> mf0(g, tbb::flow::unlimited, add_to_counter(my_count)); |
533 | tbb::flow::queue_node<int> q0(g); |
534 | tbb::flow::queue_node<int> q1(g); |
535 | |
536 | tbb::flow::make_edge(b0, mf0); |
537 | tbb::flow::make_edge(b1, mf0); |
538 | tbb::flow::make_edge(tbb::flow::output_port<0>(mf0), q0); |
539 | tbb::flow::make_edge(tbb::flow::output_port<1>(mf0), q1); |
540 | for( int i = 0; i < 2; ++i ) { |
541 | |
542 | /* b0 */ |
543 | /* \ |--q0 */ |
544 | /* mf0+ */ |
545 | /* / |--q1 */ |
546 | /* b1 */ |
547 | |
548 | ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts" ); |
549 | ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts" ); |
550 | ASSERT(mf0.predecessor_count() == 2 |
551 | && tbb::flow::output_port<0>(mf0).successor_count() == 1 |
552 | && tbb::flow::output_port<1>(mf0).successor_count() == 1 |
553 | , "mf0 has incorrect counts" ); |
554 | ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts" ); |
555 | ASSERT(q1.predecessor_count() == 1 && q1.successor_count() == 0, "q0 has incorrect counts" ); |
556 | b0.try_put(3); |
557 | g.wait_for_all(); |
558 | ASSERT(my_count == 1, "multifunction_node didn't fire" ); |
559 | ASSERT(q0.try_get(cm), "multifunction_node didn't forward to 0" ); |
560 | ASSERT(q1.try_get(cm), "multifunction_node didn't forward to 1" ); |
561 | b1.try_put(3); |
562 | g.wait_for_all(); |
563 | ASSERT(my_count == 2, "multifunction_node didn't fire" ); |
564 | ASSERT(q0.try_get(cm), "multifunction_node didn't forward to 0" ); |
565 | ASSERT(q1.try_get(cm), "multifunction_node didn't forward to 1" ); |
566 | |
567 | b0.extract(); |
568 | |
569 | |
570 | /* b0 */ |
571 | /* |--q0 */ |
572 | /* mf0+ */ |
573 | /* / |--q1 */ |
574 | /* b1 */ |
575 | |
576 | ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts" ); |
577 | ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts" ); |
578 | ASSERT(mf0.predecessor_count() == 1 |
579 | && tbb::flow::output_port<0>(mf0).successor_count() == 1 |
580 | && tbb::flow::output_port<1>(mf0).successor_count() == 1 |
581 | , "mf0 has incorrect counts" ); |
582 | ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts" ); |
583 | ASSERT(q1.predecessor_count() == 1 && q1.successor_count() == 0, "q0 has incorrect counts" ); |
584 | b0.try_put(1); |
585 | b0.try_put(1); |
586 | g.wait_for_all(); |
587 | ASSERT(my_count == 2, "b0 messages being forwarded to multifunction_node even though it is disconnected" ); |
588 | b1.try_put(3); |
589 | g.wait_for_all(); |
590 | ASSERT(my_count == 3, "multifunction_node didn't fire though it has only one predecessor" ); |
591 | ASSERT(q0.try_get(cm), "multifunction_node didn't forward second time" ); |
592 | ASSERT(q1.try_get(cm), "multifunction_node didn't forward second time" ); |
593 | |
594 | q0.extract(); |
595 | |
596 | /* b0 */ |
597 | /* | q0 */ |
598 | /* mf0+ */ |
599 | /* / |--q1 */ |
600 | /* b1 */ |
601 | |
602 | ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts" ); |
603 | ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts" ); |
604 | ASSERT(mf0.predecessor_count() == 1 |
605 | && tbb::flow::output_port<0>(mf0).successor_count() == 0 |
606 | && tbb::flow::output_port<1>(mf0).successor_count() == 1 |
607 | , "mf0 has incorrect counts" ); |
608 | ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts" ); |
609 | ASSERT(q1.predecessor_count() == 1 && q1.successor_count() == 0, "q0 has incorrect counts" ); |
610 | b0.try_put(1); |
611 | b0.try_put(1); |
612 | g.wait_for_all(); |
613 | ASSERT(my_count == 3, "b0 messages being forwarded to multifunction_node even though it is disconnected" ); |
614 | b1.try_put(2); |
615 | g.wait_for_all(); |
616 | ASSERT(my_count == 4, "multifunction_node didn't fire though it has one predecessor" ); |
617 | ASSERT(!q0.try_get(cm), "multifunction_node forwarded" ); |
618 | ASSERT(q1.try_get(cm), "multifunction_node forwarded" ); |
619 | mf0.extract(); |
620 | |
621 | if(i == 0) { |
622 | } |
623 | else { |
624 | g.reset(tbb::flow::rf_reset_bodies); |
625 | } |
626 | |
627 | |
628 | /* b0 */ |
629 | /* | q0 */ |
630 | /* mf0+ */ |
631 | /* | q1 */ |
632 | /* b1 */ |
633 | |
634 | ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts" ); |
635 | ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts" ); |
636 | ASSERT(mf0.predecessor_count() == 0 |
637 | && tbb::flow::output_port<0>(mf0).successor_count() == 0 |
638 | && tbb::flow::output_port<1>(mf0).successor_count() == 0 |
639 | , "mf0 has incorrect counts" ); |
640 | ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts" ); |
641 | ASSERT(q1.predecessor_count() == 0 && q1.successor_count() == 0, "q0 has incorrect counts" ); |
642 | b0.try_put(1); |
643 | b0.try_put(1); |
644 | g.wait_for_all(); |
645 | ASSERT(my_count == 4, "b0 messages being forwarded to multifunction_node even though it is disconnected" ); |
646 | b1.try_put(2); |
647 | g.wait_for_all(); |
648 | ASSERT(my_count == 4, "b1 messages being forwarded to multifunction_node even though it is disconnected" ); |
649 | ASSERT(!q0.try_get(cm), "multifunction_node forwarded" ); |
650 | ASSERT(!q1.try_get(cm), "multifunction_node forwarded" ); |
651 | make_edge(b0, mf0); |
652 | |
653 | /* b0 */ |
654 | /* \ | q0 */ |
655 | /* mf0+ */ |
656 | /* | q1 */ |
657 | /* b1 */ |
658 | |
659 | ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts" ); |
660 | ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts" ); |
661 | ASSERT(mf0.predecessor_count() == 1 |
662 | && tbb::flow::output_port<0>(mf0).successor_count() == 0 |
663 | && tbb::flow::output_port<1>(mf0).successor_count() == 0 |
664 | , "mf0 has incorrect counts" ); |
665 | ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts" ); |
666 | ASSERT(q1.predecessor_count() == 0 && q1.successor_count() == 0, "q0 has incorrect counts" ); |
667 | b0.try_put(0); |
668 | g.wait_for_all(); |
669 | ASSERT(my_count == 5, "multifunction_node didn't fire though it has one predecessor" ); |
670 | b1.try_put(2); |
671 | g.wait_for_all(); |
672 | ASSERT(my_count == 5, "multifunction_node fired though it has only one predecessor" ); |
673 | ASSERT(!q0.try_get(cm), "multifunction_node forwarded" ); |
674 | ASSERT(!q1.try_get(cm), "multifunction_node forwarded" ); |
675 | |
676 | tbb::flow::make_edge(b1, mf0); |
677 | tbb::flow::make_edge(tbb::flow::output_port<0>(mf0), q0); |
678 | tbb::flow::make_edge(tbb::flow::output_port<1>(mf0), q1); |
679 | ASSERT( ( i == 0 && tbb::flow::copy_body<add_to_counter>(mf0).my_inner() == 5 ) || |
680 | ( i == 1 && tbb::flow::copy_body<add_to_counter>(mf0).my_inner() == 1 ) , "reset_bodies failed" ); |
681 | my_count = 0; |
682 | } |
683 | } |
684 | #endif |
685 | |
//! Harness entry point: runs the concurrency suites at every requested thread
//! count, then the policy-specific port-reference, lightweight, and (when
//! built with extraction support) extraction tests.
int TestMain() {
    if( MinThread<1 ) {
        REPORT("number of threads must be positive\n" );
        exit(1);
    }
    for( int p=MinThread; p<=MaxThread; ++p ) {
        test_concurrency(p);
    }
    test_ports_return_references<tbb::flow::queueing>();
    test_ports_return_references<tbb::flow::rejecting>();
    lightweight_testing::test<tbb::flow::multifunction_node>(10);
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    test_extract<tbb::flow::rejecting>();
    test_extract<tbb::flow::queueing>();
#endif
    return Harness::Done;
}
703 | |