bufferevent_ratelim.c revision 1.1.1.2 1 1.1 christos /* $NetBSD: bufferevent_ratelim.c,v 1.1.1.2 2014/12/19 20:37:46 christos Exp $ */
2 1.1 christos
3 1.1 christos /*
4 1.1 christos * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
5 1.1 christos * Copyright (c) 2002-2006 Niels Provos <provos (at) citi.umich.edu>
6 1.1 christos * All rights reserved.
7 1.1 christos *
8 1.1 christos * Redistribution and use in source and binary forms, with or without
9 1.1 christos * modification, are permitted provided that the following conditions
10 1.1 christos * are met:
11 1.1 christos * 1. Redistributions of source code must retain the above copyright
12 1.1 christos * notice, this list of conditions and the following disclaimer.
13 1.1 christos * 2. Redistributions in binary form must reproduce the above copyright
14 1.1 christos * notice, this list of conditions and the following disclaimer in the
15 1.1 christos * documentation and/or other materials provided with the distribution.
16 1.1 christos * 3. The name of the author may not be used to endorse or promote products
17 1.1 christos * derived from this software without specific prior written permission.
18 1.1 christos *
19 1.1 christos * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
20 1.1 christos * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
21 1.1 christos * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
22 1.1 christos * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
23 1.1 christos * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
24 1.1 christos * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 1.1 christos * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 1.1 christos * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 1.1 christos * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
28 1.1 christos * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 1.1 christos */
30 1.1 christos #include "evconfig-private.h"
31 1.1 christos
32 1.1 christos #include <sys/types.h>
33 1.1 christos #include <limits.h>
34 1.1 christos #include <string.h>
35 1.1 christos #include <stdlib.h>
36 1.1 christos
37 1.1 christos #include "event2/event.h"
38 1.1 christos #include "event2/event_struct.h"
39 1.1 christos #include "event2/util.h"
40 1.1 christos #include "event2/bufferevent.h"
41 1.1 christos #include "event2/bufferevent_struct.h"
42 1.1 christos #include "event2/buffer.h"
43 1.1 christos
44 1.1 christos #include "ratelim-internal.h"
45 1.1 christos
46 1.1 christos #include "bufferevent-internal.h"
47 1.1 christos #include "mm-internal.h"
48 1.1 christos #include "util-internal.h"
49 1.1 christos #include "event-internal.h"
50 1.1 christos
51 1.1 christos int
52 1.1 christos ev_token_bucket_init_(struct ev_token_bucket *bucket,
53 1.1 christos const struct ev_token_bucket_cfg *cfg,
54 1.1 christos ev_uint32_t current_tick,
55 1.1 christos int reinitialize)
56 1.1 christos {
57 1.1 christos if (reinitialize) {
58 1.1 christos /* on reinitialization, we only clip downwards, since we've
59 1.1 christos already used who-knows-how-much bandwidth this tick. We
60 1.1 christos leave "last_updated" as it is; the next update will add the
61 1.1 christos appropriate amount of bandwidth to the bucket.
62 1.1 christos */
63 1.1 christos if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
64 1.1 christos bucket->read_limit = cfg->read_maximum;
65 1.1 christos if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
66 1.1 christos bucket->write_limit = cfg->write_maximum;
67 1.1 christos } else {
68 1.1 christos bucket->read_limit = cfg->read_rate;
69 1.1 christos bucket->write_limit = cfg->write_rate;
70 1.1 christos bucket->last_updated = current_tick;
71 1.1 christos }
72 1.1 christos return 0;
73 1.1 christos }
74 1.1 christos
75 1.1 christos int
76 1.1 christos ev_token_bucket_update_(struct ev_token_bucket *bucket,
77 1.1 christos const struct ev_token_bucket_cfg *cfg,
78 1.1 christos ev_uint32_t current_tick)
79 1.1 christos {
80 1.1 christos /* It's okay if the tick number overflows, since we'll just
81 1.1 christos * wrap around when we do the unsigned substraction. */
82 1.1 christos unsigned n_ticks = current_tick - bucket->last_updated;
83 1.1 christos
84 1.1 christos /* Make sure some ticks actually happened, and that time didn't
85 1.1 christos * roll back. */
86 1.1 christos if (n_ticks == 0 || n_ticks > INT_MAX)
87 1.1 christos return 0;
88 1.1 christos
89 1.1 christos /* Naively, we would say
90 1.1 christos bucket->limit += n_ticks * cfg->rate;
91 1.1 christos
92 1.1 christos if (bucket->limit > cfg->maximum)
93 1.1 christos bucket->limit = cfg->maximum;
94 1.1 christos
95 1.1 christos But we're worried about overflow, so we do it like this:
96 1.1 christos */
97 1.1 christos
98 1.1 christos if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
99 1.1 christos bucket->read_limit = cfg->read_maximum;
100 1.1 christos else
101 1.1 christos bucket->read_limit += n_ticks * cfg->read_rate;
102 1.1 christos
103 1.1 christos
104 1.1 christos if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
105 1.1 christos bucket->write_limit = cfg->write_maximum;
106 1.1 christos else
107 1.1 christos bucket->write_limit += n_ticks * cfg->write_rate;
108 1.1 christos
109 1.1 christos
110 1.1 christos bucket->last_updated = current_tick;
111 1.1 christos
112 1.1 christos return 1;
113 1.1 christos }
114 1.1 christos
115 1.1 christos static inline void
116 1.1 christos bufferevent_update_buckets(struct bufferevent_private *bev)
117 1.1 christos {
118 1.1 christos /* Must hold lock on bev. */
119 1.1 christos struct timeval now;
120 1.1 christos unsigned tick;
121 1.1 christos event_base_gettimeofday_cached(bev->bev.ev_base, &now);
122 1.1 christos tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
123 1.1 christos if (tick != bev->rate_limiting->limit.last_updated)
124 1.1 christos ev_token_bucket_update_(&bev->rate_limiting->limit,
125 1.1 christos bev->rate_limiting->cfg, tick);
126 1.1 christos }
127 1.1 christos
128 1.1 christos ev_uint32_t
129 1.1 christos ev_token_bucket_get_tick_(const struct timeval *tv,
130 1.1 christos const struct ev_token_bucket_cfg *cfg)
131 1.1 christos {
132 1.1 christos /* This computation uses two multiplies and a divide. We could do
133 1.1 christos * fewer if we knew that the tick length was an integer number of
134 1.1 christos * seconds, or if we knew it divided evenly into a second. We should
135 1.1 christos * investigate that more.
136 1.1 christos */
137 1.1 christos
138 1.1 christos /* We cast to an ev_uint64_t first, since we don't want to overflow
139 1.1 christos * before we do the final divide. */
140 1.1 christos ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
141 1.1 christos return (unsigned)(msec / cfg->msec_per_tick);
142 1.1 christos }
143 1.1 christos
144 1.1 christos struct ev_token_bucket_cfg *
145 1.1 christos ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
146 1.1 christos size_t write_rate, size_t write_burst,
147 1.1 christos const struct timeval *tick_len)
148 1.1 christos {
149 1.1 christos struct ev_token_bucket_cfg *r;
150 1.1 christos struct timeval g;
151 1.1 christos if (! tick_len) {
152 1.1 christos g.tv_sec = 1;
153 1.1 christos g.tv_usec = 0;
154 1.1 christos tick_len = &g;
155 1.1 christos }
156 1.1 christos if (read_rate > read_burst || write_rate > write_burst ||
157 1.1 christos read_rate < 1 || write_rate < 1)
158 1.1 christos return NULL;
159 1.1 christos if (read_rate > EV_RATE_LIMIT_MAX ||
160 1.1 christos write_rate > EV_RATE_LIMIT_MAX ||
161 1.1 christos read_burst > EV_RATE_LIMIT_MAX ||
162 1.1 christos write_burst > EV_RATE_LIMIT_MAX)
163 1.1 christos return NULL;
164 1.1 christos r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
165 1.1 christos if (!r)
166 1.1 christos return NULL;
167 1.1 christos r->read_rate = read_rate;
168 1.1 christos r->write_rate = write_rate;
169 1.1 christos r->read_maximum = read_burst;
170 1.1 christos r->write_maximum = write_burst;
171 1.1 christos memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
172 1.1 christos r->msec_per_tick = (tick_len->tv_sec * 1000) +
173 1.1 christos (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
174 1.1 christos return r;
175 1.1 christos }
176 1.1 christos
/**
 * Release a configuration obtained from ev_token_bucket_cfg_new().
 *
 * @param cfg  the configuration to free; it is handed directly to mm_free()
 */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
182 1.1 christos
/* Defaults for the max_single_read and max_single_write settings. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

/* Acquire / release the lock that protects a rate-limit group. */
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

/* Group-wide suspend/unsuspend helpers, defined further down. */
static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
194 1.1 christos
195 1.1 christos /** Helper: figure out the maximum amount we should write if is_write, or
196 1.1 christos the maximum amount we should read if is_read. Return that maximum, or
197 1.1 christos 0 if our bucket is wholly exhausted.
198 1.1 christos */
199 1.1 christos static inline ev_ssize_t
200 1.1 christos bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
201 1.1 christos {
202 1.1 christos /* needs lock on bev. */
203 1.1 christos ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
204 1.1 christos
205 1.1 christos #define LIM(x) \
206 1.1 christos (is_write ? (x).write_limit : (x).read_limit)
207 1.1 christos
208 1.1 christos #define GROUP_SUSPENDED(g) \
209 1.1 christos (is_write ? (g)->write_suspended : (g)->read_suspended)
210 1.1 christos
211 1.1 christos /* Sets max_so_far to MIN(x, max_so_far) */
212 1.1 christos #define CLAMPTO(x) \
213 1.1 christos do { \
214 1.1 christos if (max_so_far > (x)) \
215 1.1 christos max_so_far = (x); \
216 1.1 christos } while (0);
217 1.1 christos
218 1.1 christos if (!bev->rate_limiting)
219 1.1 christos return max_so_far;
220 1.1 christos
221 1.1 christos /* If rate-limiting is enabled at all, update the appropriate
222 1.1 christos bucket, and take the smaller of our rate limit and the group
223 1.1 christos rate limit.
224 1.1 christos */
225 1.1 christos
226 1.1 christos if (bev->rate_limiting->cfg) {
227 1.1 christos bufferevent_update_buckets(bev);
228 1.1 christos max_so_far = LIM(bev->rate_limiting->limit);
229 1.1 christos }
230 1.1 christos if (bev->rate_limiting->group) {
231 1.1 christos struct bufferevent_rate_limit_group *g =
232 1.1 christos bev->rate_limiting->group;
233 1.1 christos ev_ssize_t share;
234 1.1 christos LOCK_GROUP(g);
235 1.1 christos if (GROUP_SUSPENDED(g)) {
236 1.1 christos /* We can get here if we failed to lock this
237 1.1 christos * particular bufferevent while suspending the whole
238 1.1 christos * group. */
239 1.1 christos if (is_write)
240 1.1 christos bufferevent_suspend_write_(&bev->bev,
241 1.1 christos BEV_SUSPEND_BW_GROUP);
242 1.1 christos else
243 1.1 christos bufferevent_suspend_read_(&bev->bev,
244 1.1 christos BEV_SUSPEND_BW_GROUP);
245 1.1 christos share = 0;
246 1.1 christos } else {
247 1.1 christos /* XXXX probably we should divide among the active
248 1.1 christos * members, not the total members. */
249 1.1 christos share = LIM(g->rate_limit) / g->n_members;
250 1.1 christos if (share < g->min_share)
251 1.1 christos share = g->min_share;
252 1.1 christos }
253 1.1 christos UNLOCK_GROUP(g);
254 1.1 christos CLAMPTO(share);
255 1.1 christos }
256 1.1 christos
257 1.1 christos if (max_so_far < 0)
258 1.1 christos max_so_far = 0;
259 1.1 christos return max_so_far;
260 1.1 christos }
261 1.1 christos
262 1.1 christos ev_ssize_t
263 1.1 christos bufferevent_get_read_max_(struct bufferevent_private *bev)
264 1.1 christos {
265 1.1 christos return bufferevent_get_rlim_max_(bev, 0);
266 1.1 christos }
267 1.1 christos
268 1.1 christos ev_ssize_t
269 1.1 christos bufferevent_get_write_max_(struct bufferevent_private *bev)
270 1.1 christos {
271 1.1 christos return bufferevent_get_rlim_max_(bev, 1);
272 1.1 christos }
273 1.1 christos
274 1.1 christos int
275 1.1 christos bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
276 1.1 christos {
277 1.1 christos /* XXXXX Make sure all users of this function check its return value */
278 1.1 christos int r = 0;
279 1.1 christos /* need to hold lock on bev */
280 1.1 christos if (!bev->rate_limiting)
281 1.1 christos return 0;
282 1.1 christos
283 1.1 christos if (bev->rate_limiting->cfg) {
284 1.1 christos bev->rate_limiting->limit.read_limit -= bytes;
285 1.1 christos if (bev->rate_limiting->limit.read_limit <= 0) {
286 1.1 christos bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
287 1.1 christos if (event_add(&bev->rate_limiting->refill_bucket_event,
288 1.1 christos &bev->rate_limiting->cfg->tick_timeout) < 0)
289 1.1 christos r = -1;
290 1.1 christos } else if (bev->read_suspended & BEV_SUSPEND_BW) {
291 1.1 christos if (!(bev->write_suspended & BEV_SUSPEND_BW))
292 1.1 christos event_del(&bev->rate_limiting->refill_bucket_event);
293 1.1 christos bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
294 1.1 christos }
295 1.1 christos }
296 1.1 christos
297 1.1 christos if (bev->rate_limiting->group) {
298 1.1 christos LOCK_GROUP(bev->rate_limiting->group);
299 1.1 christos bev->rate_limiting->group->rate_limit.read_limit -= bytes;
300 1.1 christos bev->rate_limiting->group->total_read += bytes;
301 1.1 christos if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
302 1.1 christos bev_group_suspend_reading_(bev->rate_limiting->group);
303 1.1 christos } else if (bev->rate_limiting->group->read_suspended) {
304 1.1 christos bev_group_unsuspend_reading_(bev->rate_limiting->group);
305 1.1 christos }
306 1.1 christos UNLOCK_GROUP(bev->rate_limiting->group);
307 1.1 christos }
308 1.1 christos
309 1.1 christos return r;
310 1.1 christos }
311 1.1 christos
312 1.1 christos int
313 1.1 christos bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
314 1.1 christos {
315 1.1 christos /* XXXXX Make sure all users of this function check its return value */
316 1.1 christos int r = 0;
317 1.1 christos /* need to hold lock */
318 1.1 christos if (!bev->rate_limiting)
319 1.1 christos return 0;
320 1.1 christos
321 1.1 christos if (bev->rate_limiting->cfg) {
322 1.1 christos bev->rate_limiting->limit.write_limit -= bytes;
323 1.1 christos if (bev->rate_limiting->limit.write_limit <= 0) {
324 1.1 christos bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
325 1.1 christos if (event_add(&bev->rate_limiting->refill_bucket_event,
326 1.1 christos &bev->rate_limiting->cfg->tick_timeout) < 0)
327 1.1 christos r = -1;
328 1.1 christos } else if (bev->write_suspended & BEV_SUSPEND_BW) {
329 1.1 christos if (!(bev->read_suspended & BEV_SUSPEND_BW))
330 1.1 christos event_del(&bev->rate_limiting->refill_bucket_event);
331 1.1 christos bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
332 1.1 christos }
333 1.1 christos }
334 1.1 christos
335 1.1 christos if (bev->rate_limiting->group) {
336 1.1 christos LOCK_GROUP(bev->rate_limiting->group);
337 1.1 christos bev->rate_limiting->group->rate_limit.write_limit -= bytes;
338 1.1 christos bev->rate_limiting->group->total_written += bytes;
339 1.1 christos if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
340 1.1 christos bev_group_suspend_writing_(bev->rate_limiting->group);
341 1.1 christos } else if (bev->rate_limiting->group->write_suspended) {
342 1.1 christos bev_group_unsuspend_writing_(bev->rate_limiting->group);
343 1.1 christos }
344 1.1 christos UNLOCK_GROUP(bev->rate_limiting->group);
345 1.1 christos }
346 1.1 christos
347 1.1 christos return r;
348 1.1 christos }
349 1.1 christos
350 1.1 christos /** Stop reading on every bufferevent in <b>g</b> */
351 1.1 christos static int
352 1.1 christos bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
353 1.1 christos {
354 1.1 christos /* Needs group lock */
355 1.1 christos struct bufferevent_private *bev;
356 1.1 christos g->read_suspended = 1;
357 1.1 christos g->pending_unsuspend_read = 0;
358 1.1 christos
359 1.1 christos /* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
360 1.1 christos to prevent a deadlock. (Ordinarily, the group lock nests inside
361 1.1 christos the bufferevent locks. If we are unable to lock any individual
362 1.1 christos bufferevent, it will find out later when it looks at its limit
363 1.1 christos and sees that its group is suspended.)
364 1.1 christos */
365 1.1 christos LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
366 1.1 christos if (EVLOCK_TRY_LOCK_(bev->lock)) {
367 1.1 christos bufferevent_suspend_read_(&bev->bev,
368 1.1 christos BEV_SUSPEND_BW_GROUP);
369 1.1 christos EVLOCK_UNLOCK(bev->lock, 0);
370 1.1 christos }
371 1.1 christos }
372 1.1 christos return 0;
373 1.1 christos }
374 1.1 christos
375 1.1 christos /** Stop writing on every bufferevent in <b>g</b> */
376 1.1 christos static int
377 1.1 christos bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
378 1.1 christos {
379 1.1 christos /* Needs group lock */
380 1.1 christos struct bufferevent_private *bev;
381 1.1 christos g->write_suspended = 1;
382 1.1 christos g->pending_unsuspend_write = 0;
383 1.1 christos LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
384 1.1 christos if (EVLOCK_TRY_LOCK_(bev->lock)) {
385 1.1 christos bufferevent_suspend_write_(&bev->bev,
386 1.1 christos BEV_SUSPEND_BW_GROUP);
387 1.1 christos EVLOCK_UNLOCK(bev->lock, 0);
388 1.1 christos }
389 1.1 christos }
390 1.1 christos return 0;
391 1.1 christos }
392 1.1 christos
393 1.1 christos /** Timer callback invoked on a single bufferevent with one or more exhausted
394 1.1 christos buckets when they are ready to refill. */
395 1.1 christos static void
396 1.1 christos bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
397 1.1 christos {
398 1.1 christos unsigned tick;
399 1.1 christos struct timeval now;
400 1.1 christos struct bufferevent_private *bev = arg;
401 1.1 christos int again = 0;
402 1.1 christos BEV_LOCK(&bev->bev);
403 1.1 christos if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
404 1.1 christos BEV_UNLOCK(&bev->bev);
405 1.1 christos return;
406 1.1 christos }
407 1.1 christos
408 1.1 christos /* First, update the bucket */
409 1.1 christos event_base_gettimeofday_cached(bev->bev.ev_base, &now);
410 1.1 christos tick = ev_token_bucket_get_tick_(&now,
411 1.1 christos bev->rate_limiting->cfg);
412 1.1 christos ev_token_bucket_update_(&bev->rate_limiting->limit,
413 1.1 christos bev->rate_limiting->cfg,
414 1.1 christos tick);
415 1.1 christos
416 1.1 christos /* Now unsuspend any read/write operations as appropriate. */
417 1.1 christos if ((bev->read_suspended & BEV_SUSPEND_BW)) {
418 1.1 christos if (bev->rate_limiting->limit.read_limit > 0)
419 1.1 christos bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
420 1.1 christos else
421 1.1 christos again = 1;
422 1.1 christos }
423 1.1 christos if ((bev->write_suspended & BEV_SUSPEND_BW)) {
424 1.1 christos if (bev->rate_limiting->limit.write_limit > 0)
425 1.1 christos bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
426 1.1 christos else
427 1.1 christos again = 1;
428 1.1 christos }
429 1.1 christos if (again) {
430 1.1 christos /* One or more of the buckets may need another refill if they
431 1.1 christos started negative.
432 1.1 christos
433 1.1 christos XXXX if we need to be quiet for more ticks, we should
434 1.1 christos maybe figure out what timeout we really want.
435 1.1 christos */
436 1.1 christos /* XXXX Handle event_add failure somehow */
437 1.1 christos event_add(&bev->rate_limiting->refill_bucket_event,
438 1.1 christos &bev->rate_limiting->cfg->tick_timeout);
439 1.1 christos }
440 1.1 christos BEV_UNLOCK(&bev->bev);
441 1.1 christos }
442 1.1 christos
443 1.1 christos /** Helper: grab a random element from a bufferevent group.
444 1.1 christos *
445 1.1 christos * Requires that we hold the lock on the group.
446 1.1 christos */
447 1.1 christos static struct bufferevent_private *
448 1.1 christos bev_group_random_element_(struct bufferevent_rate_limit_group *group)
449 1.1 christos {
450 1.1 christos int which;
451 1.1 christos struct bufferevent_private *bev;
452 1.1 christos
453 1.1 christos /* requires group lock */
454 1.1 christos
455 1.1 christos if (!group->n_members)
456 1.1 christos return NULL;
457 1.1 christos
458 1.1 christos EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
459 1.1 christos
460 1.1 christos which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
461 1.1 christos
462 1.1 christos bev = LIST_FIRST(&group->members);
463 1.1 christos while (which--)
464 1.1 christos bev = LIST_NEXT(bev, rate_limiting->next_in_group);
465 1.1 christos
466 1.1 christos return bev;
467 1.1 christos }
468 1.1 christos
/** Run 'block' once for every member of the rate-limiting group 'g',
    starting from a randomly chosen member and wrapping around to the head
    of the list.  The expansion site must provide the variables 'g',
    'bev' (set to the current member) and 'first' (set to the starting
    member).

    This is a half-baked attempt at fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		/* random start .. end of list */	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		/* head of list .. just before start */	 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
