xfrd-tcp.c revision 1.1 1 /*
2 * xfrd-tcp.c - XFR (transfer) Daemon TCP system source file. Manages tcp conn.
3 *
4 * Copyright (c) 2001-2006, NLnet Labs. All rights reserved.
5 *
6 * See LICENSE for the license.
7 *
8 */
9
10 #include "config.h"
11 #include <assert.h>
12 #include <errno.h>
13 #include <fcntl.h>
14 #include <unistd.h>
15 #include <stdlib.h>
16 #include "nsd.h"
17 #include "xfrd-tcp.h"
18 #include "buffer.h"
19 #include "packet.h"
20 #include "dname.h"
21 #include "options.h"
22 #include "namedb.h"
23 #include "xfrd.h"
24 #include "xfrd-disk.h"
25 #include "util.h"
26
27 /* sort tcppipe, first on IP address, for an IPaddresss, sort on num_unused */
28 static int
29 xfrd_pipe_cmp(const void* a, const void* b)
30 {
31 const struct xfrd_tcp_pipeline* x = (struct xfrd_tcp_pipeline*)a;
32 const struct xfrd_tcp_pipeline* y = (struct xfrd_tcp_pipeline*)b;
33 int r;
34 if(x == y)
35 return 0;
36 if(y->ip_len != x->ip_len)
37 /* subtraction works because nonnegative and small numbers */
38 return (int)y->ip_len - (int)x->ip_len;
39 r = memcmp(&x->ip, &y->ip, x->ip_len);
40 if(r != 0)
41 return r;
42 /* sort that num_unused is sorted ascending, */
43 if(x->num_unused != y->num_unused) {
44 return (x->num_unused < y->num_unused) ? -1 : 1;
45 }
46 /* different pipelines are different still, even with same numunused*/
47 return (uintptr_t)x < (uintptr_t)y ? -1 : 1;
48 }
49
50 xfrd_tcp_set_t* xfrd_tcp_set_create(struct region* region)
51 {
52 int i;
53 xfrd_tcp_set_t* tcp_set = region_alloc(region, sizeof(xfrd_tcp_set_t));
54 memset(tcp_set, 0, sizeof(xfrd_tcp_set_t));
55 tcp_set->tcp_count = 0;
56 tcp_set->tcp_waiting_first = 0;
57 tcp_set->tcp_waiting_last = 0;
58 for(i=0; i<XFRD_MAX_TCP; i++)
59 tcp_set->tcp_state[i] = xfrd_tcp_pipeline_create(region);
60 tcp_set->pipetree = rbtree_create(region, &xfrd_pipe_cmp);
61 return tcp_set;
62 }
63
64 struct xfrd_tcp_pipeline*
65 xfrd_tcp_pipeline_create(region_type* region)
66 {
67 int i;
68 struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)
69 region_alloc_zero(region, sizeof(*tp));
70 tp->num_unused = ID_PIPE_NUM;
71 assert(sizeof(tp->unused)/sizeof(tp->unused[0]) == ID_PIPE_NUM);
72 for(i=0; i<ID_PIPE_NUM; i++)
73 tp->unused[i] = (uint16_t)i;
74 tp->tcp_r = xfrd_tcp_create(region, QIOBUFSZ);
75 tp->tcp_w = xfrd_tcp_create(region, 512);
76 return tp;
77 }
78
79 void
80 xfrd_setup_packet(buffer_type* packet,
81 uint16_t type, uint16_t klass, const dname_type* dname, uint16_t qid)
82 {
83 /* Set up the header */
84 buffer_clear(packet);
85 ID_SET(packet, qid);
86 FLAGS_SET(packet, 0);
87 OPCODE_SET(packet, OPCODE_QUERY);
88 QDCOUNT_SET(packet, 1);
89 ANCOUNT_SET(packet, 0);
90 NSCOUNT_SET(packet, 0);
91 ARCOUNT_SET(packet, 0);
92 buffer_skip(packet, QHEADERSZ);
93
94 /* The question record. */
95 buffer_write(packet, dname_name(dname), dname->name_size);
96 buffer_write_u16(packet, type);
97 buffer_write_u16(packet, klass);
98 }
99
100 static socklen_t
101 #ifdef INET6
102 xfrd_acl_sockaddr(acl_options_t* acl, unsigned int port,
103 struct sockaddr_storage *sck)
104 #else
105 xfrd_acl_sockaddr(acl_options_t* acl, unsigned int port,
106 struct sockaddr_in *sck, const char* fromto)
107 #endif /* INET6 */
108 {
109 /* setup address structure */
110 #ifdef INET6
111 memset(sck, 0, sizeof(struct sockaddr_storage));
112 #else
113 memset(sck, 0, sizeof(struct sockaddr_in));
114 #endif
115 if(acl->is_ipv6) {
116 #ifdef INET6
117 struct sockaddr_in6* sa = (struct sockaddr_in6*)sck;
118 sa->sin6_family = AF_INET6;
119 sa->sin6_port = htons(port);
120 sa->sin6_addr = acl->addr.addr6;
121 return sizeof(struct sockaddr_in6);
122 #else
123 log_msg(LOG_ERR, "xfrd: IPv6 connection %s %s attempted but no \
124 INET6.", fromto, acl->ip_address_spec);
125 return 0;
126 #endif
127 } else {
128 struct sockaddr_in* sa = (struct sockaddr_in*)sck;
129 sa->sin_family = AF_INET;
130 sa->sin_port = htons(port);
131 sa->sin_addr = acl->addr.addr;
132 return sizeof(struct sockaddr_in);
133 }
134 }
135
136 socklen_t
137 #ifdef INET6
138 xfrd_acl_sockaddr_to(acl_options_t* acl, struct sockaddr_storage *to)
139 #else
140 xfrd_acl_sockaddr_to(acl_options_t* acl, struct sockaddr_in *to)
141 #endif /* INET6 */
142 {
143 unsigned int port = acl->port?acl->port:(unsigned)atoi(TCP_PORT);
144 #ifdef INET6
145 return xfrd_acl_sockaddr(acl, port, to);
146 #else
147 return xfrd_acl_sockaddr(acl, port, to, "to");
148 #endif /* INET6 */
149 }
150
151 socklen_t
152 #ifdef INET6
153 xfrd_acl_sockaddr_frm(acl_options_t* acl, struct sockaddr_storage *frm)
154 #else
155 xfrd_acl_sockaddr_frm(acl_options_t* acl, struct sockaddr_in *frm)
156 #endif /* INET6 */
157 {
158 unsigned int port = acl->port?acl->port:0;
159 #ifdef INET6
160 return xfrd_acl_sockaddr(acl, port, frm);
161 #else
162 return xfrd_acl_sockaddr(acl, port, frm, "from");
163 #endif /* INET6 */
164 }
165
166 void
167 xfrd_write_soa_buffer(struct buffer* packet,
168 const dname_type* apex, struct xfrd_soa* soa)
169 {
170 size_t rdlength_pos;
171 uint16_t rdlength;
172 buffer_write(packet, dname_name(apex), apex->name_size);
173
174 /* already in network order */
175 buffer_write(packet, &soa->type, sizeof(soa->type));
176 buffer_write(packet, &soa->klass, sizeof(soa->klass));
177 buffer_write(packet, &soa->ttl, sizeof(soa->ttl));
178 rdlength_pos = buffer_position(packet);
179 buffer_skip(packet, sizeof(rdlength));
180
181 /* uncompressed dnames */
182 buffer_write(packet, soa->prim_ns+1, soa->prim_ns[0]);
183 buffer_write(packet, soa->email+1, soa->email[0]);
184
185 buffer_write(packet, &soa->serial, sizeof(uint32_t));
186 buffer_write(packet, &soa->refresh, sizeof(uint32_t));
187 buffer_write(packet, &soa->retry, sizeof(uint32_t));
188 buffer_write(packet, &soa->expire, sizeof(uint32_t));
189 buffer_write(packet, &soa->minimum, sizeof(uint32_t));
190
191 /* write length of RR */
192 rdlength = buffer_position(packet) - rdlength_pos - sizeof(rdlength);
193 buffer_write_u16_at(packet, rdlength_pos, rdlength);
194 }
195
196 xfrd_tcp_t*
197 xfrd_tcp_create(region_type* region, size_t bufsize)
198 {
199 xfrd_tcp_t* tcp_state = (xfrd_tcp_t*)region_alloc(
200 region, sizeof(xfrd_tcp_t));
201 memset(tcp_state, 0, sizeof(xfrd_tcp_t));
202 tcp_state->packet = buffer_create(region, bufsize);
203 tcp_state->fd = -1;
204
205 return tcp_state;
206 }
207
/* Look up an open pipeline to zone->master that still has a free query
 * ID.  Returns that pipeline, or NULL when set->pipetree holds no
 * pipeline to this master or the one found has no unused IDs left. */
