1 | /* |
2 | * Copyright 2013-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
19 | #include <folly/MPMCQueue.h> |
20 | |
21 | namespace folly { |
22 | |
23 | template <class T, class... Stages> |
24 | class MPMCPipeline; |
25 | |
/**
 * Compile-time tag that pairs a stage's element type with an amplification
 * factor. It carries no data members; it only exposes:
 *   - kAmplification: the Amp template argument
 *   - value_type:     the T template argument
 */
template <class T, size_t Amp>
class MPMCPipelineStage {
 public:
  static constexpr size_t kAmplification = Amp;
  using value_type = T;
};
32 | |
33 | namespace detail { |
34 | |
/**
 * Trait that yields a stage's value type and amplification regardless of
 * whether the stage is spelled as a plain type or as MPMCPipelineStage<>.
 *
 * This primary template handles the plain-type case: the type itself is the
 * value type and the amplification is 1.
 */
template <class Stage>
struct PipelineStageInfo {
  using value_type = Stage;
  static constexpr size_t kAmplification = 1;
};
44 | |
45 | template <class T, size_t Amp> |
46 | struct PipelineStageInfo<MPMCPipelineStage<T, Amp>> { |
47 | static constexpr size_t kAmplification = Amp; |
48 | typedef T value_type; |
49 | }; |
50 | |
51 | /** |
52 | * Wrapper around MPMCQueue (friend) that keeps track of tickets. |
53 | */ |
54 | template <class T> |
55 | class MPMCPipelineStageImpl { |
56 | public: |
57 | typedef T value_type; |
58 | template <class U, class... Stages> |
59 | friend class MPMCPipeline; |
60 | |
61 | // Implicit so that MPMCPipeline construction works |
62 | /* implicit */ MPMCPipelineStageImpl(size_t capacity) : queue_(capacity) {} |
63 | MPMCPipelineStageImpl() {} |
64 | |
65 | // only use on first stage, uses queue_.pushTicket_ instead of existing |
66 | // ticket |
67 | template <class... Args> |
68 | void blockingWrite(Args&&... args) noexcept { |
69 | queue_.blockingWrite(std::forward<Args>(args)...); |
70 | } |
71 | |
72 | template <class... Args> |
73 | bool write(Args&&... args) noexcept { |
74 | return queue_.write(std::forward<Args>(args)...); |
75 | } |
76 | |
77 | template <class... Args> |
78 | void blockingWriteWithTicket(uint64_t ticket, Args&&... args) noexcept { |
79 | queue_.enqueueWithTicket(ticket, std::forward<Args>(args)...); |
80 | } |
81 | |
82 | uint64_t blockingRead(T& elem) noexcept { |
83 | uint64_t ticket; |
84 | queue_.blockingReadWithTicket(ticket, elem); |
85 | return ticket; |
86 | } |
87 | |
88 | bool read(T& elem) noexcept { // only use on last stage, won't track ticket |
89 | return queue_.read(elem); |
90 | } |
91 | |
92 | template <class... Args> |
93 | bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept { |
94 | return queue_.readAndGetTicket(ticket, elem); |
95 | } |
96 | |
97 | // See MPMCQueue<T>::writeCount; only works for the first stage |
98 | uint64_t writeCount() const noexcept { |
99 | return queue_.writeCount(); |
100 | } |
101 | |
102 | uint64_t readCount() const noexcept { |
103 | return queue_.readCount(); |
104 | } |
105 | |
106 | private: |
107 | MPMCQueue<T> queue_; |
108 | }; |
109 | |
// Product of amplifications of a tuple of PipelineStageInfo<X>
//
// AmplificationProduct<std::tuple<S0, S1, ...>>::value is
// S0::kAmplification * S1::kAmplification * ..., computed at compile time.
// Only std::tuple instantiations are defined.
template <class Tuple>
struct AmplificationProduct;

// Base case: the empty product is 1.
template <>
struct AmplificationProduct<std::tuple<>> {
  static constexpr size_t value = 1;
};

// Recursive case: the head's amplification times the product over the tail.
template <class Head, class... Tail>
struct AmplificationProduct<std::tuple<Head, Tail...>> {
 private:
  using Rest = AmplificationProduct<std::tuple<Tail...>>;

 public:
  static constexpr size_t value = Head::kAmplification * Rest::value;
};
124 | |
125 | } // namespace detail |
126 | } // namespace folly |
127 | |