1#include "MetricsTransmitter.h"
2
3#include <Interpreters/AsynchronousMetrics.h>
4
5#include <Common/CurrentMetrics.h>
6#include <Common/Exception.h>
7#include <Common/setThreadName.h>
8
9#include <daemon/BaseDaemon.h>
10
11#include <Poco/Util/Application.h>
12#include <Poco/Util/LayeredConfiguration.h>
13
14
15namespace DB
16{
17
18MetricsTransmitter::MetricsTransmitter(
19 const Poco::Util::AbstractConfiguration & config, const std::string & config_name_, const AsynchronousMetrics & async_metrics_)
20 : async_metrics(async_metrics_), config_name(config_name_)
21{
22 interval_seconds = config.getInt(config_name + ".interval", 60);
23 send_events = config.getBool(config_name + ".events", true);
24 send_events_cumulative = config.getBool(config_name + ".events_cumulative", false);
25 send_metrics = config.getBool(config_name + ".metrics", true);
26 send_asynchronous_metrics = config.getBool(config_name + ".asynchronous_metrics", true);
27
28 thread = ThreadFromGlobalPool{&MetricsTransmitter::run, this};
29}
30
31
32MetricsTransmitter::~MetricsTransmitter()
33{
34 try
35 {
36 {
37 std::lock_guard lock{mutex};
38 quit = true;
39 }
40
41 cond.notify_one();
42
43 thread->join();
44 }
45 catch (...)
46 {
47 DB::tryLogCurrentException(__PRETTY_FUNCTION__);
48 }
49}
50
51
52void MetricsTransmitter::run()
53{
54 const std::string thread_name = "MetrTx" + std::to_string(interval_seconds);
55 setThreadName(thread_name.c_str());
56
57 const auto get_next_time = [](size_t seconds)
58 {
59 /// To avoid time drift and transmit values exactly each interval:
60 /// next time aligned to system seconds
61 /// (60s -> every minute at 00 seconds, 5s -> every minute:[00, 05, 15 ... 55]s, 3600 -> every hour:00:00
62 return std::chrono::system_clock::time_point(
63 (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()) / seconds) * seconds
64 + std::chrono::seconds(seconds));
65 };
66
67 std::vector<ProfileEvents::Count> prev_counters(ProfileEvents::end());
68
69 std::unique_lock lock{mutex};
70
71 while (true)
72 {
73 if (cond.wait_until(lock, get_next_time(interval_seconds), [this] { return quit; }))
74 break;
75
76 transmit(prev_counters);
77 }
78}
79
80
81void MetricsTransmitter::transmit(std::vector<ProfileEvents::Count> & prev_counters)
82{
83 auto async_metrics_values = async_metrics.getValues();
84
85 GraphiteWriter::KeyValueVector<ssize_t> key_vals{};
86 key_vals.reserve(ProfileEvents::end() + CurrentMetrics::end() + async_metrics_values.size());
87
88 if (send_events)
89 {
90 for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
91 {
92 const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed);
93 const auto counter_increment = counter - prev_counters[i];
94 prev_counters[i] = counter;
95
96 std::string key{ProfileEvents::getName(static_cast<ProfileEvents::Event>(i))};
97 key_vals.emplace_back(profile_events_path_prefix + key, counter_increment);
98 }
99 }
100
101 if (send_events_cumulative)
102 {
103 for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
104 {
105 const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed);
106 std::string key{ProfileEvents::getName(static_cast<ProfileEvents::Event>(i))};
107 key_vals.emplace_back(profile_events_cumulative_path_prefix + key, counter);
108 }
109 }
110
111 if (send_metrics)
112 {
113 for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
114 {
115 const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed);
116
117 std::string key{CurrentMetrics::getName(static_cast<CurrentMetrics::Metric>(i))};
118 key_vals.emplace_back(current_metrics_path_prefix + key, value);
119 }
120 }
121
122 if (send_asynchronous_metrics)
123 {
124 for (const auto & name_value : async_metrics_values)
125 {
126 key_vals.emplace_back(asynchronous_metrics_path_prefix + name_value.first, name_value.second);
127 }
128 }
129
130 if (key_vals.size())
131 BaseDaemon::instance().writeToGraphite(key_vals, config_name);
132}
133
134}
135