/*
Copyright (c) 2005-2019 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include "harness_graph.h"
#include "harness_barrier.h"
#include "tbb/flow_graph.h"
#include "tbb/task_scheduler_init.h"

const int T = 4;
const int W = 4;

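// Body for NativeParallelFor: marks this iteration's slot in the shared
// done-flag array and releases one of the graph's wait counts.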
struct decrement_wait : NoAssign {

    tbb::flow::graph * const my_graph;
    bool * const my_done_flag;

    decrement_wait( tbb::flow::graph &h, bool *done_flag ) : my_graph(&h), my_done_flag(done_flag) {}

    void operator()(int i) const {
        Harness::Sleep(10*i);
        my_done_flag[i] = true;
        my_graph->decrement_wait_count();
    }
};

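// Check that wait_for_all() blocks until every increment_wait_count()
// has been matched by a decrement_wait_count() from a worker thread.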
static void test_wait_count() {
    tbb::flow::graph h;
    for (int i = 0; i < T; ++i ) {
        bool done_flag[W];
        for (int j = 0; j < W; ++j ) {
            for ( int w = 0; w < W; ++w ) done_flag[w] = false;
            for ( int w = 0; w < j; ++w ) h.increment_wait_count();

            NativeParallelFor( j, decrement_wait(h, done_flag) );
            h.wait_for_all();
            for ( int w = 0; w < W; ++w ) {
                if ( w < j ) ASSERT( done_flag[w] == true, NULL );
                else ASSERT( done_flag[w] == false, NULL );
            }
        }
    }
}

const int F = 100;

#if __TBB_CPP11_LAMBDAS_PRESENT
bool lambda_flag[F];
#endif
bool functor_flag[F];

struct set_functor {
    int my_i;
    set_functor( int i ) : my_i(i) {}
    void operator()() { functor_flag[my_i] = true; }
};

struct return_functor {
    int my_i;
    return_functor( int i ) : my_i(i) {}
    int operator()() { return my_i; }
};

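// Check graph::run() with both void bodies and value-returning bodies
// (functors always, lambdas when the compiler supports them).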
static void test_run() {
    tbb::flow::graph h;
    for (int i = 0; i < T; ++i ) {

        // Create receivers and flag arrays
#if __TBB_CPP11_LAMBDAS_PRESENT
        harness_mapped_receiver<int> lambda_r(h);
        lambda_r.initialize_map( F, 1 );
#endif
        harness_mapped_receiver<int> functor_r(h);
        functor_r.initialize_map( F, 1 );

        // Initialize flag arrays
        for (int j = 0; j < F; ++j ) {
#if __TBB_CPP11_LAMBDAS_PRESENT
            lambda_flag[j] = false;
#endif
            functor_flag[j] = false;
        }

        for ( int j = 0; j < F; ++j ) {
#if __TBB_CPP11_LAMBDAS_PRESENT
            h.run( [=]() { lambda_flag[j] = true; } );
            h.run( lambda_r, [=]() { return j; } );
#endif
            h.run( set_functor(j) );
            h.run( functor_r, return_functor(j) );
        }
        h.wait_for_all();
        for ( int j = 0; j < F; ++j ) {
#if __TBB_CPP11_LAMBDAS_PRESENT
            ASSERT( lambda_flag[j] == true, NULL );
#endif
            ASSERT( functor_flag[j] == true, NULL );
        }
#if __TBB_CPP11_LAMBDAS_PRESENT
        lambda_r.validate();
#endif
        functor_r.validate();
    }
}

// Encapsulate the object we want to store in a vector (because the contained
// type must have a copy constructor and an assignment operator).
class my_int_buffer {
    tbb::flow::buffer_node<int> *b;
    tbb::flow::graph& my_graph;
public:
    my_int_buffer(tbb::flow::graph &g) : my_graph(g) { b = new tbb::flow::buffer_node<int>(my_graph); }
    my_int_buffer(const my_int_buffer& other) : my_graph(other.my_graph) {
        b = new tbb::flow::buffer_node<int>(my_graph);
    }
    ~my_int_buffer() { delete b; }
    my_int_buffer& operator=(const my_int_buffer& /*other*/) {
        return *this;
    }
};

// test the graph iterator, delete nodes from graph, test again
void test_iterator() {
    tbb::flow::graph g;
    my_int_buffer a_buffer(g);
    my_int_buffer b_buffer(g);
    my_int_buffer c_buffer(g);
    my_int_buffer *d_buffer = new my_int_buffer(g);
    my_int_buffer e_buffer(g);
    std::vector< my_int_buffer > my_buffer_vector(10, c_buffer);

    int count = 0;
    for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
        count++;
    }
    ASSERT(count==15, "error in iterator count" );

    delete d_buffer;

    count = 0;
    for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
        count++;
    }
    ASSERT(count==14, "error in iterator count" );

    my_buffer_vector.clear();

    count = 0;
    for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
        count++;
    }
    ASSERT(count==4, "error in iterator count" );
}

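// Body for the parallel add/remove test: each thread adds nodes to the shared
// graph, counts all nodes via the graph iterator, then removes its own nodes.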
class AddRemoveBody : NoAssign {
    tbb::flow::graph& g;
    int nThreads;
    Harness::SpinBarrier &barrier;
public:
    AddRemoveBody(int nthr, Harness::SpinBarrier &barrier_, tbb::flow::graph& _g) :
        g(_g), nThreads(nthr), barrier(barrier_)
    {}
    void operator()(const int /*threadID*/) const {
        my_int_buffer b(g);
        {
            std::vector<my_int_buffer> my_buffer_vector(100, b);
            barrier.wait(); // wait until all nodes are created
            // now test that the proper number of nodes were created
            int count = 0;
            for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
                count++;
            }
            ASSERT(count==101*nThreads, "error in iterator count" );
            barrier.wait(); // wait until all threads are done counting
        } // all nodes but for the initial node on this thread are deleted
        barrier.wait(); // wait until all threads have deleted all nodes in their vectors
        // now test that all the nodes were deleted except for the initial node
        int count = 0;
        for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
            count++;
        }
        ASSERT(count==nThreads, "error in iterator count" );
        barrier.wait(); // wait until all threads are done counting
    } // initial node gets deleted
};

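// Run AddRemoveBody on nThreads native threads that share a single graph.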
void test_parallel(int nThreads) {
    tbb::flow::graph g;
    Harness::SpinBarrier barrier(nThreads);
    AddRemoveBody body(nThreads, barrier, g);
    NativeParallelFor(nThreads, body);
}

/*
 * Functors for graph arena spawn tests
 */

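// The graph's tasks are expected to execute in the arena where the graph was
// created (the master's single-threaded arena, concurrency 1), not in fg_arena
// (concurrency 2), even though the bodies are submitted from inside
// fg_arena.execute().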
inline void check_arena(tbb::task_arena* a) {
    ASSERT(a->max_concurrency() == 2, NULL);
    ASSERT(tbb::this_task_arena::max_concurrency() == 1, NULL);
}

struct run_functor {
    tbb::task_arena* my_a;
    int return_value;
    run_functor(tbb::task_arena* a) : my_a(a), return_value(1) {}
    int operator()() {
        check_arena(my_a);
        return return_value;
    }
};

template < typename T >
struct function_body {
    tbb::task_arena* my_a;
    function_body(tbb::task_arena* a) : my_a(a) {}
    tbb::flow::continue_msg operator()(const T& /*arg*/) {
        check_arena(my_a);
        return tbb::flow::continue_msg();
    }
};

typedef tbb::flow::multifunction_node< int, tbb::flow::tuple< int > > mf_node;

struct multifunction_body {
    tbb::task_arena* my_a;
    multifunction_body(tbb::task_arena* a) : my_a(a) {}
    void operator()(const int& /*arg*/, mf_node::output_ports_type& /*outports*/) {
        check_arena(my_a);
    }
};

struct source_body {
    tbb::task_arena* my_a;
    int counter;
    source_body(tbb::task_arena* a) : my_a(a), counter(0) {}
    bool operator()(const int& /*i*/) {
        check_arena(my_a);
        if (counter < 1) {
            ++counter;
            return true;
        }
        return false;
    }
};

struct run_test_functor : tbb::internal::no_assign {
    tbb::task_arena* fg_arena;
    tbb::flow::graph& my_graph;

    run_test_functor(tbb::task_arena* a, tbb::flow::graph& g) : fg_arena(a), my_graph(g) {}
    void operator()() const {
        harness_mapped_receiver<int> functor_r(my_graph);
        functor_r.initialize_map(F, 1);

        my_graph.run(run_functor(fg_arena));
        my_graph.run(functor_r, run_functor(fg_arena));

        my_graph.wait_for_all();
    }
};

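// Exercise one node of each basic type and verify, via check_arena(), that
// every body executes in the graph's arena rather than in fg_arena.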
struct nodes_test_functor : tbb::internal::no_assign {
    tbb::task_arena* fg_arena;
    tbb::flow::graph& my_graph;

    nodes_test_functor(tbb::task_arena* a, tbb::flow::graph& g) : fg_arena(a), my_graph(g) {}
    void operator()() const {

        // Define test nodes
        // Continue, function, source nodes
        tbb::flow::continue_node< tbb::flow::continue_msg > c_n(my_graph, function_body<tbb::flow::continue_msg>(fg_arena));
        tbb::flow::function_node< int > f_n(my_graph, tbb::flow::unlimited, function_body<int>(fg_arena));
        tbb::flow::source_node< int > s_n(my_graph, source_body(fg_arena), false);

        // Multifunction node
        mf_node m_n(my_graph, tbb::flow::unlimited, multifunction_body(fg_arena));

        // Join node
        tbb::flow::function_node< tbb::flow::tuple< int, int > > join_f_n(my_graph, tbb::flow::unlimited, function_body< tbb::flow::tuple< int, int > >(fg_arena));
        tbb::flow::join_node< tbb::flow::tuple< int, int > > j_n(my_graph);
        make_edge(j_n, join_f_n);

        // Split node
        tbb::flow::function_node< int > split_f_n1 = f_n;
        tbb::flow::function_node< int > split_f_n2 = f_n;
        tbb::flow::split_node< tbb::flow::tuple< int, int > > sp_n(my_graph);
        make_edge(tbb::flow::output_port<0>(sp_n), split_f_n1);
        make_edge(tbb::flow::output_port<1>(sp_n), split_f_n2);

        // Overwrite node
        tbb::flow::function_node< int > ow_f_n = f_n;
        tbb::flow::overwrite_node< int > ow_n(my_graph);
        make_edge(ow_n, ow_f_n);

        // Write once node
        tbb::flow::function_node< int > w_f_n = f_n;
        tbb::flow::write_once_node< int > w_n(my_graph);
        make_edge(w_n, w_f_n);

        // Buffer node
        tbb::flow::function_node< int > buf_f_n = f_n;
        tbb::flow::buffer_node< int > buf_n(my_graph);
        make_edge(buf_n, buf_f_n);

        // Limiter node
        tbb::flow::function_node< int > l_f_n = f_n;
        tbb::flow::limiter_node< int > l_n(my_graph, 1);
        make_edge(l_n, l_f_n);

        // Execute nodes
        c_n.try_put( tbb::flow::continue_msg() );
        f_n.try_put(1);
        m_n.try_put(1);
        s_n.activate();

        tbb::flow::input_port<0>(j_n).try_put(1);
        tbb::flow::input_port<1>(j_n).try_put(1);

        tbb::flow::tuple< int, int > sp_tuple(1, 1);
        sp_n.try_put(sp_tuple);

        ow_n.try_put(1);
        w_n.try_put(1);
        buf_n.try_put(1);
        l_n.try_put(1);

        my_graph.wait_for_all();
    }
};

void test_graph_arena() {
    // There is only one thread available for execution (the master thread).
    // If the graph's tasks were spawned in a different arena, the master
    // thread would not be able to find them in its own arena and the test
    // would hang.
    tbb::task_scheduler_init init(1);

    tbb::flow::graph g;
    tbb::task_arena fg_arena;
    fg_arena.initialize(2);
    fg_arena.execute(run_test_functor(&fg_arena, g));
    fg_arena.execute(nodes_test_functor(&fg_arena, g));
}

int TestMain() {
    if( MinThread<1 ) {
        REPORT("number of threads must be positive\n" );
        exit(1);
    }
    for( int p=MinThread; p<=MaxThread; ++p ) {
        tbb::task_scheduler_init init(p);
        test_wait_count();
        test_run();
        test_iterator();
        test_parallel(p);
    }
    test_graph_arena();
    return Harness::Done;
}