Home | History | Annotate | Line # | Download | only in libevent
bufferevent_ratelim.c revision 1.1.1.1.8.2
      1 /*	$NetBSD: bufferevent_ratelim.c,v 1.1.1.1.8.2 2014/08/19 23:51:45 tls Exp $	*/
      2 
      3 /*
      4  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
      5  * Copyright (c) 2002-2006 Niels Provos <provos (at) citi.umich.edu>
      6  * All rights reserved.
      7  *
      8  * Redistribution and use in source and binary forms, with or without
      9  * modification, are permitted provided that the following conditions
     10  * are met:
     11  * 1. Redistributions of source code must retain the above copyright
     12  *    notice, this list of conditions and the following disclaimer.
     13  * 2. Redistributions in binary form must reproduce the above copyright
     14  *    notice, this list of conditions and the following disclaimer in the
     15  *    documentation and/or other materials provided with the distribution.
     16  * 3. The name of the author may not be used to endorse or promote products
     17  *    derived from this software without specific prior written permission.
     18  *
     19  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     20  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     21  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     22  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     23  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     24  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     28  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29  */
     30 #include "evconfig-private.h"
     31 
     32 #include <sys/types.h>
     33 #include <limits.h>
     34 #include <string.h>
     35 #include <stdlib.h>
     36 
     37 #include "event2/event.h"
     38 #include "event2/event_struct.h"
     39 #include "event2/util.h"
     40 #include "event2/bufferevent.h"
     41 #include "event2/bufferevent_struct.h"
     42 #include "event2/buffer.h"
     43 
     44 #include "ratelim-internal.h"
     45 
     46 #include "bufferevent-internal.h"
     47 #include "mm-internal.h"
     48 #include "util-internal.h"
     49 #include "event-internal.h"
     50 
     51 int
     52 ev_token_bucket_init_(struct ev_token_bucket *bucket,
     53     const struct ev_token_bucket_cfg *cfg,
     54     ev_uint32_t current_tick,
     55     int reinitialize)
     56 {
     57 	if (reinitialize) {
     58 		/* on reinitialization, we only clip downwards, since we've
     59 		   already used who-knows-how-much bandwidth this tick.  We
     60 		   leave "last_updated" as it is; the next update will add the
     61 		   appropriate amount of bandwidth to the bucket.
     62 		*/
     63 		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
     64 			bucket->read_limit = cfg->read_maximum;
     65 		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
     66 			bucket->write_limit = cfg->write_maximum;
     67 	} else {
     68 		bucket->read_limit = cfg->read_rate;
     69 		bucket->write_limit = cfg->write_rate;
     70 		bucket->last_updated = current_tick;
     71 	}
     72 	return 0;
     73 }
     74 
     75 int
     76 ev_token_bucket_update_(struct ev_token_bucket *bucket,
     77     const struct ev_token_bucket_cfg *cfg,
     78     ev_uint32_t current_tick)
     79 {
     80 	/* It's okay if the tick number overflows, since we'll just
     81 	 * wrap around when we do the unsigned substraction. */
     82 	unsigned n_ticks = current_tick - bucket->last_updated;
     83 
     84 	/* Make sure some ticks actually happened, and that time didn't
     85 	 * roll back. */
     86 	if (n_ticks == 0 || n_ticks > INT_MAX)
     87 		return 0;
     88 
     89 	/* Naively, we would say
     90 		bucket->limit += n_ticks * cfg->rate;
     91 
     92 		if (bucket->limit > cfg->maximum)
     93 			bucket->limit = cfg->maximum;
     94 
     95 	   But we're worried about overflow, so we do it like this:
     96 	*/
     97 
     98 	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
     99 		bucket->read_limit = cfg->read_maximum;
    100 	else
    101 		bucket->read_limit += n_ticks * cfg->read_rate;
    102 
    103 
    104 	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
    105 		bucket->write_limit = cfg->write_maximum;
    106 	else
    107 		bucket->write_limit += n_ticks * cfg->write_rate;
    108 
    109 
    110 	bucket->last_updated = current_tick;
    111 
    112 	return 1;
    113 }
    114 
    115 static inline void
    116 bufferevent_update_buckets(struct bufferevent_private *bev)
    117 {
    118 	/* Must hold lock on bev. */
    119 	struct timeval now;
    120 	unsigned tick;
    121 	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
    122 	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
    123 	if (tick != bev->rate_limiting->limit.last_updated)
    124 		ev_token_bucket_update_(&bev->rate_limiting->limit,
    125 		    bev->rate_limiting->cfg, tick);
    126 }
    127 
    128 ev_uint32_t
    129 ev_token_bucket_get_tick_(const struct timeval *tv,
    130     const struct ev_token_bucket_cfg *cfg)
    131 {
    132 	/* This computation uses two multiplies and a divide.  We could do
    133 	 * fewer if we knew that the tick length was an integer number of
    134 	 * seconds, or if we knew it divided evenly into a second.  We should
    135 	 * investigate that more.
    136 	 */
    137 
    138 	/* We cast to an ev_uint64_t first, since we don't want to overflow
    139 	 * before we do the final divide. */
    140 	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
    141 	return (unsigned)(msec / cfg->msec_per_tick);
    142 }
    143 
    144 struct ev_token_bucket_cfg *
    145 ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    146     size_t write_rate, size_t write_burst,
    147     const struct timeval *tick_len)
    148 {
    149 	struct ev_token_bucket_cfg *r;
    150 	struct timeval g;
    151 	if (! tick_len) {
    152 		g.tv_sec = 1;
    153 		g.tv_usec = 0;
    154 		tick_len = &g;
    155 	}
    156 	if (read_rate > read_burst || write_rate > write_burst ||
    157 	    read_rate < 1 || write_rate < 1)
    158 		return NULL;
    159 	if (read_rate > EV_RATE_LIMIT_MAX ||
    160 	    write_rate > EV_RATE_LIMIT_MAX ||
    161 	    read_burst > EV_RATE_LIMIT_MAX ||
    162 	    write_burst > EV_RATE_LIMIT_MAX)
    163 		return NULL;
    164 	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
    165 	if (!r)
    166 		return NULL;
    167 	r->read_rate = read_rate;
    168 	r->write_rate = write_rate;
    169 	r->read_maximum = read_burst;
    170 	r->write_maximum = write_burst;
    171 	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
    172 	r->msec_per_tick = (tick_len->tv_sec * 1000) +
    173 	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
    174 	return r;
    175 }
    176 
