| 1 | /* |
| 2 | Copyright (c) 2005-2019 Intel Corporation |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | #if __TBB_CPF_BUILD |
| 18 | #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1 |
| 19 | #endif |
| 20 | |
| 21 | #include "harness_graph.h" |
| 22 | |
| 23 | #include "tbb/flow_graph.h" |
| 24 | #include "tbb/task_scheduler_init.h" |
| 25 | #include "tbb/spin_rw_mutex.h" |
| 26 | |
| 27 | #if TBB_USE_DEBUG |
| 28 | #define N 16 |
| 29 | #else |
| 30 | #define N 100 |
| 31 | #endif |
| 32 | #define MAX_NODES 4 |
| 33 | |
| 34 | //! Performs test on function nodes with limited concurrency and buffering |
| 35 | /** These tests check: |
| 36 | 1) that the number of executing copies never exceed the concurrency limit |
| 37 | 2) that the node never rejects |
| 38 | 3) that no items are lost |
| 39 | and 4) all of this happens even if there are multiple predecessors and successors |
| 40 | */ |
| 41 | |
| 42 | template< typename InputType > |
| 43 | struct parallel_put_until_limit : private NoAssign { |
| 44 | |
| 45 | harness_counting_sender<InputType> *my_senders; |
| 46 | |
| 47 | parallel_put_until_limit( harness_counting_sender<InputType> *senders ) : my_senders(senders) {} |
| 48 | |
| 49 | void operator()( int i ) const { |
| 50 | if ( my_senders ) { |
| 51 | my_senders[i].try_put_until_limit(); |
| 52 | } |
| 53 | } |
| 54 | |
| 55 | }; |
| 56 | |
| 57 | //! exercise buffered multifunction_node. |
//! Exercise a default-buffered multifunction_node at every concurrency limit 1..concurrency.
/** For each limit lc, builds a graph, copies the node (exercising the copy constructor),
    and for every combination of 1..MAX_NODES receivers and senders checks that no
    messages are lost, the node never rejects, and the concurrency cap is honored
    (the cap itself is checked inside the harness functor via max_executors). */
