/*	$NetBSD: rumpuser_sp.c,v 1.44 2011/03/08 12:39:28 pooka Exp $	*/

/*
 * Copyright (c) 2010, 2011 Antti Kantee.  All Rights Reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

/*
 * Sysproxy routines.  This provides system RPC support over host sockets.
 * The most notable limitation is that the client and server must share
 * the same ABI.  This does not mean that they have to be the same
 * machine or that they need to run the same version of the host OS,
 * just that they must agree on the data structures.  This even *might*
 * work correctly from one hardware architecture to another.
 */
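
/*
 * Rough usage sketch (the tool names below are illustrative, not mandated
 * by this file): a server is started on a URL that parseurl() in
 * sp_common.c understands, and clients reach it through the same URL, e.g.
 *
 *	rump_server unix:///tmp/rumpsrv
 *	env RUMP_SERVER=unix:///tmp/rumpsrv rump.sysctl -a
 */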

#include <sys/cdefs.h>
__RCSID("$NetBSD: rumpuser_sp.c,v 1.44 2011/03/08 12:39:28 pooka Exp $");

#include <sys/types.h>
#include <sys/atomic.h>
#include <sys/mman.h>
#include <sys/socket.h>

#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <rump/rump.h> /* XXX: for rfork flags */
#include <rump/rumpuser.h>
#include "rumpuser_int.h"

#include "sp_common.c"

#ifndef MAXCLI
#define MAXCLI 256
#endif
#ifndef MAXWORKER
#define MAXWORKER 128
#endif
#ifndef IDLEWORKER
#define IDLEWORKER 16
#endif
int rumpsp_maxworker = MAXWORKER;
int rumpsp_idleworker = IDLEWORKER;

static struct pollfd pfdlist[MAXCLI];
static struct spclient spclist[MAXCLI];
static unsigned int disco;
static volatile int spfini;

static struct rumpuser_sp_ops spops;

static char banner[MAXBANNER];

#define PROTOMAJOR 0
#define PROTOMINOR 3
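
/*
 * The banner written to a freshly accepted client (see rumpuser_sp_init()
 * and serv_handleconn()) looks along the lines of
 * "RUMPSP-0.3-NetBSD-5.99.48/amd64\n"; the ostype/osrelease/machine parts
 * are simply whatever the caller passed to rumpuser_sp_init().
 */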

struct prefork {
	uint32_t pf_auth[AUTHLEN];
	struct lwp *pf_lwp;

	LIST_ENTRY(prefork) pf_entries;		/* global list */
	LIST_ENTRY(prefork) pf_spcentries;	/* linked from forking spc */
};
static LIST_HEAD(, prefork) preforks = LIST_HEAD_INITIALIZER(preforks);
static pthread_mutex_t pfmtx;
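
/*
 * Fork handshake in a nutshell: the forking parent sends RUMPSP_PREFORK,
 * which makes the server fork a local process context and return AUTHLEN
 * random words (send_prefork_resp()).  The child then opens a connection
 * of its own and presents those words in a HANDSHAKE_FORK request, which
 * locates the matching entry on this list and hands its process context
 * over to the new connection (see handlereq()).
 */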

/*
 * This version is for the server.  It's optimized for multiple threads
 * and is *NOT* reentrant with respect to signals.
 */
static int
waitresp(struct spclient *spc, struct respwait *rw)
{
	int spcstate;
	int rv = 0;

	pthread_mutex_lock(&spc->spc_mtx);
	sendunlockl(spc);
	while (!rw->rw_done && spc->spc_state != SPCSTATE_DYING) {
		pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx);
	}
	TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries);
	spcstate = spc->spc_state;
	pthread_mutex_unlock(&spc->spc_mtx);

	pthread_cond_destroy(&rw->rw_cv);

	if (rv)
		return rv;
	if (spcstate == SPCSTATE_DYING)
		return ENOTCONN;
	return rw->rw_error;
}

/*
 * Manual wrappers, since librump does not have access to the
 * user namespace wrapped interfaces.
 */

static void
lwproc_switch(struct lwp *l)
{

	spops.spop_schedule();
	spops.spop_lwproc_switch(l);
	spops.spop_unschedule();
}

static void
lwproc_release(void)
{

	spops.spop_schedule();
	spops.spop_lwproc_release();
	spops.spop_unschedule();
}

static int
lwproc_rfork(struct spclient *spc, int flags, const char *comm)
{
	int rv;

	spops.spop_schedule();
	rv = spops.spop_lwproc_rfork(spc, flags, comm);
	spops.spop_unschedule();

	return rv;
}

static int
lwproc_newlwp(pid_t pid)
{
	int rv;

	spops.spop_schedule();
	rv = spops.spop_lwproc_newlwp(pid);
	spops.spop_unschedule();

	return rv;
}

static struct lwp *
lwproc_curlwp(void)
{
	struct lwp *l;

	spops.spop_schedule();
	l = spops.spop_lwproc_curlwp();
	spops.spop_unschedule();

	return l;
}

static pid_t
lwproc_getpid(void)
{
	pid_t p;

	spops.spop_schedule();
	p = spops.spop_getpid();
	spops.spop_unschedule();

	return p;
}

static void
lwproc_execnotify(const char *comm)
{

	spops.spop_schedule();
	spops.spop_execnotify(comm);
	spops.spop_unschedule();
}

static void
lwproc_lwpexit(void)
{

	spops.spop_schedule();
	spops.spop_lwpexit();
	spops.spop_unschedule();
}

static int
rumpsyscall(int sysnum, void *data, register_t *retval)
{
	int rv;

	spops.spop_schedule();
	rv = spops.spop_syscall(sysnum, data, retval);
	spops.spop_unschedule();

	return rv;
}

static uint64_t
nextreq(struct spclient *spc)
{
	uint64_t nw;

	pthread_mutex_lock(&spc->spc_mtx);
	nw = spc->spc_nextreq++;
	pthread_mutex_unlock(&spc->spc_mtx);

	return nw;
}

static void
send_error_resp(struct spclient *spc, uint64_t reqno, int error)
{
	struct rsp_hdr rhdr;

	rhdr.rsp_len = sizeof(rhdr);
	rhdr.rsp_reqno = reqno;
	rhdr.rsp_class = RUMPSP_ERROR;
	rhdr.rsp_type = 0;
	rhdr.rsp_error = error;

	sendlock(spc);
	(void)dosend(spc, &rhdr, sizeof(rhdr));
	sendunlock(spc);
}

static int
send_handshake_resp(struct spclient *spc, uint64_t reqno, int error)
{
	struct rsp_hdr rhdr;
	int rv;

	rhdr.rsp_len = sizeof(rhdr) + sizeof(error);
	rhdr.rsp_reqno = reqno;
	rhdr.rsp_class = RUMPSP_RESP;
	rhdr.rsp_type = RUMPSP_HANDSHAKE;
	rhdr.rsp_error = 0;

	sendlock(spc);
	rv = dosend(spc, &rhdr, sizeof(rhdr));
	rv = dosend(spc, &error, sizeof(error));
	sendunlock(spc);

	return rv;
}

static int
send_syscall_resp(struct spclient *spc, uint64_t reqno, int error,
	register_t *retval)
{
	struct rsp_hdr rhdr;
	struct rsp_sysresp sysresp;
	int rv;

	rhdr.rsp_len = sizeof(rhdr) + sizeof(sysresp);
	rhdr.rsp_reqno = reqno;
	rhdr.rsp_class = RUMPSP_RESP;
	rhdr.rsp_type = RUMPSP_SYSCALL;
	rhdr.rsp_sysnum = 0;

	sysresp.rsys_error = error;
	memcpy(sysresp.rsys_retval, retval, sizeof(sysresp.rsys_retval));

	sendlock(spc);
	rv = dosend(spc, &rhdr, sizeof(rhdr));
	rv = dosend(spc, &sysresp, sizeof(sysresp));
	sendunlock(spc);

	return rv;
}

static int
send_prefork_resp(struct spclient *spc, uint64_t reqno, uint32_t *auth)
{
	struct rsp_hdr rhdr;
	int rv;

	rhdr.rsp_len = sizeof(rhdr) + AUTHLEN*sizeof(*auth);
	rhdr.rsp_reqno = reqno;
	rhdr.rsp_class = RUMPSP_RESP;
	rhdr.rsp_type = RUMPSP_PREFORK;
	rhdr.rsp_sysnum = 0;

	sendlock(spc);
	rv = dosend(spc, &rhdr, sizeof(rhdr));
	rv = dosend(spc, auth, AUTHLEN*sizeof(*auth));
	sendunlock(spc);

	return rv;
}

static int
copyin_req(struct spclient *spc, const void *remaddr, size_t *dlen,
	int wantstr, void **resp)
{
	struct rsp_hdr rhdr;
	struct rsp_copydata copydata;
	struct respwait rw;
	int rv;

	DPRINTF(("copyin_req: %zu bytes from %p\n", *dlen, remaddr));

	rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata);
	rhdr.rsp_class = RUMPSP_REQ;
	if (wantstr)
		rhdr.rsp_type = RUMPSP_COPYINSTR;
	else
		rhdr.rsp_type = RUMPSP_COPYIN;
	rhdr.rsp_sysnum = 0;

	copydata.rcp_addr = __UNCONST(remaddr);
	copydata.rcp_len = *dlen;

	putwait(spc, &rw, &rhdr);
	rv = dosend(spc, &rhdr, sizeof(rhdr));
	rv = dosend(spc, &copydata, sizeof(copydata));
	if (rv) {
		unputwait(spc, &rw);
		return rv;
	}

	rv = waitresp(spc, &rw);

	DPRINTF(("copyin: response %d\n", rv));

	*resp = rw.rw_data;
	if (wantstr)
		*dlen = rw.rw_dlen;

	return rv;
}

static int
send_copyout_req(struct spclient *spc, const void *remaddr,
	const void *data, size_t dlen)
{
	struct rsp_hdr rhdr;
	struct rsp_copydata copydata;
	int rv;

	DPRINTF(("copyout_req (async): %zu bytes to %p\n", dlen, remaddr));

	rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata) + dlen;
	rhdr.rsp_reqno = nextreq(spc);
	rhdr.rsp_class = RUMPSP_REQ;
	rhdr.rsp_type = RUMPSP_COPYOUT;
	rhdr.rsp_sysnum = 0;

	copydata.rcp_addr = __UNCONST(remaddr);
	copydata.rcp_len = dlen;

	sendlock(spc);
	rv = dosend(spc, &rhdr, sizeof(rhdr));
	rv = dosend(spc, &copydata, sizeof(copydata));
	rv = dosend(spc, data, dlen);
	sendunlock(spc);

	return rv;
}

static int
anonmmap_req(struct spclient *spc, size_t howmuch, void **resp)
{
	struct rsp_hdr rhdr;
	struct respwait rw;
	int rv;

	DPRINTF(("anonmmap_req: %zu bytes\n", howmuch));

	rhdr.rsp_len = sizeof(rhdr) + sizeof(howmuch);
	rhdr.rsp_class = RUMPSP_REQ;
	rhdr.rsp_type = RUMPSP_ANONMMAP;
	rhdr.rsp_sysnum = 0;

	putwait(spc, &rw, &rhdr);
	rv = dosend(spc, &rhdr, sizeof(rhdr));
	rv = dosend(spc, &howmuch, sizeof(howmuch));
	if (rv) {
		unputwait(spc, &rw);
		return rv;
	}

	rv = waitresp(spc, &rw);

	*resp = rw.rw_data;

	DPRINTF(("anonmmap: mapped at %p\n", **(void ***)resp));

	return rv;
}

static int
send_raise_req(struct spclient *spc, int signo)
{
	struct rsp_hdr rhdr;
	int rv;

	rhdr.rsp_len = sizeof(rhdr);
	rhdr.rsp_class = RUMPSP_REQ;
	rhdr.rsp_type = RUMPSP_RAISE;
	rhdr.rsp_signo = signo;

	sendlock(spc);
	rv = dosend(spc, &rhdr, sizeof(rhdr));
	sendunlock(spc);

	return rv;
}

static void
spcref(struct spclient *spc)
{

	pthread_mutex_lock(&spc->spc_mtx);
	spc->spc_refcnt++;
	pthread_mutex_unlock(&spc->spc_mtx);
}

static void
spcrelease(struct spclient *spc)
{
	int ref;

	pthread_mutex_lock(&spc->spc_mtx);
	ref = --spc->spc_refcnt;
	if (__predict_false(spc->spc_inexec && ref <= 2))
		pthread_cond_broadcast(&spc->spc_cv);
	pthread_mutex_unlock(&spc->spc_mtx);

	if (ref > 0)
		return;

	DPRINTF(("rump_sp: spcrelease: spc %p fd %d\n", spc, spc->spc_fd));

	_DIAGASSERT(TAILQ_EMPTY(&spc->spc_respwait));
	_DIAGASSERT(spc->spc_buf == NULL);

	if (spc->spc_mainlwp) {
		lwproc_switch(spc->spc_mainlwp);
		lwproc_release();
	}
	spc->spc_mainlwp = NULL;

	close(spc->spc_fd);
	spc->spc_fd = -1;
	spc->spc_state = SPCSTATE_NEW;

	atomic_inc_uint(&disco);
}
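
/*
 * The disco counter above is the handoff from here to the poll loop:
 * spserver() periodically swaps it back to zero and shrinks nfds/maxidx
 * so that slots freed on disconnect eventually fall out of the poll range.
 */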

static void
serv_handledisco(unsigned int idx)
{
	struct spclient *spc = &spclist[idx];
	int dolwpexit;

	DPRINTF(("rump_sp: disconnecting [%u]\n", idx));

	pfdlist[idx].fd = -1;
	pfdlist[idx].revents = 0;
	pthread_mutex_lock(&spc->spc_mtx);
	spc->spc_state = SPCSTATE_DYING;
	kickall(spc);
	sendunlockl(spc);
	/* exec uses mainlwp in another thread, but also nuked all lwps */
	dolwpexit = !spc->spc_inexec;
	pthread_mutex_unlock(&spc->spc_mtx);

	if (dolwpexit && spc->spc_mainlwp) {
		lwproc_switch(spc->spc_mainlwp);
		lwproc_lwpexit();
		lwproc_switch(NULL);
	}

	/*
	 * Nobody's going to attempt to send/receive anymore,
	 * so reinit info relevant to that.
	 */
	/*LINTED:pointer casts may be ok*/
	memset((char *)spc + SPC_ZEROFF, 0, sizeof(*spc) - SPC_ZEROFF);

	spcrelease(spc);
}

static void
serv_shutdown(void)
{
	struct spclient *spc;
	unsigned int i;

	for (i = 1; i < MAXCLI; i++) {
		spc = &spclist[i];
		if (spc->spc_fd == -1)
			continue;

		shutdown(spc->spc_fd, SHUT_RDWR);
		serv_handledisco(i);

		spcrelease(spc);
	}
}

static unsigned
serv_handleconn(int fd, connecthook_fn connhook, int busy)
{
	struct sockaddr_storage ss;
	socklen_t sl = sizeof(ss);
	int newfd, flags;
	unsigned i;

	/*LINTED: cast ok */
	newfd = accept(fd, (struct sockaddr *)&ss, &sl);
	if (newfd == -1)
		return 0;

	if (busy) {
		close(newfd); /* EBUSY */
		return 0;
	}

	flags = fcntl(newfd, F_GETFL, 0);
	if (fcntl(newfd, F_SETFL, flags | O_NONBLOCK) == -1) {
		close(newfd);
		return 0;
	}

	if (connhook(newfd) != 0) {
		close(newfd);
		return 0;
	}

	/* write out a banner for the client */
	if (send(newfd, banner, strlen(banner), MSG_NOSIGNAL)
	    != (ssize_t)strlen(banner)) {
		close(newfd);
		return 0;
	}

	/* find empty slot the simple way */
	for (i = 0; i < MAXCLI; i++) {
		if (pfdlist[i].fd == -1 && spclist[i].spc_state == SPCSTATE_NEW)
			break;
	}

	assert(i < MAXCLI);

	pfdlist[i].fd = newfd;
	spclist[i].spc_fd = newfd;
	spclist[i].spc_istatus = SPCSTATUS_BUSY; /* dedicated receiver */
	spclist[i].spc_refcnt = 1;

	TAILQ_INIT(&spclist[i].spc_respwait);

	DPRINTF(("rump_sp: added new connection fd %d at idx %u\n", newfd, i));

	return i;
}

static void
serv_handlesyscall(struct spclient *spc, struct rsp_hdr *rhdr, uint8_t *data)
{
	register_t retval[2] = {0, 0};
	int rv, sysnum;

	sysnum = (int)rhdr->rsp_sysnum;
	DPRINTF(("rump_sp: handling syscall %d from client %d\n",
	    sysnum, spc->spc_pid));

	if (__predict_false((rv = lwproc_newlwp(spc->spc_pid)) != 0)) {
		retval[0] = -1;
		send_syscall_resp(spc, rhdr->rsp_reqno, rv, retval);
		return;
	}
	spc->spc_syscallreq = rhdr->rsp_reqno;
	rv = rumpsyscall(sysnum, data, retval);
	spc->spc_syscallreq = 0;
	lwproc_release();

	DPRINTF(("rump_sp: got return value %d & %d/%d\n",
	    rv, retval[0], retval[1]));

	send_syscall_resp(spc, rhdr->rsp_reqno, rv, retval);
}

static void
serv_handleexec(struct spclient *spc, struct rsp_hdr *rhdr, char *comm)
{
	size_t commlen = rhdr->rsp_len - HDRSZ;

	pthread_mutex_lock(&spc->spc_mtx);
	/* one for the connection and one for us */
	while (spc->spc_refcnt > 2)
		pthread_cond_wait(&spc->spc_cv, &spc->spc_mtx);
	pthread_mutex_unlock(&spc->spc_mtx);

	/*
	 * ok, all the threads are dead (or one is still alive and
	 * the connection is dead, in which case this doesn't matter
	 * very much).  proceed with exec.
	 */

	/* ensure comm is 0-terminated */
	/* TODO: make sure it contains sensible chars? */
	comm[commlen] = '\0';

	lwproc_switch(spc->spc_mainlwp);
	lwproc_execnotify(comm);
	lwproc_switch(NULL);

	pthread_mutex_lock(&spc->spc_mtx);
	spc->spc_inexec = 0;
	pthread_mutex_unlock(&spc->spc_mtx);
	send_handshake_resp(spc, rhdr->rsp_reqno, 0);
}

enum sbatype { SBA_SYSCALL, SBA_EXEC };

struct servbouncearg {
	struct spclient *sba_spc;
	struct rsp_hdr sba_hdr;
	enum sbatype sba_type;
	uint8_t *sba_data;

	TAILQ_ENTRY(servbouncearg) sba_entries;
};
static pthread_mutex_t sbamtx;
static pthread_cond_t sbacv;
static int nworker, idleworker, nwork;
static TAILQ_HEAD(, servbouncearg) wrklist = TAILQ_HEAD_INITIALIZER(wrklist);
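
/*
 * Requests are executed by a pool of detached worker threads:
 * schedulework() queues a servbouncearg on wrklist and either signals an
 * idle worker or creates a new one, up to rumpsp_maxworker.  Workers
 * retire on their own once the idle pool reaches rumpsp_idleworker.
 */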

/*ARGSUSED*/
static void *
serv_workbouncer(void *arg)
{
	struct servbouncearg *sba;

	for (;;) {
		pthread_mutex_lock(&sbamtx);
		if (__predict_false(idleworker - nwork >= rumpsp_idleworker)) {
			nworker--;
			pthread_mutex_unlock(&sbamtx);
			break;
		}
		idleworker++;
		while (TAILQ_EMPTY(&wrklist)) {
			_DIAGASSERT(nwork == 0);
			pthread_cond_wait(&sbacv, &sbamtx);
		}
		idleworker--;

		sba = TAILQ_FIRST(&wrklist);
		TAILQ_REMOVE(&wrklist, sba, sba_entries);
		nwork--;
		pthread_mutex_unlock(&sbamtx);

		if (__predict_true(sba->sba_type == SBA_SYSCALL)) {
			serv_handlesyscall(sba->sba_spc,
			    &sba->sba_hdr, sba->sba_data);
		} else {
			_DIAGASSERT(sba->sba_type == SBA_EXEC);
			serv_handleexec(sba->sba_spc, &sba->sba_hdr,
			    (char *)sba->sba_data);
		}
		spcrelease(sba->sba_spc);
		free(sba->sba_data);
		free(sba);
	}

	return NULL;
}

static int
sp_copyin(void *arg, const void *raddr, void *laddr, size_t *len, int wantstr)
{
	struct spclient *spc = arg;
	void *rdata = NULL; /* XXXuninit */
	int rv, nlocks;

	rumpuser__kunlock(0, &nlocks, NULL);

	rv = copyin_req(spc, raddr, len, wantstr, &rdata);
	if (rv)
		goto out;

	memcpy(laddr, rdata, *len);
	free(rdata);

 out:
	rumpuser__klock(nlocks, NULL);
	if (rv)
		return EFAULT;
	return 0;
}

int
rumpuser_sp_copyin(void *arg, const void *raddr, void *laddr, size_t len)
{

	return sp_copyin(arg, raddr, laddr, &len, 0);
}

int
rumpuser_sp_copyinstr(void *arg, const void *raddr, void *laddr, size_t *len)
{

	return sp_copyin(arg, raddr, laddr, len, 1);
}

static int
sp_copyout(void *arg, const void *laddr, void *raddr, size_t dlen)
{
	struct spclient *spc = arg;
	int nlocks, rv;

	rumpuser__kunlock(0, &nlocks, NULL);
	rv = send_copyout_req(spc, raddr, laddr, dlen);
	rumpuser__klock(nlocks, NULL);

	if (rv)
		return EFAULT;
	return 0;
}

int
rumpuser_sp_copyout(void *arg, const void *laddr, void *raddr, size_t dlen)
{

	return sp_copyout(arg, laddr, raddr, dlen);
}

int
rumpuser_sp_copyoutstr(void *arg, const void *laddr, void *raddr, size_t *dlen)
{

	return sp_copyout(arg, laddr, raddr, *dlen);
}

int
rumpuser_sp_anonmmap(void *arg, size_t howmuch, void **addr)
{
	struct spclient *spc = arg;
	void *resp, *rdata;
	int nlocks, rv;

	rumpuser__kunlock(0, &nlocks, NULL);

	rv = anonmmap_req(spc, howmuch, &rdata);
	if (rv) {
		rv = EFAULT;
		goto out;
	}

	resp = *(void **)rdata;
	free(rdata);

	if (resp == NULL) {
		rv = ENOMEM;
	}

	*addr = resp;

 out:
	rumpuser__klock(nlocks, NULL);

	if (rv)
		return rv;
	return 0;
}

int
rumpuser_sp_raise(void *arg, int signo)
{
	struct spclient *spc = arg;
	int rv, nlocks;

	rumpuser__kunlock(0, &nlocks, NULL);
	rv = send_raise_req(spc, signo);
	rumpuser__klock(nlocks, NULL);

	return rv;
}

static pthread_attr_t pattr_detached;
static void
schedulework(struct spclient *spc, enum sbatype sba_type)
{
	struct servbouncearg *sba;
	pthread_t pt;
	uint64_t reqno;
	int retries = 0;

	reqno = spc->spc_hdr.rsp_reqno;
	while ((sba = malloc(sizeof(*sba))) == NULL) {
		if (nworker == 0 || retries > 10) {
			send_error_resp(spc, reqno, EAGAIN);
			spcfreebuf(spc);
			return;
		}
		/* slim chance of more memory? */
		retries++;
		usleep(10000);
	}

	sba->sba_spc = spc;
	sba->sba_type = sba_type;
	sba->sba_hdr = spc->spc_hdr;
	sba->sba_data = spc->spc_buf;
	spcresetbuf(spc);

	spcref(spc);

	pthread_mutex_lock(&sbamtx);
	TAILQ_INSERT_TAIL(&wrklist, sba, sba_entries);
	nwork++;
	if (nwork <= idleworker) {
		/* do we have a daemon's tool (i.e. idle threads)? */
		pthread_cond_signal(&sbacv);
	} else if (nworker < rumpsp_maxworker) {
		/*
		 * Else, need to create one
		 * (if we can, otherwise just expect another
		 * worker to pick up the syscall)
		 */
		if (pthread_create(&pt, &pattr_detached,
		    serv_workbouncer, NULL) == 0) {
			nworker++;
		}
	}
	pthread_mutex_unlock(&sbamtx);
}

/*
 *
 * Startup routines and mainloop for server.
 *
 */

struct spservarg {
	int sps_sock;
	connecthook_fn sps_connhook;
};
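
/*
 * Slot 0 of pfdlist/spclist is reserved for the listening socket; client
 * connections live in slots 1..MAXCLI-1.  maxidx tracks the highest slot
 * in use so that the mainloop's poll() does not scan the entire array.
 */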

static void
handlereq(struct spclient *spc)
{
	uint64_t reqno;
	int error, i;

	reqno = spc->spc_hdr.rsp_reqno;
	if (__predict_false(spc->spc_state == SPCSTATE_NEW)) {
		if (spc->spc_hdr.rsp_type != RUMPSP_HANDSHAKE) {
			send_error_resp(spc, reqno, EAUTH);
			shutdown(spc->spc_fd, SHUT_RDWR);
			spcfreebuf(spc);
			return;
		}

		if (spc->spc_hdr.rsp_handshake == HANDSHAKE_GUEST) {
			char *comm = (char *)spc->spc_buf;
			size_t commlen = spc->spc_hdr.rsp_len - HDRSZ;

			/* ensure it's 0-terminated */
			/* XXX make sure it contains sensible chars? */
			comm[commlen] = '\0';

			if ((error = lwproc_rfork(spc,
			    RUMP_RFCFDG, comm)) != 0) {
				shutdown(spc->spc_fd, SHUT_RDWR);
			}

			spcfreebuf(spc);
			if (error)
				return;

			spc->spc_mainlwp = lwproc_curlwp();

			send_handshake_resp(spc, reqno, 0);
		} else if (spc->spc_hdr.rsp_handshake == HANDSHAKE_FORK) {
			struct lwp *tmpmain;
			struct prefork *pf;
			struct handshake_fork *rfp;
			int cancel;

			if (spc->spc_off-HDRSZ != sizeof(*rfp)) {
				send_error_resp(spc, reqno, EINVAL);
				shutdown(spc->spc_fd, SHUT_RDWR);
				spcfreebuf(spc);
				return;
			}

			/*LINTED*/
			rfp = (void *)spc->spc_buf;
			cancel = rfp->rf_cancel;

			pthread_mutex_lock(&pfmtx);
			LIST_FOREACH(pf, &preforks, pf_entries) {
				if (memcmp(rfp->rf_auth, pf->pf_auth,
				    sizeof(rfp->rf_auth)) == 0) {
					LIST_REMOVE(pf, pf_entries);
					LIST_REMOVE(pf, pf_spcentries);
					break;
				}
			}
			pthread_mutex_unlock(&pfmtx);
			spcfreebuf(spc);

			if (!pf) {
				send_error_resp(spc, reqno, ESRCH);
				shutdown(spc->spc_fd, SHUT_RDWR);
				return;
			}

			tmpmain = pf->pf_lwp;
			free(pf);
			lwproc_switch(tmpmain);
			if (cancel) {
				lwproc_release();
				shutdown(spc->spc_fd, SHUT_RDWR);
				return;
			}

			/*
			 * So, we forked already during "prefork" to save
			 * the file descriptors from a parent exit
			 * race condition.  But now we need to fork
			 * a second time since the initial fork has
			 * the wrong spc pointer.  (yea, optimize
			 * interfaces some day if anyone cares)
			 */
			if ((error = lwproc_rfork(spc, 0, NULL)) != 0) {
				send_error_resp(spc, reqno, error);
				shutdown(spc->spc_fd, SHUT_RDWR);
				lwproc_release();
				return;
			}
			spc->spc_mainlwp = lwproc_curlwp();
			lwproc_switch(tmpmain);
			lwproc_release();
			lwproc_switch(spc->spc_mainlwp);

			send_handshake_resp(spc, reqno, 0);
		} else {
			send_error_resp(spc, reqno, EAUTH);
			shutdown(spc->spc_fd, SHUT_RDWR);
			spcfreebuf(spc);
			return;
		}

		spc->spc_pid = lwproc_getpid();

		DPRINTF(("rump_sp: handshake for client %p complete, pid %d\n",
		    spc, spc->spc_pid));

		lwproc_switch(NULL);
		spc->spc_state = SPCSTATE_RUNNING;
		return;
	}

	if (__predict_false(spc->spc_hdr.rsp_type == RUMPSP_PREFORK)) {
		struct prefork *pf;
		uint32_t auth[AUTHLEN];
		int inexec;

		DPRINTF(("rump_sp: prefork handler executing for %p\n", spc));
		spcfreebuf(spc);

		pthread_mutex_lock(&spc->spc_mtx);
		inexec = spc->spc_inexec;
		pthread_mutex_unlock(&spc->spc_mtx);
		if (inexec) {
			send_error_resp(spc, reqno, EBUSY);
			shutdown(spc->spc_fd, SHUT_RDWR);
			return;
		}

		pf = malloc(sizeof(*pf));
		if (pf == NULL) {
			send_error_resp(spc, reqno, ENOMEM);
			return;
		}

		/*
		 * Use client main lwp to fork.  this is never used by
		 * worker threads (except in exec, but we checked for that
		 * above) so we can safely use it here.
		 */
		lwproc_switch(spc->spc_mainlwp);
		if ((error = lwproc_rfork(spc, RUMP_RFFDG, NULL)) != 0) {
			DPRINTF(("rump_sp: fork failed: %d (%p)\n",error, spc));
			send_error_resp(spc, reqno, error);
			lwproc_switch(NULL);
			free(pf);
			return;
		}

		/* Ok, we have a new process context and a new curlwp */
		for (i = 0; i < AUTHLEN; i++) {
			pf->pf_auth[i] = auth[i] = arc4random();
		}
		pf->pf_lwp = lwproc_curlwp();
		lwproc_switch(NULL);

		pthread_mutex_lock(&pfmtx);
		LIST_INSERT_HEAD(&preforks, pf, pf_entries);
		LIST_INSERT_HEAD(&spc->spc_pflist, pf, pf_spcentries);
		pthread_mutex_unlock(&pfmtx);

		DPRINTF(("rump_sp: prefork handler success %p\n", spc));

		send_prefork_resp(spc, reqno, auth);
		return;
	}

	if (__predict_false(spc->spc_hdr.rsp_type == RUMPSP_HANDSHAKE)) {
		int inexec;

		if (spc->spc_hdr.rsp_handshake != HANDSHAKE_EXEC) {
			send_error_resp(spc, reqno, EINVAL);
			shutdown(spc->spc_fd, SHUT_RDWR);
			spcfreebuf(spc);
			return;
		}

		pthread_mutex_lock(&spc->spc_mtx);
		inexec = spc->spc_inexec;
		pthread_mutex_unlock(&spc->spc_mtx);
		if (inexec) {
			send_error_resp(spc, reqno, EBUSY);
			shutdown(spc->spc_fd, SHUT_RDWR);
			spcfreebuf(spc);
			return;
		}

		pthread_mutex_lock(&spc->spc_mtx);
		spc->spc_inexec = 1;
		pthread_mutex_unlock(&spc->spc_mtx);

		/*
		 * start to drain lwps.  we will wait for it to finish
		 * in another thread
		 */
		lwproc_switch(spc->spc_mainlwp);
		lwproc_lwpexit();
		lwproc_switch(NULL);

		/*
		 * exec has to wait for lwps to drain, so finish it off
		 * in another thread
		 */
		schedulework(spc, SBA_EXEC);
		return;
	}

	if (__predict_false(spc->spc_hdr.rsp_type != RUMPSP_SYSCALL)) {
		send_error_resp(spc, reqno, EINVAL);
		spcfreebuf(spc);
		return;
	}

	schedulework(spc, SBA_SYSCALL);
}

static void *
spserver(void *arg)
{
	struct spservarg *sarg = arg;
	struct spclient *spc;
	unsigned idx;
	int seen;
	int rv;
	unsigned int nfds, maxidx;

	for (idx = 0; idx < MAXCLI; idx++) {
		pfdlist[idx].fd = -1;
		pfdlist[idx].events = POLLIN;

		spc = &spclist[idx];
		pthread_mutex_init(&spc->spc_mtx, NULL);
		pthread_cond_init(&spc->spc_cv, NULL);
		spc->spc_fd = -1;
	}
	pfdlist[0].fd = spclist[0].spc_fd = sarg->sps_sock;
	pfdlist[0].events = POLLIN;
	nfds = 1;
	maxidx = 0;

	pthread_attr_init(&pattr_detached);
	pthread_attr_setdetachstate(&pattr_detached, PTHREAD_CREATE_DETACHED);
	/* XXX: stacksize doesn't currently work on NetBSD */
	pthread_attr_setstacksize(&pattr_detached, 32*1024);

	pthread_mutex_init(&sbamtx, NULL);
	pthread_cond_init(&sbacv, NULL);

	DPRINTF(("rump_sp: server mainloop\n"));

	for (;;) {
		int discoed;

		/* g/c hangarounds (eventually) */
		discoed = atomic_swap_uint(&disco, 0);
		while (discoed--) {
			nfds--;
			idx = maxidx;
			while (idx) {
				if (pfdlist[idx].fd != -1) {
					maxidx = idx;
					break;
				}
				idx--;
			}
			DPRINTF(("rump_sp: set maxidx to [%u]\n",
			    maxidx));
		}

		DPRINTF(("rump_sp: loop nfd %d\n", maxidx+1));
		seen = 0;
		rv = poll(pfdlist, maxidx+1, INFTIM);
		assert(maxidx+1 <= MAXCLI);
		assert(rv != 0);
		if (rv == -1) {
			if (errno == EINTR)
				continue;
			fprintf(stderr, "rump_spserver: poll returned %d\n",
			    errno);
			break;
		}

		for (idx = 0; seen < rv && idx < MAXCLI; idx++) {
			if ((pfdlist[idx].revents & POLLIN) == 0)
				continue;

			seen++;
			DPRINTF(("rump_sp: activity at [%u] %d/%d\n",
			    idx, seen, rv));
			if (idx > 0) {
				spc = &spclist[idx];
				DPRINTF(("rump_sp: mainloop read [%u]\n", idx));
				switch (readframe(spc)) {
				case 0:
					break;
				case -1:
					serv_handledisco(idx);
					break;
				default:
					switch (spc->spc_hdr.rsp_class) {
					case RUMPSP_RESP:
						kickwaiter(spc);
						break;
					case RUMPSP_REQ:
						handlereq(spc);
						break;
					default:
						send_error_resp(spc,
						    spc->spc_hdr.rsp_reqno,
						    ENOENT);
						spcfreebuf(spc);
						break;
					}
					break;
				}

			} else {
				DPRINTF(("rump_sp: mainloop new connection\n"));

				if (__predict_false(spfini)) {
					close(spclist[0].spc_fd);
					serv_shutdown();
					goto out;
				}

				idx = serv_handleconn(pfdlist[0].fd,
				    sarg->sps_connhook, nfds == MAXCLI);
				if (idx)
					nfds++;
				if (idx > maxidx)
					maxidx = idx;
				DPRINTF(("rump_sp: maxid now %d\n", maxidx));
			}
		}
	}

 out:
	return NULL;
}

static unsigned cleanupidx;
static struct sockaddr *cleanupsa;
int
rumpuser_sp_init(const char *url, const struct rumpuser_sp_ops *spopsp,
	const char *ostype, const char *osrelease, const char *machine)
{
	pthread_t pt;
	struct spservarg *sarg;
	struct sockaddr *sap;
	char *p;
	unsigned idx;
	int error, s;

	p = strdup(url);
	if (p == NULL)
		return ENOMEM;
	error = parseurl(p, &sap, &idx, 1);
	free(p);
	if (error)
		return error;

	snprintf(banner, sizeof(banner), "RUMPSP-%d.%d-%s-%s/%s\n",
	    PROTOMAJOR, PROTOMINOR, ostype, osrelease, machine);

	s = socket(parsetab[idx].domain, SOCK_STREAM, 0);
	if (s == -1)
		return errno;

	spops = *spopsp;
	sarg = malloc(sizeof(*sarg));
	if (sarg == NULL) {
		close(s);
		return ENOMEM;
	}

	sarg->sps_sock = s;
	sarg->sps_connhook = parsetab[idx].connhook;

	cleanupidx = idx;
	cleanupsa = sap;

	/* sloppy error recovery */

	/*LINTED*/
	if (bind(s, sap, sap->sa_len) == -1) {
		fprintf(stderr, "rump_sp: server bind failed\n");
		return errno;
	}

	if (listen(s, MAXCLI) == -1) {
		fprintf(stderr, "rump_sp: server listen failed\n");
		return errno;
	}

	if ((error = pthread_create(&pt, NULL, spserver, sarg)) != 0) {
		fprintf(stderr, "rump_sp: cannot create wrkr thread\n");
		return error;
	}
	pthread_detach(pt);

	return 0;
}

void
rumpuser_sp_fini(void *arg)
{
	struct spclient *spc = arg;
	register_t retval[2] = {0, 0};

	if (spclist[0].spc_fd) {
		parsetab[cleanupidx].cleanup(cleanupsa);
	}

	/*
	 * stuff response into the socket, since this process is just
	 * about to exit
	 */
	if (spc && spc->spc_syscallreq)
		send_syscall_resp(spc, spc->spc_syscallreq, 0, retval);

	if (spclist[0].spc_fd) {
		shutdown(spclist[0].spc_fd, SHUT_RDWR);
		spfini = 1;
	}
}