Home | History | Annotate | Line # | Download | only in dist
      1 /*	$NetBSD: bufferevent.c,v 1.6 2021/04/10 19:02:37 rillig Exp $	*/
      2 
      3 /*
      4  * Copyright (c) 2002-2007 Niels Provos <provos (at) citi.umich.edu>
      5  * Copyright (c) 2007-2012 Niels Provos, Nick Mathewson
      6  *
      7  * Redistribution and use in source and binary forms, with or without
      8  * modification, are permitted provided that the following conditions
      9  * are met:
     10  * 1. Redistributions of source code must retain the above copyright
     11  *    notice, this list of conditions and the following disclaimer.
     12  * 2. Redistributions in binary form must reproduce the above copyright
     13  *    notice, this list of conditions and the following disclaimer in the
     14  *    documentation and/or other materials provided with the distribution.
     15  * 3. The name of the author may not be used to endorse or promote products
     16  *    derived from this software without specific prior written permission.
     17  *
     18  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     19  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     20  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     21  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     22  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     23  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     24  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     25  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     27  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     28  */
     29 
     30 #include "event2/event-config.h"
     31 #include <sys/cdefs.h>
     32 __RCSID("$NetBSD: bufferevent.c,v 1.6 2021/04/10 19:02:37 rillig Exp $");
     33 #include "evconfig-private.h"
     34 
     35 #include <sys/types.h>
     36 
     37 #ifdef EVENT__HAVE_SYS_TIME_H
     38 #include <sys/time.h>
     39 #endif
     40 
     41 #include <errno.h>
     42 #include <stdio.h>
     43 #include <stdlib.h>
     44 #include <string.h>
     45 #ifdef EVENT__HAVE_STDARG_H
     46 #include <stdarg.h>
     47 #endif
     48 
     49 #ifdef _WIN32
     50 #include <winsock2.h>
     51 #endif
     52 
     53 #include "event2/util.h"
     54 #include "event2/buffer.h"
     55 #include "event2/buffer_compat.h"
     56 #include "event2/bufferevent.h"
     57 #include "event2/bufferevent_struct.h"
     58 #include "event2/bufferevent_compat.h"
     59 #include "event2/event.h"
     60 #include "event-internal.h"
     61 #include "log-internal.h"
     62 #include "mm-internal.h"
     63 #include "bufferevent-internal.h"
     64 #include "evbuffer-internal.h"
     65 #include "util-internal.h"
     66 
     67 static void bufferevent_cancel_all_(struct bufferevent *bev);
     68 static void bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_);
     69 
     70 void
     71 bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
     72 {
     73 	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
     74 	BEV_LOCK(bufev);
     75 	if (!bufev_private->read_suspended)
     76 		bufev->be_ops->disable(bufev, EV_READ);
     77 	bufev_private->read_suspended |= what;
     78 	BEV_UNLOCK(bufev);
     79 }
     80 
     81 void
     82 bufferevent_unsuspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
     83 {
     84 	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
     85 	BEV_LOCK(bufev);
     86 	bufev_private->read_suspended &= ~what;
     87 	if (!bufev_private->read_suspended && (bufev->enabled & EV_READ))
     88 		bufev->be_ops->enable(bufev, EV_READ);
     89 	BEV_UNLOCK(bufev);
     90 }
     91 
     92 void
     93 bufferevent_suspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what)
     94 {
     95 	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
     96 	BEV_LOCK(bufev);
     97 	if (!bufev_private->write_suspended)
     98 		bufev->be_ops->disable(bufev, EV_WRITE);
     99 	bufev_private->write_suspended |= what;
    100 	BEV_UNLOCK(bufev);
    101 }
    102 
    103 void
    104 bufferevent_unsuspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what)
    105 {
    106 	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
    107 	BEV_LOCK(bufev);
    108 	bufev_private->write_suspended &= ~what;
    109 	if (!bufev_private->write_suspended && (bufev->enabled & EV_WRITE))
    110 		bufev->be_ops->enable(bufev, EV_WRITE);
    111 	BEV_UNLOCK(bufev);
    112 }
    113 
    114 /**
    115  * Sometimes bufferevent's implementation can overrun high watermarks
    116  * (one of examples is openssl) and in this case if the read callback
    117  * will not handle enough data do over condition above the read
    118  * callback will never be called again (due to suspend above).
    119  *
    120  * To avoid this we are scheduling read callback again here, but only
    121  * from the user callback to avoid multiple scheduling:
    122  * - when the data had been added to it
    123  * - when the data had been drained from it (user specified read callback)
    124  */
    125 static void bufferevent_inbuf_wm_check(struct bufferevent *bev)
    126 {
    127 	if (!bev->wm_read.high)
    128 		return;
    129 	if (!(bev->enabled & EV_READ))
    130 		return;
    131 	if (evbuffer_get_length(bev->input) < bev->wm_read.high)
    132 		return;
    133 
    134 	bufferevent_trigger(bev, EV_READ, BEV_OPT_DEFER_CALLBACKS);
    135 }
    136 
    137 /* Callback to implement watermarks on the input buffer.  Only enabled
    138  * if the watermark is set. */
    139 static void
    140 bufferevent_inbuf_wm_cb(struct evbuffer *buf,
    141     const struct evbuffer_cb_info *cbinfo,
    142     void *arg)
    143 {
    144 	struct bufferevent *bufev = arg;
    145 	size_t size;
    146 
    147 	size = evbuffer_get_length(buf);
    148 
    149 	if (size >= bufev->wm_read.high)
    150 		bufferevent_wm_suspend_read(bufev);
    151 	else
    152 		bufferevent_wm_unsuspend_read(bufev);
    153 }
    154 
