1 | /* |
2 | * Copyright 2013-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
#include <cstddef>
#include <cstdint>
#include <new>
#include <tuple>
#include <type_traits>
#include <utility>

#include <glog/logging.h>

#include <folly/Portability.h>
#include <folly/detail/MPMCPipelineDetail.h>
25 | |
26 | namespace folly { |
27 | |
/**
 * Tag type naming a pipeline stage whose amplification factor is greater
 * than 1. It is only declared here, never defined in this header: write
 * MPMCPipelineStage<T, K> in place of a plain T in the MPMCPipeline template
 * argument list to say that the stage emits K values of type T per input.
 */
template <typename T, size_t Amplification>
class MPMCPipelineStage;
33 | |
34 | /** |
35 | * Multi-Producer, Multi-Consumer pipeline. |
36 | * |
37 | * A N-stage pipeline is a combination of N+1 MPMC queues (see MPMCQueue.h). |
38 | * |
39 | * At each stage, you may dequeue the results from the previous stage (possibly |
40 | * from multiple threads) and enqueue results to the next stage. Regardless of |
41 | * the order of completion, data is delivered to the next stage in the original |
42 | * order. Each input is matched with a "ticket" which must be produced |
43 | * when enqueueing to the next stage. |
44 | * |
45 | * A given stage must produce exactly K ("amplification factor", default K=1) |
46 | * results for every input. This is enforced by requiring that each ticket |
47 | * is used exactly K times. |
48 | * |
49 | * Usage: |
50 | * |
51 | * // arguments are queue sizes |
52 | * MPMCPipeline<int, std::string, int> pipeline(10, 10, 10); |
53 | * |
54 | * pipeline.blockingWrite(42); |
55 | * |
56 | * { |
57 | * int val; |
58 | * auto ticket = pipeline.blockingReadStage<0>(val); |
59 | * pipeline.blockingWriteStage<0>(ticket, folly::to<std::string>(val)); |
60 | * } |
61 | * |
62 | * { |
63 | * std::string val; |
64 | * auto ticket = pipeline.blockingReadStage<1>(val); |
65 | * int ival = 0; |
66 | * try { |
67 | * ival = folly::to<int>(val); |
68 | * } catch (...) { |
69 | * // We must produce exactly 1 output even on exception! |
70 | * } |
71 | * pipeline.blockingWriteStage<1>(ticket, ival); |
72 | * } |
73 | * |
74 | * int result; |
75 | * pipeline.blockingRead(result); |
76 | * // result == 42 |
77 | * |
78 | * To specify amplification factors greater than 1, use |
79 | * MPMCPipelineStage<T, amplification> instead of T in the declaration: |
80 | * |
81 | * MPMCPipeline<int, |
82 | * MPMCPipelineStage<std::string, 2>, |
83 | * MPMCPipelineStage<int, 4>> |
84 | * |
85 | * declares a two-stage pipeline: the first stage produces 2 strings |
86 | * for each input int, the second stage produces 4 ints for each input string, |
87 | * so, overall, the pipeline produces 2*4 = 8 ints for each input int. |
88 | * |
89 | * Implementation details: we use N+1 MPMCQueue objects; each intermediate |
90 | * queue connects two adjacent stages. The MPMCQueue implementation is abused; |
91 | * instead of using it as a queue, we insert in the output queue at the |
92 | * position determined by the input queue's popTicket_. We guarantee that |
93 | * all slots are filled (and therefore the queue doesn't freeze) because |
94 | * we require that each step produces exactly K outputs for every input. |
95 | */ |
96 | template <class In, class... Stages> |
97 | class MPMCPipeline { |
98 | typedef std::tuple<detail::PipelineStageInfo<Stages>...> StageInfos; |
99 | typedef std::tuple< |
100 | detail::MPMCPipelineStageImpl<In>, |
101 | detail::MPMCPipelineStageImpl< |
102 | typename detail::PipelineStageInfo<Stages>::value_type>...> |
103 | StageTuple; |
104 | static constexpr size_t kAmplification = |
105 | detail::AmplificationProduct<StageInfos>::value; |
106 | |
107 | class TicketBaseDebug { |
108 | public: |
109 | TicketBaseDebug() noexcept : owner_(nullptr), value_(0xdeadbeeffaceb00c) {} |
110 | TicketBaseDebug(TicketBaseDebug&& other) noexcept |
111 | : owner_(std::exchange(other.owner_, nullptr)), |
112 | value_(std::exchange(other.value_, 0xdeadbeeffaceb00c)) {} |
113 | explicit TicketBaseDebug(MPMCPipeline* owner, uint64_t value) noexcept |
114 | : owner_(owner), value_(value) {} |
115 | void check_owner(MPMCPipeline* owner) const { |
116 | CHECK(owner == owner_); |
117 | } |
118 | |
119 | MPMCPipeline* owner_; |
120 | uint64_t value_; |
121 | }; |
122 | |
123 | class TicketBaseNDebug { |
124 | public: |
125 | TicketBaseNDebug() = default; |
126 | TicketBaseNDebug(TicketBaseNDebug&&) = default; |
127 | explicit TicketBaseNDebug(MPMCPipeline*, uint64_t value) noexcept |
128 | : value_(value) {} |
129 | void check_owner(MPMCPipeline*) const {} |
130 | |
131 | uint64_t value_; |
132 | }; |
133 | |
134 | using TicketBase = |
135 | std::conditional_t<kIsDebug, TicketBaseDebug, TicketBaseNDebug>; |
136 | |
137 | public: |
138 | /** |
139 | * Ticket, returned by blockingReadStage, must be given back to |
140 | * blockingWriteStage. Tickets are not thread-safe. |
141 | */ |
142 | template <size_t Stage> |
143 | class Ticket : TicketBase { |
144 | public: |
145 | ~Ticket() noexcept { |
146 | CHECK_EQ(remainingUses_, 0) << "All tickets must be completely used!" ; |
147 | } |
148 | |
149 | Ticket() noexcept : remainingUses_(0) {} |
150 | |
151 | Ticket(Ticket&& other) noexcept |
152 | : TicketBase(static_cast<TicketBase&&>(other)), |
153 | remainingUses_(std::exchange(other.remainingUses_, 0)) {} |
154 | |
155 | Ticket& operator=(Ticket&& other) noexcept { |
156 | if (this != &other) { |
157 | this->~Ticket(); |
158 | new (this) Ticket(std::move(other)); |
159 | } |
160 | return *this; |
161 | } |
162 | |
163 | private: |
164 | friend class MPMCPipeline; |
165 | size_t remainingUses_; |
166 | |
167 | Ticket(MPMCPipeline* owner, size_t amplification, uint64_t value) noexcept |
168 | : TicketBase(owner, value * amplification), |
169 | remainingUses_(amplification) {} |
170 | |
171 | uint64_t use(MPMCPipeline* owner) { |
172 | CHECK_GT(remainingUses_--, 0); |
173 | TicketBase::check_owner(owner); |
174 | return TicketBase::value_++; |
175 | } |
176 | }; |
177 | |
178 | /** |
179 | * Default-construct pipeline. Useful to move-assign later, |
180 | * just like MPMCQueue, see MPMCQueue.h for more details. |
181 | */ |
182 | MPMCPipeline() = default; |
183 | |
184 | /** |
185 | * Construct a pipeline with N+1 queue sizes. |
186 | */ |
187 | template <class... Sizes> |
188 | explicit MPMCPipeline(Sizes... sizes) : stages_(sizes...) {} |
189 | |
190 | /** |
191 | * Push an element into (the first stage of) the pipeline. Blocking. |
192 | */ |
193 | template <class... Args> |
194 | void blockingWrite(Args&&... args) { |
195 | std::get<0>(stages_).blockingWrite(std::forward<Args>(args)...); |
196 | } |
197 | |
198 | /** |
199 | * Try to push an element into (the first stage of) the pipeline. |
200 | * Non-blocking. |
201 | */ |
202 | template <class... Args> |
203 | bool write(Args&&... args) { |
204 | return std::get<0>(stages_).write(std::forward<Args>(args)...); |
205 | } |
206 | |
207 | /** |
208 | * Read an element for stage Stage and obtain a ticket. Blocking. |
209 | */ |
210 | template <size_t Stage> |
211 | Ticket<Stage> blockingReadStage( |
212 | typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) { |
213 | return Ticket<Stage>( |
214 | this, |
215 | std::tuple_element<Stage, StageInfos>::type::kAmplification, |
216 | std::get<Stage>(stages_).blockingRead(elem)); |
217 | } |
218 | |
219 | /** |
220 | * Try to read an element for stage Stage and obtain a ticket. |
221 | * Non-blocking. |
222 | */ |
223 | template <size_t Stage> |
224 | bool readStage( |
225 | Ticket<Stage>& ticket, |
226 | typename std::tuple_element<Stage, StageTuple>::type::value_type& elem) { |
227 | uint64_t tval; |
228 | if (!std::get<Stage>(stages_).readAndGetTicket(tval, elem)) { |
229 | return false; |
230 | } |
231 | ticket = Ticket<Stage>( |
232 | this, |
233 | std::tuple_element<Stage, StageInfos>::type::kAmplification, |
234 | tval); |
235 | return true; |
236 | } |
237 | |
238 | /** |
239 | * Complete an element in stage Stage (pushing it for stage Stage+1). |
240 | * Blocking. |
241 | */ |
242 | template <size_t Stage, class... Args> |
243 | void blockingWriteStage(Ticket<Stage>& ticket, Args&&... args) { |
244 | std::get<Stage + 1>(stages_).blockingWriteWithTicket( |
245 | ticket.use(this), std::forward<Args>(args)...); |
246 | } |
247 | |
248 | /** |
249 | * Pop an element from (the final stage of) the pipeline. Blocking. |
250 | */ |
251 | void blockingRead(typename std::tuple_element<sizeof...(Stages), StageTuple>:: |
252 | type::value_type& elem) { |
253 | std::get<sizeof...(Stages)>(stages_).blockingRead(elem); |
254 | } |
255 | |
256 | /** |
257 | * Try to pop an element from (the final stage of) the pipeline. |
258 | * Non-blocking. |
259 | */ |
260 | bool read(typename std::tuple_element<sizeof...(Stages), StageTuple>::type:: |
261 | value_type& elem) { |
262 | return std::get<sizeof...(Stages)>(stages_).read(elem); |
263 | } |
264 | |
265 | /** |
266 | * Estimate queue size, measured as values from the last stage. |
267 | * (so if the pipeline has an amplification factor > 1, pushing an element |
268 | * into the first stage will cause sizeGuess() to be == amplification factor) |
269 | * Elements "in flight" (currently processed as part of a stage, so not |
270 | * in any queue) are also counted. |
271 | */ |
272 | ssize_t sizeGuess() const noexcept { |
273 | return ssize_t( |
274 | std::get<0>(stages_).writeCount() * kAmplification - |
275 | std::get<sizeof...(Stages)>(stages_).readCount()); |
276 | } |
277 | |
278 | private: |
279 | StageTuple stages_; |
280 | }; |
281 | |
282 | } // namespace folly |
283 | |