rumpuser_sp.c revision 1.53 1 /* $NetBSD: rumpuser_sp.c,v 1.53 2013/04/27 16:02:56 pooka Exp $ */
2
3 /*
4 * Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
16 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18 * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
21 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25 * SUCH DAMAGE.
26 */
27
28 /*
29 * Sysproxy routines. This provides system RPC support over host sockets.
30 * The most notable limitation is that the client and server must share
31 * the same ABI. This does not mean that they have to be the same
32 * machine or that they need to run the same version of the host OS,
33 * just that they must agree on the data structures. This even *might*
34 * work correctly from one hardware architecture to another.
35 */
36
37 #include "rumpuser_port.h"
38
39 #if !defined(lint)
40 __RCSID("$NetBSD: rumpuser_sp.c,v 1.53 2013/04/27 16:02:56 pooka Exp $");
41 #endif /* !lint */
42
43 #include <sys/types.h>
44 #include <sys/mman.h>
45 #include <sys/socket.h>
46
47 #include <arpa/inet.h>
48 #include <netinet/in.h>
49 #include <netinet/tcp.h>
50
51 #include <assert.h>
52 #include <errno.h>
53 #include <fcntl.h>
54 #include <poll.h>
55 #include <pthread.h>
56 #include <stdarg.h>
57 #include <stdio.h>
58 #include <stdlib.h>
59 #include <string.h>
60 #include <unistd.h>
61
62 #include <rump/rump.h> /* XXX: for rfork flags */
63 #include <rump/rumpuser.h>
64
65 #include "rumpuser_int.h"
66
67 #include "sp_common.c"
68
/* Compile-time tunables (overridable with -D at build time). */
#ifndef MAXCLI
#define MAXCLI 256		/* max simultaneous fds, incl. listen socket */
#endif
#ifndef MAXWORKER
#define MAXWORKER 128		/* hard cap on worker threads */
#endif
#ifndef IDLEWORKER
#define IDLEWORKER 16		/* worker threads kept alive while idle */
#endif
int rumpsp_maxworker = MAXWORKER;
int rumpsp_idleworker = IDLEWORKER;

static struct pollfd pfdlist[MAXCLI];	/* poll set; slot 0 = listen socket */
static struct spclient spclist[MAXCLI];	/* per-connection state, same index */
static unsigned int disco;		/* pending disconnects, see signaldisco() */
static volatile int spfini;		/* set at fini; mainloop exits on next accept */

static struct rumpuser_sp_ops spops;	/* callbacks into librump proper */

static char banner[MAXBANNER];		/* protocol banner sent to new clients */

/* sysproxy protocol version spoken by this server */
#define PROTOMAJOR 0
#define PROTOMINOR 4
93
/* how to use atomic ops on Linux? */
#if defined(__linux__) || defined(__CYGWIN__)
/* Portable fallback: guard the disconnect counter with a mutex. */
static pthread_mutex_t discomtx = PTHREAD_MUTEX_INITIALIZER;

/* Record one client disconnect for the mainloop to garbage-collect. */
static void
signaldisco(void)
{

	pthread_mutex_lock(&discomtx);
	disco++;
	pthread_mutex_unlock(&discomtx);
}

/* Atomically fetch and clear the pending-disconnect count. */
static unsigned int
getdisco(void)
{
	unsigned int discocnt;

	pthread_mutex_lock(&discomtx);
	discocnt = disco;
	disco = 0;
	pthread_mutex_unlock(&discomtx);

	return discocnt;
}

#elif defined(__FreeBSD__) || defined(__DragonFly__)

#include <machine/atomic.h>
#define signaldisco() atomic_add_int(&disco, 1)
#define getdisco() atomic_readandclear_int(&disco)

#else /* NetBSD */

#include <sys/atomic.h>
#define signaldisco() atomic_inc_uint(&disco)
#define getdisco() atomic_swap_uint(&disco, 0)

#endif
133
134
/*
 * A prefork: a process forked on behalf of a client before the client's
 * host-side fork(), identified by a random auth cookie.  The child
 * claims it later by presenting the cookie in a HANDSHAKE_FORK.
 */
struct prefork {
	uint32_t pf_auth[AUTHLEN];	/* auth cookie handed to the client */
	struct lwp *pf_lwp;		/* main lwp of the forked process */

	LIST_ENTRY(prefork) pf_entries;		/* global list */
	LIST_ENTRY(prefork) pf_spcentries;	/* linked from forking spc */
};
static LIST_HEAD(, prefork) preforks = LIST_HEAD_INITIALIZER(preforks);
static pthread_mutex_t pfmtx;		/* protects both prefork lists */
144
145 /*
146 * This version is for the server. It's optimized for multiple threads
147 * and is *NOT* reentrant wrt to signals.
148 */
149 static int
150 waitresp(struct spclient *spc, struct respwait *rw)
151 {
152 int spcstate;
153 int rv = 0;
154
155 pthread_mutex_lock(&spc->spc_mtx);
156 sendunlockl(spc);
157 while (!rw->rw_done && spc->spc_state != SPCSTATE_DYING) {
158 pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx);
159 }
160 TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries);
161 spcstate = spc->spc_state;
162 pthread_mutex_unlock(&spc->spc_mtx);
163
164 pthread_cond_destroy(&rw->rw_cv);
165
166 if (rv)
167 return rv;
168 if (spcstate == SPCSTATE_DYING)
169 return ENOTCONN;
170 return rw->rw_error;
171 }
172
/*
 * Manual wrappers, since librump does not have access to the
 * user namespace wrapped interfaces.  Each wrapper brackets the call
 * with spop_schedule()/spop_unschedule() so the operation runs with
 * the rump kernel scheduled on this thread.
 */

