1 | /* |
2 | * Copyright 2015-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #include <boost/thread/barrier.hpp> |
18 | |
19 | #include <folly/Conv.h> |
20 | #include <folly/executors/ManualExecutor.h> |
21 | #include <folly/futures/Future.h> |
22 | #include <folly/portability/GTest.h> |
23 | |
24 | #include <vector> |
25 | |
26 | using namespace folly; |
27 | |
28 | typedef FutureException eggs_t; |
29 | static eggs_t eggs("eggs" ); |
30 | |
31 | TEST(Window, basic) { |
32 | // int -> Future<int> |
33 | auto fn = [](std::vector<int> input, size_t window_size, size_t expect) { |
34 | auto res = |
35 | reduce( |
36 | window(input, [](int i) { return makeFuture(i); }, window_size), |
37 | 0, |
38 | [](int sum, const Try<int>& b) { return sum + *b; }) |
39 | .get(); |
40 | EXPECT_EQ(expect, res); |
41 | }; |
42 | { |
43 | SCOPED_TRACE("2 in-flight at a time" ); |
44 | std::vector<int> input = {1, 2, 3}; |
45 | fn(input, 2, 6); |
46 | } |
47 | { |
48 | SCOPED_TRACE("4 in-flight at a time" ); |
49 | std::vector<int> input = {1, 2, 3}; |
50 | fn(input, 4, 6); |
51 | } |
52 | { |
53 | SCOPED_TRACE("empty input" ); |
54 | std::vector<int> input; |
55 | fn(input, 1, 0); |
56 | } |
57 | { |
58 | // int -> Future<Unit> |
59 | auto res = reduce( |
60 | window( |
61 | std::vector<int>({1, 2, 3}), |
62 | [](int /* i */) { return makeFuture(); }, |
63 | 2), |
64 | 0, |
65 | [](int sum, const Try<Unit>& b) { |
66 | EXPECT_TRUE(b.hasValue()); |
67 | return sum + 1; |
68 | }) |
69 | .get(); |
70 | EXPECT_EQ(3, res); |
71 | } |
72 | { |
73 | // string -> return Future<int> |
74 | auto res = reduce( |
75 | window( |
76 | std::vector<std::string>{"1" , "2" , "3" }, |
77 | [](std::string s) { |
78 | return makeFuture<int>(folly::to<int>(s)); |
79 | }, |
80 | 2), |
81 | 0, |
82 | [](int sum, const Try<int>& b) { return sum + *b; }) |
83 | .get(); |
84 | EXPECT_EQ(6, res); |
85 | } |
86 | { |
87 | // string -> return SemiFuture<int> |
88 | auto res = reduce( |
89 | window( |
90 | std::vector<std::string>{"1" , "2" , "3" }, |
91 | [](std::string s) { |
92 | return makeSemiFuture<int>(folly::to<int>(s)); |
93 | }, |
94 | 2), |
95 | 0, |
96 | [](int sum, const Try<int>& b) { return sum + *b; }) |
97 | .get(); |
98 | EXPECT_EQ(6, res); |
99 | } |
100 | { |
101 | SCOPED_TRACE("repeat same fn" ); |
102 | auto res = |
103 | reduce( |
104 | window( |
105 | 5UL, |
106 | [](size_t iteration) { return folly::makeFuture(iteration); }, |
107 | 2), |
108 | 0UL, |
109 | [](size_t sum, const Try<size_t>& b) { return sum + b.value(); }) |
110 | .get(); |
111 | EXPECT_EQ(0 + 1 + 2 + 3 + 4, res); |
112 | } |
113 | } |
114 | |
115 | TEST(Window, exception) { |
116 | std::vector<int> ints = {1, 2, 3, 4}; |
117 | std::vector<Promise<int>> ps(4); |
118 | |
119 | auto res = reduce( |
120 | window( |
121 | ints, |
122 | [&ps](int i) { |
123 | if (i > 2) { |
124 | throw std::runtime_error("exception should not kill process" ); |
125 | } |
126 | return ps[i].getFuture(); |
127 | }, |
128 | 2), |
129 | 0, |
130 | [](int sum, const Try<int>& b) { |
131 | sum += b.hasException<std::exception>() ? 1 : 0; |
132 | return sum; |
133 | }); |
134 | |
135 | for (auto& p : ps) { |
136 | p.setValue(0); |
137 | } |
138 | |
139 | // Should have received 2 exceptions. |
140 | EXPECT_EQ(2, std::move(res).get()); |
141 | } |
142 | |
143 | TEST(Window, stackOverflow) { |
144 | // Number of futures to spawn. |
145 | static constexpr size_t m = 1000; |
146 | // Size of each block of input and output. |
147 | static constexpr size_t n = 1000; |
148 | |
149 | std::vector<std::array<int, n>> ints; |
150 | int64_t expectedSum = 0; |
151 | for (size_t i = 0; i < m; i++) { |
152 | std::array<int, n> next{}; |
153 | next[i % n] = i; |
154 | ints.emplace_back(next); |
155 | expectedSum += i; |
156 | } |
157 | |
158 | // Try to overflow window's executor. |
159 | auto res = reduce( |
160 | window( |
161 | ints, |
162 | [](std::array<int, n> i) { |
163 | return folly::Future<std::array<int, n>>(i); |
164 | }, |
165 | 1), |
166 | static_cast<int64_t>(0), |
167 | [](int64_t sum, const Try<std::array<int, n>>& b) { |
168 | for (int a : b.value()) { |
169 | sum += a; |
170 | } |
171 | return sum; |
172 | }); |
173 | |
174 | EXPECT_EQ(std::move(res).get(), expectedSum); |
175 | } |
176 | |
177 | TEST(Window, parallel) { |
178 | std::vector<int> input; |
179 | std::vector<Promise<int>> ps(10); |
180 | for (size_t i = 0; i < ps.size(); i++) { |
181 | input.emplace_back(i); |
182 | } |
183 | auto f = collect(window(input, [&](int i) { return ps[i].getFuture(); }, 3)); |
184 | |
185 | std::vector<std::thread> ts; |
186 | boost::barrier barrier(ps.size() + 1); |
187 | for (size_t i = 0; i < ps.size(); i++) { |
188 | ts.emplace_back([&ps, &barrier, i]() { |
189 | barrier.wait(); |
190 | ps[i].setValue(i); |
191 | }); |
192 | } |
193 | |
194 | barrier.wait(); |
195 | |
196 | for (auto& t : ts) { |
197 | t.join(); |
198 | } |
199 | |
200 | EXPECT_TRUE(f.isReady()); |
201 | for (size_t i = 0; i < ps.size(); i++) { |
202 | EXPECT_EQ(i, f.value()[i]); |
203 | } |
204 | } |
205 | |
206 | TEST(Window, parallelWithError) { |
207 | std::vector<int> input; |
208 | std::vector<Promise<int>> ps(10); |
209 | for (size_t i = 0; i < ps.size(); i++) { |
210 | input.emplace_back(i); |
211 | } |
212 | auto f = collect(window(input, [&](int i) { return ps[i].getFuture(); }, 3)); |
213 | |
214 | std::vector<std::thread> ts; |
215 | boost::barrier barrier(ps.size() + 1); |
216 | for (size_t i = 0; i < ps.size(); i++) { |
217 | ts.emplace_back([&ps, &barrier, i]() { |
218 | barrier.wait(); |
219 | if (i == (ps.size() / 2)) { |
220 | ps[i].setException(eggs); |
221 | } else { |
222 | ps[i].setValue(i); |
223 | } |
224 | }); |
225 | } |
226 | |
227 | barrier.wait(); |
228 | |
229 | for (auto& t : ts) { |
230 | t.join(); |
231 | } |
232 | |
233 | EXPECT_TRUE(f.isReady()); |
234 | EXPECT_THROW(f.value(), eggs_t); |
235 | } |
236 | |
237 | TEST(Window, allParallelWithError) { |
238 | std::vector<int> input; |
239 | std::vector<Promise<int>> ps(10); |
240 | for (size_t i = 0; i < ps.size(); i++) { |
241 | input.emplace_back(i); |
242 | } |
243 | auto f = |
244 | collectAll(window(input, [&](int i) { return ps[i].getFuture(); }, 3)); |
245 | |
246 | std::vector<std::thread> ts; |
247 | boost::barrier barrier(ps.size() + 1); |
248 | for (size_t i = 0; i < ps.size(); i++) { |
249 | ts.emplace_back([&ps, &barrier, i]() { |
250 | barrier.wait(); |
251 | if (i == (ps.size() / 2)) { |
252 | ps[i].setException(eggs); |
253 | } else { |
254 | ps[i].setValue(i); |
255 | } |
256 | }); |
257 | } |
258 | |
259 | barrier.wait(); |
260 | |
261 | for (auto& t : ts) { |
262 | t.join(); |
263 | } |
264 | |
265 | EXPECT_TRUE(f.isReady()); |
266 | for (size_t i = 0; i < ps.size(); i++) { |
267 | if (i == (ps.size() / 2)) { |
268 | EXPECT_THROW(f.value()[i].value(), eggs_t); |
269 | } else { |
270 | EXPECT_TRUE(f.value()[i].hasValue()); |
271 | EXPECT_EQ(i, f.value()[i].value()); |
272 | } |
273 | } |
274 | } |
275 | |
276 | TEST(WindowExecutor, basic) { |
277 | ManualExecutor executor; |
278 | |
279 | // int -> Future<int> |
280 | auto fn = [executor_ = &executor]( |
281 | std::vector<int> input, size_t window_size, size_t expect) { |
282 | auto res = reduce( |
283 | window( |
284 | executor_, input, [](int i) { return makeFuture(i); }, window_size), |
285 | 0, |
286 | [](int sum, const Try<int>& b) { return sum + *b; }); |
287 | executor_->waitFor(res); |
288 | EXPECT_EQ(expect, std::move(res).get()); |
289 | }; |
290 | { |
291 | SCOPED_TRACE("2 in-flight at a time" ); |
292 | std::vector<int> input = {1, 2, 3}; |
293 | fn(input, 2, 6); |
294 | } |
295 | { |
296 | SCOPED_TRACE("4 in-flight at a time" ); |
297 | std::vector<int> input = {1, 2, 3}; |
298 | fn(input, 4, 6); |
299 | } |
300 | { |
301 | SCOPED_TRACE("empty input" ); |
302 | std::vector<int> input; |
303 | fn(input, 1, 0); |
304 | } |
305 | { |
306 | // int -> Future<Unit> |
307 | auto res = reduce( |
308 | window( |
309 | &executor, |
310 | std::vector<int>({1, 2, 3}), |
311 | [](int /* i */) { return makeFuture(); }, |
312 | 2), |
313 | 0, |
314 | [](int sum, const Try<Unit>& b) { |
315 | EXPECT_TRUE(b.hasValue()); |
316 | return sum + 1; |
317 | }); |
318 | executor.waitFor(res); |
319 | EXPECT_EQ(3, std::move(res).get()); |
320 | } |
321 | { |
322 | // string -> return Future<int> |
323 | auto res = reduce( |
324 | window( |
325 | &executor, |
326 | std::vector<std::string>{"1" , "2" , "3" }, |
327 | [](std::string s) { return makeFuture<int>(folly::to<int>(s)); }, |
328 | 2), |
329 | 0, |
330 | [](int sum, const Try<int>& b) { return sum + *b; }); |
331 | executor.waitFor(res); |
332 | EXPECT_EQ(6, std::move(res).get()); |
333 | } |
334 | } |
335 | |
336 | TEST(WindowExecutor, parallel) { |
337 | ManualExecutor executor; |
338 | |
339 | std::vector<int> input; |
340 | std::vector<Promise<int>> ps(10); |
341 | for (size_t i = 0; i < ps.size(); i++) { |
342 | input.emplace_back(i); |
343 | } |
344 | auto f = collect( |
345 | window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3)); |
346 | |
347 | std::vector<std::thread> ts; |
348 | boost::barrier barrier(ps.size() + 1); |
349 | for (size_t i = 0; i < ps.size(); i++) { |
350 | ts.emplace_back([&ps, &barrier, i]() { |
351 | barrier.wait(); |
352 | ps[i].setValue(i); |
353 | }); |
354 | } |
355 | |
356 | barrier.wait(); |
357 | |
358 | for (auto& t : ts) { |
359 | t.join(); |
360 | } |
361 | |
362 | executor.waitFor(f); |
363 | EXPECT_TRUE(f.isReady()); |
364 | for (size_t i = 0; i < ps.size(); i++) { |
365 | EXPECT_EQ(i, f.value()[i]); |
366 | } |
367 | } |
368 | |
369 | TEST(WindowExecutor, parallelWithError) { |
370 | ManualExecutor executor; |
371 | |
372 | std::vector<int> input; |
373 | std::vector<Promise<int>> ps(10); |
374 | for (size_t i = 0; i < ps.size(); i++) { |
375 | input.emplace_back(i); |
376 | } |
377 | auto f = collect( |
378 | window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3)); |
379 | |
380 | std::vector<std::thread> ts; |
381 | boost::barrier barrier(ps.size() + 1); |
382 | for (size_t i = 0; i < ps.size(); i++) { |
383 | ts.emplace_back([&ps, &barrier, i]() { |
384 | barrier.wait(); |
385 | if (i == (ps.size() / 2)) { |
386 | ps[i].setException(eggs); |
387 | } else { |
388 | ps[i].setValue(i); |
389 | } |
390 | }); |
391 | } |
392 | |
393 | barrier.wait(); |
394 | |
395 | for (auto& t : ts) { |
396 | t.join(); |
397 | } |
398 | |
399 | executor.waitFor(f); |
400 | EXPECT_TRUE(f.isReady()); |
401 | EXPECT_THROW(f.value(), eggs_t); |
402 | } |
403 | |
404 | TEST(WindowExecutor, allParallelWithError) { |
405 | ManualExecutor executor; |
406 | |
407 | std::vector<int> input; |
408 | std::vector<Promise<int>> ps(10); |
409 | for (size_t i = 0; i < ps.size(); i++) { |
410 | input.emplace_back(i); |
411 | } |
412 | auto f = collectAll( |
413 | window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3)); |
414 | |
415 | std::vector<std::thread> ts; |
416 | boost::barrier barrier(ps.size() + 1); |
417 | for (size_t i = 0; i < ps.size(); i++) { |
418 | ts.emplace_back([&ps, &barrier, i]() { |
419 | barrier.wait(); |
420 | if (i == (ps.size() / 2)) { |
421 | ps[i].setException(eggs); |
422 | } else { |
423 | ps[i].setValue(i); |
424 | } |
425 | }); |
426 | } |
427 | |
428 | barrier.wait(); |
429 | |
430 | for (auto& t : ts) { |
431 | t.join(); |
432 | } |
433 | |
434 | executor.waitFor(f); |
435 | EXPECT_TRUE(f.isReady()); |
436 | for (size_t i = 0; i < ps.size(); i++) { |
437 | if (i == (ps.size() / 2)) { |
438 | EXPECT_THROW(f.value()[i].value(), eggs_t); |
439 | } else { |
440 | EXPECT_TRUE(f.value()[i].hasValue()); |
441 | EXPECT_EQ(i, f.value()[i].value()); |
442 | } |
443 | } |
444 | } |
445 | |