1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #if __TBB_CPF_BUILD |
18 | #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1 |
19 | #endif |
20 | |
21 | #include "harness_graph.h" |
22 | |
23 | #include "tbb/flow_graph.h" |
24 | #include "tbb/task_scheduler_init.h" |
25 | #include "tbb/spin_rw_mutex.h" |
26 | |
27 | #define N 100 |
28 | #define MAX_NODES 4 |
29 | |
30 | //! Performs test on function nodes with limited concurrency and buffering |
31 | /** These tests check: |
    1) that the number of executing copies never exceeds the concurrency limit
33 | 2) that the node never rejects |
34 | 3) that no items are lost |
35 | and 4) all of this happens even if there are multiple predecessors and successors |
36 | */ |
37 | |
38 | template< typename InputType > |
39 | struct parallel_put_until_limit : private NoAssign { |
40 | |
41 | harness_counting_sender<InputType> *my_senders; |
42 | |
43 | parallel_put_until_limit( harness_counting_sender<InputType> *senders ) : my_senders(senders) {} |
44 | |
45 | void operator()( int i ) const { |
46 | if ( my_senders ) { |
47 | my_senders[i].try_put_until_limit(); |
48 | } |
49 | } |
50 | |
51 | }; |
52 | |
//! Identity functor: forwards its argument unchanged.
template<typename IO>
struct pass_through {
    IO operator()(const IO& item) { return item; }
};
57 | |
//! Checks a buffered function_node at concurrency levels 1 .. 'concurrency'.
/** For each level lc it builds a pass_thru -> exe_node pipeline (duplicated into a
    vector of two identical pairs), then sweeps num_receivers (1..MAX_NODES) and
    num_senders (1..MAX_NODES), asserting every message is delivered exactly once,
    the node never rejects, and detached receivers see no further traffic. */
template< typename InputType, typename OutputType, typename Body >
void buffered_levels( size_t concurrency, Body body ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        // Create the function_node with the appropriate concurrency level, and use default buffering
        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, body );
        tbb::flow::function_node<InputType, InputType> pass_thru( g, tbb::flow::unlimited, pass_through<InputType>());

        // Create a vector of identical exe_nodes and pass_thrus
        std::vector< tbb::flow::function_node< InputType, OutputType > > exe_vec(2, exe_node);
        std::vector< tbb::flow::function_node< InputType, InputType > > pass_thru_vec(2, pass_thru);
        // Attach each pass_thru to its corresponding exe_node
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            tbb::flow::make_edge(pass_thru_vec[node_idx], exe_vec[node_idx]);
        }

        // TODO: why the test is executed serially for the node pairs, not concurrently?
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            // For num_receivers = 1 to MAX_NODES
            for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
                // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
                std::vector< harness_mapped_receiver<OutputType>* > receivers(num_receivers);
                for (size_t i = 0; i < num_receivers; i++) {
                    receivers[i] = new harness_mapped_receiver<OutputType>(g);
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::make_edge( exe_vec[node_idx], *receivers[r] );
                }

                // Do the test with varying numbers of senders
                harness_counting_sender<InputType> *senders = NULL;
                for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                    // Create num_senders senders, set their message limit each to N, and connect them to pass_thru_vec[node_idx]
                    senders = new harness_counting_sender<InputType>[num_senders];
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders[s].my_limit = N;
                        senders[s].register_successor(pass_thru_vec[node_idx] );
                    }

                    // Initialize the receivers so they know how many senders and messages to check for
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->initialize_map( N, num_senders );
                    }

                    // Do the test: one native thread per sender, each driving until its limit.
                    NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                    g.wait_for_all();

                    // confirm that each sender was requested from N times
                    for (size_t s = 0; s < num_senders; ++s ) {
                        size_t n = senders[s].my_received;
                        ASSERT( n == N, NULL );
                        ASSERT( senders[s].my_receiver == &pass_thru_vec[node_idx], NULL );
                    }
                    // validate the receivers
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->validate();
                    }
                    delete [] senders;
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::remove_edge( exe_vec[node_idx], *receivers[r] );
                }
                // With default buffering the node must still accept a put after edges are removed.
                ASSERT( exe_vec[node_idx].try_put( InputType() ) == true, NULL );
                g.wait_for_all();
                for (size_t r = 0; r < num_receivers; ++r ) {
                    // since it's detached, nothing should have changed
                    receivers[r]->validate();
                }

                for (size_t i = 0; i < num_receivers; i++) {
                    delete receivers[i];
                }

            } // for num_receivers
        } // for node_idx
    } // for concurrency level lc
}
147 | |
// Non-zero starting value for the execution counters: lets the test distinguish
// "body restored to its initial state" from "counter zeroed out" after reset.
const size_t Offset = 123;
// Total number of inc_functor invocations summed across all copies of the body.
tbb::atomic<size_t> global_execute_count;
150 | |
151 | struct inc_functor { |
152 | |
153 | tbb::atomic<size_t> local_execute_count; |
154 | inc_functor( ) { local_execute_count = 0; } |
155 | inc_functor( const inc_functor &f ) { local_execute_count = f.local_execute_count; } |
156 | void operator=( const inc_functor &f ) { local_execute_count = f.local_execute_count; } |
157 | |
158 | int operator()( int i ) { |
159 | ++global_execute_count; |
160 | ++local_execute_count; |
161 | return i; |
162 | } |
163 | |
164 | }; |
165 | |
//! Same sender/receiver sweep as buffered_levels, but with a stateful body observed via copy_body.
/** Verifies that the body's local execution counter agrees with the global counter,
    and that graph::reset(rf_reset_bodies) restores the body to its initial state. */
