1 1.5 christos /* $NetBSD: evrpc.c,v 1.5 2021/04/07 03:36:48 christos Exp $ */ 2 1.5 christos 3 1.1 plunky /* 4 1.2 christos * Copyright (c) 2000-2007 Niels Provos <provos (at) citi.umich.edu> 5 1.2 christos * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 6 1.1 plunky * 7 1.1 plunky * Redistribution and use in source and binary forms, with or without 8 1.1 plunky * modification, are permitted provided that the following conditions 9 1.1 plunky * are met: 10 1.1 plunky * 1. Redistributions of source code must retain the above copyright 11 1.1 plunky * notice, this list of conditions and the following disclaimer. 12 1.1 plunky * 2. Redistributions in binary form must reproduce the above copyright 13 1.1 plunky * notice, this list of conditions and the following disclaimer in the 14 1.1 plunky * documentation and/or other materials provided with the distribution. 15 1.1 plunky * 3. The name of the author may not be used to endorse or promote products 16 1.1 plunky * derived from this software without specific prior written permission. 17 1.1 plunky * 18 1.1 plunky * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 19 1.1 plunky * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 20 1.1 plunky * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 21 1.1 plunky * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 22 1.1 plunky * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 23 1.1 plunky * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24 1.1 plunky * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25 1.1 plunky * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26 1.1 plunky * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 27 1.1 plunky * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 1.1 plunky */ 29 1.2 christos #include "event2/event-config.h" 30 1.2 christos #include <sys/cdefs.h> 31 1.5 christos __RCSID("$NetBSD: evrpc.c,v 1.5 2021/04/07 03:36:48 christos Exp $"); 32 1.4 christos #include "evconfig-private.h" 33 1.1 plunky 34 1.4 christos #ifdef _WIN32 35 1.1 plunky #define WIN32_LEAN_AND_MEAN 36 1.1 plunky #include <winsock2.h> 37 1.1 plunky #include <windows.h> 38 1.1 plunky #undef WIN32_LEAN_AND_MEAN 39 1.1 plunky #endif 40 1.1 plunky 41 1.1 plunky #include <sys/types.h> 42 1.4 christos #ifndef _WIN32 43 1.1 plunky #include <sys/socket.h> 44 1.1 plunky #endif 45 1.4 christos #ifdef EVENT__HAVE_SYS_TIME_H 46 1.1 plunky #include <sys/time.h> 47 1.1 plunky #endif 48 1.1 plunky #include <sys/queue.h> 49 1.1 plunky #include <stdio.h> 50 1.1 plunky #include <stdlib.h> 51 1.4 christos #ifndef _WIN32 52 1.1 plunky #include <unistd.h> 53 1.1 plunky #endif 54 1.1 plunky #include <errno.h> 55 1.1 plunky #include <signal.h> 56 1.1 plunky #include <string.h> 57 1.1 plunky 58 1.2 christos #include <sys/queue.h> 59 1.2 christos 60 1.2 christos #include "event2/event.h" 61 1.2 christos #include "event2/event_struct.h" 62 1.2 christos #include "event2/rpc.h" 63 1.2 christos #include "event2/rpc_struct.h" 64 1.1 plunky #include "evrpc-internal.h" 65 1.2 christos #include "event2/http.h" 66 1.2 christos #include "event2/buffer.h" 67 1.2 christos #include "event2/tag.h" 68 1.2 christos #include "event2/http_struct.h" 69 1.2 christos #include "event2/http_compat.h" 70 1.2 christos #include "event2/util.h" 71 1.2 christos #include "util-internal.h" 72 1.2 christos #include "log-internal.h" 73 1.2 christos #include "mm-internal.h" 74 1.1 plunky 75 1.1 plunky struct evrpc_base * 76 1.1 plunky evrpc_init(struct evhttp *http_server) 77 1.1 plunky { 78 1.2 christos struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base)); 79 1.1 plunky if (base == NULL) 80 1.1 plunky return (NULL); 81 1.1 plunky 82 1.1 plunky /* we rely on the tagging sub system */ 83 1.1 plunky evtag_init(); 84 1.1 plunky 85 1.1 plunky TAILQ_INIT(&base->registered_rpcs); 86 1.1 plunky TAILQ_INIT(&base->input_hooks); 87 1.1 plunky TAILQ_INIT(&base->output_hooks); 88 1.2 christos 89 1.2 christos TAILQ_INIT(&base->paused_requests); 90 1.2 christos 91 1.1 plunky base->http_server = http_server; 92 1.1 plunky 93 1.1 plunky return (base); 94 1.1 plunky } 95 1.1 plunky 96 1.1 plunky void 97 1.1 plunky evrpc_free(struct evrpc_base *base) 98 1.1 plunky { 99 1.1 plunky struct evrpc *rpc; 100 1.1 plunky struct evrpc_hook *hook; 101 1.3 spz struct evrpc_hook_ctx *pause; 102 1.2 christos int r; 103 1.1 plunky 104 1.1 plunky while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) { 105 1.2 christos r = evrpc_unregister_rpc(base, rpc->uri); 106 1.2 christos EVUTIL_ASSERT(r == 0); 107 1.2 christos } 108 1.3 spz while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) { 109 1.3 spz TAILQ_REMOVE(&base->paused_requests, pause, next); 110 1.3 spz mm_free(pause); 111 1.1 plunky } 112 1.1 plunky while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) { 113 1.2 christos r = evrpc_remove_hook(base, EVRPC_INPUT, hook); 114 1.2 christos EVUTIL_ASSERT(r); 115 1.1 plunky } 116 1.1 plunky while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) { 117 1.2 christos r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook); 118 1.2 christos EVUTIL_ASSERT(r); 119 1.1 plunky } 120 1.2 christos mm_free(base); 121 1.1 plunky } 122 1.1 plunky 123 1.1 plunky void * 124 1.1 plunky evrpc_add_hook(void *vbase, 125 1.1 plunky enum EVRPC_HOOK_TYPE hook_type, 126 1.2 christos int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *), 127 1.1 plunky void *cb_arg) 128 1.1 plunky { 129 1.4 christos struct evrpc_hooks_ *base = vbase; 130 1.1 plunky struct evrpc_hook_list *head = NULL; 131 1.1 plunky struct evrpc_hook *hook = NULL; 132 1.1 plunky switch (hook_type) { 133 1.1 plunky case EVRPC_INPUT: 134 1.1 plunky head = &base->in_hooks; 135 1.1 plunky break; 136 1.1 plunky case EVRPC_OUTPUT: 137 1.1 plunky head = &base->out_hooks; 138 1.1 plunky break; 139 1.1 plunky default: 140 1.2 christos EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 141 1.1 plunky } 142 1.1 plunky 143 1.2 christos hook = mm_calloc(1, sizeof(struct evrpc_hook)); 144 1.2 christos EVUTIL_ASSERT(hook != NULL); 145 1.2 christos 146 1.1 plunky hook->process = cb; 147 1.1 plunky hook->process_arg = cb_arg; 148 1.1 plunky TAILQ_INSERT_TAIL(head, hook, next); 149 1.1 plunky 150 1.1 plunky return (hook); 151 1.1 plunky } 152 1.1 plunky 153 1.1 plunky static int 154 1.1 plunky evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle) 155 1.1 plunky { 156 1.1 plunky struct evrpc_hook *hook = NULL; 157 1.1 plunky TAILQ_FOREACH(hook, head, next) { 158 1.1 plunky if (hook == handle) { 159 1.1 plunky TAILQ_REMOVE(head, hook, next); 160 1.2 christos mm_free(hook); 161 1.1 plunky return (1); 162 1.1 plunky } 163 1.1 plunky } 164 1.1 plunky 165 1.1 plunky return (0); 166 1.1 plunky } 167 1.1 plunky 168 1.1 plunky /* 169 1.1 plunky * remove the hook specified by the handle 170 1.1 plunky */ 171 1.1 plunky 172 1.1 plunky int 173 1.1 plunky evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle) 174 1.1 plunky { 175 1.4 christos struct evrpc_hooks_ *base = vbase; 176 1.1 plunky struct evrpc_hook_list *head = NULL; 177 1.1 plunky switch (hook_type) { 178 1.1 plunky case EVRPC_INPUT: 179 1.1 plunky head = &base->in_hooks; 180 1.1 plunky break; 181 1.1 plunky case EVRPC_OUTPUT: 182 1.1 plunky head = &base->out_hooks; 183 1.1 plunky break; 184 1.1 plunky default: 185 1.2 christos EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT); 186 1.1 plunky } 187 1.1 plunky 188 1.1 plunky return (evrpc_remove_hook_internal(head, handle)); 189 1.1 plunky } 190 1.1 plunky 191 1.1 plunky static int 192 1.2 christos evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx, 193 1.1 plunky struct evhttp_request *req, struct evbuffer *evbuf) 194 1.1 plunky { 195 1.1 plunky struct evrpc_hook *hook; 196 1.1 plunky TAILQ_FOREACH(hook, head, next) { 197 1.2 christos int res = hook->process(ctx, req, evbuf, hook->process_arg); 198 1.2 christos if (res != EVRPC_CONTINUE) 199 1.2 christos return (res); 200 1.1 plunky } 201 1.1 plunky 202 1.2 christos return (EVRPC_CONTINUE); 203 1.1 plunky } 204 1.1 plunky 205 1.1 plunky static void evrpc_pool_schedule(struct evrpc_pool *pool); 206 1.1 plunky static void evrpc_request_cb(struct evhttp_request *, void *); 207 1.1 plunky 208 1.1 plunky /* 209 1.1 plunky * Registers a new RPC with the HTTP server. The evrpc object is expected 210 1.1 plunky * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn 211 1.1 plunky * calls this function. 212 1.1 plunky */ 213 1.1 plunky 214 1.1 plunky static char * 215 1.1 plunky evrpc_construct_uri(const char *uri) 216 1.1 plunky { 217 1.1 plunky char *constructed_uri; 218 1.2 christos size_t constructed_uri_len; 219 1.1 plunky 220 1.1 plunky constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1; 221 1.2 christos if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL) 222 1.1 plunky event_err(1, "%s: failed to register rpc at %s", 223 1.1 plunky __func__, uri); 224 1.1 plunky memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX)); 225 1.1 plunky memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri)); 226 1.1 plunky constructed_uri[constructed_uri_len - 1] = '\0'; 227 1.1 plunky 228 1.1 plunky return (constructed_uri); 229 1.1 plunky } 230 1.1 plunky 231 1.1 plunky int 232 1.1 plunky evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, 233 1.1 plunky void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg) 234 1.1 plunky { 235 1.1 plunky char *constructed_uri = evrpc_construct_uri(rpc->uri); 236 1.1 plunky 237 1.1 plunky rpc->base = base; 238 1.1 plunky rpc->cb = cb; 239 1.1 plunky rpc->cb_arg = cb_arg; 240 1.1 plunky 241 1.1 plunky TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next); 242 1.1 plunky 243 1.1 plunky evhttp_set_cb(base->http_server, 244 1.1 plunky constructed_uri, 245 1.1 plunky evrpc_request_cb, 246 1.1 plunky rpc); 247 1.2 christos 248 1.2 christos mm_free(constructed_uri); 249 1.1 plunky 250 1.1 plunky return (0); 251 1.1 plunky } 252 1.1 plunky 253 1.1 plunky int 254 1.1 plunky evrpc_unregister_rpc(struct evrpc_base *base, const char *name) 255 1.1 plunky { 256 1.1 plunky char *registered_uri = NULL; 257 1.1 plunky struct evrpc *rpc; 258 1.2 christos int r; 259 1.1 plunky 260 1.1 plunky /* find the right rpc; linear search might be slow */ 261 1.1 plunky TAILQ_FOREACH(rpc, &base->registered_rpcs, next) { 262 1.1 plunky if (strcmp(rpc->uri, name) == 0) 263 1.1 plunky break; 264 1.1 plunky } 265 1.1 plunky if (rpc == NULL) { 266 1.1 plunky /* We did not find an RPC with this name */ 267 1.1 plunky return (-1); 268 1.1 plunky } 269 1.1 plunky TAILQ_REMOVE(&base->registered_rpcs, rpc, next); 270 1.1 plunky 271 1.2 christos registered_uri = evrpc_construct_uri(name); 272 1.1 plunky 273 1.1 plunky /* remove the http server callback */ 274 1.2 christos r = evhttp_del_cb(base->http_server, registered_uri); 275 1.2 christos EVUTIL_ASSERT(r == 0); 276 1.1 plunky 277 1.2 christos mm_free(registered_uri); 278 1.2 christos 279 1.2 christos mm_free(__UNCONST(rpc->uri)); 280 1.2 christos mm_free(rpc); 281 1.1 plunky return (0); 282 1.1 plunky } 283 1.1 plunky 284 1.2 christos static int evrpc_pause_request(void *vbase, void *ctx, 285 1.2 christos void (*cb)(void *, enum EVRPC_HOOK_RESULT)); 286 1.2 christos static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT); 287 1.2 christos 288 1.1 plunky static void 289 1.1 plunky evrpc_request_cb(struct evhttp_request *req, void *arg) 290 1.1 plunky { 291 1.1 plunky struct evrpc *rpc = arg; 292 1.1 plunky struct evrpc_req_generic *rpc_state = NULL; 293 1.1 plunky 294 1.1 plunky /* let's verify the outside parameters */ 295 1.1 plunky if (req->type != EVHTTP_REQ_POST || 296 1.2 christos evbuffer_get_length(req->input_buffer) <= 0) 297 1.1 plunky goto error; 298 1.1 plunky 299 1.2 christos rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic)); 300 1.2 christos if (rpc_state == NULL) 301 1.1 plunky goto error; 302 1.2 christos rpc_state->rpc = rpc; 303 1.2 christos rpc_state->http_req = req; 304 1.2 christos rpc_state->rpc_data = NULL; 305 1.2 christos 306 1.2 christos if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) { 307 1.2 christos int hook_res; 308 1.2 christos 309 1.4 christos evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); 310 1.2 christos 311 1.2 christos /* 312 1.2 christos * allow hooks to modify the outgoing request 313 1.2 christos */ 314 1.2 christos hook_res = evrpc_process_hooks(&rpc->base->input_hooks, 315 1.2 christos rpc_state, req, req->input_buffer); 316 1.2 christos switch (hook_res) { 317 1.2 christos case EVRPC_TERMINATE: 318 1.2 christos goto error; 319 1.2 christos case EVRPC_PAUSE: 320 1.2 christos evrpc_pause_request(rpc->base, rpc_state, 321 1.2 christos evrpc_request_cb_closure); 322 1.2 christos return; 323 1.2 christos case EVRPC_CONTINUE: 324 1.2 christos break; 325 1.2 christos default: 326 1.2 christos EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 327 1.2 christos hook_res == EVRPC_CONTINUE || 328 1.2 christos hook_res == EVRPC_PAUSE); 329 1.2 christos } 330 1.2 christos } 331 1.2 christos 332 1.2 christos evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE); 333 1.2 christos return; 334 1.2 christos 335 1.2 christos error: 336 1.5 christos if (rpc_state) 337 1.5 christos evrpc_reqstate_free_(rpc_state); 338 1.2 christos evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 339 1.2 christos return; 340 1.2 christos } 341 1.2 christos 342 1.2 christos static void 343 1.2 christos evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 344 1.2 christos { 345 1.2 christos struct evrpc_req_generic *rpc_state = arg; 346 1.2 christos struct evrpc *rpc; 347 1.2 christos struct evhttp_request *req; 348 1.2 christos 349 1.2 christos EVUTIL_ASSERT(rpc_state); 350 1.2 christos rpc = rpc_state->rpc; 351 1.2 christos req = rpc_state->http_req; 352 1.1 plunky 353 1.2 christos if (hook_res == EVRPC_TERMINATE) 354 1.1 plunky goto error; 355 1.1 plunky 356 1.1 plunky /* let's check that we can parse the request */ 357 1.2 christos rpc_state->request = rpc->request_new(rpc->request_new_arg); 358 1.1 plunky if (rpc_state->request == NULL) 359 1.1 plunky goto error; 360 1.1 plunky 361 1.1 plunky if (rpc->request_unmarshal( 362 1.1 plunky rpc_state->request, req->input_buffer) == -1) { 363 1.1 plunky /* we failed to parse the request; that's a bummer */ 364 1.1 plunky goto error; 365 1.1 plunky } 366 1.1 plunky 367 1.1 plunky /* at this point, we have a well formed request, prepare the reply */ 368 1.1 plunky 369 1.2 christos rpc_state->reply = rpc->reply_new(rpc->reply_new_arg); 370 1.1 plunky if (rpc_state->reply == NULL) 371 1.1 plunky goto error; 372 1.1 plunky 373 1.1 plunky /* give the rpc to the user; they can deal with it */ 374 1.1 plunky rpc->cb(rpc_state, rpc->cb_arg); 375 1.1 plunky 376 1.1 plunky return; 377 1.1 plunky 378 1.1 plunky error: 379 1.4 christos evrpc_reqstate_free_(rpc_state); 380 1.2 christos evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 381 1.1 plunky return; 382 1.1 plunky } 383 1.1 plunky 384 1.2 christos 385 1.1 plunky void 386 1.4 christos evrpc_reqstate_free_(struct evrpc_req_generic* rpc_state) 387 1.1 plunky { 388 1.2 christos struct evrpc *rpc; 389 1.2 christos EVUTIL_ASSERT(rpc_state != NULL); 390 1.2 christos rpc = rpc_state->rpc; 391 1.2 christos 392 1.1 plunky /* clean up all memory */ 393 1.2 christos if (rpc_state->hook_meta != NULL) 394 1.4 christos evrpc_hook_context_free_(rpc_state->hook_meta); 395 1.2 christos if (rpc_state->request != NULL) 396 1.2 christos rpc->request_free(rpc_state->request); 397 1.2 christos if (rpc_state->reply != NULL) 398 1.2 christos rpc->reply_free(rpc_state->reply); 399 1.2 christos if (rpc_state->rpc_data != NULL) 400 1.2 christos evbuffer_free(rpc_state->rpc_data); 401 1.2 christos mm_free(rpc_state); 402 1.2 christos } 403 1.1 plunky 404 1.2 christos static void 405 1.2 christos evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT); 406 1.1 plunky 407 1.1 plunky void 408 1.2 christos evrpc_request_done(struct evrpc_req_generic *rpc_state) 409 1.1 plunky { 410 1.2 christos struct evhttp_request *req; 411 1.2 christos struct evrpc *rpc; 412 1.2 christos 413 1.2 christos EVUTIL_ASSERT(rpc_state); 414 1.2 christos 415 1.2 christos req = rpc_state->http_req; 416 1.2 christos rpc = rpc_state->rpc; 417 1.1 plunky 418 1.1 plunky if (rpc->reply_complete(rpc_state->reply) == -1) { 419 1.1 plunky /* the reply was not completely filled in. error out */ 420 1.1 plunky goto error; 421 1.1 plunky } 422 1.1 plunky 423 1.2 christos if ((rpc_state->rpc_data = evbuffer_new()) == NULL) { 424 1.1 plunky /* out of memory */ 425 1.1 plunky goto error; 426 1.1 plunky } 427 1.1 plunky 428 1.1 plunky /* serialize the reply */ 429 1.2 christos rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply); 430 1.2 christos 431 1.2 christos if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) { 432 1.2 christos int hook_res; 433 1.2 christos 434 1.4 christos evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon); 435 1.2 christos 436 1.2 christos /* do hook based tweaks to the request */ 437 1.2 christos hook_res = evrpc_process_hooks(&rpc->base->output_hooks, 438 1.2 christos rpc_state, req, rpc_state->rpc_data); 439 1.2 christos switch (hook_res) { 440 1.2 christos case EVRPC_TERMINATE: 441 1.2 christos goto error; 442 1.2 christos case EVRPC_PAUSE: 443 1.2 christos if (evrpc_pause_request(rpc->base, rpc_state, 444 1.2 christos evrpc_request_done_closure) == -1) 445 1.2 christos goto error; 446 1.2 christos return; 447 1.2 christos case EVRPC_CONTINUE: 448 1.2 christos break; 449 1.2 christos default: 450 1.2 christos EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 451 1.2 christos hook_res == EVRPC_CONTINUE || 452 1.2 christos hook_res == EVRPC_PAUSE); 453 1.2 christos } 454 1.2 christos } 455 1.2 christos 456 1.2 christos evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE); 457 1.2 christos return; 458 1.2 christos 459 1.2 christos error: 460 1.4 christos evrpc_reqstate_free_(rpc_state); 461 1.2 christos evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 462 1.2 christos return; 463 1.2 christos } 464 1.2 christos 465 1.2 christos void * 466 1.2 christos evrpc_get_request(struct evrpc_req_generic *req) 467 1.2 christos { 468 1.2 christos return req->request; 469 1.2 christos } 470 1.2 christos 471 1.2 christos void * 472 1.2 christos evrpc_get_reply(struct evrpc_req_generic *req) 473 1.2 christos { 474 1.2 christos return req->reply; 475 1.2 christos } 476 1.2 christos 477 1.2 christos static void 478 1.2 christos evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 479 1.2 christos { 480 1.2 christos struct evrpc_req_generic *rpc_state = arg; 481 1.2 christos struct evhttp_request *req; 482 1.2 christos EVUTIL_ASSERT(rpc_state); 483 1.2 christos req = rpc_state->http_req; 484 1.1 plunky 485 1.2 christos if (hook_res == EVRPC_TERMINATE) 486 1.1 plunky goto error; 487 1.1 plunky 488 1.1 plunky /* on success, we are going to transmit marshaled binary data */ 489 1.1 plunky if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) { 490 1.1 plunky evhttp_add_header(req->output_headers, 491 1.1 plunky "Content-Type", "application/octet-stream"); 492 1.1 plunky } 493 1.2 christos evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data); 494 1.1 plunky 495 1.4 christos evrpc_reqstate_free_(rpc_state); 496 1.1 plunky 497 1.1 plunky return; 498 1.1 plunky 499 1.1 plunky error: 500 1.4 christos evrpc_reqstate_free_(rpc_state); 501 1.2 christos evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL); 502 1.1 plunky return; 503 1.1 plunky } 504 1.1 plunky 505 1.2 christos 506 1.1 plunky /* Client implementation of RPC site */ 507 1.1 plunky 508 1.1 plunky static int evrpc_schedule_request(struct evhttp_connection *connection, 509 1.1 plunky struct evrpc_request_wrapper *ctx); 510 1.1 plunky 511 1.1 plunky struct evrpc_pool * 512 1.1 plunky evrpc_pool_new(struct event_base *base) 513 1.1 plunky { 514 1.2 christos struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool)); 515 1.1 plunky if (pool == NULL) 516 1.1 plunky return (NULL); 517 1.1 plunky 518 1.1 plunky TAILQ_INIT(&pool->connections); 519 1.1 plunky TAILQ_INIT(&pool->requests); 520 1.1 plunky 521 1.2 christos TAILQ_INIT(&pool->paused_requests); 522 1.2 christos 523 1.1 plunky TAILQ_INIT(&pool->input_hooks); 524 1.1 plunky TAILQ_INIT(&pool->output_hooks); 525 1.1 plunky 526 1.1 plunky pool->base = base; 527 1.1 plunky pool->timeout = -1; 528 1.1 plunky 529 1.1 plunky return (pool); 530 1.1 plunky } 531 1.1 plunky 532 1.1 plunky static void 533 1.1 plunky evrpc_request_wrapper_free(struct evrpc_request_wrapper *request) 534 1.1 plunky { 535 1.2 christos if (request->hook_meta != NULL) 536 1.4 christos evrpc_hook_context_free_(request->hook_meta); 537 1.2 christos mm_free(request->name); 538 1.2 christos mm_free(request); 539 1.1 plunky } 540 1.1 plunky 541 1.1 plunky void 542 1.1 plunky evrpc_pool_free(struct evrpc_pool *pool) 543 1.1 plunky { 544 1.1 plunky struct evhttp_connection *connection; 545 1.1 plunky struct evrpc_request_wrapper *request; 546 1.3 spz struct evrpc_hook_ctx *pause; 547 1.1 plunky struct evrpc_hook *hook; 548 1.2 christos int r; 549 1.1 plunky 550 1.1 plunky while ((request = TAILQ_FIRST(&pool->requests)) != NULL) { 551 1.1 plunky TAILQ_REMOVE(&pool->requests, request, next); 552 1.1 plunky evrpc_request_wrapper_free(request); 553 1.1 plunky } 554 1.1 plunky 555 1.3 spz while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) { 556 1.3 spz TAILQ_REMOVE(&pool->paused_requests, pause, next); 557 1.3 spz mm_free(pause); 558 1.2 christos } 559 1.2 christos 560 1.1 plunky while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) { 561 1.1 plunky TAILQ_REMOVE(&pool->connections, connection, next); 562 1.1 plunky evhttp_connection_free(connection); 563 1.1 plunky } 564 1.1 plunky 565 1.1 plunky while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) { 566 1.2 christos r = evrpc_remove_hook(pool, EVRPC_INPUT, hook); 567 1.2 christos EVUTIL_ASSERT(r); 568 1.1 plunky } 569 1.1 plunky 570 1.1 plunky while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) { 571 1.2 christos r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook); 572 1.2 christos EVUTIL_ASSERT(r); 573 1.1 plunky } 574 1.1 plunky 575 1.2 christos mm_free(pool); 576 1.1 plunky } 577 1.1 plunky 578 1.1 plunky /* 579 1.1 plunky * Add a connection to the RPC pool. A request scheduled on the pool 580 1.1 plunky * may use any available connection. 581 1.1 plunky */ 582 1.1 plunky 583 1.1 plunky void 584 1.1 plunky evrpc_pool_add_connection(struct evrpc_pool *pool, 585 1.2 christos struct evhttp_connection *connection) 586 1.2 christos { 587 1.2 christos EVUTIL_ASSERT(connection->http_server == NULL); 588 1.1 plunky TAILQ_INSERT_TAIL(&pool->connections, connection, next); 589 1.1 plunky 590 1.1 plunky /* 591 1.1 plunky * associate an event base with this connection 592 1.1 plunky */ 593 1.1 plunky if (pool->base != NULL) 594 1.1 plunky evhttp_connection_set_base(connection, pool->base); 595 1.1 plunky 596 1.2 christos /* 597 1.1 plunky * unless a timeout was specifically set for a connection, 598 1.1 plunky * the connection inherits the timeout from the pool. 599 1.1 plunky */ 600 1.4 christos if (!evutil_timerisset(&connection->timeout)) 601 1.4 christos evhttp_connection_set_timeout(connection, pool->timeout); 602 1.1 plunky 603 1.2 christos /* 604 1.1 plunky * if we have any requests pending, schedule them with the new 605 1.1 plunky * connections. 606 1.1 plunky */ 607 1.1 plunky 608 1.1 plunky if (TAILQ_FIRST(&pool->requests) != NULL) { 609 1.2 christos struct evrpc_request_wrapper *request = 610 1.1 plunky TAILQ_FIRST(&pool->requests); 611 1.1 plunky TAILQ_REMOVE(&pool->requests, request, next); 612 1.1 plunky evrpc_schedule_request(connection, request); 613 1.1 plunky } 614 1.1 plunky } 615 1.1 plunky 616 1.1 plunky void 617 1.2 christos evrpc_pool_remove_connection(struct evrpc_pool *pool, 618 1.2 christos struct evhttp_connection *connection) 619 1.2 christos { 620 1.2 christos TAILQ_REMOVE(&pool->connections, connection, next); 621 1.2 christos } 622 1.2 christos 623 1.2 christos void 624 1.1 plunky evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs) 625 1.1 plunky { 626 1.1 plunky struct evhttp_connection *evcon; 627 1.1 plunky TAILQ_FOREACH(evcon, &pool->connections, next) { 628 1.4 christos evhttp_connection_set_timeout(evcon, timeout_in_secs); 629 1.1 plunky } 630 1.1 plunky pool->timeout = timeout_in_secs; 631 1.1 plunky } 632 1.1 plunky 633 1.1 plunky 634 1.1 plunky static void evrpc_reply_done(struct evhttp_request *, void *); 635 1.2 christos static void evrpc_request_timeout(evutil_socket_t, short, void *); 636 1.1 plunky 637 1.1 plunky /* 638 1.1 plunky * Finds a connection object associated with the pool that is currently 639 1.1 plunky * idle and can be used to make a request. 640 1.1 plunky */ 641 1.1 plunky static struct evhttp_connection * 642 1.1 plunky evrpc_pool_find_connection(struct evrpc_pool *pool) 643 1.1 plunky { 644 1.1 plunky struct evhttp_connection *connection; 645 1.1 plunky TAILQ_FOREACH(connection, &pool->connections, next) { 646 1.1 plunky if (TAILQ_FIRST(&connection->requests) == NULL) 647 1.1 plunky return (connection); 648 1.1 plunky } 649 1.1 plunky 650 1.1 plunky return (NULL); 651 1.1 plunky } 652 1.1 plunky 653 1.1 plunky /* 654 1.2 christos * Prototypes responsible for evrpc scheduling and hooking 655 1.2 christos */ 656 1.2 christos 657 1.2 christos static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT); 658 1.2 christos 659 1.2 christos /* 660 1.1 plunky * We assume that the ctx is no longer queued on the pool. 661 1.1 plunky */ 662 1.1 plunky static int 663 1.1 plunky evrpc_schedule_request(struct evhttp_connection *connection, 664 1.1 plunky struct evrpc_request_wrapper *ctx) 665 1.1 plunky { 666 1.1 plunky struct evhttp_request *req = NULL; 667 1.1 plunky struct evrpc_pool *pool = ctx->pool; 668 1.1 plunky struct evrpc_status status; 669 1.1 plunky 670 1.1 plunky if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) 671 1.1 plunky goto error; 672 1.1 plunky 673 1.1 plunky /* serialize the request data into the output buffer */ 674 1.1 plunky ctx->request_marshal(req->output_buffer, ctx->request); 675 1.1 plunky 676 1.1 plunky /* we need to know the connection that we might have to abort */ 677 1.1 plunky ctx->evcon = connection; 678 1.1 plunky 679 1.2 christos /* if we get paused we also need to know the request */ 680 1.2 christos ctx->req = req; 681 1.2 christos 682 1.2 christos if (TAILQ_FIRST(&pool->output_hooks) != NULL) { 683 1.2 christos int hook_res; 684 1.2 christos 685 1.4 christos evrpc_hook_associate_meta_(&ctx->hook_meta, connection); 686 1.2 christos 687 1.2 christos /* apply hooks to the outgoing request */ 688 1.2 christos hook_res = evrpc_process_hooks(&pool->output_hooks, 689 1.2 christos ctx, req, req->output_buffer); 690 1.2 christos 691 1.2 christos switch (hook_res) { 692 1.2 christos case EVRPC_TERMINATE: 693 1.2 christos goto error; 694 1.2 christos case EVRPC_PAUSE: 695 1.2 christos /* we need to be explicitly resumed */ 696 1.2 christos if (evrpc_pause_request(pool, ctx, 697 1.2 christos evrpc_schedule_request_closure) == -1) 698 1.2 christos goto error; 699 1.2 christos return (0); 700 1.2 christos case EVRPC_CONTINUE: 701 1.2 christos /* we can just continue */ 702 1.2 christos break; 703 1.2 christos default: 704 1.2 christos EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 705 1.2 christos hook_res == EVRPC_CONTINUE || 706 1.2 christos hook_res == EVRPC_PAUSE); 707 1.2 christos } 708 1.2 christos } 709 1.2 christos 710 1.2 christos evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE); 711 1.2 christos return (0); 712 1.2 christos 713 1.2 christos error: 714 1.2 christos memset(&status, 0, sizeof(status)); 715 1.2 christos status.error = EVRPC_STATUS_ERR_UNSTARTED; 716 1.2 christos (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 717 1.2 christos evrpc_request_wrapper_free(ctx); 718 1.2 christos return (-1); 719 1.2 christos } 720 1.2 christos 721 1.2 christos static void 722 1.2 christos evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 723 1.2 christos { 724 1.2 christos struct evrpc_request_wrapper *ctx = arg; 725 1.2 christos struct evhttp_connection *connection = ctx->evcon; 726 1.2 christos struct evhttp_request *req = ctx->req; 727 1.2 christos struct evrpc_pool *pool = ctx->pool; 728 1.2 christos struct evrpc_status status; 729 1.2 christos char *uri = NULL; 730 1.2 christos int res = 0; 731 1.2 christos 732 1.2 christos if (hook_res == EVRPC_TERMINATE) 733 1.2 christos goto error; 734 1.2 christos 735 1.2 christos uri = evrpc_construct_uri(ctx->name); 736 1.2 christos if (uri == NULL) 737 1.1 plunky goto error; 738 1.1 plunky 739 1.1 plunky if (pool->timeout > 0) { 740 1.2 christos /* 741 1.1 plunky * a timeout after which the whole rpc is going to be aborted. 742 1.1 plunky */ 743 1.1 plunky struct timeval tv; 744 1.1 plunky evutil_timerclear(&tv); 745 1.1 plunky tv.tv_sec = pool->timeout; 746 1.1 plunky evtimer_add(&ctx->ev_timeout, &tv); 747 1.1 plunky } 748 1.1 plunky 749 1.1 plunky /* start the request over the connection */ 750 1.1 plunky res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri); 751 1.2 christos mm_free(uri); 752 1.1 plunky 753 1.1 plunky if (res == -1) 754 1.1 plunky goto error; 755 1.1 plunky 756 1.2 christos return; 757 1.1 plunky 758 1.1 plunky error: 759 1.1 plunky memset(&status, 0, sizeof(status)); 760 1.1 plunky status.error = EVRPC_STATUS_ERR_UNSTARTED; 761 1.1 plunky (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 762 1.1 plunky evrpc_request_wrapper_free(ctx); 763 1.2 christos } 764 1.2 christos 765 1.2 christos /* we just queue the paused request on the pool under the req object */ 766 1.2 christos static int 767 1.2 christos evrpc_pause_request(void *vbase, void *ctx, 768 1.2 christos void (*cb)(void *, enum EVRPC_HOOK_RESULT)) 769 1.2 christos { 770 1.4 christos struct evrpc_hooks_ *base = vbase; 771 1.3 spz struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause)); 772 1.3 spz if (pause == NULL) 773 1.2 christos return (-1); 774 1.2 christos 775 1.3 spz pause->ctx = ctx; 776 1.3 spz pause->cb = cb; 777 1.2 christos 778 1.3 spz TAILQ_INSERT_TAIL(&base->pause_requests, pause, next); 779 1.2 christos return (0); 780 1.2 christos } 781 1.2 christos 782 1.2 christos int 783 1.2 christos evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res) 784 1.2 christos { 785 1.4 christos struct evrpc_hooks_ *base = vbase; 786 1.2 christos struct evrpc_pause_list *head = &base->pause_requests; 787 1.3 spz struct evrpc_hook_ctx *pause; 788 1.2 christos 789 1.3 spz TAILQ_FOREACH(pause, head, next) { 790 1.3 spz if (pause->ctx == ctx) 791 1.2 christos break; 792 1.2 christos } 793 1.2 christos 794 1.3 spz if (pause == NULL) 795 1.2 christos return (-1); 796 1.2 christos 797 1.3 spz (*pause->cb)(pause->ctx, res); 798 1.3 spz TAILQ_REMOVE(head, pause, next); 799 1.3 spz mm_free(pause); 800 1.2 christos return (0); 801 1.1 plunky } 802 1.1 plunky 803 1.1 plunky int 804 1.1 plunky evrpc_make_request(struct evrpc_request_wrapper *ctx) 805 1.1 plunky { 806 1.1 plunky struct evrpc_pool *pool = ctx->pool; 807 1.1 plunky 808 1.1 plunky /* initialize the event structure for this rpc */ 809 1.2 christos evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx); 810 1.1 plunky 811 1.1 plunky /* we better have some available connections on the pool */ 812 1.2 christos EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL); 813 1.1 plunky 814 1.2 christos /* 815 1.1 plunky * if no connection is available, we queue the request on the pool, 816 1.1 plunky * the next time a connection is empty, the rpc will be send on that. 817 1.1 plunky */ 818 1.1 plunky TAILQ_INSERT_TAIL(&pool->requests, ctx, next); 819 1.1 plunky 820 1.1 plunky evrpc_pool_schedule(pool); 821 1.1 plunky 822 1.1 plunky return (0); 823 1.1 plunky } 824 1.1 plunky 825 1.2 christos 826 1.2 christos struct evrpc_request_wrapper * 827 1.2 christos evrpc_make_request_ctx( 828 1.2 christos struct evrpc_pool *pool, void *request, void *reply, 829 1.2 christos const char *rpcname, 830 1.2 christos void (*req_marshal)(struct evbuffer*, void *), 831 1.2 christos void (*rpl_clear)(void *), 832 1.2 christos int (*rpl_unmarshal)(void *, struct evbuffer *), 833 1.2 christos void (*cb)(struct evrpc_status *, void *, void *, void *), 834 1.2 christos void *cbarg) 835 1.2 christos { 836 1.2 christos struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *) 837 1.2 christos mm_malloc(sizeof(struct evrpc_request_wrapper)); 838 1.2 christos if (ctx == NULL) 839 1.2 christos return (NULL); 840 1.2 christos 841 1.2 christos ctx->pool = pool; 842 1.2 christos ctx->hook_meta = NULL; 843 1.2 christos ctx->evcon = NULL; 844 1.2 christos ctx->name = mm_strdup(rpcname); 845 1.2 christos if (ctx->name == NULL) { 846 1.2 christos mm_free(ctx); 847 1.2 christos return (NULL); 848 1.2 christos } 849 1.2 christos ctx->cb = cb; 850 1.2 christos ctx->cb_arg = cbarg; 851 1.2 christos ctx->request = request; 852 1.2 christos ctx->reply = reply; 853 1.2 christos ctx->request_marshal = req_marshal; 854 1.2 christos ctx->reply_clear = rpl_clear; 855 1.2 christos ctx->reply_unmarshal = rpl_unmarshal; 856 1.2 christos 857 1.2 christos return (ctx); 858 1.2 christos } 859 1.2 christos 860 1.2 christos static void 861 1.2 christos evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT); 862 1.2 christos 863 1.1 plunky static void 864 1.1 plunky evrpc_reply_done(struct evhttp_request *req, void *arg) 865 1.1 plunky { 866 1.1 plunky struct evrpc_request_wrapper *ctx = arg; 867 1.1 plunky struct evrpc_pool *pool = ctx->pool; 868 1.2 christos int hook_res = EVRPC_CONTINUE; 869 1.2 christos 870 1.2 christos /* cancel any timeout we might have scheduled */ 871 1.2 christos event_del(&ctx->ev_timeout); 872 1.2 christos 873 1.2 christos ctx->req = req; 874 1.2 christos 875 1.2 christos /* we need to get the reply now */ 876 1.2 christos if (req == NULL) { 877 1.2 christos evrpc_reply_done_closure(ctx, EVRPC_CONTINUE); 878 1.2 christos return; 879 1.2 christos } 880 1.2 christos 881 1.2 christos if (TAILQ_FIRST(&pool->input_hooks) != NULL) { 882 1.4 christos evrpc_hook_associate_meta_(&ctx->hook_meta, ctx->evcon); 883 1.2 christos 884 1.2 christos /* apply hooks to the incoming request */ 885 1.2 christos hook_res = evrpc_process_hooks(&pool->input_hooks, 886 1.2 christos ctx, req, req->input_buffer); 887 1.2 christos 888 1.2 christos switch (hook_res) { 889 1.2 christos case EVRPC_TERMINATE: 890 1.2 christos case EVRPC_CONTINUE: 891 1.2 christos break; 892 1.2 christos case EVRPC_PAUSE: 893 1.2 christos /* 894 1.2 christos * if we get paused we also need to know the 895 1.2 christos * request. unfortunately, the underlying 896 1.2 christos * layer is going to free it. we need to 897 1.2 christos * request ownership explicitly 898 1.2 christos */ 899 1.5 christos evhttp_request_own(req); 900 1.2 christos 901 1.2 christos evrpc_pause_request(pool, ctx, 902 1.2 christos evrpc_reply_done_closure); 903 1.2 christos return; 904 1.2 christos default: 905 1.2 christos EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE || 906 1.2 christos hook_res == EVRPC_CONTINUE || 907 1.2 christos hook_res == EVRPC_PAUSE); 908 1.2 christos } 909 1.2 christos } 910 1.2 christos 911 1.2 christos evrpc_reply_done_closure(ctx, hook_res); 912 1.2 christos 913 1.2 christos /* http request is being freed by underlying layer */ 914 1.2 christos } 915 1.2 christos 916 1.2 christos static void 917 1.2 christos evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res) 918 1.2 christos { 919 1.2 christos struct evrpc_request_wrapper *ctx = arg; 920 1.2 christos struct evhttp_request *req = ctx->req; 921 1.2 christos struct evrpc_pool *pool = ctx->pool; 922 1.1 plunky struct evrpc_status status; 923 1.1 plunky int res = -1; 924 1.1 plunky 925 1.1 plunky memset(&status, 0, sizeof(status)); 926 1.1 plunky status.http_req = req; 927 1.1 plunky 928 1.1 plunky /* we need to get the reply now */ 929 1.2 christos if (req == NULL) { 930 1.2 christos status.error = EVRPC_STATUS_ERR_TIMEOUT; 931 1.2 christos } else if (hook_res == EVRPC_TERMINATE) { 932 1.2 christos status.error = EVRPC_STATUS_ERR_HOOKABORTED; 933 1.1 plunky } else { 934 1.2 christos res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); 935 1.2 christos if (res == -1) 936 1.2 christos status.error = EVRPC_STATUS_ERR_BADPAYLOAD; 937 1.1 plunky } 938 1.1 plunky 939 1.1 plunky if (res == -1) { 940 1.1 plunky /* clear everything that we might have written previously */ 941 1.1 plunky ctx->reply_clear(ctx->reply); 942 1.1 plunky } 943 1.1 plunky 944 1.1 plunky (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg); 945 1.2 christos 946 1.1 plunky evrpc_request_wrapper_free(ctx); 947 1.1 plunky 948 1.2 christos /* the http layer owned the original request structure, but if we 949 1.2 christos * got paused, we asked for ownership and need to free it here. */ 950 1.2 christos if (req != NULL && evhttp_request_is_owned(req)) 951 1.2 christos evhttp_request_free(req); 952 1.1 plunky 953 1.1 plunky /* see if we can schedule another request */ 954 1.1 plunky evrpc_pool_schedule(pool); 955 1.1 plunky } 956 1.1 plunky 957 1.1 plunky static void 958 1.1 plunky evrpc_pool_schedule(struct evrpc_pool *pool) 959 1.1 plunky { 960 1.1 plunky struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests); 961 1.1 plunky struct evhttp_connection *evcon; 962 1.1 plunky 963 1.1 plunky /* if no requests are pending, we have no work */ 964 1.1 plunky if (ctx == NULL) 965 1.1 plunky return; 966 1.1 plunky 967 1.1 plunky if ((evcon = evrpc_pool_find_connection(pool)) != NULL) { 968 1.1 plunky TAILQ_REMOVE(&pool->requests, ctx, next); 969 1.1 plunky evrpc_schedule_request(evcon, ctx); 970 1.1 plunky } 971 1.1 plunky } 972 1.1 plunky 973 1.1 plunky static void 974 1.2 christos evrpc_request_timeout(evutil_socket_t fd, short what, void *arg) 975 1.1 plunky { 976 1.1 plunky struct evrpc_request_wrapper *ctx = arg; 977 1.1 plunky struct evhttp_connection *evcon = ctx->evcon; 978 1.2 christos EVUTIL_ASSERT(evcon != NULL); 979 1.1 plunky 980 1.4 christos evhttp_connection_fail_(evcon, EVREQ_HTTP_TIMEOUT); 981 1.1 plunky } 982 1.2 christos 983 1.2 christos /* 984 1.2 christos * frees potential meta data associated with a request. 985 1.2 christos */ 986 1.2 christos 987 1.2 christos static void 988 1.2 christos evrpc_meta_data_free(struct evrpc_meta_list *meta_data) 989 1.2 christos { 990 1.2 christos struct evrpc_meta *entry; 991 1.2 christos EVUTIL_ASSERT(meta_data != NULL); 992 1.2 christos 993 1.2 christos while ((entry = TAILQ_FIRST(meta_data)) != NULL) { 994 1.2 christos TAILQ_REMOVE(meta_data, entry, next); 995 1.2 christos mm_free(entry->key); 996 1.2 christos mm_free(entry->data); 997 1.2 christos mm_free(entry); 998 1.2 christos } 999 1.2 christos } 1000 1.2 christos 1001 1.2 christos static struct evrpc_hook_meta * 1002 1.4 christos evrpc_hook_meta_new_(void) 1003 1.2 christos { 1004 1.2 christos struct evrpc_hook_meta *ctx; 1005 1.2 christos ctx = mm_malloc(sizeof(struct evrpc_hook_meta)); 1006 1.2 christos EVUTIL_ASSERT(ctx != NULL); 1007 1.2 christos 1008 1.2 christos TAILQ_INIT(&ctx->meta_data); 1009 1.2 christos ctx->evcon = NULL; 1010 1.2 christos 1011 1.2 christos return (ctx); 1012 1.2 christos } 1013 1.2 christos 1014 1.2 christos static void 1015 1.4 christos evrpc_hook_associate_meta_(struct evrpc_hook_meta **pctx, 1016 1.2 christos struct evhttp_connection *evcon) 1017 1.2 christos { 1018 1.2 christos struct evrpc_hook_meta *ctx = *pctx; 1019 1.2 christos if (ctx == NULL) 1020 1.4 christos *pctx = ctx = evrpc_hook_meta_new_(); 1021 1.2 christos ctx->evcon = evcon; 1022 1.2 christos } 1023 1.2 christos 1024 1.2 christos static void 1025 1.4 christos evrpc_hook_context_free_(struct evrpc_hook_meta *ctx) 1026 1.2 christos { 1027 1.2 christos evrpc_meta_data_free(&ctx->meta_data); 1028 1.2 christos mm_free(ctx); 1029 1.2 christos } 1030 1.2 christos 1031 1.2 christos /* Adds meta data */ 1032 1.2 christos void 1033 1.2 christos evrpc_hook_add_meta(void *ctx, const char *key, 1034 1.2 christos const void *data, size_t data_size) 1035 1.2 christos { 1036 1.2 christos struct evrpc_request_wrapper *req = ctx; 1037 1.2 christos struct evrpc_hook_meta *store = NULL; 1038 1.2 christos struct evrpc_meta *meta = NULL; 1039 1.2 christos 1040 1.2 christos if ((store = req->hook_meta) == NULL) 1041 1.4 christos store = req->hook_meta = evrpc_hook_meta_new_(); 1042 1.2 christos 1043 1.2 christos meta = mm_malloc(sizeof(struct evrpc_meta)); 1044 1.2 christos EVUTIL_ASSERT(meta != NULL); 1045 1.2 christos meta->key = mm_strdup(key); 1046 1.2 christos EVUTIL_ASSERT(meta->key != NULL); 1047 1.2 christos meta->data_size = data_size; 1048 1.2 christos meta->data = mm_malloc(data_size); 1049 1.2 christos EVUTIL_ASSERT(meta->data != NULL); 1050 1.2 christos memcpy(meta->data, data, data_size); 1051 1.2 christos 1052 1.2 christos TAILQ_INSERT_TAIL(&store->meta_data, meta, next); 1053 1.2 christos } 1054 1.2 christos 1055 1.2 christos int 1056 1.2 christos evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size) 1057 1.2 christos { 1058 1.2 christos struct evrpc_request_wrapper *req = ctx; 1059 1.2 christos struct evrpc_meta *meta = NULL; 1060 1.2 christos 1061 1.2 christos if (req->hook_meta == NULL) 1062 1.2 christos return (-1); 1063 1.2 christos 1064 1.2 christos TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) { 1065 1.2 christos if (strcmp(meta->key, key) == 0) { 1066 1.2 christos *data = meta->data; 1067 1.2 christos *data_size = meta->data_size; 1068 1.2 christos return (0); 1069 1.2 christos } 1070 1.2 christos } 1071 1.2 christos 1072 1.2 christos return (-1); 1073 1.2 christos } 1074 1.2 christos 1075 1.2 christos struct evhttp_connection * 1076 1.2 christos evrpc_hook_get_connection(void *ctx) 1077 1.2 christos { 1078 1.2 christos struct evrpc_request_wrapper *req = ctx; 1079 1.2 christos return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL); 1080 1.2 christos } 1081 1.2 christos 1082 1.2 christos int 1083 1.2 christos evrpc_send_request_generic(struct evrpc_pool *pool, 1084 1.2 christos void *request, void *reply, 1085 1.2 christos void (*cb)(struct evrpc_status *, void *, void *, void *), 1086 1.2 christos void *cb_arg, 1087 1.2 christos const char *rpcname, 1088 1.2 christos void (*req_marshal)(struct evbuffer *, void *), 1089 1.2 christos void (*rpl_clear)(void *), 1090 1.2 christos int (*rpl_unmarshal)(void *, struct evbuffer *)) 1091 1.2 christos { 1092 1.2 christos struct evrpc_status status; 1093 1.2 christos struct evrpc_request_wrapper *ctx; 1094 1.2 christos ctx = evrpc_make_request_ctx(pool, request, reply, 1095 1.2 christos rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg); 1096 1.2 christos if (ctx == NULL) 1097 1.2 christos goto error; 1098 1.2 christos return (evrpc_make_request(ctx)); 1099 1.2 christos error: 1100 1.2 christos memset(&status, 0, sizeof(status)); 1101 1.2 christos status.error = EVRPC_STATUS_ERR_UNSTARTED; 1102 1.2 christos (*(cb))(&status, request, reply, cb_arg); 1103 1.2 christos return (-1); 1104 1.2 christos } 1105 1.2 christos 1106 1.2 christos /** Takes a request object and fills it in with the right magic */ 1107 1.2 christos static struct evrpc * 1108 1.2 christos evrpc_register_object(const char *name, 1109 1.2 christos void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *), 1110 1.2 christos int (*req_unmarshal)(void *, struct evbuffer *), 1111 1.2 christos void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *), 1112 1.2 christos int (*rpl_complete)(void *), 1113 1.2 christos void (*rpl_marshal)(struct evbuffer *, void *)) 1114 1.2 christos { 1115 1.2 christos struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc)); 1116 1.2 christos if (rpc == NULL) 1117 1.2 christos return (NULL); 1118 1.2 christos rpc->uri = mm_strdup(name); 1119 1.2 christos if (rpc->uri == NULL) { 1120 1.2 christos mm_free(rpc); 1121 1.2 christos return (NULL); 1122 1.2 christos } 1123 1.2 christos rpc->request_new = req_new; 1124 1.2 christos rpc->request_new_arg = req_new_arg; 1125 1.2 christos rpc->request_free = req_free; 1126 1.2 christos rpc->request_unmarshal = req_unmarshal; 1127 1.2 christos rpc->reply_new = rpl_new; 1128 1.2 christos rpc->reply_new_arg = rpl_new_arg; 1129 1.2 christos rpc->reply_free = rpl_free; 1130 1.2 christos rpc->reply_complete = rpl_complete; 1131 1.2 christos rpc->reply_marshal = rpl_marshal; 1132 1.2 christos return (rpc); 1133 1.2 christos } 1134 1.2 christos 1135 1.2 christos int 1136 1.2 christos evrpc_register_generic(struct evrpc_base *base, const char *name, 1137 1.2 christos void (*callback)(struct evrpc_req_generic *, void *), void *cbarg, 1138 1.2 christos void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *), 1139 1.2 christos int (*req_unmarshal)(void *, struct evbuffer *), 1140 1.2 christos void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *), 1141 1.2 christos int (*rpl_complete)(void *), 1142 1.2 christos void (*rpl_marshal)(struct evbuffer *, void *)) 1143 1.2 christos { 1144 1.2 christos struct evrpc* rpc = 1145 1.2 christos evrpc_register_object(name, req_new, req_new_arg, req_free, req_unmarshal, 1146 1.2 christos rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal); 1147 1.2 christos if (rpc == NULL) 1148 1.2 christos return (-1); 1149 1.2 christos evrpc_register_rpc(base, rpc, 1150 1.2 christos (void (*)(struct evrpc_req_generic*, void *))callback, cbarg); 1151 1.2 christos return (0); 1152 1.2 christos } 1153 1.2 christos 1154 1.2 christos /** accessors for obscure and undocumented functionality */ 1155 1.2 christos struct evrpc_pool * 1156 1.2 christos evrpc_request_get_pool(struct evrpc_request_wrapper *ctx) 1157 1.2 christos { 1158 1.2 christos return (ctx->pool); 1159 1.2 christos } 1160 1.2 christos 1161 1.2 christos void 1162 1.2 christos evrpc_request_set_pool(struct evrpc_request_wrapper *ctx, 1163 1.2 christos struct evrpc_pool *pool) 1164 1.2 christos { 1165 1.2 christos ctx->pool = pool; 1166 1.2 christos } 1167 1.2 christos 1168 1.2 christos void 1169 1.2 christos evrpc_request_set_cb(struct evrpc_request_wrapper *ctx, 1170 1.2 christos void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg), 1171 1.2 christos void *cb_arg) 1172 1.2 christos { 1173 1.2 christos ctx->cb = cb; 1174 1.2 christos ctx->cb_arg = cb_arg; 1175 1.2 christos } 1176