1 | /* |
2 | * Copyright 2017-present Facebook, Inc. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #include <atomic> |
18 | #include <memory> |
19 | #include <thread> |
20 | |
21 | #include <boost/thread.hpp> |
22 | |
23 | #include <folly/Exception.h> |
24 | #include <folly/VirtualExecutor.h> |
25 | #include <folly/executors/CPUThreadPoolExecutor.h> |
26 | #include <folly/executors/FutureExecutor.h> |
27 | #include <folly/executors/IOThreadPoolExecutor.h> |
28 | #include <folly/executors/ThreadPoolExecutor.h> |
29 | #include <folly/executors/task_queue/LifoSemMPMCQueue.h> |
30 | #include <folly/executors/task_queue/UnboundedBlockingQueue.h> |
31 | #include <folly/executors/thread_factory/InitThreadFactory.h> |
32 | #include <folly/executors/thread_factory/PriorityThreadFactory.h> |
33 | #include <folly/portability/GTest.h> |
34 | #include <folly/synchronization/detail/Spin.h> |
35 | |
36 | using namespace folly; |
37 | using namespace std::chrono; |
38 | |
39 | static Func burnMs(uint64_t ms) { |
40 | return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); }; |
41 | } |
42 | |
// Smoke test: constructing a 10-thread pool and immediately destroying it
// must not hang, crash, or leak threads.
template <class TPE>
static void basic() {
  // Create and destroy
  TPE tpe(10);
}
48 | |
TEST(ThreadPoolExecutorTest, CPUBasic) {
  // Construct/destroy smoke test for the CPU pool.
  basic<CPUThreadPoolExecutor>();
}
52 | |
// Note: this one lives in the IOThreadPoolExecutorTest suite, unlike the
// other IO variants in this file.
TEST(IOThreadPoolExecutorTest, IOBasic) {
  basic<IOThreadPoolExecutor>();
}
56 | |
// setNumThreads() must be reflected by numThreads() both when shrinking
// and when growing past the original size.
template <class TPE>
static void resize() {
  TPE pool(100);
  EXPECT_EQ(100, pool.numThreads());
  for (int target : {50, 150}) {
    pool.setNumThreads(target);
    EXPECT_EQ(target, pool.numThreads());
  }
}
66 | |
TEST(ThreadPoolExecutorTest, CPUResize) {
  // Resize an idle CPU pool up and down.
  resize<CPUThreadPoolExecutor>();
}
70 | |
TEST(ThreadPoolExecutorTest, IOResize) {
  // Resize an idle IO pool up and down.
  resize<IOThreadPoolExecutor>();
}
74 | |
75 | template <class TPE> |
76 | static void stop() { |
77 | TPE tpe(1); |
78 | std::atomic<int> completed(0); |
79 | auto f = [&]() { |
80 | burnMs(10)(); |
81 | completed++; |
82 | }; |
83 | for (int i = 0; i < 1000; i++) { |
84 | tpe.add(f); |
85 | } |
86 | tpe.stop(); |
87 | EXPECT_GT(1000, completed); |
88 | } |
89 | |
90 | // IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong |
91 | // to the event base, will be executed upon its destruction, and cannot be |
92 | // taken back. |
93 | template <> |
94 | void stop<IOThreadPoolExecutor>() { |
95 | IOThreadPoolExecutor tpe(1); |
96 | std::atomic<int> completed(0); |
97 | auto f = [&]() { |
98 | burnMs(10)(); |
99 | completed++; |
100 | }; |
101 | for (int i = 0; i < 10; i++) { |
102 | tpe.add(f); |
103 | } |
104 | tpe.stop(); |
105 | EXPECT_EQ(10, completed); |
106 | } |
107 | |
TEST(ThreadPoolExecutorTest, CPUStop) {
  // stop() drops queued work on the CPU pool.
  stop<CPUThreadPoolExecutor>();
}
111 | |
TEST(ThreadPoolExecutorTest, IOStop) {
  // Uses the IO specialization: stop() behaves like join().
  stop<IOThreadPoolExecutor>();
}
115 | |
116 | template <class TPE> |
117 | static void join() { |
118 | TPE tpe(10); |
119 | std::atomic<int> completed(0); |
120 | auto f = [&]() { |
121 | burnMs(1)(); |
122 | completed++; |
123 | }; |
124 | for (int i = 0; i < 1000; i++) { |
125 | tpe.add(f); |
126 | } |
127 | tpe.join(); |
128 | EXPECT_EQ(1000, completed); |
129 | } |
130 | |
TEST(ThreadPoolExecutorTest, CPUJoin) {
  // join() runs all queued work on the CPU pool.
  join<CPUThreadPoolExecutor>();
}
134 | |
TEST(ThreadPoolExecutorTest, IOJoin) {
  // join() runs all queued work on the IO pool.
  join<IOThreadPoolExecutor>();
}
138 | |
139 | template <class TPE> |
140 | static void destroy() { |
141 | TPE tpe(1); |
142 | std::atomic<int> completed(0); |
143 | auto f = [&]() { |
144 | burnMs(10)(); |
145 | completed++; |
146 | }; |
147 | for (int i = 0; i < 1000; i++) { |
148 | tpe.add(f); |
149 | } |
150 | tpe.stop(); |
151 | EXPECT_GT(1000, completed); |
152 | } |
153 | |
// IOThreadPoolExecutor's destructor joins all tasks. Outstanding tasks belong
155 | // to the event base, will be executed upon its destruction, and cannot be |
156 | // taken back. |
157 | template <> |
158 | void destroy<IOThreadPoolExecutor>() { |
159 | Optional<IOThreadPoolExecutor> tpe(in_place, 1); |
160 | std::atomic<int> completed(0); |
161 | auto f = [&]() { |
162 | burnMs(10)(); |
163 | completed++; |
164 | }; |
165 | for (int i = 0; i < 10; i++) { |
166 | tpe->add(f); |
167 | } |
168 | tpe.clear(); |
169 | EXPECT_EQ(10, completed); |
170 | } |
171 | |
TEST(ThreadPoolExecutorTest, CPUDestroy) {
  // Teardown with queued work drops the remainder on the CPU pool.
  destroy<CPUThreadPoolExecutor>();
}
175 | |
TEST(ThreadPoolExecutorTest, IODestroy) {
  // Uses the IO specialization: destruction joins outstanding tasks.
  destroy<IOThreadPoolExecutor>();
}
179 | |
180 | template <class TPE> |
181 | static void resizeUnderLoad() { |
182 | TPE tpe(10); |
183 | std::atomic<int> completed(0); |
184 | auto f = [&]() { |
185 | burnMs(1)(); |
186 | completed++; |
187 | }; |
188 | for (int i = 0; i < 1000; i++) { |
189 | tpe.add(f); |
190 | } |
191 | tpe.setNumThreads(5); |
192 | tpe.setNumThreads(15); |
193 | tpe.join(); |
194 | EXPECT_EQ(1000, completed); |
195 | } |
196 | |
TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
  // Resizing mid-flight loses no work on the CPU pool.
  resizeUnderLoad<CPUThreadPoolExecutor>();
}
200 | |
TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
  // Resizing mid-flight loses no work on the IO pool.
  resizeUnderLoad<IOThreadPoolExecutor>();
}
204 | |
// Verifies getPoolStats()/getPendingTaskCount() bookkeeping on a 1-thread
// pool: idle counts while empty, then active/pending/total counts while one
// task blocks the only worker and a second task waits in the queue.
template <class TPE>
static void poolStats() {
  folly::Baton<> startBaton, endBaton;
  TPE tpe(1);
  auto stats = tpe.getPoolStats();
  // GE rather than EQ: at most one thread may exist at this point.
  EXPECT_GE(1, stats.threadCount);
  EXPECT_GE(1, stats.idleThreadCount);
  EXPECT_EQ(0, stats.activeThreadCount);
  EXPECT_EQ(0, stats.pendingTaskCount);
  EXPECT_EQ(0, tpe.getPendingTaskCount());
  EXPECT_EQ(0, stats.totalTaskCount);
  // First task parks the sole worker until endBaton is posted.
  tpe.add([&]() {
    startBaton.post();
    endBaton.wait();
  });
  // Second task can only sit in the queue meanwhile.
  tpe.add([&]() {});
  startBaton.wait(); // ensure the first task is actually running
  stats = tpe.getPoolStats();
  EXPECT_EQ(1, stats.threadCount);
  EXPECT_EQ(0, stats.idleThreadCount);
  EXPECT_EQ(1, stats.activeThreadCount);
  EXPECT_EQ(1, stats.pendingTaskCount);
  EXPECT_EQ(1, tpe.getPendingTaskCount());
  EXPECT_EQ(2, stats.totalTaskCount);
  endBaton.post(); // release the worker so the pool can shut down cleanly
}
231 | |
TEST(ThreadPoolExecutorTest, CPUPoolStats) {
  // Pool-level stats accounting on the CPU pool.
  poolStats<CPUThreadPoolExecutor>();
}
235 | |
TEST(ThreadPoolExecutorTest, IOPoolStats) {
  // Pool-level stats accounting on the IO pool.
  poolStats<IOThreadPoolExecutor>();
}
239 | |
// Per-task stats via subscribeToTaskStats(): both tasks report a nonzero
// run time; the second, queued behind the first on the single worker, also
// reports a nonzero wait time.
template <class TPE>
static void taskStats() {
  TPE tpe(1);
  std::atomic<int> c(0); // number of stats callbacks observed so far
  tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
    int i = c++;
    EXPECT_LT(milliseconds(0), stats.runTime);
    if (i == 1) {
      // The second task had to wait for the first to finish.
      EXPECT_LT(milliseconds(0), stats.waitTime);
    }
  });
  tpe.add(burnMs(10));
  tpe.add(burnMs(10));
  tpe.join();
  EXPECT_EQ(2, c); // the callback fired exactly once per task
}
256 | |
TEST(ThreadPoolExecutorTest, CPUTaskStats) {
  // Per-task run/wait time stats on the CPU pool.
  taskStats<CPUThreadPoolExecutor>();
}
260 | |
TEST(ThreadPoolExecutorTest, IOTaskStats) {
  // Per-task run/wait time stats on the IO pool.
  taskStats<IOThreadPoolExecutor>();
}
264 | |
// Task expiration: the first task (generous 60s budget) runs normally and
// reports expired == false; the second (10ms budget) times out while queued
// behind it on the single worker, reports expired == true, and fires the
// expire callback exactly once.
template <class TPE>
static void expiration() {
  TPE tpe(1);
  std::atomic<int> statCbCount(0);
  tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
    int i = statCbCount++;
    if (i == 0) {
      EXPECT_FALSE(stats.expired);
    } else if (i == 1) {
      EXPECT_TRUE(stats.expired);
    } else {
      FAIL(); // only two tasks were submitted
    }
  });
  std::atomic<int> expireCbCount(0);
  auto expireCb = [&]() { expireCbCount++; };
  tpe.add(burnMs(10), seconds(60), expireCb);
  tpe.add(burnMs(10), milliseconds(10), expireCb);
  tpe.join();
  EXPECT_EQ(2, statCbCount); // stats delivered for both tasks
  EXPECT_EQ(1, expireCbCount); // but only the second one expired
}
287 | |
TEST(ThreadPoolExecutorTest, CPUExpiration) {
  // Task expiration + expire callback on the CPU pool.
  expiration<CPUThreadPoolExecutor>();
}
291 | |
TEST(ThreadPoolExecutorTest, IOExpiration) {
  // Task expiration + expire callback on the IO pool.
  expiration<IOThreadPoolExecutor>();
}
295 | |
// Exercises FutureExecutor<TPE>::addFuture with functors returning a
// Future<int>, a plain value, a Future<Unit>, void, and a thrown exception,
// plus one genuinely asynchronous task; each continuation bumps `c`, which
// must reach 6 after join().
template <typename TPE>
static void futureExecutor() {
  FutureExecutor<TPE> fe(2);
  std::atomic<int> c{0};
  // Functor returning Future<int>.
  fe.addFuture([]() { return makeFuture<int>(42); }).then([&](Try<int>&& t) {
    c++;
    EXPECT_EQ(42, t.value());
  });
  // Functor returning a plain value.
  fe.addFuture([]() { return 100; }).then([&](Try<int>&& t) {
    c++;
    EXPECT_EQ(100, t.value());
  });
  // Functor returning Future<Unit>.
  fe.addFuture([]() { return makeFuture(); }).then([&](Try<Unit>&& t) {
    c++;
    EXPECT_NO_THROW(t.value());
  });
  // Functor returning void.
  fe.addFuture([]() { return; }).then([&](Try<Unit>&& t) {
    c++;
    EXPECT_NO_THROW(t.value());
  });
  // A thrown exception must surface through the Try.
  fe.addFuture([]() { throw std::runtime_error("oops"); })
      .then([&](Try<Unit>&& t) {
        c++;
        EXPECT_THROW(t.value(), std::runtime_error);
      });
  // Test doing actual async work
  folly::Baton<> baton;
  fe.addFuture([&]() {
    auto p = std::make_shared<Promise<int>>();
    std::thread t([p]() {
      burnMs(10)();
      p->setValue(42); // fulfilled from a detached helper thread
    });
    t.detach();
    return p->getFuture();
  })
      .then([&](Try<int>&& t) {
        EXPECT_EQ(42, t.value());
        c++;
        baton.post();
      });
  baton.wait(); // don't join until the async promise has been fulfilled
  fe.join();
  EXPECT_EQ(6, c);
}
341 | |
TEST(ThreadPoolExecutorTest, CPUFuturePool) {
  // FutureExecutor wrapping the CPU pool.
  futureExecutor<CPUThreadPoolExecutor>();
}
345 | |
TEST(ThreadPoolExecutorTest, IOFuturePool) {
  // FutureExecutor wrapping the IO pool.
  futureExecutor<IOThreadPoolExecutor>();
}
349 | |
// All 100 tasks are queued while the pool has 0 threads; once a single
// thread is added, every HI_PRI task must run before any LO_PRI task
// (the hipri lambda asserts no lopri task has run yet).
TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
  bool tookLopri = false;
  auto completed = 0;
  auto hipri = [&] {
    EXPECT_FALSE(tookLopri);
    completed++;
  };
  auto lopri = [&] {
    tookLopri = true;
    completed++;
  };
  CPUThreadPoolExecutor pool(0, 2); // 0 threads, 2 priority levels
  {
    VirtualExecutor ve(pool);
    for (int i = 0; i < 50; i++) {
      ve.addWithPriority(lopri, Executor::LO_PRI);
    }
    for (int i = 0; i < 50; i++) {
      ve.addWithPriority(hipri, Executor::HI_PRI);
    }
    pool.setNumThreads(1); // now the single worker starts draining
  }
  // VirtualExecutor destruction waits for its tasks, so all 100 ran.
  EXPECT_EQ(100, completed);
}
374 | |
375 | class TestObserver : public ThreadPoolExecutor::Observer { |
376 | public: |
377 | void threadStarted(ThreadPoolExecutor::ThreadHandle*) override { |
378 | threads_++; |
379 | } |
380 | void threadStopped(ThreadPoolExecutor::ThreadHandle*) override { |
381 | threads_--; |
382 | } |
383 | void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle*) override { |
384 | threads_++; |
385 | } |
386 | void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle*) override { |
387 | threads_--; |
388 | } |
389 | void checkCalls() { |
390 | ASSERT_EQ(threads_, 0); |
391 | } |
392 | |
393 | private: |
394 | std::atomic<int> threads_{0}; |
395 | }; |
396 | |
// Attach an observer to an IO pool, resize up and down (including to 0),
// detach it, resize again: the observer's net thread count must be zero —
// the resize after removeObserver must not be reported to it.
TEST(ThreadPoolExecutorTest, IOObserver) {
  auto observer = std::make_shared<TestObserver>();

  {
    IOThreadPoolExecutor exe(10);
    exe.addObserver(observer);
    exe.setNumThreads(3);
    exe.setNumThreads(0);
    exe.setNumThreads(7);
    exe.removeObserver(observer);
    exe.setNumThreads(10); // not seen by the observer
  }

  observer->checkCalls();
}
412 | |
// Same observer balance check as IOObserver, on the CPU pool.
TEST(ThreadPoolExecutorTest, CPUObserver) {
  auto observer = std::make_shared<TestObserver>();

  {
    CPUThreadPoolExecutor exe(10);
    exe.addObserver(observer);
    exe.setNumThreads(3);
    exe.setNumThreads(0);
    exe.setNumThreads(7);
    exe.removeObserver(observer);
    exe.setNumThreads(10); // not seen by the observer
  }

  observer->checkCalls();
}
428 | |
// addWithPriority(): the IO pool rejects priorities outright; the CPU pool
// (configured with 3 priority levels) accepts in-range, out-of-range
// (clamped per the inline comments), and the named LO/HI priorities, and
// runs every task exactly once.
TEST(ThreadPoolExecutorTest, AddWithPriority) {
  std::atomic_int c{0};
  auto f = [&] { c++; };

  // IO exe doesn't support priorities
  IOThreadPoolExecutor ioExe(10);
  EXPECT_THROW(ioExe.addWithPriority(f, 0), std::runtime_error);

  CPUThreadPoolExecutor cpuExe(10, 3);
  cpuExe.addWithPriority(f, -1);
  cpuExe.addWithPriority(f, 0);
  cpuExe.addWithPriority(f, 1);
  cpuExe.addWithPriority(f, -2); // will add at the lowest priority
  cpuExe.addWithPriority(f, 2); // will add at the highest priority
  cpuExe.addWithPriority(f, Executor::LO_PRI);
  cpuExe.addWithPriority(f, Executor::HI_PRI);
  cpuExe.join();

  EXPECT_EQ(7, c); // all seven tasks ran regardless of priority
}
449 | |
// With a capacity-1 BLOCK-on-full queue and one worker, add() must block
// (rather than throw) until space frees up, and all tasks still run.
TEST(ThreadPoolExecutorTest, BlockingQueue) {
  std::atomic_int c{0};
  auto f = [&] {
    burnMs(1)();
    c++;
  };
  const int kQueueCapacity = 1;
  const int kThreads = 1;

  auto queue = std::make_unique<LifoSemMPMCQueue<
      CPUThreadPoolExecutor::CPUTask,
      QueueBehaviorIfFull::BLOCK>>(kQueueCapacity);

  CPUThreadPoolExecutor cpuExe(
      kThreads,
      std::move(queue),
      std::make_shared<NamedThreadFactory>("CPUThreadPool"));

  // Add `f` five times. It sleeps for 1ms every time. Calling
  // `cpuExe.add()` is *almost* guaranteed to block because there's
  // only 1 cpu worker thread.
  for (int i = 0; i < 5; i++) {
    EXPECT_NO_THROW(cpuExe.add(f));
  }
  cpuExe.join();

  EXPECT_EQ(5, c);
}
478 | |
// PriorityThreadFactory must apply the requested nice value to the threads
// it creates: ask for one level above the current priority and read it back
// with getpriority() from inside the spawned thread.
TEST(PriorityThreadFactoryTest, ThreadPriority) {
  errno = 0;
  auto currentPriority = getpriority(PRIO_PROCESS, 0);
  if (errno != 0) {
    // getpriority can legitimately return -1; errno disambiguates.
    throwSystemError("failed to get current priority");
  }

  // Non-root users can only increase the priority value. Make sure we are
  // trying to go to a higher priority than we are currently running as, up to
  // the maximum allowed of 20.
  int desiredPriority = std::min(20, currentPriority + 1);

  PriorityThreadFactory factory(
      std::make_shared<NamedThreadFactory>("stuff"), desiredPriority);
  int actualPriority = -21; // sentinel outside the valid nice range
  factory.newThread([&]() { actualPriority = getpriority(PRIO_PROCESS, 0); })
      .join();
  EXPECT_EQ(desiredPriority, actualPriority);
}
498 | |
// InitThreadFactory must run the initializer exactly once, before the
// thread's task body executes.
TEST(InitThreadFactoryTest, InitializerCalled) {
  int initializerCalledCount = 0;
  InitThreadFactory factory(
      std::make_shared<NamedThreadFactory>("test"),
      [&initializerCalledCount] { initializerCalledCount++; });
  factory
      .newThread(
          // Observed from inside the thread: initializer already ran.
          [&initializerCalledCount]() { EXPECT_EQ(initializerCalledCount, 1); })
      .join();
  EXPECT_EQ(initializerCalledCount, 1);
}
510 | |
// InitThreadFactory with both callbacks: strict ordering must be
// initializer -> task body -> finalizer, each running exactly once.
TEST(InitThreadFactoryTest, InitializerAndFinalizerCalled) {
  bool initializerCalled = false;
  bool taskBodyCalled = false;
  bool finalizerCalled = false;

  InitThreadFactory factory(
      std::make_shared<NamedThreadFactory>("test"),
      [&] {
        // thread initializer: runs first, before the task body
        EXPECT_FALSE(initializerCalled);
        EXPECT_FALSE(taskBodyCalled);
        EXPECT_FALSE(finalizerCalled);
        initializerCalled = true;
      },
      [&] {
        // thread finalizer: runs last, after the task body
        EXPECT_TRUE(initializerCalled);
        EXPECT_TRUE(taskBodyCalled);
        EXPECT_FALSE(finalizerCalled);
        finalizerCalled = true;
      });

  factory
      .newThread([&]() {
        // task body: runs between initializer and finalizer
        EXPECT_TRUE(initializerCalled);
        EXPECT_FALSE(taskBodyCalled);
        EXPECT_FALSE(finalizerCalled);
        taskBodyCalled = true;
      })
      .join();

  EXPECT_TRUE(initializerCalled);
  EXPECT_TRUE(taskBodyCalled);
  EXPECT_TRUE(finalizerCalled);
}
546 | |
547 | class TestData : public folly::RequestData { |
548 | public: |
549 | explicit TestData(int data) : data_(data) {} |
550 | ~TestData() override {} |
551 | |
552 | bool hasCallback() override { |
553 | return false; |
554 | } |
555 | |
556 | int data_; |
557 | }; |
558 | |
// Context data set on the current RequestContext must be visible inside
// tasks run by the pool.
TEST(ThreadPoolExecutorTest, RequestContext) {
  CPUThreadPoolExecutor executor(1);

  RequestContextScopeGuard rctx; // create new request context for this scope
  EXPECT_EQ(nullptr, RequestContext::get()->getContextData("test"));
  RequestContext::get()->setContextData("test", std::make_unique<TestData>(42));
  auto data = RequestContext::get()->getContextData("test");
  EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);

  // NOTE(review): the task may still be pending when this scope unwinds;
  // presumably the context is captured/refcounted at add() time so the
  // lookup below stays valid — confirm against folly::RequestContext docs.
  executor.add([] {
    auto data2 = RequestContext::get()->getContextData("test");
    ASSERT_TRUE(data2 != nullptr);
    EXPECT_EQ(42, dynamic_cast<TestData*>(data2)->data_);
  });
}
574 | |
// Move-assignable payload whose move optionally sleeps 50ms; used below to
// force queue writes to complete in the opposite order they were started.
struct SlowMover {
  explicit SlowMover(bool slow_ = false) : slow(slow_) {}
  SlowMover(SlowMover&& other) noexcept {
    *this = std::move(other); // delegate to move-assign (and its sleep)
  }
  SlowMover& operator=(SlowMover&& other) noexcept {
    slow = other.slow;
    if (!slow) {
      return *this;
    }
    /* sleep override */
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    return *this;
  }

  bool slow;
};
590 | |
// Regression test for a queue deadlock: two writes are *started* in one
// order (slow first) but *complete* in the other (fast first, because the
// slow element sleeps 50ms in its move). Both blocked consumers must still
// be woken; if the queue gets stuck, a join() below hangs.
template <typename Q>
void bugD3527722_test() {
  // Test that the queue does not get stuck if writes are completed in
  // order opposite to how they are initiated.
  Q q(1024);
  std::atomic<int> turn{}; // crude rendezvous counter between the 4 threads

  // Both consumers block in take() before anything is produced.
  std::thread consumer1([&] {
    ++turn;
    q.take();
  });
  std::thread consumer2([&] {
    ++turn;
    q.take();
  });

  // producer1 waits for the other three threads to announce themselves,
  // then begins the slow write first.
  std::thread producer1([&] {
    ++turn;
    while (turn < 4) {
      ;
    }
    ++turn;
    q.add(SlowMover(true)); // slow: its move sleeps 50ms
  });
  // producer2 waits until producer1 has started its add, then performs a
  // fast write that finishes before the slow one.
  std::thread producer2([&] {
    ++turn;
    while (turn < 5) {
      ;
    }
    q.add(SlowMover(false)); // fast: completes while the slow add is mid-move
  });

  producer1.join();
  producer2.join();
  consumer1.join();
  consumer2.join();
}
628 | |
TEST(ThreadPoolExecutorTest, LifoSemMPMCQueueBugD3527722) {
  // Out-of-order write completion must not wedge LifoSemMPMCQueue.
  bugD3527722_test<LifoSemMPMCQueue<SlowMover>>();
}
632 | |
// Adapter giving UnboundedBlockingQueue the (capacity) constructor shape
// that bugD3527722_test expects; the capacity argument is ignored.
template <typename T>
struct UBQ : public UnboundedBlockingQueue<T> {
  explicit UBQ(int) {}
};
637 | |
TEST(ThreadPoolExecutorTest, UnboundedBlockingQueueBugD3527722) {
  // Same out-of-order completion scenario against UnboundedBlockingQueue.
  bugD3527722_test<UBQ<SlowMover>>();
}
641 | |
642 | template <typename TPE> |
643 | static void removeThreadTest() { |
644 | // test that adding a .then() after we have removed some threads |
645 | // doesn't cause deadlock and they are executed on different threads |
646 | folly::Optional<folly::Future<int>> f; |
647 | std::thread::id id1, id2; |
648 | TPE fe(2); |
649 | f = folly::makeFuture() |
650 | .via(&fe) |
651 | .thenValue([&id1](auto&&) { |
652 | burnMs(100)(); |
653 | id1 = std::this_thread::get_id(); |
654 | }) |
655 | .thenValue([&id2](auto&&) { |
656 | return 77; |
657 | id2 = std::this_thread::get_id(); |
658 | }); |
659 | fe.setNumThreads(1); |
660 | |
661 | // future::then should be fulfilled because there is other thread available |
662 | EXPECT_EQ(77, std::move(*f).get()); |
663 | // two thread should be different because then part should be rescheduled to |
664 | // the other thread |
665 | EXPECT_NE(id1, id2); |
666 | } |
667 | |
TEST(ThreadPoolExecutorTest, RemoveThreadTestIO) {
  // Shrinking the IO pool mid-chain must not deadlock.
  removeThreadTest<IOThreadPoolExecutor>();
}
671 | |
TEST(ThreadPoolExecutorTest, RemoveThreadTestCPU) {
  // Shrinking the CPU pool mid-chain must not deadlock.
  removeThreadTest<CPUThreadPoolExecutor>();
}
675 | |
676 | template <typename TPE> |
677 | static void resizeThreadWhileExecutingTest() { |
678 | TPE tpe(10); |
679 | EXPECT_EQ(10, tpe.numThreads()); |
680 | |
681 | std::atomic<int> completed(0); |
682 | auto f = [&]() { |
683 | burnMs(10)(); |
684 | completed++; |
685 | }; |
686 | for (int i = 0; i < 1000; i++) { |
687 | tpe.add(f); |
688 | } |
689 | tpe.setNumThreads(8); |
690 | EXPECT_EQ(8, tpe.numThreads()); |
691 | tpe.setNumThreads(5); |
692 | EXPECT_EQ(5, tpe.numThreads()); |
693 | tpe.setNumThreads(15); |
694 | EXPECT_EQ(15, tpe.numThreads()); |
695 | tpe.join(); |
696 | EXPECT_EQ(1000, completed); |
697 | } |
698 | |
TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestIO) {
  // Resize-while-busy on the IO pool.
  resizeThreadWhileExecutingTest<IOThreadPoolExecutor>();
}
702 | |
TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestCPU) {
  // Resize-while-busy on the CPU pool.
  resizeThreadWhileExecutingTest<CPUThreadPoolExecutor>();
}
706 | |
// Destroy the executor while a pending continuation holds a keep-alive
// token: by the time executor.reset() returns, the continuation has run,
// so the future is already ready with its value.
template <typename TPE>
void keepAliveTest() {
  auto executor = std::make_unique<TPE>(4);

  auto f = futures::sleep(std::chrono::milliseconds{100})
               .via(executor.get())
               .thenValue([keepAlive = getKeepAliveToken(executor.get())](
                              auto&&) { return 42; })
               .semi();

  // Presumably blocks until outstanding keep-alive tokens are released —
  // the isReady() check below depends on that.
  executor.reset();

  EXPECT_TRUE(f.isReady());
  EXPECT_EQ(42, std::move(f).get());
}
722 | |
TEST(ThreadPoolExecutorTest, KeepAliveTestIO) {
  // Keep-alive token semantics on the IO pool.
  keepAliveTest<IOThreadPoolExecutor>();
}
726 | |
TEST(ThreadPoolExecutorTest, KeepAliveTestCPU) {
  // Keep-alive token semantics on the CPU pool.
  keepAliveTest<CPUThreadPoolExecutor>();
}
730 | |
731 | int getNumThreadPoolExecutors() { |
732 | int count = 0; |
733 | ThreadPoolExecutor::withAll([&count](ThreadPoolExecutor&) { count++; }); |
734 | return count; |
735 | } |
736 | |
// Every live pool must appear in the global executor list exactly once and
// be removed again on destruction; verified as nested scopes open/close.
template <typename TPE>
static void registersToExecutorListTest() {
  EXPECT_EQ(0, getNumThreadPoolExecutors());
  {
    TPE tpe(10);
    EXPECT_EQ(1, getNumThreadPoolExecutors());
    {
      TPE tpe2(5);
      EXPECT_EQ(2, getNumThreadPoolExecutors());
    }
    // tpe2 destroyed: back to one registered executor
    EXPECT_EQ(1, getNumThreadPoolExecutors());
  }
  EXPECT_EQ(0, getNumThreadPoolExecutors());
}
751 | |
TEST(ThreadPoolExecutorTest, registersToExecutorListTestIO) {
  // Global-registry membership for the IO pool.
  registersToExecutorListTest<IOThreadPoolExecutor>();
}
755 | |
TEST(ThreadPoolExecutorTest, registersToExecutorListTestCPU) {
  // Global-registry membership for the CPU pool.
  registersToExecutorListTest<CPUThreadPoolExecutor>();
}
759 | |
// A pool built with a NamedThreadFactory must report that factory's name
// prefix via getName().
template <typename TPE>
static void testUsesNameFromNamedThreadFactory() {
  auto ntf = std::make_shared<NamedThreadFactory>("my_executor");
  TPE tpe(10, ntf);
  EXPECT_EQ("my_executor", tpe.getName());
}
766 | |
TEST(ThreadPoolExecutorTest, testUsesNameFromNamedThreadFactoryIO) {
  // getName() comes from the thread factory on the IO pool.
  testUsesNameFromNamedThreadFactory<IOThreadPoolExecutor>();
}
770 | |
TEST(ThreadPoolExecutorTest, testUsesNameFromNamedThreadFactoryCPU) {
  // getName() comes from the thread factory on the CPU pool.
  testUsesNameFromNamedThreadFactory<CPUThreadPoolExecutor>();
}
774 | |
// Dynamic thread reaping: after both workers finish their tasks and the
// 100ms death timeout elapses, activeThreadCount must drop to 0 within 1s.
TEST(ThreadPoolExecutorTest, DynamicThreadsTest) {
  boost::barrier barrier{3}; // two worker tasks + this test thread
  auto twice_waiting_task = [&] { barrier.wait(), barrier.wait(); };
  CPUThreadPoolExecutor e(2);
  e.setThreadDeathTimeout(std::chrono::milliseconds(100));
  e.add(twice_waiting_task);
  e.add(twice_waiting_task);
  barrier.wait(); // ensure both tasks are mid-flight
  EXPECT_EQ(2, e.getPoolStats().activeThreadCount) << "sanity check";

  auto pred = [&] { return e.getPoolStats().activeThreadCount == 0; };
  EXPECT_FALSE(pred()) << "sanity check";
  barrier.wait(); // let both mid-flight tasks complete
  // Spin (yielding) for up to 1s waiting for the idle workers to be reaped.
  EXPECT_EQ(
      folly::detail::spin_result::success,
      folly::detail::spin_yield_until(
          std::chrono::steady_clock::now() + std::chrono::seconds(1), pred));
}
793 | |
// With a zero idle timeout a worker can retire immediately after each
// task, so every add() may race with a thread being torn down; all 10000
// tasks must still run exactly once.
TEST(ThreadPoolExecutorTest, DynamicThreadAddRemoveRace) {
  CPUThreadPoolExecutor e(1);
  e.setThreadDeathTimeout(std::chrono::milliseconds(0));
  std::atomic<uint64_t> count{0};
  for (int i = 0; i < 10000; i++) {
    Baton<> b;
    e.add([&]() {
      count.fetch_add(1, std::memory_order_relaxed);
      b.post();
    });
    b.wait(); // serialize: each task finishes before the next is added
  }
  e.join();
  EXPECT_EQ(count, 10000);
}
809 | |
// Stress/smoke test (despite the name, no timing is measured): flood a
// large dynamic pool with tasks that each enqueue more work, with a 1ms
// thread death timeout, then stop(). Must not crash or deadlock.
TEST(ThreadPoolExecutorTest, AddPerf) {
  auto queue = std::make_unique<
      UnboundedBlockingQueue<CPUThreadPoolExecutor::CPUTask>>();
  CPUThreadPoolExecutor e(
      1000,
      std::move(queue),
      std::make_shared<NamedThreadFactory>("CPUThreadPool"));
  e.setThreadDeathTimeout(std::chrono::milliseconds(1));
  for (int i = 0; i < 10000; i++) {
    // Each task re-enters the pool by scheduling a sleeping task.
    e.add([&]() { e.add([]() { /* sleep override */ usleep(1000); }); });
  }
  e.stop();
}
823 | |
// Continuations attached via weakRef() must not deadlock shutdown: the two
// continuations scheduled on the strong reference run (counter reaches 1),
// while the weak-ref continuations are dropped when the pool is destroyed,
// breaking the promise chain with BrokenPromise.
template <typename TPE>
static void WeakRefTest() {
  // test that adding a .then() after we have
  // started shutting down does not deadlock
  folly::Optional<folly::Future<folly::Unit>> f;
  int counter{0};
  {
    TPE fe(1);
    f = folly::makeFuture()
            .via(&fe)
            .thenValue([](auto&&) { burnMs(100)(); })
            .thenValue([&](auto&&) { ++counter; }) // runs: strong reference
            .via(fe.weakRef())
            .thenValue([](auto&&) { burnMs(100)(); })
            .thenValue([&](auto&&) { ++counter; }); // dropped with the pool
  } // fe destroyed here, possibly with the weak-ref work still pending
  EXPECT_THROW(std::move(*f).get(), folly::BrokenPromise);
  EXPECT_EQ(1, counter);
}
843 | |
// VirtualExecutor semantics: (1) destroying a VirtualExecutor waits for the
// work scheduled *through it* but not for continuations rerouted back to
// the underlying pool; (2) destroying it also waits for enqueued callbacks
// to be destroyed, not just invoked.
template <typename TPE>
static void virtualExecutorTest() {
  using namespace std::literals;

  folly::Optional<folly::SemiFuture<folly::Unit>> f;
  int counter{0};
  {
    TPE fe(1);
    {
      VirtualExecutor ve(fe);
      f = futures::sleep(100ms)
              .via(&ve)
              .thenValue([&](auto&&) {
                ++counter;
                return futures::sleep(100ms);
              })
              .via(&fe) // second continuation runs on fe, not ve
              .thenValue([&](auto&&) { ++counter; })
              .semi();
    }
    // The ve-scheduled continuation has run; the fe one may not have yet.
    EXPECT_EQ(1, counter);

    bool functionDestroyed{false};
    bool functionCalled{false};
    {
      VirtualExecutor ve(fe);
      // The guard's destructor is deliberately slow so that destruction of
      // the enqueued callback is still in progress when ve's scope ends.
      auto guard = makeGuard([&functionDestroyed] {
        std::this_thread::sleep_for(100ms);
        functionDestroyed = true;
      });
      ve.add([&functionCalled, guard = std::move(guard)] {
        functionCalled = true;
      });
    }
    // ve's destructor waited for the callback to run AND be destroyed.
    EXPECT_TRUE(functionCalled);
    EXPECT_TRUE(functionDestroyed);
  }
  // fe's destructor joined, so the whole chain has now completed.
  EXPECT_TRUE(f->isReady());
  EXPECT_NO_THROW(std::move(*f).get());
  EXPECT_EQ(2, counter);
}
885 | |
TEST(ThreadPoolExecutorTest, WeakRefTestIO) {
  // weakRef() shutdown semantics on the IO pool.
  WeakRefTest<IOThreadPoolExecutor>();
}
889 | |
TEST(ThreadPoolExecutorTest, WeakRefTestCPU) {
  // weakRef() shutdown semantics on the CPU pool.
  WeakRefTest<CPUThreadPoolExecutor>();
}
893 | |
TEST(ThreadPoolExecutorTest, VirtualExecutorTestIO) {
  // VirtualExecutor lifetime semantics over the IO pool.
  virtualExecutorTest<IOThreadPoolExecutor>();
}
897 | |
TEST(ThreadPoolExecutorTest, VirtualExecutorTestCPU) {
  // VirtualExecutor lifetime semantics over the CPU pool.
  virtualExecutorTest<CPUThreadPoolExecutor>();
}
901 | |