1/*
2 * Copyright 2015-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <numeric>
18
19#include <boost/thread/barrier.hpp>
20
21#include <folly/Random.h>
22#include <folly/futures/Future.h>
23#include <folly/portability/GTest.h>
24#include <folly/small_vector.h>
25
26using namespace folly;
27
28typedef FutureException eggs_t;
29static eggs_t eggs("eggs");
30
31auto rng = std::mt19937(folly::randomNumberSeed());
32
33TEST(Collect, collectAll) {
34 // returns a vector variant
35 {
36 std::vector<Promise<int>> promises(10);
37 std::vector<Future<int>> futures;
38
39 for (auto& p : promises) {
40 futures.push_back(p.getFuture());
41 }
42
43 auto allf = collectAll(futures);
44
45 std::shuffle(promises.begin(), promises.end(), rng);
46 for (auto& p : promises) {
47 EXPECT_FALSE(allf.isReady());
48 p.setValue(42);
49 }
50
51 EXPECT_TRUE(allf.isReady());
52 auto& results = allf.value();
53 for (auto& t : results) {
54 EXPECT_EQ(42, t.value());
55 }
56 }
57
58 // check error semantics
59 {
60 std::vector<Promise<int>> promises(4);
61 std::vector<Future<int>> futures;
62
63 for (auto& p : promises) {
64 futures.push_back(p.getFuture());
65 }
66
67 auto allf = collectAll(futures);
68
69 promises[0].setValue(42);
70 promises[1].setException(eggs);
71
72 EXPECT_FALSE(allf.isReady());
73
74 promises[2].setValue(42);
75
76 EXPECT_FALSE(allf.isReady());
77
78 promises[3].setException(eggs);
79
80 EXPECT_TRUE(allf.isReady());
81 EXPECT_FALSE(allf.getTry().hasException());
82
83 auto& results = allf.value();
84 EXPECT_EQ(42, results[0].value());
85 EXPECT_TRUE(results[1].hasException());
86 EXPECT_EQ(42, results[2].value());
87 EXPECT_TRUE(results[3].hasException());
88 }
89
90 // check that futures are ready in thenValue()
91 {
92 std::vector<Promise<Unit>> promises(10);
93 std::vector<Future<Unit>> futures;
94
95 for (auto& p : promises) {
96 futures.push_back(p.getFuture());
97 }
98
99 auto allf = collectAllSemiFuture(futures).toUnsafeFuture().thenTry(
100 [](Try<std::vector<Try<Unit>>>&& ts) {
101 for (auto& f : ts.value()) {
102 f.value();
103 }
104 });
105
106 std::shuffle(promises.begin(), promises.end(), rng);
107 for (auto& p : promises) {
108 p.setValue();
109 }
110 EXPECT_TRUE(allf.isReady());
111 }
112}
113
114TEST(Collect, collect) {
115 // success case
116 {
117 std::vector<Promise<int>> promises(10);
118 std::vector<Future<int>> futures;
119
120 for (auto& p : promises) {
121 futures.push_back(p.getFuture());
122 }
123
124 auto allf = collect(futures);
125
126 std::shuffle(promises.begin(), promises.end(), rng);
127 for (auto& p : promises) {
128 EXPECT_FALSE(allf.isReady());
129 p.setValue(42);
130 }
131
132 EXPECT_TRUE(allf.isReady());
133 for (auto i : allf.value()) {
134 EXPECT_EQ(42, i);
135 }
136 }
137
138 // failure case
139 {
140 std::vector<Promise<int>> promises(10);
141 std::vector<Future<int>> futures;
142
143 for (auto& p : promises) {
144 futures.push_back(p.getFuture());
145 }
146
147 auto allf = collect(futures);
148
149 std::shuffle(promises.begin(), promises.end(), rng);
150 for (int i = 0; i < 10; i++) {
151 if (i < 5) {
152 // everthing goes well so far...
153 EXPECT_FALSE(allf.isReady());
154 promises[i].setValue(42);
155 } else if (i == 5) {
156 // short circuit with an exception
157 EXPECT_FALSE(allf.isReady());
158 promises[i].setException(eggs);
159 EXPECT_TRUE(allf.isReady());
160 } else if (i < 8) {
161 // don't blow up on further values
162 EXPECT_TRUE(allf.isReady());
163 promises[i].setValue(42);
164 } else {
165 // don't blow up on further exceptions
166 EXPECT_TRUE(allf.isReady());
167 promises[i].setException(eggs);
168 }
169 }
170
171 EXPECT_THROW(allf.value(), eggs_t);
172 }
173
174 // void futures success case
175 {
176 std::vector<Promise<Unit>> promises(10);
177 std::vector<Future<Unit>> futures;
178
179 for (auto& p : promises) {
180 futures.push_back(p.getFuture());
181 }
182
183 auto allf = collect(futures);
184
185 std::shuffle(promises.begin(), promises.end(), rng);
186 for (auto& p : promises) {
187 EXPECT_FALSE(allf.isReady());
188 p.setValue();
189 }
190
191 EXPECT_TRUE(allf.isReady());
192 }
193
194 // void futures failure case
195 {
196 std::vector<Promise<Unit>> promises(10);
197 std::vector<Future<Unit>> futures;
198
199 for (auto& p : promises) {
200 futures.push_back(p.getFuture());
201 }
202
203 auto allf = collect(futures);
204
205 std::shuffle(promises.begin(), promises.end(), rng);
206 for (int i = 0; i < 10; i++) {
207 if (i < 5) {
208 // everthing goes well so far...
209 EXPECT_FALSE(allf.isReady());
210 promises[i].setValue();
211 } else if (i == 5) {
212 // short circuit with an exception
213 EXPECT_FALSE(allf.isReady());
214 promises[i].setException(eggs);
215 EXPECT_TRUE(allf.isReady());
216 } else if (i < 8) {
217 // don't blow up on further values
218 EXPECT_TRUE(allf.isReady());
219 promises[i].setValue();
220 } else {
221 // don't blow up on further exceptions
222 EXPECT_TRUE(allf.isReady());
223 promises[i].setException(eggs);
224 }
225 }
226
227 EXPECT_THROW(allf.value(), eggs_t);
228 }
229
230 // move only compiles
231 {
232 std::vector<Promise<std::unique_ptr<int>>> promises(10);
233 std::vector<Future<std::unique_ptr<int>>> futures;
234
235 for (auto& p : promises) {
236 futures.push_back(p.getFuture());
237 }
238
239 collect(futures);
240 }
241}
242
243struct NotDefaultConstructible {
244 NotDefaultConstructible() = delete;
245 explicit NotDefaultConstructible(int arg) : i(arg) {}
246 int i;
247};
248
249// We have a specialized implementation for non-default-constructible objects
250// Ensure that it works and preserves order
251TEST(Collect, collectNotDefaultConstructible) {
252 std::vector<Promise<NotDefaultConstructible>> promises(10);
253 std::vector<Future<NotDefaultConstructible>> futures;
254 std::vector<int> indices(10);
255 std::iota(indices.begin(), indices.end(), 0);
256 std::shuffle(indices.begin(), indices.end(), rng);
257
258 for (auto& p : promises) {
259 futures.push_back(p.getFuture());
260 }
261
262 auto allf = collect(futures);
263
264 for (auto i : indices) {
265 EXPECT_FALSE(allf.isReady());
266 promises[i].setValue(NotDefaultConstructible(i));
267 }
268
269 EXPECT_TRUE(allf.isReady());
270 int i = 0;
271 for (auto val : allf.value()) {
272 EXPECT_EQ(i, val.i);
273 i++;
274 }
275}
276
277TEST(Collect, collectAny) {
278 {
279 std::vector<Promise<int>> promises(10);
280 std::vector<Future<int>> futures;
281
282 for (auto& p : promises) {
283 futures.push_back(p.getFuture());
284 }
285
286 for (auto& f : futures) {
287 EXPECT_FALSE(f.isReady());
288 }
289
290 auto anyf = collectAny(futures);
291
292 /* futures were moved in, so these are invalid now */
293 EXPECT_FALSE(anyf.isReady());
294
295 promises[7].setValue(42);
296 EXPECT_TRUE(anyf.isReady());
297 auto& idx_fut = anyf.value();
298
299 auto i = idx_fut.first;
300 EXPECT_EQ(7, i);
301
302 auto& f = idx_fut.second;
303 EXPECT_EQ(42, f.value());
304 }
305
306 // error
307 {
308 std::vector<Promise<Unit>> promises(10);
309 std::vector<Future<Unit>> futures;
310
311 for (auto& p : promises) {
312 futures.push_back(p.getFuture());
313 }
314
315 for (auto& f : futures) {
316 EXPECT_FALSE(f.isReady());
317 }
318
319 auto anyf = collectAny(futures);
320
321 EXPECT_FALSE(anyf.isReady());
322
323 promises[3].setException(eggs);
324 EXPECT_TRUE(anyf.isReady());
325 EXPECT_TRUE(anyf.value().second.hasException());
326 }
327
328 // thenValue()
329 {
330 std::vector<Promise<int>> promises(10);
331 std::vector<Future<int>> futures;
332
333 for (auto& p : promises) {
334 futures.push_back(p.getFuture());
335 }
336
337 auto anyf = collectAny(futures).thenValue(
338 [](std::pair<size_t, Try<int>> p) { EXPECT_EQ(42, p.second.value()); });
339
340 promises[3].setValue(42);
341 EXPECT_TRUE(anyf.isReady());
342 }
343}
344
345TEST(Collect, collectAnyWithoutException) {
346 auto& executor = folly::InlineExecutor::instance();
347
348 {
349 std::vector<Promise<int>> promises(10);
350 std::vector<Future<int>> futures;
351
352 for (auto& p : promises) {
353 futures.push_back(p.getFuture());
354 }
355
356 auto onef = collectAnyWithoutException(futures).via(&executor);
357
358 /* futures were moved in, so these are invalid now */
359 EXPECT_FALSE(onef.isReady());
360
361 promises[7].setValue(42);
362 EXPECT_TRUE(onef.isReady());
363 auto& idx_fut = onef.value();
364 EXPECT_EQ(7, idx_fut.first);
365 EXPECT_EQ(42, idx_fut.second);
366 }
367
368 // some exception before ready
369 {
370 std::vector<Promise<int>> promises(10);
371 std::vector<Future<int>> futures;
372
373 for (auto& p : promises) {
374 futures.push_back(p.getFuture());
375 }
376
377 auto onef = collectAnyWithoutException(futures).via(&executor);
378
379 EXPECT_FALSE(onef.isReady());
380
381 promises[3].setException(eggs);
382 EXPECT_FALSE(onef.isReady());
383 promises[4].setException(eggs);
384 EXPECT_FALSE(onef.isReady());
385 promises[0].setValue(99);
386 EXPECT_TRUE(onef.isReady());
387 auto& idx_fut = onef.value();
388 EXPECT_EQ(0, idx_fut.first);
389 EXPECT_EQ(99, idx_fut.second);
390 }
391
392 // all exceptions
393 {
394 std::vector<Promise<int>> promises(10);
395 std::vector<Future<int>> futures;
396
397 for (auto& p : promises) {
398 futures.push_back(p.getFuture());
399 }
400
401 auto onef = collectAnyWithoutException(futures).via(&executor);
402
403 EXPECT_FALSE(onef.isReady());
404 for (int i = 0; i < 9; ++i) {
405 promises[i].setException(eggs);
406 }
407 EXPECT_FALSE(onef.isReady());
408
409 promises[9].setException(eggs);
410 EXPECT_TRUE(onef.isReady());
411 EXPECT_TRUE(onef.hasException());
412 }
413
414 // Deferred work
415 {
416 std::vector<Promise<int>> promises(10);
417
418 auto onef = [&] {
419 std::vector<SemiFuture<int>> futures;
420
421 for (auto& p : promises) {
422 futures.push_back(
423 p.getSemiFuture().deferValue([](auto v) { return v; }));
424 }
425 return collectAnyWithoutException(futures);
426 }();
427
428 /* futures were moved in, so these are invalid now */
429
430 promises[7].setValue(42);
431 auto idx_fut = std::move(onef).get();
432 EXPECT_EQ(7, idx_fut.first);
433 EXPECT_EQ(42, idx_fut.second);
434 }
435}
436
437TEST(Collect, alreadyCompleted) {
438 {
439 std::vector<Future<Unit>> fs;
440 for (int i = 0; i < 10; i++) {
441 fs.push_back(makeFuture());
442 }
443
444 collectAllSemiFuture(fs).toUnsafeFuture().thenValue(
445 [&](std::vector<Try<Unit>> ts) { EXPECT_EQ(fs.size(), ts.size()); });
446 }
447 {
448 std::vector<Future<int>> fs;
449 for (int i = 0; i < 10; i++) {
450 fs.push_back(makeFuture(i));
451 }
452
453 collectAny(fs).thenValue([&](std::pair<size_t, Try<int>> p) {
454 EXPECT_EQ(p.first, p.second.value());
455 });
456 }
457}
458
459TEST(Collect, parallel) {
460 std::vector<Promise<int>> ps(10);
461 std::vector<Future<int>> fs;
462 for (size_t i = 0; i < ps.size(); i++) {
463 fs.emplace_back(ps[i].getFuture());
464 }
465 auto f = collect(fs);
466
467 std::vector<std::thread> ts;
468 boost::barrier barrier(ps.size() + 1);
469 for (size_t i = 0; i < ps.size(); i++) {
470 ts.emplace_back([&ps, &barrier, i]() {
471 barrier.wait();
472 ps[i].setValue(i);
473 });
474 }
475
476 barrier.wait();
477
478 for (size_t i = 0; i < ps.size(); i++) {
479 ts[i].join();
480 }
481
482 EXPECT_TRUE(f.isReady());
483 for (size_t i = 0; i < ps.size(); i++) {
484 EXPECT_EQ(i, f.value()[i]);
485 }
486}
487
488TEST(Collect, parallelWithError) {
489 std::vector<Promise<int>> ps(10);
490 std::vector<Future<int>> fs;
491 for (size_t i = 0; i < ps.size(); i++) {
492 fs.emplace_back(ps[i].getFuture());
493 }
494 auto f = collect(fs);
495
496 std::vector<std::thread> ts;
497 boost::barrier barrier(ps.size() + 1);
498 for (size_t i = 0; i < ps.size(); i++) {
499 ts.emplace_back([&ps, &barrier, i]() {
500 barrier.wait();
501 if (i == (ps.size() / 2)) {
502 ps[i].setException(eggs);
503 } else {
504 ps[i].setValue(i);
505 }
506 });
507 }
508
509 barrier.wait();
510
511 for (size_t i = 0; i < ps.size(); i++) {
512 ts[i].join();
513 }
514
515 EXPECT_TRUE(f.isReady());
516 EXPECT_THROW(f.value(), eggs_t);
517}
518
519TEST(Collect, allParallel) {
520 std::vector<Promise<int>> ps(10);
521 std::vector<Future<int>> fs;
522 for (size_t i = 0; i < ps.size(); i++) {
523 fs.emplace_back(ps[i].getFuture());
524 }
525 auto f = collectAll(fs);
526
527 std::vector<std::thread> ts;
528 boost::barrier barrier(ps.size() + 1);
529 for (size_t i = 0; i < ps.size(); i++) {
530 ts.emplace_back([&ps, &barrier, i]() {
531 barrier.wait();
532 ps[i].setValue(i);
533 });
534 }
535
536 barrier.wait();
537
538 for (size_t i = 0; i < ps.size(); i++) {
539 ts[i].join();
540 }
541
542 EXPECT_TRUE(f.isReady());
543 for (size_t i = 0; i < ps.size(); i++) {
544 EXPECT_TRUE(f.value()[i].hasValue());
545 EXPECT_EQ(i, f.value()[i].value());
546 }
547}
548
549TEST(Collect, allParallelWithError) {
550 std::vector<Promise<int>> ps(10);
551 std::vector<Future<int>> fs;
552 for (size_t i = 0; i < ps.size(); i++) {
553 fs.emplace_back(ps[i].getFuture());
554 }
555 auto f = collectAll(fs);
556
557 std::vector<std::thread> ts;
558 boost::barrier barrier(ps.size() + 1);
559 for (size_t i = 0; i < ps.size(); i++) {
560 ts.emplace_back([&ps, &barrier, i]() {
561 barrier.wait();
562 if (i == (ps.size() / 2)) {
563 ps[i].setException(eggs);
564 } else {
565 ps[i].setValue(i);
566 }
567 });
568 }
569
570 barrier.wait();
571
572 for (size_t i = 0; i < ps.size(); i++) {
573 ts[i].join();
574 }
575
576 EXPECT_TRUE(f.isReady());
577 for (size_t i = 0; i < ps.size(); i++) {
578 if (i == (ps.size() / 2)) {
579 EXPECT_THROW(f.value()[i].value(), eggs_t);
580 } else {
581 EXPECT_TRUE(f.value()[i].hasValue());
582 EXPECT_EQ(i, f.value()[i].value());
583 }
584 }
585}
586
587TEST(Collect, collectN) {
588 std::vector<Promise<Unit>> promises(10);
589 std::vector<Future<Unit>> futures;
590
591 for (auto& p : promises) {
592 futures.push_back(p.getFuture());
593 }
594
595 bool flag = false;
596 size_t n = 3;
597 collectN(futures, n)
598 .via(&InlineExecutor::instance())
599 .thenValue([&](std::vector<std::pair<size_t, Try<Unit>>> v) {
600 flag = true;
601 EXPECT_EQ(n, v.size());
602 for (auto& tt : v) {
603 EXPECT_TRUE(tt.second.hasValue());
604 }
605 });
606
607 promises[0].setValue();
608 EXPECT_FALSE(flag);
609 promises[1].setValue();
610 EXPECT_FALSE(flag);
611 promises[2].setValue();
612 EXPECT_TRUE(flag);
613}
614
615TEST(Collect, collectNParallel) {
616 std::vector<Promise<Unit>> ps(100);
617 std::vector<Future<Unit>> futures;
618
619 for (auto& p : ps) {
620 futures.push_back(p.getFuture());
621 }
622
623 bool flag = false;
624 size_t n = 90;
625 collectN(futures, n)
626 .via(&InlineExecutor::instance())
627 .thenValue([&](std::vector<std::pair<size_t, Try<Unit>>> v) {
628 flag = true;
629 EXPECT_EQ(n, v.size());
630 for (auto& tt : v) {
631 EXPECT_TRUE(tt.second.hasValue());
632 }
633 });
634
635 std::vector<std::thread> ts;
636 boost::barrier barrier(ps.size() + 1);
637 for (size_t i = 0; i < ps.size(); i++) {
638 ts.emplace_back([&ps, &barrier, i]() {
639 barrier.wait();
640 ps[i].setValue();
641 });
642 }
643
644 barrier.wait();
645
646 for (size_t i = 0; i < ps.size(); i++) {
647 ts[i].join();
648 }
649
650 EXPECT_TRUE(flag);
651}
652
653/// Ensure that we can compile collectAll/Any with folly::small_vector
654TEST(Collect, smallVector) {
655 static_assert(
656 !folly::is_trivially_copyable<Future<Unit>>::value,
657 "Futures should not be trivially copyable");
658 static_assert(
659 !folly::is_trivially_copyable<Future<int>>::value,
660 "Futures should not be trivially copyable");
661
662 {
663 folly::small_vector<Future<Unit>> futures;
664
665 for (int i = 0; i < 10; i++) {
666 futures.push_back(makeFuture());
667 }
668
669 auto anyf = collectAny(futures);
670 }
671 {
672 folly::small_vector<Future<Unit>> futures;
673
674 for (int i = 0; i < 10; i++) {
675 futures.push_back(makeFuture());
676 }
677
678 auto allf = collectAll(futures);
679 }
680}
681
682TEST(Collect, collectAllVariadic) {
683 Promise<bool> pb;
684 Promise<int> pi;
685 Future<bool> fb = pb.getFuture();
686 Future<int> fi = pi.getFuture();
687 bool flag = false;
688 collectAllSemiFuture(std::move(fb), std::move(fi))
689 .toUnsafeFuture()
690 .thenValue([&](std::tuple<Try<bool>, Try<int>> tup) {
691 flag = true;
692 EXPECT_TRUE(std::get<0>(tup).hasValue());
693 EXPECT_EQ(std::get<0>(tup).value(), true);
694 EXPECT_TRUE(std::get<1>(tup).hasValue());
695 EXPECT_EQ(std::get<1>(tup).value(), 42);
696 });
697 pb.setValue(true);
698 EXPECT_FALSE(flag);
699 pi.setValue(42);
700 EXPECT_TRUE(flag);
701}
702
703TEST(Collect, collectAllVariadicReferences) {
704 Promise<bool> pb;
705 Promise<int> pi;
706 Future<bool> fb = pb.getFuture();
707 Future<int> fi = pi.getFuture();
708 bool flag = false;
709 collectAllSemiFuture(fb, fi).toUnsafeFuture().thenValue(
710 [&](std::tuple<Try<bool>, Try<int>> tup) {
711 flag = true;
712 EXPECT_TRUE(std::get<0>(tup).hasValue());
713 EXPECT_EQ(std::get<0>(tup).value(), true);
714 EXPECT_TRUE(std::get<1>(tup).hasValue());
715 EXPECT_EQ(std::get<1>(tup).value(), 42);
716 });
717 pb.setValue(true);
718 EXPECT_FALSE(flag);
719 pi.setValue(42);
720 EXPECT_TRUE(flag);
721}
722
723TEST(Collect, collectAllVariadicWithException) {
724 Promise<bool> pb;
725 Promise<int> pi;
726 Future<bool> fb = pb.getFuture();
727 Future<int> fi = pi.getFuture();
728 bool flag = false;
729 collectAllSemiFuture(std::move(fb), std::move(fi))
730 .toUnsafeFuture()
731 .thenValue([&](std::tuple<Try<bool>, Try<int>> tup) {
732 flag = true;
733 EXPECT_TRUE(std::get<0>(tup).hasValue());
734 EXPECT_EQ(std::get<0>(tup).value(), true);
735 EXPECT_TRUE(std::get<1>(tup).hasException());
736 EXPECT_THROW(std::get<1>(tup).value(), eggs_t);
737 });
738 pb.setValue(true);
739 EXPECT_FALSE(flag);
740 pi.setException(eggs);
741 EXPECT_TRUE(flag);
742}
743
744TEST(Collect, collectVariadic) {
745 Promise<bool> pb;
746 Promise<int> pi;
747 Future<bool> fb = pb.getFuture();
748 Future<int> fi = pi.getFuture();
749 bool flag = false;
750 collect(std::move(fb), std::move(fi))
751 .thenValue([&](std::tuple<bool, int> tup) {
752 flag = true;
753 EXPECT_EQ(std::get<0>(tup), true);
754 EXPECT_EQ(std::get<1>(tup), 42);
755 });
756 pb.setValue(true);
757 EXPECT_FALSE(flag);
758 pi.setValue(42);
759 EXPECT_TRUE(flag);
760}
761
762TEST(Collect, collectVariadicWithException) {
763 Promise<bool> pb;
764 Promise<int> pi;
765 Future<bool> fb = pb.getFuture();
766 Future<int> fi = pi.getFuture();
767 auto f = collect(std::move(fb), std::move(fi));
768 pb.setValue(true);
769 EXPECT_FALSE(f.isReady());
770 pi.setException(eggs);
771 EXPECT_TRUE(f.isReady());
772 EXPECT_TRUE(f.getTry().hasException());
773 EXPECT_THROW(std::move(f).get(), eggs_t);
774}
775
776TEST(Collect, collectAllNone) {
777 std::vector<Future<int>> fs;
778 auto f = collectAll(fs);
779 EXPECT_TRUE(f.isReady());
780}
781
782TEST(Collect, noDefaultConstructor) {
783 struct A {
784 explicit A(size_t /* x */) {}
785 };
786
787 auto f1 = makeFuture(A(1));
788 auto f2 = makeFuture(A(2));
789
790 auto f = collect(std::move(f1), std::move(f2));
791}
792