Home | History | Annotate | Line # | Download | only in unix
      1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
      2  *
      3  * Permission is hereby granted, free of charge, to any person obtaining a copy
      4  * of this software and associated documentation files (the "Software"), to
      5  * deal in the Software without restriction, including without limitation the
      6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
      7  * sell copies of the Software, and to permit persons to whom the Software is
      8  * furnished to do so, subject to the following conditions:
      9  *
     10  * The above copyright notice and this permission notice shall be included in
     11  * all copies or substantial portions of the Software.
     12  *
     13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
     18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
     19  * IN THE SOFTWARE.
     20  */
     21 
     22 #include "uv.h"
     23 #include "internal.h"
     24 
     25 #include <stdio.h>
     26 #include <stdlib.h>
     27 #include <string.h>
     28 #include <assert.h>
     29 #include <errno.h>
     30 
     31 #include <sys/types.h>
     32 #include <sys/socket.h>
     33 #include <sys/uio.h>
     34 #include <sys/un.h>
     35 #include <unistd.h>
     36 #include <limits.h> /* IOV_MAX */
     37 
     38 #if defined(__APPLE__)
     39 # include <sys/event.h>
     40 # include <sys/time.h>
     41 # include <sys/select.h>
     42 
     43 /* Forward declaration */
     44 typedef struct uv__stream_select_s uv__stream_select_t;
     45 
     46 struct uv__stream_select_s {
     47   uv_stream_t* stream;
     48   uv_thread_t thread;
     49   uv_sem_t close_sem;
     50   uv_sem_t async_sem;
     51   uv_async_t async;
     52   int events;
     53   int fake_fd;
     54   int int_fd;
     55   int fd;
     56   fd_set* sread;
     57   size_t sread_sz;
     58   fd_set* swrite;
     59   size_t swrite_sz;
     60 };
     61 #endif /* defined(__APPLE__) */
     62 
     63 union uv__cmsg {
     64   struct cmsghdr hdr;
     65   /* This cannot be larger because of the IBMi PASE limitation that
     66    * the total size of control messages cannot exceed 256 bytes.
     67    */
     68   char pad[256];
     69 };
     70 
     71 STATIC_ASSERT(256 == sizeof(union uv__cmsg));
     72 
     73 static void uv__stream_connect(uv_stream_t*);
     74 static void uv__write(uv_stream_t* stream);
     75 static void uv__read(uv_stream_t* stream);
     76 static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
     77 static void uv__write_callbacks(uv_stream_t* stream);
     78 static size_t uv__write_req_size(uv_write_t* req);
     79 static void uv__drain(uv_stream_t* stream);
     80 
     81 
     82 void uv__stream_init(uv_loop_t* loop,
     83                      uv_stream_t* stream,
     84                      uv_handle_type type) {
     85   int err;
     86 
     87   uv__handle_init(loop, (uv_handle_t*)stream, type);
     88   stream->read_cb = NULL;
     89   stream->alloc_cb = NULL;
     90   stream->close_cb = NULL;
     91   stream->connection_cb = NULL;
     92   stream->connect_req = NULL;
     93   stream->shutdown_req = NULL;
     94   stream->accepted_fd = -1;
     95   stream->queued_fds = NULL;
     96   stream->delayed_error = 0;
     97   uv__queue_init(&stream->write_queue);
     98   uv__queue_init(&stream->write_completed_queue);
     99   stream->write_queue_size = 0;
    100 
    101   if (loop->emfile_fd == -1) {
    102     err = uv__open_cloexec("/dev/null", O_RDONLY);
    103     if (err < 0)
    104         /* In the rare case that "/dev/null" isn't mounted open "/"
    105          * instead.
    106          */
    107         err = uv__open_cloexec("/", O_RDONLY);
    108     if (err >= 0)
    109       loop->emfile_fd = err;
    110   }
    111 
    112 #if defined(__APPLE__)
    113   stream->select = NULL;
    114 #endif /* defined(__APPLE_) */
    115 
    116   uv__io_init(&stream->io_watcher, uv__stream_io, -1);
    117 }
    118 
    119 
    120 static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
    121 #if defined(__APPLE__)
    122   /* Notify select() thread about state change */
    123   uv__stream_select_t* s;
    124   int r;
    125 
    126   s = stream->select;
    127   if (s == NULL)
    128     return;
    129 
    130   /* Interrupt select() loop
    131    * NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
    132    * emit read event on other side
    133    */
    134   do
    135     r = write(s->fake_fd, "x", 1);
    136   while (r == -1 && errno == EINTR);
    137 
    138   assert(r == 1);
    139 #else  /* !defined(__APPLE__) */
    140   /* No-op on any other platform */
    141 #endif  /* !defined(__APPLE__) */
    142 }
    143 
    144 
    145 #if defined(__APPLE__)
    146 static void uv__stream_osx_select(void* arg) {
    147   uv_stream_t* stream;
    148   uv__stream_select_t* s;
    149   char buf[1024];
    150   int events;
    151   int fd;
    152   int r;
    153   int max_fd;
    154 
    155   stream = arg;
    156   s = stream->select;
    157   fd = s->fd;
    158 
    159   if (fd > s->int_fd)
    160     max_fd = fd;
    161   else
    162     max_fd = s->int_fd;
    163 
    164   for (;;) {
    165     /* Terminate on semaphore */
    166     if (uv_sem_trywait(&s->close_sem) == 0)
    167       break;
    168 
    169     /* Watch fd using select(2) */
    170     memset(s->sread, 0, s->sread_sz);
    171     memset(s->swrite, 0, s->swrite_sz);
    172 
    173     if (uv__io_active(&stream->io_watcher, POLLIN))
    174       FD_SET(fd, s->sread);
    175     if (uv__io_active(&stream->io_watcher, POLLOUT))
    176       FD_SET(fd, s->swrite);
    177     FD_SET(s->int_fd, s->sread);
    178 
    179     /* Wait indefinitely for fd events */
    180     r = select(max_fd + 1, s->sread, s->swrite, NULL, NULL);
    181     if (r == -1) {
    182       if (errno == EINTR)
    183         continue;
    184 
    185       /* XXX: Possible?! */
    186       abort();
    187     }
    188 
    189     /* Ignore timeouts */
    190     if (r == 0)
    191       continue;
    192 
    193     /* Empty socketpair's buffer in case of interruption */
    194     if (FD_ISSET(s->int_fd, s->sread))
    195       for (;;) {
    196         r = read(s->int_fd, buf, sizeof(buf));
    197 
    198         if (r == sizeof(buf))
    199           continue;
    200 
    201         if (r != -1)
    202           break;
    203 
    204         if (errno == EAGAIN || errno == EWOULDBLOCK)
    205           break;
    206 
    207         if (errno == EINTR)
    208           continue;
    209 
    210         abort();
    211       }
    212 
    213     /* Handle events */
    214     events = 0;
    215     if (FD_ISSET(fd, s->sread))
    216       events |= POLLIN;
    217     if (FD_ISSET(fd, s->swrite))
    218       events |= POLLOUT;
    219 
    220     assert(events != 0 || FD_ISSET(s->int_fd, s->sread));
    221     if (events != 0) {
    222       ACCESS_ONCE(int, s->events) = events;
    223 
    224       uv_async_send(&s->async);
    225       uv_sem_wait(&s->async_sem);
    226 
    227       /* Should be processed at this stage */
    228       assert((s->events == 0) || (stream->flags & UV_HANDLE_CLOSING));
    229     }
    230   }
    231 }
    232 
    233 
    234 static void uv__stream_osx_select_cb(uv_async_t* handle) {
    235   uv__stream_select_t* s;
    236   uv_stream_t* stream;
    237   int events;
    238 
    239   s = container_of(handle, uv__stream_select_t, async);
    240   stream = s->stream;
    241 
    242   /* Get and reset stream's events */
    243   events = s->events;
    244   ACCESS_ONCE(int, s->events) = 0;
    245 
    246   assert(events != 0);
    247   assert(events == (events & (POLLIN | POLLOUT)));
    248 
    249   /* Invoke callback on event-loop */
    250   if ((events & POLLIN) && uv__io_active(&stream->io_watcher, POLLIN))
    251     uv__stream_io(stream->loop, &stream->io_watcher, POLLIN);
    252 
    253   if ((events & POLLOUT) && uv__io_active(&stream->io_watcher, POLLOUT))
    254     uv__stream_io(stream->loop, &stream->io_watcher, POLLOUT);
    255 
    256   if (stream->flags & UV_HANDLE_CLOSING)
    257     return;
    258 
    259   /* NOTE: It is important to do it here, otherwise `select()` might be called
    260    * before the actual `uv__read()`, leading to the blocking syscall
    261    */
    262   uv_sem_post(&s->async_sem);
    263 }
    264 
    265 
    266 static void uv__stream_osx_cb_close(uv_handle_t* async) {
    267   uv__stream_select_t* s;
    268 
    269   s = container_of(async, uv__stream_select_t, async);
    270   uv__free(s);
    271 }
    272 
    273 
    274 int uv__stream_try_select(uv_stream_t* stream, int* fd) {
    275   /*
    276    * kqueue doesn't work with some files from /dev mount on osx.
    277    * select(2) in separate thread for those fds
    278    */
    279 
    280   struct kevent filter[1];
    281   struct kevent events[1];
    282   struct timespec timeout;
    283   uv__stream_select_t* s;
    284   int fds[2];
    285   int err;
    286   int ret;
    287   int kq;
    288   int old_fd;
    289   int max_fd;
    290   size_t sread_sz;
    291   size_t swrite_sz;
    292 
    293   kq = kqueue();
    294   if (kq == -1) {
    295     perror("(libuv) kqueue()");
    296     return UV__ERR(errno);
    297   }
    298 
    299   EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
    300 
    301   /* Use small timeout, because we only want to capture EINVALs */
    302   timeout.tv_sec = 0;
    303   timeout.tv_nsec = 1;
    304 
    305   do
    306     ret = kevent(kq, filter, 1, events, 1, &timeout);
    307   while (ret == -1 && errno == EINTR);
    308 
    309   uv__close(kq);
    310 
    311   if (ret == -1)
    312     return UV__ERR(errno);
    313 
    314   if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
    315     return 0;
    316 
    317   /* At this point we definitely know that this fd won't work with kqueue */
    318 
    319   /*
    320    * Create fds for io watcher and to interrupt the select() loop.
    321    * NOTE: do it ahead of malloc below to allocate enough space for fd_sets
    322    */
    323   if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
    324     return UV__ERR(errno);
    325 
    326   max_fd = *fd;
    327   if (fds[1] > max_fd)
    328     max_fd = fds[1];
    329 
    330   sread_sz = ROUND_UP(max_fd + 1, sizeof(uint32_t) * NBBY) / NBBY;
    331   swrite_sz = sread_sz;
    332 
    333   s = uv__malloc(sizeof(*s) + sread_sz + swrite_sz);
    334   if (s == NULL) {
    335     err = UV_ENOMEM;
    336     goto failed_malloc;
    337   }
    338 
    339   s->events = 0;
    340   s->fd = *fd;
    341   s->sread = (fd_set*) ((char*) s + sizeof(*s));
    342   s->sread_sz = sread_sz;
    343   s->swrite = (fd_set*) ((char*) s->sread + sread_sz);
    344   s->swrite_sz = swrite_sz;
    345 
    346   err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
    347   if (err)
    348     goto failed_async_init;
    349 
    350   s->async.flags |= UV_HANDLE_INTERNAL;
    351   uv__handle_unref(&s->async);
    352 
    353   err = uv_sem_init(&s->close_sem, 0);
    354   if (err != 0)
    355     goto failed_close_sem_init;
    356 
    357   err = uv_sem_init(&s->async_sem, 0);
    358   if (err != 0)
    359     goto failed_async_sem_init;
    360 
    361   s->fake_fd = fds[0];
    362   s->int_fd = fds[1];
    363 
    364   old_fd = *fd;
    365   s->stream = stream;
    366   stream->select = s;
    367   *fd = s->fake_fd;
    368 
    369   err = uv_thread_create(&s->thread, uv__stream_osx_select, stream);
    370   if (err != 0)
    371     goto failed_thread_create;
    372 
    373   return 0;
    374 
    375 failed_thread_create:
    376   s->stream = NULL;
    377   stream->select = NULL;
    378   *fd = old_fd;
    379 
    380   uv_sem_destroy(&s->async_sem);
    381 
    382 failed_async_sem_init:
    383   uv_sem_destroy(&s->close_sem);
    384 
    385 failed_close_sem_init:
    386   uv__close(fds[0]);
    387   uv__close(fds[1]);
    388   uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
    389   return err;
    390 
    391 failed_async_init:
    392   uv__free(s);
    393 
    394 failed_malloc:
    395   uv__close(fds[0]);
    396   uv__close(fds[1]);
    397 
    398   return err;
    399 }
    400 #endif /* defined(__APPLE__) */
    401 
    402 
    403 int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
    404 #if defined(__APPLE__)
    405   int enable;
    406 #endif
    407 
    408   if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    409     return UV_EBUSY;
    410 
    411   assert(fd >= 0);
    412   stream->flags |= flags;
    413 
    414   if (stream->type == UV_TCP) {
    415     if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
    416       return UV__ERR(errno);
    417 
    418     /* TODO Use delay the user passed in. */
    419     if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
    420         uv__tcp_keepalive(fd, 1, 60)) {
    421       return UV__ERR(errno);
    422     }
    423   }
    424 
    425 #if defined(__APPLE__)
    426   enable = 1;
    427   if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
    428       errno != ENOTSOCK &&
    429       errno != EINVAL) {
    430     return UV__ERR(errno);
    431   }
    432 #endif
    433 
    434   stream->io_watcher.fd = fd;
    435 
    436   return 0;
    437 }
    438 
    439 
    440 void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
    441   uv_write_t* req;
    442   struct uv__queue* q;
    443   while (!uv__queue_empty(&stream->write_queue)) {
    444     q = uv__queue_head(&stream->write_queue);
    445     uv__queue_remove(q);
    446 
    447     req = uv__queue_data(q, uv_write_t, queue);
    448     req->error = error;
    449 
    450     uv__queue_insert_tail(&stream->write_completed_queue, &req->queue);
    451   }
    452 }
    453 
    454 
    455 void uv__stream_destroy(uv_stream_t* stream) {
    456   assert(!uv__io_active(&stream->io_watcher, POLLIN | POLLOUT));
    457   assert(stream->flags & UV_HANDLE_CLOSED);
    458 
    459   if (stream->connect_req) {
    460     uv__req_unregister(stream->loop);
    461     stream->connect_req->cb(stream->connect_req, UV_ECANCELED);
    462     stream->connect_req = NULL;
    463   }
    464 
    465   uv__stream_flush_write_queue(stream, UV_ECANCELED);
    466   uv__write_callbacks(stream);
    467   uv__drain(stream);
    468 
    469   assert(stream->write_queue_size == 0);
    470 }
    471 
    472 
    473 /* Implements a best effort approach to mitigating accept() EMFILE errors.
    474  * We have a spare file descriptor stashed away that we close to get below
    475  * the EMFILE limit. Next, we accept all pending connections and close them
    476  * immediately to signal the clients that we're overloaded - and we are, but
    477  * we still keep on trucking.
    478  *
    479  * There is one caveat: it's not reliable in a multi-threaded environment.
    480  * The file descriptor limit is per process. Our party trick fails if another
    481  * thread opens a file or creates a socket in the time window between us
    482  * calling close() and accept().
    483  */
    484 static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
    485   int err;
    486   int emfile_fd;
    487 
    488   if (loop->emfile_fd == -1)
    489     return UV_EMFILE;
    490 
    491   uv__close(loop->emfile_fd);
    492   loop->emfile_fd = -1;
    493 
    494   do {
    495     err = uv__accept(accept_fd);
    496     if (err >= 0)
    497       uv__close(err);
    498   } while (err >= 0 || err == UV_EINTR);
    499 
    500   emfile_fd = uv__open_cloexec("/", O_RDONLY);
    501   if (emfile_fd >= 0)
    502     loop->emfile_fd = emfile_fd;
    503 
    504   return err;
    505 }
    506 
    507 
    508 void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
    509   uv_stream_t* stream;
    510   int err;
    511   int fd;
    512 
    513   stream = container_of(w, uv_stream_t, io_watcher);
    514   assert(events & POLLIN);
    515   assert(stream->accepted_fd == -1);
    516   assert(!(stream->flags & UV_HANDLE_CLOSING));
    517 
    518   fd = uv__stream_fd(stream);
    519   err = uv__accept(fd);
    520 
    521   if (err == UV_EMFILE || err == UV_ENFILE)
    522     err = uv__emfile_trick(loop, fd);  /* Shed load. */
    523 
    524   if (err < 0)
    525     return;
    526 
    527   stream->accepted_fd = err;
    528   stream->connection_cb(stream, 0);
    529 
    530   if (stream->accepted_fd != -1)
    531     /* The user hasn't yet accepted called uv_accept() */
    532     uv__io_stop(loop, &stream->io_watcher, POLLIN);
    533 }
    534 
    535 
    536 int uv_accept(uv_stream_t* server, uv_stream_t* client) {
    537   int err;
    538 
    539   assert(server->loop == client->loop);
    540 
    541   if (server->accepted_fd == -1)
    542     return UV_EAGAIN;
    543 
    544   switch (client->type) {
    545     case UV_NAMED_PIPE:
    546     case UV_TCP:
    547       err = uv__stream_open(client,
    548                             server->accepted_fd,
    549                             UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
    550       if (err) {
    551         /* TODO handle error */
    552         uv__close(server->accepted_fd);
    553         goto done;
    554       }
    555       break;
    556 
    557     case UV_UDP:
    558       err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
    559       if (err) {
    560         uv__close(server->accepted_fd);
    561         goto done;
    562       }
    563       break;
    564 
    565     default:
    566       return UV_EINVAL;
    567   }
    568 
    569   client->flags |= UV_HANDLE_BOUND;
    570 
    571 done:
    572   /* Process queued fds */
    573   if (server->queued_fds != NULL) {
    574     uv__stream_queued_fds_t* queued_fds;
    575 
    576     queued_fds = server->queued_fds;
    577 
    578     /* Read first */
    579     server->accepted_fd = queued_fds->fds[0];
    580 
    581     /* All read, free */
    582     assert(queued_fds->offset > 0);
    583     if (--queued_fds->offset == 0) {
    584       uv__free(queued_fds);
    585       server->queued_fds = NULL;
    586     } else {
    587       /* Shift rest */
    588       memmove(queued_fds->fds,
    589               queued_fds->fds + 1,
    590               queued_fds->offset * sizeof(*queued_fds->fds));
    591     }
    592   } else {
    593     server->accepted_fd = -1;
    594     if (err == 0)
    595       uv__io_start(server->loop, &server->io_watcher, POLLIN);
    596   }
    597   return err;
    598 }
    599 
    600 
    601 int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
    602   int err;
    603   if (uv__is_closing(stream)) {
    604     return UV_EINVAL;
    605   }
    606   switch (stream->type) {
    607   case UV_TCP:
    608     err = uv__tcp_listen((uv_tcp_t*)stream, backlog, cb);
    609     break;
    610 
    611   case UV_NAMED_PIPE:
    612     err = uv__pipe_listen((uv_pipe_t*)stream, backlog, cb);
    613     break;
    614 
    615   default:
    616     err = UV_EINVAL;
    617   }
    618 
    619   if (err == 0)
    620     uv__handle_start(stream);
    621 
    622   return err;
    623 }
    624 
    625 
    626 static void uv__drain(uv_stream_t* stream) {
    627   uv_shutdown_t* req;
    628   int err;
    629 
    630   assert(uv__queue_empty(&stream->write_queue));
    631   if (!(stream->flags & UV_HANDLE_CLOSING)) {
    632     uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
    633     uv__stream_osx_interrupt_select(stream);
    634   }
    635 
    636   if (!uv__is_stream_shutting(stream))
    637     return;
    638 
    639   req = stream->shutdown_req;
    640   assert(req);
    641 
    642   if ((stream->flags & UV_HANDLE_CLOSING) ||
    643       !(stream->flags & UV_HANDLE_SHUT)) {
    644     stream->shutdown_req = NULL;
    645     uv__req_unregister(stream->loop);
    646 
    647     err = 0;
    648     if (stream->flags & UV_HANDLE_CLOSING)
    649       /* The user destroyed the stream before we got to do the shutdown. */
    650       err = UV_ECANCELED;
    651     else if (shutdown(uv__stream_fd(stream), SHUT_WR))
    652       err = UV__ERR(errno);
    653     else /* Success. */
    654       stream->flags |= UV_HANDLE_SHUT;
    655 
    656     if (req->cb != NULL)
    657       req->cb(req, err);
    658   }
    659 }
    660 
    661 
