1 | #include <DataTypes/DataTypeString.h> |
2 | #include <DataTypes/DataTypesNumber.h> |
3 | #include <DataTypes/DataTypeArray.h> |
4 | #include <DataTypes/DataTypeFactory.h> |
5 | #include <Interpreters/ProcessList.h> |
6 | #include <Storages/System/StorageSystemProcesses.h> |
7 | #include <Interpreters/Context.h> |
8 | #include <Core/Settings.h> |
9 | #include <Interpreters/ProfileEventsExt.h> |
10 | #include <Common/typeid_cast.h> |
11 | #include <Common/IPv6ToBinary.h> |
12 | #include <Columns/ColumnsNumber.h> |
13 | #include <Columns/ColumnArray.h> |
14 | |
15 | |
16 | namespace DB |
17 | { |
18 | |
19 | NamesAndTypesList StorageSystemProcesses::getNamesAndTypes() |
20 | { |
21 | return { |
22 | {"is_initial_query" , std::make_shared<DataTypeUInt8>()}, |
23 | |
24 | {"user" , std::make_shared<DataTypeString>()}, |
25 | {"query_id" , std::make_shared<DataTypeString>()}, |
26 | {"address" , DataTypeFactory::instance().get("IPv6" )}, |
27 | {"port" , std::make_shared<DataTypeUInt16>()}, |
28 | |
29 | {"initial_user" , std::make_shared<DataTypeString>()}, |
30 | {"initial_query_id" , std::make_shared<DataTypeString>()}, |
31 | {"initial_address" , DataTypeFactory::instance().get("IPv6" )}, |
32 | {"initial_port" , std::make_shared<DataTypeUInt16>()}, |
33 | |
34 | {"interface" , std::make_shared<DataTypeUInt8>()}, |
35 | |
36 | {"os_user" , std::make_shared<DataTypeString>()}, |
37 | {"client_hostname" , std::make_shared<DataTypeString>()}, |
38 | {"client_name" , std::make_shared<DataTypeString>()}, |
39 | {"client_revision" , std::make_shared<DataTypeUInt64>()}, |
40 | {"client_version_major" , std::make_shared<DataTypeUInt64>()}, |
41 | {"client_version_minor" , std::make_shared<DataTypeUInt64>()}, |
42 | {"client_version_patch" , std::make_shared<DataTypeUInt64>()}, |
43 | |
44 | {"http_method" , std::make_shared<DataTypeUInt8>()}, |
45 | {"http_user_agent" , std::make_shared<DataTypeString>()}, |
46 | |
47 | {"quota_key" , std::make_shared<DataTypeString>()}, |
48 | |
49 | {"elapsed" , std::make_shared<DataTypeFloat64>()}, |
50 | {"is_cancelled" , std::make_shared<DataTypeUInt8>()}, |
51 | {"read_rows" , std::make_shared<DataTypeUInt64>()}, |
52 | {"read_bytes" , std::make_shared<DataTypeUInt64>()}, |
53 | {"total_rows_approx" , std::make_shared<DataTypeUInt64>()}, |
54 | {"written_rows" , std::make_shared<DataTypeUInt64>()}, |
55 | {"written_bytes" , std::make_shared<DataTypeUInt64>()}, |
56 | {"memory_usage" , std::make_shared<DataTypeInt64>()}, |
57 | {"peak_memory_usage" , std::make_shared<DataTypeInt64>()}, |
58 | {"query" , std::make_shared<DataTypeString>()}, |
59 | |
60 | {"thread_numbers" , std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>())}, |
61 | {"os_thread_ids" , std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>())}, |
62 | {"ProfileEvents.Names" , std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())}, |
63 | {"ProfileEvents.Values" , std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>())}, |
64 | {"Settings.Names" , std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())}, |
65 | {"Settings.Values" , std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())}, |
66 | }; |
67 | } |
68 | |
69 | |
70 | void StorageSystemProcesses::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const |
71 | { |
72 | ProcessList::Info info = context.getProcessList().getInfo(true, true, true); |
73 | |
74 | for (const auto & process : info) |
75 | { |
76 | size_t i = 0; |
77 | |
78 | res_columns[i++]->insert(process.client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY); |
79 | |
80 | res_columns[i++]->insert(process.client_info.current_user); |
81 | res_columns[i++]->insert(process.client_info.current_query_id); |
82 | res_columns[i++]->insertData(IPv6ToBinary(process.client_info.current_address.host()).data(), 16); |
83 | res_columns[i++]->insert(process.client_info.current_address.port()); |
84 | |
85 | res_columns[i++]->insert(process.client_info.initial_user); |
86 | res_columns[i++]->insert(process.client_info.initial_query_id); |
87 | res_columns[i++]->insertData(IPv6ToBinary(process.client_info.initial_address.host()).data(), 16); |
88 | res_columns[i++]->insert(process.client_info.initial_address.port()); |
89 | |
90 | res_columns[i++]->insert(UInt64(process.client_info.interface)); |
91 | |
92 | res_columns[i++]->insert(process.client_info.os_user); |
93 | res_columns[i++]->insert(process.client_info.client_hostname); |
94 | res_columns[i++]->insert(process.client_info.client_name); |
95 | res_columns[i++]->insert(process.client_info.client_revision); |
96 | res_columns[i++]->insert(process.client_info.client_version_major); |
97 | res_columns[i++]->insert(process.client_info.client_version_minor); |
98 | res_columns[i++]->insert(process.client_info.client_version_patch); |
99 | |
100 | res_columns[i++]->insert(UInt64(process.client_info.http_method)); |
101 | res_columns[i++]->insert(process.client_info.http_user_agent); |
102 | |
103 | res_columns[i++]->insert(process.client_info.quota_key); |
104 | |
105 | res_columns[i++]->insert(process.elapsed_seconds); |
106 | res_columns[i++]->insert(process.is_cancelled); |
107 | res_columns[i++]->insert(process.read_rows); |
108 | res_columns[i++]->insert(process.read_bytes); |
109 | res_columns[i++]->insert(process.total_rows); |
110 | res_columns[i++]->insert(process.written_rows); |
111 | res_columns[i++]->insert(process.written_bytes); |
112 | res_columns[i++]->insert(process.memory_usage); |
113 | res_columns[i++]->insert(process.peak_memory_usage); |
114 | res_columns[i++]->insert(process.query); |
115 | |
116 | { |
117 | Array threads_array; |
118 | threads_array.reserve(process.thread_numbers.size()); |
119 | for (const UInt32 thread_number : process.thread_numbers) |
120 | threads_array.emplace_back(thread_number); |
121 | res_columns[i++]->insert(threads_array); |
122 | } |
123 | |
124 | { |
125 | Array threads_array; |
126 | threads_array.reserve(process.os_thread_ids.size()); |
127 | for (const UInt32 thread_number : process.os_thread_ids) |
128 | threads_array.emplace_back(thread_number); |
129 | res_columns[i++]->insert(threads_array); |
130 | } |
131 | |
132 | { |
133 | IColumn * column_profile_events_names = res_columns[i++].get(); |
134 | IColumn * column_profile_events_values = res_columns[i++].get(); |
135 | |
136 | if (process.profile_counters) |
137 | ProfileEvents::dumpToArrayColumns(*process.profile_counters, column_profile_events_names, column_profile_events_values, true); |
138 | else |
139 | { |
140 | column_profile_events_names->insertDefault(); |
141 | column_profile_events_values->insertDefault(); |
142 | } |
143 | } |
144 | |
145 | { |
146 | IColumn * column_settings_names = res_columns[i++].get(); |
147 | IColumn * column_settings_values = res_columns[i++].get(); |
148 | |
149 | if (process.query_settings) |
150 | process.query_settings->dumpToArrayColumns(column_settings_names, column_settings_values, true); |
151 | else |
152 | { |
153 | column_settings_names->insertDefault(); |
154 | column_settings_values->insertDefault(); |
155 | } |
156 | } |
157 | } |
158 | } |
159 | |
160 | } |
161 | |