Home | History | Annotate | Line # | Download | only in dist
xfrd-tcp.c revision 1.1.1.1.8.1
      1          1.1  christos /*
      2          1.1  christos  * xfrd-tcp.c - XFR (transfer) Daemon TCP system source file. Manages tcp conn.
      3          1.1  christos  *
      4          1.1  christos  * Copyright (c) 2001-2006, NLnet Labs. All rights reserved.
      5          1.1  christos  *
      6          1.1  christos  * See LICENSE for the license.
      7          1.1  christos  *
      8          1.1  christos  */
      9          1.1  christos 
     10          1.1  christos #include "config.h"
     11          1.1  christos #include <assert.h>
     12          1.1  christos #include <errno.h>
     13          1.1  christos #include <fcntl.h>
     14          1.1  christos #include <unistd.h>
     15          1.1  christos #include <stdlib.h>
     16  1.1.1.1.8.1    martin #include <sys/uio.h>
     17          1.1  christos #include "nsd.h"
     18          1.1  christos #include "xfrd-tcp.h"
     19          1.1  christos #include "buffer.h"
     20          1.1  christos #include "packet.h"
     21          1.1  christos #include "dname.h"
     22          1.1  christos #include "options.h"
     23          1.1  christos #include "namedb.h"
     24          1.1  christos #include "xfrd.h"
     25          1.1  christos #include "xfrd-disk.h"
     26          1.1  christos #include "util.h"
     27          1.1  christos 
     28          1.1  christos /* sort tcppipe, first on IP address, for an IPaddresss, sort on num_unused */
     29          1.1  christos static int
     30          1.1  christos xfrd_pipe_cmp(const void* a, const void* b)
     31          1.1  christos {
     32          1.1  christos 	const struct xfrd_tcp_pipeline* x = (struct xfrd_tcp_pipeline*)a;
     33          1.1  christos 	const struct xfrd_tcp_pipeline* y = (struct xfrd_tcp_pipeline*)b;
     34          1.1  christos 	int r;
     35          1.1  christos 	if(x == y)
     36          1.1  christos 		return 0;
     37          1.1  christos 	if(y->ip_len != x->ip_len)
     38          1.1  christos 		/* subtraction works because nonnegative and small numbers */
     39          1.1  christos 		return (int)y->ip_len - (int)x->ip_len;
     40          1.1  christos 	r = memcmp(&x->ip, &y->ip, x->ip_len);
     41          1.1  christos 	if(r != 0)
     42          1.1  christos 		return r;
     43          1.1  christos 	/* sort that num_unused is sorted ascending, */
     44          1.1  christos 	if(x->num_unused != y->num_unused) {
     45          1.1  christos 		return (x->num_unused < y->num_unused) ? -1 : 1;
     46          1.1  christos 	}
     47          1.1  christos 	/* different pipelines are different still, even with same numunused*/
     48          1.1  christos 	return (uintptr_t)x < (uintptr_t)y ? -1 : 1;
     49          1.1  christos }
     50          1.1  christos 
     51  1.1.1.1.8.1    martin struct xfrd_tcp_set* xfrd_tcp_set_create(struct region* region)
     52          1.1  christos {
     53          1.1  christos 	int i;
     54  1.1.1.1.8.1    martin 	struct xfrd_tcp_set* tcp_set = region_alloc(region,
     55  1.1.1.1.8.1    martin 		sizeof(struct xfrd_tcp_set));
     56  1.1.1.1.8.1    martin 	memset(tcp_set, 0, sizeof(struct xfrd_tcp_set));
     57          1.1  christos 	tcp_set->tcp_count = 0;
     58          1.1  christos 	tcp_set->tcp_waiting_first = 0;
     59          1.1  christos 	tcp_set->tcp_waiting_last = 0;
     60          1.1  christos 	for(i=0; i<XFRD_MAX_TCP; i++)
     61          1.1  christos 		tcp_set->tcp_state[i] = xfrd_tcp_pipeline_create(region);
     62          1.1  christos 	tcp_set->pipetree = rbtree_create(region, &xfrd_pipe_cmp);
     63          1.1  christos 	return tcp_set;
     64          1.1  christos }
     65          1.1  christos 
     66          1.1  christos struct xfrd_tcp_pipeline*
     67          1.1  christos xfrd_tcp_pipeline_create(region_type* region)
     68          1.1  christos {
     69          1.1  christos 	int i;
     70          1.1  christos 	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)
     71          1.1  christos 		region_alloc_zero(region, sizeof(*tp));
     72          1.1  christos 	tp->num_unused = ID_PIPE_NUM;
     73          1.1  christos 	assert(sizeof(tp->unused)/sizeof(tp->unused[0]) == ID_PIPE_NUM);
     74          1.1  christos 	for(i=0; i<ID_PIPE_NUM; i++)
     75          1.1  christos 		tp->unused[i] = (uint16_t)i;
     76          1.1  christos 	tp->tcp_r = xfrd_tcp_create(region, QIOBUFSZ);
     77          1.1  christos 	tp->tcp_w = xfrd_tcp_create(region, 512);
     78          1.1  christos 	return tp;
     79          1.1  christos }
     80          1.1  christos 
     81          1.1  christos void
     82          1.1  christos xfrd_setup_packet(buffer_type* packet,
     83          1.1  christos 	uint16_t type, uint16_t klass, const dname_type* dname, uint16_t qid)
     84          1.1  christos {
     85          1.1  christos 	/* Set up the header */
     86          1.1  christos 	buffer_clear(packet);
     87          1.1  christos 	ID_SET(packet, qid);
     88          1.1  christos 	FLAGS_SET(packet, 0);
     89          1.1  christos 	OPCODE_SET(packet, OPCODE_QUERY);
     90          1.1  christos 	QDCOUNT_SET(packet, 1);
     91          1.1  christos 	ANCOUNT_SET(packet, 0);
     92          1.1  christos 	NSCOUNT_SET(packet, 0);
     93          1.1  christos 	ARCOUNT_SET(packet, 0);
     94          1.1  christos 	buffer_skip(packet, QHEADERSZ);
     95          1.1  christos 
     96          1.1  christos 	/* The question record. */
     97          1.1  christos 	buffer_write(packet, dname_name(dname), dname->name_size);
     98          1.1  christos 	buffer_write_u16(packet, type);
     99          1.1  christos 	buffer_write_u16(packet, klass);
    100          1.1  christos }
    101          1.1  christos 
    102          1.1  christos static socklen_t
    103          1.1  christos #ifdef INET6
    104  1.1.1.1.8.1    martin xfrd_acl_sockaddr(acl_options_type* acl, unsigned int port,
    105          1.1  christos 	struct sockaddr_storage *sck)
    106          1.1  christos #else
    107  1.1.1.1.8.1    martin xfrd_acl_sockaddr(acl_options_type* acl, unsigned int port,
    108          1.1  christos 	struct sockaddr_in *sck, const char* fromto)
    109          1.1  christos #endif /* INET6 */
    110          1.1  christos {
    111          1.1  christos 	/* setup address structure */
    112          1.1  christos #ifdef INET6
    113          1.1  christos 	memset(sck, 0, sizeof(struct sockaddr_storage));
    114          1.1  christos #else
    115          1.1  christos 	memset(sck, 0, sizeof(struct sockaddr_in));
    116          1.1  christos #endif
    117          1.1  christos 	if(acl->is_ipv6) {
    118          1.1  christos #ifdef INET6
    119          1.1  christos 		struct sockaddr_in6* sa = (struct sockaddr_in6*)sck;
    120          1.1  christos 		sa->sin6_family = AF_INET6;
    121          1.1  christos 		sa->sin6_port = htons(port);
    122          1.1  christos 		sa->sin6_addr = acl->addr.addr6;
    123          1.1  christos 		return sizeof(struct sockaddr_in6);
    124          1.1  christos #else
    125          1.1  christos 		log_msg(LOG_ERR, "xfrd: IPv6 connection %s %s attempted but no \
    126          1.1  christos INET6.", fromto, acl->ip_address_spec);
    127          1.1  christos 		return 0;
    128          1.1  christos #endif
    129          1.1  christos 	} else {
    130          1.1  christos 		struct sockaddr_in* sa = (struct sockaddr_in*)sck;
    131          1.1  christos 		sa->sin_family = AF_INET;
    132          1.1  christos 		sa->sin_port = htons(port);
    133          1.1  christos 		sa->sin_addr = acl->addr.addr;
    134          1.1  christos 		return sizeof(struct sockaddr_in);
    135          1.1  christos 	}
    136          1.1  christos }
    137          1.1  christos 
    138          1.1  christos socklen_t
    139          1.1  christos #ifdef INET6
    140  1.1.1.1.8.1    martin xfrd_acl_sockaddr_to(acl_options_type* acl, struct sockaddr_storage *to)
    141          1.1  christos #else
    142  1.1.1.1.8.1    martin xfrd_acl_sockaddr_to(acl_options_type* acl, struct sockaddr_in *to)
    143          1.1  christos #endif /* INET6 */
    144          1.1  christos {
    145          1.1  christos 	unsigned int port = acl->port?acl->port:(unsigned)atoi(TCP_PORT);
    146          1.1  christos #ifdef INET6
    147          1.1  christos 	return xfrd_acl_sockaddr(acl, port, to);
    148          1.1  christos #else
    149          1.1  christos 	return xfrd_acl_sockaddr(acl, port, to, "to");
    150          1.1  christos #endif /* INET6 */
    151          1.1  christos }
    152          1.1  christos 
    153          1.1  christos socklen_t
    154          1.1  christos #ifdef INET6
    155  1.1.1.1.8.1    martin xfrd_acl_sockaddr_frm(acl_options_type* acl, struct sockaddr_storage *frm)
    156          1.1  christos #else
    157  1.1.1.1.8.1    martin xfrd_acl_sockaddr_frm(acl_options_type* acl, struct sockaddr_in *frm)
    158          1.1  christos #endif /* INET6 */
    159          1.1  christos {
    160          1.1  christos 	unsigned int port = acl->port?acl->port:0;
    161          1.1  christos #ifdef INET6
    162          1.1  christos 	return xfrd_acl_sockaddr(acl, port, frm);
    163          1.1  christos #else
    164          1.1  christos 	return xfrd_acl_sockaddr(acl, port, frm, "from");
    165          1.1  christos #endif /* INET6 */
    166          1.1  christos }
    167          1.1  christos 
    168          1.1  christos void
    169          1.1  christos xfrd_write_soa_buffer(struct buffer* packet,
    170          1.1  christos 	const dname_type* apex, struct xfrd_soa* soa)
    171          1.1  christos {
    172          1.1  christos 	size_t rdlength_pos;
    173          1.1  christos 	uint16_t rdlength;
    174          1.1  christos 	buffer_write(packet, dname_name(apex), apex->name_size);
    175          1.1  christos 
    176          1.1  christos 	/* already in network order */
    177          1.1  christos 	buffer_write(packet, &soa->type, sizeof(soa->type));
    178          1.1  christos 	buffer_write(packet, &soa->klass, sizeof(soa->klass));
    179          1.1  christos 	buffer_write(packet, &soa->ttl, sizeof(soa->ttl));
    180          1.1  christos 	rdlength_pos = buffer_position(packet);
    181          1.1  christos 	buffer_skip(packet, sizeof(rdlength));
    182          1.1  christos 
    183          1.1  christos 	/* uncompressed dnames */
    184          1.1  christos 	buffer_write(packet, soa->prim_ns+1, soa->prim_ns[0]);
    185          1.1  christos 	buffer_write(packet, soa->email+1, soa->email[0]);
    186          1.1  christos 
    187          1.1  christos 	buffer_write(packet, &soa->serial, sizeof(uint32_t));
    188          1.1  christos 	buffer_write(packet, &soa->refresh, sizeof(uint32_t));
    189          1.1  christos 	buffer_write(packet, &soa->retry, sizeof(uint32_t));
    190          1.1  christos 	buffer_write(packet, &soa->expire, sizeof(uint32_t));
    191          1.1  christos 	buffer_write(packet, &soa->minimum, sizeof(uint32_t));
    192          1.1  christos 
    193          1.1  christos 	/* write length of RR */
    194          1.1  christos 	rdlength = buffer_position(packet) - rdlength_pos - sizeof(rdlength);
    195          1.1  christos 	buffer_write_u16_at(packet, rdlength_pos, rdlength);
    196          1.1  christos }
    197          1.1  christos 
    198  1.1.1.1.8.1    martin struct xfrd_tcp*
    199          1.1  christos xfrd_tcp_create(region_type* region, size_t bufsize)
    200          1.1  christos {
    201  1.1.1.1.8.1    martin 	struct xfrd_tcp* tcp_state = (struct xfrd_tcp*)region_alloc(
    202  1.1.1.1.8.1    martin 		region, sizeof(struct xfrd_tcp));
    203  1.1.1.1.8.1    martin 	memset(tcp_state, 0, sizeof(struct xfrd_tcp));
    204          1.1  christos 	tcp_state->packet = buffer_create(region, bufsize);
    205          1.1  christos 	tcp_state->fd = -1;
    206          1.1  christos 
    207          1.1  christos 	return tcp_state;
    208          1.1  christos }
    209          1.1  christos 
    210          1.1  christos static struct xfrd_tcp_pipeline*
    211  1.1.1.1.8.1    martin pipeline_find(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
    212          1.1  christos {
    213  1.1.1.1.8.1    martin 	rbnode_type* sme = NULL;
    214          1.1  christos 	struct xfrd_tcp_pipeline* r;
    215          1.1  christos 	/* smaller buf than a full pipeline with 64kb ID array, only need
    216          1.1  christos 	 * the front part with the key info, this front part contains the
    217          1.1  christos 	 * members that the compare function uses. */
    218          1.1  christos 	const size_t keysize = sizeof(struct xfrd_tcp_pipeline) -
    219          1.1  christos 		ID_PIPE_NUM*(sizeof(struct xfrd_zone*) + sizeof(uint16_t));
    220          1.1  christos 	/* void* type for alignment of the struct,
    221          1.1  christos 	 * divide the keysize by ptr-size and then add one to round up */
    222          1.1  christos 	void* buf[ (keysize / sizeof(void*)) + 1 ];
    223          1.1  christos 	struct xfrd_tcp_pipeline* key = (struct xfrd_tcp_pipeline*)buf;
    224          1.1  christos 	key->node.key = key;
    225          1.1  christos 	key->ip_len = xfrd_acl_sockaddr_to(zone->master, &key->ip);
    226          1.1  christos 	key->num_unused = ID_PIPE_NUM;
    227          1.1  christos 	/* lookup existing tcp transfer to the master with highest unused */
    228          1.1  christos 	if(rbtree_find_less_equal(set->pipetree, key, &sme)) {
    229          1.1  christos 		/* exact match, strange, fully unused tcp cannot be open */
    230          1.1  christos 		assert(0);
    231          1.1  christos 	}
    232          1.1  christos 	if(!sme)
    233          1.1  christos 		return NULL;
    234          1.1  christos 	r = (struct xfrd_tcp_pipeline*)sme->key;
    235          1.1  christos 	/* <= key pointed at, is the master correct ? */
    236          1.1  christos 	if(r->ip_len != key->ip_len)
    237          1.1  christos 		return NULL;
    238          1.1  christos 	if(memcmp(&r->ip, &key->ip, key->ip_len) != 0)
    239          1.1  christos 		return NULL;
    240          1.1  christos 	/* correct master, is there a slot free for this transfer? */
    241          1.1  christos 	if(r->num_unused == 0)
    242          1.1  christos 		return NULL;
    243          1.1  christos 	return r;
    244          1.1  christos }
    245          1.1  christos 
    246          1.1  christos /* remove zone from tcp waiting list */
    247          1.1  christos static void
    248  1.1.1.1.8.1    martin tcp_zone_waiting_list_popfirst(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
    249          1.1  christos {
    250          1.1  christos 	assert(zone->tcp_waiting);
    251          1.1  christos 	set->tcp_waiting_first = zone->tcp_waiting_next;
    252          1.1  christos 	if(zone->tcp_waiting_next)
    253          1.1  christos 		zone->tcp_waiting_next->tcp_waiting_prev = NULL;
    254          1.1  christos 	else	set->tcp_waiting_last = 0;
    255          1.1  christos 	zone->tcp_waiting_next = 0;
    256          1.1  christos 	zone->tcp_waiting = 0;
    257          1.1  christos }
    258          1.1  christos 
    259          1.1  christos /* remove zone from tcp pipe write-wait list */
    260          1.1  christos static void
    261  1.1.1.1.8.1    martin tcp_pipe_sendlist_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
    262          1.1  christos {
    263          1.1  christos 	if(zone->in_tcp_send) {
    264          1.1  christos 		if(zone->tcp_send_prev)
    265          1.1  christos 			zone->tcp_send_prev->tcp_send_next=zone->tcp_send_next;
    266          1.1  christos 		else	tp->tcp_send_first=zone->tcp_send_next;
    267          1.1  christos 		if(zone->tcp_send_next)
    268          1.1  christos 			zone->tcp_send_next->tcp_send_prev=zone->tcp_send_prev;
    269          1.1  christos 		else	tp->tcp_send_last=zone->tcp_send_prev;
    270          1.1  christos 		zone->in_tcp_send = 0;
    271          1.1  christos 	}
    272          1.1  christos }
    273          1.1  christos 
    274          1.1  christos /* remove first from write-wait list */
    275          1.1  christos static void
    276  1.1.1.1.8.1    martin tcp_pipe_sendlist_popfirst(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
    277          1.1  christos {
    278          1.1  christos 	tp->tcp_send_first = zone->tcp_send_next;
    279          1.1  christos 	if(tp->tcp_send_first)
    280          1.1  christos 		tp->tcp_send_first->tcp_send_prev = NULL;
    281          1.1  christos 	else	tp->tcp_send_last = NULL;
    282          1.1  christos 	zone->in_tcp_send = 0;
    283          1.1  christos }
    284          1.1  christos 
    285          1.1  christos /* remove zone from tcp pipe ID map */
    286          1.1  christos static void
    287  1.1.1.1.8.1    martin tcp_pipe_id_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
    288          1.1  christos {
    289          1.1  christos 	assert(tp->num_unused < ID_PIPE_NUM && tp->num_unused >= 0);
    290          1.1  christos 	assert(tp->id[zone->query_id] == zone);
    291          1.1  christos 	tp->id[zone->query_id] = NULL;
    292          1.1  christos 	tp->unused[tp->num_unused] = zone->query_id;
    293          1.1  christos 	/* must remove and re-add for sort order in tree */
    294          1.1  christos 	(void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
    295          1.1  christos 	tp->num_unused++;
    296          1.1  christos 	(void)rbtree_insert(xfrd->tcp_set->pipetree, &tp->node);
    297          1.1  christos }
    298          1.1  christos 
    299          1.1  christos /* stop the tcp pipe (and all its zones need to retry) */
    300          1.1  christos static void
    301          1.1  christos xfrd_tcp_pipe_stop(struct xfrd_tcp_pipeline* tp)
    302          1.1  christos {
    303          1.1  christos 	int i, conn = -1;
    304          1.1  christos 	assert(tp->num_unused < ID_PIPE_NUM); /* at least one 'in-use' */
    305          1.1  christos 	assert(ID_PIPE_NUM - tp->num_unused > tp->num_skip); /* at least one 'nonskip' */
    306          1.1  christos 	/* need to retry for all the zones connected to it */
    307          1.1  christos 	/* these could use different lists and go to a different nextmaster*/
    308          1.1  christos 	for(i=0; i<ID_PIPE_NUM; i++) {
    309          1.1  christos 		if(tp->id[i] && tp->id[i] != TCP_NULL_SKIP) {
    310  1.1.1.1.8.1    martin 			xfrd_zone_type* zone = tp->id[i];
    311          1.1  christos 			conn = zone->tcp_conn;
    312          1.1  christos 			zone->tcp_conn = -1;
    313          1.1  christos 			zone->tcp_waiting = 0;
    314          1.1  christos 			tcp_pipe_sendlist_remove(tp, zone);
    315          1.1  christos 			tcp_pipe_id_remove(tp, zone);
    316          1.1  christos 			xfrd_set_refresh_now(zone);
    317          1.1  christos 		}
    318          1.1  christos 	}
    319          1.1  christos 	assert(conn != -1);
    320          1.1  christos 	/* now release the entire tcp pipe */
    321          1.1  christos 	xfrd_tcp_pipe_release(xfrd->tcp_set, tp, conn);
    322          1.1  christos }
    323          1.1  christos 
    324          1.1  christos static void
    325          1.1  christos tcp_pipe_reset_timeout(struct xfrd_tcp_pipeline* tp)
    326          1.1  christos {
    327          1.1  christos 	int fd = tp->handler.ev_fd;
    328          1.1  christos 	struct timeval tv;
    329          1.1  christos 	tv.tv_sec = xfrd->tcp_set->tcp_timeout;
    330          1.1  christos 	tv.tv_usec = 0;
    331          1.1  christos 	if(tp->handler_added)
    332          1.1  christos 		event_del(&tp->handler);
    333          1.1  christos 	event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|
    334          1.1  christos 		(tp->tcp_send_first?EV_WRITE:0), xfrd_handle_tcp_pipe, tp);
    335          1.1  christos 	if(event_base_set(xfrd->event_base, &tp->handler) != 0)
    336          1.1  christos 		log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
    337          1.1  christos 	if(event_add(&tp->handler, &tv) != 0)
    338          1.1  christos 		log_msg(LOG_ERR, "xfrd tcp: event_add failed");
    339          1.1  christos 	tp->handler_added = 1;
    340          1.1  christos }
    341          1.1  christos 
    342          1.1  christos /* handle event from fd of tcp pipe */
    343          1.1  christos void
    344          1.1  christos xfrd_handle_tcp_pipe(int ATTR_UNUSED(fd), short event, void* arg)
    345          1.1  christos {
    346          1.1  christos 	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)arg;
    347          1.1  christos 	if((event & EV_WRITE)) {
    348          1.1  christos 		tcp_pipe_reset_timeout(tp);
    349          1.1  christos 		if(tp->tcp_send_first) {
    350          1.1  christos 			DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp write, zone %s",
    351          1.1  christos 				tp->tcp_send_first->apex_str));
    352          1.1  christos 			xfrd_tcp_write(tp, tp->tcp_send_first);
    353          1.1  christos 		}
    354          1.1  christos 	}
    355          1.1  christos 	if((event & EV_READ) && tp->handler_added) {
    356          1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp read"));
    357          1.1  christos 		tcp_pipe_reset_timeout(tp);
    358          1.1  christos 		xfrd_tcp_read(tp);
    359          1.1  christos 	}
    360          1.1  christos 	if((event & EV_TIMEOUT) && tp->handler_added) {
    361          1.1  christos 		/* tcp connection timed out */
    362          1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp timeout"));
    363          1.1  christos 		xfrd_tcp_pipe_stop(tp);
    364          1.1  christos 	}
    365          1.1  christos }
    366          1.1  christos 
    367          1.1  christos /* add a zone to the pipeline, it starts to want to write its query */
    368          1.1  christos static void
    369  1.1.1.1.8.1    martin pipeline_setup_new_zone(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
    370  1.1.1.1.8.1    martin 	xfrd_zone_type* zone)
    371          1.1  christos {
    372          1.1  christos 	/* assign the ID */
    373          1.1  christos 	int idx;
    374          1.1  christos 	assert(tp->num_unused > 0);
    375          1.1  christos 	/* we pick a random ID, even though it is TCP anyway */
    376          1.1  christos 	idx = random_generate(tp->num_unused);
    377          1.1  christos 	zone->query_id = tp->unused[idx];
    378          1.1  christos 	tp->unused[idx] = tp->unused[tp->num_unused-1];
    379          1.1  christos 	tp->id[zone->query_id] = zone;
    380          1.1  christos 	/* decrement unused counter, and fixup tree */
    381          1.1  christos 	(void)rbtree_delete(set->pipetree, &tp->node);
    382          1.1  christos 	tp->num_unused--;
    383          1.1  christos 	(void)rbtree_insert(set->pipetree, &tp->node);
    384          1.1  christos 
    385          1.1  christos 	/* add to sendlist, at end */
    386          1.1  christos 	zone->tcp_send_next = NULL;
    387          1.1  christos 	zone->tcp_send_prev = tp->tcp_send_last;
    388          1.1  christos 	zone->in_tcp_send = 1;
    389          1.1  christos 	if(tp->tcp_send_last)
    390          1.1  christos 		tp->tcp_send_last->tcp_send_next = zone;
    391          1.1  christos 	else	tp->tcp_send_first = zone;
    392          1.1  christos 	tp->tcp_send_last = zone;
    393          1.1  christos 
    394          1.1  christos 	/* is it first in line? */
    395          1.1  christos 	if(tp->tcp_send_first == zone) {
    396          1.1  christos 		xfrd_tcp_setup_write_packet(tp, zone);
    397          1.1  christos 		/* add write to event handler */
    398          1.1  christos 		tcp_pipe_reset_timeout(tp);
    399          1.1  christos 	}
    400          1.1  christos }
    401          1.1  christos 
    402          1.1  christos void
    403  1.1.1.1.8.1    martin xfrd_tcp_obtain(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
    404          1.1  christos {
    405          1.1  christos 	struct xfrd_tcp_pipeline* tp;
    406          1.1  christos 	assert(zone->tcp_conn == -1);
    407          1.1  christos 	assert(zone->tcp_waiting == 0);
    408          1.1  christos 
    409          1.1  christos 	if(set->tcp_count < XFRD_MAX_TCP) {
    410          1.1  christos 		int i;
    411          1.1  christos 		assert(!set->tcp_waiting_first);
    412          1.1  christos 		set->tcp_count ++;
    413          1.1  christos 		/* find a free tcp_buffer */
    414          1.1  christos 		for(i=0; i<XFRD_MAX_TCP; i++) {
    415          1.1  christos 			if(set->tcp_state[i]->tcp_r->fd == -1) {
    416          1.1  christos 				zone->tcp_conn = i;
    417          1.1  christos 				break;
    418          1.1  christos 			}
    419          1.1  christos 		}
    420          1.1  christos 		/** What if there is no free tcp_buffer? return; */
    421          1.1  christos 		if (zone->tcp_conn < 0) {
    422          1.1  christos 			return;
    423          1.1  christos 		}
    424          1.1  christos 
    425          1.1  christos 		tp = set->tcp_state[zone->tcp_conn];
    426          1.1  christos 		zone->tcp_waiting = 0;
    427          1.1  christos 
    428          1.1  christos 		/* stop udp use (if any) */
    429          1.1  christos 		if(zone->zone_handler.ev_fd != -1)
    430          1.1  christos 			xfrd_udp_release(zone);
    431          1.1  christos 
    432          1.1  christos 		if(!xfrd_tcp_open(set, tp, zone)) {
    433          1.1  christos 			zone->tcp_conn = -1;
    434          1.1  christos 			set->tcp_count --;
    435          1.1  christos 			xfrd_set_refresh_now(zone);
    436          1.1  christos 			return;
    437          1.1  christos 		}
    438          1.1  christos 		/* ip and ip_len set by tcp_open */
    439          1.1  christos 		tp->node.key = tp;
    440          1.1  christos 		tp->num_unused = ID_PIPE_NUM;
    441          1.1  christos 		tp->num_skip = 0;
    442          1.1  christos 		tp->tcp_send_first = NULL;
    443          1.1  christos 		tp->tcp_send_last = NULL;
    444          1.1  christos 		memset(tp->id, 0, sizeof(tp->id));
    445          1.1  christos 		for(i=0; i<ID_PIPE_NUM; i++) {
    446          1.1  christos 			tp->unused[i] = i;
    447          1.1  christos 		}
    448          1.1  christos 
    449          1.1  christos 		/* insert into tree */
    450          1.1  christos 		(void)rbtree_insert(set->pipetree, &tp->node);
    451          1.1  christos 		xfrd_deactivate_zone(zone);
    452          1.1  christos 		xfrd_unset_timer(zone);
    453          1.1  christos 		pipeline_setup_new_zone(set, tp, zone);
    454          1.1  christos 		return;
    455          1.1  christos 	}
    456          1.1  christos 	/* check for a pipeline to the same master with unused ID */
    457          1.1  christos 	if((tp = pipeline_find(set, zone))!= NULL) {
    458          1.1  christos 		int i;
    459          1.1  christos 		if(zone->zone_handler.ev_fd != -1)
    460          1.1  christos 			xfrd_udp_release(zone);
    461          1.1  christos 		for(i=0; i<XFRD_MAX_TCP; i++) {
    462          1.1  christos 			if(set->tcp_state[i] == tp)
    463          1.1  christos 				zone->tcp_conn = i;
    464          1.1  christos 		}
    465          1.1  christos 		xfrd_deactivate_zone(zone);
    466          1.1  christos 		xfrd_unset_timer(zone);
    467          1.1  christos 		pipeline_setup_new_zone(set, tp, zone);
    468          1.1  christos 		return;
    469          1.1  christos 	}
    470          1.1  christos 
    471          1.1  christos 	/* wait, at end of line */
    472          1.1  christos 	DEBUG(DEBUG_XFRD,2, (LOG_INFO, "xfrd: max number of tcp "
    473          1.1  christos 		"connections (%d) reached.", XFRD_MAX_TCP));
    474          1.1  christos 	zone->tcp_waiting_next = 0;
    475          1.1  christos 	zone->tcp_waiting_prev = set->tcp_waiting_last;
    476          1.1  christos 	zone->tcp_waiting = 1;
    477          1.1  christos 	if(!set->tcp_waiting_last) {
    478          1.1  christos 		set->tcp_waiting_first = zone;
    479          1.1  christos 		set->tcp_waiting_last = zone;
    480          1.1  christos 	} else {
    481          1.1  christos 		set->tcp_waiting_last->tcp_waiting_next = zone;
    482          1.1  christos 		set->tcp_waiting_last = zone;
    483          1.1  christos 	}
    484          1.1  christos 	xfrd_deactivate_zone(zone);
    485          1.1  christos 	xfrd_unset_timer(zone);
    486          1.1  christos }
    487          1.1  christos 
    488          1.1  christos int
    489  1.1.1.1.8.1    martin xfrd_tcp_open(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
    490  1.1.1.1.8.1    martin 	xfrd_zone_type* zone)
    491          1.1  christos {
    492          1.1  christos 	int fd, family, conn;
    493          1.1  christos 	struct timeval tv;
    494          1.1  christos 	assert(zone->tcp_conn != -1);
    495          1.1  christos 
    496          1.1  christos 	/* if there is no next master, fallback to use the first one */
    497          1.1  christos 	/* but there really should be a master set */
    498          1.1  christos 	if(!zone->master) {
    499          1.1  christos 		zone->master = zone->zone_options->pattern->request_xfr;
    500          1.1  christos 		zone->master_num = 0;
    501          1.1  christos 	}
    502          1.1  christos 
    503          1.1  christos 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s open tcp conn to %s",
    504          1.1  christos 		zone->apex_str, zone->master->ip_address_spec));
    505          1.1  christos 	tp->tcp_r->is_reading = 1;
    506          1.1  christos 	tp->tcp_r->total_bytes = 0;
    507          1.1  christos 	tp->tcp_r->msglen = 0;
    508          1.1  christos 	buffer_clear(tp->tcp_r->packet);
    509          1.1  christos 	tp->tcp_w->is_reading = 0;
    510          1.1  christos 	tp->tcp_w->total_bytes = 0;
    511          1.1  christos 	tp->tcp_w->msglen = 0;
    512          1.1  christos 	tp->connection_established = 0;
    513          1.1  christos 
    514          1.1  christos 	if(zone->master->is_ipv6) {
    515          1.1  christos #ifdef INET6
    516          1.1  christos 		family = PF_INET6;
    517          1.1  christos #else
    518          1.1  christos 		xfrd_set_refresh_now(zone);
    519          1.1  christos 		return 0;
    520          1.1  christos #endif
    521          1.1  christos 	} else {
    522          1.1  christos 		family = PF_INET;
    523          1.1  christos 	}
    524          1.1  christos 	fd = socket(family, SOCK_STREAM, IPPROTO_TCP);
    525          1.1  christos 	if(fd == -1) {
    526  1.1.1.1.8.1    martin 		/* squelch 'Address family not supported by protocol' at low
    527  1.1.1.1.8.1    martin 		 * verbosity levels */
    528  1.1.1.1.8.1    martin 		if(errno != EAFNOSUPPORT || verbosity > 2)
    529  1.1.1.1.8.1    martin 		    log_msg(LOG_ERR, "xfrd: %s cannot create tcp socket: %s",
    530          1.1  christos 			zone->master->ip_address_spec, strerror(errno));
    531          1.1  christos 		xfrd_set_refresh_now(zone);
    532          1.1  christos 		return 0;
    533          1.1  christos 	}
    534          1.1  christos 	if(fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
    535          1.1  christos 		log_msg(LOG_ERR, "xfrd: fcntl failed: %s", strerror(errno));
    536          1.1  christos 		close(fd);
    537          1.1  christos 		xfrd_set_refresh_now(zone);
    538          1.1  christos 		return 0;
    539          1.1  christos 	}
    540          1.1  christos 
    541          1.1  christos 	if(xfrd->nsd->outgoing_tcp_mss > 0) {
    542          1.1  christos #if defined(IPPROTO_TCP) && defined(TCP_MAXSEG)
    543          1.1  christos 		if(setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG,
    544          1.1  christos 			(void*)&xfrd->nsd->outgoing_tcp_mss,
    545          1.1  christos 			sizeof(xfrd->nsd->outgoing_tcp_mss)) < 0) {
    546          1.1  christos 			log_msg(LOG_ERR, "xfrd: setsockopt(TCP_MAXSEG)"
    547          1.1  christos 					"failed: %s", strerror(errno));
    548          1.1  christos 		}
    549          1.1  christos #else
    550          1.1  christos 		log_msg(LOG_ERR, "setsockopt(TCP_MAXSEG) unsupported");
    551          1.1  christos #endif
    552          1.1  christos 	}
    553          1.1  christos 
    554          1.1  christos 	tp->ip_len = xfrd_acl_sockaddr_to(zone->master, &tp->ip);
    555          1.1  christos 
    556          1.1  christos 	/* bind it */
    557          1.1  christos 	if (!xfrd_bind_local_interface(fd, zone->zone_options->pattern->
    558          1.1  christos 		outgoing_interface, zone->master, 1)) {
    559          1.1  christos 		close(fd);
    560          1.1  christos 		xfrd_set_refresh_now(zone);
    561          1.1  christos 		return 0;
    562          1.1  christos         }
    563          1.1  christos 
    564          1.1  christos 	conn = connect(fd, (struct sockaddr*)&tp->ip, tp->ip_len);
    565          1.1  christos 	if (conn == -1 && errno != EINPROGRESS) {
    566          1.1  christos 		log_msg(LOG_ERR, "xfrd: connect %s failed: %s",
    567          1.1  christos 			zone->master->ip_address_spec, strerror(errno));
    568          1.1  christos 		close(fd);
    569          1.1  christos 		xfrd_set_refresh_now(zone);
    570          1.1  christos 		return 0;
    571          1.1  christos 	}
    572          1.1  christos 	tp->tcp_r->fd = fd;
    573          1.1  christos 	tp->tcp_w->fd = fd;
    574          1.1  christos 
    575          1.1  christos 	/* set the tcp pipe event */
    576          1.1  christos 	if(tp->handler_added)
    577          1.1  christos 		event_del(&tp->handler);
    578          1.1  christos 	event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|EV_WRITE,
    579          1.1  christos 		xfrd_handle_tcp_pipe, tp);
    580          1.1  christos 	if(event_base_set(xfrd->event_base, &tp->handler) != 0)
    581          1.1  christos 		log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
    582          1.1  christos 	tv.tv_sec = set->tcp_timeout;
    583          1.1  christos 	tv.tv_usec = 0;
    584          1.1  christos 	if(event_add(&tp->handler, &tv) != 0)
    585          1.1  christos 		log_msg(LOG_ERR, "xfrd tcp: event_add failed");
    586          1.1  christos 	tp->handler_added = 1;
    587          1.1  christos 	return 1;
    588          1.1  christos }
    589          1.1  christos 
    590          1.1  christos void
    591  1.1.1.1.8.1    martin xfrd_tcp_setup_write_packet(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
    592          1.1  christos {
    593  1.1.1.1.8.1    martin 	struct xfrd_tcp* tcp = tp->tcp_w;
    594          1.1  christos 	assert(zone->tcp_conn != -1);
    595          1.1  christos 	assert(zone->tcp_waiting == 0);
    596          1.1  christos 	/* start AXFR or IXFR for the zone */
    597          1.1  christos 	if(zone->soa_disk_acquired == 0 || zone->master->use_axfr_only ||
    598          1.1  christos 		zone->master->ixfr_disabled ||
    599          1.1  christos 		/* if zone expired, after the first round, do not ask for
    600          1.1  christos 		 * IXFR any more, but full AXFR (of any serial number) */
    601          1.1  christos 		(zone->state == xfrd_zone_expired && zone->round_num != 0)) {
    602          1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request full zone transfer "
    603          1.1  christos 						"(AXFR) for %s to %s",
    604          1.1  christos 			zone->apex_str, zone->master->ip_address_spec));
    605          1.1  christos 
    606          1.1  christos 		xfrd_setup_packet(tcp->packet, TYPE_AXFR, CLASS_IN, zone->apex,
    607          1.1  christos 			zone->query_id);
    608          1.1  christos 	} else {
    609          1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request incremental zone "
    610          1.1  christos 						"transfer (IXFR) for %s to %s",
    611          1.1  christos 			zone->apex_str, zone->master->ip_address_spec));
    612          1.1  christos 
    613          1.1  christos 		xfrd_setup_packet(tcp->packet, TYPE_IXFR, CLASS_IN, zone->apex,
    614          1.1  christos 			zone->query_id);
    615          1.1  christos         	NSCOUNT_SET(tcp->packet, 1);
    616          1.1  christos 		xfrd_write_soa_buffer(tcp->packet, zone->apex, &zone->soa_disk);
    617          1.1  christos 	}
    618          1.1  christos 	/* old transfer needs to be removed still? */
    619          1.1  christos 	if(zone->msg_seq_nr)
    620          1.1  christos 		xfrd_unlink_xfrfile(xfrd->nsd, zone->xfrfilenumber);
    621          1.1  christos 	zone->msg_seq_nr = 0;
    622          1.1  christos 	zone->msg_rr_count = 0;
    623          1.1  christos 	if(zone->master->key_options && zone->master->key_options->tsig_key) {
    624          1.1  christos 		xfrd_tsig_sign_request(tcp->packet, &zone->tsig, zone->master);
    625          1.1  christos 	}
    626          1.1  christos 	buffer_flip(tcp->packet);
    627          1.1  christos 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "sent tcp query with ID %d", zone->query_id));
    628          1.1  christos 	tcp->msglen = buffer_limit(tcp->packet);
    629          1.1  christos 	tcp->total_bytes = 0;
    630          1.1  christos }
    631          1.1  christos 
    632          1.1  christos static void
    633  1.1.1.1.8.1    martin tcp_conn_ready_for_reading(struct xfrd_tcp* tcp)
    634          1.1  christos {
    635          1.1  christos 	tcp->total_bytes = 0;
    636          1.1  christos 	tcp->msglen = 0;
    637          1.1  christos 	buffer_clear(tcp->packet);
    638          1.1  christos }
    639          1.1  christos 
    640  1.1.1.1.8.1    martin int conn_write(struct xfrd_tcp* tcp)
    641          1.1  christos {
    642          1.1  christos 	ssize_t sent;
    643          1.1  christos 
    644          1.1  christos 	if(tcp->total_bytes < sizeof(tcp->msglen)) {
    645          1.1  christos 		uint16_t sendlen = htons(tcp->msglen);
    646  1.1.1.1.8.1    martin #ifdef HAVE_WRITEV
    647  1.1.1.1.8.1    martin 		struct iovec iov[2];
    648  1.1.1.1.8.1    martin 		iov[0].iov_base = (uint8_t*)&sendlen + tcp->total_bytes;
    649  1.1.1.1.8.1    martin 		iov[0].iov_len = sizeof(sendlen) - tcp->total_bytes;
    650  1.1.1.1.8.1    martin 		iov[1].iov_base = buffer_begin(tcp->packet);
    651  1.1.1.1.8.1    martin 		iov[1].iov_len = buffer_limit(tcp->packet);
    652  1.1.1.1.8.1    martin 		sent = writev(tcp->fd, iov, 2);
    653  1.1.1.1.8.1    martin #else /* HAVE_WRITEV */
    654          1.1  christos 		sent = write(tcp->fd,
    655          1.1  christos 			(const char*)&sendlen + tcp->total_bytes,
    656          1.1  christos 			sizeof(tcp->msglen) - tcp->total_bytes);
    657  1.1.1.1.8.1    martin #endif /* HAVE_WRITEV */
    658          1.1  christos 
    659          1.1  christos 		if(sent == -1) {
    660          1.1  christos 			if(errno == EAGAIN || errno == EINTR) {
    661          1.1  christos 				/* write would block, try later */
    662          1.1  christos 				return 0;
    663          1.1  christos 			} else {
    664          1.1  christos 				return -1;
    665          1.1  christos 			}
    666          1.1  christos 		}
    667          1.1  christos 
    668          1.1  christos 		tcp->total_bytes += sent;
    669  1.1.1.1.8.1    martin 		if(sent > (ssize_t)sizeof(tcp->msglen))
    670  1.1.1.1.8.1    martin 			buffer_skip(tcp->packet, sent-sizeof(tcp->msglen));
    671          1.1  christos 		if(tcp->total_bytes < sizeof(tcp->msglen)) {
    672          1.1  christos 			/* incomplete write, resume later */
    673          1.1  christos 			return 0;
    674          1.1  christos 		}
    675  1.1.1.1.8.1    martin #ifdef HAVE_WRITEV
    676  1.1.1.1.8.1    martin 		if(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen)) {
    677  1.1.1.1.8.1    martin 			/* packet done */
    678  1.1.1.1.8.1    martin 			return 1;
    679  1.1.1.1.8.1    martin 		}
    680  1.1.1.1.8.1    martin #endif
    681  1.1.1.1.8.1    martin 		assert(tcp->total_bytes >= sizeof(tcp->msglen));
    682          1.1  christos 	}
    683          1.1  christos 
    684          1.1  christos 	assert(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen));
    685          1.1  christos 
    686          1.1  christos 	sent = write(tcp->fd,
    687          1.1  christos 		buffer_current(tcp->packet),
    688          1.1  christos 		buffer_remaining(tcp->packet));
    689          1.1  christos 	if(sent == -1) {
    690          1.1  christos 		if(errno == EAGAIN || errno == EINTR) {
    691          1.1  christos 			/* write would block, try later */
    692          1.1  christos 			return 0;
    693          1.1  christos 		} else {
    694          1.1  christos 			return -1;
    695          1.1  christos 		}
    696          1.1  christos 	}
    697          1.1  christos 
    698          1.1  christos 	buffer_skip(tcp->packet, sent);
    699          1.1  christos 	tcp->total_bytes += sent;
    700          1.1  christos 
    701          1.1  christos 	if(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen)) {
    702          1.1  christos 		/* more to write when socket becomes writable again */
    703          1.1  christos 		return 0;
    704          1.1  christos 	}
    705          1.1  christos 
    706          1.1  christos 	assert(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen));
    707          1.1  christos 	return 1;
    708          1.1  christos }
    709          1.1  christos 
    710          1.1  christos void
    711  1.1.1.1.8.1    martin xfrd_tcp_write(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
    712          1.1  christos {
    713          1.1  christos 	int ret;
    714  1.1.1.1.8.1    martin 	struct xfrd_tcp* tcp = tp->tcp_w;
    715          1.1  christos 	assert(zone->tcp_conn != -1);
    716          1.1  christos 	assert(zone == tp->tcp_send_first);
    717          1.1  christos 	/* see if for non-established connection, there is a connect error */
    718          1.1  christos 	if(!tp->connection_established) {
    719          1.1  christos 		/* check for pending error from nonblocking connect */
    720          1.1  christos 		/* from Stevens, unix network programming, vol1, 3rd ed, p450 */
    721          1.1  christos 		int error = 0;
    722          1.1  christos 		socklen_t len = sizeof(error);
    723          1.1  christos 		if(getsockopt(tcp->fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0){
    724          1.1  christos 			error = errno; /* on solaris errno is error */
    725          1.1  christos 		}
    726          1.1  christos 		if(error == EINPROGRESS || error == EWOULDBLOCK)
    727          1.1  christos 			return; /* try again later */
    728          1.1  christos 		if(error != 0) {
    729          1.1  christos 			log_msg(LOG_ERR, "%s: Could not tcp connect to %s: %s",
    730          1.1  christos 				zone->apex_str, zone->master->ip_address_spec,
    731          1.1  christos 				strerror(error));
    732          1.1  christos 			xfrd_tcp_pipe_stop(tp);
    733          1.1  christos 			return;
    734          1.1  christos 		}
    735          1.1  christos 	}
    736          1.1  christos 	ret = conn_write(tcp);
    737          1.1  christos 	if(ret == -1) {
    738          1.1  christos 		log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
    739          1.1  christos 		xfrd_tcp_pipe_stop(tp);
    740          1.1  christos 		return;
    741          1.1  christos 	}
    742          1.1  christos 	if(tcp->total_bytes != 0 && !tp->connection_established)
    743          1.1  christos 		tp->connection_established = 1;
    744          1.1  christos 	if(ret == 0) {
    745          1.1  christos 		return; /* write again later */
    746          1.1  christos 	}
    747          1.1  christos 	/* done writing this message */
    748          1.1  christos 
    749          1.1  christos 	/* remove first zone from sendlist */
    750          1.1  christos 	tcp_pipe_sendlist_popfirst(tp, zone);
    751          1.1  christos 
    752          1.1  christos 	/* see if other zone wants to write; init; let it write (now) */
    753          1.1  christos 	/* and use a loop, because 64k stack calls is a too much */
    754          1.1  christos 	while(tp->tcp_send_first) {
    755          1.1  christos 		/* setup to write for this zone */
    756          1.1  christos 		xfrd_tcp_setup_write_packet(tp, tp->tcp_send_first);
    757          1.1  christos 		/* attempt to write for this zone (if success, continue loop)*/
    758          1.1  christos 		ret = conn_write(tcp);
    759          1.1  christos 		if(ret == -1) {
    760          1.1  christos 			log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
    761          1.1  christos 			xfrd_tcp_pipe_stop(tp);
    762          1.1  christos 			return;
    763          1.1  christos 		}
    764          1.1  christos 		if(ret == 0)
    765          1.1  christos 			return; /* write again later */
    766          1.1  christos 		tcp_pipe_sendlist_popfirst(tp, tp->tcp_send_first);
    767          1.1  christos 	}
    768          1.1  christos 
    769          1.1  christos 	/* if sendlist empty, remove WRITE from event */
    770          1.1  christos 
    771          1.1  christos 	/* listen to READ, and not WRITE events */
    772          1.1  christos 	assert(tp->tcp_send_first == NULL);
    773          1.1  christos 	tcp_pipe_reset_timeout(tp);
    774          1.1  christos }
    775          1.1  christos 
    776          1.1  christos int
    777  1.1.1.1.8.1    martin conn_read(struct xfrd_tcp* tcp)
    778          1.1  christos {
    779          1.1  christos 	ssize_t received;
    780          1.1  christos 	/* receive leading packet length bytes */
    781          1.1  christos 	if(tcp->total_bytes < sizeof(tcp->msglen)) {
    782          1.1  christos 		received = read(tcp->fd,
    783          1.1  christos 			(char*) &tcp->msglen + tcp->total_bytes,
    784          1.1  christos 			sizeof(tcp->msglen) - tcp->total_bytes);
    785          1.1  christos 		if(received == -1) {
    786          1.1  christos 			if(errno == EAGAIN || errno == EINTR) {
    787          1.1  christos 				/* read would block, try later */
    788          1.1  christos 				return 0;
    789          1.1  christos 			} else {
    790          1.1  christos #ifdef ECONNRESET
    791          1.1  christos 				if (verbosity >= 2 || errno != ECONNRESET)
    792          1.1  christos #endif /* ECONNRESET */
    793          1.1  christos 				log_msg(LOG_ERR, "tcp read sz: %s", strerror(errno));
    794          1.1  christos 				return -1;
    795          1.1  christos 			}
    796          1.1  christos 		} else if(received == 0) {
    797          1.1  christos 			/* EOF */
    798          1.1  christos 			return -1;
    799          1.1  christos 		}
    800          1.1  christos 		tcp->total_bytes += received;
    801          1.1  christos 		if(tcp->total_bytes < sizeof(tcp->msglen)) {
    802          1.1  christos 			/* not complete yet, try later */
    803          1.1  christos 			return 0;
    804          1.1  christos 		}
    805          1.1  christos 
    806          1.1  christos 		assert(tcp->total_bytes == sizeof(tcp->msglen));
    807          1.1  christos 		tcp->msglen = ntohs(tcp->msglen);
    808          1.1  christos 
    809          1.1  christos 		if(tcp->msglen == 0) {
    810          1.1  christos 			buffer_set_limit(tcp->packet, tcp->msglen);
    811          1.1  christos 			return 1;
    812          1.1  christos 		}
    813          1.1  christos 		if(tcp->msglen > buffer_capacity(tcp->packet)) {
    814          1.1  christos 			log_msg(LOG_ERR, "buffer too small, dropping connection");
    815          1.1  christos 			return 0;
    816          1.1  christos 		}
    817          1.1  christos 		buffer_set_limit(tcp->packet, tcp->msglen);
    818          1.1  christos 	}
    819          1.1  christos 
    820          1.1  christos 	assert(buffer_remaining(tcp->packet) > 0);
    821          1.1  christos 
    822          1.1  christos 	received = read(tcp->fd, buffer_current(tcp->packet),
    823          1.1  christos 		buffer_remaining(tcp->packet));
    824          1.1  christos 	if(received == -1) {
    825          1.1  christos 		if(errno == EAGAIN || errno == EINTR) {
    826          1.1  christos 			/* read would block, try later */
    827          1.1  christos 			return 0;
    828          1.1  christos 		} else {
    829          1.1  christos #ifdef ECONNRESET
    830          1.1  christos 			if (verbosity >= 2 || errno != ECONNRESET)
    831          1.1  christos #endif /* ECONNRESET */
    832          1.1  christos 			log_msg(LOG_ERR, "tcp read %s", strerror(errno));
    833          1.1  christos 			return -1;
    834          1.1  christos 		}
    835          1.1  christos 	} else if(received == 0) {
    836          1.1  christos 		/* EOF */
    837          1.1  christos 		return -1;
    838          1.1  christos 	}
    839          1.1  christos 
    840          1.1  christos 	tcp->total_bytes += received;
    841          1.1  christos 	buffer_skip(tcp->packet, received);
    842          1.1  christos 
    843          1.1  christos 	if(buffer_remaining(tcp->packet) > 0) {
    844          1.1  christos 		/* not complete yet, wait for more */
    845          1.1  christos 		return 0;
    846          1.1  christos 	}
    847          1.1  christos 
    848          1.1  christos 	/* completed */
    849          1.1  christos 	assert(buffer_position(tcp->packet) == tcp->msglen);
    850          1.1  christos 	return 1;
    851          1.1  christos }
    852          1.1  christos 
    853          1.1  christos void
    854          1.1  christos xfrd_tcp_read(struct xfrd_tcp_pipeline* tp)
    855          1.1  christos {
    856  1.1.1.1.8.1    martin 	xfrd_zone_type* zone;
    857  1.1.1.1.8.1    martin 	struct xfrd_tcp* tcp = tp->tcp_r;
    858          1.1  christos 	int ret;
    859          1.1  christos 	enum xfrd_packet_result pkt_result;
    860          1.1  christos 
    861          1.1  christos 	ret = conn_read(tcp);
    862          1.1  christos 	if(ret == -1) {
    863          1.1  christos 		xfrd_tcp_pipe_stop(tp);
    864          1.1  christos 		return;
    865          1.1  christos 	}
    866          1.1  christos 	if(ret == 0)
    867          1.1  christos 		return;
    868          1.1  christos 	/* completed msg */
    869          1.1  christos 	buffer_flip(tcp->packet);
    870          1.1  christos 	/* see which ID number it is, if skip, handle skip, NULL: warn */
    871          1.1  christos 	if(tcp->msglen < QHEADERSZ) {
    872          1.1  christos 		/* too short for DNS header, skip it */
    873          1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
    874          1.1  christos 			"xfrd: tcp skip response that is too short"));
    875          1.1  christos 		tcp_conn_ready_for_reading(tcp);
    876          1.1  christos 		return;
    877          1.1  christos 	}
    878          1.1  christos 	zone = tp->id[ID(tcp->packet)];
    879          1.1  christos 	if(!zone || zone == TCP_NULL_SKIP) {
    880          1.1  christos 		/* no zone for this id? skip it */
    881          1.1  christos 		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
    882          1.1  christos 			"xfrd: tcp skip response with %s ID",
    883          1.1  christos 			zone?"set-to-skip":"unknown"));
    884          1.1  christos 		tcp_conn_ready_for_reading(tcp);
    885          1.1  christos 		return;
    886          1.1  christos 	}
    887          1.1  christos 	assert(zone->tcp_conn != -1);
    888          1.1  christos 
    889          1.1  christos 	/* handle message for zone */
    890          1.1  christos 	pkt_result = xfrd_handle_received_xfr_packet(zone, tcp->packet);
    891          1.1  christos 	/* setup for reading the next packet on this connection */
    892          1.1  christos 	tcp_conn_ready_for_reading(tcp);
    893          1.1  christos 	switch(pkt_result) {
    894          1.1  christos 		case xfrd_packet_more:
    895          1.1  christos 			/* wait for next packet */
    896          1.1  christos 			break;
    897          1.1  christos 		case xfrd_packet_newlease:
    898          1.1  christos 			/* set to skip if more packets with this ID */
    899          1.1  christos 			tp->id[zone->query_id] = TCP_NULL_SKIP;
    900          1.1  christos 			tp->num_skip++;
    901          1.1  christos 			/* fall through to remove zone from tp */
    902  1.1.1.1.8.1    martin 			/* fallthrough */
    903          1.1  christos 		case xfrd_packet_transfer:
    904          1.1  christos 			if(zone->zone_options->pattern->multi_master_check) {
    905          1.1  christos 				xfrd_tcp_release(xfrd->tcp_set, zone);
    906          1.1  christos 				xfrd_make_request(zone);
    907          1.1  christos 				break;
    908          1.1  christos 			}
    909          1.1  christos 			xfrd_tcp_release(xfrd->tcp_set, zone);
    910          1.1  christos 			assert(zone->round_num == -1);
    911          1.1  christos 			break;
    912          1.1  christos 		case xfrd_packet_notimpl:
    913          1.1  christos 			xfrd_disable_ixfr(zone);
    914          1.1  christos 			xfrd_tcp_release(xfrd->tcp_set, zone);
    915          1.1  christos 			/* query next server */
    916          1.1  christos 			xfrd_make_request(zone);
    917          1.1  christos 			break;
    918          1.1  christos 		case xfrd_packet_bad:
    919          1.1  christos 		case xfrd_packet_tcp:
    920          1.1  christos 		default:
    921          1.1  christos 			/* set to skip if more packets with this ID */
    922          1.1  christos 			tp->id[zone->query_id] = TCP_NULL_SKIP;
    923          1.1  christos 			tp->num_skip++;
    924          1.1  christos 			xfrd_tcp_release(xfrd->tcp_set, zone);
    925          1.1  christos 			/* query next server */
    926          1.1  christos 			xfrd_make_request(zone);
    927          1.1  christos 			break;
    928          1.1  christos 	}
    929          1.1  christos }
    930          1.1  christos 
    931          1.1  christos void
    932  1.1.1.1.8.1    martin xfrd_tcp_release(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
    933          1.1  christos {
    934          1.1  christos 	int conn = zone->tcp_conn;
    935          1.1  christos 	struct xfrd_tcp_pipeline* tp = set->tcp_state[conn];
    936          1.1  christos 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s released tcp conn to %s",
    937          1.1  christos 		zone->apex_str, zone->master->ip_address_spec));
    938          1.1  christos 	assert(zone->tcp_conn != -1);
    939          1.1  christos 	assert(zone->tcp_waiting == 0);
    940          1.1  christos 	zone->tcp_conn = -1;
    941          1.1  christos 	zone->tcp_waiting = 0;
    942          1.1  christos 
    943          1.1  christos 	/* remove from tcp_send list */
    944          1.1  christos 	tcp_pipe_sendlist_remove(tp, zone);
    945          1.1  christos 	/* remove it from the ID list */
    946          1.1  christos 	if(tp->id[zone->query_id] != TCP_NULL_SKIP)
    947          1.1  christos 		tcp_pipe_id_remove(tp, zone);
    948          1.1  christos 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: released tcp pipe now %d unused",
    949          1.1  christos 		tp->num_unused));
    950          1.1  christos 	/* if pipe was full, but no more, then see if waiting element is
    951          1.1  christos 	 * for the same master, and can fill the unused ID */
    952          1.1  christos 	if(tp->num_unused == 1 && set->tcp_waiting_first) {
    953          1.1  christos #ifdef INET6
    954          1.1  christos 		struct sockaddr_storage to;
    955          1.1  christos #else
    956          1.1  christos 		struct sockaddr_in to;
    957          1.1  christos #endif
    958          1.1  christos 		socklen_t to_len = xfrd_acl_sockaddr_to(
    959          1.1  christos 			set->tcp_waiting_first->master, &to);
    960          1.1  christos 		if(to_len == tp->ip_len && memcmp(&to, &tp->ip, to_len) == 0) {
    961          1.1  christos 			/* use this connection for the waiting zone */
    962          1.1  christos 			zone = set->tcp_waiting_first;
    963          1.1  christos 			assert(zone->tcp_conn == -1);
    964          1.1  christos 			zone->tcp_conn = conn;
    965          1.1  christos 			tcp_zone_waiting_list_popfirst(set, zone);
    966          1.1  christos 			if(zone->zone_handler.ev_fd != -1)
    967          1.1  christos 				xfrd_udp_release(zone);
    968          1.1  christos 			xfrd_unset_timer(zone);
    969          1.1  christos 			pipeline_setup_new_zone(set, tp, zone);
    970          1.1  christos 			return;
    971          1.1  christos 		}
    972          1.1  christos 		/* waiting zone did not go to same server */
    973          1.1  christos 	}
    974          1.1  christos 
    975          1.1  christos 	/* if all unused, or only skipped leftover, close the pipeline */
    976          1.1  christos 	if(tp->num_unused >= ID_PIPE_NUM || tp->num_skip >= ID_PIPE_NUM - tp->num_unused)
    977          1.1  christos 		xfrd_tcp_pipe_release(set, tp, conn);
    978          1.1  christos }
    979          1.1  christos 
    980          1.1  christos void
    981  1.1.1.1.8.1    martin xfrd_tcp_pipe_release(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
    982          1.1  christos 	int conn)
    983          1.1  christos {
    984          1.1  christos 	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: tcp pipe released"));
    985          1.1  christos 	/* one handler per tcp pipe */
    986          1.1  christos 	if(tp->handler_added)
    987          1.1  christos 		event_del(&tp->handler);
    988          1.1  christos 	tp->handler_added = 0;
    989          1.1  christos 
    990          1.1  christos 	/* fd in tcp_r and tcp_w is the same, close once */
    991          1.1  christos 	if(tp->tcp_r->fd != -1)
    992          1.1  christos 		close(tp->tcp_r->fd);
    993          1.1  christos 	tp->tcp_r->fd = -1;
    994          1.1  christos 	tp->tcp_w->fd = -1;
    995          1.1  christos 
    996          1.1  christos 	/* remove from pipetree */
    997          1.1  christos 	(void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
    998          1.1  christos 
    999          1.1  christos 	/* a waiting zone can use the free tcp slot (to another server) */
   1000          1.1  christos 	/* if that zone fails to set-up or connect, we try to start the next
   1001          1.1  christos 	 * waiting zone in the list */
   1002          1.1  christos 	while(set->tcp_count == XFRD_MAX_TCP && set->tcp_waiting_first) {
   1003          1.1  christos 		int i;
   1004          1.1  christos 
   1005          1.1  christos 		/* pop first waiting process */
   1006  1.1.1.1.8.1    martin 		xfrd_zone_type* zone = set->tcp_waiting_first;
   1007          1.1  christos 		/* start it */
   1008          1.1  christos 		assert(zone->tcp_conn == -1);
   1009          1.1  christos 		zone->tcp_conn = conn;
   1010          1.1  christos 		tcp_zone_waiting_list_popfirst(set, zone);
   1011          1.1  christos 
   1012          1.1  christos 		/* stop udp (if any) */
   1013          1.1  christos 		if(zone->zone_handler.ev_fd != -1)
   1014          1.1  christos 			xfrd_udp_release(zone);
   1015          1.1  christos 		if(!xfrd_tcp_open(set, tp, zone)) {
   1016          1.1  christos 			zone->tcp_conn = -1;
   1017          1.1  christos 			xfrd_set_refresh_now(zone);
   1018          1.1  christos 			/* try to start the next zone (if any) */
   1019          1.1  christos 			continue;
   1020          1.1  christos 		}
   1021          1.1  christos 		/* re-init this tcppipe */
   1022          1.1  christos 		/* ip and ip_len set by tcp_open */
   1023          1.1  christos 		tp->node.key = tp;
   1024          1.1  christos 		tp->num_unused = ID_PIPE_NUM;
   1025          1.1  christos 		tp->num_skip = 0;
   1026          1.1  christos 		tp->tcp_send_first = NULL;
   1027          1.1  christos 		tp->tcp_send_last = NULL;
   1028          1.1  christos 		memset(tp->id, 0, sizeof(tp->id));
   1029          1.1  christos 		for(i=0; i<ID_PIPE_NUM; i++) {
   1030          1.1  christos 			tp->unused[i] = i;
   1031          1.1  christos 		}
   1032          1.1  christos 
   1033          1.1  christos 		/* insert into tree */
   1034          1.1  christos 		(void)rbtree_insert(set->pipetree, &tp->node);
   1035          1.1  christos 		/* setup write */
   1036          1.1  christos 		xfrd_unset_timer(zone);
   1037          1.1  christos 		pipeline_setup_new_zone(set, tp, zone);
   1038          1.1  christos 		/* started a task, no need for cleanups, so return */
   1039          1.1  christos 		return;
   1040          1.1  christos 	}
   1041          1.1  christos 	/* no task to start, cleanup */
   1042          1.1  christos 	assert(!set->tcp_waiting_first);
   1043          1.1  christos 	set->tcp_count --;
   1044          1.1  christos 	assert(set->tcp_count >= 0);
   1045          1.1  christos }
   1046          1.1  christos 
   1047