/* Write the `n` buffers of `vec` to `fd`. A single buffer goes through plain
 * write(2); anything else goes through writev(2). Returns the syscall's
 * result unchanged: bytes written, or -1 with errno set. */
static ssize_t uv__writev(int fd, struct iovec* vec, size_t n) {
  return (n == 1) ? write(fd, vec->iov_base, vec->iov_len)
                  : writev(fd, vec, n);
}
    668 
    669 
    670 static size_t uv__write_req_size(uv_write_t* req) {
    671   size_t size;
    672 
    673   assert(req->bufs != NULL);
    674   size = uv__count_bufs(req->bufs + req->write_index,
    675                         req->nbufs - req->write_index);
    676   assert(req->handle->write_queue_size >= size);
    677 
    678   return size;
    679 }
    680 
    681 
    682 /* Returns 1 if all write request data has been written, or 0 if there is still
    683  * more data to write.
    684  *
    685  * Note: the return value only says something about the *current* request.
    686  * There may still be other write requests sitting in the queue.
    687  */
    688 static int uv__write_req_update(uv_stream_t* stream,
    689                                 uv_write_t* req,
    690                                 size_t n) {
    691   uv_buf_t* buf;
    692   size_t len;
    693 
    694   assert(n <= stream->write_queue_size);
    695   stream->write_queue_size -= n;
    696 
    697   buf = req->bufs + req->write_index;
    698 
    699   do {
    700     len = n < buf->len ? n : buf->len;
    701     if (buf->len != 0)
    702       buf->base += len;
    703     buf->len -= len;
    704     buf += (buf->len == 0);  /* Advance to next buffer if this one is empty. */
    705     n -= len;
    706   } while (n > 0);
    707 
    708   req->write_index = buf - req->bufs;
    709 
    710   return req->write_index == req->nbufs;
    711 }
    712 
    713 
    714 static void uv__write_req_finish(uv_write_t* req) {
    715   uv_stream_t* stream = req->handle;
    716 
    717   /* Pop the req off tcp->write_queue. */
    718   uv__queue_remove(&req->queue);
    719 
    720   /* Only free when there was no error. On error, we touch up write_queue_size
    721    * right before making the callback. The reason we don't do that right away
    722    * is that a write_queue_size > 0 is our only way to signal to the user that
    723    * they should stop writing - which they should if we got an error. Something
    724    * to revisit in future revisions of the libuv API.
    725    */
    726   if (req->error == 0) {
    727     if (req->bufs != req->bufsml)
    728       uv__free(req->bufs);
    729     req->bufs = NULL;
    730   }
    731 
    732   /* Add it to the write_completed_queue where it will have its
    733    * callback called in the near future.
    734    */
    735   uv__queue_insert_tail(&stream->write_completed_queue, &req->queue);
    736   uv__io_feed(stream->loop, &stream->io_watcher);
    737 }
    738 
    739 
    740 static int uv__handle_fd(uv_handle_t* handle) {
    741   switch (handle->type) {
    742     case UV_NAMED_PIPE:
    743     case UV_TCP:
    744       return ((uv_stream_t*) handle)->io_watcher.fd;
    745 
    746     case UV_UDP:
    747       return ((uv_udp_t*) handle)->io_watcher.fd;
    748 
    749     default:
    750       return -1;
    751   }
    752 }
    753 
    754 static int uv__try_write(uv_stream_t* stream,
    755                          const uv_buf_t bufs[],
    756                          unsigned int nbufs,
    757                          uv_stream_t* send_handle) {
    758   struct iovec* iov;
    759   int iovmax;
    760   int iovcnt;
    761   ssize_t n;
    762 
    763   /*
    764    * Cast to iovec. We had to have our own uv_buf_t instead of iovec
    765    * because Windows's WSABUF is not an iovec.
    766    */
    767   iov = (struct iovec*) bufs;
    768   iovcnt = nbufs;
    769 
    770   iovmax = uv__getiovmax();
    771 
    772   /* Limit iov count to avoid EINVALs from writev() */
    773   if (iovcnt > iovmax)
    774     iovcnt = iovmax;
    775 
    776   /*
    777    * Now do the actual writev. Note that we've been updating the pointers
    778    * inside the iov each time we write. So there is no need to offset it.
    779    */
    780   if (send_handle != NULL) {
    781     int fd_to_send;
    782     struct msghdr msg;
    783     union uv__cmsg cmsg;
    784 
    785     if (uv__is_closing(send_handle))
    786       return UV_EBADF;
    787 
    788     fd_to_send = uv__handle_fd((uv_handle_t*) send_handle);
    789 
    790     memset(&cmsg, 0, sizeof(cmsg));
    791 
    792     assert(fd_to_send >= 0);
    793 
    794     msg.msg_name = NULL;
    795     msg.msg_namelen = 0;
    796     msg.msg_iov = iov;
    797     msg.msg_iovlen = iovcnt;
    798     msg.msg_flags = 0;
    799 
    800     msg.msg_control = &cmsg.hdr;
    801     msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));
    802 
    803     cmsg.hdr.cmsg_level = SOL_SOCKET;
    804     cmsg.hdr.cmsg_type = SCM_RIGHTS;
    805     cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(fd_to_send));
    806     memcpy(CMSG_DATA(&cmsg.hdr), &fd_to_send, sizeof(fd_to_send));
    807 
    808     do
    809       n = sendmsg(uv__stream_fd(stream), &msg, 0);
    810     while (n == -1 && errno == EINTR);
    811   } else {
    812     do
    813       n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
    814     while (n == -1 && errno == EINTR);
    815   }
    816 
    817   if (n >= 0)
    818     return n;
    819 
    820   if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
    821     return UV_EAGAIN;
    822 
    823 #ifdef __APPLE__
    824   /* macOS versions 10.10 and 10.15 - and presumbaly 10.11 to 10.14, too -
    825    * have a bug where a race condition causes the kernel to return EPROTOTYPE
    826    * because the socket isn't fully constructed. It's probably the result of
    827    * the peer closing the connection and that is why libuv translates it to
    828    * ECONNRESET. Previously, libuv retried until the EPROTOTYPE error went
    829    * away but some VPN software causes the same behavior except the error is
    830    * permanent, not transient, turning the retry mechanism into an infinite
    831    * loop. See https://github.com/libuv/libuv/pull/482.
    832    */
    833   if (errno == EPROTOTYPE)
    834     return UV_ECONNRESET;
    835 #endif  /* __APPLE__ */
    836 
    837   return UV__ERR(errno);
    838 }
    839 
    840 static void uv__write(uv_stream_t* stream) {
    841   struct uv__queue* q;
    842   uv_write_t* req;
    843   ssize_t n;
    844   int count;
    845 
    846   assert(uv__stream_fd(stream) >= 0);
    847 
    848   /* Prevent loop starvation when the consumer of this stream read as fast as
    849    * (or faster than) we can write it. This `count` mechanism does not need to
    850    * change even if we switch to edge-triggered I/O.
    851    */
    852   count = 32;
    853 
    854   for (;;) {
    855     if (uv__queue_empty(&stream->write_queue))
    856       return;
    857 
    858     q = uv__queue_head(&stream->write_queue);
    859     req = uv__queue_data(q, uv_write_t, queue);
    860     assert(req->handle == stream);
    861 
    862     n = uv__try_write(stream,
    863                       &(req->bufs[req->write_index]),
    864                       req->nbufs - req->write_index,
    865                       req->send_handle);
    866 
    867     /* Ensure the handle isn't sent again in case this is a partial write. */
    868     if (n >= 0) {
    869       req->send_handle = NULL;
    870       if (uv__write_req_update(stream, req, n)) {
    871         uv__write_req_finish(req);
    872         if (count-- > 0)
    873           continue; /* Start trying to write the next request. */
    874 
    875         return;
    876       }
    877     } else if (n != UV_EAGAIN)
    878       goto error;
    879 
    880     /* If this is a blocking stream, try again. */
    881     if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
    882       continue;
    883 
    884     /* We're not done. */
    885     uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
    886 
    887     /* Notify select() thread about state change */
    888     uv__stream_osx_interrupt_select(stream);
    889 
    890     return;
    891   }
    892 
    893 error:
    894   req->error = n;
    895   uv__write_req_finish(req);
    896   uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
    897   uv__stream_osx_interrupt_select(stream);
    898 }
    899 
    900 
    901 static void uv__write_callbacks(uv_stream_t* stream) {
    902   uv_write_t* req;
    903   struct uv__queue* q;
    904   struct uv__queue pq;
    905 
    906   if (uv__queue_empty(&stream->write_completed_queue))
    907     return;
    908 
    909   uv__queue_move(&stream->write_completed_queue, &pq);
    910 
    911   while (!uv__queue_empty(&pq)) {
    912     /* Pop a req off write_completed_queue. */
    913     q = uv__queue_head(&pq);
    914     req = uv__queue_data(q, uv_write_t, queue);
    915     uv__queue_remove(q);
    916     uv__req_unregister(stream->loop);
    917 
    918     if (req->bufs != NULL) {
    919       stream->write_queue_size -= uv__write_req_size(req);
    920       if (req->bufs != req->bufsml)
    921         uv__free(req->bufs);
    922       req->bufs = NULL;
    923     }
    924 
    925     /* NOTE: call callback AFTER freeing the request data. */
    926     if (req->cb)
    927       req->cb(req, req->error);
    928   }
    929 }
    930 
    931 
    932 static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
    933   stream->flags |= UV_HANDLE_READ_EOF;
    934   stream->flags &= ~UV_HANDLE_READING;
    935   uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
    936   uv__handle_stop(stream);
    937   uv__stream_osx_interrupt_select(stream);
    938   stream->read_cb(stream, UV_EOF, buf);
    939 }
    940 
    941 
    942 static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
    943   uv__stream_queued_fds_t* queued_fds;
    944   unsigned int queue_size;
    945 
    946   queued_fds = stream->queued_fds;
    947   if (queued_fds == NULL) {
    948     queue_size = 8;
    949     queued_fds = uv__malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
    950                             sizeof(*queued_fds));
    951     if (queued_fds == NULL)
    952       return UV_ENOMEM;
    953     queued_fds->size = queue_size;
    954     queued_fds->offset = 0;
    955     stream->queued_fds = queued_fds;
    956 
    957     /* Grow */
    958   } else if (queued_fds->size == queued_fds->offset) {
    959     queue_size = queued_fds->size + 8;
    960     queued_fds = uv__realloc(queued_fds,
    961                              (queue_size - 1) * sizeof(*queued_fds->fds) +
    962                               sizeof(*queued_fds));
    963 
    964     /*
    965      * Allocation failure, report back.
    966      * NOTE: if it is fatal - sockets will be closed in uv__stream_close
    967      */
    968     if (queued_fds == NULL)
    969       return UV_ENOMEM;
    970     queued_fds->size = queue_size;
    971     stream->queued_fds = queued_fds;
    972   }
    973 
    974   /* Put fd in a queue */
    975   queued_fds->fds[queued_fds->offset++] = fd;
    976 
    977   return 0;
    978 }
    979 
    980 
    981 static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
    982   struct cmsghdr* cmsg;
    983   char* p;
    984   char* pe;
    985   int fd;
    986   int err;
    987   size_t count;
    988 
    989   err = 0;
    990   for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
    991     if (cmsg->cmsg_type != SCM_RIGHTS) {
    992       fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
    993           cmsg->cmsg_type);
    994       continue;
    995     }
    996 
    997     assert(cmsg->cmsg_len >= CMSG_LEN(0));
    998     count = cmsg->cmsg_len - CMSG_LEN(0);
    999     assert(count % sizeof(fd) == 0);
   1000     count /= sizeof(fd);
   1001 
   1002     p = (void*) CMSG_DATA(cmsg);
   1003     pe = p + count * sizeof(fd);
   1004 
   1005     while (p < pe) {
   1006       memcpy(&fd, p, sizeof(fd));
   1007       p += sizeof(fd);
   1008 
   1009       if (err == 0) {
   1010         if (stream->accepted_fd == -1)
   1011           stream->accepted_fd = fd;
   1012         else
   1013           err = uv__stream_queue_fd(stream, fd);
   1014       }
   1015 
   1016       if (err != 0)
   1017         uv__close(fd);
   1018     }
   1019   }
   1020 
   1021   return err;
   1022 }
   1023 
   1024 
   1025 static void uv__read(uv_stream_t* stream) {
   1026   uv_buf_t buf;
   1027   ssize_t nread;
   1028   struct msghdr msg;
   1029   union uv__cmsg cmsg;
   1030   int count;
   1031   int err;
   1032   int is_ipc;
   1033 
   1034   stream->flags &= ~UV_HANDLE_READ_PARTIAL;
   1035 
   1036   /* Prevent loop starvation when the data comes in as fast as (or faster than)
   1037    * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
   1038    */
   1039   count = 32;
   1040 
   1041   is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;
   1042 
   1043   /* XXX: Maybe instead of having UV_HANDLE_READING we just test if
   1044    * tcp->read_cb is NULL or not?
   1045    */
   1046   while (stream->read_cb
   1047       && (stream->flags & UV_HANDLE_READING)
   1048       && (count-- > 0)) {
   1049     assert(stream->alloc_cb != NULL);
   1050 
   1051     buf = uv_buf_init(NULL, 0);
   1052     stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
   1053     if (buf.base == NULL || buf.len == 0) {
   1054       /* User indicates it can't or won't handle the read. */
   1055       stream->read_cb(stream, UV_ENOBUFS, &buf);
   1056       return;
   1057     }
   1058 
   1059     assert(buf.base != NULL);
   1060     assert(uv__stream_fd(stream) >= 0);
   1061 
   1062     if (!is_ipc) {
   1063       do {
   1064         nread = read(uv__stream_fd(stream), buf.base, buf.len);
   1065       }
   1066       while (nread < 0 && errno == EINTR);
   1067     } else {
   1068       /* ipc uses recvmsg */
   1069       msg.msg_flags = 0;
   1070       msg.msg_iov = (struct iovec*) &buf;
   1071       msg.msg_iovlen = 1;
   1072       msg.msg_name = NULL;
   1073       msg.msg_namelen = 0;
   1074       /* Set up to receive a descriptor even if one isn't in the message */
   1075       msg.msg_controllen = sizeof(cmsg);
   1076       msg.msg_control = &cmsg.hdr;
   1077 
   1078       do {
   1079         nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
   1080       }
   1081       while (nread < 0 && errno == EINTR);
   1082     }
   1083 
   1084     if (nread < 0) {
   1085       /* Error */
   1086       if (errno == EAGAIN || errno == EWOULDBLOCK) {
   1087         /* Wait for the next one. */
   1088         if (stream->flags & UV_HANDLE_READING) {
   1089           uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
   1090           uv__stream_osx_interrupt_select(stream);
   1091         }
   1092         stream->read_cb(stream, 0, &buf);
   1093 #if defined(__CYGWIN__) || defined(__MSYS__)
   1094       } else if (errno == ECONNRESET && stream->type == UV_NAMED_PIPE) {
   1095         uv__stream_eof(stream, &buf);
   1096         return;
   1097 #endif
   1098       } else {
   1099         /* Error. User should call uv_close(). */
   1100         stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
   1101         stream->read_cb(stream, UV__ERR(errno), &buf);
   1102         if (stream->flags & UV_HANDLE_READING) {
   1103           stream->flags &= ~UV_HANDLE_READING;
   1104           uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
   1105           uv__handle_stop(stream);
   1106           uv__stream_osx_interrupt_select(stream);
   1107         }
   1108       }
   1109       return;
   1110     } else if (nread == 0) {
   1111       uv__stream_eof(stream, &buf);
   1112       return;
   1113     } else {
   1114       /* Successful read */
   1115       ssize_t buflen = buf.len;
   1116 
   1117       if (is_ipc) {
   1118         err = uv__stream_recv_cmsg(stream, &msg);
   1119         if (err != 0) {
   1120           stream->read_cb(stream, err, &buf);
   1121           return;
   1122         }
   1123       }
   1124 
   1125 #if defined(__MVS__)
   1126       if (is_ipc && msg.msg_controllen > 0) {
   1127         uv_buf_t blankbuf;
   1128         int nread;
   1129         struct iovec *old;
   1130 
   1131         blankbuf.base = 0;
   1132         blankbuf.len = 0;
   1133         old = msg.msg_iov;
   1134         msg.msg_iov = (struct iovec*) &blankbuf;
   1135         nread = 0;
   1136         do {
   1137           nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
   1138           err = uv__stream_recv_cmsg(stream, &msg);
   1139           if (err != 0) {
   1140             stream->read_cb(stream, err, &buf);
   1141             msg.msg_iov = old;
   1142             return;
   1143           }
   1144         } while (nread == 0 && msg.msg_controllen > 0);
   1145         msg.msg_iov = old;
   1146       }
   1147 #endif
   1148       stream->read_cb(stream, nread, &buf);
   1149 
   1150       /* Return if we didn't fill the buffer, there is no more data to read. */
   1151       if (nread < buflen) {
   1152         stream->flags |= UV_HANDLE_READ_PARTIAL;
   1153         return;
   1154       }
   1155     }
   1156   }
   1157 }
   1158 
   1159 
   1160 int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
   1161   assert(stream->type == UV_TCP ||
   1162          stream->type == UV_TTY ||
   1163          stream->type == UV_NAMED_PIPE);
   1164 
   1165   if (!(stream->flags & UV_HANDLE_WRITABLE) ||
   1166       stream->flags & UV_HANDLE_SHUT ||
   1167       uv__is_stream_shutting(stream) ||
   1168       uv__is_closing(stream)) {
   1169     return UV_ENOTCONN;
   1170   }
   1171 
   1172   assert(uv__stream_fd(stream) >= 0);
   1173 
   1174   /* Initialize request. The `shutdown(2)` call will always be deferred until
   1175    * `uv__drain`, just before the callback is run. */
   1176   uv__req_init(stream->loop, req, UV_SHUTDOWN);
   1177   req->handle = stream;
   1178   req->cb = cb;
   1179   stream->shutdown_req = req;
   1180   stream->flags &= ~UV_HANDLE_WRITABLE;
   1181 
   1182   if (uv__queue_empty(&stream->write_queue))
   1183     uv__io_feed(stream->loop, &stream->io_watcher);
   1184 
   1185   return 0;
   1186 }
   1187 
   1188 
   1189 static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
   1190   uv_stream_t* stream;
   1191 
   1192   stream = container_of(w, uv_stream_t, io_watcher);
   1193 
   1194   assert(stream->type == UV_TCP ||
   1195          stream->type == UV_NAMED_PIPE ||
   1196          stream->type == UV_TTY);
   1197   assert(!(stream->flags & UV_HANDLE_CLOSING));
   1198 
   1199   if (stream->connect_req) {
   1200     uv__stream_connect(stream);
   1201     return;
   1202   }
   1203 
   1204   assert(uv__stream_fd(stream) >= 0);
   1205 
   1206   /* Ignore POLLHUP here. Even if it's set, there may still be data to read. */
   1207   if (events & (POLLIN | POLLERR | POLLHUP))
   1208     uv__read(stream);
   1209 
   1210   if (uv__stream_fd(stream) == -1)
   1211     return;  /* read_cb closed stream. */
   1212 
   1213   /* Short-circuit iff POLLHUP is set, the user is still interested in read
   1214    * events and uv__read() reported a partial read but not EOF. If the EOF
   1215    * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
   1216    * have to do anything. If the partial read flag is not set, we can't
   1217    * report the EOF yet because there is still data to read.
   1218    */
   1219   if ((events & POLLHUP) &&
   1220       (stream->flags & UV_HANDLE_READING) &&
   1221       (stream->flags & UV_HANDLE_READ_PARTIAL) &&
   1222       !(stream->flags & UV_HANDLE_READ_EOF)) {
   1223     uv_buf_t buf = { NULL, 0 };
   1224     uv__stream_eof(stream, &buf);
   1225   }
   1226 
   1227   if (uv__stream_fd(stream) == -1)
   1228     return;  /* read_cb closed stream. */
   1229 
   1230   if (events & (POLLOUT | POLLERR | POLLHUP)) {
   1231     uv__write(stream);
   1232     uv__write_callbacks(stream);
   1233 
   1234     /* Write queue drained. */
   1235     if (uv__queue_empty(&stream->write_queue))
   1236       uv__drain(stream);
   1237   }
   1238 }
   1239 
   1240 
   1241 /**
   1242  * We get called here from directly following a call to connect(2).
   1243  * In order to determine if we've errored out or succeeded must call
   1244  * getsockopt.
   1245  */
   1246 static void uv__stream_connect(uv_stream_t* stream) {
   1247   int error;
   1248   uv_connect_t* req = stream->connect_req;
   1249   socklen_t errorsize = sizeof(int);
   1250 
   1251   assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
   1252   assert(req);
   1253 
   1254   if (stream->delayed_error) {
   1255     /* To smooth over the differences between unixes errors that
   1256      * were reported synchronously on the first connect can be delayed
   1257      * until the next tick--which is now.
   1258      */
   1259     error = stream->delayed_error;
   1260     stream->delayed_error = 0;
   1261   } else {
   1262     /* Normal situation: we need to get the socket error from the kernel. */
   1263     assert(uv__stream_fd(stream) >= 0);
   1264     getsockopt(uv__stream_fd(stream),
   1265                SOL_SOCKET,
   1266                SO_ERROR,
   1267                &error,
   1268                &errorsize);
   1269     error = UV__ERR(error);
   1270   }
   1271 
   1272   if (error == UV__ERR(EINPROGRESS))
   1273     return;
   1274 
   1275   stream->connect_req = NULL;
   1276   uv__req_unregister(stream->loop);
   1277 
   1278   if (error < 0 || uv__queue_empty(&stream->write_queue)) {
   1279     uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
   1280   }
   1281 
   1282   if (req->cb)
   1283     req->cb(req, error);
   1284 
   1285   if (uv__stream_fd(stream) == -1)
   1286     return;
   1287 
   1288   if (error < 0) {
   1289     uv__stream_flush_write_queue(stream, UV_ECANCELED);
   1290     uv__write_callbacks(stream);
   1291   }
   1292 }
   1293 
   1294 
   1295 static int uv__check_before_write(uv_stream_t* stream,
   1296                                   unsigned int nbufs,
   1297                                   uv_stream_t* send_handle) {
   1298   assert(nbufs > 0);
   1299   assert((stream->type == UV_TCP ||
   1300           stream->type == UV_NAMED_PIPE ||
   1301           stream->type == UV_TTY) &&
   1302          "uv_write (unix) does not yet support other types of streams");
   1303 
   1304   if (uv__stream_fd(stream) < 0)
   1305     return UV_EBADF;
   1306 
   1307   if (!(stream->flags & UV_HANDLE_WRITABLE))
   1308     return UV_EPIPE;
   1309 
   1310   if (send_handle != NULL) {
   1311     if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
   1312       return UV_EINVAL;
   1313 
   1314     /* XXX We abuse uv_write2() to send over UDP handles to child processes.
   1315      * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
   1316      * evaluates to a function that operates on a uv_stream_t with a couple of
   1317      * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
   1318      * which works but only by accident.
   1319      */
   1320     if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
   1321       return UV_EBADF;
   1322 
   1323 #if defined(__CYGWIN__) || defined(__MSYS__)
   1324     /* Cygwin recvmsg always sets msg_controllen to zero, so we cannot send it.
   1325        See https://github.com/mirror/newlib-cygwin/blob/86fc4bf0/winsup/cygwin/fhandler_socket.cc#L1736-L1743 */
   1326     return UV_ENOSYS;
   1327 #endif
   1328   }
   1329 
   1330   return 0;
   1331 }
   1332 
   1333 int uv_write2(uv_write_t* req,
   1334               uv_stream_t* stream,
   1335               const uv_buf_t bufs[],
   1336               unsigned int nbufs,
   1337               uv_stream_t* send_handle,
   1338               uv_write_cb cb) {
   1339   int empty_queue;
   1340   int err;
   1341 
   1342   err = uv__check_before_write(stream, nbufs, send_handle);
   1343   if (err < 0)
   1344     return err;
   1345 
   1346   /* It's legal for write_queue_size > 0 even when the write_queue is empty;
   1347    * it means there are error-state requests in the write_completed_queue that
   1348    * will touch up write_queue_size later, see also uv__write_req_finish().
   1349    * We could check that write_queue is empty instead but that implies making
   1350    * a write() syscall when we know that the handle is in error mode.
   1351    */
   1352   empty_queue = (stream->write_queue_size == 0);
   1353 
   1354   /* Initialize the req */
   1355   uv__req_init(stream->loop, req, UV_WRITE);
   1356   req->cb = cb;
   1357   req->handle = stream;
   1358   req->error = 0;
   1359   req->send_handle = send_handle;
   1360   uv__queue_init(&req->queue);
   1361 
   1362   req->bufs = req->bufsml;
   1363   if (nbufs > ARRAY_SIZE(req->bufsml))
   1364     req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
   1365 
   1366   if (req->bufs == NULL)
   1367     return UV_ENOMEM;
   1368 
   1369   memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
   1370   req->nbufs = nbufs;
   1371   req->write_index = 0;
   1372   stream->write_queue_size += uv__count_bufs(bufs, nbufs);
   1373 
   1374   /* Append the request to write_queue. */
   1375   uv__queue_insert_tail(&stream->write_queue, &req->queue);
   1376 
   1377   /* If the queue was empty when this function began, we should attempt to
   1378    * do the write immediately. Otherwise start the write_watcher and wait
   1379    * for the fd to become writable.
   1380    */
   1381   if (stream->connect_req) {
   1382     /* Still connecting, do nothing. */
   1383   }
   1384   else if (empty_queue) {
   1385     uv__write(stream);
   1386   }
   1387   else {
   1388     /*
   1389      * blocking streams should never have anything in the queue.
   1390      * if this assert fires then somehow the blocking stream isn't being
   1391      * sufficiently flushed in uv__write.
   1392      */
   1393     assert(!(stream->flags & UV_HANDLE_BLOCKING_WRITES));
   1394     uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
   1395     uv__stream_osx_interrupt_select(stream);
   1396   }
   1397 
   1398   return 0;
   1399 }
   1400 
   1401 
   1402 /* The buffers to be written must remain valid until the callback is called.
   1403  * This is not required for the uv_buf_t array.
   1404  */
   1405 int uv_write(uv_write_t* req,
   1406              uv_stream_t* handle,
   1407              const uv_buf_t bufs[],
   1408              unsigned int nbufs,
   1409              uv_write_cb cb) {
   1410   return uv_write2(req, handle, bufs, nbufs, NULL, cb);
   1411 }
   1412 
   1413 
   1414 int uv_try_write(uv_stream_t* stream,
   1415                  const uv_buf_t bufs[],
   1416                  unsigned int nbufs) {
   1417   return uv_try_write2(stream, bufs, nbufs, NULL);
   1418 }
   1419 
   1420 
   1421 int uv_try_write2(uv_stream_t* stream,
   1422                   const uv_buf_t bufs[],
   1423                   unsigned int nbufs,
   1424                   uv_stream_t* send_handle) {
   1425   int err;
   1426 
   1427   /* Connecting or already writing some data */
   1428   if (stream->connect_req != NULL || stream->write_queue_size != 0)
   1429     return UV_EAGAIN;
   1430 
   1431   err = uv__check_before_write(stream, nbufs, NULL);
   1432   if (err < 0)
   1433     return err;
   1434 
   1435   return uv__try_write(stream, bufs, nbufs, send_handle);
   1436 }
   1437 
   1438 
   1439 int uv__read_start(uv_stream_t* stream,
   1440                    uv_alloc_cb alloc_cb,
   1441                    uv_read_cb read_cb) {
   1442   assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
   1443       stream->type == UV_TTY);
   1444 
   1445   /* The UV_HANDLE_READING flag is irrelevant of the state of the stream - it
   1446    * just expresses the desired state of the user. */
   1447   stream->flags |= UV_HANDLE_READING;
   1448   stream->flags &= ~UV_HANDLE_READ_EOF;
   1449 
   1450   /* TODO: try to do the read inline? */
   1451   assert(uv__stream_fd(stream) >= 0);
   1452   assert(alloc_cb);
   1453 
   1454   stream->read_cb = read_cb;
   1455   stream->alloc_cb = alloc_cb;
   1456 
   1457   uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
   1458   uv__handle_start(stream);
   1459   uv__stream_osx_interrupt_select(stream);
   1460 
   1461   return 0;
   1462 }
   1463 
   1464 
   1465 int uv_read_stop(uv_stream_t* stream) {
   1466   if (!(stream->flags & UV_HANDLE_READING))
   1467     return 0;
   1468 
   1469   stream->flags &= ~UV_HANDLE_READING;
   1470   uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
   1471   uv__handle_stop(stream);
   1472   uv__stream_osx_interrupt_select(stream);
   1473 
   1474   stream->read_cb = NULL;
   1475   stream->alloc_cb = NULL;
   1476   return 0;
   1477 }
   1478 
   1479 
   1480 int uv_is_readable(const uv_stream_t* stream) {
   1481   return !!(stream->flags & UV_HANDLE_READABLE);
   1482 }
   1483 
   1484 
   1485 int uv_is_writable(const uv_stream_t* stream) {
   1486   return !!(stream->flags & UV_HANDLE_WRITABLE);
   1487 }
   1488 
   1489 
