1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB_parallel_while
18#define __TBB_parallel_while
19
20#include "task.h"
21#include <new>
22
23namespace tbb {
24
25template<typename Body>
26class parallel_while;
27
28//! @cond INTERNAL
29namespace internal {
30
31 template<typename Stream, typename Body> class while_task;
32
33 //! For internal use only.
34 /** Executes one iteration of a while.
35 @ingroup algorithms */
36 template<typename Body>
37 class while_iteration_task: public task {
38 const Body& my_body;
39 typename Body::argument_type my_value;
40 task* execute() __TBB_override {
41 my_body(my_value);
42 return NULL;
43 }
44 while_iteration_task( const typename Body::argument_type& value, const Body& body ) :
45 my_body(body), my_value(value)
46 {}
47 template<typename Body_> friend class while_group_task;
48 friend class tbb::parallel_while<Body>;
49 };
50
51 //! For internal use only
52 /** Unpacks a block of iterations.
53 @ingroup algorithms */
54 template<typename Body>
55 class while_group_task: public task {
56 static const size_t max_arg_size = 4;
57 const Body& my_body;
58 size_t size;
59 typename Body::argument_type my_arg[max_arg_size];
60 while_group_task( const Body& body ) : my_body(body), size(0) {}
61 task* execute() __TBB_override {
62 typedef while_iteration_task<Body> iteration_type;
63 __TBB_ASSERT( size>0, NULL );
64 task_list list;
65 task* t;
66 size_t k=0;
67 for(;;) {
68 t = new( allocate_child() ) iteration_type(my_arg[k],my_body);
69 if( ++k==size ) break;
70 list.push_back(*t);
71 }
72 set_ref_count(int(k+1));
73 spawn(list);
74 spawn_and_wait_for_all(*t);
75 return NULL;
76 }
77 template<typename Stream, typename Body_> friend class while_task;
78 };
79
80 //! For internal use only.
81 /** Gets block of iterations from a stream and packages them into a while_group_task.
82 @ingroup algorithms */
83 template<typename Stream, typename Body>
84 class while_task: public task {
85 Stream& my_stream;
86 const Body& my_body;
87 empty_task& my_barrier;
88 task* execute() __TBB_override {
89 typedef while_group_task<Body> block_type;
90 block_type& t = *new( allocate_additional_child_of(my_barrier) ) block_type(my_body);
91 size_t k=0;
92 while( my_stream.pop_if_present(t.my_arg[k]) ) {
93 if( ++k==block_type::max_arg_size ) {
94 // There might be more iterations.
95 recycle_to_reexecute();
96 break;
97 }
98 }
99 if( k==0 ) {
100 destroy(t);
101 return NULL;
102 } else {
103 t.size = k;
104 return &t;
105 }
106 }
107 while_task( Stream& stream, const Body& body, empty_task& barrier ) :
108 my_stream(stream),
109 my_body(body),
110 my_barrier(barrier)
111 {}
112 friend class tbb::parallel_while<Body>;
113 };
114
115} // namespace internal
116//! @endcond
117
118//! Parallel iteration over a stream, with optional addition of more work.
119/** The Body b has the requirement: \n
120 "b(v)" \n
121 "b.argument_type" \n
122 where v is an argument_type
123 @ingroup algorithms */
124template<typename Body>
125class parallel_while: internal::no_copy {
126public:
127 //! Construct empty non-running parallel while.
128 parallel_while() : my_body(NULL), my_barrier(NULL) {}
129
130 //! Destructor cleans up data members before returning.
131 ~parallel_while() {
132 if( my_barrier ) {
133 my_barrier->destroy(*my_barrier);
134 my_barrier = NULL;
135 }
136 }
137
138 //! Type of items
139 typedef typename Body::argument_type value_type;
140
141 //! Apply body.apply to each item in the stream.
142 /** A Stream s has the requirements \n
143 "S::value_type" \n
144 "s.pop_if_present(value) is convertible to bool */
145 template<typename Stream>
146 void run( Stream& stream, const Body& body );
147
148 //! Add a work item while running.
149 /** Should be executed only by body.apply or a thread spawned therefrom. */
150 void add( const value_type& item );
151
152private:
153 const Body* my_body;
154 empty_task* my_barrier;
155};
156
157template<typename Body>
158template<typename Stream>
159void parallel_while<Body>::run( Stream& stream, const Body& body ) {
160 using namespace internal;
161 empty_task& barrier = *new( task::allocate_root() ) empty_task();
162 my_body = &body;
163 my_barrier = &barrier;
164 my_barrier->set_ref_count(2);
165 while_task<Stream,Body>& w = *new( my_barrier->allocate_child() ) while_task<Stream,Body>( stream, body, barrier );
166 my_barrier->spawn_and_wait_for_all(w);
167 my_barrier->destroy(*my_barrier);
168 my_barrier = NULL;
169 my_body = NULL;
170}
171
172template<typename Body>
173void parallel_while<Body>::add( const value_type& item ) {
174 __TBB_ASSERT(my_barrier,"attempt to add to parallel_while that is not running");
175 typedef internal::while_iteration_task<Body> iteration_type;
176 iteration_type& i = *new( task::allocate_additional_child_of(*my_barrier) ) iteration_type(item,*my_body);
177 task::self().spawn( i );
178}
179
180} // namespace
181
182#endif /* __TBB_parallel_while */
183