Home | History | Annotate | Line # | Download | only in pgfs
pgfs_db.c revision 1.2
      1 /*	$NetBSD: pgfs_db.c,v 1.2 2012/04/11 14:26:44 yamt Exp $	*/
      2 
      3 /*-
      4  * Copyright (c)2010,2011 YAMAMOTO Takashi,
      5  * All rights reserved.
      6  *
      7  * Redistribution and use in source and binary forms, with or without
      8  * modification, are permitted provided that the following conditions
      9  * are met:
     10  * 1. Redistributions of source code must retain the above copyright
     11  *    notice, this list of conditions and the following disclaimer.
     12  * 2. Redistributions in binary form must reproduce the above copyright
     13  *    notice, this list of conditions and the following disclaimer in the
     14  *    documentation and/or other materials provided with the distribution.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
     17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
     22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     26  * SUCH DAMAGE.
     27  */
     28 
     29 /*
     30  * backend db operations
     31  */
     32 
     33 #include <sys/cdefs.h>
     34 #ifndef lint
     35 __RCSID("$NetBSD: pgfs_db.c,v 1.2 2012/04/11 14:26:44 yamt Exp $");
     36 #endif /* not lint */
     37 
     38 #include <assert.h>
     39 #include <err.h>
     40 #include <errno.h>
     41 #include <inttypes.h>
     42 #include <puffs.h>
     43 #include <stdbool.h>
     44 #include <stdarg.h>
     45 #include <stdio.h>
     46 #include <stdlib.h>
     47 #include <util.h>
     48 
     49 #include <libpq-fe.h>
     50 
     51 #include "pgfs_db.h"
     52 #include "pgfs_waitq.h"
     53 #include "pgfs_debug.h"
     54 
/*
 * true when every commit is expected to be synchronous.  set by
 * pgfs_connectdb() from its "synchronous" argument.  when false, each
 * connection is switched to SYNCHRONOUS_COMMIT OFF at connect time and
 * commit_sync()/flush_xacts() (which assert !pgfs_dosync) may be used.
 */
bool pgfs_dosync = false;
     56 
/*
 * Xconn: one backend connection together with its ownership state.
 */
struct Xconn {
	TAILQ_ENTRY(Xconn) list;	/* linkage on xclist */
	PGconn *conn;			/* the libpq connection */
	struct puffs_cc *blocker;	/* set by pqwait() while the owner is
					   yielded waiting for a result */
	struct puffs_cc *owner;		/* set by getxc(), cleared by relxc() */
	bool in_trans;			/* set by begin()/begin_readonly(),
					   cleared by relxc() */
	int id;				/* bit index into cmd::prepared_mask */
};
     65 
     66 static void
     67 dumperror(struct Xconn *xc, const PGresult *res)
     68 {
     69 	static const struct {
     70 		const char *name;
     71 		int code;
     72 	} fields[] = {
     73 #define F(x)	{ .name = #x, .code = x, }
     74 		F(PG_DIAG_SEVERITY),
     75 		F(PG_DIAG_SQLSTATE),
     76 		F(PG_DIAG_MESSAGE_PRIMARY),
     77 		F(PG_DIAG_MESSAGE_DETAIL),
     78 		F(PG_DIAG_MESSAGE_HINT),
     79 		F(PG_DIAG_STATEMENT_POSITION),
     80 		F(PG_DIAG_INTERNAL_POSITION),
     81 		F(PG_DIAG_INTERNAL_QUERY),
     82 		F(PG_DIAG_CONTEXT),
     83 		F(PG_DIAG_SOURCE_FILE),
     84 		F(PG_DIAG_SOURCE_LINE),
     85 		F(PG_DIAG_SOURCE_FUNCTION),
     86 #undef F
     87 	};
     88 	unsigned int i;
     89 
     90 	if (!pgfs_dodprintf) {
     91 		return;
     92 	}
     93 	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR ||
     94 	    PQresultStatus(res) == PGRES_FATAL_ERROR);
     95 	for (i = 0; i < __arraycount(fields); i++) {
     96 		const char *val = PQresultErrorField(res, fields[i].code);
     97 
     98 		if (val == NULL) {
     99 			continue;
    100 		}
    101 		fprintf(stderr, "%s: %s\n", fields[i].name, val);
    102 	}
    103 }
    104 