#if defined(__APPLE__)
/* macOS: return the descriptor a stream actually reads and writes.
 *
 * When a select() helper is attached (handle->select), its `fd` is
 * returned. Otherwise the io watcher's own descriptor is returned.
 */
int uv___stream_fd(const uv_stream_t* handle) {
  const uv__stream_select_t* sel;

  assert(handle->type == UV_TCP ||
         handle->type == UV_TTY ||
         handle->type == UV_NAMED_PIPE);

  sel = handle->select;
  return (sel != NULL) ? sel->fd : handle->io_watcher.fd;
}
#endif /* defined(__APPLE__) */
   1505 
   1506 
   1507 void uv__stream_close(uv_stream_t* handle) {
   1508   unsigned int i;
   1509   uv__stream_queued_fds_t* queued_fds;
   1510 
   1511 #if defined(__APPLE__)
   1512   /* Terminate select loop first */
   1513   if (handle->select != NULL) {
   1514     uv__stream_select_t* s;
   1515 
   1516     s = handle->select;
   1517 
   1518     uv_sem_post(&s->close_sem);
   1519     uv_sem_post(&s->async_sem);
   1520     uv__stream_osx_interrupt_select(handle);
   1521     uv_thread_join(&s->thread);
   1522     uv_sem_destroy(&s->close_sem);
   1523     uv_sem_destroy(&s->async_sem);
   1524     uv__close(s->fake_fd);
   1525     uv__close(s->int_fd);
   1526     uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
   1527 
   1528     handle->select = NULL;
   1529   }
   1530 #endif /* defined(__APPLE__) */
   1531 
   1532   uv__io_close(handle->loop, &handle->io_watcher);
   1533   uv_read_stop(handle);
   1534   uv__handle_stop(handle);
   1535   handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
   1536 
   1537   if (handle->io_watcher.fd != -1) {
   1538     /* Don't close stdio file descriptors.  Nothing good comes from it. */
   1539     if (handle->io_watcher.fd > STDERR_FILENO)
   1540       uv__close(handle->io_watcher.fd);
   1541     handle->io_watcher.fd = -1;
   1542   }
   1543 
   1544   if (handle->accepted_fd != -1) {
   1545     uv__close(handle->accepted_fd);
   1546     handle->accepted_fd = -1;
   1547   }
   1548 
   1549   /* Close all queued fds */
   1550   if (handle->queued_fds != NULL) {
   1551     queued_fds = handle->queued_fds;
   1552     for (i = 0; i < queued_fds->offset; i++)
   1553       uv__close(queued_fds->fds[i]);
   1554     uv__free(handle->queued_fds);
   1555     handle->queued_fds = NULL;
   1556   }
   1557 
   1558   assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
   1559 }
   1560 
   1561 
   1562 int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
   1563   /* Don't need to check the file descriptor, uv__nonblock()
   1564    * will fail with EBADF if it's not valid.
   1565    */
   1566   return uv__nonblock(uv__stream_fd(handle), !blocking);
   1567 }
   1568