Home | History | Annotate | Line # | Download | only in pgfs
      1 /*	$NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $	*/
      2 
      3 /*-
      4  * Copyright (c)2010,2011 YAMAMOTO Takashi,
      5  * All rights reserved.
      6  *
      7  * Redistribution and use in source and binary forms, with or without
      8  * modification, are permitted provided that the following conditions
      9  * are met:
     10  * 1. Redistributions of source code must retain the above copyright
     11  *    notice, this list of conditions and the following disclaimer.
     12  * 2. Redistributions in binary form must reproduce the above copyright
     13  *    notice, this list of conditions and the following disclaimer in the
     14  *    documentation and/or other materials provided with the distribution.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
     17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
     22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     26  * SUCH DAMAGE.
     27  */
     28 
     29 /*
     30  * backend db operations
     31  */
     32 
     33 #include <sys/cdefs.h>
     34 #ifndef lint
     35 __RCSID("$NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $");
     36 #endif /* not lint */
     37 
     38 #include <assert.h>
     39 #include <err.h>
     40 #include <errno.h>
     41 #include <inttypes.h>
     42 #include <puffs.h>
     43 #include <stdbool.h>
     44 #include <stdarg.h>
     45 #include <stdio.h>
     46 #include <stdlib.h>
     47 #include <util.h>
     48 
     49 #include <libpq-fe.h>
     50 
     51 #include "pgfs_db.h"
     52 #include "pgfs_waitq.h"
     53 #include "pgfs_debug.h"
     54 
/*
 * set by pgfs_connectdb when the "synchronous" option is given; when
 * true, sessions are not switched to synchronous_commit=off and
 * commit_sync/flush_xacts must not be used.
 */
bool pgfs_dosync = false;

/*
 * Xconn: one backend connection, kept on xclist.
 */
struct Xconn {
	TAILQ_ENTRY(Xconn) list;	/* linkage on xclist */
	PGconn *conn;			/* libpq connection */
	struct puffs_cc *blocker;	/* cc yielded in pqwait on this conn, or NULL */
	struct puffs_cc *owner;		/* cc that acquired it via getxc, or NULL */
	bool in_trans;			/* set by begin*, cleared by relxc */
	int id;				/* bit index into cmd->prepared_mask (< 32) */
	const char *label;		/* last label applied by setlabel (pointer-compared) */
};
     66 
     67 static void
     68 dumperror(struct Xconn *xc, const PGresult *res)
     69 {
     70 	static const struct {
     71 		const char *name;
     72 		int code;
     73 	} fields[] = {
     74 #define F(x)	{ .name = #x, .code = x, }
     75 		F(PG_DIAG_SEVERITY),
     76 		F(PG_DIAG_SQLSTATE),
     77 		F(PG_DIAG_MESSAGE_PRIMARY),
     78 		F(PG_DIAG_MESSAGE_DETAIL),
     79 		F(PG_DIAG_MESSAGE_HINT),
     80 		F(PG_DIAG_STATEMENT_POSITION),
     81 		F(PG_DIAG_INTERNAL_POSITION),
     82 		F(PG_DIAG_INTERNAL_QUERY),
     83 		F(PG_DIAG_CONTEXT),
     84 		F(PG_DIAG_SOURCE_FILE),
     85 		F(PG_DIAG_SOURCE_LINE),
     86 		F(PG_DIAG_SOURCE_FUNCTION),
     87 #undef F
     88 	};
     89 	unsigned int i;
     90 
     91 	if (!pgfs_dodprintf) {
     92 		return;
     93 	}
     94 	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR ||
     95 	    PQresultStatus(res) == PGRES_FATAL_ERROR);
     96 	for (i = 0; i < __arraycount(fields); i++) {
     97 		const char *val = PQresultErrorField(res, fields[i].code);
     98 
     99 		if (val == NULL) {
    100 			continue;
    101 		}
    102 		fprintf(stderr, "%s: %s\n", fields[i].name, val);
    103 	}
    104 }
    105 
