1// Provides a C++11 implementation of a multi-producer, multi-consumer lock-free queue.
2// An overview, including benchmark results, is provided here:
3// http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
4// The full design is also described in excruciating detail at:
5// http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
6
7// Simplified BSD license:
8// Copyright (c) 2013-2016, Cameron Desrochers.
9// All rights reserved.
10//
11// Redistribution and use in source and binary forms, with or without modification,
12// are permitted provided that the following conditions are met:
13//
14// - Redistributions of source code must retain the above copyright notice, this list of
15// conditions and the following disclaimer.
16// - Redistributions in binary form must reproduce the above copyright notice, this list of
17// conditions and the following disclaimer in the documentation and/or other materials
18// provided with the distribution.
19//
20// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
21// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
22// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
23// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
25// OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
26// HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
27// TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
28// EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31#pragma once
32
#if defined(__GNUC__)
// Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
// Traits::index_t are set to < 32 bits, causing integer promotion, causing warnings
// upon assigning any computed values)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wconversion"
#endif
39
40#if defined(__APPLE__)
41#include <TargetConditionals.h>
42#endif
43
44#include <atomic> // Requires C++11. Sorry VS2010.
45#include <cassert>
46#include <cstddef> // for max_align_t
47#include <cstdint>
48#include <cstdlib>
49#include <type_traits>
50#include <algorithm>
51#include <utility>
52#include <limits>
53#include <climits> // for CHAR_BIT
54#include <array>
55#include <thread> // partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading
56
57// Platform-specific definitions of a numeric thread ID type and an invalid value
58namespace duckdb_moodycamel { namespace details {
59 template<typename thread_id_t> struct thread_id_converter {
60 typedef thread_id_t thread_id_numeric_size_t;
61 typedef thread_id_t thread_id_hash_t;
62 static thread_id_hash_t prehash(thread_id_t const& x) { return x; }
63 };
64} }
65#if defined(MCDBGQ_USE_RELACY)
66namespace duckdb_moodycamel { namespace details {
67 typedef std::uint32_t thread_id_t;
68 static const thread_id_t invalid_thread_id = 0xFFFFFFFFU;
69 static const thread_id_t invalid_thread_id2 = 0xFFFFFFFEU;
70 static inline thread_id_t thread_id() { return rl::thread_index(); }
71} }
72#elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
73// No sense pulling in windows.h in a header, we'll manually declare the function
74// we use and rely on backwards-compatibility for this not to break
75extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(void);
76namespace duckdb_moodycamel { namespace details {
77 static_assert(sizeof(unsigned long) == sizeof(std::uint32_t), "Expected size of unsigned long to be 32 bits on Windows");
78 typedef std::uint32_t thread_id_t;
79 static const thread_id_t invalid_thread_id = 0; // See http://blogs.msdn.com/b/oldnewthing/archive/2004/02/23/78395.aspx
80 static const thread_id_t invalid_thread_id2 = 0xFFFFFFFFU; // Not technically guaranteed to be invalid, but is never used in practice. Note that all Win32 thread IDs are presently multiples of 4.
81 static inline thread_id_t thread_id() { return static_cast<thread_id_t>(::GetCurrentThreadId()); }
82} }
83#elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || (defined(__APPLE__) && TARGET_OS_IPHONE)
84namespace duckdb_moodycamel { namespace details {
85 static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8, "std::thread::id is expected to be either 4 or 8 bytes");
86
87 typedef std::thread::id thread_id_t;
88 static const thread_id_t invalid_thread_id; // Default ctor creates invalid ID
89
	// Note we don't define an invalid_thread_id2 since std::thread::id doesn't have one; it's
	// only used if MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is defined anyway, which it won't
	// be.
93 static inline thread_id_t thread_id() { return std::this_thread::get_id(); }
94
95 template<std::size_t> struct thread_id_size { };
96 template<> struct thread_id_size<4> { typedef std::uint32_t numeric_t; };
97 template<> struct thread_id_size<8> { typedef std::uint64_t numeric_t; };
98
99 template<> struct thread_id_converter<thread_id_t> {
100 typedef thread_id_size<sizeof(thread_id_t)>::numeric_t thread_id_numeric_size_t;
101#ifndef __APPLE__
102 typedef std::size_t thread_id_hash_t;
103#else
104 typedef thread_id_numeric_size_t thread_id_hash_t;
105#endif
106
107 static thread_id_hash_t prehash(thread_id_t const& x)
108 {
109#ifndef __APPLE__
110 return std::hash<std::thread::id>()(x);
111#else
112 return *reinterpret_cast<thread_id_hash_t const*>(&x);
113#endif
114 }
115 };
116} }
117#else
118// Use a nice trick from this answer: http://stackoverflow.com/a/8438730/21475
119// In order to get a numeric thread ID in a platform-independent way, we use a thread-local
120// static variable's address as a thread identifier :-)
121#if defined(__GNUC__) || defined(__INTEL_COMPILER)
122#define MOODYCAMEL_THREADLOCAL __thread
123#elif defined(_MSC_VER)
124#define MOODYCAMEL_THREADLOCAL __declspec(thread)
125#else
126// Assume C++11 compliant compiler
127#define MOODYCAMEL_THREADLOCAL thread_local
128#endif
129namespace duckdb_moodycamel { namespace details {
130 typedef std::uintptr_t thread_id_t;
131 static const thread_id_t invalid_thread_id = 0; // Address can't be nullptr
132#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
133 static const thread_id_t invalid_thread_id2 = 1; // Member accesses off a null pointer are also generally invalid. Plus it's not aligned.
134#endif
135 inline thread_id_t thread_id() { static MOODYCAMEL_THREADLOCAL int x; return reinterpret_cast<thread_id_t>(&x); }
136} }
137#endif
138
139// Constexpr if
140#ifndef MOODYCAMEL_CONSTEXPR_IF
141#if (defined(_MSC_VER) && defined(_HAS_CXX17) && _HAS_CXX17) || __cplusplus > 201402L
142#define MOODYCAMEL_CONSTEXPR_IF if constexpr
143#define MOODYCAMEL_MAYBE_UNUSED [[maybe_unused]]
144#else
145#define MOODYCAMEL_CONSTEXPR_IF if
146#define MOODYCAMEL_MAYBE_UNUSED
147#endif
148#endif
149
150// Exceptions
151#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
152#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
153#define MOODYCAMEL_EXCEPTIONS_ENABLED
154#endif
155#endif
156#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
157#define MOODYCAMEL_TRY try
158#define MOODYCAMEL_CATCH(...) catch(__VA_ARGS__)
159#define MOODYCAMEL_RETHROW throw
160#define MOODYCAMEL_THROW(expr) throw (expr)
161#else
162#define MOODYCAMEL_TRY MOODYCAMEL_CONSTEXPR_IF (true)
163#define MOODYCAMEL_CATCH(...) else MOODYCAMEL_CONSTEXPR_IF (false)
164#define MOODYCAMEL_RETHROW
165#define MOODYCAMEL_THROW(expr)
166#endif
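// A minimal sketch of how these macros are meant to be used, so the same code
// compiles whether or not exceptions are enabled (illustrative only; `do_work`
// is a hypothetical function, not part of this library):
//
//     MOODYCAMEL_TRY {
//         do_work();
//     }
//     MOODYCAMEL_CATCH (...) {
//         // Expands to dead code when exceptions are disabled
//         MOODYCAMEL_RETHROW;
//     }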
167
168#ifndef MOODYCAMEL_NOEXCEPT
169#if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
170#define MOODYCAMEL_NOEXCEPT
171#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
172#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
173#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
174// VS2012's std::is_nothrow_[move_]constructible is broken and returns true when it shouldn't :-(
175// We have to assume *all* non-trivial constructors may throw on VS2012!
176#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
177#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value)
178#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
179#elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
180#define MOODYCAMEL_NOEXCEPT _NOEXCEPT
181#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) (std::is_rvalue_reference<valueType>::value && std::is_move_constructible<type>::value ? std::is_trivially_move_constructible<type>::value || std::is_nothrow_move_constructible<type>::value : std::is_trivially_copy_constructible<type>::value || std::is_nothrow_copy_constructible<type>::value)
182#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) ((std::is_rvalue_reference<valueType>::value && std::is_move_assignable<type>::value ? std::is_trivially_move_assignable<type>::value || std::is_nothrow_move_assignable<type>::value : std::is_trivially_copy_assignable<type>::value || std::is_nothrow_copy_assignable<type>::value) && MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
183#else
184#define MOODYCAMEL_NOEXCEPT noexcept
185#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
186#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
187#endif
188#endif
189
190#ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
191#ifdef MCDBGQ_USE_RELACY
192#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
193#else
194// VS2013 doesn't support `thread_local`, and MinGW-w64 w/ POSIX threading has a crippling bug: http://sourceforge.net/p/mingw-w64/bugs/445
195// g++ <=4.7 doesn't support thread_local either.
196// Finally, iOS/ARM doesn't have support for it either, and g++/ARM allows it to compile but it's unconfirmed to actually work
197#if (!defined(_MSC_VER) || _MSC_VER >= 1900) && (!defined(__MINGW32__) && !defined(__MINGW64__) || !defined(__WINPTHREADS_VERSION)) && (!defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && !defined(_M_ARM) && !defined(__aarch64__)
198// Assume `thread_local` is fully supported in all other C++11 compilers/platforms
199//#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED // always disabled for now since several users report having problems with it on
200#endif
201#endif
202#endif
203
204// VS2012 doesn't support deleted functions.
205// In this case, we declare the function normally but don't define it. A link error will be generated if the function is called.
206#ifndef MOODYCAMEL_DELETE_FUNCTION
207#if defined(_MSC_VER) && _MSC_VER < 1800
208#define MOODYCAMEL_DELETE_FUNCTION
209#else
210#define MOODYCAMEL_DELETE_FUNCTION = delete
211#endif
212#endif
213
214#ifndef MOODYCAMEL_ALIGNAS
215// VS2013 doesn't support alignas or alignof
216#if defined(_MSC_VER) && _MSC_VER <= 1800
217#define MOODYCAMEL_ALIGNAS(alignment) __declspec(align(alignment))
218#define MOODYCAMEL_ALIGNOF(obj) __alignof(obj)
219#else
220#define MOODYCAMEL_ALIGNAS(alignment) alignas(alignment)
221#define MOODYCAMEL_ALIGNOF(obj) alignof(obj)
222#endif
223#endif
224
225
226
227// Compiler-specific likely/unlikely hints
228namespace duckdb_moodycamel { namespace details {
229
230#if defined(__GNUC__)
231 static inline bool (likely)(bool x) { return __builtin_expect((x), true); }
232// static inline bool (unlikely)(bool x) { return __builtin_expect((x), false); }
233#else
234 static inline bool (likely)(bool x) { return x; }
235// static inline bool (unlikely)(bool x) { return x; }
236#endif
237} }
238
239namespace duckdb_moodycamel {
240namespace details {
241 template<typename T>
242 struct const_numeric_max {
243 static_assert(std::is_integral<T>::value, "const_numeric_max can only be used with integers");
244 static const T value = std::numeric_limits<T>::is_signed
245 ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) - static_cast<T>(1)
246 : static_cast<T>(-1);
247 };
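	// For example (illustrative): const_numeric_max<std::uint32_t>::value == 0xFFFFFFFF
	// and const_numeric_max<std::int32_t>::value == 0x7FFFFFFF.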
248
249#if defined(__GLIBCXX__)
250 typedef ::max_align_t std_max_align_t; // libstdc++ forgot to add it to std:: for a while
251#else
252 typedef std::max_align_t std_max_align_t; // Others (e.g. MSVC) insist it can *only* be accessed via std::
253#endif
254
255 // Some platforms have incorrectly set max_align_t to a type with <8 bytes alignment even while supporting
256 // 8-byte aligned scalar values (*cough* 32-bit iOS). Work around this with our own union. See issue #64.
257 typedef union {
258 std_max_align_t x;
259 long long y;
260 void* z;
261 } max_align_t;
262}
263
264// Default traits for the ConcurrentQueue. To change some of the
265// traits without re-implementing all of them, inherit from this
266// struct and shadow the declarations you wish to be different;
267// since the traits are used as a template type parameter, the
268// shadowed declarations will be used where defined, and the defaults
269// otherwise.
270struct ConcurrentQueueDefaultTraits
271{
272 // General-purpose size type. std::size_t is strongly recommended.
273 typedef std::size_t size_t;
274
275 // The type used for the enqueue and dequeue indices. Must be at least as
276 // large as size_t. Should be significantly larger than the number of elements
277 // you expect to hold at once, especially if you have a high turnover rate;
278 // for example, on 32-bit x86, if you expect to have over a hundred million
279 // elements or pump several million elements through your queue in a very
280 // short space of time, using a 32-bit type *may* trigger a race condition.
281 // A 64-bit int type is recommended in that case, and in practice will
282 // prevent a race condition no matter the usage of the queue. Note that
	// whether the queue is lock-free with a 64-bit type depends on whether
	// std::atomic<std::uint64_t> is lock-free, which is platform-specific.
285 typedef std::size_t index_t;
286
287 // Internally, all elements are enqueued and dequeued from multi-element
288 // blocks; this is the smallest controllable unit. If you expect few elements
289 // but many producers, a smaller block size should be favoured. For few producers
290 // and/or many elements, a larger block size is preferred. A sane default
291 // is provided. Must be a power of 2.
292 static const size_t BLOCK_SIZE = 32;
293
294 // For explicit producers (i.e. when using a producer token), the block is
295 // checked for being empty by iterating through a list of flags, one per element.
296 // For large block sizes, this is too inefficient, and switching to an atomic
297 // counter-based approach is faster. The switch is made for block sizes strictly
298 // larger than this threshold.
299 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
300
301 // How many full blocks can be expected for a single explicit producer? This should
302 // reflect that number's maximum for optimal performance. Must be a power of 2.
303 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
304
305 // How many full blocks can be expected for a single implicit producer? This should
306 // reflect that number's maximum for optimal performance. Must be a power of 2.
307 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;
308
309 // The initial size of the hash table mapping thread IDs to implicit producers.
310 // Note that the hash is resized every time it becomes half full.
311 // Must be a power of two, and either 0 or at least 1. If 0, implicit production
312 // (using the enqueue methods without an explicit producer token) is disabled.
313 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;
314
315 // Controls the number of items that an explicit consumer (i.e. one with a token)
316 // must consume before it causes all consumers to rotate and move on to the next
317 // internal queue.
318 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = 256;
319
320 // The maximum number of elements (inclusive) that can be enqueued to a sub-queue.
321 // Enqueue operations that would cause this limit to be surpassed will fail. Note
322 // that this limit is enforced at the block level (for performance reasons), i.e.
323 // it's rounded up to the nearest block size.
324 static const size_t MAX_SUBQUEUE_SIZE = details::const_numeric_max<size_t>::value;
325
326
327#ifndef MCDBGQ_USE_RELACY
328 // Memory allocation can be customized if needed.
329 // malloc should return nullptr on failure, and handle alignment like std::malloc.
330#if defined(malloc) || defined(free)
331 // Gah, this is 2015, stop defining macros that break standard code already!
332 // Work around malloc/free being special macros:
333 static inline void* WORKAROUND_malloc(size_t size) { return malloc(size); }
334 static inline void WORKAROUND_free(void* ptr) { return free(ptr); }
335 static inline void* (malloc)(size_t size) { return WORKAROUND_malloc(size); }
336 static inline void (free)(void* ptr) { return WORKAROUND_free(ptr); }
337#else
	static inline void* malloc(size_t size) { return std::malloc(size); }
	static inline void free(void* ptr) { return std::free(ptr); }
340#endif
341#else
342 // Debug versions when running under the Relacy race detector (ignore
343 // these in user code)
344 static inline void* malloc(size_t size) { return rl::rl_malloc(size, $); }
345 static inline void free(void* ptr) { return rl::rl_free(ptr, $); }
346#endif
347};
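// Example of customizing the traits (an illustrative sketch, not part of the library):
// inherit from ConcurrentQueueDefaultTraits, shadow only the members you want to change,
// and pass the resulting struct as the queue's second template argument.
//
//     struct MyTraits : public duckdb_moodycamel::ConcurrentQueueDefaultTraits
//     {
//         // Larger blocks mean fewer allocations for workloads with few producers
//         // and many elements (must be a power of 2)
//         static const size_t BLOCK_SIZE = 256;
//     };
//
//     duckdb_moodycamel::ConcurrentQueue<int, MyTraits> q;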
348
349
350// When producing or consuming many elements, the most efficient way is to:
351// 1) Use one of the bulk-operation methods of the queue with a token
352// 2) Failing that, use the bulk-operation methods without a token
353// 3) Failing that, create a token and use that with the single-item methods
354// 4) Failing that, use the single-parameter methods of the queue
355// Having said that, don't create tokens willy-nilly -- ideally there should be
356// a maximum of one token per thread (of each kind).
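// For instance (an illustrative sketch; error handling omitted, and the two halves
// would normally live on separate producer/consumer threads):
//
//     duckdb_moodycamel::ConcurrentQueue<int> q;
//
//     // Producer side: one ProducerToken per producer thread, bulk enqueue preferred
//     duckdb_moodycamel::ProducerToken ptok(q);
//     int values[64];
//     for (int i = 0; i != 64; ++i) values[i] = i;
//     q.enqueue_bulk(ptok, values, 64);
//
//     // Consumer side: one ConsumerToken per consumer thread, bulk dequeue preferred
//     duckdb_moodycamel::ConsumerToken ctok(q);
//     int results[64];
//     size_t n = q.try_dequeue_bulk(ctok, results, 64);  // n is the count actually dequeued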
357struct ProducerToken;
358struct ConsumerToken;
359
360template<typename T, typename Traits> class ConcurrentQueue;
361template<typename T, typename Traits> class BlockingConcurrentQueue;
362class ConcurrentQueueTests;
363
364
365namespace details
366{
367 struct ConcurrentQueueProducerTypelessBase
368 {
369 ConcurrentQueueProducerTypelessBase* next;
370 std::atomic<bool> inactive;
371 ProducerToken* token;
372
373 ConcurrentQueueProducerTypelessBase()
374 : next(nullptr), inactive(false), token(nullptr)
375 {
376 }
377 };
378
379 template<bool use32> struct _hash_32_or_64 {
380 static inline std::uint32_t hash(std::uint32_t h)
381 {
382 // MurmurHash3 finalizer -- see https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp
383 // Since the thread ID is already unique, all we really want to do is propagate that
384 // uniqueness evenly across all the bits, so that we can use a subset of the bits while
385 // reducing collisions significantly
386 h ^= h >> 16;
387 h *= 0x85ebca6b;
388 h ^= h >> 13;
389 h *= 0xc2b2ae35;
390 return h ^ (h >> 16);
391 }
392 };
393 template<> struct _hash_32_or_64<1> {
394 static inline std::uint64_t hash(std::uint64_t h)
395 {
396 h ^= h >> 33;
397 h *= 0xff51afd7ed558ccd;
398 h ^= h >> 33;
399 h *= 0xc4ceb9fe1a85ec53;
400 return h ^ (h >> 33);
401 }
402 };
403 template<std::size_t size> struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> { };
404
405 static inline size_t hash_thread_id(thread_id_t id)
406 {
407 static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
		return static_cast<size_t>(hash_32_or_64<sizeof(thread_id_converter<thread_id_t>::thread_id_hash_t)>::hash(
			thread_id_converter<thread_id_t>::prehash(id)));
410 }
411
412 template<typename T>
413 static inline bool circular_less_than(T a, T b)
414 {
415#ifdef _MSC_VER
416#pragma warning(push)
417#pragma warning(disable: 4554)
418#endif
419 static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "circular_less_than is intended to be used only with unsigned integer types");
420 return static_cast<T>(a - b) > static_cast<T>(static_cast<T>(1) << static_cast<T>(sizeof(T) * CHAR_BIT - 1));
421#ifdef _MSC_VER
422#pragma warning(pop)
423#endif
424 }
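	// For example (illustrative): with T = std::uint8_t, circular_less_than<std::uint8_t>(250, 5)
	// is true, since the unsigned difference 250 - 5 == 245 exceeds half the value range (128);
	// that is, 5 is treated as being "ahead of" 250 once the index wraps around. Conversely,
	// circular_less_than<std::uint8_t>(5, 250) is false.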
425
426 template<typename U>
427 static inline char* align_for(char* ptr)
428 {
429 const std::size_t alignment = std::alignment_of<U>::value;
430 return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
431 }
432
433 template<typename T>
434 static inline T ceil_to_pow_2(T x)
435 {
436 static_assert(std::is_integral<T>::value && !std::numeric_limits<T>::is_signed, "ceil_to_pow_2 is intended to be used only with unsigned integer types");
437
438 // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
439 --x;
440 x |= x >> 1;
441 x |= x >> 2;
442 x |= x >> 4;
443 for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
444 x |= x >> (i << 3);
445 }
446 ++x;
447 return x;
448 }
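	// For example (illustrative): ceil_to_pow_2<std::uint32_t>(17) == 32,
	// ceil_to_pow_2<std::uint32_t>(32) == 32, and ceil_to_pow_2<std::uint32_t>(1) == 1.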
449
450 template<typename T>
451 static inline void swap_relaxed(std::atomic<T>& left, std::atomic<T>& right)
452 {
453 T temp = std::move(left.load(std::memory_order_relaxed));
454 left.store(std::move(right.load(std::memory_order_relaxed)), std::memory_order_relaxed);
455 right.store(std::move(temp), std::memory_order_relaxed);
456 }
457
458 template<typename T>
459 static inline T const& nomove(T const& x)
460 {
461 return x;
462 }
463
464 template<bool Enable>
465 struct nomove_if
466 {
467 template<typename T>
468 static inline T const& eval(T const& x)
469 {
470 return x;
471 }
472 };
473
474 template<>
475 struct nomove_if<false>
476 {
477 template<typename U>
478 static inline auto eval(U&& x)
479 -> decltype(std::forward<U>(x))
480 {
481 return std::forward<U>(x);
482 }
483 };
484
485 template<typename It>
486 static inline auto deref_noexcept(It& it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
487 {
488 return *it;
489 }
490
491#if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
492 template<typename T> struct is_trivially_destructible : std::is_trivially_destructible<T> { };
493#else
494 template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { };
495#endif
496
497#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
498#ifdef MCDBGQ_USE_RELACY
499 typedef RelacyThreadExitListener ThreadExitListener;
500 typedef RelacyThreadExitNotifier ThreadExitNotifier;
501#else
502 struct ThreadExitListener
503 {
504 typedef void (*callback_t)(void*);
505 callback_t callback;
506 void* userData;
507
508 ThreadExitListener* next; // reserved for use by the ThreadExitNotifier
509 };
510
511
512 class ThreadExitNotifier
513 {
514 public:
515 static void subscribe(ThreadExitListener* listener)
516 {
517 auto& tlsInst = instance();
518 listener->next = tlsInst.tail;
519 tlsInst.tail = listener;
520 }
521
522 static void unsubscribe(ThreadExitListener* listener)
523 {
524 auto& tlsInst = instance();
525 ThreadExitListener** prev = &tlsInst.tail;
526 for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
527 if (ptr == listener) {
528 *prev = ptr->next;
529 break;
530 }
531 prev = &ptr->next;
532 }
533 }
534
535 private:
536 ThreadExitNotifier() : tail(nullptr) { }
537 ThreadExitNotifier(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
538 ThreadExitNotifier& operator=(ThreadExitNotifier const&) MOODYCAMEL_DELETE_FUNCTION;
539
540 ~ThreadExitNotifier()
541 {
542 // This thread is about to exit, let everyone know!
543 assert(this == &instance() && "If this assert fails, you likely have a buggy compiler! Change the preprocessor conditions such that MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
544 for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
545 ptr->callback(ptr->userData);
546 }
547 }
548
549 // Thread-local
550 static inline ThreadExitNotifier& instance()
551 {
552 static thread_local ThreadExitNotifier notifier;
553 return notifier;
554 }
555
556 private:
557 ThreadExitListener* tail;
558 };
559#endif
560#endif
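	// Illustrative sketch of how the notifier above is used internally (only when
	// MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is defined; `on_thread_exit` and `state`
	// are hypothetical names, not part of this library):
	//
	//     static void on_thread_exit(void* userData) { /* recycle per-thread state */ }
	//
	//     ThreadExitListener listener;
	//     listener.callback = &on_thread_exit;
	//     listener.userData = &state;
	//     ThreadExitNotifier::subscribe(&listener);    // call on the owning thread
	//     // ... the callback fires automatically when that thread exits ...
	//     ThreadExitNotifier::unsubscribe(&listener);  // only if torn down before thread exit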
561
562 template<typename T> struct static_is_lock_free_num { enum { value = 0 }; };
563 template<> struct static_is_lock_free_num<signed char> { enum { value = ATOMIC_CHAR_LOCK_FREE }; };
564 template<> struct static_is_lock_free_num<short> { enum { value = ATOMIC_SHORT_LOCK_FREE }; };
565 template<> struct static_is_lock_free_num<int> { enum { value = ATOMIC_INT_LOCK_FREE }; };
566 template<> struct static_is_lock_free_num<long> { enum { value = ATOMIC_LONG_LOCK_FREE }; };
567 template<> struct static_is_lock_free_num<long long> { enum { value = ATOMIC_LLONG_LOCK_FREE }; };
568 template<typename T> struct static_is_lock_free : static_is_lock_free_num<typename std::make_signed<T>::type> { };
569 template<> struct static_is_lock_free<bool> { enum { value = ATOMIC_BOOL_LOCK_FREE }; };
570 template<typename U> struct static_is_lock_free<U*> { enum { value = ATOMIC_POINTER_LOCK_FREE }; };
571}
572
573
574struct ProducerToken
575{
576 template<typename T, typename Traits>
577 explicit ProducerToken(ConcurrentQueue<T, Traits>& queue);
578
579 template<typename T, typename Traits>
580 explicit ProducerToken(BlockingConcurrentQueue<T, Traits>& queue);
581
582 ProducerToken(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
583 : producer(other.producer)
584 {
585 other.producer = nullptr;
586 if (producer != nullptr) {
587 producer->token = this;
588 }
589 }
590
591 inline ProducerToken& operator=(ProducerToken&& other) MOODYCAMEL_NOEXCEPT
592 {
593 swap(other);
594 return *this;
595 }
596
597 void swap(ProducerToken& other) MOODYCAMEL_NOEXCEPT
598 {
		std::swap(producer, other.producer);
600 if (producer != nullptr) {
601 producer->token = this;
602 }
603 if (other.producer != nullptr) {
604 other.producer->token = &other;
605 }
606 }
607
608 // A token is always valid unless:
609 // 1) Memory allocation failed during construction
610 // 2) It was moved via the move constructor
611 // (Note: assignment does a swap, leaving both potentially valid)
612 // 3) The associated queue was destroyed
613 // Note that if valid() returns true, that only indicates
614 // that the token is valid for use with a specific queue,
615 // but not which one; that's up to the user to track.
616 inline bool valid() const { return producer != nullptr; }
617
618 ~ProducerToken()
619 {
620 if (producer != nullptr) {
621 producer->token = nullptr;
			producer->inactive.store(true, std::memory_order_release);
623 }
624 }
625
626 // Disable copying and assignment
627 ProducerToken(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
628 ProducerToken& operator=(ProducerToken const&) MOODYCAMEL_DELETE_FUNCTION;
629
630private:
631 template<typename T, typename Traits> friend class ConcurrentQueue;
632 friend class ConcurrentQueueTests;
633
634protected:
635 details::ConcurrentQueueProducerTypelessBase* producer;
636};
637
638
639struct ConsumerToken
640{
641 template<typename T, typename Traits>
642 explicit ConsumerToken(ConcurrentQueue<T, Traits>& q);
643
644 template<typename T, typename Traits>
645 explicit ConsumerToken(BlockingConcurrentQueue<T, Traits>& q);
646
647 ConsumerToken(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
648 : initialOffset(other.initialOffset), lastKnownGlobalOffset(other.lastKnownGlobalOffset), itemsConsumedFromCurrent(other.itemsConsumedFromCurrent), currentProducer(other.currentProducer), desiredProducer(other.desiredProducer)
649 {
650 }
651
652 inline ConsumerToken& operator=(ConsumerToken&& other) MOODYCAMEL_NOEXCEPT
653 {
654 swap(other);
655 return *this;
656 }
657
658 void swap(ConsumerToken& other) MOODYCAMEL_NOEXCEPT
659 {
		std::swap(initialOffset, other.initialOffset);
		std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
		std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
		std::swap(currentProducer, other.currentProducer);
		std::swap(desiredProducer, other.desiredProducer);
665 }
666
667 // Disable copying and assignment
668 ConsumerToken(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
669 ConsumerToken& operator=(ConsumerToken const&) MOODYCAMEL_DELETE_FUNCTION;
670
671private:
672 template<typename T, typename Traits> friend class ConcurrentQueue;
673 friend class ConcurrentQueueTests;
674
675private: // but shared with ConcurrentQueue
676 std::uint32_t initialOffset;
677 std::uint32_t lastKnownGlobalOffset;
678 std::uint32_t itemsConsumedFromCurrent;
679 details::ConcurrentQueueProducerTypelessBase* currentProducer;
680 details::ConcurrentQueueProducerTypelessBase* desiredProducer;
681};
682
683// Need to forward-declare this swap because it's in a namespace.
684// See http://stackoverflow.com/questions/4492062/why-does-a-c-friend-class-need-a-forward-declaration-only-in-other-namespaces
685template<typename T, typename Traits>
686inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT;
687
688
689template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
690class ConcurrentQueue
691{
692public:
693 typedef ::duckdb_moodycamel::ProducerToken producer_token_t;
694 typedef ::duckdb_moodycamel::ConsumerToken consumer_token_t;
695
696 typedef typename Traits::index_t index_t;
697 typedef typename Traits::size_t size_t;
698
699 static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
700 static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
701 static const size_t EXPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
702 static const size_t IMPLICIT_INITIAL_INDEX_SIZE = static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
703 static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
704 static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = static_cast<std::uint32_t>(Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
705#ifdef _MSC_VER
706#pragma warning(push)
707#pragma warning(disable: 4307) // + integral constant overflow (that's what the ternary expression is for!)
708#pragma warning(disable: 4309) // static_cast: Truncation of constant value
709#endif
710 static const size_t MAX_SUBQUEUE_SIZE = (details::const_numeric_max<size_t>::value - static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) < BLOCK_SIZE) ? details::const_numeric_max<size_t>::value : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) + (BLOCK_SIZE - 1)) / BLOCK_SIZE * BLOCK_SIZE);
711#ifdef _MSC_VER
712#pragma warning(pop)
713#endif
714
715 static_assert(!std::numeric_limits<size_t>::is_signed && std::is_integral<size_t>::value, "Traits::size_t must be an unsigned integral type");
716 static_assert(!std::numeric_limits<index_t>::is_signed && std::is_integral<index_t>::value, "Traits::index_t must be an unsigned integral type");
717 static_assert(sizeof(index_t) >= sizeof(size_t), "Traits::index_t must be at least as wide as Traits::size_t");
718 static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)), "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
719 static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) && !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD & (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)), "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a power of 2 (and greater than 1)");
720 static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) && !(EXPLICIT_INITIAL_INDEX_SIZE & (EXPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
721 static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) && !(IMPLICIT_INITIAL_INDEX_SIZE & (IMPLICIT_INITIAL_INDEX_SIZE - 1)), "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and greater than 1)");
722 static_assert((INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) || !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE & (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)), "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
723 static_assert(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 || INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1, "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least 1 (or 0 to disable implicit enqueueing)");
724
725public:
726 // Creates a queue with at least `capacity` element slots; note that the
727 // actual number of elements that can be inserted without additional memory
728 // allocation depends on the number of producers and the block size (e.g. if
729 // the block size is equal to `capacity`, only a single block will be allocated
730 // up-front, which means only a single producer will be able to enqueue elements
731 // without an extra allocation -- blocks aren't shared between producers).
732 // This method is not thread safe -- it is up to the user to ensure that the
733 // queue is fully constructed before it starts being used by other threads (this
734 // includes making the memory effects of construction visible, possibly with a
735 // memory barrier).
736 explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
737 : producerListTail(nullptr),
738 producerCount(0),
739 initialBlockPoolIndex(0),
740 nextExplicitConsumerId(0),
741 globalExplicitConsumerOffset(0)
742 {
		implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
		populate_initial_implicit_producer_hash();
		populate_initial_block_list(capacity / BLOCK_SIZE + ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
746
747#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
748 // Track all the producers using a fully-resolved typed list for
749 // each kind; this makes it possible to debug them starting from
750 // the root queue object (otherwise wacky casts are needed that
751 // don't compile in the debugger's expression evaluator).
752 explicitProducers.store(nullptr, std::memory_order_relaxed);
753 implicitProducers.store(nullptr, std::memory_order_relaxed);
754#endif
755 }
756
757 // Computes the correct amount of pre-allocated blocks for you based
758 // on the minimum number of elements you want available at any given
759 // time, and the maximum concurrent number of each type of producer.
760 ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
761 : producerListTail(nullptr),
762 producerCount(0),
763 initialBlockPoolIndex(0),
764 nextExplicitConsumerId(0),
765 globalExplicitConsumerOffset(0)
766 {
		implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
		populate_initial_implicit_producer_hash();
		size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) * (maxExplicitProducers + 1) + 2 * (maxExplicitProducers + maxImplicitProducers);
		populate_initial_block_list(blocks);
771
772#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
773 explicitProducers.store(nullptr, std::memory_order_relaxed);
774 implicitProducers.store(nullptr, std::memory_order_relaxed);
775#endif
776 }
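	// For example (illustrative only): pre-size a queue for at least 1024 elements at
	// a time, fed by up to 4 explicit (token-based) producers and up to 2 implicit ones:
	//
	//     duckdb_moodycamel::ConcurrentQueue<int> q(1024, 4, 2);
	//
	// These figures only control the up-front block allocation; exceeding them later is
	// fine and simply triggers additional allocations (unless only the try_enqueue
	// variants are used).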
777
778 // Note: The queue should not be accessed concurrently while it's
779 // being deleted. It's up to the user to synchronize this.
780 // This method is not thread safe.
781 ~ConcurrentQueue()
782 {
783 // Destroy producers
784 auto ptr = producerListTail.load(std::memory_order_relaxed);
785 while (ptr != nullptr) {
786 auto next = ptr->next_prod();
787 if (ptr->token != nullptr) {
788 ptr->token->producer = nullptr;
789 }
790 destroy(ptr);
791 ptr = next;
792 }
793
794 // Destroy implicit producer hash tables
795 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE != 0) {
796 auto hash = implicitProducerHash.load(std::memory_order_relaxed);
797 while (hash != nullptr) {
798 auto prev = hash->prev;
799 if (prev != nullptr) { // The last hash is part of this object and was not allocated dynamically
800 for (size_t i = 0; i != hash->capacity; ++i) {
801 hash->entries[i].~ImplicitProducerKVP();
802 }
803 hash->~ImplicitProducerHash();
804 (Traits::free)(hash);
805 }
806 hash = prev;
807 }
808 }
809
810 // Destroy global free list
811 auto block = freeList.head_unsafe();
812 while (block != nullptr) {
813 auto next = block->freeListNext.load(std::memory_order_relaxed);
814 if (block->dynamicallyAllocated) {
815 destroy(block);
816 }
817 block = next;
818 }
819
820 // Destroy initial free list
821 destroy_array(initialBlockPool, initialBlockPoolSize);
822 }
823
824 // Disable copying and copy assignment
825 ConcurrentQueue(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
826 ConcurrentQueue& operator=(ConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
827
828 // Moving is supported, but note that it is *not* a thread-safe operation.
829 // Nobody can use the queue while it's being moved, and the memory effects
830 // of that move must be propagated to other threads before they can use it.
831 // Note: When a queue is moved, its tokens are still valid but can only be
832 // used with the destination queue (i.e. semantically they are moved along
833 // with the queue itself).
834 ConcurrentQueue(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
835 : producerListTail(other.producerListTail.load(std::memory_order_relaxed)),
836 producerCount(other.producerCount.load(std::memory_order_relaxed)),
837 initialBlockPoolIndex(other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
838 initialBlockPool(other.initialBlockPool),
839 initialBlockPoolSize(other.initialBlockPoolSize),
840 freeList(std::move(other.freeList)),
841 nextExplicitConsumerId(other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
842 globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(std::memory_order_relaxed))
843 {
844 // Move the other one into this, and leave the other one as an empty queue
		implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
846 populate_initial_implicit_producer_hash();
847 swap_implicit_producer_hashes(other);
848
849 other.producerListTail.store(nullptr, std::memory_order_relaxed);
850 other.producerCount.store(0, std::memory_order_relaxed);
851 other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
852 other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
853
854#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
855 explicitProducers.store(other.explicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
856 other.explicitProducers.store(nullptr, std::memory_order_relaxed);
857 implicitProducers.store(other.implicitProducers.load(std::memory_order_relaxed), std::memory_order_relaxed);
858 other.implicitProducers.store(nullptr, std::memory_order_relaxed);
859#endif
860
861 other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
862 other.initialBlockPoolSize = 0;
863 other.initialBlockPool = nullptr;
864
865 reown_producers();
866 }
867
868 inline ConcurrentQueue& operator=(ConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
869 {
870 return swap_internal(other);
871 }
872
873 // Swaps this queue's state with the other's. Not thread-safe.
874 // Swapping two queues does not invalidate their tokens, however
875 // the tokens that were created for one queue must be used with
876 // only the swapped queue (i.e. the tokens are tied to the
877 // queue's movable state, not the object itself).
878 inline void swap(ConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
879 {
880 swap_internal(other);
881 }
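	// For example (illustrative only): tokens follow the queue's internal state, so after
	// a move or swap they must be used with the destination queue.
	//
	//     duckdb_moodycamel::ConcurrentQueue<int> a;
	//     duckdb_moodycamel::ProducerToken tok(a);
	//     duckdb_moodycamel::ConcurrentQueue<int> b(std::move(a));
	//     b.enqueue(tok, 42);   // OK: tok is now tied to b; a was left as an empty queue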
882
883private:
884 ConcurrentQueue& swap_internal(ConcurrentQueue& other)
885 {
886 if (this == &other) {
887 return *this;
888 }
889
890 details::swap_relaxed(producerListTail, other.producerListTail);
891 details::swap_relaxed(producerCount, other.producerCount);
892 details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
893 std::swap(initialBlockPool, other.initialBlockPool);
894 std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
895 freeList.swap(other.freeList);
896 details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
897 details::swap_relaxed(globalExplicitConsumerOffset, other.globalExplicitConsumerOffset);
898
899 swap_implicit_producer_hashes(other);
900
901 reown_producers();
902 other.reown_producers();
903
904#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
905 details::swap_relaxed(explicitProducers, other.explicitProducers);
906 details::swap_relaxed(implicitProducers, other.implicitProducers);
907#endif
908
909 return *this;
910 }
911
912public:
913 // Enqueues a single item (by copying it).
914 // Allocates memory if required. Only fails if memory allocation fails (or implicit
915 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
916 // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
917 // Thread-safe.
918 inline bool enqueue(T const& item)
919 {
920 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
921 else return inner_enqueue<CanAlloc>(item);
922 }
923
924 // Enqueues a single item (by moving it, if possible).
925 // Allocates memory if required. Only fails if memory allocation fails (or implicit
926 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
927 // or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
928 // Thread-safe.
929 inline bool enqueue(T&& item)
930 {
931 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
932 else return inner_enqueue<CanAlloc>(std::move(item));
933 }
934
935 // Enqueues a single item (by copying it) using an explicit producer token.
936 // Allocates memory if required. Only fails if memory allocation fails (or
937 // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
938 // Thread-safe.
939 inline bool enqueue(producer_token_t const& token, T const& item)
940 {
941 return inner_enqueue<CanAlloc>(token, item);
942 }
943
944 // Enqueues a single item (by moving it, if possible) using an explicit producer token.
945 // Allocates memory if required. Only fails if memory allocation fails (or
946 // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
947 // Thread-safe.
948 inline bool enqueue(producer_token_t const& token, T&& item)
949 {
950 return inner_enqueue<CanAlloc>(token, std::move(item));
951 }
952
953 // Enqueues several items.
954 // Allocates memory if required. Only fails if memory allocation fails (or
955 // implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
956 // is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
957 // Note: Use std::make_move_iterator if the elements should be moved instead of copied.
958 // Thread-safe.
959 template<typename It>
960 bool enqueue_bulk(It itemFirst, size_t count)
961 {
962 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
963 else return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
964 }
965
966 // Enqueues several items using an explicit producer token.
967 // Allocates memory if required. Only fails if memory allocation fails
968 // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
969 // Note: Use std::make_move_iterator if the elements should be moved
970 // instead of copied.
971 // Thread-safe.
972 template<typename It>
973 bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
974 {
975 return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
976 }
977
978 // Enqueues a single item (by copying it).
979 // Does not allocate memory. Fails if not enough room to enqueue (or implicit
980 // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
981 // is 0).
982 // Thread-safe.
983 inline bool try_enqueue(T const& item)
984 {
985 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
986 else return inner_enqueue<CannotAlloc>(item);
987 }
988
989 // Enqueues a single item (by moving it, if possible).
990 // Does not allocate memory (except for one-time implicit producer).
991 // Fails if not enough room to enqueue (or implicit production is
992 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
993 // Thread-safe.
994 inline bool try_enqueue(T&& item)
995 {
996 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
997 else return inner_enqueue<CannotAlloc>(std::move(item));
998 }
999
1000 // Enqueues a single item (by copying it) using an explicit producer token.
1001 // Does not allocate memory. Fails if not enough room to enqueue.
1002 // Thread-safe.
1003 inline bool try_enqueue(producer_token_t const& token, T const& item)
1004 {
1005 return inner_enqueue<CannotAlloc>(token, item);
1006 }
1007
1008 // Enqueues a single item (by moving it, if possible) using an explicit producer token.
1009 // Does not allocate memory. Fails if not enough room to enqueue.
1010 // Thread-safe.
1011 inline bool try_enqueue(producer_token_t const& token, T&& item)
1012 {
1013 return inner_enqueue<CannotAlloc>(token, std::move(item));
1014 }
1015
1016 // Enqueues several items.
1017 // Does not allocate memory (except for one-time implicit producer).
1018 // Fails if not enough room to enqueue (or implicit production is
1019 // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1020 // Note: Use std::make_move_iterator if the elements should be moved
1021 // instead of copied.
1022 // Thread-safe.
1023 template<typename It>
1024 bool try_enqueue_bulk(It itemFirst, size_t count)
1025 {
1026 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1027 else return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
1028 }
1029
1030 // Enqueues several items using an explicit producer token.
1031 // Does not allocate memory. Fails if not enough room to enqueue.
1032 // Note: Use std::make_move_iterator if the elements should be moved
1033 // instead of copied.
1034 // Thread-safe.
1035 template<typename It>
1036 bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1037 {
1038 return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
1039 }
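	// A quick comparison of the variants above (illustrative only; assumes <string> and
	// <iterator> are available):
	//
	//     duckdb_moodycamel::ConcurrentQueue<std::string> q;
	//
	//     q.enqueue("alpha");               // may allocate a new block; see the comments above for the exact failure conditions
	//     bool ok = q.try_enqueue("beta");  // never allocates a new block; may return false if there's no room
	//
	//     std::string batch[3] = {"a", "b", "c"};
	//     q.enqueue_bulk(std::make_move_iterator(batch), 3);  // moves the strings into the queue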
1040
1041
1042
1043 // Attempts to dequeue from the queue.
1044 // Returns false if all producer streams appeared empty at the time they
1045 // were checked (so, the queue is likely but not guaranteed to be empty).
1046 // Never allocates. Thread-safe.
1047 template<typename U>
1048 bool try_dequeue(U& item)
1049 {
1050 // Instead of simply trying each producer in turn (which could cause needless contention on the first
1051 // producer), we score them heuristically.
1052 size_t nonEmptyCount = 0;
1053 ProducerBase* best = nullptr;
1054 size_t bestSize = 0;
1055 for (auto ptr = producerListTail.load(std::memory_order_acquire); nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
1056 auto size = ptr->size_approx();
1057 if (size > 0) {
1058 if (size > bestSize) {
1059 bestSize = size;
1060 best = ptr;
1061 }
1062 ++nonEmptyCount;
1063 }
1064 }
1065
1066 // If there was at least one non-empty queue but it appears empty at the time
1067 // we try to dequeue from it, we need to make sure every queue's been tried
1068 if (nonEmptyCount > 0) {
			if ((details::likely)(best->dequeue(item))) {
1070 return true;
1071 }
1072 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1073 if (ptr != best && ptr->dequeue(item)) {
1074 return true;
1075 }
1076 }
1077 }
1078 return false;
1079 }
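	// Typical polling usage (illustrative only; `process` is a hypothetical consumer
	// function). A false return means every producer stream looked empty when it was
	// checked, not that the queue is guaranteed to be empty.
	//
	//     duckdb_moodycamel::ConcurrentQueue<int> q;
	//     int item;
	//     while (q.try_dequeue(item)) {
	//         process(item);
	//     }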
1080
1081 // Attempts to dequeue from the queue.
1082 // Returns false if all producer streams appeared empty at the time they
1083 // were checked (so, the queue is likely but not guaranteed to be empty).
1084 // This differs from the try_dequeue(item) method in that this one does
1085 // not attempt to reduce contention by interleaving the order that producer
1086 // streams are dequeued from. So, using this method can reduce overall throughput
1087 // under contention, but will give more predictable results in single-threaded
1088 // consumer scenarios. This is mostly only useful for internal unit tests.
1089 // Never allocates. Thread-safe.
1090 template<typename U>
1091 bool try_dequeue_non_interleaved(U& item)
1092 {
1093 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1094 if (ptr->dequeue(item)) {
1095 return true;
1096 }
1097 }
1098 return false;
1099 }
1100
1101 // Attempts to dequeue from the queue using an explicit consumer token.
1102 // Returns false if all producer streams appeared empty at the time they
1103 // were checked (so, the queue is likely but not guaranteed to be empty).
1104 // Never allocates. Thread-safe.
1105 template<typename U>
1106 bool try_dequeue(consumer_token_t& token, U& item)
1107 {
1108 // The idea is roughly as follows:
1109 // Every 256 items from one producer, make everyone rotate (increase the global offset) -> this means the highest efficiency consumer dictates the rotation speed of everyone else, more or less
1110 // If you see that the global offset has changed, you must reset your consumption counter and move to your designated place
1111 // If there's no items where you're supposed to be, keep moving until you find a producer with some items
		// If the global offset has not changed but you've run out of items to consume, move over from your current position until you find a producer with something in it

		if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1115 if (!update_current_producer_after_rotation(token)) {
1116 return false;
1117 }
1118 }
1119
1120 // If there was at least one non-empty queue but it appears empty at the time
1121 // we try to dequeue from it, we need to make sure every queue's been tried
1122 if (static_cast<ProducerBase*>(token.currentProducer)->dequeue(item)) {
1123 if (++token.itemsConsumedFromCurrent == EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
				globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1125 }
1126 return true;
1127 }
1128
1129 auto tail = producerListTail.load(std::memory_order_acquire);
1130 auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
1131 if (ptr == nullptr) {
1132 ptr = tail;
1133 }
1134 while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
1135 if (ptr->dequeue(item)) {
1136 token.currentProducer = ptr;
1137 token.itemsConsumedFromCurrent = 1;
1138 return true;
1139 }
1140 ptr = ptr->next_prod();
1141 if (ptr == nullptr) {
1142 ptr = tail;
1143 }
1144 }
1145 return false;
1146 }
1147
1148 // Attempts to dequeue several elements from the queue.
1149 // Returns the number of items actually dequeued.
1150 // Returns 0 if all producer streams appeared empty at the time they
1151 // were checked (so, the queue is likely but not guaranteed to be empty).
1152 // Never allocates. Thread-safe.
1153 template<typename It>
1154 size_t try_dequeue_bulk(It itemFirst, size_t max)
1155 {
1156 size_t count = 0;
1157 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1158 count += ptr->dequeue_bulk(itemFirst, max - count);
1159 if (count == max) {
1160 break;
1161 }
1162 }
1163 return count;
1164 }
1165
1166 // Attempts to dequeue several elements from the queue using an explicit consumer token.
1167 // Returns the number of items actually dequeued.
1168 // Returns 0 if all producer streams appeared empty at the time they
1169 // were checked (so, the queue is likely but not guaranteed to be empty).
1170 // Never allocates. Thread-safe.
1171 template<typename It>
1172 size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
1173 {
		if (token.desiredProducer == nullptr || token.lastKnownGlobalOffset != globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1175 if (!update_current_producer_after_rotation(token)) {
1176 return 0;
1177 }
1178 }
1179
1180 size_t count = static_cast<ProducerBase*>(token.currentProducer)->dequeue_bulk(itemFirst, max);
1181 if (count == max) {
1182 if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >= EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
				globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1184 }
1185 return max;
1186 }
1187 token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
1188 max -= count;
1189
1190 auto tail = producerListTail.load(std::memory_order_acquire);
1191 auto ptr = static_cast<ProducerBase*>(token.currentProducer)->next_prod();
1192 if (ptr == nullptr) {
1193 ptr = tail;
1194 }
1195 while (ptr != static_cast<ProducerBase*>(token.currentProducer)) {
1196 auto dequeued = ptr->dequeue_bulk(itemFirst, max);
1197 count += dequeued;
1198 if (dequeued != 0) {
1199 token.currentProducer = ptr;
1200 token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
1201 }
1202 if (dequeued == max) {
1203 break;
1204 }
1205 max -= dequeued;
1206 ptr = ptr->next_prod();
1207 if (ptr == nullptr) {
1208 ptr = tail;
1209 }
1210 }
1211 return count;
1212 }
1213
1214
1215
1216 // Attempts to dequeue from a specific producer's inner queue.
1217 // If you happen to know which producer you want to dequeue from, this
1218 // is significantly faster than using the general-case try_dequeue methods.
1219 // Returns false if the producer's queue appeared empty at the time it
1220 // was checked (so, the queue is likely but not guaranteed to be empty).
1221 // Never allocates. Thread-safe.
1222 template<typename U>
1223 inline bool try_dequeue_from_producer(producer_token_t const& producer, U& item)
1224 {
1225 return static_cast<ExplicitProducer*>(producer.producer)->dequeue(item);
1226 }
1227
1228 // Attempts to dequeue several elements from a specific producer's inner queue.
1229 // Returns the number of items actually dequeued.
1230 // If you happen to know which producer you want to dequeue from, this
1231 // is significantly faster than using the general-case try_dequeue methods.
1232 // Returns 0 if the producer's queue appeared empty at the time it
1233 // was checked (so, the queue is likely but not guaranteed to be empty).
1234 // Never allocates. Thread-safe.
1235 template<typename It>
1236 inline size_t try_dequeue_bulk_from_producer(producer_token_t const& producer, It itemFirst, size_t max)
1237 {
1238 return static_cast<ExplicitProducer*>(producer.producer)->dequeue_bulk(itemFirst, max);
1239 }
1240
1241
1242 // Returns an estimate of the total number of elements currently in the queue. This
1243 // estimate is only accurate if the queue has completely stabilized before it is called
1244 // (i.e. all enqueue and dequeue operations have completed and their memory effects are
1245 // visible on the calling thread, and no further operations start while this method is
1246 // being called).
1247 // Thread-safe.
1248 size_t size_approx() const
1249 {
1250 size_t size = 0;
1251 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
1252 size += ptr->size_approx();
1253 }
1254 return size;
1255 }
1256
1257
1258 // Returns true if the underlying atomic variables used by
1259 // the queue are lock-free (they should be on most platforms).
1260 // Thread-safe.
1261 static bool is_lock_free()
1262 {
1263 return
1264 details::static_is_lock_free<bool>::value == 2 &&
1265 details::static_is_lock_free<size_t>::value == 2 &&
1266 details::static_is_lock_free<std::uint32_t>::value == 2 &&
1267 details::static_is_lock_free<index_t>::value == 2 &&
1268 details::static_is_lock_free<void*>::value == 2 &&
1269 details::static_is_lock_free<typename details::thread_id_converter<details::thread_id_t>::thread_id_numeric_size_t>::value == 2;
1270 }
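	// For example (illustrative only), a debug-build check that the queue will not fall
	// back to lock-based atomics on the target platform:
	//
	//     assert(duckdb_moodycamel::ConcurrentQueue<int>::is_lock_free());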
1271
1272
1273private:
1274 friend struct ProducerToken;
1275 friend struct ConsumerToken;
1276 struct ExplicitProducer;
1277 friend struct ExplicitProducer;
1278 struct ImplicitProducer;
1279 friend struct ImplicitProducer;
1280 friend class ConcurrentQueueTests;
1281
1282 enum AllocationMode { CanAlloc, CannotAlloc };
1283
1284
1285 ///////////////////////////////
1286 // Queue methods
1287 ///////////////////////////////
1288
1289 template<AllocationMode canAlloc, typename U>
1290 inline bool inner_enqueue(producer_token_t const& token, U&& element)
1291 {
1292 return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1293 }
1294
1295 template<AllocationMode canAlloc, typename U>
1296 inline bool inner_enqueue(U&& element)
1297 {
1298 auto producer = get_or_add_implicit_producer();
1299 return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue<canAlloc>(std::forward<U>(element));
1300 }
1301
1302 template<AllocationMode canAlloc, typename It>
1303 inline bool inner_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
1304 {
1305 return static_cast<ExplicitProducer*>(token.producer)->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1306 }
1307
1308 template<AllocationMode canAlloc, typename It>
1309 inline bool inner_enqueue_bulk(It itemFirst, size_t count)
1310 {
1311 auto producer = get_or_add_implicit_producer();
1312 return producer == nullptr ? false : producer->ConcurrentQueue::ImplicitProducer::template enqueue_bulk<canAlloc>(itemFirst, count);
1313 }
1314
1315 inline bool update_current_producer_after_rotation(consumer_token_t& token)
1316 {
1317 // Ah, there's been a rotation, figure out where we should be!
1318 auto tail = producerListTail.load(std::memory_order_acquire);
1319 if (token.desiredProducer == nullptr && tail == nullptr) {
1320 return false;
1321 }
1322 auto prodCount = producerCount.load(std::memory_order_relaxed);
1323 auto globalOffset = globalExplicitConsumerOffset.load(std::memory_order_relaxed);
1324 if (token.desiredProducer == nullptr) {
1325 // Aha, first time we're dequeueing anything.
1326 // Figure out our local position
1327 // Note: offset is from start, not end, but we're traversing from end -- subtract from count first
1328 std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
1329 token.desiredProducer = tail;
1330 for (std::uint32_t i = 0; i != offset; ++i) {
1331 token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
1332 if (token.desiredProducer == nullptr) {
1333 token.desiredProducer = tail;
1334 }
1335 }
1336 }
1337
1338 std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
1339 if (delta >= prodCount) {
1340 delta = delta % prodCount;
1341 }
1342 for (std::uint32_t i = 0; i != delta; ++i) {
1343 token.desiredProducer = static_cast<ProducerBase*>(token.desiredProducer)->next_prod();
1344 if (token.desiredProducer == nullptr) {
1345 token.desiredProducer = tail;
1346 }
1347 }
1348
1349 token.lastKnownGlobalOffset = globalOffset;
1350 token.currentProducer = token.desiredProducer;
1351 token.itemsConsumedFromCurrent = 0;
1352 return true;
1353 }
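	// Worked example for update_current_producer_after_rotation above (illustrative numbers):
	// with prodCount == 4, a token created with initialOffset == 6 gets
	// offset = 4 - 1 - (6 % 4) = 1, i.e. it starts at the producer one next_prod() hop past
	// producerListTail. If globalOffset has since advanced by 5 over lastKnownGlobalOffset,
	// delta = 5 % 4 = 1, so the token rotates forward by one more producer, wrapping back to
	// the tail when it walks off the end of the list.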
1354
1355
1356 ///////////////////////////
1357 // Free list
1358 ///////////////////////////
1359
1360 template <typename N>
1361 struct FreeListNode
1362 {
1363 FreeListNode() : freeListRefs(0), freeListNext(nullptr) { }
1364
1365 std::atomic<std::uint32_t> freeListRefs;
1366 std::atomic<N*> freeListNext;
1367 };
1368
1369 // A simple CAS-based lock-free free list. Not the fastest thing in the world under heavy contention, but
1370 // simple and correct (assuming nodes are never freed until after the free list is destroyed), and fairly
1371 // speedy under low contention.
1372 template<typename N> // N must inherit FreeListNode or have the same fields (and initialization of them)
1373 struct FreeList
1374 {
1375 FreeList() : freeListHead(nullptr) { }
1376 FreeList(FreeList&& other) : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) { other.freeListHead.store(nullptr, std::memory_order_relaxed); }
1377 void swap(FreeList& other) { details::swap_relaxed(freeListHead, other.freeListHead); }
1378
1379 FreeList(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
1380 FreeList& operator=(FreeList const&) MOODYCAMEL_DELETE_FUNCTION;
1381
1382 inline void add(N* node)
1383 {
1384#ifdef MCDBGQ_NOLOCKFREE_FREELIST
1385 debug::DebugLock lock(mutex);
1386#endif
1387 // We know that the should-be-on-freelist bit is 0 at this point, so it's safe to
1388 // set it using a fetch_add
1389 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST, std::memory_order_acq_rel) == 0) {
1390 // Oh look! We were the last ones referencing this node, and we know
1391 // we want to add it to the free list, so let's do it!
1392 add_knowing_refcount_is_zero(node);
1393 }
1394 }
1395
1396 inline N* try_get()
1397 {
1398#ifdef MCDBGQ_NOLOCKFREE_FREELIST
1399 debug::DebugLock lock(mutex);
1400#endif
1401 auto head = freeListHead.load(std::memory_order_acquire);
1402 while (head != nullptr) {
1403 auto prevHead = head;
1404 auto refs = head->freeListRefs.load(std::memory_order_relaxed);
1405 if ((refs & REFS_MASK) == 0 || !head->freeListRefs.compare_exchange_strong(refs, refs + 1, std::memory_order_acquire, std::memory_order_relaxed)) {
1406 head = freeListHead.load(std::memory_order_acquire);
1407 continue;
1408 }
1409
1410 // Good, reference count has been incremented (it wasn't at zero), which means we can read the
1411 // next and not worry about it changing between now and the time we do the CAS
1412 auto next = head->freeListNext.load(std::memory_order_relaxed);
1413 if (freeListHead.compare_exchange_strong(head, next, std::memory_order_acquire, std::memory_order_relaxed)) {
1414 // Yay, got the node. This means it was on the list, which means shouldBeOnFreeList must be false no
1415 // matter the refcount (because nobody else knows it's been taken off yet, it can't have been put back on).
1416 assert((head->freeListRefs.load(std::memory_order_relaxed) & SHOULD_BE_ON_FREELIST) == 0);
1417
1418 // Decrease refcount twice, once for our ref, and once for the list's ref
1419 head->freeListRefs.fetch_sub(2, std::memory_order_release);
1420 return head;
1421 }
1422
1423 // OK, the head must have changed on us, but we still need to decrease the refcount we increased.
1424 // Note that we don't need to release any memory effects, but we do need to ensure that the reference
1425 // count decrement happens-after the CAS on the head.
1426 refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
1427 if (refs == SHOULD_BE_ON_FREELIST + 1) {
1428 add_knowing_refcount_is_zero(prevHead);
1429 }
1430 }
1431
1432 return nullptr;
1433 }
1434
1435 // Useful for traversing the list when there's no contention (e.g. to destroy remaining nodes)
1436 N* head_unsafe() const { return freeListHead.load(std::memory_order_relaxed); }
1437
1438 private:
1439 inline void add_knowing_refcount_is_zero(N* node)
1440 {
1441 // Since the refcount is zero, and nobody can increase it once it's zero (except us, and we run
1442 // only one copy of this method per node at a time, i.e. the single thread case), then we know
1443 // we can safely change the next pointer of the node; however, once the refcount is back above
1444 // zero, then other threads could increase it (happens under heavy contention, when the refcount
1445 // goes to zero in between a load and a refcount increment of a node in try_get, then back up to
1446 // something non-zero, then the refcount increment is done by the other thread) -- so, if the CAS
1447 // to add the node to the actual list fails, decrease the refcount and leave the add operation to
1448 // the next thread who puts the refcount back at zero (which could be us, hence the loop).
1449 auto head = freeListHead.load(std::memory_order_relaxed);
1450 while (true) {
1451 node->freeListNext.store(head, std::memory_order_relaxed);
1452 node->freeListRefs.store(1, std::memory_order_release);
1453 if (!freeListHead.compare_exchange_strong(head, node, std::memory_order_release, std::memory_order_relaxed)) {
1454 // Hmm, the add failed, but we can only try again when the refcount goes back to zero
1455 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1, std::memory_order_release) == 1) {
1456 continue;
1457 }
1458 }
1459 return;
1460 }
1461 }
1462
1463 private:
1464 // Implemented like a stack, but where node order doesn't matter (nodes are inserted out of order under contention)
1465 std::atomic<N*> freeListHead;
1466
1467 static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
1468 static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
1469
1470#ifdef MCDBGQ_NOLOCKFREE_FREELIST
1471 debug::DebugMutex mutex;
1472#endif
1473 };
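	// A minimal sketch of how the free list above is used (illustrative only; the node type must
	// supply freeListRefs/freeListNext with the same semantics, e.g. by inheriting FreeListNode):
	//
	//     struct Node : FreeListNode<Node> { int payload; };
	//     FreeList<Node> freeList;
	//     Node n;
	//     freeList.add(&n);                  // hand the node over for later reuse
	//     Node* reused = freeList.try_get(); // nullptr if the list appeared empty
	//
	// Within this queue, Block plays the role of Node (it carries freeListRefs/freeListNext
	// directly rather than inheriting FreeListNode).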
1474
1475
1476 ///////////////////////////
1477 // Block
1478 ///////////////////////////
1479
1480 enum InnerQueueContext { implicit_context = 0, explicit_context = 1 };
1481
1482 struct Block
1483 {
1484 Block()
1485 : next(nullptr), elementsCompletelyDequeued(0), freeListRefs(0), freeListNext(nullptr), shouldBeOnFreeList(false), dynamicallyAllocated(true)
1486 {
1487#ifdef MCDBGQ_TRACKMEM
1488 owner = nullptr;
1489#endif
1490 }
1491
1492 template<InnerQueueContext context>
1493 inline bool is_empty() const
1494 {
1495 MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1496 // Check flags
1497 for (size_t i = 0; i < BLOCK_SIZE; ++i) {
1498 if (!emptyFlags[i].load(std::memory_order_relaxed)) {
1499 return false;
1500 }
1501 }
1502
1503 // Aha, empty; make sure we have all other memory effects that happened before the empty flags were set
1504 std::atomic_thread_fence(std::memory_order_acquire);
1505 return true;
1506 }
1507 else {
1508 // Check counter
1509 if (elementsCompletelyDequeued.load(std::memory_order_relaxed) == BLOCK_SIZE) {
1510 std::atomic_thread_fence(std::memory_order_acquire);
1511 return true;
1512 }
1513 assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <= BLOCK_SIZE);
1514 return false;
1515 }
1516 }
1517
1518 // Returns true if the block is now empty (does not apply in explicit context)
1519 template<InnerQueueContext context>
1520 inline bool set_empty(MOODYCAMEL_MAYBE_UNUSED index_t i)
1521 {
1522 MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1523 // Set flag
1524 assert(!emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].load(std::memory_order_relaxed));
1525 emptyFlags[BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1))].store(true, std::memory_order_release);
1526 return false;
1527 }
1528 else {
1529 // Increment counter
1530 auto prevVal = elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
1531 assert(prevVal < BLOCK_SIZE);
1532 return prevVal == BLOCK_SIZE - 1;
1533 }
1534 }
1535
1536 // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping and count > 0).
1537 // Returns true if the block is now empty (does not apply in explicit context).
1538 template<InnerQueueContext context>
1539 inline bool set_many_empty(MOODYCAMEL_MAYBE_UNUSED index_t i, size_t count)
1540 {
1541 MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1542 // Set flags
1543 std::atomic_thread_fence(std::memory_order_release);
1544 i = BLOCK_SIZE - 1 - static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) - count + 1;
1545 for (size_t j = 0; j != count; ++j) {
1546 assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
1547 emptyFlags[i + j].store(true, std::memory_order_relaxed);
1548 }
1549 return false;
1550 }
1551 else {
1552 // Increment counter
1553 auto prevVal = elementsCompletelyDequeued.fetch_add(count, std::memory_order_release);
1554 assert(prevVal + count <= BLOCK_SIZE);
1555 return prevVal + count == BLOCK_SIZE;
1556 }
1557 }
1558
1559 template<InnerQueueContext context>
1560 inline void set_all_empty()
1561 {
1562 MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1563 // Set all flags
1564 for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1565 emptyFlags[i].store(true, std::memory_order_relaxed);
1566 }
1567 }
1568 else {
1569 // Reset counter
1570 elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
1571 }
1572 }
1573
1574 template<InnerQueueContext context>
1575 inline void reset_empty()
1576 {
1577 MOODYCAMEL_CONSTEXPR_IF (context == explicit_context && BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1578 // Reset flags
1579 for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1580 emptyFlags[i].store(false, std::memory_order_relaxed);
1581 }
1582 }
1583 else {
1584 // Reset counter
1585 elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
1586 }
1587 }
1588
1589 inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT { return static_cast<T*>(static_cast<void*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
1590 inline T const* operator[](index_t idx) const MOODYCAMEL_NOEXCEPT { return static_cast<T const*>(static_cast<void const*>(elements)) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }
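		// Illustrative mapping (not part of the implementation): the operator[] overloads above mask
		// the global index into a slot, so if BLOCK_SIZE were 32, indices 0..31, 32..63, 64..95, ...
		// would all map onto slots 0..31 of this block's storage.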
1591
1592 private:
1593 static_assert(std::alignment_of<T>::value <= sizeof(T), "The queue does not support types with an alignment greater than their size at this time");
1594 MOODYCAMEL_ALIGNAS(MOODYCAMEL_ALIGNOF(T)) char elements[sizeof(T) * BLOCK_SIZE];
1595 public:
1596 Block* next;
1597 std::atomic<size_t> elementsCompletelyDequeued;
1598 std::atomic<bool> emptyFlags[BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
1599 public:
1600 std::atomic<std::uint32_t> freeListRefs;
1601 std::atomic<Block*> freeListNext;
1602 std::atomic<bool> shouldBeOnFreeList;
1603 bool dynamicallyAllocated; // Perhaps a better name for this would be 'isNotPartOfInitialBlockPool'
1604
1605#ifdef MCDBGQ_TRACKMEM
1606 void* owner;
1607#endif
1608 };
1609 static_assert(std::alignment_of<Block>::value >= std::alignment_of<T>::value, "Internal error: Blocks must be at least as aligned as the type they are wrapping");
1610
1611
1612#ifdef MCDBGQ_TRACKMEM
1613public:
1614 struct MemStats;
1615private:
1616#endif
1617
1618 ///////////////////////////
1619 // Producer base
1620 ///////////////////////////
1621
1622 struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase
1623 {
1624 ProducerBase(ConcurrentQueue* parent_, bool isExplicit_) :
1625 tailIndex(0),
1626 headIndex(0),
1627 dequeueOptimisticCount(0),
1628 dequeueOvercommit(0),
1629 tailBlock(nullptr),
1630 isExplicit(isExplicit_),
1631 parent(parent_)
1632 {
1633 }
1634
1635 virtual ~ProducerBase() { }
1636
1637 template<typename U>
1638 inline bool dequeue(U& element)
1639 {
1640 if (isExplicit) {
1641 return static_cast<ExplicitProducer*>(this)->dequeue(element);
1642 }
1643 else {
1644 return static_cast<ImplicitProducer*>(this)->dequeue(element);
1645 }
1646 }
1647
1648 template<typename It>
1649 inline size_t dequeue_bulk(It& itemFirst, size_t max)
1650 {
1651 if (isExplicit) {
1652 return static_cast<ExplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
1653 }
1654 else {
1655 return static_cast<ImplicitProducer*>(this)->dequeue_bulk(itemFirst, max);
1656 }
1657 }
1658
1659 inline ProducerBase* next_prod() const { return static_cast<ProducerBase*>(next); }
1660
1661 inline size_t size_approx() const
1662 {
1663 auto tail = tailIndex.load(std::memory_order_relaxed);
1664 auto head = headIndex.load(std::memory_order_relaxed);
1665 return details::circular_less_than(head, tail) ? static_cast<size_t>(tail - head) : 0;
1666 }
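		// For example (illustrative): head == 5 and tail == 9 yields 4; if the two relaxed loads
		// race such that the observed head has circularly passed the observed tail, 0 is returned
		// instead of a spurious huge difference.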
1667
1668 inline index_t getTail() const { return tailIndex.load(std::memory_order_relaxed); }
1669 protected:
1670 std::atomic<index_t> tailIndex; // Where to enqueue to next
1671 std::atomic<index_t> headIndex; // Where to dequeue from next
1672
1673 std::atomic<index_t> dequeueOptimisticCount;
1674 std::atomic<index_t> dequeueOvercommit;
1675
1676 Block* tailBlock;
1677
1678 public:
1679 bool isExplicit;
1680 ConcurrentQueue* parent;
1681
1682 protected:
1683#ifdef MCDBGQ_TRACKMEM
1684 friend struct MemStats;
1685#endif
1686 };
1687
1688
1689 ///////////////////////////
1690 // Explicit queue
1691 ///////////////////////////
1692
1693 struct ExplicitProducer : public ProducerBase
1694 {
1695 explicit ExplicitProducer(ConcurrentQueue* parent_) :
1696 ProducerBase(parent_, true),
1697 blockIndex(nullptr),
1698 pr_blockIndexSlotsUsed(0),
1699 pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1),
1700 pr_blockIndexFront(0),
1701 pr_blockIndexEntries(nullptr),
1702 pr_blockIndexRaw(nullptr)
1703 {
1704 size_t poolBasedIndexSize = details::ceil_to_pow_2(parent_->initialBlockPoolSize) >> 1;
1705 if (poolBasedIndexSize > pr_blockIndexSize) {
1706 pr_blockIndexSize = poolBasedIndexSize;
1707 }
1708
1709 new_block_index(0); // This creates an index with double the number of current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
1710 }
1711
1712 ~ExplicitProducer()
1713 {
1714 // Destruct any elements not yet dequeued.
1715 // Since we're in the destructor, we can assume all elements
1716 // are either completely dequeued or completely not (no halfways).
1717 if (this->tailBlock != nullptr) { // Note this means there must be a block index too
1718 // First find the block that's partially dequeued, if any
1719 Block* halfDequeuedBlock = nullptr;
1720 if ((this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
1721 // The head's not on a block boundary, meaning a block somewhere is partially dequeued
1722 // (or the head block is the tail block and was fully dequeued, but the head/tail are still not on a boundary)
1723 size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & (pr_blockIndexSize - 1);
1724 while (details::circular_less_than<index_t>(pr_blockIndexEntries[i].base + BLOCK_SIZE, this->headIndex.load(std::memory_order_relaxed))) {
1725 i = (i + 1) & (pr_blockIndexSize - 1);
1726 }
1727 assert(details::circular_less_than<index_t>(pr_blockIndexEntries[i].base, this->headIndex.load(std::memory_order_relaxed)));
1728 halfDequeuedBlock = pr_blockIndexEntries[i].block;
1729 }
1730
1731 // Start at the head block (note the first line in the loop gives us the head from the tail on the first iteration)
1732 auto block = this->tailBlock;
1733 do {
1734 block = block->next;
1735 if (block->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1736 continue;
1737 }
1738
1739 size_t i = 0; // Offset into block
1740 if (block == halfDequeuedBlock) {
1741 i = static_cast<size_t>(this->headIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
1742 }
1743
1744 // Walk through all the items in the block; if this is the tail block, we need to stop when we reach the tail index
1745 auto lastValidIndex = (this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ? BLOCK_SIZE : static_cast<size_t>(this->tailIndex.load(std::memory_order_relaxed) & static_cast<index_t>(BLOCK_SIZE - 1));
1746 while (i != BLOCK_SIZE && (block != this->tailBlock || i != lastValidIndex)) {
1747 (*block)[i++]->~T();
1748 }
1749 } while (block != this->tailBlock);
1750 }
1751
1752 // Destroy all blocks that we own
1753 if (this->tailBlock != nullptr) {
1754 auto block = this->tailBlock;
1755 do {
1756 auto nextBlock = block->next;
1757 if (block->dynamicallyAllocated) {
1758 destroy(block);
1759 }
1760 else {
1761 this->parent->add_block_to_free_list(block);
1762 }
1763 block = nextBlock;
1764 } while (block != this->tailBlock);
1765 }
1766
1767 // Destroy the block indices
1768 auto header = static_cast<BlockIndexHeader*>(pr_blockIndexRaw);
1769 while (header != nullptr) {
1770 auto prev = static_cast<BlockIndexHeader*>(header->prev);
1771 header->~BlockIndexHeader();
1772 (Traits::free)(header);
1773 header = prev;
1774 }
1775 }
1776
1777 template<AllocationMode allocMode, typename U>
1778 inline bool enqueue(U&& element)
1779 {
1780 index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
1781 index_t newTailIndex = 1 + currentTailIndex;
1782 if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
1783 // We reached the end of a block, start a new one
1784 auto startBlock = this->tailBlock;
1785 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
1786 if (this->tailBlock != nullptr && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
1787 // We can re-use the block ahead of us, it's empty!
1788 this->tailBlock = this->tailBlock->next;
1789 this->tailBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1790
1791 // We'll put the block on the block index (guaranteed to be room since we're conceptually removing the
1792 // last block from it first -- except instead of removing then adding, we can just overwrite).
1793 // Note that there must be a valid block index here, since even if allocation failed in the ctor,
1794 // it would have been re-attempted when adding the first block to the queue; since there is such
1795 // a block, a block index must have been successfully allocated.
1796 }
1797 else {
1798 // Whatever head value we see here is >= the last value we saw here (relatively),
1799 // and <= its current value. Since we have the most recent tail, the head must be
1800 // <= to it.
1801 auto head = this->headIndex.load(std::memory_order_relaxed);
1802 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
1803 if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE)
1804 || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
1805 // We can't enqueue in another block because there's not enough leeway -- the
1806 // tail could surpass the head by the time the block fills up! (Or we'll exceed
1807 // the size limit, if the second part of the condition was true.)
1808 return false;
1809 }
1810 // We're going to need a new block; check that the block index has room
1811 if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize) {
1812 // Hmm, the circular block index is already full -- we'll need
1813 // to allocate a new index. Note pr_blockIndexRaw can only be nullptr if
1814 // the initial allocation failed in the constructor.
1815
1816 MOODYCAMEL_CONSTEXPR_IF (allocMode == CannotAlloc) {
1817 return false;
1818 }
1819 else if (!new_block_index(pr_blockIndexSlotsUsed)) {
1820 return false;
1821 }
1822 }
1823
1824 // Insert a new block in the circular linked list
1825 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
1826 if (newBlock == nullptr) {
1827 return false;
1828 }
1829#ifdef MCDBGQ_TRACKMEM
1830 newBlock->owner = this;
1831#endif
1832 newBlock->ConcurrentQueue::Block::template reset_empty<explicit_context>();
1833 if (this->tailBlock == nullptr) {
1834 newBlock->next = newBlock;
1835 }
1836 else {
1837 newBlock->next = this->tailBlock->next;
1838 this->tailBlock->next = newBlock;
1839 }
1840 this->tailBlock = newBlock;
1841 ++pr_blockIndexSlotsUsed;
1842 }
1843
1844 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new ((T*)nullptr) T(std::forward<U>(element)))) {
1845 // The constructor may throw. We want the element not to appear in the queue in
1846 // that case (without corrupting the queue):
1847 MOODYCAMEL_TRY {
1848 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1849 }
1850 MOODYCAMEL_CATCH (...) {
1851 // Revert change to the current block, but leave the new block available
1852 // for next time
1853 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
1854 this->tailBlock = startBlock == nullptr ? this->tailBlock : startBlock;
1855 MOODYCAMEL_RETHROW;
1856 }
1857 }
1858 else {
1859 (void)startBlock;
1860 (void)originalBlockIndexSlotsUsed;
1861 }
1862
1863 // Add block to block index
1864 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
1865 entry.base = currentTailIndex;
1866 entry.block = this->tailBlock;
1867 blockIndex.load(std::memory_order_relaxed)->front.store(pr_blockIndexFront, std::memory_order_release);
1868 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
1869
1870 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new ((T*)nullptr) T(std::forward<U>(element)))) {
1871 this->tailIndex.store(newTailIndex, std::memory_order_release);
1872 return true;
1873 }
1874 }
1875
1876 // Enqueue
1877 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
1878
1879 this->tailIndex.store(newTailIndex, std::memory_order_release);
1880 return true;
1881 }
1882
1883 template<typename U>
1884 bool dequeue(U& element)
1885 {
1886 auto tail = this->tailIndex.load(std::memory_order_relaxed);
1887 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
1888 if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
1889 // Might be something to dequeue, let's give it a try
1890
1891 // Note that this if is purely for performance purposes in the common case when the queue is
1892 // empty and the values are eventually consistent -- we may enter here spuriously.
1893
1894 // Note that whatever the values of overcommit and tail are, they are not going to change (unless we
1895 // change them) and must be the same value at this point (inside the if) as when the if condition was
1896 // evaluated.
1897
1898 // We insert an acquire fence here to synchronize-with the release upon incrementing dequeueOvercommit below.
1899 // This ensures that whatever value we loaded into overcommit, the load of dequeueOptimisticCount in
1900 // the fetch_add below will result in a value at least as recent as that (and therefore at least as large).
1901 // Note that I believe a compiler (signal) fence here would be sufficient due to the nature of fetch_add (all
1902 // read-modify-write operations are guaranteed to work on the latest value in the modification order), but
1903 // unfortunately that can't be shown to be correct using only the C++11 standard.
1904 // See http://stackoverflow.com/questions/18223161/what-are-the-c11-memory-ordering-guarantees-in-this-corner-case
1905 std::atomic_thread_fence(std::memory_order_acquire);
1906
1907 // Increment optimistic counter, then check if it went over the boundary
1908 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
1909
1910 // Note that since dequeueOvercommit must be <= dequeueOptimisticCount (because dequeueOvercommit is only ever
1911 // incremented after dequeueOptimisticCount -- this is enforced in the `else` block below), and since we now
1912 // have a version of dequeueOptimisticCount that is at least as recent as overcommit (due to the release upon
1913 // incrementing dequeueOvercommit and the acquire above that synchronizes with it), overcommit <= myDequeueCount.
1914 // However, we can't assert this since both dequeueOptimisticCount and dequeueOvercommit may (independently)
1915 // overflow; in such a case, though, the logic still holds since the difference between the two is maintained.
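			// Illustrative trace of this counting scheme (made-up numbers): suppose tail == 1 and two
			// consumers race. Both pass the outer check, and their fetch_adds yield myDequeueCount == 0
			// and == 1. The first sees 0 - 0 < 1 and dequeues the element; the second sees that
			// 1 - 0 is not less than 1, so it bumps dequeueOvercommit to 1, restoring the effective
			// dequeue count (dequeueOptimisticCount - dequeueOvercommit == 2 - 1) to the one element
			// actually taken.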
1916
1917 // Note that we reload tail here in case it changed; it will be the same value as before or greater, since
1918 // this load is sequenced after (happens after) the earlier load above. This is supported by read-read
1919 // coherency (as defined in the standard), explained here: http://en.cppreference.com/w/cpp/atomic/memory_order
1920 tail = this->tailIndex.load(std::memory_order_acquire);
1921 if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
1922 // Guaranteed to be at least one element to dequeue!
1923
1924 // Get the index. Note that since there's guaranteed to be at least one element, this
1925 // will never exceed tail. We need to do an acquire-release fence here since it's possible
1926 // that whatever condition got us to this point was for an earlier enqueued element (that
1927 // we already see the memory effects for), but that by the time we increment somebody else
1928 // has incremented it, and we need to see the memory effects for *that* element, which
1929 // in such a case is necessarily visible on the thread that incremented it in the first
1930 // place with the more current condition (they must have acquired a tail that is at least
1931 // as recent).
1932 auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
1933
1934
1935 // Determine which block the element is in
1936
1937 auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
1938 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
1939
1940 // We need to be careful here about subtracting and dividing because of index wrap-around.
1941 // When an index wraps, we need to preserve the sign of the offset when dividing it by the
1942 // block size (in order to get a correct signed block count offset in all cases):
1943 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
1944 auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
1945 auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(blockBaseIndex - headBase) / BLOCK_SIZE);
1946 auto block = localBlockIndex->entries[(localBlockIndexHead + offset) & (localBlockIndex->size - 1)].block;
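					// Illustrative wrap-around case (hypothetical sizes): if index_t were 16 bits wide with
					// BLOCK_SIZE == 32, headBase == 0xFFC0 and a blockBaseIndex that has wrapped to 0x0000
					// give an unsigned difference of 0x0040; the signed cast keeps that as +64, i.e. an
					// offset of +2 blocks, and the reverse situation would correctly yield -2 instead of a
					// huge positive offset.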
1947
1948 // Dequeue
1949 auto& el = *((*block)[index]);
1950 if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
1951 // Make sure the element is still fully dequeued and destroyed even if the assignment
1952 // throws
1953 struct Guard {
1954 Block* block;
1955 index_t index;
1956
1957 ~Guard()
1958 {
1959 (*block)[index]->~T();
1960 block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
1961 }
1962 } guard = { block, index };
1963
1964 element = std::move(el); // NOLINT
1965 }
1966 else {
1967 element = std::move(el); // NOLINT
1968 el.~T(); // NOLINT
1969 block->ConcurrentQueue::Block::template set_empty<explicit_context>(index);
1970 }
1971
1972 return true;
1973 }
1974 else {
1975 // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
1976 this->dequeueOvercommit.fetch_add(1, std::memory_order_release); // Release so that the fetch_add on dequeueOptimisticCount is guaranteed to happen before this write
1977 }
1978 }
1979
1980 return false;
1981 }
1982
1983 template<AllocationMode allocMode, typename It>
1984 bool enqueue_bulk(It itemFirst, size_t count)
1985 {
1986 // First, we need to make sure we have enough room to enqueue all of the elements;
1987 // this means pre-allocating blocks and putting them in the block index (but only if
1988 // all the allocations succeeded).
1989 index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
1990 auto startBlock = this->tailBlock;
1991 auto originalBlockIndexFront = pr_blockIndexFront;
1992 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
1993
1994 Block* firstAllocatedBlock = nullptr;
1995
1996 // Figure out how many blocks we'll need to allocate, and do so
1997 size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
1998 index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
1999 if (blockBaseDiff > 0) {
2000 // Allocate as many blocks as possible from ahead
2001 while (blockBaseDiff > 0 && this->tailBlock != nullptr && this->tailBlock->next != firstAllocatedBlock && this->tailBlock->next->ConcurrentQueue::Block::template is_empty<explicit_context>()) {
2002 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2003 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2004
2005 this->tailBlock = this->tailBlock->next;
2006 firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2007
2008 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2009 entry.base = currentTailIndex;
2010 entry.block = this->tailBlock;
2011 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2012 }
2013
2014 // Now allocate as many blocks as necessary from the block pool
2015 while (blockBaseDiff > 0) {
2016 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2017 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2018
2019 auto head = this->headIndex.load(std::memory_order_relaxed);
2020 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2021 bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2022 if (pr_blockIndexRaw == nullptr || pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
2023 MOODYCAMEL_CONSTEXPR_IF (allocMode == CannotAlloc) {
2024 // Failed to allocate, undo changes (but keep injected blocks)
2025 pr_blockIndexFront = originalBlockIndexFront;
2026 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2027 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2028 return false;
2029 }
2030 else if (full || !new_block_index(originalBlockIndexSlotsUsed)) {
2031 // Failed to allocate, undo changes (but keep injected blocks)
2032 pr_blockIndexFront = originalBlockIndexFront;
2033 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2034 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2035 return false;
2036 }
2037
2038 // pr_blockIndexFront is updated inside new_block_index, so we need to
2039 // update our fallback value too (since we keep the new index even if we
2040 // later fail)
2041 originalBlockIndexFront = originalBlockIndexSlotsUsed;
2042 }
2043
2044 // Insert a new block in the circular linked list
2045 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2046 if (newBlock == nullptr) {
2047 pr_blockIndexFront = originalBlockIndexFront;
2048 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2049 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2050 return false;
2051 }
2052
2053#ifdef MCDBGQ_TRACKMEM
2054 newBlock->owner = this;
2055#endif
2056 newBlock->ConcurrentQueue::Block::template set_all_empty<explicit_context>();
2057 if (this->tailBlock == nullptr) {
2058 newBlock->next = newBlock;
2059 }
2060 else {
2061 newBlock->next = this->tailBlock->next;
2062 this->tailBlock->next = newBlock;
2063 }
2064 this->tailBlock = newBlock;
2065 firstAllocatedBlock = firstAllocatedBlock == nullptr ? this->tailBlock : firstAllocatedBlock;
2066
2067 ++pr_blockIndexSlotsUsed;
2068
2069 auto& entry = blockIndex.load(std::memory_order_relaxed)->entries[pr_blockIndexFront];
2070 entry.base = currentTailIndex;
2071 entry.block = this->tailBlock;
2072 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2073 }
2074
2075 // Excellent, all allocations succeeded. Reset each block's emptiness before we fill them up, and
2076 // publish the new block index front
2077 auto block = firstAllocatedBlock;
2078 while (true) {
2079 block->ConcurrentQueue::Block::template reset_empty<explicit_context>();
2080 if (block == this->tailBlock) {
2081 break;
2082 }
2083 block = block->next;
2084 }
2085
2086 if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new ((T*)nullptr) T(details::deref_noexcept(itemFirst)))) {
2087 blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2088 }
2089 }
2090
2091 // Enqueue, one block at a time
2092 index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2093 currentTailIndex = startTailIndex;
2094 auto endBlock = this->tailBlock;
2095 this->tailBlock = startBlock;
2096 assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
2097 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2098 this->tailBlock = firstAllocatedBlock;
2099 }
2100 while (true) {
2101 auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2102 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2103 stopIndex = newTailIndex;
2104 }
2105 if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new ((T*)nullptr) T(details::deref_noexcept(itemFirst)))) {
2106 while (currentTailIndex != stopIndex) {
2107 new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2108 }
2109 }
2110 else {
2111 MOODYCAMEL_TRY {
2112 while (currentTailIndex != stopIndex) {
2113 // Must use copy constructor even if move constructor is available
2114 // because we may have to revert if there's an exception.
2115 // Sorry about the horrible templated next line, but it was the only way
2116 // to disable moving *at compile time*, which is important because a type
2117 // may only define a (noexcept) move constructor, and so calls to the
2118 // cctor will not compile, even if they are in an if branch that will never
2119 // be executed
2120 new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<(bool)!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new ((T*)nullptr) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
2121 ++currentTailIndex;
2122 ++itemFirst;
2123 }
2124 }
2125 MOODYCAMEL_CATCH (...) {
2126 // Oh dear, an exception's been thrown -- destroy the elements that
2127 // were enqueued so far and revert the entire bulk operation (we'll keep
2128 // any allocated blocks in our linked list for later, though).
2129 auto constructedStopIndex = currentTailIndex;
2130 auto lastBlockEnqueued = this->tailBlock;
2131
2132 pr_blockIndexFront = originalBlockIndexFront;
2133 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2134 this->tailBlock = startBlock == nullptr ? firstAllocatedBlock : startBlock;
2135
2136 if (!details::is_trivially_destructible<T>::value) {
2137 auto block = startBlock;
2138 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2139 block = firstAllocatedBlock;
2140 }
2141 currentTailIndex = startTailIndex;
2142 while (true) {
2143 stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2144 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2145 stopIndex = constructedStopIndex;
2146 }
2147 while (currentTailIndex != stopIndex) {
2148 (*block)[currentTailIndex++]->~T();
2149 }
2150 if (block == lastBlockEnqueued) {
2151 break;
2152 }
2153 block = block->next;
2154 }
2155 }
2156 MOODYCAMEL_RETHROW;
2157 }
2158 }
2159
2160 if (this->tailBlock == endBlock) {
2161 assert(currentTailIndex == newTailIndex);
2162 break;
2163 }
2164 this->tailBlock = this->tailBlock->next;
2165 }
2166
2167 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new ((T*)nullptr) T(details::deref_noexcept(itemFirst))) && firstAllocatedBlock != nullptr) {
2168 blockIndex.load(std::memory_order_relaxed)->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1), std::memory_order_release);
2169 }
2170
2171 this->tailIndex.store(newTailIndex, std::memory_order_release);
2172 return true;
2173 }
2174
2175 template<typename It>
2176 size_t dequeue_bulk(It& itemFirst, size_t max)
2177 {
2178 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2179 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2180 auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
2181 if (details::circular_less_than<size_t>(0, desiredCount)) {
2182 desiredCount = desiredCount < max ? desiredCount : max;
2183 std::atomic_thread_fence(std::memory_order_acquire);
2184
2185 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2186
2187 tail = this->tailIndex.load(std::memory_order_acquire);
2188 auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2189 if (details::circular_less_than<size_t>(0, actualCount)) {
2190 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2191 if (actualCount < desiredCount) {
2192 this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2193 }
2194
2195 // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
2196 // will never exceed tail.
2197 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2198
2199 // Determine which block the first element is in
2200 auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2201 auto localBlockIndexHead = localBlockIndex->front.load(std::memory_order_acquire);
2202
2203 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2204 auto firstBlockBaseIndex = firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
2205 auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(firstBlockBaseIndex - headBase) / BLOCK_SIZE);
2206 auto indexIndex = (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
2207
2208 // Iterate the blocks and dequeue
2209 auto index = firstIndex;
2210 do {
2211 auto firstIndexInBlock = index;
2212 auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2213 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2214 auto block = localBlockIndex->entries[indexIndex].block;
2215 if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
2216 while (index != endIndex) {
2217 auto& el = *((*block)[index]);
2218 *itemFirst++ = std::move(el);
2219 el.~T();
2220 ++index;
2221 }
2222 }
2223 else {
2224 MOODYCAMEL_TRY {
2225 while (index != endIndex) {
2226 auto& el = *((*block)[index]);
2227 *itemFirst = std::move(el);
2228 ++itemFirst;
2229 el.~T();
2230 ++index;
2231 }
2232 }
2233 MOODYCAMEL_CATCH (...) {
2234 // It's too late to revert the dequeue, but we can make sure that all
2235 // the dequeued objects are properly destroyed and the block index
2236 // (and empty count) are properly updated before we propagate the exception
2237 do {
2238 block = localBlockIndex->entries[indexIndex].block;
2239 while (index != endIndex) {
2240 (*block)[index++]->~T();
2241 }
2242 block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2243 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2244
2245 firstIndexInBlock = index;
2246 endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2247 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2248 } while (index != firstIndex + actualCount);
2249
2250 MOODYCAMEL_RETHROW;
2251 }
2252 }
2253 block->ConcurrentQueue::Block::template set_many_empty<explicit_context>(firstIndexInBlock, static_cast<size_t>(endIndex - firstIndexInBlock));
2254 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2255 } while (index != firstIndex + actualCount);
2256
2257 return actualCount;
2258 }
2259 else {
2260 // Wasn't anything to dequeue after all; make the effective dequeue count eventually consistent
2261 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2262 }
2263 }
2264
2265 return 0;
2266 }
2267
2268 private:
2269 struct BlockIndexEntry
2270 {
2271 index_t base;
2272 Block* block;
2273 };
2274
2275 struct BlockIndexHeader
2276 {
2277 size_t size;
2278 std::atomic<size_t> front; // Current slot (not next, like pr_blockIndexFront)
2279 BlockIndexEntry* entries;
2280 void* prev;
2281 };
2282
2283
2284 bool new_block_index(size_t numberOfFilledSlotsToExpose)
2285 {
2286 auto prevBlockSizeMask = pr_blockIndexSize - 1;
2287
2288 // Create the new block
2289 pr_blockIndexSize <<= 1;
2290 auto newRawPtr = static_cast<char*>((Traits::malloc)(sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * pr_blockIndexSize));
2291 if (newRawPtr == nullptr) {
2292 pr_blockIndexSize >>= 1; // Reset to allow graceful retry
2293 return false;
2294 }
2295
2296 auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(newRawPtr + sizeof(BlockIndexHeader)));
2297
2298 // Copy in all the old indices, if any
2299 size_t j = 0;
2300 if (pr_blockIndexSlotsUsed != 0) {
2301 auto i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
2302 do {
2303 newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
2304 i = (i + 1) & prevBlockSizeMask;
2305 } while (i != pr_blockIndexFront);
2306 }
2307
2308 // Update everything
2309 auto header = new (newRawPtr) BlockIndexHeader;
2310 header->size = pr_blockIndexSize;
2311 header->front.store(numberOfFilledSlotsToExpose - 1, std::memory_order_relaxed);
2312 header->entries = newBlockIndexEntries;
2313 header->prev = pr_blockIndexRaw; // we link the new block to the old one so we can free it later
2314
2315 pr_blockIndexFront = j;
2316 pr_blockIndexEntries = newBlockIndexEntries;
2317 pr_blockIndexRaw = newRawPtr;
2318 blockIndex.store(header, std::memory_order_release);
2319
2320 return true;
2321 }
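		// Worked example (illustrative numbers): if pr_blockIndexSize was 32 with 20 slots in use,
		// the call above allocates a 64-entry index, copies the 20 live entries in order (so
		// pr_blockIndexFront becomes 20), links the old index via header->prev so the destructor
		// can free the whole chain later, and exposes numberOfFilledSlotsToExpose - 1 as the new front.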
2322
2323 private:
2324 std::atomic<BlockIndexHeader*> blockIndex;
2325
2326 // To be used by the producer only -- consumers must use the ones referenced by blockIndex
2327 size_t pr_blockIndexSlotsUsed;
2328 size_t pr_blockIndexSize;
2329 size_t pr_blockIndexFront; // Next slot (not current)
2330 BlockIndexEntry* pr_blockIndexEntries;
2331 void* pr_blockIndexRaw;
2332
2333#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2334 public:
2335 ExplicitProducer* nextExplicitProducer;
2336 private:
2337#endif
2338
2339#ifdef MCDBGQ_TRACKMEM
2340 friend struct MemStats;
2341#endif
2342 };
2343
2344
2345 //////////////////////////////////
2346 // Implicit queue
2347 //////////////////////////////////
2348
2349 struct ImplicitProducer : public ProducerBase
2350 {
2351 ImplicitProducer(ConcurrentQueue* parent_) :
2352 ProducerBase(parent_, false),
2353 nextBlockIndexCapacity(IMPLICIT_INITIAL_INDEX_SIZE),
2354 blockIndex(nullptr)
2355 {
2356 new_block_index();
2357 }
2358
2359 ~ImplicitProducer()
2360 {
2361 // Note that since we're in the destructor we can assume that all enqueue/dequeue operations
2362 // completed already; this means that all undequeued elements are placed contiguously across
2363 // contiguous blocks, and that only the first and last remaining blocks can be only partially
2364 // empty (all other remaining blocks must be completely full).
2365
2366#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2367 // Unregister ourselves for thread termination notification
2368 if (!this->inactive.load(std::memory_order_relaxed)) {
2369 details::ThreadExitNotifier::unsubscribe(&threadExitListener);
2370 }
2371#endif
2372
2373 // Destroy all remaining elements!
2374 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2375 auto index = this->headIndex.load(std::memory_order_relaxed);
2376 Block* block = nullptr;
2377 assert(index == tail || details::circular_less_than(index, tail));
2378 bool forceFreeLastBlock = index != tail; // If we enter the loop, then the last (tail) block will not be freed
2379 while (index != tail) {
2380 if ((index & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 || block == nullptr) {
2381 if (block != nullptr) {
2382 // Free the old block
2383 this->parent->add_block_to_free_list(block);
2384 }
2385
2386 block = get_block_index_entry_for_index(index)->value.load(std::memory_order_relaxed);
2387 }
2388
2389 ((*block)[index])->~T();
2390 ++index;
2391 }
2392 // Even if the queue is empty, there's still one block that's not on the free list
2393 // (unless the head index reached the end of it, in which case the tail will be poised
2394 // to create a new block).
2395 if (this->tailBlock != nullptr && (forceFreeLastBlock || (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) {
2396 this->parent->add_block_to_free_list(this->tailBlock);
2397 }
2398
2399 // Destroy block index
2400 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2401 if (localBlockIndex != nullptr) {
2402 for (size_t i = 0; i != localBlockIndex->capacity; ++i) {
2403 localBlockIndex->index[i]->~BlockIndexEntry();
2404 }
2405 do {
2406 auto prev = localBlockIndex->prev;
2407 localBlockIndex->~BlockIndexHeader();
2408 (Traits::free)(localBlockIndex);
2409 localBlockIndex = prev;
2410 } while (localBlockIndex != nullptr);
2411 }
2412 }
2413
2414 template<AllocationMode allocMode, typename U>
2415 inline bool enqueue(U&& element)
2416 {
2417 index_t currentTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2418 index_t newTailIndex = 1 + currentTailIndex;
2419 if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2420 // We reached the end of a block, start a new one
2421 auto head = this->headIndex.load(std::memory_order_relaxed);
2422 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2423 if (!details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
2424 return false;
2425 }
2426#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2427 debug::DebugLock lock(mutex);
2428#endif
2429 // Find out where we'll be inserting this block in the block index
2430 BlockIndexEntry* idxEntry;
2431 if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
2432 return false;
2433 }
2434
2435 // Get ahold of a new block
2436 auto newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>();
2437 if (newBlock == nullptr) {
2438 rewind_block_index_tail();
2439 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2440 return false;
2441 }
2442#ifdef MCDBGQ_TRACKMEM
2443 newBlock->owner = this;
2444#endif
2445 newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2446
2447 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new ((T*)nullptr) T(std::forward<U>(element)))) {
2448 // May throw, try to insert now before we publish the fact that we have this new block
2449 MOODYCAMEL_TRY {
2450 new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
2451 }
2452 MOODYCAMEL_CATCH (...) {
2453 rewind_block_index_tail();
2454 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2455 this->parent->add_block_to_free_list(newBlock);
2456 MOODYCAMEL_RETHROW;
2457 }
2458 }
2459
2460 // Insert the new block into the index
2461 idxEntry->value.store(newBlock, std::memory_order_relaxed);
2462
2463 this->tailBlock = newBlock;
2464
2465 if (!MOODYCAMEL_NOEXCEPT_CTOR(T, U, new ((T*)nullptr) T(std::forward<U>(element)))) {
2466 this->tailIndex.store(newTailIndex, std::memory_order_release);
2467 return true;
2468 }
2469 }
2470
2471 // Enqueue
2472 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2473
2474 this->tailIndex.store(newTailIndex, std::memory_order_release);
2475 return true;
2476 }
2477
2478 template<typename U>
2479 bool dequeue(U& element)
2480 {
2481 // See ExplicitProducer::dequeue for rationale and explanation
2482 index_t tail = this->tailIndex.load(std::memory_order_relaxed);
2483 index_t overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2484 if (details::circular_less_than<index_t>(this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit, tail)) {
2485 std::atomic_thread_fence(std::memory_order_acquire);
2486
2487 index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(1, std::memory_order_relaxed);
2488 tail = this->tailIndex.load(std::memory_order_acquire);
2489 if ((details::likely)(details::circular_less_than<index_t>(myDequeueCount - overcommit, tail))) {
2490 index_t index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2491
2492 // Determine which block the element is in
2493 auto entry = get_block_index_entry_for_index(index);
2494
2495 // Dequeue
2496 auto block = entry->value.load(std::memory_order_relaxed);
2497 auto& el = *((*block)[index]);
2498
2499 if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, element = std::move(el))) {
2500#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2501 // Note: Acquiring the mutex with every dequeue instead of only when a block
2502 // is released is very sub-optimal, but it is, after all, purely debug code.
2503 debug::DebugLock lock(producer->mutex);
2504#endif
2505 struct Guard {
2506 Block* block;
2507 index_t index;
2508 BlockIndexEntry* entry;
2509 ConcurrentQueue* parent;
2510
2511 ~Guard()
2512 {
2513 (*block)[index]->~T();
2514 if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2515 entry->value.store(nullptr, std::memory_order_relaxed);
2516 parent->add_block_to_free_list(block);
2517 }
2518 }
2519 } guard = { block, index, entry, this->parent };
2520
2521 element = std::move(el); // NOLINT
2522 }
2523 else {
2524 element = std::move(el); // NOLINT
2525 el.~T(); // NOLINT
2526
2527 if (block->ConcurrentQueue::Block::template set_empty<implicit_context>(index)) {
2528 {
2529#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2530 debug::DebugLock lock(mutex);
2531#endif
2532 // Add the block back into the global free pool (and remove from block index)
2533 entry->value.store(nullptr, std::memory_order_relaxed);
2534 }
2535 this->parent->add_block_to_free_list(block); // releases the above store
2536 }
2537 }
2538
2539 return true;
2540 }
2541 else {
2542 this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
2543 }
2544 }
2545
2546 return false;
2547 }
2548
2549 template<AllocationMode allocMode, typename It>
2550 bool enqueue_bulk(It itemFirst, size_t count)
2551 {
2552 // First, we need to make sure we have enough room to enqueue all of the elements;
2553 // this means pre-allocating blocks and putting them in the block index (but only if
2554 // all the allocations succeeded).
2555
2556 // Note that the tailBlock we start off with may not be owned by us any more;
2557 // this happens if it was filled up exactly to the top (setting tailIndex to
2558 // the first index of the next block which is not yet allocated), then dequeued
2559 // completely (putting it on the free list) before we enqueue again.
2560
2561 index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2562 auto startBlock = this->tailBlock;
2563 Block* firstAllocatedBlock = nullptr;
2564 auto endBlock = this->tailBlock;
2565
2566 // Figure out how many blocks we'll need to allocate, and do so
2567 size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2568 index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2569 if (blockBaseDiff > 0) {
2570#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2571 debug::DebugLock lock(mutex);
2572#endif
2573 do {
2574 blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2575 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2576
2577 // Find out where we'll be inserting this block in the block index
2578 BlockIndexEntry* idxEntry = nullptr; // initialization here unnecessary but compiler can't always tell
2579 Block* newBlock;
2580 bool indexInserted = false;
2581 auto head = this->headIndex.load(std::memory_order_relaxed);
2582 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2583 bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2584 if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) || (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) == nullptr) {
2585 // Index allocation or block allocation failed; revert any other allocations
2586 // and index insertions done so far for this operation
2587 if (indexInserted) {
2588 rewind_block_index_tail();
2589 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2590 }
2591 currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2592 for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2593 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2594 idxEntry = get_block_index_entry_for_index(currentTailIndex);
2595 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2596 rewind_block_index_tail();
2597 }
2598 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2599 this->tailBlock = startBlock;
2600
2601 return false;
2602 }
2603
2604#ifdef MCDBGQ_TRACKMEM
2605 newBlock->owner = this;
2606#endif
2607 newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2608 newBlock->next = nullptr;
2609
2610 // Insert the new block into the index
2611 idxEntry->value.store(newBlock, std::memory_order_relaxed);
2612
2613 // Store the chain of blocks so that we can undo if later allocations fail,
2614 // and so that we can find the blocks when we do the actual enqueueing
2615 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr) {
2616 assert(this->tailBlock != nullptr);
2617 this->tailBlock->next = newBlock;
2618 }
2619 this->tailBlock = newBlock;
2620 endBlock = newBlock;
2621 firstAllocatedBlock = firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
2622 } while (blockBaseDiff > 0);
2623 }
2624
2625 // Enqueue, one block at a time
2626 index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2627 currentTailIndex = startTailIndex;
2628 this->tailBlock = startBlock;
2629 assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
2630 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
2631 this->tailBlock = firstAllocatedBlock;
2632 }
2633 while (true) {
2634 auto stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2635 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2636 stopIndex = newTailIndex;
2637 }
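				// Fast path: if constructing T from the iterator's value can't throw, construct elements
				// directly. Otherwise take the guarded path below, which destroys any already-constructed
				// elements and returns the freshly claimed blocks to the pool before rethrowing.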
2638 if (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new ((T*)nullptr) T(details::deref_noexcept(itemFirst)))) {
2639 while (currentTailIndex != stopIndex) {
2640 new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2641 }
2642 }
2643 else {
2644 MOODYCAMEL_TRY {
2645 while (currentTailIndex != stopIndex) {
2646 new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<(bool)!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new ((T*)nullptr) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
2647 ++currentTailIndex;
2648 ++itemFirst;
2649 }
2650 }
2651 MOODYCAMEL_CATCH (...) {
2652 auto constructedStopIndex = currentTailIndex;
2653 auto lastBlockEnqueued = this->tailBlock;
2654
2655 if (!details::is_trivially_destructible<T>::value) {
2656 auto block = startBlock;
2657 if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2658 block = firstAllocatedBlock;
2659 }
2660 currentTailIndex = startTailIndex;
2661 while (true) {
2662 stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2663 if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
2664 stopIndex = constructedStopIndex;
2665 }
2666 while (currentTailIndex != stopIndex) {
2667 (*block)[currentTailIndex++]->~T();
2668 }
2669 if (block == lastBlockEnqueued) {
2670 break;
2671 }
2672 block = block->next;
2673 }
2674 }
2675
2676 currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2677 for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
2678 currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2679						auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
2680 idxEntry->value.store(nullptr, std::memory_order_relaxed);
2681 rewind_block_index_tail();
2682 }
2683 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
2684 this->tailBlock = startBlock;
2685 MOODYCAMEL_RETHROW;
2686 }
2687 }
2688
2689 if (this->tailBlock == endBlock) {
2690 assert(currentTailIndex == newTailIndex);
2691 break;
2692 }
2693 this->tailBlock = this->tailBlock->next;
2694 }
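			// Publishing the new tail with release semantics ensures consumers that acquire tailIndex
			// also observe every element constructed above.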
2695 this->tailIndex.store(newTailIndex, std::memory_order_release);
2696 return true;
2697 }
2698
2699 template<typename It>
2700 size_t dequeue_bulk(It& itemFirst, size_t max)
2701 {
2702 auto tail = this->tailIndex.load(std::memory_order_relaxed);
2703 auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2704 auto desiredCount = static_cast<size_t>(tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) - overcommit));
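			// desiredCount estimates how many elements are available to us: the producer's tail minus the
			// net number of dequeue claims so far (dequeueOptimisticCount corrected by dequeueOvercommit,
			// which tracks claims that turned out to exceed what was actually available).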
2705 if (details::circular_less_than<size_t>(0, desiredCount)) {
2706 desiredCount = desiredCount < max ? desiredCount : max;
2707				std::atomic_thread_fence(std::memory_order_acquire);
2708
2709 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(desiredCount, std::memory_order_relaxed);
2710
2711 tail = this->tailIndex.load(std::memory_order_acquire);
2712 auto actualCount = static_cast<size_t>(tail - (myDequeueCount - overcommit));
2713 if (details::circular_less_than<size_t>(0, actualCount)) {
2714 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2715 if (actualCount < desiredCount) {
2716 this->dequeueOvercommit.fetch_add(desiredCount - actualCount, std::memory_order_release);
2717 }
2718
2719 // Get the first index. Note that since there's guaranteed to be at least actualCount elements, this
2720 // will never exceed tail.
2721 auto firstIndex = this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2722
2723 // Iterate the blocks and dequeue
2724 auto index = firstIndex;
2725 BlockIndexHeader* localBlockIndex;
2726 auto indexIndex = get_block_index_index_for_index(index, localBlockIndex);
2727 do {
2728 auto blockStartIndex = index;
2729 auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2730 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2731
2732 auto entry = localBlockIndex->index[indexIndex];
2733 auto block = entry->value.load(std::memory_order_relaxed);
2734 if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&, details::deref_noexcept(itemFirst) = std::move((*(*block)[index])))) {
2735 while (index != endIndex) {
2736 auto& el = *((*block)[index]);
2737 *itemFirst++ = std::move(el);
2738 el.~T();
2739 ++index;
2740 }
2741 }
2742 else {
2743 MOODYCAMEL_TRY {
2744 while (index != endIndex) {
2745 auto& el = *((*block)[index]);
2746 *itemFirst = std::move(el);
2747 ++itemFirst;
2748 el.~T();
2749 ++index;
2750 }
2751 }
2752 MOODYCAMEL_CATCH (...) {
2753 do {
2754 entry = localBlockIndex->index[indexIndex];
2755 block = entry->value.load(std::memory_order_relaxed);
2756 while (index != endIndex) {
2757 (*block)[index++]->~T();
2758 }
2759
2760 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2761#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2762 debug::DebugLock lock(mutex);
2763#endif
2764 entry->value.store(nullptr, std::memory_order_relaxed);
2765 this->parent->add_block_to_free_list(block);
2766 }
2767 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2768
2769 blockStartIndex = index;
2770 endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
2771 endIndex = details::circular_less_than<index_t>(firstIndex + static_cast<index_t>(actualCount), endIndex) ? firstIndex + static_cast<index_t>(actualCount) : endIndex;
2772 } while (index != firstIndex + actualCount);
2773
2774 MOODYCAMEL_RETHROW;
2775 }
2776 }
2777 if (block->ConcurrentQueue::Block::template set_many_empty<implicit_context>(blockStartIndex, static_cast<size_t>(endIndex - blockStartIndex))) {
2778 {
2779#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2780 debug::DebugLock lock(mutex);
2781#endif
2782 // Note that the set_many_empty above did a release, meaning that anybody who acquires the block
2783 // we're about to free can use it safely since our writes (and reads!) will have happened-before then.
2784 entry->value.store(nullptr, std::memory_order_relaxed);
2785 }
2786 this->parent->add_block_to_free_list(block); // releases the above store
2787 }
2788 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
2789 } while (index != firstIndex + actualCount);
2790
2791 return actualCount;
2792 }
2793 else {
2794 this->dequeueOvercommit.fetch_add(desiredCount, std::memory_order_release);
2795 }
2796 }
2797
2798 return 0;
2799 }
2800
2801 private:
2802 // The block size must be > 1, so any number with the low bit set is an invalid block base index
2803 static const index_t INVALID_BLOCK_BASE = 1;
2804
2805 struct BlockIndexEntry
2806 {
2807 std::atomic<index_t> key;
2808 std::atomic<Block*> value;
2809 };
2810
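		// Note: "index" below is a circular array of pointers to BlockIndexEntry slots (its capacity is a
		// power of two), "tail" is the slot holding the most recently inserted block's entry, "entries"
		// points at the entry storage owned by this header, and "prev" chains to the superseded, smaller
		// index so it can still be released later.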
2811 struct BlockIndexHeader
2812 {
2813 size_t capacity;
2814 std::atomic<size_t> tail;
2815 BlockIndexEntry* entries;
2816 BlockIndexEntry** index;
2817 BlockIndexHeader* prev;
2818 };
2819
2820 template<AllocationMode allocMode>
2821 inline bool insert_block_index_entry(BlockIndexEntry*& idxEntry, index_t blockStartIndex)
2822 {
2823 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed); // We're the only writer thread, relaxed is OK
2824 if (localBlockIndex == nullptr) {
2825 return false; // this can happen if new_block_index failed in the constructor
2826 }
2827 auto newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
2828 idxEntry = localBlockIndex->index[newTail];
2829 if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE ||
2830 idxEntry->value.load(std::memory_order_relaxed) == nullptr) {
2831
2832 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2833 localBlockIndex->tail.store(newTail, std::memory_order_release);
2834 return true;
2835 }
2836
2837 // No room in the old block index, try to allocate another one!
2838 MOODYCAMEL_CONSTEXPR_IF (allocMode == CannotAlloc) {
2839 return false;
2840 }
2841 else if (!new_block_index()) {
2842 return false;
2843 }
2844 localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2845 newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) & (localBlockIndex->capacity - 1);
2846 idxEntry = localBlockIndex->index[newTail];
2847 assert(idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE);
2848 idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
2849 localBlockIndex->tail.store(newTail, std::memory_order_release);
2850 return true;
2851 }
2852
2853 inline void rewind_block_index_tail()
2854 {
2855 auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2856 localBlockIndex->tail.store((localBlockIndex->tail.load(std::memory_order_relaxed) - 1) & (localBlockIndex->capacity - 1), std::memory_order_relaxed);
2857 }
2858
2859 inline BlockIndexEntry* get_block_index_entry_for_index(index_t index) const
2860 {
2861 BlockIndexHeader* localBlockIndex;
2862 auto idx = get_block_index_index_for_index(index, localBlockIndex);
2863 return localBlockIndex->index[idx];
2864 }
2865
2866 inline size_t get_block_index_index_for_index(index_t index, BlockIndexHeader*& localBlockIndex) const
2867 {
2868#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2869 debug::DebugLock lock(mutex);
2870#endif
2871 index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
2872 localBlockIndex = blockIndex.load(std::memory_order_acquire);
2873 auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
2874 auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
2875 assert(tailBase != INVALID_BLOCK_BASE);
2876 // Note: Must use division instead of shift because the index may wrap around, causing a negative
2877 // offset, whose negativity we want to preserve
2878 auto offset = static_cast<size_t>(static_cast<typename std::make_signed<index_t>::type>(index - tailBase) / BLOCK_SIZE);
2879 size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
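			// Illustrative example, assuming BLOCK_SIZE == 32: if tailBase == 96 and index == 32, the signed
			// difference is -64, offset becomes size_t(-2), and adding it to tail before masking by
			// (capacity - 1) steps two slots backwards through the circular index, as intended.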
2880 assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) == index && localBlockIndex->index[idx]->value.load(std::memory_order_relaxed) != nullptr);
2881 return idx;
2882 }
2883
2884 bool new_block_index()
2885 {
2886 auto prev = blockIndex.load(std::memory_order_relaxed);
2887 size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
2888 auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity;
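			// A single allocation holds the header, then (suitably aligned) entryCount fresh entries, then a
			// pointer array sized for the full new capacity; the old index's pointers are copied into the
			// front of that array below so previously indexed blocks remain reachable.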
2889 auto raw = static_cast<char*>((Traits::malloc)(
2890 sizeof(BlockIndexHeader) +
2891 std::alignment_of<BlockIndexEntry>::value - 1 + sizeof(BlockIndexEntry) * entryCount +
2892 std::alignment_of<BlockIndexEntry*>::value - 1 + sizeof(BlockIndexEntry*) * nextBlockIndexCapacity));
2893 if (raw == nullptr) {
2894 return false;
2895 }
2896
2897 auto header = new (raw) BlockIndexHeader;
2898 auto entries = reinterpret_cast<BlockIndexEntry*>(details::align_for<BlockIndexEntry>(raw + sizeof(BlockIndexHeader)));
2899 auto index = reinterpret_cast<BlockIndexEntry**>(details::align_for<BlockIndexEntry*>(reinterpret_cast<char*>(entries) + sizeof(BlockIndexEntry) * entryCount));
2900 if (prev != nullptr) {
2901 auto prevTail = prev->tail.load(std::memory_order_relaxed);
2902 auto prevPos = prevTail;
2903 size_t i = 0;
2904 do {
2905 prevPos = (prevPos + 1) & (prev->capacity - 1);
2906 index[i++] = prev->index[prevPos];
2907 } while (prevPos != prevTail);
2908 assert(i == prevCapacity);
2909 }
2910 for (size_t i = 0; i != entryCount; ++i) {
2911 new (entries + i) BlockIndexEntry;
2912 entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
2913 index[prevCapacity + i] = entries + i;
2914 }
2915 header->prev = prev;
2916 header->entries = entries;
2917 header->index = index;
2918 header->capacity = nextBlockIndexCapacity;
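			// Position tail so that the next insert_block_index_entry (+1, then mask) lands on the first of
			// the freshly added entry slots, i.e. slot prevCapacity.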
2919 header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1), std::memory_order_relaxed);
2920
2921 blockIndex.store(header, std::memory_order_release);
2922
2923 nextBlockIndexCapacity <<= 1;
2924
2925 return true;
2926 }
2927
2928 private:
2929 size_t nextBlockIndexCapacity;
2930 std::atomic<BlockIndexHeader*> blockIndex;
2931
2932#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2933 public:
2934 details::ThreadExitListener threadExitListener;
2935 private:
2936#endif
2937
2938#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2939 public:
2940 ImplicitProducer* nextImplicitProducer;
2941 private:
2942#endif
2943
2944#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2945 mutable debug::DebugMutex mutex;
2946#endif
2947#ifdef MCDBGQ_TRACKMEM
2948 friend struct MemStats;
2949#endif
2950 };
2951
2952
2953 //////////////////////////////////
2954 // Block pool manipulation
2955 //////////////////////////////////
2956
2957 void populate_initial_block_list(size_t blockCount)
2958 {
2959 initialBlockPoolSize = blockCount;
2960 if (initialBlockPoolSize == 0) {
2961 initialBlockPool = nullptr;
2962 return;
2963 }
2964
2965 initialBlockPool = create_array<Block>(blockCount);
2966 if (initialBlockPool == nullptr) {
2967 initialBlockPoolSize = 0;
2968 }
2969 for (size_t i = 0; i < initialBlockPoolSize; ++i) {
2970 initialBlockPool[i].dynamicallyAllocated = false;
2971 }
2972 }
2973
2974 inline Block* try_get_block_from_initial_pool()
2975 {
2976 if (initialBlockPoolIndex.load(std::memory_order_relaxed) >= initialBlockPoolSize) {
2977 return nullptr;
2978 }
2979
2980 auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);
2981
2982 return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
2983 }
2984
2985 inline void add_block_to_free_list(Block* block)
2986 {
2987#ifdef MCDBGQ_TRACKMEM
2988 block->owner = nullptr;
2989#endif
2990 freeList.add(block);
2991 }
2992
2993 inline void add_blocks_to_free_list(Block* block)
2994 {
2995 while (block != nullptr) {
2996 auto next = block->next;
2997 add_block_to_free_list(block);
2998 block = next;
2999 }
3000 }
3001
3002 inline Block* try_get_block_from_free_list()
3003 {
3004 return freeList.try_get();
3005 }
3006
3007 // Gets a free block from one of the memory pools, or allocates a new one (if applicable)
3008 template<AllocationMode canAlloc>
3009 Block* requisition_block()
3010 {
3011 auto block = try_get_block_from_initial_pool();
3012 if (block != nullptr) {
3013 return block;
3014 }
3015
3016 block = try_get_block_from_free_list();
3017 if (block != nullptr) {
3018 return block;
3019 }
3020
3021 MOODYCAMEL_CONSTEXPR_IF (canAlloc == CanAlloc) {
3022 return create<Block>();
3023 }
3024 else {
3025 return nullptr;
3026 }
3027 }
3028
3029
3030#ifdef MCDBGQ_TRACKMEM
3031 public:
3032 struct MemStats {
3033 size_t allocatedBlocks;
3034 size_t usedBlocks;
3035 size_t freeBlocks;
3036 size_t ownedBlocksExplicit;
3037 size_t ownedBlocksImplicit;
3038 size_t implicitProducers;
3039 size_t explicitProducers;
3040 size_t elementsEnqueued;
3041 size_t blockClassBytes;
3042 size_t queueClassBytes;
3043 size_t implicitBlockIndexBytes;
3044 size_t explicitBlockIndexBytes;
3045
3046 friend class ConcurrentQueue;
3047
3048 private:
3049 static MemStats getFor(ConcurrentQueue* q)
3050 {
3051 MemStats stats = { 0 };
3052
3053 stats.elementsEnqueued = q->size_approx();
3054
3055 auto block = q->freeList.head_unsafe();
3056 while (block != nullptr) {
3057 ++stats.allocatedBlocks;
3058 ++stats.freeBlocks;
3059 block = block->freeListNext.load(std::memory_order_relaxed);
3060 }
3061
3062 for (auto ptr = q->producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3063 bool implicit = dynamic_cast<ImplicitProducer*>(ptr) != nullptr;
3064 stats.implicitProducers += implicit ? 1 : 0;
3065 stats.explicitProducers += implicit ? 0 : 1;
3066
3067 if (implicit) {
3068 auto prod = static_cast<ImplicitProducer*>(ptr);
3069 stats.queueClassBytes += sizeof(ImplicitProducer);
3070 auto head = prod->headIndex.load(std::memory_order_relaxed);
3071 auto tail = prod->tailIndex.load(std::memory_order_relaxed);
3072 auto hash = prod->blockIndex.load(std::memory_order_relaxed);
3073 if (hash != nullptr) {
3074 for (size_t i = 0; i != hash->capacity; ++i) {
3075 if (hash->index[i]->key.load(std::memory_order_relaxed) != ImplicitProducer::INVALID_BLOCK_BASE && hash->index[i]->value.load(std::memory_order_relaxed) != nullptr) {
3076 ++stats.allocatedBlocks;
3077 ++stats.ownedBlocksImplicit;
3078 }
3079 }
3080 stats.implicitBlockIndexBytes += hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry);
3081 for (; hash != nullptr; hash = hash->prev) {
3082 stats.implicitBlockIndexBytes += sizeof(typename ImplicitProducer::BlockIndexHeader) + hash->capacity * sizeof(typename ImplicitProducer::BlockIndexEntry*);
3083 }
3084 }
3085 for (; details::circular_less_than<index_t>(head, tail); head += BLOCK_SIZE) {
3086 //auto block = prod->get_block_index_entry_for_index(head);
3087 ++stats.usedBlocks;
3088 }
3089 }
3090 else {
3091 auto prod = static_cast<ExplicitProducer*>(ptr);
3092 stats.queueClassBytes += sizeof(ExplicitProducer);
3093 auto tailBlock = prod->tailBlock;
3094 bool wasNonEmpty = false;
3095 if (tailBlock != nullptr) {
3096 auto block = tailBlock;
3097 do {
3098 ++stats.allocatedBlocks;
3099 if (!block->ConcurrentQueue::Block::template is_empty<explicit_context>() || wasNonEmpty) {
3100 ++stats.usedBlocks;
3101 wasNonEmpty = wasNonEmpty || block != tailBlock;
3102 }
3103 ++stats.ownedBlocksExplicit;
3104 block = block->next;
3105 } while (block != tailBlock);
3106 }
3107 auto index = prod->blockIndex.load(std::memory_order_relaxed);
3108 while (index != nullptr) {
3109 stats.explicitBlockIndexBytes += sizeof(typename ExplicitProducer::BlockIndexHeader) + index->size * sizeof(typename ExplicitProducer::BlockIndexEntry);
3110 index = static_cast<typename ExplicitProducer::BlockIndexHeader*>(index->prev);
3111 }
3112 }
3113 }
3114
3115 auto freeOnInitialPool = q->initialBlockPoolIndex.load(std::memory_order_relaxed) >= q->initialBlockPoolSize ? 0 : q->initialBlockPoolSize - q->initialBlockPoolIndex.load(std::memory_order_relaxed);
3116 stats.allocatedBlocks += freeOnInitialPool;
3117 stats.freeBlocks += freeOnInitialPool;
3118
3119 stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks;
3120 stats.queueClassBytes += sizeof(ConcurrentQueue);
3121
3122 return stats;
3123 }
3124 };
3125
3126 // For debugging only. Not thread-safe.
3127 MemStats getMemStats()
3128 {
3129 return MemStats::getFor(this);
3130 }
3131 private:
3132 friend struct MemStats;
3133#endif
3134
3135
3136 //////////////////////////////////
3137 // Producer list manipulation
3138 //////////////////////////////////
3139
3140 ProducerBase* recycle_or_create_producer(bool isExplicit)
3141 {
3142 bool recycled;
3143 return recycle_or_create_producer(isExplicit, recycled);
3144 }
3145
3146 ProducerBase* recycle_or_create_producer(bool isExplicit, bool& recycled)
3147 {
3148#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3149 debug::DebugLock lock(implicitProdMutex);
3150#endif
3151 // Try to re-use one first
3152 for (auto ptr = producerListTail.load(std::memory_order_acquire); ptr != nullptr; ptr = ptr->next_prod()) {
3153 if (ptr->inactive.load(std::memory_order_relaxed) && ptr->isExplicit == isExplicit) {
3154 bool expected = true;
3155 if (ptr->inactive.compare_exchange_strong(expected, /* desired */ false, std::memory_order_acquire, std::memory_order_relaxed)) {
3156 // We caught one! It's been marked as activated, the caller can have it
3157 recycled = true;
3158 return ptr;
3159 }
3160 }
3161 }
3162
3163 recycled = false;
3164		return add_producer(isExplicit ? static_cast<ProducerBase*>(create<ExplicitProducer>(this)) : create<ImplicitProducer>(this));
3165 }
3166
3167 ProducerBase* add_producer(ProducerBase* producer)
3168 {
3169 // Handle failed memory allocation
3170 if (producer == nullptr) {
3171 return nullptr;
3172 }
3173
3174		producerCount.fetch_add(1, std::memory_order_relaxed);
3175
3176 // Add it to the lock-free list
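		// (Producers are only ever pushed onto this list via the CAS loop below; they are never unlinked,
		// only marked inactive and later recycled, which keeps the list management simple and lock-free.)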
3177 auto prevTail = producerListTail.load(std::memory_order_relaxed);
3178 do {
3179 producer->next = prevTail;
3180 } while (!producerListTail.compare_exchange_weak(prevTail, producer, std::memory_order_release, std::memory_order_relaxed));
3181
3182#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3183 if (producer->isExplicit) {
3184 auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
3185 do {
3186 static_cast<ExplicitProducer*>(producer)->nextExplicitProducer = prevTailExplicit;
3187 } while (!explicitProducers.compare_exchange_weak(prevTailExplicit, static_cast<ExplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3188 }
3189 else {
3190 auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
3191 do {
3192 static_cast<ImplicitProducer*>(producer)->nextImplicitProducer = prevTailImplicit;
3193 } while (!implicitProducers.compare_exchange_weak(prevTailImplicit, static_cast<ImplicitProducer*>(producer), std::memory_order_release, std::memory_order_relaxed));
3194 }
3195#endif
3196
3197 return producer;
3198 }
3199
3200 void reown_producers()
3201 {
3202 // After another instance is moved-into/swapped-with this one, all the
3203 // producers we stole still think their parents are the other queue.
3204 // So fix them up!
3205 for (auto ptr = producerListTail.load(std::memory_order_relaxed); ptr != nullptr; ptr = ptr->next_prod()) {
3206 ptr->parent = this;
3207 }
3208 }
3209
3210
3211 //////////////////////////////////
3212 // Implicit producer hash
3213 //////////////////////////////////
3214
3215 struct ImplicitProducerKVP
3216 {
3217 std::atomic<details::thread_id_t> key;
3218 ImplicitProducer* value; // No need for atomicity since it's only read by the thread that sets it in the first place
3219
3220 ImplicitProducerKVP() : value(nullptr) { }
3221
3222 ImplicitProducerKVP(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
3223 {
3224 key.store(other.key.load(std::memory_order_relaxed), std::memory_order_relaxed);
3225 value = other.value;
3226 }
3227
3228 inline ImplicitProducerKVP& operator=(ImplicitProducerKVP&& other) MOODYCAMEL_NOEXCEPT
3229 {
3230 swap(other);
3231 return *this;
3232 }
3233
3234 inline void swap(ImplicitProducerKVP& other) MOODYCAMEL_NOEXCEPT
3235 {
3236 if (this != &other) {
3237 details::swap_relaxed(key, other.key);
3238 std::swap(value, other.value);
3239 }
3240 }
3241 };
3242
3243 template<typename XT, typename XTraits>
3244 friend void duckdb_moodycamel::swap(typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&, typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP&) MOODYCAMEL_NOEXCEPT;
3245
3246 struct ImplicitProducerHash
3247 {
3248 size_t capacity;
3249 ImplicitProducerKVP* entries;
3250 ImplicitProducerHash* prev;
3251 };
3252
3253 inline void populate_initial_implicit_producer_hash()
3254 {
3255 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) {
3256 return;
3257 }
3258 else {
3259 implicitProducerHashCount.store(0, std::memory_order_relaxed);
3260 auto hash = &initialImplicitProducerHash;
3261 hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
3262 hash->entries = &initialImplicitProducerHashEntries[0];
3263 for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
3264 initialImplicitProducerHashEntries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3265 }
3266 hash->prev = nullptr;
3267 implicitProducerHash.store(hash, std::memory_order_relaxed);
3268 }
3269 }
3270
3271 void swap_implicit_producer_hashes(ConcurrentQueue& other)
3272 {
3273 MOODYCAMEL_CONSTEXPR_IF (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) {
3274 return;
3275 }
3276 else {
3277 // Swap (assumes our implicit producer hash is initialized)
3278 initialImplicitProducerHashEntries.swap(other.initialImplicitProducerHashEntries);
3279 initialImplicitProducerHash.entries = &initialImplicitProducerHashEntries[0];
3280 other.initialImplicitProducerHash.entries = &other.initialImplicitProducerHashEntries[0];
3281
3282 details::swap_relaxed(implicitProducerHashCount, other.implicitProducerHashCount);
3283
3284 details::swap_relaxed(implicitProducerHash, other.implicitProducerHash);
3285 if (implicitProducerHash.load(std::memory_order_relaxed) == &other.initialImplicitProducerHash) {
3286 implicitProducerHash.store(&initialImplicitProducerHash, std::memory_order_relaxed);
3287 }
3288 else {
3289 ImplicitProducerHash* hash;
3290 for (hash = implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &other.initialImplicitProducerHash; hash = hash->prev) {
3291 continue;
3292 }
3293 hash->prev = &initialImplicitProducerHash;
3294 }
3295 if (other.implicitProducerHash.load(std::memory_order_relaxed) == &initialImplicitProducerHash) {
3296 other.implicitProducerHash.store(&other.initialImplicitProducerHash, std::memory_order_relaxed);
3297 }
3298 else {
3299 ImplicitProducerHash* hash;
3300 for (hash = other.implicitProducerHash.load(std::memory_order_relaxed); hash->prev != &initialImplicitProducerHash; hash = hash->prev) {
3301 continue;
3302 }
3303 hash->prev = &other.initialImplicitProducerHash;
3304 }
3305 }
3306 }
3307
3308 // Only fails (returns nullptr) if memory allocation fails
3309 ImplicitProducer* get_or_add_implicit_producer()
3310 {
3311 // Note that since the data is essentially thread-local (key is thread ID),
3312 // there's a reduced need for fences (memory ordering is already consistent
3313 // for any individual thread), except for the current table itself.
3314
3315 // Start by looking for the thread ID in the current and all previous hash tables.
3316 // If it's not found, it must not be in there yet, since this same thread would
3317 // have added it previously to one of the tables that we traversed.
3318
3319 // Code and algorithm adapted from http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table
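		// The hash uses open addressing with linear probing: a key equal to invalid_thread_id marks a
		// never-used slot (and terminates probing), while invalid_thread_id2 marks a slot whose owning
		// thread has exited, so both the slot and its producer can be reused.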
3320
3321#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3322 debug::DebugLock lock(implicitProdMutex);
3323#endif
3324
3325 auto id = details::thread_id();
3326 auto hashedId = details::hash_thread_id(id);
3327
3328 auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
3329 assert(mainHash != nullptr); // silence clang-tidy and MSVC warnings (hash cannot be null)
3330 for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
3331 // Look for the id in this hash
3332 auto index = hashedId;
3333 while (true) { // Not an infinite loop because at least one slot is free in the hash table
3334 index &= hash->capacity - 1;
3335
3336 auto probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3337 if (probedKey == id) {
3338 // Found it! If we had to search several hashes deep, though, we should lazily add it
3339 // to the current main hash table to avoid the extended search next time.
3340 // Note there's guaranteed to be room in the current hash table since every subsequent
3341 // table implicitly reserves space for all previous tables (there's only one
3342 // implicitProducerHashCount).
3343 auto value = hash->entries[index].value;
3344 if (hash != mainHash) {
3345 index = hashedId;
3346 while (true) {
3347 index &= mainHash->capacity - 1;
3348 probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3349 auto empty = details::invalid_thread_id;
3350#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3351 auto reusable = details::invalid_thread_id2;
3352 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3353 (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
3354#else
3355 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed))) {
3356#endif
3357 mainHash->entries[index].value = value;
3358 break;
3359 }
3360 ++index;
3361 }
3362 }
3363
3364 return value;
3365 }
3366 if (probedKey == details::invalid_thread_id) {
3367 break; // Not in this hash table
3368 }
3369 ++index;
3370 }
3371 }
3372
3373 // Insert!
3374 auto newCount = 1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
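		// Resize policy: once the logical count reaches half the current capacity we try to allocate a
		// table at least twice as large, while insertions continue into the old table as long as it stays
		// under roughly three-quarters full (see below).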
3375 while (true) {
3376 // NOLINTNEXTLINE(clang-analyzer-core.NullDereference)
3377			if (newCount >= (mainHash->capacity >> 1) && !implicitProducerHashResizeInProgress.test_and_set(std::memory_order_acquire)) {
3378 // We've acquired the resize lock, try to allocate a bigger hash table.
3379 // Note the acquire fence synchronizes with the release fence at the end of this block, and hence when
3380 // we reload implicitProducerHash it must be the most recent version (it only gets changed within this
3381 // locked block).
3382 mainHash = implicitProducerHash.load(std::memory_order_acquire);
3383 if (newCount >= (mainHash->capacity >> 1)) {
3384 auto newCapacity = mainHash->capacity << 1;
3385 while (newCount >= (newCapacity >> 1)) {
3386 newCapacity <<= 1;
3387 }
3388 auto raw = static_cast<char*>((Traits::malloc)(sizeof(ImplicitProducerHash) + std::alignment_of<ImplicitProducerKVP>::value - 1 + sizeof(ImplicitProducerKVP) * newCapacity));
3389 if (raw == nullptr) {
3390 // Allocation failed
3391 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3392						implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
3393 return nullptr;
3394 }
3395
3396 auto newHash = new (raw) ImplicitProducerHash;
3397 newHash->capacity = newCapacity;
3398 newHash->entries = reinterpret_cast<ImplicitProducerKVP*>(details::align_for<ImplicitProducerKVP>(raw + sizeof(ImplicitProducerHash)));
3399 for (size_t i = 0; i != newCapacity; ++i) {
3400 new (newHash->entries + i) ImplicitProducerKVP;
3401 newHash->entries[i].key.store(details::invalid_thread_id, std::memory_order_relaxed);
3402 }
3403 newHash->prev = mainHash;
3404 implicitProducerHash.store(newHash, std::memory_order_release);
3405					implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3406 mainHash = newHash;
3407 }
3408 else {
3409					implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3410 }
3411 }
3412
3413 // If it's < three-quarters full, add to the old one anyway so that we don't have to wait for the next table
3414 // to finish being allocated by another thread (and if we just finished allocating above, the condition will
3415 // always be true)
3416 if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
3417 bool recycled;
3418 auto producer = static_cast<ImplicitProducer*>(recycle_or_create_producer(false, recycled));
3419 if (producer == nullptr) {
3420 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3421 return nullptr;
3422 }
3423 if (recycled) {
3424 implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3425 }
3426
3427#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3428 producer->threadExitListener.callback = &ConcurrentQueue::implicit_producer_thread_exited_callback;
3429 producer->threadExitListener.userData = producer;
3430 details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
3431#endif
3432
3433 auto index = hashedId;
3434 while (true) {
3435 index &= mainHash->capacity - 1;
3436 auto probedKey = mainHash->entries[index].key.load(std::memory_order_relaxed);
3437
3438 auto empty = details::invalid_thread_id;
3439#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3440 auto reusable = details::invalid_thread_id2;
3441 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed)) ||
3442 (probedKey == reusable && mainHash->entries[index].key.compare_exchange_strong(reusable, id, std::memory_order_acquire, std::memory_order_acquire))) {
3443#else
3444 if ((probedKey == empty && mainHash->entries[index].key.compare_exchange_strong(empty, id, std::memory_order_relaxed, std::memory_order_relaxed))) {
3445#endif
3446 mainHash->entries[index].value = producer;
3447 break;
3448 }
3449 ++index;
3450 }
3451 return producer;
3452 }
3453
3454 // Hmm, the old hash is quite full and somebody else is busy allocating a new one.
3455 // We need to wait for the allocating thread to finish (if it succeeds, we add, if not,
3456 // we try to allocate ourselves).
3457 mainHash = implicitProducerHash.load(std::memory_order_acquire);
3458 }
3459 }
3460
3461#ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3462 void implicit_producer_thread_exited(ImplicitProducer* producer)
3463 {
3464 // Remove from thread exit listeners
3465 details::ThreadExitNotifier::unsubscribe(&producer->threadExitListener);
3466
3467 // Remove from hash
3468#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3469 debug::DebugLock lock(implicitProdMutex);
3470#endif
3471 auto hash = implicitProducerHash.load(std::memory_order_acquire);
3472 assert(hash != nullptr); // The thread exit listener is only registered if we were added to a hash in the first place
3473 auto id = details::thread_id();
3474 auto hashedId = details::hash_thread_id(id);
3475 details::thread_id_t probedKey;
3476
3477 // We need to traverse all the hashes just in case other threads aren't on the current one yet and are
3478 // trying to add an entry thinking there's a free slot (because they reused a producer)
3479 for (; hash != nullptr; hash = hash->prev) {
3480 auto index = hashedId;
3481 do {
3482 index &= hash->capacity - 1;
3483 probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
3484 if (probedKey == id) {
3485 hash->entries[index].key.store(details::invalid_thread_id2, std::memory_order_release);
3486 break;
3487 }
3488 ++index;
3489 } while (probedKey != details::invalid_thread_id); // Can happen if the hash has changed but we weren't put back in it yet, or if we weren't added to this hash in the first place
3490 }
3491
3492 // Mark the queue as being recyclable
3493 producer->inactive.store(true, std::memory_order_release);
3494 }
3495
3496 static void implicit_producer_thread_exited_callback(void* userData)
3497 {
3498 auto producer = static_cast<ImplicitProducer*>(userData);
3499 auto queue = producer->parent;
3500 queue->implicit_producer_thread_exited(producer);
3501 }
3502#endif
3503
3504 //////////////////////////////////
3505 // Utility functions
3506 //////////////////////////////////
3507
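	// When TAlign needs stricter alignment than max_align_t, aligned_malloc over-allocates (room for one
	// void* plus padding), returns a suitably aligned pointer, and stashes the original allocation just
	// before it so that aligned_free can recover and free it.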
3508 template<typename TAlign>
3509 static inline void* aligned_malloc(size_t size)
3510 {
3511 if (std::alignment_of<TAlign>::value <= std::alignment_of<details::max_align_t>::value)
3512 return (Traits::malloc)(size);
3513 size_t alignment = std::alignment_of<TAlign>::value;
3514 void* raw = (Traits::malloc)(size + alignment - 1 + sizeof(void*));
3515 if (!raw)
3516 return nullptr;
3517 char* ptr = details::align_for<TAlign>(reinterpret_cast<char*>(raw) + sizeof(void*));
3518 *(reinterpret_cast<void**>(ptr) - 1) = raw;
3519 return ptr;
3520 }
3521
3522 template<typename TAlign>
3523 static inline void aligned_free(void* ptr)
3524 {
3525 if (std::alignment_of<TAlign>::value <= std::alignment_of<details::max_align_t>::value)
3526 return (Traits::free)(ptr);
3527 (Traits::free)(ptr ? *(reinterpret_cast<void**>(ptr) - 1) : nullptr);
3528 }
3529
3530 template<typename U>
3531 static inline U* create_array(size_t count)
3532 {
3533 assert(count > 0);
3534 U* p = static_cast<U*>(aligned_malloc<U>(sizeof(U) * count));
3535 if (p == nullptr)
3536 return nullptr;
3537
3538 for (size_t i = 0; i != count; ++i)
3539 new (p + i) U();
3540 return p;
3541 }
3542
3543 template<typename U>
3544 static inline void destroy_array(U* p, size_t count)
3545 {
3546 if (p != nullptr) {
3547 assert(count > 0);
3548 for (size_t i = count; i != 0; )
3549 (p + --i)->~U();
3550 }
3551 aligned_free<U>(p);
3552 }
3553
3554 template<typename U>
3555 static inline U* create()
3556 {
3557 void* p = aligned_malloc<U>(sizeof(U));
3558 return p != nullptr ? new (p) U : nullptr;
3559 }
3560
3561 template<typename U, typename A1>
3562 static inline U* create(A1&& a1)
3563 {
3564 void* p = aligned_malloc<U>(sizeof(U));
3565 return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
3566 }
3567
3568 template<typename U>
3569 static inline void destroy(U* p)
3570 {
3571 if (p != nullptr)
3572 p->~U();
3573 aligned_free<U>(p);
3574 }
3575
3576private:
3577 std::atomic<ProducerBase*> producerListTail;
3578 std::atomic<std::uint32_t> producerCount;
3579
3580 std::atomic<size_t> initialBlockPoolIndex;
3581 Block* initialBlockPool;
3582 size_t initialBlockPoolSize;
3583
3584#ifndef MCDBGQ_USEDEBUGFREELIST
3585 FreeList<Block> freeList;
3586#else
3587 debug::DebugFreeList<Block> freeList;
3588#endif
3589
3590 std::atomic<ImplicitProducerHash*> implicitProducerHash;
3591 std::atomic<size_t> implicitProducerHashCount; // Number of slots logically used
3592 ImplicitProducerHash initialImplicitProducerHash;
3593 std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE> initialImplicitProducerHashEntries;
3594 std::atomic_flag implicitProducerHashResizeInProgress;
3595
3596 std::atomic<std::uint32_t> nextExplicitConsumerId;
3597 std::atomic<std::uint32_t> globalExplicitConsumerOffset;
3598
3599#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3600 debug::DebugMutex implicitProdMutex;
3601#endif
3602
3603#ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3604 std::atomic<ExplicitProducer*> explicitProducers;
3605 std::atomic<ImplicitProducer*> implicitProducers;
3606#endif
3607};
3608
3609
3610template<typename T, typename Traits>
3611ProducerToken::ProducerToken(ConcurrentQueue<T, Traits>& queue)
3612 : producer(queue.recycle_or_create_producer(true))
3613{
3614 if (producer != nullptr) {
3615 producer->token = this;
3616 }
3617}
3618
3619template<typename T, typename Traits>
3620ProducerToken::ProducerToken(BlockingConcurrentQueue<T, Traits>& queue)
3621 : producer(reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->recycle_or_create_producer(true))
3622{
3623 if (producer != nullptr) {
3624 producer->token = this;
3625 }
3626}
3627
3628template<typename T, typename Traits>
3629ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits>& queue)
3630 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
3631{
3632 initialOffset = queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
3633 lastKnownGlobalOffset = -1;
3634}
3635
3636template<typename T, typename Traits>
3637ConsumerToken::ConsumerToken(BlockingConcurrentQueue<T, Traits>& queue)
3638 : itemsConsumedFromCurrent(0), currentProducer(nullptr), desiredProducer(nullptr)
3639{
3640 initialOffset = reinterpret_cast<ConcurrentQueue<T, Traits>*>(&queue)->nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
3641 lastKnownGlobalOffset = -1;
3642}
3643
3644template<typename T, typename Traits>
3645inline void swap(ConcurrentQueue<T, Traits>& a, ConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
3646{
3647 a.swap(b);
3648}
3649
3650inline void swap(ProducerToken& a, ProducerToken& b) MOODYCAMEL_NOEXCEPT
3651{
3652	a.swap(b);
3653}
3654
3655inline void swap(ConsumerToken& a, ConsumerToken& b) MOODYCAMEL_NOEXCEPT
3656{
3657	a.swap(b);
3658}
3659
3660template<typename T, typename Traits>
3661inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& a, typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP& b) MOODYCAMEL_NOEXCEPT
3662{
3663 a.swap(b);
3664}
3665
3666}
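
// Quick usage sketch (illustrative only, not part of the implementation); this assumes the public
// ConcurrentQueue API declared earlier in this header:
//
//     duckdb_moodycamel::ConcurrentQueue<int> q;
//     q.enqueue(25);                     // uses an implicit, per-thread producer
//
//     duckdb_moodycamel::ProducerToken ptok(q);
//     q.enqueue(ptok, 26);               // uses an explicit producer bound to the token
//
//     int item;
//     if (q.try_dequeue(item)) {
//         // got an element
//     }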
3667
3668