Home | History | Annotate | Line # | Download | only in pgfs
pgfs_db.c revision 1.2
      1  1.2  yamt /*	$NetBSD: pgfs_db.c,v 1.2 2012/04/11 14:26:44 yamt Exp $	*/
      2  1.1  yamt 
      3  1.1  yamt /*-
      4  1.1  yamt  * Copyright (c)2010,2011 YAMAMOTO Takashi,
      5  1.1  yamt  * All rights reserved.
      6  1.1  yamt  *
      7  1.1  yamt  * Redistribution and use in source and binary forms, with or without
      8  1.1  yamt  * modification, are permitted provided that the following conditions
      9  1.1  yamt  * are met:
     10  1.1  yamt  * 1. Redistributions of source code must retain the above copyright
     11  1.1  yamt  *    notice, this list of conditions and the following disclaimer.
     12  1.1  yamt  * 2. Redistributions in binary form must reproduce the above copyright
     13  1.1  yamt  *    notice, this list of conditions and the following disclaimer in the
     14  1.1  yamt  *    documentation and/or other materials provided with the distribution.
     15  1.1  yamt  *
     16  1.1  yamt  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
     17  1.1  yamt  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     18  1.1  yamt  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     19  1.1  yamt  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     20  1.1  yamt  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     21  1.1  yamt  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
     22  1.1  yamt  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     23  1.1  yamt  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     24  1.1  yamt  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     25  1.1  yamt  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     26  1.1  yamt  * SUCH DAMAGE.
     27  1.1  yamt  */
     28  1.1  yamt 
     29  1.1  yamt /*
     30  1.1  yamt  * backend db operations
     31  1.1  yamt  */
     32  1.1  yamt 
     33  1.1  yamt #include <sys/cdefs.h>
     34  1.1  yamt #ifndef lint
     35  1.2  yamt __RCSID("$NetBSD: pgfs_db.c,v 1.2 2012/04/11 14:26:44 yamt Exp $");
     36  1.1  yamt #endif /* not lint */
     37  1.1  yamt 
     38  1.1  yamt #include <assert.h>
     39  1.1  yamt #include <err.h>
     40  1.1  yamt #include <errno.h>
     41  1.1  yamt #include <inttypes.h>
     42  1.1  yamt #include <puffs.h>
     43  1.1  yamt #include <stdbool.h>
     44  1.1  yamt #include <stdarg.h>
     45  1.1  yamt #include <stdio.h>
     46  1.1  yamt #include <stdlib.h>
     47  1.1  yamt #include <util.h>
     48  1.1  yamt 
     49  1.1  yamt #include <libpq-fe.h>
     50  1.1  yamt 
     51  1.1  yamt #include "pgfs_db.h"
     52  1.1  yamt #include "pgfs_waitq.h"
     53  1.1  yamt #include "pgfs_debug.h"
     54  1.1  yamt 
/*
 * pgfs_dosync: global flag, false by default.  commit_sync() asserts that
 * it is false before it turns synchronous_commit on for a transaction.
 * NOTE(review): presumably set when pgfs_connectdb() is asked for
 * synchronous operation -- confirm against the rest of the file.
 */
bool pgfs_dosync = false;
     56  1.1  yamt 
/*
 * Xconn: one backend database connection, linked on xclist.
 */
struct Xconn {
	TAILQ_ENTRY(Xconn) list;	/* linkage on xclist */
	PGconn *conn;			/* libpq connection handle */
	struct puffs_cc *blocker;	/* cc yielded in pqwait(), or NULL */
	struct puffs_cc *owner;		/* cc which got this via getxc(), or NULL */
	bool in_trans;			/* set by begin*(), cleared by relxc() */
	int id;				/* bit index into cmd->prepared_mask */
};
     65  1.1  yamt 
     66  1.1  yamt static void
     67  1.1  yamt dumperror(struct Xconn *xc, const PGresult *res)
     68  1.1  yamt {
     69  1.1  yamt 	static const struct {
     70  1.1  yamt 		const char *name;
     71  1.1  yamt 		int code;
     72  1.1  yamt 	} fields[] = {
     73  1.1  yamt #define F(x)	{ .name = #x, .code = x, }
     74  1.1  yamt 		F(PG_DIAG_SEVERITY),
     75  1.1  yamt 		F(PG_DIAG_SQLSTATE),
     76  1.1  yamt 		F(PG_DIAG_MESSAGE_PRIMARY),
     77  1.1  yamt 		F(PG_DIAG_MESSAGE_DETAIL),
     78  1.1  yamt 		F(PG_DIAG_MESSAGE_HINT),
     79  1.1  yamt 		F(PG_DIAG_STATEMENT_POSITION),
     80  1.1  yamt 		F(PG_DIAG_INTERNAL_POSITION),
     81  1.1  yamt 		F(PG_DIAG_INTERNAL_QUERY),
     82  1.1  yamt 		F(PG_DIAG_CONTEXT),
     83  1.1  yamt 		F(PG_DIAG_SOURCE_FILE),
     84  1.1  yamt 		F(PG_DIAG_SOURCE_LINE),
     85  1.1  yamt 		F(PG_DIAG_SOURCE_FUNCTION),
     86  1.1  yamt #undef F
     87  1.1  yamt 	};
     88  1.1  yamt 	unsigned int i;
     89  1.1  yamt 
     90  1.1  yamt 	if (!pgfs_dodprintf) {
     91  1.1  yamt 		return;
     92  1.1  yamt 	}
     93  1.1  yamt 	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR ||
     94  1.1  yamt 	    PQresultStatus(res) == PGRES_FATAL_ERROR);
     95  1.1  yamt 	for (i = 0; i < __arraycount(fields); i++) {
     96  1.1  yamt 		const char *val = PQresultErrorField(res, fields[i].code);
     97  1.1  yamt 
     98  1.1  yamt 		if (val == NULL) {
     99  1.1  yamt 			continue;
    100  1.1  yamt 		}
    101  1.1  yamt 		fprintf(stderr, "%s: %s\n", fields[i].name, val);
    102  1.1  yamt 	}
    103  1.1  yamt }
    104  1.1  yamt 
