| 1 | #include <Common/Exception.h> |
| 2 | #include <Common/setThreadName.h> |
| 3 | #include <Common/MemoryTracker.h> |
| 4 | #include <Common/randomSeed.h> |
| 5 | #include <IO/WriteHelpers.h> |
| 6 | #include <common/logger_useful.h> |
| 7 | #include <Storages/MergeTree/BackgroundProcessingPool.h> |
| 8 | #include <Common/CurrentThread.h> |
| 9 | #include <Interpreters/DNSCacheUpdater.h> |
| 10 | |
| 11 | #include <ext/scope_guard.h> |
| 12 | #include <pcg_random.hpp> |
| 13 | #include <random> |
| 14 | |
| 15 | |
| 16 | namespace DB |
| 17 | { |
| 18 | |
| 19 | void BackgroundProcessingPoolTaskInfo::wake() |
| 20 | { |
| 21 | Poco::Timestamp current_time; |
| 22 | |
| 23 | { |
| 24 | std::unique_lock lock(pool.tasks_mutex); |
| 25 | |
| 26 | /// This will ensure that iterator is valid. Must be done under the same mutex when the iterator is invalidated. |
| 27 | if (removed) |
| 28 | return; |
| 29 | |
| 30 | auto next_time_to_execute = iterator->first; |
| 31 | auto this_task_handle = iterator->second; |
| 32 | |
| 33 | /// If this task was done nothing at previous time and it has to sleep, then cancel sleep time. |
| 34 | if (next_time_to_execute > current_time) |
| 35 | next_time_to_execute = current_time; |
| 36 | |
| 37 | pool.tasks.erase(iterator); |
| 38 | iterator = pool.tasks.emplace(next_time_to_execute, this_task_handle); |
| 39 | } |
| 40 | |
| 41 | /// Note that if all threads are currently do some work, this call will not wakeup any thread. |
| 42 | pool.wake_event.notify_one(); |
| 43 | } |
| 44 | |
| 45 | |
| 46 | BackgroundProcessingPool::BackgroundProcessingPool(int size_, |
| 47 | const PoolSettings & pool_settings, |
| 48 | const char * log_name, |
| 49 | const char * thread_name_) |
| 50 | : size(size_) |
| 51 | , thread_name(thread_name_) |
| 52 | , settings(pool_settings) |
| 53 | { |
| 54 | logger = &Logger::get(log_name); |
| 55 | LOG_INFO(logger, "Create " << log_name << " with " << size << " threads" ); |
| 56 | |
| 57 | threads.resize(size); |
| 58 | for (auto & thread : threads) |
| 59 | thread = ThreadFromGlobalPool([this] { threadFunction(); }); |
| 60 | } |
| 61 | |
| 62 | |
| 63 | BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::addTask(const Task & task) |
| 64 | { |
| 65 | TaskHandle res = std::make_shared<TaskInfo>(*this, task); |
| 66 | |
| 67 | Poco::Timestamp current_time; |
| 68 | |
| 69 | { |
| 70 | std::unique_lock lock(tasks_mutex); |
| 71 | res->iterator = tasks.emplace(current_time, res); |
| 72 | } |
| 73 | |
| 74 | wake_event.notify_all(); |
| 75 | |
| 76 | return res; |
| 77 | } |
| 78 | |
| 79 | void BackgroundProcessingPool::removeTask(const TaskHandle & task) |
| 80 | { |
| 81 | if (task->removed.exchange(true)) |
| 82 | return; |
| 83 | |
| 84 | /// Wait for all executions of this task. |
| 85 | { |
| 86 | std::unique_lock wlock(task->rwlock); |
| 87 | } |
| 88 | |
| 89 | { |
| 90 | std::unique_lock lock(tasks_mutex); |
| 91 | tasks.erase(task->iterator); |
| 92 | /// Note that the task may be still accessible through TaskHandle (shared_ptr). |
| 93 | } |
| 94 | } |
| 95 | |
| 96 | BackgroundProcessingPool::~BackgroundProcessingPool() |
| 97 | { |
| 98 | try |
| 99 | { |
| 100 | shutdown = true; |
| 101 | wake_event.notify_all(); |
| 102 | for (auto & thread : threads) |
| 103 | thread.join(); |
| 104 | } |
| 105 | catch (...) |
| 106 | { |
| 107 | tryLogCurrentException(__PRETTY_FUNCTION__); |
| 108 | } |
| 109 | } |
| 110 | |
| 111 | |
| 112 | void BackgroundProcessingPool::threadFunction() |
| 113 | { |
| 114 | setThreadName(thread_name); |
| 115 | |
| 116 | { |
| 117 | std::lock_guard lock(tasks_mutex); |
| 118 | |
| 119 | if (thread_group) |
| 120 | { |
| 121 | /// Put all threads to one thread pool |
| 122 | CurrentThread::attachTo(thread_group); |
| 123 | } |
| 124 | else |
| 125 | { |
| 126 | CurrentThread::initializeQuery(); |
| 127 | thread_group = CurrentThread::getGroup(); |
| 128 | } |
| 129 | } |
| 130 | |
| 131 | SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); }); |
| 132 | if (auto memory_tracker = CurrentThread::getMemoryTracker()) |
| 133 | memory_tracker->setMetric(settings.memory_metric); |
| 134 | |
| 135 | pcg64 rng(randomSeed()); |
| 136 | std::this_thread::sleep_for(std::chrono::duration<double>(std::uniform_real_distribution<double>(0, settings.thread_sleep_seconds_random_part)(rng))); |
| 137 | |
| 138 | while (!shutdown) |
| 139 | { |
| 140 | TaskResult task_result = TaskResult::ERROR; |
| 141 | TaskHandle task; |
| 142 | |
| 143 | try |
| 144 | { |
| 145 | Poco::Timestamp min_time; |
| 146 | |
| 147 | { |
| 148 | std::unique_lock lock(tasks_mutex); |
| 149 | |
| 150 | if (!tasks.empty()) |
| 151 | { |
| 152 | for (const auto & time_handle : tasks) |
| 153 | { |
| 154 | if (!time_handle.second->removed) |
| 155 | { |
| 156 | min_time = time_handle.first; |
| 157 | task = time_handle.second; |
| 158 | break; |
| 159 | } |
| 160 | } |
| 161 | } |
| 162 | } |
| 163 | |
| 164 | if (shutdown) |
| 165 | break; |
| 166 | |
| 167 | if (!task) |
| 168 | { |
| 169 | std::unique_lock lock(tasks_mutex); |
| 170 | wake_event.wait_for(lock, |
| 171 | std::chrono::duration<double>(settings.thread_sleep_seconds |
| 172 | + std::uniform_real_distribution<double>(0, settings.thread_sleep_seconds_random_part)(rng))); |
| 173 | continue; |
| 174 | } |
| 175 | |
| 176 | /// No tasks ready for execution. |
| 177 | Poco::Timestamp current_time; |
| 178 | if (min_time > current_time) |
| 179 | { |
| 180 | std::unique_lock lock(tasks_mutex); |
| 181 | wake_event.wait_for(lock, std::chrono::microseconds( |
| 182 | min_time - current_time + std::uniform_int_distribution<uint64_t>(0, settings.thread_sleep_seconds_random_part * 1000000)(rng))); |
| 183 | } |
| 184 | |
| 185 | std::shared_lock rlock(task->rwlock); |
| 186 | |
| 187 | if (task->removed) |
| 188 | continue; |
| 189 | |
| 190 | { |
| 191 | CurrentMetrics::Increment metric_increment{settings.tasks_metric}; |
| 192 | task_result = task->function(); |
| 193 | } |
| 194 | } |
| 195 | catch (...) |
| 196 | { |
| 197 | tryLogCurrentException(__PRETTY_FUNCTION__); |
| 198 | } |
| 199 | |
| 200 | if (shutdown) |
| 201 | break; |
| 202 | |
| 203 | { |
| 204 | std::unique_lock lock(tasks_mutex); |
| 205 | |
| 206 | if (task->removed) |
| 207 | continue; |
| 208 | |
| 209 | if (task_result == TaskResult::SUCCESS) |
| 210 | task->count_no_work_done = 0; |
| 211 | else |
| 212 | ++task->count_no_work_done; |
| 213 | |
| 214 | /// If task has done work, it could be executed again immediately. |
| 215 | /// If not, add delay before next run. |
| 216 | |
| 217 | Poco::Timestamp next_time_to_execute; /// current time |
| 218 | if (task_result == TaskResult::ERROR) |
| 219 | next_time_to_execute += 1000000 * (std::min( |
| 220 | settings.task_sleep_seconds_when_no_work_max, |
| 221 | settings.task_sleep_seconds_when_no_work_min * std::pow(settings.task_sleep_seconds_when_no_work_multiplier, task->count_no_work_done)) |
| 222 | + std::uniform_real_distribution<double>(0, settings.task_sleep_seconds_when_no_work_random_part)(rng)); |
| 223 | else if (task_result == TaskResult::NOTHING_TO_DO) |
| 224 | next_time_to_execute += 1000000 * settings.thread_sleep_seconds_if_nothing_to_do; |
| 225 | |
| 226 | tasks.erase(task->iterator); |
| 227 | task->iterator = tasks.emplace(next_time_to_execute, task); |
| 228 | } |
| 229 | } |
| 230 | } |
| 231 | |
| 232 | } |
| 233 | |