Home | History | Annotate | Line # | Download | only in librumpuser
rumpuser_sp.c revision 1.25
      1 /*      $NetBSD: rumpuser_sp.c,v 1.25 2010/12/12 17:58:28 pooka Exp $	*/
      2 
      3 /*
      4  * Copyright (c) 2010 Antti Kantee.  All Rights Reserved.
      5  *
      6  * Redistribution and use in source and binary forms, with or without
      7  * modification, are permitted provided that the following conditions
      8  * are met:
      9  * 1. Redistributions of source code must retain the above copyright
     10  *    notice, this list of conditions and the following disclaimer.
     11  * 2. Redistributions in binary form must reproduce the above copyright
     12  *    notice, this list of conditions and the following disclaimer in the
     13  *    documentation and/or other materials provided with the distribution.
     14  *
     15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
     16  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
     17  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
     18  * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
     19  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
     20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
     21  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
     22  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
     23  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
     24  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
     25  * SUCH DAMAGE.
     26  */
     27 
     28 /*
     29  * Sysproxy routines.  This provides system RPC support over host sockets.
     30  * The most notable limitation is that the client and server must share
     31  * the same ABI.  This does not mean that they have to be the same
     32  * machine or that they need to run the same version of the host OS,
     33  * just that they must agree on the data structures.  This even *might*
     34  * work correctly from one hardware architecture to another.
     35  */
     36 
     37 #include <sys/cdefs.h>
     38 __RCSID("$NetBSD: rumpuser_sp.c,v 1.25 2010/12/12 17:58:28 pooka Exp $");
     39 
     40 #include <sys/types.h>
     41 #include <sys/atomic.h>
     42 #include <sys/mman.h>
     43 #include <sys/socket.h>
     44 
     45 #include <arpa/inet.h>
     46 #include <netinet/in.h>
     47 #include <netinet/tcp.h>
     48 
     49 #include <assert.h>
     50 #include <errno.h>
     51 #include <fcntl.h>
     52 #include <poll.h>
     53 #include <pthread.h>
     54 #include <stdarg.h>
     55 #include <stdio.h>
     56 #include <stdlib.h>
     57 #include <string.h>
     58 #include <unistd.h>
     59 
     60 #include <rump/rumpuser.h>
     61 #include "rumpuser_int.h"
     62 
     63 #include "sp_common.c"
     64 
     65 #ifndef MAXCLI
     66 #define MAXCLI 256
     67 #endif
     68 #ifndef MAXWORKER
     69 #define MAXWORKER 128
     70 #endif
     71 #ifndef IDLEWORKER
     72 #define IDLEWORKER 16
     73 #endif
     74 int rumpsp_maxworker = MAXWORKER;
     75 int rumpsp_idleworker = IDLEWORKER;
     76 
     77 static struct pollfd pfdlist[MAXCLI];
     78 static struct spclient spclist[MAXCLI];
     79 static unsigned int disco;
     80 static volatile int spfini;
     81 
     82 static struct rumpuser_sp_ops spops;
     83 
     84 /*
     85  * Manual wrappers, since librump does not have access to the
     86  * user namespace wrapped interfaces.
     87  */
     88 
     89 static void
     90 lwproc_switch(struct lwp *l)
     91 {
     92 
     93 	spops.spop_schedule();
     94 	spops.spop_lwproc_switch(l);
     95 	spops.spop_unschedule();
     96 }
     97 
     98 static void
     99 lwproc_release(void)
    100 {
    101 
    102 	spops.spop_schedule();
    103 	spops.spop_lwproc_release();
    104 	spops.spop_unschedule();
    105 }
    106 
    107 static int
    108 lwproc_newproc(struct spclient *spc)
    109 {
    110 	int rv;
    111 
    112 	spops.spop_schedule();
    113 	rv = spops.spop_lwproc_newproc(spc);
    114 	spops.spop_unschedule();
    115 
    116 	return rv;
    117 }
    118 
    119 static int
    120 lwproc_newlwp(pid_t pid)
    121 {
    122 	int rv;
    123 
    124 	spops.spop_schedule();
    125 	rv = spops.spop_lwproc_newlwp(pid);
    126 	spops.spop_unschedule();
    127 
    128 	return rv;
    129 }
    130 
    131 static struct lwp *
    132 lwproc_curlwp(void)
    133 {
    134 	struct lwp *l;
    135 
    136 	spops.spop_schedule();
    137 	l = spops.spop_lwproc_curlwp();
    138 	spops.spop_unschedule();
    139 
    140 	return l;
    141 }
    142 
    143 static pid_t
    144 lwproc_getpid(void)
    145 {
    146 	pid_t p;
    147 
    148 	spops.spop_schedule();
    149 	p = spops.spop_getpid();
    150 	spops.spop_unschedule();
    151 
    152 	return p;
    153 }
    154 
    155 static int
    156 rumpsyscall(int sysnum, void *data, register_t *retval)
    157 {
    158 	int rv;
    159 
    160 	spops.spop_schedule();
    161 	rv = spops.spop_syscall(sysnum, data, retval);
    162 	spops.spop_unschedule();
    163 
    164 	return rv;
    165 }
    166 
    167 static uint64_t
    168 nextreq(struct spclient *spc)
    169 {
    170 	uint64_t nw;
    171 
    172 	pthread_mutex_lock(&spc->spc_mtx);
    173 	nw = spc->spc_nextreq++;
    174 	pthread_mutex_unlock(&spc->spc_mtx);
    175 
    176 	return nw;
    177 }
    178 
    179 static void
    180 send_error_resp(struct spclient *spc, uint64_t reqno, int error)
    181 {
    182 	struct rsp_hdr rhdr;
    183 
    184 	rhdr.rsp_len = sizeof(rhdr);
    185 	rhdr.rsp_reqno = reqno;
    186 	rhdr.rsp_class = RUMPSP_ERROR;
    187 	rhdr.rsp_type = 0;
    188 	rhdr.rsp_error = error;
    189 
    190 	sendlock(spc);
    191 	(void)dosend(spc, &rhdr, sizeof(rhdr));
    192 	sendunlock(spc);
    193 }
    194 
    195 static int
    196 send_syscall_resp(struct spclient *spc, uint64_t reqno, int error,
    197 	register_t *retval)
    198 {
    199 	struct rsp_hdr rhdr;
    200 	struct rsp_sysresp sysresp;
    201 	int rv;
    202 
    203 	rhdr.rsp_len = sizeof(rhdr) + sizeof(sysresp);
    204 	rhdr.rsp_reqno = reqno;
    205 	rhdr.rsp_class = RUMPSP_RESP;
    206 	rhdr.rsp_type = RUMPSP_SYSCALL;
    207 	rhdr.rsp_sysnum = 0;
    208 
    209 	sysresp.rsys_error = error;
    210 	memcpy(sysresp.rsys_retval, retval, sizeof(sysresp.rsys_retval));
    211 
    212 	sendlock(spc);
    213 	rv = dosend(spc, &rhdr, sizeof(rhdr));
    214 	rv = dosend(spc, &sysresp, sizeof(sysresp));
    215 	sendunlock(spc);
    216 
    217 	return rv;
    218 }
    219 
    220 static int
    221 copyin_req(struct spclient *spc, const void *remaddr, size_t *dlen,
    222 	int wantstr, void **resp)
    223 {
    224 	struct rsp_hdr rhdr;
    225 	struct rsp_copydata copydata;
    226 	struct respwait rw;
    227 	int rv;
    228 
    229 	DPRINTF(("copyin_req: %zu bytes from %p\n", *dlen, remaddr));
    230 
    231 	rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata);
    232 	rhdr.rsp_class = RUMPSP_REQ;
    233 	if (wantstr)
    234 		rhdr.rsp_type = RUMPSP_COPYINSTR;
    235 	else
    236 		rhdr.rsp_type = RUMPSP_COPYIN;
    237 	rhdr.rsp_sysnum = 0;
    238 
    239 	copydata.rcp_addr = __UNCONST(remaddr);
    240 	copydata.rcp_len = *dlen;
    241 
    242 	putwait(spc, &rw, &rhdr);
    243 	rv = dosend(spc, &rhdr, sizeof(rhdr));
    244 	rv = dosend(spc, &copydata, sizeof(copydata));
    245 	if (rv) {
    246 		unputwait(spc, &rw);
    247 		return rv;
    248 	}
    249 
    250 	rv = waitresp(spc, &rw);
    251 
    252 	DPRINTF(("copyin: response %d\n", rv));
    253 
    254 	*resp = rw.rw_data;
    255 	if (wantstr)
    256 		*dlen = rw.rw_dlen;
    257 
    258 	return rv;
    259 
    260 }
    261 
    262 static int
    263 send_copyout_req(struct spclient *spc, const void *remaddr,
    264 	const void *data, size_t dlen)
    265 {
    266 	struct rsp_hdr rhdr;
    267 	struct rsp_copydata copydata;
    268 	int rv;
    269 
    270 	DPRINTF(("copyout_req (async): %zu bytes to %p\n", dlen, remaddr));
    271 
    272 	rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata) + dlen;
    273 	rhdr.rsp_reqno = nextreq(spc);
    274 	rhdr.rsp_class = RUMPSP_REQ;
    275 	rhdr.rsp_type = RUMPSP_COPYOUT;
    276 	rhdr.rsp_sysnum = 0;
    277 
    278 	copydata.rcp_addr = __UNCONST(remaddr);
    279 	copydata.rcp_len = dlen;
    280 
    281 	sendlock(spc);
    282 	rv = dosend(spc, &rhdr, sizeof(rhdr));
    283 	rv = dosend(spc, &copydata, sizeof(copydata));
    284 	rv = dosend(spc, data, dlen);
    285 	sendunlock(spc);
    286 
    287 	return rv;
    288 }
    289 
    290 static int
    291 anonmmap_req(struct spclient *spc, size_t howmuch, void **resp)
    292 {
    293 	struct rsp_hdr rhdr;
    294 	struct respwait rw;
    295 	int rv;
    296 
    297 	DPRINTF(("anonmmap_req: %zu bytes\n", howmuch));
    298 
    299 	rhdr.rsp_len = sizeof(rhdr) + sizeof(howmuch);
    300 	rhdr.rsp_class = RUMPSP_REQ;
    301 	rhdr.rsp_type = RUMPSP_ANONMMAP;
    302 	rhdr.rsp_sysnum = 0;
    303 
    304 	putwait(spc, &rw, &rhdr);
    305 	rv = dosend(spc, &rhdr, sizeof(rhdr));
    306 	rv = dosend(spc, &howmuch, sizeof(howmuch));
    307 	if (rv) {
    308 		unputwait(spc, &rw);
    309 		return rv;
    310 	}
    311 
    312 	rv = waitresp(spc, &rw);
    313 
    314 	*resp = rw.rw_data;
    315 
    316 	DPRINTF(("anonmmap: mapped at %p\n", **(void ***)resp));
    317 
    318 	return rv;
    319 }
    320 
    321 static void
    322 spcref(struct spclient *spc)
    323 {
    324 
    325 	pthread_mutex_lock(&spc->spc_mtx);
    326 	spc->spc_refcnt++;
    327 	pthread_mutex_unlock(&spc->spc_mtx);
    328 }
    329 
    330 static void
    331 spcrelease(struct spclient *spc)
    332 {
    333 	int ref;
    334 
    335 	pthread_mutex_lock(&spc->spc_mtx);
    336 	ref = --spc->spc_refcnt;
    337 	pthread_mutex_unlock(&spc->spc_mtx);
    338 
    339 	if (ref > 0)
    340 		return;
    341 
    342 	DPRINTF(("spcrelease: spc %p fd %d\n", spc, spc->spc_fd));
    343 
    344 	_DIAGASSERT(TAILQ_EMPTY(&spc->spc_respwait));
    345 	_DIAGASSERT(spc->spc_buf == NULL);
    346 
    347 	lwproc_switch(spc->spc_mainlwp);
    348 	lwproc_release();
    349 	spc->spc_mainlwp = NULL;
    350 
    351 	close(spc->spc_fd);
    352 	spc->spc_fd = -1;
    353 	spc->spc_dying = 0;
    354 
    355 	atomic_inc_uint(&disco);
    356 }
    357 
    358 static void
    359 serv_handledisco(unsigned int idx)
    360 {
    361 	struct spclient *spc = &spclist[idx];
    362 
    363 	DPRINTF(("rump_sp: disconnecting [%u]\n", idx));
    364 
    365 	pfdlist[idx].fd = -1;
    366 	pfdlist[idx].revents = 0;
    367 	pthread_mutex_lock(&spc->spc_mtx);
    368 	spc->spc_dying = 1;
    369 	kickall(spc);
    370 	pthread_mutex_unlock(&spc->spc_mtx);
    371 
    372 	/*
    373 	 * Nobody's going to attempt to send/receive anymore,
    374 	 * so reinit info relevant to that.
    375 	 */
    376 	/*LINTED:pointer casts may be ok*/
    377 	memset((char *)spc + SPC_ZEROFF, 0, sizeof(*spc) - SPC_ZEROFF);
    378 
    379 	spcrelease(spc);
    380 }
    381 
    382 static void
    383 serv_shutdown(void)
    384 {
    385 	struct spclient *spc;
    386 	unsigned int i;
    387 
    388 	for (i = 1; i < MAXCLI; i++) {
    389 		spc = &spclist[i];
    390 		if (spc->spc_fd == -1)
    391 			continue;
    392 
    393 		shutdown(spc->spc_fd, SHUT_RDWR);
    394 		serv_handledisco(i);
    395 
    396 		spcrelease(spc);
    397 	}
    398 }
    399 
    400 static unsigned
    401 serv_handleconn(int fd, connecthook_fn connhook, int busy)
    402 {
    403 	struct sockaddr_storage ss;
    404 	socklen_t sl = sizeof(ss);
    405 	int newfd, flags;
    406 	unsigned i;
    407 
    408 	/*LINTED: cast ok */
    409 	newfd = accept(fd, (struct sockaddr *)&ss, &sl);
    410 	if (newfd == -1)
    411 		return 0;
    412 
    413 	if (busy) {
    414 		close(newfd); /* EBUSY */
    415 		return 0;
    416 	}
    417 
    418 	/* XXX: should do some sort of handshake too */
    419 
    420 	flags = fcntl(newfd, F_GETFL, 0);
    421 	if (fcntl(newfd, F_SETFL, flags | O_NONBLOCK) == -1) {
    422 		close(newfd);
    423 		return 0;
    424 	}
    425 
    426 	if (connhook(newfd) != 0) {
    427 		close(newfd);
    428 		return 0;
    429 	}
    430 
    431 	/* find empty slot the simple way */
    432 	for (i = 0; i < MAXCLI; i++) {
    433 		if (pfdlist[i].fd == -1 && spclist[i].spc_dying == 0)
    434 			break;
    435 	}
    436 
    437 	if (lwproc_newproc(&spclist[i]) != 0) {
    438 		close(newfd);
    439 		return 0;
    440 	}
    441 
    442 	assert(i < MAXCLI);
    443 
    444 	pfdlist[i].fd = newfd;
    445 	spclist[i].spc_fd = newfd;
    446 	spclist[i].spc_mainlwp = lwproc_curlwp();
    447 	spclist[i].spc_istatus = SPCSTATUS_BUSY; /* dedicated receiver */
    448 	spclist[i].spc_pid = lwproc_getpid();
    449 	spclist[i].spc_refcnt = 1;
    450 
    451 	TAILQ_INIT(&spclist[i].spc_respwait);
    452 
    453 	DPRINTF(("rump_sp: added new connection fd %d at idx %u, pid %d\n",
    454 	    newfd, i, lwproc_getpid()));
    455 
    456 	lwproc_switch(NULL);
    457 
    458 	return i;
    459 }
    460 
    461 static void
    462 serv_handlesyscall(struct spclient *spc, struct rsp_hdr *rhdr, uint8_t *data)
    463 {
    464 	register_t retval[2] = {0, 0};
    465 	int rv, sysnum;
    466 
    467 	sysnum = (int)rhdr->rsp_sysnum;
    468 	DPRINTF(("rump_sp: handling syscall %d from client %d\n",
    469 	    sysnum, 0));
    470 
    471 	lwproc_newlwp(spc->spc_pid);
    472 	rv = rumpsyscall(sysnum, data, retval);
    473 	lwproc_release();
    474 
    475 	DPRINTF(("rump_sp: got return value %d & %d/%d\n",
    476 	    rv, retval[0], retval[1]));
    477 
    478 	send_syscall_resp(spc, rhdr->rsp_reqno, rv, retval);
    479 }
    480 
    481 struct sysbouncearg {
    482 	struct spclient *sba_spc;
    483 	struct rsp_hdr sba_hdr;
    484 	uint8_t *sba_data;
    485 
    486 	TAILQ_ENTRY(sysbouncearg) sba_entries;
    487 };
    488 static pthread_mutex_t sbamtx;
    489 static pthread_cond_t sbacv;
    490 static int nworker, idleworker;
    491 static TAILQ_HEAD(, sysbouncearg) syslist = TAILQ_HEAD_INITIALIZER(syslist);
    492 
    493 /*ARGSUSED*/
    494 static void *
    495 serv_syscallbouncer(void *arg)
    496 {
    497 	struct sysbouncearg *sba;
    498 
    499 	for (;;) {
    500 		pthread_mutex_lock(&sbamtx);
    501 		if (idleworker >= rumpsp_idleworker) {
    502 			nworker--;
    503 			pthread_mutex_unlock(&sbamtx);
    504 			break;
    505 		}
    506 		idleworker++;
    507 		while (TAILQ_EMPTY(&syslist)) {
    508 			pthread_cond_wait(&sbacv, &sbamtx);
    509 		}
    510 
    511 		sba = TAILQ_FIRST(&syslist);
    512 		TAILQ_REMOVE(&syslist, sba, sba_entries);
    513 		idleworker--;
    514 		pthread_mutex_unlock(&sbamtx);
    515 
    516 		serv_handlesyscall(sba->sba_spc,
    517 		    &sba->sba_hdr, sba->sba_data);
    518 		spcrelease(sba->sba_spc);
    519 		free(sba->sba_data);
    520 		free(sba);
    521 	}
    522 
    523 	return NULL;
    524 }
    525 