/* xclist: list of backend connections; walked by getxc() and pgfs_readframe() */
TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist);
/* xcwaitq: ccs waiting in getxc() for a connection; relxc() wakes one of them */
struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq);
    107  1.1  yamt 
    108  1.1  yamt static struct Xconn *
    109  1.1  yamt getxc(struct puffs_cc *cc)
    110  1.1  yamt {
    111  1.1  yamt 	struct Xconn *xc;
    112  1.1  yamt 
    113  1.1  yamt 	assert(cc != NULL);
    114  1.1  yamt retry:
    115  1.1  yamt 	TAILQ_FOREACH(xc, &xclist, list) {
    116  1.1  yamt 		if (xc->blocker == NULL) {
    117  1.1  yamt 			assert(xc->owner == NULL);
    118  1.1  yamt 			xc->owner = cc;
    119  1.1  yamt 			DPRINTF("xc %p acquire %p\n", xc, cc);
    120  1.1  yamt 			return xc;
    121  1.1  yamt 		} else {
    122  1.1  yamt 			assert(xc->owner == xc->blocker);
    123  1.1  yamt 		}
    124  1.1  yamt 	}
    125  1.1  yamt 	DPRINTF("no free conn %p\n", cc);
    126  1.1  yamt 	waiton(&xcwaitq, cc);
    127  1.1  yamt 	goto retry;
    128  1.1  yamt }
    129  1.1  yamt 
    130  1.1  yamt static void
    131  1.1  yamt relxc(struct Xconn *xc)
    132  1.1  yamt {
    133  1.1  yamt 
    134  1.1  yamt 	assert(xc->in_trans);
    135  1.1  yamt 	assert(xc->owner != NULL);
    136  1.1  yamt 	xc->in_trans = false;
    137  1.1  yamt 	xc->owner = NULL;
    138  1.1  yamt 	wakeup_one(&xcwaitq);
    139  1.1  yamt }
    140  1.1  yamt 
    141  1.1  yamt static void
    142  1.1  yamt pqwait(struct Xconn *xc)
    143  1.1  yamt {
    144  1.1  yamt 	PGconn *conn = xc->conn;
    145  1.1  yamt 	struct puffs_cc *cc = xc->owner;
    146  1.1  yamt 
    147  1.1  yamt 	if (PQflush(conn)) {
    148  1.1  yamt 		errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn));
    149  1.1  yamt 	}
    150  1.1  yamt 	if (!PQisBusy(conn)) {
    151  1.1  yamt 		return;
    152  1.1  yamt 	}
    153  1.1  yamt 	assert(xc->blocker == NULL);
    154  1.1  yamt 	xc->blocker = cc;
    155  1.1  yamt 	DPRINTF("yielding %p\n", cc);
    156  1.1  yamt 	/* XXX is it safe to yield before entering mainloop? */
    157  1.1  yamt 	puffs_cc_yield(cc);
    158  1.1  yamt 	DPRINTF("yield returned %p\n", cc);
    159  1.1  yamt 	assert(xc->owner == cc);
    160  1.1  yamt 	assert(xc->blocker == cc);
    161  1.1  yamt 	xc->blocker = NULL;
    162  1.1  yamt }
    163  1.1  yamt 
/*
 * sqltoerrno: map an SQLSTATE code to an errno value.
 *
 * sqlstate is the 5 character code as returned by
 * PQresultErrorField(res, PG_DIAG_SQLSTATE).  it may be NULL, as libpq
 * does not guarantee that the field is present in every error result;
 * NULL is mapped to EIO like any other unknown state.
 *
 * returns 0 only for ERRCODE_SUCCESSFUL_COMPLETION.  a state which maps
 * to EINVAL is considered a bug in pgfs and aborts the process.
 */
static int
sqltoerrno(const char *sqlstate)
{
	/*
	 * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle
	 * "tuple concurrently updated" errors for lowrite/lo_truncate.
	 *
	 * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN?
	 */
	static const struct {
		char sqlstate[5];	/* not NUL-terminated */
		int error;
	} map[] = {
		{ "00000", 0, },	/* ERRCODE_SUCCESSFUL_COMPLETION */
		{ "02000", ENOENT, },	/* ERRCODE_NO_DATA */
		{ "23505", EEXIST, },	/* ERRCODE_UNIQUE_VIOLATION */
		{ "23514", EINVAL, },	/* ERRCODE_CHECK_VIOLATION */
		{ "40001", EAGAIN, },	/* ERRCODE_T_R_SERIALIZATION_FAILURE */
		{ "40P01", EAGAIN, },	/* ERRCODE_T_R_DEADLOCK_DETECTED */
		{ "42704", ENOENT, },	/* ERRCODE_UNDEFINED_OBJECT */
		{ "53100", ENOSPC, },	/* ERRCODE_DISK_FULL */
		{ "53200", ENOMEM, },	/* ERRCODE_OUT_OF_MEMORY */
		{ "XX000", EAGAIN, },	/* ERRCODE_INTERNAL_ERROR */
	};
	unsigned int i;

	if (sqlstate == NULL) {
		/*
		 * no SQLSTATE attached to the error.  don't hand NULL to
		 * memcmp/printf; treat it as an unknown state.
		 */
		DPRINTF("missing sqlstate mapped to EIO\n");
		return EIO;
	}
	for (i = 0; i < __arraycount(map); i++) {
		if (!memcmp(map[i].sqlstate, sqlstate, 5)) {
			const int error = map[i].error;

			if (error != 0) {
				DPRINTF("sqlstate %5s mapped to error %d\n",
				    sqlstate, error);
			}
			if (error == EINVAL) {
				/*
				 * sounds like a bug.
				 */
				abort();
			}
			return error;
		}
	}
	DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate);
	return EIO;
}
    210  1.1  yamt 
/*
 * cmd: an SQL command.  created by createcmd(), released by freecmd(),
 * and sent to the backend by vsendcmd().
 */
struct cmd {
	char name[32];		/* name of prepared statement */
	char *cmd;		/* query string */
	unsigned int nparams;	/* number of '$' characters found in cmd */
	Oid *paramtypes;	/* nparams entries; type of each parameter */
	uint32_t prepared_mask;	/* for which connections this is prepared? */
				/* (bit N is for the Xconn whose id is N) */
	unsigned int flags;	/* CMD_ flags */
};

