1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB_flow_graph_H
18#define __TBB_flow_graph_H
19
20#include "tbb_stddef.h"
21#include "atomic.h"
22#include "spin_mutex.h"
23#include "null_mutex.h"
24#include "spin_rw_mutex.h"
25#include "null_rw_mutex.h"
26#include "task.h"
27#include "cache_aligned_allocator.h"
28#include "tbb_exception.h"
29#include "internal/_template_helpers.h"
30#include "internal/_aggregator_impl.h"
31#include "tbb_profiling.h"
32#include "task_arena.h"
33
34#if __TBB_PREVIEW_ASYNC_MSG
35#include <vector> // std::vector in internal::async_storage
36#include <memory> // std::shared_ptr in async_msg
37#endif
38
39#if __TBB_PREVIEW_STREAMING_NODE
40// For streaming_node
41#include <array> // std::array
42#include <unordered_map> // std::unordered_map
43#include <type_traits> // std::decay, std::true_type, std::false_type
44#endif // __TBB_PREVIEW_STREAMING_NODE
45
46#if TBB_DEPRECATED_FLOW_ENQUEUE
47#define FLOW_SPAWN(a) tbb::task::enqueue((a))
48#else
49#define FLOW_SPAWN(a) tbb::task::spawn((a))
50#endif
51
52// use the VC10 or gcc version of tuple if it is available.
53#if __TBB_CPP11_TUPLE_PRESENT
54 #include <tuple>
55namespace tbb {
56 namespace flow {
57 using std::tuple;
58 using std::tuple_size;
59 using std::tuple_element;
60 using std::get;
61 }
62}
63#else
64 #include "compat/tuple"
65#endif
66
67#include<list>
68#include<queue>
69
70/** @file
71 \brief The graph related classes and functions
72
73 There are some applications that best express dependencies as messages
74 passed between nodes in a graph. These messages may contain data or
 simply act as signals that a predecessor has completed. The graph
76 class and its associated node classes can be used to express such
77 applications.
78*/
79
80namespace tbb {
81namespace flow {
82
//! An enumeration that provides the two most common concurrency levels: unlimited and serial
enum concurrency { unlimited = 0, serial = 1 };
85
86namespace interface10 {
87
//! A generic null type, used as a placeholder where no meaningful data is exchanged
struct null_type {};

//! An empty class used for messages that mean "I'm done"
class continue_msg {};
93
//! Forward declaration section
template< typename T > class sender;
template< typename T > class receiver;
class continue_receiver;
} // namespace interface10
namespace interface11 {
template< typename T, typename U > class limiter_node; // needed for resetting decrementer
}
namespace interface10 {
template< typename R, typename B > class run_and_put_task;
104
105namespace internal {
106
// Forward declarations of the internal edge caches that nodes use to track
// their predecessors and successors.
template<typename T, typename M> class successor_cache;
template<typename T, typename M> class broadcast_cache;
template<typename T, typename M> class round_robin_cache;
template<typename T, typename M> class predecessor_cache;
template<typename T, typename M> class reservable_predecessor_cache;
112
113#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
114// Holder of edges both for caches and for those nodes which do not have predecessor caches.
115// C == receiver< ... > or sender< ... >, depending.
116template<typename C>
117class edge_container {
118
119public:
120 typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
121
122 void add_edge(C &s) {
123 built_edges.push_back(&s);
124 }
125
126 void delete_edge(C &s) {
127 for (typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i) {
128 if (*i == &s) {
129 (void)built_edges.erase(i);
130 return; // only remove one predecessor per request
131 }
132 }
133 }
134
135 void copy_edges(edge_list_type &v) {
136 v = built_edges;
137 }
138
139 size_t edge_count() {
140 return (size_t)(built_edges.size());
141 }
142
143 void clear() {
144 built_edges.clear();
145 }
146
147 // methods remove the statement from all predecessors/successors liste in the edge
148 // container.
149 template< typename S > void sender_extract(S &s);
150 template< typename R > void receiver_extract(R &r);
151
152private:
153 edge_list_type built_edges;
154}; // class edge_container
155#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
156
157} // namespace internal
158
159} // namespace interface10
160} // namespace flow
161} // namespace tbb
162
163//! The graph class
164#include "internal/_flow_graph_impl.h"
165
166namespace tbb {
167namespace flow {
168namespace interface10 {
169
170// enqueue left task if necessary. Returns the non-enqueued task if there is one.
171static inline tbb::task *combine_tasks(graph& g, tbb::task * left, tbb::task * right) {
172 // if no RHS task, don't change left.
173 if (right == NULL) return left;
174 // right != NULL
175 if (left == NULL) return right;
176 if (left == SUCCESSFULLY_ENQUEUED) return right;
177 // left contains a task
178 if (right != SUCCESSFULLY_ENQUEUED) {
179 // both are valid tasks
180 internal::spawn_in_graph_arena(g, *left);
181 return right;
182 }
183 return left;
184}
185
186#if __TBB_PREVIEW_ASYNC_MSG
187
188template < typename T > class async_msg;
189
190namespace internal {
191
192template < typename T > class async_storage;
193
194template< typename T, typename = void >
195struct async_helpers {
196 typedef async_msg<T> async_type;
197 typedef T filtered_type;
198
199 static const bool is_async_type = false;
200
201 static const void* to_void_ptr(const T& t) {
202 return static_cast<const void*>(&t);
203 }
204
205 static void* to_void_ptr(T& t) {
206 return static_cast<void*>(&t);
207 }
208
209 static const T& from_void_ptr(const void* p) {
210 return *static_cast<const T*>(p);
211 }
212
213 static T& from_void_ptr(void* p) {
214 return *static_cast<T*>(p);
215 }
216
217 static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
218 if (is_async) {
219 // This (T) is NOT async and incoming 'A<X> t' IS async
220 // Get data from async_msg
221 const async_msg<filtered_type>& msg = async_helpers< async_msg<filtered_type> >::from_void_ptr(p);
222 task* const new_task = msg.my_storage->subscribe(*this_recv, this_recv->graph_reference());
223 // finalize() must be called after subscribe() because set() can be called in finalize()
224 // and 'this_recv' client must be subscribed by this moment
225 msg.finalize();
226 return new_task;
227 }
228 else {
229 // Incoming 't' is NOT async
230 return this_recv->try_put_task(from_void_ptr(p));
231 }
232 }
233};
234
235template< typename T >
236struct async_helpers< T, typename std::enable_if< std::is_base_of<async_msg<typename T::async_msg_data_type>, T>::value >::type > {
237 typedef T async_type;
238 typedef typename T::async_msg_data_type filtered_type;
239
240 static const bool is_async_type = true;
241
242 // Receiver-classes use const interfaces
243 static const void* to_void_ptr(const T& t) {
244 return static_cast<const void*>(&static_cast<const async_msg<filtered_type>&>(t));
245 }
246
247 static void* to_void_ptr(T& t) {
248 return static_cast<void*>(&static_cast<async_msg<filtered_type>&>(t));
249 }
250
251 // Sender-classes use non-const interfaces
252 static const T& from_void_ptr(const void* p) {
253 return *static_cast<const T*>(static_cast<const async_msg<filtered_type>*>(p));
254 }
255
256 static T& from_void_ptr(void* p) {
257 return *static_cast<T*>(static_cast<async_msg<filtered_type>*>(p));
258 }
259
260 // Used in receiver<T> class
261 static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
262 if (is_async) {
263 // Both are async
264 return this_recv->try_put_task(from_void_ptr(p));
265 }
266 else {
267 // This (T) is async and incoming 'X t' is NOT async
268 // Create async_msg for X
269 const filtered_type& t = async_helpers<filtered_type>::from_void_ptr(p);
270 const T msg(t);
271 return this_recv->try_put_task(msg);
272 }
273 }
274};
275
276class untyped_receiver;
277
//! Type-erased sender interface used when async_msg support is enabled.
// Payloads cross this interface as void* plus an "is_async" flag; the typed
// wrapper sender<T> translates them via internal::async_helpers.
class untyped_sender {
    template< typename, typename > friend class internal::predecessor_cache;
    template< typename, typename > friend class internal::reservable_predecessor_cache;
public:
    //! The successor type for this node
    typedef untyped_receiver successor_type;

    virtual ~untyped_sender() {}

    // NOTE: Following part of PUBLIC section is copy-paste from original sender<T> class

    // TODO: Prevent untyped successor registration

    //! Add a new successor to this node
    virtual bool register_successor( successor_type &r ) = 0;

    //! Removes a successor from this node
    virtual bool remove_successor( successor_type &r ) = 0;

    //! Releases the reserved item (default: nothing reserved)
    virtual bool try_release( ) { return false; }

    //! Consumes the reserved item (default: nothing reserved)
    virtual bool try_consume( ) { return false; }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! interface to record edges for traversal & deletion
    typedef internal::edge_container<successor_type> built_successors_type;
    typedef built_successors_type::edge_list_type successor_list_type;
    virtual built_successors_type &built_successors() = 0;
    virtual void internal_add_built_successor( successor_type & ) = 0;
    virtual void internal_delete_built_successor( successor_type & ) = 0;
    virtual void copy_successors( successor_list_type &) = 0;
    virtual size_t successor_count() = 0;
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
protected:
    //! Request an item from the sender
    // Erases X to void* (via async_helpers<X>) before dispatching to the virtual wrapper.
    template< typename X >
    bool try_get( X &t ) {
        return try_get_wrapper( internal::async_helpers<X>::to_void_ptr(t), internal::async_helpers<X>::is_async_type );
    }

    //! Reserves an item in the sender
    // Same type-erasure scheme as try_get().
    template< typename X >
    bool try_reserve( X &t ) {
        return try_reserve_wrapper( internal::async_helpers<X>::to_void_ptr(t), internal::async_helpers<X>::is_async_type );
    }

    virtual bool try_get_wrapper( void* p, bool is_async ) = 0;
    virtual bool try_reserve_wrapper( void* p, bool is_async ) = 0;
};
329
//! Type-erased receiver interface used when async_msg support is enabled.
// Items arrive as void* plus an "is_async" flag; the typed wrapper receiver<T>
// translates them via internal::async_helpers.
class untyped_receiver {
    template< typename, typename > friend class run_and_put_task;

    template< typename, typename > friend class internal::broadcast_cache;
    template< typename, typename > friend class internal::round_robin_cache;
    template< typename, typename > friend class internal::successor_cache;

#if __TBB_PREVIEW_OPENCL_NODE
    template< typename, typename > friend class proxy_dependency_receiver;
#endif /* __TBB_PREVIEW_OPENCL_NODE */
public:
    //! The predecessor type for this node
    typedef untyped_sender predecessor_type;

    //! Destructor
    virtual ~untyped_receiver() {}

    //! Put an item to the receiver
    // Returns true if the item was accepted; any continuation task returned by
    // try_put_task is spawned in the graph's arena.
    template<typename X>
    bool try_put(const X& t) {
        task *res = try_put_task(t);
        if (!res) return false;
        if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
        return true;
    }

    // NOTE: Following part of PUBLIC section is copy-paste from original receiver<T> class

    // TODO: Prevent untyped predecessor registration

    //! Add a predecessor to the node
    virtual bool register_predecessor( predecessor_type & ) { return false; }

    //! Remove a predecessor from the node
    virtual bool remove_predecessor( predecessor_type & ) { return false; }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef internal::edge_container<predecessor_type> built_predecessors_type;
    typedef built_predecessors_type::edge_list_type predecessor_list_type;
    virtual built_predecessors_type &built_predecessors() = 0;
    virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
    virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
    virtual void copy_predecessors( predecessor_list_type & ) = 0;
    virtual size_t predecessor_count() = 0;
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
protected:
    // Erases X to void* (via async_helpers<X>) before dispatching to the virtual wrapper.
    template<typename X>
    task *try_put_task(const X& t) {
        return try_put_task_wrapper( internal::async_helpers<X>::to_void_ptr(t), internal::async_helpers<X>::is_async_type );
    }

    virtual task* try_put_task_wrapper( const void* p, bool is_async ) = 0;

    virtual graph& graph_reference() = 0;

    // NOTE: Following part of PROTECTED and PRIVATE sections is copy-paste from original receiver<T> class

    //! put receiver back in initial state
    virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;

    virtual bool is_continue_receiver() { return false; }
};
392
393} // namespace internal
394
395//! Pure virtual template class that defines a sender of messages of type T
396template< typename T >
397class sender : public internal::untyped_sender {
398public:
399 //! The output type of this sender
400 typedef T output_type;
401
402 typedef typename internal::async_helpers<T>::filtered_type filtered_type;
403
404 //! Request an item from the sender
405 virtual bool try_get( T & ) { return false; }
406
407 //! Reserves an item in the sender
408 virtual bool try_reserve( T & ) { return false; }
409
410protected:
411 virtual bool try_get_wrapper( void* p, bool is_async ) __TBB_override {
412 // Both async OR both are NOT async
413 if ( internal::async_helpers<T>::is_async_type == is_async ) {
414 return try_get( internal::async_helpers<T>::from_void_ptr(p) );
415 }
416 // Else: this (T) is async OR incoming 't' is async
417 __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_get()");
418 return false;
419 }
420
421 virtual bool try_reserve_wrapper( void* p, bool is_async ) __TBB_override {
422 // Both async OR both are NOT async
423 if ( internal::async_helpers<T>::is_async_type == is_async ) {
424 return try_reserve( internal::async_helpers<T>::from_void_ptr(p) );
425 }
426 // Else: this (T) is async OR incoming 't' is async
427 __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_reserve()");
428 return false;
429 }
430}; // class sender<T>
431
//! Pure virtual template class that defines a receiver of messages of type T
template< typename T >
class receiver : public internal::untyped_receiver {
    template< typename > friend class internal::async_storage;
    template< typename, typename > friend struct internal::async_helpers;
public:
    //! The input type of this receiver
    typedef T input_type;

    //! The underlying payload type (equals T when T is not an async_msg)
    typedef typename internal::async_helpers<T>::filtered_type filtered_type;

    //! Put a plain (filtered) item to the receiver
    bool try_put( const typename internal::async_helpers<T>::filtered_type& t ) {
        return internal::untyped_receiver::try_put(t);
    }

    //! Put an async_msg-typed item to the receiver
    bool try_put( const typename internal::async_helpers<T>::async_type& t ) {
        return internal::untyped_receiver::try_put(t);
    }

protected:
    // Delegates to async_helpers to reconcile async/non-async payloads with T.
    virtual task* try_put_task_wrapper( const void *p, bool is_async ) __TBB_override {
        return internal::async_helpers<T>::try_put_task_wrapper_impl(this, p, is_async);
    }

    //! Put item to successor; return task to run the successor if possible.
    virtual task *try_put_task(const T& t) = 0;

}; // class receiver<T>
461
462#else // __TBB_PREVIEW_ASYNC_MSG
463
//! Pure virtual template class that defines a sender of messages of type T
template< typename T >
class sender {
public:
    //! The output type of this sender
    typedef T output_type;

    //! The successor type for this node
    typedef receiver<T> successor_type;

    virtual ~sender() {}

    // NOTE: Following part of PUBLIC section is partly copy-pasted in sender<T> under #if __TBB_PREVIEW_ASYNC_MSG

    //! Add a new successor to this node
    virtual bool register_successor( successor_type &r ) = 0;

    //! Removes a successor from this node
    virtual bool remove_successor( successor_type &r ) = 0;

    //! Request an item from the sender (default: nothing to get)
    virtual bool try_get( T & ) { return false; }

    //! Reserves an item in the sender (default: nothing to reserve)
    virtual bool try_reserve( T & ) { return false; }

    //! Releases the reserved item (default: nothing reserved)
    virtual bool try_release( ) { return false; }

    //! Consumes the reserved item (default: nothing reserved)
    virtual bool try_consume( ) { return false; }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! interface to record edges for traversal & deletion
    typedef typename internal::edge_container<successor_type> built_successors_type;
    typedef typename built_successors_type::edge_list_type successor_list_type;
    virtual built_successors_type &built_successors() = 0;
    virtual void internal_add_built_successor( successor_type & ) = 0;
    virtual void internal_delete_built_successor( successor_type & ) = 0;
    virtual void copy_successors( successor_list_type &) = 0;
    virtual size_t successor_count() = 0;
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
}; // class sender<T>
507
//! Pure virtual template class that defines a receiver of messages of type T
template< typename T >
class receiver {
public:
    //! The input type of this receiver
    typedef T input_type;

    //! The predecessor type for this node
    typedef sender<T> predecessor_type;

    //! Destructor
    virtual ~receiver() {}

    //! Put an item to the receiver
    // Returns true if the item was accepted; any continuation task returned by
    // try_put_task is spawned in the graph's arena.
    bool try_put( const T& t ) {
        task *res = try_put_task(t);
        if (!res) return false;
        if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
        return true;
    }

    //! put item to successor; return task to run the successor if possible.
protected:
    template< typename R, typename B > friend class run_and_put_task;
    template< typename X, typename Y > friend class internal::broadcast_cache;
    template< typename X, typename Y > friend class internal::round_robin_cache;
    virtual task *try_put_task(const T& t) = 0;
    virtual graph& graph_reference() = 0;
public:
    // NOTE: Following part of PUBLIC and PROTECTED sections is copy-pasted in receiver<T> under #if __TBB_PREVIEW_ASYNC_MSG

    //! Add a predecessor to the node
    virtual bool register_predecessor( predecessor_type & ) { return false; }

    //! Remove a predecessor from the node
    virtual bool remove_predecessor( predecessor_type & ) { return false; }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
    typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
    virtual built_predecessors_type &built_predecessors() = 0;
    virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
    virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
    virtual void copy_predecessors( predecessor_list_type & ) = 0;
    virtual size_t predecessor_count() = 0;
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */

protected:
    //! put receiver back in initial state
    virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;

    template<typename TT, typename M> friend class internal::successor_cache;
    virtual bool is_continue_receiver() { return false; }

#if __TBB_PREVIEW_OPENCL_NODE
    template< typename, typename > friend class proxy_dependency_receiver;
#endif /* __TBB_PREVIEW_OPENCL_NODE */
}; // class receiver<T>
566
567#endif // __TBB_PREVIEW_ASYNC_MSG
568
569//! Base class for receivers of completion messages
570/** These receivers automatically reset, but cannot be explicitly waited on */
571class continue_receiver : public receiver< continue_msg > {
572public:
573
574 //! The input type
575 typedef continue_msg input_type;
576
577 //! The predecessor type for this node
578 typedef receiver<input_type>::predecessor_type predecessor_type;
579
580 //! Constructor
581 explicit continue_receiver(
582 __TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority)) {
583 my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
584 my_current_count = 0;
585 __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = priority; )
586 }
587
588 //! Copy constructor
589 continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
590 my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
591 my_current_count = 0;
592 __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = src.my_priority; )
593 }
594
595 //! Increments the trigger threshold
596 bool register_predecessor( predecessor_type & ) __TBB_override {
597 spin_mutex::scoped_lock l(my_mutex);
598 ++my_predecessor_count;
599 return true;
600 }
601
602 //! Decrements the trigger threshold
603 /** Does not check to see if the removal of the predecessor now makes the current count
604 exceed the new threshold. So removing a predecessor while the graph is active can cause
605 unexpected results. */
606 bool remove_predecessor( predecessor_type & ) __TBB_override {
607 spin_mutex::scoped_lock l(my_mutex);
608 --my_predecessor_count;
609 return true;
610 }
611
612#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
613 typedef internal::edge_container<predecessor_type> built_predecessors_type;
614 typedef built_predecessors_type::edge_list_type predecessor_list_type;
615 built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
616
617 void internal_add_built_predecessor( predecessor_type &s) __TBB_override {
618 spin_mutex::scoped_lock l(my_mutex);
619 my_built_predecessors.add_edge( s );
620 }
621
622 void internal_delete_built_predecessor( predecessor_type &s) __TBB_override {
623 spin_mutex::scoped_lock l(my_mutex);
624 my_built_predecessors.delete_edge(s);
625 }
626
627 void copy_predecessors( predecessor_list_type &v) __TBB_override {
628 spin_mutex::scoped_lock l(my_mutex);
629 my_built_predecessors.copy_edges(v);
630 }
631
632 size_t predecessor_count() __TBB_override {
633 spin_mutex::scoped_lock l(my_mutex);
634 return my_built_predecessors.edge_count();
635 }
636
637#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
638
639protected:
640 template< typename R, typename B > friend class run_and_put_task;
641 template<typename X, typename Y> friend class internal::broadcast_cache;
642 template<typename X, typename Y> friend class internal::round_robin_cache;
643 // execute body is supposed to be too small to create a task for.
644 task *try_put_task( const input_type & ) __TBB_override {
645 {
646 spin_mutex::scoped_lock l(my_mutex);
647 if ( ++my_current_count < my_predecessor_count )
648 return SUCCESSFULLY_ENQUEUED;
649 else
650 my_current_count = 0;
651 }
652 task * res = execute();
653 return res? res : SUCCESSFULLY_ENQUEUED;
654 }
655
656#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
657 // continue_receiver must contain its own built_predecessors because it does
658 // not have a node_cache.
659 built_predecessors_type my_built_predecessors;
660#endif
661 spin_mutex my_mutex;
662 int my_predecessor_count;
663 int my_current_count;
664 int my_initial_predecessor_count;
665 __TBB_FLOW_GRAPH_PRIORITY_EXPR( node_priority_t my_priority; )
666 // the friend declaration in the base class did not eliminate the "protected class"
667 // error in gcc 4.1.2
668 template<typename U, typename V> friend class tbb::flow::interface11::limiter_node;
669
670 void reset_receiver( reset_flags f ) __TBB_override {
671 my_current_count = 0;
672 if (f & rf_clear_edges) {
673#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
674 my_built_predecessors.clear();
675#endif
676 my_predecessor_count = my_initial_predecessor_count;
677 }
678 }
679
680 //! Does whatever should happen when the threshold is reached
681 /** This should be very fast or else spawn a task. This is
682 called while the sender is blocked in the try_put(). */
683 virtual task * execute() = 0;
684 template<typename TT, typename M> friend class internal::successor_cache;
685 bool is_continue_receiver() __TBB_override { return true; }
686
687}; // class continue_receiver
688
689} // interfaceX
690
691#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
    //! Extracts a key of type K from a message by calling its key() accessor.
    template <typename K, typename T>
    K key_from_message( const T &t ) {
        return t.key();
    }
