Home | History | Annotate | Line # | Download | only in test
benchmark-pump.c revision 1.1.1.2
      1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
      2  *
      3  * Permission is hereby granted, free of charge, to any person obtaining a copy
      4  * of this software and associated documentation files (the "Software"), to
      5  * deal in the Software without restriction, including without limitation the
      6  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
      7  * sell copies of the Software, and to permit persons to whom the Software is
      8  * furnished to do so, subject to the following conditions:
      9  *
     10  * The above copyright notice and this permission notice shall be included in
     11  * all copies or substantial portions of the Software.
     12  *
     13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     16  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     17  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
     18  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
     19  * IN THE SOFTWARE.
     20  */
     21 
     22 #include "task.h"
     23 #include "uv.h"
     24 
     25 #include <math.h>
     26 #include <stdio.h>
     27 
     28 
     29 static int TARGET_CONNECTIONS;
     30 #define WRITE_BUFFER_SIZE           8192
     31 #define MAX_SIMULTANEOUS_CONNECTS   100
     32 
     33 #define PRINT_STATS                 0
     34 #define STATS_INTERVAL              1000 /* msec */
     35 #define STATS_COUNT                 5
     36 
     37 
     38 static void do_write(uv_stream_t*);
     39 static void maybe_connect_some(void);
     40 
     41 static uv_req_t* req_alloc(void);
     42 static void req_free(uv_req_t* uv_req);
     43 
     44 static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf);
     45 static void buf_free(const uv_buf_t* buf);
     46 
     47 static uv_loop_t* loop;
     48 
     49 static uv_tcp_t tcpServer;
     50 static uv_pipe_t pipeServer;
     51 static uv_stream_t* server;
     52 static struct sockaddr_in listen_addr;
     53 static struct sockaddr_in connect_addr;
     54 
     55 static int64_t start_time;
     56 
     57 static int max_connect_socket = 0;
     58 static int max_read_sockets = 0;
     59 static int read_sockets = 0;
     60 static int write_sockets = 0;
     61 
     62 static int64_t nrecv = 0;
     63 static int64_t nrecv_total = 0;
     64 static int64_t nsent = 0;
     65 static int64_t nsent_total = 0;
     66 
     67 static int stats_left = 0;
     68 
     69 static char write_buffer[WRITE_BUFFER_SIZE];
     70 
     71 /* Make this as large as you need. */
     72 #define MAX_WRITE_HANDLES 1000
     73 
     74 static stream_type type;
     75 
     76 static uv_tcp_t tcp_write_handles[MAX_WRITE_HANDLES];
     77 static uv_pipe_t pipe_write_handles[MAX_WRITE_HANDLES];
     78 
     79 static uv_timer_t timer_handle;
     80 
     81 
/*
 * Convert a byte count transferred over `passed_ms` milliseconds into a
 * throughput figure in gigabits per second (1 gigabit = 2^30 bits here).
 */
