Home | History | Annotate | Line # | Download | only in test
      1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
      2  *
      3  * Permission is hereby granted, free of charge, to any person obtaining a copy
      4  * of this software and associated documentation files (the "Software"), to
      5  * deal in the Software without restriction, including without limitation the
      6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
      7  * sell copies of the Software, and to permit persons to whom the Software is
      8  * furnished to do so, subject to the following conditions:
      9  *
     10  * The above copyright notice and this permission notice shall be included in
     11  * all copies or substantial portions of the Software.
     12  *
     13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
     18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
     19  * IN THE SOFTWARE.
     20  */
     21 
     22 #include "uv.h"
     23 #include "task.h"
     24 
     25 #include <stdio.h>
     26 #include <string.h>
     27 
     28 static uv_pipe_t channel;
     29 static uv_tcp_t tcp_server;
     30 static uv_tcp_t tcp_server2;
     31 static uv_tcp_t tcp_connection;
     32 
     33 static int exit_cb_called;
     34 static int read_cb_called;
     35 static int tcp_write_cb_called;
     36 static int tcp_read_cb_called;
     37 static int on_pipe_read_called;
     38 static int local_conn_accepted;
     39 static int remote_conn_accepted;
     40 static int tcp_server_listening;
     41 static uv_write_t write_req;
     42 static uv_write_t write_req2;
     43 static uv_write_t conn_notify_req;
     44 static int close_cb_called;
     45 static int connection_accepted;
     46 static int tcp_conn_read_cb_called;
     47 static int tcp_conn_write_cb_called;
     48 static int send_zero_write;
     49 
/* Bundles a TCP handle with the connect and write requests issued on it. */
typedef struct {
  uv_connect_t conn_req;    /* Request passed to uv_tcp_connect(). */
  uv_write_t tcp_write_req; /* Request passed to uv_write() on the handle. */
  uv_tcp_t conn;            /* The TCP handle itself. */
} tcp_conn;
     55 
