#include <cstdlib>
#include <sstream>

#include "MemoryTracker.h"
#include <common/likely.h>
#include <common/logger_useful.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <Common/CurrentThread.h>
#include <IO/WriteHelpers.h>


namespace DB
{
namespace ErrorCodes
{
    extern const int MEMORY_LIMIT_EXCEEDED;
}
}


static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
/// Each thread can new/delete memory within (-untracked_memory_limit, untracked_memory_limit) without touching the shared counters.
static constexpr Int64 untracked_memory_limit = 4 * 1024 * 1024;
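/// For example, with a 4 MiB untracked limit, a thread that makes 1 KiB allocations touches the shared
/// atomic counters only about once per 4096 allocations (4 MiB / 1 KiB), which keeps contention low.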


MemoryTracker::~MemoryTracker()
{
    if (static_cast<int>(level) < static_cast<int>(VariableContext::Process) && peak)
    {
        try
        {
            logPeakMemoryUsage();
        }
        catch (...)
        {
            /// Exception in Logger; intentionally swallowed.
        }
    }

    /** This is needed so that the 'next' (parent) memory trackers stay consistent with the sum of all memory trackers that refer to them.
      *
      * Sometimes a memory tracker is destroyed before the memory it tracked is freed, so at destruction amount > 0.
      * For example, a query could allocate some data and leave it in a cache.
      *
      * If that memory is later freed outside the context of this memory tracker,
      * but in the context of one of the 'next' memory trackers,
      * then the memory usage of the 'next' memory trackers would be underestimated,
      * because the amount would be decreased twice (first here, and again when the real 'free' happens).
      */
    if (auto value = amount.load(std::memory_order_relaxed))
        free(value);
}


void MemoryTracker::logPeakMemoryUsage() const
{
    LOG_DEBUG(&Logger::get("MemoryTracker"),
        "Peak memory usage" << (description ? " " + std::string(description) : "")
            << ": " << formatReadableSizeWithBinarySuffix(peak) << ".");
}

static void logMemoryUsage(Int64 amount)
{
    LOG_DEBUG(&Logger::get("MemoryTracker"),
        "Current memory usage: " << formatReadableSizeWithBinarySuffix(amount) << ".");
}


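/** Account an allocation of 'size' bytes on this tracker and, via the 'parent' pointer, on every tracker
  * above it (e.g. a query-level and then the process-level tracker), so each level can enforce its own limit.
  * Throws MEMORY_LIMIT_EXCEEDED if the new total would exceed the limit, or when a fault is injected via
  * 'fault_probability'; in both cases the allocation is rolled back before throwing.
  */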
void MemoryTracker::alloc(Int64 size)
{
    if (blocker.isCancelled())
        return;

    /** Using memory_order_relaxed means that if allocations happen concurrently,
      * the "memory limit exceeded" exception may only be thrown on a subsequent allocation.
      * So we allow over-allocation.
      */
    Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);

    if (metric != CurrentMetrics::end())
        CurrentMetrics::add(metric, size);

    Int64 current_limit = limit.load(std::memory_order_relaxed);

    /// Using a non-thread-safe random number generator. The joint distribution across threads would not be uniform.
    /// In this case, it doesn't matter.
    if (unlikely(fault_probability && drand48() < fault_probability))
    {
        free(size);

        /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
        auto untrack_lock = blocker.cancel();

        std::stringstream message;
        message << "Memory tracker";
        if (description)
            message << " " << description;
        message << ": fault injected. Would use " << formatReadableSizeWithBinarySuffix(will_be)
            << " (attempt to allocate chunk of " << size << " bytes)"
            << ", maximum: " << formatReadableSizeWithBinarySuffix(current_limit);

        throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
    }

    if (unlikely(current_limit && will_be > current_limit))
    {
        free(size);

        /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
        auto untrack_lock = blocker.cancel();

        std::stringstream message;
        message << "Memory limit";
        if (description)
            message << " " << description;
        message << " exceeded: would use " << formatReadableSizeWithBinarySuffix(will_be)
            << " (attempt to allocate chunk of " << size << " bytes)"
            << ", maximum: " << formatReadableSizeWithBinarySuffix(current_limit);

        throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
    }

    auto peak_old = peak.load(std::memory_order_relaxed);
    if (will_be > peak_old)        /// Races don't matter. Could rewrite with CAS, but it's not worth it.
    {
        peak.store(will_be, std::memory_order_relaxed);

        if (level == VariableContext::Process && will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every)
            logMemoryUsage(will_be);
    }

    if (auto loaded_next = parent.load(std::memory_order_relaxed))
        loaded_next->alloc(size);
}


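/** Account a deallocation of 'size' bytes, mirroring alloc(): the amount is decreased here and then
  * propagated up the 'parent' chain. At thread level the amount may legitimately become negative
  * (memory allocated in this thread can be freed in another one); at higher levels it is saturated
  * at zero, so memory usage is tracked with some error in that case.
  */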
void MemoryTracker::free(Int64 size)
{
    if (blocker.isCancelled())
        return;

    if (level == VariableContext::Thread)
    {
        /// Could become negative if memory allocated in this thread is freed in another one.
        amount.fetch_sub(size, std::memory_order_relaxed);
    }
    else
    {
        Int64 new_amount = amount.fetch_sub(size, std::memory_order_relaxed) - size;

        /** Sometimes a query frees data that was allocated outside of the query context.
          * Example: cache eviction.
          * To avoid negative memory usage, we "saturate" the amount.
          * Memory usage will be calculated with some error.
          * NOTE: The code is not atomic. Not worth fixing.
          */
        if (unlikely(new_amount < 0))
        {
            amount.fetch_sub(new_amount);
            size += new_amount;
        }
    }

    if (auto loaded_next = parent.load(std::memory_order_relaxed))
        loaded_next->free(size);

    if (metric != CurrentMetrics::end())
        CurrentMetrics::sub(metric, size);
}


void MemoryTracker::resetCounters()
{
    amount.store(0, std::memory_order_relaxed);
    peak.store(0, std::memory_order_relaxed);
    limit.store(0, std::memory_order_relaxed);
}


void MemoryTracker::reset()
{
    if (metric != CurrentMetrics::end())
        CurrentMetrics::sub(metric, amount.load(std::memory_order_relaxed));

    resetCounters();
}


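/** Raise the limit to 'value' if it is currently lower; a smaller value leaves the limit unchanged.
  * For example, after setOrRaiseLimit(20 GiB), a later setOrRaiseLimit(10 GiB) keeps the limit at 20 GiB.
  */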
void MemoryTracker::setOrRaiseLimit(Int64 value)
{
    /// This is just an atomic "set to maximum": compare_exchange_weak reloads old_value on failure.
    Int64 old_value = limit.load(std::memory_order_relaxed);
    while (old_value < value && !limit.compare_exchange_weak(old_value, value))
        ;
}


namespace CurrentMemoryTracker
{
    void alloc(Int64 size)
    {
        if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
        {
            Int64 & untracked = DB::CurrentThread::getUntrackedMemory();
            untracked += size;
            if (untracked > untracked_memory_limit)
            {
                /// Zero out 'untracked' before tracking. If the tracker throws an out-of-limit exception, we can still
                /// allocate up to untracked_memory_limit more bytes; this can be useful, e.g. to enlarge the exception message during rethrow.
                Int64 tmp = untracked;
                untracked = 0;
                memory_tracker->alloc(tmp);
            }
        }
    }

    void realloc(Int64 old_size, Int64 new_size)
    {
        Int64 addition = new_size - old_size;
        if (addition > 0)
            alloc(addition);
        else
            free(-addition);
    }

    void free(Int64 size)
    {
        if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
        {
            Int64 & untracked = DB::CurrentThread::getUntrackedMemory();
            untracked -= size;
            if (untracked < -untracked_memory_limit)
            {
                memory_tracker->free(-untracked);
                untracked = 0;
            }
        }
    }
}
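/** The helpers above are meant to be called from process-wide allocation hooks rather than directly.
  * As a rough sketch (illustrative only, not the actual hooks, which live outside this file and may differ),
  * a global operator new/delete pair could forward sizes to them like this:
  *
  *     void * operator new(std::size_t size)
  *     {
  *         CurrentMemoryTracker::alloc(size);   /// may throw MEMORY_LIMIT_EXCEEDED before any memory is obtained
  *         if (void * ptr = std::malloc(size))
  *             return ptr;
  *         throw std::bad_alloc{};
  *     }
  *
  *     void operator delete(void * ptr, std::size_t size) noexcept
  *     {
  *         CurrentMemoryTracker::free(size);
  *         std::free(ptr);
  *     }
  */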

/// Returns a lock that temporarily blocks the current thread's memory tracker (see 'blocker' above),
/// so allocations and deallocations made while it is held are neither accounted nor able to throw.
DB::SimpleActionLock getCurrentMemoryTrackerActionLock()
{
    auto memory_tracker = DB::CurrentThread::getMemoryTracker();
    if (!memory_tracker)
        return {};
    return memory_tracker->blocker.cancel();
}