Home | History | Annotate | Line # | Download | only in xenbus
      1 /* $NetBSD: xenbus_xs.c,v 1.28 2022/09/01 16:25:18 bouyer Exp $ */
      2 /******************************************************************************
      3  * xenbus_xs.c
      4  *
      5  * This is the kernel equivalent of the "xs" library.  We don't need everything
      6  * and we use xenbus_comms for communication.
      7  *
      8  * Copyright (C) 2005 Rusty Russell, IBM Corporation
      9  *
     10  * This file may be distributed separately from the Linux kernel, or
     11  * incorporated into other software packages, subject to the following license:
     12  *
     13  * Permission is hereby granted, free of charge, to any person obtaining a copy
     14  * of this source file (the "Software"), to deal in the Software without
     15  * restriction, including without limitation the rights to use, copy, modify,
     16  * merge, publish, distribute, sublicense, and/or sell copies of the Software,
     17  * and to permit persons to whom the Software is furnished to do so, subject to
     18  * the following conditions:
     19  *
     20  * The above copyright notice and this permission notice shall be included in
     21  * all copies or substantial portions of the Software.
     22  *
     23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     24  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     25  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     26  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     27  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
     28  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
     29  * IN THE SOFTWARE.
     30  */
     31 
     32 #include <sys/cdefs.h>
     33 __KERNEL_RCSID(0, "$NetBSD: xenbus_xs.c,v 1.28 2022/09/01 16:25:18 bouyer Exp $");
     34 
     35 #if 0
     36 #define DPRINTK(fmt, args...) \
     37     printf("xenbus_xs (%s:%d) " fmt ".\n", __func__, __LINE__, ##args)
     38 #else
     39 #define DPRINTK(fmt, args...) ((void)0)
     40 #endif
     41 
     42 #include <sys/types.h>
     43 #include <sys/null.h>
     44 #include <sys/errno.h>
     45 #include <sys/malloc.h>
     46 #include <sys/systm.h>
     47 #include <sys/param.h>
     48 #include <sys/proc.h>
     49 #include <sys/mutex.h>
     50 #include <sys/kthread.h>
     51 
     52 #include <xen/xen.h>	/* for xendomain_is_dom0() */
     53 #include <xen/xenbus.h>
     54 #include "xenbus_comms.h"
     55 
     56 #define streq(a, b) (strcmp((a), (b)) == 0)
     57 
     58 struct xs_stored_msg {
     59 	SIMPLEQ_ENTRY(xs_stored_msg) msg_next;
     60 
     61 	struct xsd_sockmsg hdr;
     62 
     63 	union {
     64 		/* Queued replies. */
     65 		struct {
     66 			char *body;
     67 		} reply;
     68 
     69 		/* Queued watch events. */
     70 		struct {
     71 			struct xenbus_watch *handle;
     72 			char **vec;
     73 			unsigned int vec_size;
     74 		} watch;
     75 	} u;
     76 };
     77 
     78 struct xs_handle {
     79 	/* A list of replies. Currently only one will ever be outstanding. */
     80 	SIMPLEQ_HEAD(, xs_stored_msg) reply_list;
     81 	kmutex_t reply_lock;
     82 	kcondvar_t reply_cv;
     83 	kmutex_t xs_lock; /* serialize access to xenstore */
     84 	int suspend_spl;
     85 
     86 };
     87 
     88 static struct xs_handle xs_state;
     89 
     90 /* List of registered watches, and a lock to protect it. */
     91 static SLIST_HEAD(, xenbus_watch) watches;
     92 static kmutex_t watches_lock;
     93 
     94 /* List of pending watch callback events, and a lock to protect it. */
     95 static SIMPLEQ_HEAD(, xs_stored_msg) watch_events;
     96 static kmutex_t watch_events_lock;
     97 static kcondvar_t watch_cv;
     98 
     99 static int
    100 get_error(const char *errorstring)
    101 {
    102 	unsigned int i;
    103 
    104 	for (i = 0; !streq(errorstring, xsd_errors[i].errstring); i++) {
    105 		if (i == (sizeof(xsd_errors) / sizeof(xsd_errors[0]) - 1)) {
    106 			printf(
    107 			       "XENBUS xen store gave: unknown error %s",
    108 			       errorstring);
    109 			return EINVAL;
    110 		}
    111 	}
    112 	return xsd_errors[i].errnum;
    113 }
    114 
    115 static void *
    116 read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
    117 {
    118 	struct xs_stored_msg *msg;
    119 	char *body;
    120 
    121 	mutex_enter(&xs_state.reply_lock);
    122 	while (SIMPLEQ_EMPTY(&xs_state.reply_list)) {
    123 		cv_wait(&xs_state.reply_cv, &xs_state.reply_lock);
    124 	}
    125 	msg = SIMPLEQ_FIRST(&xs_state.reply_list);
    126 	SIMPLEQ_REMOVE_HEAD(&xs_state.reply_list, msg_next);
    127 	mutex_exit(&xs_state.reply_lock);
    128 
    129 	*type = msg->hdr.type;
    130 	if (len)
    131 		*len = msg->hdr.len;
    132 	body = msg->u.reply.body;
    133 	DPRINTK("read_reply: type %d body %s",
    134 	    msg->hdr.type, body);
    135 
    136 	free(msg, M_DEVBUF);
    137 
    138 	return body;
    139 }
    140 
    141 #if 0
    142 /* Emergency write. */
    143 void
    144 xenbus_debug_write(const char *str, unsigned int count)
    145 {
    146 	struct xsd_sockmsg msg = { 0 };
    147 
    148 	msg.type = XS_DEBUG;
    149 	msg.len = sizeof("print") + count + 1;
    150 
    151 	xb_write(&msg, sizeof(msg));
    152 	xb_write("print", sizeof("print"));
    153 	xb_write(str, count);
    154 	xb_write("", 1);
    155 }
    156 #endif
    157 
    158 int
    159 xenbus_dev_request_and_reply(struct xsd_sockmsg *msg, void**reply)
    160 {
    161 	int err = 0;
    162 
    163 	mutex_enter(&xs_state.xs_lock);
    164 	err = xb_write(msg, sizeof(*msg) + msg->len);
    165 	if (err) {
    166 		msg->type = XS_ERROR;
    167 		*reply = NULL;
    168 	} else {
    169 		*reply = read_reply(&msg->type, &msg->len);
    170 	}
    171 	mutex_exit(&xs_state.xs_lock);
    172 
    173 	return err;
    174 }
    175 
    176 void
    177 xenbus_dev_reply_free(struct xsd_sockmsg *msg, void *reply)
    178 {
    179 	free(reply, M_DEVBUF);
    180 }
    181 
    182 /* Send message to xs, get kmalloc'ed reply.  ERR_PTR() on error. */
    183 static int
    184 xs_talkv(struct xenbus_transaction *t,
    185 		      enum xsd_sockmsg_type type,
    186 		      const struct iovec *iovec,
    187 		      unsigned int num_vecs,
    188 		      unsigned int *len,
    189 		      char **retbuf)
    190 {
    191 	struct xsd_sockmsg msg;
    192 	unsigned int i;
    193 	int err;
    194 	void *ret;
    195 
    196 	msg.tx_id = (uint32_t)(unsigned long)t;
    197 	msg.req_id = 0;
    198 	msg.type = type;
    199 	msg.len = 0;
    200 	for (i = 0; i < num_vecs; i++)
    201 		msg.len += iovec[i].iov_len;
    202 
    203 	mutex_enter(&xs_state.xs_lock);
    204 
    205 	DPRINTK("write msg");
    206 	err = xb_write(&msg, sizeof(msg));
    207 	DPRINTK("write msg err %d", err);
    208 	if (err) {
    209 		mutex_exit(&xs_state.xs_lock);
    210 		return (err);
    211 	}
    212 
    213 	for (i = 0; i < num_vecs; i++) {
    214 		DPRINTK("write iovect");
    215 		err = xb_write(iovec[i].iov_base, iovec[i].iov_len);
    216 		DPRINTK("write iovect err %d", err);
    217 		if (err) {
    218 			mutex_exit(&xs_state.xs_lock);
    219 			return (err);
    220 		}
    221 	}
    222 
    223 	DPRINTK("read");
    224 	ret = read_reply(&msg.type, len);
    225 	DPRINTK("read done");
    226 
    227 	mutex_exit(&xs_state.xs_lock);
    228 
    229 	if (msg.type == XS_ERROR) {
    230 		err = get_error(ret);
    231 		free(ret, M_DEVBUF);
    232 		return (err);
    233 	}
    234 
    235 	KASSERT(msg.type == type);
    236 	if (retbuf != NULL)
    237 		*retbuf = ret;
    238 	else
    239 		free(ret, M_DEVBUF);
    240 	return 0;
    241 }
    242 
    243 /* Simplified version of xs_talkv: single message. */
    244 static int
    245 xs_single(struct xenbus_transaction *t,
    246 		       enum xsd_sockmsg_type type,
    247 		       const char *string,
    248 		       unsigned int *len,
    249 		       char **ret)
    250 {
    251 	struct iovec iovec;
    252 
    253 	/* xs_talkv only reads iovec */
    254 	iovec.iov_base = __UNCONST(string);
    255 	iovec.iov_len = strlen(string) + 1;
    256 	return xs_talkv(t, type, &iovec, 1, len, ret);
    257 }
    258 
    259 static unsigned int
    260 count_strings(const char *strings, unsigned int len)
    261 {
    262 	unsigned int num;
    263 	const char *p;
    264 
    265 	for (p = strings, num = 0; p < strings + len; p += strlen(p) + 1)
    266 		num++;
    267 
    268 	return num;
    269 }
    270 
    271 /* Return the path to dir with /name appended. Buffer must be kfree()'ed. */
    272 static char *
    273 join(const char *dir, const char *name)
    274 {
    275 	char *buffer;
    276 
    277 	buffer = malloc(strlen(dir) + strlen("/") + strlen(name) + 1,
    278 			 M_DEVBUF, M_NOWAIT);
    279 	if (buffer == NULL)
    280 		return NULL;
    281 
    282 	strcpy(buffer, dir);
    283 	if (!streq(name, "")) {
    284 		strcat(buffer, "/");
    285 		strcat(buffer, name);
    286 	}
    287 
    288 	return buffer;
    289 }
    290 
    291 static char **
    292 split(char *strings, unsigned int len, unsigned int *num)
    293 {
    294 	char *p, **ret;
    295 
    296 	/* Count the strings. */
    297 	*num = count_strings(strings, len);
    298 
    299 	/* Transfer to one big alloc for easy freeing. */
    300 	ret = malloc(*num * sizeof(char *) + len, M_DEVBUF, M_NOWAIT);
    301 	if (!ret) {
    302 		free(strings, M_DEVBUF);
    303 		return NULL;
    304 	}
    305 	memcpy(&ret[*num], strings, len);
    306 	free(strings, M_DEVBUF);
    307 
    308 	strings = (char *)&ret[*num];
    309 	for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1)
    310 		ret[(*num)++] = p;
    311 
    312 	return ret;
    313 }
    314 
    315 int
    316 xenbus_directory(struct xenbus_transaction *t,
    317 			const char *dir, const char *node, unsigned int *num,
    318 			char ***retbuf)
    319 {
    320 	char *strings, *path;
    321 	unsigned int len;
    322 	int err;
    323 
    324 	path = join(dir, node);
    325 	if (path == NULL)
    326 		return ENOMEM;
    327 
    328 	err = xs_single(t, XS_DIRECTORY, path, &len, &strings);
    329 	DPRINTK("xs_single %d %d", err, len);
    330 	free(path, M_DEVBUF);
    331 	if (err)
    332 		return err;
    333 
    334 	DPRINTK("xs_single strings %s", strings);
    335 	*retbuf = split(strings, len, num);
    336 	if (*retbuf == NULL)
    337 		return ENOMEM;
    338 	return 0;
    339 }
    340 
    341 void
    342 xenbus_directory_free(unsigned int num, char **dir)
    343 {
    344 	free(dir, M_DEVBUF);
    345 }
    346 
    347 /* Check if a path exists. Return 1 if it does. */
    348 int
    349 xenbus_exists(struct xenbus_transaction *t,
    350 		  const char *dir, const char *node)
    351 {
    352 	char **d;
    353 	int dir_n, err;
    354 
    355 	err = xenbus_directory(t, dir, node, &dir_n, &d);
    356 	if (err)
    357 		return 0;
    358 	free(d, M_DEVBUF);
    359 	return 1;
    360 }
    361 
    362 /* Get the value of a single file.
    363  * Returns a kmalloced value: call free() on it after use.
    364  * len indicates length in bytes.
    365  */
    366 int
    367 xenbus_read(struct xenbus_transaction *t,
    368 		  const char *dir, const char *node,
    369 		  char *buffer, size_t bufsz)
    370 {
    371 	char *path;
    372 	int err;
    373 	char *ret;
    374 	unsigned int len;
    375 
    376 	path = join(dir, node);
    377 	if (path == NULL)
    378 		return ENOMEM;
    379 
    380 	err = xs_single(t, XS_READ, path, &len, &ret);
    381 
    382 	if (err == 0) {
    383 		if (len + 1 <= bufsz) {
    384 			strncpy(buffer, ret, bufsz);
    385 		} else {
    386 			err = ENAMETOOLONG;
    387 		}
    388 		free(ret, M_DEVBUF);
    389 	}
    390 
    391 	free(path, M_DEVBUF);
    392 	return err;
    393 }
    394 
    395 /* Read a node and convert it to unsigned long. */
    396 int
    397 xenbus_read_ul(struct xenbus_transaction *t,
    398 		  const char *dir, const char *node, unsigned long *val,
    399 		  int base)
    400 {
    401 	char string[32], *ep;
    402 	int err;
    403 
    404 	err = xenbus_read(t, dir, node, string, sizeof(string));
    405 	if (err)
    406 		return err;
    407 	*val = strtoul(string, &ep, base);
    408 	if (*ep != '\0') {
    409 		return EFTYPE;
    410 	}
    411 	return 0;
    412 }
    413 
    414 /* Read a node and convert it to unsigned long long. */
    415 int
    416 xenbus_read_ull(struct xenbus_transaction *t,
    417 		  const char *dir, const char *node, unsigned long long *val,
    418 		  int base)
    419 {
    420 	char string[32], *ep;
    421 	int err;
    422 
    423 	err = xenbus_read(t, dir, node, string, sizeof(string));
    424 	if (err)
    425 		return err;
    426 	*val = strtoull(string, &ep, base);
    427 	if (*ep != '\0') {
    428 		return EFTYPE;
    429 	}
    430 	return 0;
    431 }
    432 
    433 /* Write the value of a single file.
    434  * Returns -err on failure.
    435  */
    436 int
    437 xenbus_write(struct xenbus_transaction *t,
    438 		 const char *dir, const char *node, const char *string)
    439 {
    440 	const char *path;
    441 	struct iovec iovec[2];
    442 	int ret;
    443 
    444 	path = join(dir, node);
    445 	if (path == NULL)
    446 		return ENOMEM;
    447 
    448 	/* xs_talkv only reads iovec */
    449 	iovec[0].iov_base = __UNCONST(path);
    450 	iovec[0].iov_len = strlen(path) + 1;
    451 	iovec[1].iov_base = __UNCONST(string);
    452 	iovec[1].iov_len = strlen(string);
    453 
    454 	ret = xs_talkv(t, XS_WRITE, iovec, 2, NULL, NULL);
    455 	return ret;
    456 }
    457 
    458 /* Create a new directory. */
    459 int
    460 xenbus_mkdir(struct xenbus_transaction *t,
    461 		 const char *dir, const char *node)
    462 {
    463 	char *path;
    464 	int ret;
    465 
    466 	path = join(dir, node);
    467 	if (path == NULL)
    468 		return ENOMEM;
    469 
    470 	ret = xs_single(t, XS_MKDIR, path, NULL, NULL);
    471 	free(path, M_DEVBUF);
    472 	return ret;
    473 }
    474 
    475 /* Destroy a file or directory (directories must be empty). */
    476 int xenbus_rm(struct xenbus_transaction *t, const char *dir, const char *node)
    477 {
    478 	char *path;
    479 	int ret;
    480 
    481 	path = join(dir, node);
    482 	if (path == NULL)
    483 		return ENOMEM;
    484 
    485 	ret = xs_single(t, XS_RM, path, NULL, NULL);
    486 	free(path, M_DEVBUF);
    487 	return ret;
    488 }
    489 
    490 /* Start a transaction: changes by others will not be seen during this
    491  * transaction, and changes will not be visible to others until end.
    492  * MUST BE CALLED AT IPL_TTY !
    493  */
    494 struct xenbus_transaction *
    495 xenbus_transaction_start(void)
    496 {
    497 	char *id_str;
    498 	unsigned long id, err;
    499 
    500 	err = xs_single(NULL, XS_TRANSACTION_START, "", NULL, &id_str);
    501 	if (err) {
    502 		return NULL;
    503 	}
    504 
    505 	id = strtoul(id_str, NULL, 0);
    506 	free(id_str, M_DEVBUF);
    507 
    508 	return (struct xenbus_transaction *)id;
    509 }
    510 
    511 /* End a transaction.
    512  * If abandon is true, transaction is discarded instead of committed.
    513  * MUST BE CALLED AT IPL_TTY !
    514  */
    515 int xenbus_transaction_end(struct xenbus_transaction *t, int abort)
    516 {
    517 	char abortstr[2];
    518 	int err;
    519 
    520 	if (abort)
    521 		strcpy(abortstr, "F");
    522 	else
    523 		strcpy(abortstr, "T");
    524 
    525 	err = xs_single(t, XS_TRANSACTION_END, abortstr, NULL, NULL);
    526 
    527 	return err;
    528 }
    529 
    530 /* Single printf and write: returns -errno or 0. */
    531 int
    532 xenbus_printf(struct xenbus_transaction *t,
    533 		  const char *dir, const char *node, const char *fmt, ...)
    534 {
    535 	va_list ap;
    536 	int ret;
    537 #define PRINTF_BUFFER_SIZE 4096
    538 	char *printf_buffer;
    539 
    540 	printf_buffer = malloc(PRINTF_BUFFER_SIZE, M_DEVBUF, M_NOWAIT);
    541 	if (printf_buffer == NULL)
    542 		return ENOMEM;
    543 
    544 	va_start(ap, fmt);
    545 	ret = vsnprintf(printf_buffer, PRINTF_BUFFER_SIZE, fmt, ap);
    546 	va_end(ap);
    547 
    548 	KASSERT(ret < PRINTF_BUFFER_SIZE);
    549 	ret = xenbus_write(t, dir, node, printf_buffer);
    550 
    551 	free(printf_buffer, M_DEVBUF);
    552 
    553 	return ret;
    554 }
    555 
    556 static int
    557 xs_watch(const char *path, const char *token)
    558 {
    559 	struct iovec iov[2];
    560 
    561 	/* xs_talkv only reads iovec */
    562 	iov[0].iov_base = __UNCONST(path);
    563 	iov[0].iov_len = strlen(path) + 1;
    564 	iov[1].iov_base = __UNCONST(token);
    565 	iov[1].iov_len = strlen(token) + 1;
    566 
    567 	return xs_talkv(NULL, XS_WATCH, iov, 2, NULL, NULL);
    568 }
    569 
    570 static int
    571 xs_unwatch(const char *path, const char *token)
    572 {
    573 	struct iovec iov[2];
    574 
    575 	/* xs_talkv only reads iovec */
    576 	iov[0].iov_base = __UNCONST(path);
    577 	iov[0].iov_len = strlen(path) + 1;
    578 	iov[1].iov_base = __UNCONST(token);
    579 	iov[1].iov_len = strlen(token) + 1;
    580 
    581 	return xs_talkv(NULL, XS_UNWATCH, iov, 2, NULL, NULL);
    582 }
    583 
    584 static struct xenbus_watch *
    585 find_watch(const char *token)
    586 {
    587 	struct xenbus_watch *i, *cmp;
    588 
    589 	cmp = (void *)strtoul(token, NULL, 16);
    590 
    591 	SLIST_FOREACH(i, &watches, watch_next) {
    592 		if (i == cmp)
    593 			return i;
    594 	}
    595 
    596 	return NULL;
    597 }
    598 
    599 /* Register callback to watch this node. */
    600 int
    601 register_xenbus_watch(struct xenbus_watch *watch)
    602 {
    603 	/* Pointer in ascii is the token. */
    604 	char token[sizeof(watch) * 2 + 1];
    605 	int err;
    606 
    607 	snprintf(token, sizeof(token), "%lX", (long)watch);
    608 
    609 	mutex_enter(&watches_lock);
    610 	KASSERT(find_watch(token) == 0);
    611 	SLIST_INSERT_HEAD(&watches, watch, watch_next);
    612 	mutex_exit(&watches_lock);
    613 
    614 	err = xs_watch(watch->node, token);
    615 
    616 	/* Ignore errors due to multiple registration. */
    617 	if ((err != 0) && (err != EEXIST)) {
    618 		mutex_enter(&watches_lock);
    619 		SLIST_REMOVE(&watches, watch, xenbus_watch, watch_next);
    620 		mutex_exit(&watches_lock);
    621 	}
    622 	return err;
    623 }
    624 
    625 void
    626 unregister_xenbus_watch(struct xenbus_watch *watch)
    627 {
    628 	SIMPLEQ_HEAD(, xs_stored_msg) gclist;
    629 	struct xs_stored_msg *msg, *next_msg;
    630 	char token[sizeof(watch) * 2 + 1];
    631 	int err;
    632 
    633 	snprintf(token, sizeof(token), "%lX", (long)watch);
    634 
    635 	mutex_enter(&watches_lock);
    636 	KASSERT(find_watch(token));
    637 	SLIST_REMOVE(&watches, watch, xenbus_watch, watch_next);
    638 	mutex_exit(&watches_lock);
    639 
    640 	err = xs_unwatch(watch->node, token);
    641 	if (err) {
    642 		printf(
    643 		       "XENBUS Failed to release watch %s: %i\n",
    644 		       watch->node, err);
    645 	}
    646 
    647 	/* Cancel pending watch events. */
    648 	SIMPLEQ_INIT(&gclist);
    649 	mutex_enter(&watch_events_lock);
    650 	for (msg = SIMPLEQ_FIRST(&watch_events); msg != NULL; msg = next_msg) {
    651 		next_msg = SIMPLEQ_NEXT(msg, msg_next);
    652 		if (msg->u.watch.handle != watch)
    653 			continue;
    654 		SIMPLEQ_REMOVE(&watch_events, msg, xs_stored_msg, msg_next);
    655 		SIMPLEQ_INSERT_TAIL(&gclist, msg, msg_next);
    656 	}
    657 	mutex_exit(&watch_events_lock);
    658 
    659 	while ((msg = SIMPLEQ_FIRST(&gclist)) != NULL) {
    660 		SIMPLEQ_REMOVE(&gclist, msg, xs_stored_msg, msg_next);
    661 		free(msg->u.watch.vec, M_DEVBUF);
    662 		free(msg, M_DEVBUF);
    663 	}
    664 }
    665 
    666 void
    667 xs_suspend(void)
    668 {
    669 	xs_state.suspend_spl = spltty();
    670 }
    671 
    672 void
    673 xs_resume(void)
    674 {
    675 	struct xenbus_watch *watch;
    676 	char token[sizeof(watch) * 2 + 1];
    677 	/* No need for watches_lock: the suspend_mutex is sufficient. */
    678 	SLIST_FOREACH(watch, &watches, watch_next) {
    679 		snprintf(token, sizeof(token), "%lX", (long)watch);
    680 		xs_watch(watch->node, token);
    681 	}
    682 
    683 	splx(xs_state.suspend_spl);
    684 }
    685 
    686 static void
    687 xenwatch_thread(void *unused)
    688 {
    689 	SIMPLEQ_HEAD(, xs_stored_msg) events_to_proces;
    690 	struct xs_stored_msg *msg;
    691 
    692 	SIMPLEQ_INIT(&events_to_proces);
    693 	for (;;) {
    694 		mutex_enter(&watch_events_lock);
    695 		while (SIMPLEQ_EMPTY(&watch_events))
    696 			cv_wait(&watch_cv, &watch_events_lock);
    697 		SIMPLEQ_CONCAT(&events_to_proces, &watch_events);
    698 		mutex_exit(&watch_events_lock);
    699 
    700 		DPRINTK("xenwatch_thread: processing events");
    701 
    702 		while ((msg = SIMPLEQ_FIRST(&events_to_proces)) != NULL) {
    703 			DPRINTK("xenwatch_thread: got event");
    704 			SIMPLEQ_REMOVE_HEAD(&events_to_proces, msg_next);
    705 			msg->u.watch.handle->xbw_callback(
    706 				msg->u.watch.handle,
    707 				(void *)msg->u.watch.vec,
    708 				msg->u.watch.vec_size);
    709 			free(msg->u.watch.vec, M_DEVBUF);
    710 			free(msg, M_DEVBUF);
    711 		}
    712 	}
    713 }
    714 
    715 static int
    716 process_msg(void)
    717 {
    718 	struct xs_stored_msg *msg, *s_msg;
    719 	char *body;
    720 	int err;
    721 
    722 	msg = malloc(sizeof(*msg), M_DEVBUF, M_WAITOK);
    723 	if (msg == NULL)
    724 		return ENOMEM;
    725 
    726 	err = xb_read(&msg->hdr, sizeof(msg->hdr));
    727 	DPRINTK("xb_read hdr %d", err);
    728 	if (err) {
    729 		free(msg, M_DEVBUF);
    730 		return err;
    731 	}
    732 
    733 	body = malloc(msg->hdr.len + 1, M_DEVBUF, M_NOWAIT);
    734 	if (body == NULL) {
    735 		free(msg, M_DEVBUF);
    736 		return ENOMEM;
    737 	}
    738 
    739 	err = xb_read(body, msg->hdr.len);
    740 	DPRINTK("xb_read body %d", err);
    741 	if (err) {
    742 		free(body, M_DEVBUF);
    743 		free(msg, M_DEVBUF);
    744 		return err;
    745 	}
    746 	body[msg->hdr.len] = '\0';
    747 
    748 	if (msg->hdr.type == XS_WATCH_EVENT) {
    749 		bool found, repeated;
    750 
    751 		DPRINTK("process_msg: XS_WATCH_EVENT");
    752 		msg->u.watch.vec = split(body, msg->hdr.len,
    753 					 &msg->u.watch.vec_size);
    754 		if (msg->u.watch.vec == NULL) {
    755 			free(msg, M_DEVBUF);
    756 			return ENOMEM;
    757 		}
    758 
    759 		mutex_enter(&watches_lock);
    760 		msg->u.watch.handle = find_watch(
    761 		    msg->u.watch.vec[XS_WATCH_TOKEN]);
    762 		found = (msg->u.watch.handle != NULL);
    763 		repeated = false;
    764 		if (found) {
    765 			mutex_enter(&watch_events_lock);
    766 			/* Don't add duplicate events to the queue of pending watches */
    767 			SIMPLEQ_FOREACH(s_msg, &watch_events, msg_next) {
    768 				if (s_msg->u.watch.handle == msg->u.watch.handle) {
    769 					repeated = true;
    770 					break;
    771 				}
    772 			}
    773 			if (!repeated) {
    774 				SIMPLEQ_INSERT_TAIL(&watch_events, msg, msg_next);
    775 				cv_broadcast(&watch_cv);
    776 			}
    777 			mutex_exit(&watch_events_lock);
    778 		}
    779 		mutex_exit(&watches_lock);
    780 		if (!found || repeated) {
    781 			free(msg->u.watch.vec, M_DEVBUF);
    782 			free(msg, M_DEVBUF);
    783 		}
    784 	} else {
    785 		DPRINTK("process_msg: type %d body %s", msg->hdr.type, body);
    786 
    787 		msg->u.reply.body = body;
    788 		mutex_enter(&xs_state.reply_lock);
    789 		SIMPLEQ_INSERT_TAIL(&xs_state.reply_list, msg, msg_next);
    790 		cv_broadcast(&xs_state.reply_cv);
    791 		mutex_exit(&xs_state.reply_lock);
    792 	}
    793 
    794 	return 0;
    795 }
    796 
    797 static void
    798 xenbus_thread(void *unused)
    799 {
    800 	int err;
    801 
    802 	for (;;) {
    803 		err = process_msg();
    804 		if (err)
    805 			printk("XENBUS error %d while reading message\n", err);
    806 	}
    807 }
    808 
    809 int
    810 xs_init(device_t dev)
    811 {
    812 	int err;
    813 
    814 	SLIST_INIT(&watches);
    815 	mutex_init(&watches_lock, MUTEX_DEFAULT, IPL_TTY);
    816 
    817 	SIMPLEQ_INIT(&watch_events);
    818 	mutex_init(&watch_events_lock, MUTEX_DEFAULT, IPL_TTY);
    819 	cv_init(&watch_cv, "evtsq");
    820 
    821 	SIMPLEQ_INIT(&xs_state.reply_list);
    822 	mutex_init(&xs_state.xs_lock, MUTEX_DEFAULT, IPL_NONE);
    823 	mutex_init(&xs_state.reply_lock, MUTEX_DEFAULT, IPL_TTY);
    824 	cv_init(&xs_state.reply_cv, "rplq");
    825 
    826 	err = kthread_create(PRI_NONE, KTHREAD_MPSAFE, NULL, xenwatch_thread,
    827 	    NULL, NULL, "xenwatch");
    828 	if (err) {
    829 		aprint_error_dev(dev, "kthread_create(xenwatch): %d\n", err);
    830 		return err;
    831 	}
    832 
    833 	err = kthread_create(PRI_NONE, KTHREAD_MPSAFE, NULL, xenbus_thread,
    834 	    NULL, NULL, "xenbus");
    835 	if (err) {
    836 		aprint_error_dev(dev, "kthread_create(xenbus): %d\n", err);
    837 		return err;
    838 	}
    839 
    840 	return 0;
    841 }
    842 
    843 /*
    844  * Local variables:
    845  *  c-file-style: "linux"
    846  *  indent-tabs-mode: t
    847  *  c-indent-level: 8
    848  *  c-basic-offset: 8
    849  *  tab-width: 8
    850  * End:
    851  */
    852