1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #ifndef __TBB_pipeline_H |
18 | #define __TBB_pipeline_H |
19 | |
20 | #include "atomic.h" |
21 | #include "task.h" |
22 | #include "tbb_allocator.h" |
23 | #include <cstddef> |
24 | |
25 | #if __TBB_CPP11_TYPE_PROPERTIES_PRESENT || __TBB_TR1_TYPE_PROPERTIES_IN_STD_PRESENT |
26 | #include <type_traits> |
27 | #endif |
28 | |
29 | namespace tbb { |
30 | |
// Forward declarations: both classes are defined later in this header and
// refer to each other (filter keeps a pipeline*, pipeline keeps filter*).
class pipeline;
class filter;
33 | |
34 | //! @cond INTERNAL |
35 | namespace internal { |
36 | |
// The argument for PIPELINE_VERSION should be an integer between 2 and 9.
// The result is (x-2) shifted left by one bit, i.e. placed in bits 1-3 of a
// filter mode byte (see filter::version_mask). The argument is parenthesized
// so that expressions containing lower-precedence operators expand correctly.
#define __TBB_PIPELINE_VERSION(x) ((unsigned char)((x)-2)<<1)
39 | |
//! Unsigned counter type for pipeline tokens (pipeline::input_tokens and pipeline::token_counter use it).
typedef unsigned long Token;
//! Signed type of the same width as Token; presumably used for token differences (not used in this header).
typedef long tokendiff_t;
// Implementation classes defined outside this header. They are only named here
// so that filter and pipeline can befriend them or hold pointers to them.
class stage_task;
class input_buffer;
class pipeline_root_task;
class pipeline_cleaner;
46 | |
47 | } // namespace internal |
48 | |
// Forward declarations for the lambda-friendly interface defined at the end of this header.
namespace interface6 {
    //! Chain of type-safe pipeline filters (defined below).
    template<typename T, typename U> class filter_t;

    namespace internal {
        //! Declared early so that tbb::pipeline can befriend it.
        class pipeline_proxy;
    }
}
56 | |
57 | //! @endcond |
58 | |
59 | //! A stage in a pipeline. |
60 | /** @ingroup algorithms */ |
61 | class filter: internal::no_copy { |
62 | private: |
63 | //! Value used to mark "not in pipeline" |
64 | static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));} |
65 | protected: |
66 | //! The lowest bit 0 is for parallel vs. serial |
67 | static const unsigned char filter_is_serial = 0x1; |
68 | |
69 | //! 4th bit distinguishes ordered vs unordered filters. |
70 | /** The bit was not set for parallel filters in TBB 2.1 and earlier, |
71 | but is_ordered() function always treats parallel filters as out of order. */ |
72 | static const unsigned char filter_is_out_of_order = 0x1<<4; |
73 | |
74 | //! 5th bit distinguishes thread-bound and regular filters. |
75 | static const unsigned char filter_is_bound = 0x1<<5; |
76 | |
77 | //! 6th bit marks input filters emitting small objects |
78 | static const unsigned char filter_may_emit_null = 0x1<<6; |
79 | |
80 | //! 7th bit defines exception propagation mode expected by the application. |
81 | static const unsigned char exact_exception_propagation = |
82 | #if TBB_USE_CAPTURED_EXCEPTION |
83 | 0x0; |
84 | #else |
85 | 0x1<<7; |
86 | #endif /* TBB_USE_CAPTURED_EXCEPTION */ |
87 | |
88 | static const unsigned char current_version = __TBB_PIPELINE_VERSION(5); |
89 | static const unsigned char version_mask = 0x7<<1; // bits 1-3 are for version |
90 | public: |
91 | enum mode { |
92 | //! processes multiple items in parallel and in no particular order |
93 | parallel = current_version | filter_is_out_of_order, |
94 | //! processes items one at a time; all such filters process items in the same order |
95 | serial_in_order = current_version | filter_is_serial, |
96 | //! processes items one at a time and in no particular order |
97 | serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order, |
98 | //! @deprecated use serial_in_order instead |
99 | serial = serial_in_order |
100 | }; |
101 | protected: |
102 | explicit filter( bool is_serial_ ) : |
103 | next_filter_in_pipeline(not_in_pipeline()), |
104 | my_input_buffer(NULL), |
105 | my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)), |
106 | prev_filter_in_pipeline(not_in_pipeline()), |
107 | my_pipeline(NULL), |
108 | next_segment(NULL) |
109 | {} |
110 | |
111 | explicit filter( mode filter_mode ) : |
112 | next_filter_in_pipeline(not_in_pipeline()), |
113 | my_input_buffer(NULL), |
114 | my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)), |
115 | prev_filter_in_pipeline(not_in_pipeline()), |
116 | my_pipeline(NULL), |
117 | next_segment(NULL) |
118 | {} |
119 | |
120 | // signal end-of-input for concrete_filters |
121 | void __TBB_EXPORTED_METHOD set_end_of_input(); |
122 | |
123 | public: |
124 | //! True if filter is serial. |
125 | bool is_serial() const { |
126 | return bool( my_filter_mode & filter_is_serial ); |
127 | } |
128 | |
129 | //! True if filter must receive stream in order. |
130 | bool is_ordered() const { |
131 | return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial; |
132 | } |
133 | |
134 | //! True if filter is thread-bound. |
135 | bool is_bound() const { |
136 | return ( my_filter_mode & filter_is_bound )==filter_is_bound; |
137 | } |
138 | |
139 | //! true if an input filter can emit null |
140 | bool object_may_be_null() { |
141 | return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null; |
142 | } |
143 | |
144 | //! Operate on an item from the input stream, and return item for output stream. |
145 | /** Returns NULL if filter is a sink. */ |
146 | virtual void* operator()( void* item ) = 0; |
147 | |
148 | //! Destroy filter. |
149 | /** If the filter was added to a pipeline, the pipeline must be destroyed first. */ |
150 | virtual __TBB_EXPORTED_METHOD ~filter(); |
151 | |
152 | #if __TBB_TASK_GROUP_CONTEXT |
153 | //! Destroys item if pipeline was cancelled. |
154 | /** Required to prevent memory leaks. |
155 | Note it can be called concurrently even for serial filters.*/ |
156 | virtual void finalize( void* /*item*/ ) {}; |
157 | #endif |
158 | |
159 | private: |
160 | //! Pointer to next filter in the pipeline. |
161 | filter* next_filter_in_pipeline; |
162 | |
163 | //! has the filter not yet processed all the tokens it will ever see? |
164 | // (pipeline has not yet reached end_of_input or this filter has not yet |
165 | // seen the last token produced by input_filter) |
166 | bool has_more_work(); |
167 | |
168 | //! Buffer for incoming tokens, or NULL if not required. |
169 | /** The buffer is required if the filter is serial or follows a thread-bound one. */ |
170 | internal::input_buffer* my_input_buffer; |
171 | |
172 | friend class internal::stage_task; |
173 | friend class internal::pipeline_root_task; |
174 | friend class pipeline; |
175 | friend class thread_bound_filter; |
176 | |
177 | //! Storage for filter mode and dynamically checked implementation version. |
178 | const unsigned char my_filter_mode; |
179 | |
180 | //! Pointer to previous filter in the pipeline. |
181 | filter* prev_filter_in_pipeline; |
182 | |
183 | //! Pointer to the pipeline. |
184 | pipeline* my_pipeline; |
185 | |
186 | //! Pointer to the next "segment" of filters, or NULL if not required. |
187 | /** In each segment, the first filter is not thread-bound but follows a thread-bound one. */ |
188 | filter* next_segment; |
189 | }; |
190 | |
191 | //! A stage in a pipeline served by a user thread. |
192 | /** @ingroup algorithms */ |
193 | class thread_bound_filter: public filter { |
194 | public: |
195 | enum result_type { |
196 | // item was processed |
197 | success, |
198 | // item is currently not available |
199 | item_not_available, |
200 | // there are no more items to process |
201 | end_of_stream |
202 | }; |
203 | protected: |
204 | explicit thread_bound_filter(mode filter_mode): |
205 | filter(static_cast<mode>(filter_mode | filter::filter_is_bound)) |
206 | { |
207 | __TBB_ASSERT(filter_mode & filter::filter_is_serial, "thread-bound filters must be serial" ); |
208 | } |
209 | public: |
210 | //! If a data item is available, invoke operator() on that item. |
211 | /** This interface is non-blocking. |
212 | Returns 'success' if an item was processed. |
213 | Returns 'item_not_available' if no item can be processed now |
214 | but more may arrive in the future, or if token limit is reached. |
215 | Returns 'end_of_stream' if there are no more items to process. */ |
216 | result_type __TBB_EXPORTED_METHOD try_process_item(); |
217 | |
218 | //! Wait until a data item becomes available, and invoke operator() on that item. |
219 | /** This interface is blocking. |
220 | Returns 'success' if an item was processed. |
221 | Returns 'end_of_stream' if there are no more items to process. |
222 | Never returns 'item_not_available', as it blocks until another return condition applies. */ |
223 | result_type __TBB_EXPORTED_METHOD process_item(); |
224 | |
225 | private: |
226 | //! Internal routine for item processing |
227 | result_type internal_process_item(bool is_blocking); |
228 | }; |
229 | |
//! A processing pipeline that applies filters to items.
/** Filters are appended with add_filter() and executed by run().
    All member functions are defined outside this header.
    @ingroup algorithms */