/* all backend connections, added by pgfs_connectdb and searched by getxc */
TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist);
/* ccs sleeping in getxc until relxc frees a connection */
struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq);
    108 
    109 static struct Xconn *
    110 getxc(struct puffs_cc *cc)
    111 {
    112 	struct Xconn *xc;
    113 
    114 	assert(cc != NULL);
    115 retry:
    116 	TAILQ_FOREACH(xc, &xclist, list) {
    117 		if (xc->blocker == NULL) {
    118 			assert(xc->owner == NULL);
    119 			xc->owner = cc;
    120 			DPRINTF("xc %p acquire %p\n", xc, cc);
    121 			return xc;
    122 		} else {
    123 			assert(xc->owner == xc->blocker);
    124 		}
    125 	}
    126 	DPRINTF("no free conn %p\n", cc);
    127 	waiton(&xcwaitq, cc);
    128 	goto retry;
    129 }
    130 
    131 static void
    132 relxc(struct Xconn *xc)
    133 {
    134 
    135 	assert(xc->in_trans);
    136 	assert(xc->owner != NULL);
    137 	xc->in_trans = false;
    138 	xc->owner = NULL;
    139 	wakeup_one(&xcwaitq);
    140 }
    141 
    142 static void
    143 pqwait(struct Xconn *xc)
    144 {
    145 	PGconn *conn = xc->conn;
    146 	struct puffs_cc *cc = xc->owner;
    147 
    148 	if (PQflush(conn)) {
    149 		errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn));
    150 	}
    151 	if (!PQisBusy(conn)) {
    152 		return;
    153 	}
    154 	assert(xc->blocker == NULL);
    155 	xc->blocker = cc;
    156 	DPRINTF("yielding %p\n", cc);
    157 	/* XXX is it safe to yield before entering mainloop? */
    158 	puffs_cc_yield(cc);
    159 	DPRINTF("yield returned %p\n", cc);
    160 	assert(xc->owner == cc);
    161 	assert(xc->blocker == cc);
    162 	xc->blocker = NULL;
    163 }
    164 
/*
 * sqltoerrno: map a 5-character SQLSTATE code to an errno value.
 *
 * returns 0 for successful completion, the mapped errno for known codes,
 * and EIO for unknown codes or when sqlstate is NULL.  aborts on a check
 * violation, which indicates a bug.
 */
static int
sqltoerrno(const char *sqlstate)
{
	/*
	 * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle
	 * "tuple concurrently updated" errors for lowrite/lo_truncate.
	 *
	 * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN?
	 */
	static const struct {
		char sqlstate[5];	/* not NUL-terminated */
		int error;
	} map[] = {
		{ "00000", 0, },	/* ERRCODE_SUCCESSFUL_COMPLETION */
		{ "02000", ENOENT, },	/* ERRCODE_NO_DATA */
		{ "23505", EEXIST, },	/* ERRCODE_UNIQUE_VIOLATION */
		{ "23514", EINVAL, },	/* ERRCODE_CHECK_VIOLATION */
		{ "40001", EAGAIN, },	/* ERRCODE_T_R_SERIALIZATION_FAILURE */
		{ "40P01", EAGAIN, },	/* ERRCODE_T_R_DEADLOCK_DETECTED */
		{ "42704", ENOENT, },	/* ERRCODE_UNDEFINED_OBJECT */
		{ "53100", ENOSPC, },	/* ERRCODE_DISK_FULL */
		{ "53200", ENOMEM, },	/* ERRCODE_OUT_OF_MEMORY */
		{ "XX000", EAGAIN, },	/* ERRCODE_INTERNAL_ERROR */
	};
	unsigned int i;

	if (sqlstate == NULL) {
		/*
		 * PQresultErrorField returns NULL when the result has no
		 * SQLSTATE, which is typical for errors generated inside
		 * libpq itself (e.g. a lost connection).  don't hand NULL to
		 * the comparison below.
		 */
		DPRINTF("no sqlstate mapped to EIO\n");
		return EIO;
	}
	for (i = 0; i < __arraycount(map); i++) {
		/*
		 * strncmp bounded by the table entry size: unlike memcmp it
		 * stops at the NUL of a sqlstate shorter than 5 characters
		 * instead of reading past its end.
		 */
		if (strncmp(map[i].sqlstate, sqlstate,
		    sizeof(map[i].sqlstate)) == 0) {
			const int error = map[i].error;

			if (error != 0) {
				DPRINTF("sqlstate %5s mapped to error %d\n",
				    sqlstate, error);
			}
			if (error == EINVAL) {
				/*
				 * sounds like a bug.
				 */
				abort();
			}
			return error;
		}
	}
	DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate);
	return EIO;
}
    211 
/*
 * cmd: a query created by createcmd, prepared lazily on each connection
 * the first time it is sent there (see preparecmd).
 */
struct cmd {
	char name[32];		/* name of prepared statement */
	char *cmd;		/* query string */
	unsigned int nparams;	/* number of '$' characters counted in cmd */
	Oid *paramtypes;	/* nparams parameter type oids */
	uint32_t prepared_mask;	/* for which connections this is prepared? (bit 1 << xc->id) */
	unsigned int flags;	/* CMD_ flags */
};

