Home | History | Annotate | Line # | Download | only in dist
xfrd-tcp.c revision 1.1
      1 /*
      2  * xfrd-tcp.c - XFR (transfer) Daemon TCP system source file. Manages tcp conn.
      3  *
      4  * Copyright (c) 2001-2006, NLnet Labs. All rights reserved.
      5  *
      6  * See LICENSE for the license.
      7  *
      8  */
      9 
     10 #include "config.h"
     11 #include <assert.h>
     12 #include <errno.h>
     13 #include <fcntl.h>
     14 #include <unistd.h>
     15 #include <stdlib.h>
     16 #include "nsd.h"
     17 #include "xfrd-tcp.h"
     18 #include "buffer.h"
     19 #include "packet.h"
     20 #include "dname.h"
     21 #include "options.h"
     22 #include "namedb.h"
     23 #include "xfrd.h"
     24 #include "xfrd-disk.h"
     25 #include "util.h"
     26 
     27 /* sort tcppipe, first on IP address, for an IPaddresss, sort on num_unused */
     28 static int
     29 xfrd_pipe_cmp(const void* a, const void* b)
     30 {
     31 	const struct xfrd_tcp_pipeline* x = (struct xfrd_tcp_pipeline*)a;
     32 	const struct xfrd_tcp_pipeline* y = (struct xfrd_tcp_pipeline*)b;
     33 	int r;
     34 	if(x == y)
     35 		return 0;
     36 	if(y->ip_len != x->ip_len)
     37 		/* subtraction works because nonnegative and small numbers */
     38 		return (int)y->ip_len - (int)x->ip_len;
     39 	r = memcmp(&x->ip, &y->ip, x->ip_len);
     40 	if(r != 0)
     41 		return r;
     42 	/* sort that num_unused is sorted ascending, */
     43 	if(x->num_unused != y->num_unused) {
     44 		return (x->num_unused < y->num_unused) ? -1 : 1;
     45 	}
     46 	/* different pipelines are different still, even with same numunused*/
     47 	return (uintptr_t)x < (uintptr_t)y ? -1 : 1;
     48 }
     49 
     50 xfrd_tcp_set_t* xfrd_tcp_set_create(struct region* region)
     51 {
     52 	int i;
     53 	xfrd_tcp_set_t* tcp_set = region_alloc(region, sizeof(xfrd_tcp_set_t));
     54 	memset(tcp_set, 0, sizeof(xfrd_tcp_set_t));
     55 	tcp_set->tcp_count = 0;
     56 	tcp_set->tcp_waiting_first = 0;
     57 	tcp_set->tcp_waiting_last = 0;
     58 	for(i=0; i<XFRD_MAX_TCP; i++)
     59 		tcp_set->tcp_state[i] = xfrd_tcp_pipeline_create(region);
     60 	tcp_set->pipetree = rbtree_create(region, &xfrd_pipe_cmp);
     61 	return tcp_set;
     62 }
     63 
     64 struct xfrd_tcp_pipeline*
     65 xfrd_tcp_pipeline_create(region_type* region)
     66 {
     67 	int i;
     68 	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)
     69 		region_alloc_zero(region, sizeof(*tp));
     70 	tp->num_unused = ID_PIPE_NUM;
     71 	assert(sizeof(tp->unused)/sizeof(tp->unused[0]) == ID_PIPE_NUM);
     72 	for(i=0; i<ID_PIPE_NUM; i++)
     73 		tp->unused[i] = (uint16_t)i;
     74 	tp->tcp_r = xfrd_tcp_create(region, QIOBUFSZ);
     75 	tp->tcp_w = xfrd_tcp_create(region, 512);
     76 	return tp;
     77 }
     78 
     79 void
     80 xfrd_setup_packet(buffer_type* packet,
     81 	uint16_t type, uint16_t klass, const dname_type* dname, uint16_t qid)
     82 {
     83 	/* Set up the header */
     84 	buffer_clear(packet);
     85 	ID_SET(packet, qid);
     86 	FLAGS_SET(packet, 0);
     87 	OPCODE_SET(packet, OPCODE_QUERY);
     88 	QDCOUNT_SET(packet, 1);
     89 	ANCOUNT_SET(packet, 0);
     90 	NSCOUNT_SET(packet, 0);
     91 	ARCOUNT_SET(packet, 0);
     92 	buffer_skip(packet, QHEADERSZ);
     93 
     94 	/* The question record. */
     95 	buffer_write(packet, dname_name(dname), dname->name_size);
     96 	buffer_write_u16(packet, type);
     97 	buffer_write_u16(packet, klass);
     98 }
     99 
    100 static socklen_t
    101 #ifdef INET6
    102 xfrd_acl_sockaddr(acl_options_t* acl, unsigned int port,
    103 	struct sockaddr_storage *sck)
    104 #else
    105 xfrd_acl_sockaddr(acl_options_t* acl, unsigned int port,
    106 	struct sockaddr_in *sck, const char* fromto)
    107 #endif /* INET6 */
    108 {
    109 	/* setup address structure */
    110 #ifdef INET6
    111 	memset(sck, 0, sizeof(struct sockaddr_storage));
    112 #else
    113 	memset(sck, 0, sizeof(struct sockaddr_in));
    114 #endif
    115 	if(acl->is_ipv6) {
    116 #ifdef INET6
    117 		struct sockaddr_in6* sa = (struct sockaddr_in6*)sck;
    118 		sa->sin6_family = AF_INET6;
    119 		sa->sin6_port = htons(port);
    120 		sa->sin6_addr = acl->addr.addr6;
    121 		return sizeof(struct sockaddr_in6);
    122 #else
    123 		log_msg(LOG_ERR, "xfrd: IPv6 connection %s %s attempted but no \
    124 INET6.", fromto, acl->ip_address_spec);
    125 		return 0;
    126 #endif
    127 	} else {
    128 		struct sockaddr_in* sa = (struct sockaddr_in*)sck;
    129 		sa->sin_family = AF_INET;
    130 		sa->sin_port = htons(port);
    131 		sa->sin_addr = acl->addr.addr;
    132 		return sizeof(struct sockaddr_in);
    133 	}
    134 }
    135 
    136 socklen_t
    137 #ifdef INET6
    138 xfrd_acl_sockaddr_to(acl_options_t* acl, struct sockaddr_storage *to)
    139 #else
    140 xfrd_acl_sockaddr_to(acl_options_t* acl, struct sockaddr_in *to)
    141 #endif /* INET6 */
    142 {
    143 	unsigned int port = acl->port?acl->port:(unsigned)atoi(TCP_PORT);
    144 #ifdef INET6
    145 	return xfrd_acl_sockaddr(acl, port, to);
    146 #else
    147 	return xfrd_acl_sockaddr(acl, port, to, "to");
    148 #endif /* INET6 */
    149 }
    150 
    151 socklen_t
    152 #ifdef INET6
    153 xfrd_acl_sockaddr_frm(acl_options_t* acl, struct sockaddr_storage *frm)
    154 #else
    155 xfrd_acl_sockaddr_frm(acl_options_t* acl, struct sockaddr_in *frm)
    156 #endif /* INET6 */
    157 {
    158 	unsigned int port = acl->port?acl->port:0;
    159 #ifdef INET6
    160 	return xfrd_acl_sockaddr(acl, port, frm);
    161 #else
    162 	return xfrd_acl_sockaddr(acl, port, frm, "from");
    163 #endif /* INET6 */
    164 }
    165 
    166 void
    167 xfrd_write_soa_buffer(struct buffer* packet,
    168 	const dname_type* apex, struct xfrd_soa* soa)
    169 {
    170 	size_t rdlength_pos;
    171 	uint16_t rdlength;
    172 	buffer_write(packet, dname_name(apex), apex->name_size);
    173 
    174 	/* already in network order */
    175 	buffer_write(packet, &soa->type, sizeof(soa->type));
    176 	buffer_write(packet, &soa->klass, sizeof(soa->klass));
    177 	buffer_write(packet, &soa->ttl, sizeof(soa->ttl));
    178 	rdlength_pos = buffer_position(packet);
    179 	buffer_skip(packet, sizeof(rdlength));
    180 
    181 	/* uncompressed dnames */
    182 	buffer_write(packet, soa->prim_ns+1, soa->prim_ns[0]);
    183 	buffer_write(packet, soa->email+1, soa->email[0]);
    184 
    185 	buffer_write(packet, &soa->serial, sizeof(uint32_t));
    186 	buffer_write(packet, &soa->refresh, sizeof(uint32_t));
    187 	buffer_write(packet, &soa->retry, sizeof(uint32_t));
    188 	buffer_write(packet, &soa->expire, sizeof(uint32_t));
    189 	buffer_write(packet, &soa->minimum, sizeof(uint32_t));
    190 
    191 	/* write length of RR */
    192 	rdlength = buffer_position(packet) - rdlength_pos - sizeof(rdlength);
    193 	buffer_write_u16_at(packet, rdlength_pos, rdlength);
    194 }
    195 
    196 xfrd_tcp_t*
    197 xfrd_tcp_create(region_type* region, size_t bufsize)
    198 {
    199 	xfrd_tcp_t* tcp_state = (xfrd_tcp_t*)region_alloc(
    200 		region, sizeof(xfrd_tcp_t));
    201 	memset(tcp_state, 0, sizeof(xfrd_tcp_t));
    202 	tcp_state->packet = buffer_create(region, bufsize);
    203 	tcp_state->fd = -1;
    204 
    205 	return tcp_state;
    206 }
    207 
    208 static struct xfrd_tcp_pipeline*
    209 pipeline_find(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
    210 {
    211 	rbnode_t* sme = NULL;
    212 	struct xfrd_tcp_pipeline* r;
    213 	/* smaller buf than a full pipeline with 64kb ID array, only need
    214 	 * the front part with the key info, this front part contains the
    215 	 * members that the compare function uses. */
    216 	const size_t keysize = sizeof(struct xfrd_tcp_pipeline) -
    217 		ID_PIPE_NUM*(sizeof(struct xfrd_zone*) + sizeof(uint16_t));
    218 	/* void* type for alignment of the struct,
    219 	 * divide the keysize by ptr-size and then add one to round up */
    220 	void* buf[ (keysize / sizeof(void*)) + 1 ];
    221 	struct xfrd_tcp_pipeline* key = (struct xfrd_tcp_pipeline*)buf;
    222 	key->node.key = key;
    223 	key->ip_len = xfrd_acl_sockaddr_to(zone->master, &key->ip);
    224 	key->num_unused = ID_PIPE_NUM;
    225 	/* lookup existing tcp transfer to the master with highest unused */
    226 	if(rbtree_find_less_equal(set->pipetree, key, &sme)) {
    227 		/* exact match, strange, fully unused tcp cannot be open */
    228 		assert(0);
    229 	}
    230 	if(!sme)
    231 		return NULL;
    232 	r = (struct xfrd_tcp_pipeline*)sme->key;
    233 	/* <= key pointed at, is the master correct ? */
    234 	if(r->ip_len != key->ip_len)
    235 		return NULL;
    236 	if(memcmp(&r->ip, &key->ip, key->ip_len) != 0)
    237 		return NULL;
    238 	/* correct master, is there a slot free for this transfer? */
    239 	if(r->num_unused == 0)
    240 		return NULL;
    241 	return r;
    242 }
    243 
    244 /* remove zone from tcp waiting list */
    245 static void
    246 tcp_zone_waiting_list_popfirst(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
    247 {
    248 	assert(zone->tcp_waiting);
    249 	set->tcp_waiting_first = zone->tcp_waiting_next;
    250 	if(zone->tcp_waiting_next)
    251 		zone->tcp_waiting_next->tcp_waiting_prev = NULL;
    252 	else	set->tcp_waiting_last = 0;
    253 	zone->tcp_waiting_next = 0;
    254 	zone->tcp_waiting = 0;
    255 }
    256 
    257 /* remove zone from tcp pipe write-wait list */
    258 static void
    259 tcp_pipe_sendlist_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
    260 {
    261 	if(zone->in_tcp_send) {
    262 		if(zone->tcp_send_prev)
    263 			zone->tcp_send_prev->tcp_send_next=zone->tcp_send_next;
    264 		else	tp->tcp_send_first=zone->tcp_send_next;
    265 		if(zone->tcp_send_next)
    266 			zone->tcp_send_next->tcp_send_prev=zone->tcp_send_prev;
    267 		else	tp->tcp_send_last=zone->tcp_send_prev;
    268 		zone->in_tcp_send = 0;
    269 	}
    270 }
    271 
    272 /* remove first from write-wait list */
    273 static void
    274 tcp_pipe_sendlist_popfirst(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
    275 {
    276 	tp->tcp_send_first = zone->tcp_send_next;
    277 	if(tp->tcp_send_first)
    278 		tp->tcp_send_first->tcp_send_prev = NULL;
    279 	else	tp->tcp_send_last = NULL;
    280 	zone->in_tcp_send = 0;
    281 }
    282 
    283 /* remove zone from tcp pipe ID map */
    284 static void
    285 tcp_pipe_id_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
    286 {
    287 	assert(tp->num_unused < ID_PIPE_NUM && tp->num_unused >= 0);
    288 	assert(tp->id[zone->query_id] == zone);
    289 	tp->id[zone->query_id] = NULL;
    290 	tp->unused[tp->num_unused] = zone->query_id;
    291 	/* must remove and re-add for sort order in tree */
    292 	(void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
    293 	tp->num_unused++;
    294 	(void)rbtree_insert(xfrd->tcp_set->pipetree, &tp->node);
    295 }
    296 
    297 /* stop the tcp pipe (and all its zones need to retry) */
    298 static void
    299 xfrd_tcp_pipe_stop(struct xfrd_tcp_pipeline* tp)
    300 {
    301 	int i, conn = -1;
    302 	assert(tp->num_unused < ID_PIPE_NUM); /* at least one 'in-use' */
    303 	assert(ID_PIPE_NUM - tp->num_unused > tp->num_skip); /* at least one 'nonskip' */
    304 	/* need to retry for all the zones connected to it */
    305 	/* these could use different lists and go to a different nextmaster*/
    306 	for(i=0; i<ID_PIPE_NUM; i++) {
    307 		if(tp->id[i] && tp->id[i] != TCP_NULL_SKIP) {
    308 			xfrd_zone_t* zone = tp->id[i];
    309 			conn = zone->tcp_conn;
    310 			zone->tcp_conn = -1;
    311 			zone->tcp_waiting = 0;
    312 			tcp_pipe_sendlist_remove(tp, zone);
    313 			tcp_pipe_id_remove(tp, zone);
    314 			xfrd_set_refresh_now(zone);
    315 		}
    316 	}
    317 	assert(conn != -1);
    318 	/* now release the entire tcp pipe */
    319 	xfrd_tcp_pipe_release(xfrd->tcp_set, tp, conn);
    320 }
    321 
    322 static void
    323 tcp_pipe_reset_timeout(struct xfrd_tcp_pipeline* tp)
    324 {
    325 	int fd = tp->handler.ev_fd;
    326 	struct timeval tv;
    327 	tv.tv_sec = xfrd->tcp_set->tcp_timeout;
    328 	tv.tv_usec = 0;
    329 	if(tp->handler_added)
    330 		event_del(&tp->handler);
    331 	event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|
    332 		(tp->tcp_send_first?EV_WRITE:0), xfrd_handle_tcp_pipe, tp);
    333 	if(event_base_set(xfrd->event_base, &tp->handler) != 0)
    334 		log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
    335 	if(event_add(&tp->handler, &tv) != 0)
    336 		log_msg(LOG_ERR, "xfrd tcp: event_add failed");
    337 	tp->handler_added = 1;
    338 }
    339 
    340 /* handle event from fd of tcp pipe */
    341 void
    342 xfrd_handle_tcp_pipe(int ATTR_UNUSED(fd), short event, void* arg)
    343 {
    344 	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)arg;
    345 	if((event & EV_WRITE)) {
    346 		tcp_pipe_reset_timeout(tp);
    347 		if(tp->tcp_send_first) {
    348 			DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp write, zone %s",
    349 				tp->tcp_send_first->apex_str));
    350 			xfrd_tcp_write(tp, tp->tcp_send_first);
    351 		}
    352 	}
    353 	if((event & EV_READ) && tp->handler_added) {
    354 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp read"));
    355 		tcp_pipe_reset_timeout(tp);
    356 		xfrd_tcp_read(tp);
    357 	}
    358 	if((event & EV_TIMEOUT) && tp->handler_added) {
    359 		/* tcp connection timed out */
    360 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp timeout"));
    361 		xfrd_tcp_pipe_stop(tp);
    362 	}
    363 }
    364 
    365 /* add a zone to the pipeline, it starts to want to write its query */
    366 static void
    367 pipeline_setup_new_zone(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp,
    368 	xfrd_zone_t* zone)
    369 {
    370 	/* assign the ID */
    371 	int idx;
    372 	assert(tp->num_unused > 0);
    373 	/* we pick a random ID, even though it is TCP anyway */
    374 	idx = random_generate(tp->num_unused);
    375 	zone->query_id = tp->unused[idx];
    376 	tp->unused[idx] = tp->unused[tp->num_unused-1];
    377 	tp->id[zone->query_id] = zone;
    378 	/* decrement unused counter, and fixup tree */
    379 	(void)rbtree_delete(set->pipetree, &tp->node);
    380 	tp->num_unused--;
    381 	(void)rbtree_insert(set->pipetree, &tp->node);
    382 
    383 	/* add to sendlist, at end */
    384 	zone->tcp_send_next = NULL;
    385 	zone->tcp_send_prev = tp->tcp_send_last;
    386 	zone->in_tcp_send = 1;
    387 	if(tp->tcp_send_last)
    388 		tp->tcp_send_last->tcp_send_next = zone;
    389 	else	tp->tcp_send_first = zone;
    390 	tp->tcp_send_last = zone;
    391 
    392 	/* is it first in line? */
    393 	if(tp->tcp_send_first == zone) {
    394 		xfrd_tcp_setup_write_packet(tp, zone);
    395 		/* add write to event handler */
    396 		tcp_pipe_reset_timeout(tp);
    397 	}
    398 }
    399 
    400 void
    401 xfrd_tcp_obtain(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
    402 {
    403 	struct xfrd_tcp_pipeline* tp;
    404 	assert(zone->tcp_conn == -1);
    405 	assert(zone->tcp_waiting == 0);
    406 
    407 	if(set->tcp_count < XFRD_MAX_TCP) {
    408 		int i;
    409 		assert(!set->tcp_waiting_first);
    410 		set->tcp_count ++;
    411 		/* find a free tcp_buffer */
    412 		for(i=0; i<XFRD_MAX_TCP; i++) {
    413 			if(set->tcp_state[i]->tcp_r->fd == -1) {
    414 				zone->tcp_conn = i;
    415 				break;
    416 			}
    417 		}
    418 		/** What if there is no free tcp_buffer? return; */
    419 		if (zone->tcp_conn < 0) {
    420 			return;
    421 		}
    422 
    423 		tp = set->tcp_state[zone->tcp_conn];
    424 		zone->tcp_waiting = 0;
    425 
    426 		/* stop udp use (if any) */
    427 		if(zone->zone_handler.ev_fd != -1)
    428 			xfrd_udp_release(zone);
    429 
    430 		if(!xfrd_tcp_open(set, tp, zone)) {
    431 			zone->tcp_conn = -1;
    432 			set->tcp_count --;
    433 			xfrd_set_refresh_now(zone);
    434 			return;
    435 		}
    436 		/* ip and ip_len set by tcp_open */
    437 		tp->node.key = tp;
    438 		tp->num_unused = ID_PIPE_NUM;
    439 		tp->num_skip = 0;
    440 		tp->tcp_send_first = NULL;
    441 		tp->tcp_send_last = NULL;
    442 		memset(tp->id, 0, sizeof(tp->id));
    443 		for(i=0; i<ID_PIPE_NUM; i++) {
    444 			tp->unused[i] = i;
    445 		}
    446 
    447 		/* insert into tree */
    448 		(void)rbtree_insert(set->pipetree, &tp->node);
    449 		xfrd_deactivate_zone(zone);
    450 		xfrd_unset_timer(zone);
    451 		pipeline_setup_new_zone(set, tp, zone);
    452 		return;
    453 	}
    454 	/* check for a pipeline to the same master with unused ID */
    455 	if((tp = pipeline_find(set, zone))!= NULL) {
    456 		int i;
    457 		if(zone->zone_handler.ev_fd != -1)
    458 			xfrd_udp_release(zone);
    459 		for(i=0; i<XFRD_MAX_TCP; i++) {
    460 			if(set->tcp_state[i] == tp)
    461 				zone->tcp_conn = i;
    462 		}
    463 		xfrd_deactivate_zone(zone);
    464 		xfrd_unset_timer(zone);
    465 		pipeline_setup_new_zone(set, tp, zone);
    466 		return;
    467 	}
    468 
    469 	/* wait, at end of line */
    470 	DEBUG(DEBUG_XFRD,2, (LOG_INFO, "xfrd: max number of tcp "
    471 		"connections (%d) reached.", XFRD_MAX_TCP));
    472 	zone->tcp_waiting_next = 0;
    473 	zone->tcp_waiting_prev = set->tcp_waiting_last;
    474 	zone->tcp_waiting = 1;
    475 	if(!set->tcp_waiting_last) {
    476 		set->tcp_waiting_first = zone;
    477 		set->tcp_waiting_last = zone;
    478 	} else {
    479 		set->tcp_waiting_last->tcp_waiting_next = zone;
    480 		set->tcp_waiting_last = zone;
    481 	}
    482 	xfrd_deactivate_zone(zone);
    483 	xfrd_unset_timer(zone);
    484 }
    485 
    486 int
    487 xfrd_tcp_open(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp,
    488 	xfrd_zone_t* zone)
    489 {
    490 	int fd, family, conn;
    491 	struct timeval tv;
    492 	assert(zone->tcp_conn != -1);
    493 
    494 	/* if there is no next master, fallback to use the first one */
    495 	/* but there really should be a master set */
    496 	if(!zone->master) {
    497 		zone->master = zone->zone_options->pattern->request_xfr;
    498 		zone->master_num = 0;
    499 	}
    500 
    501 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s open tcp conn to %s",
    502 		zone->apex_str, zone->master->ip_address_spec));
    503 	tp->tcp_r->is_reading = 1;
    504 	tp->tcp_r->total_bytes = 0;
    505 	tp->tcp_r->msglen = 0;
    506 	buffer_clear(tp->tcp_r->packet);
    507 	tp->tcp_w->is_reading = 0;
    508 	tp->tcp_w->total_bytes = 0;
    509 	tp->tcp_w->msglen = 0;
    510 	tp->connection_established = 0;
    511 
    512 	if(zone->master->is_ipv6) {
    513 #ifdef INET6
    514 		family = PF_INET6;
    515 #else
    516 		xfrd_set_refresh_now(zone);
    517 		return 0;
    518 #endif
    519 	} else {
    520 		family = PF_INET;
    521 	}
    522 	fd = socket(family, SOCK_STREAM, IPPROTO_TCP);
    523 	if(fd == -1) {
    524 		log_msg(LOG_ERR, "xfrd: %s cannot create tcp socket: %s",
    525 			zone->master->ip_address_spec, strerror(errno));
    526 		xfrd_set_refresh_now(zone);
    527 		return 0;
    528 	}
    529 	if(fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
    530 		log_msg(LOG_ERR, "xfrd: fcntl failed: %s", strerror(errno));
    531 		close(fd);
    532 		xfrd_set_refresh_now(zone);
    533 		return 0;
    534 	}
    535 
    536 	if(xfrd->nsd->outgoing_tcp_mss > 0) {
    537 #if defined(IPPROTO_TCP) && defined(TCP_MAXSEG)
    538 		if(setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG,
    539 			(void*)&xfrd->nsd->outgoing_tcp_mss,
    540 			sizeof(xfrd->nsd->outgoing_tcp_mss)) < 0) {
    541 			log_msg(LOG_ERR, "xfrd: setsockopt(TCP_MAXSEG)"
    542 					"failed: %s", strerror(errno));
    543 		}
    544 #else
    545 		log_msg(LOG_ERR, "setsockopt(TCP_MAXSEG) unsupported");
    546 #endif
    547 	}
    548 
    549 	tp->ip_len = xfrd_acl_sockaddr_to(zone->master, &tp->ip);
    550 
    551 	/* bind it */
    552 	if (!xfrd_bind_local_interface(fd, zone->zone_options->pattern->
    553 		outgoing_interface, zone->master, 1)) {
    554 		close(fd);
    555 		xfrd_set_refresh_now(zone);
    556 		return 0;
    557         }
    558 
    559 	conn = connect(fd, (struct sockaddr*)&tp->ip, tp->ip_len);
    560 	if (conn == -1 && errno != EINPROGRESS) {
    561 		log_msg(LOG_ERR, "xfrd: connect %s failed: %s",
    562 			zone->master->ip_address_spec, strerror(errno));
    563 		close(fd);
    564 		xfrd_set_refresh_now(zone);
    565 		return 0;
    566 	}
    567 	tp->tcp_r->fd = fd;
    568 	tp->tcp_w->fd = fd;
    569 
    570 	/* set the tcp pipe event */
    571 	if(tp->handler_added)
    572 		event_del(&tp->handler);
    573 	event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|EV_WRITE,
    574 		xfrd_handle_tcp_pipe, tp);
    575 	if(event_base_set(xfrd->event_base, &tp->handler) != 0)
    576 		log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
    577 	tv.tv_sec = set->tcp_timeout;
    578 	tv.tv_usec = 0;
    579 	if(event_add(&tp->handler, &tv) != 0)
    580 		log_msg(LOG_ERR, "xfrd tcp: event_add failed");
    581 	tp->handler_added = 1;
    582 	return 1;
    583 }
    584 
    585 void
    586 xfrd_tcp_setup_write_packet(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
    587 {
    588 	xfrd_tcp_t* tcp = tp->tcp_w;
    589 	assert(zone->tcp_conn != -1);
    590 	assert(zone->tcp_waiting == 0);
    591 	/* start AXFR or IXFR for the zone */
    592 	if(zone->soa_disk_acquired == 0 || zone->master->use_axfr_only ||
    593 		zone->master->ixfr_disabled ||
    594 		/* if zone expired, after the first round, do not ask for
    595 		 * IXFR any more, but full AXFR (of any serial number) */
    596 		(zone->state == xfrd_zone_expired && zone->round_num != 0)) {
    597 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request full zone transfer "
    598 						"(AXFR) for %s to %s",
    599 			zone->apex_str, zone->master->ip_address_spec));
    600 
    601 		xfrd_setup_packet(tcp->packet, TYPE_AXFR, CLASS_IN, zone->apex,
    602 			zone->query_id);
    603 	} else {
    604 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request incremental zone "
    605 						"transfer (IXFR) for %s to %s",
    606 			zone->apex_str, zone->master->ip_address_spec));
    607 
    608 		xfrd_setup_packet(tcp->packet, TYPE_IXFR, CLASS_IN, zone->apex,
    609 			zone->query_id);
    610         	NSCOUNT_SET(tcp->packet, 1);
    611 		xfrd_write_soa_buffer(tcp->packet, zone->apex, &zone->soa_disk);
    612 	}
    613 	/* old transfer needs to be removed still? */
    614 	if(zone->msg_seq_nr)
    615 		xfrd_unlink_xfrfile(xfrd->nsd, zone->xfrfilenumber);
    616 	zone->msg_seq_nr = 0;
    617 	zone->msg_rr_count = 0;
    618 	if(zone->master->key_options && zone->master->key_options->tsig_key) {
    619 		xfrd_tsig_sign_request(tcp->packet, &zone->tsig, zone->master);
    620 	}
    621 	buffer_flip(tcp->packet);
    622 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "sent tcp query with ID %d", zone->query_id));
    623 	tcp->msglen = buffer_limit(tcp->packet);
    624 	tcp->total_bytes = 0;
    625 }
    626 
    627 static void
    628 tcp_conn_ready_for_reading(xfrd_tcp_t* tcp)
    629 {
    630 	tcp->total_bytes = 0;
    631 	tcp->msglen = 0;
    632 	buffer_clear(tcp->packet);
    633 }
    634 
    635 int conn_write(xfrd_tcp_t* tcp)
    636 {
    637 	ssize_t sent;
    638 
    639 	if(tcp->total_bytes < sizeof(tcp->msglen)) {
    640 		uint16_t sendlen = htons(tcp->msglen);
    641 		sent = write(tcp->fd,
    642 			(const char*)&sendlen + tcp->total_bytes,
    643 			sizeof(tcp->msglen) - tcp->total_bytes);
    644 
    645 		if(sent == -1) {
    646 			if(errno == EAGAIN || errno == EINTR) {
    647 				/* write would block, try later */
    648 				return 0;
    649 			} else {
    650 				return -1;
    651 			}
    652 		}
    653 
    654 		tcp->total_bytes += sent;
    655 		if(tcp->total_bytes < sizeof(tcp->msglen)) {
    656 			/* incomplete write, resume later */
    657 			return 0;
    658 		}
    659 		assert(tcp->total_bytes == sizeof(tcp->msglen));
    660 	}
    661 
    662 	assert(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen));
    663 
    664 	sent = write(tcp->fd,
    665 		buffer_current(tcp->packet),
    666 		buffer_remaining(tcp->packet));
    667 	if(sent == -1) {
    668 		if(errno == EAGAIN || errno == EINTR) {
    669 			/* write would block, try later */
    670 			return 0;
    671 		} else {
    672 			return -1;
    673 		}
    674 	}
    675 
    676 	buffer_skip(tcp->packet, sent);
    677 	tcp->total_bytes += sent;
    678 
    679 	if(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen)) {
    680 		/* more to write when socket becomes writable again */
    681 		return 0;
    682 	}
    683 
    684 	assert(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen));
    685 	return 1;
    686 }
    687 
    688 void
    689 xfrd_tcp_write(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
    690 {
    691 	int ret;
    692 	xfrd_tcp_t* tcp = tp->tcp_w;
    693 	assert(zone->tcp_conn != -1);
    694 	assert(zone == tp->tcp_send_first);
    695 	/* see if for non-established connection, there is a connect error */
    696 	if(!tp->connection_established) {
    697 		/* check for pending error from nonblocking connect */
    698 		/* from Stevens, unix network programming, vol1, 3rd ed, p450 */
    699 		int error = 0;
    700 		socklen_t len = sizeof(error);
    701 		if(getsockopt(tcp->fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0){
    702 			error = errno; /* on solaris errno is error */
    703 		}
    704 		if(error == EINPROGRESS || error == EWOULDBLOCK)
    705 			return; /* try again later */
    706 		if(error != 0) {
    707 			log_msg(LOG_ERR, "%s: Could not tcp connect to %s: %s",
    708 				zone->apex_str, zone->master->ip_address_spec,
    709 				strerror(error));
    710 			xfrd_tcp_pipe_stop(tp);
    711 			return;
    712 		}
    713 	}
    714 	ret = conn_write(tcp);
    715 	if(ret == -1) {
    716 		log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
    717 		xfrd_tcp_pipe_stop(tp);
    718 		return;
    719 	}
    720 	if(tcp->total_bytes != 0 && !tp->connection_established)
    721 		tp->connection_established = 1;
    722 	if(ret == 0) {
    723 		return; /* write again later */
    724 	}
    725 	/* done writing this message */
    726 
    727 	/* remove first zone from sendlist */
    728 	tcp_pipe_sendlist_popfirst(tp, zone);
    729 
    730 	/* see if other zone wants to write; init; let it write (now) */
    731 	/* and use a loop, because 64k stack calls is a too much */
    732 	while(tp->tcp_send_first) {
    733 		/* setup to write for this zone */
    734 		xfrd_tcp_setup_write_packet(tp, tp->tcp_send_first);
    735 		/* attempt to write for this zone (if success, continue loop)*/
    736 		ret = conn_write(tcp);
    737 		if(ret == -1) {
    738 			log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
    739 			xfrd_tcp_pipe_stop(tp);
    740 			return;
    741 		}
    742 		if(ret == 0)
    743 			return; /* write again later */
    744 		tcp_pipe_sendlist_popfirst(tp, tp->tcp_send_first);
    745 	}
    746 
    747 	/* if sendlist empty, remove WRITE from event */
    748 
    749 	/* listen to READ, and not WRITE events */
    750 	assert(tp->tcp_send_first == NULL);
    751 	tcp_pipe_reset_timeout(tp);
    752 }
    753 
    754 int
    755 conn_read(xfrd_tcp_t* tcp)
    756 {
    757 	ssize_t received;
    758 	/* receive leading packet length bytes */
    759 	if(tcp->total_bytes < sizeof(tcp->msglen)) {
    760 		received = read(tcp->fd,
    761 			(char*) &tcp->msglen + tcp->total_bytes,
    762 			sizeof(tcp->msglen) - tcp->total_bytes);
    763 		if(received == -1) {
    764 			if(errno == EAGAIN || errno == EINTR) {
    765 				/* read would block, try later */
    766 				return 0;
    767 			} else {
    768 #ifdef ECONNRESET
    769 				if (verbosity >= 2 || errno != ECONNRESET)
    770 #endif /* ECONNRESET */
    771 				log_msg(LOG_ERR, "tcp read sz: %s", strerror(errno));
    772 				return -1;
    773 			}
    774 		} else if(received == 0) {
    775 			/* EOF */
    776 			return -1;
    777 		}
    778 		tcp->total_bytes += received;
    779 		if(tcp->total_bytes < sizeof(tcp->msglen)) {
    780 			/* not complete yet, try later */
    781 			return 0;
    782 		}
    783 
    784 		assert(tcp->total_bytes == sizeof(tcp->msglen));
    785 		tcp->msglen = ntohs(tcp->msglen);
    786 
    787 		if(tcp->msglen == 0) {
    788 			buffer_set_limit(tcp->packet, tcp->msglen);
    789 			return 1;
    790 		}
    791 		if(tcp->msglen > buffer_capacity(tcp->packet)) {
    792 			log_msg(LOG_ERR, "buffer too small, dropping connection");
    793 			return 0;
    794 		}
    795 		buffer_set_limit(tcp->packet, tcp->msglen);
    796 	}
    797 
    798 	assert(buffer_remaining(tcp->packet) > 0);
    799 
    800 	received = read(tcp->fd, buffer_current(tcp->packet),
    801 		buffer_remaining(tcp->packet));
    802 	if(received == -1) {
    803 		if(errno == EAGAIN || errno == EINTR) {
    804 			/* read would block, try later */
    805 			return 0;
    806 		} else {
    807 #ifdef ECONNRESET
    808 			if (verbosity >= 2 || errno != ECONNRESET)
    809 #endif /* ECONNRESET */
    810 			log_msg(LOG_ERR, "tcp read %s", strerror(errno));
    811 			return -1;
    812 		}
    813 	} else if(received == 0) {
    814 		/* EOF */
    815 		return -1;
    816 	}
    817 
    818 	tcp->total_bytes += received;
    819 	buffer_skip(tcp->packet, received);
    820 
    821 	if(buffer_remaining(tcp->packet) > 0) {
    822 		/* not complete yet, wait for more */
    823 		return 0;
    824 	}
    825 
    826 	/* completed */
    827 	assert(buffer_position(tcp->packet) == tcp->msglen);
    828 	return 1;
    829 }
    830 
    831 void
    832 xfrd_tcp_read(struct xfrd_tcp_pipeline* tp)
    833 {
    834 	xfrd_zone_t* zone;
    835 	xfrd_tcp_t* tcp = tp->tcp_r;
    836 	int ret;
    837 	enum xfrd_packet_result pkt_result;
    838 
    839 	ret = conn_read(tcp);
    840 	if(ret == -1) {
    841 		xfrd_tcp_pipe_stop(tp);
    842 		return;
    843 	}
    844 	if(ret == 0)
    845 		return;
    846 	/* completed msg */
    847 	buffer_flip(tcp->packet);
    848 	/* see which ID number it is, if skip, handle skip, NULL: warn */
    849 	if(tcp->msglen < QHEADERSZ) {
    850 		/* too short for DNS header, skip it */
    851 		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
    852 			"xfrd: tcp skip response that is too short"));
    853 		tcp_conn_ready_for_reading(tcp);
    854 		return;
    855 	}
    856 	zone = tp->id[ID(tcp->packet)];
    857 	if(!zone || zone == TCP_NULL_SKIP) {
    858 		/* no zone for this id? skip it */
    859 		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
    860 			"xfrd: tcp skip response with %s ID",
    861 			zone?"set-to-skip":"unknown"));
    862 		tcp_conn_ready_for_reading(tcp);
    863 		return;
    864 	}
    865 	assert(zone->tcp_conn != -1);
    866 
    867 	/* handle message for zone */
    868 	pkt_result = xfrd_handle_received_xfr_packet(zone, tcp->packet);
    869 	/* setup for reading the next packet on this connection */
    870 	tcp_conn_ready_for_reading(tcp);
    871 	switch(pkt_result) {
    872 		case xfrd_packet_more:
    873 			/* wait for next packet */
    874 			break;
    875 		case xfrd_packet_newlease:
    876 			/* set to skip if more packets with this ID */
    877 			tp->id[zone->query_id] = TCP_NULL_SKIP;
    878 			tp->num_skip++;
    879 			/* fall through to remove zone from tp */
    880 		case xfrd_packet_transfer:
    881 			if(zone->zone_options->pattern->multi_master_check) {
    882 				xfrd_tcp_release(xfrd->tcp_set, zone);
    883 				xfrd_make_request(zone);
    884 				break;
    885 			}
    886 			xfrd_tcp_release(xfrd->tcp_set, zone);
    887 			assert(zone->round_num == -1);
    888 			break;
    889 		case xfrd_packet_notimpl:
    890 			xfrd_disable_ixfr(zone);
    891 			xfrd_tcp_release(xfrd->tcp_set, zone);
    892 			/* query next server */
    893 			xfrd_make_request(zone);
    894 			break;
    895 		case xfrd_packet_bad:
    896 		case xfrd_packet_tcp:
    897 		default:
    898 			/* set to skip if more packets with this ID */
    899 			tp->id[zone->query_id] = TCP_NULL_SKIP;
    900 			tp->num_skip++;
    901 			xfrd_tcp_release(xfrd->tcp_set, zone);
    902 			/* query next server */
    903 			xfrd_make_request(zone);
    904 			break;
    905 	}
    906 }
    907 
    908 void
    909 xfrd_tcp_release(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
    910 {
    911 	int conn = zone->tcp_conn;
    912 	struct xfrd_tcp_pipeline* tp = set->tcp_state[conn];
    913 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s released tcp conn to %s",
    914 		zone->apex_str, zone->master->ip_address_spec));
    915 	assert(zone->tcp_conn != -1);
    916 	assert(zone->tcp_waiting == 0);
    917 	zone->tcp_conn = -1;
    918 	zone->tcp_waiting = 0;
    919 
    920 	/* remove from tcp_send list */
    921 	tcp_pipe_sendlist_remove(tp, zone);
    922 	/* remove it from the ID list */
    923 	if(tp->id[zone->query_id] != TCP_NULL_SKIP)
    924 		tcp_pipe_id_remove(tp, zone);
    925 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: released tcp pipe now %d unused",
    926 		tp->num_unused));
    927 	/* if pipe was full, but no more, then see if waiting element is
    928 	 * for the same master, and can fill the unused ID */
    929 	if(tp->num_unused == 1 && set->tcp_waiting_first) {
    930 #ifdef INET6
    931 		struct sockaddr_storage to;
    932 #else
    933 		struct sockaddr_in to;
    934 #endif
    935 		socklen_t to_len = xfrd_acl_sockaddr_to(
    936 			set->tcp_waiting_first->master, &to);
    937 		if(to_len == tp->ip_len && memcmp(&to, &tp->ip, to_len) == 0) {
    938 			/* use this connection for the waiting zone */
    939 			zone = set->tcp_waiting_first;
    940 			assert(zone->tcp_conn == -1);
    941 			zone->tcp_conn = conn;
    942 			tcp_zone_waiting_list_popfirst(set, zone);
    943 			if(zone->zone_handler.ev_fd != -1)
    944 				xfrd_udp_release(zone);
    945 			xfrd_unset_timer(zone);
    946 			pipeline_setup_new_zone(set, tp, zone);
    947 			return;
    948 		}
    949 		/* waiting zone did not go to same server */
    950 	}
    951 
    952 	/* if all unused, or only skipped leftover, close the pipeline */
    953 	if(tp->num_unused >= ID_PIPE_NUM || tp->num_skip >= ID_PIPE_NUM - tp->num_unused)
    954 		xfrd_tcp_pipe_release(set, tp, conn);
    955 }
    956 
    957 void
    958 xfrd_tcp_pipe_release(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp,
    959 	int conn)
    960 {
    961 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: tcp pipe released"));
    962 	/* one handler per tcp pipe */
    963 	if(tp->handler_added)
    964 		event_del(&tp->handler);
    965 	tp->handler_added = 0;
    966 
    967 	/* fd in tcp_r and tcp_w is the same, close once */
    968 	if(tp->tcp_r->fd != -1)
    969 		close(tp->tcp_r->fd);
    970 	tp->tcp_r->fd = -1;
    971 	tp->tcp_w->fd = -1;
    972 
    973 	/* remove from pipetree */
    974 	(void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
    975 
    976 	/* a waiting zone can use the free tcp slot (to another server) */
    977 	/* if that zone fails to set-up or connect, we try to start the next
    978 	 * waiting zone in the list */
    979 	while(set->tcp_count == XFRD_MAX_TCP && set->tcp_waiting_first) {
    980 		int i;
    981 
    982 		/* pop first waiting process */
    983 		xfrd_zone_t* zone = set->tcp_waiting_first;
    984 		/* start it */
    985 		assert(zone->tcp_conn == -1);
    986 		zone->tcp_conn = conn;
    987 		tcp_zone_waiting_list_popfirst(set, zone);
    988 
    989 		/* stop udp (if any) */
    990 		if(zone->zone_handler.ev_fd != -1)
    991 			xfrd_udp_release(zone);
    992 		if(!xfrd_tcp_open(set, tp, zone)) {
    993 			zone->tcp_conn = -1;
    994 			xfrd_set_refresh_now(zone);
    995 			/* try to start the next zone (if any) */
    996 			continue;
    997 		}
    998 		/* re-init this tcppipe */
    999 		/* ip and ip_len set by tcp_open */
   1000 		tp->node.key = tp;
   1001 		tp->num_unused = ID_PIPE_NUM;
   1002 		tp->num_skip = 0;
   1003 		tp->tcp_send_first = NULL;
   1004 		tp->tcp_send_last = NULL;
   1005 		memset(tp->id, 0, sizeof(tp->id));
   1006 		for(i=0; i<ID_PIPE_NUM; i++) {
   1007 			tp->unused[i] = i;
   1008 		}
   1009 
   1010 		/* insert into tree */
   1011 		(void)rbtree_insert(set->pipetree, &tp->node);
   1012 		/* setup write */
   1013 		xfrd_unset_timer(zone);
   1014 		pipeline_setup_new_zone(set, tp, zone);
   1015 		/* started a task, no need for cleanups, so return */
   1016 		return;
   1017 	}
   1018 	/* no task to start, cleanup */
   1019 	assert(!set->tcp_waiting_first);
   1020 	set->tcp_count --;
   1021 	assert(set->tcp_count >= 0);
   1022 }
   1023 
   1024