Home | History | Annotate | Line # | Download | only in test
test-ipc-send-recv.c revision 1.1
      1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
      2  *
      3  * Permission is hereby granted, free of charge, to any person obtaining a copy
      4  * of this software and associated documentation files (the "Software"), to
      5  * deal in the Software without restriction, including without limitation the
      6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
      7  * sell copies of the Software, and to permit persons to whom the Software is
      8  * furnished to do so, subject to the following conditions:
      9  *
     10  * The above copyright notice and this permission notice shall be included in
     11  * all copies or substantial portions of the Software.
     12  *
     13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
     18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
     19  * IN THE SOFTWARE.
     20  */
     21 
     22 #include "uv.h"
     23 #include "task.h"
     24 
     25 #include <stdio.h>
     26 #include <string.h>
     27 
     28 /* See test-ipc.c */
     29 void spawn_helper(uv_pipe_t* channel,
     30                   uv_process_t* process,
     31                   const char* helper);
     32 
     33 void ipc_send_recv_helper_threadproc(void* arg);
     34 
     35 union handles {
     36   uv_handle_t handle;
     37   uv_stream_t stream;
     38   uv_pipe_t pipe;
     39   uv_tcp_t tcp;
     40   uv_tty_t tty;
     41 };
     42 
     43 struct test_ctx {
     44   uv_pipe_t channel;
     45   uv_connect_t connect_req;
     46   uv_write_t write_req;
     47   uv_write_t write_req2;
     48   uv_handle_type expected_type;
     49   union handles send;
     50   union handles send2;
     51   union handles recv;
     52   union handles recv2;
     53 };
     54 
     55 struct echo_ctx {
     56   uv_pipe_t listen;
     57   uv_pipe_t channel;
     58   uv_write_t write_req;
     59   uv_write_t write_req2;
     60   uv_handle_type expected_type;
     61   union handles recv;
     62   union handles recv2;
     63 };
     64 
     65 static struct test_ctx ctx;
     66 static struct echo_ctx ctx2;
     67 
     68 /* Used in write2_cb to decide if we need to cleanup or not */
     69 static int is_child_process;
     70 static int is_in_process;
     71 static int read_cb_count;
     72 static int recv_cb_count;
     73 static int write2_cb_called;
     74 
     75 
     76 static void alloc_cb(uv_handle_t* handle,
     77                      size_t suggested_size,
     78                      uv_buf_t* buf) {
     79   /* we're not actually reading anything so a small buffer is okay */
     80   static char slab[8];
     81   buf->base = slab;
     82   buf->len = sizeof(slab);
     83 }
     84 
     85 
     86 static void recv_cb(uv_stream_t* handle,
     87                     ssize_t nread,
     88                     const uv_buf_t* buf) {
     89   uv_handle_type pending;
     90   uv_pipe_t* pipe;
     91   int r;
     92   union handles* recv;
     93 
     94   pipe = (uv_pipe_t*) handle;
     95   ASSERT(pipe == &ctx.channel);
     96 
     97   do {
     98     if (++recv_cb_count == 1) {
     99       recv = &ctx.recv;
    100     } else {
    101       recv = &ctx.recv2;
    102     }
    103 
    104     /* Depending on the OS, the final recv_cb can be called after
    105      * the child process has terminated which can result in nread
    106      * being UV_EOF instead of the number of bytes read.  Since
    107      * the other end of the pipe has closed this UV_EOF is an
    108      * acceptable value. */
    109     if (nread == UV_EOF) {
    110       /* UV_EOF is only acceptable for the final recv_cb call */
    111       ASSERT(recv_cb_count == 2);
    112     } else {
    113       ASSERT(nread >= 0);
    114       ASSERT(uv_pipe_pending_count(pipe) > 0);
    115 
    116       pending = uv_pipe_pending_type(pipe);
    117       ASSERT(pending == ctx.expected_type);
    118 
    119       if (pending == UV_NAMED_PIPE)
    120         r = uv_pipe_init(ctx.channel.loop, &recv->pipe, 0);
    121       else if (pending == UV_TCP)
    122         r = uv_tcp_init(ctx.channel.loop, &recv->tcp);
    123       else
    124         abort();
    125       ASSERT(r == 0);
    126 
    127       r = uv_accept(handle, &recv->stream);
    128       ASSERT(r == 0);
    129     }
    130   } while (uv_pipe_pending_count(pipe) > 0);
    131 
    132   /* Close after two writes received */
    133   if (recv_cb_count == 2) {
    134     uv_close((uv_handle_t*)&ctx.channel, NULL);
    135   }
    136 }
    137 
    138 static void connect_cb(uv_connect_t* req, int status) {
    139   int r;
    140   uv_buf_t buf;
    141 
    142   ASSERT(req == &ctx.connect_req);
    143   ASSERT(status == 0);
    144 
    145   buf = uv_buf_init(".", 1);
    146   r = uv_write2(&ctx.write_req,
    147                 (uv_stream_t*)&ctx.channel,
    148                 &buf, 1,
    149                 &ctx.send.stream,
    150                 NULL);
    151   ASSERT(r == 0);
    152 
    153   /* Perform two writes to the same pipe to make sure that on Windows we are
    154    * not running into issue 505:
    155    *   https://github.com/libuv/libuv/issues/505 */
    156   buf = uv_buf_init(".", 1);
    157   r = uv_write2(&ctx.write_req2,
    158                 (uv_stream_t*)&ctx.channel,
    159                 &buf, 1,
    160                 &ctx.send2.stream,
    161                 NULL);
    162   ASSERT(r == 0);
    163 
    164   r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, recv_cb);
    165   ASSERT(r == 0);
    166 }
    167 
    168 static int run_test(int inprocess) {
    169   uv_process_t process;
    170   uv_thread_t tid;
    171   int r;
    172 
    173   if (inprocess) {
    174     r = uv_thread_create(&tid, ipc_send_recv_helper_threadproc, (void *) 42);
    175     ASSERT(r == 0);
    176 
    177     uv_sleep(1000);
    178 
    179     r = uv_pipe_init(uv_default_loop(), &ctx.channel, 1);
    180     ASSERT(r == 0);
    181 
    182     uv_pipe_connect(&ctx.connect_req, &ctx.channel, TEST_PIPENAME_3, connect_cb);
    183   } else {
    184     spawn_helper(&ctx.channel, &process, "ipc_send_recv_helper");
    185 
    186     connect_cb(&ctx.connect_req, 0);
    187   }
    188 
    189   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
    190   ASSERT(r == 0);
    191 
    192   ASSERT(recv_cb_count == 2);
    193 
    194   if (inprocess) {
    195     r = uv_thread_join(&tid);
    196     ASSERT(r == 0);
    197   }
    198 
    199   return 0;
    200 }
    201 
    202 static int run_ipc_send_recv_pipe(int inprocess) {
    203   int r;
    204 
    205   ctx.expected_type = UV_NAMED_PIPE;
    206 
    207   r = uv_pipe_init(uv_default_loop(), &ctx.send.pipe, 1);
    208   ASSERT(r == 0);
    209 
    210   r = uv_pipe_bind(&ctx.send.pipe, TEST_PIPENAME);
    211   ASSERT(r == 0);
    212 
    213   r = uv_pipe_init(uv_default_loop(), &ctx.send2.pipe, 1);
    214   ASSERT(r == 0);
    215 
    216   r = uv_pipe_bind(&ctx.send2.pipe, TEST_PIPENAME_2);
    217   ASSERT(r == 0);
    218 
    219   r = run_test(inprocess);
    220   ASSERT(r == 0);
    221 
    222   MAKE_VALGRIND_HAPPY();
    223   return 0;
    224 }
    225 
    226 TEST_IMPL(ipc_send_recv_pipe) {
    227 #if defined(NO_SEND_HANDLE_ON_PIPE)
    228   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
    229 #endif
    230   return run_ipc_send_recv_pipe(0);
    231 }
    232 
    233 TEST_IMPL(ipc_send_recv_pipe_inprocess) {
    234 #if defined(NO_SEND_HANDLE_ON_PIPE)
    235   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
    236 #endif
    237   return run_ipc_send_recv_pipe(1);
    238 }
    239 
    240 static int run_ipc_send_recv_tcp(int inprocess) {
    241   struct sockaddr_in addr;
    242   int r;
    243 
    244   ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
    245 
    246   ctx.expected_type = UV_TCP;
    247 
    248   r = uv_tcp_init(uv_default_loop(), &ctx.send.tcp);
    249   ASSERT(r == 0);
    250 
    251   r = uv_tcp_init(uv_default_loop(), &ctx.send2.tcp);
    252   ASSERT(r == 0);
    253 
    254   r = uv_tcp_bind(&ctx.send.tcp, (const struct sockaddr*) &addr, 0);
    255   ASSERT(r == 0);
    256 
    257   r = uv_tcp_bind(&ctx.send2.tcp, (const struct sockaddr*) &addr, 0);
    258   ASSERT(r == 0);
    259 
    260   r = run_test(inprocess);
    261   ASSERT(r == 0);
    262 
    263   MAKE_VALGRIND_HAPPY();
    264   return 0;
    265 }
    266 
    267 TEST_IMPL(ipc_send_recv_tcp) {
    268 #if defined(NO_SEND_HANDLE_ON_PIPE)
    269   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
    270 #endif
    271   return run_ipc_send_recv_tcp(0);
    272 }
    273 
    274 TEST_IMPL(ipc_send_recv_tcp_inprocess) {
    275 #if defined(NO_SEND_HANDLE_ON_PIPE)
    276   RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
    277 #endif
    278   return run_ipc_send_recv_tcp(1);
    279 }
    280 
    281 
    282 /* Everything here runs in a child process or second thread. */
    283 
    284 static void write2_cb(uv_write_t* req, int status) {
    285   ASSERT(status == 0);
    286 
    287   /* After two successful writes in the child process, allow the child
    288    * process to be closed. */
    289   if (++write2_cb_called == 2 && (is_child_process || is_in_process)) {
    290     uv_close(&ctx2.recv.handle, NULL);
    291     uv_close(&ctx2.recv2.handle, NULL);
    292     uv_close((uv_handle_t*)&ctx2.channel, NULL);
    293     uv_close((uv_handle_t*)&ctx2.listen, NULL);
    294   }
    295 }
    296 
    297 static void read_cb(uv_stream_t* handle,
    298                     ssize_t nread,
    299                     const uv_buf_t* rdbuf) {
    300   uv_buf_t wrbuf;
    301   uv_pipe_t* pipe;
    302   uv_handle_type pending;
    303   int r;
    304   union handles* recv;
    305   uv_write_t* write_req;
    306 
    307   if (nread == UV_EOF || nread == UV_ECONNABORTED) {
    308     return;
    309   }
    310 
    311   pipe = (uv_pipe_t*) handle;
    312   do {
    313     if (++read_cb_count == 2) {
    314       recv = &ctx2.recv;
    315       write_req = &ctx2.write_req;
    316     } else {
    317       recv = &ctx2.recv2;
    318       write_req = &ctx2.write_req2;
    319     }
    320 
    321     ASSERT(pipe == &ctx2.channel);
    322     ASSERT(nread >= 0);
    323     ASSERT(uv_pipe_pending_count(pipe) > 0);
    324 
    325     pending = uv_pipe_pending_type(pipe);
    326     ASSERT(pending == UV_NAMED_PIPE || pending == UV_TCP);
    327 
    328     if (pending == UV_NAMED_PIPE)
    329       r = uv_pipe_init(ctx2.channel.loop, &recv->pipe, 0);
    330     else if (pending == UV_TCP)
    331       r = uv_tcp_init(ctx2.channel.loop, &recv->tcp);
    332     else
    333       abort();
    334     ASSERT(r == 0);
    335 
    336     r = uv_accept(handle, &recv->stream);
    337     ASSERT(r == 0);
    338 
    339     wrbuf = uv_buf_init(".", 1);
    340     r = uv_write2(write_req,
    341                   (uv_stream_t*)&ctx2.channel,
    342                   &wrbuf,
    343                   1,
    344                   &recv->stream,
    345                   write2_cb);
    346     ASSERT(r == 0);
    347   } while (uv_pipe_pending_count(pipe) > 0);
    348 }
    349 
    350 static void send_recv_start(void) {
    351   int r;
    352   ASSERT(1 == uv_is_readable((uv_stream_t*)&ctx2.channel));
    353   ASSERT(1 == uv_is_writable((uv_stream_t*)&ctx2.channel));
    354   ASSERT(0 == uv_is_closing((uv_handle_t*)&ctx2.channel));
    355 
    356   r = uv_read_start((uv_stream_t*)&ctx2.channel, alloc_cb, read_cb);
    357   ASSERT(r == 0);
    358 }
    359 
    360 static void listen_cb(uv_stream_t* handle, int status) {
    361   int r;
    362   ASSERT(handle == (uv_stream_t*)&ctx2.listen);
    363   ASSERT(status == 0);
    364 
    365   r = uv_accept((uv_stream_t*)&ctx2.listen, (uv_stream_t*)&ctx2.channel);
    366   ASSERT(r == 0);
    367 
    368   send_recv_start();
    369 }
    370 
    371 int run_ipc_send_recv_helper(uv_loop_t* loop, int inprocess) {
    372   int r;
    373 
    374   is_in_process = inprocess;
    375 
    376   memset(&ctx2, 0, sizeof(ctx2));
    377 
    378   r = uv_pipe_init(loop, &ctx2.listen, 0);
    379   ASSERT(r == 0);
    380 
    381   r = uv_pipe_init(loop, &ctx2.channel, 1);
    382   ASSERT(r == 0);
    383 
    384   if (inprocess) {
    385     r = uv_pipe_bind(&ctx2.listen, TEST_PIPENAME_3);
    386     ASSERT(r == 0);
    387 
    388     r = uv_listen((uv_stream_t*)&ctx2.listen, SOMAXCONN, listen_cb);
    389     ASSERT(r == 0);
    390   } else {
    391     r = uv_pipe_open(&ctx2.channel, 0);
    392     ASSERT(r == 0);
    393 
    394     send_recv_start();
    395   }
    396 
    397   notify_parent_process();
    398   r = uv_run(loop, UV_RUN_DEFAULT);
    399   ASSERT(r == 0);
    400 
    401   return 0;
    402 }
    403 
    404 /* stdin is a duplex channel over which a handle is sent.
    405  * We receive it and send it back where it came from.
    406  */
    407 int ipc_send_recv_helper(void) {
    408   int r;
    409 
    410   r = run_ipc_send_recv_helper(uv_default_loop(), 0);
    411   ASSERT(r == 0);
    412 
    413   MAKE_VALGRIND_HAPPY();
    414   return 0;
    415 }
    416 
    417 void ipc_send_recv_helper_threadproc(void* arg) {
    418   int r;
    419   uv_loop_t loop;
    420 
    421   r = uv_loop_init(&loop);
    422   ASSERT(r == 0);
    423 
    424   r = run_ipc_send_recv_helper(&loop, 1);
    425   ASSERT(r == 0);
    426 
    427   r = uv_loop_close(&loop);
    428   ASSERT(r == 0);
    429 }
    430