/* don't prepare this command; vsendcmd uses PQsendQueryParams for it */
#define	CMD_NOPREPARE	1	/* don't prepare this command */
    222 
    223 struct cmd *
    224 createcmd(const char *cmd, unsigned int flags, ...)
    225 {
    226 	struct cmd *c;
    227 	va_list ap;
    228 	const char *cp;
    229 	unsigned int i;
    230 	static unsigned int cmdid;
    231 
    232 	c = emalloc(sizeof(*c));
    233 	c->cmd = estrdup(cmd);
    234 	c->nparams = 0;
    235 	va_start(ap, flags);
    236 	for (cp = cmd; *cp != 0; cp++) {
    237 		if (*cp == '$') { /* XXX */
    238 			c->nparams++;
    239 		}
    240 	}
    241 	c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes));
    242 	for (i = 0; i < c->nparams; i++) {
    243 		Oid type = va_arg(ap, Oid);
    244 		assert(type == BYTEA ||
    245 		    type == INT4OID || type == INT8OID || type == OIDOID ||
    246 		    type == TEXTOID || type == TIMESTAMPTZOID);
    247 		c->paramtypes[i] = type;
    248 	}
    249 	va_end(ap);
    250 	snprintf(c->name, sizeof(c->name), "%u", cmdid++);
    251 	if ((flags & CMD_NOPREPARE) != 0) {
    252 		c->prepared_mask = ~0;
    253 	} else {
    254 		c->prepared_mask = 0;
    255 	}
    256 	c->flags = flags;
    257 	return c;
    258 }
    259 
    260 static void
    261 freecmd(struct cmd *c)
    262 {
    263 
    264 	free(c->paramtypes);
    265 	free(c->cmd);
    266 	free(c);
    267 }
    268 
    269 static int
    270 fetch_noresult(struct Xconn *xc)
    271 {
    272 	PGresult *res;
    273 	ExecStatusType status;
    274 	PGconn *conn = xc->conn;
    275 	int error;
    276 
    277 	pqwait(xc);
    278 	res = PQgetResult(conn);
    279 	if (res == NULL) {
    280 		return ENOENT;
    281 	}
    282 	status = PQresultStatus(res);
    283 	if (status == PGRES_COMMAND_OK) {
    284 		assert(PQnfields(res) == 0);
    285 		assert(PQntuples(res) == 0);
    286 		if (!strcmp(PQcmdTuples(res), "0")) {
    287 			error = ENOENT;
    288 		} else {
    289 			error = 0;
    290 		}
    291 	} else if (status == PGRES_FATAL_ERROR) {
    292 		error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE));
    293 		assert(error != 0);
    294 		dumperror(xc, res);
    295 	} else {
    296 		errx(1, "%s not command_ok: %d: %s", __func__,
    297 		    (int)status,
    298 		    PQerrorMessage(conn));
    299 	}
    300 	PQclear(res);
    301 	res = PQgetResult(conn);
    302 	assert(res == NULL);
    303 	if (error != 0) {
    304 		DPRINTF("error %d\n", error);
    305 	}
    306 	return error;
    307 }
    308 
    309 static int
    310 preparecmd(struct Xconn *xc, struct cmd *c)
    311 {
    312 	PGconn *conn = xc->conn;
    313 	const uint32_t mask = 1 << xc->id;
    314 	int error;
    315 	int ret;
    316 
    317 	if ((c->prepared_mask & mask) != 0) {
    318 		return 0;
    319 	}
    320 	assert((c->flags & CMD_NOPREPARE) == 0);
    321 	DPRINTF("PREPARE: '%s'\n", c->cmd);
    322 	ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes);
    323 	if (!ret) {
    324 		errx(EXIT_FAILURE, "PQsendPrepare: %s",
    325 		    PQerrorMessage(conn));
    326 	}
    327 	error = fetch_noresult(xc);
    328 	if (error != 0) {
    329 		return error;
    330 	}
    331 	c->prepared_mask |= mask;
    332 	return 0;
    333 }
    334 
    335 /*
    336  * vsendcmd:
    337  *
    338  * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared.
    339  * 0 for text and 1 for binary.
    340  */
    341 
    342 static int
    343 vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap)
    344 {
    345 	PGconn *conn = xc->conn;
    346 	char **paramvalues;
    347 	int *paramlengths;
    348 	int *paramformats;
    349 	unsigned int i;
    350 	int error;
    351 	int ret;
    352 
    353 	assert(xc->owner != NULL);
    354 	assert(xc->blocker == NULL);
    355 	error = preparecmd(xc, c);
    356 	if (error != 0) {
    357 		return error;
    358 	}
    359 	paramvalues = emalloc(c->nparams * sizeof(*paramvalues));
    360 	paramlengths = NULL;
    361 	paramformats = NULL;
    362 	DPRINTF("CMD: '%s'\n", c->cmd);
    363 	for (i = 0; i < c->nparams; i++) {
    364 		Oid type = c->paramtypes[i];
    365 		char tmpstore[1024];
    366 		const char *buf = NULL;
    367 		intmax_t v = 0; /* XXXgcc */
    368 		int sz;
    369 		bool binary = false;
    370 
    371 		switch (type) {
    372 		case BYTEA:
    373 			buf = va_arg(ap, const void *);
    374 			sz = (int)va_arg(ap, size_t);
    375 			binary = true;
    376 			break;
    377 		case INT8OID:
    378 		case OIDOID:
    379 		case INT4OID:
    380 			switch (type) {
    381 			case INT8OID:
    382 				v = (intmax_t)va_arg(ap, int64_t);
    383 				break;
    384 			case OIDOID:
    385 				v = (intmax_t)va_arg(ap, Oid);
    386 				break;
    387 			case INT4OID:
    388 				v = (intmax_t)va_arg(ap, int32_t);
    389 				break;
    390 			default:
    391 				errx(EXIT_FAILURE, "unknown integer oid %u",
    392 				    type);
    393 			}
    394 			buf = tmpstore;
    395 			sz = snprintf(tmpstore, sizeof(tmpstore),
    396 			    "%jd", v);
    397 			assert(sz != -1);
    398 			assert((size_t)sz < sizeof(tmpstore));
    399 			sz += 1;
    400 			break;
    401 		case TEXTOID:
    402 		case TIMESTAMPTZOID:
    403 			buf = va_arg(ap, char *);
    404 			sz = strlen(buf) + 1;
    405 			break;
    406 		default:
    407 			errx(EXIT_FAILURE, "%s: unknown param type %u",
    408 			    __func__, type);
    409 		}
    410 		if (binary) {
    411 			if (paramlengths == NULL) {
    412 				paramlengths =
    413 				    emalloc(c->nparams * sizeof(*paramformats));
    414 			}
    415 			if (paramformats == NULL) {
    416 				paramformats = ecalloc(1,
    417 				    c->nparams * sizeof(*paramformats));
    418 			}
    419 			paramformats[i] = 1;
    420 			paramlengths[i] = sz;
    421 		}
    422 		paramvalues[i] = emalloc(sz);
    423 		memcpy(paramvalues[i], buf, sz);
    424 		if (binary) {
    425 			DPRINTF("\t[%u]=<BINARY>\n", i);
    426 		} else {
    427 			DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]);
    428 		}
    429 	}
    430 	if ((c->flags & CMD_NOPREPARE) != 0) {
    431 		ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes,
    432 		    (const char * const *)paramvalues, paramlengths,
    433 		    paramformats, resultmode);
    434 	} else {
    435 		ret = PQsendQueryPrepared(conn, c->name, c->nparams,
    436 		    (const char * const *)paramvalues, paramlengths,
    437 		    paramformats, resultmode);
    438 	}
    439 	for (i = 0; i < c->nparams; i++) {
    440 		free(paramvalues[i]);
    441 	}
    442 	free(paramvalues);
    443 	free(paramlengths);
    444 	free(paramformats);
    445 	if (!ret) {
    446 		errx(EXIT_FAILURE, "PQsendQueryPrepared: %s",
    447 		    PQerrorMessage(conn));
    448 	}
    449 	return 0;
    450 }
    451 
