1/*
2 Copyright 2005-2013 Intel Corporation. All Rights Reserved.
3
4 This file is part of Threading Building Blocks.
5
6 Threading Building Blocks is free software; you can redistribute it
7 and/or modify it under the terms of the GNU General Public License
8 version 2 as published by the Free Software Foundation.
9
10 Threading Building Blocks is distributed in the hope that it will be
11 useful, but WITHOUT ANY WARRANTY; without even the implied warranty
12 of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with Threading Building Blocks; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
19 As a special exception, you may use this file as part of a free software
20 library without restriction. Specifically, if other files instantiate
21 templates or use macros or inline functions from this file, or you compile
22 this file and link it with other files to produce an executable, this
23 file does not by itself cause the resulting executable to be covered by
24 the GNU General Public License. This exception does not however
25 invalidate any other reasons why the executable file might be covered by
26 the GNU General Public License.
27*/
28
29#ifndef __TBB_concurrent_queue_H
30#define __TBB_concurrent_queue_H
31
32#include "internal/_concurrent_queue_impl.h"
33
34namespace tbb {
35
36namespace strict_ppl {
37
38//! A high-performance thread-safe non-blocking concurrent queue.
39/** Multiple threads may each push and pop concurrently.
40 Assignment construction is not allowed.
41 @ingroup containers */
42template<typename T, typename A = cache_aligned_allocator<T> >
43class concurrent_queue: public internal::concurrent_queue_base_v3<T> {
44 template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
45
46 //! Allocator type
47 typedef typename A::template rebind<char>::other page_allocator_type;
48 page_allocator_type my_allocator;
49
50 //! Allocates a block of size n (bytes)
51 /*override*/ virtual void *allocate_block( size_t n ) {
52 void *b = reinterpret_cast<void*>(my_allocator.allocate( n ));
53 if( !b )
54 internal::throw_exception(internal::eid_bad_alloc);
55 return b;
56 }
57
58 //! Deallocates block created by allocate_block.
59 /*override*/ virtual void deallocate_block( void *b, size_t n ) {
60 my_allocator.deallocate( reinterpret_cast<char*>(b), n );
61 }
62
63public:
64 //! Element type in the queue.
65 typedef T value_type;
66
67 //! Reference type
68 typedef T& reference;
69
70 //! Const reference type
71 typedef const T& const_reference;
72
73 //! Integral type for representing size of the queue.
74 typedef size_t size_type;
75
76 //! Difference type for iterator
77 typedef ptrdiff_t difference_type;
78
79 //! Allocator type
80 typedef A allocator_type;
81
82 //! Construct empty queue
83 explicit concurrent_queue(const allocator_type& a = allocator_type()) :
84 my_allocator( a )
85 {
86 }
87
88 //! [begin,end) constructor
89 template<typename InputIterator>
90 concurrent_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
91 my_allocator( a )
92 {
93 for( ; begin != end; ++begin )
94 this->internal_push(&*begin);
95 }
96
97 //! Copy constructor
98 concurrent_queue( const concurrent_queue& src, const allocator_type& a = allocator_type()) :
99 internal::concurrent_queue_base_v3<T>(), my_allocator( a )
100 {
101 this->assign( src );
102 }
103
104 //! Destroy queue
105 ~concurrent_queue();
106
107 //! Enqueue an item at tail of queue.
108 void push( const T& source ) {
109 this->internal_push( &source );
110 }
111
112 //! Attempt to dequeue an item from head of queue.
113 /** Does not wait for item to become available.
114 Returns true if successful; false otherwise. */
115 bool try_pop( T& result ) {
116 return this->internal_try_pop( &result );
117 }
118
119 //! Return the number of items in the queue; thread unsafe
120 size_type unsafe_size() const {return this->internal_size();}
121
122 //! Equivalent to size()==0.
123 bool empty() const {return this->internal_empty();}
124
125 //! Clear the queue. not thread-safe.
126 void clear() ;
127
128 //! Return allocator object
129 allocator_type get_allocator() const { return this->my_allocator; }
130
131 typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator;
132 typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator;
133
134 //------------------------------------------------------------------------
135 // The iterators are intended only for debugging. They are slow and not thread safe.
136 //------------------------------------------------------------------------
137 iterator unsafe_begin() {return iterator(*this);}
138 iterator unsafe_end() {return iterator();}
139 const_iterator unsafe_begin() const {return const_iterator(*this);}
140 const_iterator unsafe_end() const {return const_iterator();}
141} ;
142
143template<typename T, class A>
144concurrent_queue<T,A>::~concurrent_queue() {
145 clear();
146 this->internal_finish_clear();
147}
148
149template<typename T, class A>
150void concurrent_queue<T,A>::clear() {
151 while( !empty() ) {
152 T value;
153 this->internal_try_pop(&value);
154 }
155}
156
157} // namespace strict_ppl
158
159//! A high-performance thread-safe blocking concurrent bounded queue.
160/** This is the pre-PPL TBB concurrent queue which supports boundedness and blocking semantics.
161 Note that method names agree with the PPL-style concurrent queue.
162 Multiple threads may each push and pop concurrently.
163 Assignment construction is not allowed.
164 @ingroup containers */
165template<typename T, class A = cache_aligned_allocator<T> >
166class concurrent_bounded_queue: public internal::concurrent_queue_base_v3 {
167 template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
168
169 //! Allocator type
170 typedef typename A::template rebind<char>::other page_allocator_type;
171 page_allocator_type my_allocator;
172
173 typedef typename concurrent_queue_base_v3::padded_page<T> padded_page;
174
175 //! Class used to ensure exception-safety of method "pop"
176 class destroyer: internal::no_copy {
177 T& my_value;
178 public:
179 destroyer( T& value ) : my_value(value) {}
180 ~destroyer() {my_value.~T();}
181 };
182
183 T& get_ref( page& p, size_t index ) {
184 __TBB_ASSERT( index<items_per_page, NULL );
185 return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
186 }
187
188 /*override*/ virtual void copy_item( page& dst, size_t index, const void* src ) {
189 new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
190 }
191
192 /*override*/ virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
193 new( &get_ref(dst,dindex) ) T( get_ref( const_cast<page&>(src), sindex ) );
194 }
195
196 /*override*/ virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) {
197 T& from = get_ref(src,index);
198 destroyer d(from);
199 *static_cast<T*>(dst) = from;
200 }
201
202 /*override*/ virtual page *allocate_page() {
203 size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
204 page *p = reinterpret_cast<page*>(my_allocator.allocate( n ));
205 if( !p )
206 internal::throw_exception(internal::eid_bad_alloc);
207 return p;
208 }
209
210 /*override*/ virtual void deallocate_page( page *p ) {
211 size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
212 my_allocator.deallocate( reinterpret_cast<char*>(p), n );
213 }
214
215public:
216 //! Element type in the queue.
217 typedef T value_type;
218
219 //! Allocator type
220 typedef A allocator_type;
221
222 //! Reference type
223 typedef T& reference;
224
225 //! Const reference type
226 typedef const T& const_reference;
227
228 //! Integral type for representing size of the queue.
229 /** Note that the size_type is a signed integral type.
230 This is because the size can be negative if there are pending pops without corresponding pushes. */
231 typedef std::ptrdiff_t size_type;
232
233 //! Difference type for iterator
234 typedef std::ptrdiff_t difference_type;
235
236 //! Construct empty queue
237 explicit concurrent_bounded_queue(const allocator_type& a = allocator_type()) :
238 concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
239 {
240 }
241
242 //! Copy constructor
243 concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a = allocator_type()) :
244 concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
245 {
246 assign( src );
247 }
248
249 //! [begin,end) constructor
250 template<typename InputIterator>
251 concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
252 concurrent_queue_base_v3( sizeof(T) ), my_allocator( a )
253 {
254 for( ; begin != end; ++begin )
255 internal_push_if_not_full(&*begin);
256 }
257
258 //! Destroy queue
259 ~concurrent_bounded_queue();
260
261 //! Enqueue an item at tail of queue.
262 void push( const T& source ) {
263 internal_push( &source );
264 }
265
266 //! Dequeue item from head of queue.
267 /** Block until an item becomes available, and then dequeue it. */
268 void pop( T& destination ) {
269 internal_pop( &destination );
270 }
271
272#if TBB_USE_EXCEPTIONS
273 //! Abort all pending queue operations
274 void abort() {
275 internal_abort();
276 }
277#endif
278
279 //! Enqueue an item at tail of queue if queue is not already full.
280 /** Does not wait for queue to become not full.
281 Returns true if item is pushed; false if queue was already full. */
282 bool try_push( const T& source ) {
283 return internal_push_if_not_full( &source );
284 }
285
286 //! Attempt to dequeue an item from head of queue.
287 /** Does not wait for item to become available.
288 Returns true if successful; false otherwise. */
289 bool try_pop( T& destination ) {
290 return internal_pop_if_present( &destination );
291 }
292
293 //! Return number of pushes minus number of pops.
294 /** Note that the result can be negative if there are pops waiting for the
295 corresponding pushes. The result can also exceed capacity() if there
296 are push operations in flight. */
297 size_type size() const {return internal_size();}
298
299 //! Equivalent to size()<=0.
300 bool empty() const {return internal_empty();}
301
302 //! Maximum number of allowed elements
303 size_type capacity() const {
304 return my_capacity;
305 }
306
307 //! Set the capacity
308 /** Setting the capacity to 0 causes subsequent try_push operations to always fail,
309 and subsequent push operations to block forever. */
310 void set_capacity( size_type new_capacity ) {
311 internal_set_capacity( new_capacity, sizeof(T) );
312 }
313
314 //! return allocator object
315 allocator_type get_allocator() const { return this->my_allocator; }
316
317 //! clear the queue. not thread-safe.
318 void clear() ;
319
320 typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,T> iterator;
321 typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,const T> const_iterator;
322
323 //------------------------------------------------------------------------
324 // The iterators are intended only for debugging. They are slow and not thread safe.
325 //------------------------------------------------------------------------
326 iterator unsafe_begin() {return iterator(*this);}
327 iterator unsafe_end() {return iterator();}
328 const_iterator unsafe_begin() const {return const_iterator(*this);}
329 const_iterator unsafe_end() const {return const_iterator();}
330
331};
332
333template<typename T, class A>
334concurrent_bounded_queue<T,A>::~concurrent_bounded_queue() {
335 clear();
336 internal_finish_clear();
337}
338
339template<typename T, class A>
340void concurrent_bounded_queue<T,A>::clear() {
341 while( !empty() ) {
342 T value;
343 internal_pop_if_present(&value);
344 }
345}
346
347namespace deprecated {
348
349//! A high-performance thread-safe blocking concurrent bounded queue.
350/** This is the pre-PPL TBB concurrent queue which support boundedness and blocking semantics.
351 Note that method names agree with the PPL-style concurrent queue.
352 Multiple threads may each push and pop concurrently.
353 Assignment construction is not allowed.
354 @ingroup containers */
355template<typename T, class A = cache_aligned_allocator<T> >
356class concurrent_queue: public concurrent_bounded_queue<T,A> {
357#if !__TBB_TEMPLATE_FRIENDS_BROKEN
358 template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
359#endif
360
361public:
362 //! Construct empty queue
363 explicit concurrent_queue(const A& a = A()) :
364 concurrent_bounded_queue<T,A>( a )
365 {
366 }
367
368 //! Copy constructor
369 concurrent_queue( const concurrent_queue& src, const A& a = A()) :
370 concurrent_bounded_queue<T,A>( src, a )
371 {
372 }
373
374 //! [begin,end) constructor
375 template<typename InputIterator>
376 concurrent_queue( InputIterator b /*begin*/, InputIterator e /*end*/, const A& a = A()) :
377 concurrent_bounded_queue<T,A>( b, e, a )
378 {
379 }
380
381 //! Enqueue an item at tail of queue if queue is not already full.
382 /** Does not wait for queue to become not full.
383 Returns true if item is pushed; false if queue was already full. */
384 bool push_if_not_full( const T& source ) {
385 return this->try_push( source );
386 }
387
388 //! Attempt to dequeue an item from head of queue.
389 /** Does not wait for item to become available.
390 Returns true if successful; false otherwise.
391 @deprecated Use try_pop()
392 */
393 bool pop_if_present( T& destination ) {
394 return this->try_pop( destination );
395 }
396
397 typedef typename concurrent_bounded_queue<T,A>::iterator iterator;
398 typedef typename concurrent_bounded_queue<T,A>::const_iterator const_iterator;
399 //
400 //------------------------------------------------------------------------
401 // The iterators are intended only for debugging. They are slow and not thread safe.
402 //------------------------------------------------------------------------
403 iterator begin() {return this->unsafe_begin();}
404 iterator end() {return this->unsafe_end();}
405 const_iterator begin() const {return this->unsafe_begin();}
406 const_iterator end() const {return this->unsafe_end();}
407};
408
409}
410
411
412#if TBB_DEPRECATED
413using deprecated::concurrent_queue;
414#else
415using strict_ppl::concurrent_queue;
416#endif
417
418} // namespace tbb
419
420#endif /* __TBB_concurrent_queue_H */
421