1/*
2 * Copyright 2015-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <boost/thread/barrier.hpp>
18
19#include <folly/Conv.h>
20#include <folly/executors/ManualExecutor.h>
21#include <folly/futures/Future.h>
22#include <folly/portability/GTest.h>
23
24#include <vector>
25
26using namespace folly;
27
28typedef FutureException eggs_t;
29static eggs_t eggs("eggs");
30
31TEST(Window, basic) {
32 // int -> Future<int>
33 auto fn = [](std::vector<int> input, size_t window_size, size_t expect) {
34 auto res =
35 reduce(
36 window(input, [](int i) { return makeFuture(i); }, window_size),
37 0,
38 [](int sum, const Try<int>& b) { return sum + *b; })
39 .get();
40 EXPECT_EQ(expect, res);
41 };
42 {
43 SCOPED_TRACE("2 in-flight at a time");
44 std::vector<int> input = {1, 2, 3};
45 fn(input, 2, 6);
46 }
47 {
48 SCOPED_TRACE("4 in-flight at a time");
49 std::vector<int> input = {1, 2, 3};
50 fn(input, 4, 6);
51 }
52 {
53 SCOPED_TRACE("empty input");
54 std::vector<int> input;
55 fn(input, 1, 0);
56 }
57 {
58 // int -> Future<Unit>
59 auto res = reduce(
60 window(
61 std::vector<int>({1, 2, 3}),
62 [](int /* i */) { return makeFuture(); },
63 2),
64 0,
65 [](int sum, const Try<Unit>& b) {
66 EXPECT_TRUE(b.hasValue());
67 return sum + 1;
68 })
69 .get();
70 EXPECT_EQ(3, res);
71 }
72 {
73 // string -> return Future<int>
74 auto res = reduce(
75 window(
76 std::vector<std::string>{"1", "2", "3"},
77 [](std::string s) {
78 return makeFuture<int>(folly::to<int>(s));
79 },
80 2),
81 0,
82 [](int sum, const Try<int>& b) { return sum + *b; })
83 .get();
84 EXPECT_EQ(6, res);
85 }
86 {
87 // string -> return SemiFuture<int>
88 auto res = reduce(
89 window(
90 std::vector<std::string>{"1", "2", "3"},
91 [](std::string s) {
92 return makeSemiFuture<int>(folly::to<int>(s));
93 },
94 2),
95 0,
96 [](int sum, const Try<int>& b) { return sum + *b; })
97 .get();
98 EXPECT_EQ(6, res);
99 }
100 {
101 SCOPED_TRACE("repeat same fn");
102 auto res =
103 reduce(
104 window(
105 5UL,
106 [](size_t iteration) { return folly::makeFuture(iteration); },
107 2),
108 0UL,
109 [](size_t sum, const Try<size_t>& b) { return sum + b.value(); })
110 .get();
111 EXPECT_EQ(0 + 1 + 2 + 3 + 4, res);
112 }
113}
114
115TEST(Window, exception) {
116 std::vector<int> ints = {1, 2, 3, 4};
117 std::vector<Promise<int>> ps(4);
118
119 auto res = reduce(
120 window(
121 ints,
122 [&ps](int i) {
123 if (i > 2) {
124 throw std::runtime_error("exception should not kill process");
125 }
126 return ps[i].getFuture();
127 },
128 2),
129 0,
130 [](int sum, const Try<int>& b) {
131 sum += b.hasException<std::exception>() ? 1 : 0;
132 return sum;
133 });
134
135 for (auto& p : ps) {
136 p.setValue(0);
137 }
138
139 // Should have received 2 exceptions.
140 EXPECT_EQ(2, std::move(res).get());
141}
142
143TEST(Window, stackOverflow) {
144 // Number of futures to spawn.
145 static constexpr size_t m = 1000;
146 // Size of each block of input and output.
147 static constexpr size_t n = 1000;
148
149 std::vector<std::array<int, n>> ints;
150 int64_t expectedSum = 0;
151 for (size_t i = 0; i < m; i++) {
152 std::array<int, n> next{};
153 next[i % n] = i;
154 ints.emplace_back(next);
155 expectedSum += i;
156 }
157
158 // Try to overflow window's executor.
159 auto res = reduce(
160 window(
161 ints,
162 [](std::array<int, n> i) {
163 return folly::Future<std::array<int, n>>(i);
164 },
165 1),
166 static_cast<int64_t>(0),
167 [](int64_t sum, const Try<std::array<int, n>>& b) {
168 for (int a : b.value()) {
169 sum += a;
170 }
171 return sum;
172 });
173
174 EXPECT_EQ(std::move(res).get(), expectedSum);
175}
176
177TEST(Window, parallel) {
178 std::vector<int> input;
179 std::vector<Promise<int>> ps(10);
180 for (size_t i = 0; i < ps.size(); i++) {
181 input.emplace_back(i);
182 }
183 auto f = collect(window(input, [&](int i) { return ps[i].getFuture(); }, 3));
184
185 std::vector<std::thread> ts;
186 boost::barrier barrier(ps.size() + 1);
187 for (size_t i = 0; i < ps.size(); i++) {
188 ts.emplace_back([&ps, &barrier, i]() {
189 barrier.wait();
190 ps[i].setValue(i);
191 });
192 }
193
194 barrier.wait();
195
196 for (auto& t : ts) {
197 t.join();
198 }
199
200 EXPECT_TRUE(f.isReady());
201 for (size_t i = 0; i < ps.size(); i++) {
202 EXPECT_EQ(i, f.value()[i]);
203 }
204}
205
206TEST(Window, parallelWithError) {
207 std::vector<int> input;
208 std::vector<Promise<int>> ps(10);
209 for (size_t i = 0; i < ps.size(); i++) {
210 input.emplace_back(i);
211 }
212 auto f = collect(window(input, [&](int i) { return ps[i].getFuture(); }, 3));
213
214 std::vector<std::thread> ts;
215 boost::barrier barrier(ps.size() + 1);
216 for (size_t i = 0; i < ps.size(); i++) {
217 ts.emplace_back([&ps, &barrier, i]() {
218 barrier.wait();
219 if (i == (ps.size() / 2)) {
220 ps[i].setException(eggs);
221 } else {
222 ps[i].setValue(i);
223 }
224 });
225 }
226
227 barrier.wait();
228
229 for (auto& t : ts) {
230 t.join();
231 }
232
233 EXPECT_TRUE(f.isReady());
234 EXPECT_THROW(f.value(), eggs_t);
235}
236
237TEST(Window, allParallelWithError) {
238 std::vector<int> input;
239 std::vector<Promise<int>> ps(10);
240 for (size_t i = 0; i < ps.size(); i++) {
241 input.emplace_back(i);
242 }
243 auto f =
244 collectAll(window(input, [&](int i) { return ps[i].getFuture(); }, 3));
245
246 std::vector<std::thread> ts;
247 boost::barrier barrier(ps.size() + 1);
248 for (size_t i = 0; i < ps.size(); i++) {
249 ts.emplace_back([&ps, &barrier, i]() {
250 barrier.wait();
251 if (i == (ps.size() / 2)) {
252 ps[i].setException(eggs);
253 } else {
254 ps[i].setValue(i);
255 }
256 });
257 }
258
259 barrier.wait();
260
261 for (auto& t : ts) {
262 t.join();
263 }
264
265 EXPECT_TRUE(f.isReady());
266 for (size_t i = 0; i < ps.size(); i++) {
267 if (i == (ps.size() / 2)) {
268 EXPECT_THROW(f.value()[i].value(), eggs_t);
269 } else {
270 EXPECT_TRUE(f.value()[i].hasValue());
271 EXPECT_EQ(i, f.value()[i].value());
272 }
273 }
274}
275
276TEST(WindowExecutor, basic) {
277 ManualExecutor executor;
278
279 // int -> Future<int>
280 auto fn = [executor_ = &executor](
281 std::vector<int> input, size_t window_size, size_t expect) {
282 auto res = reduce(
283 window(
284 executor_, input, [](int i) { return makeFuture(i); }, window_size),
285 0,
286 [](int sum, const Try<int>& b) { return sum + *b; });
287 executor_->waitFor(res);
288 EXPECT_EQ(expect, std::move(res).get());
289 };
290 {
291 SCOPED_TRACE("2 in-flight at a time");
292 std::vector<int> input = {1, 2, 3};
293 fn(input, 2, 6);
294 }
295 {
296 SCOPED_TRACE("4 in-flight at a time");
297 std::vector<int> input = {1, 2, 3};
298 fn(input, 4, 6);
299 }
300 {
301 SCOPED_TRACE("empty input");
302 std::vector<int> input;
303 fn(input, 1, 0);
304 }
305 {
306 // int -> Future<Unit>
307 auto res = reduce(
308 window(
309 &executor,
310 std::vector<int>({1, 2, 3}),
311 [](int /* i */) { return makeFuture(); },
312 2),
313 0,
314 [](int sum, const Try<Unit>& b) {
315 EXPECT_TRUE(b.hasValue());
316 return sum + 1;
317 });
318 executor.waitFor(res);
319 EXPECT_EQ(3, std::move(res).get());
320 }
321 {
322 // string -> return Future<int>
323 auto res = reduce(
324 window(
325 &executor,
326 std::vector<std::string>{"1", "2", "3"},
327 [](std::string s) { return makeFuture<int>(folly::to<int>(s)); },
328 2),
329 0,
330 [](int sum, const Try<int>& b) { return sum + *b; });
331 executor.waitFor(res);
332 EXPECT_EQ(6, std::move(res).get());
333 }
334}
335
336TEST(WindowExecutor, parallel) {
337 ManualExecutor executor;
338
339 std::vector<int> input;
340 std::vector<Promise<int>> ps(10);
341 for (size_t i = 0; i < ps.size(); i++) {
342 input.emplace_back(i);
343 }
344 auto f = collect(
345 window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
346
347 std::vector<std::thread> ts;
348 boost::barrier barrier(ps.size() + 1);
349 for (size_t i = 0; i < ps.size(); i++) {
350 ts.emplace_back([&ps, &barrier, i]() {
351 barrier.wait();
352 ps[i].setValue(i);
353 });
354 }
355
356 barrier.wait();
357
358 for (auto& t : ts) {
359 t.join();
360 }
361
362 executor.waitFor(f);
363 EXPECT_TRUE(f.isReady());
364 for (size_t i = 0; i < ps.size(); i++) {
365 EXPECT_EQ(i, f.value()[i]);
366 }
367}
368
369TEST(WindowExecutor, parallelWithError) {
370 ManualExecutor executor;
371
372 std::vector<int> input;
373 std::vector<Promise<int>> ps(10);
374 for (size_t i = 0; i < ps.size(); i++) {
375 input.emplace_back(i);
376 }
377 auto f = collect(
378 window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
379
380 std::vector<std::thread> ts;
381 boost::barrier barrier(ps.size() + 1);
382 for (size_t i = 0; i < ps.size(); i++) {
383 ts.emplace_back([&ps, &barrier, i]() {
384 barrier.wait();
385 if (i == (ps.size() / 2)) {
386 ps[i].setException(eggs);
387 } else {
388 ps[i].setValue(i);
389 }
390 });
391 }
392
393 barrier.wait();
394
395 for (auto& t : ts) {
396 t.join();
397 }
398
399 executor.waitFor(f);
400 EXPECT_TRUE(f.isReady());
401 EXPECT_THROW(f.value(), eggs_t);
402}
403
404TEST(WindowExecutor, allParallelWithError) {
405 ManualExecutor executor;
406
407 std::vector<int> input;
408 std::vector<Promise<int>> ps(10);
409 for (size_t i = 0; i < ps.size(); i++) {
410 input.emplace_back(i);
411 }
412 auto f = collectAll(
413 window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
414
415 std::vector<std::thread> ts;
416 boost::barrier barrier(ps.size() + 1);
417 for (size_t i = 0; i < ps.size(); i++) {
418 ts.emplace_back([&ps, &barrier, i]() {
419 barrier.wait();
420 if (i == (ps.size() / 2)) {
421 ps[i].setException(eggs);
422 } else {
423 ps[i].setValue(i);
424 }
425 });
426 }
427
428 barrier.wait();
429
430 for (auto& t : ts) {
431 t.join();
432 }
433
434 executor.waitFor(f);
435 EXPECT_TRUE(f.isReady());
436 for (size_t i = 0; i < ps.size(); i++) {
437 if (i == (ps.size() / 2)) {
438 EXPECT_THROW(f.value()[i].value(), eggs_t);
439 } else {
440 EXPECT_TRUE(f.value()[i].hasValue());
441 EXPECT_EQ(i, f.value()[i].value());
442 }
443 }
444}
445