1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB_pipeline_H
18#define __TBB_pipeline_H
19
20#include "atomic.h"
21#include "task.h"
22#include "tbb_allocator.h"
23#include <cstddef>
24
25#if __TBB_CPP11_TYPE_PROPERTIES_PRESENT || __TBB_TR1_TYPE_PROPERTIES_IN_STD_PRESENT
26#include <type_traits>
27#endif
28
29namespace tbb {
30
31class pipeline;
32class filter;
33
34//! @cond INTERNAL
35namespace internal {
36
// Encodes an interface version in bits 1-3 of a filter's mode byte.
// The argument for PIPELINE_VERSION should be an integer between 2 and 9
#define __TBB_PIPELINE_VERSION(x) ((unsigned char)(x-2)<<1)

//! Type used for counting tokens that flow through a pipeline.
typedef unsigned long Token;
//! Signed counterpart of Token, for differences of token counts.
typedef long tokendiff_t;
// Forward declarations of implementation classes defined in the library sources.
class stage_task;
class input_buffer;
class pipeline_root_task;
class pipeline_cleaner;
46
47} // namespace internal
48
// Forward declarations for the lambda-friendly parallel_pipeline interface defined below.
namespace interface6 {
    template<typename T, typename U> class filter_t;

    namespace internal {
        class pipeline_proxy;
    }
}
56
57//! @endcond
58
//! A stage in a pipeline.
/** Derived classes implement operator() to transform each item passed through
    the stage.  A filter belongs to at most one pipeline at a time.
    @ingroup algorithms */
class filter: internal::no_copy {
private:
    //! Value used to mark "not in pipeline"
    static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));}
protected:
    //! The lowest bit 0 is for parallel vs. serial
    static const unsigned char filter_is_serial = 0x1;

    //! 4th bit distinguishes ordered vs unordered filters.
    /** The bit was not set for parallel filters in TBB 2.1 and earlier,
        but is_ordered() function always treats parallel filters as out of order. */
    static const unsigned char filter_is_out_of_order = 0x1<<4;

    //! 5th bit distinguishes thread-bound and regular filters.
    static const unsigned char filter_is_bound = 0x1<<5;

    //! 6th bit marks input filters that may return NULL from operator()
    //! without that meaning end-of-input (queried via object_may_be_null()).
    static const unsigned char filter_may_emit_null = 0x1<<6;

    //! 7th bit defines exception propagation mode expected by the application.
    static const unsigned char exact_exception_propagation =
#if TBB_USE_CAPTURED_EXCEPTION
        0x0;
#else
        0x1<<7;
#endif /* TBB_USE_CAPTURED_EXCEPTION */

    //! Interface version stamped into every filter's mode byte (bits 1-3).
    static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
    static const unsigned char version_mask = 0x7<<1; // bits 1-3 are for version
public:
    //! Valid combinations of the mode bits that users may request.
    enum mode {
        //! processes multiple items in parallel and in no particular order
        parallel = current_version | filter_is_out_of_order,
        //! processes items one at a time; all such filters process items in the same order
        serial_in_order = current_version | filter_is_serial,
        //! processes items one at a time and in no particular order
        serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
        //! @deprecated use serial_in_order instead
        serial = serial_in_order
    };
protected:
    //! Construct from the legacy boolean serial/parallel flag.
    explicit filter( bool is_serial_ ) :
        next_filter_in_pipeline(not_in_pipeline()),
        my_input_buffer(NULL),
        my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
        prev_filter_in_pipeline(not_in_pipeline()),
        my_pipeline(NULL),
        next_segment(NULL)
    {}

    //! Construct with an explicit mode (preferred form).
    explicit filter( mode filter_mode ) :
        next_filter_in_pipeline(not_in_pipeline()),
        my_input_buffer(NULL),
        my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
        prev_filter_in_pipeline(not_in_pipeline()),
        my_pipeline(NULL),
        next_segment(NULL)
    {}

    // signal end-of-input for concrete_filters
    void __TBB_EXPORTED_METHOD set_end_of_input();

