pgfs_db.c revision 1.1 1 1.1 yamt /* $NetBSD: pgfs_db.c,v 1.1 2011/10/12 01:05:00 yamt Exp $ */
2 1.1 yamt
3 1.1 yamt /*-
4 1.1 yamt * Copyright (c)2010,2011 YAMAMOTO Takashi,
5 1.1 yamt * All rights reserved.
6 1.1 yamt *
7 1.1 yamt * Redistribution and use in source and binary forms, with or without
8 1.1 yamt * modification, are permitted provided that the following conditions
9 1.1 yamt * are met:
10 1.1 yamt * 1. Redistributions of source code must retain the above copyright
11 1.1 yamt * notice, this list of conditions and the following disclaimer.
12 1.1 yamt * 2. Redistributions in binary form must reproduce the above copyright
13 1.1 yamt * notice, this list of conditions and the following disclaimer in the
14 1.1 yamt * documentation and/or other materials provided with the distribution.
15 1.1 yamt *
16 1.1 yamt * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 1.1 yamt * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 1.1 yamt * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 1.1 yamt * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20 1.1 yamt * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 1.1 yamt * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 1.1 yamt * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 1.1 yamt * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 1.1 yamt * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 1.1 yamt * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 1.1 yamt * SUCH DAMAGE.
27 1.1 yamt */
28 1.1 yamt
29 1.1 yamt /*
30 1.1 yamt * backend db operations
31 1.1 yamt */
32 1.1 yamt
33 1.1 yamt #include <sys/cdefs.h>
34 1.1 yamt #ifndef lint
35 1.1 yamt __RCSID("$NetBSD: pgfs_db.c,v 1.1 2011/10/12 01:05:00 yamt Exp $");
36 1.1 yamt #endif /* not lint */
37 1.1 yamt
38 1.1 yamt #include <assert.h>
39 1.1 yamt #include <err.h>
40 1.1 yamt #include <errno.h>
41 1.1 yamt #include <inttypes.h>
42 1.1 yamt #include <puffs.h>
43 1.1 yamt #include <stdbool.h>
44 1.1 yamt #include <stdarg.h>
45 1.1 yamt #include <stdio.h>
46 1.1 yamt #include <stdlib.h>
47 1.1 yamt #include <util.h>
48 1.1 yamt
49 1.1 yamt #include <libpq-fe.h>
50 1.1 yamt
51 1.1 yamt #include "pgfs_db.h"
52 1.1 yamt #include "pgfs_waitq.h"
53 1.1 yamt #include "pgfs_debug.h"
54 1.1 yamt
55 1.1 yamt bool pgfs_dosync = false;
56 1.1 yamt
57 1.1 yamt struct Xconn {
58 1.1 yamt TAILQ_ENTRY(Xconn) list;
59 1.1 yamt PGconn *conn;
60 1.1 yamt struct puffs_cc *blocker;
61 1.1 yamt struct puffs_cc *owner;
62 1.1 yamt bool in_trans;
63 1.1 yamt int id;
64 1.1 yamt };
65 1.1 yamt
66 1.1 yamt static void
67 1.1 yamt dumperror(struct Xconn *xc, const PGresult *res)
68 1.1 yamt {
69 1.1 yamt static const struct {
70 1.1 yamt const char *name;
71 1.1 yamt int code;
72 1.1 yamt } fields[] = {
73 1.1 yamt #define F(x) { .name = #x, .code = x, }
74 1.1 yamt F(PG_DIAG_SEVERITY),
75 1.1 yamt F(PG_DIAG_SQLSTATE),
76 1.1 yamt F(PG_DIAG_MESSAGE_PRIMARY),
77 1.1 yamt F(PG_DIAG_MESSAGE_DETAIL),
78 1.1 yamt F(PG_DIAG_MESSAGE_HINT),
79 1.1 yamt F(PG_DIAG_STATEMENT_POSITION),
80 1.1 yamt F(PG_DIAG_INTERNAL_POSITION),
81 1.1 yamt F(PG_DIAG_INTERNAL_QUERY),
82 1.1 yamt F(PG_DIAG_CONTEXT),
83 1.1 yamt F(PG_DIAG_SOURCE_FILE),
84 1.1 yamt F(PG_DIAG_SOURCE_LINE),
85 1.1 yamt F(PG_DIAG_SOURCE_FUNCTION),
86 1.1 yamt #undef F
87 1.1 yamt };
88 1.1 yamt unsigned int i;
89 1.1 yamt
90 1.1 yamt if (!pgfs_dodprintf) {
91 1.1 yamt return;
92 1.1 yamt }
93 1.1 yamt assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR ||
94 1.1 yamt PQresultStatus(res) == PGRES_FATAL_ERROR);
95 1.1 yamt for (i = 0; i < __arraycount(fields); i++) {
96 1.1 yamt const char *val = PQresultErrorField(res, fields[i].code);
97 1.1 yamt
98 1.1 yamt if (val == NULL) {
99 1.1 yamt continue;
100 1.1 yamt }
101 1.1 yamt fprintf(stderr, "%s: %s\n", fields[i].name, val);
102 1.1 yamt }
103 1.1 yamt }
104 1.1 yamt
105 1.1 yamt TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist);
106 1.1 yamt struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq);
107 1.1 yamt
108 1.1 yamt static struct Xconn *
109 1.1 yamt getxc(struct puffs_cc *cc)
110 1.1 yamt {
111 1.1 yamt struct Xconn *xc;
112 1.1 yamt
113 1.1 yamt assert(cc != NULL);
114 1.1 yamt retry:
115 1.1 yamt TAILQ_FOREACH(xc, &xclist, list) {
116 1.1 yamt if (xc->blocker == NULL) {
117 1.1 yamt assert(xc->owner == NULL);
118 1.1 yamt xc->owner = cc;
119 1.1 yamt DPRINTF("xc %p acquire %p\n", xc, cc);
120 1.1 yamt return xc;
121 1.1 yamt } else {
122 1.1 yamt assert(xc->owner == xc->blocker);
123 1.1 yamt }
124 1.1 yamt }
125 1.1 yamt DPRINTF("no free conn %p\n", cc);
126 1.1 yamt waiton(&xcwaitq, cc);
127 1.1 yamt goto retry;
128 1.1 yamt }
129 1.1 yamt
130 1.1 yamt static void
131 1.1 yamt relxc(struct Xconn *xc)
132 1.1 yamt {
133 1.1 yamt
134 1.1 yamt assert(xc->in_trans);
135 1.1 yamt assert(xc->owner != NULL);
136 1.1 yamt xc->in_trans = false;
137 1.1 yamt xc->owner = NULL;
138 1.1 yamt wakeup_one(&xcwaitq);
139 1.1 yamt }
140 1.1 yamt
141 1.1 yamt static void
142 1.1 yamt pqwait(struct Xconn *xc)
143 1.1 yamt {
144 1.1 yamt PGconn *conn = xc->conn;
145 1.1 yamt struct puffs_cc *cc = xc->owner;
146 1.1 yamt
147 1.1 yamt if (PQflush(conn)) {
148 1.1 yamt errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn));
149 1.1 yamt }
150 1.1 yamt if (!PQisBusy(conn)) {
151 1.1 yamt return;
152 1.1 yamt }
153 1.1 yamt assert(xc->blocker == NULL);
154 1.1 yamt xc->blocker = cc;
155 1.1 yamt DPRINTF("yielding %p\n", cc);
156 1.1 yamt /* XXX is it safe to yield before entering mainloop? */
157 1.1 yamt puffs_cc_yield(cc);
158 1.1 yamt DPRINTF("yield returned %p\n", cc);
159 1.1 yamt assert(xc->owner == cc);
160 1.1 yamt assert(xc->blocker == cc);
161 1.1 yamt xc->blocker = NULL;
162 1.1 yamt }
163 1.1 yamt
/*
 * sqltoerrno: translate a five-character SQLSTATE code to an errno value.
 *
 * sqlstate is normally the PG_DIAG_SQLSTATE field of an error result.
 * PQresultErrorField() returns NULL when a result carries no such field,
 * so a NULL sqlstate is accepted and reported as EIO, like any other
 * unrecognized code.
 *
 * Returns 0 only for "00000".  Aborts if the code maps to EINVAL (a
 * check constraint violation), which is taken as a bug.
 */
