1/*
2 * Copyright 2014-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef FOLLY_GEN_PARALLEL_H_
18#error This file may only be included from folly/gen/Parallel.h
19#endif
20
21#include <folly/MPMCQueue.h>
22#include <folly/ScopeGuard.h>
23#include <folly/experimental/EventCount.h>
24#include <atomic>
25#include <thread>
26#include <vector>
27
28namespace folly {
29namespace gen {
30namespace detail {
31
32template <typename T>
33class ClosableMPMCQueue {
34 MPMCQueue<T> queue_;
35 std::atomic<size_t> producers_{0};
36 std::atomic<size_t> consumers_{0};
37 folly::EventCount wakeProducer_;
38 folly::EventCount wakeConsumer_;
39
40 public:
41 explicit ClosableMPMCQueue(size_t capacity) : queue_(capacity) {}
42
43 ~ClosableMPMCQueue() {
44 CHECK(!producers());
45 CHECK(!consumers());
46 }
47
48 void openProducer() {
49 ++producers_;
50 }
51 void openConsumer() {
52 ++consumers_;
53 }
54
55 void closeInputProducer() {
56 size_t producers = producers_--;
57 CHECK(producers);
58 if (producers == 1) { // last producer
59 wakeConsumer_.notifyAll();
60 }
61 }
62
63 void closeOutputConsumer() {
64 size_t consumers = consumers_--;
65 CHECK(consumers);
66 if (consumers == 1) { // last consumer
67 wakeProducer_.notifyAll();
68 }
69 }
70
71 size_t producers() const {
72 return producers_.load(std::memory_order_acquire);
73 }
74
75 size_t consumers() const {
76 return consumers_.load(std::memory_order_acquire);
77 }
78
79 template <typename... Args>
80 bool writeUnlessFull(Args&&... args) noexcept {
81 if (queue_.write(std::forward<Args>(args)...)) {
82 // wake consumers to pick up new value
83 wakeConsumer_.notify();
84 return true;
85 }
86 return false;
87 }
88
89 template <typename... Args>
90 bool writeUnlessClosed(Args&&... args) {
91 // write if there's room
92 while (!queue_.writeIfNotFull(std::forward<Args>(args)...)) {
93 // if write fails, check if there are still consumers listening
94 auto key = wakeProducer_.prepareWait();
95 if (!consumers()) {
96 // no consumers left; bail out
97 wakeProducer_.cancelWait();
98 return false;
99 }
100 wakeProducer_.wait(key);
101 }
102 // wake consumers to pick up new value
103 wakeConsumer_.notify();
104 return true;
105 }
106
107 bool readUnlessEmpty(T& out) {
108 if (queue_.read(out)) {
109 // wake producers to fill empty space
110 wakeProducer_.notify();
111 return true;
112 }
113 return false;
114 }
115
116 bool readUnlessClosed(T& out) {
117 while (!queue_.readIfNotEmpty(out)) {
118 auto key = wakeConsumer_.prepareWait();
119 if (!producers()) {
120 // wake producers to fill empty space
121 wakeProducer_.notify();
122 return false;
123 }
124 wakeConsumer_.wait(key);
125 }
126 // wake writers blocked by full queue
127 wakeProducer_.notify();
128 return true;
129 }
130};
131
132template <class Sink>
133class Sub : public Operator<Sub<Sink>> {
134 Sink sink_;
135
136 public:
137 explicit Sub(Sink sink) : sink_(sink) {}
138
139 template <
140 class Value,
141 class Source,
142 class Result =
143 decltype(std::declval<Sink>().compose(std::declval<Source>())),
144 class Just = SingleCopy<typename std::decay<Result>::type>>
145 Just compose(const GenImpl<Value, Source>& source) const {
146 return Just(source | sink_);
147 }
148};
149
/**
 * Parallel - Operator that runs the `ops` pipeline over its source's values on
 * a set of worker threads.
 *
 * `threads` is the number of worker threads; 0 selects one per configured
 * processor (resolved in Generator's constructor).
 */
template <class Ops>
class Parallel : public Operator<Parallel<Ops>> {
  Ops ops_;
  size_t threads_;

 public:
  Parallel(Ops ops, size_t threads) : ops_(std::move(ops)), threads_(threads) {}