/* Switch this thread to lwp l (callers pass NULL to switch away). */
static void
lwproc_switch(struct lwp *l)
{

	spops.spop_schedule();
	spops.spop_lwproc_switch(l);
	spops.spop_unschedule();
}
186
/* Release the current lwp/process context. */
static void
lwproc_release(void)
{

	spops.spop_schedule();
	spops.spop_lwproc_release();
	spops.spop_unschedule();
}
195
/* Fork a rump kernel process for spc; flags are RUMP_RF* rfork flags. */
static int
lwproc_rfork(struct spclient *spc, int flags, const char *comm)
{
	int rv;

	spops.spop_schedule();
	rv = spops.spop_lwproc_rfork(spc, flags, comm);
	spops.spop_unschedule();

	return rv;
}
207
/* Create and switch to a new lwp in the process identified by pid. */
static int
lwproc_newlwp(pid_t pid)
{
	int rv;

	spops.spop_schedule();
	rv = spops.spop_lwproc_newlwp(pid);
	spops.spop_unschedule();

	return rv;
}
219
/* Return the current rump kernel lwp for this host thread. */
static struct lwp *
lwproc_curlwp(void)
{
	struct lwp *l;

	spops.spop_schedule();
	l = spops.spop_lwproc_curlwp();
	spops.spop_unschedule();

	return l;
}
231
/* Return the pid of the current rump kernel process. */
static pid_t
lwproc_getpid(void)
{
	pid_t p;

	spops.spop_schedule();
	p = spops.spop_getpid();
	spops.spop_unschedule();

	return p;
}
243
/* Notify the rump kernel that the current process exec'd as "comm". */
static void
lwproc_execnotify(const char *comm)
{

	spops.spop_schedule();
	spops.spop_execnotify(comm);
	spops.spop_unschedule();
}
252
/* Ask the rump kernel to exit all lwps of the current process. */
static void
lwproc_lwpexit(void)
{

	spops.spop_schedule();
	spops.spop_lwpexit();
	spops.spop_unschedule();
}
261
/*
 * Execute syscall sysnum in the rump kernel with argument block data.
 * The two-word return value is copied out into regrv.
 */
static int
rumpsyscall(int sysnum, void *data, register_t *regrv)
{
	long retval[2] = {0, 0};
	int rv;

	spops.spop_schedule();
	rv = spops.spop_syscall(sysnum, data, retval);
	spops.spop_unschedule();

	regrv[0] = retval[0];
	regrv[1] = retval[1];
	return rv;
}
276
277 static uint64_t
278 nextreq(struct spclient *spc)
279 {
280 uint64_t nw;
281
282 pthread_mutex_lock(&spc->spc_mtx);
283 nw = spc->spc_nextreq++;
284 pthread_mutex_unlock(&spc->spc_mtx);
285
286 return nw;
287 }
288
/*
 * XXX: we send responses with "blocking" I/O. This is not
 * ok for the main thread. XXXFIXME
 */

/* Send a RUMPSP_ERROR response for reqno carrying the given error. */
static void
send_error_resp(struct spclient *spc, uint64_t reqno, enum rumpsp_err error)
{
	struct rsp_hdr rhdr;
	struct iovec iov[1];

	rhdr.rsp_len = sizeof(rhdr);
	rhdr.rsp_reqno = reqno;
	rhdr.rsp_class = RUMPSP_ERROR;
	rhdr.rsp_type = 0;
	rhdr.rsp_error = error;

	IOVPUT(iov[0], rhdr);

	sendlock(spc);
	(void)SENDIOV(spc, iov);	/* best effort; nowhere to report failure */
	sendunlock(spc);
}
312
/* Send a RUMPSP_HANDSHAKE response carrying the int error payload. */
static int
send_handshake_resp(struct spclient *spc, uint64_t reqno, int error)
{
	struct rsp_hdr rhdr;
	struct iovec iov[2];
	int rv;

	rhdr.rsp_len = sizeof(rhdr) + sizeof(error);
	rhdr.rsp_reqno = reqno;
	rhdr.rsp_class = RUMPSP_RESP;
	rhdr.rsp_type = RUMPSP_HANDSHAKE;
	rhdr.rsp_error = 0;

	IOVPUT(iov[0], rhdr);
	IOVPUT(iov[1], error);

	sendlock(spc);
	rv = SENDIOV(spc, iov);
	sendunlock(spc);

	return rv;
}
335
/*
 * Send a syscall response: the error code plus the two return-value
 * registers packed into a struct rsp_sysresp.
 */
static int
send_syscall_resp(struct spclient *spc, uint64_t reqno, int error,
	register_t *retval)
{
	struct rsp_hdr rhdr;
	struct rsp_sysresp sysresp;
	struct iovec iov[2];
	int rv;

	rhdr.rsp_len = sizeof(rhdr) + sizeof(sysresp);
	rhdr.rsp_reqno = reqno;
	rhdr.rsp_class = RUMPSP_RESP;
	rhdr.rsp_type = RUMPSP_SYSCALL;
	rhdr.rsp_sysnum = 0;

	sysresp.rsys_error = error;
	memcpy(sysresp.rsys_retval, retval, sizeof(sysresp.rsys_retval));

	IOVPUT(iov[0], rhdr);
	IOVPUT(iov[1], sysresp);

	sendlock(spc);
	rv = SENDIOV(spc, iov);
	sendunlock(spc);

	return rv;
}
363
/* Send a prefork response carrying the AUTHLEN-word auth cookie. */
static int
send_prefork_resp(struct spclient *spc, uint64_t reqno, uint32_t *auth)
{
	struct rsp_hdr rhdr;
	struct iovec iov[2];
	int rv;

	rhdr.rsp_len = sizeof(rhdr) + AUTHLEN*sizeof(*auth);
	rhdr.rsp_reqno = reqno;
	rhdr.rsp_class = RUMPSP_RESP;
	rhdr.rsp_type = RUMPSP_PREFORK;
	rhdr.rsp_sysnum = 0;

	IOVPUT(iov[0], rhdr);
	IOVPUT_WITHSIZE(iov[1], auth, AUTHLEN*sizeof(*auth));

	sendlock(spc);
	rv = SENDIOV(spc, iov);
	sendunlock(spc);

	return rv;
}
386
/*
 * Ask the client to copy in *dlen bytes (or a string, if wantstr) from
 * remaddr in its address space.  Blocks until the response arrives.
 * On success *resp points to a malloced buffer with the data (caller
 * frees) and, for the string variant, *dlen is updated to the actual
 * length.
 */
