Home | History | Annotate | Line # | Download | only in unix
      1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
      2  *
      3  * Permission is hereby granted, free of charge, to any person obtaining a copy
      4  * of this software and associated documentation files (the "Software"), to
      5  * deal in the Software without restriction, including without limitation the
      6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
      7  * sell copies of the Software, and to permit persons to whom the Software is
      8  * furnished to do so, subject to the following conditions:
      9  *
     10  * The above copyright notice and this permission notice shall be included in
     11  * all copies or substantial portions of the Software.
     12  *
     13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
     18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
     19  * IN THE SOFTWARE.
     20  */
     21 
     22 #include "uv.h"
     23 #include "internal.h"
     24 
     25 #include <assert.h>
     26 #include <string.h>
     27 #include <errno.h>
     28 #include <stdlib.h>
     29 #include <unistd.h>
     30 #if defined(__MVS__)
     31 #include <xti.h>
     32 #endif
     33 #include <sys/un.h>
     34 
/* Compatibility aliases: when the platform headers only provide the
 * IPV6_JOIN_GROUP / IPV6_LEAVE_GROUP spellings, map the
 * IPV6_ADD_MEMBERSHIP / IPV6_DROP_MEMBERSHIP names used in this file
 * onto them.
 */
#if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP)
# define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
#endif

#if defined(IPV6_LEAVE_GROUP) && !defined(IPV6_DROP_MEMBERSHIP)
# define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
#endif
     42 
/* Forward declarations for file-local helpers that are referenced before
 * their definitions appear.
 */
static void uv__udp_run_completed(uv_udp_t* handle);
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
static void uv__udp_recvmsg(uv_udp_t* handle);
static void uv__udp_sendmsg(uv_udp_t* handle);
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
                                       int domain,
                                       unsigned int flags);
static int uv__udp_sendmsg1(int fd,
                            const uv_buf_t* bufs,
                            unsigned int nbufs,
                            const struct sockaddr* addr);
     54 
     55 
     56 void uv__udp_close(uv_udp_t* handle) {
     57   uv__io_close(handle->loop, &handle->io_watcher);
     58   uv__handle_stop(handle);
     59 
     60   if (handle->io_watcher.fd != -1) {
     61     uv__close(handle->io_watcher.fd);
     62     handle->io_watcher.fd = -1;
     63   }
     64 }
     65 
     66 
     67 void uv__udp_finish_close(uv_udp_t* handle) {
     68   uv_udp_send_t* req;
     69   struct uv__queue* q;
     70 
     71   assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
     72   assert(handle->io_watcher.fd == -1);
     73 
     74   while (!uv__queue_empty(&handle->write_queue)) {
     75     q = uv__queue_head(&handle->write_queue);
     76     uv__queue_remove(q);
     77 
     78     req = uv__queue_data(q, uv_udp_send_t, queue);
     79     req->status = UV_ECANCELED;
     80     uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
     81   }
     82 
     83   uv__udp_run_completed(handle);
     84 
     85   assert(handle->send_queue_size == 0);
     86   assert(handle->send_queue_count == 0);
     87 
     88   /* Now tear down the handle. */
     89   handle->recv_cb = NULL;
     90   handle->alloc_cb = NULL;
     91   /* but _do not_ touch close_cb */
     92 }
     93 
     94 
     95 static void uv__udp_run_completed(uv_udp_t* handle) {
     96   uv_udp_send_t* req;
     97   struct uv__queue* q;
     98 
     99   assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING));
    100   handle->flags |= UV_HANDLE_UDP_PROCESSING;
    101 
    102   while (!uv__queue_empty(&handle->write_completed_queue)) {
    103     q = uv__queue_head(&handle->write_completed_queue);
    104     uv__queue_remove(q);
    105 
    106     req = uv__queue_data(q, uv_udp_send_t, queue);
    107     uv__req_unregister(handle->loop);
    108 
    109     handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
    110     handle->send_queue_count--;
    111 
    112     if (req->bufs != req->bufsml)
    113       uv__free(req->bufs);
    114     req->bufs = NULL;
    115 
    116     if (req->send_cb == NULL)
    117       continue;
    118 
    119     /* req->status >= 0 == bytes written
    120      * req->status <  0 == errno
    121      */
    122     if (req->status >= 0)
    123       req->send_cb(req, 0);
    124     else
    125       req->send_cb(req, req->status);
    126   }
    127 
    128   if (uv__queue_empty(&handle->write_queue)) {
    129     /* Pending queue and completion queue empty, stop watcher. */
    130     uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT);
    131     if (!uv__io_active(&handle->io_watcher, POLLIN))
    132       uv__handle_stop(handle);
    133   }
    134 
    135   handle->flags &= ~UV_HANDLE_UDP_PROCESSING;
    136 }
    137 
    138 
    139 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
    140   uv_udp_t* handle;
    141 
    142   handle = container_of(w, uv_udp_t, io_watcher);
    143   assert(handle->type == UV_UDP);
    144 
    145   if (revents & POLLIN)
    146     uv__udp_recvmsg(handle);
    147 
    148   if (revents & POLLOUT && !uv__is_closing(handle)) {
    149     uv__udp_sendmsg(handle);
    150     uv__udp_run_completed(handle);
    151   }
    152 }
    153 
    154 static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) {
    155 #if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
    156   struct sockaddr_in6 peers[20];
    157   struct iovec iov[ARRAY_SIZE(peers)];
    158   struct mmsghdr msgs[ARRAY_SIZE(peers)];
    159   ssize_t nread;
    160   uv_buf_t chunk_buf;
    161   size_t chunks;
    162   int flags;
    163   size_t k;
    164 
    165   /* prepare structures for recvmmsg */
    166   chunks = buf->len / UV__UDP_DGRAM_MAXSIZE;
    167   if (chunks > ARRAY_SIZE(iov))
    168     chunks = ARRAY_SIZE(iov);
    169   for (k = 0; k < chunks; ++k) {
    170     iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE;
    171     iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE;
    172     memset(&msgs[k].msg_hdr, 0, sizeof(msgs[k].msg_hdr));
    173     msgs[k].msg_hdr.msg_iov = iov + k;
    174     msgs[k].msg_hdr.msg_iovlen = 1;
    175     msgs[k].msg_hdr.msg_name = peers + k;
    176     msgs[k].msg_hdr.msg_namelen = sizeof(peers[0]);
    177     msgs[k].msg_hdr.msg_control = NULL;
    178     msgs[k].msg_hdr.msg_controllen = 0;
    179     msgs[k].msg_hdr.msg_flags = 0;
    180     msgs[k].msg_len = 0;
    181   }
    182 
    183 #if defined(__APPLE__)
    184   do
    185     nread = recvmsg_x(handle->io_watcher.fd, msgs, chunks, MSG_DONTWAIT);
    186   while (nread == -1 && errno == EINTR);
    187 #else
    188   do
    189     nread = recvmmsg(handle->io_watcher.fd, msgs, chunks, 0, NULL);
    190   while (nread == -1 && errno == EINTR);
    191 #endif
    192 
    193   if (nread < 1) {
    194     if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
    195       handle->recv_cb(handle, 0, buf, NULL, 0);
    196     else
    197       handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0);
    198   } else {
    199     /* pass each chunk to the application */
    200     for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) {
    201       flags = UV_UDP_MMSG_CHUNK;
    202       if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC)
    203         flags |= UV_UDP_PARTIAL;
    204 
    205       chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len);
    206       handle->recv_cb(handle,
    207                       msgs[k].msg_len,
    208                       &chunk_buf,
    209                       msgs[k].msg_hdr.msg_name,
    210                       flags);
    211     }
    212 
    213     /* one last callback so the original buffer is freed */
    214     if (handle->recv_cb != NULL)
    215       handle->recv_cb(handle, 0, buf, NULL, UV_UDP_MMSG_FREE);
    216   }
    217   return nread;
    218 #else  /* __linux__ || ____FreeBSD__ || __APPLE__ */
    219   return UV_ENOSYS;
    220 #endif  /* __linux__ || ____FreeBSD__ || __APPLE__ */
    221 }
    222 
    223 static void uv__udp_recvmsg(uv_udp_t* handle) {
    224   struct sockaddr_storage peer;
    225   struct msghdr h;
    226   ssize_t nread;
    227   uv_buf_t buf;
    228   int flags;
    229   int count;
    230 
    231   assert(handle->recv_cb != NULL);
    232   assert(handle->alloc_cb != NULL);
    233 
    234   /* Prevent loop starvation when the data comes in as fast as (or faster than)
    235    * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
    236    */
    237   count = 32;
    238 
    239   do {
    240     buf = uv_buf_init(NULL, 0);
    241     handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
    242     if (buf.base == NULL || buf.len == 0) {
    243       handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
    244       return;
    245     }
    246     assert(buf.base != NULL);
    247 
    248     if (uv_udp_using_recvmmsg(handle)) {
    249       nread = uv__udp_recvmmsg(handle, &buf);
    250       if (nread > 0)
    251         count -= nread;
    252       continue;
    253     }
    254 
    255     memset(&h, 0, sizeof(h));
    256     memset(&peer, 0, sizeof(peer));
    257     h.msg_name = &peer;
    258     h.msg_namelen = sizeof(peer);
    259     h.msg_iov = (void*) &buf;
    260     h.msg_iovlen = 1;
    261 
    262     do {
    263       nread = recvmsg(handle->io_watcher.fd, &h, 0);
    264     }
    265     while (nread == -1 && errno == EINTR);
    266 
    267     if (nread == -1) {
    268       if (errno == EAGAIN || errno == EWOULDBLOCK)
    269         handle->recv_cb(handle, 0, &buf, NULL, 0);
    270       else
    271         handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0);
    272     }
    273     else {
    274       flags = 0;
    275       if (h.msg_flags & MSG_TRUNC)
    276         flags |= UV_UDP_PARTIAL;
    277 
    278       handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags);
    279     }
    280     count--;
    281   }
    282   /* recv_cb callback may decide to pause or close the handle */
    283   while (nread != -1
    284       && count > 0
    285       && handle->io_watcher.fd != -1
    286       && handle->recv_cb != NULL);
    287 }
    288 
    289 
    290 /* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
    291  * refinements for programs that use multicast. Therefore we preferentially
    292  * set SO_REUSEPORT over SO_REUSEADDR here, but we set SO_REUSEPORT only
    293  * when that socket option doesn't have the capability of load balancing.
    294  * Otherwise, we fall back to SO_REUSEADDR.
    295  *
    296  * Linux as of 3.9, DragonflyBSD 3.6, AIX 7.2.5 have the SO_REUSEPORT socket
    297  * option but with semantics that are different from the BSDs: it _shares_
    298  * the port rather than steals it from the current listener. While useful,
    299  * it's not something we can emulate on other platforms so we don't enable it.
    300  *
    301  * zOS does not support getsockname with SO_REUSEPORT option when using
    302  * AF_UNIX.
    303  *
    304  * Solaris 11.4: SO_REUSEPORT will not load balance when SO_REUSEADDR
    305  * is also set, but it's not valid for every socket type.
    306  */
