1 | #include "TaskStatsInfoGetter.h" |
2 | #include <Common/Exception.h> |
3 | #include <Core/Types.h> |
4 | |
5 | #include <unistd.h> |
6 | |
7 | #if defined(OS_LINUX) |
8 | |
9 | #include "hasLinuxCapability.h" |
10 | #include <common/unaligned.h> |
11 | |
12 | #include <errno.h> |
13 | #include <stdio.h> |
14 | #include <stdlib.h> |
15 | #include <string.h> |
16 | #include <sys/socket.h> |
17 | #include <syscall.h> |
18 | #include <linux/genetlink.h> |
19 | #include <linux/netlink.h> |
20 | #include <linux/taskstats.h> |
21 | #include <linux/capability.h> |
22 | |
23 | |
/// The basic idea is borrowed from the "iotop" tool.
25 | /// More info: https://www.kernel.org/doc/Documentation/accounting/taskstats.txt |
26 | |
27 | |
28 | namespace DB |
29 | { |
30 | |
31 | namespace ErrorCodes |
32 | { |
33 | extern const int NETLINK_ERROR; |
34 | extern const int LOGICAL_ERROR; |
35 | } |
36 | |
// The system NLMSG_OK macro compares signed and unsigned operands (a known bug that will not be fixed upstream),
// so the same validity checks are spelled out here with explicit casts.
/// Returns true when `len` bytes starting at `nlh` hold a complete Netlink message:
/// the buffer covers a whole header, the declared nlmsg_len covers at least the header,
/// and the buffer covers the declared nlmsg_len.
static inline bool is_nlmsg_ok(const struct nlmsghdr * const nlh, const ssize_t len)
{
    constexpr size_t header_size = sizeof(nlmsghdr);

    /// Not even a header was received: do not look at the header fields at all.
    if (len < static_cast<ssize_t>(header_size))
        return false;

    const size_t declared_len = nlh->nlmsg_len;
    const size_t received_len = static_cast<size_t>(len);
    return declared_len >= header_size && received_len >= declared_len;
}
42 | |
43 | namespace |
44 | { |
45 | |
46 | |
47 | /** The message contains: |
48 | * - Netlink protocol header; |
49 | * - Generic Netlink (is a sub-protocol of Netlink that we use) protocol header; |
50 | * - Payload |
51 | * -- that itself is a list of "Attributes" (sub-messages), each of them contains length (including header), type, and its own payload. |
52 | * -- and attribute payload may be represented by the list of embedded attributes. |
53 | */ |
54 | struct NetlinkMessage |
55 | { |
56 | static size_t constexpr MAX_MSG_SIZE = 1024; |
57 | |
58 | alignas(NLMSG_ALIGNTO) ::nlmsghdr ; |
59 | |
60 | struct Attribute |
61 | { |
62 | ::nlattr ; |
63 | |
64 | alignas(NLMSG_ALIGNTO) char payload[0]; |
65 | |
66 | const Attribute * next() const |
67 | { |
68 | return reinterpret_cast<const Attribute *>(reinterpret_cast<const char *>(this) + NLA_ALIGN(header.nla_len)); |
69 | } |
70 | }; |
71 | |
72 | union alignas(NLMSG_ALIGNTO) |
73 | { |
74 | struct |
75 | { |
76 | ::genlmsghdr ; |
77 | |
78 | union alignas(NLMSG_ALIGNTO) |
79 | { |
80 | char buf[MAX_MSG_SIZE]; |
81 | Attribute attribute; /// First attribute. There may be more. |
82 | } payload; |
83 | }; |
84 | |
85 | ::nlmsgerr error; |
86 | }; |
87 | |
88 | const Attribute * end() const |
89 | { |
90 | return reinterpret_cast<const Attribute *>(reinterpret_cast<const char *>(this) + header.nlmsg_len); |
91 | } |
92 | |
93 | void send(int fd) const |
94 | { |
95 | const char * request_buf = reinterpret_cast<const char *>(this); |
96 | ssize_t request_size = header.nlmsg_len; |
97 | |
98 | union |
99 | { |
100 | ::sockaddr_nl nladdr{}; |
101 | ::sockaddr sockaddr; |
102 | }; |
103 | |
104 | nladdr.nl_family = AF_NETLINK; |
105 | |
106 | while (true) |
107 | { |
108 | ssize_t bytes_sent = ::sendto(fd, request_buf, request_size, 0, &sockaddr, sizeof(nladdr)); |
109 | |
110 | if (bytes_sent <= 0) |
111 | { |
112 | if (errno == EAGAIN) |
113 | continue; |
114 | else |
115 | throwFromErrno("Can't send a Netlink command" , ErrorCodes::NETLINK_ERROR); |
116 | } |
117 | |
118 | if (bytes_sent > request_size) |
119 | throw Exception("Wrong result of sendto system call: bytes_sent is greater than request size" , ErrorCodes::NETLINK_ERROR); |
120 | |
121 | if (bytes_sent == request_size) |
122 | break; |
123 | |
124 | request_buf += bytes_sent; |
125 | request_size -= bytes_sent; |
126 | } |
127 | } |
128 | |
129 | void receive(int fd) |
130 | { |
131 | ssize_t bytes_received = ::recv(fd, this, sizeof(*this), 0); |
132 | |
133 | if (header.nlmsg_type == NLMSG_ERROR) |
134 | throw Exception("Can't receive Netlink response: error " + std::to_string(error.error), ErrorCodes::NETLINK_ERROR); |
135 | |
136 | if (!is_nlmsg_ok(&header, bytes_received)) |
137 | throw Exception("Can't receive Netlink response: wrong number of bytes received" , ErrorCodes::NETLINK_ERROR); |
138 | } |
139 | }; |
140 | |
141 | |
142 | NetlinkMessage query( |
143 | int fd, |
144 | UInt16 type, |
145 | UInt32 pid, |
146 | UInt8 command, |
147 | UInt16 attribute_type, |
148 | const void * attribute_data, |
149 | int attribute_size) |
150 | { |
151 | NetlinkMessage request{}; |
152 | |
153 | request.header.nlmsg_len = NLMSG_LENGTH(GENL_HDRLEN); /// Length of both headers. |
154 | request.header.nlmsg_type = type; |
155 | request.header.nlmsg_flags = NLM_F_REQUEST; /// A request. |
156 | request.header.nlmsg_seq = 0; |
157 | request.header.nlmsg_pid = pid; |
158 | |
159 | request.generic_header.cmd = command; |
160 | request.generic_header.version = 1; |
161 | |
162 | request.payload.attribute.header.nla_type = attribute_type; |
163 | request.payload.attribute.header.nla_len = attribute_size + NLA_HDRLEN; |
164 | |
165 | memcpy(&request.payload.attribute.payload, attribute_data, attribute_size); |
166 | |
167 | request.header.nlmsg_len += NLMSG_ALIGN(request.payload.attribute.header.nla_len); |
168 | |
169 | request.send(fd); |
170 | |
171 | NetlinkMessage response; |
172 | response.receive(fd); |
173 | |
174 | return response; |
175 | } |
176 | |
177 | |
178 | UInt16 getFamilyIdImpl(int fd) |
179 | { |
180 | NetlinkMessage answer = query(fd, GENL_ID_CTRL, getpid(), CTRL_CMD_GETFAMILY, CTRL_ATTR_FAMILY_NAME, TASKSTATS_GENL_NAME, strlen(TASKSTATS_GENL_NAME) + 1); |
181 | |
182 | /// NOTE Why the relevant info is located in the second attribute? |
183 | const NetlinkMessage::Attribute * attr = answer.payload.attribute.next(); |
184 | |
185 | if (attr->header.nla_type != CTRL_ATTR_FAMILY_ID) |
186 | throw Exception("Received wrong attribute as an answer to GET_FAMILY Netlink command" , ErrorCodes::NETLINK_ERROR); |
187 | |
188 | return unalignedLoad<UInt16>(attr->payload); |
189 | } |
190 | |
191 | |
192 | bool checkPermissionsImpl() |
193 | { |
194 | static bool res = hasLinuxCapability(CAP_NET_ADMIN); |
195 | if (!res) |
196 | return false; |
197 | |
198 | /// Check that we can successfully initialize TaskStatsInfoGetter. |
199 | /// It will ask about family id through Netlink. |
200 | /// On some LXC containers we have capability but we still cannot use Netlink. |
201 | |
202 | try |
203 | { |
204 | TaskStatsInfoGetter(); |
205 | } |
206 | catch (...) |
207 | { |
208 | tryLogCurrentException(__PRETTY_FUNCTION__); |
209 | return false; |
210 | } |
211 | |
212 | return true; |
213 | } |
214 | |
215 | |
216 | UInt16 getFamilyId(int fd) |
217 | { |
218 | /// It is thread and exception safe since C++11 and even before. |
219 | static UInt16 res = getFamilyIdImpl(fd); |
220 | return res; |
221 | } |
222 | |
223 | } |
224 | |
225 | |
226 | bool TaskStatsInfoGetter::checkPermissions() |
227 | { |
228 | static bool res = checkPermissionsImpl(); |
229 | return res; |
230 | } |
231 | |
232 | |
233 | TaskStatsInfoGetter::TaskStatsInfoGetter() |
234 | { |
235 | netlink_socket_fd = ::socket(PF_NETLINK, SOCK_RAW, NETLINK_GENERIC); |
236 | if (netlink_socket_fd < 0) |
237 | throwFromErrno("Can't create PF_NETLINK socket" , ErrorCodes::NETLINK_ERROR); |
238 | |
239 | /// On some containerized environments, operation on Netlink socket could hang forever. |
240 | /// We set reasonably small timeout to overcome this issue. |
241 | |
242 | struct timeval tv; |
243 | tv.tv_sec = 0; |
244 | tv.tv_usec = 50000; |
245 | |
246 | if (0 != ::setsockopt(netlink_socket_fd, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char *>(&tv), sizeof(tv))) |
247 | throwFromErrno("Can't set timeout on PF_NETLINK socket" , ErrorCodes::NETLINK_ERROR); |
248 | |
249 | union |
250 | { |
251 | ::sockaddr_nl addr{}; |
252 | ::sockaddr sockaddr; |
253 | }; |
254 | addr.nl_family = AF_NETLINK; |
255 | |
256 | if (::bind(netlink_socket_fd, &sockaddr, sizeof(addr)) < 0) |
257 | throwFromErrno("Can't bind PF_NETLINK socket" , ErrorCodes::NETLINK_ERROR); |
258 | |
259 | taskstats_family_id = getFamilyId(netlink_socket_fd); |
260 | } |
261 | |
262 | |
263 | void TaskStatsInfoGetter::getStat(::taskstats & out_stats, pid_t tid) |
264 | { |
265 | NetlinkMessage answer = query(netlink_socket_fd, taskstats_family_id, tid, TASKSTATS_CMD_GET, TASKSTATS_CMD_ATTR_PID, &tid, sizeof(tid)); |
266 | |
267 | for (const NetlinkMessage::Attribute * attr = &answer.payload.attribute; |
268 | attr < answer.end(); |
269 | attr = attr->next()) |
270 | { |
271 | if (attr->header.nla_type == TASKSTATS_TYPE_AGGR_TGID || attr->header.nla_type == TASKSTATS_TYPE_AGGR_PID) |
272 | { |
273 | for (const NetlinkMessage::Attribute * nested_attr = reinterpret_cast<const NetlinkMessage::Attribute *>(attr->payload); |
274 | nested_attr < attr->next(); |
275 | nested_attr = nested_attr->next()) |
276 | { |
277 | if (nested_attr->header.nla_type == TASKSTATS_TYPE_STATS) |
278 | { |
279 | out_stats = unalignedLoad<::taskstats>(nested_attr->payload); |
280 | return; |
281 | } |
282 | } |
283 | } |
284 | } |
285 | |
286 | throw Exception("There is no TASKSTATS_TYPE_STATS attribute in the Netlink response" , ErrorCodes::NETLINK_ERROR); |
287 | } |
288 | |
289 | |
290 | static thread_local pid_t current_tid = 0; |
291 | pid_t TaskStatsInfoGetter::getCurrentTID() |
292 | { |
293 | if (!current_tid) |
294 | current_tid = syscall(SYS_gettid); /// This call is always successful. - man gettid |
295 | |
296 | return current_tid; |
297 | } |
298 | |
299 | |
300 | TaskStatsInfoGetter::~TaskStatsInfoGetter() |
301 | { |
302 | if (netlink_socket_fd >= 0) |
303 | close(netlink_socket_fd); |
304 | } |
305 | |
306 | } |
307 | |
308 | |
309 | #else |
310 | |
311 | namespace DB |
312 | { |
313 | |
314 | namespace ErrorCodes |
315 | { |
316 | extern const int NOT_IMPLEMENTED; |
317 | } |
318 | |
319 | bool TaskStatsInfoGetter::checkPermissions() |
320 | { |
321 | return false; |
322 | } |
323 | |
324 | |
325 | TaskStatsInfoGetter::TaskStatsInfoGetter() |
326 | { |
327 | // TODO: throw Exception("TaskStats are not implemented for this OS.", ErrorCodes::NOT_IMPLEMENTED); |
328 | } |
329 | |
330 | void TaskStatsInfoGetter::getStat(::taskstats &, pid_t) |
331 | { |
332 | } |
333 | |
334 | pid_t TaskStatsInfoGetter::getCurrentTID() |
335 | { |
336 | return 0; |
337 | } |
338 | |
339 | TaskStatsInfoGetter::~TaskStatsInfoGetter() |
340 | { |
341 | } |
342 | |
343 | } |
344 | |
345 | #endif |
346 | |