pgfs_db.c revision 1.2 1 1.2 yamt /* $NetBSD: pgfs_db.c,v 1.2 2012/04/11 14:26:44 yamt Exp $ */
2 1.1 yamt
3 1.1 yamt /*-
4 1.1 yamt * Copyright (c)2010,2011 YAMAMOTO Takashi,
5 1.1 yamt * All rights reserved.
6 1.1 yamt *
7 1.1 yamt * Redistribution and use in source and binary forms, with or without
8 1.1 yamt * modification, are permitted provided that the following conditions
9 1.1 yamt * are met:
10 1.1 yamt * 1. Redistributions of source code must retain the above copyright
11 1.1 yamt * notice, this list of conditions and the following disclaimer.
12 1.1 yamt * 2. Redistributions in binary form must reproduce the above copyright
13 1.1 yamt * notice, this list of conditions and the following disclaimer in the
14 1.1 yamt * documentation and/or other materials provided with the distribution.
15 1.1 yamt *
16 1.1 yamt * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 1.1 yamt * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 1.1 yamt * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 1.1 yamt * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20 1.1 yamt * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 1.1 yamt * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 1.1 yamt * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 1.1 yamt * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 1.1 yamt * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 1.1 yamt * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 1.1 yamt * SUCH DAMAGE.
27 1.1 yamt */
28 1.1 yamt
29 1.1 yamt /*
30 1.1 yamt * backend db operations
31 1.1 yamt */
32 1.1 yamt
33 1.1 yamt #include <sys/cdefs.h>
34 1.1 yamt #ifndef lint
35 1.2 yamt __RCSID("$NetBSD: pgfs_db.c,v 1.2 2012/04/11 14:26:44 yamt Exp $");
36 1.1 yamt #endif /* not lint */
37 1.1 yamt
38 1.1 yamt #include <assert.h>
39 1.1 yamt #include <err.h>
40 1.1 yamt #include <errno.h>
41 1.1 yamt #include <inttypes.h>
42 1.1 yamt #include <puffs.h>
43 1.1 yamt #include <stdbool.h>
44 1.1 yamt #include <stdarg.h>
45 1.1 yamt #include <stdio.h>
46 1.1 yamt #include <stdlib.h>
47 1.1 yamt #include <util.h>
48 1.1 yamt
49 1.1 yamt #include <libpq-fe.h>
50 1.1 yamt
51 1.1 yamt #include "pgfs_db.h"
52 1.1 yamt #include "pgfs_waitq.h"
53 1.1 yamt #include "pgfs_debug.h"
54 1.1 yamt
/*
 * pgfs_dosync: set to true by pgfs_connectdb() when synchronous operation
 * was requested.  while it is false, each connection is switched to
 * SYNCHRONOUS_COMMIT OFF and commit_sync()/flush_xacts() are the ways to
 * force a flush.
 */
bool pgfs_dosync = false;
56 1.1 yamt
57 1.1 yamt struct Xconn {
58 1.1 yamt TAILQ_ENTRY(Xconn) list;
59 1.1 yamt PGconn *conn;
60 1.1 yamt struct puffs_cc *blocker;
61 1.1 yamt struct puffs_cc *owner;
62 1.1 yamt bool in_trans;
63 1.1 yamt int id;
64 1.1 yamt };
65 1.1 yamt
66 1.1 yamt static void
67 1.1 yamt dumperror(struct Xconn *xc, const PGresult *res)
68 1.1 yamt {
69 1.1 yamt static const struct {
70 1.1 yamt const char *name;
71 1.1 yamt int code;
72 1.1 yamt } fields[] = {
73 1.1 yamt #define F(x) { .name = #x, .code = x, }
74 1.1 yamt F(PG_DIAG_SEVERITY),
75 1.1 yamt F(PG_DIAG_SQLSTATE),
76 1.1 yamt F(PG_DIAG_MESSAGE_PRIMARY),
77 1.1 yamt F(PG_DIAG_MESSAGE_DETAIL),
78 1.1 yamt F(PG_DIAG_MESSAGE_HINT),
79 1.1 yamt F(PG_DIAG_STATEMENT_POSITION),
80 1.1 yamt F(PG_DIAG_INTERNAL_POSITION),
81 1.1 yamt F(PG_DIAG_INTERNAL_QUERY),
82 1.1 yamt F(PG_DIAG_CONTEXT),
83 1.1 yamt F(PG_DIAG_SOURCE_FILE),
84 1.1 yamt F(PG_DIAG_SOURCE_LINE),
85 1.1 yamt F(PG_DIAG_SOURCE_FUNCTION),
86 1.1 yamt #undef F
87 1.1 yamt };
88 1.1 yamt unsigned int i;
89 1.1 yamt
90 1.1 yamt if (!pgfs_dodprintf) {
91 1.1 yamt return;
92 1.1 yamt }
93 1.1 yamt assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR ||
94 1.1 yamt PQresultStatus(res) == PGRES_FATAL_ERROR);
95 1.1 yamt for (i = 0; i < __arraycount(fields); i++) {
96 1.1 yamt const char *val = PQresultErrorField(res, fields[i].code);
97 1.1 yamt
98 1.1 yamt if (val == NULL) {
99 1.1 yamt continue;
100 1.1 yamt }
101 1.1 yamt fprintf(stderr, "%s: %s\n", fields[i].name, val);
102 1.1 yamt }
103 1.1 yamt }
104 1.1 yamt
105 1.1 yamt TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist);
106 1.1 yamt struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq);
107 1.1 yamt
108 1.1 yamt static struct Xconn *
109 1.1 yamt getxc(struct puffs_cc *cc)
110 1.1 yamt {
111 1.1 yamt struct Xconn *xc;
112 1.1 yamt
113 1.1 yamt assert(cc != NULL);
114 1.1 yamt retry:
115 1.1 yamt TAILQ_FOREACH(xc, &xclist, list) {
116 1.1 yamt if (xc->blocker == NULL) {
117 1.1 yamt assert(xc->owner == NULL);
118 1.1 yamt xc->owner = cc;
119 1.1 yamt DPRINTF("xc %p acquire %p\n", xc, cc);
120 1.1 yamt return xc;
121 1.1 yamt } else {
122 1.1 yamt assert(xc->owner == xc->blocker);
123 1.1 yamt }
124 1.1 yamt }
125 1.1 yamt DPRINTF("no free conn %p\n", cc);
126 1.1 yamt waiton(&xcwaitq, cc);
127 1.1 yamt goto retry;
128 1.1 yamt }
129 1.1 yamt
130 1.1 yamt static void
131 1.1 yamt relxc(struct Xconn *xc)
132 1.1 yamt {
133 1.1 yamt
134 1.1 yamt assert(xc->in_trans);
135 1.1 yamt assert(xc->owner != NULL);
136 1.1 yamt xc->in_trans = false;
137 1.1 yamt xc->owner = NULL;
138 1.1 yamt wakeup_one(&xcwaitq);
139 1.1 yamt }
140 1.1 yamt
141 1.1 yamt static void
142 1.1 yamt pqwait(struct Xconn *xc)
143 1.1 yamt {
144 1.1 yamt PGconn *conn = xc->conn;
145 1.1 yamt struct puffs_cc *cc = xc->owner;
146 1.1 yamt
147 1.1 yamt if (PQflush(conn)) {
148 1.1 yamt errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn));
149 1.1 yamt }
150 1.1 yamt if (!PQisBusy(conn)) {
151 1.1 yamt return;
152 1.1 yamt }
153 1.1 yamt assert(xc->blocker == NULL);
154 1.1 yamt xc->blocker = cc;
155 1.1 yamt DPRINTF("yielding %p\n", cc);
156 1.1 yamt /* XXX is it safe to yield before entering mainloop? */
157 1.1 yamt puffs_cc_yield(cc);
158 1.1 yamt DPRINTF("yield returned %p\n", cc);
159 1.1 yamt assert(xc->owner == cc);
160 1.1 yamt assert(xc->blocker == cc);
161 1.1 yamt xc->blocker = NULL;
162 1.1 yamt }
163 1.1 yamt
/*
 * sqltoerrno: translate a five-character SQLSTATE code into an errno value.
 *
 * sqlstate may be NULL: PQresultErrorField() returns NULL when the result
 * does not include the field, which is typical for errors generated
 * inside libpq itself.  a NULL or unrecognized SQLSTATE maps to EIO.
 *
 * returns 0 only for "00000" (successful completion).
 * aborts on a check violation ("23514") as it indicates a bug.
 *
 * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle
 * "tuple concurrently updated" errors for lowrite/lo_truncate.
 *
 * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN?
 */