/* Enable address reuse on `fd`, choosing between SO_REUSEPORT and
 * SO_REUSEADDR at compile time (and, on some platforms, at run time) as
 * explained in the comment above. Returns 0 on success or the translated
 * errno of the failing call.
 */
static int uv__sock_reuseaddr(int fd) {
  int on = 1;

#if defined(SO_REUSEPORT) && defined(__MVS__)
  struct sockaddr_in name;
  unsigned int namelen = sizeof(name);
  int optname;

  if (getsockname(fd, (struct sockaddr*) &name, &namelen) == -1)
    return UV__ERR(errno);

  /* AF_UNIX sockets get SO_REUSEADDR, everything else SO_REUSEPORT. */
  optname = name.sin_family == AF_UNIX ? SO_REUSEADDR : SO_REUSEPORT;
  if (setsockopt(fd, SOL_SOCKET, optname, &on, sizeof(on)))
    return UV__ERR(errno);
#elif defined(SO_REUSEPORT) && defined(UV__SOLARIS_11_4) && UV__SOLARIS_11_4
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)) != 0) {
    if (errno != ENOPROTOOPT)
      return UV__ERR(errno);

    /* Not all socket types accept SO_REUSEPORT; fall back to SO_REUSEADDR. */
    errno = 0;
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0)
      return UV__ERR(errno);
  }
#elif defined(SO_REUSEPORT) && \
  !defined(__linux__) && !defined(__GNU__) && \
  !defined(__illumos__) && !defined(__DragonFly__) && !defined(_AIX73)
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on)) != 0)
    return UV__ERR(errno);
#else
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) != 0)
    return UV__ERR(errno);
#endif

  return 0;
}
    344 
    345 /*
    346  * The Linux kernel suppresses some ICMP error messages by default for UDP
    347  * sockets. Setting IP_RECVERR/IPV6_RECVERR on the socket enables full ICMP
    348  * error reporting, hopefully resulting in faster failover to working name
    349  * servers.
    350  */