static int
copyin_req(struct spclient *spc, const void *remaddr, size_t *dlen,
	int wantstr, void **resp)
{
	struct rsp_hdr rhdr;
	struct rsp_copydata copydata;
	struct respwait rw;
	struct iovec iov[2];
	int rv;

	DPRINTF(("copyin_req: %zu bytes from %p\n", *dlen, remaddr));

	/* rsp_reqno is presumably filled in by putwait() — see sp_common.c */
	rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata);
	rhdr.rsp_class = RUMPSP_REQ;
	if (wantstr)
		rhdr.rsp_type = RUMPSP_COPYINSTR;
	else
		rhdr.rsp_type = RUMPSP_COPYIN;
	rhdr.rsp_sysnum = 0;

	copydata.rcp_addr = __UNCONST(remaddr);
	copydata.rcp_len = *dlen;

	IOVPUT(iov[0], rhdr);
	IOVPUT(iov[1], copydata);

	putwait(spc, &rw, &rhdr);
	rv = SENDIOV(spc, iov);
	if (rv) {
		/* send failed: take ourselves back off the waiter list */
		unputwait(spc, &rw);
		return rv;
	}

	rv = waitresp(spc, &rw);

	DPRINTF(("copyin: response %d\n", rv));

	*resp = rw.rw_data;
	if (wantstr)
		*dlen = rw.rw_dlen;

	return rv;

}
431
/*
 * Fire-and-forget copyout: ask the client to store dlen bytes of data
 * at remaddr in its address space.  No response is awaited.
 */
static int
send_copyout_req(struct spclient *spc, const void *remaddr,
	const void *data, size_t dlen)
{
	struct rsp_hdr rhdr;
	struct rsp_copydata copydata;
	struct iovec iov[3];
	int rv;

	DPRINTF(("copyout_req (async): %zu bytes to %p\n", dlen, remaddr));

	rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata) + dlen;
	rhdr.rsp_reqno = nextreq(spc);
	rhdr.rsp_class = RUMPSP_REQ;
	rhdr.rsp_type = RUMPSP_COPYOUT;
	rhdr.rsp_sysnum = 0;

	copydata.rcp_addr = __UNCONST(remaddr);
	copydata.rcp_len = dlen;

	IOVPUT(iov[0], rhdr);
	IOVPUT(iov[1], copydata);
	IOVPUT_WITHSIZE(iov[2], __UNCONST(data), dlen);

	sendlock(spc);
	rv = SENDIOV(spc, iov);
	sendunlock(spc);

	return rv;
}
462
/*
 * Ask the client to mmap howmuch bytes of anonymous memory in its own
 * address space; blocks for the response.  On success *resp holds a
 * malloced buffer containing the client-side pointer (caller frees).
 */
static int
anonmmap_req(struct spclient *spc, size_t howmuch, void **resp)
{
	struct rsp_hdr rhdr;
	struct respwait rw;
	struct iovec iov[2];
	int rv;

	DPRINTF(("anonmmap_req: %zu bytes\n", howmuch));

	/* rsp_reqno is presumably filled in by putwait() — see sp_common.c */
	rhdr.rsp_len = sizeof(rhdr) + sizeof(howmuch);
	rhdr.rsp_class = RUMPSP_REQ;
	rhdr.rsp_type = RUMPSP_ANONMMAP;
	rhdr.rsp_sysnum = 0;

	IOVPUT(iov[0], rhdr);
	IOVPUT(iov[1], howmuch);

	putwait(spc, &rw, &rhdr);
	rv = SENDIOV(spc, iov);
	if (rv) {
		unputwait(spc, &rw);
		return rv;
	}

	rv = waitresp(spc, &rw);

	*resp = rw.rw_data;

	/*
	 * NOTE(review): the DPRINTF below dereferences *resp even when
	 * rv != 0; presumably rw_data is NULL-safe only on success —
	 * confirm (debug builds only).
	 */
	DPRINTF(("anonmmap: mapped at %p\n", **(void ***)resp));

	return rv;
}
496
/*
 * Send an asynchronous "raise signal" request to the client.
 * NOTE(review): rsp_reqno is left uninitialized here; presumably the
 * client ignores the reqno of RAISE requests — confirm against
 * sp_common.c / the client implementation.
 */
static int
send_raise_req(struct spclient *spc, int signo)
{
	struct rsp_hdr rhdr;
	struct iovec iov[1];
	int rv;

	rhdr.rsp_len = sizeof(rhdr);
	rhdr.rsp_class = RUMPSP_REQ;
	rhdr.rsp_type = RUMPSP_RAISE;
	rhdr.rsp_signo = signo;

	IOVPUT(iov[0], rhdr);

	sendlock(spc);
	rv = SENDIOV(spc, iov);
	sendunlock(spc);

	return rv;
}
517
518 static void
519 spcref(struct spclient *spc)
520 {
521
522 pthread_mutex_lock(&spc->spc_mtx);
523 spc->spc_refcnt++;
524 pthread_mutex_unlock(&spc->spc_mtx);
525 }
526
/*
 * Drop a reference to spc.  The final reference releases the client's
 * rump process context, closes the socket and makes the slot reusable.
 * Also wakes a thread waiting in serv_handleexec() when the count
 * drops low enough during an exec.
 */
static void
spcrelease(struct spclient *spc)
{
	int ref;

	pthread_mutex_lock(&spc->spc_mtx);
	ref = --spc->spc_refcnt;
	if (__predict_false(spc->spc_inexec && ref <= 2))
		pthread_cond_broadcast(&spc->spc_cv);
	pthread_mutex_unlock(&spc->spc_mtx);

	if (ref > 0)
		return;

	DPRINTF(("rump_sp: spcrelease: spc %p fd %d\n", spc, spc->spc_fd));

	_DIAGASSERT(TAILQ_EMPTY(&spc->spc_respwait));
	_DIAGASSERT(spc->spc_buf == NULL);

	if (spc->spc_mainlwp) {
		lwproc_switch(spc->spc_mainlwp);
		lwproc_release();
	}
	spc->spc_mainlwp = NULL;

	close(spc->spc_fd);
	spc->spc_fd = -1;
	spc->spc_state = SPCSTATE_NEW;	/* slot may be reused now */

	signaldisco();	/* let the mainloop g/c its poll bookkeeping */
}
558
/*
 * Tear down the connection at slot idx: mark it dying, wake all
 * response waiters, exit the client's lwps (unless an in-progress
 * exec already did), reset send/receive state and drop the
 * connection's reference.
 */
