Home | History | Annotate | Line # | Download | only in pgfs
pgfs_subs.c revision 1.4
      1  1.4  yamt /*	$NetBSD: pgfs_subs.c,v 1.4 2012/04/11 14:26:19 yamt Exp $	*/
      2  1.1  yamt 
      3  1.1  yamt /*-
      4  1.1  yamt  * Copyright (c)2010,2011 YAMAMOTO Takashi,
      5  1.1  yamt  * All rights reserved.
      6  1.1  yamt  *
      7  1.1  yamt  * Redistribution and use in source and binary forms, with or without
      8  1.1  yamt  * modification, are permitted provided that the following conditions
      9  1.1  yamt  * are met:
     10  1.1  yamt  * 1. Redistributions of source code must retain the above copyright
     11  1.1  yamt  *    notice, this list of conditions and the following disclaimer.
     12  1.1  yamt  * 2. Redistributions in binary form must reproduce the above copyright
     13  1.1  yamt  *    notice, this list of conditions and the following disclaimer in the
     14  1.1  yamt  *    documentation and/or other materials provided with the distribution.
     15  1.1  yamt  *
     16  1.1  yamt  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
     17  1.1  yamt  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     18  1.1  yamt  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     19  1.1  yamt  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     20  1.1  yamt  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     21  1.1  yamt  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
     22  1.1  yamt  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     23  1.1  yamt  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     24  1.1  yamt  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     25  1.1  yamt  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     26  1.1  yamt  * SUCH DAMAGE.
     27  1.1  yamt  */
     28  1.1  yamt 
     29  1.1  yamt /*
     30  1.1  yamt  * a file system server which stores the data in a PostgreSQL database.
     31  1.1  yamt  */
     32  1.1  yamt 
     33  1.1  yamt /*
     34  1.1  yamt  * we use large objects to store file contents.  there are a few XXXs wrt it.
     35  1.1  yamt  *
     36  1.1  yamt  * - large objects don't obey the normal transaction semantics.
     37  1.1  yamt  *
     38  1.1  yamt  * - we use large object server-side functions directly (instead of via the
     39  1.1  yamt  *   libpq large object api) because:
     40  1.1  yamt  *	- we want to use asynchronous (in the sense of PQsendFoo) operations
     41  1.1  yamt  *	  which is not available with the libpq large object api.
     42  1.1  yamt  *	- with the libpq large object api, there's no way to know details of
     43  1.1  yamt  *	  an error because PGresult is freed in the library without saving
     44  1.1  yamt  *	  PG_DIAG_SQLSTATE etc.
     45  1.1  yamt  */
     46  1.1  yamt 
     47  1.1  yamt #include <sys/cdefs.h>
     48  1.1  yamt #ifndef lint
     49  1.4  yamt __RCSID("$NetBSD: pgfs_subs.c,v 1.4 2012/04/11 14:26:19 yamt Exp $");
     50  1.1  yamt #endif /* not lint */
     51  1.1  yamt 
     52  1.1  yamt #include <assert.h>
     53  1.1  yamt #include <err.h>
     54  1.1  yamt #include <errno.h>
     55  1.1  yamt #include <puffs.h>
     56  1.1  yamt #include <inttypes.h>
     57  1.1  yamt #include <stdarg.h>
     58  1.1  yamt #include <stdbool.h>
     59  1.1  yamt #include <stdio.h>
     60  1.1  yamt #include <stdlib.h>
     61  1.1  yamt #include <time.h>
     62  1.1  yamt #include <util.h>
     63  1.1  yamt 
     64  1.1  yamt #include <libpq-fe.h>
     65  1.1  yamt #include <libpq/libpq-fs.h>	/* INV_* */
     66  1.1  yamt 
     67  1.1  yamt #include "pgfs.h"
     68  1.1  yamt #include "pgfs_db.h"
     69  1.1  yamt #include "pgfs_debug.h"
     70  1.1  yamt #include "pgfs_waitq.h"
     71  1.1  yamt #include "pgfs_subs.h"
     72  1.1  yamt 
/*
 * vtype_table: textual names of the supported vnode types, indexed by
 * enum vtype.
 *
 * only VREG, VDIR and VLNK have a name; the other slots of the array are
 * NULL.  tovtype() and fromvtype() below convert between the two forms.
 */
const char * const vtype_table[] = {
	[VREG] = "regular",
	[VDIR] = "directory",
	[VLNK] = "link",
};
     78  1.1  yamt 
     79  1.1  yamt static unsigned int
     80  1.1  yamt tovtype(const char *type)
     81  1.1  yamt {
     82  1.1  yamt 	unsigned int i;
     83  1.1  yamt 
     84  1.1  yamt 	for (i = 0; i < __arraycount(vtype_table); i++) {
     85  1.1  yamt 		if (vtype_table[i] == NULL) {
     86  1.1  yamt 			continue;
     87  1.1  yamt 		}
     88  1.1  yamt 		if (!strcmp(type, vtype_table[i])) {
     89  1.1  yamt 			return i;
     90  1.1  yamt 		}
     91  1.1  yamt 	}
     92  1.1  yamt 	assert(0);
     93  1.1  yamt 	return 0;
     94  1.1  yamt }
     95  1.1  yamt 
     96  1.1  yamt static const char *
     97  1.1  yamt fromvtype(enum vtype vtype)
     98  1.1  yamt {
     99  1.1  yamt 
    100  1.1  yamt 	if (vtype < __arraycount(vtype_table)) {
    101  1.1  yamt 		assert(vtype_table[vtype] != NULL);
    102  1.1  yamt 		return vtype_table[vtype];
    103  1.1  yamt 	}
    104  1.1  yamt 	return NULL;
    105  1.1  yamt }
    106  1.1  yamt 
    107  1.1  yamt /*
    108  1.1  yamt  * fileid_lock stuff below is to keep ordering of operations for a file.
    109  1.1  yamt  * it is a workaround for the lack of operation barriers in the puffs
    110  1.1  yamt  * protocol.
    111  1.1  yamt  *
    112  1.1  yamt  * currently we do this locking only for SETATTR, GETATTR, and WRITE as
    113  1.1  yamt  * they are known to be reorder-unsafe.  they are sensitive to the file
    114  1.1  yamt  * attributes, mainly the file size.  note that as the kernel issues async
    115  1.1  yamt  * SETATTR/WRITE requests, vnode lock doesn't prevent GETATTR from seeing
    116  1.1  yamt  * the stale attributes.
    117  1.1  yamt  *
    118  1.1  yamt  * we are relying on waiton/wakeup being a FIFO.
    119  1.1  yamt  */
    120  1.1  yamt 
