Home | History | Annotate | Line # | Download | only in ServiceRegistration
macos-ioloop.c revision 1.1.1.1
      1 /* macos-ioloop.c
      2  *
      3  * Copyright (c) 2018-2024 Apple Inc. All rights reserved.
      4  *
      5  * Licensed under the Apache License, Version 2.0 (the "License");
      6  * you may not use this file except in compliance with the License.
      7  * You may obtain a copy of the License at
      8  *
      9  *     https://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  *
     17  * Simple event dispatcher for DNS.
     18  */
     19 
     20 #define _GNU_SOURCE
     21 
     22 #include <stdlib.h>
     23 #include <string.h>
     24 #include <stdio.h>
     25 #include <unistd.h>
     26 #include <sys/uio.h>
     27 #include <errno.h>
     28 #include <sys/socket.h>
     29 #include <netinet/in.h>
     30 #include <arpa/inet.h>
     31 #include <sys/wait.h>
     32 #include <fcntl.h>
     33 #include <sys/time.h>
     34 #include <signal.h>
     35 #include <net/if.h>
     36 #include <ifaddrs.h>
     37 #include <dns_sd.h>
     38 
     39 #include <CoreUtils/SystemUtils.h>  // For `IsAppleTV()`.
     40 #include <dispatch/dispatch.h>
     41 
     42 #include "srp.h"
     43 #include "dns-msg.h"
     44 #include "srp-crypto.h"
     45 #include "ioloop.h"
     46 #include "tls-macos.h"
     47 #include "tls-keychain.h"
     48 #include "srp-dnssd.h"
     49 #include "ifpermit.h"
     50 
// Queue used in this file for timer sources and asynchronous callbacks; assigned in ioloop_init().
dispatch_queue_t ioloop_main_queue;
// NOTE(review): not referenced in the visible part of this file; presumably numbers new connections.
static int cur_connection_serial;

// Forward references
static void ioloop_tcp_input_start(comm_t *NONNULL connection);
static void listener_finalize(comm_t *listener);
static bool connection_write_now(comm_t *NONNULL connection);
static bool ioloop_listener_connection_ready(comm_t *connection);

// NOTE(review): by its name, the DSCP class-selector-5 codepoint (0x28 == 40); its use is not visible here.
#define DSCP_CS5 0x28
     61 
     62 int
     63 getipaddr(addr_t *addr, const char *p)
     64 {
     65     if (inet_pton(AF_INET, p, &addr->sin.sin_addr)) {
     66         addr->sa.sa_family = AF_INET;
     67 #ifndef NOT_HAVE_SA_LEN
     68         addr->sa.sa_len = sizeof addr->sin;
     69 #endif
     70         return sizeof addr->sin;
     71     }  else if (inet_pton(AF_INET6, p, &addr->sin6.sin6_addr)) {
     72         addr->sa.sa_family = AF_INET6;
     73 #ifndef NOT_HAVE_SA_LEN
     74         addr->sa.sa_len = sizeof addr->sin6;
     75 #endif
     76         return sizeof addr->sin6;
     77     } else {
     78         return 0;
     79     }
     80 }
     81 
// Return the current time as reported by gettimeofday(), converted to milliseconds.
int64_t
ioloop_timenow(void)
{
    struct timeval current;

    gettimeofday(&current, 0);

    const int64_t seconds_as_ms = (int64_t)current.tv_sec * 1000;
    const int64_t micros_as_ms = (int64_t)current.tv_usec / 1000;
    return seconds_as_ms + micros_as_ms;
}
     91 
     92 static void
     93 wakeup_event(void *context)
     94 {
     95     wakeup_t *wakeup = context;
     96     void *wakeup_context = wakeup->context;
     97     finalize_callback_t wakeup_finalize = wakeup->finalize;
     98     wakeup->context = NULL;
     99     wakeup->finalize = NULL;
    100 
    101     // All ioloop wakeups are one-shot.
    102     ioloop_cancel_wake_event(wakeup);
    103 
    104     // Call the callback, which mustn't be null.
    105     wakeup->wakeup(wakeup_context);
    106 
    107     // We have to call the finalize callback after the event has been delivered, in case we hold the only reference
    108     // on the object.
    109     if (wakeup_context != NULL && wakeup_finalize != NULL) {
    110         wakeup_finalize(wakeup_context);
    111     }
    112 }
    113 
    114 static void
    115 wakeup_finalize(void *context)
    116 {
    117     wakeup_t *wakeup = context;
    118     if (wakeup->ref_count == 0) {
    119         if (wakeup->dispatch_source != NULL) {
    120             dispatch_release(wakeup->dispatch_source);
    121             wakeup->dispatch_source = NULL;
    122         }
    123         void *wakeup_context = wakeup->context;
    124         finalize_callback_t wakeup_finalize = wakeup->finalize;
    125         wakeup->finalize = NULL;
    126         wakeup->context = NULL;
    127         if (wakeup_finalize != NULL && wakeup_context != NULL) {
    128             wakeup_finalize(wakeup_context);
    129         }
    130         free(wakeup);
    131     }
    132 }
    133 
    134 void
    135 ioloop_wakeup_retain_(wakeup_t *wakeup, const char *file, int line)
    136 {
    137     (void)file; (void)line;
    138     RETAIN(wakeup, wakeup);
    139 }
    140 
    141 void
    142 ioloop_wakeup_release_(wakeup_t *wakeup, const char *file, int line)
    143 {
    144     (void)file; (void)line;
    145     RELEASE(wakeup, wakeup);
    146 }
    147 
    148 wakeup_t *
    149 ioloop_wakeup_create_(const char *file, int line)
    150 {
    151     wakeup_t *ret = calloc(1, sizeof(*ret));
    152     if (ret) {
    153         RETAIN(ret, wakeup);
    154     }
    155     return ret;
    156 }
    157 
    158 bool
    159 ioloop_add_wake_event(wakeup_t *wakeup, void *context, wakeup_callback_t callback, wakeup_callback_t finalize,
    160                       int32_t milliseconds)
    161 {
    162     if (callback == NULL) {
    163         ERROR("ioloop_add_wake_event called with null callback");
    164         return false;
    165     }
    166     if (milliseconds < 0) {
    167         ERROR("ioloop_add_wake_event called with negative timeout");
    168         return false;
    169     }
    170     if (wakeup->dispatch_source != NULL) {
    171         ioloop_cancel_wake_event(wakeup);
    172     }
    173     wakeup->wakeup = callback;
    174     wakeup->context = context;
    175     wakeup->finalize = finalize;
    176 
    177     wakeup->dispatch_source = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, ioloop_main_queue);
    178     if (wakeup->dispatch_source == NULL) {
    179         ERROR("dispatch_source_create failed in ioloop_add_wake_event().");
    180         return false;
    181     }
    182     dispatch_source_set_event_handler_f(wakeup->dispatch_source, wakeup_event);
    183     dispatch_set_context(wakeup->dispatch_source, wakeup);
    184 
    185     // libdispatch doesn't allow events that are scheduled to happen right now. But it is actually useful to be
    186     // able to trigger an event to happen immediately, and this is the easiest way to do it from ioloop-we
    187     // can't rely on just scheduling an asynchronous event on an event loop because that's specific to Mac.
    188     if (milliseconds <= 0) {
    189         ERROR("ioloop_add_wake_event: milliseconds = %d", milliseconds);
    190         milliseconds = 10;
    191     }
    192     dispatch_source_set_timer(wakeup->dispatch_source,
    193                               dispatch_time(DISPATCH_TIME_NOW, milliseconds * NSEC_PER_SEC / 1000),
    194                               milliseconds * NSEC_PER_SEC / 1000, NSEC_PER_SEC / 100);
    195     dispatch_resume(wakeup->dispatch_source);
    196 
    197     return true;
    198 }
    199 
    200 void
    201 ioloop_cancel_wake_event(wakeup_t *wakeup)
    202 {
    203     if (wakeup != NULL) {
    204         if (wakeup->dispatch_source != NULL) {
    205             dispatch_source_cancel(wakeup->dispatch_source);
    206             dispatch_release(wakeup->dispatch_source);
    207             wakeup->dispatch_source = NULL;
    208         }
    209         if (wakeup->context != NULL) {
    210             void *wakeup_context = wakeup->context;
    211             finalize_callback_t wakeup_finalize = wakeup->finalize;
    212             wakeup->context = NULL;
    213             wakeup->finalize = NULL;
    214             if (wakeup_finalize != NULL && wakeup_context != NULL) {
    215                 wakeup_finalize(wakeup_context);
    216             }
    217         }
    218     }
    219 }
    220 
    221 bool
    222 ioloop_init(void)
    223 {
    224     ioloop_main_queue = dispatch_get_main_queue();
    225     dispatch_retain(ioloop_main_queue);
    226     return true;
    227 }
    228 
// Run the event loop by handing control to dispatch_main(). The return statement only satisfies the
// function's signature.
int
ioloop(void)
{
    dispatch_main();

    return 0;
}
    235 
    236 #define connection_cancel(comm, conn) connection_cancel_(comm, conn, __FILE__, __LINE__)
    237 static void
    238 connection_cancel_(comm_t *comm, nw_connection_t connection, const char *file, int line)
    239 {
    240     if (connection == NULL) {
    241         INFO("null connection at " PUB_S_SRP ":%d", file, line);
    242     } else {
    243         INFO("%p: " PUB_S_SRP " " PUB_S_SRP ":%d" , connection, comm->canceled ? " (already canceled)" : "", file, line);
    244         if (!comm->canceled) {
    245             nw_connection_cancel(connection);
    246             comm->canceled = true;
    247         }
    248     }
    249 }
    250 
// Finalizer for comm_t objects.
//
// Releases the Network.framework and dispatch objects the comm holds, and drops its reference on the
// listener state. If the reference count is still above zero at that point the function returns without
// freeing anything further; otherwise it also tears down the idle timer, frees the name, hands the context
// to the comm's finalize callback (if one is set) and frees the comm itself.
static void
comm_finalize(comm_t *comm)
{
    ERROR("comm_finalize");
    if (comm->connection != NULL) {
        nw_release(comm->connection);
        nw_connection_finalized++; // Counter declared outside this part of the file.
        comm->connection = NULL;
    }
    if (comm->listener != NULL) {
        nw_release(comm->listener);
        nw_listener_finalized++; // Counter declared outside this part of the file.
        comm->listener = NULL;
    }
    if (comm->parameters) {
        nw_release(comm->parameters);
        comm->parameters = NULL;
    }
    // A write that was queued but never handed to connection_write_now() is still owned by the comm.
    if (comm->pending_write != NULL) {
        dispatch_release(comm->pending_write);
        comm->pending_write = NULL;
    }

    if (comm->listener_state != NULL) {
        RELEASE_HERE(comm->listener_state, listener);
        comm->listener_state = NULL;
    }
#if UDP_LISTENER_USES_CONNECTION_GROUPS
    if (comm->content_context != NULL) {
        nw_release(comm->content_context);
        comm->content_context = NULL;
    }
#endif

    // If there is an nw_connection_t or nw_listener_t outstanding, then we will get an asynchronous callback
    // later on. So we can't actually free the data structure yet, but the good news is that comm_finalize()
    // will be called again later when the last outstanding asynchronous cancel is done, and then all of the
    // stuff that follows this will happen.
#ifndef __clang_analyzer__
    if (comm->ref_count > 0) {
        return;
    }
#endif
    // Stop the idle timer before dropping our reference to it.
    if (comm->idle_timer != NULL) {
        ioloop_cancel_wake_event(comm->idle_timer);
        RELEASE_HERE(comm->idle_timer, wakeup);
    }
    if (comm->name != NULL) {
        free(comm->name);
    }
    // Let the owner of the context clean it up; this is the last use of the comm before it is freed.
    if (comm->finalize != NULL) {
        comm->finalize(comm->context);
    }
    free(comm);
}
    306 
    307 void
    308 ioloop_comm_retain_(comm_t *comm, const char *file, int line)
    309 {
    310     (void)file; (void)line;
    311     RETAIN(comm, comm);
    312 }
    313 
    314 void
    315 ioloop_comm_release_(comm_t *comm, const char *file, int line)
    316 {
    317     (void)file; (void)line;
    318     RELEASE(comm, comm);
    319 }
    320 
