Home | History | Annotate | Line # | Download | only in liblmdb
mplay.c revision 1.1
      1 /*	$NetBSD: mplay.c,v 1.1 2025/09/05 21:09:33 christos Exp $	*/
      2 
      3 /* mplay.c - memory-mapped database log replay */
      4 /*
      5  * Copyright 2011-2023 Howard Chu, Symas Corp.
      6  * All rights reserved.
      7  *
      8  * Redistribution and use in source and binary forms, with or without
      9  * modification, are permitted only as authorized by the OpenLDAP
     10  * Public License.
     11  *
     12  * A copy of this license is available in the file LICENSE in the
     13  * top-level directory of the distribution or, alternatively, at
     14  * <http://www.OpenLDAP.org/license.html>.
     15  */
     16 #include <stdio.h>
     17 #include <stdlib.h>
     18 #include <unistd.h>
     19 #include <time.h>
     20 #include <string.h>
     21 #include <ctype.h>
     22 #include <assert.h>
     23 #include <sys/types.h>
     24 #include <sys/wait.h>
     25 
     26 #include "lmdb.h"
     27 
     28 #define E(expr) CHECK((rc = (expr)) == MDB_SUCCESS, #expr)
     29 #define RES(err, expr) ((rc = expr) == (err) || (CHECK(!rc, #expr), 0))
     30 #define CHECK(test, msg) ((test) ? (void)0 : ((void)fprintf(stderr, \
     31 	"%s:%d: %s: %s\n", __FILE__, __LINE__, msg, mdb_strerror(rc)), abort()))
     32 
     33 #define MDB_SCNy(t)	"z" #t
     34 
     35 #define SCMP(s)	s, (sizeof(s)-1)
     36 char inbuf[8192];
     37 char *dbuf, *kbuf;
     38 size_t dbufsize;
     39 int maxkey;
     40 
     41 #define SOFF(s)	(sizeof(s)+1)
     42 
     43 #define MAXENVS	16
     44 #define MAXTXNS	16
     45 #define MAXCRSS	16
     46 
     47 #define MAXPIDS	16
     48 
     49 typedef struct crspair {
     50 	void *tcrs;	/* scanned text pointer */
     51 	MDB_cursor *rcrs;
     52 } crspair;
     53 
     54 typedef struct txnpair {
     55 	void *ttxn;	/* scanned text pointer */
     56 	MDB_txn *rtxn;
     57 	crspair cursors[MAXCRSS];
     58 	int ncursors;
     59 } txnpair;
     60 
     61 typedef struct envpair {
     62 	void *tenv;
     63 	MDB_env *renv;
     64 	txnpair txns[MAXTXNS];
     65 	int ntxns;
     66 } envpair;
     67 
     68 envpair envs[MAXENVS];
     69 int nenvs;
     70 
     71 envpair *lastenv;
     72 txnpair *lasttxn;
     73 crspair *lastcrs;
     74 
     75 typedef struct pidpair {
     76 	int tpid;
     77 	pid_t rpid;
     78 	int fdout, fdin;
     79 } pidpair;
     80 
     81 pidpair *lastpid;
     82 
     83 pidpair pids[MAXPIDS];
     84 int npids;
     85 
     86 unsigned long lcount;
     87 
     88 static int unhex(unsigned char *c2)
     89 {
     90 	int x, c;
     91 	x = *c2++ & 0x4f;
     92 	if (x & 0x40)
     93 		x -= 55;
     94 	c = x << 4;
     95 	x = *c2 & 0x4f;
     96 	if (x & 0x40)
     97 		x -= 55;
     98 	c |= x;
     99 	return c;
    100 }
    101 
    102 int inhex(char *in, char *out)
    103 {
    104 	char *c2 = out;
    105 	while (isxdigit(*in)) {
    106 		*c2++ = unhex((unsigned char *)in);
    107 		in += 2;
    108 	}
    109 	return c2 - out;
    110 }
    111 
    112 static void addenv(void *tenv, MDB_env *renv)
    113 {
    114 	assert(nenvs < MAXENVS);
    115 	envs[nenvs].tenv = tenv;
    116 	envs[nenvs].renv = renv;
    117 	envs[nenvs].ntxns = 0;
    118 	lastenv = envs+nenvs;
    119 	nenvs++;
    120 }
    121 
    122 static envpair *findenv(void *tenv)
    123 {
    124 	int i;
    125 	if (!lastenv || lastenv->tenv != tenv) {
    126 		for (i=0; i<nenvs; i++)
    127 			if (envs[i].tenv == tenv)
    128 				break;
    129 		assert(i < nenvs);
    130 		lastenv = &envs[i];
    131 	}
    132 	return lastenv;
    133 }
    134 
    135 static void delenv(envpair *ep)
    136 {
    137 	int i = ep - envs;
    138 	for (; i<nenvs-1; i++)
    139 		envs[i] = envs[i+1];
    140 	nenvs--;
    141 	lastenv = NULL;
    142 }
    143 
    144 static void addtxn(void *tenv, void *ttxn, MDB_txn *rtxn)
    145 {
    146 	envpair *ep;
    147 	txnpair *tp;
    148 
    149 	ep = findenv(tenv);
    150 	assert(ep->ntxns < MAXTXNS);
    151 	tp = ep->txns+ep->ntxns;
    152 	tp->ttxn = ttxn;
    153 	tp->rtxn = rtxn;
    154 	tp->ncursors = 0;
    155 	ep->ntxns++;
    156 	lasttxn = tp;
    157 }
    158 
    159 static txnpair *findtxn(void *ttxn)
    160 {
    161 	int i, j;
    162 	if (lasttxn && lasttxn->ttxn == ttxn)
    163 		return lasttxn;
    164 	if (lastenv) {
    165 		for (i=0; i<lastenv->ntxns; i++) {
    166 			if (lastenv->txns[i].ttxn == ttxn) {
    167 				lasttxn = lastenv->txns+i;
    168 				return lasttxn;
    169 			}
    170 		}
    171 	}
    172 	for (i=0; i<nenvs; i++) {
    173 		if (envs+i == lastenv) continue;
    174 		for (j=0; j<envs[i].ntxns; j++) {
    175 			if (envs[i].txns[j].ttxn == ttxn) {
    176 				lastenv = envs+i;
    177 				lasttxn = envs[i].txns+j;
    178 				return lasttxn;
    179 			}
    180 		}
    181 	}
    182 	assert(0);	/* should have found it */
    183 }
    184 
    185 static void deltxn(txnpair *tp)
    186 {
    187 	int i = tp - lastenv->txns;
    188 	for (; i<lastenv->ntxns-1; i++)
    189 		lastenv->txns[i] = lastenv->txns[i+1];
    190 	lastenv->ntxns--;
    191 	lasttxn = NULL;
    192 }
    193 
    194 static void addcrs(txnpair *tp, void *tcrs, MDB_cursor *rcrs)
    195 {
    196 	int j = tp->ncursors;
    197 	assert(tp->ncursors < MAXCRSS);
    198 
    199 	tp->cursors[j].tcrs = tcrs;
    200 	tp->cursors[j].rcrs = rcrs;
    201 	tp->ncursors++;
    202 	lastcrs = tp->cursors+j;
    203 }
    204 
    205 static crspair *findcrs(void *tcrs)
    206 {
    207 	int i, j, k;
    208 	envpair *ep;
    209 	txnpair *tp;
    210 	crspair *cp;
    211 	if (lastcrs && lastcrs->tcrs == tcrs)
    212 		return lastcrs;
    213 	if (lasttxn) {
    214 		for (k=0, cp=lasttxn->cursors; k<lasttxn->ncursors; k++, cp++) {
    215 			if (cp->tcrs == tcrs) {
    216 				lastcrs = cp;
    217 				return lastcrs;
    218 			}
    219 		}
    220 	}
    221 	if (lastenv) {
    222 		for (j=0, tp=lastenv->txns; j<lastenv->ntxns; j++, tp++){
    223 			if (tp == lasttxn)
    224 				continue;
    225 			for (k=0, cp = tp->cursors; k<tp->ncursors; k++, cp++) {
    226 				if (cp->tcrs == tcrs) {
    227 					lastcrs = cp;
    228 					lasttxn = tp;
    229 					return lastcrs;
    230 				}
    231 			}
    232 		}
    233 	}
    234 	for (i=0, ep=envs; i<nenvs; i++, ep++) {
    235 		for (j=0, tp=ep->txns; j<ep->ntxns; j++, tp++) {
    236 			if (tp == lasttxn)
    237 				continue;
    238 			for (k=0, cp = tp->cursors; k<tp->ncursors; k++, cp++) {
    239 				if (cp->tcrs == tcrs) {
    240 					lastcrs = cp;
    241 					lasttxn = tp;
    242 					lastenv = ep;
    243 					return lastcrs;
    244 				}
    245 			}
    246 		}
    247 	}
    248 	assert(0);	/* should have found it already */
    249 }
    250 
    251 static void delcrs(void *tcrs)
    252 {
    253 	int i;
    254 	crspair *cp = findcrs(tcrs);
    255 	mdb_cursor_close(cp->rcrs);
    256 	for (i = cp - lasttxn->cursors; i<lasttxn->ncursors-1; i++)
    257 		lasttxn->cursors[i] = lasttxn->cursors[i+1];
    258 	lasttxn->ncursors--;
    259 	lastcrs = NULL;
    260 }
    261 
    262 void child()
    263 {
    264 	int rc;
    265 	MDB_val key, data;
    266 	char *ptr;
    267 
    268 	while (fgets(inbuf, sizeof(inbuf), stdin)) {
    269 		ptr = inbuf;
    270 		if (!strncmp(ptr, SCMP("exit")))
    271 			break;
    272 
    273 		if (!strncmp(ptr, SCMP("mdb_env_create"))) {
    274 			void *tenv;
    275 			MDB_env *renv;
    276 			sscanf(ptr+SOFF("mdb_env_create"), "%p", &tenv);
    277 			E(mdb_env_create(&renv));
    278 			addenv(tenv, renv);
    279 		} else if (!strncmp(ptr, SCMP("mdb_env_set_maxdbs"))) {
    280 			void *tenv;
    281 			envpair *ep;
    282 			unsigned int maxdbs;
    283 			sscanf(ptr+SOFF("mdb_env_set_maxdbs"), "%p, %u", &tenv, &maxdbs);
    284 			ep = findenv(tenv);
    285 			E(mdb_env_set_maxdbs(ep->renv, maxdbs));
    286 		} else if (!strncmp(ptr, SCMP("mdb_env_set_mapsize"))) {
    287 			void *tenv;
    288 			envpair *ep;
    289 			size_t mapsize;
    290 			sscanf(ptr+SOFF("mdb_env_set_mapsize"), "%p, %"MDB_SCNy(u), &tenv, &mapsize);
    291 			ep = findenv(tenv);
    292 			E(mdb_env_set_mapsize(ep->renv, mapsize));
    293 		} else if (!strncmp(ptr, SCMP("mdb_env_open"))) {
    294 			void *tenv;
    295 			envpair *ep;
    296 			char *path;
    297 			int len;
    298 			unsigned int flags, mode;
    299 			sscanf(ptr+SOFF("mdb_env_open"), "%p, %n", &tenv, &len);
    300 			path = ptr+SOFF("mdb_env_open")+len;
    301 			ptr = strchr(path, ',');
    302 			*ptr++ = '\0';
    303 			sscanf(ptr, "%u, %o", &flags, &mode);
    304 			ep = findenv(tenv);
    305 			E(mdb_env_open(ep->renv, path, flags, mode));
    306 			if (!maxkey) {
    307 				maxkey = mdb_env_get_maxkeysize(ep->renv);
    308 				kbuf = malloc(maxkey+2);
    309 				dbuf = malloc(maxkey+2);
    310 				dbufsize = maxkey;
    311 			}
    312 		} else if (!strncmp(ptr, SCMP("mdb_env_close"))) {
    313 			void *tenv;
    314 			envpair *ep;
    315 			sscanf(ptr+SOFF("mdb_env_close"), "%p", &tenv);
    316 			ep = findenv(tenv);
    317 			mdb_env_close(ep->renv);
    318 			delenv(ep);
    319 			if (!nenvs)	/* if no other envs left, this process is done */
    320 				break;
    321 		} else if (!strncmp(ptr, SCMP("mdb_txn_begin"))) {
    322 			unsigned int flags;
    323 			void *tenv, *ttxn;
    324 			envpair *ep;
    325 			MDB_txn *rtxn;
    326 			sscanf(ptr+SOFF("mdb_txn_begin"), "%p, %*p, %u = %p", &tenv, &flags, &ttxn);
    327 			ep = findenv(tenv);
    328 			E(mdb_txn_begin(ep->renv, NULL, flags, &rtxn));
    329 			addtxn(tenv, ttxn, rtxn);
    330 		} else if (!strncmp(ptr, SCMP("mdb_txn_commit"))) {
    331 			void *ttxn;
    332 			txnpair *tp;
    333 			sscanf(ptr+SOFF("mdb_txn_commit"), "%p", &ttxn);
    334 			tp = findtxn(ttxn);
    335 			E(mdb_txn_commit(tp->rtxn));
    336 			deltxn(tp);
    337 		} else if (!strncmp(ptr, SCMP("mdb_txn_abort"))) {
    338 			void *ttxn;
    339 			txnpair *tp;
    340 			sscanf(ptr+SOFF("mdb_txn_abort"), "%p", &ttxn);
    341 			tp = findtxn(ttxn);
    342 			mdb_txn_abort(tp->rtxn);
    343 			deltxn(tp);
    344 		} else if (!strncmp(ptr, SCMP("mdb_dbi_open"))) {
    345 			void *ttxn;
    346 			txnpair *tp;
    347 			char *dbname;
    348 			unsigned int flags;
    349 			unsigned int tdbi;
    350 			MDB_dbi dbi;
    351 			sscanf(ptr+SOFF("mdb_dbi_open"), "%p, ", &ttxn);
    352 			dbname = strchr(ptr+SOFF("mdb_dbi_open"), ',');
    353 			dbname += 2;
    354 			ptr = strchr(dbname, ',');
    355 			*ptr++ = '\0';
    356 			if (!strcmp(dbname, "(null)"))
    357 				dbname = NULL;
    358 			sscanf(ptr, "%u = %u", &flags, &tdbi);
    359 			tp = findtxn(ttxn);
    360 			E(mdb_dbi_open(tp->rtxn, dbname, flags, &dbi));
    361 		} else if (!strncmp(ptr, SCMP("mdb_dbi_close"))) {
    362 			void *tenv;
    363 			envpair *ep;
    364 			unsigned int tdbi;
    365 			sscanf(ptr+SOFF("mdb_dbi_close"), "%p, %u", &tenv, &tdbi);
    366 			ep = findenv(tenv);
    367 			mdb_dbi_close(ep->renv, tdbi);
    368 		} else if (!strncmp(ptr, SCMP("mdb_cursor_open"))) {
    369 			void *ttxn, *tcrs;
    370 			txnpair *tp;
    371 			MDB_cursor *rcrs;
    372 			unsigned int tdbi;
    373 			sscanf(ptr+SOFF("mdb_cursor_open"), "%p, %u = %p", &ttxn, &tdbi, &tcrs);
    374 			tp = findtxn(ttxn);
    375 			E(mdb_cursor_open(tp->rtxn, tdbi, &rcrs));
    376 			addcrs(tp, tcrs, rcrs);
    377 		} else if (!strncmp(ptr, SCMP("mdb_cursor_close"))) {
    378 			void *tcrs;
    379 			sscanf(ptr+SOFF("mdb_cursor_close"), "%p", &tcrs);
    380 			delcrs(tcrs);
    381 		} else if (!strncmp(ptr, SCMP("mdb_cursor_put"))) {
    382 			void *tcrs;
    383 			crspair *cp;
    384 			unsigned int flags;
    385 			int len;
    386 			sscanf(ptr+SOFF("mdb_cursor_put"), "%p, ", &tcrs);
    387 			cp = findcrs(tcrs);
    388 			ptr = strchr(ptr+SOFF("mdb_cursor_put"), ',');
    389 			sscanf(ptr+1, "%"MDB_SCNy(u)",", &key.mv_size);
    390 			if (key.mv_size) {
    391 				ptr = strchr(ptr, '[');
    392 				inhex(ptr+1, kbuf);
    393 				key.mv_data = kbuf;
    394 				ptr += key.mv_size * 2 + 2;
    395 			}
    396 			ptr = strchr(ptr+1, ',');
    397 			sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len);
    398 			if (data.mv_size > dbufsize) {
    399 				dbuf = realloc(dbuf, data.mv_size+2);
    400 				assert(dbuf != NULL);
    401 				dbufsize = data.mv_size;
    402 			}
    403 			ptr += len+1;
    404 			if (*ptr == '[') {
    405 				inhex(ptr+1, dbuf);
    406 				data.mv_data = dbuf;
    407 				ptr += data.mv_size * 2 + 2;
    408 			} else {
    409 				sprintf(dbuf, "%09ld", (long)mdb_txn_id(lasttxn->rtxn));
    410 			}
    411 			sscanf(ptr+1, "%u", &flags);
    412 			E(mdb_cursor_put(cp->rcrs, &key, &data, flags));
    413 		} else if (!strncmp(ptr, SCMP("mdb_cursor_del"))) {
    414 			void *tcrs;
    415 			crspair *cp;
    416 			unsigned int flags;
    417 			sscanf(ptr+SOFF("mdb_cursor_del"), "%p, %u", &tcrs, &flags);
    418 			cp = findcrs(tcrs);
    419 			E(mdb_cursor_del(cp->rcrs, flags));
    420 		} else if (!strncmp(ptr, SCMP("mdb_put"))) {
    421 			void *ttxn;
    422 			txnpair *tp;
    423 			unsigned int tdbi, flags;
    424 			int len;
    425 			sscanf(ptr+SOFF("mdb_put"),"%p, %u, %"MDB_SCNy(u), &ttxn, &tdbi, &key.mv_size);
    426 			tp = findtxn(ttxn);
    427 			ptr = strchr(ptr+SOFF("mdb_put"), '[');
    428 			inhex(ptr+1, kbuf);
    429 			key.mv_data = kbuf;
    430 			ptr += key.mv_size * 2 + 2;
    431 			sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len);
    432 			if (data.mv_size > dbufsize) {
    433 				dbuf = realloc(dbuf, data.mv_size+2);
    434 				assert(dbuf != NULL);
    435 				dbufsize = data.mv_size;
    436 			}
    437 			ptr += len+1;
    438 			if (*ptr == '[') {
    439 				inhex(ptr+1, dbuf);
    440 				ptr += data.mv_size * 2 + 2;
    441 			} else {
    442 				sprintf(dbuf, "%09ld", (long)mdb_txn_id(tp->rtxn));
    443 			}
    444 			data.mv_data = dbuf;
    445 			sscanf(ptr+1, "%u", &flags);
    446 			RES(MDB_KEYEXIST,mdb_put(tp->rtxn, tdbi, &key, &data, flags));
    447 		} else if (!strncmp(ptr, SCMP("mdb_del"))) {
    448 			void *ttxn;
    449 			txnpair *tp;
    450 			unsigned int tdbi;
    451 			int len;
    452 			sscanf(ptr+SOFF("mdb_del"),"%p, %u, %"MDB_SCNy(u), &ttxn, &tdbi, &key.mv_size);
    453 			tp = findtxn(ttxn);
    454 			ptr = strchr(ptr+SOFF("mdb_del"), '[');
    455 			inhex(ptr+1, kbuf);
    456 			key.mv_data = kbuf;
    457 			ptr += key.mv_size * 2 + 2;
    458 			sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len);
    459 			if (data.mv_size > dbufsize) {
    460 				dbuf = realloc(dbuf, data.mv_size+2);
    461 				assert(dbuf != NULL);
    462 				dbufsize = data.mv_size;
    463 			}
    464 			ptr += len+1;
    465 			if (*ptr == '[') {
    466 				inhex(ptr+1, dbuf);
    467 			} else {
    468 				sprintf(dbuf, "%09ld", (long)mdb_txn_id(tp->rtxn));
    469 			}
    470 			data.mv_data = dbuf;
    471 			RES(MDB_NOTFOUND,mdb_del(tp->rtxn, tdbi, &key, &data));
    472 		}
    473 		write(1, "\n", 1);
    474 	}
    475 	exit(0);
    476 }
    477 
    478 static pidpair *addpid(int tpid)
    479 {
    480 	int fdout[2], fdin[2];
    481 	pid_t pid;
    482 	assert(npids < MAXPIDS);
    483 	pids[npids].tpid = tpid;
    484 	pipe(fdout);
    485 	pipe(fdin);
    486 	if ((pid = fork()) == 0) {
    487 		/* child */
    488 		fclose(stdin);
    489 		fclose(stdout);
    490 		dup2(fdout[0], 0);
    491 		dup2(fdin[1], 1);
    492 		stdin = fdopen(0, "r");
    493 		stdout = fdopen(1, "w");
    494 		child();
    495 		return 0;	/* NOTREACHED */
    496 	} else {
    497 		pids[npids].rpid = pid;
    498 		pids[npids].fdout = fdout[1];
    499 		pids[npids].fdin = fdin[0];
    500 		lastpid = pids+npids;
    501 		npids++;
    502 		return lastpid;
    503 	}
    504 }
    505 
    506 static pidpair *findpid(int tpid)
    507 {
    508 	int i;
    509 	if (!lastpid || lastpid->tpid != tpid) {
    510 		for (i=0; i<npids; i++)
    511 			if (pids[i].tpid == tpid)
    512 				break;
    513 		if (i == npids)
    514 			return NULL;
    515 		lastpid = &pids[i];
    516 	}
    517 	return lastpid;
    518 }
    519 
    520 volatile pid_t killpid;
    521 
    522 static void delpid(int tpid)
    523 {
    524 	pidpair *pp = findpid(tpid);
    525 	if (pp) {
    526 		pid_t kpid = pp->rpid;
    527 		killpid = kpid;
    528 		write(pp->fdout, "exit\n", sizeof("exit"));
    529 		while (killpid == kpid)
    530 			usleep(10000);
    531 	}
    532 }
    533 
    534 static void reaper(int sig)
    535 {
    536 	int status, i;
    537 	pid_t pid = waitpid(-1, &status, 0);
    538 	if (pid > 0) {
    539 		fprintf(stderr, "# %s %d\n", WIFEXITED(status) ? "exited" : "killed", pid);
    540 		for (i=0; i<npids; i++)
    541 			if (pids[i].rpid == pid)
    542 				break;
    543 		assert(i < npids);
    544 		close(pids[i].fdout);
    545 		close(pids[i].fdin);
    546 		for (;i<npids-1; i++)
    547 			pids[i] = pids[i+1];
    548 		npids--;
    549 		killpid = 0;
    550 	}
    551 }
    552 
    553 int main(int argc,char * argv[])
    554 {
    555 	signal(SIGCHLD, reaper);
    556 
    557 	while (fgets(inbuf, sizeof(inbuf), stdin)) {
    558 		pidpair *pp;
    559 		int tpid, len;
    560 		char c, *ptr;
    561 		lcount++;
    562 
    563 		if (inbuf[0] == '#' && !strncmp(inbuf+1, SCMP(" killed"))) {
    564 			sscanf(inbuf+SOFF("killed"),"%d", &tpid);
    565 			delpid(tpid);
    566 			continue;
    567 		}
    568 
    569 		if (inbuf[0] != '>')
    570 			continue;
    571 		ptr = inbuf+1;
    572 		sscanf(ptr, "%d:%n", &tpid, &len);
    573 		pp = findpid(tpid);
    574 		if (!pp)
    575 			pp = addpid(tpid);	/* new process */
    576 
    577 		ptr = inbuf+len+1;
    578 		len = strlen(ptr);
    579 		write(pp->fdout, ptr, len);	/* send command and wait for ack */
    580 		read(pp->fdin, &c, 1);
    581 	}
    582 	while (npids)
    583 		delpid(pids[0].tpid);
    584 }
    585