Home | History | Annotate | Line # | Download | only in test
      1      1.1  christos /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
      2      1.1  christos  *
      3      1.1  christos  * Permission is hereby granted, free of charge, to any person obtaining a copy
      4      1.1  christos  * of this software and associated documentation files (the "Software"), to
      5      1.1  christos  * deal in the Software without restriction, including without limitation the
      6      1.1  christos  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
      7      1.1  christos  * sell copies of the Software, and to permit persons to whom the Software is
      8      1.1  christos  * furnished to do so, subject to the following conditions:
      9      1.1  christos  *
     10      1.1  christos  * The above copyright notice and this permission notice shall be included in
     11      1.1  christos  * all copies or substantial portions of the Software.
     12      1.1  christos  *
     13      1.1  christos  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     14      1.1  christos  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     15      1.1  christos  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     16      1.1  christos  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     17      1.1  christos  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
     18      1.1  christos  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
     19      1.1  christos  * IN THE SOFTWARE.
     20      1.1  christos  */
     21      1.1  christos 
     22      1.1  christos #include "uv.h"
     23      1.1  christos #include "task.h"
     24      1.1  christos 
     25      1.1  christos #include <stdio.h>
     26      1.1  christos #include <string.h>
     27      1.1  christos 
     28      1.1  christos /* See test-ipc.c */
     29      1.1  christos void spawn_helper(uv_pipe_t* channel,
     30      1.1  christos                   uv_process_t* process,
     31      1.1  christos                   const char* helper);
     32      1.1  christos 
     33      1.1  christos void ipc_send_recv_helper_threadproc(void* arg);
     34      1.1  christos 
     35      1.1  christos union handles {
     36      1.1  christos   uv_handle_t handle;
     37      1.1  christos   uv_stream_t stream;
     38      1.1  christos   uv_pipe_t pipe;
     39      1.1  christos   uv_tcp_t tcp;
     40      1.1  christos   uv_tty_t tty;
     41      1.1  christos };
     42      1.1  christos 
     43      1.1  christos struct test_ctx {
     44      1.1  christos   uv_pipe_t channel;
     45      1.1  christos   uv_connect_t connect_req;
     46      1.1  christos   uv_write_t write_req;
     47      1.1  christos   uv_write_t write_req2;
     48      1.1  christos   uv_handle_type expected_type;
     49      1.1  christos   union handles send;
     50      1.1  christos   union handles send2;
     51      1.1  christos   union handles recv;
     52      1.1  christos   union handles recv2;
     53      1.1  christos };
     54      1.1  christos 
     55      1.1  christos struct echo_ctx {
     56      1.1  christos   uv_pipe_t listen;
     57      1.1  christos   uv_pipe_t channel;
     58      1.1  christos   uv_write_t write_req;
     59      1.1  christos   uv_write_t write_req2;
     60      1.1  christos   uv_handle_type expected_type;
     61      1.1  christos   union handles recv;
     62      1.1  christos   union handles recv2;
     63      1.1  christos };
     64      1.1  christos 
     65      1.1  christos static struct test_ctx ctx;
     66      1.1  christos static struct echo_ctx ctx2;
     67      1.1  christos 
     68      1.1  christos /* Used in write2_cb to decide if we need to cleanup or not */
     69      1.1  christos static int is_child_process;
     70      1.1  christos static int is_in_process;
     71      1.1  christos static int read_cb_count;
     72      1.1  christos static int recv_cb_count;
     73      1.1  christos static int write2_cb_called;
     74      1.1  christos 
     75      1.1  christos 
     76      1.1  christos static void alloc_cb(uv_handle_t* handle,
     77      1.1  christos                      size_t suggested_size,
     78      1.1  christos                      uv_buf_t* buf) {
     79  1.1.1.3  christos   /* We're not actually reading anything so a small buffer is okay
     80  1.1.1.3  christos    * but it needs to be heap-allocated to appease TSan.
     81  1.1.1.3  christos    */
     82  1.1.1.3  christos   buf->len = 8;
     83  1.1.1.3  christos   buf->base = malloc(buf->len);
     84  1.1.1.3  christos   ASSERT_NOT_NULL(buf->base);
     85      1.1  christos }
     86      1.1  christos 
     87      1.1  christos 
     88      1.1  christos static void recv_cb(uv_stream_t* handle,
     89      1.1  christos                     ssize_t nread,
     90      1.1  christos                     const uv_buf_t* buf) {
     91      1.1  christos   uv_handle_type pending;
     92      1.1  christos   uv_pipe_t* pipe;
     93      1.1  christos   int r;
     94      1.1  christos   union handles* recv;
     95      1.1  christos 
     96  1.1.1.3  christos   free(buf->base);
     97  1.1.1.3  christos 
     98      1.1  christos   pipe = (uv_pipe_t*) handle;
     99  1.1.1.3  christos   ASSERT_PTR_EQ(pipe, &ctx.channel);
    100      1.1  christos 
    101      1.1  christos   do {
    102      1.1  christos     if (++recv_cb_count == 1) {
    103      1.1  christos       recv = &ctx.recv;
    104      1.1  christos     } else {
    105      1.1  christos       recv = &ctx.recv2;
    106      1.1  christos     }
    107      1.1  christos 
    108      1.1  christos     /* Depending on the OS, the final recv_cb can be called after
    109      1.1  christos      * the child process has terminated which can result in nread
    110      1.1  christos      * being UV_EOF instead of the number of bytes read.  Since
    111      1.1  christos      * the other end of the pipe has closed this UV_EOF is an
    112      1.1  christos      * acceptable value. */
    113      1.1  christos     if (nread == UV_EOF) {
    114      1.1  christos       /* UV_EOF is only acceptable for the final recv_cb call */
    115  1.1.1.3  christos       ASSERT_EQ(2, recv_cb_count);
    116      1.1  christos     } else {
    117  1.1.1.3  christos       ASSERT_GE(nread, 0);
    118  1.1.1.3  christos       ASSERT_GT(uv_pipe_pending_count(pipe), 0);
    119      1.1  christos 
    120      1.1  christos       pending = uv_pipe_pending_type(pipe);
    121  1.1.1.3  christos       ASSERT_EQ(pending, ctx.expected_type);
    122      1.1  christos 
    123      1.1  christos       if (pending == UV_NAMED_PIPE)
    124      1.1  christos         r = uv_pipe_init(ctx.channel.loop, &recv->pipe, 0);
    125      1.1  christos       else if (pending == UV_TCP)
    126      1.1  christos         r = uv_tcp_init(ctx.channel.loop, &recv->tcp);
    127      1.1  christos       else
    128      1.1  christos         abort();
    129  1.1.1.3  christos       ASSERT_OK(r);
    130      1.1  christos 
    131      1.1  christos       r = uv_accept(handle, &recv->stream);
    132  1.1.1.3  christos       ASSERT_OK(r);
    133      1.1  christos     }
    134      1.1  christos   } while (uv_pipe_pending_count(pipe) > 0);
    135      1.1  christos 
    136      1.1  christos   /* Close after two writes received */
    137      1.1  christos   if (recv_cb_count == 2) {
    138      1.1  christos     uv_close((uv_handle_t*)&ctx.channel, NULL);
    139      1.1  christos   }
    140      1.1  christos }
    141      1.1  christos 
    142      1.1  christos static void connect_cb(uv_connect_t* req, int status) {
    143      1.1  christos   int r;
    144      1.1  christos   uv_buf_t buf;
    145      1.1  christos 
    146  1.1.1.3  christos   ASSERT_PTR_EQ(req, &ctx.connect_req);
    147  1.1.1.3  christos   ASSERT_OK(status);
    148      1.1  christos 
    149      1.1  christos   buf = uv_buf_init(".", 1);
    150      1.1  christos   r = uv_write2(&ctx.write_req,
    151      1.1  christos                 (uv_stream_t*)&ctx.channel,
    152      1.1  christos                 &buf, 1,
    153      1.1  christos                 &ctx.send.stream,
    154      1.1  christos                 NULL);
    155  1.1.1.3  christos   ASSERT_OK(r);
    156      1.1  christos 
    157      1.1  christos   /* Perform two writes to the same pipe to make sure that on Windows we are
    158      1.1  christos    * not running into issue 505:
    159      1.1  christos    *   https://github.com/libuv/libuv/issues/505 */
    160      1.1  christos   buf = uv_buf_init(".", 1);
    161      1.1  christos   r = uv_write2(&ctx.write_req2,
    162      1.1  christos                 (uv_stream_t*)&ctx.channel,
    163      1.1  christos                 &buf, 1,
    164      1.1  christos                 &ctx.send2.stream,
    165      1.1  christos                 NULL);
    166  1.1.1.3  christos   ASSERT_OK(r);
    167      1.1  christos 
    168      1.1  christos   r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, recv_cb);
    169  1.1.1.3  christos   ASSERT_OK(r);
    170      1.1  christos }
    171      1.1  christos 
    172      1.1  christos static int run_test(int inprocess) {
    173      1.1  christos   uv_process_t process;
    174      1.1  christos   uv_thread_t tid;
    175      1.1  christos   int r;
    176      1.1  christos 
    177      1.1  christos   if (inprocess) {
    178      1.1  christos     r = uv_thread_create(&tid, ipc_send_recv_helper_threadproc, (void *) 42);
    179  1.1.1.3  christos     ASSERT_OK(r);
    180      1.1  christos 
    181      1.1  christos     uv_sleep(1000);
    182      1.1  christos 
    183      1.1  christos     r = uv_pipe_init(uv_default_loop(), &ctx.channel, 1);
    184  1.1.1.3  christos     ASSERT_OK(r);
    185      1.1  christos 
    186      1.1  christos     uv_pipe_connect(&ctx.connect_req, &ctx.channel, TEST_PIPENAME_3, connect_cb);
    187      1.1  christos   } else {
    188      1.1  christos     spawn_helper(&ctx.channel, &process, "ipc_send_recv_helper");
    189      1.1  christos 
    190      1.1  christos     connect_cb(&ctx.connect_req, 0);
    191      1.1  christos   }
    192      1.1  christos 
    193      1.1  christos   r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
    194  1.1.1.3  christos   ASSERT_OK(r);
    195      1.1  christos 
    196  1.1.1.3  christos   ASSERT_EQ(2, recv_cb_count);
    197      1.1  christos 
    198      1.1  christos   if (inprocess) {
    199      1.1  christos     r = uv_thread_join(&tid);
    200  1.1.1.3  christos     ASSERT_OK(r);
    201      1.1  christos   }
    202      1.1  christos 
    203      1.1  christos   return 0;
    204      1.1  christos }
    205      1.1  christos 
    206      1.1  christos static int run_ipc_send_recv_pipe(int inprocess) {
    207      1.1  christos   int r;
    208      1.1  christos 
    209      1.1  christos   ctx.expected_type = UV_NAMED_PIPE;
    210      1.1  christos 
    211      1.1  christos   r = uv_pipe_init(uv_default_loop(), &ctx.send.pipe, 1);
    212  1.1.1.3  christos   ASSERT_OK(r);
    213      1.1  christos 
    214      1.1  christos   r = uv_pipe_bind(&ctx.send.pipe, TEST_PIPENAME);
    215  1.1.1.3  christos   ASSERT_OK(r);
    216      1.1  christos 
    217      1.1  christos   r = uv_pipe_init(uv_default_loop(), &ctx.send2.pipe, 1);
    218  1.1.1.3  christos   ASSERT_OK(r);
    219      1.1  christos 
    220      1.1  christos   r = uv_pipe_bind(&ctx.send2.pipe, TEST_PIPENAME_2);
    221  1.1.1.3  christos   ASSERT_OK(r);
    222      1.1  christos 
    223      1.1  christos   r = run_test(inprocess);
    224  1.1.1.3  christos   ASSERT_OK(r);
    225      1.1  christos 
    226  1.1.1.3  christos   MAKE_VALGRIND_HAPPY(uv_default_loop());
    227      1.1  christos   return 0;
    228      1.1  christos }
    229      1.1  christos 
