1/*
2 * Copyright 2014-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18#define FOLLY_GEN_PARALLEL_H_
19
20#include <mutex>
21
22#include <folly/gen/Base.h>
23
24namespace folly {
25namespace gen {
// Forward declarations of the implementation types that the factory functions
// in this header name as their default return types. Only the names are
// introduced here; the definitions are presumably supplied by
// Parallel-inl.h, which this header includes at the bottom.
namespace detail {

// Default result type of parallel(); parameterized on the operator type.
template <typename Ops>
class Parallel;

// Default result type of sub(); parameterized on the wrapped sink type.
template <typename Sink>
class Sub;

// Default result type of chunked(); parameterized on the iterator type.
template <typename Iterator>
class ChunkedRangeSource;

} // namespace detail
38
39/**
40 * chunked() - For producing values from a container in slices.
41 *
42 * Especially for use with 'parallel()', chunked can be used to process values
43 * from a persistent container in chunks larger than one value at a time. The
44 * values produced are generators for slices of the input container. */
45template <
46 class Container,
47 class Iterator = typename Container::const_iterator,
48 class Chunked = detail::ChunkedRangeSource<Iterator>>
49Chunked chunked(const Container& container, int chunkSize = 256) {
50 return Chunked(chunkSize, folly::range(container.begin(), container.end()));
51}
52
53template <
54 class Container,
55 class Iterator = typename Container::iterator,
56 class Chunked = detail::ChunkedRangeSource<Iterator>>
57Chunked chunked(Container& container, int chunkSize = 256) {
58 return Chunked(chunkSize, folly::range(container.begin(), container.end()));
59}
60
61/**
62 * parallel - A parallelization operator.
63 *
64 * 'parallel(ops)' can be used with any generator to process a segment
65 * of the pipeline in parallel. Multiple threads are used to apply the
66 * operations ('ops') to the input sequence, with the resulting sequence
67 * interleaved to be processed on the client thread.
68 *
69 * auto scoredResults
70 * = from(ids)
71 * | parallel(map(fetchObj) | filter(isValid) | map(scoreObj))
72 * | as<vector>();
73 *
74 * Operators specified for parallel execution must yield sequences, not just
75 * individual values. If a sink function such as 'count' is desired, it must be
76 * wrapped in 'sub' to produce a subcount, since any such aggregation must be
77 * re-aggregated.
78 *
79 * auto matches
80 * = from(docs)
81 * | parallel(filter(expensiveTest) | sub(count))
82 * | sum;
83 *
84 * Here, each thread counts its portion of the result, then the sub-counts are
85 * summed up to produce the total count.
86 */
87template <class Ops, class Parallel = detail::Parallel<Ops>>
88Parallel parallel(Ops ops, size_t threads = 0) {
89 return Parallel(std::move(ops), threads);
90}
91
92/**
93 * sub - For sub-summarization of a sequence.
94 *
95 * 'sub' can be used to apply a sink function to a generator, but wrap the
96 * single value in another generator. Note that the sink is eagerly evaluated on
97 * the input sequence.
98 *
99 * auto sum = from(list) | sub(count) | first;
100 *
101 * This is primarily used with 'parallel', as noted above.
102 */
103template <class Sink, class Sub = detail::Sub<Sink>>
104Sub sub(Sink sink) {
105 return Sub(std::move(sink));
106}
107} // namespace gen
108} // namespace folly
109
110#include <folly/gen/Parallel-inl.h>
111