| 1 | /* |
| 2 | Copyright (c) 2005-2019 Intel Corporation |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | #if __TBB_CPF_BUILD |
| 18 | #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1 |
| 19 | #endif |
| 20 | |
| 21 | #include "harness_graph.h" |
| 22 | |
| 23 | #include "tbb/flow_graph.h" |
| 24 | #include "tbb/task_scheduler_init.h" |
| 25 | #include "tbb/spin_rw_mutex.h" |
| 26 | |
// Number of messages each counting sender delivers to the node under test.
#define N 100
// Maximum number of predecessor/successor nodes attached in each test sweep.
#define MAX_NODES 4
| 29 | |
| 30 | //! Performs test on function nodes with limited concurrency and buffering |
| 31 | /** These tests check: |
| 32 | 1) that the number of executing copies never exceed the concurrency limit |
| 33 | 2) that the node never rejects |
| 34 | 3) that no items are lost |
| 35 | and 4) all of this happens even if there are multiple predecessors and successors |
| 36 | */ |
| 37 | |
| 38 | template< typename InputType > |
| 39 | struct parallel_put_until_limit : private NoAssign { |
| 40 | |
| 41 | harness_counting_sender<InputType> *my_senders; |
| 42 | |
| 43 | parallel_put_until_limit( harness_counting_sender<InputType> *senders ) : my_senders(senders) {} |
| 44 | |
| 45 | void operator()( int i ) const { |
| 46 | if ( my_senders ) { |
| 47 | my_senders[i].try_put_until_limit(); |
| 48 | } |
| 49 | } |
| 50 | |
| 51 | }; |
| 52 | |
//! Identity body: forwards its argument unchanged.
template<typename IO>
struct pass_through {
    IO operator()(const IO& item) { return item; }
};
| 57 | |
//! Exercises a buffered function_node at each concurrency level 1..concurrency.
/** For each level, wires varying numbers of counting senders (through an
    unlimited pass-through node) and counting receivers to the node under test,
    then checks that no message is lost or duplicated.  The concurrency cap
    itself is enforced by an assertion inside harness_graph_executor's body. */
template< typename InputType, typename OutputType, typename Body >
void buffered_levels( size_t concurrency, Body body ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        // Create the function_node with the appropriate concurrency level, and use default buffering
        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, body );
        tbb::flow::function_node<InputType, InputType> pass_thru( g, tbb::flow::unlimited, pass_through<InputType>());

        // Create a vector of identical exe_nodes and pass_thrus
        std::vector< tbb::flow::function_node< InputType, OutputType > > exe_vec(2, exe_node);
        std::vector< tbb::flow::function_node< InputType, InputType > > pass_thru_vec(2, pass_thru);
        // Attach each pass_thru to its corresponding exe_node
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            tbb::flow::make_edge(pass_thru_vec[node_idx], exe_vec[node_idx]);
        }

        // TODO: why the test is executed serially for the node pairs, not concurrently?
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            // For num_receivers = 1 to MAX_NODES
            for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
                // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
                std::vector< harness_mapped_receiver<OutputType>* > receivers(num_receivers);
                for (size_t i = 0; i < num_receivers; i++) {
                    receivers[i] = new harness_mapped_receiver<OutputType>(g);
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::make_edge( exe_vec[node_idx], *receivers[r] );
                }

                // Do the test with varying numbers of senders
                harness_counting_sender<InputType> *senders = NULL;
                for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                    // Create num_senders senders, set their message limit each to N, and connect them to pass_thru_vec[node_idx]
                    senders = new harness_counting_sender<InputType>[num_senders];
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders[s].my_limit = N;
                        senders[s].register_successor(pass_thru_vec[node_idx] );
                    }

                    // Initialize the receivers so they know how many senders and messages to check for
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->initialize_map( N, num_senders );
                    }

                    // Do the test: one native thread per sender, all pushing concurrently.
                    NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                    g.wait_for_all();

                    // confirm that each sender was requested from N times
                    for (size_t s = 0; s < num_senders; ++s ) {
                        size_t n = senders[s].my_received;
                        ASSERT( n == N, NULL );
                        ASSERT( senders[s].my_receiver == &pass_thru_vec[node_idx], NULL );
                    }
                    // validate the receivers
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->validate();
                    }
                    delete [] senders;
                }
                // Detach all receivers from the node under test.
                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::remove_edge( exe_vec[node_idx], *receivers[r] );
                }
                // A buffered node must still accept a put even with no successors.
                ASSERT( exe_vec[node_idx].try_put( InputType() ) == true, NULL );
                g.wait_for_all();
                for (size_t r = 0; r < num_receivers; ++r ) {
                    // since it's detached, nothing should have changed
                    receivers[r]->validate();
                }

                for (size_t i = 0; i < num_receivers; i++) {
                    delete receivers[i];
                }

            } // for num_receivers
        } // for node_idx
    } // for concurrency level lc
}
| 147 | |
// Counters start at a nonzero Offset so that a spurious reset-to-zero would be
// distinguishable from a counter that was simply never incremented.
const size_t Offset = 123;
// Total number of inc_functor invocations across all body copies.
tbb::atomic<size_t> global_execute_count;
| 150 | |
| 151 | struct inc_functor { |
| 152 | |
| 153 | tbb::atomic<size_t> local_execute_count; |
| 154 | inc_functor( ) { local_execute_count = 0; } |
| 155 | inc_functor( const inc_functor &f ) { local_execute_count = f.local_execute_count; } |
| 156 | void operator=( const inc_functor &f ) { local_execute_count = f.local_execute_count; } |
| 157 | |
| 158 | int operator()( int i ) { |
| 159 | ++global_execute_count; |
| 160 | ++local_execute_count; |
| 161 | return i; |
| 162 | } |
| 163 | |
| 164 | }; |
| 165 | |
//! Variant of buffered_levels with a stateful body to validate copy_body() and reset().
/** After the sender/receiver stress loop, checks that the per-copy counter
    retrieved via copy_body matches the global count, and that
    reset(rf_reset_bodies) restores the body to its originally-supplied state. */