template< typename InputType, typename OutputType >
void buffered_levels_with_copy( size_t concurrency ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Start both counters at the non-zero Offset so a spurious reset-to-zero is detectable.
        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< harness_mapped_receiver<OutputType>* > receivers(num_receivers);
            for (size_t i = 0; i < num_receivers; i++) {
                receivers[i] = new harness_mapped_receiver<OutputType>(g);
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            harness_counting_sender<InputType> *senders = NULL;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders = new harness_counting_sender<InputType>[num_senders];
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders[s].my_limit = N;
                    tbb::flow::make_edge( senders[s], exe_node );
                }

                // Tell the receivers how many messages and senders to expect.
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                // One native thread per sender, each driving until its limit is reached.
                NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                // Confirm that each sender was drained exactly N times by exe_node.
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s].my_received;
                    ASSERT( n == N, NULL );
                    ASSERT( senders[s].my_receiver == &exe_node, NULL );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
                delete [] senders;
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            // With default buffering the node must still accept a put after edges are removed.
            ASSERT( exe_node.try_put( InputType() ) == true, NULL );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                // Since the receivers are detached, their counts must be unchanged.
                receivers[r]->validate();
            }

            for (size_t i = 0; i < num_receivers; i++) {
                delete receivers[i];
            }
        }

        // validate that the local body matches the global execute_count and both are correct
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        // Expected: MAX_NODES receiver configurations x sum_{s=1..MAX_NODES} s*N messages,
        // plus one extra try_put per receiver configuration, plus the Offset start value.
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        ASSERT( global_count == expected_count && global_count == inc_count, NULL );
        // Resetting with rf_reset_bodies must restore the body's counter to Offset.
        g.reset(tbb::flow::rf_reset_bodies);
        body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        inc_count = body_copy.local_execute_count;
        ASSERT( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" );
    }
}
241 | |
//! Runs the buffered-node tests with each supported body form.
/** Exercises a lambda (when lambdas are available), a function pointer, a functor
    object, and finally a stateful functor checked via copy_body. */
template< typename InputType, typename OutputType >
void run_buffered_levels( int c ) {
#if __TBB_CPP11_LAMBDAS_PRESENT
    buffered_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
#endif
    buffered_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::func );
    buffered_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::functor() );
    buffered_levels_with_copy<InputType,OutputType>( c );
}
251 | |
252 | |
//! Performs test on executable nodes with limited concurrency
/** These tests check:
    1) that the nodes will accept puts up to the concurrency limit,
    2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
    3) the nodes will receive puts from multiple predecessors simultaneously,
    and 4) the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
261 | |
//! Checks a rejecting function_node at concurrency levels 1 .. 'concurrency'.
/** Holds the harness mutex to keep exactly lc executions in flight, verifies the
    node rejects the (lc+1)-th put, then releases the mutex and confirms every
    registered sender is drained N times and each receiver counts N*num_senders+lc. */
