Home | History | Annotate | Line # | Download | only in libevent
bufferevent_ratelim.c revision 1.5.8.1
      1 /*	$NetBSD: bufferevent_ratelim.c,v 1.5.8.1 2025/08/02 05:22:50 perseant Exp $	*/
      2 
      3 /*
      4  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
      5  * Copyright (c) 2002-2006 Niels Provos <provos (at) citi.umich.edu>
      6  * All rights reserved.
      7  *
      8  * Redistribution and use in source and binary forms, with or without
      9  * modification, are permitted provided that the following conditions
     10  * are met:
     11  * 1. Redistributions of source code must retain the above copyright
     12  *    notice, this list of conditions and the following disclaimer.
     13  * 2. Redistributions in binary form must reproduce the above copyright
     14  *    notice, this list of conditions and the following disclaimer in the
     15  *    documentation and/or other materials provided with the distribution.
     16  * 3. The name of the author may not be used to endorse or promote products
     17  *    derived from this software without specific prior written permission.
     18  *
     19  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     20  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     21  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     22  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     23  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     24  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     28  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29  */
     30 #include "evconfig-private.h"
     31 
     32 #include <sys/types.h>
     33 #include <limits.h>
     34 #include <string.h>
     35 #include <stdlib.h>
     36 
     37 #include "event2/event.h"
     38 #include "event2/event_struct.h"
     39 #include "event2/util.h"
     40 #include "event2/bufferevent.h"
     41 #include "event2/bufferevent_struct.h"
     42 #include "event2/buffer.h"
     43 
     44 #include "ratelim-internal.h"
     45 
     46 #include "bufferevent-internal.h"
     47 #include "mm-internal.h"
     48 #include "util-internal.h"
     49 #include "event-internal.h"
     50 
     51 int
     52 ev_token_bucket_init_(struct ev_token_bucket *bucket,
     53     const struct ev_token_bucket_cfg *cfg,
     54     ev_uint32_t current_tick,
     55     int reinitialize)
     56 {
     57 	if (reinitialize) {
     58 		/* on reinitialization, we only clip downwards, since we've
     59 		   already used who-knows-how-much bandwidth this tick.  We
     60 		   leave "last_updated" as it is; the next update will add the
     61 		   appropriate amount of bandwidth to the bucket.
     62 		*/
     63 		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
     64 			bucket->read_limit = cfg->read_maximum;
     65 		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
     66 			bucket->write_limit = cfg->write_maximum;
     67 	} else {
     68 		bucket->read_limit = cfg->read_rate;
     69 		bucket->write_limit = cfg->write_rate;
     70 		bucket->last_updated = current_tick;
     71 	}
     72 	return 0;
     73 }
     74 
     75 int
     76 ev_token_bucket_update_(struct ev_token_bucket *bucket,
     77     const struct ev_token_bucket_cfg *cfg,
     78     ev_uint32_t current_tick)
     79 {
     80 	/* It's okay if the tick number overflows, since we'll just
     81 	 * wrap around when we do the unsigned substraction. */
     82 	unsigned n_ticks = current_tick - bucket->last_updated;
     83 
     84 	/* Make sure some ticks actually happened, and that time didn't
     85 	 * roll back. */
     86 	if (n_ticks == 0 || n_ticks > INT_MAX)
     87 		return 0;
     88 
     89 	/* Naively, we would say
     90 		bucket->limit += n_ticks * cfg->rate;
     91 
     92 		if (bucket->limit > cfg->maximum)
     93 			bucket->limit = cfg->maximum;
     94 
     95 	   But we're worried about overflow, so we do it like this:
     96 	*/
     97 
     98 	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
     99 		bucket->read_limit = cfg->read_maximum;
    100 	else
    101 		bucket->read_limit += n_ticks * cfg->read_rate;
    102 
    103 
    104 	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
    105 		bucket->write_limit = cfg->write_maximum;
    106 	else
    107 		bucket->write_limit += n_ticks * cfg->write_rate;
    108 
    109 
    110 	bucket->last_updated = current_tick;
    111 
    112 	return 1;
    113 }
    114 
    115 static inline void
    116 bufferevent_update_buckets(struct bufferevent_private *bev)
    117 {
    118 	/* Must hold lock on bev. */
    119 	struct timeval now;
    120 	unsigned tick;
    121 	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
    122 	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
    123 	if (tick != bev->rate_limiting->limit.last_updated)
    124 		ev_token_bucket_update_(&bev->rate_limiting->limit,
    125 		    bev->rate_limiting->cfg, tick);
    126 }
    127 
    128 ev_uint32_t
    129 ev_token_bucket_get_tick_(const struct timeval *tv,
    130     const struct ev_token_bucket_cfg *cfg)
    131 {
    132 	/* This computation uses two multiplies and a divide.  We could do
    133 	 * fewer if we knew that the tick length was an integer number of
    134 	 * seconds, or if we knew it divided evenly into a second.  We should
    135 	 * investigate that more.
    136 	 */
    137 
    138 	/* We cast to an ev_uint64_t first, since we don't want to overflow
    139 	 * before we do the final divide. */
    140 	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
    141 	return (unsigned)(msec / cfg->msec_per_tick);
    142 }
    143 
    144 struct ev_token_bucket_cfg *
    145 ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    146     size_t write_rate, size_t write_burst,
    147     const struct timeval *tick_len)
    148 {
    149 	struct ev_token_bucket_cfg *r;
    150 	struct timeval g;
    151 	if (! tick_len) {
    152 		g.tv_sec = 1;
    153 		g.tv_usec = 0;
    154 		tick_len = &g;
    155 	}
    156 	if (read_rate > read_burst || write_rate > write_burst ||
    157 	    read_rate < 1 || write_rate < 1)
    158 		return NULL;
    159 	if (read_rate > EV_RATE_LIMIT_MAX ||
    160 	    write_rate > EV_RATE_LIMIT_MAX ||
    161 	    read_burst > EV_RATE_LIMIT_MAX ||
    162 	    write_burst > EV_RATE_LIMIT_MAX)
    163 		return NULL;
    164 	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
    165 	if (!r)
    166 		return NULL;
    167 	r->read_rate = read_rate;
    168 	r->write_rate = write_rate;
    169 	r->read_maximum = read_burst;
    170 	r->write_maximum = write_burst;
    171 	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
    172 	r->msec_per_tick = (tick_len->tv_sec * 1000) +
    173 	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
    174 	return r;
    175 }
    176 