template< typename InputType, typename OutputType >
void buffered_levels_with_copy( size_t concurrency ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Both counters start at Offset so a reset is observable.
        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        // The node copies cf; the local object is not updated by execution.
        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< harness_mapped_receiver<OutputType>* > receivers(num_receivers);
            for (size_t i = 0; i < num_receivers; i++) {
                receivers[i] = new harness_mapped_receiver<OutputType>(g);
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            harness_counting_sender<InputType> *senders = NULL;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                // Each sender delivers exactly N messages into exe_node.
                senders = new harness_counting_sender<InputType>[num_senders];
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders[s].my_limit = N;
                    tbb::flow::make_edge( senders[s], exe_node );
                }

                // Tell each receiver how many senders and messages to expect.
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                // Drive all senders concurrently, then drain the graph.
                NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                // Confirm that each sender was requested from N times.
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s].my_received;
                    ASSERT( n == N, NULL );
                    ASSERT( senders[s].my_receiver == &exe_node, NULL );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
                delete [] senders;
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            // One more put with no successors; the node still executes the body.
            ASSERT( exe_node.try_put( InputType() ) == true, NULL );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                // Detached receivers must be unaffected by the extra put.
                receivers[r]->validate();
            }

            for (size_t i = 0; i < num_receivers; i++) {
                delete receivers[i];
            }
        }

        // validate that the local body matches the global execute_count and both are correct
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        // Expected: for each of the MAX_NODES receiver-count iterations, senders
        // 1..MAX_NODES each deliver N messages (N * MAX_NODES*(MAX_NODES+1)/2
        // executions per iteration), plus one extra detached put per iteration,
        // plus the initial Offset.
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        ASSERT( global_count == expected_count && global_count == inc_count, NULL );
        // rf_reset_bodies must restore the body to its originally-supplied state.
        g.reset(tbb::flow::rf_reset_bodies);
        body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        inc_count = body_copy.local_execute_count;
        ASSERT( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" );
    }
}
| 241 | |
//! Runs the buffered-levels test with each supported body kind.
/** Covers a lambda (when the compiler supports them), a function pointer,
    a functor object, and a stateful copied body. */
