pgfs_subs.c revision 1.2 1 /* $NetBSD: pgfs_subs.c,v 1.2 2011/10/12 16:24:39 yamt Exp $ */
2
3 /*-
4 * Copyright (c)2010,2011 YAMAMOTO Takashi,
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 * SUCH DAMAGE.
27 */
28
29 /*
30 * a file system server which stores the data in a PostgreSQL database.
31 */
32
33 /*
34 * we use large objects to store file contents. there are a few XXXs wrt it.
35 *
36 * - large objects don't obey the normal transaction semantics.
37 *
38 * - we use large object server-side functions directly (instead of via the
39 * libpq large object api) because:
 *   - we want to use asynchronous (in the sense of PQsendFoo) operations,
 *     which are not available with the libpq large object api.
42 * - with the libpq large object api, there's no way to know details of
43 * an error because PGresult is freed in the library without saving
44 * PG_DIAG_SQLSTATE etc.
45 */
46
47 #include <sys/cdefs.h>
48 #ifndef lint
49 __RCSID("$NetBSD: pgfs_subs.c,v 1.2 2011/10/12 16:24:39 yamt Exp $");
50 #endif /* not lint */
51
52 #include <assert.h>
53 #include <err.h>
54 #include <errno.h>
55 #include <puffs.h>
56 #include <inttypes.h>
57 #include <stdarg.h>
58 #include <stdbool.h>
59 #include <stdio.h>
60 #include <stdlib.h>
61 #include <time.h>
62 #include <util.h>
63
64 #include <libpq-fe.h>
65 #include <libpq/libpq-fs.h> /* INV_* */
66
67 #include "pgfs.h"
68 #include "pgfs_db.h"
69 #include "pgfs_debug.h"
70 #include "pgfs_waitq.h"
71 #include "pgfs_subs.h"
72
/*
 * vtype_table: text name of each supported vnode type, indexed by
 * enum vtype.  slots without an initializer are NULL.
 */
const char * const vtype_table[] = {
	[VREG] = "regular",
	[VDIR] = "directory",
	[VLNK] = "link",
};
78
79 static unsigned int
80 tovtype(const char *type)
81 {
82 unsigned int i;
83
84 for (i = 0; i < __arraycount(vtype_table); i++) {
85 if (vtype_table[i] == NULL) {
86 continue;
87 }
88 if (!strcmp(type, vtype_table[i])) {
89 return i;
90 }
91 }
92 assert(0);
93 return 0;
94 }
95
96 static const char *
97 fromvtype(enum vtype vtype)
98 {
99
100 if (vtype < __arraycount(vtype_table)) {
101 assert(vtype_table[vtype] != NULL);
102 return vtype_table[vtype];
103 }
104 return NULL;
105 }
106
107 /*
108 * fileid_lock stuff below is to keep ordering of operations for a file.
109 * it is a workaround for the lack of operation barriers in the puffs
110 * protocol.
111 *
112 * currently we do this locking only for SETATTR, GETATTR, and WRITE as
113 * they are known to be reorder-unsafe. they are sensitive to the file
114 * attributes, mainly the file size. note that as the kernel issues async
115 * SETATTR/WRITE requests, vnode lock doesn't prevent GETATTR from seeing
116 * the stale attributes.
117 *
118 * we are relying on waiton/wakeup being a FIFO.
119 */
120
/*
 * fileid_lock_handle: a lock on a fileid.  it exists while the fileid is
 * locked and is freed by fileid_unlock when no one is waiting for it.
 */
struct fileid_lock_handle {
	/* linkage on fileid_lock_list */
	TAILQ_ENTRY(fileid_lock_handle) list;
	/* the fileid being serialized */
	fileid_t fileid;
	struct puffs_cc *owner; /* diagnostic only */
	/* requests waiting for their turn on this fileid */
	struct waitq waitq;
};

/* the handles of all fileids which are currently locked. */
TAILQ_HEAD(, fileid_lock_handle) fileid_lock_list =
    TAILQ_HEAD_INITIALIZER(fileid_lock_list);