/*
 * sendcmd: send c with text-format results; parameters follow c as
 * described for vsendcmd.  returns vsendcmd's result.
 */
int
sendcmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);
	return rv;
}
    463 
/*
 * sendcmdx: like sendcmd, but with an explicit result format
 * (0 = text, 1 = binary) passed through to libpq.
 */
int
sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, resultmode, c, args);
	va_end(args);
	return rv;
}
    475 
/*
 * simplecmd: send a command that returns no rows and wait for its
 * completion.  returns the send error if sending failed, otherwise the
 * result of fetch_noresult (0, ENOENT, or a mapped errno).
 */

int
simplecmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);
	return rv != 0 ? rv : fetch_noresult(xc);
}
    495 
    496 void
    497 fetchinit(struct fetchstatus *s, struct Xconn *xc)
    498 {
    499 	s->xc = xc;
    500 	s->res = NULL;
    501 	s->cur = 0;
    502 	s->nrows = 0;
    503 	s->done = false;
    504 }
    505 
/*
 * getint: parse a decimal integer column value.  the whole non-empty
 * string must be consumed and fit in intmax_t (checked by assertions).
 */
static intmax_t
getint(const char *str)
{
	char *end;
	intmax_t val;

	errno = 0;
	val = strtoimax(str, &end, 10);
	assert(errno == 0);
	assert(*str != '\0');
	assert(*end == '\0');
	return val;
}
    519 
