xfrd-tcp.c revision 1.1.1.3 1 1.1 christos /*
2 1.1 christos * xfrd-tcp.c - XFR (transfer) Daemon TCP system source file. Manages tcp conn.
3 1.1 christos *
4 1.1 christos * Copyright (c) 2001-2006, NLnet Labs. All rights reserved.
5 1.1 christos *
6 1.1 christos * See LICENSE for the license.
7 1.1 christos *
8 1.1 christos */
9 1.1 christos
10 1.1 christos #include "config.h"
11 1.1 christos #include <assert.h>
12 1.1 christos #include <errno.h>
13 1.1 christos #include <fcntl.h>
14 1.1 christos #include <unistd.h>
15 1.1 christos #include <stdlib.h>
16 1.1.1.2 christos #include <sys/uio.h>
17 1.1 christos #include "nsd.h"
18 1.1 christos #include "xfrd-tcp.h"
19 1.1 christos #include "buffer.h"
20 1.1 christos #include "packet.h"
21 1.1 christos #include "dname.h"
22 1.1 christos #include "options.h"
23 1.1 christos #include "namedb.h"
24 1.1 christos #include "xfrd.h"
25 1.1 christos #include "xfrd-disk.h"
26 1.1 christos #include "util.h"
27 1.1 christos
28 1.1 christos /* sort tcppipe, first on IP address, for an IPaddresss, sort on num_unused */
29 1.1 christos static int
30 1.1 christos xfrd_pipe_cmp(const void* a, const void* b)
31 1.1 christos {
32 1.1 christos const struct xfrd_tcp_pipeline* x = (struct xfrd_tcp_pipeline*)a;
33 1.1 christos const struct xfrd_tcp_pipeline* y = (struct xfrd_tcp_pipeline*)b;
34 1.1 christos int r;
35 1.1 christos if(x == y)
36 1.1 christos return 0;
37 1.1 christos if(y->ip_len != x->ip_len)
38 1.1 christos /* subtraction works because nonnegative and small numbers */
39 1.1 christos return (int)y->ip_len - (int)x->ip_len;
40 1.1 christos r = memcmp(&x->ip, &y->ip, x->ip_len);
41 1.1 christos if(r != 0)
42 1.1 christos return r;
43 1.1 christos /* sort that num_unused is sorted ascending, */
44 1.1 christos if(x->num_unused != y->num_unused) {
45 1.1 christos return (x->num_unused < y->num_unused) ? -1 : 1;
46 1.1 christos }
47 1.1 christos /* different pipelines are different still, even with same numunused*/
48 1.1 christos return (uintptr_t)x < (uintptr_t)y ? -1 : 1;
49 1.1 christos }
50 1.1 christos
51 1.1.1.2 christos struct xfrd_tcp_set* xfrd_tcp_set_create(struct region* region)
52 1.1 christos {
53 1.1 christos int i;
54 1.1.1.2 christos struct xfrd_tcp_set* tcp_set = region_alloc(region,
55 1.1.1.2 christos sizeof(struct xfrd_tcp_set));
56 1.1.1.2 christos memset(tcp_set, 0, sizeof(struct xfrd_tcp_set));
57 1.1 christos tcp_set->tcp_count = 0;
58 1.1 christos tcp_set->tcp_waiting_first = 0;
59 1.1 christos tcp_set->tcp_waiting_last = 0;
60 1.1 christos for(i=0; i<XFRD_MAX_TCP; i++)
61 1.1 christos tcp_set->tcp_state[i] = xfrd_tcp_pipeline_create(region);
62 1.1 christos tcp_set->pipetree = rbtree_create(region, &xfrd_pipe_cmp);
63 1.1 christos return tcp_set;
64 1.1 christos }
65 1.1 christos
66 1.1 christos struct xfrd_tcp_pipeline*
67 1.1 christos xfrd_tcp_pipeline_create(region_type* region)
68 1.1 christos {
69 1.1 christos int i;
70 1.1 christos struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)
71 1.1 christos region_alloc_zero(region, sizeof(*tp));
72 1.1 christos tp->num_unused = ID_PIPE_NUM;
73 1.1 christos assert(sizeof(tp->unused)/sizeof(tp->unused[0]) == ID_PIPE_NUM);
74 1.1 christos for(i=0; i<ID_PIPE_NUM; i++)
75 1.1 christos tp->unused[i] = (uint16_t)i;
76 1.1 christos tp->tcp_r = xfrd_tcp_create(region, QIOBUFSZ);
77 1.1 christos tp->tcp_w = xfrd_tcp_create(region, 512);
78 1.1 christos return tp;
79 1.1 christos }
80 1.1 christos
81 1.1 christos void
82 1.1 christos xfrd_setup_packet(buffer_type* packet,
83 1.1 christos uint16_t type, uint16_t klass, const dname_type* dname, uint16_t qid)
84 1.1 christos {
85 1.1 christos /* Set up the header */
86 1.1 christos buffer_clear(packet);
87 1.1 christos ID_SET(packet, qid);
88 1.1 christos FLAGS_SET(packet, 0);
89 1.1 christos OPCODE_SET(packet, OPCODE_QUERY);
90 1.1 christos QDCOUNT_SET(packet, 1);
91 1.1 christos ANCOUNT_SET(packet, 0);
92 1.1 christos NSCOUNT_SET(packet, 0);
93 1.1 christos ARCOUNT_SET(packet, 0);
94 1.1 christos buffer_skip(packet, QHEADERSZ);
95 1.1 christos
96 1.1 christos /* The question record. */
97 1.1 christos buffer_write(packet, dname_name(dname), dname->name_size);
98 1.1 christos buffer_write_u16(packet, type);
99 1.1 christos buffer_write_u16(packet, klass);
100 1.1 christos }
101 1.1 christos
102 1.1 christos static socklen_t
103 1.1 christos #ifdef INET6
104 1.1.1.2 christos xfrd_acl_sockaddr(acl_options_type* acl, unsigned int port,
105 1.1 christos struct sockaddr_storage *sck)
106 1.1 christos #else
107 1.1.1.2 christos xfrd_acl_sockaddr(acl_options_type* acl, unsigned int port,
108 1.1 christos struct sockaddr_in *sck, const char* fromto)
109 1.1 christos #endif /* INET6 */
110 1.1 christos {
111 1.1 christos /* setup address structure */
112 1.1 christos #ifdef INET6
113 1.1 christos memset(sck, 0, sizeof(struct sockaddr_storage));
114 1.1 christos #else
115 1.1 christos memset(sck, 0, sizeof(struct sockaddr_in));
116 1.1 christos #endif
117 1.1 christos if(acl->is_ipv6) {
118 1.1 christos #ifdef INET6
119 1.1 christos struct sockaddr_in6* sa = (struct sockaddr_in6*)sck;
120 1.1 christos sa->sin6_family = AF_INET6;
121 1.1 christos sa->sin6_port = htons(port);
122 1.1 christos sa->sin6_addr = acl->addr.addr6;
123 1.1 christos return sizeof(struct sockaddr_in6);
124 1.1 christos #else
125 1.1 christos log_msg(LOG_ERR, "xfrd: IPv6 connection %s %s attempted but no \
126 1.1 christos INET6.", fromto, acl->ip_address_spec);
127 1.1 christos return 0;
128 1.1 christos #endif
129 1.1 christos } else {
130 1.1 christos struct sockaddr_in* sa = (struct sockaddr_in*)sck;
131 1.1 christos sa->sin_family = AF_INET;
132 1.1 christos sa->sin_port = htons(port);
133 1.1 christos sa->sin_addr = acl->addr.addr;
134 1.1 christos return sizeof(struct sockaddr_in);
135 1.1 christos }
136 1.1 christos }
137 1.1 christos
138 1.1 christos socklen_t
139 1.1 christos #ifdef INET6
140 1.1.1.2 christos xfrd_acl_sockaddr_to(acl_options_type* acl, struct sockaddr_storage *to)
141 1.1 christos #else
142 1.1.1.2 christos xfrd_acl_sockaddr_to(acl_options_type* acl, struct sockaddr_in *to)
143 1.1 christos #endif /* INET6 */
144 1.1 christos {
145 1.1 christos unsigned int port = acl->port?acl->port:(unsigned)atoi(TCP_PORT);
146 1.1 christos #ifdef INET6
147 1.1 christos return xfrd_acl_sockaddr(acl, port, to);
148 1.1 christos #else
149 1.1 christos return xfrd_acl_sockaddr(acl, port, to, "to");
150 1.1 christos #endif /* INET6 */
151 1.1 christos }
152 1.1 christos
153 1.1 christos socklen_t
154 1.1 christos #ifdef INET6
155 1.1.1.2 christos xfrd_acl_sockaddr_frm(acl_options_type* acl, struct sockaddr_storage *frm)
156 1.1 christos #else
157 1.1.1.2 christos xfrd_acl_sockaddr_frm(acl_options_type* acl, struct sockaddr_in *frm)
158 1.1 christos #endif /* INET6 */
159 1.1 christos {
160 1.1 christos unsigned int port = acl->port?acl->port:0;
161 1.1 christos #ifdef INET6
162 1.1 christos return xfrd_acl_sockaddr(acl, port, frm);
163 1.1 christos #else
164 1.1 christos return xfrd_acl_sockaddr(acl, port, frm, "from");
165 1.1 christos #endif /* INET6 */
166 1.1 christos }
167 1.1 christos
168 1.1 christos void
169 1.1 christos xfrd_write_soa_buffer(struct buffer* packet,
170 1.1 christos const dname_type* apex, struct xfrd_soa* soa)
171 1.1 christos {
172 1.1 christos size_t rdlength_pos;
173 1.1 christos uint16_t rdlength;
174 1.1 christos buffer_write(packet, dname_name(apex), apex->name_size);
175 1.1 christos
176 1.1 christos /* already in network order */
177 1.1 christos buffer_write(packet, &soa->type, sizeof(soa->type));
178 1.1 christos buffer_write(packet, &soa->klass, sizeof(soa->klass));
179 1.1 christos buffer_write(packet, &soa->ttl, sizeof(soa->ttl));
180 1.1 christos rdlength_pos = buffer_position(packet);
181 1.1 christos buffer_skip(packet, sizeof(rdlength));
182 1.1 christos
183 1.1 christos /* uncompressed dnames */
184 1.1 christos buffer_write(packet, soa->prim_ns+1, soa->prim_ns[0]);
185 1.1 christos buffer_write(packet, soa->email+1, soa->email[0]);
186 1.1 christos
187 1.1 christos buffer_write(packet, &soa->serial, sizeof(uint32_t));
188 1.1 christos buffer_write(packet, &soa->refresh, sizeof(uint32_t));
189 1.1 christos buffer_write(packet, &soa->retry, sizeof(uint32_t));
190 1.1 christos buffer_write(packet, &soa->expire, sizeof(uint32_t));
191 1.1 christos buffer_write(packet, &soa->minimum, sizeof(uint32_t));
192 1.1 christos
193 1.1 christos /* write length of RR */
194 1.1 christos rdlength = buffer_position(packet) - rdlength_pos - sizeof(rdlength);
195 1.1 christos buffer_write_u16_at(packet, rdlength_pos, rdlength);
196 1.1 christos }
197 1.1 christos
198 1.1.1.2 christos struct xfrd_tcp*
199 1.1 christos xfrd_tcp_create(region_type* region, size_t bufsize)
200 1.1 christos {
201 1.1.1.2 christos struct xfrd_tcp* tcp_state = (struct xfrd_tcp*)region_alloc(
202 1.1.1.2 christos region, sizeof(struct xfrd_tcp));
203 1.1.1.2 christos memset(tcp_state, 0, sizeof(struct xfrd_tcp));
204 1.1 christos tcp_state->packet = buffer_create(region, bufsize);
205 1.1 christos tcp_state->fd = -1;
206 1.1 christos
207 1.1 christos return tcp_state;
208 1.1 christos }
209 1.1 christos
210 1.1 christos static struct xfrd_tcp_pipeline*
211 1.1.1.2 christos pipeline_find(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
212 1.1 christos {
213 1.1.1.2 christos rbnode_type* sme = NULL;
214 1.1 christos struct xfrd_tcp_pipeline* r;
215 1.1 christos /* smaller buf than a full pipeline with 64kb ID array, only need
216 1.1 christos * the front part with the key info, this front part contains the
217 1.1 christos * members that the compare function uses. */
218 1.1 christos const size_t keysize = sizeof(struct xfrd_tcp_pipeline) -
219 1.1 christos ID_PIPE_NUM*(sizeof(struct xfrd_zone*) + sizeof(uint16_t));
220 1.1 christos /* void* type for alignment of the struct,
221 1.1 christos * divide the keysize by ptr-size and then add one to round up */
222 1.1 christos void* buf[ (keysize / sizeof(void*)) + 1 ];
223 1.1 christos struct xfrd_tcp_pipeline* key = (struct xfrd_tcp_pipeline*)buf;
224 1.1 christos key->node.key = key;
225 1.1 christos key->ip_len = xfrd_acl_sockaddr_to(zone->master, &key->ip);
226 1.1 christos key->num_unused = ID_PIPE_NUM;
227 1.1 christos /* lookup existing tcp transfer to the master with highest unused */
228 1.1 christos if(rbtree_find_less_equal(set->pipetree, key, &sme)) {
229 1.1 christos /* exact match, strange, fully unused tcp cannot be open */
230 1.1 christos assert(0);
231 1.1 christos }
232 1.1 christos if(!sme)
233 1.1 christos return NULL;
234 1.1 christos r = (struct xfrd_tcp_pipeline*)sme->key;
235 1.1 christos /* <= key pointed at, is the master correct ? */
236 1.1 christos if(r->ip_len != key->ip_len)
237 1.1 christos return NULL;
238 1.1 christos if(memcmp(&r->ip, &key->ip, key->ip_len) != 0)
239 1.1 christos return NULL;
240 1.1 christos /* correct master, is there a slot free for this transfer? */
241 1.1 christos if(r->num_unused == 0)
242 1.1 christos return NULL;
243 1.1 christos return r;
244 1.1 christos }
245 1.1 christos
246 1.1 christos /* remove zone from tcp waiting list */
247 1.1 christos static void
248 1.1.1.2 christos tcp_zone_waiting_list_popfirst(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
249 1.1 christos {
250 1.1 christos assert(zone->tcp_waiting);
251 1.1 christos set->tcp_waiting_first = zone->tcp_waiting_next;
252 1.1 christos if(zone->tcp_waiting_next)
253 1.1 christos zone->tcp_waiting_next->tcp_waiting_prev = NULL;
254 1.1 christos else set->tcp_waiting_last = 0;
255 1.1 christos zone->tcp_waiting_next = 0;
256 1.1 christos zone->tcp_waiting = 0;
257 1.1 christos }
258 1.1 christos
259 1.1 christos /* remove zone from tcp pipe write-wait list */
260 1.1 christos static void
261 1.1.1.2 christos tcp_pipe_sendlist_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
262 1.1 christos {
263 1.1 christos if(zone->in_tcp_send) {
264 1.1 christos if(zone->tcp_send_prev)
265 1.1 christos zone->tcp_send_prev->tcp_send_next=zone->tcp_send_next;
266 1.1 christos else tp->tcp_send_first=zone->tcp_send_next;
267 1.1 christos if(zone->tcp_send_next)
268 1.1 christos zone->tcp_send_next->tcp_send_prev=zone->tcp_send_prev;
269 1.1 christos else tp->tcp_send_last=zone->tcp_send_prev;
270 1.1 christos zone->in_tcp_send = 0;
271 1.1 christos }
272 1.1 christos }
273 1.1 christos
274 1.1 christos /* remove first from write-wait list */
275 1.1 christos static void
276 1.1.1.2 christos tcp_pipe_sendlist_popfirst(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
277 1.1 christos {
278 1.1 christos tp->tcp_send_first = zone->tcp_send_next;
279 1.1 christos if(tp->tcp_send_first)
280 1.1 christos tp->tcp_send_first->tcp_send_prev = NULL;
281 1.1 christos else tp->tcp_send_last = NULL;
282 1.1 christos zone->in_tcp_send = 0;
283 1.1 christos }
284 1.1 christos
285 1.1 christos /* remove zone from tcp pipe ID map */
286 1.1 christos static void
287 1.1.1.2 christos tcp_pipe_id_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
288 1.1 christos {
289 1.1 christos assert(tp->num_unused < ID_PIPE_NUM && tp->num_unused >= 0);
290 1.1 christos assert(tp->id[zone->query_id] == zone);
291 1.1 christos tp->id[zone->query_id] = NULL;
292 1.1 christos tp->unused[tp->num_unused] = zone->query_id;
293 1.1 christos /* must remove and re-add for sort order in tree */
294 1.1 christos (void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
295 1.1 christos tp->num_unused++;
296 1.1 christos (void)rbtree_insert(xfrd->tcp_set->pipetree, &tp->node);
297 1.1 christos }
298 1.1 christos
299 1.1 christos /* stop the tcp pipe (and all its zones need to retry) */
300 1.1 christos static void
301 1.1 christos xfrd_tcp_pipe_stop(struct xfrd_tcp_pipeline* tp)
302 1.1 christos {
303 1.1 christos int i, conn = -1;
304 1.1 christos assert(tp->num_unused < ID_PIPE_NUM); /* at least one 'in-use' */
305 1.1 christos assert(ID_PIPE_NUM - tp->num_unused > tp->num_skip); /* at least one 'nonskip' */
306 1.1 christos /* need to retry for all the zones connected to it */
307 1.1 christos /* these could use different lists and go to a different nextmaster*/
308 1.1 christos for(i=0; i<ID_PIPE_NUM; i++) {
309 1.1 christos if(tp->id[i] && tp->id[i] != TCP_NULL_SKIP) {
310 1.1.1.2 christos xfrd_zone_type* zone = tp->id[i];
311 1.1 christos conn = zone->tcp_conn;
312 1.1 christos zone->tcp_conn = -1;
313 1.1 christos zone->tcp_waiting = 0;
314 1.1 christos tcp_pipe_sendlist_remove(tp, zone);
315 1.1 christos tcp_pipe_id_remove(tp, zone);
316 1.1 christos xfrd_set_refresh_now(zone);
317 1.1 christos }
318 1.1 christos }
319 1.1 christos assert(conn != -1);
320 1.1 christos /* now release the entire tcp pipe */
321 1.1 christos xfrd_tcp_pipe_release(xfrd->tcp_set, tp, conn);
322 1.1 christos }
323 1.1 christos
324 1.1 christos static void
325 1.1 christos tcp_pipe_reset_timeout(struct xfrd_tcp_pipeline* tp)
326 1.1 christos {
327 1.1 christos int fd = tp->handler.ev_fd;
328 1.1 christos struct timeval tv;
329 1.1 christos tv.tv_sec = xfrd->tcp_set->tcp_timeout;
330 1.1 christos tv.tv_usec = 0;
331 1.1 christos if(tp->handler_added)
332 1.1 christos event_del(&tp->handler);
333 1.1.1.3 christos memset(&tp->handler, 0, sizeof(tp->handler));
334 1.1 christos event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|
335 1.1 christos (tp->tcp_send_first?EV_WRITE:0), xfrd_handle_tcp_pipe, tp);
336 1.1 christos if(event_base_set(xfrd->event_base, &tp->handler) != 0)
337 1.1 christos log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
338 1.1 christos if(event_add(&tp->handler, &tv) != 0)
339 1.1 christos log_msg(LOG_ERR, "xfrd tcp: event_add failed");
340 1.1 christos tp->handler_added = 1;
341 1.1 christos }
342 1.1 christos
343 1.1 christos /* handle event from fd of tcp pipe */
344 1.1 christos void
345 1.1 christos xfrd_handle_tcp_pipe(int ATTR_UNUSED(fd), short event, void* arg)
346 1.1 christos {
347 1.1 christos struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)arg;
348 1.1 christos if((event & EV_WRITE)) {
349 1.1 christos tcp_pipe_reset_timeout(tp);
350 1.1 christos if(tp->tcp_send_first) {
351 1.1 christos DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp write, zone %s",
352 1.1 christos tp->tcp_send_first->apex_str));
353 1.1 christos xfrd_tcp_write(tp, tp->tcp_send_first);
354 1.1 christos }
355 1.1 christos }
356 1.1 christos if((event & EV_READ) && tp->handler_added) {
357 1.1 christos DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp read"));
358 1.1 christos tcp_pipe_reset_timeout(tp);
359 1.1 christos xfrd_tcp_read(tp);
360 1.1 christos }
361 1.1 christos if((event & EV_TIMEOUT) && tp->handler_added) {
362 1.1 christos /* tcp connection timed out */
363 1.1 christos DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp timeout"));
364 1.1 christos xfrd_tcp_pipe_stop(tp);
365 1.1 christos }
366 1.1 christos }
367 1.1 christos
368 1.1 christos /* add a zone to the pipeline, it starts to want to write its query */
369 1.1 christos static void
370 1.1.1.2 christos pipeline_setup_new_zone(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
371 1.1.1.2 christos xfrd_zone_type* zone)
372 1.1 christos {
373 1.1 christos /* assign the ID */
374 1.1 christos int idx;
375 1.1 christos assert(tp->num_unused > 0);
376 1.1 christos /* we pick a random ID, even though it is TCP anyway */
377 1.1 christos idx = random_generate(tp->num_unused);
378 1.1 christos zone->query_id = tp->unused[idx];
379 1.1 christos tp->unused[idx] = tp->unused[tp->num_unused-1];
380 1.1 christos tp->id[zone->query_id] = zone;
381 1.1 christos /* decrement unused counter, and fixup tree */
382 1.1 christos (void)rbtree_delete(set->pipetree, &tp->node);
383 1.1 christos tp->num_unused--;
384 1.1 christos (void)rbtree_insert(set->pipetree, &tp->node);
385 1.1 christos
386 1.1 christos /* add to sendlist, at end */
387 1.1 christos zone->tcp_send_next = NULL;
388 1.1 christos zone->tcp_send_prev = tp->tcp_send_last;
389 1.1 christos zone->in_tcp_send = 1;
390 1.1 christos if(tp->tcp_send_last)
391 1.1 christos tp->tcp_send_last->tcp_send_next = zone;
392 1.1 christos else tp->tcp_send_first = zone;
393 1.1 christos tp->tcp_send_last = zone;
394 1.1 christos
395 1.1 christos /* is it first in line? */
396 1.1 christos if(tp->tcp_send_first == zone) {
397 1.1 christos xfrd_tcp_setup_write_packet(tp, zone);
398 1.1 christos /* add write to event handler */
399 1.1 christos tcp_pipe_reset_timeout(tp);
400 1.1 christos }
401 1.1 christos }
402 1.1 christos
403 1.1 christos void
404 1.1.1.2 christos xfrd_tcp_obtain(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
405 1.1 christos {
406 1.1 christos struct xfrd_tcp_pipeline* tp;
407 1.1 christos assert(zone->tcp_conn == -1);
408 1.1 christos assert(zone->tcp_waiting == 0);
409 1.1 christos
410 1.1 christos if(set->tcp_count < XFRD_MAX_TCP) {
411 1.1 christos int i;
412 1.1 christos assert(!set->tcp_waiting_first);
413 1.1 christos set->tcp_count ++;
414 1.1 christos /* find a free tcp_buffer */
415 1.1 christos for(i=0; i<XFRD_MAX_TCP; i++) {
416 1.1 christos if(set->tcp_state[i]->tcp_r->fd == -1) {
417 1.1 christos zone->tcp_conn = i;
418 1.1 christos break;
419 1.1 christos }
420 1.1 christos }
421 1.1 christos /** What if there is no free tcp_buffer? return; */
422 1.1 christos if (zone->tcp_conn < 0) {
423 1.1 christos return;
424 1.1 christos }
425 1.1 christos
426 1.1 christos tp = set->tcp_state[zone->tcp_conn];
427 1.1 christos zone->tcp_waiting = 0;
428 1.1 christos
429 1.1 christos /* stop udp use (if any) */
430 1.1 christos if(zone->zone_handler.ev_fd != -1)
431 1.1 christos xfrd_udp_release(zone);
432 1.1 christos
433 1.1 christos if(!xfrd_tcp_open(set, tp, zone)) {
434 1.1 christos zone->tcp_conn = -1;
435 1.1 christos set->tcp_count --;
436 1.1 christos xfrd_set_refresh_now(zone);
437 1.1 christos return;
438 1.1 christos }
439 1.1 christos /* ip and ip_len set by tcp_open */
440 1.1 christos tp->node.key = tp;
441 1.1 christos tp->num_unused = ID_PIPE_NUM;
442 1.1 christos tp->num_skip = 0;
443 1.1 christos tp->tcp_send_first = NULL;
444 1.1 christos tp->tcp_send_last = NULL;
445 1.1 christos memset(tp->id, 0, sizeof(tp->id));
446 1.1 christos for(i=0; i<ID_PIPE_NUM; i++) {
447 1.1 christos tp->unused[i] = i;
448 1.1 christos }
449 1.1 christos
450 1.1 christos /* insert into tree */
451 1.1 christos (void)rbtree_insert(set->pipetree, &tp->node);
452 1.1 christos xfrd_deactivate_zone(zone);
453 1.1 christos xfrd_unset_timer(zone);
454 1.1 christos pipeline_setup_new_zone(set, tp, zone);
455 1.1 christos return;
456 1.1 christos }
457 1.1 christos /* check for a pipeline to the same master with unused ID */
458 1.1 christos if((tp = pipeline_find(set, zone))!= NULL) {
459 1.1 christos int i;
460 1.1 christos if(zone->zone_handler.ev_fd != -1)
461 1.1 christos xfrd_udp_release(zone);
462 1.1 christos for(i=0; i<XFRD_MAX_TCP; i++) {
463 1.1 christos if(set->tcp_state[i] == tp)
464 1.1 christos zone->tcp_conn = i;
465 1.1 christos }
466 1.1 christos xfrd_deactivate_zone(zone);
467 1.1 christos xfrd_unset_timer(zone);
468 1.1 christos pipeline_setup_new_zone(set, tp, zone);
469 1.1 christos return;
470 1.1 christos }
471 1.1 christos
472 1.1 christos /* wait, at end of line */
473 1.1 christos DEBUG(DEBUG_XFRD,2, (LOG_INFO, "xfrd: max number of tcp "
474 1.1 christos "connections (%d) reached.", XFRD_MAX_TCP));
475 1.1 christos zone->tcp_waiting_next = 0;
476 1.1 christos zone->tcp_waiting_prev = set->tcp_waiting_last;
477 1.1 christos zone->tcp_waiting = 1;
478 1.1 christos if(!set->tcp_waiting_last) {
479 1.1 christos set->tcp_waiting_first = zone;
480 1.1 christos set->tcp_waiting_last = zone;
481 1.1 christos } else {
482 1.1 christos set->tcp_waiting_last->tcp_waiting_next = zone;
483 1.1 christos set->tcp_waiting_last = zone;
484 1.1 christos }
485 1.1 christos xfrd_deactivate_zone(zone);
486 1.1 christos xfrd_unset_timer(zone);
487 1.1 christos }
488 1.1 christos
489 1.1 christos int
490 1.1.1.2 christos xfrd_tcp_open(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
491 1.1.1.2 christos xfrd_zone_type* zone)
492 1.1 christos {
493 1.1 christos int fd, family, conn;
494 1.1 christos struct timeval tv;
495 1.1 christos assert(zone->tcp_conn != -1);
496 1.1 christos
497 1.1 christos /* if there is no next master, fallback to use the first one */
498 1.1 christos /* but there really should be a master set */
499 1.1 christos if(!zone->master) {
500 1.1 christos zone->master = zone->zone_options->pattern->request_xfr;
501 1.1 christos zone->master_num = 0;
502 1.1 christos }
503 1.1 christos
504 1.1 christos DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s open tcp conn to %s",
505 1.1 christos zone->apex_str, zone->master->ip_address_spec));
506 1.1 christos tp->tcp_r->is_reading = 1;
507 1.1 christos tp->tcp_r->total_bytes = 0;
508 1.1 christos tp->tcp_r->msglen = 0;
509 1.1 christos buffer_clear(tp->tcp_r->packet);
510 1.1 christos tp->tcp_w->is_reading = 0;
511 1.1 christos tp->tcp_w->total_bytes = 0;
512 1.1 christos tp->tcp_w->msglen = 0;
513 1.1 christos tp->connection_established = 0;
514 1.1 christos
515 1.1 christos if(zone->master->is_ipv6) {
516 1.1 christos #ifdef INET6
517 1.1 christos family = PF_INET6;
518 1.1 christos #else
519 1.1 christos xfrd_set_refresh_now(zone);
520 1.1 christos return 0;
521 1.1 christos #endif
522 1.1 christos } else {
523 1.1 christos family = PF_INET;
524 1.1 christos }
525 1.1 christos fd = socket(family, SOCK_STREAM, IPPROTO_TCP);
526 1.1 christos if(fd == -1) {
527 1.1.1.2 christos /* squelch 'Address family not supported by protocol' at low
528 1.1.1.2 christos * verbosity levels */
529 1.1.1.2 christos if(errno != EAFNOSUPPORT || verbosity > 2)
530 1.1.1.2 christos log_msg(LOG_ERR, "xfrd: %s cannot create tcp socket: %s",
531 1.1 christos zone->master->ip_address_spec, strerror(errno));
532 1.1 christos xfrd_set_refresh_now(zone);
533 1.1 christos return 0;
534 1.1 christos }
535 1.1 christos if(fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
536 1.1 christos log_msg(LOG_ERR, "xfrd: fcntl failed: %s", strerror(errno));
537 1.1 christos close(fd);
538 1.1 christos xfrd_set_refresh_now(zone);
539 1.1 christos return 0;
540 1.1 christos }
541 1.1 christos
542 1.1 christos if(xfrd->nsd->outgoing_tcp_mss > 0) {
543 1.1 christos #if defined(IPPROTO_TCP) && defined(TCP_MAXSEG)
544 1.1 christos if(setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG,
545 1.1 christos (void*)&xfrd->nsd->outgoing_tcp_mss,
546 1.1 christos sizeof(xfrd->nsd->outgoing_tcp_mss)) < 0) {
547 1.1 christos log_msg(LOG_ERR, "xfrd: setsockopt(TCP_MAXSEG)"
548 1.1 christos "failed: %s", strerror(errno));
549 1.1 christos }
550 1.1 christos #else
551 1.1 christos log_msg(LOG_ERR, "setsockopt(TCP_MAXSEG) unsupported");
552 1.1 christos #endif
553 1.1 christos }
554 1.1 christos
555 1.1 christos tp->ip_len = xfrd_acl_sockaddr_to(zone->master, &tp->ip);
556 1.1 christos
557 1.1 christos /* bind it */
558 1.1 christos if (!xfrd_bind_local_interface(fd, zone->zone_options->pattern->
559 1.1 christos outgoing_interface, zone->master, 1)) {
560 1.1 christos close(fd);
561 1.1 christos xfrd_set_refresh_now(zone);
562 1.1 christos return 0;
563 1.1 christos }
564 1.1 christos
565 1.1 christos conn = connect(fd, (struct sockaddr*)&tp->ip, tp->ip_len);
566 1.1 christos if (conn == -1 && errno != EINPROGRESS) {
567 1.1 christos log_msg(LOG_ERR, "xfrd: connect %s failed: %s",
568 1.1 christos zone->master->ip_address_spec, strerror(errno));
569 1.1 christos close(fd);
570 1.1 christos xfrd_set_refresh_now(zone);
571 1.1 christos return 0;
572 1.1 christos }
573 1.1 christos tp->tcp_r->fd = fd;
574 1.1 christos tp->tcp_w->fd = fd;
575 1.1 christos
576 1.1 christos /* set the tcp pipe event */
577 1.1 christos if(tp->handler_added)
578 1.1 christos event_del(&tp->handler);
579 1.1.1.3 christos memset(&tp->handler, 0, sizeof(tp->handler));
580 1.1 christos event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|EV_WRITE,
581 1.1 christos xfrd_handle_tcp_pipe, tp);
582 1.1 christos if(event_base_set(xfrd->event_base, &tp->handler) != 0)
583 1.1 christos log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
584 1.1 christos tv.tv_sec = set->tcp_timeout;
585 1.1 christos tv.tv_usec = 0;
586 1.1 christos if(event_add(&tp->handler, &tv) != 0)
587 1.1 christos log_msg(LOG_ERR, "xfrd tcp: event_add failed");
588 1.1 christos tp->handler_added = 1;
589 1.1 christos return 1;
590 1.1 christos }
591 1.1 christos
592 1.1 christos void
593 1.1.1.2 christos xfrd_tcp_setup_write_packet(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
594 1.1 christos {
595 1.1.1.2 christos struct xfrd_tcp* tcp = tp->tcp_w;
596 1.1 christos assert(zone->tcp_conn != -1);
597 1.1 christos assert(zone->tcp_waiting == 0);
598 1.1 christos /* start AXFR or IXFR for the zone */
599 1.1 christos if(zone->soa_disk_acquired == 0 || zone->master->use_axfr_only ||
600 1.1 christos zone->master->ixfr_disabled ||
601 1.1 christos /* if zone expired, after the first round, do not ask for
602 1.1 christos * IXFR any more, but full AXFR (of any serial number) */
603 1.1 christos (zone->state == xfrd_zone_expired && zone->round_num != 0)) {
604 1.1 christos DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request full zone transfer "
605 1.1 christos "(AXFR) for %s to %s",
606 1.1 christos zone->apex_str, zone->master->ip_address_spec));
607 1.1 christos
608 1.1 christos xfrd_setup_packet(tcp->packet, TYPE_AXFR, CLASS_IN, zone->apex,
609 1.1 christos zone->query_id);
610 1.1 christos } else {
611 1.1 christos DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request incremental zone "
612 1.1 christos "transfer (IXFR) for %s to %s",
613 1.1 christos zone->apex_str, zone->master->ip_address_spec));
614 1.1 christos
615 1.1 christos xfrd_setup_packet(tcp->packet, TYPE_IXFR, CLASS_IN, zone->apex,
616 1.1 christos zone->query_id);
617 1.1 christos NSCOUNT_SET(tcp->packet, 1);
618 1.1 christos xfrd_write_soa_buffer(tcp->packet, zone->apex, &zone->soa_disk);
619 1.1 christos }
620 1.1 christos /* old transfer needs to be removed still? */
621 1.1 christos if(zone->msg_seq_nr)
622 1.1 christos xfrd_unlink_xfrfile(xfrd->nsd, zone->xfrfilenumber);
623 1.1 christos zone->msg_seq_nr = 0;
624 1.1 christos zone->msg_rr_count = 0;
625 1.1 christos if(zone->master->key_options && zone->master->key_options->tsig_key) {
626 1.1 christos xfrd_tsig_sign_request(tcp->packet, &zone->tsig, zone->master);
627 1.1 christos }
628 1.1 christos buffer_flip(tcp->packet);
629 1.1 christos DEBUG(DEBUG_XFRD,1, (LOG_INFO, "sent tcp query with ID %d", zone->query_id));
630 1.1 christos tcp->msglen = buffer_limit(tcp->packet);
631 1.1 christos tcp->total_bytes = 0;
632 1.1 christos }
633 1.1 christos
634 1.1 christos static void
635 1.1.1.2 christos tcp_conn_ready_for_reading(struct xfrd_tcp* tcp)
636 1.1 christos {
637 1.1 christos tcp->total_bytes = 0;
638 1.1 christos tcp->msglen = 0;
639 1.1 christos buffer_clear(tcp->packet);
640 1.1 christos }
641 1.1 christos
642 1.1.1.2 christos int conn_write(struct xfrd_tcp* tcp)
643 1.1 christos {
644 1.1 christos ssize_t sent;
645 1.1 christos
646 1.1 christos if(tcp->total_bytes < sizeof(tcp->msglen)) {
647 1.1 christos uint16_t sendlen = htons(tcp->msglen);
648 1.1.1.2 christos #ifdef HAVE_WRITEV
649 1.1.1.2 christos struct iovec iov[2];
650 1.1.1.2 christos iov[0].iov_base = (uint8_t*)&sendlen + tcp->total_bytes;
651 1.1.1.2 christos iov[0].iov_len = sizeof(sendlen) - tcp->total_bytes;
652 1.1.1.2 christos iov[1].iov_base = buffer_begin(tcp->packet);
653 1.1.1.2 christos iov[1].iov_len = buffer_limit(tcp->packet);
654 1.1.1.2 christos sent = writev(tcp->fd, iov, 2);
655 1.1.1.2 christos #else /* HAVE_WRITEV */
656 1.1 christos sent = write(tcp->fd,
657 1.1 christos (const char*)&sendlen + tcp->total_bytes,
658 1.1 christos sizeof(tcp->msglen) - tcp->total_bytes);
659 1.1.1.2 christos #endif /* HAVE_WRITEV */
660 1.1 christos
661 1.1 christos if(sent == -1) {
662 1.1 christos if(errno == EAGAIN || errno == EINTR) {
663 1.1 christos /* write would block, try later */
664 1.1 christos return 0;
665 1.1 christos } else {
666 1.1 christos return -1;
667 1.1 christos }
668 1.1 christos }
669 1.1 christos
670 1.1 christos tcp->total_bytes += sent;
671 1.1.1.2 christos if(sent > (ssize_t)sizeof(tcp->msglen))
672 1.1.1.2 christos buffer_skip(tcp->packet, sent-sizeof(tcp->msglen));
673 1.1 christos if(tcp->total_bytes < sizeof(tcp->msglen)) {
674 1.1 christos /* incomplete write, resume later */
675 1.1 christos return 0;
676 1.1 christos }
677 1.1.1.2 christos #ifdef HAVE_WRITEV
678 1.1.1.2 christos if(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen)) {
679 1.1.1.2 christos /* packet done */
680 1.1.1.2 christos return 1;
681 1.1.1.2 christos }
682 1.1.1.2 christos #endif
683 1.1.1.2 christos assert(tcp->total_bytes >= sizeof(tcp->msglen));
684 1.1 christos }
685 1.1 christos
686 1.1 christos assert(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen));
687 1.1 christos
688 1.1 christos sent = write(tcp->fd,
689 1.1 christos buffer_current(tcp->packet),
690 1.1 christos buffer_remaining(tcp->packet));
691 1.1 christos if(sent == -1) {
692 1.1 christos if(errno == EAGAIN || errno == EINTR) {
693 1.1 christos /* write would block, try later */
694 1.1 christos return 0;
695 1.1 christos } else {
696 1.1 christos return -1;
697 1.1 christos }
698 1.1 christos }
699 1.1 christos
700 1.1 christos buffer_skip(tcp->packet, sent);
701 1.1 christos tcp->total_bytes += sent;
702 1.1 christos
703 1.1 christos if(tcp->total_bytes < tcp->msglen + sizeof(tcp->msglen)) {
704 1.1 christos /* more to write when socket becomes writable again */
705 1.1 christos return 0;
706 1.1 christos }
707 1.1 christos
708 1.1 christos assert(tcp->total_bytes == tcp->msglen + sizeof(tcp->msglen));
709 1.1 christos return 1;
710 1.1 christos }
711 1.1 christos
712 1.1 christos void
713 1.1.1.2 christos xfrd_tcp_write(struct xfrd_tcp_pipeline* tp, xfrd_zone_type* zone)
714 1.1 christos {
715 1.1 christos int ret;
716 1.1.1.2 christos struct xfrd_tcp* tcp = tp->tcp_w;
717 1.1 christos assert(zone->tcp_conn != -1);
718 1.1 christos assert(zone == tp->tcp_send_first);
719 1.1 christos /* see if for non-established connection, there is a connect error */
720 1.1 christos if(!tp->connection_established) {
721 1.1 christos /* check for pending error from nonblocking connect */
722 1.1 christos /* from Stevens, unix network programming, vol1, 3rd ed, p450 */
723 1.1 christos int error = 0;
724 1.1 christos socklen_t len = sizeof(error);
725 1.1 christos if(getsockopt(tcp->fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0){
726 1.1 christos error = errno; /* on solaris errno is error */
727 1.1 christos }
728 1.1 christos if(error == EINPROGRESS || error == EWOULDBLOCK)
729 1.1 christos return; /* try again later */
730 1.1 christos if(error != 0) {
731 1.1 christos log_msg(LOG_ERR, "%s: Could not tcp connect to %s: %s",
732 1.1 christos zone->apex_str, zone->master->ip_address_spec,
733 1.1 christos strerror(error));
734 1.1 christos xfrd_tcp_pipe_stop(tp);
735 1.1 christos return;
736 1.1 christos }
737 1.1 christos }
738 1.1 christos ret = conn_write(tcp);
739 1.1 christos if(ret == -1) {
740 1.1 christos log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
741 1.1 christos xfrd_tcp_pipe_stop(tp);
742 1.1 christos return;
743 1.1 christos }
744 1.1 christos if(tcp->total_bytes != 0 && !tp->connection_established)
745 1.1 christos tp->connection_established = 1;
746 1.1 christos if(ret == 0) {
747 1.1 christos return; /* write again later */
748 1.1 christos }
749 1.1 christos /* done writing this message */
750 1.1 christos
751 1.1 christos /* remove first zone from sendlist */
752 1.1 christos tcp_pipe_sendlist_popfirst(tp, zone);
753 1.1 christos
754 1.1 christos /* see if other zone wants to write; init; let it write (now) */
755 1.1 christos /* and use a loop, because 64k stack calls is a too much */
756 1.1 christos while(tp->tcp_send_first) {
757 1.1 christos /* setup to write for this zone */
758 1.1 christos xfrd_tcp_setup_write_packet(tp, tp->tcp_send_first);
759 1.1 christos /* attempt to write for this zone (if success, continue loop)*/
760 1.1 christos ret = conn_write(tcp);
761 1.1 christos if(ret == -1) {
762 1.1 christos log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
763 1.1 christos xfrd_tcp_pipe_stop(tp);
764 1.1 christos return;
765 1.1 christos }
766 1.1 christos if(ret == 0)
767 1.1 christos return; /* write again later */
768 1.1 christos tcp_pipe_sendlist_popfirst(tp, tp->tcp_send_first);
769 1.1 christos }
770 1.1 christos
771 1.1 christos /* if sendlist empty, remove WRITE from event */
772 1.1 christos
773 1.1 christos /* listen to READ, and not WRITE events */
774 1.1 christos assert(tp->tcp_send_first == NULL);
775 1.1 christos tcp_pipe_reset_timeout(tp);
776 1.1 christos }
777 1.1 christos
778 1.1 christos int
779 1.1.1.2 christos conn_read(struct xfrd_tcp* tcp)
780 1.1 christos {
781 1.1 christos ssize_t received;
782 1.1 christos /* receive leading packet length bytes */
783 1.1 christos if(tcp->total_bytes < sizeof(tcp->msglen)) {
784 1.1 christos received = read(tcp->fd,
785 1.1 christos (char*) &tcp->msglen + tcp->total_bytes,
786 1.1 christos sizeof(tcp->msglen) - tcp->total_bytes);
787 1.1 christos if(received == -1) {
788 1.1 christos if(errno == EAGAIN || errno == EINTR) {
789 1.1 christos /* read would block, try later */
790 1.1 christos return 0;
791 1.1 christos } else {
792 1.1 christos #ifdef ECONNRESET
793 1.1 christos if (verbosity >= 2 || errno != ECONNRESET)
794 1.1 christos #endif /* ECONNRESET */
795 1.1 christos log_msg(LOG_ERR, "tcp read sz: %s", strerror(errno));
796 1.1 christos return -1;
797 1.1 christos }
798 1.1 christos } else if(received == 0) {
799 1.1 christos /* EOF */
800 1.1 christos return -1;
801 1.1 christos }
802 1.1 christos tcp->total_bytes += received;
803 1.1 christos if(tcp->total_bytes < sizeof(tcp->msglen)) {
804 1.1 christos /* not complete yet, try later */
805 1.1 christos return 0;
806 1.1 christos }
807 1.1 christos
808 1.1 christos assert(tcp->total_bytes == sizeof(tcp->msglen));
809 1.1 christos tcp->msglen = ntohs(tcp->msglen);
810 1.1 christos
811 1.1 christos if(tcp->msglen == 0) {
812 1.1 christos buffer_set_limit(tcp->packet, tcp->msglen);
813 1.1 christos return 1;
814 1.1 christos }
815 1.1 christos if(tcp->msglen > buffer_capacity(tcp->packet)) {
816 1.1 christos log_msg(LOG_ERR, "buffer too small, dropping connection");
817 1.1 christos return 0;
818 1.1 christos }
819 1.1 christos buffer_set_limit(tcp->packet, tcp->msglen);
820 1.1 christos }
821 1.1 christos
822 1.1 christos assert(buffer_remaining(tcp->packet) > 0);
823 1.1 christos
824 1.1 christos received = read(tcp->fd, buffer_current(tcp->packet),
825 1.1 christos buffer_remaining(tcp->packet));
826 1.1 christos if(received == -1) {
827 1.1 christos if(errno == EAGAIN || errno == EINTR) {
828 1.1 christos /* read would block, try later */
829 1.1 christos return 0;
830 1.1 christos } else {
831 1.1 christos #ifdef ECONNRESET
832 1.1 christos if (verbosity >= 2 || errno != ECONNRESET)
833 1.1 christos #endif /* ECONNRESET */
834 1.1 christos log_msg(LOG_ERR, "tcp read %s", strerror(errno));
835 1.1 christos return -1;
836 1.1 christos }
837 1.1 christos } else if(received == 0) {
838 1.1 christos /* EOF */
839 1.1 christos return -1;
840 1.1 christos }
841 1.1 christos
842 1.1 christos tcp->total_bytes += received;
843 1.1 christos buffer_skip(tcp->packet, received);
844 1.1 christos
845 1.1 christos if(buffer_remaining(tcp->packet) > 0) {
846 1.1 christos /* not complete yet, wait for more */
847 1.1 christos return 0;
848 1.1 christos }
849 1.1 christos
850 1.1 christos /* completed */
851 1.1 christos assert(buffer_position(tcp->packet) == tcp->msglen);
852 1.1 christos return 1;
853 1.1 christos }
854 1.1 christos
855 1.1 christos void
856 1.1 christos xfrd_tcp_read(struct xfrd_tcp_pipeline* tp)
857 1.1 christos {
858 1.1.1.2 christos xfrd_zone_type* zone;
859 1.1.1.2 christos struct xfrd_tcp* tcp = tp->tcp_r;
860 1.1 christos int ret;
861 1.1 christos enum xfrd_packet_result pkt_result;
862 1.1 christos
863 1.1 christos ret = conn_read(tcp);
864 1.1 christos if(ret == -1) {
865 1.1 christos xfrd_tcp_pipe_stop(tp);
866 1.1 christos return;
867 1.1 christos }
868 1.1 christos if(ret == 0)
869 1.1 christos return;
870 1.1 christos /* completed msg */
871 1.1 christos buffer_flip(tcp->packet);
872 1.1 christos /* see which ID number it is, if skip, handle skip, NULL: warn */
873 1.1 christos if(tcp->msglen < QHEADERSZ) {
874 1.1 christos /* too short for DNS header, skip it */
875 1.1 christos DEBUG(DEBUG_XFRD,1, (LOG_INFO,
876 1.1 christos "xfrd: tcp skip response that is too short"));
877 1.1 christos tcp_conn_ready_for_reading(tcp);
878 1.1 christos return;
879 1.1 christos }
880 1.1 christos zone = tp->id[ID(tcp->packet)];
881 1.1 christos if(!zone || zone == TCP_NULL_SKIP) {
882 1.1 christos /* no zone for this id? skip it */
883 1.1 christos DEBUG(DEBUG_XFRD,1, (LOG_INFO,
884 1.1 christos "xfrd: tcp skip response with %s ID",
885 1.1 christos zone?"set-to-skip":"unknown"));
886 1.1 christos tcp_conn_ready_for_reading(tcp);
887 1.1 christos return;
888 1.1 christos }
889 1.1 christos assert(zone->tcp_conn != -1);
890 1.1 christos
891 1.1 christos /* handle message for zone */
892 1.1 christos pkt_result = xfrd_handle_received_xfr_packet(zone, tcp->packet);
893 1.1 christos /* setup for reading the next packet on this connection */
894 1.1 christos tcp_conn_ready_for_reading(tcp);
895 1.1 christos switch(pkt_result) {
896 1.1 christos case xfrd_packet_more:
897 1.1 christos /* wait for next packet */
898 1.1 christos break;
899 1.1 christos case xfrd_packet_newlease:
900 1.1 christos /* set to skip if more packets with this ID */
901 1.1 christos tp->id[zone->query_id] = TCP_NULL_SKIP;
902 1.1 christos tp->num_skip++;
903 1.1 christos /* fall through to remove zone from tp */
904 1.1.1.2 christos /* fallthrough */
905 1.1 christos case xfrd_packet_transfer:
906 1.1 christos if(zone->zone_options->pattern->multi_master_check) {
907 1.1 christos xfrd_tcp_release(xfrd->tcp_set, zone);
908 1.1 christos xfrd_make_request(zone);
909 1.1 christos break;
910 1.1 christos }
911 1.1 christos xfrd_tcp_release(xfrd->tcp_set, zone);
912 1.1 christos assert(zone->round_num == -1);
913 1.1 christos break;
914 1.1 christos case xfrd_packet_notimpl:
915 1.1 christos xfrd_disable_ixfr(zone);
916 1.1 christos xfrd_tcp_release(xfrd->tcp_set, zone);
917 1.1 christos /* query next server */
918 1.1 christos xfrd_make_request(zone);
919 1.1 christos break;
920 1.1 christos case xfrd_packet_bad:
921 1.1 christos case xfrd_packet_tcp:
922 1.1 christos default:
923 1.1 christos /* set to skip if more packets with this ID */
924 1.1 christos tp->id[zone->query_id] = TCP_NULL_SKIP;
925 1.1 christos tp->num_skip++;
926 1.1 christos xfrd_tcp_release(xfrd->tcp_set, zone);
927 1.1 christos /* query next server */
928 1.1 christos xfrd_make_request(zone);
929 1.1 christos break;
930 1.1 christos }
931 1.1 christos }
932 1.1 christos
933 1.1 christos void
934 1.1.1.2 christos xfrd_tcp_release(struct xfrd_tcp_set* set, xfrd_zone_type* zone)
935 1.1 christos {
936 1.1 christos int conn = zone->tcp_conn;
937 1.1 christos struct xfrd_tcp_pipeline* tp = set->tcp_state[conn];
938 1.1 christos DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s released tcp conn to %s",
939 1.1 christos zone->apex_str, zone->master->ip_address_spec));
940 1.1 christos assert(zone->tcp_conn != -1);
941 1.1 christos assert(zone->tcp_waiting == 0);
942 1.1 christos zone->tcp_conn = -1;
943 1.1 christos zone->tcp_waiting = 0;
944 1.1 christos
945 1.1 christos /* remove from tcp_send list */
946 1.1 christos tcp_pipe_sendlist_remove(tp, zone);
947 1.1 christos /* remove it from the ID list */
948 1.1 christos if(tp->id[zone->query_id] != TCP_NULL_SKIP)
949 1.1 christos tcp_pipe_id_remove(tp, zone);
950 1.1 christos DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: released tcp pipe now %d unused",
951 1.1 christos tp->num_unused));
952 1.1 christos /* if pipe was full, but no more, then see if waiting element is
953 1.1 christos * for the same master, and can fill the unused ID */
954 1.1 christos if(tp->num_unused == 1 && set->tcp_waiting_first) {
955 1.1 christos #ifdef INET6
956 1.1 christos struct sockaddr_storage to;
957 1.1 christos #else
958 1.1 christos struct sockaddr_in to;
959 1.1 christos #endif
960 1.1 christos socklen_t to_len = xfrd_acl_sockaddr_to(
961 1.1 christos set->tcp_waiting_first->master, &to);
962 1.1 christos if(to_len == tp->ip_len && memcmp(&to, &tp->ip, to_len) == 0) {
963 1.1 christos /* use this connection for the waiting zone */
964 1.1 christos zone = set->tcp_waiting_first;
965 1.1 christos assert(zone->tcp_conn == -1);
966 1.1 christos zone->tcp_conn = conn;
967 1.1 christos tcp_zone_waiting_list_popfirst(set, zone);
968 1.1 christos if(zone->zone_handler.ev_fd != -1)
969 1.1 christos xfrd_udp_release(zone);
970 1.1 christos xfrd_unset_timer(zone);
971 1.1 christos pipeline_setup_new_zone(set, tp, zone);
972 1.1 christos return;
973 1.1 christos }
974 1.1 christos /* waiting zone did not go to same server */
975 1.1 christos }
976 1.1 christos
977 1.1 christos /* if all unused, or only skipped leftover, close the pipeline */
978 1.1 christos if(tp->num_unused >= ID_PIPE_NUM || tp->num_skip >= ID_PIPE_NUM - tp->num_unused)
979 1.1 christos xfrd_tcp_pipe_release(set, tp, conn);
980 1.1 christos }
981 1.1 christos
982 1.1 christos void
983 1.1.1.2 christos xfrd_tcp_pipe_release(struct xfrd_tcp_set* set, struct xfrd_tcp_pipeline* tp,
984 1.1 christos int conn)
985 1.1 christos {
986 1.1 christos DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: tcp pipe released"));
987 1.1 christos /* one handler per tcp pipe */
988 1.1 christos if(tp->handler_added)
989 1.1 christos event_del(&tp->handler);
990 1.1 christos tp->handler_added = 0;
991 1.1 christos
992 1.1 christos /* fd in tcp_r and tcp_w is the same, close once */
993 1.1 christos if(tp->tcp_r->fd != -1)
994 1.1 christos close(tp->tcp_r->fd);
995 1.1 christos tp->tcp_r->fd = -1;
996 1.1 christos tp->tcp_w->fd = -1;
997 1.1 christos
998 1.1 christos /* remove from pipetree */
999 1.1 christos (void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
1000 1.1 christos
1001 1.1 christos /* a waiting zone can use the free tcp slot (to another server) */
1002 1.1 christos /* if that zone fails to set-up or connect, we try to start the next
1003 1.1 christos * waiting zone in the list */
1004 1.1 christos while(set->tcp_count == XFRD_MAX_TCP && set->tcp_waiting_first) {
1005 1.1 christos int i;
1006 1.1 christos
1007 1.1 christos /* pop first waiting process */
1008 1.1.1.2 christos xfrd_zone_type* zone = set->tcp_waiting_first;
1009 1.1 christos /* start it */
1010 1.1 christos assert(zone->tcp_conn == -1);
1011 1.1 christos zone->tcp_conn = conn;
1012 1.1 christos tcp_zone_waiting_list_popfirst(set, zone);
1013 1.1 christos
1014 1.1 christos /* stop udp (if any) */
1015 1.1 christos if(zone->zone_handler.ev_fd != -1)
1016 1.1 christos xfrd_udp_release(zone);
1017 1.1 christos if(!xfrd_tcp_open(set, tp, zone)) {
1018 1.1 christos zone->tcp_conn = -1;
1019 1.1 christos xfrd_set_refresh_now(zone);
1020 1.1 christos /* try to start the next zone (if any) */
1021 1.1 christos continue;
1022 1.1 christos }
1023 1.1 christos /* re-init this tcppipe */
1024 1.1 christos /* ip and ip_len set by tcp_open */
1025 1.1 christos tp->node.key = tp;
1026 1.1 christos tp->num_unused = ID_PIPE_NUM;
1027 1.1 christos tp->num_skip = 0;
1028 1.1 christos tp->tcp_send_first = NULL;
1029 1.1 christos tp->tcp_send_last = NULL;
1030 1.1 christos memset(tp->id, 0, sizeof(tp->id));
1031 1.1 christos for(i=0; i<ID_PIPE_NUM; i++) {
1032 1.1 christos tp->unused[i] = i;
1033 1.1 christos }
1034 1.1 christos
1035 1.1 christos /* insert into tree */
1036 1.1 christos (void)rbtree_insert(set->pipetree, &tp->node);
1037 1.1 christos /* setup write */
1038 1.1 christos xfrd_unset_timer(zone);
1039 1.1 christos pipeline_setup_new_zone(set, tp, zone);
1040 1.1 christos /* started a task, no need for cleanups, so return */
1041 1.1 christos return;
1042 1.1 christos }
1043 1.1 christos /* no task to start, cleanup */
1044 1.1 christos assert(!set->tcp_waiting_first);
1045 1.1 christos set->tcp_count --;
1046 1.1 christos assert(set->tcp_count >= 0);
1047 1.1 christos }
1048 1.1 christos
1049