// Cancel whatever transport the comm is using and stop its idle timer.
//
// If the comm has an nw_connection_t, that connection is cancelled. What happens otherwise depends on how
// the file was built: with UDP_LISTENER_USES_CONNECTION_GROUPS the connection group (if any) is cancelled;
// without it, a non-TCP comm that has no nw_connection_t has its file descriptor closed and its cancel
// callback, if set, is invoked asynchronously on ioloop_main_queue.
void
ioloop_comm_cancel(comm_t *connection)
{
    if (connection->connection != NULL) {
        INFO("%p %p", connection, connection->connection);
        connection_cancel(connection, connection->connection);
#if UDP_LISTENER_USES_CONNECTION_GROUPS
    } else if (connection->connection_group != NULL) {
        INFO("%p %p", connection, connection->connection_group);
        nw_connection_group_cancel(connection->connection_group);
#else
    }
    if (!connection->tcp_stream && connection->connection == NULL) {
        int fd = connection->io.fd;
        if (fd != -1) {
            ioloop_close(&connection->io);
            if (connection->cancel != NULL) {
                // Hold a reference across the asynchronous hop so the comm is still valid when the block
                // runs; the block drops it again once the callback has been delivered.
                RETAIN_HERE(connection, listener);
                dispatch_async(ioloop_main_queue, ^{
                        // Checked again because the callback may have been cleared before the block runs.
                        if (connection->cancel != NULL) {
                            connection->cancel(connection, connection->context);
                        }
                        RELEASE_HERE(connection, listener);
                    });
            }
        }
#endif // UDP_LISTENER_USES_CONNECTION_GROUPS
    }
    if (connection->idle_timer != NULL) {
        ioloop_cancel_wake_event(connection->idle_timer);
    }
}
    353 
    354 void
    355 ioloop_comm_context_set(comm_t *comm, void *context, finalize_callback_t callback)
    356 {
    357     if (comm->context != NULL && comm->finalize != NULL) {
    358         comm->finalize(comm->context);
    359     }
    360     comm->finalize = callback;
    361     comm->context = context;
    362 }
    363 
    364 void
    365 ioloop_comm_connect_callback_set(comm_t *comm, connect_callback_t callback)
    366 {
    367     comm->connected = callback;
    368 }
    369 
    370 void
    371 ioloop_comm_disconnect_callback_set(comm_t *comm, disconnect_callback_t callback)
    372 {
    373     comm->disconnected = callback;
    374 }
    375 
    376 static void
    377 message_finalize(message_t *message)
    378 {
    379     free(message);
    380 }
    381 
    382 void
    383 ioloop_message_retain_(message_t *message, const char *file, int line)
    384 {
    385     (void)file; (void)line;
    386     RETAIN(message, message);
    387 }
    388 
    389 void
    390 ioloop_message_release_(message_t *message, const char *file, int line)
    391 {
    392     (void)file; (void)line;
    393     RELEASE(message, message);
    394 }
    395 
    396 static bool
    397 ioloop_send_message_inner(comm_t *connection, message_t *responding_to,
    398                           struct iovec *iov, int iov_len, bool final, bool send_length)
    399 {
    400     dispatch_data_t data = NULL, new_data, combined;
    401     int i;
    402     uint16_t len = 0;
    403 
    404 #ifdef SRP_TEST_SERVER
    405     if (connection->test_send_intercept != NULL) {
    406         return connection->test_send_intercept(connection, responding_to, iov, iov_len, final, send_length);
    407     }
    408 #endif
    409 
    410     // Not needed on OSX because UDP conversations are treated as "connections."
    411 #if UDP_LISTENER_USES_CONNECTION_GROUPS
    412     (void)responding_to;
    413 #else
    414     if (!connection->tcp_stream && connection->connection == NULL) {
    415         if (connection->io.fd != -1) {
    416             return ioloop_udp_send_message(connection, &responding_to->local, &responding_to->src, responding_to->ifindex, iov, iov_len);
    417         }
    418         return false;
    419     }
    420 #endif
    421 
    422     if (connection->connection == NULL
    423 #if UDP_LISTENER_USES_CONNECTION_GROUPS
    424         && connection->content_context == NULL
    425 #endif
    426         ) {
    427         ERROR("no connection");
    428         return false;
    429     }
    430 
    431     // Create a dispatch_data_t object that contains the data in the iov.
    432     for (i = 0; i < iov_len; i++) {
    433         new_data = dispatch_data_create(iov[i].iov_base, iov[i].iov_len,
    434                                         ioloop_main_queue, DISPATCH_DATA_DESTRUCTOR_DEFAULT);
    435         len += iov[i].iov_len;
    436         if (data != NULL) {
    437             if (new_data != NULL) {
    438                 // Subsequent times through
    439                 combined = dispatch_data_create_concat(data, new_data);
    440                 dispatch_release(data);
    441                 dispatch_release(new_data);
    442                 data = combined;
    443             } else {
    444                 // Fail
    445                 dispatch_release(data);
    446                 data = NULL;
    447             }
    448         } else {
    449             // First time through
    450             data = new_data;
    451         }
    452         if (data == NULL) {
    453             ERROR("ioloop_send_message: no memory.");
    454             return false;
    455         }
    456     }
    457 
    458     if (len == 0) {
    459         if (data) {
    460             dispatch_release(data);
    461         }
    462         ERROR("zero length");
    463         return false;
    464     }
    465 
    466     // TCP requires a length as well as the payload.
    467     if (send_length && connection->tcp_stream) {
    468         len = htons(len);
    469         new_data = dispatch_data_create(&len, sizeof (len), ioloop_main_queue, DISPATCH_DATA_DESTRUCTOR_DEFAULT);
    470         if (new_data == NULL) {
    471             if (data != NULL) {
    472                 dispatch_release(data);
    473             }
    474             ERROR("no memory for new_data");
    475             return false;
    476         }
    477         // Length is at beginning.
    478         combined = dispatch_data_create_concat(new_data, data);
    479         dispatch_release(data);
    480         dispatch_release(new_data);
    481         if (combined == NULL) {
    482             ERROR("no memory for combined");
    483             return false;
    484         }
    485         data = combined;
    486     }
    487 
    488     if (connection->pending_write != NULL) {
    489         ERROR("Dropping pending write on " PRI_S_SRP, connection->name ? connection->name : "<null>");
    490     }
    491     connection->pending_write = data;
    492     connection->final_data = final;
    493     if (connection->connection_ready) {
    494         return connection_write_now(connection);
    495     }
    496     return true;
    497 }
    498 
    499 bool
    500 ioloop_send_message(comm_t *connection, message_t *responding_to, struct iovec *iov, int iov_len)
    501 {
    502     return ioloop_send_message_inner(connection, responding_to, iov, iov_len, false, true);
    503 }
    504 
    505 bool
    506 ioloop_send_final_message(comm_t *connection, message_t *responding_to, struct iovec *iov, int iov_len)
    507 {
    508     return ioloop_send_message_inner(connection, responding_to, iov, iov_len, true, true);
    509 }
    510 
    511 bool
    512 ioloop_send_data(comm_t *connection, message_t *responding_to, struct iovec *iov, int iov_len)
    513 {
    514     return ioloop_send_message_inner(connection, responding_to, iov, iov_len, false, false);
    515 }
    516 
    517 bool
    518 ioloop_send_final_data(comm_t *connection, message_t *responding_to, struct iovec *iov, int iov_len)
    519 {
    520     return ioloop_send_message_inner(connection, responding_to, iov, iov_len, true, false);
    521 }
    522 
#if UDP_LISTENER_USES_CONNECTION_GROUPS
// A UDP content context is only good for a single reply, so once that reply has been sent the comm's
// disconnected callback is delivered (with an error value of zero). context is the comm_t; the reference
// that was taken on it when this call was scheduled is dropped here.
static void
ioloop_disconnect_content_context(void *context)
{
    comm_t *const comm = context;
    const disconnect_callback_t notify = comm->disconnected;

    if (notify != NULL) {
        notify(comm, comm->context, 0);
    }
    RELEASE_HERE(comm, comm);
}
#endif // UDP_LISTENER_USES_CONNECTION_GROUPS
    537 
