1/*
2 Copyright (c) 2005-2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17#ifndef __TBB_concurrent_priority_queue_H
18#define __TBB_concurrent_priority_queue_H
19
20#include "atomic.h"
21#include "cache_aligned_allocator.h"
22#include "tbb_exception.h"
23#include "tbb_stddef.h"
24#include "tbb_profiling.h"
25#include "internal/_aggregator_impl.h"
26#include "internal/_template_helpers.h"
27#include "internal/_allocator_traits.h"
28#include <vector>
29#include <iterator>
30#include <functional>
31#include __TBB_STD_SWAP_HEADER
32
33#if __TBB_INITIALIZER_LISTS_PRESENT
34 #include <initializer_list>
35#endif
36
37#if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
38 #include <type_traits>
39#endif
40
41namespace tbb {
42namespace interface5 {
namespace internal {
#if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
    // Compile-time trait mapping an element type T to tbb::internal::true_type
    // or false_type depending on whether T is copy constructible. It is used to
    // tag-dispatch between the two push_back_helper overloads of
    // concurrent_priority_queue so that a copying push of a move-only type is
    // rejected with an assertion instead of failing to compile the whole class.
    template<typename T, bool C = std::is_copy_constructible<T>::value>
    struct use_element_copy_constructor {
        typedef tbb::internal::true_type type;
    };
    // Specialization selected when T is not copy constructible.
    template<typename T>
    struct use_element_copy_constructor <T,false> {
        typedef tbb::internal::false_type type;
    };
#else
    // Without std::is_copy_constructible support the trait cannot be detected,
    // so every element type is optimistically treated as copy constructible.
    template<typename>
    struct use_element_copy_constructor {
        typedef tbb::internal::true_type type;
    };
#endif
} // namespace internal
60
61using namespace tbb::internal;
62
//! Concurrent priority queue
/** Thread-safety is achieved by funneling every push/try_pop/emplace through
    an aggregator: each thread publishes an operation descriptor (cpq_operation)
    and a single thread at a time drains the pending list in handle_operations(),
    so the underlying heap is only ever mutated by one thread per batch.
    Operations marked "not thread-safe" below bypass the aggregator. */
template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
class concurrent_priority_queue {
 public:
    //! Element type in the queue.
    typedef T value_type;

    //! Reference type
    typedef T& reference;

    //! Const reference type
    typedef const T& const_reference;

    //! Integral type for representing size of the queue.
    typedef size_t size_type;

    //! Difference type for iterator
    typedef ptrdiff_t difference_type;

    //! Allocator type
    typedef A allocator_type;

    //! Constructs a new concurrent_priority_queue with default capacity
    explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), compare(), data(a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
    }

    //! Constructs a new concurrent_priority_queue with default capacity
    explicit concurrent_priority_queue(const Compare& c, const allocator_type& a = allocator_type()) : mark(0), my_size(0), compare(c), data(a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
    }

    //! Constructs a new concurrent_priority_queue with init_sz capacity
    explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
        mark(0), my_size(0), compare(), data(a)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(my_functor_t(this));
    }

    //! Constructs a new concurrent_priority_queue with init_sz capacity
    explicit concurrent_priority_queue(size_type init_capacity, const Compare& c, const allocator_type& a = allocator_type()) :
        mark(0), my_size(0), compare(c), data(a)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(my_functor_t(this));
    }

    //! [begin,end) constructor
    template<typename InputIterator>
    concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
        mark(0), compare(), data(begin, end, a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        // The raw element range is copied first, then heapified in one pass;
        // my_size is set only after data is fully populated.
        heapify();
        my_size = data.size();
    }

    //! [begin,end) constructor
    template<typename InputIterator>
    concurrent_priority_queue(InputIterator begin, InputIterator end, const Compare& c, const allocator_type& a = allocator_type()) :
        mark(0), compare(c), data(begin, end, a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        heapify();
        my_size = data.size();
    }

#if __TBB_INITIALIZER_LISTS_PRESENT
    //! Constructor from std::initializer_list
    concurrent_priority_queue(std::initializer_list<T> init_list, const allocator_type &a = allocator_type()) :
        mark(0), compare(), data(init_list.begin(), init_list.end(), a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        heapify();
        my_size = data.size();
    }

    //! Constructor from std::initializer_list
    concurrent_priority_queue(std::initializer_list<T> init_list, const Compare& c, const allocator_type &a = allocator_type()) :
        mark(0), compare(c), data(init_list.begin(), init_list.end(), a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        heapify();
        my_size = data.size();
    }
