| 1 | #include "MetricsTransmitter.h" |
| 2 | |
| 3 | #include <Interpreters/AsynchronousMetrics.h> |
| 4 | |
| 5 | #include <Common/CurrentMetrics.h> |
| 6 | #include <Common/Exception.h> |
| 7 | #include <Common/setThreadName.h> |
| 8 | |
| 9 | #include <daemon/BaseDaemon.h> |
| 10 | |
| 11 | #include <Poco/Util/Application.h> |
| 12 | #include <Poco/Util/LayeredConfiguration.h> |
| 13 | |
| 14 | |
| 15 | namespace DB |
| 16 | { |
| 17 | |
| 18 | MetricsTransmitter::MetricsTransmitter( |
| 19 | const Poco::Util::AbstractConfiguration & config, const std::string & config_name_, const AsynchronousMetrics & async_metrics_) |
| 20 | : async_metrics(async_metrics_), config_name(config_name_) |
| 21 | { |
| 22 | interval_seconds = config.getInt(config_name + ".interval" , 60); |
| 23 | send_events = config.getBool(config_name + ".events" , true); |
| 24 | send_events_cumulative = config.getBool(config_name + ".events_cumulative" , false); |
| 25 | send_metrics = config.getBool(config_name + ".metrics" , true); |
| 26 | send_asynchronous_metrics = config.getBool(config_name + ".asynchronous_metrics" , true); |
| 27 | |
| 28 | thread = ThreadFromGlobalPool{&MetricsTransmitter::run, this}; |
| 29 | } |
| 30 | |
| 31 | |
| 32 | MetricsTransmitter::~MetricsTransmitter() |
| 33 | { |
| 34 | try |
| 35 | { |
| 36 | { |
| 37 | std::lock_guard lock{mutex}; |
| 38 | quit = true; |
| 39 | } |
| 40 | |
| 41 | cond.notify_one(); |
| 42 | |
| 43 | thread->join(); |
| 44 | } |
| 45 | catch (...) |
| 46 | { |
| 47 | DB::tryLogCurrentException(__PRETTY_FUNCTION__); |
| 48 | } |
| 49 | } |
| 50 | |
| 51 | |
| 52 | void MetricsTransmitter::run() |
| 53 | { |
| 54 | const std::string thread_name = "MetrTx" + std::to_string(interval_seconds); |
| 55 | setThreadName(thread_name.c_str()); |
| 56 | |
| 57 | const auto get_next_time = [](size_t seconds) |
| 58 | { |
| 59 | /// To avoid time drift and transmit values exactly each interval: |
| 60 | /// next time aligned to system seconds |
| 61 | /// (60s -> every minute at 00 seconds, 5s -> every minute:[00, 05, 15 ... 55]s, 3600 -> every hour:00:00 |
| 62 | return std::chrono::system_clock::time_point( |
| 63 | (std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()) / seconds) * seconds |
| 64 | + std::chrono::seconds(seconds)); |
| 65 | }; |
| 66 | |
| 67 | std::vector<ProfileEvents::Count> prev_counters(ProfileEvents::end()); |
| 68 | |
| 69 | std::unique_lock lock{mutex}; |
| 70 | |
| 71 | while (true) |
| 72 | { |
| 73 | if (cond.wait_until(lock, get_next_time(interval_seconds), [this] { return quit; })) |
| 74 | break; |
| 75 | |
| 76 | transmit(prev_counters); |
| 77 | } |
| 78 | } |
| 79 | |
| 80 | |
| 81 | void MetricsTransmitter::transmit(std::vector<ProfileEvents::Count> & prev_counters) |
| 82 | { |
| 83 | auto async_metrics_values = async_metrics.getValues(); |
| 84 | |
| 85 | GraphiteWriter::KeyValueVector<ssize_t> key_vals{}; |
| 86 | key_vals.reserve(ProfileEvents::end() + CurrentMetrics::end() + async_metrics_values.size()); |
| 87 | |
| 88 | if (send_events) |
| 89 | { |
| 90 | for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) |
| 91 | { |
| 92 | const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed); |
| 93 | const auto counter_increment = counter - prev_counters[i]; |
| 94 | prev_counters[i] = counter; |
| 95 | |
| 96 | std::string key{ProfileEvents::getName(static_cast<ProfileEvents::Event>(i))}; |
| 97 | key_vals.emplace_back(profile_events_path_prefix + key, counter_increment); |
| 98 | } |
| 99 | } |
| 100 | |
| 101 | if (send_events_cumulative) |
| 102 | { |
| 103 | for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) |
| 104 | { |
| 105 | const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed); |
| 106 | std::string key{ProfileEvents::getName(static_cast<ProfileEvents::Event>(i))}; |
| 107 | key_vals.emplace_back(profile_events_cumulative_path_prefix + key, counter); |
| 108 | } |
| 109 | } |
| 110 | |
| 111 | if (send_metrics) |
| 112 | { |
| 113 | for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i) |
| 114 | { |
| 115 | const auto value = CurrentMetrics::values[i].load(std::memory_order_relaxed); |
| 116 | |
| 117 | std::string key{CurrentMetrics::getName(static_cast<CurrentMetrics::Metric>(i))}; |
| 118 | key_vals.emplace_back(current_metrics_path_prefix + key, value); |
| 119 | } |
| 120 | } |
| 121 | |
| 122 | if (send_asynchronous_metrics) |
| 123 | { |
| 124 | for (const auto & name_value : async_metrics_values) |
| 125 | { |
| 126 | key_vals.emplace_back(asynchronous_metrics_path_prefix + name_value.first, name_value.second); |
| 127 | } |
| 128 | } |
| 129 | |
| 130 | if (key_vals.size()) |
| 131 | BaseDaemon::instance().writeToGraphite(key_vals, config_name); |
| 132 | } |
| 133 | |
| 134 | } |
| 135 | |