1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 * 3 * Permission is hereby granted, free of charge, to any person obtaining a copy 4 * of this software and associated documentation files (the "Software"), to 5 * deal in the Software without restriction, including without limitation the 6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 7 * sell copies of the Software, and to permit persons to whom the Software is 8 * furnished to do so, subject to the following conditions: 9 * 10 * The above copyright notice and this permission notice shall be included in 11 * all copies or substantial portions of the Software. 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 19 * IN THE SOFTWARE. 20 */ 21 22 #include "uv.h" 23 #include "internal.h" 24 25 #include <assert.h> 26 #include <string.h> 27 #include <errno.h> 28 #include <stdlib.h> 29 #include <unistd.h> 30 #if defined(__MVS__) 31 #include <xti.h> 32 #endif 33 #include <sys/un.h> 34 35 #if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP) 36 # define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP 37 #endif 38 39 #if defined(IPV6_LEAVE_GROUP) && !defined(IPV6_DROP_MEMBERSHIP) 40 # define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP 41 #endif 42 43 static void uv__udp_run_completed(uv_udp_t* handle); 44 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents); 45 static void uv__udp_recvmsg(uv_udp_t* handle); 46 static void uv__udp_sendmsg(uv_udp_t* handle); 47 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, 48 int domain, 49 unsigned int flags); 50 static int uv__udp_sendmsg1(int fd, 51 const uv_buf_t* bufs, 52 unsigned int nbufs, 53 const struct sockaddr* addr); 54 55 56 void uv__udp_close(uv_udp_t* handle) { 57 uv__io_close(handle->loop, &handle->io_watcher); 58 uv__handle_stop(handle); 59 60 if (handle->io_watcher.fd != -1) { 61 uv__close(handle->io_watcher.fd); 62 handle->io_watcher.fd = -1; 63 } 64 } 65 66 67 void uv__udp_finish_close(uv_udp_t* handle) { 68 uv_udp_send_t* req; 69 struct uv__queue* q; 70 71 assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT)); 72 assert(handle->io_watcher.fd == -1); 73 74 while (!uv__queue_empty(&handle->write_queue)) { 75 q = uv__queue_head(&handle->write_queue); 76 uv__queue_remove(q); 77 78 req = uv__queue_data(q, uv_udp_send_t, queue); 79 req->status = UV_ECANCELED; 80 uv__queue_insert_tail(&handle->write_completed_queue, &req->queue); 81 } 82 83 uv__udp_run_completed(handle); 84 85 assert(handle->send_queue_size == 0); 86 assert(handle->send_queue_count == 0); 87 88 /* Now tear down the handle. */ 89 handle->recv_cb = NULL; 90 handle->alloc_cb = NULL; 91 /* but _do not_ touch close_cb */ 92 } 93 94 95 static void uv__udp_run_completed(uv_udp_t* handle) { 96 uv_udp_send_t* req; 97 struct uv__queue* q; 98 99 assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING)); 100 handle->flags |= UV_HANDLE_UDP_PROCESSING; 101 102 while (!uv__queue_empty(&handle->write_completed_queue)) { 103 q = uv__queue_head(&handle->write_completed_queue); 104 uv__queue_remove(q); 105 106 req = uv__queue_data(q, uv_udp_send_t, queue); 107 uv__req_unregister(handle->loop); 108 109 handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs); 110 handle->send_queue_count--; 111 112 if (req->bufs != req->bufsml) 113 uv__free(req->bufs); 114 req->bufs = NULL; 115 116 if (req->send_cb == NULL) 117 continue; 118 119 /* req->status >= 0 == bytes written 120 * req->status < 0 == errno 121 */ 122 if (req->status >= 0) 123 req->send_cb(req, 0); 124 else 125 req->send_cb(req, req->status); 126 } 127 128 if (uv__queue_empty(&handle->write_queue)) { 129 /* Pending queue and completion queue empty, stop watcher. */ 130 uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT); 131 if (!uv__io_active(&handle->io_watcher, POLLIN)) 132 uv__handle_stop(handle); 133 } 134 135 handle->flags &= ~UV_HANDLE_UDP_PROCESSING; 136 } 137 138 139 static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) { 140 uv_udp_t* handle; 141 142 handle = container_of(w, uv_udp_t, io_watcher); 143 assert(handle->type == UV_UDP); 144 145 if (revents & POLLIN) 146 uv__udp_recvmsg(handle); 147 148 if (revents & POLLOUT && !uv__is_closing(handle)) { 149 uv__udp_sendmsg(handle); 150 uv__udp_run_completed(handle); 151 } 152 } 153 154 static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) { 155 #if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) 156 struct sockaddr_in6 peers[20]; 157 struct iovec iov[ARRAY_SIZE(peers)]; 158 struct mmsghdr msgs[ARRAY_SIZE(peers)]; 159 ssize_t nread; 160 uv_buf_t chunk_buf; 161 size_t chunks; 162 int flags; 163 size_t k; 164 165 /* prepare structures for recvmmsg */ 166 chunks = buf->len / UV__UDP_DGRAM_MAXSIZE; 167 if (chunks > ARRAY_SIZE(iov)) 168 chunks = ARRAY_SIZE(iov); 169 for (k = 0; k < chunks; ++k) { 170 iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE; 171 iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE; 172 memset(&msgs[k].msg_hdr, 0, sizeof(msgs[k].msg_hdr)); 173 msgs[k].msg_hdr.msg_iov = iov + k; 174 msgs[k].msg_hdr.msg_iovlen = 1; 175 msgs[k].msg_hdr.msg_name = peers + k; 176 msgs[k].msg_hdr.msg_namelen = sizeof(peers[0]); 177 msgs[k].msg_hdr.msg_control = NULL; 178 msgs[k].msg_hdr.msg_controllen = 0; 179 msgs[k].msg_hdr.msg_flags = 0; 180 msgs[k].msg_len = 0; 181 } 182 183 #if defined(__APPLE__) 184 do 185 nread = recvmsg_x(handle->io_watcher.fd, msgs, chunks, MSG_DONTWAIT); 186 while (nread == -1 && errno == EINTR); 187 #else 188 do 189 nread = recvmmsg(handle->io_watcher.fd, msgs, chunks, 0, NULL); 190 while (nread == -1 && errno == EINTR); 191 #endif 192 193 if (nread < 1) { 194 if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK) 195 handle->recv_cb(handle, 0, buf, NULL, 0); 196 else 197 handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0); 198 } else { 199 /* pass each chunk to the application */ 200 for (k = 0; k < (size_t) nread && handle->recv_cb != NULL; k++) { 201 flags = UV_UDP_MMSG_CHUNK; 202 if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC) 203 flags |= UV_UDP_PARTIAL; 204 205 chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len); 206 handle->recv_cb(handle, 207 msgs[k].msg_len, 208 &chunk_buf, 209 msgs[k].msg_hdr.msg_name, 210 flags); 211 } 212 213 /* one last callback so the original buffer is freed */ 214 if (handle->recv_cb != NULL) 215 handle->recv_cb(handle, 0, buf, NULL, UV_UDP_MMSG_FREE); 216 } 217 return nread; 218 #else /* __linux__ || ____FreeBSD__ || __APPLE__ */ 219 return UV_ENOSYS; 220 #endif /* __linux__ || ____FreeBSD__ || __APPLE__ */ 221 } 222 223 static void uv__udp_recvmsg(uv_udp_t* handle) { 224 struct sockaddr_storage peer; 225 struct msghdr h; 226 ssize_t nread; 227 uv_buf_t buf; 228 int flags; 229 int count; 230 231 assert(handle->recv_cb != NULL); 232 assert(handle->alloc_cb != NULL); 233 234 /* Prevent loop starvation when the data comes in as fast as (or faster than) 235 * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O. 236 */ 237 count = 32; 238 239 do { 240 buf = uv_buf_init(NULL, 0); 241 handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf); 242 if (buf.base == NULL || buf.len == 0) { 243 handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0); 244 return; 245 } 246 assert(buf.base != NULL); 247 248 if (uv_udp_using_recvmmsg(handle)) { 249 nread = uv__udp_recvmmsg(handle, &buf); 250 if (nread > 0) 251 count -= nread; 252 continue; 253 } 254 255 memset(&h, 0, sizeof(h)); 256 memset(&peer, 0, sizeof(peer)); 257 h.msg_name = &peer; 258 h.msg_namelen = sizeof(peer); 259 h.msg_iov = (void*) &buf; 260 h.msg_iovlen = 1; 261 262 do { 263 nread = recvmsg(handle->io_watcher.fd, &h, 0); 264 } 265 while (nread == -1 && errno == EINTR); 266 267 if (nread == -1) { 268 if (errno == EAGAIN || errno == EWOULDBLOCK) 269 handle->recv_cb(handle, 0, &buf, NULL, 0); 270 else 271 handle->recv_cb(handle, UV__ERR(errno), &buf, NULL, 0); 272 } 273 else { 274 flags = 0; 275 if (h.msg_flags & MSG_TRUNC) 276 flags |= UV_UDP_PARTIAL; 277 278 handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags); 279 } 280 count--; 281 } 282 /* recv_cb callback may decide to pause or close the handle */ 283 while (nread != -1 284 && count > 0 285 && handle->io_watcher.fd != -1 286 && handle->recv_cb != NULL); 287 } 288 289 290 /* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional 291 * refinements for programs that use multicast. Therefore we preferentially 292 * set SO_REUSEPORT over SO_REUSEADDR here, but we set SO_REUSEPORT only 293 * when that socket option doesn't have the capability of load balancing. 294 * Otherwise, we fall back to SO_REUSEADDR. 295 * 296 * Linux as of 3.9, DragonflyBSD 3.6, AIX 7.2.5 have the SO_REUSEPORT socket 297 * option but with semantics that are different from the BSDs: it _shares_ 298 * the port rather than steals it from the current listener. While useful, 299 * it's not something we can emulate on other platforms so we don't enable it. 300 * 301 * zOS does not support getsockname with SO_REUSEPORT option when using 302 * AF_UNIX. 303 * 304 * Solaris 11.4: SO_REUSEPORT will not load balance when SO_REUSEADDR 305 * is also set, but it's not valid for every socket type. 306 */ 307 static int uv__sock_reuseaddr(int fd) { 308 int yes; 309 yes = 1; 310 311 #if defined(SO_REUSEPORT) && defined(__MVS__) 312 struct sockaddr_in sockfd; 313 unsigned int sockfd_len = sizeof(sockfd); 314 if (getsockname(fd, (struct sockaddr*) &sockfd, &sockfd_len) == -1) 315 return UV__ERR(errno); 316 if (sockfd.sin_family == AF_UNIX) { 317 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes))) 318 return UV__ERR(errno); 319 } else { 320 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes))) 321 return UV__ERR(errno); 322 } 323 #elif defined(SO_REUSEPORT) && defined(UV__SOLARIS_11_4) && UV__SOLARIS_11_4 324 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes))) { 325 if (errno != ENOPROTOOPT) 326 return UV__ERR(errno); 327 /* Not all socket types accept SO_REUSEPORT. */ 328 errno = 0; 329 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes))) 330 return UV__ERR(errno); 331 } 332 #elif defined(SO_REUSEPORT) && \ 333 !defined(__linux__) && !defined(__GNU__) && \ 334 !defined(__illumos__) && !defined(__DragonFly__) && !defined(_AIX73) 335 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes))) 336 return UV__ERR(errno); 337 #else 338 if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes))) 339 return UV__ERR(errno); 340 #endif 341 342 return 0; 343 } 344 345 /* 346 * The Linux kernel suppresses some ICMP error messages by default for UDP 347 * sockets. Setting IP_RECVERR/IPV6_RECVERR on the socket enables full ICMP 348 * error reporting, hopefully resulting in faster failover to working name 349 * servers. 350 */ 351 static int uv__set_recverr(int fd, sa_family_t ss_family) { 352 #if defined(__linux__) 353 int yes; 354 355 yes = 1; 356 if (ss_family == AF_INET) { 357 if (setsockopt(fd, IPPROTO_IP, IP_RECVERR, &yes, sizeof(yes))) 358 return UV__ERR(errno); 359 } else if (ss_family == AF_INET6) { 360 if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVERR, &yes, sizeof(yes))) 361 return UV__ERR(errno); 362 } 363 #endif 364 return 0; 365 } 366 367 368 int uv__udp_bind(uv_udp_t* handle, 369 const struct sockaddr* addr, 370 unsigned int addrlen, 371 unsigned int flags) { 372 int err; 373 int yes; 374 int fd; 375 376 /* Check for bad flags. */ 377 if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR | 378 UV_UDP_REUSEPORT | UV_UDP_LINUX_RECVERR)) 379 return UV_EINVAL; 380 381 /* Cannot set IPv6-only mode on non-IPv6 socket. */ 382 if ((flags & UV_UDP_IPV6ONLY) && addr->sa_family != AF_INET6) 383 return UV_EINVAL; 384 385 fd = handle->io_watcher.fd; 386 if (fd == -1) { 387 err = uv__socket(addr->sa_family, SOCK_DGRAM, 0); 388 if (err < 0) 389 return err; 390 fd = err; 391 handle->io_watcher.fd = fd; 392 } 393 394 if (flags & UV_UDP_LINUX_RECVERR) { 395 err = uv__set_recverr(fd, addr->sa_family); 396 if (err) 397 return err; 398 } 399 400 if (flags & UV_UDP_REUSEADDR) { 401 err = uv__sock_reuseaddr(fd); 402 if (err) 403 return err; 404 } 405 406 if (flags & UV_UDP_REUSEPORT) { 407 err = uv__sock_reuseport(fd); 408 if (err) 409 return err; 410 } 411 412 if (flags & UV_UDP_IPV6ONLY) { 413 #ifdef IPV6_V6ONLY 414 yes = 1; 415 if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) { 416 err = UV__ERR(errno); 417 return err; 418 } 419 #else 420 err = UV_ENOTSUP; 421 return err; 422 #endif 423 } 424 425 if (bind(fd, addr, addrlen)) { 426 err = UV__ERR(errno); 427 if (errno == EAFNOSUPPORT) 428 /* OSX, other BSDs and SunoS fail with EAFNOSUPPORT when binding a 429 * socket created with AF_INET to an AF_INET6 address or vice versa. */ 430 err = UV_EINVAL; 431 return err; 432 } 433 434 if (addr->sa_family == AF_INET6) 435 handle->flags |= UV_HANDLE_IPV6; 436 437 handle->flags |= UV_HANDLE_BOUND; 438 return 0; 439 } 440 441 442 static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, 443 int domain, 444 unsigned int flags) { 445 union uv__sockaddr taddr; 446 socklen_t addrlen; 447 448 if (handle->io_watcher.fd != -1) 449 return 0; 450 451 switch (domain) { 452 case AF_INET: 453 { 454 struct sockaddr_in* addr = &taddr.in; 455 memset(addr, 0, sizeof *addr); 456 addr->sin_family = AF_INET; 457 addr->sin_addr.s_addr = INADDR_ANY; 458 addrlen = sizeof *addr; 459 break; 460 } 461 case AF_INET6: 462 { 463 struct sockaddr_in6* addr = &taddr.in6; 464 memset(addr, 0, sizeof *addr); 465 addr->sin6_family = AF_INET6; 466 addr->sin6_addr = in6addr_any; 467 addrlen = sizeof *addr; 468 break; 469 } 470 default: 471 assert(0 && "unsupported address family"); 472 abort(); 473 } 474 475 return uv__udp_bind(handle, &taddr.addr, addrlen, flags); 476 } 477 478 479 int uv__udp_connect(uv_udp_t* handle, 480 const struct sockaddr* addr, 481 unsigned int addrlen) { 482 int err; 483 484 err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0); 485 if (err) 486 return err; 487 488 do { 489 errno = 0; 490 err = connect(handle->io_watcher.fd, addr, addrlen); 491 } while (err == -1 && errno == EINTR); 492 493 if (err) 494 return UV__ERR(errno); 495 496 handle->flags |= UV_HANDLE_UDP_CONNECTED; 497 498 return 0; 499 } 500 501 /* From https://pubs.opengroup.org/onlinepubs/9699919799/functions/connect.html 502 * Any of uv supported UNIXs kernel should be standardized, but the kernel 503 * implementation logic not same, let's use pseudocode to explain the udp 504 * disconnect behaviors: 505 * 506 * Predefined stubs for pseudocode: 507 * 1. sodisconnect: The function to perform the real udp disconnect 508 * 2. pru_connect: The function to perform the real udp connect 509 * 3. so: The kernel object match with socket fd 510 * 4. addr: The sockaddr parameter from user space 511 * 512 * BSDs: 513 * if(sodisconnect(so) == 0) { // udp disconnect succeed 514 * if (addr->sa_len != so->addr->sa_len) return EINVAL; 515 * if (addr->sa_family != so->addr->sa_family) return EAFNOSUPPORT; 516 * pru_connect(so); 517 * } 518 * else return EISCONN; 519 * 520 * z/OS (same with Windows): 521 * if(addr->sa_len < so->addr->sa_len) return EINVAL; 522 * if (addr->sa_family == AF_UNSPEC) sodisconnect(so); 523 * 524 * AIX: 525 * if(addr->sa_len != sizeof(struct sockaddr)) return EINVAL; // ignore ip proto version 526 * if (addr->sa_family == AF_UNSPEC) sodisconnect(so); 527 * 528 * Linux,Others: 529 * if(addr->sa_len < sizeof(struct sockaddr)) return EINVAL; 530 * if (addr->sa_family == AF_UNSPEC) sodisconnect(so); 531 */ 532 int uv__udp_disconnect(uv_udp_t* handle) { 533 int r; 534 #if defined(__MVS__) 535 struct sockaddr_storage addr; 536 #else 537 struct sockaddr addr; 538 #endif 539 540 memset(&addr, 0, sizeof(addr)); 541 542 #if defined(__MVS__) 543 addr.ss_family = AF_UNSPEC; 544 #else 545 addr.sa_family = AF_UNSPEC; 546 #endif 547 548 do { 549 errno = 0; 550 #ifdef __PASE__ 551 /* On IBMi a connectionless transport socket can be disconnected by 552 * either setting the addr parameter to NULL or setting the 553 * addr_length parameter to zero, and issuing another connect(). 554 * https://www.ibm.com/docs/en/i/7.4?topic=ssw_ibm_i_74/apis/connec.htm 555 */ 556 r = connect(handle->io_watcher.fd, (struct sockaddr*) NULL, 0); 557 #else 558 r = connect(handle->io_watcher.fd, (struct sockaddr*) &addr, sizeof(addr)); 559 #endif 560 } while (r == -1 && errno == EINTR); 561 562 if (r == -1) { 563 #if defined(BSD) /* The macro BSD is from sys/param.h */ 564 if (errno != EAFNOSUPPORT && errno != EINVAL) 565 return UV__ERR(errno); 566 #else 567 return UV__ERR(errno); 568 #endif 569 } 570 571 handle->flags &= ~UV_HANDLE_UDP_CONNECTED; 572 return 0; 573 } 574 575 int uv__udp_send(uv_udp_send_t* req, 576 uv_udp_t* handle, 577 const uv_buf_t bufs[], 578 unsigned int nbufs, 579 const struct sockaddr* addr, 580 unsigned int addrlen, 581 uv_udp_send_cb send_cb) { 582 int err; 583 int empty_queue; 584 585 assert(nbufs > 0); 586 587 if (addr) { 588 err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0); 589 if (err) 590 return err; 591 } 592 593 /* It's legal for send_queue_count > 0 even when the write_queue is empty; 594 * it means there are error-state requests in the write_completed_queue that 595 * will touch up send_queue_size/count later. 596 */ 597 empty_queue = (handle->send_queue_count == 0); 598 599 uv__req_init(handle->loop, req, UV_UDP_SEND); 600 assert(addrlen <= sizeof(req->u.storage)); 601 if (addr == NULL) 602 req->u.storage.ss_family = AF_UNSPEC; 603 else 604 memcpy(&req->u.storage, addr, addrlen); 605 req->send_cb = send_cb; 606 req->handle = handle; 607 req->nbufs = nbufs; 608 609 req->bufs = req->bufsml; 610 if (nbufs > ARRAY_SIZE(req->bufsml)) 611 req->bufs = uv__malloc(nbufs * sizeof(bufs[0])); 612 613 if (req->bufs == NULL) { 614 uv__req_unregister(handle->loop); 615 return UV_ENOMEM; 616 } 617 618 memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0])); 619 handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs); 620 handle->send_queue_count++; 621 uv__queue_insert_tail(&handle->write_queue, &req->queue); 622 uv__handle_start(handle); 623 624 if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) { 625 uv__udp_sendmsg(handle); 626 627 /* `uv__udp_sendmsg` may not be able to do non-blocking write straight 628 * away. In such cases the `io_watcher` has to be queued for asynchronous 629 * write. 630 */ 631 if (!uv__queue_empty(&handle->write_queue)) 632 uv__io_start(handle->loop, &handle->io_watcher, POLLOUT); 633 } else { 634 uv__io_start(handle->loop, &handle->io_watcher, POLLOUT); 635 } 636 637 return 0; 638 } 639 640 641 int uv__udp_try_send(uv_udp_t* handle, 642 const uv_buf_t bufs[], 643 unsigned int nbufs, 644 const struct sockaddr* addr, 645 unsigned int addrlen) { 646 int err; 647 648 if (nbufs < 1) 649 return UV_EINVAL; 650 651 /* already sending a message */ 652 if (handle->send_queue_count != 0) 653 return UV_EAGAIN; 654 655 if (addr) { 656 err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0); 657 if (err) 658 return err; 659 } else { 660 assert(handle->flags & UV_HANDLE_UDP_CONNECTED); 661 } 662 663 err = uv__udp_sendmsg1(handle->io_watcher.fd, bufs, nbufs, addr); 664 if (err > 0) 665 return uv__count_bufs(bufs, nbufs); 666 667 return err; 668 } 669 670 671 static int uv__udp_set_membership4(uv_udp_t* handle, 672 const struct sockaddr_in* multicast_addr, 673 const char* interface_addr, 674 uv_membership membership) { 675 struct ip_mreq mreq; 676 int optname; 677 int err; 678 679 memset(&mreq, 0, sizeof mreq); 680 681 if (interface_addr) { 682 err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr); 683 if (err) 684 return err; 685 } else { 686 mreq.imr_interface.s_addr = htonl(INADDR_ANY); 687 } 688 689 mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr; 690 691 switch (membership) { 692 case UV_JOIN_GROUP: 693 optname = IP_ADD_MEMBERSHIP; 694 break; 695 case UV_LEAVE_GROUP: 696 optname = IP_DROP_MEMBERSHIP; 697 break; 698 default: 699 return UV_EINVAL; 700 } 701 702 if (setsockopt(handle->io_watcher.fd, 703 IPPROTO_IP, 704 optname, 705 &mreq, 706 sizeof(mreq))) { 707 #if defined(__MVS__) 708 if (errno == ENXIO) 709 return UV_ENODEV; 710 #endif 711 return UV__ERR(errno); 712 } 713 714 return 0; 715 } 716 717 718 static int uv__udp_set_membership6(uv_udp_t* handle, 719 const struct sockaddr_in6* multicast_addr, 720 const char* interface_addr, 721 uv_membership membership) { 722 int optname; 723 struct ipv6_mreq mreq; 724 struct sockaddr_in6 addr6; 725 726 memset(&mreq, 0, sizeof mreq); 727 728 if (interface_addr) { 729 if (uv_ip6_addr(interface_addr, 0, &addr6)) 730 return UV_EINVAL; 731 mreq.ipv6mr_interface = addr6.sin6_scope_id; 732 } else { 733 mreq.ipv6mr_interface = 0; 734 } 735 736 mreq.ipv6mr_multiaddr = multicast_addr->sin6_addr; 737 738 switch (membership) { 739 case UV_JOIN_GROUP: 740 optname = IPV6_ADD_MEMBERSHIP; 741 break; 742 case UV_LEAVE_GROUP: 743 optname = IPV6_DROP_MEMBERSHIP; 744 break; 745 default: 746 return UV_EINVAL; 747 } 748 749 if (setsockopt(handle->io_watcher.fd, 750 IPPROTO_IPV6, 751 optname, 752 &mreq, 753 sizeof(mreq))) { 754 #if defined(__MVS__) 755 if (errno == ENXIO) 756 return UV_ENODEV; 757 #endif 758 return UV__ERR(errno); 759 } 760 761 return 0; 762 } 763 764 765 #if !defined(__OpenBSD__) && \ 766 !defined(__NetBSD__) && \ 767 !defined(__ANDROID__) && \ 768 !defined(__DragonFly__) && \ 769 !defined(__QNX__) && \ 770 !defined(__GNU__) 771 static int uv__udp_set_source_membership4(uv_udp_t* handle, 772 const struct sockaddr_in* multicast_addr, 773 const char* interface_addr, 774 const struct sockaddr_in* source_addr, 775 uv_membership membership) { 776 struct ip_mreq_source mreq; 777 int optname; 778 int err; 779 780 err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR); 781 if (err) 782 return err; 783 784 memset(&mreq, 0, sizeof(mreq)); 785 786 if (interface_addr != NULL) { 787 err = uv_inet_pton(AF_INET, interface_addr, &mreq.imr_interface.s_addr); 788 if (err) 789 return err; 790 } else { 791 mreq.imr_interface.s_addr = htonl(INADDR_ANY); 792 } 793 794 mreq.imr_multiaddr.s_addr = multicast_addr->sin_addr.s_addr; 795 mreq.imr_sourceaddr.s_addr = source_addr->sin_addr.s_addr; 796 797 if (membership == UV_JOIN_GROUP) 798 optname = IP_ADD_SOURCE_MEMBERSHIP; 799 else if (membership == UV_LEAVE_GROUP) 800 optname = IP_DROP_SOURCE_MEMBERSHIP; 801 else 802 return UV_EINVAL; 803 804 if (setsockopt(handle->io_watcher.fd, 805 IPPROTO_IP, 806 optname, 807 &mreq, 808 sizeof(mreq))) { 809 return UV__ERR(errno); 810 } 811 812 return 0; 813 } 814 815 816 static int uv__udp_set_source_membership6(uv_udp_t* handle, 817 const struct sockaddr_in6* multicast_addr, 818 const char* interface_addr, 819 const struct sockaddr_in6* source_addr, 820 uv_membership membership) { 821 struct group_source_req mreq; 822 struct sockaddr_in6 addr6; 823 int optname; 824 int err; 825 826 err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR); 827 if (err) 828 return err; 829 830 memset(&mreq, 0, sizeof(mreq)); 831 832 if (interface_addr != NULL) { 833 err = uv_ip6_addr(interface_addr, 0, &addr6); 834 if (err) 835 return err; 836 mreq.gsr_interface = addr6.sin6_scope_id; 837 } else { 838 mreq.gsr_interface = 0; 839 } 840 841 STATIC_ASSERT(sizeof(mreq.gsr_group) >= sizeof(*multicast_addr)); 842 STATIC_ASSERT(sizeof(mreq.gsr_source) >= sizeof(*source_addr)); 843 memcpy(&mreq.gsr_group, multicast_addr, sizeof(*multicast_addr)); 844 memcpy(&mreq.gsr_source, source_addr, sizeof(*source_addr)); 845 846 if (membership == UV_JOIN_GROUP) 847 optname = MCAST_JOIN_SOURCE_GROUP; 848 else if (membership == UV_LEAVE_GROUP) 849 optname = MCAST_LEAVE_SOURCE_GROUP; 850 else 851 return UV_EINVAL; 852 853 if (setsockopt(handle->io_watcher.fd, 854 IPPROTO_IPV6, 855 optname, 856 &mreq, 857 sizeof(mreq))) { 858 return UV__ERR(errno); 859 } 860 861 return 0; 862 } 863 #endif 864 865 866 int uv__udp_init_ex(uv_loop_t* loop, 867 uv_udp_t* handle, 868 unsigned flags, 869 int domain) { 870 int fd; 871 872 fd = -1; 873 if (domain != AF_UNSPEC) { 874 fd = uv__socket(domain, SOCK_DGRAM, 0); 875 if (fd < 0) 876 return fd; 877 } 878 879 uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP); 880 handle->alloc_cb = NULL; 881 handle->recv_cb = NULL; 882 handle->send_queue_size = 0; 883 handle->send_queue_count = 0; 884 uv__io_init(&handle->io_watcher, uv__udp_io, fd); 885 uv__queue_init(&handle->write_queue); 886 uv__queue_init(&handle->write_completed_queue); 887 888 return 0; 889 } 890 891 892 int uv_udp_using_recvmmsg(const uv_udp_t* handle) { 893 #if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) 894 if (handle->flags & UV_HANDLE_UDP_RECVMMSG) 895 return 1; 896 #endif 897 return 0; 898 } 899 900 901 int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) { 902 int err; 903 904 /* Check for already active socket. */ 905 if (handle->io_watcher.fd != -1) 906 return UV_EBUSY; 907 908 if (uv__fd_exists(handle->loop, sock)) 909 return UV_EEXIST; 910 911 err = uv__nonblock(sock, 1); 912 if (err) 913 return err; 914 915 err = uv__sock_reuseaddr(sock); 916 if (err) 917 return err; 918 919 handle->io_watcher.fd = sock; 920 if (uv__udp_is_connected(handle)) 921 handle->flags |= UV_HANDLE_UDP_CONNECTED; 922 923 return 0; 924 } 925 926 927 int uv_udp_set_membership(uv_udp_t* handle, 928 const char* multicast_addr, 929 const char* interface_addr, 930 uv_membership membership) { 931 int err; 932 struct sockaddr_in addr4; 933 struct sockaddr_in6 addr6; 934 935 if (uv_ip4_addr(multicast_addr, 0, &addr4) == 0) { 936 err = uv__udp_maybe_deferred_bind(handle, AF_INET, UV_UDP_REUSEADDR); 937 if (err) 938 return err; 939 return uv__udp_set_membership4(handle, &addr4, interface_addr, membership); 940 } else if (uv_ip6_addr(multicast_addr, 0, &addr6) == 0) { 941 err = uv__udp_maybe_deferred_bind(handle, AF_INET6, UV_UDP_REUSEADDR); 942 if (err) 943 return err; 944 return uv__udp_set_membership6(handle, &addr6, interface_addr, membership); 945 } else { 946 return UV_EINVAL; 947 } 948 } 949 950 951 int uv_udp_set_source_membership(uv_udp_t* handle, 952 const char* multicast_addr, 953 const char* interface_addr, 954 const char* source_addr, 955 uv_membership membership) { 956 #if !defined(__OpenBSD__) && \ 957 !defined(__NetBSD__) && \ 958 !defined(__ANDROID__) && \ 959 !defined(__DragonFly__) && \ 960 !defined(__QNX__) && \ 961 !defined(__GNU__) 962 int err; 963 union uv__sockaddr mcast_addr; 964 union uv__sockaddr src_addr; 965 966 err = uv_ip4_addr(multicast_addr, 0, &mcast_addr.in); 967 if (err) { 968 err = uv_ip6_addr(multicast_addr, 0, &mcast_addr.in6); 969 if (err) 970 return err; 971 err = uv_ip6_addr(source_addr, 0, &src_addr.in6); 972 if (err) 973 return err; 974 return uv__udp_set_source_membership6(handle, 975 &mcast_addr.in6, 976 interface_addr, 977 &src_addr.in6, 978 membership); 979 } 980 981 err = uv_ip4_addr(source_addr, 0, &src_addr.in); 982 if (err) 983 return err; 984 return uv__udp_set_source_membership4(handle, 985 &mcast_addr.in, 986 interface_addr, 987 &src_addr.in, 988 membership); 989 #else 990 return UV_ENOSYS; 991 #endif 992 } 993 994 995 static int uv__setsockopt(uv_udp_t* handle, 996 int option4, 997 int option6, 998 const void* val, 999 socklen_t size) { 1000 int r; 1001 1002 if (handle->flags & UV_HANDLE_IPV6) 1003 r = setsockopt(handle->io_watcher.fd, 1004 IPPROTO_IPV6, 1005 option6, 1006 val, 1007 size); 1008 else 1009 r = setsockopt(handle->io_watcher.fd, 1010 IPPROTO_IP, 1011 option4, 1012 val, 1013 size); 1014 if (r) 1015 return UV__ERR(errno); 1016 1017 return 0; 1018 } 1019 1020 static int uv__setsockopt_maybe_char(uv_udp_t* handle, 1021 int option4, 1022 int option6, 1023 int val) { 1024 #if defined(__sun) || defined(_AIX) || defined(__MVS__) 1025 char arg = val; 1026 #elif defined(__OpenBSD__) 1027 unsigned char arg = val; 1028 #else 1029 int arg = val; 1030 #endif 1031 1032 if (val < 0 || val > 255) 1033 return UV_EINVAL; 1034 1035 return uv__setsockopt(handle, option4, option6, &arg, sizeof(arg)); 1036 } 1037 1038 1039 int uv_udp_set_broadcast(uv_udp_t* handle, int on) { 1040 if (setsockopt(handle->io_watcher.fd, 1041 SOL_SOCKET, 1042 SO_BROADCAST, 1043 &on, 1044 sizeof(on))) { 1045 return UV__ERR(errno); 1046 } 1047 1048 return 0; 1049 } 1050 1051 1052 int uv_udp_set_ttl(uv_udp_t* handle, int ttl) { 1053 if (ttl < 1 || ttl > 255) 1054 return UV_EINVAL; 1055 1056 #if defined(__MVS__) 1057 if (!(handle->flags & UV_HANDLE_IPV6)) 1058 return UV_ENOTSUP; /* zOS does not support setting ttl for IPv4 */ 1059 #endif 1060 1061 /* 1062 * On Solaris and derivatives such as SmartOS, the length of socket options 1063 * is sizeof(int) for IP_TTL and IPV6_UNICAST_HOPS, 1064 * so hardcode the size of these options on this platform, 1065 * and use the general uv__setsockopt_maybe_char call on other platforms. 1066 */ 1067 #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \ 1068 defined(__MVS__) || defined(__QNX__) 1069 1070 return uv__setsockopt(handle, 1071 IP_TTL, 1072 IPV6_UNICAST_HOPS, 1073 &ttl, 1074 sizeof(ttl)); 1075 1076 #else /* !(defined(__sun) || defined(_AIX) || defined (__OpenBSD__) || 1077 defined(__MVS__) || defined(__QNX__)) */ 1078 1079 return uv__setsockopt_maybe_char(handle, 1080 IP_TTL, 1081 IPV6_UNICAST_HOPS, 1082 ttl); 1083 1084 #endif /* defined(__sun) || defined(_AIX) || defined (__OpenBSD__) || 1085 defined(__MVS__) || defined(__QNX__) */ 1086 } 1087 1088 1089 int uv_udp_set_multicast_ttl(uv_udp_t* handle, int ttl) { 1090 /* 1091 * On Solaris and derivatives such as SmartOS, the length of socket options 1092 * is sizeof(int) for IPV6_MULTICAST_HOPS and sizeof(char) for 1093 * IP_MULTICAST_TTL, so hardcode the size of the option in the IPv6 case, 1094 * and use the general uv__setsockopt_maybe_char call otherwise. 1095 */ 1096 #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \ 1097 defined(__MVS__) || defined(__QNX__) 1098 if (handle->flags & UV_HANDLE_IPV6) 1099 return uv__setsockopt(handle, 1100 IP_MULTICAST_TTL, 1101 IPV6_MULTICAST_HOPS, 1102 &ttl, 1103 sizeof(ttl)); 1104 #endif /* defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \ 1105 defined(__MVS__) || defined(__QNX__) */ 1106 1107 return uv__setsockopt_maybe_char(handle, 1108 IP_MULTICAST_TTL, 1109 IPV6_MULTICAST_HOPS, 1110 ttl); 1111 } 1112 1113 1114 int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) { 1115 /* 1116 * On Solaris and derivatives such as SmartOS, the length of socket options 1117 * is sizeof(int) for IPV6_MULTICAST_LOOP and sizeof(char) for 1118 * IP_MULTICAST_LOOP, so hardcode the size of the option in the IPv6 case, 1119 * and use the general uv__setsockopt_maybe_char call otherwise. 1120 */ 1121 #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \ 1122 defined(__MVS__) || defined(__QNX__) 1123 if (handle->flags & UV_HANDLE_IPV6) 1124 return uv__setsockopt(handle, 1125 IP_MULTICAST_LOOP, 1126 IPV6_MULTICAST_LOOP, 1127 &on, 1128 sizeof(on)); 1129 #endif /* defined(__sun) || defined(_AIX) ||defined(__OpenBSD__) || 1130 defined(__MVS__) || defined(__QNX__) */ 1131 1132 return uv__setsockopt_maybe_char(handle, 1133 IP_MULTICAST_LOOP, 1134 IPV6_MULTICAST_LOOP, 1135 on); 1136 } 1137 1138 int uv_udp_set_multicast_interface(uv_udp_t* handle, const char* interface_addr) { 1139 struct sockaddr_storage addr_st; 1140 struct sockaddr_in* addr4; 1141 struct sockaddr_in6* addr6; 1142 1143 addr4 = (struct sockaddr_in*) &addr_st; 1144 addr6 = (struct sockaddr_in6*) &addr_st; 1145 1146 if (!interface_addr) { 1147 memset(&addr_st, 0, sizeof addr_st); 1148 if (handle->flags & UV_HANDLE_IPV6) { 1149 addr_st.ss_family = AF_INET6; 1150 addr6->sin6_scope_id = 0; 1151 } else { 1152 addr_st.ss_family = AF_INET; 1153 addr4->sin_addr.s_addr = htonl(INADDR_ANY); 1154 } 1155 } else if (uv_ip4_addr(interface_addr, 0, addr4) == 0) { 1156 /* nothing, address was parsed */ 1157 } else if (uv_ip6_addr(interface_addr, 0, addr6) == 0) { 1158 /* nothing, address was parsed */ 1159 } else { 1160 return UV_EINVAL; 1161 } 1162 1163 if (addr_st.ss_family == AF_INET) { 1164 if (setsockopt(handle->io_watcher.fd, 1165 IPPROTO_IP, 1166 IP_MULTICAST_IF, 1167 (void*) &addr4->sin_addr, 1168 sizeof(addr4->sin_addr)) == -1) { 1169 return UV__ERR(errno); 1170 } 1171 } else if (addr_st.ss_family == AF_INET6) { 1172 if (setsockopt(handle->io_watcher.fd, 1173 IPPROTO_IPV6, 1174 IPV6_MULTICAST_IF, 1175 &addr6->sin6_scope_id, 1176 sizeof(addr6->sin6_scope_id)) == -1) { 1177 return UV__ERR(errno); 1178 } 1179 } else { 1180 assert(0 && "unexpected address family"); 1181 abort(); 1182 } 1183 1184 return 0; 1185 } 1186 1187 int uv_udp_getpeername(const uv_udp_t* handle, 1188 struct sockaddr* name, 1189 int* namelen) { 1190 1191 return uv__getsockpeername((const uv_handle_t*) handle, 1192 getpeername, 1193 name, 1194 namelen); 1195 } 1196 1197 int uv_udp_getsockname(const uv_udp_t* handle, 1198 struct sockaddr* name, 1199 int* namelen) { 1200 1201 return uv__getsockpeername((const uv_handle_t*) handle, 1202 getsockname, 1203 name, 1204 namelen); 1205 } 1206 1207 1208 int uv__udp_recv_start(uv_udp_t* handle, 1209 uv_alloc_cb alloc_cb, 1210 uv_udp_recv_cb recv_cb) { 1211 int err; 1212 1213 if (alloc_cb == NULL || recv_cb == NULL) 1214 return UV_EINVAL; 1215 1216 if (uv__io_active(&handle->io_watcher, POLLIN)) 1217 return UV_EALREADY; /* FIXME(bnoordhuis) Should be UV_EBUSY. */ 1218 1219 err = uv__udp_maybe_deferred_bind(handle, AF_INET, 0); 1220 if (err) 1221 return err; 1222 1223 handle->alloc_cb = alloc_cb; 1224 handle->recv_cb = recv_cb; 1225 1226 uv__io_start(handle->loop, &handle->io_watcher, POLLIN); 1227 uv__handle_start(handle); 1228 1229 return 0; 1230 } 1231 1232 1233 int uv__udp_recv_stop(uv_udp_t* handle) { 1234 uv__io_stop(handle->loop, &handle->io_watcher, POLLIN); 1235 1236 if (!uv__io_active(&handle->io_watcher, POLLOUT)) 1237 uv__handle_stop(handle); 1238 1239 handle->alloc_cb = NULL; 1240 handle->recv_cb = NULL; 1241 1242 return 0; 1243 } 1244 1245 1246 static int uv__udp_prep_pkt(struct msghdr* h, 1247 const uv_buf_t* bufs, 1248 const unsigned int nbufs, 1249 const struct sockaddr* addr) { 1250 memset(h, 0, sizeof(*h)); 1251 h->msg_name = (void*) addr; 1252 h->msg_iov = (void*) bufs; 1253 h->msg_iovlen = nbufs; 1254 if (addr == NULL) 1255 return 0; 1256 switch (addr->sa_family) { 1257 case AF_INET: 1258 h->msg_namelen = sizeof(struct sockaddr_in); 1259 return 0; 1260 case AF_INET6: 1261 h->msg_namelen = sizeof(struct sockaddr_in6); 1262 return 0; 1263 case AF_UNIX: 1264 h->msg_namelen = sizeof(struct sockaddr_un); 1265 return 0; 1266 case AF_UNSPEC: 1267 h->msg_name = NULL; 1268 return 0; 1269 } 1270 return UV_EINVAL; 1271 } 1272 1273 1274 static int uv__udp_sendmsg1(int fd, 1275 const uv_buf_t* bufs, 1276 unsigned int nbufs, 1277 const struct sockaddr* addr) { 1278 struct msghdr h; 1279 int r; 1280 1281 if ((r = uv__udp_prep_pkt(&h, bufs, nbufs, addr))) 1282 return r; 1283 1284 do 1285 r = sendmsg(fd, &h, 0); 1286 while (r == -1 && errno == EINTR); 1287 1288 if (r < 0) { 1289 r = UV__ERR(errno); 1290 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) 1291 r = UV_EAGAIN; 1292 return r; 1293 } 1294 1295 /* UDP sockets don't EOF so we don't have to handle r=0 specially, 1296 * that only happens when the input was a zero-sized buffer. 1297 */ 1298 return 1; 1299 } 1300 1301 1302 static int uv__udp_sendmsgv(int fd, 1303 unsigned int count, 1304 uv_buf_t* bufs[/*count*/], 1305 unsigned int nbufs[/*count*/], 1306 struct sockaddr* addrs[/*count*/]) { 1307 unsigned int i; 1308 int nsent; 1309 int r; 1310 1311 r = 0; 1312 nsent = 0; 1313 1314 #if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) || \ 1315 (defined(__sun__) && defined(MSG_WAITFORONE)) 1316 if (count > 1) { 1317 for (i = 0; i < count; /*empty*/) { 1318 struct mmsghdr m[20]; 1319 unsigned int n; 1320 1321 for (n = 0; i < count && n < ARRAY_SIZE(m); i++, n++) 1322 if ((r = uv__udp_prep_pkt(&m[n].msg_hdr, bufs[i], nbufs[i], addrs[i]))) 1323 goto exit; 1324 1325 do 1326 #if defined(__APPLE__) 1327 r = sendmsg_x(fd, m, n, MSG_DONTWAIT); 1328 #else 1329 r = sendmmsg(fd, m, n, 0); 1330 #endif 1331 while (r == -1 && errno == EINTR); 1332 1333 if (r < 1) 1334 goto exit; 1335 1336 nsent += r; 1337 i += r; 1338 } 1339 1340 goto exit; 1341 } 1342 #endif /* defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) || 1343 * (defined(__sun__) && defined(MSG_WAITFORONE)) 1344 */ 1345 1346 for (i = 0; i < count; i++, nsent++) 1347 if ((r = uv__udp_sendmsg1(fd, bufs[i], nbufs[i], addrs[i]))) 1348 goto exit; /* goto to avoid unused label warning. */ 1349 1350 exit: 1351 1352 if (nsent > 0) 1353 return nsent; 1354 1355 if (r < 0) { 1356 r = UV__ERR(errno); 1357 if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) 1358 r = UV_EAGAIN; 1359 } 1360 1361 return r; 1362 } 1363 1364 1365 static void uv__udp_sendmsg(uv_udp_t* handle) { 1366 static const int N = 20; 1367 struct sockaddr* addrs[N]; 1368 unsigned int nbufs[N]; 1369 uv_buf_t* bufs[N]; 1370 struct uv__queue* q; 1371 uv_udp_send_t* req; 1372 int n; 1373 1374 if (uv__queue_empty(&handle->write_queue)) 1375 return; 1376 1377 again: 1378 n = 0; 1379 q = uv__queue_head(&handle->write_queue); 1380 do { 1381 req = uv__queue_data(q, uv_udp_send_t, queue); 1382 addrs[n] = &req->u.addr; 1383 nbufs[n] = req->nbufs; 1384 bufs[n] = req->bufs; 1385 q = uv__queue_next(q); 1386 n++; 1387 } while (n < N && q != &handle->write_queue); 1388 1389 n = uv__udp_sendmsgv(handle->io_watcher.fd, n, bufs, nbufs, addrs); 1390 while (n > 0) { 1391 q = uv__queue_head(&handle->write_queue); 1392 req = uv__queue_data(q, uv_udp_send_t, queue); 1393 req->status = uv__count_bufs(req->bufs, req->nbufs); 1394 uv__queue_remove(&req->queue); 1395 uv__queue_insert_tail(&handle->write_completed_queue, &req->queue); 1396 n--; 1397 } 1398 1399 if (n == 0) { 1400 if (uv__queue_empty(&handle->write_queue)) 1401 goto feed; 1402 goto again; 1403 } 1404 1405 if (n == UV_EAGAIN) 1406 return; 1407 1408 /* Register the error against first request in queue because that 1409 * is the request that uv__udp_sendmsgv tried but failed to send, 1410 * because if it did send any requests, it won't return an error. 1411 */ 1412 q = uv__queue_head(&handle->write_queue); 1413 req = uv__queue_data(q, uv_udp_send_t, queue); 1414 req->status = n; 1415 uv__queue_remove(&req->queue); 1416 uv__queue_insert_tail(&handle->write_completed_queue, &req->queue); 1417 feed: 1418 uv__io_feed(handle->loop, &handle->io_watcher); 1419 } 1420 1421 1422 int uv__udp_try_send2(uv_udp_t* handle, 1423 unsigned int count, 1424 uv_buf_t* bufs[/*count*/], 1425 unsigned int nbufs[/*count*/], 1426 struct sockaddr* addrs[/*count*/]) { 1427 int fd; 1428 1429 fd = handle->io_watcher.fd; 1430 if (fd == -1) 1431 return UV_EINVAL; 1432 1433 return uv__udp_sendmsgv(fd, count, bufs, nbufs, addrs); 1434 } 1435