/* On Linux, turn on IP_RECVERR (AF_INET) or IPV6_RECVERR (AF_INET6) for
 * `fd`. Other address families, and all other platforms, are a no-op.
 * Returns 0 on success or the translated errno from setsockopt().
 */
static int uv__set_recverr(int fd, sa_family_t ss_family) {
#if defined(__linux__)
  int on = 1;
  int level;
  int optname;

  switch (ss_family) {
  case AF_INET:
    level = IPPROTO_IP;
    optname = IP_RECVERR;
    break;
  case AF_INET6:
    level = IPPROTO_IPV6;
    optname = IPV6_RECVERR;
    break;
  default:
    return 0;  /* Nothing to configure for other families. */
  }

  if (setsockopt(fd, level, optname, &on, sizeof(on)) != 0)
    return UV__ERR(errno);
#endif
  return 0;
}
    366 
    367 
    368 int uv__udp_bind(uv_udp_t* handle,
    369                  const struct sockaddr* addr,
    370                  unsigned int addrlen,
    371                  unsigned int flags) {
    372   int err;
    373   int yes;
    374   int fd;
    375 
    376   /* Check for bad flags. */
    377   if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR |
    378                 UV_UDP_REUSEPORT | UV_UDP_LINUX_RECVERR))
    379     return UV_EINVAL;
    380 
    381   /* Cannot set IPv6-only mode on non-IPv6 socket. */
    382   if ((flags & UV_UDP_IPV6ONLY) && addr->sa_family != AF_INET6)
    383     return UV_EINVAL;
    384 
    385   fd = handle->io_watcher.fd;
    386   if (fd == -1) {
    387     err = uv__socket(addr->sa_family, SOCK_DGRAM, 0);
    388     if (err < 0)
    389       return err;
    390     fd = err;
    391     handle->io_watcher.fd = fd;
    392   }
    393 
    394   if (flags & UV_UDP_LINUX_RECVERR) {
    395     err = uv__set_recverr(fd, addr->sa_family);
    396     if (err)
    397       return err;
    398   }
    399 
    400   if (flags & UV_UDP_REUSEADDR) {
    401     err = uv__sock_reuseaddr(fd);
    402     if (err)
    403       return err;
    404   }
    405 
    406   if (flags & UV_UDP_REUSEPORT) {
    407     err = uv__sock_reuseport(fd);
    408     if (err)
    409       return err;
    410   }
    411 
    412   if (flags & UV_UDP_IPV6ONLY) {
    413 #ifdef IPV6_V6ONLY
    414     yes = 1;
    415     if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
    416       err = UV__ERR(errno);
    417       return err;
    418     }
    419 #else
    420     err = UV_ENOTSUP;
    421     return err;
    422 #endif
    423   }
    424 
    425   if (bind(fd, addr, addrlen)) {
    426     err = UV__ERR(errno);
    427     if (errno == EAFNOSUPPORT)
    428       /* OSX, other BSDs and SunoS fail with EAFNOSUPPORT when binding a
    429        * socket created with AF_INET to an AF_INET6 address or vice versa. */
    430       err = UV_EINVAL;
    431     return err;
    432   }
    433 
    434   if (addr->sa_family == AF_INET6)
    435     handle->flags |= UV_HANDLE_IPV6;
    436 
    437   handle->flags |= UV_HANDLE_BOUND;
    438   return 0;
    439 }
    440 
    441 
    442 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
    443                                        int domain,
    444                                        unsigned int flags) {
    445   union uv__sockaddr taddr;
    446   socklen_t addrlen;
    447 
    448   if (handle->io_watcher.fd != -1)
    449     return 0;
    450 
    451   switch (domain) {
    452   case AF_INET:
    453   {
    454     struct sockaddr_in* addr = &taddr.in;
    455     memset(addr, 0, sizeof *addr);
    456     addr->sin_family = AF_INET;
    457     addr->sin_addr.s_addr = INADDR_ANY;
    458     addrlen = sizeof *addr;
    459     break;
    460   }
    461   case AF_INET6:
    462   {
    463     struct sockaddr_in6* addr = &taddr.in6;
    464     memset(addr, 0, sizeof *addr);
    465     addr->sin6_family = AF_INET6;
    466     addr->sin6_addr = in6addr_any;
    467     addrlen = sizeof *addr;
    468     break;
    469   }
    470   default:
    471     assert(0 && "unsupported address family");
    472     abort();
    473   }
    474 
    475   return uv__udp_bind(handle, &taddr.addr, addrlen, flags);
    476 }
    477 
    478 
    479 int uv__udp_connect(uv_udp_t* handle,
    480                     const struct sockaddr* addr,
    481                     unsigned int addrlen) {
    482   int err;
    483 
    484   err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    485   if (err)
    486     return err;
    487 
    488   do {
    489     errno = 0;
    490     err = connect(handle->io_watcher.fd, addr, addrlen);
    491   } while (err == -1 && errno == EINTR);
    492 
    493   if (err)
    494     return UV__ERR(errno);
    495 
    496   handle->flags |= UV_HANDLE_UDP_CONNECTED;
    497 
    498   return 0;
    499 }
    500 
