dnstap_collector.c revision 1.1.1.3 1 1.1 prlw1 /*
2 1.1 prlw1 * dnstap/dnstap_collector.c -- nsd collector process for dnstap information
3 1.1 prlw1 *
4 1.1 prlw1 * Copyright (c) 2018, NLnet Labs. All rights reserved.
5 1.1 prlw1 *
6 1.1 prlw1 * See LICENSE for the license.
7 1.1 prlw1 *
8 1.1 prlw1 */
9 1.1 prlw1
10 1.1 prlw1 #include "config.h"
11 1.1 prlw1 #include <sys/types.h>
12 1.1 prlw1 #include <sys/socket.h>
13 1.1 prlw1 #include <errno.h>
14 1.1 prlw1 #include <fcntl.h>
15 1.1 prlw1 #include <unistd.h>
16 1.1 prlw1 #ifndef USE_MINI_EVENT
17 1.1 prlw1 # ifdef HAVE_EVENT_H
18 1.1 prlw1 # include <event.h>
19 1.1 prlw1 # else
20 1.1 prlw1 # include <event2/event.h>
21 1.1 prlw1 # include "event2/event_struct.h"
22 1.1 prlw1 # include "event2/event_compat.h"
23 1.1 prlw1 # endif
24 1.1 prlw1 #else
25 1.1 prlw1 # include "mini_event.h"
26 1.1 prlw1 #endif
27 1.1 prlw1 #include "dnstap/dnstap_collector.h"
28 1.1 prlw1 #include "dnstap/dnstap.h"
29 1.1 prlw1 #include "util.h"
30 1.1 prlw1 #include "nsd.h"
31 1.1 prlw1 #include "region-allocator.h"
32 1.1 prlw1 #include "buffer.h"
33 1.1 prlw1 #include "namedb.h"
34 1.1 prlw1 #include "options.h"
35 1.1.1.3 christos #include "remote.h"
36 1.1 prlw1
37 1.1.1.2 christos #include "udb.h"
38 1.1.1.2 christos #include "rrl.h"
39 1.1.1.2 christos
40 1.1 prlw1 struct dt_collector* dt_collector_create(struct nsd* nsd)
41 1.1 prlw1 {
42 1.1 prlw1 int i, sv[2];
43 1.1 prlw1 struct dt_collector* dt_col = (struct dt_collector*)xalloc_zero(
44 1.1 prlw1 sizeof(*dt_col));
45 1.1.1.2 christos dt_col->count = nsd->child_count * 2;
46 1.1 prlw1 dt_col->dt_env = NULL;
47 1.1 prlw1 dt_col->region = region_create(xalloc, free);
48 1.1 prlw1 dt_col->send_buffer = buffer_create(dt_col->region,
49 1.1.1.2 christos /* msglen + is_response + addrlen + is_tcp + packetlen + packet + zonelen + zone + spare + local_addr + addr */
50 1.1 prlw1 4+1+4+1+4+TCP_MAX_MESSAGE_LEN+4+MAXHOSTNAMELEN + 32 +
51 1.1 prlw1 #ifdef INET6
52 1.1.1.2 christos sizeof(struct sockaddr_storage) + sizeof(struct sockaddr_storage)
53 1.1 prlw1 #else
54 1.1.1.2 christos sizeof(struct sockaddr_in) + sizeof(struct sockaddr_in)
55 1.1 prlw1 #endif
56 1.1 prlw1 );
57 1.1 prlw1
58 1.1.1.2 christos /* open communication channels in struct nsd */
59 1.1 prlw1 nsd->dt_collector_fd_send = (int*)xalloc_array_zero(dt_col->count,
60 1.1 prlw1 sizeof(int));
61 1.1 prlw1 nsd->dt_collector_fd_recv = (int*)xalloc_array_zero(dt_col->count,
62 1.1 prlw1 sizeof(int));
63 1.1 prlw1 for(i=0; i<dt_col->count; i++) {
64 1.1.1.2 christos int sv[2];
65 1.1.1.2 christos int bufsz = buffer_capacity(dt_col->send_buffer);
66 1.1.1.2 christos sv[0] = -1; /* For receiving by parent (dnstap-collector) */
67 1.1.1.2 christos sv[1] = -1; /* For sending by child (server childs) */
68 1.1.1.3 christos if(socketpair(AF_UNIX, SOCK_DGRAM
69 1.1.1.3 christos #ifdef SOCK_NONBLOCK
70 1.1.1.3 christos | SOCK_NONBLOCK
71 1.1.1.3 christos #endif
72 1.1.1.3 christos , 0, sv) < 0) {
73 1.1.1.2 christos error("dnstap_collector: cannot create communication channel: %s",
74 1.1 prlw1 strerror(errno));
75 1.1 prlw1 }
76 1.1.1.3 christos #ifndef SOCK_NONBLOCK
77 1.1.1.3 christos if (fcntl(sv[0], F_SETFL, O_NONBLOCK) == -1) {
78 1.1.1.3 christos log_msg(LOG_ERR, "dnstap_collector receive fd fcntl "
79 1.1.1.3 christos "failed: %s", strerror(errno));
80 1.1.1.3 christos }
81 1.1.1.3 christos if (fcntl(sv[1], F_SETFL, O_NONBLOCK) == -1) {
82 1.1.1.3 christos log_msg(LOG_ERR, "dnstap_collector send fd fcntl "
83 1.1.1.3 christos "failed: %s", strerror(errno));
84 1.1.1.3 christos }
85 1.1.1.3 christos #endif
86 1.1.1.2 christos if(setsockopt(sv[0], SOL_SOCKET, SO_RCVBUF, &bufsz, sizeof(bufsz))) {
87 1.1.1.2 christos log_msg(LOG_ERR, "setting dnstap_collector "
88 1.1.1.2 christos "receive buffer size failed: %s", strerror(errno));
89 1.1 prlw1 }
90 1.1.1.2 christos if(setsockopt(sv[1], SOL_SOCKET, SO_SNDBUF, &bufsz, sizeof(bufsz))) {
91 1.1.1.2 christos log_msg(LOG_ERR, "setting dnstap_collector "
92 1.1.1.2 christos "send buffer size failed: %s", strerror(errno));
93 1.1 prlw1 }
94 1.1.1.2 christos nsd->dt_collector_fd_recv[i] = sv[0];
95 1.1.1.2 christos nsd->dt_collector_fd_send[i] = sv[1];
96 1.1 prlw1 }
97 1.1.1.2 christos nsd->dt_collector_fd_swap = nsd->dt_collector_fd_send + nsd->child_count;
98 1.1 prlw1
99 1.1 prlw1 /* open socketpair */
100 1.1 prlw1 if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
101 1.1 prlw1 error("dnstap_collector: cannot create socketpair: %s",
102 1.1 prlw1 strerror(errno));
103 1.1 prlw1 }
104 1.1 prlw1 if(fcntl(sv[0], F_SETFL, O_NONBLOCK) == -1) {
105 1.1 prlw1 log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno));
106 1.1 prlw1 }
107 1.1 prlw1 if(fcntl(sv[1], F_SETFL, O_NONBLOCK) == -1) {
108 1.1 prlw1 log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno));
109 1.1 prlw1 }
110 1.1 prlw1 dt_col->cmd_socket_dt = sv[0];
111 1.1 prlw1 dt_col->cmd_socket_nsd = sv[1];
112 1.1 prlw1
113 1.1 prlw1 return dt_col;
114 1.1 prlw1 }
115 1.1 prlw1
116 1.1 prlw1 void dt_collector_destroy(struct dt_collector* dt_col, struct nsd* nsd)
117 1.1 prlw1 {
118 1.1 prlw1 if(!dt_col) return;
119 1.1 prlw1 free(nsd->dt_collector_fd_recv);
120 1.1 prlw1 nsd->dt_collector_fd_recv = NULL;
121 1.1.1.2 christos if (nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap)
122 1.1.1.2 christos free(nsd->dt_collector_fd_send);
123 1.1.1.2 christos else
124 1.1.1.2 christos free(nsd->dt_collector_fd_swap);
125 1.1 prlw1 nsd->dt_collector_fd_send = NULL;
126 1.1.1.2 christos nsd->dt_collector_fd_swap = NULL;
127 1.1 prlw1 region_destroy(dt_col->region);
128 1.1 prlw1 free(dt_col);
129 1.1 prlw1 }
130 1.1 prlw1
131 1.1 prlw1 void dt_collector_close(struct dt_collector* dt_col, struct nsd* nsd)
132 1.1 prlw1 {
133 1.1.1.2 christos int i, *fd_send;
134 1.1 prlw1 if(!dt_col) return;
135 1.1 prlw1 if(dt_col->cmd_socket_dt != -1) {
136 1.1 prlw1 close(dt_col->cmd_socket_dt);
137 1.1 prlw1 dt_col->cmd_socket_dt = -1;
138 1.1 prlw1 }
139 1.1 prlw1 if(dt_col->cmd_socket_nsd != -1) {
140 1.1 prlw1 close(dt_col->cmd_socket_nsd);
141 1.1 prlw1 dt_col->cmd_socket_nsd = -1;
142 1.1 prlw1 }
143 1.1.1.2 christos fd_send = nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap
144 1.1.1.2 christos ? nsd->dt_collector_fd_send : nsd->dt_collector_fd_swap;
145 1.1 prlw1 for(i=0; i<dt_col->count; i++) {
146 1.1 prlw1 if(nsd->dt_collector_fd_recv[i] != -1) {
147 1.1 prlw1 close(nsd->dt_collector_fd_recv[i]);
148 1.1 prlw1 nsd->dt_collector_fd_recv[i] = -1;
149 1.1 prlw1 }
150 1.1.1.2 christos if(fd_send[i] != -1) {
151 1.1.1.2 christos close(fd_send[i]);
152 1.1.1.2 christos fd_send[i] = -1;
153 1.1 prlw1 }
154 1.1 prlw1 }
155 1.1 prlw1 }
156 1.1 prlw1
157 1.1 prlw1 /* handle command from nsd to dt collector.
158 1.1 prlw1 * mostly, check for fd closed, this means we have to exit */
159 1.1 prlw1 void
160 1.1 prlw1 dt_handle_cmd_from_nsd(int ATTR_UNUSED(fd), short event, void* arg)
161 1.1 prlw1 {
162 1.1 prlw1 struct dt_collector* dt_col = (struct dt_collector*)arg;
163 1.1 prlw1 if((event&EV_READ) != 0) {
164 1.1 prlw1 event_base_loopexit(dt_col->event_base, NULL);
165 1.1 prlw1 }
166 1.1 prlw1 }
167 1.1 prlw1
168 1.1.1.2 christos /* receive data from fd into buffer, 1 when message received, -1 on error */
169 1.1.1.2 christos static int recv_into_buffer(int fd, struct buffer* buf)
170 1.1 prlw1 {
171 1.1 prlw1 size_t msglen;
172 1.1 prlw1 ssize_t r;
173 1.1 prlw1
174 1.1.1.2 christos assert(buffer_position(buf) == 0);
175 1.1.1.2 christos r = recv(fd, buffer_current(buf), buffer_capacity(buf), MSG_DONTWAIT);
176 1.1 prlw1 if(r == -1) {
177 1.1.1.2 christos if(errno == EAGAIN || errno == EINTR || errno == EMSGSIZE) {
178 1.1.1.2 christos /* continue to receive a message later */
179 1.1 prlw1 return 0;
180 1.1 prlw1 }
181 1.1.1.2 christos log_msg(LOG_ERR, "dnstap collector: receive failed: %s",
182 1.1 prlw1 strerror(errno));
183 1.1.1.2 christos return -1;
184 1.1.1.2 christos }
185 1.1.1.2 christos if(r == 0) {
186 1.1.1.2 christos /* Remote end closed the connection? */
187 1.1.1.2 christos log_msg(LOG_ERR, "dnstap collector: remote closed connection");
188 1.1.1.2 christos return -1;
189 1.1.1.2 christos }
190 1.1.1.2 christos assert(r > 4);
191 1.1.1.2 christos msglen = buffer_read_u32_at(buf, 0);
192 1.1.1.2 christos if(msglen != (size_t)(r - 4)) {
193 1.1.1.2 christos /* Is this still possible now the communication channel is of
194 1.1.1.2 christos * type SOCK_DGRAM? I think not, but better safe than sorry. */
195 1.1.1.2 christos log_msg(LOG_ERR, "dnstap collector: out of sync (msglen: %u)",
196 1.1.1.2 christos (unsigned int) msglen);
197 1.1 prlw1 return 0;
198 1.1 prlw1 }
199 1.1 prlw1 buffer_skip(buf, r);
200 1.1 prlw1 buffer_flip(buf);
201 1.1 prlw1 return 1;
202 1.1 prlw1 }
203 1.1 prlw1
204 1.1 prlw1 /* submit the content of the buffer received to dnstap */
205 1.1 prlw1 static void
206 1.1 prlw1 dt_submit_content(struct dt_env* dt_env, struct buffer* buf)
207 1.1 prlw1 {
208 1.1 prlw1 uint8_t is_response, is_tcp;
209 1.1 prlw1 #ifdef INET6
210 1.1.1.2 christos struct sockaddr_storage local_addr, addr;
211 1.1 prlw1 #else
212 1.1.1.2 christos struct sockaddr_in local_addr, addr;
213 1.1 prlw1 #endif
214 1.1 prlw1 socklen_t addrlen;
215 1.1 prlw1 size_t pktlen;
216 1.1 prlw1 uint8_t* data;
217 1.1 prlw1 size_t zonelen;
218 1.1 prlw1 uint8_t* zone;
219 1.1 prlw1
220 1.1 prlw1 /* parse content from buffer */
221 1.1 prlw1 if(!buffer_available(buf, 4+1+4)) return;
222 1.1 prlw1 buffer_skip(buf, 4); /* skip msglen */
223 1.1 prlw1 is_response = buffer_read_u8(buf);
224 1.1 prlw1 addrlen = buffer_read_u32(buf);
225 1.1.1.2 christos if(addrlen > sizeof(local_addr) || addrlen > sizeof(addr)) return;
226 1.1.1.2 christos if(!buffer_available(buf, 2*addrlen)) return;
227 1.1.1.2 christos buffer_read(buf, &local_addr, addrlen);
228 1.1 prlw1 buffer_read(buf, &addr, addrlen);
229 1.1 prlw1 if(!buffer_available(buf, 1+4)) return;
230 1.1 prlw1 is_tcp = buffer_read_u8(buf);
231 1.1 prlw1 pktlen = buffer_read_u32(buf);
232 1.1 prlw1 if(!buffer_available(buf, pktlen)) return;
233 1.1 prlw1 data = buffer_current(buf);
234 1.1 prlw1 buffer_skip(buf, pktlen);
235 1.1 prlw1 if(!buffer_available(buf, 4)) return;
236 1.1 prlw1 zonelen = buffer_read_u32(buf);
237 1.1 prlw1 if(zonelen == 0) {
238 1.1 prlw1 zone = NULL;
239 1.1 prlw1 } else {
240 1.1 prlw1 if(zonelen > MAXDOMAINLEN) return;
241 1.1 prlw1 if(!buffer_available(buf, zonelen)) return;
242 1.1 prlw1 zone = buffer_current(buf);
243 1.1 prlw1 buffer_skip(buf, zonelen);
244 1.1 prlw1 }
245 1.1 prlw1
246 1.1 prlw1 /* submit it */
247 1.1 prlw1 if(is_response) {
248 1.1.1.2 christos dt_msg_send_auth_response(dt_env, &local_addr, &addr, is_tcp, zone,
249 1.1 prlw1 zonelen, data, pktlen);
250 1.1 prlw1 } else {
251 1.1.1.2 christos dt_msg_send_auth_query(dt_env, &local_addr, &addr, is_tcp, zone,
252 1.1 prlw1 zonelen, data, pktlen);
253 1.1 prlw1 }
254 1.1 prlw1 }
255 1.1 prlw1
256 1.1 prlw1 /* handle input from worker for dnstap */
257 1.1 prlw1 void
258 1.1 prlw1 dt_handle_input(int fd, short event, void* arg)
259 1.1 prlw1 {
260 1.1 prlw1 struct dt_collector_input* dt_input = (struct dt_collector_input*)arg;
261 1.1 prlw1 if((event&EV_READ) != 0) {
262 1.1.1.2 christos /* receive */
263 1.1.1.2 christos int r = recv_into_buffer(fd, dt_input->buffer);
264 1.1.1.2 christos if(r == 0)
265 1.1 prlw1 return;
266 1.1.1.2 christos else if(r < 0) {
267 1.1.1.2 christos event_base_loopexit(dt_input->dt_collector->event_base, NULL);
268 1.1.1.2 christos return;
269 1.1.1.2 christos }
270 1.1.1.2 christos /* once data is complete, send it to dnstap */
271 1.1 prlw1 VERBOSITY(4, (LOG_INFO, "dnstap collector: received msg len %d",
272 1.1 prlw1 (int)buffer_remaining(dt_input->buffer)));
273 1.1 prlw1 if(dt_input->dt_collector->dt_env) {
274 1.1 prlw1 dt_submit_content(dt_input->dt_collector->dt_env,
275 1.1 prlw1 dt_input->buffer);
276 1.1 prlw1 }
277 1.1 prlw1
278 1.1 prlw1 /* clear buffer for next message */
279 1.1 prlw1 buffer_clear(dt_input->buffer);
280 1.1 prlw1 }
281 1.1 prlw1 }
282 1.1 prlw1
283 1.1 prlw1 /* init dnstap */
284 1.1 prlw1 static void dt_init_dnstap(struct dt_collector* dt_col, struct nsd* nsd)
285 1.1 prlw1 {
286 1.1 prlw1 int num_workers = 1;
287 1.1 prlw1 #ifdef HAVE_CHROOT
288 1.1 prlw1 if(nsd->chrootdir && nsd->chrootdir[0]) {
289 1.1 prlw1 int l = strlen(nsd->chrootdir)-1; /* ends in trailing slash */
290 1.1 prlw1 if (nsd->options->dnstap_socket_path &&
291 1.1 prlw1 nsd->options->dnstap_socket_path[0] == '/' &&
292 1.1 prlw1 strncmp(nsd->options->dnstap_socket_path,
293 1.1 prlw1 nsd->chrootdir, l) == 0)
294 1.1 prlw1 nsd->options->dnstap_socket_path += l;
295 1.1 prlw1 }
296 1.1 prlw1 #endif
297 1.1.1.3 christos dt_col->dt_env = dt_create(nsd->options->dnstap_socket_path,
298 1.1.1.3 christos nsd->options->dnstap_ip, num_workers, nsd->options->dnstap_tls,
299 1.1.1.3 christos nsd->options->dnstap_tls_server_name,
300 1.1.1.3 christos nsd->options->dnstap_tls_cert_bundle,
301 1.1.1.3 christos nsd->options->dnstap_tls_client_key_file,
302 1.1.1.3 christos nsd->options->dnstap_tls_client_cert_file);
303 1.1 prlw1 if(!dt_col->dt_env) {
304 1.1 prlw1 log_msg(LOG_ERR, "could not create dnstap env");
305 1.1 prlw1 return;
306 1.1 prlw1 }
307 1.1 prlw1 dt_apply_cfg(dt_col->dt_env, nsd->options);
308 1.1 prlw1 dt_init(dt_col->dt_env);
309 1.1 prlw1 }
310 1.1 prlw1
311 1.1 prlw1 /* cleanup dt collector process for exit */
312 1.1 prlw1 static void dt_collector_cleanup(struct dt_collector* dt_col, struct nsd* nsd)
313 1.1 prlw1 {
314 1.1 prlw1 int i;
315 1.1 prlw1 dt_delete(dt_col->dt_env);
316 1.1 prlw1 event_del(dt_col->cmd_event);
317 1.1 prlw1 for(i=0; i<dt_col->count; i++) {
318 1.1 prlw1 event_del(dt_col->inputs[i].event);
319 1.1 prlw1 }
320 1.1 prlw1 dt_collector_close(dt_col, nsd);
321 1.1 prlw1 event_base_free(dt_col->event_base);
322 1.1 prlw1 #ifdef MEMCLEAN
323 1.1 prlw1 free(dt_col->cmd_event);
324 1.1 prlw1 if(dt_col->inputs) {
325 1.1 prlw1 for(i=0; i<dt_col->count; i++) {
326 1.1 prlw1 free(dt_col->inputs[i].event);
327 1.1 prlw1 }
328 1.1 prlw1 free(dt_col->inputs);
329 1.1 prlw1 }
330 1.1 prlw1 dt_collector_destroy(dt_col, nsd);
331 1.1.1.3 christos daemon_remote_delete(nsd->rc); /* ssl-delete secret keys */
332 1.1.1.3 christos nsd_options_destroy(nsd->options);
333 1.1.1.3 christos region_destroy(nsd->region);
334 1.1 prlw1 #endif
335 1.1 prlw1 }
336 1.1 prlw1
337 1.1 prlw1 /* attach events to the event base to listen to the workers and cmd channel */
338 1.1 prlw1 static void dt_attach_events(struct dt_collector* dt_col, struct nsd* nsd)
339 1.1 prlw1 {
340 1.1 prlw1 int i;
341 1.1 prlw1 /* create event base */
342 1.1 prlw1 dt_col->event_base = nsd_child_event_base();
343 1.1 prlw1 if(!dt_col->event_base) {
344 1.1 prlw1 error("dnstap collector: event_base create failed");
345 1.1 prlw1 }
346 1.1 prlw1
347 1.1 prlw1 /* add command handler */
348 1.1 prlw1 dt_col->cmd_event = (struct event*)xalloc_zero(
349 1.1 prlw1 sizeof(*dt_col->cmd_event));
350 1.1 prlw1 event_set(dt_col->cmd_event, dt_col->cmd_socket_dt,
351 1.1 prlw1 EV_PERSIST|EV_READ, dt_handle_cmd_from_nsd, dt_col);
352 1.1 prlw1 if(event_base_set(dt_col->event_base, dt_col->cmd_event) != 0)
353 1.1 prlw1 log_msg(LOG_ERR, "dnstap collector: event_base_set failed");
354 1.1 prlw1 if(event_add(dt_col->cmd_event, NULL) != 0)
355 1.1 prlw1 log_msg(LOG_ERR, "dnstap collector: event_add failed");
356 1.1 prlw1
357 1.1 prlw1 /* add worker input handlers */
358 1.1 prlw1 dt_col->inputs = xalloc_array_zero(dt_col->count,
359 1.1 prlw1 sizeof(*dt_col->inputs));
360 1.1 prlw1 for(i=0; i<dt_col->count; i++) {
361 1.1 prlw1 dt_col->inputs[i].dt_collector = dt_col;
362 1.1 prlw1 dt_col->inputs[i].event = (struct event*)xalloc_zero(
363 1.1 prlw1 sizeof(struct event));
364 1.1 prlw1 event_set(dt_col->inputs[i].event,
365 1.1 prlw1 nsd->dt_collector_fd_recv[i], EV_PERSIST|EV_READ,
366 1.1 prlw1 dt_handle_input, &dt_col->inputs[i]);
367 1.1 prlw1 if(event_base_set(dt_col->event_base,
368 1.1 prlw1 dt_col->inputs[i].event) != 0)
369 1.1 prlw1 log_msg(LOG_ERR, "dnstap collector: event_base_set failed");
370 1.1 prlw1 if(event_add(dt_col->inputs[i].event, NULL) != 0)
371 1.1 prlw1 log_msg(LOG_ERR, "dnstap collector: event_add failed");
372 1.1 prlw1
373 1.1 prlw1 dt_col->inputs[i].buffer = buffer_create(dt_col->region,
374 1.1.1.2 christos /* msglen + is_response + addrlen + is_tcp + packetlen + packet + zonelen + zone + spare + local_addr + addr */
375 1.1 prlw1 4+1+4+1+4+TCP_MAX_MESSAGE_LEN+4+MAXHOSTNAMELEN + 32 +
376 1.1 prlw1 #ifdef INET6
377 1.1.1.2 christos sizeof(struct sockaddr_storage) + sizeof(struct sockaddr_storage)
378 1.1 prlw1 #else
379 1.1.1.2 christos sizeof(struct sockaddr_in) + sizeof(struct sockaddr_in)
380 1.1 prlw1 #endif
381 1.1 prlw1 );
382 1.1 prlw1 assert(buffer_capacity(dt_col->inputs[i].buffer) ==
383 1.1 prlw1 buffer_capacity(dt_col->send_buffer));
384 1.1 prlw1 }
385 1.1 prlw1 }
386 1.1 prlw1
387 1.1 prlw1 /* the dnstap collector process main routine */
388 1.1 prlw1 static void dt_collector_run(struct dt_collector* dt_col, struct nsd* nsd)
389 1.1 prlw1 {
390 1.1 prlw1 /* init dnstap */
391 1.1 prlw1 VERBOSITY(1, (LOG_INFO, "dnstap collector started"));
392 1.1 prlw1 dt_init_dnstap(dt_col, nsd);
393 1.1 prlw1 dt_attach_events(dt_col, nsd);
394 1.1 prlw1
395 1.1 prlw1 /* run */
396 1.1 prlw1 if(event_base_loop(dt_col->event_base, 0) == -1) {
397 1.1 prlw1 error("dnstap collector: event_base_loop failed");
398 1.1 prlw1 }
399 1.1 prlw1
400 1.1 prlw1 /* cleanup and done */
401 1.1 prlw1 VERBOSITY(1, (LOG_INFO, "dnstap collector stopped"));
402 1.1 prlw1 dt_collector_cleanup(dt_col, nsd);
403 1.1 prlw1 exit(0);
404 1.1 prlw1 }
405 1.1 prlw1
406 1.1 prlw1 void dt_collector_start(struct dt_collector* dt_col, struct nsd* nsd)
407 1.1 prlw1 {
408 1.1.1.2 christos int i, *fd_send;
409 1.1 prlw1 /* fork */
410 1.1 prlw1 dt_col->dt_pid = fork();
411 1.1 prlw1 if(dt_col->dt_pid == -1) {
412 1.1 prlw1 error("dnstap_collector: fork failed: %s", strerror(errno));
413 1.1 prlw1 }
414 1.1 prlw1 if(dt_col->dt_pid == 0) {
415 1.1 prlw1 /* the dt collector process is this */
416 1.1 prlw1 /* close the nsd side of the command channel */
417 1.1 prlw1 close(dt_col->cmd_socket_nsd);
418 1.1 prlw1 dt_col->cmd_socket_nsd = -1;
419 1.1.1.2 christos
420 1.1.1.2 christos /* close the send side of the communication channels */
421 1.1.1.2 christos assert(nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap);
422 1.1.1.2 christos fd_send = nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap
423 1.1.1.2 christos ? nsd->dt_collector_fd_send : nsd->dt_collector_fd_swap;
424 1.1.1.2 christos for(i=0; i<dt_col->count; i++) {
425 1.1.1.2 christos if(fd_send[i] != -1) {
426 1.1.1.2 christos close(fd_send[i]);
427 1.1.1.2 christos fd_send[i] = -1;
428 1.1.1.2 christos }
429 1.1.1.2 christos }
430 1.1.1.2 christos #ifdef HAVE_SETPROCTITLE
431 1.1.1.2 christos setproctitle("dnstap_collector");
432 1.1.1.2 christos #endif
433 1.1.1.2 christos /* Free serve process specific memory pages */
434 1.1.1.2 christos #ifdef RATELIMIT
435 1.1.1.2 christos rrl_mmap_deinit_keep_mmap();
436 1.1.1.2 christos #endif
437 1.1.1.2 christos udb_base_free_keep_mmap(nsd->task[0]);
438 1.1.1.2 christos udb_base_free_keep_mmap(nsd->task[1]);
439 1.1.1.2 christos namedb_close(nsd->db);
440 1.1.1.2 christos
441 1.1 prlw1 dt_collector_run(dt_col, nsd);
442 1.1 prlw1 /* NOTREACH */
443 1.1 prlw1 exit(0);
444 1.1 prlw1 } else {
445 1.1 prlw1 /* the parent continues on, with starting NSD */
446 1.1 prlw1 /* close the dt side of the command channel */
447 1.1 prlw1 close(dt_col->cmd_socket_dt);
448 1.1 prlw1 dt_col->cmd_socket_dt = -1;
449 1.1.1.2 christos
450 1.1.1.2 christos /* close the receive side of the communication channels */
451 1.1.1.2 christos for(i=0; i<dt_col->count; i++) {
452 1.1.1.2 christos if(nsd->dt_collector_fd_recv[i] != -1) {
453 1.1.1.2 christos close(nsd->dt_collector_fd_recv[i]);
454 1.1.1.2 christos nsd->dt_collector_fd_recv[i] = -1;
455 1.1.1.2 christos }
456 1.1.1.2 christos }
457 1.1 prlw1 }
458 1.1 prlw1 }
459 1.1 prlw1
460 1.1 prlw1 /* put data for sending to the collector process into the buffer */
461 1.1 prlw1 static int
462 1.1 prlw1 prep_send_data(struct buffer* buf, uint8_t is_response,
463 1.1 prlw1 #ifdef INET6
464 1.1.1.2 christos struct sockaddr_storage* local_addr,
465 1.1 prlw1 struct sockaddr_storage* addr,
466 1.1 prlw1 #else
467 1.1.1.2 christos struct sockaddr_in* local_addr,
468 1.1 prlw1 struct sockaddr_in* addr,
469 1.1 prlw1 #endif
470 1.1 prlw1 socklen_t addrlen, int is_tcp, struct buffer* packet,
471 1.1 prlw1 struct zone* zone)
472 1.1 prlw1 {
473 1.1 prlw1 buffer_clear(buf);
474 1.1.1.2 christos #ifdef INET6
475 1.1.1.2 christos if(local_addr->ss_family != addr->ss_family)
476 1.1.1.2 christos return 0; /* must be same length to send */
477 1.1.1.2 christos #else
478 1.1.1.2 christos if(local_addr->sin_family != addr->sin_family)
479 1.1.1.2 christos return 0; /* must be same length to send */
480 1.1.1.2 christos #endif
481 1.1.1.2 christos if(!buffer_available(buf, 4+1+4+2*addrlen+1+4+buffer_remaining(packet)))
482 1.1 prlw1 return 0; /* does not fit in send_buffer, log is dropped */
483 1.1 prlw1 buffer_skip(buf, 4); /* the length of the message goes here */
484 1.1 prlw1 buffer_write_u8(buf, is_response);
485 1.1 prlw1 buffer_write_u32(buf, addrlen);
486 1.1.1.2 christos buffer_write(buf, local_addr, (size_t)addrlen);
487 1.1 prlw1 buffer_write(buf, addr, (size_t)addrlen);
488 1.1 prlw1 buffer_write_u8(buf, (is_tcp?1:0));
489 1.1 prlw1 buffer_write_u32(buf, buffer_remaining(packet));
490 1.1 prlw1 buffer_write(buf, buffer_begin(packet), buffer_remaining(packet));
491 1.1 prlw1 if(zone && zone->apex && domain_dname(zone->apex)) {
492 1.1 prlw1 if(!buffer_available(buf, 4 + domain_dname(zone->apex)->name_size))
493 1.1 prlw1 return 0;
494 1.1 prlw1 buffer_write_u32(buf, domain_dname(zone->apex)->name_size);
495 1.1 prlw1 buffer_write(buf, dname_name(domain_dname(zone->apex)),
496 1.1 prlw1 domain_dname(zone->apex)->name_size);
497 1.1 prlw1 } else {
498 1.1 prlw1 if(!buffer_available(buf, 4))
499 1.1 prlw1 return 0;
500 1.1 prlw1 buffer_write_u32(buf, 0);
501 1.1 prlw1 }
502 1.1 prlw1
503 1.1 prlw1 buffer_flip(buf);
504 1.1 prlw1 /* write length of message */
505 1.1 prlw1 buffer_write_u32_at(buf, 0, buffer_remaining(buf)-4);
506 1.1 prlw1 return 1;
507 1.1 prlw1 }
508 1.1 prlw1
509 1.1.1.2 christos /* attempt to send buffer to socket, if it blocks do not send it.
510 1.1.1.2 christos * return 0 on success, -1 on error */
511 1.1.1.2 christos static int attempt_to_send(int s, uint8_t* data, size_t len)
512 1.1 prlw1 {
513 1.1 prlw1 ssize_t r;
514 1.1.1.2 christos if(len == 0)
515 1.1.1.2 christos return 0;
516 1.1.1.2 christos r = send(s, data, len, MSG_DONTWAIT | MSG_NOSIGNAL);
517 1.1.1.2 christos if(r == -1) {
518 1.1.1.2 christos if(errno == EAGAIN || errno == EINTR ||
519 1.1.1.2 christos errno == ENOBUFS || errno == EMSGSIZE) {
520 1.1.1.2 christos /* check if pipe is full, if the nonblocking fd blocks,
521 1.1.1.2 christos * then drop the message */
522 1.1.1.2 christos return 0;
523 1.1 prlw1 }
524 1.1.1.2 christos /* some sort of error, print it */
525 1.1.1.2 christos log_msg(LOG_ERR, "dnstap collector: send failed: %s",
526 1.1.1.2 christos strerror(errno));
527 1.1.1.2 christos return -1;
528 1.1 prlw1 }
529 1.1.1.2 christos assert(r > 0);
530 1.1.1.2 christos if(r > 0) {
531 1.1.1.2 christos assert((size_t)r == len);
532 1.1.1.2 christos return 0;
533 1.1.1.2 christos }
534 1.1.1.2 christos /* Other end closed the channel? */
535 1.1.1.2 christos log_msg(LOG_ERR, "dnstap collector: server child closed the channel");
536 1.1.1.2 christos return -1;
537 1.1 prlw1 }
538 1.1 prlw1
539 1.1 prlw1 void dt_collector_submit_auth_query(struct nsd* nsd,
540 1.1 prlw1 #ifdef INET6
541 1.1.1.2 christos struct sockaddr_storage* local_addr,
542 1.1 prlw1 struct sockaddr_storage* addr,
543 1.1 prlw1 #else
544 1.1.1.2 christos struct sockaddr_in* local_addr,
545 1.1 prlw1 struct sockaddr_in* addr,
546 1.1 prlw1 #endif
547 1.1 prlw1 socklen_t addrlen, int is_tcp, struct buffer* packet)
548 1.1 prlw1 {
549 1.1 prlw1 if(!nsd->dt_collector) return;
550 1.1 prlw1 if(!nsd->options->dnstap_log_auth_query_messages) return;
551 1.1.1.2 christos if(nsd->dt_collector_fd_send[nsd->this_child->child_num] == -1) return;
552 1.1 prlw1 VERBOSITY(4, (LOG_INFO, "dnstap submit auth query"));
553 1.1 prlw1
554 1.1 prlw1 /* marshal data into send buffer */
555 1.1.1.2 christos if(!prep_send_data(nsd->dt_collector->send_buffer, 0, local_addr, addr, addrlen,
556 1.1 prlw1 is_tcp, packet, NULL))
557 1.1 prlw1 return; /* probably did not fit in buffer */
558 1.1 prlw1
559 1.1 prlw1 /* attempt to send data; do not block */
560 1.1.1.2 christos if(attempt_to_send(nsd->dt_collector_fd_send[nsd->this_child->child_num],
561 1.1.1.2 christos buffer_begin(nsd->dt_collector->send_buffer),
562 1.1.1.2 christos buffer_remaining(nsd->dt_collector->send_buffer))) {
563 1.1.1.2 christos /* Something went wrong sending to the socket. Don't send to
564 1.1.1.2 christos * this socket again. */
565 1.1.1.2 christos close(nsd->dt_collector_fd_send[nsd->this_child->child_num]);
566 1.1.1.2 christos nsd->dt_collector_fd_send[nsd->this_child->child_num] = -1;
567 1.1.1.2 christos }
568 1.1 prlw1 }
569 1.1 prlw1
570 1.1 prlw1 void dt_collector_submit_auth_response(struct nsd* nsd,
571 1.1 prlw1 #ifdef INET6
572 1.1.1.2 christos struct sockaddr_storage* local_addr,
573 1.1 prlw1 struct sockaddr_storage* addr,
574 1.1 prlw1 #else
575 1.1.1.2 christos struct sockaddr_in* local_addr,
576 1.1 prlw1 struct sockaddr_in* addr,
577 1.1 prlw1 #endif
578 1.1 prlw1 socklen_t addrlen, int is_tcp, struct buffer* packet,
579 1.1 prlw1 struct zone* zone)
580 1.1 prlw1 {
581 1.1 prlw1 if(!nsd->dt_collector) return;
582 1.1 prlw1 if(!nsd->options->dnstap_log_auth_response_messages) return;
583 1.1.1.2 christos if(nsd->dt_collector_fd_send[nsd->this_child->child_num] == -1) return;
584 1.1 prlw1 VERBOSITY(4, (LOG_INFO, "dnstap submit auth response"));
585 1.1 prlw1
586 1.1 prlw1 /* marshal data into send buffer */
587 1.1.1.2 christos if(!prep_send_data(nsd->dt_collector->send_buffer, 1, local_addr, addr, addrlen,
588 1.1 prlw1 is_tcp, packet, zone))
589 1.1 prlw1 return; /* probably did not fit in buffer */
590 1.1 prlw1
591 1.1 prlw1 /* attempt to send data; do not block */
592 1.1.1.2 christos if(attempt_to_send(nsd->dt_collector_fd_send[nsd->this_child->child_num],
593 1.1.1.2 christos buffer_begin(nsd->dt_collector->send_buffer),
594 1.1.1.2 christos buffer_remaining(nsd->dt_collector->send_buffer))) {
595 1.1.1.2 christos /* Something went wrong sending to the socket. Don't send to
596 1.1.1.2 christos * this socket again. */
597 1.1.1.2 christos close(nsd->dt_collector_fd_send[nsd->this_child->child_num]);
598 1.1.1.2 christos nsd->dt_collector_fd_send[nsd->this_child->child_num] = -1;
599 1.1.1.2 christos }
600 1.1 prlw1 }
601