static int
sqltoerrno(const char *sqlstate)
{
	/*
	 * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle
	 * "tuple concurrently updated" errors for lowrite/lo_truncate.
	 *
	 * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN?
	 */
	static const struct {
		char sqlstate[5];	/* not NUL-terminated */
		int error;
	} map[] = {
		{ "00000", 0, },	/* ERRCODE_SUCCESSFUL_COMPLETION */
		{ "02000", ENOENT, },	/* ERRCODE_NO_DATA */
		{ "23505", EEXIST, },	/* ERRCODE_UNIQUE_VIOLATION */
		{ "23514", EINVAL, },	/* ERRCODE_CHECK_VIOLATION */
		{ "40001", EAGAIN, },	/* ERRCODE_T_R_SERIALIZATION_FAILURE */
		{ "40P01", EAGAIN, },	/* ERRCODE_T_R_DEADLOCK_DETECTED */
		{ "42704", ENOENT, },	/* ERRCODE_UNDEFINED_OBJECT */
		{ "53100", ENOSPC, },	/* ERRCODE_DISK_FULL */
		{ "53200", ENOMEM, },	/* ERRCODE_OUT_OF_MEMORY */
		{ "XX000", EAGAIN, },	/* ERRCODE_INTERNAL_ERROR */
	};
	unsigned int i;

	if (sqlstate == NULL) {
		/* the result has no SQLSTATE field; don't hand NULL to memcmp */
		DPRINTF("missing sqlstate mapped to EIO\n");
		return EIO;
	}
	for (i = 0; i < __arraycount(map); i++) {
		if (!memcmp(map[i].sqlstate, sqlstate, 5)) {
			const int error = map[i].error;

			if (error != 0) {
				DPRINTF("sqlstate %5s mapped to error %d\n",
				    sqlstate, error);
			}
			if (error == EINVAL) {
				/*
				 * sounds like a bug.
				 */
				abort();
			}
			return error;
		}
	}
	DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate);
	return EIO;
}
210 1.1 yamt
211 1.1 yamt struct cmd {
212 1.1 yamt char name[32]; /* name of prepared statement */
213 1.1 yamt char *cmd; /* query string */
214 1.1 yamt unsigned int nparams;
215 1.1 yamt Oid *paramtypes;
216 1.1 yamt uint32_t prepared_mask; /* for which connections this is prepared? */
217 1.1 yamt unsigned int flags; /* CMD_ flags */
218 1.1 yamt };
219 1.1 yamt
220 1.1 yamt #define CMD_NOPREPARE 1 /* don't prepare this command */
221 1.1 yamt
222 1.1 yamt struct cmd *
223 1.1 yamt createcmd(const char *cmd, unsigned int flags, ...)
224 1.1 yamt {
225 1.1 yamt struct cmd *c;
226 1.1 yamt va_list ap;
227 1.1 yamt const char *cp;
228 1.1 yamt unsigned int i;
229 1.1 yamt static unsigned int cmdid;
230 1.1 yamt
231 1.1 yamt c = emalloc(sizeof(*c));
232 1.1 yamt c->cmd = estrdup(cmd);
233 1.1 yamt c->nparams = 0;
234 1.1 yamt va_start(ap, flags);
235 1.1 yamt for (cp = cmd; *cp != 0; cp++) {
236 1.1 yamt if (*cp == '$') { /* XXX */
237 1.1 yamt c->nparams++;
238 1.1 yamt }
239 1.1 yamt }
240 1.1 yamt c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes));
241 1.1 yamt for (i = 0; i < c->nparams; i++) {
242 1.1 yamt Oid type = va_arg(ap, Oid);
243 1.1 yamt assert(type == BYTEA ||
244 1.1 yamt type == INT4OID || type == INT8OID || type == OIDOID ||
245 1.1 yamt type == TEXTOID || type == TIMESTAMPTZOID);
246 1.1 yamt c->paramtypes[i] = type;
247 1.1 yamt }
248 1.1 yamt va_end(ap);
249 1.1 yamt snprintf(c->name, sizeof(c->name), "%u", cmdid++);
250 1.1 yamt if ((flags & CMD_NOPREPARE) != 0) {
251 1.1 yamt c->prepared_mask = ~0;
252 1.1 yamt } else {
253 1.1 yamt c->prepared_mask = 0;
254 1.1 yamt }
255 1.1 yamt c->flags = flags;
256 1.1 yamt return c;
257 1.1 yamt }
258 1.1 yamt
259 1.1 yamt static void
260 1.1 yamt freecmd(struct cmd *c)
261 1.1 yamt {
262 1.1 yamt
263 1.1 yamt free(c->paramtypes);
264 1.1 yamt free(c->cmd);
265 1.1 yamt free(c);
266 1.1 yamt }
267 1.1 yamt
268 1.1 yamt static int
269 1.1 yamt fetch_noresult(struct Xconn *xc)
270 1.1 yamt {
271 1.1 yamt PGresult *res;
272 1.1 yamt ExecStatusType status;
273 1.1 yamt PGconn *conn = xc->conn;
274 1.1 yamt int error;
275 1.1 yamt
276 1.1 yamt pqwait(xc);
277 1.1 yamt res = PQgetResult(conn);
278 1.1 yamt if (res == NULL) {
279 1.1 yamt return ENOENT;
280 1.1 yamt }
281 1.1 yamt status = PQresultStatus(res);
282 1.1 yamt if (status == PGRES_COMMAND_OK) {
283 1.1 yamt assert(PQnfields(res) == 0);
284 1.1 yamt assert(PQntuples(res) == 0);
285 1.1 yamt if (!strcmp(PQcmdTuples(res), "0")) {
286 1.1 yamt error = ENOENT;
287 1.1 yamt } else {
288 1.1 yamt error = 0;
289 1.1 yamt }
290 1.1 yamt } else if (status == PGRES_FATAL_ERROR) {
291 1.1 yamt error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE));
292 1.1 yamt assert(error != 0);
293 1.1 yamt dumperror(xc, res);
294 1.1 yamt } else {
295 1.1 yamt errx(1, "%s not command_ok: %d: %s", __func__,
296 1.1 yamt (int)status,
297 1.1 yamt PQerrorMessage(conn));
298 1.1 yamt }
299 1.1 yamt PQclear(res);
300 1.1 yamt res = PQgetResult(conn);
301 1.1 yamt assert(res == NULL);
302 1.1 yamt if (error != 0) {
303 1.1 yamt DPRINTF("error %d\n", error);
304 1.1 yamt }
305 1.1 yamt return error;
306 1.1 yamt }
307 1.1 yamt
308 1.1 yamt static int
309 1.1 yamt preparecmd(struct Xconn *xc, struct cmd *c)
310 1.1 yamt {
311 1.1 yamt PGconn *conn = xc->conn;
312 1.1 yamt const uint32_t mask = 1 << xc->id;
313 1.1 yamt int error;
314 1.1 yamt int ret;
315 1.1 yamt
316 1.1 yamt if ((c->prepared_mask & mask) != 0) {
317 1.1 yamt return 0;
318 1.1 yamt }
319 1.1 yamt assert((c->flags & CMD_NOPREPARE) == 0);
320 1.1 yamt DPRINTF("PREPARE: '%s'\n", c->cmd);
321 1.1 yamt ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes);
322 1.1 yamt if (!ret) {
323 1.1 yamt errx(EXIT_FAILURE, "PQsendPrepare: %s",
324 1.1 yamt PQerrorMessage(conn));
325 1.1 yamt }
326 1.1 yamt error = fetch_noresult(xc);
327 1.1 yamt if (error != 0) {
328 1.1 yamt return error;
329 1.1 yamt }
330 1.1 yamt c->prepared_mask |= mask;
331 1.1 yamt return 0;
332 1.1 yamt }
333 1.1 yamt
334 1.1 yamt /*
335 1.1 yamt * vsendcmd:
336 1.1 yamt *
337 1.1 yamt * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared.
338 1.1 yamt * 0 for text and 1 for binary.
339 1.1 yamt */
340 1.1 yamt
341 1.1 yamt static int
342 1.1 yamt vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap)
343 1.1 yamt {
344 1.1 yamt PGconn *conn = xc->conn;
345 1.1 yamt char **paramvalues;
346 1.1 yamt int *paramlengths;
347 1.1 yamt int *paramformats;
348 1.1 yamt unsigned int i;
349 1.1 yamt int error;
350 1.1 yamt int ret;
351 1.1 yamt
352 1.1 yamt assert(xc->owner != NULL);
353 1.1 yamt assert(xc->blocker == NULL);
354 1.1 yamt error = preparecmd(xc, c);
355 1.1 yamt if (error != 0) {
356 1.1 yamt return error;
357 1.1 yamt }
358 1.1 yamt paramvalues = emalloc(c->nparams * sizeof(*paramvalues));
359 1.1 yamt paramlengths = NULL;
360 1.1 yamt paramformats = NULL;
361 1.1 yamt DPRINTF("CMD: '%s'\n", c->cmd);
362 1.1 yamt for (i = 0; i < c->nparams; i++) {
363 1.1 yamt Oid type = c->paramtypes[i];
364 1.1 yamt char tmpstore[1024];
365 1.1 yamt const char *buf = NULL;
366 1.1 yamt intmax_t v = 0; /* XXXgcc */
367 1.1 yamt int sz;
368 1.1 yamt bool binary = false;
369 1.1 yamt
370 1.1 yamt switch (type) {
371 1.1 yamt case BYTEA:
372 1.1 yamt buf = va_arg(ap, const void *);
373 1.1 yamt sz = (int)va_arg(ap, size_t);
374 1.1 yamt binary = true;
375 1.1 yamt break;
376 1.1 yamt case INT8OID:
377 1.1 yamt case OIDOID:
378 1.1 yamt case INT4OID:
379 1.1 yamt switch (type) {
380 1.1 yamt case INT8OID:
381 1.1 yamt v = (intmax_t)va_arg(ap, int64_t);
382 1.1 yamt break;
383 1.1 yamt case OIDOID:
384 1.1 yamt v = (intmax_t)va_arg(ap, Oid);
385 1.1 yamt break;
386 1.1 yamt case INT4OID:
387 1.1 yamt v = (intmax_t)va_arg(ap, int32_t);
388 1.1 yamt break;
389 1.1 yamt default:
390 1.1 yamt errx(EXIT_FAILURE, "unknown integer oid %u",
391 1.1 yamt type);
392 1.1 yamt }
393 1.1 yamt buf = tmpstore;
394 1.1 yamt sz = snprintf(tmpstore, sizeof(tmpstore),
395 1.1 yamt "%jd", v);
396 1.1 yamt assert(sz != -1);
397 1.1 yamt assert((size_t)sz < sizeof(tmpstore));
398 1.1 yamt sz += 1;
399 1.1 yamt break;
400 1.1 yamt case TEXTOID:
401 1.1 yamt case TIMESTAMPTZOID:
402 1.1 yamt buf = va_arg(ap, char *);
403 1.1 yamt sz = strlen(buf) + 1;
404 1.1 yamt break;
405 1.1 yamt default:
406 1.1 yamt errx(EXIT_FAILURE, "%s: unknown param type %u",
407 1.1 yamt __func__, type);
408 1.1 yamt }
409 1.1 yamt if (binary) {
410 1.1 yamt if (paramlengths == NULL) {
411 1.1 yamt paramlengths =
412 1.1 yamt emalloc(c->nparams * sizeof(*paramformats));
413 1.1 yamt }
414 1.1 yamt if (paramformats == NULL) {
415 1.1 yamt paramformats = ecalloc(1,
416 1.1 yamt c->nparams * sizeof(*paramformats));
417 1.1 yamt }
418 1.1 yamt paramformats[i] = 1;
419 1.1 yamt paramlengths[i] = sz;
420 1.1 yamt }
421 1.1 yamt paramvalues[i] = emalloc(sz);
422 1.1 yamt memcpy(paramvalues[i], buf, sz);
423 1.1 yamt if (binary) {
424 1.1 yamt DPRINTF("\t[%u]=<BINARY>\n", i);
425 1.1 yamt } else {
426 1.1 yamt DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]);
427 1.1 yamt }
428 1.1 yamt }
429 1.1 yamt if ((c->flags & CMD_NOPREPARE) != 0) {
430 1.1 yamt ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes,
431 1.1 yamt (const char * const *)paramvalues, paramlengths,
432 1.1 yamt paramformats, resultmode);
433 1.1 yamt } else {
434 1.1 yamt ret = PQsendQueryPrepared(conn, c->name, c->nparams,
435 1.1 yamt (const char * const *)paramvalues, paramlengths,
436 1.1 yamt paramformats, resultmode);
437 1.1 yamt }
438 1.1 yamt for (i = 0; i < c->nparams; i++) {
439 1.1 yamt free(paramvalues[i]);
440 1.1 yamt }
441 1.1 yamt free(paramvalues);
442 1.1 yamt free(paramlengths);
443 1.1 yamt free(paramformats);
444 1.1 yamt if (!ret) {
445 1.1 yamt errx(EXIT_FAILURE, "PQsendQueryPrepared: %s",
446 1.1 yamt PQerrorMessage(conn));
447 1.1 yamt }
448 1.1 yamt return 0;
449 1.1 yamt }
450 1.1 yamt
/*
 * sendcmd: send command c with the given parameters, requesting the
 * result in text format.  The result is not fetched here.
 *
 * Returns whatever vsendcmd() returns.
 */