/*
 * Release a configuration obtained from ev_token_bucket_cfg_new().
 *
 * bufferevent_set_rate_limit() stores the cfg pointer itself rather than a
 * copy, so no bufferevent may still be using this cfg.  Groups copy their
 * cfg, so they are unaffected.
 */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
    182 
/*
 * Fallback per-operation byte caps for max_single_read / max_single_write.
 * They are used, for example, when the caller asks for 0 or for more than
 * EV_SSIZE_MAX.
 */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

/* Acquire / release the lock of a rate-limit group. */
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

/* Group-wide suspend/resume helpers; all require the group lock. */
static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
    194 
    195 /** Helper: figure out the maximum amount we should write if is_write, or
    196     the maximum amount we should read if is_read.  Return that maximum, or
    197     0 if our bucket is wholly exhausted.
    198  */
    199 static inline ev_ssize_t
    200 bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
    201 {
    202 	/* needs lock on bev. */
    203 	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
    204 
    205 #define LIM(x)						\
    206 	(is_write ? (x).write_limit : (x).read_limit)
    207 
    208 #define GROUP_SUSPENDED(g)			\
    209 	(is_write ? (g)->write_suspended : (g)->read_suspended)
    210 
    211 	/* Sets max_so_far to MIN(x, max_so_far) */
    212 #define CLAMPTO(x)				\
    213 	do {					\
    214 		if (max_so_far > (x))		\
    215 			max_so_far = (x);	\
    216 	} while (0);
    217 
    218 	if (!bev->rate_limiting)
    219 		return max_so_far;
    220 
    221 	/* If rate-limiting is enabled at all, update the appropriate
    222 	   bucket, and take the smaller of our rate limit and the group
    223 	   rate limit.
    224 	 */
    225 
    226 	if (bev->rate_limiting->cfg) {
    227 		bufferevent_update_buckets(bev);
    228 		max_so_far = LIM(bev->rate_limiting->limit);
    229 	}
    230 	if (bev->rate_limiting->group) {
    231 		struct bufferevent_rate_limit_group *g =
    232 		    bev->rate_limiting->group;
    233 		ev_ssize_t share;
    234 		LOCK_GROUP(g);
    235 		if (GROUP_SUSPENDED(g)) {
    236 			/* We can get here if we failed to lock this
    237 			 * particular bufferevent while suspending the whole
    238 			 * group. */
    239 			if (is_write)
    240 				bufferevent_suspend_write_(&bev->bev,
    241 				    BEV_SUSPEND_BW_GROUP);
    242 			else
    243 				bufferevent_suspend_read_(&bev->bev,
    244 				    BEV_SUSPEND_BW_GROUP);
    245 			share = 0;
    246 		} else {
    247 			/* XXXX probably we should divide among the active
    248 			 * members, not the total members. */
    249 			share = LIM(g->rate_limit) / g->n_members;
    250 			if (share < g->min_share)
    251 				share = g->min_share;
    252 		}
    253 		UNLOCK_GROUP(g);
    254 		CLAMPTO(share);
    255 	}
    256 
    257 	if (max_so_far < 0)
    258 		max_so_far = 0;
    259 	return max_so_far;
    260 }
    261 
    262 ev_ssize_t
    263 bufferevent_get_read_max_(struct bufferevent_private *bev)
    264 {
    265 	return bufferevent_get_rlim_max_(bev, 0);
    266 }
    267 
    268 ev_ssize_t
    269 bufferevent_get_write_max_(struct bufferevent_private *bev)
    270 {
    271 	return bufferevent_get_rlim_max_(bev, 1);
    272 }
    273 
    274 int
    275 bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
    276 {
    277 	/* XXXXX Make sure all users of this function check its return value */
    278 	int r = 0;
    279 	/* need to hold lock on bev */
    280 	if (!bev->rate_limiting)
    281 		return 0;
    282 
    283 	if (bev->rate_limiting->cfg) {
    284 		bev->rate_limiting->limit.read_limit -= bytes;
    285 		if (bev->rate_limiting->limit.read_limit <= 0) {
    286 			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
    287 			if (event_add(&bev->rate_limiting->refill_bucket_event,
    288 				&bev->rate_limiting->cfg->tick_timeout) < 0)
    289 				r = -1;
    290 		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
    291 			if (!(bev->write_suspended & BEV_SUSPEND_BW))
    292 				event_del(&bev->rate_limiting->refill_bucket_event);
    293 			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
    294 		}
    295 	}
    296 
    297 	if (bev->rate_limiting->group) {
    298 		LOCK_GROUP(bev->rate_limiting->group);
    299 		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
    300 		bev->rate_limiting->group->total_read += bytes;
    301 		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
    302 			bev_group_suspend_reading_(bev->rate_limiting->group);
    303 		} else if (bev->rate_limiting->group->read_suspended) {
    304 			bev_group_unsuspend_reading_(bev->rate_limiting->group);
    305 		}
    306 		UNLOCK_GROUP(bev->rate_limiting->group);
    307 	}
    308 
    309 	return r;
    310 }
    311 
    312 int
    313 bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
    314 {
    315 	/* XXXXX Make sure all users of this function check its return value */
    316 	int r = 0;
    317 	/* need to hold lock */
    318 	if (!bev->rate_limiting)
    319 		return 0;
    320 
    321 	if (bev->rate_limiting->cfg) {
    322 		bev->rate_limiting->limit.write_limit -= bytes;
    323 		if (bev->rate_limiting->limit.write_limit <= 0) {
    324 			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
    325 			if (event_add(&bev->rate_limiting->refill_bucket_event,
    326 				&bev->rate_limiting->cfg->tick_timeout) < 0)
    327 				r = -1;
    328 		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
    329 			if (!(bev->read_suspended & BEV_SUSPEND_BW))
    330 				event_del(&bev->rate_limiting->refill_bucket_event);
    331 			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
    332 		}
    333 	}
    334 
    335 	if (bev->rate_limiting->group) {
    336 		LOCK_GROUP(bev->rate_limiting->group);
    337 		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
    338 		bev->rate_limiting->group->total_written += bytes;
    339 		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
    340 			bev_group_suspend_writing_(bev->rate_limiting->group);
    341 		} else if (bev->rate_limiting->group->write_suspended) {
    342 			bev_group_unsuspend_writing_(bev->rate_limiting->group);
    343 		}
    344 		UNLOCK_GROUP(bev->rate_limiting->group);
    345 	}
    346 
    347 	return r;
    348 }
    349 
    350 /** Stop reading on every bufferevent in <b>g</b> */
    351 static int
    352 bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
    353 {
    354 	/* Needs group lock */
    355 	struct bufferevent_private *bev;
    356 	g->read_suspended = 1;
    357 	g->pending_unsuspend_read = 0;
    358 
    359 	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
    360 	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
    361 	   the bufferevent locks.  If we are unable to lock any individual
    362 	   bufferevent, it will find out later when it looks at its limit
    363 	   and sees that its group is suspended.)
    364 	*/
    365 	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
    366 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
    367 			bufferevent_suspend_read_(&bev->bev,
    368 			    BEV_SUSPEND_BW_GROUP);
    369 			EVLOCK_UNLOCK(bev->lock, 0);
    370 		}
    371 	}
    372 	return 0;
    373 }
    374 
    375 /** Stop writing on every bufferevent in <b>g</b> */
    376 static int
    377 bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
    378 {
    379 	/* Needs group lock */
    380 	struct bufferevent_private *bev;
    381 	g->write_suspended = 1;
    382 	g->pending_unsuspend_write = 0;
    383 	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
    384 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
    385 			bufferevent_suspend_write_(&bev->bev,
    386 			    BEV_SUSPEND_BW_GROUP);
    387 			EVLOCK_UNLOCK(bev->lock, 0);
    388 		}
    389 	}
    390 	return 0;
    391 }
    392 
    393 /** Timer callback invoked on a single bufferevent with one or more exhausted
    394     buckets when they are ready to refill. */
    395 static void
    396 bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
    397 {
    398 	unsigned tick;
    399 	struct timeval now;
    400 	struct bufferevent_private *bev = arg;
    401 	int again = 0;
    402 	BEV_LOCK(&bev->bev);
    403 	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
    404 		BEV_UNLOCK(&bev->bev);
    405 		return;
    406 	}
    407 
    408 	/* First, update the bucket */
    409 	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
    410 	tick = ev_token_bucket_get_tick_(&now,
    411 	    bev->rate_limiting->cfg);
    412 	ev_token_bucket_update_(&bev->rate_limiting->limit,
    413 	    bev->rate_limiting->cfg,
    414 	    tick);
    415 
    416 	/* Now unsuspend any read/write operations as appropriate. */
    417 	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
    418 		if (bev->rate_limiting->limit.read_limit > 0)
    419 			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
    420 		else
    421 			again = 1;
    422 	}
    423 	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
    424 		if (bev->rate_limiting->limit.write_limit > 0)
    425 			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
    426 		else
    427 			again = 1;
    428 	}
    429 	if (again) {
    430 		/* One or more of the buckets may need another refill if they
    431 		   started negative.
    432 
    433 		   XXXX if we need to be quiet for more ticks, we should
    434 		   maybe figure out what timeout we really want.
    435 		*/
    436 		/* XXXX Handle event_add failure somehow */
    437 		event_add(&bev->rate_limiting->refill_bucket_event,
    438 		    &bev->rate_limiting->cfg->tick_timeout);
    439 	}
    440 	BEV_UNLOCK(&bev->bev);
    441 }
    442 
    443 /** Helper: grab a random element from a bufferevent group.
    444  *
    445  * Requires that we hold the lock on the group.
    446  */
    447 static struct bufferevent_private *
    448 bev_group_random_element_(struct bufferevent_rate_limit_group *group)
    449 {
    450 	int which;
    451 	struct bufferevent_private *bev;
    452 
    453 	/* requires group lock */
    454 
    455 	if (!group->n_members)
    456 		return NULL;
    457 
    458 	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
    459 
    460 	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
    461 
    462 	bev = LIST_FIRST(&group->members);
    463 	while (which--)
    464 		bev = LIST_NEXT(bev, rate_limiting->next_in_group);
    465 
    466 	return bev;
    467 }
    468 