static double gbit(int64_t bytes, int64_t passed_ms) {
  const double bytes_per_gib = 1024 * 1024 * 1024;
  const double seconds = (double) passed_ms / 1000;
  const double gbits = ((double) bytes / bytes_per_gib) * 8;

  return gbits / seconds;
}
     86 
     87 
     88 static void show_stats(uv_timer_t* handle) {
     89   int64_t diff;
     90   int i;
     91 
     92 #if PRINT_STATS
     93   fprintf(stderr, "connections: %d, write: %.1f gbit/s\n",
     94           write_sockets,
     95           gbit(nsent, STATS_INTERVAL));
     96   fflush(stderr);
     97 #endif
     98 
     99   /* Exit if the show is over */
    100   if (!--stats_left) {
    101 
    102     uv_update_time(loop);
    103     diff = uv_now(loop) - start_time;
    104 
    105     fprintf(stderr, "%s_pump%d_client: %.1f gbit/s\n",
    106             type == TCP ? "tcp" : "pipe",
    107             write_sockets,
    108             gbit(nsent_total, diff));
    109     fflush(stderr);
    110 
    111     for (i = 0; i < write_sockets; i++) {
    112       if (type == TCP)
    113         uv_close((uv_handle_t*) &tcp_write_handles[i], NULL);
    114       else
    115         uv_close((uv_handle_t*) &pipe_write_handles[i], NULL);
    116     }
    117 
    118     exit(0);
    119   }
    120 
    121   /* Reset read and write counters */
    122   nrecv = 0;
    123   nsent = 0;
    124 }
    125 
    126 
    127 static void read_show_stats(void) {
    128   int64_t diff;
    129 
    130   uv_update_time(loop);
    131   diff = uv_now(loop) - start_time;
    132 
    133   fprintf(stderr, "%s_pump%d_server: %.1f gbit/s\n",
    134           type == TCP ? "tcp" : "pipe",
    135           max_read_sockets,
    136           gbit(nrecv_total, diff));
    137   fflush(stderr);
    138 }
    139 
    140 
    141 
    142 static void read_sockets_close_cb(uv_handle_t* handle) {
    143   free(handle);
    144   read_sockets--;
    145 
    146   /* If it's past the first second and everyone has closed their connection
    147    * Then print stats.
    148    */
    149   if (uv_now(loop) - start_time > 1000 && read_sockets == 0) {
    150     read_show_stats();
    151     uv_close((uv_handle_t*)server, NULL);
    152   }
    153 }
    154 
    155 
    156 static void start_stats_collection(void) {
    157   int r;
    158 
    159   /* Show-stats timer */
    160   stats_left = STATS_COUNT;
    161   r = uv_timer_init(loop, &timer_handle);
    162   ASSERT(r == 0);
    163   r = uv_timer_start(&timer_handle, show_stats, STATS_INTERVAL, STATS_INTERVAL);
    164   ASSERT(r == 0);
    165 
    166   uv_update_time(loop);
    167   start_time = uv_now(loop);
    168 }
    169 
    170 
    171 static void read_cb(uv_stream_t* stream, ssize_t bytes, const uv_buf_t* buf) {
    172   if (nrecv_total == 0) {
    173     ASSERT(start_time == 0);
    174     uv_update_time(loop);
    175     start_time = uv_now(loop);
    176   }
    177 
    178   if (bytes < 0) {
    179     uv_close((uv_handle_t*)stream, read_sockets_close_cb);
    180     return;
    181   }
    182 
    183   buf_free(buf);
    184 
    185   nrecv += bytes;
    186   nrecv_total += bytes;
    187 }
    188 
    189 
    190 static void write_cb(uv_write_t* req, int status) {
    191   ASSERT(status == 0);
    192 
    193   req_free((uv_req_t*) req);
    194 
    195   nsent += sizeof write_buffer;
    196   nsent_total += sizeof write_buffer;
    197 
    198   do_write((uv_stream_t*) req->handle);
    199 }
    200 
    201 
    202 static void do_write(uv_stream_t* stream) {
    203   uv_write_t* req;
    204   uv_buf_t buf;
    205   int r;
    206 
    207   buf.base = (char*) &write_buffer;
    208   buf.len = sizeof write_buffer;
    209 
    210   req = (uv_write_t*) req_alloc();
    211   r = uv_write(req, stream, &buf, 1, write_cb);
    212   ASSERT(r == 0);
    213 }
    214 
    215 
    216 static void connect_cb(uv_connect_t* req, int status) {
    217   int i;
    218 
    219   if (status) {
    220     fprintf(stderr, "%s", uv_strerror(status));
    221     fflush(stderr);
    222   }
    223   ASSERT(status == 0);
    224 
    225   write_sockets++;
    226   req_free((uv_req_t*) req);
    227 
    228   maybe_connect_some();
    229 
    230   if (write_sockets == TARGET_CONNECTIONS) {
    231     start_stats_collection();
    232 
    233     /* Yay! start writing */
    234     for (i = 0; i < write_sockets; i++) {
    235       if (type == TCP)
    236         do_write((uv_stream_t*) &tcp_write_handles[i]);
    237       else
    238         do_write((uv_stream_t*) &pipe_write_handles[i]);
    239     }
    240   }
    241 }
    242 
    243 
    244 static void maybe_connect_some(void) {
    245   uv_connect_t* req;
    246   uv_tcp_t* tcp;
    247   uv_pipe_t* pipe;
    248   int r;
    249 
    250   while (max_connect_socket < TARGET_CONNECTIONS &&
    251          max_connect_socket < write_sockets + MAX_SIMULTANEOUS_CONNECTS) {
    252     if (type == TCP) {
    253       tcp = &tcp_write_handles[max_connect_socket++];
    254 
    255       r = uv_tcp_init(loop, tcp);
    256       ASSERT(r == 0);
    257 
    258       req = (uv_connect_t*) req_alloc();
    259       r = uv_tcp_connect(req,
    260                          tcp,
    261                          (const struct sockaddr*) &connect_addr,
    262                          connect_cb);
    263       ASSERT(r == 0);
    264     } else {
    265       pipe = &pipe_write_handles[max_connect_socket++];
    266 
    267       r = uv_pipe_init(loop, pipe, 0);
    268       ASSERT(r == 0);
    269 
    270       req = (uv_connect_t*) req_alloc();
    271       uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb);
    272     }
    273   }
    274 }
    275 
    276 
    277 static void connection_cb(uv_stream_t* s, int status) {
    278   uv_stream_t* stream;
    279   int r;
    280 
    281   ASSERT(server == s);
    282   ASSERT(status == 0);
    283 
    284   if (type == TCP) {
    285     stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
    286     r = uv_tcp_init(loop, (uv_tcp_t*)stream);
    287     ASSERT(r == 0);
    288   } else {
    289     stream = (uv_stream_t*)malloc(sizeof(uv_pipe_t));
    290     r = uv_pipe_init(loop, (uv_pipe_t*)stream, 0);
    291     ASSERT(r == 0);
    292   }
    293 
    294   r = uv_accept(s, stream);
    295   ASSERT(r == 0);
    296 
    297   r = uv_read_start(stream, buf_alloc, read_cb);
    298   ASSERT(r == 0);
    299 
    300   read_sockets++;
    301   max_read_sockets++;
    302 }
    303 
    304 
    305 /*
    306  * Request allocator
    307  */
    308 
    309 typedef struct req_list_s {
    310   union uv_any_req uv_req;
    311   struct req_list_s* next;
    312 } req_list_t;
    313 
    314 
    315 static req_list_t* req_freelist = NULL;
    316 
    317 
    318 static uv_req_t* req_alloc(void) {
    319   req_list_t* req;
    320 
    321   req = req_freelist;
    322   if (req != NULL) {
    323     req_freelist = req->next;
    324     return (uv_req_t*) req;
    325   }
    326 
    327   req = (req_list_t*) malloc(sizeof *req);
    328   return (uv_req_t*) req;
    329 }
    330 
    331 
    332 static void req_free(uv_req_t* uv_req) {
    333   req_list_t* req = (req_list_t*) uv_req;
    334 
    335   req->next = req_freelist;
    336   req_freelist = req;
    337 }
    338 
    339 
    340 /*
    341  * Buffer allocator
    342  */
    343 
    344 typedef struct buf_list_s {
    345   uv_buf_t uv_buf_t;
    346   struct buf_list_s* next;
    347 } buf_list_t;
    348 
    349 
    350 static buf_list_t* buf_freelist = NULL;
    351 
    352 
    353 static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
    354   buf_list_t* ab;
    355 
    356   ab = buf_freelist;
    357   if (ab != NULL)
    358     buf_freelist = ab->next;
    359   else {
    360     ab = malloc(size + sizeof(*ab));
    361     ab->uv_buf_t.len = size;
    362     ab->uv_buf_t.base = (char*) (ab + 1);
    363   }
    364 
    365   *buf = ab->uv_buf_t;
    366 }
    367 
    368 
    369 static void buf_free(const uv_buf_t* buf) {
    370   buf_list_t* ab = (buf_list_t*) buf->base - 1;
    371   ab->next = buf_freelist;
    372   buf_freelist = ab;
    373 }
    374 
    375 
    376 HELPER_IMPL(tcp_pump_server) {
    377   int r;
    378 
    379   type = TCP;
    380   loop = uv_default_loop();
    381 
    382   ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &listen_addr));
    383 
    384   /* Server */
    385   server = (uv_stream_t*)&tcpServer;
    386   r = uv_tcp_init(loop, &tcpServer);
    387   ASSERT(r == 0);
    388   r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &listen_addr, 0);
    389   ASSERT(r == 0);
    390   r = uv_listen((uv_stream_t*)&tcpServer, MAX_WRITE_HANDLES, connection_cb);
    391   ASSERT(r == 0);
    392 
    393   notify_parent_process();
    394   uv_run(loop, UV_RUN_DEFAULT);
    395 
    396   return 0;
    397 }
    398 
    399 
    400 HELPER_IMPL(pipe_pump_server) {
    401   int r;
    402   type = PIPE;
    403 
    404   loop = uv_default_loop();
    405 
    406   /* Server */
    407   server = (uv_stream_t*)&pipeServer;
    408   r = uv_pipe_init(loop, &pipeServer, 0);
    409   ASSERT(r == 0);
    410   r = uv_pipe_bind(&pipeServer, TEST_PIPENAME);
    411   ASSERT(r == 0);
    412   r = uv_listen((uv_stream_t*)&pipeServer, MAX_WRITE_HANDLES, connection_cb);
    413   ASSERT(r == 0);
    414 
    415   notify_parent_process();
    416   uv_run(loop, UV_RUN_DEFAULT);
    417 
    418   MAKE_VALGRIND_HAPPY();
    419   return 0;
    420 }
    421 
    422 
    423 static void tcp_pump(int n) {
    424   ASSERT(n <= MAX_WRITE_HANDLES);
    425   TARGET_CONNECTIONS = n;
    426   type = TCP;
    427 
    428   loop = uv_default_loop();
    429 
    430   ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &connect_addr));
    431 
    432   /* Start making connections */
    433   maybe_connect_some();
    434 
    435   uv_run(loop, UV_RUN_DEFAULT);
    436 
    437   MAKE_VALGRIND_HAPPY();
    438 }
    439 
    440 
    441 static void pipe_pump(int n) {
    442   ASSERT(n <= MAX_WRITE_HANDLES);
    443   TARGET_CONNECTIONS = n;
    444   type = PIPE;
    445 
    446   loop = uv_default_loop();
    447 
    448   /* Start making connections */
    449   maybe_connect_some();
    450 
    451   uv_run(loop, UV_RUN_DEFAULT);
    452 
    453   MAKE_VALGRIND_HAPPY();
    454 }
    455 
    456 
/* Benchmark entry point: TCP pump client with 100 concurrent connections. */
BENCHMARK_IMPL(tcp_pump100_client) {
  const int connections = 100;

  tcp_pump(connections);
  return 0;
}
    461 
    462 
/* Benchmark entry point: TCP pump client with a single connection. */
BENCHMARK_IMPL(tcp_pump1_client) {
  const int connections = 1;

  tcp_pump(connections);
  return 0;
}
    467 
    468 
/* Benchmark entry point: pipe pump client with 100 concurrent connections. */
BENCHMARK_IMPL(pipe_pump100_client) {
  const int connections = 100;

  pipe_pump(connections);
  return 0;
}
    473 
    474 
/* Benchmark entry point: pipe pump client with a single connection. */
BENCHMARK_IMPL(pipe_pump1_client) {
  const int connections = 1;

  pipe_pump(connections);
  return 0;
}
    479