/*
 * Copy data from the client's address `raddr' into the local buffer
 * `laddr'.
 *
 * arg:     the struct spclient of the requesting client
 * len:     in: size of the local buffer / number of bytes wanted;
 *          out (string copies only): length reported by the client
 * wantstr: non-zero for a string copy
 *
 * The rump kernel lock is dropped around the request.  Returns 0 on
 * success and EFAULT on any failure, including a client that reports
 * a string longer than the local buffer.
 */
static int
sp_copyin(void *arg, const void *raddr, void *laddr, size_t *len, int wantstr)
{
	struct spclient *spc = arg;
	void *rdata = NULL; /* XXXuninit */
	size_t maxlen = *len;
	int rv, nlocks;

	rumpuser__kunlock(0, &nlocks, NULL);

	rv = copyin_req(spc, raddr, len, wantstr, &rdata);
	if (rv)
		goto out;

	/*
	 * copyin_req() may have replaced *len with a length supplied by
	 * the peer; never copy more than the caller made room for.
	 */
	if (*len > maxlen) {
		free(rdata);
		rv = EFAULT;
		goto out;
	}

	memcpy(laddr, rdata, *len);
	free(rdata);

 out:
	rumpuser__klock(nlocks, NULL);
	if (rv)
		return EFAULT;
	return 0;
}
    548 