/* all backend connections; pgfs_connectdb() inserts each one at the head */
TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist);
/* coroutines waiting in getxc() for a connection; relxc() wakes one */
struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq);
    107 
    108 static struct Xconn *
    109 getxc(struct puffs_cc *cc)
    110 {
    111 	struct Xconn *xc;
    112 
    113 	assert(cc != NULL);
    114 retry:
    115 	TAILQ_FOREACH(xc, &xclist, list) {
    116 		if (xc->blocker == NULL) {
    117 			assert(xc->owner == NULL);
    118 			xc->owner = cc;
    119 			DPRINTF("xc %p acquire %p\n", xc, cc);
    120 			return xc;
    121 		} else {
    122 			assert(xc->owner == xc->blocker);
    123 		}
    124 	}
    125 	DPRINTF("no free conn %p\n", cc);
    126 	waiton(&xcwaitq, cc);
    127 	goto retry;
    128 }
    129 
    130 static void
    131 relxc(struct Xconn *xc)
    132 {
    133 
    134 	assert(xc->in_trans);
    135 	assert(xc->owner != NULL);
    136 	xc->in_trans = false;
    137 	xc->owner = NULL;
    138 	wakeup_one(&xcwaitq);
    139 }
    140 
    141 static void
    142 pqwait(struct Xconn *xc)
    143 {
    144 	PGconn *conn = xc->conn;
    145 	struct puffs_cc *cc = xc->owner;
    146 
    147 	if (PQflush(conn)) {
    148 		errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn));
    149 	}
    150 	if (!PQisBusy(conn)) {
    151 		return;
    152 	}
    153 	assert(xc->blocker == NULL);
    154 	xc->blocker = cc;
    155 	DPRINTF("yielding %p\n", cc);
    156 	/* XXX is it safe to yield before entering mainloop? */
    157 	puffs_cc_yield(cc);
    158 	DPRINTF("yield returned %p\n", cc);
    159 	assert(xc->owner == cc);
    160 	assert(xc->blocker == cc);
    161 	xc->blocker = NULL;
    162 }
    163 
/*
 * sqltoerrno: map a five character SQLSTATE code to an errno value.
 *
 * sqlstate is the PG_DIAG_SQLSTATE field of an error result.  it may be
 * NULL, as PQresultErrorField() returns NULL for a field the result does
 * not carry (e.g. errors generated inside libpq); that case, like any
 * unknown code, is mapped to EIO.
 *
 * a code mapped to EINVAL is considered a bug and aborts the process.
 */
static int
sqltoerrno(const char *sqlstate)
{
	/*
	 * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle
	 * "tuple concurrently updated" errors for lowrite/lo_truncate.
	 *
	 * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN?
	 */
	static const struct {
		char sqlstate[5];	/* exactly 5 chars, no terminator */
		int error;
	} map[] = {
		{ "00000", 0, },	/* ERRCODE_SUCCESSFUL_COMPLETION */
		{ "02000", ENOENT, },	/* ERRCODE_NO_DATA */
		{ "23505", EEXIST, },	/* ERRCODE_UNIQUE_VIOLATION */
		{ "23514", EINVAL, },	/* ERRCODE_CHECK_VIOLATION */
		{ "40001", EAGAIN, },	/* ERRCODE_T_R_SERIALIZATION_FAILURE */
		{ "40P01", EAGAIN, },	/* ERRCODE_T_R_DEADLOCK_DETECTED */
		{ "42704", ENOENT, },	/* ERRCODE_UNDEFINED_OBJECT */
		{ "53100", ENOSPC, },	/* ERRCODE_DISK_FULL */
		{ "53200", ENOMEM, },	/* ERRCODE_OUT_OF_MEMORY */
		{ "XX000", EAGAIN, },	/* ERRCODE_INTERNAL_ERROR */
	};
	unsigned int i;

	if (sqlstate == NULL) {
		/* never hand a NULL pointer to memcmp() below */
		DPRINTF("missing sqlstate mapped to EIO\n");
		return EIO;
	}
	for (i = 0; i < __arraycount(map); i++) {
		if (!memcmp(map[i].sqlstate, sqlstate, 5)) {
			const int error = map[i].error;

			if (error != 0) {
				DPRINTF("sqlstate %5s mapped to error %d\n",
				    sqlstate, error);
			}
			if (error == EINVAL) {
				/*
				 * sounds like a bug.
				 */
				abort();
			}
			return error;
		}
	}
	DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate);
	return EIO;
}
    210 
/*
 * cmd: an SQL command created by createcmd(), along with the state needed
 * to prepare it lazily on each connection.
 */
