1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #include "precompiled.hpp" |
31 | |
32 | #if !defined ZMQ_HAVE_WINDOWS |
33 | #include <sys/types.h> |
34 | #include <unistd.h> |
35 | #include <sys/socket.h> |
36 | #include <netinet/in.h> |
37 | #include <arpa/inet.h> |
38 | #ifdef ZMQ_HAVE_VXWORKS |
39 | #include <sockLib.h> |
40 | #endif |
41 | #endif |
42 | |
43 | #include "udp_address.hpp" |
44 | #include "udp_engine.hpp" |
45 | #include "session_base.hpp" |
46 | #include "err.hpp" |
47 | #include "ip.hpp" |
48 | |
49 | // OSX uses a different name for this socket option |
50 | #ifndef IPV6_ADD_MEMBERSHIP |
51 | #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP |
52 | #endif |
53 | |
54 | #ifdef __APPLE__ |
55 | #include <TargetConditionals.h> |
56 | #endif |
57 | |
58 | zmq::udp_engine_t::udp_engine_t (const options_t &options_) : |
59 | _plugged (false), |
60 | _fd (-1), |
61 | _session (NULL), |
62 | _handle (static_cast<handle_t> (NULL)), |
63 | _address (NULL), |
64 | _options (options_), |
65 | _send_enabled (false), |
66 | _recv_enabled (false) |
67 | { |
68 | } |
69 | |
70 | zmq::udp_engine_t::~udp_engine_t () |
71 | { |
72 | zmq_assert (!_plugged); |
73 | |
74 | if (_fd != retired_fd) { |
75 | #ifdef ZMQ_HAVE_WINDOWS |
76 | int rc = closesocket (_fd); |
77 | wsa_assert (rc != SOCKET_ERROR); |
78 | #else |
79 | int rc = close (_fd); |
80 | errno_assert (rc == 0); |
81 | #endif |
82 | _fd = retired_fd; |
83 | } |
84 | } |
85 | |
86 | int zmq::udp_engine_t::init (address_t *address_, bool send_, bool recv_) |
87 | { |
88 | zmq_assert (address_); |
89 | zmq_assert (send_ || recv_); |
90 | _send_enabled = send_; |
91 | _recv_enabled = recv_; |
92 | _address = address_; |
93 | |
94 | _fd = open_socket (_address->resolved.udp_addr->family (), SOCK_DGRAM, |
95 | IPPROTO_UDP); |
96 | if (_fd == retired_fd) |
97 | return -1; |
98 | |
99 | unblock_socket (_fd); |
100 | |
101 | return 0; |
102 | } |
103 | |
104 | void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) |
105 | { |
106 | zmq_assert (!_plugged); |
107 | _plugged = true; |
108 | |
109 | zmq_assert (!_session); |
110 | zmq_assert (session_); |
111 | _session = session_; |
112 | |
113 | // Connect to I/O threads poller object. |
114 | io_object_t::plug (io_thread_); |
115 | _handle = add_fd (_fd); |
116 | |
117 | const udp_address_t *const udp_addr = _address->resolved.udp_addr; |
118 | |
119 | int rc = 0; |
120 | |
121 | // Bind the socket to a device if applicable |
122 | if (!_options.bound_device.empty ()) { |
123 | rc = rc | bind_to_device (_fd, _options.bound_device); |
124 | if (rc != 0) { |
125 | assert_success_or_recoverable (_fd, rc); |
126 | error (connection_error); |
127 | return; |
128 | } |
129 | } |
130 | |
131 | if (_send_enabled) { |
132 | if (!_options.raw_socket) { |
133 | const ip_addr_t *out = udp_addr->target_addr (); |
134 | _out_address = out->as_sockaddr (); |
135 | _out_address_len = out->sockaddr_len (); |
136 | |
137 | if (out->is_multicast ()) { |
138 | bool is_ipv6 = (out->family () == AF_INET6); |
139 | rc = rc |
140 | | set_udp_multicast_loop (_fd, is_ipv6, |
141 | _options.multicast_loop); |
142 | |
143 | if (_options.multicast_hops > 0) { |
144 | rc = rc |
145 | | set_udp_multicast_ttl (_fd, is_ipv6, |
146 | _options.multicast_hops); |
147 | } |
148 | |
149 | rc = rc | set_udp_multicast_iface (_fd, is_ipv6, udp_addr); |
150 | } |
151 | } else { |
152 | /// XXX fixme ? |
153 | _out_address = reinterpret_cast<sockaddr *> (&_raw_address); |
154 | _out_address_len = |
155 | static_cast<zmq_socklen_t> (sizeof (sockaddr_in)); |
156 | } |
157 | } |
158 | |
159 | if (_recv_enabled) { |
160 | rc = rc | set_udp_reuse_address (_fd, true); |
161 | |
162 | const ip_addr_t *bind_addr = udp_addr->bind_addr (); |
163 | ip_addr_t any = ip_addr_t::any (bind_addr->family ()); |
164 | const ip_addr_t *real_bind_addr; |
165 | |
166 | bool multicast = udp_addr->is_mcast (); |
167 | |
168 | if (multicast) { |
169 | // Multicast addresses should be allowed to bind to more than |
170 | // one port as all ports should receive the message |
171 | rc = rc | set_udp_reuse_port (_fd, true); |
172 | |
173 | // In multicast we should bind ANY and use the mreq struct to |
174 | // specify the interface |
175 | any.set_port (bind_addr->port ()); |
176 | |
177 | real_bind_addr = &any; |
178 | } else { |
179 | real_bind_addr = bind_addr; |
180 | } |
181 | |
182 | if (rc != 0) { |
183 | error (protocol_error); |
184 | return; |
185 | } |
186 | |
187 | #ifdef ZMQ_HAVE_VXWORKS |
188 | rc = rc |
189 | | bind (_fd, (sockaddr *) real_bind_addr->as_sockaddr (), |
190 | real_bind_addr->sockaddr_len ()); |
191 | #else |
192 | rc = rc |
193 | | bind (_fd, real_bind_addr->as_sockaddr (), |
194 | real_bind_addr->sockaddr_len ()); |
195 | #endif |
196 | if (rc != 0) { |
197 | assert_success_or_recoverable (_fd, rc); |
198 | error (connection_error); |
199 | return; |
200 | } |
201 | |
202 | if (multicast) { |
203 | rc = rc | add_membership (_fd, udp_addr); |
204 | } |
205 | } |
206 | |
207 | if (rc != 0) { |
208 | error (protocol_error); |
209 | } else { |
210 | if (_send_enabled) { |
211 | set_pollout (_handle); |
212 | } |
213 | |
214 | if (_recv_enabled) { |
215 | set_pollin (_handle); |
216 | |
217 | // Call restart output to drop all join/leave commands |
218 | restart_output (); |
219 | } |
220 | } |
221 | } |
222 | |
223 | int zmq::udp_engine_t::set_udp_multicast_loop (fd_t s_, |
224 | bool is_ipv6_, |
225 | bool loop_) |
226 | { |
227 | int level; |
228 | int optname; |
229 | |
230 | if (is_ipv6_) { |
231 | level = IPPROTO_IPV6; |
232 | optname = IPV6_MULTICAST_LOOP; |
233 | } else { |
234 | level = IPPROTO_IP; |
235 | optname = IP_MULTICAST_LOOP; |
236 | } |
237 | |
238 | int loop = loop_ ? 1 : 0; |
239 | int rc = setsockopt (s_, level, optname, reinterpret_cast<char *> (&loop), |
240 | sizeof (loop)); |
241 | assert_success_or_recoverable (s_, rc); |
242 | return rc; |
243 | } |
244 | |
245 | int zmq::udp_engine_t::set_udp_multicast_ttl (fd_t s_, bool is_ipv6_, int hops_) |
246 | { |
247 | int level; |
248 | |
249 | if (is_ipv6_) { |
250 | level = IPPROTO_IPV6; |
251 | } else { |
252 | level = IPPROTO_IP; |
253 | } |
254 | |
255 | int rc = setsockopt (s_, level, IP_MULTICAST_TTL, |
256 | reinterpret_cast<char *> (&hops_), sizeof (hops_)); |
257 | assert_success_or_recoverable (s_, rc); |
258 | return rc; |
259 | } |
260 | |
261 | int zmq::udp_engine_t::set_udp_multicast_iface (fd_t s_, |
262 | bool is_ipv6_, |
263 | const udp_address_t *addr_) |
264 | { |
265 | int rc = 0; |
266 | |
267 | if (is_ipv6_) { |
268 | int bind_if = addr_->bind_if (); |
269 | |
270 | if (bind_if > 0) { |
271 | // If a bind interface is provided we tell the |
272 | // kernel to use it to send multicast packets |
273 | rc = setsockopt (s_, IPPROTO_IPV6, IPV6_MULTICAST_IF, |
274 | reinterpret_cast<char *> (&bind_if), |
275 | sizeof (bind_if)); |
276 | } |
277 | } else { |
278 | struct in_addr bind_addr = addr_->bind_addr ()->ipv4.sin_addr; |
279 | |
280 | if (bind_addr.s_addr != INADDR_ANY) { |
281 | rc = setsockopt (s_, IPPROTO_IP, IP_MULTICAST_IF, |
282 | reinterpret_cast<char *> (&bind_addr), |
283 | sizeof (bind_addr)); |
284 | } |
285 | } |
286 | |
287 | assert_success_or_recoverable (s_, rc); |
288 | return rc; |
289 | } |
290 | |
291 | int zmq::udp_engine_t::set_udp_reuse_address (fd_t s_, bool on_) |
292 | { |
293 | int on = on_ ? 1 : 0; |
294 | int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEADDR, |
295 | reinterpret_cast<char *> (&on), sizeof (on)); |
296 | assert_success_or_recoverable (s_, rc); |
297 | return rc; |
298 | } |
299 | |
300 | int zmq::udp_engine_t::set_udp_reuse_port (fd_t s_, bool on_) |
301 | { |
302 | #ifndef SO_REUSEPORT |
303 | return 0; |
304 | #else |
305 | int on = on_ ? 1 : 0; |
306 | int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEPORT, |
307 | reinterpret_cast<char *> (&on), sizeof (on)); |
308 | assert_success_or_recoverable (s_, rc); |
309 | return rc; |
310 | #endif |
311 | } |
312 | |
313 | int zmq::udp_engine_t::add_membership (fd_t s_, const udp_address_t *addr_) |
314 | { |
315 | const ip_addr_t *mcast_addr = addr_->target_addr (); |
316 | int rc = 0; |
317 | |
318 | if (mcast_addr->family () == AF_INET) { |
319 | struct ip_mreq mreq; |
320 | mreq.imr_multiaddr = mcast_addr->ipv4.sin_addr; |
321 | mreq.imr_interface = addr_->bind_addr ()->ipv4.sin_addr; |
322 | |
323 | rc = setsockopt (s_, IPPROTO_IP, IP_ADD_MEMBERSHIP, |
324 | reinterpret_cast<char *> (&mreq), sizeof (mreq)); |
325 | |
326 | } else if (mcast_addr->family () == AF_INET6) { |
327 | struct ipv6_mreq mreq; |
328 | int iface = addr_->bind_if (); |
329 | |
330 | zmq_assert (iface >= -1); |
331 | |
332 | mreq.ipv6mr_multiaddr = mcast_addr->ipv6.sin6_addr; |
333 | mreq.ipv6mr_interface = iface; |
334 | |
335 | rc = setsockopt (s_, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, |
336 | reinterpret_cast<char *> (&mreq), sizeof (mreq)); |
337 | } |
338 | |
339 | assert_success_or_recoverable (s_, rc); |
340 | return rc; |
341 | } |
342 | |
343 | void zmq::udp_engine_t::error (error_reason_t reason_) |
344 | { |
345 | zmq_assert (_session); |
346 | _session->engine_error (reason_); |
347 | terminate (); |
348 | } |
349 | |
350 | void zmq::udp_engine_t::terminate () |
351 | { |
352 | zmq_assert (_plugged); |
353 | _plugged = false; |
354 | |
355 | rm_fd (_handle); |
356 | |
357 | // Disconnect from I/O threads poller object. |
358 | io_object_t::unplug (); |
359 | |
360 | delete this; |
361 | } |
362 | |
363 | void zmq::udp_engine_t::sockaddr_to_msg (zmq::msg_t *msg_, sockaddr_in *addr_) |
364 | { |
365 | const char *const name = inet_ntoa (addr_->sin_addr); |
366 | |
367 | char port[6]; |
368 | const int port_len = |
369 | sprintf (port, "%d" , static_cast<int> (ntohs (addr_->sin_port))); |
370 | zmq_assert (port_len > 0); |
371 | |
372 | const size_t name_len = strlen (name); |
373 | const int size = static_cast<int> (name_len) + 1 /* colon */ |
374 | + port_len + 1; // terminating NUL |
375 | const int rc = msg_->init_size (size); |
376 | errno_assert (rc == 0); |
377 | msg_->set_flags (msg_t::more); |
378 | |
379 | // use memcpy instead of strcpy/strcat, since this is more efficient when |
380 | // we already know the lengths, which we calculated above |
381 | char *address = static_cast<char *> (msg_->data ()); |
382 | memcpy (address, name, name_len); |
383 | address += name_len; |
384 | *address++ = ':'; |
385 | memcpy (address, port, static_cast<size_t> (port_len)); |
386 | address += port_len; |
387 | *address = 0; |
388 | } |
389 | |
390 | int zmq::udp_engine_t::resolve_raw_address (char *name_, size_t length_) |
391 | { |
392 | memset (&_raw_address, 0, sizeof _raw_address); |
393 | |
394 | const char *delimiter = NULL; |
395 | |
396 | // Find delimiter, cannot use memrchr as it is not supported on windows |
397 | if (length_ != 0) { |
398 | int chars_left = static_cast<int> (length_); |
399 | char *current_char = name_ + length_; |
400 | do { |
401 | if (*(--current_char) == ':') { |
402 | delimiter = current_char; |
403 | break; |
404 | } |
405 | } while (--chars_left != 0); |
406 | } |
407 | |
408 | if (!delimiter) { |
409 | errno = EINVAL; |
410 | return -1; |
411 | } |
412 | |
413 | std::string addr_str (name_, delimiter - name_); |
414 | std::string port_str (delimiter + 1, name_ + length_ - delimiter - 1); |
415 | |
416 | // Parse the port number (0 is not a valid port). |
417 | uint16_t port = static_cast<uint16_t> (atoi (port_str.c_str ())); |
418 | if (port == 0) { |
419 | errno = EINVAL; |
420 | return -1; |
421 | } |
422 | |
423 | _raw_address.sin_family = AF_INET; |
424 | _raw_address.sin_port = htons (port); |
425 | _raw_address.sin_addr.s_addr = inet_addr (addr_str.c_str ()); |
426 | |
427 | if (_raw_address.sin_addr.s_addr == INADDR_NONE) { |
428 | errno = EINVAL; |
429 | return -1; |
430 | } |
431 | |
432 | return 0; |
433 | } |
434 | |
435 | void zmq::udp_engine_t::out_event () |
436 | { |
437 | msg_t group_msg; |
438 | int rc = _session->pull_msg (&group_msg); |
439 | errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN)); |
440 | |
441 | if (rc == 0) { |
442 | msg_t body_msg; |
443 | rc = _session->pull_msg (&body_msg); |
444 | // If there's a group, there should also be a body |
445 | errno_assert (rc == 0); |
446 | |
447 | const size_t group_size = group_msg.size (); |
448 | const size_t body_size = body_msg.size (); |
449 | size_t size; |
450 | |
451 | if (_options.raw_socket) { |
452 | rc = resolve_raw_address (static_cast<char *> (group_msg.data ()), |
453 | group_size); |
454 | |
455 | // We discard the message if address is not valid |
456 | if (rc != 0) { |
457 | rc = group_msg.close (); |
458 | errno_assert (rc == 0); |
459 | |
460 | rc = body_msg.close (); |
461 | errno_assert (rc == 0); |
462 | |
463 | return; |
464 | } |
465 | |
466 | size = body_size; |
467 | |
468 | memcpy (_out_buffer, body_msg.data (), body_size); |
469 | } else { |
470 | size = group_size + body_size + 1; |
471 | |
472 | // TODO: check if larger than maximum size |
473 | _out_buffer[0] = static_cast<unsigned char> (group_size); |
474 | memcpy (_out_buffer + 1, group_msg.data (), group_size); |
475 | memcpy (_out_buffer + 1 + group_size, body_msg.data (), body_size); |
476 | } |
477 | |
478 | rc = group_msg.close (); |
479 | errno_assert (rc == 0); |
480 | |
481 | body_msg.close (); |
482 | errno_assert (rc == 0); |
483 | |
484 | #ifdef ZMQ_HAVE_WINDOWS |
485 | rc = sendto (_fd, _out_buffer, static_cast<int> (size), 0, _out_address, |
486 | _out_address_len); |
487 | #elif defined ZMQ_HAVE_VXWORKS |
488 | rc = sendto (_fd, reinterpret_cast<caddr_t> (_out_buffer), size, 0, |
489 | (sockaddr *) _out_address, _out_address_len); |
490 | #else |
491 | rc = sendto (_fd, _out_buffer, size, 0, _out_address, _out_address_len); |
492 | #endif |
493 | if (rc < 0) { |
494 | #ifdef ZMQ_HAVE_WINDOWS |
495 | if (WSAGetLastError () != WSAEWOULDBLOCK) { |
496 | assert_success_or_recoverable (_fd, rc); |
497 | error (connection_error); |
498 | } |
499 | #else |
500 | if (rc != EWOULDBLOCK) { |
501 | assert_success_or_recoverable (_fd, rc); |
502 | error (connection_error); |
503 | } |
504 | #endif |
505 | } |
506 | } else { |
507 | reset_pollout (_handle); |
508 | } |
509 | } |
510 | |
511 | const zmq::endpoint_uri_pair_t &zmq::udp_engine_t::get_endpoint () const |
512 | { |
513 | return _empty_endpoint; |
514 | } |
515 | |
516 | void zmq::udp_engine_t::restart_output () |
517 | { |
518 | // If we don't support send we just drop all messages |
519 | if (!_send_enabled) { |
520 | msg_t msg; |
521 | while (_session->pull_msg (&msg) == 0) |
522 | msg.close (); |
523 | } else { |
524 | set_pollout (_handle); |
525 | out_event (); |
526 | } |
527 | } |
528 | |
529 | void zmq::udp_engine_t::in_event () |
530 | { |
531 | sockaddr_storage in_address; |
532 | zmq_socklen_t in_addrlen = |
533 | static_cast<zmq_socklen_t> (sizeof (sockaddr_storage)); |
534 | |
535 | const int nbytes = |
536 | recvfrom (_fd, _in_buffer, MAX_UDP_MSG, 0, |
537 | reinterpret_cast<sockaddr *> (&in_address), &in_addrlen); |
538 | |
539 | if (nbytes < 0) { |
540 | #ifdef ZMQ_HAVE_WINDOWS |
541 | if (WSAGetLastError () != WSAEWOULDBLOCK) { |
542 | assert_success_or_recoverable (_fd, nbytes); |
543 | error (connection_error); |
544 | } |
545 | #else |
546 | if (nbytes != EWOULDBLOCK) { |
547 | assert_success_or_recoverable (_fd, nbytes); |
548 | error (connection_error); |
549 | } |
550 | #endif |
551 | return; |
552 | } |
553 | |
554 | int rc; |
555 | int body_size; |
556 | int body_offset; |
557 | msg_t msg; |
558 | |
559 | if (_options.raw_socket) { |
560 | zmq_assert (in_address.ss_family == AF_INET); |
561 | sockaddr_to_msg (&msg, reinterpret_cast<sockaddr_in *> (&in_address)); |
562 | |
563 | body_size = nbytes; |
564 | body_offset = 0; |
565 | } else { |
566 | // TODO in out_event, the group size is an *unsigned* char. what is |
567 | // the maximum value? |
568 | const char *group_buffer = _in_buffer + 1; |
569 | const int group_size = _in_buffer[0]; |
570 | |
571 | rc = msg.init_size (group_size); |
572 | errno_assert (rc == 0); |
573 | msg.set_flags (msg_t::more); |
574 | memcpy (msg.data (), group_buffer, group_size); |
575 | |
576 | // This doesn't fit, just ingore |
577 | if (nbytes - 1 < group_size) |
578 | return; |
579 | |
580 | body_size = nbytes - 1 - group_size; |
581 | body_offset = 1 + group_size; |
582 | } |
583 | // Push group description to session |
584 | rc = _session->push_msg (&msg); |
585 | errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN)); |
586 | |
587 | // Group description message doesn't fit in the pipe, drop |
588 | if (rc != 0) { |
589 | rc = msg.close (); |
590 | errno_assert (rc == 0); |
591 | |
592 | reset_pollin (_handle); |
593 | return; |
594 | } |
595 | |
596 | rc = msg.close (); |
597 | errno_assert (rc == 0); |
598 | rc = msg.init_size (body_size); |
599 | errno_assert (rc == 0); |
600 | memcpy (msg.data (), _in_buffer + body_offset, body_size); |
601 | |
602 | // Push message body to session |
603 | rc = _session->push_msg (&msg); |
604 | // Message body doesn't fit in the pipe, drop and reset session state |
605 | if (rc != 0) { |
606 | rc = msg.close (); |
607 | errno_assert (rc == 0); |
608 | |
609 | _session->reset (); |
610 | reset_pollin (_handle); |
611 | return; |
612 | } |
613 | |
614 | rc = msg.close (); |
615 | errno_assert (rc == 0); |
616 | _session->flush (); |
617 | } |
618 | |
619 | bool zmq::udp_engine_t::restart_input () |
620 | { |
621 | if (_recv_enabled) { |
622 | set_pollin (_handle); |
623 | in_event (); |
624 | } |
625 | |
626 | return true; |
627 | } |
628 | |