1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 * 3 * Permission is hereby granted, free of charge, to any person obtaining a copy 4 * of this software and associated documentation files (the "Software"), to 5 * deal in the Software without restriction, including without limitation the 6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 7 * sell copies of the Software, and to permit persons to whom the Software is 8 * furnished to do so, subject to the following conditions: 9 * 10 * The above copyright notice and this permission notice shall be included in 11 * all copies or substantial portions of the Software. 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 19 * IN THE SOFTWARE. 20 */ 21 22 #include "uv.h" 23 #include "internal.h" 24 25 #include <stdio.h> 26 #include <stdlib.h> 27 #include <string.h> 28 #include <assert.h> 29 #include <errno.h> 30 31 #include <sys/types.h> 32 #include <sys/socket.h> 33 #include <sys/uio.h> 34 #include <sys/un.h> 35 #include <unistd.h> 36 #include <limits.h> /* IOV_MAX */ 37 38 #if defined(__APPLE__) 39 # include <sys/event.h> 40 # include <sys/time.h> 41 # include <sys/select.h> 42 43 /* Forward declaration */ 44 typedef struct uv__stream_select_s uv__stream_select_t; 45 46 struct uv__stream_select_s { 47 uv_stream_t* stream; 48 uv_thread_t thread; 49 uv_sem_t close_sem; 50 uv_sem_t async_sem; 51 uv_async_t async; 52 int events; 53 int fake_fd; 54 int int_fd; 55 int fd; 56 fd_set* sread; 57 size_t sread_sz; 58 fd_set* swrite; 59 size_t swrite_sz; 60 }; 61 #endif /* defined(__APPLE__) */ 62 63 union uv__cmsg { 64 struct cmsghdr hdr; 65 /* This cannot be larger because of the IBMi PASE limitation that 66 * the total size of control messages cannot exceed 256 bytes. 67 */ 68 char pad[256]; 69 }; 70 71 STATIC_ASSERT(256 == sizeof(union uv__cmsg)); 72 73 static void uv__stream_connect(uv_stream_t*); 74 static void uv__write(uv_stream_t* stream); 75 static void uv__read(uv_stream_t* stream); 76 static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events); 77 static void uv__write_callbacks(uv_stream_t* stream); 78 static size_t uv__write_req_size(uv_write_t* req); 79 static void uv__drain(uv_stream_t* stream); 80 81 82 void uv__stream_init(uv_loop_t* loop, 83 uv_stream_t* stream, 84 uv_handle_type type) { 85 int err; 86 87 uv__handle_init(loop, (uv_handle_t*)stream, type); 88 stream->read_cb = NULL; 89 stream->alloc_cb = NULL; 90 stream->close_cb = NULL; 91 stream->connection_cb = NULL; 92 stream->connect_req = NULL; 93 stream->shutdown_req = NULL; 94 stream->accepted_fd = -1; 95 stream->queued_fds = NULL; 96 stream->delayed_error = 0; 97 uv__queue_init(&stream->write_queue); 98 uv__queue_init(&stream->write_completed_queue); 99 stream->write_queue_size = 0; 100 101 if (loop->emfile_fd == -1) { 102 err = uv__open_cloexec("/dev/null", O_RDONLY); 103 if (err < 0) 104 /* In the rare case that "/dev/null" isn't mounted open "/" 105 * instead. 106 */ 107 err = uv__open_cloexec("/", O_RDONLY); 108 if (err >= 0) 109 loop->emfile_fd = err; 110 } 111 112 #if defined(__APPLE__) 113 stream->select = NULL; 114 #endif /* defined(__APPLE_) */ 115 116 uv__io_init(&stream->io_watcher, uv__stream_io, -1); 117 } 118 119 120 static void uv__stream_osx_interrupt_select(uv_stream_t* stream) { 121 #if defined(__APPLE__) 122 /* Notify select() thread about state change */ 123 uv__stream_select_t* s; 124 int r; 125 126 s = stream->select; 127 if (s == NULL) 128 return; 129 130 /* Interrupt select() loop 131 * NOTE: fake_fd and int_fd are socketpair(), thus writing to one will 132 * emit read event on other side 133 */ 134 do 135 r = write(s->fake_fd, "x", 1); 136 while (r == -1 && errno == EINTR); 137 138 assert(r == 1); 139 #else /* !defined(__APPLE__) */ 140 /* No-op on any other platform */ 141 #endif /* !defined(__APPLE__) */ 142 } 143 144 145 #if defined(__APPLE__) 146 static void uv__stream_osx_select(void* arg) { 147 uv_stream_t* stream; 148 uv__stream_select_t* s; 149 char buf[1024]; 150 int events; 151 int fd; 152 int r; 153 int max_fd; 154 155 stream = arg; 156 s = stream->select; 157 fd = s->fd; 158 159 if (fd > s->int_fd) 160 max_fd = fd; 161 else 162 max_fd = s->int_fd; 163 164 for (;;) { 165 /* Terminate on semaphore */ 166 if (uv_sem_trywait(&s->close_sem) == 0) 167 break; 168 169 /* Watch fd using select(2) */ 170 memset(s->sread, 0, s->sread_sz); 171 memset(s->swrite, 0, s->swrite_sz); 172 173 if (uv__io_active(&stream->io_watcher, POLLIN)) 174 FD_SET(fd, s->sread); 175 if (uv__io_active(&stream->io_watcher, POLLOUT)) 176 FD_SET(fd, s->swrite); 177 FD_SET(s->int_fd, s->sread); 178 179 /* Wait indefinitely for fd events */ 180 r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL); 181 if (r == -1) { 182 if (errno == EINTR) 183 continue; 184 185 /* XXX: Possible?! */ 186 abort(); 187 } 188 189 /* Ignore timeouts */ 190 if (r == 0) 191 continue; 192 193 /* Empty socketpair's buffer in case of interruption */ 194 if (FD_ISSET(s->int_fd, s->sread)) 195 for (;;) { 196 r = read(s->int_fd, buf, sizeof(buf)); 197 198 if (r == sizeof(buf)) 199 continue; 200 201 if (r != -1) 202 break; 203 204 if (errno == EAGAIN || errno == EWOULDBLOCK) 205 break; 206 207 if (errno == EINTR) 208 continue; 209 210 abort(); 211 } 212 213 /* Handle events */ 214 events = 0; 215 if (FD_ISSET(fd, s->sread)) 216 events |= POLLIN; 217 if (FD_ISSET(fd, s->swrite)) 218 events |= POLLOUT; 219 220 assert(events != 0 || FD_ISSET(s->int_fd, s->sread)); 221 if (events != 0) { 222 ACCESS_ONCE(int, s->events) = events; 223 224 uv_async_send(&s->async); 225 uv_sem_wait(&s->async_sem); 226 227 /* Should be processed at this stage */ 228 assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING)); 229 } 230 } 231 } 232 233 234 static void uv__stream_osx_select_cb(uv_async_t* handle) { 235 uv__stream_select_t* s; 236 uv_stream_t* stream; 237 int events; 238 239 s = container_of(handle, uv__stream_select_t, async); 240 stream = s->stream; 241 242 /* Get and reset stream's events */ 243 events = s->events; 244 ACCESS_ONCE(int, s->events) = 0; 245 246 assert(events != 0); 247 assert(events == (events & (POLLIN | POLLOUT))); 248 249 /* Invoke callback on event-loop */ 250 if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN)) 251 uv__stream_io(stream->loop, &stream->io_watcher, POLLIN); 252 253 if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT)) 254 uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT); 255 256 if (stream->flags & UV_HANDLE_CLOSING) 257 return; 258 259 /* NOTE: It is important to do it here, otherwise `select()` might be called 260 * before the actual `uv__read()`, leading to the blocking syscall 261 */ 262 uv_sem_post(&s->async_sem); 263 } 264 265 266 static void uv__stream_osx_cb_close(uv_handle_t* async) { 267 uv__stream_select_t* s; 268 269 s = container_of(async, uv__stream_select_t, async); 270 uv__free(s); 271 } 272 273 274 int uv__stream_try_select(uv_stream_t* stream, int* fd) { 275 /* 276 * kqueue doesn't work with some files from /dev mount on osx. 277 * select(2) in separate thread for those fds 278 */ 279 280 struct kevent filter[1]; 281 struct kevent events[1]; 282 struct timespec timeout; 283 uv__stream_select_t* s; 284 int fds[2]; 285 int err; 286 int ret; 287 int kq; 288 int old_fd; 289 int max_fd; 290 size_t sread_sz; 291 size_t swrite_sz; 292 293 kq = kqueue(); 294 if (kq == -1) { 295 perror("(libuv) kqueue()"); 296 return UV__ERR(errno); 297 } 298 299 EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0); 300 301 /* Use small timeout, because we only want to capture EINVALs */ 302 timeout.tv_sec = 0; 303 timeout.tv_nsec = 1; 304 305 do 306 ret = kevent(kq, filter, 1, events, 1, &timeout); 307 while (ret == -1 && errno == EINTR); 308 309 uv__close(kq); 310 311 if (ret == -1) 312 return UV__ERR(errno); 313 314 if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL) 315 return 0; 316 317 /* At this point we definitely know that this fd won't work with kqueue */ 318 319 /* 320 * Create fds for io watcher and to interrupt the select() loop. 321 * NOTE: do it ahead of malloc below to allocate enough space for fd_sets 322 */ 323 if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds)) 324 return UV__ERR(errno); 325 326 max_fd = *fd; 327 if (fds[1] > max_fd) 328 max_fd = fds[1]; 329 330 sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY; 331 swrite_sz = sread_sz; 332 333 s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz); 334 if (s == NULL) { 335 err = UV_ENOMEM; 336 goto failed_malloc; 337 } 338 339 s->events = 0; 340 s->fd = *fd; 341 s->sread = (fd_set*) ((char*) s + sizeof(*s)); 342 s->sread_sz = sread_sz; 343 s->swrite = (fd_set*) ((char*) s->sread + sread_sz); 344 s->swrite_sz = swrite_sz; 345 346 err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb); 347 if (err) 348 goto failed_async_init; 349 350 s->async.flags |= UV_HANDLE_INTERNAL; 351 uv__handle_unref(&s->async); 352 353 err = uv_sem_init(&s->close_sem, 0); 354 if (err != 0) 355 goto failed_close_sem_init; 356 357 err = uv_sem_init(&s->async_sem, 0); 358 if (err != 0) 359 goto failed_async_sem_init; 360 361 s->fake_fd = fds[0]; 362 s->int_fd = fds[1]; 363 364 old_fd = *fd; 365 s->stream = stream; 366 stream->select = s; 367 *fd = s->fake_fd; 368 369 err = uv_thread_create(&s->thread, uv__stream_osx_select, stream); 370 if (err != 0) 371 goto failed_thread_create; 372 373 return 0; 374 375 failed_thread_create: 376 s->stream = NULL; 377 stream->select = NULL; 378 *fd = old_fd; 379 380 uv_sem_destroy(&s->async_sem); 381 382 failed_async_sem_init: 383 uv_sem_destroy(&s->close_sem); 384 385 failed_close_sem_init: 386 uv__close(fds[0]); 387 uv__close(fds[1]); 388 uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close); 389 return err; 390 391 failed_async_init: 392 uv__free(s); 393 394 failed_malloc: 395 uv__close(fds[0]); 396 uv__close(fds[1]); 397 398 return err; 399 } 400 #endif /* defined(__APPLE__) */ 401 402 403 int uv__stream_open(uv_stream_t* stream, int fd, int flags) { 404 #if defined(__APPLE__) 405 int enable; 406 #endif 407 408 if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd)) 409 return UV_EBUSY; 410 411 assert(fd >= 0); 412 stream->flags |= flags; 413 414 if (stream->type == UV_TCP) { 415 if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1)) 416 return UV__ERR(errno); 417 418 /* TODO Use delay the user passed in. */ 419 if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) && 420 uv__tcp_keepalive(fd, 1, 60)) { 421 return UV__ERR(errno); 422 } 423 } 424 425 #if defined(__APPLE__) 426 enable = 1; 427 if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) && 428 errno != ENOTSOCK && 429 errno != EINVAL) { 430 return UV__ERR(errno); 431 } 432 #endif 433 434 stream->io_watcher.fd = fd; 435 436 return 0; 437 } 438 439 440 void uv__stream_flush_write_queue(uv_stream_t* stream, int error) { 441 uv_write_t* req; 442 struct uv__queue* q; 443 while (!uv__queue_empty(&stream->write_queue)) { 444 q = uv__queue_head(&stream->write_queue); 445 uv__queue_remove(q); 446 447 req = uv__queue_data(q, uv_write_t, queue); 448 req->error = error; 449 450 uv__queue_insert_tail(&stream->write_completed_queue, &req->queue); 451 } 452 } 453 454 455 void uv__stream_destroy(uv_stream_t* stream) { 456 assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT)); 457 assert(stream->flags & UV_HANDLE_CLOSED); 458 459 if (stream->connect_req) { 460 uv__req_unregister(stream->loop); 461 stream->connect_req->cb(stream->connect_req, UV_ECANCELED); 462 stream->connect_req = NULL; 463 } 464 465 uv__stream_flush_write_queue(stream, UV_ECANCELED); 466 uv__write_callbacks(stream); 467 uv__drain(stream); 468 469 assert(stream->write_queue_size == 0); 470 } 471 472 473 /* Implements a best effort approach to mitigating accept() EMFILE errors. 474 * We have a spare file descriptor stashed away that we close to get below 475 * the EMFILE limit. Next, we accept all pending connections and close them 476 * immediately to signal the clients that we're overloaded - and we are, but 477 * we still keep on trucking. 478 * 479 * There is one caveat: it's not reliable in a multi-threaded environment. 480 * The file descriptor limit is per process. Our party trick fails if another 481 * thread opens a file or creates a socket in the time window between us 482 * calling close() and accept(). 483 */ 484 static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) { 485 int err; 486 int emfile_fd; 487 488 if (loop->emfile_fd == -1) 489 return UV_EMFILE; 490 491 uv__close(loop->emfile_fd); 492 loop->emfile_fd = -1; 493 494 do { 495 err = uv__accept(accept_fd); 496 if (err >= 0) 497 uv__close(err); 498 } while (err >= 0 || err == UV_EINTR); 499 500 emfile_fd = uv__open_cloexec("/", O_RDONLY); 501 if (emfile_fd >= 0) 502 loop->emfile_fd = emfile_fd; 503 504 return err; 505 } 506 507 508 void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { 509 uv_stream_t* stream; 510 int err; 511 int fd; 512 513 stream = container_of(w, uv_stream_t, io_watcher); 514 assert(events & POLLIN); 515 assert(stream->accepted_fd == -1); 516 assert(!(stream->flags & UV_HANDLE_CLOSING)); 517 518 fd = uv__stream_fd(stream); 519 err = uv__accept(fd); 520 521 if (err == UV_EMFILE || err == UV_ENFILE) 522 err = uv__emfile_trick(loop, fd); /* Shed load. */ 523 524 if (err < 0) 525 return; 526 527 stream->accepted_fd = err; 528 stream->connection_cb(stream, 0); 529 530 if (stream->accepted_fd != -1) 531 /* The user hasn't yet accepted called uv_accept() */ 532 uv__io_stop(loop, &stream->io_watcher, POLLIN); 533 } 534 535 536 int uv_accept(uv_stream_t* server, uv_stream_t* client) { 537 int err; 538 539 assert(server->loop == client->loop); 540 541 if (server->accepted_fd == -1) 542 return UV_EAGAIN; 543 544 switch (client->type) { 545 case UV_NAMED_PIPE: 546 case UV_TCP: 547 err = uv__stream_open(client, 548 server->accepted_fd, 549 UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); 550 if (err) { 551 /* TODO handle error */ 552 uv__close(server->accepted_fd); 553 goto done; 554 } 555 break; 556 557 case UV_UDP: 558 err = uv_udp_open((uv_udp_t*) client, server->accepted_fd); 559 if (err) { 560 uv__close(server->accepted_fd); 561 goto done; 562 } 563 break; 564 565 default: 566 return UV_EINVAL; 567 } 568 569 client->flags |= UV_HANDLE_BOUND; 570 571 done: 572 /* Process queued fds */ 573 if (server->queued_fds != NULL) { 574 uv__stream_queued_fds_t* queued_fds; 575 576 queued_fds = server->queued_fds; 577 578 /* Read first */ 579 server->accepted_fd = queued_fds->fds[0]; 580 581 /* All read, free */ 582 assert(queued_fds->offset > 0); 583 if (--queued_fds->offset == 0) { 584 uv__free(queued_fds); 585 server->queued_fds = NULL; 586 } else { 587 /* Shift rest */ 588 memmove(queued_fds->fds, 589 queued_fds->fds + 1, 590 queued_fds->offset * sizeof(*queued_fds->fds)); 591 } 592 } else { 593 server->accepted_fd = -1; 594 if (err == 0) 595 uv__io_start(server->loop, &server->io_watcher, POLLIN); 596 } 597 return err; 598 } 599 600 601 int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) { 602 int err; 603 if (uv__is_closing(stream)) { 604 return UV_EINVAL; 605 } 606 switch (stream->type) { 607 case UV_TCP: 608 err = uv__tcp_listen((uv_tcp_t*)stream, backlog, cb); 609 break; 610 611 case UV_NAMED_PIPE: 612 err = uv__pipe_listen((uv_pipe_t*)stream, backlog, cb); 613 break; 614 615 default: 616 err = UV_EINVAL; 617 } 618 619 if (err == 0) 620 uv__handle_start(stream); 621 622 return err; 623 } 624 625 626 static void uv__drain(uv_stream_t* stream) { 627 uv_shutdown_t* req; 628 int err; 629 630 assert(uv__queue_empty(&stream->write_queue)); 631 if (!(stream->flags & UV_HANDLE_CLOSING)) { 632 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); 633 uv__stream_osx_interrupt_select(stream); 634 } 635 636 if (!uv__is_stream_shutting(stream)) 637 return; 638 639 req = stream->shutdown_req; 640 assert(req); 641 642 if ((stream->flags & UV_HANDLE_CLOSING) || 643 !(stream->flags & UV_HANDLE_SHUT)) { 644 stream->shutdown_req = NULL; 645 uv__req_unregister(stream->loop); 646 647 err = 0; 648 if (stream->flags & UV_HANDLE_CLOSING) 649 /* The user destroyed the stream before we got to do the shutdown. */ 650 err = UV_ECANCELED; 651 else if (shutdown(uv__stream_fd(stream), SHUT_WR)) 652 err = UV__ERR(errno); 653 else /* Success. */ 654 stream->flags |= UV_HANDLE_SHUT; 655 656 if (req->cb != NULL) 657 req->cb(req, err); 658 } 659 } 660 661 662 static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) { 663 if (n == 1) 664 return write(fd, vec->iov_base, vec->iov_len); 665 else 666 return writev(fd, vec, n); 667 } 668 669 670 static size_t uv__write_req_size(uv_write_t* req) { 671 size_t size; 672 673 assert(req->bufs != NULL); 674 size = uv__count_bufs(req->bufs + req->write_index, 675 req->nbufs - req->write_index); 676 assert(req->handle->write_queue_size >= size); 677 678 return size; 679 } 680 681 682 /* Returns 1 if all write request data has been written, or 0 if there is still 683 * more data to write. 684 * 685 * Note: the return value only says something about the *current* request. 686 * There may still be other write requests sitting in the queue. 687 */ 688 static int uv__write_req_update(uv_stream_t* stream, 689 uv_write_t* req, 690 size_t n) { 691 uv_buf_t* buf; 692 size_t len; 693 694 assert(n <= stream->write_queue_size); 695 stream->write_queue_size -= n; 696 697 buf = req->bufs + req->write_index; 698 699 do { 700 len = n < buf->len ? n : buf->len; 701 if (buf->len != 0) 702 buf->base += len; 703 buf->len -= len; 704 buf += (buf->len == 0); /* Advance to next buffer if this one is empty. */ 705 n -= len; 706 } while (n > 0); 707 708 req->write_index = buf - req->bufs; 709 710 return req->write_index == req->nbufs; 711 } 712 713 714 static void uv__write_req_finish(uv_write_t* req) { 715 uv_stream_t* stream = req->handle; 716 717 /* Pop the req off tcp->write_queue. */ 718 uv__queue_remove(&req->queue); 719 720 /* Only free when there was no error. On error, we touch up write_queue_size 721 * right before making the callback. The reason we don't do that right away 722 * is that a write_queue_size > 0 is our only way to signal to the user that 723 * they should stop writing - which they should if we got an error. Something 724 * to revisit in future revisions of the libuv API. 725 */ 726 if (req->error == 0) { 727 if (req->bufs != req->bufsml) 728 uv__free(req->bufs); 729 req->bufs = NULL; 730 } 731 732 /* Add it to the write_completed_queue where it will have its 733 * callback called in the near future. 734 */ 735 uv__queue_insert_tail(&stream->write_completed_queue, &req->queue); 736 uv__io_feed(stream->loop, &stream->io_watcher); 737 } 738 739 740 static int uv__handle_fd(uv_handle_t* handle) { 741 switch (handle->type) { 742 case UV_NAMED_PIPE: 743 case UV_TCP: 744 return ((uv_stream_t*) handle)->io_watcher.fd; 745 746 case UV_UDP: 747 return ((uv_udp_t*) handle)->io_watcher.fd; 748 749 default: 750 return -1; 751 } 752 } 753 754 static int uv__try_write(uv_stream_t* stream, 755 const uv_buf_t bufs[], 756 unsigned int nbufs, 757 uv_stream_t* send_handle) { 758 struct iovec* iov; 759 int iovmax; 760 int iovcnt; 761 ssize_t n; 762 763 /* 764 * Cast to iovec. We had to have our own uv_buf_t instead of iovec 765 * because Windows's WSABUF is not an iovec. 766 */ 767 iov = (struct iovec*) bufs; 768 iovcnt = nbufs; 769 770 iovmax = uv__getiovmax(); 771 772 /* Limit iov count to avoid EINVALs from writev() */ 773 if (iovcnt > iovmax) 774 iovcnt = iovmax; 775 776 /* 777 * Now do the actual writev. Note that we've been updating the pointers 778 * inside the iov each time we write. So there is no need to offset it. 779 */ 780 if (send_handle != NULL) { 781 int fd_to_send; 782 struct msghdr msg; 783 union uv__cmsg cmsg; 784 785 if (uv__is_closing(send_handle)) 786 return UV_EBADF; 787 788 fd_to_send = uv__handle_fd((uv_handle_t*) send_handle); 789 790 memset(&cmsg, 0, sizeof(cmsg)); 791 792 assert(fd_to_send >= 0); 793 794 msg.msg_name = NULL; 795 msg.msg_namelen = 0; 796 msg.msg_iov = iov; 797 msg.msg_iovlen = iovcnt; 798 msg.msg_flags = 0; 799 800 msg.msg_control = &cmsg.hdr; 801 msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send)); 802 803 cmsg.hdr.cmsg_level = SOL_SOCKET; 804 cmsg.hdr.cmsg_type = SCM_RIGHTS; 805 cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(fd_to_send)); 806 memcpy(CMSG_DATA(&cmsg.hdr), &fd_to_send, sizeof(fd_to_send)); 807 808 do 809 n = sendmsg(uv__stream_fd(stream), &msg, 0); 810 while (n == -1 && errno == EINTR); 811 } else { 812 do 813 n = uv__writev(uv__stream_fd(stream), iov, iovcnt); 814 while (n == -1 && errno == EINTR); 815 } 816 817 if (n >= 0) 818 return n; 819 820 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) 821 return UV_EAGAIN; 822 823 #ifdef __APPLE__ 824 /* macOS versions 10.10 and 10.15 - and presumbaly 10.11 to 10.14, too - 825 * have a bug where a race condition causes the kernel to return EPROTOTYPE 826 * because the socket isn't fully constructed. It's probably the result of 827 * the peer closing the connection and that is why libuv translates it to 828 * ECONNRESET. Previously, libuv retried until the EPROTOTYPE error went 829 * away but some VPN software causes the same behavior except the error is 830 * permanent, not transient, turning the retry mechanism into an infinite 831 * loop. See https://github.com/libuv/libuv/pull/482. 832 */ 833 if (errno == EPROTOTYPE) 834 return UV_ECONNRESET; 835 #endif /* __APPLE__ */ 836 837 return UV__ERR(errno); 838 } 839 840 static void uv__write(uv_stream_t* stream) { 841 struct uv__queue* q; 842 uv_write_t* req; 843 ssize_t n; 844 int count; 845 846 assert(uv__stream_fd(stream) >= 0); 847 848 /* Prevent loop starvation when the consumer of this stream read as fast as 849 * (or faster than) we can write it. This `count` mechanism does not need to 850 * change even if we switch to edge-triggered I/O. 851 */ 852 count = 32; 853 854 for (;;) { 855 if (uv__queue_empty(&stream->write_queue)) 856 return; 857 858 q = uv__queue_head(&stream->write_queue); 859 req = uv__queue_data(q, uv_write_t, queue); 860 assert(req->handle == stream); 861 862 n = uv__try_write(stream, 863 &(req->bufs[req->write_index]), 864 req->nbufs - req->write_index, 865 req->send_handle); 866 867 /* Ensure the handle isn't sent again in case this is a partial write. */ 868 if (n >= 0) { 869 req->send_handle = NULL; 870 if (uv__write_req_update(stream, req, n)) { 871 uv__write_req_finish(req); 872 if (count-- > 0) 873 continue; /* Start trying to write the next request. */ 874 875 return; 876 } 877 } else if (n != UV_EAGAIN) 878 goto error; 879 880 /* If this is a blocking stream, try again. */ 881 if (stream->flags & UV_HANDLE_BLOCKING_WRITES) 882 continue; 883 884 /* We're not done. */ 885 uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); 886 887 /* Notify select() thread about state change */ 888 uv__stream_osx_interrupt_select(stream); 889 890 return; 891 } 892 893 error: 894 req->error = n; 895 uv__write_req_finish(req); 896 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); 897 uv__stream_osx_interrupt_select(stream); 898 } 899 900 901 static void uv__write_callbacks(uv_stream_t* stream) { 902 uv_write_t* req; 903 struct uv__queue* q; 904 struct uv__queue pq; 905 906 if (uv__queue_empty(&stream->write_completed_queue)) 907 return; 908 909 uv__queue_move(&stream->write_completed_queue, &pq); 910 911 while (!uv__queue_empty(&pq)) { 912 /* Pop a req off write_completed_queue. */ 913 q = uv__queue_head(&pq); 914 req = uv__queue_data(q, uv_write_t, queue); 915 uv__queue_remove(q); 916 uv__req_unregister(stream->loop); 917 918 if (req->bufs != NULL) { 919 stream->write_queue_size -= uv__write_req_size(req); 920 if (req->bufs != req->bufsml) 921 uv__free(req->bufs); 922 req->bufs = NULL; 923 } 924 925 /* NOTE: call callback AFTER freeing the request data. */ 926 if (req->cb) 927 req->cb(req, req->error); 928 } 929 } 930 931 932 static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) { 933 stream->flags |= UV_HANDLE_READ_EOF; 934 stream->flags &= ~UV_HANDLE_READING; 935 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); 936 uv__handle_stop(stream); 937 uv__stream_osx_interrupt_select(stream); 938 stream->read_cb(stream, UV_EOF, buf); 939 } 940 941 942 static int uv__stream_queue_fd(uv_stream_t* stream, int fd) { 943 uv__stream_queued_fds_t* queued_fds; 944 unsigned int queue_size; 945 946 queued_fds = stream->queued_fds; 947 if (queued_fds == NULL) { 948 queue_size = 8; 949 queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) + 950 sizeof(*queued_fds)); 951 if (queued_fds == NULL) 952 return UV_ENOMEM; 953 queued_fds->size = queue_size; 954 queued_fds->offset = 0; 955 stream->queued_fds = queued_fds; 956 957 /* Grow */ 958 } else if (queued_fds->size == queued_fds->offset) { 959 queue_size = queued_fds->size + 8; 960 queued_fds = uv__realloc(queued_fds, 961 (queue_size - 1) * sizeof(*queued_fds->fds) + 962 sizeof(*queued_fds)); 963 964 /* 965 * Allocation failure, report back. 966 * NOTE: if it is fatal - sockets will be closed in uv__stream_close 967 */ 968 if (queued_fds == NULL) 969 return UV_ENOMEM; 970 queued_fds->size = queue_size; 971 stream->queued_fds = queued_fds; 972 } 973 974 /* Put fd in a queue */ 975 queued_fds->fds[queued_fds->offset++] = fd; 976 977 return 0; 978 } 979 980 981 static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) { 982 struct cmsghdr* cmsg; 983 char* p; 984 char* pe; 985 int fd; 986 int err; 987 size_t count; 988 989 err = 0; 990 for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) { 991 if (cmsg->cmsg_type != SCM_RIGHTS) { 992 fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n", 993 cmsg->cmsg_type); 994 continue; 995 } 996 997 assert(cmsg->cmsg_len >= CMSG_LEN(0)); 998 count = cmsg->cmsg_len - CMSG_LEN(0); 999 assert(count % sizeof(fd) == 0); 1000 count /= sizeof(fd); 1001 1002 p = (void*) CMSG_DATA(cmsg); 1003 pe = p + count * sizeof(fd); 1004 1005 while (p < pe) { 1006 memcpy(&fd, p, sizeof(fd)); 1007 p += sizeof(fd); 1008 1009 if (err == 0) { 1010 if (stream->accepted_fd == -1) 1011 stream->accepted_fd = fd; 1012 else 1013 err = uv__stream_queue_fd(stream, fd); 1014 } 1015 1016 if (err != 0) 1017 uv__close(fd); 1018 } 1019 } 1020 1021 return err; 1022 } 1023 1024 1025 static void uv__read(uv_stream_t* stream) { 1026 uv_buf_t buf; 1027 ssize_t nread; 1028 struct msghdr msg; 1029 union uv__cmsg cmsg; 1030 int count; 1031 int err; 1032 int is_ipc; 1033 1034 stream->flags &= ~UV_HANDLE_READ_PARTIAL; 1035 1036 /* Prevent loop starvation when the data comes in as fast as (or faster than) 1037 * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O. 1038 */ 1039 count = 32; 1040 1041 is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc; 1042 1043 /* XXX: Maybe instead of having UV_HANDLE_READING we just test if 1044 * tcp->read_cb is NULL or not? 1045 */ 1046 while (stream->read_cb 1047 && (stream->flags & UV_HANDLE_READING) 1048 && (count-- > 0)) { 1049 assert(stream->alloc_cb != NULL); 1050 1051 buf = uv_buf_init(NULL, 0); 1052 stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf); 1053 if (buf.base == NULL || buf.len == 0) { 1054 /* User indicates it can't or won't handle the read. */ 1055 stream->read_cb(stream, UV_ENOBUFS, &buf); 1056 return; 1057 } 1058 1059 assert(buf.base != NULL); 1060 assert(uv__stream_fd(stream) >= 0); 1061 1062 if (!is_ipc) { 1063 do { 1064 nread = read(uv__stream_fd(stream), buf.base, buf.len); 1065 } 1066 while (nread < 0 && errno == EINTR); 1067 } else { 1068 /* ipc uses recvmsg */ 1069 msg.msg_flags = 0; 1070 msg.msg_iov = (struct iovec*) &buf; 1071 msg.msg_iovlen = 1; 1072 msg.msg_name = NULL; 1073 msg.msg_namelen = 0; 1074 /* Set up to receive a descriptor even if one isn't in the message */ 1075 msg.msg_controllen = sizeof(cmsg); 1076 msg.msg_control = &cmsg.hdr; 1077 1078 do { 1079 nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0); 1080 } 1081 while (nread < 0 && errno == EINTR); 1082 } 1083 1084 if (nread < 0) { 1085 /* Error */ 1086 if (errno == EAGAIN || errno == EWOULDBLOCK) { 1087 /* Wait for the next one. */ 1088 if (stream->flags & UV_HANDLE_READING) { 1089 uv__io_start(stream->loop, &stream->io_watcher, POLLIN); 1090 uv__stream_osx_interrupt_select(stream); 1091 } 1092 stream->read_cb(stream, 0, &buf); 1093 #if defined(__CYGWIN__) || defined(__MSYS__) 1094 } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) { 1095 uv__stream_eof(stream, &buf); 1096 return; 1097 #endif 1098 } else { 1099 /* Error. User should call uv_close(). */ 1100 stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); 1101 stream->read_cb(stream, UV__ERR(errno), &buf); 1102 if (stream->flags & UV_HANDLE_READING) { 1103 stream->flags &= ~UV_HANDLE_READING; 1104 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); 1105 uv__handle_stop(stream); 1106 uv__stream_osx_interrupt_select(stream); 1107 } 1108 } 1109 return; 1110 } else if (nread == 0) { 1111 uv__stream_eof(stream, &buf); 1112 return; 1113 } else { 1114 /* Successful read */ 1115 ssize_t buflen = buf.len; 1116 1117 if (is_ipc) { 1118 err = uv__stream_recv_cmsg(stream, &msg); 1119 if (err != 0) { 1120 stream->read_cb(stream, err, &buf); 1121 return; 1122 } 1123 } 1124 1125 #if defined(__MVS__) 1126 if (is_ipc && msg.msg_controllen > 0) { 1127 uv_buf_t blankbuf; 1128 int nread; 1129 struct iovec *old; 1130 1131 blankbuf.base = 0; 1132 blankbuf.len = 0; 1133 old = msg.msg_iov; 1134 msg.msg_iov = (struct iovec*) &blankbuf; 1135 nread = 0; 1136 do { 1137 nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0); 1138 err = uv__stream_recv_cmsg(stream, &msg); 1139 if (err != 0) { 1140 stream->read_cb(stream, err, &buf); 1141 msg.msg_iov = old; 1142 return; 1143 } 1144 } while (nread == 0 && msg.msg_controllen > 0); 1145 msg.msg_iov = old; 1146 } 1147 #endif 1148 stream->read_cb(stream, nread, &buf); 1149 1150 /* Return if we didn't fill the buffer, there is no more data to read. */ 1151 if (nread < buflen) { 1152 stream->flags |= UV_HANDLE_READ_PARTIAL; 1153 return; 1154 } 1155 } 1156 } 1157 } 1158 1159 1160 int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) { 1161 assert(stream->type == UV_TCP || 1162 stream->type == UV_TTY || 1163 stream->type == UV_NAMED_PIPE); 1164 1165 if (!(stream->flags & UV_HANDLE_WRITABLE) || 1166 stream->flags & UV_HANDLE_SHUT || 1167 uv__is_stream_shutting(stream) || 1168 uv__is_closing(stream)) { 1169 return UV_ENOTCONN; 1170 } 1171 1172 assert(uv__stream_fd(stream) >= 0); 1173 1174 /* Initialize request. The `shutdown(2)` call will always be deferred until 1175 * `uv__drain`, just before the callback is run. */ 1176 uv__req_init(stream->loop, req, UV_SHUTDOWN); 1177 req->handle = stream; 1178 req->cb = cb; 1179 stream->shutdown_req = req; 1180 stream->flags &= ~UV_HANDLE_WRITABLE; 1181 1182 if (uv__queue_empty(&stream->write_queue)) 1183 uv__io_feed(stream->loop, &stream->io_watcher); 1184 1185 return 0; 1186 } 1187 1188 1189 static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { 1190 uv_stream_t* stream; 1191 1192 stream = container_of(w, uv_stream_t, io_watcher); 1193 1194 assert(stream->type == UV_TCP || 1195 stream->type == UV_NAMED_PIPE || 1196 stream->type == UV_TTY); 1197 assert(!(stream->flags & UV_HANDLE_CLOSING)); 1198 1199 if (stream->connect_req) { 1200 uv__stream_connect(stream); 1201 return; 1202 } 1203 1204 assert(uv__stream_fd(stream) >= 0); 1205 1206 /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */ 1207 if (events & (POLLIN | POLLERR | POLLHUP)) 1208 uv__read(stream); 1209 1210 if (uv__stream_fd(stream) == -1) 1211 return; /* read_cb closed stream. */ 1212 1213 /* Short-circuit iff POLLHUP is set, the user is still interested in read 1214 * events and uv__read() reported a partial read but not EOF. If the EOF 1215 * flag is set, uv__read() called read_cb with err=UV_EOF and we don't 1216 * have to do anything. If the partial read flag is not set, we can't 1217 * report the EOF yet because there is still data to read. 1218 */ 1219 if ((events & POLLHUP) && 1220 (stream->flags & UV_HANDLE_READING) && 1221 (stream->flags & UV_HANDLE_READ_PARTIAL) && 1222 !(stream->flags & UV_HANDLE_READ_EOF)) { 1223 uv_buf_t buf = { NULL, 0 }; 1224 uv__stream_eof(stream, &buf); 1225 } 1226 1227 if (uv__stream_fd(stream) == -1) 1228 return; /* read_cb closed stream. */ 1229 1230 if (events & (POLLOUT | POLLERR | POLLHUP)) { 1231 uv__write(stream); 1232 uv__write_callbacks(stream); 1233 1234 /* Write queue drained. */ 1235 if (uv__queue_empty(&stream->write_queue)) 1236 uv__drain(stream); 1237 } 1238 } 1239 1240 1241 /** 1242 * We get called here from directly following a call to connect(2). 1243 * In order to determine if we've errored out or succeeded must call 1244 * getsockopt. 1245 */ 1246 static void uv__stream_connect(uv_stream_t* stream) { 1247 int error; 1248 uv_connect_t* req = stream->connect_req; 1249 socklen_t errorsize = sizeof(int); 1250 1251 assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE); 1252 assert(req); 1253 1254 if (stream->delayed_error) { 1255 /* To smooth over the differences between unixes errors that 1256 * were reported synchronously on the first connect can be delayed 1257 * until the next tick--which is now. 1258 */ 1259 error = stream->delayed_error; 1260 stream->delayed_error = 0; 1261 } else { 1262 /* Normal situation: we need to get the socket error from the kernel. */ 1263 assert(uv__stream_fd(stream) >= 0); 1264 getsockopt(uv__stream_fd(stream), 1265 SOL_SOCKET, 1266 SO_ERROR, 1267 &error, 1268 &errorsize); 1269 error = UV__ERR(error); 1270 } 1271 1272 if (error == UV__ERR(EINPROGRESS)) 1273 return; 1274 1275 stream->connect_req = NULL; 1276 uv__req_unregister(stream->loop); 1277 1278 if (error < 0 || uv__queue_empty(&stream->write_queue)) { 1279 uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); 1280 } 1281 1282 if (req->cb) 1283 req->cb(req, error); 1284 1285 if (uv__stream_fd(stream) == -1) 1286 return; 1287 1288 if (error < 0) { 1289 uv__stream_flush_write_queue(stream, UV_ECANCELED); 1290 uv__write_callbacks(stream); 1291 } 1292 } 1293 1294 1295 static int uv__check_before_write(uv_stream_t* stream, 1296 unsigned int nbufs, 1297 uv_stream_t* send_handle) { 1298 assert(nbufs > 0); 1299 assert((stream->type == UV_TCP || 1300 stream->type == UV_NAMED_PIPE || 1301 stream->type == UV_TTY) && 1302 "uv_write (unix) does not yet support other types of streams"); 1303 1304 if (uv__stream_fd(stream) < 0) 1305 return UV_EBADF; 1306 1307 if (!(stream->flags & UV_HANDLE_WRITABLE)) 1308 return UV_EPIPE; 1309 1310 if (send_handle != NULL) { 1311 if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc) 1312 return UV_EINVAL; 1313 1314 /* XXX We abuse uv_write2() to send over UDP handles to child processes. 1315 * Don't call uv__stream_fd() on those handles, it's a macro that on OS X 1316 * evaluates to a function that operates on a uv_stream_t with a couple of 1317 * OS X specific fields. On other Unices it does (handle)->io_watcher.fd, 1318 * which works but only by accident. 1319 */ 1320 if (uv__handle_fd((uv_handle_t*) send_handle) < 0) 1321 return UV_EBADF; 1322 1323 #if defined(__CYGWIN__) || defined(__MSYS__) 1324 /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it. 1325 See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */ 1326 return UV_ENOSYS; 1327 #endif 1328 } 1329 1330 return 0; 1331 } 1332 1333 int uv_write2(uv_write_t* req, 1334 uv_stream_t* stream, 1335 const uv_buf_t bufs[], 1336 unsigned int nbufs, 1337 uv_stream_t* send_handle, 1338 uv_write_cb cb) { 1339 int empty_queue; 1340 int err; 1341 1342 err = uv__check_before_write(stream, nbufs, send_handle); 1343 if (err < 0) 1344 return err; 1345 1346 /* It's legal for write_queue_size > 0 even when the write_queue is empty; 1347 * it means there are error-state requests in the write_completed_queue that 1348 * will touch up write_queue_size later, see also uv__write_req_finish(). 1349 * We could check that write_queue is empty instead but that implies making 1350 * a write() syscall when we know that the handle is in error mode. 1351 */ 1352 empty_queue = (stream->write_queue_size == 0); 1353 1354 /* Initialize the req */ 1355 uv__req_init(stream->loop, req, UV_WRITE); 1356 req->cb = cb; 1357 req->handle = stream; 1358 req->error = 0; 1359 req->send_handle = send_handle; 1360 uv__queue_init(&req->queue); 1361 1362 req->bufs = req->bufsml; 1363 if (nbufs > ARRAY_SIZE(req->bufsml)) 1364 req->bufs = uv__malloc(nbufs * sizeof(bufs[0])); 1365 1366 if (req->bufs == NULL) 1367 return UV_ENOMEM; 1368 1369 memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0])); 1370 req->nbufs = nbufs; 1371 req->write_index = 0; 1372 stream->write_queue_size += uv__count_bufs(bufs, nbufs); 1373 1374 /* Append the request to write_queue. */ 1375 uv__queue_insert_tail(&stream->write_queue, &req->queue); 1376 1377 /* If the queue was empty when this function began, we should attempt to 1378 * do the write immediately. Otherwise start the write_watcher and wait 1379 * for the fd to become writable. 1380 */ 1381 if (stream->connect_req) { 1382 /* Still connecting, do nothing. */ 1383 } 1384 else if (empty_queue) { 1385 uv__write(stream); 1386 } 1387 else { 1388 /* 1389 * blocking streams should never have anything in the queue. 1390 * if this assert fires then somehow the blocking stream isn't being 1391 * sufficiently flushed in uv__write. 1392 */ 1393 assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES)); 1394 uv__io_start(stream->loop, &stream->io_watcher, POLLOUT); 1395 uv__stream_osx_interrupt_select(stream); 1396 } 1397 1398 return 0; 1399 } 1400 1401 1402 /* The buffers to be written must remain valid until the callback is called. 1403 * This is not required for the uv_buf_t array. 1404 */ 1405 int uv_write(uv_write_t* req, 1406 uv_stream_t* handle, 1407 const uv_buf_t bufs[], 1408 unsigned int nbufs, 1409 uv_write_cb cb) { 1410 return uv_write2(req, handle, bufs, nbufs, NULL, cb); 1411 } 1412 1413 1414 int uv_try_write(uv_stream_t* stream, 1415 const uv_buf_t bufs[], 1416 unsigned int nbufs) { 1417 return uv_try_write2(stream, bufs, nbufs, NULL); 1418 } 1419 1420 1421 int uv_try_write2(uv_stream_t* stream, 1422 const uv_buf_t bufs[], 1423 unsigned int nbufs, 1424 uv_stream_t* send_handle) { 1425 int err; 1426 1427 /* Connecting or already writing some data */ 1428 if (stream->connect_req != NULL || stream->write_queue_size != 0) 1429 return UV_EAGAIN; 1430 1431 err = uv__check_before_write(stream, nbufs, NULL); 1432 if (err < 0) 1433 return err; 1434 1435 return uv__try_write(stream, bufs, nbufs, send_handle); 1436 } 1437 1438 1439 int uv__read_start(uv_stream_t* stream, 1440 uv_alloc_cb alloc_cb, 1441 uv_read_cb read_cb) { 1442 assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || 1443 stream->type == UV_TTY); 1444 1445 /* The UV_HANDLE_READING flag is irrelevant of the state of the stream - it 1446 * just expresses the desired state of the user. */ 1447 stream->flags |= UV_HANDLE_READING; 1448 stream->flags &= ~UV_HANDLE_READ_EOF; 1449 1450 /* TODO: try to do the read inline? */ 1451 assert(uv__stream_fd(stream) >= 0); 1452 assert(alloc_cb); 1453 1454 stream->read_cb = read_cb; 1455 stream->alloc_cb = alloc_cb; 1456 1457 uv__io_start(stream->loop, &stream->io_watcher, POLLIN); 1458 uv__handle_start(stream); 1459 uv__stream_osx_interrupt_select(stream); 1460 1461 return 0; 1462 } 1463 1464 1465 int uv_read_stop(uv_stream_t* stream) { 1466 if (!(stream->flags & UV_HANDLE_READING)) 1467 return 0; 1468 1469 stream->flags &= ~UV_HANDLE_READING; 1470 uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); 1471 uv__handle_stop(stream); 1472 uv__stream_osx_interrupt_select(stream); 1473 1474 stream->read_cb = NULL; 1475 stream->alloc_cb = NULL; 1476 return 0; 1477 } 1478 1479 1480 int uv_is_readable(const uv_stream_t* stream) { 1481 return !!(stream->flags & UV_HANDLE_READABLE); 1482 } 1483 1484 1485 int uv_is_writable(const uv_stream_t* stream) { 1486 return !!(stream->flags & UV_HANDLE_WRITABLE); 1487 } 1488 1489 1490 #if defined(__APPLE__) 1491 int uv___stream_fd(const uv_stream_t* handle) { 1492 const uv__stream_select_t* s; 1493 1494 assert(handle->type == UV_TCP || 1495 handle->type == UV_TTY || 1496 handle->type == UV_NAMED_PIPE); 1497 1498 s = handle->select; 1499 if (s != NULL) 1500 return s->fd; 1501 1502 return handle->io_watcher.fd; 1503 } 1504 #endif /* defined(__APPLE__) */ 1505 1506 1507 void uv__stream_close(uv_stream_t* handle) { 1508 unsigned int i; 1509 uv__stream_queued_fds_t* queued_fds; 1510 1511 #if defined(__APPLE__) 1512 /* Terminate select loop first */ 1513 if (handle->select != NULL) { 1514 uv__stream_select_t* s; 1515 1516 s = handle->select; 1517 1518 uv_sem_post(&s->close_sem); 1519 uv_sem_post(&s->async_sem); 1520 uv__stream_osx_interrupt_select(handle); 1521 uv_thread_join(&s->thread); 1522 uv_sem_destroy(&s->close_sem); 1523 uv_sem_destroy(&s->async_sem); 1524 uv__close(s->fake_fd); 1525 uv__close(s->int_fd); 1526 uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close); 1527 1528 handle->select = NULL; 1529 } 1530 #endif /* defined(__APPLE__) */ 1531 1532 uv__io_close(handle->loop, &handle->io_watcher); 1533 uv_read_stop(handle); 1534 uv__handle_stop(handle); 1535 handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); 1536 1537 if (handle->io_watcher.fd != -1) { 1538 /* Don't close stdio file descriptors. Nothing good comes from it. */ 1539 if (handle->io_watcher.fd > STDERR_FILENO) 1540 uv__close(handle->io_watcher.fd); 1541 handle->io_watcher.fd = -1; 1542 } 1543 1544 if (handle->accepted_fd != -1) { 1545 uv__close(handle->accepted_fd); 1546 handle->accepted_fd = -1; 1547 } 1548 1549 /* Close all queued fds */ 1550 if (handle->queued_fds != NULL) { 1551 queued_fds = handle->queued_fds; 1552 for (i = 0; i < queued_fds->offset; i++) 1553 uv__close(queued_fds->fds[i]); 1554 uv__free(handle->queued_fds); 1555 handle->queued_fds = NULL; 1556 } 1557 1558 assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT)); 1559 } 1560 1561 1562 int uv_stream_set_blocking(uv_stream_t* handle, int blocking) { 1563 /* Don't need to check the file descriptor, uv__nonblock() 1564 * will fail with EBADF if it's not valid. 1565 */ 1566 return uv__nonblock(uv__stream_fd(handle), !blocking); 1567 } 1568