696#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
697
698 using interface10::sender;
699 using interface10::receiver;
700 using interface10::continue_receiver;
701} // flow
702} // tbb
703
704#include "internal/_flow_graph_trace_impl.h"
705#include "internal/_tbb_hash_compare_impl.h"
706
707namespace tbb {
708namespace flow {
709namespace interface10 {
710
711#include "internal/_flow_graph_body_impl.h"
712#include "internal/_flow_graph_cache_impl.h"
713#include "internal/_flow_graph_types_impl.h"
714#if __TBB_PREVIEW_ASYNC_MSG
715#include "internal/_flow_graph_async_msg_impl.h"
716#endif
717using namespace internal::graph_policy_namespace;
718
719template <typename C, typename N>
720graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
721{
722 if (begin) current_node = my_graph->my_nodes;
723 //else it is an end iterator by default
724}
725
726template <typename C, typename N>
727typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
728 __TBB_ASSERT(current_node, "graph_iterator at end");
729 return *operator->();
730}
731
template <typename C, typename N>
typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
    // NULL for an end iterator; callers must not dereference in that case.
    return current_node;
}
736
737template <typename C, typename N>
738void graph_iterator<C,N>::internal_forward() {
739 if (current_node) current_node = current_node->next;
740}
741
//! Constructs a graph with an isolated, graph-owned task_group_context
inline graph::graph() : my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
    prepare_task_arena();
    own_context = true;    // this graph allocated the context and must delete it
    cancelled = false;
    caught_exception = false;
    my_context = new task_group_context(tbb::internal::FLOW_TASKS);
    // root task anchors all graph tasks; its ref count is adjusted by reserve_wait()/release_wait()
    my_root_task = (new (task::allocate_root(*my_context)) empty_task);
    my_root_task->set_ref_count(1);
    tbb::internal::fgt_graph(this);
    my_is_active = true;
}
754
//! Constructs a graph that uses a caller-supplied task_group_context (not owned)
inline graph::graph(task_group_context& use_this_context) :
    my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
    prepare_task_arena();
    own_context = false;   // caller retains ownership of the context
    my_root_task = (new (task::allocate_root(*my_context)) empty_task);
    my_root_task->set_ref_count(1);
    tbb::internal::fgt_graph(this);
    my_is_active = true;
}
764
//! Waits for outstanding work, then tears down the root task, context, and arena
inline graph::~graph() {
    wait_for_all();
    my_root_task->set_ref_count(0);
    tbb::task::destroy(*my_root_task);
    if (own_context) delete my_context;   // only if this graph allocated it
    delete my_task_arena;
}
772
773inline void graph::reserve_wait() {
774 if (my_root_task) {
775 my_root_task->increment_ref_count();
776 tbb::internal::fgt_reserve_wait(this);
777 }
778}
779
780inline void graph::release_wait() {
781 if (my_root_task) {
782 tbb::internal::fgt_release_wait(this);
783 my_root_task->decrement_ref_count();
784 }
785}
786
787inline void graph::register_node(graph_node *n) {
788 n->next = NULL;
789 {
790 spin_mutex::scoped_lock lock(nodelist_mutex);
791 n->prev = my_nodes_last;
792 if (my_nodes_last) my_nodes_last->next = n;
793 my_nodes_last = n;
794 if (!my_nodes) my_nodes = n;
795 }
796}
797
798inline void graph::remove_node(graph_node *n) {
799 {
800 spin_mutex::scoped_lock lock(nodelist_mutex);
801 __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
802 if (n->prev) n->prev->next = n->next;
803 if (n->next) n->next->prev = n->prev;
804 if (my_nodes_last == n) my_nodes_last = n->prev;
805 if (my_nodes == n) my_nodes = n->next;
806 }
807 n->prev = n->next = NULL;
808}
809
810inline void graph::reset( reset_flags f ) {
811 // reset context
812 internal::deactivate_graph(*this);
813
814 if(my_context) my_context->reset();
815 cancelled = false;
816 caught_exception = false;
817 // reset all the nodes comprising the graph
818 for(iterator ii = begin(); ii != end(); ++ii) {
819 graph_node *my_p = &(*ii);
820 my_p->reset_node(f);
821 }
822 // Reattach the arena. Might be useful to run the graph in a particular task_arena
823 // while not limiting graph lifetime to a single task_arena::execute() call.
824 prepare_task_arena( /*reinit=*/true );
825 internal::activate_graph(*this);
826 // now spawn the tasks necessary to start the graph
827 for(task_list_type::iterator rti = my_reset_task_list.begin(); rti != my_reset_task_list.end(); ++rti) {
828 internal::spawn_in_graph_arena(*this, *(*rti));
829 }
830 my_reset_task_list.clear();
831}
832
// Iterator accessors over the graph's registered node list.
inline graph::iterator graph::begin() { return iterator(this, true); }

inline graph::iterator graph::end() { return iterator(this, false); }

inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }

inline graph::const_iterator graph::end() const { return const_iterator(this, false); }

inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }

inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
844
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
//! Sets a user-visible name for the graph, reported through the tracing hooks
inline void graph::set_name(const char *name) {
    tbb::internal::fgt_graph_desc(this, name);
}
#endif
850
inline graph_node::graph_node(graph& g) : my_graph(g) {
    // A node registers itself with its owning graph on construction ...
    my_graph.register_node(this);
}

inline graph_node::~graph_node() {
    // ... and unregisters on destruction, keeping the graph's node list accurate.
    my_graph.remove_node(this);
}
858
859#include "internal/_flow_graph_node_impl.h"
860
861//! An executable node that acts as a source, i.e. it has no predecessors
862template < typename Output >
863class source_node : public graph_node, public sender< Output > {
864public:
865 //! The type of the output message, which is complete
866 typedef Output output_type;
867
868 //! The type of successors of this node
869 typedef typename sender<output_type>::successor_type successor_type;
870
871 //Source node has no input type
872 typedef null_type input_type;
873
874#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
875 typedef typename sender<output_type>::built_successors_type built_successors_type;
876 typedef typename sender<output_type>::successor_list_type successor_list_type;
877#endif
878
879 //! Constructor for a node with a successor
880 template< typename Body >
881 source_node( graph &g, Body body, bool is_active = true )
882 : graph_node(g), my_active(is_active), init_my_active(is_active),
883 my_body( new internal::source_body_leaf< output_type, Body>(body) ),
884 my_init_body( new internal::source_body_leaf< output_type, Body>(body) ),
885 my_reserved(false), my_has_cached_item(false)
886 {
887 my_successors.set_owner(this);
888 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
889 static_cast<sender<output_type> *>(this), this->my_body );
890 }
891
892 //! Copy constructor
893 source_node( const source_node& src ) :
894 graph_node(src.my_graph), sender<Output>(),
895 my_active(src.init_my_active),
896 init_my_active(src.init_my_active), my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
897 my_reserved(false), my_has_cached_item(false)
898 {
899 my_successors.set_owner(this);
900 tbb::internal::fgt_node_with_body( tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
901 static_cast<sender<output_type> *>(this), this->my_body );
902 }
903
904 //! The destructor
905 ~source_node() { delete my_body; delete my_init_body; }
906
907#if TBB_PREVIEW_FLOW_GRAPH_TRACE
908 void set_name( const char *name ) __TBB_override {
909 tbb::internal::fgt_node_desc( this, name );
910 }
911#endif
912
913 //! Add a new successor to this node
914 bool register_successor( successor_type &r ) __TBB_override {
915 spin_mutex::scoped_lock lock(my_mutex);
916 my_successors.register_successor(r);
917 if ( my_active )
918 spawn_put();
919 return true;
920 }
921
    //! Removes a successor from this node
    bool remove_successor( successor_type &r ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);  // successor list is guarded by the node mutex
        my_successors.remove_successor(r);
        return true;
    }
