1#pragma once
2
3#include <thread>
4#include <set>
5#include <map>
6#include <list>
7#include <condition_variable>
8#include <mutex>
9#include <shared_mutex>
10#include <atomic>
11#include <functional>
12#include <Poco/Event.h>
13#include <Poco/Timestamp.h>
14#include <Core/Types.h>
15#include <Common/CurrentMetrics.h>
16#include <Common/CurrentThread.h>
17#include <Common/ThreadPool.h>
18#include <Poco/Util/AbstractConfiguration.h>
19
20
21namespace CurrentMetrics
22{
23 extern const Metric BackgroundPoolTask;
24 extern const Metric MemoryTrackingInBackgroundProcessingPool;
25}
26
27namespace DB
28{
29
class BackgroundProcessingPoolTaskInfo;
class BackgroundProcessingPool;

/// Outcome of a single run of a BackgroundProcessingPool task.
/// Underlying type and enumerator values are spelled out explicitly.
enum class BackgroundProcessingPoolTaskResult : int
{
    /// The run completed (presumably having done useful work).
    SUCCESS = 0,
    /// The run failed.
    ERROR = 1,
    /// The run found nothing to do (presumably triggers the no-work backoff).
    NOTHING_TO_DO = 2
};
39
40
41/** Using a fixed number of threads, perform an arbitrary number of tasks in an infinite loop.
42 * In this case, one task can run simultaneously from different threads.
43 * Designed for tasks that perform continuous background work (for example, merge).
44 * `Task` is a function that returns a bool - did it do any work.
45 * If not, then the next time will be done later.
46 */
47class BackgroundProcessingPool
48{
49public:
50 /// Returns true, if some useful work was done. In that case, thread will not sleep before next run of this task.
51 using TaskResult = BackgroundProcessingPoolTaskResult;
52 using Task = std::function<TaskResult()>;
53 using TaskInfo = BackgroundProcessingPoolTaskInfo;
54 using TaskHandle = std::shared_ptr<TaskInfo>;
55
56
57 struct PoolSettings
58 {
59 double thread_sleep_seconds = 10;
60 double thread_sleep_seconds_random_part = 1.0;
61 double thread_sleep_seconds_if_nothing_to_do = 0.1;
62
63 /// For exponential backoff.
64 double task_sleep_seconds_when_no_work_min = 10;
65 double task_sleep_seconds_when_no_work_max = 600;
66 double task_sleep_seconds_when_no_work_multiplier = 1.1;
67 double task_sleep_seconds_when_no_work_random_part = 1.0;
68
69 CurrentMetrics::Metric tasks_metric = CurrentMetrics::BackgroundPoolTask;
70 CurrentMetrics::Metric memory_metric = CurrentMetrics::MemoryTrackingInBackgroundProcessingPool;
71
72 PoolSettings() noexcept {}
73 };
74
75 BackgroundProcessingPool(int size_,
76 const PoolSettings & pool_settings = {},
77 const char * log_name = "BackgroundProcessingPool",
78 const char * thread_name_ = "BackgrProcPool");
79
80 size_t getNumberOfThreads() const
81 {
82 return size;
83 }
84
85 /// The task is started immediately.
86 TaskHandle addTask(const Task & task);
87
88 void removeTask(const TaskHandle & task);
89
90 ~BackgroundProcessingPool();
91
92protected:
93 friend class BackgroundProcessingPoolTaskInfo;
94
95 using Tasks = std::multimap<Poco::Timestamp, TaskHandle>; /// key is desired next time to execute (priority).
96 using Threads = std::vector<ThreadFromGlobalPool>;
97
98 const size_t size;
99 const char * thread_name;
100 Poco::Logger * logger;
101
102 Tasks tasks; /// Ordered in priority.
103 std::mutex tasks_mutex;
104
105 Threads threads;
106
107 std::atomic<bool> shutdown {false};
108 std::condition_variable wake_event;
109
110 /// Thread group used for profiling purposes
111 ThreadGroupStatusPtr thread_group;
112
113 void threadFunction();
114
115private:
116 PoolSettings settings;
117};
118
119
120class BackgroundProcessingPoolTaskInfo
121{
122public:
123 /// Wake up any thread.
124 void wake();
125
126 BackgroundProcessingPoolTaskInfo(BackgroundProcessingPool & pool_, const BackgroundProcessingPool::Task & function_)
127 : pool(pool_), function(function_) {}
128
129protected:
130 friend class BackgroundProcessingPool;
131
132 BackgroundProcessingPool & pool;
133 BackgroundProcessingPool::Task function;
134
135 /// Read lock is hold when task is executed.
136 std::shared_mutex rwlock;
137 std::atomic<bool> removed {false};
138
139 std::multimap<Poco::Timestamp, std::shared_ptr<BackgroundProcessingPoolTaskInfo>>::iterator iterator;
140
141 /// For exponential backoff.
142 size_t count_no_work_done = 0;
143};
144
145}
146