/* Pipe handles, helper running as a child process.  Skipped when
 * NO_SEND_HANDLE_ON_PIPE is defined.
 */
TEST_IMPL(ipc_send_recv_pipe) {
#ifdef NO_SEND_HANDLE_ON_PIPE
  RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
#endif
  return run_ipc_send_recv_pipe(0);
}
    236      1.1  christos 
/* Pipe handles, helper running in a second thread of this process.
 * Skipped when NO_SEND_HANDLE_ON_PIPE is defined.
 */
TEST_IMPL(ipc_send_recv_pipe_inprocess) {
#ifdef NO_SEND_HANDLE_ON_PIPE
  RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
#endif
  return run_ipc_send_recv_pipe(1);
}
    243      1.1  christos 
    244      1.1  christos static int run_ipc_send_recv_tcp(int inprocess) {
    245      1.1  christos   struct sockaddr_in addr;
    246      1.1  christos   int r;
    247      1.1  christos 
    248  1.1.1.3  christos   ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
    249      1.1  christos 
    250      1.1  christos   ctx.expected_type = UV_TCP;
    251      1.1  christos 
    252      1.1  christos   r = uv_tcp_init(uv_default_loop(), &ctx.send.tcp);
    253  1.1.1.3  christos   ASSERT_OK(r);
    254      1.1  christos 
    255      1.1  christos   r = uv_tcp_init(uv_default_loop(), &ctx.send2.tcp);
    256  1.1.1.3  christos   ASSERT_OK(r);
    257      1.1  christos 
    258      1.1  christos   r = uv_tcp_bind(&ctx.send.tcp, (const struct sockaddr*) &addr, 0);
    259  1.1.1.3  christos   ASSERT_OK(r);
    260      1.1  christos 
    261      1.1  christos   r = uv_tcp_bind(&ctx.send2.tcp, (const struct sockaddr*) &addr, 0);
    262  1.1.1.3  christos   ASSERT_OK(r);
    263      1.1  christos 
    264      1.1  christos   r = run_test(inprocess);
    265  1.1.1.3  christos   ASSERT_OK(r);
    266      1.1  christos 
    267  1.1.1.3  christos   MAKE_VALGRIND_HAPPY(uv_default_loop());
    268      1.1  christos   return 0;
    269      1.1  christos }
    270      1.1  christos 
