Home | History | Annotate | Line # | Download | only in pgfs
pgfs_db.c revision 1.1.2.1
      1      1.1  yamt /*	$NetBSD: pgfs_db.c,v 1.1.2.1 2012/04/17 00:05:44 yamt Exp $	*/
      2      1.1  yamt 
      3      1.1  yamt /*-
      4      1.1  yamt  * Copyright (c)2010,2011 YAMAMOTO Takashi,
      5      1.1  yamt  * All rights reserved.
      6      1.1  yamt  *
      7      1.1  yamt  * Redistribution and use in source and binary forms, with or without
      8      1.1  yamt  * modification, are permitted provided that the following conditions
      9      1.1  yamt  * are met:
     10      1.1  yamt  * 1. Redistributions of source code must retain the above copyright
     11      1.1  yamt  *    notice, this list of conditions and the following disclaimer.
     12      1.1  yamt  * 2. Redistributions in binary form must reproduce the above copyright
     13      1.1  yamt  *    notice, this list of conditions and the following disclaimer in the
     14      1.1  yamt  *    documentation and/or other materials provided with the distribution.
     15      1.1  yamt  *
     16      1.1  yamt  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
     17      1.1  yamt  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     18      1.1  yamt  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     19      1.1  yamt  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     20      1.1  yamt  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     21      1.1  yamt  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
     22      1.1  yamt  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     23      1.1  yamt  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     24      1.1  yamt  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     25      1.1  yamt  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     26      1.1  yamt  * SUCH DAMAGE.
     27      1.1  yamt  */
     28      1.1  yamt 
     29      1.1  yamt /*
     30      1.1  yamt  * backend db operations
     31      1.1  yamt  */
     32      1.1  yamt 
     33      1.1  yamt #include <sys/cdefs.h>
     34      1.1  yamt #ifndef lint
     35      1.1  yamt __RCSID("$NetBSD: pgfs_db.c,v 1.1.2.1 2012/04/17 00:05:44 yamt Exp $");
     36      1.1  yamt #endif /* not lint */
     37      1.1  yamt 
#include <assert.h>
#include <err.h>
#include <errno.h>
#include <inttypes.h>
#include <puffs.h>
#include <stdbool.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <util.h>
     48      1.1  yamt 
     49      1.1  yamt #include <libpq-fe.h>
     50      1.1  yamt 
     51      1.1  yamt #include "pgfs_db.h"
     52      1.1  yamt #include "pgfs_waitq.h"
     53      1.1  yamt #include "pgfs_debug.h"
     54      1.1  yamt 
/*
 * pgfs_dosync: global flag, initially false.  it is not referenced in
 * this part of the file; NOTE(review): confirm its meaning at its use
 * sites.
 */
bool pgfs_dosync = false;

/*
 * Xconn: one backend PostgreSQL connection plus the bookkeeping used to
 * hand it out to puffs coroutines (see getxc/relxc/pqwait).
 */
struct Xconn {
	TAILQ_ENTRY(Xconn) list;	/* linkage on xclist */
	PGconn *conn;			/* libpq connection handle */
	struct puffs_cc *blocker;	/* owner while yielded in pqwait(), else NULL */
	struct puffs_cc *owner;		/* cc that acquired it in getxc(); NULL when free */
	bool in_trans;			/* set by begin*(), cleared by relxc() */
	int id;				/* bit index into struct cmd's prepared_mask */
	const char *label;		/* last label applied by setlabel() (compared by pointer) */
};
     66      1.1  yamt 
     67      1.1  yamt static void
     68      1.1  yamt dumperror(struct Xconn *xc, const PGresult *res)
     69      1.1  yamt {
     70      1.1  yamt 	static const struct {
     71      1.1  yamt 		const char *name;
     72      1.1  yamt 		int code;
     73      1.1  yamt 	} fields[] = {
     74      1.1  yamt #define F(x)	{ .name = #x, .code = x, }
     75      1.1  yamt 		F(PG_DIAG_SEVERITY),
     76      1.1  yamt 		F(PG_DIAG_SQLSTATE),
     77      1.1  yamt 		F(PG_DIAG_MESSAGE_PRIMARY),
     78      1.1  yamt 		F(PG_DIAG_MESSAGE_DETAIL),
     79      1.1  yamt 		F(PG_DIAG_MESSAGE_HINT),
     80      1.1  yamt 		F(PG_DIAG_STATEMENT_POSITION),
     81      1.1  yamt 		F(PG_DIAG_INTERNAL_POSITION),
     82      1.1  yamt 		F(PG_DIAG_INTERNAL_QUERY),
     83      1.1  yamt 		F(PG_DIAG_CONTEXT),
     84      1.1  yamt 		F(PG_DIAG_SOURCE_FILE),
     85      1.1  yamt 		F(PG_DIAG_SOURCE_LINE),
     86      1.1  yamt 		F(PG_DIAG_SOURCE_FUNCTION),
     87      1.1  yamt #undef F
     88      1.1  yamt 	};
     89      1.1  yamt 	unsigned int i;
     90      1.1  yamt 
     91      1.1  yamt 	if (!pgfs_dodprintf) {
     92      1.1  yamt 		return;
     93      1.1  yamt 	}
     94      1.1  yamt 	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR ||
     95      1.1  yamt 	    PQresultStatus(res) == PGRES_FATAL_ERROR);
     96      1.1  yamt 	for (i = 0; i < __arraycount(fields); i++) {
     97      1.1  yamt 		const char *val = PQresultErrorField(res, fields[i].code);
     98      1.1  yamt 
     99      1.1  yamt 		if (val == NULL) {
    100      1.1  yamt 			continue;
    101      1.1  yamt 		}
    102      1.1  yamt 		fprintf(stderr, "%s: %s\n", fields[i].name, val);
    103      1.1  yamt 	}
    104      1.1  yamt }
    105      1.1  yamt 
