1 | #include <Interpreters/AsynchronousMetrics.h> |
2 | #include <Interpreters/ExpressionJIT.h> |
3 | #include <Common/Exception.h> |
4 | #include <Common/setThreadName.h> |
5 | #include <Common/CurrentMetrics.h> |
6 | #include <Common/typeid_cast.h> |
7 | #include "config_core.h" |
8 | #include <Storages/MarkCache.h> |
9 | #include <Storages/StorageMergeTree.h> |
10 | #include <Storages/StorageReplicatedMergeTree.h> |
11 | #include <IO/UncompressedCache.h> |
12 | #include <Databases/IDatabase.h> |
13 | #include <chrono> |
14 | |
15 | #if __has_include(<common/config_common.h>) |
16 | #include <common/config_common.h> |
17 | #endif |
18 | |
19 | #if USE_JEMALLOC |
20 | #include <jemalloc/jemalloc.h> |
21 | #endif |
22 | |
23 | |
24 | namespace DB |
25 | { |
26 | |
27 | AsynchronousMetrics::~AsynchronousMetrics() |
28 | { |
29 | try |
30 | { |
31 | { |
32 | std::lock_guard lock{wait_mutex}; |
33 | quit = true; |
34 | } |
35 | |
36 | wait_cond.notify_one(); |
37 | thread.join(); |
38 | } |
39 | catch (...) |
40 | { |
41 | DB::tryLogCurrentException(__PRETTY_FUNCTION__); |
42 | } |
43 | } |
44 | |
45 | |
46 | AsynchronousMetrics::Container AsynchronousMetrics::getValues() const |
47 | { |
48 | std::lock_guard lock{container_mutex}; |
49 | return container; |
50 | } |
51 | |
52 | |
53 | void AsynchronousMetrics::set(const std::string & name, Value value) |
54 | { |
55 | std::lock_guard lock{container_mutex}; |
56 | container[name] = value; |
57 | } |
58 | |
59 | |
60 | void AsynchronousMetrics::run() |
61 | { |
62 | setThreadName("AsyncMetrics" ); |
63 | |
64 | std::unique_lock lock{wait_mutex}; |
65 | |
66 | /// Next minute + 30 seconds. To be distant with moment of transmission of metrics, see MetricsTransmitter. |
67 | const auto get_next_minute = [] |
68 | { |
69 | return std::chrono::time_point_cast<std::chrono::minutes, std::chrono::system_clock>( |
70 | std::chrono::system_clock::now() + std::chrono::minutes(1)) + std::chrono::seconds(30); |
71 | }; |
72 | |
73 | while (true) |
74 | { |
75 | try |
76 | { |
77 | update(); |
78 | } |
79 | catch (...) |
80 | { |
81 | tryLogCurrentException(__PRETTY_FUNCTION__); |
82 | } |
83 | |
84 | if (wait_cond.wait_until(lock, get_next_minute(), [this] { return quit; })) |
85 | break; |
86 | } |
87 | } |
88 | |
89 | |
/// Raises the running maximum `max` to `x` when `x`, converted to the accumulator type, exceeds it.
template <typename Max, typename T>
static void calculateMax(Max & max, T x)
{
    const Max candidate = Max(x);
    if (max < candidate)
        max = candidate;
}
96 | |
/// Updates two aggregates with one sample `x`:
/// the running maximum `max` (compared in the accumulator type) and the running total `sum`.
template <typename Max, typename Sum, typename T>
static void calculateMaxAndSum(Max & max, Sum & sum, T x)
{
    const Max as_max = Max(x);
    if (max < as_max)
        max = as_max;

    sum += x;
}
104 | |
105 | |
106 | void AsynchronousMetrics::update() |
107 | { |
108 | { |
109 | if (auto mark_cache = context.getMarkCache()) |
110 | { |
111 | set("MarkCacheBytes" , mark_cache->weight()); |
112 | set("MarkCacheFiles" , mark_cache->count()); |
113 | } |
114 | } |
115 | |
116 | { |
117 | if (auto uncompressed_cache = context.getUncompressedCache()) |
118 | { |
119 | set("UncompressedCacheBytes" , uncompressed_cache->weight()); |
120 | set("UncompressedCacheCells" , uncompressed_cache->count()); |
121 | } |
122 | } |
123 | |
124 | #if USE_EMBEDDED_COMPILER |
125 | { |
126 | if (auto compiled_expression_cache = context.getCompiledExpressionCache()) |
127 | set("CompiledExpressionCacheCount" , compiled_expression_cache->count()); |
128 | } |
129 | #endif |
130 | |
131 | set("Uptime" , context.getUptimeSeconds()); |
132 | |
133 | { |
134 | auto databases = context.getDatabases(); |
135 | |
136 | size_t max_queue_size = 0; |
137 | size_t max_inserts_in_queue = 0; |
138 | size_t max_merges_in_queue = 0; |
139 | |
140 | size_t sum_queue_size = 0; |
141 | size_t sum_inserts_in_queue = 0; |
142 | size_t sum_merges_in_queue = 0; |
143 | |
144 | size_t max_absolute_delay = 0; |
145 | size_t max_relative_delay = 0; |
146 | |
147 | size_t max_part_count_for_partition = 0; |
148 | |
149 | size_t number_of_databases = databases.size(); |
150 | size_t total_number_of_tables = 0; |
151 | |
152 | for (const auto & db : databases) |
153 | { |
154 | /// Lazy database can not contain MergeTree tables |
155 | if (db.second->getEngineName() == "Lazy" ) |
156 | continue; |
157 | for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) |
158 | { |
159 | ++total_number_of_tables; |
160 | auto & table = iterator->table(); |
161 | StorageMergeTree * table_merge_tree = dynamic_cast<StorageMergeTree *>(table.get()); |
162 | StorageReplicatedMergeTree * table_replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(table.get()); |
163 | |
164 | if (table_replicated_merge_tree) |
165 | { |
166 | StorageReplicatedMergeTree::Status status; |
167 | table_replicated_merge_tree->getStatus(status, false); |
168 | |
169 | calculateMaxAndSum(max_queue_size, sum_queue_size, status.queue.queue_size); |
170 | calculateMaxAndSum(max_inserts_in_queue, sum_inserts_in_queue, status.queue.inserts_in_queue); |
171 | calculateMaxAndSum(max_merges_in_queue, sum_merges_in_queue, status.queue.merges_in_queue); |
172 | |
173 | if (!status.is_readonly) |
174 | { |
175 | try |
176 | { |
177 | time_t absolute_delay = 0; |
178 | time_t relative_delay = 0; |
179 | table_replicated_merge_tree->getReplicaDelays(absolute_delay, relative_delay); |
180 | |
181 | calculateMax(max_absolute_delay, absolute_delay); |
182 | calculateMax(max_relative_delay, relative_delay); |
183 | } |
184 | catch (...) |
185 | { |
186 | tryLogCurrentException(__PRETTY_FUNCTION__, |
187 | "Cannot get replica delay for table: " + backQuoteIfNeed(db.first) + "." + backQuoteIfNeed(iterator->name())); |
188 | } |
189 | } |
190 | |
191 | calculateMax(max_part_count_for_partition, table_replicated_merge_tree->getMaxPartsCountForPartition()); |
192 | } |
193 | |
194 | if (table_merge_tree) |
195 | { |
196 | calculateMax(max_part_count_for_partition, table_merge_tree->getMaxPartsCountForPartition()); |
197 | } |
198 | } |
199 | } |
200 | |
201 | set("ReplicasMaxQueueSize" , max_queue_size); |
202 | set("ReplicasMaxInsertsInQueue" , max_inserts_in_queue); |
203 | set("ReplicasMaxMergesInQueue" , max_merges_in_queue); |
204 | |
205 | set("ReplicasSumQueueSize" , sum_queue_size); |
206 | set("ReplicasSumInsertsInQueue" , sum_inserts_in_queue); |
207 | set("ReplicasSumMergesInQueue" , sum_merges_in_queue); |
208 | |
209 | set("ReplicasMaxAbsoluteDelay" , max_absolute_delay); |
210 | set("ReplicasMaxRelativeDelay" , max_relative_delay); |
211 | |
212 | set("MaxPartCountForPartition" , max_part_count_for_partition); |
213 | |
214 | set("NumberOfDatabases" , number_of_databases); |
215 | set("NumberOfTables" , total_number_of_tables); |
216 | } |
217 | |
218 | #if USE_JEMALLOC |
219 | { |
220 | #define FOR_EACH_METRIC(M) \ |
221 | M("allocated", size_t) \ |
222 | M("active", size_t) \ |
223 | M("metadata", size_t) \ |
224 | M("metadata_thp", size_t) \ |
225 | M("resident", size_t) \ |
226 | M("mapped", size_t) \ |
227 | M("retained", size_t) \ |
228 | M("background_thread.num_threads", size_t) \ |
229 | M("background_thread.num_runs", uint64_t) \ |
230 | M("background_thread.run_interval", uint64_t) \ |
231 | |
232 | #define GET_METRIC(NAME, TYPE) \ |
233 | do \ |
234 | { \ |
235 | TYPE value{}; \ |
236 | size_t size = sizeof(value); \ |
237 | mallctl("stats." NAME, &value, &size, nullptr, 0); \ |
238 | set("jemalloc." NAME, value); \ |
239 | } while (0); |
240 | |
241 | FOR_EACH_METRIC(GET_METRIC) |
242 | |
243 | #undef GET_METRIC |
244 | #undef FOR_EACH_METRIC |
245 | } |
246 | #endif |
247 | |
248 | /// Add more metrics as you wish. |
249 | } |
250 | |
251 | |
252 | } |
253 | |