1// Copyright 2017 The Abseil Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#include "absl/synchronization/internal/waiter.h"
16
17#include "absl/base/config.h"
18
19#ifdef _WIN32
20#include <windows.h>
21#else
22#include <pthread.h>
23#include <sys/time.h>
24#include <unistd.h>
25#endif
26
27#ifdef __linux__
28#include <linux/futex.h>
29#include <sys/syscall.h>
30#endif
31
32#ifdef ABSL_HAVE_SEMAPHORE_H
33#include <semaphore.h>
34#endif
35
36#include <errno.h>
37#include <stdio.h>
38#include <time.h>
39
40#include <atomic>
41#include <cassert>
42#include <cstdint>
43#include <new>
44#include <type_traits>
45
46#include "absl/base/internal/raw_logging.h"
47#include "absl/base/internal/thread_identity.h"
48#include "absl/base/optimization.h"
49#include "absl/synchronization/internal/kernel_timeout.h"
50
51namespace absl {
52namespace synchronization_internal {
53
54static void MaybeBecomeIdle() {
55 base_internal::ThreadIdentity *identity =
56 base_internal::CurrentThreadIdentityIfPresent();
57 assert(identity != nullptr);
58 const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
59 const int ticker = identity->ticker.load(std::memory_order_relaxed);
60 const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
61 if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) {
62 identity->is_idle.store(true, std::memory_order_relaxed);
63 }
64}
65
66#if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
67
68// Some Android headers are missing these definitions even though they
69// support these futex operations.
70#ifdef __BIONIC__
71#ifndef SYS_futex
72#define SYS_futex __NR_futex
73#endif
74#ifndef FUTEX_WAIT_BITSET
75#define FUTEX_WAIT_BITSET 9
76#endif
77#ifndef FUTEX_PRIVATE_FLAG
78#define FUTEX_PRIVATE_FLAG 128
79#endif
80#ifndef FUTEX_CLOCK_REALTIME
81#define FUTEX_CLOCK_REALTIME 256
82#endif
83#ifndef FUTEX_BITSET_MATCH_ANY
84#define FUTEX_BITSET_MATCH_ANY 0xFFFFFFFF
85#endif
86#endif
87
88class Futex {
89 public:
90 static int WaitUntil(std::atomic<int32_t> *v, int32_t val,
91 KernelTimeout t) {
92 int err = 0;
93 if (t.has_timeout()) {
94 // https://locklessinc.com/articles/futex_cheat_sheet/
95 // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time.
96 struct timespec abs_timeout = t.MakeAbsTimespec();
97 // Atomically check that the futex value is still 0, and if it
98 // is, sleep until abs_timeout or until woken by FUTEX_WAKE.
99 err = syscall(
100 SYS_futex, reinterpret_cast<int32_t *>(v),
101 FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val,
102 &abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY);
103 } else {
104 // Atomically check that the futex value is still 0, and if it
105 // is, sleep until woken by FUTEX_WAKE.
106 err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
107 FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr);
108 }
109 if (err != 0) {
110 err = -errno;
111 }
112 return err;
113 }
114
115 static int Wake(std::atomic<int32_t> *v, int32_t count) {
116 int err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
117 FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count);
118 if (ABSL_PREDICT_FALSE(err < 0)) {
119 err = -errno;
120 }
121 return err;
122 }
123};
124
125void Waiter::Init() {
126 futex_.store(0, std::memory_order_relaxed);
127}
128
129bool Waiter::Wait(KernelTimeout t) {
130 // Loop until we can atomically decrement futex from a positive
131 // value, waiting on a futex while we believe it is zero.
132 while (true) {
133 int32_t x = futex_.load(std::memory_order_relaxed);
134 if (x != 0) {
135 if (!futex_.compare_exchange_weak(x, x - 1,
136 std::memory_order_acquire,
137 std::memory_order_relaxed)) {
138 continue; // Raced with someone, retry.
139 }
140 return true; // Consumed a wakeup, we are done.
141 }
142
143 const int err = Futex::WaitUntil(&futex_, 0, t);
144 if (err != 0) {
145 if (err == -EINTR || err == -EWOULDBLOCK) {
146 // Do nothing, the loop will retry.
147 } else if (err == -ETIMEDOUT) {
148 return false;
149 } else {
150 ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
151 }
152 }
153
154 MaybeBecomeIdle();
155 }
156}
157
158void Waiter::Post() {
159 if (futex_.fetch_add(1, std::memory_order_release) == 0) {
160 // We incremented from 0, need to wake a potential waker.
161 Poke();
162 }
163}
164
165void Waiter::Poke() {
166 // Wake one thread waiting on the futex.
167 const int err = Futex::Wake(&futex_, 1);
168 if (ABSL_PREDICT_FALSE(err < 0)) {
169 ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
170 }
171}
172
173#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
174
175class PthreadMutexHolder {
176 public:
177 explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) {
178 const int err = pthread_mutex_lock(mu_);
179 if (err != 0) {
180 ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err);
181 }
182 }
183
184 PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete;
185 PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete;
186
187 ~PthreadMutexHolder() {
188 const int err = pthread_mutex_unlock(mu_);
189 if (err != 0) {
190 ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err);
191 }
192 }
193
194 private:
195 pthread_mutex_t *mu_;
196};
197
198void Waiter::Init() {
199 const int err = pthread_mutex_init(&mu_, 0);
200 if (err != 0) {
201 ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
202 }
203
204 const int err2 = pthread_cond_init(&cv_, 0);
205 if (err2 != 0) {
206 ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
207 }
208
209 waiter_count_.store(0, std::memory_order_relaxed);
210 wakeup_count_.store(0, std::memory_order_relaxed);
211}
212
213bool Waiter::Wait(KernelTimeout t) {
214 struct timespec abs_timeout;
215 if (t.has_timeout()) {
216 abs_timeout = t.MakeAbsTimespec();
217 }
218
219 PthreadMutexHolder h(&mu_);
220 waiter_count_.fetch_add(1, std::memory_order_relaxed);
221 // Loop until we find a wakeup to consume or timeout.
222 while (true) {
223 int x = wakeup_count_.load(std::memory_order_relaxed);
224 if (x != 0) {
225 if (!wakeup_count_.compare_exchange_weak(x, x - 1,
226 std::memory_order_acquire,
227 std::memory_order_relaxed)) {
228 continue; // Raced with someone, retry.
229 }
230 // Successfully consumed a wakeup, we're done.
231 waiter_count_.fetch_sub(1, std::memory_order_relaxed);
232 return true;
233 }
234
235 // No wakeups available, time to wait.
236 if (!t.has_timeout()) {
237 const int err = pthread_cond_wait(&cv_, &mu_);
238 if (err != 0) {
239 ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
240 }
241 } else {
242 const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
243 if (err == ETIMEDOUT) {
244 waiter_count_.fetch_sub(1, std::memory_order_relaxed);
245 return false;
246 }
247 if (err != 0) {
248 ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
249 }
250 }
251 MaybeBecomeIdle();
252 }
253}
254
255void Waiter::Post() {
256 wakeup_count_.fetch_add(1, std::memory_order_release);
257 Poke();
258}
259
260void Waiter::Poke() {
261 if (waiter_count_.load(std::memory_order_relaxed) == 0) {
262 return;
263 }
264 // Potentially a waker. Take the lock and check again.
265 PthreadMutexHolder h(&mu_);
266 if (waiter_count_.load(std::memory_order_relaxed) == 0) {
267 return;
268 }
269 const int err = pthread_cond_signal(&cv_);
270 if (err != 0) {
271 ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
272 }
273}
274
275#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
276
277void Waiter::Init() {
278 if (sem_init(&sem_, 0, 0) != 0) {
279 ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
280 }
281 wakeups_.store(0, std::memory_order_relaxed);
282}
283
284bool Waiter::Wait(KernelTimeout t) {
285 struct timespec abs_timeout;
286 if (t.has_timeout()) {
287 abs_timeout = t.MakeAbsTimespec();
288 }
289
290 // Loop until we timeout or consume a wakeup.
291 while (true) {
292 int x = wakeups_.load(std::memory_order_relaxed);
293 if (x != 0) {
294 if (!wakeups_.compare_exchange_weak(x, x - 1,
295 std::memory_order_acquire,
296 std::memory_order_relaxed)) {
297 continue; // Raced with someone, retry.
298 }
299 // Successfully consumed a wakeup, we're done.
300 return true;
301 }
302
303 // Nothing to consume, wait (looping on EINTR).
304 while (true) {
305 if (!t.has_timeout()) {
306 if (sem_wait(&sem_) == 0) break;
307 if (errno == EINTR) continue;
308 ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno);
309 } else {
310 if (sem_timedwait(&sem_, &abs_timeout) == 0) break;
311 if (errno == EINTR) continue;
312 if (errno == ETIMEDOUT) return false;
313 ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
314 }
315 }
316 MaybeBecomeIdle();
317 }
318}
319
320void Waiter::Post() {
321 wakeups_.fetch_add(1, std::memory_order_release); // Post a wakeup.
322 Poke();
323}
324
325void Waiter::Poke() {
326 if (sem_post(&sem_) != 0) { // Wake any semaphore waiter.
327 ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno);
328 }
329}
330
331#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
332
333class Waiter::WinHelper {
334 public:
335 static SRWLOCK *GetLock(Waiter *w) {
336 return reinterpret_cast<SRWLOCK *>(&w->mu_storage_);
337 }
338
339 static CONDITION_VARIABLE *GetCond(Waiter *w) {
340 return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_);
341 }
342
343 static_assert(sizeof(SRWLOCK) == sizeof(Waiter::SRWLockStorage),
344 "SRWLockStorage does not have the same size as SRWLOCK");
345 static_assert(
346 alignof(SRWLOCK) == alignof(Waiter::SRWLockStorage),
347 "SRWLockStorage does not have the same alignment as SRWLOCK");
348
349 static_assert(sizeof(CONDITION_VARIABLE) ==
350 sizeof(Waiter::ConditionVariableStorage),
351 "ABSL_CONDITION_VARIABLE_STORAGE does not have the same size "
352 "as CONDITION_VARIABLE");
353 static_assert(alignof(CONDITION_VARIABLE) ==
354 alignof(Waiter::ConditionVariableStorage),
355 "ConditionVariableStorage does not have the same "
356 "alignment as CONDITION_VARIABLE");
357
358 // The SRWLOCK and CONDITION_VARIABLE types must be trivially constuctible
359 // and destructible because we never call their constructors or destructors.
360 static_assert(std::is_trivially_constructible<SRWLOCK>::value,
361 "The SRWLOCK type must be trivially constructible");
362 static_assert(std::is_trivially_constructible<CONDITION_VARIABLE>::value,
363 "The CONDITION_VARIABLE type must be trivially constructible");
364 static_assert(std::is_trivially_destructible<SRWLOCK>::value,
365 "The SRWLOCK type must be trivially destructible");
366 static_assert(std::is_trivially_destructible<CONDITION_VARIABLE>::value,
367 "The CONDITION_VARIABLE type must be trivially destructible");
368};
369
370class LockHolder {
371 public:
372 explicit LockHolder(SRWLOCK* mu) : mu_(mu) {
373 AcquireSRWLockExclusive(mu_);
374 }
375
376 LockHolder(const LockHolder&) = delete;
377 LockHolder& operator=(const LockHolder&) = delete;
378
379 ~LockHolder() {
380 ReleaseSRWLockExclusive(mu_);
381 }
382
383 private:
384 SRWLOCK* mu_;
385};
386
387void Waiter::Init() {
388 auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
389 auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
390 InitializeSRWLock(mu);
391 InitializeConditionVariable(cv);
392 waiter_count_.store(0, std::memory_order_relaxed);
393 wakeup_count_.store(0, std::memory_order_relaxed);
394}
395
396bool Waiter::Wait(KernelTimeout t) {
397 SRWLOCK *mu = WinHelper::GetLock(this);
398 CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
399
400 LockHolder h(mu);
401 waiter_count_.fetch_add(1, std::memory_order_relaxed);
402
403 // Loop until we find a wakeup to consume or timeout.
404 while (true) {
405 int x = wakeup_count_.load(std::memory_order_relaxed);
406 if (x != 0) {
407 if (!wakeup_count_.compare_exchange_weak(x, x - 1,
408 std::memory_order_acquire,
409 std::memory_order_relaxed)) {
410 continue; // Raced with someone, retry.
411 }
412 // Successfully consumed a wakeup, we're done.
413 waiter_count_.fetch_sub(1, std::memory_order_relaxed);
414 return true;
415 }
416
417 // No wakeups available, time to wait.
418 if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
419 // GetLastError() returns a Win32 DWORD, but we assign to
420 // unsigned long to simplify the ABSL_RAW_LOG case below. The uniform
421 // initialization guarantees this is not a narrowing conversion.
422 const unsigned long err{GetLastError()}; // NOLINT(runtime/int)
423 if (err == ERROR_TIMEOUT) {
424 waiter_count_.fetch_sub(1, std::memory_order_relaxed);
425 return false;
426 } else {
427 ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
428 }
429 }
430
431 MaybeBecomeIdle();
432 }
433}
434
435void Waiter::Post() {
436 wakeup_count_.fetch_add(1, std::memory_order_release);
437 Poke();
438}
439
440void Waiter::Poke() {
441 if (waiter_count_.load(std::memory_order_relaxed) == 0) {
442 return;
443 }
444 // Potentially a waker. Take the lock and check again.
445 LockHolder h(WinHelper::GetLock(this));
446 if (waiter_count_.load(std::memory_order_relaxed) == 0) {
447 return;
448 }
449 WakeConditionVariable(WinHelper::GetCond(this));
450}
451
452#else
453#error Unknown ABSL_WAITER_MODE
454#endif
455
456} // namespace synchronization_internal
457} // namespace absl
458