/* all backend connections; getxc() scans it for one without a blocker */
TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist);
/* coroutines waiting in getxc() for a free connection; relxc() wakes one */
struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq);
    108      1.1  yamt 
    109      1.1  yamt static struct Xconn *
    110      1.1  yamt getxc(struct puffs_cc *cc)
    111      1.1  yamt {
    112      1.1  yamt 	struct Xconn *xc;
    113      1.1  yamt 
    114      1.1  yamt 	assert(cc != NULL);
    115      1.1  yamt retry:
    116      1.1  yamt 	TAILQ_FOREACH(xc, &xclist, list) {
    117      1.1  yamt 		if (xc->blocker == NULL) {
    118      1.1  yamt 			assert(xc->owner == NULL);
    119      1.1  yamt 			xc->owner = cc;
    120      1.1  yamt 			DPRINTF("xc %p acquire %p\n", xc, cc);
    121      1.1  yamt 			return xc;
    122      1.1  yamt 		} else {
    123      1.1  yamt 			assert(xc->owner == xc->blocker);
    124      1.1  yamt 		}
    125      1.1  yamt 	}
    126      1.1  yamt 	DPRINTF("no free conn %p\n", cc);
    127      1.1  yamt 	waiton(&xcwaitq, cc);
    128      1.1  yamt 	goto retry;
    129      1.1  yamt }
    130      1.1  yamt 
    131      1.1  yamt static void
    132      1.1  yamt relxc(struct Xconn *xc)
    133      1.1  yamt {
    134      1.1  yamt 
    135      1.1  yamt 	assert(xc->in_trans);
    136      1.1  yamt 	assert(xc->owner != NULL);
    137      1.1  yamt 	xc->in_trans = false;
    138      1.1  yamt 	xc->owner = NULL;
    139      1.1  yamt 	wakeup_one(&xcwaitq);
    140      1.1  yamt }
    141      1.1  yamt 
    142      1.1  yamt static void
    143      1.1  yamt pqwait(struct Xconn *xc)
    144      1.1  yamt {
    145      1.1  yamt 	PGconn *conn = xc->conn;
    146      1.1  yamt 	struct puffs_cc *cc = xc->owner;
    147      1.1  yamt 
    148      1.1  yamt 	if (PQflush(conn)) {
    149      1.1  yamt 		errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn));
    150      1.1  yamt 	}
    151      1.1  yamt 	if (!PQisBusy(conn)) {
    152      1.1  yamt 		return;
    153      1.1  yamt 	}
    154      1.1  yamt 	assert(xc->blocker == NULL);
    155      1.1  yamt 	xc->blocker = cc;
    156      1.1  yamt 	DPRINTF("yielding %p\n", cc);
    157      1.1  yamt 	/* XXX is it safe to yield before entering mainloop? */
    158      1.1  yamt 	puffs_cc_yield(cc);
    159      1.1  yamt 	DPRINTF("yield returned %p\n", cc);
    160      1.1  yamt 	assert(xc->owner == cc);
    161      1.1  yamt 	assert(xc->blocker == cc);
    162      1.1  yamt 	xc->blocker = NULL;
    163      1.1  yamt }
    164      1.1  yamt 
/*
 * sqltoerrno: map a five-character SQLSTATE code to an errno value.
 *
 * returns 0 for successful completion, the mapped errno for known codes,
 * and EIO for unknown codes or when no SQLSTATE is available at all
 * (sqlstate == NULL: PQresultErrorField() returns NULL for a missing
 * field, which is typical of errors generated inside libpq itself).
 * aborts on codes mapped to EINVAL (check violations): that sounds like
 * a pgfs bug.
 *
 * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle
 * "tuple concurrently updated" errors for lowrite/lo_truncate.
 *
 * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN?
 */
static int
sqltoerrno(const char *sqlstate)
{
	static const struct {
		char sqlstate[5];	/* exactly 5 chars, not NUL-terminated */
		int error;
	} map[] = {
		{ "00000", 0, },	/* ERRCODE_SUCCESSFUL_COMPLETION */
		{ "02000", ENOENT, },	/* ERRCODE_NO_DATA */
		{ "23505", EEXIST, },	/* ERRCODE_UNIQUE_VIOLATION */
		{ "23514", EINVAL, },	/* ERRCODE_CHECK_VIOLATION */
		{ "40001", EAGAIN, },	/* ERRCODE_T_R_SERIALIZATION_FAILURE */
		{ "40P01", EAGAIN, },	/* ERRCODE_T_R_DEADLOCK_DETECTED */
		{ "42704", ENOENT, },	/* ERRCODE_UNDEFINED_OBJECT */
		{ "53100", ENOSPC, },	/* ERRCODE_DISK_FULL */
		{ "53200", ENOMEM, },	/* ERRCODE_OUT_OF_MEMORY */
		{ "XX000", EAGAIN, },	/* ERRCODE_INTERNAL_ERROR */
	};
	unsigned int i;

	if (sqlstate == NULL) {
		DPRINTF("no sqlstate, mapped to EIO\n");
		return EIO;
	}
	for (i = 0; i < __arraycount(map); i++) {
		int error;

		/*
		 * strncmp rather than memcmp: it stops at the NUL of a
		 * short (or empty) sqlstate instead of reading past it.
		 */
		if (strncmp(map[i].sqlstate, sqlstate,
		    sizeof(map[i].sqlstate)) != 0) {
			continue;
		}
		error = map[i].error;
		if (error != 0) {
			DPRINTF("sqlstate %5s mapped to error %d\n",
			    sqlstate, error);
		}
		if (error == EINVAL) {
			/*
			 * sounds like a bug.
			 */
			abort();
		}
		return error;
	}
	DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate);
	return EIO;
}
    211      1.1  yamt 
/*
 * cmd: an SQL command plus what is needed to send it; built by
 * createcmd() and released by freecmd().
 */
struct cmd {
	char name[32];		/* name of prepared statement */
	char *cmd;		/* query string */
	unsigned int nparams;	/* parameter count, as counted by createcmd() */
	Oid *paramtypes;	/* nparams parameter type Oids */
	uint32_t prepared_mask;	/* for which connections this is prepared? */
				/* (bit 1 << Xconn id; all set for CMD_NOPREPARE) */
	unsigned int flags;	/* CMD_ flags */
};

