xfrd-tcp.c revision 1.1.1.1.8.1 1 /*
2 * xfrd-tcp.c - XFR (transfer) Daemon TCP system source file. Manages tcp conn.
3 *
4 * Copyright (c) 2001-2006, NLnet Labs. All rights reserved.
5 *
6 * See LICENSE for the license.
7 *
8 */
9
10 #include "config.h"
11 #include <assert.h>
12 #include <errno.h>
13 #include <fcntl.h>
14 #include <unistd.h>
15 #include <stdlib.h>
16 #include <sys/uio.h>
17 #include "nsd.h"
18 #include "xfrd-tcp.h"
19 #include "buffer.h"
20 #include "packet.h"
21 #include "dname.h"
22 #include "options.h"
23 #include "namedb.h"
24 #include "xfrd.h"
25 #include "xfrd-disk.h"
26 #include "util.h"
27
28 /* sort tcppipe, first on IP address, for an IPaddresss, sort on num_unused */
29 static int
30 xfrd_pipe_cmp(const void* a, const void* b)
31 {
32 const struct xfrd_tcp_pipeline* x = (struct xfrd_tcp_pipeline*)a;
33 const struct xfrd_tcp_pipeline* y = (struct xfrd_tcp_pipeline*)b;
34 int r;
35 if(x == y)
36 return 0;
37 if(y->ip_len != x->ip_len)
38 /* subtraction works because nonnegative and small numbers */
39 return (int)y->ip_len - (int)x->ip_len;
40 r = memcmp(&x->ip, &y->ip, x->ip_len);
41 if(r != 0)
42 return r;
43 /* sort that num_unused is sorted ascending, */
44 if(x->num_unused != y->num_unused) {
45 return (x->num_unused < y->num_unused) ? -1 : 1;
46 }
47 /* different pipelines are different still, even with same numunused*/
48 return (uintptr_t)x < (uintptr_t)y ? -1 : 1;
49 }
50
51 struct xfrd_tcp_set* xfrd_tcp_set_create(struct region* region)
52 {
53 int i;
54 struct xfrd_tcp_set* tcp_set = region_alloc(region,
55 sizeof(struct xfrd_tcp_set));
56 memset(tcp_set, 0, sizeof(struct xfrd_tcp_set));
57 tcp_set->tcp_count = 0;
58 tcp_set->tcp_waiting_first = 0;
59 tcp_set->tcp_waiting_last = 0;
60 for(i=0; i<XFRD_MAX_TCP; i++)
61 tcp_set->tcp_state[i] = xfrd_tcp_pipeline_create(region);
62 tcp_set->pipetree = rbtree_create(region, &xfrd_pipe_cmp);
63 return tcp_set;
64 }
65
66 struct xfrd_tcp_pipeline*
67 xfrd_tcp_pipeline_create(region_type* region)
68 {
69 int i;
70 struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)
71 region_alloc_zero(region, sizeof(*tp));
72 tp->num_unused = ID_PIPE_NUM;
73 assert(sizeof(tp->unused)/sizeof(tp->unused[0]) == ID_PIPE_NUM);
74 for(i=0; i<ID_PIPE_NUM; i++)
75 tp->unused[i] = (uint16_t)i;
76 tp->tcp_r = xfrd_tcp_create(region, QIOBUFSZ);
77 tp->tcp_w = xfrd_tcp_create(region, 512);
78 return tp;
79 }
80
/*
 * Build a query in packet: the buffer is cleared, a header with the
 * given query ID and a question count of one is set up, and the
 * question (dname, type, klass) is written after the header.
 * The buffer is left positioned right after the question, so the caller
 * can append more data before flipping it.
 */
void
xfrd_setup_packet(buffer_type* packet,
	uint16_t type, uint16_t klass, const dname_type* dname, uint16_t qid)
{
	/* Set up the header */
	buffer_clear(packet);
	ID_SET(packet, qid);
	FLAGS_SET(packet, 0);
	OPCODE_SET(packet, OPCODE_QUERY);
	QDCOUNT_SET(packet, 1);
	ANCOUNT_SET(packet, 0);
	NSCOUNT_SET(packet, 0);
	ARCOUNT_SET(packet, 0);
	/* move the write position past the header that was just filled in */
	buffer_skip(packet, QHEADERSZ);

	/* The question record. */
	buffer_write(packet, dname_name(dname), dname->name_size);
	buffer_write_u16(packet, type);
	buffer_write_u16(packet, klass);
}
101
/*
 * Fill the socket address sck from the address in acl and the given port.
 * The address structure is zeroed first.
 * Returns the size of the address that was written (sockaddr_in or
 * sockaddr_in6).  When the acl is IPv6 but INET6 is not compiled in, an
 * error is logged (fromto names the direction in that message) and 0 is
 * returned.
 */
static socklen_t
#ifdef INET6
xfrd_acl_sockaddr(acl_options_type* acl, unsigned int port,
	struct sockaddr_storage *sck)
#else
xfrd_acl_sockaddr(acl_options_type* acl, unsigned int port,
	struct sockaddr_in *sck, const char* fromto)
