/*	$NetBSD: rumpuser_sp.c,v 1.47 2012/07/27 09:09:05 pooka Exp $	*/

/*
 * Copyright (c) 2010, 2011 Antti Kantee.  All Rights Reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

/*
 * Sysproxy routines.  This provides system RPC support over host sockets.
 * The most notable limitation is that the client and server must share
 * the same ABI.  This does not mean that they have to be the same
 * machine or that they need to run the same version of the host OS,
 * just that they must agree on the data structures.  This even *might*
 * work correctly from one hardware architecture to another.
 */

#include "rumpuser_port.h"

#if !defined(lint)
__RCSID("$NetBSD: rumpuser_sp.c,v 1.47 2012/07/27 09:09:05 pooka Exp $");
#endif /* !lint */

#include <sys/types.h>
#include <sys/mman.h>
#include <sys/socket.h>

#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <rump/rump.h> /* XXX: for rfork flags */
#include <rump/rumpuser.h>

#include "rumpuser_int.h"

#include "sp_common.c"

#ifndef MAXCLI
#define MAXCLI 256
#endif
#ifndef MAXWORKER
#define MAXWORKER 128
#endif
#ifndef IDLEWORKER
#define IDLEWORKER 16
#endif
int rumpsp_maxworker = MAXWORKER;
int rumpsp_idleworker = IDLEWORKER;

static struct pollfd pfdlist[MAXCLI];
static struct spclient spclist[MAXCLI];
static unsigned int disco;
static volatile int spfini;

static struct rumpuser_sp_ops spops;

static char banner[MAXBANNER];

#define PROTOMAJOR 0
#define PROTOMINOR 3

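/*
 * The protocol version above is advertised to each connecting client
 * in the banner written out by serv_handleconn().  Client and server
 * must agree on the major version and, per the comment at the top of
 * the file, on the ABI of the structures shipped over the wire.
 */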
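/*
 * "disco" counts connections that have been torn down since the
 * mainloop last looked.  The mainloop swaps it back to zero with
 * getdisco() and garbage-collects that many pollfd slots.
 */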
/* how to use atomic ops on Linux? */
#ifdef __linux__
static pthread_mutex_t discomtx = PTHREAD_MUTEX_INITIALIZER;

static void
signaldisco(void)
{

	pthread_mutex_lock(&discomtx);
	disco++;
	pthread_mutex_unlock(&discomtx);
}

static unsigned int
getdisco(void)
{
	unsigned int discocnt;

	pthread_mutex_lock(&discomtx);
	discocnt = disco;
	disco = 0;
	pthread_mutex_unlock(&discomtx);

	return discocnt;
}

#else /* NetBSD */

#include <sys/atomic.h>
#define signaldisco() atomic_inc_uint(&disco)
#define getdisco() atomic_swap_uint(&disco, 0)

#endif

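/*
 * Prefork bookkeeping: a client about to fork requests a prefork,
 * which creates a new process context on the server and stashes it
 * here together with a random authentication cookie.  The forked
 * client later presents the cookie in its HANDSHAKE_FORK to claim
 * the context (see handlereq()).
 */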
struct prefork {
	uint32_t pf_auth[AUTHLEN];
	struct lwp *pf_lwp;

	LIST_ENTRY(prefork) pf_entries;		/* global list */
	LIST_ENTRY(prefork) pf_spcentries;	/* linked from forking spc */
};
static LIST_HEAD(, prefork) preforks = LIST_HEAD_INITIALIZER(preforks);
static pthread_mutex_t pfmtx = PTHREAD_MUTEX_INITIALIZER;

/*
 * This version is for the server.  It's optimized for multiple threads
 * and is *NOT* reentrant wrt signals.
 */
static int
waitresp(struct spclient *spc, struct respwait *rw)
{
	int spcstate;
	int rv = 0;

	pthread_mutex_lock(&spc->spc_mtx);
	sendunlockl(spc);
	while (!rw->rw_done && spc->spc_state != SPCSTATE_DYING) {
		pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx);
	}
	TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries);
	spcstate = spc->spc_state;
	pthread_mutex_unlock(&spc->spc_mtx);

	pthread_cond_destroy(&rw->rw_cv);

	if (rv)
		return rv;
	if (spcstate == SPCSTATE_DYING)
		return ENOTCONN;
	return rw->rw_error;
}

/*
 * Manual wrappers, since librump does not have access to the
 * user namespace wrapped interfaces.
 */

static void
lwproc_switch(struct lwp *l)
{

	spops.spop_schedule();
	spops.spop_lwproc_switch(l);
	spops.spop_unschedule();
}

static void
lwproc_release(void)
{

	spops.spop_schedule();
	spops.spop_lwproc_release();
	spops.spop_unschedule();
}

static int
lwproc_rfork(struct spclient *spc, int flags, const char *comm)
{
	int rv;

	spops.spop_schedule();
	rv = spops.spop_lwproc_rfork(spc, flags, comm);
	spops.spop_unschedule();

	return rv;
}

static int
lwproc_newlwp(pid_t pid)
{
	int rv;

	spops.spop_schedule();
	rv = spops.spop_lwproc_newlwp(pid);
	spops.spop_unschedule();

	return rv;
}

static struct lwp *
lwproc_curlwp(void)
{
	struct lwp *l;

	spops.spop_schedule();
	l = spops.spop_lwproc_curlwp();
	spops.spop_unschedule();

	return l;
}

static pid_t
lwproc_getpid(void)
{
	pid_t p;

	spops.spop_schedule();
	p = spops.spop_getpid();
	spops.spop_unschedule();

	return p;
}

static void
lwproc_execnotify(const char *comm)
{

	spops.spop_schedule();
	spops.spop_execnotify(comm);
	spops.spop_unschedule();
}

static void
lwproc_lwpexit(void)
{

	spops.spop_schedule();
	spops.spop_lwpexit();
	spops.spop_unschedule();
}

static int
rumpsyscall(int sysnum, void *data, register_t *retval)
{
	int rv;

	spops.spop_schedule();
	rv = spops.spop_syscall(sysnum, data, retval);
	spops.spop_unschedule();

	return rv;
}

static uint64_t
nextreq(struct spclient *spc)
{
	uint64_t nw;

	pthread_mutex_lock(&spc->spc_mtx);
	nw = spc->spc_nextreq++;
	pthread_mutex_unlock(&spc->spc_mtx);

	return nw;
}

/*
 * XXX: we send responses with "blocking" I/O.  This is not
 * ok for the main thread.  XXXFIXME
 */

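/*
 * The IOVPUT()/IOVPUT_WITHSIZE() macros and the sendlock()/SENDIOV()
 * machinery used by the senders below are provided by sp_common.c,
 * which is included directly near the top of this file.
 */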
static void
send_error_resp(struct spclient *spc, uint64_t reqno, int error)
{
	struct rsp_hdr rhdr;
	struct iovec iov[1];

	rhdr.rsp_len = sizeof(rhdr);
	rhdr.rsp_reqno = reqno;
	rhdr.rsp_class = RUMPSP_ERROR;
	rhdr.rsp_type = 0;
	rhdr.rsp_error = error;

	IOVPUT(iov[0], rhdr);

	sendlock(spc);
	(void)SENDIOV(spc, iov);
	sendunlock(spc);
}

static int
send_handshake_resp(struct spclient *spc, uint64_t reqno, int error)
{
	struct rsp_hdr rhdr;
	struct iovec iov[2];
	int rv;

	rhdr.rsp_len = sizeof(rhdr) + sizeof(error);
	rhdr.rsp_reqno = reqno;
	rhdr.rsp_class = RUMPSP_RESP;
	rhdr.rsp_type = RUMPSP_HANDSHAKE;
	rhdr.rsp_error = 0;

	IOVPUT(iov[0], rhdr);
	IOVPUT(iov[1], error);

	sendlock(spc);
	rv = SENDIOV(spc, iov);
	sendunlock(spc);

	return rv;
}

static int
send_syscall_resp(struct spclient *spc, uint64_t reqno, int error,
	register_t *retval)
{
	struct rsp_hdr rhdr;
	struct rsp_sysresp sysresp;
	struct iovec iov[2];
	int rv;

	rhdr.rsp_len = sizeof(rhdr) + sizeof(sysresp);
	rhdr.rsp_reqno = reqno;
	rhdr.rsp_class = RUMPSP_RESP;
	rhdr.rsp_type = RUMPSP_SYSCALL;
	rhdr.rsp_sysnum = 0;

	sysresp.rsys_error = error;
	memcpy(sysresp.rsys_retval, retval, sizeof(sysresp.rsys_retval));

	IOVPUT(iov[0], rhdr);
	IOVPUT(iov[1], sysresp);

	sendlock(spc);
	rv = SENDIOV(spc, iov);
	sendunlock(spc);

	return rv;
}

static int
send_prefork_resp(struct spclient *spc, uint64_t reqno, uint32_t *auth)
{
	struct rsp_hdr rhdr;
	struct iovec iov[2];
	int rv;

	rhdr.rsp_len = sizeof(rhdr) + AUTHLEN*sizeof(*auth);
	rhdr.rsp_reqno = reqno;
	rhdr.rsp_class = RUMPSP_RESP;
	rhdr.rsp_type = RUMPSP_PREFORK;
	rhdr.rsp_sysnum = 0;

	IOVPUT(iov[0], rhdr);
	IOVPUT_WITHSIZE(iov[1], auth, AUTHLEN*sizeof(*auth));

	sendlock(spc);
	rv = SENDIOV(spc, iov);
	sendunlock(spc);

	return rv;
}

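/*
 * Blocking requests to the client (copyin, anonmmap) use the
 * putwait()/waitresp() pair from sp_common.c: putwait() registers
 * the response waiter and fills in the request number before the
 * request is sent, so the response cannot slip past us; waitresp()
 * then sleeps until the receiver side kicks the waiter.
 */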
static int
copyin_req(struct spclient *spc, const void *remaddr, size_t *dlen,
	int wantstr, void **resp)
{
	struct rsp_hdr rhdr;
	struct rsp_copydata copydata;
	struct respwait rw;
	struct iovec iov[2];
	int rv;

	DPRINTF(("copyin_req: %zu bytes from %p\n", *dlen, remaddr));

	rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata);
	rhdr.rsp_class = RUMPSP_REQ;
	if (wantstr)
		rhdr.rsp_type = RUMPSP_COPYINSTR;
	else
		rhdr.rsp_type = RUMPSP_COPYIN;
	rhdr.rsp_sysnum = 0;

	copydata.rcp_addr = __UNCONST(remaddr);
	copydata.rcp_len = *dlen;

	IOVPUT(iov[0], rhdr);
	IOVPUT(iov[1], copydata);

	putwait(spc, &rw, &rhdr);
	rv = SENDIOV(spc, iov);
	if (rv) {
		unputwait(spc, &rw);
		return rv;
	}

	rv = waitresp(spc, &rw);

	DPRINTF(("copyin: response %d\n", rv));

	*resp = rw.rw_data;
	if (wantstr)
		*dlen = rw.rw_dlen;

	return rv;
}

static int
send_copyout_req(struct spclient *spc, const void *remaddr,
	const void *data, size_t dlen)
{
	struct rsp_hdr rhdr;
	struct rsp_copydata copydata;
	struct iovec iov[3];
	int rv;

	DPRINTF(("copyout_req (async): %zu bytes to %p\n", dlen, remaddr));

	rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata) + dlen;
	rhdr.rsp_reqno = nextreq(spc);
	rhdr.rsp_class = RUMPSP_REQ;
	rhdr.rsp_type = RUMPSP_COPYOUT;
	rhdr.rsp_sysnum = 0;

	copydata.rcp_addr = __UNCONST(remaddr);
	copydata.rcp_len = dlen;

	IOVPUT(iov[0], rhdr);
	IOVPUT(iov[1], copydata);
	IOVPUT_WITHSIZE(iov[2], __UNCONST(data), dlen);

	sendlock(spc);
	rv = SENDIOV(spc, iov);
	sendunlock(spc);

	return rv;
}

static int
anonmmap_req(struct spclient *spc, size_t howmuch, void **resp)
{
	struct rsp_hdr rhdr;
	struct respwait rw;
	struct iovec iov[2];
	int rv;

	DPRINTF(("anonmmap_req: %zu bytes\n", howmuch));

	rhdr.rsp_len = sizeof(rhdr) + sizeof(howmuch);
	rhdr.rsp_class = RUMPSP_REQ;
	rhdr.rsp_type = RUMPSP_ANONMMAP;
	rhdr.rsp_sysnum = 0;

	IOVPUT(iov[0], rhdr);
	IOVPUT(iov[1], howmuch);

	putwait(spc, &rw, &rhdr);
	rv = SENDIOV(spc, iov);
	if (rv) {
		unputwait(spc, &rw);
		return rv;
	}

	rv = waitresp(spc, &rw);

	*resp = rw.rw_data;

	DPRINTF(("anonmmap: mapped at %p\n", **(void ***)resp));

	return rv;
}

static int
send_raise_req(struct spclient *spc, int signo)
{
	struct rsp_hdr rhdr;
	struct iovec iov[1];
	int rv;

	rhdr.rsp_len = sizeof(rhdr);
	rhdr.rsp_class = RUMPSP_REQ;
	rhdr.rsp_type = RUMPSP_RAISE;
	rhdr.rsp_signo = signo;

	IOVPUT(iov[0], rhdr);

	sendlock(spc);
	rv = SENDIOV(spc, iov);
	sendunlock(spc);

	return rv;
}

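/*
 * spclient reference counting: the connection itself holds one
 * reference and each queued worker request takes another (see
 * schedulework()).  The final spcrelease() closes the descriptor
 * and bumps the disco counter so the mainloop can reuse the slot.
 */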
static void
spcref(struct spclient *spc)
{

	pthread_mutex_lock(&spc->spc_mtx);
	spc->spc_refcnt++;
	pthread_mutex_unlock(&spc->spc_mtx);
}

static void
spcrelease(struct spclient *spc)
{
	int ref;

	pthread_mutex_lock(&spc->spc_mtx);
	ref = --spc->spc_refcnt;
	if (__predict_false(spc->spc_inexec && ref <= 2))
		pthread_cond_broadcast(&spc->spc_cv);
	pthread_mutex_unlock(&spc->spc_mtx);

	if (ref > 0)
		return;

	DPRINTF(("rump_sp: spcrelease: spc %p fd %d\n", spc, spc->spc_fd));

	_DIAGASSERT(TAILQ_EMPTY(&spc->spc_respwait));
	_DIAGASSERT(spc->spc_buf == NULL);

	if (spc->spc_mainlwp) {
		lwproc_switch(spc->spc_mainlwp);
		lwproc_release();
	}
	spc->spc_mainlwp = NULL;

	close(spc->spc_fd);
	spc->spc_fd = -1;
	spc->spc_state = SPCSTATE_NEW;

	signaldisco();
}

static void
serv_handledisco(unsigned int idx)
{
	struct spclient *spc = &spclist[idx];
	int dolwpexit;

	DPRINTF(("rump_sp: disconnecting [%u]\n", idx));

	pfdlist[idx].fd = -1;
	pfdlist[idx].revents = 0;
	pthread_mutex_lock(&spc->spc_mtx);
	spc->spc_state = SPCSTATE_DYING;
	kickall(spc);
	sendunlockl(spc);
	/* exec uses mainlwp in another thread, but also nuked all lwps */
	dolwpexit = !spc->spc_inexec;
	pthread_mutex_unlock(&spc->spc_mtx);

	if (dolwpexit && spc->spc_mainlwp) {
		lwproc_switch(spc->spc_mainlwp);
		lwproc_lwpexit();
		lwproc_switch(NULL);
	}

	/*
	 * Nobody's going to attempt to send/receive anymore,
	 * so reinit info relevant to that.
	 */
	/*LINTED:pointer casts may be ok*/
	memset((char *)spc + SPC_ZEROFF, 0, sizeof(*spc) - SPC_ZEROFF);

	spcrelease(spc);
}

static void
serv_shutdown(void)
{
	struct spclient *spc;
	unsigned int i;

	for (i = 1; i < MAXCLI; i++) {
		spc = &spclist[i];
		if (spc->spc_fd == -1)
			continue;

		shutdown(spc->spc_fd, SHUT_RDWR);
		serv_handledisco(i);

		spcrelease(spc);
	}
}

static unsigned
serv_handleconn(int fd, connecthook_fn connhook, int busy)
{
	struct sockaddr_storage ss;
	socklen_t sl = sizeof(ss);
	int newfd, flags;
	unsigned i;

	/*LINTED: cast ok */
	newfd = accept(fd, (struct sockaddr *)&ss, &sl);
	if (newfd == -1)
		return 0;

	if (busy) {
		close(newfd); /* EBUSY */
		return 0;
	}

	flags = fcntl(newfd, F_GETFL, 0);
	if (fcntl(newfd, F_SETFL, flags | O_NONBLOCK) == -1) {
		close(newfd);
		return 0;
	}

	if (connhook(newfd) != 0) {
		close(newfd);
		return 0;
	}

	/* write out a banner for the client */
	if (send(newfd, banner, strlen(banner), MSG_NOSIGNAL)
	    != (ssize_t)strlen(banner)) {
		close(newfd);
		return 0;
	}

	/* find empty slot the simple way */
	for (i = 0; i < MAXCLI; i++) {
		if (pfdlist[i].fd == -1 && spclist[i].spc_state == SPCSTATE_NEW)
			break;
	}

	assert(i < MAXCLI);

	pfdlist[i].fd = newfd;
	spclist[i].spc_fd = newfd;
	spclist[i].spc_istatus = SPCSTATUS_BUSY; /* dedicated receiver */
	spclist[i].spc_refcnt = 1;

	TAILQ_INIT(&spclist[i].spc_respwait);

	DPRINTF(("rump_sp: added new connection fd %d at idx %u\n", newfd, i));

	return i;
}

static void
serv_handlesyscall(struct spclient *spc, struct rsp_hdr *rhdr, uint8_t *data)
{
	register_t retval[2] = {0, 0};
	int rv, sysnum;

	sysnum = (int)rhdr->rsp_sysnum;
	DPRINTF(("rump_sp: handling syscall %d from client %d\n",
	    sysnum, spc->spc_pid));

	if (__predict_false((rv = lwproc_newlwp(spc->spc_pid)) != 0)) {
		retval[0] = -1;
		send_syscall_resp(spc, rhdr->rsp_reqno, rv, retval);
		return;
	}
	spc->spc_syscallreq = rhdr->rsp_reqno;
	rv = rumpsyscall(sysnum, data, retval);
	spc->spc_syscallreq = 0;
	lwproc_release();

	DPRINTF(("rump_sp: got return value %d & %d/%d\n",
	    rv, retval[0], retval[1]));

	send_syscall_resp(spc, rhdr->rsp_reqno, rv, retval);
}

static void
serv_handleexec(struct spclient *spc, struct rsp_hdr *rhdr, char *comm)
{
	size_t commlen = rhdr->rsp_len - HDRSZ;

	pthread_mutex_lock(&spc->spc_mtx);
	/* one for the connection and one for us */
	while (spc->spc_refcnt > 2)
		pthread_cond_wait(&spc->spc_cv, &spc->spc_mtx);
	pthread_mutex_unlock(&spc->spc_mtx);

	/*
	 * ok, all the threads are dead (or one is still alive and
	 * the connection is dead, in which case this doesn't matter
	 * very much).  proceed with exec.
	 */

	/* ensure comm is 0-terminated */
	/* TODO: make sure it contains sensible chars? */
	comm[commlen] = '\0';

	lwproc_switch(spc->spc_mainlwp);
	lwproc_execnotify(comm);
	lwproc_switch(NULL);

	pthread_mutex_lock(&spc->spc_mtx);
	spc->spc_inexec = 0;
	pthread_mutex_unlock(&spc->spc_mtx);
	send_handshake_resp(spc, rhdr->rsp_reqno, 0);
}

enum sbatype { SBA_SYSCALL, SBA_EXEC };

struct servbouncearg {
	struct spclient *sba_spc;
	struct rsp_hdr sba_hdr;
	enum sbatype sba_type;
	uint8_t *sba_data;

	TAILQ_ENTRY(servbouncearg) sba_entries;
};
static pthread_mutex_t sbamtx;
static pthread_cond_t sbacv;
static int nworker, idleworker, nwork;
static TAILQ_HEAD(, servbouncearg) wrklist = TAILQ_HEAD_INITIALIZER(wrklist);

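/*
 * Worker pool: requests which may block (syscalls, exec) are queued
 * on wrklist and served by detached worker threads.  The pool grows
 * on demand up to rumpsp_maxworker threads, and a worker exits
 * instead of going idle once more than rumpsp_idleworker threads
 * would otherwise be sitting around with nothing to do.
 */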
/*ARGSUSED*/
static void *
serv_workbouncer(void *arg)
{
	struct servbouncearg *sba;

	for (;;) {
		pthread_mutex_lock(&sbamtx);
		if (__predict_false(idleworker - nwork >= rumpsp_idleworker)) {
			nworker--;
			pthread_mutex_unlock(&sbamtx);
			break;
		}
		idleworker++;
		while (TAILQ_EMPTY(&wrklist)) {
			_DIAGASSERT(nwork == 0);
			pthread_cond_wait(&sbacv, &sbamtx);
		}
		idleworker--;

		sba = TAILQ_FIRST(&wrklist);
		TAILQ_REMOVE(&wrklist, sba, sba_entries);
		nwork--;
		pthread_mutex_unlock(&sbamtx);

		if (__predict_true(sba->sba_type == SBA_SYSCALL)) {
			serv_handlesyscall(sba->sba_spc,
			    &sba->sba_hdr, sba->sba_data);
		} else {
			_DIAGASSERT(sba->sba_type == SBA_EXEC);
			serv_handleexec(sba->sba_spc, &sba->sba_hdr,
			    (char *)sba->sba_data);
		}
		spcrelease(sba->sba_spc);
		free(sba->sba_data);
		free(sba);
	}

	return NULL;
}

static int
sp_copyin(void *arg, const void *raddr, void *laddr, size_t *len, int wantstr)
{
	struct spclient *spc = arg;
	void *rdata = NULL; /* XXXuninit */
	int rv, nlocks;

	rumpuser__kunlock(0, &nlocks, NULL);

	rv = copyin_req(spc, raddr, len, wantstr, &rdata);
	if (rv)
		goto out;

	memcpy(laddr, rdata, *len);
	free(rdata);

 out:
	rumpuser__klock(nlocks, NULL);
	if (rv)
		return EFAULT;
	return 0;
}

int
rumpuser_sp_copyin(void *arg, const void *raddr, void *laddr, size_t len)
{

	return sp_copyin(arg, raddr, laddr, &len, 0);
}

int
rumpuser_sp_copyinstr(void *arg, const void *raddr, void *laddr, size_t *len)
{

	return sp_copyin(arg, raddr, laddr, len, 1);
}

static int
sp_copyout(void *arg, const void *laddr, void *raddr, size_t dlen)
{
	struct spclient *spc = arg;
	int nlocks, rv;

	rumpuser__kunlock(0, &nlocks, NULL);
	rv = send_copyout_req(spc, raddr, laddr, dlen);
	rumpuser__klock(nlocks, NULL);

	if (rv)
		return EFAULT;
	return 0;
}

int
rumpuser_sp_copyout(void *arg, const void *laddr, void *raddr, size_t dlen)
{

	return sp_copyout(arg, laddr, raddr, dlen);
}

int
rumpuser_sp_copyoutstr(void *arg, const void *laddr, void *raddr, size_t *dlen)
{

	return sp_copyout(arg, laddr, raddr, *dlen);
}

int
rumpuser_sp_anonmmap(void *arg, size_t howmuch, void **addr)
{
	struct spclient *spc = arg;
	void *resp, *rdata;
	int nlocks, rv;

	rumpuser__kunlock(0, &nlocks, NULL);

	rv = anonmmap_req(spc, howmuch, &rdata);
	if (rv) {
		rv = EFAULT;
		goto out;
	}

	resp = *(void **)rdata;
	free(rdata);

	if (resp == NULL) {
		rv = ENOMEM;
	}

	*addr = resp;

 out:
	rumpuser__klock(nlocks, NULL);

	if (rv)
		return rv;
	return 0;
}

int
rumpuser_sp_raise(void *arg, int signo)
{
	struct spclient *spc = arg;
	int rv, nlocks;

	rumpuser__kunlock(0, &nlocks, NULL);
	rv = send_raise_req(spc, signo);
	rumpuser__klock(nlocks, NULL);

	return rv;
}

static pthread_attr_t pattr_detached;

static void
schedulework(struct spclient *spc, enum sbatype sba_type)
{
	struct servbouncearg *sba;
	pthread_t pt;
	uint64_t reqno;
	int retries = 0;

	reqno = spc->spc_hdr.rsp_reqno;
	while ((sba = malloc(sizeof(*sba))) == NULL) {
		if (nworker == 0 || retries++ > 10) {
			send_error_resp(spc, reqno, EAGAIN);
			spcfreebuf(spc);
			return;
		}
		/* slim chance of more memory? */
		usleep(10000);
	}

	sba->sba_spc = spc;
	sba->sba_type = sba_type;
	sba->sba_hdr = spc->spc_hdr;
	sba->sba_data = spc->spc_buf;
	spcresetbuf(spc);

	spcref(spc);

	pthread_mutex_lock(&sbamtx);
	TAILQ_INSERT_TAIL(&wrklist, sba, sba_entries);
	nwork++;
	if (nwork <= idleworker) {
		/* do we have a daemon's tool (i.e. idle threads)? */
		pthread_cond_signal(&sbacv);
	} else if (nworker < rumpsp_maxworker) {
		/*
		 * Else, need to create one
		 * (if we can, otherwise just expect another
		 * worker to pick up the syscall)
		 */
		if (pthread_create(&pt, &pattr_detached,
		    serv_workbouncer, NULL) == 0) {
			nworker++;
		}
	}
	pthread_mutex_unlock(&sbamtx);
}

/*
 *
 * Startup routines and mainloop for server.
 *
 */

struct spservarg {
	int sps_sock;
	connecthook_fn sps_connhook;
};

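/*
 * Handle an incoming request from a client.  Handshakes (guest,
 * fork, exec) and prefork are processed directly in the receiver
 * thread; syscalls and the blocking part of exec are bounced to
 * worker threads via schedulework().
 */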
static void
handlereq(struct spclient *spc)
{
	uint64_t reqno;
	int error, i;

	reqno = spc->spc_hdr.rsp_reqno;
	if (__predict_false(spc->spc_state == SPCSTATE_NEW)) {
		if (spc->spc_hdr.rsp_type != RUMPSP_HANDSHAKE) {
			send_error_resp(spc, reqno, EACCES);
			shutdown(spc->spc_fd, SHUT_RDWR);
			spcfreebuf(spc);
			return;
		}

		if (spc->spc_hdr.rsp_handshake == HANDSHAKE_GUEST) {
			char *comm = (char *)spc->spc_buf;
			size_t commlen = spc->spc_hdr.rsp_len - HDRSZ;

			/* ensure it's 0-terminated */
			/* XXX make sure it contains sensible chars? */
			comm[commlen] = '\0';

			if ((error = lwproc_rfork(spc,
			    RUMP_RFCFDG, comm)) != 0) {
				shutdown(spc->spc_fd, SHUT_RDWR);
			}

			spcfreebuf(spc);
			if (error)
				return;

			spc->spc_mainlwp = lwproc_curlwp();

			send_handshake_resp(spc, reqno, 0);
		} else if (spc->spc_hdr.rsp_handshake == HANDSHAKE_FORK) {
			struct lwp *tmpmain;
			struct prefork *pf;
			struct handshake_fork *rfp;
			int cancel;

			if (spc->spc_off-HDRSZ != sizeof(*rfp)) {
				send_error_resp(spc, reqno, EINVAL);
				shutdown(spc->spc_fd, SHUT_RDWR);
				spcfreebuf(spc);
				return;
			}

			/*LINTED*/
			rfp = (void *)spc->spc_buf;
			cancel = rfp->rf_cancel;

			pthread_mutex_lock(&pfmtx);
			LIST_FOREACH(pf, &preforks, pf_entries) {
				if (memcmp(rfp->rf_auth, pf->pf_auth,
				    sizeof(rfp->rf_auth)) == 0) {
					LIST_REMOVE(pf, pf_entries);
					LIST_REMOVE(pf, pf_spcentries);
					break;
				}
			}
			pthread_mutex_unlock(&pfmtx);
			spcfreebuf(spc);

			if (!pf) {
				send_error_resp(spc, reqno, ESRCH);
				shutdown(spc->spc_fd, SHUT_RDWR);
				return;
			}

			tmpmain = pf->pf_lwp;
			free(pf);
			lwproc_switch(tmpmain);
			if (cancel) {
				lwproc_release();
				shutdown(spc->spc_fd, SHUT_RDWR);
				return;
			}

			/*
			 * So, we forked already during "prefork" to save
			 * the file descriptors from a parent exit
			 * race condition.  But now we need to fork
			 * a second time since the initial fork has
			 * the wrong spc pointer.  (yea, optimize
			 * interfaces some day if anyone cares)
			 */
			if ((error = lwproc_rfork(spc, 0, NULL)) != 0) {
				send_error_resp(spc, reqno, error);
				shutdown(spc->spc_fd, SHUT_RDWR);
				lwproc_release();
				return;
			}
			spc->spc_mainlwp = lwproc_curlwp();
			lwproc_switch(tmpmain);
			lwproc_release();
			lwproc_switch(spc->spc_mainlwp);

			send_handshake_resp(spc, reqno, 0);
		} else {
			send_error_resp(spc, reqno, EACCES);
			shutdown(spc->spc_fd, SHUT_RDWR);
			spcfreebuf(spc);
			return;
		}

		spc->spc_pid = lwproc_getpid();

		DPRINTF(("rump_sp: handshake for client %p complete, pid %d\n",
		    spc, spc->spc_pid));

		lwproc_switch(NULL);
		spc->spc_state = SPCSTATE_RUNNING;
		return;
	}

	if (__predict_false(spc->spc_hdr.rsp_type == RUMPSP_PREFORK)) {
		struct prefork *pf;
		uint32_t auth[AUTHLEN];
		int inexec;

		DPRINTF(("rump_sp: prefork handler executing for %p\n", spc));
		spcfreebuf(spc);

		pthread_mutex_lock(&spc->spc_mtx);
		inexec = spc->spc_inexec;
		pthread_mutex_unlock(&spc->spc_mtx);
		if (inexec) {
			send_error_resp(spc, reqno, EBUSY);
			shutdown(spc->spc_fd, SHUT_RDWR);
			return;
		}

		pf = malloc(sizeof(*pf));
		if (pf == NULL) {
			send_error_resp(spc, reqno, ENOMEM);
			return;
		}

		/*
		 * Use client main lwp to fork.  this is never used by
		 * worker threads (except in exec, but we checked for that
		 * above) so we can safely use it here.
		 */
		lwproc_switch(spc->spc_mainlwp);
		if ((error = lwproc_rfork(spc, RUMP_RFFDG, NULL)) != 0) {
			DPRINTF(("rump_sp: fork failed: %d (%p)\n",error, spc));
			send_error_resp(spc, reqno, error);
			lwproc_switch(NULL);
			free(pf);
			return;
		}

		/* Ok, we have a new process context and a new curlwp */
		for (i = 0; i < AUTHLEN; i++) {
			pf->pf_auth[i] = auth[i] = arc4random();
		}
		pf->pf_lwp = lwproc_curlwp();
		lwproc_switch(NULL);

		pthread_mutex_lock(&pfmtx);
		LIST_INSERT_HEAD(&preforks, pf, pf_entries);
		LIST_INSERT_HEAD(&spc->spc_pflist, pf, pf_spcentries);
		pthread_mutex_unlock(&pfmtx);

		DPRINTF(("rump_sp: prefork handler success %p\n", spc));

		send_prefork_resp(spc, reqno, auth);
		return;
	}

	if (__predict_false(spc->spc_hdr.rsp_type == RUMPSP_HANDSHAKE)) {
		int inexec;

		if (spc->spc_hdr.rsp_handshake != HANDSHAKE_EXEC) {
			send_error_resp(spc, reqno, EINVAL);
			shutdown(spc->spc_fd, SHUT_RDWR);
			spcfreebuf(spc);
			return;
		}

		pthread_mutex_lock(&spc->spc_mtx);
		inexec = spc->spc_inexec;
		pthread_mutex_unlock(&spc->spc_mtx);
		if (inexec) {
			send_error_resp(spc, reqno, EBUSY);
			shutdown(spc->spc_fd, SHUT_RDWR);
			spcfreebuf(spc);
			return;
		}

		pthread_mutex_lock(&spc->spc_mtx);
		spc->spc_inexec = 1;
		pthread_mutex_unlock(&spc->spc_mtx);

		/*
		 * start to drain lwps.  we will wait for it to finish
		 * in another thread
		 */
		lwproc_switch(spc->spc_mainlwp);
		lwproc_lwpexit();
		lwproc_switch(NULL);

		/*
		 * exec has to wait for lwps to drain, so finish it off
		 * in another thread
		 */
		schedulework(spc, SBA_EXEC);
		return;
	}

	if (__predict_false(spc->spc_hdr.rsp_type != RUMPSP_SYSCALL)) {
		send_error_resp(spc, reqno, EINVAL);
		spcfreebuf(spc);
		return;
	}

	schedulework(spc, SBA_SYSCALL);
}

static void *
spserver(void *arg)
{
	struct spservarg *sarg = arg;
	struct spclient *spc;
	unsigned idx;
	int seen;
	int rv;
	unsigned int nfds, maxidx;

	for (idx = 0; idx < MAXCLI; idx++) {
		pfdlist[idx].fd = -1;
		pfdlist[idx].events = POLLIN;

		spc = &spclist[idx];
		pthread_mutex_init(&spc->spc_mtx, NULL);
		pthread_cond_init(&spc->spc_cv, NULL);
		spc->spc_fd = -1;
	}
	pfdlist[0].fd = spclist[0].spc_fd = sarg->sps_sock;
	pfdlist[0].events = POLLIN;
	nfds = 1;
	maxidx = 0;

	pthread_attr_init(&pattr_detached);
	pthread_attr_setdetachstate(&pattr_detached, PTHREAD_CREATE_DETACHED);
#if NOTYET
	pthread_attr_setstacksize(&pattr_detached, 32*1024);
#endif

	pthread_mutex_init(&sbamtx, NULL);
	pthread_cond_init(&sbacv, NULL);

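	/*
	 * Mainloop: slot 0 of pfdlist is the listening socket, slots
	 * 1..MAXCLI-1 are connected clients.  Disconnects recorded
	 * via the disco counter are garbage-collected at the top of
	 * each iteration before polling again.
	 */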
	DPRINTF(("rump_sp: server mainloop\n"));

	for (;;) {
		int discoed;

		/* g/c hangarounds (eventually) */
		discoed = getdisco();
		while (discoed--) {
			nfds--;
			idx = maxidx;
			while (idx) {
				if (pfdlist[idx].fd != -1) {
					maxidx = idx;
					break;
				}
				idx--;
			}
			DPRINTF(("rump_sp: set maxidx to [%u]\n",
			    maxidx));
		}

		DPRINTF(("rump_sp: loop nfd %d\n", maxidx+1));
		seen = 0;
		rv = poll(pfdlist, maxidx+1, INFTIM);
		assert(maxidx+1 <= MAXCLI);
		assert(rv != 0);
		if (rv == -1) {
			if (errno == EINTR)
				continue;
			fprintf(stderr, "rump_spserver: poll returned %d\n",
			    errno);
			break;
		}

		for (idx = 0; seen < rv && idx < MAXCLI; idx++) {
			if ((pfdlist[idx].revents & POLLIN) == 0)
				continue;

			seen++;
			DPRINTF(("rump_sp: activity at [%u] %d/%d\n",
			    idx, seen, rv));
			if (idx > 0) {
				spc = &spclist[idx];
				DPRINTF(("rump_sp: mainloop read [%u]\n", idx));
				switch (readframe(spc)) {
				case 0:
					break;
				case -1:
					serv_handledisco(idx);
					break;
				default:
					switch (spc->spc_hdr.rsp_class) {
					case RUMPSP_RESP:
						kickwaiter(spc);
						break;
					case RUMPSP_REQ:
						handlereq(spc);
						break;
					default:
						send_error_resp(spc,
						    spc->spc_hdr.rsp_reqno,
						    ENOENT);
						spcfreebuf(spc);
						break;
					}
					break;
				}

			} else {
				DPRINTF(("rump_sp: mainloop new connection\n"));

				if (__predict_false(spfini)) {
					close(spclist[0].spc_fd);
					serv_shutdown();
					goto out;
				}

				idx = serv_handleconn(pfdlist[0].fd,
				    sarg->sps_connhook, nfds == MAXCLI);
				if (idx)
					nfds++;
				if (idx > maxidx)
					maxidx = idx;
				DPRINTF(("rump_sp: maxid now %d\n", maxidx));
			}
		}
	}

 out:
	return NULL;
}

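/*
 * Set up the server: parse the url, compose the banner, create the
 * listening socket, and fire up the mainloop in a detached thread.
 */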
static unsigned cleanupidx;
static struct sockaddr *cleanupsa;
int
rumpuser_sp_init(const char *url, const struct rumpuser_sp_ops *spopsp,
	const char *ostype, const char *osrelease, const char *machine)
{
	pthread_t pt;
	struct spservarg *sarg;
	struct sockaddr *sap;
	char *p;
	unsigned idx;
	int error, s;

	p = strdup(url);
	if (p == NULL)
		return ENOMEM;
	error = parseurl(p, &sap, &idx, 1);
	free(p);
	if (error)
		return error;

	snprintf(banner, sizeof(banner), "RUMPSP-%d.%d-%s-%s/%s\n",
	    PROTOMAJOR, PROTOMINOR, ostype, osrelease, machine);

	s = socket(parsetab[idx].domain, SOCK_STREAM, 0);
	if (s == -1)
		return errno;

	spops = *spopsp;
	sarg = malloc(sizeof(*sarg));
	if (sarg == NULL) {
		close(s);
		return ENOMEM;
	}

	sarg->sps_sock = s;
	sarg->sps_connhook = parsetab[idx].connhook;

	cleanupidx = idx;
	cleanupsa = sap;

	/* sloppy error recovery */

	/*LINTED*/
	if (bind(s, sap, parsetab[idx].slen) == -1) {
		fprintf(stderr, "rump_sp: server bind failed\n");
		return errno;
	}

	if (listen(s, MAXCLI) == -1) {
		fprintf(stderr, "rump_sp: server listen failed\n");
		return errno;
	}

	if ((error = pthread_create(&pt, NULL, spserver, sarg)) != 0) {
		fprintf(stderr, "rump_sp: cannot create wrkr thread\n");
		return error;
	}
	pthread_detach(pt);

	return 0;
}

void
rumpuser_sp_fini(void *arg)
{
	struct spclient *spc = arg;
	register_t retval[2] = {0, 0};

	if (spclist[0].spc_fd) {
		parsetab[cleanupidx].cleanup(cleanupsa);
	}

	/*
	 * stuff response into the socket, since this process is just
	 * about to exit
	 */
	if (spc && spc->spc_syscallreq)
		send_syscall_resp(spc, spc->spc_syscallreq, 0, retval);

	if (spclist[0].spc_fd) {
		shutdown(spclist[0].spc_fd, SHUT_RDWR);
		spfini = 1;
	}
}