static void
serv_handledisco(unsigned int idx)
{
	struct spclient *spc = &spclist[idx];
	int dolwpexit;

	DPRINTF(("rump_sp: disconnecting [%u]\n", idx));

	pfdlist[idx].fd = -1;
	pfdlist[idx].revents = 0;
	pthread_mutex_lock(&spc->spc_mtx);
	spc->spc_state = SPCSTATE_DYING;
	kickall(spc);
	sendunlockl(spc);
	/* exec uses mainlwp in another thread, but also nuked all lwps */
	dolwpexit = !spc->spc_inexec;
	pthread_mutex_unlock(&spc->spc_mtx);

	if (dolwpexit && spc->spc_mainlwp) {
		lwproc_switch(spc->spc_mainlwp);
		lwproc_lwpexit();
		lwproc_switch(NULL);
	}

	/*
	 * Nobody's going to attempt to send/receive anymore,
	 * so reinit info relevant to that.
	 */
	/*LINTED:pointer casts may be ok*/
	memset((char *)spc + SPC_ZEROFF, 0, sizeof(*spc) - SPC_ZEROFF);

	spcrelease(spc);
}
592
/* Disconnect every live client at server shutdown time. */
static void
serv_shutdown(void)
{
	struct spclient *spc;
	unsigned int i;

	/* slot 0 is the listen socket; clients start at 1 */
	for (i = 1; i < MAXCLI; i++) {
		spc = &spclist[i];
		if (spc->spc_fd == -1)
			continue;

		shutdown(spc->spc_fd, SHUT_RDWR);
		serv_handledisco(i);

		/*
		 * NOTE(review): serv_handledisco() already drops the
		 * connection's reference via spcrelease(); this second
		 * release looks like it can drive the refcount negative
		 * when no worker thread holds a reference — confirm.
		 */
		spcrelease(spc);
	}
}
610
611 static unsigned
612 serv_handleconn(int fd, connecthook_fn connhook, int busy)
613 {
614 struct sockaddr_storage ss;
615 socklen_t sl = sizeof(ss);
616 int newfd, flags;
617 unsigned i;
618
619 /*LINTED: cast ok */
620 newfd = accept(fd, (struct sockaddr *)&ss, &sl);
621 if (newfd == -1)
622 return 0;
623
624 if (busy) {
625 close(newfd); /* EBUSY */
626 return 0;
627 }
628
629 flags = fcntl(newfd, F_GETFL, 0);
630 if (fcntl(newfd, F_SETFL, flags | O_NONBLOCK) == -1) {
631 close(newfd);
632 return 0;
633 }
634
635 if (connhook(newfd) != 0) {
636 close(newfd);
637 return 0;
638 }
639
640 /* write out a banner for the client */
641 if (send(newfd, banner, strlen(banner), MSG_NOSIGNAL)
642 != (ssize_t)strlen(banner)) {
643 close(newfd);
644 return 0;
645 }
646
647 /* find empty slot the simple way */
648 for (i = 0; i < MAXCLI; i++) {
649 if (pfdlist[i].fd == -1 && spclist[i].spc_state == SPCSTATE_NEW)
650 break;
651 }
652
653 assert(i < MAXCLI);
654
655 pfdlist[i].fd = newfd;
656 spclist[i].spc_fd = newfd;
657 spclist[i].spc_istatus = SPCSTATUS_BUSY; /* dedicated receiver */
658 spclist[i].spc_refcnt = 1;
659
660 TAILQ_INIT(&spclist[i].spc_respwait);
661
662 DPRINTF(("rump_sp: added new connection fd %d at idx %u\n", newfd, i));
663
664 return i;
665 }
666
/*
 * Service one syscall request: create a fresh lwp in the client's
 * process, run the syscall in the rump kernel and send back the
 * response.  Runs in a worker thread.
 */
static void
serv_handlesyscall(struct spclient *spc, struct rsp_hdr *rhdr, uint8_t *data)
{
	register_t retval[2] = {0, 0};
	int rv, sysnum;

	sysnum = (int)rhdr->rsp_sysnum;
	DPRINTF(("rump_sp: handling syscall %d from client %d\n",
	    sysnum, spc->spc_pid));

	/* no lwp: process is presumably going away, just report the error */
	if (__predict_false((rv = lwproc_newlwp(spc->spc_pid)) != 0)) {
		retval[0] = -1;
		send_syscall_resp(spc, rhdr->rsp_reqno, rv, retval);
		return;
	}
	/* published so rumpuser_sp_fini() can answer a syscall-in-progress */
	spc->spc_syscallreq = rhdr->rsp_reqno;
	rv = rumpsyscall(sysnum, data, retval);
	spc->spc_syscallreq = 0;
	lwproc_release();

	DPRINTF(("rump_sp: got return value %d & %d/%d\n",
	    rv, retval[0], retval[1]));

	send_syscall_resp(spc, rhdr->rsp_reqno, rv, retval);
}
692
/*
 * Finish an exec handshake: wait until only the connection and this
 * worker hold references (all other client threads have died), then
 * notify the rump kernel of the new command name and respond.
 */
