Home | History | Annotate | Line # | Download | only in oqmgr
      1 /*	$NetBSD: qmgr_queue.c,v 1.2 2017/02/14 01:16:46 christos Exp $	*/
      2 
      3 /*++
      4 /* NAME
      5 /*	qmgr_queue 3
      6 /* SUMMARY
      7 /*	per-destination queues
      8 /* SYNOPSIS
      9 /*	#include "qmgr.h"
     10 /*
     11 /*	int	qmgr_queue_count;
     12 /*
     13 /*	QMGR_QUEUE *qmgr_queue_create(transport, name, nexthop)
     14 /*	QMGR_TRANSPORT *transport;
     15 /*	const char *name;
     16 /*	const char *nexthop;
     17 /*
     18 /*	void	qmgr_queue_done(queue)
     19 /*	QMGR_QUEUE *queue;
     20 /*
     21 /*	QMGR_QUEUE *qmgr_queue_find(transport, name)
     22 /*	QMGR_TRANSPORT *transport;
     23 /*	const char *name;
     24 /*
     25 /*	QMGR_QUEUE *qmgr_queue_select(transport)
     26 /*	QMGR_TRANSPORT *transport;
     27 /*
     28 /*	void	qmgr_queue_throttle(queue, dsn)
     29 /*	QMGR_QUEUE *queue;
     30 /*	DSN	*dsn;
     31 /*
     32 /*	void	qmgr_queue_unthrottle(queue)
     33 /*	QMGR_QUEUE *queue;
     34 /*
     35 /*	void	qmgr_queue_suspend(queue, delay)
     36 /*	QMGR_QUEUE *queue;
     37 /*	int	delay;
     38 /* DESCRIPTION
     39 /*	These routines add/delete/manipulate per-destination queues.
     40 /*	Each queue corresponds to a specific transport and destination.
     41 /*	Each queue has a `todo' list of delivery requests for that
     42 /*	destination, and a `busy' list of delivery requests in progress.
     43 /*
     44 /*	qmgr_queue_count is a global counter for the total number
     45 /*	of in-core queue structures.
     46 /*
     47 /*	qmgr_queue_create() creates an empty named queue for the named
     48 /*	transport and destination. The queue is given an initial
     49 /*	concurrency limit as specified with the
     50 /*	\fIinitial_destination_concurrency\fR configuration parameter,
     51 /*	provided that it does not exceed the transport-specific
     52 /*	concurrency limit.
     53 /*
     54 /*	qmgr_queue_done() disposes of a per-destination queue after all
     55 /*	its entries have been taken care of. It is an error to dispose
     56 /*	of a dead queue.
     57 /*
     58 /*	qmgr_queue_find() looks up the named queue for the named
     59 /*	transport. A null result means that the queue was not found.
     60 /*
     61 /*	qmgr_queue_select() uses a round-robin strategy to select
     62 /*	from the named transport one per-destination queue with a
     63 /*	non-empty `todo' list.
     64 /*
     65 /*	qmgr_queue_throttle() handles a delivery error, and decrements the
     66 /*	concurrency limit for the destination, with a lower bound of 1.
     67 /*	When the cohort failure bound is reached, qmgr_queue_throttle()
     68 /*	sets the concurrency limit to zero and starts a timer
     69 /*	to re-enable delivery to the destination after a configurable delay.
     70 /*
     71 /*	qmgr_queue_unthrottle() undoes qmgr_queue_throttle()'s effects.
     72 /*	The concurrency limit for the destination is incremented,
     73 /*	provided that it does not exceed the destination concurrency
     74 /*	limit specified for the transport. This routine implements
     75 /*	"slow open" mode, and eliminates the "thundering herd" problem.
     76 /*
     77 /*	qmgr_queue_suspend() suspends delivery for this destination
     78 /*	briefly.
     79 /* DIAGNOSTICS
     80 /*	Panic: consistency check failure.
     81 /* LICENSE
     82 /* .ad
     83 /* .fi
     84 /*	The Secure Mailer license must be distributed with this software.
     85 /* AUTHOR(S)
     86 /*	Wietse Venema
     87 /*	IBM T.J. Watson Research
     88 /*	P.O. Box 704
     89 /*	Yorktown Heights, NY 10598, USA
     90 /*--*/
     91 
     92 /* System library. */
     93 
     94 #include <sys_defs.h>
     95 #include <time.h>
     96 
     97 /* Utility library. */
     98 
     99 #include <msg.h>
    100 #include <mymalloc.h>
    101 #include <events.h>
    102 #include <htable.h>
    103 
    104 /* Global library. */
    105 
    106 #include <mail_params.h>
    107 #include <recipient_list.h>
    108 #include <mail_proto.h>			/* QMGR_LOG_WINDOW */
    109 
    110 /* Application-specific. */
    111 
    112 #include "qmgr.h"
    113 