static int
sqltoerrno(const char *sqlstate)
{
	static const struct {
		char sqlstate[5];	/* exactly 5 chars; not NUL-terminated */
		int error;
	} map[] = {
		{ "00000", 0, },	/* ERRCODE_SUCCESSFUL_COMPLETION */
		{ "02000", ENOENT, },	/* ERRCODE_NO_DATA */
		{ "23505", EEXIST, },	/* ERRCODE_UNIQUE_VIOLATION */
		{ "23514", EINVAL, },	/* ERRCODE_CHECK_VIOLATION */
		{ "40001", EAGAIN, },	/* ERRCODE_T_R_SERIALIZATION_FAILURE */
		{ "40P01", EAGAIN, },	/* ERRCODE_T_R_DEADLOCK_DETECTED */
		{ "42704", ENOENT, },	/* ERRCODE_UNDEFINED_OBJECT */
		{ "53100", ENOSPC, },	/* ERRCODE_DISK_FULL */
		{ "53200", ENOMEM, },	/* ERRCODE_OUT_OF_MEMORY */
		{ "XX000", EAGAIN, },	/* ERRCODE_INTERNAL_ERROR */
	};
	unsigned int i;

	if (sqlstate == NULL) {
		/*
		 * no SQLSTATE attached to the error; don't hand NULL to
		 * memcmp below.
		 */
		DPRINTF("missing sqlstate mapped to EIO\n");
		return EIO;
	}
	for (i = 0; i < __arraycount(map); i++) {
		if (!memcmp(map[i].sqlstate, sqlstate, 5)) {
			const int error = map[i].error;

			if (error != 0) {
				DPRINTF("sqlstate %5s mapped to error %d\n",
				    sqlstate, error);
			}
			if (error == EINVAL) {
				/*
				 * sounds like a bug.
				 */
				abort();
			}
			return error;
		}
	}
	DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate);
	return EIO;
}
210 1.1 yamt
211 1.1 yamt struct cmd {
212 1.1 yamt char name[32]; /* name of prepared statement */
213 1.1 yamt char *cmd; /* query string */
214 1.1 yamt unsigned int nparams;
215 1.1 yamt Oid *paramtypes;
216 1.1 yamt uint32_t prepared_mask; /* for which connections this is prepared? */
217 1.1 yamt unsigned int flags; /* CMD_ flags */
218 1.1 yamt };
219 1.1 yamt
220 1.1 yamt #define CMD_NOPREPARE 1 /* don't prepare this command */
221 1.1 yamt
222 1.1 yamt struct cmd *
223 1.1 yamt createcmd(const char *cmd, unsigned int flags, ...)
224 1.1 yamt {
225 1.1 yamt struct cmd *c;
226 1.1 yamt va_list ap;
227 1.1 yamt const char *cp;
228 1.1 yamt unsigned int i;
229 1.1 yamt static unsigned int cmdid;
230 1.1 yamt
231 1.1 yamt c = emalloc(sizeof(*c));
232 1.1 yamt c->cmd = estrdup(cmd);
233 1.1 yamt c->nparams = 0;
234 1.1 yamt va_start(ap, flags);
235 1.1 yamt for (cp = cmd; *cp != 0; cp++) {
236 1.1 yamt if (*cp == '$') { /* XXX */
237 1.1 yamt c->nparams++;
238 1.1 yamt }
239 1.1 yamt }
240 1.1 yamt c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes));
241 1.1 yamt for (i = 0; i < c->nparams; i++) {
242 1.1 yamt Oid type = va_arg(ap, Oid);
243 1.1 yamt assert(type == BYTEA ||
244 1.1 yamt type == INT4OID || type == INT8OID || type == OIDOID ||
245 1.1 yamt type == TEXTOID || type == TIMESTAMPTZOID);
246 1.1 yamt c->paramtypes[i] = type;
247 1.1 yamt }
248 1.1 yamt va_end(ap);
249 1.1 yamt snprintf(c->name, sizeof(c->name), "%u", cmdid++);
250 1.1 yamt if ((flags & CMD_NOPREPARE) != 0) {
251 1.1 yamt c->prepared_mask = ~0;
252 1.1 yamt } else {
253 1.1 yamt c->prepared_mask = 0;
254 1.1 yamt }
255 1.1 yamt c->flags = flags;
256 1.1 yamt return c;
257 1.1 yamt }
258 1.1 yamt
259 1.1 yamt static void
260 1.1 yamt freecmd(struct cmd *c)
261 1.1 yamt {
262 1.1 yamt
263 1.1 yamt free(c->paramtypes);
264 1.1 yamt free(c->cmd);
265 1.1 yamt free(c);
266 1.1 yamt }
267 1.1 yamt
268 1.1 yamt static int
269 1.1 yamt fetch_noresult(struct Xconn *xc)
270 1.1 yamt {
271 1.1 yamt PGresult *res;
272 1.1 yamt ExecStatusType status;
273 1.1 yamt PGconn *conn = xc->conn;
274 1.1 yamt int error;
275 1.1 yamt
276 1.1 yamt pqwait(xc);
277 1.1 yamt res = PQgetResult(conn);
278 1.1 yamt if (res == NULL) {
279 1.1 yamt return ENOENT;
280 1.1 yamt }
281 1.1 yamt status = PQresultStatus(res);
282 1.1 yamt if (status == PGRES_COMMAND_OK) {
283 1.1 yamt assert(PQnfields(res) == 0);
284 1.1 yamt assert(PQntuples(res) == 0);
285 1.1 yamt if (!strcmp(PQcmdTuples(res), "0")) {
286 1.1 yamt error = ENOENT;
287 1.1 yamt } else {
288 1.1 yamt error = 0;
289 1.1 yamt }
290 1.1 yamt } else if (status == PGRES_FATAL_ERROR) {
291 1.1 yamt error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE));
292 1.1 yamt assert(error != 0);
293 1.1 yamt dumperror(xc, res);
294 1.1 yamt } else {
295 1.1 yamt errx(1, "%s not command_ok: %d: %s", __func__,
296 1.1 yamt (int)status,
297 1.1 yamt PQerrorMessage(conn));
298 1.1 yamt }
299 1.1 yamt PQclear(res);
300 1.1 yamt res = PQgetResult(conn);
301 1.1 yamt assert(res == NULL);
302 1.1 yamt if (error != 0) {
303 1.1 yamt DPRINTF("error %d\n", error);
304 1.1 yamt }
305 1.1 yamt return error;
306 1.1 yamt }
307 1.1 yamt
308 1.1 yamt static int
309 1.1 yamt preparecmd(struct Xconn *xc, struct cmd *c)
310 1.1 yamt {
311 1.1 yamt PGconn *conn = xc->conn;
312 1.1 yamt const uint32_t mask = 1 << xc->id;
313 1.1 yamt int error;
314 1.1 yamt int ret;
315 1.1 yamt
316 1.1 yamt if ((c->prepared_mask & mask) != 0) {
317 1.1 yamt return 0;
318 1.1 yamt }
319 1.1 yamt assert((c->flags & CMD_NOPREPARE) == 0);
320 1.1 yamt DPRINTF("PREPARE: '%s'\n", c->cmd);
321 1.1 yamt ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes);
322 1.1 yamt if (!ret) {
323 1.1 yamt errx(EXIT_FAILURE, "PQsendPrepare: %s",
324 1.1 yamt PQerrorMessage(conn));
325 1.1 yamt }
326 1.1 yamt error = fetch_noresult(xc);
327 1.1 yamt if (error != 0) {
328 1.1 yamt return error;
329 1.1 yamt }
330 1.1 yamt c->prepared_mask |= mask;
331 1.1 yamt return 0;
332 1.1 yamt }
333 1.1 yamt
334 1.1 yamt /*
335 1.1 yamt * vsendcmd:
336 1.1 yamt *
337 1.1 yamt * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared.
338 1.1 yamt * 0 for text and 1 for binary.
339 1.1 yamt */
340 1.1 yamt
341 1.1 yamt static int
342 1.1 yamt vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap)
343 1.1 yamt {
344 1.1 yamt PGconn *conn = xc->conn;
345 1.1 yamt char **paramvalues;
346 1.1 yamt int *paramlengths;
347 1.1 yamt int *paramformats;
348 1.1 yamt unsigned int i;
349 1.1 yamt int error;
350 1.1 yamt int ret;
351 1.1 yamt
352 1.1 yamt assert(xc->owner != NULL);
353 1.1 yamt assert(xc->blocker == NULL);
354 1.1 yamt error = preparecmd(xc, c);
355 1.1 yamt if (error != 0) {
356 1.1 yamt return error;
357 1.1 yamt }
358 1.1 yamt paramvalues = emalloc(c->nparams * sizeof(*paramvalues));
359 1.1 yamt paramlengths = NULL;
360 1.1 yamt paramformats = NULL;
361 1.1 yamt DPRINTF("CMD: '%s'\n", c->cmd);
362 1.1 yamt for (i = 0; i < c->nparams; i++) {
363 1.1 yamt Oid type = c->paramtypes[i];
364 1.1 yamt char tmpstore[1024];
365 1.1 yamt const char *buf = NULL;
366 1.1 yamt intmax_t v = 0; /* XXXgcc */
367 1.1 yamt int sz;
368 1.1 yamt bool binary = false;
369 1.1 yamt
370 1.1 yamt switch (type) {
371 1.1 yamt case BYTEA:
372 1.1 yamt buf = va_arg(ap, const void *);
373 1.1 yamt sz = (int)va_arg(ap, size_t);
374 1.1 yamt binary = true;
375 1.1 yamt break;
376 1.1 yamt case INT8OID:
377 1.1 yamt case OIDOID:
378 1.1 yamt case INT4OID:
379 1.1 yamt switch (type) {
380 1.1 yamt case INT8OID:
381 1.1 yamt v = (intmax_t)va_arg(ap, int64_t);
382 1.1 yamt break;
383 1.1 yamt case OIDOID:
384 1.1 yamt v = (intmax_t)va_arg(ap, Oid);
385 1.1 yamt break;
386 1.1 yamt case INT4OID:
387 1.1 yamt v = (intmax_t)va_arg(ap, int32_t);
388 1.1 yamt break;
389 1.1 yamt default:
390 1.1 yamt errx(EXIT_FAILURE, "unknown integer oid %u",
391 1.1 yamt type);
392 1.1 yamt }
393 1.1 yamt buf = tmpstore;
394 1.1 yamt sz = snprintf(tmpstore, sizeof(tmpstore),
395 1.1 yamt "%jd", v);
396 1.1 yamt assert(sz != -1);
397 1.1 yamt assert((size_t)sz < sizeof(tmpstore));
398 1.1 yamt sz += 1;
399 1.1 yamt break;
400 1.1 yamt case TEXTOID:
401 1.1 yamt case TIMESTAMPTZOID:
402 1.1 yamt buf = va_arg(ap, char *);
403 1.1 yamt sz = strlen(buf) + 1;
404 1.1 yamt break;
405 1.1 yamt default:
406 1.1 yamt errx(EXIT_FAILURE, "%s: unknown param type %u",
407 1.1 yamt __func__, type);
408 1.1 yamt }
409 1.1 yamt if (binary) {
410 1.1 yamt if (paramlengths == NULL) {
411 1.1 yamt paramlengths =
412 1.1 yamt emalloc(c->nparams * sizeof(*paramformats));
413 1.1 yamt }
414 1.1 yamt if (paramformats == NULL) {
415 1.1 yamt paramformats = ecalloc(1,
416 1.1 yamt c->nparams * sizeof(*paramformats));
417 1.1 yamt }
418 1.1 yamt paramformats[i] = 1;
419 1.1 yamt paramlengths[i] = sz;
420 1.1 yamt }
421 1.1 yamt paramvalues[i] = emalloc(sz);
422 1.1 yamt memcpy(paramvalues[i], buf, sz);
423 1.1 yamt if (binary) {
424 1.1 yamt DPRINTF("\t[%u]=<BINARY>\n", i);
425 1.1 yamt } else {
426 1.1 yamt DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]);
427 1.1 yamt }
428 1.1 yamt }
429 1.1 yamt if ((c->flags & CMD_NOPREPARE) != 0) {
430 1.1 yamt ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes,
431 1.1 yamt (const char * const *)paramvalues, paramlengths,
432 1.1 yamt paramformats, resultmode);
433 1.1 yamt } else {
434 1.1 yamt ret = PQsendQueryPrepared(conn, c->name, c->nparams,
435 1.1 yamt (const char * const *)paramvalues, paramlengths,
436 1.1 yamt paramformats, resultmode);
437 1.1 yamt }
438 1.1 yamt for (i = 0; i < c->nparams; i++) {
439 1.1 yamt free(paramvalues[i]);
440 1.1 yamt }
441 1.1 yamt free(paramvalues);
442 1.1 yamt free(paramlengths);
443 1.1 yamt free(paramformats);
444 1.1 yamt if (!ret) {
445 1.1 yamt errx(EXIT_FAILURE, "PQsendQueryPrepared: %s",
446 1.1 yamt PQerrorMessage(conn));
447 1.1 yamt }
448 1.1 yamt return 0;
449 1.1 yamt }
450 1.1 yamt
/*
 * sendcmd: send a command asking for results in text format (resultmode 0).
 * the variable arguments are the command parameters; see vsendcmd.
 *
 * returns 0 or an errno value.  the result is left to be fetched.
 */