int
sendcmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);
	return rv;
}
462 1.1 yamt
/*
 * sendcmdx: like sendcmd() but with an explicit result format;
 * resultmode is handed to vsendcmd() as is (0 for text, 1 for binary).
 *
 * Returns whatever vsendcmd() returns.
 */
int
sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, resultmode, c, args);
	va_end(args);
	return rv;
}
474 1.1 yamt
/*
 * simplecmd: a convenient routine to execute a command which returns
 * no rows synchronously.
 *
 * the command is sent with vsendcmd() and, if that succeeded, its
 * completion is collected with fetch_noresult().  returns the first
 * non-zero error of the two steps, or 0.
 */

int
simplecmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);
	if (rv == 0) {
		rv = fetch_noresult(xc);
	}
	return rv;
}
494 1.1 yamt
495 1.1 yamt void
496 1.1 yamt fetchinit(struct fetchstatus *s, struct Xconn *xc)
497 1.1 yamt {
498 1.1 yamt s->xc = xc;
499 1.1 yamt s->res = NULL;
500 1.1 yamt s->cur = 0;
501 1.1 yamt s->nrows = 0;
502 1.1 yamt s->done = false;
503 1.1 yamt }
504 1.1 yamt
/*
 * getint: convert a decimal string to an integer.
 *
 * The whole string must be a valid number: an empty string, trailing
 * garbage or an out-of-range value trips an assertion.
 */
