Home | History | Annotate | Line # | Download | only in testcode
      1 /*
      2  * testcode/delayer.c - debug program that delays queries to a server.
      3  *
      4  * Copyright (c) 2008, NLnet Labs. All rights reserved.
      5  *
      6  * This software is open source.
      7  *
      8  * Redistribution and use in source and binary forms, with or without
      9  * modification, are permitted provided that the following conditions
     10  * are met:
     11  *
     12  * Redistributions of source code must retain the above copyright notice,
     13  * this list of conditions and the following disclaimer.
     14  *
     15  * Redistributions in binary form must reproduce the above copyright notice,
     16  * this list of conditions and the following disclaimer in the documentation
     17  * and/or other materials provided with the distribution.
     18  *
     19  * Neither the name of the NLNET LABS nor the names of its contributors may
     20  * be used to endorse or promote products derived from this software without
     21  * specific prior written permission.
     22  *
     23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     24  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     25  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     26  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     27  * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     28  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
     29  * TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
     30  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
     31  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
     32  * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
     33  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     34  */
     35 
     36 /**
     37  * \file
     38  *
     39  * This program delays queries made. It performs as a proxy to another
     40  * server and delays queries to it.
     41  */
     42 
     43 #include "config.h"
     44 #ifdef HAVE_GETOPT_H
     45 #include <getopt.h>
     46 #endif
     47 #ifdef HAVE_TIME_H
     48 #include <time.h>
     49 #endif
     50 #include <sys/time.h>
     51 #include "util/net_help.h"
     52 #include "util/config_file.h"
     53 #include "sldns/sbuffer.h"
     54 #include <signal.h>
     55 
/** number of reads per select for delayer; upper bound on the receive
 * attempts made on one socket (see do_proxy and service_recv) before
 * control returns to the select loop */
#define TRIES_PER_SELECT 100
     58 
/**
 * The ring buffer.
 * Stores delayed UDP queries as variable-length entries, each laid out as:
 * struct timeval (time to send) -- struct proxy* -- uint16_t length -- message.
 * low == high means the ring is empty.
 */
struct ringbuf {
	/** base of buffer */
	uint8_t* buf;
	/** size of buffer, in bytes */
	size_t size;
	/** low mark, items start here (byte offset into buf of oldest item) */
	size_t low;
	/** high mark, items end here (byte offset into buf past newest item) */
	size_t high;
};
     72 
/**
 * List of proxy fds that return replies from the server to our clients.
 * One entry per remote UDP client address; an idle entry can be handed to
 * a different client (see find_create_proxy).
 */
struct proxy {
	/** the fd to listen for replies from server */
	int s;
	/** last time this was used */
	struct timeval lastuse;
	/** remote address */
	struct sockaddr_storage addr;
	/** length of addr */
	socklen_t addr_len;
	/** number of queries waiting (in total); counted when a query is
	 * put into the ring buffer */
	size_t numwait;
	/** number of queries sent to server (in total) */
	size_t numsent;
	/** number of answers returned to client (in total) */
	size_t numreturn;
	/** how many times repurposed for another client address */
	size_t numreuse;
	/** next in proxylist */
	struct proxy* next;
};
     96 
/**
 * An item that has to be TCP relayed.
 * Items form a singly linked FIFO; one item holds the bytes of one recv().
 */
struct tcp_send_list {
	/** the data item (allocated, freed once fully written) */
	uint8_t* item;
	/** size of item, in bytes */
	size_t len;
	/** time when the item can be transmitted on */
	struct timeval wait;
	/** how much of the item has already been transmitted, in bytes */
	size_t done;
	/** next in list */
	struct tcp_send_list* next;
};
    112 
/**
 * List of TCP proxy fd pairs to TCP connect client to server.
 * Each entry pairs one accepted client connection with one outgoing
 * connection to the server.
 */
struct tcp_proxy {
	/** the fd to listen for client query */
	int client_s;
	/** the fd to listen for server answer; set to -1 once the server
	 * side has been closed */
	int server_s;

	/** remote client address */
	struct sockaddr_storage addr;
	/** length of address */
	socklen_t addr_len;
	/** timeout on this entry; the entry is deleted when this time
	 * has passed */
	struct timeval timeout;

	/** list of query items to send to server */
	struct tcp_send_list* querylist;
	/** last in query list */
	struct tcp_send_list* querylast;
	/** list of answer items to send to client */
	struct tcp_send_list* answerlist;
	/** last in answerlist */
	struct tcp_send_list* answerlast;

	/** next in list */
	struct tcp_proxy* next;
};
    141 
/** print usage information for delayer on stdout, then exit with status 1 */
static void usage(char* argv[])
{
	/* one entry per option line, printed verbatim */
	static const char* const option_lines[] = {
		"	-f addr : use addr, forward to that server, @port.\n",
		"	-b addr : bind to this address to listen.\n",
		"	-p port : bind to this port (use 0 for random).\n",
		"	-m mem	: use this much memory for waiting queries.\n",
		"	-d delay: UDP queries are delayed n milliseconds.\n",
		"		  TCP is delayed twice (on send, on recv).\n",
		"	-h 	: this help message\n"
	};
	size_t i;
	printf("usage: %s [options]\n", argv[0]);
	for(i = 0; i < sizeof(option_lines)/sizeof(option_lines[0]); i++)
		fputs(option_lines[i], stdout);
	exit(1);
}
    155 
/** timeval compare; returns 1 if t1 is strictly before t2, otherwise 0 */
static int
dl_tv_smaller(struct timeval* t1, const struct timeval* t2)
{
#ifndef S_SPLINT_S
	/* seconds decide unless they are equal, then microseconds decide */
	if(t1->tv_sec != t2->tv_sec)
		return t1->tv_sec < t2->tv_sec;
	return t1->tv_usec < t2->tv_usec;
#else
	return 0;
#endif
}
    169 
