1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
/** @file harness_graph.h
    This contains common helper classes and functions for testing graph nodes
**/
20
21#ifndef harness_graph_H
22#define harness_graph_H
23
24#include "harness.h"
25#include "harness_barrier.h"
26#include "tbb/flow_graph.h"
27#include "tbb/null_rw_mutex.h"
28#include "tbb/atomic.h"
29#include "tbb/concurrent_unordered_map.h"
30#include "tbb/task.h"
31#include "tbb/task_scheduler_init.h"
32#include "tbb/compat/condition_variable"
33#include "tbb/mutex.h"
34#include "tbb/tbb_thread.h"
35
36using tbb::flow::internal::SUCCESSFULLY_ENQUEUED;
37
// Upper bound on the number of backoff pauses performed by the BACKOFF_WAIT* macros.
#define WAIT_MAX 2000000

// Pauses with tbb::internal::atomic_backoff at least once, and keeps pausing while (ex)
// evaluates to true and fewer than WAIT_MAX pauses have been made.  Afterwards it ASSERTs
// (with msg) that the pause count stayed below WAIT_MAX.
// The expansion is a braced block that declares locals named backoff and wait_cnt.
#define BACKOFF_WAIT(ex,msg)                                        \
{                                                                   \
    tbb::internal::atomic_backoff backoff;                          \
    int wait_cnt = 0;                                               \
    for(;;) {                                                       \
        backoff.pause();                                            \
        ++wait_cnt;                                                 \
        if( !( (ex) && (wait_cnt < WAIT_MAX) ) ) break;             \
    }                                                               \
    ASSERT(wait_cnt < WAIT_MAX, msg);                               \
}
// Same waiting loop as BACKOFF_WAIT, but a timeout is only reported through REMARK
// instead of failing an assertion.
// The expansion is a braced block that declares locals named backoff and wait_cnt.
#define BACKOFF_WAIT_NOASSERT(ex,msg)                               \
{                                                                   \
    tbb::internal::atomic_backoff backoff;                          \
    int wait_cnt = 0;                                               \
    for(;;) {                                                       \
        backoff.pause();                                            \
        ++wait_cnt;                                                 \
        if( !( (ex) && (wait_cnt < WAIT_MAX) ) ) break;             \
    }                                                               \
    if(wait_cnt >= WAIT_MAX) REMARK("%s\n",msg);                    \
}
61
62// Needed conversion to and from continue_msg, but didn't want to add
63// conversion operators to the class, since we don't want it in general,
64// only in these tests.
65template<typename InputType, typename OutputType>
66struct converter {
67 static OutputType convert_value(const InputType &i) {
68 return OutputType(i);
69 }
70};
71
72template<typename InputType>
73struct converter<InputType,tbb::flow::continue_msg> {
74 static tbb::flow::continue_msg convert_value(const InputType &/*i*/) {
75 return tbb::flow::continue_msg();
76 }
77};
78
79template<typename OutputType>
80struct converter<tbb::flow::continue_msg,OutputType> {
81 static OutputType convert_value(const tbb::flow::continue_msg &/*i*/) {
82 return OutputType();
83 }
84};
85
86// helper for multifunction_node tests.
87template<size_t N>
88struct mof_helper {
89 template<typename InputType, typename ports_type>
90 static inline void output_converted_value(const InputType &i, ports_type &p) {
91 (void)tbb::flow::get<N-1>(p).try_put(converter<InputType,typename tbb::flow::tuple_element<N-1,ports_type>::type::output_type>::convert_value(i));
92 output_converted_value<N-1>(i, p);
93 }
94};
95
96template<>
97struct mof_helper<1> {
98 template<typename InputType, typename ports_type>
99 static inline void output_converted_value(const InputType &i, ports_type &p) {
100 // just emit a default-constructed object
101 (void)tbb::flow::get<0>(p).try_put(converter<InputType,typename tbb::flow::tuple_element<0,ports_type>::type::output_type>::convert_value(i));
102 }
103};
104
//! Default body for function_node tests: the output is the input explicitly converted to OutputType.
template< typename InputType, typename OutputType >
struct harness_graph_default_functor {
    static OutputType construct( InputType v ) { return OutputType(v); }
};
111
112template< typename OutputType >
113struct harness_graph_default_functor< tbb::flow::continue_msg, OutputType > {
114 static OutputType construct( tbb::flow::continue_msg ) {
115 return OutputType();
116 }
117};
118
119template< typename InputType >
120struct harness_graph_default_functor< InputType, tbb::flow::continue_msg > {
121 static tbb::flow::continue_msg construct( InputType ) {
122 return tbb::flow::continue_msg();
123 }
124};
125
126template< >
127struct harness_graph_default_functor< tbb::flow::continue_msg, tbb::flow::continue_msg > {
128 static tbb::flow::continue_msg construct( tbb::flow::continue_msg ) {
129 return tbb::flow::continue_msg();
130 }
131};
132
133template<typename InputType, typename OutputSet>
134struct harness_graph_default_multifunction_functor {
135 static const int N = tbb::flow::tuple_size<OutputSet>::value;
136 typedef typename tbb::flow::multifunction_node<InputType,OutputSet>::output_ports_type ports_type;
137 static void construct(const InputType &i, ports_type &p) {
138 mof_helper<N>::output_converted_value(i, p);
139 }
140};
141
142//! An executor that accepts InputType and generates OutputType
143template< typename InputType, typename OutputType >
144struct harness_graph_executor {
145
146 typedef OutputType (*function_ptr_type)( InputType v );
147
148 template<typename RW>
149 struct mutex_holder { static RW mutex; };
150
151 static function_ptr_type fptr;
152 static tbb::atomic<size_t> execute_count;
153 static tbb::atomic<size_t> current_executors;
154 static size_t max_executors;
155
156 static inline OutputType func( InputType v ) {
157 size_t c; // Declaration separate from initialization to avoid ICC internal error on IA-64 architecture
158 c = current_executors.fetch_and_increment();
159 ASSERT( max_executors == 0 || c <= max_executors, NULL );
160 ++execute_count;
161 OutputType v2 = (*fptr)(v);
162 current_executors.fetch_and_decrement();
163 return v2;
164 }
165
166 template< typename RW >
167 static inline OutputType tfunc( InputType v ) {
168 // Invocations allowed to be concurrent, the lock is acquired in shared ("read") mode.
169 // A test can take it exclusively, thus creating a barrier for invocations.
170 typename RW::scoped_lock l( mutex_holder<RW>::mutex, /*write=*/false );
171 return func(v);
172 }
173
174 template< typename RW >
175 struct tfunctor {
176 tbb::atomic<size_t> my_execute_count;
177 tfunctor() { my_execute_count = 0; }
178 tfunctor( const tfunctor &f ) { my_execute_count = f.my_execute_count; }
179 OutputType operator()( InputType i ) {
180 typename RW::scoped_lock l( harness_graph_executor::mutex_holder<RW>::mutex, /*write=*/false );
181 my_execute_count.fetch_and_increment();
182 return harness_graph_executor::func(i);
183 }
184 };
185 typedef tfunctor<tbb::null_rw_mutex> functor;
186
187};
188
189//! A multifunction executor that accepts InputType and has only one Output of OutputType.
190template< typename InputType, typename OutputTuple >
191struct harness_graph_multifunction_executor {
192 typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type ports_type;
193 typedef typename tbb::flow::tuple_element<0,OutputTuple>::type OutputType;
194
195 typedef void (*mfunction_ptr_type)( const InputType& v, ports_type &p );
196
197 template<typename RW>
198 struct mutex_holder { static RW mutex; };
199
200 static mfunction_ptr_type fptr;
201 static tbb::atomic<size_t> execute_count;
202 static tbb::atomic<size_t> current_executors;
203 static size_t max_executors;
204
205 static inline void empty_func( const InputType&, ports_type& ) {
206 }
207
208 static inline void func( const InputType &v, ports_type &p ) {
209 size_t c; // Declaration separate from initialization to avoid ICC internal error on IA-64 architecture
210 c = current_executors.fetch_and_increment();
211 ASSERT( max_executors == 0 || c <= max_executors, NULL );
212 ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 1, NULL);
213 ++execute_count;
214 (*fptr)(v,p);
215 current_executors.fetch_and_decrement();
216 }
217
218 template< typename RW >
219 static inline void tfunc( const InputType& v, ports_type &p ) {
220 // Shared lock in invocations, exclusive in a test; see a comment in harness_graph_executor.
221 typename RW::scoped_lock l( mutex_holder<RW>::mutex, /*write=*/false );
222 func(v,p);
223 }
224
225 template< typename RW >
226 struct tfunctor {
227 tbb::atomic<size_t> my_execute_count;
228 tfunctor() { my_execute_count = 0; }
229 tfunctor( const tfunctor &f ) { my_execute_count = f.my_execute_count; }
230 void operator()( const InputType &i, ports_type &p ) {
231 typename RW::scoped_lock l( harness_graph_multifunction_executor::mutex_holder<RW>::mutex, /*write=*/false );
232 my_execute_count.fetch_and_increment();
233 harness_graph_multifunction_executor::func(i,p);
234 }
235 };
236 typedef tfunctor<tbb::null_rw_mutex> functor;
237
238};
239
240// static vars for function_node tests
241template< typename InputType, typename OutputType >
242template< typename RW >
243RW harness_graph_executor<InputType, OutputType>::mutex_holder<RW>::mutex;
244
245template< typename InputType, typename OutputType >
246tbb::atomic<size_t> harness_graph_executor<InputType, OutputType>::execute_count;
247
248template< typename InputType, typename OutputType >
249typename harness_graph_executor<InputType, OutputType>::function_ptr_type harness_graph_executor<InputType, OutputType>::fptr
250 = harness_graph_default_functor< InputType, OutputType >::construct;
251
252template< typename InputType, typename OutputType >
253tbb::atomic<size_t> harness_graph_executor<InputType, OutputType>::current_executors;
254
255template< typename InputType, typename OutputType >
256size_t harness_graph_executor<InputType, OutputType>::max_executors = 0;
257
258// static vars for multifunction_node tests
259template< typename InputType, typename OutputTuple >
260template< typename RW >
261RW harness_graph_multifunction_executor<InputType, OutputTuple>::mutex_holder<RW>::mutex;
262
263template< typename InputType, typename OutputTuple >
264tbb::atomic<size_t> harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count;
265
266template< typename InputType, typename OutputTuple >
267typename harness_graph_multifunction_executor<InputType, OutputTuple>::mfunction_ptr_type harness_graph_multifunction_executor<InputType, OutputTuple>::fptr
268 = harness_graph_default_multifunction_functor< InputType, OutputTuple >::construct;
269
270template< typename InputType, typename OutputTuple >
271tbb::atomic<size_t> harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors;
272
273template< typename InputType, typename OutputTuple >
274size_t harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0;
275
276//! Counts the number of puts received
277template< typename T >
278struct harness_counting_receiver : public tbb::flow::receiver<T>, NoAssign {
279
280 tbb::atomic< size_t > my_count;
281 T max_value;
282 size_t num_copies;
283 tbb::flow::graph& my_graph;
284
285 harness_counting_receiver(tbb::flow::graph& g) : num_copies(1), my_graph(g) {
286 my_count = 0;
287 }
288
289 void initialize_map( const T& m, size_t c ) {
290 my_count = 0;
291 max_value = m;
292 num_copies = c;
293 }
294
295 tbb::flow::graph& graph_reference() __TBB_override {
296 return my_graph;
297 }
298
299 tbb::task *try_put_task( const T & ) __TBB_override {
300 ++my_count;
301 return const_cast<tbb::task *>(SUCCESSFULLY_ENQUEUED);
302 }
303
304 void validate() {
305 size_t n = my_count;
306 ASSERT( n == num_copies*max_value, NULL );
307 }
308
309#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
310 typedef typename tbb::flow::receiver<T>::built_predecessors_type built_predecessors_type;
311 built_predecessors_type mbp;
312 built_predecessors_type &built_predecessors() __TBB_override { return mbp; }
313 typedef typename tbb::flow::receiver<T>::predecessor_list_type predecessor_list_type;
314 typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
315 void internal_add_built_predecessor(predecessor_type &) __TBB_override {}
316 void internal_delete_built_predecessor(predecessor_type &) __TBB_override {}
317 void copy_predecessors(predecessor_list_type &) __TBB_override { }
318 size_t predecessor_count() __TBB_override { return 0; }
319#endif
320 void reset_receiver(tbb::flow::reset_flags /*f*/) __TBB_override { my_count = 0; }
321};
322
323//! Counts the number of puts received
324template< typename T >
325struct harness_mapped_receiver : public tbb::flow::receiver<T>, NoCopy {
326
327 tbb::atomic< size_t > my_count;
328 T max_value;
329 size_t num_copies;
330 typedef tbb::concurrent_unordered_map< T, tbb::atomic< size_t > > map_type;
331 map_type *my_map;
332 tbb::flow::graph& my_graph;
333
334 harness_mapped_receiver(tbb::flow::graph& g) : my_map(NULL), my_graph(g) {
335 my_count = 0;
336 }
337
338 ~harness_mapped_receiver() {
339 if ( my_map ) delete my_map;
340 }
341
342 void initialize_map( const T& m, size_t c ) {
343 my_count = 0;
344 max_value = m;
345 num_copies = c;
346 if ( my_map ) delete my_map;
347 my_map = new map_type;
348 }
349
350 tbb::task * try_put_task( const T &t ) __TBB_override {
351 if ( my_map ) {
352 tbb::atomic<size_t> a;
353 a = 1;
354 std::pair< typename map_type::iterator, bool > r = (*my_map).insert( typename map_type::value_type( t, a ) );
355 if ( r.second == false ) {
356 size_t v = r.first->second.fetch_and_increment();
357 ASSERT( v < num_copies, NULL );
358 }
359 } else {
360 ++my_count;
361 }
362 return const_cast<tbb::task *>(SUCCESSFULLY_ENQUEUED);
363 }
364
365 tbb::flow::graph& graph_reference() __TBB_override {
366 return my_graph;
367 }
368
369 void validate() {
370 if ( my_map ) {
371 for ( size_t i = 0; i < (size_t)max_value; ++i ) {
372 size_t n = (*my_map)[(int)i];
373 ASSERT( n == num_copies, NULL );
374 }
375 } else {
376 size_t n = my_count;
377 ASSERT( n == num_copies*max_value, NULL );
378 }
379 }
380#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
381 typedef typename tbb::flow::receiver<T>::built_predecessors_type built_predecessors_type;
382 built_predecessors_type mbp;
383 built_predecessors_type &built_predecessors() __TBB_override { return mbp; }
384 typedef typename tbb::flow::receiver<T>::predecessor_list_type predecessor_list_type;
385 typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
386 void internal_add_built_predecessor(predecessor_type &) __TBB_override {}
387 void internal_delete_built_predecessor(predecessor_type &) __TBB_override {}
388 void copy_predecessors(predecessor_list_type &) __TBB_override { }
389 size_t predecessor_count() __TBB_override { return 0; }
390#endif
391 void reset_receiver(tbb::flow::reset_flags /*f*/) __TBB_override {
392 my_count = 0;
393 if(my_map) delete my_map;
394 my_map = new map_type;
395 }
396
397};
398
399//! Counts the number of puts received
400template< typename T >
401struct harness_counting_sender : public tbb::flow::sender<T>, NoCopy {
402
403 typedef typename tbb::flow::sender<T>::successor_type successor_type;
404 tbb::atomic< successor_type * > my_receiver;
405 tbb::atomic< size_t > my_count;
406 tbb::atomic< size_t > my_received;
407 size_t my_limit;
408
409 harness_counting_sender( ) : my_limit(~size_t(0)) {
410 my_receiver = NULL;
411 my_count = 0;
412 my_received = 0;
413 }
414
415 harness_counting_sender( size_t limit ) : my_limit(limit) {
416 my_receiver = NULL;
417 my_count = 0;
418 my_received = 0;
419 }
420
421 bool register_successor( successor_type &r ) __TBB_override {
422 my_receiver = &r;
423 return true;
424 }
425
426 bool remove_successor( successor_type &r ) __TBB_override {
427 successor_type *s = my_receiver.fetch_and_store( NULL );
428 ASSERT( s == &r, NULL );
429 return true;
430 }
431
432#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
433 typedef typename tbb::flow::sender<T>::successor_list_type successor_list_type;
434 typedef typename tbb::flow::sender<T>::built_successors_type built_successors_type;
435 built_successors_type bst;
436 built_successors_type &built_successors() __TBB_override { return bst; }
437 void internal_add_built_successor( successor_type &) __TBB_override {}
438 void internal_delete_built_successor( successor_type &) __TBB_override {}
439 void copy_successors(successor_list_type &) __TBB_override { }
440 size_t successor_count() __TBB_override { return 0; }
441#endif
442
443 bool try_get( T & v ) __TBB_override {
444 size_t i = my_count.fetch_and_increment();
445 if ( i < my_limit ) {
446 v = T( i );
447 ++my_received;
448 return true;
449 } else {
450 return false;
451 }
452 }
453
454 bool try_put_once() {
455 successor_type *s = my_receiver;
456 size_t i = my_count.fetch_and_increment();
457 if ( s->try_put( T(i) ) ) {
458 ++my_received;
459 return true;
460 } else {
461 return false;
462 }
463 }
464
465 void try_put_until_false() {
466 successor_type *s = my_receiver;
467 size_t i = my_count.fetch_and_increment();
468
469 while ( s->try_put( T(i) ) ) {
470 ++my_received;
471 i = my_count.fetch_and_increment();
472 }
473 }
474
475 void try_put_until_limit() {
476 successor_type *s = my_receiver;
477
478 for ( int i = 0; i < (int)my_limit; ++i ) {
479 ASSERT( s->try_put( T(i) ), NULL );
480 ++my_received;
481 }
482 ASSERT( my_received == my_limit, NULL );
483 }
484
485};
486
487// test for resets of buffer-type nodes.
488tbb::atomic<int> serial_fn_state0;
489tbb::atomic<int> serial_fn_state1;
490tbb::atomic<int> serial_continue_state0;
491
492template<typename T>
493struct serial_fn_body {
494 tbb::atomic<int> *_flag;
495 serial_fn_body(tbb::atomic<int> &myatomic) : _flag(&myatomic) { }
496 T operator()(const T& in) {
497 if(*_flag == 0) {
498 *_flag = 1;
499 // wait until we are released
500 tbb::internal::atomic_backoff backoff;
501 do {
502 backoff.pause();
503 } while(*_flag == 1);
504 }
505 // return value
506 return in;
507 }
508};
509
510template<typename T>
511struct serial_continue_body {
512 tbb::atomic<int> *_flag;
513 serial_continue_body(tbb::atomic<int> &myatomic) : _flag(&myatomic) {}
514 T operator()(const tbb::flow::continue_msg& /*in*/) {
515 // signal we have received a value
516 *_flag = 1;
517 // wait until we are released
518 tbb::internal::atomic_backoff backoff;
519 do {
520 backoff.pause();
521 } while(*_flag == 1);
522 // return value
523 return (T)1;
524 }
525};
526
527#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
528
529
// Compares two containers as multisets: returns true when they have the same size and every
// element of tv can be paired with a distinct, equal element of sv (order is irrelevant).
// Elements are compared as (*tv_iterator == *sv_iterator).
template<typename SV, typename TV>
bool lists_match(SV &sv, TV &tv) {
    if(sv.size() != tv.size()) return false;

    // claimed[k] is set once the k-th element of sv has been paired with an element of tv
    std::vector<bool> claimed(sv.size(), false);

    for(typename TV::iterator t_it = tv.begin(); t_it != tv.end(); ++t_it) {
        bool paired = false;
        size_t slot = 0;
        for(typename SV::iterator s_it = sv.begin(); !paired && s_it != sv.end(); ++s_it, ++slot) {
            if(!claimed[slot] && *t_it == *s_it) {
                claimed[slot] = true;
                paired = true;
            }
        }
        // no unclaimed element of sv equals this element of tv
        if(!paired) return false;
    }
    return true;
}
555#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
556
557template<typename T, typename BufferType>
558void test_resets() {
559 const int NN = 3;
560 tbb::task_scheduler_init init(4);
561 tbb::task_group_context tgc;
562 tbb::flow::graph g(tgc);
563 BufferType b0(g);
564 tbb::flow::queue_node<T> q0(g);
565 T j;
566 bool nFound[NN];
567
568 // reset empties buffer
569 for(T i = 0; i < NN; ++i) {
570 b0.try_put(i);
571 nFound[(int)i] = false;
572 }
573 g.wait_for_all();
574 g.reset();
575 ASSERT(!b0.try_get(j), "reset did not empty buffer");
576
577 // reset doesn't delete edge
578
579 tbb::flow::make_edge(b0,q0);
580 g.reset();
581 for(T i = 0; i < NN; ++i) {
582 b0.try_put(i);
583 }
584
585 g.wait_for_all();
586 for( T i = 0; i < NN; ++i) {
587 ASSERT(q0.try_get(j), "Missing value from buffer");
588 ASSERT(!nFound[(int)j], "Duplicate value found");
589 nFound[(int)j] = true;
590 }
591
592 for(int ii = 0; ii < NN; ++ii) {
593 ASSERT(nFound[ii], "missing value");
594 }
595 ASSERT(!q0.try_get(j), "Extra values in output");
596
597 // reset reverses a reversed edge.
598 // we will use a serial rejecting node to get the edge to reverse.
599 tbb::flow::function_node<T, T, tbb::flow::rejecting> sfn(g, tbb::flow::serial, serial_fn_body<T>(serial_fn_state0));
600 tbb::flow::queue_node<T> outq(g);
601 tbb::flow::remove_edge(b0,q0);
602 tbb::flow::make_edge(b0, sfn);
603 tbb::flow::make_edge(sfn,outq);
604 g.wait_for_all(); // wait for all the tasks started by building the graph are done.
605 serial_fn_state0 = 0;
606
607 // b0 ------> sfn ------> outq
608
609 for(int icnt = 0; icnt < 2; ++icnt) {
610 g.wait_for_all();
611 serial_fn_state0 = 0;
612 b0.try_put((T)0); // will start sfn
613 // wait until function_node starts
614 BACKOFF_WAIT(serial_fn_state0 == 0,"Timed out waiting for function_node to start");
615 // now the function_node is executing.
616 // this will start a task to forward the second item
617 // to the serial function node
618 b0.try_put((T)1); // first item will be consumed by task completing the execution
619 BACKOFF_WAIT_NOASSERT(g.root_task()->ref_count() >= 3,"Timed out waiting try_put task to wind down");
620 b0.try_put((T)2); // second item will remain after cancellation
621 // now wait for the task that attempts to forward the buffer item to
622 // complete.
623 BACKOFF_WAIT_NOASSERT(g.root_task()->ref_count() >= 3,"Timed out waiting for tasks to wind down");
624 // now cancel the graph.
625 ASSERT(tgc.cancel_group_execution(), "task group already cancelled");
626 serial_fn_state0 = 0; // release the function_node.
627 g.wait_for_all(); // wait for all the tasks to complete.
628 // check that at most one output reached the queue_node
629 T outt;
630 T outt2;
631 bool got_item1 = outq.try_get(outt);
632 bool got_item2 = outq.try_get(outt2);
633 // either the output queue was empty (if the function_node tested for cancellation before putting the
634 // result to the queue) or there was one element in the queue (the 0).
635 ASSERT(!got_item1 || ((int)outt == 0 && !got_item2), "incorrect output from function_node");
636 // the edge between the buffer and the function_node should be reversed, and the last
637 // message we put in the buffer should still be there. We can't directly test for the
638 // edge reversal.
639 got_item1 = b0.try_get(outt);
640 ASSERT(got_item1, " buffer lost a message");
641 ASSERT(2 == (int)outt || 1 == (int)outt, " buffer had incorrect message"); // the one not consumed by the node.
642 ASSERT(g.is_cancelled(), "Graph was not cancelled");
643 g.reset();
644 } // icnt
645
646 // reset with remove_edge removes edge. (icnt ==0 => forward edge, 1 => reversed edge
647 for(int icnt = 0; icnt < 2; ++icnt) {
648 if(icnt == 1) {
649 // set up reversed edge
650 tbb::flow::make_edge(b0, sfn);
651 tbb::flow::make_edge(sfn,outq);
652 serial_fn_state0 = 0;
653 b0.try_put((T)0); // starts up the function node
654 b0.try_put((T)1); // shoyuld reverse the edge
655 BACKOFF_WAIT(serial_fn_state0 == 0,"Timed out waiting for edge reversal");
656 ASSERT(tgc.cancel_group_execution(), "task group already cancelled");
657 serial_fn_state0 = 0; // release the function_node.
658 g.wait_for_all(); // wait for all the tasks to complete.
659 }
660 g.reset(tbb::flow::rf_clear_edges);
661 // test that no one is a successor to the buffer now.
662 serial_fn_state0 = 1; // let the function_node go if it gets an input message
663 b0.try_put((T)23);
664 g.wait_for_all();
665 ASSERT((int)serial_fn_state0 == 1, "function_node executed when it shouldn't");
666 T outt;
667 ASSERT(b0.try_get(outt) && (T)23 == outt, "node lost its input");
668 }
669}
670
671#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
672
673template< typename NODE_TYPE >
674class test_buffer_base_extract {
675protected:
676 tbb::flow::graph &g;
677 NODE_TYPE &in0;
678 NODE_TYPE &in1;
679 NODE_TYPE &middle;
680 NODE_TYPE &out0;
681 NODE_TYPE &out1;
682 NODE_TYPE *ins[2];
683 NODE_TYPE *outs[2];
684 typename NODE_TYPE::successor_type *ms_ptr;
685 typename NODE_TYPE::predecessor_type *mp_ptr;
686
687 typename NODE_TYPE::predecessor_list_type in0_p_list;
688 typename NODE_TYPE::successor_list_type in0_s_list;
689 typename NODE_TYPE::predecessor_list_type in1_p_list;
690 typename NODE_TYPE::successor_list_type in1_s_list;
691 typename NODE_TYPE::predecessor_list_type out0_p_list;
692 typename NODE_TYPE::successor_list_type out0_s_list;
693 typename NODE_TYPE::predecessor_list_type out1_p_list;
694 typename NODE_TYPE::successor_list_type out1_s_list;
695 typename NODE_TYPE::predecessor_list_type mp_list;
696 typename NODE_TYPE::predecessor_list_type::iterator mp_list_iter;
697 typename NODE_TYPE::successor_list_type ms_list;
698 typename NODE_TYPE::successor_list_type::iterator ms_list_iter;
699
700 virtual void set_up_lists() {
701 in0_p_list.clear();
702 in0_s_list.clear();
703 in1_p_list.clear();
704 in1_s_list.clear();
705 mp_list.clear();
706 ms_list.clear();
707 out0_p_list.clear();
708 out0_s_list.clear();
709 out1_p_list.clear();
710 out1_s_list.clear();
711 in0.copy_predecessors(in0_p_list);
712 in0.copy_successors(in0_s_list);
713 in1.copy_predecessors(in1_p_list);
714 in1.copy_successors(in1_s_list);
715 middle.copy_predecessors(mp_list);
716 middle.copy_successors(ms_list);
717 out0.copy_predecessors(out0_p_list);
718 out0.copy_successors(out0_s_list);
719 out1.copy_predecessors(out1_p_list);
720 out1.copy_successors(out1_s_list);
721 }
722
723 void make_and_validate_full_graph() {
724 /* in0 out0 */
725 /* \ / */
726 /* middle */
727 /* / \ */
728 /* in1 out1 */
729 tbb::flow::make_edge( in0, middle );
730 tbb::flow::make_edge( in1, middle );
731 tbb::flow::make_edge( middle, out0 );
732 tbb::flow::make_edge( middle, out1 );
733
734 set_up_lists();
735
736 ASSERT( in0.predecessor_count() == 0 && in0_p_list.size() == 0, "expected 0 predecessors" );
737 ASSERT( in0.successor_count() == 1 && in0_s_list.size() == 1 && *(in0_s_list.begin()) == ms_ptr, "expected 1 successor" );
738 ASSERT( in1.predecessor_count() == 0 && in1_p_list.size() == 0, "expected 0 predecessors" );
739 ASSERT( in1.successor_count() == 1 && in1_s_list.size() == 1 && *(in1_s_list.begin()) == ms_ptr, "expected 1 successor" );
740 ASSERT( middle.predecessor_count() == 2 && mp_list.size() == 2, "expected 2 predecessors" );
741 ASSERT( middle.successor_count() == 2 && ms_list.size() == 2, "expected 2 successors" );
742 ASSERT( out0.predecessor_count() == 1 && out0_p_list.size() == 1 && *(out0_p_list.begin()) == mp_ptr, "expected 1 predecessor" );
743 ASSERT( out0.successor_count() == 0 && out0_s_list.size() == 0, "expected 0 successors" );
744 ASSERT( out1.predecessor_count() == 1 && out1_p_list.size() == 1 && *(out1_p_list.begin()) == mp_ptr, "expected 1 predecessor" );
745 ASSERT( out1.successor_count() == 0 && out1_s_list.size() == 0, "expected 0 successors" );
746
747 int first_pred = *(mp_list.begin()) == ins[0] ? 0 : ( *(mp_list.begin()) == ins[1] ? 1 : -1 );
748 mp_list_iter = mp_list.begin(); ++mp_list_iter;
749 int second_pred = *mp_list_iter == ins[0] ? 0 : ( *mp_list_iter == ins[1] ? 1 : -1 );
750 ASSERT( first_pred != -1 && second_pred != -1 && first_pred != second_pred, "bad predecessor(s) for middle" );
751
752 int first_succ = *(ms_list.begin()) == outs[0] ? 0 : ( *(ms_list.begin()) == outs[1] ? 1 : -1 );
753 ms_list_iter = ++(ms_list.begin());
754 int second_succ = *ms_list_iter == outs[0] ? 0 : ( *ms_list_iter == outs[1] ? 1 : -1 );
755 ASSERT( first_succ != -1 && second_succ != -1 && first_succ != second_succ, "bad successor(s) for middle" );
756
757 in0.try_put(1);
758 in1.try_put(2);
759 g.wait_for_all();
760
761 int r = 0;
762 int v = 0;
763
764 ASSERT( in0.try_get(v) == false, "buffer should not have a value" );
765 ASSERT( in1.try_get(v) == false, "buffer should not have a value" );
766 ASSERT( middle.try_get(v) == false, "buffer should not have a value" );
767 while ( out0.try_get(v) ) {
768 ASSERT( (v == 1 || v == 2) && (v&r) == 0, "duplicate value" );
769 r |= v;
770 g.wait_for_all();
771 }
772 while ( out1.try_get(v) ) {
773 ASSERT( (v == 1 || v == 2) && (v&r) == 0, "duplicate value" );
774 r |= v;
775 g.wait_for_all();
776 }
777 ASSERT( r == 3, "not all values received" );
778 g.wait_for_all();
779 }
780
781 void validate_half_graph() {
782 /* in0 out0 */
783 /* */
784 /* middle */
785 /* / \ */
786 /* in1 out1 */
787 set_up_lists();
788
789 ASSERT( in0.predecessor_count() == 0 && in0_p_list.size() == 0, "expected 0 predecessors" );
790 ASSERT( in0.successor_count() == 0 && in0_s_list.size() == 0, "expected 0 successors" );
791 ASSERT( in1.predecessor_count() == 0 && in1_p_list.size() == 0, "expected 0 predecessors" );
792 ASSERT( in1.successor_count() == 1 && in1_s_list.size() == 1 && *(in1_s_list.begin()) == ms_ptr, "expected 1 successor" );
793 ASSERT( middle.predecessor_count() == 1 && mp_list.size() == 1, "expected 1 predecessor" );
794 ASSERT( middle.successor_count() == 1 && ms_list.size() == 1, "expected 1 successor" );
795 ASSERT( out0.predecessor_count() == 0 && out0_p_list.size() == 0, "expected 0 predecessors" );
796 ASSERT( out0.successor_count() == 0 && out0_s_list.size() == 0, "expected 0 successors" );
797 ASSERT( out1.predecessor_count() == 1 && out1_p_list.size() == 1 && *(out1_p_list.begin()) == mp_ptr, "expected 1 predecessor" );
798 ASSERT( out1.successor_count() == 0 && out1_s_list.size() == 0, "expected 0 successors" );
799
800 ASSERT( middle.predecessor_count() == 1 && mp_list.size() == 1, "expected two predecessors" );
801 ASSERT( middle.successor_count() == 1 && ms_list.size() == 1, "expected two successors" );
802
803 ASSERT( *(mp_list.begin()) == ins[1], "incorrect predecessor" );
804 ASSERT( *(ms_list.begin()) == outs[1], "incorrect successor" );
805
806 in0.try_put(1);
807 in1.try_put(2);
808 g.wait_for_all();
809
810 int v = 0;
811 ASSERT( in0.try_get(v) == true && v == 1, "buffer should have a value of 1" );
812 ASSERT( in1.try_get(v) == false, "buffer should not have a value" );
813 ASSERT( middle.try_get(v) == false, "buffer should not have a value" );
814 ASSERT( out0.try_get(v) == false, "buffer should not have a value" );
815 ASSERT( out1.try_get(v) == true && v == 2, "buffer should have a value of 2" );
816 g.wait_for_all();
817 }
818
819 void validate_empty_graph() {
820 /* in0 out0 */
821 /* */
822 /* middle */
823 /* */
824 /* in1 out1 */
825 set_up_lists();
826
827 ASSERT( in0.predecessor_count() == 0 && in0_p_list.size() == 0, "expected 0 predecessors" );
828 ASSERT( in0.successor_count() == 0 && in0_s_list.size() == 0, "expected 0 successors" );
829 ASSERT( in1.predecessor_count() == 0 && in1_p_list.size() == 0, "expected 0 predecessors" );
830 ASSERT( in1.successor_count() == 0 && in1_s_list.size() == 0, "expected 0 successors" );
831 ASSERT( middle.predecessor_count() == 0 && mp_list.size() == 0, "expected 0 predecessors" );
832 ASSERT( middle.successor_count() == 0 && ms_list.size() == 0, "expected 0 successors" );
833 ASSERT( out0.predecessor_count() == 0 && out0_p_list.size() == 0, "expected 0 predecessors" );
834 ASSERT( out0.successor_count() == 0 && out0_s_list.size() == 0, "expected 0 successors" );
835 ASSERT( out1.predecessor_count() == 0 && out1_p_list.size() == 0, "expected 0 predecessors" );
836 ASSERT( out1.successor_count() == 0 && out1_s_list.size() == 0, "expected 0 successors" );
837
838 ASSERT( middle.predecessor_count() == 0 && mp_list.size() == 0, "expected 0 predecessors" );
839 ASSERT( middle.successor_count() == 0 && ms_list.size() == 0, "expected 0 successors" );
840
841 in0.try_put(1);
842 in1.try_put(2);
843 g.wait_for_all();
844
845 int v = 0;
846 ASSERT( in0.try_get(v) == true && v == 1, "buffer should have a value of 1" );
847 ASSERT( in1.try_get(v) == true && v == 2, "buffer should have a value of 2" );
848 ASSERT( middle.try_get(v) == false, "buffer should not have a value" );
849 ASSERT( out0.try_get(v) == false, "buffer should not have a value" );
850 ASSERT( out1.try_get(v) == false, "buffer should not have a value" );
851 g.wait_for_all();
852 }
853
854 // forbid the ecompiler generation of operator= (VS2012 warning)
855 test_buffer_base_extract& operator=(test_buffer_base_extract & /*other*/);
856
857public:
858
859 test_buffer_base_extract(tbb::flow::graph &_g, NODE_TYPE &i0, NODE_TYPE &i1, NODE_TYPE &m, NODE_TYPE &o0, NODE_TYPE &o1) :
860 g(_g), in0(i0), in1(i1), middle(m), out0(o0), out1(o1) {
861 ins[0] = &in0;
862 ins[1] = &in1;
863 outs[0] = &out0;
864 outs[1] = &out1;
865 ms_ptr = static_cast< typename NODE_TYPE::successor_type * >(&middle);
866 mp_ptr = static_cast< typename NODE_TYPE::predecessor_type *>(&middle);
867 }
868
869 virtual ~test_buffer_base_extract() {}
870
871 void run_tests() {
872 make_and_validate_full_graph();
873
874 in0.extract();
875 out0.extract();
876 validate_half_graph();
877
878 in1.extract();
879 out1.extract();
880 validate_empty_graph();
881
882 make_and_validate_full_graph();
883
884 middle.extract();
885 validate_empty_graph();
886
887 make_and_validate_full_graph();
888 }
889
890};
891
892template< typename NODE_TYPE >
893class test_buffer_extract : public test_buffer_base_extract<NODE_TYPE> {
894protected:
895 tbb::flow::graph my_g;
896 NODE_TYPE my_in0;
897 NODE_TYPE my_in1;
898 NODE_TYPE my_middle;
899 NODE_TYPE my_out0;
900 NODE_TYPE my_out1;
901public:
902 test_buffer_extract() : test_buffer_base_extract<NODE_TYPE>( my_g, my_in0, my_in1, my_middle, my_out0, my_out1),
903 my_in0(my_g), my_in1(my_g), my_middle(my_g), my_out0(my_g), my_out1(my_g) { }
904};
905
906template< >
907class test_buffer_extract< tbb::flow::sequencer_node<int> > : public test_buffer_base_extract< tbb::flow::sequencer_node<int> > {
908protected:
909 typedef tbb::flow::sequencer_node<int> my_node_t;
910 tbb::flow::graph my_g;
911 my_node_t my_in0;
912 my_node_t my_in1;
913 my_node_t my_middle;
914 my_node_t my_out0;
915 my_node_t my_out1;
916
917 typedef tbb::atomic<size_t> count_t;
918 count_t middle_count;
919 count_t out0_count;
920 count_t out1_count;
921
922 struct always_zero { size_t operator()(int) { return 0; } };
923 struct always_inc {
924 count_t *c;
925 always_inc(count_t &_c) : c(&_c) {}
926 size_t operator()(int) {
927 return c->fetch_and_increment();
928 }
929 };
930
931 void set_up_lists() __TBB_override {
932 middle_count = 0;
933 out0_count = 0;
934 out1_count = 0;
935 my_g.reset(); // reset the sequencer nodes to start at 0 again
936 test_buffer_base_extract< my_node_t >::set_up_lists();
937 }
938
939
940public:
941 test_buffer_extract() : test_buffer_base_extract<my_node_t>( my_g, my_in0, my_in1, my_middle, my_out0, my_out1),
942 my_in0(my_g, always_zero()), my_in1(my_g, always_zero()), my_middle(my_g, always_inc(middle_count)),
943 my_out0(my_g, always_inc(out0_count)), my_out1(my_g, always_inc(out1_count)) {
944 }
945};
946
947// test for simple node that has one input, one output (overwrite_node, write_once_node, limiter_node)
948// decrement tests have to be done separately.
949template<template< class > class NType, typename ItemType>
950void test_extract_on_node() {
951 tbb::flow::graph g;
952 ItemType dont_care;
953 NType<ItemType> node0(g);
954 tbb::flow::queue_node<ItemType> q0(g);
955 tbb::flow::queue_node<ItemType> q1(g);
956 tbb::flow::queue_node<ItemType> q2(g);
957 for( int i = 0; i < 2; ++i) {
958 tbb::flow::make_edge(q0,node0);
959 tbb::flow::make_edge(q1,node0);
960 tbb::flow::make_edge(node0, q2);
961 q0.try_put(ItemType(i));
962 g.wait_for_all();
963
964 /* q0 */
965 /* \ */
966 /* \ */
967 /* node0 -- q2 */
968 /* / */
969 /* / */
970 /* q1 */
971
972 ASSERT(node0.predecessor_count() == 2 && q0.successor_count() == 1 && q1.successor_count() == 1, "bad predecessor count");
973 ASSERT(node0.successor_count() == 1 && q2.predecessor_count() == 1, "bad successor count");
974
975 ASSERT(q2.try_get(dont_care) && int(dont_care) == i, "item not forwarded");
976 typename NType<ItemType>::successor_list_type sv, sv1;
977 typename NType<ItemType>::predecessor_list_type pv, pv1;
978
979 pv1.push_back(&q0);
980 pv1.push_back(&q1);
981 sv1.push_back(&q2);
982 node0.copy_predecessors(pv);
983 node0.copy_successors(sv);
984 ASSERT(lists_match(pv,pv1), "predecessor vector incorrect");
985 ASSERT(lists_match(sv,sv1), "successor vector incorrect");
986
987 if(i == 0) {
988 node0.extract();
989 }
990 else {
991 q0.extract();
992 q1.extract();
993 q2.extract();
994 }
995
996 q0.try_put(ItemType(2));
997 g.wait_for_all();
998 ASSERT(!q2.try_get(dont_care), "node0 not disconnected");
999 ASSERT(q0.try_get(dont_care), "q0 empty (should have one item)");
1000
1001 node0.copy_predecessors(pv);
1002 node0.copy_successors(sv);
1003 ASSERT(node0.predecessor_count() == 0 && q0.successor_count() == 0 && q1.successor_count() == 0, "error in pred count after extract");
1004 ASSERT(pv.size() == 0, "error in pred array count after extract");
1005 ASSERT(node0.successor_count() == 0 && q2.predecessor_count() == 0, "error in succ count after extract");
1006 ASSERT(sv.size() == 0, "error in succ array count after extract");
1007 g.wait_for_all();
1008 }
1009}
1010
1011#endif // TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1012
//! Checks that two calls to input_ports() yield the same object, i.e. a reference is returned.
template<typename NodeType>
void test_input_ports_return_ref(NodeType& mip_node) {
    typedef typename NodeType::input_ports_type ports_t;
    ports_t& first_call = mip_node.input_ports();
    ports_t& second_call = mip_node.input_ports();
    ASSERT(&first_call == &second_call, "input_ports() should return reference");
}
1019
//! Checks that two calls to output_ports() yield the same object, i.e. a reference is returned.
template<typename NodeType>
void test_output_ports_return_ref(NodeType& mop_node) {
    typedef typename NodeType::output_ports_type ports_t;
    ports_t& first_call = mop_node.output_ports();
    ports_t& second_call = mop_node.output_ports();
    ASSERT(&first_call == &second_call, "output_ports() should return reference");
}
1026
1027template< template <typename> class ReservingNodeType, typename DataType, bool DoClear >
1028class harness_reserving_body : NoAssign {
1029 ReservingNodeType<DataType> &my_reserving_node;
1030 tbb::flow::buffer_node<DataType> &my_buffer_node;
1031public:
1032 harness_reserving_body(ReservingNodeType<DataType> &reserving_node, tbb::flow::buffer_node<DataType> &bn) : my_reserving_node(reserving_node), my_buffer_node(bn) {}
1033 void operator()(DataType i) const {
1034 my_reserving_node.try_put(i);
1035#if _MSC_VER && !__INTEL_COMPILER
1036#pragma warning (push)
1037#pragma warning (disable: 4127) /* suppress conditional expression is constant */
1038#endif
1039 if (DoClear) {
1040#if _MSC_VER && !__INTEL_COMPILER
1041#pragma warning (pop)
1042#endif
1043 my_reserving_node.clear();
1044 }
1045 my_buffer_node.try_put(i);
1046 my_reserving_node.try_put(i);
1047 }
1048};
1049
1050template< template <typename> class ReservingNodeType, typename DataType >
1051void test_reserving_nodes() {
1052 const size_t N = 300;
1053
1054 tbb::flow::graph g;
1055
1056 ReservingNodeType<DataType> reserving_n(g);
1057
1058 tbb::flow::buffer_node<DataType> buffering_n(g);
1059 tbb::flow::join_node< tbb::flow::tuple<DataType, DataType>, tbb::flow::reserving > join_n(g);
1060 harness_counting_receiver< tbb::flow::tuple<DataType, DataType> > end_receiver(g);
1061
1062 tbb::flow::make_edge(reserving_n, tbb::flow::input_port<0>(join_n));
1063 tbb::flow::make_edge(buffering_n, tbb::flow::input_port<1>(join_n));
1064 tbb::flow::make_edge(join_n, end_receiver);
1065
1066 NativeParallelFor(N, harness_reserving_body<ReservingNodeType, DataType, false>(reserving_n, buffering_n));
1067 g.wait_for_all();
1068
1069 ASSERT(end_receiver.my_count == N, NULL);
1070
1071 // Should not hang
1072 NativeParallelFor(N, harness_reserving_body<ReservingNodeType, DataType, true>(reserving_n, buffering_n));
1073 g.wait_for_all();
1074
1075 ASSERT(end_receiver.my_count == 2 * N, NULL);
1076}
1077
1078namespace lightweight_testing {
1079
1080typedef tbb::flow::tuple<int, int> output_tuple_type;
1081
1082template<typename NodeType>
1083class native_loop_body : NoAssign {
1084 NodeType& my_node;
1085public:
1086 native_loop_body(NodeType& node) : my_node(node) {}
1087
1088 void operator()(int) const {
1089 tbb::tbb_thread::id this_id = tbb::this_tbb_thread::get_id();
1090 my_node.try_put(this_id);
1091 }
1092};
1093
1094class concurrency_checker_body {
1095public:
1096 tbb::atomic<unsigned> my_body_count;
1097
1098 concurrency_checker_body() {
1099 my_body_count = 0;
1100 }
1101
1102 template<typename gateway_type>
1103 void operator()(const tbb::tbb_thread::id& input, gateway_type&) {
1104 increase_and_check(input);
1105 }
1106
1107 output_tuple_type operator()(const tbb::tbb_thread::id& input) {
1108 increase_and_check(input);
1109 return output_tuple_type();
1110 }
1111
1112private:
1113 void increase_and_check(const tbb::tbb_thread::id& input) {
1114 ++my_body_count;
1115 tbb::tbb_thread::id body_thread_id = tbb::this_tbb_thread::get_id();
1116 ASSERT(input == body_thread_id, "Body executed as not lightweight");
1117 }
1118};
1119
1120template<typename NodeType>
1121void test_unlimited_lightweight_execution(unsigned N) {
1122 tbb::flow::graph g;
1123 NodeType node(g, tbb::flow::unlimited, concurrency_checker_body());
1124
1125 NativeParallelFor(N, native_loop_body<NodeType>(node));
1126 g.wait_for_all();
1127
1128 concurrency_checker_body body = tbb::flow::copy_body<concurrency_checker_body>(node);
1129 ASSERT(body.my_body_count == N, "Body needs to be executed N times");
1130}
1131
1132// Using TBB implementation of condition variable
1133// not to include std header, which has problems with old GCC
1134using tbb::interface5::condition_variable;
1135using tbb::interface5::unique_lock;
1136
1137tbb::mutex m;
1138condition_variable lightweight_condition;
1139bool work_submitted;
1140bool lightweight_work_processed;
1141
1142template<typename NodeType>
1143class native_loop_limited_body : NoAssign {
1144 NodeType& my_node;
1145 Harness::SpinBarrier& my_barrier;
1146public:
1147 native_loop_limited_body(NodeType& node, Harness::SpinBarrier& barrier):
1148 my_node(node), my_barrier(barrier) {}
1149 void operator()(int) const {
1150 tbb::tbb_thread::id this_id = tbb::this_tbb_thread::get_id();
1151 my_node.try_put(this_id);
1152 if(!lightweight_work_processed) {
1153 my_barrier.wait();
1154 work_submitted = true;
1155 lightweight_condition.notify_all();
1156 }
1157 }
1158};
1159
1160struct condition_predicate {
1161 bool operator()() {
1162 return work_submitted;
1163 }
1164};
1165
1166class limited_lightweight_checker_body {
1167public:
1168 tbb::atomic<unsigned> my_body_count;
1169 tbb::atomic<unsigned> my_lightweight_count;
1170 tbb::atomic<unsigned> my_task_count;
1171 limited_lightweight_checker_body() {
1172 my_body_count = 0;
1173 my_lightweight_count = 0;
1174 my_task_count = 0;
1175 }
1176private:
1177 void increase_and_check(const tbb::tbb_thread::id& /*input*/) {
1178 ++my_body_count;
1179 bool is_task = tbb::task::self().state() == tbb::task::executing;
1180 if(is_task) {
1181 ++my_task_count;
1182 } else {
1183 unique_lock<tbb::mutex> lock(m);
1184 lightweight_condition.wait(lock, condition_predicate());
1185 ++my_lightweight_count;
1186 lightweight_work_processed = true;
1187 }
1188 }
1189public:
1190 template<typename gateway_type>
1191 void operator()(const tbb::tbb_thread::id& input, gateway_type&) {
1192 increase_and_check(input);
1193 }
1194 output_tuple_type operator()(const tbb::tbb_thread::id& input) {
1195 increase_and_check(input);
1196 return output_tuple_type();
1197 }
1198};
1199
1200template<typename NodeType>
1201void test_limited_lightweight_execution(unsigned N, unsigned concurrency) {
1202 ASSERT(concurrency != tbb::flow::unlimited,
1203 "Test for limited concurrency cannot be called with unlimited concurrency argument");
1204 tbb::flow::graph g;
1205 NodeType node(g, concurrency, limited_lightweight_checker_body());
1206 // Execute first body as lightweight, then wait for all other threads to fill internal buffer.
1207 // Then unblock the lightweightd thread and check if other body executions are inside tbb task.
1208 Harness::SpinBarrier barrier(N - concurrency);
1209 NativeParallelFor(N, native_loop_limited_body<NodeType>(node, barrier));
1210 g.wait_for_all();
1211 limited_lightweight_checker_body body = tbb::flow::copy_body<limited_lightweight_checker_body>(node);
1212 ASSERT(body.my_body_count == N, "Body needs to be executed N times");
1213 ASSERT(body.my_lightweight_count == concurrency, "Body needs to be executed as lightweight once");
1214 ASSERT(body.my_task_count == N - concurrency, "Body needs to be executed as not lightweight N - 1 times");
1215 work_submitted = false;
1216 lightweight_work_processed = false;
1217}
1218
1219template<typename NodeType>
1220void test_lightweight(unsigned N) {
1221 test_unlimited_lightweight_execution<NodeType>(N);
1222 test_limited_lightweight_execution<NodeType>(N, tbb::flow::serial);
1223 test_limited_lightweight_execution<NodeType>(N, (std::min)(tbb::tbb_thread::hardware_concurrency() / 2, N/2));
1224}
1225
1226template<template<typename, typename, typename, typename> class NodeType>
1227void test(unsigned N) {
1228 typedef tbb::tbb_thread::id input_type;
1229 typedef tbb::cache_aligned_allocator<input_type> allocator_type;
1230 typedef NodeType<input_type, output_tuple_type, tbb::flow::queueing_lightweight, allocator_type> node_type;
1231 test_lightweight<node_type>(N);
1232}
1233
1234}
1235
1236#endif
1237