static struct xfrd_tcp_pipeline*
pipeline_find(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
{
	rbnode_t* sme = NULL;
	struct xfrd_tcp_pipeline* r;
	/* smaller buf than a full pipeline with 64kb ID array, only need
	 * the front part with the key info, this front part contains the
	 * members that the compare function uses. */
	/* NOTE(review): keysize assumes the id[] (zone pointers) and
	 * unused[] (uint16_t) arrays of ID_PIPE_NUM entries are the last
	 * members of struct xfrd_tcp_pipeline; confirm in xfrd-tcp.h if
	 * the struct layout changes. */
	const size_t keysize = sizeof(struct xfrd_tcp_pipeline) -
		ID_PIPE_NUM*(sizeof(struct xfrd_zone*) + sizeof(uint16_t));
	/* void* type for alignment of the struct,
	 * divide the keysize by ptr-size and then add one to round up */
	void* buf[ (keysize / sizeof(void*)) + 1 ];
	struct xfrd_tcp_pipeline* key = (struct xfrd_tcp_pipeline*)buf;
	/* only the fields xfrd_pipe_cmp reads are filled in: ip, ip_len,
	 * num_unused (plus node.key for the tree) */
	key->node.key = key;
	key->ip_len = xfrd_acl_sockaddr_to(zone->master, &key->ip);
	key->num_unused = ID_PIPE_NUM;
	/* lookup existing tcp transfer to the master with highest unused */
	/* for one address xfrd_pipe_cmp sorts ascending on num_unused, and
	 * the key has the maximum (ID_PIPE_NUM), so the greatest entry <= key
	 * is the pipeline to this address with the most free IDs.
	 * Presumably rbtree_find_less_equal returns nonzero only for an exact
	 * match and otherwise sets sme to the closest smaller node or NULL;
	 * confirm against rbtree.h. */
	if(rbtree_find_less_equal(set->pipetree, key, &sme)) {
		/* exact match, strange, fully unused tcp cannot be open */
		assert(0);
	}
	if(!sme)
		return NULL;
	r = (struct xfrd_tcp_pipeline*)sme->key;
	/* <= key pointed at, is the master correct ? */
	/* the node found may belong to a different address */
	if(r->ip_len != key->ip_len)
		return NULL;
	if(memcmp(&r->ip, &key->ip, key->ip_len) != 0)
		return NULL;
	/* correct master, is there a slot free for this transfer? */
	if(r->num_unused == 0)
		return NULL;
	return r;
}
243
244 /* remove zone from tcp waiting list */
245 static void
246 tcp_zone_waiting_list_popfirst(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
247 {
248 assert(zone->tcp_waiting);
249 set->tcp_waiting_first = zone->tcp_waiting_next;
250 if(zone->tcp_waiting_next)
251 zone->tcp_waiting_next->tcp_waiting_prev = NULL;
252 else set->tcp_waiting_last = 0;
253 zone->tcp_waiting_next = 0;
254 zone->tcp_waiting = 0;
255 }
256
257 /* remove zone from tcp pipe write-wait list */
258 static void
259 tcp_pipe_sendlist_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
260 {
261 if(zone->in_tcp_send) {
262 if(zone->tcp_send_prev)
263 zone->tcp_send_prev->tcp_send_next=zone->tcp_send_next;
264 else tp->tcp_send_first=zone->tcp_send_next;
265 if(zone->tcp_send_next)
266 zone->tcp_send_next->tcp_send_prev=zone->tcp_send_prev;
267 else tp->tcp_send_last=zone->tcp_send_prev;
268 zone->in_tcp_send = 0;
269 }
270 }
271
272 /* remove first from write-wait list */
273 static void
274 tcp_pipe_sendlist_popfirst(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
275 {
276 tp->tcp_send_first = zone->tcp_send_next;
277 if(tp->tcp_send_first)
278 tp->tcp_send_first->tcp_send_prev = NULL;
279 else tp->tcp_send_last = NULL;
280 zone->in_tcp_send = 0;
281 }
282
283 /* remove zone from tcp pipe ID map */
284 static void
285 tcp_pipe_id_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
286 {
287 assert(tp->num_unused < ID_PIPE_NUM && tp->num_unused >= 0);
288 assert(tp->id[zone->query_id] == zone);
289 tp->id[zone->query_id] = NULL;
290 tp->unused[tp->num_unused] = zone->query_id;
291 /* must remove and re-add for sort order in tree */
292 (void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
293 tp->num_unused++;
294 (void)rbtree_insert(xfrd->tcp_set->pipetree, &tp->node);
295 }
296
/* stop the tcp pipe (and all its zones need to retry) */
/* Abort pipeline tp. Callers in this file use it on timeout and on
 * connect/write/read errors.  Every zone holding a live (non-skip) query
 * ID on tp is removed from tp's send list and ID map, gets tcp_conn -1,
 * and is scheduled for an immediate refresh.  Then the pipeline itself is
 * released via xfrd_tcp_pipe_release, which may hand the slot to a zone
 * from the tcp waiting list. */
static void
xfrd_tcp_pipe_stop(struct xfrd_tcp_pipeline* tp)
{
	int i, conn = -1;
	assert(tp->num_unused < ID_PIPE_NUM); /* at least one 'in-use' */
	assert(ID_PIPE_NUM - tp->num_unused > tp->num_skip); /* at least one 'nonskip' */
	/* need to retry for all the zones connected to it */
	/* these could use different lists and go to a different nextmaster*/
	/* TCP_NULL_SKIP entries stay in id[]; they are cleared when the
	 * pipeline is re-initialised for a new connection */
	for(i=0; i<ID_PIPE_NUM; i++) {
		if(tp->id[i] && tp->id[i] != TCP_NULL_SKIP) {
			xfrd_zone_t* zone = tp->id[i];
			/* all zones on tp carry the same tcp_conn slot index */
			conn = zone->tcp_conn;
			zone->tcp_conn = -1;
			zone->tcp_waiting = 0;
			tcp_pipe_sendlist_remove(tp, zone);
			tcp_pipe_id_remove(tp, zone);
			xfrd_set_refresh_now(zone);
		}
	}
	/* NOTE(review): if only skipped IDs were in use, conn stays -1; the
	 * asserts above state this cannot happen, but a release build would
	 * then pass conn == -1 to xfrd_tcp_pipe_release */
	assert(conn != -1);
	/* now release the entire tcp pipe */
	xfrd_tcp_pipe_release(xfrd->tcp_set, tp, conn);
}
321
322 static void
323 tcp_pipe_reset_timeout(struct xfrd_tcp_pipeline* tp)
324 {
325 int fd = tp->handler.ev_fd;
326 struct timeval tv;
327 tv.tv_sec = xfrd->tcp_set->tcp_timeout;
328 tv.tv_usec = 0;
329 if(tp->handler_added)
330 event_del(&tp->handler);
331 event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|
332 (tp->tcp_send_first?EV_WRITE:0), xfrd_handle_tcp_pipe, tp);
333 if(event_base_set(xfrd->event_base, &tp->handler) != 0)
334 log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
335 if(event_add(&tp->handler, &tv) != 0)
336 log_msg(LOG_ERR, "xfrd tcp: event_add failed");
337 tp->handler_added = 1;
338 }
339
/* handle event from fd of tcp pipe */
/* Event callback for the tcp pipeline passed in arg.
 * EV_WRITE:   re-arm the event (restarting the timeout) and continue
 *             writing for the head of the send list, if any.
 * EV_READ:    re-arm the event and read from the connection.
 * EV_TIMEOUT: stop the pipeline; its zones are scheduled for retry.
 * The read and timeout branches test handler_added because an earlier
 * branch may have stopped the pipe (xfrd_tcp_pipe_release clears it). */
void
xfrd_handle_tcp_pipe(int ATTR_UNUSED(fd), short event, void* arg)
{
	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)arg;
	if((event & EV_WRITE)) {
		tcp_pipe_reset_timeout(tp);
		if(tp->tcp_send_first) {
			DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp write, zone %s",
				tp->tcp_send_first->apex_str));
			xfrd_tcp_write(tp, tp->tcp_send_first);
		}
	}
	/* NOTE(review): if the write branch stopped the pipe and
	 * xfrd_tcp_pipe_release immediately reused tp for a waiting zone,
	 * xfrd_tcp_open set handler_added again and the read below runs on
	 * the new, still-connecting socket; confirm that is harmless */
	if((event & EV_READ) && tp->handler_added) {
		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp read"));
		tcp_pipe_reset_timeout(tp);
		xfrd_tcp_read(tp);
	}
	if((event & EV_TIMEOUT) && tp->handler_added) {
		/* tcp connection timed out */
		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp timeout"));
		xfrd_tcp_pipe_stop(tp);
	}
}
364
365 /* add a zone to the pipeline, it starts to want to write its query */
366 static void
367 pipeline_setup_new_zone(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp,
368 xfrd_zone_t* zone)
369 {
370 /* assign the ID */
371 int idx;
372 assert(tp->num_unused > 0);
373 /* we pick a random ID, even though it is TCP anyway */
374 idx = random_generate(tp->num_unused);
375 zone->query_id = tp->unused[idx];
376 tp->unused[idx] = tp->unused[tp->num_unused-1];
377 tp->id[zone->query_id] = zone;
378 /* decrement unused counter, and fixup tree */
379 (void)rbtree_delete(set->pipetree, &tp->node);
380 tp->num_unused--;
381 (void)rbtree_insert(set->pipetree, &tp->node);
382
383 /* add to sendlist, at end */
384 zone->tcp_send_next = NULL;
385 zone->tcp_send_prev = tp->tcp_send_last;
386 zone->in_tcp_send = 1;
387 if(tp->tcp_send_last)
388 tp->tcp_send_last->tcp_send_next = zone;
389 else tp->tcp_send_first = zone;
390 tp->tcp_send_last = zone;
391
392 /* is it first in line? */
393 if(tp->tcp_send_first == zone) {
394 xfrd_tcp_setup_write_packet(tp, zone);
395 /* add write to event handler */
396 tcp_pipe_reset_timeout(tp);
397 }
398 }
399
400 void
401 xfrd_tcp_obtain(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
402 {
403 struct xfrd_tcp_pipeline* tp;
404 assert(zone->tcp_conn == -1);
405 assert(zone->tcp_waiting == 0);
406
407 if(set->tcp_count < XFRD_MAX_TCP) {
408 int i;
409 assert(!set->tcp_waiting_first);
410 set->tcp_count ++;
411 /* find a free tcp_buffer */
412 for(i=0; i<XFRD_MAX_TCP; i++) {
413 if(set->tcp_state[i]->tcp_r->fd == -1) {
414 zone->tcp_conn = i;
415 break;
416 }
417 }
418 /** What if there is no free tcp_buffer? return; */
419 if (zone->tcp_conn < 0) {
420 return;
421 }
422
423 tp = set->tcp_state[zone->tcp_conn];
424 zone->tcp_waiting = 0;
425
426 /* stop udp use (if any) */
427 if(zone->zone_handler.ev_fd != -1)
428 xfrd_udp_release(zone);
429
430 if(!xfrd_tcp_open(set, tp, zone)) {
431 zone->tcp_conn = -1;
432 set->tcp_count --;
433 xfrd_set_refresh_now(zone);
434 return;
435 }
436 /* ip and ip_len set by tcp_open */
437 tp->node.key = tp;
438 tp->num_unused = ID_PIPE_NUM;
439 tp->num_skip = 0;
440 tp->tcp_send_first = NULL;
441 tp->tcp_send_last = NULL;
442 memset(tp->id, 0, sizeof(tp->id));
443 for(i=0; i<ID_PIPE_NUM; i++) {
444 tp->unused[i] = i;
445 }
446
447 /* insert into tree */
448 (void)rbtree_insert(set->pipetree, &tp->node);
449 xfrd_deactivate_zone(zone);
450 xfrd_unset_timer(zone);
451 pipeline_setup_new_zone(set, tp, zone);
452 return;
453 }
454 /* check for a pipeline to the same master with unused ID */
455 if((tp = pipeline_find(set, zone))!= NULL) {
456 int i;
457 if(zone->zone_handler.ev_fd != -1)
458 xfrd_udp_release(zone);
459 for(i=0; i<XFRD_MAX_TCP; i++) {
460 if(set->tcp_state[i] == tp)
461 zone->tcp_conn = i;
462 }
463 xfrd_deactivate_zone(zone);
464 xfrd_unset_timer(zone);
465 pipeline_setup_new_zone(set, tp, zone);
466 return;
467 }
468
469 /* wait, at end of line */
470 DEBUG(DEBUG_XFRD,2, (LOG_INFO, "xfrd: max number of tcp "
471 "connections (%d) reached.", XFRD_MAX_TCP));
472 zone->tcp_waiting_next = 0;
473 zone->tcp_waiting_prev = set->tcp_waiting_last;
474 zone->tcp_waiting = 1;
475 if(!set->tcp_waiting_last) {
476 set->tcp_waiting_first = zone;
477 set->tcp_waiting_last = zone;
478 } else {
479 set->tcp_waiting_last->tcp_waiting_next = zone;
480 set->tcp_waiting_last = zone;
481 }
482 xfrd_deactivate_zone(zone);
483 xfrd_unset_timer(zone);
484 }
485
486 int
487 xfrd_tcp_open(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp,
488 xfrd_zone_t* zone)
489 {
490 int fd, family, conn;
491 struct timeval tv;
492 assert(zone->tcp_conn != -1);
493
494 /* if there is no next master, fallback to use the first one */
495 /* but there really should be a master set */
496 if(!zone->master) {
497 zone->master = zone->zone_options->pattern->request_xfr;
498 zone->master_num = 0;
499 }
500
501 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s open tcp conn to %s",
502 zone->apex_str, zone->master->ip_address_spec));
503 tp->tcp_r->is_reading = 1;
504 tp->tcp_r->total_bytes = 0;
505 tp->tcp_r->msglen = 0;
506 buffer_clear(tp->tcp_r->packet);
507 tp->tcp_w->is_reading = 0;
508 tp->tcp_w->total_bytes = 0;
509 tp->tcp_w->msglen = 0;
510 tp->connection_established = 0;
511
512 if(zone->master->is_ipv6) {
513 #ifdef INET6
514 family = PF_INET6;
515 #else
516 xfrd_set_refresh_now(zone);
517 return 0;
518 #endif
519 } else {
520 family = PF_INET;
521 }
522 fd = socket(family, SOCK_STREAM, IPPROTO_TCP);
523 if(fd == -1) {
524 log_msg(LOG_ERR, "xfrd: %s cannot create tcp socket: %s",
525 zone->master->ip_address_spec, strerror(errno));
526 xfrd_set_refresh_now(zone);
527 return 0;
528 }
529 if(fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
530 log_msg(LOG_ERR, "xfrd: fcntl failed: %s", strerror(errno));
531 close(fd);
532 xfrd_set_refresh_now(zone);
533 return 0;
534 }
535
536 if(xfrd->nsd->outgoing_tcp_mss > 0) {
537 #if defined(IPPROTO_TCP) && defined(TCP_MAXSEG)
538 if(setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG,
539 (void*)&xfrd->nsd->outgoing_tcp_mss,
540 sizeof(xfrd->nsd->outgoing_tcp_mss)) < 0) {
541 log_msg(LOG_ERR, "xfrd: setsockopt(TCP_MAXSEG)"
542 "failed: %s", strerror(errno));
543 }
544 #else
545 log_msg(LOG_ERR, "setsockopt(TCP_MAXSEG) unsupported");
546 #endif
547 }
548
549 tp->ip_len = xfrd_acl_sockaddr_to(zone->master, &tp->ip);
550
551 /* bind it */
552 if (!xfrd_bind_local_interface(fd, zone->zone_options->pattern->
553 outgoing_interface, zone->master, 1)) {
554 close(fd);
555 xfrd_set_refresh_now(zone);
556 return 0;
557 }
558
559 conn = connect(fd, (struct sockaddr*)&tp->ip, tp->ip_len);
560 if (conn == -1 && errno != EINPROGRESS) {
561 log_msg(LOG_ERR, "xfrd: connect %s failed: %s",
562 zone->master->ip_address_spec, strerror(errno));
563 close(fd);
564 xfrd_set_refresh_now(zone);
565 return 0;
566 }
567 tp->tcp_r->fd = fd;
568 tp->tcp_w->fd = fd;
569
570 /* set the tcp pipe event */
571 if(tp->handler_added)
572 event_del(&tp->handler);
573 event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|EV_WRITE,
574 xfrd_handle_tcp_pipe, tp);
575 if(event_base_set(xfrd->event_base, &tp->handler) != 0)
576 log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
577 tv.tv_sec = set->tcp_timeout;
578 tv.tv_usec = 0;
579 if(event_add(&tp->handler, &tv) != 0)
580 log_msg(LOG_ERR, "xfrd tcp: event_add failed");
581 tp->handler_added = 1;
582 return 1;
583 }
584
585 void
586 xfrd_tcp_setup_write_packet(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
587 {
588 xfrd_tcp_t* tcp = tp->tcp_w;
589 assert(zone->tcp_conn != -1);
590 assert(zone->tcp_waiting == 0);
591 /* start AXFR or IXFR for the zone */
592 if(zone->soa_disk_acquired == 0 || zone->master->use_axfr_only ||
593 zone->master->ixfr_disabled ||
594 /* if zone expired, after the first round, do not ask for
595 * IXFR any more, but full AXFR (of any serial number) */
596 (zone->state == xfrd_zone_expired && zone->round_num != 0)) {
597 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request full zone transfer "
598 "(AXFR) for %s to %s",
599 zone->apex_str, zone->master->ip_address_spec));
600
601 xfrd_setup_packet(tcp->packet, TYPE_AXFR, CLASS_IN, zone->apex,
602 zone->query_id);
603 } else {
604 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request incremental zone "
605 "transfer (IXFR) for %s to %s",
606 zone->apex_str, zone->master->ip_address_spec));
607
608 xfrd_setup_packet(tcp->packet, TYPE_IXFR, CLASS_IN, zone->apex,
609 zone->query_id);
610 NSCOUNT_SET(tcp->packet, 1);
611 xfrd_write_soa_buffer(tcp->packet, zone->apex, &zone->soa_disk);
612 }
613 /* old transfer needs to be removed still? */
614 if(zone->msg_seq_nr)
615 xfrd_unlink_xfrfile(xfrd->nsd, zone->xfrfilenumber);
616 zone->msg_seq_nr = 0;
617 zone->msg_rr_count = 0;
618 if(zone->master->key_options && zone->master->key_options->tsig_key) {
619 xfrd_tsig_sign_request(tcp->packet, &zone->tsig, zone->master);
620 }
621 buffer_flip(tcp->packet);
622 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "sent tcp query with ID %d", zone->query_id));
623 tcp->msglen = buffer_limit(tcp->packet);
624 tcp->total_bytes = 0;
625 }
626
627 static void
628 tcp_conn_ready_for_reading(xfrd_tcp_t* tcp)
629 {
630 tcp->total_bytes = 0;
631 tcp->msglen = 0;
632 buffer_clear(tcp->packet);
633 }
634
635 int conn_write(xfrd_tcp_t* tcp)
636 {
637 ssize_t sent;
638
639 if(tcp->total_bytes < sizeof(tcp->msglen)) {
640 uint16_t sendlen = htons(tcp->msglen);
641 sent = write(tcp->fd,
642 (const char*)&sendlen + tcp->total_bytes,
643 sizeof(tcp->msglen) - tcp->total_bytes);
644
645 if(sent == -1) {
646 if(errno == EAGAIN || errno == EINTR) {
647 /* write would block, try later */
648 return 0;
649 } else {
650 return -1;
651 }
652 }
653
654 tcp->total_bytes += sent;
655 if(tcp->total_bytes < sizeof(tcp->msglen)) {
656 /* incomplete write, resume later */
657 return 0;
658 }
659 assert(tcp->total_bytes == sizeof(tcp->msglen));
660 }
661
662 assert(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen));
663
664 sent = write(tcp->fd,
665 buffer_current(tcp->packet),
666 buffer_remaining(tcp->packet));
667 if(sent == -1) {
668 if(errno == EAGAIN || errno == EINTR) {
669 /* write would block, try later */
670 return 0;
671 } else {
672 return -1;
673 }
674 }
675
676 buffer_skip(tcp->packet, sent);
677 tcp->total_bytes += sent;
678
679 if(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen)) {
680 /* more to write when socket becomes writable again */
681 return 0;
682 }
683
684 assert(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen));
685 return 1;
686 }
687
/* Continue sending queued queries on pipeline tp, starting with zone,
 * which must be the head of tp's send list.
 * While the connection is not yet established, it first checks for a
 * pending error of the nonblocking connect.  If the connect is still in
 * progress it returns and waits for the next write event; if it failed,
 * the pipe is stopped.
 * When zone's query is completely written, zone is popped from the send
 * list.  The following queued zones then get their query built and
 * written in turn.  This continues until a write would block (return and
 * continue on the next write event) or the list is empty (the event is
 * re-armed without EV_WRITE).  Any write error stops the whole pipe. */
