Home | History | Annotate | Line # | Download | only in dist
xfrd-tcp.c revision 1.1.1.4
      1 /*
      2  * xfrd-tcp.c - XFR (transfer) Daemon TCP system source file. Manages tcp conn.
      3  *
      4  * Copyright (c) 2001-2006, NLnet Labs. All rights reserved.
      5  *
      6  * See LICENSE for the license.
      7  *
      8  */
      9 
     10 #include "config.h"
     11 #include <assert.h>
     12 #include <errno.h>
     13 #include <fcntl.h>
     14 #include <unistd.h>
     15 #include <stdlib.h>
     16 #include <sys/uio.h>
     17 #include "nsd.h"
     18 #include "xfrd-tcp.h"
     19 #include "buffer.h"
     20 #include "packet.h"
     21 #include "dname.h"
     22 #include "options.h"
     23 #include "namedb.h"
     24 #include "xfrd.h"
     25 #include "xfrd-disk.h"
     26 #include "util.h"
     27 
     28 /* sort tcppipe, first on IP address, for an IPaddresss, sort on num_unused */
     29 static int
     30 xfrd_pipe_cmp(const void* a, const void* b)
     31 {
     32 	const struct xfrd_tcp_pipeline* x = (struct xfrd_tcp_pipeline*)a;
     33 	const struct xfrd_tcp_pipeline* y = (struct xfrd_tcp_pipeline*)b;
     34 	int r;
     35 	if(x == y)
     36 		return 0;
     37 	if(y->ip_len != x->ip_len)
     38 		/* subtraction works because nonnegative and small numbers */
     39 		return (int)y->ip_len - (int)x->ip_len;
     40 	r = memcmp(&x->ip, &y->ip, x->ip_len);
     41 	if(r != 0)
     42 		return r;
     43 	/* sort that num_unused is sorted ascending, */
     44 	if(x->num_unused != y->num_unused) {
     45 		return (x->num_unused < y->num_unused) ? -1 : 1;
     46 	}
     47 	/* different pipelines are different still, even with same numunused*/
     48 	return (uintptr_t)x < (uintptr_t)y ? -1 : 1;
     49 }
     50 
     51 struct xfrd_tcp_set* xfrd_tcp_set_create(struct region* region)
     52 {
     53 	int i;
     54 	struct xfrd_tcp_set* tcp_set = region_alloc(region,
     55 		sizeof(struct xfrd_tcp_set));
     56 	memset(tcp_set, 0, sizeof(struct xfrd_tcp_set));
     57 	tcp_set->tcp_count = 0;
     58 	tcp_set->tcp_waiting_first = 0;
     59 	tcp_set->tcp_waiting_last = 0;
     60 	for(i=0; i<XFRD_MAX_TCP; i++)
     61 		tcp_set->tcp_state[i] = xfrd_tcp_pipeline_create(region);
     62 	tcp_set->pipetree = rbtree_create(region, &xfrd_pipe_cmp);
     63 	return tcp_set;
     64 }
     65 
     66 struct xfrd_tcp_pipeline*
     67 xfrd_tcp_pipeline_create(region_type* region)
     68 {
     69 	int i;
     70 	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)
     71 		region_alloc_zero(region, sizeof(*tp));
     72 	tp->num_unused = ID_PIPE_NUM;
     73 	assert(sizeof(tp->unused)/sizeof(tp->unused[0]) == ID_PIPE_NUM);
     74 	for(i=0; i<ID_PIPE_NUM; i++)
     75 		tp->unused[i] = (uint16_t)i;
     76 	tp->tcp_r = xfrd_tcp_create(region, QIOBUFSZ);
     77 	tp->tcp_w = xfrd_tcp_create(region, 512);
     78 	return tp;
     79 }
     80 
     81 void
     82 xfrd_setup_packet(buffer_type* packet,
     83 	uint16_t type, uint16_t klass, const dname_type* dname, uint16_t qid)
     84 {
     85 	/* Set up the header */
     86 	buffer_clear(packet);
     87 	ID_SET(packet, qid);
     88 	FLAGS_SET(packet, 0);
     89 	OPCODE_SET(packet, OPCODE_QUERY);
     90 	QDCOUNT_SET(packet, 1);
     91 	ANCOUNT_SET(packet, 0);
     92 	NSCOUNT_SET(packet, 0);
     93 	ARCOUNT_SET(packet, 0);
     94 	buffer_skip(packet, QHEADERSZ);
     95 
     96 	/* The question record. */
     97 	buffer_write(packet, dname_name(dname), dname->name_size);
     98 	buffer_write_u16(packet, type);
     99 	buffer_write_u16(packet, klass);
    100 }
    101 
    102 static socklen_t
    103 #ifdef INET6
    104 xfrd_acl_sockaddr(acl_options_type* acl, unsigned int port,
    105 	struct sockaddr_storage *sck)
    106 #else
    107 xfrd_acl_sockaddr(acl_options_type* acl, unsigned int port,
    108 	struct sockaddr_in *sck, const char* fromto)
    109 #endif /* INET6 */
    110 {
    111 	/* setup address structure */
    112 #ifdef INET6
    113 	memset(sck, 0, sizeof(struct sockaddr_storage));
    114 #else
    115 	memset(sck, 0, sizeof(struct sockaddr_in));
    116 #endif
    117 	if(acl->is_ipv6) {
    118 #ifdef INET6
    119 		struct sockaddr_in6* sa = (struct sockaddr_in6*)sck;
    120 		sa->sin6_family = AF_INET6;
    121 		sa->sin6_port = htons(port);
    122 		sa->sin6_addr = acl->addr.addr6;
    123 		return sizeof(struct sockaddr_in6);
    124 #else
    125 		log_msg(LOG_ERR, "xfrd: IPv6 connection %s %s attempted but no \
    126 INET6.", fromto, acl->ip_address_spec);
    127 		return 0;
    128 #endif
    129 	} else {
    130 		struct sockaddr_in* sa = (struct sockaddr_in*)sck;
    131 		sa->sin_family = AF_INET;
    132 		sa->sin_port = htons(port);
    133 		sa->sin_addr = acl->addr.addr;
    134 		return sizeof(struct sockaddr_in);
    135 	}
    136 }
    137 
    138 socklen_t
    139 #ifdef INET6
    140 xfrd_acl_sockaddr_to(acl_options_type* acl, struct sockaddr_storage *to)
    141 #else
    142 xfrd_acl_sockaddr_to(acl_options_type* acl, struct sockaddr_in *to)
    143 #endif /* INET6 */
    144 {
    145 	unsigned int port = acl->port?acl->port:(unsigned)atoi(TCP_PORT);
    146 #ifdef INET6
    147 	return xfrd_acl_sockaddr(acl, port, to);
    148 #else
    149 	return xfrd_acl_sockaddr(acl, port, to, "to");
    150 #endif /* INET6 */
    151 }
    152 
    153 socklen_t
    154 #ifdef INET6
    155 xfrd_acl_sockaddr_frm(acl_options_type* acl, struct sockaddr_storage *frm)
    156 #else
    157 xfrd_acl_sockaddr_frm(acl_options_type* acl, struct sockaddr_in *frm)
    158 #endif /* INET6 */
    159 {
    160 	unsigned int port = acl->port?acl->port:0;
    161 #ifdef INET6
    162 	return xfrd_acl_sockaddr(acl, port, frm);
    163 #else
    164 	return xfrd_acl_sockaddr(acl, port, frm, "from");
    165 #endif /* INET6 */
    166 }
    167 
    168 void
    169 xfrd_write_soa_buffer(struct buffer* packet,
    170 	const dname_type* apex, struct xfrd_soa* soa)
    171 {
    172 	size_t rdlength_pos;
    173 	uint16_t rdlength;
    174 	buffer_write(packet, dname_name(apex), apex->name_size);
    175 
    176 	/* already in network order */
    177 	buffer_write(packet, &soa->type, sizeof(soa->type));
    178 	buffer_write(packet, &soa->klass, sizeof(soa->klass));
    179 	buffer_write(packet, &soa->ttl, sizeof(soa->ttl));
    180 	rdlength_pos = buffer_position(packet);
    181 	buffer_skip(packet, sizeof(rdlength));
    182 
    183 	/* uncompressed dnames */
    184 	buffer_write(packet, soa->prim_ns+1, soa->prim_ns[0]);
    185 	buffer_write(packet, soa->email+1, soa->email[0]);
    186 
    187 	buffer_write(packet, &soa->serial, sizeof(uint32_t));
    188 	buffer_write(packet, &soa->refresh, sizeof(uint32_t));
    189 	buffer_write(packet, &soa->retry, sizeof(uint32_t));
    190 	buffer_write(packet, &soa->expire, sizeof(uint32_t));
    191 	buffer_write(packet, &soa->minimum, sizeof(uint32_t));
    192 
    193 	/* write length of RR */
    194 	rdlength = buffer_position(packet) - rdlength_pos - sizeof(rdlength);
    195 	buffer_write_u16_at(packet, rdlength_pos, rdlength);
    196 }
    197 
    198 struct xfrd_tcp*
    199 xfrd_tcp_create(region_type* region, size_t bufsize)
    200 {
    201 	struct xfrd_tcp* tcp_state = (struct xfrd_tcp*)region_alloc(
    202 		region, sizeof(struct xfrd_tcp));
    203 	memset(tcp_state, 0, sizeof(struct xfrd_tcp));
    204 	tcp_state->packet = buffer_create(region, bufsize);
    205 	tcp_state->fd = -1;
    206 
    207 	return tcp_state;
    208 }
    209 
    210 static struct xfrd_tcp_pipeline*
    211 pipeline_find(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
    212 {
    213 	rbnode_type* sme = NULL;
    214 	struct xfrd_tcp_pipeline* r;
    215 	/* smaller buf than a full pipeline with 64kb ID array, only need
    216 	 * the front part with the key info, this front part contains the
    217 	 * members that the compare function uses. */
    218 	enum { keysize = sizeof(struct xfrd_tcp_pipeline) -
    219 		ID_PIPE_NUM*(sizeof(struct xfrd_zone*) + sizeof(uint16_t)) };
    220 	/* void* type for alignment of the struct,
    221 	 * divide the keysize by ptr-size and then add one to round up */
    222 	void* buf[ (keysize / sizeof(void*)) + 1 ];
    223 	struct xfrd_tcp_pipeline* key = (struct xfrd_tcp_pipeline*)buf;
    224 	key->node.key = key;
    225 	key->ip_len = xfrd_acl_sockaddr_to(zone->master, &key->ip);
    226 	key->num_unused = ID_PIPE_NUM;
    227 	/* lookup existing tcp transfer to the master with highest unused */
    228 	if(rbtree_find_less_equal(set->pipetree, key, &sme)) {
    229 		/* exact match, strange, fully unused tcp cannot be open */
    230 		assert(0);
    231 	}
    232 	if(!sme)
    233 		return NULL;
    234 	r = (struct xfrd_tcp_pipeline*)sme->key;
    235 	/* <= key pointed at, is the master correct ? */
    236 	if(r->ip_len != key->ip_len)
    237 		return NULL;
    238 	if(memcmp(&r->ip, &key->ip, key->ip_len) != 0)
    239 		return NULL;
    240 	/* correct master, is there a slot free for this transfer? */
    241 	if(r->num_unused == 0)
    242 		return NULL;
    243 	return r;
    244 }
    245 
    246 /* remove zone from tcp waiting list */
    247 static void
    248 tcp_zone_waiting_list_popfirst(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
    249 {
    250 	assert(zone->tcp_waiting);
    251 	set->tcp_waiting_first = zone->tcp_waiting_next;
    252 	if(zone->tcp_waiting_next)
    253 		zone->tcp_waiting_next->tcp_waiting_prev = NULL;
    254 	else	set->tcp_waiting_last = 0;
    255 	zone->tcp_waiting_next = 0;
    256 	zone->tcp_waiting = 0;
    257 }
    258 
    259 /* remove zone from tcp pipe write-wait list */
    260 static void
    261 tcp_pipe_sendlist_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
    262 {
    263 	if(zone->in_tcp_send) {
    264 		if(zone->tcp_send_prev)
    265 			zone->tcp_send_prev->tcp_send_next=zone->tcp_send_next;
    266 		else	tp->tcp_send_first=zone->tcp_send_next;
    267 		if(zone->tcp_send_next)
    268 			zone->tcp_send_next->tcp_send_prev=zone->tcp_send_prev;
    269 		else	tp->tcp_send_last=zone->tcp_send_prev;
    270 		zone->in_tcp_send = 0;
    271 	}
    272 }
    273 
    274 /* remove first from write-wait list */
    275 static void
    276 tcp_pipe_sendlist_popfirst(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
    277 {
    278 	tp->tcp_send_first = zone->tcp_send_next;
    279 	if(tp->tcp_send_first)
    280 		tp->tcp_send_first->tcp_send_prev = NULL;
    281 	else	tp->tcp_send_last = NULL;
    282 	zone->in_tcp_send = 0;
    283 }
    284 
    285 /* remove zone from tcp pipe ID map */
    286 static void
    287 tcp_pipe_id_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
    288 {
    289 	assert(tp->num_unused < ID_PIPE_NUM && tp->num_unused >= 0);
    290 	assert(tp->id[zone->query_id] == zone);
    291 	tp->id[zone->query_id] = NULL;
    292 	tp->unused[tp->num_unused] = zone->query_id;
    293 	/* must remove and re-add for sort order in tree */
    294 	(void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
    295 	tp->num_unused++;
    296 	(void)rbtree_insert(xfrd->tcp_set->pipetree, &tp->node);
    297 }
    298 
    299 /* stop the tcp pipe (and all its zones need to retry) */
    300 static void
    301 xfrd_tcp_pipe_stop(struct xfrd_tcp_pipeline* tp)
    302 {
    303 	int i, conn = -1;
    304 	assert(tp->num_unused < ID_PIPE_NUM); /* at least one 'in-use' */
    305 	assert(ID_PIPE_NUM - tp->num_unused > tp->num_skip); /* at least one 'nonskip' */
    306 	/* need to retry for all the zones connected to it */
    307 	/* these could use different lists and go to a different nextmaster*/
    308 	for(i=0; i<ID_PIPE_NUM; i++) {
    309 		if(tp->id[i] && tp->id[i] != TCP_NULL_SKIP) {
    310 			xfrd_zone_type* zone = tp->id[i];
    311 			conn = zone->tcp_conn;
    312 			zone->tcp_conn = -1;
    313 			zone->tcp_waiting = 0;
    314 			tcp_pipe_sendlist_remove(tp, zone);
    315 			tcp_pipe_id_remove(tp, zone);
    316 			xfrd_set_refresh_now(zone);
    317 		}
    318 	}
    319 	assert(conn != -1);
    320 	/* now release the entire tcp pipe */
    321 	xfrd_tcp_pipe_release(xfrd->tcp_set, tp, conn);
    322 }
    323 
    324 static void
    325 tcp_pipe_reset_timeout(struct xfrd_tcp_pipeline* tp)
    326 {
    327 	int fd = tp->handler.ev_fd;
    328 	struct timeval tv;
    329 	tv.tv_sec = xfrd->tcp_set->tcp_timeout;
    330 	tv.tv_usec = 0;
    331 	if(tp->handler_added)
    332 		event_del(&tp->handler);
    333 	memset(&tp->handler, 0, sizeof(tp->handler));
    334 	event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|
    335 		(tp->tcp_send_first?EV_WRITE:0), xfrd_handle_tcp_pipe, tp);
    336 	if(event_base_set(xfrd->event_base, &tp->handler) != 0)
    337 		log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
    338 	if(event_add(&tp->handler, &tv) != 0)
    339 		log_msg(LOG_ERR, "xfrd tcp: event_add failed");
    340 	tp->handler_added = 1;
    341 }
    342 
    343 /* handle event from fd of tcp pipe */
    344 void
    345 xfrd_handle_tcp_pipe(int ATTR_UNUSED(fd), short event, void* arg)
    346 {
    347 	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)arg;
    348 	if((event & EV_WRITE)) {
    349 		tcp_pipe_reset_timeout(tp);
    350 		if(tp->tcp_send_first) {
    351 			DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp write, zone %s",
    352 				tp->tcp_send_first->apex_str));
    353 			xfrd_tcp_write(tp, tp->tcp_send_first);
    354 		}
    355 	}
    356 	if((event & EV_READ) && tp->handler_added) {
    357 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp read"));
    358 		tcp_pipe_reset_timeout(tp);
    359 		xfrd_tcp_read(tp);
    360 	}
    361 	if((event & EV_TIMEOUT) && tp->handler_added) {
    362 		/* tcp connection timed out */
    363 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp timeout"));
    364 		xfrd_tcp_pipe_stop(tp);
    365 	}
    366 }
    367 
    368 /* add a zone to the pipeline, it starts to want to write its query */
    369 static void
    370 pipeline_setup_new_zone(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
    371 	xfrd_zone_type* zone)
    372 {
    373 	/* assign the ID */
    374 	int idx;
    375 	assert(tp->num_unused > 0);
    376 	/* we pick a random ID, even though it is TCP anyway */
    377 	idx = random_generate(tp->num_unused);
    378 	zone->query_id = tp->unused[idx];
    379 	tp->unused[idx] = tp->unused[tp->num_unused-1];
    380 	tp->id[zone->query_id] = zone;
    381 	/* decrement unused counter, and fixup tree */
    382 	(void)rbtree_delete(set->pipetree, &tp->node);
    383 	tp->num_unused--;
    384 	(void)rbtree_insert(set->pipetree, &tp->node);
    385 
    386 	/* add to sendlist, at end */
    387 	zone->tcp_send_next = NULL;
    388 	zone->tcp_send_prev = tp->tcp_send_last;
    389 	zone->in_tcp_send = 1;
    390 	if(tp->tcp_send_last)
    391 		tp->tcp_send_last->tcp_send_next = zone;
    392 	else	tp->tcp_send_first = zone;
    393 	tp->tcp_send_last = zone;
    394 
    395 	/* is it first in line? */
    396 	if(tp->tcp_send_first == zone) {
    397 		xfrd_tcp_setup_write_packet(tp, zone);
    398 		/* add write to event handler */
    399 		tcp_pipe_reset_timeout(tp);
    400 	}
    401 }
    402 
    403 void
    404 xfrd_tcp_obtain(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
    405 {
    406 	struct xfrd_tcp_pipeline* tp;
    407 	assert(zone->tcp_conn == -1);
    408 	assert(zone->tcp_waiting == 0);
    409 
    410 	if(set->tcp_count < XFRD_MAX_TCP) {
    411 		int i;
    412 		assert(!set->tcp_waiting_first);
    413 		set->tcp_count ++;
    414 		/* find a free tcp_buffer */
    415 		for(i=0; i<XFRD_MAX_TCP; i++) {
    416 			if(set->tcp_state[i]->tcp_r->fd == -1) {
    417 				zone->tcp_conn = i;
    418 				break;
    419 			}
    420 		}
    421 		/** What if there is no free tcp_buffer? return; */
    422 		if (zone->tcp_conn < 0) {
    423 			return;
    424 		}
    425 
    426 		tp = set->tcp_state[zone->tcp_conn];
    427 		zone->tcp_waiting = 0;
    428 
    429 		/* stop udp use (if any) */
    430 		if(zone->zone_handler.ev_fd != -1)
    431 			xfrd_udp_release(zone);
    432 
    433 		if(!xfrd_tcp_open(set, tp, zone)) {
    434 			zone->tcp_conn = -1;
    435 			set->tcp_count --;
    436 			xfrd_set_refresh_now(zone);
    437 			return;
    438 		}
    439 		/* ip and ip_len set by tcp_open */
    440 		tp->node.key = tp;
    441 		tp->num_unused = ID_PIPE_NUM;
    442 		tp->num_skip = 0;
    443 		tp->tcp_send_first = NULL;
    444 		tp->tcp_send_last = NULL;
    445 		memset(tp->id, 0, sizeof(tp->id));
    446 		for(i=0; i<ID_PIPE_NUM; i++) {
    447 			tp->unused[i] = i;
    448 		}
    449 
    450 		/* insert into tree */
    451 		(void)rbtree_insert(set->pipetree, &tp->node);
    452 		xfrd_deactivate_zone(zone);
    453 		xfrd_unset_timer(zone);
    454 		pipeline_setup_new_zone(set, tp, zone);
    455 		return;
    456 	}
    457 	/* check for a pipeline to the same master with unused ID */
    458 	if((tp = pipeline_find(set, zone))!= NULL) {
    459 		int i;
    460 		if(zone->zone_handler.ev_fd != -1)
    461 			xfrd_udp_release(zone);
    462 		for(i=0; i<XFRD_MAX_TCP; i++) {
    463 			if(set->tcp_state[i] == tp)
    464 				zone->tcp_conn = i;
    465 		}
    466 		xfrd_deactivate_zone(zone);
    467 		xfrd_unset_timer(zone);
    468 		pipeline_setup_new_zone(set, tp, zone);
    469 		return;
    470 	}
    471 
    472 	/* wait, at end of line */
    473 	DEBUG(DEBUG_XFRD,2, (LOG_INFO, "xfrd: max number of tcp "
    474 		"connections (%d) reached.", XFRD_MAX_TCP));
    475 	zone->tcp_waiting_next = 0;
    476 	zone->tcp_waiting_prev = set->tcp_waiting_last;
    477 	zone->tcp_waiting = 1;
    478 	if(!set->tcp_waiting_last) {
    479 		set->tcp_waiting_first = zone;
    480 		set->tcp_waiting_last = zone;
    481 	} else {
    482 		set->tcp_waiting_last->tcp_waiting_next = zone;
    483 		set->tcp_waiting_last = zone;
    484 	}
    485 	xfrd_deactivate_zone(zone);
    486 	xfrd_unset_timer(zone);
    487 }
    488 
    489 int
    490 xfrd_tcp_open(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
    491 	xfrd_zone_type* zone)
    492 {
    493 	int fd, family, conn;
    494 	struct timeval tv;
    495 	assert(zone->tcp_conn != -1);
    496 
    497 	/* if there is no next master, fallback to use the first one */
    498 	/* but there really should be a master set */
    499 	if(!zone->master) {
    500 		zone->master = zone->zone_options->pattern->request_xfr;
    501 		zone->master_num = 0;
    502 	}
    503 
    504 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s open tcp conn to %s",
    505 		zone->apex_str, zone->master->ip_address_spec));
    506 	tp->tcp_r->is_reading = 1;
    507 	tp->tcp_r->total_bytes = 0;
    508 	tp->tcp_r->msglen = 0;
    509 	buffer_clear(tp->tcp_r->packet);
    510 	tp->tcp_w->is_reading = 0;
    511 	tp->tcp_w->total_bytes = 0;
    512 	tp->tcp_w->msglen = 0;
    513 	tp->connection_established = 0;
    514 
    515 	if(zone->master->is_ipv6) {
    516 #ifdef INET6
    517 		family = PF_INET6;
    518 #else
    519 		xfrd_set_refresh_now(zone);
    520 		return 0;
    521 #endif
    522 	} else {
    523 		family = PF_INET;
    524 	}
    525 	fd = socket(family, SOCK_STREAM, IPPROTO_TCP);
    526 	if(fd == -1) {
    527 		/* squelch 'Address family not supported by protocol' at low
    528 		 * verbosity levels */
    529 		if(errno != EAFNOSUPPORT || verbosity > 2)
    530 		    log_msg(LOG_ERR, "xfrd: %s cannot create tcp socket: %s",
    531 			zone->master->ip_address_spec, strerror(errno));
    532 		xfrd_set_refresh_now(zone);
    533 		return 0;
    534 	}
    535 	if(fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
    536 		log_msg(LOG_ERR, "xfrd: fcntl failed: %s", strerror(errno));
    537 		close(fd);
    538 		xfrd_set_refresh_now(zone);
    539 		return 0;
    540 	}
    541 
    542 	if(xfrd->nsd->outgoing_tcp_mss > 0) {
    543 #if defined(IPPROTO_TCP) && defined(TCP_MAXSEG)
    544 		if(setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG,
    545 			(void*)&xfrd->nsd->outgoing_tcp_mss,
    546 			sizeof(xfrd->nsd->outgoing_tcp_mss)) < 0) {
    547 			log_msg(LOG_ERR, "xfrd: setsockopt(TCP_MAXSEG)"
    548 					"failed: %s", strerror(errno));
    549 		}
    550 #else
    551 		log_msg(LOG_ERR, "setsockopt(TCP_MAXSEG) unsupported");
    552 #endif
    553 	}
    554 
    555 	tp->ip_len = xfrd_acl_sockaddr_to(zone->master, &tp->ip);
    556 
    557 	/* bind it */
    558 	if (!xfrd_bind_local_interface(fd, zone->zone_options->pattern->
    559 		outgoing_interface, zone->master, 1)) {
    560 		close(fd);
    561 		xfrd_set_refresh_now(zone);
    562 		return 0;
    563         }
    564 
    565 	conn = connect(fd, (struct sockaddr*)&tp->ip, tp->ip_len);
    566 	if (conn == -1 && errno != EINPROGRESS) {
    567 		log_msg(LOG_ERR, "xfrd: connect %s failed: %s",
    568 			zone->master->ip_address_spec, strerror(errno));
    569 		close(fd);
    570 		xfrd_set_refresh_now(zone);
    571 		return 0;
    572 	}
    573 	tp->tcp_r->fd = fd;
    574 	tp->tcp_w->fd = fd;
    575 
    576 	/* set the tcp pipe event */
    577 	if(tp->handler_added)
    578 		event_del(&tp->handler);
    579 	memset(&tp->handler, 0, sizeof(tp->handler));
    580 	event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|EV_WRITE,
    581 		xfrd_handle_tcp_pipe, tp);
    582 	if(event_base_set(xfrd->event_base, &tp->handler) != 0)
    583 		log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
    584 	tv.tv_sec = set->tcp_timeout;
    585 	tv.tv_usec = 0;
    586 	if(event_add(&tp->handler, &tv) != 0)
    587 		log_msg(LOG_ERR, "xfrd tcp: event_add failed");
    588 	tp->handler_added = 1;
    589 	return 1;
    590 }
    591 
    592 void
    593 xfrd_tcp_setup_write_packet(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
    594 {
    595 	struct xfrd_tcp* tcp = tp->tcp_w;
    596 	assert(zone->tcp_conn != -1);
    597 	assert(zone->tcp_waiting == 0);
    598 	/* start AXFR or IXFR for the zone */
    599 	if(zone->soa_disk_acquired == 0 || zone->master->use_axfr_only ||
    600 		zone->master->ixfr_disabled ||
    601 		/* if zone expired, after the first round, do not ask for
    602 		 * IXFR any more, but full AXFR (of any serial number) */
    603 		(zone->state == xfrd_zone_expired && zone->round_num != 0)) {
    604 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request full zone transfer "
    605 						"(AXFR) for %s to %s",
    606 			zone->apex_str, zone->master->ip_address_spec));
    607 
    608 		xfrd_setup_packet(tcp->packet, TYPE_AXFR, CLASS_IN, zone->apex,
    609 			zone->query_id);
    610 		zone->query_type = TYPE_AXFR;
    611 	} else {
    612 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request incremental zone "
    613 						"transfer (IXFR) for %s to %s",
    614 			zone->apex_str, zone->master->ip_address_spec));
    615 
    616 		xfrd_setup_packet(tcp->packet, TYPE_IXFR, CLASS_IN, zone->apex,
    617 			zone->query_id);
    618 		zone->query_type = TYPE_IXFR;
    619         	NSCOUNT_SET(tcp->packet, 1);
    620 		xfrd_write_soa_buffer(tcp->packet, zone->apex, &zone->soa_disk);
    621 	}
    622 	/* old transfer needs to be removed still? */
    623 	if(zone->msg_seq_nr)
    624 		xfrd_unlink_xfrfile(xfrd->nsd, zone->xfrfilenumber);
    625 	zone->msg_seq_nr = 0;
    626 	zone->msg_rr_count = 0;
    627 	if(zone->master->key_options && zone->master->key_options->tsig_key) {
    628 		xfrd_tsig_sign_request(tcp->packet, &zone->tsig, zone->master);
    629 	}
    630 	buffer_flip(tcp->packet);
    631 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "sent tcp query with ID %d", zone->query_id));
    632 	tcp->msglen = buffer_limit(tcp->packet);
    633 	tcp->total_bytes = 0;
    634 }
    635 
    636 static void
    637 tcp_conn_ready_for_reading(struct xfrd_tcp* tcp)
    638 {
    639 	tcp->total_bytes = 0;
    640 	tcp->msglen = 0;
    641 	buffer_clear(tcp->packet);
    642 }
    643 
    644 int conn_write(struct xfrd_tcp* tcp)
    645 {
    646 	ssize_t sent;
    647 
    648 	if(tcp->total_bytes < sizeof(tcp->msglen)) {
    649 		uint16_t sendlen = htons(tcp->msglen);
    650 #ifdef HAVE_WRITEV
    651 		struct iovec iov[2];
    652 		iov[0].iov_base = (uint8_t*)&sendlen + tcp->total_bytes;
    653 		iov[0].iov_len = sizeof(sendlen) - tcp->total_bytes;
    654 		iov[1].iov_base = buffer_begin(tcp->packet);
    655 		iov[1].iov_len = buffer_limit(tcp->packet);
    656 		sent = writev(tcp->fd, iov, 2);
    657 #else /* HAVE_WRITEV */
    658 		sent = write(tcp->fd,
    659 			(const char*)&sendlen + tcp->total_bytes,
    660 			sizeof(tcp->msglen) - tcp->total_bytes);
    661 #endif /* HAVE_WRITEV */
    662 
    663 		if(sent == -1) {
    664 			if(errno == EAGAIN || errno == EINTR) {
    665 				/* write would block, try later */
    666 				return 0;
    667 			} else {
    668 				return -1;
    669 			}
    670 		}
    671 
    672 		tcp->total_bytes += sent;
    673 		if(sent > (ssize_t)sizeof(tcp->msglen))
    674 			buffer_skip(tcp->packet, sent-sizeof(tcp->msglen));
    675 		if(tcp->total_bytes < sizeof(tcp->msglen)) {
    676 			/* incomplete write, resume later */
    677 			return 0;
    678 		}
    679 #ifdef HAVE_WRITEV
    680 		if(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen)) {
    681 			/* packet done */
    682 			return 1;
    683 		}
    684 #endif
    685 		assert(tcp->total_bytes >= sizeof(tcp->msglen));
    686 	}
    687 
    688 	assert(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen));
    689 
    690 	sent = write(tcp->fd,
    691 		buffer_current(tcp->packet),
    692 		buffer_remaining(tcp->packet));
    693 	if(sent == -1) {
    694 		if(errno == EAGAIN || errno == EINTR) {
    695 			/* write would block, try later */
    696 			return 0;
    697 		} else {
    698 			return -1;
    699 		}
    700 	}
    701 
    702 	buffer_skip(tcp->packet, sent);
    703 	tcp->total_bytes += sent;
    704 
    705 	if(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen)) {
    706 		/* more to write when socket becomes writable again */
    707 		return 0;
    708 	}
    709 
    710 	assert(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen));
    711 	return 1;
    712 }
    713 
    714 void
    715 xfrd_tcp_write(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
    716 {
    717 	int ret;
    718 	struct xfrd_tcp* tcp = tp->tcp_w;
    719 	assert(zone->tcp_conn != -1);
    720 	assert(zone == tp->tcp_send_first);
    721 	/* see if for non-established connection, there is a connect error */
    722 	if(!tp->connection_established) {
    723 		/* check for pending error from nonblocking connect */
    724 		/* from Stevens, unix network programming, vol1, 3rd ed, p450 */
    725 		int error = 0;
    726 		socklen_t len = sizeof(error);
    727 		if(getsockopt(tcp->fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0){
    728 			error = errno; /* on solaris errno is error */
    729 		}
    730 		if(error == EINPROGRESS || error == EWOULDBLOCK)
    731 			return; /* try again later */
    732 		if(error != 0) {
    733 			log_msg(LOG_ERR, "%s: Could not tcp connect to %s: %s",
    734 				zone->apex_str, zone->master->ip_address_spec,
    735 				strerror(error));
    736 			xfrd_tcp_pipe_stop(tp);
    737 			return;
    738 		}
    739 	}
    740 	ret = conn_write(tcp);
    741 	if(ret == -1) {
    742 		log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
    743 		xfrd_tcp_pipe_stop(tp);
    744 		return;
    745 	}
    746 	if(tcp->total_bytes != 0 && !tp->connection_established)
    747 		tp->connection_established = 1;
    748 	if(ret == 0) {
    749 		return; /* write again later */
    750 	}
    751 	/* done writing this message */
    752 
    753 	/* remove first zone from sendlist */
    754 	tcp_pipe_sendlist_popfirst(tp, zone);
    755 
    756 	/* see if other zone wants to write; init; let it write (now) */
    757 	/* and use a loop, because 64k stack calls is a too much */
    758 	while(tp->tcp_send_first) {
    759 		/* setup to write for this zone */
    760 		xfrd_tcp_setup_write_packet(tp, tp->tcp_send_first);
    761 		/* attempt to write for this zone (if success, continue loop)*/
    762 		ret = conn_write(tcp);
    763 		if(ret == -1) {
    764 			log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
    765 			xfrd_tcp_pipe_stop(tp);
    766 			return;
    767 		}
    768 		if(ret == 0)
    769 			return; /* write again later */
    770 		tcp_pipe_sendlist_popfirst(tp, tp->tcp_send_first);
    771 	}
    772 
    773 	/* if sendlist empty, remove WRITE from event */
    774 
    775 	/* listen to READ, and not WRITE events */
    776 	assert(tp->tcp_send_first == NULL);
    777 	tcp_pipe_reset_timeout(tp);
    778 }
    779 
    780 int
    781 conn_read(struct xfrd_tcp* tcp)
    782 {
    783 	ssize_t received;
    784 	/* receive leading packet length bytes */
    785 	if(tcp->total_bytes < sizeof(tcp->msglen)) {
    786 		received = read(tcp->fd,
    787 			(char*) &tcp->msglen + tcp->total_bytes,
    788 			sizeof(tcp->msglen) - tcp->total_bytes);
    789 		if(received == -1) {
    790 			if(errno == EAGAIN || errno == EINTR) {
    791 				/* read would block, try later */
    792 				return 0;
    793 			} else {
    794 #ifdef ECONNRESET
    795 				if (verbosity >= 2 || errno != ECONNRESET)
    796 #endif /* ECONNRESET */
    797 				log_msg(LOG_ERR, "tcp read sz: %s", strerror(errno));
    798 				return -1;
    799 			}
    800 		} else if(received == 0) {
    801 			/* EOF */
    802 			return -1;
    803 		}
    804 		tcp->total_bytes += received;
    805 		if(tcp->total_bytes < sizeof(tcp->msglen)) {
    806 			/* not complete yet, try later */
    807 			return 0;
    808 		}
    809 
    810 		assert(tcp->total_bytes == sizeof(tcp->msglen));
    811 		tcp->msglen = ntohs(tcp->msglen);
    812 
    813 		if(tcp->msglen == 0) {
    814 			buffer_set_limit(tcp->packet, tcp->msglen);
    815 			return 1;
    816 		}
    817 		if(tcp->msglen > buffer_capacity(tcp->packet)) {
    818 			log_msg(LOG_ERR, "buffer too small, dropping connection");
    819 			return 0;
    820 		}
    821 		buffer_set_limit(tcp->packet, tcp->msglen);
    822 	}
    823 
    824 	assert(buffer_remaining(tcp->packet) > 0);
    825 
    826 	received = read(tcp->fd, buffer_current(tcp->packet),
    827 		buffer_remaining(tcp->packet));
    828 	if(received == -1) {
    829 		if(errno == EAGAIN || errno == EINTR) {
    830 			/* read would block, try later */
    831 			return 0;
    832 		} else {
    833 #ifdef ECONNRESET
    834 			if (verbosity >= 2 || errno != ECONNRESET)
    835 #endif /* ECONNRESET */
    836 			log_msg(LOG_ERR, "tcp read %s", strerror(errno));
    837 			return -1;
    838 		}
    839 	} else if(received == 0) {
    840 		/* EOF */
    841 		return -1;
    842 	}
    843 
    844 	tcp->total_bytes += received;
    845 	buffer_skip(tcp->packet, received);
    846 
    847 	if(buffer_remaining(tcp->packet) > 0) {
    848 		/* not complete yet, wait for more */
    849 		return 0;
    850 	}
    851 
    852 	/* completed */
    853 	assert(buffer_position(tcp->packet) == tcp->msglen);
    854 	return 1;
    855 }
    856 
    857 void
    858 xfrd_tcp_read(struct xfrd_tcp_pipeline* tp)
    859 {
    860 	xfrd_zone_type* zone;
    861 	struct xfrd_tcp* tcp = tp->tcp_r;
    862 	int ret;
    863 	enum xfrd_packet_result pkt_result;
    864 
    865 	ret = conn_read(tcp);
    866 	if(ret == -1) {
    867 		xfrd_tcp_pipe_stop(tp);
    868 		return;
    869 	}
    870 	if(ret == 0)
    871 		return;
    872 	/* completed msg */
    873 	buffer_flip(tcp->packet);
    874 	/* see which ID number it is, if skip, handle skip, NULL: warn */
    875 	if(tcp->msglen < QHEADERSZ) {
    876 		/* too short for DNS header, skip it */
    877 		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
    878 			"xfrd: tcp skip response that is too short"));
    879 		tcp_conn_ready_for_reading(tcp);
    880 		return;
    881 	}
    882 	zone = tp->id[ID(tcp->packet)];
    883 	if(!zone || zone == TCP_NULL_SKIP) {
    884 		/* no zone for this id? skip it */
    885 		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
    886 			"xfrd: tcp skip response with %s ID",
    887 			zone?"set-to-skip":"unknown"));
    888 		tcp_conn_ready_for_reading(tcp);
    889 		return;
    890 	}
    891 	assert(zone->tcp_conn != -1);
    892 
    893 	/* handle message for zone */
    894 	pkt_result = xfrd_handle_received_xfr_packet(zone, tcp->packet);
    895 	/* setup for reading the next packet on this connection */
    896 	tcp_conn_ready_for_reading(tcp);
    897 	switch(pkt_result) {
    898 		case xfrd_packet_more:
    899 			/* wait for next packet */
    900 			break;
    901 		case xfrd_packet_newlease:
    902 			/* set to skip if more packets with this ID */
    903 			tp->id[zone->query_id] = TCP_NULL_SKIP;
    904 			tp->num_skip++;
    905 			/* fall through to remove zone from tp */
    906 			/* fallthrough */
    907 		case xfrd_packet_transfer:
    908 			if(zone->zone_options->pattern->multi_master_check) {
    909 				xfrd_tcp_release(xfrd->tcp_set, zone);
    910 				xfrd_make_request(zone);
    911 				break;
    912 			}
    913 			xfrd_tcp_release(xfrd->tcp_set, zone);
    914 			assert(zone->round_num == -1);
    915 			break;
    916 		case xfrd_packet_notimpl:
    917 			xfrd_disable_ixfr(zone);
    918 			xfrd_tcp_release(xfrd->tcp_set, zone);
    919 			/* query next server */
    920 			xfrd_make_request(zone);
    921 			break;
    922 		case xfrd_packet_bad:
    923 		case xfrd_packet_tcp:
    924 		default:
    925 			/* set to skip if more packets with this ID */
    926 			tp->id[zone->query_id] = TCP_NULL_SKIP;
    927 			tp->num_skip++;
    928 			xfrd_tcp_release(xfrd->tcp_set, zone);
    929 			/* query next server */
    930 			xfrd_make_request(zone);
    931 			break;
    932 	}
    933 }
    934 
    935 void
    936 xfrd_tcp_release(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
    937 {
    938 	int conn = zone->tcp_conn;
    939 	struct xfrd_tcp_pipeline* tp = set->tcp_state[conn];
    940 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s released tcp conn to %s",
    941 		zone->apex_str, zone->master->ip_address_spec));
    942 	assert(zone->tcp_conn != -1);
    943 	assert(zone->tcp_waiting == 0);
    944 	zone->tcp_conn = -1;
    945 	zone->tcp_waiting = 0;
    946 
    947 	/* remove from tcp_send list */
    948 	tcp_pipe_sendlist_remove(tp, zone);
    949 	/* remove it from the ID list */
    950 	if(tp->id[zone->query_id] != TCP_NULL_SKIP)
    951 		tcp_pipe_id_remove(tp, zone);
    952 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: released tcp pipe now %d unused",
    953 		tp->num_unused));
    954 	/* if pipe was full, but no more, then see if waiting element is
    955 	 * for the same master, and can fill the unused ID */
    956 	if(tp->num_unused == 1 && set->tcp_waiting_first) {
    957 #ifdef INET6
    958 		struct sockaddr_storage to;
    959 #else
    960 		struct sockaddr_in to;
    961 #endif
    962 		socklen_t to_len = xfrd_acl_sockaddr_to(
    963 			set->tcp_waiting_first->master, &to);
    964 		if(to_len == tp->ip_len && memcmp(&to, &tp->ip, to_len) == 0) {
    965 			/* use this connection for the waiting zone */
    966 			zone = set->tcp_waiting_first;
    967 			assert(zone->tcp_conn == -1);
    968 			zone->tcp_conn = conn;
    969 			tcp_zone_waiting_list_popfirst(set, zone);
    970 			if(zone->zone_handler.ev_fd != -1)
    971 				xfrd_udp_release(zone);
    972 			xfrd_unset_timer(zone);
    973 			pipeline_setup_new_zone(set, tp, zone);
    974 			return;
    975 		}
    976 		/* waiting zone did not go to same server */
    977 	}
    978 
    979 	/* if all unused, or only skipped leftover, close the pipeline */
    980 	if(tp->num_unused >= ID_PIPE_NUM || tp->num_skip >= ID_PIPE_NUM - tp->num_unused)
    981 		xfrd_tcp_pipe_release(set, tp, conn);
    982 }
    983 
    984 void
    985 xfrd_tcp_pipe_release(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
    986 	int conn)
    987 {
    988 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: tcp pipe released"));
    989 	/* one handler per tcp pipe */
    990 	if(tp->handler_added)
    991 		event_del(&tp->handler);
    992 	tp->handler_added = 0;
    993 
    994 	/* fd in tcp_r and tcp_w is the same, close once */
    995 	if(tp->tcp_r->fd != -1)
    996 		close(tp->tcp_r->fd);
    997 	tp->tcp_r->fd = -1;
    998 	tp->tcp_w->fd = -1;
    999 
   1000 	/* remove from pipetree */
   1001 	(void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
   1002 
   1003 	/* a waiting zone can use the free tcp slot (to another server) */
   1004 	/* if that zone fails to set-up or connect, we try to start the next
   1005 	 * waiting zone in the list */
   1006 	while(set->tcp_count == XFRD_MAX_TCP && set->tcp_waiting_first) {
   1007 		int i;
   1008 
   1009 		/* pop first waiting process */
   1010 		xfrd_zone_type* zone = set->tcp_waiting_first;
   1011 		/* start it */
   1012 		assert(zone->tcp_conn == -1);
   1013 		zone->tcp_conn = conn;
   1014 		tcp_zone_waiting_list_popfirst(set, zone);
   1015 
   1016 		/* stop udp (if any) */
   1017 		if(zone->zone_handler.ev_fd != -1)
   1018 			xfrd_udp_release(zone);
   1019 		if(!xfrd_tcp_open(set, tp, zone)) {
   1020 			zone->tcp_conn = -1;
   1021 			xfrd_set_refresh_now(zone);
   1022 			/* try to start the next zone (if any) */
   1023 			continue;
   1024 		}
   1025 		/* re-init this tcppipe */
   1026 		/* ip and ip_len set by tcp_open */
   1027 		tp->node.key = tp;
   1028 		tp->num_unused = ID_PIPE_NUM;
   1029 		tp->num_skip = 0;
   1030 		tp->tcp_send_first = NULL;
   1031 		tp->tcp_send_last = NULL;
   1032 		memset(tp->id, 0, sizeof(tp->id));
   1033 		for(i=0; i<ID_PIPE_NUM; i++) {
   1034 			tp->unused[i] = i;
   1035 		}
   1036 
   1037 		/* insert into tree */
   1038 		(void)rbtree_insert(set->pipetree, &tp->node);
   1039 		/* setup write */
   1040 		xfrd_unset_timer(zone);
   1041 		pipeline_setup_new_zone(set, tp, zone);
   1042 		/* started a task, no need for cleanups, so return */
   1043 		return;
   1044 	}
   1045 	/* no task to start, cleanup */
   1046 	assert(!set->tcp_waiting_first);
   1047 	set->tcp_count --;
   1048 	assert(set->tcp_count >= 0);
   1049 }
   1050 
   1051