/* Deferred-callback dispatcher used when BEV_OPT_UNLOCK_CALLBACKS is not
 * set: the bufferevent lock is held across every user callback.  Delivers
 * any pending connected/read/write/event notifications, then drops the
 * reference taken when the deferred callback was scheduled. */
static void
bufferevent_run_deferred_callbacks_locked(struct event_callback *cb, void *arg)
{
	struct bufferevent_private *bufev_private = arg;
	struct bufferevent *bufev = &bufev_private->bev;

	BEV_LOCK(bufev);
	if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
	    bufev->errorcb) {
		/* The "connected" happened before any reads or writes, so
		   send it first. */
		bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
		bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg);
	}
	if (bufev_private->readcb_pending && bufev->readcb) {
		bufev_private->readcb_pending = 0;
		bufev->readcb(bufev, bufev->cbarg);
		/* The read callback may have left the input buffer at or
		 * above the high watermark; re-check so reading can't stall. */
		bufferevent_inbuf_wm_check(bufev);
	}
	if (bufev_private->writecb_pending && bufev->writecb) {
		bufev_private->writecb_pending = 0;
		bufev->writecb(bufev, bufev->cbarg);
	}
	if (bufev_private->eventcb_pending && bufev->errorcb) {
		short what = bufev_private->eventcb_pending;
		int err = bufev_private->errno_pending;
		bufev_private->eventcb_pending = 0;
		bufev_private->errno_pending = 0;
		/* Restore the socket error that was current when the event
		 * was queued so the callback observes it. */
		EVUTIL_SET_SOCKET_ERROR(err);
		bufev->errorcb(bufev, what, bufev->cbarg);
	}
	/* Releases the reference taken by SCHEDULE_DEFERRED. */
	bufferevent_decref_and_unlock_(bufev);
}
    188 
/* Deferred-callback dispatcher used with BEV_OPT_UNLOCK_CALLBACKS: each
 * user callback runs with the bufferevent lock released, so the callback
 * pointers and cbarg are copied to locals before unlocking (the user may
 * change them from another thread while we are unlocked). */
static void
bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg)
{
	struct bufferevent_private *bufev_private = arg;
	struct bufferevent *bufev = &bufev_private->bev;

	BEV_LOCK(bufev);
/* Run 'stmt' with the lock dropped, then reacquire it. */
#define UNLOCKED(stmt) \
	do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0)

	if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
	    bufev->errorcb) {
		/* The "connected" happened before any reads or writes, so
		   send it first. */
		bufferevent_event_cb errorcb = bufev->errorcb;
		void *cbarg = bufev->cbarg;
		bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
		UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg));
	}
	if (bufev_private->readcb_pending && bufev->readcb) {
		bufferevent_data_cb readcb = bufev->readcb;
		void *cbarg = bufev->cbarg;
		bufev_private->readcb_pending = 0;
		UNLOCKED(readcb(bufev, cbarg));
		/* Re-check the read high watermark after the user callback
		 * has had a chance to drain the input buffer. */
		bufferevent_inbuf_wm_check(bufev);
	}
	if (bufev_private->writecb_pending && bufev->writecb) {
		bufferevent_data_cb writecb = bufev->writecb;
		void *cbarg = bufev->cbarg;
		bufev_private->writecb_pending = 0;
		UNLOCKED(writecb(bufev, cbarg));
	}
	if (bufev_private->eventcb_pending && bufev->errorcb) {
		bufferevent_event_cb errorcb = bufev->errorcb;
		void *cbarg = bufev->cbarg;
		short what = bufev_private->eventcb_pending;
		int err = bufev_private->errno_pending;
		bufev_private->eventcb_pending = 0;
		bufev_private->errno_pending = 0;
		/* Restore the socket error that was current when queued. */
		EVUTIL_SET_SOCKET_ERROR(err);
		UNLOCKED(errorcb(bufev,what,cbarg));
	}
	/* Releases the reference taken by SCHEDULE_DEFERRED. */
	bufferevent_decref_and_unlock_(bufev);
#undef UNLOCKED
}
    234 
/* Schedule (bevp)'s deferred callback on its event base.  If it was not
 * already scheduled, take a reference that the deferred-callback
 * dispatcher releases via bufferevent_decref_and_unlock_(). */