public:
    //! True if filter is serial.
    bool is_serial() const {
        return bool( my_filter_mode & filter_is_serial );
    }

    //! True if filter must receive stream in order.
    /** Parallel filters are always treated as out of order (see note on
        filter_is_out_of_order above). */
    bool is_ordered() const {
        return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
    }

    //! True if filter is thread-bound.
    bool is_bound() const {
        return ( my_filter_mode & filter_is_bound )==filter_is_bound;
    }

    //! true if an input filter can emit null
    bool object_may_be_null() {
        return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null;
    }

    //! Operate on an item from the input stream, and return item for output stream.
    /** Returns NULL if filter is a sink. */
    virtual void* operator()( void* item ) = 0;

    //! Destroy filter.
    /** If the filter was added to a pipeline, the pipeline must be destroyed first. */
    virtual __TBB_EXPORTED_METHOD ~filter();

#if __TBB_TASK_GROUP_CONTEXT
    //! Destroys item if pipeline was cancelled.
    /** Required to prevent memory leaks.
        Note it can be called concurrently even for serial filters.*/
    virtual void finalize( void* /*item*/ ) {};
#endif

private:
    //! Pointer to next filter in the pipeline.
    filter* next_filter_in_pipeline;

    //! has the filter not yet processed all the tokens it will ever see?
    // (pipeline has not yet reached end_of_input or this filter has not yet
    // seen the last token produced by input_filter)
    bool has_more_work();

    //! Buffer for incoming tokens, or NULL if not required.
    /** The buffer is required if the filter is serial or follows a thread-bound one. */
    internal::input_buffer* my_input_buffer;

    friend class internal::stage_task;
    friend class internal::pipeline_root_task;
    friend class pipeline;
    friend class thread_bound_filter;

    //! Storage for filter mode and dynamically checked implementation version.
    const unsigned char my_filter_mode;

    //! Pointer to previous filter in the pipeline.
    filter* prev_filter_in_pipeline;

    //! Pointer to the pipeline.
    pipeline* my_pipeline;

    //! Pointer to the next "segment" of filters, or NULL if not required.
    /** In each segment, the first filter is not thread-bound but follows a thread-bound one. */
    filter* next_segment;
};
190
//! A stage in a pipeline served by a user thread.
/** Items for a thread-bound filter are processed by an explicit user thread
    that calls process_item() or try_process_item(), rather than by the
    pipeline's worker threads.
    @ingroup algorithms */
class thread_bound_filter: public filter {
public:
    //! Status codes returned by try_process_item() and process_item().
    enum result_type {
        // item was processed
        success,
        // item is currently not available
        item_not_available,
        // there are no more items to process
        end_of_stream
    };
protected:
    //! Stamp the thread-bound bit into the requested mode.
    explicit thread_bound_filter(mode filter_mode):
        filter(static_cast<mode>(filter_mode | filter::filter_is_bound))
    {
        // A filter driven by a single user thread only makes sense serial.
        __TBB_ASSERT(filter_mode & filter::filter_is_serial, "thread-bound filters must be serial");
    }
public:
    //! If a data item is available, invoke operator() on that item.
    /** This interface is non-blocking.
        Returns 'success' if an item was processed.
        Returns 'item_not_available' if no item can be processed now
        but more may arrive in the future, or if token limit is reached.
        Returns 'end_of_stream' if there are no more items to process. */
    result_type __TBB_EXPORTED_METHOD try_process_item();

    //! Wait until a data item becomes available, and invoke operator() on that item.
    /** This interface is blocking.
        Returns 'success' if an item was processed.
        Returns 'end_of_stream' if there are no more items to process.
        Never returns 'item_not_available', as it blocks until another return condition applies. */
    result_type __TBB_EXPORTED_METHOD process_item();

private:
    //! Internal routine for item processing
    /** is_blocking selects between the blocking and non-blocking public entry points. */
    result_type internal_process_item(bool is_blocking);
};
229
//! A processing pipeline that applies filters to items.
/** Filters are appended with add_filter() and executed over the input stream
    by run().  The pipeline does not own its filters; they must outlive it.
    @ingroup algorithms */
