Home | History | Annotate | Line # | Download | only in pgfs
      1 /*	$NetBSD: pgfs_puffs.c,v 1.5 2014/10/18 07:11:07 snj Exp $	*/
      2 
      3 /*-
      4  * Copyright (c)2010,2011 YAMAMOTO Takashi,
      5  * All rights reserved.
      6  *
      7  * Redistribution and use in source and binary forms, with or without
      8  * modification, are permitted provided that the following conditions
      9  * are met:
     10  * 1. Redistributions of source code must retain the above copyright
     11  *    notice, this list of conditions and the following disclaimer.
     12  * 2. Redistributions in binary form must reproduce the above copyright
     13  *    notice, this list of conditions and the following disclaimer in the
     14  *    documentation and/or other materials provided with the distribution.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
     17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
     22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     26  * SUCH DAMAGE.
     27  */
     28 
     29 /*
     30  * puffs node ops and fs ops.
     31  */
     32 
     33 #include <sys/cdefs.h>
     34 #ifndef lint
     35 __RCSID("$NetBSD: pgfs_puffs.c,v 1.5 2014/10/18 07:11:07 snj Exp $");
     36 #endif /* not lint */
     37 
     38 #include <assert.h>
     39 #include <err.h>
     40 #include <errno.h>
     41 #include <puffs.h>
     42 #include <inttypes.h>
     43 #include <stdarg.h>
     44 #include <stdbool.h>
     45 #include <stdio.h>
     46 #include <stdlib.h>
     47 #include <time.h>
     48 #include <util.h>
     49 
     50 #include <libpq-fe.h>
     51 #include <libpq/libpq-fs.h>	/* INV_* */
     52 
     53 #include "pgfs.h"
     54 #include "pgfs_db.h"
     55 #include "pgfs_subs.h"
     56 #include "pgfs_debug.h"
     57 
     58 static fileid_t
     59 cookie_to_fileid(puffs_cookie_t cookie)
     60 {
     61 
     62 	return (fileid_t)(uintptr_t)cookie;
     63 }
     64 
     65 static puffs_cookie_t
     66 fileid_to_cookie(fileid_t id)
     67 {
     68 	puffs_cookie_t cookie = (puffs_cookie_t)(uintptr_t)id;
     69 
     70 	/* XXX not true for 32-bit ports */
     71 	assert(cookie_to_fileid(cookie) == id);
     72 	return cookie;
     73 }
     74 
     75 puffs_cookie_t
     76 pgfs_root_cookie(void)
     77 {
     78 
     79 	return fileid_to_cookie(PGFS_ROOT_FILEID);
     80 }
     81 
     82 int
     83 pgfs_node_getattr(struct puffs_usermount *pu, puffs_cookie_t opc,
     84     struct vattr *va, const struct puffs_cred *pcr)
     85 {
     86 	struct Xconn *xc;
     87 	struct fileid_lock_handle *lock;
     88 	fileid_t fileid = cookie_to_fileid(opc);
     89 	int error;
     90 
     91 	DPRINTF("%llu\n", fileid);
     92 	lock = fileid_lock(fileid, puffs_cc_getcc(pu));
     93 retry:
     94 	xc = begin_readonly(pu, "getattr");
     95 	error = getattr(xc, fileid, va, GETATTR_ALL);
     96 	if (error != 0) {
     97 		goto got_error;
     98 	}
     99 	error = commit(xc);
    100 	if (error != 0) {
    101 		goto got_error;
    102 	}
    103 	goto done;
    104 got_error:
    105 	rollback(xc);
    106 	if (error == EAGAIN) {
    107 		goto retry;
    108 	}
    109 done:
    110 	fileid_unlock(lock);
    111 	return error;
    112 }
    113 
    114 #define	PGFS_DIRCOOKIE_DOT	0	/* . entry */
    115 #define	PGFS_DIRCOOKIE_DOTDOT	1	/* .. entry */
    116 #define	PGFS_DIRCOOKIE_EOD	2	/* end of directory */
    117 
    118 int
    119 pgfs_node_readdir(struct puffs_usermount *pu, puffs_cookie_t opc,
    120     struct dirent *dent, off_t *readoff, size_t *reslen,
    121     const struct puffs_cred *pcr, int *eofflag, off_t *cookies,
    122     size_t *ncookies)
    123 {
    124 	fileid_t parent_fileid;
    125 	fileid_t child_fileid;
    126 	uint64_t cookie;
    127 	uint64_t nextcookie;
    128 	uint64_t offset;
    129 	struct Xconn *xc = NULL;
    130 	static const Oid types[] = {
    131 		TEXTOID,	/* name */
    132 		INT8OID,	/* cookie */
    133 		INT8OID,	/* nextcookie */
    134 		INT8OID,	/* child_fileid */
    135 	};
    136 	const char *name;
    137 	char *nametofree = NULL;
    138 	struct fetchstatus s;
    139 	int error;
    140 	bool fetching;
    141 	bool bufferfull;
    142 
    143 	parent_fileid = cookie_to_fileid(opc);
    144 	offset = *readoff;
    145 	DPRINTF("%llu %" PRIu64 "\n", parent_fileid, offset);
    146 	*ncookies = 0;
    147 	fetching = false;
    148 next:
    149 	if (offset == PGFS_DIRCOOKIE_DOT) {
    150 		name = ".";
    151 		child_fileid = parent_fileid;
    152 		cookie = offset;
    153 		nextcookie = PGFS_DIRCOOKIE_DOTDOT;
    154 		goto store_and_next;
    155 	}
    156 	if (offset == PGFS_DIRCOOKIE_DOTDOT) {
    157 		if (parent_fileid != PGFS_ROOT_FILEID) {
    158 			if (xc == NULL) {
    159 				xc = begin(pu, "readdir1");
    160 			}
    161 			error = lookupp(xc, parent_fileid, &child_fileid);
    162 			if (error != 0) {
    163 				rollback(xc);
    164 				return error;
    165 			}
    166 		} else {
    167 			child_fileid = parent_fileid;
    168 		}
    169 		name = "..";
    170 		cookie = offset;
    171 		nextcookie = PGFS_DIRCOOKIE_EOD + 1;
    172 		goto store_and_next;
    173 	}
    174 	if (offset == PGFS_DIRCOOKIE_EOD) {
    175 		*eofflag = 1;
    176 		goto done;
    177 	}
    178 	/* offset > PGFS_DIRCOOKIE_EOD; normal entries */
    179 	if (xc == NULL) {
    180 		xc = begin(pu, "readdir2");
    181 	}
    182 	if (!fetching) {
    183 		static struct cmd *c;
    184 
    185 		/*
    186 		 * a simpler query like "ORDER BY name OFFSET :offset - 3"
    187 		 * would work well for most of cases.  however, it doesn't for
    188 		 * applications which expect readdir cookies are kept valid
    189 		 * even after unlink of other entries in the directory.
    190 		 * eg. cvs, bonnie++
    191 		 *
    192 		 * 2::int8 == PGFS_DIRCOOKIE_EOD
    193 		 */
    194 		CREATECMD(c,
    195 			"SELECT name, cookie, "
    196 			"lead(cookie, 1, 2::int8) OVER (ORDER BY cookie), "
    197 			"child_fileid "
    198 			"FROM dirent "
    199 			"WHERE parent_fileid = $1 "
    200 			"AND cookie >= $2 "
    201 			"ORDER BY cookie", INT8OID, INT8OID);
    202 		error = sendcmd(xc, c, parent_fileid, offset);
    203 		if (error != 0) {
    204 			rollback(xc);
    205 			return error;
    206 		}
    207 		fetching = true;
    208 		fetchinit(&s, xc);
    209 	}
    210 	/*
    211 	 * fetch and process an entry
    212 	 */
    213 	error = FETCHNEXT(&s, types, &nametofree, &cookie, &nextcookie,
    214 	    &child_fileid);
    215 	if (error == ENOENT) {
    216 		DPRINTF("ENOENT\n");
    217 		if (offset == PGFS_DIRCOOKIE_EOD + 1) {
    218 			DPRINTF("empty directory\n");
    219 			*eofflag = 1;
    220 			goto done;
    221 		}
    222 		fetchdone(&s);
    223 		rollback(xc);
    224 		return EINVAL;
    225 	}
    226 	if (error != 0) {
    227 		DPRINTF("error %d\n", error);
    228 		fetchdone(&s);
    229 		rollback(xc);
    230 		return error;
    231 	}
    232 	if (offset != cookie && offset != PGFS_DIRCOOKIE_EOD + 1) {
    233 		free(nametofree);
    234 		fetchdone(&s);
    235 		rollback(xc);
    236 		return EINVAL;
    237 	}
    238 	name = nametofree;
    239 store_and_next:
    240 	/*
    241 	 * store an entry and continue processing unless the result buffer
    242 	 * is full.
    243 	 */
    244 	bufferfull = !puffs_nextdent(&dent, name, child_fileid, DT_UNKNOWN,
    245 	    reslen);
    246 	free(nametofree);
    247 	nametofree = NULL;
    248 	if (bufferfull) {
    249 		*eofflag = 0;
    250 		goto done;
    251 	}
    252 	PUFFS_STORE_DCOOKIE(cookies, ncookies, cookie);
    253 	offset = nextcookie;
    254 	*readoff = offset;
    255 	goto next;
    256 done:
    257 	/*
    258 	 * cleanup and update atime of the directory.
    259 	 */
    260 	assert(nametofree == NULL);
    261 	if (fetching) {
    262 		fetchdone(&s);
    263 		fetching = false;
    264 	}
    265 	if (xc == NULL) {
    266 retry:
    267 		xc = begin(pu, "readdir3");
    268 	}
    269 	error = update_atime(xc, parent_fileid);
    270 	if (error != 0) {
    271 		goto got_error;
    272 	}
    273 	error = commit(xc);
    274 	if (error != 0) {
    275 		goto got_error;
    276 	}
    277 	return 0;
    278 got_error:
    279 	rollback(xc);
    280 	if (error == EAGAIN) {
    281 		goto retry;
    282 	}
    283 	return error;
    284 }
    285 
    286 int
    287 pgfs_node_lookup(struct puffs_usermount *pu, puffs_cookie_t opc,
    288     struct puffs_newinfo *pni, const struct puffs_cn *pcn)
    289 {
    290 	struct vattr dva;
    291 	struct vattr cva;
    292 	struct puffs_cred * const pcr = pcn->pcn_cred;
    293 	fileid_t parent_fileid;
    294 	const char *name;
    295 	fileid_t child_fileid;
    296 	struct Xconn *xc;
    297 	mode_t access_mode;
    298 	int error;
    299 	int saved_error;
    300 
    301 	parent_fileid = cookie_to_fileid(opc);
    302 	name = pcn->pcn_name;
    303 	DPRINTF("%llu %s\n", parent_fileid, name);
    304 	assert(strcmp(name, ".")); /* . is handled by framework */
    305 retry:
    306 	xc = begin_readonly(pu, "lookup");
    307 	error = getattr(xc, parent_fileid, &dva,
    308 	    GETATTR_TYPE|GETATTR_MODE|GETATTR_UID|GETATTR_GID);
    309 	if (error != 0) {
    310 		goto got_error;
    311 	}
    312 	access_mode = PUFFS_VEXEC;
    313 	if ((pcn->pcn_flags & NAMEI_ISLASTCN) != 0 &&
    314 	    pcn->pcn_nameiop != NAMEI_LOOKUP) {
    315 		access_mode |= PUFFS_VWRITE;
    316 	}
    317 	error = puffs_access(dva.va_type, dva.va_mode, dva.va_uid, dva.va_gid,
    318 	    access_mode, pcr);
    319 	if (error != 0) {
    320 		goto commit_and_return;
    321 	}
    322 	if (!strcmp(name, "..")) {
    323 		error = lookupp(xc, parent_fileid, &child_fileid);
    324 		if (error != 0) {
    325 			goto got_error;
    326 		}
    327 	} else {
    328 		static struct cmd *c;
    329 		static const Oid types[] = { INT8OID, };
    330 		struct fetchstatus s;
    331 
    332 		CREATECMD(c, "SELECT child_fileid "
    333 			"FROM dirent "
    334 			"WHERE parent_fileid = $1 AND name = $2",
    335 			INT8OID, TEXTOID);
    336 		error = sendcmd(xc, c, parent_fileid, name);
    337 		if (error != 0) {
    338 			DPRINTF("sendcmd %d\n", error);
    339 			goto got_error;
    340 		}
    341 		fetchinit(&s, xc);
    342 		error = FETCHNEXT(&s, types, &child_fileid);
    343 		fetchdone(&s);
    344 		if (error == ENOENT) {
    345 			goto commit_and_return;
    346 		}
    347 		if (error != 0) {
    348 			goto got_error;
    349 		}
    350 	}
    351 	error = getattr(xc, child_fileid, &cva, GETATTR_TYPE|GETATTR_SIZE);
    352 	if (error != 0) {
    353 		goto got_error;
    354 	}
    355 	error = commit(xc);
    356 	if (error != 0) {
    357 		goto got_error;
    358 	}
    359 	puffs_newinfo_setcookie(pni, fileid_to_cookie(child_fileid));
    360 	puffs_newinfo_setvtype(pni, cva.va_type);
    361 	puffs_newinfo_setsize(pni, cva.va_size);
    362 	return 0;
    363 got_error:
    364 	rollback(xc);
    365 	if (error == EAGAIN) {
    366 		goto retry;
    367 	}
    368 	return error;
    369 commit_and_return:
    370 	saved_error = error;
    371 	error = commit(xc);
    372 	if (error != 0) {
    373 		goto got_error;
    374 	}
    375 	return saved_error;
    376 }
    377 
    378 int
    379 pgfs_node_mkdir(struct puffs_usermount *pu, puffs_cookie_t opc,
    380     struct puffs_newinfo *pni, const struct puffs_cn *pcn,
    381     const struct vattr *va)
    382 {
    383 	struct Xconn *xc;
    384 	fileid_t parent_fileid = cookie_to_fileid(opc);
    385 	fileid_t new_fileid;
    386 	struct puffs_cred * const pcr = pcn->pcn_cred;
    387 	uid_t uid;
    388 	gid_t gid;
    389 	int error;
    390 
    391 	DPRINTF("%llu %s\n", parent_fileid, pcn->pcn_name);
    392 	if (puffs_cred_getuid(pcr, &uid) == -1 ||
    393 	    puffs_cred_getgid(pcr, &gid) == -1) {
    394 		return errno;
    395 	}
    396 retry:
    397 	xc = begin(pu, "mkdir");
    398 	error = mklinkfile(xc, parent_fileid, pcn->pcn_name, VDIR,
    399 	    va->va_mode, uid, gid, &new_fileid);
    400 	if (error == 0) {
    401 		error = update_nlink(xc, parent_fileid, 1);
    402 	}
    403 	if (error != 0) {
    404 		goto got_error;
    405 	}
    406 	error = commit(xc);
    407 	if (error != 0) {
    408 		goto got_error;
    409 	}
    410 	puffs_newinfo_setcookie(pni, fileid_to_cookie(new_fileid));
    411 	return 0;
    412 got_error:
    413 	rollback(xc);
    414 	if (error == EAGAIN) {
    415 		goto retry;
    416 	}
    417 	return error;
    418 }
    419 
    420 int
    421 pgfs_node_create(struct puffs_usermount *pu, puffs_cookie_t opc,
    422     struct puffs_newinfo *pni, const struct puffs_cn *pcn,
    423     const struct vattr *va)
    424 {
    425 	struct Xconn *xc;
    426 	fileid_t parent_fileid = cookie_to_fileid(opc);
    427 	fileid_t new_fileid;
    428 	struct puffs_cred * const pcr = pcn->pcn_cred;
    429 	uid_t uid;
    430 	gid_t gid;
    431 	int error;
    432 
    433 	DPRINTF("%llu %s\n", parent_fileid, pcn->pcn_name);
    434 	if (puffs_cred_getuid(pcr, &uid) == -1 ||
    435 	    puffs_cred_getgid(pcr, &gid) == -1) {
    436 		return errno;
    437 	}
    438 retry:
    439 	xc = begin(pu, "create");
    440 	error = mklinkfile_lo(xc, parent_fileid, pcn->pcn_name, VREG,
    441 	    va->va_mode,
    442 	    uid, gid, &new_fileid, NULL);
    443 	if (error != 0) {
    444 		goto got_error;
    445 	}
    446 	error = commit(xc);
    447 	if (error != 0) {
    448 		goto got_error;
    449 	}
    450 	puffs_newinfo_setcookie(pni, fileid_to_cookie(new_fileid));
    451 	return 0;
    452 got_error:
    453 	rollback(xc);
    454 	if (error == EAGAIN) {
    455 		goto retry;
    456 	}
    457 	return error;
    458 }
    459 
    460 int
    461 pgfs_node_write(struct puffs_usermount *pu, puffs_cookie_t opc,
    462     uint8_t *buf, off_t offset, size_t *resid,
    463     const struct puffs_cred *pcr, int ioflags)
    464 {
    465 	struct Xconn *xc;
    466 	struct fileid_lock_handle *lock;
    467 	fileid_t fileid = cookie_to_fileid(opc);
    468 	size_t resultlen;
    469 	int fd;
    470 	int error;
    471 
    472 	if ((ioflags & PUFFS_IO_APPEND) != 0) {
    473 		DPRINTF("%llu append sz %zu\n", fileid, *resid);
    474 	} else {
    475 		DPRINTF("%llu off %" PRIu64 " sz %zu\n", fileid,
    476 		    (uint64_t)offset, *resid);
    477 	}
    478 	lock = fileid_lock(fileid, puffs_cc_getcc(pu));
    479 retry:
    480 	xc = begin(pu, "write");
    481 	error = update_mctime(xc, fileid);
    482 	if (error != 0) {
    483 		goto got_error;
    484 	}
    485 	error = lo_open_by_fileid(xc, fileid, INV_WRITE, &fd);
    486 	if (error != 0) {
    487 		goto got_error;
    488 	}
    489 	if ((ioflags & PUFFS_IO_APPEND) != 0) {
    490 		int32_t off;
    491 
    492 		error = my_lo_lseek(xc, fd, 0, SEEK_END, &off);
    493 		if (error != 0) {
    494 			goto got_error;
    495 		}
    496 		offset = off;
    497 	}
    498 	if (offset < 0) {			/* negative offset */
    499 		error = EINVAL;
    500 		goto got_error;
    501 	}
    502 	if ((uint64_t)(INT64_MAX - offset) < *resid ||	/* int64 overflow */
    503 	    INT_MAX < offset + *resid) {	/* our max filesize */
    504 		error = EFBIG;
    505 		goto got_error;
    506 	}
    507 	if ((ioflags & PUFFS_IO_APPEND) == 0) {
    508 		error = my_lo_lseek(xc, fd, offset, SEEK_SET, NULL);
    509 		if (error != 0) {
    510 			goto got_error;
    511 		}
    512 	}
    513 	error = my_lo_write(xc, fd, (const char *)buf, *resid, &resultlen);
    514 	if (error != 0) {
    515 		goto got_error;
    516 	}
    517 	assert(*resid >= resultlen);
    518 	error = commit(xc);
    519 	if (error != 0) {
    520 		goto got_error;
    521 	}
    522 	*resid -= resultlen;
    523 	DPRINTF("resid %zu\n", *resid);
    524 	goto done;
    525 got_error:
    526 	rollback(xc);
    527 	if (error == EAGAIN) {
    528 		goto retry;
    529 	}
    530 done:
    531 	fileid_unlock(lock);
    532 	return error;
    533 }
    534 
    535 int
    536 pgfs_node_read(struct puffs_usermount *pu, puffs_cookie_t opc,
    537     uint8_t *buf, off_t offset, size_t *resid,
    538     const struct puffs_cred *pcr, int ioflags)
    539 {
    540 	struct Xconn *xc;
    541 	fileid_t fileid = cookie_to_fileid(opc);
    542 	size_t resultlen;
    543 	int fd;
    544 	int error;
    545 
    546 	DPRINTF("%llu off %" PRIu64 " sz %zu\n",
    547 	    fileid, (uint64_t)offset, *resid);
    548 retry:
    549 	xc = begin(pu, "read");
    550 	/*
    551 	 * try to update atime first as it's prone to conflict with other
    552 	 * transactions.  eg. read-ahead requests can conflict each other.
    553 	 * we don't want to retry my_lo_read as it's expensive.
    554 	 *
    555 	 * XXX probably worth to implement noatime mount option.
    556 	 */
    557 	error = update_atime(xc, fileid);
    558 	if (error != 0) {
    559 		goto got_error;
    560 	}
    561 	error = lo_open_by_fileid(xc, fileid, INV_READ, &fd);
    562 	if (error != 0) {
    563 		goto got_error;
    564 	}
    565 	error = my_lo_lseek(xc, fd, offset, SEEK_SET, NULL);
    566 	if (error != 0) {
    567 		goto got_error;
    568 	}
    569 	error = my_lo_read(xc, fd, buf, *resid, &resultlen);
    570 	if (error != 0) {
    571 		goto got_error;
    572 	}
    573 	assert(*resid >= resultlen);
    574 	error = commit(xc);
    575 	if (error != 0) {
    576 		goto got_error;
    577 	}
    578 	*resid -= resultlen;
    579 	return 0;
    580 got_error:
    581 	rollback(xc);
    582 	if (error == EAGAIN) {
    583 		goto retry;
    584 	}
    585 	return error;
    586 }
    587 
    588 int
    589 pgfs_node_link(struct puffs_usermount *pu, puffs_cookie_t dir_opc,
    590     puffs_cookie_t targ_opc, const struct puffs_cn *pcn)
    591 {
    592 	struct Xconn *xc;
    593 	fileid_t dir_fileid = cookie_to_fileid(dir_opc);
    594 	fileid_t targ_fileid = cookie_to_fileid(targ_opc);
    595 	struct vattr va;
    596 	int error;
    597 
    598 	DPRINTF("%llu %llu %s\n", dir_fileid, targ_fileid, pcn->pcn_name);
    599 retry:
    600 	xc = begin(pu, "link");
    601 	error = getattr(xc, targ_fileid, &va, GETATTR_TYPE);
    602 	if (error != 0) {
    603 		goto got_error;
    604 	}
    605 	if (va.va_type == VDIR) {
    606 		error = EPERM;
    607 		goto got_error;
    608 	}
    609 	error = linkfile(xc, dir_fileid, pcn->pcn_name, targ_fileid);
    610 	if (error != 0) {
    611 		goto got_error;
    612 	}
    613 	error = update_ctime(xc, targ_fileid);
    614 	if (error != 0) {
    615 		goto got_error;
    616 	}
    617 	error = commit(xc);
    618 	if (error != 0) {
    619 		goto got_error;
    620 	}
    621 	return 0;
    622 got_error:
    623 	rollback(xc);
    624 	if (error == EAGAIN) {
    625 		goto retry;
    626 	}
    627 	return error;
    628 }
    629 
    630 int
    631 pgfs_node_remove(struct puffs_usermount *pu, puffs_cookie_t opc,
    632     puffs_cookie_t targ, const struct puffs_cn *pcn)
    633 {
    634 	struct Xconn *xc;
    635 	fileid_t fileid = cookie_to_fileid(opc);
    636 	fileid_t targ_fileid = cookie_to_fileid(targ);
    637 	struct vattr va;
    638 	int error;
    639 
    640 retry:
    641 	xc = begin(pu, "remove");
    642 	error = getattr(xc, targ_fileid, &va, GETATTR_TYPE);
    643 	if (error != 0) {
    644 		goto got_error;
    645 	}
    646 	if (va.va_type == VDIR) {
    647 		error = EPERM;
    648 		goto got_error;
    649 	}
    650 	error = unlinkfile(xc, fileid, pcn->pcn_name, targ_fileid);
    651 	if (error != 0) {
    652 		goto got_error;
    653 	}
    654 	error = commit(xc);
    655 	if (error != 0) {
    656 		goto got_error;
    657 	}
    658 	puffs_setback(puffs_cc_getcc(pu), PUFFS_SETBACK_INACT_N2);
    659 	return 0;
    660 got_error:
    661 	rollback(xc);
    662 	if (error == EAGAIN) {
    663 		goto retry;
    664 	}
    665 	return error;
    666 }
    667 
    668 int
    669 pgfs_node_rmdir(struct puffs_usermount *pu, puffs_cookie_t opc,
    670     puffs_cookie_t targ, const struct puffs_cn *pcn)
    671 {
    672 	struct Xconn *xc;
    673 	fileid_t parent_fileid = cookie_to_fileid(opc);
    674 	fileid_t targ_fileid = cookie_to_fileid(targ);
    675 	struct vattr va;
    676 	bool empty;
    677 	int error;
    678 
    679 retry:
    680 	xc = begin(pu, "rmdir");
    681 	error = getattr(xc, targ_fileid, &va, GETATTR_TYPE);
    682 	if (error != 0) {
    683 		goto got_error;
    684 	}
    685 	if (va.va_type != VDIR) {
    686 		error = ENOTDIR;
    687 		goto got_error;
    688 	}
    689 	error = isempty(xc, targ_fileid, &empty);
    690 	if (error != 0) {
    691 		goto got_error;
    692 	}
    693 	if (!empty) {
    694 		error = ENOTEMPTY;
    695 		goto got_error;
    696 	}
    697 	error = unlinkfile(xc, parent_fileid, pcn->pcn_name, targ_fileid);
    698 	if (error == 0) {
    699 		error = update_nlink(xc, parent_fileid, -1);
    700 	}
    701 	if (error != 0) {
    702 		goto got_error;
    703 	}
    704 	error = commit(xc);
    705 	if (error != 0) {
    706 		goto got_error;
    707 	}
    708 	puffs_setback(puffs_cc_getcc(pu), PUFFS_SETBACK_INACT_N2);
    709 	return 0;
    710 got_error:
    711 	rollback(xc);
    712 	if (error == EAGAIN) {
    713 		goto retry;
    714 	}
    715 	return error;
    716 }
    717 
    718 int
    719 pgfs_node_inactive(struct puffs_usermount *pu, puffs_cookie_t opc)
    720 {
    721 	struct Xconn *xc;
    722 	fileid_t fileid = cookie_to_fileid(opc);
    723 	int error;
    724 
    725 	/*
    726 	 * XXX
    727 	 * probably this should be handed to the separate "reaper" context
    728 	 * because lo_unlink() can be too expensive to execute synchronously.
    729 	 * however, the puffs_cc API doesn't provide a way to create a worker
    730 	 * context.
    731 	 */
    732 
    733 	DPRINTF("%llu\n", fileid);
    734 retry:
    735 	xc = begin(pu, "inactive");
    736 	error = cleanupfile(xc, fileid);
    737 	if (error != 0) {
    738 		goto got_error;
    739 	}
    740 	error = commit(xc);
    741 	if (error != 0) {
    742 		goto got_error;
    743 	}
    744 	return 0;
    745 got_error:
    746 	rollback(xc);
    747 	if (error == EAGAIN) {
    748 		goto retry;
    749 	}
    750 	return error;
    751 }
    752 
    753 int
    754 pgfs_node_setattr(struct puffs_usermount *pu, puffs_cookie_t opc,
    755     const struct vattr *va, const struct puffs_cred *pcr)
    756 {
    757 	struct Xconn *xc;
    758 	struct fileid_lock_handle *lock;
    759 	fileid_t fileid = cookie_to_fileid(opc);
    760 	struct vattr ova;
    761 	unsigned int attrs;
    762 	int error;
    763 
    764 	DPRINTF("%llu\n", fileid);
    765 	if (va->va_flags != (u_long)PUFFS_VNOVAL) {
    766 		return EOPNOTSUPP;
    767 	}
    768 	attrs = 0;
    769 	if (va->va_uid != (uid_t)PUFFS_VNOVAL ||
    770 	    va->va_gid != (gid_t)PUFFS_VNOVAL) {
    771 		attrs |= GETATTR_UID|GETATTR_GID|GETATTR_MODE;
    772 	}
    773 	if (va->va_mode != (mode_t)PUFFS_VNOVAL) {
    774 		attrs |= GETATTR_TYPE|GETATTR_UID|GETATTR_GID;
    775 	}
    776 	if (va->va_atime.tv_sec != PUFFS_VNOVAL ||
    777 	    va->va_mtime.tv_sec != PUFFS_VNOVAL ||
    778 	    va->va_ctime.tv_sec != PUFFS_VNOVAL) {
    779 		attrs |= GETATTR_UID|GETATTR_GID|GETATTR_MODE;
    780 	}
    781 	lock = fileid_lock(fileid, puffs_cc_getcc(pu));
    782 retry:
    783 	xc = begin(pu, "setattr");
    784 	error = getattr(xc, fileid, &ova, attrs);
    785 	if (error != 0) {
    786 		goto got_error;
    787 	}
    788 	if (va->va_uid != (uid_t)PUFFS_VNOVAL ||
    789 	    va->va_gid != (gid_t)PUFFS_VNOVAL) {
    790 		static struct cmd *c;
    791 		uint64_t newuid =
    792 		    va->va_uid != (uid_t)PUFFS_VNOVAL ? va->va_uid : ova.va_uid;
    793 		uint64_t newgid =
    794 		    va->va_gid != (gid_t)PUFFS_VNOVAL ? va->va_gid : ova.va_gid;
    795 
    796 		error = puffs_access_chown(ova.va_uid, ova.va_gid,
    797 		    newuid, newgid, pcr);
    798 		if (error != 0) {
    799 			goto got_error;
    800 		}
    801 		CREATECMD(c,
    802 			"UPDATE file "
    803 			"SET uid = $1, gid = $2 "
    804 			"WHERE fileid = $3", INT8OID, INT8OID, INT8OID);
    805 		error = simplecmd(xc, c, newuid, newgid, fileid);
    806 		if (error != 0) {
    807 			goto got_error;
    808 		}
    809 		ova.va_uid = newuid;
    810 		ova.va_gid = newgid;
    811 	}
    812 	if (va->va_mode != (mode_t)PUFFS_VNOVAL) {
    813 		static struct cmd *c;
    814 		uint64_t newmode = va->va_mode;
    815 
    816 		error = puffs_access_chmod(ova.va_uid, ova.va_gid, ova.va_type,
    817 		    newmode, pcr);
    818 		if (error != 0) {
    819 			goto got_error;
    820 		}
    821 		CREATECMD(c,
    822 			"UPDATE file "
    823 			"SET mode = $1 "
    824 			"WHERE fileid = $2", INT8OID, INT8OID);
    825 		error = simplecmd(xc, c, newmode, fileid);
    826 		if (error != 0) {
    827 			goto got_error;
    828 		}
    829 		ova.va_mode = newmode;
    830 	}
    831 	if (va->va_atime.tv_sec != PUFFS_VNOVAL ||
    832 	    va->va_mtime.tv_sec != PUFFS_VNOVAL ||
    833 	    va->va_ctime.tv_sec != PUFFS_VNOVAL ||
    834 	    va->va_birthtime.tv_sec != PUFFS_VNOVAL) {
    835 		error = puffs_access_times(ova.va_uid, ova.va_gid, ova.va_mode,
    836 		    (va->va_vaflags & VA_UTIMES_NULL) != 0, pcr);
    837 		if (error != 0) {
    838 			goto got_error;
    839 		}
    840 		if (va->va_atime.tv_sec != PUFFS_VNOVAL) {
    841 			static struct cmd *c;
    842 			char *ts;
    843 
    844 			error = timespec_to_pgtimestamp(&va->va_atime, &ts);
    845 			if (error != 0) {
    846 				goto got_error;
    847 			}
    848 			CREATECMD(c,
    849 				"UPDATE file "
    850 				"SET atime = $1 "
    851 				"WHERE fileid = $2", TIMESTAMPTZOID, INT8OID);
    852 			error = simplecmd(xc, c, ts, fileid);
    853 			free(ts);
    854 			if (error != 0) {
    855 				goto got_error;
    856 			}
    857 		}
    858 		if (va->va_mtime.tv_sec != PUFFS_VNOVAL) {
    859 			static struct cmd *c;
    860 			char *ts;
    861 
    862 			error = timespec_to_pgtimestamp(&va->va_mtime, &ts);
    863 			if (error != 0) {
    864 				goto got_error;
    865 			}
    866 			CREATECMD(c,
    867 				"UPDATE file "
    868 				"SET mtime = $1 "
    869 				"WHERE fileid = $2", TIMESTAMPTZOID, INT8OID);
    870 			error = simplecmd(xc, c, ts, fileid);
    871 			free(ts);
    872 			if (error != 0) {
    873 				goto got_error;
    874 			}
    875 		}
    876 		if (va->va_ctime.tv_sec != PUFFS_VNOVAL) {
    877 			static struct cmd *c;
    878 			char *ts;
    879 
    880 			error = timespec_to_pgtimestamp(&va->va_ctime, &ts);
    881 			if (error != 0) {
    882 				goto got_error;
    883 			}
    884 			CREATECMD(c,
    885 				"UPDATE file "
    886 				"SET ctime = $1 "
    887 				"WHERE fileid = $2", TIMESTAMPTZOID, INT8OID);
    888 			error = simplecmd(xc, c, ts, fileid);
    889 			free(ts);
    890 			if (error != 0) {
    891 				goto got_error;
    892 			}
    893 		}
    894 		if (va->va_birthtime.tv_sec != PUFFS_VNOVAL) {
    895 			static struct cmd *c;
    896 			char *ts;
    897 
    898 			error = timespec_to_pgtimestamp(&va->va_birthtime, &ts);
    899 			if (error != 0) {
    900 				goto got_error;
    901 			}
    902 			CREATECMD(c,
    903 				"UPDATE file "
    904 				"SET btime = $1 "
    905 				"WHERE fileid = $2", TIMESTAMPTZOID, INT8OID);
    906 			error = simplecmd(xc, c, ts, fileid);
    907 			free(ts);
    908 			if (error != 0) {
    909 				goto got_error;
    910 			}
    911 		}
    912 	}
    913 	if (va->va_size != (uint64_t)PUFFS_VNOVAL) {
    914 		int fd;
    915 
    916 		if (va->va_size > INT_MAX) {
    917 			error = EFBIG;
    918 			goto got_error;
    919 		}
    920 		error = lo_open_by_fileid(xc, fileid, INV_READ|INV_WRITE, &fd);
    921 		if (error != 0) {
    922 			goto got_error;
    923 		}
    924 		error = my_lo_truncate(xc, fd, va->va_size);
    925 		if (error != 0) {
    926 			goto got_error;
    927 		}
    928 		error = my_lo_close(xc, fd);
    929 		if (error != 0) {
    930 			goto got_error;
    931 		}
    932 	}
    933 	error = commit(xc);
    934 	if (error != 0) {
    935 		goto got_error;
    936 	}
    937 	goto done;
    938 got_error:
    939 	rollback(xc);
    940 	if (error == EAGAIN) {
    941 		goto retry;
    942 	}
    943 done:
    944 	fileid_unlock(lock);
    945 	return error;
    946 }
    947 
    948 int
    949 pgfs_node_rename(struct puffs_usermount *pu, puffs_cookie_t src_dir,
    950     puffs_cookie_t src, const struct puffs_cn *pcn_src,
    951     puffs_cookie_t targ_dir, puffs_cookie_t targ,
    952     const struct puffs_cn *pcn_targ)
    953 {
    954 	struct Xconn *xc;
    955 	fileid_t fileid_src_dir = cookie_to_fileid(src_dir);
    956 	fileid_t fileid_src = cookie_to_fileid(src);
    957 	fileid_t fileid_targ_dir = cookie_to_fileid(targ_dir);
    958 	fileid_t fileid_targ = cookie_to_fileid(targ);
    959 	struct vattr va_src;
    960 	struct vattr va_targ;
    961 	int error;
    962 
    963 	DPRINTF("%llu %llu %llu %llu\n", fileid_src_dir, fileid_src,
    964 	    fileid_targ_dir, fileid_targ);
    965 retry:
    966 	xc = begin(pu, "rename");
    967 	error = getattr(xc, fileid_src, &va_src, GETATTR_TYPE);
    968 	if (error != 0) {
    969 		goto got_error;
    970 	}
    971 	if (va_src.va_type == VDIR) {
    972 		error = check_path(xc, fileid_src, fileid_targ_dir);
    973 		if (error != 0) {
    974 			goto got_error;
    975 		}
    976 	}
    977 	if (fileid_targ != 0) {
    978 		error = getattr(xc, fileid_targ, &va_targ,
    979 		    GETATTR_TYPE|GETATTR_NLINK);
    980 		if (error != 0) {
    981 			goto got_error;
    982 		}
    983 		if (va_src.va_type == VDIR) {
    984 			if (va_targ.va_type != VDIR) {
    985 				error = ENOTDIR;
    986 				goto got_error;
    987 			}
    988 			if (va_targ.va_nlink != 2) {
    989 				error = ENOTEMPTY;
    990 				goto got_error;
    991 			}
    992 		} else if (va_targ.va_type == VDIR) {
    993 			error = EISDIR;
    994 			goto got_error;
    995 		}
    996 		error = unlinkfile(xc, fileid_targ_dir, pcn_targ->pcn_name,
    997 		    fileid_targ);
    998 		if (error == 0 && va_targ.va_type == VDIR) {
    999 			error = update_nlink(xc, fileid_targ_dir, -1);
   1000 		}
   1001 		if (error != 0) {
   1002 			goto got_error;
   1003 		}
   1004 	}
   1005 	error = linkfile(xc, fileid_targ_dir, pcn_targ->pcn_name, fileid_src);
   1006 	if (error == 0 && va_src.va_type == VDIR) {
   1007 		error = update_nlink(xc, fileid_targ_dir, 1);
   1008 	}
   1009 	if (error != 0) {
   1010 		goto got_error;
   1011 	}
   1012 	/* XXX ctime? */
   1013 	error = unlinkfile(xc, fileid_src_dir, pcn_src->pcn_name, fileid_src);
   1014 	if (error == 0 && va_src.va_type == VDIR) {
   1015 		error = update_nlink(xc, fileid_src_dir, -1);
   1016 	}
   1017 	if (error != 0) {
   1018 		goto got_error;
   1019 	}
   1020 	error = commit(xc);
   1021 	if (error != 0) {
   1022 		goto got_error;
   1023 	}
   1024 	return 0;
   1025 got_error:
   1026 	rollback(xc);
   1027 	if (error == EAGAIN) {
   1028 		goto retry;
   1029 	}
   1030 	return error;
   1031 }
   1032 
   1033 int
   1034 pgfs_node_symlink(struct puffs_usermount *pu, puffs_cookie_t opc,
   1035     struct puffs_newinfo *pni, const struct puffs_cn *pcn,
   1036     const struct vattr *va, const char *target)
   1037 {
   1038 	struct Xconn *xc;
   1039 	struct puffs_cred *pcr = pcn->pcn_cred;
   1040 	fileid_t parent_fileid = cookie_to_fileid(opc);
   1041 	fileid_t new_fileid;
   1042 	size_t resultlen;
   1043 	size_t targetlen;
   1044 	uid_t uid;
   1045 	gid_t gid;
   1046 	int loid;
   1047 	int fd;
   1048 	int error;
   1049 
   1050 	DPRINTF("%llu %s %s\n", parent_fileid, pcn->pcn_name, target);
   1051 	if (puffs_cred_getuid(pcr, &uid) == -1 ||
   1052 	    puffs_cred_getgid(pcr, &gid) == -1) {
   1053 		return errno;
   1054 	}
   1055 retry:
   1056 	xc = begin(pu, "symlink");
   1057 	error = mklinkfile_lo(xc, parent_fileid, pcn->pcn_name, VLNK,
   1058 	    va->va_mode, uid, gid, &new_fileid, &loid);
   1059 	if (error != 0) {
   1060 		goto got_error;
   1061 	}
   1062 	error = my_lo_open(xc, loid, INV_WRITE, &fd);
   1063 	if (error != 0) {
   1064 		goto got_error;
   1065 	}
   1066 	targetlen = strlen(target);
   1067 	error = my_lo_write(xc, fd, target, targetlen, &resultlen);
   1068 	if (error != 0) {
   1069 		goto got_error;
   1070 	}
   1071 	if (resultlen != targetlen) {
   1072 		error = ENOSPC; /* XXX */
   1073 		goto got_error;
   1074 	}
   1075 	error = commit(xc);
   1076 	if (error != 0) {
   1077 		goto got_error;
   1078 	}
   1079 	puffs_newinfo_setcookie(pni, fileid_to_cookie(new_fileid));
   1080 	return 0;
   1081 got_error:
   1082 	rollback(xc);
   1083 	if (error == EAGAIN) {
   1084 		goto retry;
   1085 	}
   1086 	return error;
   1087 }
   1088 
   1089 int
   1090 pgfs_node_readlink(struct puffs_usermount *pu, puffs_cookie_t opc,
   1091     const struct puffs_cred *pcr, char *buf, size_t *buflenp)
   1092 {
   1093 	fileid_t fileid = cookie_to_fileid(opc);
   1094 	struct Xconn *xc;
   1095 	size_t resultlen;
   1096 	int fd;
   1097 	int error;
   1098 
   1099 	DPRINTF("%llu\n", fileid);
   1100 	xc = begin_readonly(pu, "readlink");
   1101 	error = lo_open_by_fileid(xc, fileid, INV_READ, &fd);
   1102 	if (error != 0) {
   1103 		rollback(xc);
   1104 		return error;
   1105 	}
   1106 	error = my_lo_read(xc, fd, buf, *buflenp, &resultlen);
   1107 	if (error != 0) {
   1108 		rollback(xc);
   1109 		return error;
   1110 	}
   1111 	assert(resultlen <= *buflenp);
   1112 	error = commit(xc);
   1113 	if (error != 0) {
   1114 		return error;
   1115 	}
   1116 	*buflenp = resultlen;
   1117 	return 0;
   1118 }
   1119 
   1120 int
   1121 pgfs_node_access(struct puffs_usermount *pu, puffs_cookie_t opc,
   1122     int mode, const struct puffs_cred *pcr)
   1123 {
   1124 	struct Xconn *xc;
   1125 	fileid_t fileid = cookie_to_fileid(opc);
   1126 	struct vattr va;
   1127 	int error;
   1128 
   1129 	DPRINTF("%llu\n", fileid);
   1130 retry:
   1131 	xc = begin_readonly(pu, "access");
   1132 	error = getattr(xc, fileid, &va,
   1133 	    GETATTR_TYPE|GETATTR_MODE|GETATTR_UID|GETATTR_GID);
   1134 	if (error != 0) {
   1135 		goto got_error;
   1136 	}
   1137 	error = commit(xc);
   1138 	if (error != 0) {
   1139 		goto got_error;
   1140 	}
   1141 	return puffs_access(va.va_type, va.va_mode, va.va_uid, va.va_gid, mode,
   1142 	    pcr);
   1143 got_error:
   1144 	rollback(xc);
   1145 	if (error == EAGAIN) {
   1146 		goto retry;
   1147 	}
   1148 	return error;
   1149 }
   1150 
   1151 int
   1152 pgfs_node_fsync(struct puffs_usermount *pu, puffs_cookie_t opc,
   1153     const struct puffs_cred *pcr, int flags, off_t offlo, off_t offhi)
   1154 {
   1155 	fileid_t fileid = cookie_to_fileid(opc);
   1156 
   1157 	DPRINTF("%llu\n", fileid);
   1158 	return flush_xacts(pu);
   1159 }
   1160 
   1161 int
   1162 pgfs_fs_statvfs(struct puffs_usermount *pu, struct statvfs *sbp)
   1163 {
   1164 	struct Xconn *xc;
   1165 	uint64_t nfiles;
   1166 	uint64_t bytes;
   1167 	uint64_t lo_bytes;
   1168 	static struct cmd *c_nfiles;
   1169 	static struct cmd *c_bytes;
   1170 	static struct cmd *c_lobytes;
   1171 	static const Oid types[] = { INT8OID, };
   1172 	struct fetchstatus s;
   1173 	int error;
   1174 
   1175 retry:
   1176 	xc = begin_readonly(pu, "statvfs");
   1177 	/*
   1178 	 * use an estimate which we can retrieve quickly, instead of
   1179 	 * "SELECT count(*) from file".
   1180 	 */
   1181 	CREATECMD_NOPARAM(c_nfiles,
   1182 		"SELECT reltuples::int8 "
   1183 		"FROM pg_class c LEFT JOIN pg_namespace n "
   1184 		"ON (n.oid=c.relnamespace) "
   1185 		"WHERE n.nspname = 'pgfs' AND c.relname = 'file'");
   1186 	CREATECMD_NOPARAM(c_bytes,
   1187 		"SELECT sum(pg_total_relation_size(c.oid))::int8 "
   1188 		"FROM pg_class c LEFT JOIN pg_namespace n "
   1189 		"ON (n.oid=c.relnamespace) "
   1190 		"WHERE n.nspname = 'pgfs'");
   1191 	/*
   1192 	 * the following is not correct if someone else is using large objects
   1193 	 * in the same database.  we don't bother to join with datafork it as
   1194 	 * it's too expensive for the little benefit.
   1195 	 */
   1196 	CREATECMD_NOPARAM(c_lobytes,
   1197 		"SELECT pg_total_relation_size('pg_largeobject')::int8");
   1198 	error = sendcmd(xc, c_nfiles);
   1199 	if (error != 0) {
   1200 		goto got_error;
   1201 	}
   1202 	fetchinit(&s, xc);
   1203 	error = FETCHNEXT(&s, types, &nfiles);
   1204 	fetchdone(&s);
   1205 	if (error != 0) {
   1206 		goto got_error;
   1207 	}
   1208 	error = sendcmd(xc, c_bytes);
   1209 	if (error != 0) {
   1210 		goto got_error;
   1211 	}
   1212 	fetchinit(&s, xc);
   1213 	error = FETCHNEXT(&s, types, &bytes);
   1214 	fetchdone(&s);
   1215 	if (error != 0) {
   1216 		goto got_error;
   1217 	}
   1218 	error = sendcmd(xc, c_lobytes);
   1219 	if (error != 0) {
   1220 		goto got_error;
   1221 	}
   1222 	fetchinit(&s, xc);
   1223 	error = FETCHNEXT(&s, types, &lo_bytes);
   1224 	fetchdone(&s);
   1225 	if (error != 0) {
   1226 		goto got_error;
   1227 	}
   1228 	error = commit(xc);
   1229 	if (error != 0) {
   1230 		goto got_error;
   1231 	}
   1232 	/*
   1233 	 * XXX fill f_blocks and f_files with meaningless large values.
   1234 	 * there are no easy way to provide meaningful values for them
   1235 	 * esp. with tablespaces.
   1236 	 */
   1237 	sbp->f_bsize = LOBLKSIZE;
   1238 	sbp->f_frsize = LOBLKSIZE;
   1239 	sbp->f_blocks = INT64_MAX / 100 / sbp->f_frsize;
   1240 	sbp->f_bfree = sbp->f_blocks - howmany(bytes + lo_bytes, sbp->f_frsize);
   1241 	sbp->f_bavail = sbp->f_bfree;
   1242 	sbp->f_bresvd = 0;
   1243 	sbp->f_files = INT_MAX;
   1244 	sbp->f_ffree = sbp->f_files - nfiles;
   1245 	sbp->f_favail = sbp->f_ffree;
   1246 	sbp->f_fresvd = 0;
   1247 	return 0;
   1248 got_error:
   1249 	rollback(xc);
   1250 	if (error == EAGAIN) {
   1251 		goto retry;
   1252 	}
   1253 	return error;
   1254 }
   1255