Home | History | Annotate | Line # | Download | only in pgfs
      1  1.3  yamt /*	$NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $	*/
      2  1.1  yamt 
      3  1.1  yamt /*-
      4  1.1  yamt  * Copyright (c)2010,2011 YAMAMOTO Takashi,
      5  1.1  yamt  * All rights reserved.
      6  1.1  yamt  *
      7  1.1  yamt  * Redistribution and use in source and binary forms, with or without
      8  1.1  yamt  * modification, are permitted provided that the following conditions
      9  1.1  yamt  * are met:
     10  1.1  yamt  * 1. Redistributions of source code must retain the above copyright
     11  1.1  yamt  *    notice, this list of conditions and the following disclaimer.
     12  1.1  yamt  * 2. Redistributions in binary form must reproduce the above copyright
     13  1.1  yamt  *    notice, this list of conditions and the following disclaimer in the
     14  1.1  yamt  *    documentation and/or other materials provided with the distribution.
     15  1.1  yamt  *
     16  1.1  yamt  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
     17  1.1  yamt  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     18  1.1  yamt  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     19  1.1  yamt  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     20  1.1  yamt  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     21  1.1  yamt  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
     22  1.1  yamt  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     23  1.1  yamt  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     24  1.1  yamt  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     25  1.1  yamt  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     26  1.1  yamt  * SUCH DAMAGE.
     27  1.1  yamt  */
     28  1.1  yamt 
     29  1.1  yamt /*
     30  1.1  yamt  * backend db operations
     31  1.1  yamt  */
     32  1.1  yamt 
     33  1.1  yamt #include <sys/cdefs.h>
     34  1.1  yamt #ifndef lint
     35  1.3  yamt __RCSID("$NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $");
     36  1.1  yamt #endif /* not lint */
     37  1.1  yamt 
     38  1.1  yamt #include <assert.h>
     39  1.1  yamt #include <err.h>
     40  1.1  yamt #include <errno.h>
     41  1.1  yamt #include <inttypes.h>
     42  1.1  yamt #include <puffs.h>
     43  1.1  yamt #include <stdbool.h>
     44  1.1  yamt #include <stdarg.h>
     45  1.1  yamt #include <stdio.h>
     46  1.1  yamt #include <stdlib.h>
     47  1.1  yamt #include <util.h>
     48  1.1  yamt 
     49  1.1  yamt #include <libpq-fe.h>
     50  1.1  yamt 
     51  1.1  yamt #include "pgfs_db.h"
     52  1.1  yamt #include "pgfs_waitq.h"
     53  1.1  yamt #include "pgfs_debug.h"
     54  1.1  yamt 
/*
 * pgfs_dosync: global flag, false unless changed elsewhere.
 * commit_sync() asserts that it is not set.
 */
bool pgfs_dosync = false;
     56  1.1  yamt 
     57  1.1  yamt struct Xconn {
     58  1.1  yamt 	TAILQ_ENTRY(Xconn) list;
     59  1.1  yamt 	PGconn *conn;
     60  1.1  yamt 	struct puffs_cc *blocker;
     61  1.1  yamt 	struct puffs_cc *owner;
     62  1.1  yamt 	bool in_trans;
     63  1.1  yamt 	int id;
     64  1.3  yamt 	const char *label;
     65  1.1  yamt };
     66  1.1  yamt 
     67  1.1  yamt static void
     68  1.1  yamt dumperror(struct Xconn *xc, const PGresult *res)
     69  1.1  yamt {
     70  1.1  yamt 	static const struct {
     71  1.1  yamt 		const char *name;
     72  1.1  yamt 		int code;
     73  1.1  yamt 	} fields[] = {
     74  1.1  yamt #define F(x)	{ .name = #x, .code = x, }
     75  1.1  yamt 		F(PG_DIAG_SEVERITY),
     76  1.1  yamt 		F(PG_DIAG_SQLSTATE),
     77  1.1  yamt 		F(PG_DIAG_MESSAGE_PRIMARY),
     78  1.1  yamt 		F(PG_DIAG_MESSAGE_DETAIL),
     79  1.1  yamt 		F(PG_DIAG_MESSAGE_HINT),
     80  1.1  yamt 		F(PG_DIAG_STATEMENT_POSITION),
     81  1.1  yamt 		F(PG_DIAG_INTERNAL_POSITION),
     82  1.1  yamt 		F(PG_DIAG_INTERNAL_QUERY),
     83  1.1  yamt 		F(PG_DIAG_CONTEXT),
     84  1.1  yamt 		F(PG_DIAG_SOURCE_FILE),
     85  1.1  yamt 		F(PG_DIAG_SOURCE_LINE),
     86  1.1  yamt 		F(PG_DIAG_SOURCE_FUNCTION),
     87  1.1  yamt #undef F
     88  1.1  yamt 	};
     89  1.1  yamt 	unsigned int i;
     90  1.1  yamt 
     91  1.1  yamt 	if (!pgfs_dodprintf) {
     92  1.1  yamt 		return;
     93  1.1  yamt 	}
     94  1.1  yamt 	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR ||
     95  1.1  yamt 	    PQresultStatus(res) == PGRES_FATAL_ERROR);
     96  1.1  yamt 	for (i = 0; i < __arraycount(fields); i++) {
     97  1.1  yamt 		const char *val = PQresultErrorField(res, fields[i].code);
     98  1.1  yamt 
     99  1.1  yamt 		if (val == NULL) {
    100  1.1  yamt 			continue;
    101  1.1  yamt 		}
    102  1.1  yamt 		fprintf(stderr, "%s: %s\n", fields[i].name, val);
    103  1.1  yamt 	}
    104  1.1  yamt }
    105  1.1  yamt 
    106  1.1  yamt TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist);
    107  1.1  yamt struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq);
    108  1.1  yamt 
    109  1.1  yamt static struct Xconn *
    110  1.1  yamt getxc(struct puffs_cc *cc)
    111  1.1  yamt {
    112  1.1  yamt 	struct Xconn *xc;
    113  1.1  yamt 
    114  1.1  yamt 	assert(cc != NULL);
    115  1.1  yamt retry:
    116  1.1  yamt 	TAILQ_FOREACH(xc, &xclist, list) {
    117  1.1  yamt 		if (xc->blocker == NULL) {
    118  1.1  yamt 			assert(xc->owner == NULL);
    119  1.1  yamt 			xc->owner = cc;
    120  1.1  yamt 			DPRINTF("xc %p acquire %p\n", xc, cc);
    121  1.1  yamt 			return xc;
    122  1.1  yamt 		} else {
    123  1.1  yamt 			assert(xc->owner == xc->blocker);
    124  1.1  yamt 		}
    125  1.1  yamt 	}
    126  1.1  yamt 	DPRINTF("no free conn %p\n", cc);
    127  1.1  yamt 	waiton(&xcwaitq, cc);
    128  1.1  yamt 	goto retry;
    129  1.1  yamt }
    130  1.1  yamt 
    131  1.1  yamt static void
    132  1.1  yamt relxc(struct Xconn *xc)
    133  1.1  yamt {
    134  1.1  yamt 
    135  1.1  yamt 	assert(xc->in_trans);
    136  1.1  yamt 	assert(xc->owner != NULL);
    137  1.1  yamt 	xc->in_trans = false;
    138  1.1  yamt 	xc->owner = NULL;
    139  1.1  yamt 	wakeup_one(&xcwaitq);
    140  1.1  yamt }
    141  1.1  yamt 
    142  1.1  yamt static void
    143  1.1  yamt pqwait(struct Xconn *xc)
    144  1.1  yamt {
    145  1.1  yamt 	PGconn *conn = xc->conn;
    146  1.1  yamt 	struct puffs_cc *cc = xc->owner;
    147  1.1  yamt 
    148  1.1  yamt 	if (PQflush(conn)) {
    149  1.1  yamt 		errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn));
    150  1.1  yamt 	}
    151  1.1  yamt 	if (!PQisBusy(conn)) {
    152  1.1  yamt 		return;
    153  1.1  yamt 	}
    154  1.1  yamt 	assert(xc->blocker == NULL);
    155  1.1  yamt 	xc->blocker = cc;
    156  1.1  yamt 	DPRINTF("yielding %p\n", cc);
    157  1.1  yamt 	/* XXX is it safe to yield before entering mainloop? */
    158  1.1  yamt 	puffs_cc_yield(cc);
    159  1.1  yamt 	DPRINTF("yield returned %p\n", cc);
    160  1.1  yamt 	assert(xc->owner == cc);
    161  1.1  yamt 	assert(xc->blocker == cc);
    162  1.1  yamt 	xc->blocker = NULL;
    163  1.1  yamt }
    164  1.1  yamt 