template< typename InputType, typename OutputTuple, typename Body >
void buffered_levels( size_t concurrency, Body body ) {
    typedef typename tbb::flow::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;

        // Create the function_node with the appropriate concurrency level, and use default buffering
        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, body );

        // Create a vector of identical exe_nodes; both the original's copy and a second
        // copy are exercised below, so copied nodes must behave like the original.
        std::vector< tbb::flow::multifunction_node< InputType, OutputTuple > > exe_vec(2, exe_node);

        // exercise each of the copied nodes
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
                // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
                std::vector< harness_mapped_receiver<OutputType>* > receivers(num_receivers);
                for (size_t i = 0; i < num_receivers; i++) {
                    receivers[i] = new harness_mapped_receiver<OutputType>(g);
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::make_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }

                // Do the test with varying numbers of senders
                harness_counting_sender<InputType> *senders = NULL;
                for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                    // Create num_senders senders, set their message limit each to N, and connect them to the exe_vec[node_idx]
                    senders = new harness_counting_sender<InputType>[num_senders];
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders[s].my_limit = N;
                        tbb::flow::make_edge( senders[s], exe_vec[node_idx] );
                    }

                    // Initialize the receivers so they know how many senders and messages to check for
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->initialize_map( N, num_senders );
                    }

                    // Do the test: each sender thread pushes until its limit is reached
                    NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                    g.wait_for_all();

                    // confirm that each sender was requested from N times
                    for (size_t s = 0; s < num_senders; ++s ) {
                        size_t n = senders[s].my_received;
                        ASSERT( n == N, NULL );
                        ASSERT( senders[s].my_receiver == &exe_vec[node_idx], NULL );
                    }
                    // validate the receivers (each saw every message from every sender)
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->validate();
                    }
                    delete [] senders;
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }
                // The node still accepts puts after all successor edges are removed...
                ASSERT( exe_vec[node_idx].try_put( InputType() ) == true, NULL );
                g.wait_for_all();
                for (size_t r = 0; r < num_receivers; ++r ) {
                    // ...but since it's detached, nothing should have changed at the receivers
                    receivers[r]->validate();
                }

                for (size_t i = 0; i < num_receivers; i++) {
                    delete receivers[i];
                }
            }
        }
    }
}
| 139 | |
// Non-zero base value for the execution counters below; lets the test distinguish
// a correctly-preserved counter from one that was accidentally reset to zero.
const size_t Offset = 123;
// Total number of inc_functor invocations across all body copies.
tbb::atomic<size_t> global_execute_count;
| 142 | |
| 143 | struct inc_functor { |
| 144 | |
| 145 | tbb::atomic<size_t> local_execute_count; |
| 146 | inc_functor( ) { local_execute_count = 0; } |
| 147 | inc_functor( const inc_functor &f ) { local_execute_count = f.local_execute_count; } |
| 148 | |
| 149 | template<typename output_ports_type> |
| 150 | void operator()( int i, output_ports_type &p ) { |
| 151 | ++global_execute_count; |
| 152 | ++local_execute_count; |
| 153 | (void)tbb::flow::get<0>(p).try_put(i); |
| 154 | } |
| 155 | |
| 156 | }; |
| 157 | |
//! Same protocol as buffered_levels, but with a stateful body (inc_functor) so that
//! copy_body() can be checked: the body copy held by the node must have observed
//! every execution, and its local count must match the global count.
template< typename InputType, typename OutputTuple >
void buffered_levels_with_copy( size_t concurrency ) {
    typedef typename tbb::flow::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Start both counters at Offset so an accidental reset is detectable.
        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< harness_mapped_receiver<OutputType>* > receivers(num_receivers);
            for (size_t i = 0; i < num_receivers; i++) {
                receivers[i] = new harness_mapped_receiver<OutputType>(g);
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            harness_counting_sender<InputType> *senders = NULL;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders = new harness_counting_sender<InputType>[num_senders];
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders[s].my_limit = N;
                    tbb::flow::make_edge( senders[s], exe_node );
                }

                // Tell the receivers how many senders and messages to expect
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                // each sender must have been drained exactly N times by exe_node
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s].my_received;
                    ASSERT( n == N, NULL );
                    ASSERT( senders[s].my_receiver == &exe_node, NULL );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
                delete [] senders;
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
            // node still accepts a put after detachment; receivers must be unaffected
            ASSERT( exe_node.try_put( InputType() ) == true, NULL );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }

            for (size_t i = 0; i < num_receivers; i++) {
                delete receivers[i];
            }
        }

        // validate that the local body matches the global execute_count and both are correct.
        // expected_count = Offset + sum over num_senders of (num_senders*N over MAX_NODES
        // receiver configurations) + MAX_NODES detached puts.
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        ASSERT( global_count == expected_count && global_count == inc_count, NULL );
    }
}
| 229 | |
//! Run buffered_levels with every supported body flavor: lambda (when available),
//! plain function pointer, function object, and a stateful copied body.
template< typename InputType, typename OutputTuple >
void run_buffered_levels( int c ) {
#if __TBB_CPP11_LAMBDAS_PRESENT
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    buffered_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
#endif
    buffered_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    buffered_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
    buffered_levels_with_copy<InputType,OutputTuple>( c );
}
| 240 | |
| 241 | |
| 242 | //! Performs test on executable nodes with limited concurrency |
| 243 | /** These tests check: |
| 244 | 1) that the nodes will accepts puts up to the concurrency limit, |
| 245 | 2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor), |
| 246 | 3) the nodes will receive puts from multiple successors simultaneously, |
| 247 | and 4) the nodes will send to multiple predecessors. |
| 248 | There is no checking of the contents of the messages for corruption. |
| 249 | */ |
| 250 | |
//! Exercise a rejecting multifunction_node at every concurrency limit 1..concurrency.
/** Holds the harness mutex exclusively so that exactly lc bodies block in flight,
    verifies the (lc+1)-th put is rejected, registers senders while blocked, then
    releases the lock and checks all messages were pulled and forwarded. */
