Home | History | Annotate | Line # | Download | only in lloadd
      1 /*	$NetBSD: upstream.c,v 1.3 2025/09/05 21:16:24 christos Exp $	*/
      2 
      3 /* $OpenLDAP$ */
      4 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
      5  *
      6  * Copyright 1998-2024 The OpenLDAP Foundation.
      7  * All rights reserved.
      8  *
      9  * Redistribution and use in source and binary forms, with or without
     10  * modification, are permitted only as authorized by the OpenLDAP
     11  * Public License.
     12  *
     13  * A copy of this license is available in the file LICENSE in the
     14  * top-level directory of the distribution or, alternatively, at
     15  * <http://www.OpenLDAP.org/license.html>.
     16  */
     17 
     18 #include <sys/cdefs.h>
     19 __RCSID("$NetBSD: upstream.c,v 1.3 2025/09/05 21:16:24 christos Exp $");
     20 
     21 #include "portable.h"
     22 
     23 #include <ac/socket.h>
     24 #include <ac/errno.h>
     25 #include <ac/string.h>
     26 #include <ac/time.h>
     27 #include <ac/unistd.h>
     28 
     29 #include "lload.h"
     30 
     31 #include "lutil.h"
     32 #include "lutil_ldap.h"
     33 
     34 #ifdef HAVE_CYRUS_SASL
     35 static const sasl_callback_t client_callbacks[] = {
     36 #ifdef SASL_CB_GETREALM
     37         { SASL_CB_GETREALM, NULL, NULL },
     38 #endif
     39         { SASL_CB_USER, NULL, NULL },
     40         { SASL_CB_AUTHNAME, NULL, NULL },
     41         { SASL_CB_PASS, NULL, NULL },
     42         { SASL_CB_LIST_END, NULL, NULL }
     43 };
     44 #endif /* HAVE_CYRUS_SASL */
     45 
     46 static void upstream_unlink( LloadConnection *upstream );
     47 
/*
 * Comparison callback: orders the two arguments by applying SLAP_PTRCMP
 * to the pointers themselves (the pointed-to data is not inspected).
 */