928
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION

    // The methods below maintain the recorded edge lists used by the deprecated
    // node-extraction feature; all accesses serialize on my_mutex.

    built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }

    //! Records a successor edge (used by make_edge).
    void internal_add_built_successor( successor_type &r) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.internal_add_built_successor(r);
    }

    //! Removes a recorded successor edge (used by remove_edge).
    void internal_delete_built_successor( successor_type &r) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.internal_delete_built_successor(r);
    }

    //! Number of recorded successor edges.
    size_t successor_count() __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        return my_successors.successor_count();
    }

    //! Copies the recorded successor edges into v.
    void copy_successors(successor_list_type &v) __TBB_override {
        spin_mutex::scoped_lock l(my_mutex);
        my_successors.copy_successors(v);
    }
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
953
954 //! Request an item from the node
955 bool try_get( output_type &v ) __TBB_override {
956 spin_mutex::scoped_lock lock(my_mutex);
957 if ( my_reserved )
958 return false;
959
960 if ( my_has_cached_item ) {
961 v = my_cached_item;
962 my_has_cached_item = false;
963 return true;
964 }
965 // we've been asked to provide an item, but we have none. enqueue a task to
966 // provide one.
967 spawn_put();
968 return false;
969 }
970
971 //! Reserves an item.
972 bool try_reserve( output_type &v ) __TBB_override {
973 spin_mutex::scoped_lock lock(my_mutex);
974 if ( my_reserved ) {
975 return false;
976 }
977
978 if ( my_has_cached_item ) {
979 v = my_cached_item;
980 my_reserved = true;
981 return true;
982 } else {
983 return false;
984 }
985 }
986
987 //! Release a reserved item.
988 /** true = item has been released and so remains in sender, dest must request or reserve future items */
989 bool try_release( ) __TBB_override {
990 spin_mutex::scoped_lock lock(my_mutex);
991 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
992 my_reserved = false;
993 if(!my_successors.empty())
994 spawn_put();
995 return true;
996 }
997
998 //! Consumes a reserved item
999 bool try_consume( ) __TBB_override {
1000 spin_mutex::scoped_lock lock(my_mutex);
1001 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1002 my_reserved = false;
1003 my_has_cached_item = false;
1004 if ( !my_successors.empty() ) {
1005 spawn_put();
1006 }
1007 return true;
1008 }
1009
1010 //! Activates a node that was created in the inactive state
1011 void activate() {
1012 spin_mutex::scoped_lock lock(my_mutex);
1013 my_active = true;
1014 if (!my_successors.empty())
1015 spawn_put();
1016 }
1017
    //! Returns a copy of the current body, downcast to its concrete functor type Body.
    /** Body must name the exact functor type supplied at construction; the
        dynamic_cast on a reference throws std::bad_cast on a mismatch. */
    template<typename Body>
    Body copy_function_object() {
        internal::source_body<output_type> &body_ref = *this->my_body;
        return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
    }
1023
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! Disconnects the node from all successors and restores its initial state.
    void extract( ) __TBB_override {
        my_successors.built_successors().sender_extract(*this);   // removes "my_owner" == this from each successor
        my_active = init_my_active;
        my_reserved = false;
        if(my_has_cached_item) my_has_cached_item = false;  // drop any cached, undelivered item
    }
#endif
1032
1033protected:
1034
    //! resets the source_node to its initial state
    /** Restores the construction-time activity flag and drops any reservation and
        cached item.  rf_clear_edges clears the successor edges; rf_reset_bodies
        replaces the working body with a fresh clone of the initial one.  If the node
        ends up active, a production task is recorded on the graph's reset list so it
        is spawned only after the whole-graph reset completes (see create_put_task). */
    void reset_node( reset_flags f) __TBB_override {
        my_active = init_my_active;
        my_reserved =false;
        if(my_has_cached_item) {
            my_has_cached_item = false;
        }
        if(f & rf_clear_edges) my_successors.clear();
        if(f & rf_reset_bodies) {
            // Swap in a clone of the pristine body, then free the old working copy.
            internal::source_body<output_type> *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
        if(my_active)
            internal::add_task_to_graph_reset_list(this->my_graph, create_put_task());
    }
1051
private:
    spin_mutex my_mutex;                                   // guards all mutable state below
    bool my_active;                                        // node currently produces items
    bool init_my_active;                                   // activity flag at construction (restored on reset)
    internal::source_body<output_type> *my_body;           // working copy of the user body (owned)
    internal::source_body<output_type> *my_init_body;      // pristine copy used by reset/copy-ctor (owned)
    internal::broadcast_cache< output_type > my_successors;
    bool my_reserved;                                      // the cached item is reserved by a consumer
    bool my_has_cached_item;                               // my_cached_item holds a valid value
    output_type my_cached_item;
1062
1063 // used by apply_body_bypass, can invoke body of node.
1064 bool try_reserve_apply_body(output_type &v) {
1065 spin_mutex::scoped_lock lock(my_mutex);
1066 if ( my_reserved ) {
1067 return false;
1068 }
1069 if ( !my_has_cached_item ) {
1070 tbb::internal::fgt_begin_body( my_body );
1071 bool r = (*my_body)(my_cached_item);
1072 tbb::internal::fgt_end_body( my_body );
1073 if (r) {
1074 my_has_cached_item = true;
1075 }
1076 }
1077 if ( my_has_cached_item ) {
1078 v = my_cached_item;
1079 my_reserved = true;
1080 return true;
1081 } else {
1082 return false;
1083 }
1084 }
1085
    // when resetting, and if the source_node was created with my_active == true, then
    // when we reset the node we must store a task to run the node, and spawn it only
    // after the reset is complete and is_active() is again true. This is why we don't
    // test for is_active() here.
    //! Allocates (but does not spawn) a task that will run apply_body_bypass on this node.
    task* create_put_task() {
        return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
                 internal:: source_task_bypass < source_node< output_type > >( *this ) );
    }
1094
1095 //! Spawns a task that applies the body
1096 void spawn_put( ) {
1097 if(internal::is_graph_active(this->my_graph)) {
1098 internal::spawn_in_graph_arena(this->my_graph, *create_put_task());
1099 }
1100 }
1101
    friend class internal::source_task_bypass< source_node< output_type > >;
    //! Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
    /** Reserves an item (producing one via the body if needed), then offers it to
        successors: if a successor accepted it the reservation is consumed, otherwise
        it is released so the item stays cached.  Returns the successor task to
        bypass-execute, or NULL if nothing was produced. */
    task * apply_body_bypass( ) {
        output_type v;
        if ( !try_reserve_apply_body(v) )
            return NULL;

        task *last_task = my_successors.try_put_task(v);
        if ( last_task )
            try_consume();
        else
            try_release();
        return last_task;
    }
1116}; // class source_node
1117
//! Implements a function node that supports Input -> Output
/** Combines graph_node with a function_input (buffering/concurrency management on the
    receiving side) and a function_output (successor management on the sending side).
    The Policy parameter selects queueing vs. rejecting behavior when concurrency is
    exhausted; Allocator is used for the internal input queue. */
template < typename Input, typename Output = continue_msg, typename Policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
class function_node : public graph_node, public internal::function_input<Input,Output,Policy,Allocator>, public internal::function_output<Output> {
public:
    typedef Input input_type;
    typedef Output output_type;
    typedef internal::function_input<input_type,output_type,Policy,Allocator> input_impl_type;
    typedef internal::function_input_queue<input_type, Allocator> input_queue_type;
    typedef internal::function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef typename input_impl_type::predecessor_list_type predecessor_list_type;
    typedef typename fOutput_type::successor_list_type successor_list_type;
#endif
    using input_impl_type::my_predecessors;