/*
 * a per-fileid lock.  a handle exists only while the fileid is locked;
 * it is created by fileid_lock and destroyed by fileid_unlock when
 * nobody is waiting.
 */
struct fileid_lock_handle {
	TAILQ_ENTRY(fileid_lock_handle) list;	/* fileid_lock_list linkage */
	fileid_t fileid;	/* the fileid being serialized */
	struct puffs_cc *owner;	/* diagnostic only */
	struct waitq waitq;	/* requests waiting for this lock */
};
    127  1.1  yamt 
/* all currently held fileid locks. */
TAILQ_HEAD(, fileid_lock_handle) fileid_lock_list =
    TAILQ_HEAD_INITIALIZER(fileid_lock_list);
/* NOTE(review): not referenced in the part of the file visible here. */
struct waitq fileid_lock_waitq = TAILQ_HEAD_INITIALIZER(fileid_lock_waitq);
    131  1.1  yamt 
    132  1.1  yamt /*
    133  1.1  yamt  * fileid_lock: serialize requests for the fileid.
    134  1.1  yamt  *
    135  1.1  yamt  * this function should be the first yieldable point in a puffs callback.
    136  1.1  yamt  */
    137  1.1  yamt 
    138  1.1  yamt struct fileid_lock_handle *
    139  1.1  yamt fileid_lock(fileid_t fileid, struct puffs_cc *cc)
    140  1.1  yamt {
    141  1.1  yamt 	struct fileid_lock_handle *lock;
    142  1.1  yamt 
    143  1.1  yamt 	TAILQ_FOREACH(lock, &fileid_lock_list, list) {
    144  1.1  yamt 		if (lock->fileid == fileid) {
    145  1.1  yamt 			DPRINTF("fileid wait %" PRIu64 " cc %p\n", fileid, cc);
    146  1.1  yamt 			assert(lock->owner != cc);
    147  1.1  yamt 			waiton(&lock->waitq, cc);	/* enter FIFO */
    148  1.1  yamt 			assert(lock->owner == cc);
    149  1.1  yamt 			return lock;
    150  1.1  yamt 		}
    151  1.1  yamt 	}
    152  1.1  yamt 	lock = emalloc(sizeof(*lock));
    153  1.1  yamt 	lock->fileid = fileid;
    154  1.1  yamt 	lock->owner = cc;
    155  1.1  yamt 	DPRINTF("fileid lock %" PRIu64 " cc %p\n", lock->fileid, cc);
    156  1.1  yamt 	waitq_init(&lock->waitq);
    157  1.1  yamt 	TAILQ_INSERT_HEAD(&fileid_lock_list, lock, list);
    158  1.1  yamt 	return lock;
    159  1.1  yamt }
    160  1.1  yamt 
    161  1.1  yamt void
    162  1.1  yamt fileid_unlock(struct fileid_lock_handle *lock)
    163  1.1  yamt {
    164  1.1  yamt 
    165  1.1  yamt 	DPRINTF("fileid unlock %" PRIu64 "\n", lock->fileid);
    166  1.1  yamt 	assert(lock != NULL);
    167  1.1  yamt 	assert(lock->owner != NULL);
    168  1.1  yamt 	/*
    169  1.1  yamt 	 * perform direct-handoff to the first waiter.
    170  1.1  yamt 	 *
    171  1.1  yamt 	 * a handoff is essential to keep the order of requests.
    172  1.1  yamt 	 */
    173  1.1  yamt 	lock->owner = wakeup_one(&lock->waitq);
    174  1.1  yamt 	if (lock->owner != NULL) {
    175  1.1  yamt 		return;
    176  1.1  yamt 	}
    177  1.1  yamt 	/*
    178  1.1  yamt 	 * no one is waiting this fileid.
    179  1.1  yamt 	 */
    180  1.1  yamt 	TAILQ_REMOVE(&fileid_lock_list, lock, list);
    181  1.1  yamt 	free(lock);
    182  1.1  yamt }
    183  1.1  yamt 
/*
 * timespec_to_pgtimestamp: create a text representation of timestamp which
 * can be recognized by the database server.
 *
 * the result has the form "YYYYMMDDTHHMMSS.uuuuuu", where the broken-down
 * time is UTC and "uuuuuu" is the microsecond part, zero-padded to six
 * digits.
 *
 * returns 0 and stores the string in *resultp on success.  returns the
 * errno value left by gmtime_r if the time can't be converted, in which
 * case *resultp is not modified.
 *
 * it's caller's responsibility to free(3) the result.
 */

