/*	$NetBSD: rumpuser_sp.c,v 1.29 2011/01/05 17:14:50 pooka Exp $	*/

/*
 * Copyright (c) 2010, 2011 Antti Kantee.  All Rights Reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

/*
 * Sysproxy routines.  This provides system RPC support over host sockets.
 * The most notable limitation is that the client and server must share
 * the same ABI.  This does not mean that they have to be the same
 * machine or that they need to run the same version of the host OS,
 * just that they must agree on the data structures.  This even *might*
 * work correctly from one hardware architecture to another.
 */
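
/*
 * Rough picture of a remote syscall as implemented below (a sketch of
 * the control flow only; the wire format itself lives in sp_common.c):
 *
 *	client					server
 *	------					------
 *	RUMPSP_REQ / RUMPSP_SYSCALL	-->	queued for a worker thread,
 *						which runs the call via
 *						rumpsyscall()
 *					<--	RUMPSP_REQ / RUMPSP_COPYIN
 *						(only if the syscall needs
 *						to read client memory)
 *	RUMPSP_RESP / RUMPSP_COPYIN	-->
 *					<--	RUMPSP_RESP / RUMPSP_SYSCALL
 *						with error and retvals
 */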

#include <sys/cdefs.h>
__RCSID("$NetBSD: rumpuser_sp.c,v 1.29 2011/01/05 17:14:50 pooka Exp $");

#include <sys/types.h>
#include <sys/atomic.h>
#include <sys/mman.h>
#include <sys/socket.h>

#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <rump/rump.h> /* XXX: for rfork flags */
#include <rump/rumpuser.h>
#include "rumpuser_int.h"

#include "sp_common.c"

#ifndef MAXCLI
#define MAXCLI 256
#endif
#ifndef MAXWORKER
#define MAXWORKER 128
#endif
#ifndef IDLEWORKER
#define IDLEWORKER 16
#endif
int rumpsp_maxworker = MAXWORKER;
int rumpsp_idleworker = IDLEWORKER;

static struct pollfd pfdlist[MAXCLI];
static struct spclient spclist[MAXCLI];
static unsigned int disco;
static volatile int spfini;

static struct rumpuser_sp_ops spops;

static char banner[MAXBANNER];

#define PROTOMAJOR 0
#define PROTOMINOR 1

struct prefork {
	uint32_t pf_auth[AUTHLEN];
	struct lwp *pf_lwp;

	LIST_ENTRY(prefork) pf_entries;		/* global list */
	LIST_ENTRY(prefork) pf_spcentries;	/* linked from forking spc */
};
static LIST_HEAD(, prefork) preforks = LIST_HEAD_INITIALIZER(preforks);
static pthread_mutex_t pfmtx;
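
/*
 * Fork handshake bookkeeping.  A client first sends RUMPSP_PREFORK, for
 * which the server creates a fresh process context and hands back AUTHLEN
 * random words as an authentication cookie (see the prefork handler in
 * handlereq()).  A later connection that presents the same cookie in a
 * HANDSHAKE_FORK request is matched against this list and attached to
 * the saved context.
 */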

/*
 * Manual wrappers, since librump does not have access to the
 * user namespace wrapped interfaces.
 */
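
/*
 * Each wrapper brackets the operation with spop_schedule() and
 * spop_unschedule() so that, in effect, a rump kernel context is held
 * for the duration of the call; the callers in this file run on plain
 * host threads.
 */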

static void
lwproc_switch(struct lwp *l)
{

	spops.spop_schedule();
	spops.spop_lwproc_switch(l);
	spops.spop_unschedule();
}

static void
lwproc_release(void)
{

	spops.spop_schedule();
	spops.spop_lwproc_release();
	spops.spop_unschedule();
}

static int
lwproc_rfork(struct spclient *spc, int flags)
{
	int rv;

	spops.spop_schedule();
	rv = spops.spop_lwproc_rfork(spc, flags);
	spops.spop_unschedule();

	return rv;
}

static int
lwproc_newlwp(pid_t pid)
{
	int rv;

	spops.spop_schedule();
	rv = spops.spop_lwproc_newlwp(pid);
	spops.spop_unschedule();

	return rv;
}

static struct lwp *
lwproc_curlwp(void)
{
	struct lwp *l;

	spops.spop_schedule();
	l = spops.spop_lwproc_curlwp();
	spops.spop_unschedule();

	return l;
}

static pid_t
lwproc_getpid(void)
{
	pid_t p;

	spops.spop_schedule();
	p = spops.spop_getpid();
	spops.spop_unschedule();

	return p;
}

static int
rumpsyscall(int sysnum, void *data, register_t *retval)
{
	int rv;

	spops.spop_schedule();
	rv = spops.spop_syscall(sysnum, data, retval);
	spops.spop_unschedule();

	return rv;
}

static uint64_t
nextreq(struct spclient *spc)
{
	uint64_t nw;

	pthread_mutex_lock(&spc->spc_mtx);
	nw = spc->spc_nextreq++;
	pthread_mutex_unlock(&spc->spc_mtx);

	return nw;
}

static void
send_error_resp(struct spclient *spc, uint64_t reqno, int error)
{
	struct rsp_hdr rhdr;

	rhdr.rsp_len = sizeof(rhdr);
	rhdr.rsp_reqno = reqno;
	rhdr.rsp_class = RUMPSP_ERROR;
	rhdr.rsp_type = 0;
	rhdr.rsp_error = error;

	sendlock(spc);
	(void)dosend(spc, &rhdr, sizeof(rhdr));
	sendunlock(spc);
}

static int
send_handshake_resp(struct spclient *spc, uint64_t reqno, int error)
{
	struct rsp_hdr rhdr;
	int rv;

	rhdr.rsp_len = sizeof(rhdr) + sizeof(error);
	rhdr.rsp_reqno = reqno;
	rhdr.rsp_class = RUMPSP_RESP;
	rhdr.rsp_type = RUMPSP_HANDSHAKE;
	rhdr.rsp_error = 0;

	sendlock(spc);
	rv = dosend(spc, &rhdr, sizeof(rhdr));
	rv = dosend(spc, &error, sizeof(error));
	sendunlock(spc);

	return rv;
}

static int
send_syscall_resp(struct spclient *spc, uint64_t reqno, int error,
    register_t *retval)
{
	struct rsp_hdr rhdr;
	struct rsp_sysresp sysresp;
	int rv;

	rhdr.rsp_len = sizeof(rhdr) + sizeof(sysresp);
	rhdr.rsp_reqno = reqno;
	rhdr.rsp_class = RUMPSP_RESP;
	rhdr.rsp_type = RUMPSP_SYSCALL;
	rhdr.rsp_sysnum = 0;

	sysresp.rsys_error = error;
	memcpy(sysresp.rsys_retval, retval, sizeof(sysresp.rsys_retval));

	sendlock(spc);
	rv = dosend(spc, &rhdr, sizeof(rhdr));
	rv = dosend(spc, &sysresp, sizeof(sysresp));
	sendunlock(spc);

	return rv;
}

static int
send_prefork_resp(struct spclient *spc, uint64_t reqno, uint32_t *auth)
{
	struct rsp_hdr rhdr;
	int rv;

	rhdr.rsp_len = sizeof(rhdr) + AUTHLEN*sizeof(*auth);
	rhdr.rsp_reqno = reqno;
	rhdr.rsp_class = RUMPSP_RESP;
	rhdr.rsp_type = RUMPSP_PREFORK;
	rhdr.rsp_sysnum = 0;

	sendlock(spc);
	rv = dosend(spc, &rhdr, sizeof(rhdr));
	rv = dosend(spc, auth, AUTHLEN*sizeof(*auth));
	sendunlock(spc);

	return rv;
}

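/*
 * The next routines issue requests in the other direction, from the
 * server to the client.  copyin_req() and anonmmap_req() register a
 * wait record with putwait() before sending and then block in
 * waitresp() until the receiver thread hands over the matching
 * RUMPSP_RESP frame; send_copyout_req() is fire-and-forget and only
 * pushes data out.
 */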
static int
copyin_req(struct spclient *spc, const void *remaddr, size_t *dlen,
    int wantstr, void **resp)
{
	struct rsp_hdr rhdr;
	struct rsp_copydata copydata;
	struct respwait rw;
	int rv;

	DPRINTF(("copyin_req: %zu bytes from %p\n", *dlen, remaddr));

	rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata);
	rhdr.rsp_class = RUMPSP_REQ;
	if (wantstr)
		rhdr.rsp_type = RUMPSP_COPYINSTR;
	else
		rhdr.rsp_type = RUMPSP_COPYIN;
	rhdr.rsp_sysnum = 0;

	copydata.rcp_addr = __UNCONST(remaddr);
	copydata.rcp_len = *dlen;

	putwait(spc, &rw, &rhdr);
	rv = dosend(spc, &rhdr, sizeof(rhdr));
	rv = dosend(spc, &copydata, sizeof(copydata));
	if (rv) {
		unputwait(spc, &rw);
		return rv;
	}

	rv = waitresp(spc, &rw);

	DPRINTF(("copyin: response %d\n", rv));

	*resp = rw.rw_data;
	if (wantstr)
		*dlen = rw.rw_dlen;

	return rv;
}

static int
send_copyout_req(struct spclient *spc, const void *remaddr,
    const void *data, size_t dlen)
{
	struct rsp_hdr rhdr;
	struct rsp_copydata copydata;
	int rv;

	DPRINTF(("copyout_req (async): %zu bytes to %p\n", dlen, remaddr));

	rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata) + dlen;
	rhdr.rsp_reqno = nextreq(spc);
	rhdr.rsp_class = RUMPSP_REQ;
	rhdr.rsp_type = RUMPSP_COPYOUT;
	rhdr.rsp_sysnum = 0;

	copydata.rcp_addr = __UNCONST(remaddr);
	copydata.rcp_len = dlen;

	sendlock(spc);
	rv = dosend(spc, &rhdr, sizeof(rhdr));
	rv = dosend(spc, &copydata, sizeof(copydata));
	rv = dosend(spc, data, dlen);
	sendunlock(spc);

	return rv;
}

static int
anonmmap_req(struct spclient *spc, size_t howmuch, void **resp)
{
	struct rsp_hdr rhdr;
	struct respwait rw;
	int rv;

	DPRINTF(("anonmmap_req: %zu bytes\n", howmuch));

	rhdr.rsp_len = sizeof(rhdr) + sizeof(howmuch);
	rhdr.rsp_class = RUMPSP_REQ;
	rhdr.rsp_type = RUMPSP_ANONMMAP;
	rhdr.rsp_sysnum = 0;

	putwait(spc, &rw, &rhdr);
	rv = dosend(spc, &rhdr, sizeof(rhdr));
	rv = dosend(spc, &howmuch, sizeof(howmuch));
	if (rv) {
		unputwait(spc, &rw);
		return rv;
	}

	rv = waitresp(spc, &rw);

	*resp = rw.rw_data;

	DPRINTF(("anonmmap: mapped at %p\n", **(void ***)resp));

	return rv;
}

static void
spcref(struct spclient *spc)
{

	pthread_mutex_lock(&spc->spc_mtx);
	spc->spc_refcnt++;
	pthread_mutex_unlock(&spc->spc_mtx);
}

static void
spcrelease(struct spclient *spc)
{
	int ref;

	pthread_mutex_lock(&spc->spc_mtx);
	ref = --spc->spc_refcnt;
	pthread_mutex_unlock(&spc->spc_mtx);

	if (ref > 0)
		return;

	DPRINTF(("rump_sp: spcrelease: spc %p fd %d\n", spc, spc->spc_fd));

	_DIAGASSERT(TAILQ_EMPTY(&spc->spc_respwait));
	_DIAGASSERT(spc->spc_buf == NULL);

	if (spc->spc_mainlwp) {
		lwproc_switch(spc->spc_mainlwp);
		lwproc_release();
	}
	spc->spc_mainlwp = NULL;

	close(spc->spc_fd);
	spc->spc_fd = -1;
	spc->spc_state = SPCSTATE_NEW;

	atomic_inc_uint(&disco);
}

static void
serv_handledisco(unsigned int idx)
{
	struct spclient *spc = &spclist[idx];

	DPRINTF(("rump_sp: disconnecting [%u]\n", idx));

	pfdlist[idx].fd = -1;
	pfdlist[idx].revents = 0;
	pthread_mutex_lock(&spc->spc_mtx);
	spc->spc_state = SPCSTATE_DYING;
	kickall(spc);
	pthread_mutex_unlock(&spc->spc_mtx);

	/*
	 * Nobody's going to attempt to send/receive anymore,
	 * so reinit info relevant to that.
	 */
	/*LINTED:pointer casts may be ok*/
	memset((char *)spc + SPC_ZEROFF, 0, sizeof(*spc) - SPC_ZEROFF);

	spcrelease(spc);
}

static void
serv_shutdown(void)
{
	struct spclient *spc;
	unsigned int i;

	for (i = 1; i < MAXCLI; i++) {
		spc = &spclist[i];
		if (spc->spc_fd == -1)
			continue;

		shutdown(spc->spc_fd, SHUT_RDWR);
		serv_handledisco(i);

		spcrelease(spc);
	}
}

static unsigned
serv_handleconn(int fd, connecthook_fn connhook, int busy)
{
	struct sockaddr_storage ss;
	socklen_t sl = sizeof(ss);
	int newfd, flags;
	unsigned i;

	/*LINTED: cast ok */
	newfd = accept(fd, (struct sockaddr *)&ss, &sl);
	if (newfd == -1)
		return 0;

	if (busy) {
		close(newfd); /* EBUSY */
		return 0;
	}

	flags = fcntl(newfd, F_GETFL, 0);
	if (fcntl(newfd, F_SETFL, flags | O_NONBLOCK) == -1) {
		close(newfd);
		return 0;
	}

	if (connhook(newfd) != 0) {
		close(newfd);
		return 0;
	}

	/* write out a banner for the client */
	if (write(newfd, banner, strlen(banner)) != (ssize_t)strlen(banner)) {
		close(newfd);
		return 0;
	}

	/* find empty slot the simple way */
	for (i = 0; i < MAXCLI; i++) {
		if (pfdlist[i].fd == -1 && spclist[i].spc_state == SPCSTATE_NEW)
			break;
	}

	assert(i < MAXCLI);

	pfdlist[i].fd = newfd;
	spclist[i].spc_fd = newfd;
	spclist[i].spc_istatus = SPCSTATUS_BUSY; /* dedicated receiver */
	spclist[i].spc_refcnt = 1;

	TAILQ_INIT(&spclist[i].spc_respwait);

	DPRINTF(("rump_sp: added new connection fd %d at idx %u\n", newfd, i));

	return i;
}

static void
serv_handlesyscall(struct spclient *spc, struct rsp_hdr *rhdr, uint8_t *data)
{
	register_t retval[2] = {0, 0};
	int rv, sysnum;

	sysnum = (int)rhdr->rsp_sysnum;
	DPRINTF(("rump_sp: handling syscall %d from client %d\n",
	    sysnum, spc->spc_pid));

	lwproc_newlwp(spc->spc_pid);
	rv = rumpsyscall(sysnum, data, retval);
	lwproc_release();

	DPRINTF(("rump_sp: got return value %d & %d/%d\n",
	    rv, retval[0], retval[1]));

	send_syscall_resp(spc, rhdr->rsp_reqno, rv, retval);
}

struct sysbouncearg {
	struct spclient *sba_spc;
	struct rsp_hdr sba_hdr;
	uint8_t *sba_data;

	TAILQ_ENTRY(sysbouncearg) sba_entries;
};
static pthread_mutex_t sbamtx;
static pthread_cond_t sbacv;
static int nworker, idleworker;
static TAILQ_HEAD(, sysbouncearg) syslist = TAILQ_HEAD_INITIALIZER(syslist);
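
/*
 * Syscall worker pool.  Incoming syscall requests are queued onto
 * syslist by handlereq() and picked up by detached worker threads
 * running serv_syscallbouncer().  At most rumpsp_maxworker workers
 * exist at a time, and a worker exits once rumpsp_idleworker threads
 * are already idle, so the pool shrinks back after a burst.
 */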

/*ARGSUSED*/
static void *
serv_syscallbouncer(void *arg)
{
	struct sysbouncearg *sba;

	for (;;) {
		pthread_mutex_lock(&sbamtx);
		if (idleworker >= rumpsp_idleworker) {
			nworker--;
			pthread_mutex_unlock(&sbamtx);
			break;
		}
		idleworker++;
		while (TAILQ_EMPTY(&syslist)) {
			pthread_cond_wait(&sbacv, &sbamtx);
		}

		sba = TAILQ_FIRST(&syslist);
		TAILQ_REMOVE(&syslist, sba, sba_entries);
		idleworker--;
		pthread_mutex_unlock(&sbamtx);

		serv_handlesyscall(sba->sba_spc,
		    &sba->sba_hdr, sba->sba_data);
		spcrelease(sba->sba_spc);
		free(sba->sba_data);
		free(sba);
	}

	return NULL;
}

static int
sp_copyin(void *arg, const void *raddr, void *laddr, size_t *len, int wantstr)
{
	struct spclient *spc = arg;
	void *rdata = NULL; /* XXXuninit */
	int rv, nlocks;

	rumpuser__kunlock(0, &nlocks, NULL);

	rv = copyin_req(spc, raddr, len, wantstr, &rdata);
	if (rv)
		goto out;

	memcpy(laddr, rdata, *len);
	free(rdata);

 out:
	rumpuser__klock(nlocks, NULL);
	if (rv)
		return EFAULT;
	return 0;
}

int
rumpuser_sp_copyin(void *arg, const void *raddr, void *laddr, size_t len)
{

	return sp_copyin(arg, raddr, laddr, &len, 0);
}

int
rumpuser_sp_copyinstr(void *arg, const void *raddr, void *laddr, size_t *len)
{

	return sp_copyin(arg, raddr, laddr, len, 1);
}

static int
sp_copyout(void *arg, const void *laddr, void *raddr, size_t dlen)
{
	struct spclient *spc = arg;
	int nlocks, rv;

	rumpuser__kunlock(0, &nlocks, NULL);
	rv = send_copyout_req(spc, raddr, laddr, dlen);
	rumpuser__klock(nlocks, NULL);

	if (rv)
		return EFAULT;
	return 0;
}

int
rumpuser_sp_copyout(void *arg, const void *laddr, void *raddr, size_t dlen)
{

	return sp_copyout(arg, laddr, raddr, dlen);
}

int
rumpuser_sp_copyoutstr(void *arg, const void *laddr, void *raddr, size_t *dlen)
{

	return sp_copyout(arg, laddr, raddr, *dlen);
}

int
rumpuser_sp_anonmmap(void *arg, size_t howmuch, void **addr)
{
	struct spclient *spc = arg;
	void *resp, *rdata;
	int nlocks, rv;

	rumpuser__kunlock(0, &nlocks, NULL);

	rv = anonmmap_req(spc, howmuch, &rdata);
	if (rv) {
		rv = EFAULT;
		goto out;
	}

	resp = *(void **)rdata;
	free(rdata);

	if (resp == NULL) {
		rv = ENOMEM;
	}

	*addr = resp;

 out:
	rumpuser__klock(nlocks, NULL);

	if (rv)
		return rv;
	return 0;
}

/*
 *
 * Startup routines and mainloop for server.
 *
 */
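
/*
 * The server runs a single receiver thread, spserver(), which polls
 * the listen socket (slot 0 of pfdlist) plus one slot per connected
 * client.  Complete frames are dispatched from that thread: responses
 * wake a waiter via kickwaiter(), while requests go to handlereq(),
 * which either handles them inline (handshake, prefork) or queues
 * syscalls for the worker pool above.
 */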

struct spservarg {
	int sps_sock;
	connecthook_fn sps_connhook;
};

static pthread_attr_t pattr_detached;
static void
handlereq(struct spclient *spc)
{
	struct sysbouncearg *sba;
	pthread_t pt;
	int retries, error, i;

	if (__predict_false(spc->spc_state == SPCSTATE_NEW)) {
		if (spc->spc_hdr.rsp_type != RUMPSP_HANDSHAKE) {
			send_error_resp(spc, spc->spc_hdr.rsp_reqno, EAUTH);
			shutdown(spc->spc_fd, SHUT_RDWR);
			spcfreebuf(spc);
			return;
		}

		if (spc->spc_hdr.rsp_handshake == HANDSHAKE_GUEST) {
			if ((error = lwproc_rfork(spc, RUMP_RFCFDG)) != 0) {
				shutdown(spc->spc_fd, SHUT_RDWR);
			}

			spcfreebuf(spc);
			if (error)
				return;

			spc->spc_mainlwp = lwproc_curlwp();

			send_handshake_resp(spc, spc->spc_hdr.rsp_reqno, 0);
		} else if (spc->spc_hdr.rsp_handshake == HANDSHAKE_FORK) {
			struct lwp *tmpmain;
			struct prefork *pf;
			struct handshake_fork *rfp;
			uint64_t reqno;
			int cancel;

			reqno = spc->spc_hdr.rsp_reqno;
			if (spc->spc_off-HDRSZ != sizeof(*rfp)) {
				send_error_resp(spc, reqno, EINVAL);
				shutdown(spc->spc_fd, SHUT_RDWR);
				spcfreebuf(spc);
				return;
			}

			/*LINTED*/
			rfp = (void *)spc->spc_buf;
			cancel = rfp->rf_cancel;

			pthread_mutex_lock(&pfmtx);
			LIST_FOREACH(pf, &preforks, pf_entries) {
				if (memcmp(rfp->rf_auth, pf->pf_auth,
				    sizeof(rfp->rf_auth)) == 0) {
					LIST_REMOVE(pf, pf_entries);
					LIST_REMOVE(pf, pf_spcentries);
					break;
				}
			}
			pthread_mutex_unlock(&pfmtx);
			spcfreebuf(spc);

			if (!pf) {
				send_error_resp(spc, reqno, ESRCH);
				shutdown(spc->spc_fd, SHUT_RDWR);
				return;
			}

			tmpmain = pf->pf_lwp;
			free(pf);
			lwproc_switch(tmpmain);
			if (cancel) {
				lwproc_release();
				shutdown(spc->spc_fd, SHUT_RDWR);
				return;
			}

			/*
			 * So, we forked already during "prefork" to save
			 * the file descriptors from a parent exit
			 * race condition.  But now we need to fork
			 * a second time since the initial fork has
			 * the wrong spc pointer.  (yea, optimize
			 * interfaces some day if anyone cares)
			 */
			if ((error = lwproc_rfork(spc, 0)) != 0) {
				send_error_resp(spc, reqno, error);
				shutdown(spc->spc_fd, SHUT_RDWR);
				lwproc_release();
				return;
			}
			spc->spc_mainlwp = lwproc_curlwp();
			lwproc_switch(tmpmain);
			lwproc_release();
			lwproc_switch(spc->spc_mainlwp);

			send_handshake_resp(spc, reqno, 0);
		}

		spc->spc_pid = lwproc_getpid();

		DPRINTF(("rump_sp: handshake for client %p complete, pid %d\n",
		    spc, spc->spc_pid));

		lwproc_switch(NULL);
		spc->spc_state = SPCSTATE_RUNNING;
		return;
	}

	if (__predict_false(spc->spc_hdr.rsp_type == RUMPSP_PREFORK)) {
		struct prefork *pf;
		uint64_t reqno;
		uint32_t auth[AUTHLEN];

		DPRINTF(("rump_sp: prefork handler executing for %p\n", spc));
		reqno = spc->spc_hdr.rsp_reqno;
		spcfreebuf(spc);

		pf = malloc(sizeof(*pf));
		if (pf == NULL) {
			send_error_resp(spc, reqno, ENOMEM);
			return;
		}

		/*
		 * Use client main lwp to fork.  This is never used by
		 * worker threads (except if spc refcount goes to 0),
		 * so we can safely use it here.
		 */
		lwproc_switch(spc->spc_mainlwp);
		if ((error = lwproc_rfork(spc, RUMP_RFFDG)) != 0) {
			DPRINTF(("rump_sp: fork failed: %d (%p)\n", error, spc));
			send_error_resp(spc, reqno, error);
			lwproc_switch(NULL);
			free(pf);
			return;
		}

		/* Ok, we have a new process context and a new curlwp */
		for (i = 0; i < AUTHLEN; i++) {
			pf->pf_auth[i] = auth[i] = arc4random();
		}
		pf->pf_lwp = lwproc_curlwp();
		lwproc_switch(NULL);

		pthread_mutex_lock(&pfmtx);
		LIST_INSERT_HEAD(&preforks, pf, pf_entries);
		LIST_INSERT_HEAD(&spc->spc_pflist, pf, pf_spcentries);
		pthread_mutex_unlock(&pfmtx);

		DPRINTF(("rump_sp: prefork handler success %p\n", spc));

		send_prefork_resp(spc, reqno, auth);
		return;
	}

	if (__predict_false(spc->spc_hdr.rsp_type != RUMPSP_SYSCALL)) {
		send_error_resp(spc, spc->spc_hdr.rsp_reqno, EINVAL);
		spcfreebuf(spc);
		return;
	}

	retries = 0;
	while ((sba = malloc(sizeof(*sba))) == NULL) {
		if (nworker == 0 || retries++ > 10) {
			send_error_resp(spc, spc->spc_hdr.rsp_reqno, EAGAIN);
			spcfreebuf(spc);
			return;
		}
		/* slim chance of more memory? */
		usleep(10000);
	}

	sba->sba_spc = spc;
	sba->sba_hdr = spc->spc_hdr;
	sba->sba_data = spc->spc_buf;
	spcresetbuf(spc);

	spcref(spc);

	pthread_mutex_lock(&sbamtx);
	TAILQ_INSERT_TAIL(&syslist, sba, sba_entries);
	if (idleworker > 0) {
		/* do we have a daemon's tool (i.e. idle threads)? */
		pthread_cond_signal(&sbacv);
	} else if (nworker < rumpsp_maxworker) {
		/*
		 * Else, need to create one
		 * (if we can, otherwise just expect another
		 * worker to pick up the syscall)
		 */
		if (pthread_create(&pt, &pattr_detached,
		    serv_syscallbouncer, NULL) == 0)
			nworker++;
	}
	pthread_mutex_unlock(&sbamtx);
}

static void *
spserver(void *arg)
{
	struct spservarg *sarg = arg;
	struct spclient *spc;
	unsigned idx;
	int seen;
	int rv;
	unsigned int nfds, maxidx;

	for (idx = 0; idx < MAXCLI; idx++) {
		pfdlist[idx].fd = -1;
		pfdlist[idx].events = POLLIN;

		spc = &spclist[idx];
		pthread_mutex_init(&spc->spc_mtx, NULL);
		pthread_cond_init(&spc->spc_cv, NULL);
		spc->spc_fd = -1;
	}
	pfdlist[0].fd = spclist[0].spc_fd = sarg->sps_sock;
	pfdlist[0].events = POLLIN;
	nfds = 1;
	maxidx = 0;

	pthread_attr_init(&pattr_detached);
	pthread_attr_setdetachstate(&pattr_detached, PTHREAD_CREATE_DETACHED);
	/* XXX: setting the stacksize doesn't currently work on NetBSD */
	pthread_attr_setstacksize(&pattr_detached, 32*1024);

	pthread_mutex_init(&sbamtx, NULL);
	pthread_cond_init(&sbacv, NULL);

	DPRINTF(("rump_sp: server mainloop\n"));

	for (;;) {
		int discoed;

		/* g/c hangarounds (eventually) */
		discoed = atomic_swap_uint(&disco, 0);
		while (discoed--) {
			nfds--;
			idx = maxidx;
			while (idx) {
				if (pfdlist[idx].fd != -1) {
					maxidx = idx;
					break;
				}
				idx--;
			}
			DPRINTF(("rump_sp: set maxidx to [%u]\n",
			    maxidx));
		}

		DPRINTF(("rump_sp: loop nfd %d\n", maxidx+1));
		seen = 0;
		rv = poll(pfdlist, maxidx+1, INFTIM);
		assert(maxidx+1 <= MAXCLI);
		assert(rv != 0);
		if (rv == -1) {
			if (errno == EINTR)
				continue;
			fprintf(stderr, "rump_spserver: poll returned %d\n",
			    errno);
			break;
		}

		for (idx = 0; seen < rv && idx < MAXCLI; idx++) {
			if ((pfdlist[idx].revents & POLLIN) == 0)
				continue;

			seen++;
			DPRINTF(("rump_sp: activity at [%u] %d/%d\n",
			    idx, seen, rv));
			if (idx > 0) {
				spc = &spclist[idx];
				DPRINTF(("rump_sp: mainloop read [%u]\n", idx));
				switch (readframe(spc)) {
				case 0:
					break;
				case -1:
					serv_handledisco(idx);
					break;
				default:
					switch (spc->spc_hdr.rsp_class) {
					case RUMPSP_RESP:
						kickwaiter(spc);
						break;
					case RUMPSP_REQ:
						handlereq(spc);
						break;
					default:
						send_error_resp(spc,
						    spc->spc_hdr.rsp_reqno,
						    ENOENT);
						spcfreebuf(spc);
						break;
					}
					break;
				}

			} else {
				DPRINTF(("rump_sp: mainloop new connection\n"));

				if (__predict_false(spfini)) {
					close(spclist[0].spc_fd);
					serv_shutdown();
					goto out;
				}

				idx = serv_handleconn(pfdlist[0].fd,
				    sarg->sps_connhook, nfds == MAXCLI);
				if (idx)
					nfds++;
				if (idx > maxidx)
					maxidx = idx;
				DPRINTF(("rump_sp: maxid now %d\n", maxidx));
			}
		}
	}

 out:
	return NULL;
}

static unsigned cleanupidx;
static struct sockaddr *cleanupsa;
int
rumpuser_sp_init(const char *url, const struct rumpuser_sp_ops *spopsp,
    const char *ostype, const char *osrelease, const char *machine)
{
	pthread_t pt;
	struct spservarg *sarg;
	struct sockaddr *sap;
	char *p;
	unsigned idx;
	int error, s;

	p = strdup(url);
	if (p == NULL)
		return ENOMEM;
	error = parseurl(p, &sap, &idx, 1);
	free(p);
	if (error)
		return error;

	snprintf(banner, sizeof(banner), "RUMPSP-%d.%d-%s-%s/%s\n",
	    PROTOMAJOR, PROTOMINOR, ostype, osrelease, machine);

	s = socket(parsetab[idx].domain, SOCK_STREAM, 0);
	if (s == -1)
		return errno;

	spops = *spopsp;
	sarg = malloc(sizeof(*sarg));
	if (sarg == NULL) {
		close(s);
		return ENOMEM;
	}

	sarg->sps_sock = s;
	sarg->sps_connhook = parsetab[idx].connhook;

	cleanupidx = idx;
	cleanupsa = sap;

	/* sloppy error recovery */

	/*LINTED*/
	if (bind(s, sap, sap->sa_len) == -1) {
		fprintf(stderr, "rump_sp: server bind failed\n");
		return errno;
	}

	if (listen(s, MAXCLI) == -1) {
		fprintf(stderr, "rump_sp: server listen failed\n");
		return errno;
	}

	if ((error = pthread_create(&pt, NULL, spserver, sarg)) != 0) {
		fprintf(stderr, "rump_sp: cannot create wrkr thread\n");
		return error;
	}
	pthread_detach(pt);

	return 0;
}

void
rumpuser_sp_fini()
{

	if (spclist[0].spc_fd) {
		parsetab[cleanupidx].cleanup(cleanupsa);
		shutdown(spclist[0].spc_fd, SHUT_RDWR);
		spfini = 1;
	}
}