Home | History | Annotate | Line # | Download | only in isc
      1 /*	$NetBSD: netmgr_common.c,v 1.4 2025/07/17 19:01:47 christos Exp $	*/
      2 
      3 /*
      4  * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
      5  *
      6  * SPDX-License-Identifier: MPL-2.0
      7  *
      8  * This Source Code Form is subject to the terms of the Mozilla Public
      9  * License, v. 2.0. If a copy of the MPL was not distributed with this
     10  * file, you can obtain one at https://mozilla.org/MPL/2.0/.
     11  *
     12  * See the COPYRIGHT file distributed with this work for additional
     13  * information regarding copyright ownership.
     14  */
     15 
     16 #include <inttypes.h>
     17 #include <sched.h> /* IWYU pragma: keep */
     18 #include <setjmp.h>
     19 #include <signal.h>
     20 #include <stdarg.h>
     21 #include <stdlib.h>
     22 #include <unistd.h>
     23 
     24 /*
     25  * As a workaround, include an OpenSSL header file before including cmocka.h,
     26  * because OpenSSL 3.1.0 uses __attribute__(malloc), conflicting with a
     27  * redefined malloc in cmocka.h.
     28  */
     29 #include <openssl/err.h>
     30 
     31 #define UNIT_TESTING
     32 #include <cmocka.h>
     33 
     34 #include <isc/async.h>
     35 #include <isc/nonce.h>
     36 #include <isc/os.h>
     37 #include <isc/quota.h>
     38 #include <isc/refcount.h>
     39 #include <isc/sockaddr.h>
     40 #include <isc/thread.h>
     41 #include <isc/util.h>
     42 #include <isc/uv.h>
     43 #define KEEP_BEFORE
     44 
     45 #include "netmgr_common.h"
     46 
     47 #include <tests/isc.h>
     48 
     49 isc_nm_t *listen_nm = NULL;
     50 isc_nm_t *connect_nm = NULL;
     51 
     52 isc_sockaddr_t tcp_listen_addr;
     53 isc_sockaddr_t tcp_connect_addr;
     54 isc_tlsctx_t *tcp_listen_tlsctx = NULL;
     55 isc_tlsctx_t *tcp_connect_tlsctx = NULL;
     56 isc_tlsctx_client_session_cache_t *tcp_tlsctx_client_sess_cache = NULL;
     57 
     58 isc_sockaddr_t udp_listen_addr;
     59 isc_sockaddr_t udp_connect_addr;
     60 
     61 uint64_t send_magic = 0;
     62 
     63 isc_region_t send_msg = { .base = (unsigned char *)&send_magic,
     64 			  .length = sizeof(send_magic) };
     65 
     66 atomic_bool do_send = false;
     67 
     68 atomic_int_fast64_t nsends = 0;
     69 int_fast64_t esends = 0; /* expected sends */
     70 
     71 atomic_int_fast64_t ssends = 0;
     72 atomic_int_fast64_t sreads = 0;
     73 atomic_int_fast64_t saccepts = 0;
     74 
     75 atomic_int_fast64_t cconnects = 0;
     76 atomic_int_fast64_t csends = 0;
     77 atomic_int_fast64_t creads = 0;
     78 atomic_int_fast64_t ctimeouts = 0;
     79 
     80 int expected_ssends;
     81 int expected_sreads;
     82 int expected_csends;
     83 int expected_cconnects;
     84 int expected_creads;
     85 int expected_saccepts;
     86 int expected_ctimeouts;
     87 
     88 bool ssends_shutdown;
     89 bool sreads_shutdown;
     90 bool saccepts_shutdown;
     91 bool csends_shutdown;
     92 bool cconnects_shutdown;
     93 bool creads_shutdown;
     94 bool ctimeouts_shutdown;
     95 
     96 isc_refcount_t active_cconnects = 0;
     97 isc_refcount_t active_csends = 0;
     98 isc_refcount_t active_creads = 0;
     99 isc_refcount_t active_ssends = 0;
    100 isc_refcount_t active_sreads = 0;
    101 
    102 isc_nmsocket_t *listen_sock = NULL;
    103 
    104 isc_quota_t listener_quota;
    105 atomic_bool check_listener_quota = false;
    106 
    107 bool allow_send_back = false;
    108 bool noanswer = false;
    109 bool stream_use_TLS = false;
    110 bool stream_use_PROXY = false;
    111 bool stream_PROXY_over_TLS = false;
    112 bool stream = false;
    113 in_port_t stream_port = 0;
    114 
    115 bool udp_use_PROXY = false;
    116 
    117 isc_nm_recv_cb_t connect_readcb = NULL;
    118 
    119 isc_nm_proxyheader_info_t proxy_info_data;
    120 isc_nm_proxyheader_info_t *proxy_info = NULL;
    121 isc_sockaddr_t proxy_src;
    122 isc_sockaddr_t proxy_dst;
    123 
    124 int
    125 setup_netmgr_test(void **state) {
    126 	struct in_addr in;
    127 	tcp_connect_addr = (isc_sockaddr_t){ .length = 0 };
    128 	isc_sockaddr_fromin6(&tcp_connect_addr, &in6addr_loopback, 0);
    129 
    130 	tcp_listen_addr = (isc_sockaddr_t){ .length = 0 };
    131 	isc_sockaddr_fromin6(&tcp_listen_addr, &in6addr_loopback, stream_port);
    132 
    133 	RUNTIME_CHECK(inet_pton(AF_INET, "1.2.3.4", &in) == 1);
    134 	isc_sockaddr_fromin(&proxy_src, &in, 1234);
    135 	RUNTIME_CHECK(inet_pton(AF_INET, "4.3.2.1", &in) == 1);
    136 	isc_sockaddr_fromin(&proxy_dst, &in, 4321);
    137 	isc_nm_proxyheader_info_init(&proxy_info_data, &proxy_src, &proxy_dst,
    138 				     NULL);
    139 
    140 	esends = NSENDS * workers;
    141 
    142 	atomic_store(&nsends, esends);
    143 
    144 	atomic_store(&saccepts, 0);
    145 	atomic_store(&sreads, 0);
    146 	atomic_store(&ssends, 0);
    147 
    148 	atomic_store(&cconnects, 0);
    149 	atomic_store(&csends, 0);
    150 	atomic_store(&creads, 0);
    151 	atomic_store(&ctimeouts, 0);
    152 	allow_send_back = false;
    153 
    154 	expected_cconnects = -1;
    155 	expected_csends = -1;
    156 	expected_creads = -1;
    157 	expected_sreads = -1;
    158 	expected_ssends = -1;
    159 	expected_saccepts = -1;
    160 	expected_ctimeouts = -1;
    161 
    162 	ssends_shutdown = true;
    163 	sreads_shutdown = true;
    164 	saccepts_shutdown = true;
    165 	csends_shutdown = true;
    166 	cconnects_shutdown = true;
    167 	creads_shutdown = true;
    168 	ctimeouts_shutdown = true;
    169 
    170 	do_send = false;
    171 
    172 	isc_refcount_init(&active_cconnects, 0);
    173 	isc_refcount_init(&active_csends, 0);
    174 	isc_refcount_init(&active_creads, 0);
    175 	isc_refcount_init(&active_ssends, 0);
    176 	isc_refcount_init(&active_sreads, 0);
    177 
    178 	isc_nonce_buf(&send_magic, sizeof(send_magic));
    179 
    180 	setup_loopmgr(state);
    181 	isc_netmgr_create(mctx, loopmgr, &listen_nm);
    182 	assert_non_null(listen_nm);
    183 	isc_nm_settimeouts(listen_nm, T_INIT, T_IDLE, T_KEEPALIVE,
    184 			   T_ADVERTISED);
    185 
    186 	isc_netmgr_create(mctx, loopmgr, &connect_nm);
    187 	assert_non_null(connect_nm);
    188 	isc_nm_settimeouts(connect_nm, T_INIT, T_IDLE, T_KEEPALIVE,
    189 			   T_ADVERTISED);
    190 
    191 	isc_quota_init(&listener_quota, 0);
    192 	atomic_store(&check_listener_quota, false);
    193 
    194 	connect_readcb = connect_read_cb;
    195 	noanswer = false;
    196 
    197 	if (isc_tlsctx_createserver(NULL, NULL, &tcp_listen_tlsctx) !=
    198 	    ISC_R_SUCCESS)
    199 	{
    200 		return -1;
    201 	}
    202 	if (isc_tlsctx_createclient(&tcp_connect_tlsctx) != ISC_R_SUCCESS) {
    203 		return -1;
    204 	}
    205 
    206 	isc_tlsctx_enable_dot_client_alpn(tcp_connect_tlsctx);
    207 
    208 	isc_tlsctx_client_session_cache_create(
    209 		mctx, tcp_connect_tlsctx,
    210 		ISC_TLSCTX_CLIENT_SESSION_CACHE_DEFAULT_SIZE,
    211 		&tcp_tlsctx_client_sess_cache);
    212 
    213 	return 0;
    214 }
    215 
    216 int
    217 teardown_netmgr_test(void **state ISC_ATTR_UNUSED) {
    218 	UNUSED(state);
    219 
    220 	isc_tlsctx_client_session_cache_detach(&tcp_tlsctx_client_sess_cache);
    221 
    222 	isc_tlsctx_free(&tcp_connect_tlsctx);
    223 	isc_tlsctx_free(&tcp_listen_tlsctx);
    224 
    225 	isc_netmgr_destroy(&connect_nm);
    226 	assert_null(connect_nm);
    227 
    228 	isc_netmgr_destroy(&listen_nm);
    229 	assert_null(listen_nm);
    230 
    231 	teardown_loopmgr(state);
    232 
    233 	isc_refcount_destroy(&active_cconnects);
    234 	isc_refcount_destroy(&active_csends);
    235 	isc_refcount_destroy(&active_creads);
    236 	isc_refcount_destroy(&active_ssends);
    237 	isc_refcount_destroy(&active_sreads);
    238 
    239 	proxy_info = NULL;
    240 
    241 	return 0;
    242 }
    243 
    244 void
    245 stop_listening(void *arg ISC_ATTR_UNUSED) {
    246 	isc_nm_stoplistening(listen_sock);
    247 	isc_nmsocket_close(&listen_sock);
    248 	assert_null(listen_sock);
    249 }
    250 
    251 /* Callbacks */
    252 
    253 void
    254 noop_recv_cb(isc_nmhandle_t *handle ISC_ATTR_UNUSED,
    255 	     isc_result_t eresult ISC_ATTR_UNUSED,
    256 	     isc_region_t *region ISC_ATTR_UNUSED,
    257 	     void *cbarg ISC_ATTR_UNUSED) {
    258 	F();
    259 }
    260 
    261 isc_result_t
    262 noop_accept_cb(isc_nmhandle_t *handle ISC_ATTR_UNUSED, isc_result_t eresult,
    263 	       void *cbarg ISC_ATTR_UNUSED) {
    264 	F();
    265 
    266 	if (eresult == ISC_R_SUCCESS) {
    267 		(void)atomic_fetch_add(&saccepts, 1);
    268 	}
    269 
    270 	return ISC_R_SUCCESS;
    271 }
    272 
    273 void
    274 connect_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg);
    275 
    276 void
    277 connect_send(isc_nmhandle_t *handle);
    278 
    279 void
    280 connect_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
    281 	isc_nmhandle_t *sendhandle = handle;
    282 
    283 	assert_non_null(sendhandle);
    284 
    285 	UNUSED(cbarg);
    286 
    287 	F();
    288 
    289 	switch (eresult) {
    290 	case ISC_R_EOF:
    291 	case ISC_R_SHUTTINGDOWN:
    292 	case ISC_R_CANCELED:
    293 	case ISC_R_CONNECTIONRESET:
    294 		/* Abort */
    295 		if (!stream) {
    296 			isc_nm_cancelread(handle);
    297 		}
    298 		break;
    299 	case ISC_R_SUCCESS:
    300 		if (have_expected_csends(atomic_fetch_add(&csends, 1) + 1)) {
    301 			do_csends_shutdown(loopmgr);
    302 		}
    303 		break;
    304 	default:
    305 		fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
    306 			isc_result_totext(eresult), cbarg);
    307 		assert_int_equal(eresult, ISC_R_SUCCESS);
    308 	}
    309 
    310 	isc_refcount_decrement(&active_csends);
    311 	isc_nmhandle_detach(&sendhandle);
    312 }
    313 
    314 void
    315 connect_send(isc_nmhandle_t *handle) {
    316 	isc_nmhandle_t *sendhandle = NULL;
    317 	isc_refcount_increment0(&active_csends);
    318 	isc_nmhandle_attach(handle, &sendhandle);
    319 	isc_nmhandle_setwritetimeout(handle, T_IDLE);
    320 	isc_nm_send(sendhandle, &send_msg, connect_send_cb, NULL);
    321 }
    322 
    323 void
    324 connect_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
    325 		isc_region_t *region, void *cbarg) {
    326 	uint64_t magic = 0;
    327 
    328 	UNUSED(cbarg);
    329 
    330 	assert_non_null(handle);
    331 
    332 	F();
    333 
    334 	switch (eresult) {
    335 	case ISC_R_SUCCESS:
    336 		assert_true(region->length >= sizeof(magic));
    337 
    338 		memmove(&magic, region->base, sizeof(magic));
    339 
    340 		assert_true(magic == send_magic);
    341 
    342 		if (have_expected_creads(atomic_fetch_add(&creads, 1) + 1)) {
    343 			do_creads_shutdown(loopmgr);
    344 		}
    345 
    346 		if (magic == send_magic && allow_send_back) {
    347 			connect_send(handle);
    348 			return;
    349 		}
    350 
    351 		/* This will initiate one more read callback */
    352 		if (stream) {
    353 			isc_nmhandle_close(handle);
    354 		}
    355 		break;
    356 	case ISC_R_TIMEDOUT:
    357 	case ISC_R_EOF:
    358 	case ISC_R_SHUTTINGDOWN:
    359 	case ISC_R_CANCELED:
    360 	case ISC_R_CONNECTIONRESET:
    361 	case ISC_R_CONNREFUSED:
    362 		break;
    363 	default:
    364 		fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
    365 			isc_result_totext(eresult), cbarg);
    366 		assert_int_equal(eresult, ISC_R_SUCCESS);
    367 	}
    368 
    369 	isc_refcount_decrement(&active_creads);
    370 	isc_nmhandle_detach(&handle);
    371 }
    372 
    373 void
    374 connect_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
    375 	isc_nmhandle_t *readhandle = NULL;
    376 
    377 	F();
    378 
    379 	isc_refcount_decrement(&active_cconnects);
    380 
    381 	if (eresult != ISC_R_SUCCESS || connect_readcb == NULL) {
    382 		return;
    383 	}
    384 
    385 	if (stream_use_PROXY) {
    386 		assert_true(isc_nm_is_proxy_handle(handle));
    387 	}
    388 
    389 	/* We are finished, initiate the shutdown */
    390 	if (have_expected_cconnects(atomic_fetch_add(&cconnects, 1) + 1)) {
    391 		do_cconnects_shutdown(loopmgr);
    392 	} else if (do_send) {
    393 		isc_async_current(stream_recv_send_connect,
    394 				  cbarg == NULL
    395 					  ? get_stream_connect_function()
    396 					  : (stream_connect_function)cbarg);
    397 	}
    398 
    399 	isc_refcount_increment0(&active_creads);
    400 	isc_nmhandle_attach(handle, &readhandle);
    401 	isc_nm_read(handle, connect_readcb, NULL);
    402 
    403 	connect_send(handle);
    404 }
    405 
    406 void
    407 listen_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
    408 	isc_nmhandle_t *sendhandle = handle;
    409 
    410 	UNUSED(cbarg);
    411 	UNUSED(eresult);
    412 
    413 	assert_non_null(sendhandle);
    414 
    415 	F();
    416 
    417 	switch (eresult) {
    418 	case ISC_R_CANCELED:
    419 	case ISC_R_CONNECTIONRESET:
    420 	case ISC_R_EOF:
    421 	case ISC_R_SHUTTINGDOWN:
    422 		break;
    423 	case ISC_R_SUCCESS:
    424 		if (have_expected_ssends(atomic_fetch_add(&ssends, 1) + 1)) {
    425 			do_ssends_shutdown(loopmgr);
    426 		}
    427 		break;
    428 	default:
    429 		fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
    430 			isc_result_totext(eresult), cbarg);
    431 		assert_int_equal(eresult, ISC_R_SUCCESS);
    432 	}
    433 
    434 	isc_refcount_decrement(&active_ssends);
    435 	isc_nmhandle_detach(&sendhandle);
    436 }
    437 
    438 void
    439 listen_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
    440 	       isc_region_t *region, void *cbarg) {
    441 	uint64_t magic = 0;
    442 
    443 	assert_non_null(handle);
    444 
    445 	F();
    446 
    447 	switch (eresult) {
    448 	case ISC_R_SUCCESS:
    449 		if (udp_use_PROXY || stream_use_PROXY) {
    450 			assert_true(isc_nm_is_proxy_handle(handle));
    451 			proxy_verify_endpoints(handle);
    452 		}
    453 
    454 		memmove(&magic, region->base, sizeof(magic));
    455 		assert_true(magic == send_magic);
    456 
    457 		if (have_expected_sreads(atomic_fetch_add(&sreads, 1) + 1)) {
    458 			do_sreads_shutdown(loopmgr);
    459 		}
    460 
    461 		assert_true(region->length >= sizeof(magic));
    462 
    463 		memmove(&magic, region->base, sizeof(magic));
    464 		assert_true(magic == send_magic);
    465 
    466 		if (!noanswer) {
    467 			/* Answer and continue to listen */
    468 			isc_nmhandle_t *sendhandle = NULL;
    469 			isc_nmhandle_attach(handle, &sendhandle);
    470 			isc_refcount_increment0(&active_ssends);
    471 			isc_nmhandle_setwritetimeout(sendhandle, T_IDLE);
    472 			isc_nm_send(sendhandle, &send_msg, listen_send_cb,
    473 				    cbarg);
    474 		}
    475 		/* Continue to listen */
    476 		return;
    477 	case ISC_R_CANCELED:
    478 	case ISC_R_CONNECTIONRESET:
    479 	case ISC_R_EOF:
    480 	case ISC_R_SHUTTINGDOWN:
    481 		break;
    482 	default:
    483 		fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
    484 			isc_result_totext(eresult), cbarg);
    485 		assert_int_equal(eresult, ISC_R_SUCCESS);
    486 	}
    487 
    488 	isc_refcount_decrement(&active_sreads);
    489 	isc_nmhandle_detach(&handle);
    490 }
    491 
    492 isc_result_t
    493 listen_accept_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
    494 	UNUSED(handle);
    495 	UNUSED(cbarg);
    496 
    497 	F();
    498 
    499 	if (eresult != ISC_R_SUCCESS) {
    500 		return eresult;
    501 	}
    502 
    503 	if (have_expected_saccepts(atomic_fetch_add(&saccepts, 1) + 1)) {
    504 		do_saccepts_shutdown(loopmgr);
    505 	}
    506 
    507 	isc_nmhandle_attach(handle, &(isc_nmhandle_t *){ NULL });
    508 	isc_refcount_increment0(&active_sreads);
    509 
    510 	return eresult;
    511 }
    512 
    513 isc_result_t
    514 stream_accept_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
    515 	isc_nmhandle_t *readhandle = NULL;
    516 
    517 	UNUSED(cbarg);
    518 
    519 	F();
    520 
    521 	if (eresult != ISC_R_SUCCESS) {
    522 		return eresult;
    523 	}
    524 
    525 	if (have_expected_saccepts(atomic_fetch_add(&saccepts, 1) + 1)) {
    526 		do_saccepts_shutdown(loopmgr);
    527 	}
    528 
    529 	if (stream_use_PROXY) {
    530 		assert_true(isc_nm_is_proxy_handle(handle));
    531 		proxy_verify_endpoints(handle);
    532 	}
    533 
    534 	isc_refcount_increment0(&active_sreads);
    535 
    536 	isc_nmhandle_attach(handle, &readhandle);
    537 	isc_nm_read(handle, listen_read_cb, readhandle);
    538 
    539 	return ISC_R_SUCCESS;
    540 }
    541 
    542 void
    543 stream_recv_send_connect(void *arg) {
    544 	connect_func connect = (connect_func)arg;
    545 	isc_sockaddr_t connect_addr;
    546 
    547 	connect_addr = (isc_sockaddr_t){ .length = 0 };
    548 	isc_sockaddr_fromin6(&connect_addr, &in6addr_loopback, 0);
    549 
    550 	isc_refcount_increment0(&active_cconnects);
    551 	connect(connect_nm);
    552 }
    553 
    554 /* Common stream protocols code */
    555 
    556 void
    557 timeout_retry_cb(isc_nmhandle_t *handle, isc_result_t eresult,
    558 		 isc_region_t *region, void *cbarg) {
    559 	UNUSED(region);
    560 	UNUSED(cbarg);
    561 
    562 	assert_non_null(handle);
    563 
    564 	F();
    565 
    566 	if (eresult == ISC_R_TIMEDOUT &&
    567 	    atomic_fetch_add(&ctimeouts, 1) + 1 < expected_ctimeouts)
    568 	{
    569 		isc_nmhandle_settimeout(handle, T_SOFT);
    570 		connect_send(handle);
    571 		return;
    572 	}
    573 
    574 	isc_refcount_decrement(&active_creads);
    575 	isc_nmhandle_detach(&handle);
    576 
    577 	isc_loopmgr_shutdown(loopmgr);
    578 }
    579 
    580 isc_quota_t *
    581 tcp_listener_init_quota(size_t nthreads) {
    582 	isc_quota_t *quotap = NULL;
    583 	if (atomic_load(&check_listener_quota)) {
    584 		unsigned int max_quota = ISC_MAX(nthreads / 2, 1);
    585 		isc_quota_max(&listener_quota, max_quota);
    586 		quotap = &listener_quota;
    587 	}
    588 	return quotap;
    589 }
    590 
    591 static void
    592 tcp_connect(isc_nm_t *nm) {
    593 	isc_nm_tcpconnect(nm, &tcp_connect_addr, &tcp_listen_addr,
    594 			  connect_connect_cb, NULL, T_CONNECT);
    595 }
    596 
    597 static void
    598 tls_connect(isc_nm_t *nm) {
    599 	isc_nm_tlsconnect(nm, &tcp_connect_addr, &tcp_listen_addr,
    600 			  connect_connect_cb, NULL, tcp_connect_tlsctx, NULL,
    601 			  tcp_tlsctx_client_sess_cache, T_CONNECT,
    602 			  stream_use_PROXY, NULL);
    603 }
    604 
    605 void
    606 set_proxyheader_info(isc_nm_proxyheader_info_t *pi) {
    607 	proxy_info = pi;
    608 }
    609 
    610 isc_nm_proxyheader_info_t *
    611 get_proxyheader_info(void) {
    612 	if (proxy_info != NULL) {
    613 		return proxy_info;
    614 	}
    615 
    616 	/*
    617 	 * There is 50% chance to get the info: so we can test LOCAL headers,
    618 	 * too.
    619 	 */
    620 	if (isc_random_uniform(2)) {
    621 		return &proxy_info_data;
    622 	}
    623 
    624 	return NULL;
    625 }
    626 
    627 static void
    628 proxystream_connect(isc_nm_t *nm) {
    629 	isc_tlsctx_t *tlsctx = stream_PROXY_over_TLS ? tcp_connect_tlsctx
    630 						     : NULL;
    631 	isc_tlsctx_client_session_cache_t *sess_cache =
    632 		stream_PROXY_over_TLS ? tcp_tlsctx_client_sess_cache : NULL;
    633 
    634 	isc_nm_proxystreamconnect(nm, &tcp_connect_addr, &tcp_listen_addr,
    635 				  connect_connect_cb, NULL, T_CONNECT, tlsctx,
    636 				  NULL, sess_cache, get_proxyheader_info());
    637 }
    638 
    639 stream_connect_function
    640 get_stream_connect_function(void) {
    641 	if (stream_use_TLS && !stream_PROXY_over_TLS) {
    642 		return tls_connect;
    643 	} else if (stream_use_PROXY) {
    644 		return proxystream_connect;
    645 	} else {
    646 		return tcp_connect;
    647 	}
    648 
    649 	UNREACHABLE();
    650 }
    651 
    652 isc_result_t
    653 stream_listen(isc_nm_accept_cb_t accept_cb, void *accept_cbarg, int backlog,
    654 	      isc_quota_t *quota, isc_nmsocket_t **sockp) {
    655 	isc_result_t result = ISC_R_SUCCESS;
    656 
    657 	if (stream_use_TLS && !stream_PROXY_over_TLS) {
    658 		result = isc_nm_listentls(
    659 			listen_nm, ISC_NM_LISTEN_ALL, &tcp_listen_addr,
    660 			accept_cb, accept_cbarg, backlog, quota,
    661 			tcp_listen_tlsctx, stream_use_PROXY, sockp);
    662 		return result;
    663 	} else if (stream_use_PROXY) {
    664 		isc_tlsctx_t *tlsctx = stream_PROXY_over_TLS ? tcp_listen_tlsctx
    665 							     : NULL;
    666 		result = isc_nm_listenproxystream(
    667 			listen_nm, ISC_NM_LISTEN_ALL, &tcp_listen_addr,
    668 			accept_cb, accept_cbarg, backlog, quota, tlsctx, sockp);
    669 		return result;
    670 	} else {
    671 		result = isc_nm_listentcp(listen_nm, ISC_NM_LISTEN_ALL,
    672 					  &tcp_listen_addr, accept_cb,
    673 					  accept_cbarg, backlog, quota, sockp);
    674 		return result;
    675 	}
    676 
    677 	UNREACHABLE();
    678 }
    679 
    680 void
    681 stream_connect(isc_nm_cb_t cb, void *cbarg, unsigned int timeout) {
    682 	isc_refcount_increment0(&active_cconnects);
    683 
    684 	if (stream_use_TLS && !stream_PROXY_over_TLS) {
    685 		isc_nm_tlsconnect(connect_nm, &tcp_connect_addr,
    686 				  &tcp_listen_addr, cb, cbarg,
    687 				  tcp_connect_tlsctx, NULL,
    688 				  tcp_tlsctx_client_sess_cache, timeout,
    689 				  stream_use_PROXY, NULL);
    690 		return;
    691 	} else if (stream_use_PROXY) {
    692 		isc_tlsctx_t *tlsctx = stream_PROXY_over_TLS
    693 					       ? tcp_connect_tlsctx
    694 					       : NULL;
    695 		isc_tlsctx_client_session_cache_t *sess_cache =
    696 			stream_PROXY_over_TLS ? tcp_tlsctx_client_sess_cache
    697 					      : NULL;
    698 		isc_nm_proxystreamconnect(connect_nm, &tcp_connect_addr,
    699 					  &tcp_listen_addr, cb, cbarg, timeout,
    700 					  tlsctx, NULL, sess_cache,
    701 					  get_proxyheader_info());
    702 		return;
    703 	} else {
    704 		isc_nm_tcpconnect(connect_nm, &tcp_connect_addr,
    705 				  &tcp_listen_addr, cb, cbarg, timeout);
    706 		return;
    707 	}
    708 	UNREACHABLE();
    709 }
    710 
    711 isc_nm_proxy_type_t
    712 get_proxy_type(void) {
    713 	if (!stream_use_PROXY) {
    714 		return ISC_NM_PROXY_NONE;
    715 	} else if (stream_PROXY_over_TLS) {
    716 		return ISC_NM_PROXY_ENCRYPTED;
    717 	}
    718 
    719 	return ISC_NM_PROXY_PLAIN;
    720 }
    721 
    722 void
    723 connect_success_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
    724 	UNUSED(handle);
    725 	UNUSED(cbarg);
    726 
    727 	F();
    728 
    729 	isc_refcount_decrement(&active_cconnects);
    730 	assert_int_equal(eresult, ISC_R_SUCCESS);
    731 
    732 	if (have_expected_cconnects(atomic_fetch_add(&cconnects, 1) + 1)) {
    733 		do_cconnects_shutdown(loopmgr);
    734 		return;
    735 	}
    736 }
    737 
    738 int
    739 stream_noop_setup(void **state ISC_ATTR_UNUSED) {
    740 	int r = setup_netmgr_test(state);
    741 	expected_cconnects = 1;
    742 	return r;
    743 }
    744 
    745 int
    746 proxystream_noop_setup(void **state) {
    747 	stream_use_PROXY = true;
    748 	return stream_noop_setup(state);
    749 }
    750 
    751 int
    752 proxystreamtls_noop_setup(void **state) {
    753 	stream_PROXY_over_TLS = true;
    754 	return proxystream_noop_setup(state);
    755 }
    756 
    757 void
    758 stream_noop(void **state ISC_ATTR_UNUSED) {
    759 	isc_result_t result = ISC_R_SUCCESS;
    760 
    761 	result = stream_listen(noop_accept_cb, NULL, 128, NULL, &listen_sock);
    762 	assert_int_equal(result, ISC_R_SUCCESS);
    763 	isc_loop_teardown(mainloop, stop_listening, listen_sock);
    764 
    765 	connect_readcb = NULL;
    766 	stream_connect(connect_success_cb, NULL, T_CONNECT);
    767 }
    768 
    769 int
    770 stream_noop_teardown(void **state ISC_ATTR_UNUSED) {
    771 	atomic_assert_int_eq(cconnects, 1);
    772 	atomic_assert_int_eq(csends, 0);
    773 	atomic_assert_int_eq(creads, 0);
    774 	atomic_assert_int_eq(sreads, 0);
    775 	atomic_assert_int_eq(ssends, 0);
    776 	return teardown_netmgr_test(state);
    777 }
    778 
    779 int
    780 proxystream_noop_teardown(void **state) {
    781 	int r = stream_noop_teardown(state);
    782 	stream_use_PROXY = false;
    783 
    784 	return r;
    785 }
    786 
    787 int
    788 proxystreamtls_noop_teardown(void **state) {
    789 	int r = proxystream_noop_teardown(state);
    790 	stream_PROXY_over_TLS = false;
    791 
    792 	return r;
    793 }
    794 
    795 static void
    796 noresponse_readcb(isc_nmhandle_t *handle, isc_result_t eresult,
    797 		  isc_region_t *region, void *cbarg) {
    798 	UNUSED(handle);
    799 	UNUSED(region);
    800 	UNUSED(cbarg);
    801 
    802 	F();
    803 
    804 	assert_true(eresult == ISC_R_CANCELED ||
    805 		    eresult == ISC_R_CONNECTIONRESET || eresult == ISC_R_EOF);
    806 
    807 	isc_refcount_decrement(&active_creads);
    808 	isc_nmhandle_detach(&handle);
    809 
    810 	isc_loopmgr_shutdown(loopmgr);
    811 }
    812 
    813 static void
    814 noresponse_sendcb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
    815 	UNUSED(cbarg);
    816 	UNUSED(eresult);
    817 
    818 	F();
    819 
    820 	assert_non_null(handle);
    821 	atomic_fetch_add(&csends, 1);
    822 	isc_nmhandle_detach(&handle);
    823 	isc_refcount_decrement(&active_csends);
    824 }
    825 
    826 static void
    827 noresponse_connectcb(isc_nmhandle_t *handle, isc_result_t eresult,
    828 		     void *cbarg) {
    829 	isc_nmhandle_t *readhandle = NULL;
    830 	isc_nmhandle_t *sendhandle = NULL;
    831 
    832 	F();
    833 
    834 	isc_refcount_decrement(&active_cconnects);
    835 
    836 	assert_int_equal(eresult, ISC_R_SUCCESS);
    837 
    838 	atomic_fetch_add(&cconnects, 1);
    839 
    840 	isc_refcount_increment0(&active_creads);
    841 	isc_nmhandle_attach(handle, &readhandle);
    842 	isc_nm_read(handle, noresponse_readcb, NULL);
    843 
    844 	isc_refcount_increment0(&active_csends);
    845 	isc_nmhandle_attach(handle, &sendhandle);
    846 	isc_nmhandle_setwritetimeout(handle, T_IDLE);
    847 
    848 	isc_nm_send(handle, (isc_region_t *)&send_msg, noresponse_sendcb,
    849 		    cbarg);
    850 }
    851 
    852 int
    853 stream_noresponse_setup(void **state ISC_ATTR_UNUSED) {
    854 	int r = setup_netmgr_test(state);
    855 	expected_cconnects = 1;
    856 	expected_saccepts = 1;
    857 	return r;
    858 }
    859 
    860 int
    861 proxystream_noresponse_setup(void **state) {
    862 	stream_use_PROXY = true;
    863 	return stream_noresponse_setup(state);
    864 }
    865 
    866 int
    867 proxystream_noresponse_teardown(void **state) {
    868 	int r = stream_noresponse_teardown(state);
    869 	stream_use_PROXY = false;
    870 	return r;
    871 }
    872 
    873 int
    874 proxystreamtls_noresponse_setup(void **state) {
    875 	stream_PROXY_over_TLS = true;
    876 	return proxystream_noresponse_setup(state);
    877 }
    878 
    879 int
    880 proxystreamtls_noresponse_teardown(void **state) {
    881 	int r = proxystream_noresponse_teardown(state);
    882 	stream_PROXY_over_TLS = false;
    883 	return r;
    884 }
    885 
    886 void
    887 stream_noresponse(void **state ISC_ATTR_UNUSED) {
    888 	isc_result_t result = ISC_R_SUCCESS;
    889 
    890 	result = stream_listen(noop_accept_cb, NULL, 128, NULL, &listen_sock);
    891 	assert_int_equal(result, ISC_R_SUCCESS);
    892 	isc_loop_teardown(mainloop, stop_listening, listen_sock);
    893 
    894 	stream_connect(noresponse_connectcb, NULL, T_CONNECT);
    895 }
    896 
    897 int
    898 stream_noresponse_teardown(void **state ISC_ATTR_UNUSED) {
    899 	X(cconnects);
    900 	X(csends);
    901 	X(creads);
    902 	X(sreads);
    903 	X(ssends);
    904 
    905 	atomic_assert_int_eq(cconnects, 1);
    906 	atomic_assert_int_eq(creads, 0);
    907 	atomic_assert_int_eq(sreads, 0);
    908 	atomic_assert_int_eq(ssends, 0);
    909 
    910 	return teardown_netmgr_test(state);
    911 }
    912 
    913 int
    914 stream_timeout_recovery_setup(void **state ISC_ATTR_UNUSED) {
    915 	int r = setup_netmgr_test(state);
    916 
    917 	expected_ctimeouts = 4;
    918 	ctimeouts_shutdown = false;
    919 
    920 	expected_sreads = 5;
    921 	sreads_shutdown = true;
    922 
    923 	return r;
    924 }
    925 
    926 typedef struct proxy_addrs {
    927 	isc_sockaddr_t src_addr;
    928 	isc_sockaddr_t dst_addr;
    929 } proxy_addrs_t;
    930 
    931 static void
    932 proxy2_handler_save_addrs_cb(const isc_result_t result,
    933 			     const isc_proxy2_command_t cmd, const int socktype,
    934 			     const isc_sockaddr_t *restrict src_addr,
    935 			     const isc_sockaddr_t *restrict dst_addr,
    936 			     const isc_region_t *restrict tlv_data,
    937 			     const isc_region_t *restrict extra, void *cbarg) {
    938 	proxy_addrs_t *addrs = (proxy_addrs_t *)cbarg;
    939 
    940 	UNUSED(cmd);
    941 	UNUSED(socktype);
    942 	UNUSED(tlv_data);
    943 	UNUSED(extra);
    944 
    945 	REQUIRE(result == ISC_R_SUCCESS);
    946 
    947 	if (src_addr != NULL) {
    948 		addrs->src_addr = *src_addr;
    949 	}
    950 
    951 	if (dst_addr != NULL) {
    952 		addrs->dst_addr = *dst_addr;
    953 	}
    954 }
    955 
    956 void
    957 proxy_verify_endpoints(isc_nmhandle_t *handle) {
    958 	isc_sockaddr_t local, peer;
    959 	peer = isc_nmhandle_peeraddr(handle);
    960 	local = isc_nmhandle_localaddr(handle);
    961 
    962 	if (isc_nm_is_proxy_unspec(handle)) {
    963 		isc_sockaddr_t real_local, real_peer;
    964 		real_peer = isc_nmhandle_real_peeraddr(handle);
    965 		real_local = isc_nmhandle_real_localaddr(handle);
    966 
    967 		assert_true(isc_sockaddr_equal(&peer, &real_peer));
    968 		assert_true(isc_sockaddr_equal(&local, &real_local));
    969 	} else if (proxy_info == NULL) {
    970 		assert_true(isc_sockaddr_equal(&peer, &proxy_src));
    971 		assert_true(isc_sockaddr_equal(&local, &proxy_dst));
    972 	} else if (proxy_info != NULL && !proxy_info->complete) {
    973 		assert_true(isc_sockaddr_equal(
    974 			&peer, &proxy_info->proxy_info.src_addr));
    975 		assert_true(isc_sockaddr_equal(
    976 			&local, &proxy_info->proxy_info.dst_addr));
    977 	} else if (proxy_info != NULL && proxy_info->complete) {
    978 		proxy_addrs_t addrs = { 0 };
    979 		RUNTIME_CHECK(isc_proxy2_header_handle_directly(
    980 				      &proxy_info->complete_header,
    981 				      proxy2_handler_save_addrs_cb,
    982 				      &addrs) == ISC_R_SUCCESS);
    983 
    984 		assert_true(isc_sockaddr_equal(&peer, &addrs.src_addr));
    985 		assert_true(isc_sockaddr_equal(&local, &addrs.dst_addr));
    986 	}
    987 }
    988 
    989 int
    990 proxystream_timeout_recovery_setup(void **state) {
    991 	stream_use_PROXY = true;
    992 	return stream_timeout_recovery_setup(state);
    993 }
    994 
    995 int
    996 proxystream_timeout_recovery_teardown(void **state) {
    997 	int r = stream_timeout_recovery_teardown(state);
    998 	stream_use_PROXY = false;
    999 	return r;
   1000 }
   1001 
   1002 int
   1003 proxystreamtls_timeout_recovery_setup(void **state) {
   1004 	stream_PROXY_over_TLS = true;
   1005 	return proxystream_timeout_recovery_setup(state);
   1006 }
   1007 
   1008 int
   1009 proxystreamtls_timeout_recovery_teardown(void **state) {
   1010 	int r = proxystream_timeout_recovery_teardown(state);
   1011 	stream_PROXY_over_TLS = false;
   1012 	return r;
   1013 }
   1014 
   1015 void
   1016 stream_timeout_recovery(void **state ISC_ATTR_UNUSED) {
   1017 	isc_result_t result = ISC_R_SUCCESS;
   1018 
   1019 	/*
   1020 	 * Accept connections but don't send responses, forcing client
   1021 	 * reads to time out.
   1022 	 */
   1023 	noanswer = true;
   1024 	result = stream_listen(stream_accept_cb, NULL, 128, NULL, &listen_sock);
   1025 	assert_int_equal(result, ISC_R_SUCCESS);
   1026 	isc_loop_teardown(mainloop, stop_listening, listen_sock);
   1027 
   1028 	/*
   1029 	 * Shorten all the client timeouts to 0.05 seconds.
   1030 	 */
   1031 	isc_nm_settimeouts(connect_nm, T_SOFT, T_SOFT, T_SOFT, T_SOFT);
   1032 	connect_readcb = timeout_retry_cb;
   1033 	stream_connect(connect_connect_cb, NULL, T_CONNECT);
   1034 }
   1035 
   1036 int
   1037 stream_timeout_recovery_teardown(void **state ISC_ATTR_UNUSED) {
   1038 	atomic_assert_int_eq(ctimeouts, expected_ctimeouts);
   1039 	return teardown_netmgr_test(state);
   1040 }
   1041 
   1042 int
   1043 stream_recv_one_setup(void **state ISC_ATTR_UNUSED) {
   1044 	int r = setup_netmgr_test(state);
   1045 
   1046 	expected_cconnects = 1;
   1047 	cconnects_shutdown = false;
   1048 
   1049 	expected_csends = 1;
   1050 	csends_shutdown = false;
   1051 
   1052 	expected_saccepts = 1;
   1053 	saccepts_shutdown = false;
   1054 
   1055 	expected_sreads = 1;
   1056 	sreads_shutdown = false;
   1057 
   1058 	expected_ssends = 1;
   1059 	ssends_shutdown = false;
   1060 
   1061 	expected_creads = 1;
   1062 	creads_shutdown = true;
   1063 
   1064 	return r;
   1065 }
   1066 
   1067 int
   1068 proxystream_recv_one_setup(void **state) {
   1069 	stream_use_PROXY = true;
   1070 	return stream_recv_one_setup(state);
   1071 }
   1072 
   1073 int
   1074 proxystream_recv_one_teardown(void **state) {
   1075 	int r = stream_recv_one_teardown(state);
   1076 	stream_use_PROXY = false;
   1077 	return r;
   1078 }
   1079 
   1080 int
   1081 proxystreamtls_recv_one_setup(void **state) {
   1082 	stream_PROXY_over_TLS = true;
   1083 	return proxystream_recv_one_setup(state);
   1084 }
   1085 
   1086 int
   1087 proxystreamtls_recv_one_teardown(void **state) {
   1088 	int r = proxystream_recv_one_teardown(state);
   1089 	stream_PROXY_over_TLS = false;
   1090 	return r;
   1091 }
   1092 
   1093 void
   1094 stream_recv_one(void **state ISC_ATTR_UNUSED) {
   1095 	isc_result_t result = ISC_R_SUCCESS;
   1096 	isc_quota_t *quotap = tcp_listener_init_quota(1);
   1097 
   1098 	atomic_store(&nsends, 1);
   1099 
   1100 	result = stream_listen(stream_accept_cb, NULL, 128, quotap,
   1101 			       &listen_sock);
   1102 	assert_int_equal(result, ISC_R_SUCCESS);
   1103 	isc_loop_teardown(mainloop, stop_listening, listen_sock);
   1104 
   1105 	stream_connect(connect_connect_cb, NULL, T_CONNECT);
   1106 }
   1107 
   1108 int
   1109 stream_recv_one_teardown(void **state ISC_ATTR_UNUSED) {
   1110 	atomic_assert_int_eq(cconnects, expected_cconnects);
   1111 	atomic_assert_int_eq(csends, expected_csends);
   1112 	atomic_assert_int_eq(saccepts, expected_saccepts);
   1113 	atomic_assert_int_eq(sreads, expected_sreads);
   1114 	atomic_assert_int_eq(ssends, expected_ssends);
   1115 	atomic_assert_int_eq(creads, expected_creads);
   1116 
   1117 	return teardown_netmgr_test(state);
   1118 }
   1119 
   1120 int
   1121 stream_recv_two_setup(void **state ISC_ATTR_UNUSED) {
   1122 	int r = setup_netmgr_test(state);
   1123 
   1124 	expected_cconnects = 2;
   1125 	cconnects_shutdown = false;
   1126 
   1127 	expected_csends = 2;
   1128 	csends_shutdown = false;
   1129 
   1130 	expected_saccepts = 2;
   1131 	saccepts_shutdown = false;
   1132 
   1133 	expected_sreads = 2;
   1134 	sreads_shutdown = false;
   1135 
   1136 	expected_ssends = 2;
   1137 	ssends_shutdown = false;
   1138 
   1139 	expected_creads = 2;
   1140 	creads_shutdown = true;
   1141 
   1142 	return r;
   1143 }
   1144 
   1145 int
   1146 proxystream_recv_two_setup(void **state) {
   1147 	stream_use_PROXY = true;
   1148 	return stream_recv_two_setup(state);
   1149 }
   1150 
   1151 int
   1152 proxystream_recv_two_teardown(void **state) {
   1153 	int r = stream_recv_two_teardown(state);
   1154 	stream_use_PROXY = false;
   1155 	return r;
   1156 }
   1157 
   1158 int
   1159 proxystreamtls_recv_two_setup(void **state) {
   1160 	stream_PROXY_over_TLS = true;
   1161 	return proxystream_recv_two_setup(state);
   1162 }
   1163 
   1164 int
   1165 proxystreamtls_recv_two_teardown(void **state) {
   1166 	int r = proxystream_recv_two_teardown(state);
   1167 	stream_PROXY_over_TLS = false;
   1168 	return r;
   1169 }
   1170 
   1171 void
   1172 stream_recv_two(void **state ISC_ATTR_UNUSED) {
   1173 	isc_result_t result = ISC_R_SUCCESS;
   1174 	isc_quota_t *quotap = tcp_listener_init_quota(1);
   1175 
   1176 	atomic_store(&nsends, 2);
   1177 
   1178 	result = stream_listen(stream_accept_cb, NULL, 128, quotap,
   1179 			       &listen_sock);
   1180 	assert_int_equal(result, ISC_R_SUCCESS);
   1181 	isc_loop_teardown(mainloop, stop_listening, listen_sock);
   1182 
   1183 	stream_connect(connect_connect_cb, NULL, T_CONNECT);
   1184 
   1185 	stream_connect(connect_connect_cb, NULL, T_CONNECT);
   1186 }
   1187 
   1188 int
   1189 stream_recv_two_teardown(void **state ISC_ATTR_UNUSED) {
   1190 	atomic_assert_int_eq(cconnects, expected_cconnects);
   1191 	atomic_assert_int_eq(csends, expected_csends);
   1192 	atomic_assert_int_eq(sreads, expected_saccepts);
   1193 	atomic_assert_int_eq(sreads, expected_sreads);
   1194 	atomic_assert_int_eq(ssends, expected_ssends);
   1195 	atomic_assert_int_eq(creads, expected_creads);
   1196 
   1197 	return teardown_netmgr_test(state);
   1198 }
   1199 
   1200 int
   1201 stream_recv_send_setup(void **state ISC_ATTR_UNUSED) {
   1202 	int r = setup_netmgr_test(state);
   1203 	expected_cconnects = workers;
   1204 	cconnects_shutdown = false;
   1205 	nsends = expected_creads = workers;
   1206 	do_send = true;
   1207 
   1208 	return r;
   1209 }
   1210 
   1211 int
   1212 proxystream_recv_send_setup(void **state) {
   1213 	stream_use_PROXY = true;
   1214 	return stream_recv_send_setup(state);
   1215 }
   1216 
   1217 int
   1218 proxystream_recv_send_teardown(void **state) {
   1219 	int r = stream_recv_send_teardown(state);
   1220 	stream_use_PROXY = false;
   1221 	return r;
   1222 }
   1223 
   1224 int
   1225 proxystreamtls_recv_send_setup(void **state) {
   1226 	stream_PROXY_over_TLS = true;
   1227 	return proxystream_recv_send_setup(state);
   1228 }
   1229 
   1230 int
   1231 proxystreamtls_recv_send_teardown(void **state) {
   1232 	int r = proxystream_recv_send_teardown(state);
   1233 	stream_PROXY_over_TLS = false;
   1234 	return r;
   1235 }
   1236 
   1237 void
   1238 stream_recv_send(void **state ISC_ATTR_UNUSED) {
   1239 	isc_result_t result = ISC_R_SUCCESS;
   1240 	isc_quota_t *quotap = tcp_listener_init_quota(workers);
   1241 
   1242 	result = stream_listen(stream_accept_cb, NULL, 128, quotap,
   1243 			       &listen_sock);
   1244 	assert_int_equal(result, ISC_R_SUCCESS);
   1245 	isc_loop_teardown(mainloop, stop_listening, listen_sock);
   1246 
   1247 	for (size_t i = 0; i < workers; i++) {
   1248 		isc_async_run(isc_loop_get(loopmgr, i),
   1249 			      stream_recv_send_connect,
   1250 			      get_stream_connect_function());
   1251 	}
   1252 }
   1253 
   1254 int
   1255 stream_recv_send_teardown(void **state ISC_ATTR_UNUSED) {
   1256 	X(cconnects);
   1257 	X(csends);
   1258 	X(creads);
   1259 	X(sreads);
   1260 	X(ssends);
   1261 
   1262 	CHECK_RANGE_FULL(csends);
   1263 	CHECK_RANGE_FULL(creads);
   1264 	CHECK_RANGE_FULL(sreads);
   1265 	CHECK_RANGE_FULL(ssends);
   1266 
   1267 	return teardown_netmgr_test(state);
   1268 }
   1269 
   1270 int
   1271 setup_udp_test(void **state) {
   1272 	setup_loopmgr(state);
   1273 	setup_netmgr(state);
   1274 
   1275 	udp_connect_addr = (isc_sockaddr_t){ .length = 0 };
   1276 	isc_sockaddr_fromin6(&udp_connect_addr, &in6addr_loopback, 0);
   1277 
   1278 	udp_listen_addr = (isc_sockaddr_t){ .length = 0 };
   1279 	isc_sockaddr_fromin6(&udp_listen_addr, &in6addr_loopback,
   1280 			     udp_use_PROXY ? PROXYUDP_TEST_PORT
   1281 					   : UDP_TEST_PORT);
   1282 
   1283 	atomic_store(&sreads, 0);
   1284 	atomic_store(&ssends, 0);
   1285 
   1286 	atomic_store(&cconnects, 0);
   1287 	atomic_store(&csends, 0);
   1288 	atomic_store(&creads, 0);
   1289 	atomic_store(&ctimeouts, 0);
   1290 
   1291 	isc_refcount_init(&active_cconnects, 0);
   1292 	isc_refcount_init(&active_csends, 0);
   1293 	isc_refcount_init(&active_creads, 0);
   1294 	isc_refcount_init(&active_ssends, 0);
   1295 	isc_refcount_init(&active_sreads, 0);
   1296 
   1297 	expected_cconnects = -1;
   1298 	expected_csends = -1;
   1299 	expected_creads = -1;
   1300 	expected_sreads = -1;
   1301 	expected_ssends = -1;
   1302 	expected_ctimeouts = -1;
   1303 
   1304 	ssends_shutdown = true;
   1305 	sreads_shutdown = true;
   1306 	csends_shutdown = true;
   1307 	cconnects_shutdown = true;
   1308 	creads_shutdown = true;
   1309 
   1310 	isc_nonce_buf(&send_magic, sizeof(send_magic));
   1311 
   1312 	connect_readcb = connect_read_cb;
   1313 
   1314 	return 0;
   1315 }
   1316 
   1317 int
   1318 teardown_udp_test(void **state) {
   1319 	UNUSED(state);
   1320 
   1321 	isc_refcount_destroy(&active_cconnects);
   1322 	isc_refcount_destroy(&active_csends);
   1323 	isc_refcount_destroy(&active_creads);
   1324 	isc_refcount_destroy(&active_ssends);
   1325 	isc_refcount_destroy(&active_sreads);
   1326 
   1327 	teardown_netmgr(state);
   1328 	teardown_loopmgr(state);
   1329 
   1330 	return 0;
   1331 }
   1332 
   1333 static void
   1334 udp_connect(isc_nm_cb_t cb, void *cbarg, unsigned int timeout) {
   1335 	if (udp_use_PROXY) {
   1336 		isc_nm_proxyudpconnect(netmgr, &udp_connect_addr,
   1337 				       &udp_listen_addr, cb, cbarg, timeout,
   1338 				       NULL);
   1339 	} else {
   1340 		isc_nm_udpconnect(netmgr, &udp_connect_addr, &udp_listen_addr,
   1341 				  cb, cbarg, timeout);
   1342 	}
   1343 }
   1344 
   1345 static void
   1346 udp_listen_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   1347 		   isc_region_t *region, void *cbarg) {
   1348 	if (eresult != ISC_R_SUCCESS) {
   1349 		isc_refcount_increment0(&active_sreads);
   1350 	}
   1351 	listen_read_cb(handle, eresult, region, cbarg);
   1352 }
   1353 
   1354 static void
   1355 udp_start_listening(uint32_t nworkers, isc_nm_recv_cb_t cb) {
   1356 	isc_result_t result;
   1357 
   1358 	if (udp_use_PROXY) {
   1359 		result = isc_nm_listenproxyudp(netmgr, nworkers,
   1360 					       &udp_listen_addr, cb, NULL,
   1361 					       &listen_sock);
   1362 	} else {
   1363 		result = isc_nm_listenudp(netmgr, nworkers, &udp_listen_addr,
   1364 					  cb, NULL, &listen_sock);
   1365 	}
   1366 
   1367 	assert_int_equal(result, ISC_R_SUCCESS);
   1368 
   1369 	isc_loop_teardown(mainloop, stop_listening, listen_sock);
   1370 }
   1371 
   1372 static void
   1373 udp__send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
   1374 	isc_nmhandle_t *sendhandle = handle;
   1375 
   1376 	assert_non_null(sendhandle);
   1377 
   1378 	F();
   1379 
   1380 	switch (eresult) {
   1381 	case ISC_R_SUCCESS:
   1382 		if (have_expected_csends(atomic_fetch_add(&csends, 1) + 1)) {
   1383 			if (csends_shutdown) {
   1384 				isc_nm_cancelread(handle);
   1385 				isc_loopmgr_shutdown(loopmgr);
   1386 			}
   1387 		}
   1388 		break;
   1389 	case ISC_R_SHUTTINGDOWN:
   1390 	case ISC_R_CANCELED:
   1391 		break;
   1392 	default:
   1393 		fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
   1394 			isc_result_totext(eresult), cbarg);
   1395 		assert_int_equal(eresult, ISC_R_SUCCESS);
   1396 	}
   1397 
   1398 	isc_nmhandle_detach(&sendhandle);
   1399 	isc_refcount_decrement(&active_csends);
   1400 }
   1401 
   1402 static void
   1403 udp__connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg);
   1404 
   1405 static void
   1406 udp_enqueue_connect(void *arg ISC_ATTR_UNUSED) {
   1407 	isc_sockaddr_t connect_addr;
   1408 
   1409 	connect_addr = (isc_sockaddr_t){ .length = 0 };
   1410 	isc_sockaddr_fromin6(&connect_addr, &in6addr_loopback, 0);
   1411 
   1412 	isc_refcount_increment0(&active_cconnects);
   1413 
   1414 	udp_connect(udp__connect_cb, NULL, T_CONNECT);
   1415 }
   1416 
   1417 static void
   1418 udp__connect_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   1419 		     isc_region_t *region, void *cbarg) {
   1420 	uint64_t magic = 0;
   1421 
   1422 	assert_non_null(handle);
   1423 
   1424 	F();
   1425 
   1426 	switch (eresult) {
   1427 	case ISC_R_TIMEDOUT:
   1428 		/*
   1429 		 * We are operating on the localhost, UDP cannot get lost, but
   1430 		 * it could be delayed, so we read again until we get the
   1431 		 * answer.
   1432 		 */
   1433 		isc_nm_read(handle, connect_readcb, cbarg);
   1434 		return;
   1435 	case ISC_R_SUCCESS:
   1436 		assert_true(region->length >= sizeof(magic));
   1437 
   1438 		memmove(&magic, region->base, sizeof(magic));
   1439 
   1440 		assert_true(magic == send_magic);
   1441 
   1442 		if (have_expected_creads(atomic_fetch_add(&creads, 1) + 1)) {
   1443 			do_creads_shutdown(loopmgr);
   1444 		}
   1445 
   1446 		if (magic == send_magic && allow_send_back) {
   1447 			connect_send(handle);
   1448 			return;
   1449 		}
   1450 
   1451 		break;
   1452 	default:
   1453 		fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
   1454 			isc_result_totext(eresult), cbarg);
   1455 		assert_int_equal(eresult, ISC_R_SUCCESS);
   1456 	}
   1457 
   1458 	isc_refcount_decrement(&active_creads);
   1459 
   1460 	isc_nmhandle_detach(&handle);
   1461 }
   1462 
   1463 static void
   1464 udp__connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
   1465 	isc_nmhandle_t *readhandle = NULL;
   1466 	isc_nmhandle_t *sendhandle = NULL;
   1467 
   1468 	F();
   1469 
   1470 	isc_refcount_decrement(&active_cconnects);
   1471 
   1472 	switch (eresult) {
   1473 	case ISC_R_SUCCESS:
   1474 		if (udp_use_PROXY) {
   1475 			assert_true(isc_nm_is_proxy_handle(handle));
   1476 		}
   1477 
   1478 		if (have_expected_cconnects(atomic_fetch_add(&cconnects, 1) +
   1479 					    1))
   1480 		{
   1481 			do_cconnects_shutdown(loopmgr);
   1482 		} else if (do_send) {
   1483 			isc_async_current(udp_enqueue_connect, cbarg);
   1484 		}
   1485 
   1486 		isc_refcount_increment0(&active_creads);
   1487 		isc_nmhandle_attach(handle, &readhandle);
   1488 		isc_nm_read(handle, connect_readcb, cbarg);
   1489 
   1490 		isc_refcount_increment0(&active_csends);
   1491 		isc_nmhandle_attach(handle, &sendhandle);
   1492 		isc_nmhandle_setwritetimeout(handle, T_IDLE);
   1493 
   1494 		isc_nm_send(sendhandle, (isc_region_t *)&send_msg, udp__send_cb,
   1495 			    cbarg);
   1496 
   1497 		break;
   1498 	case ISC_R_ADDRINUSE:
   1499 		/* Try again */
   1500 		udp_enqueue_connect(NULL);
   1501 		break;
   1502 	case ISC_R_SHUTTINGDOWN:
   1503 	case ISC_R_CANCELED:
   1504 		break;
   1505 	default:
   1506 		fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
   1507 			isc_result_totext(eresult), cbarg);
   1508 		assert_int_equal(eresult, ISC_R_SUCCESS);
   1509 	}
   1510 }
   1511 
   1512 int
   1513 udp_noop_setup(void **state) {
   1514 	setup_udp_test(state);
   1515 	expected_cconnects = 1;
   1516 	cconnects_shutdown = true;
   1517 	return 0;
   1518 }
   1519 
   1520 int
   1521 udp_noop_teardown(void **state) {
   1522 	atomic_assert_int_eq(cconnects, 1);
   1523 	teardown_udp_test(state);
   1524 	return 0;
   1525 }
   1526 
   1527 void
   1528 udp_noop(void **arg ISC_ATTR_UNUSED) {
   1529 	/* isc_result_t result = ISC_R_SUCCESS; */
   1530 
   1531 	/* result = isc_nm_listenudp(netmgr, ISC_NM_LISTEN_ALL,
   1532 	 * &udp_listen_addr, */
   1533 	/* 			  mock_recv_cb, NULL, &listen_sock); */
   1534 	/* assert_int_equal(result, ISC_R_SUCCESS); */
   1535 
   1536 	/* isc_nm_stoplistening(listen_sock); */
   1537 	/* isc_nmsocket_close(&listen_sock); */
   1538 	/* assert_null(listen_sock); */
   1539 
   1540 	isc_refcount_increment0(&active_cconnects);
   1541 	udp_connect(connect_success_cb, NULL, UDP_T_CONNECT);
   1542 }
   1543 
   1544 int
   1545 proxyudp_noop_setup(void **state) {
   1546 	udp_use_PROXY = true;
   1547 	return udp_noop_setup(state);
   1548 }
   1549 
   1550 int
   1551 proxyudp_noop_teardown(void **state) {
   1552 	int ret = udp_noop_teardown(state);
   1553 	udp_use_PROXY = false;
   1554 	return ret;
   1555 }
   1556 
   1557 static void
   1558 udp_noresponse_recv_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   1559 		       isc_region_t *region, void *cbarg) {
   1560 	UNUSED(handle);
   1561 	UNUSED(eresult);
   1562 	UNUSED(region);
   1563 	UNUSED(cbarg);
   1564 }
   1565 
   1566 static void
   1567 udp_noresponse_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   1568 		       isc_region_t *region, void *cbarg) {
   1569 	UNUSED(region);
   1570 	UNUSED(cbarg);
   1571 
   1572 	assert_int_equal(eresult, ISC_R_TIMEDOUT);
   1573 
   1574 	isc_refcount_decrement(&active_creads);
   1575 
   1576 	atomic_fetch_add(&creads, 1);
   1577 
   1578 	isc_nmhandle_detach(&handle);
   1579 
   1580 	isc_loopmgr_shutdown(loopmgr);
   1581 }
   1582 
   1583 static void
   1584 udp_noresponse_send_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   1585 		       void *cbarg) {
   1586 	UNUSED(cbarg);
   1587 
   1588 	assert_non_null(handle);
   1589 	assert_int_equal(eresult, ISC_R_SUCCESS);
   1590 	atomic_fetch_add(&csends, 1);
   1591 	isc_nmhandle_detach(&handle);
   1592 	isc_refcount_decrement(&active_csends);
   1593 }
   1594 
   1595 static void
   1596 udp_noresponse_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   1597 			  void *cbarg) {
   1598 	isc_nmhandle_t *readhandle = NULL;
   1599 	isc_nmhandle_t *sendhandle = NULL;
   1600 
   1601 	isc_refcount_decrement(&active_cconnects);
   1602 
   1603 	assert_int_equal(eresult, ISC_R_SUCCESS);
   1604 
   1605 	/* Read */
   1606 	isc_refcount_increment0(&active_creads);
   1607 	isc_nmhandle_attach(handle, &readhandle);
   1608 	isc_nm_read(handle, udp_noresponse_read_cb, cbarg);
   1609 
   1610 	/* Send */
   1611 	isc_refcount_increment0(&active_csends);
   1612 	isc_nmhandle_attach(handle, &sendhandle);
   1613 	isc_nmhandle_setwritetimeout(handle, T_IDLE);
   1614 
   1615 	isc_nm_send(sendhandle, (isc_region_t *)&send_msg,
   1616 		    udp_noresponse_send_cb, cbarg);
   1617 
   1618 	atomic_fetch_add(&cconnects, 1);
   1619 }
   1620 
   1621 int
   1622 udp_noresponse_setup(void **state) {
   1623 	setup_udp_test(state);
   1624 	expected_csends = 1;
   1625 	return 0;
   1626 }
   1627 
   1628 int
   1629 udp_noresponse_teardown(void **state) {
   1630 	atomic_assert_int_eq(csends, expected_csends);
   1631 	teardown_udp_test(state);
   1632 	return 0;
   1633 }
   1634 
   1635 void
   1636 udp_noresponse(void **arg ISC_ATTR_UNUSED) {
   1637 	udp_start_listening(ISC_NM_LISTEN_ONE, udp_noresponse_recv_cb);
   1638 
   1639 	isc_refcount_increment0(&active_cconnects);
   1640 	udp_connect(udp_noresponse_connect_cb, listen_sock, UDP_T_SOFT);
   1641 }
   1642 
   1643 int
   1644 proxyudp_noresponse_setup(void **state) {
   1645 	udp_use_PROXY = true;
   1646 	return udp_noresponse_setup(state);
   1647 }
   1648 
   1649 int
   1650 proxyudp_noresponse_teardown(void **state) {
   1651 	int ret = udp_noresponse_teardown(state);
   1652 	udp_use_PROXY = false;
   1653 	return ret;
   1654 }
   1655 
   1656 static void
   1657 udp_timeout_recovery_ssend_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   1658 			      void *cbarg) {
   1659 	UNUSED(cbarg);
   1660 
   1661 	isc_refcount_decrement(&active_ssends);
   1662 	assert_non_null(handle);
   1663 	assert_int_equal(eresult, ISC_R_SUCCESS);
   1664 	atomic_fetch_add(&ssends, 1);
   1665 	isc_nmhandle_detach(&handle);
   1666 }
   1667 
   1668 static void
   1669 udp_timeout_recovery_recv_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   1670 			     isc_region_t *region, void *cbarg) {
   1671 	uint64_t magic = 0;
   1672 	isc_nmhandle_t *sendhandle = NULL;
   1673 	int _creads = atomic_fetch_add(&creads, 1) + 1;
   1674 
   1675 	assert_non_null(handle);
   1676 
   1677 	assert_int_equal(eresult, ISC_R_SUCCESS);
   1678 
   1679 	assert_true(region->length == sizeof(magic));
   1680 
   1681 	memmove(&magic, region->base, sizeof(magic));
   1682 	assert_true(magic == send_magic);
   1683 	assert_true(_creads < 6);
   1684 
   1685 	if (_creads == 5) {
   1686 		isc_nmhandle_attach(handle, &sendhandle);
   1687 		isc_refcount_increment0(&active_ssends);
   1688 		isc_nmhandle_setwritetimeout(sendhandle, T_IDLE);
   1689 		isc_nm_send(sendhandle, (isc_region_t *)&send_msg,
   1690 			    udp_timeout_recovery_ssend_cb, cbarg);
   1691 	}
   1692 }
   1693 
   1694 static void
   1695 udp_timeout_recovery_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   1696 			     isc_region_t *region, void *cbarg) {
   1697 	UNUSED(region);
   1698 	UNUSED(cbarg);
   1699 
   1700 	assert_non_null(handle);
   1701 
   1702 	F();
   1703 
   1704 	if (eresult == ISC_R_TIMEDOUT &&
   1705 	    atomic_fetch_add(&ctimeouts, 1) + 1 < expected_ctimeouts)
   1706 	{
   1707 		isc_nmhandle_settimeout(handle, T_SOFT);
   1708 		return;
   1709 	}
   1710 
   1711 	isc_refcount_decrement(&active_creads);
   1712 	isc_nmhandle_detach(&handle);
   1713 
   1714 	atomic_fetch_add(&creads, 1);
   1715 	isc_loopmgr_shutdown(loopmgr);
   1716 }
   1717 
   1718 static void
   1719 udp_timeout_recovery_send_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   1720 			     void *cbarg) {
   1721 	UNUSED(cbarg);
   1722 
   1723 	assert_non_null(handle);
   1724 	assert_int_equal(eresult, ISC_R_SUCCESS);
   1725 	atomic_fetch_add(&csends, 1);
   1726 
   1727 	isc_nmhandle_detach(&handle);
   1728 	isc_refcount_decrement(&active_csends);
   1729 }
   1730 
   1731 static void
   1732 udp_timeout_recovery_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   1733 				void *cbarg) {
   1734 	isc_nmhandle_t *readhandle = NULL;
   1735 	isc_nmhandle_t *sendhandle = NULL;
   1736 
   1737 	F();
   1738 
   1739 	isc_refcount_decrement(&active_cconnects);
   1740 
   1741 	assert_int_equal(eresult, ISC_R_SUCCESS);
   1742 
   1743 	/* Read */
   1744 	isc_refcount_increment0(&active_creads);
   1745 	isc_nmhandle_attach(handle, &readhandle);
   1746 	isc_nm_read(handle, udp_timeout_recovery_read_cb, cbarg);
   1747 
   1748 	/* Send */
   1749 	isc_refcount_increment0(&active_csends);
   1750 	isc_nmhandle_attach(handle, &sendhandle);
   1751 	isc_nmhandle_setwritetimeout(handle, T_IDLE);
   1752 	isc_nm_send(sendhandle, (isc_region_t *)&send_msg,
   1753 		    udp_timeout_recovery_send_cb, cbarg);
   1754 
   1755 	atomic_fetch_add(&cconnects, 1);
   1756 }
   1757 
   1758 int
   1759 udp_timeout_recovery_setup(void **state) {
   1760 	setup_udp_test(state);
   1761 	expected_cconnects = 1;
   1762 	expected_csends = 1;
   1763 	expected_creads = 1;
   1764 	expected_ctimeouts = 4;
   1765 	return 0;
   1766 }
   1767 
   1768 int
   1769 udp_timeout_recovery_teardown(void **state) {
   1770 	atomic_assert_int_eq(cconnects, expected_cconnects);
   1771 	atomic_assert_int_eq(csends, expected_csends);
   1772 	atomic_assert_int_eq(csends, expected_creads);
   1773 	atomic_assert_int_eq(ctimeouts, expected_ctimeouts);
   1774 	teardown_udp_test(state);
   1775 	return 0;
   1776 }
   1777 
   1778 void
   1779 udp_timeout_recovery(void **arg ISC_ATTR_UNUSED) {
   1780 	/*
   1781 	 * Listen using the noop callback so that client reads will time out.
   1782 	 */
   1783 	udp_start_listening(ISC_NM_LISTEN_ONE, udp_timeout_recovery_recv_cb);
   1784 
   1785 	/*
   1786 	 * Connect with client timeout set to 0.05 seconds, then sleep for at
   1787 	 * least a second for each 'tick'. timeout_retry_cb() will give up
   1788 	 * after five timeouts.
   1789 	 */
   1790 	isc_refcount_increment0(&active_cconnects);
   1791 	udp_connect(udp_timeout_recovery_connect_cb, listen_sock, UDP_T_SOFT);
   1792 }
   1793 
   1794 int
   1795 proxyudp_timeout_recovery_setup(void **state) {
   1796 	udp_use_PROXY = true;
   1797 	return udp_timeout_recovery_setup(state);
   1798 }
   1799 
   1800 int
   1801 proxyudp_timeout_recovery_teardown(void **state) {
   1802 	int ret = udp_timeout_recovery_teardown(state);
   1803 	udp_use_PROXY = false;
   1804 	return ret;
   1805 }
   1806 
   1807 static void
   1808 udp_shutdown_connect_async_cb(void *arg ISC_ATTR_UNUSED);
   1809 
   1810 static void
   1811 udp_shutdown_connect_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   1812 				void *cbarg) {
   1813 	UNUSED(handle);
   1814 	UNUSED(cbarg);
   1815 
   1816 	isc_refcount_decrement(&active_cconnects);
   1817 
   1818 	/*
   1819 	 * The first UDP connect is faster than asynchronous shutdown procedure,
   1820 	 * restart the UDP connect again and expect the failure only in the
   1821 	 * second loop.
   1822 	 */
   1823 	if (atomic_fetch_add(&cconnects, 1) == 0) {
   1824 		assert_int_equal(eresult, ISC_R_SUCCESS);
   1825 		isc_async_current(udp_shutdown_connect_async_cb, netmgr);
   1826 	} else {
   1827 		assert_int_equal(eresult, ISC_R_SHUTTINGDOWN);
   1828 	}
   1829 }
   1830 
   1831 static void
   1832 udp_shutdown_connect_async_cb(void *arg ISC_ATTR_UNUSED) {
   1833 	isc_refcount_increment0(&active_cconnects);
   1834 	udp_connect(udp_shutdown_connect_connect_cb, NULL, T_SOFT);
   1835 }
   1836 
   1837 int
   1838 udp_shutdown_connect_setup(void **state) {
   1839 	setup_udp_test(state);
   1840 	expected_cconnects = 2;
   1841 	return 0;
   1842 }
   1843 
   1844 int
   1845 udp_shutdown_connect_teardown(void **state) {
   1846 	atomic_assert_int_eq(cconnects, expected_cconnects);
   1847 	teardown_udp_test(state);
   1848 	return 0;
   1849 }
   1850 
   1851 void
   1852 udp_shutdown_connect(void **arg ISC_ATTR_UNUSED) {
   1853 	isc_loopmgr_shutdown(loopmgr);
   1854 	/*
   1855 	 * isc_nm_udpconnect() is synchronous, so we need to launch this on the
   1856 	 * async loop.
   1857 	 */
   1858 	isc_async_current(udp_shutdown_connect_async_cb, netmgr);
   1859 }
   1860 
   1861 int
   1862 proxyudp_shutdown_connect_setup(void **state) {
   1863 	udp_use_PROXY = true;
   1864 	return udp_shutdown_connect_setup(state);
   1865 }
   1866 
   1867 int
   1868 proxyudp_shutdown_connect_teardown(void **state) {
   1869 	int ret = udp_shutdown_connect_teardown(state);
   1870 	udp_use_PROXY = false;
   1871 	return ret;
   1872 }
   1873 
   1874 static void
   1875 udp_shutdown_read_recv_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   1876 			  isc_region_t *region, void *cbarg) {
   1877 	uint64_t magic = 0;
   1878 
   1879 	UNUSED(cbarg);
   1880 
   1881 	assert_non_null(handle);
   1882 
   1883 	F();
   1884 
   1885 	assert_int_equal(eresult, ISC_R_SUCCESS);
   1886 
   1887 	assert_true(region->length == sizeof(magic));
   1888 
   1889 	memmove(&magic, region->base, sizeof(magic));
   1890 	assert_true(magic == send_magic);
   1891 }
   1892 
   1893 static void
   1894 udp_shutdown_read_send_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   1895 			  void *cbarg) {
   1896 	UNUSED(cbarg);
   1897 
   1898 	F();
   1899 
   1900 	assert_non_null(handle);
   1901 	assert_int_equal(eresult, ISC_R_SUCCESS);
   1902 
   1903 	atomic_fetch_add(&csends, 1);
   1904 
   1905 	isc_loopmgr_shutdown(loopmgr);
   1906 
   1907 	isc_nmhandle_detach(&handle);
   1908 	isc_refcount_decrement(&active_csends);
   1909 }
   1910 
   1911 static void
   1912 udp_shutdown_read_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   1913 			  isc_region_t *region, void *cbarg) {
   1914 	UNUSED(region);
   1915 	UNUSED(cbarg);
   1916 
   1917 	assert_true(eresult == ISC_R_SHUTTINGDOWN || eresult == ISC_R_TIMEDOUT);
   1918 
   1919 	isc_refcount_decrement(&active_creads);
   1920 
   1921 	atomic_fetch_add(&creads, 1);
   1922 
   1923 	isc_nmhandle_detach(&handle);
   1924 }
   1925 
   1926 static void
   1927 udp_shutdown_read_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   1928 			     void *cbarg) {
   1929 	isc_nmhandle_t *readhandle = NULL;
   1930 	isc_nmhandle_t *sendhandle = NULL;
   1931 
   1932 	isc_refcount_decrement(&active_cconnects);
   1933 
   1934 	assert_int_equal(eresult, ISC_R_SUCCESS);
   1935 
   1936 	/* Read */
   1937 	isc_refcount_increment0(&active_creads);
   1938 	isc_nmhandle_attach(handle, &readhandle);
   1939 	isc_nm_read(handle, udp_shutdown_read_read_cb, cbarg);
   1940 	assert_true(handle->sock->reading);
   1941 
   1942 	/* Send */
   1943 	isc_refcount_increment0(&active_csends);
   1944 	isc_nmhandle_attach(handle, &sendhandle);
   1945 	isc_nmhandle_setwritetimeout(handle, T_IDLE);
   1946 	isc_nm_send(sendhandle, (isc_region_t *)&send_msg,
   1947 		    udp_shutdown_read_send_cb, cbarg);
   1948 
   1949 	atomic_fetch_add(&cconnects, 1);
   1950 }
   1951 
   1952 int
   1953 udp_shutdown_read_setup(void **state) {
   1954 	setup_udp_test(state);
   1955 	expected_cconnects = 1;
   1956 	expected_creads = 1;
   1957 	return 0;
   1958 }
   1959 
   1960 int
   1961 udp_shutdown_read_teardown(void **state) {
   1962 	atomic_assert_int_eq(cconnects, expected_cconnects);
   1963 	atomic_assert_int_eq(creads, expected_creads);
   1964 	teardown_udp_test(state);
   1965 	return 0;
   1966 }
   1967 
   1968 void
   1969 udp_shutdown_read(void **arg ISC_ATTR_UNUSED) {
   1970 	udp_start_listening(ISC_NM_LISTEN_ONE, udp_shutdown_read_recv_cb);
   1971 
   1972 	isc_refcount_increment0(&active_cconnects);
   1973 	udp_connect(udp_shutdown_read_connect_cb, NULL, UDP_T_SOFT);
   1974 }
   1975 
   1976 int
   1977 proxyudp_shutdown_read_setup(void **state) {
   1978 	udp_use_PROXY = true;
   1979 	return udp_shutdown_read_setup(state);
   1980 }
   1981 
   1982 int
   1983 proxyudp_shutdown_read_teardown(void **state) {
   1984 	int ret = udp_shutdown_read_teardown(state);
   1985 	udp_use_PROXY = false;
   1986 	return ret;
   1987 }
   1988 
   1989 static void
   1990 udp_cancel_read_recv_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   1991 			isc_region_t *region, void *cbarg) {
   1992 	uint64_t magic = 0;
   1993 
   1994 	UNUSED(cbarg);
   1995 
   1996 	assert_non_null(handle);
   1997 
   1998 	F();
   1999 
   2000 	assert_int_equal(eresult, ISC_R_SUCCESS);
   2001 
   2002 	assert_true(region->length == sizeof(magic));
   2003 
   2004 	memmove(&magic, region->base, sizeof(magic));
   2005 	assert_true(magic == send_magic);
   2006 }
   2007 
   2008 static void
   2009 udp_cancel_read_send_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   2010 			void *cbarg) {
   2011 	UNUSED(cbarg);
   2012 
   2013 	F();
   2014 
   2015 	assert_non_null(handle);
   2016 	assert_int_equal(eresult, ISC_R_SUCCESS);
   2017 
   2018 	atomic_fetch_add(&csends, 1);
   2019 
   2020 	isc_nm_cancelread(handle);
   2021 
   2022 	isc_nmhandle_detach(&handle);
   2023 	isc_refcount_decrement(&active_csends);
   2024 }
   2025 
   2026 static void
   2027 udp_cancel_read_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   2028 			isc_region_t *region, void *cbarg) {
   2029 	isc_nmhandle_t *sendhandle = NULL;
   2030 	isc_nmhandle_t *readhandle = NULL;
   2031 
   2032 	UNUSED(region);
   2033 
   2034 	F();
   2035 
   2036 	switch (eresult) {
   2037 	case ISC_R_TIMEDOUT:
   2038 
   2039 		/* Read again */
   2040 		isc_refcount_increment0(&active_creads);
   2041 		isc_nmhandle_attach(handle, &readhandle);
   2042 		isc_nm_read(handle, udp_cancel_read_read_cb, cbarg);
   2043 
   2044 		/* Send only once */
   2045 		if (isc_refcount_increment0(&active_csends) == 0) {
   2046 			isc_nmhandle_attach(handle, &sendhandle);
   2047 			isc_nmhandle_setwritetimeout(handle, T_IDLE);
   2048 			isc_nm_send(sendhandle, (isc_region_t *)&send_msg,
   2049 				    udp_cancel_read_send_cb, cbarg);
   2050 		}
   2051 		break;
   2052 	case ISC_R_CANCELED:
   2053 		/* The read has been canceled */
   2054 		atomic_fetch_add(&creads, 1);
   2055 		isc_loopmgr_shutdown(loopmgr);
   2056 		break;
   2057 	default:
   2058 		UNREACHABLE();
   2059 	}
   2060 
   2061 	isc_refcount_decrement(&active_creads);
   2062 
   2063 	isc_nmhandle_detach(&handle);
   2064 }
   2065 
   2066 static void
   2067 udp_cancel_read_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   2068 			   void *cbarg) {
   2069 	isc_nmhandle_t *readhandle = NULL;
   2070 
   2071 	isc_refcount_decrement(&active_cconnects);
   2072 
   2073 	assert_int_equal(eresult, ISC_R_SUCCESS);
   2074 
   2075 	isc_refcount_increment0(&active_creads);
   2076 	isc_nmhandle_attach(handle, &readhandle);
   2077 	isc_nm_read(handle, udp_cancel_read_read_cb, cbarg);
   2078 
   2079 	atomic_fetch_add(&cconnects, 1);
   2080 }
   2081 
   2082 int
   2083 udp_cancel_read_setup(void **state) {
   2084 	setup_udp_test(state);
   2085 	expected_cconnects = 1;
   2086 	expected_creads = 1;
   2087 	return 0;
   2088 }
   2089 
   2090 int
   2091 udp_cancel_read_teardown(void **state) {
   2092 	atomic_assert_int_eq(cconnects, expected_cconnects);
   2093 	atomic_assert_int_eq(creads, expected_creads);
   2094 	teardown_udp_test(state);
   2095 	return 0;
   2096 }
   2097 
   2098 void
   2099 udp_cancel_read(void **arg ISC_ATTR_UNUSED) {
   2100 	udp_start_listening(ISC_NM_LISTEN_ONE, udp_cancel_read_recv_cb);
   2101 
   2102 	isc_refcount_increment0(&active_cconnects);
   2103 	udp_connect(udp_cancel_read_connect_cb, NULL, UDP_T_SOFT);
   2104 }
   2105 
   2106 int
   2107 proxyudp_cancel_read_setup(void **state) {
   2108 	udp_use_PROXY = true;
   2109 	return udp_cancel_read_setup(state);
   2110 }
   2111 
   2112 int
   2113 proxyudp_cancel_read_teardown(void **state) {
   2114 	int ret = udp_cancel_read_teardown(state);
   2115 	udp_use_PROXY = false;
   2116 	return ret;
   2117 }
   2118 
   2119 int
   2120 udp_recv_one_setup(void **state) {
   2121 	setup_udp_test(state);
   2122 
   2123 	connect_readcb = udp__connect_read_cb;
   2124 
   2125 	expected_cconnects = 1;
   2126 	cconnects_shutdown = false;
   2127 
   2128 	expected_csends = 1;
   2129 	csends_shutdown = false;
   2130 
   2131 	expected_sreads = 1;
   2132 	sreads_shutdown = false;
   2133 
   2134 	expected_ssends = 1;
   2135 	ssends_shutdown = false;
   2136 
   2137 	expected_creads = 1;
   2138 	creads_shutdown = true;
   2139 
   2140 	return 0;
   2141 }
   2142 
   2143 int
   2144 udp_recv_one_teardown(void **state) {
   2145 	atomic_assert_int_eq(cconnects, expected_cconnects);
   2146 	atomic_assert_int_eq(csends, expected_csends);
   2147 	atomic_assert_int_eq(sreads, expected_sreads);
   2148 	atomic_assert_int_eq(ssends, expected_ssends);
   2149 	atomic_assert_int_eq(creads, expected_creads);
   2150 
   2151 	teardown_udp_test(state);
   2152 
   2153 	return 0;
   2154 }
   2155 
   2156 void
   2157 udp_recv_one(void **arg ISC_ATTR_UNUSED) {
   2158 	udp_start_listening(ISC_NM_LISTEN_ONE, udp_listen_read_cb);
   2159 
   2160 	udp_enqueue_connect(NULL);
   2161 }
   2162 
   2163 int
   2164 proxyudp_recv_one_setup(void **state) {
   2165 	udp_use_PROXY = true;
   2166 	return udp_recv_one_setup(state);
   2167 }
   2168 
   2169 int
   2170 proxyudp_recv_one_teardown(void **state) {
   2171 	int ret = udp_recv_one_teardown(state);
   2172 	udp_use_PROXY = false;
   2173 	return ret;
   2174 }
   2175 
   2176 int
   2177 udp_recv_two_setup(void **state) {
   2178 	setup_udp_test(state);
   2179 
   2180 	connect_readcb = udp__connect_read_cb;
   2181 
   2182 	expected_cconnects = 2;
   2183 	cconnects_shutdown = false;
   2184 
   2185 	expected_csends = 2;
   2186 	csends_shutdown = false;
   2187 
   2188 	expected_sreads = 2;
   2189 	sreads_shutdown = false;
   2190 
   2191 	expected_ssends = 2;
   2192 	ssends_shutdown = false;
   2193 
   2194 	expected_creads = 2;
   2195 	creads_shutdown = true;
   2196 
   2197 	return 0;
   2198 }
   2199 
   2200 int
   2201 udp_recv_two_teardown(void **state) {
   2202 	atomic_assert_int_eq(cconnects, expected_cconnects);
   2203 	atomic_assert_int_eq(csends, expected_csends);
   2204 	atomic_assert_int_eq(sreads, expected_sreads);
   2205 	atomic_assert_int_eq(ssends, expected_ssends);
   2206 	atomic_assert_int_eq(creads, expected_creads);
   2207 
   2208 	teardown_udp_test(state);
   2209 	return 0;
   2210 }
   2211 
   2212 void
   2213 udp_recv_two(void **arg ISC_ATTR_UNUSED) {
   2214 	udp_start_listening(ISC_NM_LISTEN_ONE, udp_listen_read_cb);
   2215 
   2216 	udp_enqueue_connect(NULL);
   2217 	udp_enqueue_connect(NULL);
   2218 }
   2219 
   2220 int
   2221 proxyudp_recv_two_setup(void **state) {
   2222 	udp_use_PROXY = true;
   2223 	return udp_recv_two_setup(state);
   2224 }
   2225 
   2226 int
   2227 proxyudp_recv_two_teardown(void **state) {
   2228 	int ret = udp_recv_two_teardown(state);
   2229 	udp_use_PROXY = false;
   2230 	return ret;
   2231 }
   2232 
   2233 int
   2234 udp_recv_send_setup(void **state) {
   2235 	setup_udp_test(state);
   2236 
   2237 	/* Allow some leeway (+1) as datagram service is unreliable */
   2238 	expected_cconnects = (workers + 1) * NSENDS;
   2239 	cconnects_shutdown = false;
   2240 
   2241 	expected_creads = workers * NSENDS;
   2242 	do_send = true;
   2243 
   2244 	return 0;
   2245 }
   2246 
   2247 int
   2248 udp_recv_send_teardown(void **state) {
   2249 	atomic_assert_int_ge(cconnects, expected_creads);
   2250 	atomic_assert_int_ge(csends, expected_creads);
   2251 	atomic_assert_int_ge(sreads, expected_creads);
   2252 	atomic_assert_int_ge(ssends, expected_creads);
   2253 	atomic_assert_int_ge(creads, expected_creads);
   2254 
   2255 	teardown_udp_test(state);
   2256 	return 0;
   2257 }
   2258 
   2259 void
   2260 udp_recv_send(void **arg ISC_ATTR_UNUSED) {
   2261 	udp_start_listening(ISC_NM_LISTEN_ALL, udp_listen_read_cb);
   2262 
   2263 	for (size_t i = 0; i < workers; i++) {
   2264 		isc_async_run(isc_loop_get(loopmgr, i), udp_enqueue_connect,
   2265 			      NULL);
   2266 	}
   2267 }
   2268 
   2269 int
   2270 proxyudp_recv_send_setup(void **state) {
   2271 	udp_use_PROXY = true;
   2272 	return udp_recv_send_setup(state);
   2273 }
   2274 
   2275 int
   2276 proxyudp_recv_send_teardown(void **state) {
   2277 	int ret = udp_recv_send_teardown(state);
   2278 	udp_use_PROXY = false;
   2279 	return ret;
   2280 }
   2281 
   2282 static void
   2283 udp_double_read_send_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   2284 			void *cbarg) {
   2285 	assert_non_null(handle);
   2286 
   2287 	F();
   2288 
   2289 	isc_refcount_decrement(&active_ssends);
   2290 
   2291 	switch (eresult) {
   2292 	case ISC_R_SUCCESS:
   2293 		if (have_expected_ssends(atomic_fetch_add(&ssends, 1) + 1)) {
   2294 			do_ssends_shutdown(loopmgr);
   2295 		} else {
   2296 			isc_nmhandle_t *sendhandle = NULL;
   2297 			isc_nmhandle_attach(handle, &sendhandle);
   2298 			isc_nmhandle_setwritetimeout(sendhandle, T_IDLE);
   2299 			isc_refcount_increment0(&active_ssends);
   2300 			isc_nm_send(sendhandle, &send_msg,
   2301 				    udp_double_read_send_cb, cbarg);
   2302 			break;
   2303 		}
   2304 		break;
   2305 	case ISC_R_CANCELED:
   2306 		break;
   2307 	default:
   2308 		fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
   2309 			isc_result_totext(eresult), cbarg);
   2310 		assert_int_equal(eresult, ISC_R_SUCCESS);
   2311 	}
   2312 
   2313 	isc_nmhandle_detach(&handle);
   2314 }
   2315 
   2316 static void
   2317 udp_double_read_listen_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   2318 			  isc_region_t *region, void *cbarg) {
   2319 	uint64_t magic = 0;
   2320 
   2321 	assert_non_null(handle);
   2322 
   2323 	F();
   2324 
   2325 	switch (eresult) {
   2326 	case ISC_R_EOF:
   2327 	case ISC_R_SHUTTINGDOWN:
   2328 	case ISC_R_CANCELED:
   2329 		break;
   2330 	case ISC_R_SUCCESS:
   2331 		memmove(&magic, region->base, sizeof(magic));
   2332 		assert_true(magic == send_magic);
   2333 
   2334 		assert_true(region->length >= sizeof(magic));
   2335 
   2336 		memmove(&magic, region->base, sizeof(magic));
   2337 		assert_true(magic == send_magic);
   2338 
   2339 		isc_nmhandle_t *sendhandle = NULL;
   2340 		isc_nmhandle_attach(handle, &sendhandle);
   2341 		isc_nmhandle_setwritetimeout(sendhandle, T_IDLE);
   2342 		isc_refcount_increment0(&active_ssends);
   2343 		isc_nm_send(sendhandle, &send_msg, udp_double_read_send_cb,
   2344 			    cbarg);
   2345 		return;
   2346 	default:
   2347 		fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
   2348 			isc_result_totext(eresult), cbarg);
   2349 		assert_int_equal(eresult, ISC_R_SUCCESS);
   2350 	}
   2351 
   2352 	isc_refcount_decrement(&active_sreads);
   2353 
   2354 	isc_nmhandle_detach(&handle);
   2355 }
   2356 
   2357 static void
   2358 udp_double_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
   2359 		   isc_region_t *region, void *cbarg) {
   2360 	uint64_t magic = 0;
   2361 	bool detach = false;
   2362 
   2363 	assert_non_null(handle);
   2364 
   2365 	F();
   2366 
   2367 	switch (eresult) {
   2368 	case ISC_R_TIMEDOUT:
   2369 		/*
   2370 		 * We are operating on the localhost, UDP cannot get lost, but
   2371 		 * it could be delayed, so we read again until we get the
   2372 		 * answer.
   2373 		 */
   2374 		detach = false;
   2375 		break;
   2376 	case ISC_R_SUCCESS:
   2377 		assert_true(region->length >= sizeof(magic));
   2378 
   2379 		memmove(&magic, region->base, sizeof(magic));
   2380 
   2381 		assert_true(magic == send_magic);
   2382 
   2383 		if (have_expected_creads(atomic_fetch_add(&creads, 1) + 1)) {
   2384 			do_creads_shutdown(loopmgr);
   2385 			detach = true;
   2386 		}
   2387 
   2388 		if (magic == send_magic && allow_send_back) {
   2389 			connect_send(handle);
   2390 			return;
   2391 		}
   2392 
   2393 		break;
   2394 	case ISC_R_EOF:
   2395 	case ISC_R_SHUTTINGDOWN:
   2396 	case ISC_R_CANCELED:
   2397 	case ISC_R_CONNECTIONRESET:
   2398 		detach = true;
   2399 		break;
   2400 	default:
   2401 		fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
   2402 			isc_result_totext(eresult), cbarg);
   2403 		assert_int_equal(eresult, ISC_R_SUCCESS);
   2404 	}
   2405 
   2406 	if (detach) {
   2407 		isc_refcount_decrement(&active_creads);
   2408 		isc_nmhandle_detach(&handle);
   2409 	} else {
   2410 		isc_nm_read(handle, connect_readcb, cbarg);
   2411 	}
   2412 }
   2413 
   2414 int
   2415 udp_double_read_setup(void **state) {
   2416 	setup_udp_test(state);
   2417 
   2418 	expected_cconnects = 1;
   2419 	cconnects_shutdown = false;
   2420 
   2421 	expected_csends = 1;
   2422 	csends_shutdown = false;
   2423 
   2424 	expected_sreads = 1;
   2425 	sreads_shutdown = false;
   2426 
   2427 	expected_ssends = 2;
   2428 	ssends_shutdown = false;
   2429 	expected_creads = 2;
   2430 	creads_shutdown = true;
   2431 
   2432 	connect_readcb = udp_double_read_cb;
   2433 
   2434 	return 0;
   2435 }
   2436 
   2437 int
   2438 udp_double_read_teardown(void **state) {
   2439 	atomic_assert_int_eq(creads, expected_creads);
   2440 
   2441 	teardown_udp_test(state);
   2442 
   2443 	return 0;
   2444 }
   2445 
   2446 void
   2447 udp_double_read(void **arg ISC_ATTR_UNUSED) {
   2448 	udp_start_listening(ISC_NM_LISTEN_ALL, udp_double_read_listen_cb);
   2449 
   2450 	udp_enqueue_connect(NULL);
   2451 }
   2452 
   2453 int
   2454 proxyudp_double_read_setup(void **state) {
   2455 	udp_use_PROXY = true;
   2456 	return udp_double_read_setup(state);
   2457 }
   2458 
   2459 int
   2460 proxyudp_double_read_teardown(void **state) {
   2461 	int ret = udp_double_read_teardown(state);
   2462 	udp_use_PROXY = false;
   2463 	return ret;
   2464 }
   2465