#define SCHEDULE_DEFERRED(bevp)						\
	do {								\
		if (event_deferred_cb_schedule_(			\
			    (bevp)->bev.ev_base,			\
			&(bevp)->deferred))				\
			bufferevent_incref_(&(bevp)->bev);		\
	} while (0)
    242 
    243 
    244 void
    245 bufferevent_run_readcb_(struct bufferevent *bufev, int options)
    246 {
    247 	/* Requires that we hold the lock and a reference */
    248 	struct bufferevent_private *p = BEV_UPCAST(bufev);
    249 	if (bufev->readcb == NULL)
    250 		return;
    251 	if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
    252 		p->readcb_pending = 1;
    253 		SCHEDULE_DEFERRED(p);
    254 	} else {
    255 		bufev->readcb(bufev, bufev->cbarg);
    256 		bufferevent_inbuf_wm_check(bufev);
    257 	}
    258 }
    259 
    260 void
    261 bufferevent_run_writecb_(struct bufferevent *bufev, int options)
    262 {
    263 	/* Requires that we hold the lock and a reference */
    264 	struct bufferevent_private *p = BEV_UPCAST(bufev);
    265 	if (bufev->writecb == NULL)
    266 		return;
    267 	if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
    268 		p->writecb_pending = 1;
    269 		SCHEDULE_DEFERRED(p);
    270 	} else {
    271 		bufev->writecb(bufev, bufev->cbarg);
    272 	}
    273 }
    274 
/* Mask of the option flags that the public bufferevent_trigger*()
 * entry points accept from callers. */
#define BEV_TRIG_ALL_OPTS (			\
		BEV_TRIG_IGNORE_WATERMARKS|	\
		BEV_TRIG_DEFER_CALLBACKS	\
	)
    279 
    280 void
    281 bufferevent_trigger(struct bufferevent *bufev, short iotype, int options)
    282 {
    283 	bufferevent_incref_and_lock_(bufev);
    284 	bufferevent_trigger_nolock_(bufev, iotype, options&BEV_TRIG_ALL_OPTS);
    285 	bufferevent_decref_and_unlock_(bufev);
    286 }
    287 
    288 void
    289 bufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options)
    290 {
    291 	/* Requires that we hold the lock and a reference */
    292 	struct bufferevent_private *p = BEV_UPCAST(bufev);
    293 	if (bufev->errorcb == NULL)
    294 		return;
    295 	if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
    296 		p->eventcb_pending |= what;
    297 		p->errno_pending = EVUTIL_SOCKET_ERROR();
    298 		SCHEDULE_DEFERRED(p);
    299 	} else {
    300 		bufev->errorcb(bufev, what, bufev->cbarg);
    301 	}
    302 }
    303 
    304 void
    305 bufferevent_trigger_event(struct bufferevent *bufev, short what, int options)
    306 {
    307 	bufferevent_incref_and_lock_(bufev);
    308 	bufferevent_run_eventcb_(bufev, what, options&BEV_TRIG_ALL_OPTS);
    309 	bufferevent_decref_and_unlock_(bufev);
    310 }
    311 
/* Shared constructor logic for every bufferevent type: creates the
 * input/output evbuffers (unless the caller pre-set them), installs the
 * backend ops, initializes rate limiting, optional locking, and the
 * deferred-callback hook.  Returns 0 on success; on failure returns -1
 * after freeing and NULLing both buffers. */
int
bufferevent_init_common_(struct bufferevent_private *bufev_private,
    struct event_base *base,
    const struct bufferevent_ops *ops,
    enum bufferevent_options options)
{
	struct bufferevent *bufev = &bufev_private->bev;

	if (!bufev->input) {
		if ((bufev->input = evbuffer_new()) == NULL)
			goto err;
	}

	if (!bufev->output) {
		if ((bufev->output = evbuffer_new()) == NULL)
			goto err;
	}

	bufev_private->refcnt = 1;
	bufev->ev_base = base;

	/* Disable timeouts. */
	evutil_timerclear(&bufev->timeout_read);
	evutil_timerclear(&bufev->timeout_write);

	bufev->be_ops = ops;

	if (bufferevent_ratelim_init_(bufev_private))
		goto err;

	/*
	 * Set to EV_WRITE so that using bufferevent_write is going to
	 * trigger a callback.  Reading needs to be explicitly enabled
	 * because otherwise no data will be available.
	 */
	bufev->enabled = EV_WRITE;

#ifndef EVENT__DISABLE_THREAD_SUPPORT
	if (options & BEV_OPT_THREADSAFE) {
		if (bufferevent_enable_locking_(bufev, NULL) < 0)
			goto err;
	}
#endif
	/* UNLOCK_CALLBACKS only makes sense for deferred callbacks;
	 * reject the inconsistent combination up front. */
	if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
	    == BEV_OPT_UNLOCK_CALLBACKS) {
		event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
		goto err;
	}
	/* Deferred callbacks run at the base's middle priority. */
	if (options & BEV_OPT_UNLOCK_CALLBACKS)
		event_deferred_cb_init_(
		    &bufev_private->deferred,
		    event_base_get_npriorities(base) / 2,
		    bufferevent_run_deferred_callbacks_unlocked,
		    bufev_private);
	else
		event_deferred_cb_init_(
		    &bufev_private->deferred,
		    event_base_get_npriorities(base) / 2,
		    bufferevent_run_deferred_callbacks_locked,
		    bufev_private);

	bufev_private->options = options;

	evbuffer_set_parent_(bufev->input, bufev);
	evbuffer_set_parent_(bufev->output, bufev);

	return 0;

