Home | History | Annotate | Line # | Download | only in test
regress_bufferevent.c revision 1.7
      1 /*	$NetBSD: regress_bufferevent.c,v 1.7 2024/08/18 20:47:23 christos Exp $	*/
      2 
      3 /*
      4  * Copyright (c) 2003-2007 Niels Provos <provos@citi.umich.edu>
      5  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
      6  *
      7  * Redistribution and use in source and binary forms, with or without
      8  * modification, are permitted provided that the following conditions
      9  * are met:
     10  * 1. Redistributions of source code must retain the above copyright
     11  *    notice, this list of conditions and the following disclaimer.
     12  * 2. Redistributions in binary form must reproduce the above copyright
     13  *    notice, this list of conditions and the following disclaimer in the
     14  *    documentation and/or other materials provided with the distribution.
     15  * 3. The name of the author may not be used to endorse or promote products
     16  *    derived from this software without specific prior written permission.
     17  *
     18  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     19  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     20  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     21  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     22  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     23  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     24  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     25  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     27  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     28  */
     29 #include "util-internal.h"
     30 
     31 /* The old tests here need assertions to work. */
     32 #undef NDEBUG
     33 
     34 /**
     35  * - clang supports __has_feature
     36  * - gcc supports __SANITIZE_ADDRESS__
     37  *
     38  * Let's set __SANITIZE_ADDRESS__ if __has_feature(address_sanitizer)
     39  */
     40 #ifndef __has_feature
     41 #define __has_feature(x) 0
     42 #endif
     43 #if !defined(__SANITIZE_ADDRESS__) && __has_feature(address_sanitizer)
     44 #define __SANITIZE_ADDRESS__
     45 #endif
     46 
     47 #ifdef _WIN32
     48 #include <winsock2.h>
     49 #include <windows.h>
     50 #endif
     51 
     52 #include "event2/event-config.h"
     53 
     54 #include <sys/types.h>
     55 #include <sys/stat.h>
     56 #ifdef EVENT__HAVE_SYS_TIME_H
     57 #include <sys/time.h>
     58 #endif
     59 #include <sys/queue.h>
     60 #ifndef _WIN32
     61 #include <sys/socket.h>
     62 #include <sys/wait.h>
     63 #include <signal.h>
     64 #include <unistd.h>
     65 #include <netdb.h>
     66 #include <netinet/in.h>
     67 #endif
     68 #include <fcntl.h>
     69 #include <signal.h>
     70 #include <stdlib.h>
     71 #include <stdio.h>
     72 #include <string.h>
     73 #include <errno.h>
     74 #include <assert.h>
     75 
     76 #ifdef EVENT__HAVE_ARPA_INET_H
     77 #include <arpa/inet.h>
     78 #endif
     79 
     80 #include "event2/event-config.h"
     81 #include "event2/event.h"
     82 #include "event2/event_struct.h"
     83 #include "event2/event_compat.h"
     84 #include "event2/tag.h"
     85 #include "event2/buffer.h"
     86 #include "event2/bufferevent.h"
     87 #include "event2/bufferevent_compat.h"
     88 #include "event2/bufferevent_struct.h"
     89 #include "event2/listener.h"
     90 #include "event2/util.h"
     91 
     92 #include "bufferevent-internal.h"
     93 #include "evthread-internal.h"
     94 #include "util-internal.h"
     95 #ifdef _WIN32
     96 #include "iocp-internal.h"
     97 #endif
     98 
     99 #include "regress.h"
    100 #include "regress_testutils.h"
    101 
    102 /*
    103  * simple bufferevent test
    104  */
    105 
    106 static void
    107 readcb(struct bufferevent *bev, void *arg)
    108 {
    109 	if (evbuffer_get_length(bev->input) == 8333) {
    110 		struct evbuffer *evbuf = evbuffer_new();
    111 		assert(evbuf != NULL);
    112 
    113 		/* gratuitous test of bufferevent_read_buffer */
    114 		bufferevent_read_buffer(bev, evbuf);
    115 
    116 		bufferevent_disable(bev, EV_READ);
    117 
    118 		if (evbuffer_get_length(evbuf) == 8333) {
    119 			test_ok++;
    120 		}
    121 
    122 		evbuffer_free(evbuf);
    123 	}
    124 }
    125 
    126 static void
    127 writecb(struct bufferevent *bev, void *arg)
    128 {
    129 	if (evbuffer_get_length(bev->output) == 0) {
    130 		test_ok++;
    131 	}
    132 }
    133 
    134 static void
    135 errorcb(struct bufferevent *bev, short what, void *arg)
    136 {
    137 	test_ok = -2;
    138 }
    139 
    140 static void
    141 test_bufferevent_impl(int use_pair, int flush)
    142 {
    143 	struct bufferevent *bev1 = NULL, *bev2 = NULL;
    144 	char buffer[8333];
    145 	int i;
    146 	int expected = 2;
    147 
    148 	if (use_pair) {
    149 		struct bufferevent *pair[2];
    150 		tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
    151 		bev1 = pair[0];
    152 		bev2 = pair[1];
    153 		bufferevent_setcb(bev1, readcb, writecb, errorcb, bev1);
    154 		bufferevent_setcb(bev2, readcb, writecb, errorcb, NULL);
    155 		tt_fd_op(bufferevent_getfd(bev1), ==, EVUTIL_INVALID_SOCKET);
    156 		tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL);
    157 		tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, bev2);
    158 		tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, bev1);
    159 	} else {
    160 		bev1 = bufferevent_new(pair[0], readcb, writecb, errorcb, NULL);
    161 		bev2 = bufferevent_new(pair[1], readcb, writecb, errorcb, NULL);
    162 		tt_fd_op(bufferevent_getfd(bev1), ==, pair[0]);
    163 		tt_ptr_op(bufferevent_get_underlying(bev1), ==, NULL);
    164 		tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, NULL);
    165 		tt_ptr_op(bufferevent_pair_get_partner(bev2), ==, NULL);
    166 	}
    167 
    168 	{
    169 		/* Test getcb. */
    170 		bufferevent_data_cb r, w;
    171 		bufferevent_event_cb e;
    172 		void *a;
    173 		bufferevent_getcb(bev1, &r, &w, &e, &a);
    174 		tt_ptr_op(r, ==, readcb);
    175 		tt_ptr_op(w, ==, writecb);
    176 		tt_ptr_op(e, ==, errorcb);
    177 		tt_ptr_op(a, ==, use_pair ? bev1 : NULL);
    178 	}
    179 
    180 	bufferevent_disable(bev1, EV_READ);
    181 	bufferevent_enable(bev2, EV_READ);
    182 
    183 	tt_int_op(bufferevent_get_enabled(bev1), ==, EV_WRITE);
    184 	tt_int_op(bufferevent_get_enabled(bev2), ==, EV_WRITE|EV_READ);
    185 
    186 	for (i = 0; i < (int)sizeof(buffer); i++)
    187 		buffer[i] = i;
    188 
    189 	bufferevent_write(bev1, buffer, sizeof(buffer));
    190 	if (flush >= 0) {
    191 		tt_int_op(bufferevent_flush(bev1, EV_WRITE, flush), >=, 0);
    192 	}
    193 
    194 	event_dispatch();
    195 
    196 	bufferevent_free(bev2);
    197 	tt_ptr_op(bufferevent_pair_get_partner(bev1), ==, NULL);
    198 	bufferevent_free(bev1);
    199 
    200 	/** Only pair call errorcb for BEV_FINISHED */
    201 	if (use_pair && flush == BEV_FINISHED) {
    202 		expected = -1;
    203 	}
    204 	if (test_ok != expected)
    205 		test_ok = 0;
    206 end:
    207 	;
    208 }
    209 
    210 static void test_bufferevent(void) { test_bufferevent_impl(0, -1); }
    211 static void test_bufferevent_pair(void) { test_bufferevent_impl(1, -1); }
    212 
    213 static void test_bufferevent_flush_normal(void) { test_bufferevent_impl(0, BEV_NORMAL); }
    214 static void test_bufferevent_flush_flush(void) { test_bufferevent_impl(0, BEV_FLUSH); }
    215 static void test_bufferevent_flush_finished(void) { test_bufferevent_impl(0, BEV_FINISHED); }
    216 
    217 static void test_bufferevent_pair_flush_normal(void) { test_bufferevent_impl(1, BEV_NORMAL); }
    218 static void test_bufferevent_pair_flush_flush(void) { test_bufferevent_impl(1, BEV_FLUSH); }
    219 static void test_bufferevent_pair_flush_finished(void) { test_bufferevent_impl(1, BEV_FINISHED); }
    220 
    221 #if defined(EVTHREAD_USE_PTHREADS_IMPLEMENTED) && !defined(__SANITIZE_ADDRESS__)
    222 /**
    223  * Trace lock/unlock/alloc/free for locks.
    224  * (More heavier then evthread_debug*)
    225  */
    226 typedef struct
    227 {
    228 	void *lock;
    229 	enum {
    230 		ALLOC, FREE,
    231 	} status;
    232 	size_t locked /** allow recursive locking */;
    233 } lock_wrapper;
    234 struct lock_unlock_base
    235 {
    236 	/* Original callbacks */
    237 	struct evthread_lock_callbacks cbs;
    238 	/* Map of locks */
    239 	lock_wrapper *locks;
    240 	size_t nr_locks;
    241 } lu_base = {
    242 	.locks = NULL,
    243 };
    244 
    245 static lock_wrapper *lu_find(void *lock_)
    246 {
    247 	size_t i;
    248 	for (i = 0; i < lu_base.nr_locks; ++i) {
    249 		lock_wrapper *lock = &lu_base.locks[i];
    250 		if (lock->lock == lock_)
    251 			return lock;
    252 	}
    253 	return NULL;
    254 }
    255 
    256 static void *trace_lock_alloc(unsigned locktype)
    257 {
    258 	void *lock;
    259 	++lu_base.nr_locks;
    260 	lu_base.locks = realloc(lu_base.locks,
    261 		sizeof(lock_wrapper) * lu_base.nr_locks);
    262 	lock = lu_base.cbs.alloc(locktype);
    263 	lu_base.locks[lu_base.nr_locks - 1] = (lock_wrapper){ lock, ALLOC, 0 };
    264 	return lock;
    265 }
    266 static void trace_lock_free(void *lock_, unsigned locktype)
    267 {
    268 	lock_wrapper *lock = lu_find(lock_);
    269 	if (!lock || lock->status == FREE || lock->locked) {
    270 		TT_FAIL(("lock: free error"));
    271 	} else {
    272 		lock->status = FREE;
    273 		lu_base.cbs.free(lock_, locktype);
    274 	}
    275 }
    276 static int trace_lock_lock(unsigned mode, void *lock_)
    277 {
    278 	lock_wrapper *lock = lu_find(lock_);
    279 	if (!lock || lock->status == FREE) {
    280 		TT_FAIL(("lock: lock error"));
    281 		return -1;
    282 	} else {
    283 		++lock->locked;
    284 		return lu_base.cbs.lock(mode, lock_);
    285 	}
    286 }
    287 static int trace_lock_unlock(unsigned mode, void *lock_)
    288 {
    289 	lock_wrapper *lock = lu_find(lock_);
    290 	if (!lock || lock->status == FREE || !lock->locked) {
    291 		TT_FAIL(("lock: unlock error"));
    292 		return -1;
    293 	} else {
    294 		--lock->locked;
    295 		return lu_base.cbs.unlock(mode, lock_);
    296 	}
    297 }
    298 static void lock_unlock_free_thread_cbs(void)
    299 {
    300 	event_base_free(NULL);
    301 
    302 	if (libevent_tests_running_in_debug_mode)
    303 		libevent_global_shutdown();
    304 
    305 	/** drop immutable flag */
    306 	evthread_set_lock_callbacks(NULL);
    307 	/** avoid calling of event_global_setup_locks_() for new cbs */
    308 	libevent_global_shutdown();
    309 	/** drop immutable flag for non-debug ops (since called after shutdown) */
    310 	evthread_set_lock_callbacks(NULL);
    311 }
    312 
    313 static int use_lock_unlock_profiler(void)
    314 {
    315 	struct evthread_lock_callbacks cbs = {
    316 		EVTHREAD_LOCK_API_VERSION,
    317 		EVTHREAD_LOCKTYPE_RECURSIVE,
    318 		trace_lock_alloc,
    319 		trace_lock_free,
    320 		trace_lock_lock,
    321 		trace_lock_unlock,
    322 	};
    323 	memcpy(&lu_base.cbs, evthread_get_lock_callbacks(),
    324 		sizeof(lu_base.cbs));
    325 	{
    326 		lock_unlock_free_thread_cbs();
    327 
    328 		evthread_set_lock_callbacks(&cbs);
    329 		/** re-create debug locks correctly */
    330 		evthread_enable_lock_debugging();
    331 
    332 		event_init();
    333 	}
    334 	return 0;
    335 }
    336 static void free_lock_unlock_profiler(struct basic_test_data *data)
    337 {
    338 	/** fix "held_by" for kqueue */
    339 	evthread_set_lock_callbacks(NULL);
    340 
    341 	lock_unlock_free_thread_cbs();
    342 	free(lu_base.locks);
    343 	data->base = NULL;
    344 }
    345 
    346 static void test_bufferevent_pair_release_lock(void *arg)
    347 {
    348 	struct basic_test_data *data = arg;
    349 	use_lock_unlock_profiler();
    350 	{
    351 		struct bufferevent *pair[2];
    352 		if (!bufferevent_pair_new(NULL, BEV_OPT_THREADSAFE, pair)) {
    353 			bufferevent_free(pair[0]);
    354 			bufferevent_free(pair[1]);
    355 		} else
    356 			tt_abort_perror("bufferevent_pair_new");
    357 	}
    358 	free_lock_unlock_profiler(data);
    359 end:
    360 	;
    361 }
    362 #endif
    363 
    364 /*
    365  * test watermarks and bufferevent
    366  */
    367 
    368 static void
    369 wm_readcb(struct bufferevent *bev, void *arg)
    370 {
    371 	struct evbuffer *evbuf = evbuffer_new();
    372 	int len = (int)evbuffer_get_length(bev->input);
    373 	static int nread;
    374 
    375 	assert(len >= 10 && len <= 20);
    376 
    377 	assert(evbuf != NULL);
    378 
    379 	/* gratuitous test of bufferevent_read_buffer */
    380 	bufferevent_read_buffer(bev, evbuf);
    381 
    382 	nread += len;
    383 	if (nread == 65000) {
    384 		bufferevent_disable(bev, EV_READ);
    385 		test_ok++;
    386 	}
    387 
    388 	evbuffer_free(evbuf);
    389 }
    390 
    391 static void
    392 wm_writecb(struct bufferevent *bev, void *arg)
    393 {
    394 	assert(evbuffer_get_length(bev->output) <= 100);
    395 	if (evbuffer_get_length(bev->output) == 0) {
    396 		evbuffer_drain(bev->output, evbuffer_get_length(bev->output));
    397 		test_ok++;
    398 	}
    399 }
    400 
    401 static void
    402 wm_errorcb(struct bufferevent *bev, short what, void *arg)
    403 {
    404 	test_ok = -2;
    405 }
    406 
    407 static void
    408 test_bufferevent_watermarks_impl(int use_pair)
    409 {
    410 	struct bufferevent *bev1 = NULL, *bev2 = NULL;
    411 	char buffer[65000];
    412 	size_t low, high;
    413 	int i;
    414 	test_ok = 0;
    415 
    416 	if (use_pair) {
    417 		struct bufferevent *pair[2];
    418 		tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
    419 		bev1 = pair[0];
    420 		bev2 = pair[1];
    421 		bufferevent_setcb(bev1, NULL, wm_writecb, errorcb, NULL);
    422 		bufferevent_setcb(bev2, wm_readcb, NULL, errorcb, NULL);
    423 	} else {
    424 		bev1 = bufferevent_new(pair[0], NULL, wm_writecb, wm_errorcb, NULL);
    425 		bev2 = bufferevent_new(pair[1], wm_readcb, NULL, wm_errorcb, NULL);
    426 	}
    427 	tt_assert(bev1);
    428 	tt_assert(bev2);
    429 	bufferevent_disable(bev1, EV_READ);
    430 	bufferevent_enable(bev2, EV_READ);
    431 
    432 	/* By default, low watermarks are set to 0 */
    433 	bufferevent_getwatermark(bev1, EV_READ, &low, NULL);
    434 	tt_int_op(low, ==, 0);
    435 	bufferevent_getwatermark(bev2, EV_WRITE, &low, NULL);
    436 	tt_int_op(low, ==, 0);
    437 
    438 	for (i = 0; i < (int)sizeof(buffer); i++)
    439 		buffer[i] = (char)i;
    440 
    441 	/* limit the reading on the receiving bufferevent */
    442 	bufferevent_setwatermark(bev2, EV_READ, 10, 20);
    443 
    444 	bufferevent_getwatermark(bev2, EV_READ, &low, &high);
    445 	tt_int_op(low, ==, 10);
    446 	tt_int_op(high, ==, 20);
    447 
    448 	/* Tell the sending bufferevent not to notify us till it's down to
    449 	   100 bytes. */
    450 	bufferevent_setwatermark(bev1, EV_WRITE, 100, 2000);
    451 
    452 	bufferevent_getwatermark(bev1, EV_WRITE, &low, &high);
    453 	tt_int_op(low, ==, 100);
    454 	tt_int_op(high, ==, 2000);
    455 
    456 	{
    457 	int r = bufferevent_getwatermark(bev1, EV_WRITE | EV_READ, &low, &high);
    458 	tt_int_op(r, !=, 0);
    459 	}
    460 
    461 	bufferevent_write(bev1, buffer, sizeof(buffer));
    462 
    463 	event_dispatch();
    464 
    465 	tt_int_op(test_ok, ==, 2);
    466 
    467 	/* The write callback drained all the data from outbuf, so we
    468 	 * should have removed the write event... */
    469 	tt_assert(!event_pending(&bev2->ev_write, EV_WRITE, NULL));
    470 
    471 end:
    472 	if (bev1)
    473 		bufferevent_free(bev1);
    474 	if (bev2)
    475 		bufferevent_free(bev2);
    476 }
    477 
