1#include <Common/Exception.h>
2#include <Common/setThreadName.h>
3#include <Common/MemoryTracker.h>
4#include <Common/randomSeed.h>
5#include <IO/WriteHelpers.h>
6#include <common/logger_useful.h>
7#include <Storages/MergeTree/BackgroundProcessingPool.h>
8#include <Common/CurrentThread.h>
9#include <Interpreters/DNSCacheUpdater.h>
10
11#include <ext/scope_guard.h>
12#include <pcg_random.hpp>
13#include <random>
14
15
16namespace DB
17{
18
19void BackgroundProcessingPoolTaskInfo::wake()
20{
21 Poco::Timestamp current_time;
22
23 {
24 std::unique_lock lock(pool.tasks_mutex);
25
26 /// This will ensure that iterator is valid. Must be done under the same mutex when the iterator is invalidated.
27 if (removed)
28 return;
29
30 auto next_time_to_execute = iterator->first;
31 auto this_task_handle = iterator->second;
32
33 /// If this task was done nothing at previous time and it has to sleep, then cancel sleep time.
34 if (next_time_to_execute > current_time)
35 next_time_to_execute = current_time;
36
37 pool.tasks.erase(iterator);
38 iterator = pool.tasks.emplace(next_time_to_execute, this_task_handle);
39 }
40
41 /// Note that if all threads are currently do some work, this call will not wakeup any thread.
42 pool.wake_event.notify_one();
43}
44
45
46BackgroundProcessingPool::BackgroundProcessingPool(int size_,
47 const PoolSettings & pool_settings,
48 const char * log_name,
49 const char * thread_name_)
50 : size(size_)
51 , thread_name(thread_name_)
52 , settings(pool_settings)
53{
54 logger = &Logger::get(log_name);
55 LOG_INFO(logger, "Create " << log_name << " with " << size << " threads");
56
57 threads.resize(size);
58 for (auto & thread : threads)
59 thread = ThreadFromGlobalPool([this] { threadFunction(); });
60}
61
62
63BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::addTask(const Task & task)
64{
65 TaskHandle res = std::make_shared<TaskInfo>(*this, task);
66
67 Poco::Timestamp current_time;
68
69 {
70 std::unique_lock lock(tasks_mutex);
71 res->iterator = tasks.emplace(current_time, res);
72 }
73
74 wake_event.notify_all();
75
76 return res;
77}
78
79void BackgroundProcessingPool::removeTask(const TaskHandle & task)
80{
81 if (task->removed.exchange(true))
82 return;
83
84 /// Wait for all executions of this task.
85 {
86 std::unique_lock wlock(task->rwlock);
87 }
88
89 {
90 std::unique_lock lock(tasks_mutex);
91 tasks.erase(task->iterator);
92 /// Note that the task may be still accessible through TaskHandle (shared_ptr).
93 }
94}
95
96BackgroundProcessingPool::~BackgroundProcessingPool()
97{
98 try
99 {
100 shutdown = true;
101 wake_event.notify_all();
102 for (auto & thread : threads)
103 thread.join();
104 }
105 catch (...)
106 {
107 tryLogCurrentException(__PRETTY_FUNCTION__);
108 }
109}
110
111
112void BackgroundProcessingPool::threadFunction()
113{
114 setThreadName(thread_name);
115
116 {
117 std::lock_guard lock(tasks_mutex);
118
119 if (thread_group)
120 {
121 /// Put all threads to one thread pool
122 CurrentThread::attachTo(thread_group);
123 }
124 else
125 {
126 CurrentThread::initializeQuery();
127 thread_group = CurrentThread::getGroup();
128 }
129 }
130
131 SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
132 if (auto memory_tracker = CurrentThread::getMemoryTracker())
133 memory_tracker->setMetric(settings.memory_metric);
134
135 pcg64 rng(randomSeed());
136 std::this_thread::sleep_for(std::chrono::duration<double>(std::uniform_real_distribution<double>(0, settings.thread_sleep_seconds_random_part)(rng)));
137
138 while (!shutdown)
139 {
140 TaskResult task_result = TaskResult::ERROR;
141 TaskHandle task;
142
143 try
144 {
145 Poco::Timestamp min_time;
146
147 {
148 std::unique_lock lock(tasks_mutex);
149
150 if (!tasks.empty())
151 {
152 for (const auto & time_handle : tasks)
153 {
154 if (!time_handle.second->removed)
155 {
156 min_time = time_handle.first;
157 task = time_handle.second;
158 break;
159 }
160 }
161 }
162 }
163
164 if (shutdown)
165 break;
166
167 if (!task)
168 {
169 std::unique_lock lock(tasks_mutex);
170 wake_event.wait_for(lock,
171 std::chrono::duration<double>(settings.thread_sleep_seconds
172 + std::uniform_real_distribution<double>(0, settings.thread_sleep_seconds_random_part)(rng)));
173 continue;
174 }
175
176 /// No tasks ready for execution.
177 Poco::Timestamp current_time;
178 if (min_time > current_time)
179 {
180 std::unique_lock lock(tasks_mutex);
181 wake_event.wait_for(lock, std::chrono::microseconds(
182 min_time - current_time + std::uniform_int_distribution<uint64_t>(0, settings.thread_sleep_seconds_random_part * 1000000)(rng)));
183 }
184
185 std::shared_lock rlock(task->rwlock);
186
187 if (task->removed)
188 continue;
189
190 {
191 CurrentMetrics::Increment metric_increment{settings.tasks_metric};
192 task_result = task->function();
193 }
194 }
195 catch (...)
196 {
197 tryLogCurrentException(__PRETTY_FUNCTION__);
198 }
199
200 if (shutdown)
201 break;
202
203 {
204 std::unique_lock lock(tasks_mutex);
205
206 if (task->removed)
207 continue;
208
209 if (task_result == TaskResult::SUCCESS)
210 task->count_no_work_done = 0;
211 else
212 ++task->count_no_work_done;
213
214 /// If task has done work, it could be executed again immediately.
215 /// If not, add delay before next run.
216
217 Poco::Timestamp next_time_to_execute; /// current time
218 if (task_result == TaskResult::ERROR)
219 next_time_to_execute += 1000000 * (std::min(
220 settings.task_sleep_seconds_when_no_work_max,
221 settings.task_sleep_seconds_when_no_work_min * std::pow(settings.task_sleep_seconds_when_no_work_multiplier, task->count_no_work_done))
222 + std::uniform_real_distribution<double>(0, settings.task_sleep_seconds_when_no_work_random_part)(rng));
223 else if (task_result == TaskResult::NOTHING_TO_DO)
224 next_time_to_execute += 1000000 * settings.thread_sleep_seconds_if_nothing_to_do;
225
226 tasks.erase(task->iterator);
227 task->iterator = tasks.emplace(next_time_to_execute, task);
228 }
229 }
230}
231
232}
233