1/*
2Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4This file is part of libzmq, the ZeroMQ core engine in C++.
5
6libzmq is free software; you can redistribute it and/or modify it under
7the terms of the GNU Lesser General Public License (LGPL) as published
8by the Free Software Foundation; either version 3 of the License, or
9(at your option) any later version.
10
11As a special exception, the Contributors give you permission to link
12this library with independent modules to produce an executable,
13regardless of the license terms of these independent modules, and to
14copy and distribute the resulting executable under terms of your choice,
15provided that you also meet, for each linked independent module, the
16terms and conditions of the license of that module. An independent
17module is a module which is not derived from or based on this library.
18If you modify this library, you must extend this exception to your
19version of the library.
20
21libzmq is distributed in the hope that it will be useful, but WITHOUT
22ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24License for more details.
25
26You should have received a copy of the GNU Lesser General Public License
27along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31
32#if !defined ZMQ_HAVE_WINDOWS
33#include <sys/types.h>
34#include <unistd.h>
35#include <sys/socket.h>
36#include <netinet/in.h>
37#include <arpa/inet.h>
38#ifdef ZMQ_HAVE_VXWORKS
39#include <sockLib.h>
40#endif
41#endif
42
43#include "udp_address.hpp"
44#include "udp_engine.hpp"
45#include "session_base.hpp"
46#include "err.hpp"
47#include "ip.hpp"
48
49// OSX uses a different name for this socket option
50#ifndef IPV6_ADD_MEMBERSHIP
51#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
52#endif
53
54#ifdef __APPLE__
55#include <TargetConditionals.h>
56#endif
57
58zmq::udp_engine_t::udp_engine_t (const options_t &options_) :
59 _plugged (false),
60 _fd (-1),
61 _session (NULL),
62 _handle (static_cast<handle_t> (NULL)),
63 _address (NULL),
64 _options (options_),
65 _send_enabled (false),
66 _recv_enabled (false)
67{
68}
69
70zmq::udp_engine_t::~udp_engine_t ()
71{
72 zmq_assert (!_plugged);
73
74 if (_fd != retired_fd) {
75#ifdef ZMQ_HAVE_WINDOWS
76 int rc = closesocket (_fd);
77 wsa_assert (rc != SOCKET_ERROR);
78#else
79 int rc = close (_fd);
80 errno_assert (rc == 0);
81#endif
82 _fd = retired_fd;
83 }
84}
85
86int zmq::udp_engine_t::init (address_t *address_, bool send_, bool recv_)
87{
88 zmq_assert (address_);
89 zmq_assert (send_ || recv_);
90 _send_enabled = send_;
91 _recv_enabled = recv_;
92 _address = address_;
93
94 _fd = open_socket (_address->resolved.udp_addr->family (), SOCK_DGRAM,
95 IPPROTO_UDP);
96 if (_fd == retired_fd)
97 return -1;
98
99 unblock_socket (_fd);
100
101 return 0;
102}
103
104void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
105{
106 zmq_assert (!_plugged);
107 _plugged = true;
108
109 zmq_assert (!_session);
110 zmq_assert (session_);
111 _session = session_;
112
113 // Connect to I/O threads poller object.
114 io_object_t::plug (io_thread_);
115 _handle = add_fd (_fd);
116
117 const udp_address_t *const udp_addr = _address->resolved.udp_addr;
118
119 int rc = 0;
120
121 // Bind the socket to a device if applicable
122 if (!_options.bound_device.empty ()) {
123 rc = rc | bind_to_device (_fd, _options.bound_device);
124 if (rc != 0) {
125 assert_success_or_recoverable (_fd, rc);
126 error (connection_error);
127 return;
128 }
129 }
130
131 if (_send_enabled) {
132 if (!_options.raw_socket) {
133 const ip_addr_t *out = udp_addr->target_addr ();
134 _out_address = out->as_sockaddr ();
135 _out_address_len = out->sockaddr_len ();
136
137 if (out->is_multicast ()) {
138 bool is_ipv6 = (out->family () == AF_INET6);
139 rc = rc
140 | set_udp_multicast_loop (_fd, is_ipv6,
141 _options.multicast_loop);
142
143 if (_options.multicast_hops > 0) {
144 rc = rc
145 | set_udp_multicast_ttl (_fd, is_ipv6,
146 _options.multicast_hops);
147 }
148
149 rc = rc | set_udp_multicast_iface (_fd, is_ipv6, udp_addr);
150 }
151 } else {
152 /// XXX fixme ?
153 _out_address = reinterpret_cast<sockaddr *> (&_raw_address);
154 _out_address_len =
155 static_cast<zmq_socklen_t> (sizeof (sockaddr_in));
156 }
157 }
158
159 if (_recv_enabled) {
160 rc = rc | set_udp_reuse_address (_fd, true);
161
162 const ip_addr_t *bind_addr = udp_addr->bind_addr ();
163 ip_addr_t any = ip_addr_t::any (bind_addr->family ());
164 const ip_addr_t *real_bind_addr;
165
166 bool multicast = udp_addr->is_mcast ();
167
168 if (multicast) {
169 // Multicast addresses should be allowed to bind to more than
170 // one port as all ports should receive the message
171 rc = rc | set_udp_reuse_port (_fd, true);
172
173 // In multicast we should bind ANY and use the mreq struct to
174 // specify the interface
175 any.set_port (bind_addr->port ());
176
177 real_bind_addr = &any;
178 } else {
179 real_bind_addr = bind_addr;
180 }
181
182 if (rc != 0) {
183 error (protocol_error);
184 return;
185 }
186
187#ifdef ZMQ_HAVE_VXWORKS
188 rc = rc
189 | bind (_fd, (sockaddr *) real_bind_addr->as_sockaddr (),
190 real_bind_addr->sockaddr_len ());
191#else
192 rc = rc
193 | bind (_fd, real_bind_addr->as_sockaddr (),
194 real_bind_addr->sockaddr_len ());
195#endif
196 if (rc != 0) {
197 assert_success_or_recoverable (_fd, rc);
198 error (connection_error);
199 return;
200 }
201
202 if (multicast) {
203 rc = rc | add_membership (_fd, udp_addr);
204 }
205 }
206
207 if (rc != 0) {
208 error (protocol_error);
209 } else {
210 if (_send_enabled) {
211 set_pollout (_handle);
212 }
213
214 if (_recv_enabled) {
215 set_pollin (_handle);
216
217 // Call restart output to drop all join/leave commands
218 restart_output ();
219 }
220 }
221}
222
223int zmq::udp_engine_t::set_udp_multicast_loop (fd_t s_,
224 bool is_ipv6_,
225 bool loop_)
226{
227 int level;
228 int optname;
229
230 if (is_ipv6_) {
231 level = IPPROTO_IPV6;
232 optname = IPV6_MULTICAST_LOOP;
233 } else {
234 level = IPPROTO_IP;
235 optname = IP_MULTICAST_LOOP;
236 }
237
238 int loop = loop_ ? 1 : 0;
239 int rc = setsockopt (s_, level, optname, reinterpret_cast<char *> (&loop),
240 sizeof (loop));
241 assert_success_or_recoverable (s_, rc);
242 return rc;
243}
244
245int zmq::udp_engine_t::set_udp_multicast_ttl (fd_t s_, bool is_ipv6_, int hops_)
246{
247 int level;
248
249 if (is_ipv6_) {
250 level = IPPROTO_IPV6;
251 } else {
252 level = IPPROTO_IP;
253 }
254
255 int rc = setsockopt (s_, level, IP_MULTICAST_TTL,
256 reinterpret_cast<char *> (&hops_), sizeof (hops_));
257 assert_success_or_recoverable (s_, rc);
258 return rc;
259}
260
261int zmq::udp_engine_t::set_udp_multicast_iface (fd_t s_,
262 bool is_ipv6_,
263 const udp_address_t *addr_)
264{
265 int rc = 0;
266
267 if (is_ipv6_) {
268 int bind_if = addr_->bind_if ();
269
270 if (bind_if > 0) {
271 // If a bind interface is provided we tell the
272 // kernel to use it to send multicast packets
273 rc = setsockopt (s_, IPPROTO_IPV6, IPV6_MULTICAST_IF,
274 reinterpret_cast<char *> (&bind_if),
275 sizeof (bind_if));
276 }
277 } else {
278 struct in_addr bind_addr = addr_->bind_addr ()->ipv4.sin_addr;
279
280 if (bind_addr.s_addr != INADDR_ANY) {
281 rc = setsockopt (s_, IPPROTO_IP, IP_MULTICAST_IF,
282 reinterpret_cast<char *> (&bind_addr),
283 sizeof (bind_addr));
284 }
285 }
286
287 assert_success_or_recoverable (s_, rc);
288 return rc;
289}
290
291int zmq::udp_engine_t::set_udp_reuse_address (fd_t s_, bool on_)
292{
293 int on = on_ ? 1 : 0;
294 int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEADDR,
295 reinterpret_cast<char *> (&on), sizeof (on));
296 assert_success_or_recoverable (s_, rc);
297 return rc;
298}
299
300int zmq::udp_engine_t::set_udp_reuse_port (fd_t s_, bool on_)
301{
302#ifndef SO_REUSEPORT
303 return 0;
304#else
305 int on = on_ ? 1 : 0;
306 int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEPORT,
307 reinterpret_cast<char *> (&on), sizeof (on));
308 assert_success_or_recoverable (s_, rc);
309 return rc;
310#endif
311}
312
313int zmq::udp_engine_t::add_membership (fd_t s_, const udp_address_t *addr_)
314{
315 const ip_addr_t *mcast_addr = addr_->target_addr ();
316 int rc = 0;
317
318 if (mcast_addr->family () == AF_INET) {
319 struct ip_mreq mreq;
320 mreq.imr_multiaddr = mcast_addr->ipv4.sin_addr;
321 mreq.imr_interface = addr_->bind_addr ()->ipv4.sin_addr;
322
323 rc = setsockopt (s_, IPPROTO_IP, IP_ADD_MEMBERSHIP,
324 reinterpret_cast<char *> (&mreq), sizeof (mreq));
325
326 } else if (mcast_addr->family () == AF_INET6) {
327 struct ipv6_mreq mreq;
328 int iface = addr_->bind_if ();
329
330 zmq_assert (iface >= -1);
331
332 mreq.ipv6mr_multiaddr = mcast_addr->ipv6.sin6_addr;
333 mreq.ipv6mr_interface = iface;
334
335 rc = setsockopt (s_, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
336 reinterpret_cast<char *> (&mreq), sizeof (mreq));
337 }
338
339 assert_success_or_recoverable (s_, rc);
340 return rc;
341}
342
343void zmq::udp_engine_t::error (error_reason_t reason_)
344{
345 zmq_assert (_session);
346 _session->engine_error (reason_);
347 terminate ();
348}
349
350void zmq::udp_engine_t::terminate ()
351{
352 zmq_assert (_plugged);
353 _plugged = false;
354
355 rm_fd (_handle);
356
357 // Disconnect from I/O threads poller object.
358 io_object_t::unplug ();
359
360 delete this;
361}
362
363void zmq::udp_engine_t::sockaddr_to_msg (zmq::msg_t *msg_, sockaddr_in *addr_)
364{
365 const char *const name = inet_ntoa (addr_->sin_addr);
366
367 char port[6];
368 const int port_len =
369 sprintf (port, "%d", static_cast<int> (ntohs (addr_->sin_port)));
370 zmq_assert (port_len > 0);
371
372 const size_t name_len = strlen (name);
373 const int size = static_cast<int> (name_len) + 1 /* colon */
374 + port_len + 1; // terminating NUL
375 const int rc = msg_->init_size (size);
376 errno_assert (rc == 0);
377 msg_->set_flags (msg_t::more);
378
379 // use memcpy instead of strcpy/strcat, since this is more efficient when
380 // we already know the lengths, which we calculated above
381 char *address = static_cast<char *> (msg_->data ());
382 memcpy (address, name, name_len);
383 address += name_len;
384 *address++ = ':';
385 memcpy (address, port, static_cast<size_t> (port_len));
386 address += port_len;
387 *address = 0;
388}
389
390int zmq::udp_engine_t::resolve_raw_address (char *name_, size_t length_)
391{
392 memset (&_raw_address, 0, sizeof _raw_address);
393
394 const char *delimiter = NULL;
395
396 // Find delimiter, cannot use memrchr as it is not supported on windows
397 if (length_ != 0) {
398 int chars_left = static_cast<int> (length_);
399 char *current_char = name_ + length_;
400 do {
401 if (*(--current_char) == ':') {
402 delimiter = current_char;
403 break;
404 }
405 } while (--chars_left != 0);
406 }
407
408 if (!delimiter) {
409 errno = EINVAL;
410 return -1;
411 }
412
413 std::string addr_str (name_, delimiter - name_);
414 std::string port_str (delimiter + 1, name_ + length_ - delimiter - 1);
415
416 // Parse the port number (0 is not a valid port).
417 uint16_t port = static_cast<uint16_t> (atoi (port_str.c_str ()));
418 if (port == 0) {
419 errno = EINVAL;
420 return -1;
421 }
422
423 _raw_address.sin_family = AF_INET;
424 _raw_address.sin_port = htons (port);
425 _raw_address.sin_addr.s_addr = inet_addr (addr_str.c_str ());
426
427 if (_raw_address.sin_addr.s_addr == INADDR_NONE) {
428 errno = EINVAL;
429 return -1;
430 }
431
432 return 0;
433}
434
435void zmq::udp_engine_t::out_event ()
436{
437 msg_t group_msg;
438 int rc = _session->pull_msg (&group_msg);
439 errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
440
441 if (rc == 0) {
442 msg_t body_msg;
443 rc = _session->pull_msg (&body_msg);
444 // If there's a group, there should also be a body
445 errno_assert (rc == 0);
446
447 const size_t group_size = group_msg.size ();
448 const size_t body_size = body_msg.size ();
449 size_t size;
450
451 if (_options.raw_socket) {
452 rc = resolve_raw_address (static_cast<char *> (group_msg.data ()),
453 group_size);
454
455 // We discard the message if address is not valid
456 if (rc != 0) {
457 rc = group_msg.close ();
458 errno_assert (rc == 0);
459
460 rc = body_msg.close ();
461 errno_assert (rc == 0);
462
463 return;
464 }
465
466 size = body_size;
467
468 memcpy (_out_buffer, body_msg.data (), body_size);
469 } else {
470 size = group_size + body_size + 1;
471
472 // TODO: check if larger than maximum size
473 _out_buffer[0] = static_cast<unsigned char> (group_size);
474 memcpy (_out_buffer + 1, group_msg.data (), group_size);
475 memcpy (_out_buffer + 1 + group_size, body_msg.data (), body_size);
476 }
477
478 rc = group_msg.close ();
479 errno_assert (rc == 0);
480
481 body_msg.close ();
482 errno_assert (rc == 0);
483
484#ifdef ZMQ_HAVE_WINDOWS
485 rc = sendto (_fd, _out_buffer, static_cast<int> (size), 0, _out_address,
486 _out_address_len);
487#elif defined ZMQ_HAVE_VXWORKS
488 rc = sendto (_fd, reinterpret_cast<caddr_t> (_out_buffer), size, 0,
489 (sockaddr *) _out_address, _out_address_len);
490#else
491 rc = sendto (_fd, _out_buffer, size, 0, _out_address, _out_address_len);
492#endif
493 if (rc < 0) {
494#ifdef ZMQ_HAVE_WINDOWS
495 if (WSAGetLastError () != WSAEWOULDBLOCK) {
496 assert_success_or_recoverable (_fd, rc);
497 error (connection_error);
498 }
499#else
500 if (rc != EWOULDBLOCK) {
501 assert_success_or_recoverable (_fd, rc);
502 error (connection_error);
503 }
504#endif
505 }
506 } else {
507 reset_pollout (_handle);
508 }
509}
510
511const zmq::endpoint_uri_pair_t &zmq::udp_engine_t::get_endpoint () const
512{
513 return _empty_endpoint;
514}
515
516void zmq::udp_engine_t::restart_output ()
517{
518 // If we don't support send we just drop all messages
519 if (!_send_enabled) {
520 msg_t msg;
521 while (_session->pull_msg (&msg) == 0)
522 msg.close ();
523 } else {
524 set_pollout (_handle);
525 out_event ();
526 }
527}
528
529void zmq::udp_engine_t::in_event ()
530{
531 sockaddr_storage in_address;
532 zmq_socklen_t in_addrlen =
533 static_cast<zmq_socklen_t> (sizeof (sockaddr_storage));
534
535 const int nbytes =
536 recvfrom (_fd, _in_buffer, MAX_UDP_MSG, 0,
537 reinterpret_cast<sockaddr *> (&in_address), &in_addrlen);
538
539 if (nbytes < 0) {
540#ifdef ZMQ_HAVE_WINDOWS
541 if (WSAGetLastError () != WSAEWOULDBLOCK) {
542 assert_success_or_recoverable (_fd, nbytes);
543 error (connection_error);
544 }
545#else
546 if (nbytes != EWOULDBLOCK) {
547 assert_success_or_recoverable (_fd, nbytes);
548 error (connection_error);
549 }
550#endif
551 return;
552 }
553
554 int rc;
555 int body_size;
556 int body_offset;
557 msg_t msg;
558
559 if (_options.raw_socket) {
560 zmq_assert (in_address.ss_family == AF_INET);
561 sockaddr_to_msg (&msg, reinterpret_cast<sockaddr_in *> (&in_address));
562
563 body_size = nbytes;
564 body_offset = 0;
565 } else {
566 // TODO in out_event, the group size is an *unsigned* char. what is
567 // the maximum value?
568 const char *group_buffer = _in_buffer + 1;
569 const int group_size = _in_buffer[0];
570
571 rc = msg.init_size (group_size);
572 errno_assert (rc == 0);
573 msg.set_flags (msg_t::more);
574 memcpy (msg.data (), group_buffer, group_size);
575
576 // This doesn't fit, just ingore
577 if (nbytes - 1 < group_size)
578 return;
579
580 body_size = nbytes - 1 - group_size;
581 body_offset = 1 + group_size;
582 }
583 // Push group description to session
584 rc = _session->push_msg (&msg);
585 errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
586
587 // Group description message doesn't fit in the pipe, drop
588 if (rc != 0) {
589 rc = msg.close ();
590 errno_assert (rc == 0);
591
592 reset_pollin (_handle);
593 return;
594 }
595
596 rc = msg.close ();
597 errno_assert (rc == 0);
598 rc = msg.init_size (body_size);
599 errno_assert (rc == 0);
600 memcpy (msg.data (), _in_buffer + body_offset, body_size);
601
602 // Push message body to session
603 rc = _session->push_msg (&msg);
604 // Message body doesn't fit in the pipe, drop and reset session state
605 if (rc != 0) {
606 rc = msg.close ();
607 errno_assert (rc == 0);
608
609 _session->reset ();
610 reset_pollin (_handle);
611 return;
612 }
613
614 rc = msg.close ();
615 errno_assert (rc == 0);
616 _session->flush ();
617}
618
619bool zmq::udp_engine_t::restart_input ()
620{
621 if (_recv_enabled) {
622 set_pollin (_handle);
623 in_event ();
624 }
625
626 return true;
627}
628