pgfs_db.c revision 1.2 1 /* $NetBSD: pgfs_db.c,v 1.2 2012/04/11 14:26:44 yamt Exp $ */
2
3 /*-
4 * Copyright (c)2010,2011 YAMAMOTO Takashi,
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 * SUCH DAMAGE.
27 */
28
29 /*
30 * backend db operations
31 */
32
33 #include <sys/cdefs.h>
34 #ifndef lint
35 __RCSID("$NetBSD: pgfs_db.c,v 1.2 2012/04/11 14:26:44 yamt Exp $");
36 #endif /* not lint */
37
#include <assert.h>
#include <err.h>
#include <errno.h>
#include <inttypes.h>
#include <puffs.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <util.h>
48
49 #include <libpq-fe.h>
50
51 #include "pgfs_db.h"
52 #include "pgfs_waitq.h"
53 #include "pgfs_debug.h"
54
/*
 * pgfs_dosync: set by pgfs_connectdb() from its "synchronous" argument.
 * when false, every session is switched to synchronous_commit off and
 * durability is obtained explicitly via commit_sync()/flush_xacts(),
 * both of which assert that this is false.  when true, the server's
 * synchronous_commit setting is left alone.
 */
bool pgfs_dosync = false;
56
/*
 * Xconn: one backend connection to the PostgreSQL server.
 *
 * all connections live on xclist.  getxc() hands out a connection
 * whose blocker is NULL to the requesting continuation.
 */
struct Xconn {
	TAILQ_ENTRY(Xconn) list;	/* linkage on xclist */
	PGconn *conn;			/* libpq connection */
	struct puffs_cc *blocker;	/* continuation yielded in pqwait()
					   waiting for results, or NULL */
	struct puffs_cc *owner;		/* continuation which acquired this
					   connection in getxc(), or NULL */
	bool in_trans;			/* set by begin*(), cleared by relxc() */
	int id;				/* 0..31; bit index into
					   struct cmd's prepared_mask */
};
65
66 static void
67 dumperror(struct Xconn *xc, const PGresult *res)
68 {
69 static const struct {
70 const char *name;
71 int code;
72 } fields[] = {
73 #define F(x) { .name = #x, .code = x, }
74 F(PG_DIAG_SEVERITY),
75 F(PG_DIAG_SQLSTATE),
76 F(PG_DIAG_MESSAGE_PRIMARY),
77 F(PG_DIAG_MESSAGE_DETAIL),
78 F(PG_DIAG_MESSAGE_HINT),
79 F(PG_DIAG_STATEMENT_POSITION),
80 F(PG_DIAG_INTERNAL_POSITION),
81 F(PG_DIAG_INTERNAL_QUERY),
82 F(PG_DIAG_CONTEXT),
83 F(PG_DIAG_SOURCE_FILE),
84 F(PG_DIAG_SOURCE_LINE),
85 F(PG_DIAG_SOURCE_FUNCTION),
86 #undef F
87 };
88 unsigned int i;
89
90 if (!pgfs_dodprintf) {
91 return;
92 }
93 assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR ||
94 PQresultStatus(res) == PGRES_FATAL_ERROR);
95 for (i = 0; i < __arraycount(fields); i++) {
96 const char *val = PQresultErrorField(res, fields[i].code);
97
98 if (val == NULL) {
99 continue;
100 }
101 fprintf(stderr, "%s: %s\n", fields[i].name, val);
102 }
103 }
104
/*
 * xclist: every backend connection, inserted by pgfs_connectdb().
 * xcwaitq: continuations sleeping in getxc() until relxc() frees a
 * connection.
 */
TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist);
struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq);
107
108 static struct Xconn *
109 getxc(struct puffs_cc *cc)
110 {
111 struct Xconn *xc;
112
113 assert(cc != NULL);
114 retry:
115 TAILQ_FOREACH(xc, &xclist, list) {
116 if (xc->blocker == NULL) {
117 assert(xc->owner == NULL);
118 xc->owner = cc;
119 DPRINTF("xc %p acquire %p\n", xc, cc);
120 return xc;
121 } else {
122 assert(xc->owner == xc->blocker);
123 }
124 }
125 DPRINTF("no free conn %p\n", cc);
126 waiton(&xcwaitq, cc);
127 goto retry;
128 }
129
130 static void
131 relxc(struct Xconn *xc)
132 {
133
134 assert(xc->in_trans);
135 assert(xc->owner != NULL);
136 xc->in_trans = false;
137 xc->owner = NULL;
138 wakeup_one(&xcwaitq);
139 }
140
141 static void
142 pqwait(struct Xconn *xc)
143 {
144 PGconn *conn = xc->conn;
145 struct puffs_cc *cc = xc->owner;
146
147 if (PQflush(conn)) {
148 errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn));
149 }
150 if (!PQisBusy(conn)) {
151 return;
152 }
153 assert(xc->blocker == NULL);
154 xc->blocker = cc;
155 DPRINTF("yielding %p\n", cc);
156 /* XXX is it safe to yield before entering mainloop? */
157 puffs_cc_yield(cc);
158 DPRINTF("yield returned %p\n", cc);
159 assert(xc->owner == cc);
160 assert(xc->blocker == cc);
161 xc->blocker = NULL;
162 }
163
/*
 * sqltoerrno: map an SQLSTATE code to an errno value.
 *
 * sqlstate is normally the five-character string returned by
 * PQresultErrorField(res, PG_DIAG_SQLSTATE); it may be NULL.
 * returns 0 for successful completion, the mapped errno for the codes
 * below, and EIO for a missing or unknown code.  a check-constraint
 * violation is treated as a bug and aborts.
 *
 * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle
 * "tuple concurrently updated" errors for lowrite/lo_truncate.
 *
 * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN?
 */
static int
sqltoerrno(const char *sqlstate)
{
	static const struct {
		char sqlstate[5];	/* exactly 5 chars, no NUL */
		int error;
	} map[] = {
		{ "00000", 0, }, /* ERRCODE_SUCCESSFUL_COMPLETION */
		{ "02000", ENOENT, }, /* ERRCODE_NO_DATA */
		{ "23505", EEXIST, }, /* ERRCODE_UNIQUE_VIOLATION */
		{ "23514", EINVAL, }, /* ERRCODE_CHECK_VIOLATION */
		{ "40001", EAGAIN, }, /* ERRCODE_T_R_SERIALIZATION_FAILURE */
		{ "40P01", EAGAIN, }, /* ERRCODE_T_R_DEADLOCK_DETECTED */
		{ "42704", ENOENT, }, /* ERRCODE_UNDEFINED_OBJECT */
		{ "53100", ENOSPC, }, /* ERRCODE_DISK_FULL */
		{ "53200", ENOMEM, }, /* ERRCODE_OUT_OF_MEMORY */
		{ "XX000", EAGAIN, }, /* ERRCODE_INTERNAL_ERROR */
	};
	unsigned int i;

	if (sqlstate == NULL) {
		/*
		 * PQresultErrorField returns NULL when the field is absent,
		 * e.g. for errors generated by libpq itself rather than
		 * reported by the server.
		 */
		DPRINTF("no sqlstate mapped to EIO\n");
		return EIO;
	}
	for (i = 0; i < __arraycount(map); i++) {
		/*
		 * strncmp rather than memcmp: it does not read past the
		 * NUL of an unexpectedly short sqlstate.
		 */
		if (!strncmp(map[i].sqlstate, sqlstate,
		    sizeof(map[i].sqlstate))) {
			const int error = map[i].error;

			if (error != 0) {
				DPRINTF("sqlstate %5s mapped to error %d\n",
				    sqlstate, error);
			}
			if (error == EINVAL) {
				/*
				 * sounds like a bug.
				 */
				abort();
			}
			return error;
		}
	}
	DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate);
	return EIO;
}
210
/*
 * cmd: an SQL statement, possibly prepared on some connections.
 * created with createcmd().
 */
struct cmd {
	char name[32];		/* name of prepared statement */
	char *cmd;		/* query string */
	unsigned int nparams;	/* number of '$' characters in cmd */
	Oid *paramtypes;	/* nparams parameter type Oids */
	uint32_t prepared_mask;	/* for which connections this is prepared?
				   bit n <-> Xconn id n */
	unsigned int flags;	/* CMD_ flags */
};