/*
 * sqltoerrno: translate a 5-character SQLSTATE code to an errno value.
 *
 * sqlstate may be NULL; PQresultErrorField() returns NULL when the
 * result carries no SQLSTATE field (e.g. errors generated inside libpq
 * itself).  such errors, as well as unknown codes, are mapped to EIO.
 *
 * returns 0 only for ERRCODE_SUCCESSFUL_COMPLETION.
 */
static int
sqltoerrno(const char *sqlstate)
{
	/*
	 * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle
	 * "tuple concurrently updated" errors for lowrite/lo_truncate.
	 *
	 * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN?
	 */
	static const struct {
		char sqlstate[5];	/* exactly 5 bytes, not NUL-terminated */
		int error;
	} map[] = {
		{ "00000", 0, },	/* ERRCODE_SUCCESSFUL_COMPLETION */
		{ "02000", ENOENT, },	/* ERRCODE_NO_DATA */
		{ "23505", EEXIST, },	/* ERRCODE_UNIQUE_VIOLATION */
		{ "23514", EINVAL, },	/* ERRCODE_CHECK_VIOLATION */
		{ "40001", EAGAIN, },	/* ERRCODE_T_R_SERIALIZATION_FAILURE */
		{ "40P01", EAGAIN, },	/* ERRCODE_T_R_DEADLOCK_DETECTED */
		{ "42704", ENOENT, },	/* ERRCODE_UNDEFINED_OBJECT */
		{ "53100", ENOSPC, },	/* ERRCODE_DISK_FULL */
		{ "53200", ENOMEM, },	/* ERRCODE_OUT_OF_MEMORY */
		{ "XX000", EAGAIN, },	/* ERRCODE_INTERNAL_ERROR */
	};
	unsigned int i;

	if (sqlstate == NULL) {
		/* don't hand a NULL pointer to memcmp below */
		DPRINTF("no sqlstate available; mapped to EIO\n");
		return EIO;
	}
	for (i = 0; i < __arraycount(map); i++) {
		if (!memcmp(map[i].sqlstate, sqlstate, 5)) {
			const int error = map[i].error;

			if (error != 0) {
				DPRINTF("sqlstate %5s mapped to error %d\n",
				    sqlstate, error);
			}
			if (error == EINVAL) {
				/*
				 * sounds like a bug.
				 */
				abort();
			}
			return error;
		}
	}
	DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate);
	return EIO;
}
    211  1.1  yamt 
    212  1.1  yamt struct cmd {
    213  1.1  yamt 	char name[32];		/* name of prepared statement */
    214  1.1  yamt 	char *cmd;		/* query string */
    215  1.1  yamt 	unsigned int nparams;
    216  1.1  yamt 	Oid *paramtypes;
    217  1.1  yamt 	uint32_t prepared_mask;	/* for which connections this is prepared? */
    218  1.1  yamt 	unsigned int flags;	/* CMD_ flags */
    219  1.1  yamt };
    220  1.1  yamt 
    221  1.1  yamt #define	CMD_NOPREPARE	1	/* don't prepare this command */
    222  1.1  yamt 
    223  1.1  yamt struct cmd *
    224  1.1  yamt createcmd(const char *cmd, unsigned int flags, ...)
    225  1.1  yamt {
    226  1.1  yamt 	struct cmd *c;
    227  1.1  yamt 	va_list ap;
    228  1.1  yamt 	const char *cp;
    229  1.1  yamt 	unsigned int i;
    230  1.1  yamt 	static unsigned int cmdid;
    231  1.1  yamt 
    232  1.1  yamt 	c = emalloc(sizeof(*c));
    233  1.1  yamt 	c->cmd = estrdup(cmd);
    234  1.1  yamt 	c->nparams = 0;
    235  1.1  yamt 	va_start(ap, flags);
    236  1.1  yamt 	for (cp = cmd; *cp != 0; cp++) {
    237  1.1  yamt 		if (*cp == '$') { /* XXX */
    238  1.1  yamt 			c->nparams++;
    239  1.1  yamt 		}
    240  1.1  yamt 	}
    241  1.1  yamt 	c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes));
    242  1.1  yamt 	for (i = 0; i < c->nparams; i++) {
    243  1.1  yamt 		Oid type = va_arg(ap, Oid);
    244  1.1  yamt 		assert(type == BYTEA ||
    245  1.1  yamt 		    type == INT4OID || type == INT8OID || type == OIDOID ||
    246  1.1  yamt 		    type == TEXTOID || type == TIMESTAMPTZOID);
    247  1.1  yamt 		c->paramtypes[i] = type;
    248  1.1  yamt 	}
    249  1.1  yamt 	va_end(ap);
    250  1.1  yamt 	snprintf(c->name, sizeof(c->name), "%u", cmdid++);
    251  1.1  yamt 	if ((flags & CMD_NOPREPARE) != 0) {
    252  1.1  yamt 		c->prepared_mask = ~0;
    253  1.1  yamt 	} else {
    254  1.1  yamt 		c->prepared_mask = 0;
    255  1.1  yamt 	}
    256  1.1  yamt 	c->flags = flags;
    257  1.1  yamt 	return c;
    258  1.1  yamt }
    259  1.1  yamt 
    260  1.1  yamt static void
    261  1.1  yamt freecmd(struct cmd *c)
    262  1.1  yamt {
    263  1.1  yamt 
    264  1.1  yamt 	free(c->paramtypes);
    265  1.1  yamt 	free(c->cmd);
    266  1.1  yamt 	free(c);
    267  1.1  yamt }
    268  1.1  yamt 
    269  1.1  yamt static int
    270  1.1  yamt fetch_noresult(struct Xconn *xc)
    271  1.1  yamt {
    272  1.1  yamt 	PGresult *res;
    273  1.1  yamt 	ExecStatusType status;
    274  1.1  yamt 	PGconn *conn = xc->conn;
    275  1.1  yamt 	int error;
    276  1.1  yamt 
    277  1.1  yamt 	pqwait(xc);
    278  1.1  yamt 	res = PQgetResult(conn);
    279  1.1  yamt 	if (res == NULL) {
    280  1.1  yamt 		return ENOENT;
    281  1.1  yamt 	}
    282  1.1  yamt 	status = PQresultStatus(res);
    283  1.1  yamt 	if (status == PGRES_COMMAND_OK) {
    284  1.1  yamt 		assert(PQnfields(res) == 0);
    285  1.1  yamt 		assert(PQntuples(res) == 0);
    286  1.1  yamt 		if (!strcmp(PQcmdTuples(res), "0")) {
    287  1.1  yamt 			error = ENOENT;
    288  1.1  yamt 		} else {
    289  1.1  yamt 			error = 0;
    290  1.1  yamt 		}
    291  1.1  yamt 	} else if (status == PGRES_FATAL_ERROR) {
    292  1.1  yamt 		error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE));
    293  1.1  yamt 		assert(error != 0);
    294  1.1  yamt 		dumperror(xc, res);
    295  1.1  yamt 	} else {
    296  1.1  yamt 		errx(1, "%s not command_ok: %d: %s", __func__,
    297  1.1  yamt 		    (int)status,
    298  1.1  yamt 		    PQerrorMessage(conn));
    299  1.1  yamt 	}
    300  1.1  yamt 	PQclear(res);
    301  1.1  yamt 	res = PQgetResult(conn);
    302  1.1  yamt 	assert(res == NULL);
    303  1.1  yamt 	if (error != 0) {
    304  1.1  yamt 		DPRINTF("error %d\n", error);
    305  1.1  yamt 	}
    306  1.1  yamt 	return error;
    307  1.1  yamt }
    308  1.1  yamt 
    309  1.1  yamt static int
    310  1.1  yamt preparecmd(struct Xconn *xc, struct cmd *c)
    311  1.1  yamt {
    312  1.1  yamt 	PGconn *conn = xc->conn;
    313  1.1  yamt 	const uint32_t mask = 1 << xc->id;
    314  1.1  yamt 	int error;
    315  1.1  yamt 	int ret;
    316  1.1  yamt 
    317  1.1  yamt 	if ((c->prepared_mask & mask) != 0) {
    318  1.1  yamt 		return 0;
    319  1.1  yamt 	}
    320  1.1  yamt 	assert((c->flags & CMD_NOPREPARE) == 0);
    321  1.1  yamt 	DPRINTF("PREPARE: '%s'\n", c->cmd);
    322  1.1  yamt 	ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes);
    323  1.1  yamt 	if (!ret) {
    324  1.1  yamt 		errx(EXIT_FAILURE, "PQsendPrepare: %s",
    325  1.1  yamt 		    PQerrorMessage(conn));
    326  1.1  yamt 	}
    327  1.1  yamt 	error = fetch_noresult(xc);
    328  1.1  yamt 	if (error != 0) {
    329  1.1  yamt 		return error;
    330  1.1  yamt 	}
    331  1.1  yamt 	c->prepared_mask |= mask;
    332  1.1  yamt 	return 0;
    333  1.1  yamt }
    334  1.1  yamt 
    335  1.1  yamt /*
    336  1.1  yamt  * vsendcmd:
    337  1.1  yamt  *
    338  1.1  yamt  * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared.
    339  1.1  yamt  * 0 for text and 1 for binary.
    340  1.1  yamt  */
    341  1.1  yamt 
    342  1.1  yamt static int
    343  1.1  yamt vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap)
    344  1.1  yamt {
    345  1.1  yamt 	PGconn *conn = xc->conn;
    346  1.1  yamt 	char **paramvalues;
    347  1.1  yamt 	int *paramlengths;
    348  1.1  yamt 	int *paramformats;
    349  1.1  yamt 	unsigned int i;
    350  1.1  yamt 	int error;
    351  1.1  yamt 	int ret;
    352  1.1  yamt 
    353  1.1  yamt 	assert(xc->owner != NULL);
    354  1.1  yamt 	assert(xc->blocker == NULL);
    355  1.1  yamt 	error = preparecmd(xc, c);
    356  1.1  yamt 	if (error != 0) {
    357  1.1  yamt 		return error;
    358  1.1  yamt 	}
    359  1.1  yamt 	paramvalues = emalloc(c->nparams * sizeof(*paramvalues));
    360  1.1  yamt 	paramlengths = NULL;
    361  1.1  yamt 	paramformats = NULL;
    362  1.1  yamt 	DPRINTF("CMD: '%s'\n", c->cmd);
    363  1.1  yamt 	for (i = 0; i < c->nparams; i++) {
    364  1.1  yamt 		Oid type = c->paramtypes[i];
    365  1.1  yamt 		char tmpstore[1024];
    366  1.1  yamt 		const char *buf = NULL;
    367  1.1  yamt 		intmax_t v = 0; /* XXXgcc */
    368  1.1  yamt 		int sz;
    369  1.1  yamt 		bool binary = false;
    370  1.1  yamt 
    371  1.1  yamt 		switch (type) {
    372  1.1  yamt 		case BYTEA:
    373  1.1  yamt 			buf = va_arg(ap, const void *);
    374  1.1  yamt 			sz = (int)va_arg(ap, size_t);
    375  1.1  yamt 			binary = true;
    376  1.1  yamt 			break;
    377  1.1  yamt 		case INT8OID:
    378  1.1  yamt 		case OIDOID:
    379  1.1  yamt 		case INT4OID:
    380  1.1  yamt 			switch (type) {
    381  1.1  yamt 			case INT8OID:
    382  1.1  yamt 				v = (intmax_t)va_arg(ap, int64_t);
    383  1.1  yamt 				break;
    384  1.1  yamt 			case OIDOID:
    385  1.1  yamt 				v = (intmax_t)va_arg(ap, Oid);
    386  1.1  yamt 				break;
    387  1.1  yamt 			case INT4OID:
    388  1.1  yamt 				v = (intmax_t)va_arg(ap, int32_t);
    389  1.1  yamt 				break;
    390  1.1  yamt 			default:
    391  1.1  yamt 				errx(EXIT_FAILURE, "unknown integer oid %u",
    392  1.1  yamt 				    type);
    393  1.1  yamt 			}
    394  1.1  yamt 			buf = tmpstore;
    395  1.1  yamt 			sz = snprintf(tmpstore, sizeof(tmpstore),
    396  1.1  yamt 			    "%jd", v);
    397  1.1  yamt 			assert(sz != -1);
    398  1.1  yamt 			assert((size_t)sz < sizeof(tmpstore));
    399  1.1  yamt 			sz += 1;
    400  1.1  yamt 			break;
    401  1.1  yamt 		case TEXTOID:
    402  1.1  yamt 		case TIMESTAMPTZOID:
    403  1.1  yamt 			buf = va_arg(ap, char *);
    404  1.1  yamt 			sz = strlen(buf) + 1;
    405  1.1  yamt 			break;
    406  1.1  yamt 		default:
    407  1.1  yamt 			errx(EXIT_FAILURE, "%s: unknown param type %u",
    408  1.1  yamt 			    __func__, type);
    409  1.1  yamt 		}
    410  1.1  yamt 		if (binary) {
    411  1.1  yamt 			if (paramlengths == NULL) {
    412  1.1  yamt 				paramlengths =
    413  1.1  yamt 				    emalloc(c->nparams * sizeof(*paramformats));
    414  1.1  yamt 			}
    415  1.1  yamt 			if (paramformats == NULL) {
    416  1.1  yamt 				paramformats = ecalloc(1,
    417  1.1  yamt 				    c->nparams * sizeof(*paramformats));
    418  1.1  yamt 			}
    419  1.1  yamt 			paramformats[i] = 1;
    420  1.1  yamt 			paramlengths[i] = sz;
    421  1.1  yamt 		}
    422  1.1  yamt 		paramvalues[i] = emalloc(sz);
    423  1.1  yamt 		memcpy(paramvalues[i], buf, sz);
    424  1.1  yamt 		if (binary) {
    425  1.1  yamt 			DPRINTF("\t[%u]=<BINARY>\n", i);
    426  1.1  yamt 		} else {
    427  1.1  yamt 			DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]);
    428  1.1  yamt 		}
    429  1.1  yamt 	}
    430  1.1  yamt 	if ((c->flags & CMD_NOPREPARE) != 0) {
    431  1.1  yamt 		ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes,
    432  1.1  yamt 		    (const char * const *)paramvalues, paramlengths,
    433  1.1  yamt 		    paramformats, resultmode);
    434  1.1  yamt 	} else {
    435  1.1  yamt 		ret = PQsendQueryPrepared(conn, c->name, c->nparams,
    436  1.1  yamt 		    (const char * const *)paramvalues, paramlengths,
    437  1.1  yamt 		    paramformats, resultmode);
    438  1.1  yamt 	}
    439  1.1  yamt 	for (i = 0; i < c->nparams; i++) {
    440  1.1  yamt 		free(paramvalues[i]);
    441  1.1  yamt 	}
    442  1.1  yamt 	free(paramvalues);
    443  1.1  yamt 	free(paramlengths);
    444  1.1  yamt 	free(paramformats);
    445  1.1  yamt 	if (!ret) {
    446  1.1  yamt 		errx(EXIT_FAILURE, "PQsendQueryPrepared: %s",
    447  1.1  yamt 		    PQerrorMessage(conn));
    448  1.1  yamt 	}
    449  1.1  yamt 	return 0;
    450  1.1  yamt }
    451  1.1  yamt 
