Home | History | Annotate | Line # | Download | only in pgfs
      1  1.5  yamt /*	$NetBSD: pgfs_subs.c,v 1.5 2012/04/11 14:28:18 yamt Exp $	*/
      2  1.1  yamt 
      3  1.1  yamt /*-
      4  1.1  yamt  * Copyright (c)2010,2011 YAMAMOTO Takashi,
      5  1.1  yamt  * All rights reserved.
      6  1.1  yamt  *
      7  1.1  yamt  * Redistribution and use in source and binary forms, with or without
      8  1.1  yamt  * modification, are permitted provided that the following conditions
      9  1.1  yamt  * are met:
     10  1.1  yamt  * 1. Redistributions of source code must retain the above copyright
     11  1.1  yamt  *    notice, this list of conditions and the following disclaimer.
     12  1.1  yamt  * 2. Redistributions in binary form must reproduce the above copyright
     13  1.1  yamt  *    notice, this list of conditions and the following disclaimer in the
     14  1.1  yamt  *    documentation and/or other materials provided with the distribution.
     15  1.1  yamt  *
     16  1.1  yamt  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
     17  1.1  yamt  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     18  1.1  yamt  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     19  1.1  yamt  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     20  1.1  yamt  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     21  1.1  yamt  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
     22  1.1  yamt  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     23  1.1  yamt  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     24  1.1  yamt  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     25  1.1  yamt  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     26  1.1  yamt  * SUCH DAMAGE.
     27  1.1  yamt  */
     28  1.1  yamt 
     29  1.1  yamt /*
     30  1.1  yamt  * a file system server which stores the data in a PostgreSQL database.
     31  1.1  yamt  */
     32  1.1  yamt 
     33  1.1  yamt /*
     34  1.1  yamt  * we use large objects to store file contents.  there are a few XXXs wrt it.
     35  1.1  yamt  *
     36  1.1  yamt  * - large objects don't obey the normal transaction semantics.
     37  1.1  yamt  *
     38  1.1  yamt  * - we use large object server-side functions directly (instead of via the
     39  1.1  yamt  *   libpq large object api) because:
     40  1.1  yamt  *	- we want to use asynchronous (in the sense of PQsendFoo) operations
     41  1.1  yamt  *	  which is not available with the libpq large object api.
     42  1.1  yamt  *	- with the libpq large object api, there's no way to know details of
     43  1.1  yamt  *	  an error because PGresult is freed in the library without saving
     44  1.1  yamt  *	  PG_DIAG_SQLSTATE etc.
     45  1.1  yamt  */
     46  1.1  yamt 
     47  1.1  yamt #include <sys/cdefs.h>
     48  1.1  yamt #ifndef lint
     49  1.5  yamt __RCSID("$NetBSD: pgfs_subs.c,v 1.5 2012/04/11 14:28:18 yamt Exp $");
     50  1.1  yamt #endif /* not lint */
     51  1.1  yamt 
     52  1.1  yamt #include <assert.h>
     53  1.1  yamt #include <err.h>
     54  1.1  yamt #include <errno.h>
     55  1.1  yamt #include <puffs.h>
     56  1.1  yamt #include <inttypes.h>
     57  1.1  yamt #include <stdarg.h>
     58  1.1  yamt #include <stdbool.h>
     59  1.1  yamt #include <stdio.h>
     60  1.1  yamt #include <stdlib.h>
     61  1.1  yamt #include <time.h>
     62  1.1  yamt #include <util.h>
     63  1.1  yamt 
     64  1.1  yamt #include <libpq-fe.h>
     65  1.1  yamt #include <libpq/libpq-fs.h>	/* INV_* */
     66  1.1  yamt 
     67  1.1  yamt #include "pgfs.h"
     68  1.1  yamt #include "pgfs_db.h"
     69  1.1  yamt #include "pgfs_debug.h"
     70  1.1  yamt #include "pgfs_waitq.h"
     71  1.1  yamt #include "pgfs_subs.h"
     72  1.1  yamt 
     73  1.1  yamt const char * const vtype_table[] = {
     74  1.1  yamt 	[VREG] = "regular",
     75  1.1  yamt 	[VDIR] = "directory",
     76  1.1  yamt 	[VLNK] = "link",
     77  1.1  yamt };
     78  1.1  yamt 
     79  1.1  yamt static unsigned int
     80  1.1  yamt tovtype(const char *type)
     81  1.1  yamt {
     82  1.1  yamt 	unsigned int i;
     83  1.1  yamt 
     84  1.1  yamt 	for (i = 0; i < __arraycount(vtype_table); i++) {
     85  1.1  yamt 		if (vtype_table[i] == NULL) {
     86  1.1  yamt 			continue;
     87  1.1  yamt 		}
     88  1.1  yamt 		if (!strcmp(type, vtype_table[i])) {
     89  1.1  yamt 			return i;
     90  1.1  yamt 		}
     91  1.1  yamt 	}
     92  1.1  yamt 	assert(0);
     93  1.1  yamt 	return 0;
     94  1.1  yamt }
     95  1.1  yamt 
     96  1.1  yamt static const char *
     97  1.1  yamt fromvtype(enum vtype vtype)
     98  1.1  yamt {
     99  1.1  yamt 
    100  1.1  yamt 	if (vtype < __arraycount(vtype_table)) {
    101  1.1  yamt 		assert(vtype_table[vtype] != NULL);
    102  1.1  yamt 		return vtype_table[vtype];
    103  1.1  yamt 	}
    104  1.1  yamt 	return NULL;
    105  1.1  yamt }
    106  1.1  yamt 
    107  1.1  yamt /*
    108  1.1  yamt  * fileid_lock stuff below is to keep ordering of operations for a file.
    109  1.1  yamt  * it is a workaround for the lack of operation barriers in the puffs
    110  1.1  yamt  * protocol.
    111  1.1  yamt  *
    112  1.1  yamt  * currently we do this locking only for SETATTR, GETATTR, and WRITE as
    113  1.1  yamt  * they are known to be reorder-unsafe.  they are sensitive to the file
    114  1.1  yamt  * attributes, mainly the file size.  note that as the kernel issues async
    115  1.1  yamt  * SETATTR/WRITE requests, vnode lock doesn't prevent GETATTR from seeing
    116  1.1  yamt  * the stale attributes.
    117  1.1  yamt  *
    118  1.1  yamt  * we are relying on waiton/wakeup being a FIFO.
    119  1.1  yamt  */
    120  1.1  yamt 
    121  1.1  yamt struct fileid_lock_handle {
    122  1.1  yamt 	TAILQ_ENTRY(fileid_lock_handle) list;
    123  1.1  yamt 	fileid_t fileid;
    124  1.1  yamt 	struct puffs_cc *owner;	/* diagnostic only */
    125  1.1  yamt 	struct waitq waitq;
    126  1.1  yamt };
    127  1.1  yamt 
    128  1.1  yamt TAILQ_HEAD(, fileid_lock_handle) fileid_lock_list =
    129  1.1  yamt     TAILQ_HEAD_INITIALIZER(fileid_lock_list);
    130  1.1  yamt struct waitq fileid_lock_waitq = TAILQ_HEAD_INITIALIZER(fileid_lock_waitq);
    131  1.1  yamt 
    132  1.1  yamt /*
    133  1.1  yamt  * fileid_lock: serialize requests for the fileid.
    134  1.1  yamt  *
    135  1.1  yamt  * this function should be the first yieldable point in a puffs callback.
    136  1.1  yamt  */
    137  1.1  yamt 
    138  1.1  yamt struct fileid_lock_handle *
    139  1.1  yamt fileid_lock(fileid_t fileid, struct puffs_cc *cc)
    140  1.1  yamt {
    141  1.1  yamt 	struct fileid_lock_handle *lock;
    142  1.1  yamt 
    143  1.1  yamt 	TAILQ_FOREACH(lock, &fileid_lock_list, list) {
    144  1.1  yamt 		if (lock->fileid == fileid) {
    145  1.1  yamt 			DPRINTF("fileid wait %" PRIu64 " cc %p\n", fileid, cc);
    146  1.1  yamt 			assert(lock->owner != cc);
    147  1.1  yamt 			waiton(&lock->waitq, cc);	/* enter FIFO */
    148  1.1  yamt 			assert(lock->owner == cc);
    149  1.1  yamt 			return lock;
    150  1.1  yamt 		}
    151  1.1  yamt 	}
    152  1.1  yamt 	lock = emalloc(sizeof(*lock));
    153  1.1  yamt 	lock->fileid = fileid;
    154  1.1  yamt 	lock->owner = cc;
    155  1.1  yamt 	DPRINTF("fileid lock %" PRIu64 " cc %p\n", lock->fileid, cc);
    156  1.1  yamt 	waitq_init(&lock->waitq);
    157  1.1  yamt 	TAILQ_INSERT_HEAD(&fileid_lock_list, lock, list);
    158  1.1  yamt 	return lock;
    159  1.1  yamt }
    160  1.1  yamt 
    161  1.1  yamt void
    162  1.1  yamt fileid_unlock(struct fileid_lock_handle *lock)
    163  1.1  yamt {
    164  1.1  yamt 
    165  1.1  yamt 	DPRINTF("fileid unlock %" PRIu64 "\n", lock->fileid);
    166  1.1  yamt 	assert(lock != NULL);
    167  1.1  yamt 	assert(lock->owner != NULL);
    168  1.1  yamt 	/*
    169  1.1  yamt 	 * perform direct-handoff to the first waiter.
    170  1.1  yamt 	 *
    171  1.1  yamt 	 * a handoff is essential to keep the order of requests.
    172  1.1  yamt 	 */
    173  1.1  yamt 	lock->owner = wakeup_one(&lock->waitq);
    174  1.1  yamt 	if (lock->owner != NULL) {
    175  1.1  yamt 		return;
    176  1.1  yamt 	}
    177  1.1  yamt 	/*
    178  1.1  yamt 	 * no one is waiting this fileid.
    179  1.1  yamt 	 */
    180  1.1  yamt 	TAILQ_REMOVE(&fileid_lock_list, lock, list);
    181  1.1  yamt 	free(lock);
    182  1.1  yamt }
    183  1.1  yamt 
    184  1.1  yamt /*
    185  1.1  yamt  * timespec_to_pgtimestamp: create a text representation of timestamp which
    186  1.1  yamt  * can be recognized by the database server.
    187  1.1  yamt  *
    188  1.1  yamt  * it's caller's responsibility to free(3) the result.
    189  1.1  yamt  */
    190  1.1  yamt 
    191  1.1  yamt int
    192  1.1  yamt timespec_to_pgtimestamp(const struct timespec *tv, char **resultp)
    193  1.1  yamt {
    194  1.1  yamt 	/*
    195  1.1  yamt 	 * XXX is there any smarter way?
    196  1.1  yamt 	 */
    197  1.1  yamt 	char buf1[1024];
    198  1.1  yamt 	char buf2[1024];
    199  1.1  yamt 	struct tm tm_store;
    200  1.1  yamt 	struct tm *tm;
    201  1.1  yamt 
    202  1.1  yamt 	tm = gmtime_r(&tv->tv_sec, &tm_store);
    203  1.1  yamt 	if (tm == NULL) {
    204  1.1  yamt 		assert(errno != 0);
    205  1.1  yamt 		return errno;
    206  1.1  yamt 	}
    207  1.1  yamt 	strftime(buf1, sizeof(buf1), "%Y%m%dT%H%M%S", tm);
    208  1.1  yamt 	snprintf(buf2, sizeof(buf2), "%s.%ju", buf1,
    209  1.1  yamt 	    (uintmax_t)tv->tv_nsec / 1000);
    210  1.1  yamt 	*resultp = estrdup(buf2);
    211  1.1  yamt 	return 0;
    212  1.1  yamt }
    213  1.1  yamt 
    214  1.1  yamt int
    215  1.1  yamt my_lo_truncate(struct Xconn *xc, int32_t fd, int32_t size)
    216  1.1  yamt {
    217  1.1  yamt 	static struct cmd *c;
    218  1.1  yamt 	int32_t ret;
    219  1.1  yamt 	int error;
    220  1.1  yamt 
    221  1.1  yamt 	CREATECMD(c, "SELECT lo_truncate($1, $2)", INT4OID, INT4OID);
    222  1.1  yamt 	error = sendcmd(xc, c, fd, size);
    223  1.1  yamt 	if (error != 0) {
    224  1.1  yamt 		return error;
    225  1.1  yamt 	}
    226  1.1  yamt 	error = simplefetch(xc, INT4OID, &ret);
    227  1.1  yamt 	if (error != 0) {
    228  1.1  yamt 		if (error == EEXIST) {
    229  1.1  yamt 			/*
    230  1.1  yamt 			 * probably the insertion of the new-sized page
    231  1.1  yamt 			 * caused a duplicated key error.  retry.
    232  1.1  yamt 			 */
    233  1.1  yamt 			DPRINTF("map EEXIST to EAGAIN\n");
    234  1.1  yamt 			error = EAGAIN;
    235  1.1  yamt 		}
    236  1.1  yamt 		return error;
    237  1.1  yamt 	}
    238  1.1  yamt 	assert(ret == 0);
    239  1.1  yamt 	return 0;
    240  1.1  yamt }
    241  1.1  yamt 
    242  1.1  yamt int
    243  1.1  yamt my_lo_lseek(struct Xconn *xc, int32_t fd, int32_t offset, int32_t whence,
    244  1.1  yamt     int32_t *retp)
    245  1.1  yamt {
    246  1.1  yamt 	static struct cmd *c;
    247  1.1  yamt 	int32_t ret;
    248  1.1  yamt 	int error;
    249  1.1  yamt 
    250  1.1  yamt 	CREATECMD(c, "SELECT lo_lseek($1, $2, $3)", INT4OID, INT4OID, INT4OID);
    251  1.1  yamt 	error = sendcmd(xc, c, fd, offset, whence);
    252  1.1  yamt 	if (error != 0) {
    253  1.1  yamt 		return error;
    254  1.1  yamt 	}
    255  1.1  yamt 	error = simplefetch(xc, INT4OID, &ret);
    256  1.1  yamt 	if (error != 0) {
    257  1.1  yamt 		return error;
    258  1.1  yamt 	}
    259  1.1  yamt 	if (retp != NULL) {
    260  1.1  yamt 		*retp = ret;
    261  1.1  yamt 	}
    262  1.1  yamt 	return 0;
    263  1.1  yamt }
    264  1.1  yamt 
    265  1.1  yamt int
    266  1.1  yamt my_lo_read(struct Xconn *xc, int32_t fd, void *buf, size_t size,
    267  1.1  yamt     size_t *resultsizep)
    268  1.1  yamt {
    269  1.1  yamt 	static struct cmd *c;
    270  1.1  yamt 	size_t resultsize;
    271  1.1  yamt 	int error;
    272  1.1  yamt 
    273  1.1  yamt 	CREATECMD(c, "SELECT loread($1, $2)", INT4OID, INT4OID);
    274  1.1  yamt 	error = sendcmdx(xc, 1, c, fd, (int32_t)size);
    275  1.1  yamt 	if (error != 0) {
    276  1.1  yamt 		return error;
    277  1.1  yamt 	}
    278  1.1  yamt 	error = simplefetch(xc, BYTEA, buf, &resultsize);
    279  1.1  yamt 	if (error != 0) {
    280  1.1  yamt 		return error;
    281  1.1  yamt 	}
    282  1.1  yamt 	*resultsizep = resultsize;
    283  1.1  yamt 	if (size != resultsize) {
    284  1.1  yamt 		DPRINTF("shortread? %zu != %zu\n", size, resultsize);
    285  1.1  yamt 	}
    286  1.1  yamt 	return 0;
    287  1.1  yamt }
    288  1.1  yamt 
    289  1.1  yamt int
    290  1.1  yamt my_lo_write(struct Xconn *xc, int32_t fd, const void *buf, size_t size,
    291  1.1  yamt     size_t *resultsizep)
    292  1.1  yamt {
    293  1.1  yamt 	static struct cmd *c;
    294  1.1  yamt 	int32_t resultsize;
    295  1.1  yamt 	int error;
    296  1.1  yamt 
    297  1.1  yamt 	CREATECMD(c, "SELECT lowrite($1, $2)", INT4OID, BYTEA);
    298  1.1  yamt 	error = sendcmd(xc, c, fd, buf, (int32_t)size);
    299  1.1  yamt 	if (error != 0) {
    300  1.1  yamt 		return error;
    301  1.1  yamt 	}
    302  1.1  yamt 	error = simplefetch(xc, INT4OID, &resultsize);
    303  1.1  yamt 	if (error != 0) {
    304  1.1  yamt 		if (error == EEXIST) {
    305  1.1  yamt 			/*
    306  1.1  yamt 			 * probably the insertion of the new data page
    307  1.1  yamt 			 * caused a duplicated key error.  retry.
    308  1.1  yamt 			 */
    309  1.1  yamt 			DPRINTF("map EEXIST to EAGAIN\n");
    310  1.1  yamt 			error = EAGAIN;
    311  1.1  yamt 		}
    312  1.1  yamt 		return error;
    313  1.1  yamt 	}
    314  1.1  yamt 	*resultsizep = resultsize;
    315  1.1  yamt 	if (size != (size_t)resultsize) {
    316  1.1  yamt 		DPRINTF("shortwrite? %zu != %zu\n", size, (size_t)resultsize);
    317  1.1  yamt 	}
    318  1.1  yamt 	return 0;
    319  1.1  yamt }
    320  1.1  yamt 
    321  1.1  yamt int
    322  1.1  yamt my_lo_open(struct Xconn *xc, Oid loid, int32_t mode, int32_t *fdp)
    323  1.1  yamt {
    324  1.1  yamt 	static struct cmd *c;
    325  1.1  yamt 	int error;
    326  1.1  yamt 
    327  1.1  yamt 	CREATECMD(c, "SELECT lo_open($1, $2)", OIDOID, INT4OID);
    328  1.1  yamt 	error = sendcmd(xc, c, loid, mode);
    329  1.1  yamt 	if (error != 0) {
    330  1.1  yamt 		return error;
    331  1.1  yamt 	}
    332  1.1  yamt 	return simplefetch(xc, INT4OID, fdp);
    333  1.1  yamt }
    334  1.1  yamt 
    335  1.1  yamt int
    336  1.1  yamt my_lo_close(struct Xconn *xc, int32_t fd)
    337  1.1  yamt {
    338  1.5  yamt #if 1
    339  1.5  yamt 	/*
    340  1.5  yamt 	 * do nothing.
    341  1.5  yamt 	 *
    342  1.5  yamt 	 * LO handles are automatically closed at the end of transactions.
    343  1.5  yamt 	 * our transactions are small enough.
    344  1.5  yamt 	 */
    345  1.5  yamt #else
    346  1.1  yamt 	static struct cmd *c;
    347  1.1  yamt 	int32_t ret;
    348  1.1  yamt 	int error;
    349  1.1  yamt 
    350  1.1  yamt 	CREATECMD(c, "SELECT lo_close($1)", INT4OID);
    351  1.1  yamt 	error = sendcmd(xc, c, fd);
    352  1.1  yamt 	if (error != 0) {
    353  1.1  yamt 		return error;
    354  1.1  yamt 	}
    355  1.1  yamt 	error = simplefetch(xc, INT4OID, &ret);
    356  1.1  yamt 	if (error != 0) {
    357  1.1  yamt 		return error;
    358  1.1  yamt 	}
    359  1.1  yamt 	assert(ret == 0);
    360  1.5  yamt #endif
    361  1.1  yamt 	return 0;
    362  1.1  yamt }
    363  1.1  yamt 
    364  1.1  yamt static int
    365  1.1  yamt lo_lookup_by_fileid(struct Xconn *xc, fileid_t fileid, Oid *idp)
    366  1.1  yamt {
    367  1.1  yamt 	static struct cmd *c;
    368  1.1  yamt 	static const Oid types[] = { OIDOID, };
    369  1.1  yamt 	struct fetchstatus s;
    370  1.1  yamt 	int error;
    371  1.1  yamt 
    372  1.1  yamt 	CREATECMD(c, "SELECT loid FROM datafork WHERE fileid = $1", INT8OID);
    373  1.1  yamt 	error = sendcmd(xc, c, fileid);
    374  1.1  yamt 	if (error != 0) {
    375  1.1  yamt 		return error;
    376  1.1  yamt 	}
    377  1.1  yamt 	fetchinit(&s, xc);
    378  1.1  yamt 	error = FETCHNEXT(&s, types, idp);
    379  1.1  yamt 	fetchdone(&s);
    380  1.1  yamt 	DPRINTF("error %d\n", error);
    381  1.1  yamt 	return error;
    382  1.1  yamt }
    383  1.1  yamt 
    384  1.1  yamt int
    385  1.1  yamt lo_open_by_fileid(struct Xconn *xc, fileid_t fileid, int mode, int *fdp)
    386  1.1  yamt {
    387  1.1  yamt 	Oid loid;
    388  1.1  yamt 	int fd;
    389  1.1  yamt 	int error;
    390  1.1  yamt 
    391  1.1  yamt 	error = lo_lookup_by_fileid(xc, fileid, &loid);
    392  1.1  yamt 	if (error != 0) {
    393  1.1  yamt 		return error;
    394  1.1  yamt 	}
    395  1.1  yamt 	error = my_lo_open(xc, loid, mode, &fd);
    396  1.1  yamt 	if (error != 0) {
    397  1.1  yamt 		return error;
    398  1.1  yamt 	}
    399  1.1  yamt 	*fdp = fd;
    400  1.1  yamt 	return 0;
    401  1.1  yamt }
    402  1.1  yamt 
    403  1.1  yamt static int
    404  1.1  yamt getsize(struct Xconn *xc, fileid_t fileid, int *resultp)
    405  1.1  yamt {
    406  1.1  yamt 	int32_t size;
    407  1.1  yamt 	int fd;
    408  1.1  yamt 	int error;
    409  1.1  yamt 
    410  1.1  yamt 	error = lo_open_by_fileid(xc, fileid, INV_READ, &fd);
    411  1.1  yamt 	if (error != 0) {
    412  1.1  yamt 		return error;
    413  1.1  yamt 	}
    414  1.1  yamt 	error = my_lo_lseek(xc, fd, 0, SEEK_END, &size);
    415  1.1  yamt 	if (error != 0) {
    416  1.1  yamt 		return error;
    417  1.1  yamt 	}
    418  1.1  yamt 	error = my_lo_close(xc, fd);
    419  1.1  yamt 	if (error != 0) {
    420  1.1  yamt 		return error;
    421  1.1  yamt 	}
    422  1.1  yamt 	*resultp = size;
    423  1.1  yamt 	return 0;
    424  1.1  yamt }
    425  1.1  yamt 
    426  1.1  yamt #define	GETATTR_TYPE	0x00000001
    427  1.1  yamt #define	GETATTR_NLINK	0x00000002
    428  1.1  yamt #define	GETATTR_SIZE	0x00000004
    429  1.1  yamt #define	GETATTR_MODE	0x00000008
    430  1.1  yamt #define	GETATTR_UID	0x00000010
    431  1.1  yamt #define	GETATTR_GID	0x00000020
    432  1.1  yamt #define	GETATTR_TIME	0x00000040
    433  1.1  yamt #define	GETATTR_ALL	\
    434  1.1  yamt 	(GETATTR_TYPE|GETATTR_NLINK|GETATTR_SIZE|GETATTR_MODE| \
    435  1.1  yamt 	GETATTR_UID|GETATTR_GID|GETATTR_TIME)
    436  1.1  yamt 
    437  1.1  yamt int
    438  1.1  yamt getattr(struct Xconn *xc, fileid_t fileid, struct vattr *va, unsigned int mask)
    439  1.1  yamt {
    440  1.1  yamt 	char *type;
    441  1.1  yamt 	long long atime_s;
    442  1.1  yamt 	long long atime_us;
    443  1.1  yamt 	long long ctime_s;
    444  1.1  yamt 	long long ctime_us;
    445  1.1  yamt 	long long mtime_s;
    446  1.1  yamt 	long long mtime_us;
    447  1.1  yamt 	long long btime_s;
    448  1.1  yamt 	long long btime_us;
    449  1.1  yamt 	uint64_t mode;
    450  1.1  yamt 	long long uid;
    451  1.1  yamt 	long long gid;
    452  1.1  yamt 	long long nlink;
    453  1.1  yamt 	long long rev;
    454  1.1  yamt 	struct fetchstatus s;
    455  1.1  yamt 	int error;
    456  1.1  yamt 
    457  1.1  yamt 	if (mask == 0) {
    458  1.1  yamt 		return 0;
    459  1.1  yamt 	}
    460  1.1  yamt 	/*
    461  1.1  yamt 	 * unless explicitly requested, avoid fetching timestamps as they
    462  1.1  yamt 	 * are a little more expensive than other simple attributes.
    463  1.1  yamt 	 */
    464  1.1  yamt 	if ((mask & GETATTR_TIME) != 0) {
    465  1.1  yamt 		static struct cmd *c;
    466  1.1  yamt 		static const Oid types[] = {
    467  1.1  yamt 			TEXTOID,
    468  1.1  yamt 			INT8OID,
    469  1.1  yamt 			INT8OID,
    470  1.1  yamt 			INT8OID,
    471  1.1  yamt 			INT8OID,
    472  1.1  yamt 			INT8OID,
    473  1.1  yamt 			INT8OID,
    474  1.1  yamt 			INT8OID,
    475  1.1  yamt 			INT8OID,
    476  1.1  yamt 			INT8OID,
    477  1.1  yamt 			INT8OID,
    478  1.1  yamt 			INT8OID,
    479  1.1  yamt 			INT8OID,
    480  1.1  yamt 			INT8OID,
    481  1.1  yamt 		};
    482  1.1  yamt 
    483  1.1  yamt 		CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev, "
    484  1.1  yamt 		    "extract(epoch from date_trunc('second', atime))::int8, "
    485  1.1  yamt 		    "extract(microseconds from atime)::int8, "
    486  1.1  yamt 		    "extract(epoch from date_trunc('second', ctime))::int8, "
    487  1.1  yamt 		    "extract(microseconds from ctime)::int8, "
    488  1.1  yamt 		    "extract(epoch from date_trunc('second', mtime))::int8, "
    489  1.1  yamt 		    "extract(microseconds from mtime)::int8, "
    490  1.1  yamt 		    "extract(epoch from date_trunc('second', btime))::int8, "
    491  1.1  yamt 		    "extract(microseconds from btime)::int8 "
    492  1.1  yamt 		    "FROM file "
    493  1.1  yamt 		    "WHERE fileid = $1", INT8OID);
    494  1.1  yamt 		error = sendcmd(xc, c, fileid);
    495  1.1  yamt 		if (error != 0) {
    496  1.1  yamt 			return error;
    497  1.1  yamt 		}
    498  1.1  yamt 		fetchinit(&s, xc);
    499  1.1  yamt 		error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
    500  1.1  yamt 		    &rev,
    501  1.1  yamt 		    &atime_s, &atime_us,
    502  1.1  yamt 		    &ctime_s, &ctime_us,
    503  1.1  yamt 		    &mtime_s, &mtime_us,
    504  1.1  yamt 		    &btime_s, &btime_us);
    505  1.1  yamt 	} else {
    506  1.1  yamt 		static struct cmd *c;
    507  1.1  yamt 		static const Oid types[] = {
    508  1.1  yamt 			TEXTOID,
    509  1.1  yamt 			INT8OID,
    510  1.1  yamt 			INT8OID,
    511  1.1  yamt 			INT8OID,
    512  1.1  yamt 			INT8OID,
    513  1.1  yamt 			INT8OID,
    514  1.1  yamt 		};
    515  1.1  yamt 
    516  1.1  yamt 		CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev "
    517  1.1  yamt 		    "FROM file "
    518  1.1  yamt 		    "WHERE fileid = $1", INT8OID);
    519  1.1  yamt 		error = sendcmd(xc, c, fileid);
    520  1.1  yamt 		if (error != 0) {
    521  1.1  yamt 			return error;
    522  1.1  yamt 		}
    523  1.1  yamt 		fetchinit(&s, xc);
    524  1.1  yamt 		error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
    525  1.1  yamt 		    &rev);
    526  1.1  yamt 	}
    527  1.1  yamt 	fetchdone(&s);
    528  1.1  yamt 	if (error != 0) {
    529  1.1  yamt 		return error;
    530  1.1  yamt 	}
    531  1.1  yamt 	memset(va, 0xaa, sizeof(*va)); /* fill with garbage for debug */
    532  1.1  yamt 	va->va_type = tovtype(type);
    533  1.1  yamt 	free(type);
    534  1.1  yamt 	va->va_mode = mode;
    535  1.1  yamt 	va->va_uid = uid;
    536  1.1  yamt 	va->va_gid = gid;
    537  1.1  yamt 	if (nlink > 0 && va->va_type == VDIR) {
    538  1.1  yamt 		nlink++; /* "." */
    539  1.1  yamt 	}
    540  1.1  yamt 	va->va_nlink = nlink;
    541  1.1  yamt 	va->va_fileid = fileid;
    542  1.1  yamt 	va->va_atime.tv_sec = atime_s;
    543  1.1  yamt 	va->va_atime.tv_nsec = atime_us * 1000;
    544  1.1  yamt 	va->va_ctime.tv_sec = ctime_s;
    545  1.1  yamt 	va->va_ctime.tv_nsec = ctime_us * 1000;
    546  1.1  yamt 	va->va_mtime.tv_sec = mtime_s;
    547  1.1  yamt 	va->va_mtime.tv_nsec = mtime_us * 1000;
    548  1.1  yamt 	va->va_birthtime.tv_sec = btime_s;
    549  1.1  yamt 	va->va_birthtime.tv_nsec = btime_us * 1000;
    550  1.1  yamt 	va->va_blocksize = LOBLKSIZE;
    551  1.1  yamt 	va->va_gen = 1;
    552  1.1  yamt 	va->va_filerev = rev;
    553  1.1  yamt 	if ((mask & GETATTR_SIZE) != 0) {
    554  1.1  yamt 		int size;
    555  1.1  yamt 
    556  1.1  yamt 		size = 0;
    557  1.1  yamt 		if (va->va_type == VREG || va->va_type == VLNK) {
    558  1.1  yamt 			error = getsize(xc, fileid, &size);
    559  1.1  yamt 			if (error != 0) {
    560  1.1  yamt 				return error;
    561  1.1  yamt 			}
    562  1.1  yamt 		} else if (va->va_type == VDIR) {
    563  1.1  yamt 			size = 100; /* XXX */
    564  1.1  yamt 		}
    565  1.1  yamt 		va->va_size = size;
    566  1.1  yamt 	}
    567  1.1  yamt 	/*
    568  1.1  yamt 	 * XXX va_bytes: likely wrong due to toast compression.
    569  1.1  yamt 	 * there's no cheap way to get the compressed size of LO.
    570  1.1  yamt 	 */
    571  1.1  yamt 	va->va_bytes = va->va_size;
    572  1.1  yamt 	va->va_flags = 0;
    573  1.1  yamt 	return 0;
    574  1.1  yamt }
    575  1.1  yamt 
    576  1.1  yamt int
    577  1.1  yamt update_mctime(struct Xconn *xc, fileid_t fileid)
    578  1.1  yamt {
    579  1.1  yamt 	static struct cmd *c;
    580  1.1  yamt 
    581  1.1  yamt 	CREATECMD(c,
    582  1.1  yamt 	    "UPDATE file "
    583  1.1  yamt 	    "SET mtime = current_timestamp, ctime = current_timestamp, "
    584  1.1  yamt 		"rev = rev + 1 "
    585  1.1  yamt 	    "WHERE fileid = $1", INT8OID);
    586  1.1  yamt 	return simplecmd(xc, c, fileid);
    587  1.1  yamt }
    588  1.1  yamt 
    589  1.1  yamt int
    590  1.1  yamt update_atime(struct Xconn *xc, fileid_t fileid)
    591  1.1  yamt {
    592  1.1  yamt 	static struct cmd *c;
    593  1.1  yamt 
    594  1.1  yamt 	CREATECMD(c,
    595  1.1  yamt 	    "UPDATE file SET atime = current_timestamp WHERE fileid = $1",
    596  1.1  yamt 	    INT8OID);
    597  1.1  yamt 	return simplecmd(xc, c, fileid);
    598  1.1  yamt }
    599  1.1  yamt 
    600  1.1  yamt int
    601  1.1  yamt update_mtime(struct Xconn *xc, fileid_t fileid)
    602  1.1  yamt {
    603  1.1  yamt 	static struct cmd *c;
    604  1.1  yamt 
    605  1.1  yamt 	CREATECMD(c,
    606  1.1  yamt 	    "UPDATE file "
    607  1.1  yamt 	    "SET mtime = current_timestamp, rev = rev + 1 "
    608  1.1  yamt 	    "WHERE fileid = $1", INT8OID);
    609  1.1  yamt 	return simplecmd(xc, c, fileid);
    610  1.1  yamt }
    611  1.1  yamt 
    612  1.1  yamt int
    613  1.1  yamt update_ctime(struct Xconn *xc, fileid_t fileid)
    614  1.1  yamt {
    615  1.1  yamt 	static struct cmd *c;
    616  1.1  yamt 
    617  1.1  yamt 	CREATECMD(c,
    618  1.1  yamt 	    "UPDATE file SET ctime = current_timestamp WHERE fileid = $1",
    619  1.1  yamt 	    INT8OID);
    620  1.1  yamt 	return simplecmd(xc, c, fileid);
    621  1.1  yamt }
    622  1.1  yamt 
    623  1.1  yamt int
    624  1.1  yamt update_nlink(struct Xconn *xc, fileid_t fileid, int delta)
    625  1.1  yamt {
    626  1.1  yamt 	static struct cmd *c;
    627  1.1  yamt 
    628  1.1  yamt 	CREATECMD(c,
    629  1.1  yamt 	    "UPDATE file "
    630  1.1  yamt 	    "SET nlink = nlink + $1 "
    631  1.1  yamt 	    "WHERE fileid = $2",
    632  1.1  yamt 	    INT8OID, INT8OID);
    633  1.1  yamt 	return simplecmd(xc, c, (int64_t)delta, fileid);
    634  1.1  yamt }
    635  1.1  yamt 
    636  1.1  yamt int
    637  1.1  yamt lookupp(struct Xconn *xc, fileid_t fileid, fileid_t *parent)
    638  1.1  yamt {
    639  1.1  yamt 	static struct cmd *c;
    640  1.1  yamt 	static const Oid types[] = { INT8OID, };
    641  1.1  yamt 	struct fetchstatus s;
    642  1.1  yamt 	int error;
    643  1.1  yamt 
    644  1.1  yamt 	CREATECMD(c, "SELECT parent_fileid FROM dirent "
    645  1.1  yamt 		"WHERE child_fileid = $1 LIMIT 1", INT8OID);
    646  1.1  yamt 	error = sendcmd(xc, c, fileid);
    647  1.1  yamt 	if (error != 0) {
    648  1.1  yamt 		return error;
    649  1.1  yamt 	}
    650  1.1  yamt 	fetchinit(&s, xc);
    651  1.1  yamt 	error = FETCHNEXT(&s, types, parent);
    652  1.1  yamt 	fetchdone(&s);
    653  1.1  yamt 	if (error != 0) {
    654  1.1  yamt 		return error;
    655  1.1  yamt 	}
    656  1.1  yamt 	return 0;
    657  1.1  yamt }
    658  1.1  yamt 
    659  1.1  yamt int
    660  1.1  yamt mkfile(struct Xconn *xc, enum vtype vtype, mode_t mode, uid_t uid, gid_t gid,
    661  1.1  yamt     fileid_t *idp)
    662  1.1  yamt {
    663  1.1  yamt 	static struct cmd *c;
    664  1.1  yamt 	const char *type;
    665  1.1  yamt 	int error;
    666  1.1  yamt 
    667  1.1  yamt 	type = fromvtype(vtype);
    668  1.1  yamt 	if (type == NULL) {
    669  1.1  yamt 		return EOPNOTSUPP;
    670  1.1  yamt 	}
    671  1.1  yamt 	CREATECMD(c,
    672  1.1  yamt 		"INSERT INTO file "
    673  1.1  yamt 		"(fileid, type, mode, uid, gid, nlink, rev, "
    674  1.1  yamt 		"atime, ctime, mtime, btime) "
    675  1.1  yamt 		"VALUES(nextval('fileid_seq'), $1::filetype, $2, $3, $4, 0, 0, "
    676  1.1  yamt 		"current_timestamp, "
    677  1.1  yamt 		"current_timestamp, "
    678  1.1  yamt 		"current_timestamp, "
    679  1.1  yamt 		"current_timestamp) "
    680  1.1  yamt 		"RETURNING fileid", TEXTOID, INT8OID, INT8OID, INT8OID);
    681  1.1  yamt 	error = sendcmd(xc, c, type, (uint64_t)mode, (uint64_t)uid,
    682  1.1  yamt 	    (uint64_t)gid);
    683  1.1  yamt 	if (error != 0) {
    684  1.1  yamt 		return error;
    685  1.1  yamt 	}
    686  1.1  yamt 	return simplefetch(xc, INT8OID, idp);
    687  1.1  yamt }
    688  1.1  yamt 
    689  1.1  yamt int
    690  1.1  yamt linkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
    691  1.1  yamt {
    692  1.1  yamt 	static struct cmd *c;
    693  1.1  yamt 	int error;
    694  1.1  yamt 
    695  1.1  yamt 	CREATECMD(c,
    696  1.1  yamt 		"INSERT INTO dirent "
    697  1.1  yamt 		"(parent_fileid, name, child_fileid) "
    698  1.1  yamt 		"VALUES($1, $2, $3)", INT8OID, TEXTOID, INT8OID);
    699  1.1  yamt 	error = simplecmd(xc, c, parent, name, child);
    700  1.1  yamt 	if (error != 0) {
    701  1.1  yamt 		return error;
    702  1.1  yamt 	}
    703  1.1  yamt 	error = update_nlink(xc, child, 1);
    704  1.1  yamt 	if (error != 0) {
    705  1.1  yamt 		return error;
    706  1.1  yamt 	}
    707  1.1  yamt 	return update_mtime(xc, parent);
    708  1.1  yamt }
    709  1.1  yamt 
    710  1.1  yamt int
    711  1.1  yamt unlinkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
    712  1.1  yamt {
    713  1.1  yamt 	static struct cmd *c;
    714  1.1  yamt 	int error;
    715  1.1  yamt 
    716  1.1  yamt 	/*
    717  1.1  yamt 	 * in addition to the primary key, we check child_fileid as well here
    718  1.1  yamt 	 * to avoid removing an entry which was appeared after our VOP_LOOKUP.
    719  1.1  yamt 	 */
    720  1.1  yamt 	CREATECMD(c,
    721  1.1  yamt 		"DELETE FROM dirent "
    722  1.1  yamt 		"WHERE parent_fileid = $1 AND name = $2 AND child_fileid = $3",
    723  1.1  yamt 		INT8OID, TEXTOID, INT8OID);
    724  1.1  yamt 	error = simplecmd(xc, c, parent, name, child);
    725  1.1  yamt 	if (error != 0) {
    726  1.1  yamt 		return error;
    727  1.1  yamt 	}
    728  1.1  yamt 	error = update_nlink(xc, child, -1);
    729  1.1  yamt 	if (error != 0) {
    730  1.1  yamt 		return error;
    731  1.1  yamt 	}
    732  1.1  yamt 	error = update_mtime(xc, parent);
    733  1.1  yamt 	if (error != 0) {
    734  1.1  yamt 		return error;
    735  1.1  yamt 	}
    736  1.1  yamt 	return update_ctime(xc, child);
    737  1.1  yamt }
    738  1.1  yamt 
    739  1.1  yamt int
    740  1.1  yamt mklinkfile(struct Xconn *xc, fileid_t parent, const char *name,
    741  1.1  yamt     enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *idp)
    742  1.1  yamt {
    743  1.1  yamt 	fileid_t fileid;
    744  1.1  yamt 	int error;
    745  1.1  yamt 
    746  1.1  yamt 	error = mkfile(xc, vtype, mode, uid, gid, &fileid);
    747  1.1  yamt 	if (error != 0) {
    748  1.1  yamt 		return error;
    749  1.1  yamt 	}
    750  1.1  yamt 	error = linkfile(xc, parent, name, fileid);
    751  1.1  yamt 	if (error != 0) {
    752  1.1  yamt 		return error;
    753  1.1  yamt 	}
    754  1.1  yamt 	if (idp != NULL) {
    755  1.1  yamt 		*idp = fileid;
    756  1.1  yamt 	}
    757  1.1  yamt 	return 0;
    758  1.1  yamt }
    759  1.1  yamt 
    760  1.1  yamt int
    761  1.1  yamt mklinkfile_lo(struct Xconn *xc, fileid_t parent_fileid, const char *name,
    762  1.1  yamt     enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *fileidp,
    763  1.1  yamt     int *loidp)
    764  1.1  yamt {
    765  1.1  yamt 	static struct cmd *c;
    766  1.1  yamt 	fileid_t new_fileid;
    767  1.1  yamt 	int loid;
    768  1.1  yamt 	int error;
    769  1.1  yamt 
    770  1.1  yamt 	error = mklinkfile(xc, parent_fileid, name, vtype, mode, uid, gid,
    771  1.1  yamt 	    &new_fileid);
    772  1.1  yamt 	if (error != 0) {
    773  1.1  yamt 		return error;
    774  1.1  yamt 	}
    775  1.1  yamt 	CREATECMD(c,
    776  1.1  yamt 		"INSERT INTO datafork (fileid, loid) "
    777  1.1  yamt 		"VALUES($1, lo_creat(-1)) "
    778  1.1  yamt 		"RETURNING loid", INT8OID);
    779  1.1  yamt 	error = sendcmd(xc, c, new_fileid);
    780  1.1  yamt 	if (error != 0) {
    781  1.1  yamt 		return error;
    782  1.1  yamt 	}
    783  1.1  yamt 	error = simplefetch(xc, OIDOID, &loid);
    784  1.1  yamt 	if (error != 0) {
    785  1.1  yamt 		return error;
    786  1.1  yamt 	}
    787  1.1  yamt 	if (fileidp != NULL) {
    788  1.1  yamt 		*fileidp = new_fileid;
    789  1.1  yamt 	}
    790  1.1  yamt 	if (loidp != NULL) {
    791  1.1  yamt 		*loidp = loid;
    792  1.1  yamt 	}
    793  1.1  yamt 	return 0;
    794  1.1  yamt }
    795  1.1  yamt 
    796  1.1  yamt int
    797  1.4  yamt cleanupfile(struct Xconn *xc, fileid_t fileid)
    798  1.1  yamt {
    799  1.1  yamt 	static struct cmd *c;
    800  1.4  yamt 	char *type;
    801  1.4  yamt 	unsigned int vtype;
    802  1.4  yamt 	int error;
    803  1.1  yamt 
    804  1.4  yamt 	CREATECMD(c, "DELETE FROM file WHERE fileid = $1 AND nlink = 0 "
    805  1.4  yamt 		"RETURNING type::text", INT8OID);
    806  1.4  yamt 	error = sendcmd(xc, c, fileid);
    807  1.4  yamt 	if (error != 0) {
    808  1.4  yamt 		return error;
    809  1.4  yamt 	}
    810  1.4  yamt 	error = simplefetch(xc, TEXTOID, &type);
    811  1.4  yamt 	if (error == ENOENT) {
    812  1.4  yamt 		return 0; /* probably nlink > 0 */
    813  1.4  yamt 	}
    814  1.4  yamt 	if (error != 0) {
    815  1.4  yamt 		return error;
    816  1.4  yamt 	}
    817  1.4  yamt 	vtype = tovtype(type);
    818  1.4  yamt 	free(type);
    819  1.4  yamt 	if (vtype == VREG || vtype == VLNK) {
    820  1.1  yamt 		static struct cmd *c_datafork;
    821  1.2  yamt 		int32_t ret;
    822  1.1  yamt 
    823  1.1  yamt 		CREATECMD(c_datafork,
    824  1.2  yamt 			"WITH loids AS (DELETE FROM datafork WHERE fileid = $1 "
    825  1.2  yamt 			"RETURNING loid) SELECT lo_unlink(loid) FROM loids",
    826  1.2  yamt 			INT8OID);
    827  1.2  yamt 		error = sendcmd(xc, c_datafork, fileid);
    828  1.1  yamt 		if (error != 0) {
    829  1.1  yamt 			return error;
    830  1.1  yamt 		}
    831  1.2  yamt 		error = simplefetch(xc, INT4OID, &ret);
    832  1.2  yamt 		if (error != 0) {
    833  1.2  yamt 			return error;
    834  1.2  yamt 		}
    835  1.2  yamt 		if (ret != 1) {
    836  1.2  yamt 			return EIO; /* lo_unlink failed */
    837  1.2  yamt 		}
    838  1.1  yamt 	}
    839  1.4  yamt 	return 0;
    840  1.1  yamt }
    841  1.1  yamt 
    842  1.1  yamt /*
    843  1.1  yamt  * check_path: do locking and check to prevent a rename from creating loop.
    844  1.1  yamt  *
    845  1.1  yamt  * lock the dirents between child_fileid and the root directory.
    846  1.1  yamt  * if gate_fileid is appeared in the path, return EINVAL.
    847  1.1  yamt  * caller should ensure that child_fileid is of VDIR beforehand.
    848  1.1  yamt  *
    849  1.1  yamt  * we uses FOR SHARE row level locks as poor man's predicate locks.
    850  1.1  yamt  *
    851  1.1  yamt  * the following is an example to show why we need to lock the path.
    852  1.1  yamt  *
    853  1.1  yamt  * consider:
    854  1.1  yamt  * "mkdir -p /a/b/c/d/e/f && mkdir -p /1/2/3/4/5/6"
    855  1.1  yamt  * and then
    856  1.1  yamt  * thread 1 is doing "mv /a/b /1/2/3/4/5/6"
    857  1.1  yamt  * thread 2 is doing "mv /1/2 /a/b/c/d/e/f"
    858  1.1  yamt  *
    859  1.1  yamt  * a possible consequence:
    860  1.1  yamt  *	thread 1: check_path -> success
    861  1.1  yamt  *	thread 2: check_path -> success
    862  1.1  yamt  *	thread 1: modify directories -> block on row-level lock
    863  1.1  yamt  *	thread 2: modify directories -> block on row-level lock
    864  1.1  yamt  *			-> deadlock detected
    865  1.1  yamt  *			-> rollback and retry
    866  1.1  yamt  *
    867  1.1  yamt  * another possible consequence:
    868  1.1  yamt  *	thread 1: check_path -> success
    869  1.1  yamt  *	thread 1: modify directory entries -> success
    870  1.1  yamt  *	thread 2: check_path -> block on row-level lock
    871  1.1  yamt  *	thread 1: commit
    872  1.1  yamt  *	thread 2: acquire the lock and notices the row is updated
    873  1.1  yamt  *			-> serialization error
    874  1.1  yamt  *			-> rollback and retry
    875  1.1  yamt  *
    876  1.1  yamt  * XXX it might be better to use real serializable transactions,
    877  1.1  yamt  * which will be available for PostgreSQL 9.1
    878  1.1  yamt  */
    879  1.1  yamt 
    880  1.1  yamt int
    881  1.1  yamt check_path(struct Xconn *xc, fileid_t gate_fileid, fileid_t child_fileid)
    882  1.1  yamt {
    883  1.1  yamt 	static struct cmd *c;
    884  1.1  yamt 	fileid_t parent_fileid;
    885  1.1  yamt 	struct fetchstatus s;
    886  1.1  yamt 	int error;
    887  1.1  yamt 
    888  1.1  yamt 	CREATECMD(c,
    889  1.1  yamt 		"WITH RECURSIVE r AS "
    890  1.1  yamt 		"( "
    891  1.1  yamt 				"SELECT parent_fileid, cookie, child_fileid "
    892  1.1  yamt 				"FROM dirent "
    893  1.1  yamt 				"WHERE child_fileid = $1 "
    894  1.1  yamt 			"UNION ALL "
    895  1.1  yamt 				"SELECT d.parent_fileid, d.cookie, "
    896  1.1  yamt 				"d.child_fileid "
    897  1.1  yamt 				"FROM dirent AS d INNER JOIN r "
    898  1.1  yamt 				"ON d.child_fileid = r.parent_fileid "
    899  1.1  yamt 		") "
    900  1.1  yamt 		"SELECT d.parent_fileid "
    901  1.1  yamt 		"FROM dirent d "
    902  1.1  yamt 		"JOIN r "
    903  1.1  yamt 		"ON d.cookie = r.cookie "
    904  1.1  yamt 		"FOR SHARE", INT8OID);
    905  1.1  yamt 	error = sendcmd(xc, c, child_fileid);
    906  1.1  yamt 	if (error != 0) {
    907  1.1  yamt 		return error;
    908  1.1  yamt 	}
    909  1.1  yamt 	fetchinit(&s, xc);
    910  1.1  yamt 	do {
    911  1.1  yamt 		static const Oid types[] = { INT8OID, };
    912  1.1  yamt 
    913  1.1  yamt 		error = FETCHNEXT(&s, types, &parent_fileid);
    914  1.1  yamt 		if (error == ENOENT) {
    915  1.1  yamt 			fetchdone(&s);
    916  1.1  yamt 			return 0;
    917  1.1  yamt 		}
    918  1.1  yamt 		if (error != 0) {
    919  1.1  yamt 			fetchdone(&s);
    920  1.1  yamt 			return error;
    921  1.1  yamt 		}
    922  1.1  yamt 	} while (gate_fileid != parent_fileid);
    923  1.1  yamt 	fetchdone(&s);
    924  1.1  yamt 	return EINVAL;
    925  1.1  yamt }
    926  1.1  yamt 
    927  1.1  yamt int
    928  1.1  yamt isempty(struct Xconn *xc, fileid_t fileid, bool *emptyp)
    929  1.1  yamt {
    930  1.3  yamt 	int32_t dummy;
    931  1.1  yamt 	static struct cmd *c;
    932  1.1  yamt 	int error;
    933  1.1  yamt 
    934  1.1  yamt 	CREATECMD(c,
    935  1.1  yamt 		"SELECT 1 FROM dirent "
    936  1.1  yamt 		"WHERE parent_fileid = $1 LIMIT 1", INT8OID);
    937  1.1  yamt 	error = sendcmd(xc, c, fileid);
    938  1.1  yamt 	if (error != 0) {
    939  1.1  yamt 		return error;
    940  1.1  yamt 	}
    941  1.3  yamt 	error = simplefetch(xc, INT4OID, &dummy);
    942  1.1  yamt 	assert(error != 0 || dummy == 1);
    943  1.1  yamt 	if (error == ENOENT) {
    944  1.1  yamt 		*emptyp = true;
    945  1.1  yamt 		error = 0;
    946  1.1  yamt 	} else {
    947  1.1  yamt 		*emptyp = false;
    948  1.1  yamt 	}
    949  1.1  yamt 	return error;
    950  1.1  yamt }
    951