/*
 * qmgr_queue_count - number of in-core queue structures; incremented by
 * qmgr_queue_create() and decremented by qmgr_queue_done().
 */
int     qmgr_queue_count;

/*
 * QMGR_ERROR_OR_RETRY_QUEUE - true when the queue belongs to the internal
 * "retry" or "error" transport. Concurrency feedback debug logging is
 * suppressed for those transports.
 */
#define QMGR_ERROR_OR_RETRY_QUEUE(queue) \
	(strcmp((queue)->transport->name, MAIL_SERVICE_RETRY) == 0 \
	    || strcmp((queue)->transport->name, MAIL_SERVICE_ERROR) == 0)

/*
 * QMGR_LOG_FEEDBACK - log one concurrency feedback value when
 * var_conc_feedback_debug is set.
 *
 * NOTE: this macro silently uses the CALLER's local variables "queue" and
 * "myname". Both logging macros are wrapped in do { } while (0) so that
 * each one behaves as exactly one statement. The former bare-"if" expansion
 * misparsed (or failed to compile) when used as the body of an if/else.
 */
#define QMGR_LOG_FEEDBACK(feedback) do { \
	if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
	    msg_info("%s: feedback %g", myname, (feedback)); \
    } while (0)

/*
 * QMGR_LOG_WINDOW - log the queue's concurrency window and feedback state
 * when var_conc_feedback_debug is set. Uses the caller's "myname".
 */
