| 1 | /* |
| 2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
| 3 | |
| 4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
| 5 | |
| 6 | libzmq is free software; you can redistribute it and/or modify it under |
| 7 | the terms of the GNU Lesser General Public License (LGPL) as published |
| 8 | by the Free Software Foundation; either version 3 of the License, or |
| 9 | (at your option) any later version. |
| 10 | |
| 11 | As a special exception, the Contributors give you permission to link |
| 12 | this library with independent modules to produce an executable, |
| 13 | regardless of the license terms of these independent modules, and to |
| 14 | copy and distribute the resulting executable under terms of your choice, |
| 15 | provided that you also meet, for each linked independent module, the |
| 16 | terms and conditions of the license of that module. An independent |
| 17 | module is a module which is not derived from or based on this library. |
| 18 | If you modify this library, you must extend this exception to your |
| 19 | version of the library. |
| 20 | |
| 21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
| 22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| 23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
| 24 | License for more details. |
| 25 | |
| 26 | You should have received a copy of the GNU Lesser General Public License |
| 27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 28 | */ |
| 29 | |
#include "precompiled.hpp"
#include "macros.hpp"
#include "ip.hpp"
#include "tcp.hpp"
#include "err.hpp"
#include "options.hpp"

#include <limits.h>
| 36 | |
| 37 | #if !defined ZMQ_HAVE_WINDOWS |
| 38 | #include <fcntl.h> |
| 39 | #include <sys/types.h> |
| 40 | #include <sys/socket.h> |
| 41 | #include <netinet/in.h> |
| 42 | #include <netinet/tcp.h> |
| 43 | #include <unistd.h> |
| 44 | #ifdef ZMQ_HAVE_VXWORKS |
| 45 | #include <sockLib.h> |
| 46 | #endif |
| 47 | #endif |
| 48 | |
| 49 | #if defined ZMQ_HAVE_OPENVMS |
| 50 | #include <ioctl.h> |
| 51 | #endif |
| 52 | |
| 53 | #ifdef __APPLE__ |
| 54 | #include <TargetConditionals.h> |
| 55 | #endif |
| 56 | |
| 57 | int zmq::tune_tcp_socket (fd_t s_) |
| 58 | { |
| 59 | // Disable Nagle's algorithm. We are doing data batching on 0MQ level, |
| 60 | // so using Nagle wouldn't improve throughput in anyway, but it would |
| 61 | // hurt latency. |
| 62 | int nodelay = 1; |
| 63 | int rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELAY, |
| 64 | reinterpret_cast<char *> (&nodelay), sizeof (int)); |
| 65 | assert_success_or_recoverable (s_, rc); |
| 66 | if (rc != 0) |
| 67 | return rc; |
| 68 | |
| 69 | #ifdef ZMQ_HAVE_OPENVMS |
| 70 | // Disable delayed acknowledgements as they hurt latency significantly. |
| 71 | int nodelack = 1; |
| 72 | rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELACK, (char *) &nodelack, |
| 73 | sizeof (int)); |
| 74 | assert_success_or_recoverable (s_, rc); |
| 75 | #endif |
| 76 | return rc; |
| 77 | } |
| 78 | |
| 79 | int zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_) |
| 80 | { |
| 81 | const int rc = |
| 82 | setsockopt (sockfd_, SOL_SOCKET, SO_SNDBUF, |
| 83 | reinterpret_cast<char *> (&bufsize_), sizeof bufsize_); |
| 84 | assert_success_or_recoverable (sockfd_, rc); |
| 85 | return rc; |
| 86 | } |
| 87 | |
| 88 | int zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_) |
| 89 | { |
| 90 | const int rc = |
| 91 | setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF, |
| 92 | reinterpret_cast<char *> (&bufsize_), sizeof bufsize_); |
| 93 | assert_success_or_recoverable (sockfd_, rc); |
| 94 | return rc; |
| 95 | } |
| 96 | |
| 97 | int zmq::tune_tcp_keepalives (fd_t s_, |
| 98 | int keepalive_, |
| 99 | int keepalive_cnt_, |
| 100 | int keepalive_idle_, |
| 101 | int keepalive_intvl_) |
| 102 | { |
| 103 | // These options are used only under certain #ifdefs below. |
| 104 | LIBZMQ_UNUSED (keepalive_); |
| 105 | LIBZMQ_UNUSED (keepalive_cnt_); |
| 106 | LIBZMQ_UNUSED (keepalive_idle_); |
| 107 | LIBZMQ_UNUSED (keepalive_intvl_); |
| 108 | |
| 109 | // If none of the #ifdefs apply, then s_ is unused. |
| 110 | LIBZMQ_UNUSED (s_); |
| 111 | |
| 112 | // Tuning TCP keep-alives if platform allows it |
| 113 | // All values = -1 means skip and leave it for OS |
| 114 | #ifdef ZMQ_HAVE_WINDOWS |
| 115 | if (keepalive_ != -1) { |
| 116 | tcp_keepalive keepalive_opts; |
| 117 | keepalive_opts.onoff = keepalive_; |
| 118 | keepalive_opts.keepalivetime = |
| 119 | keepalive_idle_ != -1 ? keepalive_idle_ * 1000 : 7200000; |
| 120 | keepalive_opts.keepaliveinterval = |
| 121 | keepalive_intvl_ != -1 ? keepalive_intvl_ * 1000 : 1000; |
| 122 | DWORD num_bytes_returned; |
| 123 | int rc = WSAIoctl (s_, SIO_KEEPALIVE_VALS, &keepalive_opts, |
| 124 | sizeof (keepalive_opts), NULL, 0, |
| 125 | &num_bytes_returned, NULL, NULL); |
| 126 | assert_success_or_recoverable (s_, rc); |
| 127 | if (rc == SOCKET_ERROR) |
| 128 | return rc; |
| 129 | } |
| 130 | #else |
| 131 | #ifdef ZMQ_HAVE_SO_KEEPALIVE |
| 132 | if (keepalive_ != -1) { |
| 133 | int rc = |
| 134 | setsockopt (s_, SOL_SOCKET, SO_KEEPALIVE, |
| 135 | reinterpret_cast<char *> (&keepalive_), sizeof (int)); |
| 136 | assert_success_or_recoverable (s_, rc); |
| 137 | if (rc != 0) |
| 138 | return rc; |
| 139 | |
| 140 | #ifdef ZMQ_HAVE_TCP_KEEPCNT |
| 141 | if (keepalive_cnt_ != -1) { |
| 142 | int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPCNT, &keepalive_cnt_, |
| 143 | sizeof (int)); |
| 144 | assert_success_or_recoverable (s_, rc); |
| 145 | if (rc != 0) |
| 146 | return rc; |
| 147 | } |
| 148 | #endif // ZMQ_HAVE_TCP_KEEPCNT |
| 149 | |
| 150 | #ifdef ZMQ_HAVE_TCP_KEEPIDLE |
| 151 | if (keepalive_idle_ != -1) { |
| 152 | int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPIDLE, |
| 153 | &keepalive_idle_, sizeof (int)); |
| 154 | assert_success_or_recoverable (s_, rc); |
| 155 | if (rc != 0) |
| 156 | return rc; |
| 157 | } |
| 158 | #else // ZMQ_HAVE_TCP_KEEPIDLE |
| 159 | #ifdef ZMQ_HAVE_TCP_KEEPALIVE |
| 160 | if (keepalive_idle_ != -1) { |
| 161 | int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPALIVE, |
| 162 | &keepalive_idle_, sizeof (int)); |
| 163 | assert_success_or_recoverable (s_, rc); |
| 164 | if (rc != 0) |
| 165 | return rc; |
| 166 | } |
| 167 | #endif // ZMQ_HAVE_TCP_KEEPALIVE |
| 168 | #endif // ZMQ_HAVE_TCP_KEEPIDLE |
| 169 | |
| 170 | #ifdef ZMQ_HAVE_TCP_KEEPINTVL |
| 171 | if (keepalive_intvl_ != -1) { |
| 172 | int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPINTVL, |
| 173 | &keepalive_intvl_, sizeof (int)); |
| 174 | assert_success_or_recoverable (s_, rc); |
| 175 | if (rc != 0) |
| 176 | return rc; |
| 177 | } |
| 178 | #endif // ZMQ_HAVE_TCP_KEEPINTVL |
| 179 | } |
| 180 | #endif // ZMQ_HAVE_SO_KEEPALIVE |
| 181 | #endif // ZMQ_HAVE_WINDOWS |
| 182 | |
| 183 | return 0; |
| 184 | } |
| 185 | |
| 186 | int zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_) |
| 187 | { |
| 188 | if (timeout_ <= 0) |
| 189 | return 0; |
| 190 | |
| 191 | LIBZMQ_UNUSED (sockfd_); |
| 192 | |
| 193 | #if defined(ZMQ_HAVE_WINDOWS) && defined(TCP_MAXRT) |
| 194 | // msdn says it's supported in >= Vista, >= Windows Server 2003 |
| 195 | timeout_ /= 1000; // in seconds |
| 196 | int rc = |
| 197 | setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT, |
| 198 | reinterpret_cast<char *> (&timeout_), sizeof (timeout_)); |
| 199 | assert_success_or_recoverable (sockfd_, rc); |
| 200 | return rc; |
| 201 | // FIXME: should be ZMQ_HAVE_TCP_USER_TIMEOUT |
| 202 | #elif defined(TCP_USER_TIMEOUT) |
| 203 | int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_, |
| 204 | sizeof (timeout_)); |
| 205 | assert_success_or_recoverable (sockfd_, rc); |
| 206 | return rc; |
| 207 | #else |
| 208 | return 0; |
| 209 | #endif |
| 210 | } |
| 211 | |
| 212 | int zmq::tcp_write (fd_t s_, const void *data_, size_t size_) |
| 213 | { |
| 214 | #ifdef ZMQ_HAVE_WINDOWS |
| 215 | |
| 216 | int nbytes = send (s_, (char *) data_, static_cast<int> (size_), 0); |
| 217 | |
| 218 | // If not a single byte can be written to the socket in non-blocking mode |
| 219 | // we'll get an error (this may happen during the speculative write). |
| 220 | const int last_error = WSAGetLastError (); |
| 221 | if (nbytes == SOCKET_ERROR && last_error == WSAEWOULDBLOCK) |
| 222 | return 0; |
| 223 | |
| 224 | // Signalise peer failure. |
| 225 | if (nbytes == SOCKET_ERROR |
| 226 | && (last_error == WSAENETDOWN || last_error == WSAENETRESET |
| 227 | || last_error == WSAEHOSTUNREACH || last_error == WSAECONNABORTED |
| 228 | || last_error == WSAETIMEDOUT || last_error == WSAECONNRESET)) |
| 229 | return -1; |
| 230 | |
| 231 | // Circumvent a Windows bug: |
| 232 | // See https://support.microsoft.com/en-us/kb/201213 |
| 233 | // See https://zeromq.jira.com/browse/LIBZMQ-195 |
| 234 | if (nbytes == SOCKET_ERROR && last_error == WSAENOBUFS) |
| 235 | return 0; |
| 236 | |
| 237 | wsa_assert (nbytes != SOCKET_ERROR); |
| 238 | return nbytes; |
| 239 | |
| 240 | #else |
| 241 | ssize_t nbytes = send (s_, static_cast<const char *> (data_), size_, 0); |
| 242 | |
| 243 | // Several errors are OK. When speculative write is being done we may not |
| 244 | // be able to write a single byte from the socket. Also, SIGSTOP issued |
| 245 | // by a debugging tool can result in EINTR error. |
| 246 | if (nbytes == -1 |
| 247 | && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) |
| 248 | return 0; |
| 249 | |
| 250 | // Signalise peer failure. |
| 251 | if (nbytes == -1) { |
| 252 | #if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE |
| 253 | errno_assert (errno != EACCES && errno != EBADF && errno != EDESTADDRREQ |
| 254 | && errno != EFAULT && errno != EISCONN |
| 255 | && errno != EMSGSIZE && errno != ENOMEM |
| 256 | && errno != ENOTSOCK && errno != EOPNOTSUPP); |
| 257 | #else |
| 258 | errno_assert (errno != EACCES && errno != EDESTADDRREQ |
| 259 | && errno != EFAULT && errno != EISCONN |
| 260 | && errno != EMSGSIZE && errno != ENOMEM |
| 261 | && errno != ENOTSOCK && errno != EOPNOTSUPP); |
| 262 | #endif |
| 263 | return -1; |
| 264 | } |
| 265 | |
| 266 | return static_cast<int> (nbytes); |
| 267 | |
| 268 | #endif |
| 269 | } |
| 270 | |
| 271 | int zmq::tcp_read (fd_t s_, void *data_, size_t size_) |
| 272 | { |
| 273 | #ifdef ZMQ_HAVE_WINDOWS |
| 274 | |
| 275 | const int rc = |
| 276 | recv (s_, static_cast<char *> (data_), static_cast<int> (size_), 0); |
| 277 | |
| 278 | // If not a single byte can be read from the socket in non-blocking mode |
| 279 | // we'll get an error (this may happen during the speculative read). |
| 280 | if (rc == SOCKET_ERROR) { |
| 281 | const int last_error = WSAGetLastError (); |
| 282 | if (last_error == WSAEWOULDBLOCK) { |
| 283 | errno = EAGAIN; |
| 284 | } else { |
| 285 | wsa_assert ( |
| 286 | last_error == WSAENETDOWN || last_error == WSAENETRESET |
| 287 | || last_error == WSAECONNABORTED || last_error == WSAETIMEDOUT |
| 288 | || last_error == WSAECONNRESET || last_error == WSAECONNREFUSED |
| 289 | || last_error == WSAENOTCONN || last_error == WSAENOBUFS); |
| 290 | errno = wsa_error_to_errno (last_error); |
| 291 | } |
| 292 | } |
| 293 | |
| 294 | return rc == SOCKET_ERROR ? -1 : rc; |
| 295 | |
| 296 | #else |
| 297 | |
| 298 | const ssize_t rc = recv (s_, static_cast<char *> (data_), size_, 0); |
| 299 | |
| 300 | // Several errors are OK. When speculative read is being done we may not |
| 301 | // be able to read a single byte from the socket. Also, SIGSTOP issued |
| 302 | // by a debugging tool can result in EINTR error. |
| 303 | if (rc == -1) { |
| 304 | #if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE |
| 305 | errno_assert (errno != EBADF && errno != EFAULT && errno != ENOMEM |
| 306 | && errno != ENOTSOCK); |
| 307 | #else |
| 308 | errno_assert (errno != EFAULT && errno != ENOMEM && errno != ENOTSOCK); |
| 309 | #endif |
| 310 | if (errno == EWOULDBLOCK || errno == EINTR) |
| 311 | errno = EAGAIN; |
| 312 | } |
| 313 | |
| 314 | return static_cast<int> (rc); |
| 315 | |
| 316 | #endif |
| 317 | } |
| 318 | |
| 319 | void zmq::tcp_tune_loopback_fast_path (const fd_t socket_) |
| 320 | { |
| 321 | #if defined ZMQ_HAVE_WINDOWS && defined SIO_LOOPBACK_FAST_PATH |
| 322 | int sio_loopback_fastpath = 1; |
| 323 | DWORD number_of_bytes_returned = 0; |
| 324 | |
| 325 | int rc = WSAIoctl (socket_, SIO_LOOPBACK_FAST_PATH, &sio_loopback_fastpath, |
| 326 | sizeof sio_loopback_fastpath, NULL, 0, |
| 327 | &number_of_bytes_returned, 0, 0); |
| 328 | |
| 329 | if (SOCKET_ERROR == rc) { |
| 330 | DWORD last_error = ::WSAGetLastError (); |
| 331 | |
| 332 | if (WSAEOPNOTSUPP == last_error) { |
| 333 | // This system is not Windows 8 or Server 2012, and the call is not supported. |
| 334 | } else { |
| 335 | wsa_assert (false); |
| 336 | } |
| 337 | } |
| 338 | #else |
| 339 | LIBZMQ_UNUSED (socket_); |
| 340 | #endif |
| 341 | } |
| 342 | |
| 343 | zmq::fd_t zmq::tcp_open_socket (const char *address_, |
| 344 | const zmq::options_t &options_, |
| 345 | bool local_, |
| 346 | bool fallback_to_ipv4_, |
| 347 | zmq::tcp_address_t *out_tcp_addr_) |
| 348 | { |
| 349 | // Convert the textual address into address structure. |
| 350 | int rc = out_tcp_addr_->resolve (address_, local_, options_.ipv6); |
| 351 | if (rc != 0) |
| 352 | return retired_fd; |
| 353 | |
| 354 | // Create the socket. |
| 355 | fd_t s = open_socket (out_tcp_addr_->family (), SOCK_STREAM, IPPROTO_TCP); |
| 356 | |
| 357 | // IPv6 address family not supported, try automatic downgrade to IPv4. |
| 358 | if (s == retired_fd && fallback_to_ipv4_ |
| 359 | && out_tcp_addr_->family () == AF_INET6 && errno == EAFNOSUPPORT |
| 360 | && options_.ipv6) { |
| 361 | rc = out_tcp_addr_->resolve (address_, local_, false); |
| 362 | if (rc != 0) { |
| 363 | return retired_fd; |
| 364 | } |
| 365 | s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); |
| 366 | } |
| 367 | |
| 368 | if (s == retired_fd) { |
| 369 | return retired_fd; |
| 370 | } |
| 371 | |
| 372 | // On some systems, IPv4 mapping in IPv6 sockets is disabled by default. |
| 373 | // Switch it on in such cases. |
| 374 | if (out_tcp_addr_->family () == AF_INET6) |
| 375 | enable_ipv4_mapping (s); |
| 376 | |
| 377 | // Set the IP Type-Of-Service priority for this socket |
| 378 | if (options_.tos != 0) |
| 379 | set_ip_type_of_service (s, options_.tos); |
| 380 | |
| 381 | // Set the socket to loopback fastpath if configured. |
| 382 | if (options_.loopback_fastpath) |
| 383 | tcp_tune_loopback_fast_path (s); |
| 384 | |
| 385 | // Bind the socket to a device if applicable |
| 386 | if (!options_.bound_device.empty ()) |
| 387 | if (bind_to_device (s, options_.bound_device) == -1) |
| 388 | goto setsockopt_error; |
| 389 | |
| 390 | // Set the socket buffer limits for the underlying socket. |
| 391 | if (options_.sndbuf >= 0) |
| 392 | set_tcp_send_buffer (s, options_.sndbuf); |
| 393 | if (options_.rcvbuf >= 0) |
| 394 | set_tcp_receive_buffer (s, options_.rcvbuf); |
| 395 | |
| 396 | return s; |
| 397 | |
| 398 | setsockopt_error: |
| 399 | #ifdef ZMQ_HAVE_WINDOWS |
| 400 | rc = closesocket (s); |
| 401 | wsa_assert (rc != SOCKET_ERROR); |
| 402 | #else |
| 403 | rc = ::close (s); |
| 404 | errno_assert (rc == 0); |
| 405 | #endif |
| 406 | return retired_fd; |
| 407 | } |
| 408 | |