/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    The caller's scope must provide 'g' (the group) and two
    struct bufferevent_private * variables named 'bev' and 'first'.  The
    first pass visits from the random start to the end of the list; the
    second wraps around and visits from the head up to, but not including,
    the start.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)					\
	do {								\
		first = bev_group_random_element_(g);			\
		for (bev = first;					\
		     bev != LIST_END(&g->members);			\
		     bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block;						\
		}							\
		for (bev = LIST_FIRST(&g->members);			\
		     bev != NULL && bev != first;			\
		     bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block;						\
		}							\
	} while (0)
    488 
    489 static void
    490 bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
    491 {
    492 	int again = 0;
    493 	struct bufferevent_private *bev, *first;
    494 
    495 	g->read_suspended = 0;
    496 	FOREACH_RANDOM_ORDER({
    497 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
    498 			bufferevent_unsuspend_read_(&bev->bev,
    499 			    BEV_SUSPEND_BW_GROUP);
    500 			EVLOCK_UNLOCK(bev->lock, 0);
    501 		} else {
    502 			again = 1;
    503 		}
    504 	});
    505 	g->pending_unsuspend_read = again;
    506 }
    507 
    508 static void
    509 bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
    510 {
    511 	int again = 0;
    512 	struct bufferevent_private *bev, *first;
    513 	g->write_suspended = 0;
    514 
    515 	FOREACH_RANDOM_ORDER({
    516 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
    517 			bufferevent_unsuspend_write_(&bev->bev,
    518 			    BEV_SUSPEND_BW_GROUP);
    519 			EVLOCK_UNLOCK(bev->lock, 0);
    520 		} else {
    521 			again = 1;
    522 		}
    523 	});
    524 	g->pending_unsuspend_write = again;
    525 }
    526 
    527 /** Callback invoked every tick to add more elements to the group bucket
    528     and unsuspend group members as needed.
    529  */
    530 static void
    531 bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
    532 {
    533 	struct bufferevent_rate_limit_group *g = arg;
    534 	unsigned tick;
    535 	struct timeval now;
    536 
    537 	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
    538 
    539 	LOCK_GROUP(g);
    540 
    541 	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
    542 	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
    543 
    544 	if (g->pending_unsuspend_read ||
    545 	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
    546 		bev_group_unsuspend_reading_(g);
    547 	}
    548 	if (g->pending_unsuspend_write ||
    549 	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
    550 		bev_group_unsuspend_writing_(g);
    551 	}
    552 
    553 	/* XXXX Rather than waiting to the next tick to unsuspend stuff
    554 	 * with pending_unsuspend_write/read, we should do it on the
    555 	 * next iteration of the mainloop.
    556 	 */
    557 
    558 	UNLOCK_GROUP(g);
    559 }
    560 
    561 int
    562 bufferevent_set_rate_limit(struct bufferevent *bev,
    563     struct ev_token_bucket_cfg *cfg)
    564 {
    565 	struct bufferevent_private *bevp =
    566 	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
    567 	int r = -1;
    568 	struct bufferevent_rate_limit *rlim;
    569 	struct timeval now;
    570 	ev_uint32_t tick;
    571 	int reinit = 0, suspended = 0;
    572 	/* XXX reference-count cfg */
    573 
    574 	BEV_LOCK(bev);
    575 
    576 	if (cfg == NULL) {
    577 		if (bevp->rate_limiting) {
    578 			rlim = bevp->rate_limiting;
    579 			rlim->cfg = NULL;
    580 			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
    581 			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
    582 			if (event_initialized(&rlim->refill_bucket_event))
    583 				event_del(&rlim->refill_bucket_event);
    584 		}
    585 		r = 0;
    586 		goto done;
    587 	}
    588 
    589 	event_base_gettimeofday_cached(bev->ev_base, &now);
    590 	tick = ev_token_bucket_get_tick_(&now, cfg);
    591 
    592 	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
    593 		/* no-op */
    594 		r = 0;
    595 		goto done;
    596 	}
    597 	if (bevp->rate_limiting == NULL) {
    598 		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
    599 		if (!rlim)
    600 			goto done;
    601 		bevp->rate_limiting = rlim;
    602 	} else {
    603 		rlim = bevp->rate_limiting;
    604 	}
    605 	reinit = rlim->cfg != NULL;
    606 
    607 	rlim->cfg = cfg;
    608 	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
    609 
    610 	if (reinit) {
    611 		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
    612 		event_del(&rlim->refill_bucket_event);
    613 	}
    614 	evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
    615 	    bev_refill_callback_, bevp);
    616 
    617 	if (rlim->limit.read_limit > 0) {
    618 		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
    619 	} else {
    620 		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
    621 		suspended=1;
    622 	}
    623 	if (rlim->limit.write_limit > 0) {
    624 		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
    625 	} else {
    626 		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
    627 		suspended = 1;
    628 	}
    629 
    630 	if (suspended)
    631 		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
    632 
    633 	r = 0;
    634 
    635 done:
    636 	BEV_UNLOCK(bev);
    637 	return r;
    638 }
    639 
    640 struct bufferevent_rate_limit_group *
    641 bufferevent_rate_limit_group_new(struct event_base *base,
    642     const struct ev_token_bucket_cfg *cfg)
    643 {
    644 	struct bufferevent_rate_limit_group *g;
    645 	struct timeval now;
    646 	ev_uint32_t tick;
    647 
    648 	event_base_gettimeofday_cached(base, &now);
    649 	tick = ev_token_bucket_get_tick_(&now, cfg);
    650 
    651 	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
    652 	if (!g)
    653 		return NULL;
    654 	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
    655 	LIST_INIT(&g->members);
    656 
    657 	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
    658 
    659 	event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
    660 	    bev_group_refill_callback_, g);
    661 	/*XXXX handle event_add failure */
    662 	event_add(&g->master_refill_event, &cfg->tick_timeout);
    663 
    664 	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
    665 
    666 	bufferevent_rate_limit_group_set_min_share(g, 64);
    667 
    668 	evutil_weakrand_seed_(&g->weakrand_seed,
    669 	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
    670 
    671 	return g;
    672 }
    673 
    674 int
    675 bufferevent_rate_limit_group_set_cfg(
    676 	struct bufferevent_rate_limit_group *g,
    677 	const struct ev_token_bucket_cfg *cfg)
    678 {
    679 	int same_tick;
    680 	if (!g || !cfg)
    681 		return -1;
    682 
    683 	LOCK_GROUP(g);
    684 	same_tick = evutil_timercmp(
    685 		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
    686 	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
    687 
    688 	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
    689 		g->rate_limit.read_limit = cfg->read_maximum;
    690 	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
    691 		g->rate_limit.write_limit = cfg->write_maximum;
    692 
    693 	if (!same_tick) {
    694 		/* This can cause a hiccup in the schedule */
    695 		event_add(&g->master_refill_event, &cfg->tick_timeout);
    696 	}
    697 
    698 	/* The new limits might force us to adjust min_share differently. */
    699 	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
    700 
    701 	UNLOCK_GROUP(g);
    702 	return 0;
    703 }
    704 
    705 int
    706 bufferevent_rate_limit_group_set_min_share(
    707 	struct bufferevent_rate_limit_group *g,
    708 	size_t share)
    709 {
    710 	if (share > EV_SSIZE_MAX)
    711 		return -1;
    712 
    713 	g->configured_min_share = share;
    714 
    715 	/* Can't set share to less than the one-tick maximum.  IOW, at steady
    716 	 * state, at least one connection can go per tick. */
    717 	if (share > g->rate_limit_cfg.read_rate)
    718 		share = g->rate_limit_cfg.read_rate;
    719 	if (share > g->rate_limit_cfg.write_rate)
    720 		share = g->rate_limit_cfg.write_rate;
    721 
    722 	g->min_share = share;
    723 	return 0;
    724 }
    725 
    726 void
    727 bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
    728 {
    729 	LOCK_GROUP(g);
    730 	EVUTIL_ASSERT(0 == g->n_members);
    731 	event_del(&g->master_refill_event);
    732 	UNLOCK_GROUP(g);
    733 	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
    734 	mm_free(g);
    735 }
    736 
    737 int
    738 bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    739     struct bufferevent_rate_limit_group *g)
    740 {
    741 	int wsuspend, rsuspend;
    742 	struct bufferevent_private *bevp =
    743 	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
    744 	BEV_LOCK(bev);
    745 
    746 	if (!bevp->rate_limiting) {
    747 		struct bufferevent_rate_limit *rlim;
    748 		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
    749 		if (!rlim) {
    750 			BEV_UNLOCK(bev);
    751 			return -1;
    752 		}
    753 		evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
    754 		    bev_refill_callback_, bevp);
    755 		bevp->rate_limiting = rlim;
    756 	}
    757 
    758 	if (bevp->rate_limiting->group == g) {
    759 		BEV_UNLOCK(bev);
    760 		return 0;
    761 	}
    762 	if (bevp->rate_limiting->group)
    763 		bufferevent_remove_from_rate_limit_group(bev);
    764 
    765 	LOCK_GROUP(g);
    766 	bevp->rate_limiting->group = g;
    767 	++g->n_members;
    768 	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
    769 
    770 	rsuspend = g->read_suspended;
    771 	wsuspend = g->write_suspended;
    772 
    773 	UNLOCK_GROUP(g);
    774 
    775 	if (rsuspend)
    776 		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
    777 	if (wsuspend)
    778 		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
    779 
    780 	BEV_UNLOCK(bev);
    781 	return 0;
    782 }
    783 
