Home | History | Annotate | Line # | Download | only in test
      1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
      2  *
      3  * Permission is hereby granted, free of charge, to any person obtaining a copy
      4  * of this software and associated documentation files (the "Software"), to
      5  * deal in the Software without restriction, including without limitation the
      6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
      7  * sell copies of the Software, and to permit persons to whom the Software is
      8  * furnished to do so, subject to the following conditions:
      9  *
     10  * The above copyright notice and this permission notice shall be included in
     11  * all copies or substantial portions of the Software.
     12  *
     13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
     18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
     19  * IN THE SOFTWARE.
     20  */
     21 
     22 #include "uv.h"
     23 #include "task.h"
     24 
     25 #ifdef _WIN32
     26 # define putenv _putenv
     27 #endif
     28 
     29 #define INIT_CANCEL_INFO(ci, what)                                            \
     30   do {                                                                        \
     31     (ci)->reqs = (what);                                                      \
     32     (ci)->nreqs = ARRAY_SIZE(what);                                           \
     33     (ci)->stride = sizeof((what)[0]);                                         \
     34   }                                                                           \
     35   while (0)
     36 
     37 struct cancel_info {
     38   void* reqs;
     39   unsigned nreqs;
     40   unsigned stride;
     41   uv_timer_t timer_handle;
     42 };
     43 
     44 struct random_info {
     45   uv_random_t random_req;
     46   char buf[1];
     47 };
     48 
     49 static unsigned fs_cb_called;
     50 static unsigned done_cb_called;
     51 static unsigned done2_cb_called;
     52 static unsigned timer_cb_called;
     53 static uv_work_t pause_reqs[4];
     54 static uv_sem_t pause_sems[ARRAY_SIZE(pause_reqs)];
     55 
     56 
     57 static void work_cb(uv_work_t* req) {
     58   uv_sem_wait(pause_sems + (req - pause_reqs));
     59 }
     60 
     61 
     62 static void done_cb(uv_work_t* req, int status) {
     63   uv_sem_destroy(pause_sems + (req - pause_reqs));
     64 }
     65 
     66 
     67 static void saturate_threadpool(void) {
     68   uv_loop_t* loop;
     69   char buf[64];
     70   size_t i;
     71 
     72   snprintf(buf,
     73            sizeof(buf),
     74            "UV_THREADPOOL_SIZE=%lu",
     75            (unsigned long)ARRAY_SIZE(pause_reqs));
     76   putenv(buf);
     77 
     78   loop = uv_default_loop();
     79   for (i = 0; i < ARRAY_SIZE(pause_reqs); i += 1) {
     80     ASSERT_OK(uv_sem_init(pause_sems + i, 0));
     81     ASSERT_OK(uv_queue_work(loop, pause_reqs + i, work_cb, done_cb));
     82   }
     83 }
     84 
     85 
     86 static void unblock_threadpool(void) {
     87   size_t i;
     88 
     89   for (i = 0; i < ARRAY_SIZE(pause_reqs); i += 1)
     90     uv_sem_post(pause_sems + i);
     91 }
     92 
     93 
     94 static int known_broken(uv_req_t* req) {
     95   if (req->type != UV_FS)
     96     return 0;
     97 
     98 #ifdef __linux__
     99   /* TODO(bnoordhuis) make cancellation work with io_uring */
    100   switch (((uv_fs_t*) req)->fs_type) {
    101     case UV_FS_CLOSE:
    102     case UV_FS_FDATASYNC:
    103     case UV_FS_FSTAT:
    104     case UV_FS_FSYNC:
    105     case UV_FS_LINK:
    106     case UV_FS_LSTAT:
    107     case UV_FS_MKDIR:
    108     case UV_FS_OPEN:
    109     case UV_FS_READ:
    110     case UV_FS_RENAME:
    111     case UV_FS_STAT:
    112     case UV_FS_SYMLINK:
    113     case UV_FS_WRITE:
    114     case UV_FS_UNLINK:
    115       return 1;
    116     default:  /* Squelch -Wswitch warnings. */
    117       break;
    118   }
    119 #endif
    120 
    121   return 0;
    122 }
    123 
    124 
    125 static void fs_cb(uv_fs_t* req) {
    126   ASSERT_NE(known_broken((uv_req_t*) req) || \
    127       req->result == UV_ECANCELED, 0);
    128   uv_fs_req_cleanup(req);
    129   fs_cb_called++;
    130 }
    131 
    132 
    133 static void getaddrinfo_cb(uv_getaddrinfo_t* req,
    134                            int status,
    135                            struct addrinfo* res) {
    136   ASSERT_EQ(status, UV_EAI_CANCELED);
    137   ASSERT_NULL(res);
    138   uv_freeaddrinfo(res);  /* Should not crash. */
    139 }
    140 
    141 
    142 static void getnameinfo_cb(uv_getnameinfo_t* handle,
    143                            int status,
    144                            const char* hostname,
    145                            const char* service) {
    146   ASSERT_EQ(status, UV_EAI_CANCELED);
    147   ASSERT_NULL(hostname);
    148   ASSERT_NULL(service);
    149 }
    150 
    151 
    152 static void work2_cb(uv_work_t* req) {
    153   ASSERT(0 && "work2_cb called");
    154 }
    155 
    156 
    157 static void done2_cb(uv_work_t* req, int status) {
    158   ASSERT_EQ(status, UV_ECANCELED);
    159   done2_cb_called++;
    160 }
    161 
    162 
    163 static void timer_cb(uv_timer_t* handle) {
    164   struct cancel_info* ci;
    165   uv_req_t* req;
    166   unsigned i;
    167 
    168   ci = container_of(handle, struct cancel_info, timer_handle);
    169 
    170   for (i = 0; i < ci->nreqs; i++) {
    171     req = (uv_req_t*) ((char*) ci->reqs + i * ci->stride);
    172     ASSERT(known_broken(req) || 0 == uv_cancel(req));
    173   }
    174 
    175   uv_close((uv_handle_t*) &ci->timer_handle, NULL);
    176   unblock_threadpool();
    177   timer_cb_called++;
    178 }
    179 
    180 
    181 static void nop_done_cb(uv_work_t* req, int status) {
    182   ASSERT_EQ(status, UV_ECANCELED);
    183   done_cb_called++;
    184 }
    185 
    186 
    187 static void nop_random_cb(uv_random_t* req, int status, void* buf, size_t len) {
    188   struct random_info* ri;
    189 
    190   ri = container_of(req, struct random_info, random_req);
    191 
    192   ASSERT_EQ(status, UV_ECANCELED);
    193   ASSERT_PTR_EQ(buf, (void*) ri->buf);
    194   ASSERT_EQ(len, sizeof(ri->buf));
    195 
    196   done_cb_called++;
    197 }
    198 
    199 
    200 TEST_IMPL(threadpool_cancel_getaddrinfo) {
    201   uv_getaddrinfo_t reqs[4];
    202   struct cancel_info ci;
    203   struct addrinfo hints;
    204   uv_loop_t* loop;
    205   int r;
    206 
    207   INIT_CANCEL_INFO(&ci, reqs);
    208   loop = uv_default_loop();
    209   saturate_threadpool();
    210 
    211   r = uv_getaddrinfo(loop, reqs + 0, getaddrinfo_cb, "fail", NULL, NULL);
    212   ASSERT_OK(r);
    213 
    214   r = uv_getaddrinfo(loop, reqs + 1, getaddrinfo_cb, NULL, "fail", NULL);
    215   ASSERT_OK(r);
    216 
    217   r = uv_getaddrinfo(loop, reqs + 2, getaddrinfo_cb, "fail", "fail", NULL);
    218   ASSERT_OK(r);
    219 
    220   r = uv_getaddrinfo(loop, reqs + 3, getaddrinfo_cb, "fail", NULL, &hints);
    221   ASSERT_OK(r);
    222 
    223   ASSERT_OK(uv_timer_init(loop, &ci.timer_handle));
    224   ASSERT_OK(uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
    225   ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
    226   ASSERT_EQ(1, timer_cb_called);
    227 
    228   MAKE_VALGRIND_HAPPY(loop);
    229   return 0;
    230 }
    231 
    232 
    233 TEST_IMPL(threadpool_cancel_getnameinfo) {
    234   uv_getnameinfo_t reqs[4];
    235   struct sockaddr_in addr4;
    236   struct cancel_info ci;
    237   uv_loop_t* loop;
    238   int r;
    239 
    240   r = uv_ip4_addr("127.0.0.1", 80, &addr4);
    241   ASSERT_OK(r);
    242 
    243   INIT_CANCEL_INFO(&ci, reqs);
    244   loop = uv_default_loop();
    245   saturate_threadpool();
    246 
    247   r = uv_getnameinfo(loop, reqs + 0, getnameinfo_cb, (const struct sockaddr*)&addr4, 0);
    248   ASSERT_OK(r);
    249 
    250   r = uv_getnameinfo(loop, reqs + 1, getnameinfo_cb, (const struct sockaddr*)&addr4, 0);
    251   ASSERT_OK(r);
    252 
    253   r = uv_getnameinfo(loop, reqs + 2, getnameinfo_cb, (const struct sockaddr*)&addr4, 0);
    254   ASSERT_OK(r);
    255 
    256   r = uv_getnameinfo(loop, reqs + 3, getnameinfo_cb, (const struct sockaddr*)&addr4, 0);
    257   ASSERT_OK(r);
    258 
    259   ASSERT_OK(uv_timer_init(loop, &ci.timer_handle));
    260   ASSERT_OK(uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
    261   ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
    262   ASSERT_EQ(1, timer_cb_called);
    263 
    264   MAKE_VALGRIND_HAPPY(loop);
    265   return 0;
    266 }
    267 
    268 
    269 TEST_IMPL(threadpool_cancel_random) {
    270   struct random_info req;
    271   uv_loop_t* loop;
    272 
    273   saturate_threadpool();
    274   loop = uv_default_loop();
    275   ASSERT_OK(uv_random(loop,
    276                       &req.random_req,
    277                       &req.buf,
    278                       sizeof(req.buf),
    279                       0,
    280                       nop_random_cb));
    281   ASSERT_OK(uv_cancel((uv_req_t*) &req));
    282   ASSERT_OK(done_cb_called);
    283   unblock_threadpool();
    284   ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
    285   ASSERT_EQ(1, done_cb_called);
    286 
    287   MAKE_VALGRIND_HAPPY(loop);
    288   return 0;
    289 }
    290 
    291 
    292 TEST_IMPL(threadpool_cancel_work) {
    293   struct cancel_info ci;
    294   uv_work_t reqs[16];
    295   uv_loop_t* loop;
    296   unsigned i;
    297 
    298   INIT_CANCEL_INFO(&ci, reqs);
    299   loop = uv_default_loop();
    300   saturate_threadpool();
    301 
    302   for (i = 0; i < ARRAY_SIZE(reqs); i++)
    303     ASSERT_OK(uv_queue_work(loop, reqs + i, work2_cb, done2_cb));
    304 
    305   ASSERT_OK(uv_timer_init(loop, &ci.timer_handle));
    306   ASSERT_OK(uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
    307   ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
    308   ASSERT_EQ(1, timer_cb_called);
    309   ASSERT_EQ(ARRAY_SIZE(reqs), done2_cb_called);
    310 
    311   MAKE_VALGRIND_HAPPY(loop);
    312   return 0;
    313 }
    314 
    315 
    316 TEST_IMPL(threadpool_cancel_fs) {
    317   struct cancel_info ci;
    318   uv_fs_t reqs[26];
    319   uv_loop_t* loop;
    320   unsigned n;
    321   uv_buf_t iov;
    322 
    323   INIT_CANCEL_INFO(&ci, reqs);
    324   loop = uv_default_loop();
    325   saturate_threadpool();
    326   iov = uv_buf_init(NULL, 0);
    327 
    328   /* Needs to match ARRAY_SIZE(fs_reqs). */
    329   n = 0;
    330   ASSERT_OK(uv_fs_chmod(loop, reqs + n++, "/", 0, fs_cb));
    331   ASSERT_OK(uv_fs_chown(loop, reqs + n++, "/", 0, 0, fs_cb));
    332   ASSERT_OK(uv_fs_close(loop, reqs + n++, 0, fs_cb));
    333   ASSERT_OK(uv_fs_fchmod(loop, reqs + n++, 0, 0, fs_cb));
    334   ASSERT_OK(uv_fs_fchown(loop, reqs + n++, 0, 0, 0, fs_cb));
    335   ASSERT_OK(uv_fs_fdatasync(loop, reqs + n++, 0, fs_cb));
    336   ASSERT_OK(uv_fs_fstat(loop, reqs + n++, 0, fs_cb));
    337   ASSERT_OK(uv_fs_fsync(loop, reqs + n++, 0, fs_cb));
    338   ASSERT_OK(uv_fs_ftruncate(loop, reqs + n++, 0, 0, fs_cb));
    339   ASSERT_OK(uv_fs_futime(loop, reqs + n++, 0, 0, 0, fs_cb));
    340   ASSERT_OK(uv_fs_link(loop, reqs + n++, "/", "/", fs_cb));
    341   ASSERT_OK(uv_fs_lstat(loop, reqs + n++, "/", fs_cb));
    342   ASSERT_OK(uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb));
    343   ASSERT_OK(uv_fs_open(loop, reqs + n++, "/", 0, 0, fs_cb));
    344   ASSERT_OK(uv_fs_read(loop, reqs + n++, -1, &iov, 1, 0, fs_cb));
    345   ASSERT_OK(uv_fs_scandir(loop, reqs + n++, "/", 0, fs_cb));
    346   ASSERT_OK(uv_fs_readlink(loop, reqs + n++, "/", fs_cb));
    347   ASSERT_OK(uv_fs_realpath(loop, reqs + n++, "/", fs_cb));
    348   ASSERT_OK(uv_fs_rename(loop, reqs + n++, "/", "/", fs_cb));
    349   ASSERT_OK(uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb));
    350   ASSERT_OK(uv_fs_sendfile(loop, reqs + n++, 0, 0, 0, 0, fs_cb));
    351   ASSERT_OK(uv_fs_stat(loop, reqs + n++, "/", fs_cb));
    352   ASSERT_OK(uv_fs_symlink(loop, reqs + n++, "/", "/", 0, fs_cb));
    353   ASSERT_OK(uv_fs_unlink(loop, reqs + n++, "/", fs_cb));
    354   ASSERT_OK(uv_fs_utime(loop, reqs + n++, "/", 0, 0, fs_cb));
    355   ASSERT_OK(uv_fs_write(loop, reqs + n++, -1, &iov, 1, 0, fs_cb));
    356   ASSERT_EQ(n, ARRAY_SIZE(reqs));
    357 
    358   ASSERT_OK(uv_timer_init(loop, &ci.timer_handle));
    359   ASSERT_OK(uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
    360   ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
    361   ASSERT_EQ(n, fs_cb_called);
    362   ASSERT_EQ(1, timer_cb_called);
    363 
    364 
    365   MAKE_VALGRIND_HAPPY(loop);
    366   return 0;
    367 }
    368 
    369 
    370 TEST_IMPL(threadpool_cancel_single) {
    371   uv_loop_t* loop;
    372   uv_work_t req;
    373 
    374   saturate_threadpool();
    375   loop = uv_default_loop();
    376   ASSERT_OK(uv_queue_work(loop, &req, (uv_work_cb) abort, nop_done_cb));
    377   ASSERT_OK(uv_cancel((uv_req_t*) &req));
    378   ASSERT_OK(done_cb_called);
    379   unblock_threadpool();
    380   ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT));
    381   ASSERT_EQ(1, done_cb_called);
    382 
    383   MAKE_VALGRIND_HAPPY(loop);
    384   return 0;
    385 }
    386 
    387 
    388 static void after_busy_cb(uv_work_t* req, int status) {
    389   ASSERT_OK(status);
    390   done_cb_called++;
    391 }
    392 
    393 static void busy_cb(uv_work_t* req) {
    394   uv_sem_post((uv_sem_t*) req->data);
    395   /* Assume that calling uv_cancel() takes less than 10ms. */
    396   uv_sleep(10);
    397 }
    398 
    399 TEST_IMPL(threadpool_cancel_when_busy) {
    400   uv_sem_t sem_lock;
    401   uv_work_t req;
    402 
    403   req.data = &sem_lock;
    404 
    405   ASSERT_OK(uv_sem_init(&sem_lock, 0));
    406   ASSERT_OK(uv_queue_work(uv_default_loop(), &req, busy_cb, after_busy_cb));
    407 
    408   uv_sem_wait(&sem_lock);
    409 
    410   ASSERT_EQ(uv_cancel((uv_req_t*) &req), UV_EBUSY);
    411   ASSERT_OK(uv_run(uv_default_loop(), UV_RUN_DEFAULT));
    412   ASSERT_EQ(1, done_cb_called);
    413 
    414   uv_sem_destroy(&sem_lock);
    415 
    416   MAKE_VALGRIND_HAPPY(uv_default_loop());
    417   return 0;
    418 }
    419