1 /* $NetBSD: upstream.c,v 1.3 2025/09/05 21:16:24 christos Exp $ */ 2 3 /* $OpenLDAP$ */ 4 /* This work is part of OpenLDAP Software <http://www.openldap.org/>. 5 * 6 * Copyright 1998-2024 The OpenLDAP Foundation. 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted only as authorized by the OpenLDAP 11 * Public License. 12 * 13 * A copy of this license is available in the file LICENSE in the 14 * top-level directory of the distribution or, alternatively, at 15 * <http://www.OpenLDAP.org/license.html>. 16 */ 17 18 #include <sys/cdefs.h> 19 __RCSID("$NetBSD: upstream.c,v 1.3 2025/09/05 21:16:24 christos Exp $"); 20 21 #include "portable.h" 22 23 #include <ac/socket.h> 24 #include <ac/errno.h> 25 #include <ac/string.h> 26 #include <ac/time.h> 27 #include <ac/unistd.h> 28 29 #include "lload.h" 30 31 #include "lutil.h" 32 #include "lutil_ldap.h" 33 34 #ifdef HAVE_CYRUS_SASL 35 static const sasl_callback_t client_callbacks[] = { 36 #ifdef SASL_CB_GETREALM 37 { SASL_CB_GETREALM, NULL, NULL }, 38 #endif 39 { SASL_CB_USER, NULL, NULL }, 40 { SASL_CB_AUTHNAME, NULL, NULL }, 41 { SASL_CB_PASS, NULL, NULL }, 42 { SASL_CB_LIST_END, NULL, NULL } 43 }; 44 #endif /* HAVE_CYRUS_SASL */ 45 46 static void upstream_unlink( LloadConnection *upstream ); 47 48 int 49 lload_upstream_entry_cmp( const void *l, const void *r ) 50 { 51 return SLAP_PTRCMP( l, r ); 52 } 53 54 static void 55 linked_upstream_lost( LloadConnection *client ) 56 { 57 int gentle = 1; 58 59 CONNECTION_LOCK(client); 60 assert( client->c_restricted >= LLOAD_OP_RESTRICTED_UPSTREAM ); 61 assert( client->c_linked_upstream ); 62 63 client->c_restricted = LLOAD_OP_NOT_RESTRICTED; 64 client->c_linked_upstream = NULL; 65 CONNECTION_UNLOCK(client); 66 lload_connection_close( client, &gentle ); 67 } 68 69 int 70 forward_response( LloadConnection *client, LloadOperation *op, BerElement *ber ) 71 { 72 BerElement *output; 73 BerValue response, controls = BER_BVNULL; 74 ber_int_t msgid; 75 ber_tag_t tag, response_tag; 76 ber_len_t len; 77 78 CONNECTION_LOCK(client); 79 if ( op->o_client_msgid ) { 80 msgid = op->o_client_msgid; 81 } else { 82 assert( op->o_pin_id ); 83 msgid = op->o_saved_msgid; 84 op->o_saved_msgid = 0; 85 } 86 CONNECTION_UNLOCK(client); 87 88 response_tag = ber_skip_element( ber, &response ); 89 90 tag = ber_peek_tag( ber, &len ); 91 if ( tag == LDAP_TAG_CONTROLS ) { 92 ber_skip_element( ber, &controls ); 93 } 94 95 Debug( LDAP_DEBUG_TRACE, "forward_response: " 96 "%s to client connid=%lu request msgid=%d\n", 97 lload_msgtype2str( response_tag ), op->o_client_connid, msgid ); 98 99 checked_lock( &client->c_io_mutex ); 100 output = client->c_pendingber; 101 if ( output == NULL && (output = ber_alloc()) == NULL ) { 102 ber_free( ber, 1 ); 103 checked_unlock( &client->c_io_mutex ); 104 return -1; 105 } 106 client->c_pendingber = output; 107 108 ber_printf( output, "t{titOtO}", LDAP_TAG_MESSAGE, 109 LDAP_TAG_MSGID, msgid, 110 response_tag, &response, 111 LDAP_TAG_CONTROLS, BER_BV_OPTIONAL( &controls ) ); 112 113 checked_unlock( &client->c_io_mutex ); 114 115 ber_free( ber, 1 ); 116 connection_write_cb( -1, 0, client ); 117 return 0; 118 } 119 120 int 121 forward_final_response( 122 LloadConnection *client, 123 LloadOperation *op, 124 BerElement *ber ) 125 { 126 int rc; 127 128 Debug( LDAP_DEBUG_STATS, "forward_final_response: " 129 "connid=%lu msgid=%d finishing up with a request for " 130 "client connid=%lu\n", 131 op->o_upstream_connid, op->o_upstream_msgid, op->o_client_connid ); 132 133 rc = forward_response( client, op, ber ); 134 135 op->o_res = LLOAD_OP_COMPLETED; 136 if ( !op->o_pin_id ) { 137 OPERATION_UNLINK(op); 138 } 139 140 return rc; 141 } 142 143 static int 144 handle_unsolicited( LloadConnection *c, BerElement *ber ) 145 { 146 CONNECTION_ASSERT_LOCKED(c); 147 148 assert( c->c_state != LLOAD_C_INVALID ); 149 if ( c->c_state == LLOAD_C_DYING ) { 150 CONNECTION_UNLOCK(c); 151 goto out; 152 } 153 c->c_state = LLOAD_C_CLOSING; 154 155 Debug( LDAP_DEBUG_STATS, "handle_unsolicited: " 156 "teardown for upstream connection connid=%lu\n", 157 c->c_connid ); 158 159 CONNECTION_DESTROY(c); 160 161 out: 162 ber_free( ber, 1 ); 163 return -1; 164 } 165 166 /* 167 * Pull c->c_currentber from the connection and try to look up the operation on 168 * the upstream. 169 * 170 * If it's a notice of disconnection, we won't find it and need to tear down 171 * the connection and tell the clients, if we can't find the operation, ignore 172 * the message (either client already disconnected/abandoned it or the upstream 173 * is pulling our leg). 174 * 175 * Some responses need special handling: 176 * - Bind response 177 * - VC response where the client requested a Bind (both need to update the 178 * client's bind status) 179 * - search entries/referrals and intermediate responses (will not trigger 180 * operation to be removed) 181 * 182 * If the worker pool is overloaded, we might be called directly from 183 * the read callback, at that point, the connection hasn't been muted. 184 * 185 * TODO: when the client already has data pending on write, we should mute the 186 * upstream. 187 * - should record the BerElement on the Op and the Op on the client 188 * 189 * The following hold on entering any of the handlers: 190 * - op->o_upstream_refcnt > 0 191 * - op->o_upstream->c_refcnt > 0 192 * - op->o_client->c_refcnt > 0 193 */ 194 static int 195 handle_one_response( LloadConnection *c ) 196 { 197 BerElement *ber; 198 LloadOperation *op = NULL, needle = { .o_upstream_connid = c->c_connid }; 199 LloadOperationHandler handler = NULL; 200 ber_tag_t tag; 201 ber_len_t len; 202 int rc = LDAP_SUCCESS; 203 204 ber = c->c_currentber; 205 c->c_currentber = NULL; 206 207 tag = ber_get_int( ber, &needle.o_upstream_msgid ); 208 if ( tag != LDAP_TAG_MSGID ) { 209 rc = -1; 210 ber_free( ber, 1 ); 211 goto fail; 212 } 213 214 CONNECTION_LOCK(c); 215 if ( needle.o_upstream_msgid == 0 ) { 216 return handle_unsolicited( c, ber ); 217 } else if ( !( op = ldap_tavl_find( 218 c->c_ops, &needle, operation_upstream_cmp ) ) ) { 219 /* Already abandoned, do nothing */ 220 CONNECTION_UNLOCK(c); 221 ber_free( ber, 1 ); 222 return rc; 223 /* 224 } else if ( op->o_response_pending ) { 225 c->c_pendingop = op; 226 event_del( c->c_read_event ); 227 */ 228 } else { 229 CONNECTION_UNLOCK(c); 230 /* 231 op->o_response_pending = ber; 232 */ 233 234 tag = ber_peek_tag( ber, &len ); 235 switch ( tag ) { 236 case LDAP_RES_SEARCH_ENTRY: 237 case LDAP_RES_SEARCH_REFERENCE: 238 case LDAP_RES_INTERMEDIATE: 239 handler = forward_response; 240 break; 241 case LDAP_RES_BIND: 242 handler = handle_bind_response; 243 break; 244 case LDAP_RES_EXTENDED: 245 if ( op->o_tag == LDAP_REQ_BIND ) { 246 #ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS 247 if ( lload_features & LLOAD_FEATURE_VC ) { 248 handler = handle_vc_bind_response; 249 } else 250 #endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */ 251 { 252 handler = handle_whoami_response; 253 } 254 } 255 break; 256 } 257 if ( !handler ) { 258 handler = forward_final_response; 259 } 260 } 261 if ( op ) { 262 struct timeval tv, tvdiff; 263 uintptr_t diff; 264 265 gettimeofday( &tv, NULL ); 266 if ( !timerisset( &op->o_last_response ) ) { 267 LloadBackend *b = c->c_backend; 268 269 timersub( &tv, &op->o_start, &tvdiff ); 270 diff = 1000000 * tvdiff.tv_sec + tvdiff.tv_usec; 271 272 __atomic_add_fetch( &b->b_operation_count, 1, __ATOMIC_RELAXED ); 273 __atomic_add_fetch( &b->b_operation_time, diff, __ATOMIC_RELAXED ); 274 } 275 op->o_last_response = tv; 276 277 Debug( LDAP_DEBUG_STATS2, "handle_one_response: " 278 "upstream connid=%lu, processing response for " 279 "client connid=%lu, msgid=%d\n", 280 c->c_connid, op->o_client_connid, op->o_client_msgid ); 281 } else { 282 tag = ber_peek_tag( ber, &len ); 283 Debug( LDAP_DEBUG_STATS2, "handle_one_response: " 284 "upstream connid=%lu, %s, msgid=%d not for a pending " 285 "operation\n", 286 c->c_connid, lload_msgtype2str( tag ), 287 needle.o_upstream_msgid ); 288 } 289 290 if ( handler ) { 291 LloadConnection *client; 292 293 checked_lock( &op->o_link_mutex ); 294 client = op->o_client; 295 checked_unlock( &op->o_link_mutex ); 296 if ( client && IS_ALIVE( client, c_live ) ) { 297 rc = handler( client, op, ber ); 298 } else { 299 ber_free( ber, 1 ); 300 } 301 } else { 302 assert(0); 303 ber_free( ber, 1 ); 304 } 305 306 fail: 307 if ( rc ) { 308 Debug( LDAP_DEBUG_STATS, "handle_one_response: " 309 "error on processing a response (%s) on upstream connection " 310 "connid=%ld, tag=%lx\n", 311 lload_msgtype2str( tag ), c->c_connid, tag ); 312 CONNECTION_LOCK_DESTROY(c); 313 } 314 return rc; 315 } 316 317 #ifdef HAVE_CYRUS_SASL 318 static int 319 sasl_bind_step( LloadConnection *c, BerValue *scred, BerValue *ccred ) 320 { 321 LloadBackend *b = c->c_backend; 322 sasl_conn_t *ctx = c->c_sasl_authctx; 323 sasl_interact_t *prompts = NULL; 324 unsigned credlen; 325 int rc = -1; 326 327 if ( !ctx ) { 328 const char *mech = NULL; 329 #ifdef HAVE_TLS 330 void *ssl; 331 #endif /* HAVE_TLS */ 332 333 if ( sasl_client_new( "ldap", b->b_host, NULL, NULL, client_callbacks, 334 0, &ctx ) != SASL_OK ) { 335 goto done; 336 } 337 c->c_sasl_authctx = ctx; 338 339 assert( c->c_sasl_defaults == NULL ); 340 c->c_sasl_defaults = 341 lutil_sasl_defaults( NULL, bindconf.sb_saslmech.bv_val, 342 bindconf.sb_realm.bv_val, bindconf.sb_authcId.bv_val, 343 bindconf.sb_cred.bv_val, bindconf.sb_authzId.bv_val ); 344 345 #ifdef HAVE_TLS 346 /* Check for TLS */ 347 ssl = ldap_pvt_tls_sb_ctx( c->c_sb ); 348 if ( ssl ) { 349 struct berval authid = BER_BVNULL; 350 ber_len_t ssf; 351 352 ssf = ldap_pvt_tls_get_strength( ssl ); 353 (void)ldap_pvt_tls_get_my_dn( ssl, &authid, NULL, 0 ); 354 355 sasl_setprop( ctx, SASL_SSF_EXTERNAL, &ssf ); 356 sasl_setprop( ctx, SASL_AUTH_EXTERNAL, authid.bv_val ); 357 ch_free( authid.bv_val ); 358 #ifdef SASL_CHANNEL_BINDING /* 2.1.25+ */ 359 { 360 char cbinding[64]; 361 struct berval cbv = { sizeof(cbinding), cbinding }; 362 if ( ldap_pvt_tls_get_unique( ssl, &cbv, 0 ) ) { 363 sasl_channel_binding_t *cb = 364 ch_malloc( sizeof(*cb) + cbv.bv_len ); 365 void *cb_data; 366 cb->name = "ldap"; 367 cb->critical = 0; 368 cb->len = cbv.bv_len; 369 cb->data = cb_data = cb + 1; 370 memcpy( cb_data, cbv.bv_val, cbv.bv_len ); 371 sasl_setprop( ctx, SASL_CHANNEL_BINDING, cb ); 372 c->c_sasl_cbinding = cb; 373 } 374 } 375 #endif 376 } 377 #endif 378 379 #if !defined(_WIN32) 380 /* Check for local */ 381 if ( b->b_proto == LDAP_PROTO_IPC ) { 382 char authid[sizeof( "gidNumber=4294967295+uidNumber=4294967295," 383 "cn=peercred,cn=external,cn=auth" )]; 384 int ssf = LDAP_PVT_SASL_LOCAL_SSF; 385 386 sprintf( authid, 387 "gidNumber=%u+uidNumber=%u," 388 "cn=peercred,cn=external,cn=auth", 389 getegid(), geteuid() ); 390 sasl_setprop( ctx, SASL_SSF_EXTERNAL, &ssf ); 391 sasl_setprop( ctx, SASL_AUTH_EXTERNAL, authid ); 392 } 393 #endif 394 395 do { 396 rc = sasl_client_start( ctx, bindconf.sb_saslmech.bv_val, 397 &prompts, 398 (const char **)&ccred->bv_val, &credlen, 399 &mech ); 400 401 if ( rc == SASL_INTERACT ) { 402 if ( lutil_sasl_interact( NULL, LDAP_SASL_QUIET, 403 c->c_sasl_defaults, prompts ) ) { 404 break; 405 } 406 } 407 } while ( rc == SASL_INTERACT ); 408 409 ber_str2bv( mech, 0, 0, &c->c_sasl_bind_mech ); 410 } else { 411 assert( c->c_sasl_defaults ); 412 413 do { 414 rc = sasl_client_step( ctx, 415 (scred == NULL) ? NULL : scred->bv_val, 416 (scred == NULL) ? 0 : scred->bv_len, 417 &prompts, 418 (const char **)&ccred->bv_val, &credlen); 419 420 if ( rc == SASL_INTERACT ) { 421 if ( lutil_sasl_interact( NULL, LDAP_SASL_QUIET, 422 c->c_sasl_defaults, prompts ) ) { 423 break; 424 } 425 } 426 } while ( rc == SASL_INTERACT ); 427 } 428 429 if ( rc == SASL_OK ) { 430 sasl_ssf_t *ssf; 431 rc = sasl_getprop( ctx, SASL_SSF, (const void **)(char *)&ssf ); 432 if ( rc == SASL_OK && ssf && *ssf ) { 433 Debug( LDAP_DEBUG_CONNS, "sasl_bind_step: " 434 "connid=%lu mech=%s setting up a new SASL security layer\n", 435 c->c_connid, c->c_sasl_bind_mech.bv_val ); 436 ldap_pvt_sasl_install( c->c_sb, ctx ); 437 } 438 } 439 ccred->bv_len = credlen; 440 441 done: 442 Debug( LDAP_DEBUG_TRACE, "sasl_bind_step: " 443 "connid=%lu next step for SASL bind mech=%s rc=%d\n", 444 c->c_connid, c->c_sasl_bind_mech.bv_val, rc ); 445 return rc; 446 } 447 #endif /* HAVE_CYRUS_SASL */ 448 449 int 450 upstream_bind_cb( LloadConnection *c ) 451 { 452 BerElement *ber = c->c_currentber; 453 LloadBackend *b = c->c_backend; 454 BerValue matcheddn, message; 455 ber_tag_t tag; 456 ber_int_t msgid, result; 457 458 c->c_currentber = NULL; 459 460 if ( ber_scanf( ber, "it", &msgid, &tag ) == LBER_ERROR ) { 461 Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: " 462 "protocol violation from server\n" ); 463 goto fail; 464 } 465 466 if ( msgid != ( c->c_next_msgid - 1 ) || tag != LDAP_RES_BIND ) { 467 Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: " 468 "unexpected %s from server, msgid=%d\n", 469 lload_msgtype2str( tag ), msgid ); 470 goto fail; 471 } 472 473 if ( ber_scanf( ber, "{emm" /* "}" */, &result, &matcheddn, &message ) == 474 LBER_ERROR ) { 475 Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: " 476 "response does not conform with a bind response\n" ); 477 goto fail; 478 } 479 480 switch ( result ) { 481 case LDAP_SUCCESS: 482 #ifdef HAVE_CYRUS_SASL 483 case LDAP_SASL_BIND_IN_PROGRESS: 484 if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) { 485 BerValue scred = BER_BVNULL, ccred; 486 ber_len_t len; 487 int rc; 488 489 if ( ber_peek_tag( ber, &len ) == LDAP_TAG_SASL_RES_CREDS && 490 ber_scanf( ber, "m", &scred ) == LBER_ERROR ) { 491 Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: " 492 "sasl bind response malformed\n" ); 493 goto fail; 494 } 495 496 rc = sasl_bind_step( c, &scred, &ccred ); 497 if ( rc != SASL_OK && 498 ( rc != SASL_CONTINUE || result == LDAP_SUCCESS ) ) { 499 goto fail; 500 } 501 502 if ( result == LDAP_SASL_BIND_IN_PROGRESS ) { 503 BerElement *outber; 504 505 checked_lock( &c->c_io_mutex ); 506 outber = c->c_pendingber; 507 if ( outber == NULL && (outber = ber_alloc()) == NULL ) { 508 checked_unlock( &c->c_io_mutex ); 509 goto fail; 510 } 511 c->c_pendingber = outber; 512 513 msgid = c->c_next_msgid++; 514 ber_printf( outber, "{it{iOt{OON}N}}", 515 msgid, LDAP_REQ_BIND, LDAP_VERSION3, 516 &bindconf.sb_binddn, LDAP_AUTH_SASL, 517 &c->c_sasl_bind_mech, BER_BV_OPTIONAL( &ccred ) ); 518 checked_unlock( &c->c_io_mutex ); 519 520 connection_write_cb( -1, 0, c ); 521 522 if ( rc == SASL_OK ) { 523 BER_BVZERO( &c->c_sasl_bind_mech ); 524 } 525 break; 526 } 527 } 528 if ( result == LDAP_SASL_BIND_IN_PROGRESS ) { 529 goto fail; 530 } 531 #endif /* HAVE_CYRUS_SASL */ 532 CONNECTION_LOCK(c); 533 c->c_pdu_cb = handle_one_response; 534 c->c_state = LLOAD_C_READY; 535 c->c_type = LLOAD_C_OPEN; 536 c->c_read_timeout = NULL; 537 Debug( LDAP_DEBUG_CONNS, "upstream_bind_cb: " 538 "connection connid=%lu for backend server '%s' is ready " 539 "for use\n", 540 c->c_connid, b->b_name.bv_val ); 541 CONNECTION_UNLOCK(c); 542 checked_lock( &b->b_mutex ); 543 LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next ); 544 b->b_active++; 545 b->b_opening--; 546 b->b_failed = 0; 547 if ( b->b_last_conn ) { 548 LDAP_CIRCLEQ_INSERT_AFTER( 549 &b->b_conns, b->b_last_conn, c, c_next ); 550 } else { 551 LDAP_CIRCLEQ_INSERT_HEAD( &b->b_conns, c, c_next ); 552 } 553 b->b_last_conn = c; 554 backend_retry( b ); 555 checked_unlock( &b->b_mutex ); 556 break; 557 default: 558 Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: " 559 "upstream bind failed, rc=%d, message='%s'\n", 560 result, message.bv_val ); 561 goto fail; 562 } 563 564 checked_lock( &c->c_io_mutex ); 565 c->c_io_state &= ~LLOAD_C_READ_HANDOVER; 566 checked_unlock( &c->c_io_mutex ); 567 event_add( c->c_read_event, c->c_read_timeout ); 568 ber_free( ber, 1 ); 569 return -1; 570 571 fail: 572 CONNECTION_LOCK_DESTROY(c); 573 ber_free( ber, 1 ); 574 return -1; 575 } 576 577 void * 578 upstream_bind( void *ctx, void *arg ) 579 { 580 LloadConnection *c = arg; 581 BerElement *ber; 582 ber_int_t msgid; 583 584 /* A reference was passed on to us */ 585 assert( IS_ALIVE( c, c_refcnt ) ); 586 587 if ( !IS_ALIVE( c, c_live ) ) { 588 RELEASE_REF( c, c_refcnt, c->c_destroy ); 589 return NULL; 590 } 591 592 CONNECTION_LOCK(c); 593 assert( !event_pending( c->c_read_event, EV_READ, NULL ) ); 594 c->c_pdu_cb = upstream_bind_cb; 595 CONNECTION_UNLOCK(c); 596 597 checked_lock( &c->c_io_mutex ); 598 ber = c->c_pendingber; 599 if ( ber == NULL && (ber = ber_alloc()) == NULL ) { 600 goto fail; 601 } 602 c->c_pendingber = ber; 603 msgid = c->c_next_msgid++; 604 605 if ( bindconf.sb_method == LDAP_AUTH_SIMPLE ) { 606 /* simple bind */ 607 ber_printf( ber, "{it{iOtON}}", 608 msgid, LDAP_REQ_BIND, LDAP_VERSION3, 609 &bindconf.sb_binddn, LDAP_AUTH_SIMPLE, 610 &bindconf.sb_cred ); 611 612 #ifdef HAVE_CYRUS_SASL 613 } else { 614 BerValue cred; 615 int rc; 616 617 rc = sasl_bind_step( c, NULL, &cred ); 618 if ( rc != SASL_OK && rc != SASL_CONTINUE ) { 619 goto fail; 620 } 621 622 ber_printf( ber, "{it{iOt{OON}N}}", 623 msgid, LDAP_REQ_BIND, LDAP_VERSION3, 624 &bindconf.sb_binddn, LDAP_AUTH_SASL, 625 &c->c_sasl_bind_mech, BER_BV_OPTIONAL( &cred ) ); 626 627 if ( rc == SASL_OK ) { 628 BER_BVZERO( &c->c_sasl_bind_mech ); 629 } 630 #endif /* HAVE_CYRUS_SASL */ 631 } 632 /* TODO: can we be paused at this point? Then we'd have to move this line 633 * after connection_write_cb */ 634 c->c_io_state &= ~LLOAD_C_READ_HANDOVER; 635 checked_unlock( &c->c_io_mutex ); 636 637 connection_write_cb( -1, 0, c ); 638 639 CONNECTION_LOCK(c); 640 c->c_read_timeout = lload_timeout_net; 641 event_add( c->c_read_event, c->c_read_timeout ); 642 CONNECTION_UNLOCK(c); 643 644 RELEASE_REF( c, c_refcnt, c->c_destroy ); 645 return NULL; 646 647 fail: 648 checked_unlock( &c->c_io_mutex ); 649 CONNECTION_LOCK_DESTROY(c); 650 RELEASE_REF( c, c_refcnt, c->c_destroy ); 651 return NULL; 652 } 653 654 /* 655 * The backend is already locked when entering the function. 656 */ 657 static int 658 upstream_finish( LloadConnection *c ) 659 { 660 LloadBackend *b = c->c_backend; 661 int is_bindconn = 0; 662 663 assert_locked( &b->b_mutex ); 664 CONNECTION_ASSERT_LOCKED(c); 665 assert( c->c_live ); 666 c->c_pdu_cb = handle_one_response; 667 668 /* Unless we are configured to use the VC exop, consider allocating the 669 * connection into the bind conn pool. Start off by allocating one for 670 * general use, then one for binds, then we start filling up the general 671 * connection pool, finally the bind pool */ 672 if ( 673 #ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS 674 !(lload_features & LLOAD_FEATURE_VC) && 675 #endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */ 676 b->b_active && b->b_numbindconns ) { 677 if ( !b->b_bindavail ) { 678 is_bindconn = 1; 679 } else if ( b->b_active >= b->b_numconns && 680 b->b_bindavail < b->b_numbindconns ) { 681 is_bindconn = 1; 682 } 683 } 684 685 if ( is_bindconn ) { 686 LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next ); 687 c->c_state = LLOAD_C_READY; 688 c->c_type = LLOAD_C_BIND; 689 b->b_bindavail++; 690 b->b_opening--; 691 b->b_failed = 0; 692 if ( b->b_last_bindconn ) { 693 LDAP_CIRCLEQ_INSERT_AFTER( 694 &b->b_bindconns, b->b_last_bindconn, c, c_next ); 695 } else { 696 LDAP_CIRCLEQ_INSERT_HEAD( &b->b_bindconns, c, c_next ); 697 } 698 b->b_last_bindconn = c; 699 } else if ( bindconf.sb_method == LDAP_AUTH_NONE ) { 700 LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next ); 701 c->c_state = LLOAD_C_READY; 702 c->c_type = LLOAD_C_OPEN; 703 b->b_active++; 704 b->b_opening--; 705 b->b_failed = 0; 706 if ( b->b_last_conn ) { 707 LDAP_CIRCLEQ_INSERT_AFTER( &b->b_conns, b->b_last_conn, c, c_next ); 708 } else { 709 LDAP_CIRCLEQ_INSERT_HEAD( &b->b_conns, c, c_next ); 710 } 711 b->b_last_conn = c; 712 } else { 713 if ( ldap_pvt_thread_pool_submit( 714 &connection_pool, upstream_bind, c ) ) { 715 Debug( LDAP_DEBUG_ANY, "upstream_finish: " 716 "failed to set up a bind callback for connid=%lu\n", 717 c->c_connid ); 718 return -1; 719 } 720 /* keep a reference for upstream_bind */ 721 acquire_ref( &c->c_refcnt ); 722 723 Debug( LDAP_DEBUG_CONNS, "upstream_finish: " 724 "scheduled a bind callback for connid=%lu\n", 725 c->c_connid ); 726 return LDAP_SUCCESS; 727 } 728 event_add( c->c_read_event, c->c_read_timeout ); 729 730 Debug( LDAP_DEBUG_CONNS, "upstream_finish: " 731 "%sconnection connid=%lu for backend server '%s' is ready for " 732 "use\n", 733 is_bindconn ? "bind " : "", c->c_connid, b->b_name.bv_val ); 734 735 backend_retry( b ); 736 return LDAP_SUCCESS; 737 } 738 739 #ifdef HAVE_TLS 740 static void 741 upstream_tls_handshake_cb( evutil_socket_t s, short what, void *arg ) 742 { 743 LloadConnection *c = arg; 744 LloadBackend *b; 745 epoch_t epoch; 746 int rc = LDAP_SUCCESS; 747 748 CONNECTION_LOCK(c); 749 if ( what & EV_TIMEOUT ) { 750 Debug( LDAP_DEBUG_CONNS, "upstream_tls_handshake_cb: " 751 "connid=%lu, timeout reached, destroying\n", 752 c->c_connid ); 753 goto fail; 754 } 755 b = c->c_backend; 756 757 rc = ldap_pvt_tls_connect( lload_tls_backend_ld, c->c_sb, b->b_host ); 758 if ( rc < 0 ) { 759 goto fail; 760 } 761 762 if ( rc == 0 ) { 763 struct event_base *base = event_get_base( c->c_read_event ); 764 765 /* 766 * We're finished, replace the callbacks 767 * 768 * This is deadlock-safe, since both share the same base - the one 769 * that's just running us. 770 */ 771 event_del( c->c_read_event ); 772 event_del( c->c_write_event ); 773 774 c->c_read_timeout = NULL; 775 event_assign( c->c_read_event, base, c->c_fd, EV_READ|EV_PERSIST, 776 connection_read_cb, c ); 777 event_assign( c->c_write_event, base, c->c_fd, EV_WRITE, 778 connection_write_cb, c ); 779 Debug( LDAP_DEBUG_CONNS, "upstream_tls_handshake_cb: " 780 "connid=%lu finished\n", 781 c->c_connid ); 782 c->c_is_tls = LLOAD_TLS_ESTABLISHED; 783 784 CONNECTION_UNLOCK(c); 785 checked_lock( &b->b_mutex ); 786 CONNECTION_LOCK(c); 787 788 rc = upstream_finish( c ); 789 checked_unlock( &b->b_mutex ); 790 791 if ( rc ) { 792 goto fail; 793 } 794 } else if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_WRITE, NULL ) ) { 795 event_add( c->c_write_event, lload_write_timeout ); 796 Debug( LDAP_DEBUG_CONNS, "upstream_tls_handshake_cb: " 797 "connid=%lu need write rc=%d\n", 798 c->c_connid, rc ); 799 } 800 CONNECTION_UNLOCK(c); 801 return; 802 803 fail: 804 Debug( LDAP_DEBUG_CONNS, "upstream_tls_handshake_cb: " 805 "connid=%lu failed rc=%d\n", 806 c->c_connid, rc ); 807 808 assert( c->c_ops == NULL ); 809 epoch = epoch_join(); 810 CONNECTION_DESTROY(c); 811 epoch_leave( epoch ); 812 } 813 814 static int 815 upstream_starttls( LloadConnection *c ) 816 { 817 BerValue matcheddn, message, responseOid, 818 startTLSOid = BER_BVC(LDAP_EXOP_START_TLS); 819 BerElement *ber = c->c_currentber; 820 struct event_base *base; 821 ber_int_t msgid, result; 822 ber_tag_t tag; 823 824 c->c_currentber = NULL; 825 CONNECTION_LOCK(c); 826 827 if ( ber_scanf( ber, "it", &msgid, &tag ) == LBER_ERROR ) { 828 Debug( LDAP_DEBUG_ANY, "upstream_starttls: " 829 "protocol violation from server\n" ); 830 goto fail; 831 } 832 833 if ( msgid != ( c->c_next_msgid - 1 ) || tag != LDAP_RES_EXTENDED ) { 834 Debug( LDAP_DEBUG_ANY, "upstream_starttls: " 835 "unexpected %s from server, msgid=%d\n", 836 lload_msgtype2str( tag ), msgid ); 837 goto fail; 838 } 839 840 if ( ber_scanf( ber, "{emm}", &result, &matcheddn, &message ) == 841 LBER_ERROR ) { 842 Debug( LDAP_DEBUG_ANY, "upstream_starttls: " 843 "protocol violation on StartTLS response\n" ); 844 goto fail; 845 } 846 847 if ( (tag = ber_get_tag( ber )) != LBER_DEFAULT ) { 848 if ( tag != LDAP_TAG_EXOP_RES_OID || 849 ber_scanf( ber, "{m}", &responseOid ) == LBER_DEFAULT ) { 850 Debug( LDAP_DEBUG_ANY, "upstream_starttls: " 851 "protocol violation on StartTLS response\n" ); 852 goto fail; 853 } 854 855 if ( ber_bvcmp( &responseOid, &startTLSOid ) ) { 856 Debug( LDAP_DEBUG_ANY, "upstream_starttls: " 857 "oid=%s not a StartTLS response\n", 858 responseOid.bv_val ); 859 goto fail; 860 } 861 } 862 863 if ( result != LDAP_SUCCESS ) { 864 LloadBackend *b = c->c_backend; 865 int rc; 866 867 Debug( LDAP_DEBUG_STATS, "upstream_starttls: " 868 "server doesn't support StartTLS rc=%d message='%s'%s\n", 869 result, message.bv_val, 870 (c->c_is_tls == LLOAD_STARTTLS_OPTIONAL) ? ", ignored" : "" ); 871 if ( c->c_is_tls != LLOAD_STARTTLS_OPTIONAL ) { 872 goto fail; 873 } 874 c->c_is_tls = LLOAD_CLEARTEXT; 875 876 CONNECTION_UNLOCK(c); 877 checked_lock( &b->b_mutex ); 878 CONNECTION_LOCK(c); 879 880 rc = upstream_finish( c ); 881 checked_unlock( &b->b_mutex ); 882 883 if ( rc ) { 884 goto fail; 885 } 886 887 ber_free( ber, 1 ); 888 CONNECTION_UNLOCK(c); 889 890 checked_lock( &c->c_io_mutex ); 891 c->c_io_state &= ~LLOAD_C_READ_HANDOVER; 892 checked_unlock( &c->c_io_mutex ); 893 894 /* Do not keep handle_pdus running, we have adjusted c_read_event as we 895 * need it. */ 896 return -1; 897 } 898 899 base = event_get_base( c->c_read_event ); 900 901 c->c_io_state &= ~LLOAD_C_READ_HANDOVER; 902 event_del( c->c_read_event ); 903 event_del( c->c_write_event ); 904 905 c->c_read_timeout = lload_timeout_net; 906 event_assign( c->c_read_event, base, c->c_fd, EV_READ|EV_PERSIST, 907 upstream_tls_handshake_cb, c ); 908 event_assign( c->c_write_event, base, c->c_fd, EV_WRITE, 909 upstream_tls_handshake_cb, c ); 910 911 event_add( c->c_read_event, c->c_read_timeout ); 912 event_add( c->c_write_event, lload_write_timeout ); 913 914 CONNECTION_UNLOCK(c); 915 916 ber_free( ber, 1 ); 917 return -1; 918 919 fail: 920 ber_free( ber, 1 ); 921 CONNECTION_DESTROY(c); 922 return -1; 923 } 924 #endif /* HAVE_TLS */ 925 926 /* 927 * We must already hold b->b_mutex when called. 928 */ 929 LloadConnection * 930 upstream_init( ber_socket_t s, LloadBackend *b ) 931 { 932 LloadConnection *c; 933 struct event_base *base = lload_get_base( s ); 934 struct event *event; 935 int flags; 936 937 assert( b != NULL ); 938 939 flags = (b->b_proto == LDAP_PROTO_IPC) ? CONN_IS_IPC : 0; 940 if ( (c = lload_connection_init( s, b->b_host, flags )) == NULL ) { 941 return NULL; 942 } 943 944 CONNECTION_LOCK(c); 945 c->c_backend = b; 946 #ifdef HAVE_TLS 947 c->c_is_tls = b->b_tls; 948 #endif 949 c->c_pdu_cb = handle_one_response; 950 951 LDAP_CIRCLEQ_INSERT_HEAD( &b->b_preparing, c, c_next ); 952 c->c_type = LLOAD_C_PREPARING; 953 954 { 955 ber_len_t max = sockbuf_max_incoming_upstream; 956 ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_MAX_INCOMING, &max ); 957 } 958 959 event = event_new( base, s, EV_READ|EV_PERSIST, connection_read_cb, c ); 960 if ( !event ) { 961 Debug( LDAP_DEBUG_ANY, "upstream_init: " 962 "Read event could not be allocated\n" ); 963 goto fail; 964 } 965 c->c_read_event = event; 966 967 event = event_new( base, s, EV_WRITE, connection_write_cb, c ); 968 if ( !event ) { 969 Debug( LDAP_DEBUG_ANY, "upstream_init: " 970 "Write event could not be allocated\n" ); 971 goto fail; 972 } 973 /* We only add the write event when we have data pending */ 974 c->c_write_event = event; 975 976 #ifdef BALANCER_MODULE 977 if ( b->b_monitor ) { 978 acquire_ref( &c->c_refcnt ); 979 CONNECTION_UNLOCK(c); 980 checked_unlock( &b->b_mutex ); 981 if ( lload_monitor_conn_entry_create( c, b->b_monitor ) ) { 982 RELEASE_REF( c, c_refcnt, c->c_destroy ); 983 checked_lock( &b->b_mutex ); 984 CONNECTION_LOCK(c); 985 goto fail; 986 } 987 checked_lock( &b->b_mutex ); 988 CONNECTION_LOCK(c); 989 RELEASE_REF( c, c_refcnt, c->c_destroy ); 990 } 991 #endif /* BALANCER_MODULE */ 992 993 c->c_destroy = upstream_destroy; 994 c->c_unlink = upstream_unlink; 995 996 #ifdef HAVE_TLS 997 if ( c->c_is_tls == LLOAD_CLEARTEXT ) { 998 #endif /* HAVE_TLS */ 999 if ( upstream_finish( c ) ) { 1000 goto fail; 1001 } 1002 #ifdef HAVE_TLS 1003 } else if ( c->c_is_tls == LLOAD_LDAPS ) { 1004 event_assign( c->c_read_event, base, s, EV_READ|EV_PERSIST, 1005 upstream_tls_handshake_cb, c ); 1006 event_add( c->c_read_event, c->c_read_timeout ); 1007 event_assign( c->c_write_event, base, s, EV_WRITE, 1008 upstream_tls_handshake_cb, c ); 1009 event_add( c->c_write_event, lload_write_timeout ); 1010 } else if ( c->c_is_tls == LLOAD_STARTTLS || 1011 c->c_is_tls == LLOAD_STARTTLS_OPTIONAL ) { 1012 BerElement *output; 1013 1014 checked_lock( &c->c_io_mutex ); 1015 if ( (output = c->c_pendingber = ber_alloc()) == NULL ) { 1016 checked_unlock( &c->c_io_mutex ); 1017 goto fail; 1018 } 1019 ber_printf( output, "t{tit{ts}}", LDAP_TAG_MESSAGE, 1020 LDAP_TAG_MSGID, c->c_next_msgid++, 1021 LDAP_REQ_EXTENDED, 1022 LDAP_TAG_EXOP_REQ_OID, LDAP_EXOP_START_TLS ); 1023 checked_unlock( &c->c_io_mutex ); 1024 1025 c->c_pdu_cb = upstream_starttls; 1026 CONNECTION_UNLOCK(c); 1027 connection_write_cb( s, 0, c ); 1028 CONNECTION_LOCK(c); 1029 if ( IS_ALIVE( c, c_live ) ) { 1030 event_add( c->c_read_event, c->c_read_timeout ); 1031 } 1032 } 1033 #endif /* HAVE_TLS */ 1034 CONNECTION_UNLOCK(c); 1035 1036 return c; 1037 1038 fail: 1039 if ( !IS_ALIVE( c, c_live ) ) { 1040 /* 1041 * Released while we were unlocked, it's scheduled for destruction 1042 * already 1043 */ 1044 return NULL; 1045 } 1046 1047 if ( c->c_write_event ) { 1048 event_del( c->c_write_event ); 1049 event_free( c->c_write_event ); 1050 } 1051 if ( c->c_read_event ) { 1052 event_del( c->c_read_event ); 1053 event_free( c->c_read_event ); 1054 } 1055 1056 c->c_state = LLOAD_C_INVALID; 1057 c->c_live--; 1058 c->c_refcnt--; 1059 connection_destroy( c ); 1060 1061 return NULL; 1062 } 1063 1064 static void 1065 upstream_unlink( LloadConnection *c ) 1066 { 1067 LloadBackend *b = c->c_backend; 1068 struct event *read_event, *write_event; 1069 TAvlnode *root, *linked_root; 1070 long freed, executing; 1071 1072 Debug( LDAP_DEBUG_CONNS, "upstream_unlink: " 1073 "removing upstream connid=%lu\n", 1074 c->c_connid ); 1075 CONNECTION_ASSERT_LOCKED(c); 1076 1077 assert( c->c_state != LLOAD_C_INVALID ); 1078 assert( c->c_state != LLOAD_C_DYING ); 1079 1080 c->c_state = LLOAD_C_DYING; 1081 1082 read_event = c->c_read_event; 1083 write_event = c->c_write_event; 1084 1085 root = c->c_ops; 1086 c->c_ops = NULL; 1087 executing = c->c_n_ops_executing; 1088 c->c_n_ops_executing = 0; 1089 1090 linked_root = c->c_linked; 1091 c->c_linked = NULL; 1092 1093 CONNECTION_UNLOCK(c); 1094 1095 freed = ldap_tavl_free( root, (AVL_FREE)operation_lost_upstream ); 1096 assert( freed == executing ); 1097 1098 ldap_tavl_free( linked_root, (AVL_FREE)linked_upstream_lost ); 1099 1100 /* 1101 * Avoid a deadlock: 1102 * event_del will block if the event is currently executing its callback, 1103 * that callback might be waiting to lock c->c_mutex 1104 */ 1105 if ( read_event ) { 1106 event_del( read_event ); 1107 } 1108 1109 if ( write_event ) { 1110 event_del( write_event ); 1111 } 1112 1113 checked_lock( &b->b_mutex ); 1114 if ( c->c_type == LLOAD_C_PREPARING ) { 1115 LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next ); 1116 b->b_opening--; 1117 b->b_failed++; 1118 } else if ( c->c_type == LLOAD_C_BIND ) { 1119 if ( c == b->b_last_bindconn ) { 1120 LloadConnection *prev = 1121 LDAP_CIRCLEQ_LOOP_PREV( &b->b_bindconns, c, c_next ); 1122 if ( prev == c ) { 1123 b->b_last_bindconn = NULL; 1124 } else { 1125 b->b_last_bindconn = prev; 1126 } 1127 } 1128 LDAP_CIRCLEQ_REMOVE( &b->b_bindconns, c, c_next ); 1129 b->b_bindavail--; 1130 } else { 1131 if ( c == b->b_last_conn ) { 1132 LloadConnection *prev = 1133 LDAP_CIRCLEQ_LOOP_PREV( &b->b_conns, c, c_next ); 1134 if ( prev == c ) { 1135 b->b_last_conn = NULL; 1136 } else { 1137 b->b_last_conn = prev; 1138 } 1139 } 1140 LDAP_CIRCLEQ_REMOVE( &b->b_conns, c, c_next ); 1141 b->b_active--; 1142 } 1143 b->b_n_ops_executing -= executing; 1144 backend_retry( b ); 1145 checked_unlock( &b->b_mutex ); 1146 1147 CONNECTION_LOCK(c); 1148 CONNECTION_ASSERT_LOCKED(c); 1149 } 1150 1151 void 1152 upstream_destroy( LloadConnection *c ) 1153 { 1154 Debug( LDAP_DEBUG_CONNS, "upstream_destroy: " 1155 "freeing connection connid=%lu\n", 1156 c->c_connid ); 1157 1158 CONNECTION_LOCK(c); 1159 assert( c->c_state == LLOAD_C_DYING ); 1160 1161 #ifdef BALANCER_MODULE 1162 /* 1163 * Can't do this in upstream_unlink as that could be run from cn=monitor 1164 * modify callback. 1165 */ 1166 if ( !BER_BVISNULL( &c->c_monitor_dn ) ) { 1167 lload_monitor_conn_unlink( c ); 1168 } 1169 #endif /* BALANCER_MODULE */ 1170 1171 c->c_state = LLOAD_C_INVALID; 1172 1173 assert( c->c_ops == NULL ); 1174 1175 if ( c->c_read_event ) { 1176 event_free( c->c_read_event ); 1177 c->c_read_event = NULL; 1178 } 1179 1180 if ( c->c_write_event ) { 1181 event_free( c->c_write_event ); 1182 c->c_write_event = NULL; 1183 } 1184 1185 if ( c->c_type != LLOAD_C_BIND ) { 1186 BER_BVZERO( &c->c_sasl_bind_mech ); 1187 } 1188 connection_destroy( c ); 1189 } 1190