// Provides an efficient implementation of a semaphore (LightweightSemaphore).
// This is an extension of Jeff Preshing's semaphore implementation (licensed
// under the terms of its separate zlib license) that has been adapted and
// extended by Cameron Desrochers.
5
6#pragma once
7
#include <atomic>
#include <cassert>     // For assert
#include <cerrno>      // For errno / EINTR (POSIX semaphore wrapper)
#include <cstddef>     // For std::size_t
#include <cstdint>     // For std::int64_t / std::uint64_t
#include <type_traits> // For std::make_signed<T>
11
12#if defined(_WIN32)
13// Avoid including windows.h in a header; we only need a handful of
14// items, so we'll redeclare them here (this is relatively safe since
15// the API generally has to remain stable between Windows versions).
16// I know this is an ugly hack but it still beats polluting the global
17// namespace with thousands of generic names or adding a .cpp for nothing.
18extern "C" {
19 struct _SECURITY_ATTRIBUTES;
20 __declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, long lInitialCount, long lMaximumCount, const wchar_t* lpName);
21 __declspec(dllimport) int __stdcall CloseHandle(void* hObject);
22 __declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, unsigned long dwMilliseconds);
23 __declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, long* lpPreviousCount);
24}
25#elif defined(__MACH__)
26#include <mach/mach.h>
27#elif defined(__unix__)
28#include <semaphore.h>
29#include <chrono>
30#endif
31
32namespace duckdb_moodycamel
33{
34namespace details
35{
36
// Code in the details namespace below is an adaptation of Jeff Preshing's
// portable + lightweight semaphore implementations, originally from
// https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h
40// LICENSE:
41// Copyright (c) 2015 Jeff Preshing
42//
43// This software is provided 'as-is', without any express or implied
44// warranty. In no event will the authors be held liable for any damages
45// arising from the use of this software.
46//
47// Permission is granted to anyone to use this software for any purpose,
48// including commercial applications, and to alter it and redistribute it
49// freely, subject to the following restrictions:
50//
51// 1. The origin of this software must not be misrepresented; you must not
52// claim that you wrote the original software. If you use this software
53// in a product, an acknowledgement in the product documentation would be
54// appreciated but is not required.
55// 2. Altered source versions must be plainly marked as such, and must not be
56// misrepresented as being the original software.
57// 3. This notice may not be removed or altered from any source distribution.
58#if defined(_WIN32)
59class Semaphore
60{
61private:
62 void* m_hSema;
63
64 Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
65 Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
66
67public:
68 Semaphore(int initialCount = 0)
69 {
70 assert(initialCount >= 0);
71 const long maxLong = 0x7fffffff;
72 m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr);
73 assert(m_hSema);
74 }
75
76 ~Semaphore()
77 {
78 CloseHandle(m_hSema);
79 }
80
81 bool wait()
82 {
83 const unsigned long infinite = 0xffffffff;
84 return WaitForSingleObject(m_hSema, infinite) == 0;
85 }
86
87 bool try_wait()
88 {
89 return WaitForSingleObject(m_hSema, 0) == 0;
90 }
91
92 bool timed_wait(std::uint64_t usecs)
93 {
94 return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) == 0;
95 }
96
97 void signal(int count = 1)
98 {
99 while (!ReleaseSemaphore(m_hSema, count, nullptr));
100 }
101};
102#elif defined(__MACH__)
103//---------------------------------------------------------
104// Semaphore (Apple iOS and OSX)
105// Can't use POSIX semaphores due to http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html
106//---------------------------------------------------------
// Thin wrapper around a Mach kernel semaphore owned by the current task.
class Semaphore
{
private:
    semaphore_t m_sema;

    // Not copyable: each wrapper destroys its own kernel semaphore.
    Semaphore(const Semaphore& other) = delete;
    Semaphore& operator=(const Semaphore& other) = delete;

public:
    // Creates the semaphore holding `initialCount` permits (must be >= 0).
    Semaphore(int initialCount = 0)
    {
        assert(initialCount >= 0);
        kern_return_t rc = semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
        assert(rc == KERN_SUCCESS);
        (void)rc; // Silence the unused-variable warning when asserts are compiled out.
    }