/* Watermark test over bufferevents wrapping the file-scope `pair`. */
static void
test_bufferevent_watermarks(void)
{
	const int use_pair = 0;

	test_bufferevent_watermarks_impl(use_pair);
}

/* Watermark test over a bufferevent pair. */
static void
test_bufferevent_pair_watermarks(void)
{
	const int use_pair = 1;

	test_bufferevent_watermarks_impl(use_pair);
}
    489 
    490 /*
    491  * Test bufferevent filters
    492  */
    493 
    494 /* strip an 'x' from each byte */
    495 
    496 static enum bufferevent_filter_result
    497 bufferevent_input_filter(struct evbuffer *src, struct evbuffer *dst,
    498     ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx)
    499 {
    500 	const unsigned char *buffer;
    501 	unsigned i;
    502 
    503 	buffer = evbuffer_pullup(src, evbuffer_get_length(src));
    504 	for (i = 0; i < evbuffer_get_length(src); i += 2) {
    505 		if (buffer[i] == '-')
    506 			continue;
    507 
    508 		assert(buffer[i] == 'x');
    509 		evbuffer_add(dst, buffer + i + 1, 1);
    510 	}
    511 
    512 	evbuffer_drain(src, i);
    513 	return (BEV_OK);
    514 }
    515 
    516 /* add an 'x' before each byte */
    517 
    518 static enum bufferevent_filter_result
    519 bufferevent_output_filter(struct evbuffer *src, struct evbuffer *dst,
    520     ev_ssize_t lim, enum bufferevent_flush_mode state, void *ctx)
    521 {
    522 	const unsigned char *buffer;
    523 	unsigned i;
    524 	struct bufferevent **bevp = ctx;
    525 
    526 	++test_ok;
    527 
    528 	if (test_ok == 1) {
    529 		buffer = evbuffer_pullup(src, evbuffer_get_length(src));
    530 		for (i = 0; i < evbuffer_get_length(src); ++i) {
    531 			evbuffer_add(dst, "x", 1);
    532 			evbuffer_add(dst, buffer + i, 1);
    533 		}
    534 		evbuffer_drain(src, evbuffer_get_length(src));
    535 	} else {
    536 		return BEV_ERROR;
    537 	}
    538 
    539 	if (bevp && test_ok == 1) {
    540 		int prev = ++test_ok;
    541 		bufferevent_write(*bevp, "-", 1);
    542 		/* check that during this bufferevent_write()
    543 		 * bufferevent_output_filter() will not be called again */
    544 		assert(test_ok == prev);
    545 		--test_ok;
    546 	}
    547 
    548 	return (BEV_OK);
    549 }
    550 
    551 static void
    552 test_bufferevent_filters_impl(int use_pair, int disable)
    553 {
    554 	struct bufferevent *bev1 = NULL, *bev2 = NULL;
    555 	struct bufferevent *bev1_base = NULL, *bev2_base = NULL;
    556 	char buffer[8333];
    557 	int i;
    558 
    559 	test_ok = 0;
    560 
    561 	if (use_pair) {
    562 		struct bufferevent *pair[2];
    563 		tt_assert(0 == bufferevent_pair_new(NULL, 0, pair));
    564 		bev1 = pair[0];
    565 		bev2 = pair[1];
    566 	} else {
    567 		bev1 = bufferevent_socket_new(NULL, pair[0], 0);
    568 		bev2 = bufferevent_socket_new(NULL, pair[1], 0);
    569 	}
    570 	bev1_base = bev1;
    571 	bev2_base = bev2;
    572 
    573 	for (i = 0; i < (int)sizeof(buffer); i++)
    574 		buffer[i] = i;
    575 
    576 	bev1 = bufferevent_filter_new(bev1, NULL, bufferevent_output_filter,
    577 				      BEV_OPT_CLOSE_ON_FREE, NULL,
    578 					  disable ? &bev1 : NULL);
    579 
    580 	bev2 = bufferevent_filter_new(bev2, bufferevent_input_filter,
    581 				      NULL, BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
    582 	bufferevent_setcb(bev1, NULL, writecb, errorcb, NULL);
    583 	bufferevent_setcb(bev2, readcb, NULL, errorcb, NULL);
    584 
    585 	tt_ptr_op(bufferevent_get_underlying(bev1), ==, bev1_base);
    586 	tt_ptr_op(bufferevent_get_underlying(bev2), ==, bev2_base);
    587 	tt_fd_op(bufferevent_getfd(bev1), ==, bufferevent_getfd(bev1_base));
    588 	tt_fd_op(bufferevent_getfd(bev2), ==, bufferevent_getfd(bev2_base));
    589 
    590 	bufferevent_disable(bev1, EV_READ);
    591 	bufferevent_enable(bev2, EV_READ);
    592 	/* insert some filters */
    593 	bufferevent_write(bev1, buffer, sizeof(buffer));
    594 
    595 	event_dispatch();
    596 
    597 	if (test_ok != 3 + !!disable)
    598 		test_ok = 0;
    599 
    600 end:
    601 	if (bev1)
    602 		bufferevent_free(bev1);
    603 	if (bev2)
    604 		bufferevent_free(bev2);
    605 
    606 }
    607 
/*
 * Entry points for the filter tests: (use_pair, disable) combinations of
 * test_bufferevent_filters_impl().
 */
static void
test_bufferevent_filters(void)
{
	test_bufferevent_filters_impl(0, 0);
}

static void
test_bufferevent_pair_filters(void)
{
	test_bufferevent_filters_impl(1, 0);
}

static void
test_bufferevent_filters_disable(void)
{
	test_bufferevent_filters_impl(0, 1);
}

static void
test_bufferevent_pair_filters_disable(void)
{
	test_bufferevent_filters_impl(1, 1);
}
    616 
    617 
    618 static void
    619 sender_writecb(struct bufferevent *bev, void *ctx)
    620 {
    621 	if (evbuffer_get_length(bufferevent_get_output(bev)) == 0) {
    622 		bufferevent_disable(bev,EV_READ|EV_WRITE);
    623 		TT_BLATHER(("Flushed %d: freeing it.", (int)bufferevent_getfd(bev)));
    624 		bufferevent_free(bev);
    625 	}
    626 }
    627 
/*
 * Event callback for the sending side of the connect tests: any event is
 * reported as a test failure together with its flags.
 */
static void
sender_errorcb(struct bufferevent *bev, short what, void *ctx)
{
	(void)bev;
	(void)ctx;

	TT_FAIL(("Got sender error %d",(int)what));
}
    633 
/* Bufferevent options listen_cb applies to each accepted connection. */
static int bufferevent_connect_test_flags;
static int bufferevent_trigger_test_flags;

/* Counters maintained by the reader_* callbacks. */
static int n_strings_read;
static int n_reads_invoked;
static int n_events_invoked;

/* Payload sent by listen_cb and expected verbatim by reader_eventcb. */
#define TEST_STR "Now is the time for all good events to signal for " \
	"the good of their protocol"
    642 static void
    643 listen_cb(struct evconnlistener *listener, evutil_socket_t fd,
    644     struct sockaddr *sa, int socklen, void *arg)
    645 {
    646 	struct event_base *base = arg;
    647 	struct bufferevent *bev;
    648 	const char s[] = TEST_STR;
    649 	TT_BLATHER(("Got a request on socket %d", (int)fd ));
    650 	bev = bufferevent_socket_new(base, fd, bufferevent_connect_test_flags);
    651 	tt_assert(bev);
    652 	bufferevent_setcb(bev, NULL, sender_writecb, sender_errorcb, NULL);
    653 	bufferevent_write(bev, s, sizeof(s));
    654 end:
    655 	;
    656 }
    657 
    658 static evutil_socket_t
    659 fake_listener_create(struct sockaddr_in *localhost)
    660 {
    661 	struct sockaddr *sa = (struct sockaddr *)localhost;
    662 	evutil_socket_t fd = -1;
    663 	ev_socklen_t slen = sizeof(*localhost);
    664 
    665 	memset(localhost, 0, sizeof(*localhost));
    666 	localhost->sin_port = 0; /* have the kernel pick a port */
    667 	localhost->sin_addr.s_addr = htonl(0x7f000001L);
    668 	localhost->sin_family = AF_INET;
    669 
    670 	/* bind, but don't listen or accept. should trigger
    671 	   "Connection refused" reliably on most platforms. */
    672 	fd = socket(localhost->sin_family, SOCK_STREAM, 0);
    673 	tt_assert(fd >= 0);
    674 	tt_assert(bind(fd, sa, slen) == 0);
    675 	tt_assert(getsockname(fd, sa, &slen) == 0);
    676 
    677 	return fd;
    678 
    679 end:
    680 	return -1;
    681 }
    682 
    683 static void
    684 reader_eventcb(struct bufferevent *bev, short what, void *ctx)
    685 {
    686 	struct event_base *base = ctx;
    687 	if (what & BEV_EVENT_ERROR) {
    688 		perror("foobar");
    689 		TT_FAIL(("got connector error %d", (int)what));
    690 		return;
    691 	}
    692 	if (what & BEV_EVENT_CONNECTED) {
    693 		TT_BLATHER(("connected on %d", (int)bufferevent_getfd(bev)));
    694 		bufferevent_enable(bev, EV_READ);
    695 	}
    696 	if (what & BEV_EVENT_EOF) {
    697 		char buf[512];
    698 		size_t n;
    699 		n = bufferevent_read(bev, buf, sizeof(buf)-1);
    700 		tt_int_op(n, >=, 0);
    701 		buf[n] = '\0';
    702 		tt_str_op(buf, ==, TEST_STR);
    703 		if (++n_strings_read == 2)
    704 			event_base_loopexit(base, NULL);
    705 		TT_BLATHER(("EOF on %d: %d strings read.",
    706 			(int)bufferevent_getfd(bev), n_strings_read));
    707 	}
    708 end:
    709 	;
    710 }
    711 
    712 static void
    713 reader_eventcb_simple(struct bufferevent *bev, short what, void *ctx)
    714 {
    715 	TT_BLATHER(("Read eventcb simple invoked on %d.",
    716 		(int)bufferevent_getfd(bev)));
    717 	n_events_invoked++;
    718 }
    719 
    720 static void
    721 reader_readcb(struct bufferevent *bev, void *ctx)
    722 {
    723 	TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev)));
    724 	n_reads_invoked++;
    725 }
    726 
    727 static void
    728 test_bufferevent_connect(void *arg)
    729 {
    730 	struct basic_test_data *data = arg;
    731 	struct evconnlistener *lev=NULL;
    732 	struct bufferevent *bev1=NULL, *bev2=NULL;
    733 	struct sockaddr_in localhost;
    734 	struct sockaddr_storage ss;
    735 	struct sockaddr *sa;
    736 	ev_socklen_t slen;
    737 
    738 	int be_flags=BEV_OPT_CLOSE_ON_FREE;
    739 
    740 	if (strstr((char*)data->setup_data, "defer")) {
    741 		be_flags |= BEV_OPT_DEFER_CALLBACKS;
    742 	}
    743 	if (strstr((char*)data->setup_data, "unlocked")) {
    744 		be_flags |= BEV_OPT_UNLOCK_CALLBACKS;
    745 	}
    746 	if (strstr((char*)data->setup_data, "lock")) {
    747 		be_flags |= BEV_OPT_THREADSAFE;
    748 	}
    749 	bufferevent_connect_test_flags = be_flags;
    750 #ifdef _WIN32
    751 	if (!strcmp((char*)data->setup_data, "unset_connectex")) {
    752 		struct win32_extension_fns *ext =
    753 		    (struct win32_extension_fns *)
    754 		    event_get_win32_extension_fns_();
    755 		ext->ConnectEx = NULL;
    756 	}
    757 #endif
    758 
    759 	memset(&localhost, 0, sizeof(localhost));
    760 
    761 	localhost.sin_port = 0; /* pick-a-port */
    762 	localhost.sin_addr.s_addr = htonl(0x7f000001L);
    763 	localhost.sin_family = AF_INET;
    764 	sa = (struct sockaddr *)&localhost;
    765 	lev = evconnlistener_new_bind(data->base, listen_cb, data->base,
    766 	    LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,
    767 	    16, sa, sizeof(localhost));
    768 	tt_assert(lev);
    769 
    770 	sa = (struct sockaddr *)&ss;
    771 	slen = sizeof(ss);
    772 	if (regress_get_listener_addr(lev, sa, &slen) < 0) {
    773 		tt_abort_perror("getsockname");
    774 	}
    775 
    776 	tt_assert(!evconnlistener_enable(lev));
    777 	bev1 = bufferevent_socket_new(data->base, -1, be_flags);
    778 	bev2 = bufferevent_socket_new(data->base, -1, be_flags);
    779 	tt_assert(bev1);
    780 	tt_assert(bev2);
    781 	bufferevent_setcb(bev1, reader_readcb,NULL, reader_eventcb, data->base);
    782 	bufferevent_setcb(bev2, reader_readcb,NULL, reader_eventcb, data->base);
    783 
    784 	bufferevent_enable(bev1, EV_READ);
    785 	bufferevent_enable(bev2, EV_READ);
    786 
    787 	tt_want(!bufferevent_socket_connect(bev1, sa, sizeof(localhost)));
    788 	tt_want(!bufferevent_socket_connect(bev2, sa, sizeof(localhost)));
    789 
    790 	event_base_dispatch(data->base);
    791 
    792 	tt_int_op(n_strings_read, ==, 2);
    793 	tt_int_op(n_reads_invoked, >=, 2);
    794 end:
    795 	if (lev)
    796 		evconnlistener_free(lev);
    797 
    798 	if (bev1)
    799 		bufferevent_free(bev1);
    800 
    801 	if (bev2)
    802 		bufferevent_free(bev2);
    803 }
    804 
    805 static void
    806 close_socket_cb(evutil_socket_t fd, short what, void *arg)
    807 {
    808 	evutil_socket_t *fdp = arg;
    809 	if (*fdp >= 0) {
    810 		evutil_closesocket(*fdp);
    811 		*fdp = -1;
    812 	}
    813 }
    814 
    815 static void
    816 test_bufferevent_connect_fail_eventcb(void *arg)
    817 {
    818 	struct basic_test_data *data = arg;
    819 	int flags = BEV_OPT_CLOSE_ON_FREE | (long)data->setup_data;
    820 	struct event close_listener_event;
    821 	struct bufferevent *bev = NULL;
    822 	struct evconnlistener *lev = NULL;
    823 	struct sockaddr_in localhost;
    824 	struct timeval close_timeout = { 0, 300000 };
    825 	ev_socklen_t slen = sizeof(localhost);
    826 	evutil_socket_t fake_listener = -1;
    827 	int r;
    828 
    829 	fake_listener = fake_listener_create(&localhost);
    830 
    831 	tt_int_op(n_events_invoked, ==, 0);
    832 
    833 	bev = bufferevent_socket_new(data->base, -1, flags);
    834 	tt_assert(bev);
    835 	bufferevent_setcb(bev, reader_readcb, reader_readcb,
    836 		reader_eventcb_simple, data->base);
    837 	bufferevent_enable(bev, EV_READ|EV_WRITE);
    838 	tt_int_op(n_events_invoked, ==, 0);
    839 	tt_int_op(n_reads_invoked, ==, 0);
    840 
    841 	/** @see also test_bufferevent_connect_fail() */
    842 	r = bufferevent_socket_connect(bev, (struct sockaddr *)&localhost, slen);
    843 	/* XXXX we'd like to test the '0' case everywhere, but FreeBSD tells
    844 	 * detects the error immediately, which is not really wrong of it. */
    845 	tt_want(r == 0 || r == -1);
    846 
    847 	tt_int_op(n_events_invoked, ==, 0);
    848 	tt_int_op(n_reads_invoked, ==, 0);
    849 
    850 	/* Close the listener socket after a delay. This should trigger
    851 	   "connection refused" on some other platforms, including OSX. */
    852 	evtimer_assign(&close_listener_event, data->base, close_socket_cb,
    853 	    &fake_listener);
    854 	event_add(&close_listener_event, &close_timeout);
    855 
    856 	event_base_dispatch(data->base);
    857 	tt_int_op(n_events_invoked, ==, 1);
    858 	tt_int_op(n_reads_invoked, ==, 0);
    859 
    860 end:
    861 	if (lev)
    862 		evconnlistener_free(lev);
    863 	if (bev)
    864 		bufferevent_free(bev);
    865 	if (fake_listener >= 0)
    866 		evutil_closesocket(fake_listener);
    867 }
    868 
    869 static void
    870 want_fail_eventcb(struct bufferevent *bev, short what, void *ctx)
    871 {
    872 	struct event_base *base = ctx;
    873 	const char *err;
    874 	evutil_socket_t s;
    875 
    876 	if (what & BEV_EVENT_ERROR) {
    877 		s = bufferevent_getfd(bev);
    878 		err = evutil_socket_error_to_string(evutil_socket_geterror(s));
    879 		TT_BLATHER(("connection failure on "EV_SOCK_FMT": %s",
    880 			EV_SOCK_ARG(s), err));
    881 		test_ok = 1;
    882 	} else {
    883 		TT_FAIL(("didn't fail? what %hd", what));
    884 	}
    885 
    886 	event_base_loopexit(base, NULL);
    887 }
    888 
    889 static void
    890 test_bufferevent_connect_fail(void *arg)
    891 {
    892 	struct basic_test_data *data = (struct basic_test_data *)arg;
    893 	struct bufferevent *bev=NULL;
    894 	struct event close_listener_event;
    895 	int close_listener_event_added = 0;
    896 	struct timeval close_timeout = { 0, 300000 };
    897 	struct sockaddr_in localhost;
    898 	ev_socklen_t slen = sizeof(localhost);
    899 	evutil_socket_t fake_listener = -1;
    900 	int r;
    901 
    902 	test_ok = 0;
    903 
    904 	fake_listener = fake_listener_create(&localhost);
    905 	bev = bufferevent_socket_new(data->base, -1,
    906 		BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
    907 	tt_assert(bev);
    908 	bufferevent_setcb(bev, NULL, NULL, want_fail_eventcb, data->base);
    909 
    910 	r = bufferevent_socket_connect(bev, (struct sockaddr *)&localhost, slen);
    911 	/* XXXX we'd like to test the '0' case everywhere, but FreeBSD tells
    912 	 * detects the error immediately, which is not really wrong of it. */
    913 	tt_want(r == 0 || r == -1);
    914 
    915 	/* Close the listener socket after a delay. This should trigger
    916 	   "connection refused" on some other platforms, including OSX. */
    917 	evtimer_assign(&close_listener_event, data->base, close_socket_cb,
    918 	    &fake_listener);
    919 	event_add(&close_listener_event, &close_timeout);
    920 	close_listener_event_added = 1;
    921 
    922 	event_base_dispatch(data->base);
    923 
    924 	tt_int_op(test_ok, ==, 1);
    925 
    926 end:
    927 	if (fake_listener >= 0)
    928 		evutil_closesocket(fake_listener);
    929 
    930 	if (bev)
    931 		bufferevent_free(bev);
    932 
    933 	if (close_listener_event_added)
    934 		event_del(&close_listener_event);
    935 }
    936 
/* Everything the bev_timeout_* callbacks record about one bufferevent. */
struct timeout_cb_result {
	/* When bev_timeout_event_cb last saw a reading / writing timeout. */
	struct timeval read_timeout_at, write_timeout_at;
	/* When the write / read callback last ran. */
	struct timeval last_wrote_at, last_read_at;
	/* How many reading / writing timeout events were delivered. */
	int n_read_timeouts, n_write_timeouts;
	/* How many times the event callback ran, whatever the event. */
	int total_calls;
};
    946 
    947 static void
    948 bev_timeout_read_cb(struct bufferevent *bev, void *arg)
    949 {
    950 	struct timeout_cb_result *res = arg;
    951 	evutil_gettimeofday(&res->last_read_at, NULL);
    952 }
    953 static void
    954 bev_timeout_write_cb(struct bufferevent *bev, void *arg)
    955 {
    956 	struct timeout_cb_result *res = arg;
    957 	evutil_gettimeofday(&res->last_wrote_at, NULL);
    958 }
    959 static void
    960 bev_timeout_event_cb(struct bufferevent *bev, short what, void *arg)
    961 {
    962 	struct timeout_cb_result *res = arg;
    963 	++res->total_calls;
    964 
    965 	if ((what & (BEV_EVENT_READING|BEV_EVENT_TIMEOUT))
    966 	    == (BEV_EVENT_READING|BEV_EVENT_TIMEOUT)) {
    967 		evutil_gettimeofday(&res->read_timeout_at, NULL);
    968 		++res->n_read_timeouts;
    969 	}
    970 	if ((what & (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT))
    971 	    == (BEV_EVENT_WRITING|BEV_EVENT_TIMEOUT)) {
    972 		evutil_gettimeofday(&res->write_timeout_at, NULL);
    973 		++res->n_write_timeouts;
    974 	}
    975 }
    976 
    977 static void
    978 test_bufferevent_timeouts(void *arg)
    979 {
    980 	/* "arg" is a string containing "pair" and/or "filter". */
    981 	struct bufferevent *bev1 = NULL, *bev2 = NULL;
    982 	struct basic_test_data *data = arg;
    983 	int use_pair = 0, use_filter = 0;
    984 	struct timeval tv_w, tv_r, started_at;
    985 	struct timeout_cb_result res1, res2;
    986 
    987 	memset(&res1, 0, sizeof(res1));
    988 	memset(&res2, 0, sizeof(res2));
    989 
    990 	if (strstr((char*)data->setup_data, "pair"))
    991 		use_pair = 1;
    992 	if (strstr((char*)data->setup_data, "filter"))
    993 		use_filter = 1;
    994 
    995 	if (use_pair) {
    996 		struct bufferevent *p[2];
    997 		tt_int_op(0, ==, bufferevent_pair_new(data->base, 0, p));
    998 		bev1 = p[0];
    999 		bev2 = p[1];
   1000 	} else {
   1001 		bev1 = bufferevent_socket_new(data->base, data->pair[0], 0);
   1002 		bev2 = bufferevent_socket_new(data->base, data->pair[1], 0);
   1003 	}
   1004 	tt_assert(bev1);
   1005 	tt_assert(bev2);
   1006 
   1007 	if (use_filter) {
   1008 		struct bufferevent *bevf1, *bevf2;
   1009 		bevf1 = bufferevent_filter_new(bev1, NULL, NULL,
   1010 		    BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
   1011 		bevf2 = bufferevent_filter_new(bev2, NULL, NULL,
   1012 		    BEV_OPT_CLOSE_ON_FREE, NULL, NULL);
   1013 		tt_assert(bevf1);
   1014 		tt_assert(bevf2);
   1015 		bev1 = bevf1;
   1016 		bev2 = bevf2;
   1017 	}
   1018 
   1019 	/* Do this nice and early. */
   1020 	bufferevent_disable(bev2, EV_READ);
   1021 
   1022 	/* bev1 will try to write and read.  Both will time out. */
   1023 	evutil_gettimeofday(&started_at, NULL);
   1024 	tv_w.tv_sec = tv_r.tv_sec = 0;
   1025 	tv_w.tv_usec = 100*1000;
   1026 	tv_r.tv_usec = 150*1000;
   1027 	bufferevent_setcb(bev1, bev_timeout_read_cb, bev_timeout_write_cb,
   1028 	    bev_timeout_event_cb, &res1);
   1029 	bufferevent_set_timeouts(bev1, &tv_r, &tv_w);
   1030 	bufferevent_write(bev1, "ABCDEFG", 7);
   1031 	bufferevent_enable(bev1, EV_READ|EV_WRITE);
   1032 
   1033 	/* bev2 has nothing to say, and isn't listening. */
   1034 	bufferevent_setcb(bev2, bev_timeout_read_cb, bev_timeout_write_cb,
   1035 	    bev_timeout_event_cb, &res2);
   1036 	tv_w.tv_sec = tv_r.tv_sec = 0;
   1037 	tv_w.tv_usec = 200*1000;
   1038 	tv_r.tv_usec = 100*1000;
   1039 	bufferevent_set_timeouts(bev2, &tv_r, &tv_w);
   1040 	bufferevent_enable(bev2, EV_WRITE);
   1041 
   1042 	tv_r.tv_sec = 0;
   1043 	tv_r.tv_usec = 350000;
   1044 
   1045 	event_base_loopexit(data->base, &tv_r);
   1046 	event_base_dispatch(data->base);
   1047 
   1048 	/* XXXX Test that actually reading or writing a little resets the
   1049 	 * timeouts. */
   1050 
   1051 	tt_want(res1.total_calls == 2);
   1052 	tt_want(res1.n_read_timeouts == 1);
   1053 	tt_want(res1.n_write_timeouts == 1);
   1054 	tt_want(res2.total_calls == !(use_pair && !use_filter));
   1055 	tt_want(res2.n_write_timeouts == !(use_pair && !use_filter));
   1056 	tt_want(!res2.n_read_timeouts);
   1057 
   1058 	test_timeval_diff_eq(&started_at, &res1.read_timeout_at, 150);
   1059 	test_timeval_diff_eq(&started_at, &res1.write_timeout_at, 100);
   1060 
   1061 #define tt_assert_timeval_empty(tv) do {  \
   1062 	tt_int_op((tv).tv_sec, ==, 0);   \
   1063 	tt_int_op((tv).tv_usec, ==, 0);  \
   1064 } while(0)
   1065 	tt_assert_timeval_empty(res1.last_read_at);
   1066 	tt_assert_timeval_empty(res2.last_read_at);
   1067 	tt_assert_timeval_empty(res2.last_wrote_at);
   1068 	tt_assert_timeval_empty(res2.last_wrote_at);
   1069 #undef tt_assert_timeval_empty
   1070 
   1071 end:
   1072 	if (bev1)
   1073 		bufferevent_free(bev1);
   1074 	if (bev2)
   1075 		bufferevent_free(bev2);
   1076 }
   1077 
   1078 static void
   1079 trigger_failure_cb(evutil_socket_t fd, short what, void *ctx)
   1080 {
   1081 	TT_FAIL(("The triggered callback did not fire or the machine is really slow (try increasing timeout)."));
   1082 }
   1083 
   1084 static void
   1085 trigger_eventcb(struct bufferevent *bev, short what, void *ctx)
   1086 {
   1087 	struct event_base *base = ctx;
   1088 	if (what == ~0) {
   1089 		TT_BLATHER(("Event successfully triggered."));
   1090 		event_base_loopexit(base, NULL);
   1091 		return;
   1092 	}
   1093 	reader_eventcb(bev, what, ctx);
   1094 }
   1095 
   1096 static void
   1097 trigger_readcb_triggered(struct bufferevent *bev, void *ctx)
   1098 {
   1099 	TT_BLATHER(("Read successfully triggered."));
   1100 	n_reads_invoked++;
   1101 	bufferevent_trigger_event(bev, ~0, bufferevent_trigger_test_flags);
   1102 }
   1103 
   1104 static void
   1105 trigger_readcb(struct bufferevent *bev, void *ctx)
   1106 {
   1107 	struct timeval timeout = { 30, 0 };
   1108 	struct event_base *base = ctx;
   1109 	size_t low, high, len;
   1110 	int expected_reads;
   1111 
   1112 	TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev)));
   1113 	expected_reads = ++n_reads_invoked;
   1114 
   1115 	bufferevent_setcb(bev, trigger_readcb_triggered, NULL, trigger_eventcb, ctx);
   1116 
   1117 	bufferevent_getwatermark(bev, EV_READ, &low, &high);
   1118 	len = evbuffer_get_length(bufferevent_get_input(bev));
   1119 
   1120 	bufferevent_setwatermark(bev, EV_READ, len + 1, 0);
   1121 	bufferevent_trigger(bev, EV_READ, bufferevent_trigger_test_flags);
   1122 	/* no callback expected */
   1123 	tt_int_op(n_reads_invoked, ==, expected_reads);
   1124 
   1125 	if ((bufferevent_trigger_test_flags & BEV_TRIG_DEFER_CALLBACKS) ||
   1126 	    (bufferevent_connect_test_flags & BEV_OPT_DEFER_CALLBACKS)) {
   1127 		/* will be deferred */
   1128 	} else {
   1129 		expected_reads++;
   1130 	}
   1131 
   1132 	event_base_once(base, -1, EV_TIMEOUT, trigger_failure_cb, NULL, &timeout);
   1133 
   1134 	bufferevent_trigger(bev, EV_READ,
   1135 	    bufferevent_trigger_test_flags | BEV_TRIG_IGNORE_WATERMARKS);
   1136 	tt_int_op(n_reads_invoked, ==, expected_reads);
   1137 
   1138 	bufferevent_setwatermark(bev, EV_READ, low, high);
   1139 end:
   1140 	;
   1141 }
   1142 
   1143 static void
   1144 test_bufferevent_trigger(void *arg)
   1145 {
   1146 	struct basic_test_data *data = arg;
   1147 	struct evconnlistener *lev=NULL;
   1148 	struct bufferevent *bev=NULL;
   1149 	struct sockaddr_in localhost;
   1150 	struct sockaddr_storage ss;
   1151 	struct sockaddr *sa;
   1152 	ev_socklen_t slen;
   1153 
   1154 	int be_flags=BEV_OPT_CLOSE_ON_FREE;
   1155 	int trig_flags=0;
   1156 
   1157 	if (strstr((char*)data->setup_data, "defer")) {
   1158 		be_flags |= BEV_OPT_DEFER_CALLBACKS;
   1159 	}
   1160 	bufferevent_connect_test_flags = be_flags;
   1161 
   1162 	if (strstr((char*)data->setup_data, "postpone")) {
   1163 		trig_flags |= BEV_TRIG_DEFER_CALLBACKS;
   1164 	}
   1165 	bufferevent_trigger_test_flags = trig_flags;
   1166 
   1167 	memset(&localhost, 0, sizeof(localhost));
   1168 
   1169 	localhost.sin_port = 0; /* pick-a-port */
   1170 	localhost.sin_addr.s_addr = htonl(0x7f000001L);
   1171 	localhost.sin_family = AF_INET;
   1172 	sa = (struct sockaddr *)&localhost;
   1173 	lev = evconnlistener_new_bind(data->base, listen_cb, data->base,
   1174 	    LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,
   1175 	    16, sa, sizeof(localhost));
   1176 	tt_assert(lev);
   1177 
   1178 	sa = (struct sockaddr *)&ss;
   1179 	slen = sizeof(ss);
   1180 	if (regress_get_listener_addr(lev, sa, &slen) < 0) {
   1181 		tt_abort_perror("getsockname");
   1182 	}
   1183 
   1184 	tt_assert(!evconnlistener_enable(lev));
   1185 	bev = bufferevent_socket_new(data->base, -1, be_flags);
   1186 	tt_assert(bev);
   1187 	bufferevent_setcb(bev, trigger_readcb, NULL, trigger_eventcb, data->base);
   1188 
   1189 	bufferevent_enable(bev, EV_READ);
   1190 
   1191 	tt_want(!bufferevent_socket_connect(bev, sa, sizeof(localhost)));
   1192 
   1193 	event_base_dispatch(data->base);
   1194 
   1195 	tt_int_op(n_reads_invoked, ==, 2);
   1196 end:
   1197 	if (lev)
   1198 		evconnlistener_free(lev);
   1199 
   1200 	if (bev)
   1201 		bufferevent_free(bev);
   1202 }
   1203 
   1204 static void
   1205 test_bufferevent_socket_filter_inactive(void *arg)
   1206 {
   1207 	struct basic_test_data *data = arg;
   1208 	struct bufferevent *bev = NULL, *bevf = NULL;
   1209 
   1210 	bev = bufferevent_socket_new(data->base, -1, 0);
   1211 	tt_assert(bev);
   1212 	bevf = bufferevent_filter_new(bev, NULL, NULL, 0, NULL, NULL);
   1213 	tt_assert(bevf);
   1214 
   1215 end:
   1216 	if (bevf)
   1217 		bufferevent_free(bevf);
   1218 	if (bev)
   1219 		bufferevent_free(bev);
   1220 }
   1221 
