rumpuser_sp.c revision 1.31 1 /* $NetBSD: rumpuser_sp.c,v 1.31 2011/01/06 06:57:14 pooka Exp $ */
2
3 /*
4 * Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
16 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18 * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
21 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25 * SUCH DAMAGE.
26 */
27
28 /*
29 * Sysproxy routines. This provides system RPC support over host sockets.
30 * The most notable limitation is that the client and server must share
31 * the same ABI. This does not mean that they have to be the same
32 * machine or that they need to run the same version of the host OS,
33 * just that they must agree on the data structures. This even *might*
34 * work correctly from one hardware architecture to another.
35 */
36
37 #include <sys/cdefs.h>
38 __RCSID("$NetBSD: rumpuser_sp.c,v 1.31 2011/01/06 06:57:14 pooka Exp $");
39
40 #include <sys/types.h>
41 #include <sys/atomic.h>
42 #include <sys/mman.h>
43 #include <sys/socket.h>
44
45 #include <arpa/inet.h>
46 #include <netinet/in.h>
47 #include <netinet/tcp.h>
48
49 #include <assert.h>
50 #include <errno.h>
51 #include <fcntl.h>
52 #include <poll.h>
53 #include <pthread.h>
54 #include <stdarg.h>
55 #include <stdio.h>
56 #include <stdlib.h>
57 #include <string.h>
58 #include <unistd.h>
59
60 #include <rump/rump.h> /* XXX: for rfork flags */
61 #include <rump/rumpuser.h>
62 #include "rumpuser_int.h"
63
64 #include "sp_common.c"
65
66 #ifndef MAXCLI
67 #define MAXCLI 256
68 #endif
69 #ifndef MAXWORKER
70 #define MAXWORKER 128
71 #endif
72 #ifndef IDLEWORKER
73 #define IDLEWORKER 16
74 #endif
75 int rumpsp_maxworker = MAXWORKER;
76 int rumpsp_idleworker = IDLEWORKER;
77
78 static struct pollfd pfdlist[MAXCLI];
79 static struct spclient spclist[MAXCLI];
80 static unsigned int disco;
81 static volatile int spfini;
82
83 static struct rumpuser_sp_ops spops;
84
85 static char banner[MAXBANNER];
86
87 #define PROTOMAJOR 0
88 #define PROTOMINOR 1
89
90 struct prefork {
91 uint32_t pf_auth[AUTHLEN];
92 struct lwp *pf_lwp;
93
94 LIST_ENTRY(prefork) pf_entries; /* global list */
95 LIST_ENTRY(prefork) pf_spcentries; /* linked from forking spc */
96 };
97 static LIST_HEAD(, prefork) preforks = LIST_HEAD_INITIALIZER(preforks);
98 static pthread_mutex_t pfmtx;
99
100 /*
101 * This version is for the server. It's optimized for multiple threads
102 * and is *NOT* reentrant wrt to signals
103 */
104 static int
105 waitresp(struct spclient *spc, struct respwait *rw)
106 {
107 struct pollfd pfd;
108 int rv = 0;
109
110 sendunlockl(spc);
111
112 rw->rw_error = 0;
113 while (rw->rw_data == NULL && rw->rw_error == 0
114 && spc->spc_state != SPCSTATE_DYING){
115 /* are we free to receive? */
116 if (spc->spc_istatus == SPCSTATUS_FREE) {
117 int gotresp;
118
119 spc->spc_istatus = SPCSTATUS_BUSY;
120 pthread_mutex_unlock(&spc->spc_mtx);
121
122 pfd.fd = spc->spc_fd;
123 pfd.events = POLLIN;
124
125 for (gotresp = 0; !gotresp; ) {
126 switch (readframe(spc)) {
127 case 0:
128 poll(&pfd, 1, INFTIM);
129 continue;
130 case -1:
131 rv = errno;
132 spc->spc_state = SPCSTATE_DYING;
133 goto cleanup;
134 default:
135 break;
136 }
137
138 switch (spc->spc_hdr.rsp_class) {
139 case RUMPSP_RESP:
140 case RUMPSP_ERROR:
141 kickwaiter(spc);
142 gotresp = spc->spc_hdr.rsp_reqno ==
143 rw->rw_reqno;
144 break;
145 case RUMPSP_REQ:
146 handlereq(spc);
147 break;
148 default:
149 /* panic */
150 break;
151 }
152 }
153 cleanup:
154 pthread_mutex_lock(&spc->spc_mtx);
155 if (spc->spc_istatus == SPCSTATUS_WANTED)
156 kickall(spc);
157 spc->spc_istatus = SPCSTATUS_FREE;
158 } else {
159 spc->spc_istatus = SPCSTATUS_WANTED;
160 pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx);
161 }
162 }
163
164 TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries);
165 pthread_mutex_unlock(&spc->spc_mtx);
166
167 pthread_cond_destroy(&rw->rw_cv);
168
169 if (rv)
170 return rv;
171 if (spc->spc_state == SPCSTATE_DYING)
172 return ENOTCONN;
173 return rw->rw_error;
174 }
175
176 /*
177 * Manual wrappers, since librump does not have access to the
178 * user namespace wrapped interfaces.
179 */
180
181 static void
182 lwproc_switch(struct lwp *l)
183 {
184
185 spops.spop_schedule();
186 spops.spop_lwproc_switch(l);
187 spops.spop_unschedule();
188 }
189
190 static void
191 lwproc_release(void)
192 {
193
194 spops.spop_schedule();
195 spops.spop_lwproc_release();
196 spops.spop_unschedule();
197 }
198
199 static int
200 lwproc_rfork(struct spclient *spc, int flags)
201 {
202 int rv;
203
204 spops.spop_schedule();
205 rv = spops.spop_lwproc_rfork(spc, flags);
206 spops.spop_unschedule();
207
208 return rv;
209 }
210
211 static int
212 lwproc_newlwp(pid_t pid)
213 {
214 int rv;
215
216 spops.spop_schedule();
217 rv = spops.spop_lwproc_newlwp(pid);
218 spops.spop_unschedule();
219
220 return rv;
221 }
222
223 static struct lwp *
224 lwproc_curlwp(void)
225 {
226 struct lwp *l;
227
228 spops.spop_schedule();
229 l = spops.spop_lwproc_curlwp();
230 spops.spop_unschedule();
231
232 return l;
233 }
234
235 static pid_t
236 lwproc_getpid(void)
237 {
238 pid_t p;
239
240 spops.spop_schedule();
241 p = spops.spop_getpid();
242 spops.spop_unschedule();
243
244 return p;
245 }
246
247 static int
248 rumpsyscall(int sysnum, void *data, register_t *retval)
249 {
250 int rv;
251
252 spops.spop_schedule();
253 rv = spops.spop_syscall(sysnum, data, retval);
254 spops.spop_unschedule();
255
256 return rv;
257 }
258
259 static uint64_t
260 nextreq(struct spclient *spc)
261 {
262 uint64_t nw;
263
264 pthread_mutex_lock(&spc->spc_mtx);
265 nw = spc->spc_nextreq++;
266 pthread_mutex_unlock(&spc->spc_mtx);
267
268 return nw;
269 }
270
271 static void
272 send_error_resp(struct spclient *spc, uint64_t reqno, int error)
273 {
274 struct rsp_hdr rhdr;
275
276 rhdr.rsp_len = sizeof(rhdr);
277 rhdr.rsp_reqno = reqno;
278 rhdr.rsp_class = RUMPSP_ERROR;
279 rhdr.rsp_type = 0;
280 rhdr.rsp_error = error;
281
282 sendlock(spc);
283 (void)dosend(spc, &rhdr, sizeof(rhdr));
284 sendunlock(spc);
285 }
286
287 static int
288 send_handshake_resp(struct spclient *spc, uint64_t reqno, int error)
289 {
290 struct rsp_hdr rhdr;
291 int rv;
292
293 rhdr.rsp_len = sizeof(rhdr) + sizeof(error);
294 rhdr.rsp_reqno = reqno;
295 rhdr.rsp_class = RUMPSP_RESP;
296 rhdr.rsp_type = RUMPSP_HANDSHAKE;
297 rhdr.rsp_error = 0;
298
299 sendlock(spc);
300 rv = dosend(spc, &rhdr, sizeof(rhdr));
301 rv = dosend(spc, &error, sizeof(error));
302 sendunlock(spc);
303
304 return rv;
305 }
306
307 static int
308 send_syscall_resp(struct spclient *spc, uint64_t reqno, int error,
309 register_t *retval)
310 {
311 struct rsp_hdr rhdr;
312 struct rsp_sysresp sysresp;
313 int rv;
314
315 rhdr.rsp_len = sizeof(rhdr) + sizeof(sysresp);
316 rhdr.rsp_reqno = reqno;
317 rhdr.rsp_class = RUMPSP_RESP;
318 rhdr.rsp_type = RUMPSP_SYSCALL;
319 rhdr.rsp_sysnum = 0;
320
321 sysresp.rsys_error = error;
322 memcpy(sysresp.rsys_retval, retval, sizeof(sysresp.rsys_retval));
323
324 sendlock(spc);
325 rv = dosend(spc, &rhdr, sizeof(rhdr));
326 rv = dosend(spc, &sysresp, sizeof(sysresp));
327 sendunlock(spc);
328
329 return rv;
330 }
331
332 static int
333 send_prefork_resp(struct spclient *spc, uint64_t reqno, uint32_t *auth)
334 {
335 struct rsp_hdr rhdr;
336 int rv;
337
338 rhdr.rsp_len = sizeof(rhdr) + AUTHLEN*sizeof(*auth);
339 rhdr.rsp_reqno = reqno;
340 rhdr.rsp_class = RUMPSP_RESP;
341 rhdr.rsp_type = RUMPSP_PREFORK;
342 rhdr.rsp_sysnum = 0;
343
344 sendlock(spc);
345 rv = dosend(spc, &rhdr, sizeof(rhdr));
346 rv = dosend(spc, auth, AUTHLEN*sizeof(*auth));
347 sendunlock(spc);
348
349 return rv;
350 }
351
352 static int
353 copyin_req(struct spclient *spc, const void *remaddr, size_t *dlen,
354 int wantstr, void **resp)
355 {
356 struct rsp_hdr rhdr;
357 struct rsp_copydata copydata;
358 struct respwait rw;
359 int rv;
360
361 DPRINTF(("copyin_req: %zu bytes from %p\n", *dlen, remaddr));
362
363 rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata);
364 rhdr.rsp_class = RUMPSP_REQ;
365 if (wantstr)
366 rhdr.rsp_type = RUMPSP_COPYINSTR;
367 else
368 rhdr.rsp_type = RUMPSP_COPYIN;
369 rhdr.rsp_sysnum = 0;
370
371 copydata.rcp_addr = __UNCONST(remaddr);
372 copydata.rcp_len = *dlen;
373
374 putwait(spc, &rw, &rhdr);
375 rv = dosend(spc, &rhdr, sizeof(rhdr));
376 rv = dosend(spc, ©data, sizeof(copydata));
377 if (rv) {
378 unputwait(spc, &rw);
379 return rv;
380 }
381
382 rv = waitresp(spc, &rw);
383
384 DPRINTF(("copyin: response %d\n", rv));
385
386 *resp = rw.rw_data;
387 if (wantstr)
388 *dlen = rw.rw_dlen;
389
390 return rv;
391
392 }
393
394 static int
395 send_copyout_req(struct spclient *spc, const void *remaddr,
396 const void *data, size_t dlen)
397 {
398 struct rsp_hdr rhdr;
399 struct rsp_copydata copydata;
400 int rv;
401
402 DPRINTF(("copyout_req (async): %zu bytes to %p\n", dlen, remaddr));
403
404 rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata) + dlen;
405 rhdr.rsp_reqno = nextreq(spc);
406 rhdr.rsp_class = RUMPSP_REQ;
407 rhdr.rsp_type = RUMPSP_COPYOUT;
408 rhdr.rsp_sysnum = 0;
409
410 copydata.rcp_addr = __UNCONST(remaddr);
411 copydata.rcp_len = dlen;
412
413 sendlock(spc);
414 rv = dosend(spc, &rhdr, sizeof(rhdr));
415 rv = dosend(spc, ©data, sizeof(copydata));
416 rv = dosend(spc, data, dlen);
417 sendunlock(spc);
418
419 return rv;
420 }
421
422 static int
423 anonmmap_req(struct spclient *spc, size_t howmuch, void **resp)
424 {
425 struct rsp_hdr rhdr;
426 struct respwait rw;
427 int rv;
428
429 DPRINTF(("anonmmap_req: %zu bytes\n", howmuch));
430
431 rhdr.rsp_len = sizeof(rhdr) + sizeof(howmuch);
432 rhdr.rsp_class = RUMPSP_REQ;
433 rhdr.rsp_type = RUMPSP_ANONMMAP;
434 rhdr.rsp_sysnum = 0;
435
436 putwait(spc, &rw, &rhdr);
437 rv = dosend(spc, &rhdr, sizeof(rhdr));
438 rv = dosend(spc, &howmuch, sizeof(howmuch));
439 if (rv) {
440 unputwait(spc, &rw);
441 return rv;
442 }
443
444 rv = waitresp(spc, &rw);
445
446 *resp = rw.rw_data;
447
448 DPRINTF(("anonmmap: mapped at %p\n", **(void ***)resp));
449
450 return rv;
451 }
452
453 static void
454 spcref(struct spclient *spc)
455 {
456
457 pthread_mutex_lock(&spc->spc_mtx);
458 spc->spc_refcnt++;
459 pthread_mutex_unlock(&spc->spc_mtx);
460 }
461
462 static void
463 spcrelease(struct spclient *spc)
464 {
465 int ref;
466
467 pthread_mutex_lock(&spc->spc_mtx);
468 ref = --spc->spc_refcnt;
469 pthread_mutex_unlock(&spc->spc_mtx);
470
471 if (ref > 0)
472 return;
473
474 DPRINTF(("rump_sp: spcrelease: spc %p fd %d\n", spc, spc->spc_fd));
475
476 _DIAGASSERT(TAILQ_EMPTY(&spc->spc_respwait));
477 _DIAGASSERT(spc->spc_buf == NULL);
478
479 if (spc->spc_mainlwp) {
480 lwproc_switch(spc->spc_mainlwp);
481 lwproc_release();
482 }
483 spc->spc_mainlwp = NULL;
484
485 close(spc->spc_fd);
486 spc->spc_fd = -1;
487 spc->spc_state = SPCSTATE_NEW;
488
489 atomic_inc_uint(&disco);
490 }
491
492 static void
493 serv_handledisco(unsigned int idx)
494 {
495 struct spclient *spc = &spclist[idx];
496
497 DPRINTF(("rump_sp: disconnecting [%u]\n", idx));
498
499 pfdlist[idx].fd = -1;
500 pfdlist[idx].revents = 0;
501 pthread_mutex_lock(&spc->spc_mtx);
502 spc->spc_state = SPCSTATE_DYING;
503 kickall(spc);
504 sendunlockl(spc);
505 pthread_mutex_unlock(&spc->spc_mtx);
506
507 /*
508 * Nobody's going to attempt to send/receive anymore,
509 * so reinit info relevant to that.
510 */
511 /*LINTED:pointer casts may be ok*/
512 memset((char *)spc + SPC_ZEROFF, 0, sizeof(*spc) - SPC_ZEROFF);
513
514 spcrelease(spc);
515 }
516
517 static void
518 serv_shutdown(void)
519 {
520 struct spclient *spc;
521 unsigned int i;
522
523 for (i = 1; i < MAXCLI; i++) {
524 spc = &spclist[i];
525 if (spc->spc_fd == -1)
526 continue;
527
528 shutdown(spc->spc_fd, SHUT_RDWR);
529 serv_handledisco(i);
530
531 spcrelease(spc);
532 }
533 }
534
535 static unsigned
536 serv_handleconn(int fd, connecthook_fn connhook, int busy)
537 {
538 struct sockaddr_storage ss;
539 socklen_t sl = sizeof(ss);
540 int newfd, flags;
541 unsigned i;
542
543 /*LINTED: cast ok */
544 newfd = accept(fd, (struct sockaddr *)&ss, &sl);
545 if (newfd == -1)
546 return 0;
547
548 if (busy) {
549 close(newfd); /* EBUSY */
550 return 0;
551 }
552
553 flags = fcntl(newfd, F_GETFL, 0);
554 if (fcntl(newfd, F_SETFL, flags | O_NONBLOCK) == -1) {
555 close(newfd);
556 return 0;
557 }
558
559 if (connhook(newfd) != 0) {
560 close(newfd);
561 return 0;
562 }
563
564 /* write out a banner for the client */
565 if (write(newfd, banner, strlen(banner)) != (ssize_t)strlen(banner)) {
566 close(newfd);
567 return 0;
568 }
569
570 /* find empty slot the simple way */
571 for (i = 0; i < MAXCLI; i++) {
572 if (pfdlist[i].fd == -1 && spclist[i].spc_state == SPCSTATE_NEW)
573 break;
574 }
575
576 assert(i < MAXCLI);
577
578 pfdlist[i].fd = newfd;
579 spclist[i].spc_fd = newfd;
580 spclist[i].spc_istatus = SPCSTATUS_BUSY; /* dedicated receiver */
581 spclist[i].spc_refcnt = 1;
582
583 TAILQ_INIT(&spclist[i].spc_respwait);
584
585 DPRINTF(("rump_sp: added new connection fd %d at idx %u\n", newfd, i));
586
587 return i;
588 }
589
590 static void
591 serv_handlesyscall(struct spclient *spc, struct rsp_hdr *rhdr, uint8_t *data)
592 {
593 register_t retval[2] = {0, 0};
594 int rv, sysnum;
595
596 sysnum = (int)rhdr->rsp_sysnum;
597 DPRINTF(("rump_sp: handling syscall %d from client %d\n",
598 sysnum, spc->spc_pid));
599
600 lwproc_newlwp(spc->spc_pid);
601 rv = rumpsyscall(sysnum, data, retval);
602 lwproc_release();
603
604 DPRINTF(("rump_sp: got return value %d & %d/%d\n",
605 rv, retval[0], retval[1]));
606
607 send_syscall_resp(spc, rhdr->rsp_reqno, rv, retval);
608 }
609
610 struct sysbouncearg {
611 struct spclient *sba_spc;
612 struct rsp_hdr sba_hdr;
613 uint8_t *sba_data;
614
615 TAILQ_ENTRY(sysbouncearg) sba_entries;
616 };
617 static pthread_mutex_t sbamtx;
618 static pthread_cond_t sbacv;
619 static int nworker, idleworker;
620 static TAILQ_HEAD(, sysbouncearg) syslist = TAILQ_HEAD_INITIALIZER(syslist);
621
622 /*ARGSUSED*/
623 static void *
624 serv_syscallbouncer(void *arg)
625 {
626 struct sysbouncearg *sba;
627
628 for (;;) {
629 pthread_mutex_lock(&sbamtx);
630 if (idleworker >= rumpsp_idleworker) {
631 nworker--;
632 pthread_mutex_unlock(&sbamtx);
633 break;
634 }
635 idleworker++;
636 while (TAILQ_EMPTY(&syslist)) {
637 pthread_cond_wait(&sbacv, &sbamtx);
638 }
639
640 sba = TAILQ_FIRST(&syslist);
641 TAILQ_REMOVE(&syslist, sba, sba_entries);
642 idleworker--;
643 pthread_mutex_unlock(&sbamtx);
644
645 serv_handlesyscall(sba->sba_spc,
646 &sba->sba_hdr, sba->sba_data);
647 spcrelease(sba->sba_spc);
648 free(sba->sba_data);
649 free(sba);
650 }
651
652 return NULL;
653 }
654
/*
 * Common code for copyin and copyinstr: fetch data from the client
 * address raddr into the local buffer laddr.  *len is the request
 * length; copyin_req() may update it for string requests, and that
 * many bytes are then copied.
 *
 * rumpuser__kunlock() is called before the request and
 * rumpuser__klock() after it.  Returns 0 on success and EFAULT on any
 * failure.
 */