template< typename InputType, typename OutputTuple, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {
    typedef typename tbb::flow::tuple_element<0,OutputTuple>::type OutputType;
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;


        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), receivers[r] );
            }

            harness_counting_sender<InputType> *senders = NULL;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l( harness_graph_multifunction_executor< InputType, OutputTuple>::template mutex_holder<tbb::spin_rw_mutex>::mutex );

                    // put to lc level, it will accept and then block at m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        ASSERT( exe_node.try_put( InputType() ) == true, NULL );
                    }
                    // it only accepts to lc level; the next put must be rejected
                    ASSERT( exe_node.try_put( InputType() ) == false, NULL );

                    senders = new harness_counting_sender<InputType>[num_senders];
                    for (size_t s = 0; s < num_senders; ++s ) {
                        // register a sender; the node will pull from it once unblocked
                        senders[s].my_limit = N;
                        exe_node.register_predecessor( senders[s] );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that each sender was requested from N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s].my_received;
                    ASSERT( n == N, NULL );
                    ASSERT( senders[s].my_receiver == &exe_node, NULL );
                }
                // confirm that each receivers got N * num_senders + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r].my_count;
                    ASSERT( n == num_senders*N+lc, NULL );
                    receivers[r].my_count = 0;
                }
                delete [] senders;
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), receivers[r] );
            }
            // a put is still accepted after edge removal, but no receiver should see it
            ASSERT( exe_node.try_put( InputType() ) == true, NULL );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                ASSERT( int(receivers[r].my_count) == 0, NULL );
            }
        }
    }
}
| 325 | |
//! Run concurrency_levels with each supported body flavor: lambda (when available),
//! function pointer, and function object; all use the spin_rw_mutex-guarded harness body.
template< typename InputType, typename OutputTuple >
void run_concurrency_levels( int c ) {
#if __TBB_CPP11_LAMBDAS_PRESENT
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    concurrency_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex>(i,p); } );
#endif
    concurrency_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunctor<tbb::spin_rw_mutex>() );
}
| 335 | |
| 336 | |
//! Minimal message type: constructible from int and convertible to int (always 0).
//! Used to check the node templates with a non-numeric payload type.
struct empty_no_assign {
    empty_no_assign() {}
    empty_no_assign( int ) {}
    operator int() { return 0; }
    operator int() const { return 0; }
};
| 343 | |
| 344 | template< typename InputType > |
| 345 | struct parallel_puts : private NoAssign { |
| 346 | |
| 347 | tbb::flow::receiver< InputType > * const my_exe_node; |
| 348 | |
| 349 | parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {} |
| 350 | |
| 351 | void operator()( int ) const { |
| 352 | for ( int i = 0; i < N; ++i ) { |
| 353 | // the nodes will accept all puts |
| 354 | ASSERT( my_exe_node->try_put( InputType() ) == true, NULL ); |
| 355 | } |
| 356 | } |
| 357 | |
| 358 | }; |
| 359 | |
| 360 | //! Performs test on executable nodes with unlimited concurrency |
| 361 | /** These tests check: |
| 362 | 1) that the nodes will accept all puts |
| 363 | 2) the nodes will receive puts from multiple predecessors simultaneously, |
| 364 | and 3) the nodes will send to multiple successors. |
| 365 | There is no checking of the contents of the messages for corruption. |
| 366 | */ |
| 367 | |
//! Exercise an unlimited-concurrency rejecting multifunction_node with p putting threads.
/** Checks that all p*N puts are accepted, executed exactly once, and forwarded to
    every connected receiver. */
