1 1.1 prlw1 /* 2 1.1 prlw1 * dnstap/dnstap_collector.c -- nsd collector process for dnstap information 3 1.1 prlw1 * 4 1.1 prlw1 * Copyright (c) 2018, NLnet Labs. All rights reserved. 5 1.1 prlw1 * 6 1.1 prlw1 * See LICENSE for the license. 7 1.1 prlw1 * 8 1.1 prlw1 */ 9 1.1 prlw1 10 1.1 prlw1 #include "config.h" 11 1.1 prlw1 #include <sys/types.h> 12 1.1 prlw1 #include <sys/socket.h> 13 1.1 prlw1 #include <errno.h> 14 1.1 prlw1 #include <fcntl.h> 15 1.1 prlw1 #include <unistd.h> 16 1.1 prlw1 #ifndef USE_MINI_EVENT 17 1.1 prlw1 # ifdef HAVE_EVENT_H 18 1.1 prlw1 # include <event.h> 19 1.1 prlw1 # else 20 1.1 prlw1 # include <event2/event.h> 21 1.1 prlw1 # include "event2/event_struct.h" 22 1.1 prlw1 # include "event2/event_compat.h" 23 1.1 prlw1 # endif 24 1.1 prlw1 #else 25 1.1 prlw1 # include "mini_event.h" 26 1.1 prlw1 #endif 27 1.1 prlw1 #include "dnstap/dnstap_collector.h" 28 1.1 prlw1 #include "dnstap/dnstap.h" 29 1.1 prlw1 #include "util.h" 30 1.1 prlw1 #include "nsd.h" 31 1.1 prlw1 #include "region-allocator.h" 32 1.1 prlw1 #include "buffer.h" 33 1.1 prlw1 #include "namedb.h" 34 1.1 prlw1 #include "options.h" 35 1.1.1.3 christos #include "remote.h" 36 1.1 prlw1 37 1.1.1.2 christos #include "udb.h" 38 1.1.1.2 christos #include "rrl.h" 39 1.1.1.2 christos 40 1.1 prlw1 struct dt_collector* dt_collector_create(struct nsd* nsd) 41 1.1 prlw1 { 42 1.1 prlw1 int i, sv[2]; 43 1.1 prlw1 struct dt_collector* dt_col = (struct dt_collector*)xalloc_zero( 44 1.1 prlw1 sizeof(*dt_col)); 45 1.1.1.2 christos dt_col->count = nsd->child_count * 2; 46 1.1 prlw1 dt_col->dt_env = NULL; 47 1.1 prlw1 dt_col->region = region_create(xalloc, free); 48 1.1 prlw1 dt_col->send_buffer = buffer_create(dt_col->region, 49 1.1.1.2 christos /* msglen + is_response + addrlen + is_tcp + packetlen + packet + zonelen + zone + spare + local_addr + addr */ 50 1.1 prlw1 4+1+4+1+4+TCP_MAX_MESSAGE_LEN+4+MAXHOSTNAMELEN + 32 + 51 1.1 prlw1 #ifdef INET6 52 1.1.1.2 christos sizeof(struct sockaddr_storage) + sizeof(struct sockaddr_storage) 53 1.1 prlw1 #else 54 1.1.1.2 christos sizeof(struct sockaddr_in) + sizeof(struct sockaddr_in) 55 1.1 prlw1 #endif 56 1.1 prlw1 ); 57 1.1 prlw1 58 1.1.1.2 christos /* open communication channels in struct nsd */ 59 1.1 prlw1 nsd->dt_collector_fd_send = (int*)xalloc_array_zero(dt_col->count, 60 1.1 prlw1 sizeof(int)); 61 1.1 prlw1 nsd->dt_collector_fd_recv = (int*)xalloc_array_zero(dt_col->count, 62 1.1 prlw1 sizeof(int)); 63 1.1 prlw1 for(i=0; i<dt_col->count; i++) { 64 1.1.1.2 christos int sv[2]; 65 1.1.1.2 christos int bufsz = buffer_capacity(dt_col->send_buffer); 66 1.1.1.2 christos sv[0] = -1; /* For receiving by parent (dnstap-collector) */ 67 1.1.1.2 christos sv[1] = -1; /* For sending by child (server childs) */ 68 1.1.1.3 christos if(socketpair(AF_UNIX, SOCK_DGRAM 69 1.1.1.3 christos #ifdef SOCK_NONBLOCK 70 1.1.1.3 christos | SOCK_NONBLOCK 71 1.1.1.3 christos #endif 72 1.1.1.3 christos , 0, sv) < 0) { 73 1.1.1.2 christos error("dnstap_collector: cannot create communication channel: %s", 74 1.1 prlw1 strerror(errno)); 75 1.1 prlw1 } 76 1.1.1.3 christos #ifndef SOCK_NONBLOCK 77 1.1.1.3 christos if (fcntl(sv[0], F_SETFL, O_NONBLOCK) == -1) { 78 1.1.1.3 christos log_msg(LOG_ERR, "dnstap_collector receive fd fcntl " 79 1.1.1.3 christos "failed: %s", strerror(errno)); 80 1.1.1.3 christos } 81 1.1.1.3 christos if (fcntl(sv[1], F_SETFL, O_NONBLOCK) == -1) { 82 1.1.1.3 christos log_msg(LOG_ERR, "dnstap_collector send fd fcntl " 83 1.1.1.3 christos "failed: %s", strerror(errno)); 84 1.1.1.3 christos } 85 1.1.1.3 christos #endif 86 1.1.1.2 christos if(setsockopt(sv[0], SOL_SOCKET, SO_RCVBUF, &bufsz, sizeof(bufsz))) { 87 1.1.1.2 christos log_msg(LOG_ERR, "setting dnstap_collector " 88 1.1.1.2 christos "receive buffer size failed: %s", strerror(errno)); 89 1.1 prlw1 } 90 1.1.1.2 christos if(setsockopt(sv[1], SOL_SOCKET, SO_SNDBUF, &bufsz, sizeof(bufsz))) { 91 1.1.1.2 christos log_msg(LOG_ERR, "setting dnstap_collector " 92 1.1.1.2 christos "send buffer size failed: %s", strerror(errno)); 93 1.1 prlw1 } 94 1.1.1.2 christos nsd->dt_collector_fd_recv[i] = sv[0]; 95 1.1.1.2 christos nsd->dt_collector_fd_send[i] = sv[1]; 96 1.1 prlw1 } 97 1.1.1.2 christos nsd->dt_collector_fd_swap = nsd->dt_collector_fd_send + nsd->child_count; 98 1.1 prlw1 99 1.1 prlw1 /* open socketpair */ 100 1.1 prlw1 if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) { 101 1.1 prlw1 error("dnstap_collector: cannot create socketpair: %s", 102 1.1 prlw1 strerror(errno)); 103 1.1 prlw1 } 104 1.1 prlw1 if(fcntl(sv[0], F_SETFL, O_NONBLOCK) == -1) { 105 1.1 prlw1 log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno)); 106 1.1 prlw1 } 107 1.1 prlw1 if(fcntl(sv[1], F_SETFL, O_NONBLOCK) == -1) { 108 1.1 prlw1 log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno)); 109 1.1 prlw1 } 110 1.1 prlw1 dt_col->cmd_socket_dt = sv[0]; 111 1.1 prlw1 dt_col->cmd_socket_nsd = sv[1]; 112 1.1 prlw1 113 1.1 prlw1 return dt_col; 114 1.1 prlw1 } 115 1.1 prlw1 116 1.1 prlw1 void dt_collector_destroy(struct dt_collector* dt_col, struct nsd* nsd) 117 1.1 prlw1 { 118 1.1 prlw1 if(!dt_col) return; 119 1.1 prlw1 free(nsd->dt_collector_fd_recv); 120 1.1 prlw1 nsd->dt_collector_fd_recv = NULL; 121 1.1.1.2 christos if (nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap) 122 1.1.1.2 christos free(nsd->dt_collector_fd_send); 123 1.1.1.2 christos else 124 1.1.1.2 christos free(nsd->dt_collector_fd_swap); 125 1.1 prlw1 nsd->dt_collector_fd_send = NULL; 126 1.1.1.2 christos nsd->dt_collector_fd_swap = NULL; 127 1.1 prlw1 region_destroy(dt_col->region); 128 1.1 prlw1 free(dt_col); 129 1.1 prlw1 } 130 1.1 prlw1 131 1.1 prlw1 void dt_collector_close(struct dt_collector* dt_col, struct nsd* nsd) 132 1.1 prlw1 { 133 1.1.1.2 christos int i, *fd_send; 134 1.1 prlw1 if(!dt_col) return; 135 1.1 prlw1 if(dt_col->cmd_socket_dt != -1) { 136 1.1 prlw1 close(dt_col->cmd_socket_dt); 137 1.1 prlw1 dt_col->cmd_socket_dt = -1; 138 1.1 prlw1 } 139 1.1 prlw1 if(dt_col->cmd_socket_nsd != -1) { 140 1.1 prlw1 close(dt_col->cmd_socket_nsd); 141 1.1 prlw1 dt_col->cmd_socket_nsd = -1; 142 1.1 prlw1 } 143 1.1.1.2 christos fd_send = nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap 144 1.1.1.2 christos ? nsd->dt_collector_fd_send : nsd->dt_collector_fd_swap; 145 1.1 prlw1 for(i=0; i<dt_col->count; i++) { 146 1.1 prlw1 if(nsd->dt_collector_fd_recv[i] != -1) { 147 1.1 prlw1 close(nsd->dt_collector_fd_recv[i]); 148 1.1 prlw1 nsd->dt_collector_fd_recv[i] = -1; 149 1.1 prlw1 } 150 1.1.1.2 christos if(fd_send[i] != -1) { 151 1.1.1.2 christos close(fd_send[i]); 152 1.1.1.2 christos fd_send[i] = -1; 153 1.1 prlw1 } 154 1.1 prlw1 } 155 1.1 prlw1 } 156 1.1 prlw1 157 1.1 prlw1 /* handle command from nsd to dt collector. 158 1.1 prlw1 * mostly, check for fd closed, this means we have to exit */ 159 1.1 prlw1 void 160 1.1 prlw1 dt_handle_cmd_from_nsd(int ATTR_UNUSED(fd), short event, void* arg) 161 1.1 prlw1 { 162 1.1 prlw1 struct dt_collector* dt_col = (struct dt_collector*)arg; 163 1.1 prlw1 if((event&EV_READ) != 0) { 164 1.1 prlw1 event_base_loopexit(dt_col->event_base, NULL); 165 1.1 prlw1 } 166 1.1 prlw1 } 167 1.1 prlw1 168 1.1.1.2 christos /* receive data from fd into buffer, 1 when message received, -1 on error */ 169 1.1.1.2 christos static int recv_into_buffer(int fd, struct buffer* buf) 170 1.1 prlw1 { 171 1.1 prlw1 size_t msglen; 172 1.1 prlw1 ssize_t r; 173 1.1 prlw1 174 1.1.1.2 christos assert(buffer_position(buf) == 0); 175 1.1.1.2 christos r = recv(fd, buffer_current(buf), buffer_capacity(buf), MSG_DONTWAIT); 176 1.1 prlw1 if(r == -1) { 177 1.1.1.2 christos if(errno == EAGAIN || errno == EINTR || errno == EMSGSIZE) { 178 1.1.1.2 christos /* continue to receive a message later */ 179 1.1 prlw1 return 0; 180 1.1 prlw1 } 181 1.1.1.2 christos log_msg(LOG_ERR, "dnstap collector: receive failed: %s", 182 1.1 prlw1 strerror(errno)); 183 1.1.1.2 christos return -1; 184 1.1.1.2 christos } 185 1.1.1.2 christos if(r == 0) { 186 1.1.1.2 christos /* Remote end closed the connection? */ 187 1.1.1.2 christos log_msg(LOG_ERR, "dnstap collector: remote closed connection"); 188 1.1.1.2 christos return -1; 189 1.1.1.2 christos } 190 1.1.1.2 christos assert(r > 4); 191 1.1.1.2 christos msglen = buffer_read_u32_at(buf, 0); 192 1.1.1.2 christos if(msglen != (size_t)(r - 4)) { 193 1.1.1.2 christos /* Is this still possible now the communication channel is of 194 1.1.1.2 christos * type SOCK_DGRAM? I think not, but better safe than sorry. */ 195 1.1.1.2 christos log_msg(LOG_ERR, "dnstap collector: out of sync (msglen: %u)", 196 1.1.1.2 christos (unsigned int) msglen); 197 1.1 prlw1 return 0; 198 1.1 prlw1 } 199 1.1 prlw1 buffer_skip(buf, r); 200 1.1 prlw1 buffer_flip(buf); 201 1.1 prlw1 return 1; 202 1.1 prlw1 } 203 1.1 prlw1 204 1.1 prlw1 /* submit the content of the buffer received to dnstap */ 205 1.1 prlw1 static void 206 1.1 prlw1 dt_submit_content(struct dt_env* dt_env, struct buffer* buf) 207 1.1 prlw1 { 208 1.1 prlw1 uint8_t is_response, is_tcp; 209 1.1 prlw1 #ifdef INET6 210 1.1.1.2 christos struct sockaddr_storage local_addr, addr; 211 1.1 prlw1 #else 212 1.1.1.2 christos struct sockaddr_in local_addr, addr; 213 1.1 prlw1 #endif 214 1.1 prlw1 socklen_t addrlen; 215 1.1 prlw1 size_t pktlen; 216 1.1 prlw1 uint8_t* data; 217 1.1 prlw1 size_t zonelen; 218 1.1 prlw1 uint8_t* zone; 219 1.1 prlw1 220 1.1 prlw1 /* parse content from buffer */ 221 1.1 prlw1 if(!buffer_available(buf, 4+1+4)) return; 222 1.1 prlw1 buffer_skip(buf, 4); /* skip msglen */ 223 1.1 prlw1 is_response = buffer_read_u8(buf); 224 1.1 prlw1 addrlen = buffer_read_u32(buf); 225 1.1.1.2 christos if(addrlen > sizeof(local_addr) || addrlen > sizeof(addr)) return; 226 1.1.1.2 christos if(!buffer_available(buf, 2*addrlen)) return; 227 1.1.1.2 christos buffer_read(buf, &local_addr, addrlen); 228 1.1 prlw1 buffer_read(buf, &addr, addrlen); 229 1.1 prlw1 if(!buffer_available(buf, 1+4)) return; 230 1.1 prlw1 is_tcp = buffer_read_u8(buf); 231 1.1 prlw1 pktlen = buffer_read_u32(buf); 232 1.1 prlw1 if(!buffer_available(buf, pktlen)) return; 233 1.1 prlw1 data = buffer_current(buf); 234 1.1 prlw1 buffer_skip(buf, pktlen); 235 1.1 prlw1 if(!buffer_available(buf, 4)) return; 236 1.1 prlw1 zonelen = buffer_read_u32(buf); 237 1.1 prlw1 if(zonelen == 0) { 238 1.1 prlw1 zone = NULL; 239 1.1 prlw1 } else { 240 1.1 prlw1 if(zonelen > MAXDOMAINLEN) return; 241 1.1 prlw1 if(!buffer_available(buf, zonelen)) return; 242 1.1 prlw1 zone = buffer_current(buf); 243 1.1 prlw1 buffer_skip(buf, zonelen); 244 1.1 prlw1 } 245 1.1 prlw1 246 1.1 prlw1 /* submit it */ 247 1.1 prlw1 if(is_response) { 248 1.1.1.2 christos dt_msg_send_auth_response(dt_env, &local_addr, &addr, is_tcp, zone, 249 1.1 prlw1 zonelen, data, pktlen); 250 1.1 prlw1 } else { 251 1.1.1.2 christos dt_msg_send_auth_query(dt_env, &local_addr, &addr, is_tcp, zone, 252 1.1 prlw1 zonelen, data, pktlen); 253 1.1 prlw1 } 254 1.1 prlw1 } 255 1.1 prlw1 256 1.1 prlw1 /* handle input from worker for dnstap */ 257 1.1 prlw1 void 258 1.1 prlw1 dt_handle_input(int fd, short event, void* arg) 259 1.1 prlw1 { 260 1.1 prlw1 struct dt_collector_input* dt_input = (struct dt_collector_input*)arg; 261 1.1 prlw1 if((event&EV_READ) != 0) { 262 1.1.1.2 christos /* receive */ 263 1.1.1.2 christos int r = recv_into_buffer(fd, dt_input->buffer); 264 1.1.1.2 christos if(r == 0) 265 1.1 prlw1 return; 266 1.1.1.2 christos else if(r < 0) { 267 1.1.1.2 christos event_base_loopexit(dt_input->dt_collector->event_base, NULL); 268 1.1.1.2 christos return; 269 1.1.1.2 christos } 270 1.1.1.2 christos /* once data is complete, send it to dnstap */ 271 1.1 prlw1 VERBOSITY(4, (LOG_INFO, "dnstap collector: received msg len %d", 272 1.1 prlw1 (int)buffer_remaining(dt_input->buffer))); 273 1.1 prlw1 if(dt_input->dt_collector->dt_env) { 274 1.1 prlw1 dt_submit_content(dt_input->dt_collector->dt_env, 275 1.1 prlw1 dt_input->buffer); 276 1.1 prlw1 } 277 1.1 prlw1 278 1.1 prlw1 /* clear buffer for next message */ 279 1.1 prlw1 buffer_clear(dt_input->buffer); 280 1.1 prlw1 } 281 1.1 prlw1 } 282 1.1 prlw1 283 1.1 prlw1 /* init dnstap */ 284 1.1 prlw1 static void dt_init_dnstap(struct dt_collector* dt_col, struct nsd* nsd) 285 1.1 prlw1 { 286 1.1 prlw1 int num_workers = 1; 287 1.1 prlw1 #ifdef HAVE_CHROOT 288 1.1 prlw1 if(nsd->chrootdir && nsd->chrootdir[0]) { 289 1.1 prlw1 int l = strlen(nsd->chrootdir)-1; /* ends in trailing slash */ 290 1.1 prlw1 if (nsd->options->dnstap_socket_path && 291 1.1 prlw1 nsd->options->dnstap_socket_path[0] == '/' && 292 1.1 prlw1 strncmp(nsd->options->dnstap_socket_path, 293 1.1 prlw1 nsd->chrootdir, l) == 0) 294 1.1 prlw1 nsd->options->dnstap_socket_path += l; 295 1.1 prlw1 } 296 1.1 prlw1 #endif 297 1.1.1.3 christos dt_col->dt_env = dt_create(nsd->options->dnstap_socket_path, 298 1.1.1.3 christos nsd->options->dnstap_ip, num_workers, nsd->options->dnstap_tls, 299 1.1.1.3 christos nsd->options->dnstap_tls_server_name, 300 1.1.1.3 christos nsd->options->dnstap_tls_cert_bundle, 301 1.1.1.3 christos nsd->options->dnstap_tls_client_key_file, 302 1.1.1.3 christos nsd->options->dnstap_tls_client_cert_file); 303 1.1 prlw1 if(!dt_col->dt_env) { 304 1.1 prlw1 log_msg(LOG_ERR, "could not create dnstap env"); 305 1.1 prlw1 return; 306 1.1 prlw1 } 307 1.1 prlw1 dt_apply_cfg(dt_col->dt_env, nsd->options); 308 1.1 prlw1 dt_init(dt_col->dt_env); 309 1.1 prlw1 } 310 1.1 prlw1 311 1.1 prlw1 /* cleanup dt collector process for exit */ 312 1.1 prlw1 static void dt_collector_cleanup(struct dt_collector* dt_col, struct nsd* nsd) 313 1.1 prlw1 { 314 1.1 prlw1 int i; 315 1.1 prlw1 dt_delete(dt_col->dt_env); 316 1.1 prlw1 event_del(dt_col->cmd_event); 317 1.1 prlw1 for(i=0; i<dt_col->count; i++) { 318 1.1 prlw1 event_del(dt_col->inputs[i].event); 319 1.1 prlw1 } 320 1.1 prlw1 dt_collector_close(dt_col, nsd); 321 1.1 prlw1 event_base_free(dt_col->event_base); 322 1.1 prlw1 #ifdef MEMCLEAN 323 1.1 prlw1 free(dt_col->cmd_event); 324 1.1 prlw1 if(dt_col->inputs) { 325 1.1 prlw1 for(i=0; i<dt_col->count; i++) { 326 1.1 prlw1 free(dt_col->inputs[i].event); 327 1.1 prlw1 } 328 1.1 prlw1 free(dt_col->inputs); 329 1.1 prlw1 } 330 1.1 prlw1 dt_collector_destroy(dt_col, nsd); 331 1.1.1.3 christos daemon_remote_delete(nsd->rc); /* ssl-delete secret keys */ 332 1.1.1.3 christos nsd_options_destroy(nsd->options); 333 1.1.1.3 christos region_destroy(nsd->region); 334 1.1 prlw1 #endif 335 1.1 prlw1 } 336 1.1 prlw1 337 1.1 prlw1 /* attach events to the event base to listen to the workers and cmd channel */ 338 1.1 prlw1 static void dt_attach_events(struct dt_collector* dt_col, struct nsd* nsd) 339 1.1 prlw1 { 340 1.1 prlw1 int i; 341 1.1 prlw1 /* create event base */ 342 1.1 prlw1 dt_col->event_base = nsd_child_event_base(); 343 1.1 prlw1 if(!dt_col->event_base) { 344 1.1 prlw1 error("dnstap collector: event_base create failed"); 345 1.1 prlw1 } 346 1.1 prlw1 347 1.1 prlw1 /* add command handler */ 348 1.1 prlw1 dt_col->cmd_event = (struct event*)xalloc_zero( 349 1.1 prlw1 sizeof(*dt_col->cmd_event)); 350 1.1 prlw1 event_set(dt_col->cmd_event, dt_col->cmd_socket_dt, 351 1.1 prlw1 EV_PERSIST|EV_READ, dt_handle_cmd_from_nsd, dt_col); 352 1.1 prlw1 if(event_base_set(dt_col->event_base, dt_col->cmd_event) != 0) 353 1.1 prlw1 log_msg(LOG_ERR, "dnstap collector: event_base_set failed"); 354 1.1 prlw1 if(event_add(dt_col->cmd_event, NULL) != 0) 355 1.1 prlw1 log_msg(LOG_ERR, "dnstap collector: event_add failed"); 356 1.1 prlw1 357 1.1 prlw1 /* add worker input handlers */ 358 1.1 prlw1 dt_col->inputs = xalloc_array_zero(dt_col->count, 359 1.1 prlw1 sizeof(*dt_col->inputs)); 360 1.1 prlw1 for(i=0; i<dt_col->count; i++) { 361 1.1 prlw1 dt_col->inputs[i].dt_collector = dt_col; 362 1.1 prlw1 dt_col->inputs[i].event = (struct event*)xalloc_zero( 363 1.1 prlw1 sizeof(struct event)); 364 1.1 prlw1 event_set(dt_col->inputs[i].event, 365 1.1 prlw1 nsd->dt_collector_fd_recv[i], EV_PERSIST|EV_READ, 366 1.1 prlw1 dt_handle_input, &dt_col->inputs[i]); 367 1.1 prlw1 if(event_base_set(dt_col->event_base, 368 1.1 prlw1 dt_col->inputs[i].event) != 0) 369 1.1 prlw1 log_msg(LOG_ERR, "dnstap collector: event_base_set failed"); 370 1.1 prlw1 if(event_add(dt_col->inputs[i].event, NULL) != 0) 371 1.1 prlw1 log_msg(LOG_ERR, "dnstap collector: event_add failed"); 372 1.1 prlw1 373 1.1 prlw1 dt_col->inputs[i].buffer = buffer_create(dt_col->region, 374 1.1.1.2 christos /* msglen + is_response + addrlen + is_tcp + packetlen + packet + zonelen + zone + spare + local_addr + addr */ 375 1.1 prlw1 4+1+4+1+4+TCP_MAX_MESSAGE_LEN+4+MAXHOSTNAMELEN + 32 + 376 1.1 prlw1 #ifdef INET6 377 1.1.1.2 christos sizeof(struct sockaddr_storage) + sizeof(struct sockaddr_storage) 378 1.1 prlw1 #else 379 1.1.1.2 christos sizeof(struct sockaddr_in) + sizeof(struct sockaddr_in) 380 1.1 prlw1 #endif 381 1.1 prlw1 ); 382 1.1 prlw1 assert(buffer_capacity(dt_col->inputs[i].buffer) == 383 1.1 prlw1 buffer_capacity(dt_col->send_buffer)); 384 1.1 prlw1 } 385 1.1 prlw1 } 386 1.1 prlw1 387 1.1 prlw1 /* the dnstap collector process main routine */ 388 1.1 prlw1 static void dt_collector_run(struct dt_collector* dt_col, struct nsd* nsd) 389 1.1 prlw1 { 390 1.1 prlw1 /* init dnstap */ 391 1.1 prlw1 VERBOSITY(1, (LOG_INFO, "dnstap collector started")); 392 1.1 prlw1 dt_init_dnstap(dt_col, nsd); 393 1.1 prlw1 dt_attach_events(dt_col, nsd); 394 1.1 prlw1 395 1.1 prlw1 /* run */ 396 1.1 prlw1 if(event_base_loop(dt_col->event_base, 0) == -1) { 397 1.1 prlw1 error("dnstap collector: event_base_loop failed"); 398 1.1 prlw1 } 399 1.1 prlw1 400 1.1 prlw1 /* cleanup and done */ 401 1.1 prlw1 VERBOSITY(1, (LOG_INFO, "dnstap collector stopped")); 402 1.1 prlw1 dt_collector_cleanup(dt_col, nsd); 403 1.1 prlw1 exit(0); 404 1.1 prlw1 } 405 1.1 prlw1 406 1.1 prlw1 void dt_collector_start(struct dt_collector* dt_col, struct nsd* nsd) 407 1.1 prlw1 { 408 1.1.1.2 christos int i, *fd_send; 409 1.1 prlw1 /* fork */ 410 1.1 prlw1 dt_col->dt_pid = fork(); 411 1.1 prlw1 if(dt_col->dt_pid == -1) { 412 1.1 prlw1 error("dnstap_collector: fork failed: %s", strerror(errno)); 413 1.1 prlw1 } 414 1.1 prlw1 if(dt_col->dt_pid == 0) { 415 1.1 prlw1 /* the dt collector process is this */ 416 1.1 prlw1 /* close the nsd side of the command channel */ 417 1.1 prlw1 close(dt_col->cmd_socket_nsd); 418 1.1 prlw1 dt_col->cmd_socket_nsd = -1; 419 1.1.1.2 christos 420 1.1.1.2 christos /* close the send side of the communication channels */ 421 1.1.1.2 christos assert(nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap); 422 1.1.1.2 christos fd_send = nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap 423 1.1.1.2 christos ? nsd->dt_collector_fd_send : nsd->dt_collector_fd_swap; 424 1.1.1.2 christos for(i=0; i<dt_col->count; i++) { 425 1.1.1.2 christos if(fd_send[i] != -1) { 426 1.1.1.2 christos close(fd_send[i]); 427 1.1.1.2 christos fd_send[i] = -1; 428 1.1.1.2 christos } 429 1.1.1.2 christos } 430 1.1.1.2 christos #ifdef HAVE_SETPROCTITLE 431 1.1.1.2 christos setproctitle("dnstap_collector"); 432 1.1.1.2 christos #endif 433 1.1.1.4 christos #ifdef USE_LOG_PROCESS_ROLE 434 1.1.1.4 christos log_set_process_role("dnstap_collector"); 435 1.1.1.4 christos #endif 436 1.1.1.2 christos /* Free serve process specific memory pages */ 437 1.1.1.2 christos #ifdef RATELIMIT 438 1.1.1.2 christos rrl_mmap_deinit_keep_mmap(); 439 1.1.1.2 christos #endif 440 1.1.1.2 christos udb_base_free_keep_mmap(nsd->task[0]); 441 1.1.1.2 christos udb_base_free_keep_mmap(nsd->task[1]); 442 1.1.1.2 christos namedb_close(nsd->db); 443 1.1.1.2 christos 444 1.1 prlw1 dt_collector_run(dt_col, nsd); 445 1.1 prlw1 /* NOTREACH */ 446 1.1 prlw1 exit(0); 447 1.1 prlw1 } else { 448 1.1 prlw1 /* the parent continues on, with starting NSD */ 449 1.1 prlw1 /* close the dt side of the command channel */ 450 1.1 prlw1 close(dt_col->cmd_socket_dt); 451 1.1 prlw1 dt_col->cmd_socket_dt = -1; 452 1.1.1.2 christos 453 1.1.1.2 christos /* close the receive side of the communication channels */ 454 1.1.1.2 christos for(i=0; i<dt_col->count; i++) { 455 1.1.1.2 christos if(nsd->dt_collector_fd_recv[i] != -1) { 456 1.1.1.2 christos close(nsd->dt_collector_fd_recv[i]); 457 1.1.1.2 christos nsd->dt_collector_fd_recv[i] = -1; 458 1.1.1.2 christos } 459 1.1.1.2 christos } 460 1.1 prlw1 } 461 1.1 prlw1 } 462 1.1 prlw1 463 1.1 prlw1 /* put data for sending to the collector process into the buffer */ 464 1.1 prlw1 static int 465 1.1 prlw1 prep_send_data(struct buffer* buf, uint8_t is_response, 466 1.1 prlw1 #ifdef INET6 467 1.1.1.2 christos struct sockaddr_storage* local_addr, 468 1.1 prlw1 struct sockaddr_storage* addr, 469 1.1 prlw1 #else 470 1.1.1.2 christos struct sockaddr_in* local_addr, 471 1.1 prlw1 struct sockaddr_in* addr, 472 1.1 prlw1 #endif 473 1.1 prlw1 socklen_t addrlen, int is_tcp, struct buffer* packet, 474 1.1 prlw1 struct zone* zone) 475 1.1 prlw1 { 476 1.1 prlw1 buffer_clear(buf); 477 1.1.1.2 christos #ifdef INET6 478 1.1.1.2 christos if(local_addr->ss_family != addr->ss_family) 479 1.1.1.2 christos return 0; /* must be same length to send */ 480 1.1.1.2 christos #else 481 1.1.1.2 christos if(local_addr->sin_family != addr->sin_family) 482 1.1.1.2 christos return 0; /* must be same length to send */ 483 1.1.1.2 christos #endif 484 1.1.1.2 christos if(!buffer_available(buf, 4+1+4+2*addrlen+1+4+buffer_remaining(packet))) 485 1.1 prlw1 return 0; /* does not fit in send_buffer, log is dropped */ 486 1.1 prlw1 buffer_skip(buf, 4); /* the length of the message goes here */ 487 1.1 prlw1 buffer_write_u8(buf, is_response); 488 1.1 prlw1 buffer_write_u32(buf, addrlen); 489 1.1.1.2 christos buffer_write(buf, local_addr, (size_t)addrlen); 490 1.1 prlw1 buffer_write(buf, addr, (size_t)addrlen); 491 1.1 prlw1 buffer_write_u8(buf, (is_tcp?1:0)); 492 1.1 prlw1 buffer_write_u32(buf, buffer_remaining(packet)); 493 1.1 prlw1 buffer_write(buf, buffer_begin(packet), buffer_remaining(packet)); 494 1.1 prlw1 if(zone && zone->apex && domain_dname(zone->apex)) { 495 1.1 prlw1 if(!buffer_available(buf, 4 + domain_dname(zone->apex)->name_size)) 496 1.1 prlw1 return 0; 497 1.1 prlw1 buffer_write_u32(buf, domain_dname(zone->apex)->name_size); 498 1.1 prlw1 buffer_write(buf, dname_name(domain_dname(zone->apex)), 499 1.1 prlw1 domain_dname(zone->apex)->name_size); 500 1.1 prlw1 } else { 501 1.1 prlw1 if(!buffer_available(buf, 4)) 502 1.1 prlw1 return 0; 503 1.1 prlw1 buffer_write_u32(buf, 0); 504 1.1 prlw1 } 505 1.1 prlw1 506 1.1 prlw1 buffer_flip(buf); 507 1.1 prlw1 /* write length of message */ 508 1.1 prlw1 buffer_write_u32_at(buf, 0, buffer_remaining(buf)-4); 509 1.1 prlw1 return 1; 510 1.1 prlw1 } 511 1.1 prlw1 512 1.1.1.2 christos /* attempt to send buffer to socket, if it blocks do not send it. 513 1.1.1.2 christos * return 0 on success, -1 on error */ 514 1.1.1.2 christos static int attempt_to_send(int s, uint8_t* data, size_t len) 515 1.1 prlw1 { 516 1.1 prlw1 ssize_t r; 517 1.1.1.2 christos if(len == 0) 518 1.1.1.2 christos return 0; 519 1.1.1.2 christos r = send(s, data, len, MSG_DONTWAIT | MSG_NOSIGNAL); 520 1.1.1.2 christos if(r == -1) { 521 1.1.1.2 christos if(errno == EAGAIN || errno == EINTR || 522 1.1.1.2 christos errno == ENOBUFS || errno == EMSGSIZE) { 523 1.1.1.2 christos /* check if pipe is full, if the nonblocking fd blocks, 524 1.1.1.2 christos * then drop the message */ 525 1.1.1.2 christos return 0; 526 1.1 prlw1 } 527 1.1.1.2 christos /* some sort of error, print it */ 528 1.1.1.2 christos log_msg(LOG_ERR, "dnstap collector: send failed: %s", 529 1.1.1.2 christos strerror(errno)); 530 1.1.1.2 christos return -1; 531 1.1 prlw1 } 532 1.1.1.2 christos assert(r > 0); 533 1.1.1.2 christos if(r > 0) { 534 1.1.1.2 christos assert((size_t)r == len); 535 1.1.1.2 christos return 0; 536 1.1.1.2 christos } 537 1.1.1.2 christos /* Other end closed the channel? */ 538 1.1.1.2 christos log_msg(LOG_ERR, "dnstap collector: server child closed the channel"); 539 1.1.1.2 christos return -1; 540 1.1 prlw1 } 541 1.1 prlw1 542 1.1 prlw1 void dt_collector_submit_auth_query(struct nsd* nsd, 543 1.1 prlw1 #ifdef INET6 544 1.1.1.2 christos struct sockaddr_storage* local_addr, 545 1.1 prlw1 struct sockaddr_storage* addr, 546 1.1 prlw1 #else 547 1.1.1.2 christos struct sockaddr_in* local_addr, 548 1.1 prlw1 struct sockaddr_in* addr, 549 1.1 prlw1 #endif 550 1.1 prlw1 socklen_t addrlen, int is_tcp, struct buffer* packet) 551 1.1 prlw1 { 552 1.1 prlw1 if(!nsd->dt_collector) return; 553 1.1 prlw1 if(!nsd->options->dnstap_log_auth_query_messages) return; 554 1.1.1.2 christos if(nsd->dt_collector_fd_send[nsd->this_child->child_num] == -1) return; 555 1.1 prlw1 VERBOSITY(4, (LOG_INFO, "dnstap submit auth query")); 556 1.1 prlw1 557 1.1 prlw1 /* marshal data into send buffer */ 558 1.1.1.2 christos if(!prep_send_data(nsd->dt_collector->send_buffer, 0, local_addr, addr, addrlen, 559 1.1 prlw1 is_tcp, packet, NULL)) 560 1.1 prlw1 return; /* probably did not fit in buffer */ 561 1.1 prlw1 562 1.1 prlw1 /* attempt to send data; do not block */ 563 1.1.1.2 christos if(attempt_to_send(nsd->dt_collector_fd_send[nsd->this_child->child_num], 564 1.1.1.2 christos buffer_begin(nsd->dt_collector->send_buffer), 565 1.1.1.2 christos buffer_remaining(nsd->dt_collector->send_buffer))) { 566 1.1.1.2 christos /* Something went wrong sending to the socket. Don't send to 567 1.1.1.2 christos * this socket again. */ 568 1.1.1.2 christos close(nsd->dt_collector_fd_send[nsd->this_child->child_num]); 569 1.1.1.2 christos nsd->dt_collector_fd_send[nsd->this_child->child_num] = -1; 570 1.1.1.2 christos } 571 1.1 prlw1 } 572 1.1 prlw1 573 1.1 prlw1 void dt_collector_submit_auth_response(struct nsd* nsd, 574 1.1 prlw1 #ifdef INET6 575 1.1.1.2 christos struct sockaddr_storage* local_addr, 576 1.1 prlw1 struct sockaddr_storage* addr, 577 1.1 prlw1 #else 578 1.1.1.2 christos struct sockaddr_in* local_addr, 579 1.1 prlw1 struct sockaddr_in* addr, 580 1.1 prlw1 #endif 581 1.1 prlw1 socklen_t addrlen, int is_tcp, struct buffer* packet, 582 1.1 prlw1 struct zone* zone) 583 1.1 prlw1 { 584 1.1 prlw1 if(!nsd->dt_collector) return; 585 1.1 prlw1 if(!nsd->options->dnstap_log_auth_response_messages) return; 586 1.1.1.2 christos if(nsd->dt_collector_fd_send[nsd->this_child->child_num] == -1) return; 587 1.1 prlw1 VERBOSITY(4, (LOG_INFO, "dnstap submit auth response")); 588 1.1 prlw1 589 1.1 prlw1 /* marshal data into send buffer */ 590 1.1.1.2 christos if(!prep_send_data(nsd->dt_collector->send_buffer, 1, local_addr, addr, addrlen, 591 1.1 prlw1 is_tcp, packet, zone)) 592 1.1 prlw1 return; /* probably did not fit in buffer */ 593 1.1 prlw1 594 1.1 prlw1 /* attempt to send data; do not block */ 595 1.1.1.2 christos if(attempt_to_send(nsd->dt_collector_fd_send[nsd->this_child->child_num], 596 1.1.1.2 christos buffer_begin(nsd->dt_collector->send_buffer), 597 1.1.1.2 christos buffer_remaining(nsd->dt_collector->send_buffer))) { 598 1.1.1.2 christos /* Something went wrong sending to the socket. Don't send to 599 1.1.1.2 christos * this socket again. */ 600 1.1.1.2 christos close(nsd->dt_collector_fd_send[nsd->this_child->child_num]); 601 1.1.1.2 christos nsd->dt_collector_fd_send[nsd->this_child->child_num] = -1; 602 1.1.1.2 christos } 603 1.1 prlw1 } 604