Home | History | Annotate | Line # | Download | only in isc
ev_streams.c revision 1.1.1.3
      1 /*	$NetBSD: ev_streams.c,v 1.1.1.3 2007/03/30 20:16:19 ghen Exp $	*/
      2 
      3 /*
      4  * Copyright (c) 2004 by Internet Systems Consortium, Inc. ("ISC")
      5  * Copyright (c) 1996-1999 by Internet Software Consortium
      6  *
      7  * Permission to use, copy, modify, and distribute this software for any
      8  * purpose with or without fee is hereby granted, provided that the above
      9  * copyright notice and this permission notice appear in all copies.
     10  *
     11  * THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES
     12  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     13  * MERCHANTABILITY AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR
     14  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     15  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     16  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
     17  * OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     18  */
     19 
     20 /* ev_streams.c - implement asynch stream file IO for the eventlib
     21  * vix 04mar96 [initial]
     22  */
     23 
     24 #if !defined(LINT) && !defined(CODECENTER)
     25 static const char rcsid[] = "Id: ev_streams.c,v 1.4.18.1 2005/04/27 05:01:06 sra Exp";
     26 #endif
     27 
     28 #include "port_before.h"
     29 #include "fd_setsize.h"
     30 
     31 #include <sys/types.h>
     32 #include <sys/uio.h>
     33 
     34 #include <errno.h>
     35 
     36 #include <isc/eventlib.h>
     37 #include <isc/assertions.h>
     38 #include "eventlib_p.h"
     39 
     40 #include "port_after.h"
     41 
     42 static int	copyvec(evStream *str, const struct iovec *iov, int iocnt);
     43 static void	consume(evStream *str, size_t bytes);
     44 static void	done(evContext opaqueCtx, evStream *str);
     45 static void	writable(evContext opaqueCtx, void *uap, int fd, int evmask);
     46 static void	readable(evContext opaqueCtx, void *uap, int fd, int evmask);
     47 
     48 struct iovec
     49 evConsIovec(void *buf, size_t cnt) {
     50 	struct iovec ret;
     51 
     52 	memset(&ret, 0xf5, sizeof ret);
     53 	ret.iov_base = buf;
     54 	ret.iov_len = cnt;
     55 	return (ret);
     56 }
     57 
     58 int
     59 evWrite(evContext opaqueCtx, int fd, const struct iovec *iov, int iocnt,
     60 	evStreamFunc func, void *uap, evStreamID *id)
     61 {
     62 	evContext_p *ctx = opaqueCtx.opaque;
     63 	evStream *new;
     64 	int save;
     65 
     66 	OKNEW(new);
     67 	new->func = func;
     68 	new->uap = uap;
     69 	new->fd = fd;
     70 	new->flags = 0;
     71 	if (evSelectFD(opaqueCtx, fd, EV_WRITE, writable, new, &new->file) < 0)
     72 		goto free;
     73 	if (copyvec(new, iov, iocnt) < 0)
     74 		goto free;
     75 	new->prevDone = NULL;
     76 	new->nextDone = NULL;
     77 	if (ctx->streams != NULL)
     78 		ctx->streams->prev = new;
     79 	new->prev = NULL;
     80 	new->next = ctx->streams;
     81 	ctx->streams = new;
     82 	if (id != NULL)
     83 		id->opaque = new;
     84 	return (0);
     85  free:
     86 	save = errno;
     87 	FREE(new);
     88 	errno = save;
     89 	return (-1);
     90 }
     91 
     92 int
     93 evRead(evContext opaqueCtx, int fd, const struct iovec *iov, int iocnt,
     94        evStreamFunc func, void *uap, evStreamID *id)
     95 {
     96 	evContext_p *ctx = opaqueCtx.opaque;
     97 	evStream *new;
     98 	int save;
     99 
    100 	OKNEW(new);
    101 	new->func = func;
    102 	new->uap = uap;
    103 	new->fd = fd;
    104 	new->flags = 0;
    105 	if (evSelectFD(opaqueCtx, fd, EV_READ, readable, new, &new->file) < 0)
    106 		goto free;
    107 	if (copyvec(new, iov, iocnt) < 0)
    108 		goto free;
    109 	new->prevDone = NULL;
    110 	new->nextDone = NULL;
    111 	if (ctx->streams != NULL)
    112 		ctx->streams->prev = new;
    113 	new->prev = NULL;
    114 	new->next = ctx->streams;
    115 	ctx->streams = new;
    116 	if (id)
    117 		id->opaque = new;
    118 	return (0);
    119  free:
    120 	save = errno;
    121 	FREE(new);
    122 	errno = save;
    123 	return (-1);
    124 }
    125 
    126 int
    127 evTimeRW(evContext opaqueCtx, evStreamID id, evTimerID timer) /*ARGSUSED*/ {
    128 	evStream *str = id.opaque;
    129 
    130 	UNUSED(opaqueCtx);
    131 
    132 	str->timer = timer;
    133 	str->flags |= EV_STR_TIMEROK;
    134 	return (0);
    135 }
    136 
    137 int
    138 evUntimeRW(evContext opaqueCtx, evStreamID id) /*ARGSUSED*/ {
    139 	evStream *str = id.opaque;
    140 
    141 	UNUSED(opaqueCtx);
    142 
    143 	str->flags &= ~EV_STR_TIMEROK;
    144 	return (0);
    145 }
    146 
    147 int
    148 evCancelRW(evContext opaqueCtx, evStreamID id) {
    149 	evContext_p *ctx = opaqueCtx.opaque;
    150 	evStream *old = id.opaque;
    151 
    152 	/*
    153 	 * The streams list is doubly threaded.  First, there's ctx->streams
    154 	 * that's used by evDestroy() to find and cancel all streams.  Second,
    155 	 * there's ctx->strDone (head) and ctx->strLast (tail) which thread
    156 	 * through the potentially smaller number of "IO completed" streams,
    157 	 * used in evGetNext() to avoid scanning the entire list.
    158 	 */
    159 
    160 	/* Unlink from ctx->streams. */
    161 	if (old->prev != NULL)
    162 		old->prev->next = old->next;
    163 	else
    164 		ctx->streams = old->next;
    165 	if (old->next != NULL)
    166 		old->next->prev = old->prev;
    167 
    168 	/*
    169 	 * If 'old' is on the ctx->strDone list, remove it.  Update
    170 	 * ctx->strLast if necessary.
    171 	 */
    172 	if (old->prevDone == NULL && old->nextDone == NULL) {
    173 		/*
    174 		 * Either 'old' is the only item on the done list, or it's
    175 		 * not on the done list.  If the former, then we unlink it
    176 		 * from the list.  If the latter, we leave the list alone.
    177 		 */
    178 		if (ctx->strDone == old) {
    179 			ctx->strDone = NULL;
    180 			ctx->strLast = NULL;
    181 		}
    182 	} else {
    183 		if (old->prevDone != NULL)
    184 			old->prevDone->nextDone = old->nextDone;
    185 		else
    186 			ctx->strDone = old->nextDone;
    187 		if (old->nextDone != NULL)
    188 			old->nextDone->prevDone = old->prevDone;
    189 		else
    190 			ctx->strLast = old->prevDone;
    191 	}
    192 
    193 	/* Deallocate the stream. */
    194 	if (old->file.opaque)
    195 		evDeselectFD(opaqueCtx, old->file);
    196 	memput(old->iovOrig, sizeof (struct iovec) * old->iovOrigCount);
    197 	FREE(old);
    198 	return (0);
    199 }
    200 
    201 /* Copy a scatter/gather vector and initialize a stream handler's IO. */
    202 static int
    203 copyvec(evStream *str, const struct iovec *iov, int iocnt) {
    204 	int i;
    205 
    206 	str->iovOrig = (struct iovec *)memget(sizeof(struct iovec) * iocnt);
    207 	if (str->iovOrig == NULL) {
    208 		errno = ENOMEM;
    209 		return (-1);
    210 	}
    211 	str->ioTotal = 0;
    212 	for (i = 0; i < iocnt; i++) {
    213 		str->iovOrig[i] = iov[i];
    214 		str->ioTotal += iov[i].iov_len;
    215 	}
    216 	str->iovOrigCount = iocnt;
    217 	str->iovCur = str->iovOrig;
    218 	str->iovCurCount = str->iovOrigCount;
    219 	str->ioDone = 0;
    220 	return (0);
    221 }
    222 
    223 /* Pull off or truncate lead iovec(s). */
    224 static void
    225 consume(evStream *str, size_t bytes) {
    226 	while (bytes > 0U) {
    227 		if (bytes < (size_t)str->iovCur->iov_len) {
    228 			str->iovCur->iov_len -= bytes;
    229 			str->iovCur->iov_base = (void *)
    230 				((u_char *)str->iovCur->iov_base + bytes);
    231 			str->ioDone += bytes;
    232 			bytes = 0;
    233 		} else {
    234 			bytes -= str->iovCur->iov_len;
    235 			str->ioDone += str->iovCur->iov_len;
    236 			str->iovCur++;
    237 			str->iovCurCount--;
    238 		}
    239 	}
    240 }
    241 
    242 /* Add a stream to Done list and deselect the FD. */
    243 static void
    244 done(evContext opaqueCtx, evStream *str) {
    245 	evContext_p *ctx = opaqueCtx.opaque;
    246 
    247 	if (ctx->strLast != NULL) {
    248 		str->prevDone = ctx->strLast;
    249 		ctx->strLast->nextDone = str;
    250 		ctx->strLast = str;
    251 	} else {
    252 		INSIST(ctx->strDone == NULL);
    253 		ctx->strDone = ctx->strLast = str;
    254 	}
    255 	evDeselectFD(opaqueCtx, str->file);
    256 	str->file.opaque = NULL;
    257 	/* evDrop() will call evCancelRW() on us. */
    258 }
    259 
    260 /* Dribble out some bytes on the stream.  (Called by evDispatch().) */
    261 static void
    262 writable(evContext opaqueCtx, void *uap, int fd, int evmask) {
    263 	evStream *str = uap;
    264 	int bytes;
    265 
    266 	UNUSED(evmask);
    267 
    268 	bytes = writev(fd, str->iovCur, str->iovCurCount);
    269 	if (bytes > 0) {
    270 		if ((str->flags & EV_STR_TIMEROK) != 0)
    271 			evTouchIdleTimer(opaqueCtx, str->timer);
    272 		consume(str, bytes);
    273 	} else {
    274 		if (bytes < 0 && errno != EINTR) {
    275 			str->ioDone = -1;
    276 			str->ioErrno = errno;
    277 		}
    278 	}
    279 	if (str->ioDone == -1 || str->ioDone == str->ioTotal)
    280 		done(opaqueCtx, str);
    281 }
    282 
    283 /* Scoop up some bytes from the stream.  (Called by evDispatch().) */
    284 static void
    285 readable(evContext opaqueCtx, void *uap, int fd, int evmask) {
    286 	evStream *str = uap;
    287 	int bytes;
    288 
    289 	UNUSED(evmask);
    290 
    291 	bytes = readv(fd, str->iovCur, str->iovCurCount);
    292 	if (bytes > 0) {
    293 		if ((str->flags & EV_STR_TIMEROK) != 0)
    294 			evTouchIdleTimer(opaqueCtx, str->timer);
    295 		consume(str, bytes);
    296 	} else {
    297 		if (bytes == 0)
    298 			str->ioDone = 0;
    299 		else {
    300 			if (errno != EINTR) {
    301 				str->ioDone = -1;
    302 				str->ioErrno = errno;
    303 			}
    304 		}
    305 	}
    306 	if (str->ioDone <= 0 || str->ioDone == str->ioTotal)
    307 		done(opaqueCtx, str);
    308 }
    309 
    310 /*! \file */
    311