1/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 *
3 * Permission is hereby granted, free of charge, to any person obtaining a copy
4 * of this software and associated documentation files (the "Software"), to
5 * deal in the Software without restriction, including without limitation the
6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 * sell copies of the Software, and to permit persons to whom the Software is
8 * furnished to do so, subject to the following conditions:
9 *
10 * The above copyright notice and this permission notice shall be included in
11 * all copies or substantial portions of the Software.
12 *
13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 * IN THE SOFTWARE.
20 */
21
22#include "uv.h"
23#include "task.h"
24
25#include <stdio.h>
26#include <string.h>
27
28static uv_pipe_t channel;
29static uv_tcp_t tcp_server;
30static uv_tcp_t tcp_server2;
31static uv_tcp_t tcp_connection;
32
33static int exit_cb_called;
34static int read_cb_called;
35static int tcp_write_cb_called;
36static int tcp_read_cb_called;
37static int on_pipe_read_called;
38static int local_conn_accepted;
39static int remote_conn_accepted;
40static int tcp_server_listening;
41static uv_write_t write_req;
42static uv_write_t write_req2;
43static uv_write_t conn_notify_req;
44static int close_cb_called;
45static int connection_accepted;
46static int tcp_conn_read_cb_called;
47static int tcp_conn_write_cb_called;
48static int closed_handle_data_read;
49static int closed_handle_write;
50static int send_zero_write;
51
52typedef struct {
53 uv_connect_t conn_req;
54 uv_write_t tcp_write_req;
55 uv_tcp_t conn;
56} tcp_conn;
57
58#define CONN_COUNT 100
59#define BACKLOG 128
60#define LARGE_SIZE 100000
61
62static uv_buf_t large_buf;
63static char buffer[LARGE_SIZE];
64static uv_write_t write_reqs[300];
65static int write_reqs_completed;
66
67static unsigned int write_until_data_queued(void);
68static void send_handle_and_close(void);
69
70
71static void close_server_conn_cb(uv_handle_t* handle) {
72 free(handle);
73}
74
75
76static void on_connection(uv_stream_t* server, int status) {
77 uv_tcp_t* conn;
78 int r;
79
80 if (!local_conn_accepted) {
81 /* Accept the connection and close it. Also and close the server. */
82 ASSERT(status == 0);
83 ASSERT((uv_stream_t*)&tcp_server == server);
84
85 conn = malloc(sizeof(*conn));
86 ASSERT(conn);
87 r = uv_tcp_init(server->loop, conn);
88 ASSERT(r == 0);
89
90 r = uv_accept(server, (uv_stream_t*)conn);
91 ASSERT(r == 0);
92
93 uv_close((uv_handle_t*)conn, close_server_conn_cb);
94 uv_close((uv_handle_t*)server, NULL);
95 local_conn_accepted = 1;
96 }
97}
98
99
100static void exit_cb(uv_process_t* process,
101 int64_t exit_status,
102 int term_signal) {
103 printf("exit_cb\n");
104 exit_cb_called++;
105 ASSERT(exit_status == 0);
106 ASSERT(term_signal == 0);
107 uv_close((uv_handle_t*)process, NULL);
108}
109
110
111static void on_alloc(uv_handle_t* handle,
112 size_t suggested_size,
113 uv_buf_t* buf) {
114 buf->base = malloc(suggested_size);
115 buf->len = suggested_size;
116}
117
118
119static void close_client_conn_cb(uv_handle_t* handle) {
120 tcp_conn* p = (tcp_conn*)handle->data;
121 free(p);
122}
123
124
125static void connect_cb(uv_connect_t* req, int status) {
126 uv_close((uv_handle_t*)req->handle, close_client_conn_cb);
127}
128
129
130static void make_many_connections(void) {
131 tcp_conn* conn;
132 struct sockaddr_in addr;
133 int r, i;
134
135 for (i = 0; i < CONN_COUNT; i++) {
136 conn = malloc(sizeof(*conn));
137 ASSERT(conn);
138
139 r = uv_tcp_init(uv_default_loop(), &conn->conn);
140 ASSERT(r == 0);
141
142 ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
143
144 r = uv_tcp_connect(&conn->conn_req,
145 (uv_tcp_t*) &conn->conn,
146 (const struct sockaddr*) &addr,
147 connect_cb);
148 ASSERT(r == 0);
149
150 conn->conn.data = conn;
151 }
152}
153
154
155static void on_read(uv_stream_t* handle,
156 ssize_t nread,
157 const uv_buf_t* buf) {
158 int r;
159 uv_pipe_t* pipe;
160 uv_handle_type pending;
161 uv_buf_t outbuf;
162
163 pipe = (uv_pipe_t*) handle;
164
165 if (nread == 0) {
166 /* Everything OK, but nothing read. */
167 free(buf->base);
168 return;
169 }
170
171 if (nread < 0) {
172 if (nread == UV_EOF) {
173 free(buf->base);
174 return;
175 }
176
177 printf("error recving on channel: %s\n", uv_strerror(nread));
178 abort();
179 }
180
181 fprintf(stderr, "got %d bytes\n", (int)nread);
182
183 pending = uv_pipe_pending_type(pipe);
184 if (!tcp_server_listening) {
185 ASSERT(1 == uv_pipe_pending_count(pipe));
186 ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE);
187 read_cb_called++;
188
189 /* Accept the pending TCP server, and start listening on it. */
190 ASSERT(pending == UV_TCP);
191 r = uv_tcp_init(uv_default_loop(), &tcp_server);
192 ASSERT(r == 0);
193
194 r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
195 ASSERT(r == 0);
196
197 r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection);
198 ASSERT(r == 0);
199
200 tcp_server_listening = 1;
201
202 /* Make sure that the expected data is correctly multiplexed. */
203 ASSERT(memcmp("hello\n", buf->base, nread) == 0);
204
205 outbuf = uv_buf_init("world\n", 6);
206 r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL);
207 ASSERT(r == 0);
208
209 /* Create a bunch of connections to get both servers to accept. */
210 make_many_connections();
211 } else if (memcmp("accepted_connection\n", buf->base, nread) == 0) {
212 /* Remote server has accepted a connection. Close the channel. */
213 ASSERT(0 == uv_pipe_pending_count(pipe));
214 ASSERT(pending == UV_UNKNOWN_HANDLE);
215 remote_conn_accepted = 1;
216 uv_close((uv_handle_t*)&channel, NULL);
217 }
218
219 free(buf->base);
220}
221
222#ifdef _WIN32
223static void on_read_listen_after_bound_twice(uv_stream_t* handle,
224 ssize_t nread,
225 const uv_buf_t* buf) {
226 int r;
227 uv_pipe_t* pipe;
228 uv_handle_type pending;
229
230 pipe = (uv_pipe_t*) handle;
231
232 if (nread == 0) {
233 /* Everything OK, but nothing read. */
234 free(buf->base);
235 return;
236 }
237
238 if (nread < 0) {
239 if (nread == UV_EOF) {
240 free(buf->base);
241 return;
242 }
243
244 printf("error recving on channel: %s\n", uv_strerror(nread));
245 abort();
246 }
247
248 fprintf(stderr, "got %d bytes\n", (int)nread);
249
250 ASSERT(uv_pipe_pending_count(pipe) > 0);
251 pending = uv_pipe_pending_type(pipe);
252 ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE);
253 read_cb_called++;
254
255 if (read_cb_called == 1) {
256 /* Accept the first TCP server, and start listening on it. */
257 ASSERT(pending == UV_TCP);
258 r = uv_tcp_init(uv_default_loop(), &tcp_server);
259 ASSERT(r == 0);
260
261 r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
262 ASSERT(r == 0);
263
264 r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection);
265 ASSERT(r == 0);
266 } else if (read_cb_called == 2) {
267 /* Accept the second TCP server, and start listening on it. */
268 ASSERT(pending == UV_TCP);
269 r = uv_tcp_init(uv_default_loop(), &tcp_server2);
270 ASSERT(r == 0);
271
272 r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server2);
273 ASSERT(r == 0);
274
275 r = uv_listen((uv_stream_t*)&tcp_server2, BACKLOG, on_connection);
276 ASSERT(r == UV_EADDRINUSE);
277
278 uv_close((uv_handle_t*)&tcp_server, NULL);
279 uv_close((uv_handle_t*)&tcp_server2, NULL);
280 ASSERT(0 == uv_pipe_pending_count(pipe));
281 uv_close((uv_handle_t*)&channel, NULL);
282 }
283
284 free(buf->base);
285}
286#endif
287
288void spawn_helper(uv_pipe_t* channel,
289 uv_process_t* process,
290 const char* helper) {
291 uv_process_options_t options;
292 size_t exepath_size;
293 char exepath[1024];
294 char* args[3];
295 int r;
296 uv_stdio_container_t stdio[3];
297
298 r = uv_pipe_init(uv_default_loop(), channel, 1);
299 ASSERT(r == 0);
300 ASSERT(channel->ipc);
301
302 exepath_size = sizeof(exepath);
303 r = uv_exepath(exepath, &exepath_size);
304 ASSERT(r == 0);
305
306 exepath[exepath_size] = '\0';
307 args[0] = exepath;
308 args[1] = (char*)helper;
309 args[2] = NULL;
310
311 memset(&options, 0, sizeof(options));
312 options.file = exepath;
313 options.args = args;
314 options.exit_cb = exit_cb;
315 options.stdio = stdio;
316 options.stdio_count = ARRAY_SIZE(stdio);
317
318 stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE;
319 stdio[0].data.stream = (uv_stream_t*) channel;
320 stdio[1].flags = UV_INHERIT_FD;
321 stdio[1].data.fd = 1;
322 stdio[2].flags = UV_INHERIT_FD;
323 stdio[2].data.fd = 2;
324
325 r = uv_spawn(uv_default_loop(), process, &options);
326 ASSERT(r == 0);
327}
328
329
330static void on_tcp_write(uv_write_t* req, int status) {
331 ASSERT(status == 0);
332 ASSERT(req->handle == (uv_stream_t*)&tcp_connection);
333 tcp_write_cb_called++;
334}
335
336
337static void on_read_alloc(uv_handle_t* handle,
338 size_t suggested_size,
339 uv_buf_t* buf) {
340 buf->base = malloc(suggested_size);
341 buf->len = suggested_size;
342}
343
344
345static void on_tcp_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) {
346 ASSERT(nread > 0);
347 ASSERT(memcmp("hello again\n", buf->base, nread) == 0);
348 ASSERT(tcp == (uv_stream_t*)&tcp_connection);
349 free(buf->base);
350
351 tcp_read_cb_called++;
352
353 uv_close((uv_handle_t*)tcp, NULL);
354 uv_close((uv_handle_t*)&channel, NULL);
355}
356
357
358static void on_read_connection(uv_stream_t* handle,
359 ssize_t nread,
360 const uv_buf_t* buf) {
361 int r;
362 uv_buf_t outbuf;
363 uv_pipe_t* pipe;
364 uv_handle_type pending;
365
366 pipe = (uv_pipe_t*) handle;
367 if (nread == 0) {
368 /* Everything OK, but nothing read. */
369 free(buf->base);
370 return;
371 }
372
373 if (nread < 0) {
374 if (nread == UV_EOF) {
375 free(buf->base);
376 return;
377 }
378
379 printf("error recving on channel: %s\n", uv_strerror(nread));
380 abort();
381 }
382
383 fprintf(stderr, "got %d bytes\n", (int)nread);
384
385 ASSERT(1 == uv_pipe_pending_count(pipe));
386 pending = uv_pipe_pending_type(pipe);
387
388 ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE);
389 read_cb_called++;
390
391 /* Accept the pending TCP connection */
392 ASSERT(pending == UV_TCP);
393 r = uv_tcp_init(uv_default_loop(), &tcp_connection);
394 ASSERT(r == 0);
395
396 r = uv_accept(handle, (uv_stream_t*)&tcp_connection);
397 ASSERT(r == 0);
398
399 /* Make sure that the expected data is correctly multiplexed. */
400 ASSERT(memcmp("hello\n", buf->base, nread) == 0);
401
402 /* Write/read to/from the connection */
403 outbuf = uv_buf_init("world\n", 6);
404 r = uv_write(&write_req, (uv_stream_t*)&tcp_connection, &outbuf, 1,
405 on_tcp_write);
406 ASSERT(r == 0);
407
408 r = uv_read_start((uv_stream_t*)&tcp_connection, on_read_alloc, on_tcp_read);
409 ASSERT(r == 0);
410
411 free(buf->base);
412}
413
414
415#ifndef _WIN32
416static void on_read_closed_handle(uv_stream_t* handle,
417 ssize_t nread,
418 const uv_buf_t* buf) {
419 if (nread == 0 || nread == UV_EOF) {
420 free(buf->base);
421 return;
422 }
423
424 if (nread < 0) {
425 printf("error recving on channel: %s\n", uv_strerror(nread));
426 abort();
427 }
428
429 closed_handle_data_read += nread;
430 free(buf->base);
431}
432#endif
433
434
435static void on_read_send_zero(uv_stream_t* handle,
436 ssize_t nread,
437 const uv_buf_t* buf) {
438 ASSERT(nread == 0 || nread == UV_EOF);
439 free(buf->base);
440}
441
442
443static int run_ipc_test(const char* helper, uv_read_cb read_cb) {
444 uv_process_t process;
445 int r;
446
447 spawn_helper(&channel, &process, helper);
448 uv_read_start((uv_stream_t*)&channel, on_alloc, read_cb);
449
450 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
451 ASSERT(r == 0);
452
453 MAKE_VALGRIND_HAPPY();
454 return 0;
455}
456
457
458TEST_IMPL(ipc_listen_before_write) {
459#if defined(NO_SEND_HANDLE_ON_PIPE)
460 RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
461#endif
462 int r = run_ipc_test("ipc_helper_listen_before_write", on_read);
463 ASSERT(local_conn_accepted == 1);
464 ASSERT(remote_conn_accepted == 1);
465 ASSERT(read_cb_called == 1);
466 ASSERT(exit_cb_called == 1);
467 return r;
468}
469
470
471TEST_IMPL(ipc_listen_after_write) {
472#if defined(NO_SEND_HANDLE_ON_PIPE)
473 RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
474#endif
475 int r = run_ipc_test("ipc_helper_listen_after_write", on_read);
476 ASSERT(local_conn_accepted == 1);
477 ASSERT(remote_conn_accepted == 1);
478 ASSERT(read_cb_called == 1);
479 ASSERT(exit_cb_called == 1);
480 return r;
481}
482
483
484TEST_IMPL(ipc_tcp_connection) {
485#if defined(NO_SEND_HANDLE_ON_PIPE)
486 RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
487#endif
488 int r = run_ipc_test("ipc_helper_tcp_connection", on_read_connection);
489 ASSERT(read_cb_called == 1);
490 ASSERT(tcp_write_cb_called == 1);
491 ASSERT(tcp_read_cb_called == 1);
492 ASSERT(exit_cb_called == 1);
493 return r;
494}
495
496#ifndef _WIN32
497TEST_IMPL(ipc_closed_handle) {
498 int r;
499 r = run_ipc_test("ipc_helper_closed_handle", on_read_closed_handle);
500 ASSERT(r == 0);
501 return 0;
502}
503#endif
504
505
506#ifdef _WIN32
507TEST_IMPL(listen_with_simultaneous_accepts) {
508 uv_tcp_t server;
509 int r;
510 struct sockaddr_in addr;
511
512 ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
513
514 r = uv_tcp_init(uv_default_loop(), &server);
515 ASSERT(r == 0);
516
517 r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
518 ASSERT(r == 0);
519
520 r = uv_tcp_simultaneous_accepts(&server, 1);
521 ASSERT(r == 0);
522
523 r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
524 ASSERT(r == 0);
525 ASSERT(server.reqs_pending == 32);
526
527 MAKE_VALGRIND_HAPPY();
528 return 0;
529}
530
531
532TEST_IMPL(listen_no_simultaneous_accepts) {
533 uv_tcp_t server;
534 int r;
535 struct sockaddr_in addr;
536
537 ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
538
539 r = uv_tcp_init(uv_default_loop(), &server);
540 ASSERT(r == 0);
541
542 r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
543 ASSERT(r == 0);
544
545 r = uv_tcp_simultaneous_accepts(&server, 0);
546 ASSERT(r == 0);
547
548 r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
549 ASSERT(r == 0);
550 ASSERT(server.reqs_pending == 1);
551
552 MAKE_VALGRIND_HAPPY();
553 return 0;
554}
555
556TEST_IMPL(ipc_listen_after_bind_twice) {
557#if defined(NO_SEND_HANDLE_ON_PIPE)
558 RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
559#endif
560 int r = run_ipc_test("ipc_helper_bind_twice", on_read_listen_after_bound_twice);
561 ASSERT(read_cb_called == 2);
562 ASSERT(exit_cb_called == 1);
563 return r;
564}
565#endif
566
567TEST_IMPL(ipc_send_zero) {
568 int r;
569 r = run_ipc_test("ipc_helper_send_zero", on_read_send_zero);
570 ASSERT(r == 0);
571 return 0;
572}
573
574
575/* Everything here runs in a child process. */
576
577static tcp_conn conn;
578
579
580static void close_cb(uv_handle_t* handle) {
581 close_cb_called++;
582}
583
584
585static void conn_notify_write_cb(uv_write_t* req, int status) {
586 uv_close((uv_handle_t*)&tcp_server, close_cb);
587 uv_close((uv_handle_t*)&channel, close_cb);
588}
589
590
591static void tcp_connection_write_cb(uv_write_t* req, int status) {
592 ASSERT((uv_handle_t*)&conn.conn == (uv_handle_t*)req->handle);
593 uv_close((uv_handle_t*)req->handle, close_cb);
594 uv_close((uv_handle_t*)&channel, close_cb);
595 uv_close((uv_handle_t*)&tcp_server, close_cb);
596 tcp_conn_write_cb_called++;
597}
598
599
600static void closed_handle_large_write_cb(uv_write_t* req, int status) {
601 ASSERT(status == 0);
602 ASSERT(closed_handle_data_read = LARGE_SIZE);
603 if (++write_reqs_completed == ARRAY_SIZE(write_reqs)) {
604 write_reqs_completed = 0;
605 if (write_until_data_queued() > 0)
606 send_handle_and_close();
607 }
608}
609
610
611static void closed_handle_write_cb(uv_write_t* req, int status) {
612 ASSERT(status == UV_EBADF);
613 closed_handle_write = 1;
614}
615
616
617static void send_zero_write_cb(uv_write_t* req, int status) {
618 ASSERT(status == 0);
619 send_zero_write++;
620}
621
622static void on_tcp_child_process_read(uv_stream_t* tcp,
623 ssize_t nread,
624 const uv_buf_t* buf) {
625 uv_buf_t outbuf;
626 int r;
627
628 if (nread < 0) {
629 if (nread == UV_EOF) {
630 free(buf->base);
631 return;
632 }
633
634 printf("error recving on tcp connection: %s\n", uv_strerror(nread));
635 abort();
636 }
637
638 ASSERT(nread > 0);
639 ASSERT(memcmp("world\n", buf->base, nread) == 0);
640 on_pipe_read_called++;
641 free(buf->base);
642
643 /* Write to the socket */
644 outbuf = uv_buf_init("hello again\n", 12);
645 r = uv_write(&conn.tcp_write_req, tcp, &outbuf, 1, tcp_connection_write_cb);
646 ASSERT(r == 0);
647
648 tcp_conn_read_cb_called++;
649}
650
651
652static void connect_child_process_cb(uv_connect_t* req, int status) {
653 int r;
654
655 ASSERT(status == 0);
656 r = uv_read_start(req->handle, on_read_alloc, on_tcp_child_process_read);
657 ASSERT(r == 0);
658}
659
660
661static void ipc_on_connection(uv_stream_t* server, int status) {
662 int r;
663 uv_buf_t buf;
664
665 if (!connection_accepted) {
666 /*
667 * Accept the connection and close it. Also let the other
668 * side know.
669 */
670 ASSERT(status == 0);
671 ASSERT((uv_stream_t*)&tcp_server == server);
672
673 r = uv_tcp_init(server->loop, &conn.conn);
674 ASSERT(r == 0);
675
676 r = uv_accept(server, (uv_stream_t*)&conn.conn);
677 ASSERT(r == 0);
678
679 uv_close((uv_handle_t*)&conn.conn, close_cb);
680
681 buf = uv_buf_init("accepted_connection\n", 20);
682 r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
683 NULL, conn_notify_write_cb);
684 ASSERT(r == 0);
685
686 connection_accepted = 1;
687 }
688}
689
690
691static void ipc_on_connection_tcp_conn(uv_stream_t* server, int status) {
692 int r;
693 uv_buf_t buf;
694 uv_tcp_t* conn;
695
696 ASSERT(status == 0);
697 ASSERT((uv_stream_t*)&tcp_server == server);
698
699 conn = malloc(sizeof(*conn));
700 ASSERT(conn);
701
702 r = uv_tcp_init(server->loop, conn);
703 ASSERT(r == 0);
704
705 r = uv_accept(server, (uv_stream_t*)conn);
706 ASSERT(r == 0);
707
708 /* Send the accepted connection to the other process */
709 buf = uv_buf_init("hello\n", 6);
710 r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
711 (uv_stream_t*)conn, NULL);
712 ASSERT(r == 0);
713
714 r = uv_read_start((uv_stream_t*) conn,
715 on_read_alloc,
716 on_tcp_child_process_read);
717 ASSERT(r == 0);
718
719 uv_close((uv_handle_t*)conn, close_cb);
720}
721
722
723int ipc_helper(int listen_after_write) {
724 /*
725 * This is launched from test-ipc.c. stdin is a duplex channel that we
726 * over which a handle will be transmitted.
727 */
728 struct sockaddr_in addr;
729 int r;
730 uv_buf_t buf;
731
732 ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
733
734 r = uv_pipe_init(uv_default_loop(), &channel, 1);
735 ASSERT(r == 0);
736
737 uv_pipe_open(&channel, 0);
738
739 ASSERT(1 == uv_is_readable((uv_stream_t*) &channel));
740 ASSERT(1 == uv_is_writable((uv_stream_t*) &channel));
741 ASSERT(0 == uv_is_closing((uv_handle_t*) &channel));
742
743 r = uv_tcp_init(uv_default_loop(), &tcp_server);
744 ASSERT(r == 0);
745
746 r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
747 ASSERT(r == 0);
748
749 if (!listen_after_write) {
750 r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection);
751 ASSERT(r == 0);
752 }
753
754 buf = uv_buf_init("hello\n", 6);
755 r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
756 (uv_stream_t*)&tcp_server, NULL);
757 ASSERT(r == 0);
758
759 if (listen_after_write) {
760 r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection);
761 ASSERT(r == 0);
762 }
763
764 notify_parent_process();
765 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
766 ASSERT(r == 0);
767
768 ASSERT(connection_accepted == 1);
769 ASSERT(close_cb_called == 3);
770
771 MAKE_VALGRIND_HAPPY();
772 return 0;
773}
774
775
776int ipc_helper_tcp_connection(void) {
777 /*
778 * This is launched from test-ipc.c. stdin is a duplex channel
779 * over which a handle will be transmitted.
780 */
781
782 int r;
783 struct sockaddr_in addr;
784
785 r = uv_pipe_init(uv_default_loop(), &channel, 1);
786 ASSERT(r == 0);
787
788 uv_pipe_open(&channel, 0);
789
790 ASSERT(1 == uv_is_readable((uv_stream_t*) &channel));
791 ASSERT(1 == uv_is_writable((uv_stream_t*) &channel));
792 ASSERT(0 == uv_is_closing((uv_handle_t*) &channel));
793
794 r = uv_tcp_init(uv_default_loop(), &tcp_server);
795 ASSERT(r == 0);
796
797 ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
798
799 r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
800 ASSERT(r == 0);
801
802 r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection_tcp_conn);
803 ASSERT(r == 0);
804
805 /* Make a connection to the server */
806 r = uv_tcp_init(uv_default_loop(), &conn.conn);
807 ASSERT(r == 0);
808
809 ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
810
811 r = uv_tcp_connect(&conn.conn_req,
812 (uv_tcp_t*) &conn.conn,
813 (const struct sockaddr*) &addr,
814 connect_child_process_cb);
815 ASSERT(r == 0);
816
817 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
818 ASSERT(r == 0);
819
820 ASSERT(tcp_conn_read_cb_called == 1);
821 ASSERT(tcp_conn_write_cb_called == 1);
822 ASSERT(close_cb_called == 4);
823
824 MAKE_VALGRIND_HAPPY();
825 return 0;
826}
827
828static unsigned int write_until_data_queued() {
829 unsigned int i;
830 int r;
831
832 i = 0;
833 do {
834 r = uv_write(&write_reqs[i],
835 (uv_stream_t*)&channel,
836 &large_buf,
837 1,
838 closed_handle_large_write_cb);
839 ASSERT(r == 0);
840 i++;
841 } while (((uv_stream_t*)&channel)->write_queue_size == 0 &&
842 i < ARRAY_SIZE(write_reqs));
843
844 return ((uv_stream_t*)&channel)->write_queue_size;
845}
846
847static void send_handle_and_close() {
848 int r;
849 struct sockaddr_in addr;
850
851 r = uv_tcp_init(uv_default_loop(), &tcp_server);
852 ASSERT(r == 0);
853
854 ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
855
856 r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
857 ASSERT(r == 0);
858
859 r = uv_write2(&write_req,
860 (uv_stream_t*)&channel,
861 &large_buf,
862 1,
863 (uv_stream_t*)&tcp_server,
864 closed_handle_write_cb);
865 ASSERT(r == 0);
866
867 uv_close((uv_handle_t*)&tcp_server, NULL);
868}
869
870int ipc_helper_closed_handle(void) {
871 int r;
872
873 memset(buffer, '.', LARGE_SIZE);
874 large_buf = uv_buf_init(buffer, LARGE_SIZE);
875
876 r = uv_pipe_init(uv_default_loop(), &channel, 1);
877 ASSERT(r == 0);
878
879 uv_pipe_open(&channel, 0);
880
881 ASSERT(1 == uv_is_readable((uv_stream_t*) &channel));
882 ASSERT(1 == uv_is_writable((uv_stream_t*) &channel));
883 ASSERT(0 == uv_is_closing((uv_handle_t*) &channel));
884
885 if (write_until_data_queued() > 0)
886 send_handle_and_close();
887
888 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
889 ASSERT(r == 0);
890
891 ASSERT(closed_handle_write == 1);
892
893 MAKE_VALGRIND_HAPPY();
894 return 0;
895}
896
897
898int ipc_helper_bind_twice(void) {
899 /*
900 * This is launched from test-ipc.c. stdin is a duplex channel
901 * over which two handles will be transmitted.
902 */
903 struct sockaddr_in addr;
904 int r;
905 uv_buf_t buf;
906
907 ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
908
909 r = uv_pipe_init(uv_default_loop(), &channel, 1);
910 ASSERT(r == 0);
911
912 uv_pipe_open(&channel, 0);
913
914 ASSERT(1 == uv_is_readable((uv_stream_t*) &channel));
915 ASSERT(1 == uv_is_writable((uv_stream_t*) &channel));
916 ASSERT(0 == uv_is_closing((uv_handle_t*) &channel));
917
918 buf = uv_buf_init("hello\n", 6);
919
920 r = uv_tcp_init(uv_default_loop(), &tcp_server);
921 ASSERT(r == 0);
922 r = uv_tcp_init(uv_default_loop(), &tcp_server2);
923 ASSERT(r == 0);
924
925 r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
926 ASSERT(r == 0);
927 r = uv_tcp_bind(&tcp_server2, (const struct sockaddr*) &addr, 0);
928 ASSERT(r == 0);
929
930 r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
931 (uv_stream_t*)&tcp_server, NULL);
932 ASSERT(r == 0);
933 r = uv_write2(&write_req2, (uv_stream_t*)&channel, &buf, 1,
934 (uv_stream_t*)&tcp_server2, NULL);
935 ASSERT(r == 0);
936
937 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
938 ASSERT(r == 0);
939
940 MAKE_VALGRIND_HAPPY();
941 return 0;
942}
943
944int ipc_helper_send_zero(void) {
945 int r;
946 uv_buf_t zero_buf;
947
948 zero_buf = uv_buf_init(0, 0);
949
950 r = uv_pipe_init(uv_default_loop(), &channel, 0);
951 ASSERT(r == 0);
952
953 uv_pipe_open(&channel, 0);
954
955 ASSERT(1 == uv_is_readable((uv_stream_t*) &channel));
956 ASSERT(1 == uv_is_writable((uv_stream_t*) &channel));
957 ASSERT(0 == uv_is_closing((uv_handle_t*) &channel));
958
959 r = uv_write(&write_req,
960 (uv_stream_t*)&channel,
961 &zero_buf,
962 1,
963 send_zero_write_cb);
964
965 ASSERT(r == 0);
966
967 r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
968 ASSERT(r == 0);
969
970 ASSERT(send_zero_write == 1);
971
972 MAKE_VALGRIND_HAPPY();
973 return 0;
974}
975