regress_bufferevent.c revision 1.2.6.2 1 /* $NetBSD: regress_bufferevent.c,v 1.2.6.2 2014/12/25 02:34:45 snj Exp $ */
2
3 /*
4 * Copyright (c) 2003-2007 Niels Provos <provos (at) citi.umich.edu>
5 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * 3. The name of the author may not be used to endorse or promote products
16 * derived from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 */
29 #include "util-internal.h"
30
31 /* The old tests here need assertions to work. */
32 #undef NDEBUG
33
34 #ifdef _WIN32
35 #include <winsock2.h>
36 #include <windows.h>
37 #endif
38
39 #include "event2/event-config.h"
40
41 #include <sys/types.h>
42 #include <sys/stat.h>
43 #ifdef EVENT__HAVE_SYS_TIME_H
44 #include <sys/time.h>
45 #endif
46 #include <sys/queue.h>
47 #ifndef _WIN32
48 #include <sys/socket.h>
49 #include <sys/wait.h>
50 #include <signal.h>
51 #include <unistd.h>
52 #include <netdb.h>
53 #include <netinet/in.h>
54 #endif
55 #include <fcntl.h>
56 #include <signal.h>
57 #include <stdlib.h>
58 #include <stdio.h>
59 #include <string.h>
60 #include <errno.h>
61 #include <assert.h>
62
63 #ifdef EVENT__HAVE_ARPA_INET_H
64 #include <arpa/inet.h>
65 #endif
66
67 #include "event2/event-config.h"
68 #include "event2/event.h"
69 #include "event2/event_struct.h"
70 #include "event2/event_compat.h"
71 #include "event2/tag.h"
72 #include "event2/buffer.h"
73 #include "event2/bufferevent.h"
74 #include "event2/bufferevent_compat.h"
75 #include "event2/bufferevent_struct.h"
76 #include "event2/listener.h"
77 #include "event2/util.h"
78
79 #include "bufferevent-internal.h"
80 #include "util-internal.h"
81 #ifdef _WIN32
82 #include "iocp-internal.h"
83 #endif
84
85 #include "regress.h"
86 #include "regress_testutils.h"
87
88 /*
89 * simple bufferevent test
90 */
91
92 static void
93 readcb(struct bufferevent *bev, void *arg)
94 {
95 if (evbuffer_get_length(bev->input) == 8333) {
96 struct evbuffer *evbuf = evbuffer_new();
97 assert(evbuf != NULL);
98
99 /* gratuitous test of bufferevent_read_buffer */
100 bufferevent_read_buffer(bev, evbuf);
101
102 bufferevent_disable(bev, EV_READ);
103
104 if (evbuffer_get_length(evbuf) == 8333) {
105 test_ok++;
106 }
107
108 evbuffer_free(evbuf);
109 }
110 }
111
112 static void
113 writecb(struct bufferevent *bev, void *arg)
114 {
115 if (evbuffer_get_length(bev->output) == 0) {
116 test_ok++;
117 }
118 }
119
120 static void
121 errorcb(struct bufferevent *bev, short what, void *arg)
122 {
123 test_ok = -2;
124 }
125
126 static void
127 test_bufferevent_impl(int use_pair)
128 {
129 struct bufferevent *bev1 = NULL, *bev2 = NULL;
130 char buffer[8333];
131 int i;
132
133 if (use_pair) {
134 struct bufferevent *pair[2];
135 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
136 bev1 = pair[0];
137 bev2 = pair[1];
138 bufferevent_setcb(bev1, readcb, writecb, errorcb, bev1);
139 bufferevent_setcb(bev2, readcb, writecb, errorcb, NULL);
140 tt_int_op(bufferevent_getfd(bev1), ==, -1);
141 tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL);
142 tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, bev2);
143 tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, bev1);
144 } else {
145 bev1 = bufferevent_new(pair[0], readcb, writecb, errorcb, NULL);
146 bev2 = bufferevent_new(pair[1], readcb, writecb, errorcb, NULL);
147 tt_int_op(bufferevent_getfd(bev1), ==, pair[0]);
148 tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL);
149 tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, NULL);
150 tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, NULL);
151 }
152
153 {
154 /* Test getcb. */
155 bufferevent_data_cb r, w;
156 bufferevent_event_cb e;
157 void *a;
158 bufferevent_getcb(bev1, &r, &w, &e, &a);
159 tt_ptr_op(r, ==, readcb);
160 tt_ptr_op(w, ==, writecb);
161 tt_ptr_op(e, ==, errorcb);
162 tt_ptr_op(a, ==, use_pair ? bev1 : NULL);
163 }
164
165 bufferevent_disable(bev1, EV_READ);
166 bufferevent_enable(bev2, EV_READ);
167
168 tt_int_op(bufferevent_get_enabled(bev1), ==, EV_WRITE);
169 tt_int_op(bufferevent_get_enabled(bev2), ==, EV_WRITE|EV_READ);
170
171 for (i = 0; i < (int)sizeof(buffer); i++)
172 buffer[i] = i;
173
174 bufferevent_write(bev1, buffer, sizeof(buffer));
175
176 event_dispatch();
177
178 bufferevent_free(bev1);
179 tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, NULL);
180 bufferevent_free(bev2);
181
182 if (test_ok != 2)
183 test_ok = 0;
184 end:
185 ;
186 }
187
/* Legacy entry point: basic bufferevent test without a bufferevent pair. */
static void
test_bufferevent(void)
{
	const int use_pair = 0;

	test_bufferevent_impl(use_pair);
}
193
/* Legacy entry point: basic bufferevent test over a bufferevent pair. */
static void
test_bufferevent_pair(void)
{
	const int use_pair = 1;

	test_bufferevent_impl(use_pair);
}
199
200 /*
201 * test watermarks and bufferevent
202 */
203
204 static void
205 wm_readcb(struct bufferevent *bev, void *arg)
206 {
207 struct evbuffer *evbuf = evbuffer_new();
208 int len = (int)evbuffer_get_length(bev->input);
209 static int nread;
210
211 assert(len >= 10 && len <= 20);
212
213 assert(evbuf != NULL);
214
215 /* gratuitous test of bufferevent_read_buffer */
216 bufferevent_read_buffer(bev, evbuf);
217
218 nread += len;
219 if (nread == 65000) {
220 bufferevent_disable(bev, EV_READ);
221 test_ok++;
222 }
223
224 evbuffer_free(evbuf);
225 }
226
227 static void
228 wm_writecb(struct bufferevent *bev, void *arg)
229 {
230 assert(evbuffer_get_length(bev->output) <= 100);
231 if (evbuffer_get_length(bev->output) == 0) {
232 evbuffer_drain(bev->output, evbuffer_get_length(bev->output));
233 test_ok++;
234 }
235 }
236
237 static void
238 wm_errorcb(struct bufferevent *bev, short what, void *arg)
239 {
240 test_ok = -2;
241 }
242
243 static void
244 test_bufferevent_watermarks_impl(int use_pair)
245 {
246 struct bufferevent *bev1 = NULL, *bev2 = NULL;
247 char buffer[65000];
248 size_t low, high;
249 int i;
250 test_ok = 0;
251
252 if (use_pair) {
253 struct bufferevent *pair[2];
254 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
255 bev1 = pair[0];
256 bev2 = pair[1];
257 bufferevent_setcb(bev1, NULL, wm_writecb, errorcb, NULL);
258 bufferevent_setcb(bev2, wm_readcb, NULL, errorcb, NULL);
259 } else {
260 bev1 = bufferevent_new(pair[0], NULL, wm_writecb, wm_errorcb, NULL);
261 bev2 = bufferevent_new(pair[1], wm_readcb, NULL, wm_errorcb, NULL);
262 }
263 tt_assert(bev1);
264 tt_assert(bev2);
265 bufferevent_disable(bev1, EV_READ);
266 bufferevent_enable(bev2, EV_READ);
267
268 /* By default, low watermarks are set to 0 */
269 bufferevent_getwatermark(bev1, EV_READ, &low, NULL);
270 tt_int_op(low, ==, 0);
271 bufferevent_getwatermark(bev2, EV_WRITE, &low, NULL);
272 tt_int_op(low, ==, 0);
273
274 for (i = 0; i < (int)sizeof(buffer); i++)
275 buffer[i] = (char)i;
276
277 /* limit the reading on the receiving bufferevent */
278 bufferevent_setwatermark(bev2, EV_READ, 10, 20);
279
280 bufferevent_getwatermark(bev2, EV_READ, &low, &high);
281 tt_int_op(low, ==, 10);
282 tt_int_op(high, ==, 20);
283
284 /* Tell the sending bufferevent not to notify us till it's down to
285 100 bytes. */
286 bufferevent_setwatermark(bev1, EV_WRITE, 100, 2000);
287
288 bufferevent_getwatermark(bev1, EV_WRITE, &low, &high);
289 tt_int_op(low, ==, 100);
290 tt_int_op(high, ==, 2000);
291
292 bufferevent_write(bev1, buffer, sizeof(buffer));
293
294 event_dispatch();
295
296 tt_int_op(test_ok, ==, 2);
297
298 /* The write callback drained all the data from outbuf, so we
299 * should have removed the write event... */
300 tt_assert(!event_pending(&bev2->ev_write, EV_WRITE, NULL));
301
302 end:
303 if (bev1)
304 bufferevent_free(bev1);
305 if (bev2)
306 bufferevent_free(bev2);
307 }
308
/* Legacy entry point: watermark test without a bufferevent pair. */
static void
test_bufferevent_watermarks(void)
{
	const int use_pair = 0;

	test_bufferevent_watermarks_impl(use_pair);
}
314
/* Legacy entry point: watermark test over a bufferevent pair. */
static void
test_bufferevent_pair_watermarks(void)
{
	const int use_pair = 1;

	test_bufferevent_watermarks_impl(use_pair);
}
320
321 /*
322 * Test bufferevent filters
323 */
324
325 /* strip an 'x' from each byte */
326
327 static enum bufferevent_filter_result
328 bufferevent_input_filter(struct evbuffer *src, struct evbuffer *dst,
329 ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx)
330 {
331 const unsigned char *buffer;
332 unsigned i;
333
334 buffer = evbuffer_pullup(src, evbuffer_get_length(src));
335 for (i = 0; i < evbuffer_get_length(src); i += 2) {
336 assert(buffer[i] == 'x');
337 evbuffer_add(dst, buffer + i + 1, 1);
338
339 if (i + 2 > evbuffer_get_length(src))
340 break;
341 }
342
343 evbuffer_drain(src, i);
344 return (BEV_OK);
345 }
346
347 /* add an 'x' before each byte */
348
349 static enum bufferevent_filter_result
350 bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst,
351 ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx)
352 {
353 const unsigned char *buffer;
354 unsigned i;
355
356 buffer = evbuffer_pullup(src, evbuffer_get_length(src));
357 for (i = 0; i < evbuffer_get_length(src); ++i) {
358 evbuffer_add(dst, "x", 1);
359 evbuffer_add(dst, buffer + i, 1);
360 }
361
362 evbuffer_drain(src, evbuffer_get_length(src));
363 return (BEV_OK);
364 }
365
366 static void
367 test_bufferevent_filters_impl(int use_pair)
368 {
369 struct bufferevent *bev1 = NULL, *bev2 = NULL;
370 struct bufferevent *bev1_base = NULL, *bev2_base = NULL;
371 char buffer[8333];
372 int i;
373
374 test_ok = 0;
375
376 if (use_pair) {
377 struct bufferevent *pair[2];
378 tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
379 bev1 = pair[0];
380 bev2 = pair[1];
381 } else {
382 bev1 = bufferevent_socket_new(NULL, pair[0], 0);
383 bev2 = bufferevent_socket_new(NULL, pair[1], 0);
384 }
385 bev1_base = bev1;
386 bev2_base = bev2;
387
388 for (i = 0; i < (int)sizeof(buffer); i++)
389 buffer[i] = i;
390
391 bev1 = bufferevent_filter_new(bev1, NULL, bufferevent_output_filter,
392 BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
393
394 bev2 = bufferevent_filter_new(bev2, bufferevent_input_filter,
395 NULL, BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
396 bufferevent_setcb(bev1, NULL, writecb, errorcb, NULL);
397 bufferevent_setcb(bev2, readcb, NULL, errorcb, NULL);
398
399 tt_ptr_op(bufferevent_get_underlying(bev1), ==, bev1_base);
400 tt_ptr_op(bufferevent_get_underlying(bev2), ==, bev2_base);
401 tt_int_op(bufferevent_getfd(bev1), ==, -1);
402 tt_int_op(bufferevent_getfd(bev2), ==, -1);
403
404 bufferevent_disable(bev1, EV_READ);
405 bufferevent_enable(bev2, EV_READ);
406 /* insert some filters */
407 bufferevent_write(bev1, buffer, sizeof(buffer));
408
409 event_dispatch();
410
411 if (test_ok != 2)
412 test_ok = 0;
413
414 end:
415 if (bev1)
416 bufferevent_free(bev1);
417 if (bev2)
418 bufferevent_free(bev2);
419
420 }
421
/* Legacy entry point: filter test without a bufferevent pair. */
static void
test_bufferevent_filters(void)
{
	const int use_pair = 0;

	test_bufferevent_filters_impl(use_pair);
}
427
/* Legacy entry point: filter test over a bufferevent pair. */
static void
test_bufferevent_pair_filters(void)
{
	const int use_pair = 1;

	test_bufferevent_filters_impl(use_pair);
}
433
434
435 static void
436 sender_writecb(struct bufferevent *bev, void *ctx)
437 {
438 if (evbuffer_get_length(bufferevent_get_output(bev)) == 0) {
439 bufferevent_disable(bev,EV_READ|EV_WRITE);
440 TT_BLATHER(("Flushed %d: freeing it.", (int)bufferevent_getfd(bev)));
441 bufferevent_free(bev);
442 }
443 }
444
/*
 * Event callback for the sending side: any event reported here is
 * recorded as a test failure together with the event mask.
 */
static void
sender_errorcb(struct bufferevent *bev, short what, void *ctx)
{
	const int events = (int)what;

	TT_FAIL(("Got sender error %d",events));
}
450
/* Counters updated by the reader/trigger callbacks below. */
static int n_strings_read = 0;
static int n_reads_invoked = 0;

/* Bufferevent option flags shared between a test and its callbacks. */
static int bufferevent_connect_test_flags = 0;
/* Flags passed to bufferevent_trigger*() by the trigger callbacks. */
static int bufferevent_trigger_test_flags = 0;

/* Payload sent by listen_cb and compared against by reader_eventcb. */
#define TEST_STR \
	"Now is the time for all good events to signal for " \
	"the good of their protocol"
458 static void
459 listen_cb(struct evconnlistener *listener, evutil_socket_t fd,
460 struct sockaddr *sa, int socklen, void *arg)
461 {
462 struct event_base *base = arg;
463 struct bufferevent *bev;
464 const char s[] = TEST_STR;
465 TT_BLATHER(("Got a request on socket %d", (int)fd ));
466 bev = bufferevent_socket_new(base, fd, bufferevent_connect_test_flags);
467 tt_assert(bev);
468 bufferevent_setcb(bev, NULL, sender_writecb, sender_errorcb, NULL);
469 bufferevent_write(bev, s, sizeof(s));
470 end:
471 ;
472 }
473
474 static void
475 reader_eventcb(struct bufferevent *bev, short what, void *ctx)
476 {
477 struct event_base *base = ctx;
478 if (what & BEV_EVENT_ERROR) {
479 perror("foobar");
480 TT_FAIL(("got connector error %d", (int)what));
481 return;
482 }
483 if (what & BEV_EVENT_CONNECTED) {
484 TT_BLATHER(("connected on %d", (int)bufferevent_getfd(bev)));
485 bufferevent_enable(bev, EV_READ);
486 }
487 if (what & BEV_EVENT_EOF) {
488 char buf[512];
489 size_t n;
490 n = bufferevent_read(bev, buf, sizeof(buf)-1);
491 tt_int_op(n, >=, 0);
492 buf[n] = '\0';
493 tt_str_op(buf, ==, TEST_STR);
494 if (++n_strings_read == 2)
495 event_base_loopexit(base, NULL);
496 TT_BLATHER(("EOF on %d: %d strings read.",
497 (int)bufferevent_getfd(bev), n_strings_read));
498 }
499 end:
500 ;
501 }
502
503 static void
504 reader_readcb(struct bufferevent *bev, void *ctx)
505 {
506 TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev)));
507 n_reads_invoked++;
508 }
509
510 static void
511 test_bufferevent_connect(void *arg)
512 {
513 struct basic_test_data *data = arg;
514 struct evconnlistener *lev=NULL;
515 struct bufferevent *bev1=NULL, *bev2=NULL;
516 struct sockaddr_in localhost;
517 struct sockaddr_storage ss;
518 struct sockaddr *sa;
519 ev_socklen_t slen;
520
521 int be_flags=BEV_OPT_CLOSE_ON_FREE;
522
523 if (strstr((char*)data->setup_data, "defer")) {
524 be_flags |= BEV_OPT_DEFER_CALLBACKS;
525 }
526 if (strstr((char*)data->setup_data, "unlocked")) {
527 be_flags |= BEV_OPT_UNLOCK_CALLBACKS;
528 }
529 if (strstr((char*)data->setup_data, "lock")) {
530 be_flags |= BEV_OPT_THREADSAFE;
531 }
532 bufferevent_connect_test_flags = be_flags;
533 #ifdef _WIN32
534 if (!strcmp((char*)data->setup_data, "unset_connectex")) {
535 struct win32_extension_fns *ext =
536 (struct win32_extension_fns *)
537 event_get_win32_extension_fns_();
538 ext->ConnectEx = NULL;
539 }
540 #endif
541
542 memset(&localhost, 0, sizeof(localhost));
543
544 localhost.sin_port = 0; /* pick-a-port */
545 localhost.sin_addr.s_addr = htonl(0x7f000001L);
546 localhost.sin_family = AF_INET;
547 sa = (struct sockaddr *)&localhost;
548 lev = evconnlistener_new_bind(data->base, listen_cb, data->base,
549 LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,
550 16, sa, sizeof(localhost));
551 tt_assert(lev);
552
553 sa = (struct sockaddr *)&ss;
554 slen = sizeof(ss);
555 if (regress_get_listener_addr(lev, sa, &slen) < 0) {
556 tt_abort_perror("getsockname");
557 }
558
559 tt_assert(!evconnlistener_enable(lev));
560 bev1 = bufferevent_socket_new(data->base, -1, be_flags);
561 bev2 = bufferevent_socket_new(data->base, -1, be_flags);
562 tt_assert(bev1);
563 tt_assert(bev2);
564 bufferevent_setcb(bev1, reader_readcb,NULL, reader_eventcb, data->base);
565 bufferevent_setcb(bev2, reader_readcb,NULL, reader_eventcb, data->base);
566
567 bufferevent_enable(bev1, EV_READ);
568 bufferevent_enable(bev2, EV_READ);
569
570 tt_want(!bufferevent_socket_connect(bev1, sa, sizeof(localhost)));
571 tt_want(!bufferevent_socket_connect(bev2, sa, sizeof(localhost)));
572
573 event_base_dispatch(data->base);
574
575 tt_int_op(n_strings_read, ==, 2);
576 tt_int_op(n_reads_invoked, >=, 2);
577 end:
578 if (lev)
579 evconnlistener_free(lev);
580
581 if (bev1)
582 bufferevent_free(bev1);
583
584 if (bev2)
585 bufferevent_free(bev2);
586 }
587
588 static void
589 want_fail_eventcb(struct bufferevent *bev, short what, void *ctx)
590 {
591 struct event_base *base = ctx;
592 const char *err;
593 evutil_socket_t s;
594
595 if (what & BEV_EVENT_ERROR) {
596 s = bufferevent_getfd(bev);
597 err = evutil_socket_error_to_string(evutil_socket_geterror(s));
598 TT_BLATHER(("connection failure on "EV_SOCK_FMT": %s",
599 EV_SOCK_ARG(s), err));
600 test_ok = 1;
601 } else {
602 TT_FAIL(("didn't fail? what %hd", what));
603 }
604
605 event_base_loopexit(base, NULL);
606 }
607
608 static void
609 close_socket_cb(evutil_socket_t fd, short what, void *arg)
610 {
611 evutil_socket_t *fdp = arg;
612 if (*fdp >= 0) {
613 evutil_closesocket(*fdp);
614 *fdp = -1;
615 }
616 }
617
618 static void
619 test_bufferevent_connect_fail(void *arg)
620 {
621 struct basic_test_data *data = (struct basic_test_data *)arg;
622 struct bufferevent *bev=NULL;
623 struct sockaddr_in localhost;
624 struct sockaddr *sa = (struct sockaddr*)&localhost;
625 evutil_socket_t fake_listener = -1;
626 ev_socklen_t slen = sizeof(localhost);
627 struct event close_listener_event;
628 int close_listener_event_added = 0;
629 struct timeval one_second = { 1, 0 };
630 int r;
631
632 test_ok = 0;
633
634 memset(&localhost, 0, sizeof(localhost));
635 localhost.sin_port = 0; /* have the kernel pick a port */
636 localhost.sin_addr.s_addr = htonl(0x7f000001L);
637 localhost.sin_family = AF_INET;
638
639 /* bind, but don't listen or accept. should trigger
640 "Connection refused" reliably on most platforms. */
641 fake_listener = socket(localhost.sin_family, SOCK_STREAM, 0);
642 tt_assert(fake_listener >= 0);
643 tt_assert(bind(fake_listener, sa, slen) == 0);
644 tt_assert(getsockname(fake_listener, sa, &slen) == 0);
645 bev = bufferevent_socket_new(data->base, -1,
646 BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
647 tt_assert(bev);
648 bufferevent_setcb(bev, NULL, NULL, want_fail_eventcb, data->base);
649
650 r = bufferevent_socket_connect(bev, sa, slen);
651 /* XXXX we'd like to test the '0' case everywhere, but FreeBSD tells
652 * detects the error immediately, which is not really wrong of it. */
653 tt_want(r == 0 || r == -1);
654
655 /* Close the listener socket after a second. This should trigger
656 "connection refused" on some other platforms, including OSX. */
657 evtimer_assign(&close_listener_event, data->base, close_socket_cb,
658 &fake_listener);
659 event_add(&close_listener_event, &one_second);
660 close_listener_event_added = 1;
661
662 event_base_dispatch(data->base);
663
664 tt_int_op(test_ok, ==, 1);
665
666 end:
667 if (fake_listener >= 0)
668 evutil_closesocket(fake_listener);
669
670 if (bev)
671 bufferevent_free(bev);
672
673 if (close_listener_event_added)
674 event_del(&close_listener_event);
675 }
676
/*
 * Bookkeeping filled in by bev_timeout_write_cb and bev_timeout_event_cb
 * for one bufferevent in the timeout test.
 */
struct timeout_cb_result {
	/* Wall-clock times of the last read timeout, the last write
	 * timeout, and the last write callback, in that order. */
	struct timeval read_timeout_at, write_timeout_at, last_wrote_at;
	/* Read timeouts seen, write timeouts seen, and total event
	 * callback invocations, in that order. */
	int n_read_timeouts, n_write_timeouts, total_calls;
};
685
686 static void
687 bev_timeout_write_cb(struct bufferevent *bev, void *arg)
688 {
689 struct timeout_cb_result *res = arg;
690 evutil_gettimeofday(&res->last_wrote_at, NULL);
691 }
692
693 static void
694 bev_timeout_event_cb(struct bufferevent *bev, short what, void *arg)
695 {
696 struct timeout_cb_result *res = arg;
697 ++res->total_calls;
698
699 if ((what & (BEV_EVENT_READING|BEV_EVENT_TIMEOUT))
700 == (BEV_EVENT_READING|BEV_EVENT_TIMEOUT)) {
701 evutil_gettimeofday(&res->read_timeout_at, NULL);
702 ++res->n_read_timeouts;
703 }
704 if ((what & (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT))
705 == (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT)) {
706 evutil_gettimeofday(&res->write_timeout_at, NULL);
707 ++res->n_write_timeouts;
708 }
709 }
710
711 static void
712 test_bufferevent_timeouts(void *arg)
713 {
714 /* "arg" is a string containing "pair" and/or "filter". */
715 struct bufferevent *bev1 = NULL, *bev2 = NULL;
716 struct basic_test_data *data = arg;
717 int use_pair = 0, use_filter = 0;
718 struct timeval tv_w, tv_r, started_at;
719 struct timeout_cb_result res1, res2;
720 char buf[1024];
721
722 memset(&res1, 0, sizeof(res1));
723 memset(&res2, 0, sizeof(res2));
724
725 if (strstr((char*)data->setup_data, "pair"))
726 use_pair = 1;
727 if (strstr((char*)data->setup_data, "filter"))
728 use_filter = 1;
729
730 if (use_pair) {
731 struct bufferevent *p[2];
732 tt_int_op(0, ==, bufferevent_pair_new(data->base, 0, p));
733 bev1 = p[0];
734 bev2 = p[1];
735 } else {
736 bev1 = bufferevent_socket_new(data->base, data->pair[0], 0);
737 bev2 = bufferevent_socket_new(data->base, data->pair[1], 0);
738 }
739
740 tt_assert(bev1);
741 tt_assert(bev2);
742
743 if (use_filter) {
744 struct bufferevent *bevf1, *bevf2;
745 bevf1 = bufferevent_filter_new(bev1, NULL, NULL,
746 BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
747 bevf2 = bufferevent_filter_new(bev2, NULL, NULL,
748 BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
749 tt_assert(bevf1);
750 tt_assert(bevf2);
751 bev1 = bevf1;
752 bev2 = bevf2;
753 }
754
755 /* Do this nice and early. */
756 bufferevent_disable(bev2, EV_READ);
757
758 /* bev1 will try to write and read. Both will time out. */
759 evutil_gettimeofday(&started_at, NULL);
760 tv_w.tv_sec = tv_r.tv_sec = 0;
761 tv_w.tv_usec = 100*1000;
762 tv_r.tv_usec = 150*1000;
763 bufferevent_setcb(bev1, NULL, bev_timeout_write_cb,
764 bev_timeout_event_cb, &res1);
765 bufferevent_setwatermark(bev1, EV_WRITE, 1024*1024+10, 0);
766 bufferevent_set_timeouts(bev1, &tv_r, &tv_w);
767 if (use_pair) {
768 /* For a pair, the fact that the other side isn't reading
769 * makes the writer stall */
770 bufferevent_write(bev1, "ABCDEFG", 7);
771 } else {
772 /* For a real socket, the kernel's TCP buffers can eat a
773 * fair number of bytes; make sure that at some point we
774 * have some bytes that will stall. */
775 struct evbuffer *output = bufferevent_get_output(bev1);
776 int i;
777 memset(buf, 0xbb, sizeof(buf));
778 for (i=0;i<1024;++i) {
779 evbuffer_add_reference(output, buf, sizeof(buf),
780 NULL, NULL);
781 }
782 }
783 bufferevent_enable(bev1, EV_READ|EV_WRITE);
784
785 /* bev2 has nothing to say, and isn't listening. */
786 bufferevent_setcb(bev2, NULL, bev_timeout_write_cb,
787 bev_timeout_event_cb, &res2);
788 tv_w.tv_sec = tv_r.tv_sec = 0;
789 tv_w.tv_usec = 200*1000;
790 tv_r.tv_usec = 100*1000;
791 bufferevent_set_timeouts(bev2, &tv_r, &tv_w);
792 bufferevent_enable(bev2, EV_WRITE);
793
794 tv_r.tv_sec = 0;
795 tv_r.tv_usec = 350000;
796
797 event_base_loopexit(data->base, &tv_r);
798 event_base_dispatch(data->base);
799
800 /* XXXX Test that actually reading or writing a little resets the
801 * timeouts. */
802
803 /* Each buf1 timeout happens, and happens only once. */
804 tt_want(res1.n_read_timeouts);
805 tt_want(res1.n_write_timeouts);
806 tt_want(res1.n_read_timeouts == 1);
807 tt_want(res1.n_write_timeouts == 1);
808
809 test_timeval_diff_eq(&started_at, &res1.read_timeout_at, 150);
810 test_timeval_diff_eq(&started_at, &res1.write_timeout_at, 100);
811
812 end:
813 if (bev1)
814 bufferevent_free(bev1);
815 if (bev2)
816 bufferevent_free(bev2);
817 }
818
819 static void
820 trigger_failure_cb(evutil_socket_t fd, short what, void *ctx)
821 {
822 TT_FAIL(("The triggered callback did not fire or the machine is really slow (try increasing timeout)."));
823 }
824
825 static void
826 trigger_eventcb(struct bufferevent *bev, short what, void *ctx)
827 {
828 struct event_base *base = ctx;
829 if (what == ~0) {
830 TT_BLATHER(("Event successfully triggered."));
831 event_base_loopexit(base, NULL);
832 return;
833 }
834 reader_eventcb(bev, what, ctx);
835 }
836
837 static void
838 trigger_readcb_triggered(struct bufferevent *bev, void *ctx)
839 {
840 TT_BLATHER(("Read successfully triggered."));
841 n_reads_invoked++;
842 bufferevent_trigger_event(bev, ~0, bufferevent_trigger_test_flags);
843 }
844
845 static void
846 trigger_readcb(struct bufferevent *bev, void *ctx)
847 {
848 struct timeval timeout = { 30, 0 };
849 struct event_base *base = ctx;
850 size_t low, high, len;
851 int expected_reads;
852
853 TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev)));
854 expected_reads = ++n_reads_invoked;
855
856 bufferevent_setcb(bev, trigger_readcb_triggered, NULL, trigger_eventcb, ctx);
857
858 bufferevent_getwatermark(bev, EV_READ, &low, &high);
859 len = evbuffer_get_length(bufferevent_get_input(bev));
860
861 bufferevent_setwatermark(bev, EV_READ, len + 1, 0);
862 bufferevent_trigger(bev, EV_READ, bufferevent_trigger_test_flags);
863 /* no callback expected */
864 tt_int_op(n_reads_invoked, ==, expected_reads);
865
866 if ((bufferevent_trigger_test_flags & BEV_TRIG_DEFER_CALLBACKS) ||
867 (bufferevent_connect_test_flags & BEV_OPT_DEFER_CALLBACKS)) {
868 /* will be deferred */
869 } else {
870 expected_reads++;
871 }
872
873 event_base_once(base, -1, EV_TIMEOUT, trigger_failure_cb, NULL, &timeout);
874
875 bufferevent_trigger(bev, EV_READ,
876 bufferevent_trigger_test_flags | BEV_TRIG_IGNORE_WATERMARKS);
877 tt_int_op(n_reads_invoked, ==, expected_reads);
878
879 bufferevent_setwatermark(bev, EV_READ, low, high);
880 end:
881 ;
882 }
883
884 static void
885 test_bufferevent_trigger(void *arg)
886 {
887 struct basic_test_data *data = arg;
888 struct evconnlistener *lev=NULL;
889 struct bufferevent *bev=NULL;
890 struct sockaddr_in localhost;
891 struct sockaddr_storage ss;
892 struct sockaddr *sa;
893 ev_socklen_t slen;
894
895 int be_flags=BEV_OPT_CLOSE_ON_FREE;
896 int trig_flags=0;
897
898 if (strstr((char*)data->setup_data, "defer")) {
899 be_flags |= BEV_OPT_DEFER_CALLBACKS;
900 }
901 bufferevent_connect_test_flags = be_flags;
902
903 if (strstr((char*)data->setup_data, "postpone")) {
904 trig_flags |= BEV_TRIG_DEFER_CALLBACKS;
905 }
906 bufferevent_trigger_test_flags = trig_flags;
907
908 memset(&localhost, 0, sizeof(localhost));
909
910 localhost.sin_port = 0; /* pick-a-port */
911 localhost.sin_addr.s_addr = htonl(0x7f000001L);
912 localhost.sin_family = AF_INET;
913 sa = (struct sockaddr *)&localhost;
914 lev = evconnlistener_new_bind(data->base, listen_cb, data->base,
915 LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,
916 16, sa, sizeof(localhost));
917 tt_assert(lev);
918
919 sa = (struct sockaddr *)&ss;
920 slen = sizeof(ss);
921 if (regress_get_listener_addr(lev, sa, &slen) < 0) {
922 tt_abort_perror("getsockname");
923 }
924
925 tt_assert(!evconnlistener_enable(lev));
926 bev = bufferevent_socket_new(data->base, -1, be_flags);
927 tt_assert(bev);
928 bufferevent_setcb(bev, trigger_readcb, NULL, trigger_eventcb, data->base);
929
930 bufferevent_enable(bev, EV_READ);
931
932 tt_want(!bufferevent_socket_connect(bev, sa, sizeof(localhost)));
933
934 event_base_dispatch(data->base);
935
936 tt_int_op(n_reads_invoked, ==, 2);
937 end:
938 if (lev)
939 evconnlistener_free(lev);
940
941 if (bev)
942 bufferevent_free(bev);
943 }
944
945 struct testcase_t bufferevent_testcases[] = {
946
947 LEGACY(bufferevent, TT_ISOLATED),
948 LEGACY(bufferevent_pair, TT_ISOLATED),
949 LEGACY(bufferevent_watermarks, TT_ISOLATED),
950 LEGACY(bufferevent_pair_watermarks, TT_ISOLATED),
951 LEGACY(bufferevent_filters, TT_ISOLATED),
952 LEGACY(bufferevent_pair_filters, TT_ISOLATED),
953 { "bufferevent_connect", test_bufferevent_connect, TT_FORK|TT_NEED_BASE,
954 &basic_setup, (void*)"" },
955 { "bufferevent_connect_defer", test_bufferevent_connect,
956 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" },
957 { "bufferevent_connect_lock", test_bufferevent_connect,
958 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, (void*)"lock" },
959 { "bufferevent_connect_lock_defer", test_bufferevent_connect,
960 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
961 (void*)"defer lock" },
962 { "bufferevent_connect_unlocked_cbs", test_bufferevent_connect,
963 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
964 (void*)"lock defer unlocked" },
965 { "bufferevent_connect_fail", test_bufferevent_connect_fail,
966 TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
967 { "bufferevent_timeout", test_bufferevent_timeouts,
968 TT_FORK|TT_NEED_BASE|TT_NEED_SOCKETPAIR, &basic_setup, (void*)"" },
969 { "bufferevent_timeout_pair", test_bufferevent_timeouts,
970 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"pair" },
971 { "bufferevent_timeout_filter", test_bufferevent_timeouts,
972 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter" },
973 { "bufferevent_timeout_filter_pair", test_bufferevent_timeouts,
974 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter pair" },
975 { "bufferevent_trigger", test_bufferevent_trigger, TT_FORK|TT_NEED_BASE,
976 &basic_setup, (void*)"" },
977 { "bufferevent_trigger_defer", test_bufferevent_trigger,
978 TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" },
979 { "bufferevent_trigger_postpone", test_bufferevent_trigger,
980 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
981 (void*)"postpone" },
982 { "bufferevent_trigger_defer_postpone", test_bufferevent_trigger,
983 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
984 (void*)"defer postpone" },
985 #ifdef EVENT__HAVE_LIBZ
986 LEGACY(bufferevent_zlib, TT_ISOLATED),
987 #else
988 { "bufferevent_zlib", NULL, TT_SKIP, NULL, NULL },
989 #endif
990
991 END_OF_TESTCASES,
992 };
993
994 struct testcase_t bufferevent_iocp_testcases[] = {
995
996 LEGACY(bufferevent, TT_ISOLATED|TT_ENABLE_IOCP),
997 LEGACY(bufferevent_watermarks, TT_ISOLATED|TT_ENABLE_IOCP),
998 LEGACY(bufferevent_filters, TT_ISOLATED|TT_ENABLE_IOCP),
999 { "bufferevent_connect", test_bufferevent_connect,
1000 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"" },
1001 { "bufferevent_connect_defer", test_bufferevent_connect,
1002 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"defer" },
1003 { "bufferevent_connect_lock", test_bufferevent_connect,
1004 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup,
1005 (void*)"lock" },
1006 { "bufferevent_connect_lock_defer", test_bufferevent_connect,
1007 TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup,
1008 (void*)"defer lock" },
1009 { "bufferevent_connect_fail", test_bufferevent_connect_fail,
1010 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, NULL },
1011 { "bufferevent_connect_nonblocking", test_bufferevent_connect,
1012 TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup,
1013 (void*)"unset_connectex" },
1014
1015 END_OF_TESTCASES,
1016 };
1017