/** timeval add, t1 += t2; result is normalised so tv_usec < 1000000 */
static void
dl_tv_add(struct timeval* t1, const struct timeval* t2)
{
#ifndef S_SPLINT_S
	t1->tv_usec += t2->tv_usec;
	t1->tv_sec += t2->tv_sec;
	/* carry whole seconds out of the microsecond field */
	for(; t1->tv_usec >= 1000000; t1->tv_usec -= 1000000)
		t1->tv_sec++;
#endif
}
    183 
/** timeval subtract, t1 -= t2; borrows a second when t1 has fewer usec */
static void
dl_tv_subtract(struct timeval* t1, const struct timeval* t2)
{
#ifndef S_SPLINT_S
	t1->tv_sec -= t2->tv_sec;
	if(t1->tv_usec < t2->tv_usec) {
		/* borrow one second to keep tv_usec non-negative */
		t1->tv_sec -= 1;
		t1->tv_usec = 1000000-(t2->tv_usec-t1->tv_usec);
		return;
	}
	t1->tv_usec -= t2->tv_usec;
#endif
}
    198 
    199 
    200 /** create new ring buffer */
    201 static struct ringbuf*
    202 ring_create(size_t sz)
    203 {
    204 	struct ringbuf* r = (struct ringbuf*)calloc(1, sizeof(*r));
    205 	if(!r) fatal_exit("out of memory");
    206 	r->buf = (uint8_t*)malloc(sz);
    207 	if(!r->buf) fatal_exit("out of memory");
    208 	r->size = sz;
    209 	r->low = 0;
    210 	r->high = 0;
    211 	return r;
    212 }
    213 
    214 /** delete ring buffer */
    215 static void
    216 ring_delete(struct ringbuf* r)
    217 {
    218 	if(!r) return;
    219 	free(r->buf);
    220 	free(r);
    221 }
    222 
    223 /** add entry to ringbuffer */
    224 static void
    225 ring_add(struct ringbuf* r, sldns_buffer* pkt, struct timeval* now,
    226 	struct timeval* delay, struct proxy* p)
    227 {
    228 	/* time -- proxy* -- 16bitlen -- message */
    229 	uint16_t len = (uint16_t)sldns_buffer_limit(pkt);
    230 	struct timeval when;
    231 	size_t needed;
    232 	uint8_t* where = NULL;
    233 	log_assert(sldns_buffer_limit(pkt) <= 65535);
    234 	needed = sizeof(when) + sizeof(p) + sizeof(len) + len;
    235 	/* put item into ringbuffer */
    236 	if(r->low < r->high) {
    237 		/* used part is in the middle */
    238 		if(r->size - r->high >= needed) {
    239 			where = r->buf + r->high;
    240 			r->high += needed;
    241 		} else if(r->low > needed) {
    242 			/* wrap around ringbuffer */
    243 			/* make sure r->low == r->high means empty */
    244 			/* so r->low == r->high cannot be used to signify
    245 			 * a completely full ringbuf */
    246 			if(r->size - r->high > sizeof(when)+sizeof(p)) {
    247 				/* zero entry at end of buffer */
    248 				memset(r->buf+r->high, 0,
    249 					sizeof(when)+sizeof(p));
    250 			}
    251 			where = r->buf;
    252 			r->high = needed;
    253 		} else {
    254 			/* drop message */
    255 			log_warn("warning: mem full, dropped message");
    256 			return;
    257 		}
    258 	} else {
    259 		/* empty */
    260 		if(r->high == r->low) {
    261 			where = r->buf;
    262 			r->low = 0;
    263 			r->high = needed;
    264 		/* unused part is in the middle */
    265 		/* so ringbuffer has wrapped around */
    266 		} else if(r->low - r->high > needed) {
    267 			where = r->buf + r->high;
    268 			r->high += needed;
    269 		} else {
    270 			log_warn("warning: mem full, dropped message");
    271 			return;
    272 		}
    273 	}
    274 	when = *now;
    275 	dl_tv_add(&when, delay);
    276 	/* copy it at where part */
    277 	log_assert(where != NULL);
    278 	memmove(where, &when, sizeof(when));
    279 	memmove(where+sizeof(when), &p, sizeof(p));
    280 	memmove(where+sizeof(when)+sizeof(p), &len, sizeof(len));
    281 	memmove(where+sizeof(when)+sizeof(p)+sizeof(len),
    282 		sldns_buffer_begin(pkt), len);
    283 }
    284 
    285 /** see if the ringbuffer is empty */
    286 static int
    287 ring_empty(struct ringbuf* r)
    288 {
    289 	return (r->low == r->high);
    290 }
    291 
    292 /** peek at timevalue for next item in ring */
    293 static struct timeval*
    294 ring_peek_time(struct ringbuf* r)
    295 {
    296 	if(ring_empty(r))
    297 		return NULL;
    298 	return (struct timeval*)&r->buf[r->low];
    299 }
    300 
    301 /** get entry from ringbuffer */
    302 static int
    303 ring_pop(struct ringbuf* r, sldns_buffer* pkt, struct timeval* tv,
    304 	struct proxy** p)
    305 {
    306 	/* time -- proxy* -- 16bitlen -- message */
    307 	uint16_t len;
    308 	uint8_t* where = NULL;
    309 	size_t done;
    310 	if(r->low == r->high)
    311 		return 0;
    312 	where = r->buf + r->low;
    313 	memmove(tv, where, sizeof(*tv));
    314 	memmove(p, where+sizeof(*tv), sizeof(*p));
    315 	memmove(&len, where+sizeof(*tv)+sizeof(*p), sizeof(len));
    316 	memmove(sldns_buffer_begin(pkt),
    317 		where+sizeof(*tv)+sizeof(*p)+sizeof(len), len);
    318 	sldns_buffer_set_limit(pkt, (size_t)len);
    319 	done = sizeof(*tv)+sizeof(*p)+sizeof(len)+len;
    320 	/* move lowmark */
    321 	if(r->low < r->high) {
    322 		/* used part in middle */
    323 		log_assert(r->high - r->low >= done);
    324 		r->low += done;
    325 	} else {
    326 		/* unused part in middle */
    327 		log_assert(r->size - r->low >= done);
    328 		r->low += done;
    329 		if(r->size - r->low > sizeof(*tv)+sizeof(*p)) {
    330 			/* see if it is zeroed; means end of buffer */
    331 			struct proxy* pz;
    332 			memmove(&pz, r->buf+r->low+sizeof(*tv), sizeof(pz));
    333 			if(pz == NULL)
    334 				r->low = 0;
    335 		} else r->low = 0;
    336 	}
    337 	if(r->low == r->high) {
    338 		r->low = 0; /* reset if empty */
    339 		r->high = 0;
    340 	}
    341 	return 1;
    342 }
    343 
    344 /** signal handler global info */
    345 static volatile int do_quit = 0;
    346 
    347 /** signal handler for user quit */
    348 static RETSIGTYPE delayer_sigh(int sig)
    349 {
    350 	char str[] = "exit on signal   \n";
    351 	str[15] = '0' + (sig/10)%10;
    352 	str[16] = '0' + sig%10;
    353 	/* simple cast to void will not silence Wunused-result */
    354 	(void)!write(STDOUT_FILENO, str, strlen(str));
    355 	do_quit = 1;
    356 }
    357 
    358 /** send out waiting packets */
    359 static void
    360 service_send(struct ringbuf* ring, struct timeval* now, sldns_buffer* pkt,
    361 	struct sockaddr_storage* srv_addr, socklen_t srv_len)
    362 {
    363 	struct proxy* p;
    364 	struct timeval tv;
    365 	ssize_t sent;
    366 	while(!ring_empty(ring) &&
    367 		dl_tv_smaller(ring_peek_time(ring), now)) {
    368 		/* this items needs to be sent out */
    369 		if(!ring_pop(ring, pkt, &tv, &p))
    370 			fatal_exit("ringbuf error: pop failed");
    371 		verbose(1, "send out query %d.%6.6d",
    372 			(unsigned)tv.tv_sec, (unsigned)tv.tv_usec);
    373 		log_addr(1, "from client", &p->addr, p->addr_len);
    374 		/* send it */
    375 		sent = sendto(p->s, (void*)sldns_buffer_begin(pkt),
    376 			sldns_buffer_limit(pkt), 0,
    377 			(struct sockaddr*)srv_addr, srv_len);
    378 		if(sent == -1) {
    379 			log_err("sendto: %s", sock_strerror(errno));
    380 		} else if(sent != (ssize_t)sldns_buffer_limit(pkt)) {
    381 			log_err("sendto: partial send");
    382 		}
    383 		p->lastuse = *now;
    384 		p->numsent++;
    385 	}
    386 }
    387 
    388 /** do proxy for one readable client */
    389 static void
    390 do_proxy(struct proxy* p, int retsock, sldns_buffer* pkt)
    391 {
    392 	int i;
    393 	ssize_t r;
    394 	for(i=0; i<TRIES_PER_SELECT; i++) {
    395 		r = recv(p->s, (void*)sldns_buffer_begin(pkt),
    396 			sldns_buffer_capacity(pkt), 0);
    397 		if(r == -1) {
    398 #ifndef USE_WINSOCK
    399 			if(errno == EAGAIN || errno == EINTR)
    400 				return;
    401 #else
    402 			if(WSAGetLastError() == WSAEINPROGRESS ||
    403 				WSAGetLastError() == WSAEWOULDBLOCK)
    404 				return;
    405 #endif
    406 			log_err("recv: %s", sock_strerror(errno));
    407 			return;
    408 		}
    409 		sldns_buffer_set_limit(pkt, (size_t)r);
    410 		log_addr(1, "return reply to client", &p->addr, p->addr_len);
    411 		/* send reply back to the real client */
    412 		p->numreturn++;
    413 		r = sendto(retsock, (void*)sldns_buffer_begin(pkt), (size_t)r,
    414 			0, (struct sockaddr*)&p->addr, p->addr_len);
    415 		if(r == -1) {
    416 			log_err("sendto: %s", sock_strerror(errno));
    417 		}
    418 	}
    419 }
    420 
    421 /** proxy return replies to clients */
    422 static void
    423 service_proxy(fd_set* rset, int retsock, struct proxy* proxies,
    424 	sldns_buffer* pkt, struct timeval* now)
    425 {
    426 	struct proxy* p;
    427 	for(p = proxies; p; p = p->next) {
    428 		if(FD_ISSET(p->s, rset)) {
    429 			p->lastuse = *now;
    430 			do_proxy(p, retsock, pkt);
    431 		}
    432 	}
    433 }
    434 
    435 /** find or else create proxy for this remote client */
    436 static struct proxy*
    437 find_create_proxy(struct sockaddr_storage* from, socklen_t from_len,
    438 	fd_set* rorig, int* max, struct proxy** proxies, int serv_ip6,
    439 	struct timeval* now, struct timeval* reuse_timeout)
    440 {
    441 	struct proxy* p;
    442 	struct timeval t;
    443 	for(p = *proxies; p; p = p->next) {
    444 		if(sockaddr_cmp(from, from_len, &p->addr, p->addr_len)==0)
    445 			return p;
    446 	}
    447 	/* possibly: reuse lapsed entries */
    448 	for(p = *proxies; p; p = p->next) {
    449 		if(p->numwait > p->numsent || p->numsent > p->numreturn)
    450 			continue;
    451 		t = *now;
    452 		dl_tv_subtract(&t, &p->lastuse);
    453 		if(dl_tv_smaller(&t, reuse_timeout))
    454 			continue;
    455 		/* yes! */
    456 		verbose(1, "reuse existing entry");
    457 		memmove(&p->addr, from, from_len);
    458 		p->addr_len = from_len;
    459 		p->numreuse++;
    460 		return p;
    461 	}
    462 	/* create new */
    463 	p = (struct proxy*)calloc(1, sizeof(*p));
    464 	if(!p) fatal_exit("out of memory");
    465 	p->s = socket(serv_ip6?AF_INET6:AF_INET, SOCK_DGRAM, 0);
    466 	if(p->s == -1) {
    467 		fatal_exit("socket: %s", sock_strerror(errno));
    468 	}
    469 	fd_set_nonblock(p->s);
    470 	memmove(&p->addr, from, from_len);
    471 	p->addr_len = from_len;
    472 	p->next = *proxies;
    473 	*proxies = p;
    474 	FD_SET(FD_SET_T p->s, rorig);
    475 	if(p->s+1 > *max)
    476 		*max = p->s+1;
    477 	return p;
    478 }
    479 
    480 /** recv new waiting packets */
    481 static void
    482 service_recv(int s, struct ringbuf* ring, sldns_buffer* pkt,
    483 	fd_set* rorig, int* max, struct proxy** proxies,
    484 	struct sockaddr_storage* srv_addr, socklen_t srv_len,
    485 	struct timeval* now, struct timeval* delay, struct timeval* reuse)
    486 {
    487 	int i;
    488 	struct sockaddr_storage from;
    489 	socklen_t from_len;
    490 	ssize_t len;
    491 	struct proxy* p;
    492 	for(i=0; i<TRIES_PER_SELECT; i++) {
    493 		from_len = (socklen_t)sizeof(from);
    494 		len = recvfrom(s, (void*)sldns_buffer_begin(pkt),
    495 			sldns_buffer_capacity(pkt), 0,
    496 			(struct sockaddr*)&from, &from_len);
    497 		if(len < 0) {
    498 #ifndef USE_WINSOCK
    499 			if(errno == EAGAIN || errno == EINTR)
    500 				return;
    501 #else
    502 			if(WSAGetLastError() == WSAEWOULDBLOCK ||
    503 				WSAGetLastError() == WSAEINPROGRESS)
    504 				return;
    505 #endif
    506 			fatal_exit("recvfrom: %s", sock_strerror(errno));
    507 		}
    508 		sldns_buffer_set_limit(pkt, (size_t)len);
    509 		/* find its proxy element */
    510 		p = find_create_proxy(&from, from_len, rorig, max, proxies,
    511 			addr_is_ip6(srv_addr, srv_len), now, reuse);
    512 		if(!p) fatal_exit("error: cannot find or create proxy");
    513 		p->lastuse = *now;
    514 		ring_add(ring, pkt, now, delay, p);
    515 		p->numwait++;
    516 		log_addr(1, "recv from client", &p->addr, p->addr_len);
    517 	}
    518 }
    519 
    520 /** delete tcp proxy */
    521 static void
    522 tcp_proxy_delete(struct tcp_proxy* p)
    523 {
    524 	struct tcp_send_list* s, *sn;
    525 	if(!p)
    526 		return;
    527 	log_addr(1, "delete tcp proxy", &p->addr, p->addr_len);
    528 	s = p->querylist;
    529 	while(s) {
    530 		sn = s->next;
    531 		free(s->item);
    532 		free(s);
    533 		s = sn;
    534 	}
    535 	s = p->answerlist;
    536 	while(s) {
    537 		sn = s->next;
    538 		free(s->item);
    539 		free(s);
    540 		s = sn;
    541 	}
    542 	sock_close(p->client_s);
    543 	if(p->server_s != -1)
    544 		sock_close(p->server_s);
    545 	free(p);
    546 }
    547 
    548 /** accept new TCP connections, and set them up */
    549 static void
    550 service_tcp_listen(int s, fd_set* rorig, int* max, struct tcp_proxy** proxies,
    551 	struct sockaddr_storage* srv_addr, socklen_t srv_len,
    552 	struct timeval* now, struct timeval* tcp_timeout)
    553 {
    554 	int newfd;
    555 	struct sockaddr_storage addr;
    556 	struct tcp_proxy* p;
    557 	socklen_t addr_len;
    558 	newfd = accept(s, (struct sockaddr*)&addr, &addr_len);
    559 	if(newfd == -1) {
    560 #ifndef USE_WINSOCK
    561 		if(errno == EAGAIN || errno == EINTR)
    562 			return;
    563 #else
    564 		if(WSAGetLastError() == WSAEWOULDBLOCK ||
    565 			WSAGetLastError() == WSAEINPROGRESS ||
    566 			WSAGetLastError() == WSAECONNRESET)
    567 			return;
    568 #endif
    569 		fatal_exit("accept: %s", sock_strerror(errno));
    570 	}
    571 	p = (struct tcp_proxy*)calloc(1, sizeof(*p));
    572 	if(!p) fatal_exit("out of memory");
    573 	memmove(&p->addr, &addr, addr_len);
    574 	p->addr_len = addr_len;
    575 	log_addr(1, "new tcp proxy", &p->addr, p->addr_len);
    576 	p->client_s = newfd;
    577 	p->server_s = socket(addr_is_ip6(srv_addr, srv_len)?AF_INET6:AF_INET,
    578 		SOCK_STREAM, 0);
    579 	if(p->server_s == -1) {
    580 		fatal_exit("tcp socket: %s", sock_strerror(errno));
    581 	}
    582 	fd_set_nonblock(p->client_s);
    583 	fd_set_nonblock(p->server_s);
    584 	if(connect(p->server_s, (struct sockaddr*)srv_addr, srv_len) == -1) {
    585 #ifndef USE_WINSOCK
    586 		if(errno != EINPROGRESS) {
    587 			log_err("tcp connect: %s", strerror(errno));
    588 #else
    589 		if(WSAGetLastError() != WSAEWOULDBLOCK &&
    590 			WSAGetLastError() != WSAEINPROGRESS) {
    591 			log_err("tcp connect: %s",
    592 				wsa_strerror(WSAGetLastError()));
    593 #endif
    594 			sock_close(p->server_s);
    595 			sock_close(p->client_s);
    596 			free(p);
    597 			return;
    598 		}
    599 	}
    600 	p->timeout = *now;
    601 	dl_tv_add(&p->timeout, tcp_timeout);
    602 
    603 	/* listen to client and server */
    604 	FD_SET(FD_SET_T p->client_s, rorig);
    605 	FD_SET(FD_SET_T p->server_s, rorig);
    606 	if(p->client_s+1 > *max)
    607 		*max = p->client_s+1;
    608 	if(p->server_s+1 > *max)
    609 		*max = p->server_s+1;
    610 
    611 	/* add into proxy list */
    612 	p->next = *proxies;
    613 	*proxies = p;
    614 }
    615 
    616 /** relay TCP, read a part */
    617 static int
    618 tcp_relay_read(int s, struct tcp_send_list** first,
    619 	struct tcp_send_list** last, struct timeval* now,
    620 	struct timeval* delay, sldns_buffer* pkt)
    621 {
    622 	struct tcp_send_list* item;
    623 	ssize_t r = recv(s, (void*)sldns_buffer_begin(pkt),
    624 		sldns_buffer_capacity(pkt), 0);
    625 	if(r == -1) {
    626 #ifndef USE_WINSOCK
    627 		if(errno == EINTR || errno == EAGAIN)
    628 			return 1;
    629 #else
    630 		if(WSAGetLastError() == WSAEINPROGRESS ||
    631 			WSAGetLastError() == WSAEWOULDBLOCK)
    632 			return 1;
    633 #endif
    634 		log_err("tcp read: %s", sock_strerror(errno));
    635 		return 0;
    636 	} else if(r == 0) {
    637 		/* connection closed */
    638 		return 0;
    639 	}
    640 	item = (struct tcp_send_list*)malloc(sizeof(*item));
    641 	if(!item) {
    642 		log_err("out of memory");
    643 		return 0;
    644 	}
    645 	verbose(1, "read item len %d", (int)r);
    646 	item->len = (size_t)r;
    647 	item->item = memdup(sldns_buffer_begin(pkt), item->len);
    648 	if(!item->item) {
    649 		free(item);
    650 		log_err("out of memory");
    651 		return 0;
    652 	}
    653 	item->done = 0;
    654 	item->wait = *now;
    655 	dl_tv_add(&item->wait, delay);
    656 	item->next = NULL;
    657 
    658 	/* link in */
    659 	if(*first) {
    660 		(*last)->next = item;
    661 	} else {
    662 		*first = item;
    663 	}
    664 	*last = item;
    665 	return 1;
    666 }
    667 
    668 /** relay TCP, write a part */
    669 static int
    670 tcp_relay_write(int s, struct tcp_send_list** first,
    671 	struct tcp_send_list** last, struct timeval* now)
    672 {
    673 	ssize_t r;
    674 	struct tcp_send_list* p;
    675 	while(*first) {
    676 		p = *first;
    677 		/* is the item ready? */
    678 		if(!dl_tv_smaller(&p->wait, now))
    679 			return 1;
    680 		/* write it */
    681 		r = send(s, (void*)(p->item + p->done), p->len - p->done, 0);
    682 		if(r == -1) {
    683 #ifndef USE_WINSOCK
    684 			if(errno == EAGAIN || errno == EINTR)
    685 				return 1;
    686 #else
    687 			if(WSAGetLastError() == WSAEWOULDBLOCK ||
    688 				WSAGetLastError() == WSAEINPROGRESS)
    689 				return 1;
    690 #endif
    691 			log_err("tcp write: %s", sock_strerror(errno));
    692 			return 0;
    693 		} else if(r == 0) {
    694 			/* closed */
    695 			return 0;
    696 		}
    697 		/* account it */
    698 		p->done += (size_t)r;
    699 		verbose(1, "write item %d of %d", (int)p->done, (int)p->len);
    700 		if(p->done >= p->len) {
    701 			free(p->item);
    702 			*first = p->next;
    703 			if(!*first)
    704 				*last = NULL;
    705 			free(p);
    706 		} else {
    707 			/* partial write */
    708 			return 1;
    709 		}
    710 	}
    711 	return 1;
    712 }
    713 
/**
 * perform TCP relaying
 * Walks the list of tcp proxies. For every entry it reads new data from
 * the client and the server (when marked readable in rset), writes queued
 * items whose delay has passed, updates the write fd set, and deletes the
 * entry when it failed, finished or timed out.
 * @param tcp_proxies: list of tcp proxies; deleted entries are unlinked.
 * @param now: current time.
 * @param delay: delay that is applied to every item that is read.
 * @param tcp_timeout: added to now to get the new timeout of active entries.
 * @param pkt: scratch buffer for reading.
 * @param rset: read fd set as returned by select.
 * @param rorig: read fd set that select starts from; updated.
 * @param worig: write fd set that select starts from; updated.
 */
static void
service_tcp_relay(struct tcp_proxy** tcp_proxies, struct timeval* now,
	struct timeval* delay, struct timeval* tcp_timeout, sldns_buffer* pkt,
	fd_set* rset, fd_set* rorig, fd_set* worig)
{
	struct tcp_proxy* p, **prev;
	struct timeval tout;
	int delete_it;
	p = *tcp_proxies;
	/* prev points at the link that leads to p, so p can be unlinked */
	prev = tcp_proxies;
	/* new expiry time for entries that show activity in this pass */
	tout = *now;
	dl_tv_add(&tout, tcp_timeout);

	while(p) {
		delete_it = 0;
		/* can we receive further queries? */
		if(!delete_it && FD_ISSET(p->client_s, rset)) {
			p->timeout = tout;
			log_addr(1, "read tcp query", &p->addr, p->addr_len);
			/* client closed or failed: drop the whole entry */
			if(!tcp_relay_read(p->client_s, &p->querylist,
				&p->querylast, now, delay, pkt))
				delete_it = 1;
		}
		/* can we receive further answers? */
		if(!delete_it && p->server_s != -1 &&
			FD_ISSET(p->server_s, rset)) {
			p->timeout = tout;
			log_addr(1, "read tcp answer", &p->addr, p->addr_len);
			if(!tcp_relay_read(p->server_s, &p->answerlist,
				&p->answerlast, now, delay, pkt)) {
				/* server closed or failed: only the server
				 * side is closed here, the entry stays so
				 * that queued answers can still be sent */
				sock_close(p->server_s);
				FD_CLR(FD_SET_T p->server_s, worig);
				FD_CLR(FD_SET_T p->server_s, rorig);
				p->server_s = -1;
			}
		}
		/* can we send on further queries */
		if(!delete_it && p->querylist && p->server_s != -1) {
			p->timeout = tout;
			if(dl_tv_smaller(&p->querylist->wait, now))
				log_addr(1, "write tcp query",
					&p->addr, p->addr_len);
			if(!tcp_relay_write(p->server_s, &p->querylist,
				&p->querylast, now))
				delete_it = 1;
			/* wait for writability only while a ready item
			 * is still queued */
			if(p->querylist &&
				dl_tv_smaller(&p->querylist->wait, now))
				FD_SET(FD_SET_T p->server_s, worig);
			else 	FD_CLR(FD_SET_T p->server_s, worig);
		}

		/* can we send on further answers */
		if(!delete_it && p->answerlist) {
			p->timeout = tout;
			if(dl_tv_smaller(&p->answerlist->wait, now))
				log_addr(1, "write tcp answer",
					&p->addr, p->addr_len);
			if(!tcp_relay_write(p->client_s, &p->answerlist,
				&p->answerlast, now))
				delete_it = 1;
			if(p->answerlist && dl_tv_smaller(&p->answerlist->wait,
				now))
				FD_SET(FD_SET_T p->client_s, worig);
			else 	FD_CLR(FD_SET_T p->client_s, worig);
			/* all answers delivered and the server is gone:
			 * nothing more can happen on this entry */
			if(!p->answerlist && p->server_s == -1)
				delete_it = 1;
		}

		/* does this entry timeout? (unused too long) */
		if(dl_tv_smaller(&p->timeout, now)) {
			delete_it = 1;
		}
		if(delete_it) {
			/* unlink, stop watching its fds, and free it;
			 * prev stays where it is */
			struct tcp_proxy* np = p->next;
			*prev = np;
			FD_CLR(FD_SET_T p->client_s, rorig);
			FD_CLR(FD_SET_T p->client_s, worig);
			if(p->server_s != -1) {
				FD_CLR(FD_SET_T p->server_s, rorig);
				FD_CLR(FD_SET_T p->server_s, worig);
			}
			tcp_proxy_delete(p);
			p = np;
			continue;
		}

		prev = &p->next;
		p = p->next;
	}
}
    805 
    806 /** find waiting time */
    807 static int
    808 service_findwait(struct timeval* now, struct timeval* wait,
    809 	struct ringbuf* ring, struct tcp_proxy* tcplist)
    810 {
    811 	/* first item is the time to wait */
    812 	struct timeval* peek = ring_peek_time(ring);
    813 	struct timeval tcv;
    814 	int have_tcpval = 0;
    815 	struct tcp_proxy* p;
    816 
    817 	/* also for TCP list the first in sendlists is the time to wait */
    818 	for(p=tcplist; p; p=p->next) {
    819 		if(!have_tcpval)
    820 			tcv = p->timeout;
    821 		have_tcpval = 1;
    822 		if(dl_tv_smaller(&p->timeout, &tcv))
    823 			tcv = p->timeout;
    824 		if(p->querylist && dl_tv_smaller(&p->querylist->wait, &tcv))
    825 			tcv = p->querylist->wait;
    826 		if(p->answerlist && dl_tv_smaller(&p->answerlist->wait, &tcv))
    827 			tcv = p->answerlist->wait;
    828 	}
    829 	if(peek) {
    830 		/* peek can be unaligned */
    831 		/* use wait as a temp variable */
    832 		memmove(wait, peek, sizeof(*wait));
    833 		if(!have_tcpval)
    834 			tcv = *wait;
    835 		else if(dl_tv_smaller(wait, &tcv))
    836 			tcv = *wait;
    837 		have_tcpval = 1;
    838 	}
    839 	if(have_tcpval) {
    840 		*wait = tcv;
    841 		dl_tv_subtract(wait, now);
    842 		return 1;
    843 	}
    844 	/* nothing, block */
    845 	return 0;
    846 }
    847 
    848 /** clear proxy list */
    849 static void
    850 proxy_list_clear(struct proxy* p)
    851 {
    852 	char from[109];
    853 	struct proxy* np;
    854 	int i=0, port;
    855 	while(p) {
    856 		np = p->next;
    857 		port = (int)ntohs(((struct sockaddr_in*)&p->addr)->sin_port);
    858 		if(addr_is_ip6(&p->addr, p->addr_len)) {
    859 			if(inet_ntop(AF_INET6,
    860 				&((struct sockaddr_in6*)&p->addr)->sin6_addr,
    861 				from, (socklen_t)sizeof(from)) == 0)
    862 				(void)strlcpy(from, "err", sizeof(from));
    863 		} else {
    864 			if(inet_ntop(AF_INET,
    865 				&((struct sockaddr_in*)&p->addr)->sin_addr,
    866 				from, (socklen_t)sizeof(from)) == 0)
    867 				(void)strlcpy(from, "err", sizeof(from));
    868 		}
    869 		printf("client[%d]: last %s@%d of %d : %u in, %u out, "
    870 			"%u returned\n", i++, from, port, (int)p->numreuse+1,
    871 			(unsigned)p->numwait, (unsigned)p->numsent,
    872 			(unsigned)p->numreturn);
    873 		sock_close(p->s);
    874 		free(p);
    875 		p = np;
    876 	}
    877 }
    878 
    879 /** clear TCP proxy list */
    880 static void
    881 tcp_proxy_list_clear(struct tcp_proxy* p)
    882 {
    883 	struct tcp_proxy* np;
    884 	while(p) {
    885 		np = p->next;
    886 		tcp_proxy_delete(p);
    887 		p = np;
    888 	}
    889 }
    890 
/**
 * delayer service loop
 * Runs until do_quit is set. Each round waits in select (with a timeout
 * when there is a pending deadline), then sends out due UDP queries,
 * returns UDP replies, receives new UDP queries, accepts a new TCP
 * connection and relays TCP data. On exit all proxy entries are deleted.
 * @param udp_s: UDP socket that clients send queries to.
 * @param listen_s: listening TCP socket.
 * @param ring: ring buffer for delayed UDP queries.
 * @param delay: delay to apply.
 * @param reuse: idle time after which a UDP proxy entry may be reused.
 * @param srv_addr: address of the server to forward to.
 * @param srv_len: length of srv_addr.
 * @param pkt: scratch packet buffer.
 */
static void
service_loop(int udp_s, int listen_s, struct ringbuf* ring,
	struct timeval* delay, struct timeval* reuse,
	struct sockaddr_storage* srv_addr, socklen_t srv_len,
	sldns_buffer* pkt)
{
	/* rorig/worig are the persistent fd sets; rset/wset are the
	 * per-round copies that select modifies */
	fd_set rset, rorig;
	fd_set wset, worig;
	struct timeval now, wait;
	int max, have_wait = 0;
	struct proxy* proxies = NULL;
	struct tcp_proxy* tcp_proxies = NULL;
	struct timeval tcp_timeout;
	/* tcp proxy entries expire after 120 seconds without activity */
	tcp_timeout.tv_sec = 120;
	tcp_timeout.tv_usec = 0;
#ifndef S_SPLINT_S
	FD_ZERO(&rorig);
	FD_ZERO(&worig);
	FD_SET(FD_SET_T udp_s, &rorig);
	FD_SET(FD_SET_T listen_s, &rorig);
#endif
	/* max is the first argument to select: highest fd plus one */
	max = udp_s + 1;
	if(listen_s + 1 > max) max = listen_s + 1;
	while(!do_quit) {
		/* wait for events */
		rset = rorig;
		wset = worig;
		if(have_wait)
			verbose(1, "wait for %d.%6.6d",
			(unsigned)wait.tv_sec, (unsigned)wait.tv_usec);
		else	verbose(1, "wait");
		/* without a pending deadline, block until an fd is ready */
		if(select(max, &rset, &wset, NULL, have_wait?&wait:NULL) < 0) {
			if(errno == EAGAIN || errno == EINTR)
				continue;
			fatal_exit("select: %s", strerror(errno));
		}
		/* get current time */
		if(gettimeofday(&now, NULL) < 0) {
			if(errno == EAGAIN || errno == EINTR)
				continue;
			fatal_exit("gettimeofday: %s", strerror(errno));
		}
		verbose(1, "process at %u.%6.6u\n",
			(unsigned)now.tv_sec, (unsigned)now.tv_usec);
		/* sendout delayed queries to master server (frees up buffer)*/
		service_send(ring, &now, pkt, srv_addr, srv_len);
		/* proxy return replies */
		service_proxy(&rset, udp_s, proxies, pkt, &now);
		/* see what can be received to start waiting */
		service_recv(udp_s, ring, pkt, &rorig, &max, &proxies,
			srv_addr, srv_len, &now, delay, reuse);
		/* see if there are new tcp connections */
		service_tcp_listen(listen_s, &rorig, &max, &tcp_proxies,
			srv_addr, srv_len, &now, &tcp_timeout);
		/* service tcp connections */
		service_tcp_relay(&tcp_proxies, &now, delay, &tcp_timeout,
			pkt, &rset, &rorig, &worig);
		/* see what next timeout is (if any) */
		have_wait = service_findwait(&now, &wait, ring, tcp_proxies);
	}
	/* shutting down: report on and release all proxy entries */
	proxy_list_clear(proxies);
	tcp_proxy_list_clear(tcp_proxies);
}
    955 
    956 /** delayer main service routine */
    957 static void
    958 service(const char* bind_str, int bindport, const char* serv_str,
    959 	size_t memsize, int delay_msec)
    960 {
    961 	struct sockaddr_storage bind_addr, srv_addr;
    962 	socklen_t bind_len, srv_len;
    963 	struct ringbuf* ring = ring_create(memsize);
    964 	struct timeval delay, reuse;
    965 	sldns_buffer* pkt;
    966 	int i, s, listen_s;
    967 #ifndef S_SPLINT_S
    968 	delay.tv_sec = delay_msec / 1000;
    969 	delay.tv_usec = (delay_msec % 1000)*1000;
    970 #endif
    971 	reuse = delay; /* reuse is max(4*delay, 1 second) */
    972 	dl_tv_add(&reuse, &delay);
    973 	dl_tv_add(&reuse, &delay);
    974 	dl_tv_add(&reuse, &delay);
    975 	if(reuse.tv_sec == 0)
    976 		reuse.tv_sec = 1;
    977 	if(!extstrtoaddr(serv_str, &srv_addr, &srv_len, UNBOUND_DNS_PORT)) {
    978 		printf("cannot parse forward address: %s\n", serv_str);
    979 		exit(1);
    980 	}
    981 	pkt = sldns_buffer_new(65535);
    982 	if(!pkt)
    983 		fatal_exit("out of memory");
    984 	if( signal(SIGINT, delayer_sigh) == SIG_ERR ||
    985 #ifdef SIGHUP
    986 		signal(SIGHUP, delayer_sigh) == SIG_ERR ||
    987 #endif
    988 #ifdef SIGQUIT
    989 		signal(SIGQUIT, delayer_sigh) == SIG_ERR ||
    990 #endif
    991 #ifdef SIGBREAK
    992 		signal(SIGBREAK, delayer_sigh) == SIG_ERR ||
    993 #endif
    994 #ifdef SIGALRM
    995 		signal(SIGALRM, delayer_sigh) == SIG_ERR ||
    996 #endif
    997 		signal(SIGTERM, delayer_sigh) == SIG_ERR)
    998 		fatal_exit("could not bind to signal");
    999 	/* bind UDP port */
   1000 	if((s = socket(str_is_ip6(bind_str)?AF_INET6:AF_INET,
   1001 		SOCK_DGRAM, 0)) == -1) {
   1002 		fatal_exit("socket: %s", sock_strerror(errno));
   1003 	}
   1004 	i=0;
   1005 	if(bindport == 0) {
   1006 		bindport = 1024 + ((int)arc4random())%64000;
   1007 		i = 100;
   1008 	}
   1009 	while(1) {
   1010 		if(!ipstrtoaddr(bind_str, bindport, &bind_addr, &bind_len)) {
   1011 			printf("cannot parse listen address: %s\n", bind_str);
   1012 			exit(1);
   1013 		}
   1014 		if(bind(s, (struct sockaddr*)&bind_addr, bind_len) == -1) {
   1015 			log_err("bind: %s", sock_strerror(errno));
   1016 			if(i--==0)
   1017 				fatal_exit("cannot bind any port");
   1018 			bindport = 1024 + ((int)arc4random())%64000;
   1019 		} else break;
   1020 	}
   1021 	fd_set_nonblock(s);
   1022 	/* and TCP port */
   1023 	if((listen_s = socket(str_is_ip6(bind_str)?AF_INET6:AF_INET,
   1024 		SOCK_STREAM, 0)) == -1) {
   1025 		fatal_exit("tcp socket: %s", sock_strerror(errno));
   1026 	}
   1027 #ifdef SO_REUSEADDR
   1028 	if(1) {
   1029 		int on = 1;
   1030 		if(setsockopt(listen_s, SOL_SOCKET, SO_REUSEADDR, (void*)&on,
   1031 			(socklen_t)sizeof(on)) < 0)
   1032 			fatal_exit("setsockopt(.. SO_REUSEADDR ..) failed: %s",
   1033 				sock_strerror(errno));
   1034 	}
   1035 #endif
   1036 	if(bind(listen_s, (struct sockaddr*)&bind_addr, bind_len) == -1) {
   1037 		fatal_exit("tcp bind: %s", sock_strerror(errno));
   1038 	}
   1039 	if(listen(listen_s, 5) == -1) {
   1040 		fatal_exit("tcp listen: %s", sock_strerror(errno));
   1041 	}
   1042 	fd_set_nonblock(listen_s);
   1043 	printf("listening on port: %d\n", bindport);
   1044 
   1045 	/* process loop */
   1046 	do_quit = 0;
   1047 	service_loop(s, listen_s, ring, &delay, &reuse, &srv_addr, srv_len,
   1048 		pkt);
   1049 
   1050 	/* cleanup */
   1051 	verbose(1, "cleanup");
   1052 	sock_close(s);
   1053 	sock_close(listen_s);
   1054 	sldns_buffer_free(pkt);
   1055 	ring_delete(ring);
   1056 }
   1057 
/**
 * getopt globals (argument of the current option and index of the next
 * argv element), declared here in case header files fail to declare them.
 */
extern char* optarg;
extern int optind;
   1062 
   1063 /** main program for delayer */
   1064 int main(int argc, char** argv)
   1065 {
   1066 	int c;		/* defaults */
   1067 	const char* server = "127.0.0.1@53";
   1068 	const char* bindto = "0.0.0.0";
   1069 	int bindport = 0;
   1070 	size_t memsize = 10*1024*1024;
   1071 	int delay = 100;
   1072 
   1073 	verbosity = 0;
   1074 	log_init(0, 0, 0);
   1075 	log_ident_set("delayer");
   1076 	if(argc == 1) usage(argv);
   1077 	while( (c=getopt(argc, argv, "b:d:f:hm:p:")) != -1) {
   1078 		switch(c) {
   1079 			case 'b':
   1080 				bindto = optarg;
   1081 				break;
   1082 			case 'd':
   1083 				if(atoi(optarg)==0 && strcmp(optarg,"0")!=0) {
   1084 					printf("bad delay: %s\n", optarg);
   1085 					return 1;
   1086 				}
   1087 				delay = atoi(optarg);
   1088 				break;
   1089 			case 'f':
   1090 				server = optarg;
   1091 				break;
   1092 			case 'm':
   1093 				if(!cfg_parse_memsize(optarg, &memsize)) {
   1094 					printf("bad memsize: %s\n", optarg);
   1095 					return 1;
   1096 				}
   1097 				break;
   1098 			case 'p':
   1099 				if(atoi(optarg)==0 && strcmp(optarg,"0")!=0) {
   1100 					printf("bad port nr: %s\n", optarg);
   1101 					return 1;
   1102 				}
   1103 				bindport = atoi(optarg);
   1104 				break;
   1105 			case 'h':
   1106 			case '?':
   1107 			default:
   1108 				usage(argv);
   1109 		}
   1110 	}
   1111 	argc -= optind;
   1112 	argv += optind;
   1113 	if(argc != 0)
   1114 		usage(argv);
   1115 
   1116 	printf("bind to %s @ %d and forward to %s after %d msec\n",
   1117 		bindto, bindport, server, delay);
   1118 	service(bindto, bindport, server, memsize, delay);
   1119 	return 0;
   1120 }
   1121