Home | History | Annotate | Line # | Download | only in util
      1 /*
      2  * util/tube.c - pipe service
      3  *
      4  * Copyright (c) 2008, NLnet Labs. All rights reserved.
      5  *
      6  * This software is open source.
      7  *
      8  * Redistribution and use in source and binary forms, with or without
      9  * modification, are permitted provided that the following conditions
     10  * are met:
     11  *
     12  * Redistributions of source code must retain the above copyright notice,
     13  * this list of conditions and the following disclaimer.
     14  *
     15  * Redistributions in binary form must reproduce the above copyright notice,
     16  * this list of conditions and the following disclaimer in the documentation
     17  * and/or other materials provided with the distribution.
     18  *
     19  * Neither the name of the NLNET LABS nor the names of its contributors may
     20  * be used to endorse or promote products derived from this software without
     21  * specific prior written permission.
     22  *
     23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     26  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     27  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     28  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
     29  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
     30  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
     31  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
     32  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
     33  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     34  */
     35 
     36 /**
     37  * \file
     38  *
     39  * This file contains pipe service functions.
     40  */
     41 #include "config.h"
     42 #include "util/tube.h"
     43 #include "util/log.h"
     44 #include "util/net_help.h"
     45 #include "util/netevent.h"
     46 #include "util/fptr_wlist.h"
     47 #include "util/ub_event.h"
     48 #ifdef HAVE_POLL_H
     49 #include <poll.h>
     50 #endif
     51 
     52 #ifndef USE_WINSOCK
     53 /* on unix */
     54 
     55 #ifndef HAVE_SOCKETPAIR
     56 /** no socketpair() available, like on Minix 3.1.7, use pipe */
     57 #define socketpair(f, t, p, sv) pipe(sv)
     58 #endif /* HAVE_SOCKETPAIR */
     59 
     60 struct tube* tube_create(void)
     61 {
     62 	struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
     63 	int sv[2];
     64 	if(!tube) {
     65 		int err = errno;
     66 		log_err("tube_create: out of memory");
     67 		errno = err;
     68 		return NULL;
     69 	}
     70 	tube->sr = -1;
     71 	tube->sw = -1;
     72 	if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
     73 		int err = errno;
     74 		log_err("socketpair: %s", strerror(errno));
     75 		free(tube);
     76 		errno = err;
     77 		return NULL;
     78 	}
     79 	tube->sr = sv[0];
     80 	tube->sw = sv[1];
     81 	if(!fd_set_nonblock(tube->sr) || !fd_set_nonblock(tube->sw)) {
     82 		int err = errno;
     83 		log_err("tube: cannot set nonblocking");
     84 		tube_delete(tube);
     85 		errno = err;
     86 		return NULL;
     87 	}
     88 	return tube;
     89 }
     90 
/**
 * Tear down a tube: background handlers, both descriptors, then the struct.
 * @param tube: tube to delete, NULL is accepted and ignored.
 */