#endif /* INET6 */
{
	/* setup address structure */
#ifdef INET6
	memset(sck, 0, sizeof(struct sockaddr_storage));
#else
	memset(sck, 0, sizeof(struct sockaddr_in));
#endif
	if(acl->is_ipv6) {
#ifdef INET6
		struct sockaddr_in6* sa = (struct sockaddr_in6*)sck;
		sa->sin6_family = AF_INET6;
		/* port is stored in network byte order */
		sa->sin6_port = htons(port);
		sa->sin6_addr = acl->addr.addr6;
		return sizeof(struct sockaddr_in6);
#else
		/* no IPv6 support in this build: nothing usable to return */
		log_msg(LOG_ERR, "xfrd: IPv6 connection %s %s attempted but no \
INET6.", fromto, acl->ip_address_spec);
		return 0;
#endif
	} else {
		struct sockaddr_in* sa = (struct sockaddr_in*)sck;
		sa->sin_family = AF_INET;
		/* port is stored in network byte order */
		sa->sin_port = htons(port);
		sa->sin_addr = acl->addr.addr;
		return sizeof(struct sockaddr_in);
	}
}
137
138 socklen_t
139 #ifdef INET6
140 xfrd_acl_sockaddr_to(acl_options_type* acl, struct sockaddr_storage *to)
141 #else
142 xfrd_acl_sockaddr_to(acl_options_type* acl, struct sockaddr_in *to)
143 #endif /* INET6 */
144 {
145 unsigned int port = acl->port?acl->port:(unsigned)atoi(TCP_PORT);
146 #ifdef INET6
147 return xfrd_acl_sockaddr(acl, port, to);
148 #else
149 return xfrd_acl_sockaddr(acl, port, to, "to");
150 #endif /* INET6 */
151 }
152
153 socklen_t
154 #ifdef INET6
155 xfrd_acl_sockaddr_frm(acl_options_type* acl, struct sockaddr_storage *frm)
156 #else
157 xfrd_acl_sockaddr_frm(acl_options_type* acl, struct sockaddr_in *frm)
158 #endif /* INET6 */
159 {
160 unsigned int port = acl->port?acl->port:0;
161 #ifdef INET6
162 return xfrd_acl_sockaddr(acl, port, frm);
163 #else
164 return xfrd_acl_sockaddr(acl, port, frm, "from");
165 #endif /* INET6 */
166 }
167
168 void
169 xfrd_write_soa_buffer(struct buffer* packet,
170 const dname_type* apex, struct xfrd_soa* soa)
171 {
172 size_t rdlength_pos;
173 uint16_t rdlength;
174 buffer_write(packet, dname_name(apex), apex->name_size);
175
176 /* already in network order */
177 buffer_write(packet, &soa->type, sizeof(soa->type));
178 buffer_write(packet, &soa->klass, sizeof(soa->klass));
179 buffer_write(packet, &soa->ttl, sizeof(soa->ttl));
180 rdlength_pos = buffer_position(packet);
181 buffer_skip(packet, sizeof(rdlength));
182
183 /* uncompressed dnames */
184 buffer_write(packet, soa->prim_ns+1, soa->prim_ns[0]);
185 buffer_write(packet, soa->email+1, soa->email[0]);
186
187 buffer_write(packet, &soa->serial, sizeof(uint32_t));
188 buffer_write(packet, &soa->refresh, sizeof(uint32_t));
189 buffer_write(packet, &soa->retry, sizeof(uint32_t));
190 buffer_write(packet, &soa->expire, sizeof(uint32_t));
191 buffer_write(packet, &soa->minimum, sizeof(uint32_t));
192
193 /* write length of RR */
194 rdlength = buffer_position(packet) - rdlength_pos - sizeof(rdlength);
195 buffer_write_u16_at(packet, rdlength_pos, rdlength);
196 }
197
198 struct xfrd_tcp*
199 xfrd_tcp_create(region_type* region, size_t bufsize)
200 {
201 struct xfrd_tcp* tcp_state = (struct xfrd_tcp*)region_alloc(
202 region, sizeof(struct xfrd_tcp));
203 memset(tcp_state, 0, sizeof(struct xfrd_tcp));
204 tcp_state->packet = buffer_create(region, bufsize);
205 tcp_state->fd = -1;
206
207 return tcp_state;
208 }
209
210 static struct xfrd_tcp_pipeline*
211 pipeline_find(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
212 {
213 rbnode_type* sme = NULL;
214 struct xfrd_tcp_pipeline* r;
215 /* smaller buf than a full pipeline with 64kb ID array, only need
216 * the front part with the key info, this front part contains the
217 * members that the compare function uses. */
218 const size_t keysize = sizeof(struct xfrd_tcp_pipeline) -
219 ID_PIPE_NUM*(sizeof(struct xfrd_zone*) + sizeof(uint16_t));
220 /* void* type for alignment of the struct,
221 * divide the keysize by ptr-size and then add one to round up */
222 void* buf[ (keysize / sizeof(void*)) + 1 ];
223 struct xfrd_tcp_pipeline* key = (struct xfrd_tcp_pipeline*)buf;
224 key->node.key = key;
225 key->ip_len = xfrd_acl_sockaddr_to(zone->master, &key->ip);
226 key->num_unused = ID_PIPE_NUM;
227 /* lookup existing tcp transfer to the master with highest unused */
228 if(rbtree_find_less_equal(set->pipetree, key, &sme)) {
229 /* exact match, strange, fully unused tcp cannot be open */
230 assert(0);
231 }
232 if(!sme)
233 return NULL;
234 r = (struct xfrd_tcp_pipeline*)sme->key;
235 /* <= key pointed at, is the master correct ? */
236 if(r->ip_len != key->ip_len)
237 return NULL;
238 if(memcmp(&r->ip, &key->ip, key->ip_len) != 0)
239 return NULL;
240 /* correct master, is there a slot free for this transfer? */
241 if(r->num_unused == 0)
242 return NULL;
243 return r;
244 }
245
246 /* remove zone from tcp waiting list */
247 static void
248 tcp_zone_waiting_list_popfirst(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
249 {
250 assert(zone->tcp_waiting);
251 set->tcp_waiting_first = zone->tcp_waiting_next;
252 if(zone->tcp_waiting_next)
253 zone->tcp_waiting_next->tcp_waiting_prev = NULL;
254 else set->tcp_waiting_last = 0;
255 zone->tcp_waiting_next = 0;
256 zone->tcp_waiting = 0;
257 }
258
259 /* remove zone from tcp pipe write-wait list */
260 static void
261 tcp_pipe_sendlist_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
262 {
263 if(zone->in_tcp_send) {
264 if(zone->tcp_send_prev)
265 zone->tcp_send_prev->tcp_send_next=zone->tcp_send_next;
266 else tp->tcp_send_first=zone->tcp_send_next;
267 if(zone->tcp_send_next)
268 zone->tcp_send_next->tcp_send_prev=zone->tcp_send_prev;
269 else tp->tcp_send_last=zone->tcp_send_prev;
270 zone->in_tcp_send = 0;
271 }
272 }
273
274 /* remove first from write-wait list */
275 static void
276 tcp_pipe_sendlist_popfirst(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
277 {
278 tp->tcp_send_first = zone->tcp_send_next;
279 if(tp->tcp_send_first)
280 tp->tcp_send_first->tcp_send_prev = NULL;
281 else tp->tcp_send_last = NULL;
282 zone->in_tcp_send = 0;
283 }
284
285 /* remove zone from tcp pipe ID map */
286 static void
287 tcp_pipe_id_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
288 {
289 assert(tp->num_unused < ID_PIPE_NUM && tp->num_unused >= 0);
290 assert(tp->id[zone->query_id] == zone);
291 tp->id[zone->query_id] = NULL;
292 tp->unused[tp->num_unused] = zone->query_id;
293 /* must remove and re-add for sort order in tree */
294 (void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
295 tp->num_unused++;
296 (void)rbtree_insert(xfrd->tcp_set->pipetree, &tp->node);
297 }
298
299 /* stop the tcp pipe (and all its zones need to retry) */
300 static void
301 xfrd_tcp_pipe_stop(struct xfrd_tcp_pipeline* tp)
302 {
303 int i, conn = -1;
304 assert(tp->num_unused < ID_PIPE_NUM); /* at least one 'in-use' */
305 assert(ID_PIPE_NUM - tp->num_unused > tp->num_skip); /* at least one 'nonskip' */
306 /* need to retry for all the zones connected to it */
307 /* these could use different lists and go to a different nextmaster*/
308 for(i=0; i<ID_PIPE_NUM; i++) {
309 if(tp->id[i] && tp->id[i] != TCP_NULL_SKIP) {
310 xfrd_zone_type* zone = tp->id[i];
311 conn = zone->tcp_conn;
312 zone->tcp_conn = -1;
313 zone->tcp_waiting = 0;
314 tcp_pipe_sendlist_remove(tp, zone);
315 tcp_pipe_id_remove(tp, zone);
316 xfrd_set_refresh_now(zone);
317 }
318 }
319 assert(conn != -1);
320 /* now release the entire tcp pipe */
321 xfrd_tcp_pipe_release(xfrd->tcp_set, tp, conn);
322 }
323
324 static void
325 tcp_pipe_reset_timeout(struct xfrd_tcp_pipeline* tp)
326 {
327 int fd = tp->handler.ev_fd;
328 struct timeval tv;
329 tv.tv_sec = xfrd->tcp_set->tcp_timeout;
330 tv.tv_usec = 0;
331 if(tp->handler_added)
332 event_del(&tp->handler);
333 event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|
334 (tp->tcp_send_first?EV_WRITE:0), xfrd_handle_tcp_pipe, tp);
335 if(event_base_set(xfrd->event_base, &tp->handler) != 0)
336 log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
337 if(event_add(&tp->handler, &tv) != 0)
338 log_msg(LOG_ERR, "xfrd tcp: event_add failed");
339 tp->handler_added = 1;
340 }
341
/*
 * Event callback for the file descriptor of a tcp pipe.
 * arg is the struct xfrd_tcp_pipeline the event was registered with.
 * The write, read and timeout bits of event are handled in that order.
 */
void
xfrd_handle_tcp_pipe(int ATTR_UNUSED(fd), short event, void* arg)
{
	struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)arg;
	if((event & EV_WRITE)) {
		/* activity on the pipe: start the timeout period anew */
		tcp_pipe_reset_timeout(tp);
		if(tp->tcp_send_first) {
			DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp write, zone %s",
				tp->tcp_send_first->apex_str));
			xfrd_tcp_write(tp, tp->tcp_send_first);
		}
	}
	/* handler_added is checked again below because the step before can
	 * have released the pipe, which clears that flag */
	if((event & EV_READ) && tp->handler_added) {
		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp read"));
		tcp_pipe_reset_timeout(tp);
		xfrd_tcp_read(tp);
	}
	if((event & EV_TIMEOUT) && tp->handler_added) {
		/* tcp connection timed out */
		DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp timeout"));
		xfrd_tcp_pipe_stop(tp);
	}
}
366
367 /* add a zone to the pipeline, it starts to want to write its query */
368 static void
369 pipeline_setup_new_zone(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
370 xfrd_zone_type* zone)
371 {
372 /* assign the ID */
373 int idx;
374 assert(tp->num_unused > 0);
375 /* we pick a random ID, even though it is TCP anyway */
376 idx = random_generate(tp->num_unused);
377 zone->query_id = tp->unused[idx];
378 tp->unused[idx] = tp->unused[tp->num_unused-1];
379 tp->id[zone->query_id] = zone;
380 /* decrement unused counter, and fixup tree */
381 (void)rbtree_delete(set->pipetree, &tp->node);
382 tp->num_unused--;
383 (void)rbtree_insert(set->pipetree, &tp->node);
384
385 /* add to sendlist, at end */
386 zone->tcp_send_next = NULL;
387 zone->tcp_send_prev = tp->tcp_send_last;
388 zone->in_tcp_send = 1;
389 if(tp->tcp_send_last)
390 tp->tcp_send_last->tcp_send_next = zone;
391 else tp->tcp_send_first = zone;
392 tp->tcp_send_last = zone;
393
394 /* is it first in line? */
395 if(tp->tcp_send_first == zone) {
396 xfrd_tcp_setup_write_packet(tp, zone);
397 /* add write to event handler */
398 tcp_pipe_reset_timeout(tp);
399 }
400 }
401
402 void
403 xfrd_tcp_obtain(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
404 {
405 struct xfrd_tcp_pipeline* tp;
406 assert(zone->tcp_conn == -1);
407 assert(zone->tcp_waiting == 0);
408
409 if(set->tcp_count < XFRD_MAX_TCP) {
410 int i;
411 assert(!set->tcp_waiting_first);
412 set->tcp_count ++;
413 /* find a free tcp_buffer */
414 for(i=0; i<XFRD_MAX_TCP; i++) {
415 if(set->tcp_state[i]->tcp_r->fd == -1) {
416 zone->tcp_conn = i;
417 break;
418 }
419 }
420 /** What if there is no free tcp_buffer? return; */
421 if (zone->tcp_conn < 0) {
422 return;
423 }
424
425 tp = set->tcp_state[zone->tcp_conn];
426 zone->tcp_waiting = 0;
427
428 /* stop udp use (if any) */
429 if(zone->zone_handler.ev_fd != -1)
430 xfrd_udp_release(zone);
431
432 if(!xfrd_tcp_open(set, tp, zone)) {
433 zone->tcp_conn = -1;
434 set->tcp_count --;
435 xfrd_set_refresh_now(zone);
436 return;
437 }
438 /* ip and ip_len set by tcp_open */
439 tp->node.key = tp;
440 tp->num_unused = ID_PIPE_NUM;
441 tp->num_skip = 0;
442 tp->tcp_send_first = NULL;
443 tp->tcp_send_last = NULL;
444 memset(tp->id, 0, sizeof(tp->id));
445 for(i=0; i<ID_PIPE_NUM; i++) {
446 tp->unused[i] = i;
447 }
448
449 /* insert into tree */
450 (void)rbtree_insert(set->pipetree, &tp->node);
451 xfrd_deactivate_zone(zone);
452 xfrd_unset_timer(zone);
453 pipeline_setup_new_zone(set, tp, zone);
454 return;
455 }
456 /* check for a pipeline to the same master with unused ID */
457 if((tp = pipeline_find(set, zone))!= NULL) {
458 int i;
459 if(zone->zone_handler.ev_fd != -1)
460 xfrd_udp_release(zone);
461 for(i=0; i<XFRD_MAX_TCP; i++) {
462 if(set->tcp_state[i] == tp)
463 zone->tcp_conn = i;
464 }
465 xfrd_deactivate_zone(zone);
466 xfrd_unset_timer(zone);
467 pipeline_setup_new_zone(set, tp, zone);
468 return;
469 }
470
471 /* wait, at end of line */
472 DEBUG(DEBUG_XFRD,2, (LOG_INFO, "xfrd: max number of tcp "
473 "connections (%d) reached.", XFRD_MAX_TCP));
474 zone->tcp_waiting_next = 0;
475 zone->tcp_waiting_prev = set->tcp_waiting_last;
476 zone->tcp_waiting = 1;
477 if(!set->tcp_waiting_last) {
478 set->tcp_waiting_first = zone;
479 set->tcp_waiting_last = zone;
480 } else {
481 set->tcp_waiting_last->tcp_waiting_next = zone;
482 set->tcp_waiting_last = zone;
483 }
484 xfrd_deactivate_zone(zone);
485 xfrd_unset_timer(zone);
486 }
487
488 int
489 xfrd_tcp_open(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
490 xfrd_zone_type* zone)
491 {
492 int fd, family, conn;
493 struct timeval tv;
494 assert(zone->tcp_conn != -1);
495
496 /* if there is no next master, fallback to use the first one */
497 /* but there really should be a master set */
498 if(!zone->master) {
499 zone->master = zone->zone_options->pattern->request_xfr;
500 zone->master_num = 0;
501 }
502
503 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s open tcp conn to %s",
504 zone->apex_str, zone->master->ip_address_spec));
505 tp->tcp_r->is_reading = 1;
506 tp->tcp_r->total_bytes = 0;
507 tp->tcp_r->msglen = 0;
508 buffer_clear(tp->tcp_r->packet);
509 tp->tcp_w->is_reading = 0;
510 tp->tcp_w->total_bytes = 0;
511 tp->tcp_w->msglen = 0;
512 tp->connection_established = 0;
513
514 if(zone->master->is_ipv6) {
515 #ifdef INET6
516 family = PF_INET6;
517 #else
518 xfrd_set_refresh_now(zone);
519 return 0;
520 #endif
521 } else {
522 family = PF_INET;
523 }
524 fd = socket(family, SOCK_STREAM, IPPROTO_TCP);
525 if(fd == -1) {
526 /* squelch 'Address family not supported by protocol' at low
527 * verbosity levels */
528 if(errno != EAFNOSUPPORT || verbosity > 2)
529 log_msg(LOG_ERR, "xfrd: %s cannot create tcp socket: %s",
530 zone->master->ip_address_spec, strerror(errno));
531 xfrd_set_refresh_now(zone);
532 return 0;
533 }
534 if(fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
535 log_msg(LOG_ERR, "xfrd: fcntl failed: %s", strerror(errno));
536 close(fd);
537 xfrd_set_refresh_now(zone);
538 return 0;
539 }
540
541 if(xfrd->nsd->outgoing_tcp_mss > 0) {
542 #if defined(IPPROTO_TCP) && defined(TCP_MAXSEG)
543 if(setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG,
544 (void*)&xfrd->nsd->outgoing_tcp_mss,
545 sizeof(xfrd->nsd->outgoing_tcp_mss)) < 0) {
546 log_msg(LOG_ERR, "xfrd: setsockopt(TCP_MAXSEG)"
547 "failed: %s", strerror(errno));
548 }
549 #else
550 log_msg(LOG_ERR, "setsockopt(TCP_MAXSEG) unsupported");
551 #endif
552 }
553
554 tp->ip_len = xfrd_acl_sockaddr_to(zone->master, &tp->ip);
555
556 /* bind it */
557 if (!xfrd_bind_local_interface(fd, zone->zone_options->pattern->
558 outgoing_interface, zone->master, 1)) {
559 close(fd);
560 xfrd_set_refresh_now(zone);
561 return 0;
562 }
563
564 conn = connect(fd, (struct sockaddr*)&tp->ip, tp->ip_len);
565 if (conn == -1 && errno != EINPROGRESS) {
566 log_msg(LOG_ERR, "xfrd: connect %s failed: %s",
567 zone->master->ip_address_spec, strerror(errno));
568 close(fd);
569 xfrd_set_refresh_now(zone);
570 return 0;
571 }
572 tp->tcp_r->fd = fd;
573 tp->tcp_w->fd = fd;
574
575 /* set the tcp pipe event */
576 if(tp->handler_added)
577 event_del(&tp->handler);
578 event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|EV_WRITE,
579 xfrd_handle_tcp_pipe, tp);
580 if(event_base_set(xfrd->event_base, &tp->handler) != 0)
581 log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
582 tv.tv_sec = set->tcp_timeout;
583 tv.tv_usec = 0;
584 if(event_add(&tp->handler, &tv) != 0)
585 log_msg(LOG_ERR, "xfrd tcp: event_add failed");
586 tp->handler_added = 1;
587 return 1;
588 }
589
590 void
591 xfrd_tcp_setup_write_packet(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
592 {
593 struct xfrd_tcp* tcp = tp->tcp_w;
594 assert(zone->tcp_conn != -1);
595 assert(zone->tcp_waiting == 0);
596 /* start AXFR or IXFR for the zone */
597 if(zone->soa_disk_acquired == 0 || zone->master->use_axfr_only ||
598 zone->master->ixfr_disabled ||
599 /* if zone expired, after the first round, do not ask for
600 * IXFR any more, but full AXFR (of any serial number) */
601 (zone->state == xfrd_zone_expired && zone->round_num != 0)) {
602 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request full zone transfer "
603 "(AXFR) for %s to %s",
604 zone->apex_str, zone->master->ip_address_spec));
605
606 xfrd_setup_packet(tcp->packet, TYPE_AXFR, CLASS_IN, zone->apex,
607 zone->query_id);
608 } else {
609 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request incremental zone "
610 "transfer (IXFR) for %s to %s",
611 zone->apex_str, zone->master->ip_address_spec));
612
613 xfrd_setup_packet(tcp->packet, TYPE_IXFR, CLASS_IN, zone->apex,
614 zone->query_id);
615 NSCOUNT_SET(tcp->packet, 1);
616 xfrd_write_soa_buffer(tcp->packet, zone->apex, &zone->soa_disk);
617 }
618 /* old transfer needs to be removed still? */
619 if(zone->msg_seq_nr)
620 xfrd_unlink_xfrfile(xfrd->nsd, zone->xfrfilenumber);
621 zone->msg_seq_nr = 0;
622 zone->msg_rr_count = 0;
623 if(zone->master->key_options && zone->master->key_options->tsig_key) {
624 xfrd_tsig_sign_request(tcp->packet, &zone->tsig, zone->master);
625 }
626 buffer_flip(tcp->packet);
627 DEBUG(DEBUG_XFRD,1, (LOG_INFO, "sent tcp query with ID %d", zone->query_id));
628 tcp->msglen = buffer_limit(tcp->packet);
629 tcp->total_bytes = 0;
630 }
631
632 static void
633 tcp_conn_ready_for_reading(struct xfrd_tcp* tcp)
634 {
635 tcp->total_bytes = 0;
636 tcp->msglen = 0;
637 buffer_clear(tcp->packet);
638 }
639
640 int conn_write(struct xfrd_tcp* tcp)
641 {
642 ssize_t sent;
643
644 if(tcp->total_bytes < sizeof(tcp->msglen)) {
645 uint16_t sendlen = htons(tcp->msglen);
646 #ifdef HAVE_WRITEV
647 struct iovec iov[2];
648 iov[0].iov_base = (uint8_t*)&sendlen + tcp->total_bytes;
649 iov[0].iov_len = sizeof(sendlen) - tcp->total_bytes;
650 iov[1].iov_base = buffer_begin(tcp->packet);
651 iov[1].iov_len = buffer_limit(tcp->packet);
652 sent = writev(tcp->fd, iov, 2);
653 #else /* HAVE_WRITEV */
654 sent = write(tcp->fd,
655 (const char*)&sendlen + tcp->total_bytes,
656 sizeof(tcp->msglen) - tcp->total_bytes);
657 #endif /* HAVE_WRITEV */
658
659 if(sent == -1) {
660 if(errno == EAGAIN || errno == EINTR) {
661 /* write would block, try later */
662 return 0;
663 } else {
664 return -1;
665 }
666 }
667
668 tcp->total_bytes += sent;
669 if(sent > (ssize_t)sizeof(tcp->msglen))
670 buffer_skip(tcp->packet, sent-sizeof(tcp->msglen));
671 if(tcp->total_bytes < sizeof(tcp->msglen)) {
672 /* incomplete write, resume later */
673 return 0;
674 }
675 #ifdef HAVE_WRITEV
676 if(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen)) {
677 /* packet done */
678 return 1;
679 }
680 #endif
681 assert(tcp->total_bytes >= sizeof(tcp->msglen));
682 }
683
684 assert(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen));
685
686 sent = write(tcp->fd,
687 buffer_current(tcp->packet),
688 buffer_remaining(tcp->packet));
689 if(sent == -1) {
690 if(errno == EAGAIN || errno == EINTR) {
691 /* write would block, try later */
692 return 0;
693 } else {
694 return -1;
695 }
696 }
697
698 buffer_skip(tcp->packet, sent);
699 tcp->total_bytes += sent;
700
701 if(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen)) {
702 /* more to write when socket becomes writable again */
703 return 0;
704 }
705
706 assert(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen));
707 return 1;
708 }
709
/*
 * Handle writability of the pipe for the zone at the head of the
 * write-wait list.
 * While the connection is not yet marked established, the socket is
 * first checked for a pending connect error.  Then the prepared query
 * is written; once it is out completely the next waiting zones get
 * their query prepared and written as long as the socket accepts them.
 * On a connect or write error the whole pipe is stopped.
 * When the write-wait list has become empty the pipe event is
 * re-registered so that it no longer waits for write events.
 */
