1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 * 3 * Permission is hereby granted, free of charge, to any person obtaining a copy 4 * of this software and associated documentation files (the "Software"), to 5 * deal in the Software without restriction, including without limitation the 6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 7 * sell copies of the Software, and to permit persons to whom the Software is 8 * furnished to do so, subject to the following conditions: 9 * 10 * The above copyright notice and this permission notice shall be included in 11 * all copies or substantial portions of the Software. 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 19 * IN THE SOFTWARE. 20 */ 21 22 #include <assert.h> 23 #include <errno.h> 24 #include <limits.h> 25 #include <stdio.h> 26 #include <stdlib.h> 27 #include <string.h> 28 #if defined(_MSC_VER) || defined(__MINGW64_VERSION_MAJOR) 29 #include <crtdbg.h> 30 #endif 31 32 #include "uv.h" 33 #include "internal.h" 34 #include "queue.h" 35 #include "handle-inl.h" 36 #include "heap-inl.h" 37 #include "req-inl.h" 38 39 /* uv_once initialization guards */ 40 static uv_once_t uv_init_guard_ = UV_ONCE_INIT; 41 42 43 #if defined(_DEBUG) && (defined(_MSC_VER) || defined(__MINGW64_VERSION_MAJOR)) 44 /* Our crt debug report handler allows us to temporarily disable asserts 45 * just for the current thread. 46 */ 47 48 UV_THREAD_LOCAL int uv__crt_assert_enabled = TRUE; 49 50 static int uv__crt_dbg_report_handler(int report_type, char *message, int *ret_val) { 51 if (uv__crt_assert_enabled || report_type != _CRT_ASSERT) 52 return FALSE; 53 54 if (ret_val) { 55 /* Set ret_val to 0 to continue with normal execution. 56 * Set ret_val to 1 to trigger a breakpoint. 57 */ 58 59 if(IsDebuggerPresent()) 60 *ret_val = 1; 61 else 62 *ret_val = 0; 63 } 64 65 /* Don't call _CrtDbgReport. */ 66 return TRUE; 67 } 68 #else 69 UV_THREAD_LOCAL int uv__crt_assert_enabled = FALSE; 70 #endif 71 72 73 #if !defined(__MINGW32__) || __MSVCRT_VERSION__ >= 0x800 74 static void uv__crt_invalid_parameter_handler(const wchar_t* expression, 75 const wchar_t* function, const wchar_t * file, unsigned int line, 76 uintptr_t reserved) { 77 /* No-op. */ 78 } 79 #endif 80 81 static uv_loop_t** uv__loops; 82 static int uv__loops_size; 83 static int uv__loops_capacity; 84 #define UV__LOOPS_CHUNK_SIZE 8 85 static uv_mutex_t uv__loops_lock; 86 87 88 static void uv__loops_init(void) { 89 uv_mutex_init(&uv__loops_lock); 90 } 91 92 93 static int uv__loops_add(uv_loop_t* loop) { 94 uv_loop_t** new_loops; 95 int new_capacity, i; 96 97 uv_mutex_lock(&uv__loops_lock); 98 99 if (uv__loops_size == uv__loops_capacity) { 100 new_capacity = uv__loops_capacity + UV__LOOPS_CHUNK_SIZE; 101 new_loops = uv__realloc(uv__loops, sizeof(uv_loop_t*) * new_capacity); 102 if (!new_loops) 103 goto failed_loops_realloc; 104 uv__loops = new_loops; 105 for (i = uv__loops_capacity; i < new_capacity; ++i) 106 uv__loops[i] = NULL; 107 uv__loops_capacity = new_capacity; 108 } 109 uv__loops[uv__loops_size] = loop; 110 ++uv__loops_size; 111 112 uv_mutex_unlock(&uv__loops_lock); 113 return 0; 114 115 failed_loops_realloc: 116 uv_mutex_unlock(&uv__loops_lock); 117 return UV_ENOMEM; 118 } 119 120 121 static void uv__loops_remove(uv_loop_t* loop) { 122 int loop_index; 123 int smaller_capacity; 124 uv_loop_t** new_loops; 125 126 uv_mutex_lock(&uv__loops_lock); 127 128 for (loop_index = 0; loop_index < uv__loops_size; ++loop_index) { 129 if (uv__loops[loop_index] == loop) 130 break; 131 } 132 /* If loop was not found, ignore */ 133 if (loop_index == uv__loops_size) 134 goto loop_removed; 135 136 uv__loops[loop_index] = uv__loops[uv__loops_size - 1]; 137 uv__loops[uv__loops_size - 1] = NULL; 138 --uv__loops_size; 139 140 if (uv__loops_size == 0) { 141 uv__loops_capacity = 0; 142 uv__free(uv__loops); 143 uv__loops = NULL; 144 goto loop_removed; 145 } 146 147 /* If we didn't grow to big skip downsizing */ 148 if (uv__loops_capacity < 4 * UV__LOOPS_CHUNK_SIZE) 149 goto loop_removed; 150 151 /* Downsize only if more than half of buffer is free */ 152 smaller_capacity = uv__loops_capacity / 2; 153 if (uv__loops_size >= smaller_capacity) 154 goto loop_removed; 155 new_loops = uv__realloc(uv__loops, sizeof(uv_loop_t*) * smaller_capacity); 156 if (!new_loops) 157 goto loop_removed; 158 uv__loops = new_loops; 159 uv__loops_capacity = smaller_capacity; 160 161 loop_removed: 162 uv_mutex_unlock(&uv__loops_lock); 163 } 164 165 void uv__wake_all_loops(void) { 166 int i; 167 uv_loop_t* loop; 168 169 uv_mutex_lock(&uv__loops_lock); 170 for (i = 0; i < uv__loops_size; ++i) { 171 loop = uv__loops[i]; 172 assert(loop); 173 if (loop->iocp != INVALID_HANDLE_VALUE) 174 PostQueuedCompletionStatus(loop->iocp, 0, 0, NULL); 175 } 176 uv_mutex_unlock(&uv__loops_lock); 177 } 178 179 static void uv__init(void) { 180 /* Tell Windows that we will handle critical errors. */ 181 SetErrorMode(SEM_FAILCRITICALERRORS | SEM_NOGPFAULTERRORBOX | 182 SEM_NOOPENFILEERRORBOX); 183 184 /* Tell the CRT to not exit the application when an invalid parameter is 185 * passed. The main issue is that invalid FDs will trigger this behavior. 186 */ 187 #if !defined(__MINGW32__) || __MSVCRT_VERSION__ >= 0x800 188 _set_invalid_parameter_handler(uv__crt_invalid_parameter_handler); 189 #endif 190 191 /* We also need to setup our debug report handler because some CRT 192 * functions (eg _get_osfhandle) raise an assert when called with invalid 193 * FDs even though they return the proper error code in the release build. 194 */ 195 #if defined(_DEBUG) && (defined(_MSC_VER) || defined(__MINGW64_VERSION_MAJOR)) 196 _CrtSetReportHook(uv__crt_dbg_report_handler); 197 #endif 198 199 /* Initialize tracking of all uv loops */ 200 uv__loops_init(); 201 202 /* Fetch winapi function pointers. This must be done first because other 203 * initialization code might need these function pointers to be loaded. 204 */ 205 uv__winapi_init(); 206 207 /* Initialize winsock */ 208 uv__winsock_init(); 209 210 /* Initialize FS */ 211 uv__fs_init(); 212 213 /* Initialize signal stuff */ 214 uv__signals_init(); 215 216 /* Initialize console */ 217 uv__console_init(); 218 219 /* Initialize utilities */ 220 uv__util_init(); 221 222 /* Initialize system wakeup detection */ 223 uv__init_detect_system_wakeup(); 224 } 225 226 227 int uv_loop_init(uv_loop_t* loop) { 228 uv__loop_internal_fields_t* lfields; 229 struct heap* timer_heap; 230 int err; 231 232 /* Initialize libuv itself first */ 233 uv__once_init(); 234 235 /* Create an I/O completion port */ 236 loop->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1); 237 if (loop->iocp == NULL) 238 return uv_translate_sys_error(GetLastError()); 239 240 lfields = (uv__loop_internal_fields_t*) uv__calloc(1, sizeof(*lfields)); 241 if (lfields == NULL) 242 return UV_ENOMEM; 243 loop->internal_fields = lfields; 244 245 err = uv_mutex_init(&lfields->loop_metrics.lock); 246 if (err) 247 goto fail_metrics_mutex_init; 248 memset(&lfields->loop_metrics.metrics, 249 0, 250 sizeof(lfields->loop_metrics.metrics)); 251 252 /* To prevent uninitialized memory access, loop->time must be initialized 253 * to zero before calling uv_update_time for the first time. 254 */ 255 loop->time = 0; 256 uv_update_time(loop); 257 258 uv__queue_init(&loop->wq); 259 uv__queue_init(&loop->handle_queue); 260 loop->active_reqs.count = 0; 261 loop->active_handles = 0; 262 263 loop->pending_reqs_tail = NULL; 264 265 loop->endgame_handles = NULL; 266 267 loop->timer_heap = timer_heap = uv__malloc(sizeof(*timer_heap)); 268 if (timer_heap == NULL) { 269 err = UV_ENOMEM; 270 goto fail_timers_alloc; 271 } 272 273 heap_init(timer_heap); 274 275 loop->check_handles = NULL; 276 loop->prepare_handles = NULL; 277 loop->idle_handles = NULL; 278 279 loop->next_prepare_handle = NULL; 280 loop->next_check_handle = NULL; 281 loop->next_idle_handle = NULL; 282 283 memset(&loop->poll_peer_sockets, 0, sizeof loop->poll_peer_sockets); 284 285 loop->timer_counter = 0; 286 loop->stop_flag = 0; 287 288 err = uv_mutex_init(&loop->wq_mutex); 289 if (err) 290 goto fail_mutex_init; 291 292 err = uv_async_init(loop, &loop->wq_async, uv__work_done); 293 if (err) 294 goto fail_async_init; 295 296 uv__handle_unref(&loop->wq_async); 297 loop->wq_async.flags |= UV_HANDLE_INTERNAL; 298 299 err = uv__loops_add(loop); 300 if (err) 301 goto fail_async_init; 302 303 return 0; 304 305 fail_async_init: 306 uv_mutex_destroy(&loop->wq_mutex); 307 308 fail_mutex_init: 309 uv__free(timer_heap); 310 loop->timer_heap = NULL; 311 312 fail_timers_alloc: 313 uv_mutex_destroy(&lfields->loop_metrics.lock); 314 315 fail_metrics_mutex_init: 316 uv__free(lfields); 317 loop->internal_fields = NULL; 318 CloseHandle(loop->iocp); 319 loop->iocp = INVALID_HANDLE_VALUE; 320 321 return err; 322 } 323 324 325 void uv_update_time(uv_loop_t* loop) { 326 uint64_t new_time = uv__hrtime(1000); 327 assert(new_time >= loop->time); 328 loop->time = new_time; 329 } 330 331 332 void uv__once_init(void) { 333 uv_once(&uv_init_guard_, uv__init); 334 } 335 336 337 void uv__loop_close(uv_loop_t* loop) { 338 uv__loop_internal_fields_t* lfields; 339 size_t i; 340 341 uv__loops_remove(loop); 342 343 /* Close the async handle without needing an extra loop iteration. 344 * We might have a pending message, but we're just going to destroy the IOCP 345 * soon, so we can just discard it now without the usual risk of a getting 346 * another notification from GetQueuedCompletionStatusEx after calling the 347 * close_cb (which we also skip defining). We'll assert later that queue was 348 * actually empty and all reqs handled. */ 349 loop->wq_async.async_sent = 0; 350 loop->wq_async.close_cb = NULL; 351 uv__handle_closing(&loop->wq_async); 352 uv__handle_close(&loop->wq_async); 353 354 for (i = 0; i < ARRAY_SIZE(loop->poll_peer_sockets); i++) { 355 SOCKET sock = loop->poll_peer_sockets[i]; 356 if (sock != 0 && sock != INVALID_SOCKET) 357 closesocket(sock); 358 } 359 360 uv_mutex_lock(&loop->wq_mutex); 361 assert(uv__queue_empty(&loop->wq) && "thread pool work queue not empty!"); 362 assert(!uv__has_active_reqs(loop)); 363 uv_mutex_unlock(&loop->wq_mutex); 364 uv_mutex_destroy(&loop->wq_mutex); 365 366 uv__free(loop->timer_heap); 367 loop->timer_heap = NULL; 368 369 lfields = uv__get_internal_fields(loop); 370 uv_mutex_destroy(&lfields->loop_metrics.lock); 371 uv__free(lfields); 372 loop->internal_fields = NULL; 373 374 CloseHandle(loop->iocp); 375 } 376 377 378 int uv__loop_configure(uv_loop_t* loop, uv_loop_option option, va_list ap) { 379 uv__loop_internal_fields_t* lfields; 380 381 lfields = uv__get_internal_fields(loop); 382 if (option == UV_METRICS_IDLE_TIME) { 383 lfields->flags |= UV_METRICS_IDLE_TIME; 384 return 0; 385 } 386 387 return UV_ENOSYS; 388 } 389 390 391 int uv_backend_fd(const uv_loop_t* loop) { 392 return -1; 393 } 394 395 396 int uv_loop_fork(uv_loop_t* loop) { 397 return UV_ENOSYS; 398 } 399 400 401 static int uv__loop_alive(const uv_loop_t* loop) { 402 return uv__has_active_handles(loop) || 403 uv__has_active_reqs(loop) || 404 loop->pending_reqs_tail != NULL || 405 loop->endgame_handles != NULL; 406 } 407 408 409 int uv_loop_alive(const uv_loop_t* loop) { 410 return uv__loop_alive(loop); 411 } 412 413 414 int uv_backend_timeout(const uv_loop_t* loop) { 415 if (loop->stop_flag == 0 && 416 /* uv__loop_alive(loop) && */ 417 (uv__has_active_handles(loop) || uv__has_active_reqs(loop)) && 418 loop->pending_reqs_tail == NULL && 419 loop->idle_handles == NULL && 420 loop->endgame_handles == NULL) 421 return uv__next_timeout(loop); 422 return 0; 423 } 424 425 426 static void uv__poll(uv_loop_t* loop, DWORD timeout) { 427 uv__loop_internal_fields_t* lfields; 428 BOOL success; 429 uv_req_t* req; 430 OVERLAPPED_ENTRY overlappeds[128]; 431 ULONG count; 432 ULONG i; 433 int repeat; 434 uint64_t timeout_time; 435 uint64_t user_timeout; 436 uint64_t actual_timeout; 437 int reset_timeout; 438 439 lfields = uv__get_internal_fields(loop); 440 timeout_time = loop->time + timeout; 441 442 if (lfields->flags & UV_METRICS_IDLE_TIME) { 443 reset_timeout = 1; 444 user_timeout = timeout; 445 timeout = 0; 446 } else { 447 reset_timeout = 0; 448 } 449 450 for (repeat = 0; ; repeat++) { 451 actual_timeout = timeout; 452 453 /* Only need to set the provider_entry_time if timeout != 0. The function 454 * will return early if the loop isn't configured with UV_METRICS_IDLE_TIME. 455 */ 456 if (timeout != 0) 457 uv__metrics_set_provider_entry_time(loop); 458 459 /* Store the current timeout in a location that's globally accessible so 460 * other locations like uv__work_done() can determine whether the queue 461 * of events in the callback were waiting when poll was called. 462 */ 463 lfields->current_timeout = timeout; 464 465 success = GetQueuedCompletionStatusEx(loop->iocp, 466 overlappeds, 467 ARRAY_SIZE(overlappeds), 468 &count, 469 timeout, 470 FALSE); 471 472 if (reset_timeout != 0) { 473 timeout = user_timeout; 474 reset_timeout = 0; 475 } 476 477 /* Placed here because on success the loop will break whether there is an 478 * empty package or not, or if GetQueuedCompletionStatusEx returned early 479 * then the timeout will be updated and the loop will run again. In either 480 * case the idle time will need to be updated. 481 */ 482 uv__metrics_update_idle_time(loop); 483 484 if (success) { 485 for (i = 0; i < count; i++) { 486 /* Package was dequeued, but see if it is not a empty package 487 * meant only to wake us up. 488 */ 489 if (overlappeds[i].lpOverlapped) { 490 uv__metrics_inc_events(loop, 1); 491 if (actual_timeout == 0) 492 uv__metrics_inc_events_waiting(loop, 1); 493 494 req = uv__overlapped_to_req(overlappeds[i].lpOverlapped); 495 uv__insert_pending_req(loop, req); 496 } 497 } 498 499 /* Some time might have passed waiting for I/O, 500 * so update the loop time here. 501 */ 502 uv_update_time(loop); 503 } else if (GetLastError() != WAIT_TIMEOUT) { 504 /* Serious error */ 505 uv_fatal_error(GetLastError(), "GetQueuedCompletionStatusEx"); 506 } else if (timeout > 0) { 507 /* GetQueuedCompletionStatus can occasionally return a little early. 508 * Make sure that the desired timeout target time is reached. 509 */ 510 uv_update_time(loop); 511 if (timeout_time > loop->time) { 512 timeout = (DWORD)(timeout_time - loop->time); 513 /* The first call to GetQueuedCompletionStatus should return very 514 * close to the target time and the second should reach it, but 515 * this is not stated in the documentation. To make sure a busy 516 * loop cannot happen, the timeout is increased exponentially 517 * starting on the third round. 518 */ 519 timeout += repeat ? (1 << (repeat - 1)) : 0; 520 continue; 521 } 522 } 523 break; 524 } 525 } 526 527 528 int uv_run(uv_loop_t *loop, uv_run_mode mode) { 529 DWORD timeout; 530 int r; 531 int can_sleep; 532 533 r = uv__loop_alive(loop); 534 if (!r) 535 uv_update_time(loop); 536 537 /* Maintain backwards compatibility by processing timers before entering the 538 * while loop for UV_RUN_DEFAULT. Otherwise timers only need to be executed 539 * once, which should be done after polling in order to maintain proper 540 * execution order of the conceptual event loop. */ 541 if (mode == UV_RUN_DEFAULT && r != 0 && loop->stop_flag == 0) { 542 uv_update_time(loop); 543 uv__run_timers(loop); 544 } 545 546 while (r != 0 && loop->stop_flag == 0) { 547 can_sleep = loop->pending_reqs_tail == NULL && loop->idle_handles == NULL; 548 549 uv__process_reqs(loop); 550 uv__idle_invoke(loop); 551 uv__prepare_invoke(loop); 552 553 timeout = 0; 554 if ((mode == UV_RUN_ONCE && can_sleep) || mode == UV_RUN_DEFAULT) 555 timeout = uv_backend_timeout(loop); 556 557 uv__metrics_inc_loop_count(loop); 558 559 uv__poll(loop, timeout); 560 561 /* Process immediate callbacks (e.g. write_cb) a small fixed number of 562 * times to avoid loop starvation.*/ 563 for (r = 0; r < 8 && loop->pending_reqs_tail != NULL; r++) 564 uv__process_reqs(loop); 565 566 /* Run one final update on the provider_idle_time in case uv__poll* 567 * returned because the timeout expired, but no events were received. This 568 * call will be ignored if the provider_entry_time was either never set (if 569 * the timeout == 0) or was already updated b/c an event was received. 570 */ 571 uv__metrics_update_idle_time(loop); 572 573 uv__check_invoke(loop); 574 uv__process_endgames(loop); 575 576 uv_update_time(loop); 577 uv__run_timers(loop); 578 579 r = uv__loop_alive(loop); 580 if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT) 581 break; 582 } 583 584 /* The if statement lets the compiler compile it to a conditional store. 585 * Avoids dirtying a cache line. 586 */ 587 if (loop->stop_flag != 0) 588 loop->stop_flag = 0; 589 590 return r; 591 } 592 593 594 int uv_fileno(const uv_handle_t* handle, uv_os_fd_t* fd) { 595 uv_os_fd_t fd_out; 596 597 switch (handle->type) { 598 case UV_TCP: 599 fd_out = (uv_os_fd_t)((uv_tcp_t*) handle)->socket; 600 break; 601 602 case UV_NAMED_PIPE: 603 fd_out = ((uv_pipe_t*) handle)->handle; 604 break; 605 606 case UV_TTY: 607 fd_out = ((uv_tty_t*) handle)->handle; 608 break; 609 610 case UV_UDP: 611 fd_out = (uv_os_fd_t)((uv_udp_t*) handle)->socket; 612 break; 613 614 case UV_POLL: 615 fd_out = (uv_os_fd_t)((uv_poll_t*) handle)->socket; 616 break; 617 618 default: 619 return UV_EINVAL; 620 } 621 622 if (uv_is_closing(handle) || fd_out == INVALID_HANDLE_VALUE) 623 return UV_EBADF; 624 625 *fd = fd_out; 626 return 0; 627 } 628 629 630 int uv__socket_sockopt(uv_handle_t* handle, int optname, int* value) { 631 int r; 632 int len; 633 SOCKET socket; 634 635 if (handle == NULL || value == NULL) 636 return UV_EINVAL; 637 638 if (handle->type == UV_TCP) 639 socket = ((uv_tcp_t*) handle)->socket; 640 else if (handle->type == UV_UDP) 641 socket = ((uv_udp_t*) handle)->socket; 642 else 643 return UV_ENOTSUP; 644 645 len = sizeof(*value); 646 647 if (*value == 0) 648 r = getsockopt(socket, SOL_SOCKET, optname, (char*) value, &len); 649 else 650 r = setsockopt(socket, SOL_SOCKET, optname, (const char*) value, len); 651 652 if (r == SOCKET_ERROR) 653 return uv_translate_sys_error(WSAGetLastError()); 654 655 return 0; 656 } 657 658 int uv_cpumask_size(void) { 659 return (int)(sizeof(DWORD_PTR) * 8); 660 } 661 662 int uv__getsockpeername(const uv_handle_t* handle, 663 uv__peersockfunc func, 664 struct sockaddr* name, 665 int* namelen, 666 int delayed_error) { 667 668 int result; 669 uv_os_fd_t fd; 670 671 result = uv_fileno(handle, &fd); 672 if (result != 0) 673 return result; 674 675 if (delayed_error) 676 return uv_translate_sys_error(delayed_error); 677 678 result = func((SOCKET) fd, name, namelen); 679 if (result != 0) 680 return uv_translate_sys_error(WSAGetLastError()); 681 682 return 0; 683 } 684