    //! Constructor
    // input_queue_type is allocated here, but destroyed in the function_input_base.
    // TODO: pass the graph_buffer_policy to the function_input_base so it can all
    // be done in one place.  This would be an interface-breaking change.
    /** concurrency limits the number of simultaneous body invocations; the optional
        priority argument is only available when graph priorities are enabled. */
    template< typename Body >
    function_node(
        graph &g, size_t concurrency,
        __TBB_FLOW_GRAPH_PRIORITY_ARG1( Body body, node_priority_t priority = tbb::flow::internal::no_priority )
    ) : graph_node(g), input_impl_type(g, concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
        // Register with the profiling/tracing layer.
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Copy constructor
    /** Copies construction-time configuration via input_impl_type(src); no edges are copied. */
    function_node( const function_node& src ) :
        graph_node(src.my_graph),
        input_impl_type(src),
        fOutput_type() {
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the node name reported by the tracing/profiling layer.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! Disconnects the node from its recorded predecessors and successors.
    void extract( ) __TBB_override {
        my_predecessors.built_predecessors().receiver_extract(*this);
        successors().built_successors().sender_extract(*this);
    }
#endif

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    using input_impl_type::try_put_task;

    internal::broadcast_cache<output_type> &successors () __TBB_override { return fOutput_type::my_successors; }

    //! Resets input side, and optionally clears both edge directions.
    void reset_node(reset_flags f) __TBB_override {
        input_impl_type::reset_function_input(f);
        // TODO: use clear() instead.
        if(f & rf_clear_edges) {
            successors().clear();
            my_predecessors.clear();
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
        __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
    }

}; // class function_node
1190
//! implements a function node that supports Input -> (set of outputs)
// Output is a tuple of output types.
/** Each element of the Output tuple becomes one internal::multifunction_output port;
    the body writes to any subset of those ports on each invocation.  All the
    buffering/concurrency logic lives in the multifunction_input base. */
template < typename Input, typename Output, typename Policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
class multifunction_node :
    public graph_node,
    public internal::multifunction_input
    <
        Input,
        typename internal::wrap_tuple_elements<
            tbb::flow::tuple_size<Output>::value,  // #elements in tuple
            internal::multifunction_output,        // wrap this around each element
            Output                                 // the tuple providing the types
        >::type,
        Policy,
        Allocator
    > {
protected:
    static const int N = tbb::flow::tuple_size<Output>::value;  // number of output ports
public:
    typedef Input input_type;
    typedef null_type output_type;
    typedef typename internal::wrap_tuple_elements<N,internal::multifunction_output, Output>::type output_ports_type;
    typedef internal::multifunction_input<input_type, output_ports_type, Policy, Allocator> input_impl_type;
    typedef internal::function_input_queue<input_type, Allocator> input_queue_type;
private:
    typedef typename internal::multifunction_input<input_type, output_ports_type, Policy, Allocator> base_type;
    using input_impl_type::my_predecessors;
public:
    //! Constructor: concurrency bounds simultaneous body invocations.
    template<typename Body>
    multifunction_node(
        graph &g, size_t concurrency,
        __TBB_FLOW_GRAPH_PRIORITY_ARG1( Body body, node_priority_t priority = tbb::flow::internal::no_priority )
    ) : graph_node(g), base_type(g, concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
        tbb::internal::fgt_multioutput_node_with_body<N>(
            tbb::internal::FLOW_MULTIFUNCTION_NODE,
            &this->my_graph, static_cast<receiver<input_type> *>(this),
            this->output_ports(), this->my_body
        );
    }

    //! Copy constructor: copies configuration through the base; no edges are copied.
    multifunction_node( const multifunction_node &other) :
        graph_node(other.my_graph), base_type(other) {
        tbb::internal::fgt_multioutput_node_with_body<N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
                &this->my_graph, static_cast<receiver<input_type> *>(this),
                this->output_ports(), this->my_body );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the node name reported by the tracing/profiling layer.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_multioutput_node_desc( this, name );
    }
#endif

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! Disconnects the node from its recorded predecessors; the base extracts successors.
    void extract( ) __TBB_override {
        my_predecessors.built_predecessors().receiver_extract(*this);
        base_type::extract();
    }
#endif
    // all the guts are in multifunction_input...
protected:
    void reset_node(reset_flags f) __TBB_override { base_type::reset(f); }
};  // multifunction_node
1254
1255//! split_node: accepts a tuple as input, forwards each element of the tuple to its
1256// successors. The node has unlimited concurrency, so it does not reject inputs.
1257template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
1258class split_node : public graph_node, public receiver<TupleType> {
1259 static const int N = tbb::flow::tuple_size<TupleType>::value;
1260 typedef receiver<TupleType> base_type;
1261public:
1262 typedef TupleType input_type;
1263 typedef Allocator allocator_type;
1264#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1265 typedef typename base_type::predecessor_type predecessor_type;
1266 typedef typename base_type::predecessor_list_type predecessor_list_type;
1267 typedef internal::predecessor_cache<input_type, null_mutex > predecessor_cache_type;
1268 typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
1269#endif
1270
1271 typedef typename internal::wrap_tuple_elements<
1272 N, // #elements in tuple
1273 internal::multifunction_output, // wrap this around each element
1274 TupleType // the tuple providing the types
1275 >::type output_ports_type;
1276
1277 explicit split_node(graph &g) : graph_node(g)
1278 {
1279 tbb::internal::fgt_multioutput_node<N>(tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1280 static_cast<receiver<input_type> *>(this), this->output_ports());
1281 }
1282 split_node( const split_node & other) : graph_node(other.my_graph), base_type(other)
1283 {
1284 tbb::internal::fgt_multioutput_node<N>(tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1285 static_cast<receiver<input_type> *>(this), this->output_ports());
1286 }
1287
1288#if TBB_PREVIEW_FLOW_GRAPH_TRACE
1289 void set_name( const char *name ) __TBB_override {
1290 tbb::internal::fgt_multioutput_node_desc( this, name );
1291 }
1292#endif
1293
1294 output_ports_type &output_ports() { return my_output_ports; }
1295
1296protected:
1297 task *try_put_task(const TupleType& t) __TBB_override {
1298 // Sending split messages in parallel is not justified, as overheads would prevail.
1299 // Also, we do not have successors here. So we just tell the task returned here is successful.
1300 return internal::emit_element<N>::emit_this(this->my_graph, t, output_ports());
1301 }
1302 void reset_node(reset_flags f) __TBB_override {
1303 if (f & rf_clear_edges)
1304 internal::clear_element<N>::clear_this(my_output_ports);
1305
1306 __TBB_ASSERT(!(f & rf_clear_edges) || internal::clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
1307 }
1308 void reset_receiver(reset_flags /*f*/) __TBB_override {}
1309 graph& graph_reference() __TBB_override {
1310 return my_graph;
1311 }
1312#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1313private: //! split_node doesn't use this "predecessors" functionality; so, we have "dummies" here;
1314 void extract() __TBB_override {}
1315
1316 //! Adds to list of predecessors added by make_edge
1317 void internal_add_built_predecessor(predecessor_type&) __TBB_override {}
1318
1319 //! removes from to list of predecessors (used by remove_edge)
1320 void internal_delete_built_predecessor(predecessor_type&) __TBB_override {}
1321
1322 size_t predecessor_count() __TBB_override { return 0; }
1323
1324 void copy_predecessors(predecessor_list_type&) __TBB_override {}
1325
1326 built_predecessors_type &built_predecessors() __TBB_override { return my_predessors; }
1327
1328 //! dummy member
1329 built_predecessors_type my_predessors;
1330#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1331
1332private:
1333 output_ports_type my_output_ports;
1334};
1335
//! Implements an executable node that supports continue_msg -> Output
/** The body runs only after all registered predecessors have sent a continue_msg;
    that counting logic lives in the continue_input base, successor management in
    function_output. */
template <typename Output, typename Policy = internal::Policy<void> >
class continue_node : public graph_node, public internal::continue_input<Output, Policy>,
                      public internal::function_output<Output> {
public:
    typedef continue_msg input_type;
    typedef Output output_type;
    typedef internal::continue_input<Output, Policy> input_impl_type;
    typedef internal::function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;

    //! Constructor for executable node with continue_msg -> Output
    /** The predecessor count is deduced from the edges made to this node. */
    template <typename Body >
    continue_node(
        graph &g,
        __TBB_FLOW_GRAPH_PRIORITY_ARG1( Body body, node_priority_t priority = tbb::flow::internal::no_priority )
    ) : graph_node(g), input_impl_type( g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority) ) {
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Constructor for executable node with continue_msg -> Output
    /** number_of_predecessors pre-sets how many continue_msg's are required per firing. */
    template <typename Body >
    continue_node(
        graph &g, int number_of_predecessors,
        __TBB_FLOW_GRAPH_PRIORITY_ARG1( Body body, node_priority_t priority = tbb::flow::internal::no_priority )
    ) : graph_node(g)
      , input_impl_type(g, number_of_predecessors, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Copy constructor
    /** Copies construction-time configuration via input_impl_type(src); no edges copied. */
    continue_node( const continue_node& src ) :
        graph_node(src.my_graph), input_impl_type(src),
        internal::function_output<Output>() {
        tbb::internal::fgt_node_with_body( tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the node name reported by the tracing/profiling layer.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! Disconnects the node from its recorded predecessors and successors.
    void extract() __TBB_override {
        input_impl_type::my_built_predecessors.receiver_extract(*this);
        successors().built_successors().sender_extract(*this);
    }
#endif

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    using input_impl_type::try_put_task;
    internal::broadcast_cache<output_type> &successors () __TBB_override { return fOutput_type::my_successors; }

    //! Resets the predecessor count and optionally clears successor edges.
    void reset_node(reset_flags f) __TBB_override {
        input_impl_type::reset_receiver(f);
        if(f & rf_clear_edges)successors().clear();
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
    }
}; // continue_node
1406
//! Forwards messages of type T to all successors
/** A stateless pass-through node: every message put to it is offered to every
    registered successor; nothing is buffered, and puts are never rejected. */
template <typename T>
class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
    typedef typename sender<output_type>::successor_list_type successor_list_type;
#endif
private:
    internal::broadcast_cache<input_type> my_successors;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    internal::edge_container<predecessor_type> my_built_predecessors;
    spin_mutex pred_mutex;  // serialize accesses on edge_container
#endif
public:

    explicit broadcast_node(graph& g) : graph_node(g) {
        my_successors.set_owner( this );
        // Register with the profiling/tracing layer.
        tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
                                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

    // Copy constructor: creates a node in the same graph; no edges are copied.
    broadcast_node( const broadcast_node& src ) :
        graph_node(src.my_graph), receiver<T>(), sender<T>()
    {
        my_successors.set_owner( this );
        tbb::internal::fgt_node( tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
                                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the node name reported by the tracing/profiling layer.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

    //! Adds a successor
    bool register_successor( successor_type &r ) __TBB_override {
        my_successors.register_successor( r );
        return true;
    }

    //! Removes s as a successor
    bool remove_successor( successor_type &r ) __TBB_override {
        my_successors.remove_successor( r );
        return true;
    }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Edge-recording support for the deprecated node-extraction feature.
    typedef typename sender<T>::built_successors_type built_successors_type;

    built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }

    //! Records a successor edge (used by make_edge).
    void internal_add_built_successor(successor_type &r) __TBB_override {
        my_successors.internal_add_built_successor(r);
    }

    //! Removes a recorded successor edge (used by remove_edge).
    void internal_delete_built_successor(successor_type &r) __TBB_override {
        my_successors.internal_delete_built_successor(r);
    }

    //! Number of recorded successor edges.
    size_t successor_count() __TBB_override {
        return my_successors.successor_count();
    }

    //! Copies the recorded successor edges into v.
    void copy_successors(successor_list_type &v) __TBB_override {
        my_successors.copy_successors(v);
    }

    typedef typename receiver<T>::built_predecessors_type built_predecessors_type;

    built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }

    //! Records a predecessor edge; guarded by pred_mutex.
    void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
        spin_mutex::scoped_lock l(pred_mutex);
        my_built_predecessors.add_edge(p);
    }

    //! Removes a recorded predecessor edge; guarded by pred_mutex.
    void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
        spin_mutex::scoped_lock l(pred_mutex);
        my_built_predecessors.delete_edge(p);
    }

    //! Number of recorded predecessor edges.
    size_t predecessor_count() __TBB_override {
        spin_mutex::scoped_lock l(pred_mutex);
        return my_built_predecessors.edge_count();
    }

    //! Copies the recorded predecessor edges into v.
    void copy_predecessors(predecessor_list_type &v) __TBB_override {
        spin_mutex::scoped_lock l(pred_mutex);
        my_built_predecessors.copy_edges(v);
    }

    //! Disconnects the node from its recorded predecessors and successors.
    void extract() __TBB_override {
        my_built_predecessors.receiver_extract(*this);
        my_successors.built_successors().sender_extract(*this);
    }
#endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    //! build a task to run the successor if possible.  Default is old behavior.
    /** Never rejects: with no accepting successor the put is still reported as enqueued. */
    task *try_put_task(const T& t) __TBB_override {
        task *new_task = my_successors.try_put_task(t);
        if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
        return new_task;
    }

    graph& graph_reference() __TBB_override {
        return my_graph;
    }

    void reset_receiver(reset_flags /*f*/) __TBB_override {}

    //! Optionally clears edges; the node itself holds no other state to reset.
    void reset_node(reset_flags f) __TBB_override {
        if (f&rf_clear_edges) {
           my_successors.clear();
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
           my_built_predecessors.clear();
#endif
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
    }
};  // broadcast_node
1538
1539//! Forwards messages in arbitrary order
1540template <typename T, typename A=cache_aligned_allocator<T> >
1541class buffer_node : public graph_node, public internal::reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
1542public:
1543 typedef T input_type;
1544 typedef T output_type;
1545 typedef typename receiver<input_type>::predecessor_type predecessor_type;
1546 typedef typename sender<output_type>::successor_type successor_type;
1547 typedef buffer_node<T, A> class_type;
1548#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1549 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1550 typedef typename sender<output_type>::successor_list_type successor_list_type;
1551#endif
protected:
    typedef size_t size_type;
    internal::round_robin_cache< T, null_rw_mutex > my_successors;

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    internal::edge_container<predecessor_type> my_built_predecessors;
#endif

    friend class internal::forward_task_bypass< buffer_node< T, A > >;

    //! Operation codes dispatched through the aggregator (see handle_operations_impl).
    enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
        , add_blt_succ, del_blt_succ,
        add_blt_pred, del_blt_pred,
        blt_succ_cnt, blt_pred_cnt,
        blt_succ_cpy, blt_pred_cpy   // create vector copies of preds and succs
#endif
    };

    // implements the aggregator_operation concept
    /** One record per request; type selects the op_type, the payload fields carry the
        request's in/out data, and ltask returns a forwarding task to the requester. */
    class buffer_operation : public internal::aggregated_operation< buffer_operation > {
    public:
        char type;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
        task * ltask;
        union {
            input_type *elem;
            successor_type *r;
            predecessor_type *p;
            size_t cnt_val;
            successor_list_type *svec;
            predecessor_list_type *pvec;
        };
#else
        T *elem;
        task * ltask;
        successor_type *r;
#endif
        buffer_operation(const T& e, op_type t) : type(char(t))

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            , ltask(NULL), elem(const_cast<T*>(&e))
#else
            , elem(const_cast<T*>(&e)) , ltask(NULL)
#endif
        {}
        buffer_operation(op_type t) : type(char(t)), ltask(NULL) {}
    };

    bool forwarder_busy;   // a forwarding task is currently outstanding
    typedef internal::aggregating_functor<class_type, buffer_operation> handler_type;
    friend class internal::aggregating_functor<class_type, buffer_operation>;
    internal::aggregator< handler_type, buffer_operation> my_aggregator;
1605
    //! Aggregator callback: processes a batch of queued operations (virtual so
    //! derived buffer types can substitute their own dispatch).
    virtual void handle_operations(buffer_operation *op_list) {
        handle_operations_impl(op_list, this);
    }
1609
    //! Shared implementation of the aggregator handler.
    /** derived is this same object statically typed as the most-derived buffer class so
        derived->order() binds without a virtual call.  Each queued operation is
        dispatched by op code; afterwards, if any operation made forwarding worthwhile
        and no forwarder is outstanding, a single forwarding task is created and handed
        back through the last operation's ltask field. */
    template<typename derived_type>
    void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
        __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");

        buffer_operation *tmp = NULL;
        bool try_forwarding = false;
        while (op_list) {
            tmp = op_list;
            op_list = op_list->next;
            switch (tmp->type) {
            case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
            case rem_succ: internal_rem_succ(tmp); break;
            case req_item: internal_pop(tmp); break;
            case res_item: internal_reserve(tmp); break;
            case rel_res: internal_release(tmp); try_forwarding = true; break;
            case con_res: internal_consume(tmp); try_forwarding = true; break;
            case put_item: try_forwarding = internal_push(tmp); break;
            case try_fwd_task: internal_forward_task(tmp); break;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            // edge recording
            case add_blt_succ: internal_add_built_succ(tmp); break;
            case del_blt_succ: internal_del_built_succ(tmp); break;
            case add_blt_pred: internal_add_built_pred(tmp); break;
            case del_blt_pred: internal_del_built_pred(tmp); break;
            case blt_succ_cnt: internal_succ_cnt(tmp); break;
            case blt_pred_cnt: internal_pred_cnt(tmp); break;
            case blt_succ_cpy: internal_copy_succs(tmp); break;
            case blt_pred_cpy: internal_copy_preds(tmp); break;
#endif
            }
        }

        derived->order();

        if (try_forwarding && !forwarder_busy) {
            if(internal::is_graph_active(this->my_graph)) {
                forwarder_busy = true;
                task *new_task = new(task::allocate_additional_child_of(*(this->my_graph.root_task()))) internal::
                        forward_task_bypass
                        < buffer_node<input_type, A> >(*this);
                // tmp should point to the last item handled by the aggregator. This is the operation
                // the handling thread enqueued. So modifying that record will be okay.
                // workaround for icc bug
                tbb::task *z = tmp->ltask;
                graph &g = this->my_graph;
                tmp->ltask = combine_tasks(g, z, new_task);  // in case the op generated a task
            }
        }
    }  // handle_operations
1659
1660 inline task *grab_forwarding_task( buffer_operation &op_data) {
1661 return op_data.ltask;
1662 }
1663
1664 inline bool enqueue_forwarding_task(buffer_operation &op_data) {
1665 task *ft = grab_forwarding_task(op_data);
1666 if(ft) {
1667 internal::spawn_in_graph_arena(graph_reference(), *ft);
1668 return true;
1669 }
1670 return false;
1671 }
1672
    //! This is executed by an enqueued task, the "forwarder"
    /** Repeatedly submits try_fwd_task operations to the aggregator until one fails,
        accumulating any successor tasks into a single chain that is returned for
        bypass execution. */
    virtual task *forward_task() {
        buffer_operation op_data(try_fwd_task);
        task *last_task = NULL;
        do {
            op_data.status = internal::WAIT;
            op_data.ltask = NULL;
            my_aggregator.execute(&op_data);

            // workaround for icc bug
            tbb::task *xtask = op_data.ltask;
            graph& g = this->my_graph;
            last_task = combine_tasks(g, last_task, xtask);
        } while (op_data.status ==internal::SUCCEEDED);
        return last_task;
    }
1689
    //! Register successor (runs inside the aggregator handler).
    virtual void internal_reg_succ(buffer_operation *op) {
        my_successors.register_successor(*(op->r));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    //! Remove successor (runs inside the aggregator handler).
    virtual void internal_rem_succ(buffer_operation *op) {
        my_successors.remove_successor(*(op->r));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }
1701
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Edge-recording operations for the deprecated node-extraction feature.
    // All of these run inside the aggregator handler and report completion by
    // releasing op->status.
    typedef typename sender<T>::built_successors_type built_successors_type;

    built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }

    //! Records a successor edge.
    virtual void internal_add_built_succ(buffer_operation *op) {
        my_successors.internal_add_built_successor(*(op->r));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    //! Removes a recorded successor edge.
    virtual void internal_del_built_succ(buffer_operation *op) {
        my_successors.internal_delete_built_successor(*(op->r));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    typedef typename receiver<T>::built_predecessors_type built_predecessors_type;

    built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }

    //! Records a predecessor edge.
    virtual void internal_add_built_pred(buffer_operation *op) {
        my_built_predecessors.add_edge(*(op->p));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    //! Removes a recorded predecessor edge.
    virtual void internal_del_built_pred(buffer_operation *op) {
        my_built_predecessors.delete_edge(*(op->p));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    //! Returns the successor edge count through op->cnt_val.
    virtual void internal_succ_cnt(buffer_operation *op) {
        op->cnt_val = my_successors.successor_count();
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    //! Returns the predecessor edge count through op->cnt_val.
    virtual void internal_pred_cnt(buffer_operation *op) {
        op->cnt_val = my_built_predecessors.edge_count();
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    //! Copies recorded successor edges into *(op->svec).
    virtual void internal_copy_succs(buffer_operation *op) {
        my_successors.copy_successors(*(op->svec));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    //! Copies recorded predecessor edges into *(op->pvec).
    virtual void internal_copy_preds(buffer_operation *op) {
        my_built_predecessors.copy_edges(*(op->pvec));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1752
private:
    // Hooks used by internal_forward_task_impl; derived buffer types
    // (queue_node, priority_queue_node) shadow these to change the
    // forwarding policy without virtual dispatch.

    //! Pre-forward ordering hook: nothing to do for the plain buffer.
    void order() {}

    //! True if the slot just before my_tail holds a valid item
    //! (buffer_node forwards from the back of the buffer).
    bool is_item_valid() {
        return this->my_item_valid(this->my_tail - 1);
    }

    //! Offer the back item to successors; if a successor accepts (returns a
    //! task), fold that task into last_task and destroy the forwarded item.
    void try_put_and_add_task(task*& last_task) {
        task *new_task = my_successors.try_put_task(this->back());
        if (new_task) {
            // workaround for icc bug
            graph& g = this->my_graph;
            last_task = combine_tasks(g, last_task, new_task);
            this->destroy_back();
        }
    }
1769
protected:
    //! Tries to forward valid items to successors
    virtual void internal_forward_task(buffer_operation *op) {
        internal_forward_task_impl(op, this);
    }

    //! Shared forwarding loop, parameterized on the derived node type.
    /** Calls derived->is_item_valid()/try_put_and_add_task() so each buffer
        flavor controls which item is offered.  SUCCEEDED means at least one
        item was forwarded and every successor got a chance (counter reached
        0); otherwise the forwarder is marked idle so a later put can
        re-enqueue a forward task. */
    template<typename derived_type>
    void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
        __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");

        // A reserved buffer, or one with no forwardable item, cannot forward.
        if (this->my_reserved || !derived->is_item_valid()) {
            __TBB_store_with_release(op->status, internal::FAILED);
            this->forwarder_busy = false;
            return;
        }
        // Try forwarding, giving each successor a chance
        task * last_task = NULL;
        size_type counter = my_successors.size();
        for (; counter > 0 && derived->is_item_valid(); --counter)
            derived->try_put_and_add_task(last_task);

        op->ltask = last_task; // return task
        if (last_task && !counter) {
            __TBB_store_with_release(op->status, internal::SUCCEEDED);
        }
        else {
            __TBB_store_with_release(op->status, internal::FAILED);
            forwarder_busy = false;
        }
    }
1800
    //! Append *op->elem at the back of the buffer; the base buffer never
    //! rejects a push, so the operation always SUCCEEDs.
    virtual bool internal_push(buffer_operation *op) {
        this->push_back(*(op->elem));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
        return true;
    }

    //! Pop the back item into *op->elem; FAILED when pop_back() finds
    //! no valid item to remove.
    virtual void internal_pop(buffer_operation *op) {
        if(this->pop_back(*(op->elem))) {
            __TBB_store_with_release(op->status, internal::SUCCEEDED);
        }
        else {
            __TBB_store_with_release(op->status, internal::FAILED);
        }
    }

    //! Reserve the front item, copying it into *op->elem; FAILED when
    //! reserve_front() cannot establish a reservation.
    virtual void internal_reserve(buffer_operation *op) {
        if(this->reserve_front(*(op->elem))) {
            __TBB_store_with_release(op->status, internal::SUCCEEDED);
        }
        else {
            __TBB_store_with_release(op->status, internal::FAILED);
        }
    }

    //! Consume (discard) the currently reserved front item; assumes a
    //! reservation exists, so it always SUCCEEDs.
    virtual void internal_consume(buffer_operation *op) {
        this->consume_front();
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

    //! Release the current reservation, leaving the item in the buffer;
    //! always SUCCEEDs.
    virtual void internal_release(buffer_operation *op) {
        this->release_front();
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }
1834
public:
    //! Constructor
    /** Starts with an empty, unreserved buffer and an idle forwarder.
        Points the successor cache and the aggregator's handler back at this
        node, and registers the node with the trace/profiling layer. */
    explicit buffer_node( graph &g ) : graph_node(g), internal::reservable_item_buffer<T>(),
        forwarder_busy(false) {
        my_successors.set_owner(this);
        my_aggregator.initialize_handler(handler_type(this));
        tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
                                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

    //! Copy constructor
    /** Copies only graph membership: buffered items, edges, and reservation
        state are NOT copied -- the new node starts with a fresh empty buffer
        (note the default-constructed reservable_item_buffer base). */
    buffer_node( const buffer_node& src ) : graph_node(src.my_graph),
        internal::reservable_item_buffer<T>(), receiver<T>(), sender<T>() {
        forwarder_busy = false;
        my_successors.set_owner(this);
        my_aggregator.initialize_handler(handler_type(this));
        tbb::internal::fgt_node( tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
                                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Set the node's name for flow-graph tracing.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
1860
1861 //
1862 // message sender implementation
1863 //
1864
1865 //! Adds a new successor.
1866 /** Adds successor r to the list of successors; may forward tasks. */
1867 bool register_successor( successor_type &r ) __TBB_override {
1868 buffer_operation op_data(reg_succ);
1869 op_data.r = &r;
1870 my_aggregator.execute(&op_data);
1871 (void)enqueue_forwarding_task(op_data);
1872 return true;
1873 }
1874
1875#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1876 void internal_add_built_successor( successor_type &r) __TBB_override {
1877 buffer_operation op_data(add_blt_succ);
1878 op_data.r = &r;
1879 my_aggregator.execute(&op_data);
1880 }
1881
1882 void internal_delete_built_successor( successor_type &r) __TBB_override {
1883 buffer_operation op_data(del_blt_succ);
1884 op_data.r = &r;
1885 my_aggregator.execute(&op_data);
1886 }
1887
1888 void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
1889 buffer_operation op_data(add_blt_pred);
1890 op_data.p = &p;
1891 my_aggregator.execute(&op_data);
1892 }
1893
1894 void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
1895 buffer_operation op_data(del_blt_pred);
1896 op_data.p = &p;
1897 my_aggregator.execute(&op_data);
1898 }
1899
1900 size_t predecessor_count() __TBB_override {
1901 buffer_operation op_data(blt_pred_cnt);
1902 my_aggregator.execute(&op_data);
1903 return op_data.cnt_val;
1904 }
1905
1906 size_t successor_count() __TBB_override {
1907 buffer_operation op_data(blt_succ_cnt);
1908 my_aggregator.execute(&op_data);
1909 return op_data.cnt_val;
1910 }
1911
1912 void copy_predecessors( predecessor_list_type &v ) __TBB_override {
1913 buffer_operation op_data(blt_pred_cpy);
1914 op_data.pvec = &v;
1915 my_aggregator.execute(&op_data);
1916 }
1917
1918 void copy_successors( successor_list_type &v ) __TBB_override {
1919 buffer_operation op_data(blt_succ_cpy);
1920 op_data.svec = &v;
1921 my_aggregator.execute(&op_data);
1922 }
1923
1924#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1925
    //! Removes a successor.
    /** Removes successor r from the list of successors.
        It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
    bool remove_successor( successor_type &r ) __TBB_override {
        r.remove_predecessor(*this);
        buffer_operation op_data(rem_succ);
        op_data.r = &r;
        my_aggregator.execute(&op_data);
        // even though this operation does not cause a forward, if we are the handler, and
        // a forward is scheduled, we may be the first to reach this point after the aggregator,
        // and so should check for the task.
        (void)enqueue_forwarding_task(op_data);
        return true;
    }

    //! Request an item from the buffer_node
    /** true = v contains the returned item<BR>
        false = no item has been returned */
    bool try_get( T &v ) __TBB_override {
        buffer_operation op_data(req_item);
        op_data.elem = &v;
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return (op_data.status==internal::SUCCEEDED);
    }

    //! Reserves an item.
    /** false = no item can be reserved<BR>
        true = an item is reserved */
    bool try_reserve( T &v ) __TBB_override {
        buffer_operation op_data(res_item);
        op_data.elem = &v;
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return (op_data.status==internal::SUCCEEDED);
    }

    //! Release a reserved item.
    /** true = item has been released and so remains in sender */
    bool try_release() __TBB_override {
        buffer_operation op_data(rel_res);
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return true;
    }

    //! Consumes a reserved item.
    /** true = item is removed from sender and reservation removed */
    bool try_consume() __TBB_override {
        buffer_operation op_data(con_res);
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return true;
    }
1980
protected:

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    //! receive an item, return a task *if possible
    /** Returns NULL when the put failed and no forward task resulted,
        a real task when one must be spawned by the caller, or
        SUCCESSFULLY_ENQUEUED when the item was buffered with no task. */
    task *try_put_task(const T &t) __TBB_override {
        buffer_operation op_data(t, put_item);
        my_aggregator.execute(&op_data);
        task *ft = grab_forwarding_task(op_data);
        // sequencer_nodes can return failure (if an item has been previously inserted)
        // We have to spawn the returned task if our own operation fails.

        if(ft && op_data.status ==internal::FAILED) {
            // we haven't succeeded queueing the item, but for some reason the
            // call returned a task (if another request resulted in a successful
            // forward this could happen.) Queue the task and reset the pointer.
            internal::spawn_in_graph_arena(graph_reference(), *ft); ft = NULL;
        }
        else if(!ft && op_data.status ==internal::SUCCEEDED) {
            ft = SUCCESSFULLY_ENQUEUED;
        }
        return ft;
    }

    //! The graph this node belongs to (receiver interface requirement).
    graph& graph_reference() __TBB_override {
        return my_graph;
    }

    //! Nothing receiver-side to reset; buffer state is handled in reset_node().
    void reset_receiver(reset_flags /*f*/) __TBB_override { }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
public:
    //! Disconnect this node from all built predecessor and successor edges.
    void extract() __TBB_override {
        my_built_predecessors.receiver_extract(*this);
        my_successors.built_successors().sender_extract(*this);
    }
#endif

protected:
    //! Empty the buffer and, if requested, drop all edges; forwarder goes idle.
    void reset_node( reset_flags f) __TBB_override {
        internal::reservable_item_buffer<T, A>::reset();
        // TODO: just clear structures
        if (f&rf_clear_edges) {
            my_successors.clear();
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            my_built_predecessors.clear();
#endif
        }
        forwarder_busy = false;
    }
2032}; // buffer_node
2033
2034//! Forwards messages in FIFO order
template <typename T, typename A=cache_aligned_allocator<T> >
class queue_node : public buffer_node<T, A> {
protected:
    typedef buffer_node<T, A> base_type;
    typedef typename base_type::size_type size_type;
    typedef typename base_type::buffer_operation queue_operation;
    typedef queue_node class_type;

private:
    template<typename, typename> friend class buffer_node;

    // FIFO policy hooks: unlike buffer_node, queue_node always operates on
    // the head/front of the item buffer.

    //! True when the head slot holds a valid item.
    bool is_item_valid() {
        return this->my_item_valid(this->my_head);
    }

    //! Offer the front item to successors; on acceptance, fold the returned
    //! task into last_task and remove the forwarded item from the buffer.
    void try_put_and_add_task(task*& last_task) {
        task *new_task = this->my_successors.try_put_task(this->front());
        if (new_task) {
            // workaround for icc bug
            graph& graph_ref = this->graph_reference();
            last_task = combine_tasks(graph_ref, last_task, new_task);
            this->destroy_front();
        }
    }

protected:
    //! Forward using this class's FIFO hooks (dispatches to the shared impl).
    void internal_forward_task(queue_operation *op) __TBB_override {
        this->internal_forward_task_impl(op, this);
    }

    //! Pop the FRONT item into *op->elem; FAILED if the buffer is reserved
    //! or the head slot is empty.
    void internal_pop(queue_operation *op) __TBB_override {
        if ( this->my_reserved || !this->my_item_valid(this->my_head)){
            __TBB_store_with_release(op->status, internal::FAILED);
        }
        else {
            this->pop_front(*(op->elem));
            __TBB_store_with_release(op->status, internal::SUCCEEDED);
        }
    }
    //! Reserve the front item; FAILED if already reserved or head is empty.
    void internal_reserve(queue_operation *op) __TBB_override {
        if (this->my_reserved || !this->my_item_valid(this->my_head)) {
            __TBB_store_with_release(op->status, internal::FAILED);
        }
        else {
            this->reserve_front(*(op->elem));
            __TBB_store_with_release(op->status, internal::SUCCEEDED);
        }
    }
    //! Consume (discard) the reserved front item; always SUCCEEDs.
    void internal_consume(queue_operation *op) __TBB_override {
        this->consume_front();
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }

public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    //! Constructor
    explicit queue_node( graph &g ) : base_type(g) {
        tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

    //! Copy constructor
    /** As with buffer_node, buffered items and edges are not copied. */
    queue_node( const queue_node& src) : base_type(src) {
        tbb::internal::fgt_node( tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Set the node's name for flow-graph tracing.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

protected:
    //! No extra state beyond the base buffer; delegate to buffer_node.
    void reset_node( reset_flags f) __TBB_override {
        base_type::reset_node(f);
    }
}; // queue_node
2119
2120//! Forwards messages in sequence order
template< typename T, typename A=cache_aligned_allocator<T> >
class sequencer_node : public queue_node<T, A> {
    //! Maps each incoming item to its sequence number (buffer slot index).
    internal::function_body< T, size_t > *my_sequencer;
    // my_sequencer should be a benign function and must be callable
    // from a parallel context. Does this mean it needn't be reset?
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    //! Constructor
    /** s is copied into a heap-allocated function_body owned by this node. */
    template< typename Sequencer >
    sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
        my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
        tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

    //! Copy constructor
    /** Clones the sequencer functor; buffered items are not copied. */
    sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
        my_sequencer( src.my_sequencer->clone() ) {
        tbb::internal::fgt_node( tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

    //! Destructor
    ~sequencer_node() { delete my_sequencer; }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Set the node's name for flow-graph tracing.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

protected:
    typedef typename buffer_node<T, A>::size_type size_type;
    typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;

private:
    //! Place the item at the buffer slot given by its sequence number.
    /** The item's tag is computed by my_sequencer; the buffer grows as needed
        so the slot exists.  FAILED when the tag precedes my_head (that
        sequence number was already emitted, unless duplicate mode is on) or
        when place_item() rejects the slot (e.g. already occupied). */
    bool internal_push(sequencer_operation *op) __TBB_override {
        size_type tag = (*my_sequencer)(*(op->elem));
#if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
        if (tag < this->my_head) {
            // have already emitted a message with this tag
            __TBB_store_with_release(op->status, internal::FAILED);
            return false;
        }
#endif
        // cannot modify this->my_tail now; the buffer would be inconsistent.
        size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;

        if (this->size(new_tail) > this->capacity()) {
            this->grow_my_array(this->size(new_tail));
        }
        this->my_tail = new_tail;

        const internal::op_stat res = this->place_item(tag, *(op->elem)) ? internal::SUCCEEDED : internal::FAILED;
        __TBB_store_with_release(op->status, res);
        return res ==internal::SUCCEEDED;
    }
}; // sequencer_node
2185
2186//! Forwards messages in priority order
template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
class priority_queue_node : public buffer_node<T, A> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef buffer_node<T,A> base_type;
    typedef priority_queue_node class_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    // Invariant maintained over the underlying item buffer:
    // slots [0, mark) form a binary max-heap w.r.t. Compare; slots
    // [mark, my_tail) are newly pushed, not-yet-heapified items.

    //! Constructor
    explicit priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {
        tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

    //! Copy constructor
    /** Buffered items and edges are not copied; heap starts empty. */
    priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {
        tbb::internal::fgt_node( tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Set the node's name for flow-graph tracing.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

protected:

    //! Reset heap bookkeeping, then the base buffer.
    void reset_node( reset_flags f) __TBB_override {
        mark = 0;
        base_type::reset_node(f);
    }

    typedef typename buffer_node<T, A>::size_type size_type;
    typedef typename buffer_node<T, A>::item_type item_type;
    typedef typename buffer_node<T, A>::buffer_operation prio_operation;

    //! Tries to forward valid items to successors
    void internal_forward_task(prio_operation *op) __TBB_override {
        this->internal_forward_task_impl(op, this);
    }

    //! Dispatch aggregated operations with this class's hooks (e.g. order()).
    void handle_operations(prio_operation *op_list) __TBB_override {
        this->handle_operations_impl(op_list, this);
    }

    //! Append the item after the heap region; heapified lazily via order().
    bool internal_push(prio_operation *op) __TBB_override {
        prio_push(*(op->elem));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
        return true;
    }

    //! Pop the highest-priority item into *op->elem; FAILED when reserved or empty.
    void internal_pop(prio_operation *op) __TBB_override {
        // if empty or already reserved, don't pop
        if ( this->my_reserved == true || this->my_tail == 0 ) {
            __TBB_store_with_release(op->status, internal::FAILED);
            return;
        }

        *(op->elem) = prio();
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
        prio_pop();

    }

    // pops the highest-priority item, saves copy
    /** The reserved item is removed from the heap but kept in reserved_item
        so that internal_release() can push it back. */
    void internal_reserve(prio_operation *op) __TBB_override {
        if (this->my_reserved == true || this->my_tail == 0) {
            __TBB_store_with_release(op->status, internal::FAILED);
            return;
        }
        this->my_reserved = true;
        *(op->elem) = prio();
        reserved_item = *(op->elem);
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
        prio_pop();
    }

    //! Finalize a reservation: drop the saved copy; always SUCCEEDs.
    void internal_consume(prio_operation *op) __TBB_override {
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
        this->my_reserved = false;
        reserved_item = input_type();
    }

    //! Cancel a reservation: return the saved item to the heap; always SUCCEEDs.
    void internal_release(prio_operation *op) __TBB_override {
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
        prio_push(reserved_item);
        this->my_reserved = false;
        reserved_item = input_type();
    }

private:
    template<typename, typename> friend class buffer_node;

    //! Heapify pending pushes so the whole buffer [0, my_tail) is a heap.
    void order() {
        if (mark < this->my_tail) heapify();
        __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
    }

    //! Forwardable iff the buffer holds at least one item.
    bool is_item_valid() {
        return this->my_tail > 0;
    }

    //! Offer the highest-priority item to successors; on acceptance fold the
    //! returned task into last_task and pop the item.
    void try_put_and_add_task(task*& last_task) {
        task * new_task = this->my_successors.try_put_task(this->prio());
        if (new_task) {
            // workaround for icc bug
            graph& graph_ref = this->graph_reference();
            last_task = combine_tasks(graph_ref, last_task, new_task);
            prio_pop();
        }
    }

private:
    Compare compare;            // priority predicate: compare(a,b) => b has higher priority
    size_type mark;             // boundary between heapified [0,mark) and raw [mark,my_tail)

    input_type reserved_item;   // copy kept while a reservation is outstanding

    // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
    bool prio_use_tail() {
        __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
        return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
    }

    // prio_push: checks that the item will fit, expand array if necessary, put at end
    void prio_push(const T &src) {
        if ( this->my_tail >= this->my_array_size )
            this->grow_my_array( this->my_tail + 1 );
        (void) this->place_item(this->my_tail, src);
        ++(this->my_tail);
        __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
    }

    // prio_pop: deletes highest priority item from the array, and if it is item
    // 0, move last item to 0 and reheap.  If end of array, just destroy and decrement tail
    // and mark.  Assumes the array has already been tested for emptiness; no failure.
    void prio_pop()  {
        if (prio_use_tail()) {
            // there are newly pushed elements; last one higher than top
            // copy the data
            this->destroy_item(this->my_tail-1);
            --(this->my_tail);
            __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
            return;
        }
        this->destroy_item(0);
        if(this->my_tail > 1) {
            // push the last element down heap
            __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
            this->move_item(0,this->my_tail - 1);
        }
        --(this->my_tail);
        if(mark > this->my_tail) --mark;
        if (this->my_tail > 1) // don't reheap for heap of size 1
            reheap();
        __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
    }

    //! The current highest-priority item: either the unheaped tail item
    //! (when it outranks the heap root) or the root at slot 0.
    const T& prio() {
        return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
    }

    // turn array into heap
    /** Sifts each element in [mark, my_tail) up into the heap; advances mark
        to my_tail. */
    void heapify() {
        if(this->my_tail == 0) {
            mark = 0;
            return;
        }
        if (!mark) mark = 1;
        for (; mark<this->my_tail; ++mark) { // for each unheaped element
            size_type cur_pos = mark;
            input_type to_place;
            this->fetch_item(mark,to_place);
            do { // push to_place up the heap
                size_type parent = (cur_pos-1)>>1;
                if (!compare(this->get_my_item(parent), to_place))
                    break;
                this->move_item(cur_pos, parent);
                cur_pos = parent;
            } while( cur_pos );
            (void) this->place_item(cur_pos, to_place);
        }
    }

    // otherwise heapified array with new root element; rearrange to heap
    /** Sifts the root down until both children have lower priority. */
    void reheap() {
        size_type cur_pos=0, child=1;
        while (child < mark) {
            size_type target = child;
            if (child+1<mark &&
                compare(this->get_my_item(child),
                        this->get_my_item(child+1)))
                ++target;
            // target now has the higher priority child
            if (compare(this->get_my_item(target),
                        this->get_my_item(cur_pos)))
                break;
            // swap
            this->swap_items(cur_pos, target);
            cur_pos = target;
            child = (cur_pos<<1)+1;
        }
    }
}; // priority_queue_node
2396
2397} // interfaceX
2398
2399namespace interface11 {
2400
2401using namespace interface10;
2402namespace internal = interface10::internal;
2403
2404//! Forwards messages only if the threshold has not been reached
2405/** This node forwards items until its threshold is reached.
2406 It contains no buffering. If the downstream node rejects, the
2407 message is dropped. */
2408template< typename T, typename DecrementType=continue_msg >
2409class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2410public:
2411 typedef T input_type;
2412 typedef T output_type;
2413 typedef typename receiver<input_type>::predecessor_type predecessor_type;
2414 typedef typename sender<output_type>::successor_type successor_type;
2415#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2416 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
2417 typedef typename sender<output_type>::built_successors_type built_successors_type;
2418 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2419 typedef typename sender<output_type>::successor_list_type successor_list_type;
2420#endif
2421 //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
2422
2423private:
2424 size_t my_threshold;
2425 size_t my_count; //number of successful puts
2426 size_t my_tries; //number of active put attempts
2427 internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors;
2428 spin_mutex my_mutex;
2429 internal::broadcast_cache< T > my_successors;
2430 __TBB_DEPRECATED_LIMITER_EXPR( int init_decrement_predecessors; )
2431
2432 friend class internal::forward_task_bypass< limiter_node<T,DecrementType> >;
2433
2434 // Let decrementer call decrement_counter()
2435 friend class internal::decrementer< limiter_node<T,DecrementType>, DecrementType >;
2436
2437 bool check_conditions() { // always called under lock
2438 return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
2439 }
2440
2441 // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
2442 task *forward_task() {
2443 input_type v;
2444 task *rval = NULL;
2445 bool reserved = false;
2446 {
2447 spin_mutex::scoped_lock lock(my_mutex);
2448 if ( check_conditions() )
2449 ++my_tries;
2450 else
2451 return NULL;
2452 }
2453
2454 //SUCCESS
2455 // if we can reserve and can put, we consume the reservation
2456 // we increment the count and decrement the tries
2457 if ( (my_predecessors.try_reserve(v)) == true ){
2458 reserved=true;
2459 if ( (rval = my_successors.try_put_task(v)) != NULL ){
2460 {
2461 spin_mutex::scoped_lock lock(my_mutex);
2462 ++my_count;
2463 --my_tries;
2464 my_predecessors.try_consume();
2465 if ( check_conditions() ) {
2466 if ( internal::is_graph_active(this->my_graph) ) {
2467 task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2468 internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
2469 internal::spawn_in_graph_arena(graph_reference(), *rtask);
2470 }
2471 }
2472 }
2473 return rval;
2474 }
2475 }
2476 //FAILURE
2477 //if we can't reserve, we decrement the tries
2478 //if we can reserve but can't put, we decrement the tries and release the reservation
2479 {
2480 spin_mutex::scoped_lock lock(my_mutex);
2481 --my_tries;
2482 if (reserved) my_predecessors.try_release();
2483 if ( check_conditions() ) {
2484 if ( internal::is_graph_active(this->my_graph) ) {
2485 task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2486 internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
2487 __TBB_ASSERT(!rval, "Have two tasks to handle");
2488 return rtask;
2489 }
2490 }
2491 return rval;
2492 }
2493 }
2494
2495 void forward() {
2496 __TBB_ASSERT(false, "Should never be called");
2497 return;
2498 }
2499
2500 task* decrement_counter( long long delta ) {
2501 {
2502 spin_mutex::scoped_lock lock(my_mutex);
2503 if( delta > 0 && size_t(delta) > my_count )
2504 my_count = 0;
2505 else if( delta < 0 && size_t(delta) > my_threshold - my_count )
2506 my_count = my_threshold;
2507 else
2508 my_count -= size_t(delta); // absolute value of delta is sufficiently small
2509 }
2510 return forward_task();
2511 }
2512
2513 void initialize() {
2514 my_predecessors.set_owner(this);
2515 my_successors.set_owner(this);
2516 decrement.set_owner(this);
2517 tbb::internal::fgt_node(
2518 tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2519 static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
2520 static_cast<sender<output_type> *>(this)
2521 );
2522 }
2523public:
2524 //! The internal receiver< DecrementType > that decrements the count
2525 internal::decrementer< limiter_node<T, DecrementType>, DecrementType > decrement;
2526
2527#if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
2528 __TBB_STATIC_ASSERT( (tbb::internal::is_same_type<DecrementType, continue_msg>::value),
2529 "Deprecated interface of the limiter node can be used only in conjunction "
2530 "with continue_msg as the type of DecrementType template parameter." );
2531#endif // Check for incompatible interface
2532
2533 //! Constructor
2534 limiter_node(graph &g,
2535 __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
2536 : graph_node(g), my_threshold(threshold), my_count(0),
2537 __TBB_DEPRECATED_LIMITER_ARG4(
2538 my_tries(0), decrement(),
2539 init_decrement_predecessors(num_decrement_predecessors),
2540 decrement(num_decrement_predecessors)) {
2541 initialize();
2542 }
2543
2544 //! Copy constructor
2545 limiter_node( const limiter_node& src ) :
2546 graph_node(src.my_graph), receiver<T>(), sender<T>(),
2547 my_threshold(src.my_threshold), my_count(0),
2548 __TBB_DEPRECATED_LIMITER_ARG4(
2549 my_tries(0), decrement(),
2550 init_decrement_predecessors(src.init_decrement_predecessors),
2551 decrement(src.init_decrement_predecessors)) {
2552 initialize();
2553 }
2554
2555#if TBB_PREVIEW_FLOW_GRAPH_TRACE
2556 void set_name( const char *name ) __TBB_override {
2557 tbb::internal::fgt_node_desc( this, name );
2558 }
2559#endif
2560
2561 //! Replace the current successor with this new successor
2562 bool register_successor( successor_type &r ) __TBB_override {
2563 spin_mutex::scoped_lock lock(my_mutex);
2564 bool was_empty = my_successors.empty();
2565 my_successors.register_successor(r);
2566 //spawn a forward task if this is the only successor
2567 if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2568 if ( internal::is_graph_active(this->my_graph) ) {
2569 task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2570 internal::forward_task_bypass < limiter_node<T, DecrementType> >( *this );
2571 internal::spawn_in_graph_arena(graph_reference(), *task);
2572 }
2573 }
2574 return true;
2575 }
2576
2577 //! Removes a successor from this node
2578 /** r.remove_predecessor(*this) is also called. */
2579 bool remove_successor( successor_type &r ) __TBB_override {
2580 r.remove_predecessor(*this);
2581 my_successors.remove_successor(r);
2582 return true;
2583 }
2584
2585#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2586 built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
2587 built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
2588
2589 void internal_add_built_successor(successor_type &src) __TBB_override {
2590 my_successors.internal_add_built_successor(src);
2591 }
2592
2593 void internal_delete_built_successor(successor_type &src) __TBB_override {
2594 my_successors.internal_delete_built_successor(src);
2595 }
2596
2597 size_t successor_count() __TBB_override { return my_successors.successor_count(); }
2598
2599 void copy_successors(successor_list_type &v) __TBB_override {
2600 my_successors.copy_successors(v);
2601 }
2602
2603 void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
2604 my_predecessors.internal_add_built_predecessor(src);
2605 }
2606
2607 void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
2608 my_predecessors.internal_delete_built_predecessor(src);
2609 }
2610
2611 size_t predecessor_count() __TBB_override { return my_predecessors.predecessor_count(); }
2612
2613 void copy_predecessors(predecessor_list_type &v) __TBB_override {
2614 my_predecessors.copy_predecessors(v);
2615 }
2616
2617 void extract() __TBB_override {
2618 my_count = 0;
2619 my_successors.built_successors().sender_extract(*this);
2620 my_predecessors.built_predecessors().receiver_extract(*this);
2621 decrement.built_predecessors().receiver_extract(decrement);
2622 }
2623#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2624
2625 //! Adds src to the list of cached predecessors.
2626 bool register_predecessor( predecessor_type &src ) __TBB_override {
2627 spin_mutex::scoped_lock lock(my_mutex);
2628 my_predecessors.add( src );
2629 if ( my_count + my_tries < my_threshold && !my_successors.empty() && internal::is_graph_active(this->my_graph) ) {
2630 task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2631 internal::forward_task_bypass < limiter_node<T, DecrementType> >( *this );
2632 internal::spawn_in_graph_arena(graph_reference(), *task);
2633 }
2634 return true;
2635 }
2636
    //! Removes src from the list of cached predecessors.
    /** Always succeeds; no task is spawned and no lock is taken here —
        the predecessor cache performs its own synchronization. */
    bool remove_predecessor( predecessor_type &src ) __TBB_override {
        my_predecessors.remove( src );
        return true;
    }
2642
protected:

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    //! Puts an item to this receiver
    /** Reserves a "try" slot under the lock, then forwards the item to a successor
        outside the lock.  On success the reservation becomes a held count; on
        failure it is released and a retry task may be spawned. */
    task *try_put_task( const T &t ) __TBB_override {
        {
            spin_mutex::scoped_lock lock(my_mutex);
            // my_tries counts in-flight puts so that concurrent callers cannot
            // collectively exceed my_threshold before their items are accepted.
            if ( my_count + my_tries >= my_threshold )
                return NULL;   // at capacity: reject the item
            else
                ++my_tries;
        }

        // Forward outside the lock so successor work is not serialized on my_mutex.
        task * rtask = my_successors.try_put_task(t);

        if ( !rtask ) { // try_put_task failed: release the reservation.
            spin_mutex::scoped_lock lock(my_mutex);
            --my_tries;
            if (check_conditions() && internal::is_graph_active(this->my_graph)) {
                rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
                    internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
            }
        }
        else {
            spin_mutex::scoped_lock lock(my_mutex);
            ++my_count;   // item accepted: convert the reservation into a held count
            --my_tries;
        }
        return rtask;
    }
2675
    //! The graph this node belongs to (used when spawning tasks in its arena).
    graph& graph_reference() __TBB_override { return my_graph; }

    //! Receiver-side reset; limiter_node handles all state in reset_node(),
    //! so reaching this is a logic error.
    void reset_receiver(reset_flags /*f*/) __TBB_override {
        __TBB_ASSERT(false,NULL); // should never be called
    }
2681
2682 void reset_node( reset_flags f) __TBB_override {
2683 my_count = 0;
2684 if(f & rf_clear_edges) {
2685 my_predecessors.clear();
2686 my_successors.clear();
2687 }
2688 else
2689 {
2690 my_predecessors.reset( );
2691 }
2692 decrement.reset_receiver(f);
2693 }
2694}; // limiter_node
2695} // namespace interfaceX
2696
2697namespace interface10 {
2698
2699#include "internal/_flow_graph_join_impl.h"
2700
2701using internal::reserving_port;
2702using internal::queueing_port;
2703using internal::key_matching_port;
2704using internal::input_port;
2705using internal::tag_value;
2706
2707template<typename OutputTuple, typename JP=queueing> class join_node;
2708
//! join_node with reserving ports: inputs are reserved at all predecessors and
//! consumed only when a complete tuple can be formed.
template<typename OutputTuple>
class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
private:
    static const int N = tbb::flow::tuple_size<OutputTuple>::value;
    typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
public:
    typedef OutputTuple output_type;
    typedef typename unfolded_type::input_ports_type input_ports_type;
    explicit join_node(graph &g) : unfolded_type(g) {
        // Register the node and its ports with the flow-graph trace instrumentation.
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    //! Copy constructor: copies node configuration, not edges or buffered items.
    join_node(const join_node &other) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

};
2733
//! join_node with queueing ports: each port buffers incoming items until a
//! complete tuple can be emitted.
template<typename OutputTuple>
class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
private:
    static const int N = tbb::flow::tuple_size<OutputTuple>::value;
    typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
public:
    typedef OutputTuple output_type;
    typedef typename unfolded_type::input_ports_type input_ports_type;
    explicit join_node(graph &g) : unfolded_type(g) {
        // Register the node and its ports with the flow-graph trace instrumentation.
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    //! Copy constructor: copies node configuration, not edges or buffered items.
    join_node(const join_node &other) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

};
2758
// template for key_matching join_node
// tag_matching join_node is a specialization of key_matching, and is source-compatible.
//! join_node whose ports pair up items that carry matching keys.  The B0..B9
//! constructor arguments are per-port functors that extract the key from an item;
//! each arity is enabled only up to __TBB_VARIADIC_MAX.
template<typename OutputTuple, typename K, typename KHash>
class join_node<OutputTuple, key_matching<K, KHash> > : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
      key_matching_port, OutputTuple, key_matching<K,KHash> > {
private:
    static const int N = tbb::flow::tuple_size<OutputTuple>::value;
    typedef typename internal::unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type;
public:
    typedef OutputTuple output_type;
    typedef typename unfolded_type::input_ports_type input_ports_type;

#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
    //! With message-based key matching the keys travel with the messages,
    //! so no key-extractor functors are needed.
    join_node(graph &g) : unfolded_type(g) {}
#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */

    template<typename __TBB_B0, typename __TBB_B1>
    join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
    join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
    join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
    join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
            unfolded_type(g, b0, b1, b2, b3, b4) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if __TBB_VARIADIC_MAX >= 6
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5>
    join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
            unfolded_type(g, b0, b1, b2, b3, b4, b5) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 7
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6>
    join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
            unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 8
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
    join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
            __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 9
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
    join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
            __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 10
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
    join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
            __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
    //! Copy constructor: copies node configuration (including key extractors), not edges.
    join_node(const join_node &other) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

};
2853
2854// indexer node
2855#include "internal/_flow_graph_indexer_impl.h"
2856
2857// TODO: Implement interface with variadic template or tuple
2858template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
2859 typename T4=null_type, typename T5=null_type, typename T6=null_type,
2860 typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
2861
2862//indexer node specializations
//! indexer_node with one input port: tags each incoming message with its port index.
template<typename T0>
class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
private:
    static const int N = 1;
public:
    typedef tuple<T0> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    indexer_node(graph& g) : unfolded_type(g) {
        // Register the node and its ports with the flow-graph trace instrumentation.
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    // Copy constructor: copies node configuration, not edges or buffered items.
    indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
2887
//! indexer_node with two input ports.
template<typename T0, typename T1>
class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
private:
    static const int N = 2;
public:
    typedef tuple<T0, T1> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    // Copy constructor: copies node configuration, not edges or buffered items.
    indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
2912
//! indexer_node with three input ports.
template<typename T0, typename T1, typename T2>
class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
private:
    static const int N = 3;
public:
    typedef tuple<T0, T1, T2> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    // Copy constructor: copies node configuration, not edges or buffered items.
    indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
2937
//! indexer_node with four input ports.
template<typename T0, typename T1, typename T2, typename T3>
class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
private:
    static const int N = 4;
public:
    typedef tuple<T0, T1, T2, T3> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    // Copy constructor: copies node configuration, not edges or buffered items.
    indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
2962
//! indexer_node with five input ports.
template<typename T0, typename T1, typename T2, typename T3, typename T4>
class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
private:
    static const int N = 5;
public:
    typedef tuple<T0, T1, T2, T3, T4> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    // Copy constructor: copies node configuration, not edges or buffered items.
    indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
2987
#if __TBB_VARIADIC_MAX >= 6
//! indexer_node with six input ports (enabled when __TBB_VARIADIC_MAX permits).
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
private:
    static const int N = 6;
public:
    typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    // Copy constructor: copies node configuration, not edges or buffered items.
    indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
#endif //variadic max 6
3014
#if __TBB_VARIADIC_MAX >= 7
//! indexer_node with seven input ports (enabled when __TBB_VARIADIC_MAX permits).
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6>
class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
private:
    static const int N = 7;
public:
    typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    // Copy constructor: copies node configuration, not edges or buffered items.
    indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
#endif //variadic max 7
3042
#if __TBB_VARIADIC_MAX >= 8
//! indexer_node with eight input ports (enabled when __TBB_VARIADIC_MAX permits).
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
private:
    static const int N = 8;
public:
    typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    // Copy constructor: copies node configuration, not edges or buffered items.
    indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
#endif //variadic max 8
3070
#if __TBB_VARIADIC_MAX >= 9
//! indexer_node with nine input ports (enabled when __TBB_VARIADIC_MAX permits).
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
private:
    static const int N = 9;
public:
    typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    // Copy constructor: copies node configuration, not edges or buffered items.
    indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
#endif //variadic max 9
3098
#if __TBB_VARIADIC_MAX >= 10
//! indexer_node with ten input ports — the primary template (maximum arity).
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8, typename T9>
class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
private:
    static const int N = 10;
public:
    typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    // Copy constructor: copies node configuration, not edges or buffered items.
    indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
#endif //variadic max 10
3126
//! Connects sender p to receiver s: records deprecated edge bookkeeping if enabled,
//! registers s as a successor of p, and notifies the trace instrumentation.
//! The signature depends on whether untyped async endpoints are in use.
#if __TBB_PREVIEW_ASYNC_MSG
inline void internal_make_edge( internal::untyped_sender &p, internal::untyped_receiver &s ) {
#else
template< typename T >
inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
#endif
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    s.internal_add_built_predecessor(p);
    p.internal_add_built_successor(s);
#endif
    p.register_successor( s );
    tbb::internal::fgt_make_edge( &p, &s );
}
3140
//! Makes an edge between a single predecessor and a single successor
template< typename T >
inline void make_edge( sender<T> &p, receiver<T> &s ) {
    internal_make_edge( p, s );
}
3146
#if __TBB_PREVIEW_ASYNC_MSG
//! make_edge overload for untyped async endpoints; SFINAE-restricted so it only
//! participates when at least one side is an untyped sender/receiver.
template< typename TS, typename TR,
          typename = typename tbb::internal::enable_if<tbb::internal::is_same_type<TS, internal::untyped_sender>::value
                                                    || tbb::internal::is_same_type<TR, internal::untyped_receiver>::value>::type>
inline void make_edge( TS &p, TR &s ) {
    internal_make_edge( p, s );
}

//! make_edge from an async_msg sender to a receiver of the wrapped data type.
template< typename T >
inline void make_edge( sender<T> &p, receiver<typename T::async_msg_data_type> &s ) {
    internal_make_edge( p, s );
}

//! make_edge from a sender of the wrapped data type to an async_msg receiver.
template< typename T >
inline void make_edge( sender<typename T::async_msg_data_type> &p, receiver<T> &s ) {
    internal_make_edge( p, s );
}

#endif // __TBB_PREVIEW_ASYNC_MSG
3166
#if __TBB_FLOW_GRAPH_CPP11_FEATURES
//Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void make_edge( T& output, V& input) {
    make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
}

//Makes an edge from port 0 of a multi-output predecessor to a receiver.
template< typename T, typename R,
          typename = typename T::output_ports_type >
inline void make_edge( T& output, receiver<R>& input) {
    make_edge(get<0>(output.output_ports()), input);
}

//Makes an edge from a sender to port 0 of a multi-input successor.
template< typename S,  typename V,
          typename = typename V::input_ports_type >
inline void make_edge( sender<S>& output, V& input) {
    make_edge(output, get<0>(input.input_ports()));
}
#endif
3189
//! Disconnects sender p from receiver s: unregisters the successor, removes the
//! deprecated edge bookkeeping if enabled, and notifies the trace instrumentation.
#if __TBB_PREVIEW_ASYNC_MSG
inline void internal_remove_edge( internal::untyped_sender &p, internal::untyped_receiver &s ) {
#else
template< typename T >
inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
#endif
    p.remove_successor( s );
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
    p.internal_delete_built_successor(s);
    s.internal_delete_built_predecessor(p);
#endif
    tbb::internal::fgt_remove_edge( &p, &s );
}
3204
//! Removes an edge between a single predecessor and a single successor
template< typename T >
inline void remove_edge( sender<T> &p, receiver<T> &s ) {
    internal_remove_edge( p, s );
}
3210
#if __TBB_PREVIEW_ASYNC_MSG
//! remove_edge overload for untyped async endpoints; SFINAE-restricted so it only
//! participates when at least one side is an untyped sender/receiver.
template< typename TS, typename TR,
          typename = typename tbb::internal::enable_if<tbb::internal::is_same_type<TS, internal::untyped_sender>::value
                                                    || tbb::internal::is_same_type<TR, internal::untyped_receiver>::value>::type>
inline void remove_edge( TS &p, TR &s ) {
    internal_remove_edge( p, s );
}

//! remove_edge between an async_msg sender and a receiver of the wrapped data type.
template< typename T >
inline void remove_edge( sender<T> &p, receiver<typename T::async_msg_data_type> &s ) {
    internal_remove_edge( p, s );
}

//! remove_edge between a sender of the wrapped data type and an async_msg receiver.
template< typename T >
inline void remove_edge( sender<typename T::async_msg_data_type> &p, receiver<T> &s ) {
    internal_remove_edge( p, s );
}
#endif // __TBB_PREVIEW_ASYNC_MSG
3229
#if __TBB_FLOW_GRAPH_CPP11_FEATURES
//Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void remove_edge( T& output, V& input) {
    remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
}

//Removes an edge between port 0 of a multi-output predecessor and a receiver.
template< typename T, typename R,
          typename = typename T::output_ports_type >
inline void remove_edge( T& output, receiver<R>& input) {
    remove_edge(get<0>(output.output_ports()), input);
}
//Removes an edge between a sender and port 0 of a multi-input successor.
template< typename S,  typename V,
          typename = typename V::input_ports_type >
inline void remove_edge( sender<S>& output, V& input) {
    remove_edge(output, get<0>(input.input_ports()));
}
#endif
3251
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
//! Removes every built edge in which s is the sender.
/** Iterates over a COPY of built_edges because remove_edge mutates the
    container through the bookkeeping callbacks. */
template<typename C >
template< typename S >
void internal::edge_container<C>::sender_extract( S &s ) {
    edge_list_type e = built_edges;
    for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
        remove_edge(s, **i);
    }
}

//! Removes every built edge in which r is the receiver.
/** Same copy-before-iterate discipline as sender_extract. */
template<typename C >
template< typename R >
void internal::edge_container<C>::receiver_extract( R &r ) {
    edge_list_type e = built_edges;
    for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
        remove_edge(**i, r);
    }
}
#endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3271
//! Returns a copy of the body from a function or continue node
/** The node type must provide copy_function_object<Body>(); Body must match
    the type the node was constructed with. */
template< typename Body, typename Node >
Body copy_body( Node &n ) {
    return n.template copy_function_object<Body>();
}
3277
3278#if __TBB_FLOW_GRAPH_CPP11_FEATURES
3279
3280//composite_node
3281template< typename InputTuple, typename OutputTuple > class composite_node;
3282
//! composite_node: packages a subgraph behind a fixed set of typed input and
//! output ports.  Ports are bound later via set_external_ports().
template< typename... InputTypes, typename... OutputTypes>
class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<OutputTypes...> > : public graph_node{

public:
    // Tuples of REFERENCES to the inner nodes' ports; the composite does not own them.
    typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
    typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;

private:
    std::unique_ptr<input_ports_type> my_input_ports;
    std::unique_ptr<output_ports_type> my_output_ports;

    static const size_t NUM_INPUTS = sizeof...(InputTypes);
    static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);

protected:
    // Nothing to reset: the inner nodes reset themselves through the graph.
    void reset_node(reset_flags) __TBB_override {}

public:
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    composite_node( graph &g, const char *type_name = "composite_node" ) : graph_node(g) {
        tbb::internal::fgt_multiinput_multioutput_node( tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
        tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_name );
    }
#else
    composite_node( graph &g ) : graph_node(g) {
        tbb::internal::fgt_multiinput_multioutput_node( tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
    }
#endif

    //! Binds the composite's external ports to ports of inner nodes.
    /** NOTE(review): the tuples are forwarded into make_unique and the (possibly
        moved-from) arguments are then passed to alias_port — benign here because
        the tuples hold lvalue references, but worth confirming if the port types
        ever change. */
    template<typename T1, typename T2>
    void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
        __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
        __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
        my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T1>(input_ports_tuple));
        my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T2>(output_ports_tuple));

        tbb::internal::fgt_internal_input_alias_helper<T1, NUM_INPUTS>::alias_port( this, input_ports_tuple);
        tbb::internal::fgt_internal_output_alias_helper<T2, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
    }

    //! Registers inner nodes so they appear nested under this composite in traces.
    template< typename... NodeTypes >
    void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }

    //! Registers inner nodes without making them visible in traces.
    template< typename... NodeTypes >
    void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
    }
#endif

    input_ports_type& input_ports() {
         __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
         return *my_input_ports;
    }

    output_ports_type& output_ports() {
         __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
         return *my_output_ports;
    }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    void extract() __TBB_override {
        __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
    }
#endif
};  // class composite_node
3351
3352//composite_node with only input ports
3353template< typename... InputTypes>
3354class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node {
3355public:
3356 typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3357
3358private:
3359 std::unique_ptr<input_ports_type> my_input_ports;
3360 static const size_t NUM_INPUTS = sizeof...(InputTypes);
3361
3362protected:
3363 void reset_node(reset_flags) __TBB_override {}
3364
3365public:
3366#if TBB_PREVIEW_FLOW_GRAPH_TRACE
3367 composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
3368 tbb::internal::fgt_composite( this, &g );
3369 tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_name );
3370 }
3371#else
3372 composite_node( graph &g ) : graph_node(g) {
3373 tbb::internal::fgt_composite( this, &g );
3374 }
3375#endif
3376
3377 template<typename T>
3378 void set_external_ports(T&& input_ports_tuple) {
3379 __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
3380
3381 my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T>(input_ports_tuple));
3382
3383 tbb::internal::fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, std::forward<T>(input_ports_tuple));
3384 }
3385
3386 template< typename... NodeTypes >
3387 void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3388
3389 template< typename... NodeTypes >
3390 void add_nodes( const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3391
3392#if TBB_PREVIEW_FLOW_GRAPH_TRACE
3393 void set_name( const char *name ) __TBB_override {
3394 tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
3395 }
3396#endif
3397
3398 input_ports_type& input_ports() {
3399 __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
3400 return *my_input_ports;
3401 }
3402
3403#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3404 void extract() __TBB_override {
3405 __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3406 }
3407#endif
3408
3409}; // class composite_node
3410
// Specialization of composite_node with only output ports
3412template<typename... OutputTypes>
3413class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node {
3414public:
3415 typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3416
3417private:
3418 std::unique_ptr<output_ports_type> my_output_ports;
3419 static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3420
3421protected:
3422 void reset_node(reset_flags) __TBB_override {}
3423
3424public:
3425#if TBB_PREVIEW_FLOW_GRAPH_TRACE
3426 composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
3427 tbb::internal::fgt_composite( this, &g );
3428 tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_name );
3429 }
3430#else
3431 composite_node( graph &g ) : graph_node(g) {
3432 tbb::internal::fgt_composite( this, &g );
3433 }
3434#endif
3435
3436 template<typename T>
3437 void set_external_ports(T&& output_ports_tuple) {
3438 __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
3439
3440 my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T>(output_ports_tuple));
3441
3442 tbb::internal::fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, std::forward<T>(output_ports_tuple));
3443 }
3444
3445 template<typename... NodeTypes >
3446 void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3447
3448 template<typename... NodeTypes >
3449 void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3450
3451#if TBB_PREVIEW_FLOW_GRAPH_TRACE
3452 void set_name( const char *name ) __TBB_override {
3453 tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
3454 }
3455#endif
3456
3457 output_ports_type& output_ports() {
3458 __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
3459 return *my_output_ports;
3460 }
3461
3462#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3463 void extract() __TBB_override {
3464 __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3465 }
3466#endif
3467
3468}; // class composite_node
3469
3470#endif // __TBB_FLOW_GRAPH_CPP11_FEATURES
3471
3472namespace internal {
3473
3474template<typename Gateway>
3475class async_body_base: tbb::internal::no_assign {
3476public:
3477 typedef Gateway gateway_type;
3478
3479 async_body_base(gateway_type *gateway): my_gateway(gateway) { }
3480 void set_gateway(gateway_type *gateway) {
3481 my_gateway = gateway;
3482 }
3483
3484protected:
3485 gateway_type *my_gateway;
3486};
3487
3488template<typename Input, typename Ports, typename Gateway, typename Body>
3489class async_body: public async_body_base<Gateway> {
3490public:
3491 typedef async_body_base<Gateway> base_type;
3492 typedef Gateway gateway_type;
3493
3494 async_body(const Body &body, gateway_type *gateway)
3495 : base_type(gateway), my_body(body) { }
3496
3497 void operator()( const Input &v, Ports & ) {
3498 my_body(v, *this->my_gateway);
3499 }
3500
3501 Body get_body() { return my_body; }
3502
3503private:
3504 Body my_body;
3505};
3506
3507}
3508
3509//! Implements async node
// A multifunction_node whose body hands work to an external activity through a
// gateway; the activity later submits results back into the graph via that gateway.
template < typename Input, typename Output,
           typename Policy = queueing_lightweight,
           typename Allocator=cache_aligned_allocator<Input> >