err:
	if (bufev->input) {
		evbuffer_free(bufev->input);
		bufev->input = NULL;
	}
	if (bufev->output) {
		evbuffer_free(bufev->output);
		bufev->output = NULL;
	}
	return -1;
}
    391 
    392 void
    393 bufferevent_setcb(struct bufferevent *bufev,
    394     bufferevent_data_cb readcb, bufferevent_data_cb writecb,
    395     bufferevent_event_cb eventcb, void *cbarg)
    396 {
    397 	BEV_LOCK(bufev);
    398 
    399 	bufev->readcb = readcb;
    400 	bufev->writecb = writecb;
    401 	bufev->errorcb = eventcb;
    402 
    403 	bufev->cbarg = cbarg;
    404 	BEV_UNLOCK(bufev);
    405 }
    406 
    407 void
    408 bufferevent_getcb(struct bufferevent *bufev,
    409     bufferevent_data_cb *readcb_ptr,
    410     bufferevent_data_cb *writecb_ptr,
    411     bufferevent_event_cb *eventcb_ptr,
    412     void **cbarg_ptr)
    413 {
    414 	BEV_LOCK(bufev);
    415 	if (readcb_ptr)
    416 		*readcb_ptr = bufev->readcb;
    417 	if (writecb_ptr)
    418 		*writecb_ptr = bufev->writecb;
    419 	if (eventcb_ptr)
    420 		*eventcb_ptr = bufev->errorcb;
    421 	if (cbarg_ptr)
    422 		*cbarg_ptr = bufev->cbarg;
    423 
    424 	BEV_UNLOCK(bufev);
    425 }
    426 
    427 struct evbuffer *
    428 bufferevent_get_input(struct bufferevent *bufev)
    429 {
    430 	return bufev->input;
    431 }
    432 
    433 struct evbuffer *
    434 bufferevent_get_output(struct bufferevent *bufev)
    435 {
    436 	return bufev->output;
    437 }
    438 
    439 struct event_base *
    440 bufferevent_get_base(struct bufferevent *bufev)
    441 {
    442 	return bufev->ev_base;
    443 }
    444 
    445 int
    446 bufferevent_get_priority(const struct bufferevent *bufev)
    447 {
    448 	if (event_initialized(&bufev->ev_read)) {
    449 		return event_get_priority(&bufev->ev_read);
    450 	} else {
    451 		return event_base_get_npriorities(bufev->ev_base) / 2;
    452 	}
    453 }
    454 
    455 int
    456 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
    457 {
    458 	if (evbuffer_add(bufev->output, data, size) == -1)
    459 		return (-1);
    460 
    461 	return 0;
    462 }
    463 
    464 int
    465 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
    466 {
    467 	if (evbuffer_add_buffer(bufev->output, buf) == -1)
    468 		return (-1);
    469 
    470 	return 0;
    471 }
    472 
    473 size_t
    474 bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
    475 {
    476 	return (evbuffer_remove(bufev->input, data, size));
    477 }
    478 
    479 int
    480 bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf)
    481 {
    482 	return (evbuffer_add_buffer(buf, bufev->input));
    483 }
    484 
    485 int
    486 bufferevent_enable(struct bufferevent *bufev, short event)
    487 {
    488 	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
    489 	short impl_events = event;
    490 	int r = 0;
    491 
    492 	bufferevent_incref_and_lock_(bufev);
    493 	if (bufev_private->read_suspended)
    494 		impl_events &= ~EV_READ;
    495 	if (bufev_private->write_suspended)
    496 		impl_events &= ~EV_WRITE;
    497 
    498 	bufev->enabled |= event;
    499 
    500 	if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
    501 		r = -1;
    502 	if (r)
    503 		event_debug(("%s: cannot enable 0x%hx on %p", __func__, event, bufev));
    504 
    505 	bufferevent_decref_and_unlock_(bufev);
    506 	return r;
    507 }
    508 
    509 int
    510 bufferevent_set_timeouts(struct bufferevent *bufev,
    511 			 const struct timeval *tv_read,
    512 			 const struct timeval *tv_write)
    513 {
    514 	int r = 0;
    515 	BEV_LOCK(bufev);
    516 	if (tv_read) {
    517 		bufev->timeout_read = *tv_read;
    518 	} else {
    519 		evutil_timerclear(&bufev->timeout_read);
    520 	}
    521 	if (tv_write) {
    522 		bufev->timeout_write = *tv_write;
    523 	} else {
    524 		evutil_timerclear(&bufev->timeout_write);
    525 	}
    526 
    527 	if (bufev->be_ops->adj_timeouts)
    528 		r = bufev->be_ops->adj_timeouts(bufev);
    529 	BEV_UNLOCK(bufev);
    530 
    531 	return r;
    532 }
    533 
    534 
    535 /* Obsolete; use bufferevent_set_timeouts */
    536 void
    537 bufferevent_settimeout(struct bufferevent *bufev,
    538 		       int timeout_read, int timeout_write)
    539 {
    540 	struct timeval tv_read, tv_write;
    541 	struct timeval *ptv_read = NULL, *ptv_write = NULL;
    542 
    543 	memset(&tv_read, 0, sizeof(tv_read));
    544 	memset(&tv_write, 0, sizeof(tv_write));
    545 
    546 	if (timeout_read) {
    547 		tv_read.tv_sec = timeout_read;
    548 		ptv_read = &tv_read;
    549 	}
    550 	if (timeout_write) {
    551 		tv_write.tv_sec = timeout_write;
    552 		ptv_write = &tv_write;
    553 	}
    554 
    555 	bufferevent_set_timeouts(bufev, ptv_read, ptv_write);
    556 }
    557 
    558 
    559 int
    560 bufferevent_disable_hard_(struct bufferevent *bufev, short event)
    561 {
    562 	int r = 0;
    563 	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
    564 
    565 	BEV_LOCK(bufev);
    566 	bufev->enabled &= ~event;
    567 
    568 	bufev_private->connecting = 0;
    569 	if (bufev->be_ops->disable(bufev, event) < 0)
    570 		r = -1;
    571 
    572 	BEV_UNLOCK(bufev);
    573 	return r;
    574 }
    575 
    576 int
    577 bufferevent_disable(struct bufferevent *bufev, short event)
    578 {
    579 	int r = 0;
    580 
    581 	BEV_LOCK(bufev);
    582 	bufev->enabled &= ~event;
    583 
    584 	if (bufev->be_ops->disable(bufev, event) < 0)
    585 		r = -1;
    586 	if (r)
    587 		event_debug(("%s: cannot disable 0x%hx on %p", __func__, event, bufev));
    588 
    589 	BEV_UNLOCK(bufev);
    590 	return r;
    591 }
    592 
