1/*
2 * IXSocket.cpp
3 * Author: Benjamin Sergeant
4 * Copyright (c) 2017-2018 Machine Zone, Inc. All rights reserved.
5 */
6
7#include "IXSocket.h"
8
9#include "IXNetSystem.h"
10#include "IXSelectInterrupt.h"
11#include "IXSelectInterruptFactory.h"
12#include "IXSocketConnect.h"
13#include <algorithm>
14#include <array>
15#include <assert.h>
16#include <fcntl.h>
17#include <stdint.h>
18#include <stdio.h>
19#include <stdlib.h>
20#include <string.h>
21#include <sys/types.h>
22#include <vector>
23
24#ifdef min
25#undef min
26#endif
27
28namespace ix
29{
30 const int Socket::kDefaultPollNoTimeout = -1; // No poll timeout by default
31 const int Socket::kDefaultPollTimeout = kDefaultPollNoTimeout;
32
33 Socket::Socket(int fd)
34 : _sockfd(fd)
35 , _selectInterrupt(createSelectInterrupt())
36 {
37 ;
38 }
39
40 Socket::~Socket()
41 {
42 close();
43 }
44
45 PollResultType Socket::poll(bool readyToRead,
46 int timeoutMs,
47 int sockfd,
48 const SelectInterruptPtr& selectInterrupt)
49 {
50 PollResultType pollResult = PollResultType::ReadyForRead;
51
52 //
53 // We used to use ::select to poll but on Android 9 we get large fds out of
54 // ::connect which crash in FD_SET as they are larger than FD_SETSIZE. Switching
55 // to ::poll does fix that.
56 //
57 // However poll isn't as portable as select and has bugs on Windows, so we
58 // have a shim to fallback to select on those platforms. See
59 // https://github.com/mpv-player/mpv/pull/5203/files for such a select wrapper.
60 //
61 nfds_t nfds = 1;
62 struct pollfd fds[2];
63 memset(fds, 0, sizeof(fds));
64
65 fds[0].fd = sockfd;
66 fds[0].events = (readyToRead) ? POLLIN : POLLOUT;
67
68 // this is ignored by poll, but our select based poll wrapper on Windows needs it
69 fds[0].events |= POLLERR;
70
71 // File descriptor used to interrupt select when needed
72 int interruptFd = -1;
73 void* interruptEvent = nullptr;
74 if (selectInterrupt)
75 {
76 interruptFd = selectInterrupt->getFd();
77 interruptEvent = selectInterrupt->getEvent();
78
79 if (interruptFd != -1)
80 {
81 nfds = 2;
82 fds[1].fd = interruptFd;
83 fds[1].events = POLLIN;
84 }
85 else if (interruptEvent == nullptr)
86 {
87 // Emulation mode: SelectInterrupt neither supports file descriptors nor events
88
89 // Check the selectInterrupt for requests before doing the poll().
90 if (readSelectInterruptRequest(selectInterrupt, &pollResult))
91 {
92 return pollResult;
93 }
94 }
95 }
96
97 void* event = interruptEvent; // ix::poll will set event to nullptr if it wasn't signaled
98 int ret = ix::poll(fds, nfds, timeoutMs, &event);
99
100 if (ret < 0)
101 {
102 pollResult = PollResultType::Error;
103 }
104 else if (ret == 0)
105 {
106 pollResult = PollResultType::Timeout;
107 if (selectInterrupt && interruptFd == -1 && interruptEvent == nullptr)
108 {
109 // Emulation mode: SelectInterrupt neither supports fd nor events
110
111 // Check the selectInterrupt for requests
112 readSelectInterruptRequest(selectInterrupt, &pollResult);
113 }
114 }
115 else if ((interruptFd != -1 && fds[1].revents & POLLIN) || (interruptEvent != nullptr && event != nullptr))
116 {
117 // The InterruptEvent was signaled
118 readSelectInterruptRequest(selectInterrupt, &pollResult);
119 }
120 else if (sockfd != -1 && readyToRead && fds[0].revents & POLLIN)
121 {
122 pollResult = PollResultType::ReadyForRead;
123 }
124 else if (sockfd != -1 && !readyToRead && fds[0].revents & POLLOUT)
125 {
126 pollResult = PollResultType::ReadyForWrite;
127
128#ifdef _WIN32
129 // On connect error, in async mode, windows will write to the exceptions fds
130 if (fds[0].revents & POLLERR)
131 {
132 pollResult = PollResultType::Error;
133 }
134#else
135 int optval = -1;
136 socklen_t optlen = sizeof(optval);
137
138 // getsockopt() puts the errno value for connect into optval so 0
139 // means no-error.
140 if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1 || optval != 0)
141 {
142 pollResult = PollResultType::Error;
143
144 // set errno to optval so that external callers can have an
145 // appropriate error description when calling strerror
146 errno = optval;
147 }
148#endif
149 }
150 else if (sockfd != -1 && (fds[0].revents & POLLERR || fds[0].revents & POLLHUP ||
151 fds[0].revents & POLLNVAL))
152 {
153 pollResult = PollResultType::Error;
154 }
155
156 return pollResult;
157 }
158
159 bool Socket::readSelectInterruptRequest(const SelectInterruptPtr& selectInterrupt,
160 PollResultType* pollResult)
161 {
162 uint64_t value = selectInterrupt->read();
163
164 if (value == SelectInterrupt::kSendRequest)
165 {
166 *pollResult = PollResultType::SendRequest;
167 return true;
168 }
169 else if (value == SelectInterrupt::kCloseRequest)
170 {
171 *pollResult = PollResultType::CloseRequest;
172 return true;
173 }
174
175 return false;
176 }
177
178 PollResultType Socket::isReadyToRead(int timeoutMs)
179 {
180 if (_sockfd == -1)
181 {
182 return PollResultType::Error;
183 }
184
185 bool readyToRead = true;
186 return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt);
187 }
188
189 PollResultType Socket::isReadyToWrite(int timeoutMs)
190 {
191 if (_sockfd == -1)
192 {
193 return PollResultType::Error;
194 }
195
196 bool readyToRead = false;
197 return poll(readyToRead, timeoutMs, _sockfd, _selectInterrupt);
198 }
199
200 // Wake up from poll/select by writing to the pipe which is watched by select
201 bool Socket::wakeUpFromPoll(uint64_t wakeUpCode)
202 {
203 return _selectInterrupt->notify(wakeUpCode);
204 }
205
206 bool Socket::isWakeUpFromPollSupported()
207 {
208 return _selectInterrupt->getFd() != -1 || _selectInterrupt->getEvent() != nullptr;
209 }
210
211 bool Socket::accept(std::string& errMsg)
212 {
213 if (_sockfd == -1)
214 {
215 errMsg = "Socket is uninitialized";
216 return false;
217 }
218 return true;
219 }
220
221 bool Socket::connect(const std::string& host,
222 int port,
223 std::string& errMsg,
224 const CancellationRequest& isCancellationRequested)
225 {
226 std::lock_guard<std::mutex> lock(_socketMutex);
227
228 if (!_selectInterrupt->clear()) return false;
229
230 _sockfd = SocketConnect::connect(host, port, errMsg, isCancellationRequested);
231 return _sockfd != -1;
232 }
233
234 void Socket::close()
235 {
236 std::lock_guard<std::mutex> lock(_socketMutex);
237
238 if (_sockfd == -1) return;
239
240 closeSocket(_sockfd);
241 _sockfd = -1;
242 }
243
244 ssize_t Socket::send(char* buffer, size_t length)
245 {
246 int flags = 0;
247#ifdef MSG_NOSIGNAL
248 flags = MSG_NOSIGNAL;
249#endif
250
251 return ::send(_sockfd, buffer, length, flags);
252 }
253
254 ssize_t Socket::send(const std::string& buffer)
255 {
256 return send((char*) &buffer[0], buffer.size());
257 }
258
259 ssize_t Socket::recv(void* buffer, size_t length)
260 {
261 int flags = 0;
262#ifdef MSG_NOSIGNAL
263 flags = MSG_NOSIGNAL;
264#endif
265
266 return ::recv(_sockfd, (char*) buffer, length, flags);
267 }
268
269 int Socket::getErrno()
270 {
271 int err;
272
273#ifdef _WIN32
274 err = WSAGetLastError();
275#else
276 err = errno;
277#endif
278
279 return err;
280 }
281
282 bool Socket::isWaitNeeded()
283 {
284 int err = getErrno();
285
286 if (err == EWOULDBLOCK || err == EAGAIN || err == EINPROGRESS)
287 {
288 return true;
289 }
290
291 return false;
292 }
293
294 void Socket::closeSocket(int fd)
295 {
296#ifdef _WIN32
297 closesocket(fd);
298#else
299 ::close(fd);
300#endif
301 }
302
303 bool Socket::init(std::string& errorMsg)
304 {
305 return _selectInterrupt->init(errorMsg);
306 }
307
308 bool Socket::writeBytes(const std::string& str,
309 const CancellationRequest& isCancellationRequested)
310 {
311 int offset = 0;
312 int len = (int) str.size();
313
314 while (true)
315 {
316 if (isCancellationRequested && isCancellationRequested()) return false;
317
318 ssize_t ret = send((char*) &str[offset], len);
319
320 // We wrote some bytes, as needed, all good.
321 if (ret > 0)
322 {
323 if (ret == len)
324 {
325 return true;
326 }
327 else
328 {
329 offset += ret;
330 len -= ret;
331 continue;
332 }
333 }
334 // There is possibly something to be writen, try again
335 else if (ret < 0 && Socket::isWaitNeeded())
336 {
337 continue;
338 }
339 // There was an error during the write, abort
340 else
341 {
342 return false;
343 }
344 }
345 }
346
347 bool Socket::readByte(void* buffer, const CancellationRequest& isCancellationRequested)
348 {
349 while (true)
350 {
351 if (isCancellationRequested && isCancellationRequested()) return false;
352
353 ssize_t ret;
354 ret = recv(buffer, 1);
355
356 // We read one byte, as needed, all good.
357 if (ret == 1)
358 {
359 return true;
360 }
361 // There is possibly something to be read, try again
362 else if (ret < 0 && Socket::isWaitNeeded())
363 {
364 // Wait with a 1ms timeout until the socket is ready to read.
365 // This way we are not busy looping
366 if (isReadyToRead(1) == PollResultType::Error)
367 {
368 return false;
369 }
370 }
371 // There was an error during the read, abort
372 else
373 {
374 return false;
375 }
376 }
377 }
378
379 std::pair<bool, std::string> Socket::readLine(
380 const CancellationRequest& isCancellationRequested)
381 {
382 char c;
383 std::string line;
384 line.reserve(64);
385
386 for (int i = 0; i < 2 || (line[i - 2] != '\r' && line[i - 1] != '\n'); ++i)
387 {
388 if (!readByte(&c, isCancellationRequested))
389 {
390 // Return what we were able to read
391 return std::make_pair(false, line);
392 }
393
394 line += c;
395 }
396
397 return std::make_pair(true, line);
398 }
399
400 std::pair<bool, std::string> Socket::readBytes(
401 size_t length,
402 const OnProgressCallback& onProgressCallback,
403 const OnChunkCallback& onChunkCallback,
404 const CancellationRequest& isCancellationRequested)
405 {
406 std::array<uint8_t, 1 << 14> readBuffer;
407 std::vector<uint8_t> output;
408 size_t bytesRead = 0;
409
410 while (bytesRead != length)
411 {
412 if (isCancellationRequested && isCancellationRequested())
413 {
414 const std::string errorMsg("Cancellation Requested");
415 return std::make_pair(false, errorMsg);
416 }
417
418 size_t size = std::min(readBuffer.size(), length - bytesRead);
419 ssize_t ret = recv((char*) &readBuffer[0], size);
420
421 if (ret > 0)
422 {
423 if (onChunkCallback)
424 {
425 std::string chunk(readBuffer.begin(), readBuffer.begin() + ret);
426 onChunkCallback(chunk);
427 }
428 else
429 {
430 output.insert(output.end(), readBuffer.begin(), readBuffer.begin() + ret);
431 }
432 bytesRead += ret;
433 }
434 else if (ret <= 0 && !Socket::isWaitNeeded())
435 {
436 const std::string errorMsg("Recv Error");
437 return std::make_pair(false, errorMsg);
438 }
439
440 if (onProgressCallback) onProgressCallback((int) bytesRead, (int) length);
441
442 // Wait with a 1ms timeout until the socket is ready to read.
443 // This way we are not busy looping
444 if (isReadyToRead(1) == PollResultType::Error)
445 {
446 const std::string errorMsg("Poll Error");
447 return std::make_pair(false, errorMsg);
448 }
449 }
450
451 return std::make_pair(true, std::string(output.begin(), output.end()));
452 }
453} // namespace ix
454