Home | History | Annotate | Line # | Download | only in dist
bufferevent_async.c revision 1.1.1.2
      1 /*	$NetBSD: bufferevent_async.c,v 1.1.1.2 2017/01/31 21:14:52 christos Exp $	*/
      2 /*
      3  * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
      4  *
      5  * All rights reserved.
      6  *
      7  * Redistribution and use in source and binary forms, with or without
      8  * modification, are permitted provided that the following conditions
      9  * are met:
     10  * 1. Redistributions of source code must retain the above copyright
     11  *    notice, this list of conditions and the following disclaimer.
     12  * 2. Redistributions in binary form must reproduce the above copyright
     13  *    notice, this list of conditions and the following disclaimer in the
     14  *    documentation and/or other materials provided with the distribution.
     15  * 3. The name of the author may not be used to endorse or promote products
     16  *    derived from this software without specific prior written permission.
     17  *
     18  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     19  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     20  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     21  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     22  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     23  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     24  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     25  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     27  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     28  */
     29 
     30 #include "event2/event-config.h"
     31 #include <sys/cdefs.h>
     32 __RCSID("$NetBSD: bufferevent_async.c,v 1.1.1.2 2017/01/31 21:14:52 christos Exp $");
     33 #include "evconfig-private.h"
     34 
     35 #ifdef EVENT__HAVE_SYS_TIME_H
     36 #include <sys/time.h>
     37 #endif
     38 
     39 #include <errno.h>
     40 #include <stdio.h>
     41 #include <stdlib.h>
     42 #include <string.h>
     43 #ifdef EVENT__HAVE_STDARG_H
     44 #include <stdarg.h>
     45 #endif
     46 #ifdef EVENT__HAVE_UNISTD_H
     47 #include <unistd.h>
     48 #endif
     49 
     50 #ifdef _WIN32
     51 #include <winsock2.h>
     52 #include <ws2tcpip.h>
     53 #endif
     54 
     55 #include <sys/queue.h>
     56 
     57 #include "event2/util.h"
     58 #include "event2/bufferevent.h"
     59 #include "event2/buffer.h"
     60 #include "event2/bufferevent_struct.h"
     61 #include "event2/event.h"
     62 #include "event2/util.h"
     63 #include "event-internal.h"
     64 #include "log-internal.h"
     65 #include "mm-internal.h"
     66 #include "bufferevent-internal.h"
     67 #include "util-internal.h"
     68 #include "iocp-internal.h"
     69 
     70 #ifndef SO_UPDATE_CONNECT_CONTEXT
     71 /* Mingw is sometimes missing this */
     72 #define SO_UPDATE_CONNECT_CONTEXT 0x7010
     73 #endif
     74 
     75 /* prototypes */
     76 static int be_async_enable(struct bufferevent *, short);
     77 static int be_async_disable(struct bufferevent *, short);
     78 static void be_async_destruct(struct bufferevent *);
     79 static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
     80 static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
     81 
     82 struct bufferevent_async {
     83 	struct bufferevent_private bev;
     84 	struct event_overlapped connect_overlapped;
     85 	struct event_overlapped read_overlapped;
     86 	struct event_overlapped write_overlapped;
     87 	size_t read_in_progress;
     88 	size_t write_in_progress;
     89 	unsigned ok : 1;
     90 	unsigned read_added : 1;
     91 	unsigned write_added : 1;
     92 };
     93 
     94 const struct bufferevent_ops bufferevent_ops_async = {
     95 	"socket_async",
     96 	evutil_offsetof(struct bufferevent_async, bev.bev),
     97 	be_async_enable,
     98 	be_async_disable,
     99 	NULL, /* Unlink */
    100 	be_async_destruct,
    101 	bufferevent_generic_adj_timeouts_,
    102 	be_async_flush,
    103 	be_async_ctrl,
    104 };
    105 
    106 static inline struct bufferevent_async *
    107 upcast(struct bufferevent *bev)
    108 {
    109 	struct bufferevent_async *bev_a;
    110 	if (bev->be_ops != &bufferevent_ops_async)
    111 		return NULL;
    112 	bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
    113 	return bev_a;
    114 }
    115 
    116 static inline struct bufferevent_async *
    117 upcast_connect(struct event_overlapped *eo)
    118 {
    119 	struct bufferevent_async *bev_a;
    120 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
    121 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
    122 	return bev_a;
    123 }
    124 
    125 static inline struct bufferevent_async *
    126 upcast_read(struct event_overlapped *eo)
    127 {
    128 	struct bufferevent_async *bev_a;
    129 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
    130 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
    131 	return bev_a;
    132 }
    133 
    134 static inline struct bufferevent_async *
    135 upcast_write(struct event_overlapped *eo)
    136 {
    137 	struct bufferevent_async *bev_a;
    138 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
    139 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
    140 	return bev_a;
    141 }
    142 
    143 static void
    144 bev_async_del_write(struct bufferevent_async *beva)
    145 {
    146 	struct bufferevent *bev = &beva->bev.bev;
    147 
    148 	if (beva->write_added) {
    149 		beva->write_added = 0;
    150 		event_base_del_virtual_(bev->ev_base);
    151 	}
    152 }
    153 
    154 static void
    155 bev_async_del_read(struct bufferevent_async *beva)
    156 {
    157 	struct bufferevent *bev = &beva->bev.bev;
    158 
    159 	if (beva->read_added) {
    160 		beva->read_added = 0;
    161 		event_base_del_virtual_(bev->ev_base);
    162 	}
    163 }
    164 
    165 static void
    166 bev_async_add_write(struct bufferevent_async *beva)
    167 {
    168 	struct bufferevent *bev = &beva->bev.bev;
    169 
    170 	if (!beva->write_added) {
    171 		beva->write_added = 1;
    172 		event_base_add_virtual_(bev->ev_base);
    173 	}
    174 }
    175 
    176 static void
    177 bev_async_add_read(struct bufferevent_async *beva)
    178 {
    179 	struct bufferevent *bev = &beva->bev.bev;
    180 
    181 	if (!beva->read_added) {
    182 		beva->read_added = 1;
    183 		event_base_add_virtual_(bev->ev_base);
    184 	}
    185 }
    186 
    187 static void
    188 bev_async_consider_writing(struct bufferevent_async *beva)
    189 {
    190 	size_t at_most;
    191 	int limit;
    192 	struct bufferevent *bev = &beva->bev.bev;
    193 
    194 	/* Don't write if there's a write in progress, or we do not
    195 	 * want to write, or when there's nothing left to write. */
    196 	if (beva->write_in_progress || beva->bev.connecting)
    197 		return;
    198 	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
    199 	    !evbuffer_get_length(bev->output)) {
    200 		bev_async_del_write(beva);
    201 		return;
    202 	}
    203 
    204 	at_most = evbuffer_get_length(bev->output);
    205 
    206 	/* This is safe so long as bufferevent_get_write_max never returns
    207 	 * more than INT_MAX.  That's true for now. XXXX */
    208 	limit = (int)bufferevent_get_write_max_(&beva->bev);
    209 	if (at_most >= (size_t)limit && limit >= 0)
    210 		at_most = limit;
    211 
    212 	if (beva->bev.write_suspended) {
    213 		bev_async_del_write(beva);
    214 		return;
    215 	}
    216 
    217 	/*  XXXX doesn't respect low-water mark very well. */
    218 	bufferevent_incref_(bev);
    219 	if (evbuffer_launch_write_(bev->output, at_most,
    220 	    &beva->write_overlapped)) {
    221 		bufferevent_decref_(bev);
    222 		beva->ok = 0;
    223 		bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
    224 	} else {
    225 		beva->write_in_progress = at_most;
    226 		bufferevent_decrement_write_buckets_(&beva->bev, at_most);
    227 		bev_async_add_write(beva);
    228 	}
    229 }
    230 
    231 static void
    232 bev_async_consider_reading(struct bufferevent_async *beva)
    233 {
    234 	size_t cur_size;
    235 	size_t read_high;
    236 	size_t at_most;
    237 	int limit;
    238 	struct bufferevent *bev = &beva->bev.bev;
    239 
    240 	/* Don't read if there is a read in progress, or we do not
    241 	 * want to read. */
    242 	if (beva->read_in_progress || beva->bev.connecting)
    243 		return;
    244 	if (!beva->ok || !(bev->enabled&EV_READ)) {
    245 		bev_async_del_read(beva);
    246 		return;
    247 	}
    248 
    249 	/* Don't read if we're full */
    250 	cur_size = evbuffer_get_length(bev->input);
    251 	read_high = bev->wm_read.high;
    252 	if (read_high) {
    253 		if (cur_size >= read_high) {
    254 			bev_async_del_read(beva);
    255 			return;
    256 		}
    257 		at_most = read_high - cur_size;
    258 	} else {
    259 		at_most = 16384; /* FIXME totally magic. */
    260 	}
    261 
    262 	/* XXXX This over-commits. */
    263 	/* XXXX see also not above on cast on bufferevent_get_write_max_() */
    264 	limit = (int)bufferevent_get_read_max_(&beva->bev);
    265 	if (at_most >= (size_t)limit && limit >= 0)
    266 		at_most = limit;
    267 
    268 	if (beva->bev.read_suspended) {
    269 		bev_async_del_read(beva);
    270 		return;
    271 	}
    272 
    273 	bufferevent_incref_(bev);
    274 	if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
    275 		beva->ok = 0;
    276 		bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0);
    277 		bufferevent_decref_(bev);
    278 	} else {
    279 		beva->read_in_progress = at_most;
    280 		bufferevent_decrement_read_buckets_(&beva->bev, at_most);
    281 		bev_async_add_read(beva);
    282 	}
    283 
    284 	return;
    285 }
    286 
    287 static void
    288 be_async_outbuf_callback(struct evbuffer *buf,
    289     const struct evbuffer_cb_info *cbinfo,
    290     void *arg)
    291 {
    292 	struct bufferevent *bev = arg;
    293 	struct bufferevent_async *bev_async = upcast(bev);
    294 
    295 	/* If we added data to the outbuf and were not writing before,
    296 	 * we may want to write now. */
    297 
    298 	bufferevent_incref_and_lock_(bev);
    299 
    300 	if (cbinfo->n_added)
    301 		bev_async_consider_writing(bev_async);
    302 
    303 	bufferevent_decref_and_unlock_(bev);
    304 }
    305 
    306 static void
    307 be_async_inbuf_callback(struct evbuffer *buf,
    308     const struct evbuffer_cb_info *cbinfo,
    309     void *arg)
    310 {
    311 	struct bufferevent *bev = arg;
    312 	struct bufferevent_async *bev_async = upcast(bev);
    313 
    314 	/* If we drained data from the inbuf and were not reading before,
    315 	 * we may want to read now */
    316 
    317 	bufferevent_incref_and_lock_(bev);
    318 
    319 	if (cbinfo->n_deleted)
    320 		bev_async_consider_reading(bev_async);
    321 
    322 	bufferevent_decref_and_unlock_(bev);
    323 }
    324 
    325 static int
    326 be_async_enable(struct bufferevent *buf, short what)
    327 {
    328 	struct bufferevent_async *bev_async = upcast(buf);
    329 
    330 	if (!bev_async->ok)
    331 		return -1;
    332 
    333 	if (bev_async->bev.connecting) {
    334 		/* Don't launch anything during connection attempts. */
    335 		return 0;
    336 	}
    337 
    338 	if (what & EV_READ)
    339 		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
    340 	if (what & EV_WRITE)
    341 		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
    342 
    343 	/* If we newly enable reading or writing, and we aren't reading or
    344 	   writing already, consider launching a new read or write. */
    345 
    346 	if (what & EV_READ)
    347 		bev_async_consider_reading(bev_async);
    348 	if (what & EV_WRITE)
    349 		bev_async_consider_writing(bev_async);
    350 	return 0;
    351 }
    352 
    353 static int
    354 be_async_disable(struct bufferevent *bev, short what)
    355 {
    356 	struct bufferevent_async *bev_async = upcast(bev);
    357 	/* XXXX If we disable reading or writing, we may want to consider
    358 	 * canceling any in-progress read or write operation, though it might
    359 	 * not work. */
    360 
    361 	if (what & EV_READ) {
    362 		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
    363 		bev_async_del_read(bev_async);
    364 	}
    365 	if (what & EV_WRITE) {
    366 		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
    367 		bev_async_del_write(bev_async);
    368 	}
    369 
    370 	return 0;
    371 }
    372 
    373 static void
    374 be_async_destruct(struct bufferevent *bev)
    375 {
    376 	struct bufferevent_async *bev_async = upcast(bev);
    377 	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
    378 	evutil_socket_t fd;
    379 
    380 	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
    381 			!upcast(bev)->read_in_progress);
    382 
    383 	bev_async_del_read(bev_async);
    384 	bev_async_del_write(bev_async);
    385 
    386 	fd = evbuffer_overlapped_get_fd_(bev->input);
    387 	if (fd != (evutil_socket_t)INVALID_SOCKET &&
    388 		(bev_p->options & BEV_OPT_CLOSE_ON_FREE)) {
    389 		evutil_closesocket(fd);
    390 		evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
    391 	}
    392 }
    393 
    394 /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
    395  * we use WSAGetOverlappedResult to translate. */
    396 static void
    397 bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
    398 {
    399 	DWORD bytes, flags;
    400 	evutil_socket_t fd;
    401 
    402 	fd = evbuffer_overlapped_get_fd_(bev->input);
    403 	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
    404 }
    405 
    406 static int
    407 be_async_flush(struct bufferevent *bev, short what,
    408     enum bufferevent_flush_mode mode)
    409 {
    410 	return 0;
    411 }
    412 
    413 static void
    414 connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
    415     ev_ssize_t nbytes, int ok)
    416 {
    417 	struct bufferevent_async *bev_a = upcast_connect(eo);
    418 	struct bufferevent *bev = &bev_a->bev.bev;
    419 	evutil_socket_t sock;
    420 
    421 	BEV_LOCK(bev);
    422 
    423 	EVUTIL_ASSERT(bev_a->bev.connecting);
    424 	bev_a->bev.connecting = 0;
    425 	sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
    426 	/* XXXX Handle error? */
    427 	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
    428 
    429 	if (ok)
    430 		bufferevent_async_set_connected_(bev);
    431 	else
    432 		bev_async_set_wsa_error(bev, eo);
    433 
    434 	bufferevent_run_eventcb_(bev,
    435 			ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0);
    436 
    437 	event_base_del_virtual_(bev->ev_base);
    438 
    439 	bufferevent_decref_and_unlock_(bev);
    440 }
    441 
    442 static void
    443 read_complete(struct event_overlapped *eo, ev_uintptr_t key,
    444     ev_ssize_t nbytes, int ok)
    445 {
    446 	struct bufferevent_async *bev_a = upcast_read(eo);
    447 	struct bufferevent *bev = &bev_a->bev.bev;
    448 	short what = BEV_EVENT_READING;
    449 	ev_ssize_t amount_unread;
    450 	BEV_LOCK(bev);
    451 	EVUTIL_ASSERT(bev_a->read_in_progress);
    452 
    453 	amount_unread = bev_a->read_in_progress - nbytes;
    454 	evbuffer_commit_read_(bev->input, nbytes);
    455 	bev_a->read_in_progress = 0;
    456 	if (amount_unread)
    457 		bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);
    458 
    459 	if (!ok)
    460 		bev_async_set_wsa_error(bev, eo);
    461 
    462 	if (bev_a->ok) {
    463 		if (ok && nbytes) {
    464 			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
    465 			bufferevent_trigger_nolock_(bev, EV_READ, 0);
    466 			bev_async_consider_reading(bev_a);
    467 		} else if (!ok) {
    468 			what |= BEV_EVENT_ERROR;
    469 			bev_a->ok = 0;
    470 			bufferevent_run_eventcb_(bev, what, 0);
    471 		} else if (!nbytes) {
    472 			what |= BEV_EVENT_EOF;
    473 			bev_a->ok = 0;
    474 			bufferevent_run_eventcb_(bev, what, 0);
    475 		}
    476 	}
    477 
    478 	bufferevent_decref_and_unlock_(bev);
    479 }
    480 
    481 static void
    482 write_complete(struct event_overlapped *eo, ev_uintptr_t key,
    483     ev_ssize_t nbytes, int ok)
    484 {
    485 	struct bufferevent_async *bev_a = upcast_write(eo);
    486 	struct bufferevent *bev = &bev_a->bev.bev;
    487 	short what = BEV_EVENT_WRITING;
    488 	ev_ssize_t amount_unwritten;
    489 
    490 	BEV_LOCK(bev);
    491 	EVUTIL_ASSERT(bev_a->write_in_progress);
    492 
    493 	amount_unwritten = bev_a->write_in_progress - nbytes;
    494 	evbuffer_commit_write_(bev->output, nbytes);
    495 	bev_a->write_in_progress = 0;
    496 
    497 	if (amount_unwritten)
    498 		bufferevent_decrement_write_buckets_(&bev_a->bev,
    499 		                                     -amount_unwritten);
    500 
    501 
    502 	if (!ok)
    503 		bev_async_set_wsa_error(bev, eo);
    504 
    505 	if (bev_a->ok) {
    506 		if (ok && nbytes) {
    507 			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
    508 			bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
    509 			bev_async_consider_writing(bev_a);
    510 		} else if (!ok) {
    511 			what |= BEV_EVENT_ERROR;
    512 			bev_a->ok = 0;
    513 			bufferevent_run_eventcb_(bev, what, 0);
    514 		} else if (!nbytes) {
    515 			what |= BEV_EVENT_EOF;
    516 			bev_a->ok = 0;
    517 			bufferevent_run_eventcb_(bev, what, 0);
    518 		}
    519 	}
    520 
    521 	bufferevent_decref_and_unlock_(bev);
    522 }
    523 
    524 struct bufferevent *
    525 bufferevent_async_new_(struct event_base *base,
    526     evutil_socket_t fd, int options)
    527 {
    528 	struct bufferevent_async *bev_a;
    529 	struct bufferevent *bev;
    530 	struct event_iocp_port *iocp;
    531 
    532 	options |= BEV_OPT_THREADSAFE;
    533 
    534 	if (!(iocp = event_base_get_iocp_(base)))
    535 		return NULL;
    536 
    537 	if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
    538 		int err = GetLastError();
    539 		/* We may have alrady associated this fd with a port.
    540 		 * Let's hope it's this port, and that the error code
    541 		 * for doing this neer changes. */
    542 		if (err != ERROR_INVALID_PARAMETER)
    543 			return NULL;
    544 	}
    545 
    546 	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
    547 		return NULL;
    548 
    549 	bev = &bev_a->bev.bev;
    550 	if (!(bev->input = evbuffer_overlapped_new_(fd))) {
    551 		mm_free(bev_a);
    552 		return NULL;
    553 	}
    554 	if (!(bev->output = evbuffer_overlapped_new_(fd))) {
    555 		evbuffer_free(bev->input);
    556 		mm_free(bev_a);
    557 		return NULL;
    558 	}
    559 
    560 	if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
    561 		options)<0)
    562 		goto err;
    563 
    564 	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
    565 	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
    566 
    567 	event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
    568 	event_overlapped_init_(&bev_a->read_overlapped, read_complete);
    569 	event_overlapped_init_(&bev_a->write_overlapped, write_complete);
    570 
    571 	bufferevent_init_generic_timeout_cbs_(bev);
    572 
    573 	bev_a->ok = fd >= 0;
    574 
    575 	return bev;
    576 err:
    577 	bufferevent_free(&bev_a->bev.bev);
    578 	return NULL;
    579 }
    580 
    581 void
    582 bufferevent_async_set_connected_(struct bufferevent *bev)
    583 {
    584 	struct bufferevent_async *bev_async = upcast(bev);
    585 	bev_async->ok = 1;
    586 	bufferevent_init_generic_timeout_cbs_(bev);
    587 	/* Now's a good time to consider reading/writing */
    588 	be_async_enable(bev, bev->enabled);
    589 }
    590 
    591 int
    592 bufferevent_async_can_connect_(struct bufferevent *bev)
    593 {
    594 	const struct win32_extension_fns *ext =
    595 	    event_get_win32_extension_fns_();
    596 
    597 	if (BEV_IS_ASYNC(bev) &&
    598 	    event_base_get_iocp_(bev->ev_base) &&
    599 	    ext && ext->ConnectEx)
    600 		return 1;
    601 
    602 	return 0;
    603 }
    604 
    605 int
    606 bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
    607 	const struct sockaddr *sa, int socklen)
    608 {
    609 	BOOL rc;
    610 	struct bufferevent_async *bev_async = upcast(bev);
    611 	struct sockaddr_storage ss;
    612 	const struct win32_extension_fns *ext =
    613 	    event_get_win32_extension_fns_();
    614 
    615 	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
    616 
    617 	/* ConnectEx() requires that the socket be bound to an address
    618 	 * with bind() before using, otherwise it will fail. We attempt
    619 	 * to issue a bind() here, taking into account that the error
    620 	 * code is set to WSAEINVAL when the socket is already bound. */
    621 	memset(&ss, 0, sizeof(ss));
    622 	if (sa->sa_family == AF_INET) {
    623 		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
    624 		sin->sin_family = AF_INET;
    625 		sin->sin_addr.s_addr = INADDR_ANY;
    626 	} else if (sa->sa_family == AF_INET6) {
    627 		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
    628 		sin6->sin6_family = AF_INET6;
    629 		sin6->sin6_addr = in6addr_any;
    630 	} else {
    631 		/* Well, the user will have to bind() */
    632 		return -1;
    633 	}
    634 	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
    635 	    WSAGetLastError() != WSAEINVAL)
    636 		return -1;
    637 
    638 	event_base_add_virtual_(bev->ev_base);
    639 	bufferevent_incref_(bev);
    640 	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
    641 			    &bev_async->connect_overlapped.overlapped);
    642 	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
    643 		return 0;
    644 
    645 	event_base_del_virtual_(bev->ev_base);
    646 	bufferevent_decref_(bev);
    647 
    648 	return -1;
    649 }
    650 
    651 static int
    652 be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    653     union bufferevent_ctrl_data *data)
    654 {
    655 	switch (op) {
    656 	case BEV_CTRL_GET_FD:
    657 		data->fd = evbuffer_overlapped_get_fd_(bev->input);
    658 		return 0;
    659 	case BEV_CTRL_SET_FD: {
    660 		struct event_iocp_port *iocp;
    661 
    662 		if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
    663 			return 0;
    664 		if (!(iocp = event_base_get_iocp_(bev->ev_base)))
    665 			return -1;
    666 		if (event_iocp_port_associate_(iocp, data->fd, 1) < 0)
    667 			return -1;
    668 		evbuffer_overlapped_set_fd_(bev->input, data->fd);
    669 		evbuffer_overlapped_set_fd_(bev->output, data->fd);
    670 		return 0;
    671 	}
    672 	case BEV_CTRL_CANCEL_ALL: {
    673 		struct bufferevent_async *bev_a = upcast(bev);
    674 		evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
    675 		if (fd != (evutil_socket_t)INVALID_SOCKET &&
    676 		    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
    677 			closesocket(fd);
    678 			evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET);
    679 		}
    680 		bev_a->ok = 0;
    681 		return 0;
    682 	}
    683 	case BEV_CTRL_GET_UNDERLYING:
    684 	default:
    685 		return -1;
    686 	}
    687 }
    688 
    689 
    690