/* NOTE(review): not referenced by the code visible in this part of the file. */
struct waitq fileid_lock_waitq = TAILQ_HEAD_INITIALIZER(fileid_lock_waitq);
131
132 /*
133 * fileid_lock: serialize requests for the fileid.
134 *
135 * this function should be the first yieldable point in a puffs callback.
136 */
137
138 struct fileid_lock_handle *
139 fileid_lock(fileid_t fileid, struct puffs_cc *cc)
140 {
141 struct fileid_lock_handle *lock;
142
143 TAILQ_FOREACH(lock, &fileid_lock_list, list) {
144 if (lock->fileid == fileid) {
145 DPRINTF("fileid wait %" PRIu64 " cc %p\n", fileid, cc);
146 assert(lock->owner != cc);
147 waiton(&lock->waitq, cc); /* enter FIFO */
148 assert(lock->owner == cc);
149 return lock;
150 }
151 }
152 lock = emalloc(sizeof(*lock));
153 lock->fileid = fileid;
154 lock->owner = cc;
155 DPRINTF("fileid lock %" PRIu64 " cc %p\n", lock->fileid, cc);
156 waitq_init(&lock->waitq);
157 TAILQ_INSERT_HEAD(&fileid_lock_list, lock, list);
158 return lock;
159 }
160
161 void
162 fileid_unlock(struct fileid_lock_handle *lock)
163 {
164
165 DPRINTF("fileid unlock %" PRIu64 "\n", lock->fileid);
166 assert(lock != NULL);
167 assert(lock->owner != NULL);
168 /*
169 * perform direct-handoff to the first waiter.
170 *
171 * a handoff is essential to keep the order of requests.
172 */
173 lock->owner = wakeup_one(&lock->waitq);
174 if (lock->owner != NULL) {
175 return;
176 }
177 /*
178 * no one is waiting this fileid.
179 */
180 TAILQ_REMOVE(&fileid_lock_list, lock, list);
181 free(lock);
182 }
183
184 /*
185 * timespec_to_pgtimestamp: create a text representation of timestamp which
186 * can be recognized by the database server.
187 *
188 * it's caller's responsibility to free(3) the result.
189 */
190
int
timespec_to_pgtimestamp(const struct timespec *tv, char **resultp)
{
	/*
	 * XXX is there any smarter way?
	 */
	char buf1[1024];
	char buf2[1024];
	struct tm tm_store;
	struct tm *tm;

	/* returns 0 on success, or the errno left by gmtime_r on failure. */
	tm = gmtime_r(&tv->tv_sec, &tm_store);
	if (tm == NULL) {
		assert(errno != 0);
		return errno;
	}
	strftime(buf1, sizeof(buf1), "%Y%m%dT%H%M%S", tm);
	/*
	 * the fraction is in microseconds and must be zero-padded to
	 * 6 digits.  otherwise, eg. 5us would be printed as ".5", which
	 * means half a second.
	 */
	snprintf(buf2, sizeof(buf2), "%s.%06ju", buf1,
	    (uintmax_t)tv->tv_nsec / 1000);
	*resultp = estrdup(buf2);
	return 0;
}
213
214 int
215 my_lo_truncate(struct Xconn *xc, int32_t fd, int32_t size)
216 {
217 static struct cmd *c;
218 int32_t ret;
219 int error;
220
221 CREATECMD(c, "SELECT lo_truncate($1, $2)", INT4OID, INT4OID);
222 error = sendcmd(xc, c, fd, size);
223 if (error != 0) {
224 return error;
225 }
226 error = simplefetch(xc, INT4OID, &ret);
227 if (error != 0) {
228 if (error == EEXIST) {
229 /*
230 * probably the insertion of the new-sized page
231 * caused a duplicated key error. retry.
232 */
233 DPRINTF("map EEXIST to EAGAIN\n");
234 error = EAGAIN;
235 }
236 return error;
237 }
238 assert(ret == 0);
239 return 0;
240 }
241
242 int
243 my_lo_lseek(struct Xconn *xc, int32_t fd, int32_t offset, int32_t whence,
244 int32_t *retp)
245 {
246 static struct cmd *c;
247 int32_t ret;
248 int error;
249
250 CREATECMD(c, "SELECT lo_lseek($1, $2, $3)", INT4OID, INT4OID, INT4OID);
251 error = sendcmd(xc, c, fd, offset, whence);
252 if (error != 0) {
253 return error;
254 }
255 error = simplefetch(xc, INT4OID, &ret);
256 if (error != 0) {
257 return error;
258 }
259 if (retp != NULL) {
260 *retp = ret;
261 }
262 return 0;
263 }
264
265 int
266 my_lo_read(struct Xconn *xc, int32_t fd, void *buf, size_t size,
267 size_t *resultsizep)
268 {
269 static struct cmd *c;
270 size_t resultsize;
271 int error;
272
273 CREATECMD(c, "SELECT loread($1, $2)", INT4OID, INT4OID);
274 error = sendcmdx(xc, 1, c, fd, (int32_t)size);
275 if (error != 0) {
276 return error;
277 }
278 error = simplefetch(xc, BYTEA, buf, &resultsize);
279 if (error != 0) {
280 return error;
281 }
282 *resultsizep = resultsize;
283 if (size != resultsize) {
284 DPRINTF("shortread? %zu != %zu\n", size, resultsize);
285 }
286 return 0;
287 }
288
289 int
290 my_lo_write(struct Xconn *xc, int32_t fd, const void *buf, size_t size,
291 size_t *resultsizep)
292 {
293 static struct cmd *c;
294 int32_t resultsize;
295 int error;
296
297 CREATECMD(c, "SELECT lowrite($1, $2)", INT4OID, BYTEA);
298 error = sendcmd(xc, c, fd, buf, (int32_t)size);
299 if (error != 0) {
300 return error;
301 }
302 error = simplefetch(xc, INT4OID, &resultsize);
303 if (error != 0) {
304 if (error == EEXIST) {
305 /*
306 * probably the insertion of the new data page
307 * caused a duplicated key error. retry.
308 */
309 DPRINTF("map EEXIST to EAGAIN\n");
310 error = EAGAIN;
311 }
312 return error;
313 }
314 *resultsizep = resultsize;
315 if (size != (size_t)resultsize) {
316 DPRINTF("shortwrite? %zu != %zu\n", size, (size_t)resultsize);
317 }
318 return 0;
319 }
320
321 int
322 my_lo_open(struct Xconn *xc, Oid loid, int32_t mode, int32_t *fdp)
323 {
324 static struct cmd *c;
325 int error;
326
327 CREATECMD(c, "SELECT lo_open($1, $2)", OIDOID, INT4OID);
328 error = sendcmd(xc, c, loid, mode);
329 if (error != 0) {
330 return error;
331 }
332 return simplefetch(xc, INT4OID, fdp);
333 }
334
335 int
336 my_lo_close(struct Xconn *xc, int32_t fd)
337 {
338 static struct cmd *c;
339 int32_t ret;
340 int error;
341
342 CREATECMD(c, "SELECT lo_close($1)", INT4OID);
343 error = sendcmd(xc, c, fd);
344 if (error != 0) {
345 return error;
346 }
347 error = simplefetch(xc, INT4OID, &ret);
348 if (error != 0) {
349 return error;
350 }
351 assert(ret == 0);
352 return 0;
353 }
354
355 static int
356 lo_lookup_by_fileid(struct Xconn *xc, fileid_t fileid, Oid *idp)
357 {
358 static struct cmd *c;
359 static const Oid types[] = { OIDOID, };
360 struct fetchstatus s;
361 int error;
362
363 CREATECMD(c, "SELECT loid FROM datafork WHERE fileid = $1", INT8OID);
364 error = sendcmd(xc, c, fileid);
365 if (error != 0) {
366 return error;
367 }
368 fetchinit(&s, xc);
369 error = FETCHNEXT(&s, types, idp);
370 fetchdone(&s);
371 DPRINTF("error %d\n", error);
372 return error;
373 }
374
375 int
376 lo_open_by_fileid(struct Xconn *xc, fileid_t fileid, int mode, int *fdp)
377 {
378 Oid loid;
379 int fd;
380 int error;
381
382 error = lo_lookup_by_fileid(xc, fileid, &loid);
383 if (error != 0) {
384 return error;
385 }
386 error = my_lo_open(xc, loid, mode, &fd);
387 if (error != 0) {
388 return error;
389 }
390 *fdp = fd;
391 return 0;
392 }
393
394 static int
395 getsize(struct Xconn *xc, fileid_t fileid, int *resultp)
396 {
397 int32_t size;
398 int fd;
399 int error;
400
401 error = lo_open_by_fileid(xc, fileid, INV_READ, &fd);
402 if (error != 0) {
403 return error;
404 }
405 error = my_lo_lseek(xc, fd, 0, SEEK_END, &size);
406 if (error != 0) {
407 return error;
408 }
409 error = my_lo_close(xc, fd);
410 if (error != 0) {
411 return error;
412 }
413 *resultp = size;
414 return 0;
415 }
416
/*
 * bits for the mask argument of getattr().
 *
 * getattr() in this file tests only GETATTR_TIME and GETATTR_SIZE.
 */