#define	CMD_NOPREPARE	1	/* don't prepare this command */
    222      1.1  yamt 
    223      1.1  yamt struct cmd *
    224      1.1  yamt createcmd(const char *cmd, unsigned int flags, ...)
    225      1.1  yamt {
    226      1.1  yamt 	struct cmd *c;
    227      1.1  yamt 	va_list ap;
    228      1.1  yamt 	const char *cp;
    229      1.1  yamt 	unsigned int i;
    230      1.1  yamt 	static unsigned int cmdid;
    231      1.1  yamt 
    232      1.1  yamt 	c = emalloc(sizeof(*c));
    233      1.1  yamt 	c->cmd = estrdup(cmd);
    234      1.1  yamt 	c->nparams = 0;
    235      1.1  yamt 	va_start(ap, flags);
    236      1.1  yamt 	for (cp = cmd; *cp != 0; cp++) {
    237      1.1  yamt 		if (*cp == '$') { /* XXX */
    238      1.1  yamt 			c->nparams++;
    239      1.1  yamt 		}
    240      1.1  yamt 	}
    241      1.1  yamt 	c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes));
    242      1.1  yamt 	for (i = 0; i < c->nparams; i++) {
    243      1.1  yamt 		Oid type = va_arg(ap, Oid);
    244      1.1  yamt 		assert(type == BYTEA ||
    245      1.1  yamt 		    type == INT4OID || type == INT8OID || type == OIDOID ||
    246      1.1  yamt 		    type == TEXTOID || type == TIMESTAMPTZOID);
    247      1.1  yamt 		c->paramtypes[i] = type;
    248      1.1  yamt 	}
    249      1.1  yamt 	va_end(ap);
    250      1.1  yamt 	snprintf(c->name, sizeof(c->name), "%u", cmdid++);
    251      1.1  yamt 	if ((flags & CMD_NOPREPARE) != 0) {
    252      1.1  yamt 		c->prepared_mask = ~0;
    253      1.1  yamt 	} else {
    254      1.1  yamt 		c->prepared_mask = 0;
    255      1.1  yamt 	}
    256      1.1  yamt 	c->flags = flags;
    257      1.1  yamt 	return c;
    258      1.1  yamt }
    259      1.1  yamt 
    260      1.1  yamt static void
    261      1.1  yamt freecmd(struct cmd *c)
    262      1.1  yamt {
    263      1.1  yamt 
    264      1.1  yamt 	free(c->paramtypes);
    265      1.1  yamt 	free(c->cmd);
    266      1.1  yamt 	free(c);
    267      1.1  yamt }
    268      1.1  yamt 
    269      1.1  yamt static int
    270      1.1  yamt fetch_noresult(struct Xconn *xc)
    271      1.1  yamt {
    272      1.1  yamt 	PGresult *res;
    273      1.1  yamt 	ExecStatusType status;
    274      1.1  yamt 	PGconn *conn = xc->conn;
    275      1.1  yamt 	int error;
    276      1.1  yamt 
    277      1.1  yamt 	pqwait(xc);
    278      1.1  yamt 	res = PQgetResult(conn);
    279      1.1  yamt 	if (res == NULL) {
    280      1.1  yamt 		return ENOENT;
    281      1.1  yamt 	}
    282      1.1  yamt 	status = PQresultStatus(res);
    283      1.1  yamt 	if (status == PGRES_COMMAND_OK) {
    284      1.1  yamt 		assert(PQnfields(res) == 0);
    285      1.1  yamt 		assert(PQntuples(res) == 0);
    286      1.1  yamt 		if (!strcmp(PQcmdTuples(res), "0")) {
    287      1.1  yamt 			error = ENOENT;
    288      1.1  yamt 		} else {
    289      1.1  yamt 			error = 0;
    290      1.1  yamt 		}
    291      1.1  yamt 	} else if (status == PGRES_FATAL_ERROR) {
    292      1.1  yamt 		error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE));
    293      1.1  yamt 		assert(error != 0);
    294      1.1  yamt 		dumperror(xc, res);
    295      1.1  yamt 	} else {
    296      1.1  yamt 		errx(1, "%s not command_ok: %d: %s", __func__,
    297      1.1  yamt 		    (int)status,
    298      1.1  yamt 		    PQerrorMessage(conn));
    299      1.1  yamt 	}
    300      1.1  yamt 	PQclear(res);
    301      1.1  yamt 	res = PQgetResult(conn);
    302      1.1  yamt 	assert(res == NULL);
    303      1.1  yamt 	if (error != 0) {
    304      1.1  yamt 		DPRINTF("error %d\n", error);
    305      1.1  yamt 	}
    306      1.1  yamt 	return error;
    307      1.1  yamt }
    308      1.1  yamt 
    309      1.1  yamt static int
    310      1.1  yamt preparecmd(struct Xconn *xc, struct cmd *c)
    311      1.1  yamt {
    312      1.1  yamt 	PGconn *conn = xc->conn;
    313      1.1  yamt 	const uint32_t mask = 1 << xc->id;
    314      1.1  yamt 	int error;
    315      1.1  yamt 	int ret;
    316      1.1  yamt 
    317      1.1  yamt 	if ((c->prepared_mask & mask) != 0) {
    318      1.1  yamt 		return 0;
    319      1.1  yamt 	}
    320      1.1  yamt 	assert((c->flags & CMD_NOPREPARE) == 0);
    321      1.1  yamt 	DPRINTF("PREPARE: '%s'\n", c->cmd);
    322      1.1  yamt 	ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes);
    323      1.1  yamt 	if (!ret) {
    324      1.1  yamt 		errx(EXIT_FAILURE, "PQsendPrepare: %s",
    325      1.1  yamt 		    PQerrorMessage(conn));
    326      1.1  yamt 	}
    327      1.1  yamt 	error = fetch_noresult(xc);
    328      1.1  yamt 	if (error != 0) {
    329      1.1  yamt 		return error;
    330      1.1  yamt 	}
    331      1.1  yamt 	c->prepared_mask |= mask;
    332      1.1  yamt 	return 0;
    333      1.1  yamt }
    334      1.1  yamt 
    335      1.1  yamt /*
    336      1.1  yamt  * vsendcmd:
    337      1.1  yamt  *
    338      1.1  yamt  * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared.
    339      1.1  yamt  * 0 for text and 1 for binary.
    340      1.1  yamt  */
    341      1.1  yamt 
    342      1.1  yamt static int
    343      1.1  yamt vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap)
    344      1.1  yamt {
    345      1.1  yamt 	PGconn *conn = xc->conn;
    346      1.1  yamt 	char **paramvalues;
    347      1.1  yamt 	int *paramlengths;
    348      1.1  yamt 	int *paramformats;
    349      1.1  yamt 	unsigned int i;
    350      1.1  yamt 	int error;
    351      1.1  yamt 	int ret;
    352      1.1  yamt 
    353      1.1  yamt 	assert(xc->owner != NULL);
    354      1.1  yamt 	assert(xc->blocker == NULL);
    355      1.1  yamt 	error = preparecmd(xc, c);
    356      1.1  yamt 	if (error != 0) {
    357      1.1  yamt 		return error;
    358      1.1  yamt 	}
    359      1.1  yamt 	paramvalues = emalloc(c->nparams * sizeof(*paramvalues));
    360      1.1  yamt 	paramlengths = NULL;
    361      1.1  yamt 	paramformats = NULL;
    362      1.1  yamt 	DPRINTF("CMD: '%s'\n", c->cmd);
    363      1.1  yamt 	for (i = 0; i < c->nparams; i++) {
    364      1.1  yamt 		Oid type = c->paramtypes[i];
    365      1.1  yamt 		char tmpstore[1024];
    366      1.1  yamt 		const char *buf = NULL;
    367      1.1  yamt 		intmax_t v = 0; /* XXXgcc */
    368      1.1  yamt 		int sz;
    369      1.1  yamt 		bool binary = false;
    370      1.1  yamt 
    371      1.1  yamt 		switch (type) {
    372      1.1  yamt 		case BYTEA:
    373      1.1  yamt 			buf = va_arg(ap, const void *);
    374      1.1  yamt 			sz = (int)va_arg(ap, size_t);
    375      1.1  yamt 			binary = true;
    376      1.1  yamt 			break;
    377      1.1  yamt 		case INT8OID:
    378      1.1  yamt 		case OIDOID:
    379      1.1  yamt 		case INT4OID:
    380      1.1  yamt 			switch (type) {
    381      1.1  yamt 			case INT8OID:
    382      1.1  yamt 				v = (intmax_t)va_arg(ap, int64_t);
    383      1.1  yamt 				break;
    384      1.1  yamt 			case OIDOID:
    385      1.1  yamt 				v = (intmax_t)va_arg(ap, Oid);
    386      1.1  yamt 				break;
    387      1.1  yamt 			case INT4OID:
    388      1.1  yamt 				v = (intmax_t)va_arg(ap, int32_t);
    389      1.1  yamt 				break;
    390      1.1  yamt 			default:
    391      1.1  yamt 				errx(EXIT_FAILURE, "unknown integer oid %u",
    392      1.1  yamt 				    type);
    393      1.1  yamt 			}
    394      1.1  yamt 			buf = tmpstore;
    395      1.1  yamt 			sz = snprintf(tmpstore, sizeof(tmpstore),
    396      1.1  yamt 			    "%jd", v);
    397      1.1  yamt 			assert(sz != -1);
    398      1.1  yamt 			assert((size_t)sz < sizeof(tmpstore));
    399      1.1  yamt 			sz += 1;
    400      1.1  yamt 			break;
    401      1.1  yamt 		case TEXTOID:
    402      1.1  yamt 		case TIMESTAMPTZOID:
    403      1.1  yamt 			buf = va_arg(ap, char *);
    404      1.1  yamt 			sz = strlen(buf) + 1;
    405      1.1  yamt 			break;
    406      1.1  yamt 		default:
    407      1.1  yamt 			errx(EXIT_FAILURE, "%s: unknown param type %u",
    408      1.1  yamt 			    __func__, type);
    409      1.1  yamt 		}
    410      1.1  yamt 		if (binary) {
    411      1.1  yamt 			if (paramlengths == NULL) {
    412      1.1  yamt 				paramlengths =
    413      1.1  yamt 				    emalloc(c->nparams * sizeof(*paramformats));
    414      1.1  yamt 			}
    415      1.1  yamt 			if (paramformats == NULL) {
    416      1.1  yamt 				paramformats = ecalloc(1,
    417      1.1  yamt 				    c->nparams * sizeof(*paramformats));
    418      1.1  yamt 			}
    419      1.1  yamt 			paramformats[i] = 1;
    420      1.1  yamt 			paramlengths[i] = sz;
    421      1.1  yamt 		}
    422      1.1  yamt 		paramvalues[i] = emalloc(sz);
    423      1.1  yamt 		memcpy(paramvalues[i], buf, sz);
    424      1.1  yamt 		if (binary) {
    425      1.1  yamt 			DPRINTF("\t[%u]=<BINARY>\n", i);
    426      1.1  yamt 		} else {
    427      1.1  yamt 			DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]);
    428      1.1  yamt 		}
    429      1.1  yamt 	}
    430      1.1  yamt 	if ((c->flags & CMD_NOPREPARE) != 0) {
    431      1.1  yamt 		ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes,
    432      1.1  yamt 		    (const char * const *)paramvalues, paramlengths,
    433      1.1  yamt 		    paramformats, resultmode);
    434      1.1  yamt 	} else {
    435      1.1  yamt 		ret = PQsendQueryPrepared(conn, c->name, c->nparams,
    436      1.1  yamt 		    (const char * const *)paramvalues, paramlengths,
    437      1.1  yamt 		    paramformats, resultmode);
    438      1.1  yamt 	}
    439      1.1  yamt 	for (i = 0; i < c->nparams; i++) {
    440      1.1  yamt 		free(paramvalues[i]);
    441      1.1  yamt 	}
    442      1.1  yamt 	free(paramvalues);
    443      1.1  yamt 	free(paramlengths);
    444      1.1  yamt 	free(paramformats);
    445      1.1  yamt 	if (!ret) {
    446      1.1  yamt 		errx(EXIT_FAILURE, "PQsendQueryPrepared: %s",
    447      1.1  yamt 		    PQerrorMessage(conn));
    448      1.1  yamt 	}
    449      1.1  yamt 	return 0;
    450      1.1  yamt }
    451      1.1  yamt 
