1#pragma once
2
3#include <Poco/Notification.h>
4#include <Poco/NotificationQueue.h>
5#include <Poco/Timestamp.h>
6#include <thread>
7#include <atomic>
8#include <mutex>
9#include <condition_variable>
10#include <vector>
11#include <map>
12#include <functional>
13#include <boost/noncopyable.hpp>
14#include <Common/ZooKeeper/Types.h>
15#include <Common/CurrentThread.h>
16#include <Common/ThreadPool.h>
17
18
19namespace DB
20{
21
22class TaskNotification;
23class BackgroundSchedulePoolTaskInfo;
24class BackgroundSchedulePoolTaskHolder;
25
26
27/** Executes functions scheduled at a specific point in time.
28 * Basically all tasks are added in a queue and precessed by worker threads.
29 *
30 * The most important difference between this and BackgroundProcessingPool
31 * is that we have the guarantee that the same function is not executed from many workers in the same time.
32 *
33 * The usage scenario: instead starting a separate thread for each task,
34 * register a task in BackgroundSchedulePool and when you need to run the task,
35 * call schedule or scheduleAfter(duration) method.
36 */
37class BackgroundSchedulePool
38{
39public:
40 friend class BackgroundSchedulePoolTaskInfo;
41
42 using TaskInfo = BackgroundSchedulePoolTaskInfo;
43 using TaskInfoPtr = std::shared_ptr<TaskInfo>;
44 using TaskFunc = std::function<void()>;
45 using TaskHolder = BackgroundSchedulePoolTaskHolder;
46 using DelayedTasks = std::multimap<Poco::Timestamp, TaskInfoPtr>;
47
48 TaskHolder createTask(const std::string & log_name, const TaskFunc & function);
49
50 size_t getNumberOfThreads() const { return size; }
51
52 BackgroundSchedulePool(size_t size_);
53 ~BackgroundSchedulePool();
54
55private:
56 using Threads = std::vector<ThreadFromGlobalPool>;
57
58 void threadFunction();
59 void delayExecutionThreadFunction();
60
61 /// Schedule task for execution after specified delay from now.
62 void scheduleDelayedTask(const TaskInfoPtr & task_info, size_t ms, std::lock_guard<std::mutex> & task_schedule_mutex_lock);
63
64 /// Remove task, that was scheduled with delay, from schedule.
65 void cancelDelayedTask(const TaskInfoPtr & task_info, std::lock_guard<std::mutex> & task_schedule_mutex_lock);
66
67 /// Number for worker threads.
68 const size_t size;
69 std::atomic<bool> shutdown {false};
70 Threads threads;
71 Poco::NotificationQueue queue;
72
73 /// Delayed notifications.
74
75 std::condition_variable wakeup_cond;
76 std::mutex delayed_tasks_mutex;
77 /// Thread waiting for next delayed task.
78 ThreadFromGlobalPool delayed_thread;
79 /// Tasks ordered by scheduled time.
80 DelayedTasks delayed_tasks;
81
82 /// Thread group used for profiling purposes
83 ThreadGroupStatusPtr thread_group;
84
85 void attachToThreadGroup();
86};
87
88
89class BackgroundSchedulePoolTaskInfo : public std::enable_shared_from_this<BackgroundSchedulePoolTaskInfo>, private boost::noncopyable
90{
91public:
92 BackgroundSchedulePoolTaskInfo(BackgroundSchedulePool & pool_, const std::string & log_name_, const BackgroundSchedulePool::TaskFunc & function_);
93
94 /// Schedule for execution as soon as possible (if not already scheduled).
95 /// If the task was already scheduled with delay, the delay will be ignored.
96 bool schedule();
97
98 /// Schedule for execution after specified delay.
99 bool scheduleAfter(size_t ms);
100
101 /// Further attempts to schedule become no-op. Will wait till the end of the current execution of the task.
102 void deactivate();
103
104 void activate();
105
106 /// Atomically activate task and schedule it for execution.
107 bool activateAndSchedule();
108
109 /// get Coordination::WatchCallback needed for notifications from ZooKeeper watches.
110 Coordination::WatchCallback getWatchCallback();
111
112private:
113 friend class TaskNotification;
114 friend class BackgroundSchedulePool;
115
116 void execute();
117
118 void scheduleImpl(std::lock_guard<std::mutex> & schedule_mutex_lock);
119
120 BackgroundSchedulePool & pool;
121 std::string log_name;
122 BackgroundSchedulePool::TaskFunc function;
123
124 std::mutex exec_mutex;
125 std::mutex schedule_mutex;
126
127 /// Invariants:
128 /// * If deactivated is true then scheduled, delayed and executing are all false.
129 /// * scheduled and delayed cannot be true at the same time.
130 bool deactivated = false;
131 bool scheduled = false;
132 bool delayed = false;
133 bool executing = false;
134
135 /// If the task is scheduled with delay, points to element of delayed_tasks.
136 BackgroundSchedulePool::DelayedTasks::iterator iterator;
137};
138
139using BackgroundSchedulePoolTaskInfoPtr = std::shared_ptr<BackgroundSchedulePoolTaskInfo>;
140
141
142class BackgroundSchedulePoolTaskHolder
143{
144public:
145 BackgroundSchedulePoolTaskHolder() = default;
146 explicit BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskInfoPtr & task_info_) : task_info(task_info_) {}
147 BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskHolder & other) = delete;
148 BackgroundSchedulePoolTaskHolder(BackgroundSchedulePoolTaskHolder && other) noexcept = default;
149 BackgroundSchedulePoolTaskHolder & operator=(const BackgroundSchedulePoolTaskHolder & other) noexcept = delete;
150 BackgroundSchedulePoolTaskHolder & operator=(BackgroundSchedulePoolTaskHolder && other) noexcept = default;
151
152 ~BackgroundSchedulePoolTaskHolder()
153 {
154 if (task_info)
155 task_info->deactivate();
156 }
157
158 BackgroundSchedulePoolTaskInfo * operator->() { return task_info.get(); }
159 const BackgroundSchedulePoolTaskInfo * operator->() const { return task_info.get(); }
160
161private:
162 BackgroundSchedulePoolTaskInfoPtr task_info;
163};
164
165}
166