1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
#include "precompiled.hpp"
#include "macros.hpp"
#include "ip.hpp"
#include "tcp.hpp"
#include "err.hpp"
#include "options.hpp"

#include <limits.h>
36
37#if !defined ZMQ_HAVE_WINDOWS
38#include <fcntl.h>
39#include <sys/types.h>
40#include <sys/socket.h>
41#include <netinet/in.h>
42#include <netinet/tcp.h>
43#include <unistd.h>
44#ifdef ZMQ_HAVE_VXWORKS
45#include <sockLib.h>
46#endif
47#endif
48
49#if defined ZMQ_HAVE_OPENVMS
50#include <ioctl.h>
51#endif
52
53#ifdef __APPLE__
54#include <TargetConditionals.h>
55#endif
56
57int zmq::tune_tcp_socket (fd_t s_)
58{
59 // Disable Nagle's algorithm. We are doing data batching on 0MQ level,
60 // so using Nagle wouldn't improve throughput in anyway, but it would
61 // hurt latency.
62 int nodelay = 1;
63 int rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELAY,
64 reinterpret_cast<char *> (&nodelay), sizeof (int));
65 assert_success_or_recoverable (s_, rc);
66 if (rc != 0)
67 return rc;
68
69#ifdef ZMQ_HAVE_OPENVMS
70 // Disable delayed acknowledgements as they hurt latency significantly.
71 int nodelack = 1;
72 rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELACK, (char *) &nodelack,
73 sizeof (int));
74 assert_success_or_recoverable (s_, rc);
75#endif
76 return rc;
77}
78
79int zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_)
80{
81 const int rc =
82 setsockopt (sockfd_, SOL_SOCKET, SO_SNDBUF,
83 reinterpret_cast<char *> (&bufsize_), sizeof bufsize_);
84 assert_success_or_recoverable (sockfd_, rc);
85 return rc;
86}
87
88int zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_)
89{
90 const int rc =
91 setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF,
92 reinterpret_cast<char *> (&bufsize_), sizeof bufsize_);
93 assert_success_or_recoverable (sockfd_, rc);
94 return rc;
95}
96
97int zmq::tune_tcp_keepalives (fd_t s_,
98 int keepalive_,
99 int keepalive_cnt_,
100 int keepalive_idle_,
101 int keepalive_intvl_)
102{
103 // These options are used only under certain #ifdefs below.
104 LIBZMQ_UNUSED (keepalive_);
105 LIBZMQ_UNUSED (keepalive_cnt_);
106 LIBZMQ_UNUSED (keepalive_idle_);
107 LIBZMQ_UNUSED (keepalive_intvl_);
108
109 // If none of the #ifdefs apply, then s_ is unused.
110 LIBZMQ_UNUSED (s_);
111
112 // Tuning TCP keep-alives if platform allows it
113 // All values = -1 means skip and leave it for OS
114#ifdef ZMQ_HAVE_WINDOWS
115 if (keepalive_ != -1) {
116 tcp_keepalive keepalive_opts;
117 keepalive_opts.onoff = keepalive_;
118 keepalive_opts.keepalivetime =
119 keepalive_idle_ != -1 ? keepalive_idle_ * 1000 : 7200000;
120 keepalive_opts.keepaliveinterval =
121 keepalive_intvl_ != -1 ? keepalive_intvl_ * 1000 : 1000;
122 DWORD num_bytes_returned;
123 int rc = WSAIoctl (s_, SIO_KEEPALIVE_VALS, &keepalive_opts,
124 sizeof (keepalive_opts), NULL, 0,
125 &num_bytes_returned, NULL, NULL);
126 assert_success_or_recoverable (s_, rc);
127 if (rc == SOCKET_ERROR)
128 return rc;
129 }
130#else
131#ifdef ZMQ_HAVE_SO_KEEPALIVE
132 if (keepalive_ != -1) {
133 int rc =
134 setsockopt (s_, SOL_SOCKET, SO_KEEPALIVE,
135 reinterpret_cast<char *> (&keepalive_), sizeof (int));
136 assert_success_or_recoverable (s_, rc);
137 if (rc != 0)
138 return rc;
139
140#ifdef ZMQ_HAVE_TCP_KEEPCNT
141 if (keepalive_cnt_ != -1) {
142 int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPCNT, &keepalive_cnt_,
143 sizeof (int));
144 assert_success_or_recoverable (s_, rc);
145 if (rc != 0)
146 return rc;
147 }
148#endif // ZMQ_HAVE_TCP_KEEPCNT
149
150#ifdef ZMQ_HAVE_TCP_KEEPIDLE
151 if (keepalive_idle_ != -1) {
152 int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPIDLE,
153 &keepalive_idle_, sizeof (int));
154 assert_success_or_recoverable (s_, rc);
155 if (rc != 0)
156 return rc;
157 }
158#else // ZMQ_HAVE_TCP_KEEPIDLE
159#ifdef ZMQ_HAVE_TCP_KEEPALIVE
160 if (keepalive_idle_ != -1) {
161 int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPALIVE,
162 &keepalive_idle_, sizeof (int));
163 assert_success_or_recoverable (s_, rc);
164 if (rc != 0)
165 return rc;
166 }
167#endif // ZMQ_HAVE_TCP_KEEPALIVE
168#endif // ZMQ_HAVE_TCP_KEEPIDLE
169
170#ifdef ZMQ_HAVE_TCP_KEEPINTVL
171 if (keepalive_intvl_ != -1) {
172 int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPINTVL,
173 &keepalive_intvl_, sizeof (int));
174 assert_success_or_recoverable (s_, rc);
175 if (rc != 0)
176 return rc;
177 }
178#endif // ZMQ_HAVE_TCP_KEEPINTVL
179 }
180#endif // ZMQ_HAVE_SO_KEEPALIVE
181#endif // ZMQ_HAVE_WINDOWS
182
183 return 0;
184}
185
186int zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_)
187{
188 if (timeout_ <= 0)
189 return 0;
190
191 LIBZMQ_UNUSED (sockfd_);
192
193#if defined(ZMQ_HAVE_WINDOWS) && defined(TCP_MAXRT)
194 // msdn says it's supported in >= Vista, >= Windows Server 2003
195 timeout_ /= 1000; // in seconds
196 int rc =
197 setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT,
198 reinterpret_cast<char *> (&timeout_), sizeof (timeout_));
199 assert_success_or_recoverable (sockfd_, rc);
200 return rc;
201// FIXME: should be ZMQ_HAVE_TCP_USER_TIMEOUT
202#elif defined(TCP_USER_TIMEOUT)
203 int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_,
204 sizeof (timeout_));
205 assert_success_or_recoverable (sockfd_, rc);
206 return rc;
207#else
208 return 0;
209#endif
210}
211
212int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
213{
214#ifdef ZMQ_HAVE_WINDOWS
215
216 int nbytes = send (s_, (char *) data_, static_cast<int> (size_), 0);
217
218 // If not a single byte can be written to the socket in non-blocking mode
219 // we'll get an error (this may happen during the speculative write).
220 const int last_error = WSAGetLastError ();
221 if (nbytes == SOCKET_ERROR && last_error == WSAEWOULDBLOCK)
222 return 0;
223
224 // Signalise peer failure.
225 if (nbytes == SOCKET_ERROR
226 && (last_error == WSAENETDOWN || last_error == WSAENETRESET
227 || last_error == WSAEHOSTUNREACH || last_error == WSAECONNABORTED
228 || last_error == WSAETIMEDOUT || last_error == WSAECONNRESET))
229 return -1;
230
231 // Circumvent a Windows bug:
232 // See https://support.microsoft.com/en-us/kb/201213
233 // See https://zeromq.jira.com/browse/LIBZMQ-195
234 if (nbytes == SOCKET_ERROR && last_error == WSAENOBUFS)
235 return 0;
236
237 wsa_assert (nbytes != SOCKET_ERROR);
238 return nbytes;
239
240#else
241 ssize_t nbytes = send (s_, static_cast<const char *> (data_), size_, 0);
242
243 // Several errors are OK. When speculative write is being done we may not
244 // be able to write a single byte from the socket. Also, SIGSTOP issued
245 // by a debugging tool can result in EINTR error.
246 if (nbytes == -1
247 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
248 return 0;
249
250 // Signalise peer failure.
251 if (nbytes == -1) {
252#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
253 errno_assert (errno != EACCES && errno != EBADF && errno != EDESTADDRREQ
254 && errno != EFAULT && errno != EISCONN
255 && errno != EMSGSIZE && errno != ENOMEM
256 && errno != ENOTSOCK && errno != EOPNOTSUPP);
257#else
258 errno_assert (errno != EACCES && errno != EDESTADDRREQ
259 && errno != EFAULT && errno != EISCONN
260 && errno != EMSGSIZE && errno != ENOMEM
261 && errno != ENOTSOCK && errno != EOPNOTSUPP);
262#endif
263 return -1;
264 }
265
266 return static_cast<int> (nbytes);
267
268#endif
269}
270
271int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
272{
273#ifdef ZMQ_HAVE_WINDOWS
274
275 const int rc =
276 recv (s_, static_cast<char *> (data_), static_cast<int> (size_), 0);
277
278 // If not a single byte can be read from the socket in non-blocking mode
279 // we'll get an error (this may happen during the speculative read).
280 if (rc == SOCKET_ERROR) {
281 const int last_error = WSAGetLastError ();
282 if (last_error == WSAEWOULDBLOCK) {
283 errno = EAGAIN;
284 } else {
285 wsa_assert (
286 last_error == WSAENETDOWN || last_error == WSAENETRESET
287 || last_error == WSAECONNABORTED || last_error == WSAETIMEDOUT
288 || last_error == WSAECONNRESET || last_error == WSAECONNREFUSED
289 || last_error == WSAENOTCONN || last_error == WSAENOBUFS);
290 errno = wsa_error_to_errno (last_error);
291 }
292 }
293
294 return rc == SOCKET_ERROR ? -1 : rc;
295
296#else
297
298 const ssize_t rc = recv (s_, static_cast<char *> (data_), size_, 0);
299
300 // Several errors are OK. When speculative read is being done we may not
301 // be able to read a single byte from the socket. Also, SIGSTOP issued
302 // by a debugging tool can result in EINTR error.
303 if (rc == -1) {
304#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
305 errno_assert (errno != EBADF && errno != EFAULT && errno != ENOMEM
306 && errno != ENOTSOCK);
307#else
308 errno_assert (errno != EFAULT && errno != ENOMEM && errno != ENOTSOCK);
309#endif
310 if (errno == EWOULDBLOCK || errno == EINTR)
311 errno = EAGAIN;
312 }
313
314 return static_cast<int> (rc);
315
316#endif
317}
318
319void zmq::tcp_tune_loopback_fast_path (const fd_t socket_)
320{
321#if defined ZMQ_HAVE_WINDOWS && defined SIO_LOOPBACK_FAST_PATH
322 int sio_loopback_fastpath = 1;
323 DWORD number_of_bytes_returned = 0;
324
325 int rc = WSAIoctl (socket_, SIO_LOOPBACK_FAST_PATH, &sio_loopback_fastpath,
326 sizeof sio_loopback_fastpath, NULL, 0,
327 &number_of_bytes_returned, 0, 0);
328
329 if (SOCKET_ERROR == rc) {
330 DWORD last_error = ::WSAGetLastError ();
331
332 if (WSAEOPNOTSUPP == last_error) {
333 // This system is not Windows 8 or Server 2012, and the call is not supported.
334 } else {
335 wsa_assert (false);
336 }
337 }
338#else
339 LIBZMQ_UNUSED (socket_);
340#endif
341}
342
343zmq::fd_t zmq::tcp_open_socket (const char *address_,
344 const zmq::options_t &options_,
345 bool local_,
346 bool fallback_to_ipv4_,
347 zmq::tcp_address_t *out_tcp_addr_)
348{
349 // Convert the textual address into address structure.
350 int rc = out_tcp_addr_->resolve (address_, local_, options_.ipv6);
351 if (rc != 0)
352 return retired_fd;
353
354 // Create the socket.
355 fd_t s = open_socket (out_tcp_addr_->family (), SOCK_STREAM, IPPROTO_TCP);
356
357 // IPv6 address family not supported, try automatic downgrade to IPv4.
358 if (s == retired_fd && fallback_to_ipv4_
359 && out_tcp_addr_->family () == AF_INET6 && errno == EAFNOSUPPORT
360 && options_.ipv6) {
361 rc = out_tcp_addr_->resolve (address_, local_, false);
362 if (rc != 0) {
363 return retired_fd;
364 }
365 s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
366 }
367
368 if (s == retired_fd) {
369 return retired_fd;
370 }
371
372 // On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
373 // Switch it on in such cases.
374 if (out_tcp_addr_->family () == AF_INET6)
375 enable_ipv4_mapping (s);
376
377 // Set the IP Type-Of-Service priority for this socket
378 if (options_.tos != 0)
379 set_ip_type_of_service (s, options_.tos);
380
381 // Set the socket to loopback fastpath if configured.
382 if (options_.loopback_fastpath)
383 tcp_tune_loopback_fast_path (s);
384
385 // Bind the socket to a device if applicable
386 if (!options_.bound_device.empty ())
387 if (bind_to_device (s, options_.bound_device) == -1)
388 goto setsockopt_error;
389
390 // Set the socket buffer limits for the underlying socket.
391 if (options_.sndbuf >= 0)
392 set_tcp_send_buffer (s, options_.sndbuf);
393 if (options_.rcvbuf >= 0)
394 set_tcp_receive_buffer (s, options_.rcvbuf);
395
396 return s;
397
398setsockopt_error:
399#ifdef ZMQ_HAVE_WINDOWS
400 rc = closesocket (s);
401 wsa_assert (rc != SOCKET_ERROR);
402#else
403 rc = ::close (s);
404 errno_assert (rc == 0);
405#endif
406 return retired_fd;
407}
408