void tube_delete(struct tube* tube)
{
	if (tube != NULL) {
		tube_remove_bg_listen(tube);
		tube_remove_bg_write(tube);
		/* The descriptors are closed only after the commpoints are
		 * gone; epoll does not like an fd being closed before its
		 * event has been deleted. */
		tube_close_read(tube);
		tube_close_write(tube);
		free(tube);
	}
}
    102 
    103 void tube_close_read(struct tube* tube)
    104 {
    105 	if(tube->sr != -1) {
    106 		close(tube->sr);
    107 		tube->sr = -1;
    108 	}
    109 }
    110 
    111 void tube_close_write(struct tube* tube)
    112 {
    113 	if(tube->sw != -1) {
    114 		close(tube->sw);
    115 		tube->sw = -1;
    116 	}
    117 }
    118 
    119 void tube_remove_bg_listen(struct tube* tube)
    120 {
    121 	if(tube->listen_com) {
    122 		comm_point_delete(tube->listen_com);
    123 		tube->listen_com = NULL;
    124 	}
    125 	free(tube->cmd_msg);
    126 	tube->cmd_msg = NULL;
    127 }
    128 
    129 void tube_remove_bg_write(struct tube* tube)
    130 {
    131 	if(tube->res_com) {
    132 		comm_point_delete(tube->res_com);
    133 		tube->res_com = NULL;
    134 	}
    135 	if(tube->res_list) {
    136 		struct tube_res_list* np, *p = tube->res_list;
    137 		tube->res_list = NULL;
    138 		tube->res_last = NULL;
    139 		while(p) {
    140 			np = p->next;
    141 			free(p->buf);
    142 			free(p);
    143 			p = np;
    144 		}
    145 	}
    146 }
    147 
    148 int
    149 tube_handle_listen(struct comm_point* c, void* arg, int error,
    150         struct comm_reply* ATTR_UNUSED(reply_info))
    151 {
    152 	struct tube* tube = (struct tube*)arg;
    153 	ssize_t r;
    154 	if(error != NETEVENT_NOERROR) {
    155 		fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
    156 		(*tube->listen_cb)(tube, NULL, 0, error, tube->listen_arg);
    157 		return 0;
    158 	}
    159 
    160 	if(tube->cmd_read < sizeof(tube->cmd_len)) {
    161 		/* complete reading the length of control msg */
    162 		r = read(c->fd, ((uint8_t*)&tube->cmd_len) + tube->cmd_read,
    163 			sizeof(tube->cmd_len) - tube->cmd_read);
    164 		if(r==0) {
    165 			/* error has happened or */
    166 			/* parent closed pipe, must have exited somehow */
    167 			fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
    168 			(*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
    169 				tube->listen_arg);
    170 			return 0;
    171 		}
    172 		if(r==-1) {
    173 			if(errno != EAGAIN && errno != EINTR) {
    174 				log_err("rpipe error: %s", strerror(errno));
    175 			}
    176 			/* nothing to read now, try later */
    177 			return 0;
    178 		}
    179 		tube->cmd_read += r;
    180 		if(tube->cmd_read < sizeof(tube->cmd_len)) {
    181 			/* not complete, try later */
    182 			return 0;
    183 		}
    184 		tube->cmd_msg = (uint8_t*)calloc(1, tube->cmd_len);
    185 		if(!tube->cmd_msg) {
    186 			log_err("malloc failure");
    187 			tube->cmd_read = 0;
    188 			return 0;
    189 		}
    190 	}
    191 	/* cmd_len has been read, read remainder */
    192 	r = read(c->fd, tube->cmd_msg+tube->cmd_read-sizeof(tube->cmd_len),
    193 		tube->cmd_len - (tube->cmd_read - sizeof(tube->cmd_len)));
    194 	if(r==0) {
    195 		/* error has happened or */
    196 		/* parent closed pipe, must have exited somehow */
    197 		fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
    198 		(*tube->listen_cb)(tube, NULL, 0, NETEVENT_CLOSED,
    199 			tube->listen_arg);
    200 		return 0;
    201 	}
    202 	if(r==-1) {
    203 		/* nothing to read now, try later */
    204 		if(errno != EAGAIN && errno != EINTR) {
    205 			log_err("rpipe error: %s", strerror(errno));
    206 		}
    207 		return 0;
    208 	}
    209 	tube->cmd_read += r;
    210 	if(tube->cmd_read < sizeof(tube->cmd_len) + tube->cmd_len) {
    211 		/* not complete, try later */
    212 		return 0;
    213 	}
    214 	tube->cmd_read = 0;
    215 
    216 	fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
    217 	(*tube->listen_cb)(tube, tube->cmd_msg, tube->cmd_len,
    218 		NETEVENT_NOERROR, tube->listen_arg);
    219 		/* also frees the buf */
    220 	tube->cmd_msg = NULL;
    221 	return 0;
    222 }
    223 
    224 int
    225 tube_handle_write(struct comm_point* c, void* arg, int error,
    226         struct comm_reply* ATTR_UNUSED(reply_info))
    227 {
    228 	struct tube* tube = (struct tube*)arg;
    229 	struct tube_res_list* item = tube->res_list;
    230 	ssize_t r;
    231 	if(error != NETEVENT_NOERROR) {
    232 		log_err("tube_handle_write net error %d", error);
    233 		return 0;
    234 	}
    235 
    236 	if(!item) {
    237 		comm_point_stop_listening(c);
    238 		return 0;
    239 	}
    240 
    241 	if(tube->res_write < sizeof(item->len)) {
    242 		r = write(c->fd, ((uint8_t*)&item->len) + tube->res_write,
    243 			sizeof(item->len) - tube->res_write);
    244 		if(r == -1) {
    245 			if(errno != EAGAIN && errno != EINTR) {
    246 				log_err("wpipe error: %s", strerror(errno));
    247 			}
    248 			return 0; /* try again later */
    249 		}
    250 		if(r == 0) {
    251 			/* error on pipe, must have exited somehow */
    252 			/* cannot signal this to pipe user */
    253 			return 0;
    254 		}
    255 		tube->res_write += r;
    256 		if(tube->res_write < sizeof(item->len))
    257 			return 0;
    258 	}
    259 	r = write(c->fd, item->buf + tube->res_write - sizeof(item->len),
    260 		item->len - (tube->res_write - sizeof(item->len)));
    261 	if(r == -1) {
    262 		if(errno != EAGAIN && errno != EINTR) {
    263 			log_err("wpipe error: %s", strerror(errno));
    264 		}
    265 		return 0; /* try again later */
    266 	}
    267 	if(r == 0) {
    268 		/* error on pipe, must have exited somehow */
    269 		/* cannot signal this to pipe user */
    270 		return 0;
    271 	}
    272 	tube->res_write += r;
    273 	if(tube->res_write < sizeof(item->len) + item->len)
    274 		return 0;
    275 	/* done this result, remove it */
    276 	free(item->buf);
    277 	item->buf = NULL;
    278 	tube->res_list = tube->res_list->next;
    279 	free(item);
    280 	if(!tube->res_list) {
    281 		tube->res_last = NULL;
    282 		comm_point_stop_listening(c);
    283 	}
    284 	tube->res_write = 0;
    285 	return 0;
    286 }
    287 
    288 int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
    289         int nonblock)
    290 {
    291 	ssize_t r, d;
    292 	int fd = tube->sw;
    293 
    294 	/* test */
    295 	if(nonblock) {
    296 		r = write(fd, &len, sizeof(len));
    297 		if(r == -1) {
    298 			if(errno==EINTR || errno==EAGAIN)
    299 				return -1;
    300 			log_err("tube msg write failed: %s", strerror(errno));
    301 			return -1; /* can still continue, perhaps */
    302 		}
    303 	} else r = 0;
    304 	if(!fd_set_block(fd))
    305 		return 0;
    306 	/* write remainder */
    307 	d = r;
    308 	while(d != (ssize_t)sizeof(len)) {
    309 		if((r=write(fd, ((char*)&len)+d, sizeof(len)-d)) == -1) {
    310 			if(errno == EAGAIN)
    311 				continue; /* temporarily unavail: try again*/
    312 			log_err("tube msg write failed: %s", strerror(errno));
    313 			(void)fd_set_nonblock(fd);
    314 			return 0;
    315 		}
    316 		d += r;
    317 	}
    318 	d = 0;
    319 	while(d != (ssize_t)len) {
    320 		if((r=write(fd, buf+d, len-d)) == -1) {
    321 			if(errno == EAGAIN)
    322 				continue; /* temporarily unavail: try again*/
    323 			log_err("tube msg write failed: %s", strerror(errno));
    324 			(void)fd_set_nonblock(fd);
    325 			return 0;
    326 		}
    327 		d += r;
    328 	}
    329 	if(!fd_set_nonblock(fd))
    330 		return 0;
    331 	return 1;
    332 }
    333 
    334 int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
    335         int nonblock)
    336 {
    337 	ssize_t r, d;
    338 	int fd = tube->sr;
    339 
    340 	/* test */
    341 	*len = 0;
    342 	if(nonblock) {
    343 		r = read(fd, len, sizeof(*len));
    344 		if(r == -1) {
    345 			if(errno==EINTR || errno==EAGAIN)
    346 				return -1;
    347 			log_err("tube msg read failed: %s", strerror(errno));
    348 			return -1; /* we can still continue, perhaps */
    349 		}
    350 		if(r == 0) /* EOF */
    351 			return 0;
    352 	} else r = 0;
    353 	if(!fd_set_block(fd))
    354 		return 0;
    355 	/* read remainder */
    356 	d = r;
    357 	while(d != (ssize_t)sizeof(*len)) {
    358 		if((r=read(fd, ((char*)len)+d, sizeof(*len)-d)) == -1) {
    359 			log_err("tube msg read failed: %s", strerror(errno));
    360 			(void)fd_set_nonblock(fd);
    361 			return 0;
    362 		}
    363 		if(r == 0) /* EOF */ {
    364 			(void)fd_set_nonblock(fd);
    365 			return 0;
    366 		}
    367 		d += r;
    368 	}
    369 	if (*len >= 65536*2) {
    370 		log_err("tube msg length %u is too big", (unsigned)*len);
    371 		(void)fd_set_nonblock(fd);
    372 		return 0;
    373 	}
    374 	*buf = (uint8_t*)malloc(*len);
    375 	if(!*buf) {
    376 		log_err("tube read out of memory");
    377 		(void)fd_set_nonblock(fd);
    378 		return 0;
    379 	}
    380 	d = 0;
    381 	while(d < (ssize_t)*len) {
    382 		if((r=read(fd, (*buf)+d, (size_t)((ssize_t)*len)-d)) == -1) {
    383 			log_err("tube msg read failed: %s", strerror(errno));
    384 			(void)fd_set_nonblock(fd);
    385 			free(*buf);
    386 			return 0;
    387 		}
    388 		if(r == 0) { /* EOF */
    389 			(void)fd_set_nonblock(fd);
    390 			free(*buf);
    391 			return 0;
    392 		}
    393 		d += r;
    394 	}
    395 	if(!fd_set_nonblock(fd)) {
    396 		free(*buf);
    397 		return 0;
    398 	}
    399 	return 1;
    400 }
    401 