class pipeline {
public:
    //! Construct empty pipeline.
    __TBB_EXPORTED_METHOD pipeline();

    /** Though the current implementation declares the destructor virtual, do not rely on this
        detail.  The virtualness is deprecated and may disappear in future versions of TBB. */
    virtual __TBB_EXPORTED_METHOD ~pipeline();

    //! Add filter to end of pipeline.
    void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );

    //! Run the pipeline to completion.
    /** max_number_of_live_tokens is presumably the upper bound on the number of
        items in flight at once -- confirm against the implementation. */
    void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );

#if __TBB_TASK_GROUP_CONTEXT
    //! Run the pipeline to completion with user-supplied context.
    void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
#endif

    //! Remove all filters from the pipeline.
    void __TBB_EXPORTED_METHOD clear();

private:
    // Implementation classes and the filter classes need direct access to the
    // private state below.
    friend class internal::stage_task;
    friend class internal::pipeline_root_task;
    friend class filter;
    friend class thread_bound_filter;
    friend class internal::pipeline_cleaner;
    friend class tbb::interface6::internal::pipeline_proxy;

    //! Pointer to first filter in the pipeline.
    filter* filter_list;

    //! Pointer to location where address of next filter to be added should be stored.
    /** NOTE(review): the member is a filter*, not a filter**, so this description looks
        outdated; presumably it points to the last filter in the list -- confirm. */
    filter* filter_end;

    //! task whose reference count is used to determine when all stages are done.
    task* end_counter;

    //! Number of idle tokens waiting for input stage.
    atomic<internal::Token> input_tokens;

    //! Global counter of tokens
    atomic<internal::Token> token_counter;

    //! False until fetch_input returns NULL.
    bool end_of_input;

    //! True if the pipeline contains a thread-bound filter; false otherwise.
    bool has_thread_bound_filters;

    //! Remove filter from pipeline.
    void remove_filter( filter& filter_ );

    //! Not used, but retained to satisfy old export files.
    void __TBB_EXPORTED_METHOD inject_token( task& self );