  /**
   * Generator - Produced by composing a source with Parallel.
   *
   * The calling thread pulls values from `source_` and pushes them into an
   * input queue. Each worker thread runs `Puller | ops_ | Pusher`: it reads
   * from the input queue and writes its results to an output queue, which the
   * calling thread drains into the downstream handler/body. With more than
   * one worker, input order is not guaranteed to be preserved in the output.
   */
  template <
      class Input,
      class Source,
      class InputDecayed = typename std::decay<Input>::type,
      class Composed =
          decltype(std::declval<Ops>().compose(Empty<InputDecayed&&>())),
      class Output = typename Composed::ValueType,
      class OutputDecayed = typename std::decay<Output>::type>
  class Generator : public GenImpl<
                        OutputDecayed&&,
                        Generator<
                            Input,
                            Source,
                            InputDecayed,
                            Composed,
                            Output,
                            OutputDecayed>> {
    Source source_;
    Ops ops_;
    size_t threads_;

    using InQueue = ClosableMPMCQueue<InputDecayed>;
    using OutQueue = ClosableMPMCQueue<OutputDecayed>;

    // Worker-side source: yields values read from the input queue until the
    // queue has no producers left and is drained.
    class Puller : public GenImpl<InputDecayed&&, Puller> {
      InQueue* queue_;

     public:
      explicit Puller(InQueue* queue) : queue_(queue) {}

      template <class Handler>
      bool apply(Handler&& handler) const {
        InputDecayed input;
        while (queue_->readUnlessClosed(input)) {
          if (!handler(std::move(input))) {
            return false;
          }
        }
        return true;
      }

      template <class Body>
      void foreach(Body&& body) const {
        InputDecayed input;
        while (queue_->readUnlessClosed(input)) {
          body(std::move(input));
        }
      }
    };

    // Worker-side sink: writes every value of its source into the output
    // queue. With `all == true` it uses foreach and ignores failed writes;
    // otherwise it uses apply and stops pulling as soon as a write fails
    // because the output queue has no consumers left.
    template <bool all = false>
    class Pusher : public Operator<Pusher<all>> {
      OutQueue* queue_;

     public:
      explicit Pusher(OutQueue* queue) : queue_(queue) {}

      template <class Value, class InnerSource>
      void compose(const GenImpl<Value, InnerSource>& source) const {
        if (all) {
          source.self().foreach([&](Value value) {
            queue_->writeUnlessClosed(std::forward<Value>(value));
          });
        } else {
          source.self().apply([&](Value value) {
            return queue_->writeUnlessClosed(std::forward<Value>(value));
          });
        }
      }
    };

    // Owns the two queues and the worker threads for one apply()/foreach()
    // call. The constructing thread registers as the only producer of
    // inQueue_ and the only consumer of outQueue_. Each worker registers as a
    // consumer of inQueue_ and a producer of outQueue_, and unregisters from
    // both (via SCOPE_EXIT) when its pipeline returns. Both queues have
    // capacity `threads * 4`.
    template <bool all = false>
    class Executor {
      InQueue inQueue_;
      OutQueue outQueue_;
      Puller puller_;
      Pusher<all> pusher_;
      std::vector<std::thread> workers_;
      const Ops* ops_;

      // Body of every worker thread.
      void work() {
        puller_ | *ops_ | pusher_;
      }

     public:
      Executor(size_t threads, const Ops* ops)
          : inQueue_(threads * 4),
            outQueue_(threads * 4),
            puller_(&inQueue_),
            pusher_(&outQueue_),
            ops_(ops) {
        inQueue_.openProducer();
        outQueue_.openConsumer();
        for (size_t t = 0; t < threads; ++t) {
          // Register before starting the thread so the counts are already
          // non-zero when the worker first touches the queues.
          inQueue_.openConsumer();
          outQueue_.openProducer();
          // Captures `this`: the Executor must outlive (and not move while
          // running) its workers; the destructor joins them.
          workers_.emplace_back([this] {
            SCOPE_EXIT {
              inQueue_.closeOutputConsumer();
              outQueue_.closeInputProducer();
            };
            this->work();
          });
        }
      }

      // Closes whichever ends the owner has not closed yet (the normal
      // apply()/foreach() paths close both, so this matters when they exit
      // early, e.g. via an exception), joins every worker, then verifies all
      // workers have unregistered.
      ~Executor() {
        if (inQueue_.producers()) {
          inQueue_.closeInputProducer();
        }
        if (outQueue_.consumers()) {
          outQueue_.closeOutputConsumer();
        }
        while (!workers_.empty()) {
          workers_.back().join();
          workers_.pop_back();
        }
        CHECK(!inQueue_.consumers());
        CHECK(!outQueue_.producers());
      }

      void closeInputProducer() {
        inQueue_.closeInputProducer();
      }

      void closeOutputConsumer() {
        outQueue_.closeOutputConsumer();
      }

      bool writeUnlessClosed(Input&& input) {
        return inQueue_.writeUnlessClosed(std::forward<Input>(input));
      }

      bool writeUnlessFull(Input&& input) {
        return inQueue_.writeUnlessFull(std::forward<Input>(input));
      }

      bool readUnlessClosed(OutputDecayed& output) {
        return outQueue_.readUnlessClosed(output);
      }

      bool readUnlessEmpty(OutputDecayed& output) {
        return outQueue_.readUnlessEmpty(output);
      }
    };

   public:
    // `threads == 0` selects sysconf(_SC_NPROCESSORS_CONF), clamped to at
    // least 1.
    Generator(Source source, Ops ops, size_t threads)
        : source_(std::move(source)),
          ops_(std::move(ops)),
          threads_(
              threads
                  ? threads
                  : size_t(std::max<long>(1, sysconf(_SC_NPROCESSORS_CONF)))) {}

    // Feeds every source value to the workers and passes each result to
    // `handler`. Returns false iff `handler` returned false.
    template <class Handler>
    bool apply(Handler&& handler) const {
      Executor<false> executor(threads_, &ops_);
      bool more = true;
      source_.apply([&](Input input) {
        // Fast path: enqueue without blocking.
        if (executor.writeUnlessFull(std::forward<Input>(input))) {
          return true;
        }
        // Input queue is full: hand already-finished results downstream
        // first so workers waiting on a full output queue can proceed.
        OutputDecayed output;
        while (executor.readUnlessEmpty(output)) {
          if (!handler(std::move(output))) {
            more = false;
            return false;
          }
        }
        // Block until there is room. Fails only when every worker has
        // exited; then stop pulling from the source.
        // NOTE(review): if one input expands into more results than the
        // output queue holds, workers may block on a full output queue while
        // this thread blocks on a full input queue — confirm whether
        // expanding `ops` can deadlock here.
        if (!executor.writeUnlessClosed(std::forward<Input>(input))) {
          return false;
        }
        return true;
      });
      // No more input: workers drain the input queue and then exit.
      executor.closeInputProducer();

      if (more) {
        // Forward remaining results until every worker has closed its end
        // of the output queue.
        OutputDecayed output;
        while (executor.readUnlessClosed(output)) {
          if (!handler(std::move(output))) {
            more = false;
            break;
          }
        }
      }
      // Lets any worker still blocked writing a result give up.
      executor.closeOutputConsumer();

      return more;
    }

    // Like apply(), but feeds every result to `body` with no early exit.
    template <class Body>
    void foreach(Body&& body) const {
      Executor<true> executor(threads_, &ops_);
      source_.foreach([&](Input input) {
        // Fast path: enqueue without blocking.
        if (executor.writeUnlessFull(std::forward<Input>(input))) {
          return;
        }
        // Input queue is full: drain finished results before blocking.
        OutputDecayed output;
        while (executor.readUnlessEmpty(output)) {
          body(std::move(output));
        }
        // NOTE(review): this assumes workers keep reading until the input
        // queue is closed; an `ops` pipeline that stops early (e.g. a
        // take-like operator) could leave no consumers and make this CHECK
        // fire — confirm.
        CHECK(executor.writeUnlessClosed(std::forward<Input>(input)));
      });
      // No more input: workers drain the input queue and then exit.
      executor.closeInputProducer();

      // Forward remaining results until every worker has finished.
      OutputDecayed output;
      while (executor.readUnlessClosed(output)) {
        body(std::move(output));
      }
      executor.closeOutputConsumer();
    }
  };