/*
 * Sets the water marks
 */

/* For EV_READ, a nonzero high watermark also installs (or re-enables) an
 * evbuffer callback that suspends reading at/above the mark and resumes
 * it below; a zero high watermark disables that callback and unsuspends
 * reading unconditionally. */
void
bufferevent_setwatermark(struct bufferevent *bufev, short events,
    size_t lowmark, size_t highmark)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);

	BEV_LOCK(bufev);
	if (events & EV_WRITE) {
		bufev->wm_write.low = lowmark;
		bufev->wm_write.high = highmark;
	}

	if (events & EV_READ) {
		bufev->wm_read.low = lowmark;
		bufev->wm_read.high = highmark;

		if (highmark) {
			/* There is now a new high-water mark for read.
			   enable the callback if needed, and see if we should
			   suspend/bufferevent_wm_unsuspend. */

			if (bufev_private->read_watermarks_cb == NULL) {
				bufev_private->read_watermarks_cb =
				    evbuffer_add_cb(bufev->input,
						    bufferevent_inbuf_wm_cb,
						    bufev);
			}
			evbuffer_cb_set_flags(bufev->input,
				      bufev_private->read_watermarks_cb,
				      EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER);

			/* Bring the suspend state in line with the current
			 * buffer length immediately. */
			if (evbuffer_get_length(bufev->input) >= highmark)
				bufferevent_wm_suspend_read(bufev);
			else if (evbuffer_get_length(bufev->input) < highmark)
				bufferevent_wm_unsuspend_read(bufev);
		} else {
			/* There is now no high-water mark for read. */
			if (bufev_private->read_watermarks_cb)
				evbuffer_cb_clear_flags(bufev->input,
				    bufev_private->read_watermarks_cb,
				    EVBUFFER_CB_ENABLED);
			bufferevent_wm_unsuspend_read(bufev);
		}
	}
	BEV_UNLOCK(bufev);
}
    643 
    644 int
    645 bufferevent_getwatermark(struct bufferevent *bufev, short events,
    646     size_t *lowmark, size_t *highmark)
    647 {
    648 	if (events == EV_WRITE) {
    649 		BEV_LOCK(bufev);
    650 		if (lowmark)
    651 			*lowmark = bufev->wm_write.low;
    652 		if (highmark)
    653 			*highmark = bufev->wm_write.high;
    654 		BEV_UNLOCK(bufev);
    655 		return 0;
    656 	}
    657 
    658 	if (events == EV_READ) {
    659 		BEV_LOCK(bufev);
    660 		if (lowmark)
    661 			*lowmark = bufev->wm_read.low;
    662 		if (highmark)
    663 			*highmark = bufev->wm_read.high;
    664 		BEV_UNLOCK(bufev);
    665 		return 0;
    666 	}
    667 	return -1;
    668 }
    669 
    670 int
    671 bufferevent_flush(struct bufferevent *bufev,
    672     short iotype,
    673     enum bufferevent_flush_mode mode)
    674 {
    675 	int r = -1;
    676 	BEV_LOCK(bufev);
    677 	if (bufev->be_ops->flush)
    678 		r = bufev->be_ops->flush(bufev, iotype, mode);
    679 	BEV_UNLOCK(bufev);
    680 	return r;
    681 }
    682 
    683 void
    684 bufferevent_incref_and_lock_(struct bufferevent *bufev)
    685 {
    686 	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
    687 	BEV_LOCK(bufev);
    688 	++bufev_private->refcnt;
    689 }
    690 