// Hand the comm's pending write to the network layer and give up the comm's reference to it.
//
// With UDP_LISTENER_USES_CONNECTION_GROUPS, a comm that has a content context sends the data as a reply on
// the listener's connection group and, if a disconnected callback is set, schedules it to be delivered
// asynchronously. In every other case the data is sent on the comm's nw_connection_t. Always returns true;
// a failed send is reported later, in the completion block.
static bool
connection_write_now(comm_t *connection)
{
    // The empty first branch lets the conditionally compiled branch below be written as an "else if".
    if (false) {
#if UDP_LISTENER_USES_CONNECTION_GROUPS
    } else if (connection->content_context != NULL) {
        nw_connection_group_reply(connection->listener_state->connection_group, connection->content_context,
                                  NW_CONNECTION_DEFAULT_MESSAGE_CONTEXT, connection->pending_write);
        if (connection->disconnected != NULL) {
            // This reference is dropped by ioloop_disconnect_content_context().
            RETAIN_HERE(connection, comm);
            ioloop_run_async(ioloop_disconnect_content_context, connection);
        }
#endif
    } else {
        // Retain the connection once for each write that's pending, so that it's never finalized while
        // there's a write in progress.
        connection->writes_pending++;
        RETAIN_HERE(connection, comm);
        nw_connection_send(connection->connection, connection->pending_write,
                           (connection->final_data
                            ? NW_CONNECTION_FINAL_MESSAGE_CONTEXT
                            : NW_CONNECTION_DEFAULT_MESSAGE_CONTEXT), true,
                           ^(nw_error_t  _Nullable error) {
                               // A failed write cancels the connection.
                               if (error != NULL) {
                                   ERROR("ioloop_send_message: write failed: " PUB_S_SRP,
                                         strerror(nw_error_get_error_code(error)));
                                   connection_cancel(connection, connection->connection);
                               }
                               if (connection->writes_pending > 0) {
                                   connection->writes_pending--;
                               } else {
                                   ERROR("ioloop_send_message: write callback reached with no writes marked pending.");
                               }
                               // Balances the retain taken just before nw_connection_send().
                               RELEASE_HERE(connection, comm);
                           });
    }
    // nw_connection_send should retain this, so let go of our reference to it.
    dispatch_release(connection->pending_write);
    connection->pending_write = NULL;
    return true;
}
    579 
    580 static bool
    581 datagram_read(comm_t *connection, size_t length, dispatch_data_t content, nw_error_t error)
    582 {
    583     message_t *message = NULL;
    584     bool ret = true, *retp = &ret;
    585 
    586     if (error != NULL) {
    587         ERROR(PUB_S_SRP, strerror(nw_error_get_error_code(error)));
    588         ret = false;
    589         goto out;
    590     }
    591     if (length > UINT16_MAX) {
    592         ERROR("oversized datagram length %zd", length);
    593         ret = false;
    594         goto out;
    595     }
    596     message = ioloop_message_create(length);
    597     if (message == NULL) {
    598         ERROR("unable to allocate message.");
    599         ret = false;
    600         goto out;
    601     }
    602     message->length = (uint16_t)length;
    603     dispatch_data_apply(content,
    604                         ^bool (dispatch_data_t __unused region, size_t offset, const void *buffer, size_t size) {
    605             if (message->length < offset + size) {
    606                 ERROR("data region %zd:%zd is out of range for message length %d",
    607                       offset, size, message->length);
    608                 *retp = false;
    609                 return false;
    610             }
    611             memcpy(((uint8_t *)&message->wire) + offset, buffer, size);
    612             return true;
    613         });
    614     if (ret == true) {
    615         // Set the local address
    616         message->local = connection->local;
    617 
    618 #ifdef HEXDUMP_INCOMING_DATAGRAMS
    619         uint16_t length = message->length > 8192 ? 8192 : message->length; // Don't dump really big messages
    620         for (uint16_t i = 0; i < length; i += 32) {
    621             char obuf[256];
    622             char *obp = obuf;
    623             int left = sizeof(obp) - 1;
    624             uint16_t max = message->length - i;
    625             if (max > 32) {
    626                 max = 32;
    627             }
    628             for (uint16_t j = 0; j < max && left > 0; j += 8) {
    629                 uint16_t submax = max - j;
    630                 if (submax > 8) {
    631                     submax = 8;
    632                 }
    633                 for (uint16_t k = 0; k < submax; k++) {
    634                     snprintf(obp, left, "%02x", ((uint8_t *)&message->wire)[i + j + k]);
    635                     obp += 2;
    636                     *obp++ = ' ';
    637                     left -= 3;
    638                 }
    639                 *obp++ = ' ';
    640                 left--;
    641             }
    642             *obp = 0;
    643             INFO("%03d " PUB_S_SRP, i, obuf);
    644         }
    645 #endif
    646         // Process the message.
    647         if (connection->listener_state != NULL) {
    648             connection->listener_state->datagram_callback(connection, message, connection->listener_state->context);
    649         } else {
    650             connection->datagram_callback(connection, message, connection->context);
    651         }
    652     }
    653 
    654     out:
    655     if (message != NULL) {
    656         ioloop_message_release(message);
    657     }
    658     if (!ret && connection->connection != NULL) {
    659         connection_cancel(connection, connection->connection);
    660     }
    661     return ret;
    662 }
    663 
    664 static void
    665 connection_error_to_string(nw_error_t error, char *errbuf, size_t errbuf_size)
    666 {
    667     CFErrorRef cfe = NULL;
    668     CFStringRef errString = NULL;
    669     errbuf[0] = 0;
    670     if (error != NULL) {
    671         cfe = nw_error_copy_cf_error(error);
    672         if (cfe != NULL) {
    673             errString = CFErrorCopyDescription(cfe);
    674             if (errString != NULL) {
    675                 CFStringGetCString(errString, errbuf, errbuf_size, kCFStringEncodingUTF8);
    676                 CFRelease(errString);
    677             }
    678             CFRelease(cfe);
    679         }
    680     }
    681     if (errbuf[0] == 0) {
    682         memcpy(errbuf, "<NULL>", 7);
    683     }
    684 }
    685 
    686 static bool
    687 check_fail(comm_t *connection, size_t length, dispatch_data_t content, nw_error_t error, const char *source)
    688 {
    689     bool fail = false;
    690     INFO(PRI_S_SRP ": length %zd, content %p, content_length %ld, error %p, source %s",
    691          connection->name, length, content, content == NULL ? -1 : (long)dispatch_data_get_size(content), error, source);
    692     if (error != NULL) {
    693         fail = true;
    694     } else if (connection->connection == NULL) {
    695         fail = true;
    696     } else if (content == NULL) {
    697         ERROR("no content returned in " PUB_S_SRP ": connection must have dropped unexpectedly for " PRI_S_SRP,
    698               source, connection->name);
    699         fail = true;
    700     } else if (dispatch_data_get_size(content) != length) {
    701         ERROR("short content returned in " PUB_S_SRP ": %zd != %zd: connection must have dropped unexpectedly for " PRI_S_SRP,
    702               source, length, dispatch_data_get_size(content), connection->name);
    703         fail = true;
    704     }
    705     if (fail) {
    706         if (connection->connection != NULL) {
    707             connection_cancel(connection, connection->connection);
    708         }
    709     }
    710     return fail;
    711 }
    712 
    713 static void
    714 tcp_read(comm_t *connection, size_t length, dispatch_data_t content, nw_error_t error)
    715 {
    716     if (check_fail(connection, length, content, error, "tcp_read")) {
    717         return;
    718     }
    719     if (datagram_read(connection, length, content, error)) {
    720         // Wait for the next frame
    721         ioloop_tcp_input_start(connection);
    722     }
    723 }
    724 
    725 static void
    726 tcp_read_length(comm_t *connection, dispatch_data_t content, nw_error_t error)
    727 {
    728     size_t length;
    729     uint32_t bytes_to_read;
    730     const uint8_t *lenbuf;
    731     dispatch_data_t map;
    732 
    733     if (check_fail(connection, 2, content, error, "tcp_read_length")) {
    734         return;
    735     }
    736 
    737     map = dispatch_data_create_map(content, (const void **)&lenbuf, &length);
    738     if (map == NULL) {
    739         ERROR("tcp_read_length: map create failed");
    740         connection_cancel(connection, connection->connection);
    741         return;
    742     }
    743     dispatch_release(map);
    744     bytes_to_read = ((unsigned)(lenbuf[0]) << 8) | ((unsigned)lenbuf[1]);
    745     RETAIN_HERE(connection, comm);
    746     nw_connection_receive(connection->connection, bytes_to_read, bytes_to_read,
    747                           ^(dispatch_data_t new_content, nw_content_context_t __unused new_context,
    748                             bool __unused is_complete, nw_error_t new_error) {
    749                               if (new_error) {
    750                                   char errbuf[512];
    751                                   connection_error_to_string(new_error, errbuf, sizeof(errbuf));
    752                                   INFO("%p: " PUB_S_SRP, connection, errbuf);
    753                                   goto out;
    754                               }
    755                               tcp_read(connection, bytes_to_read, new_content, new_error);
    756                           out:
    757                               RELEASE_HERE(connection, comm);
    758                           });
    759 }
    760 
    761 static bool
    762 ioloop_connection_input_badness_check(comm_t *connection, dispatch_data_t content, bool is_complete, nw_error_t error)
    763 {
    764     if (error) {
    765         char errbuf[512];
    766         connection_error_to_string(error, errbuf, sizeof(errbuf));
    767         INFO("%p: " PUB_S_SRP, connection, errbuf);
    768         return true;
    769     }
    770 
    771     // For TCP connections, is_complete means the other end closed the connection.
    772     if (connection->tcp_stream && is_complete) {
    773         INFO("remote end closed connection.");
    774         connection_cancel(connection, connection->connection);
    775         return true;
    776     }
    777 
    778     if (content == NULL) {
    779         INFO("remote end closed connection.");
    780         connection_cancel(connection, connection->connection);
    781         return true;
    782     }
    783     return false;
    784 }
    785 
    786 static void
    787 ioloop_tcp_input_start(comm_t *connection)
    788 {
    789     if (connection->connection == NULL) {
    790         return;
    791     }
    792 
    793     RETAIN_HERE(connection, comm); // nw_connection_receive callback retains connection
    794     nw_connection_receive(connection->connection, 2, 2,
    795                           ^(dispatch_data_t content, nw_content_context_t __unused context,
    796                             bool is_complete, nw_error_t error) {
    797                               if (!ioloop_connection_input_badness_check(connection, content, is_complete, error)) {
    798                                   tcp_read_length(connection, content, error);
    799                               }
    800                               RELEASE_HERE(connection, comm);
    801                           });
    802 }
    803 
    804 static void
    805 ioloop_udp_input_start(comm_t *connection)
    806 {
    807     RETAIN_HERE(connection, comm); // nw_connection_receive callback retains connection
    808     nw_connection_receive_message(connection->connection,
    809                                   ^(dispatch_data_t content, nw_content_context_t __unused context,
    810                                     bool __unused is_complete, nw_error_t error) {
    811                                       if (!ioloop_connection_input_badness_check(connection, content, is_complete, error)) {
    812                                           if (datagram_read(connection, dispatch_data_get_size(content), content, error)) {
    813                                               ioloop_udp_input_start(connection);
    814                                           }
    815                                       }
    816                                       RELEASE_HERE(connection, comm);
    817                                   });
    818 }
    819 
    820 static void
    821 ioloop_connection_state_changed(comm_t *connection, nw_connection_state_t state, nw_error_t error)
    822 {
    823     char errbuf[512];
    824     connection_error_to_string(error, errbuf, sizeof(errbuf));
    825 
    826     if (state == nw_connection_state_ready) {
    827         if (connection->server) {
    828             if (!ioloop_listener_connection_ready(connection)) {
    829                 ioloop_comm_cancel(connection);
    830                 return;
    831             }
    832         }
    833         INFO(PRI_S_SRP " (%p %p) state is ready; error = " PUB_S_SRP,
    834              connection->name != NULL ? connection->name : "<no name>", connection, connection->connection, errbuf);
    835         // Set up a reader.
    836         if (connection->tcp_stream) {
    837             ioloop_tcp_input_start(connection);
    838         } else {
    839             ioloop_udp_input_start(connection);
    840         }
    841         connection->connection_ready = true;
    842         // If there's a write pending, send it now.
    843         if (connection->pending_write) {
    844             connection_write_now(connection);
    845         }
    846         if (connection->connected != NULL) {
    847             connection->connected(connection, connection->context);
    848         }
    849     } else if (state == nw_connection_state_failed || state == nw_connection_state_waiting) {
    850         // Waiting is equivalent to failed because we are not giving libnetcore enough information to
    851         // actually succeed when there is a problem connecting (e.g. "EHOSTUNREACH").
    852         INFO(PRI_S_SRP " (%p %p) state is " PUB_S_SRP "; error = " PUB_S_SRP,
    853              connection->name != NULL ? connection->name : "<no name>", connection, connection->connection,
    854              state == nw_connection_state_failed ? "failed" : "waiting", errbuf);
    855         connection_cancel(connection, connection->connection);
    856     } else if (state == nw_connection_state_cancelled) {
    857         INFO(PRI_S_SRP " (%p %p) state is canceled; error = " PUB_S_SRP,
    858              connection->name != NULL ? connection->name : "<no name>", connection, connection->connection, errbuf);
    859         if (connection->disconnected != NULL) {
    860             connection->disconnected(connection, connection->context, 0);
    861         }
    862         // This releases the final reference to the connection object, which was held by the nw_connection_t.
    863         RELEASE_HERE(connection, comm);
    864     } else {
    865         if (error != NULL) {
    866             // We can get here if e.g. the TLS handshake fails.
    867             connection_cancel(connection, connection->connection);
    868         }
    869         INFO(PRI_S_SRP " (%p %p) state is %d; error = " PUB_S_SRP,
    870              connection->name != NULL ? connection->name : "<no name>", connection, connection->connection, state, errbuf);
    871     }
    872 }
    873 
    874 static void
    875 ioloop_connection_get_address_from_endpoint(addr_t *addr, nw_endpoint_t endpoint)
    876 {
    877     nw_endpoint_type_t endpoint_type = nw_endpoint_get_type(endpoint);
    878     if (endpoint_type == nw_endpoint_type_address) {
    879         char *address_string = nw_endpoint_copy_address_string(endpoint);
    880         if (address_string == NULL) {
    881             ERROR("unable to get description of new connection.");
    882         } else {
    883             getipaddr(addr, address_string);
    884             if (addr->sa.sa_family == AF_INET6) {
    885                 SEGMENTED_IPv6_ADDR_GEN_SRP(&addr->sin6.sin6_addr, rdata_buf);
    886                 INFO("parsed connection local IPv6 address is: " PRI_SEGMENTED_IPv6_ADDR_SRP,
    887                      SEGMENTED_IPv6_ADDR_PARAM_SRP(&addr->sin6.sin6_addr, rdata_buf));
    888             } else {
    889                 IPv4_ADDR_GEN_SRP(&addr->sin.sin_addr, rdata_buf);
    890                 INFO("parsed connection local IPv4 address is: " PRI_IPv4_ADDR_SRP,
    891                      IPv4_ADDR_PARAM_SRP(&addr->sin.sin_addr, rdata_buf));
    892             }
    893         }
    894         free(address_string);
    895     }
    896 }
    897 
    898 static void
    899 ioloop_connection_set_name_from_endpoint(comm_t *connection, nw_endpoint_t endpoint)
    900 {
    901     nw_endpoint_type_t endpoint_type = nw_endpoint_get_type(endpoint);
    902     if (endpoint_type == nw_endpoint_type_address) {
    903         char *port_string = nw_endpoint_copy_port_string(endpoint);
    904         char *address_string = nw_endpoint_copy_address_string(endpoint);
    905         if (port_string == NULL || address_string == NULL) {
    906             ERROR("Unable to get description of new connection.");
    907         } else {
    908             const char *listener_name = connection->name == NULL ? "bogus" : connection->name;
    909             char *free_name = connection->name;
    910             connection->name = NULL;
    911             asprintf(&connection->name, "%s connection from %s/%s", listener_name, address_string, port_string);
    912             if (free_name != NULL) {
    913                 free(free_name);
    914                 free_name = NULL;
    915                 listener_name = NULL;
    916             }
    917             getipaddr(&connection->address, address_string);
    918             if (connection->address.sa.sa_family == AF_INET6) {
    919                 SEGMENTED_IPv6_ADDR_GEN_SRP(&connection->address.sin6.sin6_addr, rdata_buf);
    920                 INFO("parsed connection remote IPv6 address is: " PRI_SEGMENTED_IPv6_ADDR_SRP,
    921                      SEGMENTED_IPv6_ADDR_PARAM_SRP(&connection->address.sin6.sin6_addr, rdata_buf));
    922             } else {
    923                 IPv4_ADDR_GEN_SRP(&connection->address.sin.sin_addr, rdata_buf);
    924                 INFO("parsed connection remote IPv4 address is: " PRI_IPv4_ADDR_SRP,
    925                      IPv4_ADDR_PARAM_SRP(&connection->address.sin.sin_addr, rdata_buf));
    926             }
    927         }
    928         free(port_string);
    929         free(address_string);
    930     } else {
    931         if (connection->name == NULL) {
    932             connection->name = nw_connection_copy_description(connection->connection);
    933         }
    934         ERROR("incoming connection " PRI_S_SRP " is of unexpected type %d", connection->name, endpoint_type);
    935     }
    936 }
    937 
    938 #if UDP_LISTENER_USES_CONNECTION_GROUPS
    939 static void
    940 ioloop_udp_receive(comm_t *listener, dispatch_data_t content, nw_content_context_t context, bool UNUSED is_complete)
    941 {
    942     bool proceed = true;
    943 
    944     if (content != NULL) {
    945         comm_t *response_state = calloc(1, sizeof (*response_state));
    946         if (response_state == NULL) {
    947             ERROR("%p: " PRI_S_SRP ": no memory for response state.", listener, listener->name);
    948             return;
    949         }
    950         response_state->serial = ++cur_connection_serial;
    951         RETAIN_HERE(response_state, comm);
    952         response_state->listener_state = listener;
    953         RETAIN_HERE(response_state->listener_state, listener);
    954         response_state->datagram_callback = listener->datagram_callback;
    955         response_state->content_context = context;
    956         nw_retain(response_state->content_context);
    957         response_state->connection_ready = true;
    958         const char *identifier = nw_content_context_get_identifier(context);
    959         response_state->name = strdup(identifier);
    960         proceed = datagram_read(response_state, dispatch_data_get_size(content), content, NULL);
    961         RELEASE_HERE(response_state, comm);
    962     }
    963 }
    964 #else
    965 #endif
    966 
    967 
    968 static bool
    969 ioloop_listener_connection_ready(comm_t *connection)
    970 {
    971 
    972     nw_endpoint_t endpoint = nw_connection_copy_endpoint(connection->connection);
    973     if (endpoint != NULL) {
    974         ioloop_connection_set_name_from_endpoint(connection, endpoint);
    975         nw_release(endpoint);
    976     }
    977     if (connection->name != NULL) {
    978         INFO("Received connection from " PRI_S_SRP, connection->name);
    979     } else {
    980         ERROR("Unable to get description of new connection.");
    981         connection->name = strdup("unidentified");
    982     }
    983 
    984     // Best effort
    985     nw_endpoint_t local_endpoint = nw_connection_copy_connected_local_endpoint(connection->connection);
    986     if (local_endpoint != NULL) {
    987         ioloop_connection_get_address_from_endpoint(&connection->local, endpoint);
    988         nw_release(local_endpoint);
    989     }
    990 
    991     if (connection->connected != NULL) {
    992         connection->connected(connection, connection->context);
    993     }
    994     return true;
    995 }
    996 
    997 static void
    998 ioloop_listener_connection_callback(comm_t *listener, nw_connection_t new_connection)
    999 {
   1000     nw_connection_set_queue(new_connection, ioloop_main_queue);
   1001     nw_connection_start(new_connection);
   1002 
   1003     comm_t *connection = calloc(1, sizeof *connection);
   1004     if (connection == NULL) {
   1005         ERROR("Unable to receive connection: no memory.");
   1006         nw_connection_cancel(new_connection);
   1007         return;
   1008     }
   1009     connection->serial = ++cur_connection_serial;
   1010 
   1011     connection->connection = new_connection;
   1012     nw_retain(connection->connection);
   1013     nw_connection_created++;
   1014 
   1015     connection->name = strdup(listener->name);
   1016     connection->datagram_callback = listener->datagram_callback;
   1017     connection->tcp_stream = listener->tcp_stream;
   1018     connection->server = true;
   1019     connection->context = listener->context;
   1020     connection->connected = listener->connected;
   1021     RETAIN_HERE(connection, comm); // The connection state changed handler has a reference to the connection.
   1022     nw_connection_set_state_changed_handler(connection->connection,
   1023                                             ^(nw_connection_state_t state, nw_error_t error)
   1024                                             { ioloop_connection_state_changed(connection, state, error); });
   1025     INFO("started " PRI_S_SRP, connection->name);
   1026 }
   1027 
   1028 static void
   1029 listener_finalize(comm_t *listener)
   1030 {
   1031     if (listener->listener != NULL) {
   1032         nw_release(listener->listener);
   1033         nw_listener_finalized++;
   1034         listener->listener = NULL;
   1035     }
   1036 #if UDP_LISTENER_USES_CONNECTION_GROUPS
   1037     if (listener->connection_group) {
   1038         nw_release(listener->connection_group);
   1039         listener->connection_group = NULL;
   1040     }
   1041 #endif
   1042     if (listener->name != NULL) {
   1043         free(listener->name);
   1044     }
   1045     if (listener->parameters) {
   1046         nw_release(listener->parameters);
   1047     }
   1048     if (listener->avoid_ports != NULL) {
   1049         free(listener->avoid_ports);
   1050     }
   1051     if (listener->finalize) {
   1052         listener->finalize(listener->context);
   1053     }
   1054     free(listener);
   1055 }
   1056 
   1057 void
   1058 ioloop_listener_retain_(comm_t *listener, const char *file, int line)
   1059 {
   1060     RETAIN(listener, listener);
   1061 }
   1062 
   1063 void
   1064 ioloop_listener_release_(comm_t *listener, const char *file, int line)
   1065 {
   1066     RELEASE(listener, listener);
   1067 }
   1068 
   1069 static void ioloop_listener_context_release(void *context)
   1070 {
   1071     comm_t *listener = context;
   1072     RELEASE_HERE(listener, listener);
   1073 }
   1074 
   1075 void
   1076 ioloop_listener_cancel(comm_t *connection)
   1077 {
   1078     // Only need to do it once.
   1079     if (connection->canceled) {
   1080         FAULT("cancel on canceled connection " PRI_S_SRP, connection->name);
   1081         return;
   1082     }
   1083     connection->canceled = true;
   1084     if (connection->listener != NULL) {
   1085         nw_listener_cancel(connection->listener);
   1086         // connection->listener will be released in ioloop_listener_state_changed_handler: nw_listener_state_cancelled.
   1087     }
   1088 #if UDP_LISTENER_USES_CONNECTION_GROUPS
   1089     if (connection->connection_group != NULL) {
   1090         INFO("%p %p", connection, connection->connection_group);
   1091         nw_connection_group_cancel(connection->connection_group);
   1092     }
   1093 #else
   1094     if (!connection->tcp_stream && connection->connection == NULL) {
   1095         int fd = connection->io.fd;
   1096         if (fd != -1) {
   1097             ioloop_close(&connection->io);
   1098             if (connection->cancel != NULL) {
   1099                 RETAIN_HERE(connection, listener);
   1100                 dispatch_async(ioloop_main_queue, ^{
   1101                         if (connection->cancel != NULL) {
   1102                             connection->cancel(connection, connection->context);
   1103                         }
   1104                         RELEASE_HERE(connection, listener);
   1105                     });
   1106             }
   1107         }
   1108     }
   1109 #endif
   1110 }
   1111 
   1112 #if UDP_LISTENER_USES_CONNECTION_GROUPS
   1113 static bool ioloop_udp_listener_setup(comm_t *listener);
   1114 
   1115 static void
   1116 ioloop_udp_listener_state_changed_handler(comm_t *listener, nw_connection_group_state_t state, nw_error_t error)
   1117 {
   1118     int i;
   1119 
   1120 #ifdef DEBUG_VERBOSE
   1121     if (listener->connection_group == NULL) {
   1122         if (state == nw_listener_state_cancelled) {
   1123             INFO("nw_connection_group gets released before the final nw_connection_group_state_cancelled event - name: " PRI_S_SRP,
   1124                  listener->name);
   1125         } else {
   1126             ERROR("nw_connection_group gets released before the connection_group is canceled - name: " PRI_S_SRP ", state: %d",
   1127                   listener->name, state);
   1128         }
   1129     }
   1130 #endif // DEBUG_VERBOSE
   1131 
   1132     // Should never happen.
   1133     if (listener->connection_group == NULL && state != nw_connection_group_state_cancelled) {
   1134         return;
   1135     }
   1136 
   1137     if (error != NULL) {
   1138         char errbuf[512];
   1139         connection_error_to_string(error, errbuf, sizeof(errbuf));
   1140         INFO("state changed: " PUB_S_SRP, errbuf);
   1141         if (listener->connection_group != NULL) {
   1142             nw_connection_group_cancel(listener->connection_group);
   1143         }
   1144     } else {
   1145         if (state == nw_connection_group_state_waiting) {
   1146             INFO("waiting");
   1147             return;
   1148         } else if (state == nw_connection_group_state_failed) {
   1149             INFO("failed");
   1150             nw_connection_group_cancel(listener->connection_group);
   1151         } else if (state == nw_connection_group_state_ready) {
   1152             // It's possible that we might schedule the ready event but then before we return to the run loop
   1153             // the listener gets canceled, in which case we don't want to deliver the ready event.
   1154             if (listener->canceled) {
   1155                 INFO("ready but canceled");
   1156                 return;
   1157             }
   1158             INFO("ready");
   1159             if (listener->avoiding) {
   1160                 listener->listen_port = nw_connection_group_get_port(listener->connection_group);
   1161                 if (listener->avoid_ports != NULL) {
   1162                     for (i = 0; i < listener->num_avoid_ports; i++) {
   1163                         if (listener->avoid_ports[i] == listener->listen_port) {
   1164                             INFO("Got port %d, which we are avoiding.",
   1165                                  listener->listen_port);
   1166                             listener->avoiding = true;
   1167                             listener->listen_port = 0;
   1168                             nw_connection_group_cancel(listener->connection_group);
   1169                             return;
   1170                         }
   1171                     }
   1172                 }
   1173                 INFO("Got port %d.", listener->listen_port);
   1174                 listener->avoiding = false;
   1175                 if (listener->ready) {
   1176                     listener->ready(listener->context, listener->listen_port);
   1177                 }
   1178             }
   1179         } else if (state == nw_connection_group_state_cancelled) {
   1180             INFO("cancelled");
   1181             nw_release(listener->connection_group);
   1182             nw_listener_finalized++;
   1183             listener->connection_group = NULL;
   1184             if (listener->avoiding) {
   1185                 if (!ioloop_udp_listener_setup(listener)) {
   1186                     ERROR("ioloop_listener_state_changed_handler: Unable to recreate listener.");
   1187                     goto cancel;
   1188                 } else {
   1189                     nw_listener_created++;
   1190                 }
   1191             } else {
   1192                 ;
   1193             cancel:
   1194                 if (listener->cancel) {
   1195                     listener->cancel(listener, listener->context);
   1196                 }
   1197                 RELEASE_HERE(listener, listener);
   1198             }
   1199         }
   1200     }
   1201 }
   1202 #endif // UDP_LISTENER_USES_CONNECTION_GROUPS
   1203 
   1204 static void
   1205 ioloop_listener_state_changed_handler(comm_t *listener, nw_listener_state_t state, nw_error_t error)
   1206 {
   1207 #ifdef DEBUG_VERBOSE
   1208     if (listener->listener == NULL) {
   1209         if (state == nw_listener_state_cancelled) {
   1210             INFO("nw_listener gets released before the final nw_listener_state_cancelled event - name: " PRI_S_SRP,
   1211                  listener->name);
   1212         } else {
   1213             ERROR("nw_listener gets released before the listener is canceled - name: " PRI_S_SRP ", state: %d",
   1214                   listener->name, state);
   1215         }
   1216     }
   1217 #endif // DEBUG_VERBOSE
   1218 
   1219     INFO("%p %p " PUB_S_SRP " %d", listener, listener->listener, listener->name, state);
   1220 
   1221     // Should never happen.
   1222     if (listener->listener == NULL && state != nw_listener_state_cancelled) {
   1223         return;
   1224     }
   1225 
   1226     if (error != NULL) {
   1227         char errbuf[512];
   1228         connection_error_to_string(error, errbuf, sizeof(errbuf));
   1229         INFO("state changed: " PUB_S_SRP, errbuf);
   1230         if (listener->listener != NULL) {
   1231             nw_listener_cancel(listener->listener);
   1232         }
   1233     } else {
   1234         if (state == nw_listener_state_waiting) {
   1235             INFO("waiting");
   1236             return;
   1237         } else if (state == nw_listener_state_failed) {
   1238             INFO("failed");
   1239             nw_listener_cancel(listener->listener);
   1240         } else if (state == nw_listener_state_ready) {
   1241             INFO("ready");
   1242             if (listener->ready != NULL) {
   1243                 listener->ready(listener->context, listener->listen_port);
   1244             }
   1245         } else if (state == nw_listener_state_cancelled) {
   1246             INFO("cancelled");
   1247             nw_release(listener->listener);
   1248             nw_listener_finalized++;
   1249             listener->listener = NULL;
   1250             if (listener->cancel != NULL) {
   1251                 listener->cancel(listener, listener->context);
   1252             }
   1253             RELEASE_HERE(listener, listener); // Release the nw_listener handler function's reference to the ioloop listener object.
   1254         } else {
   1255             INFO("something else");
   1256         }
   1257     }
   1258 }
   1259 
   1260 #if UDP_LISTENER_USES_CONNECTION_GROUPS
   1261 static bool
   1262 ioloop_udp_listener_setup(comm_t *listener)
   1263 {
   1264     listener->connection_group = nw_connection_group_create_with_parameters(listener->parameters);
   1265     if (listener->connection_group == NULL) {
   1266         return false;
   1267     }
   1268     nw_connection_group_set_state_changed_handler(listener->connection_group,
   1269                                                   ^(nw_connection_group_state_t state, nw_error_t error) {
   1270             ioloop_udp_listener_state_changed_handler(listener, state, error);
   1271         });
   1272     nw_connection_group_set_receive_handler(listener->connection_group, DNS_MAX_UDP_PAYLOAD, true,
   1273                                             ^(dispatch_data_t  _Nullable content,
   1274                                               nw_content_context_t  _Nonnull receive_context, bool is_complete) {
   1275                                                 ioloop_udp_receive(listener, content, receive_context, is_complete);
   1276                                             });
   1277     RETAIN_HERE(listener, listener); // For the handlers.
   1278 
   1279     // Start the connection group listener
   1280     nw_connection_group_set_queue(listener->connection_group, ioloop_main_queue);
   1281     nw_connection_group_start(listener->connection_group);
   1282     return true;
   1283 }
   1284 #else
   1285 static comm_t *
   1286 ioloop_udp_listener_setup(comm_t *listener, const addr_t *ip_address, uint16_t port, const char *launchd_name, int ifindex)
   1287 {
   1288     sa_family_t family = (ip_address != NULL) ? ip_address->sa.sa_family : AF_UNSPEC;
   1289     sa_family_t real_family = family == AF_UNSPEC ? AF_INET6 : family;
   1290     int true_flag = 1;
   1291     addr_t sockname;
   1292     socklen_t sl;
   1293     int rv;
   1294 
   1295     listener->address.sa.sa_family = real_family;
   1296     listener->address.sa.sa_len = (real_family == AF_INET
   1297                                    ? sizeof(listener->address.sin)
   1298                                    : sizeof(listener->address.sin6));
   1299     if (real_family == AF_INET6) {
   1300         listener->address.sin6.sin6_port = htons(port);
   1301     } else {
   1302         listener->address.sin.sin_port = htons(port);
   1303     }
   1304 
   1305     listener->io.fd = -1;
   1306 #ifndef SRP_TEST_SERVER
   1307     if (launchd_name != NULL) {
   1308         int *fds;
   1309         size_t cnt;
   1310         int ret = launch_activate_socket(launchd_name, &fds, &cnt);
   1311         if (ret != 0) {
   1312             FAULT("launchd_activate_socket failed for " PUB_S_SRP ": " PUB_S_SRP, launchd_name, strerror(ret));
   1313             listener->io.fd = -1;
   1314         } else if (cnt == 0) {
   1315             FAULT("too few sockets returned from launchd_active_socket for " PUB_S_SRP" : %zd", launchd_name, cnt);
   1316             listener->io.fd = -1;
   1317         } else if (cnt != 1) {
   1318             FAULT("too many sockets returned from launchd_active_socket for " PUB_S_SRP" : %zd", launchd_name, cnt);
   1319             for (size_t i = 0; i < cnt; i++) {
   1320                 close(fds[i]);
   1321             }
   1322             free(fds);
   1323         } else {
   1324             listener->io.fd = fds[0];
   1325             free(fds);
   1326         }
   1327     }
   1328 #endif
   1329     if (listener->io.fd == -1) {
   1330         listener->io.fd = socket(real_family, SOCK_DGRAM, IPPROTO_UDP);
   1331         if (listener->io.fd < 0) {
   1332             ERROR("Can't get socket: %s", strerror(errno));
   1333             goto out;
   1334         }
   1335         rv = setsockopt(listener->io.fd, SOL_SOCKET, SO_REUSEADDR, &true_flag, sizeof true_flag);
   1336         if (rv < 0) {
   1337             ERROR("SO_REUSEADDR failed: %s", strerror(errno));
   1338             goto out;
   1339         }
   1340 
   1341         rv = setsockopt(listener->io.fd, SOL_SOCKET, SO_REUSEPORT, &true_flag, sizeof true_flag);
   1342         if (rv < 0) {
   1343             ERROR("SO_REUSEPORT failed: %s", strerror(errno));
   1344             goto out;
   1345         }
   1346 
   1347         // shift the DSCP value to the left by 2 bits to make the 8-bit field
   1348         int dscp = DSCP_CS5 << 2;
   1349         if (real_family == AF_INET6) {
   1350             // IPV6_TCLASS.
   1351             rv = setsockopt(listener->io.fd, IPPROTO_IPV6, IPV6_TCLASS, &dscp, sizeof(dscp));
   1352             if (rv < 0) {
   1353                 ERROR("IPV6_TCLASS failed: %s", strerror(errno));
   1354                 goto out;
   1355             }
   1356         } else {
   1357             // IP_TOS
   1358             rv = setsockopt(listener->io.fd, IPPROTO_IP, IP_TOS, &dscp, sizeof(dscp));
   1359             if (rv < 0) {
   1360                 ERROR("IP_TOS failed: %s", strerror(errno));
   1361                 goto out;
   1362             }
   1363         }
   1364         // skipping multicast support for now
   1365 
   1366         if (family == AF_INET6) {
   1367             // Don't use a dual-stack socket.
   1368             rv = setsockopt(listener->io.fd, IPPROTO_IPV6, IPV6_V6ONLY, &true_flag, sizeof true_flag);
   1369             if (rv < 0) {
   1370                 SEGMENTED_IPv6_ADDR_GEN_SRP(listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf);
   1371                 ERROR("Unable to set IPv6-only flag on UDP socket for " PRI_SEGMENTED_IPv6_ADDR_SRP,
   1372                       SEGMENTED_IPv6_ADDR_PARAM_SRP(listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf));
   1373                 goto out;
   1374             }
   1375             SEGMENTED_IPv6_ADDR_GEN_SRP(listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf);
   1376             ERROR("Successfully set IPv6-only flag on UDP socket for " PRI_SEGMENTED_IPv6_ADDR_SRP,
   1377                   SEGMENTED_IPv6_ADDR_PARAM_SRP(listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf));
   1378         }
   1379 
   1380         sl = listener->address.sa.sa_len;
   1381         if (bind(listener->io.fd, &listener->address.sa, sl) < 0) {
   1382             if (family == AF_INET) {
   1383                 IPv4_ADDR_GEN_SRP(&listener->address.sin.sin_addr.s_addr, ipv4_addr_buf);
   1384                 ERROR("Can't bind to " PRI_IPv4_ADDR_SRP "#%d: %s",
   1385                       IPv4_ADDR_PARAM_SRP(&listener->address.sin.sin_addr.s_addr, ipv4_addr_buf), ntohs(port),
   1386                       strerror(errno));
   1387             } else {
   1388                 SEGMENTED_IPv6_ADDR_GEN_SRP(&listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf);
   1389                 ERROR("Can't bind to " PRI_SEGMENTED_IPv6_ADDR_SRP "#%d: %s",
   1390                       SEGMENTED_IPv6_ADDR_PARAM_SRP(&listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf), ntohs(port),
   1391                       strerror(errno));
   1392             }
   1393         out:
   1394             close(listener->io.fd);
   1395             listener->io.fd = -1;
   1396             RELEASE_HERE(listener, listener);
   1397             return NULL;
   1398         }
   1399     }
   1400 
   1401     if (fcntl(listener->io.fd, F_SETFL, O_NONBLOCK) < 0) {
   1402         ERROR("%s: Can't set O_NONBLOCK: %s", listener->name, strerror(errno));
   1403         goto out;
   1404     }
   1405 
   1406     // We may have bound to an unspecified port, so fetch the port we got. Or we may have got the port from
   1407     // launchd, in which case let's make sure we got the right port.
   1408     if (launchd_name != NULL || port == 0) {
   1409         sl = sizeof(sockname);
   1410         if (getsockname(listener->io.fd, (struct sockaddr *)&sockname, &sl) < 0) {
   1411             ERROR("getsockname: %s", strerror(errno));
   1412             goto out;
   1413         }
   1414         listener->listen_port = ntohs(real_family == AF_INET6 ? sockname.sin6.sin6_port : sockname.sin.sin_port);
   1415         if (launchd_name != NULL && listener->listen_port != port) {
   1416             ERROR("launchd port mismatch: %u %u", port, listener->listen_port);
   1417         }
   1418     } else {
   1419         listener->listen_port = port;
   1420     }
   1421     INFO("port is %d", listener->listen_port);
   1422 
   1423     if (ifindex != 0) {
   1424         setsockopt(listener->io.fd, IPPROTO_IP, IP_BOUND_IF, &ifindex, sizeof(ifindex));
   1425         setsockopt(listener->io.fd, IPPROTO_IPV6, IPV6_BOUND_IF, &ifindex, sizeof(ifindex));
   1426     }
   1427     rv = setsockopt(listener->io.fd, family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6,
   1428                     family == AF_INET ? IP_PKTINFO : IPV6_RECVPKTINFO, &true_flag, sizeof true_flag);
   1429     if (rv < 0) {
   1430         ERROR("Can't set %s: %s.", family == AF_INET ? "IP_PKTINFO" : "IPV6_RECVPKTINFO",
   1431               strerror(errno));
   1432         goto out;
   1433     }
   1434     ioloop_add_reader(&listener->io, ioloop_udp_read_callback);
   1435     RETAIN_HERE(listener, listener); // For the reader
   1436     listener->io.context = listener;
   1437     listener->io.is_static = true;
   1438     listener->io.context_release = ioloop_listener_context_release;
   1439 
   1440     // If there's a ready callback, call it.
   1441     if (listener->ready != NULL) {
   1442         RETAIN_HERE(listener, listener); // For the ready callback
   1443         dispatch_async(ioloop_main_queue, ^{
   1444                 // It's possible that we might schedule the ready event but then before we return to the run loop
   1445                 // the listener gets canceled, in which case we don't want to deliver the ready event.
   1446                 if (listener->canceled) {
   1447                     INFO("ready but canceled");
   1448                 } else {
   1449                     if (listener->ready != NULL) {
   1450                         listener->ready(listener->context, listener->listen_port);
   1451                     }
   1452                 }
   1453                 RELEASE_HERE(listener, listener);
   1454             });
   1455     }
   1456     return listener;
   1457 }
   1458 #endif // UDP_LISTENER_USES_CONNECTION_GROUPS
   1459 
   1460 comm_t *
   1461 ioloop_listener_create(bool stream, bool tls, bool launchd, uint16_t *avoid_ports, int num_avoid_ports,
   1462                        const addr_t *ip_address, const char *multicast, const char *name,
   1463                        datagram_callback_t datagram_callback, connect_callback_t connected, cancel_callback_t cancel,
   1464                        ready_callback_t ready, finalize_callback_t finalize, tls_config_callback_t tls_config,
   1465                        unsigned ifindex, void *context)
   1466 {
   1467     comm_t *listener;
   1468     int family = (ip_address != NULL) ? ip_address->sa.sa_family : AF_UNSPEC;
   1469     uint16_t port;
   1470     char portbuf[10];
   1471     nw_endpoint_t endpoint;
   1472 
   1473     if (ip_address == NULL) {
   1474         port = 0;
   1475     } else {
   1476         port = (family == AF_INET) ? ntohs(ip_address->sin.sin_port) : ntohs(ip_address->sin6.sin6_port);
   1477     }
   1478 
   1479     if (multicast != NULL) {
   1480         ERROR("ioloop_setup_listener: multicast not supported.");
   1481         return NULL;
   1482     }
   1483 
   1484     if (datagram_callback == NULL) {
   1485         ERROR("ioloop_setup: no datagram callback provided.");
   1486         return NULL;
   1487     }
   1488 
   1489     snprintf(portbuf, sizeof(portbuf), "%d", port);
   1490     listener = calloc(1, sizeof(*listener));
   1491     if (listener == NULL) {
   1492         if (ip_address == NULL) {
   1493             ERROR("No memory for listener on <NULL>#%d", port);
   1494         } else if (family == AF_INET) {
   1495             IPv4_ADDR_GEN_SRP(&ip_address->sin.sin_addr.s_addr, ipv4_addr_buf);
   1496             ERROR("No memory for listener on " PRI_IPv4_ADDR_SRP "#%d",
   1497                   IPv4_ADDR_PARAM_SRP(&ip_address->sin.sin_addr.s_addr, ipv4_addr_buf), port);
   1498         } else if (family == AF_INET6) {
   1499             SEGMENTED_IPv6_ADDR_GEN_SRP(ip_address->sin6.sin6_addr.s6_addr, ipv6_addr_buf);
   1500             ERROR("No memory for listener on " PRI_SEGMENTED_IPv6_ADDR_SRP "#%d",
   1501                   SEGMENTED_IPv6_ADDR_PARAM_SRP(ip_address->sin6.sin6_addr.s6_addr, ipv6_addr_buf), port);
   1502         } else {
   1503             ERROR("No memory for listener on <family address other than AF_INET or AF_INET6: %d>#%d", family, port);
   1504         }
   1505         return NULL;
   1506     }
   1507     listener->serial = ++cur_connection_serial;
   1508     if (avoid_ports != NULL) {
   1509         listener->avoid_ports = malloc(num_avoid_ports * sizeof(uint16_t));
   1510         if (listener->avoid_ports == NULL) {
   1511             if (ip_address == NULL) {
   1512                 ERROR("No memory for listener avoid_ports on <NULL>#%d", port);
   1513             } else if (family == AF_INET) {
   1514                 IPv4_ADDR_GEN_SRP(&ip_address->sin.sin_addr.s_addr, ipv4_addr_buf);
   1515                 ERROR("No memory for listener avoid_ports on " PRI_IPv4_ADDR_SRP "#%d",
   1516                       IPv4_ADDR_PARAM_SRP(&ip_address->sin.sin_addr.s_addr, ipv4_addr_buf), port);
   1517             } else if (family == AF_INET6) {
   1518                 SEGMENTED_IPv6_ADDR_GEN_SRP(ip_address->sin6.sin6_addr.s6_addr, ipv6_addr_buf);
   1519                 ERROR("No memory for listener avoid_ports on " PRI_SEGMENTED_IPv6_ADDR_SRP "#%d",
   1520                       SEGMENTED_IPv6_ADDR_PARAM_SRP(ip_address->sin6.sin6_addr.s6_addr, ipv6_addr_buf), port);
   1521             } else {
   1522                 ERROR("No memory for listener avoid_ports on <family address other than AF_INET or AF_INET6: %d>#%d",
   1523                       family, port);
   1524             }
   1525             free(listener);
   1526             return NULL;
   1527         }
   1528         listener->num_avoid_ports = num_avoid_ports;
   1529         listener->avoiding = true;
   1530     }
   1531     RETAIN_HERE(listener, listener);
   1532     listener->name = strdup(name);
   1533     if (listener->name == NULL) {
   1534         ERROR("no memory for listener name.");
   1535         RELEASE_HERE(listener, listener);
   1536         return NULL;
   1537     }
   1538     listener->ready = ready;
   1539     listener->context = context;
   1540     listener->tcp_stream = stream;
   1541     listener->is_listener = true;
   1542 
   1543 #if !UDP_LISTENER_USES_CONNECTION_GROUPS
   1544     if (stream == FALSE) {
   1545         comm_t *ret = ioloop_udp_listener_setup(listener, ip_address, port, launchd ? name : NULL, ifindex);
   1546         if (ret == NULL) {
   1547             return ret;
   1548         }
   1549     }
   1550 #endif
   1551 
   1552     listener->datagram_callback = datagram_callback;
   1553     listener->cancel = cancel;
   1554     listener->finalize = finalize;
   1555     listener->connected = connected;
   1556 
   1557 #if !UDP_LISTENER_USES_CONNECTION_GROUPS
   1558     if (stream == FALSE) {
   1559         return listener;
   1560     }
   1561 #endif
   1562     if (port == 0) {
   1563         endpoint = NULL;
   1564         // Even though we don't have any ports to avoid, we still want the "avoiding" behavior in this case, since that
   1565         // is what triggers a call to the ready handler, which passes the port number that we got to it.
   1566         listener->avoiding = true;
   1567     } else {
   1568         listener->listen_port = port;
   1569         char ip_address_str[MAX(INET_ADDRSTRLEN, INET6_ADDRSTRLEN)];
   1570         if (ip_address == NULL || family == AF_UNSPEC) {
   1571             if (family == AF_INET) {
   1572                 snprintf(ip_address_str, sizeof(ip_address_str), "0.0.0.0");
   1573             } else {
   1574                 // AF_INET6 or AF_UNSPEC
   1575                 snprintf(ip_address_str, sizeof(ip_address_str), "::");
   1576             }
   1577         } else {
   1578             if (family == AF_INET) {
   1579                 inet_ntop(family, &ip_address->sin.sin_addr, ip_address_str, sizeof(ip_address_str));
   1580             } else {
   1581                 inet_ntop(family, &ip_address->sin6.sin6_addr, ip_address_str, sizeof(ip_address_str));
   1582             }
   1583         }
   1584         endpoint = nw_endpoint_create_host(ip_address_str, portbuf);
   1585         if (endpoint == NULL) {
   1586             ERROR("No memory for listener endpoint.");
   1587             RELEASE_HERE(listener, listener);
   1588             return NULL;
   1589         }
   1590     }
   1591     if (stream) {
   1592         nw_parameters_configure_protocol_block_t configure_tls_block = NW_PARAMETERS_DISABLE_PROTOCOL;
   1593         if (tls && tls_config != NULL) {
   1594             configure_tls_block = ^(nw_protocol_options_t tls_options) {
   1595                 tls_config_context_t tls_context = {tls_options, ioloop_main_queue};
   1596                 tls_config((void *)&tls_context);
   1597             };
   1598         }
   1599 
   1600         listener->parameters = nw_parameters_create_secure_tcp(configure_tls_block, NW_PARAMETERS_DEFAULT_CONFIGURATION);
   1601     } else {
   1602         if (tls) {
   1603             ERROR("DTLS support not implemented.");
   1604             nw_release(endpoint);
   1605             RELEASE_HERE(listener, listener);
   1606             return NULL;
   1607         }
   1608 #if UDP_LISTENER_USES_CONNECTION_GROUPS
   1609         listener->parameters = nw_parameters_create_secure_udp(NW_PARAMETERS_DISABLE_PROTOCOL,
   1610                                                                NW_PARAMETERS_DEFAULT_CONFIGURATION);
   1611 #endif
   1612     }
   1613     if (listener->parameters == NULL) {
   1614         ERROR("No memory for listener parameters.");
   1615         nw_release(endpoint);
   1616         RELEASE_HERE(listener, listener);
   1617         return NULL;
   1618     }
   1619 
   1620     if (endpoint != NULL) {
   1621         nw_parameters_set_local_endpoint(listener->parameters, endpoint);
   1622         nw_release(endpoint);
   1623     }
   1624 
   1625     // Set SO_REUSEADDR.
   1626     nw_parameters_set_reuse_local_address(listener->parameters, true);
   1627 
   1628 
   1629     if (stream) {
   1630         // Create the nw_listener_t.
   1631         listener->listener = NULL;
   1632 #ifndef SRP_TEST_SERVER
   1633         if (launchd && name != NULL) {
   1634             listener->listener = nw_listener_create_with_launchd_key(listener->parameters, name);
   1635             if (listener->listener == NULL) {
   1636                 ERROR("launchd listener create failed, trying to create it without relying on launchd.");
   1637             }
   1638         }
   1639 #endif
   1640         if (listener->listener == NULL) {
   1641             listener->listener = nw_listener_create(listener->parameters);
   1642         }
   1643         if (listener->listener == NULL) {
   1644             ERROR("no memory for nw_listener object");
   1645             RELEASE_HERE(listener, listener);
   1646             return NULL;
   1647         }
   1648         nw_listener_created++;
   1649         nw_listener_set_new_connection_handler(listener->listener,
   1650                                                ^(nw_connection_t connection) {
   1651                                                    ioloop_listener_connection_callback(listener, connection);
   1652                                                });
   1653 
   1654         nw_listener_set_state_changed_handler(listener->listener, ^(nw_listener_state_t state, nw_error_t error) {
   1655             ioloop_listener_state_changed_handler(listener, state, error);
   1656         });
   1657         RETAIN_HERE(listener, listener); // for the nw_listener_t state change handler callback
   1658         nw_listener_set_queue(listener->listener, ioloop_main_queue);
   1659         nw_listener_start(listener->listener);
   1660 #if UDP_LISTENER_USES_CONNECTION_GROUPS
   1661     } else {
   1662         if (launchd) {
   1663             FAULT("launchd not yet supported for connection groups");
   1664             return NULL;
   1665         }
   1666         if (!ioloop_udp_listener_setup(listener)) {
   1667             RELEASE_HERE(listener, listener);
   1668             return NULL;
   1669         }
   1670 #endif // UDP_LISTENER_USES_CONNECTION_GROUPS
   1671     }
   1672 
   1673     // Listener has one refcount
   1674     return listener;
   1675 }
   1676 
   1677 comm_t *
   1678 ioloop_connection_create(addr_t *NONNULL remote_address, bool tls, bool stream, bool stable, bool opportunistic,
   1679                          datagram_callback_t datagram_callback, connect_callback_t connected,
   1680                          disconnect_callback_t disconnected, finalize_callback_t finalize, void *context)
   1681 {
   1682     comm_t *connection;
   1683     char portbuf[10];
   1684     nw_parameters_t parameters;
   1685     nw_endpoint_t endpoint;
   1686     char addrbuf[INET6_ADDRSTRLEN];
   1687 
   1688     inet_ntop(remote_address->sa.sa_family, (remote_address->sa.sa_family == AF_INET
   1689                                              ? (void *)&remote_address->sin.sin_addr
   1690                                              : (void *)&remote_address->sin6.sin6_addr), addrbuf, sizeof addrbuf);
   1691     snprintf(portbuf, sizeof(portbuf), "%d", (remote_address->sa.sa_family == AF_INET
   1692                             ? ntohs(remote_address->sin.sin_port)
   1693                             : ntohs(remote_address->sin6.sin6_port)));
   1694     connection = calloc(1, sizeof(*connection));
   1695     if (connection == NULL) {
   1696         ERROR("No memory for connection");
   1697         return NULL;
   1698     }
   1699     connection->serial = ++cur_connection_serial;
   1700 
   1701     // If we don't release this because of an error, this is the caller's reference to the comm_t.
   1702     RETAIN_HERE(connection, comm);
   1703     endpoint = nw_endpoint_create_host(addrbuf, portbuf);
   1704     if (endpoint == NULL) {
   1705         ERROR("No memory for connection endpoint.");
   1706         RELEASE_HERE(connection, comm);
   1707         return NULL;
   1708     }
   1709 
   1710     if (stream) {
   1711         nw_parameters_configure_protocol_block_t configure_tls = NW_PARAMETERS_DISABLE_PROTOCOL;
   1712         if (tls) {
   1713             // This sets up a block that's called when we get a TLS connection and want to verify
   1714             // the cert.   Right now we only support opportunistic security, which means we have
   1715             // no way to validate the cert.   Future work: add support for validating the cert
   1716             // using a TLSA record if one is present.
   1717             configure_tls = ^(nw_protocol_options_t tls_options) {
   1718                 sec_protocol_options_t sec_options = nw_tls_copy_sec_protocol_options(tls_options);
   1719                 sec_protocol_options_set_verify_block(sec_options,
   1720                                                       ^(sec_protocol_metadata_t metadata, sec_trust_t trust_ref,
   1721                                                         sec_protocol_verify_complete_t complete) {
   1722                                                           (void) metadata;
   1723                                                           (void) trust_ref;
   1724                                                           const bool valid = true;
   1725                                                           complete(valid);
   1726                                                       }, ioloop_main_queue);
   1727                 nw_release(sec_options);
   1728             };
   1729         }
   1730 
   1731         parameters = nw_parameters_create_secure_tcp(configure_tls, NW_PARAMETERS_DEFAULT_CONFIGURATION);
   1732     } else {
   1733         if (tls) {
   1734             ERROR("DTLS support not implemented.");
   1735             nw_release(endpoint);
   1736             RELEASE_HERE(connection, comm);
   1737             return NULL;
   1738         }
   1739         parameters = nw_parameters_create_secure_udp(NW_PARAMETERS_DISABLE_PROTOCOL,
   1740                                                      NW_PARAMETERS_DEFAULT_CONFIGURATION);
   1741     }
   1742     if (parameters == NULL) {
   1743         ERROR("No memory for connection parameters.");
   1744         nw_release(endpoint);
   1745         RELEASE_HERE(connection, comm);
   1746         return NULL;
   1747     }
   1748 
   1749     nw_protocol_stack_t protocol_stack = nw_parameters_copy_default_protocol_stack(parameters);
   1750 
   1751     // If user asked for a stable address, set that option.
   1752     if (stable) {
   1753         nw_protocol_options_t ip_options = nw_protocol_stack_copy_internet_protocol(protocol_stack);
   1754         nw_ip_options_set_local_address_preference(ip_options, nw_ip_local_address_preference_stable);
   1755         nw_release(ip_options);
   1756     }
   1757 
   1758     // Only set TCP options for TCP connections.
   1759     if (stream) {
   1760         nw_protocol_options_t tcp_options = nw_protocol_stack_copy_transport_protocol(protocol_stack);
   1761         nw_tcp_options_set_no_delay(tcp_options, true);
   1762         nw_tcp_options_set_enable_keepalive(tcp_options, true);
   1763         nw_release(tcp_options);
   1764     }
   1765     nw_release(protocol_stack);
   1766 
   1767     connection->name = strdup(addrbuf);
   1768 
   1769     // Create the nw_connection_t.
   1770     connection->connection = nw_connection_create(endpoint, parameters);
   1771     nw_connection_created++;
   1772     nw_release(endpoint);
   1773     nw_release(parameters);
   1774     if (connection->connection == NULL) {
   1775         ERROR("no memory for nw_connection object");
   1776         RELEASE_HERE(connection, comm);
   1777         return NULL;
   1778     }
   1779 
   1780     connection->datagram_callback = datagram_callback;
   1781     connection->connected = connected;
   1782     connection->disconnected = disconnected;
   1783     connection->finalize = finalize;
   1784     connection->tcp_stream = stream;
   1785     connection->opportunistic = opportunistic;
   1786     connection->context = context;
   1787     RETAIN_HERE(connection, comm); // The connection state changed handler has a reference to the connection.
   1788     nw_connection_set_state_changed_handler(connection->connection,
   1789                                             ^(nw_connection_state_t state, nw_error_t error)
   1790                                             { ioloop_connection_state_changed(connection, state, error); });
   1791     nw_connection_set_queue(connection->connection, ioloop_main_queue);
   1792     nw_connection_start(connection->connection);
   1793     return connection;
   1794 }
   1795 
   1796 static void
   1797 subproc_finalize(subproc_t *subproc)
   1798 {
   1799     int i;
   1800     for (i = 0; i < subproc->argc; i++) {
   1801         if (subproc->argv[i] != NULL) {
   1802             free(subproc->argv[i]);
   1803             subproc->argv[i] = NULL;
   1804         }
   1805     }
   1806     if (subproc->dispatch_source != NULL) {
   1807         dispatch_release(subproc->dispatch_source);
   1808     }
   1809     if (subproc->output_fd != NULL) {
   1810         ioloop_file_descriptor_release(subproc->output_fd);
   1811     }
   1812     if (subproc->finalize != NULL) {
   1813         subproc->finalize(subproc->context);
   1814     }
   1815     free(subproc);
   1816 }
   1817 
   1818 static void subproc_cancel(void *context)
   1819 {
   1820     subproc_t *subproc = context;
   1821     subproc->dispatch_source = NULL;
   1822     RELEASE_HERE(subproc, subproc);
   1823 }
   1824 
   1825 static void
   1826 subproc_event(void *context)
   1827 {
   1828     subproc_t *subproc = context;
   1829     pid_t pid;
   1830     int status;
   1831 
   1832     pid = waitpid(subproc->pid, &status, WNOHANG);
   1833     if (pid <= 0) {
   1834         return;
   1835     }
   1836     subproc->callback(subproc, status, NULL);
   1837     if (!WIFSTOPPED(status)) {
   1838         dispatch_source_cancel(subproc->dispatch_source);
   1839     }
   1840 }
   1841 
   1842 static void
   1843 subproc_output_finalize(void *context)
   1844 {
   1845     subproc_t *subproc = context;
   1846     RELEASE_HERE(subproc, subproc);
   1847 }
   1848 
   1849 void
   1850 ioloop_subproc_release_(subproc_t *subproc, const char *file, int line)
   1851 {
   1852     RELEASE(subproc, subproc);
   1853 }
   1854 
   1855 // Invoke the specified executable with the specified arguments.   Call callback when it exits.
   1856 // All failures are reported through the callback.
   1857 subproc_t *
   1858 ioloop_subproc(const char *exepath, char *NULLABLE *argv, int argc,
   1859                subproc_callback_t callback, io_callback_t output_callback, void *context)
   1860 {
   1861     subproc_t *subproc;
   1862     int i, rv;
   1863     posix_spawn_file_actions_t actions;
   1864     posix_spawnattr_t attrs;
   1865 
   1866     if (callback == NULL) {
   1867         ERROR("ioloop_add_wake_event called with null callback");
   1868         return NULL;
   1869     }
   1870 
   1871     if (argc > MAX_SUBPROC_ARGS) {
   1872         callback(NULL, 0, "too many subproc args");
   1873         return NULL;
   1874     }
   1875 
   1876     subproc = calloc(1, sizeof *subproc);
   1877     if (subproc == NULL) {
   1878         callback(NULL, 0, "out of memory");
   1879         return NULL;
   1880     }
   1881     RETAIN_HERE(subproc, subproc); // For the create rule
   1882     if (output_callback != NULL) {
   1883         rv = pipe(subproc->pipe_fds);
   1884         if (rv < 0) {
   1885             callback(NULL, 0, "unable to create pipe.");
   1886             RELEASE_HERE(subproc, subproc);
   1887             return NULL;
   1888         }
   1889         subproc->output_fd = ioloop_file_descriptor_create(subproc->pipe_fds[0], subproc, subproc_output_finalize);
   1890         RETAIN_HERE(subproc, subproc); // For the file descriptor
   1891         if (subproc->output_fd == NULL) {
   1892             callback(NULL, 0, "out of memory.");
   1893             close(subproc->pipe_fds[0]);
   1894             close(subproc->pipe_fds[1]);
   1895             RELEASE_HERE(subproc, subproc);
   1896             return NULL;
   1897         }
   1898     }
   1899 
   1900     subproc->argv[0] = strdup(exepath);
   1901     if (subproc->argv[0] == NULL) {
   1902         RELEASE_HERE(subproc, subproc);
   1903         callback(NULL, 0, "out of memory");
   1904         return NULL;
   1905     }
   1906     subproc->argc++;
   1907     for (i = 0; i < argc; i++) {
   1908         subproc->argv[i + 1] = strdup(argv[i]);
   1909         if (subproc->argv[i + 1] == NULL) {
   1910             RELEASE_HERE(subproc, subproc);
   1911             callback(NULL, 0, "out of memory");
   1912             return NULL;
   1913         }
   1914         subproc->argc++;
   1915     }
   1916 
   1917     // Set up for posix_spawn
   1918     posix_spawn_file_actions_init(&actions);
   1919     if (output_callback != NULL) {
   1920         posix_spawn_file_actions_adddup2(&actions, subproc->pipe_fds[1], STDOUT_FILENO);
   1921         posix_spawn_file_actions_addclose(&actions, subproc->pipe_fds[0]);
   1922         posix_spawn_file_actions_addclose(&actions, subproc->pipe_fds[1]);
   1923     }
   1924     posix_spawnattr_init(&attrs);
   1925     extern char **environ;
   1926     rv = posix_spawn(&subproc->pid, exepath, &actions, &attrs, subproc->argv, environ);
   1927     posix_spawn_file_actions_destroy(&actions);
   1928     posix_spawnattr_destroy(&attrs);
   1929     if (rv < 0) {
   1930         ERROR("posix_spawn failed for " PUB_S_SRP ": " PUB_S_SRP, subproc->argv[0], strerror(errno));
   1931         callback(subproc, 0, strerror(errno));
   1932         RELEASE_HERE(subproc, subproc);
   1933         return NULL;
   1934     }
   1935     subproc->callback = callback;
   1936     subproc->context = context;
   1937 
   1938     subproc->dispatch_source = dispatch_source_create(DISPATCH_SOURCE_TYPE_PROC, subproc->pid, DISPATCH_PROC_EXIT,
   1939                                                       ioloop_main_queue);
   1940     if (subproc->dispatch_source == NULL) {
   1941         ERROR("dispatch_source_create failed in ioloop_add_wake_event().");
   1942         return false;
   1943     }
   1944     dispatch_retain(subproc->dispatch_source);
   1945     dispatch_source_set_event_handler_f(subproc->dispatch_source, subproc_event);
   1946     dispatch_source_set_cancel_handler_f(subproc->dispatch_source, subproc_cancel);
   1947     dispatch_set_context(subproc->dispatch_source, subproc);
   1948     dispatch_activate(subproc->dispatch_source);
   1949     RETAIN_HERE(subproc, subproc); // Dispatch has a reference
   1950 
   1951     // Now that we have a viable subprocess, add the reader callback.
   1952     if (output_callback != NULL && subproc->output_fd != NULL) {
   1953         close(subproc->pipe_fds[1]);
   1954         ioloop_add_reader(subproc->output_fd, output_callback);
   1955     }
   1956     return subproc;
   1957 }
   1958 
   1959 #ifdef SRP_TEST_SERVER
   1960 void
   1961 ioloop_dnssd_txn_cancel_srp(void *srp_server, dnssd_txn_t *txn)
   1962 {
   1963     if (txn->sdref != NULL) {
   1964         INFO("txn %p serviceref %p", txn, txn->sdref);
   1965         if (srp_server != NULL) {
   1966             dns_service_ref_deallocate(srp_server, txn->sdref);
   1967         } else {
   1968             DNSServiceRefDeallocate(txn->sdref);
   1969         }
   1970         txn->sdref = NULL;
   1971     } else {
   1972         INFO("dead transaction.");
   1973     }
   1974 }
   1975 #endif
   1976 
   1977 void
   1978 ioloop_dnssd_txn_cancel(dnssd_txn_t *txn)
   1979 {
   1980     if (txn->sdref != NULL) {
   1981         INFO("txn %p serviceref %p", txn, txn->sdref);
   1982         DNSServiceRefDeallocate(txn->sdref);
   1983         txn->sdref = NULL;
   1984     } else {
   1985         INFO("dead transaction.");
   1986     }
   1987 }
   1988 
   1989 static void
   1990 dnssd_txn_finalize(dnssd_txn_t *txn)
   1991 {
   1992     if (txn->sdref != NULL) {
   1993         ioloop_dnssd_txn_cancel(txn);
   1994     }
   1995     if (txn->finalize_callback) {
   1996         txn->finalize_callback(txn->context);
   1997     }
   1998     free(txn);
   1999 }
   2000 
   2001 void
   2002 ioloop_dnssd_txn_retain_(dnssd_txn_t *dnssd_txn, const char *file, int line)
   2003 {
   2004     (void)file; (void)line;
   2005     RETAIN(dnssd_txn, dnssd_txn);
   2006 }
   2007 
   2008 void
   2009 ioloop_dnssd_txn_release_(dnssd_txn_t *dnssd_txn, const char *file, int line)
   2010 {
   2011     (void)file; (void)line;
   2012     RELEASE(dnssd_txn, dnssd_txn);
   2013 }
   2014 
   2015 dnssd_txn_t *
   2016 ioloop_dnssd_txn_add_subordinate_(DNSServiceRef ref, void *context, dnssd_txn_finalize_callback_t finalize_callback,
   2017                                   dnssd_txn_failure_callback_t failure_callback,
   2018                                   const char *file, int line)
   2019 {
   2020     dnssd_txn_t *txn = calloc(1, sizeof(*txn));
   2021     (void)file; (void)line;
   2022     (void)failure_callback;
   2023 
   2024     if (txn != NULL) {
   2025         RETAIN(txn, dnssd_txn);
   2026         txn->sdref = ref;
   2027         INFO("txn %p serviceref %p", txn, ref);
   2028         txn->context = context;
   2029         txn->finalize_callback = finalize_callback;
   2030     }
   2031     return txn;
   2032 }
   2033 
   2034 dnssd_txn_t *
   2035 ioloop_dnssd_txn_add_(DNSServiceRef ref, void *context, dnssd_txn_finalize_callback_t finalize_callback,
   2036                       dnssd_txn_failure_callback_t failure_callback,
   2037                       const char *file, int line)
   2038 {
   2039     dnssd_txn_t *txn = ioloop_dnssd_txn_add_subordinate_(ref, context, finalize_callback, failure_callback, file, line);
   2040     if (txn != NULL) {
   2041         DNSServiceSetDispatchQueue(ref, ioloop_main_queue);
   2042     }
   2043     return txn;
   2044 }
   2045 
   2046 
   2047 void
   2048 ioloop_dnssd_txn_set_aux_pointer(dnssd_txn_t *NONNULL txn, void *aux_pointer)
   2049 {
   2050     txn->aux_pointer = aux_pointer;
   2051 }
   2052 
   2053 void *
   2054 ioloop_dnssd_txn_get_aux_pointer(dnssd_txn_t *NONNULL txn)
   2055 {
   2056     return txn->aux_pointer;
   2057 }
   2058 
   2059 void *
   2060 ioloop_dnssd_txn_get_context(dnssd_txn_t *NONNULL txn)
   2061 {
   2062     return txn->context;
   2063 }
   2064 
   2065 
   2066 static void
   2067 file_descriptor_finalize(void *context)
   2068 {
   2069     io_t *file_descriptor = context;
   2070     if (file_descriptor->ref_count == 0) {
   2071         if (file_descriptor->finalize) {
   2072             file_descriptor->finalize(file_descriptor->context);
   2073         }
   2074         free(file_descriptor);
   2075     }
   2076 }
   2077 
   2078 void
   2079 ioloop_file_descriptor_retain_(io_t *file_descriptor, const char *file, int line)
   2080 {
   2081     (void)file; (void)line;
   2082     RETAIN(file_descriptor, file_descriptor);
   2083 }
   2084 
   2085 void
   2086 ioloop_file_descriptor_release_(io_t *file_descriptor, const char *file, int line)
   2087 {
   2088     (void)file; (void)line;
   2089     RELEASE(file_descriptor, file_descriptor);
   2090 }
   2091 
   2092 io_t *
   2093 ioloop_file_descriptor_create_(int fd, void *context, finalize_callback_t finalize, const char *file, int line)
   2094 {
   2095     io_t *ret;
   2096     ret = calloc(1, sizeof(*ret));
   2097     if (ret) {
   2098         ret->fd = fd;
   2099         ret->context = context;
   2100         ret->finalize = finalize;
   2101         RETAIN(ret, file_descriptor);
   2102     }
   2103     return ret;
   2104 }
   2105 
   2106 static void
   2107 ioloop_read_source_finalize(void *context)
   2108 {
   2109     io_t *io = context;
   2110 
   2111     INFO("io %p fd %d, read source %p, write_source %p", io, io->fd, io->read_source, io->write_source);
   2112 
   2113     // Release the reference count that dispatch was holding.
   2114     if (io->is_static) {
   2115         if (io->context_release != NULL) {
   2116             io->context_release(io->context);
   2117         }
   2118         FINALIZED(file_descriptor_finalized);
   2119     } else {
   2120         RELEASE_HERE(io, file_descriptor);
   2121     }
   2122 }
   2123 
   2124 static void
   2125 ioloop_read_source_cancel_callback(void *context)
   2126 {
   2127     io_t *io = context;
   2128 
   2129     INFO("io %p fd %d, read source %p, write_source %p", io, io->fd, io->read_source, io->write_source);
   2130     if (io->read_source != NULL) {
   2131         dispatch_release(io->read_source);
   2132         io->read_source = NULL;
   2133         if (io->fd != -1) {
   2134             close(io->fd);
   2135             io->fd = -1;
   2136         } else {
   2137             FAULT("io->fd has been set to -1 too early");
   2138         }
   2139     }
   2140 }
   2141 
   2142 static void
   2143 ioloop_read_event(void *context)
   2144 {
   2145     io_t *io = context;
   2146 
   2147     if (io->read_callback != NULL) {
   2148         io->read_callback(io, io->context);
   2149     }
   2150 }
   2151 
   2152 void
   2153 ioloop_close(io_t *io)
   2154 {
   2155     INFO("io %p fd %d, read source %p, write_source %p", io, io->fd, io->read_source, io->write_source);
   2156     if (io->read_source != NULL) {
   2157         dispatch_cancel(io->read_source);
   2158     }
   2159     if (io->write_source != NULL) {
   2160         dispatch_cancel(io->write_source);
   2161     }
   2162 }
   2163 
   2164 void
   2165 ioloop_add_reader(io_t *NONNULL io, io_callback_t NONNULL callback)
   2166 {
   2167     io->read_callback = callback;
   2168     if (io->read_source == NULL) {
   2169         io->read_source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, io->fd, 0, ioloop_main_queue);
   2170     }
   2171     if (io->read_source == NULL) {
   2172         ERROR("dispatch_source_create: unable to create read dispatch source.");
   2173         return;
   2174     }
   2175     dispatch_source_set_event_handler_f(io->read_source, ioloop_read_event);
   2176     dispatch_source_set_cancel_handler_f(io->read_source, ioloop_read_source_cancel_callback);
   2177     dispatch_set_finalizer_f(io->read_source, ioloop_read_source_finalize);
   2178     dispatch_set_context(io->read_source, io);
   2179     RETAIN_HERE(io, file_descriptor); // Dispatch will hold a reference.
   2180     dispatch_resume(io->read_source);
   2181     INFO("io %p fd %d, read source %p, write_source %p", io, io->fd, io->read_source, io->write_source);
   2182 }
   2183 
   2184 void
   2185 ioloop_run_async(async_callback_t callback, void *context)
   2186 {
   2187     dispatch_async(ioloop_main_queue, ^{
   2188             callback(context);
   2189         });
   2190 }
   2191 
   2192 const struct sockaddr *
   2193 connection_get_local_address(message_t *message)
   2194 {
   2195     if (message == NULL) {
   2196         ERROR("message is NULL.");
   2197         return NULL;
   2198     }
   2199     return &message->local.sa;
   2200 }
   2201 
// Report the result of IsAppleTV() as a bool.
bool
ioloop_is_device_apple_tv(void)
{
    const bool is_apple_tv = IsAppleTV();

    return is_apple_tv;
}
   2207 
   2208 // Local Variables:
   2209 // mode: C
   2210 // tab-width: 4
   2211 // c-file-style: "bsd"
   2212 // c-basic-offset: 4
   2213 // fill-column: 108
   2214 // indent-tabs-mode: nil
   2215 // End:
   2216