1 1.3 yamt /* $NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $ */ 2 1.1 yamt 3 1.1 yamt /*- 4 1.1 yamt * Copyright (c)2010,2011 YAMAMOTO Takashi, 5 1.1 yamt * All rights reserved. 6 1.1 yamt * 7 1.1 yamt * Redistribution and use in source and binary forms, with or without 8 1.1 yamt * modification, are permitted provided that the following conditions 9 1.1 yamt * are met: 10 1.1 yamt * 1. Redistributions of source code must retain the above copyright 11 1.1 yamt * notice, this list of conditions and the following disclaimer. 12 1.1 yamt * 2. Redistributions in binary form must reproduce the above copyright 13 1.1 yamt * notice, this list of conditions and the following disclaimer in the 14 1.1 yamt * documentation and/or other materials provided with the distribution. 15 1.1 yamt * 16 1.1 yamt * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17 1.1 yamt * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 1.1 yamt * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 1.1 yamt * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20 1.1 yamt * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 1.1 yamt * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 1.1 yamt * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 1.1 yamt * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 1.1 yamt * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 1.1 yamt * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 1.1 yamt * SUCH DAMAGE. 27 1.1 yamt */ 28 1.1 yamt 29 1.1 yamt /* 30 1.1 yamt * backend db operations 31 1.1 yamt */ 32 1.1 yamt 33 1.1 yamt #include <sys/cdefs.h> 34 1.1 yamt #ifndef lint 35 1.3 yamt __RCSID("$NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $"); 36 1.1 yamt #endif /* not lint */ 37 1.1 yamt 38 1.1 yamt #include <assert.h> 39 1.1 yamt #include <err.h> 40 1.1 yamt #include <errno.h> 41 1.1 yamt #include <inttypes.h> 42 1.1 yamt #include <puffs.h> 43 1.1 yamt #include <stdbool.h> 44 1.1 yamt #include <stdarg.h> 45 1.1 yamt #include <stdio.h> 46 1.1 yamt #include <stdlib.h> 47 1.1 yamt #include <util.h> 48 1.1 yamt 49 1.1 yamt #include <libpq-fe.h> 50 1.1 yamt 51 1.1 yamt #include "pgfs_db.h" 52 1.1 yamt #include "pgfs_waitq.h" 53 1.1 yamt #include "pgfs_debug.h" 54 1.1 yamt 55 1.1 yamt bool pgfs_dosync = false; 56 1.1 yamt 57 1.1 yamt struct Xconn { 58 1.1 yamt TAILQ_ENTRY(Xconn) list; 59 1.1 yamt PGconn *conn; 60 1.1 yamt struct puffs_cc *blocker; 61 1.1 yamt struct puffs_cc *owner; 62 1.1 yamt bool in_trans; 63 1.1 yamt int id; 64 1.3 yamt const char *label; 65 1.1 yamt }; 66 1.1 yamt 67 1.1 yamt static void 68 1.1 yamt dumperror(struct Xconn *xc, const PGresult *res) 69 1.1 yamt { 70 1.1 yamt static const struct { 71 1.1 yamt const char *name; 72 1.1 yamt int code; 73 1.1 yamt } fields[] = { 74 1.1 yamt #define F(x) { .name = #x, .code = x, } 75 1.1 yamt F(PG_DIAG_SEVERITY), 76 1.1 yamt F(PG_DIAG_SQLSTATE), 77 1.1 yamt F(PG_DIAG_MESSAGE_PRIMARY), 78 1.1 yamt F(PG_DIAG_MESSAGE_DETAIL), 79 1.1 yamt F(PG_DIAG_MESSAGE_HINT), 80 1.1 yamt F(PG_DIAG_STATEMENT_POSITION), 81 1.1 yamt F(PG_DIAG_INTERNAL_POSITION), 82 1.1 yamt F(PG_DIAG_INTERNAL_QUERY), 83 1.1 yamt F(PG_DIAG_CONTEXT), 84 1.1 yamt F(PG_DIAG_SOURCE_FILE), 85 1.1 yamt F(PG_DIAG_SOURCE_LINE), 86 1.1 yamt F(PG_DIAG_SOURCE_FUNCTION), 87 1.1 yamt #undef F 88 1.1 yamt }; 89 1.1 yamt unsigned int i; 90 1.1 yamt 91 1.1 yamt if (!pgfs_dodprintf) { 92 1.1 yamt return; 93 1.1 yamt } 94 1.1 yamt assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR || 95 1.1 yamt PQresultStatus(res) == PGRES_FATAL_ERROR); 96 1.1 yamt for (i = 0; i < __arraycount(fields); i++) { 97 1.1 yamt const char *val = PQresultErrorField(res, fields[i].code); 98 1.1 yamt 99 1.1 yamt if (val == NULL) { 100 1.1 yamt continue; 101 1.1 yamt } 102 1.1 yamt fprintf(stderr, "%s: %s\n", fields[i].name, val); 103 1.1 yamt } 104 1.1 yamt } 105 1.1 yamt 106 1.1 yamt TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist); 107 1.1 yamt struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq); 108 1.1 yamt 109 1.1 yamt static struct Xconn * 110 1.1 yamt getxc(struct puffs_cc *cc) 111 1.1 yamt { 112 1.1 yamt struct Xconn *xc; 113 1.1 yamt 114 1.1 yamt assert(cc != NULL); 115 1.1 yamt retry: 116 1.1 yamt TAILQ_FOREACH(xc, &xclist, list) { 117 1.1 yamt if (xc->blocker == NULL) { 118 1.1 yamt assert(xc->owner == NULL); 119 1.1 yamt xc->owner = cc; 120 1.1 yamt DPRINTF("xc %p acquire %p\n", xc, cc); 121 1.1 yamt return xc; 122 1.1 yamt } else { 123 1.1 yamt assert(xc->owner == xc->blocker); 124 1.1 yamt } 125 1.1 yamt } 126 1.1 yamt DPRINTF("no free conn %p\n", cc); 127 1.1 yamt waiton(&xcwaitq, cc); 128 1.1 yamt goto retry; 129 1.1 yamt } 130 1.1 yamt 131 1.1 yamt static void 132 1.1 yamt relxc(struct Xconn *xc) 133 1.1 yamt { 134 1.1 yamt 135 1.1 yamt assert(xc->in_trans); 136 1.1 yamt assert(xc->owner != NULL); 137 1.1 yamt xc->in_trans = false; 138 1.1 yamt xc->owner = NULL; 139 1.1 yamt wakeup_one(&xcwaitq); 140 1.1 yamt } 141 1.1 yamt 142 1.1 yamt static void 143 1.1 yamt pqwait(struct Xconn *xc) 144 1.1 yamt { 145 1.1 yamt PGconn *conn = xc->conn; 146 1.1 yamt struct puffs_cc *cc = xc->owner; 147 1.1 yamt 148 1.1 yamt if (PQflush(conn)) { 149 1.1 yamt errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn)); 150 1.1 yamt } 151 1.1 yamt if (!PQisBusy(conn)) { 152 1.1 yamt return; 153 1.1 yamt } 154 1.1 yamt assert(xc->blocker == NULL); 155 1.1 yamt xc->blocker = cc; 156 1.1 yamt DPRINTF("yielding %p\n", cc); 157 1.1 yamt /* XXX is it safe to yield before entering mainloop? */ 158 1.1 yamt puffs_cc_yield(cc); 159 1.1 yamt DPRINTF("yield returned %p\n", cc); 160 1.1 yamt assert(xc->owner == cc); 161 1.1 yamt assert(xc->blocker == cc); 162 1.1 yamt xc->blocker = NULL; 163 1.1 yamt } 164 1.1 yamt 165 1.1 yamt static int 166 1.1 yamt sqltoerrno(const char *sqlstate) 167 1.1 yamt { 168 1.1 yamt /* 169 1.1 yamt * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle 170 1.1 yamt * "tuple concurrently updated" errors for lowrite/lo_truncate. 171 1.1 yamt * 172 1.1 yamt * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN? 173 1.1 yamt */ 174 1.1 yamt static const struct { 175 1.1 yamt char sqlstate[5]; 176 1.1 yamt int error; 177 1.1 yamt } map[] = { 178 1.1 yamt { "00000", 0, }, /* ERRCODE_SUCCESSFUL_COMPLETION */ 179 1.1 yamt { "02000", ENOENT, }, /* ERRCODE_NO_DATA */ 180 1.1 yamt { "23505", EEXIST, }, /* ERRCODE_UNIQUE_VIOLATION */ 181 1.1 yamt { "23514", EINVAL, }, /* ERRCODE_CHECK_VIOLATION */ 182 1.1 yamt { "40001", EAGAIN, }, /* ERRCODE_T_R_SERIALIZATION_FAILURE */ 183 1.1 yamt { "40P01", EAGAIN, }, /* ERRCODE_T_R_DEADLOCK_DETECTED */ 184 1.1 yamt { "42704", ENOENT, }, /* ERRCODE_UNDEFINED_OBJECT */ 185 1.1 yamt { "53100", ENOSPC, }, /* ERRCODE_DISK_FULL */ 186 1.1 yamt { "53200", ENOMEM, }, /* ERRCODE_OUT_OF_MEMORY */ 187 1.1 yamt { "XX000", EAGAIN, }, /* ERRCODE_INTERNAL_ERROR */ 188 1.1 yamt }; 189 1.1 yamt unsigned int i; 190 1.1 yamt 191 1.1 yamt for (i = 0; i < __arraycount(map); i++) { 192 1.1 yamt if (!memcmp(map[i].sqlstate, sqlstate, 5)) { 193 1.1 yamt const int error = map[i].error; 194 1.1 yamt 195 1.1 yamt if (error != 0) { 196 1.1 yamt DPRINTF("sqlstate %5s mapped to error %d\n", 197 1.1 yamt sqlstate, error); 198 1.1 yamt } 199 1.1 yamt if (error == EINVAL) { 200 1.1 yamt /* 201 1.1 yamt * sounds like a bug. 202 1.1 yamt */ 203 1.1 yamt abort(); 204 1.1 yamt } 205 1.1 yamt return error; 206 1.1 yamt } 207 1.1 yamt } 208 1.1 yamt DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate); 209 1.1 yamt return EIO; 210 1.1 yamt } 211 1.1 yamt 212 1.1 yamt struct cmd { 213 1.1 yamt char name[32]; /* name of prepared statement */ 214 1.1 yamt char *cmd; /* query string */ 215 1.1 yamt unsigned int nparams; 216 1.1 yamt Oid *paramtypes; 217 1.1 yamt uint32_t prepared_mask; /* for which connections this is prepared? */ 218 1.1 yamt unsigned int flags; /* CMD_ flags */ 219 1.1 yamt }; 220 1.1 yamt 221 1.1 yamt #define CMD_NOPREPARE 1 /* don't prepare this command */ 222 1.1 yamt 223 1.1 yamt struct cmd * 224 1.1 yamt createcmd(const char *cmd, unsigned int flags, ...) 225 1.1 yamt { 226 1.1 yamt struct cmd *c; 227 1.1 yamt va_list ap; 228 1.1 yamt const char *cp; 229 1.1 yamt unsigned int i; 230 1.1 yamt static unsigned int cmdid; 231 1.1 yamt 232 1.1 yamt c = emalloc(sizeof(*c)); 233 1.1 yamt c->cmd = estrdup(cmd); 234 1.1 yamt c->nparams = 0; 235 1.1 yamt va_start(ap, flags); 236 1.1 yamt for (cp = cmd; *cp != 0; cp++) { 237 1.1 yamt if (*cp == '$') { /* XXX */ 238 1.1 yamt c->nparams++; 239 1.1 yamt } 240 1.1 yamt } 241 1.1 yamt c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes)); 242 1.1 yamt for (i = 0; i < c->nparams; i++) { 243 1.1 yamt Oid type = va_arg(ap, Oid); 244 1.1 yamt assert(type == BYTEA || 245 1.1 yamt type == INT4OID || type == INT8OID || type == OIDOID || 246 1.1 yamt type == TEXTOID || type == TIMESTAMPTZOID); 247 1.1 yamt c->paramtypes[i] = type; 248 1.1 yamt } 249 1.1 yamt va_end(ap); 250 1.1 yamt snprintf(c->name, sizeof(c->name), "%u", cmdid++); 251 1.1 yamt if ((flags & CMD_NOPREPARE) != 0) { 252 1.1 yamt c->prepared_mask = ~0; 253 1.1 yamt } else { 254 1.1 yamt c->prepared_mask = 0; 255 1.1 yamt } 256 1.1 yamt c->flags = flags; 257 1.1 yamt return c; 258 1.1 yamt } 259 1.1 yamt 260 1.1 yamt static void 261 1.1 yamt freecmd(struct cmd *c) 262 1.1 yamt { 263 1.1 yamt 264 1.1 yamt free(c->paramtypes); 265 1.1 yamt free(c->cmd); 266 1.1 yamt free(c); 267 1.1 yamt } 268 1.1 yamt 269 1.1 yamt static int 270 1.1 yamt fetch_noresult(struct Xconn *xc) 271 1.1 yamt { 272 1.1 yamt PGresult *res; 273 1.1 yamt ExecStatusType status; 274 1.1 yamt PGconn *conn = xc->conn; 275 1.1 yamt int error; 276 1.1 yamt 277 1.1 yamt pqwait(xc); 278 1.1 yamt res = PQgetResult(conn); 279 1.1 yamt if (res == NULL) { 280 1.1 yamt return ENOENT; 281 1.1 yamt } 282 1.1 yamt status = PQresultStatus(res); 283 1.1 yamt if (status == PGRES_COMMAND_OK) { 284 1.1 yamt assert(PQnfields(res) == 0); 285 1.1 yamt assert(PQntuples(res) == 0); 286 1.1 yamt if (!strcmp(PQcmdTuples(res), "0")) { 287 1.1 yamt error = ENOENT; 288 1.1 yamt } else { 289 1.1 yamt error = 0; 290 1.1 yamt } 291 1.1 yamt } else if (status == PGRES_FATAL_ERROR) { 292 1.1 yamt error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE)); 293 1.1 yamt assert(error != 0); 294 1.1 yamt dumperror(xc, res); 295 1.1 yamt } else { 296 1.1 yamt errx(1, "%s not command_ok: %d: %s", __func__, 297 1.1 yamt (int)status, 298 1.1 yamt PQerrorMessage(conn)); 299 1.1 yamt } 300 1.1 yamt PQclear(res); 301 1.1 yamt res = PQgetResult(conn); 302 1.1 yamt assert(res == NULL); 303 1.1 yamt if (error != 0) { 304 1.1 yamt DPRINTF("error %d\n", error); 305 1.1 yamt } 306 1.1 yamt return error; 307 1.1 yamt } 308 1.1 yamt 309 1.1 yamt static int 310 1.1 yamt preparecmd(struct Xconn *xc, struct cmd *c) 311 1.1 yamt { 312 1.1 yamt PGconn *conn = xc->conn; 313 1.1 yamt const uint32_t mask = 1 << xc->id; 314 1.1 yamt int error; 315 1.1 yamt int ret; 316 1.1 yamt 317 1.1 yamt if ((c->prepared_mask & mask) != 0) { 318 1.1 yamt return 0; 319 1.1 yamt } 320 1.1 yamt assert((c->flags & CMD_NOPREPARE) == 0); 321 1.1 yamt DPRINTF("PREPARE: '%s'\n", c->cmd); 322 1.1 yamt ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes); 323 1.1 yamt if (!ret) { 324 1.1 yamt errx(EXIT_FAILURE, "PQsendPrepare: %s", 325 1.1 yamt PQerrorMessage(conn)); 326 1.1 yamt } 327 1.1 yamt error = fetch_noresult(xc); 328 1.1 yamt if (error != 0) { 329 1.1 yamt return error; 330 1.1 yamt } 331 1.1 yamt c->prepared_mask |= mask; 332 1.1 yamt return 0; 333 1.1 yamt } 334 1.1 yamt 335 1.1 yamt /* 336 1.1 yamt * vsendcmd: 337 1.1 yamt * 338 1.1 yamt * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared. 339 1.1 yamt * 0 for text and 1 for binary. 340 1.1 yamt */ 341 1.1 yamt 342 1.1 yamt static int 343 1.1 yamt vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap) 344 1.1 yamt { 345 1.1 yamt PGconn *conn = xc->conn; 346 1.1 yamt char **paramvalues; 347 1.1 yamt int *paramlengths; 348 1.1 yamt int *paramformats; 349 1.1 yamt unsigned int i; 350 1.1 yamt int error; 351 1.1 yamt int ret; 352 1.1 yamt 353 1.1 yamt assert(xc->owner != NULL); 354 1.1 yamt assert(xc->blocker == NULL); 355 1.1 yamt error = preparecmd(xc, c); 356 1.1 yamt if (error != 0) { 357 1.1 yamt return error; 358 1.1 yamt } 359 1.1 yamt paramvalues = emalloc(c->nparams * sizeof(*paramvalues)); 360 1.1 yamt paramlengths = NULL; 361 1.1 yamt paramformats = NULL; 362 1.1 yamt DPRINTF("CMD: '%s'\n", c->cmd); 363 1.1 yamt for (i = 0; i < c->nparams; i++) { 364 1.1 yamt Oid type = c->paramtypes[i]; 365 1.1 yamt char tmpstore[1024]; 366 1.1 yamt const char *buf = NULL; 367 1.1 yamt intmax_t v = 0; /* XXXgcc */ 368 1.1 yamt int sz; 369 1.1 yamt bool binary = false; 370 1.1 yamt 371 1.1 yamt switch (type) { 372 1.1 yamt case BYTEA: 373 1.1 yamt buf = va_arg(ap, const void *); 374 1.1 yamt sz = (int)va_arg(ap, size_t); 375 1.1 yamt binary = true; 376 1.1 yamt break; 377 1.1 yamt case INT8OID: 378 1.1 yamt case OIDOID: 379 1.1 yamt case INT4OID: 380 1.1 yamt switch (type) { 381 1.1 yamt case INT8OID: 382 1.1 yamt v = (intmax_t)va_arg(ap, int64_t); 383 1.1 yamt break; 384 1.1 yamt case OIDOID: 385 1.1 yamt v = (intmax_t)va_arg(ap, Oid); 386 1.1 yamt break; 387 1.1 yamt case INT4OID: 388 1.1 yamt v = (intmax_t)va_arg(ap, int32_t); 389 1.1 yamt break; 390 1.1 yamt default: 391 1.1 yamt errx(EXIT_FAILURE, "unknown integer oid %u", 392 1.1 yamt type); 393 1.1 yamt } 394 1.1 yamt buf = tmpstore; 395 1.1 yamt sz = snprintf(tmpstore, sizeof(tmpstore), 396 1.1 yamt "%jd", v); 397 1.1 yamt assert(sz != -1); 398 1.1 yamt assert((size_t)sz < sizeof(tmpstore)); 399 1.1 yamt sz += 1; 400 1.1 yamt break; 401 1.1 yamt case TEXTOID: 402 1.1 yamt case TIMESTAMPTZOID: 403 1.1 yamt buf = va_arg(ap, char *); 404 1.1 yamt sz = strlen(buf) + 1; 405 1.1 yamt break; 406 1.1 yamt default: 407 1.1 yamt errx(EXIT_FAILURE, "%s: unknown param type %u", 408 1.1 yamt __func__, type); 409 1.1 yamt } 410 1.1 yamt if (binary) { 411 1.1 yamt if (paramlengths == NULL) { 412 1.1 yamt paramlengths = 413 1.1 yamt emalloc(c->nparams * sizeof(*paramformats)); 414 1.1 yamt } 415 1.1 yamt if (paramformats == NULL) { 416 1.1 yamt paramformats = ecalloc(1, 417 1.1 yamt c->nparams * sizeof(*paramformats)); 418 1.1 yamt } 419 1.1 yamt paramformats[i] = 1; 420 1.1 yamt paramlengths[i] = sz; 421 1.1 yamt } 422 1.1 yamt paramvalues[i] = emalloc(sz); 423 1.1 yamt memcpy(paramvalues[i], buf, sz); 424 1.1 yamt if (binary) { 425 1.1 yamt DPRINTF("\t[%u]=<BINARY>\n", i); 426 1.1 yamt } else { 427 1.1 yamt DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]); 428 1.1 yamt } 429 1.1 yamt } 430 1.1 yamt if ((c->flags & CMD_NOPREPARE) != 0) { 431 1.1 yamt ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes, 432 1.1 yamt (const char * const *)paramvalues, paramlengths, 433 1.1 yamt paramformats, resultmode); 434 1.1 yamt } else { 435 1.1 yamt ret = PQsendQueryPrepared(conn, c->name, c->nparams, 436 1.1 yamt (const char * const *)paramvalues, paramlengths, 437 1.1 yamt paramformats, resultmode); 438 1.1 yamt } 439 1.1 yamt for (i = 0; i < c->nparams; i++) { 440 1.1 yamt free(paramvalues[i]); 441 1.1 yamt } 442 1.1 yamt free(paramvalues); 443 1.1 yamt free(paramlengths); 444 1.1 yamt free(paramformats); 445 1.1 yamt if (!ret) { 446 1.1 yamt errx(EXIT_FAILURE, "PQsendQueryPrepared: %s", 447 1.1 yamt PQerrorMessage(conn)); 448 1.1 yamt } 449 1.1 yamt return 0; 450 1.1 yamt } 451 1.1 yamt 452 1.1 yamt int 453 1.1 yamt sendcmd(struct Xconn *xc, struct cmd *c, ...) 454 1.1 yamt { 455 1.1 yamt va_list ap; 456 1.1 yamt int error; 457 1.1 yamt 458 1.1 yamt va_start(ap, c); 459 1.1 yamt error = vsendcmd(xc, 0, c, ap); 460 1.1 yamt va_end(ap); 461 1.1 yamt return error; 462 1.1 yamt } 463 1.1 yamt 464 1.1 yamt int 465 1.1 yamt sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...) 466 1.1 yamt { 467 1.1 yamt va_list ap; 468 1.1 yamt int error; 469 1.1 yamt 470 1.1 yamt va_start(ap, c); 471 1.1 yamt error = vsendcmd(xc, resultmode, c, ap); 472 1.1 yamt va_end(ap); 473 1.1 yamt return error; 474 1.1 yamt } 475 1.1 yamt 476 1.1 yamt /* 477 1.1 yamt * simplecmd: a convenient routine to execute a command which returns 478 1.1 yamt * no rows synchronously. 479 1.1 yamt */ 480 1.1 yamt 481 1.1 yamt int 482 1.1 yamt simplecmd(struct Xconn *xc, struct cmd *c, ...) 483 1.1 yamt { 484 1.1 yamt va_list ap; 485 1.1 yamt int error; 486 1.1 yamt 487 1.1 yamt va_start(ap, c); 488 1.1 yamt error = vsendcmd(xc, 0, c, ap); 489 1.1 yamt va_end(ap); 490 1.1 yamt if (error != 0) { 491 1.1 yamt return error; 492 1.1 yamt } 493 1.1 yamt return fetch_noresult(xc); 494 1.1 yamt } 495 1.1 yamt 496 1.1 yamt void 497 1.1 yamt fetchinit(struct fetchstatus *s, struct Xconn *xc) 498 1.1 yamt { 499 1.1 yamt s->xc = xc; 500 1.1 yamt s->res = NULL; 501 1.1 yamt s->cur = 0; 502 1.1 yamt s->nrows = 0; 503 1.1 yamt s->done = false; 504 1.1 yamt } 505 1.1 yamt 506 1.1 yamt static intmax_t 507 1.1 yamt getint(const char *str) 508 1.1 yamt { 509 1.1 yamt intmax_t i; 510 1.1 yamt char *ep; 511 1.1 yamt 512 1.1 yamt errno = 0; 513 1.1 yamt i = strtoimax(str, &ep, 10); 514 1.1 yamt assert(errno == 0); 515 1.1 yamt assert(str[0] != 0); 516 1.1 yamt assert(*ep == 0); 517 1.1 yamt return i; 518 1.1 yamt } 519 1.1 yamt 520 1.1 yamt static int 521 1.1 yamt vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap) 522 1.1 yamt { 523 1.1 yamt PGconn *conn = s->xc->conn; 524 1.1 yamt unsigned int i; 525 1.1 yamt 526 1.1 yamt assert(conn != NULL); 527 1.1 yamt if (s->res == NULL) { 528 1.1 yamt ExecStatusType status; 529 1.1 yamt int error; 530 1.1 yamt 531 1.1 yamt pqwait(s->xc); 532 1.1 yamt s->res = PQgetResult(conn); 533 1.1 yamt if (s->res == NULL) { 534 1.1 yamt s->done = true; 535 1.1 yamt return ENOENT; 536 1.1 yamt } 537 1.1 yamt status = PQresultStatus(s->res); 538 1.1 yamt if (status == PGRES_FATAL_ERROR) { 539 1.1 yamt error = sqltoerrno( 540 1.1 yamt PQresultErrorField(s->res, PG_DIAG_SQLSTATE)); 541 1.1 yamt assert(error != 0); 542 1.1 yamt dumperror(s->xc, s->res); 543 1.1 yamt return error; 544 1.1 yamt } 545 1.1 yamt if (status != PGRES_TUPLES_OK) { 546 1.1 yamt errx(1, "not tuples_ok: %s", 547 1.1 yamt PQerrorMessage(conn)); 548 1.1 yamt } 549 1.1 yamt assert((unsigned int)PQnfields(s->res) == n); 550 1.1 yamt s->nrows = PQntuples(s->res); 551 1.1 yamt if (s->nrows == 0) { 552 1.1 yamt DPRINTF("no rows\n"); 553 1.1 yamt return ENOENT; 554 1.1 yamt } 555 1.1 yamt assert(s->nrows >= 1); 556 1.1 yamt s->cur = 0; 557 1.1 yamt } 558 1.1 yamt for (i = 0; i < n; i++) { 559 1.1 yamt size_t size; 560 1.1 yamt 561 1.1 yamt assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0)); 562 1.1 yamt DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n", 563 1.1 yamt i, PQftype(s->res, i), types[i], 564 1.1 yamt PQgetisnull(s->res, s->cur, i) ? "<NULL>" : 565 1.1 yamt PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) : 566 1.1 yamt "<BINARY>"); 567 1.1 yamt assert(PQftype(s->res, i) == types[i]); 568 1.1 yamt assert(!PQgetisnull(s->res, s->cur, i)); 569 1.1 yamt switch(types[i]) { 570 1.1 yamt case INT8OID: 571 1.1 yamt *va_arg(ap, int64_t *) = 572 1.1 yamt getint(PQgetvalue(s->res, s->cur, i)); 573 1.1 yamt break; 574 1.1 yamt case OIDOID: 575 1.1 yamt *va_arg(ap, Oid *) = 576 1.1 yamt getint(PQgetvalue(s->res, s->cur, i)); 577 1.1 yamt break; 578 1.1 yamt case INT4OID: 579 1.1 yamt *va_arg(ap, int32_t *) = 580 1.1 yamt getint(PQgetvalue(s->res, s->cur, i)); 581 1.1 yamt break; 582 1.1 yamt case TEXTOID: 583 1.1 yamt *va_arg(ap, char **) = 584 1.1 yamt estrdup(PQgetvalue(s->res, s->cur, i)); 585 1.1 yamt break; 586 1.1 yamt case BYTEA: 587 1.1 yamt size = PQgetlength(s->res, s->cur, i); 588 1.1 yamt memcpy(va_arg(ap, void *), 589 1.1 yamt PQgetvalue(s->res, s->cur, i), size); 590 1.1 yamt *va_arg(ap, size_t *) = size; 591 1.1 yamt break; 592 1.1 yamt default: 593 1.1 yamt errx(EXIT_FAILURE, "%s unknown type %u", __func__, 594 1.1 yamt types[i]); 595 1.1 yamt } 596 1.1 yamt } 597 1.1 yamt s->cur++; 598 1.1 yamt if (s->cur == s->nrows) { 599 1.1 yamt PQclear(s->res); 600 1.1 yamt s->res = NULL; 601 1.1 yamt } 602 1.1 yamt return 0; 603 1.1 yamt } 604 1.1 yamt 605 1.1 yamt int 606 1.1 yamt fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...) 607 1.1 yamt { 608 1.1 yamt va_list ap; 609 1.1 yamt int error; 610 1.1 yamt 611 1.1 yamt va_start(ap, types); 612 1.1 yamt error = vfetchnext(s, n, types, ap); 613 1.1 yamt va_end(ap); 614 1.1 yamt return error; 615 1.1 yamt } 616 1.1 yamt 617 1.1 yamt void 618 1.1 yamt fetchdone(struct fetchstatus *s) 619 1.1 yamt { 620 1.1 yamt 621 1.1 yamt if (s->res != NULL) { 622 1.1 yamt PQclear(s->res); 623 1.1 yamt s->res = NULL; 624 1.1 yamt } 625 1.1 yamt if (!s->done) { 626 1.1 yamt PGresult *res; 627 1.1 yamt unsigned int n; 628 1.1 yamt 629 1.1 yamt n = 0; 630 1.1 yamt while ((res = PQgetResult(s->xc->conn)) != NULL) { 631 1.1 yamt PQclear(res); 632 1.1 yamt n++; 633 1.1 yamt } 634 1.1 yamt if (n > 0) { 635 1.1 yamt DPRINTF("%u rows dropped\n", n); 636 1.1 yamt } 637 1.1 yamt } 638 1.1 yamt } 639 1.1 yamt 640 1.1 yamt int 641 1.1 yamt simplefetch(struct Xconn *xc, Oid type, ...) 642 1.1 yamt { 643 1.1 yamt struct fetchstatus s; 644 1.1 yamt va_list ap; 645 1.1 yamt int error; 646 1.1 yamt 647 1.1 yamt fetchinit(&s, xc); 648 1.1 yamt va_start(ap, type); 649 1.1 yamt error = vfetchnext(&s, 1, &type, ap); 650 1.1 yamt va_end(ap); 651 1.1 yamt assert(error != 0 || s.res == NULL); 652 1.1 yamt fetchdone(&s); 653 1.1 yamt return error; 654 1.1 yamt } 655 1.1 yamt 656 1.3 yamt /* 657 1.3 yamt * setlabel: set the descriptive label for the connection. 658 1.3 yamt * 659 1.3 yamt * we use simple pointer comparison for label equality check. 660 1.3 yamt */ 661 1.2 yamt static void 662 1.2 yamt setlabel(struct Xconn *xc, const char *label) 663 1.2 yamt { 664 1.2 yamt int error; 665 1.2 yamt 666 1.2 yamt /* 667 1.2 yamt * put the label into application_name so that it's shown in 668 1.2 yamt * pg_stat_activity. we are sure that our labels don't need 669 1.2 yamt * PQescapeStringConn. 670 1.2 yamt * 671 1.2 yamt * example: 672 1.2 yamt * SELECT pid,application_name,query FROM pg_stat_activity 673 1.2 yamt * WHERE state <> 'idle' 674 1.2 yamt */ 675 1.2 yamt 676 1.3 yamt if (label != NULL && label != xc->label) { 677 1.2 yamt struct cmd *c; 678 1.2 yamt char cmd_str[1024]; 679 1.2 yamt 680 1.2 yamt snprintf(cmd_str, sizeof(cmd_str), 681 1.2 yamt "SET application_name TO 'pgfs: %s'", label); 682 1.2 yamt c = createcmd(cmd_str, CMD_NOPREPARE); 683 1.2 yamt error = simplecmd(xc, c); 684 1.2 yamt freecmd(c); 685 1.2 yamt assert(error == 0); 686 1.3 yamt xc->label = label; 687 1.2 yamt } else { 688 1.2 yamt #if 0 /* don't bother to clear label */ 689 1.2 yamt static struct cmd *c; 690 1.2 yamt 691 1.2 yamt CREATECMD_NOPARAM(c, "SET application_name TO 'pgfs'"); 692 1.2 yamt error = simplecmd(xc, c); 693 1.2 yamt assert(error == 0); 694 1.2 yamt #endif 695 1.2 yamt } 696 1.2 yamt } 697 1.2 yamt 698 1.1 yamt struct Xconn * 699 1.2 yamt begin(struct puffs_usermount *pu, const char *label) 700 1.1 yamt { 701 1.1 yamt struct Xconn *xc = getxc(puffs_cc_getcc(pu)); 702 1.1 yamt static struct cmd *c; 703 1.1 yamt int error; 704 1.1 yamt 705 1.2 yamt setlabel(xc, label); 706 1.1 yamt CREATECMD_NOPARAM(c, "BEGIN"); 707 1.1 yamt assert(!xc->in_trans); 708 1.1 yamt error = simplecmd(xc, c); 709 1.1 yamt assert(error == 0); 710 1.1 yamt assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS); 711 1.1 yamt xc->in_trans = true; 712 1.1 yamt return xc; 713 1.1 yamt } 714 1.1 yamt 715 1.1 yamt struct Xconn * 716 1.2 yamt begin_readonly(struct puffs_usermount *pu, const char *label) 717 1.1 yamt { 718 1.1 yamt struct Xconn *xc = getxc(puffs_cc_getcc(pu)); 719 1.1 yamt static struct cmd *c; 720 1.1 yamt int error; 721 1.1 yamt 722 1.2 yamt setlabel(xc, label); 723 1.1 yamt CREATECMD_NOPARAM(c, "BEGIN READ ONLY"); 724 1.1 yamt assert(!xc->in_trans); 725 1.1 yamt error = simplecmd(xc, c); 726 1.1 yamt assert(error == 0); 727 1.1 yamt assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS); 728 1.1 yamt xc->in_trans = true; 729 1.1 yamt return xc; 730 1.1 yamt } 731 1.1 yamt 732 1.1 yamt void 733 1.1 yamt rollback(struct Xconn *xc) 734 1.1 yamt { 735 1.1 yamt PGTransactionStatusType status; 736 1.1 yamt 737 1.1 yamt /* 738 1.1 yamt * check the status as we are not sure the status of our transaction 739 1.1 yamt * after a failed commit. 740 1.1 yamt */ 741 1.1 yamt status = PQtransactionStatus(xc->conn); 742 1.1 yamt assert(status != PQTRANS_ACTIVE); 743 1.1 yamt assert(status != PQTRANS_UNKNOWN); 744 1.1 yamt if (status != PQTRANS_IDLE) { 745 1.1 yamt static struct cmd *c; 746 1.1 yamt int error; 747 1.1 yamt 748 1.1 yamt assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR); 749 1.1 yamt CREATECMD_NOPARAM(c, "ROLLBACK"); 750 1.1 yamt error = simplecmd(xc, c); 751 1.1 yamt assert(error == 0); 752 1.1 yamt } 753 1.1 yamt DPRINTF("xc %p rollback %p\n", xc, xc->owner); 754 1.2 yamt setlabel(xc, NULL); 755 1.1 yamt relxc(xc); 756 1.1 yamt } 757 1.1 yamt 758 1.1 yamt int 759 1.1 yamt commit(struct Xconn *xc) 760 1.1 yamt { 761 1.1 yamt static struct cmd *c; 762 1.1 yamt int error; 763 1.1 yamt 764 1.1 yamt CREATECMD_NOPARAM(c, "COMMIT"); 765 1.1 yamt error = simplecmd(xc, c); 766 1.2 yamt setlabel(xc, NULL); 767 1.1 yamt if (error == 0) { 768 1.1 yamt DPRINTF("xc %p commit %p\n", xc, xc->owner); 769 1.1 yamt relxc(xc); 770 1.1 yamt } 771 1.1 yamt return error; 772 1.1 yamt } 773 1.1 yamt 774 1.1 yamt int 775 1.1 yamt commit_sync(struct Xconn *xc) 776 1.1 yamt { 777 1.1 yamt static struct cmd *c; 778 1.1 yamt int error; 779 1.1 yamt 780 1.1 yamt assert(!pgfs_dosync); 781 1.1 yamt CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON"); 782 1.1 yamt error = simplecmd(xc, c); 783 1.1 yamt assert(error == 0); 784 1.1 yamt return commit(xc); 785 1.1 yamt } 786 1.1 yamt 787 1.1 yamt static void 788 1.1 yamt pgfs_notice_receiver(void *vp, const PGresult *res) 789 1.1 yamt { 790 1.1 yamt struct Xconn *xc = vp; 791 1.1 yamt 792 1.1 yamt assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR); 793 1.1 yamt fprintf(stderr, "got a notice on %p\n", xc); 794 1.1 yamt dumperror(xc, res); 795 1.1 yamt } 796 1.1 yamt 797 1.1 yamt static int 798 1.1 yamt pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf, 799 1.1 yamt int fd, int *done) 800 1.1 yamt { 801 1.1 yamt struct Xconn *xc; 802 1.1 yamt PGconn *conn; 803 1.1 yamt 804 1.1 yamt TAILQ_FOREACH(xc, &xclist, list) { 805 1.1 yamt if (PQsocket(xc->conn) == fd) { 806 1.1 yamt break; 807 1.1 yamt } 808 1.1 yamt } 809 1.1 yamt assert(xc != NULL); 810 1.1 yamt conn = xc->conn; 811 1.1 yamt PQconsumeInput(conn); 812 1.1 yamt if (!PQisBusy(conn)) { 813 1.1 yamt if (xc->blocker != NULL) { 814 1.1 yamt DPRINTF("schedule %p\n", xc->blocker); 815 1.1 yamt puffs_cc_schedule(xc->blocker); 816 1.1 yamt } else { 817 1.1 yamt DPRINTF("no blockers\n"); 818 1.1 yamt } 819 1.1 yamt } 820 1.1 yamt *done = 0; 821 1.1 yamt return 0; 822 1.1 yamt } 823 1.1 yamt 824 1.1 yamt int 825 1.1 yamt pgfs_connectdb(struct puffs_usermount *pu, const char *dbname, 826 1.1 yamt const char *dbuser, bool debug, bool synchronous, unsigned int nconn) 827 1.1 yamt { 828 1.1 yamt const char *keywords[3+1]; 829 1.1 yamt const char *values[3]; 830 1.1 yamt unsigned int i; 831 1.1 yamt 832 1.1 yamt if (nconn > 32) { 833 1.1 yamt /* 834 1.1 yamt * limit from sizeof(cmd->prepared_mask) 835 1.1 yamt */ 836 1.1 yamt return EINVAL; 837 1.1 yamt } 838 1.1 yamt if (debug) { 839 1.1 yamt pgfs_dodprintf = true; 840 1.1 yamt } 841 1.1 yamt if (synchronous) { 842 1.1 yamt pgfs_dosync = true; 843 1.1 yamt } 844 1.1 yamt i = 0; 845 1.1 yamt if (dbname != NULL) { 846 1.1 yamt keywords[i] = "dbname"; 847 1.1 yamt values[i] = dbname; 848 1.1 yamt i++; 849 1.1 yamt } 850 1.1 yamt if (dbuser != NULL) { 851 1.1 yamt keywords[i] = "user"; 852 1.1 yamt values[i] = dbuser; 853 1.1 yamt i++; 854 1.1 yamt } 855 1.1 yamt keywords[i] = "application_name"; 856 1.1 yamt values[i] = "pgfs"; 857 1.1 yamt i++; 858 1.1 yamt keywords[i] = NULL; 859 1.1 yamt puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL); 860 1.1 yamt for (i = 0; i < nconn; i++) { 861 1.1 yamt struct Xconn *xc; 862 1.1 yamt struct Xconn *xc2; 863 1.1 yamt static int xcid; 864 1.1 yamt PGconn *conn; 865 1.1 yamt struct cmd *c; 866 1.1 yamt int error; 867 1.1 yamt 868 1.1 yamt conn = PQconnectdbParams(keywords, values, 0); 869 1.1 yamt if (conn == NULL) { 870 1.1 yamt errx(EXIT_FAILURE, 871 1.1 yamt "PQconnectdbParams: unknown failure"); 872 1.1 yamt } 873 1.1 yamt if (PQstatus(conn) != CONNECTION_OK) { 874 1.1 yamt /* 875 1.1 yamt * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW 876 1.1 yamt */ 877 1.1 yamt errx(EXIT_FAILURE, "PQconnectdbParams: %s", 878 1.1 yamt PQerrorMessage(conn)); 879 1.1 yamt } 880 1.1 yamt DPRINTF("protocol version %d\n", PQprotocolVersion(conn)); 881 1.1 yamt puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ); 882 1.1 yamt xc = emalloc(sizeof(*xc)); 883 1.1 yamt xc->conn = conn; 884 1.1 yamt xc->blocker = NULL; 885 1.1 yamt xc->owner = NULL; 886 1.1 yamt xc->in_trans = false; 887 1.1 yamt xc->id = xcid++; 888 1.3 yamt xc->label = NULL; 889 1.1 yamt assert(xc->id < 32); 890 1.1 yamt PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc); 891 1.1 yamt TAILQ_INSERT_HEAD(&xclist, xc, list); 892 1.2 yamt xc2 = begin(pu, NULL); 893 1.1 yamt assert(xc2 == xc); 894 1.1 yamt c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE); 895 1.1 yamt error = simplecmd(xc, c); 896 1.1 yamt assert(error == 0); 897 1.1 yamt freecmd(c); 898 1.1 yamt c = createcmd("SET SESSION CHARACTERISTICS AS " 899 1.1 yamt "TRANSACTION ISOLATION LEVEL REPEATABLE READ", 900 1.1 yamt CMD_NOPREPARE); 901 1.1 yamt error = simplecmd(xc, c); 902 1.1 yamt assert(error == 0); 903 1.1 yamt freecmd(c); 904 1.1 yamt c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE); 905 1.1 yamt error = simplecmd(xc, c); 906 1.1 yamt assert(error == 0); 907 1.1 yamt freecmd(c); 908 1.1 yamt if (!pgfs_dosync) { 909 1.1 yamt c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF", 910 1.1 yamt CMD_NOPREPARE); 911 1.1 yamt error = simplecmd(xc, c); 912 1.1 yamt assert(error == 0); 913 1.1 yamt freecmd(c); 914 1.1 yamt } 915 1.1 yamt if (debug) { 916 1.1 yamt struct fetchstatus s; 917 1.1 yamt static const Oid types[] = { INT8OID, }; 918 1.1 yamt uint64_t pid; 919 1.1 yamt 920 1.1 yamt c = createcmd("SELECT pg_backend_pid()::int8;", 921 1.1 yamt CMD_NOPREPARE); 922 1.1 yamt error = sendcmd(xc, c); 923 1.1 yamt assert(error == 0); 924 1.1 yamt fetchinit(&s, xc); 925 1.1 yamt error = FETCHNEXT(&s, types, &pid); 926 1.1 yamt fetchdone(&s); 927 1.1 yamt assert(error == 0); 928 1.1 yamt DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid); 929 1.1 yamt } 930 1.1 yamt error = commit(xc); 931 1.1 yamt assert(error == 0); 932 1.1 yamt assert(xc->owner == NULL); 933 1.1 yamt } 934 1.1 yamt /* 935 1.1 yamt * XXX cleanup unlinked files here? what to do when the filesystem 936 1.1 yamt * is shared? 937 1.1 yamt */ 938 1.1 yamt return 0; 939 1.1 yamt } 940 1.1 yamt 941 1.1 yamt struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq); 942 1.1 yamt struct puffs_cc *flusher = NULL; 943 1.1 yamt 944 1.1 yamt int 945 1.1 yamt flush_xacts(struct puffs_usermount *pu) 946 1.1 yamt { 947 1.1 yamt struct puffs_cc *cc = puffs_cc_getcc(pu); 948 1.1 yamt struct Xconn *xc; 949 1.1 yamt static struct cmd *c; 950 1.1 yamt uint64_t dummy; 951 1.1 yamt int error; 952 1.1 yamt 953 1.1 yamt /* 954 1.1 yamt * flush all previously issued asynchronous transactions. 955 1.1 yamt * 956 1.1 yamt * XXX 957 1.1 yamt * unfortunately it seems that there is no clean way to tell 958 1.1 yamt * PostgreSQL flush XLOG. we could perform a CHECKPOINT but it's 959 1.1 yamt * too expensive and overkill for our purpose. 960 1.1 yamt * besides, PostgreSQL has an optimization to skip XLOG flushing 961 1.1 yamt * for transactions which didn't produce WAL records. 962 1.1 yamt * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2) 963 1.1 yamt * it means that an empty transaction ("BEGIN; COMMIT;"), which 964 1.1 yamt * doesn't produce any WAL records, doesn't flush the XLOG even if 965 1.1 yamt * synchronous_commit=on. we issues a dummy setval() to avoid the 966 1.1 yamt * optimization. 967 1.1 yamt * on the other hand, we try to avoid creating unnecessary WAL activity 968 1.1 yamt * by serializing flushing and checking XLOG locations. 969 1.1 yamt */ 970 1.1 yamt 971 1.1 yamt assert(!pgfs_dosync); 972 1.1 yamt if (flusher != NULL) { /* serialize flushers */ 973 1.1 yamt DPRINTF("%p flush in progress %p\n", cc, flusher); 974 1.1 yamt waiton(&flushwaitq, cc); 975 1.1 yamt assert(flusher == NULL); 976 1.1 yamt } 977 1.1 yamt DPRINTF("%p start flushing\n", cc); 978 1.1 yamt flusher = cc; 979 1.1 yamt retry: 980 1.2 yamt xc = begin(pu, "flush"); 981 1.1 yamt CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE " 982 1.1 yamt "pg_current_xlog_insert_location() <> pg_current_xlog_location()"); 983 1.1 yamt error = sendcmd(xc, c); 984 1.1 yamt if (error != 0) { 985 1.1 yamt goto got_error; 986 1.1 yamt } 987 1.1 yamt error = simplefetch(xc, INT8OID, &dummy); 988 1.1 yamt assert(error != 0 || dummy == 1); 989 1.1 yamt if (error == ENOENT) { 990 1.1 yamt /* 991 1.1 yamt * there seems to be nothing to flush. 992 1.1 yamt */ 993 1.1 yamt DPRINTF("%p no sync\n", cc); 994 1.1 yamt error = 0; 995 1.1 yamt } 996 1.1 yamt if (error != 0) { 997 1.1 yamt goto got_error; 998 1.1 yamt } 999 1.1 yamt error = commit_sync(xc); 1000 1.1 yamt if (error != 0) { 1001 1.1 yamt goto got_error; 1002 1.1 yamt } 1003 1.1 yamt goto done; 1004 1.1 yamt got_error: 1005 1.1 yamt rollback(xc); 1006 1.1 yamt if (error == EAGAIN) { 1007 1.1 yamt goto retry; 1008 1.1 yamt } 1009 1.1 yamt done: 1010 1.1 yamt assert(flusher == cc); 1011 1.1 yamt flusher = NULL; 1012 1.1 yamt wakeup_one(&flushwaitq); 1013 1.1 yamt DPRINTF("%p end flushing error=%d\n", cc, error); 1014 1.1 yamt return error; 1015 1.1 yamt } 1016