static int
sp_copyin(void *arg, const void *raddr, void *laddr, size_t *len, int wantstr)
{
	struct spclient *spc = arg;
	void *rdata = NULL; /* XXXuninit */
	int error, nlocks;

	rumpuser__kunlock(0, &nlocks, NULL);

	error = copyin_req(spc, raddr, len, wantstr, &rdata);
	if (error == 0) {
		memcpy(laddr, rdata, *len);
		free(rdata);
	}

	rumpuser__klock(nlocks, NULL);

	return error ? EFAULT : 0;
}
677
/*
 * Copy len bytes from client address raddr to local address laddr.
 * arg is the client handle.  Returns 0 or EFAULT.
 */
int
rumpuser_sp_copyin(void *arg, const void *raddr, void *laddr, size_t len)
{
	size_t nbytes = len;

	return sp_copyin(arg, raddr, laddr, &nbytes, 0);
}
684
/*
 * Copy a string from client address raddr to local address laddr.
 * *len is passed as the request length and may be updated by the
 * request.  arg is the client handle.  Returns 0 or EFAULT.
 */
int
rumpuser_sp_copyinstr(void *arg, const void *raddr, void *laddr, size_t *len)
{
	const int wantstr = 1;

	return sp_copyin(arg, raddr, laddr, len, wantstr);
}
691
/*
 * Common code for copyout and copyoutstr: send dlen bytes from the
 * local buffer laddr to the client address raddr.
 *
 * rumpuser__kunlock() is called before the request and
 * rumpuser__klock() after it.  Returns 0 on success and EFAULT if the
 * request could not be sent.
 */
