1 /* ioloop.c 2 * 3 * Copyright (c) 2018-2023 Apple, Inc. All rights reserved. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * https://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 * Simple event dispatcher for DNS. 18 */ 19 20 #define _GNU_SOURCE 21 22 #include <stdlib.h> 23 #include <string.h> 24 #include <stdio.h> 25 #include <unistd.h> 26 #include <sys/uio.h> 27 #include <errno.h> 28 #include <sys/socket.h> 29 #include <netinet/in.h> 30 #include <arpa/inet.h> 31 #include <inttypes.h> 32 #ifdef USE_KQUEUE 33 #include <sys/event.h> 34 #endif 35 #include <sys/wait.h> 36 #include <fcntl.h> 37 #include <sys/time.h> 38 #include <signal.h> 39 #include <net/if.h> 40 #include <ifaddrs.h> 41 #include <spawn.h> 42 43 #include "dns_sd.h" 44 45 #include "srp.h" 46 #include "dns-msg.h" 47 #include "srp-crypto.h" 48 #include "ioloop.h" 49 #ifndef EXCLUDE_TLS 50 #include "srp-tls.h" 51 #endif 52 #include "ifpermit.h" 53 54 #ifndef IOLOOP_MACOS 55 56 typedef struct async_event { 57 struct async_event *next; 58 async_callback_t callback; 59 void *context; 60 } async_event_t; 61 62 io_t *ios; 63 wakeup_t *wakeups; 64 subproc_t *subprocesses; 65 async_event_t *async_events; 66 int64_t ioloop_now; 67 68 #ifdef USE_KQUEUE 69 int kq; 70 #endif 71 static void subproc_finalize(subproc_t *subproc); 72 73 int 74 getipaddr(addr_t *addr, const char *p) 75 { 76 if (inet_pton(AF_INET, p, &addr->sin.sin_addr)) { 77 addr->sa.sa_family = AF_INET; 78 #ifndef NOT_HAVE_SA_LEN 79 addr->sa.sa_len = sizeof addr->sin; 80 #endif 81 return sizeof addr->sin; 82 } else if (inet_pton(AF_INET6, p, &addr->sin6.sin6_addr)) { 83 addr->sa.sa_family = AF_INET6; 84 #ifndef NOT_HAVE_SA_LEN 85 addr->sa.sa_len = sizeof addr->sin6; 86 #endif 87 return sizeof addr->sin6; 88 } else { 89 return 0; 90 } 91 } 92 93 int64_t 94 ioloop_timenow() 95 { 96 int64_t now; 97 struct timeval tv; 98 gettimeofday(&tv, 0); 99 now = (int64_t)tv.tv_sec * 1000 + (int64_t)tv.tv_usec / 1000; 100 return now; 101 } 102 103 static void 104 message_finalize(message_t *message) 105 { 106 free(message); 107 } 108 109 void 110 ioloop_message_retain_(message_t *message, const char *file, int line) 111 { 112 (void)file; (void)line; 113 RETAIN(message, message); 114 } 115 116 void 117 ioloop_message_release_(message_t *message, const char *file, int line) 118 { 119 (void)file; (void)line; 120 RELEASE(message, message); 121 } 122 123 void 124 ioloop_close(io_t *io) 125 { 126 close(io->fd); 127 io->fd = -1; 128 } 129 130 static void 131 add_io(io_t *io) 132 { 133 io_t **iop; 134 135 // Add the new reader to the end of the list if it's not on the list. 136 for (iop = &ios; *iop != NULL && *iop != io; iop = &((*iop)->next)) 137 ; 138 if (*iop == NULL) { 139 *iop = io; 140 io->next = NULL; 141 RETAIN_HERE(io, io); 142 } 143 } 144 145 void 146 ioloop_add_reader(io_t *io, io_callback_t callback) 147 { 148 add_io(io); 149 150 io->read_callback = callback; 151 #ifdef USE_SELECT 152 io->want_read = true; 153 #endif 154 #ifdef USE_EPOLL 155 #endif 156 #ifdef USE_KQUEUE 157 struct kevent ev; 158 int rv; 159 EV_SET(&ev, io->fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, io); 160 rv = kevent(kq, &ev, 1, NULL, 0, NULL); 161 if (rv < 0) { 162 ERROR("kevent add: %s", strerror(errno)); 163 return; 164 } 165 #endif // USE_EPOLL 166 } 167 168 void 169 ioloop_add_writer(io_t *io, io_callback_t callback) 170 { 171 add_io(io); 172 173 io->write_callback = callback; 174 #ifdef USE_SELECT 175 io->want_write = true; 176 #endif 177 #ifdef USE_EPOLL 178 #endif 179 #ifdef USE_KQUEUE 180 struct kevent ev; 181 int rv; 182 EV_SET(&ev, io->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, io); 183 rv = kevent(kq, &ev, 1, NULL, 0, NULL); 184 if (rv < 0) { 185 ERROR("kevent add: %s", strerror(errno)); 186 return; 187 } 188 #endif // USE_EPOLL 189 } 190 191 void 192 drop_writer(io_t *io) 193 { 194 #ifdef USE_SELECT 195 io->want_write = false; 196 #endif 197 #ifdef USE_EPOLL 198 #endif 199 #ifdef USE_KQUEUE 200 struct kevent ev; 201 int rv; 202 EV_SET(&ev, io->fd, EVFILT_WRITE, EV_ADD | EV_DISABLE, 0, 0, io); 203 rv = kevent(kq, &ev, 1, NULL, 0, NULL); 204 if (rv < 0) { 205 ERROR("kevent add: %s", strerror(errno)); 206 return; 207 } 208 #endif // USE_EPOLL 209 } 210 211 static void 212 add_remove_wakeup(wakeup_t *wakeup, bool remove) 213 { 214 wakeup_t **p_wakeups; 215 216 // Add the new reader to the end of the list if it's not on the list. 217 for (p_wakeups = &wakeups; *p_wakeups != NULL && *p_wakeups != wakeup; p_wakeups = &((*p_wakeups)->next)) 218 ; 219 if (remove) { 220 void *wakeup_context = wakeup->context; 221 finalize_callback_t finalize = wakeup->finalize; 222 wakeup->context = NULL; 223 if (wakeup->finalize != NULL) { 224 wakeup->finalize = NULL; 225 wakeup_finalize(wakeup_context); 226 } 227 if (*p_wakeups != NULL) { 228 *p_wakeups = wakeup->next; 229 wakeup->next = NULL; 230 } 231 } else { 232 if (*p_wakeups == NULL) { 233 *p_wakeups = wakeup; 234 wakeup->next = NULL; 235 } 236 } 237 } 238 239 static void 240 wakeup_finalize(void *context) 241 { 242 wakeup_t *wakeup = context; 243 add_remove_wakeup(wakeup, true); 244 free(wakeup); 245 } 246 247 void 248 ioloop_wakeup_retain_(wakeup_t *wakeup, const char *file, int line) 249 { 250 (void)file; (void)line; 251 RETAIN(wakeup, wakeup); 252 } 253 254 void 255 ioloop_wakeup_release_(wakeup_t *wakeup, const char *file, int line) 256 { 257 (void)file; (void)line; 258 RELEASE(wakeup, wakeup); 259 } 260 261 wakeup_t * 262 ioloop_wakeup_create_(const char *file, int line) 263 { 264 wakeup_t *ret = calloc(1, sizeof(*ret)); 265 if (ret) { 266 RETAIN(ret, wakeup); 267 } 268 return ret; 269 } 270 271 bool 272 ioloop_add_wake_event(wakeup_t *wakeup, void *context, wakeup_callback_t callback, wakeup_callback_t finalize, int milliseconds) 273 { 274 if (callback == NULL) { 275 ERROR("ioloop_add_wake_event called with null callback"); 276 return false; 277 } 278 if (milliseconds < 0) { 279 ERROR("ioloop_add_wake_event called with negative timeout"); 280 return false; 281 } 282 INFO("%p %p %d", wakeup, context, milliseconds); 283 add_remove_wakeup(wakeup, true); 284 add_remove_wakeup(wakeup, false); 285 wakeup->wakeup_time = ioloop_timenow() + milliseconds; 286 wakeup->finalize = finalize; 287 wakeup->wakeup = callback; 288 wakeup->context = context; 289 return true; 290 } 291 292 void 293 ioloop_cancel_wake_event(wakeup_t *wakeup) 294 { 295 add_remove_wakeup(wakeup, true); 296 wakeup->wakeup_time = 0; 297 } 298 299 bool 300 ioloop_init(void) 301 { 302 signal(SIGPIPE, SIG_IGN); // because why ever? 303 #ifdef USE_KQUEUE 304 kq = kqueue(); 305 if (kq < 0) { 306 ERROR("kqueue(): %s", strerror(errno)); 307 return false; 308 } 309 #endif 310 return true; 311 } 312 313 static void 314 ioloop_io_finalize(io_t *io) 315 { 316 if (io->io_finalize) { 317 io->io_finalize(io); 318 } else { 319 free(io); 320 } 321 } 322 323 int 324 ioloop_events(int64_t timeout_when) 325 { 326 io_t *io, **iop; 327 wakeup_t *wakeup, **p_wakeup; 328 int nev = 0, rv; 329 int64_t now = ioloop_timenow(); 330 int64_t next_event; 331 int64_t timeout = 0; 332 333 if (ioloop_now != 0) { 334 INFO("%lld.%03lld seconds have passed on entry to ioloop_events", 335 (long long)((now - ioloop_now) / 1000), (long long)((now - ioloop_now) % 1000)); 336 } 337 ioloop_now = now; 338 339 #ifdef USE_SELECT 340 int nfds = 0; 341 fd_set reads, writes, errors; 342 struct timeval tv; 343 344 FD_ZERO(&reads); 345 FD_ZERO(&writes); 346 FD_ZERO(&errors); 347 #endif 348 #ifdef USE_KQUEUE 349 struct timespec ts; 350 #endif 351 352 start_over: 353 p_wakeup = &wakeups; 354 355 // A timeout of zero means don't time out. 356 if (timeout_when == 0) { 357 next_event = INT64_MAX; 358 } else { 359 next_event = timeout_when; 360 } 361 362 // Cycle through the list of timeouts. 363 while (*p_wakeup) { 364 wakeup = *p_wakeup; 365 if (wakeup->wakeup_time != 0) { 366 if (wakeup->wakeup_time <= ioloop_now) { 367 *p_wakeup = wakeup->next; 368 wakeup->wakeup_time = 0; 369 void *wakeup_context = wakeup->context; 370 finalize_callback_t wakeup_finalize = wakeup->finalize; 371 wakeup->finalize = NULL; 372 wakeup->context = NULL; 373 wakeup->wakeup(wakeup_context); 374 if (wakeup_finalize != NULL && wakeup_context != NULL) { 375 wakeup_finalize(wakeup_context); 376 } 377 ++nev; 378 379 // In case either wakeup has been freed, or a new wakeup has been added, we need to start 380 // at the beginning again. This wakeup will never still be on the list unless it's been 381 // re-added with a later time, so this should always have the effect that every wakeup that's 382 // ready gets its callback called, and when all wakeups that are ready have been called, 383 // there are no wakeups that are ready remaining on the list, so our loop exits. 384 goto start_over; 385 } else { 386 p_wakeup = &wakeup->next; 387 } 388 if (wakeup->wakeup_time < next_event && wakeup->wakeup_time != 0) { 389 next_event = wakeup->wakeup_time; 390 } 391 } else { 392 *p_wakeup = wakeup->next; 393 } 394 } 395 396 // Deliver and consume any asynchronous events 397 while (async_events != NULL) { 398 async_event_t *event = async_events; 399 async_events = event->next; 400 event->callback(event->context); 401 free(event); 402 } 403 404 iop = &ios; 405 while (*iop) { 406 io = *iop; 407 // If the I/O is dead, finalize or free it. 408 if (io->fd == -1) { 409 *iop = io->next; 410 RELEASE_HERE(io, io); 411 continue; 412 } 413 414 // One-time callback, used to call the listener ready callback after ioloop_listener_create() has 415 // returned; 416 if (io->ready != NULL) { 417 io->ready(io, io->context); 418 io->ready = NULL; 419 } 420 421 iop = &io->next; 422 } 423 424 INFO("now: %" PRIu64 " next_event %" PRIu64, ioloop_now, next_event); 425 426 // If we were given a timeout in the future, or told to wait indefinitely, wait until the next event. 427 if (timeout_when == 0 || timeout_when > ioloop_now) { 428 timeout = next_event - ioloop_now; 429 // Don't choose a time so far in the future that it might overflow some math in the kernel. 430 if (timeout > IOLOOP_DAY * 100) { 431 timeout = IOLOOP_DAY * 100; 432 } 433 #ifdef USE_SELECT 434 tv.tv_sec = timeout / 1000; 435 tv.tv_usec = (timeout % 1000) * 1000; 436 #endif 437 #ifdef USE_KQUEUE 438 ts.tv_sec = timeout / 1000; 439 ts.tv_nsec = (timeout % 1000) * 1000 * 1000; 440 #endif 441 } 442 443 while (subprocesses != NULL) { 444 int status; 445 pid_t pid; 446 pid = waitpid(-1, &status, WNOHANG); 447 if (pid <= 0) { 448 break; 449 } 450 subproc_t **sp, *subproc; 451 for (sp = &subprocesses; (*sp) != NULL; sp = &(*sp)->next) { 452 subproc = *sp; 453 if (subproc->pid == pid) { 454 if (!WIFSTOPPED(status)) { 455 *sp = subproc->next; 456 } 457 subproc->callback(subproc->context, status, NULL); 458 if (!WIFSTOPPED(status)) { 459 subproc->finished = true; 460 RELEASE_HERE(subproc, subproc); 461 break; 462 } 463 } 464 } 465 } 466 467 #ifdef USE_SELECT 468 for (io = ios; io; io = io->next) { 469 if (io->fd != -1 && (io->want_read || io->want_write)) { 470 if (io->fd >= nfds) { 471 nfds = io->fd + 1; 472 } 473 if (io->want_read) { 474 FD_SET(io->fd, &reads); 475 } 476 if (io->want_write) { 477 FD_SET(io->fd, &writes); 478 } 479 } 480 } 481 #endif 482 483 #ifdef USE_SELECT 484 INFO("waiting %lld %lld seconds", (long long)tv.tv_sec, (long long)tv.tv_usec); 485 rv = select(nfds, &reads, &writes, &errors, &tv); 486 if (rv < 0) { 487 ERROR("select: %s", strerror(errno)); 488 exit(1); 489 } 490 now = ioloop_timenow(); 491 INFO("%lld.%03lld seconds passed waiting, got %d events", (long long)((now - ioloop_now) / 1000), 492 (long long)((now - ioloop_now) % 1000), rv); 493 ioloop_now = now; 494 for (io = ios; io; io = io->next) { 495 if (io->fd != -1) { 496 if (FD_ISSET(io->fd, &reads)) { 497 if (io->read_callback != NULL) { 498 io->read_callback(io, io->context); 499 } 500 } else if (FD_ISSET(io->fd, &writes)) { 501 if (io->write_callback != NULL) { 502 io->write_callback(io, io->context); 503 } 504 } 505 } 506 } 507 nev += rv; 508 #endif // USE_SELECT 509 #ifdef USE_KQUEUE 510 #define KEV_MAX 20 511 struct kevent evs[KEV_MAX]; 512 int i; 513 514 INFO("waiting %lld/%lld seconds", (long long)ts.tv_sec, (long long)ts.tv_nsec); 515 do { 516 rv = kevent(kq, NULL, 0, evs, KEV_MAX, &ts); 517 now = ioloop_timenow(); 518 INFO("%lld.%03lld seconds passed waiting, got %d events", (long long)((now - ioloop_now) / 1000), 519 (long long)((now - ioloop_now) % 1000), rv); 520 ioloop_now = now; 521 ts.tv_sec = 0; 522 ts.tv_nsec = 0; 523 if (rv < 0) { 524 if (errno == EINTR) { 525 rv = 0; 526 } else { 527 ERROR("kevent poll: %s", strerror(errno)); 528 exit(1); 529 } 530 } 531 for (i = 0; i < rv; i++) { 532 io = evs[i].udata; 533 if (evs[i].filter == EVFILT_WRITE) { 534 io->write_callback(io, io->context); 535 } else if (evs[i].filter == EVFILT_READ) { 536 io->read_callback(io, io->context); 537 } 538 } 539 nev += rv; 540 } while (rv == KEV_MAX); 541 #endif 542 return nev; 543 } 544 545 int 546 ioloop(void) 547 { 548 int nev; 549 do { 550 nev = ioloop_events(0); 551 INFO("%d", nev); 552 } while (nev >= 0); 553 ERROR("ioloop returned %d.", nev); 554 return -1; 555 } 556 #endif // !defined(IOLOOP_MACOS) 557 558 static void 559 ioloop_normalize_address(addr_t *normalized, addr_t *original) 560 { 561 uint16_t *sinp = (uint16_t *)&original->sin6.sin6_addr; 562 // Check for ::ffff:xxxx:xxxx, which is an ipv4mapped address 563 if (sinp[0] == 0 && sinp[1] == 0 && sinp[2] == 0 && sinp[3] == 0 && sinp[4] == 0 && sinp[5] == 0xffff) { 564 normalized->sin.sin_family = AF_INET; 565 memcpy(&normalized->sin.sin_addr, &sinp[6], sizeof(struct in_addr)); 566 normalized->sin.sin_port = original->sin6.sin6_port; 567 } else { 568 *normalized = *original; 569 } 570 } 571 572 void 573 ioloop_udp_read_callback(io_t *io, void *context) 574 { 575 comm_t *connection = (comm_t *)context; 576 addr_t src; 577 ssize_t rv; 578 struct msghdr msg; 579 struct iovec bufp; 580 uint8_t msgbuf[DNS_MAX_UDP_PAYLOAD]; 581 char cmsgbuf[128]; 582 struct cmsghdr *cmh; 583 message_t *message; 584 (void)context; 585 586 bufp.iov_base = msgbuf; 587 bufp.iov_len = DNS_MAX_UDP_PAYLOAD; 588 msg.msg_iov = &bufp; 589 msg.msg_iovlen = 1; 590 msg.msg_name = &src; 591 msg.msg_namelen = sizeof src; 592 msg.msg_control = cmsgbuf; 593 msg.msg_controllen = sizeof cmsgbuf; 594 595 rv = recvmsg(io->fd, &msg, 0); 596 if (rv < 0) { 597 ERROR("%s", strerror(errno)); 598 return; 599 } 600 message = ioloop_message_create(rv); 601 if (!message) { 602 ERROR("out of memory"); 603 return; 604 } 605 memcpy(&message->src, &src, sizeof src); 606 if (rv > UINT16_MAX) { 607 ERROR("message is surprisingly large: %zd", rv); 608 return; 609 } 610 message->length = (uint16_t)rv; 611 memcpy(&message->wire, msgbuf, rv); 612 613 // For UDP, we use the interface index as part of the validation strategy, so go get 614 // the interface index. 615 bool set_local = false; 616 for (cmh = CMSG_FIRSTHDR(&msg); cmh; cmh = CMSG_NXTHDR(&msg, cmh)) { 617 addr_t source_address, local_address; 618 619 if (cmh->cmsg_level == IPPROTO_IPV6 && cmh->cmsg_type == IPV6_PKTINFO) { 620 struct in6_pktinfo pktinfo; 621 622 memcpy(&pktinfo, CMSG_DATA(cmh), sizeof pktinfo); 623 message->ifindex = pktinfo.ipi6_ifindex; 624 625 /* Get address to which the message was sent, for use when replying. */ 626 message->local.sin6.sin6_family = AF_INET6; 627 message->local.sin6.sin6_port = htons(connection->listen_port); 628 message->local.sin6.sin6_addr = pktinfo.ipi6_addr; 629 #ifndef NOT_HAVE_SA_LEN 630 message->local.sin6.sin6_len = sizeof message->local; 631 #endif 632 set_local = true; 633 } else if (cmh->cmsg_level == IPPROTO_IP && cmh->cmsg_type == IP_PKTINFO) { 634 struct in_pktinfo pktinfo; 635 636 memcpy(&pktinfo, CMSG_DATA(cmh), sizeof pktinfo); 637 message->ifindex = pktinfo.ipi_ifindex; 638 639 message->local.sin.sin_family = AF_INET; 640 message->local.sin.sin_addr = pktinfo.ipi_addr; 641 #ifndef NOT_HAVE_SA_LEN 642 message->local.sin.sin_len = sizeof message->local; 643 #endif 644 message->local.sin.sin_port = htons(connection->listen_port); 645 set_local = true; 646 } 647 if (set_local) { 648 ioloop_normalize_address(&source_address, &src); 649 ioloop_normalize_address(&local_address, &message->local); 650 if (source_address.sa.sa_family == AF_INET6) { 651 SEGMENTED_IPv6_ADDR_GEN_SRP(&source_address.sin6.sin6_addr, src_addr_buf); 652 SEGMENTED_IPv6_ADDR_GEN_SRP(&local_address.sin6.sin6_addr, dest_addr_buf); 653 INFO("received %zd byte UDP message on index %d to " PRI_SEGMENTED_IPv6_ADDR_SRP "#%d from " 654 PRI_SEGMENTED_IPv6_ADDR_SRP "#%d", rv, message->ifindex, 655 SEGMENTED_IPv6_ADDR_PARAM_SRP(&local_address.sin6.sin6_addr, dest_addr_buf), 656 ntohs(local_address.sin6.sin6_port), 657 SEGMENTED_IPv6_ADDR_PARAM_SRP(&source_address.sin6.sin6_addr, src_addr_buf), 658 ntohs(source_address.sin6.sin6_port)); 659 } else { 660 IPv4_ADDR_GEN_SRP(&source_address.sin.sin_addr.s_addr, src_addr_buf); 661 IPv4_ADDR_GEN_SRP(&local_address.sin.sin_addr.s_addr, dest_addr_buf); 662 INFO("received %zd byte UDP message on index %d to " PRI_IPv4_ADDR_SRP "#%d from " PRI_IPv4_ADDR_SRP "#%d", rv, 663 message->ifindex, IPv4_ADDR_PARAM_SRP(&local_address.sin.sin_addr.s_addr, dest_addr_buf), 664 ntohs(local_address.sin.sin_port), 665 IPv4_ADDR_PARAM_SRP(&local_address.sin.sin_addr.s_addr, src_addr_buf), 666 ntohs(source_address.sin.sin_port)); 667 } 668 } 669 } 670 671 // The first packet we get via inetd will not have the PKTINFO sockopt set, since we can only set that after we've 672 // started. We can expect a retransmission, so just drop it rather than trying to do something clever. 673 if (set_local) { 674 connection->datagram_callback(connection, message, connection->context); 675 } else { 676 ERROR("dropping incoming packet because we didn't get a destination address."); 677 } 678 ioloop_message_release(message); 679 } 680 681 #ifndef IOLOOP_MACOS 682 static void 683 tcp_read_callback(io_t *io, void *context) 684 { 685 uint8_t *read_ptr; 686 size_t read_len; 687 comm_t *connection = (comm_t *)io; 688 ssize_t rv; 689 (void)context; 690 if (connection->message_length_len < 2) { 691 read_ptr = connection->message_length_bytes; 692 read_len = 2 - connection->message_length_len; 693 } else { 694 read_ptr = &connection->buf[connection->message_cur]; 695 read_len = connection->message_length - connection->message_cur; 696 } 697 698 if (connection->tls_context != NULL) { 699 #ifndef EXCLUDE_TLS 700 rv = srp_tls_read(connection, read_ptr, read_len); 701 if (rv == 0) { 702 // This isn't an EOF: that's returned as an error status. This just means that 703 // whatever data was available to be read was consumed by the TLS protocol without 704 // producing anything to read at the app layer. 705 return; 706 } else if (rv < 0) { 707 ERROR("TLS return that we can't handle."); 708 close(connection->io.fd); 709 connection->io.fd = -1; 710 srp_tls_context_free(connection); 711 return; 712 } 713 #else 714 ERROR("tls context with TLS excluded in tcp_read_callback."); 715 return; 716 #endif 717 } else { 718 rv = read(connection->io.fd, read_ptr, read_len); 719 720 if (rv < 0) { 721 ERROR("tcp_read_callback: %s", strerror(errno)); 722 close(connection->io.fd); 723 connection->io.fd = -1; 724 // connection->io.finalize() will be called from the io loop. 725 return; 726 } 727 728 // If we read zero here, the remote endpoint has closed or shutdown the connection. Either case is 729 // effectively the same--if we are sensitive to read events, that means that we are done processing 730 // the previous message. 731 if (rv == 0) { 732 ERROR("tcp_read_callback: remote end (%s) closed connection on %d", connection->name, connection->io.fd); 733 close(connection->io.fd); 734 connection->io.fd = -1; 735 if (connection->disconnected) { 736 connection->disconnected(connection, connection->context, 0); 737 } 738 // connection->io.finalize() will be called from the io loop. 739 return; 740 } 741 } 742 if (connection->message_length_len < 2) { 743 connection->message_length_len += rv; 744 if (connection->message_length_len == 2) { 745 connection->message_length = (((uint16_t)connection->message_length_bytes[0] << 8) | 746 ((uint16_t)connection->message_length_bytes[1])); 747 748 if (connection->message == NULL) { 749 connection->message = ioloop_message_create(connection->message_length); 750 if (!connection->message) { 751 ERROR("udp_read_callback: out of memory"); 752 return; 753 } 754 connection->buf = (uint8_t *)&connection->message->wire; 755 connection->message->length = connection->message_length; 756 memset(&connection->message->src, 0, sizeof connection->message->src); 757 } 758 } 759 } else { 760 connection->message_cur += rv; 761 if (connection->message_cur == connection->message_length) { 762 connection->message_cur = 0; 763 connection->datagram_callback(connection, connection->message, connection->context); 764 // The callback may retain the message; we need to make way for the next one. 765 ioloop_message_release(connection->message); 766 connection->message = NULL; 767 connection->message_length = connection->message_length_len = 0; 768 } 769 } 770 } 771 772 773 static bool 774 tcp_send_response(comm_t *comm, message_t *responding_to, struct iovec *iov, int iov_len, bool send_length) 775 { 776 struct msghdr mh; 777 struct iovec iovec[4]; 778 char lenbuf[2]; 779 ssize_t status; 780 size_t payload_length = 0; 781 int i; 782 783 // We don't anticipate ever needing more than four hunks, but if we get more, handle then? 784 if (iov_len > 3) { 785 ERROR("tcp_send_response: too many io buffers"); 786 close(comm->io.fd); 787 comm->io.fd = -1; 788 return false; 789 } 790 791 i = 0; 792 if (send_length) { 793 i++; 794 } 795 for (i = 0; i < iov_len; i++) { 796 iovec[i + 1] = iov[i]; 797 payload_length += iov[i].iov_len; 798 } 799 if (send_length) { 800 iovec[0].iov_base = &lenbuf[0]; 801 iovec[0].iov_len = 2; 802 803 lenbuf[0] = payload_length / 256; 804 lenbuf[1] = payload_length & 0xff; 805 806 payload_length += 2; 807 } 808 809 #ifndef MSG_NOSIGNAL 810 #define MSG_NOSIGNAL 0 811 #endif 812 if (comm->tls_context != NULL) { 813 #ifndef EXCLUDE_TLS 814 status = srp_tls_write(comm, iovec, iov_len + 1); 815 #else 816 ERROR("TLS context not null with TLS excluded."); 817 status = -1; 818 errno = ENOTSUP; 819 return false; 820 #endif 821 } else { 822 memset(&mh, 0, sizeof mh); 823 mh.msg_iov = &iovec[0]; 824 mh.msg_iovlen = iov_len + 1; 825 mh.msg_name = 0; 826 827 status = sendmsg(comm->io.fd, &mh, MSG_NOSIGNAL); 828 } 829 if (status < 0 || status != payload_length) { 830 if (status < 0) { 831 ERROR("tcp_send_response: write failed: %s", strerror(errno)); 832 } else { 833 ERROR("tcp_send_response: short write (%zd out of %zu bytes)", status, payload_length); 834 } 835 close(comm->io.fd); 836 comm->io.fd = -1; 837 return false; 838 } 839 return true; 840 } 841 #endif // !IOLOOP_MACOS 842 843 #if !defined(IOLOOP_MACOS) || !UDP_LISTENER_USES_CONNECTION_GROUPS 844 bool 845 ioloop_udp_send_message(comm_t *comm, addr_t *source, addr_t *dest, int ifindex, struct iovec *iov, int iov_len) 846 { 847 struct msghdr mh; 848 uint8_t cmsg_buf[128]; 849 struct cmsghdr *cmsg; 850 ssize_t status; 851 852 memset(&mh, 0, sizeof mh); 853 mh.msg_iov = iov; 854 mh.msg_iovlen = iov_len; 855 mh.msg_name = dest; 856 mh.msg_control = cmsg_buf; 857 if (source == NULL) { 858 mh.msg_controllen = 0; 859 } else { 860 mh.msg_controllen = sizeof cmsg_buf; 861 cmsg = CMSG_FIRSTHDR(&mh); 862 863 if (source->sa.sa_family == AF_INET) { 864 struct in_pktinfo *inp; 865 mh.msg_namelen = sizeof (struct sockaddr_in); 866 mh.msg_controllen = CMSG_SPACE(sizeof *inp); 867 cmsg->cmsg_level = IPPROTO_IP; 868 cmsg->cmsg_type = IP_PKTINFO; 869 cmsg->cmsg_len = CMSG_LEN(sizeof *inp); 870 inp = (struct in_pktinfo *)CMSG_DATA(cmsg); 871 memset(inp, 0, sizeof *inp); 872 inp->ipi_ifindex = ifindex; 873 inp->ipi_spec_dst = source->sin.sin_addr; 874 inp->ipi_addr = source->sin.sin_addr; 875 } else if (source->sa.sa_family == AF_INET6) { 876 struct in6_pktinfo *inp; 877 mh.msg_namelen = sizeof (struct sockaddr_in6); 878 mh.msg_controllen = CMSG_SPACE(sizeof *inp); 879 cmsg->cmsg_level = IPPROTO_IPV6; 880 cmsg->cmsg_type = IPV6_PKTINFO; 881 cmsg->cmsg_len = CMSG_LEN(sizeof *inp); 882 inp = (struct in6_pktinfo *)CMSG_DATA(cmsg); 883 memset(inp, 0, sizeof *inp); 884 inp->ipi6_ifindex = ifindex; 885 inp->ipi6_addr = source->sin6.sin6_addr; 886 } else { 887 ERROR("unknown family %d", source->sa.sa_family); 888 abort(); 889 } 890 } 891 size_t len = 0; 892 for (int i = 0; i < iov_len; i++) { 893 len += iov[i].iov_len; 894 } 895 addr_t dest_addr, source_addr; 896 ioloop_normalize_address(&dest_addr, dest); 897 if (source != NULL) { 898 ioloop_normalize_address(&source_addr, source); 899 } else { 900 memset(&source_addr, 0, sizeof(source_addr)); 901 source_addr.sa.sa_family = dest_addr.sa.sa_family; 902 } 903 if (dest_addr.sa.sa_family == AF_INET) { 904 IPv4_ADDR_GEN_SRP(&source_addr.sin.sin_addr.s_addr, ipv4_src_buf); 905 IPv4_ADDR_GEN_SRP(&dest_addr.sin.sin_addr.s_addr, ipv4_dest_buf); 906 INFO("sending %zd byte UDP response from " PRI_IPv4_ADDR_SRP " port %d index %d to " PRI_IPv4_ADDR_SRP "#%d", 907 len, IPv4_ADDR_PARAM_SRP(&source_addr.sin.sin_addr.s_addr, ipv4_src_buf), 908 ifindex, ntohs(source_addr.sin.sin_port), 909 IPv4_ADDR_PARAM_SRP(&dest_addr.sin.sin_addr.s_addr, ipv4_dest_buf), ntohs(dest_addr.sin.sin_port)); 910 } else { 911 SEGMENTED_IPv6_ADDR_GEN_SRP(&source_addr.sin6.sin6_addr.s6_addr, ipv6_src_buf); 912 SEGMENTED_IPv6_ADDR_GEN_SRP(&dest_addr.sin6.sin6_addr.s6_addr, ipv6_dest_buf); 913 INFO("sending %zd byte UDP response from " 914 PRI_SEGMENTED_IPv6_ADDR_SRP " port %d index %d to " PRI_SEGMENTED_IPv6_ADDR_SRP "#%d", 915 len, SEGMENTED_IPv6_ADDR_PARAM_SRP(&source_addr.sin6.sin6_addr.s6_addr, ipv6_src_buf), 916 ntohs(source_addr.sin6.sin6_port), ifindex, 917 SEGMENTED_IPv6_ADDR_PARAM_SRP(&dest_addr.sin6.sin6_addr.s6_addr, ipv6_dest_buf), 918 ntohs(dest_addr.sin6.sin6_port)); 919 } 920 status = sendmsg(comm->io.fd, &mh, 0); 921 if (status < 0) { 922 ERROR("%s", strerror(errno)); 923 return false; 924 } 925 return true; 926 } 927 #endif // !defined(IOLOOP_MACOS) || !UDP_LISTENER_USES_CONNECTION_GROUPS 928 929 #ifndef IOLOOP_MACOS 930 static bool 931 udp_send_response(comm_t *comm, message_t *responding_to, struct iovec *iov, int iov_len) 932 { 933 return udp_send_message(comm, &responding_to->local, &responding_to->src, responding_to->ifindex, iov, iov_len); 934 } 935 936 bool 937 ioloop_send_multicast(comm_t *comm, int ifindex, struct iovec *iov, int iov_len) 938 { 939 return udp_send_message(comm, &comm->multicast, ifindex, iov, iov_len); 940 } 941 942 static bool 943 udp_send_connected_response(comm_t *comm, message_t *responding_to, struct iovec *iov, int iov_len) 944 { 945 int status = writev(comm->io.fd, iov, iov_len); 946 (void)responding_to; 947 if (status < 0) { 948 ERROR("udp_send_connected: %s", strerror(errno)); 949 return false; 950 } 951 return true; 952 } 953 954 bool 955 ioloop_send_message(comm_t *connection, message_t *responding_to, struct iovec *iov, int iov_len) 956 { 957 if (connection->tcp_stream) { 958 return tcp_send_response(connection, responding_to, iov, iov_len, true); 959 } else { 960 if (connection->is_connected) { 961 return udp_send_connected_response(connection, responding_to, iov, iov_len); 962 } else if (connection->is_multicast) { 963 ERROR("ioloop_send_message: multicast send must use ioloop_send_multicast!"); 964 return false; 965 } else if (responding_to == NULL) { 966 ERROR("ioloop_send_message: not connected and no responding_to message."); 967 return false; 968 } else { 969 return udp_send_response(connection, responding_to, iov, iov_len); 970 } 971 } 972 } 973 974 bool 975 ioloop_send_final_message(comm_t *connection, message_t *responding_to, struct iovec *iov, int iov_len) 976 { 977 bool ret = ioloop_send_message(connection, responding_to, iov, iov_len); 978 if (ret) { 979 shutdown(connection->io.fd, SHUT_WR); 980 } 981 return ret; 982 } 983 984 bool 985 ioloop_send_data(comm_t *connection, message_t *responding_to, struct iovec *iov, int iov_len) 986 { 987 if (connection->tcp_stream) { 988 return tcp_send_response(connection, responding_to, iov, iov_len, false); 989 } 990 return ioloop_send_message(connection, responding_to, iov, iov_len); 991 } 992 993 bool 994 ioloop_send_final_data(comm_t *connection, message_t *responding_to, struct iovec *iov, int iov_len) 995 { 996 if (connection->tcp_stream) { 997 bool ret = tcp_send_response(connection, responding_to, iov, iov_len, false); 998 if (ret) { 999 shutdown(connection->io.fd, SHUT_WR); 1000 } 1001 return ret; 1002 } 1003 return ioloop_send_message(connection, responding_to, iov, iov_len); 1004 } 1005 1006 static void 1007 io_finalize(io_t *io) 1008 { 1009 io_t **iop; 1010 for (iop = &ios; *iop; iop = &(*iop)->next) { 1011 if (*iop == io) { 1012 *iop = io->next; 1013 break; 1014 } 1015 } 1016 free(io); 1017 } 1018 1019 // When a communication is closed, scan the io event list to see if any other ios are referencing this one. 1020 static void 1021 comm_finalize(io_t *io) 1022 { 1023 comm_t *comm = (comm_t *)io; 1024 ERROR("comm_finalize"); 1025 if (comm->name != NULL) { 1026 free(comm->name); 1027 } 1028 if (comm->finalize != NULL) { 1029 comm->finalize(comm->context); 1030 } 1031 if (comm->message != NULL) { 1032 RELEASE_HERE(comm->message, message); 1033 } 1034 io_finalize(&comm->io); 1035 } 1036 1037 void 1038 ioloop_comm_retain_(comm_t *comm, const char *file, int line) 1039 { 1040 (void)file; (void)line; 1041 RETAIN(&comm->io, comm); 1042 } 1043 1044 void 1045 ioloop_comm_release_(comm_t *comm, const char *file, int line) 1046 { 1047 (void)file; (void)line; 1048 RELEASE(&comm->io, comm); 1049 } 1050 1051 void 1052 ioloop_comm_cancel(comm_t *comm) 1053 { 1054 close(comm->io.fd); 1055 comm->io.fd = -1; 1056 } 1057 1058 void 1059 ioloop_comm_context_set(comm_t *comm, void *context, finalize_callback_t callback) 1060 { 1061 if (comm->context != NULL && comm->finalize != NULL) { 1062 comm->finalize(comm->context); 1063 } 1064 comm->finalize = callback; 1065 comm->context = context; 1066 } 1067 1068 void 1069 ioloop_comm_connect_callback_set(comm_t *comm, connect_callback_t callback) 1070 { 1071 comm->connected = callback; 1072 } 1073 1074 void 1075 ioloop_comm_disconnect_callback_set(comm_t *comm, disconnect_callback_t callback) 1076 { 1077 comm->disconnected = callback; 1078 } 1079 1080 void 1081 ioloop_listener_retain_(comm_t *listener, const char *file, int line) 1082 { 1083 RETAIN(&listener->io, comm); 1084 } 1085 1086 void 1087 ioloop_listener_release_(comm_t *listener, const char *file, int line) 1088 { 1089 RELEASE(&listener->io, comm); 1090 } 1091 1092 void 1093 ioloop_listener_cancel(comm_t *connection) 1094 { 1095 if (connection->io.fd != -1) { 1096 close(connection->io.fd); 1097 connection->io.fd = -1; 1098 } 1099 } 1100 1101 static void 1102 listen_callback(io_t *io, void *context) 1103 { 1104 comm_t *listener = (comm_t *)io; 1105 int rv; 1106 addr_t addr; 1107 socklen_t addr_len = sizeof addr; 1108 comm_t *comm; 1109 char addrbuf[INET6_ADDRSTRLEN + 7]; 1110 int addrlen; 1111 (void)context; 1112 1113 rv = accept(listener->io.fd, &addr.sa, &addr_len); 1114 if (rv < 0) { 1115 ERROR("accept: %s", strerror(errno)); 1116 close(listener->io.fd); 1117 listener->io.fd = -1; 1118 return; 1119 } 1120 inet_ntop(addr.sa.sa_family, (addr.sa.sa_family == AF_INET 1121 ? (void *)&addr.sin.sin_addr 1122 : (void *)&addr.sin6.sin6_addr), addrbuf, sizeof addrbuf); 1123 addrlen = strlen(addrbuf); 1124 snprintf(&addrbuf[addrlen], (sizeof addrbuf) - addrlen, "%%%d", 1125 ntohs((addr.sa.sa_family == AF_INET ? addr.sin.sin_port : addr.sin6.sin6_port))); 1126 comm = calloc(1, sizeof *comm); 1127 comm->name = strdup(addrbuf); 1128 comm->io.fd = rv; 1129 comm->address = addr; 1130 comm->datagram_callback = listener->datagram_callback; 1131 comm->tcp_stream = true; 1132 comm->context = listener->context; 1133 1134 if (listener->tls_context == (tls_context_t *)-1) { 1135 #ifndef EXCLUDE_TLS 1136 if (!srp_tls_listen_callback(comm)) { 1137 ERROR("TLS setup failed."); 1138 close(comm->io.fd); 1139 free(comm); 1140 return; 1141 } 1142 #else 1143 ERROR("TLS context not null in listen_callback when TLS excluded."); 1144 return; 1145 #endif 1146 } 1147 if (listener->connected) { 1148 listener->connected(comm, listener->context); 1149 } 1150 ioloop_add_reader(&comm->io, tcp_read_callback); 1151 1152 #ifdef SO_NOSIGPIPE 1153 int one = 1; 1154 rv = setsockopt(comm->io.fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof one); 1155 if (rv < 0) { 1156 ERROR("SO_NOSIGPIPE failed: %s", strerror(errno)); 1157 } 1158 #endif 1159 } 1160 1161 static void 1162 listener_ready_callback(io_t *io, void *context) 1163 { 1164 comm_t *listener = (comm_t *)io; 1165 if (listener->ready) { 1166 listener->ready(listener->context, listener->listen_port); 1167 } 1168 } 1169 1170 comm_t * 1171 ioloop_listener_create(bool stream, bool tls, bool inetd, uint16_t *UNUSED avoid_ports, int UNUSED num_avoid_ports, 1172 const addr_t *ip_address, const char *multicast, const char *name, 1173 datagram_callback_t datagram_callback, connect_callback_t connected, 1174 cancel_callback_t UNUSED cancel, ready_callback_t ready, finalize_callback_t finalize, 1175 tls_config_callback_t UNUSED tls_config, unsigned UNUSED ifindex, void *context) 1176 { 1177 comm_t *listener; 1178 socklen_t sl; 1179 int rv; 1180 int false_flag = 0; 1181 int true_flag = 1; 1182 uint16_t port; 1183 int family = (ip_address != NULL) ? ip_address->sa.sa_family : AF_UNSPEC; 1184 int real_family = family == AF_UNSPEC ? AF_INET6 : family; 1185 addr_t sockname; 1186 1187 listener = calloc(1, sizeof *listener); 1188 if (listener == NULL) { 1189 return NULL; 1190 } 1191 RETAIN_HERE(&listener->io, comm); 1192 listener->name = strdup(name); 1193 if (!listener->name) { 1194 RELEASE_HERE(&listener->io, comm); 1195 return NULL; 1196 } 1197 listener->io.fd = socket(real_family, stream ? SOCK_STREAM : SOCK_DGRAM, stream ? IPPROTO_TCP : IPPROTO_UDP); 1198 if (listener->io.fd < 0) { 1199 ERROR("Can't get socket: %s", strerror(errno)); 1200 goto out; 1201 } 1202 rv = setsockopt(listener->io.fd, SOL_SOCKET, SO_REUSEADDR, &true_flag, sizeof true_flag); 1203 if (rv < 0) { 1204 ERROR("SO_REUSEADDR failed: %s", strerror(errno)); 1205 goto out; 1206 } 1207 1208 rv = setsockopt(listener->io.fd, SOL_SOCKET, SO_REUSEPORT, &true_flag, sizeof true_flag); 1209 if (rv < 0) { 1210 ERROR("SO_REUSEPORT failed: %s", strerror(errno)); 1211 goto out; 1212 } 1213 1214 if (ip_address == NULL || family == AF_LOCAL) { 1215 port = 0; 1216 } else { 1217 port = (family == AF_INET) ? ip_address->sin.sin_port : ip_address->sin6.sin6_port; 1218 listener->address = *ip_address; 1219 } 1220 listener->address.sa.sa_family = real_family; 1221 1222 if (multicast != 0) { 1223 if (stream) { 1224 ERROR("Unable to do non-datagram multicast."); 1225 goto out; 1226 } 1227 if (family == AF_LOCAL) { 1228 ERROR("Multicast not supported on local sockets."); 1229 goto out; 1230 } 1231 sl = getipaddr(&listener->multicast, multicast); 1232 if (sl == 0) { 1233 goto out; 1234 } 1235 if (listener->multicast.sa.sa_family != family) { 1236 SEGMENTED_IPv6_ADDR_GEN_SRP(listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf); 1237 ERROR("multicast address %s from different family than listen address " PRI_SEGMENTED_IPv6_ADDR_SRP ".", 1238 multicast, SEGMENTED_IPv6_ADDR_PARAM_SRP(listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf)); 1239 goto out; 1240 } 1241 listener->is_multicast = true; 1242 1243 if (family == AF_INET) { 1244 struct ip_mreq im; 1245 int ttl = 255; 1246 im.imr_multiaddr = listener->multicast.sin.sin_addr; 1247 im.imr_interface.s_addr = 0; 1248 rv = setsockopt(listener->io.fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &im, sizeof im); 1249 if (rv < 0) { 1250 ERROR("Unable to join %s multicast group: %s", multicast, strerror(errno)); 1251 goto out; 1252 } 1253 rv = setsockopt(listener->io.fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof ttl); 1254 if (rv < 0) { 1255 ERROR("Unable to set IP multicast TTL to 255 for %s: %s", multicast, strerror(errno)); 1256 goto out; 1257 } 1258 rv = setsockopt(listener->io.fd, IPPROTO_IP, IP_TTL, &ttl, sizeof ttl); 1259 if (rv < 0) { 1260 ERROR("Unable to set IP TTL to 255 for %s: %s", multicast, strerror(errno)); 1261 goto out; 1262 } 1263 rv = setsockopt(listener->io.fd, IPPROTO_IP, IP_MULTICAST_LOOP, &false_flag, sizeof false_flag); 1264 if (rv < 0) { 1265 ERROR("Unable to set IP Multcast loopback to false for %s: %s", multicast, strerror(errno)); 1266 goto out; 1267 } 1268 } else { 1269 struct ipv6_mreq im; 1270 int hops = 255; 1271 im.ipv6mr_multiaddr = listener->multicast.sin6.sin6_addr; 1272 im.ipv6mr_interface = 0; 1273 rv = setsockopt(listener->io.fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &im, sizeof im); 1274 if (rv < 0) { 1275 ERROR("Unable to join %s multicast group: %s", multicast, strerror(errno)); 1276 goto out; 1277 } 1278 rv = setsockopt(listener->io.fd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &hops, sizeof hops); 1279 if (rv < 0) { 1280 ERROR("Unable to set IPv6 multicast hops to 255 for %s: %s", multicast, strerror(errno)); 1281 goto out; 1282 } 1283 rv = setsockopt(listener->io.fd, IPPROTO_IPV6, IPV6_UNICAST_HOPS, &hops, sizeof hops); 1284 if (rv < 0) { 1285 ERROR("Unable to set IPv6 hops to 255 for %s: %s", multicast, strerror(errno)); 1286 goto out; 1287 } 1288 rv = setsockopt(listener->io.fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &false_flag, sizeof false_flag); 1289 if (rv < 0) { 1290 ERROR("Unable to set IPv6 Multcast loopback to false for %s: %s", multicast, strerror(errno)); 1291 goto out; 1292 } 1293 } 1294 } 1295 1296 if (family == AF_INET6) { 1297 // Don't use a dual-stack socket. 1298 rv = setsockopt(listener->io.fd, IPPROTO_IPV6, IPV6_V6ONLY, &true_flag, sizeof true_flag); 1299 if (rv < 0) { 1300 SEGMENTED_IPv6_ADDR_GEN_SRP(listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf); 1301 ERROR("Unable to set IPv6-only flag on %s socket for " PRI_SEGMENTED_IPv6_ADDR_SRP, 1302 tls ? "TLS" : (stream ? "TCP" : "UDP"), 1303 SEGMENTED_IPv6_ADDR_PARAM_SRP(listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf)); 1304 goto out; 1305 } 1306 } 1307 1308 #ifndef NOT_HAVE_SA_LEN 1309 sl = listener->address.sa.sa_len; 1310 #else 1311 sl = real_family == AF_INET ? sizeof(listener->address.sin) : sizeof(listener->address.sin6); 1312 #endif 1313 if (bind(listener->io.fd, &listener->address.sa, sl) < 0) { 1314 if (family == AF_INET) { 1315 IPv4_ADDR_GEN_SRP(&listener->address.sin.sin_addr.s_addr, ipv4_addr_buf); 1316 ERROR("Can't bind to " PRI_IPv4_ADDR_SRP "#%d/%s: %s", 1317 IPv4_ADDR_PARAM_SRP(&listener->address.sin.sin_addr.s_addr, ipv4_addr_buf), ntohs(port), 1318 tls ? "tlsv4" : "tcpv4", strerror(errno)); 1319 } else { 1320 SEGMENTED_IPv6_ADDR_GEN_SRP(&listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf); 1321 ERROR("Can't bind to " PRI_SEGMENTED_IPv6_ADDR_SRP "#%d/%s: %s", 1322 SEGMENTED_IPv6_ADDR_PARAM_SRP(&listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf), ntohs(port), 1323 tls ? "tlsv6" : "tcpv6", strerror(errno)); 1324 } 1325 out: 1326 close(listener->io.fd); 1327 listener->io.fd = -1; 1328 RELEASE_HERE(&listener->io, comm); 1329 return NULL; 1330 } 1331 1332 // We may have bound to an unspecified port, so fetch the port we got. 1333 if (port == 0 && family != AF_LOCAL) { 1334 if (getsockname(listener->io.fd, (struct sockaddr *)&sockname, &sl) < 0) { 1335 ERROR("ioloop_listener_create: getsockname: %s", strerror(errno)); 1336 goto out; 1337 } 1338 port = ntohs(real_family == AF_INET6 ? sockname.sin6.sin6_port : sockname.sin.sin_port); 1339 } 1340 listener->listen_port = port; 1341 1342 if (tls) { 1343 #ifndef EXCLUDE_TLS 1344 if (!stream) { 1345 ERROR("Asked to do TLS over UDP, which we don't do yet."); 1346 goto out; 1347 } 1348 listener->tls_context = (tls_context_t *)-1; 1349 #else 1350 ERROR("TLS requested when TLS is excluded."); 1351 goto out; 1352 #endif 1353 } 1354 1355 if (stream) { 1356 if (listen(listener->io.fd, 5 /* xxx */) < 0) { 1357 if (family == AF_INET) { 1358 IPv4_ADDR_GEN_SRP(&listener->address.sin.sin_addr.s_addr, ipv4_addr_buf); 1359 ERROR("Can't listen on " PRI_IPv4_ADDR_SRP "#%d/%s: %s", 1360 IPv4_ADDR_PARAM_SRP(&listener->address.sin.sin_addr.s_addr, ipv4_addr_buf), ntohs(port), 1361 tls ? "tlsv4" : "tcpv4", strerror(errno)); 1362 } else { 1363 SEGMENTED_IPv6_ADDR_GEN_SRP(&listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf); 1364 ERROR("Can't listen on " PRI_SEGMENTED_IPv6_ADDR_SRP "#%d/%s: %s", 1365 SEGMENTED_IPv6_ADDR_PARAM_SRP(&listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf), ntohs(port), 1366 tls ? "tlsv6" : "tcpv6", strerror(errno)); 1367 } 1368 goto out; 1369 } 1370 listener->finalize = finalize; 1371 ioloop_add_reader(&listener->io, listen_callback); 1372 listener->tcp_stream = true; 1373 } else { 1374 rv = setsockopt(listener->io.fd, family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6, 1375 family == AF_INET ? IP_PKTINFO : IPV6_RECVPKTINFO, &true_flag, sizeof true_flag); 1376 if (rv < 0) { 1377 ERROR("Can't set %s: %s.", family == AF_INET ? "IP_PKTINFO" : "IPV6_RECVPKTINFO", 1378 strerror(errno)); 1379 goto out; 1380 } 1381 ioloop_add_reader(&listener->io, udp_read_callback); 1382 } 1383 listener->datagram_callback = datagram_callback; 1384 listener->connected = connected; 1385 listener->context = context; 1386 listener->ready = ready; 1387 listener->io.ready = listener_ready_callback; 1388 listener->io.context = listener; 1389 listener->is_listener = true; 1390 return listener; 1391 } 1392 1393 // This is the callback for when we complete the handshake when connecting to a remote listener. 1394 static void 1395 connect_callback(io_t *io, void *context) 1396 { 1397 int result; 1398 socklen_t len = sizeof result; 1399 comm_t *connection = (comm_t *)io; 1400 bool getsockopt_failed = false; 1401 (void)context; 1402 1403 // If connect failed, indicate that it failed. 1404 if (getsockopt(io->fd, SOL_SOCKET, SO_ERROR, &result, &len) < 0) { 1405 result = errno; 1406 getsockopt_failed = true; 1407 } 1408 if (result != 0) { 1409 ERROR("connect_callback: %ssocket %d: Error %d (%s)", getsockopt_failed ? "getsockopt " : "", 1410 io->fd, result, strerror(result)); 1411 connection->disconnected(connection, connection->context, result); 1412 ioloop_comm_cancel(connection); 1413 return; 1414 } 1415 1416 // If this is a TLS connection, set up TLS. 1417 if (connection->tls_context == (tls_context_t *)-1) { 1418 #ifndef EXCLUDE_TLS 1419 if (!srp_tls_connect_callback(connection)) { 1420 connection->disconnected(connection, connection->context, 0); 1421 ioloop_comm_cancel(connection); 1422 return; 1423 } 1424 #else 1425 ERROR("connect_callback: tls_context triggered with TLS excluded."); 1426 connection->disconnected(connection, connection->context, 0); 1427 ioloop_comm_cancel(connection); 1428 return; 1429 #endif 1430 } 1431 1432 // We don't want to say we're connected until the TLS handshake is complete. 1433 if (!connection->tls_handshake_incomplete) { 1434 connection->connected(connection, connection->context); 1435 } 1436 drop_writer(&connection->io); 1437 ioloop_add_reader(&connection->io, tcp_read_callback); 1438 } 1439 1440 // Currently we don't do DNS lookups, despite the host identifier being an IP address. 1441 comm_t *NULLABLE 1442 ioloop_connection_create(addr_t *remote_address, bool tls, bool stream, bool stable, bool opportunistic, 1443 datagram_callback_t datagram_callback, connect_callback_t connected, 1444 disconnect_callback_t disconnected, finalize_callback_t finalize, 1445 void * context) 1446 { 1447 comm_t *connection; 1448 socklen_t sl; 1449 char buf[INET6_ADDRSTRLEN + 7]; 1450 char *s; 1451 1452 if (!stream && (connected != NULL || disconnected != NULL)) { 1453 ERROR("connected and disconnected callbacks not valid for datagram connections"); 1454 return NULL; 1455 } 1456 if (stream && (connected == NULL || disconnected == NULL)) { 1457 ERROR("connected and disconnected callbacks are required for stream connections"); 1458 return NULL; 1459 } 1460 connection = calloc(1, sizeof *connection); 1461 if (connection == NULL) { 1462 ERROR("No memory for connection structure."); 1463 return NULL; 1464 } 1465 RETAIN_HERE(&connection->io, comm); 1466 if (inet_ntop(remote_address->sa.sa_family, (remote_address->sa.sa_family == AF_INET 1467 ? (void *)&remote_address->sin.sin_addr 1468 : (void *)&remote_address->sin6.sin6_addr), buf, 1469 INET6_ADDRSTRLEN) == NULL) { 1470 ERROR("inet_ntop failed to convert remote address: %s", strerror(errno)); 1471 RELEASE_HERE(&connection->io, comm); 1472 return NULL; 1473 } 1474 s = buf + strlen(buf); 1475 sprintf(s, "%%%hu", ntohs(remote_address->sa.sa_family == AF_INET 1476 ? remote_address->sin.sin_port 1477 : remote_address->sin6.sin6_port)); 1478 connection->name = strdup(buf); 1479 if (!connection->name) { 1480 RELEASE_HERE(&connection->io, comm); 1481 return NULL; 1482 } 1483 connection->io.fd = socket(remote_address->sa.sa_family, 1484 stream ? SOCK_STREAM : SOCK_DGRAM, stream ? IPPROTO_TCP : IPPROTO_UDP); 1485 if (connection->io.fd < 0) { 1486 ERROR("Can't get socket: %s", strerror(errno)); 1487 RELEASE_HERE(&connection->io, comm); 1488 return NULL; 1489 } 1490 connection->address = *remote_address; 1491 if (fcntl(connection->io.fd, F_SETFL, O_NONBLOCK) < 0) { 1492 ERROR("connect_to_host: %s: Can't set O_NONBLOCK: %s", connection->name, strerror(errno)); 1493 RELEASE_HERE(&connection->io, comm); 1494 return NULL; 1495 } 1496 // If a stable address has been requested, request a public address in source address selection. 1497 if (stable && remote_address->sa.sa_family == AF_INET6) { 1498 // Linux doesn't currently follow RFC5014. These values are defined in linux/in6.h, but this can't be 1499 // safely included because it's incompatible with netinet/in.h. So until this is fixed, these values 1500 // are just copied out of the header; when it is fixed, the #if condition will evaluate to false. 1501 #if defined(LINUX) 1502 # if !defined(IPV6_PREFER_SRC_PUBLIC) 1503 # define IPV6_PREFER_SRC_TMP 0x0001 1504 # define IPV6_PREFER_SRC_PUBLIC 0x0002 1505 # define IPV6_PREFER_SRC_PUBTMP_DEFAULT 0x0100 1506 # endif 1507 int value = IPV6_PREFER_SRC_PUBLIC; 1508 if (setsockopt(connection->io.fd, IPPROTO_IPV6, IPV6_ADDR_PREFERENCES, &value, sizeof(value)) < 0) { 1509 ERROR("unable to request stable (public) address: %s", strerror(errno)); 1510 return NULL; 1511 } 1512 #else // Assume BSD 1513 // BSD doesn't follow RFC5014 either (at least xnu). 1514 int value = 0; 1515 if (setsockopt(connection->io.fd, IPPROTO_IPV6, IPV6_PREFER_TEMPADDR, &value, sizeof(value)) < 0) { 1516 ERROR("unable to request stable (public) address."); 1517 return NULL; 1518 } 1519 #endif // LINUX 1520 } 1521 #ifdef NOT_HAVE_SA_LEN 1522 sl = (remote_address->sa.sa_family == AF_INET 1523 ? sizeof remote_address->sin 1524 : sizeof remote_address->sin6); 1525 #else 1526 sl = remote_address->sa.sa_len; 1527 #endif 1528 // Connect to the host 1529 if (connect(connection->io.fd, &connection->address.sa, sl) < 0) { 1530 if (errno != EINPROGRESS && errno != EAGAIN) { 1531 ERROR("Can't connect to %s: %s", connection->name, strerror(errno)); 1532 RELEASE_HERE(&connection->io, comm); 1533 return NULL; 1534 } 1535 } 1536 // At this point if we are doing TCP, we do not yet have a connection, but the connection should be in 1537 // progress, and we should get a write select event when the connection succeeds or fails. 1538 // UDP is connectionless, so the connect() call just sets the default destination for send() on 1539 // the socket. 1540 1541 if (tls) { 1542 #ifndef TLS_EXCLUDED 1543 connection->tls_context = (tls_context_t *)-1; 1544 #else 1545 ERROR("connect_to_host: tls requested when excluded."); 1546 RELEASE_HERE(&connection->io, comm); 1547 return NULL; 1548 #endif 1549 } 1550 1551 connection->connected = connected; 1552 connection->disconnected = disconnected; 1553 connection->datagram_callback = datagram_callback; 1554 connection->context = context; 1555 connection->finalize = finalize; 1556 connection->opportunistic = opportunistic; 1557 if (!stream) { 1558 connection->is_connected = true; 1559 connection->tcp_stream = false; 1560 ioloop_add_reader(&connection->io, udp_read_callback); 1561 } else { 1562 connection->tcp_stream = true; 1563 ioloop_add_writer(&connection->io, connect_callback); 1564 } 1565 1566 return connection; 1567 } 1568 1569 static void 1570 subproc_finalize(subproc_t *subproc) 1571 { 1572 int i; 1573 for (i = 0; i < subproc->argc; i++) { 1574 if (subproc->argv[i] != NULL) { 1575 free(subproc->argv[i]); 1576 subproc->argv[i] = NULL; 1577 } 1578 } 1579 if (subproc->output_fd != NULL) { 1580 ioloop_file_descriptor_release(subproc->output_fd); 1581 } 1582 if (subproc->finalize != NULL) { 1583 subproc->finalize(subproc->context); 1584 } 1585 free(subproc); 1586 } 1587 1588 static void 1589 subproc_output_finalize(void *context) 1590 { 1591 subproc_t *subproc = context; 1592 if (subproc->output_fd) { 1593 subproc->output_fd = NULL; 1594 } 1595 RELEASE_HERE(subproc, subproc); 1596 } 1597 1598 void 1599 ioloop_subproc_release_(subproc_t *subproc, const char *file, int line) 1600 { 1601 RELEASE(subproc, subproc); 1602 } 1603 1604 // Invoke the specified executable with the specified arguments. Call callback when it exits. 1605 // All failures are reported through the callback. 1606 subproc_t * 1607 ioloop_subproc(const char *exepath, char **argv, int argc, subproc_callback_t callback, 1608 io_callback_t output_callback, void *context) 1609 { 1610 subproc_t *subproc; 1611 int i, rv; 1612 posix_spawn_file_actions_t actions; 1613 posix_spawnattr_t attrs; 1614 1615 if (callback == NULL) { 1616 ERROR("ioloop_subproc called with null callback"); 1617 return NULL; 1618 } 1619 1620 if (argc > MAX_SUBPROC_ARGS) { 1621 callback(NULL, 0, "too many subproc args"); 1622 return NULL; 1623 } 1624 1625 subproc = calloc(1, sizeof(*subproc)); 1626 if (subproc == NULL) { 1627 callback(NULL, 0, "out of memory"); 1628 return NULL; 1629 } 1630 RETAIN_HERE(subproc, subproc); 1631 if (output_callback != NULL) { 1632 rv = pipe(subproc->pipe_fds); 1633 if (rv < 0) { 1634 callback(NULL, 0, "unable to create pipe."); 1635 RELEASE_HERE(subproc, subproc); 1636 return NULL; 1637 } 1638 subproc->output_fd = ioloop_file_descriptor_create(subproc->pipe_fds[0], subproc, subproc_output_finalize); 1639 if (subproc->output_fd == NULL) { 1640 // subproc->output_fd holds a reference to subproc. 1641 RETAIN_HERE(subproc, subproc); 1642 callback(NULL, 0, "out of memory."); 1643 close(subproc->pipe_fds[0]); 1644 close(subproc->pipe_fds[1]); 1645 RELEASE_HERE(subproc, subproc); 1646 return NULL; 1647 } 1648 } 1649 1650 subproc->argv[0] = strdup(exepath); 1651 if (subproc->argv[0] == NULL) { 1652 RELEASE_HERE(subproc, subproc); 1653 callback(NULL, 0, "out of memory"); 1654 return NULL; 1655 } 1656 subproc->argc++; 1657 for (i = 0; i < argc; i++) { 1658 subproc->argv[i + 1] = strdup(argv[i]); 1659 if (subproc->argv[i + 1] == NULL) { 1660 RELEASE_HERE(subproc, subproc); 1661 callback(NULL, 0, "out of memory"); 1662 return NULL; 1663 } 1664 subproc->argc++; 1665 } 1666 1667 // Set up for posix_spawn 1668 posix_spawn_file_actions_init(&actions); 1669 if (output_callback != NULL) { 1670 posix_spawn_file_actions_adddup2(&actions, subproc->pipe_fds[1], STDOUT_FILENO); 1671 posix_spawn_file_actions_addclose(&actions, subproc->pipe_fds[0]); 1672 posix_spawn_file_actions_addclose(&actions, subproc->pipe_fds[1]); 1673 } 1674 posix_spawnattr_init(&attrs); 1675 extern char **environ; 1676 rv = posix_spawn(&subproc->pid, exepath, &actions, &attrs, subproc->argv, environ); 1677 posix_spawn_file_actions_destroy(&actions); 1678 posix_spawnattr_destroy(&attrs); 1679 if (rv != 0) { 1680 int err = rv < 0 ? errno : rv; 1681 ERROR("posix_spawn failed for %s: %s", subproc->argv[0], strerror(err)); 1682 callback(subproc, 0, strerror(err)); 1683 RELEASE_HERE(subproc, subproc); 1684 return NULL; 1685 } 1686 subproc->callback = callback; 1687 subproc->context = context; 1688 subproc->next = subprocesses; 1689 subprocesses = subproc; 1690 RETAIN_HERE(subproc, subproc); 1691 1692 // Now that we have a viable subprocess, add the reader callback. 1693 if (output_callback != NULL && subproc->output_fd != NULL) { 1694 close(subproc->pipe_fds[1]); 1695 ioloop_add_reader(subproc->output_fd, output_callback); 1696 } 1697 return subproc; 1698 } 1699 1700 void 1701 ioloop_subproc_run_sync(subproc_t *subproc) 1702 { 1703 int nev; 1704 RETAIN_HERE(subproc, subproc); 1705 do { 1706 nev = ioloop_events(0); 1707 INFO("%d events", nev); 1708 if (subproc->finished) { 1709 RELEASE_HERE(subproc, subproc); 1710 return; 1711 } 1712 } while (nev >= 0); 1713 ERROR("ioloop returned %d.", nev); 1714 } 1715 1716 #ifndef EXCLUDE_DNSSD_TXN_SUPPORT 1717 static void 1718 dnssd_txn_callback(io_t *io, void *context) 1719 { 1720 dnssd_txn_t *txn = (dnssd_txn_t *)context; 1721 // It's only safe to process the I/O if the DNSServiceRef hasn't been deallocated. 1722 if (txn->sdref != NULL) { 1723 int status = DNSServiceProcessResult(txn->sdref); 1724 if (status != kDNSServiceErr_NoError) { 1725 if (txn->failure_callback != NULL) { 1726 txn->failure_callback(txn->context, status); 1727 } else { 1728 INFO("status %d", status); 1729 } 1730 ioloop_dnssd_txn_cancel(txn); 1731 } 1732 } 1733 } 1734 1735 void 1736 dnssd_txn_finalize(dnssd_txn_t *txn) 1737 { 1738 if (txn->sdref != NULL) { 1739 ioloop_dnssd_txn_cancel(txn); 1740 } 1741 if (txn->finalize_callback) { 1742 txn->finalize_callback(txn->context); 1743 } 1744 } 1745 1746 void 1747 dnssd_txn_io_finalize(void *context) 1748 { 1749 dnssd_txn_t *txn = context; 1750 txn->io = NULL; 1751 RELEASE_HERE(txn, dnssd_txn); 1752 } 1753 1754 void 1755 ioloop_dnssd_txn_cancel(dnssd_txn_t *txn) 1756 { 1757 if (txn->sdref != NULL) { 1758 DNSServiceRefDeallocate(txn->sdref); 1759 txn->sdref = NULL; 1760 } else { 1761 INFO("dead transaction."); 1762 } 1763 if (txn->io != NULL) { 1764 txn->io->fd = -1; 1765 RELEASE_HERE(txn->io, file_descriptor); 1766 } 1767 } 1768 1769 void 1770 ioloop_dnssd_txn_retain_(dnssd_txn_t *dnssd_txn, const char *file, int line) 1771 { 1772 (void)file; (void)line; 1773 RETAIN(dnssd_txn, dnssd_txn); 1774 } 1775 1776 void 1777 ioloop_dnssd_txn_release_(dnssd_txn_t *dnssd_txn, const char *file, int line) 1778 { 1779 (void)file; (void)line; 1780 RELEASE(dnssd_txn, dnssd_txn); 1781 } 1782 1783 dnssd_txn_t * 1784 ioloop_dnssd_txn_add_subordinate_(DNSServiceRef ref, void *context, 1785 dnssd_txn_finalize_callback_t callback, dnssd_txn_failure_callback_t failure_callback, 1786 const char *file, int line) 1787 { 1788 dnssd_txn_t *txn = calloc(1, sizeof(*txn)); 1789 if (txn != NULL) { 1790 RETAIN(txn, dnssd_txn); 1791 txn->sdref = ref; 1792 txn->finalize_callback = callback; 1793 txn->failure_callback = failure_callback; 1794 txn->context = context; 1795 } 1796 return txn; 1797 } 1798 1799 dnssd_txn_t * 1800 ioloop_dnssd_txn_add_(DNSServiceRef ref, void *context, 1801 dnssd_txn_finalize_callback_t callback, dnssd_txn_failure_callback_t failure_callback, 1802 const char *file, int line) 1803 { 1804 dnssd_txn_t *txn = ioloop_dnssd_txn_add_subordinate_(ref, context, callback, failure_callback, file, line); 1805 if (txn != NULL) { 1806 txn->io = ioloop_file_descriptor_create(DNSServiceRefSockFD(txn->sdref), txn, dnssd_txn_io_finalize); 1807 if (txn->io == NULL) { 1808 RELEASE_HERE(txn, dnssd_txn); 1809 return NULL; 1810 } 1811 // io holds a reference to txn 1812 RETAIN_HERE(txn, dnssd_txn); 1813 ioloop_add_reader(txn->io, dnssd_txn_callback); 1814 } 1815 return txn; 1816 } 1817 1818 void 1819 ioloop_dnssd_txn_set_aux_pointer(dnssd_txn_t *NONNULL txn, void *aux_pointer) 1820 { 1821 txn->aux_pointer = aux_pointer; 1822 } 1823 1824 void * 1825 ioloop_dnssd_txn_get_aux_pointer(dnssd_txn_t *NONNULL txn) 1826 { 1827 return txn->aux_pointer; 1828 } 1829 1830 void * 1831 ioloop_dnssd_txn_get_context(dnssd_txn_t *NONNULL txn) 1832 { 1833 return txn->context; 1834 } 1835 #endif // EXCLUDE_DNSSD_TXN_SUPPORT 1836 1837 static void 1838 file_descriptor_finalize(void *context) 1839 { 1840 io_t *file_descriptor = context; 1841 if (file_descriptor->finalize) { 1842 file_descriptor->finalize(file_descriptor->context); 1843 } 1844 if (file_descriptor->fd != -1) { 1845 close(file_descriptor->fd); 1846 } 1847 free(file_descriptor); 1848 } 1849 1850 void 1851 ioloop_file_descriptor_retain_(io_t *file_descriptor, const char *file, int line) 1852 { 1853 (void)file; (void)line; 1854 RETAIN(file_descriptor, file_descriptor); 1855 } 1856 1857 void 1858 ioloop_file_descriptor_release_(io_t *file_descriptor, const char *file, int line) 1859 { 1860 (void)file; (void)line; 1861 RELEASE(file_descriptor, file_descriptor); 1862 } 1863 1864 io_t * 1865 ioloop_file_descriptor_create_(int fd, void *context, finalize_callback_t finalize, const char *file, int line) 1866 { 1867 io_t *ret; 1868 ret = calloc(1, sizeof(*ret)); 1869 if (ret) { 1870 ret->fd = fd; 1871 ret->context = context; 1872 ret->finalize = finalize; 1873 ret->io_finalize = file_descriptor_finalize; 1874 RETAIN(ret, file_descriptor); 1875 } 1876 return ret; 1877 } 1878 1879 void 1880 ioloop_run_async(async_callback_t callback, void *context) 1881 { 1882 async_event_t **epp, *event = calloc(1, sizeof(*event)); 1883 if (event == NULL) { 1884 ERROR("no memory for async callback to %p, context %p", callback, context); 1885 } 1886 1887 event->callback = callback; 1888 event->context = context; 1889 1890 epp = &async_events; 1891 while (*epp) { 1892 epp = &(*epp)->next; 1893 } 1894 1895 *epp = event; 1896 } 1897 1898 const struct sockaddr * 1899 connection_get_local_address(message_t *message) 1900 { 1901 if (message == NULL) { 1902 ERROR("message is NULL."); 1903 return NULL; 1904 } 1905 return &message->local.sa; 1906 } 1907 #endif // !defined(IOLOOP_MACOS) 1908 1909 // Local Variables: 1910 // mode: C 1911 // tab-width: 4 1912 // c-file-style: "bsd" 1913 // c-basic-offset: 4 1914 // fill-column: 108 1915 // indent-tabs-mode: nil 1916 // End: 1917