Home | History | Annotate | Line # | Download | only in dist
xfrd-tcp.c revision 1.1
      1  1.1  christos /*
      2  1.1  christos  * xfrd-tcp.c - XFR (transfer) Daemon TCP system source file. Manages tcp conn.
      3  1.1  christos  *
      4  1.1  christos  * Copyright (c) 2001-2006, NLnet Labs. All rights reserved.
      5  1.1  christos  *
      6  1.1  christos  * See LICENSE for the license.
      7  1.1  christos  *
      8  1.1  christos  */
      9  1.1  christos 
     10  1.1  christos #include "config.h"
     11  1.1  christos #include <assert.h>
     12  1.1  christos #include <errno.h>
     13  1.1  christos #include <fcntl.h>
     14  1.1  christos #include <unistd.h>
     15  1.1  christos #include <stdlib.h>
     16  1.1  christos #include "nsd.h"
     17  1.1  christos #include "xfrd-tcp.h"
     18  1.1  christos #include "buffer.h"
     19  1.1  christos #include "packet.h"
     20  1.1  christos #include "dname.h"
     21  1.1  christos #include "options.h"
     22  1.1  christos #include "namedb.h"
     23  1.1  christos #include "xfrd.h"
     24  1.1  christos #include "xfrd-disk.h"
     25  1.1  christos #include "util.h"
     26  1.1  christos 
     27  1.1  christos /* sort tcppipe, first on IP address, for an IPaddresss, sort on num_unused */
     28  1.1  christos static int
     29  1.1  christos xfrd_pipe_cmp(const void* a, const void* b)
     30  1.1  christos {
     31  1.1  christos 	const struct xfrd_tcp_pipeline* x = (struct xfrd_tcp_pipeline*)a;
     32  1.1  christos 	const struct xfrd_tcp_pipeline* y = (struct xfrd_tcp_pipeline*)b;
     33  1.1  christos 	int r;
     34  1.1  christos 	if(x == y)
     35  1.1  christos 		return 0;
     36  1.1  christos 	if(y->ip_len != x->ip_len)
     37  1.1  christos 		/* subtraction works because nonnegative and small numbers */
     38  1.1  christos 		return (int)y->ip_len - (int)x->ip_len;
     39  1.1  christos 	r = memcmp(&x->ip, &y->ip, x->ip_len);
     40  1.1  christos 	if(r != 0)
     41  1.1  christos 		return r;
     42  1.1  christos 	/* sort that num_unused is sorted ascending, */
     43  1.1  christos 	if(x->num_unused != y->num_unused) {
     44  1.1  christos 		return (x->num_unused < y->num_unused) ? -1 : 1;
     45  1.1  christos 	}
     46  1.1  christos 	/* different pipelines are different still, even with same numunused*/
     47  1.1  christos 	return (uintptr_t)x < (uintptr_t)y ? -1 : 1;
     48  1.1  christos }
     49  1.1  christos 
     50  1.1  christos xfrd_tcp_set_t* xfrd_tcp_set_create(struct region* region)
     51  1.1  christos {
     52  1.1  christos 	int i;
     53  1.1  christos 	xfrd_tcp_set_t* tcp_set = region_alloc(region, sizeof(xfrd_tcp_set_t));
     54  1.1  christos 	memset(tcp_set, 0, sizeof(xfrd_tcp_set_t));
     55  1.1  christos 	tcp_set->tcp_count = 0;
     56  1.1  christos 	tcp_set->tcp_waiting_first = 0;
     57  1.1  christos 	tcp_set->tcp_waiting_last = 0;
     58  1.1  christos 	for(i=0; i<XFRD_MAX_TCP; i++)
     59  1.1  christos 		tcp_set->tcp_state[i] = xfrd_tcp_pipeline_create(region);
     60  1.1  christos 	tcp_set->pipetree = rbtree_create(region, &xfrd_pipe_cmp);
     61  1.1  christos 	return tcp_set;
     62  1.1  christos }
     63  1.1  christos 
     64  1.1  christos struct xfrd_tcp_pipeline*
     65  1.1  christos xfrd_tcp_pipeline_create(region_type* region)
     66  1.1  christos {
     67  1.1  christos 	int i;
     68  1.1  christos 	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)
     69  1.1  christos 		region_alloc_zero(region, sizeof(*tp));
     70  1.1  christos 	tp->num_unused = ID_PIPE_NUM;
     71  1.1  christos 	assert(sizeof(tp->unused)/sizeof(tp->unused[0]) == ID_PIPE_NUM);
     72  1.1  christos 	for(i=0; i<ID_PIPE_NUM; i++)
     73  1.1  christos 		tp->unused[i] = (uint16_t)i;
     74  1.1  christos 	tp->tcp_r = xfrd_tcp_create(region, QIOBUFSZ);
     75  1.1  christos 	tp->tcp_w = xfrd_tcp_create(region, 512);
     76  1.1  christos 	return tp;
     77  1.1  christos }
     78  1.1  christos 
     79  1.1  christos void
     80  1.1  christos xfrd_setup_packet(buffer_type* packet,
     81  1.1  christos 	uint16_t type, uint16_t klass, const dname_type* dname, uint16_t qid)
     82  1.1  christos {
     83  1.1  christos 	/* Set up the header */
     84  1.1  christos 	buffer_clear(packet);
     85  1.1  christos 	ID_SET(packet, qid);
     86  1.1  christos 	FLAGS_SET(packet, 0);
     87  1.1  christos 	OPCODE_SET(packet, OPCODE_QUERY);
     88  1.1  christos 	QDCOUNT_SET(packet, 1);
     89  1.1  christos 	ANCOUNT_SET(packet, 0);
     90  1.1  christos 	NSCOUNT_SET(packet, 0);
     91  1.1  christos 	ARCOUNT_SET(packet, 0);
     92  1.1  christos 	buffer_skip(packet, QHEADERSZ);
     93  1.1  christos 
     94  1.1  christos 	/* The question record. */
     95  1.1  christos 	buffer_write(packet, dname_name(dname), dname->name_size);
     96  1.1  christos 	buffer_write_u16(packet, type);
     97  1.1  christos 	buffer_write_u16(packet, klass);
     98  1.1  christos }
     99  1.1  christos 
    100  1.1  christos static socklen_t
    101  1.1  christos #ifdef INET6
    102  1.1  christos xfrd_acl_sockaddr(acl_options_t* acl, unsigned int port,
    103  1.1  christos 	struct sockaddr_storage *sck)
    104  1.1  christos #else
    105  1.1  christos xfrd_acl_sockaddr(acl_options_t* acl, unsigned int port,
    106  1.1  christos 	struct sockaddr_in *sck, const char* fromto)
    107  1.1  christos #endif /* INET6 */
    108  1.1  christos {
    109  1.1  christos 	/* setup address structure */
    110  1.1  christos #ifdef INET6
    111  1.1  christos 	memset(sck, 0, sizeof(struct sockaddr_storage));
    112  1.1  christos #else
    113  1.1  christos 	memset(sck, 0, sizeof(struct sockaddr_in));
    114  1.1  christos #endif
    115  1.1  christos 	if(acl->is_ipv6) {
    116  1.1  christos #ifdef INET6
    117  1.1  christos 		struct sockaddr_in6* sa = (struct sockaddr_in6*)sck;
    118  1.1  christos 		sa->sin6_family = AF_INET6;
    119  1.1  christos 		sa->sin6_port = htons(port);
    120  1.1  christos 		sa->sin6_addr = acl->addr.addr6;
    121  1.1  christos 		return sizeof(struct sockaddr_in6);
    122  1.1  christos #else
    123  1.1  christos 		log_msg(LOG_ERR, "xfrd: IPv6 connection %s %s attempted but no \
    124  1.1  christos INET6.", fromto, acl->ip_address_spec);
    125  1.1  christos 		return 0;
    126  1.1  christos #endif
    127  1.1  christos 	} else {
    128  1.1  christos 		struct sockaddr_in* sa = (struct sockaddr_in*)sck;
    129  1.1  christos 		sa->sin_family = AF_INET;
    130  1.1  christos 		sa->sin_port = htons(port);
    131  1.1  christos 		sa->sin_addr = acl->addr.addr;
    132  1.1  christos 		return sizeof(struct sockaddr_in);
    133  1.1  christos 	}
    134  1.1  christos }
    135  1.1  christos 
    136  1.1  christos socklen_t
    137  1.1  christos #ifdef INET6
    138  1.1  christos xfrd_acl_sockaddr_to(acl_options_t* acl, struct sockaddr_storage *to)
    139  1.1  christos #else
    140  1.1  christos xfrd_acl_sockaddr_to(acl_options_t* acl, struct sockaddr_in *to)
    141  1.1  christos #endif /* INET6 */
    142  1.1  christos {
    143  1.1  christos 	unsigned int port = acl->port?acl->port:(unsigned)atoi(TCP_PORT);
    144  1.1  christos #ifdef INET6
    145  1.1  christos 	return xfrd_acl_sockaddr(acl, port, to);
    146  1.1  christos #else
    147  1.1  christos 	return xfrd_acl_sockaddr(acl, port, to, "to");
    148  1.1  christos #endif /* INET6 */
    149  1.1  christos }
    150  1.1  christos 
    151  1.1  christos socklen_t
    152  1.1  christos #ifdef INET6
    153  1.1  christos xfrd_acl_sockaddr_frm(acl_options_t* acl, struct sockaddr_storage *frm)
    154  1.1  christos #else
    155  1.1  christos xfrd_acl_sockaddr_frm(acl_options_t* acl, struct sockaddr_in *frm)
    156  1.1  christos #endif /* INET6 */
    157  1.1  christos {
    158  1.1  christos 	unsigned int port = acl->port?acl->port:0;
    159  1.1  christos #ifdef INET6
    160  1.1  christos 	return xfrd_acl_sockaddr(acl, port, frm);
    161  1.1  christos #else
    162  1.1  christos 	return xfrd_acl_sockaddr(acl, port, frm, "from");
    163  1.1  christos #endif /* INET6 */
    164  1.1  christos }
    165  1.1  christos 
    166  1.1  christos void
    167  1.1  christos xfrd_write_soa_buffer(struct buffer* packet,
    168  1.1  christos 	const dname_type* apex, struct xfrd_soa* soa)
    169  1.1  christos {
    170  1.1  christos 	size_t rdlength_pos;
    171  1.1  christos 	uint16_t rdlength;
    172  1.1  christos 	buffer_write(packet, dname_name(apex), apex->name_size);
    173  1.1  christos 
    174  1.1  christos 	/* already in network order */
    175  1.1  christos 	buffer_write(packet, &soa->type, sizeof(soa->type));
    176  1.1  christos 	buffer_write(packet, &soa->klass, sizeof(soa->klass));
    177  1.1  christos 	buffer_write(packet, &soa->ttl, sizeof(soa->ttl));
    178  1.1  christos 	rdlength_pos = buffer_position(packet);
    179  1.1  christos 	buffer_skip(packet, sizeof(rdlength));
    180  1.1  christos 
    181  1.1  christos 	/* uncompressed dnames */
    182  1.1  christos 	buffer_write(packet, soa->prim_ns+1, soa->prim_ns[0]);
    183  1.1  christos 	buffer_write(packet, soa->email+1, soa->email[0]);
    184  1.1  christos 
    185  1.1  christos 	buffer_write(packet, &soa->serial, sizeof(uint32_t));
    186  1.1  christos 	buffer_write(packet, &soa->refresh, sizeof(uint32_t));
    187  1.1  christos 	buffer_write(packet, &soa->retry, sizeof(uint32_t));
    188  1.1  christos 	buffer_write(packet, &soa->expire, sizeof(uint32_t));
    189  1.1  christos 	buffer_write(packet, &soa->minimum, sizeof(uint32_t));
    190  1.1  christos 
    191  1.1  christos 	/* write length of RR */
    192  1.1  christos 	rdlength = buffer_position(packet) - rdlength_pos - sizeof(rdlength);
    193  1.1  christos 	buffer_write_u16_at(packet, rdlength_pos, rdlength);
    194  1.1  christos }
    195  1.1  christos 
    196  1.1  christos xfrd_tcp_t*
    197  1.1  christos xfrd_tcp_create(region_type* region, size_t bufsize)
    198  1.1  christos {
    199  1.1  christos 	xfrd_tcp_t* tcp_state = (xfrd_tcp_t*)region_alloc(
    200  1.1  christos 		region, sizeof(xfrd_tcp_t));
    201  1.1  christos 	memset(tcp_state, 0, sizeof(xfrd_tcp_t));
    202  1.1  christos 	tcp_state->packet = buffer_create(region, bufsize);
    203  1.1  christos 	tcp_state->fd = -1;
    204  1.1  christos 
    205  1.1  christos 	return tcp_state;
    206  1.1  christos }
    207  1.1  christos 
    208  1.1  christos static struct xfrd_tcp_pipeline*
    209  1.1  christos pipeline_find(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
    210  1.1  christos {
    211  1.1  christos 	rbnode_t* sme = NULL;
    212  1.1  christos 	struct xfrd_tcp_pipeline* r;
    213  1.1  christos 	/* smaller buf than a full pipeline with 64kb ID array, only need
    214  1.1  christos 	 * the front part with the key info, this front part contains the
    215  1.1  christos 	 * members that the compare function uses. */
    216  1.1  christos 	const size_t keysize = sizeof(struct xfrd_tcp_pipeline) -
    217  1.1  christos 		ID_PIPE_NUM*(sizeof(struct xfrd_zone*) + sizeof(uint16_t));
    218  1.1  christos 	/* void* type for alignment of the struct,
    219  1.1  christos 	 * divide the keysize by ptr-size and then add one to round up */
    220  1.1  christos 	void* buf[ (keysize / sizeof(void*)) + 1 ];
    221  1.1  christos 	struct xfrd_tcp_pipeline* key = (struct xfrd_tcp_pipeline*)buf;
    222  1.1  christos 	key->node.key = key;
    223  1.1  christos 	key->ip_len = xfrd_acl_sockaddr_to(zone->master, &key->ip);
    224  1.1  christos 	key->num_unused = ID_PIPE_NUM;
    225  1.1  christos 	/* lookup existing tcp transfer to the master with highest unused */
    226  1.1  christos 	if(rbtree_find_less_equal(set->pipetree, key, &sme)) {
    227  1.1  christos 		/* exact match, strange, fully unused tcp cannot be open */
    228  1.1  christos 		assert(0);
    229  1.1  christos 	}
    230  1.1  christos 	if(!sme)
    231  1.1  christos 		return NULL;
    232  1.1  christos 	r = (struct xfrd_tcp_pipeline*)sme->key;
    233  1.1  christos 	/* <= key pointed at, is the master correct ? */
    234  1.1  christos 	if(r->ip_len != key->ip_len)
    235  1.1  christos 		return NULL;
    236  1.1  christos 	if(memcmp(&r->ip, &key->ip, key->ip_len) != 0)
    237  1.1  christos 		return NULL;
    238  1.1  christos 	/* correct master, is there a slot free for this transfer? */
    239  1.1  christos 	if(r->num_unused == 0)
    240  1.1  christos 		return NULL;
    241  1.1  christos 	return r;
    242  1.1  christos }
    243  1.1  christos 
    244  1.1  christos /* remove zone from tcp waiting list */
    245  1.1  christos static void
    246  1.1  christos tcp_zone_waiting_list_popfirst(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
    247  1.1  christos {
    248  1.1  christos 	assert(zone->tcp_waiting);
    249  1.1  christos 	set->tcp_waiting_first = zone->tcp_waiting_next;
    250  1.1  christos 	if(zone->tcp_waiting_next)
    251  1.1  christos 		zone->tcp_waiting_next->tcp_waiting_prev = NULL;
    252  1.1  christos 	else	set->tcp_waiting_last = 0;
    253  1.1  christos 	zone->tcp_waiting_next = 0;
    254  1.1  christos 	zone->tcp_waiting = 0;
    255  1.1  christos }
    256  1.1  christos 
    257  1.1  christos /* remove zone from tcp pipe write-wait list */
    258  1.1  christos static void
    259  1.1  christos tcp_pipe_sendlist_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
    260  1.1  christos {
    261  1.1  christos 	if(zone->in_tcp_send) {
    262  1.1  christos 		if(zone->tcp_send_prev)
    263  1.1  christos 			zone->tcp_send_prev->tcp_send_next=zone->tcp_send_next;
    264  1.1  christos 		else	tp->tcp_send_first=zone->tcp_send_next;
    265  1.1  christos 		if(zone->tcp_send_next)
    266  1.1  christos 			zone->tcp_send_next->tcp_send_prev=zone->tcp_send_prev;
    267  1.1  christos 		else	tp->tcp_send_last=zone->tcp_send_prev;
    268  1.1  christos 		zone->in_tcp_send = 0;
    269  1.1  christos 	}
    270  1.1  christos }
    271  1.1  christos 
    272  1.1  christos /* remove first from write-wait list */
    273  1.1  christos static void
    274  1.1  christos tcp_pipe_sendlist_popfirst(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
    275  1.1  christos {
    276  1.1  christos 	tp->tcp_send_first = zone->tcp_send_next;
    277  1.1  christos 	if(tp->tcp_send_first)
    278  1.1  christos 		tp->tcp_send_first->tcp_send_prev = NULL;
    279  1.1  christos 	else	tp->tcp_send_last = NULL;
    280  1.1  christos 	zone->in_tcp_send = 0;
    281  1.1  christos }
    282  1.1  christos 
    283  1.1  christos /* remove zone from tcp pipe ID map */
    284  1.1  christos static void
    285  1.1  christos tcp_pipe_id_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
    286  1.1  christos {
    287  1.1  christos 	assert(tp->num_unused < ID_PIPE_NUM && tp->num_unused >= 0);
    288  1.1  christos 	assert(tp->id[zone->query_id] == zone);
    289  1.1  christos 	tp->id[zone->query_id] = NULL;
    290  1.1  christos 	tp->unused[tp->num_unused] = zone->query_id;
    291  1.1  christos 	/* must remove and re-add for sort order in tree */
    292  1.1  christos 	(void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
    293  1.1  christos 	tp->num_unused++;
    294  1.1  christos 	(void)rbtree_insert(xfrd->tcp_set->pipetree, &tp->node);
    295  1.1  christos }
    296  1.1  christos 
    297  1.1  christos /* stop the tcp pipe (and all its zones need to retry) */
    298  1.1  christos static void
    299  1.1  christos xfrd_tcp_pipe_stop(struct xfrd_tcp_pipeline* tp)
    300  1.1  christos {
    301  1.1  christos 	int i, conn = -1;
    302  1.1  christos 	assert(tp->num_unused < ID_PIPE_NUM); /* at least one 'in-use' */
    303  1.1  christos 	assert(ID_PIPE_NUM - tp->num_unused > tp->num_skip); /* at least one 'nonskip' */
    304  1.1  christos 	/* need to retry for all the zones connected to it */
    305  1.1  christos 	/* these could use different lists and go to a different nextmaster*/
    306  1.1  christos 	for(i=0; i<ID_PIPE_NUM; i++) {
    307  1.1  christos 		if(tp->id[i] && tp->id[i] != TCP_NULL_SKIP) {
    308  1.1  christos 			xfrd_zone_t* zone = tp->id[i];
    309  1.1  christos 			conn = zone->tcp_conn;
    310  1.1  christos 			zone->tcp_conn = -1;
    311  1.1  christos 			zone->tcp_waiting = 0;
    312  1.1  christos 			tcp_pipe_sendlist_remove(tp, zone);
    313  1.1  christos 			tcp_pipe_id_remove(tp, zone);
    314  1.1  christos 			xfrd_set_refresh_now(zone);
    315  1.1  christos 		}
    316  1.1  christos 	}
    317  1.1  christos 	assert(conn != -1);
    318  1.1  christos 	/* now release the entire tcp pipe */
    319  1.1  christos 	xfrd_tcp_pipe_release(xfrd->tcp_set, tp, conn);
    320  1.1  christos }
    321  1.1  christos 
    322  1.1  christos static void
    323  1.1  christos tcp_pipe_reset_timeout(struct xfrd_tcp_pipeline* tp)
    324  1.1  christos {
    325  1.1  christos 	int fd = tp->handler.ev_fd;
    326  1.1  christos 	struct timeval tv;
    327  1.1  christos 	tv.tv_sec = xfrd->tcp_set->tcp_timeout;
    328  1.1  christos 	tv.tv_usec = 0;
    329  1.1  christos 	if(tp->handler_added)
    330  1.1  christos 		event_del(&tp->handler);
    331  1.1  christos 	event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|
    332  1.1  christos 		(tp->tcp_send_first?EV_WRITE:0), xfrd_handle_tcp_pipe, tp);
    333  1.1  christos 	if(event_base_set(xfrd->event_base, &tp->handler) != 0)
    334  1.1  christos 		log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
    335  1.1  christos 	if(event_add(&tp->handler, &tv) != 0)
    336  1.1  christos 		log_msg(LOG_ERR, "xfrd tcp: event_add failed");
    337  1.1  christos 	tp->handler_added = 1;
    338  1.1  christos }
    339  1.1  christos 
    340  1.1  christos /* handle event from fd of tcp pipe */
    341  1.1  christos void
    342  1.1  christos xfrd_handle_tcp_pipe(int ATTR_UNUSED(fd), short event, void* arg)
    343  1.1  christos {
    344  1.1  christos 	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)arg;
    345  1.1  christos 	if((event & EV_WRITE)) {
    346  1.1  christos 		tcp_pipe_reset_timeout(tp);
    347  1.1  christos 		if(tp->tcp_send_first) {
    348  1.1  christos 			DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp write, zone %s",
    349  1.1  christos 				tp->tcp_send_first->apex_str));
    350  1.1  christos 			xfrd_tcp_write(tp, tp->tcp_send_first);
    351  1.1  christos 		}
    352  1.1  christos 	}
    353  1.1  christos 	if((event & EV_READ) && tp->handler_added) {
    354  1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp read"));
    355  1.1  christos 		tcp_pipe_reset_timeout(tp);
    356  1.1  christos 		xfrd_tcp_read(tp);
    357  1.1  christos 	}
    358  1.1  christos 	if((event & EV_TIMEOUT) && tp->handler_added) {
    359  1.1  christos 		/* tcp connection timed out */
    360  1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp timeout"));
    361  1.1  christos 		xfrd_tcp_pipe_stop(tp);
    362  1.1  christos 	}
    363  1.1  christos }
    364  1.1  christos 
    365  1.1  christos /* add a zone to the pipeline, it starts to want to write its query */
    366  1.1  christos static void
    367  1.1  christos pipeline_setup_new_zone(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp,
    368  1.1  christos 	xfrd_zone_t* zone)
    369  1.1  christos {
    370  1.1  christos 	/* assign the ID */
    371  1.1  christos 	int idx;
    372  1.1  christos 	assert(tp->num_unused > 0);
    373  1.1  christos 	/* we pick a random ID, even though it is TCP anyway */
    374  1.1  christos 	idx = random_generate(tp->num_unused);
    375  1.1  christos 	zone->query_id = tp->unused[idx];
    376  1.1  christos 	tp->unused[idx] = tp->unused[tp->num_unused-1];
    377  1.1  christos 	tp->id[zone->query_id] = zone;
    378  1.1  christos 	/* decrement unused counter, and fixup tree */
    379  1.1  christos 	(void)rbtree_delete(set->pipetree, &tp->node);
    380  1.1  christos 	tp->num_unused--;
    381  1.1  christos 	(void)rbtree_insert(set->pipetree, &tp->node);
    382  1.1  christos 
    383  1.1  christos 	/* add to sendlist, at end */
    384  1.1  christos 	zone->tcp_send_next = NULL;
    385  1.1  christos 	zone->tcp_send_prev = tp->tcp_send_last;
    386  1.1  christos 	zone->in_tcp_send = 1;
    387  1.1  christos 	if(tp->tcp_send_last)
    388  1.1  christos 		tp->tcp_send_last->tcp_send_next = zone;
    389  1.1  christos 	else	tp->tcp_send_first = zone;
    390  1.1  christos 	tp->tcp_send_last = zone;
    391  1.1  christos 
    392  1.1  christos 	/* is it first in line? */
    393  1.1  christos 	if(tp->tcp_send_first == zone) {
    394  1.1  christos 		xfrd_tcp_setup_write_packet(tp, zone);
    395  1.1  christos 		/* add write to event handler */
    396  1.1  christos 		tcp_pipe_reset_timeout(tp);
    397  1.1  christos 	}
    398  1.1  christos }
    399  1.1  christos 
    400  1.1  christos void
    401  1.1  christos xfrd_tcp_obtain(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
    402  1.1  christos {
    403  1.1  christos 	struct xfrd_tcp_pipeline* tp;
    404  1.1  christos 	assert(zone->tcp_conn == -1);
    405  1.1  christos 	assert(zone->tcp_waiting == 0);
    406  1.1  christos 
    407  1.1  christos 	if(set->tcp_count < XFRD_MAX_TCP) {
    408  1.1  christos 		int i;
    409  1.1  christos 		assert(!set->tcp_waiting_first);
    410  1.1  christos 		set->tcp_count ++;
    411  1.1  christos 		/* find a free tcp_buffer */
    412  1.1  christos 		for(i=0; i<XFRD_MAX_TCP; i++) {
    413  1.1  christos 			if(set->tcp_state[i]->tcp_r->fd == -1) {
    414  1.1  christos 				zone->tcp_conn = i;
    415  1.1  christos 				break;
    416  1.1  christos 			}
    417  1.1  christos 		}
    418  1.1  christos 		/** What if there is no free tcp_buffer? return; */
    419  1.1  christos 		if (zone->tcp_conn < 0) {
    420  1.1  christos 			return;
    421  1.1  christos 		}
    422  1.1  christos 
    423  1.1  christos 		tp = set->tcp_state[zone->tcp_conn];
    424  1.1  christos 		zone->tcp_waiting = 0;
    425  1.1  christos 
    426  1.1  christos 		/* stop udp use (if any) */
    427  1.1  christos 		if(zone->zone_handler.ev_fd != -1)
    428  1.1  christos 			xfrd_udp_release(zone);
    429  1.1  christos 
    430  1.1  christos 		if(!xfrd_tcp_open(set, tp, zone)) {
    431  1.1  christos 			zone->tcp_conn = -1;
    432  1.1  christos 			set->tcp_count --;
    433  1.1  christos 			xfrd_set_refresh_now(zone);
    434  1.1  christos 			return;
    435  1.1  christos 		}
    436  1.1  christos 		/* ip and ip_len set by tcp_open */
    437  1.1  christos 		tp->node.key = tp;
    438  1.1  christos 		tp->num_unused = ID_PIPE_NUM;
    439  1.1  christos 		tp->num_skip = 0;
    440  1.1  christos 		tp->tcp_send_first = NULL;
    441  1.1  christos 		tp->tcp_send_last = NULL;
    442  1.1  christos 		memset(tp->id, 0, sizeof(tp->id));
    443  1.1  christos 		for(i=0; i<ID_PIPE_NUM; i++) {
    444  1.1  christos 			tp->unused[i] = i;
    445  1.1  christos 		}
    446  1.1  christos 
    447  1.1  christos 		/* insert into tree */
    448  1.1  christos 		(void)rbtree_insert(set->pipetree, &tp->node);
    449  1.1  christos 		xfrd_deactivate_zone(zone);
    450  1.1  christos 		xfrd_unset_timer(zone);
    451  1.1  christos 		pipeline_setup_new_zone(set, tp, zone);
    452  1.1  christos 		return;
    453  1.1  christos 	}
    454  1.1  christos 	/* check for a pipeline to the same master with unused ID */
    455  1.1  christos 	if((tp = pipeline_find(set, zone))!= NULL) {
    456  1.1  christos 		int i;
    457  1.1  christos 		if(zone->zone_handler.ev_fd != -1)
    458  1.1  christos 			xfrd_udp_release(zone);
    459  1.1  christos 		for(i=0; i<XFRD_MAX_TCP; i++) {
    460  1.1  christos 			if(set->tcp_state[i] == tp)
    461  1.1  christos 				zone->tcp_conn = i;
    462  1.1  christos 		}
    463  1.1  christos 		xfrd_deactivate_zone(zone);
    464  1.1  christos 		xfrd_unset_timer(zone);
    465  1.1  christos 		pipeline_setup_new_zone(set, tp, zone);
    466  1.1  christos 		return;
    467  1.1  christos 	}
    468  1.1  christos 
    469  1.1  christos 	/* wait, at end of line */
    470  1.1  christos 	DEBUG(DEBUG_XFRD,2, (LOG_INFO, "xfrd: max number of tcp "
    471  1.1  christos 		"connections (%d) reached.", XFRD_MAX_TCP));
    472  1.1  christos 	zone->tcp_waiting_next = 0;
    473  1.1  christos 	zone->tcp_waiting_prev = set->tcp_waiting_last;
    474  1.1  christos 	zone->tcp_waiting = 1;
    475  1.1  christos 	if(!set->tcp_waiting_last) {
    476  1.1  christos 		set->tcp_waiting_first = zone;
    477  1.1  christos 		set->tcp_waiting_last = zone;
    478  1.1  christos 	} else {
    479  1.1  christos 		set->tcp_waiting_last->tcp_waiting_next = zone;
    480  1.1  christos 		set->tcp_waiting_last = zone;
    481  1.1  christos 	}
    482  1.1  christos 	xfrd_deactivate_zone(zone);
    483  1.1  christos 	xfrd_unset_timer(zone);
    484  1.1  christos }
    485  1.1  christos 
    486  1.1  christos int
    487  1.1  christos xfrd_tcp_open(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp,
    488  1.1  christos 	xfrd_zone_t* zone)
    489  1.1  christos {
    490  1.1  christos 	int fd, family, conn;
    491  1.1  christos 	struct timeval tv;
    492  1.1  christos 	assert(zone->tcp_conn != -1);
    493  1.1  christos 
    494  1.1  christos 	/* if there is no next master, fallback to use the first one */
    495  1.1  christos 	/* but there really should be a master set */
    496  1.1  christos 	if(!zone->master) {
    497  1.1  christos 		zone->master = zone->zone_options->pattern->request_xfr;
    498  1.1  christos 		zone->master_num = 0;
    499  1.1  christos 	}
    500  1.1  christos 
    501  1.1  christos 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s open tcp conn to %s",
    502  1.1  christos 		zone->apex_str, zone->master->ip_address_spec));
    503  1.1  christos 	tp->tcp_r->is_reading = 1;
    504  1.1  christos 	tp->tcp_r->total_bytes = 0;
    505  1.1  christos 	tp->tcp_r->msglen = 0;
    506  1.1  christos 	buffer_clear(tp->tcp_r->packet);
    507  1.1  christos 	tp->tcp_w->is_reading = 0;
    508  1.1  christos 	tp->tcp_w->total_bytes = 0;
    509  1.1  christos 	tp->tcp_w->msglen = 0;
    510  1.1  christos 	tp->connection_established = 0;
    511  1.1  christos 
    512  1.1  christos 	if(zone->master->is_ipv6) {
    513  1.1  christos #ifdef INET6
    514  1.1  christos 		family = PF_INET6;
    515  1.1  christos #else
    516  1.1  christos 		xfrd_set_refresh_now(zone);
    517  1.1  christos 		return 0;
    518  1.1  christos #endif
    519  1.1  christos 	} else {
    520  1.1  christos 		family = PF_INET;
    521  1.1  christos 	}
    522  1.1  christos 	fd = socket(family, SOCK_STREAM, IPPROTO_TCP);
    523  1.1  christos 	if(fd == -1) {
    524  1.1  christos 		log_msg(LOG_ERR, "xfrd: %s cannot create tcp socket: %s",
    525  1.1  christos 			zone->master->ip_address_spec, strerror(errno));
    526  1.1  christos 		xfrd_set_refresh_now(zone);
    527  1.1  christos 		return 0;
    528  1.1  christos 	}
    529  1.1  christos 	if(fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
    530  1.1  christos 		log_msg(LOG_ERR, "xfrd: fcntl failed: %s", strerror(errno));
    531  1.1  christos 		close(fd);
    532  1.1  christos 		xfrd_set_refresh_now(zone);
    533  1.1  christos 		return 0;
    534  1.1  christos 	}
    535  1.1  christos 
    536  1.1  christos 	if(xfrd->nsd->outgoing_tcp_mss > 0) {
    537  1.1  christos #if defined(IPPROTO_TCP) && defined(TCP_MAXSEG)
    538  1.1  christos 		if(setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG,
    539  1.1  christos 			(void*)&xfrd->nsd->outgoing_tcp_mss,
    540  1.1  christos 			sizeof(xfrd->nsd->outgoing_tcp_mss)) < 0) {
    541  1.1  christos 			log_msg(LOG_ERR, "xfrd: setsockopt(TCP_MAXSEG)"
    542  1.1  christos 					"failed: %s", strerror(errno));
    543  1.1  christos 		}
    544  1.1  christos #else
    545  1.1  christos 		log_msg(LOG_ERR, "setsockopt(TCP_MAXSEG) unsupported");
    546  1.1  christos #endif
    547  1.1  christos 	}
    548  1.1  christos 
    549  1.1  christos 	tp->ip_len = xfrd_acl_sockaddr_to(zone->master, &tp->ip);
    550  1.1  christos 
    551  1.1  christos 	/* bind it */
    552  1.1  christos 	if (!xfrd_bind_local_interface(fd, zone->zone_options->pattern->
    553  1.1  christos 		outgoing_interface, zone->master, 1)) {
    554  1.1  christos 		close(fd);
    555  1.1  christos 		xfrd_set_refresh_now(zone);
    556  1.1  christos 		return 0;
    557  1.1  christos         }
    558  1.1  christos 
    559  1.1  christos 	conn = connect(fd, (struct sockaddr*)&tp->ip, tp->ip_len);
    560  1.1  christos 	if (conn == -1 && errno != EINPROGRESS) {
    561  1.1  christos 		log_msg(LOG_ERR, "xfrd: connect %s failed: %s",
    562  1.1  christos 			zone->master->ip_address_spec, strerror(errno));
    563  1.1  christos 		close(fd);
    564  1.1  christos 		xfrd_set_refresh_now(zone);
    565  1.1  christos 		return 0;
    566  1.1  christos 	}
    567  1.1  christos 	tp->tcp_r->fd = fd;
    568  1.1  christos 	tp->tcp_w->fd = fd;
    569  1.1  christos 
    570  1.1  christos 	/* set the tcp pipe event */
    571  1.1  christos 	if(tp->handler_added)
    572  1.1  christos 		event_del(&tp->handler);
    573  1.1  christos 	event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|EV_WRITE,
    574  1.1  christos 		xfrd_handle_tcp_pipe, tp);
    575  1.1  christos 	if(event_base_set(xfrd->event_base, &tp->handler) != 0)
    576  1.1  christos 		log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
    577  1.1  christos 	tv.tv_sec = set->tcp_timeout;
    578  1.1  christos 	tv.tv_usec = 0;
    579  1.1  christos 	if(event_add(&tp->handler, &tv) != 0)
    580  1.1  christos 		log_msg(LOG_ERR, "xfrd tcp: event_add failed");
    581  1.1  christos 	tp->handler_added = 1;
    582  1.1  christos 	return 1;
    583  1.1  christos }
    584  1.1  christos 
    /*
     * Build the zone-transfer query for `zone` in the write buffer of
     * pipeline `tp` (tp->tcp_w->packet). Nothing is written to the socket
     * by this function; it only prepares the buffer and the byte counters.
     *
     * An AXFR query is built when the zone has no SOA on disk
     * (soa_disk_acquired == 0), when the master has use_axfr_only or
     * ixfr_disabled set, or when the zone is expired and round_num != 0.
     * Otherwise an IXFR query is built, with NSCOUNT set to 1 and the
     * on-disk SOA appended via xfrd_write_soa_buffer().
     *
     * Side effects on `zone`: if msg_seq_nr is nonzero the xfr file with
     * zone->xfrfilenumber is unlinked; msg_seq_nr and msg_rr_count are
     * reset to 0. If the master has a TSIG key configured the request is
     * signed. On return the buffer is flipped, tcp->msglen holds the
     * buffer limit and tcp->total_bytes is 0.
     */
    585  1.1  christos void
    586  1.1  christos xfrd_tcp_setup_write_packet(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
    587  1.1  christos {
    588  1.1  christos 	xfrd_tcp_t* tcp = tp->tcp_w;
    589  1.1  christos 	assert(zone->tcp_conn != -1);
    590  1.1  christos 	assert(zone->tcp_waiting == 0);
    591  1.1  christos 	/* start AXFR or IXFR for the zone */
    592  1.1  christos 	if(zone->soa_disk_acquired == 0 || zone->master->use_axfr_only ||
    593  1.1  christos 		zone->master->ixfr_disabled ||
    594  1.1  christos 		/* if zone expired, after the first round, do not ask for
    595  1.1  christos 		 * IXFR any more, but full AXFR (of any serial number) */
    596  1.1  christos 		(zone->state == xfrd_zone_expired && zone->round_num != 0)) {
    597  1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request full zone transfer "
    598  1.1  christos 						"(AXFR) for %s to %s",
    599  1.1  christos 			zone->apex_str, zone->master->ip_address_spec));
    600  1.1  christos 
    601  1.1  christos 		xfrd_setup_packet(tcp->packet, TYPE_AXFR, CLASS_IN, zone->apex,
    602  1.1  christos 			zone->query_id);
    603  1.1  christos 	} else {
    604  1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request incremental zone "
    605  1.1  christos 						"transfer (IXFR) for %s to %s",
    606  1.1  christos 			zone->apex_str, zone->master->ip_address_spec));
    607  1.1  christos 
    608  1.1  christos 		xfrd_setup_packet(tcp->packet, TYPE_IXFR, CLASS_IN, zone->apex,
    609  1.1  christos 			zone->query_id);
                       		/* the IXFR query carries the SOA we currently have on disk as
                       		 * its single authority-section record */
    610  1.1  christos         	NSCOUNT_SET(tcp->packet, 1);
    611  1.1  christos 		xfrd_write_soa_buffer(tcp->packet, zone->apex, &zone->soa_disk);
    612  1.1  christos 	}
    613  1.1  christos 	/* a nonzero msg_seq_nr means an earlier transfer is still on file:
                       	 * unlink that xfr file before starting over */
    614  1.1  christos 	if(zone->msg_seq_nr)
    615  1.1  christos 		xfrd_unlink_xfrfile(xfrd->nsd, zone->xfrfilenumber);
    616  1.1  christos 	zone->msg_seq_nr = 0;
    617  1.1  christos 	zone->msg_rr_count = 0;
    618  1.1  christos 	if(zone->master->key_options && zone->master->key_options->tsig_key) {
    619  1.1  christos 		xfrd_tsig_sign_request(tcp->packet, &zone->tsig, zone->master);
    620  1.1  christos 	}
                       	/* flip: limit becomes the end of the query, position returns to 0,
                       	 * so the buffer is ready to be written out */
    621  1.1  christos 	buffer_flip(tcp->packet);
                       	/* NOTE(review): logged as "sent" although this function does not
                       	 * write to the socket itself */
    622  1.1  christos 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "sent tcp query with ID %d", zone->query_id));
    623  1.1  christos 	tcp->msglen = buffer_limit(tcp->packet);
    624  1.1  christos 	tcp->total_bytes = 0;
    625  1.1  christos }
    626  1.1  christos 
    /*
     * Reset `tcp` so the next message can be read from scratch: zero the
     * byte counter and the message length, and clear the packet buffer.
     */
    627  1.1  christos static void
    628  1.1  christos tcp_conn_ready_for_reading(xfrd_tcp_t* tcp)
    629  1.1  christos {
    630  1.1  christos 	tcp->total_bytes = 0;
    631  1.1  christos 	tcp->msglen = 0;
    632  1.1  christos 	buffer_clear(tcp->packet);
    633  1.1  christos }
    634  1.1  christos 
    /*
     * Write the length-prefixed message held in `tcp` to tcp->fd, resuming
     * where a previous partial write stopped.
     *
     * tcp->total_bytes counts the bytes written so far, including the
     * sizeof(tcp->msglen) length-prefix bytes; the payload position is
     * tracked by the packet buffer itself (buffer_skip).
     *
     * Returns:
     *   1  the complete message (length prefix and payload) was written;
     *   0  nothing or only part was written (EAGAIN/EINTR or short write),
     *      call again when the socket is writable;
     *  -1  write() failed with another error; errno is left as set by
     *      write().
     */
    635  1.1  christos int conn_write(xfrd_tcp_t* tcp)
    636  1.1  christos {
    637  1.1  christos 	ssize_t sent;
    638  1.1  christos 
                       	/* phase 1: the length prefix, sent in network byte order */
    639  1.1  christos 	if(tcp->total_bytes < sizeof(tcp->msglen)) {
    640  1.1  christos 		uint16_t sendlen = htons(tcp->msglen);
    641  1.1  christos 		sent = write(tcp->fd,
    642  1.1  christos 			(const char*)&sendlen + tcp->total_bytes,
    643  1.1  christos 			sizeof(tcp->msglen) - tcp->total_bytes);
    644  1.1  christos 
    645  1.1  christos 		if(sent == -1) {
    646  1.1  christos 			if(errno == EAGAIN || errno == EINTR) {
    647  1.1  christos 				/* write would block, try later */
    648  1.1  christos 				return 0;
    649  1.1  christos 			} else {
    650  1.1  christos 				return -1;
    651  1.1  christos 			}
    652  1.1  christos 		}
    653  1.1  christos 
    654  1.1  christos 		tcp->total_bytes += sent;
    655  1.1  christos 		if(tcp->total_bytes < sizeof(tcp->msglen)) {
    656  1.1  christos 			/* incomplete write, resume later */
    657  1.1  christos 			return 0;
    658  1.1  christos 		}
    659  1.1  christos 		assert(tcp->total_bytes == sizeof(tcp->msglen));
    660  1.1  christos 	}
    661  1.1  christos 
    662  1.1  christos 	assert(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen));
    663  1.1  christos 
                       	/* phase 2: the payload, from the buffer's current position */
    664  1.1  christos 	sent = write(tcp->fd,
    665  1.1  christos 		buffer_current(tcp->packet),
    666  1.1  christos 		buffer_remaining(tcp->packet));
    667  1.1  christos 	if(sent == -1) {
    668  1.1  christos 		if(errno == EAGAIN || errno == EINTR) {
    669  1.1  christos 			/* write would block, try later */
    670  1.1  christos 			return 0;
    671  1.1  christos 		} else {
    672  1.1  christos 			return -1;
    673  1.1  christos 		}
    674  1.1  christos 	}
    675  1.1  christos 
    676  1.1  christos 	buffer_skip(tcp->packet, sent);
    677  1.1  christos 	tcp->total_bytes += sent;
    678  1.1  christos 
    679  1.1  christos 	if(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen)) {
    680  1.1  christos 		/* more to write when socket becomes writable again */
    681  1.1  christos 		return 0;
    682  1.1  christos 	}
    683  1.1  christos 
    684  1.1  christos 	assert(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen));
    685  1.1  christos 	return 1;
    686  1.1  christos }
    687  1.1  christos 
    /*
     * Write handler for pipeline `tp`; `zone` must be the head of the
     * pipeline's send list (tp->tcp_send_first).
     *
     * While the connection is not yet marked established, the pending
     * result of the nonblocking connect is checked first via SO_ERROR:
     * still-in-progress returns to wait, any other error logs and stops the
     * pipe. The prepared message for `zone` is then written with
     * conn_write(). Once it is fully written, `zone` is popped from the
     * send list and each further queued zone gets its query set up and
     * written in turn, until the list is empty or a write cannot complete.
     * A failed write stops the whole pipe with xfrd_tcp_pipe_stop().
     */
    688  1.1  christos void
    689  1.1  christos xfrd_tcp_write(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
    690  1.1  christos {
    691  1.1  christos 	int ret;
    692  1.1  christos 	xfrd_tcp_t* tcp = tp->tcp_w;
    693  1.1  christos 	assert(zone->tcp_conn != -1);
    694  1.1  christos 	assert(zone == tp->tcp_send_first);
    695  1.1  christos 	/* see if for non-established connection, there is a connect error */
    696  1.1  christos 	if(!tp->connection_established) {
    697  1.1  christos 		/* check for pending error from nonblocking connect */
    698  1.1  christos 		/* from Stevens, unix network programming, vol1, 3rd ed, p450 */
    699  1.1  christos 		int error = 0;
    700  1.1  christos 		socklen_t len = sizeof(error);
    701  1.1  christos 		if(getsockopt(tcp->fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0){
    702  1.1  christos 			error = errno; /* on solaris errno is error */
    703  1.1  christos 		}
    704  1.1  christos 		if(error == EINPROGRESS || error == EWOULDBLOCK)
    705  1.1  christos 			return; /* try again later */
    706  1.1  christos 		if(error != 0) {
    707  1.1  christos 			log_msg(LOG_ERR, "%s: Could not tcp connect to %s: %s",
    708  1.1  christos 				zone->apex_str, zone->master->ip_address_spec,
    709  1.1  christos 				strerror(error));
    710  1.1  christos 			xfrd_tcp_pipe_stop(tp);
    711  1.1  christos 			return;
    712  1.1  christos 		}
    713  1.1  christos 	}
    714  1.1  christos 	ret = conn_write(tcp);
    715  1.1  christos 	if(ret == -1) {
    716  1.1  christos 		log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
    717  1.1  christos 		xfrd_tcp_pipe_stop(tp);
    718  1.1  christos 		return;
    719  1.1  christos 	}
                       	/* the first bytes that actually went out are what marks the
                       	 * connection as established */
    720  1.1  christos 	if(tcp->total_bytes != 0 && !tp->connection_established)
    721  1.1  christos 		tp->connection_established = 1;
    722  1.1  christos 	if(ret == 0) {
    723  1.1  christos 		return; /* write again later */
    724  1.1  christos 	}
    725  1.1  christos 	/* done writing this message */
    726  1.1  christos 
    727  1.1  christos 	/* remove first zone from sendlist */
    728  1.1  christos 	tcp_pipe_sendlist_popfirst(tp, zone);
    729  1.1  christos 
    730  1.1  christos 	/* see if other zone wants to write; init; let it write (now) */
    731  1.1  christos 	/* use a loop rather than recursion: up to 64k nested calls on
                       	 * the stack would be too much */
    732  1.1  christos 	while(tp->tcp_send_first) {
    733  1.1  christos 		/* setup to write for this zone */
    734  1.1  christos 		xfrd_tcp_setup_write_packet(tp, tp->tcp_send_first);
    735  1.1  christos 		/* attempt to write for this zone (if success, continue loop)*/
    736  1.1  christos 		ret = conn_write(tcp);
    737  1.1  christos 		if(ret == -1) {
    738  1.1  christos 			log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
    739  1.1  christos 			xfrd_tcp_pipe_stop(tp);
    740  1.1  christos 			return;
    741  1.1  christos 		}
    742  1.1  christos 		if(ret == 0)
    743  1.1  christos 			return; /* write again later */
    744  1.1  christos 		tcp_pipe_sendlist_popfirst(tp, tp->tcp_send_first);
    745  1.1  christos 	}
    746  1.1  christos 
    747  1.1  christos 	/* if sendlist empty, remove WRITE from event */
    748  1.1  christos 
    749  1.1  christos 	/* listen to READ, and not WRITE events */
                       	/* NOTE(review): the event change itself is presumably done inside
                       	 * tcp_pipe_reset_timeout(), which is not visible here -- confirm */
    750  1.1  christos 	assert(tp->tcp_send_first == NULL);
    751  1.1  christos 	tcp_pipe_reset_timeout(tp);
    752  1.1  christos }
    753  1.1  christos 
    754  1.1  christos int
    755  1.1  christos conn_read(xfrd_tcp_t* tcp)
    756  1.1  christos {
    757  1.1  christos 	ssize_t received;
    758  1.1  christos 	/* receive leading packet length bytes */
    759  1.1  christos 	if(tcp->total_bytes < sizeof(tcp->msglen)) {
    760  1.1  christos 		received = read(tcp->fd,
    761  1.1  christos 			(char*) &tcp->msglen + tcp->total_bytes,
    762  1.1  christos 			sizeof(tcp->msglen) - tcp->total_bytes);
    763  1.1  christos 		if(received == -1) {
    764  1.1  christos 			if(errno == EAGAIN || errno == EINTR) {
    765  1.1  christos 				/* read would block, try later */
    766  1.1  christos 				return 0;
    767  1.1  christos 			} else {
    768  1.1  christos #ifdef ECONNRESET
    769  1.1  christos 				if (verbosity >= 2 || errno != ECONNRESET)
    770  1.1  christos #endif /* ECONNRESET */
    771  1.1  christos 				log_msg(LOG_ERR, "tcp read sz: %s", strerror(errno));
    772  1.1  christos 				return -1;
    773  1.1  christos 			}
    774  1.1  christos 		} else if(received == 0) {
    775  1.1  christos 			/* EOF */
    776  1.1  christos 			return -1;
    777  1.1  christos 		}
    778  1.1  christos 		tcp->total_bytes += received;
    779  1.1  christos 		if(tcp->total_bytes < sizeof(tcp->msglen)) {
    780  1.1  christos 			/* not complete yet, try later */
    781  1.1  christos 			return 0;
    782  1.1  christos 		}
    783  1.1  christos 
    784  1.1  christos 		assert(tcp->total_bytes == sizeof(tcp->msglen));
    785  1.1  christos 		tcp->msglen = ntohs(tcp->msglen);
    786  1.1  christos 
    787  1.1  christos 		if(tcp->msglen == 0) {
    788  1.1  christos 			buffer_set_limit(tcp->packet, tcp->msglen);
    789  1.1  christos 			return 1;
    790  1.1  christos 		}
    791  1.1  christos 		if(tcp->msglen > buffer_capacity(tcp->packet)) {
    792  1.1  christos 			log_msg(LOG_ERR, "buffer too small, dropping connection");
    793  1.1  christos 			return 0;
    794  1.1  christos 		}
    795  1.1  christos 		buffer_set_limit(tcp->packet, tcp->msglen);
    796  1.1  christos 	}
    797  1.1  christos 
    798  1.1  christos 	assert(buffer_remaining(tcp->packet) > 0);
    799  1.1  christos 
    800  1.1  christos 	received = read(tcp->fd, buffer_current(tcp->packet),
    801  1.1  christos 		buffer_remaining(tcp->packet));
    802  1.1  christos 	if(received == -1) {
    803  1.1  christos 		if(errno == EAGAIN || errno == EINTR) {
    804  1.1  christos 			/* read would block, try later */
    805  1.1  christos 			return 0;
    806  1.1  christos 		} else {
    807  1.1  christos #ifdef ECONNRESET
    808  1.1  christos 			if (verbosity >= 2 || errno != ECONNRESET)
    809  1.1  christos #endif /* ECONNRESET */
    810  1.1  christos 			log_msg(LOG_ERR, "tcp read %s", strerror(errno));
    811  1.1  christos 			return -1;
    812  1.1  christos 		}
    813  1.1  christos 	} else if(received == 0) {
    814  1.1  christos 		/* EOF */
    815  1.1  christos 		return -1;
    816  1.1  christos 	}
    817  1.1  christos 
    818  1.1  christos 	tcp->total_bytes += received;
    819  1.1  christos 	buffer_skip(tcp->packet, received);
    820  1.1  christos 
    821  1.1  christos 	if(buffer_remaining(tcp->packet) > 0) {
    822  1.1  christos 		/* not complete yet, wait for more */
    823  1.1  christos 		return 0;
    824  1.1  christos 	}
    825  1.1  christos 
    826  1.1  christos 	/* completed */
    827  1.1  christos 	assert(buffer_position(tcp->packet) == tcp->msglen);
    828  1.1  christos 	return 1;
    829  1.1  christos }
    830  1.1  christos 
    /*
     * Read handler for pipeline `tp`: read one message with conn_read() on
     * tp->tcp_r and dispatch it to the zone registered under the message's
     * query ID in tp->id[].
     *
     * A conn_read() error stops the pipe; an incomplete message just
     * returns. Messages shorter than QHEADERSZ, and messages whose ID maps
     * to no zone or to the TCP_NULL_SKIP marker, are discarded and the
     * connection is reset for the next message. Otherwise the packet is
     * handed to xfrd_handle_received_xfr_packet() and the result decides
     * whether the zone keeps waiting, is released from the pipe, and/or
     * issues a new request.
     */
    831  1.1  christos void
    832  1.1  christos xfrd_tcp_read(struct xfrd_tcp_pipeline* tp)
    833  1.1  christos {
    834  1.1  christos 	xfrd_zone_t* zone;
    835  1.1  christos 	xfrd_tcp_t* tcp = tp->tcp_r;
    836  1.1  christos 	int ret;
    837  1.1  christos 	enum xfrd_packet_result pkt_result;
    838  1.1  christos 
    839  1.1  christos 	ret = conn_read(tcp);
    840  1.1  christos 	if(ret == -1) {
    841  1.1  christos 		xfrd_tcp_pipe_stop(tp);
    842  1.1  christos 		return;
    843  1.1  christos 	}
    844  1.1  christos 	if(ret == 0)
    845  1.1  christos 		return;
    846  1.1  christos 	/* completed msg */
    847  1.1  christos 	buffer_flip(tcp->packet);
    848  1.1  christos 	/* see which ID number it is, if skip, handle skip, NULL: warn */
    849  1.1  christos 	if(tcp->msglen < QHEADERSZ) {
    850  1.1  christos 		/* too short for DNS header, skip it */
    851  1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
    852  1.1  christos 			"xfrd: tcp skip response that is too short"));
    853  1.1  christos 		tcp_conn_ready_for_reading(tcp);
    854  1.1  christos 		return;
    855  1.1  christos 	}
    856  1.1  christos 	zone = tp->id[ID(tcp->packet)];
    857  1.1  christos 	if(!zone || zone == TCP_NULL_SKIP) {
    858  1.1  christos 		/* no zone for this id? skip it */
    859  1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
    860  1.1  christos 			"xfrd: tcp skip response with %s ID",
    861  1.1  christos 			zone?"set-to-skip":"unknown"));
    862  1.1  christos 		tcp_conn_ready_for_reading(tcp);
    863  1.1  christos 		return;
    864  1.1  christos 	}
    865  1.1  christos 	assert(zone->tcp_conn != -1);
    866  1.1  christos 
    867  1.1  christos 	/* handle message for zone */
    868  1.1  christos 	pkt_result = xfrd_handle_received_xfr_packet(zone, tcp->packet);
    869  1.1  christos 	/* setup for reading the next packet on this connection */
    870  1.1  christos 	tcp_conn_ready_for_reading(tcp);
    871  1.1  christos 	switch(pkt_result) {
    872  1.1  christos 		case xfrd_packet_more:
    873  1.1  christos 			/* wait for next packet */
    874  1.1  christos 			break;
    875  1.1  christos 		case xfrd_packet_newlease:
    876  1.1  christos 			/* set to skip if more packets with this ID */
    877  1.1  christos 			tp->id[zone->query_id] = TCP_NULL_SKIP;
    878  1.1  christos 			tp->num_skip++;
    879  1.1  christos 			/* fall through to remove zone from tp */
    880  1.1  christos 		case xfrd_packet_transfer:
                       			/* with multi_master_check set, the zone is released
                       			 * and immediately makes another request */
    881  1.1  christos 			if(zone->zone_options->pattern->multi_master_check) {
    882  1.1  christos 				xfrd_tcp_release(xfrd->tcp_set, zone);
    883  1.1  christos 				xfrd_make_request(zone);
    884  1.1  christos 				break;
    885  1.1  christos 			}
    886  1.1  christos 			xfrd_tcp_release(xfrd->tcp_set, zone);
    887  1.1  christos 			assert(zone->round_num == -1);
    888  1.1  christos 			break;
    889  1.1  christos 		case xfrd_packet_notimpl:
    890  1.1  christos 			xfrd_disable_ixfr(zone);
    891  1.1  christos 			xfrd_tcp_release(xfrd->tcp_set, zone);
    892  1.1  christos 			/* query next server */
    893  1.1  christos 			xfrd_make_request(zone);
    894  1.1  christos 			break;
    895  1.1  christos 		case xfrd_packet_bad:
    896  1.1  christos 		case xfrd_packet_tcp:
    897  1.1  christos 		default:
    898  1.1  christos 			/* set to skip if more packets with this ID */
    899  1.1  christos 			tp->id[zone->query_id] = TCP_NULL_SKIP;
    900  1.1  christos 			tp->num_skip++;
    901  1.1  christos 			xfrd_tcp_release(xfrd->tcp_set, zone);
    902  1.1  christos 			/* query next server */
    903  1.1  christos 			xfrd_make_request(zone);
    904  1.1  christos 			break;
    905  1.1  christos 	}
    906  1.1  christos }
    907  1.1  christos 
    /*
     * Detach `zone` from the TCP pipeline it currently uses
     * (set->tcp_state[zone->tcp_conn]).
     *
     * The zone's tcp_conn is reset to -1 and the zone is removed from the
     * pipeline's send list. Its query ID is removed from the ID table too,
     * unless that slot holds the TCP_NULL_SKIP marker.
     *
     * Afterwards, if exactly one ID is unused and the first waiting zone's
     * master address equals this pipeline's address, that waiting zone
     * takes over the connection. Otherwise, if every ID is unused or all
     * IDs still in use are skip markers, the pipeline is released with
     * xfrd_tcp_pipe_release().
     */
    909  1.1  christos void
    910  1.1  christos xfrd_tcp_release(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
    911  1.1  christos {
    912  1.1  christos 	int conn = zone->tcp_conn;
    913  1.1  christos 	struct xfrd_tcp_pipeline* tp = set->tcp_state[conn];
    914  1.1  christos 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s released tcp conn to %s",
    915  1.1  christos 		zone->apex_str, zone->master->ip_address_spec));
    916  1.1  christos 	assert(zone->tcp_conn != -1);
    917  1.1  christos 	assert(zone->tcp_waiting == 0);
    918  1.1  christos 	zone->tcp_conn = -1;
    919  1.1  christos 	zone->tcp_waiting = 0;
    920  1.1  christos 
    921  1.1  christos 	/* remove from tcp_send list */
    922  1.1  christos 	tcp_pipe_sendlist_remove(tp, zone);
    923  1.1  christos 	/* remove it from the ID list */
    924  1.1  christos 	if(tp->id[zone->query_id] != TCP_NULL_SKIP)
    925  1.1  christos 		tcp_pipe_id_remove(tp, zone);
    926  1.1  christos 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: released tcp pipe now %d unused",
    927  1.1  christos 		tp->num_unused));
    928  1.1  christos 	/* if pipe was full, but no more, then see if waiting element is
    929  1.1  christos 	 * for the same master, and can fill the unused ID */
    930  1.1  christos 	if(tp->num_unused == 1 && set->tcp_waiting_first) {
    931  1.1  christos #ifdef INET6
    932  1.1  christos 		struct sockaddr_storage to;
    933  1.1  christos #else
    934  1.1  christos 		struct sockaddr_in to;
    935  1.1  christos #endif
    936  1.1  christos 		socklen_t to_len = xfrd_acl_sockaddr_to(
    937  1.1  christos 			set->tcp_waiting_first->master, &to);
    938  1.1  christos 		if(to_len == tp->ip_len && memcmp(&to, &tp->ip, to_len) == 0) {
    939  1.1  christos 			/* use this connection for the waiting zone */
                       			/* note: `zone` is reused from here on to refer to
                       			 * the waiting zone, not the one just released */
    940  1.1  christos 			zone = set->tcp_waiting_first;
    941  1.1  christos 			assert(zone->tcp_conn == -1);
    942  1.1  christos 			zone->tcp_conn = conn;
    943  1.1  christos 			tcp_zone_waiting_list_popfirst(set, zone);
    944  1.1  christos 			if(zone->zone_handler.ev_fd != -1)
    945  1.1  christos 				xfrd_udp_release(zone);
    946  1.1  christos 			xfrd_unset_timer(zone);
    947  1.1  christos 			pipeline_setup_new_zone(set, tp, zone);
    948  1.1  christos 			return;
    949  1.1  christos 		}
    950  1.1  christos 		/* waiting zone did not go to same server */
    951  1.1  christos 	}
    952  1.1  christos 
    953  1.1  christos 	/* if all unused, or only skipped leftover, close the pipeline */
    954  1.1  christos 	if(tp->num_unused >= ID_PIPE_NUM || tp->num_skip >= ID_PIPE_NUM - tp->num_unused)
    955  1.1  christos 		xfrd_tcp_pipe_release(set, tp, conn);
    956  1.1  christos }
    956  1.1  christos 
    /*
     * Shut down pipeline `tp`, which occupies slot `conn` of `set`, and if
     * possible reuse the slot at once for a waiting zone.
     *
     * The pipeline's event handler is deleted if it was added, the socket
     * is closed (tcp_r and tcp_w share one fd) and the pipeline is removed
     * from the pipe tree. Then, while set->tcp_count equals XFRD_MAX_TCP
     * and zones are waiting, the first waiting zone is popped and a
     * connection is opened for it with xfrd_tcp_open(). On failure that
     * zone gets xfrd_set_refresh_now() and the next waiting zone is tried.
     * On success the pipeline state is re-initialised, re-inserted into
     * the tree, the zone's query is set up, and the function returns
     * without changing tcp_count. If no zone could be started, tcp_count
     * is decremented.
     */
    957  1.1  christos void
    958  1.1  christos xfrd_tcp_pipe_release(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp,
    959  1.1  christos 	int conn)
    960  1.1  christos {
    961  1.1  christos 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: tcp pipe released"));
    962  1.1  christos 	/* one handler per tcp pipe */
    963  1.1  christos 	if(tp->handler_added)
    964  1.1  christos 		event_del(&tp->handler);
    965  1.1  christos 	tp->handler_added = 0;
    966  1.1  christos 
    967  1.1  christos 	/* fd in tcp_r and tcp_w is the same, close once */
    968  1.1  christos 	if(tp->tcp_r->fd != -1)
    969  1.1  christos 		close(tp->tcp_r->fd);
    970  1.1  christos 	tp->tcp_r->fd = -1;
    971  1.1  christos 	tp->tcp_w->fd = -1;
    972  1.1  christos 
    973  1.1  christos 	/* remove from pipetree */
                       	/* NOTE(review): deletion goes through xfrd->tcp_set while the
                       	 * re-insert below uses `set`; presumably the same object --
                       	 * confirm */
    974  1.1  christos 	(void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
    975  1.1  christos 
    976  1.1  christos 	/* a waiting zone can use the free tcp slot (to another server) */
    977  1.1  christos 	/* if that zone fails to set-up or connect, we try to start the next
    978  1.1  christos 	 * waiting zone in the list */
    979  1.1  christos 	while(set->tcp_count == XFRD_MAX_TCP && set->tcp_waiting_first) {
    980  1.1  christos 		int i;
    981  1.1  christos 
    982  1.1  christos 		/* pop first waiting process */
    983  1.1  christos 		xfrd_zone_t* zone = set->tcp_waiting_first;
    984  1.1  christos 		/* start it */
    985  1.1  christos 		assert(zone->tcp_conn == -1);
    986  1.1  christos 		zone->tcp_conn = conn;
    987  1.1  christos 		tcp_zone_waiting_list_popfirst(set, zone);
    988  1.1  christos 
    989  1.1  christos 		/* stop udp (if any) */
    990  1.1  christos 		if(zone->zone_handler.ev_fd != -1)
    991  1.1  christos 			xfrd_udp_release(zone);
    992  1.1  christos 		if(!xfrd_tcp_open(set, tp, zone)) {
    993  1.1  christos 			zone->tcp_conn = -1;
    994  1.1  christos 			xfrd_set_refresh_now(zone);
    995  1.1  christos 			/* try to start the next zone (if any) */
    996  1.1  christos 			continue;
    997  1.1  christos 		}
    998  1.1  christos 		/* re-init this tcppipe */
    999  1.1  christos 		/* ip and ip_len set by tcp_open */
   1000  1.1  christos 		tp->node.key = tp;
   1001  1.1  christos 		tp->num_unused = ID_PIPE_NUM;
   1002  1.1  christos 		tp->num_skip = 0;
   1003  1.1  christos 		tp->tcp_send_first = NULL;
   1004  1.1  christos 		tp->tcp_send_last = NULL;
   1005  1.1  christos 		memset(tp->id, 0, sizeof(tp->id));
   1006  1.1  christos 		for(i=0; i<ID_PIPE_NUM; i++) {
   1007  1.1  christos 			tp->unused[i] = i;
   1008  1.1  christos 		}
   1009  1.1  christos 
   1010  1.1  christos 		/* insert into tree */
   1011  1.1  christos 		(void)rbtree_insert(set->pipetree, &tp->node);
   1012  1.1  christos 		/* setup write */
   1013  1.1  christos 		xfrd_unset_timer(zone);
   1014  1.1  christos 		pipeline_setup_new_zone(set, tp, zone);
   1015  1.1  christos 		/* started a task, no need for cleanups, so return */
   1016  1.1  christos 		return;
   1017  1.1  christos 	}
   1018  1.1  christos 	/* no task to start, cleanup */
   1019  1.1  christos 	assert(!set->tcp_waiting_first);
   1020  1.1  christos 	set->tcp_count --;
   1021  1.1  christos 	assert(set->tcp_count >= 0);
   1022  1.1  christos }
   1023  1.1  christos 
   1024