1/*
2 * Copyright 2013-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <utility>
20
21#include <glog/logging.h>
22
23#include <folly/Portability.h>
24#include <folly/detail/MPMCPipelineDetail.h>
25
26namespace folly {
27
/**
 * Helper tag template to use amplification > 1.
 *
 * Use MPMCPipelineStage<T, Amp> in place of a plain T in the Stages list of
 * MPMCPipeline to declare a stage that produces Amp values of type T for
 * every input it consumes. This header only declares the template.
 */
template <class T, size_t Amp>
class MPMCPipelineStage;
33
34/**
35 * Multi-Producer, Multi-Consumer pipeline.
36 *
 * An N-stage pipeline is a combination of N+1 MPMC queues (see MPMCQueue.h).
38 *
39 * At each stage, you may dequeue the results from the previous stage (possibly
40 * from multiple threads) and enqueue results to the next stage. Regardless of
41 * the order of completion, data is delivered to the next stage in the original
42 * order. Each input is matched with a "ticket" which must be produced
43 * when enqueueing to the next stage.
44 *
45 * A given stage must produce exactly K ("amplification factor", default K=1)
46 * results for every input. This is enforced by requiring that each ticket
47 * is used exactly K times.
48 *
49 * Usage:
50 *
51 * // arguments are queue sizes
52 * MPMCPipeline<int, std::string, int> pipeline(10, 10, 10);
53 *
54 * pipeline.blockingWrite(42);
55 *
56 * {
57 * int val;
58 * auto ticket = pipeline.blockingReadStage<0>(val);
59 * pipeline.blockingWriteStage<0>(ticket, folly::to<std::string>(val));
60 * }
61 *
62 * {
63 * std::string val;
64 * auto ticket = pipeline.blockingReadStage<1>(val);
65 * int ival = 0;
66 * try {
67 * ival = folly::to<int>(val);
68 * } catch (...) {
69 * // We must produce exactly 1 output even on exception!
70 * }
71 * pipeline.blockingWriteStage<1>(ticket, ival);
72 * }
73 *
74 * int result;
75 * pipeline.blockingRead(result);
76 * // result == 42
77 *
78 * To specify amplification factors greater than 1, use
79 * MPMCPipelineStage<T, amplification> instead of T in the declaration:
80 *
81 * MPMCPipeline<int,
82 * MPMCPipelineStage<std::string, 2>,
83 * MPMCPipelineStage<int, 4>>
84 *
85 * declares a two-stage pipeline: the first stage produces 2 strings
86 * for each input int, the second stage produces 4 ints for each input string,
87 * so, overall, the pipeline produces 2*4 = 8 ints for each input int.
88 *
89 * Implementation details: we use N+1 MPMCQueue objects; each intermediate
90 * queue connects two adjacent stages. The MPMCQueue implementation is abused;
91 * instead of using it as a queue, we insert in the output queue at the
92 * position determined by the input queue's popTicket_. We guarantee that
93 * all slots are filled (and therefore the queue doesn't freeze) because
94 * we require that each step produces exactly K outputs for every input.
95 */
96template <class In, class... Stages>
97class MPMCPipeline {
98 typedef std::tuple<detail::PipelineStageInfo<Stages>...> StageInfos;
99 typedef std::tuple<
100 detail::MPMCPipelineStageImpl<In>,
101 detail::MPMCPipelineStageImpl<
102 typename detail::PipelineStageInfo<Stages>::value_type>...>
103 StageTuple;
104 static constexpr size_t kAmplification =
105 detail::AmplificationProduct<StageInfos>::value;
106
107 class TicketBaseDebug {
108 public:
109 TicketBaseDebug() noexcept : owner_(nullptr), value_(0xdeadbeeffaceb00c) {}
110 TicketBaseDebug(TicketBaseDebug&& other) noexcept
111 : owner_(std::exchange(other.owner_, nullptr)),
112 value_(std::exchange(other.value_, 0xdeadbeeffaceb00c)) {}
113 explicit TicketBaseDebug(MPMCPipeline* owner, uint64_t value) noexcept
114 : owner_(owner), value_(value) {}
115 void check_owner(MPMCPipeline* owner) const {
116 CHECK(owner == owner_);
117 }
118
119 MPMCPipeline* owner_;
120 uint64_t value_;
121 };
122
123 class TicketBaseNDebug {
124 public:
125 TicketBaseNDebug() = default;
126 TicketBaseNDebug(TicketBaseNDebug&&) = default;
127 explicit TicketBaseNDebug(MPMCPipeline*, uint64_t value) noexcept
128 : value_(value) {}
129 void check_owner(MPMCPipeline*) const {}
130
131 uint64_t value_;
132 };
133
134 using TicketBase =
135 std::conditional_t<kIsDebug, TicketBaseDebug, TicketBaseNDebug>;
136
137 public:
138 /**
139 * Ticket, returned by blockingReadStage, must be given back to
140 * blockingWriteStage. Tickets are not thread-safe.
141 */
142 template <size_t Stage>
143 class Ticket : TicketBase {
144 public:
145 ~Ticket() noexcept {
146 CHECK_EQ(remainingUses_, 0) << "All tickets must be completely used!";
147 }
148
149 Ticket() noexcept : remainingUses_(0) {}
150
151 Ticket(Ticket&& other) noexcept
152 : TicketBase(static_cast<TicketBase&&>(other)),
153 remainingUses_(std::exchange(other.remainingUses_, 0)) {}
154
155 Ticket& operator=(Ticket&& other) noexcept {
156 if (this != &other) {
157 this->~Ticket();
158 new (this) Ticket(std::move(other));
159 }
160 return *this;
161 }
162
163 private:
164 friend class MPMCPipeline;
165 size_t remainingUses_;
166
167 Ticket(MPMCPipeline* owner, size_t amplification, uint64_t value) noexcept
168 : TicketBase(owner, value * amplification),
169 remainingUses_(amplification) {}
170
171 uint64_t use(MPMCPipeline* owner) {
172 CHECK_GT(remainingUses_--, 0);
173 TicketBase::check_owner(owner);
174 return TicketBase::value_++;
175 }
176 };
177
178 /**
179 * Default-construct pipeline. Useful to move-assign later,
180 * just like MPMCQueue, see MPMCQueue.h for more details.
181 */
182 MPMCPipeline() = default;
183
184 /**
185 * Construct a pipeline with N+1 queue sizes.
186 */
187 template <class... Sizes>
188 explicit MPMCPipeline(Sizes... sizes) : stages_(sizes...) {}
189
190 /**
191 * Push an element into (the first stage of) the pipeline. Blocking.
192 */
193 template <class... Args>
194 void blockingWrite(Args&&... args) {
195 std::get<0>(stages_).blockingWrite(std::forward<Args>(args)...);
196 }
197
198 /**
199 * Try to push an element into (the first stage of) the pipeline.
200 * Non-blocking.
201 */
202 template <class... Args>
203 bool write(Args&&... args) {
204 return std::get<0>(stages_).write(std::forward<Args>(args)...);
205 }
206
207 /**
208 * Read an element for stage Stage and obtain a ticket. Blocking.
209 */
210 template <size_t Stage>
211 Ticket<Stage> blockingReadStage(
212 typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
213 return Ticket<Stage>(
214 this,
215 std::tuple_element<Stage, StageInfos>::type::kAmplification,
216 std::get<Stage>(stages_).blockingRead(elem));
217 }
218
219 /**
220 * Try to read an element for stage Stage and obtain a ticket.
221 * Non-blocking.
222 */
223 template <size_t Stage>
224 bool readStage(
225 Ticket<Stage>& ticket,
226 typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) {
227 uint64_t tval;
228 if (!std::get<Stage>(stages_).readAndGetTicket(tval, elem)) {
229 return false;
230 }
231 ticket = Ticket<Stage>(
232 this,
233 std::tuple_element<Stage, StageInfos>::type::kAmplification,
234 tval);
235 return true;
236 }
237
238 /**
239 * Complete an element in stage Stage (pushing it for stage Stage+1).
240 * Blocking.
241 */
242 template <size_t Stage, class... Args>
243 void blockingWriteStage(Ticket<Stage>& ticket, Args&&... args) {
244 std::get<Stage + 1>(stages_).blockingWriteWithTicket(
245 ticket.use(this), std::forward<Args>(args)...);
246 }
247
248 /**
249 * Pop an element from (the final stage of) the pipeline. Blocking.
250 */
251 void blockingRead(typename std::tuple_element<sizeof...(Stages), StageTuple>::
252 type::value_type& elem) {
253 std::get<sizeof...(Stages)>(stages_).blockingRead(elem);
254 }
255
256 /**
257 * Try to pop an element from (the final stage of) the pipeline.
258 * Non-blocking.
259 */
260 bool read(typename std::tuple_element<sizeof...(Stages), StageTuple>::type::
261 value_type& elem) {
262 return std::get<sizeof...(Stages)>(stages_).read(elem);
263 }
264
265 /**
266 * Estimate queue size, measured as values from the last stage.
267 * (so if the pipeline has an amplification factor > 1, pushing an element
268 * into the first stage will cause sizeGuess() to be == amplification factor)
269 * Elements "in flight" (currently processed as part of a stage, so not
270 * in any queue) are also counted.
271 */
272 ssize_t sizeGuess() const noexcept {
273 return ssize_t(
274 std::get<0>(stages_).writeCount() * kAmplification -
275 std::get<sizeof...(Stages)>(stages_).readCount());
276 }
277
278 private:
279 StageTuple stages_;
280};
281
282} // namespace folly
283