Home | History | Annotate | Line # | Download | only in pgfs
      1 /*	$NetBSD: pgfs_subs.c,v 1.5 2012/04/11 14:28:18 yamt Exp $	*/
      2 
      3 /*-
      4  * Copyright (c)2010,2011 YAMAMOTO Takashi,
      5  * All rights reserved.
      6  *
      7  * Redistribution and use in source and binary forms, with or without
      8  * modification, are permitted provided that the following conditions
      9  * are met:
     10  * 1. Redistributions of source code must retain the above copyright
     11  *    notice, this list of conditions and the following disclaimer.
     12  * 2. Redistributions in binary form must reproduce the above copyright
     13  *    notice, this list of conditions and the following disclaimer in the
     14  *    documentation and/or other materials provided with the distribution.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
     17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
     22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     26  * SUCH DAMAGE.
     27  */
     28 
     29 /*
     30  * a file system server which stores the data in a PostgreSQL database.
     31  */
     32 
     33 /*
     34  * we use large objects to store file contents.  there are a few XXXs wrt it.
     35  *
     36  * - large objects don't obey the normal transaction semantics.
     37  *
     38  * - we use large object server-side functions directly (instead of via the
     39  *   libpq large object api) because:
     40  *	- we want to use asynchronous (in the sense of PQsendFoo) operations
     41  *	  which is not available with the libpq large object api.
     42  *	- with the libpq large object api, there's no way to know details of
     43  *	  an error because PGresult is freed in the library without saving
     44  *	  PG_DIAG_SQLSTATE etc.
     45  */
     46 
     47 #include <sys/cdefs.h>
     48 #ifndef lint
     49 __RCSID("$NetBSD: pgfs_subs.c,v 1.5 2012/04/11 14:28:18 yamt Exp $");
     50 #endif /* not lint */
     51 
     52 #include <assert.h>
     53 #include <err.h>
     54 #include <errno.h>
     55 #include <puffs.h>
     56 #include <inttypes.h>
     57 #include <stdarg.h>
     58 #include <stdbool.h>
     59 #include <stdio.h>
     60 #include <stdlib.h>
     61 #include <time.h>
     62 #include <util.h>
     63 
     64 #include <libpq-fe.h>
     65 #include <libpq/libpq-fs.h>	/* INV_* */
     66 
     67 #include "pgfs.h"
     68 #include "pgfs_db.h"
     69 #include "pgfs_debug.h"
     70 #include "pgfs_waitq.h"
     71 #include "pgfs_subs.h"
     72 
     73 const char * const vtype_table[] = {
     74 	[VREG] = "regular",
     75 	[VDIR] = "directory",
     76 	[VLNK] = "link",
     77 };
     78 
     79 static unsigned int
     80 tovtype(const char *type)
     81 {
     82 	unsigned int i;
     83 
     84 	for (i = 0; i < __arraycount(vtype_table); i++) {
     85 		if (vtype_table[i] == NULL) {
     86 			continue;
     87 		}
     88 		if (!strcmp(type, vtype_table[i])) {
     89 			return i;
     90 		}
     91 	}
     92 	assert(0);
     93 	return 0;
     94 }
     95 
     96 static const char *
     97 fromvtype(enum vtype vtype)
     98 {
     99 
    100 	if (vtype < __arraycount(vtype_table)) {
    101 		assert(vtype_table[vtype] != NULL);
    102 		return vtype_table[vtype];
    103 	}
    104 	return NULL;
    105 }
    106 
    107 /*
    108  * fileid_lock stuff below is to keep ordering of operations for a file.
    109  * it is a workaround for the lack of operation barriers in the puffs
    110  * protocol.
    111  *
    112  * currently we do this locking only for SETATTR, GETATTR, and WRITE as
    113  * they are known to be reorder-unsafe.  they are sensitive to the file
    114  * attributes, mainly the file size.  note that as the kernel issues async
    115  * SETATTR/WRITE requests, vnode lock doesn't prevent GETATTR from seeing
    116  * the stale attributes.
    117  *
    118  * we are relying on waiton/wakeup being a FIFO.
    119  */
    120 
    121 struct fileid_lock_handle {
    122 	TAILQ_ENTRY(fileid_lock_handle) list;
    123 	fileid_t fileid;
    124 	struct puffs_cc *owner;	/* diagnostic only */
    125 	struct waitq waitq;
    126 };
    127 
    128 TAILQ_HEAD(, fileid_lock_handle) fileid_lock_list =
    129     TAILQ_HEAD_INITIALIZER(fileid_lock_list);
    130 struct waitq fileid_lock_waitq = TAILQ_HEAD_INITIALIZER(fileid_lock_waitq);
    131 
    132 /*
    133  * fileid_lock: serialize requests for the fileid.
    134  *
    135  * this function should be the first yieldable point in a puffs callback.
    136  */
    137 
    138 struct fileid_lock_handle *
    139 fileid_lock(fileid_t fileid, struct puffs_cc *cc)
    140 {
    141 	struct fileid_lock_handle *lock;
    142 
    143 	TAILQ_FOREACH(lock, &fileid_lock_list, list) {
    144 		if (lock->fileid == fileid) {
    145 			DPRINTF("fileid wait %" PRIu64 " cc %p\n", fileid, cc);
    146 			assert(lock->owner != cc);
    147 			waiton(&lock->waitq, cc);	/* enter FIFO */
    148 			assert(lock->owner == cc);
    149 			return lock;
    150 		}
    151 	}
    152 	lock = emalloc(sizeof(*lock));
    153 	lock->fileid = fileid;
    154 	lock->owner = cc;
    155 	DPRINTF("fileid lock %" PRIu64 " cc %p\n", lock->fileid, cc);
    156 	waitq_init(&lock->waitq);
    157 	TAILQ_INSERT_HEAD(&fileid_lock_list, lock, list);
    158 	return lock;
    159 }
    160 
    161 void
    162 fileid_unlock(struct fileid_lock_handle *lock)
    163 {
    164 
    165 	DPRINTF("fileid unlock %" PRIu64 "\n", lock->fileid);
    166 	assert(lock != NULL);
    167 	assert(lock->owner != NULL);
    168 	/*
    169 	 * perform direct-handoff to the first waiter.
    170 	 *
    171 	 * a handoff is essential to keep the order of requests.
    172 	 */
    173 	lock->owner = wakeup_one(&lock->waitq);
    174 	if (lock->owner != NULL) {
    175 		return;
    176 	}
    177 	/*
    178 	 * no one is waiting this fileid.
    179 	 */
    180 	TAILQ_REMOVE(&fileid_lock_list, lock, list);
    181 	free(lock);
    182 }
    183 
