netmgr_common.c revision 1.4.4.2 1 /* $NetBSD: netmgr_common.c,v 1.4.4.2 2025/08/02 05:54:16 perseant Exp $ */
2
3 /*
4 * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
5 *
6 * SPDX-License-Identifier: MPL-2.0
7 *
8 * This Source Code Form is subject to the terms of the Mozilla Public
9 * License, v. 2.0. If a copy of the MPL was not distributed with this
10 * file, you can obtain one at https://mozilla.org/MPL/2.0/.
11 *
12 * See the COPYRIGHT file distributed with this work for additional
13 * information regarding copyright ownership.
14 */
15
16 #include <inttypes.h>
17 #include <sched.h> /* IWYU pragma: keep */
18 #include <setjmp.h>
19 #include <signal.h>
20 #include <stdarg.h>
21 #include <stdlib.h>
22 #include <unistd.h>
23
24 /*
25 * As a workaround, include an OpenSSL header file before including cmocka.h,
26 * because OpenSSL 3.1.0 uses __attribute__(malloc), conflicting with a
27 * redefined malloc in cmocka.h.
28 */
29 #include <openssl/err.h>
30
31 #define UNIT_TESTING
32 #include <cmocka.h>
33
34 #include <isc/async.h>
35 #include <isc/nonce.h>
36 #include <isc/os.h>
37 #include <isc/quota.h>
38 #include <isc/refcount.h>
39 #include <isc/sockaddr.h>
40 #include <isc/thread.h>
41 #include <isc/util.h>
42 #include <isc/uv.h>
43 #define KEEP_BEFORE
44
45 #include "netmgr_common.h"
46
47 #include <tests/isc.h>
48
49 isc_nm_t *listen_nm = NULL;
50 isc_nm_t *connect_nm = NULL;
51
52 isc_sockaddr_t tcp_listen_addr;
53 isc_sockaddr_t tcp_connect_addr;
54 isc_tlsctx_t *tcp_listen_tlsctx = NULL;
55 isc_tlsctx_t *tcp_connect_tlsctx = NULL;
56 isc_tlsctx_client_session_cache_t *tcp_tlsctx_client_sess_cache = NULL;
57
58 isc_sockaddr_t udp_listen_addr;
59 isc_sockaddr_t udp_connect_addr;
60
61 uint64_t send_magic = 0;
62
63 isc_region_t send_msg = { .base = (unsigned char *)&send_magic,
64 .length = sizeof(send_magic) };
65
66 atomic_bool do_send = false;
67
68 atomic_int_fast64_t nsends = 0;
69 int_fast64_t esends = 0; /* expected sends */
70
71 atomic_int_fast64_t ssends = 0;
72 atomic_int_fast64_t sreads = 0;
73 atomic_int_fast64_t saccepts = 0;
74
75 atomic_int_fast64_t cconnects = 0;
76 atomic_int_fast64_t csends = 0;
77 atomic_int_fast64_t creads = 0;
78 atomic_int_fast64_t ctimeouts = 0;
79
80 int expected_ssends;
81 int expected_sreads;
82 int expected_csends;
83 int expected_cconnects;
84 int expected_creads;
85 int expected_saccepts;
86 int expected_ctimeouts;
87
88 bool ssends_shutdown;
89 bool sreads_shutdown;
90 bool saccepts_shutdown;
91 bool csends_shutdown;
92 bool cconnects_shutdown;
93 bool creads_shutdown;
94 bool ctimeouts_shutdown;
95
96 isc_refcount_t active_cconnects = 0;
97 isc_refcount_t active_csends = 0;
98 isc_refcount_t active_creads = 0;
99 isc_refcount_t active_ssends = 0;
100 isc_refcount_t active_sreads = 0;
101
102 isc_nmsocket_t *listen_sock = NULL;
103
104 isc_quota_t listener_quota;
105 atomic_bool check_listener_quota = false;
106
107 bool allow_send_back = false;
108 bool noanswer = false;
109 bool stream_use_TLS = false;
110 bool stream_use_PROXY = false;
111 bool stream_PROXY_over_TLS = false;
112 bool stream = false;
113 in_port_t stream_port = 0;
114
115 bool udp_use_PROXY = false;
116
117 isc_nm_recv_cb_t connect_readcb = NULL;
118
119 isc_nm_proxyheader_info_t proxy_info_data;
120 isc_nm_proxyheader_info_t *proxy_info = NULL;
121 isc_sockaddr_t proxy_src;
122 isc_sockaddr_t proxy_dst;
123
124 int
125 setup_netmgr_test(void **state) {
126 struct in_addr in;
127 tcp_connect_addr = (isc_sockaddr_t){ .length = 0 };
128 isc_sockaddr_fromin6(&tcp_connect_addr, &in6addr_loopback, 0);
129
130 tcp_listen_addr = (isc_sockaddr_t){ .length = 0 };
131 isc_sockaddr_fromin6(&tcp_listen_addr, &in6addr_loopback, stream_port);
132
133 RUNTIME_CHECK(inet_pton(AF_INET, "1.2.3.4", &in) == 1);
134 isc_sockaddr_fromin(&proxy_src, &in, 1234);
135 RUNTIME_CHECK(inet_pton(AF_INET, "4.3.2.1", &in) == 1);
136 isc_sockaddr_fromin(&proxy_dst, &in, 4321);
137 isc_nm_proxyheader_info_init(&proxy_info_data, &proxy_src, &proxy_dst,
138 NULL);
139
140 esends = NSENDS * workers;
141
142 atomic_store(&nsends, esends);
143
144 atomic_store(&saccepts, 0);
145 atomic_store(&sreads, 0);
146 atomic_store(&ssends, 0);
147
148 atomic_store(&cconnects, 0);
149 atomic_store(&csends, 0);
150 atomic_store(&creads, 0);
151 atomic_store(&ctimeouts, 0);
152 allow_send_back = false;
153
154 expected_cconnects = -1;
155 expected_csends = -1;
156 expected_creads = -1;
157 expected_sreads = -1;
158 expected_ssends = -1;
159 expected_saccepts = -1;
160 expected_ctimeouts = -1;
161
162 ssends_shutdown = true;
163 sreads_shutdown = true;
164 saccepts_shutdown = true;
165 csends_shutdown = true;
166 cconnects_shutdown = true;
167 creads_shutdown = true;
168 ctimeouts_shutdown = true;
169
170 do_send = false;
171
172 isc_refcount_init(&active_cconnects, 0);
173 isc_refcount_init(&active_csends, 0);
174 isc_refcount_init(&active_creads, 0);
175 isc_refcount_init(&active_ssends, 0);
176 isc_refcount_init(&active_sreads, 0);
177
178 isc_nonce_buf(&send_magic, sizeof(send_magic));
179
180 setup_loopmgr(state);
181 isc_netmgr_create(mctx, loopmgr, &listen_nm);
182 assert_non_null(listen_nm);
183 isc_nm_settimeouts(listen_nm, T_INIT, T_IDLE, T_KEEPALIVE,
184 T_ADVERTISED);
185
186 isc_netmgr_create(mctx, loopmgr, &connect_nm);
187 assert_non_null(connect_nm);
188 isc_nm_settimeouts(connect_nm, T_INIT, T_IDLE, T_KEEPALIVE,
189 T_ADVERTISED);
190
191 isc_quota_init(&listener_quota, 0);
192 atomic_store(&check_listener_quota, false);
193
194 connect_readcb = connect_read_cb;
195 noanswer = false;
196
197 if (isc_tlsctx_createserver(NULL, NULL, &tcp_listen_tlsctx) !=
198 ISC_R_SUCCESS)
199 {
200 return -1;
201 }
202 if (isc_tlsctx_createclient(&tcp_connect_tlsctx) != ISC_R_SUCCESS) {
203 return -1;
204 }
205
206 isc_tlsctx_enable_dot_client_alpn(tcp_connect_tlsctx);
207
208 isc_tlsctx_client_session_cache_create(
209 mctx, tcp_connect_tlsctx,
210 ISC_TLSCTX_CLIENT_SESSION_CACHE_DEFAULT_SIZE,
211 &tcp_tlsctx_client_sess_cache);
212
213 return 0;
214 }
215
216 int
217 teardown_netmgr_test(void **state ISC_ATTR_UNUSED) {
218 UNUSED(state);
219
220 isc_tlsctx_client_session_cache_detach(&tcp_tlsctx_client_sess_cache);
221
222 isc_tlsctx_free(&tcp_connect_tlsctx);
223 isc_tlsctx_free(&tcp_listen_tlsctx);
224
225 isc_netmgr_destroy(&connect_nm);
226 assert_null(connect_nm);
227
228 isc_netmgr_destroy(&listen_nm);
229 assert_null(listen_nm);
230
231 teardown_loopmgr(state);
232
233 isc_refcount_destroy(&active_cconnects);
234 isc_refcount_destroy(&active_csends);
235 isc_refcount_destroy(&active_creads);
236 isc_refcount_destroy(&active_ssends);
237 isc_refcount_destroy(&active_sreads);
238
239 proxy_info = NULL;
240
241 return 0;
242 }
243
244 void
245 stop_listening(void *arg ISC_ATTR_UNUSED) {
246 isc_nm_stoplistening(listen_sock);
247 isc_nmsocket_close(&listen_sock);
248 assert_null(listen_sock);
249 }
250
251 /* Callbacks */
252
253 void
254 noop_recv_cb(isc_nmhandle_t *handle ISC_ATTR_UNUSED,
255 isc_result_t eresult ISC_ATTR_UNUSED,
256 isc_region_t *region ISC_ATTR_UNUSED,
257 void *cbarg ISC_ATTR_UNUSED) {
258 F();
259 }
260
261 isc_result_t
262 noop_accept_cb(isc_nmhandle_t *handle ISC_ATTR_UNUSED, isc_result_t eresult,
263 void *cbarg ISC_ATTR_UNUSED) {
264 F();
265
266 if (eresult == ISC_R_SUCCESS) {
267 (void)atomic_fetch_add(&saccepts, 1);
268 }
269
270 return ISC_R_SUCCESS;
271 }
272
273 void
274 connect_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg);
275
276 void
277 connect_send(isc_nmhandle_t *handle);
278
279 void
280 connect_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
281 isc_nmhandle_t *sendhandle = handle;
282
283 assert_non_null(sendhandle);
284
285 UNUSED(cbarg);
286
287 F();
288
289 switch (eresult) {
290 case ISC_R_EOF:
291 case ISC_R_SHUTTINGDOWN:
292 case ISC_R_CANCELED:
293 case ISC_R_CONNECTIONRESET:
294 /* Abort */
295 if (!stream) {
296 isc_nm_cancelread(handle);
297 }
298 break;
299 case ISC_R_SUCCESS:
300 if (have_expected_csends(atomic_fetch_add(&csends, 1) + 1)) {
301 do_csends_shutdown(loopmgr);
302 }
303 break;
304 default:
305 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
306 isc_result_totext(eresult), cbarg);
307 assert_int_equal(eresult, ISC_R_SUCCESS);
308 }
309
310 isc_refcount_decrement(&active_csends);
311 isc_nmhandle_detach(&sendhandle);
312 }
313
314 void
315 connect_send(isc_nmhandle_t *handle) {
316 isc_nmhandle_t *sendhandle = NULL;
317 isc_refcount_increment0(&active_csends);
318 isc_nmhandle_attach(handle, &sendhandle);
319 isc_nmhandle_setwritetimeout(handle, T_IDLE);
320 isc_nm_send(sendhandle, &send_msg, connect_send_cb, NULL);
321 }
322
323 void
324 connect_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
325 isc_region_t *region, void *cbarg) {
326 uint64_t magic = 0;
327
328 UNUSED(cbarg);
329
330 assert_non_null(handle);
331
332 F();
333
334 switch (eresult) {
335 case ISC_R_SUCCESS:
336 assert_true(region->length >= sizeof(magic));
337
338 memmove(&magic, region->base, sizeof(magic));
339
340 assert_true(magic == send_magic);
341
342 if (have_expected_creads(atomic_fetch_add(&creads, 1) + 1)) {
343 do_creads_shutdown(loopmgr);
344 }
345
346 if (magic == send_magic && allow_send_back) {
347 connect_send(handle);
348 return;
349 }
350
351 /* This will initiate one more read callback */
352 if (stream) {
353 isc_nmhandle_close(handle);
354 }
355 break;
356 case ISC_R_TIMEDOUT:
357 case ISC_R_EOF:
358 case ISC_R_SHUTTINGDOWN:
359 case ISC_R_CANCELED:
360 case ISC_R_CONNECTIONRESET:
361 case ISC_R_CONNREFUSED:
362 break;
363 default:
364 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
365 isc_result_totext(eresult), cbarg);
366 assert_int_equal(eresult, ISC_R_SUCCESS);
367 }
368
369 isc_refcount_decrement(&active_creads);
370 isc_nmhandle_detach(&handle);
371 }
372
373 void
374 connect_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
375 isc_nmhandle_t *readhandle = NULL;
376
377 F();
378
379 isc_refcount_decrement(&active_cconnects);
380
381 if (eresult != ISC_R_SUCCESS || connect_readcb == NULL) {
382 return;
383 }
384
385 if (stream_use_PROXY) {
386 assert_true(isc_nm_is_proxy_handle(handle));
387 }
388
389 /* We are finished, initiate the shutdown */
390 if (have_expected_cconnects(atomic_fetch_add(&cconnects, 1) + 1)) {
391 do_cconnects_shutdown(loopmgr);
392 } else if (do_send) {
393 isc_async_current(stream_recv_send_connect,
394 cbarg == NULL
395 ? get_stream_connect_function()
396 : (stream_connect_function)cbarg);
397 }
398
399 isc_refcount_increment0(&active_creads);
400 isc_nmhandle_attach(handle, &readhandle);
401 isc_nm_read(handle, connect_readcb, NULL);
402
403 connect_send(handle);
404 }
405
406 void
407 listen_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
408 isc_nmhandle_t *sendhandle = handle;
409
410 UNUSED(cbarg);
411 UNUSED(eresult);
412
413 assert_non_null(sendhandle);
414
415 F();
416
417 switch (eresult) {
418 case ISC_R_CANCELED:
419 case ISC_R_CONNECTIONRESET:
420 case ISC_R_EOF:
421 case ISC_R_SHUTTINGDOWN:
422 break;
423 case ISC_R_SUCCESS:
424 if (have_expected_ssends(atomic_fetch_add(&ssends, 1) + 1)) {
425 do_ssends_shutdown(loopmgr);
426 }
427 break;
428 default:
429 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
430 isc_result_totext(eresult), cbarg);
431 assert_int_equal(eresult, ISC_R_SUCCESS);
432 }
433
434 isc_refcount_decrement(&active_ssends);
435 isc_nmhandle_detach(&sendhandle);
436 }
437
438 void
439 listen_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
440 isc_region_t *region, void *cbarg) {
441 uint64_t magic = 0;
442
443 assert_non_null(handle);
444
445 F();
446
447 switch (eresult) {
448 case ISC_R_SUCCESS:
449 if (udp_use_PROXY || stream_use_PROXY) {
450 assert_true(isc_nm_is_proxy_handle(handle));
451 proxy_verify_endpoints(handle);
452 }
453
454 memmove(&magic, region->base, sizeof(magic));
455 assert_true(magic == send_magic);
456
457 if (have_expected_sreads(atomic_fetch_add(&sreads, 1) + 1)) {
458 do_sreads_shutdown(loopmgr);
459 }
460
461 assert_true(region->length >= sizeof(magic));
462
463 memmove(&magic, region->base, sizeof(magic));
464 assert_true(magic == send_magic);
465
466 if (!noanswer) {
467 /* Answer and continue to listen */
468 isc_nmhandle_t *sendhandle = NULL;
469 isc_nmhandle_attach(handle, &sendhandle);
470 isc_refcount_increment0(&active_ssends);
471 isc_nmhandle_setwritetimeout(sendhandle, T_IDLE);
472 isc_nm_send(sendhandle, &send_msg, listen_send_cb,
473 cbarg);
474 }
475 /* Continue to listen */
476 return;
477 case ISC_R_CANCELED:
478 case ISC_R_CONNECTIONRESET:
479 case ISC_R_EOF:
480 case ISC_R_SHUTTINGDOWN:
481 break;
482 default:
483 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
484 isc_result_totext(eresult), cbarg);
485 assert_int_equal(eresult, ISC_R_SUCCESS);
486 }
487
488 isc_refcount_decrement(&active_sreads);
489 isc_nmhandle_detach(&handle);
490 }
491
492 isc_result_t
493 listen_accept_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
494 UNUSED(handle);
495 UNUSED(cbarg);
496
497 F();
498
499 if (eresult != ISC_R_SUCCESS) {
500 return eresult;
501 }
502
503 if (have_expected_saccepts(atomic_fetch_add(&saccepts, 1) + 1)) {
504 do_saccepts_shutdown(loopmgr);
505 }
506
507 isc_nmhandle_attach(handle, &(isc_nmhandle_t *){ NULL });
508 isc_refcount_increment0(&active_sreads);
509
510 return eresult;
511 }
512
513 isc_result_t
514 stream_accept_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
515 isc_nmhandle_t *readhandle = NULL;
516
517 UNUSED(cbarg);
518
519 F();
520
521 if (eresult != ISC_R_SUCCESS) {
522 return eresult;
523 }
524
525 if (have_expected_saccepts(atomic_fetch_add(&saccepts, 1) + 1)) {
526 do_saccepts_shutdown(loopmgr);
527 }
528
529 if (stream_use_PROXY) {
530 assert_true(isc_nm_is_proxy_handle(handle));
531 proxy_verify_endpoints(handle);
532 }
533
534 isc_refcount_increment0(&active_sreads);
535
536 isc_nmhandle_attach(handle, &readhandle);
537 isc_nm_read(handle, listen_read_cb, readhandle);
538
539 return ISC_R_SUCCESS;
540 }
541
542 void
543 stream_recv_send_connect(void *arg) {
544 connect_func connect = (connect_func)arg;
545 isc_sockaddr_t connect_addr;
546
547 connect_addr = (isc_sockaddr_t){ .length = 0 };
548 isc_sockaddr_fromin6(&connect_addr, &in6addr_loopback, 0);
549
550 isc_refcount_increment0(&active_cconnects);
551 connect(connect_nm);
552 }
553
554 /* Common stream protocols code */
555
556 void
557 timeout_retry_cb(isc_nmhandle_t *handle, isc_result_t eresult,
558 isc_region_t *region, void *cbarg) {
559 UNUSED(region);
560 UNUSED(cbarg);
561
562 assert_non_null(handle);
563
564 F();
565
566 if (eresult == ISC_R_TIMEDOUT &&
567 atomic_fetch_add(&ctimeouts, 1) + 1 < expected_ctimeouts)
568 {
569 isc_nmhandle_settimeout(handle, T_SOFT);
570 connect_send(handle);
571 return;
572 }
573
574 isc_refcount_decrement(&active_creads);
575 isc_nmhandle_detach(&handle);
576
577 isc_loopmgr_shutdown(loopmgr);
578 }
579
580 isc_quota_t *
581 tcp_listener_init_quota(size_t nthreads) {
582 isc_quota_t *quotap = NULL;
583 if (atomic_load(&check_listener_quota)) {
584 unsigned int max_quota = ISC_MAX(nthreads / 2, 1);
585 isc_quota_max(&listener_quota, max_quota);
586 quotap = &listener_quota;
587 }
588 return quotap;
589 }
590
591 static void
592 tcp_connect(isc_nm_t *nm) {
593 isc_nm_tcpconnect(nm, &tcp_connect_addr, &tcp_listen_addr,
594 connect_connect_cb, NULL, T_CONNECT);
595 }
596
597 static void
598 tls_connect(isc_nm_t *nm) {
599 isc_nm_tlsconnect(nm, &tcp_connect_addr, &tcp_listen_addr,
600 connect_connect_cb, NULL, tcp_connect_tlsctx, NULL,
601 tcp_tlsctx_client_sess_cache, T_CONNECT,
602 stream_use_PROXY, NULL);
603 }
604
605 void
606 set_proxyheader_info(isc_nm_proxyheader_info_t *pi) {
607 proxy_info = pi;
608 }
609
610 isc_nm_proxyheader_info_t *
611 get_proxyheader_info(void) {
612 if (proxy_info != NULL) {
613 return proxy_info;
614 }
615
616 /*
617 * There is 50% chance to get the info: so we can test LOCAL headers,
618 * too.
619 */
620 if (isc_random_uniform(2)) {
621 return &proxy_info_data;
622 }
623
624 return NULL;
625 }
626
627 static void
628 proxystream_connect(isc_nm_t *nm) {
629 isc_tlsctx_t *tlsctx = stream_PROXY_over_TLS ? tcp_connect_tlsctx
630 : NULL;
631 isc_tlsctx_client_session_cache_t *sess_cache =
632 stream_PROXY_over_TLS ? tcp_tlsctx_client_sess_cache : NULL;
633
634 isc_nm_proxystreamconnect(nm, &tcp_connect_addr, &tcp_listen_addr,
635 connect_connect_cb, NULL, T_CONNECT, tlsctx,
636 NULL, sess_cache, get_proxyheader_info());
637 }
638
639 stream_connect_function
640 get_stream_connect_function(void) {
641 if (stream_use_TLS && !stream_PROXY_over_TLS) {
642 return tls_connect;
643 } else if (stream_use_PROXY) {
644 return proxystream_connect;
645 } else {
646 return tcp_connect;
647 }
648
649 UNREACHABLE();
650 }
651
652 isc_result_t
653 stream_listen(isc_nm_accept_cb_t accept_cb, void *accept_cbarg, int backlog,
654 isc_quota_t *quota, isc_nmsocket_t **sockp) {
655 isc_result_t result = ISC_R_SUCCESS;
656
657 if (stream_use_TLS && !stream_PROXY_over_TLS) {
658 result = isc_nm_listentls(
659 listen_nm, ISC_NM_LISTEN_ALL, &tcp_listen_addr,
660 accept_cb, accept_cbarg, backlog, quota,
661 tcp_listen_tlsctx, stream_use_PROXY, sockp);
662 return result;
663 } else if (stream_use_PROXY) {
664 isc_tlsctx_t *tlsctx = stream_PROXY_over_TLS ? tcp_listen_tlsctx
665 : NULL;
666 result = isc_nm_listenproxystream(
667 listen_nm, ISC_NM_LISTEN_ALL, &tcp_listen_addr,
668 accept_cb, accept_cbarg, backlog, quota, tlsctx, sockp);
669 return result;
670 } else {
671 result = isc_nm_listentcp(listen_nm, ISC_NM_LISTEN_ALL,
672 &tcp_listen_addr, accept_cb,
673 accept_cbarg, backlog, quota, sockp);
674 return result;
675 }
676
677 UNREACHABLE();
678 }
679
680 void
681 stream_connect(isc_nm_cb_t cb, void *cbarg, unsigned int timeout) {
682 isc_refcount_increment0(&active_cconnects);
683
684 if (stream_use_TLS && !stream_PROXY_over_TLS) {
685 isc_nm_tlsconnect(connect_nm, &tcp_connect_addr,
686 &tcp_listen_addr, cb, cbarg,
687 tcp_connect_tlsctx, NULL,
688 tcp_tlsctx_client_sess_cache, timeout,
689 stream_use_PROXY, NULL);
690 return;
691 } else if (stream_use_PROXY) {
692 isc_tlsctx_t *tlsctx = stream_PROXY_over_TLS
693 ? tcp_connect_tlsctx
694 : NULL;
695 isc_tlsctx_client_session_cache_t *sess_cache =
696 stream_PROXY_over_TLS ? tcp_tlsctx_client_sess_cache
697 : NULL;
698 isc_nm_proxystreamconnect(connect_nm, &tcp_connect_addr,
699 &tcp_listen_addr, cb, cbarg, timeout,
700 tlsctx, NULL, sess_cache,
701 get_proxyheader_info());
702 return;
703 } else {
704 isc_nm_tcpconnect(connect_nm, &tcp_connect_addr,
705 &tcp_listen_addr, cb, cbarg, timeout);
706 return;
707 }
708 UNREACHABLE();
709 }
710
711 isc_nm_proxy_type_t
712 get_proxy_type(void) {
713 if (!stream_use_PROXY) {
714 return ISC_NM_PROXY_NONE;
715 } else if (stream_PROXY_over_TLS) {
716 return ISC_NM_PROXY_ENCRYPTED;
717 }
718
719 return ISC_NM_PROXY_PLAIN;
720 }
721
722 void
723 connect_success_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
724 UNUSED(handle);
725 UNUSED(cbarg);
726
727 F();
728
729 isc_refcount_decrement(&active_cconnects);
730 assert_int_equal(eresult, ISC_R_SUCCESS);
731
732 if (have_expected_cconnects(atomic_fetch_add(&cconnects, 1) + 1)) {
733 do_cconnects_shutdown(loopmgr);
734 return;
735 }
736 }
737
738 int
739 stream_noop_setup(void **state ISC_ATTR_UNUSED) {
740 int r = setup_netmgr_test(state);
741 expected_cconnects = 1;
742 return r;
743 }
744
745 int
746 proxystream_noop_setup(void **state) {
747 stream_use_PROXY = true;
748 return stream_noop_setup(state);
749 }
750
751 int
752 proxystreamtls_noop_setup(void **state) {
753 stream_PROXY_over_TLS = true;
754 return proxystream_noop_setup(state);
755 }
756
757 void
758 stream_noop(void **state ISC_ATTR_UNUSED) {
759 isc_result_t result = ISC_R_SUCCESS;
760
761 result = stream_listen(noop_accept_cb, NULL, 128, NULL, &listen_sock);
762 assert_int_equal(result, ISC_R_SUCCESS);
763 isc_loop_teardown(mainloop, stop_listening, listen_sock);
764
765 connect_readcb = NULL;
766 stream_connect(connect_success_cb, NULL, T_CONNECT);
767 }
768
769 int
770 stream_noop_teardown(void **state ISC_ATTR_UNUSED) {
771 atomic_assert_int_eq(cconnects, 1);
772 atomic_assert_int_eq(csends, 0);
773 atomic_assert_int_eq(creads, 0);
774 atomic_assert_int_eq(sreads, 0);
775 atomic_assert_int_eq(ssends, 0);
776 return teardown_netmgr_test(state);
777 }
778
779 int
780 proxystream_noop_teardown(void **state) {
781 int r = stream_noop_teardown(state);
782 stream_use_PROXY = false;
783
784 return r;
785 }
786
787 int
788 proxystreamtls_noop_teardown(void **state) {
789 int r = proxystream_noop_teardown(state);
790 stream_PROXY_over_TLS = false;
791
792 return r;
793 }
794
795 static void
796 noresponse_readcb(isc_nmhandle_t *handle, isc_result_t eresult,
797 isc_region_t *region, void *cbarg) {
798 UNUSED(handle);
799 UNUSED(region);
800 UNUSED(cbarg);
801
802 F();
803
804 assert_true(eresult == ISC_R_CANCELED ||
805 eresult == ISC_R_CONNECTIONRESET || eresult == ISC_R_EOF);
806
807 isc_refcount_decrement(&active_creads);
808 isc_nmhandle_detach(&handle);
809
810 isc_loopmgr_shutdown(loopmgr);
811 }
812
813 static void
814 noresponse_sendcb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
815 UNUSED(cbarg);
816 UNUSED(eresult);
817
818 F();
819
820 assert_non_null(handle);
821 atomic_fetch_add(&csends, 1);
822 isc_nmhandle_detach(&handle);
823 isc_refcount_decrement(&active_csends);
824 }
825
826 static void
827 noresponse_connectcb(isc_nmhandle_t *handle, isc_result_t eresult,
828 void *cbarg) {
829 isc_nmhandle_t *readhandle = NULL;
830 isc_nmhandle_t *sendhandle = NULL;
831
832 F();
833
834 isc_refcount_decrement(&active_cconnects);
835
836 assert_int_equal(eresult, ISC_R_SUCCESS);
837
838 atomic_fetch_add(&cconnects, 1);
839
840 isc_refcount_increment0(&active_creads);
841 isc_nmhandle_attach(handle, &readhandle);
842 isc_nm_read(handle, noresponse_readcb, NULL);
843
844 isc_refcount_increment0(&active_csends);
845 isc_nmhandle_attach(handle, &sendhandle);
846 isc_nmhandle_setwritetimeout(handle, T_IDLE);
847
848 isc_nm_send(handle, (isc_region_t *)&send_msg, noresponse_sendcb,
849 cbarg);
850 }
851
852 int
853 stream_noresponse_setup(void **state ISC_ATTR_UNUSED) {
854 int r = setup_netmgr_test(state);
855 expected_cconnects = 1;
856 expected_saccepts = 1;
857 return r;
858 }
859
860 int
861 proxystream_noresponse_setup(void **state) {
862 stream_use_PROXY = true;
863 return stream_noresponse_setup(state);
864 }
865
866 int
867 proxystream_noresponse_teardown(void **state) {
868 int r = stream_noresponse_teardown(state);
869 stream_use_PROXY = false;
870 return r;
871 }
872
873 int
874 proxystreamtls_noresponse_setup(void **state) {
875 stream_PROXY_over_TLS = true;
876 return proxystream_noresponse_setup(state);
877 }
878
879 int
880 proxystreamtls_noresponse_teardown(void **state) {
881 int r = proxystream_noresponse_teardown(state);
882 stream_PROXY_over_TLS = false;
883 return r;
884 }
885
886 void
887 stream_noresponse(void **state ISC_ATTR_UNUSED) {
888 isc_result_t result = ISC_R_SUCCESS;
889
890 result = stream_listen(noop_accept_cb, NULL, 128, NULL, &listen_sock);
891 assert_int_equal(result, ISC_R_SUCCESS);
892 isc_loop_teardown(mainloop, stop_listening, listen_sock);
893
894 stream_connect(noresponse_connectcb, NULL, T_CONNECT);
895 }
896
897 int
898 stream_noresponse_teardown(void **state ISC_ATTR_UNUSED) {
899 X(cconnects);
900 X(csends);
901 X(creads);
902 X(sreads);
903 X(ssends);
904
905 atomic_assert_int_eq(cconnects, 1);
906 atomic_assert_int_eq(creads, 0);
907 atomic_assert_int_eq(sreads, 0);
908 atomic_assert_int_eq(ssends, 0);
909
910 return teardown_netmgr_test(state);
911 }
912
913 int
914 stream_timeout_recovery_setup(void **state ISC_ATTR_UNUSED) {
915 int r = setup_netmgr_test(state);
916
917 expected_ctimeouts = 4;
918 ctimeouts_shutdown = false;
919
920 expected_sreads = 5;
921 sreads_shutdown = true;
922
923 return r;
924 }
925
926 typedef struct proxy_addrs {
927 isc_sockaddr_t src_addr;
928 isc_sockaddr_t dst_addr;
929 } proxy_addrs_t;
930
931 static void
932 proxy2_handler_save_addrs_cb(const isc_result_t result,
933 const isc_proxy2_command_t cmd, const int socktype,
934 const isc_sockaddr_t *restrict src_addr,
935 const isc_sockaddr_t *restrict dst_addr,
936 const isc_region_t *restrict tlv_data,
937 const isc_region_t *restrict extra, void *cbarg) {
938 proxy_addrs_t *addrs = (proxy_addrs_t *)cbarg;
939
940 UNUSED(cmd);
941 UNUSED(socktype);
942 UNUSED(tlv_data);
943 UNUSED(extra);
944
945 REQUIRE(result == ISC_R_SUCCESS);
946
947 if (src_addr != NULL) {
948 addrs->src_addr = *src_addr;
949 }
950
951 if (dst_addr != NULL) {
952 addrs->dst_addr = *dst_addr;
953 }
954 }
955
956 void
957 proxy_verify_endpoints(isc_nmhandle_t *handle) {
958 isc_sockaddr_t local, peer;
959 peer = isc_nmhandle_peeraddr(handle);
960 local = isc_nmhandle_localaddr(handle);
961
962 if (isc_nm_is_proxy_unspec(handle)) {
963 isc_sockaddr_t real_local, real_peer;
964 real_peer = isc_nmhandle_real_peeraddr(handle);
965 real_local = isc_nmhandle_real_localaddr(handle);
966
967 assert_true(isc_sockaddr_equal(&peer, &real_peer));
968 assert_true(isc_sockaddr_equal(&local, &real_local));
969 } else if (proxy_info == NULL) {
970 assert_true(isc_sockaddr_equal(&peer, &proxy_src));
971 assert_true(isc_sockaddr_equal(&local, &proxy_dst));
972 } else if (proxy_info != NULL && !proxy_info->complete) {
973 assert_true(isc_sockaddr_equal(
974 &peer, &proxy_info->proxy_info.src_addr));
975 assert_true(isc_sockaddr_equal(
976 &local, &proxy_info->proxy_info.dst_addr));
977 } else if (proxy_info != NULL && proxy_info->complete) {
978 proxy_addrs_t addrs = { 0 };
979 RUNTIME_CHECK(isc_proxy2_header_handle_directly(
980 &proxy_info->complete_header,
981 proxy2_handler_save_addrs_cb,
982 &addrs) == ISC_R_SUCCESS);
983
984 assert_true(isc_sockaddr_equal(&peer, &addrs.src_addr));
985 assert_true(isc_sockaddr_equal(&local, &addrs.dst_addr));
986 }
987 }
988
989 int
990 proxystream_timeout_recovery_setup(void **state) {
991 stream_use_PROXY = true;
992 return stream_timeout_recovery_setup(state);
993 }
994
995 int
996 proxystream_timeout_recovery_teardown(void **state) {
997 int r = stream_timeout_recovery_teardown(state);
998 stream_use_PROXY = false;
999 return r;
1000 }
1001
1002 int
1003 proxystreamtls_timeout_recovery_setup(void **state) {
1004 stream_PROXY_over_TLS = true;
1005 return proxystream_timeout_recovery_setup(state);
1006 }
1007
1008 int
1009 proxystreamtls_timeout_recovery_teardown(void **state) {
1010 int r = proxystream_timeout_recovery_teardown(state);
1011 stream_PROXY_over_TLS = false;
1012 return r;
1013 }
1014
1015 void
1016 stream_timeout_recovery(void **state ISC_ATTR_UNUSED) {
1017 isc_result_t result = ISC_R_SUCCESS;
1018
1019 /*
1020 * Accept connections but don't send responses, forcing client
1021 * reads to time out.
1022 */
1023 noanswer = true;
1024 result = stream_listen(stream_accept_cb, NULL, 128, NULL, &listen_sock);
1025 assert_int_equal(result, ISC_R_SUCCESS);
1026 isc_loop_teardown(mainloop, stop_listening, listen_sock);
1027
1028 /*
1029 * Shorten all the client timeouts to 0.05 seconds.
1030 */
1031 isc_nm_settimeouts(connect_nm, T_SOFT, T_SOFT, T_SOFT, T_SOFT);
1032 connect_readcb = timeout_retry_cb;
1033 stream_connect(connect_connect_cb, NULL, T_CONNECT);
1034 }
1035
1036 int
1037 stream_timeout_recovery_teardown(void **state ISC_ATTR_UNUSED) {
1038 atomic_assert_int_eq(ctimeouts, expected_ctimeouts);
1039 return teardown_netmgr_test(state);
1040 }
1041
1042 int
1043 stream_recv_one_setup(void **state ISC_ATTR_UNUSED) {
1044 int r = setup_netmgr_test(state);
1045
1046 expected_cconnects = 1;
1047 cconnects_shutdown = false;
1048
1049 expected_csends = 1;
1050 csends_shutdown = false;
1051
1052 expected_saccepts = 1;
1053 saccepts_shutdown = false;
1054
1055 expected_sreads = 1;
1056 sreads_shutdown = false;
1057
1058 expected_ssends = 1;
1059 ssends_shutdown = false;
1060
1061 expected_creads = 1;
1062 creads_shutdown = true;
1063
1064 return r;
1065 }
1066
1067 int
1068 proxystream_recv_one_setup(void **state) {
1069 stream_use_PROXY = true;
1070 return stream_recv_one_setup(state);
1071 }
1072
1073 int
1074 proxystream_recv_one_teardown(void **state) {
1075 int r = stream_recv_one_teardown(state);
1076 stream_use_PROXY = false;
1077 return r;
1078 }
1079
1080 int
1081 proxystreamtls_recv_one_setup(void **state) {
1082 stream_PROXY_over_TLS = true;
1083 return proxystream_recv_one_setup(state);
1084 }
1085
1086 int
1087 proxystreamtls_recv_one_teardown(void **state) {
1088 int r = proxystream_recv_one_teardown(state);
1089 stream_PROXY_over_TLS = false;
1090 return r;
1091 }
1092
1093 void
1094 stream_recv_one(void **state ISC_ATTR_UNUSED) {
1095 isc_result_t result = ISC_R_SUCCESS;
1096 isc_quota_t *quotap = tcp_listener_init_quota(1);
1097
1098 atomic_store(&nsends, 1);
1099
1100 result = stream_listen(stream_accept_cb, NULL, 128, quotap,
1101 &listen_sock);
1102 assert_int_equal(result, ISC_R_SUCCESS);
1103 isc_loop_teardown(mainloop, stop_listening, listen_sock);
1104
1105 stream_connect(connect_connect_cb, NULL, T_CONNECT);
1106 }
1107
1108 int
1109 stream_recv_one_teardown(void **state ISC_ATTR_UNUSED) {
1110 atomic_assert_int_eq(cconnects, expected_cconnects);
1111 atomic_assert_int_eq(csends, expected_csends);
1112 atomic_assert_int_eq(saccepts, expected_saccepts);
1113 atomic_assert_int_eq(sreads, expected_sreads);
1114 atomic_assert_int_eq(ssends, expected_ssends);
1115 atomic_assert_int_eq(creads, expected_creads);
1116
1117 return teardown_netmgr_test(state);
1118 }
1119
1120 int
1121 stream_recv_two_setup(void **state ISC_ATTR_UNUSED) {
1122 int r = setup_netmgr_test(state);
1123
1124 expected_cconnects = 2;
1125 cconnects_shutdown = false;
1126
1127 expected_csends = 2;
1128 csends_shutdown = false;
1129
1130 expected_saccepts = 2;
1131 saccepts_shutdown = false;
1132
1133 expected_sreads = 2;
1134 sreads_shutdown = false;
1135
1136 expected_ssends = 2;
1137 ssends_shutdown = false;
1138
1139 expected_creads = 2;
1140 creads_shutdown = true;
1141
1142 return r;
1143 }
1144
1145 int
1146 proxystream_recv_two_setup(void **state) {
1147 stream_use_PROXY = true;
1148 return stream_recv_two_setup(state);
1149 }
1150
1151 int
1152 proxystream_recv_two_teardown(void **state) {
1153 int r = stream_recv_two_teardown(state);
1154 stream_use_PROXY = false;
1155 return r;
1156 }
1157
1158 int
1159 proxystreamtls_recv_two_setup(void **state) {
1160 stream_PROXY_over_TLS = true;
1161 return proxystream_recv_two_setup(state);
1162 }
1163
1164 int
1165 proxystreamtls_recv_two_teardown(void **state) {
1166 int r = proxystream_recv_two_teardown(state);
1167 stream_PROXY_over_TLS = false;
1168 return r;
1169 }
1170
1171 void
1172 stream_recv_two(void **state ISC_ATTR_UNUSED) {
1173 isc_result_t result = ISC_R_SUCCESS;
1174 isc_quota_t *quotap = tcp_listener_init_quota(1);
1175
1176 atomic_store(&nsends, 2);
1177
1178 result = stream_listen(stream_accept_cb, NULL, 128, quotap,
1179 &listen_sock);
1180 assert_int_equal(result, ISC_R_SUCCESS);
1181 isc_loop_teardown(mainloop, stop_listening, listen_sock);
1182
1183 stream_connect(connect_connect_cb, NULL, T_CONNECT);
1184
1185 stream_connect(connect_connect_cb, NULL, T_CONNECT);
1186 }
1187
1188 int
1189 stream_recv_two_teardown(void **state ISC_ATTR_UNUSED) {
1190 atomic_assert_int_eq(cconnects, expected_cconnects);
1191 atomic_assert_int_eq(csends, expected_csends);
1192 atomic_assert_int_eq(sreads, expected_saccepts);
1193 atomic_assert_int_eq(sreads, expected_sreads);
1194 atomic_assert_int_eq(ssends, expected_ssends);
1195 atomic_assert_int_eq(creads, expected_creads);
1196
1197 return teardown_netmgr_test(state);
1198 }
1199
1200 int
1201 stream_recv_send_setup(void **state ISC_ATTR_UNUSED) {
1202 int r = setup_netmgr_test(state);
1203 expected_cconnects = workers;
1204 cconnects_shutdown = false;
1205 nsends = expected_creads = workers;
1206 do_send = true;
1207
1208 return r;
1209 }
1210
1211 int
1212 proxystream_recv_send_setup(void **state) {
1213 stream_use_PROXY = true;
1214 return stream_recv_send_setup(state);
1215 }
1216
1217 int
1218 proxystream_recv_send_teardown(void **state) {
1219 int r = stream_recv_send_teardown(state);
1220 stream_use_PROXY = false;
1221 return r;
1222 }
1223
1224 int
1225 proxystreamtls_recv_send_setup(void **state) {
1226 stream_PROXY_over_TLS = true;
1227 return proxystream_recv_send_setup(state);
1228 }
1229
1230 int
1231 proxystreamtls_recv_send_teardown(void **state) {
1232 int r = proxystream_recv_send_teardown(state);
1233 stream_PROXY_over_TLS = false;
1234 return r;
1235 }
1236
1237 void
1238 stream_recv_send(void **state ISC_ATTR_UNUSED) {
1239 isc_result_t result = ISC_R_SUCCESS;
1240 isc_quota_t *quotap = tcp_listener_init_quota(workers);
1241
1242 result = stream_listen(stream_accept_cb, NULL, 128, quotap,
1243 &listen_sock);
1244 assert_int_equal(result, ISC_R_SUCCESS);
1245 isc_loop_teardown(mainloop, stop_listening, listen_sock);
1246
1247 for (size_t i = 0; i < workers; i++) {
1248 isc_async_run(isc_loop_get(loopmgr, i),
1249 stream_recv_send_connect,
1250 get_stream_connect_function());
1251 }
1252 }
1253
1254 int
1255 stream_recv_send_teardown(void **state ISC_ATTR_UNUSED) {
1256 X(cconnects);
1257 X(csends);
1258 X(creads);
1259 X(sreads);
1260 X(ssends);
1261
1262 CHECK_RANGE_FULL(csends);
1263 CHECK_RANGE_FULL(creads);
1264 CHECK_RANGE_FULL(sreads);
1265 CHECK_RANGE_FULL(ssends);
1266
1267 return teardown_netmgr_test(state);
1268 }
1269
1270 int
1271 setup_udp_test(void **state) {
1272 setup_loopmgr(state);
1273 setup_netmgr(state);
1274
1275 udp_connect_addr = (isc_sockaddr_t){ .length = 0 };
1276 isc_sockaddr_fromin6(&udp_connect_addr, &in6addr_loopback, 0);
1277
1278 udp_listen_addr = (isc_sockaddr_t){ .length = 0 };
1279 isc_sockaddr_fromin6(&udp_listen_addr, &in6addr_loopback,
1280 udp_use_PROXY ? PROXYUDP_TEST_PORT
1281 : UDP_TEST_PORT);
1282
1283 atomic_store(&sreads, 0);
1284 atomic_store(&ssends, 0);
1285
1286 atomic_store(&cconnects, 0);
1287 atomic_store(&csends, 0);
1288 atomic_store(&creads, 0);
1289 atomic_store(&ctimeouts, 0);
1290
1291 isc_refcount_init(&active_cconnects, 0);
1292 isc_refcount_init(&active_csends, 0);
1293 isc_refcount_init(&active_creads, 0);
1294 isc_refcount_init(&active_ssends, 0);
1295 isc_refcount_init(&active_sreads, 0);
1296
1297 expected_cconnects = -1;
1298 expected_csends = -1;
1299 expected_creads = -1;
1300 expected_sreads = -1;
1301 expected_ssends = -1;
1302 expected_ctimeouts = -1;
1303
1304 ssends_shutdown = true;
1305 sreads_shutdown = true;
1306 csends_shutdown = true;
1307 cconnects_shutdown = true;
1308 creads_shutdown = true;
1309
1310 isc_nonce_buf(&send_magic, sizeof(send_magic));
1311
1312 connect_readcb = connect_read_cb;
1313
1314 return 0;
1315 }
1316
1317 int
1318 teardown_udp_test(void **state) {
1319 UNUSED(state);
1320
1321 isc_refcount_destroy(&active_cconnects);
1322 isc_refcount_destroy(&active_csends);
1323 isc_refcount_destroy(&active_creads);
1324 isc_refcount_destroy(&active_ssends);
1325 isc_refcount_destroy(&active_sreads);
1326
1327 teardown_netmgr(state);
1328 teardown_loopmgr(state);
1329
1330 return 0;
1331 }
1332
1333 static void
1334 udp_connect(isc_nm_cb_t cb, void *cbarg, unsigned int timeout) {
1335 if (udp_use_PROXY) {
1336 isc_nm_proxyudpconnect(netmgr, &udp_connect_addr,
1337 &udp_listen_addr, cb, cbarg, timeout,
1338 NULL);
1339 } else {
1340 isc_nm_udpconnect(netmgr, &udp_connect_addr, &udp_listen_addr,
1341 cb, cbarg, timeout);
1342 }
1343 }
1344
1345 static void
1346 udp_listen_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
1347 isc_region_t *region, void *cbarg) {
1348 if (eresult != ISC_R_SUCCESS) {
1349 isc_refcount_increment0(&active_sreads);
1350 }
1351 listen_read_cb(handle, eresult, region, cbarg);
1352 }
1353
1354 static void
1355 udp_start_listening(uint32_t nworkers, isc_nm_recv_cb_t cb) {
1356 isc_result_t result;
1357
1358 if (udp_use_PROXY) {
1359 result = isc_nm_listenproxyudp(netmgr, nworkers,
1360 &udp_listen_addr, cb, NULL,
1361 &listen_sock);
1362 } else {
1363 result = isc_nm_listenudp(netmgr, nworkers, &udp_listen_addr,
1364 cb, NULL, &listen_sock);
1365 }
1366
1367 assert_int_equal(result, ISC_R_SUCCESS);
1368
1369 isc_loop_teardown(mainloop, stop_listening, listen_sock);
1370 }
1371
1372 static void
1373 udp__send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
1374 isc_nmhandle_t *sendhandle = handle;
1375
1376 assert_non_null(sendhandle);
1377
1378 F();
1379
1380 switch (eresult) {
1381 case ISC_R_SUCCESS:
1382 if (have_expected_csends(atomic_fetch_add(&csends, 1) + 1)) {
1383 if (csends_shutdown) {
1384 isc_nm_cancelread(handle);
1385 isc_loopmgr_shutdown(loopmgr);
1386 }
1387 }
1388 break;
1389 case ISC_R_SHUTTINGDOWN:
1390 case ISC_R_CANCELED:
1391 break;
1392 default:
1393 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
1394 isc_result_totext(eresult), cbarg);
1395 assert_int_equal(eresult, ISC_R_SUCCESS);
1396 }
1397
1398 isc_nmhandle_detach(&sendhandle);
1399 isc_refcount_decrement(&active_csends);
1400 }
1401
1402 static void
1403 udp__connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg);
1404
1405 static void
1406 udp_enqueue_connect(void *arg ISC_ATTR_UNUSED) {
1407 isc_sockaddr_t connect_addr;
1408
1409 connect_addr = (isc_sockaddr_t){ .length = 0 };
1410 isc_sockaddr_fromin6(&connect_addr, &in6addr_loopback, 0);
1411
1412 isc_refcount_increment0(&active_cconnects);
1413
1414 udp_connect(udp__connect_cb, NULL, T_CONNECT);
1415 }
1416
1417 static void
1418 udp__connect_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
1419 isc_region_t *region, void *cbarg) {
1420 uint64_t magic = 0;
1421
1422 assert_non_null(handle);
1423
1424 F();
1425
1426 switch (eresult) {
1427 case ISC_R_TIMEDOUT:
1428 /*
1429 * We are operating on the localhost, UDP cannot get lost, but
1430 * it could be delayed, so we read again until we get the
1431 * answer.
1432 */
1433 isc_nm_read(handle, connect_readcb, cbarg);
1434 return;
1435 case ISC_R_SUCCESS:
1436 assert_true(region->length >= sizeof(magic));
1437
1438 memmove(&magic, region->base, sizeof(magic));
1439
1440 assert_true(magic == send_magic);
1441
1442 if (have_expected_creads(atomic_fetch_add(&creads, 1) + 1)) {
1443 do_creads_shutdown(loopmgr);
1444 }
1445
1446 if (magic == send_magic && allow_send_back) {
1447 connect_send(handle);
1448 return;
1449 }
1450
1451 break;
1452 default:
1453 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
1454 isc_result_totext(eresult), cbarg);
1455 assert_int_equal(eresult, ISC_R_SUCCESS);
1456 }
1457
1458 isc_refcount_decrement(&active_creads);
1459
1460 isc_nmhandle_detach(&handle);
1461 }
1462
1463 static void
1464 udp__connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
1465 isc_nmhandle_t *readhandle = NULL;
1466 isc_nmhandle_t *sendhandle = NULL;
1467
1468 F();
1469
1470 isc_refcount_decrement(&active_cconnects);
1471
1472 switch (eresult) {
1473 case ISC_R_SUCCESS:
1474 if (udp_use_PROXY) {
1475 assert_true(isc_nm_is_proxy_handle(handle));
1476 }
1477
1478 if (have_expected_cconnects(atomic_fetch_add(&cconnects, 1) +
1479 1))
1480 {
1481 do_cconnects_shutdown(loopmgr);
1482 } else if (do_send) {
1483 isc_async_current(udp_enqueue_connect, cbarg);
1484 }
1485
1486 isc_refcount_increment0(&active_creads);
1487 isc_nmhandle_attach(handle, &readhandle);
1488 isc_nm_read(handle, connect_readcb, cbarg);
1489
1490 isc_refcount_increment0(&active_csends);
1491 isc_nmhandle_attach(handle, &sendhandle);
1492 isc_nmhandle_setwritetimeout(handle, T_IDLE);
1493
1494 isc_nm_send(sendhandle, (isc_region_t *)&send_msg, udp__send_cb,
1495 cbarg);
1496
1497 break;
1498 case ISC_R_ADDRINUSE:
1499 /* Try again */
1500 udp_enqueue_connect(NULL);
1501 break;
1502 case ISC_R_SHUTTINGDOWN:
1503 case ISC_R_CANCELED:
1504 break;
1505 default:
1506 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
1507 isc_result_totext(eresult), cbarg);
1508 assert_int_equal(eresult, ISC_R_SUCCESS);
1509 }
1510 }
1511
1512 int
1513 udp_noop_setup(void **state) {
1514 setup_udp_test(state);
1515 expected_cconnects = 1;
1516 cconnects_shutdown = true;
1517 return 0;
1518 }
1519
1520 int
1521 udp_noop_teardown(void **state) {
1522 atomic_assert_int_eq(cconnects, 1);
1523 teardown_udp_test(state);
1524 return 0;
1525 }
1526
1527 void
1528 udp_noop(void **arg ISC_ATTR_UNUSED) {
1529 /* isc_result_t result = ISC_R_SUCCESS; */
1530
1531 /* result = isc_nm_listenudp(netmgr, ISC_NM_LISTEN_ALL,
1532 * &udp_listen_addr, */
1533 /* mock_recv_cb, NULL, &listen_sock); */
1534 /* assert_int_equal(result, ISC_R_SUCCESS); */
1535
1536 /* isc_nm_stoplistening(listen_sock); */
1537 /* isc_nmsocket_close(&listen_sock); */
1538 /* assert_null(listen_sock); */
1539
1540 isc_refcount_increment0(&active_cconnects);
1541 udp_connect(connect_success_cb, NULL, UDP_T_CONNECT);
1542 }
1543
1544 int
1545 proxyudp_noop_setup(void **state) {
1546 udp_use_PROXY = true;
1547 return udp_noop_setup(state);
1548 }
1549
1550 int
1551 proxyudp_noop_teardown(void **state) {
1552 int ret = udp_noop_teardown(state);
1553 udp_use_PROXY = false;
1554 return ret;
1555 }
1556
1557 static void
1558 udp_noresponse_recv_cb(isc_nmhandle_t *handle, isc_result_t eresult,
1559 isc_region_t *region, void *cbarg) {
1560 UNUSED(handle);
1561 UNUSED(eresult);
1562 UNUSED(region);
1563 UNUSED(cbarg);
1564 }
1565
1566 static void
1567 udp_noresponse_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
1568 isc_region_t *region, void *cbarg) {
1569 UNUSED(region);
1570 UNUSED(cbarg);
1571
1572 assert_int_equal(eresult, ISC_R_TIMEDOUT);
1573
1574 isc_refcount_decrement(&active_creads);
1575
1576 atomic_fetch_add(&creads, 1);
1577
1578 isc_nmhandle_detach(&handle);
1579
1580 isc_loopmgr_shutdown(loopmgr);
1581 }
1582
1583 static void
1584 udp_noresponse_send_cb(isc_nmhandle_t *handle, isc_result_t eresult,
1585 void *cbarg) {
1586 UNUSED(cbarg);
1587
1588 assert_non_null(handle);
1589 assert_int_equal(eresult, ISC_R_SUCCESS);
1590 atomic_fetch_add(&csends, 1);
1591 isc_nmhandle_detach(&handle);
1592 isc_refcount_decrement(&active_csends);
1593 }
1594
1595 static void
1596 udp_noresponse_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult,
1597 void *cbarg) {
1598 isc_nmhandle_t *readhandle = NULL;
1599 isc_nmhandle_t *sendhandle = NULL;
1600
1601 isc_refcount_decrement(&active_cconnects);
1602
1603 assert_int_equal(eresult, ISC_R_SUCCESS);
1604
1605 /* Read */
1606 isc_refcount_increment0(&active_creads);
1607 isc_nmhandle_attach(handle, &readhandle);
1608 isc_nm_read(handle, udp_noresponse_read_cb, cbarg);
1609
1610 /* Send */
1611 isc_refcount_increment0(&active_csends);
1612 isc_nmhandle_attach(handle, &sendhandle);
1613 isc_nmhandle_setwritetimeout(handle, T_IDLE);
1614
1615 isc_nm_send(sendhandle, (isc_region_t *)&send_msg,
1616 udp_noresponse_send_cb, cbarg);
1617
1618 atomic_fetch_add(&cconnects, 1);
1619 }
1620
1621 int
1622 udp_noresponse_setup(void **state) {
1623 setup_udp_test(state);
1624 expected_csends = 1;
1625 return 0;
1626 }
1627
1628 int
1629 udp_noresponse_teardown(void **state) {
1630 atomic_assert_int_eq(csends, expected_csends);
1631 teardown_udp_test(state);
1632 return 0;
1633 }
1634
1635 void
1636 udp_noresponse(void **arg ISC_ATTR_UNUSED) {
1637 udp_start_listening(ISC_NM_LISTEN_ONE, udp_noresponse_recv_cb);
1638
1639 isc_refcount_increment0(&active_cconnects);
1640 udp_connect(udp_noresponse_connect_cb, listen_sock, UDP_T_SOFT);
1641 }
1642
1643 int
1644 proxyudp_noresponse_setup(void **state) {
1645 udp_use_PROXY = true;
1646 return udp_noresponse_setup(state);
1647 }
1648
1649 int
1650 proxyudp_noresponse_teardown(void **state) {
1651 int ret = udp_noresponse_teardown(state);
1652 udp_use_PROXY = false;
1653 return ret;
1654 }
1655
1656 static void
1657 udp_timeout_recovery_ssend_cb(isc_nmhandle_t *handle, isc_result_t eresult,
1658 void *cbarg) {
1659 UNUSED(cbarg);
1660
1661 isc_refcount_decrement(&active_ssends);
1662 assert_non_null(handle);
1663 assert_int_equal(eresult, ISC_R_SUCCESS);
1664 atomic_fetch_add(&ssends, 1);
1665 isc_nmhandle_detach(&handle);
1666 }
1667
1668 static void
1669 udp_timeout_recovery_recv_cb(isc_nmhandle_t *handle, isc_result_t eresult,
1670 isc_region_t *region, void *cbarg) {
1671 uint64_t magic = 0;
1672 isc_nmhandle_t *sendhandle = NULL;
1673 int _creads = atomic_fetch_add(&creads, 1) + 1;
1674
1675 assert_non_null(handle);
1676
1677 assert_int_equal(eresult, ISC_R_SUCCESS);
1678
1679 assert_true(region->length == sizeof(magic));
1680
1681 memmove(&magic, region->base, sizeof(magic));
1682 assert_true(magic == send_magic);
1683 assert_true(_creads < 6);
1684
1685 if (_creads == 5) {
1686 isc_nmhandle_attach(handle, &sendhandle);
1687 isc_refcount_increment0(&active_ssends);
1688 isc_nmhandle_setwritetimeout(sendhandle, T_IDLE);
1689 isc_nm_send(sendhandle, (isc_region_t *)&send_msg,
1690 udp_timeout_recovery_ssend_cb, cbarg);
1691 }
1692 }
1693
1694 static void
1695 udp_timeout_recovery_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
1696 isc_region_t *region, void *cbarg) {
1697 UNUSED(region);
1698 UNUSED(cbarg);
1699
1700 assert_non_null(handle);
1701
1702 F();
1703
1704 if (eresult == ISC_R_TIMEDOUT &&
1705 atomic_fetch_add(&ctimeouts, 1) + 1 < expected_ctimeouts)
1706 {
1707 isc_nmhandle_settimeout(handle, T_SOFT);
1708 return;
1709 }
1710
1711 isc_refcount_decrement(&active_creads);
1712 isc_nmhandle_detach(&handle);
1713
1714 atomic_fetch_add(&creads, 1);
1715 isc_loopmgr_shutdown(loopmgr);
1716 }
1717
1718 static void
1719 udp_timeout_recovery_send_cb(isc_nmhandle_t *handle, isc_result_t eresult,
1720 void *cbarg) {
1721 UNUSED(cbarg);
1722
1723 assert_non_null(handle);
1724 assert_int_equal(eresult, ISC_R_SUCCESS);
1725 atomic_fetch_add(&csends, 1);
1726
1727 isc_nmhandle_detach(&handle);
1728 isc_refcount_decrement(&active_csends);
1729 }
1730
1731 static void
1732 udp_timeout_recovery_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult,
1733 void *cbarg) {
1734 isc_nmhandle_t *readhandle = NULL;
1735 isc_nmhandle_t *sendhandle = NULL;
1736
1737 F();
1738
1739 isc_refcount_decrement(&active_cconnects);
1740
1741 assert_int_equal(eresult, ISC_R_SUCCESS);
1742
1743 /* Read */
1744 isc_refcount_increment0(&active_creads);
1745 isc_nmhandle_attach(handle, &readhandle);
1746 isc_nm_read(handle, udp_timeout_recovery_read_cb, cbarg);
1747
1748 /* Send */
1749 isc_refcount_increment0(&active_csends);
1750 isc_nmhandle_attach(handle, &sendhandle);
1751 isc_nmhandle_setwritetimeout(handle, T_IDLE);
1752 isc_nm_send(sendhandle, (isc_region_t *)&send_msg,
1753 udp_timeout_recovery_send_cb, cbarg);
1754
1755 atomic_fetch_add(&cconnects, 1);
1756 }
1757
1758 int
1759 udp_timeout_recovery_setup(void **state) {
1760 setup_udp_test(state);
1761 expected_cconnects = 1;
1762 expected_csends = 1;
1763 expected_creads = 1;
1764 expected_ctimeouts = 4;
1765 return 0;
1766 }
1767
1768 int
1769 udp_timeout_recovery_teardown(void **state) {
1770 atomic_assert_int_eq(cconnects, expected_cconnects);
1771 atomic_assert_int_eq(csends, expected_csends);
1772 atomic_assert_int_eq(csends, expected_creads);
1773 atomic_assert_int_eq(ctimeouts, expected_ctimeouts);
1774 teardown_udp_test(state);
1775 return 0;
1776 }
1777
1778 void
1779 udp_timeout_recovery(void **arg ISC_ATTR_UNUSED) {
1780 /*
1781 * Listen using the noop callback so that client reads will time out.
1782 */
1783 udp_start_listening(ISC_NM_LISTEN_ONE, udp_timeout_recovery_recv_cb);
1784
1785 /*
1786 * Connect with client timeout set to 0.05 seconds, then sleep for at
1787 * least a second for each 'tick'. timeout_retry_cb() will give up
1788 * after five timeouts.
1789 */
1790 isc_refcount_increment0(&active_cconnects);
1791 udp_connect(udp_timeout_recovery_connect_cb, listen_sock, UDP_T_SOFT);
1792 }
1793
1794 int
1795 proxyudp_timeout_recovery_setup(void **state) {
1796 udp_use_PROXY = true;
1797 return udp_timeout_recovery_setup(state);
1798 }
1799
1800 int
1801 proxyudp_timeout_recovery_teardown(void **state) {
1802 int ret = udp_timeout_recovery_teardown(state);
1803 udp_use_PROXY = false;
1804 return ret;
1805 }
1806
1807 static void
1808 udp_shutdown_connect_async_cb(void *arg ISC_ATTR_UNUSED);
1809
1810 static void
1811 udp_shutdown_connect_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult,
1812 void *cbarg) {
1813 UNUSED(handle);
1814 UNUSED(cbarg);
1815
1816 isc_refcount_decrement(&active_cconnects);
1817
1818 /*
1819 * The first UDP connect is faster than asynchronous shutdown procedure,
1820 * restart the UDP connect again and expect the failure only in the
1821 * second loop.
1822 */
1823 if (atomic_fetch_add(&cconnects, 1) == 0) {
1824 assert_int_equal(eresult, ISC_R_SUCCESS);
1825 isc_async_current(udp_shutdown_connect_async_cb, netmgr);
1826 } else {
1827 assert_int_equal(eresult, ISC_R_SHUTTINGDOWN);
1828 }
1829 }
1830
1831 static void
1832 udp_shutdown_connect_async_cb(void *arg ISC_ATTR_UNUSED) {
1833 isc_refcount_increment0(&active_cconnects);
1834 udp_connect(udp_shutdown_connect_connect_cb, NULL, T_SOFT);
1835 }
1836
1837 int
1838 udp_shutdown_connect_setup(void **state) {
1839 setup_udp_test(state);
1840 expected_cconnects = 2;
1841 return 0;
1842 }
1843
1844 int
1845 udp_shutdown_connect_teardown(void **state) {
1846 atomic_assert_int_eq(cconnects, expected_cconnects);
1847 teardown_udp_test(state);
1848 return 0;
1849 }
1850
1851 void
1852 udp_shutdown_connect(void **arg ISC_ATTR_UNUSED) {
1853 isc_loopmgr_shutdown(loopmgr);
1854 /*
1855 * isc_nm_udpconnect() is synchronous, so we need to launch this on the
1856 * async loop.
1857 */
1858 isc_async_current(udp_shutdown_connect_async_cb, netmgr);
1859 }
1860
1861 int
1862 proxyudp_shutdown_connect_setup(void **state) {
1863 udp_use_PROXY = true;
1864 return udp_shutdown_connect_setup(state);
1865 }
1866
1867 int
1868 proxyudp_shutdown_connect_teardown(void **state) {
1869 int ret = udp_shutdown_connect_teardown(state);
1870 udp_use_PROXY = false;
1871 return ret;
1872 }
1873
1874 static void
1875 udp_shutdown_read_recv_cb(isc_nmhandle_t *handle, isc_result_t eresult,
1876 isc_region_t *region, void *cbarg) {
1877 uint64_t magic = 0;
1878
1879 UNUSED(cbarg);
1880
1881 assert_non_null(handle);
1882
1883 F();
1884
1885 assert_int_equal(eresult, ISC_R_SUCCESS);
1886
1887 assert_true(region->length == sizeof(magic));
1888
1889 memmove(&magic, region->base, sizeof(magic));
1890 assert_true(magic == send_magic);
1891 }
1892
1893 static void
1894 udp_shutdown_read_send_cb(isc_nmhandle_t *handle, isc_result_t eresult,
1895 void *cbarg) {
1896 UNUSED(cbarg);
1897
1898 F();
1899
1900 assert_non_null(handle);
1901 assert_int_equal(eresult, ISC_R_SUCCESS);
1902
1903 atomic_fetch_add(&csends, 1);
1904
1905 isc_loopmgr_shutdown(loopmgr);
1906
1907 isc_nmhandle_detach(&handle);
1908 isc_refcount_decrement(&active_csends);
1909 }
1910
1911 static void
1912 udp_shutdown_read_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
1913 isc_region_t *region, void *cbarg) {
1914 UNUSED(region);
1915 UNUSED(cbarg);
1916
1917 assert_true(eresult == ISC_R_SHUTTINGDOWN || eresult == ISC_R_TIMEDOUT);
1918
1919 isc_refcount_decrement(&active_creads);
1920
1921 atomic_fetch_add(&creads, 1);
1922
1923 isc_nmhandle_detach(&handle);
1924 }
1925
1926 static void
1927 udp_shutdown_read_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult,
1928 void *cbarg) {
1929 isc_nmhandle_t *readhandle = NULL;
1930 isc_nmhandle_t *sendhandle = NULL;
1931
1932 isc_refcount_decrement(&active_cconnects);
1933
1934 assert_int_equal(eresult, ISC_R_SUCCESS);
1935
1936 /* Read */
1937 isc_refcount_increment0(&active_creads);
1938 isc_nmhandle_attach(handle, &readhandle);
1939 isc_nm_read(handle, udp_shutdown_read_read_cb, cbarg);
1940 assert_true(handle->sock->reading);
1941
1942 /* Send */
1943 isc_refcount_increment0(&active_csends);
1944 isc_nmhandle_attach(handle, &sendhandle);
1945 isc_nmhandle_setwritetimeout(handle, T_IDLE);
1946 isc_nm_send(sendhandle, (isc_region_t *)&send_msg,
1947 udp_shutdown_read_send_cb, cbarg);
1948
1949 atomic_fetch_add(&cconnects, 1);
1950 }
1951
1952 int
1953 udp_shutdown_read_setup(void **state) {
1954 setup_udp_test(state);
1955 expected_cconnects = 1;
1956 expected_creads = 1;
1957 return 0;
1958 }
1959
1960 int
1961 udp_shutdown_read_teardown(void **state) {
1962 atomic_assert_int_eq(cconnects, expected_cconnects);
1963 atomic_assert_int_eq(creads, expected_creads);
1964 teardown_udp_test(state);
1965 return 0;
1966 }
1967
1968 void
1969 udp_shutdown_read(void **arg ISC_ATTR_UNUSED) {
1970 udp_start_listening(ISC_NM_LISTEN_ONE, udp_shutdown_read_recv_cb);
1971
1972 isc_refcount_increment0(&active_cconnects);
1973 udp_connect(udp_shutdown_read_connect_cb, NULL, UDP_T_SOFT);
1974 }
1975
1976 int
1977 proxyudp_shutdown_read_setup(void **state) {
1978 udp_use_PROXY = true;
1979 return udp_shutdown_read_setup(state);
1980 }
1981
1982 int
1983 proxyudp_shutdown_read_teardown(void **state) {
1984 int ret = udp_shutdown_read_teardown(state);
1985 udp_use_PROXY = false;
1986 return ret;
1987 }
1988
1989 static void
1990 udp_cancel_read_recv_cb(isc_nmhandle_t *handle, isc_result_t eresult,
1991 isc_region_t *region, void *cbarg) {
1992 uint64_t magic = 0;
1993
1994 UNUSED(cbarg);
1995
1996 assert_non_null(handle);
1997
1998 F();
1999
2000 assert_int_equal(eresult, ISC_R_SUCCESS);
2001
2002 assert_true(region->length == sizeof(magic));
2003
2004 memmove(&magic, region->base, sizeof(magic));
2005 assert_true(magic == send_magic);
2006 }
2007
2008 static void
2009 udp_cancel_read_send_cb(isc_nmhandle_t *handle, isc_result_t eresult,
2010 void *cbarg) {
2011 UNUSED(cbarg);
2012
2013 F();
2014
2015 assert_non_null(handle);
2016 assert_int_equal(eresult, ISC_R_SUCCESS);
2017
2018 atomic_fetch_add(&csends, 1);
2019
2020 isc_nm_cancelread(handle);
2021
2022 isc_nmhandle_detach(&handle);
2023 isc_refcount_decrement(&active_csends);
2024 }
2025
2026 static void
2027 udp_cancel_read_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
2028 isc_region_t *region, void *cbarg) {
2029 isc_nmhandle_t *sendhandle = NULL;
2030 isc_nmhandle_t *readhandle = NULL;
2031
2032 UNUSED(region);
2033
2034 F();
2035
2036 switch (eresult) {
2037 case ISC_R_TIMEDOUT:
2038
2039 /* Read again */
2040 isc_refcount_increment0(&active_creads);
2041 isc_nmhandle_attach(handle, &readhandle);
2042 isc_nm_read(handle, udp_cancel_read_read_cb, cbarg);
2043
2044 /* Send only once */
2045 if (isc_refcount_increment0(&active_csends) == 0) {
2046 isc_nmhandle_attach(handle, &sendhandle);
2047 isc_nmhandle_setwritetimeout(handle, T_IDLE);
2048 isc_nm_send(sendhandle, (isc_region_t *)&send_msg,
2049 udp_cancel_read_send_cb, cbarg);
2050 }
2051 break;
2052 case ISC_R_CANCELED:
2053 /* The read has been canceled */
2054 atomic_fetch_add(&creads, 1);
2055 isc_loopmgr_shutdown(loopmgr);
2056 break;
2057 default:
2058 UNREACHABLE();
2059 }
2060
2061 isc_refcount_decrement(&active_creads);
2062
2063 isc_nmhandle_detach(&handle);
2064 }
2065
2066 static void
2067 udp_cancel_read_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult,
2068 void *cbarg) {
2069 isc_nmhandle_t *readhandle = NULL;
2070
2071 isc_refcount_decrement(&active_cconnects);
2072
2073 assert_int_equal(eresult, ISC_R_SUCCESS);
2074
2075 isc_refcount_increment0(&active_creads);
2076 isc_nmhandle_attach(handle, &readhandle);
2077 isc_nm_read(handle, udp_cancel_read_read_cb, cbarg);
2078
2079 atomic_fetch_add(&cconnects, 1);
2080 }
2081
2082 int
2083 udp_cancel_read_setup(void **state) {
2084 setup_udp_test(state);
2085 expected_cconnects = 1;
2086 expected_creads = 1;
2087 return 0;
2088 }
2089
2090 int
2091 udp_cancel_read_teardown(void **state) {
2092 atomic_assert_int_eq(cconnects, expected_cconnects);
2093 atomic_assert_int_eq(creads, expected_creads);
2094 teardown_udp_test(state);
2095 return 0;
2096 }
2097
2098 void
2099 udp_cancel_read(void **arg ISC_ATTR_UNUSED) {
2100 udp_start_listening(ISC_NM_LISTEN_ONE, udp_cancel_read_recv_cb);
2101
2102 isc_refcount_increment0(&active_cconnects);
2103 udp_connect(udp_cancel_read_connect_cb, NULL, UDP_T_SOFT);
2104 }
2105
2106 int
2107 proxyudp_cancel_read_setup(void **state) {
2108 udp_use_PROXY = true;
2109 return udp_cancel_read_setup(state);
2110 }
2111
2112 int
2113 proxyudp_cancel_read_teardown(void **state) {
2114 int ret = udp_cancel_read_teardown(state);
2115 udp_use_PROXY = false;
2116 return ret;
2117 }
2118
2119 int
2120 udp_recv_one_setup(void **state) {
2121 setup_udp_test(state);
2122
2123 connect_readcb = udp__connect_read_cb;
2124
2125 expected_cconnects = 1;
2126 cconnects_shutdown = false;
2127
2128 expected_csends = 1;
2129 csends_shutdown = false;
2130
2131 expected_sreads = 1;
2132 sreads_shutdown = false;
2133
2134 expected_ssends = 1;
2135 ssends_shutdown = false;
2136
2137 expected_creads = 1;
2138 creads_shutdown = true;
2139
2140 return 0;
2141 }
2142
2143 int
2144 udp_recv_one_teardown(void **state) {
2145 atomic_assert_int_eq(cconnects, expected_cconnects);
2146 atomic_assert_int_eq(csends, expected_csends);
2147 atomic_assert_int_eq(sreads, expected_sreads);
2148 atomic_assert_int_eq(ssends, expected_ssends);
2149 atomic_assert_int_eq(creads, expected_creads);
2150
2151 teardown_udp_test(state);
2152
2153 return 0;
2154 }
2155
2156 void
2157 udp_recv_one(void **arg ISC_ATTR_UNUSED) {
2158 udp_start_listening(ISC_NM_LISTEN_ONE, udp_listen_read_cb);
2159
2160 udp_enqueue_connect(NULL);
2161 }
2162
2163 int
2164 proxyudp_recv_one_setup(void **state) {
2165 udp_use_PROXY = true;
2166 return udp_recv_one_setup(state);
2167 }
2168
2169 int
2170 proxyudp_recv_one_teardown(void **state) {
2171 int ret = udp_recv_one_teardown(state);
2172 udp_use_PROXY = false;
2173 return ret;
2174 }
2175
2176 int
2177 udp_recv_two_setup(void **state) {
2178 setup_udp_test(state);
2179
2180 connect_readcb = udp__connect_read_cb;
2181
2182 expected_cconnects = 2;
2183 cconnects_shutdown = false;
2184
2185 expected_csends = 2;
2186 csends_shutdown = false;
2187
2188 expected_sreads = 2;
2189 sreads_shutdown = false;
2190
2191 expected_ssends = 2;
2192 ssends_shutdown = false;
2193
2194 expected_creads = 2;
2195 creads_shutdown = true;
2196
2197 return 0;
2198 }
2199
2200 int
2201 udp_recv_two_teardown(void **state) {
2202 atomic_assert_int_eq(cconnects, expected_cconnects);
2203 atomic_assert_int_eq(csends, expected_csends);
2204 atomic_assert_int_eq(sreads, expected_sreads);
2205 atomic_assert_int_eq(ssends, expected_ssends);
2206 atomic_assert_int_eq(creads, expected_creads);
2207
2208 teardown_udp_test(state);
2209 return 0;
2210 }
2211
2212 void
2213 udp_recv_two(void **arg ISC_ATTR_UNUSED) {
2214 udp_start_listening(ISC_NM_LISTEN_ONE, udp_listen_read_cb);
2215
2216 udp_enqueue_connect(NULL);
2217 udp_enqueue_connect(NULL);
2218 }
2219
2220 int
2221 proxyudp_recv_two_setup(void **state) {
2222 udp_use_PROXY = true;
2223 return udp_recv_two_setup(state);
2224 }
2225
2226 int
2227 proxyudp_recv_two_teardown(void **state) {
2228 int ret = udp_recv_two_teardown(state);
2229 udp_use_PROXY = false;
2230 return ret;
2231 }
2232
2233 int
2234 udp_recv_send_setup(void **state) {
2235 setup_udp_test(state);
2236
2237 /* Allow some leeway (+1) as datagram service is unreliable */
2238 expected_cconnects = (workers + 1) * NSENDS;
2239 cconnects_shutdown = false;
2240
2241 expected_creads = workers * NSENDS;
2242 do_send = true;
2243
2244 return 0;
2245 }
2246
2247 int
2248 udp_recv_send_teardown(void **state) {
2249 atomic_assert_int_ge(cconnects, expected_creads);
2250 atomic_assert_int_ge(csends, expected_creads);
2251 atomic_assert_int_ge(sreads, expected_creads);
2252 atomic_assert_int_ge(ssends, expected_creads);
2253 atomic_assert_int_ge(creads, expected_creads);
2254
2255 teardown_udp_test(state);
2256 return 0;
2257 }
2258
2259 void
2260 udp_recv_send(void **arg ISC_ATTR_UNUSED) {
2261 udp_start_listening(ISC_NM_LISTEN_ALL, udp_listen_read_cb);
2262
2263 for (size_t i = 0; i < workers; i++) {
2264 isc_async_run(isc_loop_get(loopmgr, i), udp_enqueue_connect,
2265 NULL);
2266 }
2267 }
2268
2269 int
2270 proxyudp_recv_send_setup(void **state) {
2271 udp_use_PROXY = true;
2272 return udp_recv_send_setup(state);
2273 }
2274
2275 int
2276 proxyudp_recv_send_teardown(void **state) {
2277 int ret = udp_recv_send_teardown(state);
2278 udp_use_PROXY = false;
2279 return ret;
2280 }
2281
2282 static void
2283 udp_double_read_send_cb(isc_nmhandle_t *handle, isc_result_t eresult,
2284 void *cbarg) {
2285 assert_non_null(handle);
2286
2287 F();
2288
2289 isc_refcount_decrement(&active_ssends);
2290
2291 switch (eresult) {
2292 case ISC_R_SUCCESS:
2293 if (have_expected_ssends(atomic_fetch_add(&ssends, 1) + 1)) {
2294 do_ssends_shutdown(loopmgr);
2295 } else {
2296 isc_nmhandle_t *sendhandle = NULL;
2297 isc_nmhandle_attach(handle, &sendhandle);
2298 isc_nmhandle_setwritetimeout(sendhandle, T_IDLE);
2299 isc_refcount_increment0(&active_ssends);
2300 isc_nm_send(sendhandle, &send_msg,
2301 udp_double_read_send_cb, cbarg);
2302 break;
2303 }
2304 break;
2305 case ISC_R_CANCELED:
2306 break;
2307 default:
2308 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
2309 isc_result_totext(eresult), cbarg);
2310 assert_int_equal(eresult, ISC_R_SUCCESS);
2311 }
2312
2313 isc_nmhandle_detach(&handle);
2314 }
2315
2316 static void
2317 udp_double_read_listen_cb(isc_nmhandle_t *handle, isc_result_t eresult,
2318 isc_region_t *region, void *cbarg) {
2319 uint64_t magic = 0;
2320
2321 assert_non_null(handle);
2322
2323 F();
2324
2325 switch (eresult) {
2326 case ISC_R_EOF:
2327 case ISC_R_SHUTTINGDOWN:
2328 case ISC_R_CANCELED:
2329 break;
2330 case ISC_R_SUCCESS:
2331 memmove(&magic, region->base, sizeof(magic));
2332 assert_true(magic == send_magic);
2333
2334 assert_true(region->length >= sizeof(magic));
2335
2336 memmove(&magic, region->base, sizeof(magic));
2337 assert_true(magic == send_magic);
2338
2339 isc_nmhandle_t *sendhandle = NULL;
2340 isc_nmhandle_attach(handle, &sendhandle);
2341 isc_nmhandle_setwritetimeout(sendhandle, T_IDLE);
2342 isc_refcount_increment0(&active_ssends);
2343 isc_nm_send(sendhandle, &send_msg, udp_double_read_send_cb,
2344 cbarg);
2345 return;
2346 default:
2347 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
2348 isc_result_totext(eresult), cbarg);
2349 assert_int_equal(eresult, ISC_R_SUCCESS);
2350 }
2351
2352 isc_refcount_decrement(&active_sreads);
2353
2354 isc_nmhandle_detach(&handle);
2355 }
2356
2357 static void
2358 udp_double_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
2359 isc_region_t *region, void *cbarg) {
2360 uint64_t magic = 0;
2361 bool detach = false;
2362
2363 assert_non_null(handle);
2364
2365 F();
2366
2367 switch (eresult) {
2368 case ISC_R_TIMEDOUT:
2369 /*
2370 * We are operating on the localhost, UDP cannot get lost, but
2371 * it could be delayed, so we read again until we get the
2372 * answer.
2373 */
2374 detach = false;
2375 break;
2376 case ISC_R_SUCCESS:
2377 assert_true(region->length >= sizeof(magic));
2378
2379 memmove(&magic, region->base, sizeof(magic));
2380
2381 assert_true(magic == send_magic);
2382
2383 if (have_expected_creads(atomic_fetch_add(&creads, 1) + 1)) {
2384 do_creads_shutdown(loopmgr);
2385 detach = true;
2386 }
2387
2388 if (magic == send_magic && allow_send_back) {
2389 connect_send(handle);
2390 return;
2391 }
2392
2393 break;
2394 case ISC_R_EOF:
2395 case ISC_R_SHUTTINGDOWN:
2396 case ISC_R_CANCELED:
2397 case ISC_R_CONNECTIONRESET:
2398 detach = true;
2399 break;
2400 default:
2401 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
2402 isc_result_totext(eresult), cbarg);
2403 assert_int_equal(eresult, ISC_R_SUCCESS);
2404 }
2405
2406 if (detach) {
2407 isc_refcount_decrement(&active_creads);
2408 isc_nmhandle_detach(&handle);
2409 } else {
2410 isc_nm_read(handle, connect_readcb, cbarg);
2411 }
2412 }
2413
2414 int
2415 udp_double_read_setup(void **state) {
2416 setup_udp_test(state);
2417
2418 expected_cconnects = 1;
2419 cconnects_shutdown = false;
2420
2421 expected_csends = 1;
2422 csends_shutdown = false;
2423
2424 expected_sreads = 1;
2425 sreads_shutdown = false;
2426
2427 expected_ssends = 2;
2428 ssends_shutdown = false;
2429 expected_creads = 2;
2430 creads_shutdown = true;
2431
2432 connect_readcb = udp_double_read_cb;
2433
2434 return 0;
2435 }
2436
2437 int
2438 udp_double_read_teardown(void **state) {
2439 atomic_assert_int_eq(creads, expected_creads);
2440
2441 teardown_udp_test(state);
2442
2443 return 0;
2444 }
2445
2446 void
2447 udp_double_read(void **arg ISC_ATTR_UNUSED) {
2448 udp_start_listening(ISC_NM_LISTEN_ALL, udp_double_read_listen_cb);
2449
2450 udp_enqueue_connect(NULL);
2451 }
2452
2453 int
2454 proxyudp_double_read_setup(void **state) {
2455 udp_use_PROXY = true;
2456 return udp_double_read_setup(state);
2457 }
2458
2459 int
2460 proxyudp_double_read_teardown(void **state) {
2461 int ret = udp_double_read_teardown(state);
2462 udp_use_PROXY = false;
2463 return ret;
2464 }
2465