template< typename InputType, typename OutputType, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {

    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        typedef tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > fnode_type;
        fnode_type exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            // A node with no edges yet must report zero counts.
            ASSERT(exe_node.successor_count() == 0, NULL);
            ASSERT(exe_node.predecessor_count() == 0, NULL);
#endif

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, receivers[r] );
            }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            // Edge bookkeeping must reflect the num_receivers edges just created.
            ASSERT(exe_node.successor_count() == num_receivers, NULL);
            typename fnode_type::successor_list_type my_succs;
            exe_node.copy_successors(my_succs);
            ASSERT(my_succs.size() == num_receivers, NULL);
            typename fnode_type::predecessor_list_type my_preds;
            exe_node.copy_predecessors(my_preds);
            ASSERT(my_preds.size() == 0, NULL);
#endif

            harness_counting_sender<InputType> *senders = NULL;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders = new harness_counting_sender<InputType>[num_senders];
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l( harness_graph_executor<InputType, OutputType>::template mutex_holder<tbb::spin_rw_mutex>::mutex );

                    // put to lc level, it will accept and then block at m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        ASSERT( exe_node.try_put( InputType() ) == true, NULL );
                    }
                    // it only accepts to lc level
                    ASSERT( exe_node.try_put( InputType() ) == false, NULL );

                    for (size_t s = 0; s < num_senders; ++s ) {
                        // register a sender
                        senders[s].my_limit = N;
                        exe_node.register_predecessor( senders[s] );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that each sender was requested from N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s].my_received;
                    ASSERT( n == N, NULL );
                    ASSERT( senders[s].my_receiver == &exe_node, NULL );
                }
                // confirm that each receivers got N * num_senders + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r].my_count;
                    ASSERT( n == num_senders*N+lc, NULL );
                    receivers[r].my_count = 0;
                }
                delete [] senders;
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, receivers[r] );
            }
            // The node still accepts a put after edges are removed; detached receivers see nothing.
            ASSERT( exe_node.try_put( InputType() ) == true, NULL );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                ASSERT( int(receivers[r].my_count) == 0, NULL );
            }
        }
    }
}
350 | |
351 | |
//! Runs the limited-concurrency tests with each supported body form.
/** All bodies route through the harness's spin_rw_mutex-guarded tfunc so that
    concurrency_levels can hold executions in flight while it probes the node. */
template< typename InputType, typename OutputType >
void run_concurrency_levels( int c ) {
#if __TBB_CPP11_LAMBDAS_PRESENT
    concurrency_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex>(i); } );
#endif
    concurrency_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::template tfunctor<tbb::spin_rw_mutex>() );
}
360 | |
361 | |
//! Minimal message type: default- and int-constructible, convertible to int, no state.
/** Used to verify the nodes work with message types that provide only
    construction and conversion. */
struct empty_no_assign {
    empty_no_assign() {}
    empty_no_assign( int ) {}
    // const-qualified: the conversion does not modify the object, so it also
    // works on const instances (backward compatible for all existing callers).
    operator int() const { return 0; }
};
367 | |
368 | template< typename InputType > |
369 | struct parallel_puts : private NoAssign { |
370 | |
371 | tbb::flow::receiver< InputType > * const my_exe_node; |
372 | |
373 | parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {} |
374 | |
375 | void operator()( int ) const { |
376 | for ( int i = 0; i < N; ++i ) { |
377 | // the nodes will accept all puts |
378 | ASSERT( my_exe_node->try_put( InputType() ) == true, NULL ); |
379 | } |
380 | } |
381 | |
382 | }; |
383 | |
384 | //! Performs test on executable nodes with unlimited concurrency |
385 | /** These tests check: |
386 | 1) that the nodes will accept all puts |
387 | 2) the nodes will receive puts from multiple predecessors simultaneously, |
388 | and 3) the nodes will send to multiple successors. |
389 | There is no checking of the contents of the messages for corruption. |
390 | */ |
391 | |
//! Checks an unlimited-concurrency rejecting function_node under p producer threads.
/** For p = 1 .. 2*MaxThread-1, each of p threads puts N messages; the test asserts
    the body executed exactly p*N times and every receiver counted p*N messages. */