/*
 * sendcmd: send command `c' on `xc' asking for text-format results.
 * parameters follow as described for vsendcmd(); its result is returned.
 */
int
sendcmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);
	return rv;
}
    463      1.1  yamt 
/*
 * sendcmdx: like sendcmd(), but the caller picks the result format:
 * resultmode 0 for text, 1 for binary (passed through to vsendcmd()).
 */
int
sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, resultmode, c, args);
	va_end(args);
	return rv;
}
    475      1.1  yamt 
/*
 * simplecmd: a convenient routine to execute a command which returns
 * no rows synchronously.
 *
 * parameters follow `c' as described for vsendcmd().  returns the send
 * error (a failed PREPARE) if there is one, otherwise the result of
 * fetch_noresult(): 0, ENOENT when zero rows were affected, or an errno
 * mapped from the SQLSTATE.
 */

int
simplecmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);
	return (rv != 0) ? rv : fetch_noresult(xc);
}
    495      1.1  yamt 
    496      1.1  yamt void
    497      1.1  yamt fetchinit(struct fetchstatus *s, struct Xconn *xc)
    498      1.1  yamt {
    499      1.1  yamt 	s->xc = xc;
    500      1.1  yamt 	s->res = NULL;
    501      1.1  yamt 	s->cur = 0;
    502      1.1  yamt 	s->nrows = 0;
    503      1.1  yamt 	s->done = false;
    504      1.1  yamt }
    505      1.1  yamt 
/*
 * getint: parse the decimal text `str' (a text-format integer column
 * value) and return it.  empty, malformed, or out-of-range input is
 * only caught by assertions.
 */
static intmax_t
getint(const char *str)
{
	char *end;
	intmax_t val;

	errno = 0;
	val = strtoimax(str, &end, 10);
	assert(errno == 0);
	assert(*str != '\0');
	assert(*end == '\0');
	return val;
}
    519      1.1  yamt 
