1 | /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. |
2 | * |
3 | * Permission is hereby granted, free of charge, to any person obtaining a copy |
4 | * of this software and associated documentation files (the "Software"), to |
5 | * deal in the Software without restriction, including without limitation the |
6 | * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or |
7 | * sell copies of the Software, and to permit persons to whom the Software is |
8 | * furnished to do so, subject to the following conditions: |
9 | * |
10 | * The above copyright notice and this permission notice shall be included in |
11 | * all copies or substantial portions of the Software. |
12 | * |
13 | * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
14 | * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
15 | * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
16 | * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
17 | * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING |
18 | * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
19 | * IN THE SOFTWARE. |
20 | */ |
21 | |
22 | #include "uv.h" |
23 | #include "internal.h" |
24 | |
25 | #include <stdio.h> |
26 | #include <stdlib.h> |
27 | #include <string.h> |
28 | #include <assert.h> |
29 | #include <errno.h> |
30 | |
31 | #include <sys/types.h> |
32 | #include <sys/socket.h> |
33 | #include <sys/uio.h> |
34 | #include <sys/un.h> |
35 | #include <unistd.h> |
36 | #include <limits.h> /* IOV_MAX */ |
37 | |
38 | #if defined(__APPLE__) |
39 | # include <sys/event.h> |
40 | # include <sys/time.h> |
41 | # include <sys/select.h> |
42 | |
43 | /* Forward declaration */ |
44 | typedef struct uv__stream_select_s uv__stream_select_t; |
45 | |
46 | struct uv__stream_select_s { |
47 | uv_stream_t* stream; |
48 | uv_thread_t thread; |
49 | uv_sem_t close_sem; |
50 | uv_sem_t async_sem; |
51 | uv_async_t async; |
52 | int events; |
53 | int fake_fd; |
54 | int int_fd; |
55 | int fd; |
56 | fd_set* sread; |
57 | size_t sread_sz; |
58 | fd_set* swrite; |
59 | size_t swrite_sz; |
60 | }; |
61 | |
62 | /* Due to a possible kernel bug at least in OS X 10.10 "Yosemite", |
63 | * EPROTOTYPE can be returned while trying to write to a socket that is |
64 | * shutting down. If we retry the write, we should get the expected EPIPE |
65 | * instead. |
66 | */ |
67 | # define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR || errno == EPROTOTYPE) |
68 | # define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \ |
69 | (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS || \ |
70 | (errno == EMSGSIZE && send_handle != NULL)) |
71 | #else |
72 | # define RETRY_ON_WRITE_ERROR(errno) (errno == EINTR) |
73 | # define IS_TRANSIENT_WRITE_ERROR(errno, send_handle) \ |
74 | (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) |
75 | #endif /* defined(__APPLE__) */ |
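
/* A minimal sketch of how these macros are meant to be used (it mirrors the
 * write loops in uv__write() below): retry the syscall on retryable errnos,
 * and treat transient errnos as "stop and wait for POLLOUT" rather than as
 * hard errors:
 *
 *   do
 *     n = write(fd, buf, len);
 *   while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
 *
 *   if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, send_handle))
 *     goto error;
 */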
76 | |
77 | static void uv__stream_connect(uv_stream_t*); |
78 | static void uv__write(uv_stream_t* stream); |
79 | static void uv__read(uv_stream_t* stream); |
80 | static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events); |
81 | static void uv__write_callbacks(uv_stream_t* stream); |
82 | static size_t uv__write_req_size(uv_write_t* req); |
83 | |
84 | |
85 | void uv__stream_init(uv_loop_t* loop, |
86 | uv_stream_t* stream, |
87 | uv_handle_type type) { |
88 | int err; |
89 | |
90 | uv__handle_init(loop, (uv_handle_t*)stream, type); |
91 | stream->read_cb = NULL; |
92 | stream->alloc_cb = NULL; |
93 | stream->close_cb = NULL; |
94 | stream->connection_cb = NULL; |
95 | stream->connect_req = NULL; |
96 | stream->shutdown_req = NULL; |
97 | stream->accepted_fd = -1; |
98 | stream->queued_fds = NULL; |
99 | stream->delayed_error = 0; |
100 | QUEUE_INIT(&stream->write_queue); |
101 | QUEUE_INIT(&stream->write_completed_queue); |
102 | stream->write_queue_size = 0; |
103 | |
104 | if (loop->emfile_fd == -1) { |
105 | err = uv__open_cloexec("/dev/null" , O_RDONLY); |
106 | if (err < 0) |
107 | /* In the rare case that "/dev/null" isn't mounted open "/" |
108 | * instead. |
109 | */ |
110 | err = uv__open_cloexec("/" , O_RDONLY); |
111 | if (err >= 0) |
112 | loop->emfile_fd = err; |
113 | } |
114 | |
115 | #if defined(__APPLE__) |
116 | stream->select = NULL; |
#endif /* defined(__APPLE__) */
118 | |
119 | uv__io_init(&stream->io_watcher, uv__stream_io, -1); |
120 | } |
121 | |
122 | |
123 | static void uv__stream_osx_interrupt_select(uv_stream_t* stream) { |
124 | #if defined(__APPLE__) |
125 | /* Notify select() thread about state change */ |
126 | uv__stream_select_t* s; |
127 | int r; |
128 | |
129 | s = stream->select; |
130 | if (s == NULL) |
131 | return; |
132 | |
  /* Interrupt the select() loop.
   * NOTE: fake_fd and int_fd are the two ends of a socketpair(); writing
   * to one end raises a read event on the other.
   */
137 | do |
    r = write(s->fake_fd, "x", 1);
139 | while (r == -1 && errno == EINTR); |
140 | |
141 | assert(r == 1); |
142 | #else /* !defined(__APPLE__) */ |
143 | /* No-op on any other platform */ |
144 | #endif /* !defined(__APPLE__) */ |
145 | } |
146 | |
147 | |
148 | #if defined(__APPLE__) |
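/* Overview of the select() fallback: a helper thread runs select(2) on the
 * real fd plus one end of a socketpair(). When the real fd becomes readable
 * or writable, the thread publishes the events, wakes the loop with
 * uv_async_send() and blocks on async_sem until uv__stream_osx_select_cb()
 * has dispatched them. Writing to the socketpair (see
 * uv__stream_osx_interrupt_select() above) interrupts a pending select();
 * posting close_sem terminates the thread.
 */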
149 | static void uv__stream_osx_select(void* arg) { |
150 | uv_stream_t* stream; |
151 | uv__stream_select_t* s; |
152 | char buf[1024]; |
153 | int events; |
154 | int fd; |
155 | int r; |
156 | int max_fd; |
157 | |
158 | stream = arg; |
159 | s = stream->select; |
160 | fd = s->fd; |
161 | |
162 | if (fd > s->int_fd) |
163 | max_fd = fd; |
164 | else |
165 | max_fd = s->int_fd; |
166 | |
167 | while (1) { |
168 | /* Terminate on semaphore */ |
169 | if (uv_sem_trywait(&s->close_sem) == 0) |
170 | break; |
171 | |
172 | /* Watch fd using select(2) */ |
173 | memset(s->sread, 0, s->sread_sz); |
174 | memset(s->swrite, 0, s->swrite_sz); |
175 | |
176 | if (uv__io_active(&stream->io_watcher, POLLIN)) |
177 | FD_SET(fd, s->sread); |
178 | if (uv__io_active(&stream->io_watcher, POLLOUT)) |
179 | FD_SET(fd, s->swrite); |
180 | FD_SET(s->int_fd, s->sread); |
181 | |
182 | /* Wait indefinitely for fd events */ |
183 | r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL); |
184 | if (r == -1) { |
185 | if (errno == EINTR) |
186 | continue; |
187 | |
188 | /* XXX: Possible?! */ |
189 | abort(); |
190 | } |
191 | |
192 | /* Ignore timeouts */ |
193 | if (r == 0) |
194 | continue; |
195 | |
196 | /* Empty socketpair's buffer in case of interruption */ |
197 | if (FD_ISSET(s->int_fd, s->sread)) |
198 | while (1) { |
199 | r = read(s->int_fd, buf, sizeof(buf)); |
200 | |
201 | if (r == sizeof(buf)) |
202 | continue; |
203 | |
204 | if (r != -1) |
205 | break; |
206 | |
207 | if (errno == EAGAIN || errno == EWOULDBLOCK) |
208 | break; |
209 | |
210 | if (errno == EINTR) |
211 | continue; |
212 | |
213 | abort(); |
214 | } |
215 | |
216 | /* Handle events */ |
217 | events = 0; |
218 | if (FD_ISSET(fd, s->sread)) |
219 | events |= POLLIN; |
220 | if (FD_ISSET(fd, s->swrite)) |
221 | events |= POLLOUT; |
222 | |
223 | assert(events != 0 || FD_ISSET(s->int_fd, s->sread)); |
224 | if (events != 0) { |
225 | ACCESS_ONCE(int, s->events) = events; |
226 | |
227 | uv_async_send(&s->async); |
228 | uv_sem_wait(&s->async_sem); |
229 | |
      /* The events should have been processed by the loop thread by now. */
231 | assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING)); |
232 | } |
233 | } |
234 | } |
235 | |
236 | |
237 | static void uv__stream_osx_select_cb(uv_async_t* handle) { |
238 | uv__stream_select_t* s; |
239 | uv_stream_t* stream; |
240 | int events; |
241 | |
242 | s = container_of(handle, uv__stream_select_t, async); |
243 | stream = s->stream; |
244 | |
245 | /* Get and reset stream's events */ |
246 | events = s->events; |
247 | ACCESS_ONCE(int, s->events) = 0; |
248 | |
249 | assert(events != 0); |
250 | assert(events == (events & (POLLIN | POLLOUT))); |
251 | |
252 | /* Invoke callback on event-loop */ |
253 | if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN)) |
254 | uv__stream_io(stream->loop, &stream->io_watcher, POLLIN); |
255 | |
256 | if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT)) |
257 | uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT); |
258 | |
259 | if (stream->flags & UV_HANDLE_CLOSING) |
260 | return; |
261 | |
  /* NOTE: It is important to post the semaphore here; otherwise `select()`
   * might be called again before the actual `uv__read()`, leading to a
   * blocking syscall.
   */
265 | uv_sem_post(&s->async_sem); |
266 | } |
267 | |
268 | |
269 | static void uv__stream_osx_cb_close(uv_handle_t* async) { |
270 | uv__stream_select_t* s; |
271 | |
272 | s = container_of(async, uv__stream_select_t, async); |
273 | uv__free(s); |
274 | } |
275 | |
276 | |
277 | int uv__stream_try_select(uv_stream_t* stream, int* fd) { |
278 | /* |
279 | * kqueue doesn't work with some files from /dev mount on osx. |
280 | * select(2) in separate thread for those fds |
281 | */ |
282 | |
283 | struct kevent filter[1]; |
284 | struct kevent events[1]; |
285 | struct timespec timeout; |
286 | uv__stream_select_t* s; |
287 | int fds[2]; |
288 | int err; |
289 | int ret; |
290 | int kq; |
291 | int old_fd; |
292 | int max_fd; |
293 | size_t sread_sz; |
294 | size_t swrite_sz; |
295 | |
296 | kq = kqueue(); |
297 | if (kq == -1) { |
298 | perror("(libuv) kqueue()" ); |
299 | return UV__ERR(errno); |
300 | } |
301 | |
302 | EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0); |
303 | |
  /* Use a small timeout, because we only want to capture EINVALs */
305 | timeout.tv_sec = 0; |
306 | timeout.tv_nsec = 1; |
307 | |
308 | do |
309 | ret = kevent(kq, filter, 1, events, 1, &timeout); |
310 | while (ret == -1 && errno == EINTR); |
311 | |
312 | uv__close(kq); |
313 | |
314 | if (ret == -1) |
315 | return UV__ERR(errno); |
316 | |
317 | if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL) |
318 | return 0; |
319 | |
320 | /* At this point we definitely know that this fd won't work with kqueue */ |
321 | |
322 | /* |
323 | * Create fds for io watcher and to interrupt the select() loop. |
324 | * NOTE: do it ahead of malloc below to allocate enough space for fd_sets |
325 | */ |
326 | if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds)) |
327 | return UV__ERR(errno); |
328 | |
329 | max_fd = *fd; |
330 | if (fds[1] > max_fd) |
331 | max_fd = fds[1]; |
332 | |
333 | sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY; |
334 | swrite_sz = sread_sz; |
335 | |
336 | s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz); |
337 | if (s == NULL) { |
338 | err = UV_ENOMEM; |
339 | goto failed_malloc; |
340 | } |
341 | |
342 | s->events = 0; |
343 | s->fd = *fd; |
344 | s->sread = (fd_set*) ((char*) s + sizeof(*s)); |
345 | s->sread_sz = sread_sz; |
346 | s->swrite = (fd_set*) ((char*) s->sread + sread_sz); |
347 | s->swrite_sz = swrite_sz; |
348 | |
349 | err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb); |
350 | if (err) |
351 | goto failed_async_init; |
352 | |
353 | s->async.flags |= UV_HANDLE_INTERNAL; |
354 | uv__handle_unref(&s->async); |
355 | |
356 | err = uv_sem_init(&s->close_sem, 0); |
357 | if (err != 0) |
358 | goto failed_close_sem_init; |
359 | |
360 | err = uv_sem_init(&s->async_sem, 0); |
361 | if (err != 0) |
362 | goto failed_async_sem_init; |
363 | |
364 | s->fake_fd = fds[0]; |
365 | s->int_fd = fds[1]; |
366 | |
367 | old_fd = *fd; |
368 | s->stream = stream; |
369 | stream->select = s; |
370 | *fd = s->fake_fd; |
371 | |
372 | err = uv_thread_create(&s->thread, uv__stream_osx_select, stream); |
373 | if (err != 0) |
374 | goto failed_thread_create; |
375 | |
376 | return 0; |
377 | |
378 | failed_thread_create: |
379 | s->stream = NULL; |
380 | stream->select = NULL; |
381 | *fd = old_fd; |
382 | |
383 | uv_sem_destroy(&s->async_sem); |
384 | |
385 | failed_async_sem_init: |
386 | uv_sem_destroy(&s->close_sem); |
387 | |
388 | failed_close_sem_init: |
389 | uv__close(fds[0]); |
390 | uv__close(fds[1]); |
391 | uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close); |
392 | return err; |
393 | |
394 | failed_async_init: |
395 | uv__free(s); |
396 | |
397 | failed_malloc: |
398 | uv__close(fds[0]); |
399 | uv__close(fds[1]); |
400 | |
401 | return err; |
402 | } |
403 | #endif /* defined(__APPLE__) */ |
404 | |
405 | |
406 | int uv__stream_open(uv_stream_t* stream, int fd, int flags) { |
407 | #if defined(__APPLE__) |
408 | int enable; |
409 | #endif |
410 | |
411 | if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd)) |
412 | return UV_EBUSY; |
413 | |
414 | assert(fd >= 0); |
415 | stream->flags |= flags; |
416 | |
417 | if (stream->type == UV_TCP) { |
418 | if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1)) |
419 | return UV__ERR(errno); |
420 | |
    /* TODO Use the delay the user passed in. */
422 | if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) && |
423 | uv__tcp_keepalive(fd, 1, 60)) { |
424 | return UV__ERR(errno); |
425 | } |
426 | } |
427 | |
428 | #if defined(__APPLE__) |
429 | enable = 1; |
430 | if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) && |
431 | errno != ENOTSOCK && |
432 | errno != EINVAL) { |
433 | return UV__ERR(errno); |
434 | } |
435 | #endif |
436 | |
437 | stream->io_watcher.fd = fd; |
438 | |
439 | return 0; |
440 | } |
441 | |
442 | |
443 | void uv__stream_flush_write_queue(uv_stream_t* stream, int error) { |
444 | uv_write_t* req; |
445 | QUEUE* q; |
446 | while (!QUEUE_EMPTY(&stream->write_queue)) { |
447 | q = QUEUE_HEAD(&stream->write_queue); |
448 | QUEUE_REMOVE(q); |
449 | |
450 | req = QUEUE_DATA(q, uv_write_t, queue); |
451 | req->error = error; |
452 | |
453 | QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue); |
454 | } |
455 | } |
456 | |
457 | |
458 | void uv__stream_destroy(uv_stream_t* stream) { |
459 | assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT)); |
460 | assert(stream->flags & UV_HANDLE_CLOSED); |
461 | |
462 | if (stream->connect_req) { |
463 | uv__req_unregister(stream->loop, stream->connect_req); |
464 | stream->connect_req->cb(stream->connect_req, UV_ECANCELED); |
465 | stream->connect_req = NULL; |
466 | } |
467 | |
468 | uv__stream_flush_write_queue(stream, UV_ECANCELED); |
469 | uv__write_callbacks(stream); |
470 | |
471 | if (stream->shutdown_req) { |
472 | /* The ECANCELED error code is a lie, the shutdown(2) syscall is a |
473 | * fait accompli at this point. Maybe we should revisit this in v0.11. |
474 | * A possible reason for leaving it unchanged is that it informs the |
475 | * callee that the handle has been destroyed. |
476 | */ |
477 | uv__req_unregister(stream->loop, stream->shutdown_req); |
478 | stream->shutdown_req->cb(stream->shutdown_req, UV_ECANCELED); |
479 | stream->shutdown_req = NULL; |
480 | } |
481 | |
482 | assert(stream->write_queue_size == 0); |
483 | } |
484 | |
485 | |
486 | /* Implements a best effort approach to mitigating accept() EMFILE errors. |
487 | * We have a spare file descriptor stashed away that we close to get below |
488 | * the EMFILE limit. Next, we accept all pending connections and close them |
489 | * immediately to signal the clients that we're overloaded - and we are, but |
490 | * we still keep on trucking. |
491 | * |
492 | * There is one caveat: it's not reliable in a multi-threaded environment. |
493 | * The file descriptor limit is per process. Our party trick fails if another |
494 | * thread opens a file or creates a socket in the time window between us |
495 | * calling close() and accept(). |
496 | */ |
497 | static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) { |
498 | int err; |
499 | int emfile_fd; |
500 | |
501 | if (loop->emfile_fd == -1) |
502 | return UV_EMFILE; |
503 | |
504 | uv__close(loop->emfile_fd); |
505 | loop->emfile_fd = -1; |
506 | |
507 | do { |
508 | err = uv__accept(accept_fd); |
509 | if (err >= 0) |
510 | uv__close(err); |
511 | } while (err >= 0 || err == UV_EINTR); |
512 | |
  emfile_fd = uv__open_cloexec("/", O_RDONLY);
514 | if (emfile_fd >= 0) |
515 | loop->emfile_fd = emfile_fd; |
516 | |
517 | return err; |
518 | } |
519 | |
520 | |
521 | #if defined(UV_HAVE_KQUEUE) |
522 | # define UV_DEC_BACKLOG(w) w->rcount--; |
523 | #else |
524 | # define UV_DEC_BACKLOG(w) /* no-op */ |
525 | #endif /* defined(UV_HAVE_KQUEUE) */ |
526 | |
527 | |
528 | void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { |
529 | uv_stream_t* stream; |
530 | int err; |
531 | |
532 | stream = container_of(w, uv_stream_t, io_watcher); |
533 | assert(events & POLLIN); |
534 | assert(stream->accepted_fd == -1); |
535 | assert(!(stream->flags & UV_HANDLE_CLOSING)); |
536 | |
537 | uv__io_start(stream->loop, &stream->io_watcher, POLLIN); |
538 | |
  /* connection_cb can close the server socket while we're
   * in the loop, so check it on each iteration.
   */
542 | while (uv__stream_fd(stream) != -1) { |
543 | assert(stream->accepted_fd == -1); |
544 | |
545 | #if defined(UV_HAVE_KQUEUE) |
546 | if (w->rcount <= 0) |
547 | return; |
548 | #endif /* defined(UV_HAVE_KQUEUE) */ |
549 | |
550 | err = uv__accept(uv__stream_fd(stream)); |
551 | if (err < 0) { |
552 | if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK)) |
553 | return; /* Not an error. */ |
554 | |
555 | if (err == UV_ECONNABORTED) |
556 | continue; /* Ignore. Nothing we can do about that. */ |
557 | |
558 | if (err == UV_EMFILE || err == UV_ENFILE) { |
559 | err = uv__emfile_trick(loop, uv__stream_fd(stream)); |
560 | if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK)) |
561 | break; |
562 | } |
563 | |
564 | stream->connection_cb(stream, err); |
565 | continue; |
566 | } |
567 | |
568 | UV_DEC_BACKLOG(w) |
569 | stream->accepted_fd = err; |
570 | stream->connection_cb(stream, 0); |
571 | |
572 | if (stream->accepted_fd != -1) { |
      /* The user hasn't yet called uv_accept(). */
574 | uv__io_stop(loop, &stream->io_watcher, POLLIN); |
575 | return; |
576 | } |
577 | |
578 | if (stream->type == UV_TCP && |
579 | (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) { |
580 | /* Give other processes a chance to accept connections. */ |
581 | struct timespec timeout = { 0, 1 }; |
582 | nanosleep(&timeout, NULL); |
583 | } |
584 | } |
585 | } |
586 | |
587 | |
588 | #undef UV_DEC_BACKLOG |
589 | |
590 | |
591 | int uv_accept(uv_stream_t* server, uv_stream_t* client) { |
592 | int err; |
593 | |
594 | assert(server->loop == client->loop); |
595 | |
596 | if (server->accepted_fd == -1) |
597 | return UV_EAGAIN; |
598 | |
599 | switch (client->type) { |
600 | case UV_NAMED_PIPE: |
601 | case UV_TCP: |
602 | err = uv__stream_open(client, |
603 | server->accepted_fd, |
604 | UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); |
605 | if (err) { |
606 | /* TODO handle error */ |
607 | uv__close(server->accepted_fd); |
608 | goto done; |
609 | } |
610 | break; |
611 | |
612 | case UV_UDP: |
613 | err = uv_udp_open((uv_udp_t*) client, server->accepted_fd); |
614 | if (err) { |
615 | uv__close(server->accepted_fd); |
616 | goto done; |
617 | } |
618 | break; |
619 | |
620 | default: |
621 | return UV_EINVAL; |
622 | } |
623 | |
624 | client->flags |= UV_HANDLE_BOUND; |
625 | |
626 | done: |
627 | /* Process queued fds */ |
628 | if (server->queued_fds != NULL) { |
629 | uv__stream_queued_fds_t* queued_fds; |
630 | |
631 | queued_fds = server->queued_fds; |
632 | |
633 | /* Read first */ |
634 | server->accepted_fd = queued_fds->fds[0]; |
635 | |
636 | /* All read, free */ |
637 | assert(queued_fds->offset > 0); |
638 | if (--queued_fds->offset == 0) { |
639 | uv__free(queued_fds); |
640 | server->queued_fds = NULL; |
641 | } else { |
642 | /* Shift rest */ |
643 | memmove(queued_fds->fds, |
644 | queued_fds->fds + 1, |
645 | queued_fds->offset * sizeof(*queued_fds->fds)); |
646 | } |
647 | } else { |
648 | server->accepted_fd = -1; |
649 | if (err == 0) |
650 | uv__io_start(server->loop, &server->io_watcher, POLLIN); |
651 | } |
652 | return err; |
653 | } |
654 | |
655 | |
656 | int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) { |
657 | int err; |
658 | |
659 | switch (stream->type) { |
660 | case UV_TCP: |
661 | err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb); |
662 | break; |
663 | |
664 | case UV_NAMED_PIPE: |
665 | err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb); |
666 | break; |
667 | |
668 | default: |
669 | err = UV_EINVAL; |
670 | } |
671 | |
672 | if (err == 0) |
673 | uv__handle_start(stream); |
674 | |
675 | return err; |
676 | } |
677 | |
678 | |
679 | static void uv__drain(uv_stream_t* stream) { |
680 | uv_shutdown_t* req; |
681 | int err; |
682 | |
683 | assert(QUEUE_EMPTY(&stream->write_queue)); |
684 | uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); |
685 | uv__stream_osx_interrupt_select(stream); |
686 | |
687 | /* Shutdown? */ |
688 | if ((stream->flags & UV_HANDLE_SHUTTING) && |
689 | !(stream->flags & UV_HANDLE_CLOSING) && |
690 | !(stream->flags & UV_HANDLE_SHUT)) { |
691 | assert(stream->shutdown_req); |
692 | |
693 | req = stream->shutdown_req; |
694 | stream->shutdown_req = NULL; |
695 | stream->flags &= ~UV_HANDLE_SHUTTING; |
696 | uv__req_unregister(stream->loop, req); |
697 | |
698 | err = 0; |
699 | if (shutdown(uv__stream_fd(stream), SHUT_WR)) |
700 | err = UV__ERR(errno); |
701 | |
702 | if (err == 0) |
703 | stream->flags |= UV_HANDLE_SHUT; |
704 | |
705 | if (req->cb != NULL) |
706 | req->cb(req, err); |
707 | } |
708 | } |
709 | |
710 | |
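/* Collapse the single-buffer case to plain write(2); a one-element
 * writev(2) has no benefit over write(2).
 */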
711 | static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) { |
712 | if (n == 1) |
713 | return write(fd, vec->iov_base, vec->iov_len); |
714 | else |
715 | return writev(fd, vec, n); |
716 | } |
717 | |
718 | |
719 | static size_t uv__write_req_size(uv_write_t* req) { |
720 | size_t size; |
721 | |
722 | assert(req->bufs != NULL); |
723 | size = uv__count_bufs(req->bufs + req->write_index, |
724 | req->nbufs - req->write_index); |
725 | assert(req->handle->write_queue_size >= size); |
726 | |
727 | return size; |
728 | } |
729 | |
730 | |
731 | /* Returns 1 if all write request data has been written, or 0 if there is still |
732 | * more data to write. |
733 | * |
734 | * Note: the return value only says something about the *current* request. |
735 | * There may still be other write requests sitting in the queue. |
736 | */ |
737 | static int uv__write_req_update(uv_stream_t* stream, |
738 | uv_write_t* req, |
739 | size_t n) { |
740 | uv_buf_t* buf; |
741 | size_t len; |
742 | |
743 | assert(n <= stream->write_queue_size); |
744 | stream->write_queue_size -= n; |
745 | |
746 | buf = req->bufs + req->write_index; |
747 | |
748 | do { |
749 | len = n < buf->len ? n : buf->len; |
750 | buf->base += len; |
751 | buf->len -= len; |
752 | buf += (buf->len == 0); /* Advance to next buffer if this one is empty. */ |
753 | n -= len; |
754 | } while (n > 0); |
755 | |
756 | req->write_index = buf - req->bufs; |
757 | |
758 | return req->write_index == req->nbufs; |
759 | } |
760 | |
761 | |
762 | static void uv__write_req_finish(uv_write_t* req) { |
763 | uv_stream_t* stream = req->handle; |
764 | |
  /* Pop the req off the stream's write_queue. */
766 | QUEUE_REMOVE(&req->queue); |
767 | |
768 | /* Only free when there was no error. On error, we touch up write_queue_size |
769 | * right before making the callback. The reason we don't do that right away |
770 | * is that a write_queue_size > 0 is our only way to signal to the user that |
771 | * they should stop writing - which they should if we got an error. Something |
772 | * to revisit in future revisions of the libuv API. |
773 | */ |
774 | if (req->error == 0) { |
775 | if (req->bufs != req->bufsml) |
776 | uv__free(req->bufs); |
777 | req->bufs = NULL; |
778 | } |
779 | |
780 | /* Add it to the write_completed_queue where it will have its |
781 | * callback called in the near future. |
782 | */ |
783 | QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue); |
784 | uv__io_feed(stream->loop, &stream->io_watcher); |
785 | } |
786 | |
787 | |
788 | static int uv__handle_fd(uv_handle_t* handle) { |
789 | switch (handle->type) { |
790 | case UV_NAMED_PIPE: |
791 | case UV_TCP: |
792 | return ((uv_stream_t*) handle)->io_watcher.fd; |
793 | |
794 | case UV_UDP: |
795 | return ((uv_udp_t*) handle)->io_watcher.fd; |
796 | |
797 | default: |
798 | return -1; |
799 | } |
800 | } |
801 | |
802 | static void uv__write(uv_stream_t* stream) { |
803 | struct iovec* iov; |
804 | QUEUE* q; |
805 | uv_write_t* req; |
806 | int iovmax; |
807 | int iovcnt; |
808 | ssize_t n; |
809 | int err; |
810 | |
811 | start: |
812 | |
813 | assert(uv__stream_fd(stream) >= 0); |
814 | |
815 | if (QUEUE_EMPTY(&stream->write_queue)) |
816 | return; |
817 | |
818 | q = QUEUE_HEAD(&stream->write_queue); |
819 | req = QUEUE_DATA(q, uv_write_t, queue); |
820 | assert(req->handle == stream); |
821 | |
822 | /* |
823 | * Cast to iovec. We had to have our own uv_buf_t instead of iovec |
824 | * because Windows's WSABUF is not an iovec. |
825 | */ |
826 | assert(sizeof(uv_buf_t) == sizeof(struct iovec)); |
827 | iov = (struct iovec*) &(req->bufs[req->write_index]); |
828 | iovcnt = req->nbufs - req->write_index; |
829 | |
830 | iovmax = uv__getiovmax(); |
831 | |
832 | /* Limit iov count to avoid EINVALs from writev() */ |
833 | if (iovcnt > iovmax) |
834 | iovcnt = iovmax; |
835 | |
836 | /* |
837 | * Now do the actual writev. Note that we've been updating the pointers |
838 | * inside the iov each time we write. So there is no need to offset it. |
839 | */ |
840 | |
841 | if (req->send_handle) { |
842 | int fd_to_send; |
843 | struct msghdr msg; |
844 | struct cmsghdr *cmsg; |
845 | union { |
846 | char data[64]; |
847 | struct cmsghdr alias; |
848 | } scratch; |
849 | |
850 | if (uv__is_closing(req->send_handle)) { |
851 | err = UV_EBADF; |
852 | goto error; |
853 | } |
854 | |
855 | fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle); |
856 | |
857 | memset(&scratch, 0, sizeof(scratch)); |
858 | |
859 | assert(fd_to_send >= 0); |
860 | |
861 | msg.msg_name = NULL; |
862 | msg.msg_namelen = 0; |
863 | msg.msg_iov = iov; |
864 | msg.msg_iovlen = iovcnt; |
865 | msg.msg_flags = 0; |
866 | |
867 | msg.msg_control = &scratch.alias; |
868 | msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send)); |
869 | |
870 | cmsg = CMSG_FIRSTHDR(&msg); |
871 | cmsg->cmsg_level = SOL_SOCKET; |
872 | cmsg->cmsg_type = SCM_RIGHTS; |
873 | cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send)); |
874 | |
875 | /* silence aliasing warning */ |
876 | { |
877 | void* pv = CMSG_DATA(cmsg); |
878 | int* pi = pv; |
879 | *pi = fd_to_send; |
880 | } |
881 | |
882 | do |
883 | n = sendmsg(uv__stream_fd(stream), &msg, 0); |
884 | while (n == -1 && RETRY_ON_WRITE_ERROR(errno)); |
885 | |
886 | /* Ensure the handle isn't sent again in case this is a partial write. */ |
887 | if (n >= 0) |
888 | req->send_handle = NULL; |
889 | } else { |
890 | do |
891 | n = uv__writev(uv__stream_fd(stream), iov, iovcnt); |
892 | while (n == -1 && RETRY_ON_WRITE_ERROR(errno)); |
893 | } |
894 | |
895 | if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) { |
896 | err = UV__ERR(errno); |
897 | goto error; |
898 | } |
899 | |
900 | if (n >= 0 && uv__write_req_update(stream, req, n)) { |
901 | uv__write_req_finish(req); |
902 | return; /* TODO(bnoordhuis) Start trying to write the next request. */ |
903 | } |
904 | |
905 | /* If this is a blocking stream, try again. */ |
906 | if (stream->flags & UV_HANDLE_BLOCKING_WRITES) |
907 | goto start; |
908 | |
909 | /* We're not done. */ |
910 | uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); |
911 | |
912 | /* Notify select() thread about state change */ |
913 | uv__stream_osx_interrupt_select(stream); |
914 | |
915 | return; |
916 | |
917 | error: |
918 | req->error = err; |
919 | uv__write_req_finish(req); |
920 | uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); |
921 | if (!uv__io_active(&stream->io_watcher, POLLIN)) |
922 | uv__handle_stop(stream); |
923 | uv__stream_osx_interrupt_select(stream); |
924 | } |
925 | |
926 | |
927 | static void uv__write_callbacks(uv_stream_t* stream) { |
928 | uv_write_t* req; |
929 | QUEUE* q; |
930 | QUEUE pq; |
931 | |
932 | if (QUEUE_EMPTY(&stream->write_completed_queue)) |
933 | return; |
934 | |
935 | QUEUE_MOVE(&stream->write_completed_queue, &pq); |
936 | |
937 | while (!QUEUE_EMPTY(&pq)) { |
938 | /* Pop a req off write_completed_queue. */ |
939 | q = QUEUE_HEAD(&pq); |
940 | req = QUEUE_DATA(q, uv_write_t, queue); |
941 | QUEUE_REMOVE(q); |
942 | uv__req_unregister(stream->loop, req); |
943 | |
944 | if (req->bufs != NULL) { |
945 | stream->write_queue_size -= uv__write_req_size(req); |
946 | if (req->bufs != req->bufsml) |
947 | uv__free(req->bufs); |
948 | req->bufs = NULL; |
949 | } |
950 | |
951 | /* NOTE: call callback AFTER freeing the request data. */ |
952 | if (req->cb) |
953 | req->cb(req, req->error); |
954 | } |
955 | } |
956 | |
957 | |
958 | uv_handle_type uv__handle_type(int fd) { |
959 | struct sockaddr_storage ss; |
960 | socklen_t sslen; |
961 | socklen_t len; |
962 | int type; |
963 | |
964 | memset(&ss, 0, sizeof(ss)); |
965 | sslen = sizeof(ss); |
966 | |
967 | if (getsockname(fd, (struct sockaddr*)&ss, &sslen)) |
968 | return UV_UNKNOWN_HANDLE; |
969 | |
970 | len = sizeof type; |
971 | |
972 | if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len)) |
973 | return UV_UNKNOWN_HANDLE; |
974 | |
975 | if (type == SOCK_STREAM) { |
976 | #if defined(_AIX) || defined(__DragonFly__) |
    /* On AIX and DragonFly BSD, getsockname() returns an empty sockaddr
     * structure for sockets of type AF_UNIX. For all other types it
     * returns a properly filled-in structure.
     */
981 | if (sslen == 0) |
982 | return UV_NAMED_PIPE; |
983 | #endif |
984 | switch (ss.ss_family) { |
985 | case AF_UNIX: |
986 | return UV_NAMED_PIPE; |
987 | case AF_INET: |
988 | case AF_INET6: |
989 | return UV_TCP; |
990 | } |
991 | } |
992 | |
993 | if (type == SOCK_DGRAM && |
994 | (ss.ss_family == AF_INET || ss.ss_family == AF_INET6)) |
995 | return UV_UDP; |
996 | |
997 | return UV_UNKNOWN_HANDLE; |
998 | } |
999 | |
1000 | |
1001 | static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) { |
1002 | stream->flags |= UV_HANDLE_READ_EOF; |
1003 | stream->flags &= ~UV_HANDLE_READING; |
1004 | uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); |
1005 | if (!uv__io_active(&stream->io_watcher, POLLOUT)) |
1006 | uv__handle_stop(stream); |
1007 | uv__stream_osx_interrupt_select(stream); |
1008 | stream->read_cb(stream, UV_EOF, buf); |
1009 | } |
1010 | |
1011 | |
1012 | static int uv__stream_queue_fd(uv_stream_t* stream, int fd) { |
1013 | uv__stream_queued_fds_t* queued_fds; |
1014 | unsigned int queue_size; |
1015 | |
1016 | queued_fds = stream->queued_fds; |
1017 | if (queued_fds == NULL) { |
1018 | queue_size = 8; |
1019 | queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) + |
1020 | sizeof(*queued_fds)); |
1021 | if (queued_fds == NULL) |
1022 | return UV_ENOMEM; |
1023 | queued_fds->size = queue_size; |
1024 | queued_fds->offset = 0; |
1025 | stream->queued_fds = queued_fds; |
1026 | |
1027 | /* Grow */ |
1028 | } else if (queued_fds->size == queued_fds->offset) { |
1029 | queue_size = queued_fds->size + 8; |
1030 | queued_fds = uv__realloc(queued_fds, |
1031 | (queue_size - 1) * sizeof(*queued_fds->fds) + |
1032 | sizeof(*queued_fds)); |
1033 | |
1034 | /* |
1035 | * Allocation failure, report back. |
1036 | * NOTE: if it is fatal - sockets will be closed in uv__stream_close |
1037 | */ |
1038 | if (queued_fds == NULL) |
1039 | return UV_ENOMEM; |
1040 | queued_fds->size = queue_size; |
1041 | stream->queued_fds = queued_fds; |
1042 | } |
1043 | |
1044 | /* Put fd in a queue */ |
1045 | queued_fds->fds[queued_fds->offset++] = fd; |
1046 | |
1047 | return 0; |
1048 | } |
1049 | |
1050 | |
1051 | #if defined(__PASE__) |
/* On IBM i PASE, the control message length cannot exceed 256. */
1053 | # define UV__CMSG_FD_COUNT 60 |
1054 | #else |
1055 | # define UV__CMSG_FD_COUNT 64 |
1056 | #endif |
1057 | #define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int)) |
1058 | |
1059 | |
1060 | static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) { |
1061 | struct cmsghdr* cmsg; |
1062 | |
1063 | for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) { |
1064 | char* start; |
1065 | char* end; |
1066 | int err; |
1067 | void* pv; |
1068 | int* pi; |
1069 | unsigned int i; |
1070 | unsigned int count; |
1071 | |
1072 | if (cmsg->cmsg_type != SCM_RIGHTS) { |
1073 | fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n" , |
1074 | cmsg->cmsg_type); |
1075 | continue; |
1076 | } |
1077 | |
1078 | /* silence aliasing warning */ |
1079 | pv = CMSG_DATA(cmsg); |
1080 | pi = pv; |
1081 | |
1082 | /* Count available fds */ |
1083 | start = (char*) cmsg; |
1084 | end = (char*) cmsg + cmsg->cmsg_len; |
1085 | count = 0; |
1086 | while (start + CMSG_LEN(count * sizeof(*pi)) < end) |
1087 | count++; |
1088 | assert(start + CMSG_LEN(count * sizeof(*pi)) == end); |
1089 | |
1090 | for (i = 0; i < count; i++) { |
1091 | /* Already has accepted fd, queue now */ |
1092 | if (stream->accepted_fd != -1) { |
1093 | err = uv__stream_queue_fd(stream, pi[i]); |
1094 | if (err != 0) { |
1095 | /* Close rest */ |
1096 | for (; i < count; i++) |
1097 | uv__close(pi[i]); |
1098 | return err; |
1099 | } |
1100 | } else { |
1101 | stream->accepted_fd = pi[i]; |
1102 | } |
1103 | } |
1104 | } |
1105 | |
1106 | return 0; |
1107 | } |
1108 | |
1109 | |
1110 | #ifdef __clang__ |
1111 | # pragma clang diagnostic push |
1112 | # pragma clang diagnostic ignored "-Wgnu-folding-constant" |
1113 | # pragma clang diagnostic ignored "-Wvla-extension" |
1114 | #endif |
1115 | |
1116 | static void uv__read(uv_stream_t* stream) { |
1117 | uv_buf_t buf; |
1118 | ssize_t nread; |
1119 | struct msghdr msg; |
1120 | char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)]; |
1121 | int count; |
1122 | int err; |
1123 | int is_ipc; |
1124 | |
1125 | stream->flags &= ~UV_HANDLE_READ_PARTIAL; |
1126 | |
1127 | /* Prevent loop starvation when the data comes in as fast as (or faster than) |
1128 | * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O. |
1129 | */ |
1130 | count = 32; |
1131 | |
1132 | is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc; |
1133 | |
1134 | /* XXX: Maybe instead of having UV_HANDLE_READING we just test if |
1135 | * tcp->read_cb is NULL or not? |
1136 | */ |
1137 | while (stream->read_cb |
1138 | && (stream->flags & UV_HANDLE_READING) |
1139 | && (count-- > 0)) { |
1140 | assert(stream->alloc_cb != NULL); |
1141 | |
1142 | buf = uv_buf_init(NULL, 0); |
1143 | stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf); |
1144 | if (buf.base == NULL || buf.len == 0) { |
1145 | /* User indicates it can't or won't handle the read. */ |
1146 | stream->read_cb(stream, UV_ENOBUFS, &buf); |
1147 | return; |
1148 | } |
1149 | |
1150 | assert(buf.base != NULL); |
1151 | assert(uv__stream_fd(stream) >= 0); |
1152 | |
1153 | if (!is_ipc) { |
1154 | do { |
1155 | nread = read(uv__stream_fd(stream), buf.base, buf.len); |
1156 | } |
1157 | while (nread < 0 && errno == EINTR); |
1158 | } else { |
1159 | /* ipc uses recvmsg */ |
1160 | msg.msg_flags = 0; |
1161 | msg.msg_iov = (struct iovec*) &buf; |
1162 | msg.msg_iovlen = 1; |
1163 | msg.msg_name = NULL; |
1164 | msg.msg_namelen = 0; |
1165 | /* Set up to receive a descriptor even if one isn't in the message */ |
1166 | msg.msg_controllen = sizeof(cmsg_space); |
1167 | msg.msg_control = cmsg_space; |
1168 | |
1169 | do { |
1170 | nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0); |
1171 | } |
1172 | while (nread < 0 && errno == EINTR); |
1173 | } |
1174 | |
1175 | if (nread < 0) { |
1176 | /* Error */ |
1177 | if (errno == EAGAIN || errno == EWOULDBLOCK) { |
1178 | /* Wait for the next one. */ |
1179 | if (stream->flags & UV_HANDLE_READING) { |
1180 | uv__io_start(stream->loop, &stream->io_watcher, POLLIN); |
1181 | uv__stream_osx_interrupt_select(stream); |
1182 | } |
1183 | stream->read_cb(stream, 0, &buf); |
1184 | #if defined(__CYGWIN__) || defined(__MSYS__) |
1185 | } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) { |
1186 | uv__stream_eof(stream, &buf); |
1187 | return; |
1188 | #endif |
1189 | } else { |
1190 | /* Error. User should call uv_close(). */ |
1191 | stream->read_cb(stream, UV__ERR(errno), &buf); |
1192 | if (stream->flags & UV_HANDLE_READING) { |
1193 | stream->flags &= ~UV_HANDLE_READING; |
1194 | uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); |
1195 | if (!uv__io_active(&stream->io_watcher, POLLOUT)) |
1196 | uv__handle_stop(stream); |
1197 | uv__stream_osx_interrupt_select(stream); |
1198 | } |
1199 | } |
1200 | return; |
1201 | } else if (nread == 0) { |
1202 | uv__stream_eof(stream, &buf); |
1203 | return; |
1204 | } else { |
1205 | /* Successful read */ |
1206 | ssize_t buflen = buf.len; |
1207 | |
1208 | if (is_ipc) { |
1209 | err = uv__stream_recv_cmsg(stream, &msg); |
1210 | if (err != 0) { |
1211 | stream->read_cb(stream, err, &buf); |
1212 | return; |
1213 | } |
1214 | } |
1215 | |
1216 | #if defined(__MVS__) |
1217 | if (is_ipc && msg.msg_controllen > 0) { |
1218 | uv_buf_t blankbuf; |
1219 | int nread; |
1220 | struct iovec *old; |
1221 | |
1222 | blankbuf.base = 0; |
1223 | blankbuf.len = 0; |
1224 | old = msg.msg_iov; |
1225 | msg.msg_iov = (struct iovec*) &blankbuf; |
1226 | nread = 0; |
1227 | do { |
1228 | nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0); |
1229 | err = uv__stream_recv_cmsg(stream, &msg); |
1230 | if (err != 0) { |
1231 | stream->read_cb(stream, err, &buf); |
1232 | msg.msg_iov = old; |
1233 | return; |
1234 | } |
1235 | } while (nread == 0 && msg.msg_controllen > 0); |
1236 | msg.msg_iov = old; |
1237 | } |
1238 | #endif |
1239 | stream->read_cb(stream, nread, &buf); |
1240 | |
      /* Return if we didn't fill the buffer; there is no more data to read. */
1242 | if (nread < buflen) { |
1243 | stream->flags |= UV_HANDLE_READ_PARTIAL; |
1244 | return; |
1245 | } |
1246 | } |
1247 | } |
1248 | } |
1249 | |
1250 | |
1251 | #ifdef __clang__ |
1252 | # pragma clang diagnostic pop |
1253 | #endif |
1254 | |
1255 | #undef UV__CMSG_FD_COUNT |
1256 | #undef UV__CMSG_FD_SIZE |
1257 | |
1258 | |
1259 | int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) { |
1260 | assert(stream->type == UV_TCP || |
1261 | stream->type == UV_TTY || |
1262 | stream->type == UV_NAMED_PIPE); |
1263 | |
1264 | if (!(stream->flags & UV_HANDLE_WRITABLE) || |
1265 | stream->flags & UV_HANDLE_SHUT || |
1266 | stream->flags & UV_HANDLE_SHUTTING || |
1267 | uv__is_closing(stream)) { |
1268 | return UV_ENOTCONN; |
1269 | } |
1270 | |
1271 | assert(uv__stream_fd(stream) >= 0); |
1272 | |
1273 | /* Initialize request */ |
1274 | uv__req_init(stream->loop, req, UV_SHUTDOWN); |
1275 | req->handle = stream; |
1276 | req->cb = cb; |
1277 | stream->shutdown_req = req; |
1278 | stream->flags |= UV_HANDLE_SHUTTING; |
1279 | |
1280 | uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); |
1281 | uv__stream_osx_interrupt_select(stream); |
1282 | |
1283 | return 0; |
1284 | } |
1285 | |
1286 | |
1287 | static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { |
1288 | uv_stream_t* stream; |
1289 | |
1290 | stream = container_of(w, uv_stream_t, io_watcher); |
1291 | |
1292 | assert(stream->type == UV_TCP || |
1293 | stream->type == UV_NAMED_PIPE || |
1294 | stream->type == UV_TTY); |
1295 | assert(!(stream->flags & UV_HANDLE_CLOSING)); |
1296 | |
1297 | if (stream->connect_req) { |
1298 | uv__stream_connect(stream); |
1299 | return; |
1300 | } |
1301 | |
1302 | assert(uv__stream_fd(stream) >= 0); |
1303 | |
1304 | /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */ |
1305 | if (events & (POLLIN | POLLERR | POLLHUP)) |
1306 | uv__read(stream); |
1307 | |
1308 | if (uv__stream_fd(stream) == -1) |
1309 | return; /* read_cb closed stream. */ |
1310 | |
1311 | /* Short-circuit iff POLLHUP is set, the user is still interested in read |
1312 | * events and uv__read() reported a partial read but not EOF. If the EOF |
1313 | * flag is set, uv__read() called read_cb with err=UV_EOF and we don't |
1314 | * have to do anything. If the partial read flag is not set, we can't |
1315 | * report the EOF yet because there is still data to read. |
1316 | */ |
1317 | if ((events & POLLHUP) && |
1318 | (stream->flags & UV_HANDLE_READING) && |
1319 | (stream->flags & UV_HANDLE_READ_PARTIAL) && |
1320 | !(stream->flags & UV_HANDLE_READ_EOF)) { |
1321 | uv_buf_t buf = { NULL, 0 }; |
1322 | uv__stream_eof(stream, &buf); |
1323 | } |
1324 | |
1325 | if (uv__stream_fd(stream) == -1) |
1326 | return; /* read_cb closed stream. */ |
1327 | |
1328 | if (events & (POLLOUT | POLLERR | POLLHUP)) { |
1329 | uv__write(stream); |
1330 | uv__write_callbacks(stream); |
1331 | |
1332 | /* Write queue drained. */ |
1333 | if (QUEUE_EMPTY(&stream->write_queue)) |
1334 | uv__drain(stream); |
1335 | } |
1336 | } |
1337 | |
1338 | |
1339 | /** |
1340 | * We get called here from directly following a call to connect(2). |
1341 | * In order to determine if we've errored out or succeeded must call |
1342 | * getsockopt. |
1343 | */ |
1344 | static void uv__stream_connect(uv_stream_t* stream) { |
1345 | int error; |
1346 | uv_connect_t* req = stream->connect_req; |
1347 | socklen_t errorsize = sizeof(int); |
1348 | |
1349 | assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE); |
1350 | assert(req); |
1351 | |
  if (stream->delayed_error) {
    /* To smooth over differences between Unixes, errors that were
     * reported synchronously on the first connect can be delayed until
     * the next tick, which is now.
     */
1357 | error = stream->delayed_error; |
1358 | stream->delayed_error = 0; |
1359 | } else { |
1360 | /* Normal situation: we need to get the socket error from the kernel. */ |
1361 | assert(uv__stream_fd(stream) >= 0); |
1362 | getsockopt(uv__stream_fd(stream), |
1363 | SOL_SOCKET, |
1364 | SO_ERROR, |
1365 | &error, |
1366 | &errorsize); |
1367 | error = UV__ERR(error); |
1368 | } |
1369 | |
1370 | if (error == UV__ERR(EINPROGRESS)) |
1371 | return; |
1372 | |
1373 | stream->connect_req = NULL; |
1374 | uv__req_unregister(stream->loop, req); |
1375 | |
1376 | if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) { |
1377 | uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); |
1378 | } |
1379 | |
1380 | if (req->cb) |
1381 | req->cb(req, error); |
1382 | |
1383 | if (uv__stream_fd(stream) == -1) |
1384 | return; |
1385 | |
1386 | if (error < 0) { |
1387 | uv__stream_flush_write_queue(stream, UV_ECANCELED); |
1388 | uv__write_callbacks(stream); |
1389 | } |
1390 | } |
1391 | |
1392 | |
1393 | int uv_write2(uv_write_t* req, |
1394 | uv_stream_t* stream, |
1395 | const uv_buf_t bufs[], |
1396 | unsigned int nbufs, |
1397 | uv_stream_t* send_handle, |
1398 | uv_write_cb cb) { |
1399 | int empty_queue; |
1400 | |
1401 | assert(nbufs > 0); |
1402 | assert((stream->type == UV_TCP || |
1403 | stream->type == UV_NAMED_PIPE || |
1404 | stream->type == UV_TTY) && |
1405 | "uv_write (unix) does not yet support other types of streams" ); |
1406 | |
1407 | if (uv__stream_fd(stream) < 0) |
1408 | return UV_EBADF; |
1409 | |
1410 | if (!(stream->flags & UV_HANDLE_WRITABLE)) |
1411 | return UV_EPIPE; |
1412 | |
1413 | if (send_handle) { |
1414 | if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc) |
1415 | return UV_EINVAL; |
1416 | |
1417 | /* XXX We abuse uv_write2() to send over UDP handles to child processes. |
1418 | * Don't call uv__stream_fd() on those handles, it's a macro that on OS X |
1419 | * evaluates to a function that operates on a uv_stream_t with a couple of |
1420 | * OS X specific fields. On other Unices it does (handle)->io_watcher.fd, |
1421 | * which works but only by accident. |
1422 | */ |
1423 | if (uv__handle_fd((uv_handle_t*) send_handle) < 0) |
1424 | return UV_EBADF; |
1425 | |
1426 | #if defined(__CYGWIN__) || defined(__MSYS__) |
1427 | /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it. |
1428 | See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */ |
1429 | return UV_ENOSYS; |
1430 | #endif |
1431 | } |
1432 | |
1433 | /* It's legal for write_queue_size > 0 even when the write_queue is empty; |
1434 | * it means there are error-state requests in the write_completed_queue that |
1435 | * will touch up write_queue_size later, see also uv__write_req_finish(). |
1436 | * We could check that write_queue is empty instead but that implies making |
1437 | * a write() syscall when we know that the handle is in error mode. |
1438 | */ |
1439 | empty_queue = (stream->write_queue_size == 0); |
1440 | |
1441 | /* Initialize the req */ |
1442 | uv__req_init(stream->loop, req, UV_WRITE); |
1443 | req->cb = cb; |
1444 | req->handle = stream; |
1445 | req->error = 0; |
1446 | req->send_handle = send_handle; |
1447 | QUEUE_INIT(&req->queue); |
1448 | |
1449 | req->bufs = req->bufsml; |
1450 | if (nbufs > ARRAY_SIZE(req->bufsml)) |
1451 | req->bufs = uv__malloc(nbufs * sizeof(bufs[0])); |
1452 | |
1453 | if (req->bufs == NULL) |
1454 | return UV_ENOMEM; |
1455 | |
1456 | memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0])); |
1457 | req->nbufs = nbufs; |
1458 | req->write_index = 0; |
1459 | stream->write_queue_size += uv__count_bufs(bufs, nbufs); |
1460 | |
1461 | /* Append the request to write_queue. */ |
1462 | QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue); |
1463 | |
1464 | /* If the queue was empty when this function began, we should attempt to |
1465 | * do the write immediately. Otherwise start the write_watcher and wait |
1466 | * for the fd to become writable. |
1467 | */ |
1468 | if (stream->connect_req) { |
1469 | /* Still connecting, do nothing. */ |
1470 | } |
1471 | else if (empty_queue) { |
1472 | uv__write(stream); |
1473 | } |
1474 | else { |
1475 | /* |
1476 | * blocking streams should never have anything in the queue. |
1477 | * if this assert fires then somehow the blocking stream isn't being |
1478 | * sufficiently flushed in uv__write. |
1479 | */ |
1480 | assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES)); |
1481 | uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); |
1482 | uv__stream_osx_interrupt_select(stream); |
1483 | } |
1484 | |
1485 | return 0; |
1486 | } |
1487 | |
1488 | |
1489 | /* The buffers to be written must remain valid until the callback is called. |
1490 | * This is not required for the uv_buf_t array. |
1491 | */ |
1492 | int uv_write(uv_write_t* req, |
1493 | uv_stream_t* handle, |
1494 | const uv_buf_t bufs[], |
1495 | unsigned int nbufs, |
1496 | uv_write_cb cb) { |
1497 | return uv_write2(req, handle, bufs, nbufs, NULL, cb); |
1498 | } |
1499 | |
1500 | |
1501 | void uv_try_write_cb(uv_write_t* req, int status) { |
1502 | /* Should not be called */ |
1503 | abort(); |
1504 | } |
1505 | |
1506 | |
1507 | int uv_try_write(uv_stream_t* stream, |
1508 | const uv_buf_t bufs[], |
1509 | unsigned int nbufs) { |
1510 | int r; |
1511 | int has_pollout; |
1512 | size_t written; |
1513 | size_t req_size; |
1514 | uv_write_t req; |
1515 | |
1516 | /* Connecting or already writing some data */ |
1517 | if (stream->connect_req != NULL || stream->write_queue_size != 0) |
1518 | return UV_EAGAIN; |
1519 | |
1520 | has_pollout = uv__io_active(&stream->io_watcher, POLLOUT); |
1521 | |
1522 | r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb); |
1523 | if (r != 0) |
1524 | return r; |
1525 | |
  /* Remove unwritten bytes from the write queue size. */
1527 | written = uv__count_bufs(bufs, nbufs); |
1528 | if (req.bufs != NULL) |
1529 | req_size = uv__write_req_size(&req); |
1530 | else |
1531 | req_size = 0; |
1532 | written -= req_size; |
1533 | stream->write_queue_size -= req_size; |
1534 | |
  /* Unqueue the request, regardless of whether it completed immediately. */
1536 | QUEUE_REMOVE(&req.queue); |
1537 | uv__req_unregister(stream->loop, &req); |
1538 | if (req.bufs != req.bufsml) |
1539 | uv__free(req.bufs); |
1540 | req.bufs = NULL; |
1541 | |
  /* Don't poll for writability if we weren't doing so before this call. */
1543 | if (!has_pollout) { |
1544 | uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); |
1545 | uv__stream_osx_interrupt_select(stream); |
1546 | } |
1547 | |
1548 | if (written == 0 && req_size != 0) |
1549 | return req.error < 0 ? req.error : UV_EAGAIN; |
1550 | else |
1551 | return written; |
1552 | } |
1553 | |
1554 | |
1555 | int uv__read_start(uv_stream_t* stream, |
1556 | uv_alloc_cb alloc_cb, |
1557 | uv_read_cb read_cb) { |
1558 | assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || |
1559 | stream->type == UV_TTY); |
1560 | |
  /* The UV_HANDLE_READING flag is independent of the state of the stream;
   * it just expresses the user's desired state.
   */
1564 | stream->flags |= UV_HANDLE_READING; |
1565 | |
1566 | /* TODO: try to do the read inline? */ |
1567 | /* TODO: keep track of tcp state. If we've gotten a EOF then we should |
1568 | * not start the IO watcher. |
1569 | */ |
1570 | assert(uv__stream_fd(stream) >= 0); |
1571 | assert(alloc_cb); |
1572 | |
1573 | stream->read_cb = read_cb; |
1574 | stream->alloc_cb = alloc_cb; |
1575 | |
1576 | uv__io_start(stream->loop, &stream->io_watcher, POLLIN); |
1577 | uv__handle_start(stream); |
1578 | uv__stream_osx_interrupt_select(stream); |
1579 | |
1580 | return 0; |
1581 | } |
1582 | |
1583 | |
1584 | int uv_read_stop(uv_stream_t* stream) { |
1585 | if (!(stream->flags & UV_HANDLE_READING)) |
1586 | return 0; |
1587 | |
1588 | stream->flags &= ~UV_HANDLE_READING; |
1589 | uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); |
1590 | if (!uv__io_active(&stream->io_watcher, POLLOUT)) |
1591 | uv__handle_stop(stream); |
1592 | uv__stream_osx_interrupt_select(stream); |
1593 | |
1594 | stream->read_cb = NULL; |
1595 | stream->alloc_cb = NULL; |
1596 | return 0; |
1597 | } |
1598 | |
1599 | |
1600 | int uv_is_readable(const uv_stream_t* stream) { |
1601 | return !!(stream->flags & UV_HANDLE_READABLE); |
1602 | } |
1603 | |
1604 | |
1605 | int uv_is_writable(const uv_stream_t* stream) { |
1606 | return !!(stream->flags & UV_HANDLE_WRITABLE); |
1607 | } |
1608 | |
1609 | |
1610 | #if defined(__APPLE__) |
1611 | int uv___stream_fd(const uv_stream_t* handle) { |
1612 | const uv__stream_select_t* s; |
1613 | |
1614 | assert(handle->type == UV_TCP || |
1615 | handle->type == UV_TTY || |
1616 | handle->type == UV_NAMED_PIPE); |
1617 | |
1618 | s = handle->select; |
1619 | if (s != NULL) |
1620 | return s->fd; |
1621 | |
1622 | return handle->io_watcher.fd; |
1623 | } |
1624 | #endif /* defined(__APPLE__) */ |
1625 | |
1626 | |
1627 | void uv__stream_close(uv_stream_t* handle) { |
1628 | unsigned int i; |
1629 | uv__stream_queued_fds_t* queued_fds; |
1630 | |
1631 | #if defined(__APPLE__) |
1632 | /* Terminate select loop first */ |
1633 | if (handle->select != NULL) { |
1634 | uv__stream_select_t* s; |
1635 | |
1636 | s = handle->select; |
1637 | |
1638 | uv_sem_post(&s->close_sem); |
1639 | uv_sem_post(&s->async_sem); |
1640 | uv__stream_osx_interrupt_select(handle); |
1641 | uv_thread_join(&s->thread); |
1642 | uv_sem_destroy(&s->close_sem); |
1643 | uv_sem_destroy(&s->async_sem); |
1644 | uv__close(s->fake_fd); |
1645 | uv__close(s->int_fd); |
1646 | uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close); |
1647 | |
1648 | handle->select = NULL; |
1649 | } |
1650 | #endif /* defined(__APPLE__) */ |
1651 | |
1652 | uv__io_close(handle->loop, &handle->io_watcher); |
1653 | uv_read_stop(handle); |
1654 | uv__handle_stop(handle); |
1655 | handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); |
1656 | |
1657 | if (handle->io_watcher.fd != -1) { |
1658 | /* Don't close stdio file descriptors. Nothing good comes from it. */ |
1659 | if (handle->io_watcher.fd > STDERR_FILENO) |
1660 | uv__close(handle->io_watcher.fd); |
1661 | handle->io_watcher.fd = -1; |
1662 | } |
1663 | |
1664 | if (handle->accepted_fd != -1) { |
1665 | uv__close(handle->accepted_fd); |
1666 | handle->accepted_fd = -1; |
1667 | } |
1668 | |
1669 | /* Close all queued fds */ |
1670 | if (handle->queued_fds != NULL) { |
1671 | queued_fds = handle->queued_fds; |
1672 | for (i = 0; i < queued_fds->offset; i++) |
1673 | uv__close(queued_fds->fds[i]); |
1674 | uv__free(handle->queued_fds); |
1675 | handle->queued_fds = NULL; |
1676 | } |
1677 | |
1678 | assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT)); |
1679 | } |
1680 | |
1681 | |
1682 | int uv_stream_set_blocking(uv_stream_t* handle, int blocking) { |
1683 | /* Don't need to check the file descriptor, uv__nonblock() |
1684 | * will fail with EBADF if it's not valid. |
1685 | */ |
1686 | return uv__nonblock(uv__stream_fd(handle), !blocking); |
1687 | } |
1688 | |