/* See https://pubs.opengroup.org/onlinepubs/9699919799/functions/connect.html
 * Every UNIX kernel supported by libuv should follow that standard, but the
 * kernel implementations differ in detail. The pseudocode below illustrates
 * how each of them handles a UDP disconnect:
    505  *
    506  * Predefined stubs for pseudocode:
    507  *   1. sodisconnect: The function to perform the real udp disconnect
    508  *   2. pru_connect: The function to perform the real udp connect
    509  *   3. so: The kernel object match with socket fd
    510  *   4. addr: The sockaddr parameter from user space
    511  *
    512  * BSDs:
    513  *   if(sodisconnect(so) == 0) { // udp disconnect succeed
    514  *     if (addr->sa_len != so->addr->sa_len) return EINVAL;
    515  *     if (addr->sa_family != so->addr->sa_family) return EAFNOSUPPORT;
    516  *     pru_connect(so);
    517  *   }
    518  *   else return EISCONN;
    519  *
    520  * z/OS (same with Windows):
    521  *   if(addr->sa_len < so->addr->sa_len) return EINVAL;
    522  *   if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
    523  *
    524  * AIX:
    525  *   if(addr->sa_len != sizeof(struct sockaddr)) return EINVAL; // ignore ip proto version
    526  *   if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
    527  *
    528  * Linux,Others:
    529  *   if(addr->sa_len < sizeof(struct sockaddr)) return EINVAL;
    530  *   if (addr->sa_family == AF_UNSPEC) sodisconnect(so);
    531  */
    532 int uv__udp_disconnect(uv_udp_t* handle) {
    533     int r;
    534 #if defined(__MVS__)
    535     struct sockaddr_storage addr;
    536 #else
    537     struct sockaddr addr;
    538 #endif
    539 
    540     memset(&addr, 0, sizeof(addr));
    541 
    542 #if defined(__MVS__)
    543     addr.ss_family = AF_UNSPEC;
    544 #else
    545     addr.sa_family = AF_UNSPEC;
    546 #endif
    547 
    548     do {
    549       errno = 0;
    550 #ifdef __PASE__
    551       /* On IBMi a connectionless transport socket can be disconnected by
    552        * either setting the addr parameter to NULL or setting the
    553        * addr_length parameter to zero, and issuing another connect().
    554        * https://www.ibm.com/docs/en/i/7.4?topic=ssw_ibm_i_74/apis/connec.htm
    555        */
    556       r = connect(handle->io_watcher.fd, (struct sockaddr*) NULL, 0);
    557 #else
    558       r = connect(handle->io_watcher.fd, (struct sockaddr*) &addr, sizeof(addr));
    559 #endif
    560     } while (r == -1 && errno == EINTR);
    561 
    562     if (r == -1) {
    563 #if defined(BSD)  /* The macro BSD is from sys/param.h */
    564       if (errno != EAFNOSUPPORT && errno != EINVAL)
    565         return UV__ERR(errno);
    566 #else
    567       return UV__ERR(errno);
    568 #endif
    569     }
    570 
    571     handle->flags &= ~UV_HANDLE_UDP_CONNECTED;
    572     return 0;
    573 }
    574 
    575 int uv__udp_send(uv_udp_send_t* req,
    576                  uv_udp_t* handle,
    577                  const uv_buf_t bufs[],
    578                  unsigned int nbufs,
    579                  const struct sockaddr* addr,
    580                  unsigned int addrlen,
    581                  uv_udp_send_cb send_cb) {
    582   int err;
    583   int empty_queue;
    584 
    585   assert(nbufs > 0);
    586 
    587   if (addr) {
    588     err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    589     if (err)
    590       return err;
    591   }
    592 
    593   /* It's legal for send_queue_count > 0 even when the write_queue is empty;
    594    * it means there are error-state requests in the write_completed_queue that
    595    * will touch up send_queue_size/count later.
    596    */
    597   empty_queue = (handle->send_queue_count == 0);
    598 
    599   uv__req_init(handle->loop, req, UV_UDP_SEND);
    600   assert(addrlen <= sizeof(req->u.storage));
    601   if (addr == NULL)
    602     req->u.storage.ss_family = AF_UNSPEC;
    603   else
    604     memcpy(&req->u.storage, addr, addrlen);
    605   req->send_cb = send_cb;
    606   req->handle = handle;
    607   req->nbufs = nbufs;
    608 
    609   req->bufs = req->bufsml;
    610   if (nbufs > ARRAY_SIZE(req->bufsml))
    611     req->bufs = uv__malloc(nbufs * sizeof(bufs[0]));
    612 
    613   if (req->bufs == NULL) {
    614     uv__req_unregister(handle->loop);
    615     return UV_ENOMEM;
    616   }
    617 
    618   memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
    619   handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
    620   handle->send_queue_count++;
    621   uv__queue_insert_tail(&handle->write_queue, &req->queue);
    622   uv__handle_start(handle);
    623 
    624   if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) {
    625     uv__udp_sendmsg(handle);
    626 
    627     /* `uv__udp_sendmsg` may not be able to do non-blocking write straight
    628      * away. In such cases the `io_watcher` has to be queued for asynchronous
    629      * write.
    630      */
    631     if (!uv__queue_empty(&handle->write_queue))
    632       uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
    633   } else {
    634     uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
    635   }
    636 
    637   return 0;
    638 }
    639 
    640 
    641 int uv__udp_try_send(uv_udp_t* handle,
    642                      const uv_buf_t bufs[],
    643                      unsigned int nbufs,
    644                      const struct sockaddr* addr,
    645                      unsigned int addrlen) {
    646   int err;
    647 
    648   if (nbufs < 1)
    649     return UV_EINVAL;
    650 
    651   /* already sending a message */
    652   if (handle->send_queue_count != 0)
    653     return UV_EAGAIN;
    654 
    655   if (addr) {
    656     err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
    657     if (err)
    658       return err;
    659   } else {
    660     assert(handle->flags & UV_HANDLE_UDP_CONNECTED);
    661   }
    662 
    663   err = uv__udp_sendmsg1(handle->io_watcher.fd, bufs, nbufs, addr);
    664   if (err > 0)
    665     return uv__count_bufs(bufs, nbufs);
    666 
    667   return err;
    668 }
    669 
    670 
    671 static int uv__udp_set_membership4(uv_udp_t* handle,
    672                                    const struct sockaddr_in* multicast_addr,
    673                                    const char* interface_addr,
    674                                    uv_membership membership) {
    675   struct ip_mreq mreq;
    676   int optname;
    677   int err;
    678 
    679   memset(&mreq, 0, sizeof mreq);
    680 
    681   if (interface_addr) {
    682     err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
    683     if (err)
    684       return err;
    685   } else {
    686     mreq.imr_interface.s_addr = htonl(INADDR_ANY);
    687   }
    688 
    689   mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
    690 
    691   switch (membership) {
    692   case UV_JOIN_GROUP:
    693     optname = IP_ADD_MEMBERSHIP;
    694     break;
    695   case UV_LEAVE_GROUP:
    696     optname = IP_DROP_MEMBERSHIP;
    697     break;
    698   default:
    699     return UV_EINVAL;
    700   }
    701 
    702   if (setsockopt(handle->io_watcher.fd,
    703                  IPPROTO_IP,
    704                  optname,
    705                  &mreq,
    706                  sizeof(mreq))) {
    707 #if defined(__MVS__)
    708   if (errno == ENXIO)
    709     return UV_ENODEV;
    710 #endif
    711     return UV__ERR(errno);
    712   }
    713 
    714   return 0;
    715 }
    716 
    717 
    718 static int uv__udp_set_membership6(uv_udp_t* handle,
    719                                    const struct sockaddr_in6* multicast_addr,
    720                                    const char* interface_addr,
    721                                    uv_membership membership) {
    722   int optname;
    723   struct ipv6_mreq mreq;
    724   struct sockaddr_in6 addr6;
    725 
    726   memset(&mreq, 0, sizeof mreq);
    727 
    728   if (interface_addr) {
    729     if (uv_ip6_addr(interface_addr, 0, &addr6))
    730       return UV_EINVAL;
    731     mreq.ipv6mr_interface = addr6.sin6_scope_id;
    732   } else {
    733     mreq.ipv6mr_interface = 0;
    734   }
    735 
    736   mreq.ipv6mr_multiaddr = multicast_addr->sin6_addr;
    737 
    738   switch (membership) {
    739   case UV_JOIN_GROUP:
    740     optname = IPV6_ADD_MEMBERSHIP;
    741     break;
    742   case UV_LEAVE_GROUP:
    743     optname = IPV6_DROP_MEMBERSHIP;
    744     break;
    745   default:
    746     return UV_EINVAL;
    747   }
    748 
    749   if (setsockopt(handle->io_watcher.fd,
    750                  IPPROTO_IPV6,
    751                  optname,
    752                  &mreq,
    753                  sizeof(mreq))) {
    754 #if defined(__MVS__)
    755   if (errno == ENXIO)
    756     return UV_ENODEV;
    757 #endif
    758     return UV__ERR(errno);
    759   }
    760 
    761   return 0;
    762 }
    763 
    764 
    765 #if !defined(__OpenBSD__) &&                                        \
    766     !defined(__NetBSD__) &&                                         \
    767     !defined(__ANDROID__) &&                                        \
    768     !defined(__DragonFly__) &&                                      \
    769     !defined(__QNX__) &&                                            \
    770     !defined(__GNU__)
    771 static int uv__udp_set_source_membership4(uv_udp_t* handle,
    772                                           const struct sockaddr_in* multicast_addr,
    773                                           const char* interface_addr,
    774                                           const struct sockaddr_in* source_addr,
    775                                           uv_membership membership) {
    776   struct ip_mreq_source mreq;
    777   int optname;
    778   int err;
    779 
    780   err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
    781   if (err)
    782     return err;
    783 
    784   memset(&mreq, 0, sizeof(mreq));
    785 
    786   if (interface_addr != NULL) {
    787     err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr);
    788     if (err)
    789       return err;
    790   } else {
    791     mreq.imr_interface.s_addr = htonl(INADDR_ANY);
    792   }
    793 
    794   mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr;
    795   mreq.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr;
    796 
    797   if (membership == UV_JOIN_GROUP)
    798     optname = IP_ADD_SOURCE_MEMBERSHIP;
    799   else if (membership == UV_LEAVE_GROUP)
    800     optname = IP_DROP_SOURCE_MEMBERSHIP;
    801   else
    802     return UV_EINVAL;
    803 
    804   if (setsockopt(handle->io_watcher.fd,
    805                  IPPROTO_IP,
    806                  optname,
    807                  &mreq,
    808                  sizeof(mreq))) {
    809     return UV__ERR(errno);
    810   }
    811 
    812   return 0;
    813 }
    814 
    815 
    816 static int uv__udp_set_source_membership6(uv_udp_t* handle,
    817                                           const struct sockaddr_in6* multicast_addr,
    818                                           const char* interface_addr,
    819                                           const struct sockaddr_in6* source_addr,
    820                                           uv_membership membership) {
    821   struct group_source_req mreq;
    822   struct sockaddr_in6 addr6;
    823   int optname;
    824   int err;
    825 
    826   err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
    827   if (err)
    828     return err;
    829 
    830   memset(&mreq, 0, sizeof(mreq));
    831 
    832   if (interface_addr != NULL) {
    833     err = uv_ip6_addr(interface_addr, 0, &addr6);
    834     if (err)
    835       return err;
    836     mreq.gsr_interface = addr6.sin6_scope_id;
    837   } else {
    838     mreq.gsr_interface = 0;
    839   }
    840 
    841   STATIC_ASSERT(sizeof(mreq.gsr_group) >= sizeof(*multicast_addr));
    842   STATIC_ASSERT(sizeof(mreq.gsr_source) >= sizeof(*source_addr));
    843   memcpy(&mreq.gsr_group, multicast_addr, sizeof(*multicast_addr));
    844   memcpy(&mreq.gsr_source, source_addr, sizeof(*source_addr));
    845 
    846   if (membership == UV_JOIN_GROUP)
    847     optname = MCAST_JOIN_SOURCE_GROUP;
    848   else if (membership == UV_LEAVE_GROUP)
    849     optname = MCAST_LEAVE_SOURCE_GROUP;
    850   else
    851     return UV_EINVAL;
    852 
    853   if (setsockopt(handle->io_watcher.fd,
    854                  IPPROTO_IPV6,
    855                  optname,
    856                  &mreq,
    857                  sizeof(mreq))) {
    858     return UV__ERR(errno);
    859   }
    860 
    861   return 0;
    862 }
    863 #endif
    864 
    865 
    866 int uv__udp_init_ex(uv_loop_t* loop,
    867                     uv_udp_t* handle,
    868                     unsigned flags,
    869                     int domain) {
    870   int fd;
    871 
    872   fd = -1;
    873   if (domain != AF_UNSPEC) {
    874     fd = uv__socket(domain, SOCK_DGRAM, 0);
    875     if (fd < 0)
    876       return fd;
    877   }
    878 
    879   uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP);
    880   handle->alloc_cb = NULL;
    881   handle->recv_cb = NULL;
    882   handle->send_queue_size = 0;
    883   handle->send_queue_count = 0;
    884   uv__io_init(&handle->io_watcher, uv__udp_io, fd);
    885   uv__queue_init(&handle->write_queue);
    886   uv__queue_init(&handle->write_completed_queue);
    887 
    888   return 0;
    889 }
    890 
    891 
    892 int uv_udp_using_recvmmsg(const uv_udp_t* handle) {
    893 #if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
    894   if (handle->flags & UV_HANDLE_UDP_RECVMMSG)
    895     return 1;
    896 #endif
    897   return 0;
    898 }
    899 
    900 
    901 int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) {
    902   int err;
    903 
    904   /* Check for already active socket. */
    905   if (handle->io_watcher.fd != -1)
    906     return UV_EBUSY;
    907 
    908   if (uv__fd_exists(handle->loop, sock))
    909     return UV_EEXIST;
    910 
    911   err = uv__nonblock(sock, 1);
    912   if (err)
    913     return err;
    914 
    915   err = uv__sock_reuseaddr(sock);
    916   if (err)
    917     return err;
    918 
    919   handle->io_watcher.fd = sock;
    920   if (uv__udp_is_connected(handle))
    921     handle->flags |= UV_HANDLE_UDP_CONNECTED;
    922 
    923   return 0;
    924 }
    925 
    926 
    927 int uv_udp_set_membership(uv_udp_t* handle,
    928                           const char* multicast_addr,
    929                           const char* interface_addr,
    930                           uv_membership membership) {
    931   int err;
    932   struct sockaddr_in addr4;
    933   struct sockaddr_in6 addr6;
    934 
    935   if (uv_ip4_addr(multicast_addr, 0, &addr4) == 0) {
    936     err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR);
    937     if (err)
    938       return err;
    939     return uv__udp_set_membership4(handle, &addr4, interface_addr, membership);
    940   } else if (uv_ip6_addr(multicast_addr, 0, &addr6) == 0) {
    941     err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR);
    942     if (err)
    943       return err;
    944     return uv__udp_set_membership6(handle, &addr6, interface_addr, membership);
    945   } else {
    946     return UV_EINVAL;
    947   }
    948 }
    949 
    950 
    951 int uv_udp_set_source_membership(uv_udp_t* handle,
    952                                  const char* multicast_addr,
    953                                  const char* interface_addr,
    954                                  const char* source_addr,
    955                                  uv_membership membership) {
    956 #if !defined(__OpenBSD__) &&                                        \
    957     !defined(__NetBSD__) &&                                         \
    958     !defined(__ANDROID__) &&                                        \
    959     !defined(__DragonFly__) &&                                      \
    960     !defined(__QNX__) &&                                            \
    961     !defined(__GNU__)
    962   int err;
    963   union uv__sockaddr mcast_addr;
    964   union uv__sockaddr src_addr;
    965 
    966   err = uv_ip4_addr(multicast_addr, 0, &mcast_addr.in);
    967   if (err) {
    968     err = uv_ip6_addr(multicast_addr, 0, &mcast_addr.in6);
    969     if (err)
    970       return err;
    971     err = uv_ip6_addr(source_addr, 0, &src_addr.in6);
    972     if (err)
    973       return err;
    974     return uv__udp_set_source_membership6(handle,
    975                                           &mcast_addr.in6,
    976                                           interface_addr,
    977                                           &src_addr.in6,
    978                                           membership);
    979   }
    980 
    981   err = uv_ip4_addr(source_addr, 0, &src_addr.in);
    982   if (err)
    983     return err;
    984   return uv__udp_set_source_membership4(handle,
    985                                         &mcast_addr.in,
    986                                         interface_addr,
    987                                         &src_addr.in,
    988                                         membership);
    989 #else
    990   return UV_ENOSYS;
    991 #endif
    992 }
    993 
    994 
    995 static int uv__setsockopt(uv_udp_t* handle,
    996                          int option4,
    997                          int option6,
    998                          const void* val,
    999                          socklen_t size) {
   1000   int r;
   1001 
   1002   if (handle->flags & UV_HANDLE_IPV6)
   1003     r = setsockopt(handle->io_watcher.fd,
   1004                    IPPROTO_IPV6,
   1005                    option6,
   1006                    val,
   1007                    size);
   1008   else
   1009     r = setsockopt(handle->io_watcher.fd,
   1010                    IPPROTO_IP,
   1011                    option4,
   1012                    val,
   1013                    size);
   1014   if (r)
   1015     return UV__ERR(errno);
   1016 
   1017   return 0;
   1018 }
   1019 
   1020 static int uv__setsockopt_maybe_char(uv_udp_t* handle,
   1021                                      int option4,
   1022                                      int option6,
   1023                                      int val) {
   1024 #if defined(__sun) || defined(_AIX) || defined(__MVS__)
   1025   char arg = val;
   1026 #elif defined(__OpenBSD__)
   1027   unsigned char arg = val;
   1028 #else
   1029   int arg = val;
   1030 #endif
   1031 
   1032   if (val < 0 || val > 255)
   1033     return UV_EINVAL;
   1034 
   1035   return uv__setsockopt(handle, option4, option6, &arg, sizeof(arg));
   1036 }
   1037 
   1038 
   1039 int uv_udp_set_broadcast(uv_udp_t* handle, int on) {
   1040   if (setsockopt(handle->io_watcher.fd,
   1041                  SOL_SOCKET,
   1042                  SO_BROADCAST,
   1043                  &on,
   1044                  sizeof(on))) {
   1045     return UV__ERR(errno);
   1046   }
   1047 
   1048   return 0;
   1049 }
   1050 
   1051 
   1052 int uv_udp_set_ttl(uv_udp_t* handle, int ttl) {
   1053   if (ttl < 1 || ttl > 255)
   1054     return UV_EINVAL;
   1055 
   1056 #if defined(__MVS__)
   1057   if (!(handle->flags & UV_HANDLE_IPV6))
   1058     return UV_ENOTSUP;  /* zOS does not support setting ttl for IPv4 */
   1059 #endif
   1060 
   1061 /*
   1062  * On Solaris and derivatives such as SmartOS, the length of socket options
   1063  * is sizeof(int) for IP_TTL and IPV6_UNICAST_HOPS,
   1064  * so hardcode the size of these options on this platform,
   1065  * and use the general uv__setsockopt_maybe_char call on other platforms.
   1066  */
   1067 #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
   1068     defined(__MVS__) || defined(__QNX__)
   1069 
   1070   return uv__setsockopt(handle,
   1071                         IP_TTL,
   1072                         IPV6_UNICAST_HOPS,
   1073                         &ttl,
   1074                         sizeof(ttl));
   1075 
   1076 #else /* !(defined(__sun) || defined(_AIX) || defined (__OpenBSD__) ||
   1077            defined(__MVS__) || defined(__QNX__)) */
   1078 
   1079   return uv__setsockopt_maybe_char(handle,
   1080                                    IP_TTL,
   1081                                    IPV6_UNICAST_HOPS,
   1082                                    ttl);
   1083 
   1084 #endif /* defined(__sun) || defined(_AIX) || defined (__OpenBSD__) ||
   1085           defined(__MVS__) || defined(__QNX__) */
   1086 }
   1087 
   1088 
   1089 int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) {
   1090 /*
   1091  * On Solaris and derivatives such as SmartOS, the length of socket options
   1092  * is sizeof(int) for IPV6_MULTICAST_HOPS and sizeof(char) for
   1093  * IP_MULTICAST_TTL, so hardcode the size of the option in the IPv6 case,
   1094  * and use the general uv__setsockopt_maybe_char call otherwise.
   1095  */
   1096 #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
   1097     defined(__MVS__) || defined(__QNX__)
   1098   if (handle->flags & UV_HANDLE_IPV6)
   1099     return uv__setsockopt(handle,
   1100                           IP_MULTICAST_TTL,
   1101                           IPV6_MULTICAST_HOPS,
   1102                           &ttl,
   1103                           sizeof(ttl));
   1104 #endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
   1105     defined(__MVS__) || defined(__QNX__) */
   1106 
   1107   return uv__setsockopt_maybe_char(handle,
   1108                                    IP_MULTICAST_TTL,
   1109                                    IPV6_MULTICAST_HOPS,
   1110                                    ttl);
   1111 }
   1112 
   1113 
   1114 int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) {
   1115 /*
   1116  * On Solaris and derivatives such as SmartOS, the length of socket options
   1117  * is sizeof(int) for IPV6_MULTICAST_LOOP and sizeof(char) for
   1118  * IP_MULTICAST_LOOP, so hardcode the size of the option in the IPv6 case,
   1119  * and use the general uv__setsockopt_maybe_char call otherwise.
   1120  */
   1121 #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \
   1122     defined(__MVS__) || defined(__QNX__)
   1123   if (handle->flags & UV_HANDLE_IPV6)
   1124     return uv__setsockopt(handle,
   1125                           IP_MULTICAST_LOOP,
   1126                           IPV6_MULTICAST_LOOP,
   1127                           &on,
   1128                           sizeof(on));
   1129 #endif /* defined(__sun) || defined(_AIX) ||defined(__OpenBSD__) ||
   1130     defined(__MVS__) || defined(__QNX__) */
   1131 
   1132   return uv__setsockopt_maybe_char(handle,
   1133                                    IP_MULTICAST_LOOP,
   1134                                    IPV6_MULTICAST_LOOP,
   1135                                    on);
   1136 }
   1137 
   1138 int uv_udp_set_multicast_interface(uv_udp_t* handle, const char* interface_addr) {
   1139   struct sockaddr_storage addr_st;
   1140   struct sockaddr_in* addr4;
   1141   struct sockaddr_in6* addr6;
   1142 
   1143   addr4 = (struct sockaddr_in*) &addr_st;
   1144   addr6 = (struct sockaddr_in6*) &addr_st;
   1145 
   1146   if (!interface_addr) {
   1147     memset(&addr_st, 0, sizeof addr_st);
   1148     if (handle->flags & UV_HANDLE_IPV6) {
   1149       addr_st.ss_family = AF_INET6;
   1150       addr6->sin6_scope_id = 0;
   1151     } else {
   1152       addr_st.ss_family = AF_INET;
   1153       addr4->sin_addr.s_addr = htonl(INADDR_ANY);
   1154     }
   1155   } else if (uv_ip4_addr(interface_addr, 0, addr4) == 0) {
   1156     /* nothing, address was parsed */
   1157   } else if (uv_ip6_addr(interface_addr, 0, addr6) == 0) {
   1158     /* nothing, address was parsed */
   1159   } else {
   1160     return UV_EINVAL;
   1161   }
   1162 
   1163   if (addr_st.ss_family == AF_INET) {
   1164     if (setsockopt(handle->io_watcher.fd,
   1165                    IPPROTO_IP,
   1166                    IP_MULTICAST_IF,
   1167                    (void*) &addr4->sin_addr,
   1168                    sizeof(addr4->sin_addr)) == -1) {
   1169       return UV__ERR(errno);
   1170     }
   1171   } else if (addr_st.ss_family == AF_INET6) {
   1172     if (setsockopt(handle->io_watcher.fd,
   1173                    IPPROTO_IPV6,
   1174                    IPV6_MULTICAST_IF,
   1175                    &addr6->sin6_scope_id,
   1176                    sizeof(addr6->sin6_scope_id)) == -1) {
   1177       return UV__ERR(errno);
   1178     }
   1179   } else {
   1180     assert(0 && "unexpected address family");
   1181     abort();
   1182   }
   1183 
   1184   return 0;
   1185 }
   1186 
   1187 int uv_udp_getpeername(const uv_udp_t* handle,
   1188                        struct sockaddr* name,
   1189                        int* namelen) {
   1190 
   1191   return uv__getsockpeername((const uv_handle_t*) handle,
   1192                              getpeername,
   1193                              name,
   1194                              namelen);
   1195 }
   1196 
   1197 int uv_udp_getsockname(const uv_udp_t* handle,
   1198                        struct sockaddr* name,
   1199                        int* namelen) {
   1200 
   1201   return uv__getsockpeername((const uv_handle_t*) handle,
   1202                              getsockname,
   1203                              name,
   1204                              namelen);
   1205 }
   1206 
   1207 
   1208 int uv__udp_recv_start(uv_udp_t* handle,
   1209                        uv_alloc_cb alloc_cb,
   1210                        uv_udp_recv_cb recv_cb) {
   1211   int err;
   1212 
   1213   if (alloc_cb == NULL || recv_cb == NULL)
   1214     return UV_EINVAL;
   1215 
   1216   if (uv__io_active(&handle->io_watcher, POLLIN))
   1217     return UV_EALREADY;  /* FIXME(bnoordhuis) Should be UV_EBUSY. */
   1218 
   1219   err = uv__udp_maybe_deferred_bind(handle, AF_INET, 0);
   1220   if (err)
   1221     return err;
   1222 
   1223   handle->alloc_cb = alloc_cb;
   1224   handle->recv_cb = recv_cb;
   1225 
   1226   uv__io_start(handle->loop, &handle->io_watcher, POLLIN);
   1227   uv__handle_start(handle);
   1228 
   1229   return 0;
   1230 }
   1231 
   1232 
   1233 int uv__udp_recv_stop(uv_udp_t* handle) {
   1234   uv__io_stop(handle->loop, &handle->io_watcher, POLLIN);
   1235 
   1236   if (!uv__io_active(&handle->io_watcher, POLLOUT))
   1237     uv__handle_stop(handle);
   1238 
   1239   handle->alloc_cb = NULL;
   1240   handle->recv_cb = NULL;
   1241 
   1242   return 0;
   1243 }
   1244 
   1245 
   1246 static int uv__udp_prep_pkt(struct msghdr* h,
   1247                             const uv_buf_t* bufs,
   1248                             const unsigned int nbufs,
   1249                             const struct sockaddr* addr) {
   1250   memset(h, 0, sizeof(*h));
   1251   h->msg_name = (void*) addr;
   1252   h->msg_iov = (void*) bufs;
   1253   h->msg_iovlen = nbufs;
   1254   if (addr == NULL)
   1255     return 0;
   1256   switch (addr->sa_family) {
   1257   case AF_INET:
   1258     h->msg_namelen = sizeof(struct sockaddr_in);
   1259     return 0;
   1260   case AF_INET6:
   1261     h->msg_namelen = sizeof(struct sockaddr_in6);
   1262     return 0;
   1263   case AF_UNIX:
   1264     h->msg_namelen = sizeof(struct sockaddr_un);
   1265     return 0;
   1266   case AF_UNSPEC:
   1267     h->msg_name = NULL;
   1268     return 0;
   1269   }
   1270   return UV_EINVAL;
   1271 }
   1272 
   1273 
   1274 static int uv__udp_sendmsg1(int fd,
   1275                             const uv_buf_t* bufs,
   1276                             unsigned int nbufs,
   1277                             const struct sockaddr* addr) {
   1278   struct msghdr h;
   1279   int r;
   1280 
   1281   if ((r = uv__udp_prep_pkt(&h, bufs, nbufs, addr)))
   1282     return r;
   1283 
   1284   do
   1285     r = sendmsg(fd, &h, 0);
   1286   while (r == -1 && errno == EINTR);
   1287 
   1288   if (r < 0) {
   1289     r = UV__ERR(errno);
   1290     if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
   1291       r = UV_EAGAIN;
   1292     return r;
   1293   }
   1294 
   1295   /* UDP sockets don't EOF so we don't have to handle r=0 specially,
   1296    * that only happens when the input was a zero-sized buffer.
   1297    */
   1298   return 1;
   1299 }
   1300 
   1301 
   1302 static int uv__udp_sendmsgv(int fd,
   1303                             unsigned int count,
   1304                             uv_buf_t* bufs[/*count*/],
   1305                             unsigned int nbufs[/*count*/],
   1306                             struct sockaddr* addrs[/*count*/]) {
   1307   unsigned int i;
   1308   int nsent;
   1309   int r;
   1310 
   1311   r = 0;
   1312   nsent = 0;
   1313 
   1314 #if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) || \
   1315   (defined(__sun__) && defined(MSG_WAITFORONE))
   1316   if (count > 1) {
   1317     for (i = 0; i < count; /*empty*/) {
   1318       struct mmsghdr m[20];
   1319       unsigned int n;
   1320 
   1321       for (n = 0; i < count && n < ARRAY_SIZE(m); i++, n++)
   1322         if ((r = uv__udp_prep_pkt(&m[n].msg_hdr, bufs[i], nbufs[i], addrs[i])))
   1323           goto exit;
   1324 
   1325       do
   1326 #if defined(__APPLE__)
   1327         r = sendmsg_x(fd, m, n, MSG_DONTWAIT);
   1328 #else
   1329         r = sendmmsg(fd, m, n, 0);
   1330 #endif
   1331       while (r == -1 && errno == EINTR);
   1332 
   1333       if (r < 1)
   1334         goto exit;
   1335 
   1336       nsent += r;
   1337       i += r;
   1338     }
   1339 
   1340     goto exit;
   1341   }
   1342 #endif  /* defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) ||
   1343 	 * (defined(__sun__) && defined(MSG_WAITFORONE))
   1344 	 */
   1345 
   1346   for (i = 0; i < count; i++, nsent++)
   1347     if ((r = uv__udp_sendmsg1(fd, bufs[i], nbufs[i], addrs[i])))
   1348       goto exit;  /* goto to avoid unused label warning. */
   1349 
   1350 exit:
   1351 
   1352   if (nsent > 0)
   1353     return nsent;
   1354 
   1355   if (r < 0) {
   1356     r = UV__ERR(errno);
   1357     if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
   1358       r = UV_EAGAIN;
   1359   }
   1360 
   1361   return r;
   1362 }
   1363 
   1364 
   1365 static void uv__udp_sendmsg(uv_udp_t* handle) {
   1366   static const int N = 20;
   1367   struct sockaddr* addrs[N];
   1368   unsigned int nbufs[N];
   1369   uv_buf_t* bufs[N];
   1370   struct uv__queue* q;
   1371   uv_udp_send_t* req;
   1372   int n;
   1373 
   1374   if (uv__queue_empty(&handle->write_queue))
   1375     return;
   1376 
   1377 again:
   1378   n = 0;
   1379   q = uv__queue_head(&handle->write_queue);
   1380   do {
   1381     req = uv__queue_data(q, uv_udp_send_t, queue);
   1382     addrs[n] = &req->u.addr;
   1383     nbufs[n] = req->nbufs;
   1384     bufs[n] = req->bufs;
   1385     q = uv__queue_next(q);
   1386     n++;
   1387   } while (n < N && q != &handle->write_queue);
   1388 
   1389   n = uv__udp_sendmsgv(handle->io_watcher.fd, n, bufs, nbufs, addrs);
   1390   while (n > 0) {
   1391     q = uv__queue_head(&handle->write_queue);
   1392     req = uv__queue_data(q, uv_udp_send_t, queue);
   1393     req->status = uv__count_bufs(req->bufs, req->nbufs);
   1394     uv__queue_remove(&req->queue);
   1395     uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
   1396     n--;
   1397   }
   1398 
   1399   if (n == 0) {
   1400     if (uv__queue_empty(&handle->write_queue))
   1401       goto feed;
   1402     goto again;
   1403   }
   1404 
   1405   if (n == UV_EAGAIN)
   1406     return;
   1407 
   1408   /* Register the error against first request in queue because that
   1409    * is the request that uv__udp_sendmsgv tried but failed to send,
   1410    * because if it did send any requests, it won't return an error.
   1411    */
   1412   q = uv__queue_head(&handle->write_queue);
   1413   req = uv__queue_data(q, uv_udp_send_t, queue);
   1414   req->status = n;
   1415   uv__queue_remove(&req->queue);
   1416   uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
   1417 feed:
   1418   uv__io_feed(handle->loop, &handle->io_watcher);
   1419 }
   1420 
   1421 
   1422 int uv__udp_try_send2(uv_udp_t* handle,
   1423                       unsigned int count,
   1424                       uv_buf_t* bufs[/*count*/],
   1425                       unsigned int nbufs[/*count*/],
   1426                       struct sockaddr* addrs[/*count*/]) {
   1427   int fd;
   1428 
   1429   fd = handle->io_watcher.fd;
   1430   if (fd == -1)
   1431     return UV_EINVAL;
   1432 
   1433   return uv__udp_sendmsgv(fd, count, bufs, nbufs, addrs);
   1434 }
   1435