1#include "BackgroundSchedulePool.h"
2#include <Common/MemoryTracker.h>
3#include <Common/CurrentMetrics.h>
4#include <Common/Exception.h>
5#include <Common/setThreadName.h>
6#include <Common/Stopwatch.h>
7#include <Common/CurrentThread.h>
8#include <common/logger_useful.h>
9#include <chrono>
10#include <ext/scope_guard.h>
11
12
/// Metrics defined elsewhere; only declared here so this file can reference them.
namespace CurrentMetrics
{
    /// Held incremented (through CurrentMetrics::Increment) for the duration of each task's execute() call.
    extern const Metric BackgroundSchedulePoolTask;
    /// Assigned to the memory tracker of the pool's worker threads in threadFunction().
    extern const Metric MemoryTrackingInBackgroundSchedulePool;
}
18
19namespace DB
20{
21
22
23class TaskNotification final : public Poco::Notification
24{
25public:
26 explicit TaskNotification(const BackgroundSchedulePoolTaskInfoPtr & task_) : task(task_) {}
27 void execute() { task->execute(); }
28
29private:
30 BackgroundSchedulePoolTaskInfoPtr task;
31};
32
33
34BackgroundSchedulePoolTaskInfo::BackgroundSchedulePoolTaskInfo(
35 BackgroundSchedulePool & pool_, const std::string & log_name_, const BackgroundSchedulePool::TaskFunc & function_)
36 : pool(pool_), log_name(log_name_), function(function_)
37{
38}
39
40bool BackgroundSchedulePoolTaskInfo::schedule()
41{
42 std::lock_guard lock(schedule_mutex);
43
44 if (deactivated || scheduled)
45 return false;
46
47 scheduleImpl(lock);
48 return true;
49}
50
51bool BackgroundSchedulePoolTaskInfo::scheduleAfter(size_t ms)
52{
53 std::lock_guard lock(schedule_mutex);
54
55 if (deactivated || scheduled)
56 return false;
57
58 pool.scheduleDelayedTask(shared_from_this(), ms, lock);
59 return true;
60}
61
62void BackgroundSchedulePoolTaskInfo::deactivate()
63{
64 std::lock_guard lock_exec(exec_mutex);
65 std::lock_guard lock_schedule(schedule_mutex);
66
67 if (deactivated)
68 return;
69
70 deactivated = true;
71 scheduled = false;
72
73 if (delayed)
74 pool.cancelDelayedTask(shared_from_this(), lock_schedule);
75}
76
77void BackgroundSchedulePoolTaskInfo::activate()
78{
79 std::lock_guard lock(schedule_mutex);
80 deactivated = false;
81}
82
83bool BackgroundSchedulePoolTaskInfo::activateAndSchedule()
84{
85 std::lock_guard lock(schedule_mutex);
86
87 deactivated = false;
88 if (scheduled)
89 return false;
90
91 scheduleImpl(lock);
92 return true;
93}
94
95void BackgroundSchedulePoolTaskInfo::execute()
96{
97 Stopwatch watch;
98 CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundSchedulePoolTask};
99
100 std::lock_guard lock_exec(exec_mutex);
101
102 {
103 std::lock_guard lock_schedule(schedule_mutex);
104
105 if (deactivated)
106 return;
107
108 scheduled = false;
109 executing = true;
110 }
111
112 function();
113 UInt64 milliseconds = watch.elapsedMilliseconds();
114
115 /// If the task is executed longer than specified time, it will be logged.
116 static const int32_t slow_execution_threshold_ms = 200;
117
118 if (milliseconds >= slow_execution_threshold_ms)
119 LOG_TRACE(&Logger::get(log_name), "Execution took " << milliseconds << " ms.");
120
121 {
122 std::lock_guard lock_schedule(schedule_mutex);
123
124 executing = false;
125
126 /// In case was scheduled while executing (including a scheduleAfter which expired) we schedule the task
127 /// on the queue. We don't call the function again here because this way all tasks
128 /// will have their chance to execute
129
130 if (scheduled)
131 pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
132 }
133}
134
135void BackgroundSchedulePoolTaskInfo::scheduleImpl(std::lock_guard<std::mutex> & schedule_mutex_lock)
136{
137 scheduled = true;
138
139 if (delayed)
140 pool.cancelDelayedTask(shared_from_this(), schedule_mutex_lock);
141
142 /// If the task is not executing at the moment, enqueue it for immediate execution.
143 /// But if it is currently executing, do nothing because it will be enqueued
144 /// at the end of the execute() method.
145 if (!executing)
146 pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
147}
148
149Coordination::WatchCallback BackgroundSchedulePoolTaskInfo::getWatchCallback()
150{
151 return [t = shared_from_this()](const Coordination::WatchResponse &)
152 {
153 t->schedule();
154 };
155}
156
157
158BackgroundSchedulePool::BackgroundSchedulePool(size_t size_)
159 : size(size_)
160{
161 LOG_INFO(&Logger::get("BackgroundSchedulePool"), "Create BackgroundSchedulePool with " << size << " threads");
162
163 threads.resize(size);
164 for (auto & thread : threads)
165 thread = ThreadFromGlobalPool([this] { threadFunction(); });
166
167 delayed_thread = ThreadFromGlobalPool([this] { delayExecutionThreadFunction(); });
168}
169
170
171BackgroundSchedulePool::~BackgroundSchedulePool()
172{
173 try
174 {
175 {
176 std::unique_lock lock(delayed_tasks_mutex);
177 shutdown = true;
178 wakeup_cond.notify_all();
179 }
180
181 queue.wakeUpAll();
182 delayed_thread.join();
183
184 LOG_TRACE(&Logger::get("BackgroundSchedulePool"), "Waiting for threads to finish.");
185 for (auto & thread : threads)
186 thread.join();
187 }
188 catch (...)
189 {
190 tryLogCurrentException(__PRETTY_FUNCTION__);
191 }
192}
193
194
195BackgroundSchedulePool::TaskHolder BackgroundSchedulePool::createTask(const std::string & name, const TaskFunc & function)
196{
197 return TaskHolder(std::make_shared<TaskInfo>(*this, name, function));
198}
199
200
201void BackgroundSchedulePool::scheduleDelayedTask(const TaskInfoPtr & task, size_t ms, std::lock_guard<std::mutex> & /* task_schedule_mutex_lock */)
202{
203 Poco::Timestamp current_time;
204
205 {
206 std::lock_guard lock(delayed_tasks_mutex);
207
208 if (task->delayed)
209 delayed_tasks.erase(task->iterator);
210
211 task->iterator = delayed_tasks.emplace(current_time + (ms * 1000), task);
212 task->delayed = true;
213 }
214
215 wakeup_cond.notify_all();
216}
217
218
219void BackgroundSchedulePool::cancelDelayedTask(const TaskInfoPtr & task, std::lock_guard<std::mutex> & /* task_schedule_mutex_lock */)
220{
221 {
222 std::lock_guard lock(delayed_tasks_mutex);
223 delayed_tasks.erase(task->iterator);
224 task->delayed = false;
225 }
226
227 wakeup_cond.notify_all();
228}
229
230
231void BackgroundSchedulePool::attachToThreadGroup()
232{
233 std::lock_guard lock(delayed_tasks_mutex);
234
235 if (thread_group)
236 {
237 /// Put all threads to one thread pool
238 CurrentThread::attachTo(thread_group);
239 }
240 else
241 {
242 CurrentThread::initializeQuery();
243 thread_group = CurrentThread::getGroup();
244 }
245}
246
247
248void BackgroundSchedulePool::threadFunction()
249{
250 setThreadName("BackgrSchedPool");
251
252 attachToThreadGroup();
253 SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
254 if (auto memory_tracker = CurrentThread::getMemoryTracker())
255 memory_tracker->setMetric(CurrentMetrics::MemoryTrackingInBackgroundSchedulePool);
256
257 while (!shutdown)
258 {
259 if (Poco::AutoPtr<Poco::Notification> notification = queue.waitDequeueNotification())
260 {
261 TaskNotification & task_notification = static_cast<TaskNotification &>(*notification);
262 task_notification.execute();
263 }
264 }
265}
266
267
268void BackgroundSchedulePool::delayExecutionThreadFunction()
269{
270 setThreadName("BckSchPoolDelay");
271
272 attachToThreadGroup();
273 SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
274
275 while (!shutdown)
276 {
277 TaskInfoPtr task;
278 bool found = false;
279
280 {
281 std::unique_lock lock(delayed_tasks_mutex);
282
283 while (!shutdown)
284 {
285 Poco::Timestamp min_time;
286
287 if (!delayed_tasks.empty())
288 {
289 auto t = delayed_tasks.begin();
290 min_time = t->first;
291 task = t->second;
292 }
293
294 if (!task)
295 {
296 wakeup_cond.wait(lock);
297 continue;
298 }
299
300 Poco::Timestamp current_time;
301
302 if (min_time > current_time)
303 {
304 wakeup_cond.wait_for(lock, std::chrono::microseconds(min_time - current_time));
305 continue;
306 }
307 else
308 {
309 /// We have a task ready for execution
310 found = true;
311 break;
312 }
313 }
314 }
315
316 if (found)
317 task->schedule();
318 }
319}
320
321}
322