/*
 * sendcmd: send a command, asking for results in text format
 * (resultmode 0).  the variadic arguments are the parameter values.
 * the result is not fetched here.
 */
int
sendcmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);
	return rv;
}
    463  1.1  yamt 
/*
 * sendcmdx: same as sendcmd but with an explicit resultmode, which is
 * handed to vsendcmd as is.
 */
int
sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, resultmode, c, args);
	va_end(args);
	return rv;
}
    475  1.1  yamt 
/*
 * simplecmd: send a command which returns no rows and wait for its
 * completion.
 *
 * returns the error from sending if that fails, otherwise whatever
 * fetch_noresult() reports.
 */

int
simplecmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);
	if (rv == 0) {
		rv = fetch_noresult(xc);
	}
	return rv;
}
    495  1.1  yamt 
    496  1.1  yamt void
    497  1.1  yamt fetchinit(struct fetchstatus *s, struct Xconn *xc)
    498  1.1  yamt {
    499  1.1  yamt 	s->xc = xc;
    500  1.1  yamt 	s->res = NULL;
    501  1.1  yamt 	s->cur = 0;
    502  1.1  yamt 	s->nrows = 0;
    503  1.1  yamt 	s->done = false;
    504  1.1  yamt }
    505  1.1  yamt 
/*
 * getint: parse a decimal integer.
 *
 * the whole string must be a valid number; an empty string, trailing
 * garbage or a conversion error trips an assertion.
 */
