1#include <Interpreters/AsynchronousMetrics.h>
2#include <Interpreters/ExpressionJIT.h>
3#include <Common/Exception.h>
4#include <Common/setThreadName.h>
5#include <Common/CurrentMetrics.h>
6#include <Common/typeid_cast.h>
7#include "config_core.h"
8#include <Storages/MarkCache.h>
9#include <Storages/StorageMergeTree.h>
10#include <Storages/StorageReplicatedMergeTree.h>
11#include <IO/UncompressedCache.h>
12#include <Databases/IDatabase.h>
13#include <chrono>
14
15#if __has_include(<common/config_common.h>)
16#include <common/config_common.h>
17#endif
18
19#if USE_JEMALLOC
20 #include <jemalloc/jemalloc.h>
21#endif
22
23
24namespace DB
25{
26
27AsynchronousMetrics::~AsynchronousMetrics()
28{
29 try
30 {
31 {
32 std::lock_guard lock{wait_mutex};
33 quit = true;
34 }
35
36 wait_cond.notify_one();
37 thread.join();
38 }
39 catch (...)
40 {
41 DB::tryLogCurrentException(__PRETTY_FUNCTION__);
42 }
43}
44
45
46AsynchronousMetrics::Container AsynchronousMetrics::getValues() const
47{
48 std::lock_guard lock{container_mutex};
49 return container;
50}
51
52
53void AsynchronousMetrics::set(const std::string & name, Value value)
54{
55 std::lock_guard lock{container_mutex};
56 container[name] = value;
57}
58
59
60void AsynchronousMetrics::run()
61{
62 setThreadName("AsyncMetrics");
63
64 std::unique_lock lock{wait_mutex};
65
66 /// Next minute + 30 seconds. To be distant with moment of transmission of metrics, see MetricsTransmitter.
67 const auto get_next_minute = []
68 {
69 return std::chrono::time_point_cast<std::chrono::minutes, std::chrono::system_clock>(
70 std::chrono::system_clock::now() + std::chrono::minutes(1)) + std::chrono::seconds(30);
71 };
72
73 while (true)
74 {
75 try
76 {
77 update();
78 }
79 catch (...)
80 {
81 tryLogCurrentException(__PRETTY_FUNCTION__);
82 }
83
84 if (wait_cond.wait_until(lock, get_next_minute(), [this] { return quit; }))
85 break;
86 }
87}
88
89
/// Raises `max` to `x` if `x`, converted to the accumulator type `Max`, is greater.
/// The comparison is done in `Max`, so `x` is converted before comparing.
template <typename Max, typename T>
static void calculateMax(Max & max, T x)
{
    const Max candidate = static_cast<Max>(x);

    if (candidate > max)
        max = candidate;
}
96
/// Accumulates one observation into a running total and a running maximum:
/// adds `x` to `sum`, then raises `max` to `x` if `x` (converted to `Max`) is greater.
template <typename Max, typename Sum, typename T>
static void calculateMaxAndSum(Max & max, Sum & sum, T x)
{
    sum += x;

    const Max candidate = static_cast<Max>(x);
    if (candidate > max)
        max = candidate;
}
104
105
106void AsynchronousMetrics::update()
107{
108 {
109 if (auto mark_cache = context.getMarkCache())
110 {
111 set("MarkCacheBytes", mark_cache->weight());
112 set("MarkCacheFiles", mark_cache->count());
113 }
114 }
115
116 {
117 if (auto uncompressed_cache = context.getUncompressedCache())
118 {
119 set("UncompressedCacheBytes", uncompressed_cache->weight());
120 set("UncompressedCacheCells", uncompressed_cache->count());
121 }
122 }
123
124#if USE_EMBEDDED_COMPILER
125 {
126 if (auto compiled_expression_cache = context.getCompiledExpressionCache())
127 set("CompiledExpressionCacheCount", compiled_expression_cache->count());
128 }
129#endif
130
131 set("Uptime", context.getUptimeSeconds());
132
133 {
134 auto databases = context.getDatabases();
135
136 size_t max_queue_size = 0;
137 size_t max_inserts_in_queue = 0;
138 size_t max_merges_in_queue = 0;
139
140 size_t sum_queue_size = 0;
141 size_t sum_inserts_in_queue = 0;
142 size_t sum_merges_in_queue = 0;
143
144 size_t max_absolute_delay = 0;
145 size_t max_relative_delay = 0;
146
147 size_t max_part_count_for_partition = 0;
148
149 size_t number_of_databases = databases.size();
150 size_t total_number_of_tables = 0;
151
152 for (const auto & db : databases)
153 {
154 /// Lazy database can not contain MergeTree tables
155 if (db.second->getEngineName() == "Lazy")
156 continue;
157 for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
158 {
159 ++total_number_of_tables;
160 auto & table = iterator->table();
161 StorageMergeTree * table_merge_tree = dynamic_cast<StorageMergeTree *>(table.get());
162 StorageReplicatedMergeTree * table_replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(table.get());
163
164 if (table_replicated_merge_tree)
165 {
166 StorageReplicatedMergeTree::Status status;
167 table_replicated_merge_tree->getStatus(status, false);
168
169 calculateMaxAndSum(max_queue_size, sum_queue_size, status.queue.queue_size);
170 calculateMaxAndSum(max_inserts_in_queue, sum_inserts_in_queue, status.queue.inserts_in_queue);
171 calculateMaxAndSum(max_merges_in_queue, sum_merges_in_queue, status.queue.merges_in_queue);
172
173 if (!status.is_readonly)
174 {
175 try
176 {
177 time_t absolute_delay = 0;
178 time_t relative_delay = 0;
179 table_replicated_merge_tree->getReplicaDelays(absolute_delay, relative_delay);
180
181 calculateMax(max_absolute_delay, absolute_delay);
182 calculateMax(max_relative_delay, relative_delay);
183 }
184 catch (...)
185 {
186 tryLogCurrentException(__PRETTY_FUNCTION__,
187 "Cannot get replica delay for table: " + backQuoteIfNeed(db.first) + "." + backQuoteIfNeed(iterator->name()));
188 }
189 }
190
191 calculateMax(max_part_count_for_partition, table_replicated_merge_tree->getMaxPartsCountForPartition());
192 }
193
194 if (table_merge_tree)
195 {
196 calculateMax(max_part_count_for_partition, table_merge_tree->getMaxPartsCountForPartition());
197 }
198 }
199 }
200
201 set("ReplicasMaxQueueSize", max_queue_size);
202 set("ReplicasMaxInsertsInQueue", max_inserts_in_queue);
203 set("ReplicasMaxMergesInQueue", max_merges_in_queue);
204
205 set("ReplicasSumQueueSize", sum_queue_size);
206 set("ReplicasSumInsertsInQueue", sum_inserts_in_queue);
207 set("ReplicasSumMergesInQueue", sum_merges_in_queue);
208
209 set("ReplicasMaxAbsoluteDelay", max_absolute_delay);
210 set("ReplicasMaxRelativeDelay", max_relative_delay);
211
212 set("MaxPartCountForPartition", max_part_count_for_partition);
213
214 set("NumberOfDatabases", number_of_databases);
215 set("NumberOfTables", total_number_of_tables);
216 }
217
218#if USE_JEMALLOC
219 {
220 #define FOR_EACH_METRIC(M) \
221 M("allocated", size_t) \
222 M("active", size_t) \
223 M("metadata", size_t) \
224 M("metadata_thp", size_t) \
225 M("resident", size_t) \
226 M("mapped", size_t) \
227 M("retained", size_t) \
228 M("background_thread.num_threads", size_t) \
229 M("background_thread.num_runs", uint64_t) \
230 M("background_thread.run_interval", uint64_t) \
231
232 #define GET_METRIC(NAME, TYPE) \
233 do \
234 { \
235 TYPE value{}; \
236 size_t size = sizeof(value); \
237 mallctl("stats." NAME, &value, &size, nullptr, 0); \
238 set("jemalloc." NAME, value); \
239 } while (0);
240
241 FOR_EACH_METRIC(GET_METRIC)
242
243 #undef GET_METRIC
244 #undef FOR_EACH_METRIC
245 }
246#endif
247
248 /// Add more metrics as you wish.
249}
250
251
252}
253