| 1 | /* | 
| 2 |     Copyright (c) 2005-2019 Intel Corporation | 
| 3 |  | 
| 4 |     Licensed under the Apache License, Version 2.0 (the "License"); | 
| 5 |     you may not use this file except in compliance with the License. | 
| 6 |     You may obtain a copy of the License at | 
| 7 |  | 
| 8 |         http://www.apache.org/licenses/LICENSE-2.0 | 
| 9 |  | 
| 10 |     Unless required by applicable law or agreed to in writing, software | 
| 11 |     distributed under the License is distributed on an "AS IS" BASIS, | 
| 12 |     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
| 13 |     See the License for the specific language governing permissions and | 
| 14 |     limitations under the License. | 
| 15 | */ | 
| 16 |  | 
| 17 | #include "harness.h" | 
| 18 | #if __TBB_CPF_BUILD | 
| 19 | #define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1 | 
| 20 | #include "harness_graph.h" | 
| 21 | #endif | 
| 22 | #include "tbb/flow_graph.h" | 
| 23 | #include "tbb/atomic.h" | 
| 24 | #include "tbb/task_scheduler_init.h" | 
| 25 |  | 
// Number of puts each sender attempts in the tests without decrements; also the
// exclusive upper bound of the limiter thresholds that the tests sweep over.
const int L = 10;
// Number of messages each sender must get accepted in the tests with decrements.
const int N = 1000;
| 28 |  | 
| 29 | using tbb::flow::internal::SUCCESSFULLY_ENQUEUED; | 
| 30 |  | 
| 31 | template< typename T > | 
| 32 | struct serial_receiver : public tbb::flow::receiver<T>, NoAssign { | 
| 33 |    T next_value; | 
| 34 |    tbb::flow::graph& my_graph; | 
| 35 |  | 
| 36 |    serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {} | 
| 37 |  | 
| 38 |    tbb::task *try_put_task( const T &v ) __TBB_override { | 
| 39 |        ASSERT( next_value++  == v, NULL ); | 
| 40 |        return const_cast<tbb::task *>(SUCCESSFULLY_ENQUEUED); | 
| 41 |    } | 
| 42 |  | 
| 43 |     tbb::flow::graph& graph_reference() __TBB_override { | 
| 44 |         return my_graph; | 
| 45 |     } | 
| 46 |  | 
| 47 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION | 
| 48 |     typedef typename tbb::flow::receiver<T>::built_predecessors_type built_predecessors_type; | 
| 49 |     typedef typename tbb::flow::receiver<T>::predecessor_list_type predecessor_list_type; | 
| 50 |     typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type; | 
| 51 |     built_predecessors_type bpt; | 
| 52 |     built_predecessors_type &built_predecessors() __TBB_override { return bpt; } | 
| 53 |     void internal_add_built_predecessor( predecessor_type & ) __TBB_override { } | 
| 54 |     void internal_delete_built_predecessor( predecessor_type & ) __TBB_override { } | 
| 55 |     void copy_predecessors( predecessor_list_type & ) __TBB_override { } | 
| 56 |     size_t predecessor_count() __TBB_override { return 0; } | 
| 57 | #endif | 
| 58 |  | 
| 59 |    void reset_receiver(tbb::flow::reset_flags /*f*/) __TBB_override {next_value = T(0);} | 
| 60 | }; | 
| 61 |  | 
| 62 | template< typename T > | 
| 63 | struct parallel_receiver : public tbb::flow::receiver<T>, NoAssign { | 
| 64 |  | 
| 65 |     tbb::atomic<int> my_count; | 
| 66 |     tbb::flow::graph& my_graph; | 
| 67 |  | 
| 68 |     parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; } | 
| 69 |  | 
| 70 |     tbb::task *try_put_task( const T &/*v*/ ) __TBB_override { | 
| 71 |        ++my_count; | 
| 72 |        return const_cast<tbb::task *>(tbb::flow::internal::SUCCESSFULLY_ENQUEUED); | 
| 73 |     } | 
| 74 |  | 
| 75 |     tbb::flow::graph& graph_reference() __TBB_override { | 
| 76 |         return my_graph; | 
| 77 |     } | 
| 78 |  | 
| 79 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION | 
| 80 |     typedef typename tbb::flow::receiver<T>::built_predecessors_type built_predecessors_type; | 
| 81 |     typedef typename tbb::flow::receiver<T>::predecessor_list_type predecessor_list_type; | 
| 82 |     typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type; | 
| 83 |     built_predecessors_type bpt; | 
| 84 |     built_predecessors_type &built_predecessors() __TBB_override { return bpt; } | 
| 85 |     void internal_add_built_predecessor( predecessor_type & ) __TBB_override { } | 
| 86 |     void internal_delete_built_predecessor( predecessor_type & ) __TBB_override { } | 
| 87 |     void copy_predecessors( predecessor_list_type & ) __TBB_override { } | 
| 88 |     size_t predecessor_count( ) __TBB_override { return 0; } | 
| 89 | #endif | 
| 90 |     void reset_receiver(tbb::flow::reset_flags /*f*/) __TBB_override {my_count = 0;} | 
| 91 | }; | 
| 92 |  | 
| 93 | template< typename T > | 
| 94 | struct empty_sender : public tbb::flow::sender<T> { | 
| 95 |         typedef typename tbb::flow::sender<T>::successor_type successor_type; | 
| 96 |  | 
| 97 |         bool register_successor( successor_type & ) __TBB_override { return false; } | 
| 98 |         bool remove_successor( successor_type & ) __TBB_override { return false; } | 
| 99 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION | 
| 100 |         typedef typename tbb::flow::sender<T>::built_successors_type built_successors_type; | 
| 101 |         typedef typename tbb::flow::sender<T>::successor_list_type successor_list_type; | 
| 102 |         built_successors_type bst; | 
| 103 |         built_successors_type &built_successors() __TBB_override { return bst; } | 
| 104 |         void    internal_add_built_successor( successor_type & ) __TBB_override { } | 
| 105 |         void internal_delete_built_successor( successor_type & ) __TBB_override { } | 
| 106 |         void copy_successors( successor_list_type & ) __TBB_override { } | 
| 107 |         size_t successor_count() __TBB_override { return 0; } | 
| 108 | #endif | 
| 109 | }; | 
| 110 |  | 
| 111 |  | 
| 112 | template< typename T > | 
| 113 | struct put_body : NoAssign { | 
| 114 |  | 
| 115 |     tbb::flow::limiter_node<T> &my_lim; | 
| 116 |     tbb::atomic<int> &my_accept_count; | 
| 117 |  | 
| 118 |     put_body( tbb::flow::limiter_node<T> &lim, tbb::atomic<int> &accept_count ) : | 
| 119 |         my_lim(lim), my_accept_count(accept_count) {} | 
| 120 |  | 
| 121 |     void operator()( int ) const { | 
| 122 |         for ( int i = 0; i < L; ++i ) { | 
| 123 |             bool msg = my_lim.try_put( T(i) ); | 
| 124 |             if ( msg == true ) | 
| 125 |                ++my_accept_count; | 
| 126 |         } | 
| 127 |     } | 
| 128 | }; | 
| 129 |  | 
| 130 | template< typename T > | 
| 131 | struct put_dec_body : NoAssign { | 
| 132 |  | 
| 133 |     tbb::flow::limiter_node<T> &my_lim; | 
| 134 |     tbb::atomic<int> &my_accept_count; | 
| 135 |  | 
| 136 |     put_dec_body( tbb::flow::limiter_node<T> &lim, tbb::atomic<int> &accept_count ) : | 
| 137 |         my_lim(lim), my_accept_count(accept_count) {} | 
| 138 |  | 
| 139 |     void operator()( int ) const { | 
| 140 |         int local_accept_count = 0; | 
| 141 |         while ( local_accept_count < N ) { | 
| 142 |             bool msg = my_lim.try_put( T(local_accept_count) ); | 
| 143 |             if ( msg == true ) { | 
| 144 |                 ++local_accept_count; | 
| 145 |                 ++my_accept_count; | 
| 146 |                 my_lim.decrement.try_put( tbb::flow::continue_msg() ); | 
| 147 |             } | 
| 148 |         } | 
| 149 |     } | 
| 150 |  | 
| 151 | }; | 
| 152 |  | 
| 153 | template< typename T > | 
| 154 | void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) { | 
| 155 |     parallel_receiver<T> r(g); | 
| 156 |     empty_sender< tbb::flow::continue_msg > s; | 
| 157 |     tbb::atomic<int> accept_count; | 
| 158 |     accept_count = 0; | 
| 159 |     tbb::flow::make_edge( lim, r ); | 
| 160 |     tbb::flow::make_edge(s, lim.decrement); | 
| 161 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION | 
| 162 |     ASSERT(lim.decrement.predecessor_count() == 1, NULL); | 
| 163 |     ASSERT(lim.successor_count() == 1, NULL); | 
| 164 |     ASSERT(lim.predecessor_count() == 0, NULL); | 
| 165 |     typename tbb::flow::interface10::internal::decrementer | 
| 166 |         <tbb::flow::limiter_node<T>, tbb::flow::continue_msg>::predecessor_list_type dec_preds; | 
| 167 |     lim.decrement.copy_predecessors(dec_preds); | 
| 168 |     ASSERT(dec_preds.size() == 1, NULL); | 
| 169 | #endif | 
| 170 |     // test puts with decrements | 
| 171 |     NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) ); | 
| 172 |     int c = accept_count; | 
| 173 |     ASSERT( c == N*num_threads, NULL ); | 
| 174 |     ASSERT( r.my_count == N*num_threads, NULL ); | 
| 175 | } | 
| 176 |  | 
| 177 | // | 
| 178 | // Tests | 
| 179 | // | 
| 180 | // limiter only forwards below the limit, multiple parallel senders / single receiver | 
| 181 | // multiple parallel senders that put to decrement at each accept, limiter accepts new messages | 
| 182 | // | 
| 183 | // | 
| 184 | template< typename T > | 
| 185 | int test_parallel(int num_threads) { | 
| 186 |  | 
| 187 |    // test puts with no decrements | 
| 188 |    for ( int i = 0; i < L; ++i ) { | 
| 189 |        tbb::flow::graph g; | 
| 190 |        tbb::flow::limiter_node< T > lim(g, i); | 
| 191 |        parallel_receiver<T> r(g); | 
| 192 |        tbb::atomic<int> accept_count; | 
| 193 |        accept_count = 0; | 
| 194 |        tbb::flow::make_edge( lim, r ); | 
| 195 |        // test puts with no decrements | 
| 196 |        NativeParallelFor( num_threads, put_body<T>(lim, accept_count) ); | 
| 197 |        g.wait_for_all(); | 
| 198 |        int c = accept_count; | 
| 199 |        ASSERT( c == i, NULL ); | 
| 200 |    } | 
| 201 |  | 
| 202 |    // test puts with decrements | 
| 203 |    for ( int i = 1; i < L; ++i ) { | 
| 204 |        tbb::flow::graph g; | 
| 205 |        tbb::flow::limiter_node< T > lim(g, i); | 
| 206 |        test_puts_with_decrements(num_threads, lim, g); | 
| 207 |        tbb::flow::limiter_node< T > lim_copy( lim ); | 
| 208 |        test_puts_with_decrements(num_threads, lim_copy, g); | 
| 209 |    } | 
| 210 |  | 
| 211 |    return 0; | 
| 212 | } | 
| 213 |  | 
| 214 | // | 
| 215 | // Tests | 
| 216 | // | 
| 217 | // limiter only forwards below the limit, single sender / single receiver | 
| 218 | // at reject, a put to decrement, will cause next message to be accepted | 
| 219 | // | 
| 220 | template< typename T > | 
| 221 | int test_serial() { | 
| 222 |  | 
| 223 |    // test puts with no decrements | 
| 224 |    for ( int i = 0; i < L; ++i ) { | 
| 225 |        tbb::flow::graph g; | 
| 226 |        tbb::flow::limiter_node< T > lim(g, i); | 
| 227 |        serial_receiver<T> r(g); | 
| 228 |        tbb::flow::make_edge( lim, r ); | 
| 229 |        for ( int j = 0; j < L; ++j ) { | 
| 230 |            bool msg = lim.try_put( T(j) ); | 
| 231 |            ASSERT( ( j < i && msg == true ) || ( j >= i && msg == false ), NULL ); | 
| 232 |        } | 
| 233 |        g.wait_for_all(); | 
| 234 |    } | 
| 235 |  | 
| 236 |    // test puts with decrements | 
| 237 |    for ( int i = 1; i < L; ++i ) { | 
| 238 |        tbb::flow::graph g; | 
| 239 |        tbb::flow::limiter_node< T > lim(g, i); | 
| 240 |        serial_receiver<T> r(g); | 
| 241 |        empty_sender< tbb::flow::continue_msg > s; | 
| 242 |        tbb::flow::make_edge( lim, r ); | 
| 243 |        tbb::flow::make_edge(s, lim.decrement); | 
| 244 |        for ( int j = 0; j < N; ++j ) { | 
| 245 |            bool msg = lim.try_put( T(j) ); | 
| 246 |            ASSERT( ( j < i && msg == true ) || ( j >= i && msg == false ), NULL ); | 
| 247 |            if ( msg == false ) { | 
| 248 |                lim.decrement.try_put( tbb::flow::continue_msg() ); | 
| 249 |                msg = lim.try_put( T(j) ); | 
| 250 |                ASSERT( msg == true, NULL ); | 
| 251 |            } | 
| 252 |        } | 
| 253 |    } | 
| 254 |    return 0; | 
| 255 | } | 
| 256 |  | 
| 257 | // reported bug in limiter (http://software.intel.com/en-us/comment/1752355) | 
| 258 | #define DECREMENT_OUTPUT 1  // the port number of the decrement output of the multifunction_node | 
| 259 | #define LIMITER_OUTPUT 0    // port number of the integer output | 
| 260 |  | 
| 261 | typedef tbb::flow::multifunction_node<int, tbb::flow::tuple<int,tbb::flow::continue_msg> > mfnode_type; | 
| 262 |  | 
| 263 | tbb::atomic<size_t> emit_count; | 
| 264 | tbb::atomic<size_t> emit_sum; | 
| 265 | tbb::atomic<size_t> receive_count; | 
| 266 | tbb::atomic<size_t> receive_sum; | 
| 267 |  | 
| 268 | struct mfnode_body { | 
| 269 |     int max_cnt; | 
| 270 |     tbb::atomic<int>* my_cnt; | 
| 271 |     mfnode_body(const int& _max, tbb::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my)  { } | 
| 272 |     void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) { | 
| 273 |         int lcnt = ++(*my_cnt); | 
| 274 |         if(lcnt > max_cnt) { | 
| 275 |             return; | 
| 276 |         } | 
| 277 |         // put one continue_msg to the decrement of the limiter. | 
| 278 |         if(!tbb::flow::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) { | 
| 279 |             ASSERT(false,"Unexpected rejection of decrement" ); | 
| 280 |         } | 
| 281 |         { | 
| 282 |             // put messages to the input of the limiter_node until it rejects. | 
| 283 |             while( tbb::flow::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) { | 
| 284 |                 emit_sum += lcnt; | 
| 285 |                 ++emit_count; | 
| 286 |             } | 
| 287 |         } | 
| 288 |     } | 
| 289 | }; | 
| 290 |  | 
| 291 | struct fn_body { | 
| 292 |     int operator()(const int &in) { | 
| 293 |         receive_sum += in; | 
| 294 |         ++receive_count; | 
| 295 |         return in; | 
| 296 |     } | 
| 297 | }; | 
| 298 |  | 
| 299 | //                   +------------+ | 
| 300 | //    +---------+    |            v | 
| 301 | //    | mf_node |0---+       +----------+          +----------+ | 
| 302 | // +->|         |1---------->| lim_node |--------->| fn_node  |--+ | 
| 303 | // |  +---------+            +----------+          +----------+  | | 
| 304 | // |                                                             | | 
| 305 | // |                                                             | | 
| 306 | // +-------------------------------------------------------------+ | 
| 307 | // | 
| 308 | void | 
| 309 | test_multifunction_to_limiter(int _max, int _nparallel) { | 
| 310 |     tbb::flow::graph g; | 
| 311 |     emit_count = 0; | 
| 312 |     emit_sum = 0; | 
| 313 |     receive_count = 0; | 
| 314 |     receive_sum = 0; | 
| 315 |     tbb::atomic<int> local_cnt; | 
| 316 |     local_cnt = 0; | 
| 317 |     mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt)); | 
| 318 |     tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body()); | 
| 319 |     tbb::flow::limiter_node<int> lim_node(g, _nparallel); | 
| 320 |     tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node); | 
| 321 |     tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrement); | 
| 322 |     tbb::flow::make_edge(lim_node, fn_node); | 
| 323 |     tbb::flow::make_edge(fn_node, mf_node); | 
| 324 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION | 
| 325 |     REMARK("pred cnt == %d\n" ,(int)(lim_node.predecessor_count())); | 
| 326 |     REMARK("succ cnt == %d\n" ,(int)(lim_node.successor_count())); | 
| 327 |     tbb::flow::limiter_node<int>::successor_list_type my_succs; | 
| 328 |     lim_node.copy_successors(my_succs); | 
| 329 |     REMARK("succ cnt from vector  == %d\n" ,(int)(my_succs.size())); | 
| 330 |     tbb::flow::limiter_node<int>::predecessor_list_type my_preds; | 
| 331 |     lim_node.copy_predecessors(my_preds); | 
| 332 |     REMARK("pred cnt from vector  == %d\n" ,(int)(my_preds.size())); | 
| 333 | #endif | 
| 334 |     mf_node.try_put(1); | 
| 335 |     g.wait_for_all(); | 
| 336 |     ASSERT(emit_count == receive_count, "counts do not match" ); | 
| 337 |     ASSERT(emit_sum == receive_sum, "sums do not match" ); | 
| 338 |  | 
| 339 |     // reset, test again | 
| 340 |     g.reset(); | 
| 341 |     emit_count = 0; | 
| 342 |     emit_sum = 0; | 
| 343 |     receive_count = 0; | 
| 344 |     receive_sum = 0; | 
| 345 |     local_cnt = 0;; | 
| 346 |     mf_node.try_put(1); | 
| 347 |     g.wait_for_all(); | 
| 348 |     ASSERT(emit_count == receive_count, "counts do not match" ); | 
| 349 |     ASSERT(emit_sum == receive_sum, "sums do not match" ); | 
| 350 | } | 
| 351 |  | 
| 352 |  | 
| 353 | void | 
| 354 | test_continue_msg_reception() { | 
| 355 |     tbb::flow::graph g; | 
| 356 |     tbb::flow::limiter_node<int> ln(g,2); | 
| 357 |     tbb::flow::queue_node<int>   qn(g); | 
| 358 |     tbb::flow::make_edge(ln, qn); | 
| 359 |     ln.decrement.try_put(tbb::flow::continue_msg()); | 
| 360 |     ln.try_put(42); | 
| 361 |     g.wait_for_all(); | 
| 362 |     int outint; | 
| 363 |     ASSERT(qn.try_get(outint) && outint == 42, "initial put to decrement stops node" ); | 
| 364 | } | 
| 365 |  | 
| 366 | #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR | 
| 367 | using namespace tbb::flow; | 
| 368 | void run_and_check_result(graph& g, limiter_node<int>& limit, queue_node<int>& queue, broadcast_node<continue_msg>& broad) { | 
| 369 |     ASSERT( limit.try_put(1), NULL ); | 
| 370 |     ASSERT( limit.try_put(2), NULL ); | 
| 371 |     ASSERT( !limit.try_put(3), NULL ); | 
| 372 |     ASSERT( broad.try_put(continue_msg()), NULL ); | 
| 373 |     ASSERT( limit.decrement.try_put(continue_msg()), NULL ); | 
| 374 |     ASSERT( limit.try_put(4), NULL ); | 
| 375 |     ASSERT( !limit.try_put(5), NULL ); | 
| 376 |     g.wait_for_all(); | 
| 377 |  | 
| 378 |     int list[] = {1, 2, 4}; | 
| 379 |     int var = 0; | 
| 380 |     for (size_t i = 0; i < sizeof(list)/sizeof(list[0]); i++) { | 
| 381 |         queue.try_get(var); | 
| 382 |         ASSERT(var==list[i], "some data dropped, input does not match output" ); | 
| 383 |     } | 
| 384 | } | 
| 385 |  | 
| 386 | void test_num_decrement_predecessors() { | 
| 387 |     graph g; | 
| 388 |     queue_node<int> output_queue(g); | 
| 389 |     limiter_node<int> limit1(g, 2, /*number_of_predecessors*/1); | 
| 390 |     limiter_node<int, continue_msg> limit2(g, 2, /*number_of_predecessors*/1); | 
| 391 |     broadcast_node<continue_msg> broadcast(g); | 
| 392 |  | 
| 393 |     make_edge(limit1, output_queue); | 
| 394 |     make_edge(limit2, output_queue); | 
| 395 |  | 
| 396 |     make_edge(broadcast, limit1.decrement); | 
| 397 |     make_edge(broadcast, limit2.decrement); | 
| 398 |  | 
| 399 |     run_and_check_result(g, limit1, output_queue, broadcast); | 
| 400 |     run_and_check_result(g, limit2, output_queue, broadcast); | 
| 401 | } | 
| 402 | #else // TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR | 
| 403 | // | 
| 404 | // This test ascertains that if a message is not successfully put | 
| 405 | // to a successor, the message is not dropped but released. | 
| 406 | // | 
| 407 |  | 
| 408 | void test_reserve_release_messages() { | 
| 409 |     using namespace tbb::flow; | 
| 410 |     graph g; | 
| 411 |  | 
| 412 |     //making two queue_nodes: one broadcast_node and one limiter_node | 
| 413 |     queue_node<int> input_queue(g); | 
| 414 |     queue_node<int> output_queue(g); | 
| 415 |     broadcast_node<int> broad(g); | 
| 416 |     limiter_node<int, int> limit(g,2); //threshold of 2 | 
| 417 |  | 
| 418 |     //edges | 
| 419 |     make_edge(input_queue, limit); | 
| 420 |     make_edge(limit, output_queue); | 
| 421 |     make_edge(broad,limit.decrement); | 
| 422 |  | 
| 423 |     int list[4] = {19, 33, 72, 98}; //list to be put to the input queue | 
| 424 |  | 
| 425 |     input_queue.try_put(list[0]); // succeeds | 
| 426 |     input_queue.try_put(list[1]); // succeeds | 
| 427 |     input_queue.try_put(list[2]); // fails, stored in upstream buffer | 
| 428 |     g.wait_for_all(); | 
| 429 |  | 
| 430 |     remove_edge(limit, output_queue); //remove successor | 
| 431 |  | 
| 432 |     //sending message to the decrement port of the limiter | 
| 433 |     broad.try_put(1); //failed message retrieved. | 
| 434 |     g.wait_for_all(); | 
| 435 |  | 
| 436 |     make_edge(limit, output_queue); //putting the successor back | 
| 437 |  | 
| 438 |     broad.try_put(1);  //drop the count | 
| 439 |  | 
| 440 |     input_queue.try_put(list[3]);  //success | 
| 441 |     g.wait_for_all(); | 
| 442 |  | 
| 443 |     int var=0; | 
| 444 |  | 
| 445 |     for (int i=0; i<4; i++) { | 
| 446 |         output_queue.try_get(var); | 
| 447 |         ASSERT(var==list[i], "some data dropped, input does not match output" ); | 
| 448 |         g.wait_for_all(); | 
| 449 |     } | 
| 450 | } | 
| 451 |  | 
| 452 | void test_decrementer() { | 
| 453 |     const int threshold = 5; | 
| 454 |     tbb::flow::graph g; | 
| 455 |     tbb::flow::limiter_node<int, int> limit(g, threshold); | 
| 456 |     tbb::flow::queue_node<int> queue(g); | 
| 457 |     make_edge(limit, queue); | 
| 458 |     int m = 0; | 
| 459 |     ASSERT( limit.try_put( m++ ), "Newly constructed limiter node does not accept message."  ); | 
| 460 |     ASSERT( limit.decrement.try_put( -threshold ), // close limiter's gate | 
| 461 |             "Limiter node decrementer's port does not accept message."  ); | 
| 462 |     ASSERT( !limit.try_put( m++ ), "Closed limiter node's accepts message."  ); | 
| 463 |     ASSERT( limit.decrement.try_put( threshold + 5 ),  // open limiter's gate | 
| 464 |             "Limiter node decrementer's port does not accept message."  ); | 
| 465 |     for( int i = 0; i < threshold; ++i ) | 
| 466 |         ASSERT( limit.try_put( m++ ), "Limiter node does not accept message while open."  ); | 
| 467 |     ASSERT( !limit.try_put( m ), "Limiter node's gate is not closed."  ); | 
| 468 |     g.wait_for_all(); | 
| 469 |     int expected[] = {0, 2, 3, 4, 5, 6}; | 
| 470 |     int actual = -1; m = 0; | 
| 471 |     while( queue.try_get(actual) ) | 
| 472 |         ASSERT( actual == expected[m++], NULL ); | 
| 473 |     ASSERT( sizeof(expected) / sizeof(expected[0]) == m, "Not all messages have been processed."  ); | 
| 474 |     g.wait_for_all(); | 
| 475 |  | 
| 476 |     const size_t threshold2 = size_t(-1); | 
| 477 |     tbb::flow::limiter_node<int, long long> limit2(g, threshold2); | 
| 478 |     make_edge(limit2, queue); | 
| 479 |     ASSERT( limit2.try_put( 1 ), "Newly constructed limiter node does not accept message."  ); | 
| 480 |     long long decrement_value = (long long)( size_t(-1)/2 ); | 
| 481 |     ASSERT( limit2.decrement.try_put( -decrement_value ), | 
| 482 |             "Limiter node decrementer's port does not accept message"  ); | 
| 483 |     ASSERT( limit2.try_put( 2 ), "Limiter's gate should not be closed yet."  ); | 
| 484 |     ASSERT( limit2.decrement.try_put( -decrement_value ), | 
| 485 |             "Limiter node decrementer's port does not accept message"  ); | 
| 486 |     ASSERT( !limit2.try_put( 3 ), "Overflow happened for internal counter."  ); | 
| 487 |     int expected2[] = {1, 2}; | 
| 488 |     actual = -1; m = 0; | 
| 489 |     while( queue.try_get(actual) ) | 
| 490 |         ASSERT( actual == expected2[m++], NULL ); | 
| 491 |     ASSERT( sizeof(expected2) / sizeof(expected2[0]) == m, "Not all messages have been processed."  ); | 
| 492 |     g.wait_for_all(); | 
| 493 | } | 
| 494 | #endif // TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR | 
| 495 |  | 
| 496 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION | 
| 497 | void test_extract() { | 
| 498 |     tbb::flow::graph g; | 
| 499 |     int j; | 
| 500 |     tbb::flow::limiter_node<int> node0(g, /*threshold*/1); | 
| 501 |     tbb::flow::queue_node<int> q0(g); | 
| 502 |     tbb::flow::queue_node<int> q1(g); | 
| 503 |     tbb::flow::queue_node<int> q2(g); | 
| 504 |     tbb::flow::broadcast_node<tbb::flow::continue_msg> b0(g); | 
| 505 |     tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g); | 
| 506 |  | 
| 507 |     for( int i = 0; i < 2; ++i ) { | 
| 508 |         REMARK("At pass %d\n" , i); | 
| 509 |         ASSERT(node0.predecessor_count() == 0, "incorrect predecessor count at start" ); | 
| 510 |         ASSERT(node0.successor_count() == 0, "incorrect successor count at start" ); | 
| 511 |         ASSERT(node0.decrement.predecessor_count() == 0, "incorrect decrement pred count at start" ); | 
| 512 |  | 
| 513 |         tbb::flow::make_edge(q0, node0); | 
| 514 |         tbb::flow::make_edge(q1, node0); | 
| 515 |         tbb::flow::make_edge(node0, q2); | 
| 516 |         tbb::flow::make_edge(b0, node0.decrement); | 
| 517 |         tbb::flow::make_edge(b1, node0.decrement); | 
| 518 |         g.wait_for_all(); | 
| 519 |  | 
| 520 |         /*    b0   b1              */ | 
| 521 |         /*      \  |               */ | 
| 522 |         /*  q0\  \ |               */ | 
| 523 |         /*     \  \|               */ | 
| 524 |         /*      +-node0---q2       */ | 
| 525 |         /*     /                   */ | 
| 526 |         /*  q1/                    */ | 
| 527 |  | 
| 528 |         q0.try_put(i); | 
| 529 |         g.wait_for_all(); | 
| 530 |         ASSERT(node0.predecessor_count() == 2, "incorrect predecessor count after construction" ); | 
| 531 |         ASSERT(node0.successor_count() == 1, "incorrect successor count after construction" ); | 
| 532 |         ASSERT(node0.decrement.predecessor_count() == 2, "incorrect decrement pred count after construction" ); | 
| 533 |         ASSERT(q2.try_get(j), "fetch of value forwarded to output queue failed" ); | 
| 534 |         ASSERT(j == i, "improper value forwarded to output queue" ); | 
| 535 |         q0.try_put(2*i); | 
| 536 |         g.wait_for_all(); | 
| 537 |         ASSERT(!q2.try_get(j), "limiter_node forwarded item improperly" ); | 
| 538 |         b0.try_put(tbb::flow::continue_msg()); | 
| 539 |         g.wait_for_all(); | 
| 540 |         ASSERT(!q2.try_get(j), "limiter_node forwarded item improperly" ); | 
| 541 |         b0.try_put(tbb::flow::continue_msg()); | 
| 542 |         g.wait_for_all(); | 
| 543 |         ASSERT(q2.try_get(j) && j == 2*i, "limiter_node failed to forward item" ); | 
| 544 |  | 
| 545 |         tbb::flow::limiter_node<int>::successor_list_type sv; | 
| 546 |         tbb::flow::limiter_node<int>::predecessor_list_type pv; | 
| 547 |         tbb::flow::continue_receiver::predecessor_list_type dv; | 
| 548 |         tbb::flow::limiter_node<int>::successor_list_type sv1; | 
| 549 |         tbb::flow::limiter_node<int>::predecessor_list_type pv1; | 
| 550 |         tbb::flow::continue_receiver::predecessor_list_type dv1; | 
| 551 |  | 
| 552 |         node0.copy_predecessors(pv); | 
| 553 |         node0.copy_successors(sv); | 
| 554 |         node0.decrement.copy_predecessors(dv); | 
| 555 |         pv1.push_back(&(q0)); | 
| 556 |         pv1.push_back(&(q1)); | 
| 557 |         sv1.push_back(&(q2)); | 
| 558 |         dv1.push_back(&(b0)); | 
| 559 |         dv1.push_back(&(b1)); | 
| 560 |  | 
| 561 |         ASSERT(pv.size() == 2, "improper size for predecessors" ); | 
| 562 |         ASSERT(sv.size() == 1, "improper size for successors" ); | 
| 563 |         ASSERT(lists_match(pv,pv1), "predecessor lists do not match" ); | 
| 564 |         ASSERT(lists_match(sv,sv1), "successor lists do not match" ); | 
| 565 |         ASSERT(lists_match(dv,dv1), "successor lists do not match" ); | 
| 566 |  | 
| 567 |         if(i == 0) { | 
| 568 |             node0.extract(); | 
| 569 |             ASSERT(node0.predecessor_count() == 0, "incorrect predecessor count after extraction" ); | 
| 570 |             ASSERT(node0.successor_count() == 0, "incorrect successor count after extraction" ); | 
| 571 |             ASSERT(node0.decrement.predecessor_count() == 0, "incorrect decrement pred count after extraction" ); | 
| 572 |         } | 
| 573 |         else { | 
| 574 |             q0.extract(); | 
| 575 |             b0.extract(); | 
| 576 |             q2.extract(); | 
| 577 |  | 
| 578 |             ASSERT(node0.predecessor_count() == 1, "incorrect predecessor count after extract second iter" ); | 
| 579 |             ASSERT(node0.successor_count() == 0, "incorrect successor count after extract second iter" ); | 
| 580 |             ASSERT(node0.decrement.predecessor_count() == 1, "incorrect decrement pred count after extract second iter" ); | 
| 581 |  | 
| 582 |             node0.copy_predecessors(pv); | 
| 583 |             node0.copy_successors(sv); | 
| 584 |             node0.decrement.copy_predecessors(dv); | 
| 585 |             pv1.clear(); | 
| 586 |             sv1.clear(); | 
| 587 |             dv1.clear(); | 
| 588 |             pv1.push_back(&(q1)); | 
| 589 |             dv1.push_back(&(b1)); | 
| 590 |  | 
| 591 |             ASSERT(lists_match(pv,pv1), "predecessor lists do not match second iter" ); | 
| 592 |             ASSERT(lists_match(sv,sv1), "successor lists do not match second iter" ); | 
| 593 |             ASSERT(lists_match(dv,dv1), "successor lists do not match second iter" ); | 
| 594 |  | 
| 595 |             q1.extract(); | 
| 596 |             b1.extract(); | 
| 597 |         } | 
| 598 |         ASSERT(node0.predecessor_count() == 0, "incorrect predecessor count after extract" ); | 
| 599 |         ASSERT(node0.successor_count() == 0, "incorrect successor count after extract" ); | 
| 600 |         ASSERT(node0.decrement.predecessor_count() == 0, "incorrect decrement pred count after extract" ); | 
| 601 |  | 
| 602 |     } | 
| 603 |  | 
| 604 | } | 
| 605 | #endif  // TBB_DEPRECATED_FLOW_NODE_EXTRACTION | 
| 606 |  | 
| 607 | int TestMain() { | 
| 608 |     for (int i = 1; i <= 8; ++i) { | 
| 609 |         tbb::task_scheduler_init init(i); | 
| 610 |         test_serial<int>(); | 
| 611 |         test_parallel<int>(i); | 
| 612 |     } | 
| 613 |     test_continue_msg_reception(); | 
| 614 |     test_multifunction_to_limiter(30,3); | 
| 615 |     test_multifunction_to_limiter(300,13); | 
| 616 |     test_multifunction_to_limiter(3000,1); | 
| 617 | #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR | 
| 618 |     test_num_decrement_predecessors(); | 
| 619 | #else | 
| 620 |     test_reserve_release_messages(); | 
| 621 |     test_decrementer(); | 
| 622 | #endif | 
| 623 | #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION | 
| 624 |     test_extract(); | 
| 625 | #endif | 
| 626 |    return Harness::Done; | 
| 627 | } | 
| 628 |  |