/*
 * Release a configuration returned by ev_token_bucket_cfg_new().
 *
 * bufferevent_set_rate_limit() stores the cfg pointer without copying it.
 * The caller must therefore ensure that no bufferevent still uses 'cfg'.
 */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
    182 
/* Default per-operation caps for the max_single_read / max_single_write
 * fields.  The setters below fall back to these when asked for 0, or for
 * a size larger than EV_SSIZE_MAX. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

/* Acquire / release the lock of a rate-limit group. */
#define LOCK_GROUP(grp) EVLOCK_LOCK((grp)->lock, 0)
#define UNLOCK_GROUP(grp) EVLOCK_UNLOCK((grp)->lock, 0)

/* Group-wide suspend / unsuspend helpers, defined further down. */
static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
    194 
    195 /** Helper: figure out the maximum amount we should write if is_write, or
    196     the maximum amount we should read if is_read.  Return that maximum, or
    197     0 if our bucket is wholly exhausted.
    198  */
    199 static inline ev_ssize_t
    200 bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
    201 {
    202 	/* needs lock on bev. */
    203 	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
    204 
    205 #define LIM(x)						\
    206 	(is_write ? (x).write_limit : (x).read_limit)
    207 
    208 #define GROUP_SUSPENDED(g)			\
    209 	(is_write ? (g)->write_suspended : (g)->read_suspended)
    210 
    211 	/* Sets max_so_far to MIN(x, max_so_far) */
    212 #define CLAMPTO(x)				\
    213 	do {					\
    214 		if (max_so_far > (x))		\
    215 			max_so_far = (x);	\
    216 	} while (0);
    217 
    218 	if (!bev->rate_limiting)
    219 		return max_so_far;
    220 
    221 	/* If rate-limiting is enabled at all, update the appropriate
    222 	   bucket, and take the smaller of our rate limit and the group
    223 	   rate limit.
    224 	 */
    225 
    226 	if (bev->rate_limiting->cfg) {
    227 		bufferevent_update_buckets(bev);
    228 		max_so_far = LIM(bev->rate_limiting->limit);
    229 	}
    230 	if (bev->rate_limiting->group) {
    231 		struct bufferevent_rate_limit_group *g =
    232 		    bev->rate_limiting->group;
    233 		ev_ssize_t share;
    234 		LOCK_GROUP(g);
    235 		if (GROUP_SUSPENDED(g)) {
    236 			/* We can get here if we failed to lock this
    237 			 * particular bufferevent while suspending the whole
    238 			 * group. */
    239 			if (is_write)
    240 				bufferevent_suspend_write_(&bev->bev,
    241 				    BEV_SUSPEND_BW_GROUP);
    242 			else
    243 				bufferevent_suspend_read_(&bev->bev,
    244 				    BEV_SUSPEND_BW_GROUP);
    245 			share = 0;
    246 		} else {
    247 			/* XXXX probably we should divide among the active
    248 			 * members, not the total members. */
    249 			share = LIM(g->rate_limit) / g->n_members;
    250 			if (share < g->min_share)
    251 				share = g->min_share;
    252 		}
    253 		UNLOCK_GROUP(g);
    254 		CLAMPTO(share);
    255 	}
    256 
    257 	if (max_so_far < 0)
    258 		max_so_far = 0;
    259 	return max_so_far;
    260 }
    261 
    262 ev_ssize_t
    263 bufferevent_get_read_max_(struct bufferevent_private *bev)
    264 {
    265 	return bufferevent_get_rlim_max_(bev, 0);
    266 }
    267 
    268 ev_ssize_t
    269 bufferevent_get_write_max_(struct bufferevent_private *bev)
    270 {
    271 	return bufferevent_get_rlim_max_(bev, 1);
    272 }
    273 
    274 int
    275 bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
    276 {
    277 	/* XXXXX Make sure all users of this function check its return value */
    278 	int r = 0;
    279 	/* need to hold lock on bev */
    280 	if (!bev->rate_limiting)
    281 		return 0;
    282 
    283 	if (bev->rate_limiting->cfg) {
    284 		bev->rate_limiting->limit.read_limit -= bytes;
    285 		if (bev->rate_limiting->limit.read_limit <= 0) {
    286 			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
    287 			if (event_add(&bev->rate_limiting->refill_bucket_event,
    288 				&bev->rate_limiting->cfg->tick_timeout) < 0)
    289 				r = -1;
    290 		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
    291 			if (!(bev->write_suspended & BEV_SUSPEND_BW))
    292 				event_del(&bev->rate_limiting->refill_bucket_event);
    293 			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
    294 		}
    295 	}
    296 
    297 	if (bev->rate_limiting->group) {
    298 		LOCK_GROUP(bev->rate_limiting->group);
    299 		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
    300 		bev->rate_limiting->group->total_read += bytes;
    301 		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
    302 			bev_group_suspend_reading_(bev->rate_limiting->group);
    303 		} else if (bev->rate_limiting->group->read_suspended) {
    304 			bev_group_unsuspend_reading_(bev->rate_limiting->group);
    305 		}
    306 		UNLOCK_GROUP(bev->rate_limiting->group);
    307 	}
    308 
    309 	return r;
    310 }
    311 
    312 int
    313 bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
    314 {
    315 	/* XXXXX Make sure all users of this function check its return value */
    316 	int r = 0;
    317 	/* need to hold lock */
    318 	if (!bev->rate_limiting)
    319 		return 0;
    320 
    321 	if (bev->rate_limiting->cfg) {
    322 		bev->rate_limiting->limit.write_limit -= bytes;
    323 		if (bev->rate_limiting->limit.write_limit <= 0) {
    324 			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
    325 			if (event_add(&bev->rate_limiting->refill_bucket_event,
    326 				&bev->rate_limiting->cfg->tick_timeout) < 0)
    327 				r = -1;
    328 		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
    329 			if (!(bev->read_suspended & BEV_SUSPEND_BW))
    330 				event_del(&bev->rate_limiting->refill_bucket_event);
    331 			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
    332 		}
    333 	}
    334 
    335 	if (bev->rate_limiting->group) {
    336 		LOCK_GROUP(bev->rate_limiting->group);
    337 		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
    338 		bev->rate_limiting->group->total_written += bytes;
    339 		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
    340 			bev_group_suspend_writing_(bev->rate_limiting->group);
    341 		} else if (bev->rate_limiting->group->write_suspended) {
    342 			bev_group_unsuspend_writing_(bev->rate_limiting->group);
    343 		}
    344 		UNLOCK_GROUP(bev->rate_limiting->group);
    345 	}
    346 
    347 	return r;
    348 }
    349 
    350 /** Stop reading on every bufferevent in <b>g</b> */
    351 static int
    352 bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
    353 {
    354 	/* Needs group lock */
    355 	struct bufferevent_private *bev;
    356 	g->read_suspended = 1;
    357 	g->pending_unsuspend_read = 0;
    358 
    359 	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
    360 	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
    361 	   the bufferevent locks.  If we are unable to lock any individual
    362 	   bufferevent, it will find out later when it looks at its limit
    363 	   and sees that its group is suspended.)
    364 	*/
    365 	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
    366 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
    367 			bufferevent_suspend_read_(&bev->bev,
    368 			    BEV_SUSPEND_BW_GROUP);
    369 			EVLOCK_UNLOCK(bev->lock, 0);
    370 		}
    371 	}
    372 	return 0;
    373 }
    374 
    375 /** Stop writing on every bufferevent in <b>g</b> */
    376 static int
    377 bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
    378 {
    379 	/* Needs group lock */
    380 	struct bufferevent_private *bev;
    381 	g->write_suspended = 1;
    382 	g->pending_unsuspend_write = 0;
    383 	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
    384 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
    385 			bufferevent_suspend_write_(&bev->bev,
    386 			    BEV_SUSPEND_BW_GROUP);
    387 			EVLOCK_UNLOCK(bev->lock, 0);
    388 		}
    389 	}
    390 	return 0;
    391 }
    392 
    393 /** Timer callback invoked on a single bufferevent with one or more exhausted
    394     buckets when they are ready to refill. */
    395 static void
    396 bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
    397 {
    398 	unsigned tick;
    399 	struct timeval now;
    400 	struct bufferevent_private *bev = arg;
    401 	int again = 0;
    402 	BEV_LOCK(&bev->bev);
    403 	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
    404 		BEV_UNLOCK(&bev->bev);
    405 		return;
    406 	}
    407 
    408 	/* First, update the bucket */
    409 	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
    410 	tick = ev_token_bucket_get_tick_(&now,
    411 	    bev->rate_limiting->cfg);
    412 	ev_token_bucket_update_(&bev->rate_limiting->limit,
    413 	    bev->rate_limiting->cfg,
    414 	    tick);
    415 
    416 	/* Now unsuspend any read/write operations as appropriate. */
    417 	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
    418 		if (bev->rate_limiting->limit.read_limit > 0)
    419 			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
    420 		else
    421 			again = 1;
    422 	}
    423 	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
    424 		if (bev->rate_limiting->limit.write_limit > 0)
    425 			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
    426 		else
    427 			again = 1;
    428 	}
    429 	if (again) {
    430 		/* One or more of the buckets may need another refill if they
    431 		   started negative.
    432 
    433 		   XXXX if we need to be quiet for more ticks, we should
    434 		   maybe figure out what timeout we really want.
    435 		*/
    436 		/* XXXX Handle event_add failure somehow */
    437 		event_add(&bev->rate_limiting->refill_bucket_event,
    438 		    &bev->rate_limiting->cfg->tick_timeout);
    439 	}
    440 	BEV_UNLOCK(&bev->bev);
    441 }
    442 
    443 /** Helper: grab a random element from a bufferevent group.
    444  *
    445  * Requires that we hold the lock on the group.
    446  */
    447 static struct bufferevent_private *
    448 bev_group_random_element_(struct bufferevent_rate_limit_group *group)
    449 {
    450 	int which;
    451 	struct bufferevent_private *bev;
    452 
    453 	/* requires group lock */
    454 
    455 	if (!group->n_members)
    456 		return NULL;
    457 
    458 	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
    459 
    460 	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
    461 
    462 	bev = LIST_FIRST(&group->members);
    463 	while (which--)
    464 		bev = LIST_NEXT(bev, rate_limiting->next_in_group);
    465 
    466 	return bev;
    467 }
    468 