void
xfrd_tcp_write(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
{
	int ret;
	struct xfrd_tcp* tcp = tp->tcp_w;
	assert(zone->tcp_conn != -1);
	assert(zone == tp->tcp_send_first);
	/* see if for non-established connection, there is a connect error */
	if(!tp->connection_established) {
		/* check for pending error from nonblocking connect */
		/* from Stevens, unix network programming, vol1, 3rd ed, p450 */
		int error = 0;
		socklen_t len = sizeof(error);
		if(getsockopt(tcp->fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0){
			error = errno; /* on solaris errno is error */
		}
		if(error == EINPROGRESS || error == EWOULDBLOCK)
			return; /* try again later */
		if(error != 0) {
			log_msg(LOG_ERR, "%s: Could not tcp connect to %s: %s",
				zone->apex_str, zone->master->ip_address_spec,
				strerror(error));
			xfrd_tcp_pipe_stop(tp);
			return;
		}
	}
	ret = conn_write(tcp);
	if(ret == -1) {
		log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
		xfrd_tcp_pipe_stop(tp);
		return;
	}
	/* the first bytes that get written mark the connection as
	 * established, so the connect error check is not repeated */
	if(tcp->total_bytes != 0 && !tp->connection_established)
		tp->connection_established = 1;
	if(ret == 0) {
		return; /* write again later */
	}
	/* done writing this message */

	/* remove first zone from sendlist */
	tcp_pipe_sendlist_popfirst(tp, zone);

	/* see if other zone wants to write; init; let it write (now) */
	/* a loop is used, because up to 64k nested calls is too much for
	 * the stack */
	while(tp->tcp_send_first) {
		/* setup to write for this zone */
		xfrd_tcp_setup_write_packet(tp, tp->tcp_send_first);
		/* attempt to write for this zone (if success, continue loop)*/
		ret = conn_write(tcp);
		if(ret == -1) {
			log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
			xfrd_tcp_pipe_stop(tp);
			return;
		}
		if(ret == 0)
			return; /* write again later */
		tcp_pipe_sendlist_popfirst(tp, tp->tcp_send_first);
	}

	/* if sendlist empty, remove WRITE from event */

	/* listen to READ, and not WRITE events */
	assert(tp->tcp_send_first == NULL);
	tcp_pipe_reset_timeout(tp);
}
775
776 int
777 conn_read(struct xfrd_tcp* tcp)
778 {
779 ssize_t received;
780 /* receive leading packet length bytes */
781 if(tcp->total_bytes < sizeof(tcp->msglen)) {
782 received = read(tcp->fd,
783 (char*) &tcp->msglen + tcp->total_bytes,
784 sizeof(tcp->msglen) - tcp->total_bytes);
785 if(received == -1) {
786 if(errno == EAGAIN || errno == EINTR) {
787 /* read would block, try later */
788 return 0;
789 } else {
790 #ifdef ECONNRESET
791 if (verbosity >= 2 || errno != ECONNRESET)
792 #endif /* ECONNRESET */
793 log_msg(LOG_ERR, "tcp read sz: %s", strerror(errno));
794 return -1;
795 }
796 } else if(received == 0) {
797 /* EOF */
798 return -1;
799 }
800 tcp->total_bytes += received;
801 if(tcp->total_bytes < sizeof(tcp->msglen)) {
802 /* not complete yet, try later */
803 return 0;
804 }
805
806 assert(tcp->total_bytes == sizeof(tcp->msglen));
807 tcp->msglen = ntohs(tcp->msglen);
808
809 if(tcp->msglen == 0) {
810 buffer_set_limit(tcp->packet, tcp->msglen);
811 return 1;
812 }
813 if(tcp->msglen > buffer_capacity(tcp->packet)) {
814 log_msg(LOG_ERR, "buffer too small, dropping connection");
815 return 0;
816 }
817 buffer_set_limit(tcp->packet, tcp->msglen);
818 }
819
820 assert(buffer_remaining(tcp->packet) > 0);
821
822 received = read(tcp->fd, buffer_current(tcp->packet),
823 buffer_remaining(tcp->packet));
824 if(received == -1) {
825 if(errno == EAGAIN || errno == EINTR) {
826 /* read would block, try later */
827 return 0;
828 } else {
829 #ifdef ECONNRESET
830 if (verbosity >= 2 || errno != ECONNRESET)
831 #endif /* ECONNRESET */
832 log_msg(LOG_ERR, "tcp read %s", strerror(errno));
833 return -1;
834 }
835 } else if(received == 0) {
836 /* EOF */
837 return -1;
838 }
839
840 tcp->total_bytes += received;
841 buffer_skip(tcp->packet, received);
842
843 if(buffer_remaining(tcp->packet) > 0) {
844 /* not complete yet, wait for more */
845 return 0;
846 }
847
848 /* completed */
849 assert(buffer_position(tcp->packet) == tcp->msglen);
850 return 1;
851 }
852
/*
 * Handle readability of the pipe: read (more of) a message and, once a
 * message is complete, hand it to the zone that owns its query ID.
 * A read failure stops the whole pipe.  Messages that are too short
 * for a DNS header, or whose ID has no zone or is marked to be skipped,
 * are dropped.  What happens to the zone afterwards depends on the
 * result of processing the packet.
 */
void
xfrd_tcp_read(struct xfrd_tcp_pipeline* tp)
{
	xfrd_zone_type* zone;
	struct xfrd_tcp* tcp = tp->tcp_r;
	int ret;
	enum xfrd_packet_result pkt_result;

	ret = conn_read(tcp);
	if(ret == -1) {
		xfrd_tcp_pipe_stop(tp);
		return;
	}
	if(ret == 0)
		return;
	/* completed msg */
	buffer_flip(tcp->packet);
	/* see which ID number it is, if skip, handle skip, NULL: warn */
	if(tcp->msglen < QHEADERSZ) {
		/* too short for DNS header, skip it */
		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
			"xfrd: tcp skip response that is too short"));
		tcp_conn_ready_for_reading(tcp);
		return;
	}
	zone = tp->id[ID(tcp->packet)];
	if(!zone || zone == TCP_NULL_SKIP) {
		/* no zone for this id? skip it */
		DEBUG(DEBUG_XFRD,1, (LOG_INFO,
			"xfrd: tcp skip response with %s ID",
			zone?"set-to-skip":"unknown"));
		tcp_conn_ready_for_reading(tcp);
		return;
	}
	assert(zone->tcp_conn != -1);

	/* handle message for zone */
	pkt_result = xfrd_handle_received_xfr_packet(zone, tcp->packet);
	/* setup for reading the next packet on this connection */
	tcp_conn_ready_for_reading(tcp);
	switch(pkt_result) {
		case xfrd_packet_more:
			/* wait for next packet */
			break;
		case xfrd_packet_newlease:
			/* set to skip if more packets with this ID */
			tp->id[zone->query_id] = TCP_NULL_SKIP;
			tp->num_skip++;
			/* fall through to remove zone from tp */
			/* fallthrough */
		case xfrd_packet_transfer:
			/* with multi_master_check set, a new request is made
			 * right after the connection is released */
			if(zone->zone_options->pattern->multi_master_check) {
				xfrd_tcp_release(xfrd->tcp_set, zone);
				xfrd_make_request(zone);
				break;
			}
			xfrd_tcp_release(xfrd->tcp_set, zone);
			assert(zone->round_num == -1);
			break;
		case xfrd_packet_notimpl:
			xfrd_disable_ixfr(zone);
			xfrd_tcp_release(xfrd->tcp_set, zone);
			/* query next server */
			xfrd_make_request(zone);
			break;
		case xfrd_packet_bad:
		case xfrd_packet_tcp:
		default:
			/* set to skip if more packets with this ID */
			tp->id[zone->query_id] = TCP_NULL_SKIP;
			tp->num_skip++;
			xfrd_tcp_release(xfrd->tcp_set, zone);
			/* query next server */
			xfrd_make_request(zone);
			break;
	}
}
930
/*
 * Detach the zone from the tcp pipeline it is using.
 * The zone is taken off the write-wait list and, unless its ID has been
 * marked to be skipped, its query ID is returned to the pipe.
 * If this freed the only unused ID and the first waiting zone goes to
 * the same master, that zone takes over the ID.  Otherwise, if nothing
 * but skipped IDs is left in use, the pipe itself is released.
 */
