1/*
2 * Copyright 2012-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <atomic>
20#include <climits>
21#include <thread>
22
23#include <glog/logging.h>
24
25#include <folly/Likely.h>
26#include <folly/detail/Futex.h>
27#include <folly/lang/Bits.h>
28#include <folly/portability/SysTime.h>
29#include <folly/portability/Unistd.h>
30
31namespace folly {
32
33/**
34 * Event count: a condition variable for lock free algorithms.
35 *
36 * See http://www.1024cores.net/home/lock-free-algorithms/eventcounts for
37 * details.
38 *
39 * Event counts allow you to convert a non-blocking lock-free / wait-free
40 * algorithm into a blocking one, by isolating the blocking logic. You call
41 * prepareWait() before checking your condition and then either cancelWait()
42 * or wait() depending on whether the condition was true. When another
43 * thread makes the condition true, it must call notify() / notifyAll() just
44 * like a regular condition variable.
45 *
46 * If "<" denotes the happens-before relationship, consider 2 threads (T1 and
47 * T2) and 3 events:
48 * - E1: T1 returns from prepareWait
49 * - E2: T1 calls wait
50 * (obviously E1 < E2, intra-thread)
51 * - E3: T2 calls notifyAll
52 *
53 * If E1 < E3, then E2's wait will complete (and T1 will either wake up,
54 * or not block at all)
55 *
56 * This means that you can use an EventCount in the following manner:
57 *
58 * Waiter:
59 * if (!condition()) { // handle fast path first
60 * for (;;) {
61 * auto key = eventCount.prepareWait();
62 * if (condition()) {
63 * eventCount.cancelWait();
64 * break;
65 * } else {
66 * eventCount.wait(key);
67 * }
68 * }
69 * }
70 *
71 * (This pattern is encapsulated in await())
72 *
73 * Poster:
74 * make_condition_true();
75 * eventCount.notifyAll();
76 *
77 * Note that, just like with regular condition variables, the waiter needs to
78 * be tolerant of spurious wakeups and needs to recheck the condition after
79 * being woken up. Also, as there is no mutual exclusion implied, "checking"
80 * the condition likely means attempting an operation on an underlying
81 * data structure (push into a lock-free queue, etc) and returning true on
82 * success and false on failure.
83 */
84class EventCount {
85 public:
86 EventCount() noexcept : val_(0) {}
87
88 class Key {
89 friend class EventCount;
90 explicit Key(uint32_t e) noexcept : epoch_(e) {}
91 uint32_t epoch_;
92 };
93
94 void notify() noexcept;
95 void notifyAll() noexcept;
96 Key prepareWait() noexcept;
97 void cancelWait() noexcept;
98 void wait(Key key) noexcept;
99
100 /**
101 * Wait for condition() to become true. Will clean up appropriately if
102 * condition() throws, and then rethrow.
103 */
104 template <class Condition>
105 void await(Condition condition);
106
107 private:
108 void doNotify(int n) noexcept;
109 EventCount(const EventCount&) = delete;
110 EventCount(EventCount&&) = delete;
111 EventCount& operator=(const EventCount&) = delete;
112 EventCount& operator=(EventCount&&) = delete;
113
114 // This requires 64-bit
115 static_assert(sizeof(int) == 4, "bad platform");
116 static_assert(sizeof(uint32_t) == 4, "bad platform");
117 static_assert(sizeof(uint64_t) == 8, "bad platform");
118 static_assert(sizeof(std::atomic<uint64_t>) == 8, "bad platform");
119 static_assert(sizeof(detail::Futex<std::atomic>) == 4, "bad platform");
120
121 static constexpr size_t kEpochOffset = kIsLittleEndian ? 1 : 0;
122
123 // val_ stores the epoch in the most significant 32 bits and the
124 // waiter count in the least significant 32 bits.
125 std::atomic<uint64_t> val_;
126
127 static constexpr uint64_t kAddWaiter = uint64_t(1);
128 static constexpr uint64_t kSubWaiter = uint64_t(-1);
129 static constexpr size_t kEpochShift = 32;
130 static constexpr uint64_t kAddEpoch = uint64_t(1) << kEpochShift;
131 static constexpr uint64_t kWaiterMask = kAddEpoch - 1;
132};
133
134inline void EventCount::notify() noexcept {
135 doNotify(1);
136}
137
138inline void EventCount::notifyAll() noexcept {
139 doNotify(INT_MAX);
140}
141
142inline void EventCount::doNotify(int n) noexcept {
143 uint64_t prev = val_.fetch_add(kAddEpoch, std::memory_order_acq_rel);
144 if (UNLIKELY(prev & kWaiterMask)) {
145 detail::futexWake(
146 reinterpret_cast<detail::Futex<std::atomic>*>(&val_) + kEpochOffset, n);
147 }
148}
149
150inline EventCount::Key EventCount::prepareWait() noexcept {
151 uint64_t prev = val_.fetch_add(kAddWaiter, std::memory_order_acq_rel);
152 return Key(prev >> kEpochShift);
153}
154
155inline void EventCount::cancelWait() noexcept {
156 // memory_order_relaxed would suffice for correctness, but the faster
157 // #waiters gets to 0, the less likely it is that we'll do spurious wakeups
158 // (and thus system calls).
159 uint64_t prev = val_.fetch_add(kSubWaiter, std::memory_order_seq_cst);
160 DCHECK_NE((prev & kWaiterMask), 0);
161}
162
163inline void EventCount::wait(Key key) noexcept {
164 while ((val_.load(std::memory_order_acquire) >> kEpochShift) == key.epoch_) {
165 detail::futexWait(
166 reinterpret_cast<detail::Futex<std::atomic>*>(&val_) + kEpochOffset,
167 key.epoch_);
168 }
169 // memory_order_relaxed would suffice for correctness, but the faster
170 // #waiters gets to 0, the less likely it is that we'll do spurious wakeups
171 // (and thus system calls)
172 uint64_t prev = val_.fetch_add(kSubWaiter, std::memory_order_seq_cst);
173 DCHECK_NE((prev & kWaiterMask), 0);
174}
175
176template <class Condition>
177void EventCount::await(Condition condition) {
178 if (condition()) {
179 return; // fast path
180 }
181
182 // condition() is the only thing that may throw, everything else is
183 // noexcept, so we can hoist the try/catch block outside of the loop
184 try {
185 for (;;) {
186 auto key = prepareWait();
187 if (condition()) {
188 cancelWait();
189 break;
190 } else {
191 wait(key);
192 }
193 }
194 } catch (...) {
195 cancelWait();
196 throw;
197 }
198}
199
200} // namespace folly
201