int
sendcmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list params;
	int rv;

	va_start(params, c);
	rv = vsendcmd(xc, 0, c, params);
	va_end(params);
	return rv;
}
462 1.1 yamt
/*
 * sendcmdx: same as sendcmd but with an explicit resultmode
 * (0 for text and 1 for binary).
 *
 * returns 0 or an errno value.  the result is left to be fetched.
 */
int
sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...)
{
	va_list params;
	int rv;

	va_start(params, c);
	rv = vsendcmd(xc, resultmode, c, params);
	va_end(params);
	return rv;
}
474 1.1 yamt
/*
 * simplecmd: a convenient routine to execute a command which returns
 * no rows synchronously.
 *
 * sends the command in text result mode and, if that succeeded, consumes
 * its result with fetch_noresult().  returns 0 or an errno value.
 */

int
simplecmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list params;
	int rv;

	va_start(params, c);
	rv = vsendcmd(xc, 0, c, params);
	va_end(params);
	if (rv == 0) {
		rv = fetch_noresult(xc);
	}
	return rv;
}
494 1.1 yamt
495 1.1 yamt void
496 1.1 yamt fetchinit(struct fetchstatus *s, struct Xconn *xc)
497 1.1 yamt {
498 1.1 yamt s->xc = xc;
499 1.1 yamt s->res = NULL;
500 1.1 yamt s->cur = 0;
501 1.1 yamt s->nrows = 0;
502 1.1 yamt s->done = false;
503 1.1 yamt }
504 1.1 yamt
/*
 * getint: convert a decimal string to an integer.
 *
 * the string must be non-empty, consist entirely of a valid number and
 * fit in intmax_t; anything else is an assertion failure.
 */