template< typename InputType, typename OutputType >
void run_buffered_levels( int c ) {
#if __TBB_CPP11_LAMBDAS_PRESENT
    buffered_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
#endif
    buffered_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::func );
    buffered_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::functor() );
    buffered_levels_with_copy<InputType,OutputType>( c );
}
| 251 | |
| 252 | |
//! Performs test on executable nodes with limited concurrency
/** These tests check:
    1) that the nodes will accept puts up to the concurrency limit,
    2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
    3) the nodes will receive puts from multiple successors simultaneously,
    and 4) the nodes will send to multiple predecessors.
    There is no checking of the contents of the messages for corruption.
*/

template< typename InputType, typename OutputType, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {

    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        // A rejecting node refuses puts beyond its concurrency limit instead of buffering them.
        typedef tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > fnode_type;
        fnode_type exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            // Before any edges are made the node must report no neighbors.
            ASSERT(exe_node.successor_count() == 0, NULL);
            ASSERT(exe_node.predecessor_count() == 0, NULL);
#endif

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, receivers[r] );
            }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            // The introspection API must see exactly the edges just created.
            ASSERT(exe_node.successor_count() == num_receivers, NULL);
            typename fnode_type::successor_list_type my_succs;
            exe_node.copy_successors(my_succs);
            ASSERT(my_succs.size() == num_receivers, NULL);
            typename fnode_type::predecessor_list_type my_preds;
            exe_node.copy_predecessors(my_preds);
            ASSERT(my_preds.size() == 0, NULL);
#endif

            harness_counting_sender<InputType> *senders = NULL;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders = new harness_counting_sender<InputType>[num_senders];
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l( harness_graph_executor<InputType, OutputType>::template mutex_holder<tbb::spin_rw_mutex>::mutex );

                    // put to lc level, it will accept and then block at m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        ASSERT( exe_node.try_put( InputType() ) == true, NULL );
                    }
                    // it only accepts to lc level
                    ASSERT( exe_node.try_put( InputType() ) == false, NULL );

                    for (size_t s = 0; s < num_senders; ++s ) {
                        // register a sender
                        senders[s].my_limit = N;
                        exe_node.register_predecessor( senders[s] );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that each sender was requested from N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s].my_received;
                    ASSERT( n == N, NULL );
                    ASSERT( senders[s].my_receiver == &exe_node, NULL );
                }
                // confirm that each receiver got N * num_senders + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r].my_count;
                    ASSERT( n == num_senders*N+lc, NULL );
                    receivers[r].my_count = 0;
                }
                delete [] senders;
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, receivers[r] );
            }
            // After detaching, a put is accepted but no receiver should see it.
            ASSERT( exe_node.try_put( InputType() ) == true, NULL );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                ASSERT( int(receivers[r].my_count) == 0, NULL );
            }
        }
    }
}
| 350 | |
| 351 | |
//! Runs the limited-concurrency test with each supported body kind.
/** All bodies block on the harness's spin_rw_mutex so concurrency_levels can
    hold node executions in flight while it checks rejection behavior. */
template< typename InputType, typename OutputType >
void run_concurrency_levels( int c ) {
#if __TBB_CPP11_LAMBDAS_PRESENT
    concurrency_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex>(i); } );