/*
 * timespec_to_pgtimestamp: create a text representation of timestamp which
 * can be recognized by the database server.
 *
 * the result has the form "YYYYMMDDTHHMMSS.uuuuuu", built from the
 * broken-down UTC time (gmtime_r) and the microsecond part of tv_nsec.
 * sub-microsecond precision is discarded.
 *
 * returns 0 on success.  if gmtime_r fails, returns errno and leaves
 * *resultp untouched.
 *
 * it's caller's responsibility to free(3) the result.
 */

int
timespec_to_pgtimestamp(const struct timespec *tv, char **resultp)
{
	/*
	 * XXX is there any smarter way?
	 */
	char buf1[1024];
	char buf2[1024];
	struct tm tm_store;
	struct tm *tm;

	tm = gmtime_r(&tv->tv_sec, &tm_store);
	if (tm == NULL) {
		assert(errno != 0);
		return errno;
	}
	strftime(buf1, sizeof(buf1), "%Y%m%dT%H%M%S", tm);
	/*
	 * the microseconds are the digits after the decimal point, thus
	 * they have to be zero-padded to 6 digits.  otherwise, eg. 5us
	 * would be printed as ".5", which means half a second.
	 */
	snprintf(buf2, sizeof(buf2), "%s.%06ju", buf1,
	    (uintmax_t)tv->tv_nsec / 1000);
	*resultp = estrdup(buf2);
	return 0;
}
    213 
    214 int
    215 my_lo_truncate(struct Xconn *xc, int32_t fd, int32_t size)
    216 {
    217 	static struct cmd *c;
    218 	int32_t ret;
    219 	int error;
    220 
    221 	CREATECMD(c, "SELECT lo_truncate($1, $2)", INT4OID, INT4OID);
    222 	error = sendcmd(xc, c, fd, size);
    223 	if (error != 0) {
    224 		return error;
    225 	}
    226 	error = simplefetch(xc, INT4OID, &ret);
    227 	if (error != 0) {
    228 		if (error == EEXIST) {
    229 			/*
    230 			 * probably the insertion of the new-sized page
    231 			 * caused a duplicated key error.  retry.
    232 			 */
    233 			DPRINTF("map EEXIST to EAGAIN\n");
    234 			error = EAGAIN;
    235 		}
    236 		return error;
    237 	}
    238 	assert(ret == 0);
    239 	return 0;
    240 }
    241 
    242 int
    243 my_lo_lseek(struct Xconn *xc, int32_t fd, int32_t offset, int32_t whence,
    244     int32_t *retp)
    245 {
    246 	static struct cmd *c;
    247 	int32_t ret;
    248 	int error;
    249 
    250 	CREATECMD(c, "SELECT lo_lseek($1, $2, $3)", INT4OID, INT4OID, INT4OID);
    251 	error = sendcmd(xc, c, fd, offset, whence);
    252 	if (error != 0) {
    253 		return error;
    254 	}
    255 	error = simplefetch(xc, INT4OID, &ret);
    256 	if (error != 0) {
    257 		return error;
    258 	}
    259 	if (retp != NULL) {
    260 		*retp = ret;
    261 	}
    262 	return 0;
    263 }
    264 
    265 int
    266 my_lo_read(struct Xconn *xc, int32_t fd, void *buf, size_t size,
    267     size_t *resultsizep)
    268 {
    269 	static struct cmd *c;
    270 	size_t resultsize;
    271 	int error;
    272 
    273 	CREATECMD(c, "SELECT loread($1, $2)", INT4OID, INT4OID);
    274 	error = sendcmdx(xc, 1, c, fd, (int32_t)size);
    275 	if (error != 0) {
    276 		return error;
    277 	}
    278 	error = simplefetch(xc, BYTEA, buf, &resultsize);
    279 	if (error != 0) {
    280 		return error;
    281 	}
    282 	*resultsizep = resultsize;
    283 	if (size != resultsize) {
    284 		DPRINTF("shortread? %zu != %zu\n", size, resultsize);
    285 	}
    286 	return 0;
    287 }
    288 
    289 int
    290 my_lo_write(struct Xconn *xc, int32_t fd, const void *buf, size_t size,
    291     size_t *resultsizep)
    292 {
    293 	static struct cmd *c;
    294 	int32_t resultsize;
    295 	int error;
    296 
    297 	CREATECMD(c, "SELECT lowrite($1, $2)", INT4OID, BYTEA);
    298 	error = sendcmd(xc, c, fd, buf, (int32_t)size);
    299 	if (error != 0) {
    300 		return error;
    301 	}
    302 	error = simplefetch(xc, INT4OID, &resultsize);
    303 	if (error != 0) {
    304 		if (error == EEXIST) {
    305 			/*
    306 			 * probably the insertion of the new data page
    307 			 * caused a duplicated key error.  retry.
    308 			 */
    309 			DPRINTF("map EEXIST to EAGAIN\n");
    310 			error = EAGAIN;
    311 		}
    312 		return error;
    313 	}
    314 	*resultsizep = resultsize;
    315 	if (size != (size_t)resultsize) {
    316 		DPRINTF("shortwrite? %zu != %zu\n", size, (size_t)resultsize);
    317 	}
    318 	return 0;
    319 }
    320 
    321 int
    322 my_lo_open(struct Xconn *xc, Oid loid, int32_t mode, int32_t *fdp)
    323 {
    324 	static struct cmd *c;
    325 	int error;
    326 
    327 	CREATECMD(c, "SELECT lo_open($1, $2)", OIDOID, INT4OID);
    328 	error = sendcmd(xc, c, loid, mode);
    329 	if (error != 0) {
    330 		return error;
    331 	}
    332 	return simplefetch(xc, INT4OID, fdp);
    333 }
    334 