struct cmd {
	char name[32];		/* name of prepared statement */
	char *cmd;		/* query string */
	unsigned int nparams;	/* number of entries in paramtypes */
	Oid *paramtypes;	/* type of each parameter */
	uint32_t prepared_mask;	/* for which connections this is prepared? */
				/* (bit N corresponds to Xconn::id == N) */
	unsigned int flags;	/* CMD_ flags */
};

#define	CMD_NOPREPARE	1	/* don't prepare this command */
    221 
    222 struct cmd *
    223 createcmd(const char *cmd, unsigned int flags, ...)
    224 {
    225 	struct cmd *c;
    226 	va_list ap;
    227 	const char *cp;
    228 	unsigned int i;
    229 	static unsigned int cmdid;
    230 
    231 	c = emalloc(sizeof(*c));
    232 	c->cmd = estrdup(cmd);
    233 	c->nparams = 0;
    234 	va_start(ap, flags);
    235 	for (cp = cmd; *cp != 0; cp++) {
    236 		if (*cp == '$') { /* XXX */
    237 			c->nparams++;
    238 		}
    239 	}
    240 	c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes));
    241 	for (i = 0; i < c->nparams; i++) {
    242 		Oid type = va_arg(ap, Oid);
    243 		assert(type == BYTEA ||
    244 		    type == INT4OID || type == INT8OID || type == OIDOID ||
    245 		    type == TEXTOID || type == TIMESTAMPTZOID);
    246 		c->paramtypes[i] = type;
    247 	}
    248 	va_end(ap);
    249 	snprintf(c->name, sizeof(c->name), "%u", cmdid++);
    250 	if ((flags & CMD_NOPREPARE) != 0) {
    251 		c->prepared_mask = ~0;
    252 	} else {
    253 		c->prepared_mask = 0;
    254 	}
    255 	c->flags = flags;
    256 	return c;
    257 }
    258 
    259 static void
    260 freecmd(struct cmd *c)
    261 {
    262 
    263 	free(c->paramtypes);
    264 	free(c->cmd);
    265 	free(c);
    266 }
    267 
    268 static int
    269 fetch_noresult(struct Xconn *xc)
    270 {
    271 	PGresult *res;
    272 	ExecStatusType status;
    273 	PGconn *conn = xc->conn;
    274 	int error;
    275 
    276 	pqwait(xc);
    277 	res = PQgetResult(conn);
    278 	if (res == NULL) {
    279 		return ENOENT;
    280 	}
    281 	status = PQresultStatus(res);
    282 	if (status == PGRES_COMMAND_OK) {
    283 		assert(PQnfields(res) == 0);
    284 		assert(PQntuples(res) == 0);
    285 		if (!strcmp(PQcmdTuples(res), "0")) {
    286 			error = ENOENT;
    287 		} else {
    288 			error = 0;
    289 		}
    290 	} else if (status == PGRES_FATAL_ERROR) {
    291 		error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE));
    292 		assert(error != 0);
    293 		dumperror(xc, res);
    294 	} else {
    295 		errx(1, "%s not command_ok: %d: %s", __func__,
    296 		    (int)status,
    297 		    PQerrorMessage(conn));
    298 	}
    299 	PQclear(res);
    300 	res = PQgetResult(conn);
    301 	assert(res == NULL);
    302 	if (error != 0) {
    303 		DPRINTF("error %d\n", error);
    304 	}
    305 	return error;
    306 }
    307 
    308 static int
    309 preparecmd(struct Xconn *xc, struct cmd *c)
    310 {
    311 	PGconn *conn = xc->conn;
    312 	const uint32_t mask = 1 << xc->id;
    313 	int error;
    314 	int ret;
    315 
    316 	if ((c->prepared_mask & mask) != 0) {
    317 		return 0;
    318 	}
    319 	assert((c->flags & CMD_NOPREPARE) == 0);
    320 	DPRINTF("PREPARE: '%s'\n", c->cmd);
    321 	ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes);
    322 	if (!ret) {
    323 		errx(EXIT_FAILURE, "PQsendPrepare: %s",
    324 		    PQerrorMessage(conn));
    325 	}
    326 	error = fetch_noresult(xc);
    327 	if (error != 0) {
    328 		return error;
    329 	}
    330 	c->prepared_mask |= mask;
    331 	return 0;
    332 }
    333 
    334 /*
    335  * vsendcmd:
    336  *
    337  * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared.
    338  * 0 for text and 1 for binary.
    339  */
    340 
    341 static int
    342 vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap)
    343 {
    344 	PGconn *conn = xc->conn;
    345 	char **paramvalues;
    346 	int *paramlengths;
    347 	int *paramformats;
    348 	unsigned int i;
    349 	int error;
    350 	int ret;
    351 
    352 	assert(xc->owner != NULL);
    353 	assert(xc->blocker == NULL);
    354 	error = preparecmd(xc, c);
    355 	if (error != 0) {
    356 		return error;
    357 	}
    358 	paramvalues = emalloc(c->nparams * sizeof(*paramvalues));
    359 	paramlengths = NULL;
    360 	paramformats = NULL;
    361 	DPRINTF("CMD: '%s'\n", c->cmd);
    362 	for (i = 0; i < c->nparams; i++) {
    363 		Oid type = c->paramtypes[i];
    364 		char tmpstore[1024];
    365 		const char *buf = NULL;
    366 		intmax_t v = 0; /* XXXgcc */
    367 		int sz;
    368 		bool binary = false;
    369 
    370 		switch (type) {
    371 		case BYTEA:
    372 			buf = va_arg(ap, const void *);
    373 			sz = (int)va_arg(ap, size_t);
    374 			binary = true;
    375 			break;
    376 		case INT8OID:
    377 		case OIDOID:
    378 		case INT4OID:
    379 			switch (type) {
    380 			case INT8OID:
    381 				v = (intmax_t)va_arg(ap, int64_t);
    382 				break;
    383 			case OIDOID:
    384 				v = (intmax_t)va_arg(ap, Oid);
    385 				break;
    386 			case INT4OID:
    387 				v = (intmax_t)va_arg(ap, int32_t);
    388 				break;
    389 			default:
    390 				errx(EXIT_FAILURE, "unknown integer oid %u",
    391 				    type);
    392 			}
    393 			buf = tmpstore;
    394 			sz = snprintf(tmpstore, sizeof(tmpstore),
    395 			    "%jd", v);
    396 			assert(sz != -1);
    397 			assert((size_t)sz < sizeof(tmpstore));
    398 			sz += 1;
    399 			break;
    400 		case TEXTOID:
    401 		case TIMESTAMPTZOID:
    402 			buf = va_arg(ap, char *);
    403 			sz = strlen(buf) + 1;
    404 			break;
    405 		default:
    406 			errx(EXIT_FAILURE, "%s: unknown param type %u",
    407 			    __func__, type);
    408 		}
    409 		if (binary) {
    410 			if (paramlengths == NULL) {
    411 				paramlengths =
    412 				    emalloc(c->nparams * sizeof(*paramformats));
    413 			}
    414 			if (paramformats == NULL) {
    415 				paramformats = ecalloc(1,
    416 				    c->nparams * sizeof(*paramformats));
    417 			}
    418 			paramformats[i] = 1;
    419 			paramlengths[i] = sz;
    420 		}
    421 		paramvalues[i] = emalloc(sz);
    422 		memcpy(paramvalues[i], buf, sz);
    423 		if (binary) {
    424 			DPRINTF("\t[%u]=<BINARY>\n", i);
    425 		} else {
    426 			DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]);
    427 		}
    428 	}
    429 	if ((c->flags & CMD_NOPREPARE) != 0) {
    430 		ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes,
    431 		    (const char * const *)paramvalues, paramlengths,
    432 		    paramformats, resultmode);
    433 	} else {
    434 		ret = PQsendQueryPrepared(conn, c->name, c->nparams,
    435 		    (const char * const *)paramvalues, paramlengths,
    436 		    paramformats, resultmode);
    437 	}
    438 	for (i = 0; i < c->nparams; i++) {
    439 		free(paramvalues[i]);
    440 	}
    441 	free(paramvalues);
    442 	free(paramlengths);
    443 	free(paramformats);
    444 	if (!ret) {
    445 		errx(EXIT_FAILURE, "PQsendQueryPrepared: %s",
    446 		    PQerrorMessage(conn));
    447 	}
    448 	return 0;
    449 }
    450 
