Home | History | Annotate | Line # | Download | only in dist
evrpc.c revision 1.2
      1 /*	$NetBSD: evrpc.c,v 1.2 2013/04/11 16:56:41 christos Exp $	*/
      2 /*
      3  * Copyright (c) 2000-2007 Niels Provos <provos (at) citi.umich.edu>
      4  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
      5  *
      6  * Redistribution and use in source and binary forms, with or without
      7  * modification, are permitted provided that the following conditions
      8  * are met:
      9  * 1. Redistributions of source code must retain the above copyright
     10  *    notice, this list of conditions and the following disclaimer.
     11  * 2. Redistributions in binary form must reproduce the above copyright
     12  *    notice, this list of conditions and the following disclaimer in the
     13  *    documentation and/or other materials provided with the distribution.
     14  * 3. The name of the author may not be used to endorse or promote products
     15  *    derived from this software without specific prior written permission.
     16  *
     17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     27  */
     28 #include "event2/event-config.h"
     29 #include <sys/cdefs.h>
     30 __RCSID("$NetBSD: evrpc.c,v 1.2 2013/04/11 16:56:41 christos Exp $");
     31 
     32 #ifdef WIN32
     33 #define WIN32_LEAN_AND_MEAN
     34 #include <winsock2.h>
     35 #include <windows.h>
     36 #undef WIN32_LEAN_AND_MEAN
     37 #endif
     38 
     39 #include <sys/types.h>
     40 #ifndef WIN32
     41 #include <sys/socket.h>
     42 #endif
     43 #ifdef _EVENT_HAVE_SYS_TIME_H
     44 #include <sys/time.h>
     45 #endif
     46 #include <sys/queue.h>
     47 #include <stdio.h>
     48 #include <stdlib.h>
     49 #ifndef WIN32
     50 #include <unistd.h>
     51 #endif
     52 #include <errno.h>
     53 #include <signal.h>
     54 #include <string.h>
     55 
     56 #include <sys/queue.h>
     57 
     58 #include "event2/event.h"
     59 #include "event2/event_struct.h"
     60 #include "event2/rpc.h"
     61 #include "event2/rpc_struct.h"
     62 #include "evrpc-internal.h"
     63 #include "event2/http.h"
     64 #include "event2/buffer.h"
     65 #include "event2/tag.h"
     66 #include "event2/http_struct.h"
     67 #include "event2/http_compat.h"
     68 #include "event2/util.h"
     69 #include "util-internal.h"
     70 #include "log-internal.h"
     71 #include "mm-internal.h"
     72 
     73 struct evrpc_base *
     74 evrpc_init(struct evhttp *http_server)
     75 {
     76 	struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base));
     77 	if (base == NULL)
     78 		return (NULL);
     79 
     80 	/* we rely on the tagging sub system */
     81 	evtag_init();
     82 
     83 	TAILQ_INIT(&base->registered_rpcs);
     84 	TAILQ_INIT(&base->input_hooks);
     85 	TAILQ_INIT(&base->output_hooks);
     86 
     87 	TAILQ_INIT(&base->paused_requests);
     88 
     89 	base->http_server = http_server;
     90 
     91 	return (base);
     92 }
     93 
     94 void
     95 evrpc_free(struct evrpc_base *base)
     96 {
     97 	struct evrpc *rpc;
     98 	struct evrpc_hook *hook;
     99 	struct evrpc_hook_ctx *paused;
    100 	int r;
    101 
    102 	while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
    103 		r = evrpc_unregister_rpc(base, rpc->uri);
    104 		EVUTIL_ASSERT(r == 0);
    105 	}
    106 	while ((paused = TAILQ_FIRST(&base->paused_requests)) != NULL) {
    107 		TAILQ_REMOVE(&base->paused_requests, paused, next);
    108 		mm_free(paused);
    109 	}
    110 	while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
    111 		r = evrpc_remove_hook(base, EVRPC_INPUT, hook);
    112 		EVUTIL_ASSERT(r);
    113 	}
    114 	while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
    115 		r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook);
    116 		EVUTIL_ASSERT(r);
    117 	}
    118 	mm_free(base);
    119 }
    120 
    121 void *
    122 evrpc_add_hook(void *vbase,
    123     enum EVRPC_HOOK_TYPE hook_type,
    124     int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *),
    125     void *cb_arg)
    126 {
    127 	struct _evrpc_hooks *base = vbase;
    128 	struct evrpc_hook_list *head = NULL;
    129 	struct evrpc_hook *hook = NULL;
    130 	switch (hook_type) {
    131 	case EVRPC_INPUT:
    132 		head = &base->in_hooks;
    133 		break;
    134 	case EVRPC_OUTPUT:
    135 		head = &base->out_hooks;
    136 		break;
    137 	default:
    138 		EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
    139 	}
    140 
    141 	hook = mm_calloc(1, sizeof(struct evrpc_hook));
    142 	EVUTIL_ASSERT(hook != NULL);
    143 
    144 	hook->process = cb;
    145 	hook->process_arg = cb_arg;
    146 	TAILQ_INSERT_TAIL(head, hook, next);
    147 
    148 	return (hook);
    149 }
    150 
    151 static int
    152 evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
    153 {
    154 	struct evrpc_hook *hook = NULL;
    155 	TAILQ_FOREACH(hook, head, next) {
    156 		if (hook == handle) {
    157 			TAILQ_REMOVE(head, hook, next);
    158 			mm_free(hook);
    159 			return (1);
    160 		}
    161 	}
    162 
    163 	return (0);
    164 }
    165 
    166 /*
    167  * remove the hook specified by the handle
    168  */
    169 
    170 int
    171 evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
    172 {
    173 	struct _evrpc_hooks *base = vbase;
    174 	struct evrpc_hook_list *head = NULL;
    175 	switch (hook_type) {
    176 	case EVRPC_INPUT:
    177 		head = &base->in_hooks;
    178 		break;
    179 	case EVRPC_OUTPUT:
    180 		head = &base->out_hooks;
    181 		break;
    182 	default:
    183 		EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
    184 	}
    185 
    186 	return (evrpc_remove_hook_internal(head, handle));
    187 }
    188 
    189 static int
    190 evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx,
    191     struct evhttp_request *req, struct evbuffer *evbuf)
    192 {
    193 	struct evrpc_hook *hook;
    194 	TAILQ_FOREACH(hook, head, next) {
    195 		int res = hook->process(ctx, req, evbuf, hook->process_arg);
    196 		if (res != EVRPC_CONTINUE)
    197 			return (res);
    198 	}
    199 
    200 	return (EVRPC_CONTINUE);
    201 }
    202 
    203 static void evrpc_pool_schedule(struct evrpc_pool *pool);
    204 static void evrpc_request_cb(struct evhttp_request *, void *);
    205 
    206 /*
    207  * Registers a new RPC with the HTTP server.   The evrpc object is expected
    208  * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
    209  * calls this function.
    210  */
    211 
    212 static char *
    213 evrpc_construct_uri(const char *uri)
    214 {
    215 	char *constructed_uri;
    216 	size_t constructed_uri_len;
    217 
    218 	constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
    219 	if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL)
    220 		event_err(1, "%s: failed to register rpc at %s",
    221 		    __func__, uri);
    222 	memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
    223 	memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
    224 	constructed_uri[constructed_uri_len - 1] = '\0';
    225 
    226 	return (constructed_uri);
    227 }
    228 
    229 int
    230 evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
    231     void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
    232 {
    233 	char *constructed_uri = evrpc_construct_uri(rpc->uri);
    234 
    235 	rpc->base = base;
    236 	rpc->cb = cb;
    237 	rpc->cb_arg = cb_arg;
    238 
    239 	TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
    240 
    241 	evhttp_set_cb(base->http_server,
    242 	    constructed_uri,
    243 	    evrpc_request_cb,
    244 	    rpc);
    245 
    246 	mm_free(constructed_uri);
    247 
    248 	return (0);
    249 }
    250 
    251 int
    252 evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
    253 {
    254 	char *registered_uri = NULL;
    255 	struct evrpc *rpc;
    256 	int r;
    257 
    258 	/* find the right rpc; linear search might be slow */
    259 	TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
    260 		if (strcmp(rpc->uri, name) == 0)
    261 			break;
    262 	}
    263 	if (rpc == NULL) {
    264 		/* We did not find an RPC with this name */
    265 		return (-1);
    266 	}
    267 	TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
    268 
    269 	registered_uri = evrpc_construct_uri(name);
    270 
    271 	/* remove the http server callback */
    272 	r = evhttp_del_cb(base->http_server, registered_uri);
    273 	EVUTIL_ASSERT(r == 0);
    274 
    275 	mm_free(registered_uri);
    276 
    277 	mm_free(__UNCONST(rpc->uri));
    278 	mm_free(rpc);
    279 	return (0);
    280 }
    281 
    282 static int evrpc_pause_request(void *vbase, void *ctx,
    283     void (*cb)(void *, enum EVRPC_HOOK_RESULT));
    284 static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT);
    285 
    286 static void
    287 evrpc_request_cb(struct evhttp_request *req, void *arg)
    288 {
    289 	struct evrpc *rpc = arg;
    290 	struct evrpc_req_generic *rpc_state = NULL;
    291 
    292 	/* let's verify the outside parameters */
    293 	if (req->type != EVHTTP_REQ_POST ||
    294 	    evbuffer_get_length(req->input_buffer) <= 0)
    295 		goto error;
    296 
    297 	rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic));
    298 	if (rpc_state == NULL)
    299 		goto error;
    300 	rpc_state->rpc = rpc;
    301 	rpc_state->http_req = req;
    302 	rpc_state->rpc_data = NULL;
    303 
    304 	if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) {
    305 		int hook_res;
    306 
    307 		evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
    308 
    309 		/*
    310 		 * allow hooks to modify the outgoing request
    311 		 */
    312 		hook_res = evrpc_process_hooks(&rpc->base->input_hooks,
    313 		    rpc_state, req, req->input_buffer);
    314 		switch (hook_res) {
    315 		case EVRPC_TERMINATE:
    316 			goto error;
    317 		case EVRPC_PAUSE:
    318 			evrpc_pause_request(rpc->base, rpc_state,
    319 			    evrpc_request_cb_closure);
    320 			return;
    321 		case EVRPC_CONTINUE:
    322 			break;
    323 		default:
    324 			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
    325 			    hook_res == EVRPC_CONTINUE ||
    326 			    hook_res == EVRPC_PAUSE);
    327 		}
    328 	}
    329 
    330 	evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE);
    331 	return;
    332 
    333 error:
    334 	if (rpc_state != NULL)
    335 		evrpc_reqstate_free(rpc_state);
    336 	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
    337 	return;
    338 }
    339 
    340 static void
    341 evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
    342 {
    343 	struct evrpc_req_generic *rpc_state = arg;
    344 	struct evrpc *rpc;
    345 	struct evhttp_request *req;
    346 
    347 	EVUTIL_ASSERT(rpc_state);
    348 	rpc = rpc_state->rpc;
    349 	req = rpc_state->http_req;
    350 
    351 	if (hook_res == EVRPC_TERMINATE)
    352 		goto error;
    353 
    354 	/* let's check that we can parse the request */
    355 	rpc_state->request = rpc->request_new(rpc->request_new_arg);
    356 	if (rpc_state->request == NULL)
    357 		goto error;
    358 
    359 	if (rpc->request_unmarshal(
    360 		    rpc_state->request, req->input_buffer) == -1) {
    361 		/* we failed to parse the request; that's a bummer */
    362 		goto error;
    363 	}
    364 
    365 	/* at this point, we have a well formed request, prepare the reply */
    366 
    367 	rpc_state->reply = rpc->reply_new(rpc->reply_new_arg);
    368 	if (rpc_state->reply == NULL)
    369 		goto error;
    370 
    371 	/* give the rpc to the user; they can deal with it */
    372 	rpc->cb(rpc_state, rpc->cb_arg);
    373 
    374 	return;
    375 
    376 error:
    377 	if (rpc_state != NULL)
    378 		evrpc_reqstate_free(rpc_state);
    379 	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
    380 	return;
    381 }
    382 
    383 
    384 void
    385 evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
    386 {
    387 	struct evrpc *rpc;
    388 	EVUTIL_ASSERT(rpc_state != NULL);
    389 	rpc = rpc_state->rpc;
    390 
    391 	/* clean up all memory */
    392 	if (rpc_state->hook_meta != NULL)
    393 		evrpc_hook_context_free(rpc_state->hook_meta);
    394 	if (rpc_state->request != NULL)
    395 		rpc->request_free(rpc_state->request);
    396 	if (rpc_state->reply != NULL)
    397 		rpc->reply_free(rpc_state->reply);
    398 	if (rpc_state->rpc_data != NULL)
    399 		evbuffer_free(rpc_state->rpc_data);
    400 	mm_free(rpc_state);
    401 }
    402 
    403 static void
    404 evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT);
    405 
    406 void
    407 evrpc_request_done(struct evrpc_req_generic *rpc_state)
    408 {
    409 	struct evhttp_request *req;
    410 	struct evrpc *rpc;
    411 
    412 	EVUTIL_ASSERT(rpc_state);
    413 
    414 	req = rpc_state->http_req;
    415 	rpc = rpc_state->rpc;
    416 
    417 	if (rpc->reply_complete(rpc_state->reply) == -1) {
    418 		/* the reply was not completely filled in.  error out */
    419 		goto error;
    420 	}
    421 
    422 	if ((rpc_state->rpc_data = evbuffer_new()) == NULL) {
    423 		/* out of memory */
    424 		goto error;
    425 	}
    426 
    427 	/* serialize the reply */
    428 	rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply);
    429 
    430 	if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) {
    431 		int hook_res;
    432 
    433 		evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon);
    434 
    435 		/* do hook based tweaks to the request */
    436 		hook_res = evrpc_process_hooks(&rpc->base->output_hooks,
    437 		    rpc_state, req, rpc_state->rpc_data);
    438 		switch (hook_res) {
    439 		case EVRPC_TERMINATE:
    440 			goto error;
    441 		case EVRPC_PAUSE:
    442 			if (evrpc_pause_request(rpc->base, rpc_state,
    443 				evrpc_request_done_closure) == -1)
    444 				goto error;
    445 			return;
    446 		case EVRPC_CONTINUE:
    447 			break;
    448 		default:
    449 			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
    450 			    hook_res == EVRPC_CONTINUE ||
    451 			    hook_res == EVRPC_PAUSE);
    452 		}
    453 	}
    454 
    455 	evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE);
    456 	return;
    457 
    458 error:
    459 	if (rpc_state != NULL)
    460 		evrpc_reqstate_free(rpc_state);
    461 	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
    462 	return;
    463 }
    464 
    465 void *
    466 evrpc_get_request(struct evrpc_req_generic *req)
    467 {
    468 	return req->request;
    469 }
    470 
    471 void *
    472 evrpc_get_reply(struct evrpc_req_generic *req)
    473 {
    474 	return req->reply;
    475 }
    476 
    477 static void
    478 evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
    479 {
    480 	struct evrpc_req_generic *rpc_state = arg;
    481 	struct evhttp_request *req;
    482 	EVUTIL_ASSERT(rpc_state);
    483 	req = rpc_state->http_req;
    484 
    485 	if (hook_res == EVRPC_TERMINATE)
    486 		goto error;
    487 
    488 	/* on success, we are going to transmit marshaled binary data */
    489 	if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
    490 		evhttp_add_header(req->output_headers,
    491 		    "Content-Type", "application/octet-stream");
    492 	}
    493 	evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data);
    494 
    495 	evrpc_reqstate_free(rpc_state);
    496 
    497 	return;
    498 
    499 error:
    500 	if (rpc_state != NULL)
    501 		evrpc_reqstate_free(rpc_state);
    502 	evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
    503 	return;
    504 }
    505 
    506 
    507 /* Client implementation of RPC site */
    508 
    509 static int evrpc_schedule_request(struct evhttp_connection *connection,
    510     struct evrpc_request_wrapper *ctx);
    511 
    512 struct evrpc_pool *
    513 evrpc_pool_new(struct event_base *base)
    514 {
    515 	struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool));
    516 	if (pool == NULL)
    517 		return (NULL);
    518 
    519 	TAILQ_INIT(&pool->connections);
    520 	TAILQ_INIT(&pool->requests);
    521 
    522 	TAILQ_INIT(&pool->paused_requests);
    523 
    524 	TAILQ_INIT(&pool->input_hooks);
    525 	TAILQ_INIT(&pool->output_hooks);
    526 
    527 	pool->base = base;
    528 	pool->timeout = -1;
    529 
    530 	return (pool);
    531 }
    532 
    533 static void
    534 evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
    535 {
    536 	if (request->hook_meta != NULL)
    537 		evrpc_hook_context_free(request->hook_meta);
    538 	mm_free(request->name);
    539 	mm_free(request);
    540 }
    541 
    542 void
    543 evrpc_pool_free(struct evrpc_pool *pool)
    544 {
    545 	struct evhttp_connection *connection;
    546 	struct evrpc_request_wrapper *request;
    547 	struct evrpc_hook_ctx *paused;
    548 	struct evrpc_hook *hook;
    549 	int r;
    550 
    551 	while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
    552 		TAILQ_REMOVE(&pool->requests, request, next);
    553 		evrpc_request_wrapper_free(request);
    554 	}
    555 
    556 	while ((paused = TAILQ_FIRST(&pool->paused_requests)) != NULL) {
    557 		TAILQ_REMOVE(&pool->paused_requests, paused, next);
    558 		mm_free(paused);
    559 	}
    560 
    561 	while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
    562 		TAILQ_REMOVE(&pool->connections, connection, next);
    563 		evhttp_connection_free(connection);
    564 	}
    565 
    566 	while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
    567 		r = evrpc_remove_hook(pool, EVRPC_INPUT, hook);
    568 		EVUTIL_ASSERT(r);
    569 	}
    570 
    571 	while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
    572 		r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook);
    573 		EVUTIL_ASSERT(r);
    574 	}
    575 
    576 	mm_free(pool);
    577 }
    578 
    579 /*
    580  * Add a connection to the RPC pool.   A request scheduled on the pool
    581  * may use any available connection.
    582  */
    583 
    584 void
    585 evrpc_pool_add_connection(struct evrpc_pool *pool,
    586     struct evhttp_connection *connection)
    587 {
    588 	EVUTIL_ASSERT(connection->http_server == NULL);
    589 	TAILQ_INSERT_TAIL(&pool->connections, connection, next);
    590 
    591 	/*
    592 	 * associate an event base with this connection
    593 	 */
    594 	if (pool->base != NULL)
    595 		evhttp_connection_set_base(connection, pool->base);
    596 
    597 	/*
    598 	 * unless a timeout was specifically set for a connection,
    599 	 * the connection inherits the timeout from the pool.
    600 	 */
    601 	if (connection->timeout == -1)
    602 		connection->timeout = pool->timeout;
    603 
    604 	/*
    605 	 * if we have any requests pending, schedule them with the new
    606 	 * connections.
    607 	 */
    608 
    609 	if (TAILQ_FIRST(&pool->requests) != NULL) {
    610 		struct evrpc_request_wrapper *request =
    611 		    TAILQ_FIRST(&pool->requests);
    612 		TAILQ_REMOVE(&pool->requests, request, next);
    613 		evrpc_schedule_request(connection, request);
    614 	}
    615 }
    616 
    617 void
    618 evrpc_pool_remove_connection(struct evrpc_pool *pool,
    619     struct evhttp_connection *connection)
    620 {
    621 	TAILQ_REMOVE(&pool->connections, connection, next);
    622 }
    623 
    624 void
    625 evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
    626 {
    627 	struct evhttp_connection *evcon;
    628 	TAILQ_FOREACH(evcon, &pool->connections, next) {
    629 		evcon->timeout = timeout_in_secs;
    630 	}
    631 	pool->timeout = timeout_in_secs;
    632 }
    633 
    634 
    635 static void evrpc_reply_done(struct evhttp_request *, void *);
    636 static void evrpc_request_timeout(evutil_socket_t, short, void *);
    637 
    638 /*
    639  * Finds a connection object associated with the pool that is currently
    640  * idle and can be used to make a request.
    641  */
    642 static struct evhttp_connection *
    643 evrpc_pool_find_connection(struct evrpc_pool *pool)
    644 {
    645 	struct evhttp_connection *connection;
    646 	TAILQ_FOREACH(connection, &pool->connections, next) {
    647 		if (TAILQ_FIRST(&connection->requests) == NULL)
    648 			return (connection);
    649 	}
    650 
    651 	return (NULL);
    652 }
    653 
    654 /*
    655  * Prototypes responsible for evrpc scheduling and hooking
    656  */
    657 
    658 static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT);
    659 
    660 /*
    661  * We assume that the ctx is no longer queued on the pool.
    662  */
    663 static int
    664 evrpc_schedule_request(struct evhttp_connection *connection,
    665     struct evrpc_request_wrapper *ctx)
    666 {
    667 	struct evhttp_request *req = NULL;
    668 	struct evrpc_pool *pool = ctx->pool;
    669 	struct evrpc_status status;
    670 
    671 	if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
    672 		goto error;
    673 
    674 	/* serialize the request data into the output buffer */
    675 	ctx->request_marshal(req->output_buffer, ctx->request);
    676 
    677 	/* we need to know the connection that we might have to abort */
    678 	ctx->evcon = connection;
    679 
    680 	/* if we get paused we also need to know the request */
    681 	ctx->req = req;
    682 
    683 	if (TAILQ_FIRST(&pool->output_hooks) != NULL) {
    684 		int hook_res;
    685 
    686 		evrpc_hook_associate_meta(&ctx->hook_meta, connection);
    687 
    688 		/* apply hooks to the outgoing request */
    689 		hook_res = evrpc_process_hooks(&pool->output_hooks,
    690 		    ctx, req, req->output_buffer);
    691 
    692 		switch (hook_res) {
    693 		case EVRPC_TERMINATE:
    694 			goto error;
    695 		case EVRPC_PAUSE:
    696 			/* we need to be explicitly resumed */
    697 			if (evrpc_pause_request(pool, ctx,
    698 				evrpc_schedule_request_closure) == -1)
    699 				goto error;
    700 			return (0);
    701 		case EVRPC_CONTINUE:
    702 			/* we can just continue */
    703 			break;
    704 		default:
    705 			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
    706 			    hook_res == EVRPC_CONTINUE ||
    707 			    hook_res == EVRPC_PAUSE);
    708 		}
    709 	}
    710 
    711 	evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE);
    712 	return (0);
    713 
    714 error:
    715 	memset(&status, 0, sizeof(status));
    716 	status.error = EVRPC_STATUS_ERR_UNSTARTED;
    717 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
    718 	evrpc_request_wrapper_free(ctx);
    719 	return (-1);
    720 }
    721 
    722 static void
    723 evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
    724 {
    725 	struct evrpc_request_wrapper *ctx = arg;
    726 	struct evhttp_connection *connection = ctx->evcon;
    727 	struct evhttp_request *req = ctx->req;
    728 	struct evrpc_pool *pool = ctx->pool;
    729 	struct evrpc_status status;
    730 	char *uri = NULL;
    731 	int res = 0;
    732 
    733 	if (hook_res == EVRPC_TERMINATE)
    734 		goto error;
    735 
    736 	uri = evrpc_construct_uri(ctx->name);
    737 	if (uri == NULL)
    738 		goto error;
    739 
    740 	if (pool->timeout > 0) {
    741 		/*
    742 		 * a timeout after which the whole rpc is going to be aborted.
    743 		 */
    744 		struct timeval tv;
    745 		evutil_timerclear(&tv);
    746 		tv.tv_sec = pool->timeout;
    747 		evtimer_add(&ctx->ev_timeout, &tv);
    748 	}
    749 
    750 	/* start the request over the connection */
    751 	res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
    752 	mm_free(uri);
    753 
    754 	if (res == -1)
    755 		goto error;
    756 
    757 	return;
    758 
    759 error:
    760 	memset(&status, 0, sizeof(status));
    761 	status.error = EVRPC_STATUS_ERR_UNSTARTED;
    762 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
    763 	evrpc_request_wrapper_free(ctx);
    764 }
    765 
    766 /* we just queue the paused request on the pool under the req object */
    767 static int
    768 evrpc_pause_request(void *vbase, void *ctx,
    769     void (*cb)(void *, enum EVRPC_HOOK_RESULT))
    770 {
    771 	struct _evrpc_hooks *base = vbase;
    772 	struct evrpc_hook_ctx *paused = mm_malloc(sizeof(*paused));
    773 	if (paused == NULL)
    774 		return (-1);
    775 
    776 	paused->ctx = ctx;
    777 	paused->cb = cb;
    778 
    779 	TAILQ_INSERT_TAIL(&base->pause_requests, paused, next);
    780 	return (0);
    781 }
    782 
    783 int
    784 evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res)
    785 {
    786 	struct _evrpc_hooks *base = vbase;
    787 	struct evrpc_pause_list *head = &base->pause_requests;
    788 	struct evrpc_hook_ctx *paused;
    789 
    790 	TAILQ_FOREACH(paused, head, next) {
    791 		if (paused->ctx == ctx)
    792 			break;
    793 	}
    794 
    795 	if (paused == NULL)
    796 		return (-1);
    797 
    798 	(*paused->cb)(paused->ctx, res);
    799 	TAILQ_REMOVE(head, paused, next);
    800 	mm_free(paused);
    801 	return (0);
    802 }
    803 
    804 int
    805 evrpc_make_request(struct evrpc_request_wrapper *ctx)
    806 {
    807 	struct evrpc_pool *pool = ctx->pool;
    808 
    809 	/* initialize the event structure for this rpc */
    810 	evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx);
    811 
    812 	/* we better have some available connections on the pool */
    813 	EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL);
    814 
    815 	/*
    816 	 * if no connection is available, we queue the request on the pool,
    817 	 * the next time a connection is empty, the rpc will be send on that.
    818 	 */
    819 	TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
    820 
    821 	evrpc_pool_schedule(pool);
    822 
    823 	return (0);
    824 }
    825 
    826 
    827 struct evrpc_request_wrapper *
    828 evrpc_make_request_ctx(
    829 	struct evrpc_pool *pool, void *request, void *reply,
    830 	const char *rpcname,
    831 	void (*req_marshal)(struct evbuffer*, void *),
    832 	void (*rpl_clear)(void *),
    833 	int (*rpl_unmarshal)(void *, struct evbuffer *),
    834 	void (*cb)(struct evrpc_status *, void *, void *, void *),
    835 	void *cbarg)
    836 {
    837 	struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *)
    838 	    mm_malloc(sizeof(struct evrpc_request_wrapper));
    839 	if (ctx == NULL)
    840 		return (NULL);
    841 
    842 	ctx->pool = pool;
    843 	ctx->hook_meta = NULL;
    844 	ctx->evcon = NULL;
    845 	ctx->name = mm_strdup(rpcname);
    846 	if (ctx->name == NULL) {
    847 		mm_free(ctx);
    848 		return (NULL);
    849 	}
    850 	ctx->cb = cb;
    851 	ctx->cb_arg = cbarg;
    852 	ctx->request = request;
    853 	ctx->reply = reply;
    854 	ctx->request_marshal = req_marshal;
    855 	ctx->reply_clear = rpl_clear;
    856 	ctx->reply_unmarshal = rpl_unmarshal;
    857 
    858 	return (ctx);
    859 }
    860 
    861 static void
    862 evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT);
    863 
    864 static void
    865 evrpc_reply_done(struct evhttp_request *req, void *arg)
    866 {
    867 	struct evrpc_request_wrapper *ctx = arg;
    868 	struct evrpc_pool *pool = ctx->pool;
    869 	int hook_res = EVRPC_CONTINUE;
    870 
    871 	/* cancel any timeout we might have scheduled */
    872 	event_del(&ctx->ev_timeout);
    873 
    874 	ctx->req = req;
    875 
    876 	/* we need to get the reply now */
    877 	if (req == NULL) {
    878 		evrpc_reply_done_closure(ctx, EVRPC_CONTINUE);
    879 		return;
    880 	}
    881 
    882 	if (TAILQ_FIRST(&pool->input_hooks) != NULL) {
    883 		evrpc_hook_associate_meta(&ctx->hook_meta, ctx->evcon);
    884 
    885 		/* apply hooks to the incoming request */
    886 		hook_res = evrpc_process_hooks(&pool->input_hooks,
    887 		    ctx, req, req->input_buffer);
    888 
    889 		switch (hook_res) {
    890 		case EVRPC_TERMINATE:
    891 		case EVRPC_CONTINUE:
    892 			break;
    893 		case EVRPC_PAUSE:
    894 			/*
    895 			 * if we get paused we also need to know the
    896 			 * request.  unfortunately, the underlying
    897 			 * layer is going to free it.  we need to
    898 			 * request ownership explicitly
    899 			 */
    900 			if (req != NULL)
    901 				evhttp_request_own(req);
    902 
    903 			evrpc_pause_request(pool, ctx,
    904 			    evrpc_reply_done_closure);
    905 			return;
    906 		default:
    907 			EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
    908 			    hook_res == EVRPC_CONTINUE ||
    909 			    hook_res == EVRPC_PAUSE);
    910 		}
    911 	}
    912 
    913 	evrpc_reply_done_closure(ctx, hook_res);
    914 
    915 	/* http request is being freed by underlying layer */
    916 }
    917 
    918 static void
    919 evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
    920 {
    921 	struct evrpc_request_wrapper *ctx = arg;
    922 	struct evhttp_request *req = ctx->req;
    923 	struct evrpc_pool *pool = ctx->pool;
    924 	struct evrpc_status status;
    925 	int res = -1;
    926 
    927 	memset(&status, 0, sizeof(status));
    928 	status.http_req = req;
    929 
    930 	/* we need to get the reply now */
    931 	if (req == NULL) {
    932 		status.error = EVRPC_STATUS_ERR_TIMEOUT;
    933 	} else if (hook_res == EVRPC_TERMINATE) {
    934 		status.error = EVRPC_STATUS_ERR_HOOKABORTED;
    935 	} else {
    936 		res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
    937 		if (res == -1)
    938 			status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
    939 	}
    940 
    941 	if (res == -1) {
    942 		/* clear everything that we might have written previously */
    943 		ctx->reply_clear(ctx->reply);
    944 	}
    945 
    946 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
    947 
    948 	evrpc_request_wrapper_free(ctx);
    949 
    950 	/* the http layer owned the original request structure, but if we
    951 	 * got paused, we asked for ownership and need to free it here. */
    952 	if (req != NULL && evhttp_request_is_owned(req))
    953 		evhttp_request_free(req);
    954 
    955 	/* see if we can schedule another request */
    956 	evrpc_pool_schedule(pool);
    957 }
    958 
    959 static void
    960 evrpc_pool_schedule(struct evrpc_pool *pool)
    961 {
    962 	struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
    963 	struct evhttp_connection *evcon;
    964 
    965 	/* if no requests are pending, we have no work */
    966 	if (ctx == NULL)
    967 		return;
    968 
    969 	if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
    970 		TAILQ_REMOVE(&pool->requests, ctx, next);
    971 		evrpc_schedule_request(evcon, ctx);
    972 	}
    973 }
    974 
    975 static void
    976 evrpc_request_timeout(evutil_socket_t fd, short what, void *arg)
    977 {
    978 	struct evrpc_request_wrapper *ctx = arg;
    979 	struct evhttp_connection *evcon = ctx->evcon;
    980 	EVUTIL_ASSERT(evcon != NULL);
    981 
    982 	evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
    983 }
    984 
    985 /*
    986  * frees potential meta data associated with a request.
    987  */
    988 
    989 static void
    990 evrpc_meta_data_free(struct evrpc_meta_list *meta_data)
    991 {
    992 	struct evrpc_meta *entry;
    993 	EVUTIL_ASSERT(meta_data != NULL);
    994 
    995 	while ((entry = TAILQ_FIRST(meta_data)) != NULL) {
    996 		TAILQ_REMOVE(meta_data, entry, next);
    997 		mm_free(entry->key);
    998 		mm_free(entry->data);
    999 		mm_free(entry);
   1000 	}
   1001 }
   1002 
   1003 static struct evrpc_hook_meta *
   1004 evrpc_hook_meta_new(void)
   1005 {
   1006 	struct evrpc_hook_meta *ctx;
   1007 	ctx = mm_malloc(sizeof(struct evrpc_hook_meta));
   1008 	EVUTIL_ASSERT(ctx != NULL);
   1009 
   1010 	TAILQ_INIT(&ctx->meta_data);
   1011 	ctx->evcon = NULL;
   1012 
   1013 	return (ctx);
   1014 }
   1015 
   1016 static void
   1017 evrpc_hook_associate_meta(struct evrpc_hook_meta **pctx,
   1018     struct evhttp_connection *evcon)
   1019 {
   1020 	struct evrpc_hook_meta *ctx = *pctx;
   1021 	if (ctx == NULL)
   1022 		*pctx = ctx = evrpc_hook_meta_new();
   1023 	ctx->evcon = evcon;
   1024 }
   1025 
   1026 static void
   1027 evrpc_hook_context_free(struct evrpc_hook_meta *ctx)
   1028 {
   1029 	evrpc_meta_data_free(&ctx->meta_data);
   1030 	mm_free(ctx);
   1031 }
   1032 
   1033 /* Adds meta data */
   1034 void
   1035 evrpc_hook_add_meta(void *ctx, const char *key,
   1036     const void *data, size_t data_size)
   1037 {
   1038 	struct evrpc_request_wrapper *req = ctx;
   1039 	struct evrpc_hook_meta *store = NULL;
   1040 	struct evrpc_meta *meta = NULL;
   1041 
   1042 	if ((store = req->hook_meta) == NULL)
   1043 		store = req->hook_meta = evrpc_hook_meta_new();
   1044 
   1045 	meta = mm_malloc(sizeof(struct evrpc_meta));
   1046 	EVUTIL_ASSERT(meta != NULL);
   1047 	meta->key = mm_strdup(key);
   1048 	EVUTIL_ASSERT(meta->key != NULL);
   1049 	meta->data_size = data_size;
   1050 	meta->data = mm_malloc(data_size);
   1051 	EVUTIL_ASSERT(meta->data != NULL);
   1052 	memcpy(meta->data, data, data_size);
   1053 
   1054 	TAILQ_INSERT_TAIL(&store->meta_data, meta, next);
   1055 }
   1056 
   1057 int
   1058 evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size)
   1059 {
   1060 	struct evrpc_request_wrapper *req = ctx;
   1061 	struct evrpc_meta *meta = NULL;
   1062 
   1063 	if (req->hook_meta == NULL)
   1064 		return (-1);
   1065 
   1066 	TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) {
   1067 		if (strcmp(meta->key, key) == 0) {
   1068 			*data = meta->data;
   1069 			*data_size = meta->data_size;
   1070 			return (0);
   1071 		}
   1072 	}
   1073 
   1074 	return (-1);
   1075 }
   1076 
   1077 struct evhttp_connection *
   1078 evrpc_hook_get_connection(void *ctx)
   1079 {
   1080 	struct evrpc_request_wrapper *req = ctx;
   1081 	return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL);
   1082 }
   1083 
   1084 int
   1085 evrpc_send_request_generic(struct evrpc_pool *pool,
   1086     void *request, void *reply,
   1087     void (*cb)(struct evrpc_status *, void *, void *, void *),
   1088     void *cb_arg,
   1089     const char *rpcname,
   1090     void (*req_marshal)(struct evbuffer *, void *),
   1091     void (*rpl_clear)(void *),
   1092     int (*rpl_unmarshal)(void *, struct evbuffer *))
   1093 {
   1094 	struct evrpc_status status;
   1095 	struct evrpc_request_wrapper *ctx;
   1096 	ctx = evrpc_make_request_ctx(pool, request, reply,
   1097 	    rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg);
   1098 	if (ctx == NULL)
   1099 		goto error;
   1100 	return (evrpc_make_request(ctx));
   1101 error:
   1102 	memset(&status, 0, sizeof(status));
   1103 	status.error = EVRPC_STATUS_ERR_UNSTARTED;
   1104 	(*(cb))(&status, request, reply, cb_arg);
   1105 	return (-1);
   1106 }
   1107 
   1108 /** Takes a request object and fills it in with the right magic */
   1109 static struct evrpc *
   1110 evrpc_register_object(const char *name,
   1111     void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *),
   1112     int (*req_unmarshal)(void *, struct evbuffer *),
   1113     void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *),
   1114     int (*rpl_complete)(void *),
   1115     void (*rpl_marshal)(struct evbuffer *, void *))
   1116 {
   1117 	struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc));
   1118 	if (rpc == NULL)
   1119 		return (NULL);
   1120 	rpc->uri = mm_strdup(name);
   1121 	if (rpc->uri == NULL) {
   1122 		mm_free(rpc);
   1123 		return (NULL);
   1124 	}
   1125 	rpc->request_new = req_new;
   1126 	rpc->request_new_arg = req_new_arg;
   1127 	rpc->request_free = req_free;
   1128 	rpc->request_unmarshal = req_unmarshal;
   1129 	rpc->reply_new = rpl_new;
   1130 	rpc->reply_new_arg = rpl_new_arg;
   1131 	rpc->reply_free = rpl_free;
   1132 	rpc->reply_complete = rpl_complete;
   1133 	rpc->reply_marshal = rpl_marshal;
   1134 	return (rpc);
   1135 }
   1136 
   1137 int
   1138 evrpc_register_generic(struct evrpc_base *base, const char *name,
   1139     void (*callback)(struct evrpc_req_generic *, void *), void *cbarg,
   1140     void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *),
   1141     int (*req_unmarshal)(void *, struct evbuffer *),
   1142     void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *),
   1143     int (*rpl_complete)(void *),
   1144     void (*rpl_marshal)(struct evbuffer *, void *))
   1145 {
   1146 	struct evrpc* rpc =
   1147 	    evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal,
   1148 		rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal);
   1149 	if (rpc == NULL)
   1150 		return (-1);
   1151 	evrpc_register_rpc(base, rpc,
   1152 	    (void (*)(struct evrpc_req_generic*, void *))callback, cbarg);
   1153 	return (0);
   1154 }
   1155 
   1156 /** accessors for obscure and undocumented functionality */
   1157 struct evrpc_pool *
   1158 evrpc_request_get_pool(struct evrpc_request_wrapper *ctx)
   1159 {
   1160 	return (ctx->pool);
   1161 }
   1162 
   1163 void
   1164 evrpc_request_set_pool(struct evrpc_request_wrapper *ctx,
   1165     struct evrpc_pool *pool)
   1166 {
   1167 	ctx->pool = pool;
   1168 }
   1169 
   1170 void
   1171 evrpc_request_set_cb(struct evrpc_request_wrapper *ctx,
   1172     void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg),
   1173     void *cb_arg)
   1174 {
   1175 	ctx->cb = cb;
   1176 	ctx->cb_arg = cb_arg;
   1177 }
   1178