#include <cstdlib>

#include "MemoryTracker.h"
#include <common/likely.h>
#include <common/logger_useful.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <Common/CurrentThread.h>
#include <IO/WriteHelpers.h>


namespace DB
{
namespace ErrorCodes
{
    extern const int MEMORY_LIMIT_EXCEEDED;
}
}


static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
/// Each thread can allocate or free memory within (-untracked_memory_limit, untracked_memory_limit) without touching the common counters.
static constexpr Int64 untracked_memory_limit = 4 * 1024 * 1024;
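/// Illustrative arithmetic: with a 4 MiB threshold, a thread making 1 KiB allocations
/// touches the shared atomic counters only about once per 4096 allocations; everything
/// in between is batched in a thread-local delta (see CurrentMemoryTracker below).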


MemoryTracker::~MemoryTracker()
{
    if (static_cast<int>(level) < static_cast<int>(VariableContext::Process) && peak)
    {
        try
        {
            logPeakMemoryUsage();
        }
        catch (...)
        {
            /// Exception in Logger, intentionally swallowed.
        }
    }

    /** This is needed for the next memory tracker to stay consistent with the sum of all referring memory trackers.
      *
      * Sometimes a memory tracker is destroyed before the memory it tracked was freed, so on destruction, amount > 0.
      * For example, a query could allocate some data and leave it in a cache.
      *
      * If that memory is later freed outside the context of this memory tracker,
      * but in the context of one of the 'next' memory trackers,
      * then the memory usage of the 'next' memory trackers will be underestimated,
      * because the amount will be decreased twice (first here, second when the real 'free' happens).
      */
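    /// Release the residual amount through the regular free() path, which also propagates
    /// the decrement to the parent trackers.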
    if (auto value = amount.load(std::memory_order_relaxed))
        free(value);
}


void MemoryTracker::logPeakMemoryUsage() const
{
    LOG_DEBUG(&Logger::get("MemoryTracker"),
        "Peak memory usage" << (description ? " " + std::string(description) : "")
        << ": " << formatReadableSizeWithBinarySuffix(peak) << ".");
}

static void logMemoryUsage(Int64 amount)
{
    LOG_DEBUG(&Logger::get("MemoryTracker"),
        "Current memory usage: " << formatReadableSizeWithBinarySuffix(amount) << ".");
}


void MemoryTracker::alloc(Int64 size)
{
    if (blocker.isCancelled())
        return;

    /** Using memory_order_relaxed means that if allocations are done simultaneously,
      * we allow the "memory limit exceeded" exception to be thrown only on the next allocation.
      * So we allow over-allocation.
      */
    Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);

    if (metric != CurrentMetrics::end())
        CurrentMetrics::add(metric, size);

    Int64 current_limit = limit.load(std::memory_order_relaxed);
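    /// A limit of zero means "not set": the check below only triggers for a non-zero limit.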

    /// Using a non-thread-safe random number generator. The joint distribution across threads would not be uniform.
    /// In this case, it doesn't matter.
    if (unlikely(fault_probability && drand48() < fault_probability))
    {
        free(size);

        /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
        auto untrack_lock = blocker.cancel();

        std::stringstream message;
        message << "Memory tracker";
        if (description)
            message << " " << description;
        message << ": fault injected. Would use " << formatReadableSizeWithBinarySuffix(will_be)
            << " (attempt to allocate chunk of " << size << " bytes)"
            << ", maximum: " << formatReadableSizeWithBinarySuffix(current_limit);

        throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
    }

    if (unlikely(current_limit && will_be > current_limit))
    {
        free(size);

        /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
        auto untrack_lock = blocker.cancel();

        std::stringstream message;
        message << "Memory limit";
        if (description)
            message << " " << description;
        message << " exceeded: would use " << formatReadableSizeWithBinarySuffix(will_be)
            << " (attempt to allocate chunk of " << size << " bytes)"
            << ", maximum: " << formatReadableSizeWithBinarySuffix(current_limit);

        throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
    }

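    /// Update the peak. At the Process level, log current usage each time the peak crosses
    /// another log_peak_memory_usage_every (1 GiB) boundary.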
    auto peak_old = peak.load(std::memory_order_relaxed);
    if (will_be > peak_old)        /// Races don't matter. Could be rewritten with CAS, but it's not worth it.
    {
        peak.store(will_be, std::memory_order_relaxed);

        if (level == VariableContext::Process && will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every)
            logMemoryUsage(will_be);
    }

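    /// Propagate the allocation up the chain of parent trackers (for example, a thread-level
    /// tracker into its query-level parent), so that each level enforces its own limit.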
    if (auto loaded_next = parent.load(std::memory_order_relaxed))
        loaded_next->alloc(size);
}


void MemoryTracker::free(Int64 size)
{
    if (blocker.isCancelled())
        return;

    if (level == VariableContext::Thread)
    {
        /// Could become negative if memory allocated in this thread is freed in another one.
        amount.fetch_sub(size, std::memory_order_relaxed);
    }
    else
    {
        Int64 new_amount = amount.fetch_sub(size, std::memory_order_relaxed) - size;

        /** Sometimes a query frees data that was allocated outside of the query context.
          * Example: cache eviction.
          * To avoid negative memory usage, we "saturate" the amount.
          * Memory usage will be calculated with some error.
          * NOTE: The code is not atomic. Not worth fixing.
          */
        if (unlikely(new_amount < 0))
        {
            amount.fetch_sub(new_amount);
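            /// Reduce size by the overshoot (new_amount is negative here), so only the part
            /// that was actually tracked is subtracted from the parent and from the metric.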
            size += new_amount;
        }
    }

    if (auto loaded_next = parent.load(std::memory_order_relaxed))
        loaded_next->free(size);

    if (metric != CurrentMetrics::end())
        CurrentMetrics::sub(metric, size);
}


void MemoryTracker::resetCounters()
{
    amount.store(0, std::memory_order_relaxed);
    peak.store(0, std::memory_order_relaxed);
    limit.store(0, std::memory_order_relaxed);
}


void MemoryTracker::reset()
{
    if (metric != CurrentMetrics::end())
        CurrentMetrics::sub(metric, amount.load(std::memory_order_relaxed));

    resetCounters();
}


void MemoryTracker::setOrRaiseLimit(Int64 value)
{
    /// This is just an atomic "set to maximum".
    Int64 old_value = limit.load(std::memory_order_relaxed);
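    /// compare_exchange_weak reloads old_value on failure, so the loop exits as soon as the
    /// new value is stored or another thread has already raised the limit to at least `value`.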
    while (old_value < value && !limit.compare_exchange_weak(old_value, value))
        ;
}


namespace CurrentMemoryTracker
{
    void alloc(Int64 size)
    {
        if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
        {
            Int64 & untracked = DB::CurrentThread::getUntrackedMemory();
            untracked += size;
            if (untracked > untracked_memory_limit)
            {
                /// Zero out `untracked` before tracking. If the tracker throws an out-of-limit
                /// exception, we can still allocate up to untracked_memory_limit more bytes,
                /// which is useful for building a larger Exception message in rethrow logic.
                Int64 tmp = untracked;
                untracked = 0;
                memory_tracker->alloc(tmp);
            }
        }
    }

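    /// Only the size difference needs to be accounted for; it is forwarded to alloc() or free().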
    void realloc(Int64 old_size, Int64 new_size)
    {
        Int64 addition = new_size - old_size;
        if (addition > 0)
            alloc(addition);
        else
            free(-addition);
    }

    void free(Int64 size)
    {
        if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
        {
            Int64 & untracked = DB::CurrentThread::getUntrackedMemory();
            untracked -= size;
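            /// Deallocations accumulate as a negative delta; flush it to the tracker once its
            /// magnitude exceeds untracked_memory_limit.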
            if (untracked < -untracked_memory_limit)
            {
                memory_tracker->free(-untracked);
                untracked = 0;
            }
        }
    }
}

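/// Returns a RAII lock that, while held, turns the current thread's MemoryTracker alloc()/free()
/// into no-ops (see the blocker.isCancelled() checks above). Illustrative usage:
///
///     auto lock = getCurrentMemoryTrackerActionLock();
///     /// ... allocations here are not accounted against the memory limits ...
///
/// Tracking resumes when the lock goes out of scope.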
DB::SimpleActionLock getCurrentMemoryTrackerActionLock()
{
    auto memory_tracker = DB::CurrentThread::getMemoryTracker();
    if (!memory_tracker)
        return {};
    return memory_tracker->blocker.cancel();
}