/*
 * sendcmd: send a command and its parameters, asking for results in
 * text format (resultmode 0).  see vsendcmd for the argument convention.
 */
int
sendcmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rc;

	va_start(args, c);
	rc = vsendcmd(xc, 0, c, args);
	va_end(args);

	return rc;
}
    462 
/*
 * sendcmdx: like sendcmd, but the caller chooses the result format via
 * resultmode, which is handed to vsendcmd unchanged.
 */
int
sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...)
{
	va_list args;
	int rc;

	va_start(args, c);
	rc = vsendcmd(xc, resultmode, c, args);
	va_end(args);

	return rc;
}
    474 
/*
 * simplecmd: send a command which returns no rows and wait for its
 * completion.
 *
 * returns the error from sending the command if that fails, otherwise
 * whatever fetch_noresult() reports.
 */

int
simplecmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rc;

	va_start(args, c);
	rc = vsendcmd(xc, 0, c, args);
	va_end(args);

	return rc != 0 ? rc : fetch_noresult(xc);
}
    494 
    495 void
    496 fetchinit(struct fetchstatus *s, struct Xconn *xc)
    497 {
    498 	s->xc = xc;
    499 	s->res = NULL;
    500 	s->cur = 0;
    501 	s->nrows = 0;
    502 	s->done = false;
    503 }
    504 
