Home | History | Annotate | Line # | Download | only in test
benchmark-multi-accept.c revision 1.1
      1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
      2  *
      3  * Permission is hereby granted, free of charge, to any person obtaining a copy
      4  * of this software and associated documentation files (the "Software"), to
      5  * deal in the Software without restriction, including without limitation the
      6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
      7  * sell copies of the Software, and to permit persons to whom the Software is
      8  * furnished to do so, subject to the following conditions:
      9  *
     10  * The above copyright notice and this permission notice shall be included in
     11  * all copies or substantial portions of the Software.
     12  *
     13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
     18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
     19  * IN THE SOFTWARE.
     20  */
     21 
     22 #include "task.h"
     23 #include "uv.h"
     24 
     25 #define IPC_PIPE_NAME TEST_PIPENAME
     26 #define NUM_CONNECTS  (250 * 1000)
     27 
     28 union stream_handle {
     29   uv_pipe_t pipe;
     30   uv_tcp_t tcp;
     31 };
     32 
     33 /* Use as (uv_stream_t *) &handle_storage -- it's kind of clunky but it
     34  * avoids aliasing warnings.
     35  */
     36 typedef unsigned char handle_storage_t[sizeof(union stream_handle)];
     37 
     38 /* Used for passing around the listen handle, not part of the benchmark proper.
     39  * We have an overabundance of server types here. It works like this:
     40  *
     41  *  1. The main thread starts an IPC pipe server.
     42  *  2. The worker threads connect to the IPC server and obtain a listen handle.
     43  *  3. The worker threads start accepting requests on the listen handle.
     44  *  4. The main thread starts connecting repeatedly.
     45  *
     46  * Step #4 should perhaps be farmed out over several threads.
     47  */
     48 struct ipc_server_ctx {
     49   handle_storage_t server_handle;
     50   unsigned int num_connects;
     51   uv_pipe_t ipc_pipe;
     52 };
     53 
     54 struct ipc_peer_ctx {
     55   handle_storage_t peer_handle;
     56   uv_write_t write_req;
     57 };
     58 
     59 struct ipc_client_ctx {
     60   uv_connect_t connect_req;
     61   uv_stream_t* server_handle;
     62   uv_pipe_t ipc_pipe;
     63   char scratch[16];
     64 };
     65 
     66 /* Used in the actual benchmark. */
     67 struct server_ctx {
     68   handle_storage_t server_handle;
     69   unsigned int num_connects;
     70   uv_async_t async_handle;
     71   uv_thread_t thread_id;
     72   uv_sem_t semaphore;
     73 };
     74 
     75 struct client_ctx {
     76   handle_storage_t client_handle;
     77   unsigned int num_connects;
     78   uv_connect_t connect_req;
     79   uv_idle_t idle_handle;
     80 };
     81 
     82 static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status);
     83 static void ipc_write_cb(uv_write_t* req, int status);
     84 static void ipc_close_cb(uv_handle_t* handle);
     85 static void ipc_connect_cb(uv_connect_t* req, int status);
     86 static void ipc_read_cb(uv_stream_t* handle,
     87                         ssize_t nread,
     88                         const uv_buf_t* buf);
     89 static void ipc_alloc_cb(uv_handle_t* handle,
     90                          size_t suggested_size,
     91                          uv_buf_t* buf);
     92 
     93 static void sv_async_cb(uv_async_t* handle);
     94 static void sv_connection_cb(uv_stream_t* server_handle, int status);
     95 static void sv_read_cb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf);
     96 static void sv_alloc_cb(uv_handle_t* handle,
     97                         size_t suggested_size,
     98                         uv_buf_t* buf);
     99 
    100 static void cl_connect_cb(uv_connect_t* req, int status);
    101 static void cl_idle_cb(uv_idle_t* handle);
    102 static void cl_close_cb(uv_handle_t* handle);
    103 
    104 static struct sockaddr_in listen_addr;
    105 
    106 
    107 static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status) {
    108   struct ipc_server_ctx* sc;
    109   struct ipc_peer_ctx* pc;
    110   uv_loop_t* loop;
    111   uv_buf_t buf;
    112 
    113   loop = ipc_pipe->loop;
    114   buf = uv_buf_init("PING", 4);
    115   sc = container_of(ipc_pipe, struct ipc_server_ctx, ipc_pipe);
    116   pc = calloc(1, sizeof(*pc));
    117   ASSERT(pc != NULL);
    118 
    119   if (ipc_pipe->type == UV_TCP)
    120     ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) &pc->peer_handle));
    121   else if (ipc_pipe->type == UV_NAMED_PIPE)
    122     ASSERT(0 == uv_pipe_init(loop, (uv_pipe_t*) &pc->peer_handle, 1));
    123   else
    124     ASSERT(0);
    125 
    126   ASSERT(0 == uv_accept(ipc_pipe, (uv_stream_t*) &pc->peer_handle));
    127   ASSERT(0 == uv_write2(&pc->write_req,
    128                         (uv_stream_t*) &pc->peer_handle,
    129                         &buf,
    130                         1,
    131                         (uv_stream_t*) &sc->server_handle,
    132                         ipc_write_cb));
    133 
    134   if (--sc->num_connects == 0)
    135     uv_close((uv_handle_t*) ipc_pipe, NULL);
    136 }
    137 
    138 
    139 static void ipc_write_cb(uv_write_t* req, int status) {
    140   struct ipc_peer_ctx* ctx;
    141   ctx = container_of(req, struct ipc_peer_ctx, write_req);
    142   uv_close((uv_handle_t*) &ctx->peer_handle, ipc_close_cb);
    143 }
    144 
    145 
    146 static void ipc_close_cb(uv_handle_t* handle) {
    147   struct ipc_peer_ctx* ctx;
    148   ctx = container_of(handle, struct ipc_peer_ctx, peer_handle);
    149   free(ctx);
    150 }
    151 
    152 
    153 static void ipc_connect_cb(uv_connect_t* req, int status) {
    154   struct ipc_client_ctx* ctx;
    155   ctx = container_of(req, struct ipc_client_ctx, connect_req);
    156   ASSERT(0 == status);
    157   ASSERT(0 == uv_read_start((uv_stream_t*) &ctx->ipc_pipe,
    158                             ipc_alloc_cb,
    159                             ipc_read_cb));
    160 }
    161 
    162 
    163 static void ipc_alloc_cb(uv_handle_t* handle,
    164                          size_t suggested_size,
    165                          uv_buf_t* buf) {
    166   struct ipc_client_ctx* ctx;
    167   ctx = container_of(handle, struct ipc_client_ctx, ipc_pipe);
    168   buf->base = ctx->scratch;
    169   buf->len = sizeof(ctx->scratch);
    170 }
    171 
    172 
    173 static void ipc_read_cb(uv_stream_t* handle,
    174                         ssize_t nread,
    175                         const uv_buf_t* buf) {
    176   struct ipc_client_ctx* ctx;
    177   uv_loop_t* loop;
    178   uv_handle_type type;
    179   uv_pipe_t* ipc_pipe;
    180 
    181   ipc_pipe = (uv_pipe_t*) handle;
    182   ctx = container_of(ipc_pipe, struct ipc_client_ctx, ipc_pipe);
    183   loop = ipc_pipe->loop;
    184 
    185   ASSERT(1 == uv_pipe_pending_count(ipc_pipe));
    186   type = uv_pipe_pending_type(ipc_pipe);
    187   if (type == UV_TCP)
    188     ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) ctx->server_handle));
    189   else if (type == UV_NAMED_PIPE)
    190     ASSERT(0 == uv_pipe_init(loop, (uv_pipe_t*) ctx->server_handle, 0));
    191   else
    192     ASSERT(0);
    193 
    194   ASSERT(0 == uv_accept(handle, ctx->server_handle));
    195   uv_close((uv_handle_t*) &ctx->ipc_pipe, NULL);
    196 }
    197 
    198 
    199 /* Set up an IPC pipe server that hands out listen sockets to the worker
    200  * threads. It's kind of cumbersome for such a simple operation, maybe we
    201  * should revive uv_import() and uv_export().
    202  */
    203 static void send_listen_handles(uv_handle_type type,
    204                                 unsigned int num_servers,
    205                                 struct server_ctx* servers) {
    206   struct ipc_server_ctx ctx;
    207   uv_loop_t* loop;
    208   unsigned int i;
    209 
    210   loop = uv_default_loop();
    211   ctx.num_connects = num_servers;
    212 
    213   if (type == UV_TCP) {
    214     ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) &ctx.server_handle));
    215     ASSERT(0 == uv_tcp_bind((uv_tcp_t*) &ctx.server_handle,
    216                             (const struct sockaddr*) &listen_addr,
    217                             0));
    218   }
    219   else
    220     ASSERT(0);
    221   /* We need to initialize this pipe with ipc=0 - this is not a uv_pipe we'll
    222    * be sending handles over, it's just for listening for new connections.
    223    * If we accept a connection then the connected pipe must be initialized
    224    * with ipc=1.
    225    */
    226   ASSERT(0 == uv_pipe_init(loop, &ctx.ipc_pipe, 0));
    227   ASSERT(0 == uv_pipe_bind(&ctx.ipc_pipe, IPC_PIPE_NAME));
    228   ASSERT(0 == uv_listen((uv_stream_t*) &ctx.ipc_pipe, 128, ipc_connection_cb));
    229 
    230   for (i = 0; i < num_servers; i++)
    231     uv_sem_post(&servers[i].semaphore);
    232 
    233   ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
    234   uv_close((uv_handle_t*) &ctx.server_handle, NULL);
    235   ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
    236 
    237   for (i = 0; i < num_servers; i++)
    238     uv_sem_wait(&servers[i].semaphore);
    239 }
    240 
    241 
    242 static void get_listen_handle(uv_loop_t* loop, uv_stream_t* server_handle) {
    243   struct ipc_client_ctx ctx;
    244 
    245   ctx.server_handle = server_handle;
    246   ctx.server_handle->data = "server handle";
    247 
    248   ASSERT(0 == uv_pipe_init(loop, &ctx.ipc_pipe, 1));
    249   uv_pipe_connect(&ctx.connect_req,
    250                   &ctx.ipc_pipe,
    251                   IPC_PIPE_NAME,
    252                   ipc_connect_cb);
    253   ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
    254 }
    255 
    256 
    257 static void server_cb(void *arg) {
    258   struct server_ctx *ctx;
    259   uv_loop_t loop;
    260 
    261   ctx = arg;
    262   ASSERT(0 == uv_loop_init(&loop));
    263 
    264   ASSERT(0 == uv_async_init(&loop, &ctx->async_handle, sv_async_cb));
    265   uv_unref((uv_handle_t*) &ctx->async_handle);
    266 
    267   /* Wait until the main thread is ready. */
    268   uv_sem_wait(&ctx->semaphore);
    269   get_listen_handle(&loop, (uv_stream_t*) &ctx->server_handle);
    270   uv_sem_post(&ctx->semaphore);
    271 
    272   /* Now start the actual benchmark. */
    273   ASSERT(0 == uv_listen((uv_stream_t*) &ctx->server_handle,
    274                         128,
    275                         sv_connection_cb));
    276   ASSERT(0 == uv_run(&loop, UV_RUN_DEFAULT));
    277 
    278   uv_loop_close(&loop);
    279 }
    280 
    281 
    282 static void sv_async_cb(uv_async_t* handle) {
    283   struct server_ctx* ctx;
    284   ctx = container_of(handle, struct server_ctx, async_handle);
    285   uv_close((uv_handle_t*) &ctx->server_handle, NULL);
    286   uv_close((uv_handle_t*) &ctx->async_handle, NULL);
    287 }
    288 
    289 
    290 static void sv_connection_cb(uv_stream_t* server_handle, int status) {
    291   handle_storage_t* storage;
    292   struct server_ctx* ctx;
    293 
    294   ctx = container_of(server_handle, struct server_ctx, server_handle);
    295   ASSERT(status == 0);
    296 
    297   storage = malloc(sizeof(*storage));
    298   ASSERT(storage != NULL);
    299 
    300   if (server_handle->type == UV_TCP)
    301     ASSERT(0 == uv_tcp_init(server_handle->loop, (uv_tcp_t*) storage));
    302   else if (server_handle->type == UV_NAMED_PIPE)
    303     ASSERT(0 == uv_pipe_init(server_handle->loop, (uv_pipe_t*) storage, 0));
    304   else
    305     ASSERT(0);
    306 
    307   ASSERT(0 == uv_accept(server_handle, (uv_stream_t*) storage));
    308   ASSERT(0 == uv_read_start((uv_stream_t*) storage, sv_alloc_cb, sv_read_cb));
    309   ctx->num_connects++;
    310 }
    311 
    312 
    313 static void sv_alloc_cb(uv_handle_t* handle,
    314                         size_t suggested_size,
    315                         uv_buf_t* buf) {
    316   static char slab[32];
    317   buf->base = slab;
    318   buf->len = sizeof(slab);
    319 }
    320 
    321 
    322 static void sv_read_cb(uv_stream_t* handle,
    323                        ssize_t nread,
    324                        const uv_buf_t* buf) {
    325   ASSERT(nread == UV_EOF);
    326   uv_close((uv_handle_t*) handle, (uv_close_cb) free);
    327 }
    328 
    329 
    330 static void cl_connect_cb(uv_connect_t* req, int status) {
    331   struct client_ctx* ctx = container_of(req, struct client_ctx, connect_req);
    332   uv_idle_start(&ctx->idle_handle, cl_idle_cb);
    333   ASSERT(0 == status);
    334 }
    335 
    336 
    337 static void cl_idle_cb(uv_idle_t* handle) {
    338   struct client_ctx* ctx = container_of(handle, struct client_ctx, idle_handle);
    339   uv_close((uv_handle_t*) &ctx->client_handle, cl_close_cb);
    340   uv_idle_stop(&ctx->idle_handle);
    341 }
    342 
    343 
    344 static void cl_close_cb(uv_handle_t* handle) {
    345   struct client_ctx* ctx;
    346 
    347   ctx = container_of(handle, struct client_ctx, client_handle);
    348 
    349   if (--ctx->num_connects == 0) {
    350     uv_close((uv_handle_t*) &ctx->idle_handle, NULL);
    351     return;
    352   }
    353 
    354   ASSERT(0 == uv_tcp_init(handle->loop, (uv_tcp_t*) &ctx->client_handle));
    355   ASSERT(0 == uv_tcp_connect(&ctx->connect_req,
    356                              (uv_tcp_t*) &ctx->client_handle,
    357                              (const struct sockaddr*) &listen_addr,
    358                              cl_connect_cb));
    359 }
    360 
    361 
    362 static int test_tcp(unsigned int num_servers, unsigned int num_clients) {
    363   struct server_ctx* servers;
    364   struct client_ctx* clients;
    365   uv_loop_t* loop;
    366   uv_tcp_t* handle;
    367   unsigned int i;
    368   double time;
    369 
    370   ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &listen_addr));
    371   loop = uv_default_loop();
    372 
    373   servers = calloc(num_servers, sizeof(servers[0]));
    374   clients = calloc(num_clients, sizeof(clients[0]));
    375   ASSERT(servers != NULL);
    376   ASSERT(clients != NULL);
    377 
    378   /* We're making the assumption here that from the perspective of the
    379    * OS scheduler, threads are functionally equivalent to and interchangeable
    380    * with full-blown processes.
    381    */
    382   for (i = 0; i < num_servers; i++) {
    383     struct server_ctx* ctx = servers + i;
    384     ASSERT(0 == uv_sem_init(&ctx->semaphore, 0));
    385     ASSERT(0 == uv_thread_create(&ctx->thread_id, server_cb, ctx));
    386   }
    387 
    388   send_listen_handles(UV_TCP, num_servers, servers);
    389 
    390   for (i = 0; i < num_clients; i++) {
    391     struct client_ctx* ctx = clients + i;
    392     ctx->num_connects = NUM_CONNECTS / num_clients;
    393     handle = (uv_tcp_t*) &ctx->client_handle;
    394     handle->data = "client handle";
    395     ASSERT(0 == uv_tcp_init(loop, handle));
    396     ASSERT(0 == uv_tcp_connect(&ctx->connect_req,
    397                                handle,
    398                                (const struct sockaddr*) &listen_addr,
    399                                cl_connect_cb));
    400     ASSERT(0 == uv_idle_init(loop, &ctx->idle_handle));
    401   }
    402 
    403   {
    404     uint64_t t = uv_hrtime();
    405     ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT));
    406     t = uv_hrtime() - t;
    407     time = t / 1e9;
    408   }
    409 
    410   for (i = 0; i < num_servers; i++) {
    411     struct server_ctx* ctx = servers + i;
    412     uv_async_send(&ctx->async_handle);
    413     ASSERT(0 == uv_thread_join(&ctx->thread_id));
    414     uv_sem_destroy(&ctx->semaphore);
    415   }
    416 
    417   printf("accept%u: %.0f accepts/sec (%u total)\n",
    418          num_servers,
    419          NUM_CONNECTS / time,
    420          NUM_CONNECTS);
    421 
    422   for (i = 0; i < num_servers; i++) {
    423     struct server_ctx* ctx = servers + i;
    424     printf("  thread #%u: %.0f accepts/sec (%u total, %.1f%%)\n",
    425            i,
    426            ctx->num_connects / time,
    427            ctx->num_connects,
    428            ctx->num_connects * 100.0 / NUM_CONNECTS);
    429   }
    430 
    431   free(clients);
    432   free(servers);
    433 
    434   MAKE_VALGRIND_HAPPY();
    435   return 0;
    436 }
    437 
    438 
    439 BENCHMARK_IMPL(tcp_multi_accept2) {
    440   return test_tcp(2, 40);
    441 }
    442 
    443 
    444 BENCHMARK_IMPL(tcp_multi_accept4) {
    445   return test_tcp(4, 40);
    446 }
    447 
    448 
    449 BENCHMARK_IMPL(tcp_multi_accept8) {
    450   return test_tcp(8, 40);
    451 }
    452