| 1 | /* | 
|---|
| 2 | Copyright (c) 2005-2019 Intel Corporation | 
|---|
| 3 |  | 
|---|
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); | 
|---|
| 5 | you may not use this file except in compliance with the License. | 
|---|
| 6 | You may obtain a copy of the License at | 
|---|
| 7 |  | 
|---|
| 8 | http://www.apache.org/licenses/LICENSE-2.0 | 
|---|
| 9 |  | 
|---|
| 10 | Unless required by applicable law or agreed to in writing, software | 
|---|
| 11 | distributed under the License is distributed on an "AS IS" BASIS, | 
|---|
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|---|
| 13 | See the License for the specific language governing permissions and | 
|---|
| 14 | limitations under the License. | 
|---|
| 15 | */ | 
|---|
| 16 |  | 
|---|
| 17 | #if __TBB_CPF_BUILD | 
|---|
| 18 | #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1 | 
|---|
| 19 | #endif | 
|---|
| 20 |  | 
|---|
| 21 | #include "harness_graph.h" | 
|---|
| 22 |  | 
|---|
| 23 | #include "tbb/flow_graph.h" | 
|---|
| 24 | #include "tbb/task_scheduler_init.h" | 
|---|
| 25 | #include "tbb/spin_rw_mutex.h" | 
|---|
| 26 |  | 
|---|
| 27 | #define N 100 | 
|---|
| 28 | #define MAX_NODES 4 | 
|---|
| 29 |  | 
|---|
//! Performs test on function nodes with limited concurrency and buffering
/** These tests check:
    1) that the number of executing copies never exceeds the concurrency limit,
    2) that the node never rejects,
    3) that no items are lost,
    and 4) all of this happens even if there are multiple predecessors and successors.
*/
|---|
| 37 |  | 
|---|
| 38 | template< typename InputType > | 
|---|
| 39 | struct parallel_put_until_limit : private NoAssign { | 
|---|
| 40 |  | 
|---|
| 41 | harness_counting_sender<InputType> *my_senders; | 
|---|
| 42 |  | 
|---|
| 43 | parallel_put_until_limit( harness_counting_sender<InputType> *senders ) : my_senders(senders) {} | 
|---|
| 44 |  | 
|---|
| 45 | void operator()( int i ) const  { | 
|---|
| 46 | if ( my_senders ) { | 
|---|
| 47 | my_senders[i].try_put_until_limit(); | 
|---|
| 48 | } | 
|---|
| 49 | } | 
|---|
| 50 |  | 
|---|
| 51 | }; | 
|---|
| 52 |  | 
|---|
//! Identity functor: forwards its argument unchanged.
template<typename IO>
struct pass_through {
    IO operator()(const IO& item) {
        IO forwarded = item;
        return forwarded;
    }
};
|---|
| 57 |  | 
|---|
template< typename InputType, typename OutputType, typename Body >
void buffered_levels( size_t concurrency, Body body ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc.  There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        // Create the function_node with the appropriate concurrency level, and use default buffering
        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, body );
        // Unlimited pass-through stage placed in front of exe_node, so inputs arrive
        // through an intermediate node rather than directly from the senders.
        tbb::flow::function_node<InputType, InputType> pass_thru( g, tbb::flow::unlimited, pass_through<InputType>());

        // Create a vector of identical exe_nodes and pass_thrus
        std::vector< tbb::flow::function_node< InputType, OutputType > > exe_vec(2, exe_node);
        std::vector< tbb::flow::function_node< InputType, InputType > > pass_thru_vec(2, pass_thru);
        // Attach each pass_thru to its corresponding exe_node
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            tbb::flow::make_edge(pass_thru_vec[node_idx], exe_vec[node_idx]);
        }

        // TODO: why the test is executed serially for the node pairs, not concurrently?
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            // For num_receivers = 1 to MAX_NODES
            for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
                // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
                std::vector< harness_mapped_receiver<OutputType>* > receivers(num_receivers);
                for (size_t i = 0; i < num_receivers; i++) {
                    receivers[i] = new harness_mapped_receiver<OutputType>(g);
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::make_edge( exe_vec[node_idx], *receivers[r] );
                }

                // Do the test with varying numbers of senders
                harness_counting_sender<InputType> *senders = NULL;
                for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                    // Create num_senders senders, set their message limit each to N, and connect them to pass_thru_vec[node_idx]
                    senders = new harness_counting_sender<InputType>[num_senders];
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders[s].my_limit = N;
                        senders[s].register_successor(pass_thru_vec[node_idx] );
                    }

                    // Initialize the receivers so they know how many senders and messages to check for
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->initialize_map( N, num_senders );
                    }

                    // Do the test
                    NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                    g.wait_for_all();

                    // confirm that each sender was requested from N times
                    for (size_t s = 0; s < num_senders; ++s ) {
                        size_t n = senders[s].my_received;
                        ASSERT( n == N, NULL );
                        ASSERT( senders[s].my_receiver == &pass_thru_vec[node_idx], NULL );
                    }
                    // validate the receivers
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->validate();
                    }
                    delete [] senders;
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::remove_edge( exe_vec[node_idx], *receivers[r] );
                }
                // A buffering node must still accept a put after all successors are removed.
                ASSERT( exe_vec[node_idx].try_put( InputType() ) == true, NULL );
                g.wait_for_all();
                for (size_t r = 0; r < num_receivers; ++r ) {
                    // since it's detached, nothing should have changed
                    receivers[r]->validate();
                }

                for (size_t i = 0; i < num_receivers; i++) {
                    delete receivers[i];
                }

            } // for num_receivers
        } // for node_idx
    } // for concurrency level lc
}
|---|
| 147 |  | 
|---|
// Nonzero starting value for both counters below, so a body that is spuriously
// reset to zero (instead of to its originally-captured copy) is detectable.
const size_t Offset = 123;
// Total number of inc_functor invocations across all copies of the body.
tbb::atomic<size_t> global_execute_count;
|---|
| 150 |  | 
|---|
| 151 | struct inc_functor { | 
|---|
| 152 |  | 
|---|
| 153 | tbb::atomic<size_t> local_execute_count; | 
|---|
| 154 | inc_functor( ) { local_execute_count = 0; } | 
|---|
| 155 | inc_functor( const inc_functor &f ) { local_execute_count = f.local_execute_count; } | 
|---|
| 156 | void operator=( const inc_functor &f ) { local_execute_count = f.local_execute_count; } | 
|---|
| 157 |  | 
|---|
| 158 | int operator()( int i ) { | 
|---|
| 159 | ++global_execute_count; | 
|---|
| 160 | ++local_execute_count; | 
|---|
| 161 | return i; | 
|---|
| 162 | } | 
|---|
| 163 |  | 
|---|
| 164 | }; | 
|---|
| 165 |  | 
|---|
// Same shape as buffered_levels, but with a stateful body (inc_functor) so that
// copy_body() and graph::reset(rf_reset_bodies) semantics can also be verified.
template< typename InputType, typename OutputType >
void buffered_levels_with_copy( size_t concurrency ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Start both counters at the nonzero Offset (see its declaration).
        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< harness_mapped_receiver<OutputType>* > receivers(num_receivers);
            for (size_t i = 0; i < num_receivers; i++) {
                receivers[i] = new harness_mapped_receiver<OutputType>(g);
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            harness_counting_sender<InputType> *senders = NULL;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders = new harness_counting_sender<InputType>[num_senders];
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders[s].my_limit = N;
                    tbb::flow::make_edge( senders[s], exe_node );
                }

                // Tell each receiver how many senders and messages to expect.
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                // Drive all senders concurrently, then drain the graph.
                NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                // Confirm that each sender was requested from N times.
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s].my_received;
                    ASSERT( n == N, NULL );
                    ASSERT( senders[s].my_receiver == &exe_node, NULL );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
                delete [] senders;
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            // A buffering node must still accept a put after all successors are removed,
            // and the detached receivers must not observe it.
            ASSERT( exe_node.try_put( InputType() ) == true, NULL );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }

            for (size_t i = 0; i < num_receivers; i++) {
                delete receivers[i];
            }
        }

        // validate that the local body matches the global execute_count and both are correct
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        // Every (num_receivers, num_senders) combination contributes num_senders*N
        // firings plus the one detached put per num_receivers iteration, on top of Offset.
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        ASSERT( global_count == expected_count && global_count == inc_count, NULL );
        // reset(rf_reset_bodies) must restore the body to its originally-captured copy.
        g.reset(tbb::flow::rf_reset_bodies);
        body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        inc_count = body_copy.local_execute_count;
        ASSERT( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor");
    }
}
|---|
| 241 |  | 
|---|
| 242 | template< typename InputType, typename OutputType > | 
|---|
| 243 | void run_buffered_levels( int c ) { | 
|---|
| 244 | #if __TBB_CPP11_LAMBDAS_PRESENT | 
|---|
| 245 | buffered_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } ); | 
|---|
| 246 | #endif | 
|---|
| 247 | buffered_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::func ); | 
|---|
| 248 | buffered_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::functor() ); | 
|---|
| 249 | buffered_levels_with_copy<InputType,OutputType>( c ); | 
|---|
| 250 | } | 
|---|
| 251 |  | 
|---|
| 252 |  | 
|---|
//! Performs test on executable nodes with limited concurrency
/** These tests check:
    1) that the nodes will accept puts up to the concurrency limit,
    2) that the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
    3) that the nodes will receive puts from multiple predecessors simultaneously,
    and 4) that the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
|---|
| 261 |  | 
|---|
template< typename InputType, typename OutputType, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {

    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        // Rejecting policy: the node must refuse puts beyond its concurrency limit.
        typedef tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > fnode_type;
        fnode_type exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            // With no edges made yet, both counts must be zero.
            ASSERT(exe_node.successor_count() == 0, NULL);
            ASSERT(exe_node.predecessor_count() == 0, NULL);
#endif

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, receivers[r] );
            }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            // Edge bookkeeping must reflect exactly the edges made above.
            ASSERT(exe_node.successor_count() == num_receivers, NULL);
            typename fnode_type::successor_list_type my_succs;
            exe_node.copy_successors(my_succs);
            ASSERT(my_succs.size() == num_receivers, NULL);
            typename fnode_type::predecessor_list_type my_preds;
            exe_node.copy_predecessors(my_preds);
            ASSERT(my_preds.size() == 0, NULL);
#endif

            harness_counting_sender<InputType> *senders = NULL;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders = new harness_counting_sender<InputType>[num_senders];
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l( harness_graph_executor<InputType, OutputType>::template mutex_holder<tbb::spin_rw_mutex>::mutex );

                    // put to lc level, it will accept and then block at m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        ASSERT( exe_node.try_put( InputType() ) == true, NULL );
                    }
                    // it only accepts to lc level
                    ASSERT( exe_node.try_put( InputType() ) == false, NULL );

                    for (size_t s = 0; s < num_senders; ++s ) {
                        // register a sender
                        senders[s].my_limit = N;
                        exe_node.register_predecessor( senders[s] );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that each sender was requested from N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s].my_received;
                    ASSERT( n == N, NULL );
                    ASSERT( senders[s].my_receiver == &exe_node, NULL );
                }
                // confirm that each receiver got N * num_senders + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r].my_count;
                    ASSERT( n == num_senders*N+lc, NULL );
                    receivers[r].my_count = 0;
                }
                delete [] senders;
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, receivers[r] );
            }
            // A put after all successors are removed is still accepted (the node runs),
            // but no detached receiver may observe the output.
            ASSERT( exe_node.try_put( InputType() ) == true, NULL );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                ASSERT( int(receivers[r].my_count) == 0, NULL );
            }
        }
    }
}
|---|
| 350 |  | 
|---|
| 351 |  | 
|---|
| 352 | template< typename InputType, typename OutputType > | 
|---|
| 353 | void run_concurrency_levels( int c ) { | 
|---|
| 354 | #if __TBB_CPP11_LAMBDAS_PRESENT | 
|---|
| 355 | concurrency_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex>(i); } ); | 
|---|
| 356 | #endif | 
|---|
| 357 | concurrency_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex> ); | 
|---|
| 358 | concurrency_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::template tfunctor<tbb::spin_rw_mutex>() ); | 
|---|
| 359 | } | 
|---|
| 360 |  | 
|---|
| 361 |  | 
|---|
//! Minimal message type: default- or int-constructible, converts to int 0.
struct empty_no_assign {
    empty_no_assign() {}
    empty_no_assign( int ) {}        // the constructor argument is deliberately ignored
    operator int() { return 0; }     // every instance reads back as zero
};
|---|
| 367 |  | 
|---|
| 368 | template< typename InputType > | 
|---|
| 369 | struct parallel_puts : private NoAssign { | 
|---|
| 370 |  | 
|---|
| 371 | tbb::flow::receiver< InputType > * const my_exe_node; | 
|---|
| 372 |  | 
|---|
| 373 | parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {} | 
|---|
| 374 |  | 
|---|
| 375 | void operator()( int ) const  { | 
|---|
| 376 | for ( int i = 0; i < N; ++i ) { | 
|---|
| 377 | // the nodes will accept all puts | 
|---|
| 378 | ASSERT( my_exe_node->try_put( InputType() ) == true, NULL ); | 
|---|
| 379 | } | 
|---|
| 380 | } | 
|---|
| 381 |  | 
|---|
| 382 | }; | 
|---|
| 383 |  | 
|---|
//! Performs test on executable nodes with unlimited concurrency
/** These tests check:
    1) that the nodes will accept all puts,
    2) that the nodes will receive puts from multiple predecessors simultaneously,
    and 3) that the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
|---|
| 391 |  | 
|---|
template< typename InputType, typename OutputType, typename Body >
void unlimited_concurrency( Body body ) {

    // Try putting from p native threads, for p = 1 .. 2*MaxThread-1.
    for (int p = 1; p < 2*MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g));
            harness_graph_executor<InputType, OutputType>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, receivers[r] );
            }

            // p threads each put N items; an unlimited node must accept them all
            // (parallel_puts asserts each try_put succeeds).
            NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_executor<InputType, OutputType>::execute_count;
            ASSERT( (int)ec == p*N, NULL );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r].my_count;
                // 3) the nodes will send to multiple successors.
                ASSERT( (int)c == p*N, NULL );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, receivers[r] );
            }
        }
    }
}
|---|
| 425 |  | 
|---|
| 426 | template< typename InputType, typename OutputType > | 
|---|
| 427 | void run_unlimited_concurrency() { | 
|---|
| 428 | harness_graph_executor<InputType, OutputType>::max_executors = 0; | 
|---|
| 429 | #if __TBB_CPP11_LAMBDAS_PRESENT | 
|---|
| 430 | unlimited_concurrency<InputType,OutputType>( []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } ); | 
|---|
| 431 | #endif | 
|---|
| 432 | unlimited_concurrency<InputType,OutputType>( &harness_graph_executor<InputType, OutputType>::func ); | 
|---|
| 433 | unlimited_concurrency<InputType,OutputType>( typename harness_graph_executor<InputType, OutputType>::functor() ); | 
|---|
| 434 | } | 
|---|
| 435 |  | 
|---|
| 436 | struct continue_msg_to_int { | 
|---|
| 437 | int my_int; | 
|---|
| 438 | continue_msg_to_int(int x) : my_int(x) {} | 
|---|
| 439 | int operator()(tbb::flow::continue_msg) { return my_int; } | 
|---|
| 440 | }; | 
|---|
| 441 |  | 
|---|
| 442 | void test_function_node_with_continue_msg_as_input() { | 
|---|
| 443 | // If this function terminates, then this test is successful | 
|---|
| 444 | tbb::flow::graph g; | 
|---|
| 445 |  | 
|---|
| 446 | tbb::flow::broadcast_node<tbb::flow::continue_msg> Start(g); | 
|---|
| 447 |  | 
|---|
| 448 | tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN1( g, tbb::flow::serial, continue_msg_to_int(42)); | 
|---|
| 449 | tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN2( g, tbb::flow::serial, continue_msg_to_int(43)); | 
|---|
| 450 |  | 
|---|
| 451 | tbb::flow::make_edge( Start, FN1 ); | 
|---|
| 452 | tbb::flow::make_edge( Start, FN2 ); | 
|---|
| 453 |  | 
|---|
| 454 | Start.try_put( tbb::flow::continue_msg() ); | 
|---|
| 455 | g.wait_for_all(); | 
|---|
| 456 | } | 
|---|
| 457 |  | 
|---|
//! Tests limited concurrency cases for nodes that accept data messages
void test_concurrency(int num_threads) {
    // Pin the scheduler to the requested thread count for this round of tests.
    tbb::task_scheduler_init init(num_threads);
    run_concurrency_levels<int,int>(num_threads);
    run_concurrency_levels<int,tbb::flow::continue_msg>(num_threads);
    run_buffered_levels<int, int>(num_threads);
    // Unlimited-concurrency runs cover all combinations of plain-int and
    // no-assign input/output types, including continue_msg outputs.
    run_unlimited_concurrency<int,int>();
    run_unlimited_concurrency<int,empty_no_assign>();
    run_unlimited_concurrency<empty_no_assign,int>();
    run_unlimited_concurrency<empty_no_assign,empty_no_assign>();
    run_unlimited_concurrency<int,tbb::flow::continue_msg>();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::continue_msg>();
    test_function_node_with_continue_msg_as_input();
}
|---|
| 472 |  | 
|---|
| 473 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION | 
|---|
//! Body that bumps an external counter and returns its input incremented by one.
struct add_to_counter {
    int* counter;   // points at a counter owned by the caller
    add_to_counter(int& var) : counter(&var) {}
    int operator()(int i) {
        *counter += 1;
        return i + 1;
    }
};
|---|
| 479 |  | 
|---|
// Builds b0,b1 -> f0 -> q0 and exercises the deprecated extract() interface,
// checking after each step that edge counts and message flow match the remaining
// topology.  The loop body runs twice to prove the graph can be fully re-wired
// after extraction.
template<typename FTYPE>
void test_extract() {
    int my_count = 0;
    int cm;   // scratch for q0.try_get
    tbb::flow::graph g;
    tbb::flow::broadcast_node<int> b0(g);
    tbb::flow::broadcast_node<int> b1(g);
    tbb::flow::function_node<int, int, FTYPE> f0(g, tbb::flow::unlimited, add_to_counter(my_count));
    tbb::flow::queue_node<int> q0(g);

    tbb::flow::make_edge(b0, f0);
    tbb::flow::make_edge(b1, f0);
    tbb::flow::make_edge(f0, q0);
    for( int i = 0; i < 2; ++i ) {
        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts");
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts");
        ASSERT(f0.predecessor_count() == 2 && f0.successor_count() == 1, "f0 has incorrect counts");
        ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts");

        /* b0         */
        /*   \        */
        /*    f0 - q0 */
        /*   /        */
        /* b1         */

        // Both broadcast nodes reach f0, so each put fires it once.
        b0.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 1, "function_node didn't fire");
        ASSERT(q0.try_get(cm), "function_node didn't forward");
        b1.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 2, "function_node didn't fire");
        ASSERT(q0.try_get(cm), "function_node didn't forward");

        // Detach b0 from all its edges.
        b0.extract();

        /* b0         */
        /*            */
        /*    f0 - q0 */
        /*   /        */
        /* b1         */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts");
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts");
        ASSERT(f0.predecessor_count() == 1 && f0.successor_count() == 1, "f0 has incorrect counts");
        ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts");
        // Puts into the detached b0 must go nowhere.
        b0.try_put(1);
        b0.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 2, "b0 messages being forwarded to function_node even though it is disconnected");
        b1.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 3, "function_node didn't fire though it has only one predecessor");
        ASSERT(q0.try_get(cm), "function_node didn't forward second time");

        // Detach f0; the graph is now fully disconnected.
        f0.extract();

        /* b0         */
        /*            */
        /*    f0   q0 */
        /*            */
        /* b1         */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts");
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts");
        ASSERT(f0.predecessor_count() == 0 && f0.successor_count() == 0, "f0 has incorrect counts");
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts");
        b0.try_put(1);
        b0.try_put(1);
        b1.try_put(1);
        b1.try_put(1);
        g.wait_for_all();
        ASSERT(my_count == 3, "function_node didn't fire though it has only one predecessor");
        ASSERT(!q0.try_get(cm), "function_node forwarded though it shouldn't");
        // Re-attach b0 only; f0 still has no successor.
        make_edge(b0, f0);

        /* b0         */
        /*   \        */
        /*    f0   q0 */
        /*            */
        /* b1         */

        ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts");
        ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts");
        ASSERT(f0.predecessor_count() == 1 && f0.successor_count() == 0, "f0 has incorrect counts");
        ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts");

        b0.try_put(int());
        g.wait_for_all();

        ASSERT(my_count == 4, "function_node didn't fire though it has only one predecessor");
        ASSERT(!q0.try_get(cm), "function_node forwarded though it shouldn't");

        // Restore the full topology and counter for the second iteration.
        tbb::flow::make_edge(b1, f0);
        tbb::flow::make_edge(f0, q0);
        my_count = 0;
    }
}
|---|
| 578 | #endif | 
|---|
| 579 |  | 
|---|
// Harness entry point: runs the full suite at every requested thread count.
int TestMain() {
    // The concurrency tests require at least one thread.
    if( MinThread<1 ) {
        REPORT( "number of threads must be positive\n");
        exit(1);
    }
    for( int p=MinThread; p<=MaxThread; ++p ) {
        test_concurrency(p);
    }
    // Check the lightweight policy variant of function_node.
    lightweight_testing::test<tbb::flow::function_node>(10);
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // extract() is only available through the deprecated/CPF interface.
    test_extract<tbb::flow::rejecting>();
    test_extract<tbb::flow::queueing>();
#endif
    return Harness::Done;
}
|---|
| 595 |  | 
|---|
| 596 |  | 
|---|