int
timespec_to_pgtimestamp(const struct timespec *tv, char **resultp)
{
	/*
	 * XXX is there any smarter way?
	 */
	char buf1[1024];
	char buf2[1024];
	struct tm tm_store;
	struct tm *tm;

	tm = gmtime_r(&tv->tv_sec, &tm_store);
	if (tm == NULL) {
		assert(errno != 0);
		return errno;
	}
	strftime(buf1, sizeof(buf1), "%Y%m%dT%H%M%S", tm);
	/*
	 * the microseconds must be zero-padded.  they follow the decimal
	 * point, so e.g. 5us printed as ".5" would be read as half a second.
	 */
	snprintf(buf2, sizeof(buf2), "%s.%06ju", buf1,
	    (uintmax_t)tv->tv_nsec / 1000);
	*resultp = estrdup(buf2);
	return 0;
}
    213  1.1  yamt 
    214  1.1  yamt int
    215  1.1  yamt my_lo_truncate(struct Xconn *xc, int32_t fd, int32_t size)
    216  1.1  yamt {
    217  1.1  yamt 	static struct cmd *c;
    218  1.1  yamt 	int32_t ret;
    219  1.1  yamt 	int error;
    220  1.1  yamt 
    221  1.1  yamt 	CREATECMD(c, "SELECT lo_truncate($1, $2)", INT4OID, INT4OID);
    222  1.1  yamt 	error = sendcmd(xc, c, fd, size);
    223  1.1  yamt 	if (error != 0) {
    224  1.1  yamt 		return error;
    225  1.1  yamt 	}
    226  1.1  yamt 	error = simplefetch(xc, INT4OID, &ret);
    227  1.1  yamt 	if (error != 0) {
    228  1.1  yamt 		if (error == EEXIST) {
    229  1.1  yamt 			/*
    230  1.1  yamt 			 * probably the insertion of the new-sized page
    231  1.1  yamt 			 * caused a duplicated key error.  retry.
    232  1.1  yamt 			 */
    233  1.1  yamt 			DPRINTF("map EEXIST to EAGAIN\n");
    234  1.1  yamt 			error = EAGAIN;
    235  1.1  yamt 		}
    236  1.1  yamt 		return error;
    237  1.1  yamt 	}
    238  1.1  yamt 	assert(ret == 0);
    239  1.1  yamt 	return 0;
    240  1.1  yamt }
    241  1.1  yamt 
    242  1.1  yamt int
    243  1.1  yamt my_lo_lseek(struct Xconn *xc, int32_t fd, int32_t offset, int32_t whence,
    244  1.1  yamt     int32_t *retp)
    245  1.1  yamt {
    246  1.1  yamt 	static struct cmd *c;
    247  1.1  yamt 	int32_t ret;
    248  1.1  yamt 	int error;
    249  1.1  yamt 
    250  1.1  yamt 	CREATECMD(c, "SELECT lo_lseek($1, $2, $3)", INT4OID, INT4OID, INT4OID);
    251  1.1  yamt 	error = sendcmd(xc, c, fd, offset, whence);
    252  1.1  yamt 	if (error != 0) {
    253  1.1  yamt 		return error;
    254  1.1  yamt 	}
    255  1.1  yamt 	error = simplefetch(xc, INT4OID, &ret);
    256  1.1  yamt 	if (error != 0) {
    257  1.1  yamt 		return error;
    258  1.1  yamt 	}
    259  1.1  yamt 	if (retp != NULL) {
    260  1.1  yamt 		*retp = ret;
    261  1.1  yamt 	}
    262  1.1  yamt 	return 0;
    263  1.1  yamt }
    264  1.1  yamt 
    265  1.1  yamt int
    266  1.1  yamt my_lo_read(struct Xconn *xc, int32_t fd, void *buf, size_t size,
    267  1.1  yamt     size_t *resultsizep)
    268  1.1  yamt {
    269  1.1  yamt 	static struct cmd *c;
    270  1.1  yamt 	size_t resultsize;
    271  1.1  yamt 	int error;
    272  1.1  yamt 
    273  1.1  yamt 	CREATECMD(c, "SELECT loread($1, $2)", INT4OID, INT4OID);
    274  1.1  yamt 	error = sendcmdx(xc, 1, c, fd, (int32_t)size);
    275  1.1  yamt 	if (error != 0) {
    276  1.1  yamt 		return error;
    277  1.1  yamt 	}
    278  1.1  yamt 	error = simplefetch(xc, BYTEA, buf, &resultsize);
    279  1.1  yamt 	if (error != 0) {
    280  1.1  yamt 		return error;
    281  1.1  yamt 	}
    282  1.1  yamt 	*resultsizep = resultsize;
    283  1.1  yamt 	if (size != resultsize) {
    284  1.1  yamt 		DPRINTF("shortread? %zu != %zu\n", size, resultsize);
    285  1.1  yamt 	}
    286  1.1  yamt 	return 0;
    287  1.1  yamt }
    288  1.1  yamt 
    289  1.1  yamt int
    290  1.1  yamt my_lo_write(struct Xconn *xc, int32_t fd, const void *buf, size_t size,
    291  1.1  yamt     size_t *resultsizep)
    292  1.1  yamt {
    293  1.1  yamt 	static struct cmd *c;
    294  1.1  yamt 	int32_t resultsize;
    295  1.1  yamt 	int error;
    296  1.1  yamt 
    297  1.1  yamt 	CREATECMD(c, "SELECT lowrite($1, $2)", INT4OID, BYTEA);
    298  1.1  yamt 	error = sendcmd(xc, c, fd, buf, (int32_t)size);
    299  1.1  yamt 	if (error != 0) {
    300  1.1  yamt 		return error;
    301  1.1  yamt 	}
    302  1.1  yamt 	error = simplefetch(xc, INT4OID, &resultsize);
    303  1.1  yamt 	if (error != 0) {
    304  1.1  yamt 		if (error == EEXIST) {
    305  1.1  yamt 			/*
    306  1.1  yamt 			 * probably the insertion of the new data page
    307  1.1  yamt 			 * caused a duplicated key error.  retry.
    308  1.1  yamt 			 */
    309  1.1  yamt 			DPRINTF("map EEXIST to EAGAIN\n");
    310  1.1  yamt 			error = EAGAIN;
    311  1.1  yamt 		}
    312  1.1  yamt 		return error;
    313  1.1  yamt 	}
    314  1.1  yamt 	*resultsizep = resultsize;
    315  1.1  yamt 	if (size != (size_t)resultsize) {
    316  1.1  yamt 		DPRINTF("shortwrite? %zu != %zu\n", size, (size_t)resultsize);
    317  1.1  yamt 	}
    318  1.1  yamt 	return 0;
    319  1.1  yamt }
    320  1.1  yamt 
    321  1.1  yamt int
    322  1.1  yamt my_lo_open(struct Xconn *xc, Oid loid, int32_t mode, int32_t *fdp)
    323  1.1  yamt {
    324  1.1  yamt 	static struct cmd *c;
    325  1.1  yamt 	int error;
    326  1.1  yamt 
    327  1.1  yamt 	CREATECMD(c, "SELECT lo_open($1, $2)", OIDOID, INT4OID);
    328  1.1  yamt 	error = sendcmd(xc, c, loid, mode);
    329  1.1  yamt 	if (error != 0) {
    330  1.1  yamt 		return error;
    331  1.1  yamt 	}
    332  1.1  yamt 	return simplefetch(xc, INT4OID, fdp);
    333  1.1  yamt }
    334  1.1  yamt 
    335  1.1  yamt int
    336  1.1  yamt my_lo_close(struct Xconn *xc, int32_t fd)
    337  1.1  yamt {
    338  1.1  yamt 	static struct cmd *c;
    339  1.1  yamt 	int32_t ret;
    340  1.1  yamt 	int error;
    341  1.1  yamt 
    342  1.1  yamt 	CREATECMD(c, "SELECT lo_close($1)", INT4OID);
    343  1.1  yamt 	error = sendcmd(xc, c, fd);
    344  1.1  yamt 	if (error != 0) {
    345  1.1  yamt 		return error;
    346  1.1  yamt 	}
    347  1.1  yamt 	error = simplefetch(xc, INT4OID, &ret);
    348  1.1  yamt 	if (error != 0) {
    349  1.1  yamt 		return error;
    350  1.1  yamt 	}
    351  1.1  yamt 	assert(ret == 0);
    352  1.1  yamt 	return 0;
    353  1.1  yamt }
    354  1.1  yamt 
    355  1.1  yamt static int
    356  1.1  yamt lo_lookup_by_fileid(struct Xconn *xc, fileid_t fileid, Oid *idp)
    357  1.1  yamt {
    358  1.1  yamt 	static struct cmd *c;
    359  1.1  yamt 	static const Oid types[] = { OIDOID, };
    360  1.1  yamt 	struct fetchstatus s;
    361  1.1  yamt 	int error;
    362  1.1  yamt 
    363  1.1  yamt 	CREATECMD(c, "SELECT loid FROM datafork WHERE fileid = $1", INT8OID);
    364  1.1  yamt 	error = sendcmd(xc, c, fileid);
    365  1.1  yamt 	if (error != 0) {
    366  1.1  yamt 		return error;
    367  1.1  yamt 	}
    368  1.1  yamt 	fetchinit(&s, xc);
    369  1.1  yamt 	error = FETCHNEXT(&s, types, idp);
    370  1.1  yamt 	fetchdone(&s);
    371  1.1  yamt 	DPRINTF("error %d\n", error);
    372  1.1  yamt 	return error;
    373  1.1  yamt }
    374  1.1  yamt 
    375  1.1  yamt int
    376  1.1  yamt lo_open_by_fileid(struct Xconn *xc, fileid_t fileid, int mode, int *fdp)
    377  1.1  yamt {
    378  1.1  yamt 	Oid loid;
    379  1.1  yamt 	int fd;
    380  1.1  yamt 	int error;
    381  1.1  yamt 
    382  1.1  yamt 	error = lo_lookup_by_fileid(xc, fileid, &loid);
    383  1.1  yamt 	if (error != 0) {
    384  1.1  yamt 		return error;
    385  1.1  yamt 	}
    386  1.1  yamt 	error = my_lo_open(xc, loid, mode, &fd);
    387  1.1  yamt 	if (error != 0) {
    388  1.1  yamt 		return error;
    389  1.1  yamt 	}
    390  1.1  yamt 	*fdp = fd;
    391  1.1  yamt 	return 0;
    392  1.1  yamt }
    393  1.1  yamt 
    394  1.1  yamt static int
    395  1.1  yamt getsize(struct Xconn *xc, fileid_t fileid, int *resultp)
    396  1.1  yamt {
    397  1.1  yamt 	int32_t size;
    398  1.1  yamt 	int fd;
    399  1.1  yamt 	int error;
    400  1.1  yamt 
    401  1.1  yamt 	error = lo_open_by_fileid(xc, fileid, INV_READ, &fd);
    402  1.1  yamt 	if (error != 0) {
    403  1.1  yamt 		return error;
    404  1.1  yamt 	}
    405  1.1  yamt 	error = my_lo_lseek(xc, fd, 0, SEEK_END, &size);
    406  1.1  yamt 	if (error != 0) {
    407  1.1  yamt 		return error;
    408  1.1  yamt 	}
    409  1.1  yamt 	error = my_lo_close(xc, fd);
    410  1.1  yamt 	if (error != 0) {
    411  1.1  yamt 		return error;
    412  1.1  yamt 	}
    413  1.1  yamt 	*resultp = size;
    414  1.1  yamt 	return 0;
    415  1.1  yamt }
    416  1.1  yamt 
