1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB_concurrent_queue_H
18#define __TBB_concurrent_queue_H
19
20#include "internal/_concurrent_queue_impl.h"
21#include "internal/_allocator_traits.h"
22
23namespace tbb {
24
25namespace strict_ppl {
26
27//! A high-performance thread-safe non-blocking concurrent queue.
28/** Multiple threads may each push and pop concurrently.
29 Assignment construction is not allowed.
30 @ingroup containers */
31template<typename T, typename A = cache_aligned_allocator<T> >
32class concurrent_queue: public internal::concurrent_queue_base_v3<T> {
33 template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
34
35 //! Allocator type
36 typedef typename tbb::internal::allocator_rebind<A, char>::type page_allocator_type;
37 page_allocator_type my_allocator;
38
39 //! Allocates a block of size n (bytes)
40 virtual void *allocate_block( size_t n ) __TBB_override {
41 void *b = reinterpret_cast<void*>(my_allocator.allocate( n ));
42 if( !b )
43 internal::throw_exception(internal::eid_bad_alloc);
44 return b;
45 }
46
47 //! Deallocates block created by allocate_block.
48 virtual void deallocate_block( void *b, size_t n ) __TBB_override {
49 my_allocator.deallocate( reinterpret_cast<char*>(b), n );
50 }
51
52 static void copy_construct_item(T* location, const void* src){
53 new (location) T(*static_cast<const T*>(src));
54 }
55
56#if __TBB_CPP11_RVALUE_REF_PRESENT
57 static void move_construct_item(T* location, const void* src) {
58 new (location) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
59 }
60#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
61public:
62 //! Element type in the queue.
63 typedef T value_type;
64
65 //! Reference type
66 typedef T& reference;
67
68 //! Const reference type
69 typedef const T& const_reference;
70
71 //! Integral type for representing size of the queue.
72 typedef size_t size_type;
73
74 //! Difference type for iterator
75 typedef ptrdiff_t difference_type;
76
77 //! Allocator type
78 typedef A allocator_type;
79
80 //! Construct empty queue
81 explicit concurrent_queue(const allocator_type& a = allocator_type()) :
82 my_allocator( a )
83 {
84 }
85
86 //! [begin,end) constructor
87 template<typename InputIterator>
88 concurrent_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
89 my_allocator( a )
90 {
91 for( ; begin != end; ++begin )
92 this->push(*begin);
93 }
94
95 //! Copy constructor
96 concurrent_queue( const concurrent_queue& src, const allocator_type& a = allocator_type()) :
97 internal::concurrent_queue_base_v3<T>(), my_allocator( a )
98 {
99 this->assign( src, copy_construct_item );
100 }
101
102#if __TBB_CPP11_RVALUE_REF_PRESENT
103 //! Move constructors
104 concurrent_queue( concurrent_queue&& src ) :
105 internal::concurrent_queue_base_v3<T>(), my_allocator( std::move(src.my_allocator) )
106 {
107 this->internal_swap( src );
108 }
109
110 concurrent_queue( concurrent_queue&& src, const allocator_type& a ) :
111 internal::concurrent_queue_base_v3<T>(), my_allocator( a )
112 {
113 // checking that memory allocated by one instance of allocator can be deallocated
114 // with another
115 if( my_allocator == src.my_allocator) {
116 this->internal_swap( src );
117 } else {
118 // allocators are different => performing per-element move
119 this->assign( src, move_construct_item );
120 src.clear();
121 }
122 }
123#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
124
125 //! Destroy queue
126 ~concurrent_queue();
127
128 //! Enqueue an item at tail of queue.
129 void push( const T& source ) {
130 this->internal_push( &source, copy_construct_item );
131 }
132
133#if __TBB_CPP11_RVALUE_REF_PRESENT
134 void push( T&& source ) {
135 this->internal_push( &source, move_construct_item );
136 }
137
138#if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
139 template<typename... Arguments>
140 void emplace( Arguments&&... args ) {
141 push( T(std::forward<Arguments>( args )...) );
142 }
143#endif //__TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
144#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
145
146 //! Attempt to dequeue an item from head of queue.
147 /** Does not wait for item to become available.
148 Returns true if successful; false otherwise. */
149 bool try_pop( T& result ) {
150 return this->internal_try_pop( &result );
151 }
152
153 //! Return the number of items in the queue; thread unsafe
154 size_type unsafe_size() const {return this->internal_size();}
155
156 //! Equivalent to size()==0.
157 bool empty() const {return this->internal_empty();}
158
159 //! Clear the queue. not thread-safe.
160 void clear() ;
161
162 //! Return allocator object
163 allocator_type get_allocator() const { return this->my_allocator; }
164
165 typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator;
166 typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator;
167
168 //------------------------------------------------------------------------
169 // The iterators are intended only for debugging. They are slow and not thread safe.
170 //------------------------------------------------------------------------
171 iterator unsafe_begin() {return iterator(*this);}
172 iterator unsafe_end() {return iterator();}
173 const_iterator unsafe_begin() const {return const_iterator(*this);}
174 const_iterator unsafe_end() const {return const_iterator();}
175} ;
176
177#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
178// Deduction guide for the constructor from two iterators
179template<typename InputIterator,
180 typename T = typename std::iterator_traits<InputIterator>::value_type,
181 typename A = cache_aligned_allocator<T>
182> concurrent_queue(InputIterator, InputIterator, const A& = A())
183-> concurrent_queue<T, A>;
184#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
185
186template<typename T, class A>
187concurrent_queue<T,A>::~concurrent_queue() {
188 clear();
189 this->internal_finish_clear();
190}
191
192template<typename T, class A>
193void concurrent_queue<T,A>::clear() {
194 T value;
195 while( !empty() ) try_pop(value);
196}
197
198} // namespace strict_ppl
199
200//! A high-performance thread-safe blocking concurrent bounded queue.
201/** This is the pre-PPL TBB concurrent queue which supports boundedness and blocking semantics.
202 Note that method names agree with the PPL-style concurrent queue.
203 Multiple threads may each push and pop concurrently.
204 Assignment construction is not allowed.
205 @ingroup containers */
206template<typename T, class A = cache_aligned_allocator<T> >
207class concurrent_bounded_queue: public internal::concurrent_queue_base_v8 {
208 template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
209 typedef typename tbb::internal::allocator_rebind<A, char>::type page_allocator_type;
210
211 //! Allocator type
212 page_allocator_type my_allocator;
213
214 typedef typename concurrent_queue_base_v3::padded_page<T> padded_page;
215 typedef typename concurrent_queue_base_v3::copy_specifics copy_specifics;
216
217 //! Class used to ensure exception-safety of method "pop"
218 class destroyer: internal::no_copy {
219 T& my_value;
220 public:
221 destroyer( T& value ) : my_value(value) {}
222 ~destroyer() {my_value.~T();}
223 };
224
225 T& get_ref( page& p, size_t index ) {
226 __TBB_ASSERT( index<items_per_page, NULL );
227 return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
228 }
229
230 virtual void copy_item( page& dst, size_t index, const void* src ) __TBB_override {
231 new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
232 }
233
234#if __TBB_CPP11_RVALUE_REF_PRESENT
235 virtual void move_item( page& dst, size_t index, const void* src ) __TBB_override {
236 new( &get_ref(dst,index) ) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
237 }
238#else
239 virtual void move_item( page&, size_t, const void* ) __TBB_override {
240 __TBB_ASSERT( false, "Unreachable code" );
241 }
242#endif
243
244 virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override {
245 new( &get_ref(dst,dindex) ) T( get_ref( const_cast<page&>(src), sindex ) );
246 }
247
248#if __TBB_CPP11_RVALUE_REF_PRESENT
249 virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override {
250 new( &get_ref(dst,dindex) ) T( std::move(get_ref( const_cast<page&>(src), sindex )) );
251 }
252#else
253 virtual void move_page_item( page&, size_t, const page&, size_t ) __TBB_override {
254 __TBB_ASSERT( false, "Unreachable code" );
255 }
256#endif
257
258 virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) __TBB_override {
259 T& from = get_ref(src,index);
260 destroyer d(from);
261 *static_cast<T*>(dst) = tbb::internal::move( from );
262 }
263
264 virtual page *allocate_page() __TBB_override {
265 size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
266 page *p = reinterpret_cast<page*>(my_allocator.allocate( n ));
267 if( !p )
268 internal::throw_exception(internal::eid_bad_alloc);
269 return p;
270 }
271
272 virtual void deallocate_page( page *p ) __TBB_override {
273 size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
274 my_allocator.deallocate( reinterpret_cast<char*>(p), n );
275 }
276
277public:
278 //! Element type in the queue.
279 typedef T value_type;
280
281 //! Allocator type
282 typedef A allocator_type;
283
284 //! Reference type
285 typedef T& reference;
286
287 //! Const reference type
288 typedef const T& const_reference;
289
290 //! Integral type for representing size of the queue.
291 /** Note that the size_type is a signed integral type.
292 This is because the size can be negative if there are pending pops without corresponding pushes. */
293 typedef std::ptrdiff_t size_type;
294
295 //! Difference type for iterator
296 typedef std::ptrdiff_t difference_type;
297
298 //! Construct empty queue
299 explicit concurrent_bounded_queue(const allocator_type& a = allocator_type()) :
300 concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
301 {
302 }
303
304 //! Copy constructor
305 concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a = allocator_type())
306 : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
307 {
308 assign( src );
309 }
310
311#if __TBB_CPP11_RVALUE_REF_PRESENT
312 //! Move constructors
313 concurrent_bounded_queue( concurrent_bounded_queue&& src )
314 : concurrent_queue_base_v8( sizeof(T) ), my_allocator( std::move(src.my_allocator) )
315 {
316 internal_swap( src );
317 }
318
319 concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a )
320 : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
321 {
322 // checking that memory allocated by one instance of allocator can be deallocated
323 // with another
324 if( my_allocator == src.my_allocator) {
325 this->internal_swap( src );
326 } else {
327 // allocators are different => performing per-element move
328 this->move_content( src );
329 src.clear();
330 }
331 }
332#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
333
334 //! [begin,end) constructor
335 template<typename InputIterator>
336 concurrent_bounded_queue( InputIterator begin, InputIterator end,
337 const allocator_type& a = allocator_type())
338 : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
339 {
340 for( ; begin != end; ++begin )
341 internal_push_if_not_full(&*begin);
342 }
343
344 //! Destroy queue
345 ~concurrent_bounded_queue();
346
347 //! Enqueue an item at tail of queue.
348 void push( const T& source ) {
349 internal_push( &source );
350 }
351
352#if __TBB_CPP11_RVALUE_REF_PRESENT
353 //! Move an item at tail of queue.
354 void push( T&& source ) {
355 internal_push_move( &source );
356 }
357
358#if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
359 template<typename... Arguments>
360 void emplace( Arguments&&... args ) {
361 push( T(std::forward<Arguments>( args )...) );
362 }
363#endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
364#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
365
366 //! Dequeue item from head of queue.
367 /** Block until an item becomes available, and then dequeue it. */
368 void pop( T& destination ) {
369 internal_pop( &destination );
370 }
371
372#if TBB_USE_EXCEPTIONS
373 //! Abort all pending queue operations
374 void abort() {
375 internal_abort();
376 }
377#endif
378
379 //! Enqueue an item at tail of queue if queue is not already full.
380 /** Does not wait for queue to become not full.
381 Returns true if item is pushed; false if queue was already full. */
382 bool try_push( const T& source ) {
383 return internal_push_if_not_full( &source );
384 }
385
386#if __TBB_CPP11_RVALUE_REF_PRESENT
387 //! Move an item at tail of queue if queue is not already full.
388 /** Does not wait for queue to become not full.
389 Returns true if item is pushed; false if queue was already full. */
390 bool try_push( T&& source ) {
391 return internal_push_move_if_not_full( &source );
392 }
393#if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
394 template<typename... Arguments>
395 bool try_emplace( Arguments&&... args ) {
396 return try_push( T(std::forward<Arguments>( args )...) );
397 }
398#endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
399#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
400
401 //! Attempt to dequeue an item from head of queue.
402 /** Does not wait for item to become available.
403 Returns true if successful; false otherwise. */
404 bool try_pop( T& destination ) {
405 return internal_pop_if_present( &destination );
406 }
407
408 //! Return number of pushes minus number of pops.
409 /** Note that the result can be negative if there are pops waiting for the
410 corresponding pushes. The result can also exceed capacity() if there
411 are push operations in flight. */
412 size_type size() const {return internal_size();}
413
414 //! Equivalent to size()<=0.
415 bool empty() const {return internal_empty();}
416
417 //! Maximum number of allowed elements
418 size_type capacity() const {
419 return my_capacity;
420 }
421
422 //! Set the capacity
423 /** Setting the capacity to 0 causes subsequent try_push operations to always fail,
424 and subsequent push operations to block forever. */
425 void set_capacity( size_type new_capacity ) {
426 internal_set_capacity( new_capacity, sizeof(T) );
427 }
428
429 //! return allocator object
430 allocator_type get_allocator() const { return this->my_allocator; }
431
432 //! clear the queue. not thread-safe.
433 void clear() ;
434
435 typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,T> iterator;
436 typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,const T> const_iterator;
437
438 //------------------------------------------------------------------------
439 // The iterators are intended only for debugging. They are slow and not thread safe.
440 //------------------------------------------------------------------------
441 iterator unsafe_begin() {return iterator(*this);}
442 iterator unsafe_end() {return iterator();}
443 const_iterator unsafe_begin() const {return const_iterator(*this);}
444 const_iterator unsafe_end() const {return const_iterator();}
445
446};
447
448#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
449// guide for concurrent_bounded_queue(InputIterator, InputIterator, ...)
450template<typename InputIterator,
451 typename T = typename std::iterator_traits<InputIterator>::value_type,
452 typename A = cache_aligned_allocator<T>
453> concurrent_bounded_queue(InputIterator, InputIterator, const A& = A())
454-> concurrent_bounded_queue<T, A>;
455#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
456
457template<typename T, class A>
458concurrent_bounded_queue<T,A>::~concurrent_bounded_queue() {
459 clear();
460 internal_finish_clear();
461}
462
463template<typename T, class A>
464void concurrent_bounded_queue<T,A>::clear() {
465 T value;
466 while( try_pop(value) ) /*noop*/;
467}
468
469using strict_ppl::concurrent_queue;
470
471} // namespace tbb
472
473#endif /* __TBB_concurrent_queue_H */
474