1 | // Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors |
2 | // Licensed under the MIT License: |
3 | // |
4 | // Permission is hereby granted, free of charge, to any person obtaining a copy |
5 | // of this software and associated documentation files (the "Software"), to deal |
6 | // in the Software without restriction, including without limitation the rights |
7 | // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
8 | // copies of the Software, and to permit persons to whom the Software is |
9 | // furnished to do so, subject to the following conditions: |
10 | // |
11 | // The above copyright notice and this permission notice shall be included in |
12 | // all copies or substantial portions of the Software. |
13 | // |
14 | // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
15 | // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
16 | // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
17 | // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
18 | // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
19 | // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
20 | // THE SOFTWARE. |
21 | |
22 | #if _WIN32 |
23 | #define WIN32_LEAN_AND_MEAN 1 // lolz |
24 | #define WINVER 0x0600 |
25 | #define _WIN32_WINNT 0x0600 |
26 | #endif |
27 | |
28 | #include "mutex.h" |
29 | #include "debug.h" |
30 | |
31 | #if KJ_USE_FUTEX |
32 | #include <unistd.h> |
33 | #include <sys/syscall.h> |
34 | #include <linux/futex.h> |
35 | #include <limits.h> |
36 | |
37 | #ifndef SYS_futex |
38 | // Missing on Android/Bionic. |
39 | #define SYS_futex __NR_futex |
40 | #endif |
41 | |
42 | #ifndef FUTEX_WAIT_PRIVATE |
43 | // Missing on Android/Bionic. |
44 | #define FUTEX_WAIT_PRIVATE FUTEX_WAIT |
45 | #define FUTEX_WAKE_PRIVATE FUTEX_WAKE |
46 | #endif |
47 | |
48 | #elif _WIN32 |
49 | #include <windows.h> |
50 | #endif |
51 | |
52 | namespace kj { |
53 | namespace _ { // private |
54 | |
55 | #if KJ_USE_FUTEX |
56 | // ======================================================================================= |
57 | // Futex-based implementation (Linux-only) |
58 | |
// Creates the mutex in the unlocked state: the futex word starts at zero, which is the value
// lock(EXCLUSIVE) expects to find when it acquires without contention.
Mutex::Mutex(): futex(0) {}
// Checks that the mutex is not held (futex word is zero) at destruction time.
Mutex::~Mutex() {
  // This will crash anyway, might as well crash with a nice error message.
  // NOTE(review): the trailing `{ break; }` is presumably KJ_ASSERT's recovery block, so that a
  // failed check is reported without throwing out of the destructor -- confirm against debug.h.
  KJ_ASSERT(futex == 0, "Mutex destroyed while locked." ) { break; }
}
64 | |
// Acquires the mutex in the requested mode, sleeping on the futex word while it is unavailable.
//
// The futex word combines the EXCLUSIVE_HELD and EXCLUSIVE_REQUESTED flag bits with a count of
// shared lockers (incremented by one per shared lock() call).
void Mutex::lock(Exclusivity exclusivity) {
  switch (exclusivity) {
    case EXCLUSIVE:
      for (;;) {
        // Uncontended fast path: the word must be exactly zero (no holder, no shared lockers, no
        // pending request). On failure the compare-exchange writes the observed value to `state`.
        uint state = 0;
        if (KJ_LIKELY(__atomic_compare_exchange_n(&futex, &state, EXCLUSIVE_HELD, false,
                                                  __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))) {
          // Acquired.
          break;
        }

        // The mutex is contended. Set the exclusive-requested bit and wait.
        if ((state & EXCLUSIVE_REQUESTED) == 0) {
          if (!__atomic_compare_exchange_n(&futex, &state, state | EXCLUSIVE_REQUESTED, false,
                                           __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
            // Oops, the state changed before we could set the request bit. Start over.
            continue;
          }

          // Keep the local copy in sync with what we just stored, since it is passed to FUTEX_WAIT
          // below as the expected value.
          state |= EXCLUSIVE_REQUESTED;
        }

        // Sleep only if the word still equals `state`; otherwise the call returns immediately.
        // Either way, loop around and retry the acquisition.
        syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, state, NULL, NULL, 0);
      }
      break;
    case SHARED: {
      // Register as a shared locker unconditionally; the count stays incremented even if we have
      // to wait for an exclusive holder.
      uint state = __atomic_add_fetch(&futex, 1, __ATOMIC_ACQUIRE);
      for (;;) {
        if (KJ_LIKELY((state & EXCLUSIVE_HELD) == 0)) {
          // Acquired.
          break;
        }

        // The mutex is exclusively locked by another thread. Since we incremented the counter
        // already, we just have to wait for it to be unlocked.
        syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, state, NULL, NULL, 0);
        state = __atomic_load_n(&futex, __ATOMIC_ACQUIRE);
      }
      break;
    }
  }
}
107 | |
// One entry in the mutex's list of threads blocked in lockWhen(). Instances are created on the
// waiting thread's stack by lockWhen() using aggregate initialization, so the field order here
// must match that initializer.
struct Mutex::Waiter {
  // The waiter that follows this one in the list, or nullptr if this is the last one.
  kj::Maybe<Waiter&> next;
  // Address of the Maybe that refers to this waiter (the previous waiter's `next`, or whatever
  // `waitersTail` pointed at when this waiter was appended). Used to unlink in O(1).
  kj::Maybe<Waiter&>* prev;
  // Condition this waiter is waiting for; evaluated by unlock(EXCLUSIVE).
  Predicate& predicate;
  // Private futex word: starts at 0 and is set to 1 by unlock() when the lock is handed over to
  // this waiter.
  uint futex;
};
114 | |
// Releases a lock that was acquired in the given mode.
//
// Releasing an exclusive lock first offers the lock to conditional waiters (see lockWhen()): the
// first waiter whose predicate is now true is woken and inherits the lock without it ever being
// released. Only if no waiter qualifies is the futex word actually cleared.
void Mutex::unlock(Exclusivity exclusivity) {
  switch (exclusivity) {
    case EXCLUSIVE: {
      KJ_DASSERT(futex & EXCLUSIVE_HELD, "Unlocked a mutex that wasn't locked." );

      // First check if there are any conditional waiters. Note we only do this when unlocking an
      // exclusive lock since under a shared lock the state couldn't have changed.
      auto nextWaiter = waitersHead;
      for (;;) {
        KJ_IF_MAYBE(waiter, nextWaiter) {
          nextWaiter = waiter->next;

          if (waiter->predicate.check()) {
            // This waiter's predicate now evaluates true, so wake it up.
            __atomic_store_n(&waiter->futex, 1, __ATOMIC_RELEASE);
            syscall(SYS_futex, &waiter->futex, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);

            // We transferred ownership of the lock to this waiter, so we're done now.
            // (EXCLUSIVE_HELD deliberately stays set in the mutex's futex word.)
            return;
          }
        } else {
          // No more waiters.
          break;
        }
      }

      // Didn't wake any waiters, so wake normally.
      uint oldState = __atomic_fetch_and(
          &futex, ~(EXCLUSIVE_HELD | EXCLUSIVE_REQUESTED), __ATOMIC_RELEASE);

      // Any bit other than EXCLUSIVE_HELD in the previous value means someone may be sleeping:
      // either a nonzero shared count or the exclusive-requested flag.
      if (KJ_UNLIKELY(oldState & ~EXCLUSIVE_HELD)) {
        // Other threads are waiting. If there are any shared waiters, they now collectively hold
        // the lock, and we must wake them up. If there are any exclusive waiters, we must wake
        // them up even if readers are waiting so that at the very least they may re-establish the
        // EXCLUSIVE_REQUESTED bit that we just removed.
        syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
      }
      break;
    }

    case SHARED: {
      KJ_DASSERT(futex & SHARED_COUNT_MASK, "Unshared a mutex that wasn't shared." );
      uint state = __atomic_sub_fetch(&futex, 1, __ATOMIC_RELEASE);

      // The only case where anyone is waiting is if EXCLUSIVE_REQUESTED is set, and the only time
      // it makes sense to wake up that waiter is if the shared count has reached zero.
      if (KJ_UNLIKELY(state == EXCLUSIVE_REQUESTED)) {
        // Clear the request flag. If this compare-exchange fails the word changed under us, and
        // no wake-up is issued from here.
        if (__atomic_compare_exchange_n(
            &futex, &state, 0, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
          // Wake all exclusive waiters. We have to wake all of them because one of them will
          // grab the lock while the others will re-establish the exclusive-requested bit.
          syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0);
        }
      }
      break;
    }
  }
}
173 | |
174 | void Mutex::assertLockedByCaller(Exclusivity exclusivity) { |
175 | switch (exclusivity) { |
176 | case EXCLUSIVE: |
177 | KJ_ASSERT(futex & EXCLUSIVE_HELD, |
178 | "Tried to call getAlreadyLocked*() but lock is not held." ); |
179 | break; |
180 | case SHARED: |
181 | KJ_ASSERT(futex & SHARED_COUNT_MASK, |
182 | "Tried to call getAlreadyLocked*() but lock is not held." ); |
183 | break; |
184 | } |
185 | } |
186 | |
187 | void Mutex::lockWhen(Predicate& predicate) { |
188 | lock(EXCLUSIVE); |
189 | |
190 | // Add waiter to list. |
191 | Waiter waiter { nullptr, waitersTail, predicate, 0 }; |
192 | *waitersTail = waiter; |
193 | waitersTail = &waiter.next; |
194 | |
195 | KJ_DEFER({ |
196 | // Remove from list. |
197 | *waiter.prev = waiter.next; |
198 | KJ_IF_MAYBE(next, waiter.next) { |
199 | next->prev = waiter.prev; |
200 | } else { |
201 | KJ_DASSERT(waitersTail == &waiter.next); |
202 | waitersTail = waiter.prev; |
203 | } |
204 | }); |
205 | |
206 | if (!predicate.check()) { |
207 | unlock(EXCLUSIVE); |
208 | |
209 | // Wait for someone to set out futex to 1. |
210 | while (__atomic_load_n(&waiter.futex, __ATOMIC_ACQUIRE) == 0) { |
211 | syscall(SYS_futex, &waiter.futex, FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0); |
212 | } |
213 | |
214 | // Ownership of an exclusive lock was transferred to us. We can continue. |
215 | #ifdef KJ_DEBUG |
216 | assertLockedByCaller(EXCLUSIVE); |
217 | #endif |
218 | } |
219 | } |
220 | |
221 | void Once::runOnce(Initializer& init) { |
222 | startOver: |
223 | uint state = UNINITIALIZED; |
224 | if (__atomic_compare_exchange_n(&futex, &state, INITIALIZING, false, |
225 | __ATOMIC_RELAXED, __ATOMIC_RELAXED)) { |
226 | // It's our job to initialize! |
227 | { |
228 | KJ_ON_SCOPE_FAILURE({ |
229 | // An exception was thrown by the initializer. We have to revert. |
230 | if (__atomic_exchange_n(&futex, UNINITIALIZED, __ATOMIC_RELEASE) == |
231 | INITIALIZING_WITH_WAITERS) { |
232 | // Someone was waiting for us to finish. |
233 | syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0); |
234 | } |
235 | }); |
236 | |
237 | init.run(); |
238 | } |
239 | if (__atomic_exchange_n(&futex, INITIALIZED, __ATOMIC_RELEASE) == |
240 | INITIALIZING_WITH_WAITERS) { |
241 | // Someone was waiting for us to finish. |
242 | syscall(SYS_futex, &futex, FUTEX_WAKE_PRIVATE, INT_MAX, NULL, NULL, 0); |
243 | } |
244 | } else { |
245 | for (;;) { |
246 | if (state == INITIALIZED) { |
247 | break; |
248 | } else if (state == INITIALIZING) { |
249 | // Initialization is taking place in another thread. Indicate that we're waiting. |
250 | if (!__atomic_compare_exchange_n(&futex, &state, INITIALIZING_WITH_WAITERS, true, |
251 | __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) { |
252 | // State changed, retry. |
253 | continue; |
254 | } |
255 | } else { |
256 | KJ_DASSERT(state == INITIALIZING_WITH_WAITERS); |
257 | } |
258 | |
259 | // Wait for initialization. |
260 | syscall(SYS_futex, &futex, FUTEX_WAIT_PRIVATE, INITIALIZING_WITH_WAITERS, NULL, NULL, 0); |
261 | state = __atomic_load_n(&futex, __ATOMIC_ACQUIRE); |
262 | |
263 | if (state == UNINITIALIZED) { |
264 | // Oh hey, apparently whoever was trying to initialize gave up. Let's take it from the |
265 | // top. |
266 | goto startOver; |
267 | } |
268 | } |
269 | } |
270 | } |
271 | |
272 | void Once::reset() { |
273 | uint state = INITIALIZED; |
274 | if (!__atomic_compare_exchange_n(&futex, &state, UNINITIALIZED, |
275 | false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) { |
276 | KJ_FAIL_REQUIRE("reset() called while not initialized." ); |
277 | } |
278 | } |
279 | |
280 | #elif _WIN32 |
281 | // ======================================================================================= |
282 | // Win32 implementation |
283 | |
// Reinterpret the `srwLock` / `initOnce` members as the native Windows SRWLOCK / INIT_ONCE
// objects. The static_asserts in the Mutex and Once constructors verify that the sizes match.
// NOTE(review): presumably the members are declared with a placeholder type so that the header
// does not need <windows.h> -- confirm against mutex.h.
#define coercedSrwLock (*reinterpret_cast<SRWLOCK*>(&srwLock))
#define coercedInitOnce (*reinterpret_cast<INIT_ONCE*>(&initOnce))
286 | |
// Initializes the SRW lock stored in `srwLock`.
Mutex::Mutex() {
  // The member must be exactly as large as a real SRWLOCK for the coercedSrwLock cast to be valid.
  static_assert(sizeof(SRWLOCK) == sizeof(srwLock), "SRWLOCK is not a pointer?" );
  InitializeSRWLock(&coercedSrwLock);
}
// Intentionally empty: the constructor only initializes the SRW lock in place, and no matching
// teardown call is made.
Mutex::~Mutex() {}
292 | |
293 | void Mutex::lock(Exclusivity exclusivity) { |
294 | switch (exclusivity) { |
295 | case EXCLUSIVE: |
296 | AcquireSRWLockExclusive(&coercedSrwLock); |
297 | break; |
298 | case SHARED: |
299 | AcquireSRWLockShared(&coercedSrwLock); |
300 | break; |
301 | } |
302 | } |
303 | |
304 | void Mutex::unlock(Exclusivity exclusivity) { |
305 | switch (exclusivity) { |
306 | case EXCLUSIVE: |
307 | ReleaseSRWLockExclusive(&coercedSrwLock); |
308 | break; |
309 | case SHARED: |
310 | ReleaseSRWLockShared(&coercedSrwLock); |
311 | break; |
312 | } |
313 | } |
314 | |
// Debug check that the lock is held. On Windows this is deliberately a no-op; `exclusivity` is
// ignored.
void Mutex::assertLockedByCaller(Exclusivity exclusivity) {
  // We could use TryAcquireSRWLock*() here like we do with the pthread version. However, as of
  // this writing, my version of Wine (1.6.2) doesn't implement these functions and will abort if
  // they are called. Since we were only going to use them as a hacky way to check if the lock is
  // held for debug purposes anyway, we just don't bother.
}
321 | |
322 | static BOOL WINAPI nullInitializer(PINIT_ONCE initOnce, PVOID parameter, PVOID* context) { |
323 | return true; |
324 | } |
325 | |
326 | Once::Once(bool startInitialized) { |
327 | static_assert(sizeof(INIT_ONCE) == sizeof(initOnce), "INIT_ONCE is not a pointer?" ); |
328 | InitOnceInitialize(&coercedInitOnce); |
329 | if (startInitialized) { |
330 | InitOnceExecuteOnce(&coercedInitOnce, &nullInitializer, nullptr, nullptr); |
331 | } |
332 | } |
// Intentionally empty: the constructor only initializes the INIT_ONCE in place, and no matching
// teardown call is made.
Once::~Once() {}
334 | |
335 | void Once::runOnce(Initializer& init) { |
336 | BOOL needInit; |
337 | while (!InitOnceBeginInitialize(&coercedInitOnce, 0, &needInit, nullptr)) { |
338 | // Init was occurring in another thread, but then failed with an exception. Retry. |
339 | } |
340 | |
341 | if (needInit) { |
342 | { |
343 | KJ_ON_SCOPE_FAILURE(InitOnceComplete(&coercedInitOnce, INIT_ONCE_INIT_FAILED, nullptr)); |
344 | init.run(); |
345 | } |
346 | |
347 | KJ_ASSERT(InitOnceComplete(&coercedInitOnce, 0, nullptr)); |
348 | } |
349 | } |
350 | |
351 | bool Once::isInitialized() noexcept { |
352 | BOOL junk; |
353 | return InitOnceBeginInitialize(&coercedInitOnce, INIT_ONCE_CHECK_ONLY, &junk, nullptr); |
354 | } |
355 | |
// Returns the object to the uninitialized state by re-initializing the INIT_ONCE in place.
// NOTE(review): unlike the futex and pthread implementations in this file, this does not verify
// that the object is currently initialized -- confirm whether callers rely on that check.
void Once::reset() {
  InitOnceInitialize(&coercedInitOnce);
}
359 | |
360 | #else |
361 | // ======================================================================================= |
362 | // Generic pthreads-based implementation |
363 | |
// Evaluates the pthread call `code` exactly once and, if it returns a nonzero error number,
// reports it through KJ_FAIL_SYSCALL, labelled with the source text of the call.
//
// The body is wrapped in do { ... } while (false) so that the macro expands to a single statement
// that requires a trailing semicolon. This keeps it safe in unbraced if/else contexts.
#define KJ_PTHREAD_CALL(code) \
  do { \
    int pthreadError = code; \
    if (pthreadError != 0) { \
      KJ_FAIL_SYSCALL(#code, pthreadError); \
    } \
  } while (false)
371 | |
// Like KJ_PTHREAD_CALL, but for use on cleanup paths such as destructors: a nonzero result is
// only logged through KJ_LOG(ERROR, ...), together with its strerror() text.
//
// The body is wrapped in do { ... } while (false) so that the macro expands to a single statement
// that requires a trailing semicolon. This keeps it safe in unbraced if/else contexts.
#define KJ_PTHREAD_CLEANUP(code) \
  do { \
    int pthreadError = code; \
    if (pthreadError != 0) { \
      KJ_LOG(ERROR, #code, strerror(pthreadError)); \
    } \
  } while (false)
379 | |
// Initializes the underlying pthread read/write lock with default attributes. An initialization
// error is reported through KJ_PTHREAD_CALL.
Mutex::Mutex() {
  KJ_PTHREAD_CALL(pthread_rwlock_init(&mutex, nullptr));
}
// Destroys the underlying pthread read/write lock. Uses KJ_PTHREAD_CLEANUP, so a failure is
// logged instead of being reported through KJ_FAIL_SYSCALL.
Mutex::~Mutex() {
  KJ_PTHREAD_CLEANUP(pthread_rwlock_destroy(&mutex));
}
386 | |
387 | void Mutex::lock(Exclusivity exclusivity) { |
388 | switch (exclusivity) { |
389 | case EXCLUSIVE: |
390 | KJ_PTHREAD_CALL(pthread_rwlock_wrlock(&mutex)); |
391 | break; |
392 | case SHARED: |
393 | KJ_PTHREAD_CALL(pthread_rwlock_rdlock(&mutex)); |
394 | break; |
395 | } |
396 | } |
397 | |
// Releases the read/write lock. `exclusivity` is not consulted here: the same
// pthread_rwlock_unlock() call is made for either mode.
void Mutex::unlock(Exclusivity exclusivity) {
  KJ_PTHREAD_CALL(pthread_rwlock_unlock(&mutex));
}
401 | |
402 | void Mutex::assertLockedByCaller(Exclusivity exclusivity) { |
403 | switch (exclusivity) { |
404 | case EXCLUSIVE: |
405 | // A read lock should fail if the mutex is already held for writing. |
406 | if (pthread_rwlock_tryrdlock(&mutex) == 0) { |
407 | pthread_rwlock_unlock(&mutex); |
408 | KJ_FAIL_ASSERT("Tried to call getAlreadyLocked*() but lock is not held." ); |
409 | } |
410 | break; |
411 | case SHARED: |
412 | // A write lock should fail if the mutex is already held for reading or writing. We don't |
413 | // have any way to prove that the lock is held only for reading. |
414 | if (pthread_rwlock_trywrlock(&mutex) == 0) { |
415 | pthread_rwlock_unlock(&mutex); |
416 | KJ_FAIL_ASSERT("Tried to call getAlreadyLocked*() but lock is not held." ); |
417 | } |
418 | break; |
419 | } |
420 | } |
421 | |
// Sets the initial state from `startInitialized` and initializes the pthread mutex that
// serializes runOnce().
Once::Once(bool startInitialized): state(startInitialized ? INITIALIZED : UNINITIALIZED) {
  KJ_PTHREAD_CALL(pthread_mutex_init(&mutex, nullptr));
}
// Destroys the pthread mutex. Uses KJ_PTHREAD_CLEANUP, so a failure is logged instead of being
// reported through KJ_FAIL_SYSCALL.
Once::~Once() {
  KJ_PTHREAD_CLEANUP(pthread_mutex_destroy(&mutex));
}
428 | |
429 | void Once::runOnce(Initializer& init) { |
430 | KJ_PTHREAD_CALL(pthread_mutex_lock(&mutex)); |
431 | KJ_DEFER(KJ_PTHREAD_CALL(pthread_mutex_unlock(&mutex))); |
432 | |
433 | if (state != UNINITIALIZED) { |
434 | return; |
435 | } |
436 | |
437 | init.run(); |
438 | |
439 | __atomic_store_n(&state, INITIALIZED, __ATOMIC_RELEASE); |
440 | } |
441 | |
442 | void Once::reset() { |
443 | State oldState = INITIALIZED; |
444 | if (!__atomic_compare_exchange_n(&state, &oldState, UNINITIALIZED, |
445 | false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) { |
446 | KJ_FAIL_REQUIRE("reset() called while not initialized." ); |
447 | } |
448 | } |
449 | |
450 | #endif |
451 | |
452 | } // namespace _ (private) |
453 | } // namespace kj |
454 | |