/* Remove bev from its rate-limit group (if any) and lift any group-imposed
 * suspension of its reading and writing.  Always returns 0. */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	const int lift_group_suspension = 1;

	return bufferevent_remove_from_rate_limit_group_internal_(bev,
	    lift_group_suspension);
}
    789 
    790 int
    791 bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    792     int unsuspend)
    793 {
    794 	struct bufferevent_private *bevp =
    795 	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
    796 	BEV_LOCK(bev);
    797 	if (bevp->rate_limiting && bevp->rate_limiting->group) {
    798 		struct bufferevent_rate_limit_group *g =
    799 		    bevp->rate_limiting->group;
    800 		LOCK_GROUP(g);
    801 		bevp->rate_limiting->group = NULL;
    802 		--g->n_members;
    803 		LIST_REMOVE(bevp, rate_limiting->next_in_group);
    804 		UNLOCK_GROUP(g);
    805 	}
    806 	if (unsuspend) {
    807 		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
    808 		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
    809 	}
    810 	BEV_UNLOCK(bev);
    811 	return 0;
    812 }
    813 
    814 /* ===
    815  * API functions to expose rate limits.
    816  *
    817  * Don't use these from inside Libevent; they're meant to be for use by
    818  * the program.
    819  * === */
    820 
    821 /* Mostly you don't want to use this function from inside libevent;
    822  * bufferevent_get_read_max_() is more likely what you want*/
    823 ev_ssize_t
    824 bufferevent_get_read_limit(struct bufferevent *bev)
    825 {
    826 	ev_ssize_t r;
    827 	struct bufferevent_private *bevp;
    828 	BEV_LOCK(bev);
    829 	bevp = BEV_UPCAST(bev);
    830 	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
    831 		bufferevent_update_buckets(bevp);
    832 		r = bevp->rate_limiting->limit.read_limit;
    833 	} else {
    834 		r = EV_SSIZE_MAX;
    835 	}
    836 	BEV_UNLOCK(bev);
    837 	return r;
    838 }
    839 
    840 /* Mostly you don't want to use this function from inside libevent;
    841  * bufferevent_get_write_max_() is more likely what you want*/
    842 ev_ssize_t
    843 bufferevent_get_write_limit(struct bufferevent *bev)
    844 {
    845 	ev_ssize_t r;
    846 	struct bufferevent_private *bevp;
    847 	BEV_LOCK(bev);
    848 	bevp = BEV_UPCAST(bev);
    849 	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
    850 		bufferevent_update_buckets(bevp);
    851 		r = bevp->rate_limiting->limit.write_limit;
    852 	} else {
    853 		r = EV_SSIZE_MAX;
    854 	}
    855 	BEV_UNLOCK(bev);
    856 	return r;
    857 }
    858 
    859 int
    860 bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
    861 {
    862 	struct bufferevent_private *bevp;
    863 	BEV_LOCK(bev);
    864 	bevp = BEV_UPCAST(bev);
    865 	if (size == 0 || size > EV_SSIZE_MAX)
    866 		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
    867 	else
    868 		bevp->max_single_read = size;
    869 	BEV_UNLOCK(bev);
    870 	return 0;
    871 }
    872 
    873 int
    874 bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
    875 {
    876 	struct bufferevent_private *bevp;
    877 	BEV_LOCK(bev);
    878 	bevp = BEV_UPCAST(bev);
    879 	if (size == 0 || size > EV_SSIZE_MAX)
    880 		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
    881 	else
    882 		bevp->max_single_write = size;
    883 	BEV_UNLOCK(bev);
    884 	return 0;
    885 }
    886 
    887 ev_ssize_t
    888 bufferevent_get_max_single_read(struct bufferevent *bev)
    889 {
    890 	ev_ssize_t r;
    891 
    892 	BEV_LOCK(bev);
    893 	r = BEV_UPCAST(bev)->max_single_read;
    894 	BEV_UNLOCK(bev);
    895 	return r;
    896 }
    897 
    898 ev_ssize_t
    899 bufferevent_get_max_single_write(struct bufferevent *bev)
    900 {
    901 	ev_ssize_t r;
    902 
    903 	BEV_LOCK(bev);
    904 	r = BEV_UPCAST(bev)->max_single_write;
    905 	BEV_UNLOCK(bev);
    906 	return r;
    907 }
    908 
    909 ev_ssize_t
    910 bufferevent_get_max_to_read(struct bufferevent *bev)
    911 {
    912 	ev_ssize_t r;
    913 	BEV_LOCK(bev);
    914 	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
    915 	BEV_UNLOCK(bev);
    916 	return r;
    917 }
    918 
    919 ev_ssize_t
    920 bufferevent_get_max_to_write(struct bufferevent *bev)
    921 {
    922 	ev_ssize_t r;
    923 	BEV_LOCK(bev);
    924 	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
    925 	BEV_UNLOCK(bev);
    926 	return r;
    927 }
    928 
    929 
    930 /* Mostly you don't want to use this function from inside libevent;
    931  * bufferevent_get_read_max_() is more likely what you want*/
    932 ev_ssize_t
    933 bufferevent_rate_limit_group_get_read_limit(
    934 	struct bufferevent_rate_limit_group *grp)
    935 {
    936 	ev_ssize_t r;
    937 	LOCK_GROUP(grp);
    938 	r = grp->rate_limit.read_limit;
    939 	UNLOCK_GROUP(grp);
    940 	return r;
    941 }
    942 
    943 /* Mostly you don't want to use this function from inside libevent;
    944  * bufferevent_get_write_max_() is more likely what you want. */
    945 ev_ssize_t
    946 bufferevent_rate_limit_group_get_write_limit(
    947 	struct bufferevent_rate_limit_group *grp)
    948 {
    949 	ev_ssize_t r;
    950 	LOCK_GROUP(grp);
    951 	r = grp->rate_limit.write_limit;
    952 	UNLOCK_GROUP(grp);
    953 	return r;
    954 }
    955 
    956 int
    957 bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
    958 {
    959 	int r = 0;
    960 	ev_ssize_t old_limit, new_limit;
    961 	struct bufferevent_private *bevp;
    962 	BEV_LOCK(bev);
    963 	bevp = BEV_UPCAST(bev);
    964 	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
    965 	old_limit = bevp->rate_limiting->limit.read_limit;
    966 
    967 	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
    968 	if (old_limit > 0 && new_limit <= 0) {
    969 		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
    970 		if (event_add(&bevp->rate_limiting->refill_bucket_event,
    971 			&bevp->rate_limiting->cfg->tick_timeout) < 0)
    972 			r = -1;
    973 	} else if (old_limit <= 0 && new_limit > 0) {
    974 		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
    975 			event_del(&bevp->rate_limiting->refill_bucket_event);
    976 		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
    977 	}
    978 
    979 	BEV_UNLOCK(bev);
    980 	return r;
    981 }
    982 
    983 int
    984 bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
    985 {
    986 	/* XXXX this is mostly copy-and-paste from
    987 	 * bufferevent_decrement_read_limit */
    988 	int r = 0;
    989 	ev_ssize_t old_limit, new_limit;
    990 	struct bufferevent_private *bevp;
    991 	BEV_LOCK(bev);
    992 	bevp = BEV_UPCAST(bev);
    993 	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
    994 	old_limit = bevp->rate_limiting->limit.write_limit;
    995 
    996 	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
    997 	if (old_limit > 0 && new_limit <= 0) {
    998 		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
    999 		if (event_add(&bevp->rate_limiting->refill_bucket_event,
   1000 			&bevp->rate_limiting->cfg->tick_timeout) < 0)
   1001 			r = -1;
   1002 	} else if (old_limit <= 0 && new_limit > 0) {
   1003 		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
   1004 			event_del(&bevp->rate_limiting->refill_bucket_event);
   1005 		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
   1006 	}
   1007 
   1008 	BEV_UNLOCK(bev);
   1009 	return r;
   1010 }
   1011 
   1012 int
   1013 bufferevent_rate_limit_group_decrement_read(
   1014 	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
   1015 {
   1016 	int r = 0;
   1017 	ev_ssize_t old_limit, new_limit;
   1018 	LOCK_GROUP(grp);
   1019 	old_limit = grp->rate_limit.read_limit;
   1020 	new_limit = (grp->rate_limit.read_limit -= decr);
   1021 
   1022 	if (old_limit > 0 && new_limit <= 0) {
   1023 		bev_group_suspend_reading_(grp);
   1024 	} else if (old_limit <= 0 && new_limit > 0) {
   1025 		bev_group_unsuspend_reading_(grp);
   1026 	}
   1027 
   1028 	UNLOCK_GROUP(grp);
   1029 	return r;
   1030 }
   1031 
   1032 int
   1033 bufferevent_rate_limit_group_decrement_write(
   1034 	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
   1035 {
   1036 	int r = 0;
   1037 	ev_ssize_t old_limit, new_limit;
   1038 	LOCK_GROUP(grp);
   1039 	old_limit = grp->rate_limit.write_limit;
   1040 	new_limit = (grp->rate_limit.write_limit -= decr);
   1041 
   1042 	if (old_limit > 0 && new_limit <= 0) {
   1043 		bev_group_suspend_writing_(grp);
   1044 	} else if (old_limit <= 0 && new_limit > 0) {
   1045 		bev_group_unsuspend_writing_(grp);
   1046 	}
   1047 
   1048 	UNLOCK_GROUP(grp);
   1049 	return r;
   1050 }
   1051 
   1052 void
   1053 bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
   1054     ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
   1055 {
   1056 	EVUTIL_ASSERT(grp != NULL);
   1057 	if (total_read_out)
   1058 		*total_read_out = grp->total_read;
   1059 	if (total_written_out)
   1060 		*total_written_out = grp->total_written;
   1061 }
   1062 
   1063 void
   1064 bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
   1065 {
   1066 	grp->total_read = grp->total_written = 0;
   1067 }
   1068 
   1069 int
   1070 bufferevent_ratelim_init_(struct bufferevent_private *bev)
   1071 {
   1072 	bev->rate_limiting = NULL;
   1073 	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
   1074 	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
   1075 
   1076 	return 0;
   1077 }
   1078