1 | #include <Interpreters/MetricLog.h> |
2 | #include <DataTypes/DataTypesNumber.h> |
3 | #include <DataTypes/DataTypeDate.h> |
4 | #include <DataTypes/DataTypeDateTime.h> |
5 | |
6 | |
7 | namespace DB |
8 | { |
9 | |
10 | Block MetricLogElement::createBlock() |
11 | { |
12 | ColumnsWithTypeAndName columns_with_type_and_name; |
13 | |
14 | columns_with_type_and_name.emplace_back(std::make_shared<DataTypeDate>(), "event_date" ); |
15 | columns_with_type_and_name.emplace_back(std::make_shared<DataTypeDateTime>(), "event_time" ); |
16 | columns_with_type_and_name.emplace_back(std::make_shared<DataTypeUInt64>(), "milliseconds" ); |
17 | |
18 | for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) |
19 | { |
20 | std::string name; |
21 | name += "ProfileEvent_" ; |
22 | name += ProfileEvents::getName(ProfileEvents::Event(i)); |
23 | columns_with_type_and_name.emplace_back(std::make_shared<DataTypeUInt64>(), std::move(name)); |
24 | } |
25 | |
26 | for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i) |
27 | { |
28 | std::string name; |
29 | name += "CurrentMetric_" ; |
30 | name += CurrentMetrics::getName(ProfileEvents::Event(i)); |
31 | columns_with_type_and_name.emplace_back(std::make_shared<DataTypeInt64>(), std::move(name)); |
32 | } |
33 | |
34 | return Block(columns_with_type_and_name); |
35 | } |
36 | |
37 | |
38 | void MetricLogElement::appendToBlock(Block & block) const |
39 | { |
40 | MutableColumns columns = block.mutateColumns(); |
41 | |
42 | size_t column_idx = 0; |
43 | |
44 | columns[column_idx++]->insert(DateLUT::instance().toDayNum(event_time)); |
45 | columns[column_idx++]->insert(event_time); |
46 | columns[column_idx++]->insert(milliseconds); |
47 | |
48 | for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) |
49 | columns[column_idx++]->insert(profile_events[i]); |
50 | |
51 | for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i) |
52 | columns[column_idx++]->insert(current_metrics[i]); |
53 | } |
54 | |
55 | |
56 | void MetricLog::startCollectMetric(size_t collect_interval_milliseconds_) |
57 | { |
58 | collect_interval_milliseconds = collect_interval_milliseconds_; |
59 | is_shutdown_metric_thread = false; |
60 | metric_flush_thread = ThreadFromGlobalPool([this] { metricThreadFunction(); }); |
61 | } |
62 | |
63 | |
64 | void MetricLog::stopCollectMetric() |
65 | { |
66 | bool old_val = false; |
67 | if (!is_shutdown_metric_thread.compare_exchange_strong(old_val, true)) |
68 | return; |
69 | metric_flush_thread.join(); |
70 | } |
71 | |
72 | |
/// Number of whole milliseconds between the clock's epoch and `timepoint`
/// (duration_cast truncates any sub-millisecond remainder).
inline UInt64 time_in_milliseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
    using std::chrono::milliseconds;

    const auto since_epoch = timepoint.time_since_epoch();
    return std::chrono::duration_cast<milliseconds>(since_epoch).count();
}
77 | |
78 | |
/// Number of whole seconds between the clock's epoch and `timepoint`
/// (duration_cast truncates any sub-second remainder).
inline UInt64 time_in_seconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
    using std::chrono::seconds;

    const auto since_epoch = timepoint.time_since_epoch();
    return std::chrono::duration_cast<seconds>(since_epoch).count();
}
83 | |
84 | |
85 | void MetricLog::metricThreadFunction() |
86 | { |
87 | auto desired_timepoint = std::chrono::system_clock::now(); |
88 | |
89 | /// For differentiation of ProfileEvents counters. |
90 | std::vector<ProfileEvents::Count> prev_profile_events(ProfileEvents::end()); |
91 | |
92 | while (!is_shutdown_metric_thread) |
93 | { |
94 | try |
95 | { |
96 | const auto current_time = std::chrono::system_clock::now(); |
97 | |
98 | MetricLogElement elem; |
99 | elem.event_time = std::chrono::system_clock::to_time_t(current_time); |
100 | elem.milliseconds = time_in_milliseconds(current_time) - time_in_seconds(current_time) * 1000; |
101 | |
102 | elem.profile_events.resize(ProfileEvents::end()); |
103 | for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i) |
104 | { |
105 | const ProfileEvents::Count new_value = ProfileEvents::global_counters[i].load(std::memory_order_relaxed); |
106 | auto & old_value = prev_profile_events[i]; |
107 | elem.profile_events[i] = new_value - old_value; |
108 | old_value = new_value; |
109 | } |
110 | |
111 | elem.current_metrics.resize(CurrentMetrics::end()); |
112 | for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i) |
113 | { |
114 | elem.current_metrics[i] = CurrentMetrics::values[i]; |
115 | } |
116 | |
117 | this->add(elem); |
118 | |
119 | /// We will record current time into table but align it to regular time intervals to avoid time drift. |
120 | /// We may drop some time points if the server is overloaded and recording took too much time. |
121 | while (desired_timepoint <= current_time) |
122 | desired_timepoint += std::chrono::milliseconds(collect_interval_milliseconds); |
123 | |
124 | std::this_thread::sleep_until(desired_timepoint); |
125 | } |
126 | catch (...) |
127 | { |
128 | tryLogCurrentException(__PRETTY_FUNCTION__); |
129 | } |
130 | } |
131 | } |
132 | |
133 | } |
134 | |