1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #ifndef __TBB_parallel_while |
18 | #define __TBB_parallel_while |
19 | |
20 | #include "task.h" |
21 | #include <new> |
22 | |
23 | namespace tbb { |
24 | |
25 | template<typename Body> |
26 | class parallel_while; |
27 | |
28 | //! @cond INTERNAL |
29 | namespace internal { |
30 | |
31 | template<typename Stream, typename Body> class while_task; |
32 | |
33 | //! For internal use only. |
34 | /** Executes one iteration of a while. |
35 | @ingroup algorithms */ |
36 | template<typename Body> |
37 | class while_iteration_task: public task { |
38 | const Body& my_body; |
39 | typename Body::argument_type my_value; |
40 | task* execute() __TBB_override { |
41 | my_body(my_value); |
42 | return NULL; |
43 | } |
44 | while_iteration_task( const typename Body::argument_type& value, const Body& body ) : |
45 | my_body(body), my_value(value) |
46 | {} |
47 | template<typename Body_> friend class while_group_task; |
48 | friend class tbb::parallel_while<Body>; |
49 | }; |
50 | |
51 | //! For internal use only |
52 | /** Unpacks a block of iterations. |
53 | @ingroup algorithms */ |
54 | template<typename Body> |
55 | class while_group_task: public task { |
56 | static const size_t max_arg_size = 4; |
57 | const Body& my_body; |
58 | size_t size; |
59 | typename Body::argument_type my_arg[max_arg_size]; |
60 | while_group_task( const Body& body ) : my_body(body), size(0) {} |
61 | task* execute() __TBB_override { |
62 | typedef while_iteration_task<Body> iteration_type; |
63 | __TBB_ASSERT( size>0, NULL ); |
64 | task_list list; |
65 | task* t; |
66 | size_t k=0; |
67 | for(;;) { |
68 | t = new( allocate_child() ) iteration_type(my_arg[k],my_body); |
69 | if( ++k==size ) break; |
70 | list.push_back(*t); |
71 | } |
72 | set_ref_count(int(k+1)); |
73 | spawn(list); |
74 | spawn_and_wait_for_all(*t); |
75 | return NULL; |
76 | } |
77 | template<typename Stream, typename Body_> friend class while_task; |
78 | }; |
79 | |
80 | //! For internal use only. |
81 | /** Gets block of iterations from a stream and packages them into a while_group_task. |
82 | @ingroup algorithms */ |
83 | template<typename Stream, typename Body> |
84 | class while_task: public task { |
85 | Stream& my_stream; |
86 | const Body& my_body; |
87 | empty_task& my_barrier; |
88 | task* execute() __TBB_override { |
89 | typedef while_group_task<Body> block_type; |
90 | block_type& t = *new( allocate_additional_child_of(my_barrier) ) block_type(my_body); |
91 | size_t k=0; |
92 | while( my_stream.pop_if_present(t.my_arg[k]) ) { |
93 | if( ++k==block_type::max_arg_size ) { |
94 | // There might be more iterations. |
95 | recycle_to_reexecute(); |
96 | break; |
97 | } |
98 | } |
99 | if( k==0 ) { |
100 | destroy(t); |
101 | return NULL; |
102 | } else { |
103 | t.size = k; |
104 | return &t; |
105 | } |
106 | } |
107 | while_task( Stream& stream, const Body& body, empty_task& barrier ) : |
108 | my_stream(stream), |
109 | my_body(body), |
110 | my_barrier(barrier) |
111 | {} |
112 | friend class tbb::parallel_while<Body>; |
113 | }; |
114 | |
115 | } // namespace internal |
116 | //! @endcond |
117 | |
118 | //! Parallel iteration over a stream, with optional addition of more work. |
119 | /** The Body b has the requirement: \n |
120 | "b(v)" \n |
121 | "b.argument_type" \n |
122 | where v is an argument_type |
123 | @ingroup algorithms */ |
124 | template<typename Body> |
125 | class parallel_while: internal::no_copy { |
126 | public: |
127 | //! Construct empty non-running parallel while. |
128 | parallel_while() : my_body(NULL), my_barrier(NULL) {} |
129 | |
130 | //! Destructor cleans up data members before returning. |
131 | ~parallel_while() { |
132 | if( my_barrier ) { |
133 | my_barrier->destroy(*my_barrier); |
134 | my_barrier = NULL; |
135 | } |
136 | } |
137 | |
138 | //! Type of items |
139 | typedef typename Body::argument_type value_type; |
140 | |
141 | //! Apply body.apply to each item in the stream. |
142 | /** A Stream s has the requirements \n |
143 | "S::value_type" \n |
144 | "s.pop_if_present(value) is convertible to bool */ |
145 | template<typename Stream> |
146 | void run( Stream& stream, const Body& body ); |
147 | |
148 | //! Add a work item while running. |
149 | /** Should be executed only by body.apply or a thread spawned therefrom. */ |
150 | void add( const value_type& item ); |
151 | |
152 | private: |
153 | const Body* my_body; |
154 | empty_task* my_barrier; |
155 | }; |
156 | |
157 | template<typename Body> |
158 | template<typename Stream> |
159 | void parallel_while<Body>::run( Stream& stream, const Body& body ) { |
160 | using namespace internal; |
161 | empty_task& barrier = *new( task::allocate_root() ) empty_task(); |
162 | my_body = &body; |
163 | my_barrier = &barrier; |
164 | my_barrier->set_ref_count(2); |
165 | while_task<Stream,Body>& w = *new( my_barrier->allocate_child() ) while_task<Stream,Body>( stream, body, barrier ); |
166 | my_barrier->spawn_and_wait_for_all(w); |
167 | my_barrier->destroy(*my_barrier); |
168 | my_barrier = NULL; |
169 | my_body = NULL; |
170 | } |
171 | |
172 | template<typename Body> |
173 | void parallel_while<Body>::add( const value_type& item ) { |
174 | __TBB_ASSERT(my_barrier,"attempt to add to parallel_while that is not running" ); |
175 | typedef internal::while_iteration_task<Body> iteration_type; |
176 | iteration_type& i = *new( task::allocate_additional_child_of(*my_barrier) ) iteration_type(item,*my_body); |
177 | task::self().spawn( i ); |
178 | } |
179 | |
180 | } // namespace |
181 | |
182 | #endif /* __TBB_parallel_while */ |
183 | |