static intmax_t
getint(const char *str)
{
	char *end;
	intmax_t value;

	errno = 0;	/* so that a range error can be detected */
	value = strtoimax(str, &end, 10);
	assert(errno == 0);
	assert(*str != '\0');
	assert(*end == '\0');
	return value;
}
518 1.1 yamt
519 1.1 yamt static int
520 1.1 yamt vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap)
521 1.1 yamt {
522 1.1 yamt PGconn *conn = s->xc->conn;
523 1.1 yamt unsigned int i;
524 1.1 yamt
525 1.1 yamt assert(conn != NULL);
526 1.1 yamt if (s->res == NULL) {
527 1.1 yamt ExecStatusType status;
528 1.1 yamt int error;
529 1.1 yamt
530 1.1 yamt pqwait(s->xc);
531 1.1 yamt s->res = PQgetResult(conn);
532 1.1 yamt if (s->res == NULL) {
533 1.1 yamt s->done = true;
534 1.1 yamt return ENOENT;
535 1.1 yamt }
536 1.1 yamt status = PQresultStatus(s->res);
537 1.1 yamt if (status == PGRES_FATAL_ERROR) {
538 1.1 yamt error = sqltoerrno(
539 1.1 yamt PQresultErrorField(s->res, PG_DIAG_SQLSTATE));
540 1.1 yamt assert(error != 0);
541 1.1 yamt dumperror(s->xc, s->res);
542 1.1 yamt return error;
543 1.1 yamt }
544 1.1 yamt if (status != PGRES_TUPLES_OK) {
545 1.1 yamt errx(1, "not tuples_ok: %s",
546 1.1 yamt PQerrorMessage(conn));
547 1.1 yamt }
548 1.1 yamt assert((unsigned int)PQnfields(s->res) == n);
549 1.1 yamt s->nrows = PQntuples(s->res);
550 1.1 yamt if (s->nrows == 0) {
551 1.1 yamt DPRINTF("no rows\n");
552 1.1 yamt return ENOENT;
553 1.1 yamt }
554 1.1 yamt assert(s->nrows >= 1);
555 1.1 yamt s->cur = 0;
556 1.1 yamt }
557 1.1 yamt for (i = 0; i < n; i++) {
558 1.1 yamt size_t size;
559 1.1 yamt
560 1.1 yamt assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0));
561 1.1 yamt DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n",
562 1.1 yamt i, PQftype(s->res, i), types[i],
563 1.1 yamt PQgetisnull(s->res, s->cur, i) ? "<NULL>" :
564 1.1 yamt PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) :
565 1.1 yamt "<BINARY>");
566 1.1 yamt assert(PQftype(s->res, i) == types[i]);
567 1.1 yamt assert(!PQgetisnull(s->res, s->cur, i));
568 1.1 yamt switch(types[i]) {
569 1.1 yamt case INT8OID:
570 1.1 yamt *va_arg(ap, int64_t *) =
571 1.1 yamt getint(PQgetvalue(s->res, s->cur, i));
572 1.1 yamt break;
573 1.1 yamt case OIDOID:
574 1.1 yamt *va_arg(ap, Oid *) =
575 1.1 yamt getint(PQgetvalue(s->res, s->cur, i));
576 1.1 yamt break;
577 1.1 yamt case INT4OID:
578 1.1 yamt *va_arg(ap, int32_t *) =
579 1.1 yamt getint(PQgetvalue(s->res, s->cur, i));
580 1.1 yamt break;
581 1.1 yamt case TEXTOID:
582 1.1 yamt *va_arg(ap, char **) =
583 1.1 yamt estrdup(PQgetvalue(s->res, s->cur, i));
584 1.1 yamt break;
585 1.1 yamt case BYTEA:
586 1.1 yamt size = PQgetlength(s->res, s->cur, i);
587 1.1 yamt memcpy(va_arg(ap, void *),
588 1.1 yamt PQgetvalue(s->res, s->cur, i), size);
589 1.1 yamt *va_arg(ap, size_t *) = size;
590 1.1 yamt break;
591 1.1 yamt default:
592 1.1 yamt errx(EXIT_FAILURE, "%s unknown type %u", __func__,
593 1.1 yamt types[i]);
594 1.1 yamt }
595 1.1 yamt }
596 1.1 yamt s->cur++;
597 1.1 yamt if (s->cur == s->nrows) {
598 1.1 yamt PQclear(s->res);
599 1.1 yamt s->res = NULL;
600 1.1 yamt }
601 1.1 yamt return 0;
602 1.1 yamt }
603 1.1 yamt
604 1.1 yamt int
605 1.1 yamt fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...)
606 1.1 yamt {
607 1.1 yamt va_list ap;
608 1.1 yamt int error;
609 1.1 yamt
610 1.1 yamt va_start(ap, types);
611 1.1 yamt error = vfetchnext(s, n, types, ap);
612 1.1 yamt va_end(ap);
613 1.1 yamt return error;
614 1.1 yamt }
615 1.1 yamt
616 1.1 yamt void
617 1.1 yamt fetchdone(struct fetchstatus *s)
618 1.1 yamt {
619 1.1 yamt
620 1.1 yamt if (s->res != NULL) {
621 1.1 yamt PQclear(s->res);
622 1.1 yamt s->res = NULL;
623 1.1 yamt }
624 1.1 yamt if (!s->done) {
625 1.1 yamt PGresult *res;
626 1.1 yamt unsigned int n;
627 1.1 yamt
628 1.1 yamt n = 0;
629 1.1 yamt while ((res = PQgetResult(s->xc->conn)) != NULL) {
630 1.1 yamt PQclear(res);
631 1.1 yamt n++;
632 1.1 yamt }
633 1.1 yamt if (n > 0) {
634 1.1 yamt DPRINTF("%u rows dropped\n", n);
635 1.1 yamt }
636 1.1 yamt }
637 1.1 yamt }
638 1.1 yamt
639 1.1 yamt int
640 1.1 yamt simplefetch(struct Xconn *xc, Oid type, ...)
641 1.1 yamt {
642 1.1 yamt struct fetchstatus s;
643 1.1 yamt va_list ap;
644 1.1 yamt int error;
645 1.1 yamt
646 1.1 yamt fetchinit(&s, xc);
647 1.1 yamt va_start(ap, type);
648 1.1 yamt error = vfetchnext(&s, 1, &type, ap);
649 1.1 yamt va_end(ap);
650 1.1 yamt assert(error != 0 || s.res == NULL);
651 1.1 yamt fetchdone(&s);
652 1.1 yamt return error;
653 1.1 yamt }
654 1.1 yamt
655 1.2 yamt static void
656 1.2 yamt setlabel(struct Xconn *xc, const char *label)
657 1.2 yamt {
658 1.2 yamt int error;
659 1.2 yamt
660 1.2 yamt /*
661 1.2 yamt * put the label into application_name so that it's shown in
662 1.2 yamt * pg_stat_activity. we are sure that our labels don't need
663 1.2 yamt * PQescapeStringConn.
664 1.2 yamt *
665 1.2 yamt * example:
666 1.2 yamt * SELECT pid,application_name,query FROM pg_stat_activity
667 1.2 yamt * WHERE state <> 'idle'
668 1.2 yamt */
669 1.2 yamt
670 1.2 yamt if (label != NULL) {
671 1.2 yamt struct cmd *c;
672 1.2 yamt char cmd_str[1024];
673 1.2 yamt
674 1.2 yamt snprintf(cmd_str, sizeof(cmd_str),
675 1.2 yamt "SET application_name TO 'pgfs: %s'", label);
676 1.2 yamt c = createcmd(cmd_str, CMD_NOPREPARE);
677 1.2 yamt error = simplecmd(xc, c);
678 1.2 yamt freecmd(c);
679 1.2 yamt assert(error == 0);
680 1.2 yamt } else {
681 1.2 yamt #if 0 /* don't bother to clear label */
682 1.2 yamt static struct cmd *c;
683 1.2 yamt
684 1.2 yamt CREATECMD_NOPARAM(c, "SET application_name TO 'pgfs'");
685 1.2 yamt error = simplecmd(xc, c);
686 1.2 yamt assert(error == 0);
687 1.2 yamt #endif
688 1.2 yamt }
689 1.2 yamt }
690 1.2 yamt
691 1.1 yamt struct Xconn *
692 1.2 yamt begin(struct puffs_usermount *pu, const char *label)
693 1.1 yamt {
694 1.1 yamt struct Xconn *xc = getxc(puffs_cc_getcc(pu));
695 1.1 yamt static struct cmd *c;
696 1.1 yamt int error;
697 1.1 yamt
698 1.2 yamt setlabel(xc, label);
699 1.1 yamt CREATECMD_NOPARAM(c, "BEGIN");
700 1.1 yamt assert(!xc->in_trans);
701 1.1 yamt error = simplecmd(xc, c);
702 1.1 yamt assert(error == 0);
703 1.1 yamt assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
704 1.1 yamt xc->in_trans = true;
705 1.1 yamt return xc;
706 1.1 yamt }
707 1.1 yamt
708 1.1 yamt struct Xconn *
709 1.2 yamt begin_readonly(struct puffs_usermount *pu, const char *label)
710 1.1 yamt {
711 1.1 yamt struct Xconn *xc = getxc(puffs_cc_getcc(pu));
712 1.1 yamt static struct cmd *c;
713 1.1 yamt int error;
714 1.1 yamt
715 1.2 yamt setlabel(xc, label);
716 1.1 yamt CREATECMD_NOPARAM(c, "BEGIN READ ONLY");
717 1.1 yamt assert(!xc->in_trans);
718 1.1 yamt error = simplecmd(xc, c);
719 1.1 yamt assert(error == 0);
720 1.1 yamt assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
721 1.1 yamt xc->in_trans = true;
722 1.1 yamt return xc;
723 1.1 yamt }
724 1.1 yamt
725 1.1 yamt void
726 1.1 yamt rollback(struct Xconn *xc)
727 1.1 yamt {
728 1.1 yamt PGTransactionStatusType status;
729 1.1 yamt
730 1.1 yamt /*
731 1.1 yamt * check the status as we are not sure the status of our transaction
732 1.1 yamt * after a failed commit.
733 1.1 yamt */
734 1.1 yamt status = PQtransactionStatus(xc->conn);
735 1.1 yamt assert(status != PQTRANS_ACTIVE);
736 1.1 yamt assert(status != PQTRANS_UNKNOWN);
737 1.1 yamt if (status != PQTRANS_IDLE) {
738 1.1 yamt static struct cmd *c;
739 1.1 yamt int error;
740 1.1 yamt
741 1.1 yamt assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR);
742 1.1 yamt CREATECMD_NOPARAM(c, "ROLLBACK");
743 1.1 yamt error = simplecmd(xc, c);
744 1.1 yamt assert(error == 0);
745 1.1 yamt }
746 1.1 yamt DPRINTF("xc %p rollback %p\n", xc, xc->owner);
747 1.2 yamt setlabel(xc, NULL);
748 1.1 yamt relxc(xc);
749 1.1 yamt }
750 1.1 yamt
751 1.1 yamt int
752 1.1 yamt commit(struct Xconn *xc)
753 1.1 yamt {
754 1.1 yamt static struct cmd *c;
755 1.1 yamt int error;
756 1.1 yamt
757 1.1 yamt CREATECMD_NOPARAM(c, "COMMIT");
758 1.1 yamt error = simplecmd(xc, c);
759 1.2 yamt setlabel(xc, NULL);
760 1.1 yamt if (error == 0) {
761 1.1 yamt DPRINTF("xc %p commit %p\n", xc, xc->owner);
762 1.1 yamt relxc(xc);
763 1.1 yamt }
764 1.1 yamt return error;
765 1.1 yamt }
766 1.1 yamt
767 1.1 yamt int
768 1.1 yamt commit_sync(struct Xconn *xc)
769 1.1 yamt {
770 1.1 yamt static struct cmd *c;
771 1.1 yamt int error;
772 1.1 yamt
773 1.1 yamt assert(!pgfs_dosync);
774 1.1 yamt CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON");
775 1.1 yamt error = simplecmd(xc, c);
776 1.1 yamt assert(error == 0);
777 1.1 yamt return commit(xc);
778 1.1 yamt }
779 1.1 yamt
780 1.1 yamt static void
781 1.1 yamt pgfs_notice_receiver(void *vp, const PGresult *res)
782 1.1 yamt {
783 1.1 yamt struct Xconn *xc = vp;
784 1.1 yamt
785 1.1 yamt assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR);
786 1.1 yamt fprintf(stderr, "got a notice on %p\n", xc);
787 1.1 yamt dumperror(xc, res);
788 1.1 yamt }
789 1.1 yamt
790 1.1 yamt static int
791 1.1 yamt pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf,
792 1.1 yamt int fd, int *done)
793 1.1 yamt {
794 1.1 yamt struct Xconn *xc;
795 1.1 yamt PGconn *conn;
796 1.1 yamt
797 1.1 yamt TAILQ_FOREACH(xc, &xclist, list) {
798 1.1 yamt if (PQsocket(xc->conn) == fd) {
799 1.1 yamt break;
800 1.1 yamt }
801 1.1 yamt }
802 1.1 yamt assert(xc != NULL);
803 1.1 yamt conn = xc->conn;
804 1.1 yamt PQconsumeInput(conn);
805 1.1 yamt if (!PQisBusy(conn)) {
806 1.1 yamt if (xc->blocker != NULL) {
807 1.1 yamt DPRINTF("schedule %p\n", xc->blocker);
808 1.1 yamt puffs_cc_schedule(xc->blocker);
809 1.1 yamt } else {
810 1.1 yamt DPRINTF("no blockers\n");
811 1.1 yamt }
812 1.1 yamt }
813 1.1 yamt *done = 0;
814 1.1 yamt return 0;
815 1.1 yamt }
816 1.1 yamt
817 1.1 yamt int
818 1.1 yamt pgfs_connectdb(struct puffs_usermount *pu, const char *dbname,
819 1.1 yamt const char *dbuser, bool debug, bool synchronous, unsigned int nconn)
820 1.1 yamt {
821 1.1 yamt const char *keywords[3+1];
822 1.1 yamt const char *values[3];
823 1.1 yamt unsigned int i;
824 1.1 yamt
825 1.1 yamt if (nconn > 32) {
826 1.1 yamt /*
827 1.1 yamt * limit from sizeof(cmd->prepared_mask)
828 1.1 yamt */
829 1.1 yamt return EINVAL;
830 1.1 yamt }
831 1.1 yamt if (debug) {
832 1.1 yamt pgfs_dodprintf = true;
833 1.1 yamt }
834 1.1 yamt if (synchronous) {
835 1.1 yamt pgfs_dosync = true;
836 1.1 yamt }
837 1.1 yamt i = 0;
838 1.1 yamt if (dbname != NULL) {
839 1.1 yamt keywords[i] = "dbname";
840 1.1 yamt values[i] = dbname;
841 1.1 yamt i++;
842 1.1 yamt }
843 1.1 yamt if (dbuser != NULL) {
844 1.1 yamt keywords[i] = "user";
845 1.1 yamt values[i] = dbuser;
846 1.1 yamt i++;
847 1.1 yamt }
848 1.1 yamt keywords[i] = "application_name";
849 1.1 yamt values[i] = "pgfs";
850 1.1 yamt i++;
851 1.1 yamt keywords[i] = NULL;
852 1.1 yamt puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL);
853 1.1 yamt for (i = 0; i < nconn; i++) {
854 1.1 yamt struct Xconn *xc;
855 1.1 yamt struct Xconn *xc2;
856 1.1 yamt static int xcid;
857 1.1 yamt PGconn *conn;
858 1.1 yamt struct cmd *c;
859 1.1 yamt int error;
860 1.1 yamt
861 1.1 yamt conn = PQconnectdbParams(keywords, values, 0);
862 1.1 yamt if (conn == NULL) {
863 1.1 yamt errx(EXIT_FAILURE,
864 1.1 yamt "PQconnectdbParams: unknown failure");
865 1.1 yamt }
866 1.1 yamt if (PQstatus(conn) != CONNECTION_OK) {
867 1.1 yamt /*
868 1.1 yamt * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW
869 1.1 yamt */
870 1.1 yamt errx(EXIT_FAILURE, "PQconnectdbParams: %s",
871 1.1 yamt PQerrorMessage(conn));
872 1.1 yamt }
873 1.1 yamt DPRINTF("protocol version %d\n", PQprotocolVersion(conn));
874 1.1 yamt puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ);
875 1.1 yamt xc = emalloc(sizeof(*xc));
876 1.1 yamt xc->conn = conn;
877 1.1 yamt xc->blocker = NULL;
878 1.1 yamt xc->owner = NULL;
879 1.1 yamt xc->in_trans = false;
880 1.1 yamt xc->id = xcid++;
881 1.1 yamt assert(xc->id < 32);
882 1.1 yamt PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc);
883 1.1 yamt TAILQ_INSERT_HEAD(&xclist, xc, list);
884 1.2 yamt xc2 = begin(pu, NULL);
885 1.1 yamt assert(xc2 == xc);
886 1.1 yamt c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE);
887 1.1 yamt error = simplecmd(xc, c);
888 1.1 yamt assert(error == 0);
889 1.1 yamt freecmd(c);
890 1.1 yamt c = createcmd("SET SESSION CHARACTERISTICS AS "
891 1.1 yamt "TRANSACTION ISOLATION LEVEL REPEATABLE READ",
892 1.1 yamt CMD_NOPREPARE);
893 1.1 yamt error = simplecmd(xc, c);
894 1.1 yamt assert(error == 0);
895 1.1 yamt freecmd(c);
896 1.1 yamt c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE);
897 1.1 yamt error = simplecmd(xc, c);
898 1.1 yamt assert(error == 0);
899 1.1 yamt freecmd(c);
900 1.1 yamt if (!pgfs_dosync) {
901 1.1 yamt c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF",
902 1.1 yamt CMD_NOPREPARE);
903 1.1 yamt error = simplecmd(xc, c);
904 1.1 yamt assert(error == 0);
905 1.1 yamt freecmd(c);
906 1.1 yamt }
907 1.1 yamt if (debug) {
908 1.1 yamt struct fetchstatus s;
909 1.1 yamt static const Oid types[] = { INT8OID, };
910 1.1 yamt uint64_t pid;
911 1.1 yamt
912 1.1 yamt c = createcmd("SELECT pg_backend_pid()::int8;",
913 1.1 yamt CMD_NOPREPARE);
914 1.1 yamt error = sendcmd(xc, c);
915 1.1 yamt assert(error == 0);
916 1.1 yamt fetchinit(&s, xc);
917 1.1 yamt error = FETCHNEXT(&s, types, &pid);
918 1.1 yamt fetchdone(&s);
919 1.1 yamt assert(error == 0);
920 1.1 yamt DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid);
921 1.1 yamt }
922 1.1 yamt error = commit(xc);
923 1.1 yamt assert(error == 0);
924 1.1 yamt assert(xc->owner == NULL);
925 1.1 yamt }
926 1.1 yamt /*
927 1.1 yamt * XXX cleanup unlinked files here? what to do when the filesystem
928 1.1 yamt * is shared?
929 1.1 yamt */
930 1.1 yamt return 0;
931 1.1 yamt }
932 1.1 yamt
933 1.1 yamt struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq);
934 1.1 yamt struct puffs_cc *flusher = NULL;
935 1.1 yamt
936 1.1 yamt int
937 1.1 yamt flush_xacts(struct puffs_usermount *pu)
938 1.1 yamt {
939 1.1 yamt struct puffs_cc *cc = puffs_cc_getcc(pu);
940 1.1 yamt struct Xconn *xc;
941 1.1 yamt static struct cmd *c;
942 1.1 yamt uint64_t dummy;
943 1.1 yamt int error;
944 1.1 yamt
945 1.1 yamt /*
946 1.1 yamt * flush all previously issued asynchronous transactions.
947 1.1 yamt *
948 1.1 yamt * XXX
949 1.1 yamt * unfortunately it seems that there is no clean way to tell
950 1.1 yamt * PostgreSQL flush XLOG. we could perform a CHECKPOINT but it's
951 1.1 yamt * too expensive and overkill for our purpose.
952 1.1 yamt * besides, PostgreSQL has an optimization to skip XLOG flushing
953 1.1 yamt * for transactions which didn't produce WAL records.
954 1.1 yamt * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2)
955 1.1 yamt * it means that an empty transaction ("BEGIN; COMMIT;"), which
956 1.1 yamt * doesn't produce any WAL records, doesn't flush the XLOG even if
957 1.1 yamt * synchronous_commit=on. we issues a dummy setval() to avoid the
958 1.1 yamt * optimization.
959 1.1 yamt * on the other hand, we try to avoid creating unnecessary WAL activity
960 1.1 yamt * by serializing flushing and checking XLOG locations.
961 1.1 yamt */
962 1.1 yamt
963 1.1 yamt assert(!pgfs_dosync);
964 1.1 yamt if (flusher != NULL) { /* serialize flushers */
965 1.1 yamt DPRINTF("%p flush in progress %p\n", cc, flusher);
966 1.1 yamt waiton(&flushwaitq, cc);
967 1.1 yamt assert(flusher == NULL);
968 1.1 yamt }
969 1.1 yamt DPRINTF("%p start flushing\n", cc);
970 1.1 yamt flusher = cc;
971 1.1 yamt retry:
972 1.2 yamt xc = begin(pu, "flush");
973 1.1 yamt CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE "
974 1.1 yamt "pg_current_xlog_insert_location() <> pg_current_xlog_location()");
975 1.1 yamt error = sendcmd(xc, c);
976 1.1 yamt if (error != 0) {
977 1.1 yamt goto got_error;
978 1.1 yamt }
979 1.1 yamt error = simplefetch(xc, INT8OID, &dummy);
980 1.1 yamt assert(error != 0 || dummy == 1);
981 1.1 yamt if (error == ENOENT) {
982 1.1 yamt /*
983 1.1 yamt * there seems to be nothing to flush.
984 1.1 yamt */
985 1.1 yamt DPRINTF("%p no sync\n", cc);
986 1.1 yamt error = 0;
987 1.1 yamt }
988 1.1 yamt if (error != 0) {
989 1.1 yamt goto got_error;
990 1.1 yamt }
991 1.1 yamt error = commit_sync(xc);
992 1.1 yamt if (error != 0) {
993 1.1 yamt goto got_error;
994 1.1 yamt }
995 1.1 yamt goto done;
996 1.1 yamt got_error:
997 1.1 yamt rollback(xc);
998 1.1 yamt if (error == EAGAIN) {
999 1.1 yamt goto retry;
1000 1.1 yamt }
1001 1.1 yamt done:
1002 1.1 yamt assert(flusher == cc);
1003 1.1 yamt flusher = NULL;
1004 1.1 yamt wakeup_one(&flushwaitq);
1005 1.1 yamt DPRINTF("%p end flushing error=%d\n", cc, error);
1006 1.1 yamt return error;
1007 1.1 yamt }
1008