/*
 * Plain (non-string) copyin of `len' bytes; see sp_copyin().
 */
int
rumpuser_sp_copyin(void *arg, const void *raddr, void *laddr, size_t len)
{
	const int wantstr = 0;

	return sp_copyin(arg, raddr, laddr, &len, wantstr);
}
    555 
/*
 * String copyin; *len is passed through to sp_copyin(), which may
 * update it.
 */
int
rumpuser_sp_copyinstr(void *arg, const void *raddr, void *laddr, size_t *len)
{
	const int wantstr = 1;

	return sp_copyin(arg, raddr, laddr, len, wantstr);
}
    562 
/*
 * Send `dlen' bytes from the local buffer `laddr' to the client's
 * address `raddr'.  The rump kernel lock is dropped around the send.
 * Returns 0 on success, EFAULT if the request could not be sent.
 */
static int
sp_copyout(void *arg, const void *laddr, void *raddr, size_t dlen)
{
	struct spclient *spc = arg;
	int nlocks, error;

	rumpuser__kunlock(0, &nlocks, NULL);
	error = send_copyout_req(spc, raddr, laddr, dlen);
	rumpuser__klock(nlocks, NULL);

	return error ? EFAULT : 0;
}
    577 
/*
 * Public copyout entry point; forwards to sp_copyout() unchanged.
 */
