| 1 | #include <Interpreters/MetricLog.h> |
| 2 | #include <DataTypes/DataTypesNumber.h> |
| 3 | #include <DataTypes/DataTypeDate.h> |
| 4 | #include <DataTypes/DataTypeDateTime.h> |
| 5 | |
| 6 | |
| 7 | namespace DB |
| 8 | { |
| 9 | |
| 10 | Block MetricLogElement::createBlock() |
| 11 | { |
| 12 | ColumnsWithTypeAndName columns_with_type_and_name; |
| 13 | |
| 14 | columns_with_type_and_name.emplace_back(std::make_shared<DataTypeDate>(), "event_date" ); |
| 15 | columns_with_type_and_name.emplace_back(std::make_shared<DataTypeDateTime>(), "event_time" ); |
| 16 | columns_with_type_and_name.emplace_back(std::make_shared<DataTypeUInt64>(), "milliseconds" ); |
| 17 | |
| 18 | for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) |
| 19 | { |
| 20 | std::string name; |
| 21 | name += "ProfileEvent_" ; |
| 22 | name += ProfileEvents::getName(ProfileEvents::Event(i)); |
| 23 | columns_with_type_and_name.emplace_back(std::make_shared<DataTypeUInt64>(), std::move(name)); |
| 24 | } |
| 25 | |
| 26 | for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i) |
| 27 | { |
| 28 | std::string name; |
| 29 | name += "CurrentMetric_" ; |
| 30 | name += CurrentMetrics::getName(ProfileEvents::Event(i)); |
| 31 | columns_with_type_and_name.emplace_back(std::make_shared<DataTypeInt64>(), std::move(name)); |
| 32 | } |
| 33 | |
| 34 | return Block(columns_with_type_and_name); |
| 35 | } |
| 36 | |
| 37 | |
| 38 | void MetricLogElement::appendToBlock(Block & block) const |
| 39 | { |
| 40 | MutableColumns columns = block.mutateColumns(); |
| 41 | |
| 42 | size_t column_idx = 0; |
| 43 | |
| 44 | columns[column_idx++]->insert(DateLUT::instance().toDayNum(event_time)); |
| 45 | columns[column_idx++]->insert(event_time); |
| 46 | columns[column_idx++]->insert(milliseconds); |
| 47 | |
| 48 | for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) |
| 49 | columns[column_idx++]->insert(profile_events[i]); |
| 50 | |
| 51 | for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i) |
| 52 | columns[column_idx++]->insert(current_metrics[i]); |
| 53 | } |
| 54 | |
| 55 | |
| 56 | void MetricLog::startCollectMetric(size_t collect_interval_milliseconds_) |
| 57 | { |
| 58 | collect_interval_milliseconds = collect_interval_milliseconds_; |
| 59 | is_shutdown_metric_thread = false; |
| 60 | metric_flush_thread = ThreadFromGlobalPool([this] { metricThreadFunction(); }); |
| 61 | } |
| 62 | |
| 63 | |
| 64 | void MetricLog::stopCollectMetric() |
| 65 | { |
| 66 | bool old_val = false; |
| 67 | if (!is_shutdown_metric_thread.compare_exchange_strong(old_val, true)) |
| 68 | return; |
| 69 | metric_flush_thread.join(); |
| 70 | } |
| 71 | |
| 72 | |
| 73 | inline UInt64 time_in_milliseconds(std::chrono::time_point<std::chrono::system_clock> timepoint) |
| 74 | { |
| 75 | return std::chrono::duration_cast<std::chrono::milliseconds>(timepoint.time_since_epoch()).count(); |
| 76 | } |
| 77 | |
| 78 | |
| 79 | inline UInt64 time_in_seconds(std::chrono::time_point<std::chrono::system_clock> timepoint) |
| 80 | { |
| 81 | return std::chrono::duration_cast<std::chrono::seconds>(timepoint.time_since_epoch()).count(); |
| 82 | } |
| 83 | |
| 84 | |
| 85 | void MetricLog::metricThreadFunction() |
| 86 | { |
| 87 | auto desired_timepoint = std::chrono::system_clock::now(); |
| 88 | |
| 89 | /// For differentiation of ProfileEvents counters. |
| 90 | std::vector<ProfileEvents::Count> prev_profile_events(ProfileEvents::end()); |
| 91 | |
| 92 | while (!is_shutdown_metric_thread) |
| 93 | { |
| 94 | try |
| 95 | { |
| 96 | const auto current_time = std::chrono::system_clock::now(); |
| 97 | |
| 98 | MetricLogElement elem; |
| 99 | elem.event_time = std::chrono::system_clock::to_time_t(current_time); |
| 100 | elem.milliseconds = time_in_milliseconds(current_time) - time_in_seconds(current_time) * 1000; |
| 101 | |
| 102 | elem.profile_events.resize(ProfileEvents::end()); |
| 103 | for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) |
| 104 | { |
| 105 | const ProfileEvents::Count new_value = ProfileEvents::global_counters[i].load(std::memory_order_relaxed); |
| 106 | auto & old_value = prev_profile_events[i]; |
| 107 | elem.profile_events[i] = new_value - old_value; |
| 108 | old_value = new_value; |
| 109 | } |
| 110 | |
| 111 | elem.current_metrics.resize(CurrentMetrics::end()); |
| 112 | for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i) |
| 113 | { |
| 114 | elem.current_metrics[i] = CurrentMetrics::values[i]; |
| 115 | } |
| 116 | |
| 117 | this->add(elem); |
| 118 | |
| 119 | /// We will record current time into table but align it to regular time intervals to avoid time drift. |
| 120 | /// We may drop some time points if the server is overloaded and recording took too much time. |
| 121 | while (desired_timepoint <= current_time) |
| 122 | desired_timepoint += std::chrono::milliseconds(collect_interval_milliseconds); |
| 123 | |
| 124 | std::this_thread::sleep_until(desired_timepoint); |
| 125 | } |
| 126 | catch (...) |
| 127 | { |
| 128 | tryLogCurrentException(__PRETTY_FUNCTION__); |
| 129 | } |
| 130 | } |
| 131 | } |
| 132 | |
| 133 | } |
| 134 | |