/*
 * Visit every member of the rate-limiting group 'g', starting at a random
 * member.  Each member is assigned to the variable 'bev' and 'block' is
 * executed.  The caller must have 'g', 'bev' and 'first' in scope.
 *
 * The first pass runs from the random start to the end of the list.  The
 * second pass wraps around from the head up to, but excluding, the start.
 *
 * This is a half-baked effort to get fairness among group members.
 * XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)					\
	do {								\
		first = bev_group_random_element_(g);			\
		for (bev = first;					\
		     bev != LIST_END(&g->members);			\
		     bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
		for (bev = LIST_FIRST(&g->members);			\
		     bev && bev != first;				\
		     bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
    488 
    489 static void
    490 bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
    491 {
    492 	int again = 0;
    493 	struct bufferevent_private *bev, *first;
    494 
    495 	g->read_suspended = 0;
    496 	FOREACH_RANDOM_ORDER({
    497 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
    498 			bufferevent_unsuspend_read_(&bev->bev,
    499 			    BEV_SUSPEND_BW_GROUP);
    500 			EVLOCK_UNLOCK(bev->lock, 0);
    501 		} else {
    502 			again = 1;
    503 		}
    504 	});
    505 	g->pending_unsuspend_read = again;
    506 }
    507 
    508 static void
    509 bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
    510 {
    511 	int again = 0;
    512 	struct bufferevent_private *bev, *first;
    513 	g->write_suspended = 0;
    514 
    515 	FOREACH_RANDOM_ORDER({
    516 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
    517 			bufferevent_unsuspend_write_(&bev->bev,
    518 			    BEV_SUSPEND_BW_GROUP);
    519 			EVLOCK_UNLOCK(bev->lock, 0);
    520 		} else {
    521 			again = 1;
    522 		}
    523 	});
    524 	g->pending_unsuspend_write = again;
    525 }
    526 
    527 /** Callback invoked every tick to add more elements to the group bucket
    528     and unsuspend group members as needed.
    529  */
    530 static void
    531 bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
    532 {
    533 	struct bufferevent_rate_limit_group *g = arg;
    534 	unsigned tick;
    535 	struct timeval now;
    536 
    537 	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
    538 
    539 	LOCK_GROUP(g);
    540 
    541 	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
    542 	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
    543 
    544 	if (g->pending_unsuspend_read ||
    545 	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
    546 		bev_group_unsuspend_reading_(g);
    547 	}
    548 	if (g->pending_unsuspend_write ||
    549 	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
    550 		bev_group_unsuspend_writing_(g);
    551 	}
    552 
    553 	/* XXXX Rather than waiting to the next tick to unsuspend stuff
    554 	 * with pending_unsuspend_write/read, we should do it on the
    555 	 * next iteration of the mainloop.
    556 	 */
    557 
    558 	UNLOCK_GROUP(g);
    559 }
    560 
    561 int
    562 bufferevent_set_rate_limit(struct bufferevent *bev,
    563     struct ev_token_bucket_cfg *cfg)
    564 {
    565 	struct bufferevent_private *bevp = BEV_UPCAST(bev);
    566 	int r = -1;
    567 	struct bufferevent_rate_limit *rlim;
    568 	struct timeval now;
    569 	ev_uint32_t tick;
    570 	int reinit = 0, suspended = 0;
    571 	/* XXX reference-count cfg */
    572 
    573 	BEV_LOCK(bev);
    574 
    575 	if (cfg == NULL) {
    576 		if (bevp->rate_limiting) {
    577 			rlim = bevp->rate_limiting;
    578 			rlim->cfg = NULL;
    579 			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
    580 			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
    581 			if (event_initialized(&rlim->refill_bucket_event))
    582 				event_del(&rlim->refill_bucket_event);
    583 		}
    584 		r = 0;
    585 		goto done;
    586 	}
    587 
    588 	event_base_gettimeofday_cached(bev->ev_base, &now);
    589 	tick = ev_token_bucket_get_tick_(&now, cfg);
    590 
    591 	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
    592 		/* no-op */
    593 		r = 0;
    594 		goto done;
    595 	}
    596 	if (bevp->rate_limiting == NULL) {
    597 		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
    598 		if (!rlim)
    599 			goto done;
    600 		bevp->rate_limiting = rlim;
    601 	} else {
    602 		rlim = bevp->rate_limiting;
    603 	}
    604 	reinit = rlim->cfg != NULL;
    605 
    606 	rlim->cfg = cfg;
    607 	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
    608 
    609 	if (reinit) {
    610 		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
    611 		event_del(&rlim->refill_bucket_event);
    612 	}
    613 	event_assign(&rlim->refill_bucket_event, bev->ev_base,
    614 	    -1, EV_FINALIZE, bev_refill_callback_, bevp);
    615 
    616 	if (rlim->limit.read_limit > 0) {
    617 		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
    618 	} else {
    619 		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
    620 		suspended=1;
    621 	}
    622 	if (rlim->limit.write_limit > 0) {
    623 		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
    624 	} else {
    625 		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
    626 		suspended = 1;
    627 	}
    628 
    629 	if (suspended)
    630 		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
    631 
    632 	r = 0;
    633 
    634 done:
    635 	BEV_UNLOCK(bev);
    636 	return r;
    637 }
    638 
    639 struct bufferevent_rate_limit_group *
    640 bufferevent_rate_limit_group_new(struct event_base *base,
    641     const struct ev_token_bucket_cfg *cfg)
    642 {
    643 	struct bufferevent_rate_limit_group *g;
    644 	struct timeval now;
    645 	ev_uint32_t tick;
    646 
    647 	event_base_gettimeofday_cached(base, &now);
    648 	tick = ev_token_bucket_get_tick_(&now, cfg);
    649 
    650 	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
    651 	if (!g)
    652 		return NULL;
    653 	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
    654 	LIST_INIT(&g->members);
    655 
    656 	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
    657 
    658 	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
    659 	    bev_group_refill_callback_, g);
    660 	/*XXXX handle event_add failure */
    661 	event_add(&g->master_refill_event, &cfg->tick_timeout);
    662 
    663 	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
    664 
    665 	bufferevent_rate_limit_group_set_min_share(g, 64);
    666 
    667 	evutil_weakrand_seed_(&g->weakrand_seed,
    668 	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
    669 
    670 	return g;
    671 }
    672 
    673 int
    674 bufferevent_rate_limit_group_set_cfg(
    675 	struct bufferevent_rate_limit_group *g,
    676 	const struct ev_token_bucket_cfg *cfg)
    677 {
    678 	int same_tick;
    679 	if (!g || !cfg)
    680 		return -1;
    681 
    682 	LOCK_GROUP(g);
    683 	same_tick = evutil_timercmp(
    684 		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
    685 	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
    686 
    687 	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
    688 		g->rate_limit.read_limit = cfg->read_maximum;
    689 	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
    690 		g->rate_limit.write_limit = cfg->write_maximum;
    691 
    692 	if (!same_tick) {
    693 		/* This can cause a hiccup in the schedule */
    694 		event_add(&g->master_refill_event, &cfg->tick_timeout);
    695 	}
    696 
    697 	/* The new limits might force us to adjust min_share differently. */
    698 	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
    699 
    700 	UNLOCK_GROUP(g);
    701 	return 0;
    702 }
    703 
    704 int
    705 bufferevent_rate_limit_group_set_min_share(
    706 	struct bufferevent_rate_limit_group *g,
    707 	size_t share)
    708 {
    709 	if (share > EV_SSIZE_MAX)
    710 		return -1;
    711 
    712 	g->configured_min_share = share;
    713 
    714 	/* Can't set share to less than the one-tick maximum.  IOW, at steady
    715 	 * state, at least one connection can go per tick. */
    716 	if (share > g->rate_limit_cfg.read_rate)
    717 		share = g->rate_limit_cfg.read_rate;
    718 	if (share > g->rate_limit_cfg.write_rate)
    719 		share = g->rate_limit_cfg.write_rate;
    720 
    721 	g->min_share = share;
    722 	return 0;
    723 }
    724 
    725 void
    726 bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
    727 {
    728 	LOCK_GROUP(g);
    729 	EVUTIL_ASSERT(0 == g->n_members);
    730 	event_del(&g->master_refill_event);
    731 	UNLOCK_GROUP(g);
    732 	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
    733 	mm_free(g);
    734 }
    735 
    736 int
    737 bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    738     struct bufferevent_rate_limit_group *g)
    739 {
    740 	int wsuspend, rsuspend;
    741 	struct bufferevent_private *bevp = BEV_UPCAST(bev);
    742 	BEV_LOCK(bev);
    743 
    744 	if (!bevp->rate_limiting) {
    745 		struct bufferevent_rate_limit *rlim;
    746 		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
    747 		if (!rlim) {
    748 			BEV_UNLOCK(bev);
    749 			return -1;
    750 		}
    751 		event_assign(&rlim->refill_bucket_event, bev->ev_base,
    752 		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
    753 		bevp->rate_limiting = rlim;
    754 	}
    755 
    756 	if (bevp->rate_limiting->group == g) {
    757 		BEV_UNLOCK(bev);
    758 		return 0;
    759 	}
    760 	if (bevp->rate_limiting->group)
    761 		bufferevent_remove_from_rate_limit_group(bev);
    762 
    763 	LOCK_GROUP(g);
    764 	bevp->rate_limiting->group = g;
    765 	++g->n_members;
    766 	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
    767 
    768 	rsuspend = g->read_suspended;
    769 	wsuspend = g->write_suspended;
    770 
    771 	UNLOCK_GROUP(g);
    772 
    773 	if (rsuspend)
    774 		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
    775 	if (wsuspend)
    776 		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
    777 
    778 	BEV_UNLOCK(bev);
    779 	return 0;
    780 }
    781 
/* Public entry point: take bev out of its rate-limit group, if it has one,
 * and lift any BEV_SUSPEND_BW_GROUP suspension.  Always returns 0. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	const int unsuspend = 1;

	return bufferevent_remove_from_rate_limit_group_internal_(bev, unsuspend);
}
    787 
    788 int
    789 bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    790     int unsuspend)
    791 {
    792 	struct bufferevent_private *bevp = BEV_UPCAST(bev);
    793 	BEV_LOCK(bev);
    794 	if (bevp->rate_limiting && bevp->rate_limiting->group) {
    795 		struct bufferevent_rate_limit_group *g =
    796 		    bevp->rate_limiting->group;
    797 		LOCK_GROUP(g);
    798 		bevp->rate_limiting->group = NULL;
    799 		--g->n_members;
    800 		LIST_REMOVE(bevp, rate_limiting->next_in_group);
    801 		UNLOCK_GROUP(g);
    802 	}
    803 	if (unsuspend) {
    804 		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
    805 		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
    806 	}
    807 	BEV_UNLOCK(bev);
    808 	return 0;
    809 }
    810 
    811 /* ===
    812  * API functions to expose rate limits.
    813  *
    814  * Don't use these from inside Libevent; they're meant to be for use by
    815  * the program.
    816  * === */
    817 
    818 /* Mostly you don't want to use this function from inside libevent;
    819  * bufferevent_get_read_max_() is more likely what you want*/
    820 ev_ssize_t
    821 bufferevent_get_read_limit(struct bufferevent *bev)
    822 {
    823 	ev_ssize_t r;
    824 	struct bufferevent_private *bevp;
    825 	BEV_LOCK(bev);
    826 	bevp = BEV_UPCAST(bev);
    827 	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
    828 		bufferevent_update_buckets(bevp);
    829 		r = bevp->rate_limiting->limit.read_limit;
    830 	} else {
    831 		r = EV_SSIZE_MAX;
    832 	}
    833 	BEV_UNLOCK(bev);
    834 	return r;
    835 }
    836 
    837 /* Mostly you don't want to use this function from inside libevent;
    838  * bufferevent_get_write_max_() is more likely what you want*/
    839 ev_ssize_t
    840 bufferevent_get_write_limit(struct bufferevent *bev)
    841 {
    842 	ev_ssize_t r;
    843 	struct bufferevent_private *bevp;
    844 	BEV_LOCK(bev);
    845 	bevp = BEV_UPCAST(bev);
    846 	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
    847 		bufferevent_update_buckets(bevp);
    848 		r = bevp->rate_limiting->limit.write_limit;
    849 	} else {
    850 		r = EV_SSIZE_MAX;
    851 	}
    852 	BEV_UNLOCK(bev);
    853 	return r;
    854 }
    855 
    856 int
    857 bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
    858 {
    859 	struct bufferevent_private *bevp;
    860 	BEV_LOCK(bev);
    861 	bevp = BEV_UPCAST(bev);
    862 	if (size == 0 || size > EV_SSIZE_MAX)
    863 		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
    864 	else
    865 		bevp->max_single_read = size;
    866 	BEV_UNLOCK(bev);
    867 	return 0;
    868 }
    869 
    870 int
    871 bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
    872 {
    873 	struct bufferevent_private *bevp;
    874 	BEV_LOCK(bev);
    875 	bevp = BEV_UPCAST(bev);
    876 	if (size == 0 || size > EV_SSIZE_MAX)
    877 		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
    878 	else
    879 		bevp->max_single_write = size;
    880 	BEV_UNLOCK(bev);
    881 	return 0;
    882 }
    883 
    884 ev_ssize_t
    885 bufferevent_get_max_single_read(struct bufferevent *bev)
    886 {
    887 	ev_ssize_t r;
    888 
    889 	BEV_LOCK(bev);
    890 	r = BEV_UPCAST(bev)->max_single_read;
    891 	BEV_UNLOCK(bev);
    892 	return r;
    893 }
    894 
    895 ev_ssize_t
    896 bufferevent_get_max_single_write(struct bufferevent *bev)
    897 {
    898 	ev_ssize_t r;
    899 
    900 	BEV_LOCK(bev);
    901 	r = BEV_UPCAST(bev)->max_single_write;
    902 	BEV_UNLOCK(bev);
    903 	return r;
    904 }
    905 
    906 ev_ssize_t
    907 bufferevent_get_max_to_read(struct bufferevent *bev)
    908 {
    909 	ev_ssize_t r;
    910 	BEV_LOCK(bev);
    911 	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
    912 	BEV_UNLOCK(bev);
    913 	return r;
    914 }
    915 
    916 ev_ssize_t
    917 bufferevent_get_max_to_write(struct bufferevent *bev)
    918 {
    919 	ev_ssize_t r;
    920 	BEV_LOCK(bev);
    921 	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
    922 	BEV_UNLOCK(bev);
    923 	return r;
    924 }
    925 
    926 const struct ev_token_bucket_cfg *
    927 bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
    928 	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
    929 	struct ev_token_bucket_cfg *cfg;
    930 
    931 	BEV_LOCK(bev);
    932 
    933 	if (bufev_private->rate_limiting) {
    934 		cfg = bufev_private->rate_limiting->cfg;
    935 	} else {
    936 		cfg = NULL;
    937 	}
    938 
    939 	BEV_UNLOCK(bev);
    940 
    941 	return cfg;
    942 }
    943 
    944 /* Mostly you don't want to use this function from inside libevent;
    945  * bufferevent_get_read_max_() is more likely what you want*/
    946 ev_ssize_t
    947 bufferevent_rate_limit_group_get_read_limit(
    948 	struct bufferevent_rate_limit_group *grp)
    949 {
    950 	ev_ssize_t r;
    951 	LOCK_GROUP(grp);
    952 	r = grp->rate_limit.read_limit;
    953 	UNLOCK_GROUP(grp);
    954 	return r;
    955 }
    956 
    957 /* Mostly you don't want to use this function from inside libevent;
    958  * bufferevent_get_write_max_() is more likely what you want. */
    959 ev_ssize_t
    960 bufferevent_rate_limit_group_get_write_limit(
    961 	struct bufferevent_rate_limit_group *grp)
    962 {
    963 	ev_ssize_t r;
    964 	LOCK_GROUP(grp);
    965 	r = grp->rate_limit.write_limit;
    966 	UNLOCK_GROUP(grp);
    967 	return r;
    968 }
    969 
    970 int
    971 bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
    972 {
    973 	int r = 0;
    974 	ev_ssize_t old_limit, new_limit;
    975 	struct bufferevent_private *bevp;
    976 	BEV_LOCK(bev);
    977 	bevp = BEV_UPCAST(bev);
    978 	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
    979 	old_limit = bevp->rate_limiting->limit.read_limit;
    980 
    981 	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
    982 	if (old_limit > 0 && new_limit <= 0) {
    983 		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
    984 		if (event_add(&bevp->rate_limiting->refill_bucket_event,
    985 			&bevp->rate_limiting->cfg->tick_timeout) < 0)
    986 			r = -1;
    987 	} else if (old_limit <= 0 && new_limit > 0) {
    988 		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
    989 			event_del(&bevp->rate_limiting->refill_bucket_event);
    990 		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
    991 	}
    992 
    993 	BEV_UNLOCK(bev);
    994 	return r;
    995 }
    996 
    997 int
    998 bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
    999 {
   1000 	/* XXXX this is mostly copy-and-paste from
   1001 	 * bufferevent_decrement_read_limit */
   1002 	int r = 0;
   1003 	ev_ssize_t old_limit, new_limit;
   1004 	struct bufferevent_private *bevp;
   1005 	BEV_LOCK(bev);
   1006 	bevp = BEV_UPCAST(bev);
   1007 	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
   1008 	old_limit = bevp->rate_limiting->limit.write_limit;
   1009 
   1010 	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
   1011 	if (old_limit > 0 && new_limit <= 0) {
   1012 		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
   1013 		if (event_add(&bevp->rate_limiting->refill_bucket_event,
   1014 			&bevp->rate_limiting->cfg->tick_timeout) < 0)
   1015 			r = -1;
   1016 	} else if (old_limit <= 0 && new_limit > 0) {
   1017 		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
   1018 			event_del(&bevp->rate_limiting->refill_bucket_event);
   1019 		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
   1020 	}
   1021 
   1022 	BEV_UNLOCK(bev);
   1023 	return r;
   1024 }
   1025 
   1026 int
   1027 bufferevent_rate_limit_group_decrement_read(
   1028 	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
   1029 {
   1030 	int r = 0;
   1031 	ev_ssize_t old_limit, new_limit;
   1032 	LOCK_GROUP(grp);
   1033 	old_limit = grp->rate_limit.read_limit;
   1034 	new_limit = (grp->rate_limit.read_limit -= decr);
   1035 
   1036 	if (old_limit > 0 && new_limit <= 0) {
   1037 		bev_group_suspend_reading_(grp);
   1038 	} else if (old_limit <= 0 && new_limit > 0) {
   1039 		bev_group_unsuspend_reading_(grp);
   1040 	}
   1041 
   1042 	UNLOCK_GROUP(grp);
   1043 	return r;
   1044 }
   1045 
   1046 int
   1047 bufferevent_rate_limit_group_decrement_write(
   1048 	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
   1049 {
   1050 	int r = 0;
   1051 	ev_ssize_t old_limit, new_limit;
   1052 	LOCK_GROUP(grp);
   1053 	old_limit = grp->rate_limit.write_limit;
   1054 	new_limit = (grp->rate_limit.write_limit -= decr);
   1055 
   1056 	if (old_limit > 0 && new_limit <= 0) {
   1057 		bev_group_suspend_writing_(grp);
   1058 	} else if (old_limit <= 0 && new_limit > 0) {
   1059 		bev_group_unsuspend_writing_(grp);
   1060 	}
   1061 
   1062 	UNLOCK_GROUP(grp);
   1063 	return r;
   1064 }
   1065 
   1066 void
   1067 bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
   1068     ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
   1069 {
   1070 	EVUTIL_ASSERT(grp != NULL);
   1071 	if (total_read_out)
   1072 		*total_read_out = grp->total_read;
   1073 	if (total_written_out)
   1074 		*total_written_out = grp->total_written;
   1075 }
   1076 
   1077 void
   1078 bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
   1079 {
   1080 	grp->total_read = grp->total_written = 0;
   1081 }
   1082 
   1083 int
   1084 bufferevent_ratelim_init_(struct bufferevent_private *bev)
   1085 {
   1086 	bev->rate_limiting = NULL;
   1087 	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
   1088 	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
   1089 
   1090 	return 0;
   1091 }
   1092