int
rumpuser_sp_copyout(void *arg, const void *laddr, void *raddr, size_t dlen)
{
	int error;

	error = sp_copyout(arg, laddr, raddr, dlen);
	return error;
}
    584 
/*
 * String copyout: sends *dlen bytes via sp_copyout().  *dlen itself
 * is only read, never updated.
 */
int
rumpuser_sp_copyoutstr(void *arg, const void *laddr, void *raddr, size_t *dlen)
{
	size_t nbytes = *dlen;

	return sp_copyout(arg, laddr, raddr, nbytes);
}
    591 
/*
 * Ask the client for an anonymous mapping of `howmuch' bytes.
 *
 * On a successful request *addr is set to the pointer value found in
 * the response (which may be NULL).  Returns 0 on success, EFAULT if
 * the request failed, ENOMEM if the response carried a NULL address.
 * The rump kernel lock is dropped around the request.
 */
int
rumpuser_sp_anonmmap(void *arg, size_t howmuch, void **addr)
{
	struct spclient *spc = arg;
	void *payload, *mapped;
	int nlocks, error;

	rumpuser__kunlock(0, &nlocks, NULL);

	if (anonmmap_req(spc, howmuch, &payload) != 0) {
		error = EFAULT;
	} else {
		/* the response body is the mapped address itself */
		mapped = *(void **)payload;
		free(payload);

		error = (mapped == NULL) ? ENOMEM : 0;
		*addr = mapped;
	}

	rumpuser__klock(nlocks, NULL);

	return error;
}
    623 
    624 /*
    625  *
    626  * Startup routines and mainloop for server.
    627  *
    628  */
    629 
    630 struct spservarg {
    631 	int sps_sock;
    632 	connecthook_fn sps_connhook;
    633 };
    634 
    635 static pthread_attr_t pattr_detached;
    636 static void
    637 handlereq(struct spclient *spc)
    638 {
    639 	struct sysbouncearg *sba;
    640 	pthread_t pt;
    641 	int retries;
    642 
    643 	if (__predict_false(spc->spc_hdr.rsp_type != RUMPSP_SYSCALL)) {
    644 		send_error_resp(spc, spc->spc_hdr.rsp_reqno, EINVAL);
    645 		spcfreebuf(spc);
    646 		return;
    647 	}
    648 
    649 	retries = 0;
    650 	while ((sba = malloc(sizeof(*sba))) == NULL) {
    651 		if (nworker == 0 || retries > 10) {
    652 			send_error_resp(spc, spc->spc_hdr.rsp_reqno, EAGAIN);
    653 			spcfreebuf(spc);
    654 			return;
    655 		}
    656 		/* slim chance of more memory? */
    657 		usleep(10000);
    658 	}
    659 
    660 	sba->sba_spc = spc;
    661 	sba->sba_hdr = spc->spc_hdr;
    662 	sba->sba_data = spc->spc_buf;
    663 	spcresetbuf(spc);
    664 
    665 	spcref(spc);
    666 
    667 	pthread_mutex_lock(&sbamtx);
    668 	TAILQ_INSERT_TAIL(&syslist, sba, sba_entries);
    669 	if (idleworker > 0) {
    670 		/* do we have a daemon's tool (i.e. idle threads)? */
    671 		pthread_cond_signal(&sbacv);
    672 	} else if (nworker < rumpsp_maxworker) {
    673 		/*
    674 		 * Else, need to create one
    675 		 * (if we can, otherwise just expect another
    676 		 * worker to pick up the syscall)
    677 		 */
    678 		if (pthread_create(&pt, &pattr_detached,
    679 		    serv_syscallbouncer, NULL) == 0)
    680 			nworker++;
    681 	}
    682 	pthread_mutex_unlock(&sbamtx);
    683 }
    684 
    685 static void *
    686 spserver(void *arg)
    687 {
    688 	struct spservarg *sarg = arg;
    689 	struct spclient *spc;
    690 	unsigned idx;
    691 	int seen;
    692 	int rv;
    693 	unsigned int nfds, maxidx;
    694 
    695 	for (idx = 0; idx < MAXCLI; idx++) {
    696 		pfdlist[idx].fd = -1;
    697 		pfdlist[idx].events = POLLIN;
    698 
    699 		spc = &spclist[idx];
    700 		pthread_mutex_init(&spc->spc_mtx, NULL);
    701 		pthread_cond_init(&spc->spc_cv, NULL);
    702 		spc->spc_fd = -1;
    703 	}
    704 	pfdlist[0].fd = spclist[0].spc_fd = sarg->sps_sock;
    705 	pfdlist[0].events = POLLIN;
    706 	nfds = 1;
    707 	maxidx = 0;
    708 
    709 	pthread_attr_init(&pattr_detached);
    710 	pthread_attr_setdetachstate(&pattr_detached, PTHREAD_CREATE_DETACHED);
    711 	/* XXX: doesn't stacksize currently work on NetBSD */
    712 	pthread_attr_setstacksize(&pattr_detached, 32*1024);
    713 
    714 	pthread_mutex_init(&sbamtx, NULL);
    715 	pthread_cond_init(&sbacv, NULL);
    716 
    717 	DPRINTF(("rump_sp: server mainloop\n"));
    718 
    719 	for (;;) {
    720 		int discoed;
    721 
    722 		/* g/c hangarounds (eventually) */
    723 		discoed = atomic_swap_uint(&disco, 0);
    724 		while (discoed--) {
    725 			nfds--;
    726 			idx = maxidx;
    727 			while (idx) {
    728 				if (pfdlist[idx].fd != -1) {
    729 					maxidx = idx;
    730 					break;
    731 				}
    732 				idx--;
    733 			}
    734 			DPRINTF(("rump_sp: set maxidx to [%u]\n",
    735 			    maxidx));
    736 		}
    737 
    738 		DPRINTF(("rump_sp: loop nfd %d\n", maxidx+1));
    739 		seen = 0;
    740 		rv = poll(pfdlist, maxidx+1, INFTIM);
    741 		assert(maxidx+1 <= MAXCLI);
    742 		assert(rv != 0);
    743 		if (rv == -1) {
    744 			if (errno == EINTR)
    745 				continue;
    746 			fprintf(stderr, "rump_spserver: poll returned %d\n",
    747 			    errno);
    748 			break;
    749 		}
    750 
    751 		for (idx = 0; seen < rv && idx < MAXCLI; idx++) {
    752 			if ((pfdlist[idx].revents & POLLIN) == 0)
    753 				continue;
    754 
    755 			seen++;
    756 			DPRINTF(("rump_sp: activity at [%u] %d/%d\n",
    757 			    idx, seen, rv));
    758 			if (idx > 0) {
    759 				spc = &spclist[idx];
    760 				DPRINTF(("rump_sp: mainloop read [%u]\n", idx));
    761 				switch (readframe(spc)) {
    762 				case 0:
    763 					break;
    764 				case -1:
    765 					serv_handledisco(idx);
    766 					break;
    767 				default:
    768 					switch (spc->spc_hdr.rsp_class) {
    769 					case RUMPSP_RESP:
    770 						kickwaiter(spc);
    771 						break;
    772 					case RUMPSP_REQ:
    773 						handlereq(spc);
    774 						break;
    775 					default:
    776 						send_error_resp(spc,
    777 						    spc->spc_hdr.rsp_reqno,
    778 						    ENOENT);
    779 						spcfreebuf(spc);
    780 						break;
    781 					}
    782 					break;
    783 				}
    784 
    785 			} else {
    786 				DPRINTF(("rump_sp: mainloop new connection\n"));
    787 
    788 				if (__predict_false(spfini)) {
    789 					close(spclist[0].spc_fd);
    790 					serv_shutdown();
    791 					goto out;
    792 				}
    793 
    794 				idx = serv_handleconn(pfdlist[0].fd,
    795 				    sarg->sps_connhook, nfds == MAXCLI);
    796 				if (idx)
    797 					nfds++;
    798 				if (idx > maxidx)
    799 					maxidx = idx;
    800 				DPRINTF(("rump_sp: maxid now %d\n", maxidx));
    801 			}
    802 		}
    803 	}
    804 
    805  out:
    806 	return NULL;
    807 }
    808 
    809 static unsigned cleanupidx;
    810 static struct sockaddr *cleanupsa;
    811 int
    812 rumpuser_sp_init(const struct rumpuser_sp_ops *spopsp, const char *url)
    813 {
    814 	pthread_t pt;
    815 	struct spservarg *sarg;
    816 	struct sockaddr *sap;
    817 	char *p;
    818 	unsigned idx;
    819 	int error, s;
    820 
    821 	p = strdup(url);
    822 	if (p == NULL)
    823 		return ENOMEM;
    824 	error = parseurl(p, &sap, &idx, 1);
    825 	free(p);
    826 	if (error)
    827 		return error;
    828 
    829 	s = socket(parsetab[idx].domain, SOCK_STREAM, 0);
    830 	if (s == -1)
    831 		return errno;
    832 
    833 	spops = *spopsp;
    834 	sarg = malloc(sizeof(*sarg));
    835 	if (sarg == NULL) {
    836 		close(s);
    837 		return ENOMEM;
    838 	}
    839 
    840 	sarg->sps_sock = s;
    841 	sarg->sps_connhook = parsetab[idx].connhook;
    842 
    843 	cleanupidx = idx;
    844 	cleanupsa = sap;
    845 
    846 	/* sloppy error recovery */
    847 
    848 	/*LINTED*/
    849 	if (bind(s, sap, sap->sa_len) == -1) {
    850 		fprintf(stderr, "rump_sp: server bind failed\n");
    851 		return errno;
    852 	}
    853 
    854 	if (listen(s, MAXCLI) == -1) {
    855 		fprintf(stderr, "rump_sp: server listen failed\n");
    856 		return errno;
    857 	}
    858 
    859 	if ((error = pthread_create(&pt, NULL, spserver, sarg)) != 0) {
    860 		fprintf(stderr, "rump_sp: cannot create wrkr thread\n");
    861 		return errno;
    862 	}
    863 	pthread_detach(pt);
    864 
    865 	return 0;
    866 }
    867 
    868 void
    869 rumpuser_sp_fini()
    870 {
    871 
    872 	if (spclist[0].spc_fd) {
    873 		parsetab[cleanupidx].cleanup(cleanupsa);
    874 		shutdown(spclist[0].spc_fd, SHUT_RDWR);
    875 		spfini = 1;
    876 	}
    877 }
    878