1#include <cstdlib>
2
3#include "MemoryTracker.h"
4#include <common/likely.h>
5#include <common/logger_useful.h>
6#include <Common/Exception.h>
7#include <Common/formatReadable.h>
8#include <Common/CurrentThread.h>
9#include <IO/WriteHelpers.h>
10
11
namespace DB
{
    /// The error code value is defined centrally elsewhere in the project;
    /// it is only declared here so this file can throw MEMORY_LIMIT_EXCEEDED.
    namespace ErrorCodes
    {
        extern const int MEMORY_LIMIT_EXCEEDED;
    }
}
19
20
/// Log current memory usage only when it crosses another 1 GiB boundary, to avoid flooding the log.
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
/// Each thread could new/delete memory in range of (-untracked_memory_limit, untracked_memory_limit) without access to common counters.
static constexpr Int64 untracked_memory_limit = 4 * 1024 * 1024;
24
25
MemoryTracker::~MemoryTracker()
{
    /// Report the peak only for sub-process-level trackers (e.g. per-query / per-user),
    /// and only if something was actually allocated through this tracker.
    if (static_cast<int>(level) < static_cast<int>(VariableContext::Process) && peak)
    {
        try
        {
            logPeakMemoryUsage();
        }
        catch (...)
        {
            /// Exception in Logger, intentionally swallow.
        }
    }

    /** This is needed for next memory tracker to be consistent with sum of all referring memory trackers.
     *
     * Sometimes, memory tracker could be destroyed before memory was freed, and on destruction, amount > 0.
     * For example, a query could allocate some data and leave it in cache.
     *
     * If memory will be freed outside of context of this memory tracker,
     * but in context of one of the 'next' memory trackers,
     * then memory usage of 'next' memory trackers will be underestimated,
     * because amount will be decreased twice (first - here, second - when real 'free' happens).
     */
    if (auto value = amount.load(std::memory_order_relaxed))
        free(value);
}
53
54
55void MemoryTracker::logPeakMemoryUsage() const
56{
57 LOG_DEBUG(&Logger::get("MemoryTracker"),
58 "Peak memory usage" << (description ? " " + std::string(description) : "")
59 << ": " << formatReadableSizeWithBinarySuffix(peak) << ".");
60}
61
62static void logMemoryUsage(Int64 amount)
63{
64 LOG_DEBUG(&Logger::get("MemoryTracker"),
65 "Current memory usage: " << formatReadableSizeWithBinarySuffix(amount) << ".");
66}
67
68
69
70void MemoryTracker::alloc(Int64 size)
71{
72 if (blocker.isCancelled())
73 return;
74
75 /** Using memory_order_relaxed means that if allocations are done simultaneously,
76 * we allow exception about memory limit exceeded to be thrown only on next allocation.
77 * So, we allow over-allocations.
78 */
79 Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);
80
81 if (metric != CurrentMetrics::end())
82 CurrentMetrics::add(metric, size);
83
84 Int64 current_limit = limit.load(std::memory_order_relaxed);
85
86 /// Using non-thread-safe random number generator. Joint distribution in different threads would not be uniform.
87 /// In this case, it doesn't matter.
88 if (unlikely(fault_probability && drand48() < fault_probability))
89 {
90 free(size);
91
92 /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
93 auto untrack_lock = blocker.cancel();
94
95 std::stringstream message;
96 message << "Memory tracker";
97 if (description)
98 message << " " << description;
99 message << ": fault injected. Would use " << formatReadableSizeWithBinarySuffix(will_be)
100 << " (attempt to allocate chunk of " << size << " bytes)"
101 << ", maximum: " << formatReadableSizeWithBinarySuffix(current_limit);
102
103 throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
104 }
105
106 if (unlikely(current_limit && will_be > current_limit))
107 {
108 free(size);
109
110 /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
111 auto untrack_lock = blocker.cancel();
112
113 std::stringstream message;
114 message << "Memory limit";
115 if (description)
116 message << " " << description;
117 message << " exceeded: would use " << formatReadableSizeWithBinarySuffix(will_be)
118 << " (attempt to allocate chunk of " << size << " bytes)"
119 << ", maximum: " << formatReadableSizeWithBinarySuffix(current_limit);
120
121 throw DB::Exception(message.str(), DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED);
122 }
123
124 auto peak_old = peak.load(std::memory_order_relaxed);
125 if (will_be > peak_old) /// Races doesn't matter. Could rewrite with CAS, but not worth.
126 {
127 peak.store(will_be, std::memory_order_relaxed);
128
129 if (level == VariableContext::Process && will_be / log_peak_memory_usage_every > peak_old / log_peak_memory_usage_every)
130 logMemoryUsage(will_be);
131 }
132
133 if (auto loaded_next = parent.load(std::memory_order_relaxed))
134 loaded_next->alloc(size);
135}
136
137
/// Account `size` bytes being released, propagating to parent trackers and the metric.
void MemoryTracker::free(Int64 size)
{
    /// Tracking may be temporarily suspended (e.g. while an exception message is being built).
    if (blocker.isCancelled())
        return;

    if (level == VariableContext::Thread)
    {
        /// Could become negative if memory allocated in this thread is freed in another one
        amount.fetch_sub(size, std::memory_order_relaxed);
    }
    else
    {
        Int64 new_amount = amount.fetch_sub(size, std::memory_order_relaxed) - size;

        /** Sometimes, query could free some data, that was allocated outside of query context.
         * Example: cache eviction.
         * To avoid negative memory usage, we "saturate" amount.
         * Memory usage will be calculated with some error.
         * NOTE: The code is not atomic. Not worth to fix.
         */
        if (unlikely(new_amount < 0))
        {
            /// new_amount is negative here: add it back to clamp the counter at zero,
            /// and shrink `size` so parents and the metric are decremented by the clamped amount only.
            amount.fetch_sub(new_amount);
            size += new_amount;
        }
    }

    if (auto loaded_next = parent.load(std::memory_order_relaxed))
        loaded_next->free(size);

    if (metric != CurrentMetrics::end())
        CurrentMetrics::sub(metric, size);
}
171
172
173void MemoryTracker::resetCounters()
174{
175 amount.store(0, std::memory_order_relaxed);
176 peak.store(0, std::memory_order_relaxed);
177 limit.store(0, std::memory_order_relaxed);
178}
179
180
181void MemoryTracker::reset()
182{
183 if (metric != CurrentMetrics::end())
184 CurrentMetrics::sub(metric, amount.load(std::memory_order_relaxed));
185
186 resetCounters();
187}
188
189
190void MemoryTracker::setOrRaiseLimit(Int64 value)
191{
192 /// This is just atomic set to maximum.
193 Int64 old_value = limit.load(std::memory_order_relaxed);
194 while (old_value < value && !limit.compare_exchange_weak(old_value, value))
195 ;
196}
197
198
199namespace CurrentMemoryTracker
200{
201 void alloc(Int64 size)
202 {
203 if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
204 {
205 Int64 & untracked = DB::CurrentThread::getUntrackedMemory();
206 untracked += size;
207 if (untracked > untracked_memory_limit)
208 {
209 /// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
210 /// more. It could be usefull for enlarge Exception message in rethrow logic.
211 Int64 tmp = untracked;
212 untracked = 0;
213 memory_tracker->alloc(tmp);
214 }
215 }
216 }
217
218 void realloc(Int64 old_size, Int64 new_size)
219 {
220 Int64 addition = new_size - old_size;
221 if (addition > 0)
222 alloc(addition);
223 else
224 free(-addition);
225 }
226
227 void free(Int64 size)
228 {
229 if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
230 {
231 Int64 & untracked = DB::CurrentThread::getUntrackedMemory();
232 untracked -= size;
233 if (untracked < -untracked_memory_limit)
234 {
235 memory_tracker->free(-untracked);
236 untracked = 0;
237 }
238 }
239 }
240}
241
242DB::SimpleActionLock getCurrentMemoryTrackerActionLock()
243{
244 auto memory_tracker = DB::CurrentThread::getMemoryTracker();
245 if (!memory_tracker)
246 return {};
247 return memory_tracker->blocker.cancel();
248}
249