class pipeline {
public:
    //! Construct empty pipeline.
    __TBB_EXPORTED_METHOD pipeline();

    /** Though the current implementation declares the destructor virtual, do not rely on this
        detail.  The virtualness is deprecated and may disappear in future versions of TBB. */
    virtual __TBB_EXPORTED_METHOD ~pipeline();

    //! Add filter to end of pipeline.
    void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );

    //! Run the pipeline to completion.
    /** max_number_of_live_tokens bounds how many items may be in flight at once. */
    void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );

#if __TBB_TASK_GROUP_CONTEXT
    //! Run the pipeline to completion with user-supplied context.
    void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
#endif

    //! Remove all filters from the pipeline.
    void __TBB_EXPORTED_METHOD clear();

private:
    friend class internal::stage_task;
    friend class internal::pipeline_root_task;
    friend class filter;
    friend class thread_bound_filter;
    friend class internal::pipeline_cleaner;
    friend class tbb::interface6::internal::pipeline_proxy;

    //! Pointer to first filter in the pipeline.
    filter* filter_list;

    //! Pointer to location where address of next filter to be added should be stored.
    filter* filter_end;

    //! task whose reference count is used to determine when all stages are done.
    task* end_counter;

    //! Number of idle tokens waiting for input stage.
    atomic<internal::Token> input_tokens;

    //! Global counter of tokens
    atomic<internal::Token> token_counter;

    //! False until fetch_input returns NULL.
    bool end_of_input;

    //! True if the pipeline contains a thread-bound filter; false otherwise.
    bool has_thread_bound_filters;

    //! Remove filter from pipeline.
    void remove_filter( filter& filter_ );

    //! Not used, but retained to satisfy old export files.
    void __TBB_EXPORTED_METHOD inject_token( task& self );

#if __TBB_TASK_GROUP_CONTEXT
    //! Does clean up if pipeline is cancelled or exception occurred
    void clear_filters();