static int
sp_copyout(void *arg, const void *laddr, void *raddr, size_t dlen)
{
	struct spclient *spc = arg;
	int error, nlocks;

	rumpuser__kunlock(0, &nlocks, NULL);
	error = send_copyout_req(spc, raddr, laddr, dlen);
	rumpuser__klock(nlocks, NULL);

	return error != 0 ? EFAULT : 0;
}
706
/*
 * Copy dlen bytes from local address laddr to client address raddr.
 * arg is the client handle.  Returns 0 or EFAULT.
 */
int
rumpuser_sp_copyout(void *arg, const void *laddr, void *raddr, size_t dlen)
{
	int error;

	error = sp_copyout(arg, laddr, raddr, dlen);
	return error;
}
713
/*
 * Copy a string of *dlen bytes from local address laddr to client
 * address raddr.  *dlen is only read.  arg is the client handle.
 * Returns 0 or EFAULT.
 */
int
rumpuser_sp_copyoutstr(void *arg, const void *laddr, void *raddr, size_t *dlen)
{
	size_t nbytes = *dlen;

	return sp_copyout(arg, laddr, raddr, nbytes);
}
720
/*
 * Have the client create an anonymous mapping of howmuch bytes and
 * store the address it reports in *addr.  arg is the client handle.
 *
 * rumpuser__kunlock() is called before the request and
 * rumpuser__klock() after it.  Returns 0 on success, EFAULT if the
 * request failed (*addr is then left alone) and ENOMEM if the client
 * reported a NULL address.
 */