/*
 * vfetchnext: store the next row of the current query's results into the
 * caller's variables.
 *
 * n is the number of columns and types[] their expected type oids.  for
 * each column, destinations are taken from ap in order:
 *	INT8OID		int64_t *
 *	OIDOID		Oid *
 *	INT4OID		int32_t *
 *	TEXTOID		char ** (receives an estrdup'ed copy)
 *	BYTEA		void * (buffer; its size is not checked) followed
 *			by size_t * (receives the value length)
 *
 * when s->res is NULL the next PGresult is waited for and fetched.
 * returns 0 when a row was stored; ENOENT when there are no more results
 * or the result has no rows; or an errno mapped from the SQLSTATE of a
 * fatal error.  on the error and no-rows returns s->res is left set for
 * fetchdone to release.
 */
static int
vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap)
{
	PGconn *conn = s->xc->conn;
	unsigned int i;

	assert(conn != NULL);
	if (s->res == NULL) {
		ExecStatusType status;
		int error;

		pqwait(s->xc);
		s->res = PQgetResult(conn);
		if (s->res == NULL) {
			/* results exhausted; fetchdone has nothing to drain */
			s->done = true;
			return ENOENT;
		}
		status = PQresultStatus(s->res);
		if (status == PGRES_FATAL_ERROR) {
			error = sqltoerrno(
			    PQresultErrorField(s->res, PG_DIAG_SQLSTATE));
			assert(error != 0);
			dumperror(s->xc, s->res);
			return error;
		}
		if (status != PGRES_TUPLES_OK) {
			errx(1, "not tuples_ok: %s",
			    PQerrorMessage(conn));
		}
		assert((unsigned int)PQnfields(s->res) == n);
		s->nrows = PQntuples(s->res);
		if (s->nrows == 0) {
			DPRINTF("no rows\n");
			return ENOENT;
		}
		assert(s->nrows >= 1);
		s->cur = 0;
	}
	for (i = 0; i < n; i++) {
		size_t size;

		/* BYTEA columns are expected in binary format, others in text */
		assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0));
		DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n",
		    i, PQftype(s->res, i), types[i],
		    PQgetisnull(s->res, s->cur, i) ? "<NULL>" :
		    PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) :
		    "<BINARY>");
		assert(PQftype(s->res, i) == types[i]);
		assert(!PQgetisnull(s->res, s->cur, i));
		/* each case consumes exactly its destination(s) from ap */
		switch(types[i]) {
		case INT8OID:
			*va_arg(ap, int64_t *) =
			    getint(PQgetvalue(s->res, s->cur, i));
			break;
		case OIDOID:
			*va_arg(ap, Oid *) =
			    getint(PQgetvalue(s->res, s->cur, i));
			break;
		case INT4OID:
			*va_arg(ap, int32_t *) =
			    getint(PQgetvalue(s->res, s->cur, i));
			break;
		case TEXTOID:
			*va_arg(ap, char **) =
			    estrdup(PQgetvalue(s->res, s->cur, i));
			break;
		case BYTEA:
			size = PQgetlength(s->res, s->cur, i);
			memcpy(va_arg(ap, void *),
			    PQgetvalue(s->res, s->cur, i), size);
			*va_arg(ap, size_t *) = size;
			break;
		default:
			errx(EXIT_FAILURE, "%s unknown type %u", __func__,
			    types[i]);
		}
	}
	s->cur++;
	/* last row consumed: drop the result so the next call fetches anew */
	if (s->cur == s->nrows) {
		PQclear(s->res);
		s->res = NULL;
	}
	return 0;
}
    604 
    605 int
    606 fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...)
    607 {
    608 	va_list ap;
    609 	int error;
    610 
    611 	va_start(ap, types);
    612 	error = vfetchnext(s, n, types, ap);
    613 	va_end(ap);
    614 	return error;
    615 }
    616 
    617 void
    618 fetchdone(struct fetchstatus *s)
    619 {
    620 
    621 	if (s->res != NULL) {
    622 		PQclear(s->res);
    623 		s->res = NULL;
    624 	}
    625 	if (!s->done) {
    626 		PGresult *res;
    627 		unsigned int n;
    628 
    629 		n = 0;
    630 		while ((res = PQgetResult(s->xc->conn)) != NULL) {
    631 			PQclear(res);
    632 			n++;
    633 		}
    634 		if (n > 0) {
    635 			DPRINTF("%u rows dropped\n", n);
    636 		}
    637 	}
    638 }
    639 
    640 int
    641 simplefetch(struct Xconn *xc, Oid type, ...)
    642 {
    643 	struct fetchstatus s;
    644 	va_list ap;
    645 	int error;
    646 
    647 	fetchinit(&s, xc);
    648 	va_start(ap, type);
    649 	error = vfetchnext(&s, 1, &type, ap);
    650 	va_end(ap);
    651 	assert(error != 0 || s.res == NULL);
    652 	fetchdone(&s);
    653 	return error;
    654 }
    655 
    656 /*
    657  * setlabel: set the descriptive label for the connection.
    658  *
    659  * we use simple pointer comparison for label equality check.
    660  */
    661 static void
    662 setlabel(struct Xconn *xc, const char *label)
    663 {
    664 	int error;
    665 
    666 	/*
    667 	 * put the label into application_name so that it's shown in
    668 	 * pg_stat_activity.  we are sure that our labels don't need
    669 	 * PQescapeStringConn.
    670 	 *
    671 	 * example:
    672 	 *	SELECT pid,application_name,query FROM pg_stat_activity
    673 	 *	WHERE state <> 'idle'
    674 	 */
    675 
    676 	if (label != NULL && label != xc->label) {
    677 		struct cmd *c;
    678 		char cmd_str[1024];
    679 
    680 		snprintf(cmd_str, sizeof(cmd_str),
    681 		    "SET application_name TO 'pgfs: %s'", label);
    682 		c = createcmd(cmd_str, CMD_NOPREPARE);
    683 		error = simplecmd(xc, c);
    684 		freecmd(c);
    685 		assert(error == 0);
    686 		xc->label = label;
    687 	} else {
    688 #if 0 /* don't bother to clear label */
    689 		static struct cmd *c;
    690 
    691 		CREATECMD_NOPARAM(c, "SET application_name TO 'pgfs'");
    692 		error = simplecmd(xc, c);
    693 		assert(error == 0);
    694 #endif
    695 	}
    696 }
    697 
    698 struct Xconn *
    699 begin(struct puffs_usermount *pu, const char *label)
    700 {
    701 	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
    702 	static struct cmd *c;
    703 	int error;
    704 
    705 	setlabel(xc, label);
    706 	CREATECMD_NOPARAM(c, "BEGIN");
    707 	assert(!xc->in_trans);
    708 	error = simplecmd(xc, c);
    709 	assert(error == 0);
    710 	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
    711 	xc->in_trans = true;
    712 	return xc;
    713 }
    714 
    715 struct Xconn *
    716 begin_readonly(struct puffs_usermount *pu, const char *label)
    717 {
    718 	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
    719 	static struct cmd *c;
    720 	int error;
    721 
    722 	setlabel(xc, label);
    723 	CREATECMD_NOPARAM(c, "BEGIN READ ONLY");
    724 	assert(!xc->in_trans);
    725 	error = simplecmd(xc, c);
    726 	assert(error == 0);
    727 	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
    728 	xc->in_trans = true;
    729 	return xc;
    730 }
    731 
    732 void
    733 rollback(struct Xconn *xc)
    734 {
    735 	PGTransactionStatusType status;
    736 
    737 	/*
    738 	 * check the status as we are not sure the status of our transaction
    739 	 * after a failed commit.
    740 	 */
    741 	status = PQtransactionStatus(xc->conn);
    742 	assert(status != PQTRANS_ACTIVE);
    743 	assert(status != PQTRANS_UNKNOWN);
    744 	if (status != PQTRANS_IDLE) {
    745 		static struct cmd *c;
    746 		int error;
    747 
    748 		assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR);
    749 		CREATECMD_NOPARAM(c, "ROLLBACK");
    750 		error = simplecmd(xc, c);
    751 		assert(error == 0);
    752 	}
    753 	DPRINTF("xc %p rollback %p\n", xc, xc->owner);
    754 	setlabel(xc, NULL);
    755 	relxc(xc);
    756 }
    757 
    758 int
    759 commit(struct Xconn *xc)
    760 {
    761 	static struct cmd *c;
    762 	int error;
    763 
    764 	CREATECMD_NOPARAM(c, "COMMIT");
    765 	error = simplecmd(xc, c);
    766 	setlabel(xc, NULL);
    767 	if (error == 0) {
    768 		DPRINTF("xc %p commit %p\n", xc, xc->owner);
    769 		relxc(xc);
    770 	}
    771 	return error;
    772 }
    773 
    774 int
    775 commit_sync(struct Xconn *xc)
    776 {
    777 	static struct cmd *c;
    778 	int error;
    779 
    780 	assert(!pgfs_dosync);
    781 	CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON");
    782 	error = simplecmd(xc, c);
    783 	assert(error == 0);
    784 	return commit(xc);
    785 }
    786 
    787 static void
    788 pgfs_notice_receiver(void *vp, const PGresult *res)
    789 {
    790 	struct Xconn *xc = vp;
    791 
    792 	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR);
    793 	fprintf(stderr, "got a notice on %p\n", xc);
    794 	dumperror(xc, res);
    795 }
    796 
    797 static int
    798 pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf,
    799     int fd, int *done)
    800 {
    801 	struct Xconn *xc;
    802 	PGconn *conn;
    803 
    804 	TAILQ_FOREACH(xc, &xclist, list) {
    805 		if (PQsocket(xc->conn) == fd) {
    806 			break;
    807 		}
    808 	}
    809 	assert(xc != NULL);
    810 	conn = xc->conn;
    811 	PQconsumeInput(conn);
    812 	if (!PQisBusy(conn)) {
    813 		if (xc->blocker != NULL) {
    814 			DPRINTF("schedule %p\n", xc->blocker);
    815 			puffs_cc_schedule(xc->blocker);
    816 		} else {
    817 			DPRINTF("no blockers\n");
    818 		}
    819 	}
    820 	*done = 0;
    821 	return 0;
    822 }
    823 
    824 int
    825 pgfs_connectdb(struct puffs_usermount *pu, const char *dbname,
    826     const char *dbuser, bool debug, bool synchronous, unsigned int nconn)
    827 {
    828 	const char *keywords[3+1];
    829 	const char *values[3];
    830 	unsigned int i;
    831 
    832 	if (nconn > 32) {
    833 		/*
    834 		 * limit from sizeof(cmd->prepared_mask)
    835 		 */
    836 		return EINVAL;
    837 	}
    838 	if (debug) {
    839 		pgfs_dodprintf = true;
    840 	}
    841 	if (synchronous) {
    842 		pgfs_dosync = true;
    843 	}
    844 	i = 0;
    845 	if (dbname != NULL) {
    846 		keywords[i] = "dbname";
    847 		values[i] = dbname;
    848 		i++;
    849 	}
    850 	if (dbuser != NULL) {
    851 		keywords[i] = "user";
    852 		values[i] = dbuser;
    853 		i++;
    854 	}
    855 	keywords[i] = "application_name";
    856 	values[i] = "pgfs";
    857 	i++;
    858 	keywords[i] = NULL;
    859 	puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL);
    860 	for (i = 0; i < nconn; i++) {
    861 		struct Xconn *xc;
    862 		struct Xconn *xc2;
    863 		static int xcid;
    864 		PGconn *conn;
    865 		struct cmd *c;
    866 		int error;
    867 
    868 		conn = PQconnectdbParams(keywords, values, 0);
    869 		if (conn == NULL) {
    870 			errx(EXIT_FAILURE,
    871 			    "PQconnectdbParams: unknown failure");
    872 		}
    873 		if (PQstatus(conn) != CONNECTION_OK) {
    874 			/*
    875 			 * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW
    876 			 */
    877 			errx(EXIT_FAILURE, "PQconnectdbParams: %s",
    878 			    PQerrorMessage(conn));
    879 		}
    880 		DPRINTF("protocol version %d\n", PQprotocolVersion(conn));
    881 		puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ);
    882 		xc = emalloc(sizeof(*xc));
    883 		xc->conn = conn;
    884 		xc->blocker = NULL;
    885 		xc->owner = NULL;
    886 		xc->in_trans = false;
    887 		xc->id = xcid++;
    888 		xc->label = NULL;
    889 		assert(xc->id < 32);
    890 		PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc);
    891 		TAILQ_INSERT_HEAD(&xclist, xc, list);
    892 		xc2 = begin(pu, NULL);
    893 		assert(xc2 == xc);
    894 		c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE);
    895 		error = simplecmd(xc, c);
    896 		assert(error == 0);
    897 		freecmd(c);
    898 		c = createcmd("SET SESSION CHARACTERISTICS AS "
    899 		    "TRANSACTION ISOLATION LEVEL REPEATABLE READ",
    900 		    CMD_NOPREPARE);
    901 		error = simplecmd(xc, c);
    902 		assert(error == 0);
    903 		freecmd(c);
    904 		c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE);
    905 		error = simplecmd(xc, c);
    906 		assert(error == 0);
    907 		freecmd(c);
    908 		if (!pgfs_dosync) {
    909 			c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF",
    910 			    CMD_NOPREPARE);
    911 			error = simplecmd(xc, c);
    912 			assert(error == 0);
    913 			freecmd(c);
    914 		}
    915 		if (debug) {
    916 			struct fetchstatus s;
    917 			static const Oid types[] = { INT8OID, };
    918 			uint64_t pid;
    919 
    920 			c = createcmd("SELECT pg_backend_pid()::int8;",
    921 			    CMD_NOPREPARE);
    922 			error = sendcmd(xc, c);
    923 			assert(error == 0);
    924 			fetchinit(&s, xc);
    925 			error = FETCHNEXT(&s, types, &pid);
    926 			fetchdone(&s);
    927 			assert(error == 0);
    928 			DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid);
    929 		}
    930 		error = commit(xc);
    931 		assert(error == 0);
    932 		assert(xc->owner == NULL);
    933 	}
    934 	/*
    935 	 * XXX cleanup unlinked files here?  what to do when the filesystem
    936 	 * is shared?
    937 	 */
    938 	return 0;
    939 }
    940 