#endif
    concurrency_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::template tfunctor<tbb::spin_rw_mutex>() );
}
| 360 | |
| 361 | |
//! Message type used to exercise non-trivial input/output types in the graph tests.
/** Convertible from and to int so the harness can generate and check values. */
struct empty_no_assign {
    empty_no_assign() {}
    // Intentionally non-explicit: the harness relies on implicit int conversion.
    empty_no_assign( int ) {}
    // const-qualified: the conversion does not modify the object
    // (the original omitted const, preventing use on const objects).
    operator int() const { return 0; }
};
| 367 | |
| 368 | template< typename InputType > |
| 369 | struct parallel_puts : private NoAssign { |
| 370 | |
| 371 | tbb::flow::receiver< InputType > * const my_exe_node; |
| 372 | |
| 373 | parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {} |
| 374 | |
| 375 | void operator()( int ) const { |
| 376 | for ( int i = 0; i < N; ++i ) { |
| 377 | // the nodes will accept all puts |
| 378 | ASSERT( my_exe_node->try_put( InputType() ) == true, NULL ); |
| 379 | } |
| 380 | } |
| 381 | |
| 382 | }; |
| 383 | |
//! Performs test on executable nodes with unlimited concurrency
/** These tests check:
    1) that the nodes will accept all puts
    2) the nodes will receive puts from multiple predecessors simultaneously,
    and 3) the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/

template< typename InputType, typename OutputType, typename Body >
void unlimited_concurrency( Body body ) {

    // Repeat with an increasing number of native putting threads.
    for (int p = 1; p < 2*MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));
            harness_graph_executor<InputType, OutputType>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, receivers[r] );
            }

            // p threads each put N messages; an unlimited node accepts them all.
            NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_executor<InputType, OutputType>::execute_count;
            ASSERT( (int)ec == p*N, NULL );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r].my_count;
                // 3) the nodes will send to multiple successors.
                ASSERT( (int)c == p*N, NULL );
            }
            // Detach the receivers before the next sweep re-wires the node.
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, receivers[r] );
            }
        }
    }
}
| 425 | |
//! Runs the unlimited-concurrency test with each supported body kind.
template< typename InputType, typename OutputType >
void run_unlimited_concurrency() {
    // max_executors == 0 disables the harness's concurrency-cap assertion.
    harness_graph_executor<InputType, OutputType>::max_executors = 0;
#if __TBB_CPP11_LAMBDAS_PRESENT
    unlimited_concurrency<InputType,OutputType>( []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
#endif
    unlimited_concurrency<InputType,OutputType>( &harness_graph_executor<InputType, OutputType>::func );
    unlimited_concurrency<InputType,OutputType>( typename harness_graph_executor<InputType, OutputType>::functor() );
}
| 435 | |
//! Body that maps the signal-only continue_msg to a fixed integer payload.
struct continue_msg_to_int {
    // The value emitted for every incoming continue_msg.
    int my_int;
    continue_msg_to_int(int x) : my_int(x) {}
    int operator()(tbb::flow::continue_msg) { return my_int; }
};
| 441 | |
| 442 | void test_function_node_with_continue_msg_as_input() { |
| 443 | // If this function terminates, then this test is successful |
| 444 | tbb::flow::graph g; |
| 445 | |
| 446 | tbb::flow::broadcast_node<tbb::flow::continue_msg> Start(g); |
| 447 | |
| 448 | tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN1( g, tbb::flow::serial, continue_msg_to_int(42)); |
| 449 | tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN2( g, tbb::flow::serial, continue_msg_to_int(43)); |
| 450 | |
| 451 | tbb::flow::make_edge( Start, FN1 ); |
| 452 | tbb::flow::make_edge( Start, FN2 ); |
| 453 | |
| 454 | Start.try_put( tbb::flow::continue_msg() ); |
| 455 | g.wait_for_all(); |
| 456 | } |
| 457 | |
//! Tests limited concurrency cases for nodes that accept data messages
void test_concurrency(int num_threads) {
    // Pin the scheduler to the requested number of threads for this run.
    tbb::task_scheduler_init init(num_threads);
    run_concurrency_levels<int,int>(num_threads);
    run_concurrency_levels<int,tbb::flow::continue_msg>(num_threads);
    run_buffered_levels<int, int>(num_threads);
    // Unlimited-concurrency runs over various input/output type combinations.
    run_unlimited_concurrency<int,int>();
    run_unlimited_concurrency<int,empty_no_assign>();
    run_unlimited_concurrency<empty_no_assign,int>();
    run_unlimited_concurrency<empty_no_assign,empty_no_assign>();
    run_unlimited_concurrency<int,tbb::flow::continue_msg>();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::continue_msg>();
    test_function_node_with_continue_msg_as_input();
}
| 472 | |
| 473 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION |
//! Body that bumps an external counter and returns the input plus one.
struct add_to_counter {
    // Non-owning pointer to the caller's counter variable.
    int* counter;
    add_to_counter(int& var) : counter(&var) {}
    int operator()(int value) {
        ++*counter;
        return value + 1;
    }
};
| 479 | |
//! Verifies extract() on a function_node for a given buffering policy FTYPE.
/** Builds b0,b1 -> f0 -> q0, then extracts nodes one at a time, checking
    predecessor/successor counts and message flow after each topology change.
    The outer loop runs twice to confirm the re-made edges behave like the
    originals. */