int
lload_upstream_entry_cmp( const void *l, const void *r )
{
    int order = SLAP_PTRCMP( l, r );

    return order;
}
     53 
     54 static void
     55 linked_upstream_lost( LloadConnection *client )
     56 {
     57     int gentle = 1;
     58 
     59     CONNECTION_LOCK(client);
     60     assert( client->c_restricted >= LLOAD_OP_RESTRICTED_UPSTREAM );
     61     assert( client->c_linked_upstream );
     62 
     63     client->c_restricted = LLOAD_OP_NOT_RESTRICTED;
     64     client->c_linked_upstream = NULL;
     65     CONNECTION_UNLOCK(client);
     66     lload_connection_close( client, &gentle );
     67 }
     68 
     69 int
     70 forward_response( LloadConnection *client, LloadOperation *op, BerElement *ber )
     71 {
     72     BerElement *output;
     73     BerValue response, controls = BER_BVNULL;
     74     ber_int_t msgid;
     75     ber_tag_t tag, response_tag;
     76     ber_len_t len;
     77 
     78     CONNECTION_LOCK(client);
     79     if ( op->o_client_msgid ) {
     80         msgid = op->o_client_msgid;
     81     } else {
     82         assert( op->o_pin_id );
     83         msgid = op->o_saved_msgid;
     84         op->o_saved_msgid = 0;
     85     }
     86     CONNECTION_UNLOCK(client);
     87 
     88     response_tag = ber_skip_element( ber, &response );
     89 
     90     tag = ber_peek_tag( ber, &len );
     91     if ( tag == LDAP_TAG_CONTROLS ) {
     92         ber_skip_element( ber, &controls );
     93     }
     94 
     95     Debug( LDAP_DEBUG_TRACE, "forward_response: "
     96             "%s to client connid=%lu request msgid=%d\n",
     97             lload_msgtype2str( response_tag ), op->o_client_connid, msgid );
     98 
     99     checked_lock( &client->c_io_mutex );
    100     output = client->c_pendingber;
    101     if ( output == NULL && (output = ber_alloc()) == NULL ) {
    102         ber_free( ber, 1 );
    103         checked_unlock( &client->c_io_mutex );
    104         return -1;
    105     }
    106     client->c_pendingber = output;
    107 
    108     ber_printf( output, "t{titOtO}", LDAP_TAG_MESSAGE,
    109             LDAP_TAG_MSGID, msgid,
    110             response_tag, &response,
    111             LDAP_TAG_CONTROLS, BER_BV_OPTIONAL( &controls ) );
    112 
    113     checked_unlock( &client->c_io_mutex );
    114 
    115     ber_free( ber, 1 );
    116     connection_write_cb( -1, 0, client );
    117     return 0;
    118 }
    119 
    120 int
    121 forward_final_response(
    122         LloadConnection *client,
    123         LloadOperation *op,
    124         BerElement *ber )
    125 {
    126     int rc;
    127 
    128     Debug( LDAP_DEBUG_STATS, "forward_final_response: "
    129             "connid=%lu msgid=%d finishing up with a request for "
    130             "client connid=%lu\n",
    131             op->o_upstream_connid, op->o_upstream_msgid, op->o_client_connid );
    132 
    133     rc = forward_response( client, op, ber );
    134 
    135     op->o_res = LLOAD_OP_COMPLETED;
    136     if ( !op->o_pin_id ) {
    137         OPERATION_UNLINK(op);
    138     }
    139 
    140     return rc;
    141 }
    142 
    143 static int
    144 handle_unsolicited( LloadConnection *c, BerElement *ber )
    145 {
    146     CONNECTION_ASSERT_LOCKED(c);
    147 
    148     assert( c->c_state != LLOAD_C_INVALID );
    149     if ( c->c_state == LLOAD_C_DYING ) {
    150         CONNECTION_UNLOCK(c);
    151         goto out;
    152     }
    153     c->c_state = LLOAD_C_CLOSING;
    154 
    155     Debug( LDAP_DEBUG_STATS, "handle_unsolicited: "
    156             "teardown for upstream connection connid=%lu\n",
    157             c->c_connid );
    158 
    159     CONNECTION_DESTROY(c);
    160 
    161 out:
    162     ber_free( ber, 1 );
    163     return -1;
    164 }
    165 
    166 /*
    167  * Pull c->c_currentber from the connection and try to look up the operation on
    168  * the upstream.
    169  *
    170  * If it's a notice of disconnection, we won't find it and need to tear down
    171  * the connection and tell the clients, if we can't find the operation, ignore
    172  * the message (either client already disconnected/abandoned it or the upstream
    173  * is pulling our leg).
    174  *
    175  * Some responses need special handling:
    176  * - Bind response
    177  * - VC response where the client requested a Bind (both need to update the
    178  *   client's bind status)
    179  * - search entries/referrals and intermediate responses (will not trigger
    180  *   operation to be removed)
    181  *
    182  * If the worker pool is overloaded, we might be called directly from
    183  * the read callback, at that point, the connection hasn't been muted.
    184  *
    185  * TODO: when the client already has data pending on write, we should mute the
    186  * upstream.
    187  * - should record the BerElement on the Op and the Op on the client
    188  *
    189  * The following hold on entering any of the handlers:
    190  * - op->o_upstream_refcnt > 0
    191  * - op->o_upstream->c_refcnt > 0
    192  * - op->o_client->c_refcnt > 0
    193  */
    194 static int
    195 handle_one_response( LloadConnection *c )
    196 {
    197     BerElement *ber;
    198     LloadOperation *op = NULL, needle = { .o_upstream_connid = c->c_connid };
    199     LloadOperationHandler handler = NULL;
    200     ber_tag_t tag;
    201     ber_len_t len;
    202     int rc = LDAP_SUCCESS;
    203 
    204     ber = c->c_currentber;
    205     c->c_currentber = NULL;
    206 
    207     tag = ber_get_int( ber, &needle.o_upstream_msgid );
    208     if ( tag != LDAP_TAG_MSGID ) {
    209         rc = -1;
    210         ber_free( ber, 1 );
    211         goto fail;
    212     }
    213 
    214     CONNECTION_LOCK(c);
    215     if ( needle.o_upstream_msgid == 0 ) {
    216         return handle_unsolicited( c, ber );
    217     } else if ( !( op = ldap_tavl_find(
    218                            c->c_ops, &needle, operation_upstream_cmp ) ) ) {
    219         /* Already abandoned, do nothing */
    220         CONNECTION_UNLOCK(c);
    221         ber_free( ber, 1 );
    222         return rc;
    223         /*
    224     } else if ( op->o_response_pending ) {
    225         c->c_pendingop = op;
    226         event_del( c->c_read_event );
    227     */
    228     } else {
    229         CONNECTION_UNLOCK(c);
    230         /*
    231         op->o_response_pending = ber;
    232         */
    233 
    234         tag = ber_peek_tag( ber, &len );
    235         switch ( tag ) {
    236             case LDAP_RES_SEARCH_ENTRY:
    237             case LDAP_RES_SEARCH_REFERENCE:
    238             case LDAP_RES_INTERMEDIATE:
    239                 handler = forward_response;
    240                 break;
    241             case LDAP_RES_BIND:
    242                 handler = handle_bind_response;
    243                 break;
    244             case LDAP_RES_EXTENDED:
    245                 if ( op->o_tag == LDAP_REQ_BIND ) {
    246 #ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS
    247                     if ( lload_features & LLOAD_FEATURE_VC ) {
    248                         handler = handle_vc_bind_response;
    249                     } else
    250 #endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */
    251                     {
    252                         handler = handle_whoami_response;
    253                     }
    254                 }
    255                 break;
    256         }
    257         if ( !handler ) {
    258             handler = forward_final_response;
    259         }
    260     }
    261     if ( op ) {
    262         struct timeval tv, tvdiff;
    263         uintptr_t diff;
    264 
    265         gettimeofday( &tv, NULL );
    266         if ( !timerisset( &op->o_last_response ) ) {
    267             LloadBackend *b = c->c_backend;
    268 
    269             timersub( &tv, &op->o_start, &tvdiff );
    270             diff = 1000000 * tvdiff.tv_sec + tvdiff.tv_usec;
    271 
    272             __atomic_add_fetch( &b->b_operation_count, 1, __ATOMIC_RELAXED );
    273             __atomic_add_fetch( &b->b_operation_time, diff, __ATOMIC_RELAXED );
    274         }
    275         op->o_last_response = tv;
    276 
    277         Debug( LDAP_DEBUG_STATS2, "handle_one_response: "
    278                 "upstream connid=%lu, processing response for "
    279                 "client connid=%lu, msgid=%d\n",
    280                 c->c_connid, op->o_client_connid, op->o_client_msgid );
    281     } else {
    282         tag = ber_peek_tag( ber, &len );
    283         Debug( LDAP_DEBUG_STATS2, "handle_one_response: "
    284                 "upstream connid=%lu, %s, msgid=%d not for a pending "
    285                 "operation\n",
    286                 c->c_connid, lload_msgtype2str( tag ),
    287                 needle.o_upstream_msgid );
    288     }
    289 
    290     if ( handler ) {
    291         LloadConnection *client;
    292 
    293         checked_lock( &op->o_link_mutex );
    294         client = op->o_client;
    295         checked_unlock( &op->o_link_mutex );
    296         if ( client && IS_ALIVE( client, c_live ) ) {
    297             rc = handler( client, op, ber );
    298         } else {
    299             ber_free( ber, 1 );
    300         }
    301     } else {
    302         assert(0);
    303         ber_free( ber, 1 );
    304     }
    305 
    306 fail:
    307     if ( rc ) {
    308         Debug( LDAP_DEBUG_STATS, "handle_one_response: "
    309                 "error on processing a response (%s) on upstream connection "
    310                 "connid=%ld, tag=%lx\n",
    311                 lload_msgtype2str( tag ), c->c_connid, tag );
    312         CONNECTION_LOCK_DESTROY(c);
    313     }
    314     return rc;
    315 }
    316 
    317 #ifdef HAVE_CYRUS_SASL
    318 static int
    319 sasl_bind_step( LloadConnection *c, BerValue *scred, BerValue *ccred )
    320 {
    321     LloadBackend *b = c->c_backend;
    322     sasl_conn_t *ctx = c->c_sasl_authctx;
    323     sasl_interact_t *prompts = NULL;
    324     unsigned credlen;
    325     int rc = -1;
    326 
    327     if ( !ctx ) {
    328         const char *mech = NULL;
    329 #ifdef HAVE_TLS
    330         void *ssl;
    331 #endif /* HAVE_TLS */
    332 
    333         if ( sasl_client_new( "ldap", b->b_host, NULL, NULL, client_callbacks,
    334                      0, &ctx ) != SASL_OK ) {
    335             goto done;
    336         }
    337         c->c_sasl_authctx = ctx;
    338 
    339         assert( c->c_sasl_defaults == NULL );
    340         c->c_sasl_defaults =
    341                 lutil_sasl_defaults( NULL, bindconf.sb_saslmech.bv_val,
    342                         bindconf.sb_realm.bv_val, bindconf.sb_authcId.bv_val,
    343                         bindconf.sb_cred.bv_val, bindconf.sb_authzId.bv_val );
    344 
    345 #ifdef HAVE_TLS
    346         /* Check for TLS */
    347         ssl = ldap_pvt_tls_sb_ctx( c->c_sb );
    348         if ( ssl ) {
    349             struct berval authid = BER_BVNULL;
    350             ber_len_t ssf;
    351 
    352             ssf = ldap_pvt_tls_get_strength( ssl );
    353             (void)ldap_pvt_tls_get_my_dn( ssl, &authid, NULL, 0 );
    354 
    355             sasl_setprop( ctx, SASL_SSF_EXTERNAL, &ssf );
    356             sasl_setprop( ctx, SASL_AUTH_EXTERNAL, authid.bv_val );
    357             ch_free( authid.bv_val );
    358 #ifdef SASL_CHANNEL_BINDING /* 2.1.25+ */
    359             {
    360                 char cbinding[64];
    361                 struct berval cbv = { sizeof(cbinding), cbinding };
    362                 if ( ldap_pvt_tls_get_unique( ssl, &cbv, 0 ) ) {
    363                     sasl_channel_binding_t *cb =
    364                             ch_malloc( sizeof(*cb) + cbv.bv_len );
    365                     void *cb_data;
    366                     cb->name = "ldap";
    367                     cb->critical = 0;
    368                     cb->len = cbv.bv_len;
    369                     cb->data = cb_data = cb + 1;
    370                     memcpy( cb_data, cbv.bv_val, cbv.bv_len );
    371                     sasl_setprop( ctx, SASL_CHANNEL_BINDING, cb );
    372                     c->c_sasl_cbinding = cb;
    373                 }
    374             }
    375 #endif
    376         }
    377 #endif
    378 
    379 #if !defined(_WIN32)
    380         /* Check for local */
    381         if ( b->b_proto == LDAP_PROTO_IPC ) {
    382             char authid[sizeof( "gidNumber=4294967295+uidNumber=4294967295,"
    383                                 "cn=peercred,cn=external,cn=auth" )];
    384             int ssf = LDAP_PVT_SASL_LOCAL_SSF;
    385 
    386             sprintf( authid,
    387                     "gidNumber=%u+uidNumber=%u,"
    388                     "cn=peercred,cn=external,cn=auth",
    389                     getegid(), geteuid() );
    390             sasl_setprop( ctx, SASL_SSF_EXTERNAL, &ssf );
    391             sasl_setprop( ctx, SASL_AUTH_EXTERNAL, authid );
    392         }
    393 #endif
    394 
    395         do {
    396             rc = sasl_client_start( ctx, bindconf.sb_saslmech.bv_val,
    397                     &prompts,
    398                     (const char **)&ccred->bv_val, &credlen,
    399                     &mech );
    400 
    401             if ( rc == SASL_INTERACT ) {
    402                 if ( lutil_sasl_interact( NULL, LDAP_SASL_QUIET,
    403                              c->c_sasl_defaults, prompts ) ) {
    404                     break;
    405                 }
    406             }
    407         } while ( rc == SASL_INTERACT );
    408 
    409         ber_str2bv( mech, 0, 0, &c->c_sasl_bind_mech );
    410     } else {
    411         assert( c->c_sasl_defaults );
    412 
    413         do {
    414             rc = sasl_client_step( ctx,
    415                     (scred == NULL) ? NULL : scred->bv_val,
    416                     (scred == NULL) ? 0 : scred->bv_len,
    417                     &prompts,
    418                     (const char **)&ccred->bv_val, &credlen);
    419 
    420             if ( rc == SASL_INTERACT ) {
    421                 if ( lutil_sasl_interact( NULL, LDAP_SASL_QUIET,
    422                              c->c_sasl_defaults, prompts ) ) {
    423                     break;
    424                 }
    425             }
    426         } while ( rc == SASL_INTERACT );
    427     }
    428 
    429     if ( rc == SASL_OK ) {
    430         sasl_ssf_t *ssf;
    431         rc = sasl_getprop( ctx, SASL_SSF, (const void **)(char *)&ssf );
    432         if ( rc == SASL_OK && ssf && *ssf ) {
    433             Debug( LDAP_DEBUG_CONNS, "sasl_bind_step: "
    434                     "connid=%lu mech=%s setting up a new SASL security layer\n",
    435                     c->c_connid, c->c_sasl_bind_mech.bv_val );
    436             ldap_pvt_sasl_install( c->c_sb, ctx );
    437         }
    438     }
    439     ccred->bv_len = credlen;
    440 
    441 done:
    442     Debug( LDAP_DEBUG_TRACE, "sasl_bind_step: "
    443             "connid=%lu next step for SASL bind mech=%s rc=%d\n",
    444             c->c_connid, c->c_sasl_bind_mech.bv_val, rc );
    445     return rc;
    446 }
    447 #endif /* HAVE_CYRUS_SASL */
    448 
    449 int
    450 upstream_bind_cb( LloadConnection *c )
    451 {
    452     BerElement *ber = c->c_currentber;
    453     LloadBackend *b = c->c_backend;
    454     BerValue matcheddn, message;
    455     ber_tag_t tag;
    456     ber_int_t msgid, result;
    457 
    458     c->c_currentber = NULL;
    459 
    460     if ( ber_scanf( ber, "it", &msgid, &tag ) == LBER_ERROR ) {
    461         Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: "
    462                 "protocol violation from server\n" );
    463         goto fail;
    464     }
    465 
    466     if ( msgid != ( c->c_next_msgid - 1 ) || tag != LDAP_RES_BIND ) {
    467         Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: "
    468                 "unexpected %s from server, msgid=%d\n",
    469                 lload_msgtype2str( tag ), msgid );
    470         goto fail;
    471     }
    472 
    473     if ( ber_scanf( ber, "{emm" /* "}" */, &result, &matcheddn, &message ) ==
    474                  LBER_ERROR ) {
    475         Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: "
    476                 "response does not conform with a bind response\n" );
    477         goto fail;
    478     }
    479 
    480     switch ( result ) {
    481         case LDAP_SUCCESS:
    482 #ifdef HAVE_CYRUS_SASL
    483         case LDAP_SASL_BIND_IN_PROGRESS:
    484             if ( !BER_BVISNULL( &c->c_sasl_bind_mech ) ) {
    485                 BerValue scred = BER_BVNULL, ccred;
    486                 ber_len_t len;
    487                 int rc;
    488 
    489                 if ( ber_peek_tag( ber, &len ) == LDAP_TAG_SASL_RES_CREDS &&
    490                         ber_scanf( ber, "m", &scred ) == LBER_ERROR ) {
    491                     Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: "
    492                             "sasl bind response malformed\n" );
    493                     goto fail;
    494                 }
    495 
    496                 rc = sasl_bind_step( c, &scred, &ccred );
    497                 if ( rc != SASL_OK &&
    498                         ( rc != SASL_CONTINUE || result == LDAP_SUCCESS ) ) {
    499                     goto fail;
    500                 }
    501 
    502                 if ( result == LDAP_SASL_BIND_IN_PROGRESS ) {
    503                     BerElement *outber;
    504 
    505                     checked_lock( &c->c_io_mutex );
    506                     outber = c->c_pendingber;
    507                     if ( outber == NULL && (outber = ber_alloc()) == NULL ) {
    508                         checked_unlock( &c->c_io_mutex );
    509                         goto fail;
    510                     }
    511                     c->c_pendingber = outber;
    512 
    513                     msgid = c->c_next_msgid++;
    514                     ber_printf( outber, "{it{iOt{OON}N}}",
    515                             msgid, LDAP_REQ_BIND, LDAP_VERSION3,
    516                             &bindconf.sb_binddn, LDAP_AUTH_SASL,
    517                             &c->c_sasl_bind_mech, BER_BV_OPTIONAL( &ccred ) );
    518                     checked_unlock( &c->c_io_mutex );
    519 
    520                     connection_write_cb( -1, 0, c );
    521 
    522                     if ( rc == SASL_OK ) {
    523                         BER_BVZERO( &c->c_sasl_bind_mech );
    524                     }
    525                     break;
    526                 }
    527             }
    528             if ( result == LDAP_SASL_BIND_IN_PROGRESS ) {
    529                 goto fail;
    530             }
    531 #endif /* HAVE_CYRUS_SASL */
    532             CONNECTION_LOCK(c);
    533             c->c_pdu_cb = handle_one_response;
    534             c->c_state = LLOAD_C_READY;
    535             c->c_type = LLOAD_C_OPEN;
    536             c->c_read_timeout = NULL;
    537             Debug( LDAP_DEBUG_CONNS, "upstream_bind_cb: "
    538                     "connection connid=%lu for backend server '%s' is ready "
    539                     "for use\n",
    540                     c->c_connid, b->b_name.bv_val );
    541             CONNECTION_UNLOCK(c);
    542             checked_lock( &b->b_mutex );
    543             LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next );
    544             b->b_active++;
    545             b->b_opening--;
    546             b->b_failed = 0;
    547             if ( b->b_last_conn ) {
    548                 LDAP_CIRCLEQ_INSERT_AFTER(
    549                         &b->b_conns, b->b_last_conn, c, c_next );
    550             } else {
    551                 LDAP_CIRCLEQ_INSERT_HEAD( &b->b_conns, c, c_next );
    552             }
    553             b->b_last_conn = c;
    554             backend_retry( b );
    555             checked_unlock( &b->b_mutex );
    556             break;
    557         default:
    558             Debug( LDAP_DEBUG_ANY, "upstream_bind_cb: "
    559                     "upstream bind failed, rc=%d, message='%s'\n",
    560                     result, message.bv_val );
    561             goto fail;
    562     }
    563 
    564     checked_lock( &c->c_io_mutex );
    565     c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
    566     checked_unlock( &c->c_io_mutex );
    567     event_add( c->c_read_event, c->c_read_timeout );
    568     ber_free( ber, 1 );
    569     return -1;
    570 
    571 fail:
    572     CONNECTION_LOCK_DESTROY(c);
    573     ber_free( ber, 1 );
    574     return -1;
    575 }
    576 
    577 void *
    578 upstream_bind( void *ctx, void *arg )
    579 {
    580     LloadConnection *c = arg;
    581     BerElement *ber;
    582     ber_int_t msgid;
    583 
    584     /* A reference was passed on to us */
    585     assert( IS_ALIVE( c, c_refcnt ) );
    586 
    587     if ( !IS_ALIVE( c, c_live ) ) {
    588         RELEASE_REF( c, c_refcnt, c->c_destroy );
    589         return NULL;
    590     }
    591 
    592     CONNECTION_LOCK(c);
    593     assert( !event_pending( c->c_read_event, EV_READ, NULL ) );
    594     c->c_pdu_cb = upstream_bind_cb;
    595     CONNECTION_UNLOCK(c);
    596 
    597     checked_lock( &c->c_io_mutex );
    598     ber = c->c_pendingber;
    599     if ( ber == NULL && (ber = ber_alloc()) == NULL ) {
    600         goto fail;
    601     }
    602     c->c_pendingber = ber;
    603     msgid = c->c_next_msgid++;
    604 
    605     if ( bindconf.sb_method == LDAP_AUTH_SIMPLE ) {
    606         /* simple bind */
    607         ber_printf( ber, "{it{iOtON}}",
    608                 msgid, LDAP_REQ_BIND, LDAP_VERSION3,
    609                 &bindconf.sb_binddn, LDAP_AUTH_SIMPLE,
    610                 &bindconf.sb_cred );
    611 
    612 #ifdef HAVE_CYRUS_SASL
    613     } else {
    614         BerValue cred;
    615         int rc;
    616 
    617         rc = sasl_bind_step( c, NULL, &cred );
    618         if ( rc != SASL_OK && rc != SASL_CONTINUE ) {
    619             goto fail;
    620         }
    621 
    622         ber_printf( ber, "{it{iOt{OON}N}}",
    623                 msgid, LDAP_REQ_BIND, LDAP_VERSION3,
    624                 &bindconf.sb_binddn, LDAP_AUTH_SASL,
    625                 &c->c_sasl_bind_mech, BER_BV_OPTIONAL( &cred ) );
    626 
    627         if ( rc == SASL_OK ) {
    628             BER_BVZERO( &c->c_sasl_bind_mech );
    629         }
    630 #endif /* HAVE_CYRUS_SASL */
    631     }
    632     /* TODO: can we be paused at this point? Then we'd have to move this line
    633      * after connection_write_cb */
    634     c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
    635     checked_unlock( &c->c_io_mutex );
    636 
    637     connection_write_cb( -1, 0, c );
    638 
    639     CONNECTION_LOCK(c);
    640     c->c_read_timeout = lload_timeout_net;
    641     event_add( c->c_read_event, c->c_read_timeout );
    642     CONNECTION_UNLOCK(c);
    643 
    644     RELEASE_REF( c, c_refcnt, c->c_destroy );
    645     return NULL;
    646 
    647 fail:
    648     checked_unlock( &c->c_io_mutex );
    649     CONNECTION_LOCK_DESTROY(c);
    650     RELEASE_REF( c, c_refcnt, c->c_destroy );
    651     return NULL;
    652 }
    653 
    654 /*
    655  * The backend is already locked when entering the function.
    656  */
    657 static int
    658 upstream_finish( LloadConnection *c )
    659 {
    660     LloadBackend *b = c->c_backend;
    661     int is_bindconn = 0;
    662 
    663     assert_locked( &b->b_mutex );
    664     CONNECTION_ASSERT_LOCKED(c);
    665     assert( c->c_live );
    666     c->c_pdu_cb = handle_one_response;
    667 
    668     /* Unless we are configured to use the VC exop, consider allocating the
    669      * connection into the bind conn pool. Start off by allocating one for
    670      * general use, then one for binds, then we start filling up the general
    671      * connection pool, finally the bind pool */
    672     if (
    673 #ifdef LDAP_API_FEATURE_VERIFY_CREDENTIALS
    674             !(lload_features & LLOAD_FEATURE_VC) &&
    675 #endif /* LDAP_API_FEATURE_VERIFY_CREDENTIALS */
    676             b->b_active && b->b_numbindconns ) {
    677         if ( !b->b_bindavail ) {
    678             is_bindconn = 1;
    679         } else if ( b->b_active >= b->b_numconns &&
    680                 b->b_bindavail < b->b_numbindconns ) {
    681             is_bindconn = 1;
    682         }
    683     }
    684 
    685     if ( is_bindconn ) {
    686         LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next );
    687         c->c_state = LLOAD_C_READY;
    688         c->c_type = LLOAD_C_BIND;
    689         b->b_bindavail++;
    690         b->b_opening--;
    691         b->b_failed = 0;
    692         if ( b->b_last_bindconn ) {
    693             LDAP_CIRCLEQ_INSERT_AFTER(
    694                     &b->b_bindconns, b->b_last_bindconn, c, c_next );
    695         } else {
    696             LDAP_CIRCLEQ_INSERT_HEAD( &b->b_bindconns, c, c_next );
    697         }
    698         b->b_last_bindconn = c;
    699     } else if ( bindconf.sb_method == LDAP_AUTH_NONE ) {
    700         LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next );
    701         c->c_state = LLOAD_C_READY;
    702         c->c_type = LLOAD_C_OPEN;
    703         b->b_active++;
    704         b->b_opening--;
    705         b->b_failed = 0;
    706         if ( b->b_last_conn ) {
    707             LDAP_CIRCLEQ_INSERT_AFTER( &b->b_conns, b->b_last_conn, c, c_next );
    708         } else {
    709             LDAP_CIRCLEQ_INSERT_HEAD( &b->b_conns, c, c_next );
    710         }
    711         b->b_last_conn = c;
    712     } else {
    713         if ( ldap_pvt_thread_pool_submit(
    714                      &connection_pool, upstream_bind, c ) ) {
    715             Debug( LDAP_DEBUG_ANY, "upstream_finish: "
    716                     "failed to set up a bind callback for connid=%lu\n",
    717                     c->c_connid );
    718             return -1;
    719         }
    720         /* keep a reference for upstream_bind */
    721         acquire_ref( &c->c_refcnt );
    722 
    723         Debug( LDAP_DEBUG_CONNS, "upstream_finish: "
    724                 "scheduled a bind callback for connid=%lu\n",
    725                 c->c_connid );
    726         return LDAP_SUCCESS;
    727     }
    728     event_add( c->c_read_event, c->c_read_timeout );
    729 
    730     Debug( LDAP_DEBUG_CONNS, "upstream_finish: "
    731             "%sconnection connid=%lu for backend server '%s' is ready for "
    732             "use\n",
    733             is_bindconn ? "bind " : "", c->c_connid, b->b_name.bv_val );
    734 
    735     backend_retry( b );
    736     return LDAP_SUCCESS;
    737 }
    738 
    739 #ifdef HAVE_TLS
    740 static void
    741 upstream_tls_handshake_cb( evutil_socket_t s, short what, void *arg )
    742 {
    743     LloadConnection *c = arg;
    744     LloadBackend *b;
    745     epoch_t epoch;
    746     int rc = LDAP_SUCCESS;
    747 
    748     CONNECTION_LOCK(c);
    749     if ( what & EV_TIMEOUT ) {
    750         Debug( LDAP_DEBUG_CONNS, "upstream_tls_handshake_cb: "
    751                 "connid=%lu, timeout reached, destroying\n",
    752                 c->c_connid );
    753         goto fail;
    754     }
    755     b = c->c_backend;
    756 
    757     rc = ldap_pvt_tls_connect( lload_tls_backend_ld, c->c_sb, b->b_host );
    758     if ( rc < 0 ) {
    759         goto fail;
    760     }
    761 
    762     if ( rc == 0 ) {
    763         struct event_base *base = event_get_base( c->c_read_event );
    764 
    765         /*
    766          * We're finished, replace the callbacks
    767          *
    768          * This is deadlock-safe, since both share the same base - the one
    769          * that's just running us.
    770          */
    771         event_del( c->c_read_event );
    772         event_del( c->c_write_event );
    773 
    774         c->c_read_timeout = NULL;
    775         event_assign( c->c_read_event, base, c->c_fd, EV_READ|EV_PERSIST,
    776                 connection_read_cb, c );
    777         event_assign( c->c_write_event, base, c->c_fd, EV_WRITE,
    778                 connection_write_cb, c );
    779         Debug( LDAP_DEBUG_CONNS, "upstream_tls_handshake_cb: "
    780                 "connid=%lu finished\n",
    781                 c->c_connid );
    782         c->c_is_tls = LLOAD_TLS_ESTABLISHED;
    783 
    784         CONNECTION_UNLOCK(c);
    785         checked_lock( &b->b_mutex );
    786         CONNECTION_LOCK(c);
    787 
    788         rc = upstream_finish( c );
    789         checked_unlock( &b->b_mutex );
    790 
    791         if ( rc ) {
    792             goto fail;
    793         }
    794     } else if ( ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_NEEDS_WRITE, NULL ) ) {
    795         event_add( c->c_write_event, lload_write_timeout );
    796         Debug( LDAP_DEBUG_CONNS, "upstream_tls_handshake_cb: "
    797                 "connid=%lu need write rc=%d\n",
    798                 c->c_connid, rc );
    799     }
    800     CONNECTION_UNLOCK(c);
    801     return;
    802 
    803 fail:
    804     Debug( LDAP_DEBUG_CONNS, "upstream_tls_handshake_cb: "
    805             "connid=%lu failed rc=%d\n",
    806             c->c_connid, rc );
    807 
    808     assert( c->c_ops == NULL );
    809     epoch = epoch_join();
    810     CONNECTION_DESTROY(c);
    811     epoch_leave( epoch );
    812 }
    813 
    814 static int
    815 upstream_starttls( LloadConnection *c )
    816 {
    817     BerValue matcheddn, message, responseOid,
    818              startTLSOid = BER_BVC(LDAP_EXOP_START_TLS);
    819     BerElement *ber = c->c_currentber;
    820     struct event_base *base;
    821     ber_int_t msgid, result;
    822     ber_tag_t tag;
    823 
    824     c->c_currentber = NULL;
    825     CONNECTION_LOCK(c);
    826 
    827     if ( ber_scanf( ber, "it", &msgid, &tag ) == LBER_ERROR ) {
    828         Debug( LDAP_DEBUG_ANY, "upstream_starttls: "
    829                 "protocol violation from server\n" );
    830         goto fail;
    831     }
    832 
    833     if ( msgid != ( c->c_next_msgid - 1 ) || tag != LDAP_RES_EXTENDED ) {
    834         Debug( LDAP_DEBUG_ANY, "upstream_starttls: "
    835                 "unexpected %s from server, msgid=%d\n",
    836                 lload_msgtype2str( tag ), msgid );
    837         goto fail;
    838     }
    839 
    840     if ( ber_scanf( ber, "{emm}", &result, &matcheddn, &message ) ==
    841                  LBER_ERROR ) {
    842         Debug( LDAP_DEBUG_ANY, "upstream_starttls: "
    843                 "protocol violation on StartTLS response\n" );
    844         goto fail;
    845     }
    846 
    847     if ( (tag = ber_get_tag( ber )) != LBER_DEFAULT ) {
    848         if ( tag != LDAP_TAG_EXOP_RES_OID ||
    849                 ber_scanf( ber, "{m}", &responseOid ) == LBER_DEFAULT ) {
    850             Debug( LDAP_DEBUG_ANY, "upstream_starttls: "
    851                     "protocol violation on StartTLS response\n" );
    852             goto fail;
    853         }
    854 
    855         if ( ber_bvcmp( &responseOid, &startTLSOid ) ) {
    856             Debug( LDAP_DEBUG_ANY, "upstream_starttls: "
    857                     "oid=%s not a StartTLS response\n",
    858                     responseOid.bv_val );
    859             goto fail;
    860         }
    861     }
    862 
    863     if ( result != LDAP_SUCCESS ) {
    864         LloadBackend *b = c->c_backend;
    865         int rc;
    866 
    867         Debug( LDAP_DEBUG_STATS, "upstream_starttls: "
    868                 "server doesn't support StartTLS rc=%d message='%s'%s\n",
    869                 result, message.bv_val,
    870                 (c->c_is_tls == LLOAD_STARTTLS_OPTIONAL) ? ", ignored" : "" );
    871         if ( c->c_is_tls != LLOAD_STARTTLS_OPTIONAL ) {
    872             goto fail;
    873         }
    874         c->c_is_tls = LLOAD_CLEARTEXT;
    875 
    876         CONNECTION_UNLOCK(c);
    877         checked_lock( &b->b_mutex );
    878         CONNECTION_LOCK(c);
    879 
    880         rc = upstream_finish( c );
    881         checked_unlock( &b->b_mutex );
    882 
    883         if ( rc ) {
    884             goto fail;
    885         }
    886 
    887         ber_free( ber, 1 );
    888         CONNECTION_UNLOCK(c);
    889 
    890         checked_lock( &c->c_io_mutex );
    891         c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
    892         checked_unlock( &c->c_io_mutex );
    893 
    894         /* Do not keep handle_pdus running, we have adjusted c_read_event as we
    895          * need it. */
    896         return -1;
    897     }
    898 
    899     base = event_get_base( c->c_read_event );
    900 
    901     c->c_io_state &= ~LLOAD_C_READ_HANDOVER;
    902     event_del( c->c_read_event );
    903     event_del( c->c_write_event );
    904 
    905     c->c_read_timeout = lload_timeout_net;
    906     event_assign( c->c_read_event, base, c->c_fd, EV_READ|EV_PERSIST,
    907             upstream_tls_handshake_cb, c );
    908     event_assign( c->c_write_event, base, c->c_fd, EV_WRITE,
    909             upstream_tls_handshake_cb, c );
    910 
    911     event_add( c->c_read_event, c->c_read_timeout );
    912     event_add( c->c_write_event, lload_write_timeout );
    913 
    914     CONNECTION_UNLOCK(c);
    915 
    916     ber_free( ber, 1 );
    917     return -1;
    918 
    919 fail:
    920     ber_free( ber, 1 );
    921     CONNECTION_DESTROY(c);
    922     return -1;
    923 }
    924 #endif /* HAVE_TLS */
    925 