static intmax_t
getint(const char *str)
{
	char *end;
	intmax_t value;

	errno = 0;
	value = strtoimax(str, &end, 10);
	assert(errno == 0);
	assert(*str != '\0');
	assert(*end == '\0');
	return value;
}
518 1.1 yamt
519 1.1 yamt static int
520 1.1 yamt vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap)
521 1.1 yamt {
522 1.1 yamt PGconn *conn = s->xc->conn;
523 1.1 yamt unsigned int i;
524 1.1 yamt
525 1.1 yamt assert(conn != NULL);
526 1.1 yamt if (s->res == NULL) {
527 1.1 yamt ExecStatusType status;
528 1.1 yamt int error;
529 1.1 yamt
530 1.1 yamt pqwait(s->xc);
531 1.1 yamt s->res = PQgetResult(conn);
532 1.1 yamt if (s->res == NULL) {
533 1.1 yamt s->done = true;
534 1.1 yamt return ENOENT;
535 1.1 yamt }
536 1.1 yamt status = PQresultStatus(s->res);
537 1.1 yamt if (status == PGRES_FATAL_ERROR) {
538 1.1 yamt error = sqltoerrno(
539 1.1 yamt PQresultErrorField(s->res, PG_DIAG_SQLSTATE));
540 1.1 yamt assert(error != 0);
541 1.1 yamt dumperror(s->xc, s->res);
542 1.1 yamt return error;
543 1.1 yamt }
544 1.1 yamt if (status != PGRES_TUPLES_OK) {
545 1.1 yamt errx(1, "not tuples_ok: %s",
546 1.1 yamt PQerrorMessage(conn));
547 1.1 yamt }
548 1.1 yamt assert((unsigned int)PQnfields(s->res) == n);
549 1.1 yamt s->nrows = PQntuples(s->res);
550 1.1 yamt if (s->nrows == 0) {
551 1.1 yamt DPRINTF("no rows\n");
552 1.1 yamt return ENOENT;
553 1.1 yamt }
554 1.1 yamt assert(s->nrows >= 1);
555 1.1 yamt s->cur = 0;
556 1.1 yamt }
557 1.1 yamt for (i = 0; i < n; i++) {
558 1.1 yamt size_t size;
559 1.1 yamt
560 1.1 yamt assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0));
561 1.1 yamt DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n",
562 1.1 yamt i, PQftype(s->res, i), types[i],
563 1.1 yamt PQgetisnull(s->res, s->cur, i) ? "<NULL>" :
564 1.1 yamt PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) :
565 1.1 yamt "<BINARY>");
566 1.1 yamt assert(PQftype(s->res, i) == types[i]);
567 1.1 yamt assert(!PQgetisnull(s->res, s->cur, i));
568 1.1 yamt switch(types[i]) {
569 1.1 yamt case INT8OID:
570 1.1 yamt *va_arg(ap, int64_t *) =
571 1.1 yamt getint(PQgetvalue(s->res, s->cur, i));
572 1.1 yamt break;
573 1.1 yamt case OIDOID:
574 1.1 yamt *va_arg(ap, Oid *) =
575 1.1 yamt getint(PQgetvalue(s->res, s->cur, i));
576 1.1 yamt break;
577 1.1 yamt case INT4OID:
578 1.1 yamt *va_arg(ap, int32_t *) =
579 1.1 yamt getint(PQgetvalue(s->res, s->cur, i));
580 1.1 yamt break;
581 1.1 yamt case TEXTOID:
582 1.1 yamt *va_arg(ap, char **) =
583 1.1 yamt estrdup(PQgetvalue(s->res, s->cur, i));
584 1.1 yamt break;
585 1.1 yamt case BYTEA:
586 1.1 yamt size = PQgetlength(s->res, s->cur, i);
587 1.1 yamt memcpy(va_arg(ap, void *),
588 1.1 yamt PQgetvalue(s->res, s->cur, i), size);
589 1.1 yamt *va_arg(ap, size_t *) = size;
590 1.1 yamt break;
591 1.1 yamt default:
592 1.1 yamt errx(EXIT_FAILURE, "%s unknown type %u", __func__,
593 1.1 yamt types[i]);
594 1.1 yamt }
595 1.1 yamt }
596 1.1 yamt s->cur++;
597 1.1 yamt if (s->cur == s->nrows) {
598 1.1 yamt PQclear(s->res);
599 1.1 yamt s->res = NULL;
600 1.1 yamt }
601 1.1 yamt return 0;
602 1.1 yamt }
603 1.1 yamt
604 1.1 yamt int
605 1.1 yamt fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...)
606 1.1 yamt {
607 1.1 yamt va_list ap;
608 1.1 yamt int error;
609 1.1 yamt
610 1.1 yamt va_start(ap, types);
611 1.1 yamt error = vfetchnext(s, n, types, ap);
612 1.1 yamt va_end(ap);
613 1.1 yamt return error;
614 1.1 yamt }
615 1.1 yamt
616 1.1 yamt void
617 1.1 yamt fetchdone(struct fetchstatus *s)
618 1.1 yamt {
619 1.1 yamt
620 1.1 yamt if (s->res != NULL) {
621 1.1 yamt PQclear(s->res);
622 1.1 yamt s->res = NULL;
623 1.1 yamt }
624 1.1 yamt if (!s->done) {
625 1.1 yamt PGresult *res;
626 1.1 yamt unsigned int n;
627 1.1 yamt
628 1.1 yamt n = 0;
629 1.1 yamt while ((res = PQgetResult(s->xc->conn)) != NULL) {
630 1.1 yamt PQclear(res);
631 1.1 yamt n++;
632 1.1 yamt }
633 1.1 yamt if (n > 0) {
634 1.1 yamt DPRINTF("%u rows dropped\n", n);
635 1.1 yamt }
636 1.1 yamt }
637 1.1 yamt }
638 1.1 yamt
639 1.1 yamt int
640 1.1 yamt simplefetch(struct Xconn *xc, Oid type, ...)
641 1.1 yamt {
642 1.1 yamt struct fetchstatus s;
643 1.1 yamt va_list ap;
644 1.1 yamt int error;
645 1.1 yamt
646 1.1 yamt fetchinit(&s, xc);
647 1.1 yamt va_start(ap, type);
648 1.1 yamt error = vfetchnext(&s, 1, &type, ap);
649 1.1 yamt va_end(ap);
650 1.1 yamt assert(error != 0 || s.res == NULL);
651 1.1 yamt fetchdone(&s);
652 1.1 yamt return error;
653 1.1 yamt }
654 1.1 yamt
655 1.1 yamt struct Xconn *
656 1.1 yamt begin(struct puffs_usermount *pu)
657 1.1 yamt {
658 1.1 yamt struct Xconn *xc = getxc(puffs_cc_getcc(pu));
659 1.1 yamt static struct cmd *c;
660 1.1 yamt int error;
661 1.1 yamt
662 1.1 yamt CREATECMD_NOPARAM(c, "BEGIN");
663 1.1 yamt assert(!xc->in_trans);
664 1.1 yamt error = simplecmd(xc, c);
665 1.1 yamt assert(error == 0);
666 1.1 yamt assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
667 1.1 yamt xc->in_trans = true;
668 1.1 yamt return xc;
669 1.1 yamt }
670 1.1 yamt
671 1.1 yamt struct Xconn *
672 1.1 yamt begin_readonly(struct puffs_usermount *pu)
673 1.1 yamt {
674 1.1 yamt struct Xconn *xc = getxc(puffs_cc_getcc(pu));
675 1.1 yamt static struct cmd *c;
676 1.1 yamt int error;
677 1.1 yamt
678 1.1 yamt CREATECMD_NOPARAM(c, "BEGIN READ ONLY");
679 1.1 yamt assert(!xc->in_trans);
680 1.1 yamt error = simplecmd(xc, c);
681 1.1 yamt assert(error == 0);
682 1.1 yamt assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
683 1.1 yamt xc->in_trans = true;
684 1.1 yamt return xc;
685 1.1 yamt }
686 1.1 yamt
687 1.1 yamt void
688 1.1 yamt rollback(struct Xconn *xc)
689 1.1 yamt {
690 1.1 yamt PGTransactionStatusType status;
691 1.1 yamt
692 1.1 yamt /*
693 1.1 yamt * check the status as we are not sure the status of our transaction
694 1.1 yamt * after a failed commit.
695 1.1 yamt */
696 1.1 yamt status = PQtransactionStatus(xc->conn);
697 1.1 yamt assert(status != PQTRANS_ACTIVE);
698 1.1 yamt assert(status != PQTRANS_UNKNOWN);
699 1.1 yamt if (status != PQTRANS_IDLE) {
700 1.1 yamt static struct cmd *c;
701 1.1 yamt int error;
702 1.1 yamt
703 1.1 yamt assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR);
704 1.1 yamt CREATECMD_NOPARAM(c, "ROLLBACK");
705 1.1 yamt error = simplecmd(xc, c);
706 1.1 yamt assert(error == 0);
707 1.1 yamt }
708 1.1 yamt DPRINTF("xc %p rollback %p\n", xc, xc->owner);
709 1.1 yamt relxc(xc);
710 1.1 yamt }
711 1.1 yamt
712 1.1 yamt int
713 1.1 yamt commit(struct Xconn *xc)
714 1.1 yamt {
715 1.1 yamt static struct cmd *c;
716 1.1 yamt int error;
717 1.1 yamt
718 1.1 yamt CREATECMD_NOPARAM(c, "COMMIT");
719 1.1 yamt error = simplecmd(xc, c);
720 1.1 yamt if (error == 0) {
721 1.1 yamt DPRINTF("xc %p commit %p\n", xc, xc->owner);
722 1.1 yamt relxc(xc);
723 1.1 yamt }
724 1.1 yamt return error;
725 1.1 yamt }
726 1.1 yamt
727 1.1 yamt int
728 1.1 yamt commit_sync(struct Xconn *xc)
729 1.1 yamt {
730 1.1 yamt static struct cmd *c;
731 1.1 yamt int error;
732 1.1 yamt
733 1.1 yamt assert(!pgfs_dosync);
734 1.1 yamt CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON");
735 1.1 yamt error = simplecmd(xc, c);
736 1.1 yamt assert(error == 0);
737 1.1 yamt return commit(xc);
738 1.1 yamt }
739 1.1 yamt
740 1.1 yamt static void
741 1.1 yamt pgfs_notice_receiver(void *vp, const PGresult *res)
742 1.1 yamt {
743 1.1 yamt struct Xconn *xc = vp;
744 1.1 yamt
745 1.1 yamt assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR);
746 1.1 yamt fprintf(stderr, "got a notice on %p\n", xc);
747 1.1 yamt dumperror(xc, res);
748 1.1 yamt }
749 1.1 yamt
750 1.1 yamt static int
751 1.1 yamt pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf,
752 1.1 yamt int fd, int *done)
753 1.1 yamt {
754 1.1 yamt struct Xconn *xc;
755 1.1 yamt PGconn *conn;
756 1.1 yamt
757 1.1 yamt TAILQ_FOREACH(xc, &xclist, list) {
758 1.1 yamt if (PQsocket(xc->conn) == fd) {
759 1.1 yamt break;
760 1.1 yamt }
761 1.1 yamt }
762 1.1 yamt assert(xc != NULL);
763 1.1 yamt conn = xc->conn;
764 1.1 yamt PQconsumeInput(conn);
765 1.1 yamt if (!PQisBusy(conn)) {
766 1.1 yamt if (xc->blocker != NULL) {
767 1.1 yamt DPRINTF("schedule %p\n", xc->blocker);
768 1.1 yamt puffs_cc_schedule(xc->blocker);
769 1.1 yamt } else {
770 1.1 yamt DPRINTF("no blockers\n");
771 1.1 yamt }
772 1.1 yamt }
773 1.1 yamt *done = 0;
774 1.1 yamt return 0;
775 1.1 yamt }
776 1.1 yamt
777 1.1 yamt int
778 1.1 yamt pgfs_connectdb(struct puffs_usermount *pu, const char *dbname,
779 1.1 yamt const char *dbuser, bool debug, bool synchronous, unsigned int nconn)
780 1.1 yamt {
781 1.1 yamt const char *keywords[3+1];
782 1.1 yamt const char *values[3];
783 1.1 yamt unsigned int i;
784 1.1 yamt
785 1.1 yamt if (nconn > 32) {
786 1.1 yamt /*
787 1.1 yamt * limit from sizeof(cmd->prepared_mask)
788 1.1 yamt */
789 1.1 yamt return EINVAL;
790 1.1 yamt }
791 1.1 yamt if (debug) {
792 1.1 yamt pgfs_dodprintf = true;
793 1.1 yamt }
794 1.1 yamt if (synchronous) {
795 1.1 yamt pgfs_dosync = true;
796 1.1 yamt }
797 1.1 yamt i = 0;
798 1.1 yamt if (dbname != NULL) {
799 1.1 yamt keywords[i] = "dbname";
800 1.1 yamt values[i] = dbname;
801 1.1 yamt i++;
802 1.1 yamt }
803 1.1 yamt if (dbuser != NULL) {
804 1.1 yamt keywords[i] = "user";
805 1.1 yamt values[i] = dbuser;
806 1.1 yamt i++;
807 1.1 yamt }
808 1.1 yamt keywords[i] = "application_name";
809 1.1 yamt values[i] = "pgfs";
810 1.1 yamt i++;
811 1.1 yamt keywords[i] = NULL;
812 1.1 yamt puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL);
813 1.1 yamt for (i = 0; i < nconn; i++) {
814 1.1 yamt struct Xconn *xc;
815 1.1 yamt struct Xconn *xc2;
816 1.1 yamt static int xcid;
817 1.1 yamt PGconn *conn;
818 1.1 yamt struct cmd *c;
819 1.1 yamt int error;
820 1.1 yamt
821 1.1 yamt conn = PQconnectdbParams(keywords, values, 0);
822 1.1 yamt if (conn == NULL) {
823 1.1 yamt errx(EXIT_FAILURE,
824 1.1 yamt "PQconnectdbParams: unknown failure");
825 1.1 yamt }
826 1.1 yamt if (PQstatus(conn) != CONNECTION_OK) {
827 1.1 yamt /*
828 1.1 yamt * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW
829 1.1 yamt */
830 1.1 yamt errx(EXIT_FAILURE, "PQconnectdbParams: %s",
831 1.1 yamt PQerrorMessage(conn));
832 1.1 yamt }
833 1.1 yamt DPRINTF("protocol version %d\n", PQprotocolVersion(conn));
834 1.1 yamt puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ);
835 1.1 yamt xc = emalloc(sizeof(*xc));
836 1.1 yamt xc->conn = conn;
837 1.1 yamt xc->blocker = NULL;
838 1.1 yamt xc->owner = NULL;
839 1.1 yamt xc->in_trans = false;
840 1.1 yamt xc->id = xcid++;
841 1.1 yamt assert(xc->id < 32);
842 1.1 yamt PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc);
843 1.1 yamt TAILQ_INSERT_HEAD(&xclist, xc, list);
844 1.1 yamt xc2 = begin(pu);
845 1.1 yamt assert(xc2 == xc);
846 1.1 yamt c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE);
847 1.1 yamt error = simplecmd(xc, c);
848 1.1 yamt assert(error == 0);
849 1.1 yamt freecmd(c);
850 1.1 yamt c = createcmd("SET SESSION CHARACTERISTICS AS "
851 1.1 yamt "TRANSACTION ISOLATION LEVEL REPEATABLE READ",
852 1.1 yamt CMD_NOPREPARE);
853 1.1 yamt error = simplecmd(xc, c);
854 1.1 yamt assert(error == 0);
855 1.1 yamt freecmd(c);
856 1.1 yamt c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE);
857 1.1 yamt error = simplecmd(xc, c);
858 1.1 yamt assert(error == 0);
859 1.1 yamt freecmd(c);
860 1.1 yamt if (!pgfs_dosync) {
861 1.1 yamt c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF",
862 1.1 yamt CMD_NOPREPARE);
863 1.1 yamt error = simplecmd(xc, c);
864 1.1 yamt assert(error == 0);
865 1.1 yamt freecmd(c);
866 1.1 yamt }
867 1.1 yamt if (debug) {
868 1.1 yamt struct fetchstatus s;
869 1.1 yamt static const Oid types[] = { INT8OID, };
870 1.1 yamt uint64_t pid;
871 1.1 yamt
872 1.1 yamt c = createcmd("SELECT pg_backend_pid()::int8;",
873 1.1 yamt CMD_NOPREPARE);
874 1.1 yamt error = sendcmd(xc, c);
875 1.1 yamt assert(error == 0);
876 1.1 yamt fetchinit(&s, xc);
877 1.1 yamt error = FETCHNEXT(&s, types, &pid);
878 1.1 yamt fetchdone(&s);
879 1.1 yamt assert(error == 0);
880 1.1 yamt DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid);
881 1.1 yamt }
882 1.1 yamt error = commit(xc);
883 1.1 yamt assert(error == 0);
884 1.1 yamt assert(xc->owner == NULL);
885 1.1 yamt }
886 1.1 yamt /*
887 1.1 yamt * XXX cleanup unlinked files here? what to do when the filesystem
888 1.1 yamt * is shared?
889 1.1 yamt */
890 1.1 yamt return 0;
891 1.1 yamt }
892 1.1 yamt
893 1.1 yamt struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq);
894 1.1 yamt struct puffs_cc *flusher = NULL;
895 1.1 yamt
896 1.1 yamt int
897 1.1 yamt flush_xacts(struct puffs_usermount *pu)
898 1.1 yamt {
899 1.1 yamt struct puffs_cc *cc = puffs_cc_getcc(pu);
900 1.1 yamt struct Xconn *xc;
901 1.1 yamt static struct cmd *c;
902 1.1 yamt uint64_t dummy;
903 1.1 yamt int error;
904 1.1 yamt
905 1.1 yamt /*
906 1.1 yamt * flush all previously issued asynchronous transactions.
907 1.1 yamt *
908 1.1 yamt * XXX
909 1.1 yamt * unfortunately it seems that there is no clean way to tell
910 1.1 yamt * PostgreSQL flush XLOG. we could perform a CHECKPOINT but it's
911 1.1 yamt * too expensive and overkill for our purpose.
912 1.1 yamt * besides, PostgreSQL has an optimization to skip XLOG flushing
913 1.1 yamt * for transactions which didn't produce WAL records.
914 1.1 yamt * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2)
915 1.1 yamt * it means that an empty transaction ("BEGIN; COMMIT;"), which
916 1.1 yamt * doesn't produce any WAL records, doesn't flush the XLOG even if
917 1.1 yamt * synchronous_commit=on. we issues a dummy setval() to avoid the
918 1.1 yamt * optimization.
919 1.1 yamt * on the other hand, we try to avoid creating unnecessary WAL activity
920 1.1 yamt * by serializing flushing and checking XLOG locations.
921 1.1 yamt */
922 1.1 yamt
923 1.1 yamt assert(!pgfs_dosync);
924 1.1 yamt if (flusher != NULL) { /* serialize flushers */
925 1.1 yamt DPRINTF("%p flush in progress %p\n", cc, flusher);
926 1.1 yamt waiton(&flushwaitq, cc);
927 1.1 yamt assert(flusher == NULL);
928 1.1 yamt }
929 1.1 yamt DPRINTF("%p start flushing\n", cc);
930 1.1 yamt flusher = cc;
931 1.1 yamt retry:
932 1.1 yamt xc = begin(pu);
933 1.1 yamt CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE "
934 1.1 yamt "pg_current_xlog_insert_location() <> pg_current_xlog_location()");
935 1.1 yamt error = sendcmd(xc, c);
936 1.1 yamt if (error != 0) {
937 1.1 yamt goto got_error;
938 1.1 yamt }
939 1.1 yamt error = simplefetch(xc, INT8OID, &dummy);
940 1.1 yamt assert(error != 0 || dummy == 1);
941 1.1 yamt if (error == ENOENT) {
942 1.1 yamt /*
943 1.1 yamt * there seems to be nothing to flush.
944 1.1 yamt */
945 1.1 yamt DPRINTF("%p no sync\n", cc);
946 1.1 yamt error = 0;
947 1.1 yamt }
948 1.1 yamt if (error != 0) {
949 1.1 yamt goto got_error;
950 1.1 yamt }
951 1.1 yamt error = commit_sync(xc);
952 1.1 yamt if (error != 0) {
953 1.1 yamt goto got_error;
954 1.1 yamt }
955 1.1 yamt goto done;
956 1.1 yamt got_error:
957 1.1 yamt rollback(xc);
958 1.1 yamt if (error == EAGAIN) {
959 1.1 yamt goto retry;
960 1.1 yamt }
961 1.1 yamt done:
962 1.1 yamt assert(flusher == cc);
963 1.1 yamt flusher = NULL;
964 1.1 yamt wakeup_one(&flushwaitq);
965 1.1 yamt DPRINTF("%p end flushing error=%d\n", cc, error);
966 1.1 yamt return error;
967 1.1 yamt }
968