template<typename FTYPE>
void test_extract() {
    int my_count = 0;
    int cm;    // scratch for messages pulled out of q0
    tbb::flow::graph g;
    tbb::flow::broadcast_node<int> b0(g);
    tbb::flow::broadcast_node<int> b1(g);
    tbb::flow::function_node<int, int, FTYPE> f0(g, tbb::flow::unlimited, add_to_counter(my_count));
    tbb::flow::queue_node<int> q0(g);

    tbb::flow::make_edge(b0, f0);
    tbb::flow::make_edge(b1, f0);
    tbb::flow::make_edge(f0, q0);
    // Second iteration re-runs the whole scenario on the re-made edges.
    for( int i = 0; i < 2; ++i ) {
        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts" );
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts" );
        ASSERT(f0.predecessor_count() == 2 && f0.successor_count() == 1, "f0 has incorrect counts" );
        ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts" );

        /* b0          */
        /*   \         */
        /*    f0 - q0  */
        /*   /         */
        /* b1          */

        // Both broadcast nodes feed f0; each put fires the body once.
        b0.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 1, "function_node didn't fire" );
        ASSERT(q0.try_get(cm), "function_node didn't forward" );
        b1.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 2, "function_node didn't fire" );
        ASSERT(q0.try_get(cm), "function_node didn't forward" );

        // Disconnect b0 from all its neighbors.
        b0.extract();

        /* b0          */
        /*             */
        /*    f0 - q0  */
        /*   /         */
        /* b1          */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts" );
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts" );
        ASSERT(f0.predecessor_count() == 1 && f0.successor_count() == 1, "f0 has incorrect counts" );
        ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts" );
        b0.try_put(1);
        b0.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 2, "b0 messages being forwarded to function_node even though it is disconnected" );
        b1.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 3, "function_node didn't fire though it has only one predecessor" );
        ASSERT(q0.try_get(cm), "function_node didn't forward second time" );

        // Disconnect f0 from all predecessors and successors.
        f0.extract();

        /* b0          */
        /*             */
        /*    f0   q0  */
        /*             */
        /* b1          */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts" );
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts" );
        ASSERT(f0.predecessor_count() == 0 && f0.successor_count() == 0, "f0 has incorrect counts" );
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts" );
        // With f0 fully disconnected, no puts anywhere should reach it.
        b0.try_put(1);
        b0.try_put(1);
        b1.try_put(1);
        b1.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 3, "function_node didn't fire though it has only one predecessor" );
        ASSERT(!q0.try_get(cm), "function_node forwarded though it shouldn't" );
        make_edge(b0, f0);

        /* b0          */
        /*   \         */
        /*    f0   q0  */
        /*             */
        /* b1          */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts" );
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts" );
        ASSERT(f0.predecessor_count() == 1 && f0.successor_count() == 0, "f0 has incorrect counts" );
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts" );

        // f0 fires again (no successor, so nothing is forwarded to q0).
        b0.try_put(int());
        g.wait_for_all();

        ASSERT(my_count == 4, "function_node didn't fire though it has only one predecessor" );
        ASSERT(!q0.try_get(cm), "function_node forwarded though it shouldn't" );

        // Restore the full topology and counter for the second pass.
        tbb::flow::make_edge(b1, f0);
        tbb::flow::make_edge(f0, q0);
        my_count = 0;
    }
}
| 578 | #endif |
| 579 | |
//! Harness entry point: runs every suite over the configured thread range.
int TestMain() {
    if( MinThread<1 ) {
        REPORT("number of threads must be positive\n" );
        exit(1);
    }
    for( int p=MinThread; p<=MaxThread; ++p ) {
        test_concurrency(p);
    }
    // Smoke-test the lightweight body policy of function_node.
    lightweight_testing::test<tbb::flow::function_node>(10);
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // extract() exists only in the deprecated node-extraction API.
    test_extract<tbb::flow::rejecting>();
    test_extract<tbb::flow::queueing>();
#endif
    return Harness::Done;
}
| 595 | |
| 596 | |