#define CMD_NOPREPARE 1 /* don't prepare this command */
221
222 struct cmd *
223 createcmd(const char *cmd, unsigned int flags, ...)
224 {
225 struct cmd *c;
226 va_list ap;
227 const char *cp;
228 unsigned int i;
229 static unsigned int cmdid;
230
231 c = emalloc(sizeof(*c));
232 c->cmd = estrdup(cmd);
233 c->nparams = 0;
234 va_start(ap, flags);
235 for (cp = cmd; *cp != 0; cp++) {
236 if (*cp == '$') { /* XXX */
237 c->nparams++;
238 }
239 }
240 c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes));
241 for (i = 0; i < c->nparams; i++) {
242 Oid type = va_arg(ap, Oid);
243 assert(type == BYTEA ||
244 type == INT4OID || type == INT8OID || type == OIDOID ||
245 type == TEXTOID || type == TIMESTAMPTZOID);
246 c->paramtypes[i] = type;
247 }
248 va_end(ap);
249 snprintf(c->name, sizeof(c->name), "%u", cmdid++);
250 if ((flags & CMD_NOPREPARE) != 0) {
251 c->prepared_mask = ~0;
252 } else {
253 c->prepared_mask = 0;
254 }
255 c->flags = flags;
256 return c;
257 }
258
259 static void
260 freecmd(struct cmd *c)
261 {
262
263 free(c->paramtypes);
264 free(c->cmd);
265 free(c);
266 }
267
268 static int
269 fetch_noresult(struct Xconn *xc)
270 {
271 PGresult *res;
272 ExecStatusType status;
273 PGconn *conn = xc->conn;
274 int error;
275
276 pqwait(xc);
277 res = PQgetResult(conn);
278 if (res == NULL) {
279 return ENOENT;
280 }
281 status = PQresultStatus(res);
282 if (status == PGRES_COMMAND_OK) {
283 assert(PQnfields(res) == 0);
284 assert(PQntuples(res) == 0);
285 if (!strcmp(PQcmdTuples(res), "0")) {
286 error = ENOENT;
287 } else {
288 error = 0;
289 }
290 } else if (status == PGRES_FATAL_ERROR) {
291 error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE));
292 assert(error != 0);
293 dumperror(xc, res);
294 } else {
295 errx(1, "%s not command_ok: %d: %s", __func__,
296 (int)status,
297 PQerrorMessage(conn));
298 }
299 PQclear(res);
300 res = PQgetResult(conn);
301 assert(res == NULL);
302 if (error != 0) {
303 DPRINTF("error %d\n", error);
304 }
305 return error;
306 }
307
308 static int
309 preparecmd(struct Xconn *xc, struct cmd *c)
310 {
311 PGconn *conn = xc->conn;
312 const uint32_t mask = 1 << xc->id;
313 int error;
314 int ret;
315
316 if ((c->prepared_mask & mask) != 0) {
317 return 0;
318 }
319 assert((c->flags & CMD_NOPREPARE) == 0);
320 DPRINTF("PREPARE: '%s'\n", c->cmd);
321 ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes);
322 if (!ret) {
323 errx(EXIT_FAILURE, "PQsendPrepare: %s",
324 PQerrorMessage(conn));
325 }
326 error = fetch_noresult(xc);
327 if (error != 0) {
328 return error;
329 }
330 c->prepared_mask |= mask;
331 return 0;
332 }
333
334 /*
335 * vsendcmd:
336 *
337 * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared.
338 * 0 for text and 1 for binary.
339 */
340
341 static int
342 vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap)
343 {
344 PGconn *conn = xc->conn;
345 char **paramvalues;
346 int *paramlengths;
347 int *paramformats;
348 unsigned int i;
349 int error;
350 int ret;
351
352 assert(xc->owner != NULL);
353 assert(xc->blocker == NULL);
354 error = preparecmd(xc, c);
355 if (error != 0) {
356 return error;
357 }
358 paramvalues = emalloc(c->nparams * sizeof(*paramvalues));
359 paramlengths = NULL;
360 paramformats = NULL;
361 DPRINTF("CMD: '%s'\n", c->cmd);
362 for (i = 0; i < c->nparams; i++) {
363 Oid type = c->paramtypes[i];
364 char tmpstore[1024];
365 const char *buf = NULL;
366 intmax_t v = 0; /* XXXgcc */
367 int sz;
368 bool binary = false;
369
370 switch (type) {
371 case BYTEA:
372 buf = va_arg(ap, const void *);
373 sz = (int)va_arg(ap, size_t);
374 binary = true;
375 break;
376 case INT8OID:
377 case OIDOID:
378 case INT4OID:
379 switch (type) {
380 case INT8OID:
381 v = (intmax_t)va_arg(ap, int64_t);
382 break;
383 case OIDOID:
384 v = (intmax_t)va_arg(ap, Oid);
385 break;
386 case INT4OID:
387 v = (intmax_t)va_arg(ap, int32_t);
388 break;
389 default:
390 errx(EXIT_FAILURE, "unknown integer oid %u",
391 type);
392 }
393 buf = tmpstore;
394 sz = snprintf(tmpstore, sizeof(tmpstore),
395 "%jd", v);
396 assert(sz != -1);
397 assert((size_t)sz < sizeof(tmpstore));
398 sz += 1;
399 break;
400 case TEXTOID:
401 case TIMESTAMPTZOID:
402 buf = va_arg(ap, char *);
403 sz = strlen(buf) + 1;
404 break;
405 default:
406 errx(EXIT_FAILURE, "%s: unknown param type %u",
407 __func__, type);
408 }
409 if (binary) {
410 if (paramlengths == NULL) {
411 paramlengths =
412 emalloc(c->nparams * sizeof(*paramformats));
413 }
414 if (paramformats == NULL) {
415 paramformats = ecalloc(1,
416 c->nparams * sizeof(*paramformats));
417 }
418 paramformats[i] = 1;
419 paramlengths[i] = sz;
420 }
421 paramvalues[i] = emalloc(sz);
422 memcpy(paramvalues[i], buf, sz);
423 if (binary) {
424 DPRINTF("\t[%u]=<BINARY>\n", i);
425 } else {
426 DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]);
427 }
428 }
429 if ((c->flags & CMD_NOPREPARE) != 0) {
430 ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes,
431 (const char * const *)paramvalues, paramlengths,
432 paramformats, resultmode);
433 } else {
434 ret = PQsendQueryPrepared(conn, c->name, c->nparams,
435 (const char * const *)paramvalues, paramlengths,
436 paramformats, resultmode);
437 }
438 for (i = 0; i < c->nparams; i++) {
439 free(paramvalues[i]);
440 }
441 free(paramvalues);
442 free(paramlengths);
443 free(paramformats);
444 if (!ret) {
445 errx(EXIT_FAILURE, "PQsendQueryPrepared: %s",
446 PQerrorMessage(conn));
447 }
448 return 0;
449 }
450
/*
 * sendcmd: send command c on xc, asking for text-format results
 * (resultmode 0).  the variable arguments are the command's parameter
 * values, as consumed by vsendcmd().  does not wait for the result.
 * returns 0 or an errno value.
 */
