1 | /* |
2 | * IXSocket.cpp |
3 | * Author: Benjamin Sergeant |
4 | * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved. |
5 | */ |
6 | |
7 | #include "IXSocket.h" |
8 | |
9 | #include "IXNetSystem.h" |
10 | #include "IXSelectInterrupt.h" |
11 | #include "IXSelectInterruptFactory.h" |
12 | #include "IXSocketConnect.h" |
13 | #include <algorithm> |
14 | #include <array> |
15 | #include <assert.h> |
16 | #include <fcntl.h> |
17 | #include <stdint.h> |
18 | #include <stdio.h> |
19 | #include <stdlib.h> |
20 | #include <string.h> |
21 | #include <sys/types.h> |
22 | #include <vector> |
23 | |
24 | #ifdef min |
25 | #undef min |
26 | #endif |
27 | |
28 | namespace ix |
29 | { |
30 | const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default |
31 | const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout; |
32 | |
33 | Socket::Socket(int fd) |
34 | : _sockfd(fd) |
35 | , _selectInterrupt(createSelectInterrupt()) |
36 | { |
37 | ; |
38 | } |
39 | |
40 | Socket::~Socket() |
41 | { |
42 | close(); |
43 | } |
44 | |
45 | PollResultType Socket::poll(bool readyToRead, |
46 | int timeoutMs, |
47 | int sockfd, |
48 | const SelectInterruptPtr& selectInterrupt) |
49 | { |
50 | PollResultType pollResult = PollResultType::ReadyForRead; |
51 | |
52 | // |
53 | // We used to use ::select to poll but on Android 9 we get large fds out of |
54 | // ::connect which crash in FD_SET as they are larger than FD_SETSIZE. Switching |
55 | // to ::poll does fix that. |
56 | // |
57 | // However poll isn't as portable as select and has bugs on Windows, so we |
58 | // have a shim to fallback to select on those platforms. See |
59 | // https://github.com/mpv-player/mpv/pull/5203/files for such a select wrapper. |
60 | // |
61 | nfds_t nfds = 1; |
62 | struct pollfd fds[2]; |
63 | memset(fds, 0, sizeof(fds)); |
64 | |
65 | fds[0].fd = sockfd; |
66 | fds[0].events = (readyToRead) ? POLLIN : POLLOUT; |
67 | |
68 | // this is ignored by poll, but our select based poll wrapper on Windows needs it |
69 | fds[0].events |= POLLERR; |
70 | |
71 | // File descriptor used to interrupt select when needed |
72 | int interruptFd = -1; |
73 | void* interruptEvent = nullptr; |
74 | if (selectInterrupt) |
75 | { |
76 | interruptFd = selectInterrupt->getFd(); |
77 | interruptEvent = selectInterrupt->getEvent(); |
78 | |
79 | if (interruptFd != -1) |
80 | { |
81 | nfds = 2; |
82 | fds[1].fd = interruptFd; |
83 | fds[1].events = POLLIN; |
84 | } |
85 | else if (interruptEvent == nullptr) |
86 | { |
87 | // Emulation mode: SelectInterrupt neither supports file descriptors nor events |
88 | |
89 | // Check the selectInterrupt for requests before doing the poll(). |
90 | if (readSelectInterruptRequest(selectInterrupt, &pollResult)) |
91 | { |
92 | return pollResult; |
93 | } |
94 | } |
95 | } |
96 | |
97 | void* event = interruptEvent; // ix::poll will set event to nullptr if it wasn't signaled |
98 | int ret = ix::poll(fds, nfds, timeoutMs, &event); |
99 | |
100 | if (ret < 0) |
101 | { |
102 | pollResult = PollResultType::Error; |
103 | } |
104 | else if (ret == 0) |
105 | { |
106 | pollResult = PollResultType::Timeout; |
107 | if (selectInterrupt && interruptFd == -1 && interruptEvent == nullptr) |
108 | { |
109 | // Emulation mode: SelectInterrupt neither supports fd nor events |
110 | |
111 | // Check the selectInterrupt for requests |
112 | readSelectInterruptRequest(selectInterrupt, &pollResult); |
113 | } |
114 | } |
115 | else if ((interruptFd != -1 && fds[1].revents & POLLIN) || (interruptEvent != nullptr && event != nullptr)) |
116 | { |
117 | // The InterruptEvent was signaled |
118 | readSelectInterruptRequest(selectInterrupt, &pollResult); |
119 | } |
120 | else if (sockfd != -1 && readyToRead && fds[0].revents & POLLIN) |
121 | { |
122 | pollResult = PollResultType::ReadyForRead; |
123 | } |
124 | else if (sockfd != -1 && !readyToRead && fds[0].revents & POLLOUT) |
125 | { |
126 | pollResult = PollResultType::ReadyForWrite; |
127 | |
128 | #ifdef _WIN32 |
129 | // On connect error, in async mode, windows will write to the exceptions fds |
130 | if (fds[0].revents & POLLERR) |
131 | { |
132 | pollResult = PollResultType::Error; |
133 | } |
134 | #else |
135 | int optval = -1; |
136 | socklen_t optlen = sizeof(optval); |
137 | |
138 | // getsockopt() puts the errno value for connect into optval so 0 |
139 | // means no-error. |
140 | if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1 || optval != 0) |
141 | { |
142 | pollResult = PollResultType::Error; |
143 | |
144 | // set errno to optval so that external callers can have an |
145 | // appropriate error description when calling strerror |
146 | errno = optval; |
147 | } |
148 | #endif |
149 | } |
150 | else if (sockfd != -1 && (fds[0].revents & POLLERR || fds[0].revents & POLLHUP || |
151 | fds[0].revents & POLLNVAL)) |
152 | { |
153 | pollResult = PollResultType::Error; |
154 | } |
155 | |
156 | return pollResult; |
157 | } |
158 | |
159 | bool Socket::readSelectInterruptRequest(const SelectInterruptPtr& selectInterrupt, |
160 | PollResultType* pollResult) |
161 | { |
162 | uint64_t value = selectInterrupt->read(); |
163 | |
164 | if (value == SelectInterrupt::kSendRequest) |
165 | { |
166 | *pollResult = PollResultType::SendRequest; |
167 | return true; |
168 | } |
169 | else if (value == SelectInterrupt::kCloseRequest) |
170 | { |
171 | *pollResult = PollResultType::CloseRequest; |
172 | return true; |
173 | } |
174 | |
175 | return false; |
176 | } |
177 | |
178 | PollResultType Socket::isReadyToRead(int timeoutMs) |
179 | { |
180 | if (_sockfd == -1) |
181 | { |
182 | return PollResultType::Error; |
183 | } |
184 | |
185 | bool readyToRead = true; |
186 | return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt); |
187 | } |
188 | |
189 | PollResultType Socket::isReadyToWrite(int timeoutMs) |
190 | { |
191 | if (_sockfd == -1) |
192 | { |
193 | return PollResultType::Error; |
194 | } |
195 | |
196 | bool readyToRead = false; |
197 | return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt); |
198 | } |
199 | |
200 | // Wake up from poll/select by writing to the pipe which is watched by select |
201 | bool Socket::wakeUpFromPoll(uint64_t wakeUpCode) |
202 | { |
203 | return _selectInterrupt->notify(wakeUpCode); |
204 | } |
205 | |
206 | bool Socket::isWakeUpFromPollSupported() |
207 | { |
208 | return _selectInterrupt->getFd() != -1 || _selectInterrupt->getEvent() != nullptr; |
209 | } |
210 | |
211 | bool Socket::accept(std::string& errMsg) |
212 | { |
213 | if (_sockfd == -1) |
214 | { |
215 | errMsg = "Socket is uninitialized" ; |
216 | return false; |
217 | } |
218 | return true; |
219 | } |
220 | |
221 | bool Socket::connect(const std::string& host, |
222 | int port, |
223 | std::string& errMsg, |
224 | const CancellationRequest& isCancellationRequested) |
225 | { |
226 | std::lock_guard<std::mutex> lock(_socketMutex); |
227 | |
228 | if (!_selectInterrupt->clear()) return false; |
229 | |
230 | _sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested); |
231 | return _sockfd != -1; |
232 | } |
233 | |
234 | void Socket::close() |
235 | { |
236 | std::lock_guard<std::mutex> lock(_socketMutex); |
237 | |
238 | if (_sockfd == -1) return; |
239 | |
240 | closeSocket(_sockfd); |
241 | _sockfd = -1; |
242 | } |
243 | |
244 | ssize_t Socket::send(char* buffer, size_t length) |
245 | { |
246 | int flags = 0; |
247 | #ifdef MSG_NOSIGNAL |
248 | flags = MSG_NOSIGNAL; |
249 | #endif |
250 | |
251 | return ::send(_sockfd, buffer, length, flags); |
252 | } |
253 | |
254 | ssize_t Socket::send(const std::string& buffer) |
255 | { |
256 | return send((char*) &buffer[0], buffer.size()); |
257 | } |
258 | |
259 | ssize_t Socket::recv(void* buffer, size_t length) |
260 | { |
261 | int flags = 0; |
262 | #ifdef MSG_NOSIGNAL |
263 | flags = MSG_NOSIGNAL; |
264 | #endif |
265 | |
266 | return ::recv(_sockfd, (char*) buffer, length, flags); |
267 | } |
268 | |
269 | int Socket::getErrno() |
270 | { |
271 | int err; |
272 | |
273 | #ifdef _WIN32 |
274 | err = WSAGetLastError(); |
275 | #else |
276 | err = errno; |
277 | #endif |
278 | |
279 | return err; |
280 | } |
281 | |
282 | bool Socket::isWaitNeeded() |
283 | { |
284 | int err = getErrno(); |
285 | |
286 | if (err == EWOULDBLOCK || err == EAGAIN || err == EINPROGRESS) |
287 | { |
288 | return true; |
289 | } |
290 | |
291 | return false; |
292 | } |
293 | |
294 | void Socket::closeSocket(int fd) |
295 | { |
296 | #ifdef _WIN32 |
297 | closesocket(fd); |
298 | #else |
299 | ::close(fd); |
300 | #endif |
301 | } |
302 | |
303 | bool Socket::init(std::string& errorMsg) |
304 | { |
305 | return _selectInterrupt->init(errorMsg); |
306 | } |
307 | |
308 | bool Socket::writeBytes(const std::string& str, |
309 | const CancellationRequest& isCancellationRequested) |
310 | { |
311 | int offset = 0; |
312 | int len = (int) str.size(); |
313 | |
314 | while (true) |
315 | { |
316 | if (isCancellationRequested && isCancellationRequested()) return false; |
317 | |
318 | ssize_t ret = send((char*) &str[offset], len); |
319 | |
320 | // We wrote some bytes, as needed, all good. |
321 | if (ret > 0) |
322 | { |
323 | if (ret == len) |
324 | { |
325 | return true; |
326 | } |
327 | else |
328 | { |
329 | offset += ret; |
330 | len -= ret; |
331 | continue; |
332 | } |
333 | } |
334 | // There is possibly something to be writen, try again |
335 | else if (ret < 0 && Socket::isWaitNeeded()) |
336 | { |
337 | continue; |
338 | } |
339 | // There was an error during the write, abort |
340 | else |
341 | { |
342 | return false; |
343 | } |
344 | } |
345 | } |
346 | |
347 | bool Socket::readByte(void* buffer, const CancellationRequest& isCancellationRequested) |
348 | { |
349 | while (true) |
350 | { |
351 | if (isCancellationRequested && isCancellationRequested()) return false; |
352 | |
353 | ssize_t ret; |
354 | ret = recv(buffer, 1); |
355 | |
356 | // We read one byte, as needed, all good. |
357 | if (ret == 1) |
358 | { |
359 | return true; |
360 | } |
361 | // There is possibly something to be read, try again |
362 | else if (ret < 0 && Socket::isWaitNeeded()) |
363 | { |
364 | // Wait with a 1ms timeout until the socket is ready to read. |
365 | // This way we are not busy looping |
366 | if (isReadyToRead(1) == PollResultType::Error) |
367 | { |
368 | return false; |
369 | } |
370 | } |
371 | // There was an error during the read, abort |
372 | else |
373 | { |
374 | return false; |
375 | } |
376 | } |
377 | } |
378 | |
379 | std::pair<bool, std::string> Socket::readLine( |
380 | const CancellationRequest& isCancellationRequested) |
381 | { |
382 | char c; |
383 | std::string line; |
384 | line.reserve(64); |
385 | |
386 | for (int i = 0; i < 2 || (line[i - 2] != '\r' && line[i - 1] != '\n'); ++i) |
387 | { |
388 | if (!readByte(&c, isCancellationRequested)) |
389 | { |
390 | // Return what we were able to read |
391 | return std::make_pair(false, line); |
392 | } |
393 | |
394 | line += c; |
395 | } |
396 | |
397 | return std::make_pair(true, line); |
398 | } |
399 | |
400 | std::pair<bool, std::string> Socket::readBytes( |
401 | size_t length, |
402 | const OnProgressCallback& onProgressCallback, |
403 | const OnChunkCallback& onChunkCallback, |
404 | const CancellationRequest& isCancellationRequested) |
405 | { |
406 | std::array<uint8_t, 1 << 14> readBuffer; |
407 | std::vector<uint8_t> output; |
408 | size_t bytesRead = 0; |
409 | |
410 | while (bytesRead != length) |
411 | { |
412 | if (isCancellationRequested && isCancellationRequested()) |
413 | { |
414 | const std::string errorMsg("Cancellation Requested" ); |
415 | return std::make_pair(false, errorMsg); |
416 | } |
417 | |
418 | size_t size = std::min(readBuffer.size(), length - bytesRead); |
419 | ssize_t ret = recv((char*) &readBuffer[0], size); |
420 | |
421 | if (ret > 0) |
422 | { |
423 | if (onChunkCallback) |
424 | { |
425 | std::string chunk(readBuffer.begin(), readBuffer.begin() + ret); |
426 | onChunkCallback(chunk); |
427 | } |
428 | else |
429 | { |
430 | output.insert(output.end(), readBuffer.begin(), readBuffer.begin() + ret); |
431 | } |
432 | bytesRead += ret; |
433 | } |
434 | else if (ret <= 0 && !Socket::isWaitNeeded()) |
435 | { |
436 | const std::string errorMsg("Recv Error" ); |
437 | return std::make_pair(false, errorMsg); |
438 | } |
439 | |
440 | if (onProgressCallback) onProgressCallback((int) bytesRead, (int) length); |
441 | |
442 | // Wait with a 1ms timeout until the socket is ready to read. |
443 | // This way we are not busy looping |
444 | if (isReadyToRead(1) == PollResultType::Error) |
445 | { |
446 | const std::string errorMsg("Poll Error" ); |
447 | return std::make_pair(false, errorMsg); |
448 | } |
449 | } |
450 | |
451 | return std::make_pair(true, std::string(output.begin(), output.end())); |
452 | } |
453 | } // namespace ix |
454 | |