/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv.h"
#include "internal.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <unistd.h>
#include <limits.h> /* IOV_MAX */

#if defined(__APPLE__)
# include <sys/event.h>
# include <sys/time.h>
# include <sys/select.h>

/* Forward declaration */
typedef struct uv__stream_select_s uv__stream_select_t;

struct uv__stream_select_s {
  uv_stream_t* stream;
  uv_thread_t thread;
  uv_sem_t close_sem;
  uv_sem_t async_sem;
  uv_async_t async;
  int events;
  int fake_fd;
  int int_fd;
  int fd;
  fd_set* sread;
  size_t sread_sz;
  fd_set* swrite;
  size_t swrite_sz;
};
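
/* The select() fallback below works as follows: uv__stream_try_select()
 * detects fds that kqueue cannot watch and hands the real fd to a helper
 * thread running uv__stream_osx_select(). Readiness is reported back to the
 * loop thread through the uv_async_t handle, while fake_fd and int_fd form
 * a socketpair() used to interrupt the blocking select() call whenever the
 * watcher state changes; see uv__stream_osx_interrupt_select().
 */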

/* Due to a possible kernel bug at least in OS X 10.10 "Yosemite",
 * EPROTOTYPE can be returned while trying to write to a socket that is
 * shutting down. If we retry the write, we should get the expected EPIPE
 * instead.
 */
# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR || errno == EPROTOTYPE)
# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
    (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS || \
     (errno == EMSGSIZE && send_handle != NULL))
#else
# define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR)
# define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \
    (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
#endif /* defined(__APPLE__) */
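
/* Both macros are used by uv__write() below: a write is retried in place on
 * RETRY_ON_WRITE_ERROR() errnos, treated as "try again later" on
 * IS_TRANSIENT_WRITE_ERROR() errnos, and reported to the caller otherwise:
 *
 *   do
 *     n = sendmsg(uv__stream_fd(stream), &msg, 0);
 *   while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
 *
 *   if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle))
 *     goto error;
 */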

static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);


void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
      /* In the rare case that "/dev/null" isn't mounted, open "/"
       * instead.
       */
      err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE__) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}


static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
#if defined(__APPLE__)
  /* Notify select() thread about state change */
  uv__stream_select_t* s;
  int r;

  s = stream->select;
  if (s == NULL)
    return;

  /* Interrupt the select() loop.
   * NOTE: fake_fd and int_fd are the two ends of a socketpair(), so writing
   * to one will emit a read event on the other side.
   */
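  /* The "x" written here is drained by uv__stream_osx_select(), which
   * watches int_fd, the read end of the socketpair, and empties it before
   * re-examining the watcher state.
   */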
  do
    r = write(s->fake_fd, "x", 1);
  while (r == -1 && errno == EINTR);

  assert(r == 1);
#else /* !defined(__APPLE__) */
  /* No-op on any other platform */
#endif /* !defined(__APPLE__) */
}


#if defined(__APPLE__)
static void uv__stream_osx_select(void* arg) {
  uv_stream_t* stream;
  uv__stream_select_t* s;
  char buf[1024];
  int events;
  int fd;
  int r;
  int max_fd;

  stream = arg;
  s = stream->select;
  fd = s->fd;

  if (fd > s->int_fd)
    max_fd = fd;
  else
    max_fd = s->int_fd;

  while (1) {
    /* Terminate on semaphore */
    if (uv_sem_trywait(&s->close_sem) == 0)
      break;

    /* Watch fd using select(2) */
    memset(s->sread, 0, s->sread_sz);
    memset(s->swrite, 0, s->swrite_sz);

    if (uv__io_active(&stream->io_watcher, POLLIN))
      FD_SET(fd, s->sread);
    if (uv__io_active(&stream->io_watcher, POLLOUT))
      FD_SET(fd, s->swrite);
    FD_SET(s->int_fd, s->sread);

    /* Wait indefinitely for fd events */
    r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
    if (r == -1) {
      if (errno == EINTR)
        continue;

      /* XXX: Possible?! */
      abort();
    }

    /* Ignore timeouts */
    if (r == 0)
      continue;

    /* Empty socketpair's buffer in case of interruption */
    if (FD_ISSET(s->int_fd, s->sread))
      while (1) {
        r = read(s->int_fd, buf, sizeof(buf));

        if (r == sizeof(buf))
          continue;

        if (r != -1)
          break;

        if (errno == EAGAIN || errno == EWOULDBLOCK)
          break;

        if (errno == EINTR)
          continue;

        abort();
      }

    /* Handle events */
    events = 0;
    if (FD_ISSET(fd, s->sread))
      events |= POLLIN;
    if (FD_ISSET(fd, s->swrite))
      events |= POLLOUT;

    assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
    if (events != 0) {
      ACCESS_ONCE(int, s->events) = events;

      uv_async_send(&s->async);
      uv_sem_wait(&s->async_sem);

      /* Should be processed at this stage */
      assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
    }
  }
}


static void uv__stream_osx_select_cb(uv_async_t* handle) {
  uv__stream_select_t* s;
  uv_stream_t* stream;
  int events;

  s = container_of(handle, uv__stream_select_t, async);
  stream = s->stream;

  /* Get and reset stream's events */
  events = s->events;
  ACCESS_ONCE(int, s->events) = 0;

  assert(events != 0);
  assert(events == (events & (POLLIN | POLLOUT)));

  /* Invoke callback on event-loop */
  if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);

  if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
    uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);

  if (stream->flags & UV_HANDLE_CLOSING)
    return;

  /* NOTE: It is important to do this here, otherwise `select()` might be
   * called before the actual `uv__read()`, leading to a blocking syscall.
   * Posting async_sem releases the select thread blocked in uv_sem_wait()
   * in uv__stream_osx_select().
   */
  uv_sem_post(&s->async_sem);
}


static void uv__stream_osx_cb_close(uv_handle_t* async) {
  uv__stream_select_t* s;

  s = container_of(async, uv__stream_select_t, async);
  uv__free(s);
}


int uv__stream_try_select(uv_stream_t* stream, int* fd) {
  /*
   * kqueue doesn't work with some files from the /dev mount on OS X.
   * Watch those fds with select(2), running in a separate thread.
   */

  struct kevent filter[1];
  struct kevent events[1];
  struct timespec timeout;
  uv__stream_select_t* s;
  int fds[2];
  int err;
  int ret;
  int kq;
  int old_fd;
  int max_fd;
  size_t sread_sz;
  size_t swrite_sz;

  kq = kqueue();
  if (kq == -1) {
    perror("(libuv) kqueue()");
    return UV__ERR(errno);
  }

  EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);

  /* Use a small timeout, because we only want to capture EINVALs */
  timeout.tv_sec = 0;
  timeout.tv_nsec = 1;

  do
    ret = kevent(kq, filter, 1, events, 1, &timeout);
  while (ret == -1 && errno == EINTR);

  uv__close(kq);

  if (ret == -1)
    return UV__ERR(errno);

  if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
    return 0;

  /* At this point we definitely know that this fd won't work with kqueue */

  /*
   * Create fds for io watcher and to interrupt the select() loop.
   * NOTE: do it ahead of malloc below to allocate enough space for fd_sets
   */
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
    return UV__ERR(errno);

  max_fd = *fd;
  if (fds[1] > max_fd)
    max_fd = fds[1];

  sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
  swrite_sz = sread_sz;

  s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
  if (s == NULL) {
    err = UV_ENOMEM;
    goto failed_malloc;
  }

  s->events = 0;
  s->fd = *fd;
  s->sread = (fd_set*) ((char*) s + sizeof(*s));
  s->sread_sz = sread_sz;
  s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
  s->swrite_sz = swrite_sz;

  err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
  if (err)
    goto failed_async_init;

  s->async.flags |= UV_HANDLE_INTERNAL;
  uv__handle_unref(&s->async);

  err = uv_sem_init(&s->close_sem, 0);
  if (err != 0)
    goto failed_close_sem_init;

  err = uv_sem_init(&s->async_sem, 0);
  if (err != 0)
    goto failed_async_sem_init;

  s->fake_fd = fds[0];
  s->int_fd = fds[1];

  old_fd = *fd;
  s->stream = stream;
  stream->select = s;
  *fd = s->fake_fd;

  err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
  if (err != 0)
    goto failed_thread_create;

  return 0;

failed_thread_create:
  s->stream = NULL;
  stream->select = NULL;
  *fd = old_fd;

  uv_sem_destroy(&s->async_sem);

failed_async_sem_init:
  uv_sem_destroy(&s->close_sem);

failed_close_sem_init:
  uv__close(fds[0]);
  uv__close(fds[1]);
  uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
  return err;

failed_async_init:
  uv__free(s);

failed_malloc:
  uv__close(fds[0]);
  uv__close(fds[1]);

  return err;
}
#endif /* defined(__APPLE__) */


int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
#if defined(__APPLE__)
  int enable;
#endif

  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    return UV_EBUSY;

  assert(fd >= 0);
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
      return UV__ERR(errno);

    /* TODO: use the delay the user passed in. */
    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
        uv__tcp_keepalive(fd, 1, 60)) {
      return UV__ERR(errno);
    }
  }

#if defined(__APPLE__)
  enable = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
      errno != ENOTSOCK &&
      errno != EINVAL) {
    return UV__ERR(errno);
  }
#endif

  stream->io_watcher.fd = fd;

  return 0;
}


void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
  uv_write_t* req;
  QUEUE* q;

  while (!QUEUE_EMPTY(&stream->write_queue)) {
    q = QUEUE_HEAD(&stream->write_queue);
    QUEUE_REMOVE(q);

    req = QUEUE_DATA(q, uv_write_t, queue);
    req->error = error;

    QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  }
}


void uv__stream_destroy(uv_stream_t* stream) {
  assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
  assert(stream->flags & UV_HANDLE_CLOSED);

  if (stream->connect_req) {
    uv__req_unregister(stream->loop, stream->connect_req);
    stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
    stream->connect_req = NULL;
  }

  uv__stream_flush_write_queue(stream, UV_ECANCELED);
  uv__write_callbacks(stream);

  if (stream->shutdown_req) {
    /* The ECANCELED error code is a lie, the shutdown(2) syscall is a
     * fait accompli at this point. Maybe we should revisit this in v0.11.
     * A possible reason for leaving it unchanged is that it informs the
     * callee that the handle has been destroyed.
     */
    uv__req_unregister(stream->loop, stream->shutdown_req);
    stream->shutdown_req->cb(stream->shutdown_req, UV_ECANCELED);
    stream->shutdown_req = NULL;
  }

  assert(stream->write_queue_size == 0);
}


/* Implements a best effort approach to mitigating accept() EMFILE errors.
 * We have a spare file descriptor stashed away that we close to get below
 * the EMFILE limit. Next, we accept all pending connections and close them
 * immediately to signal the clients that we're overloaded - and we are, but
 * we still keep on trucking.
 *
 * There is one caveat: it's not reliable in a multi-threaded environment.
 * The file descriptor limit is per process. Our party trick fails if another
 * thread opens a file or creates a socket in the time window between us
 * calling close() and accept().
 */
static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
  int err;
  int emfile_fd;

  if (loop->emfile_fd == -1)
    return UV_EMFILE;

  uv__close(loop->emfile_fd);
  loop->emfile_fd = -1;

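  /* Accept and immediately discard every pending connection. uv__accept()
   * eventually fails with UV_EAGAIN or UV__ERR(EWOULDBLOCK) once the backlog
   * is drained; uv__server_io() checks for exactly those values after
   * calling this function.
   */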
  do {
    err = uv__accept(accept_fd);
    if (err >= 0)
      uv__close(err);
  } while (err >= 0 || err == UV_EINTR);

  emfile_fd = uv__open_cloexec("/", O_RDONLY);
  if (emfile_fd >= 0)
    loop->emfile_fd = emfile_fd;

  return err;
}


#if defined(UV_HAVE_KQUEUE)
# define UV_DEC_BACKLOG(w) w->rcount--;
#else
# define UV_DEC_BACKLOG(w) /* no-op */
#endif /* defined(UV_HAVE_KQUEUE) */


void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;
  int err;

  stream = container_of(w, uv_stream_t, io_watcher);
  assert(events & POLLIN);
  assert(stream->accepted_fd == -1);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);

  /* connection_cb can close the server socket while we're
   * in the loop so check it on each iteration.
   */
  while (uv__stream_fd(stream) != -1) {
    assert(stream->accepted_fd == -1);

#if defined(UV_HAVE_KQUEUE)
    if (w->rcount <= 0)
      return;
#endif /* defined(UV_HAVE_KQUEUE) */

    err = uv__accept(uv__stream_fd(stream));
    if (err < 0) {
      if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
        return; /* Not an error. */

      if (err == UV_ECONNABORTED)
        continue; /* Ignore. Nothing we can do about that. */

      if (err == UV_EMFILE || err == UV_ENFILE) {
        err = uv__emfile_trick(loop, uv__stream_fd(stream));
        if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
          break;
      }

      stream->connection_cb(stream, err);
      continue;
    }

    UV_DEC_BACKLOG(w)
    stream->accepted_fd = err;
    stream->connection_cb(stream, 0);

    if (stream->accepted_fd != -1) {
      /* The user hasn't yet called uv_accept(). */
      uv__io_stop(loop, &stream->io_watcher, POLLIN);
      return;
    }

    if (stream->type == UV_TCP &&
        (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {
      /* Give other processes a chance to accept connections. */
      struct timespec timeout = { 0, 1 };
      nanosleep(&timeout, NULL);
    }
  }
}


#undef UV_DEC_BACKLOG


int uv_accept(uv_stream_t* server, uv_stream_t* client) {
  int err;

  assert(server->loop == client->loop);

  if (server->accepted_fd == -1)
    return UV_EAGAIN;

  switch (client->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      err = uv__stream_open(client,
                            server->accepted_fd,
                            UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
      if (err) {
        /* TODO handle error */
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    case UV_UDP:
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
      if (err) {
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    default:
      return UV_EINVAL;
  }

  client->flags |= UV_HANDLE_BOUND;

done:
  /* Process queued fds */
  if (server->queued_fds != NULL) {
    uv__stream_queued_fds_t* queued_fds;

    queued_fds = server->queued_fds;

    /* Read first */
    server->accepted_fd = queued_fds->fds[0];

    /* All read, free */
    assert(queued_fds->offset > 0);
    if (--queued_fds->offset == 0) {
      uv__free(queued_fds);
      server->queued_fds = NULL;
    } else {
      /* Shift rest */
      memmove(queued_fds->fds,
              queued_fds->fds + 1,
              queued_fds->offset * sizeof(*queued_fds->fds));
    }
  } else {
    server->accepted_fd = -1;
    if (err == 0)
      uv__io_start(server->loop, &server->io_watcher, POLLIN);
  }
  return err;
}
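
/* Typical use of uv_accept() from a connection callback, sketched as
 * hypothetical user code (alloc_cb/read_cb/close_cb are user-supplied):
 *
 *   static void on_connection(uv_stream_t* server, int status) {
 *     uv_tcp_t* client = malloc(sizeof(*client));
 *     uv_tcp_init(server->loop, client);
 *     if (status == 0 && uv_accept(server, (uv_stream_t*) client) == 0)
 *       uv_read_start((uv_stream_t*) client, alloc_cb, read_cb);
 *     else
 *       uv_close((uv_handle_t*) client, close_cb);
 *   }
 */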


int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int err;

  switch (stream->type) {
  case UV_TCP:
    err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
    break;

  case UV_NAMED_PIPE:
    err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
    break;

  default:
    err = UV_EINVAL;
  }

  if (err == 0)
    uv__handle_start(stream);

  return err;
}


static void uv__drain(uv_stream_t* stream) {
  uv_shutdown_t* req;
  int err;

  assert(QUEUE_EMPTY(&stream->write_queue));
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);

  /* Shutdown? */
  if ((stream->flags & UV_HANDLE_SHUTTING) &&
      !(stream->flags & UV_HANDLE_CLOSING) &&
      !(stream->flags & UV_HANDLE_SHUT)) {
    assert(stream->shutdown_req);

    req = stream->shutdown_req;
    stream->shutdown_req = NULL;
    stream->flags &= ~UV_HANDLE_SHUTTING;
    uv__req_unregister(stream->loop, req);

    err = 0;
    if (shutdown(uv__stream_fd(stream), SHUT_WR))
      err = UV__ERR(errno);

    if (err == 0)
      stream->flags |= UV_HANDLE_SHUT;

    if (req->cb != NULL)
      req->cb(req, err);
  }
}


static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
  if (n == 1)
    return write(fd, vec->iov_base, vec->iov_len);
  else
    return writev(fd, vec, n);
}


static size_t uv__write_req_size(uv_write_t* req) {
  size_t size;

  assert(req->bufs != NULL);
  size = uv__count_bufs(req->bufs + req->write_index,
                        req->nbufs - req->write_index);
  assert(req->handle->write_queue_size >= size);

  return size;
}


/* Returns 1 if all write request data has been written, or 0 if there is
 * still more data to write.
 *
 * Note: the return value only says something about the *current* request.
 * There may still be other write requests sitting in the queue.
 */
static int uv__write_req_update(uv_stream_t* stream,
                                uv_write_t* req,
                                size_t n) {
  uv_buf_t* buf;
  size_t len;

  assert(n <= stream->write_queue_size);
  stream->write_queue_size -= n;

  buf = req->bufs + req->write_index;

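  /* Consume n written bytes from the buffer list. Worked example with
   * hypothetical sizes: two 64-byte bufs and n == 80 drain bufs[0] entirely,
   * leave 48 bytes in bufs[1], and end with write_index == 1 (return 0).
   */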
  do {
    len = n < buf->len ? n : buf->len;
    buf->base += len;
    buf->len -= len;
    buf += (buf->len == 0); /* Advance to next buffer if this one is empty. */
    n -= len;
  } while (n > 0);

  req->write_index = buf - req->bufs;

  return req->write_index == req->nbufs;
}


static void uv__write_req_finish(uv_write_t* req) {
  uv_stream_t* stream = req->handle;

  /* Pop the req off tcp->write_queue. */
  QUEUE_REMOVE(&req->queue);

  /* Only free when there was no error. On error, we touch up write_queue_size
   * right before making the callback. The reason we don't do that right away
   * is that a write_queue_size > 0 is our only way to signal to the user that
   * they should stop writing - which they should if we got an error. Something
   * to revisit in future revisions of the libuv API.
   */
  if (req->error == 0) {
    if (req->bufs != req->bufsml)
      uv__free(req->bufs);
    req->bufs = NULL;
  }

  /* Add it to the write_completed_queue where it will have its
   * callback called in the near future.
   */
  QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
  uv__io_feed(stream->loop, &stream->io_watcher);
}


static int uv__handle_fd(uv_handle_t* handle) {
  switch (handle->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      return ((uv_stream_t*) handle)->io_watcher.fd;

    case UV_UDP:
      return ((uv_udp_t*) handle)->io_watcher.fd;

    default:
      return -1;
  }
}

static void uv__write(uv_stream_t* stream) {
  struct iovec* iov;
  QUEUE* q;
  uv_write_t* req;
  int iovmax;
  int iovcnt;
  ssize_t n;
  int err;

start:

  assert(uv__stream_fd(stream) >= 0);

  if (QUEUE_EMPTY(&stream->write_queue))
    return;

  q = QUEUE_HEAD(&stream->write_queue);
  req = QUEUE_DATA(q, uv_write_t, queue);
  assert(req->handle == stream);

  /*
   * Cast to iovec. We had to have our own uv_buf_t instead of iovec
   * because Windows's WSABUF is not an iovec.
   */
  assert(sizeof(uv_buf_t) == sizeof(struct iovec));
  iov = (struct iovec*) &(req->bufs[req->write_index]);
  iovcnt = req->nbufs - req->write_index;

  iovmax = uv__getiovmax();

  /* Limit iov count to avoid EINVALs from writev() */
  if (iovcnt > iovmax)
    iovcnt = iovmax;

  /*
   * Now do the actual writev. Note that we've been updating the pointers
   * inside the iov each time we write. So there is no need to offset it.
   */

  if (req->send_handle) {
    int fd_to_send;
    struct msghdr msg;
    struct cmsghdr *cmsg;
    union {
      char data[64];
      struct cmsghdr alias;
    } scratch;
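    /* The union overlays the raw buffer with a struct cmsghdr to obtain the
     * alignment that CMSG_FIRSTHDR() expects; 64 bytes comfortably covers
     * CMSG_SPACE(sizeof(int)) for the single fd sent here.
     */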

    if (uv__is_closing(req->send_handle)) {
      err = UV_EBADF;
      goto error;
    }

    fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);

    memset(&scratch, 0, sizeof(scratch));

    assert(fd_to_send >= 0);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = iovcnt;
    msg.msg_flags = 0;

    msg.msg_control = &scratch.alias;
    msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));

    cmsg = CMSG_FIRSTHDR(&msg);
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;
    cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));

    /* silence aliasing warning */
    {
      void* pv = CMSG_DATA(cmsg);
      int* pi = pv;
      *pi = fd_to_send;
    }

    do
      n = sendmsg(uv__stream_fd(stream), &msg, 0);
    while (n == -1 && RETRY_ON_WRITE_ERROR(errno));

    /* Ensure the handle isn't sent again in case this is a partial write. */
    if (n >= 0)
      req->send_handle = NULL;
  } else {
    do
      n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
    while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
  }

  if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
    err = UV__ERR(errno);
    goto error;
  }

  if (n >= 0 && uv__write_req_update(stream, req, n)) {
    uv__write_req_finish(req);
    return; /* TODO(bnoordhuis) Start trying to write the next request. */
  }

  /* If this is a blocking stream, try again. */
  if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
    goto start;

  /* We're not done. */
  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);

  /* Notify select() thread about state change */
  uv__stream_osx_interrupt_select(stream);

  return;

error:
  req->error = err;
  uv__write_req_finish(req);
  uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  if (!uv__io_active(&stream->io_watcher, POLLIN))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
}


static void uv__write_callbacks(uv_stream_t* stream) {
  uv_write_t* req;
  QUEUE* q;
  QUEUE pq;

  if (QUEUE_EMPTY(&stream->write_completed_queue))
    return;

  QUEUE_MOVE(&stream->write_completed_queue, &pq);

  while (!QUEUE_EMPTY(&pq)) {
    /* Pop a req off write_completed_queue. */
    q = QUEUE_HEAD(&pq);
    req = QUEUE_DATA(q, uv_write_t, queue);
    QUEUE_REMOVE(q);
    uv__req_unregister(stream->loop, req);

    if (req->bufs != NULL) {
      stream->write_queue_size -= uv__write_req_size(req);
      if (req->bufs != req->bufsml)
        uv__free(req->bufs);
      req->bufs = NULL;
    }

    /* NOTE: call callback AFTER freeing the request data. */
    if (req->cb)
      req->cb(req, req->error);
  }
}


uv_handle_type uv__handle_type(int fd) {
  struct sockaddr_storage ss;
  socklen_t sslen;
  socklen_t len;
  int type;

  memset(&ss, 0, sizeof(ss));
  sslen = sizeof(ss);

  if (getsockname(fd, (struct sockaddr*)&ss, &sslen))
    return UV_UNKNOWN_HANDLE;

  len = sizeof type;

  if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len))
    return UV_UNKNOWN_HANDLE;

  if (type == SOCK_STREAM) {
#if defined(_AIX) || defined(__DragonFly__)
    /* On AIX and DragonFly, getsockname() returns an empty sa structure
     * for sockets of type AF_UNIX. For all other types it returns a
     * properly filled-in structure.
     */
    if (sslen == 0)
      return UV_NAMED_PIPE;
#endif
    switch (ss.ss_family) {
      case AF_UNIX:
        return UV_NAMED_PIPE;
      case AF_INET:
      case AF_INET6:
        return UV_TCP;
    }
  }

  if (type == SOCK_DGRAM &&
      (ss.ss_family == AF_INET || ss.ss_family == AF_INET6))
    return UV_UDP;

  return UV_UNKNOWN_HANDLE;
}


static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
  stream->flags |= UV_HANDLE_READ_EOF;
  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  if (!uv__io_active(&stream->io_watcher, POLLOUT))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);
  stream->read_cb(stream, UV_EOF, buf);
}


static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
  uv__stream_queued_fds_t* queued_fds;
  unsigned int queue_size;

  queued_fds = stream->queued_fds;
  if (queued_fds == NULL) {
    queue_size = 8;
    /* The struct already embeds one fd slot, hence queue_size - 1 extras. */
    queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
                            sizeof(*queued_fds));
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    queued_fds->offset = 0;
    stream->queued_fds = queued_fds;

    /* Grow */
  } else if (queued_fds->size == queued_fds->offset) {
    queue_size = queued_fds->size + 8;
    queued_fds = uv__realloc(queued_fds,
                             (queue_size - 1) * sizeof(*queued_fds->fds) +
                             sizeof(*queued_fds));

    /*
     * Allocation failure, report back.
     * NOTE: if it is fatal, the queued sockets will be closed in
     * uv__stream_close().
     */
    if (queued_fds == NULL)
      return UV_ENOMEM;
    queued_fds->size = queue_size;
    stream->queued_fds = queued_fds;
  }

  /* Put fd in a queue */
  queued_fds->fds[queued_fds->offset++] = fd;

  return 0;
}


#if defined(__PASE__)
/* On IBMi PASE the control message length cannot exceed 256 bytes. */
# define UV__CMSG_FD_COUNT 60
#else
# define UV__CMSG_FD_COUNT 64
#endif
#define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))
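
/* Sizing note, assuming 4-byte ints: the fd payload is 60 * 4 = 240 bytes
 * on PASE, which leaves room for the cmsg header within the 256-byte limit,
 * and 64 * 4 = 256 bytes elsewhere, where no such limit applies.
 */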


static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
  struct cmsghdr* cmsg;

  for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
    char* start;
    char* end;
    int err;
    void* pv;
    int* pi;
    unsigned int i;
    unsigned int count;

    if (cmsg->cmsg_type != SCM_RIGHTS) {
      fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
          cmsg->cmsg_type);
      continue;
    }

    /* silence aliasing warning */
    pv = CMSG_DATA(cmsg);
    pi = pv;

    /* Count available fds */
    start = (char*) cmsg;
    end = (char*) cmsg + cmsg->cmsg_len;
    count = 0;
    while (start + CMSG_LEN(count * sizeof(*pi)) < end)
      count++;
    assert(start + CMSG_LEN(count * sizeof(*pi)) == end);

    for (i = 0; i < count; i++) {
      /* Already has accepted fd, queue now */
      if (stream->accepted_fd != -1) {
        err = uv__stream_queue_fd(stream, pi[i]);
        if (err != 0) {
          /* Close rest */
          for (; i < count; i++)
            uv__close(pi[i]);
          return err;
        }
      } else {
        stream->accepted_fd = pi[i];
      }
    }
  }

  return 0;
}


#ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wgnu-folding-constant"
# pragma clang diagnostic ignored "-Wvla-extension"
#endif

static void uv__read(uv_stream_t* stream) {
  uv_buf_t buf;
  ssize_t nread;
  struct msghdr msg;
  char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
  int count;
  int err;
  int is_ipc;

  stream->flags &= ~UV_HANDLE_READ_PARTIAL;

  /* Prevent loop starvation when the data comes in as fast as (or faster than)
   * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   */
  count = 32;

  is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;

  /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
   * tcp->read_cb is NULL or not?
   */
  while (stream->read_cb
      && (stream->flags & UV_HANDLE_READING)
      && (count-- > 0)) {
    assert(stream->alloc_cb != NULL);

    buf = uv_buf_init(NULL, 0);
    stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
    if (buf.base == NULL || buf.len == 0) {
      /* User indicates it can't or won't handle the read. */
      stream->read_cb(stream, UV_ENOBUFS, &buf);
      return;
    }

    assert(buf.base != NULL);
    assert(uv__stream_fd(stream) >= 0);

    if (!is_ipc) {
      do {
        nread = read(uv__stream_fd(stream), buf.base, buf.len);
      }
      while (nread < 0 && errno == EINTR);
    } else {
      /* ipc uses recvmsg */
      msg.msg_flags = 0;
      msg.msg_iov = (struct iovec*) &buf;
      msg.msg_iovlen = 1;
      msg.msg_name = NULL;
      msg.msg_namelen = 0;
      /* Set up to receive a descriptor even if one isn't in the message */
      msg.msg_controllen = sizeof(cmsg_space);
      msg.msg_control = cmsg_space;

      do {
        nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
      }
      while (nread < 0 && errno == EINTR);
    }

    if (nread < 0) {
      /* Error */
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        /* Wait for the next one. */
        if (stream->flags & UV_HANDLE_READING) {
          uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
          uv__stream_osx_interrupt_select(stream);
        }
        stream->read_cb(stream, 0, &buf);
#if defined(__CYGWIN__) || defined(__MSYS__)
      } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
        uv__stream_eof(stream, &buf);
        return;
#endif
      } else {
        /* Error. User should call uv_close(). */
        stream->read_cb(stream, UV__ERR(errno), &buf);
        if (stream->flags & UV_HANDLE_READING) {
          stream->flags &= ~UV_HANDLE_READING;
          uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
          if (!uv__io_active(&stream->io_watcher, POLLOUT))
            uv__handle_stop(stream);
          uv__stream_osx_interrupt_select(stream);
        }
      }
      return;
    } else if (nread == 0) {
      uv__stream_eof(stream, &buf);
      return;
    } else {
      /* Successful read */
      ssize_t buflen = buf.len;

      if (is_ipc) {
        err = uv__stream_recv_cmsg(stream, &msg);
        if (err != 0) {
          stream->read_cb(stream, err, &buf);
          return;
        }
      }

#if defined(__MVS__)
      if (is_ipc && msg.msg_controllen > 0) {
        uv_buf_t blankbuf;
        int nread;
        struct iovec *old;

        blankbuf.base = 0;
        blankbuf.len = 0;
        old = msg.msg_iov;
        msg.msg_iov = (struct iovec*) &blankbuf;
        nread = 0;
        do {
          nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
          err = uv__stream_recv_cmsg(stream, &msg);
          if (err != 0) {
            stream->read_cb(stream, err, &buf);
            msg.msg_iov = old;
            return;
          }
        } while (nread == 0 && msg.msg_controllen > 0);
        msg.msg_iov = old;
      }
#endif
      stream->read_cb(stream, nread, &buf);

      /* Return if we didn't fill the buffer, there is no more data to read. */
      if (nread < buflen) {
        stream->flags |= UV_HANDLE_READ_PARTIAL;
        return;
      }
    }
  }
}


#ifdef __clang__
# pragma clang diagnostic pop
#endif

#undef UV__CMSG_FD_COUNT
#undef UV__CMSG_FD_SIZE


int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
  assert(stream->type == UV_TCP ||
         stream->type == UV_TTY ||
         stream->type == UV_NAMED_PIPE);

  if (!(stream->flags & UV_HANDLE_WRITABLE) ||
      stream->flags & UV_HANDLE_SHUT ||
      stream->flags & UV_HANDLE_SHUTTING ||
      uv__is_closing(stream)) {
    return UV_ENOTCONN;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Initialize request */
  uv__req_init(stream->loop, req, UV_SHUTDOWN);
  req->handle = stream;
  req->cb = cb;
  stream->shutdown_req = req;
  stream->flags |= UV_HANDLE_SHUTTING;

  uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}


static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);
    return;
  }

  assert(uv__stream_fd(stream) >= 0);

  /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  if (uv__stream_fd(stream) == -1)
    return; /* read_cb closed stream. */

  /* Short-circuit iff POLLHUP is set, the user is still interested in read
   * events and uv__read() reported a partial read but not EOF. If the EOF
   * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
   * have to do anything. If the partial read flag is not set, we can't
   * report the EOF yet because there is still data to read.
   */
  if ((events & POLLHUP) &&
      (stream->flags & UV_HANDLE_READING) &&
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
      !(stream->flags & UV_HANDLE_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return; /* read_cb closed stream. */

  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    /* Write queue drained. */
    if (QUEUE_EMPTY(&stream->write_queue))
      uv__drain(stream);
  }
}


/**
 * We get called here directly following a call to connect(2).
 * In order to determine whether we've errored out or succeeded, we must
 * call getsockopt().
 */
static void uv__stream_connect(uv_stream_t* stream) {
  int error;
  uv_connect_t* req = stream->connect_req;
  socklen_t errorsize = sizeof(int);

  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
  assert(req);

  if (stream->delayed_error) {
    /* To smooth over the differences between unixes, errors that were
     * reported synchronously on the first connect can be delayed until
     * the next tick--which is now.
     */
    error = stream->delayed_error;
    stream->delayed_error = 0;
  } else {
    /* Normal situation: we need to get the socket error from the kernel. */
    assert(uv__stream_fd(stream) >= 0);
    getsockopt(uv__stream_fd(stream),
               SOL_SOCKET,
               SO_ERROR,
               &error,
               &errorsize);
    error = UV__ERR(error);
  }

  if (error == UV__ERR(EINPROGRESS))
    return;

  stream->connect_req = NULL;
  uv__req_unregister(stream->loop, req);

  if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
  }

  if (req->cb)
    req->cb(req, error);

  if (uv__stream_fd(stream) == -1)
    return;

  if (error < 0) {
    uv__stream_flush_write_queue(stream, UV_ECANCELED);
    uv__write_callbacks(stream);
  }
}


int uv_write2(uv_write_t* req,
              uv_stream_t* stream,
              const uv_buf_t bufs[],
              unsigned int nbufs,
              uv_stream_t* send_handle,
              uv_write_cb cb) {
  int empty_queue;

  assert(nbufs > 0);
  assert((stream->type == UV_TCP ||
          stream->type == UV_NAMED_PIPE ||
          stream->type == UV_TTY) &&
         "uv_write (unix) does not yet support other types of streams");

  if (uv__stream_fd(stream) < 0)
    return UV_EBADF;

  if (!(stream->flags & UV_HANDLE_WRITABLE))
    return UV_EPIPE;

  if (send_handle) {
    if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
      return UV_EINVAL;

    /* XXX We abuse uv_write2() to send over UDP handles to child processes.
     * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
     * evaluates to a function that operates on a uv_stream_t with a couple of
     * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
     * which works but only by accident.
     */
    if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
      return UV_EBADF;

#if defined(__CYGWIN__) || defined(__MSYS__)
    /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
       See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
    return UV_ENOSYS;
#endif
  }

  /* It's legal for write_queue_size > 0 even when the write_queue is empty;
   * it means there are error-state requests in the write_completed_queue that
   * will touch up write_queue_size later, see also uv__write_req_finish().
   * We could check that write_queue is empty instead but that implies making
   * a write() syscall when we know that the handle is in error mode.
   */
  empty_queue = (stream->write_queue_size == 0);

  /* Initialize the req */
  uv__req_init(stream->loop, req, UV_WRITE);
  req->cb = cb;
  req->handle = stream;
  req->error = 0;
  req->send_handle = send_handle;
  QUEUE_INIT(&req->queue);

  req->bufs = req->bufsml;
  if (nbufs > ARRAY_SIZE(req->bufsml))
    req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));

  if (req->bufs == NULL)
    return UV_ENOMEM;

  memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
  req->nbufs = nbufs;
  req->write_index = 0;
  stream->write_queue_size += uv__count_bufs(bufs, nbufs);

  /* Append the request to write_queue. */
  QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);

  /* If the queue was empty when this function began, we should attempt to
   * do the write immediately. Otherwise start the write_watcher and wait
   * for the fd to become writable.
   */
  if (stream->connect_req) {
    /* Still connecting, do nothing. */
  }
  else if (empty_queue) {
    uv__write(stream);
  }
  else {
    /* Blocking streams should never have anything in the queue.
     * If this assert fires, then somehow the blocking stream isn't being
     * sufficiently flushed in uv__write().
     */
    assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
    uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  return 0;
}


/* The buffers to be written must remain valid until the callback is called.
 * This is not required for the uv_buf_t array.
 */
int uv_write(uv_write_t* req,
             uv_stream_t* handle,
             const uv_buf_t bufs[],
             unsigned int nbufs,
             uv_write_cb cb) {
  return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}
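
/* A sketch of the contract above, as hypothetical user code: `buf` itself
 * may live on the stack because uv_write2() copies it into the request, but
 * the bytes it points at must stay valid until write_cb runs:
 *
 *   uv_buf_t buf = uv_buf_init(data, len);
 *   uv_write(&req, (uv_stream_t*) &tcp, &buf, 1, write_cb);
 */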


void uv_try_write_cb(uv_write_t* req, int status) {
  /* Should not be called */
  abort();
}


int uv_try_write(uv_stream_t* stream,
                 const uv_buf_t bufs[],
                 unsigned int nbufs) {
  int r;
  int has_pollout;
  size_t written;
  size_t req_size;
  uv_write_t req;

  /* Connecting or already writing some data */
  if (stream->connect_req != NULL || stream->write_queue_size != 0)
    return UV_EAGAIN;

  has_pollout = uv__io_active(&stream->io_watcher, POLLOUT);

  r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
  if (r != 0)
    return r;

  /* Remove the unwritten bytes from the write queue size */
  written = uv__count_bufs(bufs, nbufs);
  if (req.bufs != NULL)
    req_size = uv__write_req_size(&req);
  else
    req_size = 0;
  written -= req_size;
  stream->write_queue_size -= req_size;

  /* Unqueue the request, regardless of whether it completed immediately */
  QUEUE_REMOVE(&req.queue);
  uv__req_unregister(stream->loop, &req);
  if (req.bufs != req.bufsml)
    uv__free(req.bufs);
  req.bufs = NULL;

  /* Do not poll for writable if we weren't polling before this call */
  if (!has_pollout) {
    uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
    uv__stream_osx_interrupt_select(stream);
  }

  if (written == 0 && req_size != 0)
    return req.error < 0 ? req.error : UV_EAGAIN;
  else
    return written;
}
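
/* Hypothetical caller sketch: try the non-queueing fast path first and fall
 * back to a regular queued write when the kernel buffer is full:
 *
 *   int n = uv_try_write(stream, &buf, 1);
 *   if (n == UV_EAGAIN)
 *     uv_write(req, stream, &buf, 1, write_cb);
 */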


int uv__read_start(uv_stream_t* stream,
                   uv_alloc_cb alloc_cb,
                   uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);

  /* The UV_HANDLE_READING flag is independent of the actual state of the
   * stream; it just expresses the user's desired state.
   */
  stream->flags |= UV_HANDLE_READING;

  /* TODO: try to do the read inline? */
  /* TODO: keep track of tcp state. If we've gotten an EOF then we should
   * not start the IO watcher.
   */
  assert(uv__stream_fd(stream) >= 0);
  assert(alloc_cb);

  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_start(stream);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}
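
/* A minimal alloc_cb sketch, as hypothetical user code; uv__read() above
 * suggests 64 kB and maps a NULL base or zero length to UV_ENOBUFS:
 *
 *   static void alloc_cb(uv_handle_t* h, size_t suggested, uv_buf_t* buf) {
 *     buf->base = malloc(suggested);
 *     buf->len = buf->base == NULL ? 0 : suggested;
 *   }
 */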


int uv_read_stop(uv_stream_t* stream) {
  if (!(stream->flags & UV_HANDLE_READING))
    return 0;

  stream->flags &= ~UV_HANDLE_READING;
  uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
  if (!uv__io_active(&stream->io_watcher, POLLOUT))
    uv__handle_stop(stream);
  uv__stream_osx_interrupt_select(stream);

  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  return 0;
}


int uv_is_readable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_READABLE);
}


int uv_is_writable(const uv_stream_t* stream) {
  return !!(stream->flags & UV_HANDLE_WRITABLE);
}


#if defined(__APPLE__)
int uv___stream_fd(const uv_stream_t* handle) {
  const uv__stream_select_t* s;

  assert(handle->type == UV_TCP ||
         handle->type == UV_TTY ||
         handle->type == UV_NAMED_PIPE);

  s = handle->select;
  if (s != NULL)
    return s->fd;

  return handle->io_watcher.fd;
}
#endif /* defined(__APPLE__) */


void uv__stream_close(uv_stream_t* handle) {
  unsigned int i;
  uv__stream_queued_fds_t* queued_fds;

#if defined(__APPLE__)
  /* Terminate select loop first */
  if (handle->select != NULL) {
    uv__stream_select_t* s;

    s = handle->select;

    uv_sem_post(&s->close_sem);
    uv_sem_post(&s->async_sem);
    uv__stream_osx_interrupt_select(handle);
    uv_thread_join(&s->thread);
    uv_sem_destroy(&s->close_sem);
    uv_sem_destroy(&s->async_sem);
    uv__close(s->fake_fd);
    uv__close(s->int_fd);
    uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);

    handle->select = NULL;
  }
#endif /* defined(__APPLE__) */

  uv__io_close(handle->loop, &handle->io_watcher);
  uv_read_stop(handle);
  uv__handle_stop(handle);
  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);

  if (handle->io_watcher.fd != -1) {
    /* Don't close stdio file descriptors. Nothing good comes from it. */
    if (handle->io_watcher.fd > STDERR_FILENO)
      uv__close(handle->io_watcher.fd);
    handle->io_watcher.fd = -1;
  }

  if (handle->accepted_fd != -1) {
    uv__close(handle->accepted_fd);
    handle->accepted_fd = -1;
  }

  /* Close all queued fds */
  if (handle->queued_fds != NULL) {
    queued_fds = handle->queued_fds;
    for (i = 0; i < queued_fds->offset; i++)
      uv__close(queued_fds->fds[i]);
    uv__free(handle->queued_fds);
    handle->queued_fds = NULL;
  }

  assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
}


int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
  /* Don't need to check the file descriptor, uv__nonblock()
   * will fail with EBADF if it's not valid.
   */
  return uv__nonblock(uv__stream_fd(handle), !blocking);
}