488 1.1 christos
489 1.1 christos static void
490 1.1 christos bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
491 1.1 christos {
492 1.1 christos int again = 0;
493 1.1 christos struct bufferevent_private *bev, *first;
494 1.1 christos
495 1.1 christos g->read_suspended = 0;
496 1.1 christos FOREACH_RANDOM_ORDER({
497 1.1 christos if (EVLOCK_TRY_LOCK_(bev->lock)) {
498 1.1 christos bufferevent_unsuspend_read_(&bev->bev,
499 1.1 christos BEV_SUSPEND_BW_GROUP);
500 1.1 christos EVLOCK_UNLOCK(bev->lock, 0);
501 1.1 christos } else {
502 1.1 christos again = 1;
503 1.1 christos }
504 1.1 christos });
505 1.1 christos g->pending_unsuspend_read = again;
506 1.1 christos }
507 1.1 christos
508 1.1 christos static void
509 1.1 christos bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
510 1.1 christos {
511 1.1 christos int again = 0;
512 1.1 christos struct bufferevent_private *bev, *first;
513 1.1 christos g->write_suspended = 0;
514 1.1 christos
515 1.1 christos FOREACH_RANDOM_ORDER({
516 1.1 christos if (EVLOCK_TRY_LOCK_(bev->lock)) {
517 1.1 christos bufferevent_unsuspend_write_(&bev->bev,
518 1.1 christos BEV_SUSPEND_BW_GROUP);
519 1.1 christos EVLOCK_UNLOCK(bev->lock, 0);
520 1.1 christos } else {
521 1.1 christos again = 1;
522 1.1 christos }
523 1.1 christos });
524 1.1 christos g->pending_unsuspend_write = again;
525 1.1 christos }
526 1.1 christos
527 1.1 christos /** Callback invoked every tick to add more elements to the group bucket
528 1.1 christos and unsuspend group members as needed.
529 1.1 christos */
530 1.1 christos static void
531 1.1 christos bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
532 1.1 christos {
533 1.1 christos struct bufferevent_rate_limit_group *g = arg;
534 1.1 christos unsigned tick;
535 1.1 christos struct timeval now;
536 1.1 christos
537 1.1 christos event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
538 1.1 christos
539 1.1 christos LOCK_GROUP(g);
540 1.1 christos
541 1.1 christos tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
542 1.1 christos ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
543 1.1 christos
544 1.1 christos if (g->pending_unsuspend_read ||
545 1.1 christos (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
546 1.1 christos bev_group_unsuspend_reading_(g);
547 1.1 christos }
548 1.1 christos if (g->pending_unsuspend_write ||
549 1.1 christos (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
550 1.1 christos bev_group_unsuspend_writing_(g);
551 1.1 christos }
552 1.1 christos
553 1.1 christos /* XXXX Rather than waiting to the next tick to unsuspend stuff
554 1.1 christos * with pending_unsuspend_write/read, we should do it on the
555 1.1 christos * next iteration of the mainloop.
556 1.1 christos */
557 1.1 christos
558 1.1 christos UNLOCK_GROUP(g);
559 1.1 christos }
560 1.1 christos
561 1.1 christos int
562 1.1 christos bufferevent_set_rate_limit(struct bufferevent *bev,
563 1.1 christos struct ev_token_bucket_cfg *cfg)
564 1.1 christos {
565 1.1 christos struct bufferevent_private *bevp =
566 1.1 christos EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
567 1.1 christos int r = -1;
568 1.1 christos struct bufferevent_rate_limit *rlim;
569 1.1 christos struct timeval now;
570 1.1 christos ev_uint32_t tick;
571 1.1 christos int reinit = 0, suspended = 0;
572 1.1 christos /* XXX reference-count cfg */
573 1.1 christos
574 1.1 christos BEV_LOCK(bev);
575 1.1 christos
576 1.1 christos if (cfg == NULL) {
577 1.1 christos if (bevp->rate_limiting) {
578 1.1 christos rlim = bevp->rate_limiting;
579 1.1 christos rlim->cfg = NULL;
580 1.1 christos bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
581 1.1 christos bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
582 1.1 christos if (event_initialized(&rlim->refill_bucket_event))
583 1.1 christos event_del(&rlim->refill_bucket_event);
584 1.1 christos }
585 1.1 christos r = 0;
586 1.1 christos goto done;
587 1.1 christos }
588 1.1 christos
589 1.1 christos event_base_gettimeofday_cached(bev->ev_base, &now);
590 1.1 christos tick = ev_token_bucket_get_tick_(&now, cfg);
591 1.1 christos
592 1.1 christos if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
593 1.1 christos /* no-op */
594 1.1 christos r = 0;
595 1.1 christos goto done;
596 1.1 christos }
597 1.1 christos if (bevp->rate_limiting == NULL) {
598 1.1 christos rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
599 1.1 christos if (!rlim)
600 1.1 christos goto done;
601 1.1 christos bevp->rate_limiting = rlim;
602 1.1 christos } else {
603 1.1 christos rlim = bevp->rate_limiting;
604 1.1 christos }
605 1.1 christos reinit = rlim->cfg != NULL;
606 1.1 christos
607 1.1 christos rlim->cfg = cfg;
608 1.1 christos ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
609 1.1 christos
610 1.1 christos if (reinit) {
611 1.1 christos EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
612 1.1 christos event_del(&rlim->refill_bucket_event);
613 1.1 christos }
614 1.1.1.2 christos event_assign(&rlim->refill_bucket_event, bev->ev_base,
615 1.1.1.2 christos -1, EV_FINALIZE, bev_refill_callback_, bevp);
616 1.1 christos
617 1.1 christos if (rlim->limit.read_limit > 0) {
618 1.1 christos bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
619 1.1 christos } else {
620 1.1 christos bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
621 1.1 christos suspended=1;
622 1.1 christos }
623 1.1 christos if (rlim->limit.write_limit > 0) {
624 1.1 christos bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
625 1.1 christos } else {
626 1.1 christos bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
627 1.1 christos suspended = 1;
628 1.1 christos }
629 1.1 christos
630 1.1 christos if (suspended)
631 1.1 christos event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
632 1.1 christos
633 1.1 christos r = 0;
634 1.1 christos
635 1.1 christos done:
636 1.1 christos BEV_UNLOCK(bev);
637 1.1 christos return r;
638 1.1 christos }
639 1.1 christos
640 1.1 christos struct bufferevent_rate_limit_group *
641 1.1 christos bufferevent_rate_limit_group_new(struct event_base *base,
642 1.1 christos const struct ev_token_bucket_cfg *cfg)
643 1.1 christos {
644 1.1 christos struct bufferevent_rate_limit_group *g;
645 1.1 christos struct timeval now;
646 1.1 christos ev_uint32_t tick;
647 1.1 christos
648 1.1 christos event_base_gettimeofday_cached(base, &now);
649 1.1 christos tick = ev_token_bucket_get_tick_(&now, cfg);
650 1.1 christos
651 1.1 christos g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
652 1.1 christos if (!g)
653 1.1 christos return NULL;
654 1.1 christos memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
655 1.1 christos LIST_INIT(&g->members);
656 1.1 christos
657 1.1 christos ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
658 1.1 christos
659 1.1.1.2 christos event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
660 1.1 christos bev_group_refill_callback_, g);
661 1.1 christos /*XXXX handle event_add failure */
662 1.1 christos event_add(&g->master_refill_event, &cfg->tick_timeout);
663 1.1 christos
664 1.1 christos EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
665 1.1 christos
666 1.1 christos bufferevent_rate_limit_group_set_min_share(g, 64);
667 1.1 christos
668 1.1 christos evutil_weakrand_seed_(&g->weakrand_seed,
669 1.1 christos (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
670 1.1 christos
671 1.1 christos return g;
672 1.1 christos }
673 1.1 christos
674 1.1 christos int
675 1.1 christos bufferevent_rate_limit_group_set_cfg(
676 1.1 christos struct bufferevent_rate_limit_group *g,
677 1.1 christos const struct ev_token_bucket_cfg *cfg)
678 1.1 christos {
679 1.1 christos int same_tick;
680 1.1 christos if (!g || !cfg)
681 1.1 christos return -1;
682 1.1 christos
683 1.1 christos LOCK_GROUP(g);
684 1.1 christos same_tick = evutil_timercmp(
685 1.1 christos &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
686 1.1 christos memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
687 1.1 christos
688 1.1 christos if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
689 1.1 christos g->rate_limit.read_limit = cfg->read_maximum;
690 1.1 christos if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
691 1.1 christos g->rate_limit.write_limit = cfg->write_maximum;
692 1.1 christos
693 1.1 christos if (!same_tick) {
694 1.1 christos /* This can cause a hiccup in the schedule */
695 1.1 christos event_add(&g->master_refill_event, &cfg->tick_timeout);
696 1.1 christos }
697 1.1 christos
698 1.1 christos /* The new limits might force us to adjust min_share differently. */
699 1.1 christos bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
700 1.1 christos
701 1.1 christos UNLOCK_GROUP(g);
702 1.1 christos return 0;
703 1.1 christos }
704 1.1 christos
705 1.1 christos int
706 1.1 christos bufferevent_rate_limit_group_set_min_share(
707 1.1 christos struct bufferevent_rate_limit_group *g,
708 1.1 christos size_t share)
709 1.1 christos {
710 1.1 christos if (share > EV_SSIZE_MAX)
711 1.1 christos return -1;
712 1.1 christos
713 1.1 christos g->configured_min_share = share;
714 1.1 christos
715 1.1 christos /* Can't set share to less than the one-tick maximum. IOW, at steady
716 1.1 christos * state, at least one connection can go per tick. */
717 1.1 christos if (share > g->rate_limit_cfg.read_rate)
718 1.1 christos share = g->rate_limit_cfg.read_rate;
719 1.1 christos if (share > g->rate_limit_cfg.write_rate)
720 1.1 christos share = g->rate_limit_cfg.write_rate;
721 1.1 christos
722 1.1 christos g->min_share = share;
723 1.1 christos return 0;
724 1.1 christos }
725 1.1 christos
726 1.1 christos void
727 1.1 christos bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
728 1.1 christos {
729 1.1 christos LOCK_GROUP(g);
730 1.1 christos EVUTIL_ASSERT(0 == g->n_members);
731 1.1 christos event_del(&g->master_refill_event);
732 1.1 christos UNLOCK_GROUP(g);
733 1.1 christos EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
734 1.1 christos mm_free(g);
735 1.1 christos }
736 1.1 christos
737 1.1 christos int
738 1.1 christos bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
739 1.1 christos struct bufferevent_rate_limit_group *g)
740 1.1 christos {
741 1.1 christos int wsuspend, rsuspend;
742 1.1 christos struct bufferevent_private *bevp =
743 1.1 christos EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
744 1.1 christos BEV_LOCK(bev);
745 1.1 christos
746 1.1 christos if (!bevp->rate_limiting) {
747 1.1 christos struct bufferevent_rate_limit *rlim;
748 1.1 christos rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
749 1.1 christos if (!rlim) {
750 1.1 christos BEV_UNLOCK(bev);
751 1.1 christos return -1;
752 1.1 christos }
753 1.1.1.2 christos event_assign(&rlim->refill_bucket_event, bev->ev_base,
754 1.1.1.2 christos -1, EV_FINALIZE, bev_refill_callback_, bevp);
755 1.1 christos bevp->rate_limiting = rlim;
756 1.1 christos }
757 1.1 christos
758 1.1 christos if (bevp->rate_limiting->group == g) {
759 1.1 christos BEV_UNLOCK(bev);
760 1.1 christos return 0;
761 1.1 christos }
762 1.1 christos if (bevp->rate_limiting->group)
763 1.1 christos bufferevent_remove_from_rate_limit_group(bev);
764 1.1 christos
765 1.1 christos LOCK_GROUP(g);
766 1.1 christos bevp->rate_limiting->group = g;
767 1.1 christos ++g->n_members;
768 1.1 christos LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
769 1.1 christos
770 1.1 christos rsuspend = g->read_suspended;
771 1.1 christos wsuspend = g->write_suspended;
772 1.1 christos
773 1.1 christos UNLOCK_GROUP(g);
774 1.1 christos
775 1.1 christos if (rsuspend)
776 1.1 christos bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
777 1.1 christos if (wsuspend)
778 1.1 christos bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
779 1.1 christos
780 1.1 christos BEV_UNLOCK(bev);
781 1.1 christos return 0;
782 1.1 christos }
783 1.1 christos
/**
 * Remove a bufferevent from its rate-limit group, if any, and clear any
 * group-imposed read/write suspension on it.
 *
 * @param bev  bufferevent to remove
 * @return the result of the internal removal routine
 */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	const int unsuspend = 1;

	return bufferevent_remove_from_rate_limit_group_internal_(bev,
	    unsuspend);
}
789 1.1 christos
790 1.1 christos int
791 1.1 christos bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
792 1.1 christos int unsuspend)
793 1.1 christos {
794 1.1 christos struct bufferevent_private *bevp =
795 1.1 christos EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
796 1.1 christos BEV_LOCK(bev);
797 1.1 christos if (bevp->rate_limiting && bevp->rate_limiting->group) {
798 1.1 christos struct bufferevent_rate_limit_group *g =
799 1.1 christos bevp->rate_limiting->group;
800 1.1 christos LOCK_GROUP(g);
801 1.1 christos bevp->rate_limiting->group = NULL;
802 1.1 christos --g->n_members;
803 1.1 christos LIST_REMOVE(bevp, rate_limiting->next_in_group);
804 1.1 christos UNLOCK_GROUP(g);
805 1.1 christos }
806 1.1 christos if (unsuspend) {
807 1.1 christos bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
808 1.1 christos bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
809 1.1 christos }
810 1.1 christos BEV_UNLOCK(bev);
811 1.1 christos return 0;
812 1.1 christos }
813 1.1 christos
814 1.1 christos /* ===
815 1.1 christos * API functions to expose rate limits.
816 1.1 christos *
817 1.1 christos * Don't use these from inside Libevent; they're meant to be for use by
818 1.1 christos * the program.
819 1.1 christos * === */
820 1.1 christos
821 1.1 christos /* Mostly you don't want to use this function from inside libevent;
822 1.1 christos * bufferevent_get_read_max_() is more likely what you want*/
823 1.1 christos ev_ssize_t
824 1.1 christos bufferevent_get_read_limit(struct bufferevent *bev)
825 1.1 christos {
826 1.1 christos ev_ssize_t r;
827 1.1 christos struct bufferevent_private *bevp;
828 1.1 christos BEV_LOCK(bev);
829 1.1 christos bevp = BEV_UPCAST(bev);
830 1.1 christos if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
831 1.1 christos bufferevent_update_buckets(bevp);
832 1.1 christos r = bevp->rate_limiting->limit.read_limit;
833 1.1 christos } else {
834 1.1 christos r = EV_SSIZE_MAX;
835 1.1 christos }
836 1.1 christos BEV_UNLOCK(bev);
837 1.1 christos return r;
838 1.1 christos }
839 1.1 christos
840 1.1 christos /* Mostly you don't want to use this function from inside libevent;
841 1.1 christos * bufferevent_get_write_max_() is more likely what you want*/
842 1.1 christos ev_ssize_t
843 1.1 christos bufferevent_get_write_limit(struct bufferevent *bev)
844 1.1 christos {
845 1.1 christos ev_ssize_t r;
846 1.1 christos struct bufferevent_private *bevp;
847 1.1 christos BEV_LOCK(bev);
848 1.1 christos bevp = BEV_UPCAST(bev);
849 1.1 christos if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
850 1.1 christos bufferevent_update_buckets(bevp);
851 1.1 christos r = bevp->rate_limiting->limit.write_limit;
852 1.1 christos } else {
853 1.1 christos r = EV_SSIZE_MAX;
854 1.1 christos }
855 1.1 christos BEV_UNLOCK(bev);
856 1.1 christos return r;
857 1.1 christos }
858 1.1 christos
859 1.1 christos int
860 1.1 christos bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
861 1.1 christos {
862 1.1 christos struct bufferevent_private *bevp;
863 1.1 christos BEV_LOCK(bev);
864 1.1 christos bevp = BEV_UPCAST(bev);
865 1.1 christos if (size == 0 || size > EV_SSIZE_MAX)
866 1.1 christos bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
867 1.1 christos else
868 1.1 christos bevp->max_single_read = size;
869 1.1 christos BEV_UNLOCK(bev);
870 1.1 christos return 0;
871 1.1 christos }
872 1.1 christos
873 1.1 christos int
874 1.1 christos bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
875 1.1 christos {
876 1.1 christos struct bufferevent_private *bevp;
877 1.1 christos BEV_LOCK(bev);
878 1.1 christos bevp = BEV_UPCAST(bev);
879 1.1 christos if (size == 0 || size > EV_SSIZE_MAX)
880 1.1 christos bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
881 1.1 christos else
882 1.1 christos bevp->max_single_write = size;
883 1.1 christos BEV_UNLOCK(bev);
884 1.1 christos return 0;
885 1.1 christos }
886 1.1 christos
887 1.1 christos ev_ssize_t
888 1.1 christos bufferevent_get_max_single_read(struct bufferevent *bev)
889 1.1 christos {
890 1.1 christos ev_ssize_t r;
891 1.1 christos
892 1.1 christos BEV_LOCK(bev);
893 1.1 christos r = BEV_UPCAST(bev)->max_single_read;
894 1.1 christos BEV_UNLOCK(bev);
895 1.1 christos return r;
896 1.1 christos }
897 1.1 christos
898 1.1 christos ev_ssize_t
899 1.1 christos bufferevent_get_max_single_write(struct bufferevent *bev)
900 1.1 christos {
901 1.1 christos ev_ssize_t r;
902 1.1 christos
903 1.1 christos BEV_LOCK(bev);
904 1.1 christos r = BEV_UPCAST(bev)->max_single_write;
905 1.1 christos BEV_UNLOCK(bev);
906 1.1 christos return r;
907 1.1 christos }
908 1.1 christos
909 1.1 christos ev_ssize_t
910 1.1 christos bufferevent_get_max_to_read(struct bufferevent *bev)
911 1.1 christos {
912 1.1 christos ev_ssize_t r;
913 1.1 christos BEV_LOCK(bev);
914 1.1 christos r = bufferevent_get_read_max_(BEV_UPCAST(bev));
915 1.1 christos BEV_UNLOCK(bev);
916 1.1 christos return r;
917 1.1 christos }
918 1.1 christos
919 1.1 christos ev_ssize_t
920 1.1 christos bufferevent_get_max_to_write(struct bufferevent *bev)
921 1.1 christos {
922 1.1 christos ev_ssize_t r;
923 1.1 christos BEV_LOCK(bev);
924 1.1 christos r = bufferevent_get_write_max_(BEV_UPCAST(bev));
925 1.1 christos BEV_UNLOCK(bev);
926 1.1 christos return r;
927 1.1 christos }
928 1.1 christos
929 1.1.1.2 christos const struct ev_token_bucket_cfg *
930 1.1.1.2 christos bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
931 1.1.1.2 christos struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
932 1.1.1.2 christos struct ev_token_bucket_cfg *cfg;
933 1.1.1.2 christos
934 1.1.1.2 christos BEV_LOCK(bev);
935 1.1.1.2 christos
936 1.1.1.2 christos if (bufev_private->rate_limiting) {
937 1.1.1.2 christos cfg = bufev_private->rate_limiting->cfg;
938 1.1.1.2 christos } else {
939 1.1.1.2 christos cfg = NULL;
940 1.1.1.2 christos }
941 1.1.1.2 christos
942 1.1.1.2 christos BEV_UNLOCK(bev);
943 1.1.1.2 christos
944 1.1.1.2 christos return cfg;
945 1.1.1.2 christos }
946 1.1 christos
947 1.1 christos /* Mostly you don't want to use this function from inside libevent;
948 1.1 christos * bufferevent_get_read_max_() is more likely what you want*/
949 1.1 christos ev_ssize_t
950 1.1 christos bufferevent_rate_limit_group_get_read_limit(
951 1.1 christos struct bufferevent_rate_limit_group *grp)
952 1.1 christos {
953 1.1 christos ev_ssize_t r;
954 1.1 christos LOCK_GROUP(grp);
955 1.1 christos r = grp->rate_limit.read_limit;
956 1.1 christos UNLOCK_GROUP(grp);
957 1.1 christos return r;
958 1.1 christos }
959 1.1 christos
960 1.1 christos /* Mostly you don't want to use this function from inside libevent;
961 1.1 christos * bufferevent_get_write_max_() is more likely what you want. */
962 1.1 christos ev_ssize_t
963 1.1 christos bufferevent_rate_limit_group_get_write_limit(
964 1.1 christos struct bufferevent_rate_limit_group *grp)
965 1.1 christos {
966 1.1 christos ev_ssize_t r;
967 1.1 christos LOCK_GROUP(grp);
968 1.1 christos r = grp->rate_limit.write_limit;
969 1.1 christos UNLOCK_GROUP(grp);
970 1.1 christos return r;
971 1.1 christos }
972 1.1 christos
973 1.1 christos int
974 1.1 christos bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
975 1.1 christos {
976 1.1 christos int r = 0;
977 1.1 christos ev_ssize_t old_limit, new_limit;
978 1.1 christos struct bufferevent_private *bevp;
979 1.1 christos BEV_LOCK(bev);
980 1.1 christos bevp = BEV_UPCAST(bev);
981 1.1 christos EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
982 1.1 christos old_limit = bevp->rate_limiting->limit.read_limit;
983 1.1 christos
984 1.1 christos new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
985 1.1 christos if (old_limit > 0 && new_limit <= 0) {
986 1.1 christos bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
987 1.1 christos if (event_add(&bevp->rate_limiting->refill_bucket_event,
988 1.1 christos &bevp->rate_limiting->cfg->tick_timeout) < 0)
989 1.1 christos r = -1;
990 1.1 christos } else if (old_limit <= 0 && new_limit > 0) {
991 1.1 christos if (!(bevp->write_suspended & BEV_SUSPEND_BW))
992 1.1 christos event_del(&bevp->rate_limiting->refill_bucket_event);
993 1.1 christos bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
994 1.1 christos }
995 1.1 christos
996 1.1 christos BEV_UNLOCK(bev);
997 1.1 christos return r;
998 1.1 christos }
999 1.1 christos
1000 1.1 christos int
1001 1.1 christos bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
1002 1.1 christos {
1003 1.1 christos /* XXXX this is mostly copy-and-paste from
1004 1.1 christos * bufferevent_decrement_read_limit */
1005 1.1 christos int r = 0;
1006 1.1 christos ev_ssize_t old_limit, new_limit;
1007 1.1 christos struct bufferevent_private *bevp;
1008 1.1 christos BEV_LOCK(bev);
1009 1.1 christos bevp = BEV_UPCAST(bev);
1010 1.1 christos EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
1011 1.1 christos old_limit = bevp->rate_limiting->limit.write_limit;
1012 1.1 christos
1013 1.1 christos new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
1014 1.1 christos if (old_limit > 0 && new_limit <= 0) {
1015 1.1 christos bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
1016 1.1 christos if (event_add(&bevp->rate_limiting->refill_bucket_event,
1017 1.1 christos &bevp->rate_limiting->cfg->tick_timeout) < 0)
1018 1.1 christos r = -1;
1019 1.1 christos } else if (old_limit <= 0 && new_limit > 0) {
1020 1.1 christos if (!(bevp->read_suspended & BEV_SUSPEND_BW))
1021 1.1 christos event_del(&bevp->rate_limiting->refill_bucket_event);
1022 1.1 christos bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
1023 1.1 christos }
1024 1.1 christos
1025 1.1 christos BEV_UNLOCK(bev);
1026 1.1 christos return r;
1027 1.1 christos }
1028 1.1 christos
1029 1.1 christos int
1030 1.1 christos bufferevent_rate_limit_group_decrement_read(
1031 1.1 christos struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1032 1.1 christos {
1033 1.1 christos int r = 0;
1034 1.1 christos ev_ssize_t old_limit, new_limit;
1035 1.1 christos LOCK_GROUP(grp);
1036 1.1 christos old_limit = grp->rate_limit.read_limit;
1037 1.1 christos new_limit = (grp->rate_limit.read_limit -= decr);
1038 1.1 christos
1039 1.1 christos if (old_limit > 0 && new_limit <= 0) {
1040 1.1 christos bev_group_suspend_reading_(grp);
1041 1.1 christos } else if (old_limit <= 0 && new_limit > 0) {
1042 1.1 christos bev_group_unsuspend_reading_(grp);
1043 1.1 christos }
1044 1.1 christos
1045 1.1 christos UNLOCK_GROUP(grp);
1046 1.1 christos return r;
1047 1.1 christos }
1048 1.1 christos
1049 1.1 christos int
1050 1.1 christos bufferevent_rate_limit_group_decrement_write(
1051 1.1 christos struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1052 1.1 christos {
1053 1.1 christos int r = 0;
1054 1.1 christos ev_ssize_t old_limit, new_limit;
1055 1.1 christos LOCK_GROUP(grp);
1056 1.1 christos old_limit = grp->rate_limit.write_limit;
1057 1.1 christos new_limit = (grp->rate_limit.write_limit -= decr);
1058 1.1 christos
1059 1.1 christos if (old_limit > 0 && new_limit <= 0) {
1060 1.1 christos bev_group_suspend_writing_(grp);
1061 1.1 christos } else if (old_limit <= 0 && new_limit > 0) {
1062 1.1 christos bev_group_unsuspend_writing_(grp);
1063 1.1 christos }
1064 1.1 christos
1065 1.1 christos UNLOCK_GROUP(grp);
1066 1.1 christos return r;
1067 1.1 christos }
1068 1.1 christos
1069 1.1 christos void
1070 1.1 christos bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
1071 1.1 christos ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
1072 1.1 christos {
1073 1.1 christos EVUTIL_ASSERT(grp != NULL);
1074 1.1 christos if (total_read_out)
1075 1.1 christos *total_read_out = grp->total_read;
1076 1.1 christos if (total_written_out)
1077 1.1 christos *total_written_out = grp->total_written;
1078 1.1 christos }
1079 1.1 christos
1080 1.1 christos void
1081 1.1 christos bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1082 1.1 christos {
1083 1.1 christos grp->total_read = grp->total_written = 0;
1084 1.1 christos }
1085 1.1 christos
1086 1.1 christos int
1087 1.1 christos bufferevent_ratelim_init_(struct bufferevent_private *bev)
1088 1.1 christos {
1089 1.1 christos bev->rate_limiting = NULL;
1090 1.1 christos bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
1091 1.1 christos bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
1092 1.1 christos
1093 1.1 christos return 0;
1094 1.1 christos }
1095