/*	$NetBSD: sys_pipe.c,v 1.161 2023/10/04 22:12:23 ad Exp $	*/

/*-
 * Copyright (c) 2003, 2007, 2008, 2009 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Paul Kranenburg, and by Andrew Doran.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used.  It does not support all features of
 * sockets, but does do everything that pipes normally do.
 *
 * Pipes are implemented as a conventional kernel ring buffer: writers
 * copy data into the buffer and readers copy it back out, sleeping when
 * the buffer is full or empty respectively.  (Earlier revisions also had
 * a "direct write" mode that loaned the sender's pages into the kernel
 * address space with UVM for large transfers; that mode is no longer
 * present, so all data now passes through the kernel buffer.)
 *
 * PIPE_SIZE, and BIG_PIPE_SIZE for "big" pipes, bound the per-pipe buffer
 * size and are constrained by the amount of kernel virtual memory.
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: sys_pipe.c,v 1.161 2023/10/04 22:12:23 ad Exp $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/kernel.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/signalvar.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/select.h>
#include <sys/mount.h>
#include <sys/syscallargs.h>
#include <sys/sysctl.h>
#include <sys/kauth.h>
#include <sys/atomic.h>
#include <sys/pipe.h>

static int	pipe_read(file_t *, off_t *, struct uio *, kauth_cred_t, int);
static int	pipe_write(file_t *, off_t *, struct uio *, kauth_cred_t, int);
static int	pipe_close(file_t *);
static int	pipe_poll(file_t *, int);
static int	pipe_kqfilter(file_t *, struct knote *);
static int	pipe_stat(file_t *, struct stat *);
static int	pipe_ioctl(file_t *, u_long, void *);
static void	pipe_restart(file_t *);
static int	pipe_fpathconf(file_t *, int, register_t *);
static int	pipe_posix_fadvise(file_t *, off_t, off_t, int);

static const struct fileops pipeops = {
	.fo_name = "pipe",
	.fo_read = pipe_read,
	.fo_write = pipe_write,
	.fo_ioctl = pipe_ioctl,
	.fo_fcntl = fnullop_fcntl,
	.fo_poll = pipe_poll,
	.fo_stat = pipe_stat,
	.fo_close = pipe_close,
	.fo_kqfilter = pipe_kqfilter,
	.fo_restart = pipe_restart,
	.fo_fpathconf = pipe_fpathconf,
	.fo_posix_fadvise = pipe_posix_fadvise,
};

/*
 * Default pipe buffer size(s).  This can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE / 3)
#define MAXPIPESIZE (2 * PIPE_SIZE / 3)

/*
 * Limit the number of "big" pipes.
 */
#define LIMITBIGPIPES	32
static u_int maxbigpipes = LIMITBIGPIPES;
static u_int nbigpipe = 0;

/*
 * Amount of KVA consumed by pipe buffers.
 */
static u_int amountpipekva = 0;

static void	pipeclose(struct pipe *);
static void	pipe_free_kmem(struct pipe *);
static int	pipe_create(struct pipe **, pool_cache_t);
static int	pipelock(struct pipe *, bool);
static inline void pipeunlock(struct pipe *);
static void	pipeselwakeup(struct pipe *, struct pipe *, int);
static int	pipespace(struct pipe *, int);
static int	pipe_ctor(void *, void *, int);
static void	pipe_dtor(void *, void *);

static pool_cache_t	pipe_wr_cache;
static pool_cache_t	pipe_rd_cache;

void
pipe_init(void)
{

	/* Writer side is not automatically allocated KVA. */
	pipe_wr_cache = pool_cache_init(sizeof(struct pipe), 0, 0, 0, "pipewr",
	    NULL, IPL_NONE, pipe_ctor, pipe_dtor, NULL);
	KASSERT(pipe_wr_cache != NULL);

	/* Reader side gets preallocated KVA. */
	pipe_rd_cache = pool_cache_init(sizeof(struct pipe), 0, 0, 0, "piperd",
	    NULL, IPL_NONE, pipe_ctor, pipe_dtor, (void *)1);
	KASSERT(pipe_rd_cache != NULL);
}

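/*
 * Pool cache constructor/destructor.  The constructor's arg selects the
 * reader-side cache, for which a PIPE_SIZE chunk of pageable KVA is
 * preallocated and kept across reuse; the destructor releases that KVA
 * along with the condition variables and selinfo.
 */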
static int
pipe_ctor(void *arg, void *obj, int flags)
{
	struct pipe *pipe;
	vaddr_t va;

	pipe = obj;

	memset(pipe, 0, sizeof(struct pipe));
	if (arg != NULL) {
		/* Preallocate space. */
		va = uvm_km_alloc(kernel_map, PIPE_SIZE, 0,
		    UVM_KMF_PAGEABLE | UVM_KMF_WAITVA);
		KASSERT(va != 0);
		pipe->pipe_kmem = va;
		atomic_add_int(&amountpipekva, PIPE_SIZE);
	}
	cv_init(&pipe->pipe_rcv, "pipe_rd");
	cv_init(&pipe->pipe_wcv, "pipe_wr");
	cv_init(&pipe->pipe_draincv, "pipe_drn");
	cv_init(&pipe->pipe_lkcv, "pipe_lk");
	selinit(&pipe->pipe_sel);
	pipe->pipe_state = PIPE_SIGNALR;

	return 0;
}

static void
pipe_dtor(void *arg, void *obj)
{
	struct pipe *pipe;

	pipe = obj;

	cv_destroy(&pipe->pipe_rcv);
	cv_destroy(&pipe->pipe_wcv);
	cv_destroy(&pipe->pipe_draincv);
	cv_destroy(&pipe->pipe_lkcv);
	seldestroy(&pipe->pipe_sel);
	if (pipe->pipe_kmem != 0) {
		uvm_km_free(kernel_map, pipe->pipe_kmem, PIPE_SIZE,
		    UVM_KMF_PAGEABLE);
		atomic_add_int(&amountpipekva, -PIPE_SIZE);
	}
}

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */
int
pipe1(struct lwp *l, int *fildes, int flags)
{
	struct pipe *rpipe, *wpipe;
	file_t *rf, *wf;
	int fd, error;
	proc_t *p;

	if (flags & ~(O_CLOEXEC|O_NONBLOCK|O_NOSIGPIPE))
		return EINVAL;
	p = curproc;
	rpipe = wpipe = NULL;
	if ((error = pipe_create(&rpipe, pipe_rd_cache)) ||
	    (error = pipe_create(&wpipe, pipe_wr_cache))) {
		goto free2;
	}
	rpipe->pipe_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_NONE);
	wpipe->pipe_lock = rpipe->pipe_lock;
	mutex_obj_hold(wpipe->pipe_lock);

	error = fd_allocfile(&rf, &fd);
	if (error)
		goto free2;
	fildes[0] = fd;

	error = fd_allocfile(&wf, &fd);
	if (error)
		goto free3;
	fildes[1] = fd;

	rf->f_flag = FREAD | flags;
	rf->f_type = DTYPE_PIPE;
	rf->f_pipe = rpipe;
	rf->f_ops = &pipeops;
	fd_set_exclose(l, fildes[0], (flags & O_CLOEXEC) != 0);

	wf->f_flag = FWRITE | flags;
	wf->f_type = DTYPE_PIPE;
	wf->f_pipe = wpipe;
	wf->f_ops = &pipeops;
	fd_set_exclose(l, fildes[1], (flags & O_CLOEXEC) != 0);

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	fd_affix(p, rf, fildes[0]);
	fd_affix(p, wf, fildes[1]);
	return (0);
free3:
	fd_abort(p, rf, fildes[0]);
free2:
	pipeclose(wpipe);
	pipeclose(rpipe);

	return (error);
}

/*
 * Allocate KVA for the pipe's circular buffer; the space is pageable.
 * This routine will 'realloc' the size of a pipe safely: if it fails,
 * the old buffer is retained and ENOMEM is returned.
 */
static int
pipespace(struct pipe *pipe, int size)
{
	void *buffer;

	/*
	 * Allocate pageable virtual address space.  Physical memory is
	 * allocated on demand.
	 */
	if (size == PIPE_SIZE && pipe->pipe_kmem != 0) {
		buffer = (void *)pipe->pipe_kmem;
	} else {
		buffer = (void *)uvm_km_alloc(kernel_map, round_page(size),
		    0, UVM_KMF_PAGEABLE);
		if (buffer == NULL)
			return (ENOMEM);
		atomic_add_int(&amountpipekva, size);
	}

	/* free old resources if we're resizing */
	pipe_free_kmem(pipe);
	pipe->pipe_buffer.buffer = buffer;
	pipe->pipe_buffer.size = size;
	pipe->pipe_buffer.in = 0;
	pipe->pipe_buffer.out = 0;
	pipe->pipe_buffer.cnt = 0;
	return (0);
}

/*
 * Initialize and allocate VM and memory for pipe.
 */
static int
pipe_create(struct pipe **pipep, pool_cache_t cache)
{
	struct pipe *pipe;
	int error;

	pipe = pool_cache_get(cache, PR_WAITOK);
	KASSERT(pipe != NULL);
	*pipep = pipe;
	error = 0;
	getnanotime(&pipe->pipe_btime);
	pipe->pipe_atime = pipe->pipe_mtime = pipe->pipe_btime;
	pipe->pipe_lock = NULL;
	if (cache == pipe_rd_cache) {
		error = pipespace(pipe, PIPE_SIZE);
	} else {
		pipe->pipe_buffer.buffer = NULL;
		pipe->pipe_buffer.size = 0;
		pipe->pipe_buffer.in = 0;
		pipe->pipe_buffer.out = 0;
		pipe->pipe_buffer.cnt = 0;
	}
	return error;
}

/*
 * Lock a pipe for exclusive I/O, blocking other access.
 * Called with the pipe mutex held.
 */
static int
pipelock(struct pipe *pipe, bool catch_p)
{
	int error;

	KASSERT(mutex_owned(pipe->pipe_lock));

	while (pipe->pipe_state & PIPE_LOCKFL) {
		pipe->pipe_waiters++;
		KASSERT(pipe->pipe_waiters != 0); /* just in case */
		if (catch_p) {
			error = cv_wait_sig(&pipe->pipe_lkcv, pipe->pipe_lock);
			if (error != 0) {
				KASSERT(pipe->pipe_waiters > 0);
				pipe->pipe_waiters--;
				return error;
			}
		} else
			cv_wait(&pipe->pipe_lkcv, pipe->pipe_lock);
		KASSERT(pipe->pipe_waiters > 0);
		pipe->pipe_waiters--;
	}

	pipe->pipe_state |= PIPE_LOCKFL;

	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static inline void
pipeunlock(struct pipe *pipe)
{

	KASSERT(pipe->pipe_state & PIPE_LOCKFL);

	pipe->pipe_state &= ~PIPE_LOCKFL;
	if (pipe->pipe_waiters > 0) {
		cv_signal(&pipe->pipe_lkcv);
	}
}

/*
 * Select/poll wakeup.  This also sends SIGIO to the peer connected to
 * the 'sigpipe' side of the pipe.
 */
static void
pipeselwakeup(struct pipe *selp, struct pipe *sigp, int code)
{
	int band;

	switch (code) {
	case POLL_IN:
		band = POLLIN|POLLRDNORM;
		break;
	case POLL_OUT:
		band = POLLOUT|POLLWRNORM;
		break;
	case POLL_HUP:
		band = POLLHUP;
		break;
	case POLL_ERR:
		band = POLLERR;
		break;
	default:
		band = 0;
#ifdef DIAGNOSTIC
		printf("bad siginfo code %d in pipe notification.\n", code);
#endif
		break;
	}

	selnotify(&selp->pipe_sel, band, NOTE_SUBMIT);

	if (sigp == NULL || (sigp->pipe_state & PIPE_ASYNC) == 0)
		return;

	fownsignal(sigp->pipe_pgid, SIGIO, code, band, selp);
}

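/*
 * File ops read routine: copy data from the ring buffer into the caller's
 * uio.  Sleeps (interruptibly) while the buffer is empty unless FNONBLOCK
 * is set, and returns 0 at EOF once the write side has gone away.
 */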
static int
pipe_read(file_t *fp, off_t *offset, struct uio *uio, kauth_cred_t cred,
    int flags)
{
	struct pipe *rpipe = fp->f_pipe;
	struct pipebuf *bp = &rpipe->pipe_buffer;
	kmutex_t *lock = rpipe->pipe_lock;
	int error;
	size_t nread = 0;
	size_t size;
	size_t ocnt;
	unsigned int wakeup_state = 0;

	/*
	 * Try to avoid locking the pipe if we have nothing to do.
	 *
	 * There are programs which share one pipe amongst multiple processes
	 * and perform non-blocking reads in parallel, even if the pipe is
	 * empty.  This in particular is the case with BSD make, which when
	 * spawned with a high -j number can find itself with over half of the
	 * calls failing to find anything.
	 */
	if ((fp->f_flag & FNONBLOCK) != 0) {
		if (__predict_false(uio->uio_resid == 0))
			return (0);
		if (atomic_load_relaxed(&bp->cnt) == 0 &&
		    (atomic_load_relaxed(&rpipe->pipe_state) & PIPE_EOF) == 0)
			return (EAGAIN);
	}

	mutex_enter(lock);
	++rpipe->pipe_busy;
	ocnt = bp->cnt;

again:
	error = pipelock(rpipe, true);
	if (error)
		goto unlocked_error;

	while (uio->uio_resid) {
		/*
		 * Normal pipe buffer receive.
		 */
		if (bp->cnt > 0) {
			size = bp->size - bp->out;
			if (size > bp->cnt)
				size = bp->cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;

			mutex_exit(lock);
			error = uiomove((char *)bp->buffer + bp->out, size, uio);
			mutex_enter(lock);
			if (error)
				break;

			bp->out += size;
			if (bp->out >= bp->size)
				bp->out = 0;

			bp->cnt -= size;

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (bp->cnt == 0) {
				bp->in = 0;
				bp->out = 0;
			}
			nread += size;
			continue;
		}

		/*
		 * Break if some data was read.
		 */
		if (nread > 0)
			break;

		/*
		 * Detect EOF condition.
		 * Read returns 0 on EOF, no need to set error.
		 */
		if (rpipe->pipe_state & PIPE_EOF)
			break;

		/*
		 * Don't block on non-blocking I/O.
		 */
		if (fp->f_flag & FNONBLOCK) {
			error = EAGAIN;
			break;
		}

		/*
		 * Unlock the pipe buffer for our remaining processing.
		 * We will either break out with an error or we will
		 * sleep and relock to loop.
		 */
		pipeunlock(rpipe);

#if 1	/* XXX (dsl) I'm sure these aren't needed here ... */
		/*
		 * We want to read more, wake up select/poll.
		 */
		pipeselwakeup(rpipe, rpipe->pipe_peer, POLL_OUT);

		/*
		 * If the "write-side" is blocked, wake it up now.
		 */
		cv_broadcast(&rpipe->pipe_wcv);
#endif

		if (wakeup_state & PIPE_RESTART) {
			error = ERESTART;
			goto unlocked_error;
		}

		/* Now wait until the pipe is filled */
		error = cv_wait_sig(&rpipe->pipe_rcv, lock);
		if (error != 0)
			goto unlocked_error;
		wakeup_state = rpipe->pipe_state;
		goto again;
	}

	if (error == 0)
		getnanotime(&rpipe->pipe_atime);
	pipeunlock(rpipe);

unlocked_error:
	--rpipe->pipe_busy;
	if (rpipe->pipe_busy == 0) {
		rpipe->pipe_state &= ~PIPE_RESTART;
		cv_broadcast(&rpipe->pipe_draincv);
	}
	if (bp->cnt < MINPIPESIZE) {
		cv_broadcast(&rpipe->pipe_wcv);
	}

	/*
	 * If anything was read off the buffer, signal to the writer it's
	 * possible to write more data.  Also send signal if we are here for the
	 * first time after last write.
	 */
	if ((bp->size - bp->cnt) >= PIPE_BUF
	    && (ocnt != bp->cnt || (rpipe->pipe_state & PIPE_SIGNALR))) {
		pipeselwakeup(rpipe, rpipe->pipe_peer, POLL_OUT);
		rpipe->pipe_state &= ~PIPE_SIGNALR;
	}

	mutex_exit(lock);
	return (error);
}

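/*
 * File ops write routine: copy data from the caller's uio into the peer's
 * ring buffer.  Writes of size <= PIPE_BUF are kept atomic with respect to
 * other writers; larger writes may grow the buffer to BIG_PIPE_SIZE and
 * may sleep repeatedly while the reader drains the buffer.
 */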
static int
pipe_write(file_t *fp, off_t *offset, struct uio *uio, kauth_cred_t cred,
    int flags)
{
	struct pipe *wpipe, *rpipe;
	struct pipebuf *bp;
	kmutex_t *lock;
	int error;
	unsigned int wakeup_state = 0;

	/* We want to write to our peer */
	rpipe = fp->f_pipe;
	lock = rpipe->pipe_lock;
	error = 0;

	mutex_enter(lock);
	wpipe = rpipe->pipe_peer;

	/*
	 * Detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) != 0) {
		mutex_exit(lock);
		return EPIPE;
	}
	++wpipe->pipe_busy;

	/* Acquire the long-term pipe lock */
	if ((error = pipelock(wpipe, true)) != 0) {
		--wpipe->pipe_busy;
		if (wpipe->pipe_busy == 0) {
			wpipe->pipe_state &= ~PIPE_RESTART;
			cv_broadcast(&wpipe->pipe_draincv);
		}
		mutex_exit(lock);
		return (error);
	}

	bp = &wpipe->pipe_buffer;

	/*
	 * If it is advantageous to resize the pipe buffer, do so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
	    (nbigpipe < maxbigpipes) &&
	    (bp->size <= PIPE_SIZE) && (bp->cnt == 0)) {

		if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
			atomic_inc_uint(&nbigpipe);
	}

	while (uio->uio_resid) {
		size_t space;

		space = bp->size - bp->cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (uio->uio_resid <= PIPE_BUF))
			space = 0;

		if (space > 0) {
			int size;	/* Transfer size */
			int segsize;	/* first segment to transfer */

			/*
			 * Transfer size is minimum of uio transfer
			 * and free space in pipe buffer.
			 */
			if (space > uio->uio_resid)
				size = uio->uio_resid;
			else
				size = space;
			/*
			 * First segment to transfer is minimum of
			 * transfer size and contiguous space in
			 * pipe buffer.  If first segment to transfer
			 * is less than the transfer size, we've got
			 * a wraparound in the buffer.
			 */
			segsize = bp->size - bp->in;
			if (segsize > size)
				segsize = size;

			/* Transfer first segment */
			mutex_exit(lock);
			error = uiomove((char *)bp->buffer + bp->in, segsize,
			    uio);

			if (error == 0 && segsize < size) {
				/*
				 * Transfer remaining part now, to
				 * support atomic writes.  Wraparound
				 * happened.
				 */
				KASSERT(bp->in + segsize == bp->size);
				error = uiomove(bp->buffer,
				    size - segsize, uio);
			}
			mutex_enter(lock);
			if (error)
				break;

			bp->in += size;
			if (bp->in >= bp->size) {
				KASSERT(bp->in == size - segsize + bp->size);
				bp->in = size - segsize;
			}

			bp->cnt += size;
			KASSERT(bp->cnt <= bp->size);
			wakeup_state = 0;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			cv_broadcast(&wpipe->pipe_rcv);

			/*
			 * Don't block on non-blocking I/O.
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			if (bp->cnt)
				pipeselwakeup(wpipe, wpipe, POLL_IN);

			if (wakeup_state & PIPE_RESTART) {
				error = ERESTART;
				break;
			}

			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}

			pipeunlock(wpipe);
			error = cv_wait_sig(&wpipe->pipe_wcv, lock);
			(void)pipelock(wpipe, false);
			if (error != 0)
				break;
			wakeup_state = wpipe->pipe_state;
		}
	}

	--wpipe->pipe_busy;
	if (wpipe->pipe_busy == 0) {
		wpipe->pipe_state &= ~PIPE_RESTART;
		cv_broadcast(&wpipe->pipe_draincv);
	}
	if (bp->cnt > 0) {
		cv_broadcast(&wpipe->pipe_rcv);
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if (error == EPIPE && bp->cnt == 0 && uio->uio_resid == 0)
		error = 0;

	if (error == 0)
		getnanotime(&wpipe->pipe_mtime);

	/*
	 * We have something to offer, wake up select/poll.
	 * All data passes through pipe_buffer (the old loan-based direct
	 * write path is gone), so checking wpipe->pipe_buffer.cnt is
	 * sufficient.
	 */
	if (bp->cnt)
		pipeselwakeup(wpipe, wpipe, POLL_IN);

	/*
	 * Arrange for next read(2) to do a signal.
	 */
	wpipe->pipe_state |= PIPE_SIGNALR;

	pipeunlock(wpipe);
	mutex_exit(lock);
	return (error);
}

/*
 * We implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(file_t *fp, u_long cmd, void *data)
{
	struct pipe *pipe = fp->f_pipe;
	kmutex_t *lock = pipe->pipe_lock;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		mutex_enter(lock);
		if (*(int *)data) {
			pipe->pipe_state |= PIPE_ASYNC;
		} else {
			pipe->pipe_state &= ~PIPE_ASYNC;
		}
		mutex_exit(lock);
		return (0);

	case FIONREAD:
		mutex_enter(lock);
		*(int *)data = pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case FIONWRITE:
		/* Look at other side */
		mutex_enter(lock);
		pipe = pipe->pipe_peer;
		if (pipe == NULL)
			*(int *)data = 0;
		else
			*(int *)data = pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case FIONSPACE:
		/* Look at other side */
		mutex_enter(lock);
		pipe = pipe->pipe_peer;
		if (pipe == NULL)
			*(int *)data = 0;
		else
			*(int *)data = pipe->pipe_buffer.size -
			    pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case TIOCSPGRP:
	case FIOSETOWN:
		return fsetown(&pipe->pipe_pgid, cmd, data);

	case TIOCGPGRP:
	case FIOGETOWN:
		return fgetown(pipe->pipe_pgid, cmd, data);

	}
	return (EPASSTHROUGH);
}

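/*
 * File ops poll routine: report readability on this end, writability
 * (at least PIPE_BUF bytes of space) on the peer, and POLLHUP once
 * either end has seen EOF or the peer has gone away.
 */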
int
pipe_poll(file_t *fp, int events)
{
	struct pipe *rpipe = fp->f_pipe;
	struct pipe *wpipe;
	int eof = 0;
	int revents = 0;

	mutex_enter(rpipe->pipe_lock);
	wpipe = rpipe->pipe_peer;

	if (events & (POLLIN | POLLRDNORM))
		if ((rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);

	eof |= (rpipe->pipe_state & PIPE_EOF);

	if (wpipe == NULL)
		revents |= events & (POLLOUT | POLLWRNORM);
	else {
		if (events & (POLLOUT | POLLWRNORM))
			if ((wpipe->pipe_state & PIPE_EOF) || (
			    (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
				revents |= events & (POLLOUT | POLLWRNORM);

		eof |= (wpipe->pipe_state & PIPE_EOF);
	}

	if (wpipe == NULL || eof)
		revents |= POLLHUP;

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM))
			selrecord(curlwp, &rpipe->pipe_sel);

		if (events & (POLLOUT | POLLWRNORM))
			selrecord(curlwp, &wpipe->pipe_sel);
	}
	mutex_exit(rpipe->pipe_lock);

	return (revents);
}

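/*
 * File ops stat routine: synthesize a struct stat for the pipe (FIFO mode
 * bits, current byte count, timestamps, and the opener's credentials).
 */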
static int
pipe_stat(file_t *fp, struct stat *ub)
{
	struct pipe *pipe = fp->f_pipe;

	mutex_enter(pipe->pipe_lock);
	memset(ub, 0, sizeof(*ub));
	ub->st_mode = S_IFIFO | S_IRUSR | S_IWUSR;
	ub->st_blksize = pipe->pipe_buffer.size;
	if (ub->st_blksize == 0 && pipe->pipe_peer)
		ub->st_blksize = pipe->pipe_peer->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size) ? 1 : 0;
	ub->st_atimespec = pipe->pipe_atime;
	ub->st_mtimespec = pipe->pipe_mtime;
	ub->st_ctimespec = ub->st_birthtimespec = pipe->pipe_btime;
	ub->st_uid = kauth_cred_geteuid(fp->f_cred);
	ub->st_gid = kauth_cred_getegid(fp->f_cred);

	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	mutex_exit(pipe->pipe_lock);
	return 0;
}

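/*
 * File ops close routine: detach the pipe from the file and tear it down.
 */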
static int
pipe_close(file_t *fp)
{
	struct pipe *pipe = fp->f_pipe;

	fp->f_pipe = NULL;
	pipeclose(pipe);
	return (0);
}

static void
pipe_restart(file_t *fp)
{
	struct pipe *pipe = fp->f_pipe;

	/*
	 * Unblock blocked reads/writes in order to allow close() to complete.
	 * System calls return ERESTART so that the fd is revalidated.
	 * (Partial writes return the transfer length.)
	 */
	mutex_enter(pipe->pipe_lock);
	pipe->pipe_state |= PIPE_RESTART;
	/* Wakeup both cvs, maybe we only need one, but maybe there are some
	 * other paths where wakeup is needed, and it saves deciding which! */
	cv_broadcast(&pipe->pipe_rcv);
	cv_broadcast(&pipe->pipe_wcv);
	mutex_exit(pipe->pipe_lock);
}

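/*
 * File ops fpathconf routine: only _PC_PIPE_BUF is meaningful for pipes.
 */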
static int
pipe_fpathconf(struct file *fp, int name, register_t *retval)
{

	switch (name) {
	case _PC_PIPE_BUF:
		*retval = PIPE_BUF;
		return 0;
	default:
		return EINVAL;
	}
}

static int
pipe_posix_fadvise(struct file *fp, off_t offset, off_t len, int advice)
{

	return ESPIPE;
}

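/*
 * Release the KVA backing a pipe's buffer.  The preallocated reader-side
 * chunk (pipe_kmem) is kept for reuse by the pool cache and freed only in
 * pipe_dtor(); any other buffer is returned to the kernel map here.
 */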
static void
pipe_free_kmem(struct pipe *pipe)
{

	if (pipe->pipe_buffer.buffer != NULL) {
		if (pipe->pipe_buffer.size > PIPE_SIZE) {
			atomic_dec_uint(&nbigpipe);
		}
		if (pipe->pipe_buffer.buffer != (void *)pipe->pipe_kmem) {
			uvm_km_free(kernel_map,
			    (vaddr_t)pipe->pipe_buffer.buffer,
			    pipe->pipe_buffer.size, UVM_KMF_PAGEABLE);
			atomic_add_int(&amountpipekva,
			    -pipe->pipe_buffer.size);
		}
		pipe->pipe_buffer.buffer = NULL;
	}
}

/*
 * Shutdown the pipe.
 */
static void
pipeclose(struct pipe *pipe)
{
	kmutex_t *lock;
	struct pipe *ppipe;

	if (pipe == NULL)
		return;

	KASSERT(cv_is_valid(&pipe->pipe_rcv));
	KASSERT(cv_is_valid(&pipe->pipe_wcv));
	KASSERT(cv_is_valid(&pipe->pipe_draincv));
	KASSERT(cv_is_valid(&pipe->pipe_lkcv));

	lock = pipe->pipe_lock;
	if (lock == NULL)
		/* Must have failed during create */
		goto free_resources;

	mutex_enter(lock);
	pipeselwakeup(pipe, pipe, POLL_HUP);

	/*
	 * If the other side is blocked, wake it up saying that
	 * we want to close it down.
	 */
	pipe->pipe_state |= PIPE_EOF;
	if (pipe->pipe_busy) {
		while (pipe->pipe_busy) {
			cv_broadcast(&pipe->pipe_wcv);
			cv_wait_sig(&pipe->pipe_draincv, lock);
		}
	}

	/*
	 * Disconnect from peer.
	 */
	if ((ppipe = pipe->pipe_peer) != NULL) {
		pipeselwakeup(ppipe, ppipe, POLL_HUP);
		ppipe->pipe_state |= PIPE_EOF;
		cv_broadcast(&ppipe->pipe_rcv);
		ppipe->pipe_peer = NULL;
	}

	/*
	 * Any knote objects still left in the list are the ones attached
	 * by the peer.  Since no one will traverse this list, we just
	 * clear it.
	 *
	 * XXX Exposes select/kqueue internals.
	 */
	SLIST_INIT(&pipe->pipe_sel.sel_klist);

	KASSERT((pipe->pipe_state & PIPE_LOCKFL) == 0);
	mutex_exit(lock);
	mutex_obj_free(lock);

	/*
	 * Free resources.
	 */
 free_resources:
	pipe->pipe_pgid = 0;
	pipe->pipe_state = PIPE_SIGNALR;
	pipe->pipe_peer = NULL;
	pipe->pipe_lock = NULL;
	pipe_free_kmem(pipe);
	if (pipe->pipe_kmem != 0) {
		pool_cache_put(pipe_rd_cache, pipe);
	} else {
		pool_cache_put(pipe_wr_cache, pipe);
	}
}

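/*
 * kqueue filter detach: remove the knote from the selinfo it was attached
 * to (the peer's selinfo in the EVFILT_WRITE case).
 */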
static void
filt_pipedetach(struct knote *kn)
{
	struct pipe *pipe;
	kmutex_t *lock;

	pipe = ((file_t *)kn->kn_obj)->f_pipe;
	lock = pipe->pipe_lock;

	mutex_enter(lock);

	switch(kn->kn_filter) {
	case EVFILT_WRITE:
		/* Need the peer structure, not our own. */
		pipe = pipe->pipe_peer;

		/* If reader end already closed, just return. */
		if (pipe == NULL) {
			mutex_exit(lock);
			return;
		}

		break;
	default:
		/* Nothing to do. */
		break;
	}

	KASSERT(kn->kn_hook == pipe);
	selremove_knote(&pipe->pipe_sel, kn);
	mutex_exit(lock);
}

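/*
 * kqueue EVFILT_READ event: report the number of bytes available to read,
 * and EOF once the write side has gone away.
 */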
static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = ((file_t *)kn->kn_obj)->f_pipe;
	struct pipe *wpipe;
	int rv;

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_enter(rpipe->pipe_lock);
	}
	wpipe = rpipe->pipe_peer;
	kn->kn_data = rpipe->pipe_buffer.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		knote_set_eof(kn, 0);
		rv = 1;
	} else {
		rv = kn->kn_data > 0;
	}

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_exit(rpipe->pipe_lock);
	}
	return rv;
}

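/*
 * kqueue EVFILT_WRITE event: report the space left in the peer's buffer,
 * and EOF once the read side has gone away; the knote is active when at
 * least PIPE_BUF bytes can be written.
 */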
static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = ((file_t *)kn->kn_obj)->f_pipe;
	struct pipe *wpipe;
	int rv;

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_enter(rpipe->pipe_lock);
	}
	wpipe = rpipe->pipe_peer;

	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_data = 0;
		knote_set_eof(kn, 0);
		rv = 1;
	} else {
		kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
		rv = kn->kn_data >= PIPE_BUF;
	}

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_exit(rpipe->pipe_lock);
	}
	return rv;
}

static const struct filterops pipe_rfiltops = {
	.f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE,
	.f_attach = NULL,
	.f_detach = filt_pipedetach,
	.f_event = filt_piperead,
};

static const struct filterops pipe_wfiltops = {
	.f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE,
	.f_attach = NULL,
	.f_detach = filt_pipedetach,
	.f_event = filt_pipewrite,
};

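/*
 * File ops kqfilter routine: attach a read or write knote to the
 * appropriate end of the pipe (write knotes hang off the peer).
 */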
static int
pipe_kqfilter(file_t *fp, struct knote *kn)
{
	struct pipe *pipe;
	kmutex_t *lock;

	pipe = ((file_t *)kn->kn_obj)->f_pipe;
	lock = pipe->pipe_lock;

	mutex_enter(lock);

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		pipe = pipe->pipe_peer;
		if (pipe == NULL) {
			/* Other end of pipe has been closed. */
			mutex_exit(lock);
			return (EBADF);
		}
		break;
	default:
		mutex_exit(lock);
		return (EINVAL);
	}

	kn->kn_hook = pipe;
	selrecord_knote(&pipe->pipe_sel, kn);
	mutex_exit(lock);

	return (0);
}

/*
 * Handle pipe sysctls.
 */
SYSCTL_SETUP(sysctl_kern_pipe_setup, "sysctl kern.pipe subtree setup")
{

	sysctl_createv(clog, 0, NULL, NULL,
		       CTLFLAG_PERMANENT,
		       CTLTYPE_NODE, "pipe",
		       SYSCTL_DESCR("Pipe settings"),
		       NULL, 0, NULL, 0,
		       CTL_KERN, KERN_PIPE, CTL_EOL);

	sysctl_createv(clog, 0, NULL, NULL,
		       CTLFLAG_PERMANENT|CTLFLAG_READWRITE,
		       CTLTYPE_INT, "maxbigpipes",
		       SYSCTL_DESCR("Maximum number of \"big\" pipes"),
		       NULL, 0, &maxbigpipes, 0,
		       CTL_KERN, KERN_PIPE, KERN_PIPE_MAXBIGPIPES, CTL_EOL);
	sysctl_createv(clog, 0, NULL, NULL,
		       CTLFLAG_PERMANENT,
		       CTLTYPE_INT, "nbigpipes",
		       SYSCTL_DESCR("Number of \"big\" pipes"),
		       NULL, 0, &nbigpipe, 0,
		       CTL_KERN, KERN_PIPE, KERN_PIPE_NBIGPIPES, CTL_EOL);
	sysctl_createv(clog, 0, NULL, NULL,
		       CTLFLAG_PERMANENT,
		       CTLTYPE_INT, "kvasize",
		       SYSCTL_DESCR("Amount of kernel memory consumed by pipe "
				    "buffers"),
		       NULL, 0, &amountpipekva, 0,
		       CTL_KERN, KERN_PIPE, KERN_PIPE_KVASIZE, CTL_EOL);
}