static void
serv_handleexec(struct spclient *spc, struct rsp_hdr *rhdr, char *comm)
{
	size_t commlen = rhdr->rsp_len - HDRSZ;

	pthread_mutex_lock(&spc->spc_mtx);
	/* one for the connection and one for us */
	while (spc->spc_refcnt > 2)
		pthread_cond_wait(&spc->spc_cv, &spc->spc_mtx);
	pthread_mutex_unlock(&spc->spc_mtx);

	/*
	 * ok, all the threads are dead (or one is still alive and
	 * the connection is dead, in which case this doesn't matter
	 * very much). proceed with exec.
	 */

	/* ensure comm is 0-terminated */
	/* TODO: make sure it contains sensible chars? */
	comm[commlen] = '\0';

	lwproc_switch(spc->spc_mainlwp);
	lwproc_execnotify(comm);
	lwproc_switch(NULL);

	pthread_mutex_lock(&spc->spc_mtx);
	spc->spc_inexec = 0;
	pthread_mutex_unlock(&spc->spc_mtx);
	send_handshake_resp(spc, rhdr->rsp_reqno, 0);
}
723
/* Kinds of work bounced from the receiver to a worker thread. */
enum sbatype { SBA_SYSCALL, SBA_EXEC };

/* One unit of work queued for the worker pool. */
struct servbouncearg {
	struct spclient *sba_spc;	/* client, referenced via spcref() */
	struct rsp_hdr sba_hdr;		/* copy of the request header */
	enum sbatype sba_type;		/* syscall or exec */
	uint8_t *sba_data;		/* request payload; worker frees */

	TAILQ_ENTRY(servbouncearg) sba_entries;
};
static pthread_mutex_t sbamtx;		/* protects wrklist and the counters */
static pthread_cond_t sbacv;
static int nworker, idleworker, nwork;
static TAILQ_HEAD(, servbouncearg) wrklist = TAILQ_HEAD_INITIALIZER(wrklist);
738
/*
 * Worker thread body: pull work off wrklist and service it.  A worker
 * retires itself when there are already "enough" idle workers.
 */
/*ARGSUSED*/
static void *
serv_workbouncer(void *arg)
{
	struct servbouncearg *sba;

	for (;;) {
		pthread_mutex_lock(&sbamtx);
		/* surplus idle capacity?  recycle this thread */
		if (__predict_false(idleworker - nwork >= rumpsp_idleworker)) {
			nworker--;
			pthread_mutex_unlock(&sbamtx);
			break;
		}
		idleworker++;
		while (TAILQ_EMPTY(&wrklist)) {
			_DIAGASSERT(nwork == 0);
			pthread_cond_wait(&sbacv, &sbamtx);
		}
		idleworker--;

		sba = TAILQ_FIRST(&wrklist);
		TAILQ_REMOVE(&wrklist, sba, sba_entries);
		nwork--;
		pthread_mutex_unlock(&sbamtx);

		if (__predict_true(sba->sba_type == SBA_SYSCALL)) {
			serv_handlesyscall(sba->sba_spc,
			    &sba->sba_hdr, sba->sba_data);
		} else {
			_DIAGASSERT(sba->sba_type == SBA_EXEC);
			serv_handleexec(sba->sba_spc, &sba->sba_hdr,
			    (char *)sba->sba_data);
		}
		/* drop the reference taken in schedulework() */
		spcrelease(sba->sba_spc);
		free(sba->sba_data);
		free(sba);
	}

	return NULL;
}
779
/*
 * Common copyin path: unschedule the rump kernel around the blocking
 * RPC, then copy the returned buffer to laddr.  Any failure is
 * reported to the caller as EFAULT.
 */
static int
sp_copyin(void *arg, const void *raddr, void *laddr, size_t *len, int wantstr)
{
	struct spclient *spc = arg;
	void *rdata = NULL; /* XXXuninit */
	int rv, nlocks;

	rumpuser__unschedule(0, &nlocks, NULL);

	rv = copyin_req(spc, raddr, len, wantstr, &rdata);
	if (rv)
		goto out;

	memcpy(laddr, rdata, *len);
	free(rdata);

 out:
	rumpuser__reschedule(nlocks, NULL);
	if (rv)
		return EFAULT;
	return 0;
}
802
/* Exported fixed-length copyin from the client's address space. */
int
rumpuser_sp_copyin(void *arg, const void *raddr, void *laddr, size_t len)
{

	return sp_copyin(arg, raddr, laddr, &len, 0);
}
809
/* Exported string copyin; *len is updated with the actual length. */
int
rumpuser_sp_copyinstr(void *arg, const void *raddr, void *laddr, size_t *len)
{

	return sp_copyin(arg, raddr, laddr, len, 1);
}
816
/*
 * Common copyout path: unschedule the rump kernel around the async
 * copyout request.  Failures are reported as EFAULT.
 */
static int
sp_copyout(void *arg, const void *laddr, void *raddr, size_t dlen)
{
	struct spclient *spc = arg;
	int nlocks, rv;

	rumpuser__unschedule(0, &nlocks, NULL);
	rv = send_copyout_req(spc, raddr, laddr, dlen);
	rumpuser__reschedule(nlocks, NULL);

	if (rv)
		return EFAULT;
	return 0;
}
831
/* Exported fixed-length copyout to the client's address space. */
int
rumpuser_sp_copyout(void *arg, const void *laddr, void *raddr, size_t dlen)
{

	return sp_copyout(arg, laddr, raddr, dlen);
}
838
/* Exported string copyout; the length in *dlen is used as-is. */
int
rumpuser_sp_copyoutstr(void *arg, const void *laddr, void *raddr, size_t *dlen)
{

	return sp_copyout(arg, laddr, raddr, *dlen);
}
845
/*
 * Ask the client to map anonymous memory in its own address space.
 * On success *addr holds the client-side address; errors map to
 * EFAULT (RPC failure) or ENOMEM (client could not map).
 */