/* Number of client connections opened by make_many_connections(). */
#define CONN_COUNT 100
/* Backlog argument given to uv_listen() by the IPC tests and helpers. */
#define BACKLOG 128
     58 
     59 
     60 static void close_server_conn_cb(uv_handle_t* handle) {
     61   free(handle);
     62 }
     63 
     64 
     65 static void on_connection(uv_stream_t* server, int status) {
     66   uv_tcp_t* conn;
     67   int r;
     68 
     69   if (!local_conn_accepted) {
     70     /* Accept the connection and close it.  Also and close the server. */
     71     ASSERT_OK(status);
     72     ASSERT_PTR_EQ(&tcp_server, server);
     73 
     74     conn = malloc(sizeof(*conn));
     75     ASSERT_NOT_NULL(conn);
     76     r = uv_tcp_init(server->loop, conn);
     77     ASSERT_OK(r);
     78 
     79     r = uv_accept(server, (uv_stream_t*)conn);
     80     ASSERT_OK(r);
     81 
     82     uv_close((uv_handle_t*)conn, close_server_conn_cb);
     83     uv_close((uv_handle_t*)server, NULL);
     84     local_conn_accepted = 1;
     85   }
     86 }
     87 
     88 
     89 static void exit_cb(uv_process_t* process,
     90                     int64_t exit_status,
     91                     int term_signal) {
     92   printf("exit_cb\n");
     93   exit_cb_called++;
     94   ASSERT_OK(exit_status);
     95   ASSERT_OK(term_signal);
     96   uv_close((uv_handle_t*)process, NULL);
     97 }
     98 
     99 
    100 static void on_alloc(uv_handle_t* handle,
    101                      size_t suggested_size,
    102                      uv_buf_t* buf) {
    103   buf->base = malloc(suggested_size);
    104   buf->len = suggested_size;
    105 }
    106 
    107 
    108 static void close_client_conn_cb(uv_handle_t* handle) {
    109   tcp_conn* p = (tcp_conn*)handle->data;
    110   free(p);
    111 }
    112 
    113 
    114 static void connect_cb(uv_connect_t* req, int status) {
    115   uv_close((uv_handle_t*)req->handle, close_client_conn_cb);
    116 }
    117 
    118 
    119 static void make_many_connections(void) {
    120   tcp_conn* conn;
    121   struct sockaddr_in addr;
    122   int r, i;
    123 
    124   for (i = 0; i < CONN_COUNT; i++) {
    125     conn = malloc(sizeof(*conn));
    126     ASSERT_NOT_NULL(conn);
    127 
    128     r = uv_tcp_init(uv_default_loop(), &conn->conn);
    129     ASSERT_OK(r);
    130     ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
    131 
    132     r = uv_tcp_connect(&conn->conn_req,
    133                        (uv_tcp_t*) &conn->conn,
    134                        (const struct sockaddr*) &addr,
    135                        connect_cb);
    136     ASSERT_OK(r);
    137 
    138     conn->conn.data = conn;
    139   }
    140 }
    141 
    142 
    143 static void on_read(uv_stream_t* handle,
    144                     ssize_t nread,
    145                     const uv_buf_t* buf) {
    146   int r;
    147   uv_pipe_t* pipe;
    148   uv_handle_type pending;
    149   uv_buf_t outbuf;
    150 
    151   pipe = (uv_pipe_t*) handle;
    152 
    153   if (nread == 0) {
    154     /* Everything OK, but nothing read. */
    155     free(buf->base);
    156     return;
    157   }
    158 
    159   if (nread < 0) {
    160     if (nread == UV_EOF) {
    161       free(buf->base);
    162       return;
    163     }
    164 
    165     printf("error recving on channel: %s\n", uv_strerror(nread));
    166     abort();
    167   }
    168 
    169   fprintf(stderr, "got %d bytes\n", (int)nread);
    170 
    171   pending = uv_pipe_pending_type(pipe);
    172   if (!tcp_server_listening) {
    173     ASSERT_EQ(1, uv_pipe_pending_count(pipe));
    174     ASSERT_GT(nread, 0);
    175     ASSERT_NOT_NULL(buf->base);
    176     ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
    177     read_cb_called++;
    178 
    179     /* Accept the pending TCP server, and start listening on it. */
    180     ASSERT_EQ(pending, UV_TCP);
    181     r = uv_tcp_init(uv_default_loop(), &tcp_server);
    182     ASSERT_OK(r);
    183 
    184     r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
    185     ASSERT_OK(r);
    186 
    187     r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection);
    188     ASSERT_OK(r);
    189 
    190     tcp_server_listening = 1;
    191 
    192     /* Make sure that the expected data is correctly multiplexed. */
    193     ASSERT_MEM_EQ("hello\n", buf->base, nread);
    194 
    195     outbuf = uv_buf_init("foobar\n", 7);
    196     r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL);
    197     ASSERT_OK(r);
    198 
    199     /* Create a bunch of connections to get both servers to accept. */
    200     make_many_connections();
    201   } else if (memcmp("accepted_connection\n", buf->base, nread) == 0) {
    202     /* Remote server has accepted a connection.  Close the channel. */
    203     ASSERT_OK(uv_pipe_pending_count(pipe));
    204     ASSERT_EQ(pending, UV_UNKNOWN_HANDLE);
    205     remote_conn_accepted = 1;
    206     uv_close((uv_handle_t*)&channel, NULL);
    207   }
    208 
    209   free(buf->base);
    210 }
    211 
    212 #ifdef _WIN32
    213 static void on_read_listen_after_bound_twice(uv_stream_t* handle,
    214                                              ssize_t nread,
    215                                              const uv_buf_t* buf) {
    216   int r;
    217   uv_pipe_t* pipe;
    218   uv_handle_type pending;
    219 
    220   pipe = (uv_pipe_t*) handle;
    221 
    222   if (nread == 0) {
    223     /* Everything OK, but nothing read. */
    224     free(buf->base);
    225     return;
    226   }
    227 
    228   if (nread < 0) {
    229     if (nread == UV_EOF) {
    230       free(buf->base);
    231       return;
    232     }
    233 
    234     printf("error recving on channel: %s\n", uv_strerror(nread));
    235     abort();
    236   }
    237 
    238   fprintf(stderr, "got %d bytes\n", (int)nread);
    239 
    240   ASSERT_GT(uv_pipe_pending_count(pipe), 0);
    241   pending = uv_pipe_pending_type(pipe);
    242   ASSERT_GT(nread, 0);
    243   ASSERT_NOT_NULL(buf->base);
    244   ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
    245   read_cb_called++;
    246 
    247   if (read_cb_called == 1) {
    248     /* Accept the first TCP server, and start listening on it. */
    249     ASSERT_EQ(pending, UV_TCP);
    250     r = uv_tcp_init(uv_default_loop(), &tcp_server);
    251     ASSERT_OK(r);
    252 
    253     r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
    254     ASSERT_OK(r);
    255 
    256     r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection);
    257     ASSERT_OK(r);
    258   } else if (read_cb_called == 2) {
    259     /* Accept the second TCP server, and start listening on it. */
    260     ASSERT_EQ(pending, UV_TCP);
    261     r = uv_tcp_init(uv_default_loop(), &tcp_server2);
    262     ASSERT_OK(r);
    263 
    264     r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server2);
    265     ASSERT_OK(r);
    266 
    267     r = uv_listen((uv_stream_t*)&tcp_server2, BACKLOG, on_connection);
    268     ASSERT_EQ(r, UV_EADDRINUSE);
    269 
    270     uv_close((uv_handle_t*)&tcp_server, NULL);
    271     uv_close((uv_handle_t*)&tcp_server2, NULL);
    272     ASSERT_OK(uv_pipe_pending_count(pipe));
    273     uv_close((uv_handle_t*)&channel, NULL);
    274   }
    275 
    276   free(buf->base);
    277 }
    278 #endif
    279 
    280 void spawn_helper(uv_pipe_t* channel,
    281                   uv_process_t* process,
    282                   const char* helper) {
    283   uv_process_options_t options;
    284   size_t exepath_size;
    285   char exepath[1024];
    286   char* args[3];
    287   int r;
    288   uv_stdio_container_t stdio[3];
    289 
    290   r = uv_pipe_init(uv_default_loop(), channel, 1);
    291   ASSERT_OK(r);
    292   ASSERT_NE(0, channel->ipc);
    293 
    294   exepath_size = sizeof(exepath);
    295   r = uv_exepath(exepath, &exepath_size);
    296   ASSERT_OK(r);
    297 
    298   exepath[exepath_size] = '\0';
    299   args[0] = exepath;
    300   args[1] = (char*)helper;
    301   args[2] = NULL;
    302 
    303   memset(&options, 0, sizeof(options));
    304   options.file = exepath;
    305   options.args = args;
    306   options.exit_cb = exit_cb;
    307   options.stdio = stdio;
    308   options.stdio_count = ARRAY_SIZE(stdio);
    309 
    310   stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE;
    311   stdio[0].data.stream = (uv_stream_t*) channel;
    312   stdio[1].flags = UV_INHERIT_FD;
    313   stdio[1].data.fd = 1;
    314   stdio[2].flags = UV_INHERIT_FD;
    315   stdio[2].data.fd = 2;
    316 
    317   r = uv_spawn(uv_default_loop(), process, &options);
    318   ASSERT_OK(r);
    319 }
    320 
    321 
    322 static void on_tcp_write(uv_write_t* req, int status) {
    323   ASSERT_OK(status);
    324   ASSERT_PTR_EQ(req->handle, &tcp_connection);
    325   tcp_write_cb_called++;
    326 }
    327 
    328 
    329 static void on_read_alloc(uv_handle_t* handle,
    330                           size_t suggested_size,
    331                           uv_buf_t* buf) {
    332   buf->base = malloc(suggested_size);
    333   buf->len = suggested_size;
    334 }
    335 
    336 
    337 static void on_tcp_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) {
    338   ASSERT_GT(nread, 0);
    339   ASSERT_MEM_EQ("hello again\n", buf->base, nread);
    340   ASSERT_PTR_EQ(tcp, &tcp_connection);
    341   free(buf->base);
    342 
    343   tcp_read_cb_called++;
    344 
    345   uv_close((uv_handle_t*)tcp, NULL);
    346   uv_close((uv_handle_t*)&channel, NULL);
    347 }
    348 
    349 
    350 static void on_read_connection(uv_stream_t* handle,
    351                                ssize_t nread,
    352                                const uv_buf_t* buf) {
    353   int r;
    354   uv_buf_t outbuf;
    355   uv_pipe_t* pipe;
    356   uv_handle_type pending;
    357 
    358   pipe = (uv_pipe_t*) handle;
    359   if (nread == 0) {
    360     /* Everything OK, but nothing read. */
    361     free(buf->base);
    362     return;
    363   }
    364 
    365   if (nread < 0) {
    366     if (nread == UV_EOF) {
    367       free(buf->base);
    368       return;
    369     }
    370 
    371     printf("error recving on channel: %s\n", uv_strerror(nread));
    372     abort();
    373   }
    374 
    375   fprintf(stderr, "got %d bytes\n", (int)nread);
    376 
    377   ASSERT_EQ(1, uv_pipe_pending_count(pipe));
    378   pending = uv_pipe_pending_type(pipe);
    379 
    380   ASSERT_GT(nread, 0);
    381   ASSERT_NOT_NULL(buf->base);
    382   ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
    383   read_cb_called++;
    384 
    385   /* Accept the pending TCP connection */
    386   ASSERT_EQ(pending, UV_TCP);
    387   r = uv_tcp_init(uv_default_loop(), &tcp_connection);
    388   ASSERT_OK(r);
    389 
    390   r = uv_accept(handle, (uv_stream_t*)&tcp_connection);
    391   ASSERT_OK(r);
    392 
    393   /* Make sure that the expected data is correctly multiplexed. */
    394   ASSERT_MEM_EQ("hello\n", buf->base, nread);
    395 
    396   /* Write/read to/from the connection */
    397   outbuf = uv_buf_init("world\n", 6);
    398   r = uv_write(&write_req, (uv_stream_t*)&tcp_connection, &outbuf, 1,
    399     on_tcp_write);
    400   ASSERT_OK(r);
    401 
    402   r = uv_read_start((uv_stream_t*)&tcp_connection, on_read_alloc, on_tcp_read);
    403   ASSERT_OK(r);
    404 
    405   free(buf->base);
    406 }
    407 
    408 
    409 static void on_read_send_zero(uv_stream_t* handle,
    410                               ssize_t nread,
    411                               const uv_buf_t* buf) {
    412   ASSERT(nread == 0 || nread == UV_EOF);
    413   free(buf->base);
    414 }
    415 
    416 
    417 static int run_ipc_test(const char* helper, uv_read_cb read_cb) {
    418   uv_process_t process;
    419   int r;
    420 
    421   spawn_helper(&channel, &process, helper);
    422   uv_read_start((uv_stream_t*)&channel, on_alloc, read_cb);
    423 
    424   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
    425   ASSERT_OK(r);
    426 
    427   MAKE_VALGRIND_HAPPY(uv_default_loop());
    428   return 0;
    429 }
    430 
    431 
    432 TEST_IMPL(ipc_listen_before_write) {
    433 #if defined(NO_SEND_HANDLE_ON_PIPE)
    434   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
    435 #endif
    436   int r = run_ipc_test("ipc_helper_listen_before_write", on_read);
    437   ASSERT_EQ(1, local_conn_accepted);
    438   ASSERT_EQ(1, remote_conn_accepted);
    439   ASSERT_EQ(1, read_cb_called);
    440   ASSERT_EQ(1, exit_cb_called);
    441   return r;
    442 }
    443 
    444 
    445 TEST_IMPL(ipc_listen_after_write) {
    446 #if defined(NO_SEND_HANDLE_ON_PIPE)
    447   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
    448 #endif
    449   int r = run_ipc_test("ipc_helper_listen_after_write", on_read);
    450   ASSERT_EQ(1, local_conn_accepted);
    451   ASSERT_EQ(1, remote_conn_accepted);
    452   ASSERT_EQ(1, read_cb_called);
    453   ASSERT_EQ(1, exit_cb_called);
    454   return r;
    455 }
    456 
    457 
    458 TEST_IMPL(ipc_tcp_connection) {
    459 #if defined(NO_SEND_HANDLE_ON_PIPE)
    460   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
    461 #endif
    462   int r = run_ipc_test("ipc_helper_tcp_connection", on_read_connection);
    463   ASSERT_EQ(1, read_cb_called);
    464   ASSERT_EQ(1, tcp_write_cb_called);
    465   ASSERT_EQ(1, tcp_read_cb_called);
    466   ASSERT_EQ(1, exit_cb_called);
    467   return r;
    468 }
    469 
    470 
    471 #ifdef _WIN32
    472 TEST_IMPL(listen_with_simultaneous_accepts) {
    473   uv_tcp_t server;
    474   int r;
    475   struct sockaddr_in addr;
    476 
    477   ASSERT_OK(uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
    478 
    479   r = uv_tcp_init(uv_default_loop(), &server);
    480   ASSERT_OK(r);
    481 
    482   r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
    483   ASSERT_OK(r);
    484 
    485   r = uv_tcp_simultaneous_accepts(&server, 1);
    486   ASSERT_OK(r);
    487 
    488   r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
    489   ASSERT_OK(r);
    490   ASSERT_EQ(32, server.reqs_pending);
    491 
    492   MAKE_VALGRIND_HAPPY(uv_default_loop());
    493   return 0;
    494 }
    495 
    496 
    497 TEST_IMPL(listen_no_simultaneous_accepts) {
    498   uv_tcp_t server;
    499   int r;
    500   struct sockaddr_in addr;
    501 
    502   ASSERT_OK(uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
    503 
    504   r = uv_tcp_init(uv_default_loop(), &server);
    505   ASSERT_OK(r);
    506 
    507   r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
    508   ASSERT_OK(r);
    509 
    510   r = uv_tcp_simultaneous_accepts(&server, 0);
    511   ASSERT_OK(r);
    512 
    513   r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
    514   ASSERT_OK(r);
    515   ASSERT_EQ(1, server.reqs_pending);
    516 
    517   MAKE_VALGRIND_HAPPY(uv_default_loop());
    518   return 0;
    519 }
    520 
    521 TEST_IMPL(ipc_listen_after_bind_twice) {
    522 #if defined(NO_SEND_HANDLE_ON_PIPE)
    523   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
    524 #endif
    525   int r = run_ipc_test("ipc_helper_bind_twice", on_read_listen_after_bound_twice);
    526   ASSERT_EQ(2, read_cb_called);
    527   ASSERT_EQ(1, exit_cb_called);
    528   return r;
    529 }
    530 #endif
    531 
    532 TEST_IMPL(ipc_send_zero) {
    533   int r;
    534   r = run_ipc_test("ipc_helper_send_zero", on_read_send_zero);
    535   ASSERT_OK(r);
    536   return 0;
    537 }
    538 
    539 
    540 /* Everything here runs in a child process. */
    541 
/* Single connection used by the helper code below: its TCP handle is either
 * accepted from tcp_server (ipc_on_connection) or connected to it
 * (ipc_helper_tcp_connection). */
static tcp_conn conn;
    543 
    544 
    545 static void close_cb(uv_handle_t* handle) {
    546   close_cb_called++;
    547 }
    548 
    549 
    550 static void conn_notify_write_cb(uv_write_t* req, int status) {
    551   uv_close((uv_handle_t*)&tcp_server, close_cb);
    552   uv_close((uv_handle_t*)&channel, close_cb);
    553 }
    554 
    555 
    556 static void tcp_connection_write_cb(uv_write_t* req, int status) {
    557   ASSERT_PTR_EQ(&conn.conn, req->handle);
    558   uv_close((uv_handle_t*)req->handle, close_cb);
    559   uv_close((uv_handle_t*)&channel, close_cb);
    560   uv_close((uv_handle_t*)&tcp_server, close_cb);
    561   tcp_conn_write_cb_called++;
    562 }
    563 
    564 
    565 static void send_zero_write_cb(uv_write_t* req, int status) {
    566   ASSERT_OK(status);
    567   send_zero_write++;
    568 }
    569 
    570 static void on_tcp_child_process_read(uv_stream_t* tcp,
    571                                       ssize_t nread,
    572                                       const uv_buf_t* buf) {
    573   uv_buf_t outbuf;
    574   int r;
    575 
    576   if (nread < 0) {
    577     if (nread == UV_EOF) {
    578       free(buf->base);
    579       return;
    580     }
    581 
    582     printf("error recving on tcp connection: %s\n", uv_strerror(nread));
    583     abort();
    584   }
    585 
    586   ASSERT_GT(nread, 0);
    587   ASSERT_MEM_EQ("world\n", buf->base, nread);
    588   on_pipe_read_called++;
    589   free(buf->base);
    590 
    591   /* Write to the socket */
    592   outbuf = uv_buf_init("hello again\n", 12);
    593   r = uv_write(&conn.tcp_write_req, tcp, &outbuf, 1, tcp_connection_write_cb);
    594   ASSERT_OK(r);
    595 
    596   tcp_conn_read_cb_called++;
    597 }
    598 
    599 
    600 static void connect_child_process_cb(uv_connect_t* req, int status) {
    601   int r;
    602 
    603   ASSERT_OK(status);
    604   r = uv_read_start(req->handle, on_read_alloc, on_tcp_child_process_read);
    605   ASSERT_OK(r);
    606 }
    607 
    608 
    609 static void ipc_on_connection(uv_stream_t* server, int status) {
    610   int r;
    611   uv_buf_t buf;
    612 
    613   if (!connection_accepted) {
    614     /*
    615      * Accept the connection and close it.  Also let the other
    616      * side know.
    617      */
    618     ASSERT_OK(status);
    619     ASSERT_PTR_EQ(&tcp_server, server);
    620 
    621     r = uv_tcp_init(server->loop, &conn.conn);
    622     ASSERT_OK(r);
    623 
    624     r = uv_accept(server, (uv_stream_t*)&conn.conn);
    625     ASSERT_OK(r);
    626 
    627     uv_close((uv_handle_t*)&conn.conn, close_cb);
    628 
    629     buf = uv_buf_init("accepted_connection\n", 20);
    630     r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
    631       NULL, conn_notify_write_cb);
    632     ASSERT_OK(r);
    633 
    634     connection_accepted = 1;
    635   }
    636 }
    637 
    638 
    639 static void close_and_free_cb(uv_handle_t* handle) {
    640   close_cb_called++;
    641   free(handle);
    642 }
    643 
    644 static void ipc_on_connection_tcp_conn(uv_stream_t* server, int status) {
    645   int r;
    646   uv_buf_t buf;
    647   uv_tcp_t* conn;
    648 
    649   ASSERT_OK(status);
    650   ASSERT_PTR_EQ(&tcp_server, server);
    651 
    652   conn = malloc(sizeof(*conn));
    653   ASSERT_NOT_NULL(conn);
    654 
    655   r = uv_tcp_init(server->loop, conn);
    656   ASSERT_OK(r);
    657 
    658   r = uv_accept(server, (uv_stream_t*)conn);
    659   ASSERT_OK(r);
    660 
    661   /* Send the accepted connection to the other process */
    662   buf = uv_buf_init("hello\n", 6);
    663   r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
    664     (uv_stream_t*)conn, NULL);
    665   ASSERT_OK(r);
    666 
    667   r = uv_read_start((uv_stream_t*) conn,
    668                     on_read_alloc,
    669                     on_tcp_child_process_read);
    670   ASSERT_OK(r);
    671 
    672   uv_close((uv_handle_t*)conn, close_and_free_cb);
    673 }
    674 
    675 
    676 int ipc_helper(int listen_after_write) {
    677   /*
    678    * This is launched from test-ipc.c. stdin is a duplex channel that we
    679    * over which a handle will be transmitted.
    680    */
    681   struct sockaddr_in addr;
    682   int r;
    683   uv_buf_t buf;
    684 
    685   ASSERT_OK(uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
    686 
    687   r = uv_pipe_init(uv_default_loop(), &channel, 1);
    688   ASSERT_OK(r);
    689 
    690   uv_pipe_open(&channel, 0);
    691 
    692   ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
    693   ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
    694   ASSERT_OK(uv_is_closing((uv_handle_t*) &channel));
    695 
    696   r = uv_tcp_init(uv_default_loop(), &tcp_server);
    697   ASSERT_OK(r);
    698 
    699   r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
    700   ASSERT_OK(r);
    701 
    702   if (!listen_after_write) {
    703     r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection);
    704     ASSERT_OK(r);
    705   }
    706 
    707   buf = uv_buf_init("hello\n", 6);
    708   r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
    709       (uv_stream_t*)&tcp_server, NULL);
    710   ASSERT_OK(r);
    711 
    712   if (listen_after_write) {
    713     r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection);
    714     ASSERT_OK(r);
    715   }
    716 
    717   notify_parent_process();
    718   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
    719   ASSERT_OK(r);
    720 
    721   ASSERT_EQ(1, connection_accepted);
    722   ASSERT_EQ(3, close_cb_called);
    723 
    724   MAKE_VALGRIND_HAPPY(uv_default_loop());
    725   return 0;
    726 }
    727 
    728 
    729 int ipc_helper_tcp_connection(void) {
    730   /*
    731    * This is launched from test-ipc.c. stdin is a duplex channel
    732    * over which a handle will be transmitted.
    733    */
    734 
    735   int r;
    736   struct sockaddr_in addr;
    737 
    738   r = uv_pipe_init(uv_default_loop(), &channel, 1);
    739   ASSERT_OK(r);
    740 
    741   uv_pipe_open(&channel, 0);
    742 
    743   ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
    744   ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
    745   ASSERT_OK(uv_is_closing((uv_handle_t*) &channel));
    746 
    747   r = uv_tcp_init(uv_default_loop(), &tcp_server);
    748   ASSERT_OK(r);
    749 
    750   ASSERT_OK(uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
    751 
    752   r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
    753   ASSERT_OK(r);
    754 
    755   r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection_tcp_conn);
    756   ASSERT_OK(r);
    757 
    758   /* Make a connection to the server */
    759   r = uv_tcp_init(uv_default_loop(), &conn.conn);
    760   ASSERT_OK(r);
    761 
    762   ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
    763 
    764   r = uv_tcp_connect(&conn.conn_req,
    765                      (uv_tcp_t*) &conn.conn,
    766                      (const struct sockaddr*) &addr,
    767                      connect_child_process_cb);
    768   ASSERT_OK(r);
    769 
    770   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
    771   ASSERT_OK(r);
    772 
    773   ASSERT_EQ(1, tcp_conn_read_cb_called);
    774   ASSERT_EQ(1, tcp_conn_write_cb_called);
    775   ASSERT_EQ(4, close_cb_called);
    776 
    777   MAKE_VALGRIND_HAPPY(uv_default_loop());
    778   return 0;
    779 }
    780 
    781 int ipc_helper_bind_twice(void) {
    782   /*
    783    * This is launched from test-ipc.c. stdin is a duplex channel
    784    * over which two handles will be transmitted.
    785    */
    786   struct sockaddr_in addr;
    787   int r;
    788   uv_buf_t buf;
    789 
    790   ASSERT_OK(uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
    791 
    792   r = uv_pipe_init(uv_default_loop(), &channel, 1);
    793   ASSERT_OK(r);
    794 
    795   uv_pipe_open(&channel, 0);
    796 
    797   ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
    798   ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
    799   ASSERT_OK(uv_is_closing((uv_handle_t*) &channel));
    800 
    801   buf = uv_buf_init("hello\n", 6);
    802 
    803   r = uv_tcp_init(uv_default_loop(), &tcp_server);
    804   ASSERT_OK(r);
    805   r = uv_tcp_init(uv_default_loop(), &tcp_server2);
    806   ASSERT_OK(r);
    807 
    808   r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
    809   ASSERT_OK(r);
    810   r = uv_tcp_bind(&tcp_server2, (const struct sockaddr*) &addr, 0);
    811   ASSERT_OK(r);
    812 
    813   r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
    814                 (uv_stream_t*)&tcp_server, NULL);
    815   ASSERT_OK(r);
    816   r = uv_write2(&write_req2, (uv_stream_t*)&channel, &buf, 1,
    817                 (uv_stream_t*)&tcp_server2, NULL);
    818   ASSERT_OK(r);
    819 
    820   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
    821   ASSERT_OK(r);
    822 
    823   MAKE_VALGRIND_HAPPY(uv_default_loop());
    824   return 0;
    825 }
    826 
    827 int ipc_helper_send_zero(void) {
    828   int r;
    829   uv_buf_t zero_buf;
    830 
    831   zero_buf = uv_buf_init(0, 0);
    832 
    833   r = uv_pipe_init(uv_default_loop(), &channel, 0);
    834   ASSERT_OK(r);
    835 
    836   uv_pipe_open(&channel, 0);
    837 
    838   ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
    839   ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
    840   ASSERT_OK(uv_is_closing((uv_handle_t*) &channel));
    841 
    842   r = uv_write(&write_req,
    843                (uv_stream_t*)&channel,
    844                &zero_buf,
    845                1,
    846                send_zero_write_cb);
    847 
    848   ASSERT_OK(r);
    849 
    850   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
    851   ASSERT_OK(r);
    852 
    853   ASSERT_EQ(1, send_zero_write);
    854 
    855   MAKE_VALGRIND_HAPPY(uv_default_loop());
    856   return 0;
    857 }
    858