1 /* $NetBSD: netmgr_common.c,v 1.4 2025/07/17 19:01:47 christos Exp $ */ 2 3 /* 4 * Copyright (C) Internet Systems Consortium, Inc. ("ISC") 5 * 6 * SPDX-License-Identifier: MPL-2.0 7 * 8 * This Source Code Form is subject to the terms of the Mozilla Public 9 * License, v. 2.0. If a copy of the MPL was not distributed with this 10 * file, you can obtain one at https://mozilla.org/MPL/2.0/. 11 * 12 * See the COPYRIGHT file distributed with this work for additional 13 * information regarding copyright ownership. 14 */ 15 16 #include <inttypes.h> 17 #include <sched.h> /* IWYU pragma: keep */ 18 #include <setjmp.h> 19 #include <signal.h> 20 #include <stdarg.h> 21 #include <stdlib.h> 22 #include <unistd.h> 23 24 /* 25 * As a workaround, include an OpenSSL header file before including cmocka.h, 26 * because OpenSSL 3.1.0 uses __attribute__(malloc), conflicting with a 27 * redefined malloc in cmocka.h. 28 */ 29 #include <openssl/err.h> 30 31 #define UNIT_TESTING 32 #include <cmocka.h> 33 34 #include <isc/async.h> 35 #include <isc/nonce.h> 36 #include <isc/os.h> 37 #include <isc/quota.h> 38 #include <isc/refcount.h> 39 #include <isc/sockaddr.h> 40 #include <isc/thread.h> 41 #include <isc/util.h> 42 #include <isc/uv.h> 43 #define KEEP_BEFORE 44 45 #include "netmgr_common.h" 46 47 #include <tests/isc.h> 48 49 isc_nm_t *listen_nm = NULL; 50 isc_nm_t *connect_nm = NULL; 51 52 isc_sockaddr_t tcp_listen_addr; 53 isc_sockaddr_t tcp_connect_addr; 54 isc_tlsctx_t *tcp_listen_tlsctx = NULL; 55 isc_tlsctx_t *tcp_connect_tlsctx = NULL; 56 isc_tlsctx_client_session_cache_t *tcp_tlsctx_client_sess_cache = NULL; 57 58 isc_sockaddr_t udp_listen_addr; 59 isc_sockaddr_t udp_connect_addr; 60 61 uint64_t send_magic = 0; 62 63 isc_region_t send_msg = { .base = (unsigned char *)&send_magic, 64 .length = sizeof(send_magic) }; 65 66 atomic_bool do_send = false; 67 68 atomic_int_fast64_t nsends = 0; 69 int_fast64_t esends = 0; /* expected sends */ 70 71 atomic_int_fast64_t ssends = 0; 72 atomic_int_fast64_t sreads = 0; 73 atomic_int_fast64_t saccepts = 0; 74 75 atomic_int_fast64_t cconnects = 0; 76 atomic_int_fast64_t csends = 0; 77 atomic_int_fast64_t creads = 0; 78 atomic_int_fast64_t ctimeouts = 0; 79 80 int expected_ssends; 81 int expected_sreads; 82 int expected_csends; 83 int expected_cconnects; 84 int expected_creads; 85 int expected_saccepts; 86 int expected_ctimeouts; 87 88 bool ssends_shutdown; 89 bool sreads_shutdown; 90 bool saccepts_shutdown; 91 bool csends_shutdown; 92 bool cconnects_shutdown; 93 bool creads_shutdown; 94 bool ctimeouts_shutdown; 95 96 isc_refcount_t active_cconnects = 0; 97 isc_refcount_t active_csends = 0; 98 isc_refcount_t active_creads = 0; 99 isc_refcount_t active_ssends = 0; 100 isc_refcount_t active_sreads = 0; 101 102 isc_nmsocket_t *listen_sock = NULL; 103 104 isc_quota_t listener_quota; 105 atomic_bool check_listener_quota = false; 106 107 bool allow_send_back = false; 108 bool noanswer = false; 109 bool stream_use_TLS = false; 110 bool stream_use_PROXY = false; 111 bool stream_PROXY_over_TLS = false; 112 bool stream = false; 113 in_port_t stream_port = 0; 114 115 bool udp_use_PROXY = false; 116 117 isc_nm_recv_cb_t connect_readcb = NULL; 118 119 isc_nm_proxyheader_info_t proxy_info_data; 120 isc_nm_proxyheader_info_t *proxy_info = NULL; 121 isc_sockaddr_t proxy_src; 122 isc_sockaddr_t proxy_dst; 123 124 int 125 setup_netmgr_test(void **state) { 126 struct in_addr in; 127 tcp_connect_addr = (isc_sockaddr_t){ .length = 0 }; 128 isc_sockaddr_fromin6(&tcp_connect_addr, &in6addr_loopback, 0); 129 130 tcp_listen_addr = (isc_sockaddr_t){ .length = 0 }; 131 isc_sockaddr_fromin6(&tcp_listen_addr, &in6addr_loopback, stream_port); 132 133 RUNTIME_CHECK(inet_pton(AF_INET, "1.2.3.4", &in) == 1); 134 isc_sockaddr_fromin(&proxy_src, &in, 1234); 135 RUNTIME_CHECK(inet_pton(AF_INET, "4.3.2.1", &in) == 1); 136 isc_sockaddr_fromin(&proxy_dst, &in, 4321); 137 isc_nm_proxyheader_info_init(&proxy_info_data, &proxy_src, &proxy_dst, 138 NULL); 139 140 esends = NSENDS * workers; 141 142 atomic_store(&nsends, esends); 143 144 atomic_store(&saccepts, 0); 145 atomic_store(&sreads, 0); 146 atomic_store(&ssends, 0); 147 148 atomic_store(&cconnects, 0); 149 atomic_store(&csends, 0); 150 atomic_store(&creads, 0); 151 atomic_store(&ctimeouts, 0); 152 allow_send_back = false; 153 154 expected_cconnects = -1; 155 expected_csends = -1; 156 expected_creads = -1; 157 expected_sreads = -1; 158 expected_ssends = -1; 159 expected_saccepts = -1; 160 expected_ctimeouts = -1; 161 162 ssends_shutdown = true; 163 sreads_shutdown = true; 164 saccepts_shutdown = true; 165 csends_shutdown = true; 166 cconnects_shutdown = true; 167 creads_shutdown = true; 168 ctimeouts_shutdown = true; 169 170 do_send = false; 171 172 isc_refcount_init(&active_cconnects, 0); 173 isc_refcount_init(&active_csends, 0); 174 isc_refcount_init(&active_creads, 0); 175 isc_refcount_init(&active_ssends, 0); 176 isc_refcount_init(&active_sreads, 0); 177 178 isc_nonce_buf(&send_magic, sizeof(send_magic)); 179 180 setup_loopmgr(state); 181 isc_netmgr_create(mctx, loopmgr, &listen_nm); 182 assert_non_null(listen_nm); 183 isc_nm_settimeouts(listen_nm, T_INIT, T_IDLE, T_KEEPALIVE, 184 T_ADVERTISED); 185 186 isc_netmgr_create(mctx, loopmgr, &connect_nm); 187 assert_non_null(connect_nm); 188 isc_nm_settimeouts(connect_nm, T_INIT, T_IDLE, T_KEEPALIVE, 189 T_ADVERTISED); 190 191 isc_quota_init(&listener_quota, 0); 192 atomic_store(&check_listener_quota, false); 193 194 connect_readcb = connect_read_cb; 195 noanswer = false; 196 197 if (isc_tlsctx_createserver(NULL, NULL, &tcp_listen_tlsctx) != 198 ISC_R_SUCCESS) 199 { 200 return -1; 201 } 202 if (isc_tlsctx_createclient(&tcp_connect_tlsctx) != ISC_R_SUCCESS) { 203 return -1; 204 } 205 206 isc_tlsctx_enable_dot_client_alpn(tcp_connect_tlsctx); 207 208 isc_tlsctx_client_session_cache_create( 209 mctx, tcp_connect_tlsctx, 210 ISC_TLSCTX_CLIENT_SESSION_CACHE_DEFAULT_SIZE, 211 &tcp_tlsctx_client_sess_cache); 212 213 return 0; 214 } 215 216 int 217 teardown_netmgr_test(void **state ISC_ATTR_UNUSED) { 218 UNUSED(state); 219 220 isc_tlsctx_client_session_cache_detach(&tcp_tlsctx_client_sess_cache); 221 222 isc_tlsctx_free(&tcp_connect_tlsctx); 223 isc_tlsctx_free(&tcp_listen_tlsctx); 224 225 isc_netmgr_destroy(&connect_nm); 226 assert_null(connect_nm); 227 228 isc_netmgr_destroy(&listen_nm); 229 assert_null(listen_nm); 230 231 teardown_loopmgr(state); 232 233 isc_refcount_destroy(&active_cconnects); 234 isc_refcount_destroy(&active_csends); 235 isc_refcount_destroy(&active_creads); 236 isc_refcount_destroy(&active_ssends); 237 isc_refcount_destroy(&active_sreads); 238 239 proxy_info = NULL; 240 241 return 0; 242 } 243 244 void 245 stop_listening(void *arg ISC_ATTR_UNUSED) { 246 isc_nm_stoplistening(listen_sock); 247 isc_nmsocket_close(&listen_sock); 248 assert_null(listen_sock); 249 } 250 251 /* Callbacks */ 252 253 void 254 noop_recv_cb(isc_nmhandle_t *handle ISC_ATTR_UNUSED, 255 isc_result_t eresult ISC_ATTR_UNUSED, 256 isc_region_t *region ISC_ATTR_UNUSED, 257 void *cbarg ISC_ATTR_UNUSED) { 258 F(); 259 } 260 261 isc_result_t 262 noop_accept_cb(isc_nmhandle_t *handle ISC_ATTR_UNUSED, isc_result_t eresult, 263 void *cbarg ISC_ATTR_UNUSED) { 264 F(); 265 266 if (eresult == ISC_R_SUCCESS) { 267 (void)atomic_fetch_add(&saccepts, 1); 268 } 269 270 return ISC_R_SUCCESS; 271 } 272 273 void 274 connect_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg); 275 276 void 277 connect_send(isc_nmhandle_t *handle); 278 279 void 280 connect_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) { 281 isc_nmhandle_t *sendhandle = handle; 282 283 assert_non_null(sendhandle); 284 285 UNUSED(cbarg); 286 287 F(); 288 289 switch (eresult) { 290 case ISC_R_EOF: 291 case ISC_R_SHUTTINGDOWN: 292 case ISC_R_CANCELED: 293 case ISC_R_CONNECTIONRESET: 294 /* Abort */ 295 if (!stream) { 296 isc_nm_cancelread(handle); 297 } 298 break; 299 case ISC_R_SUCCESS: 300 if (have_expected_csends(atomic_fetch_add(&csends, 1) + 1)) { 301 do_csends_shutdown(loopmgr); 302 } 303 break; 304 default: 305 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle, 306 isc_result_totext(eresult), cbarg); 307 assert_int_equal(eresult, ISC_R_SUCCESS); 308 } 309 310 isc_refcount_decrement(&active_csends); 311 isc_nmhandle_detach(&sendhandle); 312 } 313 314 void 315 connect_send(isc_nmhandle_t *handle) { 316 isc_nmhandle_t *sendhandle = NULL; 317 isc_refcount_increment0(&active_csends); 318 isc_nmhandle_attach(handle, &sendhandle); 319 isc_nmhandle_setwritetimeout(handle, T_IDLE); 320 isc_nm_send(sendhandle, &send_msg, connect_send_cb, NULL); 321 } 322 323 void 324 connect_read_cb(isc_nmhandle_t *handle, isc_result_t eresult, 325 isc_region_t *region, void *cbarg) { 326 uint64_t magic = 0; 327 328 UNUSED(cbarg); 329 330 assert_non_null(handle); 331 332 F(); 333 334 switch (eresult) { 335 case ISC_R_SUCCESS: 336 assert_true(region->length >= sizeof(magic)); 337 338 memmove(&magic, region->base, sizeof(magic)); 339 340 assert_true(magic == send_magic); 341 342 if (have_expected_creads(atomic_fetch_add(&creads, 1) + 1)) { 343 do_creads_shutdown(loopmgr); 344 } 345 346 if (magic == send_magic && allow_send_back) { 347 connect_send(handle); 348 return; 349 } 350 351 /* This will initiate one more read callback */ 352 if (stream) { 353 isc_nmhandle_close(handle); 354 } 355 break; 356 case ISC_R_TIMEDOUT: 357 case ISC_R_EOF: 358 case ISC_R_SHUTTINGDOWN: 359 case ISC_R_CANCELED: 360 case ISC_R_CONNECTIONRESET: 361 case ISC_R_CONNREFUSED: 362 break; 363 default: 364 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle, 365 isc_result_totext(eresult), cbarg); 366 assert_int_equal(eresult, ISC_R_SUCCESS); 367 } 368 369 isc_refcount_decrement(&active_creads); 370 isc_nmhandle_detach(&handle); 371 } 372 373 void 374 connect_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) { 375 isc_nmhandle_t *readhandle = NULL; 376 377 F(); 378 379 isc_refcount_decrement(&active_cconnects); 380 381 if (eresult != ISC_R_SUCCESS || connect_readcb == NULL) { 382 return; 383 } 384 385 if (stream_use_PROXY) { 386 assert_true(isc_nm_is_proxy_handle(handle)); 387 } 388 389 /* We are finished, initiate the shutdown */ 390 if (have_expected_cconnects(atomic_fetch_add(&cconnects, 1) + 1)) { 391 do_cconnects_shutdown(loopmgr); 392 } else if (do_send) { 393 isc_async_current(stream_recv_send_connect, 394 cbarg == NULL 395 ? get_stream_connect_function() 396 : (stream_connect_function)cbarg); 397 } 398 399 isc_refcount_increment0(&active_creads); 400 isc_nmhandle_attach(handle, &readhandle); 401 isc_nm_read(handle, connect_readcb, NULL); 402 403 connect_send(handle); 404 } 405 406 void 407 listen_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) { 408 isc_nmhandle_t *sendhandle = handle; 409 410 UNUSED(cbarg); 411 UNUSED(eresult); 412 413 assert_non_null(sendhandle); 414 415 F(); 416 417 switch (eresult) { 418 case ISC_R_CANCELED: 419 case ISC_R_CONNECTIONRESET: 420 case ISC_R_EOF: 421 case ISC_R_SHUTTINGDOWN: 422 break; 423 case ISC_R_SUCCESS: 424 if (have_expected_ssends(atomic_fetch_add(&ssends, 1) + 1)) { 425 do_ssends_shutdown(loopmgr); 426 } 427 break; 428 default: 429 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle, 430 isc_result_totext(eresult), cbarg); 431 assert_int_equal(eresult, ISC_R_SUCCESS); 432 } 433 434 isc_refcount_decrement(&active_ssends); 435 isc_nmhandle_detach(&sendhandle); 436 } 437 438 void 439 listen_read_cb(isc_nmhandle_t *handle, isc_result_t eresult, 440 isc_region_t *region, void *cbarg) { 441 uint64_t magic = 0; 442 443 assert_non_null(handle); 444 445 F(); 446 447 switch (eresult) { 448 case ISC_R_SUCCESS: 449 if (udp_use_PROXY || stream_use_PROXY) { 450 assert_true(isc_nm_is_proxy_handle(handle)); 451 proxy_verify_endpoints(handle); 452 } 453 454 memmove(&magic, region->base, sizeof(magic)); 455 assert_true(magic == send_magic); 456 457 if (have_expected_sreads(atomic_fetch_add(&sreads, 1) + 1)) { 458 do_sreads_shutdown(loopmgr); 459 } 460 461 assert_true(region->length >= sizeof(magic)); 462 463 memmove(&magic, region->base, sizeof(magic)); 464 assert_true(magic == send_magic); 465 466 if (!noanswer) { 467 /* Answer and continue to listen */ 468 isc_nmhandle_t *sendhandle = NULL; 469 isc_nmhandle_attach(handle, &sendhandle); 470 isc_refcount_increment0(&active_ssends); 471 isc_nmhandle_setwritetimeout(sendhandle, T_IDLE); 472 isc_nm_send(sendhandle, &send_msg, listen_send_cb, 473 cbarg); 474 } 475 /* Continue to listen */ 476 return; 477 case ISC_R_CANCELED: 478 case ISC_R_CONNECTIONRESET: 479 case ISC_R_EOF: 480 case ISC_R_SHUTTINGDOWN: 481 break; 482 default: 483 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle, 484 isc_result_totext(eresult), cbarg); 485 assert_int_equal(eresult, ISC_R_SUCCESS); 486 } 487 488 isc_refcount_decrement(&active_sreads); 489 isc_nmhandle_detach(&handle); 490 } 491 492 isc_result_t 493 listen_accept_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) { 494 UNUSED(handle); 495 UNUSED(cbarg); 496 497 F(); 498 499 if (eresult != ISC_R_SUCCESS) { 500 return eresult; 501 } 502 503 if (have_expected_saccepts(atomic_fetch_add(&saccepts, 1) + 1)) { 504 do_saccepts_shutdown(loopmgr); 505 } 506 507 isc_nmhandle_attach(handle, &(isc_nmhandle_t *){ NULL }); 508 isc_refcount_increment0(&active_sreads); 509 510 return eresult; 511 } 512 513 isc_result_t 514 stream_accept_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) { 515 isc_nmhandle_t *readhandle = NULL; 516 517 UNUSED(cbarg); 518 519 F(); 520 521 if (eresult != ISC_R_SUCCESS) { 522 return eresult; 523 } 524 525 if (have_expected_saccepts(atomic_fetch_add(&saccepts, 1) + 1)) { 526 do_saccepts_shutdown(loopmgr); 527 } 528 529 if (stream_use_PROXY) { 530 assert_true(isc_nm_is_proxy_handle(handle)); 531 proxy_verify_endpoints(handle); 532 } 533 534 isc_refcount_increment0(&active_sreads); 535 536 isc_nmhandle_attach(handle, &readhandle); 537 isc_nm_read(handle, listen_read_cb, readhandle); 538 539 return ISC_R_SUCCESS; 540 } 541 542 void 543 stream_recv_send_connect(void *arg) { 544 connect_func connect = (connect_func)arg; 545 isc_sockaddr_t connect_addr; 546 547 connect_addr = (isc_sockaddr_t){ .length = 0 }; 548 isc_sockaddr_fromin6(&connect_addr, &in6addr_loopback, 0); 549 550 isc_refcount_increment0(&active_cconnects); 551 connect(connect_nm); 552 } 553 554 /* Common stream protocols code */ 555 556 void 557 timeout_retry_cb(isc_nmhandle_t *handle, isc_result_t eresult, 558 isc_region_t *region, void *cbarg) { 559 UNUSED(region); 560 UNUSED(cbarg); 561 562 assert_non_null(handle); 563 564 F(); 565 566 if (eresult == ISC_R_TIMEDOUT && 567 atomic_fetch_add(&ctimeouts, 1) + 1 < expected_ctimeouts) 568 { 569 isc_nmhandle_settimeout(handle, T_SOFT); 570 connect_send(handle); 571 return; 572 } 573 574 isc_refcount_decrement(&active_creads); 575 isc_nmhandle_detach(&handle); 576 577 isc_loopmgr_shutdown(loopmgr); 578 } 579 580 isc_quota_t * 581 tcp_listener_init_quota(size_t nthreads) { 582 isc_quota_t *quotap = NULL; 583 if (atomic_load(&check_listener_quota)) { 584 unsigned int max_quota = ISC_MAX(nthreads / 2, 1); 585 isc_quota_max(&listener_quota, max_quota); 586 quotap = &listener_quota; 587 } 588 return quotap; 589 } 590 591 static void 592 tcp_connect(isc_nm_t *nm) { 593 isc_nm_tcpconnect(nm, &tcp_connect_addr, &tcp_listen_addr, 594 connect_connect_cb, NULL, T_CONNECT); 595 } 596 597 static void 598 tls_connect(isc_nm_t *nm) { 599 isc_nm_tlsconnect(nm, &tcp_connect_addr, &tcp_listen_addr, 600 connect_connect_cb, NULL, tcp_connect_tlsctx, NULL, 601 tcp_tlsctx_client_sess_cache, T_CONNECT, 602 stream_use_PROXY, NULL); 603 } 604 605 void 606 set_proxyheader_info(isc_nm_proxyheader_info_t *pi) { 607 proxy_info = pi; 608 } 609 610 isc_nm_proxyheader_info_t * 611 get_proxyheader_info(void) { 612 if (proxy_info != NULL) { 613 return proxy_info; 614 } 615 616 /* 617 * There is 50% chance to get the info: so we can test LOCAL headers, 618 * too. 619 */ 620 if (isc_random_uniform(2)) { 621 return &proxy_info_data; 622 } 623 624 return NULL; 625 } 626 627 static void 628 proxystream_connect(isc_nm_t *nm) { 629 isc_tlsctx_t *tlsctx = stream_PROXY_over_TLS ? tcp_connect_tlsctx 630 : NULL; 631 isc_tlsctx_client_session_cache_t *sess_cache = 632 stream_PROXY_over_TLS ? tcp_tlsctx_client_sess_cache : NULL; 633 634 isc_nm_proxystreamconnect(nm, &tcp_connect_addr, &tcp_listen_addr, 635 connect_connect_cb, NULL, T_CONNECT, tlsctx, 636 NULL, sess_cache, get_proxyheader_info()); 637 } 638 639 stream_connect_function 640 get_stream_connect_function(void) { 641 if (stream_use_TLS && !stream_PROXY_over_TLS) { 642 return tls_connect; 643 } else if (stream_use_PROXY) { 644 return proxystream_connect; 645 } else { 646 return tcp_connect; 647 } 648 649 UNREACHABLE(); 650 } 651 652 isc_result_t 653 stream_listen(isc_nm_accept_cb_t accept_cb, void *accept_cbarg, int backlog, 654 isc_quota_t *quota, isc_nmsocket_t **sockp) { 655 isc_result_t result = ISC_R_SUCCESS; 656 657 if (stream_use_TLS && !stream_PROXY_over_TLS) { 658 result = isc_nm_listentls( 659 listen_nm, ISC_NM_LISTEN_ALL, &tcp_listen_addr, 660 accept_cb, accept_cbarg, backlog, quota, 661 tcp_listen_tlsctx, stream_use_PROXY, sockp); 662 return result; 663 } else if (stream_use_PROXY) { 664 isc_tlsctx_t *tlsctx = stream_PROXY_over_TLS ? tcp_listen_tlsctx 665 : NULL; 666 result = isc_nm_listenproxystream( 667 listen_nm, ISC_NM_LISTEN_ALL, &tcp_listen_addr, 668 accept_cb, accept_cbarg, backlog, quota, tlsctx, sockp); 669 return result; 670 } else { 671 result = isc_nm_listentcp(listen_nm, ISC_NM_LISTEN_ALL, 672 &tcp_listen_addr, accept_cb, 673 accept_cbarg, backlog, quota, sockp); 674 return result; 675 } 676 677 UNREACHABLE(); 678 } 679 680 void 681 stream_connect(isc_nm_cb_t cb, void *cbarg, unsigned int timeout) { 682 isc_refcount_increment0(&active_cconnects); 683 684 if (stream_use_TLS && !stream_PROXY_over_TLS) { 685 isc_nm_tlsconnect(connect_nm, &tcp_connect_addr, 686 &tcp_listen_addr, cb, cbarg, 687 tcp_connect_tlsctx, NULL, 688 tcp_tlsctx_client_sess_cache, timeout, 689 stream_use_PROXY, NULL); 690 return; 691 } else if (stream_use_PROXY) { 692 isc_tlsctx_t *tlsctx = stream_PROXY_over_TLS 693 ? tcp_connect_tlsctx 694 : NULL; 695 isc_tlsctx_client_session_cache_t *sess_cache = 696 stream_PROXY_over_TLS ? tcp_tlsctx_client_sess_cache 697 : NULL; 698 isc_nm_proxystreamconnect(connect_nm, &tcp_connect_addr, 699 &tcp_listen_addr, cb, cbarg, timeout, 700 tlsctx, NULL, sess_cache, 701 get_proxyheader_info()); 702 return; 703 } else { 704 isc_nm_tcpconnect(connect_nm, &tcp_connect_addr, 705 &tcp_listen_addr, cb, cbarg, timeout); 706 return; 707 } 708 UNREACHABLE(); 709 } 710 711 isc_nm_proxy_type_t 712 get_proxy_type(void) { 713 if (!stream_use_PROXY) { 714 return ISC_NM_PROXY_NONE; 715 } else if (stream_PROXY_over_TLS) { 716 return ISC_NM_PROXY_ENCRYPTED; 717 } 718 719 return ISC_NM_PROXY_PLAIN; 720 } 721 722 void 723 connect_success_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) { 724 UNUSED(handle); 725 UNUSED(cbarg); 726 727 F(); 728 729 isc_refcount_decrement(&active_cconnects); 730 assert_int_equal(eresult, ISC_R_SUCCESS); 731 732 if (have_expected_cconnects(atomic_fetch_add(&cconnects, 1) + 1)) { 733 do_cconnects_shutdown(loopmgr); 734 return; 735 } 736 } 737 738 int 739 stream_noop_setup(void **state ISC_ATTR_UNUSED) { 740 int r = setup_netmgr_test(state); 741 expected_cconnects = 1; 742 return r; 743 } 744 745 int 746 proxystream_noop_setup(void **state) { 747 stream_use_PROXY = true; 748 return stream_noop_setup(state); 749 } 750 751 int 752 proxystreamtls_noop_setup(void **state) { 753 stream_PROXY_over_TLS = true; 754 return proxystream_noop_setup(state); 755 } 756 757 void 758 stream_noop(void **state ISC_ATTR_UNUSED) { 759 isc_result_t result = ISC_R_SUCCESS; 760 761 result = stream_listen(noop_accept_cb, NULL, 128, NULL, &listen_sock); 762 assert_int_equal(result, ISC_R_SUCCESS); 763 isc_loop_teardown(mainloop, stop_listening, listen_sock); 764 765 connect_readcb = NULL; 766 stream_connect(connect_success_cb, NULL, T_CONNECT); 767 } 768 769 int 770 stream_noop_teardown(void **state ISC_ATTR_UNUSED) { 771 atomic_assert_int_eq(cconnects, 1); 772 atomic_assert_int_eq(csends, 0); 773 atomic_assert_int_eq(creads, 0); 774 atomic_assert_int_eq(sreads, 0); 775 atomic_assert_int_eq(ssends, 0); 776 return teardown_netmgr_test(state); 777 } 778 779 int 780 proxystream_noop_teardown(void **state) { 781 int r = stream_noop_teardown(state); 782 stream_use_PROXY = false; 783 784 return r; 785 } 786 787 int 788 proxystreamtls_noop_teardown(void **state) { 789 int r = proxystream_noop_teardown(state); 790 stream_PROXY_over_TLS = false; 791 792 return r; 793 } 794 795 static void 796 noresponse_readcb(isc_nmhandle_t *handle, isc_result_t eresult, 797 isc_region_t *region, void *cbarg) { 798 UNUSED(handle); 799 UNUSED(region); 800 UNUSED(cbarg); 801 802 F(); 803 804 assert_true(eresult == ISC_R_CANCELED || 805 eresult == ISC_R_CONNECTIONRESET || eresult == ISC_R_EOF); 806 807 isc_refcount_decrement(&active_creads); 808 isc_nmhandle_detach(&handle); 809 810 isc_loopmgr_shutdown(loopmgr); 811 } 812 813 static void 814 noresponse_sendcb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) { 815 UNUSED(cbarg); 816 UNUSED(eresult); 817 818 F(); 819 820 assert_non_null(handle); 821 atomic_fetch_add(&csends, 1); 822 isc_nmhandle_detach(&handle); 823 isc_refcount_decrement(&active_csends); 824 } 825 826 static void 827 noresponse_connectcb(isc_nmhandle_t *handle, isc_result_t eresult, 828 void *cbarg) { 829 isc_nmhandle_t *readhandle = NULL; 830 isc_nmhandle_t *sendhandle = NULL; 831 832 F(); 833 834 isc_refcount_decrement(&active_cconnects); 835 836 assert_int_equal(eresult, ISC_R_SUCCESS); 837 838 atomic_fetch_add(&cconnects, 1); 839 840 isc_refcount_increment0(&active_creads); 841 isc_nmhandle_attach(handle, &readhandle); 842 isc_nm_read(handle, noresponse_readcb, NULL); 843 844 isc_refcount_increment0(&active_csends); 845 isc_nmhandle_attach(handle, &sendhandle); 846 isc_nmhandle_setwritetimeout(handle, T_IDLE); 847 848 isc_nm_send(handle, (isc_region_t *)&send_msg, noresponse_sendcb, 849 cbarg); 850 } 851 852 int 853 stream_noresponse_setup(void **state ISC_ATTR_UNUSED) { 854 int r = setup_netmgr_test(state); 855 expected_cconnects = 1; 856 expected_saccepts = 1; 857 return r; 858 } 859 860 int 861 proxystream_noresponse_setup(void **state) { 862 stream_use_PROXY = true; 863 return stream_noresponse_setup(state); 864 } 865 866 int 867 proxystream_noresponse_teardown(void **state) { 868 int r = stream_noresponse_teardown(state); 869 stream_use_PROXY = false; 870 return r; 871 } 872 873 int 874 proxystreamtls_noresponse_setup(void **state) { 875 stream_PROXY_over_TLS = true; 876 return proxystream_noresponse_setup(state); 877 } 878 879 int 880 proxystreamtls_noresponse_teardown(void **state) { 881 int r = proxystream_noresponse_teardown(state); 882 stream_PROXY_over_TLS = false; 883 return r; 884 } 885 886 void 887 stream_noresponse(void **state ISC_ATTR_UNUSED) { 888 isc_result_t result = ISC_R_SUCCESS; 889 890 result = stream_listen(noop_accept_cb, NULL, 128, NULL, &listen_sock); 891 assert_int_equal(result, ISC_R_SUCCESS); 892 isc_loop_teardown(mainloop, stop_listening, listen_sock); 893 894 stream_connect(noresponse_connectcb, NULL, T_CONNECT); 895 } 896 897 int 898 stream_noresponse_teardown(void **state ISC_ATTR_UNUSED) { 899 X(cconnects); 900 X(csends); 901 X(creads); 902 X(sreads); 903 X(ssends); 904 905 atomic_assert_int_eq(cconnects, 1); 906 atomic_assert_int_eq(creads, 0); 907 atomic_assert_int_eq(sreads, 0); 908 atomic_assert_int_eq(ssends, 0); 909 910 return teardown_netmgr_test(state); 911 } 912 913 int 914 stream_timeout_recovery_setup(void **state ISC_ATTR_UNUSED) { 915 int r = setup_netmgr_test(state); 916 917 expected_ctimeouts = 4; 918 ctimeouts_shutdown = false; 919 920 expected_sreads = 5; 921 sreads_shutdown = true; 922 923 return r; 924 } 925 926 typedef struct proxy_addrs { 927 isc_sockaddr_t src_addr; 928 isc_sockaddr_t dst_addr; 929 } proxy_addrs_t; 930 931 static void 932 proxy2_handler_save_addrs_cb(const isc_result_t result, 933 const isc_proxy2_command_t cmd, const int socktype, 934 const isc_sockaddr_t *restrict src_addr, 935 const isc_sockaddr_t *restrict dst_addr, 936 const isc_region_t *restrict tlv_data, 937 const isc_region_t *restrict extra, void *cbarg) { 938 proxy_addrs_t *addrs = (proxy_addrs_t *)cbarg; 939 940 UNUSED(cmd); 941 UNUSED(socktype); 942 UNUSED(tlv_data); 943 UNUSED(extra); 944 945 REQUIRE(result == ISC_R_SUCCESS); 946 947 if (src_addr != NULL) { 948 addrs->src_addr = *src_addr; 949 } 950 951 if (dst_addr != NULL) { 952 addrs->dst_addr = *dst_addr; 953 } 954 } 955 956 void 957 proxy_verify_endpoints(isc_nmhandle_t *handle) { 958 isc_sockaddr_t local, peer; 959 peer = isc_nmhandle_peeraddr(handle); 960 local = isc_nmhandle_localaddr(handle); 961 962 if (isc_nm_is_proxy_unspec(handle)) { 963 isc_sockaddr_t real_local, real_peer; 964 real_peer = isc_nmhandle_real_peeraddr(handle); 965 real_local = isc_nmhandle_real_localaddr(handle); 966 967 assert_true(isc_sockaddr_equal(&peer, &real_peer)); 968 assert_true(isc_sockaddr_equal(&local, &real_local)); 969 } else if (proxy_info == NULL) { 970 assert_true(isc_sockaddr_equal(&peer, &proxy_src)); 971 assert_true(isc_sockaddr_equal(&local, &proxy_dst)); 972 } else if (proxy_info != NULL && !proxy_info->complete) { 973 assert_true(isc_sockaddr_equal( 974 &peer, &proxy_info->proxy_info.src_addr)); 975 assert_true(isc_sockaddr_equal( 976 &local, &proxy_info->proxy_info.dst_addr)); 977 } else if (proxy_info != NULL && proxy_info->complete) { 978 proxy_addrs_t addrs = { 0 }; 979 RUNTIME_CHECK(isc_proxy2_header_handle_directly( 980 &proxy_info->complete_header, 981 proxy2_handler_save_addrs_cb, 982 &addrs) == ISC_R_SUCCESS); 983 984 assert_true(isc_sockaddr_equal(&peer, &addrs.src_addr)); 985 assert_true(isc_sockaddr_equal(&local, &addrs.dst_addr)); 986 } 987 } 988 989 int 990 proxystream_timeout_recovery_setup(void **state) { 991 stream_use_PROXY = true; 992 return stream_timeout_recovery_setup(state); 993 } 994 995 int 996 proxystream_timeout_recovery_teardown(void **state) { 997 int r = stream_timeout_recovery_teardown(state); 998 stream_use_PROXY = false; 999 return r; 1000 } 1001 1002 int 1003 proxystreamtls_timeout_recovery_setup(void **state) { 1004 stream_PROXY_over_TLS = true; 1005 return proxystream_timeout_recovery_setup(state); 1006 } 1007 1008 int 1009 proxystreamtls_timeout_recovery_teardown(void **state) { 1010 int r = proxystream_timeout_recovery_teardown(state); 1011 stream_PROXY_over_TLS = false; 1012 return r; 1013 } 1014 1015 void 1016 stream_timeout_recovery(void **state ISC_ATTR_UNUSED) { 1017 isc_result_t result = ISC_R_SUCCESS; 1018 1019 /* 1020 * Accept connections but don't send responses, forcing client 1021 * reads to time out. 1022 */ 1023 noanswer = true; 1024 result = stream_listen(stream_accept_cb, NULL, 128, NULL, &listen_sock); 1025 assert_int_equal(result, ISC_R_SUCCESS); 1026 isc_loop_teardown(mainloop, stop_listening, listen_sock); 1027 1028 /* 1029 * Shorten all the client timeouts to 0.05 seconds. 1030 */ 1031 isc_nm_settimeouts(connect_nm, T_SOFT, T_SOFT, T_SOFT, T_SOFT); 1032 connect_readcb = timeout_retry_cb; 1033 stream_connect(connect_connect_cb, NULL, T_CONNECT); 1034 } 1035 1036 int 1037 stream_timeout_recovery_teardown(void **state ISC_ATTR_UNUSED) { 1038 atomic_assert_int_eq(ctimeouts, expected_ctimeouts); 1039 return teardown_netmgr_test(state); 1040 } 1041 1042 int 1043 stream_recv_one_setup(void **state ISC_ATTR_UNUSED) { 1044 int r = setup_netmgr_test(state); 1045 1046 expected_cconnects = 1; 1047 cconnects_shutdown = false; 1048 1049 expected_csends = 1; 1050 csends_shutdown = false; 1051 1052 expected_saccepts = 1; 1053 saccepts_shutdown = false; 1054 1055 expected_sreads = 1; 1056 sreads_shutdown = false; 1057 1058 expected_ssends = 1; 1059 ssends_shutdown = false; 1060 1061 expected_creads = 1; 1062 creads_shutdown = true; 1063 1064 return r; 1065 } 1066 1067 int 1068 proxystream_recv_one_setup(void **state) { 1069 stream_use_PROXY = true; 1070 return stream_recv_one_setup(state); 1071 } 1072 1073 int 1074 proxystream_recv_one_teardown(void **state) { 1075 int r = stream_recv_one_teardown(state); 1076 stream_use_PROXY = false; 1077 return r; 1078 } 1079 1080 int 1081 proxystreamtls_recv_one_setup(void **state) { 1082 stream_PROXY_over_TLS = true; 1083 return proxystream_recv_one_setup(state); 1084 } 1085 1086 int 1087 proxystreamtls_recv_one_teardown(void **state) { 1088 int r = proxystream_recv_one_teardown(state); 1089 stream_PROXY_over_TLS = false; 1090 return r; 1091 } 1092 1093 void 1094 stream_recv_one(void **state ISC_ATTR_UNUSED) { 1095 isc_result_t result = ISC_R_SUCCESS; 1096 isc_quota_t *quotap = tcp_listener_init_quota(1); 1097 1098 atomic_store(&nsends, 1); 1099 1100 result = stream_listen(stream_accept_cb, NULL, 128, quotap, 1101 &listen_sock); 1102 assert_int_equal(result, ISC_R_SUCCESS); 1103 isc_loop_teardown(mainloop, stop_listening, listen_sock); 1104 1105 stream_connect(connect_connect_cb, NULL, T_CONNECT); 1106 } 1107 1108 int 1109 stream_recv_one_teardown(void **state ISC_ATTR_UNUSED) { 1110 atomic_assert_int_eq(cconnects, expected_cconnects); 1111 atomic_assert_int_eq(csends, expected_csends); 1112 atomic_assert_int_eq(saccepts, expected_saccepts); 1113 atomic_assert_int_eq(sreads, expected_sreads); 1114 atomic_assert_int_eq(ssends, expected_ssends); 1115 atomic_assert_int_eq(creads, expected_creads); 1116 1117 return teardown_netmgr_test(state); 1118 } 1119 1120 int 1121 stream_recv_two_setup(void **state ISC_ATTR_UNUSED) { 1122 int r = setup_netmgr_test(state); 1123 1124 expected_cconnects = 2; 1125 cconnects_shutdown = false; 1126 1127 expected_csends = 2; 1128 csends_shutdown = false; 1129 1130 expected_saccepts = 2; 1131 saccepts_shutdown = false; 1132 1133 expected_sreads = 2; 1134 sreads_shutdown = false; 1135 1136 expected_ssends = 2; 1137 ssends_shutdown = false; 1138 1139 expected_creads = 2; 1140 creads_shutdown = true; 1141 1142 return r; 1143 } 1144 1145 int 1146 proxystream_recv_two_setup(void **state) { 1147 stream_use_PROXY = true; 1148 return stream_recv_two_setup(state); 1149 } 1150 1151 int 1152 proxystream_recv_two_teardown(void **state) { 1153 int r = stream_recv_two_teardown(state); 1154 stream_use_PROXY = false; 1155 return r; 1156 } 1157 1158 int 1159 proxystreamtls_recv_two_setup(void **state) { 1160 stream_PROXY_over_TLS = true; 1161 return proxystream_recv_two_setup(state); 1162 } 1163 1164 int 1165 proxystreamtls_recv_two_teardown(void **state) { 1166 int r = proxystream_recv_two_teardown(state); 1167 stream_PROXY_over_TLS = false; 1168 return r; 1169 } 1170 1171 void 1172 stream_recv_two(void **state ISC_ATTR_UNUSED) { 1173 isc_result_t result = ISC_R_SUCCESS; 1174 isc_quota_t *quotap = tcp_listener_init_quota(1); 1175 1176 atomic_store(&nsends, 2); 1177 1178 result = stream_listen(stream_accept_cb, NULL, 128, quotap, 1179 &listen_sock); 1180 assert_int_equal(result, ISC_R_SUCCESS); 1181 isc_loop_teardown(mainloop, stop_listening, listen_sock); 1182 1183 stream_connect(connect_connect_cb, NULL, T_CONNECT); 1184 1185 stream_connect(connect_connect_cb, NULL, T_CONNECT); 1186 } 1187 1188 int 1189 stream_recv_two_teardown(void **state ISC_ATTR_UNUSED) { 1190 atomic_assert_int_eq(cconnects, expected_cconnects); 1191 atomic_assert_int_eq(csends, expected_csends); 1192 atomic_assert_int_eq(sreads, expected_saccepts); 1193 atomic_assert_int_eq(sreads, expected_sreads); 1194 atomic_assert_int_eq(ssends, expected_ssends); 1195 atomic_assert_int_eq(creads, expected_creads); 1196 1197 return teardown_netmgr_test(state); 1198 } 1199 1200 int 1201 stream_recv_send_setup(void **state ISC_ATTR_UNUSED) { 1202 int r = setup_netmgr_test(state); 1203 expected_cconnects = workers; 1204 cconnects_shutdown = false; 1205 nsends = expected_creads = workers; 1206 do_send = true; 1207 1208 return r; 1209 } 1210 1211 int 1212 proxystream_recv_send_setup(void **state) { 1213 stream_use_PROXY = true; 1214 return stream_recv_send_setup(state); 1215 } 1216 1217 int 1218 proxystream_recv_send_teardown(void **state) { 1219 int r = stream_recv_send_teardown(state); 1220 stream_use_PROXY = false; 1221 return r; 1222 } 1223 1224 int 1225 proxystreamtls_recv_send_setup(void **state) { 1226 stream_PROXY_over_TLS = true; 1227 return proxystream_recv_send_setup(state); 1228 } 1229 1230 int 1231 proxystreamtls_recv_send_teardown(void **state) { 1232 int r = proxystream_recv_send_teardown(state); 1233 stream_PROXY_over_TLS = false; 1234 return r; 1235 } 1236 1237 void 1238 stream_recv_send(void **state ISC_ATTR_UNUSED) { 1239 isc_result_t result = ISC_R_SUCCESS; 1240 isc_quota_t *quotap = tcp_listener_init_quota(workers); 1241 1242 result = stream_listen(stream_accept_cb, NULL, 128, quotap, 1243 &listen_sock); 1244 assert_int_equal(result, ISC_R_SUCCESS); 1245 isc_loop_teardown(mainloop, stop_listening, listen_sock); 1246 1247 for (size_t i = 0; i < workers; i++) { 1248 isc_async_run(isc_loop_get(loopmgr, i), 1249 stream_recv_send_connect, 1250 get_stream_connect_function()); 1251 } 1252 } 1253 1254 int 1255 stream_recv_send_teardown(void **state ISC_ATTR_UNUSED) { 1256 X(cconnects); 1257 X(csends); 1258 X(creads); 1259 X(sreads); 1260 X(ssends); 1261 1262 CHECK_RANGE_FULL(csends); 1263 CHECK_RANGE_FULL(creads); 1264 CHECK_RANGE_FULL(sreads); 1265 CHECK_RANGE_FULL(ssends); 1266 1267 return teardown_netmgr_test(state); 1268 } 1269 1270 int 1271 setup_udp_test(void **state) { 1272 setup_loopmgr(state); 1273 setup_netmgr(state); 1274 1275 udp_connect_addr = (isc_sockaddr_t){ .length = 0 }; 1276 isc_sockaddr_fromin6(&udp_connect_addr, &in6addr_loopback, 0); 1277 1278 udp_listen_addr = (isc_sockaddr_t){ .length = 0 }; 1279 isc_sockaddr_fromin6(&udp_listen_addr, &in6addr_loopback, 1280 udp_use_PROXY ? PROXYUDP_TEST_PORT 1281 : UDP_TEST_PORT); 1282 1283 atomic_store(&sreads, 0); 1284 atomic_store(&ssends, 0); 1285 1286 atomic_store(&cconnects, 0); 1287 atomic_store(&csends, 0); 1288 atomic_store(&creads, 0); 1289 atomic_store(&ctimeouts, 0); 1290 1291 isc_refcount_init(&active_cconnects, 0); 1292 isc_refcount_init(&active_csends, 0); 1293 isc_refcount_init(&active_creads, 0); 1294 isc_refcount_init(&active_ssends, 0); 1295 isc_refcount_init(&active_sreads, 0); 1296 1297 expected_cconnects = -1; 1298 expected_csends = -1; 1299 expected_creads = -1; 1300 expected_sreads = -1; 1301 expected_ssends = -1; 1302 expected_ctimeouts = -1; 1303 1304 ssends_shutdown = true; 1305 sreads_shutdown = true; 1306 csends_shutdown = true; 1307 cconnects_shutdown = true; 1308 creads_shutdown = true; 1309 1310 isc_nonce_buf(&send_magic, sizeof(send_magic)); 1311 1312 connect_readcb = connect_read_cb; 1313 1314 return 0; 1315 } 1316 1317 int 1318 teardown_udp_test(void **state) { 1319 UNUSED(state); 1320 1321 isc_refcount_destroy(&active_cconnects); 1322 isc_refcount_destroy(&active_csends); 1323 isc_refcount_destroy(&active_creads); 1324 isc_refcount_destroy(&active_ssends); 1325 isc_refcount_destroy(&active_sreads); 1326 1327 teardown_netmgr(state); 1328 teardown_loopmgr(state); 1329 1330 return 0; 1331 } 1332 1333 static void 1334 udp_connect(isc_nm_cb_t cb, void *cbarg, unsigned int timeout) { 1335 if (udp_use_PROXY) { 1336 isc_nm_proxyudpconnect(netmgr, &udp_connect_addr, 1337 &udp_listen_addr, cb, cbarg, timeout, 1338 NULL); 1339 } else { 1340 isc_nm_udpconnect(netmgr, &udp_connect_addr, &udp_listen_addr, 1341 cb, cbarg, timeout); 1342 } 1343 } 1344 1345 static void 1346 udp_listen_read_cb(isc_nmhandle_t *handle, isc_result_t eresult, 1347 isc_region_t *region, void *cbarg) { 1348 if (eresult != ISC_R_SUCCESS) { 1349 isc_refcount_increment0(&active_sreads); 1350 } 1351 listen_read_cb(handle, eresult, region, cbarg); 1352 } 1353 1354 static void 1355 udp_start_listening(uint32_t nworkers, isc_nm_recv_cb_t cb) { 1356 isc_result_t result; 1357 1358 if (udp_use_PROXY) { 1359 result = isc_nm_listenproxyudp(netmgr, nworkers, 1360 &udp_listen_addr, cb, NULL, 1361 &listen_sock); 1362 } else { 1363 result = isc_nm_listenudp(netmgr, nworkers, &udp_listen_addr, 1364 cb, NULL, &listen_sock); 1365 } 1366 1367 assert_int_equal(result, ISC_R_SUCCESS); 1368 1369 isc_loop_teardown(mainloop, stop_listening, listen_sock); 1370 } 1371 1372 static void 1373 udp__send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) { 1374 isc_nmhandle_t *sendhandle = handle; 1375 1376 assert_non_null(sendhandle); 1377 1378 F(); 1379 1380 switch (eresult) { 1381 case ISC_R_SUCCESS: 1382 if (have_expected_csends(atomic_fetch_add(&csends, 1) + 1)) { 1383 if (csends_shutdown) { 1384 isc_nm_cancelread(handle); 1385 isc_loopmgr_shutdown(loopmgr); 1386 } 1387 } 1388 break; 1389 case ISC_R_SHUTTINGDOWN: 1390 case ISC_R_CANCELED: 1391 break; 1392 default: 1393 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle, 1394 isc_result_totext(eresult), cbarg); 1395 assert_int_equal(eresult, ISC_R_SUCCESS); 1396 } 1397 1398 isc_nmhandle_detach(&sendhandle); 1399 isc_refcount_decrement(&active_csends); 1400 } 1401 1402 static void 1403 udp__connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg); 1404 1405 static void 1406 udp_enqueue_connect(void *arg ISC_ATTR_UNUSED) { 1407 isc_sockaddr_t connect_addr; 1408 1409 connect_addr = (isc_sockaddr_t){ .length = 0 }; 1410 isc_sockaddr_fromin6(&connect_addr, &in6addr_loopback, 0); 1411 1412 isc_refcount_increment0(&active_cconnects); 1413 1414 udp_connect(udp__connect_cb, NULL, T_CONNECT); 1415 } 1416 1417 static void 1418 udp__connect_read_cb(isc_nmhandle_t *handle, isc_result_t eresult, 1419 isc_region_t *region, void *cbarg) { 1420 uint64_t magic = 0; 1421 1422 assert_non_null(handle); 1423 1424 F(); 1425 1426 switch (eresult) { 1427 case ISC_R_TIMEDOUT: 1428 /* 1429 * We are operating on the localhost, UDP cannot get lost, but 1430 * it could be delayed, so we read again until we get the 1431 * answer. 1432 */ 1433 isc_nm_read(handle, connect_readcb, cbarg); 1434 return; 1435 case ISC_R_SUCCESS: 1436 assert_true(region->length >= sizeof(magic)); 1437 1438 memmove(&magic, region->base, sizeof(magic)); 1439 1440 assert_true(magic == send_magic); 1441 1442 if (have_expected_creads(atomic_fetch_add(&creads, 1) + 1)) { 1443 do_creads_shutdown(loopmgr); 1444 } 1445 1446 if (magic == send_magic && allow_send_back) { 1447 connect_send(handle); 1448 return; 1449 } 1450 1451 break; 1452 default: 1453 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle, 1454 isc_result_totext(eresult), cbarg); 1455 assert_int_equal(eresult, ISC_R_SUCCESS); 1456 } 1457 1458 isc_refcount_decrement(&active_creads); 1459 1460 isc_nmhandle_detach(&handle); 1461 } 1462 1463 static void 1464 udp__connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) { 1465 isc_nmhandle_t *readhandle = NULL; 1466 isc_nmhandle_t *sendhandle = NULL; 1467 1468 F(); 1469 1470 isc_refcount_decrement(&active_cconnects); 1471 1472 switch (eresult) { 1473 case ISC_R_SUCCESS: 1474 if (udp_use_PROXY) { 1475 assert_true(isc_nm_is_proxy_handle(handle)); 1476 } 1477 1478 if (have_expected_cconnects(atomic_fetch_add(&cconnects, 1) + 1479 1)) 1480 { 1481 do_cconnects_shutdown(loopmgr); 1482 } else if (do_send) { 1483 isc_async_current(udp_enqueue_connect, cbarg); 1484 } 1485 1486 isc_refcount_increment0(&active_creads); 1487 isc_nmhandle_attach(handle, &readhandle); 1488 isc_nm_read(handle, connect_readcb, cbarg); 1489 1490 isc_refcount_increment0(&active_csends); 1491 isc_nmhandle_attach(handle, &sendhandle); 1492 isc_nmhandle_setwritetimeout(handle, T_IDLE); 1493 1494 isc_nm_send(sendhandle, (isc_region_t *)&send_msg, udp__send_cb, 1495 cbarg); 1496 1497 break; 1498 case ISC_R_ADDRINUSE: 1499 /* Try again */ 1500 udp_enqueue_connect(NULL); 1501 break; 1502 case ISC_R_SHUTTINGDOWN: 1503 case ISC_R_CANCELED: 1504 break; 1505 default: 1506 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle, 1507 isc_result_totext(eresult), cbarg); 1508 assert_int_equal(eresult, ISC_R_SUCCESS); 1509 } 1510 } 1511 1512 int 1513 udp_noop_setup(void **state) { 1514 setup_udp_test(state); 1515 expected_cconnects = 1; 1516 cconnects_shutdown = true; 1517 return 0; 1518 } 1519 1520 int 1521 udp_noop_teardown(void **state) { 1522 atomic_assert_int_eq(cconnects, 1); 1523 teardown_udp_test(state); 1524 return 0; 1525 } 1526 1527 void 1528 udp_noop(void **arg ISC_ATTR_UNUSED) { 1529 /* isc_result_t result = ISC_R_SUCCESS; */ 1530 1531 /* result = isc_nm_listenudp(netmgr, ISC_NM_LISTEN_ALL, 1532 * &udp_listen_addr, */ 1533 /* mock_recv_cb, NULL, &listen_sock); */ 1534 /* assert_int_equal(result, ISC_R_SUCCESS); */ 1535 1536 /* isc_nm_stoplistening(listen_sock); */ 1537 /* isc_nmsocket_close(&listen_sock); */ 1538 /* assert_null(listen_sock); */ 1539 1540 isc_refcount_increment0(&active_cconnects); 1541 udp_connect(connect_success_cb, NULL, UDP_T_CONNECT); 1542 } 1543 1544 int 1545 proxyudp_noop_setup(void **state) { 1546 udp_use_PROXY = true; 1547 return udp_noop_setup(state); 1548 } 1549 1550 int 1551 proxyudp_noop_teardown(void **state) { 1552 int ret = udp_noop_teardown(state); 1553 udp_use_PROXY = false; 1554 return ret; 1555 } 1556 1557 static void 1558 udp_noresponse_recv_cb(isc_nmhandle_t *handle, isc_result_t eresult, 1559 isc_region_t *region, void *cbarg) { 1560 UNUSED(handle); 1561 UNUSED(eresult); 1562 UNUSED(region); 1563 UNUSED(cbarg); 1564 } 1565 1566 static void 1567 udp_noresponse_read_cb(isc_nmhandle_t *handle, isc_result_t eresult, 1568 isc_region_t *region, void *cbarg) { 1569 UNUSED(region); 1570 UNUSED(cbarg); 1571 1572 assert_int_equal(eresult, ISC_R_TIMEDOUT); 1573 1574 isc_refcount_decrement(&active_creads); 1575 1576 atomic_fetch_add(&creads, 1); 1577 1578 isc_nmhandle_detach(&handle); 1579 1580 isc_loopmgr_shutdown(loopmgr); 1581 } 1582 1583 static void 1584 udp_noresponse_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, 1585 void *cbarg) { 1586 UNUSED(cbarg); 1587 1588 assert_non_null(handle); 1589 assert_int_equal(eresult, ISC_R_SUCCESS); 1590 atomic_fetch_add(&csends, 1); 1591 isc_nmhandle_detach(&handle); 1592 isc_refcount_decrement(&active_csends); 1593 } 1594 1595 static void 1596 udp_noresponse_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, 1597 void *cbarg) { 1598 isc_nmhandle_t *readhandle = NULL; 1599 isc_nmhandle_t *sendhandle = NULL; 1600 1601 isc_refcount_decrement(&active_cconnects); 1602 1603 assert_int_equal(eresult, ISC_R_SUCCESS); 1604 1605 /* Read */ 1606 isc_refcount_increment0(&active_creads); 1607 isc_nmhandle_attach(handle, &readhandle); 1608 isc_nm_read(handle, udp_noresponse_read_cb, cbarg); 1609 1610 /* Send */ 1611 isc_refcount_increment0(&active_csends); 1612 isc_nmhandle_attach(handle, &sendhandle); 1613 isc_nmhandle_setwritetimeout(handle, T_IDLE); 1614 1615 isc_nm_send(sendhandle, (isc_region_t *)&send_msg, 1616 udp_noresponse_send_cb, cbarg); 1617 1618 atomic_fetch_add(&cconnects, 1); 1619 } 1620 1621 int 1622 udp_noresponse_setup(void **state) { 1623 setup_udp_test(state); 1624 expected_csends = 1; 1625 return 0; 1626 } 1627 1628 int 1629 udp_noresponse_teardown(void **state) { 1630 atomic_assert_int_eq(csends, expected_csends); 1631 teardown_udp_test(state); 1632 return 0; 1633 } 1634 1635 void 1636 udp_noresponse(void **arg ISC_ATTR_UNUSED) { 1637 udp_start_listening(ISC_NM_LISTEN_ONE, udp_noresponse_recv_cb); 1638 1639 isc_refcount_increment0(&active_cconnects); 1640 udp_connect(udp_noresponse_connect_cb, listen_sock, UDP_T_SOFT); 1641 } 1642 1643 int 1644 proxyudp_noresponse_setup(void **state) { 1645 udp_use_PROXY = true; 1646 return udp_noresponse_setup(state); 1647 } 1648 1649 int 1650 proxyudp_noresponse_teardown(void **state) { 1651 int ret = udp_noresponse_teardown(state); 1652 udp_use_PROXY = false; 1653 return ret; 1654 } 1655 1656 static void 1657 udp_timeout_recovery_ssend_cb(isc_nmhandle_t *handle, isc_result_t eresult, 1658 void *cbarg) { 1659 UNUSED(cbarg); 1660 1661 isc_refcount_decrement(&active_ssends); 1662 assert_non_null(handle); 1663 assert_int_equal(eresult, ISC_R_SUCCESS); 1664 atomic_fetch_add(&ssends, 1); 1665 isc_nmhandle_detach(&handle); 1666 } 1667 1668 static void 1669 udp_timeout_recovery_recv_cb(isc_nmhandle_t *handle, isc_result_t eresult, 1670 isc_region_t *region, void *cbarg) { 1671 uint64_t magic = 0; 1672 isc_nmhandle_t *sendhandle = NULL; 1673 int _creads = atomic_fetch_add(&creads, 1) + 1; 1674 1675 assert_non_null(handle); 1676 1677 assert_int_equal(eresult, ISC_R_SUCCESS); 1678 1679 assert_true(region->length == sizeof(magic)); 1680 1681 memmove(&magic, region->base, sizeof(magic)); 1682 assert_true(magic == send_magic); 1683 assert_true(_creads < 6); 1684 1685 if (_creads == 5) { 1686 isc_nmhandle_attach(handle, &sendhandle); 1687 isc_refcount_increment0(&active_ssends); 1688 isc_nmhandle_setwritetimeout(sendhandle, T_IDLE); 1689 isc_nm_send(sendhandle, (isc_region_t *)&send_msg, 1690 udp_timeout_recovery_ssend_cb, cbarg); 1691 } 1692 } 1693 1694 static void 1695 udp_timeout_recovery_read_cb(isc_nmhandle_t *handle, isc_result_t eresult, 1696 isc_region_t *region, void *cbarg) { 1697 UNUSED(region); 1698 UNUSED(cbarg); 1699 1700 assert_non_null(handle); 1701 1702 F(); 1703 1704 if (eresult == ISC_R_TIMEDOUT && 1705 atomic_fetch_add(&ctimeouts, 1) + 1 < expected_ctimeouts) 1706 { 1707 isc_nmhandle_settimeout(handle, T_SOFT); 1708 return; 1709 } 1710 1711 isc_refcount_decrement(&active_creads); 1712 isc_nmhandle_detach(&handle); 1713 1714 atomic_fetch_add(&creads, 1); 1715 isc_loopmgr_shutdown(loopmgr); 1716 } 1717 1718 static void 1719 udp_timeout_recovery_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, 1720 void *cbarg) { 1721 UNUSED(cbarg); 1722 1723 assert_non_null(handle); 1724 assert_int_equal(eresult, ISC_R_SUCCESS); 1725 atomic_fetch_add(&csends, 1); 1726 1727 isc_nmhandle_detach(&handle); 1728 isc_refcount_decrement(&active_csends); 1729 } 1730 1731 static void 1732 udp_timeout_recovery_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, 1733 void *cbarg) { 1734 isc_nmhandle_t *readhandle = NULL; 1735 isc_nmhandle_t *sendhandle = NULL; 1736 1737 F(); 1738 1739 isc_refcount_decrement(&active_cconnects); 1740 1741 assert_int_equal(eresult, ISC_R_SUCCESS); 1742 1743 /* Read */ 1744 isc_refcount_increment0(&active_creads); 1745 isc_nmhandle_attach(handle, &readhandle); 1746 isc_nm_read(handle, udp_timeout_recovery_read_cb, cbarg); 1747 1748 /* Send */ 1749 isc_refcount_increment0(&active_csends); 1750 isc_nmhandle_attach(handle, &sendhandle); 1751 isc_nmhandle_setwritetimeout(handle, T_IDLE); 1752 isc_nm_send(sendhandle, (isc_region_t *)&send_msg, 1753 udp_timeout_recovery_send_cb, cbarg); 1754 1755 atomic_fetch_add(&cconnects, 1); 1756 } 1757 1758 int 1759 udp_timeout_recovery_setup(void **state) { 1760 setup_udp_test(state); 1761 expected_cconnects = 1; 1762 expected_csends = 1; 1763 expected_creads = 1; 1764 expected_ctimeouts = 4; 1765 return 0; 1766 } 1767 1768 int 1769 udp_timeout_recovery_teardown(void **state) { 1770 atomic_assert_int_eq(cconnects, expected_cconnects); 1771 atomic_assert_int_eq(csends, expected_csends); 1772 atomic_assert_int_eq(csends, expected_creads); 1773 atomic_assert_int_eq(ctimeouts, expected_ctimeouts); 1774 teardown_udp_test(state); 1775 return 0; 1776 } 1777 1778 void 1779 udp_timeout_recovery(void **arg ISC_ATTR_UNUSED) { 1780 /* 1781 * Listen using the noop callback so that client reads will time out. 1782 */ 1783 udp_start_listening(ISC_NM_LISTEN_ONE, udp_timeout_recovery_recv_cb); 1784 1785 /* 1786 * Connect with client timeout set to 0.05 seconds, then sleep for at 1787 * least a second for each 'tick'. timeout_retry_cb() will give up 1788 * after five timeouts. 1789 */ 1790 isc_refcount_increment0(&active_cconnects); 1791 udp_connect(udp_timeout_recovery_connect_cb, listen_sock, UDP_T_SOFT); 1792 } 1793 1794 int 1795 proxyudp_timeout_recovery_setup(void **state) { 1796 udp_use_PROXY = true; 1797 return udp_timeout_recovery_setup(state); 1798 } 1799 1800 int 1801 proxyudp_timeout_recovery_teardown(void **state) { 1802 int ret = udp_timeout_recovery_teardown(state); 1803 udp_use_PROXY = false; 1804 return ret; 1805 } 1806 1807 static void 1808 udp_shutdown_connect_async_cb(void *arg ISC_ATTR_UNUSED); 1809 1810 static void 1811 udp_shutdown_connect_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, 1812 void *cbarg) { 1813 UNUSED(handle); 1814 UNUSED(cbarg); 1815 1816 isc_refcount_decrement(&active_cconnects); 1817 1818 /* 1819 * The first UDP connect is faster than asynchronous shutdown procedure, 1820 * restart the UDP connect again and expect the failure only in the 1821 * second loop. 1822 */ 1823 if (atomic_fetch_add(&cconnects, 1) == 0) { 1824 assert_int_equal(eresult, ISC_R_SUCCESS); 1825 isc_async_current(udp_shutdown_connect_async_cb, netmgr); 1826 } else { 1827 assert_int_equal(eresult, ISC_R_SHUTTINGDOWN); 1828 } 1829 } 1830 1831 static void 1832 udp_shutdown_connect_async_cb(void *arg ISC_ATTR_UNUSED) { 1833 isc_refcount_increment0(&active_cconnects); 1834 udp_connect(udp_shutdown_connect_connect_cb, NULL, T_SOFT); 1835 } 1836 1837 int 1838 udp_shutdown_connect_setup(void **state) { 1839 setup_udp_test(state); 1840 expected_cconnects = 2; 1841 return 0; 1842 } 1843 1844 int 1845 udp_shutdown_connect_teardown(void **state) { 1846 atomic_assert_int_eq(cconnects, expected_cconnects); 1847 teardown_udp_test(state); 1848 return 0; 1849 } 1850 1851 void 1852 udp_shutdown_connect(void **arg ISC_ATTR_UNUSED) { 1853 isc_loopmgr_shutdown(loopmgr); 1854 /* 1855 * isc_nm_udpconnect() is synchronous, so we need to launch this on the 1856 * async loop. 1857 */ 1858 isc_async_current(udp_shutdown_connect_async_cb, netmgr); 1859 } 1860 1861 int 1862 proxyudp_shutdown_connect_setup(void **state) { 1863 udp_use_PROXY = true; 1864 return udp_shutdown_connect_setup(state); 1865 } 1866 1867 int 1868 proxyudp_shutdown_connect_teardown(void **state) { 1869 int ret = udp_shutdown_connect_teardown(state); 1870 udp_use_PROXY = false; 1871 return ret; 1872 } 1873 1874 static void 1875 udp_shutdown_read_recv_cb(isc_nmhandle_t *handle, isc_result_t eresult, 1876 isc_region_t *region, void *cbarg) { 1877 uint64_t magic = 0; 1878 1879 UNUSED(cbarg); 1880 1881 assert_non_null(handle); 1882 1883 F(); 1884 1885 assert_int_equal(eresult, ISC_R_SUCCESS); 1886 1887 assert_true(region->length == sizeof(magic)); 1888 1889 memmove(&magic, region->base, sizeof(magic)); 1890 assert_true(magic == send_magic); 1891 } 1892 1893 static void 1894 udp_shutdown_read_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, 1895 void *cbarg) { 1896 UNUSED(cbarg); 1897 1898 F(); 1899 1900 assert_non_null(handle); 1901 assert_int_equal(eresult, ISC_R_SUCCESS); 1902 1903 atomic_fetch_add(&csends, 1); 1904 1905 isc_loopmgr_shutdown(loopmgr); 1906 1907 isc_nmhandle_detach(&handle); 1908 isc_refcount_decrement(&active_csends); 1909 } 1910 1911 static void 1912 udp_shutdown_read_read_cb(isc_nmhandle_t *handle, isc_result_t eresult, 1913 isc_region_t *region, void *cbarg) { 1914 UNUSED(region); 1915 UNUSED(cbarg); 1916 1917 assert_true(eresult == ISC_R_SHUTTINGDOWN || eresult == ISC_R_TIMEDOUT); 1918 1919 isc_refcount_decrement(&active_creads); 1920 1921 atomic_fetch_add(&creads, 1); 1922 1923 isc_nmhandle_detach(&handle); 1924 } 1925 1926 static void 1927 udp_shutdown_read_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, 1928 void *cbarg) { 1929 isc_nmhandle_t *readhandle = NULL; 1930 isc_nmhandle_t *sendhandle = NULL; 1931 1932 isc_refcount_decrement(&active_cconnects); 1933 1934 assert_int_equal(eresult, ISC_R_SUCCESS); 1935 1936 /* Read */ 1937 isc_refcount_increment0(&active_creads); 1938 isc_nmhandle_attach(handle, &readhandle); 1939 isc_nm_read(handle, udp_shutdown_read_read_cb, cbarg); 1940 assert_true(handle->sock->reading); 1941 1942 /* Send */ 1943 isc_refcount_increment0(&active_csends); 1944 isc_nmhandle_attach(handle, &sendhandle); 1945 isc_nmhandle_setwritetimeout(handle, T_IDLE); 1946 isc_nm_send(sendhandle, (isc_region_t *)&send_msg, 1947 udp_shutdown_read_send_cb, cbarg); 1948 1949 atomic_fetch_add(&cconnects, 1); 1950 } 1951 1952 int 1953 udp_shutdown_read_setup(void **state) { 1954 setup_udp_test(state); 1955 expected_cconnects = 1; 1956 expected_creads = 1; 1957 return 0; 1958 } 1959 1960 int 1961 udp_shutdown_read_teardown(void **state) { 1962 atomic_assert_int_eq(cconnects, expected_cconnects); 1963 atomic_assert_int_eq(creads, expected_creads); 1964 teardown_udp_test(state); 1965 return 0; 1966 } 1967 1968 void 1969 udp_shutdown_read(void **arg ISC_ATTR_UNUSED) { 1970 udp_start_listening(ISC_NM_LISTEN_ONE, udp_shutdown_read_recv_cb); 1971 1972 isc_refcount_increment0(&active_cconnects); 1973 udp_connect(udp_shutdown_read_connect_cb, NULL, UDP_T_SOFT); 1974 } 1975 1976 int 1977 proxyudp_shutdown_read_setup(void **state) { 1978 udp_use_PROXY = true; 1979 return udp_shutdown_read_setup(state); 1980 } 1981 1982 int 1983 proxyudp_shutdown_read_teardown(void **state) { 1984 int ret = udp_shutdown_read_teardown(state); 1985 udp_use_PROXY = false; 1986 return ret; 1987 } 1988 1989 static void 1990 udp_cancel_read_recv_cb(isc_nmhandle_t *handle, isc_result_t eresult, 1991 isc_region_t *region, void *cbarg) { 1992 uint64_t magic = 0; 1993 1994 UNUSED(cbarg); 1995 1996 assert_non_null(handle); 1997 1998 F(); 1999 2000 assert_int_equal(eresult, ISC_R_SUCCESS); 2001 2002 assert_true(region->length == sizeof(magic)); 2003 2004 memmove(&magic, region->base, sizeof(magic)); 2005 assert_true(magic == send_magic); 2006 } 2007 2008 static void 2009 udp_cancel_read_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, 2010 void *cbarg) { 2011 UNUSED(cbarg); 2012 2013 F(); 2014 2015 assert_non_null(handle); 2016 assert_int_equal(eresult, ISC_R_SUCCESS); 2017 2018 atomic_fetch_add(&csends, 1); 2019 2020 isc_nm_cancelread(handle); 2021 2022 isc_nmhandle_detach(&handle); 2023 isc_refcount_decrement(&active_csends); 2024 } 2025 2026 static void 2027 udp_cancel_read_read_cb(isc_nmhandle_t *handle, isc_result_t eresult, 2028 isc_region_t *region, void *cbarg) { 2029 isc_nmhandle_t *sendhandle = NULL; 2030 isc_nmhandle_t *readhandle = NULL; 2031 2032 UNUSED(region); 2033 2034 F(); 2035 2036 switch (eresult) { 2037 case ISC_R_TIMEDOUT: 2038 2039 /* Read again */ 2040 isc_refcount_increment0(&active_creads); 2041 isc_nmhandle_attach(handle, &readhandle); 2042 isc_nm_read(handle, udp_cancel_read_read_cb, cbarg); 2043 2044 /* Send only once */ 2045 if (isc_refcount_increment0(&active_csends) == 0) { 2046 isc_nmhandle_attach(handle, &sendhandle); 2047 isc_nmhandle_setwritetimeout(handle, T_IDLE); 2048 isc_nm_send(sendhandle, (isc_region_t *)&send_msg, 2049 udp_cancel_read_send_cb, cbarg); 2050 } 2051 break; 2052 case ISC_R_CANCELED: 2053 /* The read has been canceled */ 2054 atomic_fetch_add(&creads, 1); 2055 isc_loopmgr_shutdown(loopmgr); 2056 break; 2057 default: 2058 UNREACHABLE(); 2059 } 2060 2061 isc_refcount_decrement(&active_creads); 2062 2063 isc_nmhandle_detach(&handle); 2064 } 2065 2066 static void 2067 udp_cancel_read_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, 2068 void *cbarg) { 2069 isc_nmhandle_t *readhandle = NULL; 2070 2071 isc_refcount_decrement(&active_cconnects); 2072 2073 assert_int_equal(eresult, ISC_R_SUCCESS); 2074 2075 isc_refcount_increment0(&active_creads); 2076 isc_nmhandle_attach(handle, &readhandle); 2077 isc_nm_read(handle, udp_cancel_read_read_cb, cbarg); 2078 2079 atomic_fetch_add(&cconnects, 1); 2080 } 2081 2082 int 2083 udp_cancel_read_setup(void **state) { 2084 setup_udp_test(state); 2085 expected_cconnects = 1; 2086 expected_creads = 1; 2087 return 0; 2088 } 2089 2090 int 2091 udp_cancel_read_teardown(void **state) { 2092 atomic_assert_int_eq(cconnects, expected_cconnects); 2093 atomic_assert_int_eq(creads, expected_creads); 2094 teardown_udp_test(state); 2095 return 0; 2096 } 2097 2098 void 2099 udp_cancel_read(void **arg ISC_ATTR_UNUSED) { 2100 udp_start_listening(ISC_NM_LISTEN_ONE, udp_cancel_read_recv_cb); 2101 2102 isc_refcount_increment0(&active_cconnects); 2103 udp_connect(udp_cancel_read_connect_cb, NULL, UDP_T_SOFT); 2104 } 2105 2106 int 2107 proxyudp_cancel_read_setup(void **state) { 2108 udp_use_PROXY = true; 2109 return udp_cancel_read_setup(state); 2110 } 2111 2112 int 2113 proxyudp_cancel_read_teardown(void **state) { 2114 int ret = udp_cancel_read_teardown(state); 2115 udp_use_PROXY = false; 2116 return ret; 2117 } 2118 2119 int 2120 udp_recv_one_setup(void **state) { 2121 setup_udp_test(state); 2122 2123 connect_readcb = udp__connect_read_cb; 2124 2125 expected_cconnects = 1; 2126 cconnects_shutdown = false; 2127 2128 expected_csends = 1; 2129 csends_shutdown = false; 2130 2131 expected_sreads = 1; 2132 sreads_shutdown = false; 2133 2134 expected_ssends = 1; 2135 ssends_shutdown = false; 2136 2137 expected_creads = 1; 2138 creads_shutdown = true; 2139 2140 return 0; 2141 } 2142 2143 int 2144 udp_recv_one_teardown(void **state) { 2145 atomic_assert_int_eq(cconnects, expected_cconnects); 2146 atomic_assert_int_eq(csends, expected_csends); 2147 atomic_assert_int_eq(sreads, expected_sreads); 2148 atomic_assert_int_eq(ssends, expected_ssends); 2149 atomic_assert_int_eq(creads, expected_creads); 2150 2151 teardown_udp_test(state); 2152 2153 return 0; 2154 } 2155 2156 void 2157 udp_recv_one(void **arg ISC_ATTR_UNUSED) { 2158 udp_start_listening(ISC_NM_LISTEN_ONE, udp_listen_read_cb); 2159 2160 udp_enqueue_connect(NULL); 2161 } 2162 2163 int 2164 proxyudp_recv_one_setup(void **state) { 2165 udp_use_PROXY = true; 2166 return udp_recv_one_setup(state); 2167 } 2168 2169 int 2170 proxyudp_recv_one_teardown(void **state) { 2171 int ret = udp_recv_one_teardown(state); 2172 udp_use_PROXY = false; 2173 return ret; 2174 } 2175 2176 int 2177 udp_recv_two_setup(void **state) { 2178 setup_udp_test(state); 2179 2180 connect_readcb = udp__connect_read_cb; 2181 2182 expected_cconnects = 2; 2183 cconnects_shutdown = false; 2184 2185 expected_csends = 2; 2186 csends_shutdown = false; 2187 2188 expected_sreads = 2; 2189 sreads_shutdown = false; 2190 2191 expected_ssends = 2; 2192 ssends_shutdown = false; 2193 2194 expected_creads = 2; 2195 creads_shutdown = true; 2196 2197 return 0; 2198 } 2199 2200 int 2201 udp_recv_two_teardown(void **state) { 2202 atomic_assert_int_eq(cconnects, expected_cconnects); 2203 atomic_assert_int_eq(csends, expected_csends); 2204 atomic_assert_int_eq(sreads, expected_sreads); 2205 atomic_assert_int_eq(ssends, expected_ssends); 2206 atomic_assert_int_eq(creads, expected_creads); 2207 2208 teardown_udp_test(state); 2209 return 0; 2210 } 2211 2212 void 2213 udp_recv_two(void **arg ISC_ATTR_UNUSED) { 2214 udp_start_listening(ISC_NM_LISTEN_ONE, udp_listen_read_cb); 2215 2216 udp_enqueue_connect(NULL); 2217 udp_enqueue_connect(NULL); 2218 } 2219 2220 int 2221 proxyudp_recv_two_setup(void **state) { 2222 udp_use_PROXY = true; 2223 return udp_recv_two_setup(state); 2224 } 2225 2226 int 2227 proxyudp_recv_two_teardown(void **state) { 2228 int ret = udp_recv_two_teardown(state); 2229 udp_use_PROXY = false; 2230 return ret; 2231 } 2232 2233 int 2234 udp_recv_send_setup(void **state) { 2235 setup_udp_test(state); 2236 2237 /* Allow some leeway (+1) as datagram service is unreliable */ 2238 expected_cconnects = (workers + 1) * NSENDS; 2239 cconnects_shutdown = false; 2240 2241 expected_creads = workers * NSENDS; 2242 do_send = true; 2243 2244 return 0; 2245 } 2246 2247 int 2248 udp_recv_send_teardown(void **state) { 2249 atomic_assert_int_ge(cconnects, expected_creads); 2250 atomic_assert_int_ge(csends, expected_creads); 2251 atomic_assert_int_ge(sreads, expected_creads); 2252 atomic_assert_int_ge(ssends, expected_creads); 2253 atomic_assert_int_ge(creads, expected_creads); 2254 2255 teardown_udp_test(state); 2256 return 0; 2257 } 2258 2259 void 2260 udp_recv_send(void **arg ISC_ATTR_UNUSED) { 2261 udp_start_listening(ISC_NM_LISTEN_ALL, udp_listen_read_cb); 2262 2263 for (size_t i = 0; i < workers; i++) { 2264 isc_async_run(isc_loop_get(loopmgr, i), udp_enqueue_connect, 2265 NULL); 2266 } 2267 } 2268 2269 int 2270 proxyudp_recv_send_setup(void **state) { 2271 udp_use_PROXY = true; 2272 return udp_recv_send_setup(state); 2273 } 2274 2275 int 2276 proxyudp_recv_send_teardown(void **state) { 2277 int ret = udp_recv_send_teardown(state); 2278 udp_use_PROXY = false; 2279 return ret; 2280 } 2281 2282 static void 2283 udp_double_read_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, 2284 void *cbarg) { 2285 assert_non_null(handle); 2286 2287 F(); 2288 2289 isc_refcount_decrement(&active_ssends); 2290 2291 switch (eresult) { 2292 case ISC_R_SUCCESS: 2293 if (have_expected_ssends(atomic_fetch_add(&ssends, 1) + 1)) { 2294 do_ssends_shutdown(loopmgr); 2295 } else { 2296 isc_nmhandle_t *sendhandle = NULL; 2297 isc_nmhandle_attach(handle, &sendhandle); 2298 isc_nmhandle_setwritetimeout(sendhandle, T_IDLE); 2299 isc_refcount_increment0(&active_ssends); 2300 isc_nm_send(sendhandle, &send_msg, 2301 udp_double_read_send_cb, cbarg); 2302 break; 2303 } 2304 break; 2305 case ISC_R_CANCELED: 2306 break; 2307 default: 2308 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle, 2309 isc_result_totext(eresult), cbarg); 2310 assert_int_equal(eresult, ISC_R_SUCCESS); 2311 } 2312 2313 isc_nmhandle_detach(&handle); 2314 } 2315 2316 static void 2317 udp_double_read_listen_cb(isc_nmhandle_t *handle, isc_result_t eresult, 2318 isc_region_t *region, void *cbarg) { 2319 uint64_t magic = 0; 2320 2321 assert_non_null(handle); 2322 2323 F(); 2324 2325 switch (eresult) { 2326 case ISC_R_EOF: 2327 case ISC_R_SHUTTINGDOWN: 2328 case ISC_R_CANCELED: 2329 break; 2330 case ISC_R_SUCCESS: 2331 memmove(&magic, region->base, sizeof(magic)); 2332 assert_true(magic == send_magic); 2333 2334 assert_true(region->length >= sizeof(magic)); 2335 2336 memmove(&magic, region->base, sizeof(magic)); 2337 assert_true(magic == send_magic); 2338 2339 isc_nmhandle_t *sendhandle = NULL; 2340 isc_nmhandle_attach(handle, &sendhandle); 2341 isc_nmhandle_setwritetimeout(sendhandle, T_IDLE); 2342 isc_refcount_increment0(&active_ssends); 2343 isc_nm_send(sendhandle, &send_msg, udp_double_read_send_cb, 2344 cbarg); 2345 return; 2346 default: 2347 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle, 2348 isc_result_totext(eresult), cbarg); 2349 assert_int_equal(eresult, ISC_R_SUCCESS); 2350 } 2351 2352 isc_refcount_decrement(&active_sreads); 2353 2354 isc_nmhandle_detach(&handle); 2355 } 2356 2357 static void 2358 udp_double_read_cb(isc_nmhandle_t *handle, isc_result_t eresult, 2359 isc_region_t *region, void *cbarg) { 2360 uint64_t magic = 0; 2361 bool detach = false; 2362 2363 assert_non_null(handle); 2364 2365 F(); 2366 2367 switch (eresult) { 2368 case ISC_R_TIMEDOUT: 2369 /* 2370 * We are operating on the localhost, UDP cannot get lost, but 2371 * it could be delayed, so we read again until we get the 2372 * answer. 2373 */ 2374 detach = false; 2375 break; 2376 case ISC_R_SUCCESS: 2377 assert_true(region->length >= sizeof(magic)); 2378 2379 memmove(&magic, region->base, sizeof(magic)); 2380 2381 assert_true(magic == send_magic); 2382 2383 if (have_expected_creads(atomic_fetch_add(&creads, 1) + 1)) { 2384 do_creads_shutdown(loopmgr); 2385 detach = true; 2386 } 2387 2388 if (magic == send_magic && allow_send_back) { 2389 connect_send(handle); 2390 return; 2391 } 2392 2393 break; 2394 case ISC_R_EOF: 2395 case ISC_R_SHUTTINGDOWN: 2396 case ISC_R_CANCELED: 2397 case ISC_R_CONNECTIONRESET: 2398 detach = true; 2399 break; 2400 default: 2401 fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle, 2402 isc_result_totext(eresult), cbarg); 2403 assert_int_equal(eresult, ISC_R_SUCCESS); 2404 } 2405 2406 if (detach) { 2407 isc_refcount_decrement(&active_creads); 2408 isc_nmhandle_detach(&handle); 2409 } else { 2410 isc_nm_read(handle, connect_readcb, cbarg); 2411 } 2412 } 2413 2414 int 2415 udp_double_read_setup(void **state) { 2416 setup_udp_test(state); 2417 2418 expected_cconnects = 1; 2419 cconnects_shutdown = false; 2420 2421 expected_csends = 1; 2422 csends_shutdown = false; 2423 2424 expected_sreads = 1; 2425 sreads_shutdown = false; 2426 2427 expected_ssends = 2; 2428 ssends_shutdown = false; 2429 expected_creads = 2; 2430 creads_shutdown = true; 2431 2432 connect_readcb = udp_double_read_cb; 2433 2434 return 0; 2435 } 2436 2437 int 2438 udp_double_read_teardown(void **state) { 2439 atomic_assert_int_eq(creads, expected_creads); 2440 2441 teardown_udp_test(state); 2442 2443 return 0; 2444 } 2445 2446 void 2447 udp_double_read(void **arg ISC_ATTR_UNUSED) { 2448 udp_start_listening(ISC_NM_LISTEN_ALL, udp_double_read_listen_cb); 2449 2450 udp_enqueue_connect(NULL); 2451 } 2452 2453 int 2454 proxyudp_double_read_setup(void **state) { 2455 udp_use_PROXY = true; 2456 return udp_double_read_setup(state); 2457 } 2458 2459 int 2460 proxyudp_double_read_teardown(void **state) { 2461 int ret = udp_double_read_teardown(state); 2462 udp_use_PROXY = false; 2463 return ret; 2464 } 2465