int
sendcmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);

	return rv;
}
462
/*
 * sendcmdx: like sendcmd(), but with an explicit resultmode (0 for
 * text, 1 for binary results).  the variable arguments are the command's
 * parameter values, as consumed by vsendcmd().  returns 0 or an errno
 * value.
 */
int
sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, resultmode, c, args);
	va_end(args);

	return rv;
}
474
/*
 * simplecmd: a convenient routine to execute a command which returns
 * no rows synchronously.
 *
 * sends c with the given parameter values and waits for completion.
 * returns 0 on success, otherwise an errno value.  this includes ENOENT
 * when the command reported zero affected rows (see fetch_noresult()).
 */

int
simplecmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);

	return rv != 0 ? rv : fetch_noresult(xc);
}
494
495 void
496 fetchinit(struct fetchstatus *s, struct Xconn *xc)
497 {
498 s->xc = xc;
499 s->res = NULL;
500 s->cur = 0;
501 s->nrows = 0;
502 s->done = false;
503 }
504
/*
 * getint: convert a decimal integer, as sent by the server in text
 * format, to intmax_t.  the whole string must be a valid, non-empty,
 * in-range number; this is only verified with assertions.
 */
static intmax_t
getint(const char *str)
{
	char *end;
	intmax_t val;

	errno = 0;
	val = strtoimax(str, &end, 10);
	assert(errno == 0);		/* no overflow */
	assert(*str != '\0');		/* not empty */
	assert(*end == '\0');		/* no trailing garbage */
	return val;
}
518
/*
 * vfetchnext: store the next row of the current command's results into
 * the locations given by ap.
 *
 * n is the number of columns and types[] their type Oids.  the result's
 * column count and types are checked with assertions only, and no
 * column may be NULL.  BYTEA columns must be in binary format and all
 * others in text format.  ap supplies one destination per column:
 *	INT8OID		int64_t *
 *	OIDOID		Oid *
 *	INT4OID		int32_t *
 *	TEXTOID		char ** (gets an estrdup() copy, not freed here)
 *	BYTEA		void *buf, size_t *lenp
 *
 * returns 0 when a row was stored.  returns ENOENT when there are no
 * more results or the current result has no rows.  on a fatal error,
 * returns an errno value mapped from its SQLSTATE.  in the error and
 * no-rows cases the PGresult stays in s->res; fetchdone() releases it.
 */
