macos-ioloop.c revision 1.1.1.1 1 /* macos-ioloop.c
2 *
3 * Copyright (c) 2018-2024 Apple Inc. All rights reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * https://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 * Simple event dispatcher for DNS.
18 */
19
20 #define _GNU_SOURCE
21
22 #include <stdlib.h>
23 #include <string.h>
24 #include <stdio.h>
25 #include <unistd.h>
26 #include <sys/uio.h>
27 #include <errno.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <sys/wait.h>
32 #include <fcntl.h>
33 #include <sys/time.h>
34 #include <signal.h>
35 #include <net/if.h>
36 #include <ifaddrs.h>
37 #include <dns_sd.h>
38
39 #include <CoreUtils/SystemUtils.h> // For `IsAppleTV()`.
40 #include <dispatch/dispatch.h>
41
42 #include "srp.h"
43 #include "dns-msg.h"
44 #include "srp-crypto.h"
45 #include "ioloop.h"
46 #include "tls-macos.h"
47 #include "tls-keychain.h"
48 #include "srp-dnssd.h"
49 #include "ifpermit.h"
50
51 dispatch_queue_t ioloop_main_queue;
52 static int cur_connection_serial;
53
54 // Forward references
55 static void ioloop_tcp_input_start(comm_t *NONNULL connection);
56 static void listener_finalize(comm_t *listener);
57 static bool connection_write_now(comm_t *NONNULL connection);
58 static bool ioloop_listener_connection_ready(comm_t *connection);
59
60 #define DSCP_CS5 0x28
61
62 int
63 getipaddr(addr_t *addr, const char *p)
64 {
65 if (inet_pton(AF_INET, p, &addr->sin.sin_addr)) {
66 addr->sa.sa_family = AF_INET;
67 #ifndef NOT_HAVE_SA_LEN
68 addr->sa.sa_len = sizeof addr->sin;
69 #endif
70 return sizeof addr->sin;
71 } else if (inet_pton(AF_INET6, p, &addr->sin6.sin6_addr)) {
72 addr->sa.sa_family = AF_INET6;
73 #ifndef NOT_HAVE_SA_LEN
74 addr->sa.sa_len = sizeof addr->sin6;
75 #endif
76 return sizeof addr->sin6;
77 } else {
78 return 0;
79 }
80 }
81
// Return the current time of day, as reported by gettimeofday(), expressed in milliseconds.
int64_t
ioloop_timenow(void)
{
    struct timeval current;

    gettimeofday(&current, 0);

    // Whole seconds scaled up to milliseconds, plus the sub-second part scaled down from microseconds.
    const int64_t from_seconds = (int64_t)current.tv_sec * 1000;
    const int64_t from_microseconds = (int64_t)current.tv_usec / 1000;
    return from_seconds + from_microseconds;
}
91
92 static void
93 wakeup_event(void *context)
94 {
95 wakeup_t *wakeup = context;
96 void *wakeup_context = wakeup->context;
97 finalize_callback_t wakeup_finalize = wakeup->finalize;
98 wakeup->context = NULL;
99 wakeup->finalize = NULL;
100
101 // All ioloop wakeups are one-shot.
102 ioloop_cancel_wake_event(wakeup);
103
104 // Call the callback, which mustn't be null.
105 wakeup->wakeup(wakeup_context);
106
107 // We have to call the finalize callback after the event has been delivered, in case we hold the only reference
108 // on the object.
109 if (wakeup_context != NULL && wakeup_finalize != NULL) {
110 wakeup_finalize(wakeup_context);
111 }
112 }
113
114 static void
115 wakeup_finalize(void *context)
116 {
117 wakeup_t *wakeup = context;
118 if (wakeup->ref_count == 0) {
119 if (wakeup->dispatch_source != NULL) {
120 dispatch_release(wakeup->dispatch_source);
121 wakeup->dispatch_source = NULL;
122 }
123 void *wakeup_context = wakeup->context;
124 finalize_callback_t wakeup_finalize = wakeup->finalize;
125 wakeup->finalize = NULL;
126 wakeup->context = NULL;
127 if (wakeup_finalize != NULL && wakeup_context != NULL) {
128 wakeup_finalize(wakeup_context);
129 }
130 free(wakeup);
131 }
132 }
133
134 void
135 ioloop_wakeup_retain_(wakeup_t *wakeup, const char *file, int line)
136 {
137 (void)file; (void)line;
138 RETAIN(wakeup, wakeup);
139 }
140
141 void
142 ioloop_wakeup_release_(wakeup_t *wakeup, const char *file, int line)
143 {
144 (void)file; (void)line;
145 RELEASE(wakeup, wakeup);
146 }
147
148 wakeup_t *
149 ioloop_wakeup_create_(const char *file, int line)
150 {
151 wakeup_t *ret = calloc(1, sizeof(*ret));
152 if (ret) {
153 RETAIN(ret, wakeup);
154 }
155 return ret;
156 }
157
158 bool
159 ioloop_add_wake_event(wakeup_t *wakeup, void *context, wakeup_callback_t callback, wakeup_callback_t finalize,
160 int32_t milliseconds)
161 {
162 if (callback == NULL) {
163 ERROR("ioloop_add_wake_event called with null callback");
164 return false;
165 }
166 if (milliseconds < 0) {
167 ERROR("ioloop_add_wake_event called with negative timeout");
168 return false;
169 }
170 if (wakeup->dispatch_source != NULL) {
171 ioloop_cancel_wake_event(wakeup);
172 }
173 wakeup->wakeup = callback;
174 wakeup->context = context;
175 wakeup->finalize = finalize;
176
177 wakeup->dispatch_source = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, ioloop_main_queue);
178 if (wakeup->dispatch_source == NULL) {
179 ERROR("dispatch_source_create failed in ioloop_add_wake_event().");
180 return false;
181 }
182 dispatch_source_set_event_handler_f(wakeup->dispatch_source, wakeup_event);
183 dispatch_set_context(wakeup->dispatch_source, wakeup);
184
185 // libdispatch doesn't allow events that are scheduled to happen right now. But it is actually useful to be
186 // able to trigger an event to happen immediately, and this is the easiest way to do it from ioloop-we
187 // can't rely on just scheduling an asynchronous event on an event loop because that's specific to Mac.
188 if (milliseconds <= 0) {
189 ERROR("ioloop_add_wake_event: milliseconds = %d", milliseconds);
190 milliseconds = 10;
191 }
192 dispatch_source_set_timer(wakeup->dispatch_source,
193 dispatch_time(DISPATCH_TIME_NOW, milliseconds * NSEC_PER_SEC / 1000),
194 milliseconds * NSEC_PER_SEC / 1000, NSEC_PER_SEC / 100);
195 dispatch_resume(wakeup->dispatch_source);
196
197 return true;
198 }
199
200 void
201 ioloop_cancel_wake_event(wakeup_t *wakeup)
202 {
203 if (wakeup != NULL) {
204 if (wakeup->dispatch_source != NULL) {
205 dispatch_source_cancel(wakeup->dispatch_source);
206 dispatch_release(wakeup->dispatch_source);
207 wakeup->dispatch_source = NULL;
208 }
209 if (wakeup->context != NULL) {
210 void *wakeup_context = wakeup->context;
211 finalize_callback_t wakeup_finalize = wakeup->finalize;
212 wakeup->context = NULL;
213 wakeup->finalize = NULL;
214 if (wakeup_finalize != NULL && wakeup_context != NULL) {
215 wakeup_finalize(wakeup_context);
216 }
217 }
218 }
219 }
220
221 bool
222 ioloop_init(void)
223 {
224 ioloop_main_queue = dispatch_get_main_queue();
225 dispatch_retain(ioloop_main_queue);
226 return true;
227 }
228
// Run the event loop by handing control to dispatch_main().
// The return value is 0 should dispatch_main() ever come back.
int
ioloop(void)
{
    const int status = 0;

    dispatch_main();
    return status;
}
235
236 #define connection_cancel(comm, conn) connection_cancel_(comm, conn, __FILE__, __LINE__)
237 static void
238 connection_cancel_(comm_t *comm, nw_connection_t connection, const char *file, int line)
239 {
240 if (connection == NULL) {
241 INFO("null connection at " PUB_S_SRP ":%d", file, line);
242 } else {
243 INFO("%p: " PUB_S_SRP " " PUB_S_SRP ":%d" , connection, comm->canceled ? " (already canceled)" : "", file, line);
244 if (!comm->canceled) {
245 nw_connection_cancel(connection);
246 comm->canceled = true;
247 }
248 }
249 }
250
251 static void
252 comm_finalize(comm_t *comm)
253 {
254 ERROR("comm_finalize");
255 if (comm->connection != NULL) {
256 nw_release(comm->connection);
257 nw_connection_finalized++;
258 comm->connection = NULL;
259 }
260 if (comm->listener != NULL) {
261 nw_release(comm->listener);
262 nw_listener_finalized++;
263 comm->listener = NULL;
264 }
265 if (comm->parameters) {
266 nw_release(comm->parameters);
267 comm->parameters = NULL;
268 }
269 if (comm->pending_write != NULL) {
270 dispatch_release(comm->pending_write);
271 comm->pending_write = NULL;
272 }
273
274 if (comm->listener_state != NULL) {
275 RELEASE_HERE(comm->listener_state, listener);
276 comm->listener_state = NULL;
277 }
278 #if UDP_LISTENER_USES_CONNECTION_GROUPS
279 if (comm->content_context != NULL) {
280 nw_release(comm->content_context);
281 comm->content_context = NULL;
282 }
283 #endif
284
285 // If there is an nw_connection_t or nw_listener_t outstanding, then we will get an asynchronous callback
286 // later on. So we can't actually free the data structure yet, but the good news is that comm_finalize() will
287 // be called again later when the last outstanding asynchronous cancel is done, and then all of the stuff
288 // that follows this will happen.
289 #ifndef __clang_analyzer__
290 if (comm->ref_count > 0) {
291 return;
292 }
293 #endif
294 if (comm->idle_timer != NULL) {
295 ioloop_cancel_wake_event(comm->idle_timer);
296 RELEASE_HERE(comm->idle_timer, wakeup);
297 }
298 if (comm->name != NULL) {
299 free(comm->name);
300 }
301 if (comm->finalize != NULL) {
302 comm->finalize(comm->context);
303 }
304 free(comm);
305 }
306
307 void
308 ioloop_comm_retain_(comm_t *comm, const char *file, int line)
309 {
310 (void)file; (void)line;
311 RETAIN(comm, comm);
312 }
313
314 void
315 ioloop_comm_release_(comm_t *comm, const char *file, int line)
316 {
317 (void)file; (void)line;
318 RELEASE(comm, comm);
319 }
320
321 void
322 ioloop_comm_cancel(comm_t *connection)
323 {
324 if (connection->connection != NULL) {
325 INFO("%p %p", connection, connection->connection);
326 connection_cancel(connection, connection->connection);
327 #if UDP_LISTENER_USES_CONNECTION_GROUPS
328 } else if (connection->connection_group != NULL) {
329 INFO("%p %p", connection, connection->connection_group);
330 nw_connection_group_cancel(connection->connection_group);
331 #else
332 }
333 if (!connection->tcp_stream && connection->connection == NULL) {
334 int fd = connection->io.fd;
335 if (fd != -1) {
336 ioloop_close(&connection->io);
337 if (connection->cancel != NULL) {
338 RETAIN_HERE(connection, listener);
339 dispatch_async(ioloop_main_queue, ^{
340 if (connection->cancel != NULL) {
341 connection->cancel(connection, connection->context);
342 }
343 RELEASE_HERE(connection, listener);
344 });
345 }
346 }
347 #endif // UDP_LISTENER_USES_CONNECTION_GROUPS
348 }
349 if (connection->idle_timer != NULL) {
350 ioloop_cancel_wake_event(connection->idle_timer);
351 }
352 }
353
354 void
355 ioloop_comm_context_set(comm_t *comm, void *context, finalize_callback_t callback)
356 {
357 if (comm->context != NULL && comm->finalize != NULL) {
358 comm->finalize(comm->context);
359 }
360 comm->finalize = callback;
361 comm->context = context;
362 }
363
364 void
365 ioloop_comm_connect_callback_set(comm_t *comm, connect_callback_t callback)
366 {
367 comm->connected = callback;
368 }
369
370 void
371 ioloop_comm_disconnect_callback_set(comm_t *comm, disconnect_callback_t callback)
372 {
373 comm->disconnected = callback;
374 }
375
376 static void
377 message_finalize(message_t *message)
378 {
379 free(message);
380 }
381
382 void
383 ioloop_message_retain_(message_t *message, const char *file, int line)
384 {
385 (void)file; (void)line;
386 RETAIN(message, message);
387 }
388
389 void
390 ioloop_message_release_(message_t *message, const char *file, int line)
391 {
392 (void)file; (void)line;
393 RELEASE(message, message);
394 }
395
396 static bool
397 ioloop_send_message_inner(comm_t *connection, message_t *responding_to,
398 struct iovec *iov, int iov_len, bool final, bool send_length)
399 {
400 dispatch_data_t data = NULL, new_data, combined;
401 int i;
402 uint16_t len = 0;
403
404 #ifdef SRP_TEST_SERVER
405 if (connection->test_send_intercept != NULL) {
406 return connection->test_send_intercept(connection, responding_to, iov, iov_len, final, send_length);
407 }
408 #endif
409
410 // Not needed on OSX because UDP conversations are treated as "connections."
411 #if UDP_LISTENER_USES_CONNECTION_GROUPS
412 (void)responding_to;
413 #else
414 if (!connection->tcp_stream && connection->connection == NULL) {
415 if (connection->io.fd != -1) {
416 return ioloop_udp_send_message(connection, &responding_to->local, &responding_to->src, responding_to->ifindex, iov, iov_len);
417 }
418 return false;
419 }
420 #endif
421
422 if (connection->connection == NULL
423 #if UDP_LISTENER_USES_CONNECTION_GROUPS
424 && connection->content_context == NULL
425 #endif
426 ) {
427 ERROR("no connection");
428 return false;
429 }
430
431 // Create a dispatch_data_t object that contains the data in the iov.
432 for (i = 0; i < iov_len; i++) {
433 new_data = dispatch_data_create(iov[i].iov_base, iov[i].iov_len,
434 ioloop_main_queue, DISPATCH_DATA_DESTRUCTOR_DEFAULT);
435 len += iov[i].iov_len;
436 if (data != NULL) {
437 if (new_data != NULL) {
438 // Subsequent times through
439 combined = dispatch_data_create_concat(data, new_data);
440 dispatch_release(data);
441 dispatch_release(new_data);
442 data = combined;
443 } else {
444 // Fail
445 dispatch_release(data);
446 data = NULL;
447 }
448 } else {
449 // First time through
450 data = new_data;
451 }
452 if (data == NULL) {
453 ERROR("ioloop_send_message: no memory.");
454 return false;
455 }
456 }
457
458 if (len == 0) {
459 if (data) {
460 dispatch_release(data);
461 }
462 ERROR("zero length");
463 return false;
464 }
465
466 // TCP requires a length as well as the payload.
467 if (send_length && connection->tcp_stream) {
468 len = htons(len);
469 new_data = dispatch_data_create(&len, sizeof (len), ioloop_main_queue, DISPATCH_DATA_DESTRUCTOR_DEFAULT);
470 if (new_data == NULL) {
471 if (data != NULL) {
472 dispatch_release(data);
473 }
474 ERROR("no memory for new_data");
475 return false;
476 }
477 // Length is at beginning.
478 combined = dispatch_data_create_concat(new_data, data);
479 dispatch_release(data);
480 dispatch_release(new_data);
481 if (combined == NULL) {
482 ERROR("no memory for combined");
483 return false;
484 }
485 data = combined;
486 }
487
488 if (connection->pending_write != NULL) {
489 ERROR("Dropping pending write on " PRI_S_SRP, connection->name ? connection->name : "<null>");
490 }
491 connection->pending_write = data;
492 connection->final_data = final;
493 if (connection->connection_ready) {
494 return connection_write_now(connection);
495 }
496 return true;
497 }
498
499 bool
500 ioloop_send_message(comm_t *connection, message_t *responding_to, struct iovec *iov, int iov_len)
501 {
502 return ioloop_send_message_inner(connection, responding_to, iov, iov_len, false, true);
503 }
504
505 bool
506 ioloop_send_final_message(comm_t *connection, message_t *responding_to, struct iovec *iov, int iov_len)
507 {
508 return ioloop_send_message_inner(connection, responding_to, iov, iov_len, true, true);
509 }
510
511 bool
512 ioloop_send_data(comm_t *connection, message_t *responding_to, struct iovec *iov, int iov_len)
513 {
514 return ioloop_send_message_inner(connection, responding_to, iov, iov_len, false, false);
515 }
516
517 bool
518 ioloop_send_final_data(comm_t *connection, message_t *responding_to, struct iovec *iov, int iov_len)
519 {
520 return ioloop_send_message_inner(connection, responding_to, iov, iov_len, true, false);
521 }
522
523 #if UDP_LISTENER_USES_CONNECTION_GROUPS
524 // For UDP messages, the context is only going to be used for one reply, so when the reply is sent, call the
525 // disconnected callback.
526 static void
527 ioloop_disconnect_content_context(void *context)
528 {
529 comm_t *connection = context;
530
531 if (connection->disconnected != NULL) {
532 connection->disconnected(connection, connection->context, 0);
533 }
534 RELEASE_HERE(connection, comm);
535 }
536 #endif // UDP_LISTENER_USES_CONNECTION_GROUPS
537
538 static bool
539 connection_write_now(comm_t *connection)
540 {
541 if (false) {
542 #if UDP_LISTENER_USES_CONNECTION_GROUPS
543 } else if (connection->content_context != NULL) {
544 nw_connection_group_reply(connection->listener_state->connection_group, connection->content_context,
545 NW_CONNECTION_DEFAULT_MESSAGE_CONTEXT, connection->pending_write);
546 if (connection->disconnected != NULL) {
547 RETAIN_HERE(connection, comm);
548 ioloop_run_async(ioloop_disconnect_content_context, connection);
549 }
550 #endif
551 } else {
552 // Retain the connection once for each write that's pending, so that it's never finalized while
553 // there's a write in progress.
554 connection->writes_pending++;
555 RETAIN_HERE(connection, comm);
556 nw_connection_send(connection->connection, connection->pending_write,
557 (connection->final_data
558 ? NW_CONNECTION_FINAL_MESSAGE_CONTEXT
559 : NW_CONNECTION_DEFAULT_MESSAGE_CONTEXT), true,
560 ^(nw_error_t _Nullable error) {
561 if (error != NULL) {
562 ERROR("ioloop_send_message: write failed: " PUB_S_SRP,
563 strerror(nw_error_get_error_code(error)));
564 connection_cancel(connection, connection->connection);
565 }
566 if (connection->writes_pending > 0) {
567 connection->writes_pending--;
568 } else {
569 ERROR("ioloop_send_message: write callback reached with no writes marked pending.");
570 }
571 RELEASE_HERE(connection, comm);
572 });
573 }
574 // nw_connection_send should retain this, so let go of our reference to it.
575 dispatch_release(connection->pending_write);
576 connection->pending_write = NULL;
577 return true;
578 }
579
580 static bool
581 datagram_read(comm_t *connection, size_t length, dispatch_data_t content, nw_error_t error)
582 {
583 message_t *message = NULL;
584 bool ret = true, *retp = &ret;
585
586 if (error != NULL) {
587 ERROR(PUB_S_SRP, strerror(nw_error_get_error_code(error)));
588 ret = false;
589 goto out;
590 }
591 if (length > UINT16_MAX) {
592 ERROR("oversized datagram length %zd", length);
593 ret = false;
594 goto out;
595 }
596 message = ioloop_message_create(length);
597 if (message == NULL) {
598 ERROR("unable to allocate message.");
599 ret = false;
600 goto out;
601 }
602 message->length = (uint16_t)length;
603 dispatch_data_apply(content,
604 ^bool (dispatch_data_t __unused region, size_t offset, const void *buffer, size_t size) {
605 if (message->length < offset + size) {
606 ERROR("data region %zd:%zd is out of range for message length %d",
607 offset, size, message->length);
608 *retp = false;
609 return false;
610 }
611 memcpy(((uint8_t *)&message->wire) + offset, buffer, size);
612 return true;
613 });
614 if (ret == true) {
615 // Set the local address
616 message->local = connection->local;
617
618 #ifdef HEXDUMP_INCOMING_DATAGRAMS
619 uint16_t length = message->length > 8192 ? 8192 : message->length; // Don't dump really big messages
620 for (uint16_t i = 0; i < length; i += 32) {
621 char obuf[256];
622 char *obp = obuf;
623 int left = sizeof(obp) - 1;
624 uint16_t max = message->length - i;
625 if (max > 32) {
626 max = 32;
627 }
628 for (uint16_t j = 0; j < max && left > 0; j += 8) {
629 uint16_t submax = max - j;
630 if (submax > 8) {
631 submax = 8;
632 }
633 for (uint16_t k = 0; k < submax; k++) {
634 snprintf(obp, left, "%02x", ((uint8_t *)&message->wire)[i + j + k]);
635 obp += 2;
636 *obp++ = ' ';
637 left -= 3;
638 }
639 *obp++ = ' ';
640 left--;
641 }
642 *obp = 0;
643 INFO("%03d " PUB_S_SRP, i, obuf);
644 }
645 #endif
646 // Process the message.
647 if (connection->listener_state != NULL) {
648 connection->listener_state->datagram_callback(connection, message, connection->listener_state->context);
649 } else {
650 connection->datagram_callback(connection, message, connection->context);
651 }
652 }
653
654 out:
655 if (message != NULL) {
656 ioloop_message_release(message);
657 }
658 if (!ret && connection->connection != NULL) {
659 connection_cancel(connection, connection->connection);
660 }
661 return ret;
662 }
663
664 static void
665 connection_error_to_string(nw_error_t error, char *errbuf, size_t errbuf_size)
666 {
667 CFErrorRef cfe = NULL;
668 CFStringRef errString = NULL;
669 errbuf[0] = 0;
670 if (error != NULL) {
671 cfe = nw_error_copy_cf_error(error);
672 if (cfe != NULL) {
673 errString = CFErrorCopyDescription(cfe);
674 if (errString != NULL) {
675 CFStringGetCString(errString, errbuf, errbuf_size, kCFStringEncodingUTF8);
676 CFRelease(errString);
677 }
678 CFRelease(cfe);
679 }
680 }
681 if (errbuf[0] == 0) {
682 memcpy(errbuf, "<NULL>", 7);
683 }
684 }
685
686 static bool
687 check_fail(comm_t *connection, size_t length, dispatch_data_t content, nw_error_t error, const char *source)
688 {
689 bool fail = false;
690 INFO(PRI_S_SRP ": length %zd, content %p, content_length %ld, error %p, source %s",
691 connection->name, length, content, content == NULL ? -1 : (long)dispatch_data_get_size(content), error, source);
692 if (error != NULL) {
693 fail = true;
694 } else if (connection->connection == NULL) {
695 fail = true;
696 } else if (content == NULL) {
697 ERROR("no content returned in " PUB_S_SRP ": connection must have dropped unexpectedly for " PRI_S_SRP,
698 source, connection->name);
699 fail = true;
700 } else if (dispatch_data_get_size(content) != length) {
701 ERROR("short content returned in " PUB_S_SRP ": %zd != %zd: connection must have dropped unexpectedly for " PRI_S_SRP,
702 source, length, dispatch_data_get_size(content), connection->name);
703 fail = true;
704 }
705 if (fail) {
706 if (connection->connection != NULL) {
707 connection_cancel(connection, connection->connection);
708 }
709 }
710 return fail;
711 }
712
713 static void
714 tcp_read(comm_t *connection, size_t length, dispatch_data_t content, nw_error_t error)
715 {
716 if (check_fail(connection, length, content, error, "tcp_read")) {
717 return;
718 }
719 if (datagram_read(connection, length, content, error)) {
720 // Wait for the next frame
721 ioloop_tcp_input_start(connection);
722 }
723 }
724
725 static void
726 tcp_read_length(comm_t *connection, dispatch_data_t content, nw_error_t error)
727 {
728 size_t length;
729 uint32_t bytes_to_read;
730 const uint8_t *lenbuf;
731 dispatch_data_t map;
732
733 if (check_fail(connection, 2, content, error, "tcp_read_length")) {
734 return;
735 }
736
737 map = dispatch_data_create_map(content, (const void **)&lenbuf, &length);
738 if (map == NULL) {
739 ERROR("tcp_read_length: map create failed");
740 connection_cancel(connection, connection->connection);
741 return;
742 }
743 dispatch_release(map);
744 bytes_to_read = ((unsigned)(lenbuf[0]) << 8) | ((unsigned)lenbuf[1]);
745 RETAIN_HERE(connection, comm);
746 nw_connection_receive(connection->connection, bytes_to_read, bytes_to_read,
747 ^(dispatch_data_t new_content, nw_content_context_t __unused new_context,
748 bool __unused is_complete, nw_error_t new_error) {
749 if (new_error) {
750 char errbuf[512];
751 connection_error_to_string(new_error, errbuf, sizeof(errbuf));
752 INFO("%p: " PUB_S_SRP, connection, errbuf);
753 goto out;
754 }
755 tcp_read(connection, bytes_to_read, new_content, new_error);
756 out:
757 RELEASE_HERE(connection, comm);
758 });
759 }
760
761 static bool
762 ioloop_connection_input_badness_check(comm_t *connection, dispatch_data_t content, bool is_complete, nw_error_t error)
763 {
764 if (error) {
765 char errbuf[512];
766 connection_error_to_string(error, errbuf, sizeof(errbuf));
767 INFO("%p: " PUB_S_SRP, connection, errbuf);
768 return true;
769 }
770
771 // For TCP connections, is_complete means the other end closed the connection.
772 if (connection->tcp_stream && is_complete) {
773 INFO("remote end closed connection.");
774 connection_cancel(connection, connection->connection);
775 return true;
776 }
777
778 if (content == NULL) {
779 INFO("remote end closed connection.");
780 connection_cancel(connection, connection->connection);
781 return true;
782 }
783 return false;
784 }
785
786 static void
787 ioloop_tcp_input_start(comm_t *connection)
788 {
789 if (connection->connection == NULL) {
790 return;
791 }
792
793 RETAIN_HERE(connection, comm); // nw_connection_receive callback retains connection
794 nw_connection_receive(connection->connection, 2, 2,
795 ^(dispatch_data_t content, nw_content_context_t __unused context,
796 bool is_complete, nw_error_t error) {
797 if (!ioloop_connection_input_badness_check(connection, content, is_complete, error)) {
798 tcp_read_length(connection, content, error);
799 }
800 RELEASE_HERE(connection, comm);
801 });
802 }
803
804 static void
805 ioloop_udp_input_start(comm_t *connection)
806 {
807 RETAIN_HERE(connection, comm); // nw_connection_receive callback retains connection
808 nw_connection_receive_message(connection->connection,
809 ^(dispatch_data_t content, nw_content_context_t __unused context,
810 bool __unused is_complete, nw_error_t error) {
811 if (!ioloop_connection_input_badness_check(connection, content, is_complete, error)) {
812 if (datagram_read(connection, dispatch_data_get_size(content), content, error)) {
813 ioloop_udp_input_start(connection);
814 }
815 }
816 RELEASE_HERE(connection, comm);
817 });
818 }
819
820 static void
821 ioloop_connection_state_changed(comm_t *connection, nw_connection_state_t state, nw_error_t error)
822 {
823 char errbuf[512];
824 connection_error_to_string(error, errbuf, sizeof(errbuf));
825
826 if (state == nw_connection_state_ready) {
827 if (connection->server) {
828 if (!ioloop_listener_connection_ready(connection)) {
829 ioloop_comm_cancel(connection);
830 return;
831 }
832 }
833 INFO(PRI_S_SRP " (%p %p) state is ready; error = " PUB_S_SRP,
834 connection->name != NULL ? connection->name : "<no name>", connection, connection->connection, errbuf);
835 // Set up a reader.
836 if (connection->tcp_stream) {
837 ioloop_tcp_input_start(connection);
838 } else {
839 ioloop_udp_input_start(connection);
840 }
841 connection->connection_ready = true;
842 // If there's a write pending, send it now.
843 if (connection->pending_write) {
844 connection_write_now(connection);
845 }
846 if (connection->connected != NULL) {
847 connection->connected(connection, connection->context);
848 }
849 } else if (state == nw_connection_state_failed || state == nw_connection_state_waiting) {
850 // Waiting is equivalent to failed because we are not giving libnetcore enough information to
851 // actually succeed when there is a problem connecting (e.g. "EHOSTUNREACH").
852 INFO(PRI_S_SRP " (%p %p) state is " PUB_S_SRP "; error = " PUB_S_SRP,
853 connection->name != NULL ? connection->name : "<no name>", connection, connection->connection,
854 state == nw_connection_state_failed ? "failed" : "waiting", errbuf);
855 connection_cancel(connection, connection->connection);
856 } else if (state == nw_connection_state_cancelled) {
857 INFO(PRI_S_SRP " (%p %p) state is canceled; error = " PUB_S_SRP,
858 connection->name != NULL ? connection->name : "<no name>", connection, connection->connection, errbuf);
859 if (connection->disconnected != NULL) {
860 connection->disconnected(connection, connection->context, 0);
861 }
862 // This releases the final reference to the connection object, which was held by the nw_connection_t.
863 RELEASE_HERE(connection, comm);
864 } else {
865 if (error != NULL) {
866 // We can get here if e.g. the TLS handshake fails.
867 connection_cancel(connection, connection->connection);
868 }
869 INFO(PRI_S_SRP " (%p %p) state is %d; error = " PUB_S_SRP,
870 connection->name != NULL ? connection->name : "<no name>", connection, connection->connection, state, errbuf);
871 }
872 }
873
874 static void
875 ioloop_connection_get_address_from_endpoint(addr_t *addr, nw_endpoint_t endpoint)
876 {
877 nw_endpoint_type_t endpoint_type = nw_endpoint_get_type(endpoint);
878 if (endpoint_type == nw_endpoint_type_address) {
879 char *address_string = nw_endpoint_copy_address_string(endpoint);
880 if (address_string == NULL) {
881 ERROR("unable to get description of new connection.");
882 } else {
883 getipaddr(addr, address_string);
884 if (addr->sa.sa_family == AF_INET6) {
885 SEGMENTED_IPv6_ADDR_GEN_SRP(&addr->sin6.sin6_addr, rdata_buf);
886 INFO("parsed connection local IPv6 address is: " PRI_SEGMENTED_IPv6_ADDR_SRP,
887 SEGMENTED_IPv6_ADDR_PARAM_SRP(&addr->sin6.sin6_addr, rdata_buf));
888 } else {
889 IPv4_ADDR_GEN_SRP(&addr->sin.sin_addr, rdata_buf);
890 INFO("parsed connection local IPv4 address is: " PRI_IPv4_ADDR_SRP,
891 IPv4_ADDR_PARAM_SRP(&addr->sin.sin_addr, rdata_buf));
892 }
893 }
894 free(address_string);
895 }
896 }
897
898 static void
899 ioloop_connection_set_name_from_endpoint(comm_t *connection, nw_endpoint_t endpoint)
900 {
901 nw_endpoint_type_t endpoint_type = nw_endpoint_get_type(endpoint);
902 if (endpoint_type == nw_endpoint_type_address) {
903 char *port_string = nw_endpoint_copy_port_string(endpoint);
904 char *address_string = nw_endpoint_copy_address_string(endpoint);
905 if (port_string == NULL || address_string == NULL) {
906 ERROR("Unable to get description of new connection.");
907 } else {
908 const char *listener_name = connection->name == NULL ? "bogus" : connection->name;
909 char *free_name = connection->name;
910 connection->name = NULL;
911 asprintf(&connection->name, "%s connection from %s/%s", listener_name, address_string, port_string);
912 if (free_name != NULL) {
913 free(free_name);
914 free_name = NULL;
915 listener_name = NULL;
916 }
917 getipaddr(&connection->address, address_string);
918 if (connection->address.sa.sa_family == AF_INET6) {
919 SEGMENTED_IPv6_ADDR_GEN_SRP(&connection->address.sin6.sin6_addr, rdata_buf);
920 INFO("parsed connection remote IPv6 address is: " PRI_SEGMENTED_IPv6_ADDR_SRP,
921 SEGMENTED_IPv6_ADDR_PARAM_SRP(&connection->address.sin6.sin6_addr, rdata_buf));
922 } else {
923 IPv4_ADDR_GEN_SRP(&connection->address.sin.sin_addr, rdata_buf);
924 INFO("parsed connection remote IPv4 address is: " PRI_IPv4_ADDR_SRP,
925 IPv4_ADDR_PARAM_SRP(&connection->address.sin.sin_addr, rdata_buf));
926 }
927 }
928 free(port_string);
929 free(address_string);
930 } else {
931 if (connection->name == NULL) {
932 connection->name = nw_connection_copy_description(connection->connection);
933 }
934 ERROR("incoming connection " PRI_S_SRP " is of unexpected type %d", connection->name, endpoint_type);
935 }
936 }
937
938 #if UDP_LISTENER_USES_CONNECTION_GROUPS
939 static void
940 ioloop_udp_receive(comm_t *listener, dispatch_data_t content, nw_content_context_t context, bool UNUSED is_complete)
941 {
942 bool proceed = true;
943
944 if (content != NULL) {
945 comm_t *response_state = calloc(1, sizeof (*response_state));
946 if (response_state == NULL) {
947 ERROR("%p: " PRI_S_SRP ": no memory for response state.", listener, listener->name);
948 return;
949 }
950 response_state->serial = ++cur_connection_serial;
951 RETAIN_HERE(response_state, comm);
952 response_state->listener_state = listener;
953 RETAIN_HERE(response_state->listener_state, listener);
954 response_state->datagram_callback = listener->datagram_callback;
955 response_state->content_context = context;
956 nw_retain(response_state->content_context);
957 response_state->connection_ready = true;
958 const char *identifier = nw_content_context_get_identifier(context);
959 response_state->name = strdup(identifier);
960 proceed = datagram_read(response_state, dispatch_data_get_size(content), content, NULL);
961 RELEASE_HERE(response_state, comm);
962 }
963 }
964 #else
965 #endif
966
967
968 static bool
969 ioloop_listener_connection_ready(comm_t *connection)
970 {
971
972 nw_endpoint_t endpoint = nw_connection_copy_endpoint(connection->connection);
973 if (endpoint != NULL) {
974 ioloop_connection_set_name_from_endpoint(connection, endpoint);
975 nw_release(endpoint);
976 }
977 if (connection->name != NULL) {
978 INFO("Received connection from " PRI_S_SRP, connection->name);
979 } else {
980 ERROR("Unable to get description of new connection.");
981 connection->name = strdup("unidentified");
982 }
983
984 // Best effort
985 nw_endpoint_t local_endpoint = nw_connection_copy_connected_local_endpoint(connection->connection);
986 if (local_endpoint != NULL) {
987 ioloop_connection_get_address_from_endpoint(&connection->local, endpoint);
988 nw_release(local_endpoint);
989 }
990
991 if (connection->connected != NULL) {
992 connection->connected(connection, connection->context);
993 }
994 return true;
995 }
996
997 static void
998 ioloop_listener_connection_callback(comm_t *listener, nw_connection_t new_connection)
999 {
1000 nw_connection_set_queue(new_connection, ioloop_main_queue);
1001 nw_connection_start(new_connection);
1002
1003 comm_t *connection = calloc(1, sizeof *connection);
1004 if (connection == NULL) {
1005 ERROR("Unable to receive connection: no memory.");
1006 nw_connection_cancel(new_connection);
1007 return;
1008 }
1009 connection->serial = ++cur_connection_serial;
1010
1011 connection->connection = new_connection;
1012 nw_retain(connection->connection);
1013 nw_connection_created++;
1014
1015 connection->name = strdup(listener->name);
1016 connection->datagram_callback = listener->datagram_callback;
1017 connection->tcp_stream = listener->tcp_stream;
1018 connection->server = true;
1019 connection->context = listener->context;
1020 connection->connected = listener->connected;
1021 RETAIN_HERE(connection, comm); // The connection state changed handler has a reference to the connection.
1022 nw_connection_set_state_changed_handler(connection->connection,
1023 ^(nw_connection_state_t state, nw_error_t error)
1024 { ioloop_connection_state_changed(connection, state, error); });
1025 INFO("started " PRI_S_SRP, connection->name);
1026 }
1027
1028 static void
1029 listener_finalize(comm_t *listener)
1030 {
1031 if (listener->listener != NULL) {
1032 nw_release(listener->listener);
1033 nw_listener_finalized++;
1034 listener->listener = NULL;
1035 }
1036 #if UDP_LISTENER_USES_CONNECTION_GROUPS
1037 if (listener->connection_group) {
1038 nw_release(listener->connection_group);
1039 listener->connection_group = NULL;
1040 }
1041 #endif
1042 if (listener->name != NULL) {
1043 free(listener->name);
1044 }
1045 if (listener->parameters) {
1046 nw_release(listener->parameters);
1047 }
1048 if (listener->avoid_ports != NULL) {
1049 free(listener->avoid_ports);
1050 }
1051 if (listener->finalize) {
1052 listener->finalize(listener->context);
1053 }
1054 free(listener);
1055 }
1056
// Take a reference on a listener by applying the RETAIN macro to it.
// NOTE(review): file and line are not referenced directly in this body; presumably the RETAIN macro
// consumes them to record the call site -- confirm against the macro definition.
void
ioloop_listener_retain_(comm_t *listener, const char *file, int line)
{
    RETAIN(listener, listener);
}
1062
// Drop a reference on a listener by applying the RELEASE macro to it.
// NOTE(review): file and line are not referenced directly in this body; presumably the RELEASE macro
// consumes them to record the call site, and finalizes the listener when the last reference goes away --
// confirm against the macro definition.
void
ioloop_listener_release_(comm_t *listener, const char *file, int line)
{
    RELEASE(listener, listener);
}
1068
// Context-release hook with a void * signature: treats the context as a listener and drops one
// reference on it.
static void ioloop_listener_context_release(void *context)
{
    comm_t *listener = context;
    RELEASE_HERE(listener, listener);
}
1074
1075 void
1076 ioloop_listener_cancel(comm_t *connection)
1077 {
1078 // Only need to do it once.
1079 if (connection->canceled) {
1080 FAULT("cancel on canceled connection " PRI_S_SRP, connection->name);
1081 return;
1082 }
1083 connection->canceled = true;
1084 if (connection->listener != NULL) {
1085 nw_listener_cancel(connection->listener);
1086 // connection->listener will be released in ioloop_listener_state_changed_handler: nw_listener_state_cancelled.
1087 }
1088 #if UDP_LISTENER_USES_CONNECTION_GROUPS
1089 if (connection->connection_group != NULL) {
1090 INFO("%p %p", connection, connection->connection_group);
1091 nw_connection_group_cancel(connection->connection_group);
1092 }
1093 #else
1094 if (!connection->tcp_stream && connection->connection == NULL) {
1095 int fd = connection->io.fd;
1096 if (fd != -1) {
1097 ioloop_close(&connection->io);
1098 if (connection->cancel != NULL) {
1099 RETAIN_HERE(connection, listener);
1100 dispatch_async(ioloop_main_queue, ^{
1101 if (connection->cancel != NULL) {
1102 connection->cancel(connection, connection->context);
1103 }
1104 RELEASE_HERE(connection, listener);
1105 });
1106 }
1107 }
1108 }
1109 #endif
1110 }
1111
1112 #if UDP_LISTENER_USES_CONNECTION_GROUPS
1113 static bool ioloop_udp_listener_setup(comm_t *listener);
1114
1115 static void
1116 ioloop_udp_listener_state_changed_handler(comm_t *listener, nw_connection_group_state_t state, nw_error_t error)
1117 {
1118 int i;
1119
1120 #ifdef DEBUG_VERBOSE
1121 if (listener->connection_group == NULL) {
1122 if (state == nw_listener_state_cancelled) {
1123 INFO("nw_connection_group gets released before the final nw_connection_group_state_cancelled event - name: " PRI_S_SRP,
1124 listener->name);
1125 } else {
1126 ERROR("nw_connection_group gets released before the connection_group is canceled - name: " PRI_S_SRP ", state: %d",
1127 listener->name, state);
1128 }
1129 }
1130 #endif // DEBUG_VERBOSE
1131
1132 // Should never happen.
1133 if (listener->connection_group == NULL && state != nw_connection_group_state_cancelled) {
1134 return;
1135 }
1136
1137 if (error != NULL) {
1138 char errbuf[512];
1139 connection_error_to_string(error, errbuf, sizeof(errbuf));
1140 INFO("state changed: " PUB_S_SRP, errbuf);
1141 if (listener->connection_group != NULL) {
1142 nw_connection_group_cancel(listener->connection_group);
1143 }
1144 } else {
1145 if (state == nw_connection_group_state_waiting) {
1146 INFO("waiting");
1147 return;
1148 } else if (state == nw_connection_group_state_failed) {
1149 INFO("failed");
1150 nw_connection_group_cancel(listener->connection_group);
1151 } else if (state == nw_connection_group_state_ready) {
1152 // It's possible that we might schedule the ready event but then before we return to the run loop
1153 // the listener gets canceled, in which case we don't want to deliver the ready event.
1154 if (listener->canceled) {
1155 INFO("ready but canceled");
1156 return;
1157 }
1158 INFO("ready");
1159 if (listener->avoiding) {
1160 listener->listen_port = nw_connection_group_get_port(listener->connection_group);
1161 if (listener->avoid_ports != NULL) {
1162 for (i = 0; i < listener->num_avoid_ports; i++) {
1163 if (listener->avoid_ports[i] == listener->listen_port) {
1164 INFO("Got port %d, which we are avoiding.",
1165 listener->listen_port);
1166 listener->avoiding = true;
1167 listener->listen_port = 0;
1168 nw_connection_group_cancel(listener->connection_group);
1169 return;
1170 }
1171 }
1172 }
1173 INFO("Got port %d.", listener->listen_port);
1174 listener->avoiding = false;
1175 if (listener->ready) {
1176 listener->ready(listener->context, listener->listen_port);
1177 }
1178 }
1179 } else if (state == nw_connection_group_state_cancelled) {
1180 INFO("cancelled");
1181 nw_release(listener->connection_group);
1182 nw_listener_finalized++;
1183 listener->connection_group = NULL;
1184 if (listener->avoiding) {
1185 if (!ioloop_udp_listener_setup(listener)) {
1186 ERROR("ioloop_listener_state_changed_handler: Unable to recreate listener.");
1187 goto cancel;
1188 } else {
1189 nw_listener_created++;
1190 }
1191 } else {
1192 ;
1193 cancel:
1194 if (listener->cancel) {
1195 listener->cancel(listener, listener->context);
1196 }
1197 RELEASE_HERE(listener, listener);
1198 }
1199 }
1200 }
1201 }
1202 #endif // UDP_LISTENER_USES_CONNECTION_GROUPS
1203
1204 static void
1205 ioloop_listener_state_changed_handler(comm_t *listener, nw_listener_state_t state, nw_error_t error)
1206 {
1207 #ifdef DEBUG_VERBOSE
1208 if (listener->listener == NULL) {
1209 if (state == nw_listener_state_cancelled) {
1210 INFO("nw_listener gets released before the final nw_listener_state_cancelled event - name: " PRI_S_SRP,
1211 listener->name);
1212 } else {
1213 ERROR("nw_listener gets released before the listener is canceled - name: " PRI_S_SRP ", state: %d",
1214 listener->name, state);
1215 }
1216 }
1217 #endif // DEBUG_VERBOSE
1218
1219 INFO("%p %p " PUB_S_SRP " %d", listener, listener->listener, listener->name, state);
1220
1221 // Should never happen.
1222 if (listener->listener == NULL && state != nw_listener_state_cancelled) {
1223 return;
1224 }
1225
1226 if (error != NULL) {
1227 char errbuf[512];
1228 connection_error_to_string(error, errbuf, sizeof(errbuf));
1229 INFO("state changed: " PUB_S_SRP, errbuf);
1230 if (listener->listener != NULL) {
1231 nw_listener_cancel(listener->listener);
1232 }
1233 } else {
1234 if (state == nw_listener_state_waiting) {
1235 INFO("waiting");
1236 return;
1237 } else if (state == nw_listener_state_failed) {
1238 INFO("failed");
1239 nw_listener_cancel(listener->listener);
1240 } else if (state == nw_listener_state_ready) {
1241 INFO("ready");
1242 if (listener->ready != NULL) {
1243 listener->ready(listener->context, listener->listen_port);
1244 }
1245 } else if (state == nw_listener_state_cancelled) {
1246 INFO("cancelled");
1247 nw_release(listener->listener);
1248 nw_listener_finalized++;
1249 listener->listener = NULL;
1250 if (listener->cancel != NULL) {
1251 listener->cancel(listener, listener->context);
1252 }
1253 RELEASE_HERE(listener, listener); // Release the nw_listener handler function's reference to the ioloop listener object.
1254 } else {
1255 INFO("something else");
1256 }
1257 }
1258 }
1259
1260 #if UDP_LISTENER_USES_CONNECTION_GROUPS
1261 static bool
1262 ioloop_udp_listener_setup(comm_t *listener)
1263 {
1264 listener->connection_group = nw_connection_group_create_with_parameters(listener->parameters);
1265 if (listener->connection_group == NULL) {
1266 return false;
1267 }
1268 nw_connection_group_set_state_changed_handler(listener->connection_group,
1269 ^(nw_connection_group_state_t state, nw_error_t error) {
1270 ioloop_udp_listener_state_changed_handler(listener, state, error);
1271 });
1272 nw_connection_group_set_receive_handler(listener->connection_group, DNS_MAX_UDP_PAYLOAD, true,
1273 ^(dispatch_data_t _Nullable content,
1274 nw_content_context_t _Nonnull receive_context, bool is_complete) {
1275 ioloop_udp_receive(listener, content, receive_context, is_complete);
1276 });
1277 RETAIN_HERE(listener, listener); // For the handlers.
1278
1279 // Start the connection group listener
1280 nw_connection_group_set_queue(listener->connection_group, ioloop_main_queue);
1281 nw_connection_group_start(listener->connection_group);
1282 return true;
1283 }
1284 #else
1285 static comm_t *
1286 ioloop_udp_listener_setup(comm_t *listener, const addr_t *ip_address, uint16_t port, const char *launchd_name, int ifindex)
1287 {
1288 sa_family_t family = (ip_address != NULL) ? ip_address->sa.sa_family : AF_UNSPEC;
1289 sa_family_t real_family = family == AF_UNSPEC ? AF_INET6 : family;
1290 int true_flag = 1;
1291 addr_t sockname;
1292 socklen_t sl;
1293 int rv;
1294
1295 listener->address.sa.sa_family = real_family;
1296 listener->address.sa.sa_len = (real_family == AF_INET
1297 ? sizeof(listener->address.sin)
1298 : sizeof(listener->address.sin6));
1299 if (real_family == AF_INET6) {
1300 listener->address.sin6.sin6_port = htons(port);
1301 } else {
1302 listener->address.sin.sin_port = htons(port);
1303 }
1304
1305 listener->io.fd = -1;
1306 #ifndef SRP_TEST_SERVER
1307 if (launchd_name != NULL) {
1308 int *fds;
1309 size_t cnt;
1310 int ret = launch_activate_socket(launchd_name, &fds, &cnt);
1311 if (ret != 0) {
1312 FAULT("launchd_activate_socket failed for " PUB_S_SRP ": " PUB_S_SRP, launchd_name, strerror(ret));
1313 listener->io.fd = -1;
1314 } else if (cnt == 0) {
1315 FAULT("too few sockets returned from launchd_active_socket for " PUB_S_SRP" : %zd", launchd_name, cnt);
1316 listener->io.fd = -1;
1317 } else if (cnt != 1) {
1318 FAULT("too many sockets returned from launchd_active_socket for " PUB_S_SRP" : %zd", launchd_name, cnt);
1319 for (size_t i = 0; i < cnt; i++) {
1320 close(fds[i]);
1321 }
1322 free(fds);
1323 } else {
1324 listener->io.fd = fds[0];
1325 free(fds);
1326 }
1327 }
1328 #endif
1329 if (listener->io.fd == -1) {
1330 listener->io.fd = socket(real_family, SOCK_DGRAM, IPPROTO_UDP);
1331 if (listener->io.fd < 0) {
1332 ERROR("Can't get socket: %s", strerror(errno));
1333 goto out;
1334 }
1335 rv = setsockopt(listener->io.fd, SOL_SOCKET, SO_REUSEADDR, &true_flag, sizeof true_flag);
1336 if (rv < 0) {
1337 ERROR("SO_REUSEADDR failed: %s", strerror(errno));
1338 goto out;
1339 }
1340
1341 rv = setsockopt(listener->io.fd, SOL_SOCKET, SO_REUSEPORT, &true_flag, sizeof true_flag);
1342 if (rv < 0) {
1343 ERROR("SO_REUSEPORT failed: %s", strerror(errno));
1344 goto out;
1345 }
1346
1347 // shift the DSCP value to the left by 2 bits to make the 8-bit field
1348 int dscp = DSCP_CS5 << 2;
1349 if (real_family == AF_INET6) {
1350 // IPV6_TCLASS.
1351 rv = setsockopt(listener->io.fd, IPPROTO_IPV6, IPV6_TCLASS, &dscp, sizeof(dscp));
1352 if (rv < 0) {
1353 ERROR("IPV6_TCLASS failed: %s", strerror(errno));
1354 goto out;
1355 }
1356 } else {
1357 // IP_TOS
1358 rv = setsockopt(listener->io.fd, IPPROTO_IP, IP_TOS, &dscp, sizeof(dscp));
1359 if (rv < 0) {
1360 ERROR("IP_TOS failed: %s", strerror(errno));
1361 goto out;
1362 }
1363 }
1364 // skipping multicast support for now
1365
1366 if (family == AF_INET6) {
1367 // Don't use a dual-stack socket.
1368 rv = setsockopt(listener->io.fd, IPPROTO_IPV6, IPV6_V6ONLY, &true_flag, sizeof true_flag);
1369 if (rv < 0) {
1370 SEGMENTED_IPv6_ADDR_GEN_SRP(listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf);
1371 ERROR("Unable to set IPv6-only flag on UDP socket for " PRI_SEGMENTED_IPv6_ADDR_SRP,
1372 SEGMENTED_IPv6_ADDR_PARAM_SRP(listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf));
1373 goto out;
1374 }
1375 SEGMENTED_IPv6_ADDR_GEN_SRP(listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf);
1376 ERROR("Successfully set IPv6-only flag on UDP socket for " PRI_SEGMENTED_IPv6_ADDR_SRP,
1377 SEGMENTED_IPv6_ADDR_PARAM_SRP(listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf));
1378 }
1379
1380 sl = listener->address.sa.sa_len;
1381 if (bind(listener->io.fd, &listener->address.sa, sl) < 0) {
1382 if (family == AF_INET) {
1383 IPv4_ADDR_GEN_SRP(&listener->address.sin.sin_addr.s_addr, ipv4_addr_buf);
1384 ERROR("Can't bind to " PRI_IPv4_ADDR_SRP "#%d: %s",
1385 IPv4_ADDR_PARAM_SRP(&listener->address.sin.sin_addr.s_addr, ipv4_addr_buf), ntohs(port),
1386 strerror(errno));
1387 } else {
1388 SEGMENTED_IPv6_ADDR_GEN_SRP(&listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf);
1389 ERROR("Can't bind to " PRI_SEGMENTED_IPv6_ADDR_SRP "#%d: %s",
1390 SEGMENTED_IPv6_ADDR_PARAM_SRP(&listener->address.sin6.sin6_addr.s6_addr, ipv6_addr_buf), ntohs(port),
1391 strerror(errno));
1392 }
1393 out:
1394 close(listener->io.fd);
1395 listener->io.fd = -1;
1396 RELEASE_HERE(listener, listener);
1397 return NULL;
1398 }
1399 }
1400
1401 if (fcntl(listener->io.fd, F_SETFL, O_NONBLOCK) < 0) {
1402 ERROR("%s: Can't set O_NONBLOCK: %s", listener->name, strerror(errno));
1403 goto out;
1404 }
1405
1406 // We may have bound to an unspecified port, so fetch the port we got. Or we may have got the port from
1407 // launchd, in which case let's make sure we got the right port.
1408 if (launchd_name != NULL || port == 0) {
1409 sl = sizeof(sockname);
1410 if (getsockname(listener->io.fd, (struct sockaddr *)&sockname, &sl) < 0) {
1411 ERROR("getsockname: %s", strerror(errno));
1412 goto out;
1413 }
1414 listener->listen_port = ntohs(real_family == AF_INET6 ? sockname.sin6.sin6_port : sockname.sin.sin_port);
1415 if (launchd_name != NULL && listener->listen_port != port) {
1416 ERROR("launchd port mismatch: %u %u", port, listener->listen_port);
1417 }
1418 } else {
1419 listener->listen_port = port;
1420 }
1421 INFO("port is %d", listener->listen_port);
1422
1423 if (ifindex != 0) {
1424 setsockopt(listener->io.fd, IPPROTO_IP, IP_BOUND_IF, &ifindex, sizeof(ifindex));
1425 setsockopt(listener->io.fd, IPPROTO_IPV6, IPV6_BOUND_IF, &ifindex, sizeof(ifindex));
1426 }
1427 rv = setsockopt(listener->io.fd, family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6,
1428 family == AF_INET ? IP_PKTINFO : IPV6_RECVPKTINFO, &true_flag, sizeof true_flag);
1429 if (rv < 0) {
1430 ERROR("Can't set %s: %s.", family == AF_INET ? "IP_PKTINFO" : "IPV6_RECVPKTINFO",
1431 strerror(errno));
1432 goto out;
1433 }
1434 ioloop_add_reader(&listener->io, ioloop_udp_read_callback);
1435 RETAIN_HERE(listener, listener); // For the reader
1436 listener->io.context = listener;
1437 listener->io.is_static = true;
1438 listener->io.context_release = ioloop_listener_context_release;
1439
1440 // If there's a ready callback, call it.
1441 if (listener->ready != NULL) {
1442 RETAIN_HERE(listener, listener); // For the ready callback
1443 dispatch_async(ioloop_main_queue, ^{
1444 // It's possible that we might schedule the ready event but then before we return to the run loop
1445 // the listener gets canceled, in which case we don't want to deliver the ready event.
1446 if (listener->canceled) {
1447 INFO("ready but canceled");
1448 } else {
1449 if (listener->ready != NULL) {
1450 listener->ready(listener->context, listener->listen_port);
1451 }
1452 }
1453 RELEASE_HERE(listener, listener);
1454 });
1455 }
1456 return listener;
1457 }
1458 #endif // UDP_LISTENER_USES_CONNECTION_GROUPS
1459
1460 comm_t *
1461 ioloop_listener_create(bool stream, bool tls, bool launchd, uint16_t *avoid_ports, int num_avoid_ports,
1462 const addr_t *ip_address, const char *multicast, const char *name,
1463 datagram_callback_t datagram_callback, connect_callback_t connected, cancel_callback_t cancel,
1464 ready_callback_t ready, finalize_callback_t finalize, tls_config_callback_t tls_config,
1465 unsigned ifindex, void *context)
1466 {
1467 comm_t *listener;
1468 int family = (ip_address != NULL) ? ip_address->sa.sa_family : AF_UNSPEC;
1469 uint16_t port;
1470 char portbuf[10];
1471 nw_endpoint_t endpoint;
1472
1473 if (ip_address == NULL) {
1474 port = 0;
1475 } else {
1476 port = (family == AF_INET) ? ntohs(ip_address->sin.sin_port) : ntohs(ip_address->sin6.sin6_port);
1477 }
1478
1479 if (multicast != NULL) {
1480 ERROR("ioloop_setup_listener: multicast not supported.");
1481 return NULL;
1482 }
1483
1484 if (datagram_callback == NULL) {
1485 ERROR("ioloop_setup: no datagram callback provided.");
1486 return NULL;
1487 }
1488
1489 snprintf(portbuf, sizeof(portbuf), "%d", port);
1490 listener = calloc(1, sizeof(*listener));
1491 if (listener == NULL) {
1492 if (ip_address == NULL) {
1493 ERROR("No memory for listener on <NULL>#%d", port);
1494 } else if (family == AF_INET) {
1495 IPv4_ADDR_GEN_SRP(&ip_address->sin.sin_addr.s_addr, ipv4_addr_buf);
1496 ERROR("No memory for listener on " PRI_IPv4_ADDR_SRP "#%d",
1497 IPv4_ADDR_PARAM_SRP(&ip_address->sin.sin_addr.s_addr, ipv4_addr_buf), port);
1498 } else if (family == AF_INET6) {
1499 SEGMENTED_IPv6_ADDR_GEN_SRP(ip_address->sin6.sin6_addr.s6_addr, ipv6_addr_buf);
1500 ERROR("No memory for listener on " PRI_SEGMENTED_IPv6_ADDR_SRP "#%d",
1501 SEGMENTED_IPv6_ADDR_PARAM_SRP(ip_address->sin6.sin6_addr.s6_addr, ipv6_addr_buf), port);
1502 } else {
1503 ERROR("No memory for listener on <family address other than AF_INET or AF_INET6: %d>#%d", family, port);
1504 }
1505 return NULL;
1506 }
1507 listener->serial = ++cur_connection_serial;
1508 if (avoid_ports != NULL) {
1509 listener->avoid_ports = malloc(num_avoid_ports * sizeof(uint16_t));
1510 if (listener->avoid_ports == NULL) {
1511 if (ip_address == NULL) {
1512 ERROR("No memory for listener avoid_ports on <NULL>#%d", port);
1513 } else if (family == AF_INET) {
1514 IPv4_ADDR_GEN_SRP(&ip_address->sin.sin_addr.s_addr, ipv4_addr_buf);
1515 ERROR("No memory for listener avoid_ports on " PRI_IPv4_ADDR_SRP "#%d",
1516 IPv4_ADDR_PARAM_SRP(&ip_address->sin.sin_addr.s_addr, ipv4_addr_buf), port);
1517 } else if (family == AF_INET6) {
1518 SEGMENTED_IPv6_ADDR_GEN_SRP(ip_address->sin6.sin6_addr.s6_addr, ipv6_addr_buf);
1519 ERROR("No memory for listener avoid_ports on " PRI_SEGMENTED_IPv6_ADDR_SRP "#%d",
1520 SEGMENTED_IPv6_ADDR_PARAM_SRP(ip_address->sin6.sin6_addr.s6_addr, ipv6_addr_buf), port);
1521 } else {
1522 ERROR("No memory for listener avoid_ports on <family address other than AF_INET or AF_INET6: %d>#%d",
1523 family, port);
1524 }
1525 free(listener);
1526 return NULL;
1527 }
1528 listener->num_avoid_ports = num_avoid_ports;
1529 listener->avoiding = true;
1530 }
1531 RETAIN_HERE(listener, listener);
1532 listener->name = strdup(name);
1533 if (listener->name == NULL) {
1534 ERROR("no memory for listener name.");
1535 RELEASE_HERE(listener, listener);
1536 return NULL;
1537 }
1538 listener->ready = ready;
1539 listener->context = context;
1540 listener->tcp_stream = stream;
1541 listener->is_listener = true;
1542
1543 #if !UDP_LISTENER_USES_CONNECTION_GROUPS
1544 if (stream == FALSE) {
1545 comm_t *ret = ioloop_udp_listener_setup(listener, ip_address, port, launchd ? name : NULL, ifindex);
1546 if (ret == NULL) {
1547 return ret;
1548 }
1549 }
1550 #endif
1551
1552 listener->datagram_callback = datagram_callback;
1553 listener->cancel = cancel;
1554 listener->finalize = finalize;
1555 listener->connected = connected;
1556
1557 #if !UDP_LISTENER_USES_CONNECTION_GROUPS
1558 if (stream == FALSE) {
1559 return listener;
1560 }
1561 #endif
1562 if (port == 0) {
1563 endpoint = NULL;
1564 // Even though we don't have any ports to avoid, we still want the "avoiding" behavior in this case, since that
1565 // is what triggers a call to the ready handler, which passes the port number that we got to it.
1566 listener->avoiding = true;
1567 } else {
1568 listener->listen_port = port;
1569 char ip_address_str[MAX(INET_ADDRSTRLEN, INET6_ADDRSTRLEN)];
1570 if (ip_address == NULL || family == AF_UNSPEC) {
1571 if (family == AF_INET) {
1572 snprintf(ip_address_str, sizeof(ip_address_str), "0.0.0.0");
1573 } else {
1574 // AF_INET6 or AF_UNSPEC
1575 snprintf(ip_address_str, sizeof(ip_address_str), "::");
1576 }
1577 } else {
1578 if (family == AF_INET) {
1579 inet_ntop(family, &ip_address->sin.sin_addr, ip_address_str, sizeof(ip_address_str));
1580 } else {
1581 inet_ntop(family, &ip_address->sin6.sin6_addr, ip_address_str, sizeof(ip_address_str));
1582 }
1583 }
1584 endpoint = nw_endpoint_create_host(ip_address_str, portbuf);
1585 if (endpoint == NULL) {
1586 ERROR("No memory for listener endpoint.");
1587 RELEASE_HERE(listener, listener);
1588 return NULL;
1589 }
1590 }
1591 if (stream) {
1592 nw_parameters_configure_protocol_block_t configure_tls_block = NW_PARAMETERS_DISABLE_PROTOCOL;
1593 if (tls && tls_config != NULL) {
1594 configure_tls_block = ^(nw_protocol_options_t tls_options) {
1595 tls_config_context_t tls_context = {tls_options, ioloop_main_queue};
1596 tls_config((void *)&tls_context);
1597 };
1598 }
1599
1600 listener->parameters = nw_parameters_create_secure_tcp(configure_tls_block, NW_PARAMETERS_DEFAULT_CONFIGURATION);
1601 } else {
1602 if (tls) {
1603 ERROR("DTLS support not implemented.");
1604 nw_release(endpoint);
1605 RELEASE_HERE(listener, listener);
1606 return NULL;
1607 }
1608 #if UDP_LISTENER_USES_CONNECTION_GROUPS
1609 listener->parameters = nw_parameters_create_secure_udp(NW_PARAMETERS_DISABLE_PROTOCOL,
1610 NW_PARAMETERS_DEFAULT_CONFIGURATION);
1611 #endif
1612 }
1613 if (listener->parameters == NULL) {
1614 ERROR("No memory for listener parameters.");
1615 nw_release(endpoint);
1616 RELEASE_HERE(listener, listener);
1617 return NULL;
1618 }
1619
1620 if (endpoint != NULL) {
1621 nw_parameters_set_local_endpoint(listener->parameters, endpoint);
1622 nw_release(endpoint);
1623 }
1624
1625 // Set SO_REUSEADDR.
1626 nw_parameters_set_reuse_local_address(listener->parameters, true);
1627
1628
1629 if (stream) {
1630 // Create the nw_listener_t.
1631 listener->listener = NULL;
1632 #ifndef SRP_TEST_SERVER
1633 if (launchd && name != NULL) {
1634 listener->listener = nw_listener_create_with_launchd_key(listener->parameters, name);
1635 if (listener->listener == NULL) {
1636 ERROR("launchd listener create failed, trying to create it without relying on launchd.");
1637 }
1638 }
1639 #endif
1640 if (listener->listener == NULL) {
1641 listener->listener = nw_listener_create(listener->parameters);
1642 }
1643 if (listener->listener == NULL) {
1644 ERROR("no memory for nw_listener object");
1645 RELEASE_HERE(listener, listener);
1646 return NULL;
1647 }
1648 nw_listener_created++;
1649 nw_listener_set_new_connection_handler(listener->listener,
1650 ^(nw_connection_t connection) {
1651 ioloop_listener_connection_callback(listener, connection);
1652 });
1653
1654 nw_listener_set_state_changed_handler(listener->listener, ^(nw_listener_state_t state, nw_error_t error) {
1655 ioloop_listener_state_changed_handler(listener, state, error);
1656 });
1657 RETAIN_HERE(listener, listener); // for the nw_listener_t state change handler callback
1658 nw_listener_set_queue(listener->listener, ioloop_main_queue);
1659 nw_listener_start(listener->listener);
1660 #if UDP_LISTENER_USES_CONNECTION_GROUPS
1661 } else {
1662 if (launchd) {
1663 FAULT("launchd not yet supported for connection groups");
1664 return NULL;
1665 }
1666 if (!ioloop_udp_listener_setup(listener)) {
1667 RELEASE_HERE(listener, listener);
1668 return NULL;
1669 }
1670 #endif // UDP_LISTENER_USES_CONNECTION_GROUPS
1671 }
1672
1673 // Listener has one refcount
1674 return listener;
1675 }
1676
1677 comm_t *
1678 ioloop_connection_create(addr_t *NONNULL remote_address, bool tls, bool stream, bool stable, bool opportunistic,
1679 datagram_callback_t datagram_callback, connect_callback_t connected,
1680 disconnect_callback_t disconnected, finalize_callback_t finalize, void *context)
1681 {
1682 comm_t *connection;
1683 char portbuf[10];
1684 nw_parameters_t parameters;
1685 nw_endpoint_t endpoint;
1686 char addrbuf[INET6_ADDRSTRLEN];
1687
1688 inet_ntop(remote_address->sa.sa_family, (remote_address->sa.sa_family == AF_INET
1689 ? (void *)&remote_address->sin.sin_addr
1690 : (void *)&remote_address->sin6.sin6_addr), addrbuf, sizeof addrbuf);
1691 snprintf(portbuf, sizeof(portbuf), "%d", (remote_address->sa.sa_family == AF_INET
1692 ? ntohs(remote_address->sin.sin_port)
1693 : ntohs(remote_address->sin6.sin6_port)));
1694 connection = calloc(1, sizeof(*connection));
1695 if (connection == NULL) {
1696 ERROR("No memory for connection");
1697 return NULL;
1698 }
1699 connection->serial = ++cur_connection_serial;
1700
1701 // If we don't release this because of an error, this is the caller's reference to the comm_t.
1702 RETAIN_HERE(connection, comm);
1703 endpoint = nw_endpoint_create_host(addrbuf, portbuf);
1704 if (endpoint == NULL) {
1705 ERROR("No memory for connection endpoint.");
1706 RELEASE_HERE(connection, comm);
1707 return NULL;
1708 }
1709
1710 if (stream) {
1711 nw_parameters_configure_protocol_block_t configure_tls = NW_PARAMETERS_DISABLE_PROTOCOL;
1712 if (tls) {
1713 // This sets up a block that's called when we get a TLS connection and want to verify
1714 // the cert. Right now we only support opportunistic security, which means we have
1715 // no way to validate the cert. Future work: add support for validating the cert
1716 // using a TLSA record if one is present.
1717 configure_tls = ^(nw_protocol_options_t tls_options) {
1718 sec_protocol_options_t sec_options = nw_tls_copy_sec_protocol_options(tls_options);
1719 sec_protocol_options_set_verify_block(sec_options,
1720 ^(sec_protocol_metadata_t metadata, sec_trust_t trust_ref,
1721 sec_protocol_verify_complete_t complete) {
1722 (void) metadata;
1723 (void) trust_ref;
1724 const bool valid = true;
1725 complete(valid);
1726 }, ioloop_main_queue);
1727 nw_release(sec_options);
1728 };
1729 }
1730
1731 parameters = nw_parameters_create_secure_tcp(configure_tls, NW_PARAMETERS_DEFAULT_CONFIGURATION);
1732 } else {
1733 if (tls) {
1734 ERROR("DTLS support not implemented.");
1735 nw_release(endpoint);
1736 RELEASE_HERE(connection, comm);
1737 return NULL;
1738 }
1739 parameters = nw_parameters_create_secure_udp(NW_PARAMETERS_DISABLE_PROTOCOL,
1740 NW_PARAMETERS_DEFAULT_CONFIGURATION);
1741 }
1742 if (parameters == NULL) {
1743 ERROR("No memory for connection parameters.");
1744 nw_release(endpoint);
1745 RELEASE_HERE(connection, comm);
1746 return NULL;
1747 }
1748
1749 nw_protocol_stack_t protocol_stack = nw_parameters_copy_default_protocol_stack(parameters);
1750
1751 // If user asked for a stable address, set that option.
1752 if (stable) {
1753 nw_protocol_options_t ip_options = nw_protocol_stack_copy_internet_protocol(protocol_stack);
1754 nw_ip_options_set_local_address_preference(ip_options, nw_ip_local_address_preference_stable);
1755 nw_release(ip_options);
1756 }
1757
1758 // Only set TCP options for TCP connections.
1759 if (stream) {
1760 nw_protocol_options_t tcp_options = nw_protocol_stack_copy_transport_protocol(protocol_stack);
1761 nw_tcp_options_set_no_delay(tcp_options, true);
1762 nw_tcp_options_set_enable_keepalive(tcp_options, true);
1763 nw_release(tcp_options);
1764 }
1765 nw_release(protocol_stack);
1766
1767 connection->name = strdup(addrbuf);
1768
1769 // Create the nw_connection_t.
1770 connection->connection = nw_connection_create(endpoint, parameters);
1771 nw_connection_created++;
1772 nw_release(endpoint);
1773 nw_release(parameters);
1774 if (connection->connection == NULL) {
1775 ERROR("no memory for nw_connection object");
1776 RELEASE_HERE(connection, comm);
1777 return NULL;
1778 }
1779
1780 connection->datagram_callback = datagram_callback;
1781 connection->connected = connected;
1782 connection->disconnected = disconnected;
1783 connection->finalize = finalize;
1784 connection->tcp_stream = stream;
1785 connection->opportunistic = opportunistic;
1786 connection->context = context;
1787 RETAIN_HERE(connection, comm); // The connection state changed handler has a reference to the connection.
1788 nw_connection_set_state_changed_handler(connection->connection,
1789 ^(nw_connection_state_t state, nw_error_t error)
1790 { ioloop_connection_state_changed(connection, state, error); });
1791 nw_connection_set_queue(connection->connection, ioloop_main_queue);
1792 nw_connection_start(connection->connection);
1793 return connection;
1794 }
1795
1796 static void
1797 subproc_finalize(subproc_t *subproc)
1798 {
1799 int i;
1800 for (i = 0; i < subproc->argc; i++) {
1801 if (subproc->argv[i] != NULL) {
1802 free(subproc->argv[i]);
1803 subproc->argv[i] = NULL;
1804 }
1805 }
1806 if (subproc->dispatch_source != NULL) {
1807 dispatch_release(subproc->dispatch_source);
1808 }
1809 if (subproc->output_fd != NULL) {
1810 ioloop_file_descriptor_release(subproc->output_fd);
1811 }
1812 if (subproc->finalize != NULL) {
1813 subproc->finalize(subproc->context);
1814 }
1815 free(subproc);
1816 }
1817
// Cancel hook for a subprocess: clears the subprocess's dispatch_source pointer (without releasing it
// here) and drops one reference on the subprocess object.
// NOTE(review): presumably installed as the dispatch source's cancel handler, with the source released
// elsewhere -- the registration is not visible in this part of the file; confirm.
static void subproc_cancel(void *context)
{
    subproc_t *subproc = context;
    subproc->dispatch_source = NULL;
    RELEASE_HERE(subproc, subproc);
}
1824
1825 static void
1826 subproc_event(void *context)
1827 {
1828 subproc_t *subproc = context;
1829 pid_t pid;
1830 int status;
1831
1832 pid = waitpid(subproc->pid, &status, WNOHANG);
1833 if (pid <= 0) {
1834 return;
1835 }
1836 subproc->callback(subproc, status, NULL);
1837 if (!WIFSTOPPED(status)) {
1838 dispatch_source_cancel(subproc->dispatch_source);
1839 }
1840 }
1841
// Finalize hook with a void * signature: treats the context as a subprocess object and drops one
// reference on it.
static void
subproc_output_finalize(void *context)
{
    subproc_t *subproc = context;
    RELEASE_HERE(subproc, subproc);
}
1848
// Drop a reference on a subprocess object by applying the RELEASE macro to it.
// NOTE(review): file and line are not referenced directly in this body; presumably the RELEASE macro
// consumes them to record the call site -- confirm against the macro definition.
void
ioloop_subproc_release_(subproc_t *subproc, const char *file, int line)
{
    RELEASE(subproc, subproc);
}
1854
1855 // Invoke the specified executable with the specified arguments. Call callback when it exits.
1856 // All failures are reported through the callback.
1857 subproc_t *
1858 ioloop_subproc(const char *exepath, char *NULLABLE *argv, int argc,
1859 subproc_callback_t callback, io_callback_t output_callback, void *context)
1860 {
1861 subproc_t *subproc;
1862 int i, rv;
1863 posix_spawn_file_actions_t actions;
1864 posix_spawnattr_t attrs;
1865
1866 if (callback == NULL) {
1867 ERROR("ioloop_add_wake_event called with null callback");
1868 return NULL;
1869 }
1870
1871 if (argc > MAX_SUBPROC_ARGS) {
1872 callback(NULL, 0, "too many subproc args");
1873 return NULL;
1874 }
1875
1876 subproc = calloc(1, sizeof *subproc);
1877 if (subproc == NULL) {
1878 callback(NULL, 0, "out of memory");
1879 return NULL;
1880 }
1881 RETAIN_HERE(subproc, subproc); // For the create rule
1882 if (output_callback != NULL) {
1883 rv = pipe(subproc->pipe_fds);
1884 if (rv < 0) {
1885 callback(NULL, 0, "unable to create pipe.");
1886 RELEASE_HERE(subproc, subproc);
1887 return NULL;
1888 }
1889 subproc->output_fd = ioloop_file_descriptor_create(subproc->pipe_fds[0], subproc, subproc_output_finalize);
1890 RETAIN_HERE(subproc, subproc); // For the file descriptor
1891 if (subproc->output_fd == NULL) {
1892 callback(NULL, 0, "out of memory.");
1893 close(subproc->pipe_fds[0]);
1894 close(subproc->pipe_fds[1]);
1895 RELEASE_HERE(subproc, subproc);
1896 return NULL;
1897 }
1898 }
1899
1900 subproc->argv[0] = strdup(exepath);
1901 if (subproc->argv[0] == NULL) {
1902 RELEASE_HERE(subproc, subproc);
1903 callback(NULL, 0, "out of memory");
1904 return NULL;
1905 }
1906 subproc->argc++;
1907 for (i = 0; i < argc; i++) {
1908 subproc->argv[i + 1] = strdup(argv[i]);
1909 if (subproc->argv[i + 1] == NULL) {
1910 RELEASE_HERE(subproc, subproc);
1911 callback(NULL, 0, "out of memory");
1912 return NULL;
1913 }
1914 subproc->argc++;
1915 }
1916
1917 // Set up for posix_spawn
1918 posix_spawn_file_actions_init(&actions);
1919 if (output_callback != NULL) {
1920 posix_spawn_file_actions_adddup2(&actions, subproc->pipe_fds[1], STDOUT_FILENO);
1921 posix_spawn_file_actions_addclose(&actions, subproc->pipe_fds[0]);
1922 posix_spawn_file_actions_addclose(&actions, subproc->pipe_fds[1]);
1923 }
1924 posix_spawnattr_init(&attrs);
1925 extern char **environ;
1926 rv = posix_spawn(&subproc->pid, exepath, &actions, &attrs, subproc->argv, environ);
1927 posix_spawn_file_actions_destroy(&actions);
1928 posix_spawnattr_destroy(&attrs);
1929 if (rv < 0) {
1930 ERROR("posix_spawn failed for " PUB_S_SRP ": " PUB_S_SRP, subproc->argv[0], strerror(errno));
1931 callback(subproc, 0, strerror(errno));
1932 RELEASE_HERE(subproc, subproc);
1933 return NULL;
1934 }
1935 subproc->callback = callback;
1936 subproc->context = context;
1937
1938 subproc->dispatch_source = dispatch_source_create(DISPATCH_SOURCE_TYPE_PROC, subproc->pid, DISPATCH_PROC_EXIT,
1939 ioloop_main_queue);
1940 if (subproc->dispatch_source == NULL) {
1941 ERROR("dispatch_source_create failed in ioloop_add_wake_event().");
1942 return false;
1943 }
1944 dispatch_retain(subproc->dispatch_source);
1945 dispatch_source_set_event_handler_f(subproc->dispatch_source, subproc_event);
1946 dispatch_source_set_cancel_handler_f(subproc->dispatch_source, subproc_cancel);
1947 dispatch_set_context(subproc->dispatch_source, subproc);
1948 dispatch_activate(subproc->dispatch_source);
1949 RETAIN_HERE(subproc, subproc); // Dispatch has a reference
1950
1951 // Now that we have a viable subprocess, add the reader callback.
1952 if (output_callback != NULL && subproc->output_fd != NULL) {
1953 close(subproc->pipe_fds[1]);
1954 ioloop_add_reader(subproc->output_fd, output_callback);
1955 }
1956 return subproc;
1957 }
1958
#ifdef SRP_TEST_SERVER
// Test-server variant of ioloop_dnssd_txn_cancel(). If the transaction still has a service ref, it is
// deallocated with dns_service_ref_deallocate() when srp_server is non-NULL and with
// DNSServiceRefDeallocate() otherwise, and txn->sdref is then cleared.
void
ioloop_dnssd_txn_cancel_srp(void *srp_server, dnssd_txn_t *txn)
{
    if (txn->sdref == NULL) {
        INFO("dead transaction.");
        return;
    }

    INFO("txn %p serviceref %p", txn, txn->sdref);
    if (srp_server == NULL) {
        DNSServiceRefDeallocate(txn->sdref);
    } else {
        dns_service_ref_deallocate(srp_server, txn->sdref);
    }
    txn->sdref = NULL;
}
#endif
1976
1977 void
1978 ioloop_dnssd_txn_cancel(dnssd_txn_t *txn)
1979 {
1980 if (txn->sdref != NULL) {
1981 INFO("txn %p serviceref %p", txn, txn->sdref);
1982 DNSServiceRefDeallocate(txn->sdref);
1983 txn->sdref = NULL;
1984 } else {
1985 INFO("dead transaction.");
1986 }
1987 }
1988
1989 static void
1990 dnssd_txn_finalize(dnssd_txn_t *txn)
1991 {
1992 if (txn->sdref != NULL) {
1993 ioloop_dnssd_txn_cancel(txn);
1994 }
1995 if (txn->finalize_callback) {
1996 txn->finalize_callback(txn->context);
1997 }
1998 free(txn);
1999 }
2000
2001 void
2002 ioloop_dnssd_txn_retain_(dnssd_txn_t *dnssd_txn, const char *file, int line)
2003 {
2004 (void)file; (void)line;
2005 RETAIN(dnssd_txn, dnssd_txn);
2006 }
2007
2008 void
2009 ioloop_dnssd_txn_release_(dnssd_txn_t *dnssd_txn, const char *file, int line)
2010 {
2011 (void)file; (void)line;
2012 RELEASE(dnssd_txn, dnssd_txn);
2013 }
2014
2015 dnssd_txn_t *
2016 ioloop_dnssd_txn_add_subordinate_(DNSServiceRef ref, void *context, dnssd_txn_finalize_callback_t finalize_callback,
2017 dnssd_txn_failure_callback_t failure_callback,
2018 const char *file, int line)
2019 {
2020 dnssd_txn_t *txn = calloc(1, sizeof(*txn));
2021 (void)file; (void)line;
2022 (void)failure_callback;
2023
2024 if (txn != NULL) {
2025 RETAIN(txn, dnssd_txn);
2026 txn->sdref = ref;
2027 INFO("txn %p serviceref %p", txn, ref);
2028 txn->context = context;
2029 txn->finalize_callback = finalize_callback;
2030 }
2031 return txn;
2032 }
2033
2034 dnssd_txn_t *
2035 ioloop_dnssd_txn_add_(DNSServiceRef ref, void *context, dnssd_txn_finalize_callback_t finalize_callback,
2036 dnssd_txn_failure_callback_t failure_callback,
2037 const char *file, int line)
2038 {
2039 dnssd_txn_t *txn = ioloop_dnssd_txn_add_subordinate_(ref, context, finalize_callback, failure_callback, file, line);
2040 if (txn != NULL) {
2041 DNSServiceSetDispatchQueue(ref, ioloop_main_queue);
2042 }
2043 return txn;
2044 }
2045
2046
2047 void
2048 ioloop_dnssd_txn_set_aux_pointer(dnssd_txn_t *NONNULL txn, void *aux_pointer)
2049 {
2050 txn->aux_pointer = aux_pointer;
2051 }
2052
2053 void *
2054 ioloop_dnssd_txn_get_aux_pointer(dnssd_txn_t *NONNULL txn)
2055 {
2056 return txn->aux_pointer;
2057 }
2058
2059 void *
2060 ioloop_dnssd_txn_get_context(dnssd_txn_t *NONNULL txn)
2061 {
2062 return txn->context;
2063 }
2064
2065
2066 static void
2067 file_descriptor_finalize(void *context)
2068 {
2069 io_t *file_descriptor = context;
2070 if (file_descriptor->ref_count == 0) {
2071 if (file_descriptor->finalize) {
2072 file_descriptor->finalize(file_descriptor->context);
2073 }
2074 free(file_descriptor);
2075 }
2076 }
2077
2078 void
2079 ioloop_file_descriptor_retain_(io_t *file_descriptor, const char *file, int line)
2080 {
2081 (void)file; (void)line;
2082 RETAIN(file_descriptor, file_descriptor);
2083 }
2084
2085 void
2086 ioloop_file_descriptor_release_(io_t *file_descriptor, const char *file, int line)
2087 {
2088 (void)file; (void)line;
2089 RELEASE(file_descriptor, file_descriptor);
2090 }
2091
2092 io_t *
2093 ioloop_file_descriptor_create_(int fd, void *context, finalize_callback_t finalize, const char *file, int line)
2094 {
2095 io_t *ret;
2096 ret = calloc(1, sizeof(*ret));
2097 if (ret) {
2098 ret->fd = fd;
2099 ret->context = context;
2100 ret->finalize = finalize;
2101 RETAIN(ret, file_descriptor);
2102 }
2103 return ret;
2104 }
2105
2106 static void
2107 ioloop_read_source_finalize(void *context)
2108 {
2109 io_t *io = context;
2110
2111 INFO("io %p fd %d, read source %p, write_source %p", io, io->fd, io->read_source, io->write_source);
2112
2113 // Release the reference count that dispatch was holding.
2114 if (io->is_static) {
2115 if (io->context_release != NULL) {
2116 io->context_release(io->context);
2117 }
2118 FINALIZED(file_descriptor_finalized);
2119 } else {
2120 RELEASE_HERE(io, file_descriptor);
2121 }
2122 }
2123
2124 static void
2125 ioloop_read_source_cancel_callback(void *context)
2126 {
2127 io_t *io = context;
2128
2129 INFO("io %p fd %d, read source %p, write_source %p", io, io->fd, io->read_source, io->write_source);
2130 if (io->read_source != NULL) {
2131 dispatch_release(io->read_source);
2132 io->read_source = NULL;
2133 if (io->fd != -1) {
2134 close(io->fd);
2135 io->fd = -1;
2136 } else {
2137 FAULT("io->fd has been set to -1 too early");
2138 }
2139 }
2140 }
2141
2142 static void
2143 ioloop_read_event(void *context)
2144 {
2145 io_t *io = context;
2146
2147 if (io->read_callback != NULL) {
2148 io->read_callback(io, io->context);
2149 }
2150 }
2151
2152 void
2153 ioloop_close(io_t *io)
2154 {
2155 INFO("io %p fd %d, read source %p, write_source %p", io, io->fd, io->read_source, io->write_source);
2156 if (io->read_source != NULL) {
2157 dispatch_cancel(io->read_source);
2158 }
2159 if (io->write_source != NULL) {
2160 dispatch_cancel(io->write_source);
2161 }
2162 }
2163
2164 void
2165 ioloop_add_reader(io_t *NONNULL io, io_callback_t NONNULL callback)
2166 {
2167 io->read_callback = callback;
2168 if (io->read_source == NULL) {
2169 io->read_source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, io->fd, 0, ioloop_main_queue);
2170 }
2171 if (io->read_source == NULL) {
2172 ERROR("dispatch_source_create: unable to create read dispatch source.");
2173 return;
2174 }
2175 dispatch_source_set_event_handler_f(io->read_source, ioloop_read_event);
2176 dispatch_source_set_cancel_handler_f(io->read_source, ioloop_read_source_cancel_callback);
2177 dispatch_set_finalizer_f(io->read_source, ioloop_read_source_finalize);
2178 dispatch_set_context(io->read_source, io);
2179 RETAIN_HERE(io, file_descriptor); // Dispatch will hold a reference.
2180 dispatch_resume(io->read_source);
2181 INFO("io %p fd %d, read source %p, write_source %p", io, io->fd, io->read_source, io->write_source);
2182 }
2183
2184 void
2185 ioloop_run_async(async_callback_t callback, void *context)
2186 {
2187 dispatch_async(ioloop_main_queue, ^{
2188 callback(context);
2189 });
2190 }
2191
2192 const struct sockaddr *
2193 connection_get_local_address(message_t *message)
2194 {
2195 if (message == NULL) {
2196 ERROR("message is NULL.");
2197 return NULL;
2198 }
2199 return &message->local.sa;
2200 }
2201
// Report whether this device is an Apple TV, as determined by CoreUtils' IsAppleTV().
bool
ioloop_is_device_apple_tv(void)
{
    const bool is_apple_tv = IsAppleTV();
    return is_apple_tv;
}
2207
2208 // Local Variables:
2209 // mode: C
2210 // tab-width: 4
2211 // c-file-style: "bsd"
2212 // c-basic-offset: 4
2213 // fill-column: 108
2214 // indent-tabs-mode: nil
2215 // End:
2216