class async_node : public multifunction_node< Input, tuple< Output >, Policy, Allocator >, public sender< Output > {
    typedef multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type;
    typedef typename internal::multifunction_input<Input, typename base_type::output_ports_type, Policy, Allocator> mfn_input_type;

public:
    typedef Input input_type;
    typedef Output output_type;
    typedef receiver<input_type> receiver_type;
    typedef typename receiver_type::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
    typedef receiver_gateway<output_type> gateway_type;
    typedef internal::async_body_base<gateway_type> async_body_base_type;
    typedef typename base_type::output_ports_type output_ports_type;

private:
    // Functor that performs a try_put of *value to output port 0 and records
    // whether it succeeded.
    struct try_put_functor {
        typedef internal::multifunction_output<Output> output_port_type;
        output_port_type *port;
        // TODO: pass value by copy since we do not want to block asynchronous thread.
        const Output *value;
        bool result;
        try_put_functor(output_port_type &p, const Output &v) : port(&p), value(&v), result(false) { }
        void operator()() {
            result = port->try_put(*value);
        }
    };

    // Gateway handed to the user body: lets an external activity extend the
    // graph's lifetime (reserve_wait/release_wait) and submit results back to it.
    class receiver_gateway_impl: public receiver_gateway<Output> {
    public:
        receiver_gateway_impl(async_node* node): my_node(node) {}
        void reserve_wait() __TBB_override {
            tbb::internal::fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
            my_node->my_graph.reserve_wait();
        }