/* ccs waiting in flush_xacts for the current flusher to finish */
struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq);
/* cc currently running flush_xacts, or NULL */
struct puffs_cc *flusher = NULL;
    943 
    944 int
    945 flush_xacts(struct puffs_usermount *pu)
    946 {
    947 	struct puffs_cc *cc = puffs_cc_getcc(pu);
    948 	struct Xconn *xc;
    949 	static struct cmd *c;
    950 	uint64_t dummy;
    951 	int error;
    952 
    953 	/*
    954 	 * flush all previously issued asynchronous transactions.
    955 	 *
    956 	 * XXX
    957 	 * unfortunately it seems that there is no clean way to tell
    958 	 * PostgreSQL flush XLOG.  we could perform a CHECKPOINT but it's
    959 	 * too expensive and overkill for our purpose.
    960 	 * besides, PostgreSQL has an optimization to skip XLOG flushing
    961 	 * for transactions which didn't produce WAL records.
    962 	 * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2)
    963 	 * it means that an empty transaction ("BEGIN; COMMIT;"), which
    964 	 * doesn't produce any WAL records, doesn't flush the XLOG even if
    965 	 * synchronous_commit=on.  we issues a dummy setval() to avoid the
    966 	 * optimization.
    967 	 * on the other hand, we try to avoid creating unnecessary WAL activity
    968 	 * by serializing flushing and checking XLOG locations.
    969 	 */
    970 
    971 	assert(!pgfs_dosync);
    972 	if (flusher != NULL) { /* serialize flushers */
    973 		DPRINTF("%p flush in progress %p\n", cc, flusher);
    974 		waiton(&flushwaitq, cc);
    975 		assert(flusher == NULL);
    976 	}
    977 	DPRINTF("%p start flushing\n", cc);
    978 	flusher = cc;
    979 retry:
    980 	xc = begin(pu, "flush");
    981 	CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE "
    982 	    "pg_current_xlog_insert_location() <> pg_current_xlog_location()");
    983 	error = sendcmd(xc, c);
    984 	if (error != 0) {
    985 		goto got_error;
    986 	}
    987 	error = simplefetch(xc, INT8OID, &dummy);
    988 	assert(error != 0 || dummy == 1);
    989 	if (error == ENOENT) {
    990 		/*
    991 		 * there seems to be nothing to flush.
    992 		 */
    993 		DPRINTF("%p no sync\n", cc);
    994 		error = 0;
    995 	}
    996 	if (error != 0) {
    997 		goto got_error;
    998 	}
    999 	error = commit_sync(xc);
   1000 	if (error != 0) {
   1001 		goto got_error;
   1002 	}
   1003 	goto done;
   1004 got_error:
   1005 	rollback(xc);
   1006 	if (error == EAGAIN) {
   1007 		goto retry;
   1008 	}
   1009 done:
   1010 	assert(flusher == cc);
   1011 	flusher = NULL;
   1012 	wakeup_one(&flushwaitq);
   1013 	DPRINTF("%p end flushing error=%d\n", cc, error);
   1014 	return error;
   1015 }
   1016