/* TCP handles, helper running as a child process.  Skipped when
 * NO_SEND_HANDLE_ON_PIPE is defined.
 */
TEST_IMPL(ipc_send_recv_tcp) {
#ifdef NO_SEND_HANDLE_ON_PIPE
  RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
#endif
  return run_ipc_send_recv_tcp(0);
}
    277      1.1  christos 
/* TCP handles, helper running in a second thread of this process.
 * Skipped when NO_SEND_HANDLE_ON_PIPE is defined.
 */
TEST_IMPL(ipc_send_recv_tcp_inprocess) {
#ifdef NO_SEND_HANDLE_ON_PIPE
  RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
#endif
  return run_ipc_send_recv_tcp(1);
}
    284      1.1  christos 
    285      1.1  christos 
    286      1.1  christos /* Everything here runs in a child process or second thread. */
    287      1.1  christos 
    288      1.1  christos static void write2_cb(uv_write_t* req, int status) {
    289  1.1.1.3  christos   ASSERT_OK(status);
    290      1.1  christos 
    291      1.1  christos   /* After two successful writes in the child process, allow the child
    292      1.1  christos    * process to be closed. */
    293      1.1  christos   if (++write2_cb_called == 2 && (is_child_process || is_in_process)) {
    294      1.1  christos     uv_close(&ctx2.recv.handle, NULL);
    295      1.1  christos     uv_close(&ctx2.recv2.handle, NULL);
    296      1.1  christos     uv_close((uv_handle_t*)&ctx2.channel, NULL);
    297      1.1  christos     uv_close((uv_handle_t*)&ctx2.listen, NULL);
    298      1.1  christos   }
    299      1.1  christos }
    300      1.1  christos 
    301      1.1  christos static void read_cb(uv_stream_t* handle,
    302      1.1  christos                     ssize_t nread,
    303      1.1  christos                     const uv_buf_t* rdbuf) {
    304      1.1  christos   uv_buf_t wrbuf;
    305      1.1  christos   uv_pipe_t* pipe;
    306      1.1  christos   uv_handle_type pending;
    307      1.1  christos   int r;
    308      1.1  christos   union handles* recv;
    309      1.1  christos   uv_write_t* write_req;
    310      1.1  christos 
    311  1.1.1.3  christos   free(rdbuf->base);
    312  1.1.1.3  christos 
    313      1.1  christos   if (nread == UV_EOF || nread == UV_ECONNABORTED) {
    314      1.1  christos     return;
    315      1.1  christos   }
    316      1.1  christos 
    317  1.1.1.2  christos   ASSERT_GE(nread, 0);
    318  1.1.1.2  christos 
    319      1.1  christos   pipe = (uv_pipe_t*) handle;
    320  1.1.1.3  christos   ASSERT_PTR_EQ(pipe, &ctx2.channel);
    321  1.1.1.2  christos 
    322  1.1.1.2  christos   while (uv_pipe_pending_count(pipe) > 0) {
    323      1.1  christos     if (++read_cb_count == 2) {
    324      1.1  christos       recv = &ctx2.recv;
    325      1.1  christos       write_req = &ctx2.write_req;
    326      1.1  christos     } else {
    327      1.1  christos       recv = &ctx2.recv2;
    328      1.1  christos       write_req = &ctx2.write_req2;
    329      1.1  christos     }
    330      1.1  christos 
    331      1.1  christos     pending = uv_pipe_pending_type(pipe);
    332      1.1  christos     ASSERT(pending == UV_NAMED_PIPE || pending == UV_TCP);
    333      1.1  christos 
    334      1.1  christos     if (pending == UV_NAMED_PIPE)
    335      1.1  christos       r = uv_pipe_init(ctx2.channel.loop, &recv->pipe, 0);
    336      1.1  christos     else if (pending == UV_TCP)
    337      1.1  christos       r = uv_tcp_init(ctx2.channel.loop, &recv->tcp);
    338      1.1  christos     else
    339      1.1  christos       abort();
    340  1.1.1.3  christos     ASSERT_OK(r);
    341      1.1  christos 
    342      1.1  christos     r = uv_accept(handle, &recv->stream);
    343  1.1.1.3  christos     ASSERT_OK(r);
    344      1.1  christos 
    345      1.1  christos     wrbuf = uv_buf_init(".", 1);
    346      1.1  christos     r = uv_write2(write_req,
    347      1.1  christos                   (uv_stream_t*)&ctx2.channel,
    348      1.1  christos                   &wrbuf,
    349      1.1  christos                   1,
    350      1.1  christos                   &recv->stream,
    351      1.1  christos                   write2_cb);
    352  1.1.1.3  christos     ASSERT_OK(r);
    353  1.1.1.2  christos   }
    354      1.1  christos }
    355      1.1  christos 
    356      1.1  christos static void send_recv_start(void) {
    357      1.1  christos   int r;
    358  1.1.1.3  christos   ASSERT_EQ(1, uv_is_readable((uv_stream_t*)&ctx2.channel));
    359  1.1.1.3  christos   ASSERT_EQ(1, uv_is_writable((uv_stream_t*)&ctx2.channel));
    360  1.1.1.3  christos   ASSERT_OK(uv_is_closing((uv_handle_t*)&ctx2.channel));
    361      1.1  christos 
    362      1.1  christos   r = uv_read_start((uv_stream_t*)&ctx2.channel, alloc_cb, read_cb);
    363  1.1.1.3  christos   ASSERT_OK(r);
    364      1.1  christos }
    365      1.1  christos 
    366      1.1  christos static void listen_cb(uv_stream_t* handle, int status) {
    367      1.1  christos   int r;
    368  1.1.1.3  christos   ASSERT_PTR_EQ(handle, (uv_stream_t*)&ctx2.listen);
    369  1.1.1.3  christos   ASSERT_OK(status);
    370      1.1  christos 
    371      1.1  christos   r = uv_accept((uv_stream_t*)&ctx2.listen, (uv_stream_t*)&ctx2.channel);
    372  1.1.1.3  christos   ASSERT_OK(r);
    373      1.1  christos 
    374      1.1  christos   send_recv_start();
    375      1.1  christos }
    376      1.1  christos 
    377      1.1  christos int run_ipc_send_recv_helper(uv_loop_t* loop, int inprocess) {
    378      1.1  christos   int r;
    379      1.1  christos 
    380      1.1  christos   is_in_process = inprocess;
    381      1.1  christos 
    382      1.1  christos   memset(&ctx2, 0, sizeof(ctx2));
    383      1.1  christos 
    384      1.1  christos   r = uv_pipe_init(loop, &ctx2.listen, 0);
    385  1.1.1.3  christos   ASSERT_OK(r);
    386      1.1  christos 
    387      1.1  christos   r = uv_pipe_init(loop, &ctx2.channel, 1);
    388  1.1.1.3  christos   ASSERT_OK(r);
    389      1.1  christos 
    390      1.1  christos   if (inprocess) {
    391      1.1  christos     r = uv_pipe_bind(&ctx2.listen, TEST_PIPENAME_3);
    392  1.1.1.3  christos     ASSERT_OK(r);
    393      1.1  christos 
    394      1.1  christos     r = uv_listen((uv_stream_t*)&ctx2.listen, SOMAXCONN, listen_cb);
    395  1.1.1.3  christos     ASSERT_OK(r);
    396      1.1  christos   } else {
    397      1.1  christos     r = uv_pipe_open(&ctx2.channel, 0);
    398  1.1.1.3  christos     ASSERT_OK(r);
    399      1.1  christos 
    400      1.1  christos     send_recv_start();
    401      1.1  christos   }
    402      1.1  christos 
    403      1.1  christos   notify_parent_process();
    404      1.1  christos   r = uv_run(loop, UV_RUN_DEFAULT);
    405  1.1.1.3  christos   ASSERT_OK(r);
    406      1.1  christos 
    407      1.1  christos   return 0;
    408      1.1  christos }
    409      1.1  christos 
/* Entry point of the helper when it runs as a child process.
 *
 * stdin is a duplex channel over which a handle is sent; we receive it and
 * send it back where it came from.  Runs the helper on the default loop in
 * child-process mode.  Returns 0.
 */
int ipc_send_recv_helper(void) {
  ASSERT_OK(run_ipc_send_recv_helper(uv_default_loop(), 0));

  MAKE_VALGRIND_HAPPY(uv_default_loop());
  return 0;
}
    422      1.1  christos 
    423      1.1  christos void ipc_send_recv_helper_threadproc(void* arg) {
    424      1.1  christos   int r;
    425      1.1  christos   uv_loop_t loop;
    426      1.1  christos 
    427      1.1  christos   r = uv_loop_init(&loop);
    428  1.1.1.3  christos   ASSERT_OK(r);
    429      1.1  christos 
    430      1.1  christos   r = run_ipc_send_recv_helper(&loop, 1);
    431  1.1.1.3  christos   ASSERT_OK(r);
    432      1.1  christos 
    433      1.1  christos   r = uv_loop_close(&loop);
    434  1.1.1.3  christos   ASSERT_OK(r);
    435      1.1  christos }
    436