1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#include "tbb/tbb_stddef.h"
18#include "tbb/tbb_machine.h"
19#include "tbb/tbb_exception.h"
20// Define required to satisfy test in internal file.
21#define __TBB_concurrent_queue_H
22#include "tbb/internal/_concurrent_queue_impl.h"
23#include "concurrent_monitor.h"
24#include "itt_notify.h"
25#include <new>
26#include <cstring> // for memset()
27
28#if defined(_MSC_VER) && defined(_Wp64)
29 // Workaround for overzealous compiler warnings in /Wp64 mode
30 #pragma warning (disable: 4267)
31#endif
32
33#define RECORD_EVENTS 0
34
35
36namespace tbb {
37
38namespace internal {
39
40typedef concurrent_queue_base_v3 concurrent_queue_base;
41
42typedef size_t ticket;
43
44//! A queue using simple locking.
45/** For efficiency, this class has no constructor.
46 The caller is expected to zero-initialize it. */
47struct micro_queue {
48 typedef concurrent_queue_base::page page;
49
50 friend class micro_queue_pop_finalizer;
51
52 atomic<page*> head_page;
53 atomic<ticket> head_counter;
54
55 atomic<page*> tail_page;
56 atomic<ticket> tail_counter;
57
58 spin_mutex page_mutex;
59
60 void push( const void* item, ticket k, concurrent_queue_base& base,
61 concurrent_queue_base::copy_specifics op_type );
62
63 void abort_push( ticket k, concurrent_queue_base& base );
64
65 bool pop( void* dst, ticket k, concurrent_queue_base& base );
66
67 micro_queue& assign( const micro_queue& src, concurrent_queue_base& base,
68 concurrent_queue_base::copy_specifics op_type );
69
70 page* make_copy ( concurrent_queue_base& base, const page* src_page, size_t begin_in_page,
71 size_t end_in_page, ticket& g_index, concurrent_queue_base::copy_specifics op_type ) ;
72
73 void make_invalid( ticket k );
74};
75
76// we need to yank it out of micro_queue because of concurrent_queue_base::deallocate_page being virtual.
77class micro_queue_pop_finalizer: no_copy {
78 typedef concurrent_queue_base::page page;
79 ticket my_ticket;
80 micro_queue& my_queue;
81 page* my_page;
82 concurrent_queue_base &base;
83public:
84 micro_queue_pop_finalizer( micro_queue& queue, concurrent_queue_base& b, ticket k, page* p ) :
85 my_ticket(k), my_queue(queue), my_page(p), base(b)
86 {}
87 ~micro_queue_pop_finalizer() {
88 page* p = my_page;
89 if( p ) {
90 spin_mutex::scoped_lock lock( my_queue.page_mutex );
91 page* q = p->next;
92 my_queue.head_page = q;
93 if( !q ) {
94 my_queue.tail_page = NULL;
95 }
96 }
97 my_queue.head_counter = my_ticket;
98 if( p )
99 base.deallocate_page( p );
100 }
101};
102
103struct predicate_leq {
104 ticket t;
105 predicate_leq( ticket t_ ) : t(t_) {}
106 bool operator() ( uintptr_t p ) const {return (ticket)p<=t;}
107};
108
109//! Internal representation of a ConcurrentQueue.
110/** For efficiency, this class has no constructor.
111 The caller is expected to zero-initialize it. */
112class concurrent_queue_rep {
113public:
114private:
115 friend struct micro_queue;
116
117 //! Approximately n_queue/golden ratio
118 static const size_t phi = 3;
119
120public:
121 //! Must be power of 2
122 static const size_t n_queue = 8;
123
124 //! Map ticket to an array index
125 static size_t index( ticket k ) {
126 return k*phi%n_queue;
127 }
128
129 atomic<ticket> head_counter;
130 concurrent_monitor items_avail;
131 atomic<size_t> n_invalid_entries;
132 char pad1[NFS_MaxLineSize-((sizeof(atomic<ticket>)+sizeof(concurrent_monitor)+sizeof(atomic<size_t>))&(NFS_MaxLineSize-1))];
133
134 atomic<ticket> tail_counter;
135 concurrent_monitor slots_avail;
136 char pad2[NFS_MaxLineSize-((sizeof(atomic<ticket>)+sizeof(concurrent_monitor))&(NFS_MaxLineSize-1))];
137 micro_queue array[n_queue];
138
139 micro_queue& choose( ticket k ) {
140 // The formula here approximates LRU in a cache-oblivious way.
141 return array[index(k)];
142 }
143
144 atomic<unsigned> abort_counter;
145
146 //! Value for effective_capacity that denotes unbounded queue.
147 static const ptrdiff_t infinite_capacity = ptrdiff_t(~size_t(0)/2);
148};
149
150#if _MSC_VER && !defined(__INTEL_COMPILER)
151 // unary minus operator applied to unsigned type, result still unsigned
152 #pragma warning( push )
153 #pragma warning( disable: 4146 )
154#endif
155
//! Address of the sentinel page that micro_queue::make_invalid() links in; NULL until it first runs.
/** internal_finish_clear() compares against it so that the sentinel is never deallocated. */
static void* static_invalid_page;
157
158//------------------------------------------------------------------------
159// micro_queue
160//------------------------------------------------------------------------
161void micro_queue::push( const void* item, ticket k, concurrent_queue_base& base,
162 concurrent_queue_base::copy_specifics op_type ) {
163 k &= -concurrent_queue_rep::n_queue;
164 page* p = NULL;
165 // find index on page where we would put the data
166 size_t index = modulo_power_of_two( k/concurrent_queue_rep::n_queue, base.items_per_page );
167 if( !index ) { // make a new page
168 __TBB_TRY {
169 p = base.allocate_page();
170 } __TBB_CATCH(...) {
171 ++base.my_rep->n_invalid_entries;
172 make_invalid( k );
173 __TBB_RETHROW();
174 }
175 p->mask = 0;
176 p->next = NULL;
177 }
178
179 // wait for my turn
180 if( tail_counter!=k ) // The developer insisted on keeping first check out of the backoff loop
181 for( atomic_backoff b(true);;b.pause() ) {
182 ticket tail = tail_counter;
183 if( tail==k ) break;
184 else if( tail&0x1 ) {
185 // no memory. throws an exception; assumes concurrent_queue_rep::n_queue>1
186 ++base.my_rep->n_invalid_entries;
187 throw_exception( eid_bad_last_alloc );
188 }
189 }
190
191 if( p ) { // page is newly allocated; insert in micro_queue
192 spin_mutex::scoped_lock lock( page_mutex );
193 if( page* q = tail_page )
194 q->next = p;
195 else
196 head_page = p;
197 tail_page = p;
198 }
199
200 if (item) {
201 p = tail_page;
202 ITT_NOTIFY( sync_acquired, p );
203 __TBB_TRY {
204 if( concurrent_queue_base::copy == op_type ) {
205 base.copy_item( *p, index, item );
206 } else {
207 __TBB_ASSERT( concurrent_queue_base::move == op_type, NULL );
208 static_cast<concurrent_queue_base_v8&>(base).move_item( *p, index, item );
209 }
210 } __TBB_CATCH(...) {
211 ++base.my_rep->n_invalid_entries;
212 tail_counter += concurrent_queue_rep::n_queue;
213 __TBB_RETHROW();
214 }
215 ITT_NOTIFY( sync_releasing, p );
216 // If no exception was thrown, mark item as present.
217 p->mask |= uintptr_t(1)<<index;
218 }
219 else // no item; this was called from abort_push
220 ++base.my_rep->n_invalid_entries;
221
222 tail_counter += concurrent_queue_rep::n_queue;
223}
224
225
226void micro_queue::abort_push( ticket k, concurrent_queue_base& base ) {
227 push(NULL, k, base, concurrent_queue_base::copy);
228}
229
230bool micro_queue::pop( void* dst, ticket k, concurrent_queue_base& base ) {
231 k &= -concurrent_queue_rep::n_queue;
232 spin_wait_until_eq( head_counter, k );
233 spin_wait_while_eq( tail_counter, k );
234 page *p = head_page;
235 __TBB_ASSERT( p, NULL );
236 size_t index = modulo_power_of_two( k/concurrent_queue_rep::n_queue, base.items_per_page );
237 bool success = false;
238 {
239 micro_queue_pop_finalizer finalizer( *this, base, k+concurrent_queue_rep::n_queue, index==base.items_per_page-1 ? p : NULL );
240 if( p->mask & uintptr_t(1)<<index ) {
241 success = true;
242 ITT_NOTIFY( sync_acquired, dst );
243 ITT_NOTIFY( sync_acquired, head_page );
244 base.assign_and_destroy_item( dst, *p, index );
245 ITT_NOTIFY( sync_releasing, head_page );
246 } else {
247 --base.my_rep->n_invalid_entries;
248 }
249 }
250 return success;
251}
252
253micro_queue& micro_queue::assign( const micro_queue& src, concurrent_queue_base& base,
254 concurrent_queue_base::copy_specifics op_type )
255{
256 head_counter = src.head_counter;
257 tail_counter = src.tail_counter;
258
259 const page* srcp = src.head_page;
260 if( srcp ) {
261 ticket g_index = head_counter;
262 __TBB_TRY {
263 size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep::n_queue;
264 size_t index = modulo_power_of_two( head_counter/concurrent_queue_rep::n_queue, base.items_per_page );
265 size_t end_in_first_page = (index+n_items<base.items_per_page)?(index+n_items):base.items_per_page;
266
267 head_page = make_copy( base, srcp, index, end_in_first_page, g_index, op_type );
268 page* cur_page = head_page;
269
270 if( srcp != src.tail_page ) {
271 for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
272 cur_page->next = make_copy( base, srcp, 0, base.items_per_page, g_index, op_type );
273 cur_page = cur_page->next;
274 }
275
276 __TBB_ASSERT( srcp==src.tail_page, NULL );
277
278 size_t last_index = modulo_power_of_two( tail_counter/concurrent_queue_rep::n_queue, base.items_per_page );
279 if( last_index==0 ) last_index = base.items_per_page;
280
281 cur_page->next = make_copy( base, srcp, 0, last_index, g_index, op_type );
282 cur_page = cur_page->next;
283 }
284 tail_page = cur_page;
285 } __TBB_CATCH(...) {
286 make_invalid( g_index );
287 __TBB_RETHROW();
288 }
289 } else {
290 head_page = tail_page = NULL;
291 }
292 return *this;
293}
294
295concurrent_queue_base::page* micro_queue::make_copy( concurrent_queue_base& base,
296 const concurrent_queue_base::page* src_page, size_t begin_in_page, size_t end_in_page,
297 ticket& g_index, concurrent_queue_base::copy_specifics op_type )
298{
299 page* new_page = base.allocate_page();
300 new_page->next = NULL;
301 new_page->mask = src_page->mask;
302 for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
303 if( new_page->mask & uintptr_t(1)<<begin_in_page ) {
304 if( concurrent_queue_base::copy == op_type ) {
305 base.copy_page_item( *new_page, begin_in_page, *src_page, begin_in_page );
306 } else {
307 __TBB_ASSERT( concurrent_queue_base::move == op_type, NULL );
308 static_cast<concurrent_queue_base_v8&>(base).move_page_item( *new_page, begin_in_page, *src_page, begin_in_page );
309 }
310 }
311 return new_page;
312}
313
314void micro_queue::make_invalid( ticket k )
315{
316 static concurrent_queue_base::page dummy = {static_cast<page*>((void*)1), 0};
317 // mark it so that no more pushes are allowed.
318 static_invalid_page = &dummy;
319 {
320 spin_mutex::scoped_lock lock( page_mutex );
321 tail_counter = k+concurrent_queue_rep::n_queue+1;
322 if( page* q = tail_page )
323 q->next = static_cast<page*>(static_invalid_page);
324 else
325 head_page = static_cast<page*>(static_invalid_page);
326 tail_page = static_cast<page*>(static_invalid_page);
327 }
328}
329
330#if _MSC_VER && !defined(__INTEL_COMPILER)
331 #pragma warning( pop )
332#endif // warning 4146 is back
333
334//------------------------------------------------------------------------
335// concurrent_queue_base
336//------------------------------------------------------------------------
337concurrent_queue_base_v3::concurrent_queue_base_v3( size_t item_sz ) {
338 items_per_page = item_sz<= 8 ? 32 :
339 item_sz<= 16 ? 16 :
340 item_sz<= 32 ? 8 :
341 item_sz<= 64 ? 4 :
342 item_sz<=128 ? 2 :
343 1;
344 my_capacity = size_t(-1)/(item_sz>1 ? item_sz : 2);
345 my_rep = cache_aligned_allocator<concurrent_queue_rep>().allocate(1);
346 __TBB_ASSERT( is_aligned(my_rep, NFS_GetLineSize()), "alignment error" );
347 __TBB_ASSERT( is_aligned(&my_rep->head_counter, NFS_GetLineSize()), "alignment error" );
348 __TBB_ASSERT( is_aligned(&my_rep->tail_counter, NFS_GetLineSize()), "alignment error" );
349 __TBB_ASSERT( is_aligned(&my_rep->array, NFS_GetLineSize()), "alignment error" );
350 std::memset(static_cast<void*>(my_rep),0,sizeof(concurrent_queue_rep));
351 new ( &my_rep->items_avail ) concurrent_monitor();
352 new ( &my_rep->slots_avail ) concurrent_monitor();
353 this->item_size = item_sz;
354}
355
356concurrent_queue_base_v3::~concurrent_queue_base_v3() {
357 size_t nq = my_rep->n_queue;
358 for( size_t i=0; i<nq; i++ )
359 __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
360 cache_aligned_allocator<concurrent_queue_rep>().deallocate(my_rep,1);
361}
362
363void concurrent_queue_base_v3::internal_push( const void* src ) {
364 internal_insert_item( src, copy );
365}
366
367void concurrent_queue_base_v8::internal_push_move( const void* src ) {
368 internal_insert_item( src, move );
369}
370
371void concurrent_queue_base_v3::internal_insert_item( const void* src, copy_specifics op_type ) {
372 concurrent_queue_rep& r = *my_rep;
373 unsigned old_abort_counter = r.abort_counter;
374 ticket k = r.tail_counter++;
375 ptrdiff_t e = my_capacity;
376#if DO_ITT_NOTIFY
377 bool sync_prepare_done = false;
378#endif
379 if( (ptrdiff_t)(k-r.head_counter)>=e ) { // queue is full
380#if DO_ITT_NOTIFY
381 if( !sync_prepare_done ) {
382 ITT_NOTIFY( sync_prepare, &sync_prepare_done );
383 sync_prepare_done = true;
384 }
385#endif
386 bool slept = false;
387 concurrent_monitor::thread_context thr_ctx;
388 r.slots_avail.prepare_wait( thr_ctx, ((ptrdiff_t)(k-e)) );
389 while( (ptrdiff_t)(k-r.head_counter)>=const_cast<volatile ptrdiff_t&>(e = my_capacity) ) {
390 __TBB_TRY {
391 if( r.abort_counter!=old_abort_counter ) {
392 r.slots_avail.cancel_wait( thr_ctx );
393 throw_exception( eid_user_abort );
394 }
395 slept = r.slots_avail.commit_wait( thr_ctx );
396 } __TBB_CATCH( tbb::user_abort& ) {
397 r.choose(k).abort_push(k, *this);
398 __TBB_RETHROW();
399 } __TBB_CATCH(...) {
400 __TBB_RETHROW();
401 }
402 if (slept == true) break;
403 r.slots_avail.prepare_wait( thr_ctx, ((ptrdiff_t)(k-e)) );
404 }
405 if( !slept )
406 r.slots_avail.cancel_wait( thr_ctx );
407 }
408 ITT_NOTIFY( sync_acquired, &sync_prepare_done );
409 __TBB_ASSERT( (ptrdiff_t)(k-r.head_counter)<my_capacity, NULL);
410 r.choose( k ).push( src, k, *this, op_type );
411 r.items_avail.notify( predicate_leq(k) );
412}
413
414void concurrent_queue_base_v3::internal_pop( void* dst ) {
415 concurrent_queue_rep& r = *my_rep;
416 ticket k;
417#if DO_ITT_NOTIFY
418 bool sync_prepare_done = false;
419#endif
420 unsigned old_abort_counter = r.abort_counter;
421 // This loop is a single pop operation; abort_counter should not be re-read inside
422 do {
423 k=r.head_counter++;
424 if ( (ptrdiff_t)(r.tail_counter-k)<=0 ) { // queue is empty
425#if DO_ITT_NOTIFY
426 if( !sync_prepare_done ) {
427 ITT_NOTIFY( sync_prepare, dst );
428 sync_prepare_done = true;
429 }
430#endif
431 bool slept = false;
432 concurrent_monitor::thread_context thr_ctx;
433 r.items_avail.prepare_wait( thr_ctx, k );
434 while( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
435 __TBB_TRY {
436 if( r.abort_counter!=old_abort_counter ) {
437 r.items_avail.cancel_wait( thr_ctx );
438 throw_exception( eid_user_abort );
439 }
440 slept = r.items_avail.commit_wait( thr_ctx );
441 } __TBB_CATCH( tbb::user_abort& ) {
442 r.head_counter--;
443 __TBB_RETHROW();
444 } __TBB_CATCH(...) {
445 __TBB_RETHROW();
446 }
447 if (slept == true) break;
448 r.items_avail.prepare_wait( thr_ctx, k );
449 }
450 if( !slept )
451 r.items_avail.cancel_wait( thr_ctx );
452 }
453 __TBB_ASSERT((ptrdiff_t)(r.tail_counter-k)>0, NULL);
454 } while( !r.choose(k).pop(dst,k,*this) );
455
456 // wake up a producer..
457 r.slots_avail.notify( predicate_leq(k) );
458}
459
460void concurrent_queue_base_v3::internal_abort() {
461 concurrent_queue_rep& r = *my_rep;
462 ++r.abort_counter;
463 r.items_avail.abort_all();
464 r.slots_avail.abort_all();
465}
466
467bool concurrent_queue_base_v3::internal_pop_if_present( void* dst ) {
468 concurrent_queue_rep& r = *my_rep;
469 ticket k;
470 do {
471 k = r.head_counter;
472 for(;;) {
473 if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
474 // Queue is empty
475 return false;
476 }
477 // Queue had item with ticket k when we looked. Attempt to get that item.
478 ticket tk=k;
479 k = r.head_counter.compare_and_swap( tk+1, tk );
480 if( k==tk )
481 break;
482 // Another thread snatched the item, retry.
483 }
484 } while( !r.choose( k ).pop( dst, k, *this ) );
485
486 r.slots_avail.notify( predicate_leq(k) );
487
488 return true;
489}
490
491bool concurrent_queue_base_v3::internal_push_if_not_full( const void* src ) {
492 return internal_insert_if_not_full( src, copy );
493}
494
495bool concurrent_queue_base_v8::internal_push_move_if_not_full( const void* src ) {
496 return internal_insert_if_not_full( src, move );
497}
498
499bool concurrent_queue_base_v3::internal_insert_if_not_full( const void* src, copy_specifics op_type ) {
500 concurrent_queue_rep& r = *my_rep;
501 ticket k = r.tail_counter;
502 for(;;) {
503 if( (ptrdiff_t)(k-r.head_counter)>=my_capacity ) {
504 // Queue is full
505 return false;
506 }
507 // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
508 ticket tk=k;
509 k = r.tail_counter.compare_and_swap( tk+1, tk );
510 if( k==tk )
511 break;
512 // Another thread claimed the slot, so retry.
513 }
514 r.choose(k).push(src, k, *this, op_type);
515 r.items_avail.notify( predicate_leq(k) );
516 return true;
517}
518
519ptrdiff_t concurrent_queue_base_v3::internal_size() const {
520 __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
521 return ptrdiff_t(my_rep->tail_counter-my_rep->head_counter-my_rep->n_invalid_entries);
522}
523
524bool concurrent_queue_base_v3::internal_empty() const {
525 ticket tc = my_rep->tail_counter;
526 ticket hc = my_rep->head_counter;
527 // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
528 return ( tc==my_rep->tail_counter && ptrdiff_t(tc-hc-my_rep->n_invalid_entries)<=0 );
529}
530
531void concurrent_queue_base_v3::internal_set_capacity( ptrdiff_t capacity, size_t /*item_sz*/ ) {
532 my_capacity = capacity<0 ? concurrent_queue_rep::infinite_capacity : capacity;
533}
534
535void concurrent_queue_base_v3::internal_finish_clear() {
536 size_t nq = my_rep->n_queue;
537 for( size_t i=0; i<nq; ++i ) {
538 page* tp = my_rep->array[i].tail_page;
539 __TBB_ASSERT( my_rep->array[i].head_page==tp, "at most one page should remain" );
540 if( tp!=NULL) {
541 if( tp!=static_invalid_page ) deallocate_page( tp );
542 my_rep->array[i].tail_page = NULL;
543 }
544 }
545}
546
547void concurrent_queue_base_v3::internal_throw_exception() const {
548 throw_exception( eid_bad_alloc );
549}
550
551void concurrent_queue_base_v3::internal_assign( const concurrent_queue_base& src, copy_specifics op_type ) {
552 items_per_page = src.items_per_page;
553 my_capacity = src.my_capacity;
554
555 // copy concurrent_queue_rep.
556 my_rep->head_counter = src.my_rep->head_counter;
557 my_rep->tail_counter = src.my_rep->tail_counter;
558 my_rep->n_invalid_entries = src.my_rep->n_invalid_entries;
559 my_rep->abort_counter = src.my_rep->abort_counter;
560
561 // copy micro_queues
562 for( size_t i = 0; i<my_rep->n_queue; ++i )
563 my_rep->array[i].assign( src.my_rep->array[i], *this, op_type );
564
565 __TBB_ASSERT( my_rep->head_counter==src.my_rep->head_counter && my_rep->tail_counter==src.my_rep->tail_counter,
566 "the source concurrent queue should not be concurrently modified." );
567}
568
569void concurrent_queue_base_v3::assign( const concurrent_queue_base& src ) {
570 internal_assign( src, copy );
571}
572
573void concurrent_queue_base_v8::move_content( concurrent_queue_base_v8& src ) {
574 internal_assign( src, move );
575}
576
577//------------------------------------------------------------------------
578// concurrent_queue_iterator_rep
579//------------------------------------------------------------------------
580class concurrent_queue_iterator_rep: no_assign {
581public:
582 ticket head_counter;
583 const concurrent_queue_base& my_queue;
584 const size_t offset_of_last;
585 concurrent_queue_base::page* array[concurrent_queue_rep::n_queue];
586 concurrent_queue_iterator_rep( const concurrent_queue_base& queue, size_t offset_of_last_ ) :
587 head_counter(queue.my_rep->head_counter),
588 my_queue(queue),
589 offset_of_last(offset_of_last_)
590 {
591 const concurrent_queue_rep& rep = *queue.my_rep;
592 for( size_t k=0; k<concurrent_queue_rep::n_queue; ++k )
593 array[k] = rep.array[k].head_page;
594 }
595 //! Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
596 bool get_item( void*& item, size_t k ) {
597 if( k==my_queue.my_rep->tail_counter ) {
598 item = NULL;
599 return true;
600 } else {
601 concurrent_queue_base::page* p = array[concurrent_queue_rep::index(k)];
602 __TBB_ASSERT(p,NULL);
603 size_t i = modulo_power_of_two( k/concurrent_queue_rep::n_queue, my_queue.items_per_page );
604 item = static_cast<unsigned char*>(static_cast<void*>(p)) + offset_of_last + my_queue.item_size*i;
605 return (p->mask & uintptr_t(1)<<i)!=0;
606 }
607 }
608};
609
610//------------------------------------------------------------------------
611// concurrent_queue_iterator_base
612//------------------------------------------------------------------------
613
614void concurrent_queue_iterator_base_v3::initialize( const concurrent_queue_base& queue, size_t offset_of_last ) {
615 my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep>().allocate(1);
616 new( my_rep ) concurrent_queue_iterator_rep(queue,offset_of_last);
617 size_t k = my_rep->head_counter;
618 if( !my_rep->get_item(my_item, k) ) advance();
619}
620
621concurrent_queue_iterator_base_v3::concurrent_queue_iterator_base_v3( const concurrent_queue_base& queue ) {
622 initialize(queue,0);
623}
624
625concurrent_queue_iterator_base_v3::concurrent_queue_iterator_base_v3( const concurrent_queue_base& queue, size_t offset_of_last ) {
626 initialize(queue,offset_of_last);
627}
628
629void concurrent_queue_iterator_base_v3::assign( const concurrent_queue_iterator_base& other ) {
630 if( my_rep!=other.my_rep ) {
631 if( my_rep ) {
632 cache_aligned_allocator<concurrent_queue_iterator_rep>().deallocate(my_rep, 1);
633 my_rep = NULL;
634 }
635 if( other.my_rep ) {
636 my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep>().allocate(1);
637 new( my_rep ) concurrent_queue_iterator_rep( *other.my_rep );
638 }
639 }
640 my_item = other.my_item;
641}
642
643void concurrent_queue_iterator_base_v3::advance() {
644 __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
645 size_t k = my_rep->head_counter;
646 const concurrent_queue_base& queue = my_rep->my_queue;
647#if TBB_USE_ASSERT
648 void* tmp;
649 my_rep->get_item(tmp,k);
650 __TBB_ASSERT( my_item==tmp, NULL );
651#endif /* TBB_USE_ASSERT */
652 size_t i = modulo_power_of_two( k/concurrent_queue_rep::n_queue, queue.items_per_page );
653 if( i==queue.items_per_page-1 ) {
654 concurrent_queue_base::page*& root = my_rep->array[concurrent_queue_rep::index(k)];
655 root = root->next;
656 }
657 // advance k
658 my_rep->head_counter = ++k;
659 if( !my_rep->get_item(my_item, k) ) advance();
660}
661
662concurrent_queue_iterator_base_v3::~concurrent_queue_iterator_base_v3() {
663 //delete my_rep;
664 cache_aligned_allocator<concurrent_queue_iterator_rep>().deallocate(my_rep, 1);
665 my_rep = NULL;
666}
667
668} // namespace internal
669
670} // namespace tbb
671