#endif
};
295
296//------------------------------------------------------------------------
297// Support for lambda-friendly parallel_pipeline interface
298//------------------------------------------------------------------------
299
300namespace interface6 {
301
302namespace internal {
303 template<typename T, typename U, typename Body> class concrete_filter;
304}
305
306//! input_filter control to signal end-of-input for parallel_pipeline
307class flow_control {
308 bool is_pipeline_stopped;
309 flow_control() { is_pipeline_stopped = false; }
310 template<typename T, typename U, typename Body> friend class internal::concrete_filter;
311public:
312 void stop() { is_pipeline_stopped = true; }
313};
314
315//! @cond INTERNAL
316namespace internal {
317
//! True if T does not fit into the void* token slot by value.
template<typename T> struct tbb_large_object {enum { value = sizeof(T) > sizeof(void *) }; };

// Obtain type properties in one or another way
#if __TBB_CPP11_TYPE_PROPERTIES_PRESENT
template<typename T> struct tbb_trivially_copyable { enum { value = std::is_trivially_copyable<T>::value }; };
#elif __TBB_TR1_TYPE_PROPERTIES_IN_STD_PRESENT
template<typename T> struct tbb_trivially_copyable { enum { value = std::has_trivial_copy_constructor<T>::value }; };
#else
// Explicitly list the types we wish to be placed as-is in the pipeline input_buffers.
template<typename T> struct tbb_trivially_copyable { enum { value = false }; };
template<typename T> struct tbb_trivially_copyable <T*> { enum { value = true }; };
template<> struct tbb_trivially_copyable <short> { enum { value = true }; };
template<> struct tbb_trivially_copyable <unsigned short> { enum { value = true }; };
// Each specialization is expressed in terms of its own type for consistency;
// signed and unsigned variants of a type are guaranteed to have equal sizes.
template<> struct tbb_trivially_copyable <int> { enum { value = !tbb_large_object<int>::value }; };
template<> struct tbb_trivially_copyable <unsigned int> { enum { value = !tbb_large_object<unsigned int>::value }; };
template<> struct tbb_trivially_copyable <long> { enum { value = !tbb_large_object<long>::value }; };
template<> struct tbb_trivially_copyable <unsigned long> { enum { value = !tbb_large_object<unsigned long>::value }; };
template<> struct tbb_trivially_copyable <float> { enum { value = !tbb_large_object<float>::value }; };
template<> struct tbb_trivially_copyable <double> { enum { value = !tbb_large_object<double>::value }; };
#endif // Obtaining type properties

//! True if tokens of type T must be heap-allocated instead of stored in the void* slot directly.
/** A type goes through the "large object" path if it is bigger than a void* or not trivially copyable. */
template<typename T> struct is_large_object {enum { value = tbb_large_object<T>::value || !tbb_trivially_copyable<T>::value }; };
340
341template<typename T, bool> class token_helper;
342
343// large object helper (uses tbb_allocator)
344template<typename T>
345class token_helper<T, true> {
346 public:
347 typedef typename tbb::tbb_allocator<T> allocator;
348 typedef T* pointer;
349 typedef T value_type;
350 static pointer create_token(const value_type & source) {
351 pointer output_t = allocator().allocate(1);
352 return new (output_t) T(source);
353 }
354 static value_type & token(pointer & t) { return *t;}
355 static void * cast_to_void_ptr(pointer ref) { return (void *) ref; }
356 static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
357 static void destroy_token(pointer token) {
358 allocator().destroy(token);
359 allocator().deallocate(token,1);
360 }
361};
362
363// pointer specialization
364template<typename T>
365class token_helper<T*, false > {
366 public:
367 typedef T* pointer;
368 typedef T* value_type;
369 static pointer create_token(const value_type & source) { return source; }
370 static value_type & token(pointer & t) { return t;}
371 static void * cast_to_void_ptr(pointer ref) { return (void *)ref; }
372 static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
373 static void destroy_token( pointer /*token*/) {}
374};
375
376// small object specialization (converts void* to the correct type, passes objects directly.)
377template<typename T>
378class token_helper<T, false> {
379 typedef union {
380 T actual_value;
381 void * void_overlay;
382 } type_to_void_ptr_map;
383 public:
384 typedef T pointer; // not really a pointer in this case.
385 typedef T value_type;
386 static pointer create_token(const value_type & source) {
387 return source; }
388 static value_type & token(pointer & t) { return t;}
389 static void * cast_to_void_ptr(pointer ref) {
390 type_to_void_ptr_map mymap;
391 mymap.void_overlay = NULL;
392 mymap.actual_value = ref;
393 return mymap.void_overlay;
394 }
395 static pointer cast_from_void_ptr(void * ref) {
396 type_to_void_ptr_map mymap;
397 mymap.void_overlay = ref;
398 return mymap.actual_value;
399 }
400 static void destroy_token( pointer /*token*/) {}
401};
402
403template<typename T, typename U, typename Body>
404class concrete_filter: public tbb::filter {
405 const Body& my_body;
406 typedef token_helper<T,is_large_object<T>::value > t_helper;
407 typedef typename t_helper::pointer t_pointer;
408 typedef token_helper<U,is_large_object<U>::value > u_helper;
409 typedef typename u_helper::pointer u_pointer;
410
411 void* operator()(void* input) __TBB_override {
412 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
413 u_pointer output_u = u_helper::create_token(my_body(t_helper::token(temp_input)));
414 t_helper::destroy_token(temp_input);
415 return u_helper::cast_to_void_ptr(output_u);
416 }
417
418 void finalize(void * input) __TBB_override {
419 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
420 t_helper::destroy_token(temp_input);
421 }
422
423public:
424 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
425};
426
427// input
428template<typename U, typename Body>
429class concrete_filter<void,U,Body>: public filter {
430 const Body& my_body;
431 typedef token_helper<U, is_large_object<U>::value > u_helper;
432 typedef typename u_helper::pointer u_pointer;
433
434 void* operator()(void*) __TBB_override {
435 flow_control control;
436 u_pointer output_u = u_helper::create_token(my_body(control));
437 if(control.is_pipeline_stopped) {
438 u_helper::destroy_token(output_u);
439 set_end_of_input();
440 return NULL;
441 }
442 return u_helper::cast_to_void_ptr(output_u);
443 }
444
445public:
446 concrete_filter(tbb::filter::mode filter_mode, const Body& body) :
447 filter(static_cast<tbb::filter::mode>(filter_mode | filter_may_emit_null)),
448 my_body(body)
449 {}
450};
451
452template<typename T, typename Body>
453class concrete_filter<T,void,Body>: public filter {
454 const Body& my_body;
455 typedef token_helper<T, is_large_object<T>::value > t_helper;
456 typedef typename t_helper::pointer t_pointer;
457
458 void* operator()(void* input) __TBB_override {
459 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
460 my_body(t_helper::token(temp_input));
461 t_helper::destroy_token(temp_input);
462 return NULL;
463 }
464 void finalize(void* input) __TBB_override {
465 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
466 t_helper::destroy_token(temp_input);
467 }
468
469public:
470 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
471};
472
473template<typename Body>
474class concrete_filter<void,void,Body>: public filter {
475 const Body& my_body;
476
477 /** Override privately because it is always called virtually */
478 void* operator()(void*) __TBB_override {
479 flow_control control;
480 my_body(control);
481 void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1;
482 return output;
483 }
484public:
485 concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
486};
487
488//! The class that represents an object of the pipeline for parallel_pipeline().
489/** It primarily serves as RAII class that deletes heap-allocated filter instances. */
490class pipeline_proxy {
491 tbb::pipeline my_pipe;
492public:
493 pipeline_proxy( const filter_t<void,void>& filter_chain );
494 ~pipeline_proxy() {
495 while( filter* f = my_pipe.filter_list )
496 delete f; // filter destructor removes it from the pipeline
497 }
498 tbb::pipeline* operator->() { return &my_pipe; }
499};
500
//! Abstract base class that represents a node in a parse tree underlying a filter_t.
/** These nodes are always heap-allocated and can be shared by filter_t objects.
    Lifetime is managed with an intrusive, thread-safe reference count. */
class filter_node: tbb::internal::no_copy {
    /** Count must be atomic because it is hidden state for user, but might be shared by threads. */
    tbb::atomic<intptr_t> ref_count;
protected:
    //! Nodes start with zero references; creators call add_ref() explicitly.
    filter_node() {
        ref_count = 0;
#ifdef __TBB_TEST_FILTER_NODE_COUNT
        // Optional unit-test hook counting live nodes.
        ++(__TBB_TEST_FILTER_NODE_COUNT);
#endif
    }
public:
    //! Add concrete_filter to pipeline
    virtual void add_to( pipeline& ) = 0;
    //! Increment reference count
    void add_ref() {++ref_count;}
    //! Decrement reference count and delete if it becomes zero.
    void remove_ref() {
        __TBB_ASSERT(ref_count>0,"ref_count underflow");
        if( --ref_count==0 )
            delete this;
    }
    //! Virtual: nodes are deleted through filter_node* in remove_ref().
    virtual ~filter_node() {
#ifdef __TBB_TEST_FILTER_NODE_COUNT
        --(__TBB_TEST_FILTER_NODE_COUNT);
#endif
    }
};
530
531//! Node in parse tree representing result of make_filter.
532template<typename T, typename U, typename Body>
533class filter_node_leaf: public filter_node {
534 const tbb::filter::mode mode;
535 const Body body;
536 void add_to( pipeline& p ) __TBB_override {
537 concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
538 p.add_filter( *f );
539 }
540public:
541 filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
542};
543
544//! Node in parse tree representing join of two filters.
545class filter_node_join: public filter_node {
546 friend class filter_node; // to suppress GCC 3.2 warnings
547 filter_node& left;
548 filter_node& right;
549 ~filter_node_join() {
550 left.remove_ref();
551 right.remove_ref();
552 }
553 void add_to( pipeline& p ) __TBB_override {
554 left.add_to(p);
555 right.add_to(p);
556 }
557public:
558 filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
559 left.add_ref();
560 right.add_ref();
561 }
562};
563
564} // namespace internal
565//! @endcond
566
567//! Create a filter to participate in parallel_pipeline
568template<typename T, typename U, typename Body>
569filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
570 return new internal::filter_node_leaf<T,U,Body>(mode, body);
571}
572
573template<typename T, typename V, typename U>
574filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
575 __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
576 __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
577 return new internal::filter_node_join(*left.root,*right.root);
578}
579
580//! Class representing a chain of type-safe pipeline filters
581template<typename T, typename U>
582class filter_t {
583 typedef internal::filter_node filter_node;
584 filter_node* root;
585 filter_t( filter_node* root_ ) : root(root_) {
586 root->add_ref();
587 }
588 friend class internal::pipeline_proxy;
589 template<typename T_, typename U_, typename Body>
590 friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
591 template<typename T_, typename V_, typename U_>
592 friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
593public:
594 // TODO: add move-constructors, move-assignment, etc. where C++11 is available.
595 filter_t() : root(NULL) {}
596 filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
597 if( root ) root->add_ref();
598 }
599 template<typename Body>
600 filter_t( tbb::filter::mode mode, const Body& body ) :
601 root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
602 root->add_ref();
603 }
604
605 void operator=( const filter_t<T,U>& rhs ) {
606 // Order of operations below carefully chosen so that reference counts remain correct
607 // in unlikely event that remove_ref throws exception.
608 filter_node* old = root;
609 root = rhs.root;
610 if( root ) root->add_ref();
611 if( old ) old->remove_ref();
612 }
613 ~filter_t() {
614 if( root ) root->remove_ref();
615 }
616 void clear() {
617 // Like operator= with filter_t() on right side.
618 if( root ) {
619 filter_node* old = root;
620 root = NULL;
621 old->remove_ref();
622 }
623 }
624};
625
// Builds the pipeline by walking the filter_t parse tree; requires a non-empty chain.
inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() {
    __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t" );
    filter_chain.root->add_to(my_pipe);
}
630
//! Run a filter chain to completion with a bound on concurrently live tokens.
/** Blocks until the whole input stream has been processed by all stages.
    The heap-allocated filters are reclaimed by the proxy's destructor. */
inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain
#if __TBB_TASK_GROUP_CONTEXT
    , tbb::task_group_context& context
#endif
    ) {
    internal::pipeline_proxy pipe(filter_chain);
    // tbb::pipeline::run() is called via the proxy
    pipe->run(max_number_of_live_tokens
#if __TBB_TASK_GROUP_CONTEXT
    , context
#endif
    );
}
644
645#if __TBB_TASK_GROUP_CONTEXT
//! Convenience overload that runs the pipeline in a freshly created task_group_context.
inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
    tbb::task_group_context context;
    parallel_pipeline(max_number_of_live_tokens, filter_chain, context);
}
650#endif // __TBB_TASK_GROUP_CONTEXT
651
652} // interface6
653
654using interface6::flow_control;
655using interface6::filter_t;
656using interface6::make_filter;
657using interface6::parallel_pipeline;
658
659} // tbb
660
661#endif /* __TBB_pipeline_H */
662