/* createcmd() marks such a command as "prepared" on every connection */
#define	CMD_NOPREPARE	1	/* don't prepare this command */
    221  1.1  yamt 
    222  1.1  yamt struct cmd *
    223  1.1  yamt createcmd(const char *cmd, unsigned int flags, ...)
    224  1.1  yamt {
    225  1.1  yamt 	struct cmd *c;
    226  1.1  yamt 	va_list ap;
    227  1.1  yamt 	const char *cp;
    228  1.1  yamt 	unsigned int i;
    229  1.1  yamt 	static unsigned int cmdid;
    230  1.1  yamt 
    231  1.1  yamt 	c = emalloc(sizeof(*c));
    232  1.1  yamt 	c->cmd = estrdup(cmd);
    233  1.1  yamt 	c->nparams = 0;
    234  1.1  yamt 	va_start(ap, flags);
    235  1.1  yamt 	for (cp = cmd; *cp != 0; cp++) {
    236  1.1  yamt 		if (*cp == '$') { /* XXX */
    237  1.1  yamt 			c->nparams++;
    238  1.1  yamt 		}
    239  1.1  yamt 	}
    240  1.1  yamt 	c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes));
    241  1.1  yamt 	for (i = 0; i < c->nparams; i++) {
    242  1.1  yamt 		Oid type = va_arg(ap, Oid);
    243  1.1  yamt 		assert(type == BYTEA ||
    244  1.1  yamt 		    type == INT4OID || type == INT8OID || type == OIDOID ||
    245  1.1  yamt 		    type == TEXTOID || type == TIMESTAMPTZOID);
    246  1.1  yamt 		c->paramtypes[i] = type;
    247  1.1  yamt 	}
    248  1.1  yamt 	va_end(ap);
    249  1.1  yamt 	snprintf(c->name, sizeof(c->name), "%u", cmdid++);
    250  1.1  yamt 	if ((flags & CMD_NOPREPARE) != 0) {
    251  1.1  yamt 		c->prepared_mask = ~0;
    252  1.1  yamt 	} else {
    253  1.1  yamt 		c->prepared_mask = 0;
    254  1.1  yamt 	}
    255  1.1  yamt 	c->flags = flags;
    256  1.1  yamt 	return c;
    257  1.1  yamt }
    258  1.1  yamt 
    259  1.1  yamt static void
    260  1.1  yamt freecmd(struct cmd *c)
    261  1.1  yamt {
    262  1.1  yamt 
    263  1.1  yamt 	free(c->paramtypes);
    264  1.1  yamt 	free(c->cmd);
    265  1.1  yamt 	free(c);
    266  1.1  yamt }
    267  1.1  yamt 
    268  1.1  yamt static int
    269  1.1  yamt fetch_noresult(struct Xconn *xc)
    270  1.1  yamt {
    271  1.1  yamt 	PGresult *res;
    272  1.1  yamt 	ExecStatusType status;
    273  1.1  yamt 	PGconn *conn = xc->conn;
    274  1.1  yamt 	int error;
    275  1.1  yamt 
    276  1.1  yamt 	pqwait(xc);
    277  1.1  yamt 	res = PQgetResult(conn);
    278  1.1  yamt 	if (res == NULL) {
    279  1.1  yamt 		return ENOENT;
    280  1.1  yamt 	}
    281  1.1  yamt 	status = PQresultStatus(res);
    282  1.1  yamt 	if (status == PGRES_COMMAND_OK) {
    283  1.1  yamt 		assert(PQnfields(res) == 0);
    284  1.1  yamt 		assert(PQntuples(res) == 0);
    285  1.1  yamt 		if (!strcmp(PQcmdTuples(res), "0")) {
    286  1.1  yamt 			error = ENOENT;
    287  1.1  yamt 		} else {
    288  1.1  yamt 			error = 0;
    289  1.1  yamt 		}
    290  1.1  yamt 	} else if (status == PGRES_FATAL_ERROR) {
    291  1.1  yamt 		error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE));
    292  1.1  yamt 		assert(error != 0);
    293  1.1  yamt 		dumperror(xc, res);
    294  1.1  yamt 	} else {
    295  1.1  yamt 		errx(1, "%s not command_ok: %d: %s", __func__,
    296  1.1  yamt 		    (int)status,
    297  1.1  yamt 		    PQerrorMessage(conn));
    298  1.1  yamt 	}
    299  1.1  yamt 	PQclear(res);
    300  1.1  yamt 	res = PQgetResult(conn);
    301  1.1  yamt 	assert(res == NULL);
    302  1.1  yamt 	if (error != 0) {
    303  1.1  yamt 		DPRINTF("error %d\n", error);
    304  1.1  yamt 	}
    305  1.1  yamt 	return error;
    306  1.1  yamt }
    307  1.1  yamt 
    308  1.1  yamt static int
    309  1.1  yamt preparecmd(struct Xconn *xc, struct cmd *c)
    310  1.1  yamt {
    311  1.1  yamt 	PGconn *conn = xc->conn;
    312  1.1  yamt 	const uint32_t mask = 1 << xc->id;
    313  1.1  yamt 	int error;
    314  1.1  yamt 	int ret;
    315  1.1  yamt 
    316  1.1  yamt 	if ((c->prepared_mask & mask) != 0) {
    317  1.1  yamt 		return 0;
    318  1.1  yamt 	}
    319  1.1  yamt 	assert((c->flags & CMD_NOPREPARE) == 0);
    320  1.1  yamt 	DPRINTF("PREPARE: '%s'\n", c->cmd);
    321  1.1  yamt 	ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes);
    322  1.1  yamt 	if (!ret) {
    323  1.1  yamt 		errx(EXIT_FAILURE, "PQsendPrepare: %s",
    324  1.1  yamt 		    PQerrorMessage(conn));
    325  1.1  yamt 	}
    326  1.1  yamt 	error = fetch_noresult(xc);
    327  1.1  yamt 	if (error != 0) {
    328  1.1  yamt 		return error;
    329  1.1  yamt 	}
    330  1.1  yamt 	c->prepared_mask |= mask;
    331  1.1  yamt 	return 0;
    332  1.1  yamt }
    333  1.1  yamt 
    334  1.1  yamt /*
    335  1.1  yamt  * vsendcmd:
    336  1.1  yamt  *
    337  1.1  yamt  * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared.
    338  1.1  yamt  * 0 for text and 1 for binary.
    339  1.1  yamt  */
    340  1.1  yamt 
    341  1.1  yamt static int
    342  1.1  yamt vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap)
    343  1.1  yamt {
    344  1.1  yamt 	PGconn *conn = xc->conn;
    345  1.1  yamt 	char **paramvalues;
    346  1.1  yamt 	int *paramlengths;
    347  1.1  yamt 	int *paramformats;
    348  1.1  yamt 	unsigned int i;
    349  1.1  yamt 	int error;
    350  1.1  yamt 	int ret;
    351  1.1  yamt 
    352  1.1  yamt 	assert(xc->owner != NULL);
    353  1.1  yamt 	assert(xc->blocker == NULL);
    354  1.1  yamt 	error = preparecmd(xc, c);
    355  1.1  yamt 	if (error != 0) {
    356  1.1  yamt 		return error;
    357  1.1  yamt 	}
    358  1.1  yamt 	paramvalues = emalloc(c->nparams * sizeof(*paramvalues));
    359  1.1  yamt 	paramlengths = NULL;
    360  1.1  yamt 	paramformats = NULL;
    361  1.1  yamt 	DPRINTF("CMD: '%s'\n", c->cmd);
    362  1.1  yamt 	for (i = 0; i < c->nparams; i++) {
    363  1.1  yamt 		Oid type = c->paramtypes[i];
    364  1.1  yamt 		char tmpstore[1024];
    365  1.1  yamt 		const char *buf = NULL;
    366  1.1  yamt 		intmax_t v = 0; /* XXXgcc */
    367  1.1  yamt 		int sz;
    368  1.1  yamt 		bool binary = false;
    369  1.1  yamt 
    370  1.1  yamt 		switch (type) {
    371  1.1  yamt 		case BYTEA:
    372  1.1  yamt 			buf = va_arg(ap, const void *);
    373  1.1  yamt 			sz = (int)va_arg(ap, size_t);
    374  1.1  yamt 			binary = true;
    375  1.1  yamt 			break;
    376  1.1  yamt 		case INT8OID:
    377  1.1  yamt 		case OIDOID:
    378  1.1  yamt 		case INT4OID:
    379  1.1  yamt 			switch (type) {
    380  1.1  yamt 			case INT8OID:
    381  1.1  yamt 				v = (intmax_t)va_arg(ap, int64_t);
    382  1.1  yamt 				break;
    383  1.1  yamt 			case OIDOID:
    384  1.1  yamt 				v = (intmax_t)va_arg(ap, Oid);
    385  1.1  yamt 				break;
    386  1.1  yamt 			case INT4OID:
    387  1.1  yamt 				v = (intmax_t)va_arg(ap, int32_t);
    388  1.1  yamt 				break;
    389  1.1  yamt 			default:
    390  1.1  yamt 				errx(EXIT_FAILURE, "unknown integer oid %u",
    391  1.1  yamt 				    type);
    392  1.1  yamt 			}
    393  1.1  yamt 			buf = tmpstore;
    394  1.1  yamt 			sz = snprintf(tmpstore, sizeof(tmpstore),
    395  1.1  yamt 			    "%jd", v);
    396  1.1  yamt 			assert(sz != -1);
    397  1.1  yamt 			assert((size_t)sz < sizeof(tmpstore));
    398  1.1  yamt 			sz += 1;
    399  1.1  yamt 			break;
    400  1.1  yamt 		case TEXTOID:
    401  1.1  yamt 		case TIMESTAMPTZOID:
    402  1.1  yamt 			buf = va_arg(ap, char *);
    403  1.1  yamt 			sz = strlen(buf) + 1;
    404  1.1  yamt 			break;
    405  1.1  yamt 		default:
    406  1.1  yamt 			errx(EXIT_FAILURE, "%s: unknown param type %u",
    407  1.1  yamt 			    __func__, type);
    408  1.1  yamt 		}
    409  1.1  yamt 		if (binary) {
    410  1.1  yamt 			if (paramlengths == NULL) {
    411  1.1  yamt 				paramlengths =
    412  1.1  yamt 				    emalloc(c->nparams * sizeof(*paramformats));
    413  1.1  yamt 			}
    414  1.1  yamt 			if (paramformats == NULL) {
    415  1.1  yamt 				paramformats = ecalloc(1,
    416  1.1  yamt 				    c->nparams * sizeof(*paramformats));
    417  1.1  yamt 			}
    418  1.1  yamt 			paramformats[i] = 1;
    419  1.1  yamt 			paramlengths[i] = sz;
    420  1.1  yamt 		}
    421  1.1  yamt 		paramvalues[i] = emalloc(sz);
    422  1.1  yamt 		memcpy(paramvalues[i], buf, sz);
    423  1.1  yamt 		if (binary) {
    424  1.1  yamt 			DPRINTF("\t[%u]=<BINARY>\n", i);
    425  1.1  yamt 		} else {
    426  1.1  yamt 			DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]);
    427  1.1  yamt 		}
    428  1.1  yamt 	}
    429  1.1  yamt 	if ((c->flags & CMD_NOPREPARE) != 0) {
    430  1.1  yamt 		ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes,
    431  1.1  yamt 		    (const char * const *)paramvalues, paramlengths,
    432  1.1  yamt 		    paramformats, resultmode);
    433  1.1  yamt 	} else {
    434  1.1  yamt 		ret = PQsendQueryPrepared(conn, c->name, c->nparams,
    435  1.1  yamt 		    (const char * const *)paramvalues, paramlengths,
    436  1.1  yamt 		    paramformats, resultmode);
    437  1.1  yamt 	}
    438  1.1  yamt 	for (i = 0; i < c->nparams; i++) {
    439  1.1  yamt 		free(paramvalues[i]);
    440  1.1  yamt 	}
    441  1.1  yamt 	free(paramvalues);
    442  1.1  yamt 	free(paramlengths);
    443  1.1  yamt 	free(paramformats);
    444  1.1  yamt 	if (!ret) {
    445  1.1  yamt 		errx(EXIT_FAILURE, "PQsendQueryPrepared: %s",
    446  1.1  yamt 		    PQerrorMessage(conn));
    447  1.1  yamt 	}
    448  1.1  yamt 	return 0;
    449  1.1  yamt }
    450  1.1  yamt 