/*
 * my_lo_close: the counterpart of my_lo_open.
 *
 * currently this is a no-op which always returns 0.  the code to really
 * run the server-side lo_close is kept but compiled out.
 */
int
my_lo_close(struct Xconn *xc, int32_t fd)
{
#if 0
	static struct cmd *c;
	int32_t status;
	int error;

	CREATECMD(c, "SELECT lo_close($1)", INT4OID);
	error = sendcmd(xc, c, fd);
	if (error == 0) {
		error = simplefetch(xc, INT4OID, &status);
	}
	if (error != 0) {
		return error;
	}
	assert(status == 0);
#else
	/*
	 * do nothing.
	 *
	 * LO handles are automatically closed at the end of transactions.
	 * our transactions are small enough.
	 */
#endif
	return 0;
}
    363 
    364 static int
    365 lo_lookup_by_fileid(struct Xconn *xc, fileid_t fileid, Oid *idp)
    366 {
    367 	static struct cmd *c;
    368 	static const Oid types[] = { OIDOID, };
    369 	struct fetchstatus s;
    370 	int error;
    371 
    372 	CREATECMD(c, "SELECT loid FROM datafork WHERE fileid = $1", INT8OID);
    373 	error = sendcmd(xc, c, fileid);
    374 	if (error != 0) {
    375 		return error;
    376 	}
    377 	fetchinit(&s, xc);
    378 	error = FETCHNEXT(&s, types, idp);
    379 	fetchdone(&s);
    380 	DPRINTF("error %d\n", error);
    381 	return error;
    382 }
    383 
    384 int
    385 lo_open_by_fileid(struct Xconn *xc, fileid_t fileid, int mode, int *fdp)
    386 {
    387 	Oid loid;
    388 	int fd;
    389 	int error;
    390 
    391 	error = lo_lookup_by_fileid(xc, fileid, &loid);
    392 	if (error != 0) {
    393 		return error;
    394 	}
    395 	error = my_lo_open(xc, loid, mode, &fd);
    396 	if (error != 0) {
    397 		return error;
    398 	}
    399 	*fdp = fd;
    400 	return 0;
    401 }
    402 
    403 static int
    404 getsize(struct Xconn *xc, fileid_t fileid, int *resultp)
    405 {
    406 	int32_t size;
    407 	int fd;
    408 	int error;
    409 
    410 	error = lo_open_by_fileid(xc, fileid, INV_READ, &fd);
    411 	if (error != 0) {
    412 		return error;
    413 	}
    414 	error = my_lo_lseek(xc, fd, 0, SEEK_END, &size);
    415 	if (error != 0) {
    416 		return error;
    417 	}
    418 	error = my_lo_close(xc, fd);
    419 	if (error != 0) {
    420 		return error;
    421 	}
    422 	*resultp = size;
    423 	return 0;
    424 }
    425 
