1 | // Provides an efficient implementation of a semaphore (LightweightSemaphore). |
// This is an extension of Jeff Preshing's semaphore implementation (licensed
3 | // under the terms of its separate zlib license) that has been adapted and |
4 | // extended by Cameron Desrochers. |
5 | |
6 | #pragma once |
7 | |
#include <atomic>
#include <cassert>     // For assert
#include <cerrno>      // For errno, EINTR
#include <cstddef>     // For std::size_t
#include <cstdint>     // For std::int64_t, std::uint64_t
#include <type_traits> // For std::make_signed<T>
11 | |
12 | #if defined(_WIN32) |
13 | // Avoid including windows.h in a header; we only need a handful of |
14 | // items, so we'll redeclare them here (this is relatively safe since |
15 | // the API generally has to remain stable between Windows versions). |
16 | // I know this is an ugly hack but it still beats polluting the global |
17 | // namespace with thousands of generic names or adding a .cpp for nothing. |
18 | extern "C" { |
19 | struct _SECURITY_ATTRIBUTES; |
20 | __declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, long lInitialCount, long lMaximumCount, const wchar_t* lpName); |
21 | __declspec(dllimport) int __stdcall CloseHandle(void* hObject); |
22 | __declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, unsigned long dwMilliseconds); |
23 | __declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, long* lpPreviousCount); |
24 | } |
25 | #elif defined(__MACH__) |
26 | #include <mach/mach.h> |
27 | #elif defined(__unix__) |
28 | #include <semaphore.h> |
29 | #include <chrono> |
30 | #endif |
31 | |
32 | namespace duckdb_moodycamel |
33 | { |
34 | namespace details |
35 | { |
36 | |
// Code in the details namespace below is an adaptation of Jeff Preshing's
38 | // portable + lightweight semaphore implementations, originally from |
39 | // https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h |
40 | // LICENSE: |
41 | // Copyright (c) 2015 Jeff Preshing |
42 | // |
43 | // This software is provided 'as-is', without any express or implied |
44 | // warranty. In no event will the authors be held liable for any damages |
45 | // arising from the use of this software. |
46 | // |
47 | // Permission is granted to anyone to use this software for any purpose, |
48 | // including commercial applications, and to alter it and redistribute it |
49 | // freely, subject to the following restrictions: |
50 | // |
51 | // 1. The origin of this software must not be misrepresented; you must not |
52 | // claim that you wrote the original software. If you use this software |
53 | // in a product, an acknowledgement in the product documentation would be |
54 | // appreciated but is not required. |
55 | // 2. Altered source versions must be plainly marked as such, and must not be |
56 | // misrepresented as being the original software. |
57 | // 3. This notice may not be removed or altered from any source distribution. |
58 | #if defined(_WIN32) |
59 | class Semaphore |
60 | { |
61 | private: |
62 | void* m_hSema; |
63 | |
64 | Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION; |
65 | Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION; |
66 | |
67 | public: |
68 | Semaphore(int initialCount = 0) |
69 | { |
70 | assert(initialCount >= 0); |
71 | const long maxLong = 0x7fffffff; |
72 | m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr); |
73 | assert(m_hSema); |
74 | } |
75 | |
76 | ~Semaphore() |
77 | { |
78 | CloseHandle(m_hSema); |
79 | } |
80 | |
81 | bool wait() |
82 | { |
83 | const unsigned long infinite = 0xffffffff; |
84 | return WaitForSingleObject(m_hSema, infinite) == 0; |
85 | } |
86 | |
87 | bool try_wait() |
88 | { |
89 | return WaitForSingleObject(m_hSema, 0) == 0; |
90 | } |
91 | |
92 | bool timed_wait(std::uint64_t usecs) |
93 | { |
94 | return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) == 0; |
95 | } |
96 | |
97 | void signal(int count = 1) |
98 | { |
99 | while (!ReleaseSemaphore(m_hSema, count, nullptr)); |
100 | } |
101 | }; |
102 | #elif defined(__MACH__) |
103 | //--------------------------------------------------------- |
104 | // Semaphore (Apple iOS and OSX) |
105 | // Can't use POSIX semaphores due to http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html |
106 | //--------------------------------------------------------- |
107 | class Semaphore |
108 | { |
109 | private: |
110 | semaphore_t m_sema; |
111 | |
112 | Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION; |
113 | Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION; |
114 | |
115 | public: |
116 | Semaphore(int initialCount = 0) |
117 | { |
118 | assert(initialCount >= 0); |
119 | kern_return_t rc = semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount); |
120 | assert(rc == KERN_SUCCESS); |
121 | (void)rc; |
122 | } |
123 | |
124 | ~Semaphore() |
125 | { |
126 | semaphore_destroy(mach_task_self(), m_sema); |
127 | } |
128 | |
129 | bool wait() |
130 | { |
131 | return semaphore_wait(m_sema) == KERN_SUCCESS; |
132 | } |
133 | |
134 | bool try_wait() |
135 | { |
136 | return timed_wait(0); |
137 | } |
138 | |
139 | bool timed_wait(std::uint64_t timeout_usecs) |
140 | { |
141 | mach_timespec_t ts; |
142 | ts.tv_sec = static_cast<unsigned int>(timeout_usecs / 1000000); |
143 | ts.tv_nsec = (timeout_usecs % 1000000) * 1000; |
144 | |
145 | // added in OSX 10.10: https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html |
146 | kern_return_t rc = semaphore_timedwait(m_sema, ts); |
147 | return rc == KERN_SUCCESS; |
148 | } |
149 | |
150 | void signal() |
151 | { |
152 | while (semaphore_signal(m_sema) != KERN_SUCCESS); |
153 | } |
154 | |
155 | void signal(int count) |
156 | { |
157 | while (count-- > 0) |
158 | { |
159 | while (semaphore_signal(m_sema) != KERN_SUCCESS); |
160 | } |
161 | } |
162 | }; |
163 | #elif defined(__unix__) |
164 | //--------------------------------------------------------- |
165 | // Semaphore (POSIX, Linux) |
166 | //--------------------------------------------------------- |
167 | class Semaphore |
168 | { |
169 | private: |
170 | sem_t m_sema; |
171 | |
172 | Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION; |
173 | Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION; |
174 | |
175 | public: |
176 | Semaphore(int initialCount = 0) |
177 | { |
178 | assert(initialCount >= 0); |
179 | int rc = sem_init(sem: &m_sema, pshared: 0, value: initialCount); |
180 | assert(rc == 0); |
181 | (void)rc; |
182 | } |
183 | |
184 | ~Semaphore() |
185 | { |
186 | sem_destroy(sem: &m_sema); |
187 | } |
188 | |
189 | bool wait() |
190 | { |
191 | // http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error |
192 | int rc; |
193 | do { |
194 | rc = sem_wait(sem: &m_sema); |
195 | } while (rc == -1 && errno == EINTR); |
196 | return rc == 0; |
197 | } |
198 | |
199 | bool try_wait() |
200 | { |
201 | int rc; |
202 | do { |
203 | rc = sem_trywait(sem: &m_sema); |
204 | } while (rc == -1 && errno == EINTR); |
205 | return rc == 0; |
206 | } |
207 | |
208 | bool timed_wait(std::uint64_t usecs) |
209 | { |
210 | struct timespec ts; |
211 | const int usecs_in_1_sec = 1000000; |
212 | const int nsecs_in_1_sec = 1000000000; |
213 | |
214 | // sem_timedwait needs an absolute time |
215 | // hence we need to first obtain the current time |
216 | // and then add the maximum time we want to wait |
217 | // we want to avoid clock_gettime because of linking issues |
218 | // chrono -> timespec conversion from here: https://embeddedartistry.com/blog/2019/01/31/converting-between-timespec-stdchrono/ |
219 | auto current_time = std::chrono::system_clock::now(); |
220 | auto secs = std::chrono::time_point_cast<std::chrono::seconds>(t: current_time); |
221 | auto ns = std::chrono::time_point_cast<std::chrono::nanoseconds>(t: current_time) - std::chrono::time_point_cast<std::chrono::nanoseconds>(t: secs); |
222 | |
223 | ts.tv_sec = secs.time_since_epoch().count(); |
224 | ts.tv_nsec = ns.count(); |
225 | |
226 | // now add the time we want to wait |
227 | ts.tv_sec += usecs / usecs_in_1_sec; |
228 | ts.tv_nsec += (usecs % usecs_in_1_sec) * 1000; |
229 | |
230 | // sem_timedwait bombs if you have more than 1e9 in tv_nsec |
231 | // so we have to clean things up before passing it in |
232 | if (ts.tv_nsec >= nsecs_in_1_sec) { |
233 | ts.tv_nsec -= nsecs_in_1_sec; |
234 | ++ts.tv_sec; |
235 | } |
236 | |
237 | int rc; |
238 | do { |
239 | rc = sem_timedwait(sem: &m_sema, abstime: &ts); |
240 | } while (rc == -1 && errno == EINTR); |
241 | return rc == 0; |
242 | } |
243 | |
244 | void signal() |
245 | { |
246 | while (sem_post(sem: &m_sema) == -1); |
247 | } |
248 | |
249 | void signal(int count) |
250 | { |
251 | while (count-- > 0) |
252 | { |
253 | while (sem_post(sem: &m_sema) == -1); |
254 | } |
255 | } |
256 | }; |
257 | #else |
258 | #error Unsupported platform! (No semaphore wrapper available) |
259 | #endif |
260 | |
261 | } // end namespace details |
262 | |
263 | |
264 | //--------------------------------------------------------- |
265 | // LightweightSemaphore |
266 | //--------------------------------------------------------- |
267 | class LightweightSemaphore |
268 | { |
269 | public: |
270 | typedef std::make_signed<std::size_t>::type ssize_t; |
271 | |
272 | private: |
273 | std::atomic<ssize_t> m_count; |
274 | details::Semaphore m_sema; |
275 | |
276 | bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1) |
277 | { |
278 | ssize_t oldCount; |
279 | // Is there a better way to set the initial spin count? |
280 | // If we lower it to 1000, testBenaphore becomes 15x slower on my Core i7-5930K Windows PC, |
281 | // as threads start hitting the kernel semaphore. |
282 | int spin = 10000; |
283 | while (--spin >= 0) |
284 | { |
285 | oldCount = m_count.load(m: std::memory_order_relaxed); |
286 | if ((oldCount > 0) && m_count.compare_exchange_strong(i1&: oldCount, i2: oldCount - 1, m1: std::memory_order_acquire, m2: std::memory_order_relaxed)) |
287 | return true; |
288 | std::atomic_signal_fence(m: std::memory_order_acquire); // Prevent the compiler from collapsing the loop. |
289 | } |
290 | oldCount = m_count.fetch_sub(i: 1, m: std::memory_order_acquire); |
291 | if (oldCount > 0) |
292 | return true; |
293 | if (timeout_usecs < 0) |
294 | return m_sema.wait(); |
295 | if (m_sema.timed_wait(usecs: (std::uint64_t)timeout_usecs)) |
296 | return true; |
297 | // At this point, we've timed out waiting for the semaphore, but the |
298 | // count is still decremented indicating we may still be waiting on |
299 | // it. So we have to re-adjust the count, but only if the semaphore |
300 | // wasn't signaled enough times for us too since then. If it was, we |
301 | // need to release the semaphore too. |
302 | while (true) |
303 | { |
304 | oldCount = m_count.load(m: std::memory_order_acquire); |
305 | if (oldCount >= 0 && m_sema.try_wait()) |
306 | return true; |
307 | if (oldCount < 0 && m_count.compare_exchange_strong(i1&: oldCount, i2: oldCount + 1, m1: std::memory_order_relaxed, m2: std::memory_order_relaxed)) |
308 | return false; |
309 | } |
310 | } |
311 | |
312 | ssize_t waitManyWithPartialSpinning(ssize_t max, std::int64_t timeout_usecs = -1) |
313 | { |
314 | assert(max > 0); |
315 | ssize_t oldCount; |
316 | int spin = 10000; |
317 | while (--spin >= 0) |
318 | { |
319 | oldCount = m_count.load(m: std::memory_order_relaxed); |
320 | if (oldCount > 0) |
321 | { |
322 | ssize_t newCount = oldCount > max ? oldCount - max : 0; |
323 | if (m_count.compare_exchange_strong(i1&: oldCount, i2: newCount, m1: std::memory_order_acquire, m2: std::memory_order_relaxed)) |
324 | return oldCount - newCount; |
325 | } |
326 | std::atomic_signal_fence(m: std::memory_order_acquire); |
327 | } |
328 | oldCount = m_count.fetch_sub(i: 1, m: std::memory_order_acquire); |
329 | if (oldCount <= 0) |
330 | { |
331 | if (timeout_usecs < 0) |
332 | { |
333 | if (!m_sema.wait()) |
334 | return 0; |
335 | } |
336 | else if (!m_sema.timed_wait(usecs: (std::uint64_t)timeout_usecs)) |
337 | { |
338 | while (true) |
339 | { |
340 | oldCount = m_count.load(m: std::memory_order_acquire); |
341 | if (oldCount >= 0 && m_sema.try_wait()) |
342 | break; |
343 | if (oldCount < 0 && m_count.compare_exchange_strong(i1&: oldCount, i2: oldCount + 1, m1: std::memory_order_relaxed, m2: std::memory_order_relaxed)) |
344 | return 0; |
345 | } |
346 | } |
347 | } |
348 | if (max > 1) |
349 | return 1 + tryWaitMany(max: max - 1); |
350 | return 1; |
351 | } |
352 | |
353 | public: |
354 | LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount) |
355 | { |
356 | assert(initialCount >= 0); |
357 | } |
358 | |
359 | bool tryWait() |
360 | { |
361 | ssize_t oldCount = m_count.load(m: std::memory_order_relaxed); |
362 | while (oldCount > 0) |
363 | { |
364 | if (m_count.compare_exchange_weak(i1&: oldCount, i2: oldCount - 1, m1: std::memory_order_acquire, m2: std::memory_order_relaxed)) |
365 | return true; |
366 | } |
367 | return false; |
368 | } |
369 | |
370 | bool wait() |
371 | { |
372 | return tryWait() || waitWithPartialSpinning(); |
373 | } |
374 | |
375 | bool wait(std::int64_t timeout_usecs) |
376 | { |
377 | return tryWait() || waitWithPartialSpinning(timeout_usecs); |
378 | } |
379 | |
380 | // Acquires between 0 and (greedily) max, inclusive |
381 | ssize_t tryWaitMany(ssize_t max) |
382 | { |
383 | assert(max >= 0); |
384 | ssize_t oldCount = m_count.load(m: std::memory_order_relaxed); |
385 | while (oldCount > 0) |
386 | { |
387 | ssize_t newCount = oldCount > max ? oldCount - max : 0; |
388 | if (m_count.compare_exchange_weak(i1&: oldCount, i2: newCount, m1: std::memory_order_acquire, m2: std::memory_order_relaxed)) |
389 | return oldCount - newCount; |
390 | } |
391 | return 0; |
392 | } |
393 | |
394 | // Acquires at least one, and (greedily) at most max |
395 | ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs) |
396 | { |
397 | assert(max >= 0); |
398 | ssize_t result = tryWaitMany(max); |
399 | if (result == 0 && max > 0) |
400 | result = waitManyWithPartialSpinning(max, timeout_usecs); |
401 | return result; |
402 | } |
403 | |
404 | ssize_t waitMany(ssize_t max) |
405 | { |
406 | ssize_t result = waitMany(max, timeout_usecs: -1); |
407 | assert(result > 0); |
408 | return result; |
409 | } |
410 | |
411 | void signal(ssize_t count = 1) |
412 | { |
413 | assert(count >= 0); |
414 | ssize_t oldCount = m_count.fetch_add(i: count, m: std::memory_order_release); |
415 | ssize_t toRelease = -oldCount < count ? -oldCount : count; |
416 | if (toRelease > 0) |
417 | { |
418 | m_sema.signal(count: (int)toRelease); |
419 | } |
420 | } |
421 | |
422 | ssize_t availableApprox() const |
423 | { |
424 | ssize_t count = m_count.load(m: std::memory_order_relaxed); |
425 | return count > 0 ? count : 0; |
426 | } |
427 | }; |
428 | |
429 | } // end namespace duckdb_moodycamel |
430 | |