        void release_wait() __TBB_override {
            my_node->my_graph.release_wait();
            tbb::internal::fgt_async_commit(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
        }

        //! Implements gateway_type::try_put for an external activity to submit a message to FG
        bool try_put(const Output &i) __TBB_override {
            return my_node->try_put_impl(i);
        }

    private:
        async_node* my_node;   // back-pointer to the owning node; not owned
    } my_gateway;

    //The substitute of 'this' for member construction, to prevent compiler warnings
    async_node* self() { return this; }

    //! Implements gateway_type::try_put for an external activity to submit a message to FG
    bool try_put_impl(const Output &i) {
        internal::multifunction_output<Output> &port_0 = internal::output_port<0>(*this);
        internal::broadcast_cache<output_type>& port_successors = port_0.successors();
        tbb::internal::fgt_async_try_put_begin(this, &port_0);
        task_list tasks;
        bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
        __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
                      "Return status is inconsistent with the method operation." );

        // Tasks are enqueued (not spawned) into the graph's arena because this
        // method may run on a thread outside that arena.
        while( !tasks.empty() ) {
            internal::enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
        }
        tbb::internal::fgt_async_try_put_end(this, &port_0);
        return is_at_least_one_put_successful;
    }

public:
    //! Constructs the node, wrapping the user body in internal::async_body so it
    //! receives this node's gateway as its second argument.
    template<typename Body>
    async_node(
        graph &g, size_t concurrency,
        __TBB_FLOW_GRAPH_PRIORITY_ARG1( Body body, node_priority_t priority = tbb::flow::internal::no_priority )
    ) : base_type(
        g, concurrency,
        internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
        (body, &my_gateway) __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority) ), my_gateway(self()) {
        tbb::internal::fgt_multioutput_node_with_body<1>(
            tbb::internal::FLOW_ASYNC_NODE,
            &this->my_graph, static_cast<receiver<input_type> *>(this),
            this->output_ports(), this->my_body
        );
    }

    //! Copy constructor: the copied bodies still point at the source node's
    //! gateway, so both are re-pointed at this node's own gateway.
    async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
        static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
        static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);

        tbb::internal::fgt_multioutput_node_with_body<1>( tbb::internal::FLOW_ASYNC_NODE,
                &this->my_graph, static_cast<receiver<input_type> *>(this),
                this->output_ports(), this->my_body );
    }

    //! Returns the gateway an external activity uses to talk back to the graph.
    gateway_type& gateway() {
        return my_gateway;
    }

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the name reported for this node by the flow graph tracing facility.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_multioutput_node_desc( this, name );
    }
