1#include <atomic>
2
3#include <Common/Exception.h>
4#include <Common/ThreadPool.h>
5
6#include <gtest/gtest.h>
7
8
/// Test what happens if a local ThreadPool cannot create a ThreadFromGlobalPool.
/// There was a bug: if the local ThreadPool could not allocate even a single thread,
/// the job would be scheduled but never executed.
12
13
14TEST(ThreadPool, GlobalFull1)
15{
16 GlobalThreadPool & global_pool = GlobalThreadPool::instance();
17
18 static constexpr size_t capacity = 5;
19
20 global_pool.setMaxThreads(capacity);
21 global_pool.setMaxFreeThreads(1);
22 global_pool.setQueueSize(capacity);
23 global_pool.wait();
24
25 std::atomic<size_t> counter = 0;
26 static constexpr size_t num_jobs = capacity + 1;
27
28 auto func = [&] { ++counter; while (counter != num_jobs) {} };
29
30 ThreadPool pool(num_jobs);
31
32 for (size_t i = 0; i < capacity; ++i)
33 pool.scheduleOrThrowOnError(func);
34
35 for (size_t i = capacity; i < num_jobs; ++i)
36 {
37 EXPECT_THROW(pool.scheduleOrThrowOnError(func), DB::Exception);
38 ++counter;
39 }
40
41 pool.wait();
42 EXPECT_EQ(counter, num_jobs);
43
44 global_pool.setMaxThreads(10000);
45 global_pool.setMaxFreeThreads(1000);
46 global_pool.setQueueSize(10000);
47}
48
49
50TEST(ThreadPool, GlobalFull2)
51{
52 GlobalThreadPool & global_pool = GlobalThreadPool::instance();
53
54 static constexpr size_t capacity = 5;
55
56 global_pool.setMaxThreads(capacity);
57 global_pool.setMaxFreeThreads(1);
58 global_pool.setQueueSize(capacity);
59
60 /// ThreadFromGlobalPool from local thread pools from previous test case have exited
61 /// but their threads from global_pool may not have finished (they still have to exit).
62 /// If we will not wait here, we can get "Cannot schedule a task exception" earlier than we expect in this test.
63 global_pool.wait();
64
65 std::atomic<size_t> counter = 0;
66 auto func = [&] { ++counter; while (counter != capacity + 1) {} };
67
68 ThreadPool pool(capacity, 0, capacity);
69 for (size_t i = 0; i < capacity; ++i)
70 pool.scheduleOrThrowOnError(func);
71
72 ThreadPool another_pool(1);
73 EXPECT_THROW(another_pool.scheduleOrThrowOnError(func), DB::Exception);
74
75 ++counter;
76
77 pool.wait();
78
79 global_pool.wait();
80
81 for (size_t i = 0; i < capacity; ++i)
82 another_pool.scheduleOrThrowOnError([&] { ++counter; });
83
84 another_pool.wait();
85 EXPECT_EQ(counter, capacity * 2 + 1);
86
87 global_pool.setMaxThreads(10000);
88 global_pool.setMaxFreeThreads(1000);
89 global_pool.setQueueSize(10000);
90}
91