mplay.c revision 1.1 1 /* $NetBSD: mplay.c,v 1.1 2025/09/05 21:09:33 christos Exp $ */
2
3 /* mplay.c - memory-mapped database log replay */
4 /*
5 * Copyright 2011-2023 Howard Chu, Symas Corp.
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted only as authorized by the OpenLDAP
10 * Public License.
11 *
12 * A copy of this license is available in the file LICENSE in the
13 * top-level directory of the distribution or, alternatively, at
14 * <http://www.OpenLDAP.org/license.html>.
15 */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <string.h>
#include <ctype.h>
#include <assert.h>
#include <errno.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>

#include "lmdb.h"
27
28 #define E(expr) CHECK((rc = (expr)) == MDB_SUCCESS, #expr)
29 #define RES(err, expr) ((rc = expr) == (err) || (CHECK(!rc, #expr), 0))
30 #define CHECK(test, msg) ((test) ? (void)0 : ((void)fprintf(stderr, \
31 "%s:%d: %s: %s\n", __FILE__, __LINE__, msg, mdb_strerror(rc)), abort()))
32
33 #define MDB_SCNy(t) "z" #t
34
35 #define SCMP(s) s, (sizeof(s)-1)
36 char inbuf[8192];
37 char *dbuf, *kbuf;
38 size_t dbufsize;
39 int maxkey;
40
41 #define SOFF(s) (sizeof(s)+1)
42
43 #define MAXENVS 16
44 #define MAXTXNS 16
45 #define MAXCRSS 16
46
47 #define MAXPIDS 16
48
49 typedef struct crspair {
50 void *tcrs; /* scanned text pointer */
51 MDB_cursor *rcrs;
52 } crspair;
53
54 typedef struct txnpair {
55 void *ttxn; /* scanned text pointer */
56 MDB_txn *rtxn;
57 crspair cursors[MAXCRSS];
58 int ncursors;
59 } txnpair;
60
61 typedef struct envpair {
62 void *tenv;
63 MDB_env *renv;
64 txnpair txns[MAXTXNS];
65 int ntxns;
66 } envpair;
67
68 envpair envs[MAXENVS];
69 int nenvs;
70
71 envpair *lastenv;
72 txnpair *lasttxn;
73 crspair *lastcrs;
74
75 typedef struct pidpair {
76 int tpid;
77 pid_t rpid;
78 int fdout, fdin;
79 } pidpair;
80
81 pidpair *lastpid;
82
83 pidpair pids[MAXPIDS];
84 int npids;
85
86 unsigned long lcount;
87
88 static int unhex(unsigned char *c2)
89 {
90 int x, c;
91 x = *c2++ & 0x4f;
92 if (x & 0x40)
93 x -= 55;
94 c = x << 4;
95 x = *c2 & 0x4f;
96 if (x & 0x40)
97 x -= 55;
98 c |= x;
99 return c;
100 }
101
102 int inhex(char *in, char *out)
103 {
104 char *c2 = out;
105 while (isxdigit(*in)) {
106 *c2++ = unhex((unsigned char *)in);
107 in += 2;
108 }
109 return c2 - out;
110 }
111
112 static void addenv(void *tenv, MDB_env *renv)
113 {
114 assert(nenvs < MAXENVS);
115 envs[nenvs].tenv = tenv;
116 envs[nenvs].renv = renv;
117 envs[nenvs].ntxns = 0;
118 lastenv = envs+nenvs;
119 nenvs++;
120 }
121
122 static envpair *findenv(void *tenv)
123 {
124 int i;
125 if (!lastenv || lastenv->tenv != tenv) {
126 for (i=0; i<nenvs; i++)
127 if (envs[i].tenv == tenv)
128 break;
129 assert(i < nenvs);
130 lastenv = &envs[i];
131 }
132 return lastenv;
133 }
134
135 static void delenv(envpair *ep)
136 {
137 int i = ep - envs;
138 for (; i<nenvs-1; i++)
139 envs[i] = envs[i+1];
140 nenvs--;
141 lastenv = NULL;
142 }
143
144 static void addtxn(void *tenv, void *ttxn, MDB_txn *rtxn)
145 {
146 envpair *ep;
147 txnpair *tp;
148
149 ep = findenv(tenv);
150 assert(ep->ntxns < MAXTXNS);
151 tp = ep->txns+ep->ntxns;
152 tp->ttxn = ttxn;
153 tp->rtxn = rtxn;
154 tp->ncursors = 0;
155 ep->ntxns++;
156 lasttxn = tp;
157 }
158
159 static txnpair *findtxn(void *ttxn)
160 {
161 int i, j;
162 if (lasttxn && lasttxn->ttxn == ttxn)
163 return lasttxn;
164 if (lastenv) {
165 for (i=0; i<lastenv->ntxns; i++) {
166 if (lastenv->txns[i].ttxn == ttxn) {
167 lasttxn = lastenv->txns+i;
168 return lasttxn;
169 }
170 }
171 }
172 for (i=0; i<nenvs; i++) {
173 if (envs+i == lastenv) continue;
174 for (j=0; j<envs[i].ntxns; j++) {
175 if (envs[i].txns[j].ttxn == ttxn) {
176 lastenv = envs+i;
177 lasttxn = envs[i].txns+j;
178 return lasttxn;
179 }
180 }
181 }
182 assert(0); /* should have found it */
183 }
184
185 static void deltxn(txnpair *tp)
186 {
187 int i = tp - lastenv->txns;
188 for (; i<lastenv->ntxns-1; i++)
189 lastenv->txns[i] = lastenv->txns[i+1];
190 lastenv->ntxns--;
191 lasttxn = NULL;
192 }
193
194 static void addcrs(txnpair *tp, void *tcrs, MDB_cursor *rcrs)
195 {
196 int j = tp->ncursors;
197 assert(tp->ncursors < MAXCRSS);
198
199 tp->cursors[j].tcrs = tcrs;
200 tp->cursors[j].rcrs = rcrs;
201 tp->ncursors++;
202 lastcrs = tp->cursors+j;
203 }
204
205 static crspair *findcrs(void *tcrs)
206 {
207 int i, j, k;
208 envpair *ep;
209 txnpair *tp;
210 crspair *cp;
211 if (lastcrs && lastcrs->tcrs == tcrs)
212 return lastcrs;
213 if (lasttxn) {
214 for (k=0, cp=lasttxn->cursors; k<lasttxn->ncursors; k++, cp++) {
215 if (cp->tcrs == tcrs) {
216 lastcrs = cp;
217 return lastcrs;
218 }
219 }
220 }
221 if (lastenv) {
222 for (j=0, tp=lastenv->txns; j<lastenv->ntxns; j++, tp++){
223 if (tp == lasttxn)
224 continue;
225 for (k=0, cp = tp->cursors; k<tp->ncursors; k++, cp++) {
226 if (cp->tcrs == tcrs) {
227 lastcrs = cp;
228 lasttxn = tp;
229 return lastcrs;
230 }
231 }
232 }
233 }
234 for (i=0, ep=envs; i<nenvs; i++, ep++) {
235 for (j=0, tp=ep->txns; j<ep->ntxns; j++, tp++) {
236 if (tp == lasttxn)
237 continue;
238 for (k=0, cp = tp->cursors; k<tp->ncursors; k++, cp++) {
239 if (cp->tcrs == tcrs) {
240 lastcrs = cp;
241 lasttxn = tp;
242 lastenv = ep;
243 return lastcrs;
244 }
245 }
246 }
247 }
248 assert(0); /* should have found it already */
249 }
250
251 static void delcrs(void *tcrs)
252 {
253 int i;
254 crspair *cp = findcrs(tcrs);
255 mdb_cursor_close(cp->rcrs);
256 for (i = cp - lasttxn->cursors; i<lasttxn->ncursors-1; i++)
257 lasttxn->cursors[i] = lasttxn->cursors[i+1];
258 lasttxn->ncursors--;
259 lastcrs = NULL;
260 }
261
262 void child()
263 {
264 int rc;
265 MDB_val key, data;
266 char *ptr;
267
268 while (fgets(inbuf, sizeof(inbuf), stdin)) {
269 ptr = inbuf;
270 if (!strncmp(ptr, SCMP("exit")))
271 break;
272
273 if (!strncmp(ptr, SCMP("mdb_env_create"))) {
274 void *tenv;
275 MDB_env *renv;
276 sscanf(ptr+SOFF("mdb_env_create"), "%p", &tenv);
277 E(mdb_env_create(&renv));
278 addenv(tenv, renv);
279 } else if (!strncmp(ptr, SCMP("mdb_env_set_maxdbs"))) {
280 void *tenv;
281 envpair *ep;
282 unsigned int maxdbs;
283 sscanf(ptr+SOFF("mdb_env_set_maxdbs"), "%p, %u", &tenv, &maxdbs);
284 ep = findenv(tenv);
285 E(mdb_env_set_maxdbs(ep->renv, maxdbs));
286 } else if (!strncmp(ptr, SCMP("mdb_env_set_mapsize"))) {
287 void *tenv;
288 envpair *ep;
289 size_t mapsize;
290 sscanf(ptr+SOFF("mdb_env_set_mapsize"), "%p, %"MDB_SCNy(u), &tenv, &mapsize);
291 ep = findenv(tenv);
292 E(mdb_env_set_mapsize(ep->renv, mapsize));
293 } else if (!strncmp(ptr, SCMP("mdb_env_open"))) {
294 void *tenv;
295 envpair *ep;
296 char *path;
297 int len;
298 unsigned int flags, mode;
299 sscanf(ptr+SOFF("mdb_env_open"), "%p, %n", &tenv, &len);
300 path = ptr+SOFF("mdb_env_open")+len;
301 ptr = strchr(path, ',');
302 *ptr++ = '\0';
303 sscanf(ptr, "%u, %o", &flags, &mode);
304 ep = findenv(tenv);
305 E(mdb_env_open(ep->renv, path, flags, mode));
306 if (!maxkey) {
307 maxkey = mdb_env_get_maxkeysize(ep->renv);
308 kbuf = malloc(maxkey+2);
309 dbuf = malloc(maxkey+2);
310 dbufsize = maxkey;
311 }
312 } else if (!strncmp(ptr, SCMP("mdb_env_close"))) {
313 void *tenv;
314 envpair *ep;
315 sscanf(ptr+SOFF("mdb_env_close"), "%p", &tenv);
316 ep = findenv(tenv);
317 mdb_env_close(ep->renv);
318 delenv(ep);
319 if (!nenvs) /* if no other envs left, this process is done */
320 break;
321 } else if (!strncmp(ptr, SCMP("mdb_txn_begin"))) {
322 unsigned int flags;
323 void *tenv, *ttxn;
324 envpair *ep;
325 MDB_txn *rtxn;
326 sscanf(ptr+SOFF("mdb_txn_begin"), "%p, %*p, %u = %p", &tenv, &flags, &ttxn);
327 ep = findenv(tenv);
328 E(mdb_txn_begin(ep->renv, NULL, flags, &rtxn));
329 addtxn(tenv, ttxn, rtxn);
330 } else if (!strncmp(ptr, SCMP("mdb_txn_commit"))) {
331 void *ttxn;
332 txnpair *tp;
333 sscanf(ptr+SOFF("mdb_txn_commit"), "%p", &ttxn);
334 tp = findtxn(ttxn);
335 E(mdb_txn_commit(tp->rtxn));
336 deltxn(tp);
337 } else if (!strncmp(ptr, SCMP("mdb_txn_abort"))) {
338 void *ttxn;
339 txnpair *tp;
340 sscanf(ptr+SOFF("mdb_txn_abort"), "%p", &ttxn);
341 tp = findtxn(ttxn);
342 mdb_txn_abort(tp->rtxn);
343 deltxn(tp);
344 } else if (!strncmp(ptr, SCMP("mdb_dbi_open"))) {
345 void *ttxn;
346 txnpair *tp;
347 char *dbname;
348 unsigned int flags;
349 unsigned int tdbi;
350 MDB_dbi dbi;
351 sscanf(ptr+SOFF("mdb_dbi_open"), "%p, ", &ttxn);
352 dbname = strchr(ptr+SOFF("mdb_dbi_open"), ',');
353 dbname += 2;
354 ptr = strchr(dbname, ',');
355 *ptr++ = '\0';
356 if (!strcmp(dbname, "(null)"))
357 dbname = NULL;
358 sscanf(ptr, "%u = %u", &flags, &tdbi);
359 tp = findtxn(ttxn);
360 E(mdb_dbi_open(tp->rtxn, dbname, flags, &dbi));
361 } else if (!strncmp(ptr, SCMP("mdb_dbi_close"))) {
362 void *tenv;
363 envpair *ep;
364 unsigned int tdbi;
365 sscanf(ptr+SOFF("mdb_dbi_close"), "%p, %u", &tenv, &tdbi);
366 ep = findenv(tenv);
367 mdb_dbi_close(ep->renv, tdbi);
368 } else if (!strncmp(ptr, SCMP("mdb_cursor_open"))) {
369 void *ttxn, *tcrs;
370 txnpair *tp;
371 MDB_cursor *rcrs;
372 unsigned int tdbi;
373 sscanf(ptr+SOFF("mdb_cursor_open"), "%p, %u = %p", &ttxn, &tdbi, &tcrs);
374 tp = findtxn(ttxn);
375 E(mdb_cursor_open(tp->rtxn, tdbi, &rcrs));
376 addcrs(tp, tcrs, rcrs);
377 } else if (!strncmp(ptr, SCMP("mdb_cursor_close"))) {
378 void *tcrs;
379 sscanf(ptr+SOFF("mdb_cursor_close"), "%p", &tcrs);
380 delcrs(tcrs);
381 } else if (!strncmp(ptr, SCMP("mdb_cursor_put"))) {
382 void *tcrs;
383 crspair *cp;
384 unsigned int flags;
385 int len;
386 sscanf(ptr+SOFF("mdb_cursor_put"), "%p, ", &tcrs);
387 cp = findcrs(tcrs);
388 ptr = strchr(ptr+SOFF("mdb_cursor_put"), ',');
389 sscanf(ptr+1, "%"MDB_SCNy(u)",", &key.mv_size);
390 if (key.mv_size) {
391 ptr = strchr(ptr, '[');
392 inhex(ptr+1, kbuf);
393 key.mv_data = kbuf;
394 ptr += key.mv_size * 2 + 2;
395 }
396 ptr = strchr(ptr+1, ',');
397 sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len);
398 if (data.mv_size > dbufsize) {
399 dbuf = realloc(dbuf, data.mv_size+2);
400 assert(dbuf != NULL);
401 dbufsize = data.mv_size;
402 }
403 ptr += len+1;
404 if (*ptr == '[') {
405 inhex(ptr+1, dbuf);
406 data.mv_data = dbuf;
407 ptr += data.mv_size * 2 + 2;
408 } else {
409 sprintf(dbuf, "%09ld", (long)mdb_txn_id(lasttxn->rtxn));
410 }
411 sscanf(ptr+1, "%u", &flags);
412 E(mdb_cursor_put(cp->rcrs, &key, &data, flags));
413 } else if (!strncmp(ptr, SCMP("mdb_cursor_del"))) {
414 void *tcrs;
415 crspair *cp;
416 unsigned int flags;
417 sscanf(ptr+SOFF("mdb_cursor_del"), "%p, %u", &tcrs, &flags);
418 cp = findcrs(tcrs);
419 E(mdb_cursor_del(cp->rcrs, flags));
420 } else if (!strncmp(ptr, SCMP("mdb_put"))) {
421 void *ttxn;
422 txnpair *tp;
423 unsigned int tdbi, flags;
424 int len;
425 sscanf(ptr+SOFF("mdb_put"),"%p, %u, %"MDB_SCNy(u), &ttxn, &tdbi, &key.mv_size);
426 tp = findtxn(ttxn);
427 ptr = strchr(ptr+SOFF("mdb_put"), '[');
428 inhex(ptr+1, kbuf);
429 key.mv_data = kbuf;
430 ptr += key.mv_size * 2 + 2;
431 sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len);
432 if (data.mv_size > dbufsize) {
433 dbuf = realloc(dbuf, data.mv_size+2);
434 assert(dbuf != NULL);
435 dbufsize = data.mv_size;
436 }
437 ptr += len+1;
438 if (*ptr == '[') {
439 inhex(ptr+1, dbuf);
440 ptr += data.mv_size * 2 + 2;
441 } else {
442 sprintf(dbuf, "%09ld", (long)mdb_txn_id(tp->rtxn));
443 }
444 data.mv_data = dbuf;
445 sscanf(ptr+1, "%u", &flags);
446 RES(MDB_KEYEXIST,mdb_put(tp->rtxn, tdbi, &key, &data, flags));
447 } else if (!strncmp(ptr, SCMP("mdb_del"))) {
448 void *ttxn;
449 txnpair *tp;
450 unsigned int tdbi;
451 int len;
452 sscanf(ptr+SOFF("mdb_del"),"%p, %u, %"MDB_SCNy(u), &ttxn, &tdbi, &key.mv_size);
453 tp = findtxn(ttxn);
454 ptr = strchr(ptr+SOFF("mdb_del"), '[');
455 inhex(ptr+1, kbuf);
456 key.mv_data = kbuf;
457 ptr += key.mv_size * 2 + 2;
458 sscanf(ptr+1, "%"MDB_SCNy(u)"%n", &data.mv_size, &len);
459 if (data.mv_size > dbufsize) {
460 dbuf = realloc(dbuf, data.mv_size+2);
461 assert(dbuf != NULL);
462 dbufsize = data.mv_size;
463 }
464 ptr += len+1;
465 if (*ptr == '[') {
466 inhex(ptr+1, dbuf);
467 } else {
468 sprintf(dbuf, "%09ld", (long)mdb_txn_id(tp->rtxn));
469 }
470 data.mv_data = dbuf;
471 RES(MDB_NOTFOUND,mdb_del(tp->rtxn, tdbi, &key, &data));
472 }
473 write(1, "\n", 1);
474 }
475 exit(0);
476 }
477
478 static pidpair *addpid(int tpid)
479 {
480 int fdout[2], fdin[2];
481 pid_t pid;
482 assert(npids < MAXPIDS);
483 pids[npids].tpid = tpid;
484 pipe(fdout);
485 pipe(fdin);
486 if ((pid = fork()) == 0) {
487 /* child */
488 fclose(stdin);
489 fclose(stdout);
490 dup2(fdout[0], 0);
491 dup2(fdin[1], 1);
492 stdin = fdopen(0, "r");
493 stdout = fdopen(1, "w");
494 child();
495 return 0; /* NOTREACHED */
496 } else {
497 pids[npids].rpid = pid;
498 pids[npids].fdout = fdout[1];
499 pids[npids].fdin = fdin[0];
500 lastpid = pids+npids;
501 npids++;
502 return lastpid;
503 }
504 }
505
506 static pidpair *findpid(int tpid)
507 {
508 int i;
509 if (!lastpid || lastpid->tpid != tpid) {
510 for (i=0; i<npids; i++)
511 if (pids[i].tpid == tpid)
512 break;
513 if (i == npids)
514 return NULL;
515 lastpid = &pids[i];
516 }
517 return lastpid;
518 }
519
520 volatile pid_t killpid;
521
522 static void delpid(int tpid)
523 {
524 pidpair *pp = findpid(tpid);
525 if (pp) {
526 pid_t kpid = pp->rpid;
527 killpid = kpid;
528 write(pp->fdout, "exit\n", sizeof("exit"));
529 while (killpid == kpid)
530 usleep(10000);
531 }
532 }
533
534 static void reaper(int sig)
535 {
536 int status, i;
537 pid_t pid = waitpid(-1, &status, 0);
538 if (pid > 0) {
539 fprintf(stderr, "# %s %d\n", WIFEXITED(status) ? "exited" : "killed", pid);
540 for (i=0; i<npids; i++)
541 if (pids[i].rpid == pid)
542 break;
543 assert(i < npids);
544 close(pids[i].fdout);
545 close(pids[i].fdin);
546 for (;i<npids-1; i++)
547 pids[i] = pids[i+1];
548 npids--;
549 killpid = 0;
550 }
551 }
552
553 int main(int argc,char * argv[])
554 {
555 signal(SIGCHLD, reaper);
556
557 while (fgets(inbuf, sizeof(inbuf), stdin)) {
558 pidpair *pp;
559 int tpid, len;
560 char c, *ptr;
561 lcount++;
562
563 if (inbuf[0] == '#' && !strncmp(inbuf+1, SCMP(" killed"))) {
564 sscanf(inbuf+SOFF("killed"),"%d", &tpid);
565 delpid(tpid);
566 continue;
567 }
568
569 if (inbuf[0] != '>')
570 continue;
571 ptr = inbuf+1;
572 sscanf(ptr, "%d:%n", &tpid, &len);
573 pp = findpid(tpid);
574 if (!pp)
575 pp = addpid(tpid); /* new process */
576
577 ptr = inbuf+len+1;
578 len = strlen(ptr);
579 write(pp->fdout, ptr, len); /* send command and wait for ack */
580 read(pp->fdin, &c, 1);
581 }
582 while (npids)
583 delpid(pids[0].tpid);
584 }
585