/* Event callback: store the event flags into the int that "ctx" points to,
 * overwriting whatever was recorded before. */
static void
pair_flush_eventcb(struct bufferevent *bev, short what, void *ctx)
{
	(void)bev;

	*(int *)ctx = what;
}
   1228 
   1229 static void
   1230 test_bufferevent_pair_flush(void *arg)
   1231 {
   1232 	struct basic_test_data *data = arg;
   1233 	struct bufferevent *pair[2];
   1234 	struct bufferevent *bev1 = NULL;
   1235 	struct bufferevent *bev2 = NULL;
   1236 	int callback_what = 0;
   1237 
   1238 	tt_assert(0 == bufferevent_pair_new(data->base, 0, pair));
   1239 	bev1 = pair[0];
   1240 	bev2 = pair[1];
   1241 	tt_assert(0 == bufferevent_enable(bev1, EV_WRITE));
   1242 	tt_assert(0 == bufferevent_enable(bev2, EV_READ));
   1243 
   1244 	bufferevent_setcb(bev2, NULL, NULL, pair_flush_eventcb, &callback_what);
   1245 
   1246 	bufferevent_flush(bev1, EV_WRITE, BEV_FINISHED);
   1247 
   1248 	event_base_loop(data->base, EVLOOP_ONCE);
   1249 
   1250 	tt_assert(callback_what == (BEV_EVENT_READING | BEV_EVENT_EOF));
   1251 
   1252 end:
   1253 	if (bev1)
   1254 		bufferevent_free(bev1);
   1255 	if (bev2)
   1256 		bufferevent_free(bev2);
   1257 }
   1258 
