| 1 | #include "TaskStatsInfoGetter.h" |
| 2 | #include <Common/Exception.h> |
| 3 | #include <Core/Types.h> |
| 4 | |
| 5 | #include <unistd.h> |
| 6 | |
| 7 | #if defined(OS_LINUX) |
| 8 | |
| 9 | #include "hasLinuxCapability.h" |
| 10 | #include <common/unaligned.h> |
| 11 | |
| 12 | #include <errno.h> |
| 13 | #include <stdio.h> |
| 14 | #include <stdlib.h> |
| 15 | #include <string.h> |
| 16 | #include <sys/socket.h> |
| 17 | #include <syscall.h> |
| 18 | #include <linux/genetlink.h> |
| 19 | #include <linux/netlink.h> |
| 20 | #include <linux/taskstats.h> |
| 21 | #include <linux/capability.h> |
| 22 | |
| 23 | |
| 24 | /// Basic idea is motivated by "iotop" tool. |
| 25 | /// More info: https://www.kernel.org/doc/Documentation/accounting/taskstats.txt |
| 26 | |
| 27 | |
| 28 | namespace DB |
| 29 | { |
| 30 | |
| 31 | namespace ErrorCodes |
| 32 | { |
| 33 | extern const int NETLINK_ERROR; |
| 34 | extern const int LOGICAL_ERROR; |
| 35 | } |
| 36 | |
/// Stand-in for the NLMSG_OK system macro. That macro contains signedness bugs which are not going to be fixed,
/// so every comparison here is made with an explicit cast instead.
/// Returns true when `len` bytes are enough to hold a Netlink header and the whole message that header announces.
static inline bool is_nlmsg_ok(const struct nlmsghdr * const nlh, const ssize_t len)
{
    const size_t header_size = sizeof(struct nlmsghdr);

    /// Too short (or negative): there is not even a complete header to look at.
    if (len < static_cast<ssize_t>(header_size))
        return false;

    /// The announced length cannot be smaller than the header itself.
    if (nlh->nlmsg_len < header_size)
        return false;

    /// The announced length must fit into what is available.
    return static_cast<size_t>(len) >= nlh->nlmsg_len;
}
| 42 | |
| 43 | namespace |
| 44 | { |
| 45 | |
| 46 | |
| 47 | /** The message contains: |
| 48 | * - Netlink protocol header; |
| 49 | * - Generic Netlink (is a sub-protocol of Netlink that we use) protocol header; |
| 50 | * - Payload |
| 51 | * -- that itself is a list of "Attributes" (sub-messages), each of them contains length (including header), type, and its own payload. |
| 52 | * -- and attribute payload may be represented by the list of embedded attributes. |
| 53 | */ |
| 54 | struct NetlinkMessage |
| 55 | { |
| 56 | static size_t constexpr MAX_MSG_SIZE = 1024; |
| 57 | |
| 58 | alignas(NLMSG_ALIGNTO) ::nlmsghdr ; |
| 59 | |
| 60 | struct Attribute |
| 61 | { |
| 62 | ::nlattr ; |
| 63 | |
| 64 | alignas(NLMSG_ALIGNTO) char payload[0]; |
| 65 | |
| 66 | const Attribute * next() const |
| 67 | { |
| 68 | return reinterpret_cast<const Attribute *>(reinterpret_cast<const char *>(this) + NLA_ALIGN(header.nla_len)); |
| 69 | } |
| 70 | }; |
| 71 | |
| 72 | union alignas(NLMSG_ALIGNTO) |
| 73 | { |
| 74 | struct |
| 75 | { |
| 76 | ::genlmsghdr ; |
| 77 | |
| 78 | union alignas(NLMSG_ALIGNTO) |
| 79 | { |
| 80 | char buf[MAX_MSG_SIZE]; |
| 81 | Attribute attribute; /// First attribute. There may be more. |
| 82 | } payload; |
| 83 | }; |
| 84 | |
| 85 | ::nlmsgerr error; |
| 86 | }; |
| 87 | |
| 88 | const Attribute * end() const |
| 89 | { |
| 90 | return reinterpret_cast<const Attribute *>(reinterpret_cast<const char *>(this) + header.nlmsg_len); |
| 91 | } |
| 92 | |
| 93 | void send(int fd) const |
| 94 | { |
| 95 | const char * request_buf = reinterpret_cast<const char *>(this); |
| 96 | ssize_t request_size = header.nlmsg_len; |
| 97 | |
| 98 | union |
| 99 | { |
| 100 | ::sockaddr_nl nladdr{}; |
| 101 | ::sockaddr sockaddr; |
| 102 | }; |
| 103 | |
| 104 | nladdr.nl_family = AF_NETLINK; |
| 105 | |
| 106 | while (true) |
| 107 | { |
| 108 | ssize_t bytes_sent = ::sendto(fd, request_buf, request_size, 0, &sockaddr, sizeof(nladdr)); |
| 109 | |
| 110 | if (bytes_sent <= 0) |
| 111 | { |
| 112 | if (errno == EAGAIN) |
| 113 | continue; |
| 114 | else |
| 115 | throwFromErrno("Can't send a Netlink command" , ErrorCodes::NETLINK_ERROR); |
| 116 | } |
| 117 | |
| 118 | if (bytes_sent > request_size) |
| 119 | throw Exception("Wrong result of sendto system call: bytes_sent is greater than request size" , ErrorCodes::NETLINK_ERROR); |
| 120 | |
| 121 | if (bytes_sent == request_size) |
| 122 | break; |
| 123 | |
| 124 | request_buf += bytes_sent; |
| 125 | request_size -= bytes_sent; |
| 126 | } |
| 127 | } |
| 128 | |
| 129 | void receive(int fd) |
| 130 | { |
| 131 | ssize_t bytes_received = ::recv(fd, this, sizeof(*this), 0); |
| 132 | |
| 133 | if (header.nlmsg_type == NLMSG_ERROR) |
| 134 | throw Exception("Can't receive Netlink response: error " + std::to_string(error.error), ErrorCodes::NETLINK_ERROR); |
| 135 | |
| 136 | if (!is_nlmsg_ok(&header, bytes_received)) |
| 137 | throw Exception("Can't receive Netlink response: wrong number of bytes received" , ErrorCodes::NETLINK_ERROR); |
| 138 | } |
| 139 | }; |
| 140 | |
| 141 | |
| 142 | NetlinkMessage query( |
| 143 | int fd, |
| 144 | UInt16 type, |
| 145 | UInt32 pid, |
| 146 | UInt8 command, |
| 147 | UInt16 attribute_type, |
| 148 | const void * attribute_data, |
| 149 | int attribute_size) |
| 150 | { |
| 151 | NetlinkMessage request{}; |
| 152 | |
| 153 | request.header.nlmsg_len = NLMSG_LENGTH(GENL_HDRLEN); /// Length of both headers. |
| 154 | request.header.nlmsg_type = type; |
| 155 | request.header.nlmsg_flags = NLM_F_REQUEST; /// A request. |
| 156 | request.header.nlmsg_seq = 0; |
| 157 | request.header.nlmsg_pid = pid; |
| 158 | |
| 159 | request.generic_header.cmd = command; |
| 160 | request.generic_header.version = 1; |
| 161 | |
| 162 | request.payload.attribute.header.nla_type = attribute_type; |
| 163 | request.payload.attribute.header.nla_len = attribute_size + NLA_HDRLEN; |
| 164 | |
| 165 | memcpy(&request.payload.attribute.payload, attribute_data, attribute_size); |
| 166 | |
| 167 | request.header.nlmsg_len += NLMSG_ALIGN(request.payload.attribute.header.nla_len); |
| 168 | |
| 169 | request.send(fd); |
| 170 | |
| 171 | NetlinkMessage response; |
| 172 | response.receive(fd); |
| 173 | |
| 174 | return response; |
| 175 | } |
| 176 | |
| 177 | |
| 178 | UInt16 getFamilyIdImpl(int fd) |
| 179 | { |
| 180 | NetlinkMessage answer = query(fd, GENL_ID_CTRL, getpid(), CTRL_CMD_GETFAMILY, CTRL_ATTR_FAMILY_NAME, TASKSTATS_GENL_NAME, strlen(TASKSTATS_GENL_NAME) + 1); |
| 181 | |
| 182 | /// NOTE Why the relevant info is located in the second attribute? |
| 183 | const NetlinkMessage::Attribute * attr = answer.payload.attribute.next(); |
| 184 | |
| 185 | if (attr->header.nla_type != CTRL_ATTR_FAMILY_ID) |
| 186 | throw Exception("Received wrong attribute as an answer to GET_FAMILY Netlink command" , ErrorCodes::NETLINK_ERROR); |
| 187 | |
| 188 | return unalignedLoad<UInt16>(attr->payload); |
| 189 | } |
| 190 | |
| 191 | |
| 192 | bool checkPermissionsImpl() |
| 193 | { |
| 194 | static bool res = hasLinuxCapability(CAP_NET_ADMIN); |
| 195 | if (!res) |
| 196 | return false; |
| 197 | |
| 198 | /// Check that we can successfully initialize TaskStatsInfoGetter. |
| 199 | /// It will ask about family id through Netlink. |
| 200 | /// On some LXC containers we have capability but we still cannot use Netlink. |
| 201 | |
| 202 | try |
| 203 | { |
| 204 | TaskStatsInfoGetter(); |
| 205 | } |
| 206 | catch (...) |
| 207 | { |
| 208 | tryLogCurrentException(__PRETTY_FUNCTION__); |
| 209 | return false; |
| 210 | } |
| 211 | |
| 212 | return true; |
| 213 | } |
| 214 | |
| 215 | |
| 216 | UInt16 getFamilyId(int fd) |
| 217 | { |
| 218 | /// It is thread and exception safe since C++11 and even before. |
| 219 | static UInt16 res = getFamilyIdImpl(fd); |
| 220 | return res; |
| 221 | } |
| 222 | |
| 223 | } |
| 224 | |
| 225 | |
| 226 | bool TaskStatsInfoGetter::checkPermissions() |
| 227 | { |
| 228 | static bool res = checkPermissionsImpl(); |
| 229 | return res; |
| 230 | } |
| 231 | |
| 232 | |
| 233 | TaskStatsInfoGetter::TaskStatsInfoGetter() |
| 234 | { |
| 235 | netlink_socket_fd = ::socket(PF_NETLINK, SOCK_RAW, NETLINK_GENERIC); |
| 236 | if (netlink_socket_fd < 0) |
| 237 | throwFromErrno("Can't create PF_NETLINK socket" , ErrorCodes::NETLINK_ERROR); |
| 238 | |
| 239 | /// On some containerized environments, operation on Netlink socket could hang forever. |
| 240 | /// We set reasonably small timeout to overcome this issue. |
| 241 | |
| 242 | struct timeval tv; |
| 243 | tv.tv_sec = 0; |
| 244 | tv.tv_usec = 50000; |
| 245 | |
| 246 | if (0 != ::setsockopt(netlink_socket_fd, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char *>(&tv), sizeof(tv))) |
| 247 | throwFromErrno("Can't set timeout on PF_NETLINK socket" , ErrorCodes::NETLINK_ERROR); |
| 248 | |
| 249 | union |
| 250 | { |
| 251 | ::sockaddr_nl addr{}; |
| 252 | ::sockaddr sockaddr; |
| 253 | }; |
| 254 | addr.nl_family = AF_NETLINK; |
| 255 | |
| 256 | if (::bind(netlink_socket_fd, &sockaddr, sizeof(addr)) < 0) |
| 257 | throwFromErrno("Can't bind PF_NETLINK socket" , ErrorCodes::NETLINK_ERROR); |
| 258 | |
| 259 | taskstats_family_id = getFamilyId(netlink_socket_fd); |
| 260 | } |
| 261 | |
| 262 | |
| 263 | void TaskStatsInfoGetter::getStat(::taskstats & out_stats, pid_t tid) |
| 264 | { |
| 265 | NetlinkMessage answer = query(netlink_socket_fd, taskstats_family_id, tid, TASKSTATS_CMD_GET, TASKSTATS_CMD_ATTR_PID, &tid, sizeof(tid)); |
| 266 | |
| 267 | for (const NetlinkMessage::Attribute * attr = &answer.payload.attribute; |
| 268 | attr < answer.end(); |
| 269 | attr = attr->next()) |
| 270 | { |
| 271 | if (attr->header.nla_type == TASKSTATS_TYPE_AGGR_TGID || attr->header.nla_type == TASKSTATS_TYPE_AGGR_PID) |
| 272 | { |
| 273 | for (const NetlinkMessage::Attribute * nested_attr = reinterpret_cast<const NetlinkMessage::Attribute *>(attr->payload); |
| 274 | nested_attr < attr->next(); |
| 275 | nested_attr = nested_attr->next()) |
| 276 | { |
| 277 | if (nested_attr->header.nla_type == TASKSTATS_TYPE_STATS) |
| 278 | { |
| 279 | out_stats = unalignedLoad<::taskstats>(nested_attr->payload); |
| 280 | return; |
| 281 | } |
| 282 | } |
| 283 | } |
| 284 | } |
| 285 | |
| 286 | throw Exception("There is no TASKSTATS_TYPE_STATS attribute in the Netlink response" , ErrorCodes::NETLINK_ERROR); |
| 287 | } |
| 288 | |
| 289 | |
| 290 | static thread_local pid_t current_tid = 0; |
| 291 | pid_t TaskStatsInfoGetter::getCurrentTID() |
| 292 | { |
| 293 | if (!current_tid) |
| 294 | current_tid = syscall(SYS_gettid); /// This call is always successful. - man gettid |
| 295 | |
| 296 | return current_tid; |
| 297 | } |
| 298 | |
| 299 | |
| 300 | TaskStatsInfoGetter::~TaskStatsInfoGetter() |
| 301 | { |
| 302 | if (netlink_socket_fd >= 0) |
| 303 | close(netlink_socket_fd); |
| 304 | } |
| 305 | |
| 306 | } |
| 307 | |
| 308 | |
| 309 | #else |
| 310 | |
| 311 | namespace DB |
| 312 | { |
| 313 | |
| 314 | namespace ErrorCodes |
| 315 | { |
| 316 | extern const int NOT_IMPLEMENTED; |
| 317 | } |
| 318 | |
| 319 | bool TaskStatsInfoGetter::checkPermissions() |
| 320 | { |
| 321 | return false; |
| 322 | } |
| 323 | |
| 324 | |
| 325 | TaskStatsInfoGetter::TaskStatsInfoGetter() |
| 326 | { |
| 327 | // TODO: throw Exception("TaskStats are not implemented for this OS.", ErrorCodes::NOT_IMPLEMENTED); |
| 328 | } |
| 329 | |
| 330 | void TaskStatsInfoGetter::getStat(::taskstats &, pid_t) |
| 331 | { |
| 332 | } |
| 333 | |
| 334 | pid_t TaskStatsInfoGetter::getCurrentTID() |
| 335 | { |
| 336 | return 0; |
| 337 | } |
| 338 | |
| 339 | TaskStatsInfoGetter::~TaskStatsInfoGetter() |
| 340 | { |
| 341 | } |
| 342 | |
| 343 | } |
| 344 | |
| 345 | #endif |
| 346 | |