/*
 * bits for the mask argument of getattr, one for each group of
 * attributes.  GETATTR_ALL is all of them.
 */
#define	GETATTR_TYPE	0x00000001
#define	GETATTR_NLINK	0x00000002
#define	GETATTR_SIZE	0x00000004
#define	GETATTR_MODE	0x00000008
#define	GETATTR_UID	0x00000010
#define	GETATTR_GID	0x00000020
#define	GETATTR_TIME	0x00000040
#define	GETATTR_ALL	\
	(GETATTR_TYPE|GETATTR_NLINK|GETATTR_SIZE|GETATTR_MODE| \
	GETATTR_UID|GETATTR_GID|GETATTR_TIME)
    436 
    437 int
    438 getattr(struct Xconn *xc, fileid_t fileid, struct vattr *va, unsigned int mask)
    439 {
    440 	char *type;
    441 	long long atime_s;
    442 	long long atime_us;
    443 	long long ctime_s;
    444 	long long ctime_us;
    445 	long long mtime_s;
    446 	long long mtime_us;
    447 	long long btime_s;
    448 	long long btime_us;
    449 	uint64_t mode;
    450 	long long uid;
    451 	long long gid;
    452 	long long nlink;
    453 	long long rev;
    454 	struct fetchstatus s;
    455 	int error;
    456 
    457 	if (mask == 0) {
    458 		return 0;
    459 	}
    460 	/*
    461 	 * unless explicitly requested, avoid fetching timestamps as they
    462 	 * are a little more expensive than other simple attributes.
    463 	 */
    464 	if ((mask & GETATTR_TIME) != 0) {
    465 		static struct cmd *c;
    466 		static const Oid types[] = {
    467 			TEXTOID,
    468 			INT8OID,
    469 			INT8OID,
    470 			INT8OID,
    471 			INT8OID,
    472 			INT8OID,
    473 			INT8OID,
    474 			INT8OID,
    475 			INT8OID,
    476 			INT8OID,
    477 			INT8OID,
    478 			INT8OID,
    479 			INT8OID,
    480 			INT8OID,
    481 		};
    482 
    483 		CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev, "
    484 		    "extract(epoch from date_trunc('second', atime))::int8, "
    485 		    "extract(microseconds from atime)::int8, "
    486 		    "extract(epoch from date_trunc('second', ctime))::int8, "
    487 		    "extract(microseconds from ctime)::int8, "
    488 		    "extract(epoch from date_trunc('second', mtime))::int8, "
    489 		    "extract(microseconds from mtime)::int8, "
    490 		    "extract(epoch from date_trunc('second', btime))::int8, "
    491 		    "extract(microseconds from btime)::int8 "
    492 		    "FROM file "
    493 		    "WHERE fileid = $1", INT8OID);
    494 		error = sendcmd(xc, c, fileid);
    495 		if (error != 0) {
    496 			return error;
    497 		}
    498 		fetchinit(&s, xc);
    499 		error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
    500 		    &rev,
    501 		    &atime_s, &atime_us,
    502 		    &ctime_s, &ctime_us,
    503 		    &mtime_s, &mtime_us,
    504 		    &btime_s, &btime_us);
    505 	} else {
    506 		static struct cmd *c;
    507 		static const Oid types[] = {
    508 			TEXTOID,
    509 			INT8OID,
    510 			INT8OID,
    511 			INT8OID,
    512 			INT8OID,
    513 			INT8OID,
    514 		};
    515 
    516 		CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev "
    517 		    "FROM file "
    518 		    "WHERE fileid = $1", INT8OID);
    519 		error = sendcmd(xc, c, fileid);
    520 		if (error != 0) {
    521 			return error;
    522 		}
    523 		fetchinit(&s, xc);
    524 		error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
    525 		    &rev);
    526 	}
    527 	fetchdone(&s);
    528 	if (error != 0) {
    529 		return error;
    530 	}
    531 	memset(va, 0xaa, sizeof(*va)); /* fill with garbage for debug */
    532 	va->va_type = tovtype(type);
    533 	free(type);
    534 	va->va_mode = mode;
    535 	va->va_uid = uid;
    536 	va->va_gid = gid;
    537 	if (nlink > 0 && va->va_type == VDIR) {
    538 		nlink++; /* "." */
    539 	}
    540 	va->va_nlink = nlink;
    541 	va->va_fileid = fileid;
    542 	va->va_atime.tv_sec = atime_s;
    543 	va->va_atime.tv_nsec = atime_us * 1000;
    544 	va->va_ctime.tv_sec = ctime_s;
    545 	va->va_ctime.tv_nsec = ctime_us * 1000;
    546 	va->va_mtime.tv_sec = mtime_s;
    547 	va->va_mtime.tv_nsec = mtime_us * 1000;
    548 	va->va_birthtime.tv_sec = btime_s;
    549 	va->va_birthtime.tv_nsec = btime_us * 1000;
    550 	va->va_blocksize = LOBLKSIZE;
    551 	va->va_gen = 1;
    552 	va->va_filerev = rev;
    553 	if ((mask & GETATTR_SIZE) != 0) {
    554 		int size;
    555 
    556 		size = 0;
    557 		if (va->va_type == VREG || va->va_type == VLNK) {
    558 			error = getsize(xc, fileid, &size);
    559 			if (error != 0) {
    560 				return error;
    561 			}
    562 		} else if (va->va_type == VDIR) {
    563 			size = 100; /* XXX */
    564 		}
    565 		va->va_size = size;
    566 	}
    567 	/*
    568 	 * XXX va_bytes: likely wrong due to toast compression.
    569 	 * there's no cheap way to get the compressed size of LO.
    570 	 */
    571 	va->va_bytes = va->va_size;
    572 	va->va_flags = 0;
    573 	return 0;
    574 }
    575 
    576 int
    577 update_mctime(struct Xconn *xc, fileid_t fileid)
    578 {
    579 	static struct cmd *c;
    580 
    581 	CREATECMD(c,
    582 	    "UPDATE file "
    583 	    "SET mtime = current_timestamp, ctime = current_timestamp, "
    584 		"rev = rev + 1 "
    585 	    "WHERE fileid = $1", INT8OID);
    586 	return simplecmd(xc, c, fileid);
    587 }
    588 
    589 int
    590 update_atime(struct Xconn *xc, fileid_t fileid)
    591 {
    592 	static struct cmd *c;
    593 
    594 	CREATECMD(c,
    595 	    "UPDATE file SET atime = current_timestamp WHERE fileid = $1",
    596 	    INT8OID);
    597 	return simplecmd(xc, c, fileid);
    598 }
    599 
    600 int
    601 update_mtime(struct Xconn *xc, fileid_t fileid)
    602 {
    603 	static struct cmd *c;
    604 
    605 	CREATECMD(c,
    606 	    "UPDATE file "
    607 	    "SET mtime = current_timestamp, rev = rev + 1 "
    608 	    "WHERE fileid = $1", INT8OID);
    609 	return simplecmd(xc, c, fileid);
    610 }
    611 
    612 int
    613 update_ctime(struct Xconn *xc, fileid_t fileid)
    614 {
    615 	static struct cmd *c;
    616 
    617 	CREATECMD(c,
    618 	    "UPDATE file SET ctime = current_timestamp WHERE fileid = $1",
    619 	    INT8OID);
    620 	return simplecmd(xc, c, fileid);
    621 }
    622 
    623 int
    624 update_nlink(struct Xconn *xc, fileid_t fileid, int delta)
    625 {
    626 	static struct cmd *c;
    627 
    628 	CREATECMD(c,
    629 	    "UPDATE file "
    630 	    "SET nlink = nlink + $1 "
    631 	    "WHERE fileid = $2",
    632 	    INT8OID, INT8OID);
    633 	return simplecmd(xc, c, (int64_t)delta, fileid);
    634 }
    635 
    636 int
    637 lookupp(struct Xconn *xc, fileid_t fileid, fileid_t *parent)
    638 {
    639 	static struct cmd *c;
    640 	static const Oid types[] = { INT8OID, };
    641 	struct fetchstatus s;
    642 	int error;
    643 
    644 	CREATECMD(c, "SELECT parent_fileid FROM dirent "
    645 		"WHERE child_fileid = $1 LIMIT 1", INT8OID);
    646 	error = sendcmd(xc, c, fileid);
    647 	if (error != 0) {
    648 		return error;
    649 	}
    650 	fetchinit(&s, xc);
    651 	error = FETCHNEXT(&s, types, parent);
    652 	fetchdone(&s);
    653 	if (error != 0) {
    654 		return error;
    655 	}
    656 	return 0;
    657 }
    658 
    659 int
    660 mkfile(struct Xconn *xc, enum vtype vtype, mode_t mode, uid_t uid, gid_t gid,
    661     fileid_t *idp)
    662 {
    663 	static struct cmd *c;
    664 	const char *type;
    665 	int error;
    666 
    667 	type = fromvtype(vtype);
    668 	if (type == NULL) {
    669 		return EOPNOTSUPP;
    670 	}
    671 	CREATECMD(c,
    672 		"INSERT INTO file "
    673 		"(fileid, type, mode, uid, gid, nlink, rev, "
    674 		"atime, ctime, mtime, btime) "
    675 		"VALUES(nextval('fileid_seq'), $1::filetype, $2, $3, $4, 0, 0, "
    676 		"current_timestamp, "
    677 		"current_timestamp, "
    678 		"current_timestamp, "
    679 		"current_timestamp) "
    680 		"RETURNING fileid", TEXTOID, INT8OID, INT8OID, INT8OID);
    681 	error = sendcmd(xc, c, type, (uint64_t)mode, (uint64_t)uid,
    682 	    (uint64_t)gid);
    683 	if (error != 0) {
    684 		return error;
    685 	}
    686 	return simplefetch(xc, INT8OID, idp);
    687 }
    688 
    689 int
    690 linkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
    691 {
    692 	static struct cmd *c;
    693 	int error;
    694 
    695 	CREATECMD(c,
    696 		"INSERT INTO dirent "
    697 		"(parent_fileid, name, child_fileid) "
    698 		"VALUES($1, $2, $3)", INT8OID, TEXTOID, INT8OID);
    699 	error = simplecmd(xc, c, parent, name, child);
    700 	if (error != 0) {
    701 		return error;
    702 	}
    703 	error = update_nlink(xc, child, 1);
    704 	if (error != 0) {
    705 		return error;
    706 	}
    707 	return update_mtime(xc, parent);
    708 }
    709 
    710 int
    711 unlinkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
    712 {
    713 	static struct cmd *c;
    714 	int error;
    715 
    716 	/*
    717 	 * in addition to the primary key, we check child_fileid as well here
    718 	 * to avoid removing an entry which was appeared after our VOP_LOOKUP.
    719 	 */
    720 	CREATECMD(c,
    721 		"DELETE FROM dirent "
    722 		"WHERE parent_fileid = $1 AND name = $2 AND child_fileid = $3",
    723 		INT8OID, TEXTOID, INT8OID);
    724 	error = simplecmd(xc, c, parent, name, child);
    725 	if (error != 0) {
    726 		return error;
    727 	}
    728 	error = update_nlink(xc, child, -1);
    729 	if (error != 0) {
    730 		return error;
    731 	}
    732 	error = update_mtime(xc, parent);
    733 	if (error != 0) {
    734 		return error;
    735 	}
    736 	return update_ctime(xc, child);
    737 }
    738 
    739 int
    740 mklinkfile(struct Xconn *xc, fileid_t parent, const char *name,
    741     enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *idp)
    742 {
    743 	fileid_t fileid;
    744 	int error;
    745 
    746 	error = mkfile(xc, vtype, mode, uid, gid, &fileid);
    747 	if (error != 0) {
    748 		return error;
    749 	}
    750 	error = linkfile(xc, parent, name, fileid);
    751 	if (error != 0) {
    752 		return error;
    753 	}
    754 	if (idp != NULL) {
    755 		*idp = fileid;
    756 	}
    757 	return 0;
    758 }
    759 
    760 int
    761 mklinkfile_lo(struct Xconn *xc, fileid_t parent_fileid, const char *name,
    762     enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *fileidp,
    763     int *loidp)
    764 {
    765 	static struct cmd *c;
    766 	fileid_t new_fileid;
    767 	int loid;
    768 	int error;
    769 
    770 	error = mklinkfile(xc, parent_fileid, name, vtype, mode, uid, gid,
    771 	    &new_fileid);
    772 	if (error != 0) {
    773 		return error;
    774 	}
    775 	CREATECMD(c,
    776 		"INSERT INTO datafork (fileid, loid) "
    777 		"VALUES($1, lo_creat(-1)) "
    778 		"RETURNING loid", INT8OID);
    779 	error = sendcmd(xc, c, new_fileid);
    780 	if (error != 0) {
    781 		return error;
    782 	}
    783 	error = simplefetch(xc, OIDOID, &loid);
    784 	if (error != 0) {
    785 		return error;
    786 	}
    787 	if (fileidp != NULL) {
    788 		*fileidp = new_fileid;
    789 	}
    790 	if (loidp != NULL) {
    791 		*loidp = loid;
    792 	}
    793 	return 0;
    794 }
    795 
    796 int
    797 cleanupfile(struct Xconn *xc, fileid_t fileid)
    798 {
    799 	static struct cmd *c;
    800 	char *type;
    801 	unsigned int vtype;
    802 	int error;
    803 
    804 	CREATECMD(c, "DELETE FROM file WHERE fileid = $1 AND nlink = 0 "
    805 		"RETURNING type::text", INT8OID);
    806 	error = sendcmd(xc, c, fileid);
    807 	if (error != 0) {
    808 		return error;
    809 	}
    810 	error = simplefetch(xc, TEXTOID, &type);
    811 	if (error == ENOENT) {
    812 		return 0; /* probably nlink > 0 */
    813 	}
    814 	if (error != 0) {
    815 		return error;
    816 	}
    817 	vtype = tovtype(type);
    818 	free(type);
    819 	if (vtype == VREG || vtype == VLNK) {
    820 		static struct cmd *c_datafork;
    821 		int32_t ret;
    822 
    823 		CREATECMD(c_datafork,
    824 			"WITH loids AS (DELETE FROM datafork WHERE fileid = $1 "
    825 			"RETURNING loid) SELECT lo_unlink(loid) FROM loids",
    826 			INT8OID);
    827 		error = sendcmd(xc, c_datafork, fileid);
    828 		if (error != 0) {
    829 			return error;
    830 		}
    831 		error = simplefetch(xc, INT4OID, &ret);
    832 		if (error != 0) {
    833 			return error;
    834 		}
    835 		if (ret != 1) {
    836 			return EIO; /* lo_unlink failed */
    837 		}
    838 	}
    839 	return 0;
    840 }
    841 
    842 /*
    843  * check_path: do locking and check to prevent a rename from creating loop.
    844  *
    845  * lock the dirents between child_fileid and the root directory.
    846  * if gate_fileid is appeared in the path, return EINVAL.
    847  * caller should ensure that child_fileid is of VDIR beforehand.
    848  *
    849  * we uses FOR SHARE row level locks as poor man's predicate locks.
    850  *
    851  * the following is an example to show why we need to lock the path.
    852  *
    853  * consider:
    854  * "mkdir -p /a/b/c/d/e/f && mkdir -p /1/2/3/4/5/6"
    855  * and then
    856  * thread 1 is doing "mv /a/b /1/2/3/4/5/6"
    857  * thread 2 is doing "mv /1/2 /a/b/c/d/e/f"
    858  *
    859  * a possible consequence:
    860  *	thread 1: check_path -> success
    861  *	thread 2: check_path -> success
    862  *	thread 1: modify directories -> block on row-level lock
    863  *	thread 2: modify directories -> block on row-level lock
    864  *			-> deadlock detected
    865  *			-> rollback and retry
    866  *
    867  * another possible consequence:
    868  *	thread 1: check_path -> success
    869  *	thread 1: modify directory entries -> success
    870  *	thread 2: check_path -> block on row-level lock
    871  *	thread 1: commit
    872  *	thread 2: acquire the lock and notices the row is updated
    873  *			-> serialization error
    874  *			-> rollback and retry
    875  *
    876  * XXX it might be better to use real serializable transactions,
    877  * which will be available for PostgreSQL 9.1
    878  */
    879 
    880 int
    881 check_path(struct Xconn *xc, fileid_t gate_fileid, fileid_t child_fileid)
    882 {
    883 	static struct cmd *c;
    884 	fileid_t parent_fileid;
    885 	struct fetchstatus s;
    886 	int error;
    887 
    888 	CREATECMD(c,
    889 		"WITH RECURSIVE r AS "
    890 		"( "
    891 				"SELECT parent_fileid, cookie, child_fileid "
    892 				"FROM dirent "
    893 				"WHERE child_fileid = $1 "
    894 			"UNION ALL "
    895 				"SELECT d.parent_fileid, d.cookie, "
    896 				"d.child_fileid "
    897 				"FROM dirent AS d INNER JOIN r "
    898 				"ON d.child_fileid = r.parent_fileid "
    899 		") "
    900 		"SELECT d.parent_fileid "
    901 		"FROM dirent d "
    902 		"JOIN r "
    903 		"ON d.cookie = r.cookie "
    904 		"FOR SHARE", INT8OID);
    905 	error = sendcmd(xc, c, child_fileid);
    906 	if (error != 0) {
    907 		return error;
    908 	}
    909 	fetchinit(&s, xc);
    910 	do {
    911 		static const Oid types[] = { INT8OID, };
    912 
    913 		error = FETCHNEXT(&s, types, &parent_fileid);
    914 		if (error == ENOENT) {
    915 			fetchdone(&s);
    916 			return 0;
    917 		}
    918 		if (error != 0) {
    919 			fetchdone(&s);
    920 			return error;
    921 		}
    922 	} while (gate_fileid != parent_fileid);
    923 	fetchdone(&s);
    924 	return EINVAL;
    925 }
    926 
    927 int
    928 isempty(struct Xconn *xc, fileid_t fileid, bool *emptyp)
    929 {
    930 	int32_t dummy;
    931 	static struct cmd *c;
    932 	int error;
    933 
    934 	CREATECMD(c,
    935 		"SELECT 1 FROM dirent "
    936 		"WHERE parent_fileid = $1 LIMIT 1", INT8OID);
    937 	error = sendcmd(xc, c, fileid);
    938 	if (error != 0) {
    939 		return error;
    940 	}
    941 	error = simplefetch(xc, INT4OID, &dummy);
    942 	assert(error != 0 || dummy == 1);
    943 	if (error == ENOENT) {
    944 		*emptyp = true;
    945 		error = 0;
    946 	} else {
    947 		*emptyp = false;
    948 	}
    949 	return error;
    950 }
    951