int
rumpuser_sp_anonmmap(void *arg, size_t howmuch, void **addr)
{
	struct spclient *spc = arg;
	void *mapped, *rdata;
	int nlocks, error;

	rumpuser__kunlock(0, &nlocks, NULL);

	if (anonmmap_req(spc, howmuch, &rdata) != 0) {
		error = EFAULT;
	} else {
		/* the response payload is the address itself */
		mapped = *(void **)rdata;
		free(rdata);

		error = (mapped == NULL) ? ENOMEM : 0;
		*addr = mapped;
	}

	rumpuser__klock(nlocks, NULL);

	return error;
}
752
753 /*
754 *
755 * Startup routines and mainloop for server.
756 *
757 */
758
759 struct spservarg {
760 int sps_sock;
761 connecthook_fn sps_connhook;
762 };
763
764 static pthread_attr_t pattr_detached;
765 static void
766 handlereq(struct spclient *spc)
767 {
768 struct sysbouncearg *sba;
769 pthread_t pt;
770 int retries, error, i;
771
772 if (__predict_false(spc->spc_state == SPCSTATE_NEW)) {
773 if (spc->spc_hdr.rsp_type != RUMPSP_HANDSHAKE) {
774 send_error_resp(spc, spc->spc_hdr.rsp_reqno, EAUTH);
775 shutdown(spc->spc_fd, SHUT_RDWR);
776 spcfreebuf(spc);
777 return;
778 }
779
780 if (spc->spc_hdr.rsp_handshake == HANDSHAKE_GUEST) {
781 if ((error = lwproc_rfork(spc, RUMP_RFCFDG)) != 0) {
782 shutdown(spc->spc_fd, SHUT_RDWR);
783 }
784
785 spcfreebuf(spc);
786 if (error)
787 return;
788
789 spc->spc_mainlwp = lwproc_curlwp();
790
791 send_handshake_resp(spc, spc->spc_hdr.rsp_reqno, 0);
792 } else if (spc->spc_hdr.rsp_handshake == HANDSHAKE_FORK) {
793 struct lwp *tmpmain;
794 struct prefork *pf;
795 struct handshake_fork *rfp;
796 uint64_t reqno;
797 int cancel;
798
799 reqno = spc->spc_hdr.rsp_reqno;
800 if (spc->spc_off-HDRSZ != sizeof(*rfp)) {
801 send_error_resp(spc, reqno, EINVAL);
802 shutdown(spc->spc_fd, SHUT_RDWR);
803 spcfreebuf(spc);
804 return;
805 }
806
807 /*LINTED*/
808 rfp = (void *)spc->spc_buf;
809 cancel = rfp->rf_cancel;
810
811 pthread_mutex_lock(&pfmtx);
812 LIST_FOREACH(pf, &preforks, pf_entries) {
813 if (memcmp(rfp->rf_auth, pf->pf_auth,
814 sizeof(rfp->rf_auth)) == 0) {
815 LIST_REMOVE(pf, pf_entries);
816 LIST_REMOVE(pf, pf_spcentries);
817 break;
818 }
819 }
820 pthread_mutex_lock(&pfmtx);
821 spcfreebuf(spc);
822
823 if (!pf) {
824 send_error_resp(spc, reqno, ESRCH);
825 shutdown(spc->spc_fd, SHUT_RDWR);
826 return;
827 }
828
829 tmpmain = pf->pf_lwp;
830 free(pf);
831 lwproc_switch(tmpmain);
832 if (cancel) {
833 lwproc_release();
834 shutdown(spc->spc_fd, SHUT_RDWR);
835 return;
836 }
837
838 /*
839 * So, we forked already during "prefork" to save
840 * the file descriptors from a parent exit
841 * race condition. But now we need to fork
842 * a second time since the initial fork has
843 * the wrong spc pointer. (yea, optimize
844 * interfaces some day if anyone cares)
845 */
846 if ((error = lwproc_rfork(spc, 0)) != 0) {
847 send_error_resp(spc, reqno, error);
848 shutdown(spc->spc_fd, SHUT_RDWR);
849 lwproc_release();
850 return;
851 }
852 spc->spc_mainlwp = lwproc_curlwp();
853 lwproc_switch(tmpmain);
854 lwproc_release();
855 lwproc_switch(spc->spc_mainlwp);
856
857 send_handshake_resp(spc, reqno, 0);
858 }
859
860 spc->spc_pid = lwproc_getpid();
861
862 DPRINTF(("rump_sp: handshake for client %p complete, pid %d\n",
863 spc, spc->spc_pid));
864
865 lwproc_switch(NULL);
866 spc->spc_state = SPCSTATE_RUNNING;
867 return;
868 }
869
870 if (__predict_false(spc->spc_hdr.rsp_type == RUMPSP_PREFORK)) {
871 struct prefork *pf;
872 uint64_t reqno;
873 uint32_t auth[AUTHLEN];
874
875 DPRINTF(("rump_sp: prefork handler executing for %p\n", spc));
876 reqno = spc->spc_hdr.rsp_reqno;
877 spcfreebuf(spc);
878
879 pf = malloc(sizeof(*pf));
880 if (pf == NULL) {
881 send_error_resp(spc, reqno, ENOMEM);
882 return;
883 }
884
885 /*
886 * Use client main lwp to fork. this is never used by
887 * worker threads (except if spc refcount goes to 0),
888 * so we can safely use it here.
889 */
890 lwproc_switch(spc->spc_mainlwp);
891 if ((error = lwproc_rfork(spc, RUMP_RFFDG)) != 0) {
892 DPRINTF(("rump_sp: fork failed: %d (%p)\n",error, spc));
893 send_error_resp(spc, reqno, error);
894 lwproc_switch(NULL);
895 free(pf);
896 return;
897 }
898
899 /* Ok, we have a new process context and a new curlwp */
900 for (i = 0; i < AUTHLEN; i++) {
901 pf->pf_auth[i] = auth[i] = arc4random();
902 }
903 pf->pf_lwp = lwproc_curlwp();
904 lwproc_switch(NULL);
905
906 pthread_mutex_lock(&pfmtx);
907 LIST_INSERT_HEAD(&preforks, pf, pf_entries);
908 LIST_INSERT_HEAD(&spc->spc_pflist, pf, pf_spcentries);
909 pthread_mutex_unlock(&pfmtx);
910
911 DPRINTF(("rump_sp: prefork handler success %p\n", spc));
912
913 send_prefork_resp(spc, reqno, auth);
914 return;
915 }
916
917 if (__predict_false(spc->spc_hdr.rsp_type != RUMPSP_SYSCALL)) {
918 send_error_resp(spc, spc->spc_hdr.rsp_reqno, EINVAL);
919 spcfreebuf(spc);
920 return;
921 }
922
923 retries = 0;
924 while ((sba = malloc(sizeof(*sba))) == NULL) {
925 if (nworker == 0 || retries > 10) {
926 send_error_resp(spc, spc->spc_hdr.rsp_reqno, EAGAIN);
927 spcfreebuf(spc);
928 return;
929 }
930 /* slim chance of more memory? */
931 usleep(10000);
932 }
933
934 sba->sba_spc = spc;
935 sba->sba_hdr = spc->spc_hdr;
936 sba->sba_data = spc->spc_buf;
937 spcresetbuf(spc);
938
939 spcref(spc);
940
941 pthread_mutex_lock(&sbamtx);
942 TAILQ_INSERT_TAIL(&syslist, sba, sba_entries);
943 if (idleworker > 0) {
944 /* do we have a daemon's tool (i.e. idle threads)? */
945 pthread_cond_signal(&sbacv);
946 } else if (nworker < rumpsp_maxworker) {
947 /*
948 * Else, need to create one
949 * (if we can, otherwise just expect another
950 * worker to pick up the syscall)
951 */
952 if (pthread_create(&pt, &pattr_detached,
953 serv_syscallbouncer, NULL) == 0)
954 nworker++;
955 }
956 pthread_mutex_unlock(&sbamtx);
957 }
958
959 static void *
960 spserver(void *arg)
961 {
962 struct spservarg *sarg = arg;
963 struct spclient *spc;
964 unsigned idx;
965 int seen;
966 int rv;
967 unsigned int nfds, maxidx;
968
969 for (idx = 0; idx < MAXCLI; idx++) {
970 pfdlist[idx].fd = -1;
971 pfdlist[idx].events = POLLIN;
972
973 spc = &spclist[idx];
974 pthread_mutex_init(&spc->spc_mtx, NULL);
975 pthread_cond_init(&spc->spc_cv, NULL);
976 spc->spc_fd = -1;
977 }
978 pfdlist[0].fd = spclist[0].spc_fd = sarg->sps_sock;
979 pfdlist[0].events = POLLIN;
980 nfds = 1;
981 maxidx = 0;
982
983 pthread_attr_init(&pattr_detached);
984 pthread_attr_setdetachstate(&pattr_detached, PTHREAD_CREATE_DETACHED);
985 /* XXX: doesn't stacksize currently work on NetBSD */
986 pthread_attr_setstacksize(&pattr_detached, 32*1024);
987
988 pthread_mutex_init(&sbamtx, NULL);
989 pthread_cond_init(&sbacv, NULL);
990
991 DPRINTF(("rump_sp: server mainloop\n"));
992
993 for (;;) {
994 int discoed;
995
996 /* g/c hangarounds (eventually) */
997 discoed = atomic_swap_uint(&disco, 0);
998 while (discoed--) {
999 nfds--;
1000 idx = maxidx;
1001 while (idx) {
1002 if (pfdlist[idx].fd != -1) {
1003 maxidx = idx;
1004 break;
1005 }
1006 idx--;
1007 }
1008 DPRINTF(("rump_sp: set maxidx to [%u]\n",
1009 maxidx));
1010 }
1011
1012 DPRINTF(("rump_sp: loop nfd %d\n", maxidx+1));
1013 seen = 0;
1014 rv = poll(pfdlist, maxidx+1, INFTIM);
1015 assert(maxidx+1 <= MAXCLI);
1016 assert(rv != 0);
1017 if (rv == -1) {
1018 if (errno == EINTR)
1019 continue;
1020 fprintf(stderr, "rump_spserver: poll returned %d\n",
1021 errno);
1022 break;
1023 }
1024
1025 for (idx = 0; seen < rv && idx < MAXCLI; idx++) {
1026 if ((pfdlist[idx].revents & POLLIN) == 0)
1027 continue;
1028
1029 seen++;
1030 DPRINTF(("rump_sp: activity at [%u] %d/%d\n",
1031 idx, seen, rv));
1032 if (idx > 0) {
1033 spc = &spclist[idx];
1034 DPRINTF(("rump_sp: mainloop read [%u]\n", idx));
1035 switch (readframe(spc)) {
1036 case 0:
1037 break;
1038 case -1:
1039 serv_handledisco(idx);
1040 break;
1041 default:
1042 switch (spc->spc_hdr.rsp_class) {
1043 case RUMPSP_RESP:
1044 kickwaiter(spc);
1045 break;
1046 case RUMPSP_REQ:
1047 handlereq(spc);
1048 break;
1049 default:
1050 send_error_resp(spc,
1051 spc->spc_hdr.rsp_reqno,
1052 ENOENT);
1053 spcfreebuf(spc);
1054 break;
1055 }
1056 break;
1057 }
1058
1059 } else {
1060 DPRINTF(("rump_sp: mainloop new connection\n"));
1061
1062 if (__predict_false(spfini)) {
1063 close(spclist[0].spc_fd);
1064 serv_shutdown();
1065 goto out;
1066 }
1067
1068 idx = serv_handleconn(pfdlist[0].fd,
1069 sarg->sps_connhook, nfds == MAXCLI);
1070 if (idx)
1071 nfds++;
1072 if (idx > maxidx)
1073 maxidx = idx;
1074 DPRINTF(("rump_sp: maxid now %d\n", maxidx));
1075 }
1076 }
1077 }
1078
1079 out:
1080 return NULL;
1081 }
1082
1083 static unsigned cleanupidx;
1084 static struct sockaddr *cleanupsa;
1085 int
1086 rumpuser_sp_init(const char *url, const struct rumpuser_sp_ops *spopsp,
1087 const char *ostype, const char *osrelease, const char *machine)
1088 {
1089 pthread_t pt;
1090 struct spservarg *sarg;
1091 struct sockaddr *sap;
1092 char *p;
1093 unsigned idx;
1094 int error, s;
1095
1096 p = strdup(url);
1097 if (p == NULL)
1098 return ENOMEM;
1099 error = parseurl(p, &sap, &idx, 1);
1100 free(p);
1101 if (error)
1102 return error;
1103
1104 snprintf(banner, sizeof(banner), "RUMPSP-%d.%d-%s-%s/%s\n",
1105 PROTOMAJOR, PROTOMINOR, ostype, osrelease, machine);
1106
1107 s = socket(parsetab[idx].domain, SOCK_STREAM, 0);
1108 if (s == -1)
1109 return errno;
1110
1111 spops = *spopsp;
1112 sarg = malloc(sizeof(*sarg));
1113 if (sarg == NULL) {
1114 close(s);
1115 return ENOMEM;
1116 }
1117
1118 sarg->sps_sock = s;
1119 sarg->sps_connhook = parsetab[idx].connhook;
1120
1121 cleanupidx = idx;
1122 cleanupsa = sap;
1123
1124 /* sloppy error recovery */
1125
1126 /*LINTED*/
1127 if (bind(s, sap, sap->sa_len) == -1) {
1128 fprintf(stderr, "rump_sp: server bind failed\n");
1129 return errno;
1130 }
1131
1132 if (listen(s, MAXCLI) == -1) {
1133 fprintf(stderr, "rump_sp: server listen failed\n");
1134 return errno;
1135 }
1136
1137 if ((error = pthread_create(&pt, NULL, spserver, sarg)) != 0) {
1138 fprintf(stderr, "rump_sp: cannot create wrkr thread\n");
1139 return errno;
1140 }
1141 pthread_detach(pt);
1142
1143 return 0;
1144 }
1145
1146 void
1147 rumpuser_sp_fini()
1148 {
1149
1150 if (spclist[0].spc_fd) {
1151 parsetab[cleanupidx].cleanup(cleanupsa);
1152 shutdown(spclist[0].spc_fd, SHUT_RDWR);
1153 spfini = 1;
1154 }
1155 }
1156