1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #ifndef __TBB_task_arena_H |
18 | #define __TBB_task_arena_H |
19 | |
20 | #include "task.h" |
21 | #include "tbb_exception.h" |
22 | #include "internal/_template_helpers.h" |
23 | #if TBB_USE_THREADING_TOOLS |
24 | #include "atomic.h" // for as_atomic |
25 | #endif |
26 | #include "aligned_space.h" |
27 | |
28 | namespace tbb { |
29 | |
30 | namespace this_task_arena { |
31 | int max_concurrency(); |
32 | } // namespace this_task_arena |
33 | |
34 | //! @cond INTERNAL |
35 | namespace internal { |
36 | //! Internal to library. Should not be used by clients. |
37 | /** @ingroup task_scheduling */ |
38 | class arena; |
39 | class task_scheduler_observer_v3; |
40 | } // namespace internal |
41 | //! @endcond |
42 | |
43 | namespace interface7 { |
44 | class task_arena; |
45 | |
46 | //! @cond INTERNAL |
47 | namespace internal { |
48 | using namespace tbb::internal; //e.g. function_task from task.h |
49 | |
//! Abstract nullary callable through which user work is handed to the arena entry points.
/** task_arena_base::internal_execute() and isolate_within_arena() take their work item as a
    reference to this type; delegated_function provides the concrete implementations.
    Privately inherits no_assign (presumably to disable assignment - confirm in its definition). */
class delegate_base : no_assign {
public:
    //! Runs the wrapped work. Const-qualified, so implementations may only touch mutable state.
    virtual void operator()() const = 0;
    //! Virtual so that derived delegates are destroyed correctly through a base reference/pointer.
    virtual ~delegate_base() {}
};
55 | |
//! Detects the type returned by calling a functor of type F with no arguments.
/** When decltype is available and not known to be broken for function return types,
    the nested typedef "type" is the type of the expression declval<F>()().
    Otherwise "type" is void, i.e. the functor's result is ignored by the callers. */
template <typename F>
struct return_type_or_void {
#if __TBB_CPP11_DECLTYPE_PRESENT && !__TBB_CPP11_DECLTYPE_OF_FUNCTION_RETURN_TYPE_BROKEN
    // Note: the call is evaluated on a value of type F as produced by declval<F>().
    typedef decltype(declval<F>()()) type;
#else
    // No usable decltype: fall back to discarding the result.
    typedef void type;
#endif
};
66 | |
67 | template<typename F, typename R> |
68 | class delegated_function : public delegate_base { |
69 | F &my_func; |
70 | tbb::aligned_space<R> my_return_storage; |
71 | // The function should be called only once. |
72 | void operator()() const __TBB_override { |
73 | new (my_return_storage.begin()) R(my_func()); |
74 | } |
75 | public: |
76 | delegated_function(F& f) : my_func(f) {} |
77 | // The function can be called only after operator() and only once. |
78 | R consume_result() const { |
79 | return tbb::internal::move(*(my_return_storage.begin())); |
80 | } |
81 | ~delegated_function() { |
82 | my_return_storage.begin()->~R(); |
83 | } |
84 | }; |
85 | |
86 | template<typename F> |
87 | class delegated_function<F,void> : public delegate_base { |
88 | F &my_func; |
89 | void operator()() const __TBB_override { |
90 | my_func(); |
91 | } |
92 | public: |
93 | delegated_function(F& f) : my_func(f) {} |
94 | void consume_result() const {} |
95 | |
96 | friend class task_arena_base; |
97 | }; |
98 | |
//! Base of task_arena: holds the arena settings and declares the entry points exported by the library.
/** NOTE(review): the internal_* methods are only declared here (marked __TBB_EXPORTED_*), so the
    data members below are presumably accessed by the compiled library as well; keep their order
    and types unchanged unless that is confirmed not to matter. */
class task_arena_base {
protected:
    //! NULL if not currently initialized.
    internal::arena* my_arena;

#if __TBB_TASK_GROUP_CONTEXT
    //! default context of the arena
    task_group_context *my_context;
#endif

    //! Concurrency level for deferred initialization
    int my_max_concurrency;

    //! Reserved master slots
    unsigned my_master_slots;

    //! Special settings
    intptr_t my_version_and_traits;

    //! Initial value of my_version_and_traits and related flag constants.
    enum {
        default_flags = 0
#if __TBB_TASK_GROUP_CONTEXT
        | (task_group_context::default_traits & task_group_context::exact_exception) // 0 or 1 << 16
        , exact_exception_flag = task_group_context::exact_exception // used to specify flag for context directly
#endif
    };

    //! Stores the settings only; no arena (and no context) is created or referenced yet.
    /** @arg max_concurrency value recorded in my_max_concurrency
     *  @arg reserved_for_masters value recorded in my_master_slots
     **/
    task_arena_base(int max_concurrency, unsigned reserved_for_masters)
        : my_arena(0)
#if __TBB_TASK_GROUP_CONTEXT
        , my_context(0)
#endif
        , my_max_concurrency(max_concurrency)
        , my_master_slots(reserved_for_masters)
        , my_version_and_traits(default_flags)
    {}

    // Entry points implemented outside of this header.
    //! Called by task_arena::initialize() to set up the internal representation.
    void __TBB_EXPORTED_METHOD internal_initialize();
    //! Called by task_arena::terminate() to drop the reference to the internal representation.
    void __TBB_EXPORTED_METHOD internal_terminate();
    //! Called when attaching to the current arena of the thread; callers check my_arena afterwards.
    void __TBB_EXPORTED_METHOD internal_attach();
    //! Enqueues a task; the second argument is the priority (0 when priorities are not supported).
    void __TBB_EXPORTED_METHOD internal_enqueue( task&, intptr_t ) const;
    //! Runs the given delegate on behalf of task_arena::execute().
    void __TBB_EXPORTED_METHOD internal_execute( delegate_base& ) const;
    //! Used by task_arena::debug_wait_until_empty().
    void __TBB_EXPORTED_METHOD internal_wait() const;
    //! Slot index of the calling thread; this_task_arena::current_thread_index() treats -1 specially.
    static int __TBB_EXPORTED_FUNC internal_current_slot();
    //! Concurrency query; invoked with a task_arena pointer or with NULL (this_task_arena::max_concurrency()).
    static int __TBB_EXPORTED_FUNC internal_max_concurrency( const task_arena * );
public:
    //! Special value of max_concurrency: the number of threads is chosen automatically.
    static const int automatic = -1;
    //! Returned by this_task_arena::current_thread_index() in place of a slot index of -1.
    static const int not_initialized = -2;

};
150 | |
#if __TBB_TASK_ISOLATION
//! Exported entry point that runs delegate d in isolation; the second argument is reserved.
void __TBB_EXPORTED_FUNC isolate_within_arena( delegate_base& d, intptr_t reserved = 0 );

//! Runs functor f through isolate_within_arena() and returns whatever the delegate captured.
/** R is the result type expected from f(); with R == void nothing is returned.
    The functor is taken by reference and is not copied. */
template<typename R, typename F>
R isolate_impl(F& f) {
    typedef delegated_function<F, R> delegate_type;
    delegate_type isolated_call(f);
    isolate_within_arena(isolated_call);
    return isolated_call.consume_result();
}
#endif /* __TBB_TASK_ISOLATION */
161 | } // namespace internal |
162 | //! @endcond |
163 | |
164 | /** 1-to-1 proxy representation class of scheduler's arena |
165 | * Constructors set up settings only, real construction is deferred till the first method invocation |
166 | * Destructor only removes one of the references to the inner arena representation. |
167 | * Final destruction happens when all the references (and the work) are gone. |
168 | */ |
169 | class task_arena : public internal::task_arena_base { |
170 | friend class tbb::internal::task_scheduler_observer_v3; |
171 | friend void task::enqueue(task&, task_arena& |
172 | #if __TBB_TASK_PRIORITY |
173 | , priority_t |
174 | #endif |
175 | ); |
176 | friend int tbb::this_task_arena::max_concurrency(); |
177 | bool my_initialized; |
178 | void mark_initialized() { |
179 | __TBB_ASSERT( my_arena, "task_arena initialization is incomplete" ); |
180 | #if __TBB_TASK_GROUP_CONTEXT |
181 | __TBB_ASSERT( my_context, "task_arena initialization is incomplete" ); |
182 | #endif |
183 | #if TBB_USE_THREADING_TOOLS |
184 | // Actual synchronization happens in internal_initialize & internal_attach. |
185 | // The race on setting my_initialized is benign, but should be hidden from Intel(R) Inspector |
186 | internal::as_atomic(my_initialized).fetch_and_store<release>(true); |
187 | #else |
188 | my_initialized = true; |
189 | #endif |
190 | } |
191 | |
192 | template<typename F> |
193 | void enqueue_impl( __TBB_FORWARDING_REF(F) f |
194 | #if __TBB_TASK_PRIORITY |
195 | , priority_t p = priority_t(0) |
196 | #endif |
197 | ) { |
198 | #if !__TBB_TASK_PRIORITY |
199 | intptr_t p = 0; |
200 | #endif |
201 | initialize(); |
202 | #if __TBB_TASK_GROUP_CONTEXT |
203 | internal_enqueue(*new(task::allocate_root(*my_context)) internal::function_task< typename internal::strip<F>::type >(internal::forward<F>(f)), p); |
204 | #else |
205 | internal_enqueue(*new(task::allocate_root()) internal::function_task< typename internal::strip<F>::type >(internal::forward<F>(f)), p); |
206 | #endif /* __TBB_TASK_GROUP_CONTEXT */ |
207 | } |
208 | |
209 | template<typename R, typename F> |
210 | R execute_impl(F& f) { |
211 | initialize(); |
212 | internal::delegated_function<F, R> d(f); |
213 | internal_execute(d); |
214 | return d.consume_result(); |
215 | } |
216 | |
217 | public: |
218 | //! Creates task_arena with certain concurrency limits |
219 | /** Sets up settings only, real construction is deferred till the first method invocation |
220 | * @arg max_concurrency specifies total number of slots in arena where threads work |
221 | * @arg reserved_for_masters specifies number of slots to be used by master threads only. |
222 | * Value of 1 is default and reflects behavior of implicit arenas. |
223 | **/ |
224 | task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1) |
225 | : task_arena_base(max_concurrency_, reserved_for_masters) |
226 | , my_initialized(false) |
227 | {} |
228 | |
229 | //! Copies settings from another task_arena |
230 | task_arena(const task_arena &s) // copy settings but not the reference or instance |
231 | : task_arena_base(s.my_max_concurrency, s.my_master_slots) |
232 | , my_initialized(false) |
233 | {} |
234 | |
235 | //! Tag class used to indicate the "attaching" constructor |
236 | struct attach {}; |
237 | |
238 | //! Creates an instance of task_arena attached to the current arena of the thread |
239 | explicit task_arena( attach ) |
240 | : task_arena_base(automatic, 1) // use default settings if attach fails |
241 | , my_initialized(false) |
242 | { |
243 | internal_attach(); |
244 | if( my_arena ) my_initialized = true; |
245 | } |
246 | |
247 | //! Forces allocation of the resources for the task_arena as specified in constructor arguments |
248 | inline void initialize() { |
249 | if( !my_initialized ) { |
250 | internal_initialize(); |
251 | mark_initialized(); |
252 | } |
253 | } |
254 | |
255 | //! Overrides concurrency level and forces initialization of internal representation |
256 | inline void initialize(int max_concurrency_, unsigned reserved_for_masters = 1) { |
257 | // TODO: decide if this call must be thread-safe |
258 | __TBB_ASSERT(!my_arena, "Impossible to modify settings of an already initialized task_arena" ); |
259 | if( !my_initialized ) { |
260 | my_max_concurrency = max_concurrency_; |
261 | my_master_slots = reserved_for_masters; |
262 | initialize(); |
263 | } |
264 | } |
265 | |
266 | //! Attaches this instance to the current arena of the thread |
267 | inline void initialize(attach) { |
268 | // TODO: decide if this call must be thread-safe |
269 | __TBB_ASSERT(!my_arena, "Impossible to modify settings of an already initialized task_arena" ); |
270 | if( !my_initialized ) { |
271 | internal_attach(); |
272 | if ( !my_arena ) internal_initialize(); |
273 | mark_initialized(); |
274 | } |
275 | } |
276 | |
277 | //! Removes the reference to the internal arena representation. |
278 | //! Not thread safe wrt concurrent invocations of other methods. |
279 | inline void terminate() { |
280 | if( my_initialized ) { |
281 | internal_terminate(); |
282 | my_initialized = false; |
283 | } |
284 | } |
285 | |
286 | //! Removes the reference to the internal arena representation, and destroys the external object. |
287 | //! Not thread safe wrt concurrent invocations of other methods. |
288 | ~task_arena() { |
289 | terminate(); |
290 | } |
291 | |
292 | //! Returns true if the arena is active (initialized); false otherwise. |
293 | //! The name was chosen to match a task_scheduler_init method with the same semantics. |
294 | bool is_active() const { return my_initialized; } |
295 | |
296 | //! Enqueues a task into the arena to process a functor, and immediately returns. |
297 | //! Does not require the calling thread to join the arena |
298 | |
299 | #if __TBB_CPP11_RVALUE_REF_PRESENT |
300 | template<typename F> |
301 | void enqueue( F&& f ) { |
302 | enqueue_impl(std::forward<F>(f)); |
303 | } |
304 | #else |
305 | template<typename F> |
306 | void enqueue( const F& f ) { |
307 | enqueue_impl(f); |
308 | } |
309 | #endif |
310 | |
311 | #if __TBB_TASK_PRIORITY |
312 | //! Enqueues a task with priority p into the arena to process a functor f, and immediately returns. |
313 | //! Does not require the calling thread to join the arena |
314 | template<typename F> |
315 | #if __TBB_CPP11_RVALUE_REF_PRESENT |
316 | void enqueue( F&& f, priority_t p ) { |
317 | #if __TBB_PREVIEW_CRITICAL_TASKS |
318 | __TBB_ASSERT(p == priority_low || p == priority_normal || p == priority_high |
319 | || p == internal::priority_critical, "Invalid priority level value" ); |
320 | #else |
321 | __TBB_ASSERT(p == priority_low || p == priority_normal || p == priority_high, "Invalid priority level value" ); |
322 | #endif |
323 | enqueue_impl(std::forward<F>(f), p); |
324 | } |
325 | #else |
326 | void enqueue( const F& f, priority_t p ) { |
327 | #if __TBB_PREVIEW_CRITICAL_TASKS |
328 | __TBB_ASSERT(p == priority_low || p == priority_normal || p == priority_high |
329 | || p == internal::priority_critical, "Invalid priority level value" ); |
330 | #else |
331 | __TBB_ASSERT(p == priority_low || p == priority_normal || p == priority_high, "Invalid priority level value" ); |
332 | #endif |
333 | enqueue_impl(f,p); |
334 | } |
335 | #endif |
336 | #endif// __TBB_TASK_PRIORITY |
337 | |
338 | //! Joins the arena and executes a mutable functor, then returns |
339 | //! If not possible to join, wraps the functor into a task, enqueues it and waits for task completion |
340 | //! Can decrement the arena demand for workers, causing a worker to leave and free a slot to the calling thread |
341 | //! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void). |
342 | template<typename F> |
343 | typename internal::return_type_or_void<F>::type execute(F& f) { |
344 | return execute_impl<typename internal::return_type_or_void<F>::type>(f); |
345 | } |
346 | |
347 | //! Joins the arena and executes a constant functor, then returns |
348 | //! If not possible to join, wraps the functor into a task, enqueues it and waits for task completion |
349 | //! Can decrement the arena demand for workers, causing a worker to leave and free a slot to the calling thread |
350 | //! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void). |
351 | template<typename F> |
352 | typename internal::return_type_or_void<F>::type execute(const F& f) { |
353 | return execute_impl<typename internal::return_type_or_void<F>::type>(f); |
354 | } |
355 | |
356 | #if __TBB_EXTRA_DEBUG |
357 | //! Wait for all work in the arena to be completed |
358 | //! Even submitted by other application threads |
359 | //! Joins arena if/when possible (in the same way as execute()) |
360 | void debug_wait_until_empty() { |
361 | initialize(); |
362 | internal_wait(); |
363 | } |
364 | #endif //__TBB_EXTRA_DEBUG |
365 | |
366 | //! Returns the index, aka slot number, of the calling thread in its current arena |
367 | //! This method is deprecated and replaced with this_task_arena::current_thread_index() |
368 | inline static int current_thread_index() { |
369 | return internal_current_slot(); |
370 | } |
371 | |
372 | //! Returns the maximal number of threads that can work inside the arena |
373 | inline int max_concurrency() const { |
374 | // Handle special cases inside the library |
375 | return (my_max_concurrency>1) ? my_max_concurrency : internal_max_concurrency(this); |
376 | } |
377 | }; |
378 | |
#if __TBB_TASK_ISOLATION
namespace this_task_arena {
    //! Executes a mutable functor in isolation within the current task arena.
    //! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void).
    template<typename F>
    typename internal::return_type_or_void<F>::type isolate(F& f) {
        typedef typename internal::return_type_or_void<F>::type result_type;
        return internal::isolate_impl<result_type>(f);
    }

    //! Executes a constant functor in isolation within the current task arena.
    //! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void).
    template<typename F>
    typename internal::return_type_or_void<F>::type isolate(const F& f) {
        typedef typename internal::return_type_or_void<F>::type result_type;
        return internal::isolate_impl<result_type>(f);
    }
} // namespace this_task_arena
#endif /* __TBB_TASK_ISOLATION */
} // namespace interface7
397 | |
// Make the versioned class available as tbb::task_arena.
using interface7::task_arena;
#if __TBB_TASK_ISOLATION
namespace this_task_arena {
    // Make the isolate() overloads of interface7::this_task_arena visible in tbb::this_task_arena.
    using namespace interface7::this_task_arena;
}
#endif /* __TBB_TASK_ISOLATION */
404 | |
405 | namespace this_task_arena { |
406 | //! Returns the index, aka slot number, of the calling thread in its current arena |
407 | inline int current_thread_index() { |
408 | int idx = tbb::task_arena::current_thread_index(); |
409 | return idx == -1 ? tbb::task_arena::not_initialized : idx; |
410 | } |
411 | |
412 | //! Returns the maximal number of threads that can work inside the arena |
413 | inline int max_concurrency() { |
414 | return tbb::task_arena::internal_max_concurrency(NULL); |
415 | } |
416 | } // namespace this_task_arena |
417 | |
418 | //! Enqueue task in task_arena |
419 | void task::enqueue( task& t, task_arena& arena |
420 | #if __TBB_TASK_PRIORITY |
421 | , priority_t p |
422 | #endif |
423 | ) { |
424 | #if !__TBB_TASK_PRIORITY |
425 | intptr_t p = 0; |
426 | #endif |
427 | arena.initialize(); |
428 | //! Note: the context of the task may differ from the context instantiated by task_arena |
429 | arena.internal_enqueue(t, p); |
430 | } |
431 | } // namespace tbb |
432 | |
433 | #endif /* __TBB_task_arena_H */ |
434 | |