1 | /* |
2 | Copyright (c) 2005-2019 Intel Corporation |
3 | |
4 | Licensed under the Apache License, Version 2.0 (the "License"); |
5 | you may not use this file except in compliance with the License. |
6 | You may obtain a copy of the License at |
7 | |
8 | http://www.apache.org/licenses/LICENSE-2.0 |
9 | |
10 | Unless required by applicable law or agreed to in writing, software |
11 | distributed under the License is distributed on an "AS IS" BASIS, |
12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | See the License for the specific language governing permissions and |
14 | limitations under the License. |
15 | */ |
16 | |
17 | #ifndef __TBB_concurrent_queue_H |
18 | #define __TBB_concurrent_queue_H |
19 | |
20 | #include "internal/_concurrent_queue_impl.h" |
21 | #include "internal/_allocator_traits.h" |
22 | |
23 | namespace tbb { |
24 | |
25 | namespace strict_ppl { |
26 | |
27 | //! A high-performance thread-safe non-blocking concurrent queue. |
28 | /** Multiple threads may each push and pop concurrently. |
29 | Assignment construction is not allowed. |
30 | @ingroup containers */ |
31 | template<typename T, typename A = cache_aligned_allocator<T> > |
32 | class concurrent_queue: public internal::concurrent_queue_base_v3<T> { |
33 | template<typename Container, typename Value> friend class internal::concurrent_queue_iterator; |
34 | |
35 | //! Allocator type |
36 | typedef typename tbb::internal::allocator_rebind<A, char>::type page_allocator_type; |
37 | page_allocator_type my_allocator; |
38 | |
39 | //! Allocates a block of size n (bytes) |
40 | virtual void *allocate_block( size_t n ) __TBB_override { |
41 | void *b = reinterpret_cast<void*>(my_allocator.allocate( n )); |
42 | if( !b ) |
43 | internal::throw_exception(internal::eid_bad_alloc); |
44 | return b; |
45 | } |
46 | |
47 | //! Deallocates block created by allocate_block. |
48 | virtual void deallocate_block( void *b, size_t n ) __TBB_override { |
49 | my_allocator.deallocate( reinterpret_cast<char*>(b), n ); |
50 | } |
51 | |
52 | static void copy_construct_item(T* location, const void* src){ |
53 | new (location) T(*static_cast<const T*>(src)); |
54 | } |
55 | |
56 | #if __TBB_CPP11_RVALUE_REF_PRESENT |
57 | static void move_construct_item(T* location, const void* src) { |
58 | new (location) T( std::move(*static_cast<T*>(const_cast<void*>(src))) ); |
59 | } |
60 | #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */ |
61 | public: |
62 | //! Element type in the queue. |
63 | typedef T value_type; |
64 | |
65 | //! Reference type |
66 | typedef T& reference; |
67 | |
68 | //! Const reference type |
69 | typedef const T& const_reference; |
70 | |
71 | //! Integral type for representing size of the queue. |
72 | typedef size_t size_type; |
73 | |
74 | //! Difference type for iterator |
75 | typedef ptrdiff_t difference_type; |
76 | |
77 | //! Allocator type |
78 | typedef A allocator_type; |
79 | |
80 | //! Construct empty queue |
81 | explicit concurrent_queue(const allocator_type& a = allocator_type()) : |
82 | my_allocator( a ) |
83 | { |
84 | } |
85 | |
86 | //! [begin,end) constructor |
87 | template<typename InputIterator> |
88 | concurrent_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) : |
89 | my_allocator( a ) |
90 | { |
91 | for( ; begin != end; ++begin ) |
92 | this->push(*begin); |
93 | } |
94 | |
95 | //! Copy constructor |
96 | concurrent_queue( const concurrent_queue& src, const allocator_type& a = allocator_type()) : |
97 | internal::concurrent_queue_base_v3<T>(), my_allocator( a ) |
98 | { |
99 | this->assign( src, copy_construct_item ); |
100 | } |
101 | |
102 | #if __TBB_CPP11_RVALUE_REF_PRESENT |
103 | //! Move constructors |
104 | concurrent_queue( concurrent_queue&& src ) : |
105 | internal::concurrent_queue_base_v3<T>(), my_allocator( std::move(src.my_allocator) ) |
106 | { |
107 | this->internal_swap( src ); |
108 | } |
109 | |
110 | concurrent_queue( concurrent_queue&& src, const allocator_type& a ) : |
111 | internal::concurrent_queue_base_v3<T>(), my_allocator( a ) |
112 | { |
113 | // checking that memory allocated by one instance of allocator can be deallocated |
114 | // with another |
115 | if( my_allocator == src.my_allocator) { |
116 | this->internal_swap( src ); |
117 | } else { |
118 | // allocators are different => performing per-element move |
119 | this->assign( src, move_construct_item ); |
120 | src.clear(); |
121 | } |
122 | } |
123 | #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */ |
124 | |
125 | //! Destroy queue |
126 | ~concurrent_queue(); |
127 | |
128 | //! Enqueue an item at tail of queue. |
129 | void push( const T& source ) { |
130 | this->internal_push( &source, copy_construct_item ); |
131 | } |
132 | |
133 | #if __TBB_CPP11_RVALUE_REF_PRESENT |
134 | void push( T&& source ) { |
135 | this->internal_push( &source, move_construct_item ); |
136 | } |
137 | |
138 | #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT |
139 | template<typename... Arguments> |
140 | void emplace( Arguments&&... args ) { |
141 | push( T(std::forward<Arguments>( args )...) ); |
142 | } |
143 | #endif //__TBB_CPP11_VARIADIC_TEMPLATES_PRESENT |
144 | #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */ |
145 | |
146 | //! Attempt to dequeue an item from head of queue. |
147 | /** Does not wait for item to become available. |
148 | Returns true if successful; false otherwise. */ |
149 | bool try_pop( T& result ) { |
150 | return this->internal_try_pop( &result ); |
151 | } |
152 | |
153 | //! Return the number of items in the queue; thread unsafe |
154 | size_type unsafe_size() const {return this->internal_size();} |
155 | |
156 | //! Equivalent to size()==0. |
157 | bool empty() const {return this->internal_empty();} |
158 | |
159 | //! Clear the queue. not thread-safe. |
160 | void clear() ; |
161 | |
162 | //! Return allocator object |
163 | allocator_type get_allocator() const { return this->my_allocator; } |
164 | |
165 | typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator; |
166 | typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator; |
167 | |
168 | //------------------------------------------------------------------------ |
169 | // The iterators are intended only for debugging. They are slow and not thread safe. |
170 | //------------------------------------------------------------------------ |
171 | iterator unsafe_begin() {return iterator(*this);} |
172 | iterator unsafe_end() {return iterator();} |
173 | const_iterator unsafe_begin() const {return const_iterator(*this);} |
174 | const_iterator unsafe_end() const {return const_iterator();} |
175 | } ; |
176 | |
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Deduction guide for concurrent_queue(first, last [, allocator]).
/** The element type is taken from the iterator's value_type; the allocator
    defaults to cache_aligned_allocator of that element type. */
template<
    class Iterator,
    class Element = typename std::iterator_traits<Iterator>::value_type,
    class Allocator = cache_aligned_allocator<Element>
>
concurrent_queue(Iterator, Iterator, const Allocator& = Allocator())
    -> concurrent_queue<Element, Allocator>;
#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
185 | |
186 | template<typename T, class A> |
187 | concurrent_queue<T,A>::~concurrent_queue() { |
188 | clear(); |
189 | this->internal_finish_clear(); |
190 | } |
191 | |
192 | template<typename T, class A> |
193 | void concurrent_queue<T,A>::clear() { |
194 | T value; |
195 | while( !empty() ) try_pop(value); |
196 | } |
197 | |
198 | } // namespace strict_ppl |
199 | |
200 | //! A high-performance thread-safe blocking concurrent bounded queue. |
201 | /** This is the pre-PPL TBB concurrent queue which supports boundedness and blocking semantics. |
202 | Note that method names agree with the PPL-style concurrent queue. |
203 | Multiple threads may each push and pop concurrently. |
204 | Assignment construction is not allowed. |
205 | @ingroup containers */ |
206 | template<typename T, class A = cache_aligned_allocator<T> > |
207 | class concurrent_bounded_queue: public internal::concurrent_queue_base_v8 { |
208 | template<typename Container, typename Value> friend class internal::concurrent_queue_iterator; |
209 | typedef typename tbb::internal::allocator_rebind<A, char>::type page_allocator_type; |
210 | |
211 | //! Allocator type |
212 | page_allocator_type my_allocator; |
213 | |
214 | typedef typename concurrent_queue_base_v3::padded_page<T> padded_page; |
215 | typedef typename concurrent_queue_base_v3::copy_specifics copy_specifics; |
216 | |
217 | //! Class used to ensure exception-safety of method "pop" |
218 | class destroyer: internal::no_copy { |
219 | T& my_value; |
220 | public: |
221 | destroyer( T& value ) : my_value(value) {} |
222 | ~destroyer() {my_value.~T();} |
223 | }; |
224 | |
225 | T& get_ref( page& p, size_t index ) { |
226 | __TBB_ASSERT( index<items_per_page, NULL ); |
227 | return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index]; |
228 | } |
229 | |
230 | virtual void copy_item( page& dst, size_t index, const void* src ) __TBB_override { |
231 | new( &get_ref(dst,index) ) T(*static_cast<const T*>(src)); |
232 | } |
233 | |
234 | #if __TBB_CPP11_RVALUE_REF_PRESENT |
235 | virtual void move_item( page& dst, size_t index, const void* src ) __TBB_override { |
236 | new( &get_ref(dst,index) ) T( std::move(*static_cast<T*>(const_cast<void*>(src))) ); |
237 | } |
238 | #else |
239 | virtual void move_item( page&, size_t, const void* ) __TBB_override { |
240 | __TBB_ASSERT( false, "Unreachable code" ); |
241 | } |
242 | #endif |
243 | |
244 | virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override { |
245 | new( &get_ref(dst,dindex) ) T( get_ref( const_cast<page&>(src), sindex ) ); |
246 | } |
247 | |
248 | #if __TBB_CPP11_RVALUE_REF_PRESENT |
249 | virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override { |
250 | new( &get_ref(dst,dindex) ) T( std::move(get_ref( const_cast<page&>(src), sindex )) ); |
251 | } |
252 | #else |
253 | virtual void move_page_item( page&, size_t, const page&, size_t ) __TBB_override { |
254 | __TBB_ASSERT( false, "Unreachable code" ); |
255 | } |
256 | #endif |
257 | |
258 | virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) __TBB_override { |
259 | T& from = get_ref(src,index); |
260 | destroyer d(from); |
261 | *static_cast<T*>(dst) = tbb::internal::move( from ); |
262 | } |
263 | |
264 | virtual page *allocate_page() __TBB_override { |
265 | size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T); |
266 | page *p = reinterpret_cast<page*>(my_allocator.allocate( n )); |
267 | if( !p ) |
268 | internal::throw_exception(internal::eid_bad_alloc); |
269 | return p; |
270 | } |
271 | |
272 | virtual void deallocate_page( page *p ) __TBB_override { |
273 | size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T); |
274 | my_allocator.deallocate( reinterpret_cast<char*>(p), n ); |
275 | } |
276 | |
277 | public: |
278 | //! Element type in the queue. |
279 | typedef T value_type; |
280 | |
281 | //! Allocator type |
282 | typedef A allocator_type; |
283 | |
284 | //! Reference type |
285 | typedef T& reference; |
286 | |
287 | //! Const reference type |
288 | typedef const T& const_reference; |
289 | |
290 | //! Integral type for representing size of the queue. |
291 | /** Note that the size_type is a signed integral type. |
292 | This is because the size can be negative if there are pending pops without corresponding pushes. */ |
293 | typedef std::ptrdiff_t size_type; |
294 | |
295 | //! Difference type for iterator |
296 | typedef std::ptrdiff_t difference_type; |
297 | |
298 | //! Construct empty queue |
299 | explicit concurrent_bounded_queue(const allocator_type& a = allocator_type()) : |
300 | concurrent_queue_base_v8( sizeof(T) ), my_allocator( a ) |
301 | { |
302 | } |
303 | |
304 | //! Copy constructor |
305 | concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a = allocator_type()) |
306 | : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a ) |
307 | { |
308 | assign( src ); |
309 | } |
310 | |
311 | #if __TBB_CPP11_RVALUE_REF_PRESENT |
312 | //! Move constructors |
313 | concurrent_bounded_queue( concurrent_bounded_queue&& src ) |
314 | : concurrent_queue_base_v8( sizeof(T) ), my_allocator( std::move(src.my_allocator) ) |
315 | { |
316 | internal_swap( src ); |
317 | } |
318 | |
319 | concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) |
320 | : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a ) |
321 | { |
322 | // checking that memory allocated by one instance of allocator can be deallocated |
323 | // with another |
324 | if( my_allocator == src.my_allocator) { |
325 | this->internal_swap( src ); |
326 | } else { |
327 | // allocators are different => performing per-element move |
328 | this->move_content( src ); |
329 | src.clear(); |
330 | } |
331 | } |
332 | #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */ |
333 | |
334 | //! [begin,end) constructor |
335 | template<typename InputIterator> |
336 | concurrent_bounded_queue( InputIterator begin, InputIterator end, |
337 | const allocator_type& a = allocator_type()) |
338 | : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a ) |
339 | { |
340 | for( ; begin != end; ++begin ) |
341 | internal_push_if_not_full(&*begin); |
342 | } |
343 | |
344 | //! Destroy queue |
345 | ~concurrent_bounded_queue(); |
346 | |
347 | //! Enqueue an item at tail of queue. |
348 | void push( const T& source ) { |
349 | internal_push( &source ); |
350 | } |
351 | |
352 | #if __TBB_CPP11_RVALUE_REF_PRESENT |
353 | //! Move an item at tail of queue. |
354 | void push( T&& source ) { |
355 | internal_push_move( &source ); |
356 | } |
357 | |
358 | #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT |
359 | template<typename... Arguments> |
360 | void emplace( Arguments&&... args ) { |
361 | push( T(std::forward<Arguments>( args )...) ); |
362 | } |
363 | #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */ |
364 | #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */ |
365 | |
366 | //! Dequeue item from head of queue. |
367 | /** Block until an item becomes available, and then dequeue it. */ |
368 | void pop( T& destination ) { |
369 | internal_pop( &destination ); |
370 | } |
371 | |
372 | #if TBB_USE_EXCEPTIONS |
373 | //! Abort all pending queue operations |
374 | void abort() { |
375 | internal_abort(); |
376 | } |
377 | #endif |
378 | |
379 | //! Enqueue an item at tail of queue if queue is not already full. |
380 | /** Does not wait for queue to become not full. |
381 | Returns true if item is pushed; false if queue was already full. */ |
382 | bool try_push( const T& source ) { |
383 | return internal_push_if_not_full( &source ); |
384 | } |
385 | |
386 | #if __TBB_CPP11_RVALUE_REF_PRESENT |
387 | //! Move an item at tail of queue if queue is not already full. |
388 | /** Does not wait for queue to become not full. |
389 | Returns true if item is pushed; false if queue was already full. */ |
390 | bool try_push( T&& source ) { |
391 | return internal_push_move_if_not_full( &source ); |
392 | } |
393 | #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT |
394 | template<typename... Arguments> |
395 | bool try_emplace( Arguments&&... args ) { |
396 | return try_push( T(std::forward<Arguments>( args )...) ); |
397 | } |
398 | #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */ |
399 | #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */ |
400 | |
401 | //! Attempt to dequeue an item from head of queue. |
402 | /** Does not wait for item to become available. |
403 | Returns true if successful; false otherwise. */ |
404 | bool try_pop( T& destination ) { |
405 | return internal_pop_if_present( &destination ); |
406 | } |
407 | |
408 | //! Return number of pushes minus number of pops. |
409 | /** Note that the result can be negative if there are pops waiting for the |
410 | corresponding pushes. The result can also exceed capacity() if there |
411 | are push operations in flight. */ |
412 | size_type size() const {return internal_size();} |
413 | |
414 | //! Equivalent to size()<=0. |
415 | bool empty() const {return internal_empty();} |
416 | |
417 | //! Maximum number of allowed elements |
418 | size_type capacity() const { |
419 | return my_capacity; |
420 | } |
421 | |
422 | //! Set the capacity |
423 | /** Setting the capacity to 0 causes subsequent try_push operations to always fail, |
424 | and subsequent push operations to block forever. */ |
425 | void set_capacity( size_type new_capacity ) { |
426 | internal_set_capacity( new_capacity, sizeof(T) ); |
427 | } |
428 | |
429 | //! return allocator object |
430 | allocator_type get_allocator() const { return this->my_allocator; } |
431 | |
432 | //! clear the queue. not thread-safe. |
433 | void clear() ; |
434 | |
435 | typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,T> iterator; |
436 | typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,const T> const_iterator; |
437 | |
438 | //------------------------------------------------------------------------ |
439 | // The iterators are intended only for debugging. They are slow and not thread safe. |
440 | //------------------------------------------------------------------------ |
441 | iterator unsafe_begin() {return iterator(*this);} |
442 | iterator unsafe_end() {return iterator();} |
443 | const_iterator unsafe_begin() const {return const_iterator(*this);} |
444 | const_iterator unsafe_end() const {return const_iterator();} |
445 | |
446 | }; |
447 | |
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Deduction guide for concurrent_bounded_queue(first, last [, allocator]).
/** The element type is taken from the iterator's value_type; the allocator
    defaults to cache_aligned_allocator of that element type. */
template<
    class Iterator,
    class Element = typename std::iterator_traits<Iterator>::value_type,
    class Allocator = cache_aligned_allocator<Element>
>
concurrent_bounded_queue(Iterator, Iterator, const Allocator& = Allocator())
    -> concurrent_bounded_queue<Element, Allocator>;
#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
456 | |
457 | template<typename T, class A> |
458 | concurrent_bounded_queue<T,A>::~concurrent_bounded_queue() { |
459 | clear(); |
460 | internal_finish_clear(); |
461 | } |
462 | |
463 | template<typename T, class A> |
464 | void concurrent_bounded_queue<T,A>::clear() { |
465 | T value; |
466 | while( try_pop(value) ) /*noop*/; |
467 | } |
468 | |
// Make strict_ppl::concurrent_queue available as tbb::concurrent_queue.
using strict_ppl::concurrent_queue;
470 | |
471 | } // namespace tbb |
472 | |
473 | #endif /* __TBB_concurrent_queue_H */ |
474 | |