static intmax_t
getint(const char *str)
{
	char *end;
	intmax_t value;

	errno = 0;
	value = strtoimax(str, &end, 10);
	assert(errno == 0);
	assert(str[0] != '\0');
	assert(*end == '\0');
	return value;
}
    519  1.1  yamt 
    520  1.1  yamt static int
    521  1.1  yamt vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap)
    522  1.1  yamt {
    523  1.1  yamt 	PGconn *conn = s->xc->conn;
    524  1.1  yamt 	unsigned int i;
    525  1.1  yamt 
    526  1.1  yamt 	assert(conn != NULL);
    527  1.1  yamt 	if (s->res == NULL) {
    528  1.1  yamt 		ExecStatusType status;
    529  1.1  yamt 		int error;
    530  1.1  yamt 
    531  1.1  yamt 		pqwait(s->xc);
    532  1.1  yamt 		s->res = PQgetResult(conn);
    533  1.1  yamt 		if (s->res == NULL) {
    534  1.1  yamt 			s->done = true;
    535  1.1  yamt 			return ENOENT;
    536  1.1  yamt 		}
    537  1.1  yamt 		status = PQresultStatus(s->res);
    538  1.1  yamt 		if (status == PGRES_FATAL_ERROR) {
    539  1.1  yamt 			error = sqltoerrno(
    540  1.1  yamt 			    PQresultErrorField(s->res, PG_DIAG_SQLSTATE));
    541  1.1  yamt 			assert(error != 0);
    542  1.1  yamt 			dumperror(s->xc, s->res);
    543  1.1  yamt 			return error;
    544  1.1  yamt 		}
    545  1.1  yamt 		if (status != PGRES_TUPLES_OK) {
    546  1.1  yamt 			errx(1, "not tuples_ok: %s",
    547  1.1  yamt 			    PQerrorMessage(conn));
    548  1.1  yamt 		}
    549  1.1  yamt 		assert((unsigned int)PQnfields(s->res) == n);
    550  1.1  yamt 		s->nrows = PQntuples(s->res);
    551  1.1  yamt 		if (s->nrows == 0) {
    552  1.1  yamt 			DPRINTF("no rows\n");
    553  1.1  yamt 			return ENOENT;
    554  1.1  yamt 		}
    555  1.1  yamt 		assert(s->nrows >= 1);
    556  1.1  yamt 		s->cur = 0;
    557  1.1  yamt 	}
    558  1.1  yamt 	for (i = 0; i < n; i++) {
    559  1.1  yamt 		size_t size;
    560  1.1  yamt 
    561  1.1  yamt 		assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0));
    562  1.1  yamt 		DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n",
    563  1.1  yamt 		    i, PQftype(s->res, i), types[i],
    564  1.1  yamt 		    PQgetisnull(s->res, s->cur, i) ? "<NULL>" :
    565  1.1  yamt 		    PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) :
    566  1.1  yamt 		    "<BINARY>");
    567  1.1  yamt 		assert(PQftype(s->res, i) == types[i]);
    568  1.1  yamt 		assert(!PQgetisnull(s->res, s->cur, i));
    569  1.1  yamt 		switch(types[i]) {
    570  1.1  yamt 		case INT8OID:
    571  1.1  yamt 			*va_arg(ap, int64_t *) =
    572  1.1  yamt 			    getint(PQgetvalue(s->res, s->cur, i));
    573  1.1  yamt 			break;
    574  1.1  yamt 		case OIDOID:
    575  1.1  yamt 			*va_arg(ap, Oid *) =
    576  1.1  yamt 			    getint(PQgetvalue(s->res, s->cur, i));
    577  1.1  yamt 			break;
    578  1.1  yamt 		case INT4OID:
    579  1.1  yamt 			*va_arg(ap, int32_t *) =
    580  1.1  yamt 			    getint(PQgetvalue(s->res, s->cur, i));
    581  1.1  yamt 			break;
    582  1.1  yamt 		case TEXTOID:
    583  1.1  yamt 			*va_arg(ap, char **) =
    584  1.1  yamt 			    estrdup(PQgetvalue(s->res, s->cur, i));
    585  1.1  yamt 			break;
    586  1.1  yamt 		case BYTEA:
    587  1.1  yamt 			size = PQgetlength(s->res, s->cur, i);
    588  1.1  yamt 			memcpy(va_arg(ap, void *),
    589  1.1  yamt 			    PQgetvalue(s->res, s->cur, i), size);
    590  1.1  yamt 			*va_arg(ap, size_t *) = size;
    591  1.1  yamt 			break;
    592  1.1  yamt 		default:
    593  1.1  yamt 			errx(EXIT_FAILURE, "%s unknown type %u", __func__,
    594  1.1  yamt 			    types[i]);
    595  1.1  yamt 		}
    596  1.1  yamt 	}
    597  1.1  yamt 	s->cur++;
    598  1.1  yamt 	if (s->cur == s->nrows) {
    599  1.1  yamt 		PQclear(s->res);
    600  1.1  yamt 		s->res = NULL;
    601  1.1  yamt 	}
    602  1.1  yamt 	return 0;
    603  1.1  yamt }
    604  1.1  yamt 
    605  1.1  yamt int
    606  1.1  yamt fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...)
    607  1.1  yamt {
    608  1.1  yamt 	va_list ap;
    609  1.1  yamt 	int error;
    610  1.1  yamt 
    611  1.1  yamt 	va_start(ap, types);
    612  1.1  yamt 	error = vfetchnext(s, n, types, ap);
    613  1.1  yamt 	va_end(ap);
    614  1.1  yamt 	return error;
    615  1.1  yamt }
    616  1.1  yamt 
    617  1.1  yamt void
    618  1.1  yamt fetchdone(struct fetchstatus *s)
    619  1.1  yamt {
    620  1.1  yamt 
    621  1.1  yamt 	if (s->res != NULL) {
    622  1.1  yamt 		PQclear(s->res);
    623  1.1  yamt 		s->res = NULL;
    624  1.1  yamt 	}
    625  1.1  yamt 	if (!s->done) {
    626  1.1  yamt 		PGresult *res;
    627  1.1  yamt 		unsigned int n;
    628  1.1  yamt 
    629  1.1  yamt 		n = 0;
    630  1.1  yamt 		while ((res = PQgetResult(s->xc->conn)) != NULL) {
    631  1.1  yamt 			PQclear(res);
    632  1.1  yamt 			n++;
    633  1.1  yamt 		}
    634  1.1  yamt 		if (n > 0) {
    635  1.1  yamt 			DPRINTF("%u rows dropped\n", n);
    636  1.1  yamt 		}
    637  1.1  yamt 	}
    638  1.1  yamt }
    639  1.1  yamt 
    640  1.1  yamt int
    641  1.1  yamt simplefetch(struct Xconn *xc, Oid type, ...)
    642  1.1  yamt {
    643  1.1  yamt 	struct fetchstatus s;
    644  1.1  yamt 	va_list ap;
    645  1.1  yamt 	int error;
    646  1.1  yamt 
    647  1.1  yamt 	fetchinit(&s, xc);
    648  1.1  yamt 	va_start(ap, type);
    649  1.1  yamt 	error = vfetchnext(&s, 1, &type, ap);
    650  1.1  yamt 	va_end(ap);
    651  1.1  yamt 	assert(error != 0 || s.res == NULL);
    652  1.1  yamt 	fetchdone(&s);
    653  1.1  yamt 	return error;
    654  1.1  yamt }
    655  1.1  yamt 
    656  1.3  yamt /*
    657  1.3  yamt  * setlabel: set the descriptive label for the connection.
    658  1.3  yamt  *
    659  1.3  yamt  * we use simple pointer comparison for label equality check.
    660  1.3  yamt  */
    661  1.2  yamt static void
    662  1.2  yamt setlabel(struct Xconn *xc, const char *label)
    663  1.2  yamt {
    664  1.2  yamt 	int error;
    665  1.2  yamt 
    666  1.2  yamt 	/*
    667  1.2  yamt 	 * put the label into application_name so that it's shown in
    668  1.2  yamt 	 * pg_stat_activity.  we are sure that our labels don't need
    669  1.2  yamt 	 * PQescapeStringConn.
    670  1.2  yamt 	 *
    671  1.2  yamt 	 * example:
    672  1.2  yamt 	 *	SELECT pid,application_name,query FROM pg_stat_activity
    673  1.2  yamt 	 *	WHERE state <> 'idle'
    674  1.2  yamt 	 */
    675  1.2  yamt 
    676  1.3  yamt 	if (label != NULL && label != xc->label) {
    677  1.2  yamt 		struct cmd *c;
    678  1.2  yamt 		char cmd_str[1024];
    679  1.2  yamt 
    680  1.2  yamt 		snprintf(cmd_str, sizeof(cmd_str),
    681  1.2  yamt 		    "SET application_name TO 'pgfs: %s'", label);
    682  1.2  yamt 		c = createcmd(cmd_str, CMD_NOPREPARE);
    683  1.2  yamt 		error = simplecmd(xc, c);
    684  1.2  yamt 		freecmd(c);
    685  1.2  yamt 		assert(error == 0);
    686  1.3  yamt 		xc->label = label;
    687  1.2  yamt 	} else {
    688  1.2  yamt #if 0 /* don't bother to clear label */
    689  1.2  yamt 		static struct cmd *c;
    690  1.2  yamt 
    691  1.2  yamt 		CREATECMD_NOPARAM(c, "SET application_name TO 'pgfs'");
    692  1.2  yamt 		error = simplecmd(xc, c);
    693  1.2  yamt 		assert(error == 0);
    694  1.2  yamt #endif
    695  1.2  yamt 	}
    696  1.2  yamt }
    697  1.2  yamt 
    698  1.1  yamt struct Xconn *
    699  1.2  yamt begin(struct puffs_usermount *pu, const char *label)
    700  1.1  yamt {
    701  1.1  yamt 	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
    702  1.1  yamt 	static struct cmd *c;
    703  1.1  yamt 	int error;
    704  1.1  yamt 
    705  1.2  yamt 	setlabel(xc, label);
    706  1.1  yamt 	CREATECMD_NOPARAM(c, "BEGIN");
    707  1.1  yamt 	assert(!xc->in_trans);
    708  1.1  yamt 	error = simplecmd(xc, c);
    709  1.1  yamt 	assert(error == 0);
    710  1.1  yamt 	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
    711  1.1  yamt 	xc->in_trans = true;
    712  1.1  yamt 	return xc;
    713  1.1  yamt }
    714  1.1  yamt 
    715  1.1  yamt struct Xconn *
    716  1.2  yamt begin_readonly(struct puffs_usermount *pu, const char *label)
    717  1.1  yamt {
    718  1.1  yamt 	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
    719  1.1  yamt 	static struct cmd *c;
    720  1.1  yamt 	int error;
    721  1.1  yamt 
    722  1.2  yamt 	setlabel(xc, label);
    723  1.1  yamt 	CREATECMD_NOPARAM(c, "BEGIN READ ONLY");
    724  1.1  yamt 	assert(!xc->in_trans);
    725  1.1  yamt 	error = simplecmd(xc, c);
    726  1.1  yamt 	assert(error == 0);
    727  1.1  yamt 	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
    728  1.1  yamt 	xc->in_trans = true;
    729  1.1  yamt 	return xc;
    730  1.1  yamt }
    731  1.1  yamt 
    732  1.1  yamt void
    733  1.1  yamt rollback(struct Xconn *xc)
    734  1.1  yamt {
    735  1.1  yamt 	PGTransactionStatusType status;
    736  1.1  yamt 
    737  1.1  yamt 	/*
    738  1.1  yamt 	 * check the status as we are not sure the status of our transaction
    739  1.1  yamt 	 * after a failed commit.
    740  1.1  yamt 	 */
    741  1.1  yamt 	status = PQtransactionStatus(xc->conn);
    742  1.1  yamt 	assert(status != PQTRANS_ACTIVE);
    743  1.1  yamt 	assert(status != PQTRANS_UNKNOWN);
    744  1.1  yamt 	if (status != PQTRANS_IDLE) {
    745  1.1  yamt 		static struct cmd *c;
    746  1.1  yamt 		int error;
    747  1.1  yamt 
    748  1.1  yamt 		assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR);
    749  1.1  yamt 		CREATECMD_NOPARAM(c, "ROLLBACK");
    750  1.1  yamt 		error = simplecmd(xc, c);
    751  1.1  yamt 		assert(error == 0);
    752  1.1  yamt 	}
    753  1.1  yamt 	DPRINTF("xc %p rollback %p\n", xc, xc->owner);
    754  1.2  yamt 	setlabel(xc, NULL);
    755  1.1  yamt 	relxc(xc);
    756  1.1  yamt }
    757  1.1  yamt 
    758  1.1  yamt int
    759  1.1  yamt commit(struct Xconn *xc)
    760  1.1  yamt {
    761  1.1  yamt 	static struct cmd *c;
    762  1.1  yamt 	int error;
    763  1.1  yamt 
    764  1.1  yamt 	CREATECMD_NOPARAM(c, "COMMIT");
    765  1.1  yamt 	error = simplecmd(xc, c);
    766  1.2  yamt 	setlabel(xc, NULL);
    767  1.1  yamt 	if (error == 0) {
    768  1.1  yamt 		DPRINTF("xc %p commit %p\n", xc, xc->owner);
    769  1.1  yamt 		relxc(xc);
    770  1.1  yamt 	}
    771  1.1  yamt 	return error;
    772  1.1  yamt }
    773  1.1  yamt 
    774  1.1  yamt int
    775  1.1  yamt commit_sync(struct Xconn *xc)
    776  1.1  yamt {
    777  1.1  yamt 	static struct cmd *c;
    778  1.1  yamt 	int error;
    779  1.1  yamt 
    780  1.1  yamt 	assert(!pgfs_dosync);
    781  1.1  yamt 	CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON");
    782  1.1  yamt 	error = simplecmd(xc, c);
    783  1.1  yamt 	assert(error == 0);
    784  1.1  yamt 	return commit(xc);
    785  1.1  yamt }
    786  1.1  yamt 
    787  1.1  yamt static void
    788  1.1  yamt pgfs_notice_receiver(void *vp, const PGresult *res)
    789  1.1  yamt {
    790  1.1  yamt 	struct Xconn *xc = vp;
    791  1.1  yamt 
    792  1.1  yamt 	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR);
    793  1.1  yamt 	fprintf(stderr, "got a notice on %p\n", xc);
    794  1.1  yamt 	dumperror(xc, res);
    795  1.1  yamt }
    796  1.1  yamt 
    797  1.1  yamt static int
    798  1.1  yamt pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf,
    799  1.1  yamt     int fd, int *done)
    800  1.1  yamt {
    801  1.1  yamt 	struct Xconn *xc;
    802  1.1  yamt 	PGconn *conn;
    803  1.1  yamt 
    804  1.1  yamt 	TAILQ_FOREACH(xc, &xclist, list) {
    805  1.1  yamt 		if (PQsocket(xc->conn) == fd) {
    806  1.1  yamt 			break;
    807  1.1  yamt 		}
    808  1.1  yamt 	}
    809  1.1  yamt 	assert(xc != NULL);
    810  1.1  yamt 	conn = xc->conn;
    811  1.1  yamt 	PQconsumeInput(conn);
    812  1.1  yamt 	if (!PQisBusy(conn)) {
    813  1.1  yamt 		if (xc->blocker != NULL) {
    814  1.1  yamt 			DPRINTF("schedule %p\n", xc->blocker);
    815  1.1  yamt 			puffs_cc_schedule(xc->blocker);
    816  1.1  yamt 		} else {
    817  1.1  yamt 			DPRINTF("no blockers\n");
    818  1.1  yamt 		}
    819  1.1  yamt 	}
    820  1.1  yamt 	*done = 0;
    821  1.1  yamt 	return 0;
    822  1.1  yamt }
    823  1.1  yamt 
    824  1.1  yamt int
    825  1.1  yamt pgfs_connectdb(struct puffs_usermount *pu, const char *dbname,
    826  1.1  yamt     const char *dbuser, bool debug, bool synchronous, unsigned int nconn)
    827  1.1  yamt {
    828  1.1  yamt 	const char *keywords[3+1];
    829  1.1  yamt 	const char *values[3];
    830  1.1  yamt 	unsigned int i;
    831  1.1  yamt 
    832  1.1  yamt 	if (nconn > 32) {
    833  1.1  yamt 		/*
    834  1.1  yamt 		 * limit from sizeof(cmd->prepared_mask)
    835  1.1  yamt 		 */
    836  1.1  yamt 		return EINVAL;
    837  1.1  yamt 	}
    838  1.1  yamt 	if (debug) {
    839  1.1  yamt 		pgfs_dodprintf = true;
    840  1.1  yamt 	}
    841  1.1  yamt 	if (synchronous) {
    842  1.1  yamt 		pgfs_dosync = true;
    843  1.1  yamt 	}
    844  1.1  yamt 	i = 0;
    845  1.1  yamt 	if (dbname != NULL) {
    846  1.1  yamt 		keywords[i] = "dbname";
    847  1.1  yamt 		values[i] = dbname;
    848  1.1  yamt 		i++;
    849  1.1  yamt 	}
    850  1.1  yamt 	if (dbuser != NULL) {
    851  1.1  yamt 		keywords[i] = "user";
    852  1.1  yamt 		values[i] = dbuser;
    853  1.1  yamt 		i++;
    854  1.1  yamt 	}
    855  1.1  yamt 	keywords[i] = "application_name";
    856  1.1  yamt 	values[i] = "pgfs";
    857  1.1  yamt 	i++;
    858  1.1  yamt 	keywords[i] = NULL;
    859  1.1  yamt 	puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL);
    860  1.1  yamt 	for (i = 0; i < nconn; i++) {
    861  1.1  yamt 		struct Xconn *xc;
    862  1.1  yamt 		struct Xconn *xc2;
    863  1.1  yamt 		static int xcid;
    864  1.1  yamt 		PGconn *conn;
    865  1.1  yamt 		struct cmd *c;
    866  1.1  yamt 		int error;
    867  1.1  yamt 
    868  1.1  yamt 		conn = PQconnectdbParams(keywords, values, 0);
    869  1.1  yamt 		if (conn == NULL) {
    870  1.1  yamt 			errx(EXIT_FAILURE,
    871  1.1  yamt 			    "PQconnectdbParams: unknown failure");
    872  1.1  yamt 		}
    873  1.1  yamt 		if (PQstatus(conn) != CONNECTION_OK) {
    874  1.1  yamt 			/*
    875  1.1  yamt 			 * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW
    876  1.1  yamt 			 */
    877  1.1  yamt 			errx(EXIT_FAILURE, "PQconnectdbParams: %s",
    878  1.1  yamt 			    PQerrorMessage(conn));
    879  1.1  yamt 		}
    880  1.1  yamt 		DPRINTF("protocol version %d\n", PQprotocolVersion(conn));
    881  1.1  yamt 		puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ);
    882  1.1  yamt 		xc = emalloc(sizeof(*xc));
    883  1.1  yamt 		xc->conn = conn;
    884  1.1  yamt 		xc->blocker = NULL;
    885  1.1  yamt 		xc->owner = NULL;
    886  1.1  yamt 		xc->in_trans = false;
    887  1.1  yamt 		xc->id = xcid++;
    888  1.3  yamt 		xc->label = NULL;
    889  1.1  yamt 		assert(xc->id < 32);
    890  1.1  yamt 		PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc);
    891  1.1  yamt 		TAILQ_INSERT_HEAD(&xclist, xc, list);
    892  1.2  yamt 		xc2 = begin(pu, NULL);
    893  1.1  yamt 		assert(xc2 == xc);
    894  1.1  yamt 		c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE);
    895  1.1  yamt 		error = simplecmd(xc, c);
    896  1.1  yamt 		assert(error == 0);
    897  1.1  yamt 		freecmd(c);
    898  1.1  yamt 		c = createcmd("SET SESSION CHARACTERISTICS AS "
    899  1.1  yamt 		    "TRANSACTION ISOLATION LEVEL REPEATABLE READ",
    900  1.1  yamt 		    CMD_NOPREPARE);
    901  1.1  yamt 		error = simplecmd(xc, c);
    902  1.1  yamt 		assert(error == 0);
    903  1.1  yamt 		freecmd(c);
    904  1.1  yamt 		c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE);
    905  1.1  yamt 		error = simplecmd(xc, c);
    906  1.1  yamt 		assert(error == 0);
    907  1.1  yamt 		freecmd(c);
    908  1.1  yamt 		if (!pgfs_dosync) {
    909  1.1  yamt 			c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF",
    910  1.1  yamt 			    CMD_NOPREPARE);
    911  1.1  yamt 			error = simplecmd(xc, c);
    912  1.1  yamt 			assert(error == 0);
    913  1.1  yamt 			freecmd(c);
    914  1.1  yamt 		}
    915  1.1  yamt 		if (debug) {
    916  1.1  yamt 			struct fetchstatus s;
    917  1.1  yamt 			static const Oid types[] = { INT8OID, };
    918  1.1  yamt 			uint64_t pid;
    919  1.1  yamt 
    920  1.1  yamt 			c = createcmd("SELECT pg_backend_pid()::int8;",
    921  1.1  yamt 			    CMD_NOPREPARE);
    922  1.1  yamt 			error = sendcmd(xc, c);
    923  1.1  yamt 			assert(error == 0);
    924  1.1  yamt 			fetchinit(&s, xc);
    925  1.1  yamt 			error = FETCHNEXT(&s, types, &pid);
    926  1.1  yamt 			fetchdone(&s);
    927  1.1  yamt 			assert(error == 0);
    928  1.1  yamt 			DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid);
    929  1.1  yamt 		}
    930  1.1  yamt 		error = commit(xc);
    931  1.1  yamt 		assert(error == 0);
    932  1.1  yamt 		assert(xc->owner == NULL);
    933  1.1  yamt 	}
    934  1.1  yamt 	/*
    935  1.1  yamt 	 * XXX cleanup unlinked files here?  what to do when the filesystem
    936  1.1  yamt 	 * is shared?
    937  1.1  yamt 	 */
    938  1.1  yamt 	return 0;
    939  1.1  yamt }
    940  1.1  yamt 
    941  1.1  yamt struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq);
    942  1.1  yamt struct puffs_cc *flusher = NULL;
    943  1.1  yamt 
    944  1.1  yamt int
    945  1.1  yamt flush_xacts(struct puffs_usermount *pu)
    946  1.1  yamt {
    947  1.1  yamt 	struct puffs_cc *cc = puffs_cc_getcc(pu);
    948  1.1  yamt 	struct Xconn *xc;
    949  1.1  yamt 	static struct cmd *c;
    950  1.1  yamt 	uint64_t dummy;
    951  1.1  yamt 	int error;
    952  1.1  yamt 
    953  1.1  yamt 	/*
    954  1.1  yamt 	 * flush all previously issued asynchronous transactions.
    955  1.1  yamt 	 *
    956  1.1  yamt 	 * XXX
    957  1.1  yamt 	 * unfortunately it seems that there is no clean way to tell
    958  1.1  yamt 	 * PostgreSQL flush XLOG.  we could perform a CHECKPOINT but it's
    959  1.1  yamt 	 * too expensive and overkill for our purpose.
    960  1.1  yamt 	 * besides, PostgreSQL has an optimization to skip XLOG flushing
    961  1.1  yamt 	 * for transactions which didn't produce WAL records.
    962  1.1  yamt 	 * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2)
    963  1.1  yamt 	 * it means that an empty transaction ("BEGIN; COMMIT;"), which
    964  1.1  yamt 	 * doesn't produce any WAL records, doesn't flush the XLOG even if
    965  1.1  yamt 	 * synchronous_commit=on.  we issues a dummy setval() to avoid the
    966  1.1  yamt 	 * optimization.
    967  1.1  yamt 	 * on the other hand, we try to avoid creating unnecessary WAL activity
    968  1.1  yamt 	 * by serializing flushing and checking XLOG locations.
    969  1.1  yamt 	 */
    970  1.1  yamt 
    971  1.1  yamt 	assert(!pgfs_dosync);
    972  1.1  yamt 	if (flusher != NULL) { /* serialize flushers */
    973  1.1  yamt 		DPRINTF("%p flush in progress %p\n", cc, flusher);
    974  1.1  yamt 		waiton(&flushwaitq, cc);
    975  1.1  yamt 		assert(flusher == NULL);
    976  1.1  yamt 	}
    977  1.1  yamt 	DPRINTF("%p start flushing\n", cc);
    978  1.1  yamt 	flusher = cc;
    979  1.1  yamt retry:
    980  1.2  yamt 	xc = begin(pu, "flush");
    981  1.1  yamt 	CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE "
    982  1.1  yamt 	    "pg_current_xlog_insert_location() <> pg_current_xlog_location()");
    983  1.1  yamt 	error = sendcmd(xc, c);
    984  1.1  yamt 	if (error != 0) {
    985  1.1  yamt 		goto got_error;
    986  1.1  yamt 	}
    987  1.1  yamt 	error = simplefetch(xc, INT8OID, &dummy);
    988  1.1  yamt 	assert(error != 0 || dummy == 1);
    989  1.1  yamt 	if (error == ENOENT) {
    990  1.1  yamt 		/*
    991  1.1  yamt 		 * there seems to be nothing to flush.
    992  1.1  yamt 		 */
    993  1.1  yamt 		DPRINTF("%p no sync\n", cc);
    994  1.1  yamt 		error = 0;
    995  1.1  yamt 	}
    996  1.1  yamt 	if (error != 0) {
    997  1.1  yamt 		goto got_error;
    998  1.1  yamt 	}
    999  1.1  yamt 	error = commit_sync(xc);
   1000  1.1  yamt 	if (error != 0) {
   1001  1.1  yamt 		goto got_error;
   1002  1.1  yamt 	}
   1003  1.1  yamt 	goto done;
   1004  1.1  yamt got_error:
   1005  1.1  yamt 	rollback(xc);
   1006  1.1  yamt 	if (error == EAGAIN) {
   1007  1.1  yamt 		goto retry;
   1008  1.1  yamt 	}
   1009  1.1  yamt done:
   1010  1.1  yamt 	assert(flusher == cc);
   1011  1.1  yamt 	flusher = NULL;
   1012  1.1  yamt 	wakeup_one(&flushwaitq);
   1013  1.1  yamt 	DPRINTF("%p end flushing error=%d\n", cc, error);
   1014  1.1  yamt 	return error;
   1015  1.1  yamt }
   1016