    ~Semaphore()
    {
        semaphore_destroy(mach_task_self(), m_sema);
    }

    // Blocks until a permit is acquired; false if the kernel call fails.
    bool wait()
    {
        return semaphore_wait(m_sema) == KERN_SUCCESS;
    }

    // Acquires a permit only if one is immediately available.
    bool try_wait()
    {
        return timed_wait(0);
    }

    // Waits up to `timeout_usecs` microseconds (a relative timeout) for a
    // permit. Returns true if one was acquired.
    bool timed_wait(std::uint64_t timeout_usecs)
    {
        const std::uint64_t usecsPerSec = 1000000;
        const unsigned int maxSecs = static_cast<unsigned int>(-1);
        const std::uint64_t wholeSecs = timeout_usecs / usecsPerSec;

        mach_timespec_t ts;
        // The seconds field is narrower than the 64-bit input: saturate
        // instead of letting a huge timeout wrap around to a short one.
        ts.tv_sec = wholeSecs > maxSecs ? maxSecs : static_cast<unsigned int>(wholeSecs);
        // Always below 1e9, so the narrowing conversion is lossless.
        ts.tv_nsec = static_cast<decltype(ts.tv_nsec)>((timeout_usecs % usecsPerSec) * 1000);

        // added in OSX 10.10: https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html
        kern_return_t rc = semaphore_timedwait(m_sema, ts);
        return rc == KERN_SUCCESS;
    }

    // Releases one permit, retrying until the kernel call succeeds.
    void signal()
    {
        while (semaphore_signal(m_sema) != KERN_SUCCESS);
    }

    // Releases `count` permits one at a time; a non-positive count is a no-op.
    void signal(int count)
    {
        while (count-- > 0)
        {
            while (semaphore_signal(m_sema) != KERN_SUCCESS);
        }
    }
};
163#elif defined(__unix__)
164//---------------------------------------------------------
165// Semaphore (POSIX, Linux)
166//---------------------------------------------------------
// Thin wrapper around an unnamed, process-private POSIX semaphore (sem_t).
class Semaphore
{
private:
    sem_t m_sema;

    // Not copyable: a sem_t must stay at the address it was initialised at.
    Semaphore(const Semaphore& other) = delete;
    Semaphore& operator=(const Semaphore& other) = delete;

public:
    // Creates the semaphore holding `initialCount` permits (must be >= 0).
    Semaphore(int initialCount = 0)
    {
        assert(initialCount >= 0);
        int rc = sem_init(&m_sema, 0, static_cast<unsigned int>(initialCount));
        assert(rc == 0);
        (void)rc; // Silence the unused-variable warning when asserts are compiled out.
    }

    ~Semaphore()
    {
        sem_destroy(&m_sema);
    }

    // Blocks until a permit is acquired. Returns false only on a failure
    // other than signal interruption.
    bool wait()
    {
        // http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error
        int rc;
        do {
            rc = sem_wait(&m_sema);
        } while (rc == -1 && errno == EINTR);
        return rc == 0;
    }

    // Acquires a permit only if one is immediately available.
    bool try_wait()
    {
        int rc;
        do {
            rc = sem_trywait(&m_sema);
        } while (rc == -1 && errno == EINTR);
        return rc == 0;
    }