#define	GETATTR_TYPE	0x00000001
#define	GETATTR_NLINK	0x00000002
#define	GETATTR_SIZE	0x00000004
#define	GETATTR_MODE	0x00000008
#define	GETATTR_UID	0x00000010
#define	GETATTR_GID	0x00000020
#define	GETATTR_TIME	0x00000040
/* all of the above */
#define	GETATTR_ALL	\
	(GETATTR_TYPE|GETATTR_NLINK|GETATTR_SIZE|GETATTR_MODE| \
	GETATTR_UID|GETATTR_GID|GETATTR_TIME)
427
428 int
429 getattr(struct Xconn *xc, fileid_t fileid, struct vattr *va, unsigned int mask)
430 {
431 char *type;
432 long long atime_s;
433 long long atime_us;
434 long long ctime_s;
435 long long ctime_us;
436 long long mtime_s;
437 long long mtime_us;
438 long long btime_s;
439 long long btime_us;
440 uint64_t mode;
441 long long uid;
442 long long gid;
443 long long nlink;
444 long long rev;
445 struct fetchstatus s;
446 int error;
447
448 if (mask == 0) {
449 return 0;
450 }
451 /*
452 * unless explicitly requested, avoid fetching timestamps as they
453 * are a little more expensive than other simple attributes.
454 */
455 if ((mask & GETATTR_TIME) != 0) {
456 static struct cmd *c;
457 static const Oid types[] = {
458 TEXTOID,
459 INT8OID,
460 INT8OID,
461 INT8OID,
462 INT8OID,
463 INT8OID,
464 INT8OID,
465 INT8OID,
466 INT8OID,
467 INT8OID,
468 INT8OID,
469 INT8OID,
470 INT8OID,
471 INT8OID,
472 };
473
474 CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev, "
475 "extract(epoch from date_trunc('second', atime))::int8, "
476 "extract(microseconds from atime)::int8, "
477 "extract(epoch from date_trunc('second', ctime))::int8, "
478 "extract(microseconds from ctime)::int8, "
479 "extract(epoch from date_trunc('second', mtime))::int8, "
480 "extract(microseconds from mtime)::int8, "
481 "extract(epoch from date_trunc('second', btime))::int8, "
482 "extract(microseconds from btime)::int8 "
483 "FROM file "
484 "WHERE fileid = $1", INT8OID);
485 error = sendcmd(xc, c, fileid);
486 if (error != 0) {
487 return error;
488 }
489 fetchinit(&s, xc);
490 error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
491 &rev,
492 &atime_s, &atime_us,
493 &ctime_s, &ctime_us,
494 &mtime_s, &mtime_us,
495 &btime_s, &btime_us);
496 } else {
497 static struct cmd *c;
498 static const Oid types[] = {
499 TEXTOID,
500 INT8OID,
501 INT8OID,
502 INT8OID,
503 INT8OID,
504 INT8OID,
505 };
506
507 CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev "
508 "FROM file "
509 "WHERE fileid = $1", INT8OID);
510 error = sendcmd(xc, c, fileid);
511 if (error != 0) {
512 return error;
513 }
514 fetchinit(&s, xc);
515 error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
516 &rev);
517 }
518 fetchdone(&s);
519 if (error != 0) {
520 return error;
521 }
522 memset(va, 0xaa, sizeof(*va)); /* fill with garbage for debug */
523 va->va_type = tovtype(type);
524 free(type);
525 va->va_mode = mode;
526 va->va_uid = uid;
527 va->va_gid = gid;
528 if (nlink > 0 && va->va_type == VDIR) {
529 nlink++; /* "." */
530 }
531 va->va_nlink = nlink;
532 va->va_fileid = fileid;
533 va->va_atime.tv_sec = atime_s;
534 va->va_atime.tv_nsec = atime_us * 1000;
535 va->va_ctime.tv_sec = ctime_s;
536 va->va_ctime.tv_nsec = ctime_us * 1000;
537 va->va_mtime.tv_sec = mtime_s;
538 va->va_mtime.tv_nsec = mtime_us * 1000;
539 va->va_birthtime.tv_sec = btime_s;
540 va->va_birthtime.tv_nsec = btime_us * 1000;
541 va->va_blocksize = LOBLKSIZE;
542 va->va_gen = 1;
543 va->va_filerev = rev;
544 if ((mask & GETATTR_SIZE) != 0) {
545 int size;
546
547 size = 0;
548 if (va->va_type == VREG || va->va_type == VLNK) {
549 error = getsize(xc, fileid, &size);
550 if (error != 0) {
551 return error;
552 }
553 } else if (va->va_type == VDIR) {
554 size = 100; /* XXX */
555 }
556 va->va_size = size;
557 }
558 /*
559 * XXX va_bytes: likely wrong due to toast compression.
560 * there's no cheap way to get the compressed size of LO.
561 */
562 va->va_bytes = va->va_size;
563 va->va_flags = 0;
564 return 0;
565 }
566
567 int
568 update_mctime(struct Xconn *xc, fileid_t fileid)
569 {
570 static struct cmd *c;
571
572 CREATECMD(c,
573 "UPDATE file "
574 "SET mtime = current_timestamp, ctime = current_timestamp, "
575 "rev = rev + 1 "
576 "WHERE fileid = $1", INT8OID);
577 return simplecmd(xc, c, fileid);
578 }
579
580 int
581 update_atime(struct Xconn *xc, fileid_t fileid)
582 {
583 static struct cmd *c;
584
585 CREATECMD(c,
586 "UPDATE file SET atime = current_timestamp WHERE fileid = $1",
587 INT8OID);
588 return simplecmd(xc, c, fileid);
589 }
590
591 int
592 update_mtime(struct Xconn *xc, fileid_t fileid)
593 {
594 static struct cmd *c;
595
596 CREATECMD(c,
597 "UPDATE file "
598 "SET mtime = current_timestamp, rev = rev + 1 "
599 "WHERE fileid = $1", INT8OID);
600 return simplecmd(xc, c, fileid);
601 }
602
603 int
604 update_ctime(struct Xconn *xc, fileid_t fileid)
605 {
606 static struct cmd *c;
607
608 CREATECMD(c,
609 "UPDATE file SET ctime = current_timestamp WHERE fileid = $1",
610 INT8OID);
611 return simplecmd(xc, c, fileid);
612 }
613
614 int
615 update_nlink(struct Xconn *xc, fileid_t fileid, int delta)
616 {
617 static struct cmd *c;
618
619 CREATECMD(c,
620 "UPDATE file "
621 "SET nlink = nlink + $1 "
622 "WHERE fileid = $2",
623 INT8OID, INT8OID);
624 return simplecmd(xc, c, (int64_t)delta, fileid);
625 }
626
627 int
628 lookupp(struct Xconn *xc, fileid_t fileid, fileid_t *parent)
629 {
630 static struct cmd *c;
631 static const Oid types[] = { INT8OID, };
632 struct fetchstatus s;
633 int error;
634
635 CREATECMD(c, "SELECT parent_fileid FROM dirent "
636 "WHERE child_fileid = $1 LIMIT 1", INT8OID);
637 error = sendcmd(xc, c, fileid);
638 if (error != 0) {
639 return error;
640 }
641 fetchinit(&s, xc);
642 error = FETCHNEXT(&s, types, parent);
643 fetchdone(&s);
644 if (error != 0) {
645 return error;
646 }
647 return 0;
648 }
649
650 int
651 mkfile(struct Xconn *xc, enum vtype vtype, mode_t mode, uid_t uid, gid_t gid,
652 fileid_t *idp)
653 {
654 static struct cmd *c;
655 const char *type;
656 int error;
657
658 type = fromvtype(vtype);
659 if (type == NULL) {
660 return EOPNOTSUPP;
661 }
662 CREATECMD(c,
663 "INSERT INTO file "
664 "(fileid, type, mode, uid, gid, nlink, rev, "
665 "atime, ctime, mtime, btime) "
666 "VALUES(nextval('fileid_seq'), $1::filetype, $2, $3, $4, 0, 0, "
667 "current_timestamp, "
668 "current_timestamp, "
669 "current_timestamp, "
670 "current_timestamp) "
671 "RETURNING fileid", TEXTOID, INT8OID, INT8OID, INT8OID);
672 error = sendcmd(xc, c, type, (uint64_t)mode, (uint64_t)uid,
673 (uint64_t)gid);
674 if (error != 0) {
675 return error;
676 }
677 return simplefetch(xc, INT8OID, idp);
678 }
679
680 int
681 linkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
682 {
683 static struct cmd *c;
684 int error;
685
686 CREATECMD(c,
687 "INSERT INTO dirent "
688 "(parent_fileid, name, child_fileid) "
689 "VALUES($1, $2, $3)", INT8OID, TEXTOID, INT8OID);
690 error = simplecmd(xc, c, parent, name, child);
691 if (error != 0) {
692 return error;
693 }
694 error = update_nlink(xc, child, 1);
695 if (error != 0) {
696 return error;
697 }
698 return update_mtime(xc, parent);
699 }
700
701 int
702 unlinkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
703 {
704 static struct cmd *c;
705 int error;
706
707 /*
708 * in addition to the primary key, we check child_fileid as well here
709 * to avoid removing an entry which was appeared after our VOP_LOOKUP.
710 */
711 CREATECMD(c,
712 "DELETE FROM dirent "
713 "WHERE parent_fileid = $1 AND name = $2 AND child_fileid = $3",
714 INT8OID, TEXTOID, INT8OID);
715 error = simplecmd(xc, c, parent, name, child);
716 if (error != 0) {
717 return error;
718 }
719 error = update_nlink(xc, child, -1);
720 if (error != 0) {
721 return error;
722 }
723 error = update_mtime(xc, parent);
724 if (error != 0) {
725 return error;
726 }
727 return update_ctime(xc, child);
728 }
729
730 int
731 mklinkfile(struct Xconn *xc, fileid_t parent, const char *name,
732 enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *idp)
733 {
734 fileid_t fileid;
735 int error;
736
737 error = mkfile(xc, vtype, mode, uid, gid, &fileid);
738 if (error != 0) {
739 return error;
740 }
741 error = linkfile(xc, parent, name, fileid);
742 if (error != 0) {
743 return error;
744 }
745 if (idp != NULL) {
746 *idp = fileid;
747 }
748 return 0;
749 }
750
751 int
752 mklinkfile_lo(struct Xconn *xc, fileid_t parent_fileid, const char *name,
753 enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *fileidp,
754 int *loidp)
755 {
756 static struct cmd *c;
757 fileid_t new_fileid;
758 int loid;
759 int error;
760
761 error = mklinkfile(xc, parent_fileid, name, vtype, mode, uid, gid,
762 &new_fileid);
763 if (error != 0) {
764 return error;
765 }
766 CREATECMD(c,
767 "INSERT INTO datafork (fileid, loid) "
768 "VALUES($1, lo_creat(-1)) "
769 "RETURNING loid", INT8OID);
770 error = sendcmd(xc, c, new_fileid);
771 if (error != 0) {
772 return error;
773 }
774 error = simplefetch(xc, OIDOID, &loid);
775 if (error != 0) {
776 return error;
777 }
778 if (fileidp != NULL) {
779 *fileidp = new_fileid;
780 }
781 if (loidp != NULL) {
782 *loidp = loid;
783 }
784 return 0;
785 }
786
787 int
788 cleanupfile(struct Xconn *xc, fileid_t fileid, struct vattr *va)
789 {
790 static struct cmd *c;
791
792 /*
793 * XXX what to do when the filesystem is shared?
794 */
795
796 if (va->va_type == VREG || va->va_type == VLNK) {
797 static struct cmd *c_datafork;
798 int32_t ret;
799 int error;
800
801 CREATECMD(c_datafork,
802 "WITH loids AS (DELETE FROM datafork WHERE fileid = $1 "
803 "RETURNING loid) SELECT lo_unlink(loid) FROM loids",
804 INT8OID);
805 error = sendcmd(xc, c_datafork, fileid);
806 if (error != 0) {
807 return error;
808 }
809 error = simplefetch(xc, INT4OID, &ret);
810 if (error != 0) {
811 return error;
812 }
813 if (ret != 1) {
814 return EIO; /* lo_unlink failed */
815 }
816 }
817 CREATECMD(c, "DELETE FROM file WHERE fileid = $1", INT8OID);
818 return simplecmd(xc, c, fileid);
819 }
820
821 /*
822 * check_path: do locking and check to prevent a rename from creating loop.
823 *
824 * lock the dirents between child_fileid and the root directory.
 * if gate_fileid appears in the path, return EINVAL.
 * caller should ensure that child_fileid is of VDIR beforehand.
 *
 * we use FOR SHARE row level locks as poor man's predicate locks.
829 *
830 * the following is an example to show why we need to lock the path.
831 *
832 * consider:
833 * "mkdir -p /a/b/c/d/e/f && mkdir -p /1/2/3/4/5/6"
834 * and then
835 * thread 1 is doing "mv /a/b /1/2/3/4/5/6"
836 * thread 2 is doing "mv /1/2 /a/b/c/d/e/f"
837 *
838 * a possible consequence:
839 * thread 1: check_path -> success
840 * thread 2: check_path -> success
841 * thread 1: modify directories -> block on row-level lock
842 * thread 2: modify directories -> block on row-level lock
843 * -> deadlock detected
844 * -> rollback and retry
845 *
846 * another possible consequence:
847 * thread 1: check_path -> success
848 * thread 1: modify directory entries -> success
849 * thread 2: check_path -> block on row-level lock
850 * thread 1: commit
851 * thread 2: acquire the lock and notices the row is updated
852 * -> serialization error
853 * -> rollback and retry
854 *
855 * XXX it might be better to use real serializable transactions,
856 * which will be available for PostgreSQL 9.1
857 */
858
859 int
860 check_path(struct Xconn *xc, fileid_t gate_fileid, fileid_t child_fileid)
861 {
862 static struct cmd *c;
863 fileid_t parent_fileid;
864 struct fetchstatus s;
865 int error;
866
867 CREATECMD(c,
868 "WITH RECURSIVE r AS "
869 "( "
870 "SELECT parent_fileid, cookie, child_fileid "
871 "FROM dirent "
872 "WHERE child_fileid = $1 "
873 "UNION ALL "
874 "SELECT d.parent_fileid, d.cookie, "
875 "d.child_fileid "
876 "FROM dirent AS d INNER JOIN r "
877 "ON d.child_fileid = r.parent_fileid "
878 ") "
879 "SELECT d.parent_fileid "
880 "FROM dirent d "
881 "JOIN r "
882 "ON d.cookie = r.cookie "
883 "FOR SHARE", INT8OID);
884 error = sendcmd(xc, c, child_fileid);
885 if (error != 0) {
886 return error;
887 }
888 fetchinit(&s, xc);
889 do {
890 static const Oid types[] = { INT8OID, };
891
892 error = FETCHNEXT(&s, types, &parent_fileid);
893 if (error == ENOENT) {
894 fetchdone(&s);
895 return 0;
896 }
897 if (error != 0) {
898 fetchdone(&s);
899 return error;
900 }
901 } while (gate_fileid != parent_fileid);
902 fetchdone(&s);
903 return EINVAL;
904 }
905
906 int
907 isempty(struct Xconn *xc, fileid_t fileid, bool *emptyp)
908 {
909 fileid_t dummy;
910 static struct cmd *c;
911 static const Oid types[] = { INT8OID, };
912 struct fetchstatus s;
913 int error;
914
915 CREATECMD(c,
916 "SELECT 1 FROM dirent "
917 "WHERE parent_fileid = $1 LIMIT 1", INT8OID);
918 error = sendcmd(xc, c, fileid);
919 if (error != 0) {
920 return error;
921 }
922 fetchinit(&s, xc);
923 error = FETCHNEXT(&s, types, &dummy);
924 fetchdone(&s);
925 assert(error != 0 || dummy == 1);
926 if (error == ENOENT) {
927 *emptyp = true;
928 error = 0;
929 } else {
930 *emptyp = false;
931 }
932 return error;
933 }
934