void
xfrd_tcp_write(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
{
	int ret;
	xfrd_tcp_t* tcp = tp->tcp_w;
	assert(zone->tcp_conn != -1);
	assert(zone == tp->tcp_send_first);
	/* see if for non-established connection, there is a connect error */
	if(!tp->connection_established) {
		/* check for pending error from nonblocking connect */
		/* from Stevens, unix network programming, vol1, 3rd ed, p450 */
		int error = 0;
		socklen_t len = sizeof(error);
		if(getsockopt(tcp->fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0){
			error = errno; /* on solaris errno is error */
		}
		if(error == EINPROGRESS || error == EWOULDBLOCK)
			return; /* try again later */
		if(error != 0) {
			log_msg(LOG_ERR, "%s: Could not tcp connect to %s: %s",
				zone->apex_str, zone->master->ip_address_spec,
				strerror(error));
			xfrd_tcp_pipe_stop(tp);
			return;
		}
	}
	ret = conn_write(tcp);
	if(ret == -1) {
		log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
		xfrd_tcp_pipe_stop(tp);
		return;
	}
	/* once any byte went out, the connect has succeeded */
	if(tcp->total_bytes != 0 && !tp->connection_established)
		tp->connection_established = 1;
	if(ret == 0) {
		return; /* write again later */
	}
	/* done writing this message */

	/* remove first zone from sendlist */
	tcp_pipe_sendlist_popfirst(tp, zone);

	/* see if other zone wants to write; init; let it write (now) */
	/* and use a loop, because 64k recursive calls on the stack would be
	 * too much */
	while(tp->tcp_send_first) {
		/* setup to write for this zone */
		xfrd_tcp_setup_write_packet(tp, tp->tcp_send_first);
		/* attempt to write for this zone (if success, continue loop)*/
		ret = conn_write(tcp);
		if(ret == -1) {
			log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
			xfrd_tcp_pipe_stop(tp);
			return;
		}
		if(ret == 0)
			return; /* write again later */
		tcp_pipe_sendlist_popfirst(tp, tp->tcp_send_first);
	}

	/* if sendlist empty, remove WRITE from event */

	/* listen to READ, and not WRITE events */
	assert(tp->tcp_send_first == NULL);
	tcp_pipe_reset_timeout(tp);
}
753
754 int
755 conn_read(xfrd_tcp_t* tcp)
756 {
757 ssize_t received;
758 /* receive leading packet length bytes */
759 if(tcp->total_bytes < sizeof(tcp->msglen)) {
760 received = read(tcp->fd,
761 (char*) &tcp->msglen + tcp->total_bytes,
762 sizeof(tcp->msglen) - tcp->total_bytes);
763 if(received == -1) {
764 if(errno == EAGAIN || errno == EINTR) {
765 /* read would block, try later */
766 return 0;
767 } else {
768 #ifdef ECONNRESET
769 if (verbosity >= 2 || errno != ECONNRESET)
770 #endif /* ECONNRESET */
771 log_msg(LOG_ERR, "tcp read sz: %s", strerror(errno));
772 return -1;
773 }
774 } else if(received == 0) {
775 /* EOF */
776 return -1;
777 }
778 tcp->total_bytes += received;
779 if(tcp->total_bytes < sizeof(tcp->msglen)) {
780 /* not complete yet, try later */
781 return 0;
782 }
783
784 assert(tcp->total_bytes == sizeof(tcp->msglen));
785 tcp->msglen = ntohs(tcp->msglen);
786
787 if(tcp->msglen == 0) {
788 buffer_set_limit(tcp->packet, tcp->msglen);
789 return 1;
790 }
791 if(tcp->msglen > buffer_capacity(tcp->packet)) {
792 log_msg(LOG_ERR, "buffer too small, dropping connection");
793 return 0;
794 }
795 buffer_set_limit(tcp->packet, tcp->msglen);
796 }
797
798 assert(buffer_remaining(tcp->packet) > 0);
799
800 received = read(tcp->fd, buffer_current(tcp->packet),
801 buffer_remaining(tcp->packet));
802 if(received == -1) {
803 if(errno == EAGAIN || errno == EINTR) {
804 /* read would block, try later */
805 return 0;
806 } else {
807 #ifdef ECONNRESET
808 if (verbosity >= 2 || errno != ECONNRESET)
809 #endif /* ECONNRESET */
810 log_msg(LOG_ERR, "tcp read %s", strerror(errno));
811 return -1;
812 }
813 } else if(received == 0) {
814 /* EOF */
815 return -1;
816 }
817
818 tcp->total_bytes += received;
819 buffer_skip(tcp->packet, received);
820
821 if(buffer_remaining(tcp->packet) > 0) {
822 /* not complete yet, wait for more */
823 return 0;
824 }
825
826 /* completed */
827 assert(buffer_position(tcp->packet) == tcp->msglen);
828 return 1;
829 }
830
/* Handle read readiness on pipeline tp.  It reads more of the current
 * response; once a full message is in, the message is dispatched by its
 * DNS ID to the zone owning that ID on this pipeline.  Messages shorter
 * than a DNS header, or with an ID that is unassigned or marked
 * TCP_NULL_SKIP, are discarded.  A read error or EOF stops the whole
 * pipe. */
void
xfrd_tcp_read(struct xfrd_tcp_pipeline* tp)
{
	xfrd_zone_t* zone;
	xfrd_tcp_t* tcp = tp->tcp_r;
	int ret;
	enum xfrd_packet_result pkt_result;

	ret = conn_read(tcp);
	if(ret == -1) {
		xfrd_tcp_pipe_stop(tp);
		return;
	}
	if(ret == 0)
		return;
	/* completed msg */
	buffer_flip(tcp->packet);
	/* find the owner of the ID; too-short, unknown-ID and skipped-ID
	 * messages are dropped with a debug message */
	if(tcp->msglen < QHEADERSZ) {
		/* too short for DNS header, skip it */
		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
			"xfrd: tcp skip response that is too short"));
		tcp_conn_ready_for_reading(tcp);
		return;
	}
	zone = tp->id[ID(tcp->packet)];
	if(!zone || zone == TCP_NULL_SKIP) {
		/* no zone for this id? skip it */
		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
			"xfrd: tcp skip response with %s ID",
			zone?"set-to-skip":"unknown"));
		tcp_conn_ready_for_reading(tcp);
		return;
	}
	assert(zone->tcp_conn != -1);

	/* handle message for zone */
	pkt_result = xfrd_handle_received_xfr_packet(zone, tcp->packet);
	/* setup for reading the next packet on this connection */
	tcp_conn_ready_for_reading(tcp);
	switch(pkt_result) {
		case xfrd_packet_more:
			/* wait for next packet */
			break;
		case xfrd_packet_newlease:
			/* set to skip if more packets with this ID */
			/* (a skipped ID is not returned to unused[] by
			 * xfrd_tcp_release; it stays taken until the
			 * pipeline closes) */
			tp->id[zone->query_id] = TCP_NULL_SKIP;
			tp->num_skip++;
			/* fall through to remove zone from tp */
		case xfrd_packet_transfer:
			/* with multi-master-check, a new request is made right
			 * away after the transfer */
			if(zone->zone_options->pattern->multi_master_check) {
				xfrd_tcp_release(xfrd->tcp_set, zone);
				xfrd_make_request(zone);
				break;
			}
			xfrd_tcp_release(xfrd->tcp_set, zone);
			assert(zone->round_num == -1);
			break;
		case xfrd_packet_notimpl:
			/* disable IXFR for this master, then ask again */
			xfrd_disable_ixfr(zone);
			xfrd_tcp_release(xfrd->tcp_set, zone);
			/* query next server */
			xfrd_make_request(zone);
			break;
		case xfrd_packet_bad:
		case xfrd_packet_tcp:
		default:
			/* set to skip if more packets with this ID */
			tp->id[zone->query_id] = TCP_NULL_SKIP;
			tp->num_skip++;
			xfrd_tcp_release(xfrd->tcp_set, zone);
			/* query next server */
			xfrd_make_request(zone);
			break;
	}
}
907
908 void
909 xfrd_tcp_release(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
910 {
911 int conn = zone->tcp_conn;
912 struct xfrd_tcp_pipeline* tp = set->tcp_state[conn];
913 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s released tcp conn to %s",
914 zone->apex_str, zone->master->ip_address_spec));
915 assert(zone->tcp_conn != -1);
916 assert(zone->tcp_waiting == 0);
917 zone->tcp_conn = -1;
918 zone->tcp_waiting = 0;
919
920 /* remove from tcp_send list */
921 tcp_pipe_sendlist_remove(tp, zone);
922 /* remove it from the ID list */
923 if(tp->id[zone->query_id] != TCP_NULL_SKIP)
924 tcp_pipe_id_remove(tp, zone);
925 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: released tcp pipe now %d unused",
926 tp->num_unused));
927 /* if pipe was full, but no more, then see if waiting element is
928 * for the same master, and can fill the unused ID */
929 if(tp->num_unused == 1 && set->tcp_waiting_first) {
930 #ifdef INET6
931 struct sockaddr_storage to;
932 #else
933 struct sockaddr_in to;
934 #endif
935 socklen_t to_len = xfrd_acl_sockaddr_to(
936 set->tcp_waiting_first->master, &to);
937 if(to_len == tp->ip_len && memcmp(&to, &tp->ip, to_len) == 0) {
938 /* use this connection for the waiting zone */
939 zone = set->tcp_waiting_first;
940 assert(zone->tcp_conn == -1);
941 zone->tcp_conn = conn;
942 tcp_zone_waiting_list_popfirst(set, zone);
943 if(zone->zone_handler.ev_fd != -1)
944 xfrd_udp_release(zone);
945 xfrd_unset_timer(zone);
946 pipeline_setup_new_zone(set, tp, zone);
947 return;
948 }
949 /* waiting zone did not go to same server */
950 }
951
952 /* if all unused, or only skipped leftover, close the pipeline */
953 if(tp->num_unused >= ID_PIPE_NUM || tp->num_skip >= ID_PIPE_NUM - tp->num_unused)
954 xfrd_tcp_pipe_release(set, tp, conn);
955 }
956
957 void
958 xfrd_tcp_pipe_release(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp,
959 int conn)
960 {
961 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: tcp pipe released"));
962 /* one handler per tcp pipe */
963 if(tp->handler_added)
964 event_del(&tp->handler);
965 tp->handler_added = 0;
966
967 /* fd in tcp_r and tcp_w is the same, close once */
968 if(tp->tcp_r->fd != -1)
969 close(tp->tcp_r->fd);
970 tp->tcp_r->fd = -1;
971 tp->tcp_w->fd = -1;
972
973 /* remove from pipetree */
974 (void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
975
976 /* a waiting zone can use the free tcp slot (to another server) */
977 /* if that zone fails to set-up or connect, we try to start the next
978 * waiting zone in the list */
979 while(set->tcp_count == XFRD_MAX_TCP && set->tcp_waiting_first) {
980 int i;
981
982 /* pop first waiting process */
983 xfrd_zone_t* zone = set->tcp_waiting_first;
984 /* start it */
985 assert(zone->tcp_conn == -1);
986 zone->tcp_conn = conn;
987 tcp_zone_waiting_list_popfirst(set, zone);
988
989 /* stop udp (if any) */
990 if(zone->zone_handler.ev_fd != -1)
991 xfrd_udp_release(zone);
992 if(!xfrd_tcp_open(set, tp, zone)) {
993 zone->tcp_conn = -1;
994 xfrd_set_refresh_now(zone);
995 /* try to start the next zone (if any) */
996 continue;
997 }
998 /* re-init this tcppipe */
999 /* ip and ip_len set by tcp_open */
1000 tp->node.key = tp;
1001 tp->num_unused = ID_PIPE_NUM;
1002 tp->num_skip = 0;
1003 tp->tcp_send_first = NULL;
1004 tp->tcp_send_last = NULL;
1005 memset(tp->id, 0, sizeof(tp->id));
1006 for(i=0; i<ID_PIPE_NUM; i++) {
1007 tp->unused[i] = i;
1008 }
1009
1010 /* insert into tree */
1011 (void)rbtree_insert(set->pipetree, &tp->node);
1012 /* setup write */
1013 xfrd_unset_timer(zone);
1014 pipeline_setup_new_zone(set, tp, zone);
1015 /* started a task, no need for cleanups, so return */
1016 return;
1017 }
1018 /* no task to start, cleanup */
1019 assert(!set->tcp_waiting_first);
1020 set->tcp_count --;
1021 assert(set->tcp_count >= 0);
1022 }
1023
1024