1/*
2 * Copyright 2017-present Facebook, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <atomic>
18#include <memory>
19#include <thread>
20
21#include <boost/thread.hpp>
22
23#include <folly/Exception.h>
24#include <folly/VirtualExecutor.h>
25#include <folly/executors/CPUThreadPoolExecutor.h>
26#include <folly/executors/FutureExecutor.h>
27#include <folly/executors/IOThreadPoolExecutor.h>
28#include <folly/executors/ThreadPoolExecutor.h>
29#include <folly/executors/task_queue/LifoSemMPMCQueue.h>
30#include <folly/executors/task_queue/UnboundedBlockingQueue.h>
31#include <folly/executors/thread_factory/InitThreadFactory.h>
32#include <folly/executors/thread_factory/PriorityThreadFactory.h>
33#include <folly/portability/GTest.h>
34#include <folly/synchronization/detail/Spin.h>
35
36using namespace folly;
37using namespace std::chrono;
38
39static Func burnMs(uint64_t ms) {
40 return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
41}
42
// Smoke test: constructs a ten-thread pool of type TPE and lets it go out of
// scope immediately.
template <class TPE>
static void basic() {
  const int kThreads = 10;
  TPE pool(kThreads);
  // Nothing to assert; the pool is destroyed when this function returns.
}
48
49TEST(ThreadPoolExecutorTest, CPUBasic) {
50 basic<CPUThreadPoolExecutor>();
51}
52
53TEST(IOThreadPoolExecutorTest, IOBasic) {
54 basic<IOThreadPoolExecutor>();
55}
56
// Checks that numThreads() reports the value most recently requested, first
// after construction and then after shrinking and growing the pool.
template <class TPE>
static void resize() {
  TPE pool(100);
  EXPECT_EQ(100, pool.numThreads());
  for (int target : {50, 150}) {
    pool.setNumThreads(target);
    EXPECT_EQ(target, pool.numThreads());
  }
}
66
67TEST(ThreadPoolExecutorTest, CPUResize) {
68 resize<CPUThreadPoolExecutor>();
69}
70
71TEST(ThreadPoolExecutorTest, IOResize) {
72 resize<IOThreadPoolExecutor>();
73}
74
75template <class TPE>
76static void stop() {
77 TPE tpe(1);
78 std::atomic<int> completed(0);
79 auto f = [&]() {
80 burnMs(10)();
81 completed++;
82 };
83 for (int i = 0; i < 1000; i++) {
84 tpe.add(f);
85 }
86 tpe.stop();
87 EXPECT_GT(1000, completed);
88}
89
90// IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong
91// to the event base, will be executed upon its destruction, and cannot be
92// taken back.
93template <>
94void stop<IOThreadPoolExecutor>() {
95 IOThreadPoolExecutor tpe(1);
96 std::atomic<int> completed(0);
97 auto f = [&]() {
98 burnMs(10)();
99 completed++;
100 };
101 for (int i = 0; i < 10; i++) {
102 tpe.add(f);
103 }
104 tpe.stop();
105 EXPECT_EQ(10, completed);
106}
107
108TEST(ThreadPoolExecutorTest, CPUStop) {
109 stop<CPUThreadPoolExecutor>();
110}
111
112TEST(ThreadPoolExecutorTest, IOStop) {
113 stop<IOThreadPoolExecutor>();
114}
115
116template <class TPE>
117static void join() {
118 TPE tpe(10);
119 std::atomic<int> completed(0);
120 auto f = [&]() {
121 burnMs(1)();
122 completed++;
123 };
124 for (int i = 0; i < 1000; i++) {
125 tpe.add(f);
126 }
127 tpe.join();
128 EXPECT_EQ(1000, completed);
129}
130
131TEST(ThreadPoolExecutorTest, CPUJoin) {
132 join<CPUThreadPoolExecutor>();
133}
134
135TEST(ThreadPoolExecutorTest, IOJoin) {
136 join<IOThreadPoolExecutor>();
137}
138
139template <class TPE>
140static void destroy() {
141 TPE tpe(1);
142 std::atomic<int> completed(0);
143 auto f = [&]() {
144 burnMs(10)();
145 completed++;
146 };
147 for (int i = 0; i < 1000; i++) {
148 tpe.add(f);
149 }
150 tpe.stop();
151 EXPECT_GT(1000, completed);
152}
153
154// IOThreadPoolExecutor's destuctor joins all tasks. Outstanding tasks belong
155// to the event base, will be executed upon its destruction, and cannot be
156// taken back.
157template <>
158void destroy<IOThreadPoolExecutor>() {
159 Optional<IOThreadPoolExecutor> tpe(in_place, 1);
160 std::atomic<int> completed(0);
161 auto f = [&]() {
162 burnMs(10)();
163 completed++;
164 };
165 for (int i = 0; i < 10; i++) {
166 tpe->add(f);
167 }
168 tpe.clear();
169 EXPECT_EQ(10, completed);
170}
171
172TEST(ThreadPoolExecutorTest, CPUDestroy) {
173 destroy<CPUThreadPoolExecutor>();
174}
175
176TEST(ThreadPoolExecutorTest, IODestroy) {
177 destroy<IOThreadPoolExecutor>();
178}
179
180template <class TPE>
181static void resizeUnderLoad() {
182 TPE tpe(10);
183 std::atomic<int> completed(0);
184 auto f = [&]() {
185 burnMs(1)();
186 completed++;
187 };
188 for (int i = 0; i < 1000; i++) {
189 tpe.add(f);
190 }
191 tpe.setNumThreads(5);
192 tpe.setNumThreads(15);
193 tpe.join();
194 EXPECT_EQ(1000, completed);
195}
196
197TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
198 resizeUnderLoad<CPUThreadPoolExecutor>();
199}
200
201TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
202 resizeUnderLoad<IOThreadPoolExecutor>();
203}
204
205template <class TPE>
206static void poolStats() {
207 folly::Baton<> startBaton, endBaton;
208 TPE tpe(1);
209 auto stats = tpe.getPoolStats();
210 EXPECT_GE(1, stats.threadCount);
211 EXPECT_GE(1, stats.idleThreadCount);
212 EXPECT_EQ(0, stats.activeThreadCount);
213 EXPECT_EQ(0, stats.pendingTaskCount);
214 EXPECT_EQ(0, tpe.getPendingTaskCount());
215 EXPECT_EQ(0, stats.totalTaskCount);
216 tpe.add([&]() {
217 startBaton.post();
218 endBaton.wait();
219 });
220 tpe.add([&]() {});
221 startBaton.wait();
222 stats = tpe.getPoolStats();
223 EXPECT_EQ(1, stats.threadCount);
224 EXPECT_EQ(0, stats.idleThreadCount);
225 EXPECT_EQ(1, stats.activeThreadCount);
226 EXPECT_EQ(1, stats.pendingTaskCount);
227 EXPECT_EQ(1, tpe.getPendingTaskCount());
228 EXPECT_EQ(2, stats.totalTaskCount);
229 endBaton.post();
230}
231
232TEST(ThreadPoolExecutorTest, CPUPoolStats) {
233 poolStats<CPUThreadPoolExecutor>();
234}
235
236TEST(ThreadPoolExecutorTest, IOPoolStats) {
237 poolStats<IOThreadPoolExecutor>();
238}
239
240template <class TPE>
241static void taskStats() {
242 TPE tpe(1);
243 std::atomic<int> c(0);
244 tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
245 int i = c++;
246 EXPECT_LT(milliseconds(0), stats.runTime);
247 if (i == 1) {
248 EXPECT_LT(milliseconds(0), stats.waitTime);
249 }
250 });
251 tpe.add(burnMs(10));
252 tpe.add(burnMs(10));
253 tpe.join();
254 EXPECT_EQ(2, c);
255}
256
257TEST(ThreadPoolExecutorTest, CPUTaskStats) {
258 taskStats<CPUThreadPoolExecutor>();
259}
260
261TEST(ThreadPoolExecutorTest, IOTaskStats) {
262 taskStats<IOThreadPoolExecutor>();
263}
264
265template <class TPE>
266static void expiration() {
267 TPE tpe(1);
268 std::atomic<int> statCbCount(0);
269 tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
270 int i = statCbCount++;
271 if (i == 0) {
272 EXPECT_FALSE(stats.expired);
273 } else if (i == 1) {
274 EXPECT_TRUE(stats.expired);
275 } else {
276 FAIL();
277 }
278 });
279 std::atomic<int> expireCbCount(0);
280 auto expireCb = [&]() { expireCbCount++; };
281 tpe.add(burnMs(10), seconds(60), expireCb);
282 tpe.add(burnMs(10), milliseconds(10), expireCb);
283 tpe.join();
284 EXPECT_EQ(2, statCbCount);
285 EXPECT_EQ(1, expireCbCount);
286}
287
288TEST(ThreadPoolExecutorTest, CPUExpiration) {
289 expiration<CPUThreadPoolExecutor>();
290}
291
292TEST(ThreadPoolExecutorTest, IOExpiration) {
293 expiration<IOThreadPoolExecutor>();
294}
295
296template <typename TPE>
297static void futureExecutor() {
298 FutureExecutor<TPE> fe(2);
299 std::atomic<int> c{0};
300 fe.addFuture([]() { return makeFuture<int>(42); }).then([&](Try<int>&& t) {
301 c++;
302 EXPECT_EQ(42, t.value());
303 });
304 fe.addFuture([]() { return 100; }).then([&](Try<int>&& t) {
305 c++;
306 EXPECT_EQ(100, t.value());
307 });
308 fe.addFuture([]() { return makeFuture(); }).then([&](Try<Unit>&& t) {
309 c++;
310 EXPECT_NO_THROW(t.value());
311 });
312 fe.addFuture([]() { return; }).then([&](Try<Unit>&& t) {
313 c++;
314 EXPECT_NO_THROW(t.value());
315 });
316 fe.addFuture([]() { throw std::runtime_error("oops"); })
317 .then([&](Try<Unit>&& t) {
318 c++;
319 EXPECT_THROW(t.value(), std::runtime_error);
320 });
321 // Test doing actual async work
322 folly::Baton<> baton;
323 fe.addFuture([&]() {
324 auto p = std::make_shared<Promise<int>>();
325 std::thread t([p]() {
326 burnMs(10)();
327 p->setValue(42);
328 });
329 t.detach();
330 return p->getFuture();
331 })
332 .then([&](Try<int>&& t) {
333 EXPECT_EQ(42, t.value());
334 c++;
335 baton.post();
336 });
337 baton.wait();
338 fe.join();
339 EXPECT_EQ(6, c);
340}
341
342TEST(ThreadPoolExecutorTest, CPUFuturePool) {
343 futureExecutor<CPUThreadPoolExecutor>();
344}
345
346TEST(ThreadPoolExecutorTest, IOFuturePool) {
347 futureExecutor<IOThreadPoolExecutor>();
348}
349
350TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
351 bool tookLopri = false;
352 auto completed = 0;
353 auto hipri = [&] {
354 EXPECT_FALSE(tookLopri);
355 completed++;
356 };
357 auto lopri = [&] {
358 tookLopri = true;
359 completed++;
360 };
361 CPUThreadPoolExecutor pool(0, 2);
362 {
363 VirtualExecutor ve(pool);
364 for (int i = 0; i < 50; i++) {
365 ve.addWithPriority(lopri, Executor::LO_PRI);
366 }
367 for (int i = 0; i < 50; i++) {
368 ve.addWithPriority(hipri, Executor::HI_PRI);
369 }
370 pool.setNumThreads(1);
371 }
372 EXPECT_EQ(100, completed);
373}
374
375class TestObserver : public ThreadPoolExecutor::Observer {
376 public:
377 void threadStarted(ThreadPoolExecutor::ThreadHandle*) override {
378 threads_++;
379 }
380 void threadStopped(ThreadPoolExecutor::ThreadHandle*) override {
381 threads_--;
382 }
383 void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle*) override {
384 threads_++;
385 }
386 void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle*) override {
387 threads_--;
388 }
389 void checkCalls() {
390 ASSERT_EQ(threads_, 0);
391 }
392
393 private:
394 std::atomic<int> threads_{0};
395};
396
397TEST(ThreadPoolExecutorTest, IOObserver) {
398 auto observer = std::make_shared<TestObserver>();
399
400 {
401 IOThreadPoolExecutor exe(10);
402 exe.addObserver(observer);
403 exe.setNumThreads(3);
404 exe.setNumThreads(0);
405 exe.setNumThreads(7);
406 exe.removeObserver(observer);
407 exe.setNumThreads(10);
408 }
409
410 observer->checkCalls();
411}
412
413TEST(ThreadPoolExecutorTest, CPUObserver) {
414 auto observer = std::make_shared<TestObserver>();
415
416 {
417 CPUThreadPoolExecutor exe(10);
418 exe.addObserver(observer);
419 exe.setNumThreads(3);
420 exe.setNumThreads(0);
421 exe.setNumThreads(7);
422 exe.removeObserver(observer);
423 exe.setNumThreads(10);
424 }
425
426 observer->checkCalls();
427}
428
429TEST(ThreadPoolExecutorTest, AddWithPriority) {
430 std::atomic_int c{0};
431 auto f = [&] { c++; };
432
433 // IO exe doesn't support priorities
434 IOThreadPoolExecutor ioExe(10);
435 EXPECT_THROW(ioExe.addWithPriority(f, 0), std::runtime_error);
436
437 CPUThreadPoolExecutor cpuExe(10, 3);
438 cpuExe.addWithPriority(f, -1);
439 cpuExe.addWithPriority(f, 0);
440 cpuExe.addWithPriority(f, 1);
441 cpuExe.addWithPriority(f, -2); // will add at the lowest priority
442 cpuExe.addWithPriority(f, 2); // will add at the highest priority
443 cpuExe.addWithPriority(f, Executor::LO_PRI);
444 cpuExe.addWithPriority(f, Executor::HI_PRI);
445 cpuExe.join();
446
447 EXPECT_EQ(7, c);
448}
449
450TEST(ThreadPoolExecutorTest, BlockingQueue) {
451 std::atomic_int c{0};
452 auto f = [&] {
453 burnMs(1)();
454 c++;
455 };
456 const int kQueueCapacity = 1;
457 const int kThreads = 1;
458
459 auto queue = std::make_unique<LifoSemMPMCQueue<
460 CPUThreadPoolExecutor::CPUTask,
461 QueueBehaviorIfFull::BLOCK>>(kQueueCapacity);
462
463 CPUThreadPoolExecutor cpuExe(
464 kThreads,
465 std::move(queue),
466 std::make_shared<NamedThreadFactory>("CPUThreadPool"));
467
468 // Add `f` five times. It sleeps for 1ms every time. Calling
469 // `cppExec.add()` is *almost* guaranteed to block because there's
470 // only 1 cpu worker thread.
471 for (int i = 0; i < 5; i++) {
472 EXPECT_NO_THROW(cpuExe.add(f));
473 }
474 cpuExe.join();
475
476 EXPECT_EQ(5, c);
477}
478
479TEST(PriorityThreadFactoryTest, ThreadPriority) {
480 errno = 0;
481 auto currentPriority = getpriority(PRIO_PROCESS, 0);
482 if (errno != 0) {
483 throwSystemError("failed to get current priority");
484 }
485
486 // Non-root users can only increase the priority value. Make sure we are
487 // trying to go to a higher priority than we are currently running as, up to
488 // the maximum allowed of 20.
489 int desiredPriority = std::min(20, currentPriority + 1);
490
491 PriorityThreadFactory factory(
492 std::make_shared<NamedThreadFactory>("stuff"), desiredPriority);
493 int actualPriority = -21;
494 factory.newThread([&]() { actualPriority = getpriority(PRIO_PROCESS, 0); })
495 .join();
496 EXPECT_EQ(desiredPriority, actualPriority);
497}
498
499TEST(InitThreadFactoryTest, InitializerCalled) {
500 int initializerCalledCount = 0;
501 InitThreadFactory factory(
502 std::make_shared<NamedThreadFactory>("test"),
503 [&initializerCalledCount] { initializerCalledCount++; });
504 factory
505 .newThread(
506 [&initializerCalledCount]() { EXPECT_EQ(initializerCalledCount, 1); })
507 .join();
508 EXPECT_EQ(initializerCalledCount, 1);
509}
510
511TEST(InitThreadFactoryTest, InitializerAndFinalizerCalled) {
512 bool initializerCalled = false;
513 bool taskBodyCalled = false;
514 bool finalizerCalled = false;
515
516 InitThreadFactory factory(
517 std::make_shared<NamedThreadFactory>("test"),
518 [&] {
519 // thread initializer
520 EXPECT_FALSE(initializerCalled);
521 EXPECT_FALSE(taskBodyCalled);
522 EXPECT_FALSE(finalizerCalled);
523 initializerCalled = true;
524 },
525 [&] {
526 // thread finalizer
527 EXPECT_TRUE(initializerCalled);
528 EXPECT_TRUE(taskBodyCalled);
529 EXPECT_FALSE(finalizerCalled);
530 finalizerCalled = true;
531 });
532
533 factory
534 .newThread([&]() {
535 EXPECT_TRUE(initializerCalled);
536 EXPECT_FALSE(taskBodyCalled);
537 EXPECT_FALSE(finalizerCalled);
538 taskBodyCalled = true;
539 })
540 .join();
541
542 EXPECT_TRUE(initializerCalled);
543 EXPECT_TRUE(taskBodyCalled);
544 EXPECT_TRUE(finalizerCalled);
545}
546
547class TestData : public folly::RequestData {
548 public:
549 explicit TestData(int data) : data_(data) {}
550 ~TestData() override {}
551
552 bool hasCallback() override {
553 return false;
554 }
555
556 int data_;
557};
558
559TEST(ThreadPoolExecutorTest, RequestContext) {
560 CPUThreadPoolExecutor executor(1);
561
562 RequestContextScopeGuard rctx; // create new request context for this scope
563 EXPECT_EQ(nullptr, RequestContext::get()->getContextData("test"));
564 RequestContext::get()->setContextData("test", std::make_unique<TestData>(42));
565 auto data = RequestContext::get()->getContextData("test");
566 EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);
567
568 executor.add([] {
569 auto data2 = RequestContext::get()->getContextData("test");
570 ASSERT_TRUE(data2 != nullptr);
571 EXPECT_EQ(42, dynamic_cast<TestData*>(data2)->data_);
572 });
573}
574
// Move-only value whose move operations copy the `slow` flag from the source
// and, when that flag is set, sleep for 50ms. Used to make one queue write
// take much longer than another.
struct SlowMover {
  explicit SlowMover(bool slow_ = false) : slow(slow_) {}

  // Move construction is implemented in terms of move assignment, so it incurs
  // the same delay.
  SlowMover(SlowMover&& other) noexcept {
    operator=(std::move(other));
  }

  SlowMover& operator=(SlowMover&& other) noexcept {
    slow = other.slow;
    if (!slow) {
      return *this;
    }
    /* sleep override */ std::this_thread::sleep_for(
        std::chrono::milliseconds(50));
    return *this;
  }

  // Whether moving this object is artificially delayed.
  bool slow;
};
590
591template <typename Q>
592void bugD3527722_test() {
593 // Test that the queue does not get stuck if writes are completed in
594 // order opposite to how they are initiated.
595 Q q(1024);
596 std::atomic<int> turn{};
597
598 std::thread consumer1([&] {
599 ++turn;
600 q.take();
601 });
602 std::thread consumer2([&] {
603 ++turn;
604 q.take();
605 });
606
607 std::thread producer1([&] {
608 ++turn;
609 while (turn < 4) {
610 ;
611 }
612 ++turn;
613 q.add(SlowMover(true));
614 });
615 std::thread producer2([&] {
616 ++turn;
617 while (turn < 5) {
618 ;
619 }
620 q.add(SlowMover(false));
621 });
622
623 producer1.join();
624 producer2.join();
625 consumer1.join();
626 consumer2.join();
627}
628
629TEST(ThreadPoolExecutorTest, LifoSemMPMCQueueBugD3527722) {
630 bugD3527722_test<LifoSemMPMCQueue<SlowMover>>();
631}
632
633template <typename T>
634struct UBQ : public UnboundedBlockingQueue<T> {
635 explicit UBQ(int) {}
636};
637
638TEST(ThreadPoolExecutorTest, UnboundedBlockingQueueBugD3527722) {
639 bugD3527722_test<UBQ<SlowMover>>();
640}
641
642template <typename TPE>
643static void removeThreadTest() {
644 // test that adding a .then() after we have removed some threads
645 // doesn't cause deadlock and they are executed on different threads
646 folly::Optional<folly::Future<int>> f;
647 std::thread::id id1, id2;
648 TPE fe(2);
649 f = folly::makeFuture()
650 .via(&fe)
651 .thenValue([&id1](auto&&) {
652 burnMs(100)();
653 id1 = std::this_thread::get_id();
654 })
655 .thenValue([&id2](auto&&) {
656 return 77;
657 id2 = std::this_thread::get_id();
658 });
659 fe.setNumThreads(1);
660
661 // future::then should be fulfilled because there is other thread available
662 EXPECT_EQ(77, std::move(*f).get());
663 // two thread should be different because then part should be rescheduled to
664 // the other thread
665 EXPECT_NE(id1, id2);
666}
667
668TEST(ThreadPoolExecutorTest, RemoveThreadTestIO) {
669 removeThreadTest<IOThreadPoolExecutor>();
670}
671
672TEST(ThreadPoolExecutorTest, RemoveThreadTestCPU) {
673 removeThreadTest<CPUThreadPoolExecutor>();
674}
675
676template <typename TPE>
677static void resizeThreadWhileExecutingTest() {
678 TPE tpe(10);
679 EXPECT_EQ(10, tpe.numThreads());
680
681 std::atomic<int> completed(0);
682 auto f = [&]() {
683 burnMs(10)();
684 completed++;
685 };
686 for (int i = 0; i < 1000; i++) {
687 tpe.add(f);
688 }
689 tpe.setNumThreads(8);
690 EXPECT_EQ(8, tpe.numThreads());
691 tpe.setNumThreads(5);
692 EXPECT_EQ(5, tpe.numThreads());
693 tpe.setNumThreads(15);
694 EXPECT_EQ(15, tpe.numThreads());
695 tpe.join();
696 EXPECT_EQ(1000, completed);
697}
698
699TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestIO) {
700 resizeThreadWhileExecutingTest<IOThreadPoolExecutor>();
701}
702
703TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestCPU) {
704 resizeThreadWhileExecutingTest<CPUThreadPoolExecutor>();
705}
706
707template <typename TPE>
708void keepAliveTest() {
709 auto executor = std::make_unique<TPE>(4);
710
711 auto f = futures::sleep(std::chrono::milliseconds{100})
712 .via(executor.get())
713 .thenValue([keepAlive = getKeepAliveToken(executor.get())](
714 auto&&) { return 42; })
715 .semi();
716
717 executor.reset();
718
719 EXPECT_TRUE(f.isReady());
720 EXPECT_EQ(42, std::move(f).get());
721}
722
723TEST(ThreadPoolExecutorTest, KeepAliveTestIO) {
724 keepAliveTest<IOThreadPoolExecutor>();
725}
726
727TEST(ThreadPoolExecutorTest, KeepAliveTestCPU) {
728 keepAliveTest<CPUThreadPoolExecutor>();
729}
730
731int getNumThreadPoolExecutors() {
732 int count = 0;
733 ThreadPoolExecutor::withAll([&count](ThreadPoolExecutor&) { count++; });
734 return count;
735}
736
737template <typename TPE>
738static void registersToExecutorListTest() {
739 EXPECT_EQ(0, getNumThreadPoolExecutors());
740 {
741 TPE tpe(10);
742 EXPECT_EQ(1, getNumThreadPoolExecutors());
743 {
744 TPE tpe2(5);
745 EXPECT_EQ(2, getNumThreadPoolExecutors());
746 }
747 EXPECT_EQ(1, getNumThreadPoolExecutors());
748 }
749 EXPECT_EQ(0, getNumThreadPoolExecutors());
750}
751
752TEST(ThreadPoolExecutorTest, registersToExecutorListTestIO) {
753 registersToExecutorListTest<IOThreadPoolExecutor>();
754}
755
756TEST(ThreadPoolExecutorTest, registersToExecutorListTestCPU) {
757 registersToExecutorListTest<CPUThreadPoolExecutor>();
758}
759
760template <typename TPE>
761static void testUsesNameFromNamedThreadFactory() {
762 auto ntf = std::make_shared<NamedThreadFactory>("my_executor");
763 TPE tpe(10, ntf);
764 EXPECT_EQ("my_executor", tpe.getName());
765}
766
767TEST(ThreadPoolExecutorTest, testUsesNameFromNamedThreadFactoryIO) {
768 testUsesNameFromNamedThreadFactory<IOThreadPoolExecutor>();
769}
770
771TEST(ThreadPoolExecutorTest, testUsesNameFromNamedThreadFactoryCPU) {
772 testUsesNameFromNamedThreadFactory<CPUThreadPoolExecutor>();
773}
774
775TEST(ThreadPoolExecutorTest, DynamicThreadsTest) {
776 boost::barrier barrier{3};
777 auto twice_waiting_task = [&] { barrier.wait(), barrier.wait(); };
778 CPUThreadPoolExecutor e(2);
779 e.setThreadDeathTimeout(std::chrono::milliseconds(100));
780 e.add(twice_waiting_task);
781 e.add(twice_waiting_task);
782 barrier.wait(); // ensure both tasks are mid-flight
783 EXPECT_EQ(2, e.getPoolStats().activeThreadCount) << "sanity check";
784
785 auto pred = [&] { return e.getPoolStats().activeThreadCount == 0; };
786 EXPECT_FALSE(pred()) << "sanity check";
787 barrier.wait(); // let both mid-flight tasks complete
788 EXPECT_EQ(
789 folly::detail::spin_result::success,
790 folly::detail::spin_yield_until(
791 std::chrono::steady_clock::now() + std::chrono::seconds(1), pred));
792}
793
794TEST(ThreadPoolExecutorTest, DynamicThreadAddRemoveRace) {
795 CPUThreadPoolExecutor e(1);
796 e.setThreadDeathTimeout(std::chrono::milliseconds(0));
797 std::atomic<uint64_t> count{0};
798 for (int i = 0; i < 10000; i++) {
799 Baton<> b;
800 e.add([&]() {
801 count.fetch_add(1, std::memory_order_relaxed);
802 b.post();
803 });
804 b.wait();
805 }
806 e.join();
807 EXPECT_EQ(count, 10000);
808}
809
810TEST(ThreadPoolExecutorTest, AddPerf) {
811 auto queue = std::make_unique<
812 UnboundedBlockingQueue<CPUThreadPoolExecutor::CPUTask>>();
813 CPUThreadPoolExecutor e(
814 1000,
815 std::move(queue),
816 std::make_shared<NamedThreadFactory>("CPUThreadPool"));
817 e.setThreadDeathTimeout(std::chrono::milliseconds(1));
818 for (int i = 0; i < 10000; i++) {
819 e.add([&]() { e.add([]() { /* sleep override */ usleep(1000); }); });
820 }
821 e.stop();
822}
823
824template <typename TPE>
825static void WeakRefTest() {
826 // test that adding a .then() after we have
827 // started shutting down does not deadlock
828 folly::Optional<folly::Future<folly::Unit>> f;
829 int counter{0};
830 {
831 TPE fe(1);
832 f = folly::makeFuture()
833 .via(&fe)
834 .thenValue([](auto&&) { burnMs(100)(); })
835 .thenValue([&](auto&&) { ++counter; })
836 .via(fe.weakRef())
837 .thenValue([](auto&&) { burnMs(100)(); })
838 .thenValue([&](auto&&) { ++counter; });
839 }
840 EXPECT_THROW(std::move(*f).get(), folly::BrokenPromise);
841 EXPECT_EQ(1, counter);
842}
843
844template <typename TPE>
845static void virtualExecutorTest() {
846 using namespace std::literals;
847
848 folly::Optional<folly::SemiFuture<folly::Unit>> f;
849 int counter{0};
850 {
851 TPE fe(1);
852 {
853 VirtualExecutor ve(fe);
854 f = futures::sleep(100ms)
855 .via(&ve)
856 .thenValue([&](auto&&) {
857 ++counter;
858 return futures::sleep(100ms);
859 })
860 .via(&fe)
861 .thenValue([&](auto&&) { ++counter; })
862 .semi();
863 }
864 EXPECT_EQ(1, counter);
865
866 bool functionDestroyed{false};
867 bool functionCalled{false};
868 {
869 VirtualExecutor ve(fe);
870 auto guard = makeGuard([&functionDestroyed] {
871 std::this_thread::sleep_for(100ms);
872 functionDestroyed = true;
873 });
874 ve.add([&functionCalled, guard = std::move(guard)] {
875 functionCalled = true;
876 });
877 }
878 EXPECT_TRUE(functionCalled);
879 EXPECT_TRUE(functionDestroyed);
880 }
881 EXPECT_TRUE(f->isReady());
882 EXPECT_NO_THROW(std::move(*f).get());
883 EXPECT_EQ(2, counter);
884}
885
886TEST(ThreadPoolExecutorTest, WeakRefTestIO) {
887 WeakRefTest<IOThreadPoolExecutor>();
888}
889
890TEST(ThreadPoolExecutorTest, WeakRefTestCPU) {
891 WeakRefTest<CPUThreadPoolExecutor>();
892}
893
894TEST(ThreadPoolExecutorTest, VirtualExecutorTestIO) {
895 virtualExecutorTest<IOThreadPoolExecutor>();
896}
897
898TEST(ThreadPoolExecutorTest, VirtualExecutorTestCPU) {
899 virtualExecutorTest<CPUThreadPoolExecutor>();
900}
901