  // Copies an lvalue source into the resulting generator.
  template <class Value, class Source>
  Generator<Value, Source> compose(const GenImpl<Value, Source>& source) const {
    return Generator<Value, Source>(source.self(), ops_, threads_);
  }

  // Moves an rvalue source into the resulting generator.
  template <class Value, class Source>
  Generator<Value, Source> compose(GenImpl<Value, Source>&& source) const {
    return Generator<Value, Source>(std::move(source.self()), ops_, threads_);
  }
};
382
383/**
384 * ChunkedRangeSource - For slicing up ranges into a sequence of chunks given a
385 * maximum chunk size.
386 *
387 * Usually used through the 'chunked' helper, like:
388 *
389 * int n
390 * = chunked(values)
391 * | parallel // each thread processes a chunk
392 * | concat // but can still process values one at a time
393 * | filter(isPrime)
394 * | atomic_count;
395 */
396template <class Iterator>
397class ChunkedRangeSource
398 : public GenImpl<RangeSource<Iterator>&&, ChunkedRangeSource<Iterator>> {
399 int chunkSize_;
400 Range<Iterator> range_;
401
402 public:
403 ChunkedRangeSource() = default;
404 ChunkedRangeSource(int chunkSize, Range<Iterator> range)
405 : chunkSize_(chunkSize), range_(std::move(range)) {}
406
407 template <class Handler>
408 bool apply(Handler&& handler) const {
409 auto remaining = range_;
410 while (!remaining.empty()) {
411 auto chunk = remaining.subpiece(0, chunkSize_);
412 remaining.advance(chunk.size());
413 auto gen = RangeSource<Iterator>(chunk);
414 if (!handler(std::move(gen))) {
415 return false;
416 }
417 }
418 return true;
419 }
420};
421
422} // namespace detail
423
424} // namespace gen
425} // namespace folly
426