1 /* $NetBSD: syncprov.c,v 1.4 2025/09/05 21:16:32 christos Exp $ */ 2 3 /* $OpenLDAP$ */ 4 /* syncprov.c - syncrepl provider */ 5 /* This work is part of OpenLDAP Software <http://www.openldap.org/>. 6 * 7 * Copyright 2004-2024 The OpenLDAP Foundation. 8 * All rights reserved. 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted only as authorized by the OpenLDAP 12 * Public License. 13 * 14 * A copy of this license is available in the file LICENSE in the 15 * top-level directory of the distribution or, alternatively, at 16 * <http://www.OpenLDAP.org/license.html>. 17 */ 18 /* ACKNOWLEDGEMENTS: 19 * This work was initially developed by Howard Chu for inclusion in 20 * OpenLDAP Software. 21 */ 22 23 #include <sys/cdefs.h> 24 __RCSID("$NetBSD: syncprov.c,v 1.4 2025/09/05 21:16:32 christos Exp $"); 25 26 #include "portable.h" 27 28 #ifdef SLAPD_OVER_SYNCPROV 29 30 #include <ac/string.h> 31 #include "lutil.h" 32 #include "slap.h" 33 #include "slap-config.h" 34 #include "ldap_rq.h" 35 36 #ifdef LDAP_DEVEL 37 #define CHECK_CSN 1 38 #endif 39 40 /* A modify request on a particular entry */ 41 typedef struct modinst { 42 struct modinst *mi_next; 43 Operation *mi_op; 44 } modinst; 45 46 typedef struct modtarget { 47 struct modinst *mt_mods; 48 struct modinst *mt_tail; 49 struct berval mt_dn; 50 ldap_pvt_thread_mutex_t mt_mutex; 51 } modtarget; 52 53 /* All the info of a psearch result that's shared between 54 * multiple queues 55 */ 56 typedef struct resinfo { 57 struct syncres *ri_list; 58 Entry *ri_e; 59 struct berval ri_dn; 60 struct berval ri_ndn; 61 struct berval ri_uuid; 62 struct berval ri_csn; 63 struct berval ri_cookie; 64 char ri_isref; 65 ldap_pvt_thread_mutex_t ri_mutex; 66 } resinfo; 67 68 /* A queued result of a persistent search */ 69 typedef struct syncres { 70 struct syncres *s_next; /* list of results on this psearch queue */ 71 struct syncres *s_rilist; /* list of psearches using this result */ 72 resinfo *s_info; 73 char s_mode; 74 } syncres; 75 76 /* Record of a persistent search */ 77 typedef struct syncops { 78 struct syncops *s_next; 79 struct syncprov_info_t *s_si; 80 struct berval s_base; /* ndn of search base */ 81 ID s_eid; /* entryID of search base */ 82 Operation *s_op; /* search op */ 83 int s_rid; 84 int s_sid; 85 struct berval s_filterstr; 86 int s_flags; /* search status */ 87 #define PS_IS_REFRESHING 0x01 88 #define PS_IS_DETACHED 0x02 89 #define PS_WROTE_BASE 0x04 90 #define PS_FIND_BASE 0x08 91 #define PS_FIX_FILTER 0x10 92 #define PS_TASK_QUEUED 0x20 93 94 int s_inuse; /* reference count */ 95 struct syncres *s_res; 96 struct syncres *s_restail; 97 void *s_pool_cookie; 98 ldap_pvt_thread_mutex_t s_mutex; 99 } syncops; 100 101 /* A received sync control */ 102 typedef struct sync_control { 103 struct sync_cookie sr_state; 104 int sr_rhint; 105 } sync_control; 106 107 #if 0 /* moved back to slap.h */ 108 #define o_sync o_ctrlflag[slap_cids.sc_LDAPsync] 109 #endif 110 /* o_sync_mode uses data bits of o_sync */ 111 #define o_sync_mode o_ctrlflag[slap_cids.sc_LDAPsync] 112 113 #define SLAP_SYNC_NONE (LDAP_SYNC_NONE<<SLAP_CONTROL_SHIFT) 114 #define SLAP_SYNC_REFRESH (LDAP_SYNC_REFRESH_ONLY<<SLAP_CONTROL_SHIFT) 115 #define SLAP_SYNC_PERSIST (LDAP_SYNC_RESERVED<<SLAP_CONTROL_SHIFT) 116 #define SLAP_SYNC_REFRESH_AND_PERSIST (LDAP_SYNC_REFRESH_AND_PERSIST<<SLAP_CONTROL_SHIFT) 117 118 /* Record of which searches matched at premodify step */ 119 typedef struct syncmatches { 120 struct syncmatches *sm_next; 121 syncops *sm_op; 122 } syncmatches; 123 124 /* Session log data */ 125 typedef struct slog_entry { 126 struct berval se_uuid; 127 struct berval se_csn; 128 int se_sid; 129 ber_tag_t se_tag; 130 } slog_entry; 131 132 typedef struct sessionlog { 133 BerVarray sl_mincsn; 134 int *sl_sids; 135 int sl_numcsns; 136 int sl_num; 137 int sl_size; 138 int sl_playing; 139 TAvlnode *sl_entries; 140 ldap_pvt_thread_rdwr_t sl_mutex; 141 } sessionlog; 142 143 /* Accesslog callback data */ 144 typedef struct syncprov_accesslog_deletes { 145 Operation *op; 146 SlapReply *rs; 147 sync_control *srs; 148 BerVarray ctxcsn; 149 int numcsns, *sids; 150 Avlnode *uuids; 151 BerVarray uuid_list; 152 int ndel, list_len; 153 char *uuid_buf; 154 } syncprov_accesslog_deletes; 155 156 /* The main state for this overlay */ 157 typedef struct syncprov_info_t { 158 syncops *si_ops; 159 struct berval si_contextdn; 160 struct berval si_logbase; 161 BerVarray si_ctxcsn; /* ldapsync context */ 162 int *si_sids; 163 int si_numcsns; 164 int si_chkops; /* checkpointing info */ 165 int si_chktime; 166 int si_numops; /* number of ops since last checkpoint */ 167 int si_nopres; /* Skip present phase */ 168 int si_usehint; /* use reload hint */ 169 int si_active; /* True if there are active mods */ 170 int si_dirty; /* True if the context is dirty, i.e changes 171 * have been made without updating the csn. */ 172 time_t si_chklast; /* time of last checkpoint */ 173 Avlnode *si_mods; /* entries being modified */ 174 sessionlog *si_logs; 175 ldap_pvt_thread_rdwr_t si_csn_rwlock; 176 ldap_pvt_thread_mutex_t si_ops_mutex; 177 ldap_pvt_thread_mutex_t si_mods_mutex; 178 ldap_pvt_thread_mutex_t si_resp_mutex; 179 } syncprov_info_t; 180 181 typedef struct opcookie { 182 slap_overinst *son; 183 syncmatches *smatches; 184 modtarget *smt; 185 Entry *se; 186 struct berval sdn; /* DN of entry, for deletes */ 187 struct berval sndn; 188 struct berval suuid; /* UUID of entry */ 189 struct berval sctxcsn; 190 short osid; /* sid of op csn */ 191 short rsid; /* sid of relay */ 192 short sreference; /* Is the entry a reference? */ 193 syncres ssres; 194 } opcookie; 195 196 typedef struct fbase_cookie { 197 struct berval *fdn; /* DN of a modified entry, for scope testing */ 198 syncops *fss; /* persistent search we're testing against */ 199 int fbase; /* if TRUE we found the search base and it's still valid */ 200 int fscope; /* if TRUE then fdn is within the psearch scope */ 201 } fbase_cookie; 202 203 static AttributeName csn_anlist[3]; 204 static AttributeName uuid_anlist[2]; 205 206 static AttributeDescription *ad_reqType, *ad_reqResult, *ad_reqDN, 207 *ad_reqEntryUUID, *ad_minCSN, *ad_reqNewDN; 208 209 /* Build a LDAPsync intermediate state control */ 210 static int 211 syncprov_state_ctrl( 212 Operation *op, 213 SlapReply *rs, 214 Entry *e, 215 int entry_sync_state, 216 LDAPControl **ctrls, 217 int num_ctrls, 218 int send_cookie, 219 struct berval *cookie ) 220 { 221 Attribute* a; 222 int ret; 223 224 BerElementBuffer berbuf; 225 BerElement *ber = (BerElement *)&berbuf; 226 LDAPControl *cp; 227 struct berval bv; 228 struct berval entryuuid_bv = BER_BVNULL; 229 230 ber_init2( ber, 0, LBER_USE_DER ); 231 ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx ); 232 233 for ( a = e->e_attrs; a != NULL; a = a->a_next ) { 234 AttributeDescription *desc = a->a_desc; 235 if ( desc == slap_schema.si_ad_entryUUID ) { 236 entryuuid_bv = a->a_nvals[0]; 237 break; 238 } 239 } 240 241 /* FIXME: what if entryuuid is NULL or empty ? */ 242 243 if ( send_cookie && cookie ) { 244 ber_printf( ber, "{eOON}", 245 entry_sync_state, &entryuuid_bv, cookie ); 246 } else { 247 ber_printf( ber, "{eON}", 248 entry_sync_state, &entryuuid_bv ); 249 } 250 251 ret = ber_flatten2( ber, &bv, 0 ); 252 if ( ret == 0 ) { 253 cp = op->o_tmpalloc( sizeof( LDAPControl ) + bv.bv_len, op->o_tmpmemctx ); 254 cp->ldctl_oid = LDAP_CONTROL_SYNC_STATE; 255 cp->ldctl_iscritical = (op->o_sync == SLAP_CONTROL_CRITICAL); 256 cp->ldctl_value.bv_val = (char *)&cp[1]; 257 cp->ldctl_value.bv_len = bv.bv_len; 258 AC_MEMCPY( cp->ldctl_value.bv_val, bv.bv_val, bv.bv_len ); 259 ctrls[num_ctrls] = cp; 260 } 261 ber_free_buf( ber ); 262 263 if ( ret < 0 ) { 264 Debug( LDAP_DEBUG_TRACE, 265 "slap_build_sync_ctrl: ber_flatten2 failed (%d)\n", 266 ret ); 267 send_ldap_error( op, rs, LDAP_OTHER, "internal error" ); 268 return LDAP_OTHER; 269 } 270 271 return LDAP_SUCCESS; 272 } 273 274 /* Build a LDAPsync final state control */ 275 static int 276 syncprov_done_ctrl( 277 Operation *op, 278 SlapReply *rs, 279 LDAPControl **ctrls, 280 int num_ctrls, 281 int send_cookie, 282 struct berval *cookie, 283 int refreshDeletes ) 284 { 285 int ret; 286 BerElementBuffer berbuf; 287 BerElement *ber = (BerElement *)&berbuf; 288 LDAPControl *cp; 289 struct berval bv; 290 291 ber_init2( ber, NULL, LBER_USE_DER ); 292 ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx ); 293 294 ber_printf( ber, "{" ); 295 if ( send_cookie && cookie ) { 296 ber_printf( ber, "O", cookie ); 297 } 298 if ( refreshDeletes == LDAP_SYNC_REFRESH_DELETES ) { 299 ber_printf( ber, "b", refreshDeletes ); 300 } 301 ber_printf( ber, "N}" ); 302 303 ret = ber_flatten2( ber, &bv, 0 ); 304 if ( ret == 0 ) { 305 cp = op->o_tmpalloc( sizeof( LDAPControl ) + bv.bv_len, op->o_tmpmemctx ); 306 cp->ldctl_oid = LDAP_CONTROL_SYNC_DONE; 307 cp->ldctl_iscritical = (op->o_sync == SLAP_CONTROL_CRITICAL); 308 cp->ldctl_value.bv_val = (char *)&cp[1]; 309 cp->ldctl_value.bv_len = bv.bv_len; 310 AC_MEMCPY( cp->ldctl_value.bv_val, bv.bv_val, bv.bv_len ); 311 ctrls[num_ctrls] = cp; 312 } 313 314 ber_free_buf( ber ); 315 316 if ( ret < 0 ) { 317 Debug( LDAP_DEBUG_TRACE, 318 "syncprov_done_ctrl: ber_flatten2 failed (%d)\n", 319 ret ); 320 send_ldap_error( op, rs, LDAP_OTHER, "internal error" ); 321 return LDAP_OTHER; 322 } 323 324 return LDAP_SUCCESS; 325 } 326 327 static int 328 syncprov_sendinfo( 329 Operation *op, 330 SlapReply *rs, 331 int type, 332 struct berval *cookie, 333 int refreshDone, 334 BerVarray syncUUIDs, 335 int refreshDeletes ) 336 { 337 BerElementBuffer berbuf; 338 BerElement *ber = (BerElement *)&berbuf; 339 struct berval rspdata; 340 341 int ret; 342 343 ber_init2( ber, NULL, LBER_USE_DER ); 344 ber_set_option( ber, LBER_OPT_BER_MEMCTX, &op->o_tmpmemctx ); 345 346 if ( type ) { 347 switch ( type ) { 348 case LDAP_TAG_SYNC_NEW_COOKIE: 349 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendinfo: " 350 "sending a new cookie=%s\n", 351 op->o_log_prefix, cookie->bv_val ); 352 ber_printf( ber, "tO", type, cookie ); 353 break; 354 case LDAP_TAG_SYNC_REFRESH_DELETE: 355 case LDAP_TAG_SYNC_REFRESH_PRESENT: 356 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendinfo: " 357 "%s cookie=%s\n", 358 op->o_log_prefix, 359 type == LDAP_TAG_SYNC_REFRESH_DELETE ? "refreshDelete" : "refreshPresent", 360 cookie ? cookie->bv_val : "" ); 361 ber_printf( ber, "t{", type ); 362 if ( cookie ) { 363 ber_printf( ber, "O", cookie ); 364 } 365 if ( refreshDone == 0 ) { 366 ber_printf( ber, "b", refreshDone ); 367 } 368 ber_printf( ber, "N}" ); 369 break; 370 case LDAP_TAG_SYNC_ID_SET: 371 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendinfo: " 372 "%s syncIdSet cookie=%s\n", 373 op->o_log_prefix, refreshDeletes ? "delete" : "present", 374 cookie ? cookie->bv_val : "" ); 375 ber_printf( ber, "t{", type ); 376 if ( cookie ) { 377 ber_printf( ber, "O", cookie ); 378 } 379 if ( refreshDeletes == 1 ) { 380 ber_printf( ber, "b", refreshDeletes ); 381 } 382 ber_printf( ber, "[W]", syncUUIDs ); 383 ber_printf( ber, "N}" ); 384 break; 385 default: 386 Debug( LDAP_DEBUG_TRACE, 387 "%s syncprov_sendinfo: invalid syncinfo type (%d)\n", 388 op->o_log_prefix, type ); 389 return LDAP_OTHER; 390 } 391 } 392 393 ret = ber_flatten2( ber, &rspdata, 0 ); 394 395 if ( ret < 0 ) { 396 Debug( LDAP_DEBUG_TRACE, 397 "syncprov_sendinfo: ber_flatten2 failed (%d)\n", 398 ret ); 399 send_ldap_error( op, rs, LDAP_OTHER, "internal error" ); 400 return LDAP_OTHER; 401 } 402 403 rs->sr_rspoid = LDAP_SYNC_INFO; 404 rs->sr_rspdata = &rspdata; 405 send_ldap_intermediate( op, rs ); 406 rs->sr_rspdata = NULL; 407 ber_free_buf( ber ); 408 409 return LDAP_SUCCESS; 410 } 411 412 /* Find a modtarget in an AVL tree */ 413 static int 414 sp_avl_cmp( const void *c1, const void *c2 ) 415 { 416 const modtarget *m1, *m2; 417 int rc; 418 419 m1 = c1; m2 = c2; 420 rc = m1->mt_dn.bv_len - m2->mt_dn.bv_len; 421 422 if ( rc ) return rc; 423 return ber_bvcmp( &m1->mt_dn, &m2->mt_dn ); 424 } 425 426 static int 427 sp_uuid_cmp( const void *l, const void *r ) 428 { 429 const struct berval *left = l, *right = r; 430 431 return ber_bvcmp( left, right ); 432 } 433 434 static int 435 syncprov_sessionlog_cmp( const void *l, const void *r ) 436 { 437 const slog_entry *left = l, *right = r; 438 int ret = ber_bvcmp( &left->se_csn, &right->se_csn ); 439 if ( !ret ) 440 ret = ber_bvcmp( &left->se_uuid, &right->se_uuid ); 441 /* Only time we have two modifications with same CSN is when we detect a 442 * rename during replication. 443 * We invert the test here because LDAP_REQ_MODDN is 444 * numerically greater than LDAP_REQ_MODIFY but we 445 * want it to occur first. 446 */ 447 if ( !ret ) 448 ret = right->se_tag - left->se_tag; 449 450 return ret; 451 } 452 453 /* syncprov_findbase: 454 * finds the true DN of the base of a search (with alias dereferencing) and 455 * checks to make sure the base entry doesn't get replaced with a different 456 * entry (e.g., swapping trees via ModDN, or retargeting an alias). If a 457 * change is detected, any persistent search on this base must be terminated / 458 * reloaded. 459 * On the first call, we just save the DN and entryID. On subsequent calls 460 * we compare the DN and entryID with the saved values. 461 */ 462 static int 463 findbase_cb( Operation *op, SlapReply *rs ) 464 { 465 slap_callback *sc = op->o_callback; 466 467 if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) { 468 fbase_cookie *fc = sc->sc_private; 469 470 /* If no entryID, we're looking for the first time. 471 * Just store whatever we got. 472 */ 473 if ( fc->fss->s_eid == NOID ) { 474 fc->fbase = 2; 475 fc->fss->s_eid = rs->sr_entry->e_id; 476 ber_dupbv( &fc->fss->s_base, &rs->sr_entry->e_nname ); 477 478 } else if ( rs->sr_entry->e_id == fc->fss->s_eid && 479 dn_match( &rs->sr_entry->e_nname, &fc->fss->s_base )) { 480 481 /* OK, the DN is the same and the entryID is the same. */ 482 fc->fbase = 1; 483 } 484 } 485 if ( rs->sr_err != LDAP_SUCCESS ) { 486 Debug( LDAP_DEBUG_ANY, "findbase failed! %d\n", rs->sr_err ); 487 } 488 return LDAP_SUCCESS; 489 } 490 491 static Filter generic_filter = { LDAP_FILTER_PRESENT, { 0 }, NULL }; 492 static struct berval generic_filterstr = BER_BVC("(objectclass=*)"); 493 494 static int 495 syncprov_findbase( Operation *op, fbase_cookie *fc ) 496 { 497 /* Use basic parameters from syncrepl search, but use 498 * current op's threadctx / tmpmemctx 499 */ 500 ldap_pvt_thread_mutex_lock( &fc->fss->s_mutex ); 501 if ( fc->fss->s_flags & PS_FIND_BASE ) { 502 slap_callback cb = {0}; 503 Operation fop; 504 SlapReply frs = { REP_RESULT }; 505 int rc; 506 507 fc->fss->s_flags ^= PS_FIND_BASE; 508 ldap_pvt_thread_mutex_unlock( &fc->fss->s_mutex ); 509 510 fop = *fc->fss->s_op; 511 512 fop.o_bd = fop.o_bd->bd_self; 513 fop.o_hdr = op->o_hdr; 514 fop.o_time = op->o_time; 515 fop.o_tincr = op->o_tincr; 516 fop.o_extra = op->o_extra; 517 518 cb.sc_response = findbase_cb; 519 cb.sc_private = fc; 520 521 fop.o_sync_mode = 0; /* turn off sync mode */ 522 fop.o_managedsait = SLAP_CONTROL_CRITICAL; 523 fop.o_callback = &cb; 524 fop.o_tag = LDAP_REQ_SEARCH; 525 fop.ors_scope = LDAP_SCOPE_BASE; 526 fop.ors_limit = NULL; 527 fop.ors_slimit = 1; 528 fop.ors_tlimit = SLAP_NO_LIMIT; 529 fop.ors_attrs = slap_anlist_no_attrs; 530 fop.ors_attrsonly = 1; 531 fop.ors_filter = &generic_filter; 532 fop.ors_filterstr = generic_filterstr; 533 534 Debug( LDAP_DEBUG_SYNC, "%s syncprov_findbase: searching\n", op->o_log_prefix ); 535 rc = fop.o_bd->be_search( &fop, &frs ); 536 } else { 537 ldap_pvt_thread_mutex_unlock( &fc->fss->s_mutex ); 538 fc->fbase = 1; 539 } 540 541 /* After the first call, see if the fdn resides in the scope */ 542 if ( fc->fbase == 1 ) { 543 switch ( fc->fss->s_op->ors_scope ) { 544 case LDAP_SCOPE_BASE: 545 fc->fscope = dn_match( fc->fdn, &fc->fss->s_base ); 546 break; 547 case LDAP_SCOPE_ONELEVEL: { 548 struct berval pdn; 549 dnParent( fc->fdn, &pdn ); 550 fc->fscope = dn_match( &pdn, &fc->fss->s_base ); 551 break; } 552 case LDAP_SCOPE_SUBTREE: 553 fc->fscope = dnIsSuffix( fc->fdn, &fc->fss->s_base ); 554 break; 555 case LDAP_SCOPE_SUBORDINATE: 556 fc->fscope = dnIsSuffix( fc->fdn, &fc->fss->s_base ) && 557 !dn_match( fc->fdn, &fc->fss->s_base ); 558 break; 559 } 560 } 561 562 if ( fc->fbase ) 563 return LDAP_SUCCESS; 564 565 /* If entryID has changed, then the base of this search has 566 * changed. Invalidate the psearch. 567 */ 568 return LDAP_NO_SUCH_OBJECT; 569 } 570 571 /* syncprov_findcsn: 572 * This function has three different purposes, but they all use a search 573 * that filters on entryCSN so they're combined here. 574 * 1: at startup time, after a contextCSN has been read from the database, 575 * we search for all entries with CSN >= contextCSN in case the contextCSN 576 * was not checkpointed at the previous shutdown. 577 * 578 * 2: when the current contextCSN is known and we have a sync cookie, we search 579 * for one entry with CSN = the cookie CSN. If not found, try <= cookie CSN. 580 * If an entry is found, the cookie CSN is valid, otherwise it is stale. 581 * 582 * 3: during a refresh phase, we search for all entries with CSN <= the cookie 583 * CSN, and generate Present records for them. We always collect this result 584 * in SyncID sets, even if there's only one match. 585 */ 586 typedef enum find_csn_t { 587 FIND_MAXCSN = 1, 588 FIND_CSN = 2, 589 FIND_PRESENT = 3 590 } find_csn_t; 591 592 static int 593 findmax_cb( Operation *op, SlapReply *rs ) 594 { 595 if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) { 596 struct berval *maxcsn = op->o_callback->sc_private; 597 Attribute *a = attr_find( rs->sr_entry->e_attrs, 598 slap_schema.si_ad_entryCSN ); 599 600 if ( a && ber_bvcmp( &a->a_vals[0], maxcsn ) > 0 && 601 slap_parse_csn_sid( &a->a_vals[0] ) == slap_serverID ) { 602 maxcsn->bv_len = a->a_vals[0].bv_len; 603 strcpy( maxcsn->bv_val, a->a_vals[0].bv_val ); 604 } 605 } 606 return LDAP_SUCCESS; 607 } 608 609 static int 610 findcsn_cb( Operation *op, SlapReply *rs ) 611 { 612 slap_callback *sc = op->o_callback; 613 614 /* We just want to know that at least one exists, so it's OK if 615 * we exceed the unchecked limit. 616 */ 617 if ( rs->sr_err == LDAP_ADMINLIMIT_EXCEEDED || 618 (rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS )) { 619 sc->sc_private = (void *)1; 620 } 621 return LDAP_SUCCESS; 622 } 623 624 /* Build a list of entryUUIDs for sending in a SyncID set */ 625 626 #define UUID_LEN 16 627 628 typedef struct fpres_cookie { 629 int num; 630 BerVarray uuids; 631 char *last; 632 } fpres_cookie; 633 634 static int 635 findpres_cb( Operation *op, SlapReply *rs ) 636 { 637 slap_callback *sc = op->o_callback; 638 fpres_cookie *pc = sc->sc_private; 639 Attribute *a; 640 int ret = SLAP_CB_CONTINUE; 641 642 switch ( rs->sr_type ) { 643 case REP_SEARCH: 644 a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryUUID ); 645 if ( a ) { 646 pc->uuids[pc->num].bv_val = pc->last; 647 AC_MEMCPY( pc->uuids[pc->num].bv_val, a->a_nvals[0].bv_val, 648 pc->uuids[pc->num].bv_len ); 649 pc->num++; 650 pc->last = pc->uuids[pc->num].bv_val; 651 pc->uuids[pc->num].bv_val = NULL; 652 } 653 ret = LDAP_SUCCESS; 654 if ( pc->num != SLAP_SYNCUUID_SET_SIZE ) 655 break; 656 /* FALLTHRU */ 657 case REP_RESULT: 658 ret = rs->sr_err; 659 if ( pc->num ) { 660 ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL, 661 0, pc->uuids, 0 ); 662 pc->uuids[pc->num].bv_val = pc->last; 663 pc->num = 0; 664 pc->last = pc->uuids[0].bv_val; 665 } 666 break; 667 default: 668 break; 669 } 670 return ret; 671 } 672 673 static int 674 syncprov_findcsn( Operation *op, find_csn_t mode, struct berval *csn ) 675 { 676 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 677 syncprov_info_t *si = on->on_bi.bi_private; 678 679 slap_callback cb = {0}; 680 Operation fop; 681 SlapReply frs = { REP_RESULT }; 682 char buf[LDAP_PVT_CSNSTR_BUFSIZE + STRLENOF("(entryCSN<=)")]; 683 char cbuf[LDAP_PVT_CSNSTR_BUFSIZE]; 684 struct berval maxcsn; 685 Filter cf; 686 AttributeAssertion eq = ATTRIBUTEASSERTION_INIT; 687 fpres_cookie pcookie; 688 sync_control *srs = NULL; 689 struct slap_limits_set fc_limits; 690 int i, rc = LDAP_SUCCESS, findcsn_retry = 1; 691 int maxid; 692 693 if ( mode != FIND_MAXCSN ) { 694 srs = op->o_controls[slap_cids.sc_LDAPsync]; 695 } 696 697 Debug( LDAP_DEBUG_SYNC, "%s syncprov_findcsn: mode=%s csn=%s\n", 698 op->o_log_prefix, 699 mode == FIND_MAXCSN ? 700 "FIND_MAXCSN" : 701 mode == FIND_CSN ? 702 "FIND_CSN" : 703 "FIND_PRESENT", 704 csn ? csn->bv_val : "" ); 705 706 again: 707 fop = *op; 708 fop.o_sync_mode &= SLAP_CONTROL_MASK; /* turn off sync_mode */ 709 /* We want pure entries, not referrals */ 710 fop.o_managedsait = SLAP_CONTROL_CRITICAL; 711 712 cf.f_ava = &eq; 713 cf.f_av_desc = slap_schema.si_ad_entryCSN; 714 BER_BVZERO( &cf.f_av_value ); 715 cf.f_next = NULL; 716 717 fop.o_callback = &cb; 718 fop.ors_limit = NULL; 719 fop.ors_tlimit = SLAP_NO_LIMIT; 720 fop.ors_filter = &cf; 721 fop.ors_filterstr.bv_val = buf; 722 723 switch( mode ) { 724 case FIND_MAXCSN: 725 cf.f_choice = LDAP_FILTER_GE; 726 /* If there are multiple CSNs, use the one with our serverID */ 727 for ( i=0; i<si->si_numcsns; i++) { 728 if ( slap_serverID == si->si_sids[i] ) { 729 maxid = i; 730 break; 731 } 732 } 733 if ( i == si->si_numcsns ) { 734 /* No match: this is multimaster, and none of the content in the DB 735 * originated locally. Treat like no CSN. 736 */ 737 return LDAP_NO_SUCH_OBJECT; 738 } 739 cf.f_av_value = si->si_ctxcsn[maxid]; 740 fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ), 741 "(entryCSN>=%s)", cf.f_av_value.bv_val ); 742 if ( fop.ors_filterstr.bv_len >= sizeof( buf ) ) { 743 return LDAP_OTHER; 744 } 745 fop.ors_attrsonly = 0; 746 fop.ors_attrs = csn_anlist; 747 fop.ors_slimit = SLAP_NO_LIMIT; 748 cb.sc_private = &maxcsn; 749 cb.sc_response = findmax_cb; 750 strcpy( cbuf, cf.f_av_value.bv_val ); 751 maxcsn.bv_val = cbuf; 752 maxcsn.bv_len = cf.f_av_value.bv_len; 753 break; 754 case FIND_CSN: 755 if ( BER_BVISEMPTY( &cf.f_av_value )) { 756 cf.f_av_value = *csn; 757 } 758 fop.o_dn = op->o_bd->be_rootdn; 759 fop.o_ndn = op->o_bd->be_rootndn; 760 fop.o_req_dn = op->o_bd->be_suffix[0]; 761 fop.o_req_ndn = op->o_bd->be_nsuffix[0]; 762 /* Look for exact match the first time */ 763 if ( findcsn_retry ) { 764 cf.f_choice = LDAP_FILTER_EQUALITY; 765 fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ), 766 "(entryCSN=%s)", cf.f_av_value.bv_val ); 767 /* On retry, look for <= */ 768 } else { 769 cf.f_choice = LDAP_FILTER_LE; 770 fop.ors_limit = &fc_limits; 771 memset( &fc_limits, 0, sizeof( fc_limits )); 772 fc_limits.lms_s_unchecked = 1; 773 fop.ors_filterstr.bv_len = snprintf( buf, sizeof( buf ), 774 "(entryCSN<=%s)", cf.f_av_value.bv_val ); 775 } 776 if ( fop.ors_filterstr.bv_len >= sizeof( buf ) ) { 777 return LDAP_OTHER; 778 } 779 fop.ors_attrsonly = 1; 780 fop.ors_attrs = slap_anlist_no_attrs; 781 fop.ors_slimit = 1; 782 cb.sc_private = NULL; 783 cb.sc_response = findcsn_cb; 784 break; 785 case FIND_PRESENT: 786 fop.ors_filter = op->ors_filter; 787 fop.ors_filterstr = op->ors_filterstr; 788 fop.ors_attrsonly = 0; 789 fop.ors_attrs = uuid_anlist; 790 fop.ors_slimit = SLAP_NO_LIMIT; 791 cb.sc_private = &pcookie; 792 cb.sc_response = findpres_cb; 793 pcookie.num = 0; 794 795 /* preallocate storage for a full set */ 796 pcookie.uuids = op->o_tmpalloc( (SLAP_SYNCUUID_SET_SIZE+1) * 797 sizeof(struct berval) + SLAP_SYNCUUID_SET_SIZE * UUID_LEN, 798 op->o_tmpmemctx ); 799 pcookie.last = (char *)(pcookie.uuids + SLAP_SYNCUUID_SET_SIZE+1); 800 pcookie.uuids[0].bv_val = pcookie.last; 801 pcookie.uuids[0].bv_len = UUID_LEN; 802 for (i=1; i<SLAP_SYNCUUID_SET_SIZE; i++) { 803 pcookie.uuids[i].bv_val = pcookie.uuids[i-1].bv_val + UUID_LEN; 804 pcookie.uuids[i].bv_len = UUID_LEN; 805 } 806 break; 807 } 808 809 fop.o_bd->bd_info = (BackendInfo *)on->on_info; 810 fop.o_bd->be_search( &fop, &frs ); 811 fop.o_bd->bd_info = (BackendInfo *)on; 812 813 switch( mode ) { 814 case FIND_MAXCSN: 815 if ( ber_bvcmp( &si->si_ctxcsn[maxid], &maxcsn )) { 816 #ifdef CHECK_CSN 817 Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax; 818 assert( !syn->ssyn_validate( syn, &maxcsn )); 819 #endif 820 ber_bvreplace( &si->si_ctxcsn[maxid], &maxcsn ); 821 si->si_numops++; /* ensure a checkpoint */ 822 } 823 break; 824 case FIND_CSN: 825 /* If matching CSN was not found, invalidate the context. */ 826 Debug( LDAP_DEBUG_SYNC, "%s syncprov_findcsn: csn%s=%s %sfound\n", 827 op->o_log_prefix, 828 cf.f_choice == LDAP_FILTER_EQUALITY ? "=" : "<", 829 cf.f_av_value.bv_val, cb.sc_private ? "" : "not " ); 830 if ( !cb.sc_private ) { 831 /* If we didn't find an exact match, then try for <= */ 832 if ( findcsn_retry ) { 833 findcsn_retry = 0; 834 rs_reinit( &frs, REP_RESULT ); 835 goto again; 836 } 837 rc = LDAP_NO_SUCH_OBJECT; 838 } 839 break; 840 case FIND_PRESENT: 841 op->o_tmpfree( pcookie.uuids, op->o_tmpmemctx ); 842 break; 843 } 844 845 return rc; 846 } 847 848 static void free_resinfo( syncres *sr ) 849 { 850 syncres **st; 851 resinfo *ri = sr->s_info; 852 int freeit = 0; 853 854 ldap_pvt_thread_mutex_lock( &ri->ri_mutex ); 855 for (st = &sr->s_info->ri_list; *st; st = &(*st)->s_rilist) { 856 if (*st == sr) { 857 *st = sr->s_rilist; 858 if ( !sr->s_info->ri_list ) 859 freeit = 1; 860 sr->s_info = NULL; 861 break; 862 } 863 } 864 ldap_pvt_thread_mutex_unlock( &ri->ri_mutex ); 865 if ( freeit ) { 866 ldap_pvt_thread_mutex_destroy( &ri->ri_mutex ); 867 if ( ri->ri_e ) 868 entry_free( ri->ri_e ); 869 if ( !BER_BVISNULL( &ri->ri_cookie )) 870 ch_free( ri->ri_cookie.bv_val ); 871 ch_free( ri ); 872 } 873 } 874 875 #define FS_UNLINK 1 876 #define FS_LOCK 2 877 #define FS_DEFER 4 878 879 #define FSR_NOTFREE 0 880 #define FSR_DIDFREE 1 881 #define FSR_CANFREE 2 882 883 static int 884 syncprov_free_syncop( syncops *so, int flags ) 885 { 886 syncres *sr, *srnext; 887 GroupAssertion *ga, *gnext; 888 889 if ( flags & FS_LOCK ) 890 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 891 /* already being freed, or still in use */ 892 if ( !so->s_inuse || so->s_inuse > 1 ) { 893 if ( flags & FS_LOCK ) 894 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 895 if ( !( flags & FS_DEFER ) && so->s_inuse ) 896 so->s_inuse--; 897 return FSR_NOTFREE; 898 } 899 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 900 901 /* caller wants to cleanup other stuff before actual free */ 902 if ( flags & FS_DEFER ) 903 return FSR_CANFREE; 904 905 if (( flags & FS_UNLINK ) && so->s_si ) { 906 syncops **sop; 907 ldap_pvt_thread_mutex_lock( &so->s_si->si_ops_mutex ); 908 for ( sop = &so->s_si->si_ops; *sop; sop = &(*sop)->s_next ) { 909 if ( *sop == so ) { 910 *sop = so->s_next; 911 break; 912 } 913 } 914 ldap_pvt_thread_mutex_unlock( &so->s_si->si_ops_mutex ); 915 } 916 if ( so->s_flags & PS_IS_DETACHED ) { 917 filter_free( so->s_op->ors_filter ); 918 for ( ga = so->s_op->o_groups; ga; ga=gnext ) { 919 gnext = ga->ga_next; 920 ch_free( ga ); 921 } 922 ch_free( so->s_op ); 923 } 924 ch_free( so->s_base.bv_val ); 925 for ( sr=so->s_res; sr; sr=srnext ) { 926 srnext = sr->s_next; 927 free_resinfo( sr ); 928 ch_free( sr ); 929 } 930 ldap_pvt_thread_mutex_destroy( &so->s_mutex ); 931 ch_free( so ); 932 return FSR_DIDFREE; 933 } 934 935 /* Send a persistent search response */ 936 static int 937 syncprov_sendresp( Operation *op, resinfo *ri, syncops *so, int mode ) 938 { 939 SlapReply rs = { REP_SEARCH }; 940 struct berval cookie, csns[2]; 941 Entry e_uuid = {0}; 942 Attribute a_uuid = {0}; 943 944 if ( so->s_op->o_abandon ) 945 return SLAPD_ABANDON; 946 947 rs.sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2, op->o_tmpmemctx ); 948 rs.sr_ctrls[1] = NULL; 949 rs.sr_flags = REP_CTRLS_MUSTBEFREED; 950 csns[0] = ri->ri_csn; 951 BER_BVZERO( &csns[1] ); 952 slap_compose_sync_cookie( op, &cookie, csns, so->s_rid, 953 slap_serverID ? slap_serverID : -1, NULL ); 954 955 #ifdef LDAP_DEBUG 956 if ( so->s_sid > 0 ) { 957 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendresp: to=%03x, cookie=%s\n", 958 op->o_log_prefix, so->s_sid, cookie.bv_val ); 959 } else { 960 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendresp: cookie=%s\n", 961 op->o_log_prefix, cookie.bv_val ); 962 } 963 #endif 964 965 e_uuid.e_attrs = &a_uuid; 966 a_uuid.a_desc = slap_schema.si_ad_entryUUID; 967 a_uuid.a_nvals = &ri->ri_uuid; 968 rs.sr_err = syncprov_state_ctrl( op, &rs, &e_uuid, 969 mode, rs.sr_ctrls, 0, 1, &cookie ); 970 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); 971 972 rs.sr_entry = &e_uuid; 973 if ( mode == LDAP_SYNC_ADD || mode == LDAP_SYNC_MODIFY ) { 974 e_uuid = *ri->ri_e; 975 e_uuid.e_private = NULL; 976 } 977 978 switch( mode ) { 979 case LDAP_SYNC_ADD: 980 if ( ri->ri_isref && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) { 981 rs.sr_ref = get_entry_referrals( op, rs.sr_entry ); 982 rs.sr_err = send_search_reference( op, &rs ); 983 ber_bvarray_free( rs.sr_ref ); 984 break; 985 } 986 /* fallthru */ 987 case LDAP_SYNC_MODIFY: 988 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendresp: sending %s, dn=%s\n", 989 op->o_log_prefix, 990 mode == LDAP_SYNC_ADD ? "LDAP_SYNC_ADD" : "LDAP_SYNC_MODIFY", 991 e_uuid.e_nname.bv_val ); 992 rs.sr_attrs = op->ors_attrs; 993 rs.sr_err = send_search_entry( op, &rs ); 994 break; 995 case LDAP_SYNC_DELETE: 996 Debug( LDAP_DEBUG_SYNC, "%s syncprov_sendresp: " 997 "sending LDAP_SYNC_DELETE, dn=%s\n", 998 op->o_log_prefix, ri->ri_dn.bv_val ); 999 e_uuid.e_attrs = NULL; 1000 e_uuid.e_name = ri->ri_dn; 1001 e_uuid.e_nname = ri->ri_ndn; 1002 if ( ri->ri_isref && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) { 1003 struct berval bv = BER_BVNULL; 1004 rs.sr_ref = &bv; 1005 rs.sr_err = send_search_reference( op, &rs ); 1006 } else { 1007 rs.sr_err = send_search_entry( op, &rs ); 1008 } 1009 break; 1010 default: 1011 assert(0); 1012 } 1013 return rs.sr_err; 1014 } 1015 1016 static void 1017 syncprov_qstart( syncops *so ); 1018 1019 /* Play back queued responses */ 1020 static int 1021 syncprov_qplay( Operation *op, syncops *so ) 1022 { 1023 syncres *sr; 1024 int rc = 0; 1025 1026 do { 1027 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 1028 sr = so->s_res; 1029 /* Exit loop with mutex held */ 1030 if ( !sr ) 1031 break; 1032 so->s_res = sr->s_next; 1033 if ( !so->s_res ) 1034 so->s_restail = NULL; 1035 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 1036 1037 if ( !so->s_op->o_abandon ) { 1038 1039 if ( sr->s_mode == LDAP_SYNC_NEW_COOKIE ) { 1040 SlapReply rs = { REP_INTERMEDIATE }; 1041 1042 rc = syncprov_sendinfo( op, &rs, LDAP_TAG_SYNC_NEW_COOKIE, 1043 &sr->s_info->ri_cookie, 0, NULL, 0 ); 1044 } else { 1045 rc = syncprov_sendresp( op, sr->s_info, so, sr->s_mode ); 1046 } 1047 } else { 1048 /* set rc so we don't do a new qstart */ 1049 rc = 1; 1050 } 1051 1052 free_resinfo( sr ); 1053 ch_free( sr ); 1054 1055 if ( so->s_op->o_abandon ) 1056 continue; 1057 1058 /* Exit loop with mutex held */ 1059 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 1060 break; 1061 1062 } while (1); 1063 1064 /* We now only send one change at a time, to prevent one 1065 * psearch from hogging all the CPU. Resubmit this task if 1066 * there are more responses queued and no errors occurred. 1067 */ 1068 1069 if ( rc == 0 && so->s_res ) { 1070 syncprov_qstart( so ); 1071 } 1072 1073 return rc; 1074 } 1075 1076 static int 1077 syncprov_drop_psearch( syncops *so, int lock ); 1078 1079 /* task for playing back queued responses */ 1080 static void * 1081 syncprov_qtask( void *ctx, void *arg ) 1082 { 1083 syncops *so = arg; 1084 OperationBuffer opbuf; 1085 Operation *op; 1086 BackendDB be; 1087 int rc, flag, frc; 1088 1089 op = &opbuf.ob_op; 1090 *op = *so->s_op; 1091 op->o_hdr = &opbuf.ob_hdr; 1092 op->o_controls = opbuf.ob_controls; 1093 memset( op->o_controls, 0, sizeof(opbuf.ob_controls) ); 1094 op->o_sync = SLAP_CONTROL_IGNORED; 1095 1096 *op->o_hdr = *so->s_op->o_hdr; 1097 1098 op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx, 1); 1099 op->o_tmpmfuncs = &slap_sl_mfuncs; 1100 op->o_threadctx = ctx; 1101 operation_counter_init( op, ctx ); 1102 1103 /* syncprov_qplay expects a fake db */ 1104 be = *so->s_op->o_bd; 1105 be.be_flags |= SLAP_DBFLAG_OVERLAY; 1106 op->o_bd = &be; 1107 LDAP_SLIST_FIRST(&op->o_extra) = NULL; 1108 op->o_callback = NULL; 1109 1110 rc = syncprov_qplay( op, so ); 1111 1112 /* if an error occurred, or no responses left, task is no longer queued */ 1113 if ( !rc && !so->s_res ) 1114 rc = 1; 1115 1116 flag = FS_UNLINK; 1117 if ( rc && op->o_abandon ) 1118 flag = FS_DEFER; 1119 1120 /* decrement use count... */ 1121 frc = syncprov_free_syncop( so, flag ); 1122 if ( frc == FSR_NOTFREE ) { 1123 if ( rc ) 1124 /* if we didn't unlink, and task is no longer queued, clear flag */ 1125 so->s_flags ^= PS_TASK_QUEUED; 1126 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 1127 } 1128 1129 /* if we got abandoned while processing, cleanup now */ 1130 if ( frc == FSR_CANFREE ) { 1131 syncprov_drop_psearch( so, 1 ); 1132 } 1133 1134 return NULL; 1135 } 1136 1137 /* Start the task to play back queued psearch responses */ 1138 static void 1139 syncprov_qstart( syncops *so ) 1140 { 1141 so->s_flags |= PS_TASK_QUEUED; 1142 so->s_inuse++; 1143 ldap_pvt_thread_pool_submit2( &connection_pool, 1144 syncprov_qtask, so, &so->s_pool_cookie ); 1145 } 1146 1147 /* Queue a persistent search response */ 1148 static int 1149 syncprov_qresp( opcookie *opc, syncops *so, int mode ) 1150 { 1151 syncres *sr; 1152 resinfo *ri; 1153 int srsize; 1154 struct berval csn = opc->sctxcsn; 1155 1156 sr = ch_malloc( sizeof( syncres )); 1157 sr->s_next = NULL; 1158 sr->s_mode = mode; 1159 if ( !opc->ssres.s_info ) { 1160 1161 srsize = sizeof( resinfo ); 1162 if ( csn.bv_len ) 1163 srsize += csn.bv_len + 1; 1164 1165 if ( opc->se ) { 1166 Attribute *a; 1167 ri = ch_malloc( srsize ); 1168 ri->ri_dn = opc->se->e_name; 1169 ri->ri_ndn = opc->se->e_nname; 1170 a = attr_find( opc->se->e_attrs, slap_schema.si_ad_entryUUID ); 1171 if ( a ) 1172 ri->ri_uuid = a->a_nvals[0]; 1173 else 1174 ri->ri_uuid.bv_len = 0; 1175 if ( csn.bv_len ) { 1176 ri->ri_csn.bv_val = (char *)(ri + 1); 1177 ri->ri_csn.bv_len = csn.bv_len; 1178 memcpy( ri->ri_csn.bv_val, csn.bv_val, csn.bv_len ); 1179 ri->ri_csn.bv_val[csn.bv_len] = '\0'; 1180 } else { 1181 ri->ri_csn.bv_val = NULL; 1182 } 1183 } else { 1184 srsize += opc->suuid.bv_len + 1185 opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1; 1186 ri = ch_malloc( srsize ); 1187 ri->ri_dn.bv_val = (char *)(ri + 1); 1188 ri->ri_dn.bv_len = opc->sdn.bv_len; 1189 ri->ri_ndn.bv_val = lutil_strcopy( ri->ri_dn.bv_val, 1190 opc->sdn.bv_val ) + 1; 1191 ri->ri_ndn.bv_len = opc->sndn.bv_len; 1192 ri->ri_uuid.bv_val = lutil_strcopy( ri->ri_ndn.bv_val, 1193 opc->sndn.bv_val ) + 1; 1194 ri->ri_uuid.bv_len = opc->suuid.bv_len; 1195 AC_MEMCPY( ri->ri_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len ); 1196 if ( csn.bv_len ) { 1197 ri->ri_csn.bv_val = ri->ri_uuid.bv_val + ri->ri_uuid.bv_len; 1198 memcpy( ri->ri_csn.bv_val, csn.bv_val, csn.bv_len ); 1199 ri->ri_csn.bv_val[csn.bv_len] = '\0'; 1200 } else { 1201 ri->ri_csn.bv_val = NULL; 1202 } 1203 } 1204 ri->ri_list = &opc->ssres; 1205 ri->ri_e = opc->se; 1206 ri->ri_csn.bv_len = csn.bv_len; 1207 ri->ri_isref = opc->sreference; 1208 BER_BVZERO( &ri->ri_cookie ); 1209 ldap_pvt_thread_mutex_init( &ri->ri_mutex ); 1210 opc->se = NULL; 1211 opc->ssres.s_info = ri; 1212 } 1213 ri = opc->ssres.s_info; 1214 sr->s_info = ri; 1215 ldap_pvt_thread_mutex_lock( &ri->ri_mutex ); 1216 sr->s_rilist = ri->ri_list; 1217 ri->ri_list = sr; 1218 if ( mode == LDAP_SYNC_NEW_COOKIE && BER_BVISNULL( &ri->ri_cookie )) { 1219 syncprov_info_t *si = opc->son->on_bi.bi_private; 1220 1221 slap_compose_sync_cookie( NULL, &ri->ri_cookie, si->si_ctxcsn, 1222 so->s_rid, slap_serverID ? slap_serverID : -1, NULL ); 1223 } 1224 Debug( LDAP_DEBUG_SYNC, "%s syncprov_qresp: " 1225 "set up a new syncres mode=%d csn=%s\n", 1226 so->s_op->o_log_prefix, mode, csn.bv_val ? csn.bv_val : "" ); 1227 ldap_pvt_thread_mutex_unlock( &ri->ri_mutex ); 1228 1229 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 1230 if ( !so->s_res ) { 1231 so->s_res = sr; 1232 } else { 1233 so->s_restail->s_next = sr; 1234 } 1235 so->s_restail = sr; 1236 1237 /* If the base of the psearch was modified, check it next time round */ 1238 if ( so->s_flags & PS_WROTE_BASE ) { 1239 so->s_flags ^= PS_WROTE_BASE; 1240 so->s_flags |= PS_FIND_BASE; 1241 } 1242 if (( so->s_flags & (PS_IS_DETACHED|PS_TASK_QUEUED)) == PS_IS_DETACHED ) { 1243 syncprov_qstart( so ); 1244 } 1245 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 1246 return LDAP_SUCCESS; 1247 } 1248 1249 static int 1250 syncprov_drop_psearch( syncops *so, int lock ) 1251 { 1252 if ( so->s_flags & PS_IS_DETACHED ) { 1253 if ( lock ) 1254 ldap_pvt_thread_mutex_lock( &so->s_op->o_conn->c_mutex ); 1255 so->s_op->o_conn->c_n_ops_executing--; 1256 so->s_op->o_conn->c_n_ops_completed++; 1257 LDAP_STAILQ_REMOVE( &so->s_op->o_conn->c_ops, so->s_op, Operation, 1258 o_next ); 1259 if ( lock ) 1260 ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex ); 1261 } 1262 return syncprov_free_syncop( so, FS_LOCK ); 1263 } 1264 1265 static int 1266 syncprov_ab_cleanup( Operation *op, SlapReply *rs ) 1267 { 1268 slap_callback *sc = op->o_callback; 1269 op->o_callback = sc->sc_next; 1270 syncprov_drop_psearch( sc->sc_private, 0 ); 1271 op->o_tmpfree( sc, op->o_tmpmemctx ); 1272 return 0; 1273 } 1274 1275 static int 1276 syncprov_op_abandon( Operation *op, SlapReply *rs ) 1277 { 1278 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 1279 syncprov_info_t *si = on->on_bi.bi_private; 1280 syncops *so, **sop; 1281 1282 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 1283 for ( sop=&si->si_ops; (so = *sop); sop = &(*sop)->s_next ) { 1284 if ( so->s_op->o_connid == op->o_connid && 1285 so->s_op->o_msgid == op->orn_msgid ) { 1286 so->s_op->o_abandon = 1; 1287 *sop = so->s_next; 1288 break; 1289 } 1290 } 1291 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 1292 if ( so ) { 1293 /* Is this really a Cancel exop? */ 1294 if ( op->o_tag != LDAP_REQ_ABANDON ) { 1295 so->s_op->o_cancel = SLAP_CANCEL_ACK; 1296 rs->sr_err = LDAP_CANCELLED; 1297 send_ldap_result( so->s_op, rs ); 1298 if ( so->s_flags & PS_IS_DETACHED ) { 1299 slap_callback *cb; 1300 cb = op->o_tmpcalloc( 1, sizeof(slap_callback), op->o_tmpmemctx ); 1301 cb->sc_cleanup = syncprov_ab_cleanup; 1302 cb->sc_next = op->o_callback; 1303 cb->sc_private = so; 1304 op->o_callback = cb; 1305 return SLAP_CB_CONTINUE; 1306 } 1307 } 1308 /* if task is active, it must drop itself */ 1309 if ( !( so->s_flags & PS_TASK_QUEUED )) 1310 syncprov_drop_psearch( so, 0 ); 1311 } 1312 return SLAP_CB_CONTINUE; 1313 } 1314 1315 /* Find which persistent searches are affected by this operation */ 1316 static void 1317 syncprov_matchops( Operation *op, opcookie *opc, int saveit ) 1318 { 1319 slap_overinst *on = opc->son; 1320 syncprov_info_t *si = on->on_bi.bi_private; 1321 1322 fbase_cookie fc; 1323 syncops **pss; 1324 Entry *e = NULL; 1325 Attribute *a; 1326 int rc, gonext; 1327 BackendDB *b0 = op->o_bd, db; 1328 1329 fc.fdn = saveit ? &op->o_req_ndn : &opc->sndn; 1330 if ( !saveit && op->o_tag == LDAP_REQ_DELETE ) { 1331 /* Delete succeeded, there is no entry */ 1332 } else if ( op->o_tag != LDAP_REQ_ADD ) { 1333 if ( !SLAP_ISOVERLAY( op->o_bd )) { 1334 db = *op->o_bd; 1335 op->o_bd = &db; 1336 } 1337 rc = overlay_entry_get_ov( op, fc.fdn, NULL, NULL, 0, &e, on ); 1338 /* If we're sending responses now, make a copy and unlock the DB */ 1339 if ( e && !saveit ) { 1340 if ( !opc->se ) 1341 opc->se = entry_dup( e ); 1342 overlay_entry_release_ov( op, e, 0, on ); 1343 e = opc->se; 1344 } 1345 if ( rc ) { 1346 Debug( LDAP_DEBUG_SYNC, "%s syncprov_matchops: " 1347 "%s check, error finding entry dn=%s in database\n", 1348 op->o_log_prefix, saveit ? "initial" : "final", fc.fdn->bv_val ); 1349 op->o_bd = b0; 1350 return; 1351 } 1352 } else { 1353 e = op->ora_e; 1354 if ( !saveit ) { 1355 if ( !opc->se ) 1356 opc->se = entry_dup( e ); 1357 e = opc->se; 1358 } 1359 } 1360 1361 if ( saveit || op->o_tag == LDAP_REQ_ADD ) { 1362 if ( op->o_tag == LDAP_REQ_MODRDN ) { 1363 ber_dupbv_x( &opc->sdn, &op->orr_newDN, op->o_tmpmemctx ); 1364 ber_dupbv_x( &opc->sndn, &op->orr_nnewDN, op->o_tmpmemctx ); 1365 } else { 1366 ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx ); 1367 ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx ); 1368 } 1369 opc->sreference = is_entry_referral( e ); 1370 a = attr_find( e->e_attrs, slap_schema.si_ad_entryUUID ); 1371 if ( a ) 1372 ber_dupbv_x( &opc->suuid, &a->a_nvals[0], op->o_tmpmemctx ); 1373 Debug( LDAP_DEBUG_SYNC, "%s syncprov_matchops: " 1374 "%srecording uuid for dn=%s on opc=%p\n", 1375 op->o_log_prefix, a ? "" : "not ", opc->sdn.bv_val, opc ); 1376 } 1377 1378 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 1379 for (pss = &si->si_ops; *pss; pss = gonext ? &(*pss)->s_next : pss) 1380 { 1381 Operation op2; 1382 Opheader oh; 1383 syncmatches *sm; 1384 int found = 0; 1385 syncops *snext, *ss = *pss; 1386 1387 gonext = 1; 1388 if ( ss->s_op->o_abandon ) 1389 continue; 1390 1391 /* Don't send ops back to the originator */ 1392 if ( opc->osid > 0 && opc->osid == ss->s_sid ) { 1393 Debug( LDAP_DEBUG_SYNC, "%s syncprov_matchops: " 1394 "skipping original sid %03x\n", 1395 ss->s_op->o_log_prefix, opc->osid ); 1396 continue; 1397 } 1398 1399 /* Don't send ops back to the messenger */ 1400 if ( opc->rsid > 0 && opc->rsid == ss->s_sid ) { 1401 Debug( LDAP_DEBUG_SYNC, "%s syncprov_matchops: " 1402 "skipping relayed sid %03x\n", 1403 ss->s_op->o_log_prefix, opc->rsid ); 1404 continue; 1405 } 1406 1407 /* validate base */ 1408 fc.fss = ss; 1409 fc.fbase = 0; 1410 fc.fscope = 0; 1411 1412 /* If the base of the search is missing, signal a refresh */ 1413 rc = syncprov_findbase( op, &fc ); 1414 if ( rc != LDAP_SUCCESS ) { 1415 SlapReply rs = {REP_RESULT}; 1416 send_ldap_error( ss->s_op, &rs, LDAP_SYNC_REFRESH_REQUIRED, 1417 "search base has changed" ); 1418 snext = ss->s_next; 1419 if ( syncprov_drop_psearch( ss, 1 ) ) 1420 *pss = snext; 1421 gonext = 0; 1422 continue; 1423 } 1424 1425 /* If we're sending results now, look for this op in old matches */ 1426 if ( !saveit ) { 1427 syncmatches *old; 1428 1429 /* Did we modify the search base? */ 1430 if ( dn_match( &op->o_req_ndn, &ss->s_base )) { 1431 ldap_pvt_thread_mutex_lock( &ss->s_mutex ); 1432 ss->s_flags |= PS_WROTE_BASE; 1433 ldap_pvt_thread_mutex_unlock( &ss->s_mutex ); 1434 } 1435 1436 for ( sm=opc->smatches, old=(syncmatches *)&opc->smatches; sm; 1437 old=sm, sm=sm->sm_next ) { 1438 if ( sm->sm_op == ss ) { 1439 found = 1; 1440 old->sm_next = sm->sm_next; 1441 op->o_tmpfree( sm, op->o_tmpmemctx ); 1442 break; 1443 } 1444 } 1445 } 1446 1447 rc = LDAP_COMPARE_FALSE; 1448 if ( e && !is_entry_glue( e ) && fc.fscope ) { 1449 ldap_pvt_thread_mutex_lock( &ss->s_mutex ); 1450 op2 = *ss->s_op; 1451 oh = *op->o_hdr; 1452 oh.oh_conn = ss->s_op->o_conn; 1453 oh.oh_connid = ss->s_op->o_connid; 1454 op2.o_bd = op->o_bd->bd_self; 1455 op2.o_hdr = &oh; 1456 op2.o_extra = op->o_extra; 1457 op2.o_callback = NULL; 1458 if (ss->s_flags & PS_FIX_FILTER) { 1459 /* Skip the AND/GE clause that we stuck on in front. We 1460 would lose deletes/mods that happen during the refresh 1461 phase otherwise (ITS#6555) */ 1462 op2.ors_filter = ss->s_op->ors_filter->f_and->f_next; 1463 } 1464 rc = test_filter( &op2, e, op2.ors_filter ); 1465 ldap_pvt_thread_mutex_unlock( &ss->s_mutex ); 1466 } 1467 1468 Debug( LDAP_DEBUG_TRACE, "%s syncprov_matchops: " 1469 "sid %03x fscope %d rc %d\n", 1470 ss->s_op->o_log_prefix, ss->s_sid, fc.fscope, rc ); 1471 1472 /* check if current o_req_dn is in scope and matches filter */ 1473 if ( fc.fscope && rc == LDAP_COMPARE_TRUE ) { 1474 if ( saveit ) { 1475 sm = op->o_tmpalloc( sizeof(syncmatches), op->o_tmpmemctx ); 1476 sm->sm_next = opc->smatches; 1477 sm->sm_op = ss; 1478 ldap_pvt_thread_mutex_lock( &ss->s_mutex ); 1479 ++ss->s_inuse; 1480 ldap_pvt_thread_mutex_unlock( &ss->s_mutex ); 1481 opc->smatches = sm; 1482 } else { 1483 /* if found send UPDATE else send ADD */ 1484 syncprov_qresp( opc, ss, 1485 found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD ); 1486 } 1487 } else if ( !saveit && found ) { 1488 /* send DELETE */ 1489 syncprov_qresp( opc, ss, LDAP_SYNC_DELETE ); 1490 } else if ( !saveit ) { 1491 syncprov_qresp( opc, ss, LDAP_SYNC_NEW_COOKIE ); 1492 } 1493 if ( !saveit && found ) { 1494 /* Decrement s_inuse, was incremented when called 1495 * with saveit == TRUE 1496 */ 1497 snext = ss->s_next; 1498 if ( syncprov_free_syncop( ss, FS_LOCK ) ) { 1499 *pss = snext; 1500 gonext = 0; 1501 } 1502 } 1503 } 1504 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 1505 1506 if ( op->o_tag != LDAP_REQ_ADD && e ) { 1507 if ( !SLAP_ISOVERLAY( op->o_bd )) { 1508 op->o_bd = &db; 1509 } 1510 if ( saveit ) 1511 overlay_entry_release_ov( op, e, 0, on ); 1512 op->o_bd = b0; 1513 } 1514 if ( !saveit ) { 1515 if ( opc->ssres.s_info ) 1516 free_resinfo( &opc->ssres ); 1517 else if ( opc->se ) 1518 entry_free( opc->se ); 1519 } 1520 op->o_bd = b0; 1521 } 1522 1523 static int 1524 syncprov_op_cleanup( Operation *op, SlapReply *rs ) 1525 { 1526 slap_callback *cb = op->o_callback; 1527 opcookie *opc = cb->sc_private; 1528 slap_overinst *on = opc->son; 1529 syncprov_info_t *si = on->on_bi.bi_private; 1530 syncmatches *sm, *snext; 1531 modtarget *mt; 1532 1533 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 1534 if ( si->si_active ) 1535 si->si_active--; 1536 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 1537 1538 for (sm = opc->smatches; sm; sm=snext) { 1539 snext = sm->sm_next; 1540 syncprov_free_syncop( sm->sm_op, FS_LOCK|FS_UNLINK ); 1541 op->o_tmpfree( sm, op->o_tmpmemctx ); 1542 } 1543 1544 /* Remove op from lock table */ 1545 mt = opc->smt; 1546 if ( mt ) { 1547 modinst *mi = (modinst *)(opc+1), **m2; 1548 ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); 1549 for (m2 = &mt->mt_mods; ; m2 = &(*m2)->mi_next) { 1550 if ( *m2 == mi ) { 1551 *m2 = mi->mi_next; 1552 if ( mt->mt_tail == mi ) 1553 mt->mt_tail = ( m2 == &mt->mt_mods ) ? NULL : (modinst *)m2; 1554 break; 1555 } 1556 } 1557 /* If there are more, promote the next one */ 1558 if ( mt->mt_mods ) { 1559 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 1560 } else { 1561 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 1562 ldap_pvt_thread_mutex_lock( &si->si_mods_mutex ); 1563 ldap_avl_delete( &si->si_mods, mt, sp_avl_cmp ); 1564 ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); 1565 ldap_pvt_thread_mutex_destroy( &mt->mt_mutex ); 1566 ch_free( mt->mt_dn.bv_val ); 1567 ch_free( mt ); 1568 } 1569 } 1570 if ( !BER_BVISNULL( &opc->suuid )) 1571 op->o_tmpfree( opc->suuid.bv_val, op->o_tmpmemctx ); 1572 if ( !BER_BVISNULL( &opc->sndn )) 1573 op->o_tmpfree( opc->sndn.bv_val, op->o_tmpmemctx ); 1574 if ( !BER_BVISNULL( &opc->sdn )) 1575 op->o_tmpfree( opc->sdn.bv_val, op->o_tmpmemctx ); 1576 op->o_callback = cb->sc_next; 1577 1578 if ( opc->ssres.s_info ) { 1579 free_resinfo( &opc->ssres ); 1580 } 1581 op->o_tmpfree(cb, op->o_tmpmemctx); 1582 1583 return 0; 1584 } 1585 1586 static void 1587 syncprov_checkpoint( Operation *op, slap_overinst *on ) 1588 { 1589 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 1590 Modifications mod; 1591 Operation opm; 1592 SlapReply rsm = {REP_RESULT}; 1593 slap_callback cb = {0}; 1594 BackendDB be; 1595 BackendInfo *bi; 1596 1597 #ifdef CHECK_CSN 1598 Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax; 1599 1600 int i; 1601 for ( i=0; i<si->si_numcsns; i++ ) { 1602 assert( !syn->ssyn_validate( syn, si->si_ctxcsn+i )); 1603 } 1604 #endif 1605 1606 Debug( LDAP_DEBUG_SYNC, "%s syncprov_checkpoint: running checkpoint\n", 1607 op->o_log_prefix ); 1608 1609 mod.sml_numvals = si->si_numcsns; 1610 mod.sml_values = si->si_ctxcsn; 1611 mod.sml_nvalues = NULL; 1612 mod.sml_desc = slap_schema.si_ad_contextCSN; 1613 mod.sml_op = LDAP_MOD_REPLACE; 1614 mod.sml_flags = SLAP_MOD_INTERNAL; 1615 mod.sml_next = NULL; 1616 1617 cb.sc_response = slap_null_cb; 1618 opm = *op; 1619 opm.o_tag = LDAP_REQ_MODIFY; 1620 opm.o_callback = &cb; 1621 opm.orm_modlist = &mod; 1622 opm.orm_no_opattrs = 1; 1623 if ( SLAP_GLUE_SUBORDINATE( op->o_bd )) { 1624 be = *on->on_info->oi_origdb; 1625 opm.o_bd = &be; 1626 } 1627 opm.o_req_dn = si->si_contextdn; 1628 opm.o_req_ndn = si->si_contextdn; 1629 bi = opm.o_bd->bd_info; 1630 opm.o_bd->bd_info = on->on_info->oi_orig; 1631 opm.o_managedsait = SLAP_CONTROL_NONCRITICAL; 1632 opm.o_no_schema_check = 1; 1633 opm.o_dont_replicate = 1; 1634 opm.o_opid = -1; 1635 opm.o_bd->be_modify( &opm, &rsm ); 1636 1637 if ( rsm.sr_err == LDAP_NO_SUCH_OBJECT && 1638 SLAP_SYNC_SUBENTRY( opm.o_bd )) { 1639 const char *text; 1640 char txtbuf[SLAP_TEXT_BUFLEN]; 1641 size_t textlen = sizeof txtbuf; 1642 Entry *e = slap_create_context_csn_entry( opm.o_bd, NULL ); 1643 rs_reinit( &rsm, REP_RESULT ); 1644 slap_mods2entry( &mod, &e, 0, 1, &text, txtbuf, textlen); 1645 opm.ora_e = e; 1646 opm.o_bd->be_add( &opm, &rsm ); 1647 if ( e == opm.ora_e ) 1648 be_entry_release_w( &opm, opm.ora_e ); 1649 } 1650 opm.o_bd->bd_info = bi; 1651 1652 if ( mod.sml_next != NULL ) { 1653 slap_mods_free( mod.sml_next, 1 ); 1654 } 1655 #ifdef CHECK_CSN 1656 for ( i=0; i<si->si_numcsns; i++ ) { 1657 assert( !syn->ssyn_validate( syn, si->si_ctxcsn+i )); 1658 } 1659 #endif 1660 } 1661 1662 static void 1663 syncprov_add_slog( Operation *op ) 1664 { 1665 opcookie *opc = op->o_callback->sc_private; 1666 slap_overinst *on = opc->son; 1667 syncprov_info_t *si = on->on_bi.bi_private; 1668 sessionlog *sl; 1669 slog_entry *se; 1670 char uuidstr[40]; 1671 int rc; 1672 1673 sl = si->si_logs; 1674 { 1675 if ( BER_BVISEMPTY( &op->o_csn ) ) { 1676 /* During the syncrepl refresh phase we can receive operations 1677 * without a csn. We cannot reliably determine the consumers 1678 * state with respect to such operations, so we ignore them and 1679 * wipe out anything in the log if we see them. 1680 */ 1681 ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex ); 1682 /* can only do this if no one else is reading the log at the moment */ 1683 if ( !sl->sl_playing ) { 1684 ldap_tavl_free( sl->sl_entries, (AVL_FREE)ch_free ); 1685 sl->sl_num = 0; 1686 sl->sl_entries = NULL; 1687 } 1688 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); 1689 return; 1690 } 1691 1692 /* Allocate a record. UUIDs are not NUL-terminated. */ 1693 se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + 1694 op->o_csn.bv_len + 1 ); 1695 se->se_tag = op->o_tag; 1696 1697 se->se_uuid.bv_val = (char *)(&se[1]); 1698 AC_MEMCPY( se->se_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len ); 1699 se->se_uuid.bv_len = opc->suuid.bv_len; 1700 1701 se->se_csn.bv_val = se->se_uuid.bv_val + opc->suuid.bv_len; 1702 AC_MEMCPY( se->se_csn.bv_val, op->o_csn.bv_val, op->o_csn.bv_len ); 1703 se->se_csn.bv_val[op->o_csn.bv_len] = '\0'; 1704 se->se_csn.bv_len = op->o_csn.bv_len; 1705 se->se_sid = slap_parse_csn_sid( &se->se_csn ); 1706 1707 ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex ); 1708 if ( LogTest( LDAP_DEBUG_SYNC ) ) { 1709 uuidstr[0] = 0; 1710 if ( !BER_BVISEMPTY( &opc->suuid ) ) { 1711 lutil_uuidstr_from_normalized( opc->suuid.bv_val, opc->suuid.bv_len, 1712 uuidstr, 40 ); 1713 } 1714 Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " 1715 "adding csn=%s to sessionlog, uuid=%s\n", 1716 op->o_log_prefix, se->se_csn.bv_val, uuidstr ); 1717 } 1718 if ( !sl->sl_entries ) { 1719 if ( !sl->sl_mincsn ) { 1720 sl->sl_numcsns = 1; 1721 sl->sl_mincsn = ch_malloc( 2*sizeof( struct berval )); 1722 sl->sl_sids = ch_malloc( sizeof( int )); 1723 sl->sl_sids[0] = se->se_sid; 1724 ber_dupbv( sl->sl_mincsn, &se->se_csn ); 1725 BER_BVZERO( &sl->sl_mincsn[1] ); 1726 } 1727 } 1728 rc = ldap_tavl_insert( &sl->sl_entries, se, syncprov_sessionlog_cmp, ldap_avl_dup_error ); 1729 if ( rc ) { 1730 Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " 1731 "duplicate sessionlog entry ignored: csn=%s, uuid=%s\n", 1732 op->o_log_prefix, se->se_csn.bv_val, uuidstr ); 1733 ch_free( se ); 1734 goto leave; 1735 } 1736 sl->sl_num++; 1737 if ( !sl->sl_playing && sl->sl_num > sl->sl_size ) { 1738 TAvlnode *edge = ldap_tavl_end( sl->sl_entries, TAVL_DIR_LEFT ); 1739 while ( sl->sl_num > sl->sl_size ) { 1740 int i; 1741 TAvlnode *next = ldap_tavl_next( edge, TAVL_DIR_RIGHT ); 1742 se = edge->avl_data; 1743 Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " 1744 "expiring csn=%s from sessionlog (sessionlog size=%d)\n", 1745 op->o_log_prefix, se->se_csn.bv_val, sl->sl_num ); 1746 for ( i=0; i<sl->sl_numcsns; i++ ) 1747 if ( sl->sl_sids[i] >= se->se_sid ) 1748 break; 1749 if ( i == sl->sl_numcsns || sl->sl_sids[i] != se->se_sid ) { 1750 Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " 1751 "adding csn=%s to mincsn\n", 1752 op->o_log_prefix, se->se_csn.bv_val ); 1753 slap_insert_csn_sids( (struct sync_cookie *)sl, 1754 i, se->se_sid, &se->se_csn ); 1755 } else { 1756 Debug( LDAP_DEBUG_SYNC, "%s syncprov_add_slog: " 1757 "updating mincsn for sid=%d csn=%s to %s\n", 1758 op->o_log_prefix, se->se_sid, sl->sl_mincsn[i].bv_val, se->se_csn.bv_val ); 1759 ber_bvreplace( &sl->sl_mincsn[i], &se->se_csn ); 1760 } 1761 ldap_tavl_delete( &sl->sl_entries, se, syncprov_sessionlog_cmp ); 1762 ch_free( se ); 1763 edge = next; 1764 sl->sl_num--; 1765 } 1766 } 1767 leave: 1768 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); 1769 } 1770 } 1771 1772 /* Just set a flag if we found the matching entry */ 1773 static int 1774 playlog_cb( Operation *op, SlapReply *rs ) 1775 { 1776 if ( rs->sr_type == REP_SEARCH ) { 1777 op->o_callback->sc_private = (void *)1; 1778 } 1779 return rs->sr_err; 1780 } 1781 1782 /* 1783 * Check whether the last nmods UUIDs in the uuids list exist in the database 1784 * and (still) match the op filter, zero out the bv_len of any that still exist 1785 * and return the number of UUIDs we have confirmed are gone now. 1786 */ 1787 static int 1788 check_uuidlist_presence( 1789 Operation *op, 1790 struct berval *uuids, 1791 int len, 1792 int nmods ) 1793 { 1794 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 1795 Operation fop = *op; 1796 SlapReply frs = { REP_RESULT }; 1797 Filter mf, af; 1798 AttributeAssertion eq = ATTRIBUTEASSERTION_INIT; 1799 slap_callback cb = {0}; 1800 int i, mods = nmods; 1801 1802 fop.o_sync_mode = 0; 1803 fop.o_callback = &cb; 1804 fop.ors_limit = NULL; 1805 fop.ors_tlimit = SLAP_NO_LIMIT; 1806 fop.ors_attrs = slap_anlist_all_attributes; 1807 fop.ors_attrsonly = 0; 1808 fop.o_managedsait = SLAP_CONTROL_CRITICAL; 1809 1810 af.f_choice = LDAP_FILTER_AND; 1811 af.f_next = NULL; 1812 af.f_and = &mf; 1813 mf.f_choice = LDAP_FILTER_EQUALITY; 1814 mf.f_ava = &eq; 1815 mf.f_av_desc = slap_schema.si_ad_entryUUID; 1816 mf.f_next = fop.ors_filter; 1817 1818 fop.ors_filter = ⁡ 1819 1820 cb.sc_response = playlog_cb; 1821 1822 fop.o_bd->bd_info = (BackendInfo *)on->on_info; 1823 for ( i=0; i<nmods; i++ ) { 1824 mf.f_av_value = uuids[ len - 1 - i ]; 1825 cb.sc_private = NULL; 1826 fop.ors_slimit = 1; 1827 1828 if ( BER_BVISEMPTY( &mf.f_av_value ) ) { 1829 mods--; 1830 continue; 1831 } 1832 1833 rs_reinit( &frs, REP_RESULT ); 1834 fop.o_bd->be_search( &fop, &frs ); 1835 if ( cb.sc_private ) { 1836 uuids[ len - 1 - i ].bv_len = 0; 1837 mods--; 1838 } 1839 } 1840 fop.o_bd->bd_info = (BackendInfo *)on; 1841 1842 return mods; 1843 } 1844 1845 /* 1846 * On each entry we get from the DB: 1847 * - if it's an ADD, skip 1848 * - check we've not handled it yet, skip if we have 1849 * - check if it's a DELETE or missing from the DB now 1850 * - send a new syncinfo entry 1851 * - remember we've handled it already 1852 * 1853 * If we exhaust the list, clear it, forgetting entries we've handled so far. 1854 */ 1855 static int 1856 syncprov_accesslog_uuid_cb( Operation *op, SlapReply *rs ) 1857 { 1858 slap_callback *sc = op->o_callback; 1859 syncprov_accesslog_deletes *uuid_progress = sc->sc_private; 1860 Attribute *a, *attrs; 1861 sync_control *srs = uuid_progress->srs; 1862 struct berval *bv, csn[2] = {}, uuid[2] = {}, 1863 add = BER_BVC("add"), 1864 delete = BER_BVC("delete"), 1865 modrdn = BER_BVC("modrdn"); 1866 int cmp, sid, i, is_delete = 0, rc; 1867 1868 if ( rs->sr_type != REP_SEARCH ) { 1869 return rs->sr_err; 1870 } 1871 attrs = rs->sr_entry->e_attrs; 1872 1873 a = attr_find( attrs, ad_reqType ); 1874 if ( !a || a->a_numvals == 0 ) { 1875 rs->sr_err = LDAP_CONSTRAINT_VIOLATION; 1876 return rs->sr_err; 1877 } 1878 1879 if ( bvmatch( &a->a_nvals[0], &add ) ) { 1880 return rs->sr_err; 1881 } 1882 1883 if ( bvmatch( &a->a_nvals[0], &delete ) ) { 1884 is_delete = 1; 1885 } 1886 1887 if ( bvmatch( &a->a_nvals[0], &modrdn ) ) { 1888 a = attr_find( attrs, ad_reqDN ); 1889 if ( !a || a->a_numvals == 0 ) { 1890 rs->sr_err = LDAP_CONSTRAINT_VIOLATION; 1891 return rs->sr_err; 1892 } 1893 1894 /* Was it present in the first place? If not, skip: */ 1895 if ( !dnIsSuffix( &a->a_nvals[0], &uuid_progress->op->o_req_ndn ) ) { 1896 return rs->sr_err; 1897 } 1898 1899 a = attr_find( attrs, ad_reqNewDN ); 1900 if ( !a || a->a_numvals == 0 ) { 1901 rs->sr_err = LDAP_CONSTRAINT_VIOLATION; 1902 return rs->sr_err; 1903 } 1904 1905 /* Has it gone away? */ 1906 if ( !dnIsSuffix( &a->a_nvals[0], &uuid_progress->op->o_req_ndn ) ) { 1907 is_delete = 1; 1908 } 1909 } 1910 1911 /* 1912 * Only pick entries that are both: 1913 */ 1914 a = attr_find( attrs, slap_schema.si_ad_entryCSN ); 1915 if ( !a || a->a_numvals == 0 ) { 1916 rs->sr_err = LDAP_CONSTRAINT_VIOLATION; 1917 return rs->sr_err; 1918 } 1919 csn[0] = a->a_nvals[0]; 1920 1921 sid = slap_parse_csn_sid( &csn[0] ); 1922 1923 /* 1924 * newer than cookieCSN (srs->sr_state.ctxcsn) 1925 */ 1926 cmp = 1; 1927 for ( i=0; i<srs->sr_state.numcsns; i++ ) { 1928 if ( sid == srs->sr_state.sids[i] ) { 1929 cmp = ber_bvcmp( &csn[0], &srs->sr_state.ctxcsn[i] ); 1930 break; 1931 } 1932 } 1933 if ( cmp <= 0 ) { 1934 Debug( LDAP_DEBUG_SYNC, "%s syncprov_accesslog_uuid_cb: " 1935 "cmp %d, csn %s too old\n", 1936 op->o_log_prefix, cmp, csn[0].bv_val ); 1937 return rs->sr_err; 1938 } 1939 1940 /* 1941 * not newer than snapshot ctxcsn (uuid_progress->ctxcsn) 1942 */ 1943 cmp = 0; 1944 for ( i=0; i<uuid_progress->numcsns; i++ ) { 1945 if ( sid == uuid_progress->sids[i] ) { 1946 cmp = ber_bvcmp( &csn[0], &uuid_progress->ctxcsn[i] ); 1947 break; 1948 } 1949 } 1950 if ( cmp > 0 ) { 1951 Debug( LDAP_DEBUG_SYNC, "%s syncprov_accesslog_uuid_cb: " 1952 "cmp %d, csn %s too new\n", 1953 op->o_log_prefix, cmp, csn[0].bv_val ); 1954 return rs->sr_err; 1955 } 1956 1957 a = attr_find( attrs, ad_reqEntryUUID ); 1958 if ( !a || a->a_numvals == 0 ) { 1959 rs->sr_err = LDAP_CONSTRAINT_VIOLATION; 1960 return rs->sr_err; 1961 } 1962 uuid[0] = a->a_nvals[0]; 1963 1964 bv = ldap_avl_find( uuid_progress->uuids, uuid, sp_uuid_cmp ); 1965 if ( bv ) { 1966 /* Already checked or sent, no change */ 1967 Debug( LDAP_DEBUG_SYNC, "%s syncprov_accesslog_uuid_cb: " 1968 "uuid %s already checked\n", 1969 op->o_log_prefix, a->a_vals[0].bv_val ); 1970 return rs->sr_err; 1971 } 1972 1973 if ( !is_delete ) { 1974 is_delete = check_uuidlist_presence( uuid_progress->op, uuid, 1, 1 ); 1975 } 1976 Debug( LDAP_DEBUG_SYNC, "%s syncprov_accesslog_uuid_cb: " 1977 "uuid %s is %s present\n", 1978 op->o_log_prefix, a->a_vals[0].bv_val, 1979 is_delete ? "no longer" : "still" ); 1980 1981 i = uuid_progress->ndel++; 1982 1983 bv = &uuid_progress->uuid_list[i]; 1984 bv->bv_val = &uuid_progress->uuid_buf[i*UUID_LEN]; 1985 bv->bv_len = a->a_nvals[0].bv_len; 1986 AC_MEMCPY( bv->bv_val, a->a_nvals[0].bv_val, a->a_nvals[0].bv_len ); 1987 1988 rc = ldap_avl_insert( &uuid_progress->uuids, bv, sp_uuid_cmp, ldap_avl_dup_error ); 1989 assert( rc == LDAP_SUCCESS ); 1990 1991 if ( is_delete ) { 1992 struct berval cookie; 1993 1994 slap_compose_sync_cookie( op, &cookie, srs->sr_state.ctxcsn, 1995 srs->sr_state.rid, slap_serverID ? slap_serverID : -1, csn ); 1996 syncprov_sendinfo( uuid_progress->op, uuid_progress->rs, 1997 LDAP_TAG_SYNC_ID_SET, &cookie, 0, uuid, 1 ); 1998 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); 1999 } 2000 2001 if ( uuid_progress->ndel >= uuid_progress->list_len ) { 2002 int ndel; 2003 2004 assert( uuid_progress->ndel == uuid_progress->list_len ); 2005 ndel = ldap_avl_free( uuid_progress->uuids, NULL ); 2006 assert( ndel == uuid_progress->ndel ); 2007 uuid_progress->uuids = NULL; 2008 uuid_progress->ndel = 0; 2009 } 2010 2011 return rs->sr_err; 2012 } 2013 2014 static int 2015 syncprov_play_sessionlog( Operation *op, SlapReply *rs, sync_control *srs, 2016 BerVarray ctxcsn, int numcsns, int *sids, 2017 struct berval *mincsn, int minsid ) 2018 { 2019 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 2020 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 2021 sessionlog *sl = si->si_logs; 2022 int i, j, ndel, num, nmods, mmods, do_play = 0, rc = -1; 2023 BerVarray uuids, csns; 2024 struct berval uuid[2] = {}, csn[2] = {}; 2025 slog_entry *se; 2026 TAvlnode *entry; 2027 char cbuf[LDAP_PVT_CSNSTR_BUFSIZE]; 2028 struct berval delcsn[2]; 2029 2030 ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex ); 2031 /* Are there any log entries, and is the consumer state 2032 * present in the session log? 2033 */ 2034 if ( !sl->sl_num ) { 2035 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); 2036 return rc; 2037 } 2038 assert( sl->sl_num > 0 ); 2039 2040 for ( i=0; i<sl->sl_numcsns; i++ ) { 2041 /* SID not present == new enough */ 2042 if ( minsid < sl->sl_sids[i] ) { 2043 do_play = 1; 2044 break; 2045 } 2046 /* SID present */ 2047 if ( minsid == sl->sl_sids[i] ) { 2048 /* new enough? */ 2049 if ( ber_bvcmp( mincsn, &sl->sl_mincsn[i] ) >= 0 ) 2050 do_play = 1; 2051 break; 2052 } 2053 } 2054 /* SID not present == new enough */ 2055 if ( i == sl->sl_numcsns ) 2056 do_play = 1; 2057 2058 if ( !do_play ) { 2059 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); 2060 return rc; 2061 } 2062 2063 num = sl->sl_num; 2064 i = 0; 2065 nmods = 0; 2066 sl->sl_playing++; 2067 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); 2068 2069 uuids = op->o_tmpalloc( (num) * sizeof( struct berval ) + 2070 num * UUID_LEN, op->o_tmpmemctx ); 2071 uuids[0].bv_val = (char *)(uuids + num); 2072 csns = op->o_tmpalloc( (num) * sizeof( struct berval ) + 2073 num * LDAP_PVT_CSNSTR_BUFSIZE, op->o_tmpmemctx ); 2074 csns[0].bv_val = (char *)(csns + num); 2075 2076 ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex ); 2077 /* Make a copy of the relevant UUIDs. Put the Deletes up front 2078 * and everything else at the end. Do this first so we can 2079 * let the write side manage the sessionlog again. 2080 */ 2081 assert( sl->sl_entries ); 2082 2083 /* Find first relevant log entry. If greater than mincsn, backtrack one entry */ 2084 { 2085 slog_entry te = {0}; 2086 te.se_csn = *mincsn; 2087 entry = ldap_tavl_find3( sl->sl_entries, &te, syncprov_sessionlog_cmp, &ndel ); 2088 } 2089 if ( ndel > 0 && entry ) 2090 entry = ldap_tavl_next( entry, TAVL_DIR_LEFT ); 2091 /* if none, just start at beginning */ 2092 if ( !entry ) 2093 entry = ldap_tavl_end( sl->sl_entries, TAVL_DIR_LEFT ); 2094 2095 do { 2096 char uuidstr[40] = {}; 2097 slog_entry *se = entry->avl_data; 2098 int k; 2099 2100 /* Make sure writes can still make progress */ 2101 ldap_pvt_thread_rdwr_runlock( &sl->sl_mutex ); 2102 ndel = 1; 2103 for ( k=0; k<srs->sr_state.numcsns; k++ ) { 2104 if ( se->se_sid == srs->sr_state.sids[k] ) { 2105 ndel = ber_bvcmp( &se->se_csn, &srs->sr_state.ctxcsn[k] ); 2106 break; 2107 } 2108 } 2109 if ( ndel <= 0 ) { 2110 ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex ); 2111 continue; 2112 } 2113 ndel = 0; 2114 for ( k=0; k<numcsns; k++ ) { 2115 if ( se->se_sid == sids[k] ) { 2116 ndel = ber_bvcmp( &se->se_csn, &ctxcsn[k] ); 2117 break; 2118 } 2119 } 2120 if ( ndel > 0 ) { 2121 Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_sessionlog: " 2122 "cmp %d, csn %s too new, we're finished\n", 2123 op->o_log_prefix, ndel, se->se_csn.bv_val ); 2124 ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex ); 2125 break; 2126 } 2127 if ( se->se_tag == LDAP_REQ_DELETE ) { 2128 j = i; 2129 i++; 2130 } else { 2131 if ( se->se_tag == LDAP_REQ_ADD ) { 2132 ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex ); 2133 continue; 2134 } 2135 nmods++; 2136 j = num - nmods; 2137 } 2138 uuids[j].bv_val = uuids[0].bv_val + (j * UUID_LEN); 2139 AC_MEMCPY(uuids[j].bv_val, se->se_uuid.bv_val, UUID_LEN); 2140 uuids[j].bv_len = UUID_LEN; 2141 2142 csns[j].bv_val = csns[0].bv_val + (j * LDAP_PVT_CSNSTR_BUFSIZE); 2143 AC_MEMCPY(csns[j].bv_val, se->se_csn.bv_val, se->se_csn.bv_len); 2144 csns[j].bv_len = se->se_csn.bv_len; 2145 /* We're printing it */ 2146 csns[j].bv_val[csns[j].bv_len] = '\0'; 2147 2148 if ( LogTest( LDAP_DEBUG_SYNC ) ) { 2149 lutil_uuidstr_from_normalized( uuids[j].bv_val, uuids[j].bv_len, 2150 uuidstr, 40 ); 2151 Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_sessionlog: " 2152 "picking a %s entry uuid=%s cookie=%s\n", 2153 op->o_log_prefix, se->se_tag == LDAP_REQ_DELETE ? "deleted" : "modified", 2154 uuidstr, csns[j].bv_val ); 2155 } 2156 ldap_pvt_thread_rdwr_rlock( &sl->sl_mutex ); 2157 } while ( (entry = ldap_tavl_next( entry, TAVL_DIR_RIGHT )) != NULL ); 2158 ldap_pvt_thread_rdwr_runlock( &sl->sl_mutex ); 2159 ldap_pvt_thread_rdwr_wlock( &sl->sl_mutex ); 2160 sl->sl_playing--; 2161 ldap_pvt_thread_rdwr_wunlock( &sl->sl_mutex ); 2162 2163 ndel = i; 2164 2165 /* Zero out unused slots */ 2166 for ( i=ndel; i < num - nmods; i++ ) 2167 uuids[i].bv_len = 0; 2168 2169 /* Mods must be validated to see if they belong in this delete set. 2170 */ 2171 2172 mmods = nmods; 2173 /* Strip any duplicates */ 2174 for ( i=0; i<nmods; i++ ) { 2175 for ( j=0; j<ndel; j++ ) { 2176 if ( bvmatch( &uuids[j], &uuids[num - 1 - i] )) { 2177 uuids[num - 1 - i].bv_len = 0; 2178 mmods --; 2179 break; 2180 } 2181 } 2182 if ( uuids[num - 1 - i].bv_len == 0 ) continue; 2183 for ( j=0; j<i; j++ ) { 2184 if ( bvmatch( &uuids[num - 1 - j], &uuids[num - 1 - i] )) { 2185 uuids[num - 1 - i].bv_len = 0; 2186 mmods --; 2187 break; 2188 } 2189 } 2190 } 2191 2192 /* Check mods now */ 2193 if ( mmods ) { 2194 check_uuidlist_presence( op, uuids, num, nmods ); 2195 } 2196 2197 /* ITS#8768 Send entries sorted by CSN order */ 2198 i = j = 0; 2199 while ( i < ndel || j < nmods ) { 2200 struct berval cookie; 2201 int index; 2202 2203 /* Skip over duplicate mods */ 2204 if ( j < nmods && BER_BVISEMPTY( &uuids[ num - 1 - j ] ) ) { 2205 j++; 2206 continue; 2207 } 2208 index = num - 1 - j; 2209 2210 if ( i >= ndel ) { 2211 j++; 2212 } else if ( j >= nmods ) { 2213 index = i++; 2214 /* Take the oldest by CSN order */ 2215 } else if ( ber_bvcmp( &csns[index], &csns[i] ) < 0 ) { 2216 j++; 2217 } else { 2218 index = i++; 2219 } 2220 2221 uuid[0] = uuids[index]; 2222 csn[0] = csns[index]; 2223 2224 slap_compose_sync_cookie( op, &cookie, srs->sr_state.ctxcsn, 2225 srs->sr_state.rid, slap_serverID ? slap_serverID : -1, csn ); 2226 if ( LogTest( LDAP_DEBUG_SYNC ) ) { 2227 char uuidstr[40]; 2228 lutil_uuidstr_from_normalized( uuid[0].bv_val, uuid[0].bv_len, 2229 uuidstr, 40 ); 2230 Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_sessionlog: " 2231 "sending a new disappearing entry uuid=%s cookie=%s\n", 2232 op->o_log_prefix, uuidstr, cookie.bv_val ); 2233 } 2234 2235 /* TODO: we might batch those that share the same CSN (think present 2236 * phase), but would have to limit how many we send out at once */ 2237 syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, &cookie, 0, uuid, 1 ); 2238 } 2239 op->o_tmpfree( uuids, op->o_tmpmemctx ); 2240 op->o_tmpfree( csns, op->o_tmpmemctx ); 2241 2242 return LDAP_SUCCESS; 2243 } 2244 2245 static int 2246 syncprov_play_accesslog( Operation *op, SlapReply *rs, sync_control *srs, 2247 BerVarray ctxcsn, int numcsns, int *sids, 2248 struct berval *mincsn, int minsid ) 2249 { 2250 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 2251 syncprov_info_t *si = on->on_bi.bi_private; 2252 Operation fop; 2253 SlapReply frs = { REP_RESULT }; 2254 slap_callback cb = {}; 2255 Filter *f; 2256 syncprov_accesslog_deletes uuid_progress = { 2257 .op = op, 2258 .rs = rs, 2259 .srs = srs, 2260 .ctxcsn = ctxcsn, 2261 .numcsns = numcsns, 2262 .sids = sids, 2263 }; 2264 struct berval oldestcsn = BER_BVNULL, newestcsn = BER_BVNULL, 2265 basedn, filterpattern = BER_BVC( 2266 "(&" 2267 "(entryCSN>=%s)" 2268 "(entryCSN<=%s)" 2269 "(reqResult=0)" 2270 "(|" 2271 "(reqDN:dnSubtreeMatch:=%s)" 2272 "(reqNewDN:dnSubtreeMatch:=%s)" 2273 ")" 2274 "(|" 2275 "(objectclass=auditWriteObject)" 2276 "(objectclass=auditExtended)" 2277 "))" ); 2278 BackendDB *db; 2279 Entry *e; 2280 Attribute *a; 2281 int *minsids, i, j = 0, rc = -1; 2282 2283 assert( !BER_BVISNULL( &si->si_logbase ) ); 2284 2285 db = select_backend( &si->si_logbase, 0 ); 2286 if ( !db ) { 2287 Debug( LDAP_DEBUG_ANY, "%s syncprov_play_accesslog: " 2288 "No database configured to hold accesslog dn=%s\n", 2289 op->o_log_prefix, si->si_logbase.bv_val ); 2290 return LDAP_NO_SUCH_OBJECT; 2291 } 2292 2293 fop = *op; 2294 fop.o_sync_mode = 0; 2295 fop.o_bd = db; 2296 rc = be_entry_get_rw( &fop, &si->si_logbase, NULL, ad_minCSN, 0, &e ); 2297 if ( rc ) { 2298 return rc; 2299 } 2300 2301 a = attr_find( e->e_attrs, ad_minCSN ); 2302 if ( !a ) { 2303 be_entry_release_rw( &fop, e, 0 ); 2304 return LDAP_NO_SUCH_ATTRIBUTE; 2305 } 2306 2307 /* 2308 * If we got here: 2309 * - the consumer's cookie (srs->sr_state.ctxcsn) has the same sids in the 2310 * same order as ctxcsn 2311 * - at least one of the cookie's csns is older than its ctxcsn counterpart 2312 * 2313 * Now prepare the filter, we want it to be the union of all the intervals 2314 * between the cookie and our contextCSN for each sid. Right now, we can't 2315 * specify them separately, so just pick the boundary CSNs of non-empty 2316 * intervals as a conservative overestimate. 2317 * 2318 * Also check accesslog can actually serve this query based on what's 2319 * stored in minCSN. 2320 */ 2321 2322 assert( srs->sr_state.numcsns == numcsns ); 2323 2324 minsids = slap_parse_csn_sids( a->a_nvals, a->a_numvals, op->o_tmpmemctx ); 2325 slap_sort_csn_sids( a->a_nvals, minsids, a->a_numvals, op->o_tmpmemctx ); 2326 for ( i=0, j=0; i < numcsns; i++ ) { 2327 assert( srs->sr_state.sids[i] == sids[i] ); 2328 if ( ber_bvcmp( &srs->sr_state.ctxcsn[i], &ctxcsn[i] ) >= 0 ) { 2329 /* Consumer is up to date for this sid */ 2330 continue; 2331 } 2332 for ( ; j < a->a_numvals && minsids[j] < sids[i]; j++ ) 2333 /* Find the right minCSN, if present */; 2334 if ( j == a->a_numvals || minsids[j] != sids[i] || 2335 ber_bvcmp( &srs->sr_state.ctxcsn[i], &a->a_nvals[j] ) < 0 ) { 2336 /* Consumer is missing changes for a sid and minCSN indicates we 2337 * can't replay all relevant history */ 2338 Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_accesslog: " 2339 "accesslog information inadequate for log replay on csn=%s\n", 2340 op->o_log_prefix, srs->sr_state.ctxcsn[i].bv_val ); 2341 slap_sl_free( minsids, op->o_tmpmemctx ); 2342 be_entry_release_rw( &fop, e, 0 ); 2343 return 1; 2344 } 2345 if ( BER_BVISEMPTY( &oldestcsn ) || 2346 ber_bvcmp( &oldestcsn, &srs->sr_state.ctxcsn[i] ) > 0 ) { 2347 oldestcsn = srs->sr_state.ctxcsn[i]; 2348 } 2349 if ( BER_BVISEMPTY( &newestcsn ) || 2350 ber_bvcmp( &newestcsn, &ctxcsn[i] ) < 0 ) { 2351 newestcsn = ctxcsn[i]; 2352 } 2353 } 2354 assert( !BER_BVISEMPTY( &oldestcsn ) && !BER_BVISEMPTY( &newestcsn ) && 2355 ber_bvcmp( &oldestcsn, &newestcsn ) < 0 ); 2356 slap_sl_free( minsids, op->o_tmpmemctx ); 2357 2358 filter_escape_value_x( &op->o_req_ndn, &basedn, fop.o_tmpmemctx ); 2359 /* filter_escape_value_x sets output to BVNULL if input value is empty, 2360 * supply our own copy */ 2361 if ( BER_BVISEMPTY( &basedn ) ) { 2362 basedn.bv_val = ""; 2363 } 2364 fop.o_req_ndn = fop.o_req_dn = si->si_logbase; 2365 fop.ors_filterstr.bv_val = fop.o_tmpalloc( 2366 filterpattern.bv_len + 2367 oldestcsn.bv_len + newestcsn.bv_len + 2 * basedn.bv_len, 2368 fop.o_tmpmemctx ); 2369 fop.ors_filterstr.bv_len = sprintf( fop.ors_filterstr.bv_val, 2370 filterpattern.bv_val, 2371 oldestcsn.bv_val, newestcsn.bv_val, basedn.bv_val, basedn.bv_val ); 2372 Debug( LDAP_DEBUG_SYNC, "%s syncprov_play_accesslog: " 2373 "prepared filter '%s', base='%s'\n", 2374 op->o_log_prefix, fop.ors_filterstr.bv_val, si->si_logbase.bv_val ); 2375 f = str2filter_x( &fop, fop.ors_filterstr.bv_val ); 2376 assert( f != NULL ); 2377 fop.ors_filter = f; 2378 2379 if ( !BER_BVISEMPTY( &basedn ) ) { 2380 fop.o_tmpfree( basedn.bv_val, fop.o_tmpmemctx ); 2381 } 2382 be_entry_release_rw( &fop, e, 0 ); 2383 2384 /* 2385 * Allocate memory for list_len uuids for use by the callback, populate 2386 * with entries that we have sent or checked still match the filter. 2387 * A disappearing entry gets its uuid sent as a delete. 2388 * 2389 * in the callback, we need: 2390 * - original op and rs so we can send the message 2391 * - sync_control 2392 * - the uuid buffer and list and their length 2393 * - number of uuids we already have in the list 2394 * - the lookup structure so we don't have to check/send a uuid twice 2395 * (AVL?) 2396 */ 2397 uuid_progress.list_len = SLAP_SYNCUUID_SET_SIZE; 2398 uuid_progress.uuid_list = fop.o_tmpalloc( (uuid_progress.list_len) * sizeof(struct berval), fop.o_tmpmemctx ); 2399 uuid_progress.uuid_buf = fop.o_tmpalloc( (uuid_progress.list_len) * UUID_LEN, fop.o_tmpmemctx ); 2400 2401 cb.sc_private = &uuid_progress; 2402 cb.sc_response = syncprov_accesslog_uuid_cb; 2403 2404 fop.o_callback = &cb; 2405 2406 rc = fop.o_bd->be_search( &fop, &frs ); 2407 2408 ldap_avl_free( uuid_progress.uuids, NULL ); 2409 fop.o_tmpfree( uuid_progress.uuid_buf, fop.o_tmpmemctx ); 2410 fop.o_tmpfree( uuid_progress.uuid_list, fop.o_tmpmemctx ); 2411 fop.o_tmpfree( fop.ors_filterstr.bv_val, fop.o_tmpmemctx ); 2412 filter_free_x( &fop, f, 1 ); 2413 2414 return rc; 2415 } 2416 2417 static int 2418 syncprov_new_ctxcsn( opcookie *opc, syncprov_info_t *si, int csn_changed, int numvals, BerVarray vals ) 2419 { 2420 unsigned i; 2421 int j, sid; 2422 2423 for ( i=0; i<numvals; i++ ) { 2424 sid = slap_parse_csn_sid( &vals[i] ); 2425 for ( j=0; j<si->si_numcsns; j++ ) { 2426 if ( sid < si->si_sids[j] ) 2427 break; 2428 if ( sid == si->si_sids[j] ) { 2429 if ( ber_bvcmp( &vals[i], &si->si_ctxcsn[j] ) > 0 ) { 2430 ber_bvreplace( &si->si_ctxcsn[j], &vals[i] ); 2431 csn_changed = 1; 2432 } 2433 break; 2434 } 2435 } 2436 2437 if ( j == si->si_numcsns || sid != si->si_sids[j] ) { 2438 slap_insert_csn_sids( (struct sync_cookie *)&si->si_ctxcsn, 2439 j, sid, &vals[i] ); 2440 csn_changed = 1; 2441 } 2442 } 2443 if ( csn_changed ) 2444 si->si_dirty = 0; 2445 ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); 2446 2447 if ( csn_changed ) { 2448 syncops *ss; 2449 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 2450 for ( ss = si->si_ops; ss; ss = ss->s_next ) { 2451 if ( ss->s_op->o_abandon ) 2452 continue; 2453 /* Send the updated csn to all syncrepl consumers, 2454 * including the server from which it originated. 2455 * The syncrepl consumer and syncprov provider on 2456 * the originating server may be configured to store 2457 * their csn values in different entries. 2458 */ 2459 syncprov_qresp( opc, ss, LDAP_SYNC_NEW_COOKIE ); 2460 } 2461 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 2462 } 2463 return csn_changed; 2464 } 2465 2466 static int 2467 syncprov_op_response( Operation *op, SlapReply *rs ) 2468 { 2469 opcookie *opc = op->o_callback->sc_private; 2470 slap_overinst *on = opc->son; 2471 syncprov_info_t *si = on->on_bi.bi_private; 2472 syncmatches *sm; 2473 2474 if ( rs->sr_err == LDAP_SUCCESS ) 2475 { 2476 struct berval maxcsn; 2477 char cbuf[LDAP_PVT_CSNSTR_BUFSIZE]; 2478 int do_check = 0, have_psearches, foundit, csn_changed = 0; 2479 2480 ldap_pvt_thread_mutex_lock( &si->si_resp_mutex ); 2481 2482 /* Update our context CSN */ 2483 cbuf[0] = '\0'; 2484 maxcsn.bv_val = cbuf; 2485 maxcsn.bv_len = sizeof(cbuf); 2486 ldap_pvt_thread_rdwr_wlock( &si->si_csn_rwlock ); 2487 2488 slap_get_commit_csn( op, &maxcsn, &foundit ); 2489 if ( BER_BVISEMPTY( &maxcsn ) && SLAP_GLUE_SUBORDINATE( op->o_bd )) { 2490 /* syncrepl queues the CSN values in the db where 2491 * it is configured , not where the changes are made. 2492 * So look for a value in the glue db if we didn't 2493 * find any in this db. 2494 */ 2495 BackendDB *be = op->o_bd; 2496 op->o_bd = select_backend( &be->be_nsuffix[0], 1); 2497 maxcsn.bv_val = cbuf; 2498 maxcsn.bv_len = sizeof(cbuf); 2499 slap_get_commit_csn( op, &maxcsn, &foundit ); 2500 op->o_bd = be; 2501 } 2502 if ( !BER_BVISEMPTY( &maxcsn ) ) { 2503 int i, sid; 2504 #ifdef CHECK_CSN 2505 Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax; 2506 assert( !syn->ssyn_validate( syn, &maxcsn )); 2507 #endif 2508 sid = slap_parse_csn_sid( &maxcsn ); 2509 for ( i=0; i<si->si_numcsns; i++ ) { 2510 if ( sid < si->si_sids[i] ) 2511 break; 2512 if ( sid == si->si_sids[i] ) { 2513 if ( ber_bvcmp( &maxcsn, &si->si_ctxcsn[i] ) > 0 ) { 2514 ber_bvreplace( &si->si_ctxcsn[i], &maxcsn ); 2515 csn_changed = 1; 2516 } 2517 break; 2518 } 2519 } 2520 /* It's a new SID for us */ 2521 if ( i == si->si_numcsns || sid != si->si_sids[i] ) { 2522 slap_insert_csn_sids((struct sync_cookie *)&(si->si_ctxcsn), 2523 i, sid, &maxcsn ); 2524 csn_changed = 1; 2525 } 2526 } 2527 2528 /* Don't do any processing for consumer contextCSN updates */ 2529 if ( SLAPD_SYNC_IS_SYNCCONN( op->o_connid ) && 2530 op->o_tag == LDAP_REQ_MODIFY && 2531 op->orm_modlist && 2532 op->orm_modlist->sml_op == LDAP_MOD_REPLACE && 2533 op->orm_modlist->sml_desc == slap_schema.si_ad_contextCSN ) { 2534 /* Catch contextCSN updates from syncrepl. We have to look at 2535 * all the attribute values, as there may be more than one csn 2536 * that changed, and only one can be passed in the csn queue. 2537 */ 2538 csn_changed = syncprov_new_ctxcsn( opc, si, csn_changed, 2539 op->orm_modlist->sml_numvals, op->orm_modlist->sml_values ); 2540 if ( csn_changed ) 2541 si->si_numops++; 2542 goto leave; 2543 } 2544 if ( op->o_dont_replicate ) { 2545 if ( csn_changed ) 2546 si->si_numops++; 2547 ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); 2548 goto leave; 2549 } 2550 2551 /* If we're adding the context entry, parse all of its contextCSNs */ 2552 if ( op->o_tag == LDAP_REQ_ADD && 2553 dn_match( &op->o_req_ndn, &si->si_contextdn )) { 2554 Attribute *a = attr_find( op->ora_e->e_attrs, slap_schema.si_ad_contextCSN ); 2555 if ( a ) { 2556 csn_changed = syncprov_new_ctxcsn( opc, si, csn_changed, a->a_numvals, a->a_vals ); 2557 if ( csn_changed ) 2558 si->si_numops++; 2559 goto added; 2560 } 2561 } 2562 2563 if ( csn_changed ) 2564 si->si_numops++; 2565 if ( si->si_chkops || si->si_chktime ) { 2566 /* Never checkpoint adding the context entry, 2567 * it will deadlock 2568 */ 2569 if ( op->o_tag != LDAP_REQ_ADD || 2570 !dn_match( &op->o_req_ndn, &si->si_contextdn )) { 2571 if ( si->si_chkops && si->si_numops >= si->si_chkops ) { 2572 do_check = 1; 2573 si->si_numops = 0; 2574 } 2575 if ( si->si_chktime && 2576 (op->o_time - si->si_chklast >= si->si_chktime )) { 2577 if ( si->si_chklast ) { 2578 do_check = 1; 2579 si->si_chklast = op->o_time; 2580 } else { 2581 si->si_chklast = 1; 2582 } 2583 } 2584 } 2585 } 2586 si->si_dirty = !csn_changed; 2587 ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); 2588 2589 added: 2590 if ( do_check ) { 2591 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); 2592 syncprov_checkpoint( op, on ); 2593 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); 2594 } 2595 2596 /* only update consumer ctx if this is a newer csn */ 2597 if ( csn_changed ) { 2598 opc->sctxcsn = maxcsn; 2599 } 2600 2601 /* Handle any persistent searches */ 2602 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 2603 have_psearches = ( si->si_ops != NULL ); 2604 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 2605 if ( have_psearches ) { 2606 syncprov_matchops( op, opc, 0 ); 2607 } 2608 2609 /* Add any log records */ 2610 if ( si->si_logs ) { 2611 syncprov_add_slog( op ); 2612 } 2613 leave: ldap_pvt_thread_mutex_unlock( &si->si_resp_mutex ); 2614 } 2615 return SLAP_CB_CONTINUE; 2616 } 2617 2618 /* We don't use a subentry to store the context CSN any more. 2619 * We expose the current context CSN as an operational attribute 2620 * of the suffix entry. 2621 */ 2622 static int 2623 syncprov_op_compare( Operation *op, SlapReply *rs ) 2624 { 2625 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 2626 syncprov_info_t *si = on->on_bi.bi_private; 2627 int rc = SLAP_CB_CONTINUE; 2628 2629 if ( dn_match( &op->o_req_ndn, &si->si_contextdn ) && 2630 op->oq_compare.rs_ava->aa_desc == slap_schema.si_ad_contextCSN ) 2631 { 2632 Entry e = {0}; 2633 Attribute a = {0}; 2634 2635 e.e_name = si->si_contextdn; 2636 e.e_nname = si->si_contextdn; 2637 e.e_attrs = &a; 2638 2639 a.a_desc = slap_schema.si_ad_contextCSN; 2640 2641 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); 2642 2643 a.a_vals = si->si_ctxcsn; 2644 a.a_nvals = a.a_vals; 2645 a.a_numvals = si->si_numcsns; 2646 2647 rs->sr_err = access_allowed( op, &e, op->oq_compare.rs_ava->aa_desc, 2648 &op->oq_compare.rs_ava->aa_value, ACL_COMPARE, NULL ); 2649 if ( ! rs->sr_err ) { 2650 rs->sr_err = LDAP_INSUFFICIENT_ACCESS; 2651 goto return_results; 2652 } 2653 2654 if ( get_assert( op ) && 2655 ( test_filter( op, &e, get_assertion( op ) ) != LDAP_COMPARE_TRUE ) ) 2656 { 2657 rs->sr_err = LDAP_ASSERTION_FAILED; 2658 goto return_results; 2659 } 2660 2661 2662 rs->sr_err = LDAP_COMPARE_FALSE; 2663 2664 if ( attr_valfind( &a, 2665 SLAP_MR_ATTRIBUTE_VALUE_NORMALIZED_MATCH | 2666 SLAP_MR_ASSERTED_VALUE_NORMALIZED_MATCH, 2667 &op->oq_compare.rs_ava->aa_value, NULL, op->o_tmpmemctx ) == 0 ) 2668 { 2669 rs->sr_err = LDAP_COMPARE_TRUE; 2670 } 2671 2672 return_results:; 2673 2674 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); 2675 2676 send_ldap_result( op, rs ); 2677 2678 if( rs->sr_err == LDAP_COMPARE_FALSE || rs->sr_err == LDAP_COMPARE_TRUE ) { 2679 rs->sr_err = LDAP_SUCCESS; 2680 } 2681 rc = rs->sr_err; 2682 } 2683 2684 return rc; 2685 } 2686 2687 static int 2688 syncprov_op_mod( Operation *op, SlapReply *rs ) 2689 { 2690 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 2691 syncprov_info_t *si = on->on_bi.bi_private; 2692 slap_callback *cb; 2693 opcookie *opc; 2694 int have_psearches, cbsize; 2695 2696 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 2697 have_psearches = ( si->si_ops != NULL ); 2698 si->si_active++; 2699 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 2700 2701 cbsize = sizeof(slap_callback) + sizeof(opcookie) + 2702 (have_psearches ? sizeof(modinst) : 0 ); 2703 2704 cb = op->o_tmpcalloc(1, cbsize, op->o_tmpmemctx); 2705 opc = (opcookie *)(cb+1); 2706 opc->son = on; 2707 cb->sc_response = syncprov_op_response; 2708 cb->sc_cleanup = syncprov_op_cleanup; 2709 cb->sc_private = opc; 2710 cb->sc_next = op->o_callback; 2711 op->o_callback = cb; 2712 2713 opc->osid = -1; 2714 opc->rsid = -1; 2715 if ( op->o_csn.bv_val ) { 2716 opc->osid = slap_parse_csn_sid( &op->o_csn ); 2717 } 2718 if ( op->o_controls ) { 2719 struct sync_cookie *scook = 2720 op->o_controls[slap_cids.sc_LDAPsync]; 2721 if ( scook ) 2722 opc->rsid = scook->sid; 2723 } 2724 2725 if ( op->o_dont_replicate ) 2726 return SLAP_CB_CONTINUE; 2727 2728 /* If there are active persistent searches, lock this operation. 2729 * See seqmod.c for the locking logic on its own. 2730 */ 2731 if ( have_psearches ) { 2732 modtarget *mt, mtdummy; 2733 modinst *mi; 2734 2735 mi = (modinst *)(opc+1); 2736 mi->mi_op = op; 2737 2738 /* See if we're already modifying this entry... */ 2739 mtdummy.mt_dn = op->o_req_ndn; 2740 retry: 2741 ldap_pvt_thread_mutex_lock( &si->si_mods_mutex ); 2742 mt = ldap_avl_find( si->si_mods, &mtdummy, sp_avl_cmp ); 2743 if ( mt ) { 2744 ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); 2745 if ( mt->mt_mods == NULL ) { 2746 /* Cannot reuse this mt, as another thread is about 2747 * to release it in syncprov_op_cleanup. Wait for them 2748 * to finish; our own insert is required to succeed. 2749 */ 2750 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 2751 ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); 2752 ldap_pvt_thread_yield(); 2753 goto retry; 2754 } 2755 } 2756 if ( mt ) { 2757 mt->mt_tail->mi_next = mi; 2758 mt->mt_tail = mi; 2759 ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); 2760 /* wait for this op to get to head of list */ 2761 while ( mt->mt_mods != mi ) { 2762 modinst *m2; 2763 /* don't wait on other mods from the same thread */ 2764 for ( m2 = mt->mt_mods; m2; m2 = m2->mi_next ) { 2765 if ( m2->mi_op->o_threadctx == op->o_threadctx ) { 2766 break; 2767 } 2768 } 2769 if ( m2 ) 2770 break; 2771 2772 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 2773 /* FIXME: if dynamic config can delete overlays or 2774 * databases we'll have to check for cleanup here. 2775 * Currently it's not an issue because there are 2776 * no dynamic config deletes... 2777 */ 2778 if ( slapd_shutdown ) 2779 return SLAPD_ABANDON; 2780 2781 if ( !ldap_pvt_thread_pool_pausewait( &connection_pool )) 2782 ldap_pvt_thread_yield(); 2783 ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); 2784 2785 /* clean up if the caller is giving up */ 2786 if ( op->o_abandon ) { 2787 modinst **m2; 2788 slap_callback **sc; 2789 for (m2 = &mt->mt_mods; ; m2 = &(*m2)->mi_next) { 2790 if ( *m2 == mi ) { 2791 *m2 = mi->mi_next; 2792 if ( mt->mt_tail == mi ) 2793 mt->mt_tail = ( m2 == &mt->mt_mods ) ? NULL : (modinst *)m2; 2794 break; 2795 } 2796 } 2797 for (sc = &op->o_callback; ; sc = &(*sc)->sc_next) { 2798 if ( *sc == cb ) { 2799 *sc = cb->sc_next; 2800 break; 2801 } 2802 } 2803 op->o_tmpfree( cb, op->o_tmpmemctx ); 2804 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 2805 return SLAPD_ABANDON; 2806 } 2807 } 2808 ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); 2809 } else { 2810 /* Record that we're modifying this entry now */ 2811 mt = ch_malloc( sizeof(modtarget) ); 2812 mt->mt_mods = mi; 2813 mt->mt_tail = mi; 2814 ber_dupbv( &mt->mt_dn, &mi->mi_op->o_req_ndn ); 2815 ldap_pvt_thread_mutex_init( &mt->mt_mutex ); 2816 ldap_avl_insert( &si->si_mods, mt, sp_avl_cmp, ldap_avl_dup_error ); 2817 ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); 2818 } 2819 opc->smt = mt; 2820 } 2821 2822 if (( have_psearches || si->si_logs ) && op->o_tag != LDAP_REQ_ADD ) 2823 syncprov_matchops( op, opc, 1 ); 2824 2825 return SLAP_CB_CONTINUE; 2826 } 2827 2828 static int 2829 syncprov_op_extended( Operation *op, SlapReply *rs ) 2830 { 2831 if ( exop_is_write( op )) 2832 return syncprov_op_mod( op, rs ); 2833 2834 return SLAP_CB_CONTINUE; 2835 } 2836 2837 typedef struct searchstate { 2838 slap_overinst *ss_on; 2839 syncops *ss_so; 2840 BerVarray ss_ctxcsn; 2841 int *ss_sids; 2842 int ss_numcsns; 2843 #define SS_PRESENT 0x01 2844 #define SS_CHANGED 0x02 2845 int ss_flags; 2846 } searchstate; 2847 2848 typedef struct SyncOperationBuffer { 2849 Operation sob_op; 2850 Opheader sob_hdr; 2851 OpExtra sob_oe; 2852 AttributeName sob_extra; /* not always present */ 2853 /* Further data allocated here */ 2854 } SyncOperationBuffer; 2855 2856 static void 2857 syncprov_detach_op( Operation *op, syncops *so, slap_overinst *on ) 2858 { 2859 SyncOperationBuffer *sopbuf2; 2860 Operation *op2; 2861 int i, alen = 0; 2862 size_t size; 2863 char *ptr; 2864 GroupAssertion *g1, *g2; 2865 2866 /* count the search attrs */ 2867 for (i=0; op->ors_attrs && !BER_BVISNULL( &op->ors_attrs[i].an_name ); i++) { 2868 alen += op->ors_attrs[i].an_name.bv_len + 1; 2869 } 2870 /* Make a new copy of the operation */ 2871 size = offsetof( SyncOperationBuffer, sob_extra ) + 2872 (i ? ( (i+1) * sizeof(AttributeName) + alen) : 0) + 2873 op->o_req_dn.bv_len + 1 + 2874 op->o_req_ndn.bv_len + 1 + 2875 op->o_ndn.bv_len + 1 + 2876 so->s_filterstr.bv_len + 1; 2877 sopbuf2 = ch_calloc( 1, size ); 2878 op2 = &sopbuf2->sob_op; 2879 op2->o_hdr = &sopbuf2->sob_hdr; 2880 LDAP_SLIST_FIRST(&op2->o_extra) = &sopbuf2->sob_oe; 2881 2882 /* Copy the fields we care about explicitly, leave the rest alone */ 2883 *op2->o_hdr = *op->o_hdr; 2884 op2->o_tag = op->o_tag; 2885 op2->o_time = op->o_time; 2886 op2->o_bd = on->on_info->oi_origdb; 2887 op2->o_request = op->o_request; 2888 op2->o_managedsait = op->o_managedsait; 2889 LDAP_SLIST_FIRST(&op2->o_extra)->oe_key = on; 2890 LDAP_SLIST_NEXT(LDAP_SLIST_FIRST(&op2->o_extra), oe_next) = NULL; 2891 2892 ptr = (char *) sopbuf2 + offsetof( SyncOperationBuffer, sob_extra ); 2893 if ( i ) { 2894 op2->ors_attrs = (AttributeName *) ptr; 2895 ptr = (char *) &op2->ors_attrs[i+1]; 2896 for (i=0; !BER_BVISNULL( &op->ors_attrs[i].an_name ); i++) { 2897 op2->ors_attrs[i] = op->ors_attrs[i]; 2898 op2->ors_attrs[i].an_name.bv_val = ptr; 2899 ptr = lutil_strcopy( ptr, op->ors_attrs[i].an_name.bv_val ) + 1; 2900 } 2901 BER_BVZERO( &op2->ors_attrs[i].an_name ); 2902 } 2903 2904 op2->o_authz = op->o_authz; 2905 op2->o_ndn.bv_val = ptr; 2906 ptr = lutil_strcopy(ptr, op->o_ndn.bv_val) + 1; 2907 op2->o_dn = op2->o_ndn; 2908 op2->o_req_dn.bv_len = op->o_req_dn.bv_len; 2909 op2->o_req_dn.bv_val = ptr; 2910 ptr = lutil_strcopy(ptr, op->o_req_dn.bv_val) + 1; 2911 op2->o_req_ndn.bv_len = op->o_req_ndn.bv_len; 2912 op2->o_req_ndn.bv_val = ptr; 2913 ptr = lutil_strcopy(ptr, op->o_req_ndn.bv_val) + 1; 2914 op2->ors_filterstr.bv_val = ptr; 2915 strcpy( ptr, so->s_filterstr.bv_val ); 2916 op2->ors_filterstr.bv_len = so->s_filterstr.bv_len; 2917 2918 /* Skip the AND/GE clause that we stuck on in front */ 2919 if ( so->s_flags & PS_FIX_FILTER ) { 2920 op2->ors_filter = op->ors_filter->f_and->f_next; 2921 so->s_flags ^= PS_FIX_FILTER; 2922 } else { 2923 op2->ors_filter = op->ors_filter; 2924 } 2925 op2->ors_filter = filter_dup( op2->ors_filter, NULL ); 2926 so->s_op = op2; 2927 2928 /* Copy any cached group ACLs individually */ 2929 op2->o_groups = NULL; 2930 for ( g1=op->o_groups; g1; g1=g1->ga_next ) { 2931 g2 = ch_malloc( sizeof(GroupAssertion) + g1->ga_len ); 2932 *g2 = *g1; 2933 strcpy( g2->ga_ndn, g1->ga_ndn ); 2934 g2->ga_next = op2->o_groups; 2935 op2->o_groups = g2; 2936 } 2937 /* Don't allow any further group caching */ 2938 op2->o_do_not_cache = 1; 2939 2940 /* Add op2 to conn so abandon will find us */ 2941 op->o_conn->c_n_ops_executing++; 2942 op->o_conn->c_n_ops_completed--; 2943 LDAP_STAILQ_INSERT_TAIL( &op->o_conn->c_ops, op2, o_next ); 2944 so->s_flags |= PS_IS_DETACHED; 2945 } 2946 2947 static int 2948 syncprov_search_response( Operation *op, SlapReply *rs ) 2949 { 2950 searchstate *ss = op->o_callback->sc_private; 2951 slap_overinst *on = ss->ss_on; 2952 syncops *so = ss->ss_so; 2953 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 2954 sync_control *srs = op->o_controls[slap_cids.sc_LDAPsync]; 2955 2956 if ( rs->sr_type == REP_SEARCH || rs->sr_type == REP_SEARCHREF ) { 2957 Attribute *a; 2958 /* If we got a referral without a referral object, there's 2959 * something missing that we cannot replicate. Just ignore it. 2960 * The consumer will abort because we didn't send the expected 2961 * control. 2962 */ 2963 if ( !rs->sr_entry ) { 2964 assert( rs->sr_entry != NULL ); 2965 Debug( LDAP_DEBUG_ANY, "%s syncprov_search_response: " 2966 "bogus referral in context\n", op->o_log_prefix ); 2967 return SLAP_CB_CONTINUE; 2968 } 2969 if ( is_entry_glue( rs->sr_entry ) ) { 2970 return LDAP_SUCCESS; 2971 } 2972 a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryCSN ); 2973 if ( a == NULL && rs->sr_operational_attrs != NULL ) { 2974 a = attr_find( rs->sr_operational_attrs, slap_schema.si_ad_entryCSN ); 2975 } 2976 if ( a ) { 2977 int i, sid; 2978 sid = slap_parse_csn_sid( &a->a_nvals[0] ); 2979 2980 /* If not a persistent search */ 2981 if ( !so ) { 2982 /* Make sure entry is less than the snapshot'd contextCSN */ 2983 for ( i=0; i<ss->ss_numcsns; i++ ) { 2984 if ( sid == ss->ss_sids[i] && ber_bvcmp( &a->a_nvals[0], 2985 &ss->ss_ctxcsn[i] ) > 0 ) { 2986 Debug( LDAP_DEBUG_SYNC, "%s syncprov_search_response: " 2987 "Entry %s CSN %s greater than snapshot %s\n", 2988 op->o_log_prefix, 2989 rs->sr_entry->e_name.bv_val, 2990 a->a_nvals[0].bv_val, 2991 ss->ss_ctxcsn[i].bv_val ); 2992 return LDAP_SUCCESS; 2993 } 2994 } 2995 } 2996 2997 /* Don't send old entries twice */ 2998 if ( srs->sr_state.ctxcsn ) { 2999 for ( i=0; i<srs->sr_state.numcsns; i++ ) { 3000 if ( sid == srs->sr_state.sids[i] && 3001 ber_bvcmp( &a->a_nvals[0], 3002 &srs->sr_state.ctxcsn[i] )<= 0 ) { 3003 Debug( LDAP_DEBUG_SYNC, "%s syncprov_search_response: " 3004 "Entry %s CSN %s older or equal to ctx %s\n", 3005 op->o_log_prefix, 3006 rs->sr_entry->e_name.bv_val, 3007 a->a_nvals[0].bv_val, 3008 srs->sr_state.ctxcsn[i].bv_val ); 3009 return LDAP_SUCCESS; 3010 } 3011 } 3012 } 3013 } 3014 rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2, 3015 op->o_tmpmemctx ); 3016 rs->sr_ctrls[1] = NULL; 3017 rs->sr_flags |= REP_CTRLS_MUSTBEFREED; 3018 /* If we're in delta-sync mode, always send a cookie */ 3019 if ( si->si_nopres && si->si_usehint && a ) { 3020 struct berval cookie; 3021 slap_compose_sync_cookie( op, &cookie, a->a_nvals, srs->sr_state.rid, 3022 slap_serverID ? slap_serverID : -1, NULL ); 3023 rs->sr_err = syncprov_state_ctrl( op, rs, rs->sr_entry, 3024 LDAP_SYNC_ADD, rs->sr_ctrls, 0, 1, &cookie ); 3025 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); 3026 } else { 3027 rs->sr_err = syncprov_state_ctrl( op, rs, rs->sr_entry, 3028 LDAP_SYNC_ADD, rs->sr_ctrls, 0, 0, NULL ); 3029 } 3030 } else if ( rs->sr_type == REP_RESULT && rs->sr_err == LDAP_SUCCESS ) { 3031 struct berval cookie = BER_BVNULL; 3032 3033 if ( ( ss->ss_flags & SS_CHANGED ) && 3034 ss->ss_ctxcsn && !BER_BVISNULL( &ss->ss_ctxcsn[0] )) { 3035 slap_compose_sync_cookie( op, &cookie, ss->ss_ctxcsn, 3036 srs->sr_state.rid, 3037 slap_serverID ? slap_serverID : -1, NULL ); 3038 3039 Debug( LDAP_DEBUG_SYNC, "%s syncprov_search_response: cookie=%s\n", 3040 op->o_log_prefix, cookie.bv_val ); 3041 } 3042 3043 /* Is this a regular refresh? 3044 * Note: refresh never gets here if there were no changes 3045 */ 3046 if ( !so ) { 3047 rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2, 3048 op->o_tmpmemctx ); 3049 rs->sr_ctrls[1] = NULL; 3050 rs->sr_flags |= REP_CTRLS_MUSTBEFREED; 3051 rs->sr_err = syncprov_done_ctrl( op, rs, rs->sr_ctrls, 3052 0, 1, &cookie, ( ss->ss_flags & SS_PRESENT ) ? LDAP_SYNC_REFRESH_PRESENTS : 3053 LDAP_SYNC_REFRESH_DELETES ); 3054 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); 3055 } else { 3056 /* It's RefreshAndPersist, transition to Persist phase */ 3057 rs->sr_err = SLAPD_NO_REPLY; 3058 syncprov_sendinfo( op, rs, ( ss->ss_flags & SS_PRESENT ) ? 3059 LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE, 3060 ( ss->ss_flags & SS_CHANGED ) ? &cookie : NULL, 3061 1, NULL, 0 ); 3062 if ( !BER_BVISNULL( &cookie )) 3063 op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); 3064 3065 /* Detach this Op from frontend control */ 3066 ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex ); 3067 3068 /* But not if this connection was closed along the way */ 3069 if ( op->o_abandon ) { 3070 ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex ); 3071 /* syncprov_ab_cleanup will free this syncop */ 3072 return SLAPD_ABANDON; 3073 3074 } else { 3075 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 3076 /* Turn off the refreshing flag */ 3077 so->s_flags ^= PS_IS_REFRESHING; 3078 3079 Debug( LDAP_DEBUG_SYNC, "%s syncprov_search_response: " 3080 "detaching op\n", op->o_log_prefix ); 3081 syncprov_detach_op( op, so, on ); 3082 3083 ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex ); 3084 3085 /* If there are queued responses, fire them off */ 3086 if ( so->s_res ) 3087 syncprov_qstart( so ); 3088 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 3089 return rs->sr_err; 3090 } 3091 } 3092 } 3093 3094 return SLAP_CB_CONTINUE; 3095 } 3096 3097 static int 3098 syncprov_search_cb( Operation *op, SlapReply *rs ) 3099 { 3100 /* 3101 * Prevent the glue overlay from processing subordinates when it is 3102 * configured (explicitly or implicitly) below the syncprov overlay. 3103 */ 3104 if ( rs->sr_type == REP_RESULT ) 3105 op->o_no_subordinate_glue = 1; 3106 return SLAP_CB_CONTINUE; 3107 } 3108 3109 static int 3110 syncprov_search_cleanup( Operation *op, SlapReply *rs ) 3111 { 3112 if ( rs->sr_type == REP_RESULT || rs->sr_type == REP_INTERMEDIATE || 3113 rs->sr_err == SLAPD_ABANDON || op->o_abandon ) { 3114 searchstate *ss = op->o_callback->sc_private; 3115 if ( ss && ss->ss_numcsns ) { 3116 ber_bvarray_free_x( ss->ss_ctxcsn, op->o_tmpmemctx ); 3117 op->o_tmpfree( ss->ss_sids, op->o_tmpmemctx ); 3118 } 3119 slap_freeself_cb( op, rs ); 3120 } 3121 return SLAP_CB_CONTINUE; 3122 } 3123 3124 static int 3125 syncprov_op_search( Operation *op, SlapReply *rs ) 3126 { 3127 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 3128 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 3129 slap_callback *cb; 3130 int gotstate = 0, changed = 0, do_present = 0; 3131 syncops *sop = NULL; 3132 searchstate *ss; 3133 sync_control *srs; 3134 BerVarray ctxcsn; 3135 int i, *sids, numcsns; 3136 struct berval mincsn, maxcsn; 3137 int minsid, maxsid; 3138 int dirty = 0; 3139 3140 if ( op->o_sync > SLAP_CONTROL_IGNORED ) { 3141 cb = op->o_tmpcalloc( 1, sizeof(slap_callback), op->o_tmpmemctx ); 3142 cb->sc_response = syncprov_search_cb; 3143 cb->sc_cleanup = syncprov_search_cleanup; 3144 cb->sc_next = op->o_callback; 3145 op->o_callback = cb; 3146 } 3147 3148 if ( !(op->o_sync_mode & SLAP_SYNC_REFRESH) ) return SLAP_CB_CONTINUE; 3149 3150 if ( op->ors_deref & LDAP_DEREF_SEARCHING ) { 3151 send_ldap_error( op, rs, LDAP_PROTOCOL_ERROR, "illegal value for derefAliases" ); 3152 return rs->sr_err; 3153 } 3154 3155 srs = op->o_controls[slap_cids.sc_LDAPsync]; 3156 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: " 3157 "got a %ssearch with a cookie=%s\n", 3158 op->o_log_prefix, 3159 op->o_sync_mode & SLAP_SYNC_PERSIST ? "persistent ": "", 3160 srs->sr_state.octet_str.bv_val ); 3161 3162 /* If this is a persistent search, set it up right away */ 3163 if ( op->o_sync_mode & SLAP_SYNC_PERSIST ) { 3164 syncops so = {0}; 3165 fbase_cookie fc; 3166 opcookie opc; 3167 slap_callback sc = {0}; 3168 3169 fc.fss = &so; 3170 fc.fbase = 0; 3171 so.s_eid = NOID; 3172 so.s_op = op; 3173 so.s_flags = PS_IS_REFRESHING | PS_FIND_BASE; 3174 /* syncprov_findbase expects to be called as a callback... */ 3175 sc.sc_private = &opc; 3176 opc.son = on; 3177 ldap_pvt_thread_mutex_init( &so.s_mutex ); 3178 cb = op->o_callback; 3179 op->o_callback = ≻ 3180 rs->sr_err = syncprov_findbase( op, &fc ); 3181 op->o_callback = cb; 3182 ldap_pvt_thread_mutex_destroy( &so.s_mutex ); 3183 3184 /* Special case, if client knows nothing, nor do we, keep going */ 3185 if ( srs->sr_state.numcsns == 0 && rs->sr_err == LDAP_NO_SUCH_OBJECT ) { 3186 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: " 3187 "both our DB and client empty, ignoring NO_SUCH_OBJECT\n", 3188 op->o_log_prefix ); 3189 rs->sr_err = LDAP_SUCCESS; 3190 } 3191 3192 if ( rs->sr_err != LDAP_SUCCESS ) { 3193 send_ldap_result( op, rs ); 3194 return rs->sr_err; 3195 } 3196 sop = ch_malloc( sizeof( syncops )); 3197 *sop = so; 3198 sop->s_rid = srs->sr_state.rid; 3199 sop->s_sid = srs->sr_state.sid; 3200 /* set refcount=2 to prevent being freed out from under us 3201 * by abandons that occur while we're running here 3202 */ 3203 sop->s_inuse = 2; 3204 3205 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 3206 while ( si->si_active ) { 3207 /* Wait for active mods to finish before proceeding, as they 3208 * may already have inspected the si_ops list looking for 3209 * consumers to replicate the change to. Using the log 3210 * doesn't help, as we may finish playing it before the 3211 * active mods gets added to it. 3212 */ 3213 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 3214 if ( slapd_shutdown ) { 3215 aband: 3216 ch_free( sop->s_base.bv_val ); 3217 ch_free( sop ); 3218 return SLAPD_ABANDON; 3219 } 3220 if ( !ldap_pvt_thread_pool_pausewait( &connection_pool )) 3221 ldap_pvt_thread_yield(); 3222 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 3223 } 3224 if ( op->o_abandon ) { 3225 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 3226 goto aband; 3227 } 3228 ldap_pvt_thread_mutex_init( &sop->s_mutex ); 3229 sop->s_next = si->si_ops; 3230 sop->s_si = si; 3231 si->si_ops = sop; 3232 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 3233 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: " 3234 "registered persistent search\n", op->o_log_prefix ); 3235 } 3236 3237 /* snapshot the ctxcsn 3238 * Note: this must not be done before the psearch setup. (ITS#8365) 3239 */ 3240 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); 3241 numcsns = si->si_numcsns; 3242 if ( numcsns ) { 3243 ber_bvarray_dup_x( &ctxcsn, si->si_ctxcsn, op->o_tmpmemctx ); 3244 sids = op->o_tmpalloc( numcsns * sizeof(int), op->o_tmpmemctx ); 3245 for ( i=0; i<numcsns; i++ ) 3246 sids[i] = si->si_sids[i]; 3247 } else { 3248 ctxcsn = NULL; 3249 sids = NULL; 3250 } 3251 dirty = si->si_dirty; 3252 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); 3253 3254 /* If we have a cookie, handle the PRESENT lookups */ 3255 if ( srs->sr_state.ctxcsn ) { 3256 sessionlog *sl; 3257 int i, j; 3258 3259 /* If we don't have any CSN of our own yet, bail out. 3260 */ 3261 if ( !numcsns ) { 3262 rs->sr_err = LDAP_UNWILLING_TO_PERFORM; 3263 rs->sr_text = "consumer has state info but provider doesn't!"; 3264 goto bailout; 3265 } 3266 3267 if ( !si->si_nopres ) 3268 do_present = SS_PRESENT; 3269 3270 /* If there are SIDs we don't recognize in the cookie, drop them */ 3271 for (i=0; i<srs->sr_state.numcsns; ) { 3272 for (j=i; j<numcsns; j++) { 3273 if ( srs->sr_state.sids[i] <= sids[j] ) { 3274 break; 3275 } 3276 } 3277 /* not found */ 3278 if ( j == numcsns || srs->sr_state.sids[i] != sids[j] ) { 3279 char *tmp = srs->sr_state.ctxcsn[i].bv_val; 3280 srs->sr_state.numcsns--; 3281 for ( j=i; j<srs->sr_state.numcsns; j++ ) { 3282 srs->sr_state.ctxcsn[j] = srs->sr_state.ctxcsn[j+1]; 3283 srs->sr_state.sids[j] = srs->sr_state.sids[j+1]; 3284 } 3285 srs->sr_state.ctxcsn[j].bv_val = tmp; 3286 srs->sr_state.ctxcsn[j].bv_len = 0; 3287 continue; 3288 } 3289 i++; 3290 } 3291 3292 if (srs->sr_state.numcsns != numcsns) { 3293 /* consumer doesn't have the right number of CSNs */ 3294 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: " 3295 "consumer cookie is missing a csn we track\n", 3296 op->o_log_prefix ); 3297 3298 changed = SS_CHANGED; 3299 if ( srs->sr_state.ctxcsn ) { 3300 ber_bvarray_free_x( srs->sr_state.ctxcsn, op->o_tmpmemctx ); 3301 srs->sr_state.ctxcsn = NULL; 3302 } 3303 if ( srs->sr_state.sids ) { 3304 slap_sl_free( srs->sr_state.sids, op->o_tmpmemctx ); 3305 srs->sr_state.sids = NULL; 3306 } 3307 srs->sr_state.numcsns = 0; 3308 goto shortcut; 3309 } 3310 3311 /* Find the smallest CSN which differs from contextCSN */ 3312 mincsn.bv_len = 0; 3313 maxcsn.bv_len = 0; 3314 for ( i=0,j=0; i<srs->sr_state.numcsns; i++ ) { 3315 int newer; 3316 while ( srs->sr_state.sids[i] != sids[j] ) j++; 3317 if ( BER_BVISEMPTY( &maxcsn ) || ber_bvcmp( &maxcsn, 3318 &srs->sr_state.ctxcsn[i] ) < 0 ) { 3319 maxcsn = srs->sr_state.ctxcsn[i]; 3320 maxsid = sids[j]; 3321 } 3322 newer = ber_bvcmp( &srs->sr_state.ctxcsn[i], &ctxcsn[j] ); 3323 /* If our state is newer, tell consumer about changes */ 3324 if ( newer < 0) { 3325 changed = SS_CHANGED; 3326 if ( BER_BVISEMPTY( &mincsn ) || ber_bvcmp( &mincsn, 3327 &srs->sr_state.ctxcsn[i] ) > 0 ) { 3328 mincsn = srs->sr_state.ctxcsn[i]; 3329 minsid = sids[j]; 3330 } 3331 } else if ( newer > 0 && sids[j] == slap_serverID ) { 3332 /* our state is older, complain to consumer */ 3333 rs->sr_err = LDAP_UNWILLING_TO_PERFORM; 3334 rs->sr_text = "consumer state is newer than provider!"; 3335 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: " 3336 "consumer %d state %s is newer than provider %d state %s\n", 3337 op->o_log_prefix, sids[i], srs->sr_state.ctxcsn[i].bv_val, 3338 sids[j], /* == slap_serverID */ 3339 ctxcsn[j].bv_val); 3340 bailout: 3341 if ( sop ) { 3342 syncops **sp = &si->si_ops; 3343 3344 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 3345 while ( *sp != sop ) 3346 sp = &(*sp)->s_next; 3347 *sp = sop->s_next; 3348 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 3349 ch_free( sop->s_base.bv_val ); 3350 ch_free( sop ); 3351 } 3352 rs->sr_ctrls = NULL; 3353 send_ldap_result( op, rs ); 3354 if ( numcsns ) { 3355 ber_bvarray_free_x( ctxcsn, op->o_tmpmemctx ); 3356 op->o_tmpfree( sids, op->o_tmpmemctx ); 3357 } 3358 return rs->sr_err; 3359 } 3360 } 3361 if ( BER_BVISEMPTY( &mincsn )) { 3362 mincsn = maxcsn; 3363 minsid = maxsid; 3364 } 3365 3366 /* If nothing has changed, shortcut it */ 3367 if ( !changed && !dirty ) { 3368 do_present = 0; 3369 no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { 3370 LDAPControl *ctrls[2]; 3371 3372 ctrls[0] = NULL; 3373 ctrls[1] = NULL; 3374 syncprov_done_ctrl( op, rs, ctrls, 0, 0, 3375 NULL, LDAP_SYNC_REFRESH_DELETES ); 3376 rs->sr_ctrls = ctrls; 3377 rs->sr_err = LDAP_SUCCESS; 3378 send_ldap_result( op, rs ); 3379 rs->sr_ctrls = NULL; 3380 if ( numcsns ) { 3381 ber_bvarray_free_x( ctxcsn, op->o_tmpmemctx ); 3382 op->o_tmpfree( sids, op->o_tmpmemctx ); 3383 } 3384 return rs->sr_err; 3385 } 3386 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: " 3387 "no change, skipping log replay\n", 3388 op->o_log_prefix ); 3389 goto shortcut; 3390 } 3391 3392 if ( !BER_BVISNULL( &si->si_logbase ) ) { 3393 do_present = 0; 3394 if ( syncprov_play_accesslog( op, rs, srs, ctxcsn, 3395 numcsns, sids, &mincsn, minsid ) ) { 3396 do_present = SS_PRESENT; 3397 } 3398 } else if ( si->si_logs ) { 3399 do_present = 0; 3400 if ( syncprov_play_sessionlog( op, rs, srs, ctxcsn, 3401 numcsns, sids, &mincsn, minsid ) ) { 3402 do_present = SS_PRESENT; 3403 } 3404 } else if ( ad_minCSN != NULL && si->si_nopres && si->si_usehint ) { 3405 /* We are instructed to trust minCSN if it exists. */ 3406 Entry *e; 3407 Attribute *a = NULL; 3408 int rc; 3409 3410 /* 3411 * ITS#9580 FIXME: when we've figured out and split the 3412 * sessionlog/deltalog tracking, use the appropriate attribute 3413 */ 3414 rc = overlay_entry_get_ov( op, &op->o_bd->be_nsuffix[0], NULL, 3415 ad_minCSN, 0, &e, on ); 3416 if ( rc == LDAP_SUCCESS && e != NULL ) { 3417 a = attr_find( e->e_attrs, ad_minCSN ); 3418 } 3419 3420 if ( a != NULL ) { 3421 int *minsids; 3422 3423 minsids = slap_parse_csn_sids( a->a_vals, a->a_numvals, op->o_tmpmemctx ); 3424 slap_sort_csn_sids( a->a_vals, minsids, a->a_numvals, op->o_tmpmemctx ); 3425 3426 for ( i=0, j=0; i < a->a_numvals; i++ ) { 3427 while ( j < numcsns && minsids[i] > sids[j] ) j++; 3428 if ( j < numcsns && minsids[i] == sids[j] && 3429 ber_bvcmp( &a->a_vals[i], &srs->sr_state.ctxcsn[j] ) <= 0 ) { 3430 /* minCSN for this serverID is contained, keep going */ 3431 continue; 3432 } 3433 /* 3434 * Log DB's minCSN claims we can only replay from a certain 3435 * CSN for this serverID, but consumer's cookie hasn't met that 3436 * threshold: they need to refresh 3437 */ 3438 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: " 3439 "consumer not within recorded mincsn for DB's mincsn=%s\n", 3440 op->o_log_prefix, a->a_vals[i].bv_val ); 3441 rs->sr_err = LDAP_SYNC_REFRESH_REQUIRED; 3442 rs->sr_text = "sync cookie is stale"; 3443 slap_sl_free( minsids, op->o_tmpmemctx ); 3444 overlay_entry_release_ov( op, e, 0, on ); 3445 goto bailout; 3446 } 3447 slap_sl_free( minsids, op->o_tmpmemctx ); 3448 } 3449 if ( e != NULL ) 3450 overlay_entry_release_ov( op, e, 0, on ); 3451 } 3452 3453 /* 3454 * If sessionlog wasn't useful, see if we can find at least one entry 3455 * that hasn't changed based on the cookie. 3456 * 3457 * TODO: Using mincsn only (rather than the whole cookie) will 3458 * under-approximate the set of entries that haven't changed, but we 3459 * can't look up CSNs by serverid with the current indexing support. 3460 * 3461 * As a result, dormant serverids in the cluster become mincsns and 3462 * more likely to make syncprov_findcsn(,FIND_CSN,) fail -> triggering 3463 * an expensive refresh... 3464 */ 3465 if ( !do_present ) { 3466 gotstate = 1; 3467 } else if ( syncprov_findcsn( op, FIND_CSN, &mincsn ) != LDAP_SUCCESS ) { 3468 /* No, so a reload is required */ 3469 /* the 2.2 consumer doesn't send this hint */ 3470 if ( si->si_usehint && srs->sr_rhint == 0 ) { 3471 rs->sr_err = LDAP_SYNC_REFRESH_REQUIRED; 3472 rs->sr_text = "sync cookie is stale"; 3473 goto bailout; 3474 } 3475 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: " 3476 "failed to find entry with csn=%s, ignoring cookie\n", 3477 op->o_log_prefix, mincsn.bv_val ); 3478 if ( srs->sr_state.ctxcsn ) { 3479 ber_bvarray_free_x( srs->sr_state.ctxcsn, op->o_tmpmemctx ); 3480 srs->sr_state.ctxcsn = NULL; 3481 } 3482 if ( srs->sr_state.sids ) { 3483 slap_sl_free( srs->sr_state.sids, op->o_tmpmemctx ); 3484 srs->sr_state.sids = NULL; 3485 } 3486 srs->sr_state.numcsns = 0; 3487 } else { 3488 gotstate = 1; 3489 /* If changed and doing Present lookup, send Present UUIDs */ 3490 if ( syncprov_findcsn( op, FIND_PRESENT, 0 ) != LDAP_SUCCESS ) 3491 goto bailout; 3492 } 3493 } else { 3494 /* The consumer knows nothing, we know nothing. OK. */ 3495 if (!numcsns) 3496 goto no_change; 3497 /* No consumer state, assume something has changed */ 3498 changed = SS_CHANGED; 3499 } 3500 3501 shortcut: 3502 /* Append CSN range to search filter, save original filter 3503 * for persistent search evaluation 3504 */ 3505 if ( sop ) { 3506 ldap_pvt_thread_mutex_lock( &sop->s_mutex ); 3507 sop->s_filterstr = op->ors_filterstr; 3508 /* correct the refcount that was set to 2 before */ 3509 sop->s_inuse--; 3510 } 3511 3512 /* If something changed, find the changes */ 3513 if ( gotstate && ( changed || dirty ) ) { 3514 Filter *fand, *fava; 3515 3516 fand = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx ); 3517 fand->f_choice = LDAP_FILTER_AND; 3518 fand->f_next = NULL; 3519 fava = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx ); 3520 fand->f_and = fava; 3521 fava->f_choice = LDAP_FILTER_GE; 3522 fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx ); 3523 fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN; 3524 #ifdef LDAP_COMP_MATCH 3525 fava->f_ava->aa_cf = NULL; 3526 #endif 3527 ber_dupbv_x( &fava->f_ava->aa_value, &mincsn, op->o_tmpmemctx ); 3528 fava->f_next = op->ors_filter; 3529 op->ors_filter = fand; 3530 filter2bv_x( op, op->ors_filter, &op->ors_filterstr ); 3531 if ( sop ) { 3532 sop->s_flags |= PS_FIX_FILTER; 3533 } 3534 } 3535 if ( sop ) { 3536 ldap_pvt_thread_mutex_unlock( &sop->s_mutex ); 3537 } 3538 3539 /* Let our callback add needed info to returned entries */ 3540 cb = op->o_tmpcalloc(1, sizeof(slap_callback)+sizeof(searchstate), op->o_tmpmemctx); 3541 ss = (searchstate *)(cb+1); 3542 ss->ss_on = on; 3543 ss->ss_so = sop; 3544 ss->ss_flags = do_present | changed; 3545 ss->ss_ctxcsn = ctxcsn; 3546 ss->ss_numcsns = numcsns; 3547 ss->ss_sids = sids; 3548 cb->sc_response = syncprov_search_response; 3549 cb->sc_cleanup = syncprov_search_cleanup; 3550 cb->sc_private = ss; 3551 cb->sc_next = op->o_callback; 3552 op->o_callback = cb; 3553 3554 /* If this is a persistent search and no changes were reported during 3555 * the refresh phase, just invoke the response callback to transition 3556 * us into persist phase 3557 */ 3558 if ( !changed && !dirty ) { 3559 Debug( LDAP_DEBUG_SYNC, "%s syncprov_op_search: " 3560 "nothing changed, finishing up initial search early\n", 3561 op->o_log_prefix ); 3562 rs->sr_err = LDAP_SUCCESS; 3563 rs->sr_nentries = 0; 3564 send_ldap_result( op, rs ); 3565 return rs->sr_err; 3566 } 3567 return SLAP_CB_CONTINUE; 3568 } 3569 3570 static int 3571 syncprov_operational( 3572 Operation *op, 3573 SlapReply *rs ) 3574 { 3575 slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; 3576 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 3577 3578 /* This prevents generating unnecessarily; frontend will strip 3579 * any statically stored copy. 3580 */ 3581 if ( op->o_sync != SLAP_CONTROL_NONE ) 3582 return SLAP_CB_CONTINUE; 3583 3584 if ( rs->sr_entry && 3585 dn_match( &rs->sr_entry->e_nname, &si->si_contextdn )) { 3586 3587 if ( SLAP_OPATTRS( rs->sr_attr_flags ) || 3588 ad_inlist( slap_schema.si_ad_contextCSN, rs->sr_attrs )) { 3589 Attribute *a, **ap = NULL; 3590 3591 for ( a=rs->sr_entry->e_attrs; a; a=a->a_next ) { 3592 if ( a->a_desc == slap_schema.si_ad_contextCSN ) 3593 break; 3594 } 3595 3596 ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); 3597 if ( si->si_ctxcsn ) { 3598 if ( !a ) { 3599 for ( ap = &rs->sr_operational_attrs; *ap; 3600 ap=&(*ap)->a_next ); 3601 3602 a = attr_alloc( slap_schema.si_ad_contextCSN ); 3603 *ap = a; 3604 } 3605 3606 if ( !ap ) { 3607 if ( rs_entry2modifiable( op, rs, on )) { 3608 a = attr_find( rs->sr_entry->e_attrs, 3609 slap_schema.si_ad_contextCSN ); 3610 } 3611 if ( a->a_nvals != a->a_vals ) { 3612 ber_bvarray_free( a->a_nvals ); 3613 } 3614 a->a_nvals = NULL; 3615 ber_bvarray_free( a->a_vals ); 3616 a->a_vals = NULL; 3617 a->a_numvals = 0; 3618 } 3619 attr_valadd( a, si->si_ctxcsn, si->si_ctxcsn, si->si_numcsns ); 3620 } 3621 ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); 3622 } 3623 } 3624 return SLAP_CB_CONTINUE; 3625 } 3626 3627 static int 3628 syncprov_setup_accesslog(void) 3629 { 3630 const char *text; 3631 int rc = -1; 3632 3633 if ( !ad_reqType ) { 3634 if ( slap_str2ad( "reqType", &ad_reqType, &text ) ) { 3635 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: " 3636 "couldn't get definition for attribute reqType, " 3637 "is accesslog configured?\n" ); 3638 return rc; 3639 } 3640 } 3641 3642 if ( !ad_reqResult ) { 3643 if ( slap_str2ad( "reqResult", &ad_reqResult, &text ) ) { 3644 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: " 3645 "couldn't get definition for attribute reqResult, " 3646 "is accesslog configured?\n" ); 3647 return rc; 3648 } 3649 } 3650 3651 if ( !ad_reqDN ) { 3652 if ( slap_str2ad( "reqDN", &ad_reqDN, &text ) ) { 3653 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: " 3654 "couldn't get definition for attribute reqDN, " 3655 "is accesslog configured?\n" ); 3656 return rc; 3657 } 3658 } 3659 3660 if ( !ad_reqEntryUUID ) { 3661 if ( slap_str2ad( "reqEntryUUID", &ad_reqEntryUUID, &text ) ) { 3662 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: " 3663 "couldn't get definition for attribute reqEntryUUID, " 3664 "is accesslog configured?\n" ); 3665 return rc; 3666 } 3667 } 3668 3669 if ( !ad_reqNewDN ) { 3670 if ( slap_str2ad( "reqNewDN", &ad_reqNewDN, &text ) ) { 3671 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: " 3672 "couldn't get definition for attribute reqNewDN, " 3673 "is accessslog configured?\n" ); 3674 return rc; 3675 } 3676 } 3677 3678 if ( !ad_minCSN ) { 3679 if ( slap_str2ad( "minCSN", &ad_minCSN, &text ) ) { 3680 Debug( LDAP_DEBUG_ANY, "syncprov_setup_accesslog: " 3681 "couldn't get definition for attribute minCSN, " 3682 "is accessslog configured?\n" ); 3683 return rc; 3684 } 3685 } 3686 3687 return LDAP_SUCCESS; 3688 } 3689 3690 enum { 3691 SP_CHKPT = 1, 3692 SP_SESSL, 3693 SP_NOPRES, 3694 SP_USEHINT, 3695 SP_LOGDB 3696 }; 3697 3698 static ConfigDriver sp_cf_gen; 3699 3700 static ConfigTable spcfg[] = { 3701 { "syncprov-checkpoint", "ops> <minutes", 3, 3, 0, ARG_MAGIC|SP_CHKPT, 3702 sp_cf_gen, "( OLcfgOvAt:1.1 NAME 'olcSpCheckpoint' " 3703 "DESC 'ContextCSN checkpoint interval in ops and minutes' " 3704 "EQUALITY caseIgnoreMatch " 3705 "SYNTAX OMsDirectoryString SINGLE-VALUE )", NULL, NULL }, 3706 { "syncprov-sessionlog", "ops", 2, 2, 0, ARG_INT|ARG_MAGIC|SP_SESSL, 3707 sp_cf_gen, "( OLcfgOvAt:1.2 NAME 'olcSpSessionlog' " 3708 "DESC 'Session log size in ops' " 3709 "EQUALITY integerMatch " 3710 "SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL }, 3711 { "syncprov-nopresent", NULL, 2, 2, 0, ARG_ON_OFF|ARG_MAGIC|SP_NOPRES, 3712 sp_cf_gen, "( OLcfgOvAt:1.3 NAME 'olcSpNoPresent' " 3713 "DESC 'Omit Present phase processing' " 3714 "EQUALITY booleanMatch " 3715 "SYNTAX OMsBoolean SINGLE-VALUE )", NULL, NULL }, 3716 { "syncprov-reloadhint", NULL, 2, 2, 0, ARG_ON_OFF|ARG_MAGIC|SP_USEHINT, 3717 sp_cf_gen, "( OLcfgOvAt:1.4 NAME 'olcSpReloadHint' " 3718 "DESC 'Observe Reload Hint in Request control' " 3719 "EQUALITY booleanMatch " 3720 "SYNTAX OMsBoolean SINGLE-VALUE )", NULL, NULL }, 3721 { "syncprov-sessionlog-source", NULL, 2, 2, 0, ARG_DN|ARG_QUOTE|ARG_MAGIC|SP_LOGDB, 3722 sp_cf_gen, "( OLcfgOvAt:1.5 NAME 'olcSpSessionlogSource' " 3723 "DESC 'On startup, try loading sessionlog from this subtree' " 3724 "SYNTAX OMsDN SINGLE-VALUE )", NULL, NULL }, 3725 { NULL, NULL, 0, 0, 0, ARG_IGNORED } 3726 }; 3727 3728 static ConfigOCs spocs[] = { 3729 { "( OLcfgOvOc:1.1 " 3730 "NAME 'olcSyncProvConfig' " 3731 "DESC 'SyncRepl Provider configuration' " 3732 "SUP olcOverlayConfig " 3733 "MAY ( olcSpCheckpoint " 3734 "$ olcSpSessionlog " 3735 "$ olcSpNoPresent " 3736 "$ olcSpReloadHint " 3737 "$ olcSpSessionlogSource " 3738 ") )", 3739 Cft_Overlay, spcfg }, 3740 { NULL, 0, NULL } 3741 }; 3742 3743 static int 3744 sp_cf_gen(ConfigArgs *c) 3745 { 3746 slap_overinst *on = (slap_overinst *)c->bi; 3747 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 3748 int rc = 0; 3749 3750 if ( c->op == SLAP_CONFIG_EMIT ) { 3751 switch ( c->type ) { 3752 case SP_CHKPT: 3753 if ( si->si_chkops || si->si_chktime ) { 3754 struct berval bv; 3755 /* we assume si_chktime is a multiple of 60 3756 * because the parsed value was originally 3757 * multiplied by 60 */ 3758 bv.bv_len = snprintf( c->cr_msg, sizeof( c->cr_msg ), 3759 "%d %d", si->si_chkops, si->si_chktime/60 ); 3760 if ( bv.bv_len >= sizeof( c->cr_msg ) ) { 3761 rc = 1; 3762 } else { 3763 bv.bv_val = c->cr_msg; 3764 value_add_one( &c->rvalue_vals, &bv ); 3765 } 3766 } else { 3767 rc = 1; 3768 } 3769 break; 3770 case SP_SESSL: 3771 if ( si->si_logs ) { 3772 c->value_int = si->si_logs->sl_size; 3773 } else { 3774 rc = 1; 3775 } 3776 break; 3777 case SP_NOPRES: 3778 if ( si->si_nopres ) { 3779 c->value_int = 1; 3780 } else { 3781 rc = 1; 3782 } 3783 break; 3784 case SP_USEHINT: 3785 if ( si->si_usehint ) { 3786 c->value_int = 1; 3787 } else { 3788 rc = 1; 3789 } 3790 break; 3791 case SP_LOGDB: 3792 if ( BER_BVISEMPTY( &si->si_logbase ) ) { 3793 rc = 1; 3794 } else { 3795 value_add_one( &c->rvalue_vals, &si->si_logbase ); 3796 value_add_one( &c->rvalue_nvals, &si->si_logbase ); 3797 } 3798 break; 3799 } 3800 return rc; 3801 } else if ( c->op == LDAP_MOD_DELETE ) { 3802 switch ( c->type ) { 3803 case SP_CHKPT: 3804 si->si_chkops = 0; 3805 si->si_chktime = 0; 3806 break; 3807 case SP_SESSL: 3808 if ( si->si_logs ) 3809 si->si_logs->sl_size = 0; 3810 break; 3811 case SP_NOPRES: 3812 si->si_nopres = 0; 3813 break; 3814 case SP_USEHINT: 3815 si->si_usehint = 0; 3816 break; 3817 case SP_LOGDB: 3818 if ( !BER_BVISNULL( &si->si_logbase ) ) { 3819 ch_free( si->si_logbase.bv_val ); 3820 BER_BVZERO( &si->si_logbase ); 3821 } 3822 break; 3823 } 3824 return rc; 3825 } 3826 switch ( c->type ) { 3827 case SP_CHKPT: 3828 if ( lutil_atoi( &si->si_chkops, c->argv[1] ) != 0 ) { 3829 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s unable to parse checkpoint ops # \"%s\"", 3830 c->argv[0], c->argv[1] ); 3831 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 3832 "%s: %s\n", c->log, c->cr_msg ); 3833 return ARG_BAD_CONF; 3834 } 3835 if ( si->si_chkops <= 0 ) { 3836 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s invalid checkpoint ops # \"%d\"", 3837 c->argv[0], si->si_chkops ); 3838 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 3839 "%s: %s\n", c->log, c->cr_msg ); 3840 return ARG_BAD_CONF; 3841 } 3842 if ( lutil_atoi( &si->si_chktime, c->argv[2] ) != 0 ) { 3843 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s unable to parse checkpoint time \"%s\"", 3844 c->argv[0], c->argv[1] ); 3845 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 3846 "%s: %s\n", c->log, c->cr_msg ); 3847 return ARG_BAD_CONF; 3848 } 3849 if ( si->si_chktime <= 0 ) { 3850 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s invalid checkpoint time \"%d\"", 3851 c->argv[0], si->si_chkops ); 3852 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 3853 "%s: %s\n", c->log, c->cr_msg ); 3854 return ARG_BAD_CONF; 3855 } 3856 si->si_chktime *= 60; 3857 break; 3858 case SP_SESSL: { 3859 sessionlog *sl; 3860 int size = c->value_int; 3861 3862 if ( size < 0 ) { 3863 snprintf( c->cr_msg, sizeof( c->cr_msg ), "%s size %d is negative", 3864 c->argv[0], size ); 3865 Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE, 3866 "%s: %s\n", c->log, c->cr_msg ); 3867 return ARG_BAD_CONF; 3868 } 3869 if ( size && !BER_BVISNULL( &si->si_logbase ) ) { 3870 Debug( LDAP_DEBUG_ANY, "syncprov_config: while configuring " 3871 "internal sessionlog, accesslog source has already been " 3872 "configured, this results in wasteful operation\n" ); 3873 } 3874 sl = si->si_logs; 3875 if ( !sl ) { 3876 if ( !size ) break; 3877 sl = ch_calloc( 1, sizeof( sessionlog )); 3878 ldap_pvt_thread_rdwr_init( &sl->sl_mutex ); 3879 si->si_logs = sl; 3880 } 3881 sl->sl_size = size; 3882 } 3883 break; 3884 case SP_NOPRES: 3885 si->si_nopres = c->value_int; 3886 if ( si->si_nopres ) { 3887 /* Consider we might be a delta provider, but it's ok if not */ 3888 (void)syncprov_setup_accesslog(); 3889 } 3890 break; 3891 case SP_USEHINT: 3892 si->si_usehint = c->value_int; 3893 break; 3894 case SP_LOGDB: 3895 if ( si->si_logs ) { 3896 Debug( LDAP_DEBUG_ANY, "syncprov_config: while configuring " 3897 "accesslog source, internal sessionlog has already been " 3898 "configured, this results in wasteful operation\n" ); 3899 } 3900 if ( CONFIG_ONLINE_ADD( c ) ) { 3901 if ( !select_backend( &c->value_ndn, 0 ) ) { 3902 snprintf( c->cr_msg, sizeof( c->cr_msg ), 3903 "<%s> no matching backend found for suffix", 3904 c->argv[0] ); 3905 Debug( LDAP_DEBUG_ANY, "%s: %s \"%s\"\n", 3906 c->log, c->cr_msg, c->value_dn.bv_val ); 3907 rc = 1; 3908 break; 3909 } 3910 ch_free( c->value_ndn.bv_val ); 3911 } 3912 si->si_logbase = c->value_ndn; 3913 rc = syncprov_setup_accesslog(); 3914 ch_free( c->value_dn.bv_val ); 3915 break; 3916 } 3917 return rc; 3918 } 3919 3920 /* ITS#3456 we cannot run this search on the main thread, must use a 3921 * child thread in order to insure we have a big enough stack. 3922 */ 3923 static void * 3924 syncprov_db_otask( 3925 void *ptr 3926 ) 3927 { 3928 syncprov_findcsn( ptr, FIND_MAXCSN, 0 ); 3929 return NULL; 3930 } 3931 3932 static int 3933 syncprov_db_ocallback( 3934 Operation *op, 3935 SlapReply *rs 3936 ) 3937 { 3938 if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) { 3939 if ( rs->sr_entry->e_name.bv_len ) 3940 op->o_callback->sc_private = (void *)1; 3941 } 3942 return LDAP_SUCCESS; 3943 } 3944 3945 /* ITS#9015 see if the DB is really empty */ 3946 static void * 3947 syncprov_db_otask2( 3948 void *ptr 3949 ) 3950 { 3951 Operation *op = ptr; 3952 SlapReply rs = {REP_RESULT}; 3953 slap_callback cb = {0}; 3954 int rc; 3955 3956 cb.sc_response = syncprov_db_ocallback; 3957 3958 op->o_managedsait = SLAP_CONTROL_CRITICAL; 3959 op->o_callback = &cb; 3960 op->o_tag = LDAP_REQ_SEARCH; 3961 op->ors_scope = LDAP_SCOPE_SUBTREE; 3962 op->ors_limit = NULL; 3963 op->ors_slimit = 1; 3964 op->ors_tlimit = SLAP_NO_LIMIT; 3965 op->ors_attrs = slap_anlist_no_attrs; 3966 op->ors_attrsonly = 1; 3967 op->ors_deref = LDAP_DEREF_NEVER; 3968 op->ors_filter = &generic_filter; 3969 op->ors_filterstr = generic_filterstr; 3970 rc = op->o_bd->be_search( op, &rs ); 3971 if ( rc == LDAP_SIZELIMIT_EXCEEDED || cb.sc_private ) 3972 op->ors_slimit = 2; 3973 return NULL; 3974 } 3975 3976 /* Read any existing contextCSN from the underlying db. 3977 * Then search for any entries newer than that. If no value exists, 3978 * just generate it. Cache whatever result. 3979 */ 3980 static int 3981 syncprov_db_open( 3982 BackendDB *be, 3983 ConfigReply *cr 3984 ) 3985 { 3986 slap_overinst *on = (slap_overinst *) be->bd_info; 3987 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 3988 3989 Connection conn = { 0 }; 3990 OperationBuffer opbuf; 3991 Operation *op; 3992 Entry *e = NULL; 3993 Attribute *a; 3994 int rc; 3995 void *thrctx = NULL; 3996 3997 if ( !SLAP_LASTMOD( be )) { 3998 Debug( LDAP_DEBUG_ANY, 3999 "syncprov_db_open: invalid config, lastmod must be enabled\n" ); 4000 return -1; 4001 } 4002 4003 if ( slapMode & SLAP_TOOL_MODE ) { 4004 return 0; 4005 } 4006 4007 rc = overlay_register_control( be, LDAP_CONTROL_SYNC ); 4008 if ( rc ) { 4009 return rc; 4010 } 4011 4012 Debug( LDAP_DEBUG_SYNC, "syncprov_db_open: " 4013 "starting syncprov for suffix %s\n", 4014 be->be_suffix[0].bv_val ); 4015 4016 thrctx = ldap_pvt_thread_pool_context(); 4017 connection_fake_init2( &conn, &opbuf, thrctx, 0 ); 4018 op = &opbuf.ob_op; 4019 op->o_bd = be; 4020 op->o_dn = be->be_rootdn; 4021 op->o_ndn = be->be_rootndn; 4022 4023 if ( SLAP_SYNC_SUBENTRY( be )) { 4024 build_new_dn( &si->si_contextdn, be->be_nsuffix, 4025 (struct berval *)&slap_ldapsync_cn_bv, NULL ); 4026 } else { 4027 si->si_contextdn = be->be_nsuffix[0]; 4028 } 4029 rc = overlay_entry_get_ov( op, &si->si_contextdn, NULL, 4030 slap_schema.si_ad_contextCSN, 0, &e, on ); 4031 4032 if ( e ) { 4033 ldap_pvt_thread_t tid; 4034 4035 a = attr_find( e->e_attrs, slap_schema.si_ad_contextCSN ); 4036 if ( a ) { 4037 ber_bvarray_dup_x( &si->si_ctxcsn, a->a_vals, NULL ); 4038 si->si_numcsns = a->a_numvals; 4039 si->si_sids = slap_parse_csn_sids( si->si_ctxcsn, a->a_numvals, NULL ); 4040 slap_sort_csn_sids( si->si_ctxcsn, si->si_sids, si->si_numcsns, NULL ); 4041 } 4042 overlay_entry_release_ov( op, e, 0, on ); 4043 if ( si->si_ctxcsn && !SLAP_DBCLEAN( be )) { 4044 op->o_tag = LDAP_REQ_SEARCH; 4045 op->o_req_dn = be->be_suffix[0]; 4046 op->o_req_ndn = be->be_nsuffix[0]; 4047 op->ors_scope = LDAP_SCOPE_SUBTREE; 4048 ldap_pvt_thread_create( &tid, 0, syncprov_db_otask, op ); 4049 ldap_pvt_thread_join( tid, NULL ); 4050 } 4051 } 4052 4053 /* Didn't find a contextCSN, should we generate one? */ 4054 if ( !si->si_ctxcsn ) { 4055 char csnbuf[ LDAP_PVT_CSNSTR_BUFSIZE ]; 4056 struct berval csn; 4057 4058 if ( SLAP_SINGLE_SHADOW( op->o_bd ) ) { 4059 /* Not in charge of this serverID, don't generate anything. */ 4060 goto out; 4061 } 4062 if ( !SLAP_SYNC_SUBENTRY( be ) && rc != LDAP_SUCCESS 4063 && rc != LDAP_NO_SUCH_ATTRIBUTE ) { 4064 /* If the DB is genuinely empty, don't generate one either. */ 4065 goto out; 4066 } 4067 if ( !si->si_contextdn.bv_len ) { 4068 ldap_pvt_thread_t tid; 4069 /* a glue entry here with no contextCSN might mean an empty DB. 4070 * we need to search for children, to be sure. 4071 */ 4072 op->o_req_dn = be->be_suffix[0]; 4073 op->o_req_ndn = be->be_nsuffix[0]; 4074 op->o_bd->bd_info = (BackendInfo *)on->on_info; 4075 ldap_pvt_thread_create( &tid, 0, syncprov_db_otask2, op ); 4076 ldap_pvt_thread_join( tid, NULL ); 4077 if ( op->ors_slimit == 1 ) 4078 goto out; 4079 } 4080 4081 csn.bv_val = csnbuf; 4082 csn.bv_len = sizeof( csnbuf ); 4083 slap_get_csn( op, &csn, 0 ); 4084 value_add_one( &si->si_ctxcsn, &csn ); 4085 si->si_numcsns = 1; 4086 si->si_sids = ch_malloc( sizeof(int) ); 4087 si->si_sids[0] = slap_serverID; 4088 Debug( LDAP_DEBUG_SYNC, "syncprov_db_open: " 4089 "generated a new ctxcsn=%s for suffix %s\n", 4090 csn.bv_val, be->be_suffix[0].bv_val ); 4091 4092 /* make sure we do a checkpoint on close */ 4093 si->si_numops++; 4094 } 4095 4096 /* Initialize the sessionlog mincsn */ 4097 if ( si->si_logs && si->si_numcsns ) { 4098 sessionlog *sl = si->si_logs; 4099 int i; 4100 ber_bvarray_dup_x( &sl->sl_mincsn, si->si_ctxcsn, NULL ); 4101 sl->sl_numcsns = si->si_numcsns; 4102 sl->sl_sids = ch_malloc( si->si_numcsns * sizeof(int) ); 4103 for ( i=0; i < si->si_numcsns; i++ ) 4104 sl->sl_sids[i] = si->si_sids[i]; 4105 } 4106 4107 if ( !BER_BVISNULL( &si->si_logbase ) ) { 4108 BackendDB *db = select_backend( &si->si_logbase, 0 ); 4109 if ( !db ) { 4110 Debug( LDAP_DEBUG_ANY, "syncprov_db_open: " 4111 "configured accesslog database dn='%s' not present\n", 4112 si->si_logbase.bv_val ); 4113 return -1; 4114 } 4115 } 4116 4117 out: 4118 op->o_bd->bd_info = (BackendInfo *)on; 4119 return 0; 4120 } 4121 4122 /* Write the current contextCSN into the underlying db. 4123 */ 4124 static int 4125 syncprov_db_close( 4126 BackendDB *be, 4127 ConfigReply *cr 4128 ) 4129 { 4130 slap_overinst *on = (slap_overinst *) be->bd_info; 4131 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 4132 #ifdef SLAP_CONFIG_DELETE 4133 syncops *so, *sonext; 4134 #endif /* SLAP_CONFIG_DELETE */ 4135 4136 if ( slapMode & SLAP_TOOL_MODE ) { 4137 return 0; 4138 } 4139 if ( si->si_numops ) { 4140 Connection conn = {0}; 4141 OperationBuffer opbuf; 4142 Operation *op; 4143 void *thrctx; 4144 4145 thrctx = ldap_pvt_thread_pool_context(); 4146 connection_fake_init2( &conn, &opbuf, thrctx, 0 ); 4147 op = &opbuf.ob_op; 4148 op->o_bd = be; 4149 op->o_dn = be->be_rootdn; 4150 op->o_ndn = be->be_rootndn; 4151 syncprov_checkpoint( op, on ); 4152 } 4153 4154 #ifdef SLAP_CONFIG_DELETE 4155 if ( !slapd_shutdown ) { 4156 ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); 4157 for ( so=si->si_ops, sonext=so; so; so=sonext ) { 4158 SlapReply rs = {REP_RESULT}; 4159 rs.sr_err = LDAP_UNAVAILABLE; 4160 ldap_pvt_thread_mutex_lock( &so->s_mutex ); 4161 send_ldap_result( so->s_op, &rs ); 4162 sonext=so->s_next; 4163 if ( so->s_flags & PS_TASK_QUEUED ) 4164 ldap_pvt_thread_pool_retract( so->s_pool_cookie ); 4165 ldap_pvt_thread_mutex_unlock( &so->s_mutex ); 4166 if ( !syncprov_drop_psearch( so, 0 )) 4167 so->s_si = NULL; 4168 } 4169 si->si_ops=NULL; 4170 ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); 4171 } 4172 overlay_unregister_control( be, LDAP_CONTROL_SYNC ); 4173 #endif /* SLAP_CONFIG_DELETE */ 4174 4175 return 0; 4176 } 4177 4178 static int 4179 syncprov_db_init( 4180 BackendDB *be, 4181 ConfigReply *cr 4182 ) 4183 { 4184 slap_overinst *on = (slap_overinst *)be->bd_info; 4185 syncprov_info_t *si; 4186 4187 if ( SLAP_ISGLOBALOVERLAY( be ) ) { 4188 Debug( LDAP_DEBUG_ANY, 4189 "syncprov must be instantiated within a database.\n" ); 4190 return 1; 4191 } 4192 4193 si = ch_calloc(1, sizeof(syncprov_info_t)); 4194 on->on_bi.bi_private = si; 4195 ldap_pvt_thread_rdwr_init( &si->si_csn_rwlock ); 4196 ldap_pvt_thread_mutex_init( &si->si_ops_mutex ); 4197 ldap_pvt_thread_mutex_init( &si->si_mods_mutex ); 4198 ldap_pvt_thread_mutex_init( &si->si_resp_mutex ); 4199 4200 csn_anlist[0].an_desc = slap_schema.si_ad_entryCSN; 4201 csn_anlist[0].an_name = slap_schema.si_ad_entryCSN->ad_cname; 4202 csn_anlist[1].an_desc = slap_schema.si_ad_entryUUID; 4203 csn_anlist[1].an_name = slap_schema.si_ad_entryUUID->ad_cname; 4204 4205 uuid_anlist[0].an_desc = slap_schema.si_ad_entryUUID; 4206 uuid_anlist[0].an_name = slap_schema.si_ad_entryUUID->ad_cname; 4207 4208 return 0; 4209 } 4210 4211 static int 4212 syncprov_db_destroy( 4213 BackendDB *be, 4214 ConfigReply *cr 4215 ) 4216 { 4217 slap_overinst *on = (slap_overinst *)be->bd_info; 4218 syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; 4219 4220 if ( si ) { 4221 if ( si->si_logs ) { 4222 sessionlog *sl = si->si_logs; 4223 4224 ldap_tavl_free( sl->sl_entries, (AVL_FREE)ch_free ); 4225 if ( sl->sl_mincsn ) 4226 ber_bvarray_free( sl->sl_mincsn ); 4227 if ( sl->sl_sids ) 4228 ch_free( sl->sl_sids ); 4229 4230 ldap_pvt_thread_rdwr_destroy(&si->si_logs->sl_mutex); 4231 ch_free( si->si_logs ); 4232 } 4233 if ( si->si_ctxcsn ) 4234 ber_bvarray_free( si->si_ctxcsn ); 4235 if ( si->si_sids ) 4236 ch_free( si->si_sids ); 4237 if ( si->si_logbase.bv_val ) 4238 ch_free( si->si_logbase.bv_val ); 4239 ldap_pvt_thread_mutex_destroy( &si->si_resp_mutex ); 4240 ldap_pvt_thread_mutex_destroy( &si->si_mods_mutex ); 4241 ldap_pvt_thread_mutex_destroy( &si->si_ops_mutex ); 4242 ldap_pvt_thread_rdwr_destroy( &si->si_csn_rwlock ); 4243 ch_free( si ); 4244 } 4245 4246 return 0; 4247 } 4248 4249 static int syncprov_parseCtrl ( 4250 Operation *op, 4251 SlapReply *rs, 4252 LDAPControl *ctrl ) 4253 { 4254 ber_tag_t tag; 4255 BerElementBuffer berbuf; 4256 BerElement *ber = (BerElement *)&berbuf; 4257 ber_int_t mode; 4258 ber_len_t len; 4259 struct berval cookie = BER_BVNULL; 4260 sync_control *sr; 4261 int rhint = 0; 4262 4263 if ( op->o_sync != SLAP_CONTROL_NONE ) { 4264 rs->sr_text = "Sync control specified multiple times"; 4265 return LDAP_PROTOCOL_ERROR; 4266 } 4267 4268 if ( op->o_pagedresults != SLAP_CONTROL_NONE ) { 4269 rs->sr_text = "Sync control specified with pagedResults control"; 4270 return LDAP_PROTOCOL_ERROR; 4271 } 4272 4273 if ( BER_BVISNULL( &ctrl->ldctl_value ) ) { 4274 rs->sr_text = "Sync control value is absent"; 4275 return LDAP_PROTOCOL_ERROR; 4276 } 4277 4278 if ( BER_BVISEMPTY( &ctrl->ldctl_value ) ) { 4279 rs->sr_text = "Sync control value is empty"; 4280 return LDAP_PROTOCOL_ERROR; 4281 } 4282 4283 /* Parse the control value 4284 * syncRequestValue ::= SEQUENCE { 4285 * mode ENUMERATED { 4286 * -- 0 unused 4287 * refreshOnly (1), 4288 * -- 2 reserved 4289 * refreshAndPersist (3) 4290 * }, 4291 * cookie syncCookie OPTIONAL 4292 * } 4293 */ 4294 4295 ber_init2( ber, &ctrl->ldctl_value, 0 ); 4296 4297 if ( (tag = ber_scanf( ber, "{i" /*}*/, &mode )) == LBER_ERROR ) { 4298 rs->sr_text = "Sync control : mode decoding error"; 4299 return LDAP_PROTOCOL_ERROR; 4300 } 4301 4302 switch( mode ) { 4303 case LDAP_SYNC_REFRESH_ONLY: 4304 mode = SLAP_SYNC_REFRESH; 4305 break; 4306 case LDAP_SYNC_REFRESH_AND_PERSIST: 4307 mode = SLAP_SYNC_REFRESH_AND_PERSIST; 4308 break; 4309 default: 4310 rs->sr_text = "Sync control : unknown update mode"; 4311 return LDAP_PROTOCOL_ERROR; 4312 } 4313 4314 tag = ber_peek_tag( ber, &len ); 4315 4316 if ( tag == LDAP_TAG_SYNC_COOKIE ) { 4317 if (( ber_scanf( ber, /*{*/ "m", &cookie )) == LBER_ERROR ) { 4318 rs->sr_text = "Sync control : cookie decoding error"; 4319 return LDAP_PROTOCOL_ERROR; 4320 } 4321 tag = ber_peek_tag( ber, &len ); 4322 } 4323 if ( tag == LDAP_TAG_RELOAD_HINT ) { 4324 if (( ber_scanf( ber, /*{*/ "b", &rhint )) == LBER_ERROR ) { 4325 rs->sr_text = "Sync control : rhint decoding error"; 4326 return LDAP_PROTOCOL_ERROR; 4327 } 4328 } 4329 if (( ber_scanf( ber, /*{*/ "}")) == LBER_ERROR ) { 4330 rs->sr_text = "Sync control : decoding error"; 4331 return LDAP_PROTOCOL_ERROR; 4332 } 4333 sr = op->o_tmpcalloc( 1, sizeof(struct sync_control), op->o_tmpmemctx ); 4334 sr->sr_rhint = rhint; 4335 if (!BER_BVISNULL(&cookie)) { 4336 ber_dupbv_x( &sr->sr_state.octet_str, &cookie, op->o_tmpmemctx ); 4337 /* If parse fails, pretend no cookie was sent */ 4338 if ( slap_parse_sync_cookie( &sr->sr_state, op->o_tmpmemctx ) || 4339 sr->sr_state.rid == -1 ) { 4340 if ( sr->sr_state.ctxcsn ) { 4341 ber_bvarray_free_x( sr->sr_state.ctxcsn, op->o_tmpmemctx ); 4342 sr->sr_state.ctxcsn = NULL; 4343 } 4344 sr->sr_state.numcsns = 0; 4345 } 4346 } 4347 4348 op->o_controls[slap_cids.sc_LDAPsync] = sr; 4349 4350 op->o_sync = ctrl->ldctl_iscritical 4351 ? SLAP_CONTROL_CRITICAL 4352 : SLAP_CONTROL_NONCRITICAL; 4353 4354 op->o_sync_mode |= mode; /* o_sync_mode shares o_sync */ 4355 4356 return LDAP_SUCCESS; 4357 } 4358 4359 /* This overlay is set up for dynamic loading via moduleload. For static 4360 * configuration, you'll need to arrange for the slap_overinst to be 4361 * initialized and registered by some other function inside slapd. 4362 */ 4363 4364 static slap_overinst syncprov; 4365 4366 int 4367 syncprov_initialize() 4368 { 4369 int rc; 4370 4371 rc = register_supported_control( LDAP_CONTROL_SYNC, 4372 SLAP_CTRL_SEARCH, NULL, 4373 syncprov_parseCtrl, &slap_cids.sc_LDAPsync ); 4374 if ( rc != LDAP_SUCCESS ) { 4375 Debug( LDAP_DEBUG_ANY, 4376 "syncprov_init: Failed to register control %d\n", rc ); 4377 return rc; 4378 } 4379 4380 syncprov.on_bi.bi_type = "syncprov"; 4381 syncprov.on_bi.bi_flags = SLAPO_BFLAG_SINGLE; 4382 syncprov.on_bi.bi_db_init = syncprov_db_init; 4383 syncprov.on_bi.bi_db_destroy = syncprov_db_destroy; 4384 syncprov.on_bi.bi_db_open = syncprov_db_open; 4385 syncprov.on_bi.bi_db_close = syncprov_db_close; 4386 4387 syncprov.on_bi.bi_op_abandon = syncprov_op_abandon; 4388 syncprov.on_bi.bi_op_cancel = syncprov_op_abandon; 4389 4390 syncprov.on_bi.bi_op_add = syncprov_op_mod; 4391 syncprov.on_bi.bi_op_compare = syncprov_op_compare; 4392 syncprov.on_bi.bi_op_delete = syncprov_op_mod; 4393 syncprov.on_bi.bi_op_modify = syncprov_op_mod; 4394 syncprov.on_bi.bi_op_modrdn = syncprov_op_mod; 4395 syncprov.on_bi.bi_op_search = syncprov_op_search; 4396 syncprov.on_bi.bi_extended = syncprov_op_extended; 4397 syncprov.on_bi.bi_operational = syncprov_operational; 4398 4399 syncprov.on_bi.bi_cf_ocs = spocs; 4400 4401 generic_filter.f_desc = slap_schema.si_ad_objectClass; 4402 4403 rc = config_register_schema( spcfg, spocs ); 4404 if ( rc ) return rc; 4405 4406 return overlay_register( &syncprov ); 4407 } 4408 4409 #if SLAPD_OVER_SYNCPROV == SLAPD_MOD_DYNAMIC 4410 int 4411 init_module( int argc, char *argv[] ) 4412 { 4413 return syncprov_initialize(); 4414 } 4415 #endif /* SLAPD_OVER_SYNCPROV == SLAPD_MOD_DYNAMIC */ 4416 4417 #endif /* defined(SLAPD_OVER_SYNCPROV) */ 4418