1 | /* |
2 | Copyright 2005-2013 Intel Corporation. All Rights Reserved. |
3 | |
4 | This file is part of Threading Building Blocks. |
5 | |
6 | Threading Building Blocks is free software; you can redistribute it |
7 | and/or modify it under the terms of the GNU General Public License |
8 | version 2 as published by the Free Software Foundation. |
9 | |
10 | Threading Building Blocks is distributed in the hope that it will be |
11 | useful, but WITHOUT ANY WARRANTY; without even the implied warranty |
12 | of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | GNU General Public License for more details. |
14 | |
15 | You should have received a copy of the GNU General Public License |
16 | along with Threading Building Blocks; if not, write to the Free Software |
17 | Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA |
18 | |
19 | As a special exception, you may use this file as part of a free software |
20 | library without restriction. Specifically, if other files instantiate |
21 | templates or use macros or inline functions from this file, or you compile |
22 | this file and link it with other files to produce an executable, this |
23 | file does not by itself cause the resulting executable to be covered by |
24 | the GNU General Public License. This exception does not however |
25 | invalidate any other reasons why the executable file might be covered by |
26 | the GNU General Public License. |
27 | */ |
28 | |
29 | #ifndef __TBB__concurrent_queue_impl_H |
30 | #define __TBB__concurrent_queue_impl_H |
31 | |
32 | #ifndef __TBB_concurrent_queue_H |
33 | #error Do not #include this internal file directly; use public TBB headers instead. |
34 | #endif |
35 | |
36 | #include "../tbb_stddef.h" |
37 | #include "../tbb_machine.h" |
38 | #include "../atomic.h" |
39 | #include "../spin_mutex.h" |
40 | #include "../cache_aligned_allocator.h" |
41 | #include "../tbb_exception.h" |
42 | #include "../tbb_profiling.h" |
43 | #include <new> |
44 | |
45 | #if !TBB_USE_EXCEPTIONS && _MSC_VER |
46 | // Suppress "C++ exception handler used, but unwind semantics are not enabled" warning in STL headers |
47 | #pragma warning (push) |
48 | #pragma warning (disable: 4530) |
49 | #endif |
50 | |
51 | #include <iterator> |
52 | |
53 | #if !TBB_USE_EXCEPTIONS && _MSC_VER |
54 | #pragma warning (pop) |
55 | #endif |
56 | |
57 | namespace tbb { |
58 | |
59 | #if !__TBB_TEMPLATE_FRIENDS_BROKEN |
60 | |
// Forward declarations
62 | namespace strict_ppl { |
63 | template<typename T, typename A> class concurrent_queue; |
64 | } |
65 | |
66 | template<typename T, typename A> class concurrent_bounded_queue; |
67 | |
68 | namespace deprecated { |
69 | template<typename T, typename A> class concurrent_queue; |
70 | } |
71 | #endif |
72 | |
73 | //! For internal use only. |
74 | namespace strict_ppl { |
75 | |
76 | //! @cond INTERNAL |
77 | namespace internal { |
78 | |
79 | using namespace tbb::internal; |
80 | |
81 | typedef size_t ticket; |
82 | |
83 | template<typename T> class micro_queue ; |
84 | template<typename T> class micro_queue_pop_finalizer ; |
85 | template<typename T> class concurrent_queue_base_v3; |
86 | |
87 | //! parts of concurrent_queue_rep that do not have references to micro_queue |
88 | /** |
89 | * For internal use only. |
90 | */ |
91 | struct concurrent_queue_rep_base : no_copy { |
92 | template<typename T> friend class micro_queue; |
93 | template<typename T> friend class concurrent_queue_base_v3; |
94 | |
95 | protected: |
96 | //! Approximately n_queue/golden ratio |
97 | static const size_t phi = 3; |
98 | |
99 | public: |
100 | // must be power of 2 |
101 | static const size_t n_queue = 8; |
102 | |
103 | //! Prefix on a page |
104 | struct page { |
105 | page* next; |
106 | uintptr_t mask; |
107 | }; |
108 | |
109 | atomic<ticket> head_counter; |
110 | char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)]; |
111 | atomic<ticket> tail_counter; |
112 | char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)]; |
113 | |
114 | //! Always a power of 2 |
115 | size_t items_per_page; |
116 | |
117 | //! Size of an item |
118 | size_t item_size; |
119 | |
120 | //! number of invalid entries in the queue |
121 | atomic<size_t> n_invalid_entries; |
122 | |
123 | char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)]; |
124 | } ; |
125 | |
126 | inline bool is_valid_page(const concurrent_queue_rep_base::page* p) { |
127 | return uintptr_t(p)>1; |
128 | } |
129 | |
130 | //! Abstract class to define interface for page allocation/deallocation |
131 | /** |
132 | * For internal use only. |
133 | */ |
134 | class concurrent_queue_page_allocator |
135 | { |
136 | template<typename T> friend class micro_queue ; |
137 | template<typename T> friend class micro_queue_pop_finalizer ; |
138 | protected: |
139 | virtual ~concurrent_queue_page_allocator() {} |
140 | private: |
141 | virtual concurrent_queue_rep_base::page* allocate_page() = 0; |
142 | virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0; |
143 | } ; |
144 | |
145 | #if _MSC_VER && !defined(__INTEL_COMPILER) |
146 | // unary minus operator applied to unsigned type, result still unsigned |
147 | #pragma warning( push ) |
148 | #pragma warning( disable: 4146 ) |
149 | #endif |
150 | |
151 | //! A queue using simple locking. |
152 | /** For efficiency, this class has no constructor. |
153 | The caller is expected to zero-initialize it. */ |
154 | template<typename T> |
155 | class micro_queue : no_copy { |
156 | typedef concurrent_queue_rep_base::page page; |
157 | |
158 | //! Class used to ensure exception-safety of method "pop" |
159 | class destroyer: no_copy { |
160 | T& my_value; |
161 | public: |
162 | destroyer( T& value ) : my_value(value) {} |
163 | ~destroyer() {my_value.~T();} |
164 | }; |
165 | |
166 | void copy_item( page& dst, size_t index, const void* src ) { |
167 | new( &get_ref(dst,index) ) T(*static_cast<const T*>(src)); |
168 | } |
169 | |
170 | void copy_item( page& dst, size_t dindex, const page& src, size_t sindex ) { |
171 | new( &get_ref(dst,dindex) ) T( get_ref(const_cast<page&>(src),sindex) ); |
172 | } |
173 | |
174 | void assign_and_destroy_item( void* dst, page& src, size_t index ) { |
175 | T& from = get_ref(src,index); |
176 | destroyer d(from); |
177 | *static_cast<T*>(dst) = from; |
178 | } |
179 | |
180 | void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ; |
181 | |
182 | public: |
183 | friend class micro_queue_pop_finalizer<T>; |
184 | |
185 | struct padded_page: page { |
186 | //! Not defined anywhere - exists to quiet warnings. |
187 | padded_page(); |
188 | //! Not defined anywhere - exists to quiet warnings. |
189 | void operator=( const padded_page& ); |
190 | //! Must be last field. |
191 | T last; |
192 | }; |
193 | |
194 | static T& get_ref( page& p, size_t index ) { |
195 | return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index]; |
196 | } |
197 | |
198 | atomic<page*> head_page; |
199 | atomic<ticket> head_counter; |
200 | |
201 | atomic<page*> tail_page; |
202 | atomic<ticket> tail_counter; |
203 | |
204 | spin_mutex page_mutex; |
205 | |
206 | void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) ; |
207 | |
208 | bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ; |
209 | |
210 | micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base ) ; |
211 | |
212 | page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) ; |
213 | |
214 | void invalidate_page_and_rethrow( ticket k ) ; |
215 | }; |
216 | |
217 | template<typename T> |
218 | void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const { |
219 | for( atomic_backoff b(true);;b.pause() ) { |
220 | ticket c = counter; |
221 | if( c==k ) return; |
222 | else if( c&1 ) { |
223 | ++rb.n_invalid_entries; |
224 | throw_exception( eid_bad_last_alloc ); |
225 | } |
226 | } |
227 | } |
228 | |
229 | template<typename T> |
230 | void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base ) { |
231 | k &= -concurrent_queue_rep_base::n_queue; |
232 | page* p = NULL; |
233 | size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page); |
234 | if( !index ) { |
235 | __TBB_TRY { |
236 | concurrent_queue_page_allocator& pa = base; |
237 | p = pa.allocate_page(); |
238 | } __TBB_CATCH (...) { |
239 | ++base.my_rep->n_invalid_entries; |
240 | invalidate_page_and_rethrow( k ); |
241 | } |
242 | p->mask = 0; |
243 | p->next = NULL; |
244 | } |
245 | |
246 | if( tail_counter!=k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep ); |
247 | call_itt_notify(acquired, &tail_counter); |
248 | |
249 | if( p ) { |
250 | spin_mutex::scoped_lock lock( page_mutex ); |
251 | page* q = tail_page; |
252 | if( is_valid_page(q) ) |
253 | q->next = p; |
254 | else |
255 | head_page = p; |
256 | tail_page = p; |
257 | } else { |
258 | p = tail_page; |
259 | } |
260 | __TBB_TRY { |
261 | copy_item( *p, index, item ); |
262 | // If no exception was thrown, mark item as present. |
263 | itt_hide_store_word(p->mask, p->mask | uintptr_t(1)<<index); |
264 | call_itt_notify(releasing, &tail_counter); |
265 | tail_counter += concurrent_queue_rep_base::n_queue; |
266 | } __TBB_CATCH (...) { |
267 | ++base.my_rep->n_invalid_entries; |
268 | call_itt_notify(releasing, &tail_counter); |
269 | tail_counter += concurrent_queue_rep_base::n_queue; |
270 | __TBB_RETHROW(); |
271 | } |
272 | } |
273 | |
274 | template<typename T> |
275 | bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) { |
276 | k &= -concurrent_queue_rep_base::n_queue; |
277 | if( head_counter!=k ) spin_wait_until_eq( head_counter, k ); |
278 | call_itt_notify(acquired, &head_counter); |
279 | if( tail_counter==k ) spin_wait_while_eq( tail_counter, k ); |
280 | call_itt_notify(acquired, &tail_counter); |
281 | page& p = *head_page; |
282 | __TBB_ASSERT( &p, NULL ); |
283 | size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page ); |
284 | bool success = false; |
285 | { |
286 | micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? &p : NULL ); |
287 | if( p.mask & uintptr_t(1)<<index ) { |
288 | success = true; |
289 | assign_and_destroy_item( dst, p, index ); |
290 | } else { |
291 | --base.my_rep->n_invalid_entries; |
292 | } |
293 | } |
294 | return success; |
295 | } |
296 | |
297 | template<typename T> |
298 | micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base ) { |
299 | head_counter = src.head_counter; |
300 | tail_counter = src.tail_counter; |
301 | page_mutex = src.page_mutex; |
302 | |
303 | const page* srcp = src.head_page; |
304 | if( is_valid_page(srcp) ) { |
305 | ticket g_index = head_counter; |
306 | __TBB_TRY { |
307 | size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue; |
308 | size_t index = modulo_power_of_two( head_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page ); |
309 | size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page; |
310 | |
311 | head_page = make_copy( base, srcp, index, end_in_first_page, g_index ); |
312 | page* cur_page = head_page; |
313 | |
314 | if( srcp != src.tail_page ) { |
315 | for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) { |
316 | cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index ); |
317 | cur_page = cur_page->next; |
318 | } |
319 | |
320 | __TBB_ASSERT( srcp==src.tail_page, NULL ); |
321 | size_t last_index = modulo_power_of_two( tail_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page ); |
322 | if( last_index==0 ) last_index = base.my_rep->items_per_page; |
323 | |
324 | cur_page->next = make_copy( base, srcp, 0, last_index, g_index ); |
325 | cur_page = cur_page->next; |
326 | } |
327 | tail_page = cur_page; |
328 | } __TBB_CATCH (...) { |
329 | invalidate_page_and_rethrow( g_index ); |
330 | } |
331 | } else { |
332 | head_page = tail_page = NULL; |
333 | } |
334 | return *this; |
335 | } |
336 | |
337 | template<typename T> |
338 | void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) { |
339 | // Append an invalid page at address 1 so that no more pushes are allowed. |
340 | page* invalid_page = (page*)uintptr_t(1); |
341 | { |
342 | spin_mutex::scoped_lock lock( page_mutex ); |
343 | itt_store_word_with_release(tail_counter, k+concurrent_queue_rep_base::n_queue+1); |
344 | page* q = tail_page; |
345 | if( is_valid_page(q) ) |
346 | q->next = invalid_page; |
347 | else |
348 | head_page = invalid_page; |
349 | tail_page = invalid_page; |
350 | } |
351 | __TBB_RETHROW(); |
352 | } |
353 | |
354 | template<typename T> |
355 | concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base, const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page, ticket& g_index ) { |
356 | concurrent_queue_page_allocator& pa = base; |
357 | page* new_page = pa.allocate_page(); |
358 | new_page->next = NULL; |
359 | new_page->mask = src_page->mask; |
360 | for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index ) |
361 | if( new_page->mask & uintptr_t(1)<<begin_in_page ) |
362 | copy_item( *new_page, begin_in_page, *src_page, begin_in_page ); |
363 | return new_page; |
364 | } |
365 | |
366 | template<typename T> |
367 | class micro_queue_pop_finalizer: no_copy { |
368 | typedef concurrent_queue_rep_base::page page; |
369 | ticket my_ticket; |
370 | micro_queue<T>& my_queue; |
371 | page* my_page; |
372 | concurrent_queue_page_allocator& allocator; |
373 | public: |
374 | micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) : |
375 | my_ticket(k), my_queue(queue), my_page(p), allocator(b) |
376 | {} |
377 | ~micro_queue_pop_finalizer() ; |
378 | }; |
379 | |
380 | template<typename T> |
381 | micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() { |
382 | page* p = my_page; |
383 | if( is_valid_page(p) ) { |
384 | spin_mutex::scoped_lock lock( my_queue.page_mutex ); |
385 | page* q = p->next; |
386 | my_queue.head_page = q; |
387 | if( !is_valid_page(q) ) { |
388 | my_queue.tail_page = NULL; |
389 | } |
390 | } |
391 | itt_store_word_with_release(my_queue.head_counter, my_ticket); |
392 | if( is_valid_page(p) ) { |
393 | allocator.deallocate_page( p ); |
394 | } |
395 | } |
396 | |
397 | #if _MSC_VER && !defined(__INTEL_COMPILER) |
398 | #pragma warning( pop ) |
399 | #endif // warning 4146 is back |
400 | |
401 | template<typename T> class concurrent_queue_iterator_rep ; |
402 | template<typename T> class concurrent_queue_iterator_base_v3; |
403 | |
//! Representation of concurrent_queue_base
/**
 * Inherits from concurrent_queue_rep_base and adds an array of n_queue micro_queue<T> lanes.
 */
