1/*
2 * Copyright 2014-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <folly/MPMCQueue.h>
#include <folly/executors/DrivableExecutor.h>
#include <folly/executors/InlineExecutor.h>
#include <folly/executors/ManualExecutor.h>
#include <folly/futures/Future.h>
#include <folly/portability/GTest.h>
#include <folly/synchronization/Baton.h>
26
27using namespace folly;
28
29struct ManualWaiter : public DrivableExecutor {
30 explicit ManualWaiter(std::shared_ptr<ManualExecutor> ex_) : ex(ex_) {}
31
32 void add(Func f) override {
33 ex->add(std::move(f));
34 }
35
36 void drive() override {
37 ex->wait();
38 ex->run();
39 }
40
41 std::shared_ptr<ManualExecutor> ex;
42};
43
44struct ViaFixture : public testing::Test {
45 ViaFixture()
46 : westExecutor(new ManualExecutor),
47 eastExecutor(new ManualExecutor),
48 waiter(new ManualWaiter(westExecutor)),
49 done(false) {
50 th = std::thread([=] {
51 ManualWaiter eastWaiter(eastExecutor);
52 while (!done) {
53 eastWaiter.drive();
54 }
55 });
56 }
57
58 ~ViaFixture() override {
59 done = true;
60 eastExecutor->add([=]() {});
61 th.join();
62 }
63
64 void addAsync(int a, int b, std::function<void(int&&)>&& cob) {
65 eastExecutor->add([=]() { cob(a + b); });
66 }
67
68 std::shared_ptr<ManualExecutor> westExecutor;
69 std::shared_ptr<ManualExecutor> eastExecutor;
70 std::shared_ptr<ManualWaiter> waiter;
71 InlineExecutor inlineExecutor;
72 std::atomic<bool> done;
73 std::thread th;
74};
75
76TEST(Via, exceptionOnLaunch) {
77 auto future = makeFuture<int>(std::runtime_error("E"));
78 EXPECT_THROW(future.value(), std::runtime_error);
79}
80
81TEST(Via, thenValue) {
82 auto future = makeFuture(std::move(1)).thenTry([](Try<int>&& t) {
83 return t.value() == 1;
84 });
85
86 EXPECT_TRUE(future.value());
87}
88
89TEST(Via, thenFuture) {
90 auto future = makeFuture(1).thenTry(
91 [](Try<int>&& t) { return makeFuture(t.value() == 1); });
92 EXPECT_TRUE(future.value());
93}
94
95static Future<std::string> doWorkStatic(Try<std::string>&& t) {
96 return makeFuture(t.value() + ";static");
97}
98
99TEST(Via, thenFunction) {
100 struct Worker {
101 Future<std::string> doWork(Try<std::string>&& t) {
102 return makeFuture(t.value() + ";class");
103 }
104 static Future<std::string> doWorkStatic(Try<std::string>&& t) {
105 return makeFuture(t.value() + ";class-static");
106 }
107 } w;
108
109 auto f = makeFuture(std::string("start"))
110 .thenTry(doWorkStatic)
111 .thenTry(Worker::doWorkStatic)
112 .then(&Worker::doWork, &w);
113
114 EXPECT_EQ(f.value(), "start;static;class-static;class");
115}
116
117TEST_F(ViaFixture, threadHops) {
118 auto westThreadId = std::this_thread::get_id();
119 auto f = via(eastExecutor.get())
120 .thenTry([=](Try<Unit>&& /* t */) {
121 EXPECT_NE(std::this_thread::get_id(), westThreadId);
122 return makeFuture<int>(1);
123 })
124 .via(westExecutor.get())
125 .thenTry([=](Try<int>&& t) {
126 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
127 return t.value();
128 });
129 EXPECT_EQ(f.getVia(waiter.get()), 1);
130}
131
132TEST_F(ViaFixture, chainVias) {
133 auto westThreadId = std::this_thread::get_id();
134 auto f = via(eastExecutor.get())
135 .thenValue([=](auto&&) {
136 EXPECT_NE(std::this_thread::get_id(), westThreadId);
137 return 1;
138 })
139 .thenValue([=](int val) {
140 return makeFuture(val)
141 .via(westExecutor.get())
142 .thenValue([=](int v) mutable {
143 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
144 return v + 1;
145 });
146 })
147 .thenValue([=](int val) {
148 // even though ultimately the future that triggers this one
149 // executed in the west thread, this thenValue() inherited the
150 // executor from its predecessor, ie the eastExecutor.
151 EXPECT_NE(std::this_thread::get_id(), westThreadId);
152 return val + 1;
153 })
154 .via(westExecutor.get())
155 .thenValue([=](int val) {
156 // go back to west, so we can wait on it
157 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
158 return val + 1;
159 });
160
161 EXPECT_EQ(f.getVia(waiter.get()), 4);
162}
163
164TEST_F(ViaFixture, bareViaAssignment) {
165 auto f = via(eastExecutor.get());
166}
167TEST_F(ViaFixture, viaAssignment) {
168 // via()&&
169 auto f = makeFuture().via(eastExecutor.get());
170 // via()&
171 auto f2 = f.via(eastExecutor.get());
172}
173
174TEST(Via, chain1) {
175 EXPECT_EQ(42, makeFuture().thenMulti([] { return 42; }).get());
176}
177
178TEST(Via, chain3) {
179 int count = 0;
180 auto f = makeFuture().thenMulti(
181 [&] {
182 count++;
183 return 3.14159;
184 },
185 [&](double) {
186 count++;
187 return std::string("hello");
188 },
189 [&] {
190 count++;
191 return makeFuture(42);
192 });
193 EXPECT_EQ(42, std::move(f).get());
194 EXPECT_EQ(3, count);
195}
196
197struct PriorityExecutor : public Executor {
198 void add(Func /* f */) override {}
199
200 void addWithPriority(Func f, int8_t priority) override {
201 int mid = getNumPriorities() / 2;
202 int p = priority < 0 ? std::max(0, mid + priority)
203 : std::min(getNumPriorities() - 1, mid + priority);
204 EXPECT_LT(p, 3);
205 EXPECT_GE(p, 0);
206 if (p == 0) {
207 count0++;
208 } else if (p == 1) {
209 count1++;
210 } else if (p == 2) {
211 count2++;
212 }
213 f();
214 }
215
216 uint8_t getNumPriorities() const override {
217 return 3;
218 }
219
220 int count0{0};
221 int count1{0};
222 int count2{0};
223};
224
225TEST(Via, priority) {
226 PriorityExecutor exe;
227 via(&exe, -1).thenValue([](auto&&) {});
228 via(&exe, 0).thenValue([](auto&&) {});
229 via(&exe, 1).thenValue([](auto&&) {});
230 via(&exe, 42).thenValue([](auto&&) {}); // overflow should go to max priority
231 via(&exe, -42).thenValue(
232 [](auto&&) {}); // underflow should go to min priority
233 via(&exe).thenValue([](auto&&) {}); // default to mid priority
234 via(&exe, Executor::LO_PRI).thenValue([](auto&&) {});
235 via(&exe, Executor::HI_PRI).thenValue([](auto&&) {});
236 EXPECT_EQ(3, exe.count0);
237 EXPECT_EQ(2, exe.count1);
238 EXPECT_EQ(3, exe.count2);
239}
240
241TEST_F(ViaFixture, chainX1) {
242 EXPECT_EQ(
243 42,
244 makeFuture()
245 .thenMultiWithExecutor(eastExecutor.get(), [] { return 42; })
246 .get());
247}
248
249TEST_F(ViaFixture, chainX3) {
250 auto westThreadId = std::this_thread::get_id();
251 int count = 0;
252 auto f = via(westExecutor.get())
253 .thenMultiWithExecutor(
254 eastExecutor.get(),
255 [&] {
256 EXPECT_NE(std::this_thread::get_id(), westThreadId);
257 count++;
258 return 3.14159;
259 },
260 [&](double) {
261 count++;
262 return std::string("hello");
263 },
264 [&] { count++; })
265 .thenValue([&](auto&&) {
266 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
267 return makeFuture(42);
268 });
269 EXPECT_EQ(42, f.getVia(waiter.get()));
270 EXPECT_EQ(3, count);
271}
272
273TEST(Via, then2) {
274 ManualExecutor x1, x2;
275 bool a = false, b = false, c = false;
276 via(&x1)
277 .thenValue([&](auto&&) { a = true; })
278 .then(&x2, [&](auto&&) { b = true; })
279 .thenValue([&](auto&&) { c = true; });
280
281 EXPECT_FALSE(a);
282 EXPECT_FALSE(b);
283
284 x1.run();
285 EXPECT_TRUE(a);
286 EXPECT_FALSE(b);
287 EXPECT_FALSE(c);
288
289 x2.run();
290 EXPECT_TRUE(b);
291 EXPECT_FALSE(c);
292
293 x1.run();
294 EXPECT_TRUE(c);
295}
296
297TEST(Via, then2Variadic) {
298 struct Foo {
299 bool a = false;
300 void foo(Try<Unit>) {
301 a = true;
302 }
303 };
304 Foo f;
305 ManualExecutor x;
306 makeFuture().then(&x, &Foo::foo, &f);
307 EXPECT_FALSE(f.a);
308 x.run();
309 EXPECT_TRUE(f.a);
310}
311
312#ifndef __APPLE__ // TODO #7372389
313/// Simple executor that does work in another thread
314class ThreadExecutor : public Executor {
315 folly::MPMCQueue<Func> funcs;
316 std::atomic<bool> done{false};
317 std::thread worker;
318 folly::Baton<> baton;
319
320 void work() {
321 baton.post();
322 Func fn;
323 while (!done) {
324 while (!funcs.isEmpty()) {
325 funcs.blockingRead(fn);
326 fn();
327 }
328 }
329 }
330
331 public:
332 explicit ThreadExecutor(size_t n = 1024) : funcs(n) {
333 worker = std::thread(std::bind(&ThreadExecutor::work, this));
334 }
335
336 ~ThreadExecutor() override {
337 done = true;
338 funcs.write([] {});
339 worker.join();
340 }
341
342 void add(Func fn) override {
343 funcs.blockingWrite(std::move(fn));
344 }
345
346 void waitForStartup() {
347 baton.wait();
348 }
349};
350
351TEST(Via, viaThenGetWasRacy) {
352 ThreadExecutor x;
353 std::unique_ptr<int> val =
354 folly::via(&x)
355 .thenValue([](auto&&) { return std::make_unique<int>(42); })
356 .get();
357 ASSERT_TRUE(!!val);
358 EXPECT_EQ(42, *val);
359}
360
361TEST(Via, callbackRace) {
362 ThreadExecutor x;
363
364 auto fn = [&x] {
365 auto promises = std::make_shared<std::vector<Promise<Unit>>>(4);
366 std::vector<Future<Unit>> futures;
367
368 for (auto& p : *promises) {
369 futures.emplace_back(p.getFuture().via(&x).thenTry([](Try<Unit>&&) {}));
370 }
371
372 x.waitForStartup();
373 x.add([promises] {
374 for (auto& p : *promises) {
375 p.setValue();
376 }
377 });
378
379 return collectAll(futures);
380 };
381
382 fn().wait();
383}
384#endif
385
386class DummyDrivableExecutor : public DrivableExecutor {
387 public:
388 void add(Func /* f */) override {}
389 void drive() override {
390 ran = true;
391 }
392 bool ran{false};
393};
394
395TEST(Via, getVia) {
396 {
397 // non-void
398 ManualExecutor x;
399 auto f = via(&x).thenValue([](auto&&) { return true; });
400 EXPECT_TRUE(f.getVia(&x));
401 }
402
403 {
404 // void
405 ManualExecutor x;
406 auto f = via(&x).then();
407 f.getVia(&x);
408 }
409
410 {
411 DummyDrivableExecutor x;
412 auto f = makeFuture(true);
413 EXPECT_TRUE(f.getVia(&x));
414 EXPECT_FALSE(x.ran);
415 }
416}
417
418TEST(Via, SimpleTimedGetVia) {
419 TimedDrivableExecutor e2;
420 Promise<folly::Unit> p;
421 auto f = p.getFuture();
422 EXPECT_THROW(f.getVia(&e2, std::chrono::seconds(1)), FutureTimeout);
423}
424
425TEST(Via, getTryVia) {
426 {
427 // non-void
428 ManualExecutor x;
429 auto f = via(&x).thenValue([](auto&&) { return 23; });
430 EXPECT_FALSE(f.isReady());
431 EXPECT_EQ(23, f.getTryVia(&x).value());
432 }
433
434 {
435 // void
436 ManualExecutor x;
437 auto f = via(&x).then();
438 EXPECT_FALSE(f.isReady());
439 auto t = f.getTryVia(&x);
440 EXPECT_TRUE(t.hasValue());
441 }
442
443 {
444 DummyDrivableExecutor x;
445 auto f = makeFuture(23);
446 EXPECT_EQ(23, f.getTryVia(&x).value());
447 EXPECT_FALSE(x.ran);
448 }
449}
450
451TEST(Via, SimpleTimedGetTryVia) {
452 TimedDrivableExecutor e2;
453 Promise<folly::Unit> p;
454 auto f = p.getFuture();
455 EXPECT_THROW(f.getTryVia(&e2, std::chrono::seconds(1)), FutureTimeout);
456}
457
458TEST(Via, waitVia) {
459 {
460 ManualExecutor x;
461 auto f = via(&x).then();
462 EXPECT_FALSE(f.isReady());
463 f.waitVia(&x);
464 EXPECT_TRUE(f.isReady());
465 }
466
467 {
468 // try rvalue as well
469 ManualExecutor x;
470 auto f = via(&x).then().waitVia(&x);
471 EXPECT_TRUE(f.isReady());
472 }
473
474 {
475 DummyDrivableExecutor x;
476 makeFuture(true).waitVia(&x);
477 EXPECT_FALSE(x.ran);
478 }
479}
480
481TEST(Via, viaRaces) {
482 ManualExecutor x;
483 Promise<Unit> p;
484 auto tid = std::this_thread::get_id();
485 bool done = false;
486
487 std::thread t1([&] {
488 p.getFuture()
489 .via(&x)
490 .thenTry(
491 [&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
492 .thenTry(
493 [&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
494 .thenTry([&](Try<Unit>&&) { done = true; });
495 });
496
497 std::thread t2([&] { p.setValue(); });
498
499 while (!done) {
500 x.run();
501 }
502 t1.join();
503 t2.join();
504}
505
506TEST(Via, viaDummyExecutorFutureSetValueFirst) {
507 // The callback object will get destroyed when passed to the executor.
508
509 // A promise will be captured by the callback lambda so we can observe that
510 // it will be destroyed.
511 Promise<Unit> captured_promise;
512 auto captured_promise_future = captured_promise.getFuture();
513
514 DummyDrivableExecutor x;
515 auto future = makeFuture().via(&x).thenValue(
516 [c = std::move(captured_promise)](auto&&) { return 42; });
517
518 EXPECT_THROW(std::move(future).get(std::chrono::seconds(5)), BrokenPromise);
519 EXPECT_THROW(
520 std::move(captured_promise_future).get(std::chrono::seconds(5)),
521 BrokenPromise);
522}
523
524TEST(Via, viaDummyExecutorFutureSetCallbackFirst) {
525 // The callback object will get destroyed when passed to the executor.
526
527 // A promise will be captured by the callback lambda so we can observe that
528 // it will be destroyed.
529 Promise<Unit> captured_promise;
530 auto captured_promise_future = captured_promise.getFuture();
531
532 DummyDrivableExecutor x;
533 Promise<Unit> trigger;
534 auto future = trigger.getFuture().via(&x).thenValue(
535 [c = std::move(captured_promise)](auto&&) { return 42; });
536 trigger.setValue();
537
538 EXPECT_THROW(std::move(future).get(std::chrono::seconds(5)), BrokenPromise);
539 EXPECT_THROW(
540 std::move(captured_promise_future).get(std::chrono::seconds(5)),
541 BrokenPromise);
542}
543
544TEST(Via, viaExecutorDiscardsTaskFutureSetValueFirst) {
545 // The callback object will get destroyed when the ManualExecutor runs out
546 // of scope.
547
548 // A promise will be captured by the callback lambda so we can observe that
549 // it will be destroyed.
550 Promise<Unit> captured_promise;
551 auto captured_promise_future = captured_promise.getFuture();
552
553 Optional<Future<int>> future;
554 {
555 ManualExecutor x;
556 future = makeFuture().via(&x).thenValue(
557 [c = std::move(captured_promise)](auto&&) { return 42; });
558 x.clear();
559 }
560
561 EXPECT_THROW(std::move(*future).get(std::chrono::seconds(5)), BrokenPromise);
562 EXPECT_THROW(
563 std::move(captured_promise_future).get(std::chrono::seconds(5)),
564 BrokenPromise);
565}
566
567TEST(Via, viaExecutorDiscardsTaskFutureSetCallbackFirst) {
568 // The callback object will get destroyed when the ManualExecutor runs out
569 // of scope.
570
571 // A promise will be captured by the callback lambda so we can observe that
572 // it will be destroyed.
573 Promise<Unit> captured_promise;
574 auto captured_promise_future = captured_promise.getFuture();
575
576 Optional<Future<int>> future;
577 {
578 ManualExecutor x;
579 Promise<Unit> trigger;
580 future = trigger.getFuture().via(&x).thenValue(
581 [c = std::move(captured_promise)](auto&&) { return 42; });
582 trigger.setValue();
583 x.clear();
584 }
585
586 EXPECT_THROW(std::move(*future).get(std::chrono::seconds(5)), BrokenPromise);
587 EXPECT_THROW(
588 std::move(captured_promise_future).get(std::chrono::seconds(5)),
589 BrokenPromise);
590}
591
592TEST(ViaFunc, liftsVoid) {
593 ManualExecutor x;
594 int count = 0;
595 Future<Unit> f = via(&x, [&] { count++; });
596
597 EXPECT_EQ(0, count);
598 x.run();
599 EXPECT_EQ(1, count);
600}
601
602TEST(ViaFunc, value) {
603 ManualExecutor x;
604 EXPECT_EQ(42, via(&x, [] { return 42; }).getVia(&x));
605}
606
607TEST(ViaFunc, exception) {
608 ManualExecutor x;
609 EXPECT_THROW(
610 via(&x, []() -> int { throw std::runtime_error("expected"); }).getVia(&x),
611 std::runtime_error);
612}
613
614TEST(ViaFunc, future) {
615 ManualExecutor x;
616 EXPECT_EQ(42, via(&x, [] { return makeFuture(42); }).getVia(&x));
617}
618
619TEST(ViaFunc, semi_future) {
620 ManualExecutor x;
621 EXPECT_EQ(42, via(&x, [] { return makeSemiFuture(42); }).getVia(&x));
622}
623
624TEST(ViaFunc, voidFuture) {
625 ManualExecutor x;
626 int count = 0;
627 via(&x, [&] { count++; }).getVia(&x);
628 EXPECT_EQ(1, count);
629}
630
631TEST(ViaFunc, isSticky) {
632 ManualExecutor x;
633 int count = 0;
634
635 auto f = via(&x, [&] { count++; });
636 x.run();
637
638 std::move(f).thenValue([&](auto&&) { count++; });
639 EXPECT_EQ(1, count);
640 x.run();
641 EXPECT_EQ(2, count);
642}
643
644TEST(ViaFunc, moveOnly) {
645 ManualExecutor x;
646 auto intp = std::make_unique<int>(42);
647
648 EXPECT_EQ(42, via(&x, [intp = std::move(intp)] { return *intp; }).getVia(&x));
649}
650
651TEST(ViaFunc, valueKeepAlive) {
652 ManualExecutor x;
653 EXPECT_EQ(42, via(getKeepAliveToken(&x), [] { return 42; }).getVia(&x));
654}
655
656TEST(ViaFunc, thenValueKeepAlive) {
657 ManualExecutor x;
658 EXPECT_EQ(
659 42,
660 via(getKeepAliveToken(&x))
661 .thenValue([](auto&&) { return 42; })
662 .getVia(&x));
663}
664