/**
 * Run poll() on one descriptor, waiting for it to become readable.
 * @param fd: descriptor to watch.
 * @param t: timeout, converted to whole milliseconds; NULL waits without
 *	a timeout.
 * @return 1 if poll() reported an event, 0 on timeout or poll() error.
 */
static int
pollit(int fd, struct timeval* t)
{
	struct pollfd pfd;
	int timeout_ms = -1; /* negative: poll() waits indefinitely */
	int nready;

	memset(&pfd, 0, sizeof(pfd));
	pfd.events = POLLIN | POLLERR | POLLHUP;
	pfd.fd = fd;
#ifndef S_SPLINT_S
	if (t != NULL)
		timeout_ms = t->tv_sec*1000 + t->tv_usec/1000;
#endif

	nready = poll(&pfd, 1, timeout_ms);

	/* both error (-1) and timeout (0) count as "nothing there" */
	if (nready == -1 || nready == 0)
		return 0;
	return 1;
}
    425 
    426 int tube_poll(struct tube* tube)
    427 {
    428 	struct timeval t;
    429 	memset(&t, 0, sizeof(t));
    430 	return pollit(tube->sr, &t);
    431 }
    432 
    433 int tube_wait(struct tube* tube)
    434 {
    435 	return pollit(tube->sr, NULL);
    436 }
    437 
    438 int tube_wait_timeout(struct tube* tube, int msec)
    439 {
    440 	int ret = 0;
    441 
    442 	while(1) {
    443 		struct pollfd fds;
    444 		memset(&fds, 0, sizeof(fds));
    445 
    446 		fds.fd = tube->sr;
    447 		fds.events = POLLIN | POLLERR | POLLHUP;
    448 		ret = poll(&fds, 1, msec);
    449 
    450 		if(ret == -1) {
    451 			if(errno == EAGAIN || errno == EINTR)
    452 				continue;
    453 			return -1;
    454 		}
    455 		break;
    456 	}
    457 
    458 	if(ret != 0)
    459 		return 1;
    460 	return 0;
    461 }
    462 
    463 int tube_read_fd(struct tube* tube)
    464 {
    465 	return tube->sr;
    466 }
    467 
    468 int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
    469         tube_callback_type* cb, void* arg)
    470 {
    471 	tube->listen_cb = cb;
    472 	tube->listen_arg = arg;
    473 	if(!(tube->listen_com = comm_point_create_raw(base, tube->sr,
    474 		0, tube_handle_listen, tube))) {
    475 		int err = errno;
    476 		log_err("tube_setup_bg_l: commpoint creation failed");
    477 		errno = err;
    478 		return 0;
    479 	}
    480 	return 1;
    481 }
    482 
    483 int tube_setup_bg_write(struct tube* tube, struct comm_base* base)
    484 {
    485 	if(!(tube->res_com = comm_point_create_raw(base, tube->sw,
    486 		1, tube_handle_write, tube))) {
    487 		int err = errno;
    488 		log_err("tube_setup_bg_w: commpoint creation failed");
    489 		errno = err;
    490 		return 0;
    491 	}
    492 	return 1;
    493 }
    494 
    495 int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
    496 {
    497 	struct tube_res_list* item;
    498 	if(!tube || !tube->res_com) return 0;
    499 	item = (struct tube_res_list*)malloc(sizeof(*item));
    500 	if(!item) {
    501 		free(msg);
    502 		log_err("out of memory for async answer");
    503 		return 0;
    504 	}
    505 	item->buf = msg;
    506 	item->len = len;
    507 	item->next = NULL;
    508 	/* add at back of list, since the first one may be partially written */
    509 	if(tube->res_last)
    510 		tube->res_last->next = item;
    511 	else    tube->res_list = item;
    512 	tube->res_last = item;
    513 	if(tube->res_list == tube->res_last) {
    514 		/* first added item, start the write process */
    515 		comm_point_start_listening(tube->res_com, -1, -1);
    516 	}
    517 	return 1;
    518 }
    519 
    520 void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
    521 	void* ATTR_UNUSED(arg))
    522 {
    523 	log_assert(0);
    524 }
    525 
    526 #else /* USE_WINSOCK */
    527 /* on windows */
    528 
    529 
    530 struct tube* tube_create(void)
    531 {
    532 	/* windows does not have forks like unix, so we only support
    533 	 * threads on windows. And thus the pipe need only connect
    534 	 * threads. We use a mutex and a list of datagrams. */
    535 	struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
    536 	if(!tube) {
    537 		int err = errno;
    538 		log_err("tube_create: out of memory");
    539 		errno = err;
    540 		return NULL;
    541 	}
    542 	tube->event = WSACreateEvent();
    543 	if(tube->event == WSA_INVALID_EVENT) {
    544 		free(tube);
    545 		log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError()));
    546 		return NULL;
    547 	}
    548 	if(!WSAResetEvent(tube->event)) {
    549 		log_err("WSAResetEvent: %s", wsa_strerror(WSAGetLastError()));
    550 	}
    551 	lock_basic_init(&tube->res_lock);
    552 	verbose(VERB_ALGO, "tube created");
    553 	return tube;
    554 }
    555 
    556 void tube_delete(struct tube* tube)
    557 {
    558 	if(!tube) return;
    559 	tube_remove_bg_listen(tube);
    560 	tube_remove_bg_write(tube);
    561 	tube_close_read(tube);
    562 	tube_close_write(tube);
    563 	if(!WSACloseEvent(tube->event))
    564 		log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError()));
    565 	lock_basic_destroy(&tube->res_lock);
    566 	verbose(VERB_ALGO, "tube deleted");
    567 	free(tube);
    568 }
    569 
    570 void tube_close_read(struct tube* ATTR_UNUSED(tube))
    571 {
    572 	verbose(VERB_ALGO, "tube close_read");
    573 }
    574 
    575 void tube_close_write(struct tube* ATTR_UNUSED(tube))
    576 {
    577 	verbose(VERB_ALGO, "tube close_write");
    578 	/* wake up waiting reader with an empty queue */
    579 	if(!WSASetEvent(tube->event)) {
    580 		log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
    581 	}
    582 }
    583 
    584 void tube_remove_bg_listen(struct tube* tube)
    585 {
    586 	verbose(VERB_ALGO, "tube remove_bg_listen");
    587 	if (tube->ev_listen != NULL) {
    588 		ub_winsock_unregister_wsaevent(tube->ev_listen);
    589 		tube->ev_listen = NULL;
    590 	}
    591 }
    592 
    593 void tube_remove_bg_write(struct tube* tube)
    594 {
    595 	verbose(VERB_ALGO, "tube remove_bg_write");
    596 	if(tube->res_list) {
    597 		struct tube_res_list* np, *p = tube->res_list;
    598 		tube->res_list = NULL;
    599 		tube->res_last = NULL;
    600 		while(p) {
    601 			np = p->next;
    602 			free(p->buf);
    603 			free(p);
    604 			p = np;
    605 		}
    606 	}
    607 }
    608 
    609 int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
    610         int ATTR_UNUSED(nonblock))
    611 {
    612 	uint8_t* a;
    613 	verbose(VERB_ALGO, "tube write_msg len %d", (int)len);
    614 	a = (uint8_t*)memdup(buf, len);
    615 	if(!a) {
    616 		log_err("out of memory in tube_write_msg");
    617 		return 0;
    618 	}
    619 	/* always nonblocking, this pipe cannot get full */
    620 	return tube_queue_item(tube, a, len);
    621 }
    622 
    623 int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
    624         int nonblock)
    625 {
    626 	struct tube_res_list* item = NULL;
    627 	verbose(VERB_ALGO, "tube read_msg %s", nonblock?"nonblock":"blocking");
    628 	*buf = NULL;
    629 	if(!tube_poll(tube)) {
    630 		verbose(VERB_ALGO, "tube read_msg nodata");
    631 		/* nothing ready right now, wait if we want to */
    632 		if(nonblock)
    633 			return -1; /* would block waiting for items */
    634 		if(!tube_wait(tube))
    635 			return 0;
    636 	}
    637 	lock_basic_lock(&tube->res_lock);
    638 	if(tube->res_list) {
    639 		item = tube->res_list;
    640 		tube->res_list = item->next;
    641 		if(tube->res_last == item) {
    642 			/* the list is now empty */
    643 			tube->res_last = NULL;
    644 			verbose(VERB_ALGO, "tube read_msg lastdata");
    645 			if(!WSAResetEvent(tube->event)) {
    646 				log_err("WSAResetEvent: %s",
    647 					wsa_strerror(WSAGetLastError()));
    648 			}
    649 		}
    650 	}
    651 	lock_basic_unlock(&tube->res_lock);
    652 	if(!item)
    653 		return 0; /* would block waiting for items */
    654 	*buf = item->buf;
    655 	*len = item->len;
    656 	free(item);
    657 	verbose(VERB_ALGO, "tube read_msg len %d", (int)*len);
    658 	return 1;
    659 }
    660 
    661 int tube_poll(struct tube* tube)
    662 {
    663 	struct tube_res_list* item = NULL;
    664 	lock_basic_lock(&tube->res_lock);
    665 	item = tube->res_list;
    666 	lock_basic_unlock(&tube->res_lock);
    667 	if(item)
    668 		return 1;
    669 	return 0;
    670 }
    671 
    672 int tube_wait(struct tube* tube)
    673 {
    674 	/* block on eventhandle */
    675 	DWORD res = WSAWaitForMultipleEvents(
    676 		1 /* one event in array */,
    677 		&tube->event /* the event to wait for, our pipe signal */,
    678 		0 /* wait for all events is false */,
    679 		WSA_INFINITE /* wait, no timeout */,
    680 		0 /* we are not alertable for IO completion routines */
    681 		);
    682 	if(res == WSA_WAIT_TIMEOUT) {
    683 		return 0;
    684 	}
    685 	if(res == WSA_WAIT_IO_COMPLETION) {
    686 		/* a bit unexpected, since we were not alertable */
    687 		return 0;
    688 	}
    689 	return 1;
    690 }
    691 
    692 int tube_wait_timeout(struct tube* tube, int msec)
    693 {
    694 	/* block on eventhandle */
    695 	DWORD res = WSAWaitForMultipleEvents(
    696 		1 /* one event in array */,
    697 		&tube->event /* the event to wait for, our pipe signal */,
    698 		0 /* wait for all events is false */,
    699 		msec /* wait for timeout */,
    700 		0 /* we are not alertable for IO completion routines */
    701 		);
    702 	if(res == WSA_WAIT_TIMEOUT) {
    703 		return 0;
    704 	}
    705 	if(res == WSA_WAIT_IO_COMPLETION) {
    706 		/* a bit unexpected, since we were not alertable */
    707 		return -1;
    708 	}
    709 	return 1;
    710 }
    711 
