| 1 | /* |
| 2 | Copyright (c) 2005-2019 Intel Corporation |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | #if __TBB_CPF_BUILD |
| 18 | #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1 |
| 19 | #endif |
| 20 | |
| 21 | #include "harness_graph.h" |
| 22 | |
| 23 | #include "tbb/flow_graph.h" |
| 24 | #include "tbb/task_scheduler_init.h" |
| 25 | |
| 26 | #define N 1000 |
| 27 | #define MAX_NODES 4 |
| 28 | #define C 8 |
| 29 | |
| 30 | struct empty_no_assign : private NoAssign { |
| 31 | empty_no_assign() {} |
| 32 | empty_no_assign( int ) {} |
| 33 | operator int() { return 0; } |
| 34 | }; |
| 35 | |
| 36 | // A class to use as a fake predecessor of continue_node |
| 37 | struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg> |
| 38 | { |
| 39 | typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type; |
| 40 | // Define implementations of virtual methods that are abstract in the base class |
| 41 | bool register_successor( successor_type& ) __TBB_override { return false; } |
| 42 | bool remove_successor( successor_type& ) __TBB_override { return false; } |
| 43 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION |
| 44 | typedef tbb::flow::sender<tbb::flow::continue_msg>::built_successors_type built_successors_type; |
| 45 | built_successors_type bst; |
| 46 | built_successors_type &built_successors() __TBB_override { return bst; } |
| 47 | void internal_add_built_successor( successor_type &) __TBB_override { } |
| 48 | void internal_delete_built_successor( successor_type &) __TBB_override { } |
| 49 | void copy_successors(successor_list_type &) __TBB_override {} |
| 50 | size_t successor_count() __TBB_override {return 0;} |
| 51 | #endif |
| 52 | }; |
| 53 | |
| 54 | template< typename InputType > |
| 55 | struct parallel_puts : private NoAssign { |
| 56 | |
| 57 | tbb::flow::receiver< InputType > * const my_exe_node; |
| 58 | |
| 59 | parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {} |
| 60 | |
| 61 | void operator()( int ) const { |
| 62 | for ( int i = 0; i < N; ++i ) { |
| 63 | // the nodes will accept all puts |
| 64 | ASSERT( my_exe_node->try_put( InputType() ) == true, NULL ); |
| 65 | } |
| 66 | } |
| 67 | |
| 68 | }; |
| 69 | |
| 70 | template< typename OutputType > |
| 71 | void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) { |
| 72 | fake_continue_sender fake_sender; |
| 73 | for (size_t i = 0; i < N; ++i) { |
| 74 | n.register_predecessor( fake_sender ); |
| 75 | } |
| 76 | |
| 77 | for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { |
| 78 | std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g)); |
| 79 | harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0; |
| 80 | |
| 81 | for (size_t r = 0; r < num_receivers; ++r ) { |
| 82 | tbb::flow::make_edge( n, receivers[r] ); |
| 83 | } |
| 84 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION |
| 85 | ASSERT(n.successor_count() == (size_t)num_receivers, NULL); |
| 86 | ASSERT(n.predecessor_count() == 0, NULL); |
| 87 | typename tbb::flow::continue_node<OutputType>::successor_list_type my_succs; |
| 88 | typedef typename tbb::flow::continue_node<OutputType>::successor_list_type::iterator sv_iter_type; |
| 89 | n.copy_successors(my_succs); |
| 90 | ASSERT(my_succs.size() == num_receivers, NULL); |
| 91 | #endif |
| 92 | |
| 93 | NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) ); |
| 94 | g.wait_for_all(); |
| 95 | |
| 96 | // 2) the nodes will receive puts from multiple predecessors simultaneously, |
| 97 | size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count; |
| 98 | ASSERT( (int)ec == p, NULL ); |
| 99 | for (size_t r = 0; r < num_receivers; ++r ) { |
| 100 | size_t c = receivers[r].my_count; |
| 101 | // 3) the nodes will send to multiple successors. |
| 102 | ASSERT( (int)c == p, NULL ); |
| 103 | } |
| 104 | |
| 105 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION |
| 106 | for(sv_iter_type si=my_succs.begin(); si != my_succs.end(); ++si) { |
| 107 | tbb::flow::remove_edge( n, **si ); |
| 108 | } |
| 109 | #else |
| 110 | for (size_t r = 0; r < num_receivers; ++r ) { |
| 111 | tbb::flow::remove_edge( n, receivers[r] ); |
| 112 | } |
| 113 | #endif |
| 114 | } |
| 115 | } |
| 116 | |
| 117 | template< typename OutputType, typename Body > |
| 118 | void continue_nodes( Body body ) { |
| 119 | for (int p = 1; p < 2*MaxThread; ++p) { |
| 120 | tbb::flow::graph g; |
| 121 | tbb::flow::continue_node< OutputType > exe_node( g, body ); |
| 122 | run_continue_nodes( p, g, exe_node); |
| 123 | exe_node.try_put(tbb::flow::continue_msg()); |
| 124 | tbb::flow::continue_node< OutputType > exe_node_copy( exe_node ); |
| 125 | run_continue_nodes( p, g, exe_node_copy); |
| 126 | } |
| 127 | } |
| 128 | |
| 129 | const size_t Offset = 123; |
| 130 | tbb::atomic<size_t> global_execute_count; |
| 131 | |
| 132 | template< typename OutputType > |
| 133 | struct inc_functor { |
| 134 | |
| 135 | tbb::atomic<size_t> local_execute_count; |
| 136 | inc_functor( ) { local_execute_count = 0; } |
| 137 | inc_functor( const inc_functor &f ) { local_execute_count = f.local_execute_count; } |
| 138 | void operator=(const inc_functor &f) { local_execute_count = f.local_execute_count; } |
| 139 | |
| 140 | OutputType operator()( tbb::flow::continue_msg ) { |
| 141 | ++global_execute_count; |
| 142 | ++local_execute_count; |
| 143 | return OutputType(); |
| 144 | } |
| 145 | |
| 146 | }; |
| 147 | |
| 148 | template< typename OutputType > |
| 149 | void continue_nodes_with_copy( ) { |
| 150 | |
| 151 | for (int p = 1; p < 2*MaxThread; ++p) { |
| 152 | tbb::flow::graph g; |
| 153 | inc_functor<OutputType> cf; |
| 154 | cf.local_execute_count = Offset; |
| 155 | global_execute_count = Offset; |
| 156 | |
| 157 | tbb::flow::continue_node< OutputType > exe_node( g, cf ); |
| 158 | fake_continue_sender fake_sender; |
| 159 | for (size_t i = 0; i < N; ++i) { |
| 160 | exe_node.register_predecessor( fake_sender ); |
| 161 | } |
| 162 | |
| 163 | for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { |
| 164 | std::vector< harness_counting_receiver<OutputType> > receivers(num_receivers, harness_counting_receiver<OutputType>(g)); |
| 165 | |
| 166 | for (size_t r = 0; r < num_receivers; ++r ) { |
| 167 | tbb::flow::make_edge( exe_node, receivers[r] ); |
| 168 | } |
| 169 | |
| 170 | NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) ); |
| 171 | g.wait_for_all(); |
| 172 | |
| 173 | // 2) the nodes will receive puts from multiple predecessors simultaneously, |
| 174 | for (size_t r = 0; r < num_receivers; ++r ) { |
| 175 | size_t c = receivers[r].my_count; |
| 176 | // 3) the nodes will send to multiple successors. |
| 177 | ASSERT( (int)c == p, NULL ); |
| 178 | } |
| 179 | for (size_t r = 0; r < num_receivers; ++r ) { |
| 180 | tbb::flow::remove_edge( exe_node, receivers[r] ); |
| 181 | } |
| 182 | } |
| 183 | |
| 184 | // validate that the local body matches the global execute_count and both are correct |
| 185 | inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node ); |
| 186 | const size_t expected_count = p*MAX_NODES + Offset; |
| 187 | size_t global_count = global_execute_count; |
| 188 | size_t inc_count = body_copy.local_execute_count; |
| 189 | ASSERT( global_count == expected_count && global_count == inc_count, NULL ); |
| 190 | g.reset(tbb::flow::rf_reset_bodies); |
| 191 | body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node ); |
| 192 | inc_count = body_copy.local_execute_count; |
| 193 | ASSERT( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" ); |
| 194 | |
| 195 | } |
| 196 | } |
| 197 | |
| 198 | template< typename OutputType > |
| 199 | void run_continue_nodes() { |
| 200 | harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0; |
| 201 | #if __TBB_CPP11_LAMBDAS_PRESENT |
| 202 | continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } ); |
| 203 | #endif |
| 204 | continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func ); |
| 205 | continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() ); |
| 206 | continue_nodes_with_copy<OutputType>(); |
| 207 | } |
| 208 | |
| 209 | //! Tests limited concurrency cases for nodes that accept data messages |
| 210 | void test_concurrency(int num_threads) { |
| 211 | tbb::task_scheduler_init init(num_threads); |
| 212 | run_continue_nodes<tbb::flow::continue_msg>(); |
| 213 | run_continue_nodes<int>(); |
| 214 | run_continue_nodes<empty_no_assign>(); |
| 215 | } |
| 216 | /* |
| 217 | * Connection of two graphs is not currently supported, but works to some limited extent. |
| 218 | * This test is included to check for backward compatibility. It checks that a continue_node |
| 219 | * with predecessors in two different graphs receives the required |
| 220 | * number of continue messages before it executes. |
| 221 | */ |
| 222 | using namespace tbb::flow; |
| 223 | |
| 224 | struct add_to_counter { |
| 225 | int* counter; |
| 226 | add_to_counter(int& var):counter(&var){} |
| 227 | void operator()(continue_msg){*counter+=1;} |
| 228 | }; |
| 229 | |
| 230 | void test_two_graphs(){ |
| 231 | int count=0; |
| 232 | |
| 233 | //graph g with broadcast_node and continue_node |
| 234 | graph g; |
| 235 | broadcast_node<continue_msg> start_g(g); |
| 236 | continue_node<continue_msg> first_g(g, add_to_counter(count)); |
| 237 | |
| 238 | //graph h with broadcast_node |
| 239 | graph h; |
| 240 | broadcast_node<continue_msg> start_h(h); |
| 241 | |
| 242 | //making two edges to first_g from the two graphs |
| 243 | make_edge(start_g,first_g); |
| 244 | make_edge(start_h, first_g); |
| 245 | |
| 246 | //two try_puts from the two graphs |
| 247 | start_g.try_put(continue_msg()); |
| 248 | start_h.try_put(continue_msg()); |
| 249 | g.wait_for_all(); |
| 250 | ASSERT(count==1, "Not all continue messages received" ); |
| 251 | |
| 252 | //two try_puts from the graph that doesn't contain the node |
| 253 | count=0; |
| 254 | start_h.try_put(continue_msg()); |
| 255 | start_h.try_put(continue_msg()); |
| 256 | g.wait_for_all(); |
| 257 | ASSERT(count==1, "Not all continue messages received -1" ); |
| 258 | |
| 259 | //only one try_put |
| 260 | count=0; |
| 261 | start_g.try_put(continue_msg()); |
| 262 | g.wait_for_all(); |
| 263 | ASSERT(count==0, "Node executed without waiting for all predecessors" ); |
| 264 | } |
| 265 | |
| 266 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION |
| 267 | void test_extract() { |
| 268 | int my_count = 0; |
| 269 | tbb::flow::continue_msg cm; |
| 270 | tbb::flow::graph g; |
| 271 | tbb::flow::broadcast_node<tbb::flow::continue_msg> b0(g); |
| 272 | tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g); |
| 273 | tbb::flow::continue_node<tbb::flow::continue_msg> c0(g, add_to_counter(my_count)); |
| 274 | tbb::flow::queue_node<tbb::flow::continue_msg> q0(g); |
| 275 | |
| 276 | tbb::flow::make_edge(b0, c0); |
| 277 | tbb::flow::make_edge(b1, c0); |
| 278 | tbb::flow::make_edge(c0, q0); |
| 279 | for( int i = 0; i < 2; ++i ) { |
| 280 | ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts" ); |
| 281 | ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts" ); |
| 282 | ASSERT(c0.predecessor_count() == 2 && c0.successor_count() == 1, "c0 has incorrect counts" ); |
| 283 | ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts" ); |
| 284 | |
| 285 | /* b0 */ |
| 286 | /* \ */ |
| 287 | /* c0 - q0 */ |
| 288 | /* / */ |
| 289 | /* b1 */ |
| 290 | |
| 291 | b0.try_put(tbb::flow::continue_msg()); |
| 292 | g.wait_for_all(); |
| 293 | ASSERT(my_count == 0, "continue_node fired too soon" ); |
| 294 | b1.try_put(tbb::flow::continue_msg()); |
| 295 | g.wait_for_all(); |
| 296 | ASSERT(my_count == 1, "continue_node didn't fire" ); |
| 297 | ASSERT(q0.try_get(cm), "continue_node didn't forward" ); |
| 298 | |
| 299 | b0.extract(); |
| 300 | |
| 301 | /* b0 */ |
| 302 | /* */ |
| 303 | /* c0 - q0 */ |
| 304 | /* / */ |
| 305 | /* b1 */ |
| 306 | |
| 307 | ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts" ); |
| 308 | ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 1, "b1 has incorrect counts" ); |
| 309 | ASSERT(c0.predecessor_count() == 1 && c0.successor_count() == 1, "c0 has incorrect counts" ); |
| 310 | ASSERT(q0.predecessor_count() == 1 && q0.successor_count() == 0, "q0 has incorrect counts" ); |
| 311 | b0.try_put(tbb::flow::continue_msg()); |
| 312 | b0.try_put(tbb::flow::continue_msg()); |
| 313 | g.wait_for_all(); |
| 314 | ASSERT(my_count == 1, "b0 messages being forwarded to continue_node even though it is disconnected" ); |
| 315 | b1.try_put(tbb::flow::continue_msg()); |
| 316 | g.wait_for_all(); |
| 317 | ASSERT(my_count == 2, "continue_node didn't fire though it has only one predecessor" ); |
| 318 | ASSERT(q0.try_get(cm), "continue_node didn't forward second time" ); |
| 319 | |
| 320 | c0.extract(); |
| 321 | |
| 322 | /* b0 */ |
| 323 | /* */ |
| 324 | /* c0 q0 */ |
| 325 | /* */ |
| 326 | /* b1 */ |
| 327 | |
| 328 | ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 0, "b0 has incorrect counts" ); |
| 329 | ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts" ); |
| 330 | ASSERT(c0.predecessor_count() == 0 && c0.successor_count() == 0, "c0 has incorrect counts" ); |
| 331 | ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts" ); |
| 332 | b0.try_put(tbb::flow::continue_msg()); |
| 333 | b0.try_put(tbb::flow::continue_msg()); |
| 334 | b1.try_put(tbb::flow::continue_msg()); |
| 335 | b1.try_put(tbb::flow::continue_msg()); |
| 336 | g.wait_for_all(); |
| 337 | ASSERT(my_count == 2, "continue didn't fire though it has only one predecessor" ); |
| 338 | ASSERT(!q0.try_get(cm), "continue_node forwarded though it shouldn't" ); |
| 339 | make_edge(b0, c0); |
| 340 | |
| 341 | /* b0 */ |
| 342 | /* \ */ |
| 343 | /* c0 q0 */ |
| 344 | /* */ |
| 345 | /* b1 */ |
| 346 | |
| 347 | ASSERT(b0.predecessor_count() == 0 && b0.successor_count() == 1, "b0 has incorrect counts" ); |
| 348 | ASSERT(b1.predecessor_count() == 0 && b1.successor_count() == 0, "b1 has incorrect counts" ); |
| 349 | ASSERT(c0.predecessor_count() == 1 && c0.successor_count() == 0, "c0 has incorrect counts" ); |
| 350 | ASSERT(q0.predecessor_count() == 0 && q0.successor_count() == 0, "q0 has incorrect counts" ); |
| 351 | |
| 352 | b0.try_put(tbb::flow::continue_msg()); |
| 353 | g.wait_for_all(); |
| 354 | |
| 355 | ASSERT(my_count == 3, "continue didn't fire though it has only one predecessor" ); |
| 356 | ASSERT(!q0.try_get(cm), "continue_node forwarded though it shouldn't" ); |
| 357 | |
| 358 | tbb::flow::make_edge(b1, c0); |
| 359 | tbb::flow::make_edge(c0, q0); |
| 360 | my_count = 0; |
| 361 | } |
| 362 | } |
| 363 | #endif |
| 364 | |
| 365 | struct lightweight_policy_body : NoAssign { |
| 366 | const tbb::tbb_thread::id my_thread_id; |
| 367 | tbb::atomic<size_t> my_count; |
| 368 | |
| 369 | lightweight_policy_body() : my_thread_id(tbb::this_tbb_thread::get_id()) { |
| 370 | my_count = 0; |
| 371 | } |
| 372 | void operator()(tbb::flow::continue_msg) { |
| 373 | ++my_count; |
| 374 | tbb::tbb_thread::id body_thread_id = tbb::this_tbb_thread::get_id(); |
| 375 | ASSERT(body_thread_id == my_thread_id, "Body executed as not lightweight" ); |
| 376 | } |
| 377 | }; |
| 378 | |
| 379 | void test_lightweight_policy() { |
| 380 | tbb::flow::graph g; |
| 381 | tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> node1(g, lightweight_policy_body()); |
| 382 | tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> node2(g, lightweight_policy_body()); |
| 383 | |
| 384 | tbb::flow::make_edge(node1, node2); |
| 385 | const size_t n = 10; |
| 386 | for(size_t i = 0; i < n; ++i) { |
| 387 | node1.try_put(tbb::flow::continue_msg()); |
| 388 | } |
| 389 | g.wait_for_all(); |
| 390 | |
| 391 | lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1); |
| 392 | lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2); |
| 393 | ASSERT(body1.my_count == n, "Body of the first node needs to be executed N times" ); |
| 394 | ASSERT(body2.my_count == n, "Body of the second node needs to be executed N times" ); |
| 395 | } |
| 396 | |
| 397 | int TestMain() { |
| 398 | if( MinThread<1 ) { |
| 399 | REPORT("number of threads must be positive\n" ); |
| 400 | exit(1); |
| 401 | } |
| 402 | for( int p=MinThread; p<=MaxThread; ++p ) { |
| 403 | test_concurrency(p); |
| 404 | } |
| 405 | test_two_graphs(); |
| 406 | #if __TBB_PREVIEW_LIGHTWEIGHT_POLICY |
| 407 | test_lightweight_policy(); |
| 408 | #endif |
| 409 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION |
| 410 | test_extract(); |
| 411 | #endif |
| 412 | return Harness::Done; |
| 413 | } |
| 414 | |
| 415 | |