template< typename InputType, typename OutputTuple, typename Body >
void unlimited_concurrency( Body body ) {
    typedef typename tbb::flow::tuple_element<0,OutputTuple>::type OutputType;

    for (int p = 1; p < 2*MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
            std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));

            harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), receivers[r] );
            }

            // p threads, each putting N items; every put must be accepted
            NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count;
            ASSERT( (int)ec == p*N, NULL );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r].my_count;
                // 3) the nodes will send to multiple successors.
                ASSERT( (int)c == p*N, NULL );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), receivers[r] );
            }
        }
    }
}
| 402 | |
//! Run unlimited_concurrency with each supported body flavor: lambda (when available),
//! function pointer, and function object. max_executors==0 disables the concurrency cap check.
template< typename InputType, typename OutputTuple >
void run_unlimited_concurrency() {
    harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0;
#if __TBB_CPP11_LAMBDAS_PRESENT
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    unlimited_concurrency<InputType,OutputTuple>( []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
#endif
    unlimited_concurrency<InputType,OutputTuple>( &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    unlimited_concurrency<InputType,OutputTuple>( typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
}
| 413 | |
| 414 | template<typename InputType, typename OutputTuple> |
| 415 | struct oddEvenBody { |
| 416 | typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type; |
| 417 | typedef typename tbb::flow::tuple_element<0,OutputTuple>::type EvenType; |
| 418 | typedef typename tbb::flow::tuple_element<1,OutputTuple>::type OddType; |
| 419 | void operator() (const InputType &i, output_ports_type &p) { |
| 420 | if((int)i % 2) { |
| 421 | (void)tbb::flow::get<1>(p).try_put(OddType(i)); |
| 422 | } |
| 423 | else { |
| 424 | (void)tbb::flow::get<0>(p).try_put(EvenType(i)); |
| 425 | } |
| 426 | } |
| 427 | }; |
| 428 | |
//! Check multi-port routing: N inputs are split by parity across two output ports,
//! each feeding its own queue_node; both queues must end up with N/2 items of the
//! right parity. Also spot-checks successor/predecessor bookkeeping when the
//! deprecated node-extraction feature is enabled.
template<typename InputType, typename OutputTuple >
void run_multiport_test(int num_threads) {
    typedef typename tbb::flow::multifunction_node<InputType, OutputTuple> mo_node_type;
    typedef typename tbb::flow::tuple_element<0,OutputTuple>::type EvenType;
    typedef typename tbb::flow::tuple_element<1,OutputTuple>::type OddType;
    tbb::task_scheduler_init init(num_threads);
    tbb::flow::graph g;
    mo_node_type mo_node(g, tbb::flow::unlimited, oddEvenBody<InputType, OutputTuple>() );

    tbb::flow::queue_node<EvenType> q0(g);
    tbb::flow::queue_node<OddType> q1(g);

    tbb::flow::make_edge(tbb::flow::output_port<0>(mo_node), q0);
    tbb::flow::make_edge(tbb::flow::output_port<1>(mo_node), q1);

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // edge bookkeeping: one successor per output port, no predecessors yet
    ASSERT(mo_node.predecessor_count() == 0, NULL);
    ASSERT(tbb::flow::output_port<0>(mo_node).successor_count() == 1, NULL);
    typedef typename mo_node_type::output_ports_type oports_type;
    typedef typename tbb::flow::tuple_element<0,oports_type>::type port0_type;
    typename port0_type::successor_list_type my_0succs;
    tbb::flow::output_port<0>(mo_node).copy_successors(my_0succs);
    ASSERT(my_0succs.size() == 1, NULL);
    typename mo_node_type::predecessor_list_type my_preds;
    mo_node.copy_predecessors(my_preds);
    ASSERT(my_preds.size() == 0, NULL);
#endif

    for(InputType i = 0; i < N; ++i) {
        mo_node.try_put(i);
    }

    g.wait_for_all();
    // N inputs split evenly: N/2 evens in q0, N/2 odds in q1
    for(int i = 0; i < N/2; ++i) {
        EvenType e;
        OddType o;
        ASSERT(q0.try_get(e) && (int)e % 2 == 0, NULL);
        ASSERT(q1.try_get(o) && (int)o % 2 == 1, NULL);
    }
}
| 469 | |
//! Tests limited concurrency cases for nodes that accept data messages,
//! then unlimited concurrency and multi-port routing, across several payload types.
void test_concurrency(int num_threads) {
    tbb::task_scheduler_init init(num_threads);
    run_concurrency_levels<int,tbb::flow::tuple<int> >(num_threads);
    run_concurrency_levels<int,tbb::flow::tuple<tbb::flow::continue_msg> >(num_threads);
    run_buffered_levels<int, tbb::flow::tuple<int> >(num_threads);
    run_unlimited_concurrency<int, tbb::flow::tuple<int> >();
    run_unlimited_concurrency<int,tbb::flow::tuple<empty_no_assign> >();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::tuple<int> >();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::tuple<empty_no_assign> >();
    run_unlimited_concurrency<int,tbb::flow::tuple<tbb::flow::continue_msg> >();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::tuple<tbb::flow::continue_msg> >();
    run_multiport_test<int, tbb::flow::tuple<int, int> >(num_threads);
    run_multiport_test<float, tbb::flow::tuple<int, double> >(num_threads);
}
| 485 | |
| 486 | template<typename Policy> |
| 487 | void test_ports_return_references() { |
| 488 | tbb::flow::graph g; |
| 489 | typedef int InputType; |
| 490 | typedef tbb::flow::tuple<int> OutputTuple; |
| 491 | tbb::flow::multifunction_node<InputType, OutputTuple, Policy> mf_node( |
| 492 | g, tbb::flow::unlimited, |
| 493 | &harness_graph_multifunction_executor<InputType, OutputTuple>::empty_func ); |
| 494 | test_output_ports_return_ref(mf_node); |
| 495 | } |
| 496 | |
| 497 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION |
// the integer received indicates which output ports should succeed and which should fail
// on try_put(): bit 0 -> port 0 must accept, bit 1 -> port 1 must accept.
typedef tbb::flow::multifunction_node<int, tbb::flow::tuple<int, int> > mf_node_type;

//! Body for the extract() tests: bumps a shared counter and a per-copy counter,
//! then asserts each output port's try_put succeeds exactly when the input's
//! corresponding bit is set (i.e. when that port still has a successor).
struct add_to_counter {
    int my_invocations;   // invocations seen by this body copy (reset by rf_reset_bodies)
    int *counter;         // shared counter owned by the test driver
    add_to_counter(int& var):counter(&var){ my_invocations = 0;}
    void operator()(const int &i, mf_node_type::output_ports_type &outports) {
        *counter+=1;
        ++my_invocations;
        if(i & 0x1) {
            ASSERT(tbb::flow::get<0>(outports).try_put(i), "port 0 expected to succeed" );
        }
        else {
            ASSERT(!tbb::flow::get<0>(outports).try_put(i), "port 0 expected to fail" );
        }
        if(i & 0x2) {
            ASSERT(tbb::flow::get<1>(outports).try_put(i), "port 1 expected to succeed" );
        }
        else {
            ASSERT(!tbb::flow::get<1>(outports).try_put(i), "port 1 expected to fail" );
        }
    }
    int my_inner() { return my_invocations; }
};
| 524 | |
//! Exercise the deprecated extract() feature: progressively disconnect b0, q0, and
//! mf0 from the graph, checking edge counts and message flow after each extraction,
//! then rebuild the edges. Runs the scenario twice: the second pass additionally
//! resets node bodies via rf_reset_bodies and checks the invocation counter restarts.
template<class FTYPE>
void test_extract() {
    int my_count = 0;
    int cm;
    tbb::flow::graph g;
    tbb::flow::broadcast_node<int> b0(g);
    tbb::flow::broadcast_node<int> b1(g);
    tbb::flow::multifunction_node<int, tbb::flow::tuple<int,int>, FTYPE> mf0(g, tbb::flow::unlimited, add_to_counter(my_count));
    tbb::flow::queue_node<int> q0(g);
    tbb::flow::queue_node<int> q1(g);

    tbb::flow::make_edge(b0, mf0);
    tbb::flow::make_edge(b1, mf0);
    tbb::flow::make_edge(tbb::flow::output_port<0>(mf0), q0);
    tbb::flow::make_edge(tbb::flow::output_port<1>(mf0), q1);
    for( int i = 0; i < 2; ++i ) {

        /*         b0          */
        /*           \    |--q0 */
        /*            mf0+      */
        /*           /    |--q1 */
        /*         b1          */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts" );
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts" );
        ASSERT(mf0.predecessor_count() == 2
                && tbb::flow::output_port<0>(mf0).successor_count() == 1
                && tbb::flow::output_port<1>(mf0).successor_count() == 1
                , "mf0 has incorrect counts" );
        ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts" );
        ASSERT(q1.predecessor_count() == 1 && q1.successor_count() == 0, "q0 has incorrect counts" );
        // input 3 == 0b11: both ports have successors, so both try_puts must succeed
        b0.try_put(3);
        g.wait_for_all();
        ASSERT(my_count == 1, "multifunction_node didn't fire" );
        ASSERT(q0.try_get(cm), "multifunction_node didn't forward to 0" );
        ASSERT(q1.try_get(cm), "multifunction_node didn't forward to 1" );
        b1.try_put(3);
        g.wait_for_all();
        ASSERT(my_count == 2, "multifunction_node didn't fire" );
        ASSERT(q0.try_get(cm), "multifunction_node didn't forward to 0" );
        ASSERT(q1.try_get(cm), "multifunction_node didn't forward to 1" );

        // disconnect b0 from the graph
        b0.extract();


        /*         b0          */
        /*                |--q0 */
        /*            mf0+      */
        /*           /    |--q1 */
        /*         b1          */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts" );
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts" );
        ASSERT(mf0.predecessor_count() == 1
                && tbb::flow::output_port<0>(mf0).successor_count() == 1
                && tbb::flow::output_port<1>(mf0).successor_count() == 1
                , "mf0 has incorrect counts" );
        ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts" );
        ASSERT(q1.predecessor_count() == 1 && q1.successor_count() == 0, "q0 has incorrect counts" );
        b0.try_put(1);
        b0.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 2, "b0 messages being forwarded to multifunction_node even though it is disconnected" );
        b1.try_put(3);
        g.wait_for_all();
        ASSERT(my_count == 3, "multifunction_node didn't fire though it has only one predecessor" );
        ASSERT(q0.try_get(cm), "multifunction_node didn't forward second time" );
        ASSERT(q1.try_get(cm), "multifunction_node didn't forward second time" );

        // disconnect q0; port 0 try_puts must now fail (input bit 0 cleared below)
        q0.extract();

        /*         b0          */
        /*                |  q0 */
        /*            mf0+      */
        /*           /    |--q1 */
        /*         b1          */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts" );
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts" );
        ASSERT(mf0.predecessor_count() == 1
                && tbb::flow::output_port<0>(mf0).successor_count() == 0
                && tbb::flow::output_port<1>(mf0).successor_count() == 1
                , "mf0 has incorrect counts" );
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts" );
        ASSERT(q1.predecessor_count() == 1 && q1.successor_count() == 0, "q0 has incorrect counts" );
        b0.try_put(1);
        b0.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 3, "b0 messages being forwarded to multifunction_node even though it is disconnected" );
        b1.try_put(2);
        g.wait_for_all();
        ASSERT(my_count == 4, "multifunction_node didn't fire though it has one predecessor" );
        ASSERT(!q0.try_get(cm), "multifunction_node forwarded" );
        ASSERT(q1.try_get(cm), "multifunction_node forwarded" );
        // fully disconnect the multifunction_node itself
        mf0.extract();

        // first pass: leave the body's invocation count alone;
        // second pass: reset bodies so the counter restarts from zero
        if(i == 0) {
        }
        else {
            g.reset(tbb::flow::rf_reset_bodies);
        }


        /*         b0          */
        /*                |  q0 */
        /*            mf0+      */
        /*                |  q1 */
        /*         b1          */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts" );
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts" );
        ASSERT(mf0.predecessor_count() == 0
                && tbb::flow::output_port<0>(mf0).successor_count() == 0
                && tbb::flow::output_port<1>(mf0).successor_count() == 0
                , "mf0 has incorrect counts" );
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts" );
        ASSERT(q1.predecessor_count() == 0 && q1.successor_count() == 0, "q0 has incorrect counts" );
        b0.try_put(1);
        b0.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 4, "b0 messages being forwarded to multifunction_node even though it is disconnected" );
        b1.try_put(2);
        g.wait_for_all();
        ASSERT(my_count == 4, "b1 messages being forwarded to multifunction_node even though it is disconnected" );
        ASSERT(!q0.try_get(cm), "multifunction_node forwarded" );
        ASSERT(!q1.try_get(cm), "multifunction_node forwarded" );
        // reconnect only b0
        make_edge(b0, mf0);

        /*         b0          */
        /*           \    |  q0 */
        /*            mf0+      */
        /*                |  q1 */
        /*         b1          */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts" );
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts" );
        ASSERT(mf0.predecessor_count() == 1
                && tbb::flow::output_port<0>(mf0).successor_count() == 0
                && tbb::flow::output_port<1>(mf0).successor_count() == 0
                , "mf0 has incorrect counts" );
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts" );
        ASSERT(q1.predecessor_count() == 0 && q1.successor_count() == 0, "q0 has incorrect counts" );
        // input 0 == 0b00: both ports are detached, so both try_puts must fail
        b0.try_put(0);
        g.wait_for_all();
        ASSERT(my_count == 5, "multifunction_node didn't fire though it has one predecessor" );
        b1.try_put(2);
        g.wait_for_all();
        ASSERT(my_count == 5, "multifunction_node fired though it has only one predecessor" );
        ASSERT(!q0.try_get(cm), "multifunction_node forwarded" );
        ASSERT(!q1.try_get(cm), "multifunction_node forwarded" );

        // restore the full topology for the next iteration
        tbb::flow::make_edge(b1, mf0);
        tbb::flow::make_edge(tbb::flow::output_port<0>(mf0), q0);
        tbb::flow::make_edge(tbb::flow::output_port<1>(mf0), q1);
        // pass 0 body saw all 5 firings; pass 1 body was reset and saw only 1
        ASSERT( ( i == 0 && tbb::flow::copy_body<add_to_counter>(mf0).my_inner() == 5 ) ||
                ( i == 1 && tbb::flow::copy_body<add_to_counter>(mf0).my_inner() == 1 ) , "reset_bodies failed" );
        my_count = 0;
    }
}
| 684 | #endif |
| 685 | |
//! Test entry point (called by the harness): runs the concurrency suites for every
//! thread count in [MinThread, MaxThread], then the port-reference, lightweight,
//! and (when enabled) extraction tests.
int TestMain() {
    if( MinThread<1 ) {
        REPORT("number of threads must be positive\n" );
        exit(1);
    }
    for( int p=MinThread; p<=MaxThread; ++p ) {
        test_concurrency(p);
    }
    test_ports_return_references<tbb::flow::queueing>();
    test_ports_return_references<tbb::flow::rejecting>();
    lightweight_testing::test<tbb::flow::multifunction_node>(10);
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    test_extract<tbb::flow::rejecting>();
    test_extract<tbb::flow::queueing>();
#endif
    return Harness::Done;
}
| 703 | |