/**
 * Get the read descriptor of the tube. There is nothing sensible to
 * return on Windows, so the result is always -1.
 * @param tube: unused.
 * @return -1.
 */
int tube_read_fd(struct tube* ATTR_UNUSED(tube))
{
	const int no_fd = -1;
	return no_fd;
}
    717 
    718 int
    719 tube_handle_listen(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
    720 	int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
    721 {
    722 	log_assert(0);
    723 	return 0;
    724 }
    725 
    726 int
    727 tube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
    728 	int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
    729 {
    730 	log_assert(0);
    731 	return 0;
    732 }
    733 
    734 int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
    735         tube_callback_type* cb, void* arg)
    736 {
    737 	tube->listen_cb = cb;
    738 	tube->listen_arg = arg;
    739 	if(!comm_base_internal(base))
    740 		return 1; /* ignore when no comm base - testing */
    741 	tube->ev_listen = ub_winsock_register_wsaevent(
    742 	    comm_base_internal(base), tube->event, &tube_handle_signal, tube);
    743 	return tube->ev_listen ? 1 : 0;
    744 }
    745 
    746 int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube),
    747 	struct comm_base* ATTR_UNUSED(base))
    748 {
    749 	/* the queue item routine performs the signaling */
    750 	return 1;
    751 }
    752 
    753 int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
    754 {
    755 	struct tube_res_list* item;
    756 	if(!tube) return 0;
    757 	item = (struct tube_res_list*)malloc(sizeof(*item));
    758 	verbose(VERB_ALGO, "tube queue_item len %d", (int)len);
    759 	if(!item) {
    760 		free(msg);
    761 		log_err("out of memory for async answer");
    762 		return 0;
    763 	}
    764 	item->buf = msg;
    765 	item->len = len;
    766 	item->next = NULL;
    767 	lock_basic_lock(&tube->res_lock);
    768 	/* add at back of list, since the first one may be partially written */
    769 	if(tube->res_last)
    770 		tube->res_last->next = item;
    771 	else    tube->res_list = item;
    772 	tube->res_last = item;
    773 	/* signal the eventhandle */
    774 	if(!WSASetEvent(tube->event)) {
    775 		log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
    776 	}
    777 	lock_basic_unlock(&tube->res_lock);
    778 	return 1;
    779 }
    780 
    781 void tube_handle_signal(int ATTR_UNUSED(fd), short ATTR_UNUSED(events),
    782 	void* arg)
    783 {
    784 	struct tube* tube = (struct tube*)arg;
    785 	uint8_t* buf;
    786 	uint32_t len = 0;
    787 	verbose(VERB_ALGO, "tube handle_signal");
    788 	while(tube_poll(tube)) {
    789 		if(tube_read_msg(tube, &buf, &len, 1)) {
    790 			fptr_ok(fptr_whitelist_tube_listen(tube->listen_cb));
    791 			(*tube->listen_cb)(tube, buf, len, NETEVENT_NOERROR,
    792 				tube->listen_arg);
    793 		}
    794 	}
    795 }
    796 
    797 #endif /* USE_WINSOCK */
    798