int
rumpuser_sp_anonmmap(void *arg, size_t howmuch, void **addr)
{
	struct spclient *spc = arg;
	void *resp, *rdata;
	int nlocks, rv;

	rumpuser__unschedule(0, &nlocks, NULL);

	rv = anonmmap_req(spc, howmuch, &rdata);
	if (rv) {
		rv = EFAULT;
		goto out;
	}

	/* response buffer holds the client-side pointer */
	resp = *(void **)rdata;
	free(rdata);

	if (resp == NULL) {
		rv = ENOMEM;
	}

	*addr = resp;

 out:
	rumpuser__reschedule(nlocks, NULL);

	if (rv)
		return rv;
	return 0;
}
877
/* Deliver signal signo to the client (asynchronous, no response). */
int
rumpuser_sp_raise(void *arg, int signo)
{
	struct spclient *spc = arg;
	int rv, nlocks;

	rumpuser__unschedule(0, &nlocks, NULL);
	rv = send_raise_req(spc, signo);
	rumpuser__reschedule(nlocks, NULL);

	return rv;
}
890
891 static pthread_attr_t pattr_detached;
892 static void
893 schedulework(struct spclient *spc, enum sbatype sba_type)
894 {
895 struct servbouncearg *sba;
896 pthread_t pt;
897 uint64_t reqno;
898 int retries = 0;
899
900 reqno = spc->spc_hdr.rsp_reqno;
901 while ((sba = malloc(sizeof(*sba))) == NULL) {
902 if (nworker == 0 || retries > 10) {
903 send_error_resp(spc, reqno, RUMPSP_ERR_TRYAGAIN);
904 spcfreebuf(spc);
905 return;
906 }
907 /* slim chance of more memory? */
908 usleep(10000);
909 }
910
911 sba->sba_spc = spc;
912 sba->sba_type = sba_type;
913 sba->sba_hdr = spc->spc_hdr;
914 sba->sba_data = spc->spc_buf;
915 spcresetbuf(spc);
916
917 spcref(spc);
918
919 pthread_mutex_lock(&sbamtx);
920 TAILQ_INSERT_TAIL(&wrklist, sba, sba_entries);
921 nwork++;
922 if (nwork <= idleworker) {
923 /* do we have a daemon's tool (i.e. idle threads)? */
924 pthread_cond_signal(&sbacv);
925 } else if (nworker < rumpsp_maxworker) {
926 /*
927 * Else, need to create one
928 * (if we can, otherwise just expect another
929 * worker to pick up the syscall)
930 */
931 if (pthread_create(&pt, &pattr_detached,
932 serv_workbouncer, NULL) == 0) {
933 nworker++;
934 }
935 }
936 pthread_mutex_unlock(&sbamtx);
937 }
938
/*
 *
 * Startup routines and mainloop for server.
 *
 */

