1 | /* |
2 | * Copyright 2015-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cmath>
#include <cstdint>
#include <cstring>
#include <memory>
#include <type_traits>
#include <utility>

#include <boost/noncopyable.hpp>

#include <folly/Portability.h>
#include <folly/Traits.h>
#include <folly/detail/TurnSequencer.h>
#include <folly/portability/Unistd.h>
31 | |
32 | namespace folly { |
33 | namespace detail { |
34 | |
// Forward declaration of the per-slot storage type so that LockFreeRingBuffer
// can name it; the full definition appears further down in this header.
template <typename T, template <typename> class Atom>
class RingBufferSlot;
37 | } // namespace detail |
38 | |
39 | /// LockFreeRingBuffer<T> is a fixed-size, concurrent ring buffer with the |
40 | /// following semantics: |
41 | /// |
42 | /// 1. Writers cannot block on other writers UNLESS they are <capacity> writes |
43 | /// apart from each other (writing to the same slot after a wrap-around) |
44 | /// 2. Writers cannot block on readers |
45 | /// 3. Readers can wait for writes that haven't occurred yet |
46 | /// 4. Readers can detect if they are lagging behind |
47 | /// |
48 | /// In this sense, reads from this buffer are best-effort but writes |
49 | /// are guaranteed. |
50 | /// |
51 | /// Another way to think about this is as an unbounded stream of writes. The |
52 | /// buffer contains the last <capacity> writes but readers can attempt to read |
53 | /// any part of the stream, even outside this window. The read API takes a |
54 | /// Cursor that can point anywhere in this stream of writes. Reads from the |
55 | /// "future" can optionally block but reads from the "past" will always fail. |
56 | /// |
57 | |
58 | template <typename T, template <typename> class Atom = std::atomic> |
59 | class LockFreeRingBuffer : boost::noncopyable { |
60 | static_assert( |
61 | std::is_nothrow_default_constructible<T>::value, |
62 | "Element type must be nothrow default constructible" ); |
63 | |
64 | static_assert( |
65 | folly::is_trivially_copyable<T>::value, |
66 | "Element type must be trivially copyable" ); |
67 | |
68 | public: |
69 | /// Opaque pointer to a past or future write. |
70 | /// Can be moved relative to its current location but not in absolute terms. |
71 | struct Cursor { |
72 | explicit Cursor(uint64_t initialTicket) noexcept : ticket(initialTicket) {} |
73 | |
74 | /// Returns true if this cursor now points to a different |
75 | /// write, false otherwise. |
76 | bool moveForward(uint64_t steps = 1) noexcept { |
77 | uint64_t prevTicket = ticket; |
78 | ticket += steps; |
79 | return prevTicket != ticket; |
80 | } |
81 | |
82 | /// Returns true if this cursor now points to a previous |
83 | /// write, false otherwise. |
84 | bool moveBackward(uint64_t steps = 1) noexcept { |
85 | uint64_t prevTicket = ticket; |
86 | if (steps > ticket) { |
87 | ticket = 0; |
88 | } else { |
89 | ticket -= steps; |
90 | } |
91 | return prevTicket != ticket; |
92 | } |
93 | |
94 | protected: // for test visibility reasons |
95 | uint64_t ticket; |
96 | friend class LockFreeRingBuffer; |
97 | }; |
98 | |
99 | explicit LockFreeRingBuffer(uint32_t capacity) noexcept |
100 | : capacity_(capacity), |
101 | slots_(new detail::RingBufferSlot<T, Atom>[capacity]), |
102 | ticket_(0) {} |
103 | |
104 | /// Perform a single write of an object of type T. |
105 | /// Writes can block iff a previous writer has not yet completed a write |
106 | /// for the same slot (before the most recent wrap-around). |
107 | template <typename V> |
108 | void write(V& value) noexcept { |
109 | uint64_t ticket = ticket_.fetch_add(1); |
110 | slots_[idx(ticket)].write(turn(ticket), value); |
111 | } |
112 | |
113 | /// Perform a single write of an object of type T. |
114 | /// Writes can block iff a previous writer has not yet completed a write |
115 | /// for the same slot (before the most recent wrap-around). |
116 | /// Returns a Cursor pointing to the just-written T. |
117 | template <typename V> |
118 | Cursor writeAndGetCursor(V& value) noexcept { |
119 | uint64_t ticket = ticket_.fetch_add(1); |
120 | slots_[idx(ticket)].write(turn(ticket), value); |
121 | return Cursor(ticket); |
122 | } |
123 | |
124 | /// Read the value at the cursor. |
125 | /// Returns true if the read succeeded, false otherwise. If the return |
126 | /// value is false, dest is to be considered partially read and in an |
127 | /// inconsistent state. Readers are advised to discard it. |
128 | template <typename V> |
129 | bool tryRead(V& dest, const Cursor& cursor) noexcept { |
130 | return slots_[idx(cursor.ticket)].tryRead(dest, turn(cursor.ticket)); |
131 | } |
132 | |
133 | /// Read the value at the cursor or block if the write has not occurred yet. |
134 | /// Returns true if the read succeeded, false otherwise. If the return |
135 | /// value is false, dest is to be considered partially read and in an |
136 | /// inconsistent state. Readers are advised to discard it. |
137 | template <typename V> |
138 | bool waitAndTryRead(V& dest, const Cursor& cursor) noexcept { |
139 | return slots_[idx(cursor.ticket)].waitAndTryRead(dest, turn(cursor.ticket)); |
140 | } |
141 | |
142 | /// Returns a Cursor pointing to the first write that has not occurred yet. |
143 | Cursor currentHead() noexcept { |
144 | return Cursor(ticket_.load()); |
145 | } |
146 | |
147 | /// Returns a Cursor pointing to a currently readable write. |
148 | /// skipFraction is a value in the [0, 1] range indicating how far into the |
149 | /// currently readable window to place the cursor. 0 means the |
150 | /// earliest readable write, 1 means the latest readable write (if any). |
151 | Cursor currentTail(double skipFraction = 0.0) noexcept { |
152 | assert(skipFraction >= 0.0 && skipFraction <= 1.0); |
153 | uint64_t ticket = ticket_.load(); |
154 | |
155 | uint64_t backStep = llround((1.0 - skipFraction) * capacity_); |
156 | |
157 | // always try to move at least one step backward to something readable |
158 | backStep = std::max<uint64_t>(1, backStep); |
159 | |
160 | // can't go back more steps than we've taken |
161 | backStep = std::min(ticket, backStep); |
162 | |
163 | return Cursor(ticket - backStep); |
164 | } |
165 | |
166 | ~LockFreeRingBuffer() {} |
167 | |
168 | private: |
169 | const uint32_t capacity_; |
170 | |
171 | const std::unique_ptr<detail::RingBufferSlot<T, Atom>[]> slots_; |
172 | |
173 | Atom<uint64_t> ticket_; |
174 | |
175 | uint32_t idx(uint64_t ticket) noexcept { |
176 | return ticket % capacity_; |
177 | } |
178 | |
179 | uint32_t turn(uint64_t ticket) noexcept { |
180 | return (uint32_t)(ticket / capacity_); |
181 | } |
182 | }; // LockFreeRingBuffer |
183 | |
184 | namespace detail { |
185 | template <typename T, template <typename> class Atom> |
186 | class RingBufferSlot { |
187 | void copy(T& dest, T& src) { |
188 | memcpy(&dest, &src, sizeof(T)); |
189 | } |
190 | |
191 | template <typename V> |
192 | void copy(V& dest, T& src) { |
193 | dest = src; |
194 | } |
195 | |
196 | public: |
197 | explicit RingBufferSlot() noexcept : sequencer_(), data() {} |
198 | |
199 | template <typename V> |
200 | void write(const uint32_t turn, V& value) noexcept { |
201 | Atom<uint32_t> cutoff(0); |
202 | sequencer_.waitForTurn(turn * 2, cutoff, false); |
203 | |
204 | // Change to an odd-numbered turn to indicate write in process |
205 | sequencer_.completeTurn(turn * 2); |
206 | |
207 | data = std::move(value); |
208 | sequencer_.completeTurn(turn * 2 + 1); |
209 | // At (turn + 1) * 2 |
210 | } |
211 | |
212 | template <typename V> |
213 | bool waitAndTryRead(V& dest, uint32_t turn) noexcept { |
214 | uint32_t desired_turn = (turn + 1) * 2; |
215 | Atom<uint32_t> cutoff(0); |
216 | if (sequencer_.tryWaitForTurn(desired_turn, cutoff, false) != |
217 | TurnSequencer<Atom>::TryWaitResult::SUCCESS) { |
218 | return false; |
219 | } |
220 | copy(dest, data); |
221 | |
222 | // if it's still the same turn, we read the value successfully |
223 | return sequencer_.isTurn(desired_turn); |
224 | } |
225 | |
226 | template <typename V> |
227 | bool tryRead(V& dest, uint32_t turn) noexcept { |
228 | // The write that started at turn 0 ended at turn 2 |
229 | if (!sequencer_.isTurn((turn + 1) * 2)) { |
230 | return false; |
231 | } |
232 | copy(dest, data); |
233 | |
234 | // if it's still the same turn, we read the value successfully |
235 | return sequencer_.isTurn((turn + 1) * 2); |
236 | } |
237 | |
238 | private: |
239 | TurnSequencer<Atom> sequencer_; |
240 | T data; |
241 | }; // RingBufferSlot |
242 | |
243 | } // namespace detail |
244 | |
245 | } // namespace folly |
246 | |