1 | /* |
2 | * Copyright 2013-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #include <folly/MPMCPipeline.h> |
18 | |
19 | #include <thread> |
20 | #include <vector> |
21 | |
22 | #include <glog/logging.h> |
23 | |
24 | #include <folly/Conv.h> |
25 | #include <folly/portability/GTest.h> |
26 | |
27 | namespace folly { |
28 | namespace test { |
29 | |
30 | TEST(MPMCPipeline, Trivial) { |
31 | MPMCPipeline<int, std::string> a(2, 2); |
32 | EXPECT_EQ(0, a.sizeGuess()); |
33 | a.blockingWrite(42); |
34 | EXPECT_EQ(1, a.sizeGuess()); |
35 | |
36 | int val; |
37 | auto ticket = a.blockingReadStage<0>(val); |
38 | EXPECT_EQ(42, val); |
39 | EXPECT_EQ(1, a.sizeGuess()); |
40 | |
41 | a.blockingWriteStage<0>(ticket, "hello world" ); |
42 | EXPECT_EQ(1, a.sizeGuess()); |
43 | |
44 | std::string s; |
45 | |
46 | a.blockingRead(s); |
47 | EXPECT_EQ("hello world" , s); |
48 | EXPECT_EQ(0, a.sizeGuess()); |
49 | } |
50 | |
51 | TEST(MPMCPipeline, TrivialAmplification) { |
52 | MPMCPipeline<int, MPMCPipelineStage<std::string, 2>> a(2, 2); |
53 | EXPECT_EQ(0, a.sizeGuess()); |
54 | a.blockingWrite(42); |
55 | EXPECT_EQ(2, a.sizeGuess()); |
56 | |
57 | int val; |
58 | auto ticket = a.blockingReadStage<0>(val); |
59 | EXPECT_EQ(42, val); |
60 | EXPECT_EQ(2, a.sizeGuess()); |
61 | |
62 | a.blockingWriteStage<0>(ticket, "hello world" ); |
63 | EXPECT_EQ(2, a.sizeGuess()); |
64 | a.blockingWriteStage<0>(ticket, "goodbye" ); |
65 | EXPECT_EQ(2, a.sizeGuess()); |
66 | |
67 | std::string s; |
68 | |
69 | a.blockingRead(s); |
70 | EXPECT_EQ("hello world" , s); |
71 | EXPECT_EQ(1, a.sizeGuess()); |
72 | |
73 | a.blockingRead(s); |
74 | EXPECT_EQ("goodbye" , s); |
75 | EXPECT_EQ(0, a.sizeGuess()); |
76 | } |
77 | |
78 | TEST(MPMCPipeline, MultiThreaded) { |
79 | constexpr size_t numThreadsPerStage = 6; |
80 | MPMCPipeline<int, std::string, std::string> a(5, 5, 5); |
81 | |
82 | std::vector<std::thread> threads; |
83 | threads.reserve(numThreadsPerStage * 2 + 1); |
84 | for (size_t i = 0; i < numThreadsPerStage; ++i) { |
85 | threads.emplace_back([&a] { |
86 | for (;;) { |
87 | int val; |
88 | auto ticket = a.blockingReadStage<0>(val); |
89 | if (val == -1) { // stop |
90 | // We still need to propagate |
91 | a.blockingWriteStage<0>(ticket, "" ); |
92 | break; |
93 | } |
94 | a.blockingWriteStage<0>(ticket, folly::to<std::string>(val, " hello" )); |
95 | } |
96 | }); |
97 | } |
98 | |
99 | for (size_t i = 0; i < numThreadsPerStage; ++i) { |
100 | threads.emplace_back([&a] { |
101 | for (;;) { |
102 | std::string val; |
103 | auto ticket = a.blockingReadStage<1>(val); |
104 | if (val.empty()) { // stop |
105 | // We still need to propagate |
106 | a.blockingWriteStage<1>(ticket, "" ); |
107 | break; |
108 | } |
109 | a.blockingWriteStage<1>(ticket, folly::to<std::string>(val, " world" )); |
110 | } |
111 | }); |
112 | } |
113 | |
114 | std::vector<std::string> results; |
115 | threads.emplace_back([&a, &results]() { |
116 | for (;;) { |
117 | std::string val; |
118 | a.blockingRead(val); |
119 | if (val.empty()) { |
120 | break; |
121 | } |
122 | results.push_back(val); |
123 | } |
124 | }); |
125 | |
126 | constexpr size_t numValues = 1000; |
127 | for (size_t i = 0; i < numValues; ++i) { |
128 | a.blockingWrite(i); |
129 | } |
130 | for (size_t i = 0; i < numThreadsPerStage; ++i) { |
131 | a.blockingWrite(-1); |
132 | } |
133 | |
134 | for (auto& t : threads) { |
135 | t.join(); |
136 | } |
137 | |
138 | // The consumer thread dequeued the first empty string, there should be |
139 | // numThreadsPerStage - 1 left. |
140 | EXPECT_EQ(numThreadsPerStage - 1, a.sizeGuess()); |
141 | for (size_t i = 0; i < numThreadsPerStage - 1; ++i) { |
142 | std::string val; |
143 | a.blockingRead(val); |
144 | EXPECT_TRUE(val.empty()); |
145 | } |
146 | { |
147 | std::string tmp; |
148 | EXPECT_FALSE(a.read(tmp)); |
149 | } |
150 | EXPECT_EQ(0, a.sizeGuess()); |
151 | |
152 | EXPECT_EQ(numValues, results.size()); |
153 | for (size_t i = 0; i < results.size(); ++i) { |
154 | EXPECT_EQ(folly::to<std::string>(i, " hello world" ), results[i]); |
155 | } |
156 | } |
157 | |
158 | } // namespace test |
159 | } // namespace folly |
160 | |
161 | int main(int argc, char* argv[]) { |
162 | testing::InitGoogleTest(&argc, argv); |
163 | gflags::ParseCommandLineFlags(&argc, &argv, true); |
164 | return RUN_ALL_TESTS(); |
165 | } |
166 | |