static int
vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap)
{
	PGconn *conn = s->xc->conn;
	unsigned int i;

	assert(conn != NULL);
	if (s->res == NULL) {
		/* no result in hand: wait for and fetch the next one */
		ExecStatusType status;
		int error;

		pqwait(s->xc);
		s->res = PQgetResult(conn);
		if (s->res == NULL) {
			/* all results consumed; fetchdone() needn't drain */
			s->done = true;
			return ENOENT;
		}
		status = PQresultStatus(s->res);
		if (status == PGRES_FATAL_ERROR) {
			error = sqltoerrno(
			    PQresultErrorField(s->res, PG_DIAG_SQLSTATE));
			assert(error != 0);
			dumperror(s->xc, s->res);
			return error;
		}
		if (status != PGRES_TUPLES_OK) {
			errx(1, "not tuples_ok: %s",
			    PQerrorMessage(conn));
		}
		assert((unsigned int)PQnfields(s->res) == n);
		s->nrows = PQntuples(s->res);
		if (s->nrows == 0) {
			/*
			 * NOTE(review): s->res is kept, so a further call
			 * would not move on to the next result; callers
			 * are expected to stop here and call fetchdone().
			 */
			DPRINTF("no rows\n");
			return ENOENT;
		}
		assert(s->nrows >= 1);
		s->cur = 0;
	}
	for (i = 0; i < n; i++) {
		size_t size;

		/* binary format for BYTEA columns, text for the rest */
		assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0));
		DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n",
		    i, PQftype(s->res, i), types[i],
		    PQgetisnull(s->res, s->cur, i) ? "<NULL>" :
		    PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) :
		    "<BINARY>");
		assert(PQftype(s->res, i) == types[i]);
		assert(!PQgetisnull(s->res, s->cur, i));
		switch(types[i]) {
		case INT8OID:
			*va_arg(ap, int64_t *) =
			    getint(PQgetvalue(s->res, s->cur, i));
			break;
		case OIDOID:
			*va_arg(ap, Oid *) =
			    getint(PQgetvalue(s->res, s->cur, i));
			break;
		case INT4OID:
			*va_arg(ap, int32_t *) =
			    getint(PQgetvalue(s->res, s->cur, i));
			break;
		case TEXTOID:
			*va_arg(ap, char **) =
			    estrdup(PQgetvalue(s->res, s->cur, i));
			break;
		case BYTEA:
			/*
			 * NOTE(review): the destination size is not known
			 * here; the caller's buffer must be large enough.
			 */
			size = PQgetlength(s->res, s->cur, i);
			memcpy(va_arg(ap, void *),
			    PQgetvalue(s->res, s->cur, i), size);
			*va_arg(ap, size_t *) = size;
			break;
		default:
			errx(EXIT_FAILURE, "%s unknown type %u", __func__,
			    types[i]);
		}
	}
	s->cur++;
	if (s->cur == s->nrows) {
		/* last row used: the next call fetches the next result */
		PQclear(s->res);
		s->res = NULL;
	}
	return 0;
}
603
604 int
605 fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...)
606 {
607 va_list ap;
608 int error;
609
610 va_start(ap, types);
611 error = vfetchnext(s, n, types, ap);
612 va_end(ap);
613 return error;
614 }
615
616 void
617 fetchdone(struct fetchstatus *s)
618 {
619
620 if (s->res != NULL) {
621 PQclear(s->res);
622 s->res = NULL;
623 }
624 if (!s->done) {
625 PGresult *res;
626 unsigned int n;
627
628 n = 0;
629 while ((res = PQgetResult(s->xc->conn)) != NULL) {
630 PQclear(res);
631 n++;
632 }
633 if (n > 0) {
634 DPRINTF("%u rows dropped\n", n);
635 }
636 }
637 }
638
639 int
640 simplefetch(struct Xconn *xc, Oid type, ...)
641 {
642 struct fetchstatus s;
643 va_list ap;
644 int error;
645
646 fetchinit(&s, xc);
647 va_start(ap, type);
648 error = vfetchnext(&s, 1, &type, ap);
649 va_end(ap);
650 assert(error != 0 || s.res == NULL);
651 fetchdone(&s);
652 return error;
653 }
654
655 static void
656 setlabel(struct Xconn *xc, const char *label)
657 {
658 int error;
659
660 /*
661 * put the label into application_name so that it's shown in
662 * pg_stat_activity. we are sure that our labels don't need
663 * PQescapeStringConn.
664 *
665 * example:
666 * SELECT pid,application_name,query FROM pg_stat_activity
667 * WHERE state <> 'idle'
668 */
669
670 if (label != NULL) {
671 struct cmd *c;
672 char cmd_str[1024];
673
674 snprintf(cmd_str, sizeof(cmd_str),
675 "SET application_name TO 'pgfs: %s'", label);
676 c = createcmd(cmd_str, CMD_NOPREPARE);
677 error = simplecmd(xc, c);
678 freecmd(c);
679 assert(error == 0);
680 } else {
681 #if 0 /* don't bother to clear label */
682 static struct cmd *c;
683
684 CREATECMD_NOPARAM(c, "SET application_name TO 'pgfs'");
685 error = simplecmd(xc, c);
686 assert(error == 0);
687 #endif
688 }
689 }
690
691 struct Xconn *
692 begin(struct puffs_usermount *pu, const char *label)
693 {
694 struct Xconn *xc = getxc(puffs_cc_getcc(pu));
695 static struct cmd *c;
696 int error;
697
698 setlabel(xc, label);
699 CREATECMD_NOPARAM(c, "BEGIN");
700 assert(!xc->in_trans);
701 error = simplecmd(xc, c);
702 assert(error == 0);
703 assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
704 xc->in_trans = true;
705 return xc;
706 }
707
708 struct Xconn *
709 begin_readonly(struct puffs_usermount *pu, const char *label)
710 {
711 struct Xconn *xc = getxc(puffs_cc_getcc(pu));
712 static struct cmd *c;
713 int error;
714
715 setlabel(xc, label);
716 CREATECMD_NOPARAM(c, "BEGIN READ ONLY");
717 assert(!xc->in_trans);
718 error = simplecmd(xc, c);
719 assert(error == 0);
720 assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
721 xc->in_trans = true;
722 return xc;
723 }
724
725 void
726 rollback(struct Xconn *xc)
727 {
728 PGTransactionStatusType status;
729
730 /*
731 * check the status as we are not sure the status of our transaction
732 * after a failed commit.
733 */
734 status = PQtransactionStatus(xc->conn);
735 assert(status != PQTRANS_ACTIVE);
736 assert(status != PQTRANS_UNKNOWN);
737 if (status != PQTRANS_IDLE) {
738 static struct cmd *c;
739 int error;
740
741 assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR);
742 CREATECMD_NOPARAM(c, "ROLLBACK");
743 error = simplecmd(xc, c);
744 assert(error == 0);
745 }
746 DPRINTF("xc %p rollback %p\n", xc, xc->owner);
747 setlabel(xc, NULL);
748 relxc(xc);
749 }
750
751 int
752 commit(struct Xconn *xc)
753 {
754 static struct cmd *c;
755 int error;
756
757 CREATECMD_NOPARAM(c, "COMMIT");
758 error = simplecmd(xc, c);
759 setlabel(xc, NULL);
760 if (error == 0) {
761 DPRINTF("xc %p commit %p\n", xc, xc->owner);
762 relxc(xc);
763 }
764 return error;
765 }
766
767 int
768 commit_sync(struct Xconn *xc)
769 {
770 static struct cmd *c;
771 int error;
772
773 assert(!pgfs_dosync);
774 CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON");
775 error = simplecmd(xc, c);
776 assert(error == 0);
777 return commit(xc);
778 }
779
780 static void
781 pgfs_notice_receiver(void *vp, const PGresult *res)
782 {
783 struct Xconn *xc = vp;
784
785 assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR);
786 fprintf(stderr, "got a notice on %p\n", xc);
787 dumperror(xc, res);
788 }
789
790 static int
791 pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf,
792 int fd, int *done)
793 {
794 struct Xconn *xc;
795 PGconn *conn;
796
797 TAILQ_FOREACH(xc, &xclist, list) {
798 if (PQsocket(xc->conn) == fd) {
799 break;
800 }
801 }
802 assert(xc != NULL);
803 conn = xc->conn;
804 PQconsumeInput(conn);
805 if (!PQisBusy(conn)) {
806 if (xc->blocker != NULL) {
807 DPRINTF("schedule %p\n", xc->blocker);
808 puffs_cc_schedule(xc->blocker);
809 } else {
810 DPRINTF("no blockers\n");
811 }
812 }
813 *done = 0;
814 return 0;
815 }
816
817 int
818 pgfs_connectdb(struct puffs_usermount *pu, const char *dbname,
819 const char *dbuser, bool debug, bool synchronous, unsigned int nconn)
820 {
821 const char *keywords[3+1];
822 const char *values[3];
823 unsigned int i;
824
825 if (nconn > 32) {
826 /*
827 * limit from sizeof(cmd->prepared_mask)
828 */
829 return EINVAL;
830 }
831 if (debug) {
832 pgfs_dodprintf = true;
833 }
834 if (synchronous) {
835 pgfs_dosync = true;
836 }
837 i = 0;
838 if (dbname != NULL) {
839 keywords[i] = "dbname";
840 values[i] = dbname;
841 i++;
842 }
843 if (dbuser != NULL) {
844 keywords[i] = "user";
845 values[i] = dbuser;
846 i++;
847 }
848 keywords[i] = "application_name";
849 values[i] = "pgfs";
850 i++;
851 keywords[i] = NULL;
852 puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL);
853 for (i = 0; i < nconn; i++) {
854 struct Xconn *xc;
855 struct Xconn *xc2;
856 static int xcid;
857 PGconn *conn;
858 struct cmd *c;
859 int error;
860
861 conn = PQconnectdbParams(keywords, values, 0);
862 if (conn == NULL) {
863 errx(EXIT_FAILURE,
864 "PQconnectdbParams: unknown failure");
865 }
866 if (PQstatus(conn) != CONNECTION_OK) {
867 /*
868 * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW
869 */
870 errx(EXIT_FAILURE, "PQconnectdbParams: %s",
871 PQerrorMessage(conn));
872 }
873 DPRINTF("protocol version %d\n", PQprotocolVersion(conn));
874 puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ);
875 xc = emalloc(sizeof(*xc));
876 xc->conn = conn;
877 xc->blocker = NULL;
878 xc->owner = NULL;
879 xc->in_trans = false;
880 xc->id = xcid++;
881 assert(xc->id < 32);
882 PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc);
883 TAILQ_INSERT_HEAD(&xclist, xc, list);
884 xc2 = begin(pu, NULL);
885 assert(xc2 == xc);
886 c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE);
887 error = simplecmd(xc, c);
888 assert(error == 0);
889 freecmd(c);
890 c = createcmd("SET SESSION CHARACTERISTICS AS "
891 "TRANSACTION ISOLATION LEVEL REPEATABLE READ",
892 CMD_NOPREPARE);
893 error = simplecmd(xc, c);
894 assert(error == 0);
895 freecmd(c);
896 c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE);
897 error = simplecmd(xc, c);
898 assert(error == 0);
899 freecmd(c);
900 if (!pgfs_dosync) {
901 c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF",
902 CMD_NOPREPARE);
903 error = simplecmd(xc, c);
904 assert(error == 0);
905 freecmd(c);
906 }
907 if (debug) {
908 struct fetchstatus s;
909 static const Oid types[] = { INT8OID, };
910 uint64_t pid;
911
912 c = createcmd("SELECT pg_backend_pid()::int8;",
913 CMD_NOPREPARE);
914 error = sendcmd(xc, c);
915 assert(error == 0);
916 fetchinit(&s, xc);
917 error = FETCHNEXT(&s, types, &pid);
918 fetchdone(&s);
919 assert(error == 0);
920 DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid);
921 }
922 error = commit(xc);
923 assert(error == 0);
924 assert(xc->owner == NULL);
925 }
926 /*
927 * XXX cleanup unlinked files here? what to do when the filesystem
928 * is shared?
929 */
930 return 0;
931 }
932
/*
 * serialization of flush_xacts(): flusher is the continuation currently
 * flushing, or NULL; flushwaitq holds continuations waiting for it to
 * finish.
 */
struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq);
struct puffs_cc *flusher = NULL;
935
936 int
937 flush_xacts(struct puffs_usermount *pu)
938 {
939 struct puffs_cc *cc = puffs_cc_getcc(pu);
940 struct Xconn *xc;
941 static struct cmd *c;
942 uint64_t dummy;
943 int error;
944
945 /*
946 * flush all previously issued asynchronous transactions.
947 *
948 * XXX
949 * unfortunately it seems that there is no clean way to tell
950 * PostgreSQL flush XLOG. we could perform a CHECKPOINT but it's
951 * too expensive and overkill for our purpose.
952 * besides, PostgreSQL has an optimization to skip XLOG flushing
953 * for transactions which didn't produce WAL records.
954 * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2)
955 * it means that an empty transaction ("BEGIN; COMMIT;"), which
956 * doesn't produce any WAL records, doesn't flush the XLOG even if
957 * synchronous_commit=on. we issues a dummy setval() to avoid the
958 * optimization.
959 * on the other hand, we try to avoid creating unnecessary WAL activity
960 * by serializing flushing and checking XLOG locations.
961 */
962
963 assert(!pgfs_dosync);
964 if (flusher != NULL) { /* serialize flushers */
965 DPRINTF("%p flush in progress %p\n", cc, flusher);
966 waiton(&flushwaitq, cc);
967 assert(flusher == NULL);
968 }
969 DPRINTF("%p start flushing\n", cc);
970 flusher = cc;
971 retry:
972 xc = begin(pu, "flush");
973 CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE "
974 "pg_current_xlog_insert_location() <> pg_current_xlog_location()");
975 error = sendcmd(xc, c);
976 if (error != 0) {
977 goto got_error;
978 }
979 error = simplefetch(xc, INT8OID, &dummy);
980 assert(error != 0 || dummy == 1);
981 if (error == ENOENT) {
982 /*
983 * there seems to be nothing to flush.
984 */
985 DPRINTF("%p no sync\n", cc);
986 error = 0;
987 }
988 if (error != 0) {
989 goto got_error;
990 }
991 error = commit_sync(xc);
992 if (error != 0) {
993 goto got_error;
994 }
995 goto done;
996 got_error:
997 rollback(xc);
998 if (error == EAGAIN) {
999 goto retry;
1000 }
1001 done:
1002 assert(flusher == cc);
1003 flusher = NULL;
1004 wakeup_one(&flushwaitq);
1005 DPRINTF("%p end flushing error=%d\n", cc, error);
1006 return error;
1007 }
1008