Home | History | Annotate | Line # | Download | only in dist
xfrd-tcp.c revision 1.1.1.3
      1      1.1  christos /*
      2      1.1  christos  * xfrd-tcp.c - XFR (transfer) Daemon TCP system source file. Manages tcp conn.
      3      1.1  christos  *
      4      1.1  christos  * Copyright (c) 2001-2006, NLnet Labs. All rights reserved.
      5      1.1  christos  *
      6      1.1  christos  * See LICENSE for the license.
      7      1.1  christos  *
      8      1.1  christos  */
      9      1.1  christos 
     10      1.1  christos #include "config.h"
     11      1.1  christos #include <assert.h>
     12      1.1  christos #include <errno.h>
     13      1.1  christos #include <fcntl.h>
     14      1.1  christos #include <unistd.h>
     15      1.1  christos #include <stdlib.h>
     16  1.1.1.2  christos #include <sys/uio.h>
     17      1.1  christos #include "nsd.h"
     18      1.1  christos #include "xfrd-tcp.h"
     19      1.1  christos #include "buffer.h"
     20      1.1  christos #include "packet.h"
     21      1.1  christos #include "dname.h"
     22      1.1  christos #include "options.h"
     23      1.1  christos #include "namedb.h"
     24      1.1  christos #include "xfrd.h"
     25      1.1  christos #include "xfrd-disk.h"
     26      1.1  christos #include "util.h"
     27      1.1  christos 
     28      1.1  christos /* sort tcppipe, first on IP address, for an IPaddresss, sort on num_unused */
     29      1.1  christos static int
     30      1.1  christos xfrd_pipe_cmp(const void* a, const void* b)
     31      1.1  christos {
     32      1.1  christos 	const struct xfrd_tcp_pipeline* x = (struct xfrd_tcp_pipeline*)a;
     33      1.1  christos 	const struct xfrd_tcp_pipeline* y = (struct xfrd_tcp_pipeline*)b;
     34      1.1  christos 	int r;
     35      1.1  christos 	if(x == y)
     36      1.1  christos 		return 0;
     37      1.1  christos 	if(y->ip_len != x->ip_len)
     38      1.1  christos 		/* subtraction works because nonnegative and small numbers */
     39      1.1  christos 		return (int)y->ip_len - (int)x->ip_len;
     40      1.1  christos 	r = memcmp(&x->ip, &y->ip, x->ip_len);
     41      1.1  christos 	if(r != 0)
     42      1.1  christos 		return r;
     43      1.1  christos 	/* sort that num_unused is sorted ascending, */
     44      1.1  christos 	if(x->num_unused != y->num_unused) {
     45      1.1  christos 		return (x->num_unused < y->num_unused) ? -1 : 1;
     46      1.1  christos 	}
     47      1.1  christos 	/* different pipelines are different still, even with same numunused*/
     48      1.1  christos 	return (uintptr_t)x < (uintptr_t)y ? -1 : 1;
     49      1.1  christos }
     50      1.1  christos 
     51  1.1.1.2  christos struct xfrd_tcp_set* xfrd_tcp_set_create(struct region* region)
     52      1.1  christos {
     53      1.1  christos 	int i;
     54  1.1.1.2  christos 	struct xfrd_tcp_set* tcp_set = region_alloc(region,
     55  1.1.1.2  christos 		sizeof(struct xfrd_tcp_set));
     56  1.1.1.2  christos 	memset(tcp_set, 0, sizeof(struct xfrd_tcp_set));
     57      1.1  christos 	tcp_set->tcp_count = 0;
     58      1.1  christos 	tcp_set->tcp_waiting_first = 0;
     59      1.1  christos 	tcp_set->tcp_waiting_last = 0;
     60      1.1  christos 	for(i=0; i<XFRD_MAX_TCP; i++)
     61      1.1  christos 		tcp_set->tcp_state[i] = xfrd_tcp_pipeline_create(region);
     62      1.1  christos 	tcp_set->pipetree = rbtree_create(region, &xfrd_pipe_cmp);
     63      1.1  christos 	return tcp_set;
     64      1.1  christos }
     65      1.1  christos 
     66      1.1  christos struct xfrd_tcp_pipeline*
     67      1.1  christos xfrd_tcp_pipeline_create(region_type* region)
     68      1.1  christos {
     69      1.1  christos 	int i;
     70      1.1  christos 	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)
     71      1.1  christos 		region_alloc_zero(region, sizeof(*tp));
     72      1.1  christos 	tp->num_unused = ID_PIPE_NUM;
     73      1.1  christos 	assert(sizeof(tp->unused)/sizeof(tp->unused[0]) == ID_PIPE_NUM);
     74      1.1  christos 	for(i=0; i<ID_PIPE_NUM; i++)
     75      1.1  christos 		tp->unused[i] = (uint16_t)i;
     76      1.1  christos 	tp->tcp_r = xfrd_tcp_create(region, QIOBUFSZ);
     77      1.1  christos 	tp->tcp_w = xfrd_tcp_create(region, 512);
     78      1.1  christos 	return tp;
     79      1.1  christos }
     80      1.1  christos 
     81      1.1  christos void
     82      1.1  christos xfrd_setup_packet(buffer_type* packet,
     83      1.1  christos 	uint16_t type, uint16_t klass, const dname_type* dname, uint16_t qid)
     84      1.1  christos {
     85      1.1  christos 	/* Set up the header */
     86      1.1  christos 	buffer_clear(packet);
     87      1.1  christos 	ID_SET(packet, qid);
     88      1.1  christos 	FLAGS_SET(packet, 0);
     89      1.1  christos 	OPCODE_SET(packet, OPCODE_QUERY);
     90      1.1  christos 	QDCOUNT_SET(packet, 1);
     91      1.1  christos 	ANCOUNT_SET(packet, 0);
     92      1.1  christos 	NSCOUNT_SET(packet, 0);
     93      1.1  christos 	ARCOUNT_SET(packet, 0);
     94      1.1  christos 	buffer_skip(packet, QHEADERSZ);
     95      1.1  christos 
     96      1.1  christos 	/* The question record. */
     97      1.1  christos 	buffer_write(packet, dname_name(dname), dname->name_size);
     98      1.1  christos 	buffer_write_u16(packet, type);
     99      1.1  christos 	buffer_write_u16(packet, klass);
    100      1.1  christos }
    101      1.1  christos 
    102      1.1  christos static socklen_t
    103      1.1  christos #ifdef INET6
    104  1.1.1.2  christos xfrd_acl_sockaddr(acl_options_type* acl, unsigned int port,
    105      1.1  christos 	struct sockaddr_storage *sck)
    106      1.1  christos #else
    107  1.1.1.2  christos xfrd_acl_sockaddr(acl_options_type* acl, unsigned int port,
    108      1.1  christos 	struct sockaddr_in *sck, const char* fromto)
    109      1.1  christos #endif /* INET6 */
    110      1.1  christos {
    111      1.1  christos 	/* setup address structure */
    112      1.1  christos #ifdef INET6
    113      1.1  christos 	memset(sck, 0, sizeof(struct sockaddr_storage));
    114      1.1  christos #else
    115      1.1  christos 	memset(sck, 0, sizeof(struct sockaddr_in));
    116      1.1  christos #endif
    117      1.1  christos 	if(acl->is_ipv6) {
    118      1.1  christos #ifdef INET6
    119      1.1  christos 		struct sockaddr_in6* sa = (struct sockaddr_in6*)sck;
    120      1.1  christos 		sa->sin6_family = AF_INET6;
    121      1.1  christos 		sa->sin6_port = htons(port);
    122      1.1  christos 		sa->sin6_addr = acl->addr.addr6;
    123      1.1  christos 		return sizeof(struct sockaddr_in6);
    124      1.1  christos #else
    125      1.1  christos 		log_msg(LOG_ERR, "xfrd: IPv6 connection %s %s attempted but no \
    126      1.1  christos INET6.", fromto, acl->ip_address_spec);
    127      1.1  christos 		return 0;
    128      1.1  christos #endif
    129      1.1  christos 	} else {
    130      1.1  christos 		struct sockaddr_in* sa = (struct sockaddr_in*)sck;
    131      1.1  christos 		sa->sin_family = AF_INET;
    132      1.1  christos 		sa->sin_port = htons(port);
    133      1.1  christos 		sa->sin_addr = acl->addr.addr;
    134      1.1  christos 		return sizeof(struct sockaddr_in);
    135      1.1  christos 	}
    136      1.1  christos }
    137      1.1  christos 
    138      1.1  christos socklen_t
    139      1.1  christos #ifdef INET6
    140  1.1.1.2  christos xfrd_acl_sockaddr_to(acl_options_type* acl, struct sockaddr_storage *to)
    141      1.1  christos #else
    142  1.1.1.2  christos xfrd_acl_sockaddr_to(acl_options_type* acl, struct sockaddr_in *to)
    143      1.1  christos #endif /* INET6 */
    144      1.1  christos {
    145      1.1  christos 	unsigned int port = acl->port?acl->port:(unsigned)atoi(TCP_PORT);
    146      1.1  christos #ifdef INET6
    147      1.1  christos 	return xfrd_acl_sockaddr(acl, port, to);
    148      1.1  christos #else
    149      1.1  christos 	return xfrd_acl_sockaddr(acl, port, to, "to");
    150      1.1  christos #endif /* INET6 */
    151      1.1  christos }
    152      1.1  christos 
    153      1.1  christos socklen_t
    154      1.1  christos #ifdef INET6
    155  1.1.1.2  christos xfrd_acl_sockaddr_frm(acl_options_type* acl, struct sockaddr_storage *frm)
    156      1.1  christos #else
    157  1.1.1.2  christos xfrd_acl_sockaddr_frm(acl_options_type* acl, struct sockaddr_in *frm)
    158      1.1  christos #endif /* INET6 */
    159      1.1  christos {
    160      1.1  christos 	unsigned int port = acl->port?acl->port:0;
    161      1.1  christos #ifdef INET6
    162      1.1  christos 	return xfrd_acl_sockaddr(acl, port, frm);
    163      1.1  christos #else
    164      1.1  christos 	return xfrd_acl_sockaddr(acl, port, frm, "from");
    165      1.1  christos #endif /* INET6 */
    166      1.1  christos }
    167      1.1  christos 
    168      1.1  christos void
    169      1.1  christos xfrd_write_soa_buffer(struct buffer* packet,
    170      1.1  christos 	const dname_type* apex, struct xfrd_soa* soa)
    171      1.1  christos {
    172      1.1  christos 	size_t rdlength_pos;
    173      1.1  christos 	uint16_t rdlength;
    174      1.1  christos 	buffer_write(packet, dname_name(apex), apex->name_size);
    175      1.1  christos 
    176      1.1  christos 	/* already in network order */
    177      1.1  christos 	buffer_write(packet, &soa->type, sizeof(soa->type));
    178      1.1  christos 	buffer_write(packet, &soa->klass, sizeof(soa->klass));
    179      1.1  christos 	buffer_write(packet, &soa->ttl, sizeof(soa->ttl));
    180      1.1  christos 	rdlength_pos = buffer_position(packet);
    181      1.1  christos 	buffer_skip(packet, sizeof(rdlength));
    182      1.1  christos 
    183      1.1  christos 	/* uncompressed dnames */
    184      1.1  christos 	buffer_write(packet, soa->prim_ns+1, soa->prim_ns[0]);
    185      1.1  christos 	buffer_write(packet, soa->email+1, soa->email[0]);
    186      1.1  christos 
    187      1.1  christos 	buffer_write(packet, &soa->serial, sizeof(uint32_t));
    188      1.1  christos 	buffer_write(packet, &soa->refresh, sizeof(uint32_t));
    189      1.1  christos 	buffer_write(packet, &soa->retry, sizeof(uint32_t));
    190      1.1  christos 	buffer_write(packet, &soa->expire, sizeof(uint32_t));
    191      1.1  christos 	buffer_write(packet, &soa->minimum, sizeof(uint32_t));
    192      1.1  christos 
    193      1.1  christos 	/* write length of RR */
    194      1.1  christos 	rdlength = buffer_position(packet) - rdlength_pos - sizeof(rdlength);
    195      1.1  christos 	buffer_write_u16_at(packet, rdlength_pos, rdlength);
    196      1.1  christos }
    197      1.1  christos 
    198  1.1.1.2  christos struct xfrd_tcp*
    199      1.1  christos xfrd_tcp_create(region_type* region, size_t bufsize)
    200      1.1  christos {
    201  1.1.1.2  christos 	struct xfrd_tcp* tcp_state = (struct xfrd_tcp*)region_alloc(
    202  1.1.1.2  christos 		region, sizeof(struct xfrd_tcp));
    203  1.1.1.2  christos 	memset(tcp_state, 0, sizeof(struct xfrd_tcp));
    204      1.1  christos 	tcp_state->packet = buffer_create(region, bufsize);
    205      1.1  christos 	tcp_state->fd = -1;
    206      1.1  christos 
    207      1.1  christos 	return tcp_state;
    208      1.1  christos }
    209      1.1  christos 
    210      1.1  christos static struct xfrd_tcp_pipeline*
    211  1.1.1.2  christos pipeline_find(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
    212      1.1  christos {
    213  1.1.1.2  christos 	rbnode_type* sme = NULL;
    214      1.1  christos 	struct xfrd_tcp_pipeline* r;
    215      1.1  christos 	/* smaller buf than a full pipeline with 64kb ID array, only need
    216      1.1  christos 	 * the front part with the key info, this front part contains the
    217      1.1  christos 	 * members that the compare function uses. */
    218      1.1  christos 	const size_t keysize = sizeof(struct xfrd_tcp_pipeline) -
    219      1.1  christos 		ID_PIPE_NUM*(sizeof(struct xfrd_zone*) + sizeof(uint16_t));
    220      1.1  christos 	/* void* type for alignment of the struct,
    221      1.1  christos 	 * divide the keysize by ptr-size and then add one to round up */
    222      1.1  christos 	void* buf[ (keysize / sizeof(void*)) + 1 ];
    223      1.1  christos 	struct xfrd_tcp_pipeline* key = (struct xfrd_tcp_pipeline*)buf;
    224      1.1  christos 	key->node.key = key;
    225      1.1  christos 	key->ip_len = xfrd_acl_sockaddr_to(zone->master, &key->ip);
    226      1.1  christos 	key->num_unused = ID_PIPE_NUM;
    227      1.1  christos 	/* lookup existing tcp transfer to the master with highest unused */
    228      1.1  christos 	if(rbtree_find_less_equal(set->pipetree, key, &sme)) {
    229      1.1  christos 		/* exact match, strange, fully unused tcp cannot be open */
    230      1.1  christos 		assert(0);
    231      1.1  christos 	}
    232      1.1  christos 	if(!sme)
    233      1.1  christos 		return NULL;
    234      1.1  christos 	r = (struct xfrd_tcp_pipeline*)sme->key;
    235      1.1  christos 	/* <= key pointed at, is the master correct ? */
    236      1.1  christos 	if(r->ip_len != key->ip_len)
    237      1.1  christos 		return NULL;
    238      1.1  christos 	if(memcmp(&r->ip, &key->ip, key->ip_len) != 0)
    239      1.1  christos 		return NULL;
    240      1.1  christos 	/* correct master, is there a slot free for this transfer? */
    241      1.1  christos 	if(r->num_unused == 0)
    242      1.1  christos 		return NULL;
    243      1.1  christos 	return r;
    244      1.1  christos }
    245      1.1  christos 
    246      1.1  christos /* remove zone from tcp waiting list */
    247      1.1  christos static void
    248  1.1.1.2  christos tcp_zone_waiting_list_popfirst(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
    249      1.1  christos {
    250      1.1  christos 	assert(zone->tcp_waiting);
    251      1.1  christos 	set->tcp_waiting_first = zone->tcp_waiting_next;
    252      1.1  christos 	if(zone->tcp_waiting_next)
    253      1.1  christos 		zone->tcp_waiting_next->tcp_waiting_prev = NULL;
    254      1.1  christos 	else	set->tcp_waiting_last = 0;
    255      1.1  christos 	zone->tcp_waiting_next = 0;
    256      1.1  christos 	zone->tcp_waiting = 0;
    257      1.1  christos }
    258      1.1  christos 
    259      1.1  christos /* remove zone from tcp pipe write-wait list */
    260      1.1  christos static void
    261  1.1.1.2  christos tcp_pipe_sendlist_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
    262      1.1  christos {
    263      1.1  christos 	if(zone->in_tcp_send) {
    264      1.1  christos 		if(zone->tcp_send_prev)
    265      1.1  christos 			zone->tcp_send_prev->tcp_send_next=zone->tcp_send_next;
    266      1.1  christos 		else	tp->tcp_send_first=zone->tcp_send_next;
    267      1.1  christos 		if(zone->tcp_send_next)
    268      1.1  christos 			zone->tcp_send_next->tcp_send_prev=zone->tcp_send_prev;
    269      1.1  christos 		else	tp->tcp_send_last=zone->tcp_send_prev;
    270      1.1  christos 		zone->in_tcp_send = 0;
    271      1.1  christos 	}
    272      1.1  christos }
    273      1.1  christos 
    274      1.1  christos /* remove first from write-wait list */
    275      1.1  christos static void
    276  1.1.1.2  christos tcp_pipe_sendlist_popfirst(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
    277      1.1  christos {
    278      1.1  christos 	tp->tcp_send_first = zone->tcp_send_next;
    279      1.1  christos 	if(tp->tcp_send_first)
    280      1.1  christos 		tp->tcp_send_first->tcp_send_prev = NULL;
    281      1.1  christos 	else	tp->tcp_send_last = NULL;
    282      1.1  christos 	zone->in_tcp_send = 0;
    283      1.1  christos }
    284      1.1  christos 
    285      1.1  christos /* remove zone from tcp pipe ID map */
    286      1.1  christos static void
    287  1.1.1.2  christos tcp_pipe_id_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
    288      1.1  christos {
    289      1.1  christos 	assert(tp->num_unused < ID_PIPE_NUM && tp->num_unused >= 0);
    290      1.1  christos 	assert(tp->id[zone->query_id] == zone);
    291      1.1  christos 	tp->id[zone->query_id] = NULL;
    292      1.1  christos 	tp->unused[tp->num_unused] = zone->query_id;
    293      1.1  christos 	/* must remove and re-add for sort order in tree */
    294      1.1  christos 	(void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
    295      1.1  christos 	tp->num_unused++;
    296      1.1  christos 	(void)rbtree_insert(xfrd->tcp_set->pipetree, &tp->node);
    297      1.1  christos }
    298      1.1  christos 
    299      1.1  christos /* stop the tcp pipe (and all its zones need to retry) */
    300      1.1  christos static void
    301      1.1  christos xfrd_tcp_pipe_stop(struct xfrd_tcp_pipeline* tp)
    302      1.1  christos {
    303      1.1  christos 	int i, conn = -1;
    304      1.1  christos 	assert(tp->num_unused < ID_PIPE_NUM); /* at least one 'in-use' */
    305      1.1  christos 	assert(ID_PIPE_NUM - tp->num_unused > tp->num_skip); /* at least one 'nonskip' */
    306      1.1  christos 	/* need to retry for all the zones connected to it */
    307      1.1  christos 	/* these could use different lists and go to a different nextmaster*/
    308      1.1  christos 	for(i=0; i<ID_PIPE_NUM; i++) {
    309      1.1  christos 		if(tp->id[i] && tp->id[i] != TCP_NULL_SKIP) {
    310  1.1.1.2  christos 			xfrd_zone_type* zone = tp->id[i];
    311      1.1  christos 			conn = zone->tcp_conn;
    312      1.1  christos 			zone->tcp_conn = -1;
    313      1.1  christos 			zone->tcp_waiting = 0;
    314      1.1  christos 			tcp_pipe_sendlist_remove(tp, zone);
    315      1.1  christos 			tcp_pipe_id_remove(tp, zone);
    316      1.1  christos 			xfrd_set_refresh_now(zone);
    317      1.1  christos 		}
    318      1.1  christos 	}
    319      1.1  christos 	assert(conn != -1);
    320      1.1  christos 	/* now release the entire tcp pipe */
    321      1.1  christos 	xfrd_tcp_pipe_release(xfrd->tcp_set, tp, conn);
    322      1.1  christos }
    323      1.1  christos 
    324      1.1  christos static void
    325      1.1  christos tcp_pipe_reset_timeout(struct xfrd_tcp_pipeline* tp)
    326      1.1  christos {
    327      1.1  christos 	int fd = tp->handler.ev_fd;
    328      1.1  christos 	struct timeval tv;
    329      1.1  christos 	tv.tv_sec = xfrd->tcp_set->tcp_timeout;
    330      1.1  christos 	tv.tv_usec = 0;
    331      1.1  christos 	if(tp->handler_added)
    332      1.1  christos 		event_del(&tp->handler);
    333  1.1.1.3  christos 	memset(&tp->handler, 0, sizeof(tp->handler));
    334      1.1  christos 	event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|
    335      1.1  christos 		(tp->tcp_send_first?EV_WRITE:0), xfrd_handle_tcp_pipe, tp);
    336      1.1  christos 	if(event_base_set(xfrd->event_base, &tp->handler) != 0)
    337      1.1  christos 		log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
    338      1.1  christos 	if(event_add(&tp->handler, &tv) != 0)
    339      1.1  christos 		log_msg(LOG_ERR, "xfrd tcp: event_add failed");
    340      1.1  christos 	tp->handler_added = 1;
    341      1.1  christos }
    342      1.1  christos 
    343      1.1  christos /* handle event from fd of tcp pipe */
    344      1.1  christos void
    345      1.1  christos xfrd_handle_tcp_pipe(int ATTR_UNUSED(fd), short event, void* arg)
    346      1.1  christos {
    347      1.1  christos 	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)arg;
    348      1.1  christos 	if((event & EV_WRITE)) {
    349      1.1  christos 		tcp_pipe_reset_timeout(tp);
    350      1.1  christos 		if(tp->tcp_send_first) {
    351      1.1  christos 			DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp write, zone %s",
    352      1.1  christos 				tp->tcp_send_first->apex_str));
    353      1.1  christos 			xfrd_tcp_write(tp, tp->tcp_send_first);
    354      1.1  christos 		}
    355      1.1  christos 	}
    356      1.1  christos 	if((event & EV_READ) && tp->handler_added) {
    357      1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp read"));
    358      1.1  christos 		tcp_pipe_reset_timeout(tp);
    359      1.1  christos 		xfrd_tcp_read(tp);
    360      1.1  christos 	}
    361      1.1  christos 	if((event & EV_TIMEOUT) && tp->handler_added) {
    362      1.1  christos 		/* tcp connection timed out */
    363      1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp timeout"));
    364      1.1  christos 		xfrd_tcp_pipe_stop(tp);
    365      1.1  christos 	}
    366      1.1  christos }
    367      1.1  christos 
    368      1.1  christos /* add a zone to the pipeline, it starts to want to write its query */
    369      1.1  christos static void
    370  1.1.1.2  christos pipeline_setup_new_zone(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
    371  1.1.1.2  christos 	xfrd_zone_type* zone)
    372      1.1  christos {
    373      1.1  christos 	/* assign the ID */
    374      1.1  christos 	int idx;
    375      1.1  christos 	assert(tp->num_unused > 0);
    376      1.1  christos 	/* we pick a random ID, even though it is TCP anyway */
    377      1.1  christos 	idx = random_generate(tp->num_unused);
    378      1.1  christos 	zone->query_id = tp->unused[idx];
    379      1.1  christos 	tp->unused[idx] = tp->unused[tp->num_unused-1];
    380      1.1  christos 	tp->id[zone->query_id] = zone;
    381      1.1  christos 	/* decrement unused counter, and fixup tree */
    382      1.1  christos 	(void)rbtree_delete(set->pipetree, &tp->node);
    383      1.1  christos 	tp->num_unused--;
    384      1.1  christos 	(void)rbtree_insert(set->pipetree, &tp->node);
    385      1.1  christos 
    386      1.1  christos 	/* add to sendlist, at end */
    387      1.1  christos 	zone->tcp_send_next = NULL;
    388      1.1  christos 	zone->tcp_send_prev = tp->tcp_send_last;
    389      1.1  christos 	zone->in_tcp_send = 1;
    390      1.1  christos 	if(tp->tcp_send_last)
    391      1.1  christos 		tp->tcp_send_last->tcp_send_next = zone;
    392      1.1  christos 	else	tp->tcp_send_first = zone;
    393      1.1  christos 	tp->tcp_send_last = zone;
    394      1.1  christos 
    395      1.1  christos 	/* is it first in line? */
    396      1.1  christos 	if(tp->tcp_send_first == zone) {
    397      1.1  christos 		xfrd_tcp_setup_write_packet(tp, zone);
    398      1.1  christos 		/* add write to event handler */
    399      1.1  christos 		tcp_pipe_reset_timeout(tp);
    400      1.1  christos 	}
    401      1.1  christos }
    402      1.1  christos 
    403      1.1  christos void
    404  1.1.1.2  christos xfrd_tcp_obtain(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
    405      1.1  christos {
    406      1.1  christos 	struct xfrd_tcp_pipeline* tp;
    407      1.1  christos 	assert(zone->tcp_conn == -1);
    408      1.1  christos 	assert(zone->tcp_waiting == 0);
    409      1.1  christos 
    410      1.1  christos 	if(set->tcp_count < XFRD_MAX_TCP) {
    411      1.1  christos 		int i;
    412      1.1  christos 		assert(!set->tcp_waiting_first);
    413      1.1  christos 		set->tcp_count ++;
    414      1.1  christos 		/* find a free tcp_buffer */
    415      1.1  christos 		for(i=0; i<XFRD_MAX_TCP; i++) {
    416      1.1  christos 			if(set->tcp_state[i]->tcp_r->fd == -1) {
    417      1.1  christos 				zone->tcp_conn = i;
    418      1.1  christos 				break;
    419      1.1  christos 			}
    420      1.1  christos 		}
    421      1.1  christos 		/** What if there is no free tcp_buffer? return; */
    422      1.1  christos 		if (zone->tcp_conn < 0) {
    423      1.1  christos 			return;
    424      1.1  christos 		}
    425      1.1  christos 
    426      1.1  christos 		tp = set->tcp_state[zone->tcp_conn];
    427      1.1  christos 		zone->tcp_waiting = 0;
    428      1.1  christos 
    429      1.1  christos 		/* stop udp use (if any) */
    430      1.1  christos 		if(zone->zone_handler.ev_fd != -1)
    431      1.1  christos 			xfrd_udp_release(zone);
    432      1.1  christos 
    433      1.1  christos 		if(!xfrd_tcp_open(set, tp, zone)) {
    434      1.1  christos 			zone->tcp_conn = -1;
    435      1.1  christos 			set->tcp_count --;
    436      1.1  christos 			xfrd_set_refresh_now(zone);
    437      1.1  christos 			return;
    438      1.1  christos 		}
    439      1.1  christos 		/* ip and ip_len set by tcp_open */
    440      1.1  christos 		tp->node.key = tp;
    441      1.1  christos 		tp->num_unused = ID_PIPE_NUM;
    442      1.1  christos 		tp->num_skip = 0;
    443      1.1  christos 		tp->tcp_send_first = NULL;
    444      1.1  christos 		tp->tcp_send_last = NULL;
    445      1.1  christos 		memset(tp->id, 0, sizeof(tp->id));
    446      1.1  christos 		for(i=0; i<ID_PIPE_NUM; i++) {
    447      1.1  christos 			tp->unused[i] = i;
    448      1.1  christos 		}
    449      1.1  christos 
    450      1.1  christos 		/* insert into tree */
    451      1.1  christos 		(void)rbtree_insert(set->pipetree, &tp->node);
    452      1.1  christos 		xfrd_deactivate_zone(zone);
    453      1.1  christos 		xfrd_unset_timer(zone);
    454      1.1  christos 		pipeline_setup_new_zone(set, tp, zone);
    455      1.1  christos 		return;
    456      1.1  christos 	}
    457      1.1  christos 	/* check for a pipeline to the same master with unused ID */
    458      1.1  christos 	if((tp = pipeline_find(set, zone))!= NULL) {
    459      1.1  christos 		int i;
    460      1.1  christos 		if(zone->zone_handler.ev_fd != -1)
    461      1.1  christos 			xfrd_udp_release(zone);
    462      1.1  christos 		for(i=0; i<XFRD_MAX_TCP; i++) {
    463      1.1  christos 			if(set->tcp_state[i] == tp)
    464      1.1  christos 				zone->tcp_conn = i;
    465      1.1  christos 		}
    466      1.1  christos 		xfrd_deactivate_zone(zone);
    467      1.1  christos 		xfrd_unset_timer(zone);
    468      1.1  christos 		pipeline_setup_new_zone(set, tp, zone);
    469      1.1  christos 		return;
    470      1.1  christos 	}
    471      1.1  christos 
    472      1.1  christos 	/* wait, at end of line */
    473      1.1  christos 	DEBUG(DEBUG_XFRD,2, (LOG_INFO, "xfrd: max number of tcp "
    474      1.1  christos 		"connections (%d) reached.", XFRD_MAX_TCP));
    475      1.1  christos 	zone->tcp_waiting_next = 0;
    476      1.1  christos 	zone->tcp_waiting_prev = set->tcp_waiting_last;
    477      1.1  christos 	zone->tcp_waiting = 1;
    478      1.1  christos 	if(!set->tcp_waiting_last) {
    479      1.1  christos 		set->tcp_waiting_first = zone;
    480      1.1  christos 		set->tcp_waiting_last = zone;
    481      1.1  christos 	} else {
    482      1.1  christos 		set->tcp_waiting_last->tcp_waiting_next = zone;
    483      1.1  christos 		set->tcp_waiting_last = zone;
    484      1.1  christos 	}
    485      1.1  christos 	xfrd_deactivate_zone(zone);
    486      1.1  christos 	xfrd_unset_timer(zone);
    487      1.1  christos }
    488      1.1  christos 
    489      1.1  christos int
    490  1.1.1.2  christos xfrd_tcp_open(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
    491  1.1.1.2  christos 	xfrd_zone_type* zone)
    492      1.1  christos {
    493      1.1  christos 	int fd, family, conn;
    494      1.1  christos 	struct timeval tv;
    495      1.1  christos 	assert(zone->tcp_conn != -1);
    496      1.1  christos 
    497      1.1  christos 	/* if there is no next master, fallback to use the first one */
    498      1.1  christos 	/* but there really should be a master set */
    499      1.1  christos 	if(!zone->master) {
    500      1.1  christos 		zone->master = zone->zone_options->pattern->request_xfr;
    501      1.1  christos 		zone->master_num = 0;
    502      1.1  christos 	}
    503      1.1  christos 
    504      1.1  christos 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s open tcp conn to %s",
    505      1.1  christos 		zone->apex_str, zone->master->ip_address_spec));
    506      1.1  christos 	tp->tcp_r->is_reading = 1;
    507      1.1  christos 	tp->tcp_r->total_bytes = 0;
    508      1.1  christos 	tp->tcp_r->msglen = 0;
    509      1.1  christos 	buffer_clear(tp->tcp_r->packet);
    510      1.1  christos 	tp->tcp_w->is_reading = 0;
    511      1.1  christos 	tp->tcp_w->total_bytes = 0;
    512      1.1  christos 	tp->tcp_w->msglen = 0;
    513      1.1  christos 	tp->connection_established = 0;
    514      1.1  christos 
    515      1.1  christos 	if(zone->master->is_ipv6) {
    516      1.1  christos #ifdef INET6
    517      1.1  christos 		family = PF_INET6;
    518      1.1  christos #else
    519      1.1  christos 		xfrd_set_refresh_now(zone);
    520      1.1  christos 		return 0;
    521      1.1  christos #endif
    522      1.1  christos 	} else {
    523      1.1  christos 		family = PF_INET;
    524      1.1  christos 	}
    525      1.1  christos 	fd = socket(family, SOCK_STREAM, IPPROTO_TCP);
    526      1.1  christos 	if(fd == -1) {
    527  1.1.1.2  christos 		/* squelch 'Address family not supported by protocol' at low
    528  1.1.1.2  christos 		 * verbosity levels */
    529  1.1.1.2  christos 		if(errno != EAFNOSUPPORT || verbosity > 2)
    530  1.1.1.2  christos 		    log_msg(LOG_ERR, "xfrd: %s cannot create tcp socket: %s",
    531      1.1  christos 			zone->master->ip_address_spec, strerror(errno));
    532      1.1  christos 		xfrd_set_refresh_now(zone);
    533      1.1  christos 		return 0;
    534      1.1  christos 	}
    535      1.1  christos 	if(fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
    536      1.1  christos 		log_msg(LOG_ERR, "xfrd: fcntl failed: %s", strerror(errno));
    537      1.1  christos 		close(fd);
    538      1.1  christos 		xfrd_set_refresh_now(zone);
    539      1.1  christos 		return 0;
    540      1.1  christos 	}
    541      1.1  christos 
    542      1.1  christos 	if(xfrd->nsd->outgoing_tcp_mss > 0) {
    543      1.1  christos #if defined(IPPROTO_TCP) && defined(TCP_MAXSEG)
    544      1.1  christos 		if(setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG,
    545      1.1  christos 			(void*)&xfrd->nsd->outgoing_tcp_mss,
    546      1.1  christos 			sizeof(xfrd->nsd->outgoing_tcp_mss)) < 0) {
    547      1.1  christos 			log_msg(LOG_ERR, "xfrd: setsockopt(TCP_MAXSEG)"
    548      1.1  christos 					"failed: %s", strerror(errno));
    549      1.1  christos 		}
    550      1.1  christos #else
    551      1.1  christos 		log_msg(LOG_ERR, "setsockopt(TCP_MAXSEG) unsupported");
    552      1.1  christos #endif
    553      1.1  christos 	}
    554      1.1  christos 
    555      1.1  christos 	tp->ip_len = xfrd_acl_sockaddr_to(zone->master, &tp->ip);
    556      1.1  christos 
    557      1.1  christos 	/* bind it */
    558      1.1  christos 	if (!xfrd_bind_local_interface(fd, zone->zone_options->pattern->
    559      1.1  christos 		outgoing_interface, zone->master, 1)) {
    560      1.1  christos 		close(fd);
    561      1.1  christos 		xfrd_set_refresh_now(zone);
    562      1.1  christos 		return 0;
    563      1.1  christos         }
    564      1.1  christos 
    565      1.1  christos 	conn = connect(fd, (struct sockaddr*)&tp->ip, tp->ip_len);
    566      1.1  christos 	if (conn == -1 && errno != EINPROGRESS) {
    567      1.1  christos 		log_msg(LOG_ERR, "xfrd: connect %s failed: %s",
    568      1.1  christos 			zone->master->ip_address_spec, strerror(errno));
    569      1.1  christos 		close(fd);
    570      1.1  christos 		xfrd_set_refresh_now(zone);
    571      1.1  christos 		return 0;
    572      1.1  christos 	}
    573      1.1  christos 	tp->tcp_r->fd = fd;
    574      1.1  christos 	tp->tcp_w->fd = fd;
    575      1.1  christos 
    576      1.1  christos 	/* set the tcp pipe event */
    577      1.1  christos 	if(tp->handler_added)
    578      1.1  christos 		event_del(&tp->handler);
    579  1.1.1.3  christos 	memset(&tp->handler, 0, sizeof(tp->handler));
    580      1.1  christos 	event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|EV_WRITE,
    581      1.1  christos 		xfrd_handle_tcp_pipe, tp);
    582      1.1  christos 	if(event_base_set(xfrd->event_base, &tp->handler) != 0)
    583      1.1  christos 		log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
    584      1.1  christos 	tv.tv_sec = set->tcp_timeout;
    585      1.1  christos 	tv.tv_usec = 0;
    586      1.1  christos 	if(event_add(&tp->handler, &tv) != 0)
    587      1.1  christos 		log_msg(LOG_ERR, "xfrd tcp: event_add failed");
    588      1.1  christos 	tp->handler_added = 1;
    589      1.1  christos 	return 1;
    590      1.1  christos }
    591      1.1  christos 
    592      1.1  christos void
    593  1.1.1.2  christos xfrd_tcp_setup_write_packet(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
    594      1.1  christos {
    595  1.1.1.2  christos 	struct xfrd_tcp* tcp = tp->tcp_w;
    596      1.1  christos 	assert(zone->tcp_conn != -1);
    597      1.1  christos 	assert(zone->tcp_waiting == 0);
    598      1.1  christos 	/* start AXFR or IXFR for the zone */
    599      1.1  christos 	if(zone->soa_disk_acquired == 0 || zone->master->use_axfr_only ||
    600      1.1  christos 		zone->master->ixfr_disabled ||
    601      1.1  christos 		/* if zone expired, after the first round, do not ask for
    602      1.1  christos 		 * IXFR any more, but full AXFR (of any serial number) */
    603      1.1  christos 		(zone->state == xfrd_zone_expired && zone->round_num != 0)) {
    604      1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request full zone transfer "
    605      1.1  christos 						"(AXFR) for %s to %s",
    606      1.1  christos 			zone->apex_str, zone->master->ip_address_spec));
    607      1.1  christos 
    608      1.1  christos 		xfrd_setup_packet(tcp->packet, TYPE_AXFR, CLASS_IN, zone->apex,
    609      1.1  christos 			zone->query_id);
    610      1.1  christos 	} else {
    611      1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request incremental zone "
    612      1.1  christos 						"transfer (IXFR) for %s to %s",
    613      1.1  christos 			zone->apex_str, zone->master->ip_address_spec));
    614      1.1  christos 
    615      1.1  christos 		xfrd_setup_packet(tcp->packet, TYPE_IXFR, CLASS_IN, zone->apex,
    616      1.1  christos 			zone->query_id);
    617      1.1  christos         	NSCOUNT_SET(tcp->packet, 1);
    618      1.1  christos 		xfrd_write_soa_buffer(tcp->packet, zone->apex, &zone->soa_disk);
    619      1.1  christos 	}
    620      1.1  christos 	/* old transfer needs to be removed still? */
    621      1.1  christos 	if(zone->msg_seq_nr)
    622      1.1  christos 		xfrd_unlink_xfrfile(xfrd->nsd, zone->xfrfilenumber);
    623      1.1  christos 	zone->msg_seq_nr = 0;
    624      1.1  christos 	zone->msg_rr_count = 0;
    625      1.1  christos 	if(zone->master->key_options && zone->master->key_options->tsig_key) {
    626      1.1  christos 		xfrd_tsig_sign_request(tcp->packet, &zone->tsig, zone->master);
    627      1.1  christos 	}
    628      1.1  christos 	buffer_flip(tcp->packet);
    629      1.1  christos 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "sent tcp query with ID %d", zone->query_id));
    630      1.1  christos 	tcp->msglen = buffer_limit(tcp->packet);
    631      1.1  christos 	tcp->total_bytes = 0;
    632      1.1  christos }
    633      1.1  christos 
    634      1.1  christos static void
    635  1.1.1.2  christos tcp_conn_ready_for_reading(struct xfrd_tcp* tcp)
    636      1.1  christos {
    637      1.1  christos 	tcp->total_bytes = 0;
    638      1.1  christos 	tcp->msglen = 0;
    639      1.1  christos 	buffer_clear(tcp->packet);
    640      1.1  christos }
    641      1.1  christos 
    642  1.1.1.2  christos int conn_write(struct xfrd_tcp* tcp)
    643      1.1  christos {
    644      1.1  christos 	ssize_t sent;
    645      1.1  christos 
    646      1.1  christos 	if(tcp->total_bytes < sizeof(tcp->msglen)) {
    647      1.1  christos 		uint16_t sendlen = htons(tcp->msglen);
    648  1.1.1.2  christos #ifdef HAVE_WRITEV
    649  1.1.1.2  christos 		struct iovec iov[2];
    650  1.1.1.2  christos 		iov[0].iov_base = (uint8_t*)&sendlen + tcp->total_bytes;
    651  1.1.1.2  christos 		iov[0].iov_len = sizeof(sendlen) - tcp->total_bytes;
    652  1.1.1.2  christos 		iov[1].iov_base = buffer_begin(tcp->packet);
    653  1.1.1.2  christos 		iov[1].iov_len = buffer_limit(tcp->packet);
    654  1.1.1.2  christos 		sent = writev(tcp->fd, iov, 2);
    655  1.1.1.2  christos #else /* HAVE_WRITEV */
    656      1.1  christos 		sent = write(tcp->fd,
    657      1.1  christos 			(const char*)&sendlen + tcp->total_bytes,
    658      1.1  christos 			sizeof(tcp->msglen) - tcp->total_bytes);
    659  1.1.1.2  christos #endif /* HAVE_WRITEV */
    660      1.1  christos 
    661      1.1  christos 		if(sent == -1) {
    662      1.1  christos 			if(errno == EAGAIN || errno == EINTR) {
    663      1.1  christos 				/* write would block, try later */
    664      1.1  christos 				return 0;
    665      1.1  christos 			} else {
    666      1.1  christos 				return -1;
    667      1.1  christos 			}
    668      1.1  christos 		}
    669      1.1  christos 
    670      1.1  christos 		tcp->total_bytes += sent;
    671  1.1.1.2  christos 		if(sent > (ssize_t)sizeof(tcp->msglen))
    672  1.1.1.2  christos 			buffer_skip(tcp->packet, sent-sizeof(tcp->msglen));
    673      1.1  christos 		if(tcp->total_bytes < sizeof(tcp->msglen)) {
    674      1.1  christos 			/* incomplete write, resume later */
    675      1.1  christos 			return 0;
    676      1.1  christos 		}
    677  1.1.1.2  christos #ifdef HAVE_WRITEV
    678  1.1.1.2  christos 		if(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen)) {
    679  1.1.1.2  christos 			/* packet done */
    680  1.1.1.2  christos 			return 1;
    681  1.1.1.2  christos 		}
    682  1.1.1.2  christos #endif
    683  1.1.1.2  christos 		assert(tcp->total_bytes >= sizeof(tcp->msglen));
    684      1.1  christos 	}
    685      1.1  christos 
    686      1.1  christos 	assert(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen));
    687      1.1  christos 
    688      1.1  christos 	sent = write(tcp->fd,
    689      1.1  christos 		buffer_current(tcp->packet),
    690      1.1  christos 		buffer_remaining(tcp->packet));
    691      1.1  christos 	if(sent == -1) {
    692      1.1  christos 		if(errno == EAGAIN || errno == EINTR) {
    693      1.1  christos 			/* write would block, try later */
    694      1.1  christos 			return 0;
    695      1.1  christos 		} else {
    696      1.1  christos 			return -1;
    697      1.1  christos 		}
    698      1.1  christos 	}
    699      1.1  christos 
    700      1.1  christos 	buffer_skip(tcp->packet, sent);
    701      1.1  christos 	tcp->total_bytes += sent;
    702      1.1  christos 
    703      1.1  christos 	if(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen)) {
    704      1.1  christos 		/* more to write when socket becomes writable again */
    705      1.1  christos 		return 0;
    706      1.1  christos 	}
    707      1.1  christos 
    708      1.1  christos 	assert(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen));
    709      1.1  christos 	return 1;
    710      1.1  christos }
    711      1.1  christos 
    712      1.1  christos void
    713  1.1.1.2  christos xfrd_tcp_write(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
    714      1.1  christos {
    715      1.1  christos 	int ret;
    716  1.1.1.2  christos 	struct xfrd_tcp* tcp = tp->tcp_w;
    717      1.1  christos 	assert(zone->tcp_conn != -1);
    718      1.1  christos 	assert(zone == tp->tcp_send_first);
    719      1.1  christos 	/* see if for non-established connection, there is a connect error */
    720      1.1  christos 	if(!tp->connection_established) {
    721      1.1  christos 		/* check for pending error from nonblocking connect */
    722      1.1  christos 		/* from Stevens, unix network programming, vol1, 3rd ed, p450 */
    723      1.1  christos 		int error = 0;
    724      1.1  christos 		socklen_t len = sizeof(error);
    725      1.1  christos 		if(getsockopt(tcp->fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0){
    726      1.1  christos 			error = errno; /* on solaris errno is error */
    727      1.1  christos 		}
    728      1.1  christos 		if(error == EINPROGRESS || error == EWOULDBLOCK)
    729      1.1  christos 			return; /* try again later */
    730      1.1  christos 		if(error != 0) {
    731      1.1  christos 			log_msg(LOG_ERR, "%s: Could not tcp connect to %s: %s",
    732      1.1  christos 				zone->apex_str, zone->master->ip_address_spec,
    733      1.1  christos 				strerror(error));
    734      1.1  christos 			xfrd_tcp_pipe_stop(tp);
    735      1.1  christos 			return;
    736      1.1  christos 		}
    737      1.1  christos 	}
    738      1.1  christos 	ret = conn_write(tcp);
    739      1.1  christos 	if(ret == -1) {
    740      1.1  christos 		log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
    741      1.1  christos 		xfrd_tcp_pipe_stop(tp);
    742      1.1  christos 		return;
    743      1.1  christos 	}
    744      1.1  christos 	if(tcp->total_bytes != 0 && !tp->connection_established)
    745      1.1  christos 		tp->connection_established = 1;
    746      1.1  christos 	if(ret == 0) {
    747      1.1  christos 		return; /* write again later */
    748      1.1  christos 	}
    749      1.1  christos 	/* done writing this message */
    750      1.1  christos 
    751      1.1  christos 	/* remove first zone from sendlist */
    752      1.1  christos 	tcp_pipe_sendlist_popfirst(tp, zone);
    753      1.1  christos 
    754      1.1  christos 	/* see if other zone wants to write; init; let it write (now) */
    755      1.1  christos 	/* and use a loop, because 64k stack calls is a too much */
    756      1.1  christos 	while(tp->tcp_send_first) {
    757      1.1  christos 		/* setup to write for this zone */
    758      1.1  christos 		xfrd_tcp_setup_write_packet(tp, tp->tcp_send_first);
    759      1.1  christos 		/* attempt to write for this zone (if success, continue loop)*/
    760      1.1  christos 		ret = conn_write(tcp);
    761      1.1  christos 		if(ret == -1) {
    762      1.1  christos 			log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
    763      1.1  christos 			xfrd_tcp_pipe_stop(tp);
    764      1.1  christos 			return;
    765      1.1  christos 		}
    766      1.1  christos 		if(ret == 0)
    767      1.1  christos 			return; /* write again later */
    768      1.1  christos 		tcp_pipe_sendlist_popfirst(tp, tp->tcp_send_first);
    769      1.1  christos 	}
    770      1.1  christos 
    771      1.1  christos 	/* if sendlist empty, remove WRITE from event */
    772      1.1  christos 
    773      1.1  christos 	/* listen to READ, and not WRITE events */
    774      1.1  christos 	assert(tp->tcp_send_first == NULL);
    775      1.1  christos 	tcp_pipe_reset_timeout(tp);
    776      1.1  christos }
    777      1.1  christos 
    778      1.1  christos int
    779  1.1.1.2  christos conn_read(struct xfrd_tcp* tcp)
    780      1.1  christos {
    781      1.1  christos 	ssize_t received;
    782      1.1  christos 	/* receive leading packet length bytes */
    783      1.1  christos 	if(tcp->total_bytes < sizeof(tcp->msglen)) {
    784      1.1  christos 		received = read(tcp->fd,
    785      1.1  christos 			(char*) &tcp->msglen + tcp->total_bytes,
    786      1.1  christos 			sizeof(tcp->msglen) - tcp->total_bytes);
    787      1.1  christos 		if(received == -1) {
    788      1.1  christos 			if(errno == EAGAIN || errno == EINTR) {
    789      1.1  christos 				/* read would block, try later */
    790      1.1  christos 				return 0;
    791      1.1  christos 			} else {
    792      1.1  christos #ifdef ECONNRESET
    793      1.1  christos 				if (verbosity >= 2 || errno != ECONNRESET)
    794      1.1  christos #endif /* ECONNRESET */
    795      1.1  christos 				log_msg(LOG_ERR, "tcp read sz: %s", strerror(errno));
    796      1.1  christos 				return -1;
    797      1.1  christos 			}
    798      1.1  christos 		} else if(received == 0) {
    799      1.1  christos 			/* EOF */
    800      1.1  christos 			return -1;
    801      1.1  christos 		}
    802      1.1  christos 		tcp->total_bytes += received;
    803      1.1  christos 		if(tcp->total_bytes < sizeof(tcp->msglen)) {
    804      1.1  christos 			/* not complete yet, try later */
    805      1.1  christos 			return 0;
    806      1.1  christos 		}
    807      1.1  christos 
    808      1.1  christos 		assert(tcp->total_bytes == sizeof(tcp->msglen));
    809      1.1  christos 		tcp->msglen = ntohs(tcp->msglen);
    810      1.1  christos 
    811      1.1  christos 		if(tcp->msglen == 0) {
    812      1.1  christos 			buffer_set_limit(tcp->packet, tcp->msglen);
    813      1.1  christos 			return 1;
    814      1.1  christos 		}
    815      1.1  christos 		if(tcp->msglen > buffer_capacity(tcp->packet)) {
    816      1.1  christos 			log_msg(LOG_ERR, "buffer too small, dropping connection");
    817      1.1  christos 			return 0;
    818      1.1  christos 		}
    819      1.1  christos 		buffer_set_limit(tcp->packet, tcp->msglen);
    820      1.1  christos 	}
    821      1.1  christos 
    822      1.1  christos 	assert(buffer_remaining(tcp->packet) > 0);
    823      1.1  christos 
    824      1.1  christos 	received = read(tcp->fd, buffer_current(tcp->packet),
    825      1.1  christos 		buffer_remaining(tcp->packet));
    826      1.1  christos 	if(received == -1) {
    827      1.1  christos 		if(errno == EAGAIN || errno == EINTR) {
    828      1.1  christos 			/* read would block, try later */
    829      1.1  christos 			return 0;
    830      1.1  christos 		} else {
    831      1.1  christos #ifdef ECONNRESET
    832      1.1  christos 			if (verbosity >= 2 || errno != ECONNRESET)
    833      1.1  christos #endif /* ECONNRESET */
    834      1.1  christos 			log_msg(LOG_ERR, "tcp read %s", strerror(errno));
    835      1.1  christos 			return -1;
    836      1.1  christos 		}
    837      1.1  christos 	} else if(received == 0) {
    838      1.1  christos 		/* EOF */
    839      1.1  christos 		return -1;
    840      1.1  christos 	}
    841      1.1  christos 
    842      1.1  christos 	tcp->total_bytes += received;
    843      1.1  christos 	buffer_skip(tcp->packet, received);
    844      1.1  christos 
    845      1.1  christos 	if(buffer_remaining(tcp->packet) > 0) {
    846      1.1  christos 		/* not complete yet, wait for more */
    847      1.1  christos 		return 0;
    848      1.1  christos 	}
    849      1.1  christos 
    850      1.1  christos 	/* completed */
    851      1.1  christos 	assert(buffer_position(tcp->packet) == tcp->msglen);
    852      1.1  christos 	return 1;
    853      1.1  christos }
    854      1.1  christos 
    855      1.1  christos void
    856      1.1  christos xfrd_tcp_read(struct xfrd_tcp_pipeline* tp)
    857      1.1  christos {
    858  1.1.1.2  christos 	xfrd_zone_type* zone;
    859  1.1.1.2  christos 	struct xfrd_tcp* tcp = tp->tcp_r;
    860      1.1  christos 	int ret;
    861      1.1  christos 	enum xfrd_packet_result pkt_result;
    862      1.1  christos 
    863      1.1  christos 	ret = conn_read(tcp);
    864      1.1  christos 	if(ret == -1) {
    865      1.1  christos 		xfrd_tcp_pipe_stop(tp);
    866      1.1  christos 		return;
    867      1.1  christos 	}
    868      1.1  christos 	if(ret == 0)
    869      1.1  christos 		return;
    870      1.1  christos 	/* completed msg */
    871      1.1  christos 	buffer_flip(tcp->packet);
    872      1.1  christos 	/* see which ID number it is, if skip, handle skip, NULL: warn */
    873      1.1  christos 	if(tcp->msglen < QHEADERSZ) {
    874      1.1  christos 		/* too short for DNS header, skip it */
    875      1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
    876      1.1  christos 			"xfrd: tcp skip response that is too short"));
    877      1.1  christos 		tcp_conn_ready_for_reading(tcp);
    878      1.1  christos 		return;
    879      1.1  christos 	}
    880      1.1  christos 	zone = tp->id[ID(tcp->packet)];
    881      1.1  christos 	if(!zone || zone == TCP_NULL_SKIP) {
    882      1.1  christos 		/* no zone for this id? skip it */
    883      1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
    884      1.1  christos 			"xfrd: tcp skip response with %s ID",
    885      1.1  christos 			zone?"set-to-skip":"unknown"));
    886      1.1  christos 		tcp_conn_ready_for_reading(tcp);
    887      1.1  christos 		return;
    888      1.1  christos 	}
    889      1.1  christos 	assert(zone->tcp_conn != -1);
    890      1.1  christos 
    891      1.1  christos 	/* handle message for zone */
    892      1.1  christos 	pkt_result = xfrd_handle_received_xfr_packet(zone, tcp->packet);
    893      1.1  christos 	/* setup for reading the next packet on this connection */
    894      1.1  christos 	tcp_conn_ready_for_reading(tcp);
    895      1.1  christos 	switch(pkt_result) {
    896      1.1  christos 		case xfrd_packet_more:
    897      1.1  christos 			/* wait for next packet */
    898      1.1  christos 			break;
    899      1.1  christos 		case xfrd_packet_newlease:
    900      1.1  christos 			/* set to skip if more packets with this ID */
    901      1.1  christos 			tp->id[zone->query_id] = TCP_NULL_SKIP;
    902      1.1  christos 			tp->num_skip++;
    903      1.1  christos 			/* fall through to remove zone from tp */
    904  1.1.1.2  christos 			/* fallthrough */
    905      1.1  christos 		case xfrd_packet_transfer:
    906      1.1  christos 			if(zone->zone_options->pattern->multi_master_check) {
    907      1.1  christos 				xfrd_tcp_release(xfrd->tcp_set, zone);
    908      1.1  christos 				xfrd_make_request(zone);
    909      1.1  christos 				break;
    910      1.1  christos 			}
    911      1.1  christos 			xfrd_tcp_release(xfrd->tcp_set, zone);
    912      1.1  christos 			assert(zone->round_num == -1);
    913      1.1  christos 			break;
    914      1.1  christos 		case xfrd_packet_notimpl:
    915      1.1  christos 			xfrd_disable_ixfr(zone);
    916      1.1  christos 			xfrd_tcp_release(xfrd->tcp_set, zone);
    917      1.1  christos 			/* query next server */
    918      1.1  christos 			xfrd_make_request(zone);
    919      1.1  christos 			break;
    920      1.1  christos 		case xfrd_packet_bad:
    921      1.1  christos 		case xfrd_packet_tcp:
    922      1.1  christos 		default:
    923      1.1  christos 			/* set to skip if more packets with this ID */
    924      1.1  christos 			tp->id[zone->query_id] = TCP_NULL_SKIP;
    925      1.1  christos 			tp->num_skip++;
    926      1.1  christos 			xfrd_tcp_release(xfrd->tcp_set, zone);
    927      1.1  christos 			/* query next server */
    928      1.1  christos 			xfrd_make_request(zone);
    929      1.1  christos 			break;
    930      1.1  christos 	}
    931      1.1  christos }
    932      1.1  christos 
    933      1.1  christos void
    934  1.1.1.2  christos xfrd_tcp_release(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
    935      1.1  christos {
    936      1.1  christos 	int conn = zone->tcp_conn;
    937      1.1  christos 	struct xfrd_tcp_pipeline* tp = set->tcp_state[conn];
    938      1.1  christos 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s released tcp conn to %s",
    939      1.1  christos 		zone->apex_str, zone->master->ip_address_spec));
    940      1.1  christos 	assert(zone->tcp_conn != -1);
    941      1.1  christos 	assert(zone->tcp_waiting == 0);
    942      1.1  christos 	zone->tcp_conn = -1;
    943      1.1  christos 	zone->tcp_waiting = 0;
    944      1.1  christos 
    945      1.1  christos 	/* remove from tcp_send list */
    946      1.1  christos 	tcp_pipe_sendlist_remove(tp, zone);
    947      1.1  christos 	/* remove it from the ID list */
    948      1.1  christos 	if(tp->id[zone->query_id] != TCP_NULL_SKIP)
    949      1.1  christos 		tcp_pipe_id_remove(tp, zone);
    950      1.1  christos 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: released tcp pipe now %d unused",
    951      1.1  christos 		tp->num_unused));
    952      1.1  christos 	/* if pipe was full, but no more, then see if waiting element is
    953      1.1  christos 	 * for the same master, and can fill the unused ID */
    954      1.1  christos 	if(tp->num_unused == 1 && set->tcp_waiting_first) {
    955      1.1  christos #ifdef INET6
    956      1.1  christos 		struct sockaddr_storage to;
    957      1.1  christos #else
    958      1.1  christos 		struct sockaddr_in to;
    959      1.1  christos #endif
    960      1.1  christos 		socklen_t to_len = xfrd_acl_sockaddr_to(
    961      1.1  christos 			set->tcp_waiting_first->master, &to);
    962      1.1  christos 		if(to_len == tp->ip_len && memcmp(&to, &tp->ip, to_len) == 0) {
    963      1.1  christos 			/* use this connection for the waiting zone */
    964      1.1  christos 			zone = set->tcp_waiting_first;
    965      1.1  christos 			assert(zone->tcp_conn == -1);
    966      1.1  christos 			zone->tcp_conn = conn;
    967      1.1  christos 			tcp_zone_waiting_list_popfirst(set, zone);
    968      1.1  christos 			if(zone->zone_handler.ev_fd != -1)
    969      1.1  christos 				xfrd_udp_release(zone);
    970      1.1  christos 			xfrd_unset_timer(zone);
    971      1.1  christos 			pipeline_setup_new_zone(set, tp, zone);
    972      1.1  christos 			return;
    973      1.1  christos 		}
    974      1.1  christos 		/* waiting zone did not go to same server */
    975      1.1  christos 	}
    976      1.1  christos 
    977      1.1  christos 	/* if all unused, or only skipped leftover, close the pipeline */
    978      1.1  christos 	if(tp->num_unused >= ID_PIPE_NUM || tp->num_skip >= ID_PIPE_NUM - tp->num_unused)
    979      1.1  christos 		xfrd_tcp_pipe_release(set, tp, conn);
    980      1.1  christos }
    981      1.1  christos 
    982      1.1  christos void
    983  1.1.1.2  christos xfrd_tcp_pipe_release(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
    984      1.1  christos 	int conn)
    985      1.1  christos {
    986      1.1  christos 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: tcp pipe released"));
    987      1.1  christos 	/* one handler per tcp pipe */
    988      1.1  christos 	if(tp->handler_added)
    989      1.1  christos 		event_del(&tp->handler);
    990      1.1  christos 	tp->handler_added = 0;
    991      1.1  christos 
    992      1.1  christos 	/* fd in tcp_r and tcp_w is the same, close once */
    993      1.1  christos 	if(tp->tcp_r->fd != -1)
    994      1.1  christos 		close(tp->tcp_r->fd);
    995      1.1  christos 	tp->tcp_r->fd = -1;
    996      1.1  christos 	tp->tcp_w->fd = -1;
    997      1.1  christos 
    998      1.1  christos 	/* remove from pipetree */
    999      1.1  christos 	(void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
   1000      1.1  christos 
   1001      1.1  christos 	/* a waiting zone can use the free tcp slot (to another server) */
   1002      1.1  christos 	/* if that zone fails to set-up or connect, we try to start the next
   1003      1.1  christos 	 * waiting zone in the list */
   1004      1.1  christos 	while(set->tcp_count == XFRD_MAX_TCP && set->tcp_waiting_first) {
   1005      1.1  christos 		int i;
   1006      1.1  christos 
   1007      1.1  christos 		/* pop first waiting process */
   1008  1.1.1.2  christos 		xfrd_zone_type* zone = set->tcp_waiting_first;
   1009      1.1  christos 		/* start it */
   1010      1.1  christos 		assert(zone->tcp_conn == -1);
   1011      1.1  christos 		zone->tcp_conn = conn;
   1012      1.1  christos 		tcp_zone_waiting_list_popfirst(set, zone);
   1013      1.1  christos 
   1014      1.1  christos 		/* stop udp (if any) */
   1015      1.1  christos 		if(zone->zone_handler.ev_fd != -1)
   1016      1.1  christos 			xfrd_udp_release(zone);
   1017      1.1  christos 		if(!xfrd_tcp_open(set, tp, zone)) {
   1018      1.1  christos 			zone->tcp_conn = -1;
   1019      1.1  christos 			xfrd_set_refresh_now(zone);
   1020      1.1  christos 			/* try to start the next zone (if any) */
   1021      1.1  christos 			continue;
   1022      1.1  christos 		}
   1023      1.1  christos 		/* re-init this tcppipe */
   1024      1.1  christos 		/* ip and ip_len set by tcp_open */
   1025      1.1  christos 		tp->node.key = tp;
   1026      1.1  christos 		tp->num_unused = ID_PIPE_NUM;
   1027      1.1  christos 		tp->num_skip = 0;
   1028      1.1  christos 		tp->tcp_send_first = NULL;
   1029      1.1  christos 		tp->tcp_send_last = NULL;
   1030      1.1  christos 		memset(tp->id, 0, sizeof(tp->id));
   1031      1.1  christos 		for(i=0; i<ID_PIPE_NUM; i++) {
   1032      1.1  christos 			tp->unused[i] = i;
   1033      1.1  christos 		}
   1034      1.1  christos 
   1035      1.1  christos 		/* insert into tree */
   1036      1.1  christos 		(void)rbtree_insert(set->pipetree, &tp->node);
   1037      1.1  christos 		/* setup write */
   1038      1.1  christos 		xfrd_unset_timer(zone);
   1039      1.1  christos 		pipeline_setup_new_zone(set, tp, zone);
   1040      1.1  christos 		/* started a task, no need for cleanups, so return */
   1041      1.1  christos 		return;
   1042      1.1  christos 	}
   1043      1.1  christos 	/* no task to start, cleanup */
   1044      1.1  christos 	assert(!set->tcp_waiting_first);
   1045      1.1  christos 	set->tcp_count --;
   1046      1.1  christos 	assert(set->tcp_count >= 0);
   1047      1.1  christos }
   1048      1.1  christos 
   1049