1/*
2 * Copyright 2013-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <folly/MPMCQueue.h>
20
21namespace folly {
22
23template <class T, class... Stages>
24class MPMCPipeline;
25
26template <class T, size_t Amp>
27class MPMCPipelineStage {
28 public:
29 typedef T value_type;
30 static constexpr size_t kAmplification = Amp;
31};
32
33namespace detail {
34
35/**
36 * Helper template to determine value type and amplification whether or not
37 * we use MPMCPipelineStage<>
38 */
39template <class T>
40struct PipelineStageInfo {
41 static constexpr size_t kAmplification = 1;
42 typedef T value_type;
43};
44
45template <class T, size_t Amp>
46struct PipelineStageInfo<MPMCPipelineStage<T, Amp>> {
47 static constexpr size_t kAmplification = Amp;
48 typedef T value_type;
49};
50
51/**
52 * Wrapper around MPMCQueue (friend) that keeps track of tickets.
53 */
54template <class T>
55class MPMCPipelineStageImpl {
56 public:
57 typedef T value_type;
58 template <class U, class... Stages>
59 friend class MPMCPipeline;
60
61 // Implicit so that MPMCPipeline construction works
62 /* implicit */ MPMCPipelineStageImpl(size_t capacity) : queue_(capacity) {}
63 MPMCPipelineStageImpl() {}
64
65 // only use on first stage, uses queue_.pushTicket_ instead of existing
66 // ticket
67 template <class... Args>
68 void blockingWrite(Args&&... args) noexcept {
69 queue_.blockingWrite(std::forward<Args>(args)...);
70 }
71
72 template <class... Args>
73 bool write(Args&&... args) noexcept {
74 return queue_.write(std::forward<Args>(args)...);
75 }
76
77 template <class... Args>
78 void blockingWriteWithTicket(uint64_t ticket, Args&&... args) noexcept {
79 queue_.enqueueWithTicket(ticket, std::forward<Args>(args)...);
80 }
81
82 uint64_t blockingRead(T& elem) noexcept {
83 uint64_t ticket;
84 queue_.blockingReadWithTicket(ticket, elem);
85 return ticket;
86 }
87
88 bool read(T& elem) noexcept { // only use on last stage, won't track ticket
89 return queue_.read(elem);
90 }
91
92 template <class... Args>
93 bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
94 return queue_.readAndGetTicket(ticket, elem);
95 }
96
97 // See MPMCQueue<T>::writeCount; only works for the first stage
98 uint64_t writeCount() const noexcept {
99 return queue_.writeCount();
100 }
101
102 uint64_t readCount() const noexcept {
103 return queue_.readCount();
104 }
105
106 private:
107 MPMCQueue<T> queue_;
108};
109
110// Product of amplifications of a tuple of PipelineStageInfo<X>
111template <class Tuple>
112struct AmplificationProduct;
113
114template <>
115struct AmplificationProduct<std::tuple<>> {
116 static constexpr size_t value = 1;
117};
118
119template <class T, class... Ts>
120struct AmplificationProduct<std::tuple<T, Ts...>> {
121 static constexpr size_t value =
122 T::kAmplification * AmplificationProduct<std::tuple<Ts...>>::value;
123};
124
125} // namespace detail
126} // namespace folly
127