Home | History | Annotate | Line # | Download | only in libevent
bufferevent_async.c revision 1.1.1.1.4.2
      1 /*	$NetBSD: bufferevent_async.c,v 1.1.1.1.4.2 2014/05/22 15:50:12 yamt Exp $	*/
      2 
      3 /*
      4  * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
      5  *
      6  * All rights reserved.
      7  *
      8  * Redistribution and use in source and binary forms, with or without
      9  * modification, are permitted provided that the following conditions
     10  * are met:
     11  * 1. Redistributions of source code must retain the above copyright
     12  *    notice, this list of conditions and the following disclaimer.
     13  * 2. Redistributions in binary form must reproduce the above copyright
     14  *    notice, this list of conditions and the following disclaimer in the
     15  *    documentation and/or other materials provided with the distribution.
     16  * 3. The name of the author may not be used to endorse or promote products
     17  *    derived from this software without specific prior written permission.
     18  *
     19  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     20  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     21  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     22  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     23  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     24  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     28  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29  */
     30 
     31 #include "event2/event-config.h"
     32 #include "evconfig-private.h"
     33 
     34 #ifdef EVENT__HAVE_SYS_TIME_H
     35 #include <sys/time.h>
     36 #endif
     37 
     38 #include <errno.h>
     39 #include <stdio.h>
     40 #include <stdlib.h>
     41 #include <string.h>
     42 #ifdef EVENT__HAVE_STDARG_H
     43 #include <stdarg.h>
     44 #endif
     45 #ifdef EVENT__HAVE_UNISTD_H
     46 #include <unistd.h>
     47 #endif
     48 
     49 #ifdef _WIN32
     50 #include <winsock2.h>
     51 #include <ws2tcpip.h>
     52 #endif
     53 
     54 #include <sys/queue.h>
     55 
     56 #include "event2/util.h"
     57 #include "event2/bufferevent.h"
     58 #include "event2/buffer.h"
     59 #include "event2/bufferevent_struct.h"
     60 #include "event2/event.h"
     61 #include "event2/util.h"
     62 #include "event-internal.h"
     63 #include "log-internal.h"
     64 #include "mm-internal.h"
     65 #include "bufferevent-internal.h"
     66 #include "util-internal.h"
     67 #include "iocp-internal.h"
     68 
     69 #ifndef SO_UPDATE_CONNECT_CONTEXT
     70 /* Mingw is sometimes missing this */
     71 #define SO_UPDATE_CONNECT_CONTEXT 0x7010
     72 #endif
     73 
/* Forward declarations for the bufferevent_ops_async entries that are
 * defined further down in this file. */
static int be_async_enable(struct bufferevent *buf, short what);
static int be_async_disable(struct bufferevent *bev, short what);
static void be_async_destruct(struct bufferevent *bev);
static int be_async_flush(struct bufferevent *bev, short what,
    enum bufferevent_flush_mode mode);
