1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include "harness.h"
18#if __TBB_CPF_BUILD
19#define TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1
20#include "harness_graph.h"
21#endif
22#include "tbb/flow_graph.h"
23#include "tbb/atomic.h"
24#include "tbb/task_scheduler_init.h"
25
26const int L = 10;
27const int N = 1000;
28
29using tbb::flow::internal::SUCCESSFULLY_ENQUEUED;
30
31template< typename T >
32struct serial_receiver : public tbb::flow::receiver<T>, NoAssign {
33 T next_value;
34 tbb::flow::graph& my_graph;
35
36 serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {}
37
38 tbb::task *try_put_task( const T &v ) __TBB_override {
39 ASSERT( next_value++ == v, NULL );
40 return const_cast<tbb::task *>(SUCCESSFULLY_ENQUEUED);
41 }
42
43 tbb::flow::graph& graph_reference() __TBB_override {
44 return my_graph;
45 }
46
47#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
48 typedef typename tbb::flow::receiver<T>::built_predecessors_type built_predecessors_type;
49 typedef typename tbb::flow::receiver<T>::predecessor_list_type predecessor_list_type;
50 typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
51 built_predecessors_type bpt;
52 built_predecessors_type &built_predecessors() __TBB_override { return bpt; }
53 void internal_add_built_predecessor( predecessor_type & ) __TBB_override { }
54 void internal_delete_built_predecessor( predecessor_type & ) __TBB_override { }
55 void copy_predecessors( predecessor_list_type & ) __TBB_override { }
56 size_t predecessor_count() __TBB_override { return 0; }
57#endif
58
59 void reset_receiver(tbb::flow::reset_flags /*f*/) __TBB_override {next_value = T(0);}
60};
61
62template< typename T >
63struct parallel_receiver : public tbb::flow::receiver<T>, NoAssign {
64
65 tbb::atomic<int> my_count;
66 tbb::flow::graph& my_graph;
67
68 parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; }
69
70 tbb::task *try_put_task( const T &/*v*/ ) __TBB_override {
71 ++my_count;
72 return const_cast<tbb::task *>(tbb::flow::internal::SUCCESSFULLY_ENQUEUED);
73 }
74
75 tbb::flow::graph& graph_reference() __TBB_override {
76 return my_graph;
77 }
78
79#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
80 typedef typename tbb::flow::receiver<T>::built_predecessors_type built_predecessors_type;
81 typedef typename tbb::flow::receiver<T>::predecessor_list_type predecessor_list_type;
82 typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
83 built_predecessors_type bpt;
84 built_predecessors_type &built_predecessors() __TBB_override { return bpt; }
85 void internal_add_built_predecessor( predecessor_type & ) __TBB_override { }
86 void internal_delete_built_predecessor( predecessor_type & ) __TBB_override { }
87 void copy_predecessors( predecessor_list_type & ) __TBB_override { }
88 size_t predecessor_count( ) __TBB_override { return 0; }
89#endif
90 void reset_receiver(tbb::flow::reset_flags /*f*/) __TBB_override {my_count = 0;}
91};
92
93template< typename T >
94struct empty_sender : public tbb::flow::sender<T> {
95 typedef typename tbb::flow::sender<T>::successor_type successor_type;
96
97 bool register_successor( successor_type & ) __TBB_override { return false; }
98 bool remove_successor( successor_type & ) __TBB_override { return false; }
99#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
100 typedef typename tbb::flow::sender<T>::built_successors_type built_successors_type;
101 typedef typename tbb::flow::sender<T>::successor_list_type successor_list_type;
102 built_successors_type bst;
103 built_successors_type &built_successors() __TBB_override { return bst; }
104 void internal_add_built_successor( successor_type & ) __TBB_override { }
105 void internal_delete_built_successor( successor_type & ) __TBB_override { }
106 void copy_successors( successor_list_type & ) __TBB_override { }
107 size_t successor_count() __TBB_override { return 0; }
108#endif
109};
110
111
112template< typename T >
113struct put_body : NoAssign {
114
115 tbb::flow::limiter_node<T> &my_lim;
116 tbb::atomic<int> &my_accept_count;
117
118 put_body( tbb::flow::limiter_node<T> &lim, tbb::atomic<int> &accept_count ) :
119 my_lim(lim), my_accept_count(accept_count) {}
120
121 void operator()( int ) const {
122 for ( int i = 0; i < L; ++i ) {
123 bool msg = my_lim.try_put( T(i) );
124 if ( msg == true )
125 ++my_accept_count;
126 }
127 }
128};
129
130template< typename T >
131struct put_dec_body : NoAssign {
132
133 tbb::flow::limiter_node<T> &my_lim;
134 tbb::atomic<int> &my_accept_count;
135
136 put_dec_body( tbb::flow::limiter_node<T> &lim, tbb::atomic<int> &accept_count ) :
137 my_lim(lim), my_accept_count(accept_count) {}
138
139 void operator()( int ) const {
140 int local_accept_count = 0;
141 while ( local_accept_count < N ) {
142 bool msg = my_lim.try_put( T(local_accept_count) );
143 if ( msg == true ) {
144 ++local_accept_count;
145 ++my_accept_count;
146 my_lim.decrement.try_put( tbb::flow::continue_msg() );
147 }
148 }
149 }
150
151};
152
153template< typename T >
154void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) {
155 parallel_receiver<T> r(g);
156 empty_sender< tbb::flow::continue_msg > s;
157 tbb::atomic<int> accept_count;
158 accept_count = 0;
159 tbb::flow::make_edge( lim, r );
160 tbb::flow::make_edge(s, lim.decrement);
161#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
162 ASSERT(lim.decrement.predecessor_count() == 1, NULL);
163 ASSERT(lim.successor_count() == 1, NULL);
164 ASSERT(lim.predecessor_count() == 0, NULL);
165 typename tbb::flow::interface10::internal::decrementer
166 <tbb::flow::limiter_node<T>, tbb::flow::continue_msg>::predecessor_list_type dec_preds;
167 lim.decrement.copy_predecessors(dec_preds);
168 ASSERT(dec_preds.size() == 1, NULL);
169#endif
170 // test puts with decrements
171 NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) );
172 int c = accept_count;
173 ASSERT( c == N*num_threads, NULL );
174 ASSERT( r.my_count == N*num_threads, NULL );
175}
176
177//
178// Tests
179//
180// limiter only forwards below the limit, multiple parallel senders / single receiver
181// multiple parallel senders that put to decrement at each accept, limiter accepts new messages
182//
183//
184template< typename T >
185int test_parallel(int num_threads) {
186
187 // test puts with no decrements
188 for ( int i = 0; i < L; ++i ) {
189 tbb::flow::graph g;
190 tbb::flow::limiter_node< T > lim(g, i);
191 parallel_receiver<T> r(g);
192 tbb::atomic<int> accept_count;
193 accept_count = 0;
194 tbb::flow::make_edge( lim, r );
195 // test puts with no decrements
196 NativeParallelFor( num_threads, put_body<T>(lim, accept_count) );
197 g.wait_for_all();
198 int c = accept_count;
199 ASSERT( c == i, NULL );
200 }
201
202 // test puts with decrements
203 for ( int i = 1; i < L; ++i ) {
204 tbb::flow::graph g;
205 tbb::flow::limiter_node< T > lim(g, i);
206 test_puts_with_decrements(num_threads, lim, g);
207 tbb::flow::limiter_node< T > lim_copy( lim );
208 test_puts_with_decrements(num_threads, lim_copy, g);
209 }
210
211 return 0;
212}
213
214//
215// Tests
216//
217// limiter only forwards below the limit, single sender / single receiver
218// at reject, a put to decrement, will cause next message to be accepted
219//
220template< typename T >
221int test_serial() {
222
223 // test puts with no decrements
224 for ( int i = 0; i < L; ++i ) {
225 tbb::flow::graph g;
226 tbb::flow::limiter_node< T > lim(g, i);
227 serial_receiver<T> r(g);
228 tbb::flow::make_edge( lim, r );
229 for ( int j = 0; j < L; ++j ) {
230 bool msg = lim.try_put( T(j) );
231 ASSERT( ( j < i && msg == true ) || ( j >= i && msg == false ), NULL );
232 }
233 g.wait_for_all();
234 }
235
236 // test puts with decrements
237 for ( int i = 1; i < L; ++i ) {
238 tbb::flow::graph g;
239 tbb::flow::limiter_node< T > lim(g, i);
240 serial_receiver<T> r(g);
241 empty_sender< tbb::flow::continue_msg > s;
242 tbb::flow::make_edge( lim, r );
243 tbb::flow::make_edge(s, lim.decrement);
244 for ( int j = 0; j < N; ++j ) {
245 bool msg = lim.try_put( T(j) );
246 ASSERT( ( j < i && msg == true ) || ( j >= i && msg == false ), NULL );
247 if ( msg == false ) {
248 lim.decrement.try_put( tbb::flow::continue_msg() );
249 msg = lim.try_put( T(j) );
250 ASSERT( msg == true, NULL );
251 }
252 }
253 }
254 return 0;
255}
256
257// reported bug in limiter (http://software.intel.com/en-us/comment/1752355)
258#define DECREMENT_OUTPUT 1 // the port number of the decrement output of the multifunction_node
259#define LIMITER_OUTPUT 0 // port number of the integer output
260
261typedef tbb::flow::multifunction_node<int, tbb::flow::tuple<int,tbb::flow::continue_msg> > mfnode_type;
262
263tbb::atomic<size_t> emit_count;
264tbb::atomic<size_t> emit_sum;
265tbb::atomic<size_t> receive_count;
266tbb::atomic<size_t> receive_sum;
267
268struct mfnode_body {
269 int max_cnt;
270 tbb::atomic<int>* my_cnt;
271 mfnode_body(const int& _max, tbb::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my) { }
272 void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) {
273 int lcnt = ++(*my_cnt);
274 if(lcnt > max_cnt) {
275 return;
276 }
277 // put one continue_msg to the decrement of the limiter.
278 if(!tbb::flow::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) {
279 ASSERT(false,"Unexpected rejection of decrement");
280 }
281 {
282 // put messages to the input of the limiter_node until it rejects.
283 while( tbb::flow::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) {
284 emit_sum += lcnt;
285 ++emit_count;
286 }
287 }
288 }
289};
290
291struct fn_body {
292 int operator()(const int &in) {
293 receive_sum += in;
294 ++receive_count;
295 return in;
296 }
297};
298
299// +------------+
300// +---------+ | v
301// | mf_node |0---+ +----------+ +----------+
302// +->| |1---------->| lim_node |--------->| fn_node |--+
303// | +---------+ +----------+ +----------+ |
304// | |
305// | |
306// +-------------------------------------------------------------+
307//
308void
309test_multifunction_to_limiter(int _max, int _nparallel) {
310 tbb::flow::graph g;
311 emit_count = 0;
312 emit_sum = 0;
313 receive_count = 0;
314 receive_sum = 0;
315 tbb::atomic<int> local_cnt;
316 local_cnt = 0;
317 mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt));
318 tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body());
319 tbb::flow::limiter_node<int> lim_node(g, _nparallel);
320 tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node);
321 tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrement);
322 tbb::flow::make_edge(lim_node, fn_node);
323 tbb::flow::make_edge(fn_node, mf_node);
324#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
325 REMARK("pred cnt == %d\n",(int)(lim_node.predecessor_count()));
326 REMARK("succ cnt == %d\n",(int)(lim_node.successor_count()));
327 tbb::flow::limiter_node<int>::successor_list_type my_succs;
328 lim_node.copy_successors(my_succs);
329 REMARK("succ cnt from vector == %d\n",(int)(my_succs.size()));
330 tbb::flow::limiter_node<int>::predecessor_list_type my_preds;
331 lim_node.copy_predecessors(my_preds);
332 REMARK("pred cnt from vector == %d\n",(int)(my_preds.size()));
333#endif
334 mf_node.try_put(1);
335 g.wait_for_all();
336 ASSERT(emit_count == receive_count, "counts do not match");
337 ASSERT(emit_sum == receive_sum, "sums do not match");
338
339 // reset, test again
340 g.reset();
341 emit_count = 0;
342 emit_sum = 0;
343 receive_count = 0;
344 receive_sum = 0;
345 local_cnt = 0;;
346 mf_node.try_put(1);
347 g.wait_for_all();
348 ASSERT(emit_count == receive_count, "counts do not match");
349 ASSERT(emit_sum == receive_sum, "sums do not match");
350}
351
352
353void
354test_continue_msg_reception() {
355 tbb::flow::graph g;
356 tbb::flow::limiter_node<int> ln(g,2);
357 tbb::flow::queue_node<int> qn(g);
358 tbb::flow::make_edge(ln, qn);
359 ln.decrement.try_put(tbb::flow::continue_msg());
360 ln.try_put(42);
361 g.wait_for_all();
362 int outint;
363 ASSERT(qn.try_get(outint) && outint == 42, "initial put to decrement stops node");
364}
365
366#if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
367using namespace tbb::flow;
368void run_and_check_result(graph& g, limiter_node<int>& limit, queue_node<int>& queue, broadcast_node<continue_msg>& broad) {
369 ASSERT( limit.try_put(1), NULL );
370 ASSERT( limit.try_put(2), NULL );
371 ASSERT( !limit.try_put(3), NULL );
372 ASSERT( broad.try_put(continue_msg()), NULL );
373 ASSERT( limit.decrement.try_put(continue_msg()), NULL );
374 ASSERT( limit.try_put(4), NULL );
375 ASSERT( !limit.try_put(5), NULL );
376 g.wait_for_all();
377
378 int list[] = {1, 2, 4};
379 int var = 0;
380 for (size_t i = 0; i < sizeof(list)/sizeof(list[0]); i++) {
381 queue.try_get(var);
382 ASSERT(var==list[i], "some data dropped, input does not match output");
383 }
384}
385
386void test_num_decrement_predecessors() {
387 graph g;
388 queue_node<int> output_queue(g);
389 limiter_node<int> limit1(g, 2, /*number_of_predecessors*/1);
390 limiter_node<int, continue_msg> limit2(g, 2, /*number_of_predecessors*/1);
391 broadcast_node<continue_msg> broadcast(g);
392
393 make_edge(limit1, output_queue);
394 make_edge(limit2, output_queue);
395
396 make_edge(broadcast, limit1.decrement);
397 make_edge(broadcast, limit2.decrement);
398
399 run_and_check_result(g, limit1, output_queue, broadcast);
400 run_and_check_result(g, limit2, output_queue, broadcast);
401}
402#else // TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
403//
404// This test ascertains that if a message is not successfully put
405// to a successor, the message is not dropped but released.
406//
407
408void test_reserve_release_messages() {
409 using namespace tbb::flow;
410 graph g;
411
412 //making two queue_nodes: one broadcast_node and one limiter_node
413 queue_node<int> input_queue(g);
414 queue_node<int> output_queue(g);
415 broadcast_node<int> broad(g);
416 limiter_node<int, int> limit(g,2); //threshold of 2
417
418 //edges
419 make_edge(input_queue, limit);
420 make_edge(limit, output_queue);
421 make_edge(broad,limit.decrement);
422
423 int list[4] = {19, 33, 72, 98}; //list to be put to the input queue
424
425 input_queue.try_put(list[0]); // succeeds
426 input_queue.try_put(list[1]); // succeeds
427 input_queue.try_put(list[2]); // fails, stored in upstream buffer
428 g.wait_for_all();
429
430 remove_edge(limit, output_queue); //remove successor
431
432 //sending message to the decrement port of the limiter
433 broad.try_put(1); //failed message retrieved.
434 g.wait_for_all();
435
436 make_edge(limit, output_queue); //putting the successor back
437
438 broad.try_put(1); //drop the count
439
440 input_queue.try_put(list[3]); //success
441 g.wait_for_all();
442
443 int var=0;
444
445 for (int i=0; i<4; i++) {
446 output_queue.try_get(var);
447 ASSERT(var==list[i], "some data dropped, input does not match output");
448 g.wait_for_all();
449 }
450}
451
452void test_decrementer() {
453 const int threshold = 5;
454 tbb::flow::graph g;
455 tbb::flow::limiter_node<int, int> limit(g, threshold);
456 tbb::flow::queue_node<int> queue(g);
457 make_edge(limit, queue);
458 int m = 0;
459 ASSERT( limit.try_put( m++ ), "Newly constructed limiter node does not accept message." );
460 ASSERT( limit.decrement.try_put( -threshold ), // close limiter's gate
461 "Limiter node decrementer's port does not accept message." );
462 ASSERT( !limit.try_put( m++ ), "Closed limiter node's accepts message." );
463 ASSERT( limit.decrement.try_put( threshold + 5 ), // open limiter's gate
464 "Limiter node decrementer's port does not accept message." );
465 for( int i = 0; i < threshold; ++i )
466 ASSERT( limit.try_put( m++ ), "Limiter node does not accept message while open." );
467 ASSERT( !limit.try_put( m ), "Limiter node's gate is not closed." );
468 g.wait_for_all();
469 int expected[] = {0, 2, 3, 4, 5, 6};
470 int actual = -1; m = 0;
471 while( queue.try_get(actual) )
472 ASSERT( actual == expected[m++], NULL );
473 ASSERT( sizeof(expected) / sizeof(expected[0]) == m, "Not all messages have been processed." );
474 g.wait_for_all();
475
476 const size_t threshold2 = size_t(-1);
477 tbb::flow::limiter_node<int, long long> limit2(g, threshold2);
478 make_edge(limit2, queue);
479 ASSERT( limit2.try_put( 1 ), "Newly constructed limiter node does not accept message." );
480 long long decrement_value = (long long)( size_t(-1)/2 );
481 ASSERT( limit2.decrement.try_put( -decrement_value ),
482 "Limiter node decrementer's port does not accept message" );
483 ASSERT( limit2.try_put( 2 ), "Limiter's gate should not be closed yet." );
484 ASSERT( limit2.decrement.try_put( -decrement_value ),
485 "Limiter node decrementer's port does not accept message" );
486 ASSERT( !limit2.try_put( 3 ), "Overflow happened for internal counter." );
487 int expected2[] = {1, 2};
488 actual = -1; m = 0;
489 while( queue.try_get(actual) )
490 ASSERT( actual == expected2[m++], NULL );
491 ASSERT( sizeof(expected2) / sizeof(expected2[0]) == m, "Not all messages have been processed." );
492 g.wait_for_all();
493}
494#endif // TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
495
496#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
497void test_extract() {
498 tbb::flow::graph g;
499 int j;
500 tbb::flow::limiter_node<int> node0(g, /*threshold*/1);
501 tbb::flow::queue_node<int> q0(g);
502 tbb::flow::queue_node<int> q1(g);
503 tbb::flow::queue_node<int> q2(g);
504 tbb::flow::broadcast_node<tbb::flow::continue_msg> b0(g);
505 tbb::flow::broadcast_node<tbb::flow::continue_msg> b1(g);
506
507 for( int i = 0; i < 2; ++i ) {
508 REMARK("At pass %d\n", i);
509 ASSERT(node0.predecessor_count() == 0, "incorrect predecessor count at start");
510 ASSERT(node0.successor_count() == 0, "incorrect successor count at start");
511 ASSERT(node0.decrement.predecessor_count() == 0, "incorrect decrement pred count at start");
512
513 tbb::flow::make_edge(q0, node0);
514 tbb::flow::make_edge(q1, node0);
515 tbb::flow::make_edge(node0, q2);
516 tbb::flow::make_edge(b0, node0.decrement);
517 tbb::flow::make_edge(b1, node0.decrement);
518 g.wait_for_all();
519
520 /* b0 b1 */
521 /* \ | */
522 /* q0\ \ | */
523 /* \ \| */
524 /* +-node0---q2 */
525 /* / */
526 /* q1/ */
527
528 q0.try_put(i);
529 g.wait_for_all();
530 ASSERT(node0.predecessor_count() == 2, "incorrect predecessor count after construction");
531 ASSERT(node0.successor_count() == 1, "incorrect successor count after construction");
532 ASSERT(node0.decrement.predecessor_count() == 2, "incorrect decrement pred count after construction");
533 ASSERT(q2.try_get(j), "fetch of value forwarded to output queue failed");
534 ASSERT(j == i, "improper value forwarded to output queue");
535 q0.try_put(2*i);
536 g.wait_for_all();
537 ASSERT(!q2.try_get(j), "limiter_node forwarded item improperly");
538 b0.try_put(tbb::flow::continue_msg());
539 g.wait_for_all();
540 ASSERT(!q2.try_get(j), "limiter_node forwarded item improperly");
541 b0.try_put(tbb::flow::continue_msg());
542 g.wait_for_all();
543 ASSERT(q2.try_get(j) && j == 2*i, "limiter_node failed to forward item");
544
545 tbb::flow::limiter_node<int>::successor_list_type sv;
546 tbb::flow::limiter_node<int>::predecessor_list_type pv;
547 tbb::flow::continue_receiver::predecessor_list_type dv;
548 tbb::flow::limiter_node<int>::successor_list_type sv1;
549 tbb::flow::limiter_node<int>::predecessor_list_type pv1;
550 tbb::flow::continue_receiver::predecessor_list_type dv1;
551
552 node0.copy_predecessors(pv);
553 node0.copy_successors(sv);
554 node0.decrement.copy_predecessors(dv);
555 pv1.push_back(&(q0));
556 pv1.push_back(&(q1));
557 sv1.push_back(&(q2));
558 dv1.push_back(&(b0));
559 dv1.push_back(&(b1));
560
561 ASSERT(pv.size() == 2, "improper size for predecessors");
562 ASSERT(sv.size() == 1, "improper size for successors");
563 ASSERT(lists_match(pv,pv1), "predecessor lists do not match");
564 ASSERT(lists_match(sv,sv1), "successor lists do not match");
565 ASSERT(lists_match(dv,dv1), "successor lists do not match");
566
567 if(i == 0) {
568 node0.extract();
569 ASSERT(node0.predecessor_count() == 0, "incorrect predecessor count after extraction");
570 ASSERT(node0.successor_count() == 0, "incorrect successor count after extraction");
571 ASSERT(node0.decrement.predecessor_count() == 0, "incorrect decrement pred count after extraction");
572 }
573 else {
574 q0.extract();
575 b0.extract();
576 q2.extract();
577
578 ASSERT(node0.predecessor_count() == 1, "incorrect predecessor count after extract second iter");
579 ASSERT(node0.successor_count() == 0, "incorrect successor count after extract second iter");
580 ASSERT(node0.decrement.predecessor_count() == 1, "incorrect decrement pred count after extract second iter");
581
582 node0.copy_predecessors(pv);
583 node0.copy_successors(sv);
584 node0.decrement.copy_predecessors(dv);
585 pv1.clear();
586 sv1.clear();
587 dv1.clear();
588 pv1.push_back(&(q1));
589 dv1.push_back(&(b1));
590
591 ASSERT(lists_match(pv,pv1), "predecessor lists do not match second iter");
592 ASSERT(lists_match(sv,sv1), "successor lists do not match second iter");
593 ASSERT(lists_match(dv,dv1), "successor lists do not match second iter");
594
595 q1.extract();
596 b1.extract();
597 }
598 ASSERT(node0.predecessor_count() == 0, "incorrect predecessor count after extract");
599 ASSERT(node0.successor_count() == 0, "incorrect successor count after extract");
600 ASSERT(node0.decrement.predecessor_count() == 0, "incorrect decrement pred count after extract");
601
602 }
603
604}
605#endif // TBB_DEPRECATED_FLOW_NODE_EXTRACTION
606
607int TestMain() {
608 for (int i = 1; i <= 8; ++i) {
609 tbb::task_scheduler_init init(i);
610 test_serial<int>();
611 test_parallel<int>(i);
612 }
613 test_continue_msg_reception();
614 test_multifunction_to_limiter(30,3);
615 test_multifunction_to_limiter(300,13);
616 test_multifunction_to_limiter(3000,1);
617#if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
618 test_num_decrement_predecessors();
619#else
620 test_reserve_release_messages();
621 test_decrementer();
622#endif
623#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
624 test_extract();
625#endif
626 return Harness::Done;
627}
628