/*
 * getint: convert a decimal string to an integer.
 *
 * the string must be non-empty, consist entirely of a valid number and
 * be in range; these conditions are checked with assertions only.
 */
static intmax_t
getint(const char *str)
{
	char *end;
	intmax_t value;

	errno = 0;	/* so that a range error can be detected below */
	value = strtoimax(str, &end, 10);
	assert(errno == 0);
	assert(str[0] != 0);
	assert(*end == 0);
	return value;
}
    518 
    519 static int
    520 vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap)
    521 {
    522 	PGconn *conn = s->xc->conn;
    523 	unsigned int i;
    524 
    525 	assert(conn != NULL);
    526 	if (s->res == NULL) {
    527 		ExecStatusType status;
    528 		int error;
    529 
    530 		pqwait(s->xc);
    531 		s->res = PQgetResult(conn);
    532 		if (s->res == NULL) {
    533 			s->done = true;
    534 			return ENOENT;
    535 		}
    536 		status = PQresultStatus(s->res);
    537 		if (status == PGRES_FATAL_ERROR) {
    538 			error = sqltoerrno(
    539 			    PQresultErrorField(s->res, PG_DIAG_SQLSTATE));
    540 			assert(error != 0);
    541 			dumperror(s->xc, s->res);
    542 			return error;
    543 		}
    544 		if (status != PGRES_TUPLES_OK) {
    545 			errx(1, "not tuples_ok: %s",
    546 			    PQerrorMessage(conn));
    547 		}
    548 		assert((unsigned int)PQnfields(s->res) == n);
    549 		s->nrows = PQntuples(s->res);
    550 		if (s->nrows == 0) {
    551 			DPRINTF("no rows\n");
    552 			return ENOENT;
    553 		}
    554 		assert(s->nrows >= 1);
    555 		s->cur = 0;
    556 	}
    557 	for (i = 0; i < n; i++) {
    558 		size_t size;
    559 
    560 		assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0));
    561 		DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n",
    562 		    i, PQftype(s->res, i), types[i],
    563 		    PQgetisnull(s->res, s->cur, i) ? "<NULL>" :
    564 		    PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) :
    565 		    "<BINARY>");
    566 		assert(PQftype(s->res, i) == types[i]);
    567 		assert(!PQgetisnull(s->res, s->cur, i));
    568 		switch(types[i]) {
    569 		case INT8OID:
    570 			*va_arg(ap, int64_t *) =
    571 			    getint(PQgetvalue(s->res, s->cur, i));
    572 			break;
    573 		case OIDOID:
    574 			*va_arg(ap, Oid *) =
    575 			    getint(PQgetvalue(s->res, s->cur, i));
    576 			break;
    577 		case INT4OID:
    578 			*va_arg(ap, int32_t *) =
    579 			    getint(PQgetvalue(s->res, s->cur, i));
    580 			break;
    581 		case TEXTOID:
    582 			*va_arg(ap, char **) =
    583 			    estrdup(PQgetvalue(s->res, s->cur, i));
    584 			break;
    585 		case BYTEA:
    586 			size = PQgetlength(s->res, s->cur, i);
    587 			memcpy(va_arg(ap, void *),
    588 			    PQgetvalue(s->res, s->cur, i), size);
    589 			*va_arg(ap, size_t *) = size;
    590 			break;
    591 		default:
    592 			errx(EXIT_FAILURE, "%s unknown type %u", __func__,
    593 			    types[i]);
    594 		}
    595 	}
    596 	s->cur++;
    597 	if (s->cur == s->nrows) {
    598 		PQclear(s->res);
    599 		s->res = NULL;
    600 	}
    601 	return 0;
    602 }
    603 
    604 int
    605 fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...)
    606 {
    607 	va_list ap;
    608 	int error;
    609 
    610 	va_start(ap, types);
    611 	error = vfetchnext(s, n, types, ap);
    612 	va_end(ap);
    613 	return error;
    614 }
    615 
    616 void
    617 fetchdone(struct fetchstatus *s)
    618 {
    619 
    620 	if (s->res != NULL) {
    621 		PQclear(s->res);
    622 		s->res = NULL;
    623 	}
    624 	if (!s->done) {
    625 		PGresult *res;
    626 		unsigned int n;
    627 
    628 		n = 0;
    629 		while ((res = PQgetResult(s->xc->conn)) != NULL) {
    630 			PQclear(res);
    631 			n++;
    632 		}
    633 		if (n > 0) {
    634 			DPRINTF("%u rows dropped\n", n);
    635 		}
    636 	}
    637 }
    638 
    639 int
    640 simplefetch(struct Xconn *xc, Oid type, ...)
    641 {
    642 	struct fetchstatus s;
    643 	va_list ap;
    644 	int error;
    645 
    646 	fetchinit(&s, xc);
    647 	va_start(ap, type);
    648 	error = vfetchnext(&s, 1, &type, ap);
    649 	va_end(ap);
    650 	assert(error != 0 || s.res == NULL);
    651 	fetchdone(&s);
    652 	return error;
    653 }
    654 
    655 static void
    656 setlabel(struct Xconn *xc, const char *label)
    657 {
    658 	int error;
    659 
    660 	/*
    661 	 * put the label into application_name so that it's shown in
    662 	 * pg_stat_activity.  we are sure that our labels don't need
    663 	 * PQescapeStringConn.
    664 	 *
    665 	 * example:
    666 	 *	SELECT pid,application_name,query FROM pg_stat_activity
    667 	 *	WHERE state <> 'idle'
    668 	 */
    669 
    670 	if (label != NULL) {
    671 		struct cmd *c;
    672 		char cmd_str[1024];
    673 
    674 		snprintf(cmd_str, sizeof(cmd_str),
    675 		    "SET application_name TO 'pgfs: %s'", label);
    676 		c = createcmd(cmd_str, CMD_NOPREPARE);
    677 		error = simplecmd(xc, c);
    678 		freecmd(c);
    679 		assert(error == 0);
    680 	} else {
    681 #if 0 /* don't bother to clear label */
    682 		static struct cmd *c;
    683 
    684 		CREATECMD_NOPARAM(c, "SET application_name TO 'pgfs'");
    685 		error = simplecmd(xc, c);
    686 		assert(error == 0);
    687 #endif
    688 	}
    689 }
    690 
    691 struct Xconn *
    692 begin(struct puffs_usermount *pu, const char *label)
    693 {
    694 	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
    695 	static struct cmd *c;
    696 	int error;
    697 
    698 	setlabel(xc, label);
    699 	CREATECMD_NOPARAM(c, "BEGIN");
    700 	assert(!xc->in_trans);
    701 	error = simplecmd(xc, c);
    702 	assert(error == 0);
    703 	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
    704 	xc->in_trans = true;
    705 	return xc;
    706 }
    707 
    708 struct Xconn *
    709 begin_readonly(struct puffs_usermount *pu, const char *label)
    710 {
    711 	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
    712 	static struct cmd *c;
    713 	int error;
    714 
    715 	setlabel(xc, label);
    716 	CREATECMD_NOPARAM(c, "BEGIN READ ONLY");
    717 	assert(!xc->in_trans);
    718 	error = simplecmd(xc, c);
    719 	assert(error == 0);
    720 	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
    721 	xc->in_trans = true;
    722 	return xc;
    723 }
    724 
    725 void
    726 rollback(struct Xconn *xc)
    727 {
    728 	PGTransactionStatusType status;
    729 
    730 	/*
    731 	 * check the status as we are not sure the status of our transaction
    732 	 * after a failed commit.
    733 	 */
    734 	status = PQtransactionStatus(xc->conn);
    735 	assert(status != PQTRANS_ACTIVE);
    736 	assert(status != PQTRANS_UNKNOWN);
    737 	if (status != PQTRANS_IDLE) {
    738 		static struct cmd *c;
    739 		int error;
    740 
    741 		assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR);
    742 		CREATECMD_NOPARAM(c, "ROLLBACK");
    743 		error = simplecmd(xc, c);
    744 		assert(error == 0);
    745 	}
    746 	DPRINTF("xc %p rollback %p\n", xc, xc->owner);
    747 	setlabel(xc, NULL);
    748 	relxc(xc);
    749 }
    750 
    751 int
    752 commit(struct Xconn *xc)
    753 {
    754 	static struct cmd *c;
    755 	int error;
    756 
    757 	CREATECMD_NOPARAM(c, "COMMIT");
    758 	error = simplecmd(xc, c);
    759 	setlabel(xc, NULL);
    760 	if (error == 0) {
    761 		DPRINTF("xc %p commit %p\n", xc, xc->owner);
    762 		relxc(xc);
    763 	}
    764 	return error;
    765 }
    766 
    767 int
    768 commit_sync(struct Xconn *xc)
    769 {
    770 	static struct cmd *c;
    771 	int error;
    772 
    773 	assert(!pgfs_dosync);
    774 	CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON");
    775 	error = simplecmd(xc, c);
    776 	assert(error == 0);
    777 	return commit(xc);
    778 }
    779 
    780 static void
    781 pgfs_notice_receiver(void *vp, const PGresult *res)
    782 {
    783 	struct Xconn *xc = vp;
    784 
    785 	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR);
    786 	fprintf(stderr, "got a notice on %p\n", xc);
    787 	dumperror(xc, res);
    788 }
    789 
    790 static int
    791 pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf,
    792     int fd, int *done)
    793 {
    794 	struct Xconn *xc;
    795 	PGconn *conn;
    796 
    797 	TAILQ_FOREACH(xc, &xclist, list) {
    798 		if (PQsocket(xc->conn) == fd) {
    799 			break;
    800 		}
    801 	}
    802 	assert(xc != NULL);
    803 	conn = xc->conn;
    804 	PQconsumeInput(conn);
    805 	if (!PQisBusy(conn)) {
    806 		if (xc->blocker != NULL) {
    807 			DPRINTF("schedule %p\n", xc->blocker);
    808 			puffs_cc_schedule(xc->blocker);
    809 		} else {
    810 			DPRINTF("no blockers\n");
    811 		}
    812 	}
    813 	*done = 0;
    814 	return 0;
    815 }
    816 
    817 int
    818 pgfs_connectdb(struct puffs_usermount *pu, const char *dbname,
    819     const char *dbuser, bool debug, bool synchronous, unsigned int nconn)
    820 {
    821 	const char *keywords[3+1];
    822 	const char *values[3];
    823 	unsigned int i;
    824 
    825 	if (nconn > 32) {
    826 		/*
    827 		 * limit from sizeof(cmd->prepared_mask)
    828 		 */
    829 		return EINVAL;
    830 	}
    831 	if (debug) {
    832 		pgfs_dodprintf = true;
    833 	}
    834 	if (synchronous) {
    835 		pgfs_dosync = true;
    836 	}
    837 	i = 0;
    838 	if (dbname != NULL) {
    839 		keywords[i] = "dbname";
    840 		values[i] = dbname;
    841 		i++;
    842 	}
    843 	if (dbuser != NULL) {
    844 		keywords[i] = "user";
    845 		values[i] = dbuser;
    846 		i++;
    847 	}
    848 	keywords[i] = "application_name";
    849 	values[i] = "pgfs";
    850 	i++;
    851 	keywords[i] = NULL;
    852 	puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL);
    853 	for (i = 0; i < nconn; i++) {
    854 		struct Xconn *xc;
    855 		struct Xconn *xc2;
    856 		static int xcid;
    857 		PGconn *conn;
    858 		struct cmd *c;
    859 		int error;
    860 
    861 		conn = PQconnectdbParams(keywords, values, 0);
    862 		if (conn == NULL) {
    863 			errx(EXIT_FAILURE,
    864 			    "PQconnectdbParams: unknown failure");
    865 		}
    866 		if (PQstatus(conn) != CONNECTION_OK) {
    867 			/*
    868 			 * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW
    869 			 */
    870 			errx(EXIT_FAILURE, "PQconnectdbParams: %s",
    871 			    PQerrorMessage(conn));
    872 		}
    873 		DPRINTF("protocol version %d\n", PQprotocolVersion(conn));
    874 		puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ);
    875 		xc = emalloc(sizeof(*xc));
    876 		xc->conn = conn;
    877 		xc->blocker = NULL;
    878 		xc->owner = NULL;
    879 		xc->in_trans = false;
    880 		xc->id = xcid++;
    881 		assert(xc->id < 32);
    882 		PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc);
    883 		TAILQ_INSERT_HEAD(&xclist, xc, list);
    884 		xc2 = begin(pu, NULL);
    885 		assert(xc2 == xc);
    886 		c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE);
    887 		error = simplecmd(xc, c);
    888 		assert(error == 0);
    889 		freecmd(c);
    890 		c = createcmd("SET SESSION CHARACTERISTICS AS "
    891 		    "TRANSACTION ISOLATION LEVEL REPEATABLE READ",
    892 		    CMD_NOPREPARE);
    893 		error = simplecmd(xc, c);
    894 		assert(error == 0);
    895 		freecmd(c);
    896 		c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE);
    897 		error = simplecmd(xc, c);
    898 		assert(error == 0);
    899 		freecmd(c);
    900 		if (!pgfs_dosync) {
    901 			c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF",
    902 			    CMD_NOPREPARE);
    903 			error = simplecmd(xc, c);
    904 			assert(error == 0);
    905 			freecmd(c);
    906 		}
    907 		if (debug) {
    908 			struct fetchstatus s;
    909 			static const Oid types[] = { INT8OID, };
    910 			uint64_t pid;
    911 
    912 			c = createcmd("SELECT pg_backend_pid()::int8;",
    913 			    CMD_NOPREPARE);
    914 			error = sendcmd(xc, c);
    915 			assert(error == 0);
    916 			fetchinit(&s, xc);
    917 			error = FETCHNEXT(&s, types, &pid);
    918 			fetchdone(&s);
    919 			assert(error == 0);
    920 			DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid);
    921 		}
    922 		error = commit(xc);
    923 		assert(error == 0);
    924 		assert(xc->owner == NULL);
    925 	}
    926 	/*
    927 	 * XXX cleanup unlinked files here?  what to do when the filesystem
    928 	 * is shared?
    929 	 */
    930 	return 0;
    931 }
    932 