static int be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    union bufferevent_ctrl_data *data);
     80 
     81 struct bufferevent_async {
     82 	struct bufferevent_private bev;
     83 	struct event_overlapped connect_overlapped;
     84 	struct event_overlapped read_overlapped;
     85 	struct event_overlapped write_overlapped;
     86 	size_t read_in_progress;
     87 	size_t write_in_progress;
     88 	unsigned ok : 1;
     89 	unsigned read_added : 1;
     90 	unsigned write_added : 1;
     91 };
     92 
     93 const struct bufferevent_ops bufferevent_ops_async = {
     94 	"socket_async",
     95 	evutil_offsetof(struct bufferevent_async, bev.bev),
     96 	be_async_enable,
     97 	be_async_disable,
     98 	be_async_destruct,
     99 	bufferevent_generic_adj_timeouts_,
    100 	be_async_flush,
    101 	be_async_ctrl,
    102 };
    103 
    104 static inline struct bufferevent_async *
    105 upcast(struct bufferevent *bev)
    106 {
    107 	struct bufferevent_async *bev_a;
    108 	if (bev->be_ops != &bufferevent_ops_async)
    109 		return NULL;
    110 	bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
    111 	return bev_a;
    112 }
    113 
    114 static inline struct bufferevent_async *
    115 upcast_connect(struct event_overlapped *eo)
    116 {
    117 	struct bufferevent_async *bev_a;
    118 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
    119 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
    120 	return bev_a;
    121 }
    122 
    123 static inline struct bufferevent_async *
    124 upcast_read(struct event_overlapped *eo)
    125 {
    126 	struct bufferevent_async *bev_a;
    127 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
    128 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
    129 	return bev_a;
    130 }
    131 
    132 static inline struct bufferevent_async *
    133 upcast_write(struct event_overlapped *eo)
    134 {
    135 	struct bufferevent_async *bev_a;
    136 	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
    137 	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
    138 	return bev_a;
    139 }
    140 
    141 static void
    142 bev_async_del_write(struct bufferevent_async *beva)
    143 {
    144 	struct bufferevent *bev = &beva->bev.bev;
    145 
    146 	if (beva->write_added) {
    147 		beva->write_added = 0;
    148 		event_base_del_virtual_(bev->ev_base);
    149 	}
    150 }
    151 
    152 static void
    153 bev_async_del_read(struct bufferevent_async *beva)
    154 {
    155 	struct bufferevent *bev = &beva->bev.bev;
    156 
    157 	if (beva->read_added) {
    158 		beva->read_added = 0;
    159 		event_base_del_virtual_(bev->ev_base);
    160 	}
    161 }
    162 
    163 static void
    164 bev_async_add_write(struct bufferevent_async *beva)
    165 {
    166 	struct bufferevent *bev = &beva->bev.bev;
    167 
    168 	if (!beva->write_added) {
    169 		beva->write_added = 1;
    170 		event_base_add_virtual_(bev->ev_base);
    171 	}
    172 }
    173 
    174 static void
    175 bev_async_add_read(struct bufferevent_async *beva)
    176 {
    177 	struct bufferevent *bev = &beva->bev.bev;
    178 
    179 	if (!beva->read_added) {
    180 		beva->read_added = 1;
    181 		event_base_add_virtual_(bev->ev_base);
    182 	}
    183 }
    184 
    185 static void
    186 bev_async_consider_writing(struct bufferevent_async *beva)
    187 {
    188 	size_t at_most;
    189 	int limit;
    190 	struct bufferevent *bev = &beva->bev.bev;
    191 
    192 	/* Don't write if there's a write in progress, or we do not
    193 	 * want to write, or when there's nothing left to write. */
    194 	if (beva->write_in_progress || beva->bev.connecting)
    195 		return;
    196 	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
    197 	    !evbuffer_get_length(bev->output)) {
    198 		bev_async_del_write(beva);
    199 		return;
    200 	}
    201 
    202 	at_most = evbuffer_get_length(bev->output);
    203 
    204 	/* This is safe so long as bufferevent_get_write_max never returns
    205 	 * more than INT_MAX.  That's true for now. XXXX */
    206 	limit = (int)bufferevent_get_write_max_(&beva->bev);
    207 	if (at_most >= (size_t)limit && limit >= 0)
    208 		at_most = limit;
    209 
    210 	if (beva->bev.write_suspended) {
    211 		bev_async_del_write(beva);
    212 		return;
    213 	}
    214 
    215 	/*  XXXX doesn't respect low-water mark very well. */
    216 	bufferevent_incref_(bev);
    217 	if (evbuffer_launch_write_(bev->output, at_most,
    218 	    &beva->write_overlapped)) {
    219 		bufferevent_decref_(bev);
    220 		beva->ok = 0;
    221 		bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR);
    222 	} else {
    223 		beva->write_in_progress = at_most;
    224 		bufferevent_decrement_write_buckets_(&beva->bev, at_most);
    225 		bev_async_add_write(beva);
    226 	}
    227 }
    228 
    229 static void
    230 bev_async_consider_reading(struct bufferevent_async *beva)
    231 {
    232 	size_t cur_size;
    233 	size_t read_high;
    234 	size_t at_most;
    235 	int limit;
    236 	struct bufferevent *bev = &beva->bev.bev;
    237 
    238 	/* Don't read if there is a read in progress, or we do not
    239 	 * want to read. */
    240 	if (beva->read_in_progress || beva->bev.connecting)
    241 		return;
    242 	if (!beva->ok || !(bev->enabled&EV_READ)) {
    243 		bev_async_del_read(beva);
    244 		return;
    245 	}
    246 
    247 	/* Don't read if we're full */
    248 	cur_size = evbuffer_get_length(bev->input);
    249 	read_high = bev->wm_read.high;
    250 	if (read_high) {
    251 		if (cur_size >= read_high) {
    252 			bev_async_del_read(beva);
    253 			return;
    254 		}
    255 		at_most = read_high - cur_size;
    256 	} else {
    257 		at_most = 16384; /* FIXME totally magic. */
    258 	}
    259 
    260 	/* XXXX This over-commits. */
    261 	/* XXXX see also not above on cast on bufferevent_get_write_max_() */
    262 	limit = (int)bufferevent_get_read_max_(&beva->bev);
    263 	if (at_most >= (size_t)limit && limit >= 0)
    264 		at_most = limit;
    265 
    266 	if (beva->bev.read_suspended) {
    267 		bev_async_del_read(beva);
    268 		return;
    269 	}
    270 
    271 	bufferevent_incref_(bev);
    272 	if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) {
    273 		beva->ok = 0;
    274 		bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR);
    275 		bufferevent_decref_(bev);
    276 	} else {
    277 		beva->read_in_progress = at_most;
    278 		bufferevent_decrement_read_buckets_(&beva->bev, at_most);
    279 		bev_async_add_read(beva);
    280 	}
    281 
    282 	return;
    283 }
    284 
    285 static void
    286 be_async_outbuf_callback(struct evbuffer *buf,
    287     const struct evbuffer_cb_info *cbinfo,
    288     void *arg)
    289 {
    290 	struct bufferevent *bev = arg;
    291 	struct bufferevent_async *bev_async = upcast(bev);
    292 
    293 	/* If we added data to the outbuf and were not writing before,
    294 	 * we may want to write now. */
    295 
    296 	bufferevent_incref_and_lock_(bev);
    297 
    298 	if (cbinfo->n_added)
    299 		bev_async_consider_writing(bev_async);
    300 
    301 	bufferevent_decref_and_unlock_(bev);
    302 }
    303 
    304 static void
    305 be_async_inbuf_callback(struct evbuffer *buf,
    306     const struct evbuffer_cb_info *cbinfo,
    307     void *arg)
    308 {
    309 	struct bufferevent *bev = arg;
    310 	struct bufferevent_async *bev_async = upcast(bev);
    311 
    312 	/* If we drained data from the inbuf and were not reading before,
    313 	 * we may want to read now */
    314 
    315 	bufferevent_incref_and_lock_(bev);
    316 
    317 	if (cbinfo->n_deleted)
    318 		bev_async_consider_reading(bev_async);
    319 
    320 	bufferevent_decref_and_unlock_(bev);
    321 }
    322 
    323 static int
    324 be_async_enable(struct bufferevent *buf, short what)
    325 {
    326 	struct bufferevent_async *bev_async = upcast(buf);
    327 
    328 	if (!bev_async->ok)
    329 		return -1;
    330 
    331 	if (bev_async->bev.connecting) {
    332 		/* Don't launch anything during connection attempts. */
    333 		return 0;
    334 	}
    335 
    336 	if (what & EV_READ)
    337 		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
    338 	if (what & EV_WRITE)
    339 		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
    340 
    341 	/* If we newly enable reading or writing, and we aren't reading or
    342 	   writing already, consider launching a new read or write. */
    343 
    344 	if (what & EV_READ)
    345 		bev_async_consider_reading(bev_async);
    346 	if (what & EV_WRITE)
    347 		bev_async_consider_writing(bev_async);
    348 	return 0;
    349 }
    350 
    351 static int
    352 be_async_disable(struct bufferevent *bev, short what)
    353 {
    354 	struct bufferevent_async *bev_async = upcast(bev);
    355 	/* XXXX If we disable reading or writing, we may want to consider
    356 	 * canceling any in-progress read or write operation, though it might
    357 	 * not work. */
    358 
    359 	if (what & EV_READ) {
    360 		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
    361 		bev_async_del_read(bev_async);
    362 	}
    363 	if (what & EV_WRITE) {
    364 		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
    365 		bev_async_del_write(bev_async);
    366 	}
    367 
    368 	return 0;
    369 }
    370 
    371 static void
    372 be_async_destruct(struct bufferevent *bev)
    373 {
    374 	struct bufferevent_async *bev_async = upcast(bev);
    375 	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
    376 	evutil_socket_t fd;
    377 
    378 	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
    379 			!upcast(bev)->read_in_progress);
    380 
    381 	bev_async_del_read(bev_async);
    382 	bev_async_del_write(bev_async);
    383 
    384 	fd = evbuffer_overlapped_get_fd_(bev->input);
    385 	if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) {
    386 		/* XXXX possible double-close */
    387 		evutil_closesocket(fd);
    388 	}
    389 	/* delete this in case non-blocking connect was used */
    390 	if (event_initialized(&bev->ev_write)) {
    391 		event_del(&bev->ev_write);
    392 		bufferevent_del_generic_timeout_cbs_(bev);
    393 	}
    394 }
    395 
    396 /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
    397  * we use WSAGetOverlappedResult to translate. */
    398 static void
    399 bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
    400 {
    401 	DWORD bytes, flags;
    402 	evutil_socket_t fd;
    403 
    404 	fd = evbuffer_overlapped_get_fd_(bev->input);
    405 	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
    406 }
    407 
    408 static int
    409 be_async_flush(struct bufferevent *bev, short what,
    410     enum bufferevent_flush_mode mode)
    411 {
    412 	return 0;
    413 }
    414 
    415 static void
    416 connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
    417     ev_ssize_t nbytes, int ok)
    418 {
    419 	struct bufferevent_async *bev_a = upcast_connect(eo);
    420 	struct bufferevent *bev = &bev_a->bev.bev;
    421 	evutil_socket_t sock;
    422 
    423 	BEV_LOCK(bev);
    424 
    425 	EVUTIL_ASSERT(bev_a->bev.connecting);
    426 	bev_a->bev.connecting = 0;
    427 	sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input);
    428 	/* XXXX Handle error? */
    429 	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
    430 
    431 	if (ok)
    432 		bufferevent_async_set_connected_(bev);
    433 	else
    434 		bev_async_set_wsa_error(bev, eo);
    435 
    436 	bufferevent_run_eventcb_(bev,
    437 			ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);
    438 
    439 	event_base_del_virtual_(bev->ev_base);
    440 
    441 	bufferevent_decref_and_unlock_(bev);
    442 }
    443 
    444 static void
    445 read_complete(struct event_overlapped *eo, ev_uintptr_t key,
    446     ev_ssize_t nbytes, int ok)
    447 {
    448 	struct bufferevent_async *bev_a = upcast_read(eo);
    449 	struct bufferevent *bev = &bev_a->bev.bev;
    450 	short what = BEV_EVENT_READING;
    451 	ev_ssize_t amount_unread;
    452 	BEV_LOCK(bev);
    453 	EVUTIL_ASSERT(bev_a->read_in_progress);
    454 
    455 	amount_unread = bev_a->read_in_progress - nbytes;
    456 	evbuffer_commit_read_(bev->input, nbytes);
    457 	bev_a->read_in_progress = 0;
    458 	if (amount_unread)
    459 		bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread);
    460 
    461 	if (!ok)
    462 		bev_async_set_wsa_error(bev, eo);
    463 
    464 	if (bev_a->ok) {
    465 		if (ok && nbytes) {
    466 			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
    467 			if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
    468 				bufferevent_run_readcb_(bev);
    469 			bev_async_consider_reading(bev_a);
    470 		} else if (!ok) {
    471 			what |= BEV_EVENT_ERROR;
    472 			bev_a->ok = 0;
    473 			bufferevent_run_eventcb_(bev, what);
    474 		} else if (!nbytes) {
    475 			what |= BEV_EVENT_EOF;
    476 			bev_a->ok = 0;
    477 			bufferevent_run_eventcb_(bev, what);
    478 		}
    479 	}
    480 
    481 	bufferevent_decref_and_unlock_(bev);
    482 }
    483 
    484 static void
    485 write_complete(struct event_overlapped *eo, ev_uintptr_t key,
    486     ev_ssize_t nbytes, int ok)
    487 {
    488 	struct bufferevent_async *bev_a = upcast_write(eo);
    489 	struct bufferevent *bev = &bev_a->bev.bev;
    490 	short what = BEV_EVENT_WRITING;
    491 	ev_ssize_t amount_unwritten;
    492 
    493 	BEV_LOCK(bev);
    494 	EVUTIL_ASSERT(bev_a->write_in_progress);
    495 
    496 	amount_unwritten = bev_a->write_in_progress - nbytes;
    497 	evbuffer_commit_write_(bev->output, nbytes);
    498 	bev_a->write_in_progress = 0;
    499 
    500 	if (amount_unwritten)
    501 		bufferevent_decrement_write_buckets_(&bev_a->bev,
    502 		                                     -amount_unwritten);
    503 
    504 
    505 	if (!ok)
    506 		bev_async_set_wsa_error(bev, eo);
    507 
    508 	if (bev_a->ok) {
    509 		if (ok && nbytes) {
    510 			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
    511 			if (evbuffer_get_length(bev->output) <=
    512 			    bev->wm_write.low)
    513 				bufferevent_run_writecb_(bev);
    514 			bev_async_consider_writing(bev_a);
    515 		} else if (!ok) {
    516 			what |= BEV_EVENT_ERROR;
    517 			bev_a->ok = 0;
    518 			bufferevent_run_eventcb_(bev, what);
    519 		} else if (!nbytes) {
    520 			what |= BEV_EVENT_EOF;
    521 			bev_a->ok = 0;
    522 			bufferevent_run_eventcb_(bev, what);
    523 		}
    524 	}
    525 
    526 	bufferevent_decref_and_unlock_(bev);
    527 }
    528 
    529 struct bufferevent *
    530 bufferevent_async_new_(struct event_base *base,
    531     evutil_socket_t fd, int options)
    532 {
    533 	struct bufferevent_async *bev_a;
    534 	struct bufferevent *bev;
    535 	struct event_iocp_port *iocp;
    536 
    537 	options |= BEV_OPT_THREADSAFE;
    538 
    539 	if (!(iocp = event_base_get_iocp_(base)))
    540 		return NULL;
    541 
    542 	if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) {
    543 		int err = GetLastError();
    544 		/* We may have alrady associated this fd with a port.
    545 		 * Let's hope it's this port, and that the error code
    546 		 * for doing this neer changes. */
    547 		if (err != ERROR_INVALID_PARAMETER)
    548 			return NULL;
    549 	}
    550 
    551 	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
    552 		return NULL;
    553 
    554 	bev = &bev_a->bev.bev;
    555 	if (!(bev->input = evbuffer_overlapped_new_(fd))) {
    556 		mm_free(bev_a);
    557 		return NULL;
    558 	}
    559 	if (!(bev->output = evbuffer_overlapped_new_(fd))) {
    560 		evbuffer_free(bev->input);
    561 		mm_free(bev_a);
    562 		return NULL;
    563 	}
    564 
    565 	if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async,
    566 		options)<0)
    567 		goto err;
    568 
    569 	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
    570 	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
    571 
    572 	event_overlapped_init_(&bev_a->connect_overlapped, connect_complete);
    573 	event_overlapped_init_(&bev_a->read_overlapped, read_complete);
    574 	event_overlapped_init_(&bev_a->write_overlapped, write_complete);
    575 
    576 	bev_a->ok = fd >= 0;
    577 	if (bev_a->ok)
    578 		bufferevent_init_generic_timeout_cbs_(bev);
    579 
    580 	return bev;
    581 err:
    582 	bufferevent_free(&bev_a->bev.bev);
    583 	return NULL;
    584 }
    585 
    586 void
    587 bufferevent_async_set_connected_(struct bufferevent *bev)
    588 {
    589 	struct bufferevent_async *bev_async = upcast(bev);
    590 	bev_async->ok = 1;
    591 	bufferevent_init_generic_timeout_cbs_(bev);
    592 	/* Now's a good time to consider reading/writing */
    593 	be_async_enable(bev, bev->enabled);
    594 }
    595 
    596 int
    597 bufferevent_async_can_connect_(struct bufferevent *bev)
    598 {
    599 	const struct win32_extension_fns *ext =
    600 	    event_get_win32_extension_fns_();
    601 
    602 	if (BEV_IS_ASYNC(bev) &&
    603 	    event_base_get_iocp_(bev->ev_base) &&
    604 	    ext && ext->ConnectEx)
    605 		return 1;
    606 
    607 	return 0;
    608 }
    609 
    610 int
    611 bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd,
    612 	const struct sockaddr *sa, int socklen)
    613 {
    614 	BOOL rc;
    615 	struct bufferevent_async *bev_async = upcast(bev);
    616 	struct sockaddr_storage ss;
    617 	const struct win32_extension_fns *ext =
    618 	    event_get_win32_extension_fns_();
    619 
    620 	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
    621 
    622 	/* ConnectEx() requires that the socket be bound to an address
    623 	 * with bind() before using, otherwise it will fail. We attempt
    624 	 * to issue a bind() here, taking into account that the error
    625 	 * code is set to WSAEINVAL when the socket is already bound. */
    626 	memset(&ss, 0, sizeof(ss));
    627 	if (sa->sa_family == AF_INET) {
    628 		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
    629 		sin->sin_family = AF_INET;
    630 		sin->sin_addr.s_addr = INADDR_ANY;
    631 	} else if (sa->sa_family == AF_INET6) {
    632 		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
    633 		sin6->sin6_family = AF_INET6;
    634 		sin6->sin6_addr = in6addr_any;
    635 	} else {
    636 		/* Well, the user will have to bind() */
    637 		return -1;
    638 	}
    639 	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
    640 	    WSAGetLastError() != WSAEINVAL)
    641 		return -1;
    642 
    643 	event_base_add_virtual_(bev->ev_base);
    644 	bufferevent_incref_(bev);
    645 	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
    646 			    &bev_async->connect_overlapped.overlapped);
    647 	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
    648 		return 0;
    649 
    650 	event_base_del_virtual_(bev->ev_base);
    651 	bufferevent_decref_(bev);
    652 
    653 	return -1;
    654 }
    655 
    656 static int
    657 be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    658     union bufferevent_ctrl_data *data)
    659 {
    660 	switch (op) {
    661 	case BEV_CTRL_GET_FD:
    662 		data->fd = evbuffer_overlapped_get_fd_(bev->input);
    663 		return 0;
    664 	case BEV_CTRL_SET_FD: {
    665 		struct event_iocp_port *iocp;
    666 
    667 		if (data->fd == evbuffer_overlapped_get_fd_(bev->input))
    668 			return 0;
    669 		if (!(iocp = event_base_get_iocp_(bev->ev_base)))
    670 			return -1;
    671 		if (event_iocp_port_associate_(iocp, data->fd, 1) < 0)
    672 			return -1;
    673 		evbuffer_overlapped_set_fd_(bev->input, data->fd);
    674 		evbuffer_overlapped_set_fd_(bev->output, data->fd);
    675 		return 0;
    676 	}
    677 	case BEV_CTRL_CANCEL_ALL: {
    678 		struct bufferevent_async *bev_a = upcast(bev);
    679 		evutil_socket_t fd = evbuffer_overlapped_get_fd_(bev->input);
    680 		if (fd != (evutil_socket_t)INVALID_SOCKET &&
    681 		    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
    682 			closesocket(fd);
    683 		}
    684 		bev_a->ok = 0;
    685 		return 0;
    686 	}
    687 	case BEV_CTRL_GET_UNDERLYING:
    688 	default:
    689 		return -1;
    690 	}
    691 }
    692 
    693 
    694