1#include <Interpreters/MetricLog.h>
2#include <DataTypes/DataTypesNumber.h>
3#include <DataTypes/DataTypeDate.h>
4#include <DataTypes/DataTypeDateTime.h>
5
6
7namespace DB
8{
9
10Block MetricLogElement::createBlock()
11{
12 ColumnsWithTypeAndName columns_with_type_and_name;
13
14 columns_with_type_and_name.emplace_back(std::make_shared<DataTypeDate>(), "event_date");
15 columns_with_type_and_name.emplace_back(std::make_shared<DataTypeDateTime>(), "event_time");
16 columns_with_type_and_name.emplace_back(std::make_shared<DataTypeUInt64>(), "milliseconds");
17
18 for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
19 {
20 std::string name;
21 name += "ProfileEvent_";
22 name += ProfileEvents::getName(ProfileEvents::Event(i));
23 columns_with_type_and_name.emplace_back(std::make_shared<DataTypeUInt64>(), std::move(name));
24 }
25
26 for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
27 {
28 std::string name;
29 name += "CurrentMetric_";
30 name += CurrentMetrics::getName(ProfileEvents::Event(i));
31 columns_with_type_and_name.emplace_back(std::make_shared<DataTypeInt64>(), std::move(name));
32 }
33
34 return Block(columns_with_type_and_name);
35}
36
37
38void MetricLogElement::appendToBlock(Block & block) const
39{
40 MutableColumns columns = block.mutateColumns();
41
42 size_t column_idx = 0;
43
44 columns[column_idx++]->insert(DateLUT::instance().toDayNum(event_time));
45 columns[column_idx++]->insert(event_time);
46 columns[column_idx++]->insert(milliseconds);
47
48 for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
49 columns[column_idx++]->insert(profile_events[i]);
50
51 for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
52 columns[column_idx++]->insert(current_metrics[i]);
53}
54
55
56void MetricLog::startCollectMetric(size_t collect_interval_milliseconds_)
57{
58 collect_interval_milliseconds = collect_interval_milliseconds_;
59 is_shutdown_metric_thread = false;
60 metric_flush_thread = ThreadFromGlobalPool([this] { metricThreadFunction(); });
61}
62
63
64void MetricLog::stopCollectMetric()
65{
66 bool old_val = false;
67 if (!is_shutdown_metric_thread.compare_exchange_strong(old_val, true))
68 return;
69 metric_flush_thread.join();
70}
71
72
/// Number of whole milliseconds between the system_clock epoch and `timepoint`
/// (any finer part is discarded by duration_cast).
inline UInt64 time_in_milliseconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
    const auto since_epoch = timepoint.time_since_epoch();
    const std::chrono::milliseconds whole_milliseconds
        = std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch);
    return whole_milliseconds.count();
}
77
78
/// Number of whole seconds between the system_clock epoch and `timepoint`
/// (the sub-second part is discarded by duration_cast).
inline UInt64 time_in_seconds(std::chrono::time_point<std::chrono::system_clock> timepoint)
{
    const auto since_epoch = timepoint.time_since_epoch();
    const std::chrono::seconds whole_seconds
        = std::chrono::duration_cast<std::chrono::seconds>(since_epoch);
    return whole_seconds.count();
}
83
84
85void MetricLog::metricThreadFunction()
86{
87 auto desired_timepoint = std::chrono::system_clock::now();
88
89 /// For differentiation of ProfileEvents counters.
90 std::vector<ProfileEvents::Count> prev_profile_events(ProfileEvents::end());
91
92 while (!is_shutdown_metric_thread)
93 {
94 try
95 {
96 const auto current_time = std::chrono::system_clock::now();
97
98 MetricLogElement elem;
99 elem.event_time = std::chrono::system_clock::to_time_t(current_time);
100 elem.milliseconds = time_in_milliseconds(current_time) - time_in_seconds(current_time) * 1000;
101
102 elem.profile_events.resize(ProfileEvents::end());
103 for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
104 {
105 const ProfileEvents::Count new_value = ProfileEvents::global_counters[i].load(std::memory_order_relaxed);
106 auto & old_value = prev_profile_events[i];
107 elem.profile_events[i] = new_value - old_value;
108 old_value = new_value;
109 }
110
111 elem.current_metrics.resize(CurrentMetrics::end());
112 for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
113 {
114 elem.current_metrics[i] = CurrentMetrics::values[i];
115 }
116
117 this->add(elem);
118
119 /// We will record current time into table but align it to regular time intervals to avoid time drift.
120 /// We may drop some time points if the server is overloaded and recording took too much time.
121 while (desired_timepoint <= current_time)
122 desired_timepoint += std::chrono::milliseconds(collect_interval_milliseconds);
123
124 std::this_thread::sleep_until(desired_timepoint);
125 }
126 catch (...)
127 {
128 tryLogCurrentException(__PRETTY_FUNCTION__);
129 }
130 }
131}
132
133}
134