/*
 * sendcmd: send a command, asking for the result in text format.
 * the variable arguments are the parameter values; see vsendcmd.
 * the result is not fetched here.
 */
int
sendcmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);
	return rv;
}
    462  1.1  yamt 
/*
 * sendcmdx: same as sendcmd, except that the caller chooses resultmode,
 * which is passed to vsendcmd as it is.
 */
int
sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, resultmode, c, args);
	va_end(args);
	return rv;
}
    474  1.1  yamt 
/*
 * simplecmd: a convenient routine to execute a command which returns
 * no rows synchronously.
 *
 * sends the command in text mode and, unless sending has failed,
 * returns what fetch_noresult() says about its completion.
 */

int
simplecmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);
	return rv != 0 ? rv : fetch_noresult(xc);
}
    494  1.1  yamt 
    495  1.1  yamt void
    496  1.1  yamt fetchinit(struct fetchstatus *s, struct Xconn *xc)
    497  1.1  yamt {
    498  1.1  yamt 	s->xc = xc;
    499  1.1  yamt 	s->res = NULL;
    500  1.1  yamt 	s->cur = 0;
    501  1.1  yamt 	s->nrows = 0;
    502  1.1  yamt 	s->done = false;
    503  1.1  yamt }
    504  1.1  yamt 
/*
 * getint: convert a decimal string to an integer.
 *
 * the whole string must be a valid number which fits in intmax_t;
 * anything else trips an assertion.
 */