#if 0
/* Currently unused (compiled out): would transfer ownership of a shared
 * lock from 'donor' to 'recipient' so the donor could be freed while the
 * recipient keeps the lock alive. */
static void
bufferevent_transfer_lock_ownership_(struct bufferevent *donor,
    struct bufferevent *recipient)
{
	struct bufferevent_private *d = BEV_UPCAST(donor);
	struct bufferevent_private *r = BEV_UPCAST(recipient);
	/* Only meaningful when both share the same lock object. */
	if (d->lock != r->lock)
		return;
	if (r->own_lock)
		return;
	if (d->own_lock) {
		d->own_lock = 0;
		r->own_lock = 1;
	}
}
#endif
    708 
/* Drop one reference; must be called with the lock held, and always
 * releases the lock.  When the last reference goes away, unlink the
 * backend and schedule finalization of every event callback that could
 * still touch this object; bufferevent_finalize_cb_() then frees it.
 * Returns 1 if destruction was initiated, 0 otherwise. */
int
bufferevent_decref_and_unlock_(struct bufferevent *bufev)
{
	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
	int n_cbs = 0;
#define MAX_CBS 16
	struct event_callback *cbs[MAX_CBS];

	EVUTIL_ASSERT(bufev_private->refcnt > 0);

	if (--bufev_private->refcnt) {
		BEV_UNLOCK(bufev);
		return 0;
	}

	if (bufev->be_ops->unlink)
		bufev->be_ops->unlink(bufev);

	/* Okay, we're out of references. Let's finalize this once all the
	 * callbacks are done running. */
	cbs[0] = &bufev->ev_read.ev_evcallback;
	cbs[1] = &bufev->ev_write.ev_evcallback;
	cbs[2] = &bufev_private->deferred;
	n_cbs = 3;
	if (bufev_private->rate_limiting) {
		struct event *e = &bufev_private->rate_limiting->refill_bucket_event;
		if (event_initialized(e))
			cbs[n_cbs++] = &e->ev_evcallback;
	}
	/* Also wait for any callbacks registered on the buffers. */
	n_cbs += evbuffer_get_callbacks_(bufev->input, cbs+n_cbs, MAX_CBS-n_cbs);
	n_cbs += evbuffer_get_callbacks_(bufev->output, cbs+n_cbs, MAX_CBS-n_cbs);

	event_callback_finalize_many_(bufev->ev_base, n_cbs, cbs,
	    bufferevent_finalize_cb_);

#undef MAX_CBS
	BEV_UNLOCK(bufev);

	return 1;
}
    749 
/* Final destruction step, run once all callbacks listed by
 * bufferevent_decref_and_unlock_() have finished: destroy the backend
 * state, free the buffers and rate-limiting state, release the lock (and
 * free it if owned), free the object itself, and finally drop the
 * reference to any underlying bufferevent. */
static void
bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_)
{
	struct bufferevent *bufev = arg_;
	struct bufferevent *underlying;
	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);

	BEV_LOCK(bufev);
	underlying = bufferevent_get_underlying(bufev);

	/* Clean up the shared info */
	if (bufev->be_ops->destruct)
		bufev->be_ops->destruct(bufev);

	/* XXX what happens if refcnt for these buffers is > 1?
	 * The buffers can share a lock with this bufferevent object,
	 * but the lock might be destroyed below. */
	/* evbuffer will free the callbacks */
	evbuffer_free(bufev->input);
	evbuffer_free(bufev->output);

	if (bufev_private->rate_limiting) {
		if (bufev_private->rate_limiting->group)
			bufferevent_remove_from_rate_limit_group_internal_(bufev,0);
		mm_free(bufev_private->rate_limiting);
		bufev_private->rate_limiting = NULL;
	}


	BEV_UNLOCK(bufev);

	/* Free the lock only after it has been released. */
	if (bufev_private->own_lock)
		EVTHREAD_FREE_LOCK(bufev_private->lock,
		    EVTHREAD_LOCKTYPE_RECURSIVE);

	/* Free the actual allocated memory. */
	mm_free(((char*)bufev) - bufev->be_ops->mem_offset);

	/* Release the reference to underlying now that we no longer need the
	 * reference to it.  We wait this long mainly in case our lock is
	 * shared with underlying.
	 *
	 * The 'destruct' function will also drop a reference to underlying
	 * if BEV_OPT_CLOSE_ON_FREE is set.
	 *
	 * XXX Should we/can we just refcount evbuffer/bufferevent locks?
	 * It would probably save us some headaches.
	 */
	if (underlying)
		bufferevent_decref_(underlying);
}
    801 