/*
 * flags for the "mask" argument of getattr.
 *
 * the getattr in this file tests only GETATTR_TIME and GETATTR_SIZE.
 */
#define	GETATTR_TYPE	0x00000001
#define	GETATTR_NLINK	0x00000002
#define	GETATTR_SIZE	0x00000004
#define	GETATTR_MODE	0x00000008
#define	GETATTR_UID	0x00000010
#define	GETATTR_GID	0x00000020
#define	GETATTR_TIME	0x00000040
#define	GETATTR_ALL	\
	(GETATTR_TYPE|GETATTR_NLINK|GETATTR_SIZE|GETATTR_MODE| \
	GETATTR_UID|GETATTR_GID|GETATTR_TIME)
    427  1.1  yamt 
    428  1.1  yamt int
    429  1.1  yamt getattr(struct Xconn *xc, fileid_t fileid, struct vattr *va, unsigned int mask)
    430  1.1  yamt {
    431  1.1  yamt 	char *type;
    432  1.1  yamt 	long long atime_s;
    433  1.1  yamt 	long long atime_us;
    434  1.1  yamt 	long long ctime_s;
    435  1.1  yamt 	long long ctime_us;
    436  1.1  yamt 	long long mtime_s;
    437  1.1  yamt 	long long mtime_us;
    438  1.1  yamt 	long long btime_s;
    439  1.1  yamt 	long long btime_us;
    440  1.1  yamt 	uint64_t mode;
    441  1.1  yamt 	long long uid;
    442  1.1  yamt 	long long gid;
    443  1.1  yamt 	long long nlink;
    444  1.1  yamt 	long long rev;
    445  1.1  yamt 	struct fetchstatus s;
    446  1.1  yamt 	int error;
    447  1.1  yamt 
    448  1.1  yamt 	if (mask == 0) {
    449  1.1  yamt 		return 0;
    450  1.1  yamt 	}
    451  1.1  yamt 	/*
    452  1.1  yamt 	 * unless explicitly requested, avoid fetching timestamps as they
    453  1.1  yamt 	 * are a little more expensive than other simple attributes.
    454  1.1  yamt 	 */
    455  1.1  yamt 	if ((mask & GETATTR_TIME) != 0) {
    456  1.1  yamt 		static struct cmd *c;
    457  1.1  yamt 		static const Oid types[] = {
    458  1.1  yamt 			TEXTOID,
    459  1.1  yamt 			INT8OID,
    460  1.1  yamt 			INT8OID,
    461  1.1  yamt 			INT8OID,
    462  1.1  yamt 			INT8OID,
    463  1.1  yamt 			INT8OID,
    464  1.1  yamt 			INT8OID,
    465  1.1  yamt 			INT8OID,
    466  1.1  yamt 			INT8OID,
    467  1.1  yamt 			INT8OID,
    468  1.1  yamt 			INT8OID,
    469  1.1  yamt 			INT8OID,
    470  1.1  yamt 			INT8OID,
    471  1.1  yamt 			INT8OID,
    472  1.1  yamt 		};
    473  1.1  yamt 
    474  1.1  yamt 		CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev, "
    475  1.1  yamt 		    "extract(epoch from date_trunc('second', atime))::int8, "
    476  1.1  yamt 		    "extract(microseconds from atime)::int8, "
    477  1.1  yamt 		    "extract(epoch from date_trunc('second', ctime))::int8, "
    478  1.1  yamt 		    "extract(microseconds from ctime)::int8, "
    479  1.1  yamt 		    "extract(epoch from date_trunc('second', mtime))::int8, "
    480  1.1  yamt 		    "extract(microseconds from mtime)::int8, "
    481  1.1  yamt 		    "extract(epoch from date_trunc('second', btime))::int8, "
    482  1.1  yamt 		    "extract(microseconds from btime)::int8 "
    483  1.1  yamt 		    "FROM file "
    484  1.1  yamt 		    "WHERE fileid = $1", INT8OID);
    485  1.1  yamt 		error = sendcmd(xc, c, fileid);
    486  1.1  yamt 		if (error != 0) {
    487  1.1  yamt 			return error;
    488  1.1  yamt 		}
    489  1.1  yamt 		fetchinit(&s, xc);
    490  1.1  yamt 		error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
    491  1.1  yamt 		    &rev,
    492  1.1  yamt 		    &atime_s, &atime_us,
    493  1.1  yamt 		    &ctime_s, &ctime_us,
    494  1.1  yamt 		    &mtime_s, &mtime_us,
    495  1.1  yamt 		    &btime_s, &btime_us);
    496  1.1  yamt 	} else {
    497  1.1  yamt 		static struct cmd *c;
    498  1.1  yamt 		static const Oid types[] = {
    499  1.1  yamt 			TEXTOID,
    500  1.1  yamt 			INT8OID,
    501  1.1  yamt 			INT8OID,
    502  1.1  yamt 			INT8OID,
    503  1.1  yamt 			INT8OID,
    504  1.1  yamt 			INT8OID,
    505  1.1  yamt 		};
    506  1.1  yamt 
    507  1.1  yamt 		CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev "
    508  1.1  yamt 		    "FROM file "
    509  1.1  yamt 		    "WHERE fileid = $1", INT8OID);
    510  1.1  yamt 		error = sendcmd(xc, c, fileid);
    511  1.1  yamt 		if (error != 0) {
    512  1.1  yamt 			return error;
    513  1.1  yamt 		}
    514  1.1  yamt 		fetchinit(&s, xc);
    515  1.1  yamt 		error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
    516  1.1  yamt 		    &rev);
    517  1.1  yamt 	}
    518  1.1  yamt 	fetchdone(&s);
    519  1.1  yamt 	if (error != 0) {
    520  1.1  yamt 		return error;
    521  1.1  yamt 	}
    522  1.1  yamt 	memset(va, 0xaa, sizeof(*va)); /* fill with garbage for debug */
    523  1.1  yamt 	va->va_type = tovtype(type);
    524  1.1  yamt 	free(type);
    525  1.1  yamt 	va->va_mode = mode;
    526  1.1  yamt 	va->va_uid = uid;
    527  1.1  yamt 	va->va_gid = gid;
    528  1.1  yamt 	if (nlink > 0 && va->va_type == VDIR) {
    529  1.1  yamt 		nlink++; /* "." */
    530  1.1  yamt 	}
    531  1.1  yamt 	va->va_nlink = nlink;
    532  1.1  yamt 	va->va_fileid = fileid;
    533  1.1  yamt 	va->va_atime.tv_sec = atime_s;
    534  1.1  yamt 	va->va_atime.tv_nsec = atime_us * 1000;
    535  1.1  yamt 	va->va_ctime.tv_sec = ctime_s;
    536  1.1  yamt 	va->va_ctime.tv_nsec = ctime_us * 1000;
    537  1.1  yamt 	va->va_mtime.tv_sec = mtime_s;
    538  1.1  yamt 	va->va_mtime.tv_nsec = mtime_us * 1000;
    539  1.1  yamt 	va->va_birthtime.tv_sec = btime_s;
    540  1.1  yamt 	va->va_birthtime.tv_nsec = btime_us * 1000;
    541  1.1  yamt 	va->va_blocksize = LOBLKSIZE;
    542  1.1  yamt 	va->va_gen = 1;
    543  1.1  yamt 	va->va_filerev = rev;
    544  1.1  yamt 	if ((mask & GETATTR_SIZE) != 0) {
    545  1.1  yamt 		int size;
    546  1.1  yamt 
    547  1.1  yamt 		size = 0;
    548  1.1  yamt 		if (va->va_type == VREG || va->va_type == VLNK) {
    549  1.1  yamt 			error = getsize(xc, fileid, &size);
    550  1.1  yamt 			if (error != 0) {
    551  1.1  yamt 				return error;
    552  1.1  yamt 			}
    553  1.1  yamt 		} else if (va->va_type == VDIR) {
    554  1.1  yamt 			size = 100; /* XXX */
    555  1.1  yamt 		}
    556  1.1  yamt 		va->va_size = size;
    557  1.1  yamt 	}
    558  1.1  yamt 	/*
    559  1.1  yamt 	 * XXX va_bytes: likely wrong due to toast compression.
    560  1.1  yamt 	 * there's no cheap way to get the compressed size of LO.
    561  1.1  yamt 	 */
    562  1.1  yamt 	va->va_bytes = va->va_size;
    563  1.1  yamt 	va->va_flags = 0;
    564  1.1  yamt 	return 0;
    565  1.1  yamt }
    566  1.1  yamt 
    567  1.1  yamt int
    568  1.1  yamt update_mctime(struct Xconn *xc, fileid_t fileid)
    569  1.1  yamt {
    570  1.1  yamt 	static struct cmd *c;
    571  1.1  yamt 
    572  1.1  yamt 	CREATECMD(c,
    573  1.1  yamt 	    "UPDATE file "
    574  1.1  yamt 	    "SET mtime = current_timestamp, ctime = current_timestamp, "
    575  1.1  yamt 		"rev = rev + 1 "
    576  1.1  yamt 	    "WHERE fileid = $1", INT8OID);
    577  1.1  yamt 	return simplecmd(xc, c, fileid);
    578  1.1  yamt }
    579  1.1  yamt 
    580  1.1  yamt int
    581  1.1  yamt update_atime(struct Xconn *xc, fileid_t fileid)
    582  1.1  yamt {
    583  1.1  yamt 	static struct cmd *c;
    584  1.1  yamt 
    585  1.1  yamt 	CREATECMD(c,
    586  1.1  yamt 	    "UPDATE file SET atime = current_timestamp WHERE fileid = $1",
    587  1.1  yamt 	    INT8OID);
    588  1.1  yamt 	return simplecmd(xc, c, fileid);
    589  1.1  yamt }
    590  1.1  yamt 
    591  1.1  yamt int
    592  1.1  yamt update_mtime(struct Xconn *xc, fileid_t fileid)
    593  1.1  yamt {
    594  1.1  yamt 	static struct cmd *c;
    595  1.1  yamt 
    596  1.1  yamt 	CREATECMD(c,
    597  1.1  yamt 	    "UPDATE file "
    598  1.1  yamt 	    "SET mtime = current_timestamp, rev = rev + 1 "
    599  1.1  yamt 	    "WHERE fileid = $1", INT8OID);
    600  1.1  yamt 	return simplecmd(xc, c, fileid);
    601  1.1  yamt }
    602  1.1  yamt 
    603  1.1  yamt int
    604  1.1  yamt update_ctime(struct Xconn *xc, fileid_t fileid)
    605  1.1  yamt {
    606  1.1  yamt 	static struct cmd *c;
    607  1.1  yamt 
    608  1.1  yamt 	CREATECMD(c,
    609  1.1  yamt 	    "UPDATE file SET ctime = current_timestamp WHERE fileid = $1",
    610  1.1  yamt 	    INT8OID);
    611  1.1  yamt 	return simplecmd(xc, c, fileid);
    612  1.1  yamt }
    613  1.1  yamt 
    614  1.1  yamt int
    615  1.1  yamt update_nlink(struct Xconn *xc, fileid_t fileid, int delta)
    616  1.1  yamt {
    617  1.1  yamt 	static struct cmd *c;
    618  1.1  yamt 
    619  1.1  yamt 	CREATECMD(c,
    620  1.1  yamt 	    "UPDATE file "
    621  1.1  yamt 	    "SET nlink = nlink + $1 "
    622  1.1  yamt 	    "WHERE fileid = $2",
    623  1.1  yamt 	    INT8OID, INT8OID);
    624  1.1  yamt 	return simplecmd(xc, c, (int64_t)delta, fileid);
    625  1.1  yamt }
    626  1.1  yamt 
    627  1.1  yamt int
    628  1.1  yamt lookupp(struct Xconn *xc, fileid_t fileid, fileid_t *parent)
    629  1.1  yamt {
    630  1.1  yamt 	static struct cmd *c;
    631  1.1  yamt 	static const Oid types[] = { INT8OID, };
    632  1.1  yamt 	struct fetchstatus s;
    633  1.1  yamt 	int error;
    634  1.1  yamt 
    635  1.1  yamt 	CREATECMD(c, "SELECT parent_fileid FROM dirent "
    636  1.1  yamt 		"WHERE child_fileid = $1 LIMIT 1", INT8OID);
    637  1.1  yamt 	error = sendcmd(xc, c, fileid);
    638  1.1  yamt 	if (error != 0) {
    639  1.1  yamt 		return error;
    640  1.1  yamt 	}
    641  1.1  yamt 	fetchinit(&s, xc);
    642  1.1  yamt 	error = FETCHNEXT(&s, types, parent);
    643  1.1  yamt 	fetchdone(&s);
    644  1.1  yamt 	if (error != 0) {
    645  1.1  yamt 		return error;
    646  1.1  yamt 	}
    647  1.1  yamt 	return 0;
    648  1.1  yamt }
    649  1.1  yamt 
    650  1.1  yamt int
    651  1.1  yamt mkfile(struct Xconn *xc, enum vtype vtype, mode_t mode, uid_t uid, gid_t gid,
    652  1.1  yamt     fileid_t *idp)
    653  1.1  yamt {
    654  1.1  yamt 	static struct cmd *c;
    655  1.1  yamt 	const char *type;
    656  1.1  yamt 	int error;
    657  1.1  yamt 
    658  1.1  yamt 	type = fromvtype(vtype);
    659  1.1  yamt 	if (type == NULL) {
    660  1.1  yamt 		return EOPNOTSUPP;
    661  1.1  yamt 	}
    662  1.1  yamt 	CREATECMD(c,
    663  1.1  yamt 		"INSERT INTO file "
    664  1.1  yamt 		"(fileid, type, mode, uid, gid, nlink, rev, "
    665  1.1  yamt 		"atime, ctime, mtime, btime) "
    666  1.1  yamt 		"VALUES(nextval('fileid_seq'), $1::filetype, $2, $3, $4, 0, 0, "
    667  1.1  yamt 		"current_timestamp, "
    668  1.1  yamt 		"current_timestamp, "
    669  1.1  yamt 		"current_timestamp, "
    670  1.1  yamt 		"current_timestamp) "
    671  1.1  yamt 		"RETURNING fileid", TEXTOID, INT8OID, INT8OID, INT8OID);
    672  1.1  yamt 	error = sendcmd(xc, c, type, (uint64_t)mode, (uint64_t)uid,
    673  1.1  yamt 	    (uint64_t)gid);
    674  1.1  yamt 	if (error != 0) {
    675  1.1  yamt 		return error;
    676  1.1  yamt 	}
    677  1.1  yamt 	return simplefetch(xc, INT8OID, idp);
    678  1.1  yamt }
    679  1.1  yamt 
    680  1.1  yamt int
    681  1.1  yamt linkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
    682  1.1  yamt {
    683  1.1  yamt 	static struct cmd *c;
    684  1.1  yamt 	int error;
    685  1.1  yamt 
    686  1.1  yamt 	CREATECMD(c,
    687  1.1  yamt 		"INSERT INTO dirent "
    688  1.1  yamt 		"(parent_fileid, name, child_fileid) "
    689  1.1  yamt 		"VALUES($1, $2, $3)", INT8OID, TEXTOID, INT8OID);
    690  1.1  yamt 	error = simplecmd(xc, c, parent, name, child);
    691  1.1  yamt 	if (error != 0) {
    692  1.1  yamt 		return error;
    693  1.1  yamt 	}
    694  1.1  yamt 	error = update_nlink(xc, child, 1);
    695  1.1  yamt 	if (error != 0) {
    696  1.1  yamt 		return error;
    697  1.1  yamt 	}
    698  1.1  yamt 	return update_mtime(xc, parent);
    699  1.1  yamt }
    700  1.1  yamt 
    701  1.1  yamt int
    702  1.1  yamt unlinkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
    703  1.1  yamt {
    704  1.1  yamt 	static struct cmd *c;
    705  1.1  yamt 	int error;
    706  1.1  yamt 
    707  1.1  yamt 	/*
    708  1.1  yamt 	 * in addition to the primary key, we check child_fileid as well here
    709  1.1  yamt 	 * to avoid removing an entry which was appeared after our VOP_LOOKUP.
    710  1.1  yamt 	 */
    711  1.1  yamt 	CREATECMD(c,
    712  1.1  yamt 		"DELETE FROM dirent "
    713  1.1  yamt 		"WHERE parent_fileid = $1 AND name = $2 AND child_fileid = $3",
    714  1.1  yamt 		INT8OID, TEXTOID, INT8OID);
    715  1.1  yamt 	error = simplecmd(xc, c, parent, name, child);
    716  1.1  yamt 	if (error != 0) {
    717  1.1  yamt 		return error;
    718  1.1  yamt 	}
    719  1.1  yamt 	error = update_nlink(xc, child, -1);
    720  1.1  yamt 	if (error != 0) {
    721  1.1  yamt 		return error;
    722  1.1  yamt 	}
    723  1.1  yamt 	error = update_mtime(xc, parent);
    724  1.1  yamt 	if (error != 0) {
    725  1.1  yamt 		return error;
    726  1.1  yamt 	}
    727  1.1  yamt 	return update_ctime(xc, child);
    728  1.1  yamt }
    729  1.1  yamt 
    730  1.1  yamt int
    731  1.1  yamt mklinkfile(struct Xconn *xc, fileid_t parent, const char *name,
    732  1.1  yamt     enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *idp)
    733  1.1  yamt {
    734  1.1  yamt 	fileid_t fileid;
    735  1.1  yamt 	int error;
    736  1.1  yamt 
    737  1.1  yamt 	error = mkfile(xc, vtype, mode, uid, gid, &fileid);
    738  1.1  yamt 	if (error != 0) {
    739  1.1  yamt 		return error;
    740  1.1  yamt 	}
    741  1.1  yamt 	error = linkfile(xc, parent, name, fileid);
    742  1.1  yamt 	if (error != 0) {
    743  1.1  yamt 		return error;
    744  1.1  yamt 	}
    745  1.1  yamt 	if (idp != NULL) {
    746  1.1  yamt 		*idp = fileid;
    747  1.1  yamt 	}
    748  1.1  yamt 	return 0;
    749  1.1  yamt }
    750  1.1  yamt 
    751  1.1  yamt int
    752  1.1  yamt mklinkfile_lo(struct Xconn *xc, fileid_t parent_fileid, const char *name,
    753  1.1  yamt     enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *fileidp,
    754  1.1  yamt     int *loidp)
    755  1.1  yamt {
    756  1.1  yamt 	static struct cmd *c;
    757  1.1  yamt 	fileid_t new_fileid;
    758  1.1  yamt 	int loid;
    759  1.1  yamt 	int error;
    760  1.1  yamt 
    761  1.1  yamt 	error = mklinkfile(xc, parent_fileid, name, vtype, mode, uid, gid,
    762  1.1  yamt 	    &new_fileid);
    763  1.1  yamt 	if (error != 0) {
    764  1.1  yamt 		return error;
    765  1.1  yamt 	}
    766  1.1  yamt 	CREATECMD(c,
    767  1.1  yamt 		"INSERT INTO datafork (fileid, loid) "
    768  1.1  yamt 		"VALUES($1, lo_creat(-1)) "
    769  1.1  yamt 		"RETURNING loid", INT8OID);
    770  1.1  yamt 	error = sendcmd(xc, c, new_fileid);
    771  1.1  yamt 	if (error != 0) {
    772  1.1  yamt 		return error;
    773  1.1  yamt 	}
    774  1.1  yamt 	error = simplefetch(xc, OIDOID, &loid);
    775  1.1  yamt 	if (error != 0) {
    776  1.1  yamt 		return error;
    777  1.1  yamt 	}
    778  1.1  yamt 	if (fileidp != NULL) {
    779  1.1  yamt 		*fileidp = new_fileid;
    780  1.1  yamt 	}
    781  1.1  yamt 	if (loidp != NULL) {
    782  1.1  yamt 		*loidp = loid;
    783  1.1  yamt 	}
    784  1.1  yamt 	return 0;
    785  1.1  yamt }
    786  1.1  yamt 
    787  1.1  yamt int
    788  1.4  yamt cleanupfile(struct Xconn *xc, fileid_t fileid)
    789  1.1  yamt {
    790  1.1  yamt 	static struct cmd *c;
    791  1.4  yamt 	char *type;
    792  1.4  yamt 	unsigned int vtype;
    793  1.4  yamt 	int error;
    794  1.1  yamt 
    795  1.4  yamt 	CREATECMD(c, "DELETE FROM file WHERE fileid = $1 AND nlink = 0 "
    796  1.4  yamt 		"RETURNING type::text", INT8OID);
    797  1.4  yamt 	error = sendcmd(xc, c, fileid);
    798  1.4  yamt 	if (error != 0) {
    799  1.4  yamt 		return error;
    800  1.4  yamt 	}
    801  1.4  yamt 	error = simplefetch(xc, TEXTOID, &type);
    802  1.4  yamt 	if (error == ENOENT) {
    803  1.4  yamt 		return 0; /* probably nlink > 0 */
    804  1.4  yamt 	}
    805  1.4  yamt 	if (error != 0) {
    806  1.4  yamt 		return error;
    807  1.4  yamt 	}
    808  1.4  yamt 	vtype = tovtype(type);
    809  1.4  yamt 	free(type);
    810  1.4  yamt 	if (vtype == VREG || vtype == VLNK) {
    811  1.1  yamt 		static struct cmd *c_datafork;
    812  1.2  yamt 		int32_t ret;
    813  1.1  yamt 
    814  1.1  yamt 		CREATECMD(c_datafork,
    815  1.2  yamt 			"WITH loids AS (DELETE FROM datafork WHERE fileid = $1 "
    816  1.2  yamt 			"RETURNING loid) SELECT lo_unlink(loid) FROM loids",
    817  1.2  yamt 			INT8OID);
    818  1.2  yamt 		error = sendcmd(xc, c_datafork, fileid);
    819  1.1  yamt 		if (error != 0) {
    820  1.1  yamt 			return error;
    821  1.1  yamt 		}
    822  1.2  yamt 		error = simplefetch(xc, INT4OID, &ret);
    823  1.2  yamt 		if (error != 0) {
    824  1.2  yamt 			return error;
    825  1.2  yamt 		}
    826  1.2  yamt 		if (ret != 1) {
    827  1.2  yamt 			return EIO; /* lo_unlink failed */
    828  1.2  yamt 		}
    829  1.1  yamt 	}
    830  1.4  yamt 	return 0;
    831  1.1  yamt }
    832  1.1  yamt 
    833  1.1  yamt /*
    834  1.1  yamt  * check_path: do locking and check to prevent a rename from creating loop.
    835  1.1  yamt  *
    836  1.1  yamt  * lock the dirents between child_fileid and the root directory.
    837  1.1  yamt  * if gate_fileid is appeared in the path, return EINVAL.
    838  1.1  yamt  * caller should ensure that child_fileid is of VDIR beforehand.
    839  1.1  yamt  *
    840  1.1  yamt  * we uses FOR SHARE row level locks as poor man's predicate locks.
    841  1.1  yamt  *
    842  1.1  yamt  * the following is an example to show why we need to lock the path.
    843  1.1  yamt  *
    844  1.1  yamt  * consider:
    845  1.1  yamt  * "mkdir -p /a/b/c/d/e/f && mkdir -p /1/2/3/4/5/6"
    846  1.1  yamt  * and then
    847  1.1  yamt  * thread 1 is doing "mv /a/b /1/2/3/4/5/6"
    848  1.1  yamt  * thread 2 is doing "mv /1/2 /a/b/c/d/e/f"
    849  1.1  yamt  *
    850  1.1  yamt  * a possible consequence:
    851  1.1  yamt  *	thread 1: check_path -> success
    852  1.1  yamt  *	thread 2: check_path -> success
    853  1.1  yamt  *	thread 1: modify directories -> block on row-level lock
    854  1.1  yamt  *	thread 2: modify directories -> block on row-level lock
    855  1.1  yamt  *			-> deadlock detected
    856  1.1  yamt  *			-> rollback and retry
    857  1.1  yamt  *
    858  1.1  yamt  * another possible consequence:
    859  1.1  yamt  *	thread 1: check_path -> success
    860  1.1  yamt  *	thread 1: modify directory entries -> success
    861  1.1  yamt  *	thread 2: check_path -> block on row-level lock
    862  1.1  yamt  *	thread 1: commit
    863  1.1  yamt  *	thread 2: acquire the lock and notices the row is updated
    864  1.1  yamt  *			-> serialization error
    865  1.1  yamt  *			-> rollback and retry
    866  1.1  yamt  *
    867  1.1  yamt  * XXX it might be better to use real serializable transactions,
    868  1.1  yamt  * which will be available for PostgreSQL 9.1
    869  1.1  yamt  */
    870  1.1  yamt 
    871  1.1  yamt int
    872  1.1  yamt check_path(struct Xconn *xc, fileid_t gate_fileid, fileid_t child_fileid)
    873  1.1  yamt {
    874  1.1  yamt 	static struct cmd *c;
    875  1.1  yamt 	fileid_t parent_fileid;
    876  1.1  yamt 	struct fetchstatus s;
    877  1.1  yamt 	int error;
    878  1.1  yamt 
    879  1.1  yamt 	CREATECMD(c,
    880  1.1  yamt 		"WITH RECURSIVE r AS "
    881  1.1  yamt 		"( "
    882  1.1  yamt 				"SELECT parent_fileid, cookie, child_fileid "
    883  1.1  yamt 				"FROM dirent "
    884  1.1  yamt 				"WHERE child_fileid = $1 "
    885  1.1  yamt 			"UNION ALL "
    886  1.1  yamt 				"SELECT d.parent_fileid, d.cookie, "
    887  1.1  yamt 				"d.child_fileid "
    888  1.1  yamt 				"FROM dirent AS d INNER JOIN r "
    889  1.1  yamt 				"ON d.child_fileid = r.parent_fileid "
    890  1.1  yamt 		") "
    891  1.1  yamt 		"SELECT d.parent_fileid "
    892  1.1  yamt 		"FROM dirent d "
    893  1.1  yamt 		"JOIN r "
    894  1.1  yamt 		"ON d.cookie = r.cookie "
    895  1.1  yamt 		"FOR SHARE", INT8OID);
    896  1.1  yamt 	error = sendcmd(xc, c, child_fileid);
    897  1.1  yamt 	if (error != 0) {
    898  1.1  yamt 		return error;
    899  1.1  yamt 	}
    900  1.1  yamt 	fetchinit(&s, xc);
    901  1.1  yamt 	do {
    902  1.1  yamt 		static const Oid types[] = { INT8OID, };
    903  1.1  yamt 
    904  1.1  yamt 		error = FETCHNEXT(&s, types, &parent_fileid);
    905  1.1  yamt 		if (error == ENOENT) {
    906  1.1  yamt 			fetchdone(&s);
    907  1.1  yamt 			return 0;
    908  1.1  yamt 		}
    909  1.1  yamt 		if (error != 0) {
    910  1.1  yamt 			fetchdone(&s);
    911  1.1  yamt 			return error;
    912  1.1  yamt 		}
    913  1.1  yamt 	} while (gate_fileid != parent_fileid);
    914  1.1  yamt 	fetchdone(&s);
    915  1.1  yamt 	return EINVAL;
    916  1.1  yamt }
    917  1.1  yamt 
    918  1.1  yamt int
    919  1.1  yamt isempty(struct Xconn *xc, fileid_t fileid, bool *emptyp)
    920  1.1  yamt {
    921  1.3  yamt 	int32_t dummy;
    922  1.1  yamt 	static struct cmd *c;
    923  1.1  yamt 	int error;
    924  1.1  yamt 
    925  1.1  yamt 	CREATECMD(c,
    926  1.1  yamt 		"SELECT 1 FROM dirent "
    927  1.1  yamt 		"WHERE parent_fileid = $1 LIMIT 1", INT8OID);
    928  1.1  yamt 	error = sendcmd(xc, c, fileid);
    929  1.1  yamt 	if (error != 0) {
    930  1.1  yamt 		return error;
    931  1.1  yamt 	}
    932  1.3  yamt 	error = simplefetch(xc, INT4OID, &dummy);
    933  1.1  yamt 	assert(error != 0 || dummy == 1);
    934  1.1  yamt 	if (error == ENOENT) {
    935  1.1  yamt 		*emptyp = true;
    936  1.1  yamt 		error = 0;
    937  1.1  yamt 	} else {
    938  1.1  yamt 		*emptyp = false;
    939  1.1  yamt 	}
    940  1.1  yamt 	return error;
    941  1.1  yamt }
    942