benchmark-pump.c revision 1.1.1.2 1 1.1 christos /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2 1.1 christos *
3 1.1 christos * Permission is hereby granted, free of charge, to any person obtaining a copy
4 1.1 christos * of this software and associated documentation files (the "Software"), to
5 1.1 christos * deal in the Software without restriction, including without limitation the
6 1.1 christos * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7 1.1 christos * sell copies of the Software, and to permit persons to whom the Software is
8 1.1 christos * furnished to do so, subject to the following conditions:
9 1.1 christos *
10 1.1 christos * The above copyright notice and this permission notice shall be included in
11 1.1 christos * all copies or substantial portions of the Software.
12 1.1 christos *
13 1.1 christos * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14 1.1 christos * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15 1.1 christos * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16 1.1 christos * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17 1.1 christos * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18 1.1 christos * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19 1.1 christos * IN THE SOFTWARE.
20 1.1 christos */
21 1.1 christos
22 1.1 christos #include "task.h"
23 1.1 christos #include "uv.h"
24 1.1 christos
25 1.1 christos #include <math.h>
26 1.1 christos #include <stdio.h>
27 1.1 christos
28 1.1 christos
29 1.1 christos static int TARGET_CONNECTIONS;
30 1.1 christos #define WRITE_BUFFER_SIZE 8192
31 1.1 christos #define MAX_SIMULTANEOUS_CONNECTS 100
32 1.1 christos
33 1.1 christos #define PRINT_STATS 0
34 1.1 christos #define STATS_INTERVAL 1000 /* msec */
35 1.1 christos #define STATS_COUNT 5
36 1.1 christos
37 1.1 christos
38 1.1 christos static void do_write(uv_stream_t*);
39 1.1 christos static void maybe_connect_some(void);
40 1.1 christos
41 1.1 christos static uv_req_t* req_alloc(void);
42 1.1 christos static void req_free(uv_req_t* uv_req);
43 1.1 christos
44 1.1 christos static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf);
45 1.1 christos static void buf_free(const uv_buf_t* buf);
46 1.1 christos
47 1.1 christos static uv_loop_t* loop;
48 1.1 christos
49 1.1 christos static uv_tcp_t tcpServer;
50 1.1 christos static uv_pipe_t pipeServer;
51 1.1 christos static uv_stream_t* server;
52 1.1 christos static struct sockaddr_in listen_addr;
53 1.1 christos static struct sockaddr_in connect_addr;
54 1.1 christos
55 1.1 christos static int64_t start_time;
56 1.1 christos
57 1.1 christos static int max_connect_socket = 0;
58 1.1 christos static int max_read_sockets = 0;
59 1.1 christos static int read_sockets = 0;
60 1.1 christos static int write_sockets = 0;
61 1.1 christos
62 1.1 christos static int64_t nrecv = 0;
63 1.1 christos static int64_t nrecv_total = 0;
64 1.1 christos static int64_t nsent = 0;
65 1.1 christos static int64_t nsent_total = 0;
66 1.1 christos
67 1.1 christos static int stats_left = 0;
68 1.1 christos
69 1.1 christos static char write_buffer[WRITE_BUFFER_SIZE];
70 1.1 christos
71 1.1 christos /* Make this as large as you need. */
72 1.1 christos #define MAX_WRITE_HANDLES 1000
73 1.1 christos
74 1.1 christos static stream_type type;
75 1.1 christos
76 1.1 christos static uv_tcp_t tcp_write_handles[MAX_WRITE_HANDLES];
77 1.1 christos static uv_pipe_t pipe_write_handles[MAX_WRITE_HANDLES];
78 1.1 christos
79 1.1 christos static uv_timer_t timer_handle;
80 1.1 christos
81 1.1 christos
/*
 * Convert a byte count transferred over `passed_ms` milliseconds into a
 * throughput figure in gigabits per second (1 gigabit = 2^30 bits here).
 *
 * Returns 0.0 when `passed_ms` is not positive, so that a zero-length
 * measurement window is reported as "no throughput" instead of inf/NaN.
 */
