Home | History | Annotate | Line # | Download | only in libevent
bufferevent_ratelim.c revision 1.1.1.2
      1      1.1  christos /*	$NetBSD: bufferevent_ratelim.c,v 1.1.1.2 2014/12/19 20:37:46 christos Exp $	*/
      2      1.1  christos 
      3      1.1  christos /*
      4      1.1  christos  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
      5      1.1  christos  * Copyright (c) 2002-2006 Niels Provos <provos (at) citi.umich.edu>
      6      1.1  christos  * All rights reserved.
      7      1.1  christos  *
      8      1.1  christos  * Redistribution and use in source and binary forms, with or without
      9      1.1  christos  * modification, are permitted provided that the following conditions
     10      1.1  christos  * are met:
     11      1.1  christos  * 1. Redistributions of source code must retain the above copyright
     12      1.1  christos  *    notice, this list of conditions and the following disclaimer.
     13      1.1  christos  * 2. Redistributions in binary form must reproduce the above copyright
     14      1.1  christos  *    notice, this list of conditions and the following disclaimer in the
     15      1.1  christos  *    documentation and/or other materials provided with the distribution.
     16      1.1  christos  * 3. The name of the author may not be used to endorse or promote products
     17      1.1  christos  *    derived from this software without specific prior written permission.
     18      1.1  christos  *
     19      1.1  christos  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     20      1.1  christos  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     21      1.1  christos  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     22      1.1  christos  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     23      1.1  christos  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     24      1.1  christos  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     25      1.1  christos  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     26      1.1  christos  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27      1.1  christos  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     28      1.1  christos  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29      1.1  christos  */
     30      1.1  christos #include "evconfig-private.h"
     31      1.1  christos 
     32      1.1  christos #include <sys/types.h>
     33      1.1  christos #include <limits.h>
     34      1.1  christos #include <string.h>
     35      1.1  christos #include <stdlib.h>
     36      1.1  christos 
     37      1.1  christos #include "event2/event.h"
     38      1.1  christos #include "event2/event_struct.h"
     39      1.1  christos #include "event2/util.h"
     40      1.1  christos #include "event2/bufferevent.h"
     41      1.1  christos #include "event2/bufferevent_struct.h"
     42      1.1  christos #include "event2/buffer.h"
     43      1.1  christos 
     44      1.1  christos #include "ratelim-internal.h"
     45      1.1  christos 
     46      1.1  christos #include "bufferevent-internal.h"
     47      1.1  christos #include "mm-internal.h"
     48      1.1  christos #include "util-internal.h"
     49      1.1  christos #include "event-internal.h"
     50      1.1  christos 
     51      1.1  christos int
     52      1.1  christos ev_token_bucket_init_(struct ev_token_bucket *bucket,
     53      1.1  christos     const struct ev_token_bucket_cfg *cfg,
     54      1.1  christos     ev_uint32_t current_tick,
     55      1.1  christos     int reinitialize)
     56      1.1  christos {
     57      1.1  christos 	if (reinitialize) {
     58      1.1  christos 		/* on reinitialization, we only clip downwards, since we've
     59      1.1  christos 		   already used who-knows-how-much bandwidth this tick.  We
     60      1.1  christos 		   leave "last_updated" as it is; the next update will add the
     61      1.1  christos 		   appropriate amount of bandwidth to the bucket.
     62      1.1  christos 		*/
     63      1.1  christos 		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
     64      1.1  christos 			bucket->read_limit = cfg->read_maximum;
     65      1.1  christos 		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
     66      1.1  christos 			bucket->write_limit = cfg->write_maximum;
     67      1.1  christos 	} else {
     68      1.1  christos 		bucket->read_limit = cfg->read_rate;
     69      1.1  christos 		bucket->write_limit = cfg->write_rate;
     70      1.1  christos 		bucket->last_updated = current_tick;
     71      1.1  christos 	}
     72      1.1  christos 	return 0;
     73      1.1  christos }
     74      1.1  christos 
     75      1.1  christos int
     76      1.1  christos ev_token_bucket_update_(struct ev_token_bucket *bucket,
     77      1.1  christos     const struct ev_token_bucket_cfg *cfg,
     78      1.1  christos     ev_uint32_t current_tick)
     79      1.1  christos {
     80      1.1  christos 	/* It's okay if the tick number overflows, since we'll just
     81      1.1  christos 	 * wrap around when we do the unsigned substraction. */
     82      1.1  christos 	unsigned n_ticks = current_tick - bucket->last_updated;
     83      1.1  christos 
     84      1.1  christos 	/* Make sure some ticks actually happened, and that time didn't
     85      1.1  christos 	 * roll back. */
     86      1.1  christos 	if (n_ticks == 0 || n_ticks > INT_MAX)
     87      1.1  christos 		return 0;
     88      1.1  christos 
     89      1.1  christos 	/* Naively, we would say
     90      1.1  christos 		bucket->limit += n_ticks * cfg->rate;
     91      1.1  christos 
     92      1.1  christos 		if (bucket->limit > cfg->maximum)
     93      1.1  christos 			bucket->limit = cfg->maximum;
     94      1.1  christos 
     95      1.1  christos 	   But we're worried about overflow, so we do it like this:
     96      1.1  christos 	*/
     97      1.1  christos 
     98      1.1  christos 	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
     99      1.1  christos 		bucket->read_limit = cfg->read_maximum;
    100      1.1  christos 	else
    101      1.1  christos 		bucket->read_limit += n_ticks * cfg->read_rate;
    102      1.1  christos 
    103      1.1  christos 
    104      1.1  christos 	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
    105      1.1  christos 		bucket->write_limit = cfg->write_maximum;
    106      1.1  christos 	else
    107      1.1  christos 		bucket->write_limit += n_ticks * cfg->write_rate;
    108      1.1  christos 
    109      1.1  christos 
    110      1.1  christos 	bucket->last_updated = current_tick;
    111      1.1  christos 
    112      1.1  christos 	return 1;
    113      1.1  christos }
    114      1.1  christos 
    115      1.1  christos static inline void
    116      1.1  christos bufferevent_update_buckets(struct bufferevent_private *bev)
    117      1.1  christos {
    118      1.1  christos 	/* Must hold lock on bev. */
    119      1.1  christos 	struct timeval now;
    120      1.1  christos 	unsigned tick;
    121      1.1  christos 	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
    122      1.1  christos 	tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
    123      1.1  christos 	if (tick != bev->rate_limiting->limit.last_updated)
    124      1.1  christos 		ev_token_bucket_update_(&bev->rate_limiting->limit,
    125      1.1  christos 		    bev->rate_limiting->cfg, tick);
    126      1.1  christos }
    127      1.1  christos 
    128      1.1  christos ev_uint32_t
    129      1.1  christos ev_token_bucket_get_tick_(const struct timeval *tv,
    130      1.1  christos     const struct ev_token_bucket_cfg *cfg)
    131      1.1  christos {
    132      1.1  christos 	/* This computation uses two multiplies and a divide.  We could do
    133      1.1  christos 	 * fewer if we knew that the tick length was an integer number of
    134      1.1  christos 	 * seconds, or if we knew it divided evenly into a second.  We should
    135      1.1  christos 	 * investigate that more.
    136      1.1  christos 	 */
    137      1.1  christos 
    138      1.1  christos 	/* We cast to an ev_uint64_t first, since we don't want to overflow
    139      1.1  christos 	 * before we do the final divide. */
    140      1.1  christos 	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
    141      1.1  christos 	return (unsigned)(msec / cfg->msec_per_tick);
    142      1.1  christos }
    143      1.1  christos 
    144      1.1  christos struct ev_token_bucket_cfg *
    145      1.1  christos ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    146      1.1  christos     size_t write_rate, size_t write_burst,
    147      1.1  christos     const struct timeval *tick_len)
    148      1.1  christos {
    149      1.1  christos 	struct ev_token_bucket_cfg *r;
    150      1.1  christos 	struct timeval g;
    151      1.1  christos 	if (! tick_len) {
    152      1.1  christos 		g.tv_sec = 1;
    153      1.1  christos 		g.tv_usec = 0;
    154      1.1  christos 		tick_len = &g;
    155      1.1  christos 	}
    156      1.1  christos 	if (read_rate > read_burst || write_rate > write_burst ||
    157      1.1  christos 	    read_rate < 1 || write_rate < 1)
    158      1.1  christos 		return NULL;
    159      1.1  christos 	if (read_rate > EV_RATE_LIMIT_MAX ||
    160      1.1  christos 	    write_rate > EV_RATE_LIMIT_MAX ||
    161      1.1  christos 	    read_burst > EV_RATE_LIMIT_MAX ||
    162      1.1  christos 	    write_burst > EV_RATE_LIMIT_MAX)
    163      1.1  christos 		return NULL;
    164      1.1  christos 	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
    165      1.1  christos 	if (!r)
    166      1.1  christos 		return NULL;
    167      1.1  christos 	r->read_rate = read_rate;
    168      1.1  christos 	r->write_rate = write_rate;
    169      1.1  christos 	r->read_maximum = read_burst;
    170      1.1  christos 	r->write_maximum = write_burst;
    171      1.1  christos 	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
    172      1.1  christos 	r->msec_per_tick = (tick_len->tv_sec * 1000) +
    173      1.1  christos 	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
    174      1.1  christos 	return r;
    175      1.1  christos }
    176      1.1  christos 
    177      1.1  christos void
    178      1.1  christos ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
    179      1.1  christos {
    180      1.1  christos 	mm_free(cfg);
    181      1.1  christos }
    182      1.1  christos 
    183      1.1  christos /* Default values for max_single_read & max_single_write variables. */
    184      1.1  christos #define MAX_SINGLE_READ_DEFAULT 16384
    185      1.1  christos #define MAX_SINGLE_WRITE_DEFAULT 16384
    186      1.1  christos 
    187      1.1  christos #define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
    188      1.1  christos #define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
    189      1.1  christos 
    190      1.1  christos static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
    191      1.1  christos static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
    192      1.1  christos static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
    193      1.1  christos static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
    194      1.1  christos 
    195      1.1  christos /** Helper: figure out the maximum amount we should write if is_write, or
    196      1.1  christos     the maximum amount we should read if is_read.  Return that maximum, or
    197      1.1  christos     0 if our bucket is wholly exhausted.
    198      1.1  christos  */
    199      1.1  christos static inline ev_ssize_t
    200      1.1  christos bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
    201      1.1  christos {
    202      1.1  christos 	/* needs lock on bev. */
    203      1.1  christos 	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
    204      1.1  christos 
    205      1.1  christos #define LIM(x)						\
    206      1.1  christos 	(is_write ? (x).write_limit : (x).read_limit)
    207      1.1  christos 
    208      1.1  christos #define GROUP_SUSPENDED(g)			\
    209      1.1  christos 	(is_write ? (g)->write_suspended : (g)->read_suspended)
    210      1.1  christos 
    211      1.1  christos 	/* Sets max_so_far to MIN(x, max_so_far) */
    212      1.1  christos #define CLAMPTO(x)				\
    213      1.1  christos 	do {					\
    214      1.1  christos 		if (max_so_far > (x))		\
    215      1.1  christos 			max_so_far = (x);	\
    216      1.1  christos 	} while (0);
    217      1.1  christos 
    218      1.1  christos 	if (!bev->rate_limiting)
    219      1.1  christos 		return max_so_far;
    220      1.1  christos 
    221      1.1  christos 	/* If rate-limiting is enabled at all, update the appropriate
    222      1.1  christos 	   bucket, and take the smaller of our rate limit and the group
    223      1.1  christos 	   rate limit.
    224      1.1  christos 	 */
    225      1.1  christos 
    226      1.1  christos 	if (bev->rate_limiting->cfg) {
    227      1.1  christos 		bufferevent_update_buckets(bev);
    228      1.1  christos 		max_so_far = LIM(bev->rate_limiting->limit);
    229      1.1  christos 	}
    230      1.1  christos 	if (bev->rate_limiting->group) {
    231      1.1  christos 		struct bufferevent_rate_limit_group *g =
    232      1.1  christos 		    bev->rate_limiting->group;
    233      1.1  christos 		ev_ssize_t share;
    234      1.1  christos 		LOCK_GROUP(g);
    235      1.1  christos 		if (GROUP_SUSPENDED(g)) {
    236      1.1  christos 			/* We can get here if we failed to lock this
    237      1.1  christos 			 * particular bufferevent while suspending the whole
    238      1.1  christos 			 * group. */
    239      1.1  christos 			if (is_write)
    240      1.1  christos 				bufferevent_suspend_write_(&bev->bev,
    241      1.1  christos 				    BEV_SUSPEND_BW_GROUP);
    242      1.1  christos 			else
    243      1.1  christos 				bufferevent_suspend_read_(&bev->bev,
    244      1.1  christos 				    BEV_SUSPEND_BW_GROUP);
    245      1.1  christos 			share = 0;
    246      1.1  christos 		} else {
    247      1.1  christos 			/* XXXX probably we should divide among the active
    248      1.1  christos 			 * members, not the total members. */
    249      1.1  christos 			share = LIM(g->rate_limit) / g->n_members;
    250      1.1  christos 			if (share < g->min_share)
    251      1.1  christos 				share = g->min_share;
    252      1.1  christos 		}
    253      1.1  christos 		UNLOCK_GROUP(g);
    254      1.1  christos 		CLAMPTO(share);
    255      1.1  christos 	}
    256      1.1  christos 
    257      1.1  christos 	if (max_so_far < 0)
    258      1.1  christos 		max_so_far = 0;
    259      1.1  christos 	return max_so_far;
    260      1.1  christos }
    261      1.1  christos 
    262      1.1  christos ev_ssize_t
    263      1.1  christos bufferevent_get_read_max_(struct bufferevent_private *bev)
    264      1.1  christos {
    265      1.1  christos 	return bufferevent_get_rlim_max_(bev, 0);
    266      1.1  christos }
    267      1.1  christos 
    268      1.1  christos ev_ssize_t
    269      1.1  christos bufferevent_get_write_max_(struct bufferevent_private *bev)
    270      1.1  christos {
    271      1.1  christos 	return bufferevent_get_rlim_max_(bev, 1);
    272      1.1  christos }
    273      1.1  christos 
    274      1.1  christos int
    275      1.1  christos bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
    276      1.1  christos {
    277      1.1  christos 	/* XXXXX Make sure all users of this function check its return value */
    278      1.1  christos 	int r = 0;
    279      1.1  christos 	/* need to hold lock on bev */
    280      1.1  christos 	if (!bev->rate_limiting)
    281      1.1  christos 		return 0;
    282      1.1  christos 
    283      1.1  christos 	if (bev->rate_limiting->cfg) {
    284      1.1  christos 		bev->rate_limiting->limit.read_limit -= bytes;
    285      1.1  christos 		if (bev->rate_limiting->limit.read_limit <= 0) {
    286      1.1  christos 			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
    287      1.1  christos 			if (event_add(&bev->rate_limiting->refill_bucket_event,
    288      1.1  christos 				&bev->rate_limiting->cfg->tick_timeout) < 0)
    289      1.1  christos 				r = -1;
    290      1.1  christos 		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
    291      1.1  christos 			if (!(bev->write_suspended & BEV_SUSPEND_BW))
    292      1.1  christos 				event_del(&bev->rate_limiting->refill_bucket_event);
    293      1.1  christos 			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
    294      1.1  christos 		}
    295      1.1  christos 	}
    296      1.1  christos 
    297      1.1  christos 	if (bev->rate_limiting->group) {
    298      1.1  christos 		LOCK_GROUP(bev->rate_limiting->group);
    299      1.1  christos 		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
    300      1.1  christos 		bev->rate_limiting->group->total_read += bytes;
    301      1.1  christos 		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
    302      1.1  christos 			bev_group_suspend_reading_(bev->rate_limiting->group);
    303      1.1  christos 		} else if (bev->rate_limiting->group->read_suspended) {
    304      1.1  christos 			bev_group_unsuspend_reading_(bev->rate_limiting->group);
    305      1.1  christos 		}
    306      1.1  christos 		UNLOCK_GROUP(bev->rate_limiting->group);
    307      1.1  christos 	}
    308      1.1  christos 
    309      1.1  christos 	return r;
    310      1.1  christos }
    311      1.1  christos 
    312      1.1  christos int
    313      1.1  christos bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
    314      1.1  christos {
    315      1.1  christos 	/* XXXXX Make sure all users of this function check its return value */
    316      1.1  christos 	int r = 0;
    317      1.1  christos 	/* need to hold lock */
    318      1.1  christos 	if (!bev->rate_limiting)
    319      1.1  christos 		return 0;
    320      1.1  christos 
    321      1.1  christos 	if (bev->rate_limiting->cfg) {
    322      1.1  christos 		bev->rate_limiting->limit.write_limit -= bytes;
    323      1.1  christos 		if (bev->rate_limiting->limit.write_limit <= 0) {
    324      1.1  christos 			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
    325      1.1  christos 			if (event_add(&bev->rate_limiting->refill_bucket_event,
    326      1.1  christos 				&bev->rate_limiting->cfg->tick_timeout) < 0)
    327      1.1  christos 				r = -1;
    328      1.1  christos 		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
    329      1.1  christos 			if (!(bev->read_suspended & BEV_SUSPEND_BW))
    330      1.1  christos 				event_del(&bev->rate_limiting->refill_bucket_event);
    331      1.1  christos 			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
    332      1.1  christos 		}
    333      1.1  christos 	}
    334      1.1  christos 
    335      1.1  christos 	if (bev->rate_limiting->group) {
    336      1.1  christos 		LOCK_GROUP(bev->rate_limiting->group);
    337      1.1  christos 		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
    338      1.1  christos 		bev->rate_limiting->group->total_written += bytes;
    339      1.1  christos 		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
    340      1.1  christos 			bev_group_suspend_writing_(bev->rate_limiting->group);
    341      1.1  christos 		} else if (bev->rate_limiting->group->write_suspended) {
    342      1.1  christos 			bev_group_unsuspend_writing_(bev->rate_limiting->group);
    343      1.1  christos 		}
    344      1.1  christos 		UNLOCK_GROUP(bev->rate_limiting->group);
    345      1.1  christos 	}
    346      1.1  christos 
    347      1.1  christos 	return r;
    348      1.1  christos }
    349      1.1  christos 
    350      1.1  christos /** Stop reading on every bufferevent in <b>g</b> */
    351      1.1  christos static int
    352      1.1  christos bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
    353      1.1  christos {
    354      1.1  christos 	/* Needs group lock */
    355      1.1  christos 	struct bufferevent_private *bev;
    356      1.1  christos 	g->read_suspended = 1;
    357      1.1  christos 	g->pending_unsuspend_read = 0;
    358      1.1  christos 
    359      1.1  christos 	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
    360      1.1  christos 	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
    361      1.1  christos 	   the bufferevent locks.  If we are unable to lock any individual
    362      1.1  christos 	   bufferevent, it will find out later when it looks at its limit
    363      1.1  christos 	   and sees that its group is suspended.)
    364      1.1  christos 	*/
    365      1.1  christos 	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
    366      1.1  christos 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
    367      1.1  christos 			bufferevent_suspend_read_(&bev->bev,
    368      1.1  christos 			    BEV_SUSPEND_BW_GROUP);
    369      1.1  christos 			EVLOCK_UNLOCK(bev->lock, 0);
    370      1.1  christos 		}
    371      1.1  christos 	}
    372      1.1  christos 	return 0;
    373      1.1  christos }
    374      1.1  christos 
    375      1.1  christos /** Stop writing on every bufferevent in <b>g</b> */
    376      1.1  christos static int
    377      1.1  christos bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
    378      1.1  christos {
    379      1.1  christos 	/* Needs group lock */
    380      1.1  christos 	struct bufferevent_private *bev;
    381      1.1  christos 	g->write_suspended = 1;
    382      1.1  christos 	g->pending_unsuspend_write = 0;
    383      1.1  christos 	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
    384      1.1  christos 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
    385      1.1  christos 			bufferevent_suspend_write_(&bev->bev,
    386      1.1  christos 			    BEV_SUSPEND_BW_GROUP);
    387      1.1  christos 			EVLOCK_UNLOCK(bev->lock, 0);
    388      1.1  christos 		}
    389      1.1  christos 	}
    390      1.1  christos 	return 0;
    391      1.1  christos }
    392      1.1  christos 
    393      1.1  christos /** Timer callback invoked on a single bufferevent with one or more exhausted
    394      1.1  christos     buckets when they are ready to refill. */
    395      1.1  christos static void
    396      1.1  christos bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
    397      1.1  christos {
    398      1.1  christos 	unsigned tick;
    399      1.1  christos 	struct timeval now;
    400      1.1  christos 	struct bufferevent_private *bev = arg;
    401      1.1  christos 	int again = 0;
    402      1.1  christos 	BEV_LOCK(&bev->bev);
    403      1.1  christos 	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
    404      1.1  christos 		BEV_UNLOCK(&bev->bev);
    405      1.1  christos 		return;
    406      1.1  christos 	}
    407      1.1  christos 
    408      1.1  christos 	/* First, update the bucket */
    409      1.1  christos 	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
    410      1.1  christos 	tick = ev_token_bucket_get_tick_(&now,
    411      1.1  christos 	    bev->rate_limiting->cfg);
    412      1.1  christos 	ev_token_bucket_update_(&bev->rate_limiting->limit,
    413      1.1  christos 	    bev->rate_limiting->cfg,
    414      1.1  christos 	    tick);
    415      1.1  christos 
    416      1.1  christos 	/* Now unsuspend any read/write operations as appropriate. */
    417      1.1  christos 	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
    418      1.1  christos 		if (bev->rate_limiting->limit.read_limit > 0)
    419      1.1  christos 			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
    420      1.1  christos 		else
    421      1.1  christos 			again = 1;
    422      1.1  christos 	}
    423      1.1  christos 	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
    424      1.1  christos 		if (bev->rate_limiting->limit.write_limit > 0)
    425      1.1  christos 			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
    426      1.1  christos 		else
    427      1.1  christos 			again = 1;
    428      1.1  christos 	}
    429      1.1  christos 	if (again) {
    430      1.1  christos 		/* One or more of the buckets may need another refill if they
    431      1.1  christos 		   started negative.
    432      1.1  christos 
    433      1.1  christos 		   XXXX if we need to be quiet for more ticks, we should
    434      1.1  christos 		   maybe figure out what timeout we really want.
    435      1.1  christos 		*/
    436      1.1  christos 		/* XXXX Handle event_add failure somehow */
    437      1.1  christos 		event_add(&bev->rate_limiting->refill_bucket_event,
    438      1.1  christos 		    &bev->rate_limiting->cfg->tick_timeout);
    439      1.1  christos 	}
    440      1.1  christos 	BEV_UNLOCK(&bev->bev);
    441      1.1  christos }
    442      1.1  christos 
    443      1.1  christos /** Helper: grab a random element from a bufferevent group.
    444      1.1  christos  *
    445      1.1  christos  * Requires that we hold the lock on the group.
    446      1.1  christos  */
    447      1.1  christos static struct bufferevent_private *
    448      1.1  christos bev_group_random_element_(struct bufferevent_rate_limit_group *group)
    449      1.1  christos {
    450      1.1  christos 	int which;
    451      1.1  christos 	struct bufferevent_private *bev;
    452      1.1  christos 
    453      1.1  christos 	/* requires group lock */
    454      1.1  christos 
    455      1.1  christos 	if (!group->n_members)
    456      1.1  christos 		return NULL;
    457      1.1  christos 
    458      1.1  christos 	EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
    459      1.1  christos 
    460      1.1  christos 	which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
    461      1.1  christos 
    462      1.1  christos 	bev = LIST_FIRST(&group->members);
    463      1.1  christos 	while (which--)
    464      1.1  christos 		bev = LIST_NEXT(bev, rate_limiting->next_in_group);
    465      1.1  christos 
    466      1.1  christos 	return bev;
    467      1.1  christos }
    468      1.1  christos 
    469      1.1  christos /** Iterate over the elements of a rate-limiting group 'g' with a random
    470      1.1  christos     starting point, assigning each to the variable 'bev', and executing the
    471      1.1  christos     block 'block'.
    472      1.1  christos 
    473      1.1  christos     We do this in a half-baked effort to get fairness among group members.
    474      1.1  christos     XXX Round-robin or some kind of priority queue would be even more fair.
    475      1.1  christos  */
    476      1.1  christos #define FOREACH_RANDOM_ORDER(block)			\
    477      1.1  christos 	do {						\
    478      1.1  christos 		first = bev_group_random_element_(g);	\
    479      1.1  christos 		for (bev = first; bev != LIST_END(&g->members); \
    480      1.1  christos 		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
    481      1.1  christos 			block ;					 \
    482      1.1  christos 		}						 \
    483      1.1  christos 		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
    484      1.1  christos 		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
    485      1.1  christos 			block ;						\
    486      1.1  christos 		}							\
    487      1.1  christos 	} while (0)
    488      1.1  christos 
    489      1.1  christos static void
    490      1.1  christos bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
    491      1.1  christos {
    492      1.1  christos 	int again = 0;
    493      1.1  christos 	struct bufferevent_private *bev, *first;
    494      1.1  christos 
    495      1.1  christos 	g->read_suspended = 0;
    496      1.1  christos 	FOREACH_RANDOM_ORDER({
    497      1.1  christos 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
    498      1.1  christos 			bufferevent_unsuspend_read_(&bev->bev,
    499      1.1  christos 			    BEV_SUSPEND_BW_GROUP);
    500      1.1  christos 			EVLOCK_UNLOCK(bev->lock, 0);
    501      1.1  christos 		} else {
    502      1.1  christos 			again = 1;
    503      1.1  christos 		}
    504      1.1  christos 	});
    505      1.1  christos 	g->pending_unsuspend_read = again;
    506      1.1  christos }
    507      1.1  christos 
    508      1.1  christos static void
    509      1.1  christos bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
    510      1.1  christos {
    511      1.1  christos 	int again = 0;
    512      1.1  christos 	struct bufferevent_private *bev, *first;
    513      1.1  christos 	g->write_suspended = 0;
    514      1.1  christos 
    515      1.1  christos 	FOREACH_RANDOM_ORDER({
    516      1.1  christos 		if (EVLOCK_TRY_LOCK_(bev->lock)) {
    517      1.1  christos 			bufferevent_unsuspend_write_(&bev->bev,
    518      1.1  christos 			    BEV_SUSPEND_BW_GROUP);
    519      1.1  christos 			EVLOCK_UNLOCK(bev->lock, 0);
    520      1.1  christos 		} else {
    521      1.1  christos 			again = 1;
    522      1.1  christos 		}
    523      1.1  christos 	});
    524      1.1  christos 	g->pending_unsuspend_write = again;
    525      1.1  christos }
    526      1.1  christos 
    527      1.1  christos /** Callback invoked every tick to add more elements to the group bucket
    528      1.1  christos     and unsuspend group members as needed.
    529      1.1  christos  */
    530      1.1  christos static void
    531      1.1  christos bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
    532      1.1  christos {
    533      1.1  christos 	struct bufferevent_rate_limit_group *g = arg;
    534      1.1  christos 	unsigned tick;
    535      1.1  christos 	struct timeval now;
    536      1.1  christos 
    537      1.1  christos 	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
    538      1.1  christos 
    539      1.1  christos 	LOCK_GROUP(g);
    540      1.1  christos 
    541      1.1  christos 	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
    542      1.1  christos 	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
    543      1.1  christos 
    544      1.1  christos 	if (g->pending_unsuspend_read ||
    545      1.1  christos 	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
    546      1.1  christos 		bev_group_unsuspend_reading_(g);
    547      1.1  christos 	}
    548      1.1  christos 	if (g->pending_unsuspend_write ||
    549      1.1  christos 	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
    550      1.1  christos 		bev_group_unsuspend_writing_(g);
    551      1.1  christos 	}
    552      1.1  christos 
    553      1.1  christos 	/* XXXX Rather than waiting to the next tick to unsuspend stuff
    554      1.1  christos 	 * with pending_unsuspend_write/read, we should do it on the
    555      1.1  christos 	 * next iteration of the mainloop.
    556      1.1  christos 	 */
    557      1.1  christos 
    558      1.1  christos 	UNLOCK_GROUP(g);
    559      1.1  christos }
    560      1.1  christos 
    561      1.1  christos int
    562      1.1  christos bufferevent_set_rate_limit(struct bufferevent *bev,
    563      1.1  christos     struct ev_token_bucket_cfg *cfg)
    564      1.1  christos {
    565      1.1  christos 	struct bufferevent_private *bevp =
    566      1.1  christos 	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
    567      1.1  christos 	int r = -1;
    568      1.1  christos 	struct bufferevent_rate_limit *rlim;
    569      1.1  christos 	struct timeval now;
    570      1.1  christos 	ev_uint32_t tick;
    571      1.1  christos 	int reinit = 0, suspended = 0;
    572      1.1  christos 	/* XXX reference-count cfg */
    573      1.1  christos 
    574      1.1  christos 	BEV_LOCK(bev);
    575      1.1  christos 
    576      1.1  christos 	if (cfg == NULL) {
    577      1.1  christos 		if (bevp->rate_limiting) {
    578      1.1  christos 			rlim = bevp->rate_limiting;
    579      1.1  christos 			rlim->cfg = NULL;
    580      1.1  christos 			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
    581      1.1  christos 			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
    582      1.1  christos 			if (event_initialized(&rlim->refill_bucket_event))
    583      1.1  christos 				event_del(&rlim->refill_bucket_event);
    584      1.1  christos 		}
    585      1.1  christos 		r = 0;
    586      1.1  christos 		goto done;
    587      1.1  christos 	}
    588      1.1  christos 
    589      1.1  christos 	event_base_gettimeofday_cached(bev->ev_base, &now);
    590      1.1  christos 	tick = ev_token_bucket_get_tick_(&now, cfg);
    591      1.1  christos 
    592      1.1  christos 	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
    593      1.1  christos 		/* no-op */
    594      1.1  christos 		r = 0;
    595      1.1  christos 		goto done;
    596      1.1  christos 	}
    597      1.1  christos 	if (bevp->rate_limiting == NULL) {
    598      1.1  christos 		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
    599      1.1  christos 		if (!rlim)
    600      1.1  christos 			goto done;
    601      1.1  christos 		bevp->rate_limiting = rlim;
    602      1.1  christos 	} else {
    603      1.1  christos 		rlim = bevp->rate_limiting;
    604      1.1  christos 	}
    605      1.1  christos 	reinit = rlim->cfg != NULL;
    606      1.1  christos 
    607      1.1  christos 	rlim->cfg = cfg;
    608      1.1  christos 	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
    609      1.1  christos 
    610      1.1  christos 	if (reinit) {
    611      1.1  christos 		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
    612      1.1  christos 		event_del(&rlim->refill_bucket_event);
    613      1.1  christos 	}
    614  1.1.1.2  christos 	event_assign(&rlim->refill_bucket_event, bev->ev_base,
    615  1.1.1.2  christos 	    -1, EV_FINALIZE, bev_refill_callback_, bevp);
    616      1.1  christos 
    617      1.1  christos 	if (rlim->limit.read_limit > 0) {
    618      1.1  christos 		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
    619      1.1  christos 	} else {
    620      1.1  christos 		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
    621      1.1  christos 		suspended=1;
    622      1.1  christos 	}
    623      1.1  christos 	if (rlim->limit.write_limit > 0) {
    624      1.1  christos 		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
    625      1.1  christos 	} else {
    626      1.1  christos 		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
    627      1.1  christos 		suspended = 1;
    628      1.1  christos 	}
    629      1.1  christos 
    630      1.1  christos 	if (suspended)
    631      1.1  christos 		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
    632      1.1  christos 
    633      1.1  christos 	r = 0;
    634      1.1  christos 
    635      1.1  christos done:
    636      1.1  christos 	BEV_UNLOCK(bev);
    637      1.1  christos 	return r;
    638      1.1  christos }
    639      1.1  christos 
    640      1.1  christos struct bufferevent_rate_limit_group *
    641      1.1  christos bufferevent_rate_limit_group_new(struct event_base *base,
    642      1.1  christos     const struct ev_token_bucket_cfg *cfg)
    643      1.1  christos {
    644      1.1  christos 	struct bufferevent_rate_limit_group *g;
    645      1.1  christos 	struct timeval now;
    646      1.1  christos 	ev_uint32_t tick;
    647      1.1  christos 
    648      1.1  christos 	event_base_gettimeofday_cached(base, &now);
    649      1.1  christos 	tick = ev_token_bucket_get_tick_(&now, cfg);
    650      1.1  christos 
    651      1.1  christos 	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
    652      1.1  christos 	if (!g)
    653      1.1  christos 		return NULL;
    654      1.1  christos 	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
    655      1.1  christos 	LIST_INIT(&g->members);
    656      1.1  christos 
    657      1.1  christos 	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
    658      1.1  christos 
    659  1.1.1.2  christos 	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
    660      1.1  christos 	    bev_group_refill_callback_, g);
    661      1.1  christos 	/*XXXX handle event_add failure */
    662      1.1  christos 	event_add(&g->master_refill_event, &cfg->tick_timeout);
    663      1.1  christos 
    664      1.1  christos 	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
    665      1.1  christos 
    666      1.1  christos 	bufferevent_rate_limit_group_set_min_share(g, 64);
    667      1.1  christos 
    668      1.1  christos 	evutil_weakrand_seed_(&g->weakrand_seed,
    669      1.1  christos 	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
    670      1.1  christos 
    671      1.1  christos 	return g;
    672      1.1  christos }
    673      1.1  christos 
    674      1.1  christos int
    675      1.1  christos bufferevent_rate_limit_group_set_cfg(
    676      1.1  christos 	struct bufferevent_rate_limit_group *g,
    677      1.1  christos 	const struct ev_token_bucket_cfg *cfg)
    678      1.1  christos {
    679      1.1  christos 	int same_tick;
    680      1.1  christos 	if (!g || !cfg)
    681      1.1  christos 		return -1;
    682      1.1  christos 
    683      1.1  christos 	LOCK_GROUP(g);
    684      1.1  christos 	same_tick = evutil_timercmp(
    685      1.1  christos 		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
    686      1.1  christos 	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
    687      1.1  christos 
    688      1.1  christos 	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
    689      1.1  christos 		g->rate_limit.read_limit = cfg->read_maximum;
    690      1.1  christos 	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
    691      1.1  christos 		g->rate_limit.write_limit = cfg->write_maximum;
    692      1.1  christos 
    693      1.1  christos 	if (!same_tick) {
    694      1.1  christos 		/* This can cause a hiccup in the schedule */
    695      1.1  christos 		event_add(&g->master_refill_event, &cfg->tick_timeout);
    696      1.1  christos 	}
    697      1.1  christos 
    698      1.1  christos 	/* The new limits might force us to adjust min_share differently. */
    699      1.1  christos 	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
    700      1.1  christos 
    701      1.1  christos 	UNLOCK_GROUP(g);
    702      1.1  christos 	return 0;
    703      1.1  christos }
    704      1.1  christos 
    705      1.1  christos int
    706      1.1  christos bufferevent_rate_limit_group_set_min_share(
    707      1.1  christos 	struct bufferevent_rate_limit_group *g,
    708      1.1  christos 	size_t share)
    709      1.1  christos {
    710      1.1  christos 	if (share > EV_SSIZE_MAX)
    711      1.1  christos 		return -1;
    712      1.1  christos 
    713      1.1  christos 	g->configured_min_share = share;
    714      1.1  christos 
    715      1.1  christos 	/* Can't set share to less than the one-tick maximum.  IOW, at steady
    716      1.1  christos 	 * state, at least one connection can go per tick. */
    717      1.1  christos 	if (share > g->rate_limit_cfg.read_rate)
    718      1.1  christos 		share = g->rate_limit_cfg.read_rate;
    719      1.1  christos 	if (share > g->rate_limit_cfg.write_rate)
    720      1.1  christos 		share = g->rate_limit_cfg.write_rate;
    721      1.1  christos 
    722      1.1  christos 	g->min_share = share;
    723      1.1  christos 	return 0;
    724      1.1  christos }
    725      1.1  christos 
    726      1.1  christos void
    727      1.1  christos bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
    728      1.1  christos {
    729      1.1  christos 	LOCK_GROUP(g);
    730      1.1  christos 	EVUTIL_ASSERT(0 == g->n_members);
    731      1.1  christos 	event_del(&g->master_refill_event);
    732      1.1  christos 	UNLOCK_GROUP(g);
    733      1.1  christos 	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
    734      1.1  christos 	mm_free(g);
    735      1.1  christos }
    736      1.1  christos 
    737      1.1  christos int
    738      1.1  christos bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    739      1.1  christos     struct bufferevent_rate_limit_group *g)
    740      1.1  christos {
    741      1.1  christos 	int wsuspend, rsuspend;
    742      1.1  christos 	struct bufferevent_private *bevp =
    743      1.1  christos 	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
    744      1.1  christos 	BEV_LOCK(bev);
    745      1.1  christos 
    746      1.1  christos 	if (!bevp->rate_limiting) {
    747      1.1  christos 		struct bufferevent_rate_limit *rlim;
    748      1.1  christos 		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
    749      1.1  christos 		if (!rlim) {
    750      1.1  christos 			BEV_UNLOCK(bev);
    751      1.1  christos 			return -1;
    752      1.1  christos 		}
    753  1.1.1.2  christos 		event_assign(&rlim->refill_bucket_event, bev->ev_base,
    754  1.1.1.2  christos 		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
    755      1.1  christos 		bevp->rate_limiting = rlim;
    756      1.1  christos 	}
    757      1.1  christos 
    758      1.1  christos 	if (bevp->rate_limiting->group == g) {
    759      1.1  christos 		BEV_UNLOCK(bev);
    760      1.1  christos 		return 0;
    761      1.1  christos 	}
    762      1.1  christos 	if (bevp->rate_limiting->group)
    763      1.1  christos 		bufferevent_remove_from_rate_limit_group(bev);
    764      1.1  christos 
    765      1.1  christos 	LOCK_GROUP(g);
    766      1.1  christos 	bevp->rate_limiting->group = g;
    767      1.1  christos 	++g->n_members;
    768      1.1  christos 	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
    769      1.1  christos 
    770      1.1  christos 	rsuspend = g->read_suspended;
    771      1.1  christos 	wsuspend = g->write_suspended;
    772      1.1  christos 
    773      1.1  christos 	UNLOCK_GROUP(g);
    774      1.1  christos 
    775      1.1  christos 	if (rsuspend)
    776      1.1  christos 		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
    777      1.1  christos 	if (wsuspend)
    778      1.1  christos 		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
    779      1.1  christos 
    780      1.1  christos 	BEV_UNLOCK(bev);
    781      1.1  christos 	return 0;
    782      1.1  christos }
    783      1.1  christos 
    784      1.1  christos int
    785      1.1  christos bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
    786      1.1  christos {
    787      1.1  christos 	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
    788      1.1  christos }
    789      1.1  christos 
    790      1.1  christos int
    791      1.1  christos bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    792      1.1  christos     int unsuspend)
    793      1.1  christos {
    794      1.1  christos 	struct bufferevent_private *bevp =
    795      1.1  christos 	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
    796      1.1  christos 	BEV_LOCK(bev);
    797      1.1  christos 	if (bevp->rate_limiting && bevp->rate_limiting->group) {
    798      1.1  christos 		struct bufferevent_rate_limit_group *g =
    799      1.1  christos 		    bevp->rate_limiting->group;
    800      1.1  christos 		LOCK_GROUP(g);
    801      1.1  christos 		bevp->rate_limiting->group = NULL;
    802      1.1  christos 		--g->n_members;
    803      1.1  christos 		LIST_REMOVE(bevp, rate_limiting->next_in_group);
    804      1.1  christos 		UNLOCK_GROUP(g);
    805      1.1  christos 	}
    806      1.1  christos 	if (unsuspend) {
    807      1.1  christos 		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
    808      1.1  christos 		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
    809      1.1  christos 	}
    810      1.1  christos 	BEV_UNLOCK(bev);
    811      1.1  christos 	return 0;
    812      1.1  christos }
    813      1.1  christos 
    814      1.1  christos /* ===
    815      1.1  christos  * API functions to expose rate limits.
    816      1.1  christos  *
    817      1.1  christos  * Don't use these from inside Libevent; they're meant to be for use by
    818      1.1  christos  * the program.
    819      1.1  christos  * === */
    820      1.1  christos 
    821      1.1  christos /* Mostly you don't want to use this function from inside libevent;
    822      1.1  christos  * bufferevent_get_read_max_() is more likely what you want*/
    823      1.1  christos ev_ssize_t
    824      1.1  christos bufferevent_get_read_limit(struct bufferevent *bev)
    825      1.1  christos {
    826      1.1  christos 	ev_ssize_t r;
    827      1.1  christos 	struct bufferevent_private *bevp;
    828      1.1  christos 	BEV_LOCK(bev);
    829      1.1  christos 	bevp = BEV_UPCAST(bev);
    830      1.1  christos 	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
    831      1.1  christos 		bufferevent_update_buckets(bevp);
    832      1.1  christos 		r = bevp->rate_limiting->limit.read_limit;
    833      1.1  christos 	} else {
    834      1.1  christos 		r = EV_SSIZE_MAX;
    835      1.1  christos 	}
    836      1.1  christos 	BEV_UNLOCK(bev);
    837      1.1  christos 	return r;
    838      1.1  christos }
    839      1.1  christos 
    840      1.1  christos /* Mostly you don't want to use this function from inside libevent;
    841      1.1  christos  * bufferevent_get_write_max_() is more likely what you want*/
    842      1.1  christos ev_ssize_t
    843      1.1  christos bufferevent_get_write_limit(struct bufferevent *bev)
    844      1.1  christos {
    845      1.1  christos 	ev_ssize_t r;
    846      1.1  christos 	struct bufferevent_private *bevp;
    847      1.1  christos 	BEV_LOCK(bev);
    848      1.1  christos 	bevp = BEV_UPCAST(bev);
    849      1.1  christos 	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
    850      1.1  christos 		bufferevent_update_buckets(bevp);
    851      1.1  christos 		r = bevp->rate_limiting->limit.write_limit;
    852      1.1  christos 	} else {
    853      1.1  christos 		r = EV_SSIZE_MAX;
    854      1.1  christos 	}
    855      1.1  christos 	BEV_UNLOCK(bev);
    856      1.1  christos 	return r;
    857      1.1  christos }
    858      1.1  christos 
    859      1.1  christos int
    860      1.1  christos bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
    861      1.1  christos {
    862      1.1  christos 	struct bufferevent_private *bevp;
    863      1.1  christos 	BEV_LOCK(bev);
    864      1.1  christos 	bevp = BEV_UPCAST(bev);
    865      1.1  christos 	if (size == 0 || size > EV_SSIZE_MAX)
    866      1.1  christos 		bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
    867      1.1  christos 	else
    868      1.1  christos 		bevp->max_single_read = size;
    869      1.1  christos 	BEV_UNLOCK(bev);
    870      1.1  christos 	return 0;
    871      1.1  christos }
    872      1.1  christos 
    873      1.1  christos int
    874      1.1  christos bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
    875      1.1  christos {
    876      1.1  christos 	struct bufferevent_private *bevp;
    877      1.1  christos 	BEV_LOCK(bev);
    878      1.1  christos 	bevp = BEV_UPCAST(bev);
    879      1.1  christos 	if (size == 0 || size > EV_SSIZE_MAX)
    880      1.1  christos 		bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
    881      1.1  christos 	else
    882      1.1  christos 		bevp->max_single_write = size;
    883      1.1  christos 	BEV_UNLOCK(bev);
    884      1.1  christos 	return 0;
    885      1.1  christos }
    886      1.1  christos 
    887      1.1  christos ev_ssize_t
    888      1.1  christos bufferevent_get_max_single_read(struct bufferevent *bev)
    889      1.1  christos {
    890      1.1  christos 	ev_ssize_t r;
    891      1.1  christos 
    892      1.1  christos 	BEV_LOCK(bev);
    893      1.1  christos 	r = BEV_UPCAST(bev)->max_single_read;
    894      1.1  christos 	BEV_UNLOCK(bev);
    895      1.1  christos 	return r;
    896      1.1  christos }
    897      1.1  christos 
    898      1.1  christos ev_ssize_t
    899      1.1  christos bufferevent_get_max_single_write(struct bufferevent *bev)
    900      1.1  christos {
    901      1.1  christos 	ev_ssize_t r;
    902      1.1  christos 
    903      1.1  christos 	BEV_LOCK(bev);
    904      1.1  christos 	r = BEV_UPCAST(bev)->max_single_write;
    905      1.1  christos 	BEV_UNLOCK(bev);
    906      1.1  christos 	return r;
    907      1.1  christos }
    908      1.1  christos 
    909      1.1  christos ev_ssize_t
    910      1.1  christos bufferevent_get_max_to_read(struct bufferevent *bev)
    911      1.1  christos {
    912      1.1  christos 	ev_ssize_t r;
    913      1.1  christos 	BEV_LOCK(bev);
    914      1.1  christos 	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
    915      1.1  christos 	BEV_UNLOCK(bev);
    916      1.1  christos 	return r;
    917      1.1  christos }
    918      1.1  christos 
    919      1.1  christos ev_ssize_t
    920      1.1  christos bufferevent_get_max_to_write(struct bufferevent *bev)
    921      1.1  christos {
    922      1.1  christos 	ev_ssize_t r;
    923      1.1  christos 	BEV_LOCK(bev);
    924      1.1  christos 	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
    925      1.1  christos 	BEV_UNLOCK(bev);
    926      1.1  christos 	return r;
    927      1.1  christos }
    928      1.1  christos 
    929  1.1.1.2  christos const struct ev_token_bucket_cfg *
    930  1.1.1.2  christos bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
    931  1.1.1.2  christos 	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
    932  1.1.1.2  christos 	struct ev_token_bucket_cfg *cfg;
    933  1.1.1.2  christos 
    934  1.1.1.2  christos 	BEV_LOCK(bev);
    935  1.1.1.2  christos 
    936  1.1.1.2  christos 	if (bufev_private->rate_limiting) {
    937  1.1.1.2  christos 		cfg = bufev_private->rate_limiting->cfg;
    938  1.1.1.2  christos 	} else {
    939  1.1.1.2  christos 		cfg = NULL;
    940  1.1.1.2  christos 	}
    941  1.1.1.2  christos 
    942  1.1.1.2  christos 	BEV_UNLOCK(bev);
    943  1.1.1.2  christos 
    944  1.1.1.2  christos 	return cfg;
    945  1.1.1.2  christos }
    946      1.1  christos 
    947      1.1  christos /* Mostly you don't want to use this function from inside libevent;
    948      1.1  christos  * bufferevent_get_read_max_() is more likely what you want*/
    949      1.1  christos ev_ssize_t
    950      1.1  christos bufferevent_rate_limit_group_get_read_limit(
    951      1.1  christos 	struct bufferevent_rate_limit_group *grp)
    952      1.1  christos {
    953      1.1  christos 	ev_ssize_t r;
    954      1.1  christos 	LOCK_GROUP(grp);
    955      1.1  christos 	r = grp->rate_limit.read_limit;
    956      1.1  christos 	UNLOCK_GROUP(grp);
    957      1.1  christos 	return r;
    958      1.1  christos }
    959      1.1  christos 
    960      1.1  christos /* Mostly you don't want to use this function from inside libevent;
    961      1.1  christos  * bufferevent_get_write_max_() is more likely what you want. */
    962      1.1  christos ev_ssize_t
    963      1.1  christos bufferevent_rate_limit_group_get_write_limit(
    964      1.1  christos 	struct bufferevent_rate_limit_group *grp)
    965      1.1  christos {
    966      1.1  christos 	ev_ssize_t r;
    967      1.1  christos 	LOCK_GROUP(grp);
    968      1.1  christos 	r = grp->rate_limit.write_limit;
    969      1.1  christos 	UNLOCK_GROUP(grp);
    970      1.1  christos 	return r;
    971      1.1  christos }
    972      1.1  christos 
    973      1.1  christos int
    974      1.1  christos bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
    975      1.1  christos {
    976      1.1  christos 	int r = 0;
    977      1.1  christos 	ev_ssize_t old_limit, new_limit;
    978      1.1  christos 	struct bufferevent_private *bevp;
    979      1.1  christos 	BEV_LOCK(bev);
    980      1.1  christos 	bevp = BEV_UPCAST(bev);
    981      1.1  christos 	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
    982      1.1  christos 	old_limit = bevp->rate_limiting->limit.read_limit;
    983      1.1  christos 
    984      1.1  christos 	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
    985      1.1  christos 	if (old_limit > 0 && new_limit <= 0) {
    986      1.1  christos 		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
    987      1.1  christos 		if (event_add(&bevp->rate_limiting->refill_bucket_event,
    988      1.1  christos 			&bevp->rate_limiting->cfg->tick_timeout) < 0)
    989      1.1  christos 			r = -1;
    990      1.1  christos 	} else if (old_limit <= 0 && new_limit > 0) {
    991      1.1  christos 		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
    992      1.1  christos 			event_del(&bevp->rate_limiting->refill_bucket_event);
    993      1.1  christos 		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
    994      1.1  christos 	}
    995      1.1  christos 
    996      1.1  christos 	BEV_UNLOCK(bev);
    997      1.1  christos 	return r;
    998      1.1  christos }
    999      1.1  christos 
   1000      1.1  christos int
   1001      1.1  christos bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
   1002      1.1  christos {
   1003      1.1  christos 	/* XXXX this is mostly copy-and-paste from
   1004      1.1  christos 	 * bufferevent_decrement_read_limit */
   1005      1.1  christos 	int r = 0;
   1006      1.1  christos 	ev_ssize_t old_limit, new_limit;
   1007      1.1  christos 	struct bufferevent_private *bevp;
   1008      1.1  christos 	BEV_LOCK(bev);
   1009      1.1  christos 	bevp = BEV_UPCAST(bev);
   1010      1.1  christos 	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
   1011      1.1  christos 	old_limit = bevp->rate_limiting->limit.write_limit;
   1012      1.1  christos 
   1013      1.1  christos 	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
   1014      1.1  christos 	if (old_limit > 0 && new_limit <= 0) {
   1015      1.1  christos 		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
   1016      1.1  christos 		if (event_add(&bevp->rate_limiting->refill_bucket_event,
   1017      1.1  christos 			&bevp->rate_limiting->cfg->tick_timeout) < 0)
   1018      1.1  christos 			r = -1;
   1019      1.1  christos 	} else if (old_limit <= 0 && new_limit > 0) {
   1020      1.1  christos 		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
   1021      1.1  christos 			event_del(&bevp->rate_limiting->refill_bucket_event);
   1022      1.1  christos 		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
   1023      1.1  christos 	}
   1024      1.1  christos 
   1025      1.1  christos 	BEV_UNLOCK(bev);
   1026      1.1  christos 	return r;
   1027      1.1  christos }
   1028      1.1  christos 
   1029      1.1  christos int
   1030      1.1  christos bufferevent_rate_limit_group_decrement_read(
   1031      1.1  christos 	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
   1032      1.1  christos {
   1033      1.1  christos 	int r = 0;
   1034      1.1  christos 	ev_ssize_t old_limit, new_limit;
   1035      1.1  christos 	LOCK_GROUP(grp);
   1036      1.1  christos 	old_limit = grp->rate_limit.read_limit;
   1037      1.1  christos 	new_limit = (grp->rate_limit.read_limit -= decr);
   1038      1.1  christos 
   1039      1.1  christos 	if (old_limit > 0 && new_limit <= 0) {
   1040      1.1  christos 		bev_group_suspend_reading_(grp);
   1041      1.1  christos 	} else if (old_limit <= 0 && new_limit > 0) {
   1042      1.1  christos 		bev_group_unsuspend_reading_(grp);
   1043      1.1  christos 	}
   1044      1.1  christos 
   1045      1.1  christos 	UNLOCK_GROUP(grp);
   1046      1.1  christos 	return r;
   1047      1.1  christos }
   1048      1.1  christos 
   1049      1.1  christos int
   1050      1.1  christos bufferevent_rate_limit_group_decrement_write(
   1051      1.1  christos 	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
   1052      1.1  christos {
   1053      1.1  christos 	int r = 0;
   1054      1.1  christos 	ev_ssize_t old_limit, new_limit;
   1055      1.1  christos 	LOCK_GROUP(grp);
   1056      1.1  christos 	old_limit = grp->rate_limit.write_limit;
   1057      1.1  christos 	new_limit = (grp->rate_limit.write_limit -= decr);
   1058      1.1  christos 
   1059      1.1  christos 	if (old_limit > 0 && new_limit <= 0) {
   1060      1.1  christos 		bev_group_suspend_writing_(grp);
   1061      1.1  christos 	} else if (old_limit <= 0 && new_limit > 0) {
   1062      1.1  christos 		bev_group_unsuspend_writing_(grp);
   1063      1.1  christos 	}
   1064      1.1  christos 
   1065      1.1  christos 	UNLOCK_GROUP(grp);
   1066      1.1  christos 	return r;
   1067      1.1  christos }
   1068      1.1  christos 
   1069      1.1  christos void
   1070      1.1  christos bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
   1071      1.1  christos     ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
   1072      1.1  christos {
   1073      1.1  christos 	EVUTIL_ASSERT(grp != NULL);
   1074      1.1  christos 	if (total_read_out)
   1075      1.1  christos 		*total_read_out = grp->total_read;
   1076      1.1  christos 	if (total_written_out)
   1077      1.1  christos 		*total_written_out = grp->total_written;
   1078      1.1  christos }
   1079      1.1  christos 
   1080      1.1  christos void
   1081      1.1  christos bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
   1082      1.1  christos {
   1083      1.1  christos 	grp->total_read = grp->total_written = 0;
   1084      1.1  christos }
   1085      1.1  christos 
   1086      1.1  christos int
   1087      1.1  christos bufferevent_ratelim_init_(struct bufferevent_private *bev)
   1088      1.1  christos {
   1089      1.1  christos 	bev->rate_limiting = NULL;
   1090      1.1  christos 	bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
   1091      1.1  christos 	bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
   1092      1.1  christos 
   1093      1.1  christos 	return 0;
   1094      1.1  christos }
   1095