#if __TBB_TASK_GROUP_CONTEXT
    //! Does clean up if pipeline is cancelled or exception occurred
    void clear_filters();
#endif
};
295 | |
296 | //------------------------------------------------------------------------ |
297 | // Support for lambda-friendly parallel_pipeline interface |
298 | //------------------------------------------------------------------------ |
299 | |
300 | namespace interface6 { |
301 | |
namespace internal {
    // Declared ahead of flow_control so that it can be named in the friend declaration.
    template<typename T, typename U, typename Body> class concrete_filter;
}

//! input_filter control to signal end-of-input for parallel_pipeline
/** Only concrete_filter can create instances (the constructor is private);
    user code calls stop() on the object it is handed. */
class flow_control {
    //! Set once stop() has been called; read by concrete_filter.
    bool is_pipeline_stopped;
    flow_control() : is_pipeline_stopped(false) {}
    template<typename T, typename U, typename Body> friend class internal::concrete_filter;
public:
    //! Mark the input stream as finished.
    void stop() { is_pipeline_stopped = true; }
};
314 | |
315 | //! @cond INTERNAL |
316 | namespace internal { |
317 | |
//! value is true if an object of type T does not fit into a void*.
template<typename T> struct tbb_large_object {enum { value = sizeof(T) > sizeof(void *) }; };

// Obtain type properties in one or another way
#if __TBB_CPP11_TYPE_PROPERTIES_PRESENT
//! value mirrors std::is_trivially_copyable<T>.
template<typename T> struct tbb_trivially_copyable { enum { value = std::is_trivially_copyable<T>::value }; };
#elif __TBB_TR1_TYPE_PROPERTIES_IN_STD_PRESENT
//! value mirrors the TR1-era std::has_trivial_copy_constructor<T>.
template<typename T> struct tbb_trivially_copyable { enum { value = std::has_trivial_copy_constructor<T>::value }; };
#else
// Explicitly list the types we wish to be placed as-is in the pipeline input_buffers.
// Every other type is treated as not trivially copyable. Each specialization
// tests the size of its own type.
template<typename T> struct tbb_trivially_copyable { enum { value = false }; };
template<typename T> struct tbb_trivially_copyable <T*> { enum { value = true }; };
template<> struct tbb_trivially_copyable <short> { enum { value = true }; };
template<> struct tbb_trivially_copyable <unsigned short> { enum { value = true }; };
template<> struct tbb_trivially_copyable <int> { enum { value = !tbb_large_object<int>::value }; };
template<> struct tbb_trivially_copyable <unsigned int> { enum { value = !tbb_large_object<unsigned int>::value }; };
template<> struct tbb_trivially_copyable <long> { enum { value = !tbb_large_object<long>::value }; };
template<> struct tbb_trivially_copyable <unsigned long> { enum { value = !tbb_large_object<unsigned long>::value }; };
template<> struct tbb_trivially_copyable <float> { enum { value = !tbb_large_object<float>::value }; };
template<> struct tbb_trivially_copyable <double> { enum { value = !tbb_large_object<double>::value }; };
#endif // Obtaining type properties

//! value is true if T is bigger than a void* or is not known to be trivially copyable.
/** Selects which token_helper specialization is used for T. */
template<typename T> struct is_large_object {enum { value = tbb_large_object<T>::value || !tbb_trivially_copyable<T>::value }; };
340 | |
341 | template<typename T, bool> class token_helper; |
342 | |
343 | // large object helper (uses tbb_allocator) |
344 | template<typename T> |
345 | class token_helper<T, true> { |
346 | public: |
347 | typedef typename tbb::tbb_allocator<T> allocator; |
348 | typedef T* pointer; |
349 | typedef T value_type; |
350 | static pointer create_token(const value_type & source) { |
351 | pointer output_t = allocator().allocate(1); |
352 | return new (output_t) T(source); |
353 | } |
354 | static value_type & token(pointer & t) { return *t;} |
355 | static void * cast_to_void_ptr(pointer ref) { return (void *) ref; } |
356 | static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; } |
357 | static void destroy_token(pointer token) { |
358 | allocator().destroy(token); |
359 | allocator().deallocate(token,1); |
360 | } |
361 | }; |
362 | |
363 | // pointer specialization |
364 | template<typename T> |
365 | class token_helper<T*, false > { |
366 | public: |
367 | typedef T* pointer; |
368 | typedef T* value_type; |
369 | static pointer create_token(const value_type & source) { return source; } |
370 | static value_type & token(pointer & t) { return t;} |
371 | static void * cast_to_void_ptr(pointer ref) { return (void *)ref; } |
372 | static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; } |
373 | static void destroy_token( pointer /*token*/) {} |
374 | }; |
375 | |
376 | // small object specialization (converts void* to the correct type, passes objects directly.) |
377 | template<typename T> |
378 | class token_helper<T, false> { |
379 | typedef union { |
380 | T actual_value; |
381 | void * void_overlay; |
382 | } type_to_void_ptr_map; |
383 | public: |
384 | typedef T pointer; // not really a pointer in this case. |
385 | typedef T value_type; |
386 | static pointer create_token(const value_type & source) { |
387 | return source; } |
388 | static value_type & token(pointer & t) { return t;} |
389 | static void * cast_to_void_ptr(pointer ref) { |
390 | type_to_void_ptr_map mymap; |
391 | mymap.void_overlay = NULL; |
392 | mymap.actual_value = ref; |
393 | return mymap.void_overlay; |
394 | } |
395 | static pointer cast_from_void_ptr(void * ref) { |
396 | type_to_void_ptr_map mymap; |
397 | mymap.void_overlay = ref; |
398 | return mymap.actual_value; |
399 | } |
400 | static void destroy_token( pointer /*token*/) {} |
401 | }; |
402 | |
403 | template<typename T, typename U, typename Body> |
404 | class concrete_filter: public tbb::filter { |
405 | const Body& my_body; |
406 | typedef token_helper<T,is_large_object<T>::value > t_helper; |
407 | typedef typename t_helper::pointer t_pointer; |
408 | typedef token_helper<U,is_large_object<U>::value > u_helper; |
409 | typedef typename u_helper::pointer u_pointer; |
410 | |
411 | void* operator()(void* input) __TBB_override { |
412 | t_pointer temp_input = t_helper::cast_from_void_ptr(input); |
413 | u_pointer output_u = u_helper::create_token(my_body(t_helper::token(temp_input))); |
414 | t_helper::destroy_token(temp_input); |
415 | return u_helper::cast_to_void_ptr(output_u); |
416 | } |
417 | |
418 | void finalize(void * input) __TBB_override { |
419 | t_pointer temp_input = t_helper::cast_from_void_ptr(input); |
420 | t_helper::destroy_token(temp_input); |
421 | } |
422 | |
423 | public: |
424 | concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {} |
425 | }; |
426 | |
427 | // input |
428 | template<typename U, typename Body> |
429 | class concrete_filter<void,U,Body>: public filter { |
430 | const Body& my_body; |
431 | typedef token_helper<U, is_large_object<U>::value > u_helper; |
432 | typedef typename u_helper::pointer u_pointer; |
433 | |
434 | void* operator()(void*) __TBB_override { |
435 | flow_control control; |
436 | u_pointer output_u = u_helper::create_token(my_body(control)); |
437 | if(control.is_pipeline_stopped) { |
438 | u_helper::destroy_token(output_u); |
439 | set_end_of_input(); |
440 | return NULL; |
441 | } |
442 | return u_helper::cast_to_void_ptr(output_u); |
443 | } |
444 | |
445 | public: |
446 | concrete_filter(tbb::filter::mode filter_mode, const Body& body) : |
447 | filter(static_cast<tbb::filter::mode>(filter_mode | filter_may_emit_null)), |
448 | my_body(body) |
449 | {} |
450 | }; |
451 | |
452 | template<typename T, typename Body> |
453 | class concrete_filter<T,void,Body>: public filter { |
454 | const Body& my_body; |
455 | typedef token_helper<T, is_large_object<T>::value > t_helper; |
456 | typedef typename t_helper::pointer t_pointer; |
457 | |
458 | void* operator()(void* input) __TBB_override { |
459 | t_pointer temp_input = t_helper::cast_from_void_ptr(input); |
460 | my_body(t_helper::token(temp_input)); |
461 | t_helper::destroy_token(temp_input); |
462 | return NULL; |
463 | } |
464 | void finalize(void* input) __TBB_override { |
465 | t_pointer temp_input = t_helper::cast_from_void_ptr(input); |
466 | t_helper::destroy_token(temp_input); |
467 | } |
468 | |
469 | public: |
470 | concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {} |
471 | }; |
472 | |
473 | template<typename Body> |
474 | class concrete_filter<void,void,Body>: public filter { |
475 | const Body& my_body; |
476 | |
477 | /** Override privately because it is always called virtually */ |
478 | void* operator()(void*) __TBB_override { |
479 | flow_control control; |
480 | my_body(control); |
481 | void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1; |
482 | return output; |
483 | } |
484 | public: |
485 | concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {} |
486 | }; |
487 | |
488 | //! The class that represents an object of the pipeline for parallel_pipeline(). |
489 | /** It primarily serves as RAII class that deletes heap-allocated filter instances. */ |
490 | class pipeline_proxy { |
491 | tbb::pipeline my_pipe; |
492 | public: |
493 | pipeline_proxy( const filter_t<void,void>& filter_chain ); |
494 | ~pipeline_proxy() { |
495 | while( filter* f = my_pipe.filter_list ) |
496 | delete f; // filter destructor removes it from the pipeline |
497 | } |
498 | tbb::pipeline* operator->() { return &my_pipe; } |
499 | }; |
500 | |
501 | //! Abstract base class that represents a node in a parse tree underlying a filter_t. |
502 | /** These nodes are always heap-allocated and can be shared by filter_t objects. */ |
503 | class filter_node: tbb::internal::no_copy { |
504 | /** Count must be atomic because it is hidden state for user, but might be shared by threads. */ |
505 | tbb::atomic<intptr_t> ref_count; |
506 | protected: |
507 | filter_node() { |
508 | ref_count = 0; |
509 | #ifdef __TBB_TEST_FILTER_NODE_COUNT |
510 | ++(__TBB_TEST_FILTER_NODE_COUNT); |
511 | #endif |
512 | } |
513 | public: |
514 | //! Add concrete_filter to pipeline |
515 | virtual void add_to( pipeline& ) = 0; |
516 | //! Increment reference count |
517 | void add_ref() {++ref_count;} |
518 | //! Decrement reference count and delete if it becomes zero. |
519 | void remove_ref() { |
520 | __TBB_ASSERT(ref_count>0,"ref_count underflow" ); |
521 | if( --ref_count==0 ) |
522 | delete this; |
523 | } |
524 | virtual ~filter_node() { |
525 | #ifdef __TBB_TEST_FILTER_NODE_COUNT |
526 | --(__TBB_TEST_FILTER_NODE_COUNT); |
527 | #endif |
528 | } |
529 | }; |
530 | |
531 | //! Node in parse tree representing result of make_filter. |
532 | template<typename T, typename U, typename Body> |
533 | class filter_node_leaf: public filter_node { |
534 | const tbb::filter::mode mode; |
535 | const Body body; |
536 | void add_to( pipeline& p ) __TBB_override { |
537 | concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body); |
538 | p.add_filter( *f ); |
539 | } |
540 | public: |
541 | filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {} |
542 | }; |
543 | |
544 | //! Node in parse tree representing join of two filters. |
545 | class filter_node_join: public filter_node { |
546 | friend class filter_node; // to suppress GCC 3.2 warnings |
547 | filter_node& left; |
548 | filter_node& right; |
549 | ~filter_node_join() { |
550 | left.remove_ref(); |
551 | right.remove_ref(); |
552 | } |
553 | void add_to( pipeline& p ) __TBB_override { |
554 | left.add_to(p); |
555 | right.add_to(p); |
556 | } |
557 | public: |
558 | filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) { |
559 | left.add_ref(); |
560 | right.add_ref(); |
561 | } |
562 | }; |
563 | |
564 | } // namespace internal |
565 | //! @endcond |
566 | |
567 | //! Create a filter to participate in parallel_pipeline |
568 | template<typename T, typename U, typename Body> |
569 | filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) { |
570 | return new internal::filter_node_leaf<T,U,Body>(mode, body); |
571 | } |
572 | |
573 | template<typename T, typename V, typename U> |
574 | filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) { |
575 | __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'" ); |
576 | __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'" ); |
577 | return new internal::filter_node_join(*left.root,*right.root); |
578 | } |
579 | |
580 | //! Class representing a chain of type-safe pipeline filters |
581 | template<typename T, typename U> |
582 | class filter_t { |
583 | typedef internal::filter_node filter_node; |
584 | filter_node* root; |
585 | filter_t( filter_node* root_ ) : root(root_) { |
586 | root->add_ref(); |
587 | } |
588 | friend class internal::pipeline_proxy; |
589 | template<typename T_, typename U_, typename Body> |
590 | friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& ); |
591 | template<typename T_, typename V_, typename U_> |
592 | friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& ); |
593 | public: |
594 | // TODO: add move-constructors, move-assignment, etc. where C++11 is available. |
595 | filter_t() : root(NULL) {} |
596 | filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) { |
597 | if( root ) root->add_ref(); |
598 | } |
599 | template<typename Body> |
600 | filter_t( tbb::filter::mode mode, const Body& body ) : |
601 | root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) { |
602 | root->add_ref(); |
603 | } |
604 | |
605 | void operator=( const filter_t<T,U>& rhs ) { |
606 | // Order of operations below carefully chosen so that reference counts remain correct |
607 | // in unlikely event that remove_ref throws exception. |
608 | filter_node* old = root; |
609 | root = rhs.root; |
610 | if( root ) root->add_ref(); |
611 | if( old ) old->remove_ref(); |
612 | } |
613 | ~filter_t() { |
614 | if( root ) root->remove_ref(); |
615 | } |
616 | void clear() { |
617 | // Like operator= with filter_t() on right side. |
618 | if( root ) { |
619 | filter_node* old = root; |
620 | root = NULL; |
621 | old->remove_ref(); |
622 | } |
623 | } |
624 | }; |
625 | |
626 | inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() { |
627 | __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t" ); |
628 | filter_chain.root->add_to(my_pipe); |
629 | } |
630 | |
631 | inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain |
632 | #if __TBB_TASK_GROUP_CONTEXT |
633 | , tbb::task_group_context& context |
634 | #endif |
635 | ) { |
636 | internal::pipeline_proxy pipe(filter_chain); |
637 | // tbb::pipeline::run() is called via the proxy |
638 | pipe->run(max_number_of_live_tokens |
639 | #if __TBB_TASK_GROUP_CONTEXT |
640 | , context |
641 | #endif |
642 | ); |
643 | } |
644 | |
645 | #if __TBB_TASK_GROUP_CONTEXT |
646 | inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) { |
647 | tbb::task_group_context context; |
648 | parallel_pipeline(max_number_of_live_tokens, filter_chain, context); |
649 | } |
650 | #endif // __TBB_TASK_GROUP_CONTEXT |
651 | |
652 | } // interface6 |
653 | |
654 | using interface6::flow_control; |
655 | using interface6::filter_t; |
656 | using interface6::make_filter; |
657 | using interface6::parallel_pipeline; |
658 | |
659 | } // tbb |
660 | |
661 | #endif /* __TBB_pipeline_H */ |
662 | |