/* State shared between the "data stuck" test, its input filter and its read
 * callback. */
struct bufferevent_filter_data_stuck {
	size_t header_size;	/* bytes the input filter prepends, once */
	size_t total_read;	/* bytes drained so far by the read callback */
};
   1263 
   1264 static void
   1265 bufferevent_filter_data_stuck_readcb(struct bufferevent *bev, void *arg)
   1266 {
   1267 	struct bufferevent_filter_data_stuck *filter_data = arg;
   1268 	struct evbuffer *input = bufferevent_get_input(bev);
   1269 	size_t read_size = evbuffer_get_length(input);
   1270 	evbuffer_drain(input, read_size);
   1271 	filter_data->total_read += read_size;
   1272 }
   1273 
   1274 /**
   1275  * This filter prepends header once before forwarding data.
   1276  */
   1277 static enum bufferevent_filter_result
   1278 bufferevent_filter_data_stuck_inputcb(
   1279     struct evbuffer *src, struct evbuffer *dst, ev_ssize_t dst_limit,
   1280     enum bufferevent_flush_mode mode, void *ctx)
   1281 {
   1282 	struct bufferevent_filter_data_stuck *filter_data = ctx;
   1283 	static int header_inserted = 0;
   1284 	size_t payload_size;
   1285 	size_t header_size = 0;
   1286 
   1287 	if (!header_inserted) {
   1288 		char *header = calloc(filter_data->header_size, 1);
   1289 		evbuffer_add(dst, header, filter_data->header_size);
   1290 		free(header);
   1291 		header_size = filter_data->header_size;
   1292 		header_inserted = 1;
   1293 	}
   1294 
   1295 	payload_size = evbuffer_get_length(src);
   1296 	if (payload_size > dst_limit - header_size) {
   1297 		payload_size = dst_limit - header_size;
   1298 	}
   1299 
   1300 	tt_int_op(payload_size, ==, evbuffer_remove_buffer(src, dst, payload_size));
   1301 
   1302 end:
   1303 	return BEV_OK;
   1304 }
   1305 
   1306 static void
   1307 test_bufferevent_filter_data_stuck(void *arg)
   1308 {
   1309 	const size_t read_high_wm = 4096;
   1310 	struct bufferevent_filter_data_stuck filter_data;
   1311 	struct basic_test_data *data = arg;
   1312 	struct bufferevent *pair[2];
   1313 	struct bufferevent *filter = NULL;
   1314 
   1315 	int options = BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS;
   1316 
   1317 	char payload[4096];
   1318 	int payload_size = sizeof(payload);
   1319 
   1320 	memset(&filter_data, 0, sizeof(filter_data));
   1321 	filter_data.header_size = 20;
   1322 
   1323 	tt_assert(bufferevent_pair_new(data->base, options, pair) == 0);
   1324 
   1325 	bufferevent_setwatermark(pair[0], EV_READ, 0, read_high_wm);
   1326 	bufferevent_setwatermark(pair[1], EV_READ, 0, read_high_wm);
   1327 
   1328 	tt_assert(
   1329 		filter =
   1330 		 bufferevent_filter_new(pair[1],
   1331 		 bufferevent_filter_data_stuck_inputcb,
   1332 		 NULL,
   1333 		 options,
   1334 		 NULL,
   1335 		 &filter_data));
   1336 
   1337 	bufferevent_setcb(filter,
   1338 		bufferevent_filter_data_stuck_readcb,
   1339 		NULL,
   1340 		NULL,
   1341 		&filter_data);
   1342 
   1343 	tt_assert(bufferevent_enable(filter, EV_READ|EV_WRITE) == 0);
   1344 
   1345 	bufferevent_setwatermark(filter, EV_READ, 0, read_high_wm);
   1346 
   1347 	tt_assert(bufferevent_write(pair[0], payload, sizeof(payload)) == 0);
   1348 
   1349 	event_base_dispatch(data->base);
   1350 
   1351 	tt_int_op(filter_data.total_read, ==, payload_size + filter_data.header_size);
   1352 end:
   1353 	if (pair[0])
   1354 		bufferevent_free(pair[0]);
   1355 	if (filter)
   1356 		bufferevent_free(filter);
   1357 }
   1358 
   1359 struct testcase_t bufferevent_testcases[] = {
   1360 
   1361 	LEGACY(bufferevent, TT_ISOLATED),
   1362 	LEGACY(bufferevent_pair, TT_ISOLATED),
   1363 	LEGACY(bufferevent_flush_normal, TT_ISOLATED),
   1364 	LEGACY(bufferevent_flush_flush, TT_ISOLATED),
   1365 	LEGACY(bufferevent_flush_finished, TT_ISOLATED),
   1366 	LEGACY(bufferevent_pair_flush_normal, TT_ISOLATED),
   1367 	LEGACY(bufferevent_pair_flush_flush, TT_ISOLATED),
   1368 	LEGACY(bufferevent_pair_flush_finished, TT_ISOLATED),
   1369 #if defined(EVTHREAD_USE_PTHREADS_IMPLEMENTED) && !defined(__SANITIZE_ADDRESS__)
   1370 	{ "bufferevent_pair_release_lock", test_bufferevent_pair_release_lock,
   1371 	  TT_FORK|TT_ISOLATED|TT_NEED_THREADS|TT_NEED_BASE|TT_LEGACY|TT_NO_LOGS,
   1372 	  &basic_setup, NULL },
   1373 #endif
   1374 	LEGACY(bufferevent_watermarks, TT_ISOLATED),
   1375 	LEGACY(bufferevent_pair_watermarks, TT_ISOLATED),
   1376 	LEGACY(bufferevent_filters, TT_ISOLATED),
   1377 	LEGACY(bufferevent_pair_filters, TT_ISOLATED),
   1378 	LEGACY(bufferevent_filters_disable, TT_ISOLATED),
   1379 	LEGACY(bufferevent_pair_filters_disable, TT_ISOLATED),
   1380 	{ "bufferevent_connect", test_bufferevent_connect, TT_FORK|TT_NEED_BASE,
   1381 	  &basic_setup, (void*)"" },
   1382 	{ "bufferevent_connect_defer", test_bufferevent_connect,
   1383 	  TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" },
   1384 	{ "bufferevent_connect_lock", test_bufferevent_connect,
   1385 	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, (void*)"lock" },
   1386 	{ "bufferevent_connect_lock_defer", test_bufferevent_connect,
   1387 	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
   1388 	  (void*)"defer lock" },
   1389 	{ "bufferevent_connect_unlocked_cbs", test_bufferevent_connect,
   1390 	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
   1391 	  (void*)"lock defer unlocked" },
   1392 	{ "bufferevent_connect_fail", test_bufferevent_connect_fail,
   1393 	  TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
   1394 	{ "bufferevent_timeout", test_bufferevent_timeouts,
   1395 	  TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"" },
   1396 	{ "bufferevent_timeout_pair", test_bufferevent_timeouts,
   1397 	  TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"pair" },
   1398 	{ "bufferevent_timeout_filter", test_bufferevent_timeouts,
   1399 	  TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter" },
   1400 	{ "bufferevent_timeout_filter_pair", test_bufferevent_timeouts,
   1401 	  TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter pair" },
   1402 	{ "bufferevent_trigger", test_bufferevent_trigger, TT_FORK|TT_NEED_BASE,
   1403 	  &basic_setup, (void*)"" },
   1404 	{ "bufferevent_trigger_defer", test_bufferevent_trigger,
   1405 	  TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" },
   1406 	{ "bufferevent_trigger_postpone", test_bufferevent_trigger,
   1407 	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
   1408 	  (void*)"postpone" },
   1409 	{ "bufferevent_trigger_defer_postpone", test_bufferevent_trigger,
   1410 	  TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
   1411 	  (void*)"defer postpone" },
   1412 #ifdef EVENT__HAVE_LIBZ
   1413 	LEGACY(bufferevent_zlib, TT_ISOLATED),
   1414 #else
   1415 	{ "bufferevent_zlib", NULL, TT_SKIP, NULL, NULL },
   1416 #endif
   1417 
   1418 	{ "bufferevent_connect_fail_eventcb_defer",
   1419 	  test_bufferevent_connect_fail_eventcb,
   1420 	  TT_FORK|TT_NEED_BASE, &basic_setup, (void*)BEV_OPT_DEFER_CALLBACKS },
   1421 	{ "bufferevent_connect_fail_eventcb",
   1422 	  test_bufferevent_connect_fail_eventcb,
   1423 	  TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
   1424 
   1425 	{ "bufferevent_socket_filter_inactive",
   1426 	  test_bufferevent_socket_filter_inactive,
   1427 	  TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
   1428 	{ "bufferevent_pair_flush",
   1429 	  test_bufferevent_pair_flush,
   1430 	  TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
   1431 	{ "bufferevent_filter_data_stuck",
   1432 	  test_bufferevent_filter_data_stuck,
   1433 	  TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
   1434 
   1435 	END_OF_TESTCASES,
   1436 };
   1437 
   1438 #define TT_IOCP (TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP)
   1439 #define TT_IOCP_LEGACY (TT_ISOLATED|TT_ENABLE_IOCP)
   1440 struct testcase_t bufferevent_iocp_testcases[] = {
   1441 	LEGACY(bufferevent, TT_IOCP_LEGACY),
   1442 	LEGACY(bufferevent_flush_normal, TT_ISOLATED),
   1443 	LEGACY(bufferevent_flush_flush, TT_ISOLATED),
   1444 	LEGACY(bufferevent_flush_finished, TT_ISOLATED),
   1445 	LEGACY(bufferevent_watermarks, TT_IOCP_LEGACY),
   1446 	LEGACY(bufferevent_filters, TT_IOCP_LEGACY),
   1447 	LEGACY(bufferevent_filters_disable, TT_IOCP_LEGACY),
   1448 
   1449 	{ "bufferevent_connect", test_bufferevent_connect,
   1450 	  TT_IOCP, &basic_setup, (void*)"" },
   1451 	{ "bufferevent_connect_defer", test_bufferevent_connect,
   1452 	  TT_IOCP, &basic_setup, (void*)"defer" },
   1453 	{ "bufferevent_connect_lock", test_bufferevent_connect,
   1454 	  TT_IOCP, &basic_setup, (void*)"lock" },
   1455 	{ "bufferevent_connect_lock_defer", test_bufferevent_connect,
   1456 	  TT_IOCP, &basic_setup, (void*)"defer lock" },
   1457 	{ "bufferevent_connect_fail", test_bufferevent_connect_fail,
   1458 	  TT_IOCP, &basic_setup, NULL },
   1459 	{ "bufferevent_connect_nonblocking", test_bufferevent_connect,
   1460 	  TT_IOCP, &basic_setup, (void*)"unset_connectex" },
   1461 
   1462 	{ "bufferevent_connect_fail_eventcb_defer",
   1463 	  test_bufferevent_connect_fail_eventcb,
   1464 	  TT_IOCP, &basic_setup, (void*)BEV_OPT_DEFER_CALLBACKS },
   1465 	{ "bufferevent_connect_fail_eventcb",
   1466 	  test_bufferevent_connect_fail_eventcb, TT_IOCP, &basic_setup, NULL },
   1467 
   1468 	END_OF_TESTCASES,
   1469 };
   1470