    // Waits up to `usecs` microseconds for a permit. Returns true if one
    // was acquired, false on timeout or error.
    bool timed_wait(std::uint64_t usecs)
    {
        struct timespec ts;
        const int usecs_in_1_sec = 1000000;
        const int nsecs_in_1_sec = 1000000000;

        // sem_timedwait needs an absolute time
        // hence we need to first obtain the current time
        // and then add the maximum time we want to wait
        // we want to avoid clock_gettime because of linking issues
        // chrono -> timespec conversion from here: https://embeddedartistry.com/blog/2019/01/31/converting-between-timespec-stdchrono/
        auto current_time = std::chrono::system_clock::now();
        auto secs = std::chrono::time_point_cast<std::chrono::seconds>(current_time);
        auto ns = std::chrono::time_point_cast<std::chrono::nanoseconds>(current_time)
                - std::chrono::time_point_cast<std::chrono::nanoseconds>(secs);

        ts.tv_sec = static_cast<decltype(ts.tv_sec)>(secs.time_since_epoch().count());
        ts.tv_nsec = static_cast<decltype(ts.tv_nsec)>(ns.count());

        // now add the time we want to wait
        ts.tv_sec += static_cast<decltype(ts.tv_sec)>(usecs / usecs_in_1_sec);
        ts.tv_nsec += static_cast<decltype(ts.tv_nsec)>((usecs % usecs_in_1_sec) * 1000);

        // sem_timedwait bombs if you have more than 1e9 in tv_nsec
        // so we have to clean things up before passing it in.
        // Both addends are below 1e9, so a single carry is enough.
        if (ts.tv_nsec >= nsecs_in_1_sec) {
            ts.tv_nsec -= nsecs_in_1_sec;
            ++ts.tv_sec;
        }

        int rc;
        do {
            rc = sem_timedwait(&m_sema, &ts);
        } while (rc == -1 && errno == EINTR);
        return rc == 0;
    }

    // Releases one permit, retrying until sem_post succeeds.
    void signal()
    {
        while (sem_post(&m_sema) == -1);
    }