/* Public refcount decrement.  Returns 1 if this released the last
 * reference (destruction begins), 0 otherwise. */
int
bufferevent_decref(struct bufferevent *bufev)
{
	int released;

	BEV_LOCK(bufev);
	released = bufferevent_decref_and_unlock_(bufev);
	return released;
}
    808 
/* Public destructor: detach all user callbacks, cancel any in-flight
 * backend operations, then drop the caller's reference.  The object is
 * actually freed once every other reference and pending callback is
 * gone (see bufferevent_decref_and_unlock_ / bufferevent_finalize_cb_). */
void
bufferevent_free(struct bufferevent *bufev)
{
	BEV_LOCK(bufev);
	bufferevent_setcb(bufev, NULL, NULL, NULL, NULL);
	bufferevent_cancel_all_(bufev);
	bufferevent_decref_and_unlock_(bufev);
}
    817 
    818 void
    819 bufferevent_incref(struct bufferevent *bufev)
    820 {
    821 	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
    822 
    823 	/* XXX: now that this function is public, we might want to
    824 	 * - return the count from this function
    825 	 * - create a new function to atomically grab the current refcount
    826 	 */
    827 	BEV_LOCK(bufev);
    828 	++bufev_private->refcnt;
    829 	BEV_UNLOCK(bufev);
    830 }
    831 
/* Turn on locking for this bufferevent (and its buffers, and any
 * underlying bufferevent).  With lock == NULL, share the underlying
 * bufferevent's lock if it has one, otherwise allocate a new recursive
 * lock that this object owns; with a caller-supplied lock, use it
 * without taking ownership.  Returns -1 if locking is compiled out, a
 * lock is already set, or allocation fails; 0 on success. */
