/*
    Copyright (c) 2005-2019 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "concurrent_queue_v2.h"
#include "tbb/cache_aligned_allocator.h"
#include "tbb/spin_mutex.h"
#include "tbb/atomic.h"
#include <cstring>
#include <stdio.h>

#if defined(_MSC_VER) && defined(_Wp64)
    // Workaround for overzealous compiler warnings in /Wp64 mode
    #pragma warning (disable: 4267)
#endif

#define RECORD_EVENTS 0

namespace tbb {

namespace internal {

class concurrent_queue_rep;

//! A queue using simple locking.
/** For efficiency, this class has no constructor.
    The caller is expected to zero-initialize it. */
struct micro_queue {
    typedef concurrent_queue_base::page page;
    typedef size_t ticket;

    //! Page and counter for the consumer end of this sub-queue.
    atomic<page*> head_page;
    atomic<ticket> head_counter;

    //! Page and counter for the producer end of this sub-queue.
    atomic<page*> tail_page;
    atomic<ticket> tail_counter;

    //! Protects the page links while pages are added or removed.
    spin_mutex page_mutex;

    class push_finalizer: no_copy {
        ticket my_ticket;
        micro_queue& my_queue;
    public:
        push_finalizer( micro_queue& queue, ticket k ) :
            my_ticket(k), my_queue(queue)
        {}
        ~push_finalizer() {
            my_queue.tail_counter = my_ticket;
        }
    };
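    // The destructor above publishes the new tail_counter unconditionally, so the
    // sub-queue keeps advancing even if copy_item() throws inside push(); the slot's
    // mask bit is simply left clear and pop() treats it as absent.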

    void push( const void* item, ticket k, concurrent_queue_base& base );

    class pop_finalizer: no_copy {
        ticket my_ticket;
        micro_queue& my_queue;
        page* my_page;
    public:
        pop_finalizer( micro_queue& queue, ticket k, page* p ) :
            my_ticket(k), my_queue(queue), my_page(p)
        {}
        ~pop_finalizer() {
            page* p = my_page;
            if( p ) {
                spin_mutex::scoped_lock lock( my_queue.page_mutex );
                page* q = p->next;
                my_queue.head_page = q;
                if( !q ) {
                    my_queue.tail_page = NULL;
                }
            }
            my_queue.head_counter = my_ticket;
            if( p )
                operator delete(p);
        }
    };
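    // pop() passes a page to this finalizer only when the last slot of that page has
    // been consumed; the finalizer then unlinks the page under page_mutex, publishes
    // the new head_counter, and releases the raw storage with operator delete.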

    bool pop( void* dst, ticket k, concurrent_queue_base& base );
};

//! Internal representation of a ConcurrentQueue.
/** For efficiency, this class has no constructor.
    The caller is expected to zero-initialize it. */
class concurrent_queue_rep {
public:
    typedef size_t ticket;

private:
    friend struct micro_queue;

    //! Stride for index(); roughly n_queue divided by the golden ratio squared,
    //! and coprime with n_queue.
    static const size_t phi = 3;

public:
    //! Must be power of 2
    static const size_t n_queue = 8;

    //! Map ticket to an array index
    static size_t index( ticket k ) {
        return k*phi%n_queue;
    }
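    // Worked example: with n_queue==8 and phi==3, tickets 0..7 map to sub-queues
    // 0,3,6,1,4,7,2,5.  Because phi is coprime with n_queue, all eight sub-queues
    // are used and consecutive tickets land in different sub-queues.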

    atomic<ticket> head_counter;
    char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];

    atomic<ticket> tail_counter;
    char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];
    micro_queue array[n_queue];

    micro_queue& choose( ticket k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }

    //! Value for effective_capacity that denotes unbounded queue.
    static const ptrdiff_t infinite_capacity = ptrdiff_t(~size_t(0)/2);
};
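
// Note: pad1 and pad2 keep head_counter, tail_counter, and the micro_queue array on
// separate cache lines (NFS_MaxLineSize), which limits false sharing between threads
// that are mostly pushing and threads that are mostly popping.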

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // unary minus operator applied to unsigned type, result still unsigned
    #pragma warning( push )
    #pragma warning( disable: 4146 )
#endif

//------------------------------------------------------------------------
// micro_queue
//------------------------------------------------------------------------
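// Push protocol, as implemented below: the caller has already been assigned a global
// ticket k by concurrent_queue_rep.  Each micro_queue serializes its producers by
// spin-waiting until its tail_counter reaches k, and a fresh page is allocated up
// front whenever k falls on the first slot of a page.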
void micro_queue::push( const void* item, ticket k, concurrent_queue_base& base ) {
    k &= -concurrent_queue_rep::n_queue;
    page* p = NULL;
    size_t index = modulo_power_of_two( k/concurrent_queue_rep::n_queue, base.items_per_page );
    if( !index ) {
        // First slot of a page: allocate the page before waiting for our turn.
        size_t n = sizeof(page) + base.items_per_page*base.item_size;
        p = static_cast<page*>(operator new( n ));
        p->mask = 0;
        p->next = NULL;
    }
    {
        push_finalizer finalizer( *this, k+concurrent_queue_rep::n_queue );
        // Wait until it is this ticket's turn to append to the sub-queue.
        spin_wait_until_eq( tail_counter, k );
        if( p ) {
            spin_mutex::scoped_lock lock( page_mutex );
            if( page* q = tail_page )
                q->next = p;
            else
                head_page = p;
            tail_page = p;
        } else {
            p = tail_page;
        }
        base.copy_item( *p, index, item );
        // If no exception was thrown, mark item as present.
        p->mask |= uintptr_t(1)<<index;
    }
}

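// pop() returns false when the slot for ticket k was never marked in the page mask
// (for example because the corresponding push threw); callers such as internal_pop()
// respond by retrying with the next ticket.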
bool micro_queue::pop( void* dst, ticket k, concurrent_queue_base& base ) {
    k &= -concurrent_queue_rep::n_queue;
    spin_wait_until_eq( head_counter, k );
    spin_wait_while_eq( tail_counter, k );
    page *p = head_page;
    __TBB_ASSERT( p, NULL );
    size_t index = modulo_power_of_two( k/concurrent_queue_rep::n_queue, base.items_per_page );
    bool success = false;
    {
        // Hand the page to the finalizer only when this is its last slot, so the page
        // can be unlinked and freed once the item has been consumed.
        pop_finalizer finalizer( *this, k+concurrent_queue_rep::n_queue, index==base.items_per_page-1 ? p : NULL );
        if( p->mask & uintptr_t(1)<<index ) {
            success = true;
            base.assign_and_destroy_item( dst, *p, index );
        }
    }
    return success;
}

#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning( pop )
#endif

//------------------------------------------------------------------------
// concurrent_queue_base
//------------------------------------------------------------------------
concurrent_queue_base::concurrent_queue_base( size_t item_sz ) {
    items_per_page = item_sz<= 8 ? 32 :
                     item_sz<= 16 ? 16 :
                     item_sz<= 32 ? 8 :
                     item_sz<= 64 ? 4 :
                     item_sz<=128 ? 2 :
                     1;
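    // With this ladder a page's payload (items_per_page*item_sz) never exceeds 256
    // bytes for item sizes up to 128; e.g. item_sz==24 selects 8 slots, i.e. 192
    // bytes of items following the page header.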
    my_capacity = size_t(-1)/(item_sz>1 ? item_sz : 2);
    my_rep = cache_aligned_allocator<concurrent_queue_rep>().allocate(1);
    __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
    std::memset(static_cast<void*>(my_rep),0,sizeof(concurrent_queue_rep));
    this->item_size = item_sz;
}

concurrent_queue_base::~concurrent_queue_base() {
    size_t nq = my_rep->n_queue;
    for( size_t i=0; i<nq; i++ ) {
        page* tp = my_rep->array[i].tail_page;
        __TBB_ASSERT( my_rep->array[i].head_page==tp, "at most one page should remain" );
        if( tp!=NULL )
            delete tp;
    }
    cache_aligned_allocator<concurrent_queue_rep>().deallocate(my_rep,1);
}

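// internal_push() claims a ticket unconditionally and then, for a bounded queue,
// backs off until the queue drains below capacity; contrast internal_push_if_not_full(),
// which claims a ticket only after observing that there is room.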
void concurrent_queue_base::internal_push( const void* src ) {
    concurrent_queue_rep& r = *my_rep;
    concurrent_queue_rep::ticket k = r.tail_counter++;
    if( my_capacity<concurrent_queue_rep::infinite_capacity ) {
        // Capacity is limited; wait until the queue is no longer over capacity.
        atomic_backoff backoff;
        while( (ptrdiff_t)(k-r.head_counter)>=const_cast<volatile ptrdiff_t&>(my_capacity) )
            backoff.pause();
    }
    r.choose(k).push(src,k,*this);
}

void concurrent_queue_base::internal_pop( void* dst ) {
    concurrent_queue_rep& r = *my_rep;
    concurrent_queue_rep::ticket k;
    do {
        k = r.head_counter++;
    } while( !r.choose(k).pop(dst,k,*this) );
}

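// internal_pop_if_present() claims a ticket with compare_and_swap on head_counter and
// only then tries the corresponding micro_queue; if that slot was never filled (its
// push threw), the outer loop claims another ticket and tries again.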
bool concurrent_queue_base::internal_pop_if_present( void* dst ) {
    concurrent_queue_rep& r = *my_rep;
    concurrent_queue_rep::ticket k;
    do {
        for( atomic_backoff b;;b.pause() ) {
            k = r.head_counter;
            if( r.tail_counter<=k ) {
                // Queue is empty
                return false;
            }
            // Queue had item with ticket k when we looked. Attempt to get that item.
            if( r.head_counter.compare_and_swap(k+1,k)==k ) {
                break;
            }
            // Another thread snatched the item, so pause and retry.
        }
    } while( !r.choose(k).pop(dst,k,*this) );
    return true;
}

bool concurrent_queue_base::internal_push_if_not_full( const void* src ) {
    concurrent_queue_rep& r = *my_rep;
    concurrent_queue_rep::ticket k;
    for( atomic_backoff b;;b.pause() ) {
        k = r.tail_counter;
        if( (ptrdiff_t)(k-r.head_counter)>=my_capacity ) {
            // Queue is full
            return false;
        }
        // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
        if( r.tail_counter.compare_and_swap(k+1,k)==k )
            break;
        // Another thread claimed the slot, so pause and retry.
    }
    r.choose(k).push(src,k,*this);
    return true;
}

ptrdiff_t concurrent_queue_base::internal_size() const {
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    return ptrdiff_t(my_rep->tail_counter-my_rep->head_counter);
}

void concurrent_queue_base::internal_set_capacity( ptrdiff_t capacity, size_t /*item_sz*/ ) {
    my_capacity = capacity<0 ? concurrent_queue_rep::infinite_capacity : capacity;
}

//------------------------------------------------------------------------
// concurrent_queue_iterator_rep
//------------------------------------------------------------------------
class concurrent_queue_iterator_rep: no_assign {
public:
    typedef concurrent_queue_rep::ticket ticket;
    ticket head_counter;
    const concurrent_queue_base& my_queue;
    concurrent_queue_base::page* array[concurrent_queue_rep::n_queue];
    concurrent_queue_iterator_rep( const concurrent_queue_base& queue ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue)
    {
        const concurrent_queue_rep& rep = *queue.my_rep;
        for( size_t k=0; k<concurrent_queue_rep::n_queue; ++k )
            array[k] = rep.array[k].head_page;
    }
    //! Get pointer to kth element
    void* choose( size_t k ) {
        if( k==my_queue.my_rep->tail_counter )
            return NULL;
        else {
            concurrent_queue_base::page* p = array[concurrent_queue_rep::index(k)];
            __TBB_ASSERT(p,NULL);
            size_t i = modulo_power_of_two( k/concurrent_queue_rep::n_queue, my_queue.items_per_page );
            // Items are stored immediately after the page header, hence p+1.
            return static_cast<unsigned char*>(static_cast<void*>(p+1)) + my_queue.item_size*i;
        }
    }
};
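
// Note: the iterator representation above takes an unsynchronized snapshot of
// head_counter and of each sub-queue's head page; as with the other concurrent_queue
// iterators, it is presumably meant to be used only while the queue is not being
// modified concurrently (e.g. for debugging).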

//------------------------------------------------------------------------
// concurrent_queue_iterator_base
//------------------------------------------------------------------------
concurrent_queue_iterator_base::concurrent_queue_iterator_base( const concurrent_queue_base& queue ) {
    my_rep = new concurrent_queue_iterator_rep(queue);
    my_item = my_rep->choose(my_rep->head_counter);
}

void concurrent_queue_iterator_base::assign( const concurrent_queue_iterator_base& other ) {
    if( my_rep!=other.my_rep ) {
        if( my_rep ) {
            delete my_rep;
            my_rep = NULL;
        }
        if( other.my_rep ) {
            my_rep = new concurrent_queue_iterator_rep( *other.my_rep );
        }
    }
    my_item = other.my_item;
}

void concurrent_queue_iterator_base::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base& queue = my_rep->my_queue;
    __TBB_ASSERT( my_item==my_rep->choose(k), NULL );
    size_t i = modulo_power_of_two( k/concurrent_queue_rep::n_queue, queue.items_per_page );
    if( i==queue.items_per_page-1 ) {
        // The iterator has reached the last slot of the current page in this
        // sub-queue, so move its snapshot on to the next page.
        concurrent_queue_base::page*& root = my_rep->array[concurrent_queue_rep::index(k)];
        root = root->next;
    }
    my_rep->head_counter = k+1;
    my_item = my_rep->choose(k+1);
}

concurrent_queue_iterator_base::~concurrent_queue_iterator_base() {
    delete my_rep;
    my_rep = NULL;
}

} // namespace internal

} // namespace tbb
