1 | #include "MetricsTransmitter.h" |
2 | |
3 | #include <Interpreters/AsynchronousMetrics.h> |
4 | |
5 | #include <Common/CurrentMetrics.h> |
6 | #include <Common/Exception.h> |
7 | #include <Common/setThreadName.h> |
8 | |
9 | #include <daemon/BaseDaemon.h> |
10 | |
11 | #include <Poco/Util/Application.h> |
12 | #include <Poco/Util/LayeredConfiguration.h> |
13 | |
14 | |
15 | namespace DB |
16 | { |
17 | |
18 | MetricsTransmitter::MetricsTransmitter( |
19 | const Poco::Util::AbstractConfiguration & config, const std::string & config_name_, const AsynchronousMetrics & async_metrics_) |
20 | : async_metrics(async_metrics_), config_name(config_name_) |
21 | { |
22 | interval_seconds = config.getInt(config_name + ".interval" , 60); |
23 | send_events = config.getBool(config_name + ".events" , true); |
24 | send_events_cumulative = config.getBool(config_name + ".events_cumulative" , false); |
25 | send_metrics = config.getBool(config_name + ".metrics" , true); |
26 | send_asynchronous_metrics = config.getBool(config_name + ".asynchronous_metrics" , true); |
27 | |
28 | thread = ThreadFromGlobalPool{&MetricsTransmitter::run, this}; |
29 | } |
30 | |
31 | |
32 | MetricsTransmitter::~MetricsTransmitter() |
33 | { |
34 | try |
35 | { |
36 | { |
37 | std::lock_guard lock{mutex}; |
38 | quit = true; |
39 | } |
40 | |
41 | cond.notify_one(); |
42 | |
43 | thread->join(); |
44 | } |
45 | catch (...) |
46 | { |
47 | DB::tryLogCurrentException(__PRETTY_FUNCTION__); |
48 | } |
49 | } |
50 | |
51 | |
52 | void MetricsTransmitter::run() |
53 | { |
54 | const std::string thread_name = "MetrTx" + std::to_string(interval_seconds); |
55 | setThreadName(thread_name.c_str()); |
56 | |
57 | const auto get_next_time = [](size_t seconds) |
58 | { |
59 | /// To avoid time drift and transmit values exactly each interval: |
60 | /// next time aligned to system seconds |
61 | /// (60s -> every minute at 00 seconds, 5s -> every minute:[00, 05, 15 ... 55]s, 3600 -> every hour:00:00 |
62 | return std::chrono::system_clock::time_point( |
63 | (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()) / seconds) * seconds |
64 | + std::chrono::seconds(seconds)); |
65 | }; |
66 | |
67 | std::vector<ProfileEvents::Count> prev_counters(ProfileEvents::end()); |
68 | |
69 | std::unique_lock lock{mutex}; |
70 | |
71 | while (true) |
72 | { |
73 | if (cond.wait_until(lock, get_next_time(interval_seconds), [this] { return quit; })) |
74 | break; |
75 | |
76 | transmit(prev_counters); |
77 | } |
78 | } |
79 | |
80 | |
81 | void MetricsTransmitter::transmit(std::vector<ProfileEvents::Count> & prev_counters) |
82 | { |
83 | auto async_metrics_values = async_metrics.getValues(); |
84 | |
85 | GraphiteWriter::KeyValueVector<ssize_t> key_vals{}; |
86 | key_vals.reserve(ProfileEvents::end() + CurrentMetrics::end() + async_metrics_values.size()); |
87 | |
88 | if (send_events) |
89 | { |
90 | for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) |
91 | { |
92 | const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed); |
93 | const auto counter_increment = counter - prev_counters[i]; |
94 | prev_counters[i] = counter; |
95 | |
96 | std::string key{ProfileEvents::getName(static_cast<ProfileEvents::Event>(i))}; |
97 | key_vals.emplace_back(profile_events_path_prefix + key, counter_increment); |
98 | } |
99 | } |
100 | |
101 | if (send_events_cumulative) |
102 | { |
103 | for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) |
104 | { |
105 | const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed); |
106 | std::string key{ProfileEvents::getName(static_cast<ProfileEvents::Event>(i))}; |
107 | key_vals.emplace_back(profile_events_cumulative_path_prefix + key, counter); |
108 | } |
109 | } |
110 | |
111 | if (send_metrics) |
112 | { |
113 | for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i) |
114 | { |
115 | const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed); |
116 | |
117 | std::string key{CurrentMetrics::getName(static_cast<CurrentMetrics::Metric>(i))}; |
118 | key_vals.emplace_back(current_metrics_path_prefix + key, value); |
119 | } |
120 | } |
121 | |
122 | if (send_asynchronous_metrics) |
123 | { |
124 | for (const auto & name_value : async_metrics_values) |
125 | { |
126 | key_vals.emplace_back(asynchronous_metrics_path_prefix + name_value.first, name_value.second); |
127 | } |
128 | } |
129 | |
130 | if (key_vals.size()) |
131 | BaseDaemon::instance().writeToGraphite(key_vals, config_name); |
132 | } |
133 | |
134 | } |
135 | |