/*
 * Set up a new upstream connection on socket s for backend b.
 *
 * The connection is created through lload_connection_init(), queued on
 * b->b_preparing as LLOAD_C_PREPARING and given its read/write events.
 * What happens next depends on the backend's TLS mode:
 * - cleartext (or no TLS support compiled in): upstream_finish() is called
 *   right away,
 * - LLOAD_LDAPS: both events are pointed at upstream_tls_handshake_cb and
 *   armed,
 * - LLOAD_STARTTLS/LLOAD_STARTTLS_OPTIONAL: a StartTLS extended request is
 *   queued and written out, upstream_starttls handles the reply.
 *
 * Returns the connection (unlocked) on success, NULL on failure.
 *
 * We must already hold b->b_mutex when called.
 */
LloadConnection *
upstream_init( ber_socket_t s, LloadBackend *b )
{
    LloadConnection *c;
    struct event_base *base = lload_get_base( s );
    struct event *event;
    int flags;

    assert( b != NULL );

    flags = (b->b_proto == LDAP_PROTO_IPC) ? CONN_IS_IPC : 0;
    if ( (c = lload_connection_init( s, b->b_host, flags )) == NULL ) {
        return NULL;
    }

    CONNECTION_LOCK(c);
    c->c_backend = b;
#ifdef HAVE_TLS
    c->c_is_tls = b->b_tls;
#endif
    c->c_pdu_cb = handle_one_response;

    /* Track the connection as one still being prepared */
    LDAP_CIRCLEQ_INSERT_HEAD( &b->b_preparing, c, c_next );
    c->c_type = LLOAD_C_PREPARING;

    {
        /* Apply the upstream-specific limit on incoming PDU size */
        ber_len_t max = sockbuf_max_incoming_upstream;
        ber_sockbuf_ctrl( c->c_sb, LBER_SB_OPT_SET_MAX_INCOMING, &max );
    }

    event = event_new( base, s, EV_READ|EV_PERSIST, connection_read_cb, c );
    if ( !event ) {
        Debug( LDAP_DEBUG_ANY, "upstream_init: "
                "Read event could not be allocated\n" );
        goto fail;
    }
    c->c_read_event = event;

    event = event_new( base, s, EV_WRITE, connection_write_cb, c );
    if ( !event ) {
        Debug( LDAP_DEBUG_ANY, "upstream_init: "
                "Write event could not be allocated\n" );
        goto fail;
    }
    /* We only add the write event when we have data pending */
    c->c_write_event = event;

#ifdef BALANCER_MODULE
    if ( b->b_monitor ) {
        /*
         * Both locks are dropped around the monitor entry creation; hold a
         * reference for the duration of the unlocked section.
         */
        acquire_ref( &c->c_refcnt );
        CONNECTION_UNLOCK(c);
        checked_unlock( &b->b_mutex );
        if ( lload_monitor_conn_entry_create( c, b->b_monitor ) ) {
            RELEASE_REF( c, c_refcnt, c->c_destroy );
            checked_lock( &b->b_mutex );
            CONNECTION_LOCK(c);
            goto fail;
        }
        /* Retake the locks backend first, then connection */
        checked_lock( &b->b_mutex );
        CONNECTION_LOCK(c);
        RELEASE_REF( c, c_refcnt, c->c_destroy );
    }
#endif /* BALANCER_MODULE */

    c->c_destroy = upstream_destroy;
    c->c_unlink = upstream_unlink;

#ifdef HAVE_TLS
    if ( c->c_is_tls == LLOAD_CLEARTEXT ) {
#endif /* HAVE_TLS */
        /* No TLS negotiation needed, finish the setup right away */
        if ( upstream_finish( c ) ) {
            goto fail;
        }
#ifdef HAVE_TLS
    } else if ( c->c_is_tls == LLOAD_LDAPS ) {
        /* The handshake callback drives both events from here on */
        event_assign( c->c_read_event, base, s, EV_READ|EV_PERSIST,
                upstream_tls_handshake_cb, c );
        event_add( c->c_read_event, c->c_read_timeout );
        event_assign( c->c_write_event, base, s, EV_WRITE,
                upstream_tls_handshake_cb, c );
        event_add( c->c_write_event, lload_write_timeout );
    } else if ( c->c_is_tls == LLOAD_STARTTLS ||
            c->c_is_tls == LLOAD_STARTTLS_OPTIONAL ) {
        BerElement *output;

        /* Queue the StartTLS extended request as the pending PDU */
        checked_lock( &c->c_io_mutex );
        if ( (output = c->c_pendingber = ber_alloc()) == NULL ) {
            checked_unlock( &c->c_io_mutex );
            goto fail;
        }
        ber_printf( output, "t{tit{ts}}", LDAP_TAG_MESSAGE,
                LDAP_TAG_MSGID, c->c_next_msgid++,
                LDAP_REQ_EXTENDED,
                LDAP_TAG_EXOP_REQ_OID, LDAP_EXOP_START_TLS );
        checked_unlock( &c->c_io_mutex );

        /* The reply will be processed by upstream_starttls */
        c->c_pdu_cb = upstream_starttls;
        /* The write callback is invoked with the connection unlocked */
        CONNECTION_UNLOCK(c);
        connection_write_cb( s, 0, c );
        CONNECTION_LOCK(c);
        /* Only start reading if the connection survived the write */
        if ( IS_ALIVE( c, c_live ) ) {
            event_add( c->c_read_event, c->c_read_timeout );
        }
    }
#endif /* HAVE_TLS */
    CONNECTION_UNLOCK(c);

    return c;

fail:
    if ( !IS_ALIVE( c, c_live ) ) {
        /*
         * Released while we were unlocked, it's scheduled for destruction
         * already
         */
        return NULL;
    }

    /* Tear down whichever events were created before the failure */
    if ( c->c_write_event ) {
        event_del( c->c_write_event );
        event_free( c->c_write_event );
    }
    if ( c->c_read_event ) {
        event_del( c->c_read_event );
        event_free( c->c_read_event );
    }

    /* Mark the connection invalid and drop its counts before destroying it */
    c->c_state = LLOAD_C_INVALID;
    c->c_live--;
    c->c_refcnt--;
    connection_destroy( c );

    return NULL;
}
   1063 
   1064 static void
   1065 upstream_unlink( LloadConnection *c )
   1066 {
   1067     LloadBackend *b = c->c_backend;
   1068     struct event *read_event, *write_event;
   1069     TAvlnode *root, *linked_root;
   1070     long freed, executing;
   1071 
   1072     Debug( LDAP_DEBUG_CONNS, "upstream_unlink: "
   1073             "removing upstream connid=%lu\n",
   1074             c->c_connid );
   1075     CONNECTION_ASSERT_LOCKED(c);
   1076 
   1077     assert( c->c_state != LLOAD_C_INVALID );
   1078     assert( c->c_state != LLOAD_C_DYING );
   1079 
   1080     c->c_state = LLOAD_C_DYING;
   1081 
   1082     read_event = c->c_read_event;
   1083     write_event = c->c_write_event;
   1084 
   1085     root = c->c_ops;
   1086     c->c_ops = NULL;
   1087     executing = c->c_n_ops_executing;
   1088     c->c_n_ops_executing = 0;
   1089 
   1090     linked_root = c->c_linked;
   1091     c->c_linked = NULL;
   1092 
   1093     CONNECTION_UNLOCK(c);
   1094 
   1095     freed = ldap_tavl_free( root, (AVL_FREE)operation_lost_upstream );
   1096     assert( freed == executing );
   1097 
   1098     ldap_tavl_free( linked_root, (AVL_FREE)linked_upstream_lost );
   1099 
   1100     /*
   1101      * Avoid a deadlock:
   1102      * event_del will block if the event is currently executing its callback,
   1103      * that callback might be waiting to lock c->c_mutex
   1104      */
   1105     if ( read_event ) {
   1106         event_del( read_event );
   1107     }
   1108 
   1109     if ( write_event ) {
   1110         event_del( write_event );
   1111     }
   1112 
   1113     checked_lock( &b->b_mutex );
   1114     if ( c->c_type == LLOAD_C_PREPARING ) {
   1115         LDAP_CIRCLEQ_REMOVE( &b->b_preparing, c, c_next );
   1116         b->b_opening--;
   1117         b->b_failed++;
   1118     } else if ( c->c_type == LLOAD_C_BIND ) {
   1119         if ( c == b->b_last_bindconn ) {
   1120             LloadConnection *prev =
   1121                     LDAP_CIRCLEQ_LOOP_PREV( &b->b_bindconns, c, c_next );
   1122             if ( prev == c ) {
   1123                 b->b_last_bindconn = NULL;
   1124             } else {
   1125                 b->b_last_bindconn = prev;
   1126             }
   1127         }
   1128         LDAP_CIRCLEQ_REMOVE( &b->b_bindconns, c, c_next );
   1129         b->b_bindavail--;
   1130     } else {
   1131         if ( c == b->b_last_conn ) {
   1132             LloadConnection *prev =
   1133                     LDAP_CIRCLEQ_LOOP_PREV( &b->b_conns, c, c_next );
   1134             if ( prev == c ) {
   1135                 b->b_last_conn = NULL;
   1136             } else {
   1137                 b->b_last_conn = prev;
   1138             }
   1139         }
   1140         LDAP_CIRCLEQ_REMOVE( &b->b_conns, c, c_next );
   1141         b->b_active--;
   1142     }
   1143     b->b_n_ops_executing -= executing;
   1144     backend_retry( b );
   1145     checked_unlock( &b->b_mutex );
   1146 
   1147     CONNECTION_LOCK(c);
   1148     CONNECTION_ASSERT_LOCKED(c);
   1149 }
   1150 
   1151 void
   1152 upstream_destroy( LloadConnection *c )
   1153 {
   1154     Debug( LDAP_DEBUG_CONNS, "upstream_destroy: "
   1155             "freeing connection connid=%lu\n",
   1156             c->c_connid );
   1157 
   1158     CONNECTION_LOCK(c);
   1159     assert( c->c_state == LLOAD_C_DYING );
   1160 
   1161 #ifdef BALANCER_MODULE
   1162     /*
   1163      * Can't do this in upstream_unlink as that could be run from cn=monitor
   1164      * modify callback.
   1165      */
   1166     if ( !BER_BVISNULL( &c->c_monitor_dn ) ) {
   1167         lload_monitor_conn_unlink( c );
   1168     }
   1169 #endif /* BALANCER_MODULE */
   1170 
   1171     c->c_state = LLOAD_C_INVALID;
   1172 
   1173     assert( c->c_ops == NULL );
   1174 
   1175     if ( c->c_read_event ) {
   1176         event_free( c->c_read_event );
   1177         c->c_read_event = NULL;
   1178     }
   1179 
   1180     if ( c->c_write_event ) {
   1181         event_free( c->c_write_event );
   1182         c->c_write_event = NULL;
   1183     }
   1184 
   1185     if ( c->c_type != LLOAD_C_BIND ) {
   1186         BER_BVZERO( &c->c_sasl_bind_mech );
   1187     }
   1188     connection_destroy( c );
   1189 }
   1190