int
bufferevent_enable_locking_(struct bufferevent *bufev, void *lock)
{
#ifdef EVENT__DISABLE_THREAD_SUPPORT
	return -1;
#else
	struct bufferevent *underlying;

	if (BEV_UPCAST(bufev)->lock)
		return -1;
	underlying = bufferevent_get_underlying(bufev);

	if (!lock && underlying && BEV_UPCAST(underlying)->lock) {
		/* Share the underlying bufferevent's lock; do not own it. */
		lock = BEV_UPCAST(underlying)->lock;
		BEV_UPCAST(bufev)->lock = lock;
		BEV_UPCAST(bufev)->own_lock = 0;
	} else if (!lock) {
		/* Allocate a fresh lock that this bufferevent owns. */
		EVTHREAD_ALLOC_LOCK(lock, EVTHREAD_LOCKTYPE_RECURSIVE);
		if (!lock)
			return -1;
		BEV_UPCAST(bufev)->lock = lock;
		BEV_UPCAST(bufev)->own_lock = 1;
	} else {
		/* Use the caller's lock; the caller retains ownership. */
		BEV_UPCAST(bufev)->lock = lock;
		BEV_UPCAST(bufev)->own_lock = 0;
	}
	evbuffer_enable_locking(bufev->input, lock);
	evbuffer_enable_locking(bufev->output, lock);

	/* Propagate the same lock down to an unlocked underlying bev. */
	if (underlying && !BEV_UPCAST(underlying)->lock)
		bufferevent_enable_locking_(underlying, lock);

	return 0;
#endif
}
    867 
    868 int
    869 bufferevent_setfd(struct bufferevent *bev, evutil_socket_t fd)
    870 {
    871 	union bufferevent_ctrl_data d;
    872 	int res = -1;
    873 	d.fd = fd;
    874 	BEV_LOCK(bev);
    875 	if (bev->be_ops->ctrl)
    876 		res = bev->be_ops->ctrl(bev, BEV_CTRL_SET_FD, &d);
    877 	if (res)
    878 		event_debug(("%s: cannot set fd for %p to "EV_SOCK_FMT, __func__, bev, fd));
    879 	BEV_UNLOCK(bev);
    880 	return res;
    881 }
    882 
    883 evutil_socket_t
    884 bufferevent_getfd(struct bufferevent *bev)
    885 {
    886 	union bufferevent_ctrl_data d;
    887 	int res = -1;
    888 	d.fd = -1;
    889 	BEV_LOCK(bev);
    890 	if (bev->be_ops->ctrl)
    891 		res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_FD, &d);
    892 	if (res)
    893 		event_debug(("%s: cannot get fd for %p", __func__, bev));
    894 	BEV_UNLOCK(bev);
    895 	return (res<0) ? -1 : d.fd;
    896 }
    897 
    898 enum bufferevent_options
    899 bufferevent_get_options_(struct bufferevent *bev)
    900 {
    901 	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
    902 	enum bufferevent_options options;
    903 
    904 	BEV_LOCK(bev);
    905 	options = bev_p->options;
    906 	BEV_UNLOCK(bev);
    907 	return options;
    908 }
    909 
    910 
    911 static void
    912 bufferevent_cancel_all_(struct bufferevent *bev)
    913 {
    914 	union bufferevent_ctrl_data d;
    915 	memset(&d, 0, sizeof(d));
    916 	BEV_LOCK(bev);
    917 	if (bev->be_ops->ctrl)
    918 		bev->be_ops->ctrl(bev, BEV_CTRL_CANCEL_ALL, &d);
    919 	BEV_UNLOCK(bev);
    920 }
    921 
    922 short
    923 bufferevent_get_enabled(struct bufferevent *bufev)
    924 {
    925 	short r;
    926 	BEV_LOCK(bufev);
    927 	r = bufev->enabled;
    928 	BEV_UNLOCK(bufev);
    929 	return r;
    930 }
    931 
    932 struct bufferevent *
    933 bufferevent_get_underlying(struct bufferevent *bev)
    934 {
    935 	union bufferevent_ctrl_data d;
    936 	int res = -1;
    937 	d.ptr = NULL;
    938 	BEV_LOCK(bev);
    939 	if (bev->be_ops->ctrl)
    940 		res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_UNDERLYING, &d);
    941 	BEV_UNLOCK(bev);
    942 	return (res<0) ? NULL : d.ptr;
    943 }
    944 
    945 static void
    946 bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx)
    947 {
    948 	struct bufferevent *bev = ctx;
    949 	bufferevent_incref_and_lock_(bev);
    950 	bufferevent_disable(bev, EV_READ);
    951 	bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING, 0);
    952 	bufferevent_decref_and_unlock_(bev);
    953 }
    954 static void
    955 bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
    956 {
    957 	struct bufferevent *bev = ctx;
    958 	bufferevent_incref_and_lock_(bev);
    959 	bufferevent_disable(bev, EV_WRITE);
    960 	bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING, 0);
    961 	bufferevent_decref_and_unlock_(bev);
    962 }
    963 
    964 void
    965 bufferevent_init_generic_timeout_cbs_(struct bufferevent *bev)
    966 {
    967 	event_assign(&bev->ev_read, bev->ev_base, -1, EV_FINALIZE,
    968 	    bufferevent_generic_read_timeout_cb, bev);
    969 	event_assign(&bev->ev_write, bev->ev_base, -1, EV_FINALIZE,
    970 	    bufferevent_generic_write_timeout_cb, bev);
    971 }
    972 
    973 int
    974 bufferevent_generic_adj_timeouts_(struct bufferevent *bev)
    975 {
    976 	const short enabled = bev->enabled;
    977 	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
    978 	int r1=0, r2=0;
    979 	if ((enabled & EV_READ) && !bev_p->read_suspended &&
    980 	    evutil_timerisset(&bev->timeout_read))
    981 		r1 = event_add(&bev->ev_read, &bev->timeout_read);
    982 	else
    983 		r1 = event_del(&bev->ev_read);
    984 
    985 	if ((enabled & EV_WRITE) && !bev_p->write_suspended &&
    986 	    evutil_timerisset(&bev->timeout_write) &&
    987 	    evbuffer_get_length(bev->output))
    988 		r2 = event_add(&bev->ev_write, &bev->timeout_write);
    989 	else
    990 		r2 = event_del(&bev->ev_write);
    991 	if (r1 < 0 || r2 < 0)
    992 		return -1;
    993 	return 0;
    994 }
    995 
    996 int
    997 bufferevent_generic_adj_existing_timeouts_(struct bufferevent *bev)
    998 {
    999 	int r = 0;
   1000 	if (event_pending(&bev->ev_read, EV_READ, NULL)) {
   1001 		if (evutil_timerisset(&bev->timeout_read)) {
   1002 			    if (bufferevent_add_event_(&bev->ev_read, &bev->timeout_read) < 0)
   1003 				    r = -1;
   1004 		} else {
   1005 			event_remove_timer(&bev->ev_read);
   1006 		}
   1007 	}
   1008 	if (event_pending(&bev->ev_write, EV_WRITE, NULL)) {
   1009 		if (evutil_timerisset(&bev->timeout_write)) {
   1010 			if (bufferevent_add_event_(&bev->ev_write, &bev->timeout_write) < 0)
   1011 				r = -1;
   1012 		} else {
   1013 			event_remove_timer(&bev->ev_write);
   1014 		}
   1015 	}
   1016 	return r;
   1017 }
   1018 
   1019 int
   1020 bufferevent_add_event_(struct event *ev, const struct timeval *tv)
   1021 {
   1022 	if (!evutil_timerisset(tv))
   1023 		return event_add(ev, NULL);
   1024 	else
   1025 		return event_add(ev, tv);
   1026 }
   1027 
/* For use by user programs only; internally, we should be calling
   either bufferevent_incref_and_lock_(), or BEV_LOCK. */
void
bufferevent_lock(struct bufferevent *bev)
{
	/* Also takes a reference so the object survives while held. */
	bufferevent_incref_and_lock_(bev);
}
   1035 
/* Counterpart of bufferevent_lock(): drops the reference it took and
 * releases the lock. */
void
bufferevent_unlock(struct bufferevent *bev)
{
	bufferevent_decref_and_unlock_(bev);
}
   1041