#endif

    // Define sender< Output >

    //! Add a new successor to this node
    bool register_successor( successor_type &r ) __TBB_override {
        return internal::output_port<0>(*this).register_successor(r);
    }

    //! Removes a successor from this node
    bool remove_successor( successor_type &r ) __TBB_override {
        return internal::output_port<0>(*this).remove_successor(r);
    }

    //! Returns a copy of the user-supplied body unwrapped from internal::async_body.
    template<typename Body>
    Body copy_function_object() {
        typedef internal::multifunction_body<input_type, typename base_type::output_ports_type> mfn_body_type;
        typedef internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body> async_body_type;
        mfn_body_type &body_ref = *this->my_body;
        // dynamic_cast checks that the stored body leaf really wraps async_body<...,Body>.
        async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< internal::multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
        return ab.get_body();
    }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! interface to record edges for traversal & deletion
    typedef typename internal::edge_container<successor_type> built_successors_type;
    typedef typename built_successors_type::edge_list_type successor_list_type;
    built_successors_type &built_successors() __TBB_override {
        return internal::output_port<0>(*this).built_successors();
    }

    void internal_add_built_successor( successor_type &r ) __TBB_override {
        internal::output_port<0>(*this).internal_add_built_successor(r);
    }

    void internal_delete_built_successor( successor_type &r ) __TBB_override {
        internal::output_port<0>(*this).internal_delete_built_successor(r);
    }

    void copy_successors( successor_list_type &l ) __TBB_override {
        internal::output_port<0>(*this).copy_successors(l);
    }

    size_t successor_count() __TBB_override {
        return internal::output_port<0>(*this).successor_count();
    }
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */

