/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv.h"
#include "internal.h"

#include <assert.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#if defined(__MVS__)
#include <xti.h>
#endif
#include <sys/un.h>

#define UV__UDP_DGRAM_MAXSIZE (64 * 1024)

#if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP)
# define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
#endif

#if defined(IPV6_LEAVE_GROUP) && !defined(IPV6_DROP_MEMBERSHIP)
# define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
#endif

union uv__sockaddr {
  struct sockaddr_in6 in6;
  struct sockaddr_in in;
  struct sockaddr addr;
};

static void uv__udp_run_completed(uv_udp_t* handle);
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
static void uv__udp_recvmsg(uv_udp_t* handle);
static void uv__udp_sendmsg(uv_udp_t* handle);
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
                                       int domain,
                                       unsigned int flags);

#if HAVE_MMSG

#define UV__MMSG_MAXWIDTH 20

static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf);
static void uv__udp_sendmmsg(uv_udp_t* handle);

static int uv__recvmmsg_avail;
static int uv__sendmmsg_avail;
static uv_once_t once = UV_ONCE_INIT;

static void uv__udp_mmsg_init(void) {
  int ret;
  int s;
  s = uv__socket(AF_INET, SOCK_DGRAM, 0);
  if (s < 0)
    return;
  ret = uv__sendmmsg(s, NULL, 0);
  if (ret == 0 || errno != ENOSYS) {
    uv__sendmmsg_avail = 1;
    uv__recvmmsg_avail = 1;
  } else {
    ret = uv__recvmmsg(s, NULL, 0);
    if (ret == 0 || errno != ENOSYS)
      uv__recvmmsg_avail = 1;
  }
  uv__close(s);
}

#endif
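
/* The probe above relies on a cheap runtime feature test: calling sendmmsg()
 * with a zero-length vector is harmless, but on kernels or emulation layers
 * that lack the syscall it fails with ENOSYS. A minimal standalone sketch of
 * the same idea, assuming the glibc wrappers rather than libuv's internal
 * uv__sendmmsg()/uv__recvmmsg() shims (probe_sendmmsg() is our name, for
 * illustration only; not part of libuv):
 *
 *   #define _GNU_SOURCE
 *   #include <errno.h>
 *   #include <sys/socket.h>
 *   #include <unistd.h>
 *
 *   static int probe_sendmmsg(void) {
 *     int ok;
 *     int s = socket(AF_INET, SOCK_DGRAM, 0);
 *     if (s < 0)
 *       return 0;
 *     // A zero-length send is a no-op; only ENOSYS means "not implemented".
 *     ok = (sendmmsg(s, NULL, 0, 0) == 0 || errno != ENOSYS);
 *     close(s);
 *     return ok;
 *   }
 */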

void uv__udp_close(uv_udp_t* handle) {
  uv__io_close(handle->loop, &handle->io_watcher);
  uv__handle_stop(handle);

  if (handle->io_watcher.fd != -1) {
    uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }
}


void uv__udp_finish_close(uv_udp_t* handle) {
  uv_udp_send_t* req;
  QUEUE* q;

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
  assert(handle->io_watcher.fd == -1);

  while (!QUEUE_EMPTY(&handle->write_queue)) {
    q = QUEUE_HEAD(&handle->write_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_udp_send_t, queue);
    req->status = UV_ECANCELED;
    QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
  }

  uv__udp_run_completed(handle);

  assert(handle->send_queue_size == 0);
  assert(handle->send_queue_count == 0);

  /* Now tear down the handle. */
  handle->recv_cb = NULL;
  handle->alloc_cb = NULL;
  /* but _do not_ touch close_cb */
}


static void uv__udp_run_completed(uv_udp_t* handle) {
  uv_udp_send_t* req;
  QUEUE* q;

  assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING));
  handle->flags |= UV_HANDLE_UDP_PROCESSING;

  while (!QUEUE_EMPTY(&handle->write_completed_queue)) {
    q = QUEUE_HEAD(&handle->write_completed_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_udp_send_t, queue);
    uv__req_unregister(handle->loop, req);

    handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
    handle->send_queue_count--;

    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;

    if (req->send_cb == NULL)
      continue;

    /* req->status >= 0 == bytes written
     * req->status < 0 == negated errno, i.e. a libuv error code
     */
    if (req->status >= 0)
      req->send_cb(req, 0);
    else
      req->send_cb(req, req->status);
  }

  if (QUEUE_EMPTY(&handle->write_queue)) {
    /* Pending queue and completion queue empty, stop watcher. */
    uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT);
    if (!uv__io_active(&handle->io_watcher, POLLIN))
      uv__handle_stop(handle);
  }

  handle->flags &= ~UV_HANDLE_UDP_PROCESSING;
}


static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
  uv_udp_t* handle;

  handle = container_of(w, uv_udp_t, io_watcher);
  assert(handle->type == UV_UDP);

  if (revents & POLLIN)
    uv__udp_recvmsg(handle);

  if (revents & POLLOUT) {
    uv__udp_sendmsg(handle);
    uv__udp_run_completed(handle);
  }
}

#if HAVE_MMSG
static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) {
  struct sockaddr_in6 peers[UV__MMSG_MAXWIDTH];
  struct iovec iov[UV__MMSG_MAXWIDTH];
  struct uv__mmsghdr msgs[UV__MMSG_MAXWIDTH];
  ssize_t nread;
  uv_buf_t chunk_buf;
  size_t chunks;
  int flags;
  size_t k;

  /* prepare structures for recvmmsg */
  chunks = buf->len / UV__UDP_DGRAM_MAXSIZE;
  if (chunks > ARRAY_SIZE(iov))
    chunks = ARRAY_SIZE(iov);
  for (k = 0; k < chunks; ++k) {
    iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE;
    iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE;
    msgs[k].msg_hdr.msg_iov = iov + k;
    msgs[k].msg_hdr.msg_iovlen = 1;
    msgs[k].msg_hdr.msg_name = peers + k;
    msgs[k].msg_hdr.msg_namelen = sizeof(peers[0]);
    msgs[k].msg_hdr.msg_control = NULL;
    msgs[k].msg_hdr.msg_controllen = 0;
    msgs[k].msg_hdr.msg_flags = 0;
  }

  do
    nread = uv__recvmmsg(handle->io_watcher.fd, msgs, chunks);
  while (nread == -1 && errno == EINTR);

  if (nread < 1) {
    if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
      handle->recv_cb(handle, 0, buf, NULL, 0);
    else
      handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0);
  } else {
    /* pass each chunk to the application */
    for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) {
      flags = UV_UDP_MMSG_CHUNK;
      if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC)
        flags |= UV_UDP_PARTIAL;

      chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len);
      handle->recv_cb(handle,
                      msgs[k].msg_len,
                      &chunk_buf,
                      msgs[k].msg_hdr.msg_name,
                      flags);
    }

    /* one last callback so the original buffer is freed */
    if (handle->recv_cb != NULL)
      handle->recv_cb(handle, 0, buf, NULL, UV_UDP_MMSG_FREE);
  }
  return nread;
}
#endif
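
/* For the recvmmsg() path above to batch at all, the application must opt in
 * with uv_udp_init_ex(loop, &handle, AF_INET | UV_UDP_RECVMMSG) and its
 * alloc_cb must return a buffer spanning several UV__UDP_DGRAM_MAXSIZE
 * (64 KiB) chunks. The recv_cb then sees borrowed slices flagged with
 * UV_UDP_MMSG_CHUNK and must free the underlying buffer only on the final
 * callback. A hedged sketch (callback names are ours; 8 chunks is an
 * arbitrary choice; error handling elided):
 *
 *   static void alloc_cb(uv_handle_t* h, size_t hint, uv_buf_t* buf) {
 *     // One allocation with room for an 8-datagram recvmmsg() batch.
 *     buf->base = malloc(8 * 64 * 1024);
 *     buf->len = buf->base != NULL ? 8 * 64 * 1024 : 0;
 *   }
 *
 *   static void recv_cb(uv_udp_t* h, ssize_t nread, const uv_buf_t* buf,
 *                       const struct sockaddr* addr, unsigned flags) {
 *     if (nread > 0)
 *       consume(buf->base, nread);     // consume() is a placeholder
 *     if (flags & UV_UDP_MMSG_CHUNK)
 *       return;                        // borrowed slice, do not free
 *     free(buf->base);                 // final callback owns the buffer
 *   }
 */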

static void uv__udp_recvmsg(uv_udp_t* handle) {
  struct sockaddr_storage peer;
  struct msghdr h;
  ssize_t nread;
  uv_buf_t buf;
  int flags;
  int count;

  assert(handle->recv_cb != NULL);
  assert(handle->alloc_cb != NULL);

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  do {
    buf = uv_buf_init(NULL, 0);
    handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
    if (buf.base == NULL || buf.len == 0) {
      handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
      return;
    }
    assert(buf.base != NULL);

#if HAVE_MMSG
    if (uv_udp_using_recvmmsg(handle)) {
      nread = uv__udp_recvmmsg(handle, &buf);
      if (nread > 0)
        count -= nread;
      continue;
    }
#endif

    memset(&h, 0, sizeof(h));
    memset(&peer, 0, sizeof(peer));
    h.msg_name = &peer;
    h.msg_namelen = sizeof(peer);
    h.msg_iov = (void*) &buf;
    h.msg_iovlen = 1;

    do {
      nread = recvmsg(handle->io_watcher.fd, &h, 0);
    }
    while (nread == -1 && errno == EINTR);

    if (nread == -1) {
      if (errno == EAGAIN || errno == EWOULDBLOCK)
        handle->recv_cb(handle, 0, &buf, NULL, 0);
      else
        handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0);
    }
    else {
      flags = 0;
      if (h.msg_flags & MSG_TRUNC)
        flags |= UV_UDP_PARTIAL;

      handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags);
    }
    count--;
  }
  /* recv_cb callback may decide to pause or close the handle */
  while (nread != -1
      && count > 0
      && handle->io_watcher.fd != -1
      && handle->recv_cb != NULL);
}

#if HAVE_MMSG
static void uv__udp_sendmmsg(uv_udp_t* handle) {
  uv_udp_send_t* req;
  struct uv__mmsghdr h[UV__MMSG_MAXWIDTH];
  struct uv__mmsghdr* p;
  QUEUE* q;
  ssize_t npkts;
  size_t pkts;
  size_t i;

  if (QUEUE_EMPTY(&handle->write_queue))
    return;

write_queue_drain:
  for (pkts = 0, q = QUEUE_HEAD(&handle->write_queue);
       pkts < UV__MMSG_MAXWIDTH && q != &handle->write_queue;
       ++pkts, q = QUEUE_HEAD(q)) {
    assert(q != NULL);
    req = QUEUE_DATA(q, uv_udp_send_t, queue);
    assert(req != NULL);

    p = &h[pkts];
    memset(p, 0, sizeof(*p));
    if (req->addr.ss_family == AF_UNSPEC) {
      p->msg_hdr.msg_name = NULL;
      p->msg_hdr.msg_namelen = 0;
    } else {
      p->msg_hdr.msg_name = &req->addr;
      if (req->addr.ss_family == AF_INET6)
        p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in6);
      else if (req->addr.ss_family == AF_INET)
        p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in);
      else if (req->addr.ss_family == AF_UNIX)
        p->msg_hdr.msg_namelen = sizeof(struct sockaddr_un);
      else {
        assert(0 && "unsupported address family");
        abort();
      }
    }
    h[pkts].msg_hdr.msg_iov = (struct iovec*) req->bufs;
    h[pkts].msg_hdr.msg_iovlen = req->nbufs;
  }

  do
    npkts = uv__sendmmsg(handle->io_watcher.fd, h, pkts);
  while (npkts == -1 && errno == EINTR);

  if (npkts < 1) {
    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
      return;
    for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
         i < pkts && q != &handle->write_queue;
         ++i, q = QUEUE_HEAD(&handle->write_queue)) {
      assert(q != NULL);
      req = QUEUE_DATA(q, uv_udp_send_t, queue);
      assert(req != NULL);

      req->status = UV__ERR(errno);
      QUEUE_REMOVE(&req->queue);
      QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
    }
    uv__io_feed(handle->loop, &handle->io_watcher);
    return;
  }

  for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
       i < pkts && q != &handle->write_queue;
       ++i, q = QUEUE_HEAD(&handle->write_queue)) {
    assert(q != NULL);
    req = QUEUE_DATA(q, uv_udp_send_t, queue);
    assert(req != NULL);

    req->status = req->bufs[0].len;

    /* Sending a datagram is an atomic operation: either all data
     * is written or nothing is (and EMSGSIZE is raised). That is
     * why we don't handle partial writes. Just pop the request
     * off the write queue and onto the completed queue, done.
     */
    QUEUE_REMOVE(&req->queue);
    QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
  }

  /* couldn't batch everything, continue sending (jump to avoid stack growth) */
  if (!QUEUE_EMPTY(&handle->write_queue))
    goto write_queue_drain;
  uv__io_feed(handle->loop, &handle->io_watcher);
  return;
}
#endif

static void uv__udp_sendmsg(uv_udp_t* handle) {
  uv_udp_send_t* req;
  struct msghdr h;
  QUEUE* q;
  ssize_t size;

#if HAVE_MMSG
  uv_once(&once, uv__udp_mmsg_init);
  if (uv__sendmmsg_avail) {
    uv__udp_sendmmsg(handle);
    return;
  }
#endif

  while (!QUEUE_EMPTY(&handle->write_queue)) {
    q = QUEUE_HEAD(&handle->write_queue);
    assert(q != NULL);

    req = QUEUE_DATA(q, uv_udp_send_t, queue);
    assert(req != NULL);

    memset(&h, 0, sizeof h);
    if (req->addr.ss_family == AF_UNSPEC) {
      h.msg_name = NULL;
      h.msg_namelen = 0;
    } else {
      h.msg_name = &req->addr;
      if (req->addr.ss_family == AF_INET6)
        h.msg_namelen = sizeof(struct sockaddr_in6);
      else if (req->addr.ss_family == AF_INET)
        h.msg_namelen = sizeof(struct sockaddr_in);
      else if (req->addr.ss_family == AF_UNIX)
        h.msg_namelen = sizeof(struct sockaddr_un);
      else {
        assert(0 && "unsupported address family");
        abort();
      }
    }
    h.msg_iov = (struct iovec*) req->bufs;
    h.msg_iovlen = req->nbufs;

    do {
      size = sendmsg(handle->io_watcher.fd, &h, 0);
    } while (size == -1 && errno == EINTR);

    if (size == -1) {
      if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
        break;
    }

    req->status = (size == -1 ? UV__ERR(errno) : size);

    /* Sending a datagram is an atomic operation: either all data
     * is written or nothing is (and EMSGSIZE is raised). That is
     * why we don't handle partial writes. Just pop the request
     * off the write queue and onto the completed queue, done.
     */
    QUEUE_REMOVE(&req->queue);
    QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
    uv__io_feed(handle->loop, &handle->io_watcher);
  }
}

/* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
 * refinements for programs that use multicast.
 *
 * Linux as of 3.9 has a SO_REUSEPORT socket option too, but with semantics
 * that are different from the BSDs: it _shares_ the port rather than
 * stealing it from the current listener. While useful, it's not something we
 * can emulate on other platforms so we don't enable it.
 *
 * z/OS does not support getsockname with the SO_REUSEPORT option when using
 * AF_UNIX.
 */
static int uv__set_reuse(int fd) {
  int yes;
  yes = 1;

#if defined(SO_REUSEPORT) && defined(__MVS__)
  struct sockaddr_in sockfd;
  unsigned int sockfd_len = sizeof(sockfd);
  if (getsockname(fd, (struct sockaddr*) &sockfd, &sockfd_len) == -1)
    return UV__ERR(errno);
  if (sockfd.sin_family == AF_UNIX) {
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
      return UV__ERR(errno);
  } else {
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
      return UV__ERR(errno);
  }
#elif defined(SO_REUSEPORT) && !defined(__linux__)
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)))
    return UV__ERR(errno);
#else
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)))
    return UV__ERR(errno);
#endif

  return 0;
}
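
/* What uv__set_reuse() buys libuv users in practice: UV_UDP_REUSEADDR lets
 * several sockets bind the same multicast port. A hedged plain-sockets
 * sketch of the same arrangement on a BSD-style platform (bind_shared() is
 * our name; error handling reduced to the essentials):
 *
 *   #include <netinet/in.h>
 *   #include <string.h>
 *   #include <sys/socket.h>
 *
 *   static int bind_shared(unsigned short port) {
 *     int yes = 1;
 *     struct sockaddr_in sin;
 *     int fd = socket(AF_INET, SOCK_DGRAM, 0);
 *     if (fd < 0)
 *       return -1;
 *     memset(&sin, 0, sizeof(sin));
 *     sin.sin_family = AF_INET;
 *     sin.sin_port = htons(port);
 *     sin.sin_addr.s_addr = htonl(INADDR_ANY);
 *     // Two cooperating receivers can both succeed at this bind().
 *   #ifdef SO_REUSEPORT
 *     setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes));
 *   #else
 *     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
 *   #endif
 *     return bind(fd, (struct sockaddr*) &sin, sizeof(sin)) ? -1 : fd;
 *   }
 */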


int uv__udp_bind(uv_udp_t* handle,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 unsigned int flags) {
  int err;
  int yes;
  int fd;

  /* Check for bad flags. */
  if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR))
    return UV_EINVAL;

  /* Cannot set IPv6-only mode on non-IPv6 socket. */
  if ((flags & UV_UDP_IPV6ONLY) && addr->sa_family != AF_INET6)
    return UV_EINVAL;

  fd = handle->io_watcher.fd;
  if (fd == -1) {
    err = uv__socket(addr->sa_family, SOCK_DGRAM, 0);
    if (err < 0)
      return err;
    fd = err;
    handle->io_watcher.fd = fd;
  }

  if (flags & UV_UDP_REUSEADDR) {
    err = uv__set_reuse(fd);
    if (err)
      return err;
  }

  if (flags & UV_UDP_IPV6ONLY) {
#ifdef IPV6_V6ONLY
    yes = 1;
    if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
      err = UV__ERR(errno);
      return err;
    }
#else
    err = UV_ENOTSUP;
    return err;
#endif
  }

  if (bind(fd, addr, addrlen)) {
    err = UV__ERR(errno);
    if (errno == EAFNOSUPPORT)
      /* macOS, other BSDs and SunOS fail with EAFNOSUPPORT when binding a
       * socket created with AF_INET to an AF_INET6 address or vice versa. */
      err = UV_EINVAL;
    return err;
  }

  if (addr->sa_family == AF_INET6)
    handle->flags |= UV_HANDLE_IPV6;

  handle->flags |= UV_HANDLE_BOUND;
  return 0;
}
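
/* Typical public-API usage that funnels into uv__udp_bind(): initialize the
 * handle, build a socket address, bind. A hedged sketch (assumes the default
 * loop; error checks elided):
 *
 *   uv_udp_t handle;
 *   struct sockaddr_in addr;
 *
 *   uv_udp_init(uv_default_loop(), &handle);
 *   uv_ip4_addr("0.0.0.0", 9999, &addr);
 *   uv_udp_bind(&handle, (const struct sockaddr*) &addr, UV_UDP_REUSEADDR);
 */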


static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
                                       int domain,
                                       unsigned int flags) {
  union uv__sockaddr taddr;
  socklen_t addrlen;

  if (handle->io_watcher.fd != -1)
    return 0;

  switch (domain) {
  case AF_INET:
  {
    struct sockaddr_in* addr = &taddr.in;
    memset(addr, 0, sizeof *addr);
    addr->sin_family = AF_INET;
    addr->sin_addr.s_addr = INADDR_ANY;
    addrlen = sizeof *addr;
    break;
  }
  case AF_INET6:
  {
    struct sockaddr_in6* addr = &taddr.in6;
    memset(addr, 0, sizeof *addr);
    addr->sin6_family = AF_INET6;
    addr->sin6_addr = in6addr_any;
    addrlen = sizeof *addr;
    break;
  }
  default:
    assert(0 && "unsupported address family");
    abort();
  }

  return uv__udp_bind(handle, &taddr.addr, addrlen, flags);
}


int uv__udp_connect(uv_udp_t* handle,
                    const struct sockaddr* addr,
                    unsigned int addrlen) {
  int err;

  err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
  if (err)
    return err;

  do {
    errno = 0;
    err = connect(handle->io_watcher.fd, addr, addrlen);
  } while (err == -1 && errno == EINTR);

  if (err)
    return UV__ERR(errno);

  handle->flags |= UV_HANDLE_UDP_CONNECTED;

  return 0;
}


int uv__udp_disconnect(uv_udp_t* handle) {
  int r;
  struct sockaddr addr;

  memset(&addr, 0, sizeof(addr));

  addr.sa_family = AF_UNSPEC;

  do {
    errno = 0;
    r = connect(handle->io_watcher.fd, &addr, sizeof(addr));
  } while (r == -1 && errno == EINTR);

  if (r == -1 && errno != EAFNOSUPPORT)
    return UV__ERR(errno);

  handle->flags &= ~UV_HANDLE_UDP_CONNECTED;
  return 0;
}
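
/* Connected UDP in practice: uv_udp_connect() pins the peer so later sends
 * may pass NULL for the address, and passing a NULL address disconnects via
 * the AF_UNSPEC connect() trick implemented above. A hedged sketch (error
 * checks elided):
 *
 *   uv_udp_t handle;
 *   struct sockaddr_in peer;
 *
 *   uv_udp_init(uv_default_loop(), &handle);
 *   uv_ip4_addr("127.0.0.1", 9999, &peer);
 *   uv_udp_connect(&handle, (const struct sockaddr*) &peer);
 *   // ... uv_udp_send()/uv_udp_try_send() with addr == NULL ...
 *   uv_udp_connect(&handle, NULL);  // disconnect again
 */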


int uv__udp_send(uv_udp_send_t* req,
                 uv_udp_t* handle,
                 const uv_buf_t bufs[],
                 unsigned int nbufs,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 uv_udp_send_cb send_cb) {
  int err;
  int empty_queue;

  assert(nbufs > 0);

  if (addr) {
    err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    if (err)
      return err;
  }

  /* It's legal for send_queue_count > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up send_queue_size/count later.
   */
  empty_queue = (handle->send_queue_count == 0);

  uv__req_init(handle->loop, req, UV_UDP_SEND);
  assert(addrlen <= sizeof(req->addr));
  if (addr == NULL)
    req->addr.ss_family = AF_UNSPEC;
  else
    memcpy(&req->addr, addr, addrlen);
  req->send_cb = send_cb;
  req->handle = handle;
  req->nbufs = nbufs;

  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL) {
    uv__req_unregister(handle->loop, req);
    return UV_ENOMEM;
  }

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
  handle->send_queue_count++;
  QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
  uv__handle_start(handle);

  if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) {
    uv__udp_sendmsg(handle);

    /* `uv__udp_sendmsg` may not be able to do non-blocking write straight
     * away. In such cases the `io_watcher` has to be queued for asynchronous
     * write.
     */
    if (!QUEUE_EMPTY(&handle->write_queue))
      uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
  } else {
    uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
  }

  return 0;
}
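
/* Because a datagram send is all-or-nothing, multiple uv_buf_t entries in
 * one request are coalesced into a single datagram rather than sent
 * separately. A hedged sketch of the public entry point that lands here
 * (on_send()/send_hello() are our names; error checks elided):
 *
 *   static void on_send(uv_udp_send_t* req, int status) {
 *     // status is 0 on success or a negative libuv error code
 *     free(req);
 *   }
 *
 *   static void send_hello(uv_udp_t* handle, const struct sockaddr* addr) {
 *     static char hdr[] = "hello ";
 *     static char body[] = "world";
 *     uv_buf_t bufs[2];
 *     uv_udp_send_t* req = malloc(sizeof(*req));
 *
 *     bufs[0] = uv_buf_init(hdr, sizeof(hdr) - 1);
 *     bufs[1] = uv_buf_init(body, sizeof(body) - 1);
 *     // One datagram containing "hello world" leaves the socket.
 *     uv_udp_send(req, handle, bufs, 2, addr, on_send);
 *   }
 */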


int uv__udp_try_send(uv_udp_t* handle,
                     const uv_buf_t bufs[],
                     unsigned int nbufs,
                     const struct sockaddr* addr,
                     unsigned int addrlen) {
  int err;
  struct msghdr h;
  ssize_t size;

  assert(nbufs > 0);

  /* already sending a message */
  if (handle->send_queue_count != 0)
    return UV_EAGAIN;

  if (addr) {
    err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    if (err)
      return err;
  } else {
    assert(handle->flags & UV_HANDLE_UDP_CONNECTED);
  }

  memset(&h, 0, sizeof h);
  h.msg_name = (struct sockaddr*) addr;
  h.msg_namelen = addrlen;
  h.msg_iov = (struct iovec*) bufs;
  h.msg_iovlen = nbufs;

  do {
    size = sendmsg(handle->io_watcher.fd, &h, 0);
  } while (size == -1 && errno == EINTR);

  if (size == -1) {
    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
      return UV_EAGAIN;
    else
      return UV__ERR(errno);
  }

  return size;
}
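
/* uv_udp_try_send() maps kernel backpressure to UV_EAGAIN instead of
 * queueing, leaving the fallback policy to the caller. A hedged sketch of
 * the usual pattern (queue_async_send()/handle_error() are placeholders for
 * the caller's own code):
 *
 *   static char data[] = "ping";
 *   uv_buf_t buf = uv_buf_init(data, sizeof(data) - 1);
 *   int r = uv_udp_try_send(handle, &buf, 1, addr);
 *
 *   if (r == UV_EAGAIN)
 *     queue_async_send(handle, &buf, addr);  // socket not writable yet
 *   else if (r < 0)
 *     handle_error(r);                       // some other failure
 *   // r >= 0: the whole datagram was written synchronously
 */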


static int uv__udp_set_membership4(uv_udp_t* handle,
                                   const struct sockaddr_in* multicast_addr,
                                   const char* interface_addr,
                                   uv_membership membership) {
  struct ip_mreq mreq;
  int optname;
  int err;

  memset(&mreq, 0, sizeof mreq);

  if (interface_addr) {
    err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
    if (err)
      return err;
  } else {
    mreq.imr_interface.s_addr = htonl(INADDR_ANY);
  }

  mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;

  switch (membership) {
  case UV_JOIN_GROUP:
    optname = IP_ADD_MEMBERSHIP;
    break;
  case UV_LEAVE_GROUP:
    optname = IP_DROP_MEMBERSHIP;
    break;
  default:
    return UV_EINVAL;
  }

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IP,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
#if defined(__MVS__)
    if (errno == ENXIO)
      return UV_ENODEV;
#endif
    return UV__ERR(errno);
  }

  return 0;
}


static int uv__udp_set_membership6(uv_udp_t* handle,
                                   const struct sockaddr_in6* multicast_addr,
                                   const char* interface_addr,
                                   uv_membership membership) {
  int optname;
  struct ipv6_mreq mreq;
  struct sockaddr_in6 addr6;

  memset(&mreq, 0, sizeof mreq);

  if (interface_addr) {
    if (uv_ip6_addr(interface_addr, 0, &addr6))
      return UV_EINVAL;
    mreq.ipv6mr_interface = addr6.sin6_scope_id;
  } else {
    mreq.ipv6mr_interface = 0;
  }

  mreq.ipv6mr_multiaddr = multicast_addr->sin6_addr;

  switch (membership) {
  case UV_JOIN_GROUP:
    optname = IPV6_ADD_MEMBERSHIP;
    break;
  case UV_LEAVE_GROUP:
    optname = IPV6_DROP_MEMBERSHIP;
    break;
  default:
    return UV_EINVAL;
  }

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IPV6,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
#if defined(__MVS__)
    if (errno == ENXIO)
      return UV_ENODEV;
#endif
    return UV__ERR(errno);
  }

  return 0;
}


#if !defined(__OpenBSD__) && \
    !defined(__NetBSD__) && \
    !defined(__ANDROID__) && \
    !defined(__DragonFly__) && \
    !defined(__QNX__)
static int uv__udp_set_source_membership4(uv_udp_t* handle,
                                          const struct sockaddr_in* multicast_addr,
                                          const char* interface_addr,
                                          const struct sockaddr_in* source_addr,
                                          uv_membership membership) {
  struct ip_mreq_source mreq;
  int optname;
  int err;

  err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
  if (err)
    return err;

  memset(&mreq, 0, sizeof(mreq));

  if (interface_addr != NULL) {
    err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
    if (err)
      return err;
  } else {
    mreq.imr_interface.s_addr = htonl(INADDR_ANY);
  }

  mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
  mreq.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr;

  if (membership == UV_JOIN_GROUP)
    optname = IP_ADD_SOURCE_MEMBERSHIP;
  else if (membership == UV_LEAVE_GROUP)
    optname = IP_DROP_SOURCE_MEMBERSHIP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IP,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
    return UV__ERR(errno);
  }

  return 0;
}


static int uv__udp_set_source_membership6(uv_udp_t* handle,
                                          const struct sockaddr_in6* multicast_addr,
                                          const char* interface_addr,
                                          const struct sockaddr_in6* source_addr,
                                          uv_membership membership) {
  struct group_source_req mreq;
  struct sockaddr_in6 addr6;
  int optname;
  int err;

  err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
  if (err)
    return err;

  memset(&mreq, 0, sizeof(mreq));

  if (interface_addr != NULL) {
    err = uv_ip6_addr(interface_addr, 0, &addr6);
    if (err)
      return err;
    mreq.gsr_interface = addr6.sin6_scope_id;
  } else {
    mreq.gsr_interface = 0;
  }

  STATIC_ASSERT(sizeof(mreq.gsr_group) >= sizeof(*multicast_addr));
  STATIC_ASSERT(sizeof(mreq.gsr_source) >= sizeof(*source_addr));
  memcpy(&mreq.gsr_group, multicast_addr, sizeof(*multicast_addr));
  memcpy(&mreq.gsr_source, source_addr, sizeof(*source_addr));

  if (membership == UV_JOIN_GROUP)
    optname = MCAST_JOIN_SOURCE_GROUP;
  else if (membership == UV_LEAVE_GROUP)
    optname = MCAST_LEAVE_SOURCE_GROUP;
  else
    return UV_EINVAL;

  if (setsockopt(handle->io_watcher.fd,
                 IPPROTO_IPV6,
                 optname,
                 &mreq,
                 sizeof(mreq))) {
    return UV__ERR(errno);
  }

  return 0;
}
#endif


int uv__udp_init_ex(uv_loop_t* loop,
                    uv_udp_t* handle,
                    unsigned flags,
                    int domain) {
  int fd;

  fd = -1;
  if (domain != AF_UNSPEC) {
    fd = uv__socket(domain, SOCK_DGRAM, 0);
    if (fd < 0)
      return fd;
  }

  uv__handle_init(loop, (uv_handle_t*) handle, UV_UDP);
  handle->alloc_cb = NULL;
  handle->recv_cb = NULL;
  handle->send_queue_size = 0;
  handle->send_queue_count = 0;
  uv__io_init(&handle->io_watcher, uv__udp_io, fd);
  QUEUE_INIT(&handle->write_queue);
  QUEUE_INIT(&handle->write_completed_queue);

  return 0;
}


int uv_udp_using_recvmmsg(const uv_udp_t* handle) {
#if HAVE_MMSG
  if (handle->flags & UV_HANDLE_UDP_RECVMMSG) {
    uv_once(&once, uv__udp_mmsg_init);
    return uv__recvmmsg_avail;
  }
#endif
  return 0;
}


int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
  int err;

  /* Check for already active socket. */
  if (handle->io_watcher.fd != -1)
    return UV_EBUSY;

  if (uv__fd_exists(handle->loop, sock))
    return UV_EEXIST;

  err = uv__nonblock(sock, 1);
  if (err)
    return err;

  err = uv__set_reuse(sock);
  if (err)
    return err;

  handle->io_watcher.fd = sock;
  if (uv__udp_is_connected(handle))
    handle->flags |= UV_HANDLE_UDP_CONNECTED;

  return 0;
}


int uv_udp_set_membership(uv_udp_t* handle,
                          const char* multicast_addr,
                          const char* interface_addr,
                          uv_membership membership) {
  int err;
  struct sockaddr_in addr4;
  struct sockaddr_in6 addr6;

  if (uv_ip4_addr(multicast_addr, 0, &addr4) == 0) {
    err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
    if (err)
      return err;
    return uv__udp_set_membership4(handle, &addr4, interface_addr, membership);
  } else if (uv_ip6_addr(multicast_addr, 0, &addr6) == 0) {
    err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
    if (err)
      return err;
    return uv__udp_set_membership6(handle, &addr6, interface_addr, membership);
  } else {
    return UV_EINVAL;
  }
}
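
/* End-to-end multicast receive with the public API: bind the group port,
 * join the group, start reading. A hedged sketch (239.255.0.1 is an
 * arbitrary example group; alloc_cb/recv_cb as sketched earlier; error
 * checks elided):
 *
 *   uv_udp_t handle;
 *   struct sockaddr_in addr;
 *
 *   uv_udp_init(uv_default_loop(), &handle);
 *   uv_ip4_addr("0.0.0.0", 5000, &addr);
 *   uv_udp_bind(&handle, (const struct sockaddr*) &addr, UV_UDP_REUSEADDR);
 *   uv_udp_set_membership(&handle, "239.255.0.1", NULL, UV_JOIN_GROUP);
 *   uv_udp_recv_start(&handle, alloc_cb, recv_cb);
 */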


int uv_udp_set_source_membership(uv_udp_t* handle,
                                 const char* multicast_addr,
                                 const char* interface_addr,
                                 const char* source_addr,
                                 uv_membership membership) {
#if !defined(__OpenBSD__) && \
    !defined(__NetBSD__) && \
    !defined(__ANDROID__) && \
    !defined(__DragonFly__) && \
    !defined(__QNX__)
  int err;
  union uv__sockaddr mcast_addr;
  union uv__sockaddr src_addr;

  err = uv_ip4_addr(multicast_addr, 0, &mcast_addr.in);
  if (err) {
    err = uv_ip6_addr(multicast_addr, 0, &mcast_addr.in6);
    if (err)
      return err;
    err = uv_ip6_addr(source_addr, 0, &src_addr.in6);
    if (err)
      return err;
    return uv__udp_set_source_membership6(handle,
                                          &mcast_addr.in6,
                                          interface_addr,
                                          &src_addr.in6,
                                          membership);
  }

  err = uv_ip4_addr(source_addr, 0, &src_addr.in);
  if (err)
    return err;
  return uv__udp_set_source_membership4(handle,
                                        &mcast_addr.in,
                                        interface_addr,
                                        &src_addr.in,
                                        membership);
#else
  return UV_ENOSYS;
#endif
}


static int uv__setsockopt(uv_udp_t* handle,
                          int option4,
                          int option6,
                          const void* val,
                          socklen_t size) {
  int r;

  if (handle->flags & UV_HANDLE_IPV6)
    r = setsockopt(handle->io_watcher.fd,
                   IPPROTO_IPV6,
                   option6,
                   val,
                   size);
  else
    r = setsockopt(handle->io_watcher.fd,
                   IPPROTO_IP,
                   option4,
                   val,
                   size);
  if (r)
    return UV__ERR(errno);

  return 0;
}

static int uv__setsockopt_maybe_char(uv_udp_t* handle,
                                     int option4,
                                     int option6,
                                     int val) {
#if defined(__sun) || defined(_AIX) || defined(__MVS__)
  char arg = val;
#elif defined(__OpenBSD__)
  unsigned char arg = val;
#else
  int arg = val;
#endif

  if (val < 0 || val > 255)
    return UV_EINVAL;

  return uv__setsockopt(handle, option4, option6, &arg, sizeof(arg));
}


int uv_udp_set_broadcast(uv_udp_t* handle, int on) {
  if (setsockopt(handle->io_watcher.fd,
                 SOL_SOCKET,
                 SO_BROADCAST,
                 &on,
                 sizeof(on))) {
    return UV__ERR(errno);
  }

  return 0;
}


int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
  if (ttl < 1 || ttl > 255)
    return UV_EINVAL;

#if defined(__MVS__)
  if (!(handle->flags & UV_HANDLE_IPV6))
    return UV_ENOTSUP; /* z/OS does not support setting the TTL on IPv4 sockets */
#endif

/*
 * On Solaris and derivatives such as SmartOS, the length of socket options
 * is sizeof(int) for IP_TTL and IPV6_UNICAST_HOPS, so hardcode the size of
 * these options on this platform, and use the general
 * uv__setsockopt_maybe_char call on other platforms.
 */
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__) || defined(__QNX__)

  return uv__setsockopt(handle,
                        IP_TTL,
                        IPV6_UNICAST_HOPS,
                        &ttl,
                        sizeof(ttl));

#else /* !(defined(__sun) || defined(_AIX) || defined(__OpenBSD__) ||
          defined(__MVS__) || defined(__QNX__)) */

  return uv__setsockopt_maybe_char(handle,
                                   IP_TTL,
                                   IPV6_UNICAST_HOPS,
                                   ttl);

#endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) ||
          defined(__MVS__) || defined(__QNX__) */
}


int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) {
/*
 * On Solaris and derivatives such as SmartOS, the length of socket options
 * is sizeof(int) for IPV6_MULTICAST_HOPS and sizeof(char) for
 * IP_MULTICAST_TTL, so hardcode the size of the option in the IPv6 case,
 * and use the general uv__setsockopt_maybe_char call otherwise.
 */
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__) || defined(__QNX__)
  if (handle->flags & UV_HANDLE_IPV6)
    return uv__setsockopt(handle,
                          IP_MULTICAST_TTL,
                          IPV6_MULTICAST_HOPS,
                          &ttl,
                          sizeof(ttl));
#endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) ||
          defined(__MVS__) || defined(__QNX__) */

  return uv__setsockopt_maybe_char(handle,
                                   IP_MULTICAST_TTL,
                                   IPV6_MULTICAST_HOPS,
                                   ttl);
}


int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
/*
 * On Solaris and derivatives such as SmartOS, the length of socket options
 * is sizeof(int) for IPV6_MULTICAST_LOOP and sizeof(char) for
 * IP_MULTICAST_LOOP, so hardcode the size of the option in the IPv6 case,
 * and use the general uv__setsockopt_maybe_char call otherwise.
 */
#if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
    defined(__MVS__) || defined(__QNX__)
  if (handle->flags & UV_HANDLE_IPV6)
    return uv__setsockopt(handle,
                          IP_MULTICAST_LOOP,
                          IPV6_MULTICAST_LOOP,
                          &on,
                          sizeof(on));
#endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) ||
          defined(__MVS__) || defined(__QNX__) */

  return uv__setsockopt_maybe_char(handle,
                                   IP_MULTICAST_LOOP,
                                   IPV6_MULTICAST_LOOP,
                                   on);
}

int uv_udp_set_multicast_interface(uv_udp_t* handle,
                                   const char* interface_addr) {
  struct sockaddr_storage addr_st;
  struct sockaddr_in* addr4;
  struct sockaddr_in6* addr6;

  addr4 = (struct sockaddr_in*) &addr_st;
  addr6 = (struct sockaddr_in6*) &addr_st;

  if (!interface_addr) {
    memset(&addr_st, 0, sizeof addr_st);
    if (handle->flags & UV_HANDLE_IPV6) {
      addr_st.ss_family = AF_INET6;
      addr6->sin6_scope_id = 0;
    } else {
      addr_st.ss_family = AF_INET;
      addr4->sin_addr.s_addr = htonl(INADDR_ANY);
    }
  } else if (uv_ip4_addr(interface_addr, 0, addr4) == 0) {
    /* nothing, address was parsed */
  } else if (uv_ip6_addr(interface_addr, 0, addr6) == 0) {
    /* nothing, address was parsed */
  } else {
    return UV_EINVAL;
  }

  if (addr_st.ss_family == AF_INET) {
    if (setsockopt(handle->io_watcher.fd,
                   IPPROTO_IP,
                   IP_MULTICAST_IF,
                   (void*) &addr4->sin_addr,
                   sizeof(addr4->sin_addr)) == -1) {
      return UV__ERR(errno);
    }
  } else if (addr_st.ss_family == AF_INET6) {
    if (setsockopt(handle->io_watcher.fd,
                   IPPROTO_IPV6,
                   IPV6_MULTICAST_IF,
                   &addr6->sin6_scope_id,
                   sizeof(addr6->sin6_scope_id)) == -1) {
      return UV__ERR(errno);
    }
  } else {
    assert(0 && "unexpected address family");
    abort();
  }

  return 0;
}

int uv_udp_getpeername(const uv_udp_t* handle,
                       struct sockaddr* name,
                       int* namelen) {

  return uv__getsockpeername((const uv_handle_t*) handle,
                             getpeername,
                             name,
                             namelen);
}

int uv_udp_getsockname(const uv_udp_t* handle,
                       struct sockaddr* name,
                       int* namelen) {

  return uv__getsockpeername((const uv_handle_t*) handle,
                             getsockname,
                             name,
                             namelen);
}


int uv__udp_recv_start(uv_udp_t* handle,
                       uv_alloc_cb alloc_cb,
                       uv_udp_recv_cb recv_cb) {
  int err;

  if (alloc_cb == NULL || recv_cb == NULL)
    return UV_EINVAL;

  if (uv__io_active(&handle->io_watcher, POLLIN))
    return UV_EALREADY;  /* FIXME(bnoordhuis) Should be UV_EBUSY. */

  err = uv__udp_maybe_deferred_bind(handle, AF_INET, 0);
  if (err)
    return err;

  handle->alloc_cb = alloc_cb;
  handle->recv_cb = recv_cb;

  uv__io_start(handle->loop, &handle->io_watcher, POLLIN);
  uv__handle_start(handle);

  return 0;
}
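
/* The receive path end to end: alloc_cb sizes the buffer (the hint passed
 * from uv__udp_recvmsg() above is UV__UDP_DGRAM_MAXSIZE), recv_cb consumes
 * datagrams, uv_udp_recv_stop() ends the flow. A hedged sketch for the
 * non-recvmmsg case (callback names are ours; error checks elided):
 *
 *   static void alloc_cb(uv_handle_t* h, size_t hint, uv_buf_t* buf) {
 *     buf->base = malloc(hint);
 *     buf->len = buf->base != NULL ? hint : 0;
 *   }
 *
 *   static void recv_cb(uv_udp_t* h, ssize_t nread, const uv_buf_t* buf,
 *                       const struct sockaddr* addr, unsigned flags) {
 *     if (nread > 0 && addr != NULL)
 *       consume(buf->base, nread);  // consume() is a placeholder
 *     // nread == 0 with addr == NULL just means "nothing left to read"
 *     free(buf->base);
 *   }
 *
 *   uv_udp_recv_start(&handle, alloc_cb, recv_cb);
 */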


int uv__udp_recv_stop(uv_udp_t* handle) {
  uv__io_stop(handle->loop, &handle->io_watcher, POLLIN);

  if (!uv__io_active(&handle->io_watcher, POLLOUT))
    uv__handle_stop(handle);

  handle->alloc_cb = NULL;
  handle->recv_cb = NULL;

  return 0;
}