/*
 * vfetchnext: store the columns of the next row of the pending result on
 * s->xc through the pointers taken from `ap'.
 *
 * n is the expected column count and types[0..n-1] the expected column
 * type Oids (both checked by assertions only).  per column, `ap' must
 * supply:
 *
 *	INT8OID		int64_t *
 *	OIDOID		Oid *
 *	INT4OID		int32_t *
 *	TEXTOID		char **	(receives an estrdup()ed copy)
 *	BYTEA		void *buf, size_t *lenp	(binary-format column; the
 *			value is copied into buf without any size check, so
 *			the caller must provide enough room)
 *
 * when `s' holds no result, waits for and fetches the next PGresult.
 *
 * returns 0 when a row was stored; ENOENT when there is no further
 * result (s->done is set) or the fetched result has no rows; otherwise
 * an errno mapped from the SQLSTATE of a fatal error.  on the last two
 * paths the PGresult is left in s->res for fetchdone() to release.
 * NOTE(review): callers presumably stop calling after a nonzero return;
 * calling again would index the retained result.
 */
static int
vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap)
{
	PGconn *conn = s->xc->conn;
	unsigned int i;

	assert(conn != NULL);
	/* no result in hand: wait for and fetch the next one */
	if (s->res == NULL) {
		ExecStatusType status;
		int error;

		pqwait(s->xc);
		s->res = PQgetResult(conn);
		if (s->res == NULL) {
			/* result stream exhausted; fetchdone() need not drain */
			s->done = true;
			return ENOENT;
		}
		status = PQresultStatus(s->res);
		if (status == PGRES_FATAL_ERROR) {
			error = sqltoerrno(
			    PQresultErrorField(s->res, PG_DIAG_SQLSTATE));
			assert(error != 0);
			dumperror(s->xc, s->res);
			return error;
		}
		if (status != PGRES_TUPLES_OK) {
			errx(1, "not tuples_ok: %s",
			    PQerrorMessage(conn));
		}
		assert((unsigned int)PQnfields(s->res) == n);
		s->nrows = PQntuples(s->res);
		if (s->nrows == 0) {
			DPRINTF("no rows\n");
			return ENOENT;
		}
		assert(s->nrows >= 1);
		s->cur = 0;
	}
	/* convert column i of row s->cur; each case consumes its va_args */
	for (i = 0; i < n; i++) {
		size_t size;

		/* only BYTEA columns are expected in binary format */
		assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0));
		DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n",
		    i, PQftype(s->res, i), types[i],
		    PQgetisnull(s->res, s->cur, i) ? "<NULL>" :
		    PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) :
		    "<BINARY>");
		assert(PQftype(s->res, i) == types[i]);
		/* SQL NULL values are not supported */
		assert(!PQgetisnull(s->res, s->cur, i));
		switch(types[i]) {
		case INT8OID:
			*va_arg(ap, int64_t *) =
			    getint(PQgetvalue(s->res, s->cur, i));
			break;
		case OIDOID:
			*va_arg(ap, Oid *) =
			    getint(PQgetvalue(s->res, s->cur, i));
			break;
		case INT4OID:
			*va_arg(ap, int32_t *) =
			    getint(PQgetvalue(s->res, s->cur, i));
			break;
		case TEXTOID:
			*va_arg(ap, char **) =
			    estrdup(PQgetvalue(s->res, s->cur, i));
			break;
		case BYTEA:
			/* two va_args: destination buffer, then length out */
			size = PQgetlength(s->res, s->cur, i);
			memcpy(va_arg(ap, void *),
			    PQgetvalue(s->res, s->cur, i), size);
			*va_arg(ap, size_t *) = size;
			break;
		default:
			errx(EXIT_FAILURE, "%s unknown type %u", __func__,
			    types[i]);
		}
	}
	/* after the last row, drop the result so the next call fetches anew */
	s->cur++;
	if (s->cur == s->nrows) {
		PQclear(s->res);
		s->res = NULL;
	}
	return 0;
}
    604      1.1  yamt 
    605      1.1  yamt int
    606      1.1  yamt fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...)
    607      1.1  yamt {
    608      1.1  yamt 	va_list ap;
    609      1.1  yamt 	int error;
    610      1.1  yamt 
    611      1.1  yamt 	va_start(ap, types);
    612      1.1  yamt 	error = vfetchnext(s, n, types, ap);
    613      1.1  yamt 	va_end(ap);
    614      1.1  yamt 	return error;
    615      1.1  yamt }
    616      1.1  yamt 
    617      1.1  yamt void
    618      1.1  yamt fetchdone(struct fetchstatus *s)
    619      1.1  yamt {
    620      1.1  yamt 
    621      1.1  yamt 	if (s->res != NULL) {
    622      1.1  yamt 		PQclear(s->res);
    623      1.1  yamt 		s->res = NULL;
    624      1.1  yamt 	}
    625      1.1  yamt 	if (!s->done) {
    626      1.1  yamt 		PGresult *res;
    627      1.1  yamt 		unsigned int n;
    628      1.1  yamt 
    629      1.1  yamt 		n = 0;
    630      1.1  yamt 		while ((res = PQgetResult(s->xc->conn)) != NULL) {
    631      1.1  yamt 			PQclear(res);
    632      1.1  yamt 			n++;
    633      1.1  yamt 		}
    634      1.1  yamt 		if (n > 0) {
    635      1.1  yamt 			DPRINTF("%u rows dropped\n", n);
    636      1.1  yamt 		}
    637      1.1  yamt 	}
    638      1.1  yamt }
    639      1.1  yamt 
    640      1.1  yamt int
    641      1.1  yamt simplefetch(struct Xconn *xc, Oid type, ...)
    642      1.1  yamt {
    643      1.1  yamt 	struct fetchstatus s;
    644      1.1  yamt 	va_list ap;
    645      1.1  yamt 	int error;
    646      1.1  yamt 
    647      1.1  yamt 	fetchinit(&s, xc);
    648      1.1  yamt 	va_start(ap, type);
    649      1.1  yamt 	error = vfetchnext(&s, 1, &type, ap);
    650      1.1  yamt 	va_end(ap);
    651      1.1  yamt 	assert(error != 0 || s.res == NULL);
    652      1.1  yamt 	fetchdone(&s);
    653      1.1  yamt 	return error;
    654      1.1  yamt }
    655      1.1  yamt 
    656  1.1.2.1  yamt /*
    657  1.1.2.1  yamt  * setlabel: set the descriptive label for the connection.
    658  1.1.2.1  yamt  *
    659  1.1.2.1  yamt  * we use simple pointer comparison for label equality check.
    660  1.1.2.1  yamt  */
    661  1.1.2.1  yamt static void
    662  1.1.2.1  yamt setlabel(struct Xconn *xc, const char *label)
    663  1.1.2.1  yamt {
    664  1.1.2.1  yamt 	int error;
    665  1.1.2.1  yamt 
    666  1.1.2.1  yamt 	/*
    667  1.1.2.1  yamt 	 * put the label into application_name so that it's shown in
    668  1.1.2.1  yamt 	 * pg_stat_activity.  we are sure that our labels don't need
    669  1.1.2.1  yamt 	 * PQescapeStringConn.
    670  1.1.2.1  yamt 	 *
    671  1.1.2.1  yamt 	 * example:
    672  1.1.2.1  yamt 	 *	SELECT pid,application_name,query FROM pg_stat_activity
    673  1.1.2.1  yamt 	 *	WHERE state <> 'idle'
    674  1.1.2.1  yamt 	 */
    675  1.1.2.1  yamt 
    676  1.1.2.1  yamt 	if (label != NULL && label != xc->label) {
    677  1.1.2.1  yamt 		struct cmd *c;
    678  1.1.2.1  yamt 		char cmd_str[1024];
    679  1.1.2.1  yamt 
    680  1.1.2.1  yamt 		snprintf(cmd_str, sizeof(cmd_str),
    681  1.1.2.1  yamt 		    "SET application_name TO 'pgfs: %s'", label);
    682  1.1.2.1  yamt 		c = createcmd(cmd_str, CMD_NOPREPARE);
    683  1.1.2.1  yamt 		error = simplecmd(xc, c);
    684  1.1.2.1  yamt 		freecmd(c);
    685  1.1.2.1  yamt 		assert(error == 0);
    686  1.1.2.1  yamt 		xc->label = label;
    687  1.1.2.1  yamt 	} else {
    688  1.1.2.1  yamt #if 0 /* don't bother to clear label */
    689  1.1.2.1  yamt 		static struct cmd *c;
    690  1.1.2.1  yamt 
    691  1.1.2.1  yamt 		CREATECMD_NOPARAM(c, "SET application_name TO 'pgfs'");
    692  1.1.2.1  yamt 		error = simplecmd(xc, c);
    693  1.1.2.1  yamt 		assert(error == 0);
    694  1.1.2.1  yamt #endif
    695  1.1.2.1  yamt 	}
    696  1.1.2.1  yamt }
    697  1.1.2.1  yamt 
    698      1.1  yamt struct Xconn *
    699  1.1.2.1  yamt begin(struct puffs_usermount *pu, const char *label)
    700      1.1  yamt {
    701      1.1  yamt 	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
    702      1.1  yamt 	static struct cmd *c;
    703      1.1  yamt 	int error;
    704      1.1  yamt 
    705  1.1.2.1  yamt 	setlabel(xc, label);
    706      1.1  yamt 	CREATECMD_NOPARAM(c, "BEGIN");
    707      1.1  yamt 	assert(!xc->in_trans);
    708      1.1  yamt 	error = simplecmd(xc, c);
    709      1.1  yamt 	assert(error == 0);
    710      1.1  yamt 	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
    711      1.1  yamt 	xc->in_trans = true;
    712      1.1  yamt 	return xc;
    713      1.1  yamt }
    714      1.1  yamt 
    715      1.1  yamt struct Xconn *
    716  1.1.2.1  yamt begin_readonly(struct puffs_usermount *pu, const char *label)
    717      1.1  yamt {
    718      1.1  yamt 	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
    719      1.1  yamt 	static struct cmd *c;
    720      1.1  yamt 	int error;
    721      1.1  yamt 
    722  1.1.2.1  yamt 	setlabel(xc, label);
    723      1.1  yamt 	CREATECMD_NOPARAM(c, "BEGIN READ ONLY");
    724      1.1  yamt 	assert(!xc->in_trans);
    725      1.1  yamt 	error = simplecmd(xc, c);
    726      1.1  yamt 	assert(error == 0);
    727      1.1  yamt 	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
    728      1.1  yamt 	xc->in_trans = true;
    729      1.1  yamt 	return xc;
    730      1.1  yamt }
    731      1.1  yamt 
    732      1.1  yamt void
    733      1.1  yamt rollback(struct Xconn *xc)
    734      1.1  yamt {
    735      1.1  yamt 	PGTransactionStatusType status;
    736      1.1  yamt 
    737      1.1  yamt 	/*
    738      1.1  yamt 	 * check the status as we are not sure the status of our transaction
    739      1.1  yamt 	 * after a failed commit.
    740      1.1  yamt 	 */
    741      1.1  yamt 	status = PQtransactionStatus(xc->conn);
    742      1.1  yamt 	assert(status != PQTRANS_ACTIVE);
    743      1.1  yamt 	assert(status != PQTRANS_UNKNOWN);
    744      1.1  yamt 	if (status != PQTRANS_IDLE) {
    745      1.1  yamt 		static struct cmd *c;
    746      1.1  yamt 		int error;
    747      1.1  yamt 
    748      1.1  yamt 		assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR);
    749      1.1  yamt 		CREATECMD_NOPARAM(c, "ROLLBACK");
    750      1.1  yamt 		error = simplecmd(xc, c);
    751      1.1  yamt 		assert(error == 0);
    752      1.1  yamt 	}
    753      1.1  yamt 	DPRINTF("xc %p rollback %p\n", xc, xc->owner);
    754  1.1.2.1  yamt 	setlabel(xc, NULL);
    755      1.1  yamt 	relxc(xc);
    756      1.1  yamt }
    757      1.1  yamt 
    758      1.1  yamt int
    759      1.1  yamt commit(struct Xconn *xc)
    760      1.1  yamt {
    761      1.1  yamt 	static struct cmd *c;
    762      1.1  yamt 	int error;
    763      1.1  yamt 
    764      1.1  yamt 	CREATECMD_NOPARAM(c, "COMMIT");
    765      1.1  yamt 	error = simplecmd(xc, c);
    766  1.1.2.1  yamt 	setlabel(xc, NULL);
    767      1.1  yamt 	if (error == 0) {
    768      1.1  yamt 		DPRINTF("xc %p commit %p\n", xc, xc->owner);
    769      1.1  yamt 		relxc(xc);
    770      1.1  yamt 	}
    771      1.1  yamt 	return error;
    772      1.1  yamt }
    773      1.1  yamt 
    774      1.1  yamt int
    775      1.1  yamt commit_sync(struct Xconn *xc)
    776      1.1  yamt {
    777      1.1  yamt 	static struct cmd *c;
    778      1.1  yamt 	int error;
    779      1.1  yamt 
    780      1.1  yamt 	assert(!pgfs_dosync);
    781      1.1  yamt 	CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON");
    782      1.1  yamt 	error = simplecmd(xc, c);
    783      1.1  yamt 	assert(error == 0);
    784      1.1  yamt 	return commit(xc);
    785      1.1  yamt }
    786      1.1  yamt 
    787      1.1  yamt static void
    788      1.1  yamt pgfs_notice_receiver(void *vp, const PGresult *res)
    789      1.1  yamt {
    790      1.1  yamt 	struct Xconn *xc = vp;
    791      1.1  yamt 
    792      1.1  yamt 	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR);
    793      1.1  yamt 	fprintf(stderr, "got a notice on %p\n", xc);
    794      1.1  yamt 	dumperror(xc, res);
    795      1.1  yamt }
    796      1.1  yamt 
    797      1.1  yamt static int
    798      1.1  yamt pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf,
    799      1.1  yamt     int fd, int *done)
    800      1.1  yamt {
    801      1.1  yamt 	struct Xconn *xc;
    802      1.1  yamt 	PGconn *conn;
    803      1.1  yamt 
    804      1.1  yamt 	TAILQ_FOREACH(xc, &xclist, list) {
    805      1.1  yamt 		if (PQsocket(xc->conn) == fd) {
    806      1.1  yamt 			break;
    807      1.1  yamt 		}
    808      1.1  yamt 	}
    809      1.1  yamt 	assert(xc != NULL);
    810      1.1  yamt 	conn = xc->conn;
    811      1.1  yamt 	PQconsumeInput(conn);
    812      1.1  yamt 	if (!PQisBusy(conn)) {
    813      1.1  yamt 		if (xc->blocker != NULL) {
    814      1.1  yamt 			DPRINTF("schedule %p\n", xc->blocker);
    815      1.1  yamt 			puffs_cc_schedule(xc->blocker);
    816      1.1  yamt 		} else {
    817      1.1  yamt 			DPRINTF("no blockers\n");
    818      1.1  yamt 		}
    819      1.1  yamt 	}
    820      1.1  yamt 	*done = 0;
    821      1.1  yamt 	return 0;
    822      1.1  yamt }
    823      1.1  yamt 
    824      1.1  yamt int
    825      1.1  yamt pgfs_connectdb(struct puffs_usermount *pu, const char *dbname,
    826      1.1  yamt     const char *dbuser, bool debug, bool synchronous, unsigned int nconn)
    827      1.1  yamt {
    828      1.1  yamt 	const char *keywords[3+1];
    829      1.1  yamt 	const char *values[3];
    830      1.1  yamt 	unsigned int i;
    831      1.1  yamt 
    832      1.1  yamt 	if (nconn > 32) {
    833      1.1  yamt 		/*
    834      1.1  yamt 		 * limit from sizeof(cmd->prepared_mask)
    835      1.1  yamt 		 */
    836      1.1  yamt 		return EINVAL;
    837      1.1  yamt 	}
    838      1.1  yamt 	if (debug) {
    839      1.1  yamt 		pgfs_dodprintf = true;
    840      1.1  yamt 	}
    841      1.1  yamt 	if (synchronous) {
    842      1.1  yamt 		pgfs_dosync = true;
    843      1.1  yamt 	}
    844      1.1  yamt 	i = 0;
    845      1.1  yamt 	if (dbname != NULL) {
    846      1.1  yamt 		keywords[i] = "dbname";
    847      1.1  yamt 		values[i] = dbname;
    848      1.1  yamt 		i++;
    849      1.1  yamt 	}
    850      1.1  yamt 	if (dbuser != NULL) {
    851      1.1  yamt 		keywords[i] = "user";
    852      1.1  yamt 		values[i] = dbuser;
    853      1.1  yamt 		i++;
    854      1.1  yamt 	}
    855      1.1  yamt 	keywords[i] = "application_name";
    856      1.1  yamt 	values[i] = "pgfs";
    857      1.1  yamt 	i++;
    858      1.1  yamt 	keywords[i] = NULL;
    859      1.1  yamt 	puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL);
    860      1.1  yamt 	for (i = 0; i < nconn; i++) {
    861      1.1  yamt 		struct Xconn *xc;
    862      1.1  yamt 		struct Xconn *xc2;
    863      1.1  yamt 		static int xcid;
    864      1.1  yamt 		PGconn *conn;
    865      1.1  yamt 		struct cmd *c;
    866      1.1  yamt 		int error;
    867      1.1  yamt 
    868      1.1  yamt 		conn = PQconnectdbParams(keywords, values, 0);
    869      1.1  yamt 		if (conn == NULL) {
    870      1.1  yamt 			errx(EXIT_FAILURE,
    871      1.1  yamt 			    "PQconnectdbParams: unknown failure");
    872      1.1  yamt 		}
    873      1.1  yamt 		if (PQstatus(conn) != CONNECTION_OK) {
    874      1.1  yamt 			/*
    875      1.1  yamt 			 * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW
    876      1.1  yamt 			 */
    877      1.1  yamt 			errx(EXIT_FAILURE, "PQconnectdbParams: %s",
    878      1.1  yamt 			    PQerrorMessage(conn));
    879      1.1  yamt 		}
    880      1.1  yamt 		DPRINTF("protocol version %d\n", PQprotocolVersion(conn));
    881      1.1  yamt 		puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ);
    882      1.1  yamt 		xc = emalloc(sizeof(*xc));
    883      1.1  yamt 		xc->conn = conn;
    884      1.1  yamt 		xc->blocker = NULL;
    885      1.1  yamt 		xc->owner = NULL;
    886      1.1  yamt 		xc->in_trans = false;
    887      1.1  yamt 		xc->id = xcid++;
    888  1.1.2.1  yamt 		xc->label = NULL;
    889      1.1  yamt 		assert(xc->id < 32);
    890      1.1  yamt 		PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc);
    891      1.1  yamt 		TAILQ_INSERT_HEAD(&xclist, xc, list);
    892  1.1.2.1  yamt 		xc2 = begin(pu, NULL);
    893      1.1  yamt 		assert(xc2 == xc);
    894      1.1  yamt 		c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE);
    895      1.1  yamt 		error = simplecmd(xc, c);
    896      1.1  yamt 		assert(error == 0);
    897      1.1  yamt 		freecmd(c);
    898      1.1  yamt 		c = createcmd("SET SESSION CHARACTERISTICS AS "
    899      1.1  yamt 		    "TRANSACTION ISOLATION LEVEL REPEATABLE READ",
    900      1.1  yamt 		    CMD_NOPREPARE);
    901      1.1  yamt 		error = simplecmd(xc, c);
    902      1.1  yamt 		assert(error == 0);
    903      1.1  yamt 		freecmd(c);
    904      1.1  yamt 		c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE);
    905      1.1  yamt 		error = simplecmd(xc, c);
    906      1.1  yamt 		assert(error == 0);
    907      1.1  yamt 		freecmd(c);
    908      1.1  yamt 		if (!pgfs_dosync) {
    909      1.1  yamt 			c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF",
    910      1.1  yamt 			    CMD_NOPREPARE);
    911      1.1  yamt 			error = simplecmd(xc, c);
    912      1.1  yamt 			assert(error == 0);
    913      1.1  yamt 			freecmd(c);
    914      1.1  yamt 		}
    915      1.1  yamt 		if (debug) {
    916      1.1  yamt 			struct fetchstatus s;
    917      1.1  yamt 			static const Oid types[] = { INT8OID, };
    918      1.1  yamt 			uint64_t pid;
    919      1.1  yamt 
    920      1.1  yamt 			c = createcmd("SELECT pg_backend_pid()::int8;",
    921      1.1  yamt 			    CMD_NOPREPARE);
    922      1.1  yamt 			error = sendcmd(xc, c);
    923      1.1  yamt 			assert(error == 0);
    924      1.1  yamt 			fetchinit(&s, xc);
    925      1.1  yamt 			error = FETCHNEXT(&s, types, &pid);
    926      1.1  yamt 			fetchdone(&s);
    927      1.1  yamt 			assert(error == 0);
    928      1.1  yamt 			DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid);
    929      1.1  yamt 		}
    930      1.1  yamt 		error = commit(xc);
    931      1.1  yamt 		assert(error == 0);
    932      1.1  yamt 		assert(xc->owner == NULL);
    933      1.1  yamt 	}
    934      1.1  yamt 	/*
    935      1.1  yamt 	 * XXX cleanup unlinked files here?  what to do when the filesystem
    936      1.1  yamt 	 * is shared?
    937      1.1  yamt 	 */
    938      1.1  yamt 	return 0;
    939      1.1  yamt }
    940      1.1  yamt 
    941      1.1  yamt struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq);
    942      1.1  yamt struct puffs_cc *flusher = NULL;
    943      1.1  yamt 
    944      1.1  yamt int
    945      1.1  yamt flush_xacts(struct puffs_usermount *pu)
    946      1.1  yamt {
    947      1.1  yamt 	struct puffs_cc *cc = puffs_cc_getcc(pu);
    948      1.1  yamt 	struct Xconn *xc;
    949      1.1  yamt 	static struct cmd *c;
    950      1.1  yamt 	uint64_t dummy;
    951      1.1  yamt 	int error;
    952      1.1  yamt 
    953      1.1  yamt 	/*
    954      1.1  yamt 	 * flush all previously issued asynchronous transactions.
    955      1.1  yamt 	 *
    956      1.1  yamt 	 * XXX
    957      1.1  yamt 	 * unfortunately it seems that there is no clean way to tell
    958      1.1  yamt 	 * PostgreSQL flush XLOG.  we could perform a CHECKPOINT but it's
    959      1.1  yamt 	 * too expensive and overkill for our purpose.
    960      1.1  yamt 	 * besides, PostgreSQL has an optimization to skip XLOG flushing
    961      1.1  yamt 	 * for transactions which didn't produce WAL records.
    962      1.1  yamt 	 * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2)
    963      1.1  yamt 	 * it means that an empty transaction ("BEGIN; COMMIT;"), which
    964      1.1  yamt 	 * doesn't produce any WAL records, doesn't flush the XLOG even if
    965      1.1  yamt 	 * synchronous_commit=on.  we issues a dummy setval() to avoid the
    966      1.1  yamt 	 * optimization.
    967      1.1  yamt 	 * on the other hand, we try to avoid creating unnecessary WAL activity
    968      1.1  yamt 	 * by serializing flushing and checking XLOG locations.
    969      1.1  yamt 	 */
    970      1.1  yamt 
    971      1.1  yamt 	assert(!pgfs_dosync);
    972      1.1  yamt 	if (flusher != NULL) { /* serialize flushers */
    973      1.1  yamt 		DPRINTF("%p flush in progress %p\n", cc, flusher);
    974      1.1  yamt 		waiton(&flushwaitq, cc);
    975      1.1  yamt 		assert(flusher == NULL);
    976      1.1  yamt 	}
    977      1.1  yamt 	DPRINTF("%p start flushing\n", cc);
    978      1.1  yamt 	flusher = cc;
    979      1.1  yamt retry:
    980  1.1.2.1  yamt 	xc = begin(pu, "flush");
    981      1.1  yamt 	CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE "
    982      1.1  yamt 	    "pg_current_xlog_insert_location() <> pg_current_xlog_location()");
    983      1.1  yamt 	error = sendcmd(xc, c);
    984      1.1  yamt 	if (error != 0) {
    985      1.1  yamt 		goto got_error;
    986      1.1  yamt 	}
    987      1.1  yamt 	error = simplefetch(xc, INT8OID, &dummy);
    988      1.1  yamt 	assert(error != 0 || dummy == 1);
    989      1.1  yamt 	if (error == ENOENT) {
    990      1.1  yamt 		/*
    991      1.1  yamt 		 * there seems to be nothing to flush.
    992      1.1  yamt 		 */
    993      1.1  yamt 		DPRINTF("%p no sync\n", cc);
    994      1.1  yamt 		error = 0;
    995      1.1  yamt 	}
    996      1.1  yamt 	if (error != 0) {
    997      1.1  yamt 		goto got_error;
    998      1.1  yamt 	}
    999      1.1  yamt 	error = commit_sync(xc);
   1000      1.1  yamt 	if (error != 0) {
   1001      1.1  yamt 		goto got_error;
   1002      1.1  yamt 	}
   1003      1.1  yamt 	goto done;
   1004      1.1  yamt got_error:
   1005      1.1  yamt 	rollback(xc);
   1006      1.1  yamt 	if (error == EAGAIN) {
   1007      1.1  yamt 		goto retry;
   1008      1.1  yamt 	}
   1009      1.1  yamt done:
   1010      1.1  yamt 	assert(flusher == cc);
   1011      1.1  yamt 	flusher = NULL;
   1012      1.1  yamt 	wakeup_one(&flushwaitq);
   1013      1.1  yamt 	DPRINTF("%p end flushing error=%d\n", cc, error);
   1014      1.1  yamt 	return error;
   1015      1.1  yamt }
   1016