408 | template<typename T> |
409 | struct concurrent_queue_rep : public concurrent_queue_rep_base { |
410 | micro_queue<T> array[n_queue]; |
411 | |
412 | //! Map ticket to an array index |
413 | static size_t index( ticket k ) { |
414 | return k*phi%n_queue; |
415 | } |
416 | |
417 | micro_queue<T>& choose( ticket k ) { |
418 | // The formula here approximates LRU in a cache-oblivious way. |
419 | return array[index(k)]; |
420 | } |
421 | }; |
422 | |
423 | //! base class of concurrent_queue |
424 | /** |
425 | * The class implements the interface defined by concurrent_queue_page_allocator |
426 | * and has a pointer to an instance of concurrent_queue_rep. |
427 | */ |
428 | template<typename T> |
429 | class concurrent_queue_base_v3: public concurrent_queue_page_allocator { |
430 | //! Internal representation |
431 | concurrent_queue_rep<T>* my_rep; |
432 | |
433 | friend struct concurrent_queue_rep<T>; |
434 | friend class micro_queue<T>; |
435 | friend class concurrent_queue_iterator_rep<T>; |
436 | friend class concurrent_queue_iterator_base_v3<T>; |
437 | |
438 | protected: |
439 | typedef typename concurrent_queue_rep<T>::page page; |
440 | |
441 | private: |
442 | typedef typename micro_queue<T>::padded_page padded_page; |
443 | |
444 | /* override */ virtual page *allocate_page() { |
445 | concurrent_queue_rep<T>& r = *my_rep; |
446 | size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T); |
447 | return reinterpret_cast<page*>(allocate_block ( n )); |
448 | } |
449 | |
450 | /* override */ virtual void deallocate_page( concurrent_queue_rep_base::page *p ) { |
451 | concurrent_queue_rep<T>& r = *my_rep; |
452 | size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T); |
453 | deallocate_block( reinterpret_cast<void*>(p), n ); |
454 | } |
455 | |
456 | //! custom allocator |
457 | virtual void *allocate_block( size_t n ) = 0; |
458 | |
459 | //! custom de-allocator |
460 | virtual void deallocate_block( void *p, size_t n ) = 0; |
461 | |
462 | protected: |
463 | concurrent_queue_base_v3(); |
464 | |
465 | /* override */ virtual ~concurrent_queue_base_v3() { |
466 | #if TBB_USE_ASSERT |
467 | size_t nq = my_rep->n_queue; |
468 | for( size_t i=0; i<nq; i++ ) |
469 | __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" ); |
470 | #endif /* TBB_USE_ASSERT */ |
471 | cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1); |
472 | } |
473 | |
474 | //! Enqueue item at tail of queue |
475 | void internal_push( const void* src ) { |
476 | concurrent_queue_rep<T>& r = *my_rep; |
477 | ticket k = r.tail_counter++; |
478 | r.choose(k).push( src, k, *this ); |
479 | } |
480 | |
481 | //! Attempt to dequeue item from queue. |
/** Returns false if there was no item to dequeue. */
483 | bool internal_try_pop( void* dst ) ; |
484 | |
485 | //! Get size of queue; result may be invalid if queue is modified concurrently |
486 | size_t internal_size() const ; |
487 | |
488 | //! check if the queue is empty; thread safe |
489 | bool internal_empty() const ; |
490 | |
491 | //! free any remaining pages |
492 | /* note that the name may be misleading, but it remains so due to a historical accident. */ |
493 | void internal_finish_clear() ; |
494 | |
495 | //! Obsolete |
496 | void internal_throw_exception() const { |
497 | throw_exception( eid_bad_alloc ); |
498 | } |
499 | |
500 | //! copy internal representation |
501 | void assign( const concurrent_queue_base_v3& src ) ; |
502 | }; |
503 | |
504 | template<typename T> |
505 | concurrent_queue_base_v3<T>::concurrent_queue_base_v3() { |
506 | const size_t item_size = sizeof(T); |
507 | my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1); |
508 | __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" ); |
509 | __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" ); |
510 | __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" ); |
511 | __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" ); |
512 | memset(my_rep,0,sizeof(concurrent_queue_rep<T>)); |
513 | my_rep->item_size = item_size; |
514 | my_rep->items_per_page = item_size<= 8 ? 32 : |
515 | item_size<= 16 ? 16 : |
516 | item_size<= 32 ? 8 : |
517 | item_size<= 64 ? 4 : |
518 | item_size<=128 ? 2 : |
519 | 1; |
520 | } |
521 | |
522 | template<typename T> |
523 | bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) { |
524 | concurrent_queue_rep<T>& r = *my_rep; |
525 | ticket k; |
526 | do { |
527 | k = r.head_counter; |
528 | for(;;) { |
529 | if( (ptrdiff_t)(r.tail_counter-k)<=0 ) { |
530 | // Queue is empty |
531 | return false; |
532 | } |
533 | // Queue had item with ticket k when we looked. Attempt to get that item. |
534 | ticket tk=k; |
535 | #if defined(_MSC_VER) && defined(_Wp64) |
536 | #pragma warning (push) |
537 | #pragma warning (disable: 4267) |
538 | #endif |
539 | k = r.head_counter.compare_and_swap( tk+1, tk ); |
540 | #if defined(_MSC_VER) && defined(_Wp64) |
541 | #pragma warning (pop) |
542 | #endif |
543 | if( k==tk ) |
544 | break; |
545 | // Another thread snatched the item, retry. |
546 | } |
547 | } while( !r.choose( k ).pop( dst, k, *this ) ); |
548 | return true; |
549 | } |
550 | |
551 | template<typename T> |
552 | size_t concurrent_queue_base_v3<T>::internal_size() const { |
553 | concurrent_queue_rep<T>& r = *my_rep; |
554 | __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL ); |
555 | ticket hc = r.head_counter; |
556 | size_t nie = r.n_invalid_entries; |
557 | ticket tc = r.tail_counter; |
558 | __TBB_ASSERT( hc!=tc || !nie, NULL ); |
559 | ptrdiff_t sz = tc-hc-nie; |
560 | return sz<0 ? 0 : size_t(sz); |
561 | } |
562 | |
563 | template<typename T> |
564 | bool concurrent_queue_base_v3<T>::internal_empty() const { |
565 | concurrent_queue_rep<T>& r = *my_rep; |
566 | ticket tc = r.tail_counter; |
567 | ticket hc = r.head_counter; |
568 | // if tc!=r.tail_counter, the queue was not empty at some point between the two reads. |
569 | return tc==r.tail_counter && tc==hc+r.n_invalid_entries ; |
570 | } |
571 | |
572 | template<typename T> |
573 | void concurrent_queue_base_v3<T>::internal_finish_clear() { |
574 | concurrent_queue_rep<T>& r = *my_rep; |
575 | size_t nq = r.n_queue; |
576 | for( size_t i=0; i<nq; ++i ) { |
577 | page* tp = r.array[i].tail_page; |
578 | if( is_valid_page(tp) ) { |
579 | __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" ); |
580 | deallocate_page( tp ); |
581 | r.array[i].tail_page = NULL; |
582 | } else |
583 | __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" ); |
584 | } |
585 | } |
586 | |
587 | template<typename T> |
588 | void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src ) { |
589 | concurrent_queue_rep<T>& r = *my_rep; |
590 | r.items_per_page = src.my_rep->items_per_page; |
591 | |
592 | // copy concurrent_queue_rep. |
593 | r.head_counter = src.my_rep->head_counter; |
594 | r.tail_counter = src.my_rep->tail_counter; |
595 | r.n_invalid_entries = src.my_rep->n_invalid_entries; |
596 | |
597 | // copy micro_queues |
598 | for( size_t i = 0; i<r.n_queue; ++i ) |
599 | r.array[i].assign( src.my_rep->array[i], *this); |
600 | |
601 | __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter, |
602 | "the source concurrent queue should not be concurrently modified." ); |
603 | } |
604 | |
605 | template<typename Container, typename Value> class concurrent_queue_iterator; |
606 | |
607 | template<typename T> |
608 | class concurrent_queue_iterator_rep: no_assign { |
609 | typedef typename micro_queue<T>::padded_page padded_page; |
610 | public: |
611 | ticket head_counter; |
612 | const concurrent_queue_base_v3<T>& my_queue; |
613 | typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue]; |
614 | concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) : |
615 | head_counter(queue.my_rep->head_counter), |
616 | my_queue(queue) |
617 | { |
618 | for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k ) |
619 | array[k] = queue.my_rep->array[k].head_page; |
620 | } |
621 | |
622 | //! Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise. |
623 | bool get_item( T*& item, size_t k ) ; |
624 | }; |
625 | |
626 | template<typename T> |
627 | bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) { |
628 | if( k==my_queue.my_rep->tail_counter ) { |
629 | item = NULL; |
630 | return true; |
631 | } else { |
632 | typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)]; |
633 | __TBB_ASSERT(p,NULL); |
634 | size_t i = modulo_power_of_two( k/concurrent_queue_rep<T>::n_queue, my_queue.my_rep->items_per_page ); |
item = &micro_queue<T>::get_ref(*p,i);
636 | return (p->mask & uintptr_t(1)<<i)!=0; |
637 | } |
638 | } |
639 | |
640 | //! Constness-independent portion of concurrent_queue_iterator. |
641 | /** @ingroup containers */ |
642 | template<typename Value> |
643 | class concurrent_queue_iterator_base_v3 : no_assign { |
644 | //! Represents concurrent_queue over which we are iterating. |
645 | /** NULL if one past last element in queue. */ |
646 | concurrent_queue_iterator_rep<Value>* my_rep; |
647 | |
648 | template<typename C, typename T, typename U> |
649 | friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ); |
650 | |
651 | template<typename C, typename T, typename U> |
652 | friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ); |
653 | protected: |
654 | //! Pointer to current item |
655 | Value* my_item; |
656 | |
657 | //! Default constructor |
658 | concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) { |
659 | #if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN |
660 | __TBB_compiler_fence(); |
661 | #endif |
662 | } |
663 | |
664 | //! Copy constructor |
665 | concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) |
666 | : no_assign(), my_rep(NULL), my_item(NULL) { |
667 | assign(i); |
668 | } |
669 | |
670 | //! Construct iterator pointing to head of queue. |
671 | concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ; |
672 | |
673 | //! Assignment |
674 | void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ; |
675 | |
676 | //! Advance iterator one step towards tail of queue. |
677 | void advance() ; |
678 | |
679 | //! Destructor |
680 | ~concurrent_queue_iterator_base_v3() { |
681 | cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1); |
682 | my_rep = NULL; |
683 | } |
684 | }; |
685 | |
686 | template<typename Value> |
687 | concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) { |
688 | my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1); |
689 | new( my_rep ) concurrent_queue_iterator_rep<Value>(queue); |
690 | size_t k = my_rep->head_counter; |
691 | if( !my_rep->get_item(my_item, k) ) advance(); |
692 | } |
693 | |
694 | template<typename Value> |
695 | void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) { |
696 | if( my_rep!=other.my_rep ) { |
697 | if( my_rep ) { |
698 | cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1); |
699 | my_rep = NULL; |
700 | } |
701 | if( other.my_rep ) { |
702 | my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1); |
703 | new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep ); |
704 | } |
705 | } |
706 | my_item = other.my_item; |
707 | } |
708 | |
709 | template<typename Value> |
710 | void concurrent_queue_iterator_base_v3<Value>::advance() { |
711 | __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" ); |
712 | size_t k = my_rep->head_counter; |
713 | const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue; |
714 | #if TBB_USE_ASSERT |
715 | Value* tmp; |
716 | my_rep->get_item(tmp,k); |
717 | __TBB_ASSERT( my_item==tmp, NULL ); |
718 | #endif /* TBB_USE_ASSERT */ |
719 | size_t i = modulo_power_of_two( k/concurrent_queue_rep<Value>::n_queue, queue.my_rep->items_per_page ); |
720 | if( i==queue.my_rep->items_per_page-1 ) { |
721 | typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)]; |
722 | root = root->next; |
723 | } |
724 | // advance k |
725 | my_rep->head_counter = ++k; |
726 | if( !my_rep->get_item(my_item, k) ) advance(); |
727 | } |
728 | |
729 | //! Similar to C++0x std::remove_cv |
730 | /** "tbb_" prefix added to avoid overload confusion with C++0x implementations. */ |
731 | template<typename T> struct tbb_remove_cv {typedef T type;}; |
732 | template<typename T> struct tbb_remove_cv<const T> {typedef T type;}; |
733 | template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;}; |
734 | template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;}; |
735 | |
736 | //! Meets requirements of a forward iterator for STL. |
737 | /** Value is either the T or const T type of the container. |
738 | @ingroup containers */ |
739 | template<typename Container, typename Value> |
740 | class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>, |
741 | public std::iterator<std::forward_iterator_tag,Value> { |
742 | #if !__TBB_TEMPLATE_FRIENDS_BROKEN |
743 | template<typename T, class A> |
744 | friend class ::tbb::strict_ppl::concurrent_queue; |
745 | #else |
746 | public: // workaround for MSVC |
747 | #endif |
748 | //! Construct iterator pointing to head of queue. |
749 | concurrent_queue_iterator( const concurrent_queue_base_v3<Value>& queue ) : |
750 | concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue) |
751 | { |
752 | } |
753 | |
754 | public: |
755 | concurrent_queue_iterator() {} |
756 | |
757 | concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) : |
758 | concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other) |
759 | {} |
760 | |
761 | //! Iterator assignment |
762 | concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) { |
763 | this->assign(other); |
764 | return *this; |
765 | } |
766 | |
767 | //! Reference to current item |
768 | Value& operator*() const { |
769 | return *static_cast<Value*>(this->my_item); |
770 | } |
771 | |
772 | Value* operator->() const {return &operator*();} |
773 | |
774 | //! Advance to next item in queue |
775 | concurrent_queue_iterator& operator++() { |
776 | this->advance(); |
777 | return *this; |
778 | } |
779 | |
780 | //! Post increment |
781 | Value* operator++(int) { |
782 | Value* result = &operator*(); |
783 | operator++(); |
784 | return result; |
785 | } |
786 | }; // concurrent_queue_iterator |
787 | |
788 | |
789 | template<typename C, typename T, typename U> |
790 | bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) { |
791 | return i.my_item==j.my_item; |
792 | } |
793 | |
794 | template<typename C, typename T, typename U> |
795 | bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) { |
796 | return i.my_item!=j.my_item; |
797 | } |
798 | |
799 | } // namespace internal |
800 | |
801 | //! @endcond |
802 | |
803 | } // namespace strict_ppl |
804 | |
805 | //! @cond INTERNAL |
806 | namespace internal { |
807 | |
808 | class concurrent_queue_rep; |
809 | class concurrent_queue_iterator_rep; |
810 | class concurrent_queue_iterator_base_v3; |
811 | template<typename Container, typename Value> class concurrent_queue_iterator; |
812 | |
813 | //! For internal use only. |
814 | /** Type-independent portion of concurrent_queue. |
815 | @ingroup containers */ |
816 | class concurrent_queue_base_v3: no_copy { |
817 | //! Internal representation |
818 | concurrent_queue_rep* my_rep; |
819 | |
820 | friend class concurrent_queue_rep; |
821 | friend struct micro_queue; |
822 | friend class micro_queue_pop_finalizer; |
823 | friend class concurrent_queue_iterator_rep; |
824 | friend class concurrent_queue_iterator_base_v3; |
825 | protected: |
826 | //! Prefix on a page |
827 | struct page { |
828 | page* next; |
829 | uintptr_t mask; |
830 | }; |
831 | |
832 | //! Capacity of the queue |
833 | ptrdiff_t my_capacity; |
834 | |
835 | //! Always a power of 2 |
836 | size_t items_per_page; |
837 | |
838 | //! Size of an item |
839 | size_t item_size; |
840 | |
841 | #if __TBB_PROTECTED_NESTED_CLASS_BROKEN |
842 | public: |
843 | #endif |
844 | template<typename T> |
845 | struct padded_page: page { |
846 | //! Not defined anywhere - exists to quiet warnings. |
847 | padded_page(); |
848 | //! Not defined anywhere - exists to quiet warnings. |
849 | void operator=( const padded_page& ); |
850 | //! Must be last field. |
851 | T last; |
852 | }; |
853 | |
854 | private: |
855 | virtual void copy_item( page& dst, size_t index, const void* src ) = 0; |
856 | virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0; |
857 | protected: |
858 | __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size ); |
859 | virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3(); |
860 | |
861 | //! Enqueue item at tail of queue |
862 | void __TBB_EXPORTED_METHOD internal_push( const void* src ); |
863 | |
864 | //! Dequeue item from head of queue |
865 | void __TBB_EXPORTED_METHOD internal_pop( void* dst ); |
866 | |
867 | //! Abort all pending queue operations |
868 | void __TBB_EXPORTED_METHOD internal_abort(); |
869 | |
870 | //! Attempt to enqueue item onto queue. |
871 | bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src ); |
872 | |
873 | //! Attempt to dequeue item from queue. |
/** Returns false if there was no item to dequeue. */
875 | bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst ); |
876 | |
877 | //! Get size of queue |
878 | ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const; |
879 | |
//! Check if the queue is empty
881 | bool __TBB_EXPORTED_METHOD internal_empty() const; |
882 | |
883 | //! Set the queue capacity |
884 | void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size ); |
885 | |
886 | //! custom allocator |
887 | virtual page *allocate_page() = 0; |
888 | |
889 | //! custom de-allocator |
890 | virtual void deallocate_page( page *p ) = 0; |
891 | |
892 | //! free any remaining pages |
893 | /* note that the name may be misleading, but it remains so due to a historical accident. */ |
894 | void __TBB_EXPORTED_METHOD internal_finish_clear() ; |
895 | |
896 | //! throw an exception |
897 | void __TBB_EXPORTED_METHOD internal_throw_exception() const; |
898 | |
899 | //! copy internal representation |
900 | void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ; |
901 | |
902 | private: |
903 | virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0; |
904 | }; |
905 | |
906 | //! Type-independent portion of concurrent_queue_iterator. |
907 | /** @ingroup containers */ |
908 | class concurrent_queue_iterator_base_v3 { |
909 | //! concurrent_queue over which we are iterating. |
910 | /** NULL if one past last element in queue. */ |
911 | concurrent_queue_iterator_rep* my_rep; |
912 | |
913 | template<typename C, typename T, typename U> |
914 | friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ); |
915 | |
916 | template<typename C, typename T, typename U> |
917 | friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ); |
918 | |
919 | void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data ); |
920 | protected: |
921 | //! Pointer to current item |
922 | void* my_item; |
923 | |
924 | //! Default constructor |
925 | concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {} |
926 | |
927 | //! Copy constructor |
928 | concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) { |
929 | assign(i); |
930 | } |
931 | |
932 | //! Obsolete entry point for constructing iterator pointing to head of queue. |
933 | /** Does not work correctly for SSE types. */ |
934 | __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue ); |
935 | |
936 | //! Construct iterator pointing to head of queue. |
937 | __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data ); |
938 | |
939 | //! Assignment |
940 | void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i ); |
941 | |
942 | //! Advance iterator one step towards tail of queue. |
943 | void __TBB_EXPORTED_METHOD advance(); |
944 | |
945 | //! Destructor |
946 | __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3(); |
947 | }; |
948 | |
949 | typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base; |
950 | |
951 | //! Meets requirements of a forward iterator for STL. |
952 | /** Value is either the T or const T type of the container. |
953 | @ingroup containers */ |
954 | template<typename Container, typename Value> |
955 | class concurrent_queue_iterator: public concurrent_queue_iterator_base, |
956 | public std::iterator<std::forward_iterator_tag,Value> { |
957 | |
958 | #if !defined(_MSC_VER) || defined(__INTEL_COMPILER) |
959 | template<typename T, class A> |
960 | friend class ::tbb::concurrent_bounded_queue; |
961 | |
962 | template<typename T, class A> |
963 | friend class ::tbb::deprecated::concurrent_queue; |
964 | #else |
965 | public: // workaround for MSVC |
966 | #endif |
967 | //! Construct iterator pointing to head of queue. |
968 | concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) : |
969 | concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last)) |
970 | { |
971 | } |
972 | |
973 | public: |
974 | concurrent_queue_iterator() {} |
975 | |
976 | /** If Value==Container::value_type, then this routine is the copy constructor. |
977 | If Value==const Container::value_type, then this routine is a conversion constructor. */ |
978 | concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) : |
979 | concurrent_queue_iterator_base_v3(other) |
980 | {} |
981 | |
982 | //! Iterator assignment |
983 | concurrent_queue_iterator& operator=( const concurrent_queue_iterator& other ) { |
984 | assign(other); |
985 | return *this; |
986 | } |
987 | |
988 | //! Reference to current item |
989 | Value& operator*() const { |
990 | return *static_cast<Value*>(my_item); |
991 | } |
992 | |
993 | Value* operator->() const {return &operator*();} |
994 | |
995 | //! Advance to next item in queue |
996 | concurrent_queue_iterator& operator++() { |
997 | advance(); |
998 | return *this; |
999 | } |
1000 | |
1001 | //! Post increment |
1002 | Value* operator++(int) { |
1003 | Value* result = &operator*(); |
1004 | operator++(); |
1005 | return result; |
1006 | } |
1007 | }; // concurrent_queue_iterator |
1008 | |
1009 | |
1010 | template<typename C, typename T, typename U> |
1011 | bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) { |
1012 | return i.my_item==j.my_item; |
1013 | } |
1014 | |
1015 | template<typename C, typename T, typename U> |
1016 | bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) { |
1017 | return i.my_item!=j.my_item; |
1018 | } |
1019 | |
} // namespace internal
1021 | |
1022 | //! @endcond |
1023 | |
1024 | } // namespace tbb |
1025 | |
1026 | #endif /* __TBB__concurrent_queue_impl_H */ |
1027 | |