protected:

    // Delegates fully to the base multifunction_node reset.
    void reset_node( reset_flags f) __TBB_override {
        base_type::reset_node(f);
    }
};
3669
3670#if __TBB_PREVIEW_STREAMING_NODE
3671#include "internal/_flow_graph_streaming_node.h"
3672#endif // __TBB_PREVIEW_STREAMING_NODE
3673
} // namespace interface10
3675
3676
3677namespace interface10a {
3678
3679using namespace interface10;
3680namespace internal = interface10::internal;
3681
//! Single-item buffer: each put overwrites the stored value, and the stored
//! value is broadcast to successors and returned (non-destructively) by try_get.
template< typename T >
class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
    typedef typename sender<output_type>::built_successors_type built_successors_type;
    typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
    typedef typename sender<output_type>::successor_list_type successor_list_type;
#endif

    explicit overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
        my_successors.set_owner( this );
        tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
                                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

    //! Copy constructor; doesn't take anything from src; default won't work
    overwrite_node( const overwrite_node& src ) :
        graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
    {
        my_successors.set_owner( this );
        tbb::internal::fgt_node( tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
                                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

    ~overwrite_node() {}

#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the name reported for this node by the flow graph tracing facility.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif

    //! Adds a successor; if a valid value is already buffered, it is pushed to
    //! the new successor immediately.
    bool register_successor( successor_type &s ) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        if (my_buffer_is_valid && internal::is_graph_active( my_graph )) {
            // We have a valid value that must be forwarded immediately.
            bool ret = s.try_put( my_buffer );
            if ( ret ) {
                // We add the successor that accepted our put
                my_successors.register_successor( s );
            } else {
                // In case of reservation a race between the moment of reservation and register_successor can appear,
                // because failed reserve does not mean that register_successor is not ready to put a message immediately.
                // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
                // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
                task *rtask = new ( task::allocate_additional_child_of( *( my_graph.root_task() ) ) )
                    register_predecessor_task( *this, s );
                internal::spawn_in_graph_arena( my_graph, *rtask );
            }
        } else {
            // No valid value yet, just add as successor
            my_successors.register_successor( s );
        }
        return true;
    }

    bool remove_successor( successor_type &s ) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_successors.remove_successor(s);
        return true;
    }

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Deprecated edge-recording interface: all operations below simply delegate
    // to the successor cache / predecessor edge container under my_mutex.
    built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
    built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }

    void internal_add_built_successor( successor_type &s) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_successors.internal_add_built_successor(s);
    }

    void internal_delete_built_successor( successor_type &s) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_successors.internal_delete_built_successor(s);
    }

    size_t successor_count() __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        return my_successors.successor_count();
    }

    void copy_successors(successor_list_type &v) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_successors.copy_successors(v);
    }

    void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_built_predecessors.add_edge(p);
    }

    void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_built_predecessors.delete_edge(p);
    }

    size_t predecessor_count() __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        return my_built_predecessors.edge_count();
    }

    void copy_predecessors( predecessor_list_type &v ) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_built_predecessors.copy_edges(v);
    }

    // Invalidates the buffer and detaches recorded edges in both directions.
    void extract() __TBB_override {
        my_buffer_is_valid = false;
        built_successors().sender_extract(*this);
        built_predecessors().receiver_extract(*this);
    }

#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */

    //! Copies the buffered value into v; does NOT invalidate the buffer.
    bool try_get( input_type &v ) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        if ( my_buffer_is_valid ) {
            v = my_buffer;
            return true;
        }
        return false;
    }

    //! Reserves an item
    // Reservation here is just a read: the buffer stays valid and shared.
    bool try_reserve( T &v ) __TBB_override {
        return try_get(v);
    }

    //! Releases the reserved item
    bool try_release() __TBB_override { return true; }

    //! Consumes the reserved item
    // Consuming does not clear the buffer; use clear() for that.
    bool try_consume() __TBB_override { return true; }

    //! Returns true if the node currently holds a valid value.
    bool is_valid() {
        spin_mutex::scoped_lock l( my_mutex );
        return my_buffer_is_valid;
    }

    //! Invalidates the buffered value (the next try_get will fail).
    void clear() {
        spin_mutex::scoped_lock l( my_mutex );
        my_buffer_is_valid = false;
    }

protected:

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    task * try_put_task( const input_type &v ) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        return try_put_task_impl(v);
    }

    // Stores v (overwriting any previous value) and broadcasts it to successors.
    // Called with my_mutex held. Never returns NULL: a put always succeeds.
    task * try_put_task_impl(const input_type &v) {
        my_buffer = v;
        my_buffer_is_valid = true;
        task * rtask = my_successors.try_put_task(v);
        if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
        return rtask;
    }

    graph& graph_reference() __TBB_override {
        return my_graph;
    }

    //! Breaks an infinite loop between the node reservation and register_successor call
    struct register_predecessor_task : public graph_task {

        register_predecessor_task(predecessor_type& owner, successor_type& succ) :
            o(owner), s(succ) {};

        tbb::task* execute() __TBB_override {
            // If the successor rejects us as a predecessor, restore the push edge.
            if (!s.register_predecessor(o)) {
                o.register_successor(s);
            }
            return NULL;
        }

        predecessor_type& o;
        successor_type& s;
    };

    spin_mutex my_mutex;   // guards my_buffer/my_buffer_is_valid and edge containers
    internal::broadcast_cache< input_type, null_rw_mutex > my_successors;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    internal::edge_container<predecessor_type> my_built_predecessors;
#endif
    input_type my_buffer;
    bool my_buffer_is_valid;
    // Nothing to do per-receiver: the buffer is reset in reset_node below.
    void reset_receiver(reset_flags /*f*/) __TBB_override {}

    void reset_node( reset_flags f) __TBB_override {
        my_buffer_is_valid = false;
        if (f&rf_clear_edges) {
            my_successors.clear();
        }
    }
}; // overwrite_node
3886
3887template< typename T >
3888class write_once_node : public overwrite_node<T> {
3889public:
3890 typedef T input_type;
3891 typedef T output_type;
3892 typedef overwrite_node<T> base_type;
3893 typedef typename receiver<input_type>::predecessor_type predecessor_type;
3894 typedef typename sender<output_type>::successor_type successor_type;
3895
3896 //! Constructor
3897 explicit write_once_node(graph& g) : base_type(g) {
3898 tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3899 static_cast<receiver<input_type> *>(this),
3900 static_cast<sender<output_type> *>(this) );
3901 }
3902
3903 //! Copy constructor: call base class copy constructor
3904 write_once_node( const write_once_node& src ) : base_type(src) {
3905 tbb::internal::fgt_node( tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3906 static_cast<receiver<input_type> *>(this),
3907 static_cast<sender<output_type> *>(this) );
3908 }
3909
3910#if TBB_PREVIEW_FLOW_GRAPH_TRACE
3911 void set_name( const char *name ) __TBB_override {
3912 tbb::internal::fgt_node_desc( this, name );
3913 }
3914#endif
3915
3916protected:
3917 template< typename R, typename B > friend class run_and_put_task;
3918 template<typename X, typename Y> friend class internal::broadcast_cache;
3919 template<typename X, typename Y> friend class internal::round_robin_cache;
3920 task *try_put_task( const T &v ) __TBB_override {
3921 spin_mutex::scoped_lock l( this->my_mutex );
3922 return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
3923 }
3924};
} // namespace interface10a
3926
3927 using interface10::reset_flags;
3928 using interface10::rf_reset_protocol;
3929 using interface10::rf_reset_bodies;
3930 using interface10::rf_clear_edges;
3931
3932 using interface10::graph;
3933 using interface10::graph_node;
3934 using interface10::continue_msg;
3935
3936 using interface10::source_node;
3937 using interface10::function_node;
3938 using interface10::multifunction_node;
3939 using interface10::split_node;
3940 using interface10::internal::output_port;
3941 using interface10::indexer_node;
3942 using interface10::internal::tagged_msg;
3943 using interface10::internal::cast_to;
3944 using interface10::internal::is_a;
3945 using interface10::continue_node;
3946 using interface10a::overwrite_node;
3947 using interface10a::write_once_node;
3948 using interface10::broadcast_node;
3949 using interface10::buffer_node;
3950 using interface10::queue_node;
3951 using interface10::sequencer_node;
3952 using interface10::priority_queue_node;
3953 using interface11::limiter_node;
3954 using namespace interface10::internal::graph_policy_namespace;
3955 using interface10::join_node;
3956 using interface10::input_port;
3957 using interface10::copy_body;
3958 using interface10::make_edge;
3959 using interface10::remove_edge;
3960 using interface10::internal::tag_value;
3961#if __TBB_FLOW_GRAPH_CPP11_FEATURES
3962 using interface10::composite_node;
3963#endif
3964 using interface10::async_node;
3965#if __TBB_PREVIEW_ASYNC_MSG
3966 using interface10::async_msg;
3967#endif
3968#if __TBB_PREVIEW_STREAMING_NODE
3969 using interface10::port_ref;
3970 using interface10::streaming_node;
3971#endif // __TBB_PREVIEW_STREAMING_NODE
3972#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
3973 using internal::node_priority_t;
3974 using internal::no_priority;
3975#endif
3976
3977
3978} // flow
3979} // tbb
3980
3981#undef __TBB_PFG_RESET_ARG
3982#undef __TBB_COMMA
3983
3984#endif // __TBB_flow_graph_H
3985