/* coroutines waiting in flush_xacts() for the flush in progress to end */
struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq);
/* the coroutine currently running flush_xacts(), or NULL if none */
struct puffs_cc *flusher = NULL;
    935 
    936 int
    937 flush_xacts(struct puffs_usermount *pu)
    938 {
    939 	struct puffs_cc *cc = puffs_cc_getcc(pu);
    940 	struct Xconn *xc;
    941 	static struct cmd *c;
    942 	uint64_t dummy;
    943 	int error;
    944 
    945 	/*
    946 	 * flush all previously issued asynchronous transactions.
    947 	 *
    948 	 * XXX
    949 	 * unfortunately it seems that there is no clean way to tell
    950 	 * PostgreSQL flush XLOG.  we could perform a CHECKPOINT but it's
    951 	 * too expensive and overkill for our purpose.
    952 	 * besides, PostgreSQL has an optimization to skip XLOG flushing
    953 	 * for transactions which didn't produce WAL records.
    954 	 * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2)
    955 	 * it means that an empty transaction ("BEGIN; COMMIT;"), which
    956 	 * doesn't produce any WAL records, doesn't flush the XLOG even if
    957 	 * synchronous_commit=on.  we issues a dummy setval() to avoid the
    958 	 * optimization.
    959 	 * on the other hand, we try to avoid creating unnecessary WAL activity
    960 	 * by serializing flushing and checking XLOG locations.
    961 	 */
    962 
    963 	assert(!pgfs_dosync);
    964 	if (flusher != NULL) { /* serialize flushers */
    965 		DPRINTF("%p flush in progress %p\n", cc, flusher);
    966 		waiton(&flushwaitq, cc);
    967 		assert(flusher == NULL);
    968 	}
    969 	DPRINTF("%p start flushing\n", cc);
    970 	flusher = cc;
    971 retry:
    972 	xc = begin(pu, "flush");
    973 	CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE "
    974 	    "pg_current_xlog_insert_location() <> pg_current_xlog_location()");
    975 	error = sendcmd(xc, c);
    976 	if (error != 0) {
    977 		goto got_error;
    978 	}
    979 	error = simplefetch(xc, INT8OID, &dummy);
    980 	assert(error != 0 || dummy == 1);
    981 	if (error == ENOENT) {
    982 		/*
    983 		 * there seems to be nothing to flush.
    984 		 */
    985 		DPRINTF("%p no sync\n", cc);
    986 		error = 0;
    987 	}
    988 	if (error != 0) {
    989 		goto got_error;
    990 	}
    991 	error = commit_sync(xc);
    992 	if (error != 0) {
    993 		goto got_error;
    994 	}
    995 	goto done;
    996 got_error:
    997 	rollback(xc);
    998 	if (error == EAGAIN) {
    999 		goto retry;
   1000 	}
   1001 done:
   1002 	assert(flusher == cc);
   1003 	flusher = NULL;
   1004 	wakeup_one(&flushwaitq);
   1005 	DPRINTF("%p end flushing error=%d\n", cc, error);
   1006 	return error;
   1007 }
   1008