#endif //# __TBB_INITIALIZER_LISTS_PRESENT

    //! Copy constructor
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
        my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        // Re-heapify defensively: src may contain unheapified tail elements.
        heapify();
    }

    //! Copy constructor with specific allocator
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
        my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
    {
        my_aggregator.initialize_handler(my_functor_t(this));
        heapify();
    }

    //! Assignment operator
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
        if (this != &src) {
            // Copy-and-swap on the storage only; this queue keeps its own allocator
            // semantics via the temporary vector constructed with src's allocator.
            vector_t(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
            mark = src.mark;
            my_size = src.my_size;
        }
        return *this;
    }

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! Move constructor
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    concurrent_priority_queue(concurrent_priority_queue&& src) : mark(src.mark),
        my_size(src.my_size), data(std::move(src.data))
    {
        my_aggregator.initialize_handler(my_functor_t(this));
    }

    //! Move constructor with specific allocator
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    concurrent_priority_queue(concurrent_priority_queue&& src, const allocator_type& a) : mark(src.mark),
        my_size(src.my_size),
#if __TBB_ALLOCATOR_TRAITS_PRESENT
        data(std::move(src.data), a)
#else
    // Some early version of C++11 STL vector does not have a constructor of vector(vector&& , allocator).
    // It seems that the reason is absence of support of allocator_traits (stateful allocators).
        data(a)
#endif //__TBB_ALLOCATOR_TRAITS_PRESENT
    {
        my_aggregator.initialize_handler(my_functor_t(this));
#if !__TBB_ALLOCATOR_TRAITS_PRESENT
        // Fallback path: element-wise move when allocators differ, whole-vector
        // move when they compare equal.
        if (a != src.data.get_allocator()){
            data.reserve(src.data.size());
            data.assign(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()));
        }else{
            data = std::move(src.data);
        }
#endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
    }

    //! Move assignment operator
    /** This operation is unsafe if there are pending concurrent operations on the src queue. */
    concurrent_priority_queue& operator=( concurrent_priority_queue&& src) {
        if (this != &src) {
            mark = src.mark;
            my_size = src.my_size;
#if !__TBB_ALLOCATOR_TRAITS_PRESENT
            // Without allocator_traits, moving between unequal allocators must
            // move element-by-element into storage owned by our allocator.
            if (data.get_allocator() != src.data.get_allocator()){
                vector_t(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()), data.get_allocator()).swap(data);
            }else
#endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
            {
                data = std::move(src.data);
            }
        }
        return *this;
    }
#endif //__TBB_CPP11_RVALUE_REF_PRESENT

    //! Assign the queue from [begin,end) range, not thread-safe
    template<typename InputIterator>
    void assign(InputIterator begin, InputIterator end) {
        vector_t(begin, end, data.get_allocator()).swap(data);
        mark = 0;
        my_size = data.size();
        heapify();
    }

#if __TBB_INITIALIZER_LISTS_PRESENT
    //! Assign the queue from std::initializer_list, not thread-safe
    void assign(std::initializer_list<T> il) { this->assign(il.begin(), il.end()); }

    //! Assign from std::initializer_list, not thread-safe
    concurrent_priority_queue& operator=(std::initializer_list<T> il) {
        this->assign(il.begin(), il.end());
        return *this;
    }
#endif //# __TBB_INITIALIZER_LISTS_PRESENT

    //! Returns true if empty, false otherwise
    /** Returned value may not reflect results of pending operations.
        This operation reads shared data and will trigger a race condition. */
    bool empty() const { return size()==0; }

    //! Returns the current number of elements contained in the queue
    /** Returned value may not reflect results of pending operations.
        This operation reads shared data and will trigger a race condition. */
    size_type size() const { return __TBB_load_with_acquire(my_size); }

    //! Pushes elem onto the queue, increasing capacity of queue if necessary
    /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push(const_reference elem) {
#if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
        __TBB_STATIC_ASSERT( std::is_copy_constructible<value_type>::value, "The type is not copy constructible. Copying push operation is impossible." );
#endif
        // Hand the operation to the aggregator; execute() blocks until the
        // handling thread marks op_data.status SUCCEEDED or FAILED.
        cpq_operation op_data(elem, PUSH_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED) // exception thrown
            throw_exception(eid_bad_alloc);
    }

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! Pushes elem onto the queue, increasing capacity of queue if necessary
    /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push(value_type &&elem) {
        cpq_operation op_data(elem, PUSH_RVALUE_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED) // exception thrown
            throw_exception(eid_bad_alloc);
    }

#if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
    //! Constructs a new element using args as the arguments for its construction and pushes it onto the queue
    /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    template<typename... Args>
    void emplace(Args&&... args) {
        // Construct locally, then reuse the rvalue push path.
        push(value_type(std::forward<Args>(args)...));
    }
#endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */

    //! Gets a reference to and removes highest priority element
    /** If a highest priority element was found, sets elem and returns true,
        otherwise returns false.
        This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    bool try_pop(reference elem) {
        cpq_operation op_data(POP_OP);
        op_data.elem = &elem;
        my_aggregator.execute(&op_data);
        return op_data.status==SUCCEEDED;
    }

    //! Clear the queue; not thread-safe
    /** This operation is unsafe if there are pending concurrent operations on the queue.
        Resets size, effectively emptying queue; does not free space.
        May not clear elements added in pending operations. */
    void clear() {
        data.clear();
        mark = 0;
        my_size = 0;
    }

    //! Swap this queue with another; not thread-safe
    /** This operation is unsafe if there are pending concurrent operations on the queue. */
    void swap(concurrent_priority_queue& q) {
        using std::swap;
        data.swap(q.data);
        swap(mark, q.mark);
        swap(my_size, q.my_size);
    }

    //! Return allocator object
    allocator_type get_allocator() const { return data.get_allocator(); }

 private:
    // Kinds of deferred operations handled by the aggregator.
    enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
    // Status written (with release semantics) by the handling thread and
    // awaited by the thread that submitted the operation.
    enum operation_status { WAIT=0, SUCCEEDED, FAILED };

    //! Descriptor of one pending push/pop, linked into the aggregator's list.
    class cpq_operation : public aggregated_operation<cpq_operation> {
     public:
        operation_type type;
        union {
            value_type *elem;  // source (push) or destination (pop) element
            size_type sz;
        };
        cpq_operation(const_reference e, operation_type t) :
            type(t), elem(const_cast<value_type*>(&e)) {}
        cpq_operation(operation_type t) : type(t) {}
    };

    //! Handler functor installed into the aggregator; forwards batches to handle_operations.
    class my_functor_t {
        concurrent_priority_queue<T, Compare, A> *cpq;
     public:
        my_functor_t() {}
        my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
        void operator()(cpq_operation* op_list) {
            cpq->handle_operations(op_list);
        }
    };

    typedef tbb::internal::aggregator< my_functor_t, cpq_operation > aggregator_t;
    //! Serializes all concurrent mutations; only one thread at a time runs handle_operations.
    aggregator_t my_aggregator;
    //! Padding added to avoid false sharing
    char padding1[NFS_MaxLineSize - sizeof(aggregator_t)];
    //! The point at which unsorted elements begin
    size_type mark;
    //! Element count, published with release stores so size() can read it with acquire.
    __TBB_atomic size_type my_size;
    Compare compare;
    //! Padding added to avoid false sharing
    char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
    //! Storage for the heap of elements in queue, plus unheapified elements
    /** data has the following structure:

         binary unheapified
          heap   elements
        ____|_______|____
       |       |        |
       v       v        v
       [_|...|_|_|...|_| |...| ]
        0       ^       ^       ^
                |       |       |__capacity
                |       |__my_size
                |__mark

        Thus, data stores the binary heap starting at position 0 through
        mark-1 (it may be empty).  Then there are 0 or more elements
        that have not yet been inserted into the heap, in positions
        mark through my_size-1. */
    typedef std::vector<value_type, allocator_type> vector_t;
    vector_t data;

    //! Process one batch of operations handed over by the aggregator.
    /** Runs on exactly one thread at a time; pushes are appended to the
        unheapified tail, pops are served from the tail when possible and
        otherwise deferred to a second pass that extracts from the heap. */
    void handle_operations(cpq_operation *op_list) {
        cpq_operation *tmp, *pop_list=NULL;

        // Invariant between batches: no unheapified tail remains.
        __TBB_ASSERT(mark == data.size(), NULL);

        // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
        while (op_list) {
            // ITT note: &(op_list->status) tag is used to cover accesses to op_list
            // node. This thread is going to handle the operation, and so will acquire it
            // and perform the associated operation w/o triggering a race condition; the
            // thread that created the operation is waiting on the status field, so when
            // this thread is done with the operation, it will perform a
            // store_with_release to give control back to the waiting thread in
            // aggregator::insert_operation.
            call_itt_notify(acquired, &(op_list->status));
            __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
            tmp = op_list;
            op_list = itt_hide_load_word(op_list->next);
            if (tmp->type == POP_OP) {
                if (mark < data.size() &&
                    compare(data[0], data[data.size()-1])) {
                    // there are newly pushed elems and the last one
                    // is higher than top
                    *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
                    __TBB_store_with_release(my_size, my_size-1);
                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                    // Safe to shrink after releasing status: only this handler
                    // thread touches data within a batch.
                    data.pop_back();
                    __TBB_ASSERT(mark<=data.size(), NULL);
                }
                else { // no convenient item to pop; postpone
                    itt_hide_store_word(tmp->next, pop_list);
                    pop_list = tmp;
                }
            } else { // PUSH_OP or PUSH_RVALUE_OP
                __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation" );
                __TBB_TRY{
                    if (tmp->type == PUSH_OP) {
                        push_back_helper(*(tmp->elem), typename internal::use_element_copy_constructor<value_type>::type());
                    } else {
                        data.push_back(tbb::internal::move(*(tmp->elem)));
                    }
                    __TBB_store_with_release(my_size, my_size + 1);
                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                } __TBB_CATCH(...) {
                    // Report allocation/copy failure back to the pushing thread.
                    itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
                }
            }
        }

        // second pass processes pop operations
        while (pop_list) {
            tmp = pop_list;
            pop_list = itt_hide_load_word(pop_list->next);
            __TBB_ASSERT(tmp->type == POP_OP, NULL);
            if (data.empty()) {
                itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
            }
            else {
                __TBB_ASSERT(mark<=data.size(), NULL);
                if (mark < data.size() &&
                    compare(data[0], data[data.size()-1])) {
                    // there are newly pushed elems and the last one is
                    // higher than top
                    *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
                    __TBB_store_with_release(my_size, my_size-1);
                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                    data.pop_back();
                }
                else { // extract top and push last element down heap
                    *(tmp->elem) = tbb::internal::move(data[0]);
                    __TBB_store_with_release(my_size, my_size-1);
                    itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
                    reheap();
                }
            }
        }

        // heapify any leftover pushed elements before doing the next
        // batch of operations
        if (mark<data.size()) heapify();
        __TBB_ASSERT(mark == data.size(), NULL);
    }

    //! Merge unsorted elements into heap
    /** Sifts each element in [mark, data.size()) up toward the root until the
        heap property holds, advancing mark as it goes. */
    void heapify() {
        // A single element is trivially a heap.
        if (!mark && data.size()>0) mark = 1;
        for (; mark<data.size(); ++mark) {
            // for each unheapified element under size
            size_type cur_pos = mark;
            value_type to_place = tbb::internal::move(data[mark]);
            do { // push to_place up the heap
                size_type parent = (cur_pos-1)>>1;
                if (!compare(data[parent], to_place)) break;
                data[cur_pos] = tbb::internal::move(data[parent]);
                cur_pos = parent;
            } while( cur_pos );
            data[cur_pos] = tbb::internal::move(to_place);
        }
    }

    //! Re-heapify after an extraction
    /** Re-heapify by pushing last element down the heap from the root. */
    void reheap() {
        size_type cur_pos=0, child=1;

        while (child < mark) {
            size_type target = child;
            if (child+1 < mark && compare(data[child], data[child+1]))
                ++target;
            // target now has the higher priority child
            if (compare(data[target], data[data.size()-1])) break;
            data[cur_pos] = tbb::internal::move(data[target]);
            cur_pos = target;
            child = (cur_pos<<1)+1;
        }
        // Place the (former) last element into the hole unless it already sat there.
        if (cur_pos != data.size()-1)
            data[cur_pos] = tbb::internal::move(data[data.size()-1]);
        data.pop_back();
        if (mark > data.size()) mark = data.size();
    }

    //! Appends a copy of t; selected when value_type is copy constructible.
    void push_back_helper(const T& t, tbb::internal::true_type) {
        data.push_back(t);
    }

    //! Selected for non-copyable value_type: a copying push is a usage error.
    void push_back_helper(const T&, tbb::internal::false_type) {
        __TBB_ASSERT( false, "The type is not copy constructible. Copying push operation is impossible." );
    }
};
514
515#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
namespace internal {

// Helper alias for class template argument deduction: picks the comparator
// and allocator out of the trailing constructor arguments. If the first extra
// argument is not an allocator it is taken as the comparator (else
// std::less<T>); if the last extra argument is an allocator it is used as the
// allocator (else cache_aligned_allocator<T>).
template<typename T, typename... Args>
using priority_queue_t = concurrent_priority_queue<
    T,
    std::conditional_t< (sizeof...(Args)>0) && !is_allocator_v< pack_element_t<0, Args...> >,
                        pack_element_t<0, Args...>, std::less<T> >,
    std::conditional_t< (sizeof...(Args)>0) && is_allocator_v< pack_element_t<sizeof...(Args)-1, Args...> >,
                        pack_element_t<sizeof...(Args)-1, Args...>, cache_aligned_allocator<T> >
>;
}
527
528// Deduction guide for the constructor from two iterators
529template<typename InputIterator,
530 typename T = typename std::iterator_traits<InputIterator>::value_type,
531 typename... Args
532> concurrent_priority_queue(InputIterator, InputIterator, Args...)
533-> internal::priority_queue_t<T, Args...>;
534
535template<typename T, typename CompareOrAllocalor>
536concurrent_priority_queue(std::initializer_list<T> init_list, CompareOrAllocalor)
537-> internal::priority_queue_t<T, CompareOrAllocalor>;
538
539#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
540} // namespace interface5
541
542using interface5::concurrent_priority_queue;
543
544} // namespace tbb
545
546#endif /* __TBB_concurrent_priority_queue_H */
547