static intmax_t
getint(const char *str)
{
	char *end;
	intmax_t value;

	errno = 0;
	value = strtoimax(str, &end, 10);
	assert(errno == 0);		/* no overflow */
	assert(str[0] != '\0');		/* not an empty string */
	assert(*end == '\0');		/* no trailing garbage */
	return value;
}
    518  1.1  yamt 
    519  1.1  yamt static int
    520  1.1  yamt vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap)
    521  1.1  yamt {
    522  1.1  yamt 	PGconn *conn = s->xc->conn;
    523  1.1  yamt 	unsigned int i;
    524  1.1  yamt 
    525  1.1  yamt 	assert(conn != NULL);
    526  1.1  yamt 	if (s->res == NULL) {
    527  1.1  yamt 		ExecStatusType status;
    528  1.1  yamt 		int error;
    529  1.1  yamt 
    530  1.1  yamt 		pqwait(s->xc);
    531  1.1  yamt 		s->res = PQgetResult(conn);
    532  1.1  yamt 		if (s->res == NULL) {
    533  1.1  yamt 			s->done = true;
    534  1.1  yamt 			return ENOENT;
    535  1.1  yamt 		}
    536  1.1  yamt 		status = PQresultStatus(s->res);
    537  1.1  yamt 		if (status == PGRES_FATAL_ERROR) {
    538  1.1  yamt 			error = sqltoerrno(
    539  1.1  yamt 			    PQresultErrorField(s->res, PG_DIAG_SQLSTATE));
    540  1.1  yamt 			assert(error != 0);
    541  1.1  yamt 			dumperror(s->xc, s->res);
    542  1.1  yamt 			return error;
    543  1.1  yamt 		}
    544  1.1  yamt 		if (status != PGRES_TUPLES_OK) {
    545  1.1  yamt 			errx(1, "not tuples_ok: %s",
    546  1.1  yamt 			    PQerrorMessage(conn));
    547  1.1  yamt 		}
    548  1.1  yamt 		assert((unsigned int)PQnfields(s->res) == n);
    549  1.1  yamt 		s->nrows = PQntuples(s->res);
    550  1.1  yamt 		if (s->nrows == 0) {
    551  1.1  yamt 			DPRINTF("no rows\n");
    552  1.1  yamt 			return ENOENT;
    553  1.1  yamt 		}
    554  1.1  yamt 		assert(s->nrows >= 1);
    555  1.1  yamt 		s->cur = 0;
    556  1.1  yamt 	}
    557  1.1  yamt 	for (i = 0; i < n; i++) {
    558  1.1  yamt 		size_t size;
    559  1.1  yamt 
    560  1.1  yamt 		assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0));
    561  1.1  yamt 		DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n",
    562  1.1  yamt 		    i, PQftype(s->res, i), types[i],
    563  1.1  yamt 		    PQgetisnull(s->res, s->cur, i) ? "<NULL>" :
    564  1.1  yamt 		    PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) :
    565  1.1  yamt 		    "<BINARY>");
    566  1.1  yamt 		assert(PQftype(s->res, i) == types[i]);
    567  1.1  yamt 		assert(!PQgetisnull(s->res, s->cur, i));
    568  1.1  yamt 		switch(types[i]) {
    569  1.1  yamt 		case INT8OID:
    570  1.1  yamt 			*va_arg(ap, int64_t *) =
    571  1.1  yamt 			    getint(PQgetvalue(s->res, s->cur, i));
    572  1.1  yamt 			break;
    573  1.1  yamt 		case OIDOID:
    574  1.1  yamt 			*va_arg(ap, Oid *) =
    575  1.1  yamt 			    getint(PQgetvalue(s->res, s->cur, i));
    576  1.1  yamt 			break;
    577  1.1  yamt 		case INT4OID:
    578  1.1  yamt 			*va_arg(ap, int32_t *) =
    579  1.1  yamt 			    getint(PQgetvalue(s->res, s->cur, i));
    580  1.1  yamt 			break;
    581  1.1  yamt 		case TEXTOID:
    582  1.1  yamt 			*va_arg(ap, char **) =
    583  1.1  yamt 			    estrdup(PQgetvalue(s->res, s->cur, i));
    584  1.1  yamt 			break;
    585  1.1  yamt 		case BYTEA:
    586  1.1  yamt 			size = PQgetlength(s->res, s->cur, i);
    587  1.1  yamt 			memcpy(va_arg(ap, void *),
    588  1.1  yamt 			    PQgetvalue(s->res, s->cur, i), size);
    589  1.1  yamt 			*va_arg(ap, size_t *) = size;
    590  1.1  yamt 			break;
    591  1.1  yamt 		default:
    592  1.1  yamt 			errx(EXIT_FAILURE, "%s unknown type %u", __func__,
    593  1.1  yamt 			    types[i]);
    594  1.1  yamt 		}
    595  1.1  yamt 	}
    596  1.1  yamt 	s->cur++;
    597  1.1  yamt 	if (s->cur == s->nrows) {
    598  1.1  yamt 		PQclear(s->res);
    599  1.1  yamt 		s->res = NULL;
    600  1.1  yamt 	}
    601  1.1  yamt 	return 0;
    602  1.1  yamt }
    603  1.1  yamt 
    604  1.1  yamt int
    605  1.1  yamt fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...)
    606  1.1  yamt {
    607  1.1  yamt 	va_list ap;
    608  1.1  yamt 	int error;
    609  1.1  yamt 
    610  1.1  yamt 	va_start(ap, types);
    611  1.1  yamt 	error = vfetchnext(s, n, types, ap);
    612  1.1  yamt 	va_end(ap);
    613  1.1  yamt 	return error;
    614  1.1  yamt }
    615  1.1  yamt 
    616  1.1  yamt void
    617  1.1  yamt fetchdone(struct fetchstatus *s)
    618  1.1  yamt {
    619  1.1  yamt 
    620  1.1  yamt 	if (s->res != NULL) {
    621  1.1  yamt 		PQclear(s->res);
    622  1.1  yamt 		s->res = NULL;
    623  1.1  yamt 	}
    624  1.1  yamt 	if (!s->done) {
    625  1.1  yamt 		PGresult *res;
    626  1.1  yamt 		unsigned int n;
    627  1.1  yamt 
    628  1.1  yamt 		n = 0;
    629  1.1  yamt 		while ((res = PQgetResult(s->xc->conn)) != NULL) {
    630  1.1  yamt 			PQclear(res);
    631  1.1  yamt 			n++;
    632  1.1  yamt 		}
    633  1.1  yamt 		if (n > 0) {
    634  1.1  yamt 			DPRINTF("%u rows dropped\n", n);
    635  1.1  yamt 		}
    636  1.1  yamt 	}
    637  1.1  yamt }
    638  1.1  yamt 
    639  1.1  yamt int
    640  1.1  yamt simplefetch(struct Xconn *xc, Oid type, ...)
    641  1.1  yamt {
    642  1.1  yamt 	struct fetchstatus s;
    643  1.1  yamt 	va_list ap;
    644  1.1  yamt 	int error;
    645  1.1  yamt 
    646  1.1  yamt 	fetchinit(&s, xc);
    647  1.1  yamt 	va_start(ap, type);
    648  1.1  yamt 	error = vfetchnext(&s, 1, &type, ap);
    649  1.1  yamt 	va_end(ap);
    650  1.1  yamt 	assert(error != 0 || s.res == NULL);
    651  1.1  yamt 	fetchdone(&s);
    652  1.1  yamt 	return error;
    653  1.1  yamt }
    654  1.1  yamt 
    655  1.2  yamt static void
    656  1.2  yamt setlabel(struct Xconn *xc, const char *label)
    657  1.2  yamt {
    658  1.2  yamt 	int error;
    659  1.2  yamt 
    660  1.2  yamt 	/*
    661  1.2  yamt 	 * put the label into application_name so that it's shown in
    662  1.2  yamt 	 * pg_stat_activity.  we are sure that our labels don't need
    663  1.2  yamt 	 * PQescapeStringConn.
    664  1.2  yamt 	 *
    665  1.2  yamt 	 * example:
    666  1.2  yamt 	 *	SELECT pid,application_name,query FROM pg_stat_activity
    667  1.2  yamt 	 *	WHERE state <> 'idle'
    668  1.2  yamt 	 */
    669  1.2  yamt 
    670  1.2  yamt 	if (label != NULL) {
    671  1.2  yamt 		struct cmd *c;
    672  1.2  yamt 		char cmd_str[1024];
    673  1.2  yamt 
    674  1.2  yamt 		snprintf(cmd_str, sizeof(cmd_str),
    675  1.2  yamt 		    "SET application_name TO 'pgfs: %s'", label);
    676  1.2  yamt 		c = createcmd(cmd_str, CMD_NOPREPARE);
    677  1.2  yamt 		error = simplecmd(xc, c);
    678  1.2  yamt 		freecmd(c);
    679  1.2  yamt 		assert(error == 0);
    680  1.2  yamt 	} else {
    681  1.2  yamt #if 0 /* don't bother to clear label */
    682  1.2  yamt 		static struct cmd *c;
    683  1.2  yamt 
    684  1.2  yamt 		CREATECMD_NOPARAM(c, "SET application_name TO 'pgfs'");
    685  1.2  yamt 		error = simplecmd(xc, c);
    686  1.2  yamt 		assert(error == 0);
    687  1.2  yamt #endif
    688  1.2  yamt 	}
    689  1.2  yamt }
    690  1.2  yamt 
    691  1.1  yamt struct Xconn *
    692  1.2  yamt begin(struct puffs_usermount *pu, const char *label)
    693  1.1  yamt {
    694  1.1  yamt 	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
    695  1.1  yamt 	static struct cmd *c;
    696  1.1  yamt 	int error;
    697  1.1  yamt 
    698  1.2  yamt 	setlabel(xc, label);
    699  1.1  yamt 	CREATECMD_NOPARAM(c, "BEGIN");
    700  1.1  yamt 	assert(!xc->in_trans);
    701  1.1  yamt 	error = simplecmd(xc, c);
    702  1.1  yamt 	assert(error == 0);
    703  1.1  yamt 	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
    704  1.1  yamt 	xc->in_trans = true;
    705  1.1  yamt 	return xc;
    706  1.1  yamt }
    707  1.1  yamt 
    708  1.1  yamt struct Xconn *
    709  1.2  yamt begin_readonly(struct puffs_usermount *pu, const char *label)
    710  1.1  yamt {
    711  1.1  yamt 	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
    712  1.1  yamt 	static struct cmd *c;
    713  1.1  yamt 	int error;
    714  1.1  yamt 
    715  1.2  yamt 	setlabel(xc, label);
    716  1.1  yamt 	CREATECMD_NOPARAM(c, "BEGIN READ ONLY");
    717  1.1  yamt 	assert(!xc->in_trans);
    718  1.1  yamt 	error = simplecmd(xc, c);
    719  1.1  yamt 	assert(error == 0);
    720  1.1  yamt 	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
    721  1.1  yamt 	xc->in_trans = true;
    722  1.1  yamt 	return xc;
    723  1.1  yamt }
    724  1.1  yamt 
    725  1.1  yamt void
    726  1.1  yamt rollback(struct Xconn *xc)
    727  1.1  yamt {
    728  1.1  yamt 	PGTransactionStatusType status;
    729  1.1  yamt 
    730  1.1  yamt 	/*
    731  1.1  yamt 	 * check the status as we are not sure the status of our transaction
    732  1.1  yamt 	 * after a failed commit.
    733  1.1  yamt 	 */
    734  1.1  yamt 	status = PQtransactionStatus(xc->conn);
    735  1.1  yamt 	assert(status != PQTRANS_ACTIVE);
    736  1.1  yamt 	assert(status != PQTRANS_UNKNOWN);
    737  1.1  yamt 	if (status != PQTRANS_IDLE) {
    738  1.1  yamt 		static struct cmd *c;
    739  1.1  yamt 		int error;
    740  1.1  yamt 
    741  1.1  yamt 		assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR);
    742  1.1  yamt 		CREATECMD_NOPARAM(c, "ROLLBACK");
    743  1.1  yamt 		error = simplecmd(xc, c);
    744  1.1  yamt 		assert(error == 0);
    745  1.1  yamt 	}
    746  1.1  yamt 	DPRINTF("xc %p rollback %p\n", xc, xc->owner);
    747  1.2  yamt 	setlabel(xc, NULL);
    748  1.1  yamt 	relxc(xc);
    749  1.1  yamt }
    750  1.1  yamt 
    751  1.1  yamt int
    752  1.1  yamt commit(struct Xconn *xc)
    753  1.1  yamt {
    754  1.1  yamt 	static struct cmd *c;
    755  1.1  yamt 	int error;
    756  1.1  yamt 
    757  1.1  yamt 	CREATECMD_NOPARAM(c, "COMMIT");
    758  1.1  yamt 	error = simplecmd(xc, c);
    759  1.2  yamt 	setlabel(xc, NULL);
    760  1.1  yamt 	if (error == 0) {
    761  1.1  yamt 		DPRINTF("xc %p commit %p\n", xc, xc->owner);
    762  1.1  yamt 		relxc(xc);
    763  1.1  yamt 	}
    764  1.1  yamt 	return error;
    765  1.1  yamt }
    766  1.1  yamt 
    767  1.1  yamt int
    768  1.1  yamt commit_sync(struct Xconn *xc)
    769  1.1  yamt {
    770  1.1  yamt 	static struct cmd *c;
    771  1.1  yamt 	int error;
    772  1.1  yamt 
    773  1.1  yamt 	assert(!pgfs_dosync);
    774  1.1  yamt 	CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON");
    775  1.1  yamt 	error = simplecmd(xc, c);
    776  1.1  yamt 	assert(error == 0);
    777  1.1  yamt 	return commit(xc);
    778  1.1  yamt }
    779  1.1  yamt 
    780  1.1  yamt static void
    781  1.1  yamt pgfs_notice_receiver(void *vp, const PGresult *res)
    782  1.1  yamt {
    783  1.1  yamt 	struct Xconn *xc = vp;
    784  1.1  yamt 
    785  1.1  yamt 	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR);
    786  1.1  yamt 	fprintf(stderr, "got a notice on %p\n", xc);
    787  1.1  yamt 	dumperror(xc, res);
    788  1.1  yamt }
    789  1.1  yamt 
    790  1.1  yamt static int
    791  1.1  yamt pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf,
    792  1.1  yamt     int fd, int *done)
    793  1.1  yamt {
    794  1.1  yamt 	struct Xconn *xc;
    795  1.1  yamt 	PGconn *conn;
    796  1.1  yamt 
    797  1.1  yamt 	TAILQ_FOREACH(xc, &xclist, list) {
    798  1.1  yamt 		if (PQsocket(xc->conn) == fd) {
    799  1.1  yamt 			break;
    800  1.1  yamt 		}
    801  1.1  yamt 	}
    802  1.1  yamt 	assert(xc != NULL);
    803  1.1  yamt 	conn = xc->conn;
    804  1.1  yamt 	PQconsumeInput(conn);
    805  1.1  yamt 	if (!PQisBusy(conn)) {
    806  1.1  yamt 		if (xc->blocker != NULL) {
    807  1.1  yamt 			DPRINTF("schedule %p\n", xc->blocker);
    808  1.1  yamt 			puffs_cc_schedule(xc->blocker);
    809  1.1  yamt 		} else {
    810  1.1  yamt 			DPRINTF("no blockers\n");
    811  1.1  yamt 		}
    812  1.1  yamt 	}
    813  1.1  yamt 	*done = 0;
    814  1.1  yamt 	return 0;
    815  1.1  yamt }
    816  1.1  yamt 
    817  1.1  yamt int
    818  1.1  yamt pgfs_connectdb(struct puffs_usermount *pu, const char *dbname,
    819  1.1  yamt     const char *dbuser, bool debug, bool synchronous, unsigned int nconn)
    820  1.1  yamt {
    821  1.1  yamt 	const char *keywords[3+1];
    822  1.1  yamt 	const char *values[3];
    823  1.1  yamt 	unsigned int i;
    824  1.1  yamt 
    825  1.1  yamt 	if (nconn > 32) {
    826  1.1  yamt 		/*
    827  1.1  yamt 		 * limit from sizeof(cmd->prepared_mask)
    828  1.1  yamt 		 */
    829  1.1  yamt 		return EINVAL;
    830  1.1  yamt 	}
    831  1.1  yamt 	if (debug) {
    832  1.1  yamt 		pgfs_dodprintf = true;
    833  1.1  yamt 	}
    834  1.1  yamt 	if (synchronous) {
    835  1.1  yamt 		pgfs_dosync = true;
    836  1.1  yamt 	}
    837  1.1  yamt 	i = 0;
    838  1.1  yamt 	if (dbname != NULL) {
    839  1.1  yamt 		keywords[i] = "dbname";
    840  1.1  yamt 		values[i] = dbname;
    841  1.1  yamt 		i++;
    842  1.1  yamt 	}
    843  1.1  yamt 	if (dbuser != NULL) {
    844  1.1  yamt 		keywords[i] = "user";
    845  1.1  yamt 		values[i] = dbuser;
    846  1.1  yamt 		i++;
    847  1.1  yamt 	}
    848  1.1  yamt 	keywords[i] = "application_name";
    849  1.1  yamt 	values[i] = "pgfs";
    850  1.1  yamt 	i++;
    851  1.1  yamt 	keywords[i] = NULL;
    852  1.1  yamt 	puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL);
    853  1.1  yamt 	for (i = 0; i < nconn; i++) {
    854  1.1  yamt 		struct Xconn *xc;
    855  1.1  yamt 		struct Xconn *xc2;
    856  1.1  yamt 		static int xcid;
    857  1.1  yamt 		PGconn *conn;
    858  1.1  yamt 		struct cmd *c;
    859  1.1  yamt 		int error;
    860  1.1  yamt 
    861  1.1  yamt 		conn = PQconnectdbParams(keywords, values, 0);
    862  1.1  yamt 		if (conn == NULL) {
    863  1.1  yamt 			errx(EXIT_FAILURE,
    864  1.1  yamt 			    "PQconnectdbParams: unknown failure");
    865  1.1  yamt 		}
    866  1.1  yamt 		if (PQstatus(conn) != CONNECTION_OK) {
    867  1.1  yamt 			/*
    868  1.1  yamt 			 * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW
    869  1.1  yamt 			 */
    870  1.1  yamt 			errx(EXIT_FAILURE, "PQconnectdbParams: %s",
    871  1.1  yamt 			    PQerrorMessage(conn));
    872  1.1  yamt 		}
    873  1.1  yamt 		DPRINTF("protocol version %d\n", PQprotocolVersion(conn));
    874  1.1  yamt 		puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ);
    875  1.1  yamt 		xc = emalloc(sizeof(*xc));
    876  1.1  yamt 		xc->conn = conn;
    877  1.1  yamt 		xc->blocker = NULL;
    878  1.1  yamt 		xc->owner = NULL;
    879  1.1  yamt 		xc->in_trans = false;
    880  1.1  yamt 		xc->id = xcid++;
    881  1.1  yamt 		assert(xc->id < 32);
    882  1.1  yamt 		PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc);
    883  1.1  yamt 		TAILQ_INSERT_HEAD(&xclist, xc, list);
    884  1.2  yamt 		xc2 = begin(pu, NULL);
    885  1.1  yamt 		assert(xc2 == xc);
    886  1.1  yamt 		c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE);
    887  1.1  yamt 		error = simplecmd(xc, c);
    888  1.1  yamt 		assert(error == 0);
    889  1.1  yamt 		freecmd(c);
    890  1.1  yamt 		c = createcmd("SET SESSION CHARACTERISTICS AS "
    891  1.1  yamt 		    "TRANSACTION ISOLATION LEVEL REPEATABLE READ",
    892  1.1  yamt 		    CMD_NOPREPARE);
    893  1.1  yamt 		error = simplecmd(xc, c);
    894  1.1  yamt 		assert(error == 0);
    895  1.1  yamt 		freecmd(c);
    896  1.1  yamt 		c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE);
    897  1.1  yamt 		error = simplecmd(xc, c);
    898  1.1  yamt 		assert(error == 0);
    899  1.1  yamt 		freecmd(c);
    900  1.1  yamt 		if (!pgfs_dosync) {
    901  1.1  yamt 			c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF",
    902  1.1  yamt 			    CMD_NOPREPARE);
    903  1.1  yamt 			error = simplecmd(xc, c);
    904  1.1  yamt 			assert(error == 0);
    905  1.1  yamt 			freecmd(c);
    906  1.1  yamt 		}
    907  1.1  yamt 		if (debug) {
    908  1.1  yamt 			struct fetchstatus s;
    909  1.1  yamt 			static const Oid types[] = { INT8OID, };
    910  1.1  yamt 			uint64_t pid;
    911  1.1  yamt 
    912  1.1  yamt 			c = createcmd("SELECT pg_backend_pid()::int8;",
    913  1.1  yamt 			    CMD_NOPREPARE);
    914  1.1  yamt 			error = sendcmd(xc, c);
    915  1.1  yamt 			assert(error == 0);
    916  1.1  yamt 			fetchinit(&s, xc);
    917  1.1  yamt 			error = FETCHNEXT(&s, types, &pid);
    918  1.1  yamt 			fetchdone(&s);
    919  1.1  yamt 			assert(error == 0);
    920  1.1  yamt 			DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid);
    921  1.1  yamt 		}
    922  1.1  yamt 		error = commit(xc);
    923  1.1  yamt 		assert(error == 0);
    924  1.1  yamt 		assert(xc->owner == NULL);
    925  1.1  yamt 	}
    926  1.1  yamt 	/*
    927  1.1  yamt 	 * XXX cleanup unlinked files here?  what to do when the filesystem
    928  1.1  yamt 	 * is shared?
    929  1.1  yamt 	 */
    930  1.1  yamt 	return 0;
    931  1.1  yamt }
    932  1.1  yamt 
    933  1.1  yamt struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq);
    934  1.1  yamt struct puffs_cc *flusher = NULL;
    935  1.1  yamt 
    936  1.1  yamt int
    937  1.1  yamt flush_xacts(struct puffs_usermount *pu)
    938  1.1  yamt {
    939  1.1  yamt 	struct puffs_cc *cc = puffs_cc_getcc(pu);
    940  1.1  yamt 	struct Xconn *xc;
    941  1.1  yamt 	static struct cmd *c;
    942  1.1  yamt 	uint64_t dummy;
    943  1.1  yamt 	int error;
    944  1.1  yamt 
    945  1.1  yamt 	/*
    946  1.1  yamt 	 * flush all previously issued asynchronous transactions.
    947  1.1  yamt 	 *
    948  1.1  yamt 	 * XXX
    949  1.1  yamt 	 * unfortunately it seems that there is no clean way to tell
    950  1.1  yamt 	 * PostgreSQL flush XLOG.  we could perform a CHECKPOINT but it's
    951  1.1  yamt 	 * too expensive and overkill for our purpose.
    952  1.1  yamt 	 * besides, PostgreSQL has an optimization to skip XLOG flushing
    953  1.1  yamt 	 * for transactions which didn't produce WAL records.
    954  1.1  yamt 	 * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2)
    955  1.1  yamt 	 * it means that an empty transaction ("BEGIN; COMMIT;"), which
    956  1.1  yamt 	 * doesn't produce any WAL records, doesn't flush the XLOG even if
    957  1.1  yamt 	 * synchronous_commit=on.  we issues a dummy setval() to avoid the
    958  1.1  yamt 	 * optimization.
    959  1.1  yamt 	 * on the other hand, we try to avoid creating unnecessary WAL activity
    960  1.1  yamt 	 * by serializing flushing and checking XLOG locations.
    961  1.1  yamt 	 */
    962  1.1  yamt 
    963  1.1  yamt 	assert(!pgfs_dosync);
    964  1.1  yamt 	if (flusher != NULL) { /* serialize flushers */
    965  1.1  yamt 		DPRINTF("%p flush in progress %p\n", cc, flusher);
    966  1.1  yamt 		waiton(&flushwaitq, cc);
    967  1.1  yamt 		assert(flusher == NULL);
    968  1.1  yamt 	}
    969  1.1  yamt 	DPRINTF("%p start flushing\n", cc);
    970  1.1  yamt 	flusher = cc;
    971  1.1  yamt retry:
    972  1.2  yamt 	xc = begin(pu, "flush");
    973  1.1  yamt 	CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE "
    974  1.1  yamt 	    "pg_current_xlog_insert_location() <> pg_current_xlog_location()");
    975  1.1  yamt 	error = sendcmd(xc, c);
    976  1.1  yamt 	if (error != 0) {
    977  1.1  yamt 		goto got_error;
    978  1.1  yamt 	}
    979  1.1  yamt 	error = simplefetch(xc, INT8OID, &dummy);
    980  1.1  yamt 	assert(error != 0 || dummy == 1);
    981  1.1  yamt 	if (error == ENOENT) {
    982  1.1  yamt 		/*
    983  1.1  yamt 		 * there seems to be nothing to flush.
    984  1.1  yamt 		 */
    985  1.1  yamt 		DPRINTF("%p no sync\n", cc);
    986  1.1  yamt 		error = 0;
    987  1.1  yamt 	}
    988  1.1  yamt 	if (error != 0) {
    989  1.1  yamt 		goto got_error;
    990  1.1  yamt 	}
    991  1.1  yamt 	error = commit_sync(xc);
    992  1.1  yamt 	if (error != 0) {
    993  1.1  yamt 		goto got_error;
    994  1.1  yamt 	}
    995  1.1  yamt 	goto done;
    996  1.1  yamt got_error:
    997  1.1  yamt 	rollback(xc);
    998  1.1  yamt 	if (error == EAGAIN) {
    999  1.1  yamt 		goto retry;
   1000  1.1  yamt 	}
   1001  1.1  yamt done:
   1002  1.1  yamt 	assert(flusher == cc);
   1003  1.1  yamt 	flusher = NULL;
   1004  1.1  yamt 	wakeup_one(&flushwaitq);
   1005  1.1  yamt 	DPRINTF("%p end flushing error=%d\n", cc, error);
   1006  1.1  yamt 	return error;
   1007  1.1  yamt }
   1008