#define QMGR_LOG_WINDOW(queue) do { \
	if (var_conc_feedback_debug && !QMGR_ERROR_OR_RETRY_QUEUE(queue)) \
	    msg_info("%s: queue %s: limit %d window %d success %g failure %g fail_cohorts %g", \
		     myname, (queue)->name, (queue)->transport->dest_concurrency_limit, \
		     (queue)->window, (queue)->success, (queue)->failure, (queue)->fail_cohorts); \
    } while (0)
    129 
    130 /* qmgr_queue_resume - resume delivery to destination */
    131 
    132 static void qmgr_queue_resume(int event, void *context)
    133 {
    134     QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
    135     const char *myname = "qmgr_queue_resume";
    136 
    137     /*
    138      * Sanity checks.
    139      */
    140     if (!QMGR_QUEUE_SUSPENDED(queue))
    141 	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
    142 
    143     /*
    144      * We can't simply force delivery on this queue: the transport's pending
    145      * count may already be maxed out, and there may be other constraints
    146      * that definitely should be none of our business. The best we can do is
    147      * to play by the same rules as everyone else: let qmgr_active_drain()
    148      * and round-robin selection take care of message selection.
    149      */
    150     queue->window = 1;
    151 
    152     /*
    153      * Every event handler that leaves a queue in the "ready" state should
    154      * remove the queue when it is empty.
    155      */
    156     if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
    157 	qmgr_queue_done(queue);
    158 }
    159 
    160 /* qmgr_queue_suspend - briefly suspend a destination */
    161 
    162 void    qmgr_queue_suspend(QMGR_QUEUE *queue, int delay)
    163 {
    164     const char *myname = "qmgr_queue_suspend";
    165 
    166     /*
    167      * Sanity checks.
    168      */
    169     if (!QMGR_QUEUE_READY(queue))
    170 	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
    171     if (queue->busy_refcount > 0)
    172 	msg_panic("%s: queue is busy", myname);
    173 
    174     /*
    175      * Set the queue status to "suspended". No-one is supposed to remove a
    176      * queue in suspended state.
    177      */
    178     queue->window = QMGR_QUEUE_STAT_SUSPENDED;
    179     event_request_timer(qmgr_queue_resume, (void *) queue, delay);
    180 }
    181 
    182 /* qmgr_queue_unthrottle_wrapper - in case (char *) != (struct *) */
    183 
    184 static void qmgr_queue_unthrottle_wrapper(int unused_event, void *context)
    185 {
    186     QMGR_QUEUE *queue = (QMGR_QUEUE *) context;
    187 
    188     /*
    189      * This routine runs when a wakeup timer goes off; it does not run in the
    190      * context of some queue manipulation. Therefore, it is safe to discard
    191      * this in-core queue when it is empty and when this site is not dead.
    192      */
    193     qmgr_queue_unthrottle(queue);
    194     if (QMGR_QUEUE_READY(queue) && queue->todo.next == 0 && queue->busy.next == 0)
    195 	qmgr_queue_done(queue);
    196 }
    197 
    198 /* qmgr_queue_unthrottle - give this destination another chance */
    199 
    200 void    qmgr_queue_unthrottle(QMGR_QUEUE *queue)
    201 {
    202     const char *myname = "qmgr_queue_unthrottle";
    203     QMGR_TRANSPORT *transport = queue->transport;
    204     double  feedback;
    205 
    206     if (msg_verbose)
    207 	msg_info("%s: queue %s", myname, queue->name);
    208 
    209     /*
    210      * Sanity checks.
    211      */
    212     if (!QMGR_QUEUE_THROTTLED(queue) && !QMGR_QUEUE_READY(queue))
    213 	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
    214 
    215     /*
    216      * Don't restart the negative feedback hysteresis cycle with every
    217      * positive feedback. Restart it only when we make a positive concurrency
    218      * adjustment (i.e. at the end of a positive feedback hysteresis cycle).
    219      * Otherwise negative feedback would be too aggressive: negative feedback
    220      * takes effect immediately at the start of its hysteresis cycle.
    221      */
    222     queue->fail_cohorts = 0;
    223 
    224     /*
    225      * Special case when this site was dead.
    226      */
    227     if (QMGR_QUEUE_THROTTLED(queue)) {
    228 	event_cancel_timer(qmgr_queue_unthrottle_wrapper, (void *) queue);
    229 	if (queue->dsn == 0)
    230 	    msg_panic("%s: queue %s: window 0 status 0", myname, queue->name);
    231 	dsn_free(queue->dsn);
    232 	queue->dsn = 0;
    233 	/* Back from the almost grave, best concurrency is anyone's guess. */
    234 	if (queue->busy_refcount > 0)
    235 	    queue->window = queue->busy_refcount;
    236 	else
    237 	    queue->window = transport->init_dest_concurrency;
    238 	queue->success = queue->failure = 0;
    239 	QMGR_LOG_WINDOW(queue);
    240 	return;
    241     }
    242 
    243     /*
    244      * Increase the destination's concurrency limit until we reach the
    245      * transport's concurrency limit. Allow for a margin the size of the
    246      * initial destination concurrency, so that we're not too gentle.
    247      *
    248      * Why is the concurrency increment based on preferred concurrency and not
    249      * on the number of outstanding delivery requests? The latter fluctuates
    250      * wildly when deliveries complete in bursts (artificial benchmark
    251      * measurements), and does not account for cached connections.
    252      *
    253      * Keep the window within reasonable distance from actual concurrency
    254      * otherwise negative feedback will be ineffective. This expression
    255      * assumes that busy_refcount changes gradually. This is invalid when
    256      * deliveries complete in bursts (artificial benchmark measurements).
    257      */
    258     if (transport->dest_concurrency_limit == 0
    259 	|| transport->dest_concurrency_limit > queue->window)
    260 	if (queue->window < queue->busy_refcount + transport->init_dest_concurrency) {
    261 	    feedback = QMGR_FEEDBACK_VAL(transport->pos_feedback, queue->window);
    262 	    QMGR_LOG_FEEDBACK(feedback);
    263 	    queue->success += feedback;
    264 	    /* Prepare for overshoot (feedback > hysteresis, rounding error). */
    265 	    while (queue->success + feedback / 2 >= transport->pos_feedback.hysteresis) {
    266 		queue->window += transport->pos_feedback.hysteresis;
    267 		queue->success -= transport->pos_feedback.hysteresis;
    268 		queue->failure = 0;
    269 	    }
    270 	    /* Prepare for overshoot. */
    271 	    if (transport->dest_concurrency_limit > 0
    272 		&& queue->window > transport->dest_concurrency_limit)
    273 		queue->window = transport->dest_concurrency_limit;
    274 	}
    275     QMGR_LOG_WINDOW(queue);
    276 }
    277 
    278 /* qmgr_queue_throttle - handle destination delivery failure */
    279 
    280 void    qmgr_queue_throttle(QMGR_QUEUE *queue, DSN *dsn)
    281 {
    282     const char *myname = "qmgr_queue_throttle";
    283     QMGR_TRANSPORT *transport = queue->transport;
    284     double  feedback;
    285 
    286     /*
    287      * Sanity checks.
    288      */
    289     if (!QMGR_QUEUE_READY(queue))
    290 	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
    291     if (queue->dsn)
    292 	msg_panic("%s: queue %s: spurious reason %s",
    293 		  myname, queue->name, queue->dsn->reason);
    294     if (msg_verbose)
    295 	msg_info("%s: queue %s: %s %s",
    296 		 myname, queue->name, dsn->status, dsn->reason);
    297 
    298     /*
    299      * Don't restart the positive feedback hysteresis cycle with every
    300      * negative feedback. Restart it only when we make a negative concurrency
    301      * adjustment (i.e. at the start of a negative feedback hysteresis
    302      * cycle). Otherwise positive feedback would be too weak (positive
    303      * feedback does not take effect until the end of its hysteresis cycle).
    304      */
    305 
    306     /*
    307      * This queue is declared dead after a configurable number of
    308      * pseudo-cohort failures.
    309      */
    310     if (QMGR_QUEUE_READY(queue)) {
    311 	queue->fail_cohorts += 1.0 / queue->window;
    312 	if (transport->fail_cohort_limit > 0
    313 	    && queue->fail_cohorts >= transport->fail_cohort_limit)
    314 	    queue->window = QMGR_QUEUE_STAT_THROTTLED;
    315     }
    316 
    317     /*
    318      * Decrease the destination's concurrency limit until we reach 1. Base
    319      * adjustments on the concurrency limit itself, instead of using the
    320      * actual concurrency. The latter fluctuates wildly when deliveries
    321      * complete in bursts (artificial benchmark measurements).
    322      *
    323      * Even after reaching 1, we maintain the negative hysteresis cycle so that
    324      * negative feedback can cancel out positive feedback.
    325      */
    326     if (QMGR_QUEUE_READY(queue)) {
    327 	feedback = QMGR_FEEDBACK_VAL(transport->neg_feedback, queue->window);
    328 	QMGR_LOG_FEEDBACK(feedback);
    329 	queue->failure -= feedback;
    330 	/* Prepare for overshoot (feedback > hysteresis, rounding error). */
    331 	while (queue->failure - feedback / 2 < 0) {
    332 	    queue->window -= transport->neg_feedback.hysteresis;
    333 	    queue->success = 0;
    334 	    queue->failure += transport->neg_feedback.hysteresis;
    335 	}
    336 	/* Prepare for overshoot. */
    337 	if (queue->window < 1)
    338 	    queue->window = 1;
    339     }
    340 
    341     /*
    342      * Special case for a site that just was declared dead.
    343      */
    344     if (QMGR_QUEUE_THROTTLED(queue)) {
    345 	queue->dsn = DSN_COPY(dsn);
    346 	event_request_timer(qmgr_queue_unthrottle_wrapper,
    347 			    (void *) queue, var_min_backoff_time);
    348 	queue->dflags = 0;
    349     }
    350     QMGR_LOG_WINDOW(queue);
    351 }
    352 
    353 /* qmgr_queue_select - select in-core queue for delivery */
    354 
    355 QMGR_QUEUE *qmgr_queue_select(QMGR_TRANSPORT *transport)
    356 {
    357     QMGR_QUEUE *queue;
    358 
    359     /*
    360      * If we find a suitable site, rotate the list to enforce round-robin
    361      * selection. See similar selection code in qmgr_transport_select().
    362      */
    363     for (queue = transport->queue_list.next; queue; queue = queue->peers.next) {
    364 	if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
    365 	    QMGR_LIST_ROTATE(transport->queue_list, queue);
    366 	    if (msg_verbose)
    367 		msg_info("qmgr_queue_select: %s", queue->name);
    368 	    return (queue);
    369 	}
    370     }
    371     return (0);
    372 }
    373 
    374 /* qmgr_queue_done - delete in-core queue for site */
    375 
    376 void    qmgr_queue_done(QMGR_QUEUE *queue)
    377 {
    378     const char *myname = "qmgr_queue_done";
    379     QMGR_TRANSPORT *transport = queue->transport;
    380 
    381     /*
    382      * Sanity checks. It is an error to delete an in-core queue with pending
    383      * messages or timers.
    384      */
    385     if (queue->busy_refcount != 0 || queue->todo_refcount != 0)
    386 	msg_panic("%s: refcount: %d", myname,
    387 		  queue->busy_refcount + queue->todo_refcount);
    388     if (queue->todo.next || queue->busy.next)
    389 	msg_panic("%s: queue not empty: %s", myname, queue->name);
    390     if (!QMGR_QUEUE_READY(queue))
    391 	msg_panic("%s: bad queue status: %s", myname, QMGR_QUEUE_STATUS(queue));
    392     if (queue->dsn)
    393 	msg_panic("%s: queue %s: spurious reason %s",
    394 		  myname, queue->name, queue->dsn->reason);
    395 
    396     /*
    397      * Clean up this in-core queue.
    398      */
    399     QMGR_LIST_UNLINK(transport->queue_list, QMGR_QUEUE *, queue);
    400     htable_delete(transport->queue_byname, queue->name, (void (*) (void *)) 0);
    401     myfree(queue->name);
    402     myfree(queue->nexthop);
    403     qmgr_queue_count--;
    404     myfree((void *) queue);
    405 }
    406 
    407 /* qmgr_queue_create - create in-core queue for site */
    408 
    409 QMGR_QUEUE *qmgr_queue_create(QMGR_TRANSPORT *transport, const char *name,
    410 			              const char *nexthop)
    411 {
    412     QMGR_QUEUE *queue;
    413 
    414     /*
    415      * If possible, choose an initial concurrency of > 1 so that one bad
    416      * message or one bad network won't slow us down unnecessarily.
    417      */
    418 
    419     queue = (QMGR_QUEUE *) mymalloc(sizeof(QMGR_QUEUE));
    420     qmgr_queue_count++;
    421     queue->dflags = 0;
    422     queue->last_done = 0;
    423     queue->name = mystrdup(name);
    424     queue->nexthop = mystrdup(nexthop);
    425     queue->todo_refcount = 0;
    426     queue->busy_refcount = 0;
    427     queue->transport = transport;
    428     queue->window = transport->init_dest_concurrency;
    429     queue->success = queue->failure = queue->fail_cohorts = 0;
    430     QMGR_LIST_INIT(queue->todo);
    431     QMGR_LIST_INIT(queue->busy);
    432     queue->dsn = 0;
    433     queue->clog_time_to_warn = 0;
    434     QMGR_LIST_PREPEND(transport->queue_list, queue);
    435     htable_enter(transport->queue_byname, name, (void *) queue);
    436     return (queue);
    437 }
    438 
    439 /* qmgr_queue_find - find in-core named queue */
    440 
    441 QMGR_QUEUE *qmgr_queue_find(QMGR_TRANSPORT *transport, const char *name)
    442 {
    443     return ((QMGR_QUEUE *) htable_find(transport->queue_byname, name));
    444 }
    445