1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #include "precompiled.hpp" |
31 | #include "macros.hpp" |
32 | #include "ip.hpp" |
33 | #include "tcp.hpp" |
34 | #include "err.hpp" |
35 | #include "options.hpp" |
36 | |
37 | #if !defined ZMQ_HAVE_WINDOWS |
38 | #include <fcntl.h> |
39 | #include <sys/types.h> |
40 | #include <sys/socket.h> |
41 | #include <netinet/in.h> |
42 | #include <netinet/tcp.h> |
43 | #include <unistd.h> |
44 | #ifdef ZMQ_HAVE_VXWORKS |
45 | #include <sockLib.h> |
46 | #endif |
47 | #endif |
48 | |
49 | #if defined ZMQ_HAVE_OPENVMS |
50 | #include <ioctl.h> |
51 | #endif |
52 | |
53 | #ifdef __APPLE__ |
54 | #include <TargetConditionals.h> |
55 | #endif |
56 | |
57 | int zmq::tune_tcp_socket (fd_t s_) |
58 | { |
59 | // Disable Nagle's algorithm. We are doing data batching on 0MQ level, |
60 | // so using Nagle wouldn't improve throughput in anyway, but it would |
61 | // hurt latency. |
62 | int nodelay = 1; |
63 | int rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELAY, |
64 | reinterpret_cast<char *> (&nodelay), sizeof (int)); |
65 | assert_success_or_recoverable (s_, rc); |
66 | if (rc != 0) |
67 | return rc; |
68 | |
69 | #ifdef ZMQ_HAVE_OPENVMS |
70 | // Disable delayed acknowledgements as they hurt latency significantly. |
71 | int nodelack = 1; |
72 | rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELACK, (char *) &nodelack, |
73 | sizeof (int)); |
74 | assert_success_or_recoverable (s_, rc); |
75 | #endif |
76 | return rc; |
77 | } |
78 | |
79 | int zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_) |
80 | { |
81 | const int rc = |
82 | setsockopt (sockfd_, SOL_SOCKET, SO_SNDBUF, |
83 | reinterpret_cast<char *> (&bufsize_), sizeof bufsize_); |
84 | assert_success_or_recoverable (sockfd_, rc); |
85 | return rc; |
86 | } |
87 | |
88 | int zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_) |
89 | { |
90 | const int rc = |
91 | setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF, |
92 | reinterpret_cast<char *> (&bufsize_), sizeof bufsize_); |
93 | assert_success_or_recoverable (sockfd_, rc); |
94 | return rc; |
95 | } |
96 | |
97 | int zmq::tune_tcp_keepalives (fd_t s_, |
98 | int keepalive_, |
99 | int keepalive_cnt_, |
100 | int keepalive_idle_, |
101 | int keepalive_intvl_) |
102 | { |
103 | // These options are used only under certain #ifdefs below. |
104 | LIBZMQ_UNUSED (keepalive_); |
105 | LIBZMQ_UNUSED (keepalive_cnt_); |
106 | LIBZMQ_UNUSED (keepalive_idle_); |
107 | LIBZMQ_UNUSED (keepalive_intvl_); |
108 | |
109 | // If none of the #ifdefs apply, then s_ is unused. |
110 | LIBZMQ_UNUSED (s_); |
111 | |
112 | // Tuning TCP keep-alives if platform allows it |
113 | // All values = -1 means skip and leave it for OS |
114 | #ifdef ZMQ_HAVE_WINDOWS |
115 | if (keepalive_ != -1) { |
116 | tcp_keepalive keepalive_opts; |
117 | keepalive_opts.onoff = keepalive_; |
118 | keepalive_opts.keepalivetime = |
119 | keepalive_idle_ != -1 ? keepalive_idle_ * 1000 : 7200000; |
120 | keepalive_opts.keepaliveinterval = |
121 | keepalive_intvl_ != -1 ? keepalive_intvl_ * 1000 : 1000; |
122 | DWORD num_bytes_returned; |
123 | int rc = WSAIoctl (s_, SIO_KEEPALIVE_VALS, &keepalive_opts, |
124 | sizeof (keepalive_opts), NULL, 0, |
125 | &num_bytes_returned, NULL, NULL); |
126 | assert_success_or_recoverable (s_, rc); |
127 | if (rc == SOCKET_ERROR) |
128 | return rc; |
129 | } |
130 | #else |
131 | #ifdef ZMQ_HAVE_SO_KEEPALIVE |
132 | if (keepalive_ != -1) { |
133 | int rc = |
134 | setsockopt (s_, SOL_SOCKET, SO_KEEPALIVE, |
135 | reinterpret_cast<char *> (&keepalive_), sizeof (int)); |
136 | assert_success_or_recoverable (s_, rc); |
137 | if (rc != 0) |
138 | return rc; |
139 | |
140 | #ifdef ZMQ_HAVE_TCP_KEEPCNT |
141 | if (keepalive_cnt_ != -1) { |
142 | int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPCNT, &keepalive_cnt_, |
143 | sizeof (int)); |
144 | assert_success_or_recoverable (s_, rc); |
145 | if (rc != 0) |
146 | return rc; |
147 | } |
148 | #endif // ZMQ_HAVE_TCP_KEEPCNT |
149 | |
150 | #ifdef ZMQ_HAVE_TCP_KEEPIDLE |
151 | if (keepalive_idle_ != -1) { |
152 | int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPIDLE, |
153 | &keepalive_idle_, sizeof (int)); |
154 | assert_success_or_recoverable (s_, rc); |
155 | if (rc != 0) |
156 | return rc; |
157 | } |
158 | #else // ZMQ_HAVE_TCP_KEEPIDLE |
159 | #ifdef ZMQ_HAVE_TCP_KEEPALIVE |
160 | if (keepalive_idle_ != -1) { |
161 | int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPALIVE, |
162 | &keepalive_idle_, sizeof (int)); |
163 | assert_success_or_recoverable (s_, rc); |
164 | if (rc != 0) |
165 | return rc; |
166 | } |
167 | #endif // ZMQ_HAVE_TCP_KEEPALIVE |
168 | #endif // ZMQ_HAVE_TCP_KEEPIDLE |
169 | |
170 | #ifdef ZMQ_HAVE_TCP_KEEPINTVL |
171 | if (keepalive_intvl_ != -1) { |
172 | int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPINTVL, |
173 | &keepalive_intvl_, sizeof (int)); |
174 | assert_success_or_recoverable (s_, rc); |
175 | if (rc != 0) |
176 | return rc; |
177 | } |
178 | #endif // ZMQ_HAVE_TCP_KEEPINTVL |
179 | } |
180 | #endif // ZMQ_HAVE_SO_KEEPALIVE |
181 | #endif // ZMQ_HAVE_WINDOWS |
182 | |
183 | return 0; |
184 | } |
185 | |
186 | int zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_) |
187 | { |
188 | if (timeout_ <= 0) |
189 | return 0; |
190 | |
191 | LIBZMQ_UNUSED (sockfd_); |
192 | |
193 | #if defined(ZMQ_HAVE_WINDOWS) && defined(TCP_MAXRT) |
194 | // msdn says it's supported in >= Vista, >= Windows Server 2003 |
195 | timeout_ /= 1000; // in seconds |
196 | int rc = |
197 | setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT, |
198 | reinterpret_cast<char *> (&timeout_), sizeof (timeout_)); |
199 | assert_success_or_recoverable (sockfd_, rc); |
200 | return rc; |
201 | // FIXME: should be ZMQ_HAVE_TCP_USER_TIMEOUT |
202 | #elif defined(TCP_USER_TIMEOUT) |
203 | int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_, |
204 | sizeof (timeout_)); |
205 | assert_success_or_recoverable (sockfd_, rc); |
206 | return rc; |
207 | #else |
208 | return 0; |
209 | #endif |
210 | } |
211 | |
212 | int zmq::tcp_write (fd_t s_, const void *data_, size_t size_) |
213 | { |
214 | #ifdef ZMQ_HAVE_WINDOWS |
215 | |
216 | int nbytes = send (s_, (char *) data_, static_cast<int> (size_), 0); |
217 | |
218 | // If not a single byte can be written to the socket in non-blocking mode |
219 | // we'll get an error (this may happen during the speculative write). |
220 | const int last_error = WSAGetLastError (); |
221 | if (nbytes == SOCKET_ERROR && last_error == WSAEWOULDBLOCK) |
222 | return 0; |
223 | |
224 | // Signalise peer failure. |
225 | if (nbytes == SOCKET_ERROR |
226 | && (last_error == WSAENETDOWN || last_error == WSAENETRESET |
227 | || last_error == WSAEHOSTUNREACH || last_error == WSAECONNABORTED |
228 | || last_error == WSAETIMEDOUT || last_error == WSAECONNRESET)) |
229 | return -1; |
230 | |
231 | // Circumvent a Windows bug: |
232 | // See https://support.microsoft.com/en-us/kb/201213 |
233 | // See https://zeromq.jira.com/browse/LIBZMQ-195 |
234 | if (nbytes == SOCKET_ERROR && last_error == WSAENOBUFS) |
235 | return 0; |
236 | |
237 | wsa_assert (nbytes != SOCKET_ERROR); |
238 | return nbytes; |
239 | |
240 | #else |
241 | ssize_t nbytes = send (s_, static_cast<const char *> (data_), size_, 0); |
242 | |
243 | // Several errors are OK. When speculative write is being done we may not |
244 | // be able to write a single byte from the socket. Also, SIGSTOP issued |
245 | // by a debugging tool can result in EINTR error. |
246 | if (nbytes == -1 |
247 | && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) |
248 | return 0; |
249 | |
250 | // Signalise peer failure. |
251 | if (nbytes == -1) { |
252 | #if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE |
253 | errno_assert (errno != EACCES && errno != EBADF && errno != EDESTADDRREQ |
254 | && errno != EFAULT && errno != EISCONN |
255 | && errno != EMSGSIZE && errno != ENOMEM |
256 | && errno != ENOTSOCK && errno != EOPNOTSUPP); |
257 | #else |
258 | errno_assert (errno != EACCES && errno != EDESTADDRREQ |
259 | && errno != EFAULT && errno != EISCONN |
260 | && errno != EMSGSIZE && errno != ENOMEM |
261 | && errno != ENOTSOCK && errno != EOPNOTSUPP); |
262 | #endif |
263 | return -1; |
264 | } |
265 | |
266 | return static_cast<int> (nbytes); |
267 | |
268 | #endif |
269 | } |
270 | |
271 | int zmq::tcp_read (fd_t s_, void *data_, size_t size_) |
272 | { |
273 | #ifdef ZMQ_HAVE_WINDOWS |
274 | |
275 | const int rc = |
276 | recv (s_, static_cast<char *> (data_), static_cast<int> (size_), 0); |
277 | |
278 | // If not a single byte can be read from the socket in non-blocking mode |
279 | // we'll get an error (this may happen during the speculative read). |
280 | if (rc == SOCKET_ERROR) { |
281 | const int last_error = WSAGetLastError (); |
282 | if (last_error == WSAEWOULDBLOCK) { |
283 | errno = EAGAIN; |
284 | } else { |
285 | wsa_assert ( |
286 | last_error == WSAENETDOWN || last_error == WSAENETRESET |
287 | || last_error == WSAECONNABORTED || last_error == WSAETIMEDOUT |
288 | || last_error == WSAECONNRESET || last_error == WSAECONNREFUSED |
289 | || last_error == WSAENOTCONN || last_error == WSAENOBUFS); |
290 | errno = wsa_error_to_errno (last_error); |
291 | } |
292 | } |
293 | |
294 | return rc == SOCKET_ERROR ? -1 : rc; |
295 | |
296 | #else |
297 | |
298 | const ssize_t rc = recv (s_, static_cast<char *> (data_), size_, 0); |
299 | |
300 | // Several errors are OK. When speculative read is being done we may not |
301 | // be able to read a single byte from the socket. Also, SIGSTOP issued |
302 | // by a debugging tool can result in EINTR error. |
303 | if (rc == -1) { |
304 | #if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE |
305 | errno_assert (errno != EBADF && errno != EFAULT && errno != ENOMEM |
306 | && errno != ENOTSOCK); |
307 | #else |
308 | errno_assert (errno != EFAULT && errno != ENOMEM && errno != ENOTSOCK); |
309 | #endif |
310 | if (errno == EWOULDBLOCK || errno == EINTR) |
311 | errno = EAGAIN; |
312 | } |
313 | |
314 | return static_cast<int> (rc); |
315 | |
316 | #endif |
317 | } |
318 | |
319 | void zmq::tcp_tune_loopback_fast_path (const fd_t socket_) |
320 | { |
321 | #if defined ZMQ_HAVE_WINDOWS && defined SIO_LOOPBACK_FAST_PATH |
322 | int sio_loopback_fastpath = 1; |
323 | DWORD number_of_bytes_returned = 0; |
324 | |
325 | int rc = WSAIoctl (socket_, SIO_LOOPBACK_FAST_PATH, &sio_loopback_fastpath, |
326 | sizeof sio_loopback_fastpath, NULL, 0, |
327 | &number_of_bytes_returned, 0, 0); |
328 | |
329 | if (SOCKET_ERROR == rc) { |
330 | DWORD last_error = ::WSAGetLastError (); |
331 | |
332 | if (WSAEOPNOTSUPP == last_error) { |
333 | // This system is not Windows 8 or Server 2012, and the call is not supported. |
334 | } else { |
335 | wsa_assert (false); |
336 | } |
337 | } |
338 | #else |
339 | LIBZMQ_UNUSED (socket_); |
340 | #endif |
341 | } |
342 | |
343 | zmq::fd_t zmq::tcp_open_socket (const char *address_, |
344 | const zmq::options_t &options_, |
345 | bool local_, |
346 | bool fallback_to_ipv4_, |
347 | zmq::tcp_address_t *out_tcp_addr_) |
348 | { |
349 | // Convert the textual address into address structure. |
350 | int rc = out_tcp_addr_->resolve (address_, local_, options_.ipv6); |
351 | if (rc != 0) |
352 | return retired_fd; |
353 | |
354 | // Create the socket. |
355 | fd_t s = open_socket (out_tcp_addr_->family (), SOCK_STREAM, IPPROTO_TCP); |
356 | |
357 | // IPv6 address family not supported, try automatic downgrade to IPv4. |
358 | if (s == retired_fd && fallback_to_ipv4_ |
359 | && out_tcp_addr_->family () == AF_INET6 && errno == EAFNOSUPPORT |
360 | && options_.ipv6) { |
361 | rc = out_tcp_addr_->resolve (address_, local_, false); |
362 | if (rc != 0) { |
363 | return retired_fd; |
364 | } |
365 | s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); |
366 | } |
367 | |
368 | if (s == retired_fd) { |
369 | return retired_fd; |
370 | } |
371 | |
372 | // On some systems, IPv4 mapping in IPv6 sockets is disabled by default. |
373 | // Switch it on in such cases. |
374 | if (out_tcp_addr_->family () == AF_INET6) |
375 | enable_ipv4_mapping (s); |
376 | |
377 | // Set the IP Type-Of-Service priority for this socket |
378 | if (options_.tos != 0) |
379 | set_ip_type_of_service (s, options_.tos); |
380 | |
381 | // Set the socket to loopback fastpath if configured. |
382 | if (options_.loopback_fastpath) |
383 | tcp_tune_loopback_fast_path (s); |
384 | |
385 | // Bind the socket to a device if applicable |
386 | if (!options_.bound_device.empty ()) |
387 | if (bind_to_device (s, options_.bound_device) == -1) |
388 | goto setsockopt_error; |
389 | |
390 | // Set the socket buffer limits for the underlying socket. |
391 | if (options_.sndbuf >= 0) |
392 | set_tcp_send_buffer (s, options_.sndbuf); |
393 | if (options_.rcvbuf >= 0) |
394 | set_tcp_receive_buffer (s, options_.rcvbuf); |
395 | |
396 | return s; |
397 | |
398 | setsockopt_error: |
399 | #ifdef ZMQ_HAVE_WINDOWS |
400 | rc = closesocket (s); |
401 | wsa_assert (rc != SOCKET_ERROR); |
402 | #else |
403 | rc = ::close (s); |
404 | errno_assert (rc == 0); |
405 | #endif |
406 | return retired_fd; |
407 | } |
408 | |