1 | #include <Common/Exception.h> |
2 | #include <Common/setThreadName.h> |
3 | #include <Common/MemoryTracker.h> |
4 | #include <Common/randomSeed.h> |
5 | #include <IO/WriteHelpers.h> |
6 | #include <common/logger_useful.h> |
7 | #include <Storages/MergeTree/BackgroundProcessingPool.h> |
8 | #include <Common/CurrentThread.h> |
9 | #include <Interpreters/DNSCacheUpdater.h> |
10 | |
11 | #include <ext/scope_guard.h> |
12 | #include <pcg_random.hpp> |
13 | #include <random> |
14 | |
15 | |
16 | namespace DB |
17 | { |
18 | |
19 | void BackgroundProcessingPoolTaskInfo::wake() |
20 | { |
21 | Poco::Timestamp current_time; |
22 | |
23 | { |
24 | std::unique_lock lock(pool.tasks_mutex); |
25 | |
26 | /// This will ensure that iterator is valid. Must be done under the same mutex when the iterator is invalidated. |
27 | if (removed) |
28 | return; |
29 | |
30 | auto next_time_to_execute = iterator->first; |
31 | auto this_task_handle = iterator->second; |
32 | |
33 | /// If this task was done nothing at previous time and it has to sleep, then cancel sleep time. |
34 | if (next_time_to_execute > current_time) |
35 | next_time_to_execute = current_time; |
36 | |
37 | pool.tasks.erase(iterator); |
38 | iterator = pool.tasks.emplace(next_time_to_execute, this_task_handle); |
39 | } |
40 | |
41 | /// Note that if all threads are currently do some work, this call will not wakeup any thread. |
42 | pool.wake_event.notify_one(); |
43 | } |
44 | |
45 | |
46 | BackgroundProcessingPool::BackgroundProcessingPool(int size_, |
47 | const PoolSettings & pool_settings, |
48 | const char * log_name, |
49 | const char * thread_name_) |
50 | : size(size_) |
51 | , thread_name(thread_name_) |
52 | , settings(pool_settings) |
53 | { |
54 | logger = &Logger::get(log_name); |
55 | LOG_INFO(logger, "Create " << log_name << " with " << size << " threads" ); |
56 | |
57 | threads.resize(size); |
58 | for (auto & thread : threads) |
59 | thread = ThreadFromGlobalPool([this] { threadFunction(); }); |
60 | } |
61 | |
62 | |
63 | BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::addTask(const Task & task) |
64 | { |
65 | TaskHandle res = std::make_shared<TaskInfo>(*this, task); |
66 | |
67 | Poco::Timestamp current_time; |
68 | |
69 | { |
70 | std::unique_lock lock(tasks_mutex); |
71 | res->iterator = tasks.emplace(current_time, res); |
72 | } |
73 | |
74 | wake_event.notify_all(); |
75 | |
76 | return res; |
77 | } |
78 | |
79 | void BackgroundProcessingPool::removeTask(const TaskHandle & task) |
80 | { |
81 | if (task->removed.exchange(true)) |
82 | return; |
83 | |
84 | /// Wait for all executions of this task. |
85 | { |
86 | std::unique_lock wlock(task->rwlock); |
87 | } |
88 | |
89 | { |
90 | std::unique_lock lock(tasks_mutex); |
91 | tasks.erase(task->iterator); |
92 | /// Note that the task may be still accessible through TaskHandle (shared_ptr). |
93 | } |
94 | } |
95 | |
96 | BackgroundProcessingPool::~BackgroundProcessingPool() |
97 | { |
98 | try |
99 | { |
100 | shutdown = true; |
101 | wake_event.notify_all(); |
102 | for (auto & thread : threads) |
103 | thread.join(); |
104 | } |
105 | catch (...) |
106 | { |
107 | tryLogCurrentException(__PRETTY_FUNCTION__); |
108 | } |
109 | } |
110 | |
111 | |
112 | void BackgroundProcessingPool::threadFunction() |
113 | { |
114 | setThreadName(thread_name); |
115 | |
116 | { |
117 | std::lock_guard lock(tasks_mutex); |
118 | |
119 | if (thread_group) |
120 | { |
121 | /// Put all threads to one thread pool |
122 | CurrentThread::attachTo(thread_group); |
123 | } |
124 | else |
125 | { |
126 | CurrentThread::initializeQuery(); |
127 | thread_group = CurrentThread::getGroup(); |
128 | } |
129 | } |
130 | |
131 | SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); }); |
132 | if (auto memory_tracker = CurrentThread::getMemoryTracker()) |
133 | memory_tracker->setMetric(settings.memory_metric); |
134 | |
135 | pcg64 rng(randomSeed()); |
136 | std::this_thread::sleep_for(std::chrono::duration<double>(std::uniform_real_distribution<double>(0, settings.thread_sleep_seconds_random_part)(rng))); |
137 | |
138 | while (!shutdown) |
139 | { |
140 | TaskResult task_result = TaskResult::ERROR; |
141 | TaskHandle task; |
142 | |
143 | try |
144 | { |
145 | Poco::Timestamp min_time; |
146 | |
147 | { |
148 | std::unique_lock lock(tasks_mutex); |
149 | |
150 | if (!tasks.empty()) |
151 | { |
152 | for (const auto & time_handle : tasks) |
153 | { |
154 | if (!time_handle.second->removed) |
155 | { |
156 | min_time = time_handle.first; |
157 | task = time_handle.second; |
158 | break; |
159 | } |
160 | } |
161 | } |
162 | } |
163 | |
164 | if (shutdown) |
165 | break; |
166 | |
167 | if (!task) |
168 | { |
169 | std::unique_lock lock(tasks_mutex); |
170 | wake_event.wait_for(lock, |
171 | std::chrono::duration<double>(settings.thread_sleep_seconds |
172 | + std::uniform_real_distribution<double>(0, settings.thread_sleep_seconds_random_part)(rng))); |
173 | continue; |
174 | } |
175 | |
176 | /// No tasks ready for execution. |
177 | Poco::Timestamp current_time; |
178 | if (min_time > current_time) |
179 | { |
180 | std::unique_lock lock(tasks_mutex); |
181 | wake_event.wait_for(lock, std::chrono::microseconds( |
182 | min_time - current_time + std::uniform_int_distribution<uint64_t>(0, settings.thread_sleep_seconds_random_part * 1000000)(rng))); |
183 | } |
184 | |
185 | std::shared_lock rlock(task->rwlock); |
186 | |
187 | if (task->removed) |
188 | continue; |
189 | |
190 | { |
191 | CurrentMetrics::Increment metric_increment{settings.tasks_metric}; |
192 | task_result = task->function(); |
193 | } |
194 | } |
195 | catch (...) |
196 | { |
197 | tryLogCurrentException(__PRETTY_FUNCTION__); |
198 | } |
199 | |
200 | if (shutdown) |
201 | break; |
202 | |
203 | { |
204 | std::unique_lock lock(tasks_mutex); |
205 | |
206 | if (task->removed) |
207 | continue; |
208 | |
209 | if (task_result == TaskResult::SUCCESS) |
210 | task->count_no_work_done = 0; |
211 | else |
212 | ++task->count_no_work_done; |
213 | |
214 | /// If task has done work, it could be executed again immediately. |
215 | /// If not, add delay before next run. |
216 | |
217 | Poco::Timestamp next_time_to_execute; /// current time |
218 | if (task_result == TaskResult::ERROR) |
219 | next_time_to_execute += 1000000 * (std::min( |
220 | settings.task_sleep_seconds_when_no_work_max, |
221 | settings.task_sleep_seconds_when_no_work_min * std::pow(settings.task_sleep_seconds_when_no_work_multiplier, task->count_no_work_done)) |
222 | + std::uniform_real_distribution<double>(0, settings.task_sleep_seconds_when_no_work_random_part)(rng)); |
223 | else if (task_result == TaskResult::NOTHING_TO_DO) |
224 | next_time_to_execute += 1000000 * settings.thread_sleep_seconds_if_nothing_to_do; |
225 | |
226 | tasks.erase(task->iterator); |
227 | task->iterator = tasks.emplace(next_time_to_execute, task); |
228 | } |
229 | } |
230 | } |
231 | |
232 | } |
233 | |