static double gbit(int64_t bytes, int64_t passed_ms) {
  double gbits;
  double seconds;

  if (passed_ms <= 0)
    return 0.0;

  gbits = ((double) bytes / (1024 * 1024 * 1024)) * 8;
  seconds = (double) passed_ms / 1000;
  return gbits / seconds;
}
86 1.1 christos
87 1.1 christos
88 1.1 christos static void show_stats(uv_timer_t* handle) {
89 1.1 christos int64_t diff;
90 1.1 christos int i;
91 1.1 christos
92 1.1 christos #if PRINT_STATS
93 1.1 christos fprintf(stderr, "connections: %d, write: %.1f gbit/s\n",
94 1.1 christos write_sockets,
95 1.1 christos gbit(nsent, STATS_INTERVAL));
96 1.1 christos fflush(stderr);
97 1.1 christos #endif
98 1.1 christos
99 1.1 christos /* Exit if the show is over */
100 1.1 christos if (!--stats_left) {
101 1.1 christos
102 1.1 christos uv_update_time(loop);
103 1.1 christos diff = uv_now(loop) - start_time;
104 1.1 christos
105 1.1 christos fprintf(stderr, "%s_pump%d_client: %.1f gbit/s\n",
106 1.1 christos type == TCP ? "tcp" : "pipe",
107 1.1 christos write_sockets,
108 1.1 christos gbit(nsent_total, diff));
109 1.1 christos fflush(stderr);
110 1.1 christos
111 1.1 christos for (i = 0; i < write_sockets; i++) {
112 1.1 christos if (type == TCP)
113 1.1 christos uv_close((uv_handle_t*) &tcp_write_handles[i], NULL);
114 1.1 christos else
115 1.1 christos uv_close((uv_handle_t*) &pipe_write_handles[i], NULL);
116 1.1 christos }
117 1.1 christos
118 1.1 christos exit(0);
119 1.1 christos }
120 1.1 christos
121 1.1 christos /* Reset read and write counters */
122 1.1 christos nrecv = 0;
123 1.1 christos nsent = 0;
124 1.1 christos }
125 1.1 christos
126 1.1 christos
127 1.1 christos static void read_show_stats(void) {
128 1.1 christos int64_t diff;
129 1.1 christos
130 1.1 christos uv_update_time(loop);
131 1.1 christos diff = uv_now(loop) - start_time;
132 1.1 christos
133 1.1 christos fprintf(stderr, "%s_pump%d_server: %.1f gbit/s\n",
134 1.1 christos type == TCP ? "tcp" : "pipe",
135 1.1 christos max_read_sockets,
136 1.1 christos gbit(nrecv_total, diff));
137 1.1 christos fflush(stderr);
138 1.1 christos }
139 1.1 christos
140 1.1 christos
141 1.1 christos
142 1.1 christos static void read_sockets_close_cb(uv_handle_t* handle) {
143 1.1 christos free(handle);
144 1.1 christos read_sockets--;
145 1.1 christos
146 1.1 christos /* If it's past the first second and everyone has closed their connection
147 1.1 christos * Then print stats.
148 1.1 christos */
149 1.1 christos if (uv_now(loop) - start_time > 1000 && read_sockets == 0) {
150 1.1 christos read_show_stats();
151 1.1 christos uv_close((uv_handle_t*)server, NULL);
152 1.1 christos }
153 1.1 christos }
154 1.1 christos
155 1.1 christos
156 1.1 christos static void start_stats_collection(void) {
157 1.1 christos int r;
158 1.1 christos
159 1.1 christos /* Show-stats timer */
160 1.1 christos stats_left = STATS_COUNT;
161 1.1 christos r = uv_timer_init(loop, &timer_handle);
162 1.1 christos ASSERT(r == 0);
163 1.1 christos r = uv_timer_start(&timer_handle, show_stats, STATS_INTERVAL, STATS_INTERVAL);
164 1.1 christos ASSERT(r == 0);
165 1.1 christos
166 1.1 christos uv_update_time(loop);
167 1.1 christos start_time = uv_now(loop);
168 1.1 christos }
169 1.1 christos
170 1.1 christos
171 1.1 christos static void read_cb(uv_stream_t* stream, ssize_t bytes, const uv_buf_t* buf) {
172 1.1 christos if (nrecv_total == 0) {
173 1.1 christos ASSERT(start_time == 0);
174 1.1 christos uv_update_time(loop);
175 1.1 christos start_time = uv_now(loop);
176 1.1 christos }
177 1.1 christos
178 1.1 christos if (bytes < 0) {
179 1.1 christos uv_close((uv_handle_t*)stream, read_sockets_close_cb);
180 1.1 christos return;
181 1.1 christos }
182 1.1 christos
183 1.1 christos buf_free(buf);
184 1.1 christos
185 1.1 christos nrecv += bytes;
186 1.1 christos nrecv_total += bytes;
187 1.1 christos }
188 1.1 christos
189 1.1 christos
190 1.1 christos static void write_cb(uv_write_t* req, int status) {
191 1.1 christos ASSERT(status == 0);
192 1.1 christos
193 1.1 christos req_free((uv_req_t*) req);
194 1.1 christos
195 1.1 christos nsent += sizeof write_buffer;
196 1.1 christos nsent_total += sizeof write_buffer;
197 1.1 christos
198 1.1 christos do_write((uv_stream_t*) req->handle);
199 1.1 christos }
200 1.1 christos
201 1.1 christos
202 1.1 christos static void do_write(uv_stream_t* stream) {
203 1.1 christos uv_write_t* req;
204 1.1 christos uv_buf_t buf;
205 1.1 christos int r;
206 1.1 christos
207 1.1 christos buf.base = (char*) &write_buffer;
208 1.1 christos buf.len = sizeof write_buffer;
209 1.1 christos
210 1.1 christos req = (uv_write_t*) req_alloc();
211 1.1 christos r = uv_write(req, stream, &buf, 1, write_cb);
212 1.1 christos ASSERT(r == 0);
213 1.1 christos }
214 1.1 christos
215 1.1 christos
216 1.1 christos static void connect_cb(uv_connect_t* req, int status) {
217 1.1 christos int i;
218 1.1 christos
219 1.1 christos if (status) {
220 1.1 christos fprintf(stderr, "%s", uv_strerror(status));
221 1.1 christos fflush(stderr);
222 1.1 christos }
223 1.1 christos ASSERT(status == 0);
224 1.1 christos
225 1.1 christos write_sockets++;
226 1.1 christos req_free((uv_req_t*) req);
227 1.1 christos
228 1.1 christos maybe_connect_some();
229 1.1 christos
230 1.1 christos if (write_sockets == TARGET_CONNECTIONS) {
231 1.1 christos start_stats_collection();
232 1.1 christos
233 1.1 christos /* Yay! start writing */
234 1.1 christos for (i = 0; i < write_sockets; i++) {
235 1.1 christos if (type == TCP)
236 1.1 christos do_write((uv_stream_t*) &tcp_write_handles[i]);
237 1.1 christos else
238 1.1 christos do_write((uv_stream_t*) &pipe_write_handles[i]);
239 1.1 christos }
240 1.1 christos }
241 1.1 christos }
242 1.1 christos
243 1.1 christos
244 1.1 christos static void maybe_connect_some(void) {
245 1.1 christos uv_connect_t* req;
246 1.1 christos uv_tcp_t* tcp;
247 1.1 christos uv_pipe_t* pipe;
248 1.1 christos int r;
249 1.1 christos
250 1.1 christos while (max_connect_socket < TARGET_CONNECTIONS &&
251 1.1 christos max_connect_socket < write_sockets + MAX_SIMULTANEOUS_CONNECTS) {
252 1.1 christos if (type == TCP) {
253 1.1 christos tcp = &tcp_write_handles[max_connect_socket++];
254 1.1 christos
255 1.1 christos r = uv_tcp_init(loop, tcp);
256 1.1 christos ASSERT(r == 0);
257 1.1 christos
258 1.1 christos req = (uv_connect_t*) req_alloc();
259 1.1 christos r = uv_tcp_connect(req,
260 1.1 christos tcp,
261 1.1 christos (const struct sockaddr*) &connect_addr,
262 1.1 christos connect_cb);
263 1.1 christos ASSERT(r == 0);
264 1.1 christos } else {
265 1.1 christos pipe = &pipe_write_handles[max_connect_socket++];
266 1.1 christos
267 1.1 christos r = uv_pipe_init(loop, pipe, 0);
268 1.1 christos ASSERT(r == 0);
269 1.1 christos
270 1.1 christos req = (uv_connect_t*) req_alloc();
271 1.1 christos uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb);
272 1.1 christos }
273 1.1 christos }
274 1.1 christos }
275 1.1 christos
276 1.1 christos
277 1.1 christos static void connection_cb(uv_stream_t* s, int status) {
278 1.1 christos uv_stream_t* stream;
279 1.1 christos int r;
280 1.1 christos
281 1.1 christos ASSERT(server == s);
282 1.1 christos ASSERT(status == 0);
283 1.1 christos
284 1.1 christos if (type == TCP) {
285 1.1 christos stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
286 1.1 christos r = uv_tcp_init(loop, (uv_tcp_t*)stream);
287 1.1 christos ASSERT(r == 0);
288 1.1 christos } else {
289 1.1 christos stream = (uv_stream_t*)malloc(sizeof(uv_pipe_t));
290 1.1 christos r = uv_pipe_init(loop, (uv_pipe_t*)stream, 0);
291 1.1 christos ASSERT(r == 0);
292 1.1 christos }
293 1.1 christos
294 1.1 christos r = uv_accept(s, stream);
295 1.1 christos ASSERT(r == 0);
296 1.1 christos
297 1.1 christos r = uv_read_start(stream, buf_alloc, read_cb);
298 1.1 christos ASSERT(r == 0);
299 1.1 christos
300 1.1 christos read_sockets++;
301 1.1 christos max_read_sockets++;
302 1.1 christos }
303 1.1 christos
304 1.1 christos
305 1.1 christos /*
306 1.1 christos * Request allocator
307 1.1 christos */
308 1.1 christos
309 1.1 christos typedef struct req_list_s {
310 1.1 christos union uv_any_req uv_req;
311 1.1 christos struct req_list_s* next;
312 1.1 christos } req_list_t;
313 1.1 christos
314 1.1 christos
315 1.1 christos static req_list_t* req_freelist = NULL;
316 1.1 christos
317 1.1 christos
318 1.1 christos static uv_req_t* req_alloc(void) {
319 1.1 christos req_list_t* req;
320 1.1 christos
321 1.1 christos req = req_freelist;
322 1.1 christos if (req != NULL) {
323 1.1 christos req_freelist = req->next;
324 1.1 christos return (uv_req_t*) req;
325 1.1 christos }
326 1.1 christos
327 1.1 christos req = (req_list_t*) malloc(sizeof *req);
328 1.1 christos return (uv_req_t*) req;
329 1.1 christos }
330 1.1 christos
331 1.1 christos
332 1.1 christos static void req_free(uv_req_t* uv_req) {
333 1.1 christos req_list_t* req = (req_list_t*) uv_req;
334 1.1 christos
335 1.1 christos req->next = req_freelist;
336 1.1 christos req_freelist = req;
337 1.1 christos }
338 1.1 christos
339 1.1 christos
340 1.1 christos /*
341 1.1 christos * Buffer allocator
342 1.1 christos */
343 1.1 christos
344 1.1 christos typedef struct buf_list_s {
345 1.1 christos uv_buf_t uv_buf_t;
346 1.1 christos struct buf_list_s* next;
347 1.1 christos } buf_list_t;
348 1.1 christos
349 1.1 christos
350 1.1 christos static buf_list_t* buf_freelist = NULL;
351 1.1 christos
352 1.1 christos
353 1.1 christos static void buf_alloc(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
354 1.1 christos buf_list_t* ab;
355 1.1 christos
356 1.1 christos ab = buf_freelist;
357 1.1 christos if (ab != NULL)
358 1.1 christos buf_freelist = ab->next;
359 1.1 christos else {
360 1.1 christos ab = malloc(size + sizeof(*ab));
361 1.1 christos ab->uv_buf_t.len = size;
362 1.1 christos ab->uv_buf_t.base = (char*) (ab + 1);
363 1.1 christos }
364 1.1 christos
365 1.1 christos *buf = ab->uv_buf_t;
366 1.1 christos }
367 1.1 christos
368 1.1 christos
369 1.1 christos static void buf_free(const uv_buf_t* buf) {
370 1.1 christos buf_list_t* ab = (buf_list_t*) buf->base - 1;
371 1.1 christos ab->next = buf_freelist;
372 1.1 christos buf_freelist = ab;
373 1.1 christos }
374 1.1 christos
375 1.1 christos
376 1.1 christos HELPER_IMPL(tcp_pump_server) {
377 1.1 christos int r;
378 1.1 christos
379 1.1 christos type = TCP;
380 1.1 christos loop = uv_default_loop();
381 1.1 christos
382 1.1 christos ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &listen_addr));
383 1.1 christos
384 1.1 christos /* Server */
385 1.1 christos server = (uv_stream_t*)&tcpServer;
386 1.1 christos r = uv_tcp_init(loop, &tcpServer);
387 1.1 christos ASSERT(r == 0);
388 1.1 christos r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &listen_addr, 0);
389 1.1 christos ASSERT(r == 0);
390 1.1 christos r = uv_listen((uv_stream_t*)&tcpServer, MAX_WRITE_HANDLES, connection_cb);
391 1.1 christos ASSERT(r == 0);
392 1.1 christos
393 1.1.1.2 christos notify_parent_process();
394 1.1 christos uv_run(loop, UV_RUN_DEFAULT);
395 1.1 christos
396 1.1 christos return 0;
397 1.1 christos }
398 1.1 christos
399 1.1 christos
400 1.1 christos HELPER_IMPL(pipe_pump_server) {
401 1.1 christos int r;
402 1.1 christos type = PIPE;
403 1.1 christos
404 1.1 christos loop = uv_default_loop();
405 1.1 christos
406 1.1 christos /* Server */
407 1.1 christos server = (uv_stream_t*)&pipeServer;
408 1.1 christos r = uv_pipe_init(loop, &pipeServer, 0);
409 1.1 christos ASSERT(r == 0);
410 1.1 christos r = uv_pipe_bind(&pipeServer, TEST_PIPENAME);
411 1.1 christos ASSERT(r == 0);
412 1.1 christos r = uv_listen((uv_stream_t*)&pipeServer, MAX_WRITE_HANDLES, connection_cb);
413 1.1 christos ASSERT(r == 0);
414 1.1 christos
415 1.1.1.2 christos notify_parent_process();
416 1.1 christos uv_run(loop, UV_RUN_DEFAULT);
417 1.1 christos
418 1.1 christos MAKE_VALGRIND_HAPPY();
419 1.1 christos return 0;
420 1.1 christos }
421 1.1 christos
422 1.1 christos
423 1.1 christos static void tcp_pump(int n) {
424 1.1 christos ASSERT(n <= MAX_WRITE_HANDLES);
425 1.1 christos TARGET_CONNECTIONS = n;
426 1.1 christos type = TCP;
427 1.1 christos
428 1.1 christos loop = uv_default_loop();
429 1.1 christos
430 1.1 christos ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &connect_addr));
431 1.1 christos
432 1.1 christos /* Start making connections */
433 1.1 christos maybe_connect_some();
434 1.1 christos
435 1.1 christos uv_run(loop, UV_RUN_DEFAULT);
436 1.1 christos
437 1.1 christos MAKE_VALGRIND_HAPPY();
438 1.1 christos }
439 1.1 christos
440 1.1 christos
441 1.1 christos static void pipe_pump(int n) {
442 1.1 christos ASSERT(n <= MAX_WRITE_HANDLES);
443 1.1 christos TARGET_CONNECTIONS = n;
444 1.1 christos type = PIPE;
445 1.1 christos
446 1.1 christos loop = uv_default_loop();
447 1.1 christos
448 1.1 christos /* Start making connections */
449 1.1 christos maybe_connect_some();
450 1.1 christos
451 1.1 christos uv_run(loop, UV_RUN_DEFAULT);
452 1.1 christos
453 1.1 christos MAKE_VALGRIND_HAPPY();
454 1.1 christos }
455 1.1 christos
456 1.1 christos
/* Benchmark entry point: TCP client with 100 connections. */
BENCHMARK_IMPL(tcp_pump100_client) {
  tcp_pump(100);
  return 0;
}
461 1.1 christos
462 1.1 christos
/* Benchmark entry point: TCP client with a single connection. */
BENCHMARK_IMPL(tcp_pump1_client) {
  tcp_pump(1);
  return 0;
}
467 1.1 christos
468 1.1 christos
/* Benchmark entry point: pipe client with 100 connections. */
BENCHMARK_IMPL(pipe_pump100_client) {
  pipe_pump(100);
  return 0;
}
473 1.1 christos
474 1.1 christos
/* Benchmark entry point: pipe client with a single connection. */
BENCHMARK_IMPL(pipe_pump1_client) {
  pipe_pump(1);
  return 0;
}
479