1 1.1 christos /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 1.1 christos * 3 1.1 christos * Permission is hereby granted, free of charge, to any person obtaining a copy 4 1.1 christos * of this software and associated documentation files (the "Software"), to 5 1.1 christos * deal in the Software without restriction, including without limitation the 6 1.1 christos * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 7 1.1 christos * sell copies of the Software, and to permit persons to whom the Software is 8 1.1 christos * furnished to do so, subject to the following conditions: 9 1.1 christos * 10 1.1 christos * The above copyright notice and this permission notice shall be included in 11 1.1 christos * all copies or substantial portions of the Software. 12 1.1 christos * 13 1.1 christos * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 1.1 christos * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 1.1 christos * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 1.1 christos * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17 1.1 christos * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 18 1.1 christos * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 19 1.1 christos * IN THE SOFTWARE. 20 1.1 christos */ 21 1.1 christos 22 1.1 christos #include "task.h" 23 1.1 christos #include "uv.h" 24 1.1 christos 25 1.1 christos #include <math.h> 26 1.1 christos #include <stdio.h> 27 1.1 christos 28 1.1 christos 29 1.1 christos static int TARGET_CONNECTIONS; 30 1.1 christos #define WRITE_BUFFER_SIZE 8192 31 1.1 christos #define MAX_SIMULTANEOUS_CONNECTS 100 32 1.1 christos 33 1.1 christos #define PRINT_STATS 0 34 1.1 christos #define STATS_INTERVAL 1000 /* msec */ 35 1.1 christos #define STATS_COUNT 5 36 1.1 christos 37 1.1 christos 38 1.1 christos static void do_write(uv_stream_t*); 39 1.1 christos static void maybe_connect_some(void); 40 1.1 christos 41 1.1 christos static uv_req_t* req_alloc(void); 42 1.1 christos static void req_free(uv_req_t* uv_req); 43 1.1 christos 44 1.1 christos static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf); 45 1.1 christos static void buf_free(const uv_buf_t* buf); 46 1.1 christos 47 1.1 christos static uv_loop_t* loop; 48 1.1 christos 49 1.1 christos static uv_tcp_t tcpServer; 50 1.1 christos static uv_pipe_t pipeServer; 51 1.1 christos static uv_stream_t* server; 52 1.1 christos static struct sockaddr_in listen_addr; 53 1.1 christos static struct sockaddr_in connect_addr; 54 1.1 christos 55 1.1 christos static int64_t start_time; 56 1.1 christos 57 1.1 christos static int max_connect_socket = 0; 58 1.1 christos static int max_read_sockets = 0; 59 1.1 christos static int read_sockets = 0; 60 1.1 christos static int write_sockets = 0; 61 1.1 christos 62 1.1 christos static int64_t nrecv = 0; 63 1.1 christos static int64_t nrecv_total = 0; 64 1.1 christos static int64_t nsent = 0; 65 1.1 christos static int64_t nsent_total = 0; 66 1.1 christos 67 1.1 christos static int stats_left = 0; 68 1.1 christos 69 1.1 christos static char write_buffer[WRITE_BUFFER_SIZE]; 70 1.1 christos 71 1.1 christos /* Make this as large as you need. */ 72 1.1 christos #define MAX_WRITE_HANDLES 1000 73 1.1 christos 74 1.1 christos static stream_type type; 75 1.1 christos 76 1.1 christos static uv_tcp_t tcp_write_handles[MAX_WRITE_HANDLES]; 77 1.1 christos static uv_pipe_t pipe_write_handles[MAX_WRITE_HANDLES]; 78 1.1 christos 79 1.1 christos static uv_timer_t timer_handle; 80 1.1 christos 81 1.1 christos 82 1.1 christos static double gbit(int64_t bytes, int64_t passed_ms) { 83 1.1 christos double gbits = ((double)bytes / (1024 * 1024 * 1024)) * 8; 84 1.1 christos return gbits / ((double)passed_ms / 1000); 85 1.1 christos } 86 1.1 christos 87 1.1 christos 88 1.1 christos static void show_stats(uv_timer_t* handle) { 89 1.1 christos int64_t diff; 90 1.1 christos int i; 91 1.1 christos 92 1.1 christos #if PRINT_STATS 93 1.1 christos fprintf(stderr, "connections: %d, write: %.1f gbit/s\n", 94 1.1 christos write_sockets, 95 1.1 christos gbit(nsent, STATS_INTERVAL)); 96 1.1 christos fflush(stderr); 97 1.1 christos #endif 98 1.1 christos 99 1.1 christos /* Exit if the show is over */ 100 1.1 christos if (!--stats_left) { 101 1.1 christos 102 1.1 christos uv_update_time(loop); 103 1.1 christos diff = uv_now(loop) - start_time; 104 1.1 christos 105 1.1 christos fprintf(stderr, "%s_pump%d_client: %.1f gbit/s\n", 106 1.1 christos type == TCP ? "tcp" : "pipe", 107 1.1 christos write_sockets, 108 1.1 christos gbit(nsent_total, diff)); 109 1.1 christos fflush(stderr); 110 1.1 christos 111 1.1 christos for (i = 0; i < write_sockets; i++) { 112 1.1 christos if (type == TCP) 113 1.1 christos uv_close((uv_handle_t*) &tcp_write_handles[i], NULL); 114 1.1 christos else 115 1.1 christos uv_close((uv_handle_t*) &pipe_write_handles[i], NULL); 116 1.1 christos } 117 1.1 christos 118 1.1 christos exit(0); 119 1.1 christos } 120 1.1 christos 121 1.1 christos /* Reset read and write counters */ 122 1.1 christos nrecv = 0; 123 1.1 christos nsent = 0; 124 1.1 christos } 125 1.1 christos 126 1.1 christos 127 1.1 christos static void read_show_stats(void) { 128 1.1 christos int64_t diff; 129 1.1 christos 130 1.1 christos uv_update_time(loop); 131 1.1 christos diff = uv_now(loop) - start_time; 132 1.1 christos 133 1.1 christos fprintf(stderr, "%s_pump%d_server: %.1f gbit/s\n", 134 1.1 christos type == TCP ? "tcp" : "pipe", 135 1.1 christos max_read_sockets, 136 1.1 christos gbit(nrecv_total, diff)); 137 1.1 christos fflush(stderr); 138 1.1 christos } 139 1.1 christos 140 1.1 christos 141 1.1 christos 142 1.1 christos static void read_sockets_close_cb(uv_handle_t* handle) { 143 1.1 christos free(handle); 144 1.1 christos read_sockets--; 145 1.1 christos 146 1.1 christos /* If it's past the first second and everyone has closed their connection 147 1.1 christos * Then print stats. 148 1.1 christos */ 149 1.1 christos if (uv_now(loop) - start_time > 1000 && read_sockets == 0) { 150 1.1 christos read_show_stats(); 151 1.1 christos uv_close((uv_handle_t*)server, NULL); 152 1.1 christos } 153 1.1 christos } 154 1.1 christos 155 1.1 christos 156 1.1 christos static void start_stats_collection(void) { 157 1.1 christos int r; 158 1.1 christos 159 1.1 christos /* Show-stats timer */ 160 1.1 christos stats_left = STATS_COUNT; 161 1.1 christos r = uv_timer_init(loop, &timer_handle); 162 1.1 christos ASSERT(r == 0); 163 1.1 christos r = uv_timer_start(&timer_handle, show_stats, STATS_INTERVAL, STATS_INTERVAL); 164 1.1 christos ASSERT(r == 0); 165 1.1 christos 166 1.1 christos uv_update_time(loop); 167 1.1 christos start_time = uv_now(loop); 168 1.1 christos } 169 1.1 christos 170 1.1 christos 171 1.1 christos static void read_cb(uv_stream_t* stream, ssize_t bytes, const uv_buf_t* buf) { 172 1.1 christos if (nrecv_total == 0) { 173 1.1 christos ASSERT(start_time == 0); 174 1.1 christos uv_update_time(loop); 175 1.1 christos start_time = uv_now(loop); 176 1.1 christos } 177 1.1 christos 178 1.1 christos if (bytes < 0) { 179 1.1 christos uv_close((uv_handle_t*)stream, read_sockets_close_cb); 180 1.1 christos return; 181 1.1 christos } 182 1.1 christos 183 1.1 christos buf_free(buf); 184 1.1 christos 185 1.1 christos nrecv += bytes; 186 1.1 christos nrecv_total += bytes; 187 1.1 christos } 188 1.1 christos 189 1.1 christos 190 1.1 christos static void write_cb(uv_write_t* req, int status) { 191 1.1 christos ASSERT(status == 0); 192 1.1 christos 193 1.1 christos req_free((uv_req_t*) req); 194 1.1 christos 195 1.1 christos nsent += sizeof write_buffer; 196 1.1 christos nsent_total += sizeof write_buffer; 197 1.1 christos 198 1.1 christos do_write((uv_stream_t*) req->handle); 199 1.1 christos } 200 1.1 christos 201 1.1 christos 202 1.1 christos static void do_write(uv_stream_t* stream) { 203 1.1 christos uv_write_t* req; 204 1.1 christos uv_buf_t buf; 205 1.1 christos int r; 206 1.1 christos 207 1.1 christos buf.base = (char*) &write_buffer; 208 1.1 christos buf.len = sizeof write_buffer; 209 1.1 christos 210 1.1 christos req = (uv_write_t*) req_alloc(); 211 1.1 christos r = uv_write(req, stream, &buf, 1, write_cb); 212 1.1 christos ASSERT(r == 0); 213 1.1 christos } 214 1.1 christos 215 1.1 christos 216 1.1 christos static void connect_cb(uv_connect_t* req, int status) { 217 1.1 christos int i; 218 1.1 christos 219 1.1 christos if (status) { 220 1.1 christos fprintf(stderr, "%s", uv_strerror(status)); 221 1.1 christos fflush(stderr); 222 1.1 christos } 223 1.1 christos ASSERT(status == 0); 224 1.1 christos 225 1.1 christos write_sockets++; 226 1.1 christos req_free((uv_req_t*) req); 227 1.1 christos 228 1.1 christos maybe_connect_some(); 229 1.1 christos 230 1.1 christos if (write_sockets == TARGET_CONNECTIONS) { 231 1.1 christos start_stats_collection(); 232 1.1 christos 233 1.1 christos /* Yay! start writing */ 234 1.1 christos for (i = 0; i < write_sockets; i++) { 235 1.1 christos if (type == TCP) 236 1.1 christos do_write((uv_stream_t*) &tcp_write_handles[i]); 237 1.1 christos else 238 1.1 christos do_write((uv_stream_t*) &pipe_write_handles[i]); 239 1.1 christos } 240 1.1 christos } 241 1.1 christos } 242 1.1 christos 243 1.1 christos 244 1.1 christos static void maybe_connect_some(void) { 245 1.1 christos uv_connect_t* req; 246 1.1 christos uv_tcp_t* tcp; 247 1.1 christos uv_pipe_t* pipe; 248 1.1 christos int r; 249 1.1 christos 250 1.1 christos while (max_connect_socket < TARGET_CONNECTIONS && 251 1.1 christos max_connect_socket < write_sockets + MAX_SIMULTANEOUS_CONNECTS) { 252 1.1 christos if (type == TCP) { 253 1.1 christos tcp = &tcp_write_handles[max_connect_socket++]; 254 1.1 christos 255 1.1 christos r = uv_tcp_init(loop, tcp); 256 1.1 christos ASSERT(r == 0); 257 1.1 christos 258 1.1 christos req = (uv_connect_t*) req_alloc(); 259 1.1 christos r = uv_tcp_connect(req, 260 1.1 christos tcp, 261 1.1 christos (const struct sockaddr*) &connect_addr, 262 1.1 christos connect_cb); 263 1.1 christos ASSERT(r == 0); 264 1.1 christos } else { 265 1.1 christos pipe = &pipe_write_handles[max_connect_socket++]; 266 1.1 christos 267 1.1 christos r = uv_pipe_init(loop, pipe, 0); 268 1.1 christos ASSERT(r == 0); 269 1.1 christos 270 1.1 christos req = (uv_connect_t*) req_alloc(); 271 1.1 christos uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb); 272 1.1 christos } 273 1.1 christos } 274 1.1 christos } 275 1.1 christos 276 1.1 christos 277 1.1 christos static void connection_cb(uv_stream_t* s, int status) { 278 1.1 christos uv_stream_t* stream; 279 1.1 christos int r; 280 1.1 christos 281 1.1 christos ASSERT(server == s); 282 1.1 christos ASSERT(status == 0); 283 1.1 christos 284 1.1 christos if (type == TCP) { 285 1.1 christos stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); 286 1.1 christos r = uv_tcp_init(loop, (uv_tcp_t*)stream); 287 1.1 christos ASSERT(r == 0); 288 1.1 christos } else { 289 1.1 christos stream = (uv_stream_t*)malloc(sizeof(uv_pipe_t)); 290 1.1 christos r = uv_pipe_init(loop, (uv_pipe_t*)stream, 0); 291 1.1 christos ASSERT(r == 0); 292 1.1 christos } 293 1.1 christos 294 1.1 christos r = uv_accept(s, stream); 295 1.1 christos ASSERT(r == 0); 296 1.1 christos 297 1.1 christos r = uv_read_start(stream, buf_alloc, read_cb); 298 1.1 christos ASSERT(r == 0); 299 1.1 christos 300 1.1 christos read_sockets++; 301 1.1 christos max_read_sockets++; 302 1.1 christos } 303 1.1 christos 304 1.1 christos 305 1.1 christos /* 306 1.1 christos * Request allocator 307 1.1 christos */ 308 1.1 christos 309 1.1 christos typedef struct req_list_s { 310 1.1 christos union uv_any_req uv_req; 311 1.1 christos struct req_list_s* next; 312 1.1 christos } req_list_t; 313 1.1 christos 314 1.1 christos 315 1.1 christos static req_list_t* req_freelist = NULL; 316 1.1 christos 317 1.1 christos 318 1.1 christos static uv_req_t* req_alloc(void) { 319 1.1 christos req_list_t* req; 320 1.1 christos 321 1.1 christos req = req_freelist; 322 1.1 christos if (req != NULL) { 323 1.1 christos req_freelist = req->next; 324 1.1 christos return (uv_req_t*) req; 325 1.1 christos } 326 1.1 christos 327 1.1 christos req = (req_list_t*) malloc(sizeof *req); 328 1.1 christos return (uv_req_t*) req; 329 1.1 christos } 330 1.1 christos 331 1.1 christos 332 1.1 christos static void req_free(uv_req_t* uv_req) { 333 1.1 christos req_list_t* req = (req_list_t*) uv_req; 334 1.1 christos 335 1.1 christos req->next = req_freelist; 336 1.1 christos req_freelist = req; 337 1.1 christos } 338 1.1 christos 339 1.1 christos 340 1.1 christos /* 341 1.1 christos * Buffer allocator 342 1.1 christos */ 343 1.1 christos 344 1.1 christos typedef struct buf_list_s { 345 1.1 christos uv_buf_t uv_buf_t; 346 1.1 christos struct buf_list_s* next; 347 1.1 christos } buf_list_t; 348 1.1 christos 349 1.1 christos 350 1.1 christos static buf_list_t* buf_freelist = NULL; 351 1.1 christos 352 1.1 christos 353 1.1 christos static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf) { 354 1.1 christos buf_list_t* ab; 355 1.1 christos 356 1.1 christos ab = buf_freelist; 357 1.1 christos if (ab != NULL) 358 1.1 christos buf_freelist = ab->next; 359 1.1 christos else { 360 1.1 christos ab = malloc(size + sizeof(*ab)); 361 1.1 christos ab->uv_buf_t.len = size; 362 1.1 christos ab->uv_buf_t.base = (char*) (ab + 1); 363 1.1 christos } 364 1.1 christos 365 1.1 christos *buf = ab->uv_buf_t; 366 1.1 christos } 367 1.1 christos 368 1.1 christos 369 1.1 christos static void buf_free(const uv_buf_t* buf) { 370 1.1 christos buf_list_t* ab = (buf_list_t*) buf->base - 1; 371 1.1 christos ab->next = buf_freelist; 372 1.1 christos buf_freelist = ab; 373 1.1 christos } 374 1.1 christos 375 1.1 christos 376 1.1 christos HELPER_IMPL(tcp_pump_server) { 377 1.1 christos int r; 378 1.1 christos 379 1.1 christos type = TCP; 380 1.1 christos loop = uv_default_loop(); 381 1.1 christos 382 1.1 christos ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &listen_addr)); 383 1.1 christos 384 1.1 christos /* Server */ 385 1.1 christos server = (uv_stream_t*)&tcpServer; 386 1.1 christos r = uv_tcp_init(loop, &tcpServer); 387 1.1 christos ASSERT(r == 0); 388 1.1 christos r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &listen_addr, 0); 389 1.1 christos ASSERT(r == 0); 390 1.1 christos r = uv_listen((uv_stream_t*)&tcpServer, MAX_WRITE_HANDLES, connection_cb); 391 1.1 christos ASSERT(r == 0); 392 1.1 christos 393 1.1.1.2 christos notify_parent_process(); 394 1.1 christos uv_run(loop, UV_RUN_DEFAULT); 395 1.1 christos 396 1.1 christos return 0; 397 1.1 christos } 398 1.1 christos 399 1.1 christos 400 1.1 christos HELPER_IMPL(pipe_pump_server) { 401 1.1 christos int r; 402 1.1 christos type = PIPE; 403 1.1 christos 404 1.1 christos loop = uv_default_loop(); 405 1.1 christos 406 1.1 christos /* Server */ 407 1.1 christos server = (uv_stream_t*)&pipeServer; 408 1.1 christos r = uv_pipe_init(loop, &pipeServer, 0); 409 1.1 christos ASSERT(r == 0); 410 1.1 christos r = uv_pipe_bind(&pipeServer, TEST_PIPENAME); 411 1.1 christos ASSERT(r == 0); 412 1.1 christos r = uv_listen((uv_stream_t*)&pipeServer, MAX_WRITE_HANDLES, connection_cb); 413 1.1 christos ASSERT(r == 0); 414 1.1 christos 415 1.1.1.2 christos notify_parent_process(); 416 1.1 christos uv_run(loop, UV_RUN_DEFAULT); 417 1.1 christos 418 1.1 christos MAKE_VALGRIND_HAPPY(); 419 1.1 christos return 0; 420 1.1 christos } 421 1.1 christos 422 1.1 christos 423 1.1 christos static void tcp_pump(int n) { 424 1.1 christos ASSERT(n <= MAX_WRITE_HANDLES); 425 1.1 christos TARGET_CONNECTIONS = n; 426 1.1 christos type = TCP; 427 1.1 christos 428 1.1 christos loop = uv_default_loop(); 429 1.1 christos 430 1.1 christos ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &connect_addr)); 431 1.1 christos 432 1.1 christos /* Start making connections */ 433 1.1 christos maybe_connect_some(); 434 1.1 christos 435 1.1 christos uv_run(loop, UV_RUN_DEFAULT); 436 1.1 christos 437 1.1 christos MAKE_VALGRIND_HAPPY(); 438 1.1 christos } 439 1.1 christos 440 1.1 christos 441 1.1 christos static void pipe_pump(int n) { 442 1.1 christos ASSERT(n <= MAX_WRITE_HANDLES); 443 1.1 christos TARGET_CONNECTIONS = n; 444 1.1 christos type = PIPE; 445 1.1 christos 446 1.1 christos loop = uv_default_loop(); 447 1.1 christos 448 1.1 christos /* Start making connections */ 449 1.1 christos maybe_connect_some(); 450 1.1 christos 451 1.1 christos uv_run(loop, UV_RUN_DEFAULT); 452 1.1 christos 453 1.1 christos MAKE_VALGRIND_HAPPY(); 454 1.1 christos } 455 1.1 christos 456 1.1 christos 457 1.1 christos BENCHMARK_IMPL(tcp_pump100_client) { 458 1.1 christos tcp_pump(100); 459 1.1 christos return 0; 460 1.1 christos } 461 1.1 christos 462 1.1 christos 463 1.1 christos BENCHMARK_IMPL(tcp_pump1_client) { 464 1.1 christos tcp_pump(1); 465 1.1 christos return 0; 466 1.1 christos } 467 1.1 christos 468 1.1 christos 469 1.1 christos BENCHMARK_IMPL(pipe_pump100_client) { 470 1.1 christos pipe_pump(100); 471 1.1 christos return 0; 472 1.1 christos } 473 1.1 christos 474 1.1 christos 475 1.1 christos BENCHMARK_IMPL(pipe_pump1_client) { 476 1.1 christos pipe_pump(1); 477 1.1 christos return 0; 478 1.1 christos } 479