void
xfrd_tcp_release(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
{
	int conn = zone->tcp_conn;
	struct xfrd_tcp_pipeline* tp = set->tcp_state[conn];
	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s released tcp conn to %s",
		zone->apex_str, zone->master->ip_address_spec));
	assert(zone->tcp_conn != -1);
	assert(zone->tcp_waiting == 0);
	zone->tcp_conn = -1;
	zone->tcp_waiting = 0;

	/* remove from tcp_send list */
	tcp_pipe_sendlist_remove(tp, zone);
	/* remove it from the ID list */
	/* (an ID marked as skip stays reserved on this pipe) */
	if(tp->id[zone->query_id] != TCP_NULL_SKIP)
		tcp_pipe_id_remove(tp, zone);
	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: released tcp pipe now %d unused",
		tp->num_unused));
	/* if pipe was full, but no more, then see if waiting element is
	 * for the same master, and can fill the unused ID */
	if(tp->num_unused == 1 && set->tcp_waiting_first) {
#ifdef INET6
		struct sockaddr_storage to;
#else
		struct sockaddr_in to;
#endif
		socklen_t to_len = xfrd_acl_sockaddr_to(
			set->tcp_waiting_first->master, &to);
		if(to_len == tp->ip_len && memcmp(&to, &tp->ip, to_len) == 0) {
			/* use this connection for the waiting zone */
			zone = set->tcp_waiting_first;
			assert(zone->tcp_conn == -1);
			zone->tcp_conn = conn;
			tcp_zone_waiting_list_popfirst(set, zone);
			if(zone->zone_handler.ev_fd != -1)
				xfrd_udp_release(zone);
			xfrd_unset_timer(zone);
			pipeline_setup_new_zone(set, tp, zone);
			return;
		}
		/* waiting zone did not go to same server */
	}

	/* if all unused, or only skipped leftover, close the pipeline */
	if(tp->num_unused >= ID_PIPE_NUM || tp->num_skip >= ID_PIPE_NUM - tp->num_unused)
		xfrd_tcp_pipe_release(set, tp, conn);
}
979
/*
 * Release the tcp pipe in slot conn: its event is removed, the socket
 * is closed and the pipe is taken out of the pipeline tree.
 * If all slots were in use and zones are waiting, the slot is handed to
 * the first waiting zone for which a new connection can be opened;
 * zones for which opening fails are set to refresh now.  If no waiting
 * zone takes the slot, the count of used connections is lowered.
 */