/* Argument bundle handed to the spserver() thread. */
struct spservarg {
	int sps_sock;			/* bound + listening server socket */
	connecthook_fn sps_connhook;	/* transport-specific connect hook */
};
949
950 static void
951 handlereq(struct spclient *spc)
952 {
953 uint64_t reqno;
954 int error, i;
955
956 reqno = spc->spc_hdr.rsp_reqno;
957 if (__predict_false(spc->spc_state == SPCSTATE_NEW)) {
958 if (spc->spc_hdr.rsp_type != RUMPSP_HANDSHAKE) {
959 send_error_resp(spc, reqno, RUMPSP_ERR_AUTH);
960 shutdown(spc->spc_fd, SHUT_RDWR);
961 spcfreebuf(spc);
962 return;
963 }
964
965 if (spc->spc_hdr.rsp_handshake == HANDSHAKE_GUEST) {
966 char *comm = (char *)spc->spc_buf;
967 size_t commlen = spc->spc_hdr.rsp_len - HDRSZ;
968
969 /* ensure it's 0-terminated */
970 /* XXX make sure it contains sensible chars? */
971 comm[commlen] = '\0';
972
973 if ((error = lwproc_rfork(spc,
974 RUMP_RFCFDG, comm)) != 0) {
975 shutdown(spc->spc_fd, SHUT_RDWR);
976 }
977
978 spcfreebuf(spc);
979 if (error)
980 return;
981
982 spc->spc_mainlwp = lwproc_curlwp();
983
984 send_handshake_resp(spc, reqno, 0);
985 } else if (spc->spc_hdr.rsp_handshake == HANDSHAKE_FORK) {
986 struct lwp *tmpmain;
987 struct prefork *pf;
988 struct handshake_fork *rfp;
989 int cancel;
990
991 if (spc->spc_off-HDRSZ != sizeof(*rfp)) {
992 send_error_resp(spc, reqno,
993 RUMPSP_ERR_MALFORMED_REQUEST);
994 shutdown(spc->spc_fd, SHUT_RDWR);
995 spcfreebuf(spc);
996 return;
997 }
998
999 /*LINTED*/
1000 rfp = (void *)spc->spc_buf;
1001 cancel = rfp->rf_cancel;
1002
1003 pthread_mutex_lock(&pfmtx);
1004 LIST_FOREACH(pf, &preforks, pf_entries) {
1005 if (memcmp(rfp->rf_auth, pf->pf_auth,
1006 sizeof(rfp->rf_auth)) == 0) {
1007 LIST_REMOVE(pf, pf_entries);
1008 LIST_REMOVE(pf, pf_spcentries);
1009 break;
1010 }
1011 }
1012 pthread_mutex_lock(&pfmtx);
1013 spcfreebuf(spc);
1014
1015 if (!pf) {
1016 send_error_resp(spc, reqno,
1017 RUMPSP_ERR_INVALID_PREFORK);
1018 shutdown(spc->spc_fd, SHUT_RDWR);
1019 return;
1020 }
1021
1022 tmpmain = pf->pf_lwp;
1023 free(pf);
1024 lwproc_switch(tmpmain);
1025 if (cancel) {
1026 lwproc_release();
1027 shutdown(spc->spc_fd, SHUT_RDWR);
1028 return;
1029 }
1030
1031 /*
1032 * So, we forked already during "prefork" to save
1033 * the file descriptors from a parent exit
1034 * race condition. But now we need to fork
1035 * a second time since the initial fork has
1036 * the wrong spc pointer. (yea, optimize
1037 * interfaces some day if anyone cares)
1038 */
1039 if ((error = lwproc_rfork(spc, 0, NULL)) != 0) {
1040 send_error_resp(spc, reqno,
1041 RUMPSP_ERR_RFORK_FAILED);
1042 shutdown(spc->spc_fd, SHUT_RDWR);
1043 lwproc_release();
1044 return;
1045 }
1046 spc->spc_mainlwp = lwproc_curlwp();
1047 lwproc_switch(tmpmain);
1048 lwproc_release();
1049 lwproc_switch(spc->spc_mainlwp);
1050
1051 send_handshake_resp(spc, reqno, 0);
1052 } else {
1053 send_error_resp(spc, reqno, RUMPSP_ERR_AUTH);
1054 shutdown(spc->spc_fd, SHUT_RDWR);
1055 spcfreebuf(spc);
1056 return;
1057 }
1058
1059 spc->spc_pid = lwproc_getpid();
1060
1061 DPRINTF(("rump_sp: handshake for client %p complete, pid %d\n",
1062 spc, spc->spc_pid));
1063
1064 lwproc_switch(NULL);
1065 spc->spc_state = SPCSTATE_RUNNING;
1066 return;
1067 }
1068
1069 if (__predict_false(spc->spc_hdr.rsp_type == RUMPSP_PREFORK)) {
1070 struct prefork *pf;
1071 uint32_t auth[AUTHLEN];
1072 int inexec;
1073
1074 DPRINTF(("rump_sp: prefork handler executing for %p\n", spc));
1075 spcfreebuf(spc);
1076
1077 pthread_mutex_lock(&spc->spc_mtx);
1078 inexec = spc->spc_inexec;
1079 pthread_mutex_unlock(&spc->spc_mtx);
1080 if (inexec) {
1081 send_error_resp(spc, reqno, RUMPSP_ERR_INEXEC);
1082 shutdown(spc->spc_fd, SHUT_RDWR);
1083 return;
1084 }
1085
1086 pf = malloc(sizeof(*pf));
1087 if (pf == NULL) {
1088 send_error_resp(spc, reqno, RUMPSP_ERR_NOMEM);
1089 return;
1090 }
1091
1092 /*
1093 * Use client main lwp to fork. this is never used by
1094 * worker threads (except in exec, but we checked for that
1095 * above) so we can safely use it here.
1096 */
1097 lwproc_switch(spc->spc_mainlwp);
1098 if ((error = lwproc_rfork(spc, RUMP_RFFDG, NULL)) != 0) {
1099 DPRINTF(("rump_sp: fork failed: %d (%p)\n",error, spc));
1100 send_error_resp(spc, reqno, RUMPSP_ERR_RFORK_FAILED);
1101 lwproc_switch(NULL);
1102 free(pf);
1103 return;
1104 }
1105
1106 /* Ok, we have a new process context and a new curlwp */
1107 for (i = 0; i < AUTHLEN; i++) {
1108 pf->pf_auth[i] = auth[i] = arc4random();
1109 }
1110 pf->pf_lwp = lwproc_curlwp();
1111 lwproc_switch(NULL);
1112
1113 pthread_mutex_lock(&pfmtx);
1114 LIST_INSERT_HEAD(&preforks, pf, pf_entries);
1115 LIST_INSERT_HEAD(&spc->spc_pflist, pf, pf_spcentries);
1116 pthread_mutex_unlock(&pfmtx);
1117
1118 DPRINTF(("rump_sp: prefork handler success %p\n", spc));
1119
1120 send_prefork_resp(spc, reqno, auth);
1121 return;
1122 }
1123
1124 if (__predict_false(spc->spc_hdr.rsp_type == RUMPSP_HANDSHAKE)) {
1125 int inexec;
1126
1127 if (spc->spc_hdr.rsp_handshake != HANDSHAKE_EXEC) {
1128 send_error_resp(spc, reqno,
1129 RUMPSP_ERR_MALFORMED_REQUEST);
1130 shutdown(spc->spc_fd, SHUT_RDWR);
1131 spcfreebuf(spc);
1132 return;
1133 }
1134
1135 pthread_mutex_lock(&spc->spc_mtx);
1136 inexec = spc->spc_inexec;
1137 pthread_mutex_unlock(&spc->spc_mtx);
1138 if (inexec) {
1139 send_error_resp(spc, reqno, RUMPSP_ERR_INEXEC);
1140 shutdown(spc->spc_fd, SHUT_RDWR);
1141 spcfreebuf(spc);
1142 return;
1143 }
1144
1145 pthread_mutex_lock(&spc->spc_mtx);
1146 spc->spc_inexec = 1;
1147 pthread_mutex_unlock(&spc->spc_mtx);
1148
1149 /*
1150 * start to drain lwps. we will wait for it to finish
1151 * in another thread
1152 */
1153 lwproc_switch(spc->spc_mainlwp);
1154 lwproc_lwpexit();
1155 lwproc_switch(NULL);
1156
1157 /*
1158 * exec has to wait for lwps to drain, so finish it off
1159 * in another thread
1160 */
1161 schedulework(spc, SBA_EXEC);
1162 return;
1163 }
1164
1165 if (__predict_false(spc->spc_hdr.rsp_type != RUMPSP_SYSCALL)) {
1166 send_error_resp(spc, reqno, RUMPSP_ERR_MALFORMED_REQUEST);
1167 spcfreebuf(spc);
1168 return;
1169 }
1170
1171 schedulework(spc, SBA_SYSCALL);
1172 }
1173
/*
 * Server mainloop: poll the listen socket plus all client sockets,
 * accept new connections and dispatch incoming frames.  Runs in its
 * own detached thread, started by rumpuser_sp_init().
 */
