/*
    Copyright 2005-2013 Intel Corporation. All Rights Reserved.

    This file is part of Threading Building Blocks.

    Threading Building Blocks is free software; you can redistribute it
    and/or modify it under the terms of the GNU General Public License
    version 2 as published by the Free Software Foundation.

    Threading Building Blocks is distributed in the hope that it will be
    useful, but WITHOUT ANY WARRANTY; without even the implied warranty
    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with Threading Building Blocks; if not, write to the Free Software
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

    As a special exception, you may use this file as part of a free software
    library without restriction. Specifically, if other files instantiate
    templates or use macros or inline functions from this file, or you compile
    this file and link it with other files to produce an executable, this
    file does not by itself cause the resulting executable to be covered by
    the GNU General Public License. This exception does not however
    invalidate any other reasons why the executable file might be covered by
    the GNU General Public License.
*/

#ifndef __TBB__concurrent_queue_impl_H
#define __TBB__concurrent_queue_impl_H

#ifndef __TBB_concurrent_queue_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

#include "../tbb_stddef.h"
#include "../tbb_machine.h"
#include "../atomic.h"
#include "../spin_mutex.h"
#include "../cache_aligned_allocator.h"
#include "../tbb_exception.h"
#include "../tbb_profiling.h"
#include <new>

#if !TBB_USE_EXCEPTIONS && _MSC_VER
    // Suppress "C++ exception handler used, but unwind semantics are not enabled" warning in STL headers
    #pragma warning (push)
    #pragma warning (disable: 4530)
#endif

#include <iterator>

#if !TBB_USE_EXCEPTIONS && _MSC_VER
    #pragma warning (pop)
#endif

namespace tbb {

#if !__TBB_TEMPLATE_FRIENDS_BROKEN

// forward declaration
namespace strict_ppl {
template<typename T, typename A> class concurrent_queue;
}

template<typename T, typename A> class concurrent_bounded_queue;

namespace deprecated {
template<typename T, typename A> class concurrent_queue;
}
#endif

//! For internal use only.
namespace strict_ppl {

//! @cond INTERNAL
namespace internal {

using namespace tbb::internal;

typedef size_t ticket;

template<typename T> class micro_queue ;
template<typename T> class micro_queue_pop_finalizer ;
template<typename T> class concurrent_queue_base_v3;

//! parts of concurrent_queue_rep that do not have references to micro_queue
/**
 * For internal use only.
 */
struct concurrent_queue_rep_base : no_copy {
    template<typename T> friend class micro_queue;
    template<typename T> friend class concurrent_queue_base_v3;

protected:
    //! Approximately n_queue/golden ratio
    static const size_t phi = 3;

public:
    // must be power of 2
    static const size_t n_queue = 8;

    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };

    atomic<ticket> head_counter;
    char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
    atomic<ticket> tail_counter;
    char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

    //! number of invalid entries in the queue
    atomic<size_t> n_invalid_entries;

    char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
} ;
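// The pad1/pad2/pad3 arrays are sized against NFS_MaxLineSize so that head_counter,
// tail_counter, and the remaining fields each occupy their own cache line; producers
// (which advance tail_counter) and consumers (which advance head_counter) therefore
// do not false-share the same line.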

inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
    return uintptr_t(p)>1;
}
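// A page pointer holds one of three kinds of values: NULL (no page), the address 1
// (a sentinel installed by invalidate_page_and_rethrow after a failed allocation, which
// blocks further pushes), or a real page address. Only the last case is "valid" here,
// hence the comparison against 1 rather than against NULL.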

//! Abstract class to define interface for page allocation/deallocation
/**
 * For internal use only.
 */
class concurrent_queue_page_allocator
{
    template<typename T> friend class micro_queue ;
    template<typename T> friend class micro_queue_pop_finalizer ;
protected:
    virtual ~concurrent_queue_page_allocator() {}
private:
    virtual concurrent_queue_rep_base::page* allocate_page() = 0;
    virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
} ;

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unary minus operator applied to unsigned type, result still unsigned
#pragma warning( push )
#pragma warning( disable: 4146 )
#endif

//! A queue using simple locking.
/** For efficiency, this class has no constructor.
    The caller is expected to zero-initialize it. */
template<typename T>
class micro_queue : no_copy {
    typedef concurrent_queue_rep_base::page page;

    //! Class used to ensure exception-safety of method "pop"
    class destroyer: no_copy {
        T& my_value;
    public:
        destroyer( T& value ) : my_value(value) {}
        ~destroyer() {my_value.~T();}
    };

    void copy_item( page& dst, size_t index, const void* src ) {
        new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
    }

    void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) {
        new( &get_ref(dst,dindex) ) T( get_ref(const_cast<page&>(src),sindex) );
    }

    void assign_and_destroy_item( void* dst, page& src, size_t index ) {
        T& from = get_ref(src,index);
        destroyer d(from);
        *static_cast<T*>(dst) = from;
    }

    void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;

public:
    friend class micro_queue_pop_finalizer<T>;

    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

    static T& get_ref( page& p, size_t index ) {
        return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
    }
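    // A page is allocated as sizeof(padded_page) + (items_per_page-1)*sizeof(T) bytes
    // (see concurrent_queue_base_v3::allocate_page), so `last` is effectively the first
    // element of an items_per_page-sized array of T that follows the page header;
    // get_ref simply indexes into that trailing array.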

    atomic<page*> head_page;
    atomic<ticket> head_counter;

    atomic<page*> tail_page;
    atomic<ticket> tail_counter;

    spin_mutex page_mutex;

    void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) ;

    bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;

    micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base ) ;

    page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ;

    void invalidate_page_and_rethrow( ticket k ) ;
};

template<typename T>
void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
    for( atomic_backoff b(true);;b.pause() ) {
        ticket c = counter;
        if( c==k ) return;
        else if( c&1 ) {
            ++rb.n_invalid_entries;
            throw_exception( eid_bad_last_alloc );
        }
    }
}

template<typename T>
void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;
    page* p = NULL;
    size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page);
    if( !index ) {
        __TBB_TRY {
            concurrent_queue_page_allocator& pa = base;
            p = pa.allocate_page();
        } __TBB_CATCH (...) {
            ++base.my_rep->n_invalid_entries;
            invalidate_page_and_rethrow( k );
        }
        p->mask = 0;
        p->next = NULL;
    }

    if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
    call_itt_notify(acquired, &tail_counter);

    if( p ) {
        spin_mutex::scoped_lock lock( page_mutex );
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = p;
        else
            head_page = p;
        tail_page = p;
    } else {
        p = tail_page;
    }
    __TBB_TRY {
        copy_item( *p, index, item );
        // If no exception was thrown, mark item as present.
        itt_hide_store_word(p->mask, p->mask | uintptr_t(1)<<index);
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
    } __TBB_CATCH (...) {
        ++base.my_rep->n_invalid_entries;
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
        __TBB_RETHROW();
    }
}

template<typename T>
bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;
    if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
    call_itt_notify(acquired, &head_counter);
    if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
    call_itt_notify(acquired, &tail_counter);
    page& p = *head_page;
    __TBB_ASSERT( &p, NULL );
    size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
    bool success = false;
    {
        micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL );
        if( p.mask & uintptr_t(1)<<index ) {
            success = true;
            assign_and_destroy_item( dst, p, index );
        } else {
            --base.my_rep->n_invalid_entries;
        }
    }
    return success;
}
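// A cleared bit in page::mask marks a slot whose push threw while copy-constructing the
// item; such slots are counted in n_invalid_entries. pop() returns false for them (after
// decrementing the counter), and concurrent_queue_base_v3<T>::internal_try_pop then retries
// with the next ticket, so invalid slots are skipped rather than handed to callers.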

template<typename T>
micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) {
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;
    page_mutex = src.page_mutex;

    const page* srcp = src.head_page;
    if( is_valid_page(srcp) ) {
        ticket g_index = head_counter;
        __TBB_TRY {
            size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
            size_t index = modulo_power_of_two( head_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
            size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;

            head_page = make_copy( base, srcp, index, end_in_first_page, g_index );
            page* cur_page = head_page;

            if( srcp != src.tail_page ) {
                for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
                    cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index );
                    cur_page = cur_page->next;
                }

                __TBB_ASSERT( srcp==src.tail_page, NULL );
                size_t last_index = modulo_power_of_two( tail_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
                if( last_index==0 ) last_index = base.my_rep->items_per_page;

                cur_page->next = make_copy( base, srcp, 0, last_index, g_index );
                cur_page = cur_page->next;
            }
            tail_page = cur_page;
        } __TBB_CATCH (...) {
            invalidate_page_and_rethrow( g_index );
        }
    } else {
        head_page = tail_page = NULL;
    }
    return *this;
}

template<typename T>
void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
    // Append an invalid page at address 1 so that no more pushes are allowed.
    page* invalid_page = (page*)uintptr_t(1);
    {
        spin_mutex::scoped_lock lock( page_mutex );
        itt_store_word_with_release(tail_counter, k+concurrent_queue_rep_base::n_queue+1);
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = invalid_page;
        else
            head_page = invalid_page;
        tail_page = invalid_page;
    }
    __TBB_RETHROW();
}
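// The stored value k+n_queue+1 is odd whenever k is a multiple of n_queue (as it is when
// this is reached from push). spin_wait_until_my_turn treats an odd tail_counter as the
// broken-queue signal and throws eid_bad_last_alloc, so subsequent pushes into this
// micro_queue fail promptly instead of waiting behind the sentinel page at address 1.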

template<typename T>
concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) {
    concurrent_queue_page_allocator& pa = base;
    page* new_page = pa.allocate_page();
    new_page->next = NULL;
    new_page->mask = src_page->mask;
    for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
        if( new_page->mask & uintptr_t(1)<<begin_in_page )
            copy_item( *new_page, begin_in_page, *src_page, begin_in_page );
    return new_page;
}

template<typename T>
class micro_queue_pop_finalizer: no_copy {
    typedef concurrent_queue_rep_base::page page;
    ticket my_ticket;
    micro_queue<T>& my_queue;
    page* my_page;
    concurrent_queue_page_allocator& allocator;
public:
    micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
        my_ticket(k), my_queue(queue), my_page(p), allocator(b)
    {}
    ~micro_queue_pop_finalizer() ;
};

template<typename T>
micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
    page* p = my_page;
    if( is_valid_page(p) ) {
        spin_mutex::scoped_lock lock( my_queue.page_mutex );
        page* q = p->next;
        my_queue.head_page = q;
        if( !is_valid_page(q) ) {
            my_queue.tail_page = NULL;
        }
    }
    itt_store_word_with_release(my_queue.head_counter, my_ticket);
    if( is_valid_page(p) ) {
        allocator.deallocate_page( p );
    }
}

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4146 is back

template<typename T> class concurrent_queue_iterator_rep ;
template<typename T> class concurrent_queue_iterator_base_v3;

//! representation of concurrent_queue_base
/**
 * the class inherits from concurrent_queue_rep_base and defines an array of micro_queue<T>'s
 */
template<typename T>
struct concurrent_queue_rep : public concurrent_queue_rep_base {
    micro_queue<T> array[n_queue];

    //! Map ticket to an array index
    static size_t index( ticket k ) {
        return k*phi%n_queue;
    }
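    // With phi==3 and n_queue==8, consecutive tickets are scattered across the eight
    // micro_queues in the order 0, 3, 6, 1, 4, 7, 2, 5, so threads operating on adjacent
    // tickets rarely contend on the same micro_queue.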

    micro_queue<T>& choose( ticket k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }
};

//! base class of concurrent_queue
/**
 * The class implements the interface defined by concurrent_queue_page_allocator
 * and has a pointer to an instance of concurrent_queue_rep.
 */
template<typename T>
class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
    //! Internal representation
    concurrent_queue_rep<T>* my_rep;

    friend struct concurrent_queue_rep<T>;
    friend class micro_queue<T>;
    friend class concurrent_queue_iterator_rep<T>;
    friend class concurrent_queue_iterator_base_v3<T>;

protected:
    typedef typename concurrent_queue_rep<T>::page page;

private:
    typedef typename micro_queue<T>::padded_page padded_page;

    /* override */ virtual page *allocate_page() {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        return reinterpret_cast<page*>(allocate_block ( n ));
    }

    /* override */ virtual void deallocate_page( concurrent_queue_rep_base::page *p ) {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        deallocate_block( reinterpret_cast<void*>(p), n );
    }

    //! custom allocator
    virtual void *allocate_block( size_t n ) = 0;

    //! custom de-allocator
    virtual void deallocate_block( void *p, size_t n ) = 0;

protected:
    concurrent_queue_base_v3();

    /* override */ virtual ~concurrent_queue_base_v3() {
#if TBB_USE_ASSERT
        size_t nq = my_rep->n_queue;
        for( size_t i=0; i<nq; i++ )
            __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
#endif /* TBB_USE_ASSERT */
        cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
    }

    //! Enqueue item at tail of queue
    void internal_push( const void* src ) {
        concurrent_queue_rep<T>& r = *my_rep;
        ticket k = r.tail_counter++;
        r.choose(k).push( src, k, *this );
    }
    //! Attempt to dequeue item from queue.
    /** Returns false if there was no item to dequeue. */
    bool internal_try_pop( void* dst ) ;

    //! Get size of queue; result may be invalid if queue is modified concurrently
    size_t internal_size() const ;

    //! check if the queue is empty; thread safe
    bool internal_empty() const ;

    //! free any remaining pages
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void internal_finish_clear() ;

    //! Obsolete
    void internal_throw_exception() const {
        throw_exception( eid_bad_alloc );
    }

    //! copy internal representation
    void assign( const concurrent_queue_base_v3& src ) ;
};

template<typename T>
concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
    const size_t item_size = sizeof(T);
    my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
    __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
    memset(my_rep,0,sizeof(concurrent_queue_rep<T>));
    my_rep->item_size = item_size;
    my_rep->items_per_page = item_size<= 8 ? 32 :
                             item_size<= 16 ? 16 :
                             item_size<= 32 ? 8 :
                             item_size<= 64 ? 4 :
                             item_size<=128 ? 2 :
                             1;
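    // The table above keeps the per-page payload near 256 bytes for items up to 128 bytes
    // (e.g. 8-byte items get 32 slots, 64-byte items get 4 slots); anything larger gets one
    // slot per page. items_per_page stays a power of two, which is what modulo_power_of_two
    // relies on when mapping a ticket to a slot index.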
}

template<typename T>
bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket k;
    do {
        k = r.head_counter;
        for(;;) {
            if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
                // Queue is empty
                return false;
            }
            // Queue had item with ticket k when we looked. Attempt to get that item.
            ticket tk=k;
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (push)
    #pragma warning (disable: 4267)
#endif
            k = r.head_counter.compare_and_swap( tk+1, tk );
#if defined(_MSC_VER) && defined(_Wp64)
    #pragma warning (pop)
#endif
            if( k==tk )
                break;
            // Another thread snatched the item, retry.
        }
    } while( !r.choose( k ).pop( dst, k, *this ) );
    return true;
}

template<typename T>
size_t concurrent_queue_base_v3<T>::internal_size() const {
    concurrent_queue_rep<T>& r = *my_rep;
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    ticket hc = r.head_counter;
    size_t nie = r.n_invalid_entries;
    ticket tc = r.tail_counter;
    __TBB_ASSERT( hc!=tc || !nie, NULL );
    ptrdiff_t sz = tc-hc-nie;
    return sz<0 ? 0 : size_t(sz);
}
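// The three counters are read without any common lock, so the snapshot is not atomic:
// under concurrent pushes and pops the computed difference can come out negative, hence
// the clamp to zero and the caveat above that the result may be invalid while the queue
// is being modified.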

template<typename T>
bool concurrent_queue_base_v3<T>::internal_empty() const {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket tc = r.tail_counter;
    ticket hc = r.head_counter;
    // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
    return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
}

template<typename T>
void concurrent_queue_base_v3<T>::internal_finish_clear() {
    concurrent_queue_rep<T>& r = *my_rep;
    size_t nq = r.n_queue;
    for( size_t i=0; i<nq; ++i ) {
        page* tp = r.array[i].tail_page;
        if( is_valid_page(tp) ) {
            __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
            deallocate_page( tp );
            r.array[i].tail_page = NULL;
        } else
            __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
    }
}

template<typename T>
void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) {
    concurrent_queue_rep<T>& r = *my_rep;
    r.items_per_page = src.my_rep->items_per_page;

    // copy concurrent_queue_rep.
    r.head_counter = src.my_rep->head_counter;
    r.tail_counter = src.my_rep->tail_counter;
    r.n_invalid_entries = src.my_rep->n_invalid_entries;

    // copy micro_queues
    for( size_t i = 0; i<r.n_queue; ++i )
        r.array[i].assign( src.my_rep->array[i], *this);

    __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
            "the source concurrent queue should not be concurrently modified." );
}

template<typename Container, typename Value> class concurrent_queue_iterator;

template<typename T>
class concurrent_queue_iterator_rep: no_assign {
    typedef typename micro_queue<T>::padded_page padded_page;
public:
    ticket head_counter;
    const concurrent_queue_base_v3<T>& my_queue;
    typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
    concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue)
    {
        for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
            array[k] = queue.my_rep->array[k].head_page;
    }

    //! Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
    bool get_item( T*& item, size_t k ) ;
};

template<typename T>
bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
    if( k==my_queue.my_rep->tail_counter ) {
        item = NULL;
        return true;
    } else {
        typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
        __TBB_ASSERT(p,NULL);
        size_t i = modulo_power_of_two( k/concurrent_queue_rep<T>::n_queue, my_queue.my_rep->items_per_page );
        item = &micro_queue<T>::get_ref(*p,i);
        return (p->mask & uintptr_t(1)<<i)!=0;
    }
}

//! Constness-independent portion of concurrent_queue_iterator.
/** @ingroup containers */
template<typename Value>
class concurrent_queue_iterator_base_v3 : no_assign {
    //! Represents concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep<Value>* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    //! Pointer to current item
    Value* my_item;

    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
#if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
        __TBB_compiler_fence();
#endif
    }

    //! Copy constructor
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
    : no_assign(), my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;

    //! Assignment
    void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;

    //! Advance iterator one step towards tail of queue.
    void advance() ;

    //! Destructor
    ~concurrent_queue_iterator_base_v3() {
        cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
        my_rep = NULL;
    }
};

template<typename Value>
concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
    my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
    new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
    size_t k = my_rep->head_counter;
    if( !my_rep->get_item(my_item, k) ) advance();
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
    if( my_rep!=other.my_rep ) {
        if( my_rep ) {
            cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
            my_rep = NULL;
        }
        if( other.my_rep ) {
            my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
            new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
        }
    }
    my_item = other.my_item;
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
#if TBB_USE_ASSERT
    Value* tmp;
    my_rep->get_item(tmp,k);
    __TBB_ASSERT( my_item==tmp, NULL );
#endif /* TBB_USE_ASSERT */
    size_t i = modulo_power_of_two( k/concurrent_queue_rep<Value>::n_queue, queue.my_rep->items_per_page );
    if( i==queue.my_rep->items_per_page-1 ) {
        typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
        root = root->next;
    }
    // advance k
    my_rep->head_counter = ++k;
    if( !my_rep->get_item(my_item, k) ) advance();
}
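// When get_item reports the slot for the new ticket as invalid (its mask bit is clear
// because the corresponding push threw), advance() recurses to skip past it, so iteration
// only ever exposes successfully constructed items.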

//! Similar to C++0x std::remove_cv
/** "tbb_" prefix added to avoid overload confusion with C++0x implementations. */
template<typename T> struct tbb_remove_cv {typedef T type;};
template<typename T> struct tbb_remove_cv<const T> {typedef T type;};
template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};
template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;};

//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container.
    @ingroup containers */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::strict_ppl::concurrent_queue;
#else
public: // workaround for MSVC
#endif
    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
    {
    }

public:
    concurrent_queue_iterator() {}

    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
    {}

    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        this->assign(other);
        return *this;
    }

    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(this->my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    //! Post increment
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator


template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

//! @endcond

} // namespace strict_ppl

//! @cond INTERNAL
namespace internal {

class concurrent_queue_rep;
class concurrent_queue_iterator_rep;
class concurrent_queue_iterator_base_v3;
template<typename Container, typename Value> class concurrent_queue_iterator;

//! For internal use only.
/** Type-independent portion of concurrent_queue.
    @ingroup containers */
class concurrent_queue_base_v3: no_copy {
    //! Internal representation
    concurrent_queue_rep* my_rep;

    friend class concurrent_queue_rep;
    friend struct micro_queue;
    friend class micro_queue_pop_finalizer;
    friend class concurrent_queue_iterator_rep;
    friend class concurrent_queue_iterator_base_v3;
protected:
    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };

    //! Capacity of the queue
    ptrdiff_t my_capacity;

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

#if __TBB_PROTECTED_NESTED_CLASS_BROKEN
public:
#endif
    template<typename T>
    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

private:
    virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
    virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
protected:
    __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
    virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();

    //! Enqueue item at tail of queue
    void __TBB_EXPORTED_METHOD internal_push( const void* src );

    //! Dequeue item from head of queue
    void __TBB_EXPORTED_METHOD internal_pop( void* dst );

    //! Abort all pending queue operations
    void __TBB_EXPORTED_METHOD internal_abort();

    //! Attempt to enqueue item onto queue.
    bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );

    //! Attempt to dequeue item from queue.
    /** Returns false if there was no item to dequeue. */
    bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );

    //! Get size of queue
    ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;

    //! Check if the queue is empty
    bool __TBB_EXPORTED_METHOD internal_empty() const;

    //! Set the queue capacity
    void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );

    //! custom allocator
    virtual page *allocate_page() = 0;

    //! custom de-allocator
    virtual void deallocate_page( page *p ) = 0;

    //! free any remaining pages
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void __TBB_EXPORTED_METHOD internal_finish_clear() ;

    //! throw an exception
    void __TBB_EXPORTED_METHOD internal_throw_exception() const;

    //! copy internal representation
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;

private:
    virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
};
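// Unlike the strict_ppl templates above, which are defined entirely in this header, the
// entry points of this class are marked __TBB_EXPORTED_METHOD and are implemented inside
// the TBB library binary; tbb::concurrent_bounded_queue and tbb::deprecated::concurrent_queue
// are layered on top of it.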

//! Type-independent portion of concurrent_queue_iterator.
/** @ingroup containers */
class concurrent_queue_iterator_base_v3 {
    //! concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
protected:
    //! Pointer to current item
    void* my_item;

    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}

    //! Copy constructor
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Obsolete entry point for constructing iterator pointing to head of queue.
    /** Does not work correctly for SSE types. */
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );

    //! Construct iterator pointing to head of queue.
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );

    //! Assignment
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );

    //! Advance iterator one step towards tail of queue.
    void __TBB_EXPORTED_METHOD advance();

    //! Destructor
    __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
};

typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;

//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container.
    @ingroup containers */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base,
        public std::iterator<std::forward_iterator_tag,Value> {

#if !defined(_MSC_VER) || defined(__INTEL_COMPILER)
    template<typename T, class A>
    friend class ::tbb::concurrent_bounded_queue;

    template<typename T, class A>
    friend class ::tbb::deprecated::concurrent_queue;
#else
public: // workaround for MSVC
#endif
    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
        concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
    {
    }

public:
    concurrent_queue_iterator() {}

    /** If Value==Container::value_type, then this routine is the copy constructor.
        If Value==const Container::value_type, then this routine is a conversion constructor. */
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3(other)
    {}

    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) {
        assign(other);
        return *this;
    }

    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        advance();
        return *this;
    }

    //! Post increment
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator


template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

//! @endcond

} // namespace tbb

#endif /* __TBB__concurrent_queue_impl_H */