void
xfrd_tcp_pipe_release(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
	int conn)
{
	DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: tcp pipe released"));
	/* one handler per tcp pipe */
	if(tp->handler_added)
		event_del(&tp->handler);
	tp->handler_added = 0;

	/* fd in tcp_r and tcp_w is the same, close once */
	if(tp->tcp_r->fd != -1)
		close(tp->tcp_r->fd);
	tp->tcp_r->fd = -1;
	tp->tcp_w->fd = -1;

	/* remove from pipetree */
	(void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);

	/* a waiting zone can use the free tcp slot (to another server) */
	/* if that zone fails to set-up or connect, we try to start the next
	 * waiting zone in the list */
	while(set->tcp_count == XFRD_MAX_TCP && set->tcp_waiting_first) {
		int i;

		/* pop first waiting process */
		xfrd_zone_type* zone = set->tcp_waiting_first;
		/* start it */
		assert(zone->tcp_conn == -1);
		zone->tcp_conn = conn;
		tcp_zone_waiting_list_popfirst(set, zone);

		/* stop udp (if any) */
		if(zone->zone_handler.ev_fd != -1)
			xfrd_udp_release(zone);
		if(!xfrd_tcp_open(set, tp, zone)) {
			zone->tcp_conn = -1;
			xfrd_set_refresh_now(zone);
			/* try to start the next zone (if any) */
			continue;
		}
		/* re-init this tcppipe */
		/* ip and ip_len set by tcp_open */
		/* all IDs unused again and nothing queued for writing */
		tp->node.key = tp;
		tp->num_unused = ID_PIPE_NUM;
		tp->num_skip = 0;
		tp->tcp_send_first = NULL;
		tp->tcp_send_last = NULL;
		memset(tp->id, 0, sizeof(tp->id));
		for(i=0; i<ID_PIPE_NUM; i++) {
			tp->unused[i] = i;
		}

		/* insert into tree */
		(void)rbtree_insert(set->pipetree, &tp->node);
		/* setup write */
		xfrd_unset_timer(zone);
		pipeline_setup_new_zone(set, tp, zone);
		/* started a task, no need for cleanups, so return */
		return;
	}
	/* no task to start, cleanup */
	assert(!set->tcp_waiting_first);
	set->tcp_count --;
	assert(set->tcp_count >= 0);
}
1046
1047