template< typename InputType, typename OutputType, typename Body >
void unlimited_concurrency( Body body ) {

    for (int p = 1; p < 2*MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));
            // Reset the per-instantiation execution counter before each configuration.
            harness_graph_executor<InputType, OutputType>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, receivers[r] );
            }

            // p native threads, each performing N puts (all must be accepted).
            NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_executor<InputType, OutputType>::execute_count;
            ASSERT( (int)ec == p*N, NULL );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r].my_count;
                // 3) the nodes will send to multiple successors.
                ASSERT( (int)c == p*N, NULL );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, receivers[r] );
            }
        }
    }
}
425 | |
//! Runs the unlimited-concurrency tests with each supported body form.
template< typename InputType, typename OutputType >
void run_unlimited_concurrency() {
    // max_executors = 0: presumably the harness treats this as "no concurrency
    // limit to enforce" (contrast with the lc setting in concurrency_levels) —
    // see harness_graph.h to confirm.
    harness_graph_executor<InputType, OutputType>::max_executors = 0;
#if __TBB_CPP11_LAMBDAS_PRESENT
    unlimited_concurrency<InputType,OutputType>( []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
#endif
    unlimited_concurrency<InputType,OutputType>( &harness_graph_executor<InputType, OutputType>::func );
    unlimited_concurrency<InputType,OutputType>( typename harness_graph_executor<InputType, OutputType>::functor() );
}
435 | |
436 | struct continue_msg_to_int { |
437 | int my_int; |
438 | continue_msg_to_int(int x) : my_int(x) {} |
439 | int operator()(tbb::flow::continue_msg) { return my_int; } |
440 | }; |
441 | |
442 | void test_function_node_with_continue_msg_as_input() { |
443 | // If this function terminates, then this test is successful |
444 | tbb::flow::graph g; |
445 | |
446 | tbb::flow::broadcast_node<tbb::flow::continue_msg> Start(g); |
447 | |
448 | tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN1( g, tbb::flow::serial, continue_msg_to_int(42)); |
449 | tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN2( g, tbb::flow::serial, continue_msg_to_int(43)); |
450 | |
451 | tbb::flow::make_edge( Start, FN1 ); |
452 | tbb::flow::make_edge( Start, FN2 ); |
453 | |
454 | Start.try_put( tbb::flow::continue_msg() ); |
455 | g.wait_for_all(); |
456 | } |
457 | |
//! Tests limited concurrency cases for nodes that accept data messages
/** Initializes the scheduler with num_threads, then runs the limited-,
    buffered-, and unlimited-concurrency suites over a mix of input/output
    types (int, empty_no_assign, continue_msg). */
void test_concurrency(int num_threads) {
    tbb::task_scheduler_init init(num_threads);
    run_concurrency_levels<int,int>(num_threads);
    run_concurrency_levels<int,tbb::flow::continue_msg>(num_threads);
    run_buffered_levels<int, int>(num_threads);
    run_unlimited_concurrency<int,int>();
    run_unlimited_concurrency<int,empty_no_assign>();
    run_unlimited_concurrency<empty_no_assign,int>();
    run_unlimited_concurrency<empty_no_assign,empty_no_assign>();
    run_unlimited_concurrency<int,tbb::flow::continue_msg>();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::continue_msg>();
    test_function_node_with_continue_msg_as_input();
}
472 | |
473 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION |
//! Function body that bumps an external counter and returns its input plus one.
struct add_to_counter {
    int* counter;   // non-owning; points at a counter owned by the test
    add_to_counter(int& var) : counter(&var) {}
    int operator()(int i) {
        ++(*counter);
        return i + 1;
    }
};
479 | |
//! Verifies extract() on function_node and its neighbors (deprecated extraction API).
/** Builds b0,b1 -> f0 -> q0, extracts b0 and then f0, checking
    predecessor/successor counts and message flow after each step, then rebuilds
    the edges. The outer loop runs twice to confirm the graph works after rebuild. */
template<typename FTYPE>
void test_extract() {
    int my_count = 0;
    int cm;
    tbb::flow::graph g;
    tbb::flow::broadcast_node<int> b0(g);
    tbb::flow::broadcast_node<int> b1(g);
    tbb::flow::function_node<int, int, FTYPE> f0(g, tbb::flow::unlimited, add_to_counter(my_count));
    tbb::flow::queue_node<int> q0(g);

    tbb::flow::make_edge(b0, f0);
    tbb::flow::make_edge(b1, f0);
    tbb::flow::make_edge(f0, q0);
    for( int i = 0; i < 2; ++i ) {
        // Fully-connected graph: both broadcasts feed f0, which feeds q0.
        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts" );
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts" );
        ASSERT(f0.predecessor_count() == 2 && f0.successor_count() == 1, "f0 has incorrect counts" );
        ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts" );

        /* b0      */
        /*   \     */
        /*    f0 - q0 */
        /*   /     */
        /* b1      */

        // Messages from either broadcast must pass through f0 into q0.
        b0.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 1, "function_node didn't fire" );
        ASSERT(q0.try_get(cm), "function_node didn't forward" );
        b1.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 2, "function_node didn't fire" );
        ASSERT(q0.try_get(cm), "function_node didn't forward" );

        // Disconnect b0 from everything; b1 -> f0 -> q0 must keep working.
        b0.extract();

        /* b0      */
        /*         */
        /*    f0 - q0 */
        /*   /     */
        /* b1      */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts" );
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts" );
        ASSERT(f0.predecessor_count() == 1 && f0.successor_count() == 1, "f0 has incorrect counts" );
        ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts" );
        b0.try_put(1);
        b0.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 2, "b0 messages being forwarded to function_node even though it is disconnected" );
        b1.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 3, "function_node didn't fire though it has only one predecessor" );
        ASSERT(q0.try_get(cm), "function_node didn't forward second time" );

        // Disconnect f0 from everything; no node should receive anything now.
        f0.extract();

        /* b0      */
        /*         */
        /*    f0   q0 */
        /*         */
        /* b1      */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts" );
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts" );
        ASSERT(f0.predecessor_count() == 0 && f0.successor_count() == 0, "f0 has incorrect counts" );
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts" );
        b0.try_put(1);
        b0.try_put(1);
        b1.try_put(1);
        b1.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 3, "function_node didn't fire though it has only one predecessor" );
        ASSERT(!q0.try_get(cm), "function_node forwarded though it shouldn't" );
        // Reconnect b0 -> f0 only; f0 fires but has no successor to forward to.
        make_edge(b0, f0);

        /* b0      */
        /*   \     */
        /*    f0   q0 */
        /*         */
        /* b1      */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts" );
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts" );
        ASSERT(f0.predecessor_count() == 1 && f0.successor_count() == 0, "f0 has incorrect counts" );
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts" );

        b0.try_put(int());
        g.wait_for_all();

        ASSERT(my_count == 4, "function_node didn't fire though it has only one predecessor" );
        ASSERT(!q0.try_get(cm), "function_node forwarded though it shouldn't" );

        // Restore the full graph and reset the counter for the second pass.
        tbb::flow::make_edge(b1, f0);
        tbb::flow::make_edge(f0, q0);
        my_count = 0;
    }
}
578 | #endif |
579 | |
//! Harness entry point: runs all function_node tests over the requested thread range.
int TestMain() {
    // MinThread/MaxThread come from the test harness command line.
    if( MinThread<1 ) {
        REPORT("number of threads must be positive\n" );
        exit(1);
    }
    // Exercise every thread count in [MinThread, MaxThread].
    for( int p=MinThread; p<=MaxThread; ++p ) {
        test_concurrency(p);
    }
    // Run the shared lightweight_testing suite against function_node.
    lightweight_testing::test<tbb::flow::function_node>(10);
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // extract() is only available under the deprecated-extraction build.
    test_extract<tbb::flow::rejecting>();
    test_extract<tbb::flow::queueing>();
#endif
    return Harness::Done;
}
595 | |
596 | |