1 | #pragma once |
2 | |
3 | #include <thread> |
4 | #include <set> |
5 | #include <map> |
6 | #include <list> |
7 | #include <condition_variable> |
8 | #include <mutex> |
9 | #include <shared_mutex> |
10 | #include <atomic> |
11 | #include <functional> |
12 | #include <Poco/Event.h> |
13 | #include <Poco/Timestamp.h> |
14 | #include <Core/Types.h> |
15 | #include <Common/CurrentMetrics.h> |
16 | #include <Common/CurrentThread.h> |
17 | #include <Common/ThreadPool.h> |
18 | #include <Poco/Util/AbstractConfiguration.h> |
19 | |
20 | |
21 | namespace CurrentMetrics |
22 | { |
23 | extern const Metric BackgroundPoolTask; |
24 | extern const Metric MemoryTrackingInBackgroundProcessingPool; |
25 | } |
26 | |
27 | namespace DB |
28 | { |
29 | |
/// Forward declarations: the pool's type aliases refer to the task info class
/// before it is defined, and the task info class keeps a reference to the pool.
class BackgroundProcessingPoolTaskInfo;
class BackgroundProcessingPool;
32 | |
/// Outcome a task reports back to the pool after each run.
/// NOTE(review): how the pool reacts to each value (sleeping, backoff, logging)
/// is decided in the implementation file, not in this header.
enum class BackgroundProcessingPoolTaskResult
{
    /// The task ran and reported success.
    SUCCESS = 0,
    /// The task ran and reported a failure.
    ERROR = 1,
    /// The task found no work to perform on this run.
    NOTHING_TO_DO = 2,
};
39 | |
40 | |
/** Using a fixed number of threads, perform an arbitrary number of tasks in an infinite loop.
  * In this case, one task can run simultaneously from different threads.
  * Designed for tasks that perform continuous background work (for example, merge).
  * `Task` is a function that returns a `TaskResult` telling whether it did any work.
  * If it did not, its next run is scheduled for later.
  */
47 | class BackgroundProcessingPool |
48 | { |
49 | public: |
50 | /// Returns true, if some useful work was done. In that case, thread will not sleep before next run of this task. |
51 | using TaskResult = BackgroundProcessingPoolTaskResult; |
52 | using Task = std::function<TaskResult()>; |
53 | using TaskInfo = BackgroundProcessingPoolTaskInfo; |
54 | using TaskHandle = std::shared_ptr<TaskInfo>; |
55 | |
56 | |
57 | struct PoolSettings |
58 | { |
59 | double thread_sleep_seconds = 10; |
60 | double thread_sleep_seconds_random_part = 1.0; |
61 | double thread_sleep_seconds_if_nothing_to_do = 0.1; |
62 | |
63 | /// For exponential backoff. |
64 | double task_sleep_seconds_when_no_work_min = 10; |
65 | double task_sleep_seconds_when_no_work_max = 600; |
66 | double task_sleep_seconds_when_no_work_multiplier = 1.1; |
67 | double task_sleep_seconds_when_no_work_random_part = 1.0; |
68 | |
69 | CurrentMetrics::Metric tasks_metric = CurrentMetrics::BackgroundPoolTask; |
70 | CurrentMetrics::Metric memory_metric = CurrentMetrics::MemoryTrackingInBackgroundProcessingPool; |
71 | |
72 | PoolSettings() noexcept {} |
73 | }; |
74 | |
75 | BackgroundProcessingPool(int size_, |
76 | const PoolSettings & pool_settings = {}, |
77 | const char * log_name = "BackgroundProcessingPool" , |
78 | const char * thread_name_ = "BackgrProcPool" ); |
79 | |
80 | size_t getNumberOfThreads() const |
81 | { |
82 | return size; |
83 | } |
84 | |
85 | /// The task is started immediately. |
86 | TaskHandle addTask(const Task & task); |
87 | |
88 | void removeTask(const TaskHandle & task); |
89 | |
90 | ~BackgroundProcessingPool(); |
91 | |
92 | protected: |
93 | friend class BackgroundProcessingPoolTaskInfo; |
94 | |
95 | using Tasks = std::multimap<Poco::Timestamp, TaskHandle>; /// key is desired next time to execute (priority). |
96 | using Threads = std::vector<ThreadFromGlobalPool>; |
97 | |
98 | const size_t size; |
99 | const char * thread_name; |
100 | Poco::Logger * logger; |
101 | |
102 | Tasks tasks; /// Ordered in priority. |
103 | std::mutex tasks_mutex; |
104 | |
105 | Threads threads; |
106 | |
107 | std::atomic<bool> shutdown {false}; |
108 | std::condition_variable wake_event; |
109 | |
110 | /// Thread group used for profiling purposes |
111 | ThreadGroupStatusPtr thread_group; |
112 | |
113 | void threadFunction(); |
114 | |
115 | private: |
116 | PoolSettings settings; |
117 | }; |
118 | |
119 | |
120 | class BackgroundProcessingPoolTaskInfo |
121 | { |
122 | public: |
123 | /// Wake up any thread. |
124 | void wake(); |
125 | |
126 | BackgroundProcessingPoolTaskInfo(BackgroundProcessingPool & pool_, const BackgroundProcessingPool::Task & function_) |
127 | : pool(pool_), function(function_) {} |
128 | |
129 | protected: |
130 | friend class BackgroundProcessingPool; |
131 | |
132 | BackgroundProcessingPool & pool; |
133 | BackgroundProcessingPool::Task function; |
134 | |
135 | /// Read lock is hold when task is executed. |
136 | std::shared_mutex rwlock; |
137 | std::atomic<bool> removed {false}; |
138 | |
139 | std::multimap<Poco::Timestamp, std::shared_ptr<BackgroundProcessingPoolTaskInfo>>::iterator iterator; |
140 | |
141 | /// For exponential backoff. |
142 | size_t count_no_work_done = 0; |
143 | }; |
144 | |
145 | } |
146 | |