    // Releases `count` permits one at a time; a non-positive count is a no-op.
    void signal(int count)
    {
        while (count-- > 0)
        {
            while (sem_post(&m_sema) == -1);
        }
    }
};
257#else
258#error Unsupported platform! (No semaphore wrapper available)
259#endif
260
261} // end namespace details
262
263
264//---------------------------------------------------------
265// LightweightSemaphore
266//---------------------------------------------------------
267class LightweightSemaphore
268{
269public:
270 typedef std::make_signed<std::size_t>::type ssize_t;
271
272private:
273 std::atomic<ssize_t> m_count;
274 details::Semaphore m_sema;
275
276 bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1)
277 {
278 ssize_t oldCount;
279 // Is there a better way to set the initial spin count?
280 // If we lower it to 1000, testBenaphore becomes 15x slower on my Core i7-5930K Windows PC,
281 // as threads start hitting the kernel semaphore.
282 int spin = 10000;
283 while (--spin >= 0)
284 {
285 oldCount = m_count.load(m: std::memory_order_relaxed);
286 if ((oldCount > 0) && m_count.compare_exchange_strong(i1&: oldCount, i2: oldCount - 1, m1: std::memory_order_acquire, m2: std::memory_order_relaxed))
287 return true;
288 std::atomic_signal_fence(m: std::memory_order_acquire); // Prevent the compiler from collapsing the loop.
289 }
290 oldCount = m_count.fetch_sub(i: 1, m: std::memory_order_acquire);
291 if (oldCount > 0)
292 return true;
293 if (timeout_usecs < 0)
294 return m_sema.wait();
295 if (m_sema.timed_wait(usecs: (std::uint64_t)timeout_usecs))
296 return true;
297 // At this point, we've timed out waiting for the semaphore, but the
298 // count is still decremented indicating we may still be waiting on
299 // it. So we have to re-adjust the count, but only if the semaphore
300 // wasn't signaled enough times for us too since then. If it was, we
301 // need to release the semaphore too.
302 while (true)
303 {
304 oldCount = m_count.load(m: std::memory_order_acquire);
305 if (oldCount >= 0 && m_sema.try_wait())
306 return true;
307 if (oldCount < 0 && m_count.compare_exchange_strong(i1&: oldCount, i2: oldCount + 1, m1: std::memory_order_relaxed, m2: std::memory_order_relaxed))
308 return false;
309 }
310 }
311
312 ssize_t waitManyWithPartialSpinning(ssize_t max, std::int64_t timeout_usecs = -1)
313 {
314 assert(max > 0);
315 ssize_t oldCount;
316 int spin = 10000;
317 while (--spin >= 0)
318 {
319 oldCount = m_count.load(m: std::memory_order_relaxed);
320 if (oldCount > 0)
321 {
322 ssize_t newCount = oldCount > max ? oldCount - max : 0;
323 if (m_count.compare_exchange_strong(i1&: oldCount, i2: newCount, m1: std::memory_order_acquire, m2: std::memory_order_relaxed))
324 return oldCount - newCount;
325 }
326 std::atomic_signal_fence(m: std::memory_order_acquire);
327 }
328 oldCount = m_count.fetch_sub(i: 1, m: std::memory_order_acquire);
329 if (oldCount <= 0)
330 {
331 if (timeout_usecs < 0)
332 {
333 if (!m_sema.wait())
334 return 0;
335 }
336 else if (!m_sema.timed_wait(usecs: (std::uint64_t)timeout_usecs))
337 {
338 while (true)
339 {
340 oldCount = m_count.load(m: std::memory_order_acquire);
341 if (oldCount >= 0 && m_sema.try_wait())
342 break;
343 if (oldCount < 0 && m_count.compare_exchange_strong(i1&: oldCount, i2: oldCount + 1, m1: std::memory_order_relaxed, m2: std::memory_order_relaxed))
344 return 0;
345 }
346 }
347 }
348 if (max > 1)
349 return 1 + tryWaitMany(max: max - 1);
350 return 1;
351 }
352
353public:
354 LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount)
355 {
356 assert(initialCount >= 0);
357 }
358
359 bool tryWait()
360 {
361 ssize_t oldCount = m_count.load(m: std::memory_order_relaxed);
362 while (oldCount > 0)
363 {
364 if (m_count.compare_exchange_weak(i1&: oldCount, i2: oldCount - 1, m1: std::memory_order_acquire, m2: std::memory_order_relaxed))
365 return true;
366 }
367 return false;
368 }
369
370 bool wait()
371 {
372 return tryWait() || waitWithPartialSpinning();
373 }
374
375 bool wait(std::int64_t timeout_usecs)
376 {
377 return tryWait() || waitWithPartialSpinning(timeout_usecs);
378 }
379
380 // Acquires between 0 and (greedily) max, inclusive
381 ssize_t tryWaitMany(ssize_t max)
382 {
383 assert(max >= 0);
384 ssize_t oldCount = m_count.load(m: std::memory_order_relaxed);
385 while (oldCount > 0)
386 {
387 ssize_t newCount = oldCount > max ? oldCount - max : 0;
388 if (m_count.compare_exchange_weak(i1&: oldCount, i2: newCount, m1: std::memory_order_acquire, m2: std::memory_order_relaxed))
389 return oldCount - newCount;
390 }
391 return 0;
392 }
393
394 // Acquires at least one, and (greedily) at most max
395 ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs)
396 {
397 assert(max >= 0);
398 ssize_t result = tryWaitMany(max);
399 if (result == 0 && max > 0)
400 result = waitManyWithPartialSpinning(max, timeout_usecs);
401 return result;
402 }
403
404 ssize_t waitMany(ssize_t max)
405 {
406 ssize_t result = waitMany(max, timeout_usecs: -1);
407 assert(result > 0);
408 return result;
409 }
410
411 void signal(ssize_t count = 1)
412 {
413 assert(count >= 0);
414 ssize_t oldCount = m_count.fetch_add(i: count, m: std::memory_order_release);
415 ssize_t toRelease = -oldCount < count ? -oldCount : count;
416 if (toRelease > 0)
417 {
418 m_sema.signal(count: (int)toRelease);
419 }
420 }
421
422 ssize_t availableApprox() const
423 {
424 ssize_t count = m_count.load(m: std::memory_order_relaxed);
425 return count > 0 ? count : 0;
426 }
427};
428
429} // end namespace duckdb_moodycamel
430