static void *
spserver(void *arg)
{
	struct spservarg *sarg = arg;
	struct spclient *spc;
	unsigned idx;
	int seen;
	int rv;
	unsigned int nfds, maxidx;

	/* slot 0 carries the listen socket; the rest start out empty */
	for (idx = 0; idx < MAXCLI; idx++) {
		pfdlist[idx].fd = -1;
		pfdlist[idx].events = POLLIN;

		spc = &spclist[idx];
		pthread_mutex_init(&spc->spc_mtx, NULL);
		pthread_cond_init(&spc->spc_cv, NULL);
		spc->spc_fd = -1;
	}
	pfdlist[0].fd = spclist[0].spc_fd = sarg->sps_sock;
	pfdlist[0].events = POLLIN;
	nfds = 1;
	maxidx = 0;

	pthread_attr_init(&pattr_detached);
	pthread_attr_setdetachstate(&pattr_detached, PTHREAD_CREATE_DETACHED);
#if NOTYET
	pthread_attr_setstacksize(&pattr_detached, 32*1024);
#endif

	pthread_mutex_init(&sbamtx, NULL);
	pthread_cond_init(&sbacv, NULL);

	DPRINTF(("rump_sp: server mainloop\n"));

	for (;;) {
		int discoed;

		/* g/c hangarounds (eventually) */
		discoed = getdisco();
		while (discoed--) {
			nfds--;
			idx = maxidx;
			/* shrink maxidx back down to the highest live slot */
			while (idx) {
				if (pfdlist[idx].fd != -1) {
					maxidx = idx;
					break;
				}
				idx--;
			}
			DPRINTF(("rump_sp: set maxidx to [%u]\n",
			    maxidx));
		}

		DPRINTF(("rump_sp: loop nfd %d\n", maxidx+1));
		seen = 0;
		rv = poll(pfdlist, maxidx+1, INFTIM);
		assert(maxidx+1 <= MAXCLI);
		assert(rv != 0);	/* INFTIM: no timeout possible */
		if (rv == -1) {
			if (errno == EINTR)
				continue;
			fprintf(stderr, "rump_spserver: poll returned %d\n",
			    errno);
			break;
		}

		/* stop scanning once all readable fds have been seen */
		for (idx = 0; seen < rv && idx < MAXCLI; idx++) {
			if ((pfdlist[idx].revents & POLLIN) == 0)
				continue;

			seen++;
			DPRINTF(("rump_sp: activity at [%u] %d/%d\n",
			    idx, seen, rv));
			if (idx > 0) {
				/* client socket: pull in and dispatch a frame */
				spc = &spclist[idx];
				DPRINTF(("rump_sp: mainloop read [%u]\n", idx));
				switch (readframe(spc)) {
				case 0:
					/* partial frame, wait for more data */
					break;
				case -1:
					serv_handledisco(idx);
					break;
				default:
					switch (spc->spc_hdr.rsp_class) {
					case RUMPSP_RESP:
						kickwaiter(spc);
						break;
					case RUMPSP_REQ:
						handlereq(spc);
						break;
					default:
						send_error_resp(spc,
						    spc->spc_hdr.rsp_reqno,
						    RUMPSP_ERR_MALFORMED_REQUEST);
						spcfreebuf(spc);
						break;
					}
					break;
				}

			} else {
				DPRINTF(("rump_sp: mainloop new connection\n"));

				/* fini was requested: tear everything down */
				if (__predict_false(spfini)) {
					close(spclist[0].spc_fd);
					serv_shutdown();
					goto out;
				}

				idx = serv_handleconn(pfdlist[0].fd,
				    sarg->sps_connhook, nfds == MAXCLI);
				if (idx)
					nfds++;
				if (idx > maxidx)
					maxidx = idx;
				DPRINTF(("rump_sp: maxid now %d\n", maxidx));
			}
		}
	}

 out:
	return NULL;
}
1298
1299 static unsigned cleanupidx;
1300 static struct sockaddr *cleanupsa;
1301 int
1302 rumpuser_sp_init(const char *url, const struct rumpuser_sp_ops *spopsp,
1303 const char *ostype, const char *osrelease, const char *machine)
1304 {
1305 pthread_t pt;
1306 struct spservarg *sarg;
1307 struct sockaddr *sap;
1308 char *p;
1309 unsigned idx;
1310 int error, s;
1311
1312 p = strdup(url);
1313 if (p == NULL)
1314 return ENOMEM;
1315 error = parseurl(p, &sap, &idx, 1);
1316 free(p);
1317 if (error)
1318 return error;
1319
1320 snprintf(banner, sizeof(banner), "RUMPSP-%d.%d-%s-%s/%s\n",
1321 PROTOMAJOR, PROTOMINOR, ostype, osrelease, machine);
1322
1323 s = socket(parsetab[idx].domain, SOCK_STREAM, 0);
1324 if (s == -1)
1325 return errno;
1326
1327 spops = *spopsp;
1328 sarg = malloc(sizeof(*sarg));
1329 if (sarg == NULL) {
1330 close(s);
1331 return ENOMEM;
1332 }
1333
1334 sarg->sps_sock = s;
1335 sarg->sps_connhook = parsetab[idx].connhook;
1336
1337 cleanupidx = idx;
1338 cleanupsa = sap;
1339
1340 /* sloppy error recovery */
1341
1342 /*LINTED*/
1343 if (bind(s, sap, parsetab[idx].slen) == -1) {
1344 fprintf(stderr, "rump_sp: server bind failed\n");
1345 return errno;
1346 }
1347
1348 if (listen(s, MAXCLI) == -1) {
1349 fprintf(stderr, "rump_sp: server listen failed\n");
1350 return errno;
1351 }
1352
1353 if ((error = pthread_create(&pt, NULL, spserver, sarg)) != 0) {
1354 fprintf(stderr, "rump_sp: cannot create wrkr thread\n");
1355 return errno;
1356 }
1357 pthread_detach(pt);
1358
1359 return 0;
1360 }
1361
/*
 * Server-side cleanup at process exit: remove the listen address from
 * the namespace, answer a syscall the exiting process might still be
 * executing for a client, and tell the mainloop to shut down on its
 * next wakeup.
 */
void
rumpuser_sp_fini(void *arg)
{
	struct spclient *spc = arg;
	register_t retval[2] = {0, 0};

	/* NOTE(review): tests fd != 0 rather than != -1 — confirm intent */
	if (spclist[0].spc_fd) {
		parsetab[cleanupidx].cleanup(cleanupsa);
	}

	/*
	 * stuff response into the socket, since this process is just
	 * about to exit
	 */
	if (spc && spc->spc_syscallreq)
		send_syscall_resp(spc, spc->spc_syscallreq, 0, retval);

	if (spclist[0].spc_fd) {
		shutdown(spclist[0].spc_fd, SHUT_RDWR);
		spfini = 1;	/* mainloop notices when accept() wakes it */
	}
}
1384