1#include "TaskStatsInfoGetter.h"
2#include <Common/Exception.h>
3#include <Core/Types.h>
4
5#include <unistd.h>
6
7#if defined(OS_LINUX)
8
9#include "hasLinuxCapability.h"
10#include <common/unaligned.h>
11
12#include <errno.h>
13#include <stdio.h>
14#include <stdlib.h>
15#include <string.h>
16#include <sys/socket.h>
17#include <syscall.h>
18#include <linux/genetlink.h>
19#include <linux/netlink.h>
20#include <linux/taskstats.h>
21#include <linux/capability.h>
22
23
/// The basic approach is borrowed from the "iotop" tool.
/// More info: https://www.kernel.org/doc/Documentation/accounting/taskstats.txt
26
27
28namespace DB
29{
30
31namespace ErrorCodes
32{
33 extern const int NETLINK_ERROR;
34 extern const int LOGICAL_ERROR;
35}
36
/// Hand-written replacement for the NLMSG_OK macro: the system macro compares signed and unsigned
/// operands (a known signedness bug that will not be fixed upstream), so the checks are spelled out here.
/// Returns true when `len` received bytes cover both the netlink header and the full message length
/// declared inside that header.
static inline bool is_nlmsg_ok(const struct nlmsghdr * const nlh, const ssize_t len)
{
    constexpr size_t header_size = sizeof(struct nlmsghdr);

    /// Not even a complete header was received (this also rejects negative `len`).
    if (len < static_cast<ssize_t>(header_size))
        return false;

    /// The header declares a length too small to contain itself.
    if (nlh->nlmsg_len < header_size)
        return false;

    /// Everything the header promises must actually have been received.
    return static_cast<size_t>(len) >= nlh->nlmsg_len;
}
42
43namespace
44{
45
46
47/** The message contains:
48 * - Netlink protocol header;
49 * - Generic Netlink (is a sub-protocol of Netlink that we use) protocol header;
50 * - Payload
51 * -- that itself is a list of "Attributes" (sub-messages), each of them contains length (including header), type, and its own payload.
52 * -- and attribute payload may be represented by the list of embedded attributes.
53 */
54struct NetlinkMessage
55{
56 static size_t constexpr MAX_MSG_SIZE = 1024;
57
58 alignas(NLMSG_ALIGNTO) ::nlmsghdr header;
59
60 struct Attribute
61 {
62 ::nlattr header;
63
64 alignas(NLMSG_ALIGNTO) char payload[0];
65
66 const Attribute * next() const
67 {
68 return reinterpret_cast<const Attribute *>(reinterpret_cast<const char *>(this) + NLA_ALIGN(header.nla_len));
69 }
70 };
71
72 union alignas(NLMSG_ALIGNTO)
73 {
74 struct
75 {
76 ::genlmsghdr generic_header;
77
78 union alignas(NLMSG_ALIGNTO)
79 {
80 char buf[MAX_MSG_SIZE];
81 Attribute attribute; /// First attribute. There may be more.
82 } payload;
83 };
84
85 ::nlmsgerr error;
86 };
87
88 const Attribute * end() const
89 {
90 return reinterpret_cast<const Attribute *>(reinterpret_cast<const char *>(this) + header.nlmsg_len);
91 }
92
93 void send(int fd) const
94 {
95 const char * request_buf = reinterpret_cast<const char *>(this);
96 ssize_t request_size = header.nlmsg_len;
97
98 union
99 {
100 ::sockaddr_nl nladdr{};
101 ::sockaddr sockaddr;
102 };
103
104 nladdr.nl_family = AF_NETLINK;
105
106 while (true)
107 {
108 ssize_t bytes_sent = ::sendto(fd, request_buf, request_size, 0, &sockaddr, sizeof(nladdr));
109
110 if (bytes_sent <= 0)
111 {
112 if (errno == EAGAIN)
113 continue;
114 else
115 throwFromErrno("Can't send a Netlink command", ErrorCodes::NETLINK_ERROR);
116 }
117
118 if (bytes_sent > request_size)
119 throw Exception("Wrong result of sendto system call: bytes_sent is greater than request size", ErrorCodes::NETLINK_ERROR);
120
121 if (bytes_sent == request_size)
122 break;
123
124 request_buf += bytes_sent;
125 request_size -= bytes_sent;
126 }
127 }
128
129 void receive(int fd)
130 {
131 ssize_t bytes_received = ::recv(fd, this, sizeof(*this), 0);
132
133 if (header.nlmsg_type == NLMSG_ERROR)
134 throw Exception("Can't receive Netlink response: error " + std::to_string(error.error), ErrorCodes::NETLINK_ERROR);
135
136 if (!is_nlmsg_ok(&header, bytes_received))
137 throw Exception("Can't receive Netlink response: wrong number of bytes received", ErrorCodes::NETLINK_ERROR);
138 }
139};
140
141
142NetlinkMessage query(
143 int fd,
144 UInt16 type,
145 UInt32 pid,
146 UInt8 command,
147 UInt16 attribute_type,
148 const void * attribute_data,
149 int attribute_size)
150{
151 NetlinkMessage request{};
152
153 request.header.nlmsg_len = NLMSG_LENGTH(GENL_HDRLEN); /// Length of both headers.
154 request.header.nlmsg_type = type;
155 request.header.nlmsg_flags = NLM_F_REQUEST; /// A request.
156 request.header.nlmsg_seq = 0;
157 request.header.nlmsg_pid = pid;
158
159 request.generic_header.cmd = command;
160 request.generic_header.version = 1;
161
162 request.payload.attribute.header.nla_type = attribute_type;
163 request.payload.attribute.header.nla_len = attribute_size + NLA_HDRLEN;
164
165 memcpy(&request.payload.attribute.payload, attribute_data, attribute_size);
166
167 request.header.nlmsg_len += NLMSG_ALIGN(request.payload.attribute.header.nla_len);
168
169 request.send(fd);
170
171 NetlinkMessage response;
172 response.receive(fd);
173
174 return response;
175}
176
177
178UInt16 getFamilyIdImpl(int fd)
179{
180 NetlinkMessage answer = query(fd, GENL_ID_CTRL, getpid(), CTRL_CMD_GETFAMILY, CTRL_ATTR_FAMILY_NAME, TASKSTATS_GENL_NAME, strlen(TASKSTATS_GENL_NAME) + 1);
181
182 /// NOTE Why the relevant info is located in the second attribute?
183 const NetlinkMessage::Attribute * attr = answer.payload.attribute.next();
184
185 if (attr->header.nla_type != CTRL_ATTR_FAMILY_ID)
186 throw Exception("Received wrong attribute as an answer to GET_FAMILY Netlink command", ErrorCodes::NETLINK_ERROR);
187
188 return unalignedLoad<UInt16>(attr->payload);
189}
190
191
192bool checkPermissionsImpl()
193{
194 static bool res = hasLinuxCapability(CAP_NET_ADMIN);
195 if (!res)
196 return false;
197
198 /// Check that we can successfully initialize TaskStatsInfoGetter.
199 /// It will ask about family id through Netlink.
200 /// On some LXC containers we have capability but we still cannot use Netlink.
201
202 try
203 {
204 TaskStatsInfoGetter();
205 }
206 catch (...)
207 {
208 tryLogCurrentException(__PRETTY_FUNCTION__);
209 return false;
210 }
211
212 return true;
213}
214
215
216UInt16 getFamilyId(int fd)
217{
218 /// It is thread and exception safe since C++11 and even before.
219 static UInt16 res = getFamilyIdImpl(fd);
220 return res;
221}
222
223}
224
225
226bool TaskStatsInfoGetter::checkPermissions()
227{
228 static bool res = checkPermissionsImpl();
229 return res;
230}
231
232
233TaskStatsInfoGetter::TaskStatsInfoGetter()
234{
235 netlink_socket_fd = ::socket(PF_NETLINK, SOCK_RAW, NETLINK_GENERIC);
236 if (netlink_socket_fd < 0)
237 throwFromErrno("Can't create PF_NETLINK socket", ErrorCodes::NETLINK_ERROR);
238
239 /// On some containerized environments, operation on Netlink socket could hang forever.
240 /// We set reasonably small timeout to overcome this issue.
241
242 struct timeval tv;
243 tv.tv_sec = 0;
244 tv.tv_usec = 50000;
245
246 if (0 != ::setsockopt(netlink_socket_fd, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char *>(&tv), sizeof(tv)))
247 throwFromErrno("Can't set timeout on PF_NETLINK socket", ErrorCodes::NETLINK_ERROR);
248
249 union
250 {
251 ::sockaddr_nl addr{};
252 ::sockaddr sockaddr;
253 };
254 addr.nl_family = AF_NETLINK;
255
256 if (::bind(netlink_socket_fd, &sockaddr, sizeof(addr)) < 0)
257 throwFromErrno("Can't bind PF_NETLINK socket", ErrorCodes::NETLINK_ERROR);
258
259 taskstats_family_id = getFamilyId(netlink_socket_fd);
260}
261
262
263void TaskStatsInfoGetter::getStat(::taskstats & out_stats, pid_t tid)
264{
265 NetlinkMessage answer = query(netlink_socket_fd, taskstats_family_id, tid, TASKSTATS_CMD_GET, TASKSTATS_CMD_ATTR_PID, &tid, sizeof(tid));
266
267 for (const NetlinkMessage::Attribute * attr = &answer.payload.attribute;
268 attr < answer.end();
269 attr = attr->next())
270 {
271 if (attr->header.nla_type == TASKSTATS_TYPE_AGGR_TGID || attr->header.nla_type == TASKSTATS_TYPE_AGGR_PID)
272 {
273 for (const NetlinkMessage::Attribute * nested_attr = reinterpret_cast<const NetlinkMessage::Attribute *>(attr->payload);
274 nested_attr < attr->next();
275 nested_attr = nested_attr->next())
276 {
277 if (nested_attr->header.nla_type == TASKSTATS_TYPE_STATS)
278 {
279 out_stats = unalignedLoad<::taskstats>(nested_attr->payload);
280 return;
281 }
282 }
283 }
284 }
285
286 throw Exception("There is no TASKSTATS_TYPE_STATS attribute in the Netlink response", ErrorCodes::NETLINK_ERROR);
287}
288
289
290static thread_local pid_t current_tid = 0;
291pid_t TaskStatsInfoGetter::getCurrentTID()
292{
293 if (!current_tid)
294 current_tid = syscall(SYS_gettid); /// This call is always successful. - man gettid
295
296 return current_tid;
297}
298
299
300TaskStatsInfoGetter::~TaskStatsInfoGetter()
301{
302 if (netlink_socket_fd >= 0)
303 close(netlink_socket_fd);
304}
305
306}
307
308
309#else
310
311namespace DB
312{
313
314namespace ErrorCodes
315{
316 extern const int NOT_IMPLEMENTED;
317}
318
319bool TaskStatsInfoGetter::checkPermissions()
320{
321 return false;
322}
323
324
325TaskStatsInfoGetter::TaskStatsInfoGetter()
326{
327 // TODO: throw Exception("TaskStats are not implemented for this OS.", ErrorCodes::NOT_IMPLEMENTED);
328}
329
330void TaskStatsInfoGetter::getStat(::taskstats &, pid_t)
331{
332}
333
334pid_t TaskStatsInfoGetter::getCurrentTID()
335{
336 return 0;
337}
338
339TaskStatsInfoGetter::~TaskStatsInfoGetter()
340{
341}
342
343}
344
345#endif
346