| 1 | #include <cstdlib> | 
|---|
| 2 |  | 
|---|
| 3 | #include "MemoryTracker.h" | 
|---|
| 4 | #include <common/likely.h> | 
|---|
| 5 | #include <common/logger_useful.h> | 
|---|
| 6 | #include <Common/Exception.h> | 
|---|
| 7 | #include <Common/formatReadable.h> | 
|---|
| 8 | #include <Common/CurrentThread.h> | 
|---|
| 9 | #include <IO/WriteHelpers.h> | 
|---|
| 10 |  | 
|---|
| 11 |  | 
|---|
namespace DB
{
    /// Forward declaration of the error code attached to the exceptions thrown by MemoryTracker::alloc in this file.
    /// Only declared here (extern); the definition lives outside this file.
    namespace ErrorCodes
    {
        extern const int MEMORY_LIMIT_EXCEEDED;
    }
}
|---|
| 19 |  | 
|---|
| 20 |  | 
|---|
/// Granularity (1 GiB) of the "Current memory usage" log line: MemoryTracker::alloc logs for a process-level tracker
/// each time a new peak falls into a higher multiple of this value than the previous peak did.
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
/// Each thread could new/delete memory in range of (-untracked_memory_limit, untracked_memory_limit) without access to common counters.
/// The CurrentMemoryTracker functions below accumulate sizes in a per-thread counter and only call into the
/// thread's MemoryTracker once that counter leaves this range (4 MiB in either direction).
static constexpr Int64 untracked_memory_limit = 4 * 1024 * 1024;
|---|
| 24 |  | 
|---|
| 25 |  | 
|---|
| 26 | MemoryTracker::~MemoryTracker() | 
|---|
| 27 | { | 
|---|
| 28 | if (static_cast<int>(level) < static_cast<int>(VariableContext::Process) && peak) | 
|---|
| 29 | { | 
|---|
| 30 | try | 
|---|
| 31 | { | 
|---|
| 32 | logPeakMemoryUsage(); | 
|---|
| 33 | } | 
|---|
| 34 | catch (...) | 
|---|
| 35 | { | 
|---|
| 36 | /// Exception in Logger, intentionally swallow. | 
|---|
| 37 | } | 
|---|
| 38 | } | 
|---|
| 39 |  | 
|---|
| 40 | /** This is needed for next memory tracker to be consistent with sum of all referring memory trackers. | 
|---|
| 41 | * | 
|---|
| 42 | * Sometimes, memory tracker could be destroyed before memory was freed, and on destruction, amount > 0. | 
|---|
| 43 | * For example, a query could allocate some data and leave it in cache. | 
|---|
| 44 | * | 
|---|
| 45 | * If memory will be freed outside of context of this memory tracker, | 
|---|
| 46 | *  but in context of one of the 'next' memory trackers, | 
|---|
| 47 | *  then memory usage of 'next' memory trackers will be underestimated, | 
|---|
| 48 | *  because amount will be decreased twice (first - here, second - when real 'free' happens). | 
|---|
| 49 | */ | 
|---|
| 50 | if (auto value = amount.load(std::memory_order_relaxed)) | 
|---|
| 51 | free(value); | 
|---|
| 52 | } | 
|---|
| 53 |  | 
|---|
| 54 |  | 
|---|
| 55 | void MemoryTracker::logPeakMemoryUsage() const | 
|---|
| 56 | { | 
|---|
| 57 | LOG_DEBUG(&Logger::get( "MemoryTracker"), | 
|---|
| 58 | "Peak memory usage"<< (description ? " "+ std::string(description) : "") | 
|---|
| 59 | << ": "<< formatReadableSizeWithBinarySuffix(peak) << "."); | 
|---|
| 60 | } | 
|---|
| 61 |  | 
|---|
| 62 | static void logMemoryUsage(Int64 amount) | 
|---|
| 63 | { | 
|---|
| 64 | LOG_DEBUG(&Logger::get( "MemoryTracker"), | 
|---|
| 65 | "Current memory usage: "<< formatReadableSizeWithBinarySuffix(amount) << "."); | 
|---|
| 66 | } | 
|---|
| 67 |  | 
|---|
| 68 |  | 
|---|
| 69 |  | 
|---|
| 70 | void MemoryTracker::alloc(Int64 size) | 
|---|
| 71 | { | 
|---|
| 72 | if (blocker.isCancelled()) | 
|---|
| 73 | return; | 
|---|
| 74 |  | 
|---|
| 75 | /** Using memory_order_relaxed means that if allocations are done simultaneously, | 
|---|
| 76 | *  we allow exception about memory limit exceeded to be thrown only on next allocation. | 
|---|
| 77 | * So, we allow over-allocations. | 
|---|
| 78 | */ | 
|---|
| 79 | Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed); | 
|---|
| 80 |  | 
|---|
| 81 | if (metric != CurrentMetrics::end()) | 
|---|
| 82 | CurrentMetrics::add(metric, size); | 
|---|
| 83 |  | 
|---|
| 84 | Int64 current_limit = limit.load(std::memory_order_relaxed); | 
|---|
| 85 |  | 
|---|
| 86 | /// Using non-thread-safe random number generator. Joint distribution in different threads would not be uniform. | 
|---|
| 87 | /// In this case, it doesn't matter. | 
|---|
| 88 | if (unlikely(fault_probability && drand48() < fault_probability)) | 
|---|
| 89 | { | 
|---|
| 90 | free(size); | 
|---|
| 91 |  | 
|---|
| 92 | /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc | 
|---|
| 93 | auto untrack_lock = blocker.cancel(); | 
|---|
| 94 |  | 
|---|
| 95 | std::stringstream message; | 
|---|
| 96 | message << "Memory tracker"; | 
|---|
| 97 | if (description) | 
|---|
| 98 | message << " "<< description; | 
|---|
| 99 | message << ": fault injected. Would use "<< formatReadableSizeWithBinarySuffix(will_be) | 
|---|
| 100 | << " (attempt to allocate chunk of "<< size << " bytes)" | 
|---|
| 101 | << ", maximum: "<< formatReadableSizeWithBinarySuffix(current_limit); | 
|---|
| 102 |  | 
|---|
| 103 | throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED); | 
|---|
| 104 | } | 
|---|
| 105 |  | 
|---|
| 106 | if (unlikely(current_limit && will_be > current_limit)) | 
|---|
| 107 | { | 
|---|
| 108 | free(size); | 
|---|
| 109 |  | 
|---|
| 110 | /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc | 
|---|
| 111 | auto untrack_lock = blocker.cancel(); | 
|---|
| 112 |  | 
|---|
| 113 | std::stringstream message; | 
|---|
| 114 | message << "Memory limit"; | 
|---|
| 115 | if (description) | 
|---|
| 116 | message << " "<< description; | 
|---|
| 117 | message << " exceeded: would use "<< formatReadableSizeWithBinarySuffix(will_be) | 
|---|
| 118 | << " (attempt to allocate chunk of "<< size << " bytes)" | 
|---|
| 119 | << ", maximum: "<< formatReadableSizeWithBinarySuffix(current_limit); | 
|---|
| 120 |  | 
|---|
| 121 | throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED); | 
|---|
| 122 | } | 
|---|
| 123 |  | 
|---|
| 124 | auto peak_old = peak.load(std::memory_order_relaxed); | 
|---|
| 125 | if (will_be > peak_old)        /// Races doesn't matter. Could rewrite with CAS, but not worth. | 
|---|
| 126 | { | 
|---|
| 127 | peak.store(will_be, std::memory_order_relaxed); | 
|---|
| 128 |  | 
|---|
| 129 | if (level == VariableContext::Process && will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every) | 
|---|
| 130 | logMemoryUsage(will_be); | 
|---|
| 131 | } | 
|---|
| 132 |  | 
|---|
| 133 | if (auto loaded_next = parent.load(std::memory_order_relaxed)) | 
|---|
| 134 | loaded_next->alloc(size); | 
|---|
| 135 | } | 
|---|
| 136 |  | 
|---|
| 137 |  | 
|---|
| 138 | void MemoryTracker::free(Int64 size) | 
|---|
| 139 | { | 
|---|
| 140 | if (blocker.isCancelled()) | 
|---|
| 141 | return; | 
|---|
| 142 |  | 
|---|
| 143 | if (level == VariableContext::Thread) | 
|---|
| 144 | { | 
|---|
| 145 | /// Could become negative if memory allocated in this thread is freed in another one | 
|---|
| 146 | amount.fetch_sub(size, std::memory_order_relaxed); | 
|---|
| 147 | } | 
|---|
| 148 | else | 
|---|
| 149 | { | 
|---|
| 150 | Int64 new_amount = amount.fetch_sub(size, std::memory_order_relaxed) - size; | 
|---|
| 151 |  | 
|---|
| 152 | /** Sometimes, query could free some data, that was allocated outside of query context. | 
|---|
| 153 | * Example: cache eviction. | 
|---|
| 154 | * To avoid negative memory usage, we "saturate" amount. | 
|---|
| 155 | * Memory usage will be calculated with some error. | 
|---|
| 156 | * NOTE: The code is not atomic. Not worth to fix. | 
|---|
| 157 | */ | 
|---|
| 158 | if (unlikely(new_amount < 0)) | 
|---|
| 159 | { | 
|---|
| 160 | amount.fetch_sub(new_amount); | 
|---|
| 161 | size += new_amount; | 
|---|
| 162 | } | 
|---|
| 163 | } | 
|---|
| 164 |  | 
|---|
| 165 | if (auto loaded_next = parent.load(std::memory_order_relaxed)) | 
|---|
| 166 | loaded_next->free(size); | 
|---|
| 167 |  | 
|---|
| 168 | if (metric != CurrentMetrics::end()) | 
|---|
| 169 | CurrentMetrics::sub(metric, size); | 
|---|
| 170 | } | 
|---|
| 171 |  | 
|---|
| 172 |  | 
|---|
| 173 | void MemoryTracker::resetCounters() | 
|---|
| 174 | { | 
|---|
| 175 | amount.store(0, std::memory_order_relaxed); | 
|---|
| 176 | peak.store(0, std::memory_order_relaxed); | 
|---|
| 177 | limit.store(0, std::memory_order_relaxed); | 
|---|
| 178 | } | 
|---|
| 179 |  | 
|---|
| 180 |  | 
|---|
| 181 | void MemoryTracker::reset() | 
|---|
| 182 | { | 
|---|
| 183 | if (metric != CurrentMetrics::end()) | 
|---|
| 184 | CurrentMetrics::sub(metric, amount.load(std::memory_order_relaxed)); | 
|---|
| 185 |  | 
|---|
| 186 | resetCounters(); | 
|---|
| 187 | } | 
|---|
| 188 |  | 
|---|
| 189 |  | 
|---|
| 190 | void MemoryTracker::setOrRaiseLimit(Int64 value) | 
|---|
| 191 | { | 
|---|
| 192 | /// This is just atomic set to maximum. | 
|---|
| 193 | Int64 old_value = limit.load(std::memory_order_relaxed); | 
|---|
| 194 | while (old_value < value && !limit.compare_exchange_weak(old_value, value)) | 
|---|
| 195 | ; | 
|---|
| 196 | } | 
|---|
| 197 |  | 
|---|
| 198 |  | 
|---|
| 199 | namespace CurrentMemoryTracker | 
|---|
| 200 | { | 
|---|
| 201 | void alloc(Int64 size) | 
|---|
| 202 | { | 
|---|
| 203 | if (auto memory_tracker = DB::CurrentThread::getMemoryTracker()) | 
|---|
| 204 | { | 
|---|
| 205 | Int64 & untracked = DB::CurrentThread::getUntrackedMemory(); | 
|---|
| 206 | untracked += size; | 
|---|
| 207 | if (untracked > untracked_memory_limit) | 
|---|
| 208 | { | 
|---|
| 209 | /// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes | 
|---|
| 210 | /// more. It could be usefull for enlarge Exception message in rethrow logic. | 
|---|
| 211 | Int64 tmp = untracked; | 
|---|
| 212 | untracked = 0; | 
|---|
| 213 | memory_tracker->alloc(tmp); | 
|---|
| 214 | } | 
|---|
| 215 | } | 
|---|
| 216 | } | 
|---|
| 217 |  | 
|---|
| 218 | void realloc(Int64 old_size, Int64 new_size) | 
|---|
| 219 | { | 
|---|
| 220 | Int64 addition = new_size - old_size; | 
|---|
| 221 | if (addition > 0) | 
|---|
| 222 | alloc(addition); | 
|---|
| 223 | else | 
|---|
| 224 | free(-addition); | 
|---|
| 225 | } | 
|---|
| 226 |  | 
|---|
| 227 | void free(Int64 size) | 
|---|
| 228 | { | 
|---|
| 229 | if (auto memory_tracker = DB::CurrentThread::getMemoryTracker()) | 
|---|
| 230 | { | 
|---|
| 231 | Int64 & untracked = DB::CurrentThread::getUntrackedMemory(); | 
|---|
| 232 | untracked -= size; | 
|---|
| 233 | if (untracked < -untracked_memory_limit) | 
|---|
| 234 | { | 
|---|
| 235 | memory_tracker->free(-untracked); | 
|---|
| 236 | untracked = 0; | 
|---|
| 237 | } | 
|---|
| 238 | } | 
|---|
| 239 | } | 
|---|
| 240 | } | 
|---|
| 241 |  | 
|---|
| 242 | DB::SimpleActionLock getCurrentMemoryTrackerActionLock() | 
|---|
| 243 | { | 
|---|
| 244 | auto memory_tracker = DB::CurrentThread::getMemoryTracker(); | 
|---|
| 245 | if (!memory_tracker) | 
|---|
| 246 | return {}; | 
|---|
| 247 | return memory_tracker->blocker.cancel(); | 
|---|
| 248 | } | 
|---|
| 249 |  | 
|---|