1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 * 3 * Permission is hereby granted, free of charge, to any person obtaining a copy 4 * of this software and associated documentation files (the "Software"), to 5 * deal in the Software without restriction, including without limitation the 6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 7 * sell copies of the Software, and to permit persons to whom the Software is 8 * furnished to do so, subject to the following conditions: 9 * 10 * The above copyright notice and this permission notice shall be included in 11 * all copies or substantial portions of the Software. 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 19 * IN THE SOFTWARE. 20 */ 21 22 #include "uv.h" 23 #include "task.h" 24 25 #ifdef _WIN32 26 # define putenv _putenv 27 #endif 28 29 #define INIT_CANCEL_INFO(ci, what) \ 30 do { \ 31 (ci)->reqs = (what); \ 32 (ci)->nreqs = ARRAY_SIZE(what); \ 33 (ci)->stride = sizeof((what)[0]); \ 34 } \ 35 while (0) 36 37 struct cancel_info { 38 void* reqs; 39 unsigned nreqs; 40 unsigned stride; 41 uv_timer_t timer_handle; 42 }; 43 44 struct random_info { 45 uv_random_t random_req; 46 char buf[1]; 47 }; 48 49 static unsigned fs_cb_called; 50 static unsigned done_cb_called; 51 static unsigned done2_cb_called; 52 static unsigned timer_cb_called; 53 static uv_work_t pause_reqs[4]; 54 static uv_sem_t pause_sems[ARRAY_SIZE(pause_reqs)]; 55 56 57 static void work_cb(uv_work_t* req) { 58 uv_sem_wait(pause_sems + (req - pause_reqs)); 59 } 60 61 62 static void done_cb(uv_work_t* req, int status) { 63 uv_sem_destroy(pause_sems + (req - pause_reqs)); 64 } 65 66 67 static void saturate_threadpool(void) { 68 uv_loop_t* loop; 69 char buf[64]; 70 size_t i; 71 72 snprintf(buf, 73 sizeof(buf), 74 "UV_THREADPOOL_SIZE=%lu", 75 (unsigned long)ARRAY_SIZE(pause_reqs)); 76 putenv(buf); 77 78 loop = uv_default_loop(); 79 for (i = 0; i < ARRAY_SIZE(pause_reqs); i += 1) { 80 ASSERT_OK(uv_sem_init(pause_sems + i, 0)); 81 ASSERT_OK(uv_queue_work(loop, pause_reqs + i, work_cb, done_cb)); 82 } 83 } 84 85 86 static void unblock_threadpool(void) { 87 size_t i; 88 89 for (i = 0; i < ARRAY_SIZE(pause_reqs); i += 1) 90 uv_sem_post(pause_sems + i); 91 } 92 93 94 static int known_broken(uv_req_t* req) { 95 if (req->type != UV_FS) 96 return 0; 97 98 #ifdef __linux__ 99 /* TODO(bnoordhuis) make cancellation work with io_uring */ 100 switch (((uv_fs_t*) req)->fs_type) { 101 case UV_FS_CLOSE: 102 case UV_FS_FDATASYNC: 103 case UV_FS_FSTAT: 104 case UV_FS_FSYNC: 105 case UV_FS_LINK: 106 case UV_FS_LSTAT: 107 case UV_FS_MKDIR: 108 case UV_FS_OPEN: 109 case UV_FS_READ: 110 case UV_FS_RENAME: 111 case UV_FS_STAT: 112 case UV_FS_SYMLINK: 113 case UV_FS_WRITE: 114 case UV_FS_UNLINK: 115 return 1; 116 default: /* Squelch -Wswitch warnings. */ 117 break; 118 } 119 #endif 120 121 return 0; 122 } 123 124 125 static void fs_cb(uv_fs_t* req) { 126 ASSERT_NE(known_broken((uv_req_t*) req) || \ 127 req->result == UV_ECANCELED, 0); 128 uv_fs_req_cleanup(req); 129 fs_cb_called++; 130 } 131 132 133 static void getaddrinfo_cb(uv_getaddrinfo_t* req, 134 int status, 135 struct addrinfo* res) { 136 ASSERT_EQ(status, UV_EAI_CANCELED); 137 ASSERT_NULL(res); 138 uv_freeaddrinfo(res); /* Should not crash. */ 139 } 140 141 142 static void getnameinfo_cb(uv_getnameinfo_t* handle, 143 int status, 144 const char* hostname, 145 const char* service) { 146 ASSERT_EQ(status, UV_EAI_CANCELED); 147 ASSERT_NULL(hostname); 148 ASSERT_NULL(service); 149 } 150 151 152 static void work2_cb(uv_work_t* req) { 153 ASSERT(0 && "work2_cb called"); 154 } 155 156 157 static void done2_cb(uv_work_t* req, int status) { 158 ASSERT_EQ(status, UV_ECANCELED); 159 done2_cb_called++; 160 } 161 162 163 static void timer_cb(uv_timer_t* handle) { 164 struct cancel_info* ci; 165 uv_req_t* req; 166 unsigned i; 167 168 ci = container_of(handle, struct cancel_info, timer_handle); 169 170 for (i = 0; i < ci->nreqs; i++) { 171 req = (uv_req_t*) ((char*) ci->reqs + i * ci->stride); 172 ASSERT(known_broken(req) || 0 == uv_cancel(req)); 173 } 174 175 uv_close((uv_handle_t*) &ci->timer_handle, NULL); 176 unblock_threadpool(); 177 timer_cb_called++; 178 } 179 180 181 static void nop_done_cb(uv_work_t* req, int status) { 182 ASSERT_EQ(status, UV_ECANCELED); 183 done_cb_called++; 184 } 185 186 187 static void nop_random_cb(uv_random_t* req, int status, void* buf, size_t len) { 188 struct random_info* ri; 189 190 ri = container_of(req, struct random_info, random_req); 191 192 ASSERT_EQ(status, UV_ECANCELED); 193 ASSERT_PTR_EQ(buf, (void*) ri->buf); 194 ASSERT_EQ(len, sizeof(ri->buf)); 195 196 done_cb_called++; 197 } 198 199 200 TEST_IMPL(threadpool_cancel_getaddrinfo) { 201 uv_getaddrinfo_t reqs[4]; 202 struct cancel_info ci; 203 struct addrinfo hints; 204 uv_loop_t* loop; 205 int r; 206 207 INIT_CANCEL_INFO(&ci, reqs); 208 loop = uv_default_loop(); 209 saturate_threadpool(); 210 211 r = uv_getaddrinfo(loop, reqs + 0, getaddrinfo_cb, "fail", NULL, NULL); 212 ASSERT_OK(r); 213 214 r = uv_getaddrinfo(loop, reqs + 1, getaddrinfo_cb, NULL, "fail", NULL); 215 ASSERT_OK(r); 216 217 r = uv_getaddrinfo(loop, reqs + 2, getaddrinfo_cb, "fail", "fail", NULL); 218 ASSERT_OK(r); 219 220 r = uv_getaddrinfo(loop, reqs + 3, getaddrinfo_cb, "fail", NULL, &hints); 221 ASSERT_OK(r); 222 223 ASSERT_OK(uv_timer_init(loop, &ci.timer_handle)); 224 ASSERT_OK(uv_timer_start(&ci.timer_handle, timer_cb, 10, 0)); 225 ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT)); 226 ASSERT_EQ(1, timer_cb_called); 227 228 MAKE_VALGRIND_HAPPY(loop); 229 return 0; 230 } 231 232 233 TEST_IMPL(threadpool_cancel_getnameinfo) { 234 uv_getnameinfo_t reqs[4]; 235 struct sockaddr_in addr4; 236 struct cancel_info ci; 237 uv_loop_t* loop; 238 int r; 239 240 r = uv_ip4_addr("127.0.0.1", 80, &addr4); 241 ASSERT_OK(r); 242 243 INIT_CANCEL_INFO(&ci, reqs); 244 loop = uv_default_loop(); 245 saturate_threadpool(); 246 247 r = uv_getnameinfo(loop, reqs + 0, getnameinfo_cb, (const struct sockaddr*)&addr4, 0); 248 ASSERT_OK(r); 249 250 r = uv_getnameinfo(loop, reqs + 1, getnameinfo_cb, (const struct sockaddr*)&addr4, 0); 251 ASSERT_OK(r); 252 253 r = uv_getnameinfo(loop, reqs + 2, getnameinfo_cb, (const struct sockaddr*)&addr4, 0); 254 ASSERT_OK(r); 255 256 r = uv_getnameinfo(loop, reqs + 3, getnameinfo_cb, (const struct sockaddr*)&addr4, 0); 257 ASSERT_OK(r); 258 259 ASSERT_OK(uv_timer_init(loop, &ci.timer_handle)); 260 ASSERT_OK(uv_timer_start(&ci.timer_handle, timer_cb, 10, 0)); 261 ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT)); 262 ASSERT_EQ(1, timer_cb_called); 263 264 MAKE_VALGRIND_HAPPY(loop); 265 return 0; 266 } 267 268 269 TEST_IMPL(threadpool_cancel_random) { 270 struct random_info req; 271 uv_loop_t* loop; 272 273 saturate_threadpool(); 274 loop = uv_default_loop(); 275 ASSERT_OK(uv_random(loop, 276 &req.random_req, 277 &req.buf, 278 sizeof(req.buf), 279 0, 280 nop_random_cb)); 281 ASSERT_OK(uv_cancel((uv_req_t*) &req)); 282 ASSERT_OK(done_cb_called); 283 unblock_threadpool(); 284 ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT)); 285 ASSERT_EQ(1, done_cb_called); 286 287 MAKE_VALGRIND_HAPPY(loop); 288 return 0; 289 } 290 291 292 TEST_IMPL(threadpool_cancel_work) { 293 struct cancel_info ci; 294 uv_work_t reqs[16]; 295 uv_loop_t* loop; 296 unsigned i; 297 298 INIT_CANCEL_INFO(&ci, reqs); 299 loop = uv_default_loop(); 300 saturate_threadpool(); 301 302 for (i = 0; i < ARRAY_SIZE(reqs); i++) 303 ASSERT_OK(uv_queue_work(loop, reqs + i, work2_cb, done2_cb)); 304 305 ASSERT_OK(uv_timer_init(loop, &ci.timer_handle)); 306 ASSERT_OK(uv_timer_start(&ci.timer_handle, timer_cb, 10, 0)); 307 ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT)); 308 ASSERT_EQ(1, timer_cb_called); 309 ASSERT_EQ(ARRAY_SIZE(reqs), done2_cb_called); 310 311 MAKE_VALGRIND_HAPPY(loop); 312 return 0; 313 } 314 315 316 TEST_IMPL(threadpool_cancel_fs) { 317 struct cancel_info ci; 318 uv_fs_t reqs[26]; 319 uv_loop_t* loop; 320 unsigned n; 321 uv_buf_t iov; 322 323 INIT_CANCEL_INFO(&ci, reqs); 324 loop = uv_default_loop(); 325 saturate_threadpool(); 326 iov = uv_buf_init(NULL, 0); 327 328 /* Needs to match ARRAY_SIZE(fs_reqs). */ 329 n = 0; 330 ASSERT_OK(uv_fs_chmod(loop, reqs + n++, "/", 0, fs_cb)); 331 ASSERT_OK(uv_fs_chown(loop, reqs + n++, "/", 0, 0, fs_cb)); 332 ASSERT_OK(uv_fs_close(loop, reqs + n++, 0, fs_cb)); 333 ASSERT_OK(uv_fs_fchmod(loop, reqs + n++, 0, 0, fs_cb)); 334 ASSERT_OK(uv_fs_fchown(loop, reqs + n++, 0, 0, 0, fs_cb)); 335 ASSERT_OK(uv_fs_fdatasync(loop, reqs + n++, 0, fs_cb)); 336 ASSERT_OK(uv_fs_fstat(loop, reqs + n++, 0, fs_cb)); 337 ASSERT_OK(uv_fs_fsync(loop, reqs + n++, 0, fs_cb)); 338 ASSERT_OK(uv_fs_ftruncate(loop, reqs + n++, 0, 0, fs_cb)); 339 ASSERT_OK(uv_fs_futime(loop, reqs + n++, 0, 0, 0, fs_cb)); 340 ASSERT_OK(uv_fs_link(loop, reqs + n++, "/", "/", fs_cb)); 341 ASSERT_OK(uv_fs_lstat(loop, reqs + n++, "/", fs_cb)); 342 ASSERT_OK(uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb)); 343 ASSERT_OK(uv_fs_open(loop, reqs + n++, "/", 0, 0, fs_cb)); 344 ASSERT_OK(uv_fs_read(loop, reqs + n++, -1, &iov, 1, 0, fs_cb)); 345 ASSERT_OK(uv_fs_scandir(loop, reqs + n++, "/", 0, fs_cb)); 346 ASSERT_OK(uv_fs_readlink(loop, reqs + n++, "/", fs_cb)); 347 ASSERT_OK(uv_fs_realpath(loop, reqs + n++, "/", fs_cb)); 348 ASSERT_OK(uv_fs_rename(loop, reqs + n++, "/", "/", fs_cb)); 349 ASSERT_OK(uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb)); 350 ASSERT_OK(uv_fs_sendfile(loop, reqs + n++, 0, 0, 0, 0, fs_cb)); 351 ASSERT_OK(uv_fs_stat(loop, reqs + n++, "/", fs_cb)); 352 ASSERT_OK(uv_fs_symlink(loop, reqs + n++, "/", "/", 0, fs_cb)); 353 ASSERT_OK(uv_fs_unlink(loop, reqs + n++, "/", fs_cb)); 354 ASSERT_OK(uv_fs_utime(loop, reqs + n++, "/", 0, 0, fs_cb)); 355 ASSERT_OK(uv_fs_write(loop, reqs + n++, -1, &iov, 1, 0, fs_cb)); 356 ASSERT_EQ(n, ARRAY_SIZE(reqs)); 357 358 ASSERT_OK(uv_timer_init(loop, &ci.timer_handle)); 359 ASSERT_OK(uv_timer_start(&ci.timer_handle, timer_cb, 10, 0)); 360 ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT)); 361 ASSERT_EQ(n, fs_cb_called); 362 ASSERT_EQ(1, timer_cb_called); 363 364 365 MAKE_VALGRIND_HAPPY(loop); 366 return 0; 367 } 368 369 370 TEST_IMPL(threadpool_cancel_single) { 371 uv_loop_t* loop; 372 uv_work_t req; 373 374 saturate_threadpool(); 375 loop = uv_default_loop(); 376 ASSERT_OK(uv_queue_work(loop, &req, (uv_work_cb) abort, nop_done_cb)); 377 ASSERT_OK(uv_cancel((uv_req_t*) &req)); 378 ASSERT_OK(done_cb_called); 379 unblock_threadpool(); 380 ASSERT_OK(uv_run(loop, UV_RUN_DEFAULT)); 381 ASSERT_EQ(1, done_cb_called); 382 383 MAKE_VALGRIND_HAPPY(loop); 384 return 0; 385 } 386 387 388 static void after_busy_cb(uv_work_t* req, int status) { 389 ASSERT_OK(status); 390 done_cb_called++; 391 } 392 393 static void busy_cb(uv_work_t* req) { 394 uv_sem_post((uv_sem_t*) req->data); 395 /* Assume that calling uv_cancel() takes less than 10ms. */ 396 uv_sleep(10); 397 } 398 399 TEST_IMPL(threadpool_cancel_when_busy) { 400 uv_sem_t sem_lock; 401 uv_work_t req; 402 403 req.data = &sem_lock; 404 405 ASSERT_OK(uv_sem_init(&sem_lock, 0)); 406 ASSERT_OK(uv_queue_work(uv_default_loop(), &req, busy_cb, after_busy_cb)); 407 408 uv_sem_wait(&sem_lock); 409 410 ASSERT_EQ(uv_cancel((uv_req_t*) &req), UV_EBUSY); 411 ASSERT_OK(uv_run(uv_default_loop(), UV_RUN_DEFAULT)); 412 ASSERT_EQ(1, done_cb_called); 413 414 uv_sem_destroy(&sem_lock); 415 416 MAKE_VALGRIND_HAPPY(uv_default_loop()); 417 return 0; 418 } 419