1/*
2 * Copyright 2015-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <atomic>
20#include <cmath>
21#include <cstring>
22#include <memory>
23#include <type_traits>
24
25#include <boost/noncopyable.hpp>
26
27#include <folly/Portability.h>
28#include <folly/Traits.h>
29#include <folly/detail/TurnSequencer.h>
30#include <folly/portability/Unistd.h>
31
32namespace folly {
33namespace detail {
34
35template <typename T, template <typename> class Atom>
36class RingBufferSlot;
37} // namespace detail
38
39/// LockFreeRingBuffer<T> is a fixed-size, concurrent ring buffer with the
40/// following semantics:
41///
42/// 1. Writers cannot block on other writers UNLESS they are <capacity> writes
43/// apart from each other (writing to the same slot after a wrap-around)
44/// 2. Writers cannot block on readers
45/// 3. Readers can wait for writes that haven't occurred yet
46/// 4. Readers can detect if they are lagging behind
47///
48/// In this sense, reads from this buffer are best-effort but writes
49/// are guaranteed.
50///
51/// Another way to think about this is as an unbounded stream of writes. The
52/// buffer contains the last <capacity> writes but readers can attempt to read
53/// any part of the stream, even outside this window. The read API takes a
54/// Cursor that can point anywhere in this stream of writes. Reads from the
55/// "future" can optionally block but reads from the "past" will always fail.
56///
57
58template <typename T, template <typename> class Atom = std::atomic>
59class LockFreeRingBuffer : boost::noncopyable {
60 static_assert(
61 std::is_nothrow_default_constructible<T>::value,
62 "Element type must be nothrow default constructible");
63
64 static_assert(
65 folly::is_trivially_copyable<T>::value,
66 "Element type must be trivially copyable");
67
68 public:
69 /// Opaque pointer to a past or future write.
70 /// Can be moved relative to its current location but not in absolute terms.
71 struct Cursor {
72 explicit Cursor(uint64_t initialTicket) noexcept : ticket(initialTicket) {}
73
74 /// Returns true if this cursor now points to a different
75 /// write, false otherwise.
76 bool moveForward(uint64_t steps = 1) noexcept {
77 uint64_t prevTicket = ticket;
78 ticket += steps;
79 return prevTicket != ticket;
80 }
81
82 /// Returns true if this cursor now points to a previous
83 /// write, false otherwise.
84 bool moveBackward(uint64_t steps = 1) noexcept {
85 uint64_t prevTicket = ticket;
86 if (steps > ticket) {
87 ticket = 0;
88 } else {
89 ticket -= steps;
90 }
91 return prevTicket != ticket;
92 }
93
94 protected: // for test visibility reasons
95 uint64_t ticket;
96 friend class LockFreeRingBuffer;
97 };
98
99 explicit LockFreeRingBuffer(uint32_t capacity) noexcept
100 : capacity_(capacity),
101 slots_(new detail::RingBufferSlot<T, Atom>[capacity]),
102 ticket_(0) {}
103
104 /// Perform a single write of an object of type T.
105 /// Writes can block iff a previous writer has not yet completed a write
106 /// for the same slot (before the most recent wrap-around).
107 template <typename V>
108 void write(V& value) noexcept {
109 uint64_t ticket = ticket_.fetch_add(1);
110 slots_[idx(ticket)].write(turn(ticket), value);
111 }
112
113 /// Perform a single write of an object of type T.
114 /// Writes can block iff a previous writer has not yet completed a write
115 /// for the same slot (before the most recent wrap-around).
116 /// Returns a Cursor pointing to the just-written T.
117 template <typename V>
118 Cursor writeAndGetCursor(V& value) noexcept {
119 uint64_t ticket = ticket_.fetch_add(1);
120 slots_[idx(ticket)].write(turn(ticket), value);
121 return Cursor(ticket);
122 }
123
124 /// Read the value at the cursor.
125 /// Returns true if the read succeeded, false otherwise. If the return
126 /// value is false, dest is to be considered partially read and in an
127 /// inconsistent state. Readers are advised to discard it.
128 template <typename V>
129 bool tryRead(V& dest, const Cursor& cursor) noexcept {
130 return slots_[idx(cursor.ticket)].tryRead(dest, turn(cursor.ticket));
131 }
132
133 /// Read the value at the cursor or block if the write has not occurred yet.
134 /// Returns true if the read succeeded, false otherwise. If the return
135 /// value is false, dest is to be considered partially read and in an
136 /// inconsistent state. Readers are advised to discard it.
137 template <typename V>
138 bool waitAndTryRead(V& dest, const Cursor& cursor) noexcept {
139 return slots_[idx(cursor.ticket)].waitAndTryRead(dest, turn(cursor.ticket));
140 }
141
142 /// Returns a Cursor pointing to the first write that has not occurred yet.
143 Cursor currentHead() noexcept {
144 return Cursor(ticket_.load());
145 }
146
147 /// Returns a Cursor pointing to a currently readable write.
148 /// skipFraction is a value in the [0, 1] range indicating how far into the
149 /// currently readable window to place the cursor. 0 means the
150 /// earliest readable write, 1 means the latest readable write (if any).
151 Cursor currentTail(double skipFraction = 0.0) noexcept {
152 assert(skipFraction >= 0.0 && skipFraction <= 1.0);
153 uint64_t ticket = ticket_.load();
154
155 uint64_t backStep = llround((1.0 - skipFraction) * capacity_);
156
157 // always try to move at least one step backward to something readable
158 backStep = std::max<uint64_t>(1, backStep);
159
160 // can't go back more steps than we've taken
161 backStep = std::min(ticket, backStep);
162
163 return Cursor(ticket - backStep);
164 }
165
166 ~LockFreeRingBuffer() {}
167
168 private:
169 const uint32_t capacity_;
170
171 const std::unique_ptr<detail::RingBufferSlot<T, Atom>[]> slots_;
172
173 Atom<uint64_t> ticket_;
174
175 uint32_t idx(uint64_t ticket) noexcept {
176 return ticket % capacity_;
177 }
178
179 uint32_t turn(uint64_t ticket) noexcept {
180 return (uint32_t)(ticket / capacity_);
181 }
182}; // LockFreeRingBuffer
183
184namespace detail {
185template <typename T, template <typename> class Atom>
186class RingBufferSlot {
187 void copy(T& dest, T& src) {
188 memcpy(&dest, &src, sizeof(T));
189 }
190
191 template <typename V>
192 void copy(V& dest, T& src) {
193 dest = src;
194 }
195
196 public:
197 explicit RingBufferSlot() noexcept : sequencer_(), data() {}
198
199 template <typename V>
200 void write(const uint32_t turn, V& value) noexcept {
201 Atom<uint32_t> cutoff(0);
202 sequencer_.waitForTurn(turn * 2, cutoff, false);
203
204 // Change to an odd-numbered turn to indicate write in process
205 sequencer_.completeTurn(turn * 2);
206
207 data = std::move(value);
208 sequencer_.completeTurn(turn * 2 + 1);
209 // At (turn + 1) * 2
210 }
211
212 template <typename V>
213 bool waitAndTryRead(V& dest, uint32_t turn) noexcept {
214 uint32_t desired_turn = (turn + 1) * 2;
215 Atom<uint32_t> cutoff(0);
216 if (sequencer_.tryWaitForTurn(desired_turn, cutoff, false) !=
217 TurnSequencer<Atom>::TryWaitResult::SUCCESS) {
218 return false;
219 }
220 copy(dest, data);
221
222 // if it's still the same turn, we read the value successfully
223 return sequencer_.isTurn(desired_turn);
224 }
225
226 template <typename V>
227 bool tryRead(V& dest, uint32_t turn) noexcept {
228 // The write that started at turn 0 ended at turn 2
229 if (!sequencer_.isTurn((turn + 1) * 2)) {
230 return false;
231 }
232 copy(dest, data);
233
234 // if it's still the same turn, we read the value successfully
235 return sequencer_.isTurn((turn + 1) * 2);
236 }
237
238 private:
239 TurnSequencer<Atom> sequencer_;
240 T data;
241}; // RingBufferSlot
242
243} // namespace detail
244
245} // namespace folly
246