Home | History | Annotate | Line # | Download | only in dnstap
      1      1.1     prlw1 /*
      2      1.1     prlw1  * dnstap/dnstap_collector.c -- nsd collector process for dnstap information
      3      1.1     prlw1  *
      4      1.1     prlw1  * Copyright (c) 2018, NLnet Labs. All rights reserved.
      5      1.1     prlw1  *
      6      1.1     prlw1  * See LICENSE for the license.
      7      1.1     prlw1  *
      8      1.1     prlw1  */
      9      1.1     prlw1 
     10      1.1     prlw1 #include "config.h"
     11      1.1     prlw1 #include <sys/types.h>
     12      1.1     prlw1 #include <sys/socket.h>
     13      1.1     prlw1 #include <errno.h>
     14      1.1     prlw1 #include <fcntl.h>
     15      1.1     prlw1 #include <unistd.h>
     16      1.1     prlw1 #ifndef USE_MINI_EVENT
     17      1.1     prlw1 #  ifdef HAVE_EVENT_H
     18      1.1     prlw1 #    include <event.h>
     19      1.1     prlw1 #  else
     20      1.1     prlw1 #    include <event2/event.h>
     21      1.1     prlw1 #    include "event2/event_struct.h"
     22      1.1     prlw1 #    include "event2/event_compat.h"
     23      1.1     prlw1 #  endif
     24      1.1     prlw1 #else
     25      1.1     prlw1 #  include "mini_event.h"
     26      1.1     prlw1 #endif
     27      1.1     prlw1 #include "dnstap/dnstap_collector.h"
     28      1.1     prlw1 #include "dnstap/dnstap.h"
     29      1.1     prlw1 #include "util.h"
     30      1.1     prlw1 #include "nsd.h"
     31      1.1     prlw1 #include "region-allocator.h"
     32      1.1     prlw1 #include "buffer.h"
     33      1.1     prlw1 #include "namedb.h"
     34      1.1     prlw1 #include "options.h"
     35  1.1.1.3  christos #include "remote.h"
     36      1.1     prlw1 
     37  1.1.1.2  christos #include "udb.h"
     38  1.1.1.2  christos #include "rrl.h"
     39  1.1.1.2  christos 
     40      1.1     prlw1 struct dt_collector* dt_collector_create(struct nsd* nsd)
     41      1.1     prlw1 {
     42      1.1     prlw1 	int i, sv[2];
     43      1.1     prlw1 	struct dt_collector* dt_col = (struct dt_collector*)xalloc_zero(
     44      1.1     prlw1 		sizeof(*dt_col));
     45  1.1.1.2  christos 	dt_col->count = nsd->child_count * 2;
     46      1.1     prlw1 	dt_col->dt_env = NULL;
     47      1.1     prlw1 	dt_col->region = region_create(xalloc, free);
     48      1.1     prlw1 	dt_col->send_buffer = buffer_create(dt_col->region,
     49  1.1.1.2  christos 		/* msglen + is_response + addrlen + is_tcp + packetlen + packet + zonelen + zone + spare + local_addr + addr */
     50      1.1     prlw1 		4+1+4+1+4+TCP_MAX_MESSAGE_LEN+4+MAXHOSTNAMELEN + 32 +
     51      1.1     prlw1 #ifdef INET6
     52  1.1.1.2  christos 		sizeof(struct sockaddr_storage) + sizeof(struct sockaddr_storage)
     53      1.1     prlw1 #else
     54  1.1.1.2  christos 		sizeof(struct sockaddr_in) + sizeof(struct sockaddr_in)
     55      1.1     prlw1 #endif
     56      1.1     prlw1 		);
     57      1.1     prlw1 
     58  1.1.1.2  christos 	/* open communication channels in struct nsd */
     59      1.1     prlw1 	nsd->dt_collector_fd_send = (int*)xalloc_array_zero(dt_col->count,
     60      1.1     prlw1 		sizeof(int));
     61      1.1     prlw1 	nsd->dt_collector_fd_recv = (int*)xalloc_array_zero(dt_col->count,
     62      1.1     prlw1 		sizeof(int));
     63      1.1     prlw1 	for(i=0; i<dt_col->count; i++) {
     64  1.1.1.2  christos 		int sv[2];
     65  1.1.1.2  christos 		int bufsz = buffer_capacity(dt_col->send_buffer);
     66  1.1.1.2  christos 		sv[0] = -1; /* For receiving by parent (dnstap-collector) */
     67  1.1.1.2  christos 		sv[1] = -1; /* For sending   by child  (server childs) */
     68  1.1.1.3  christos 		if(socketpair(AF_UNIX, SOCK_DGRAM
     69  1.1.1.3  christos #ifdef SOCK_NONBLOCK
     70  1.1.1.3  christos 			| SOCK_NONBLOCK
     71  1.1.1.3  christos #endif
     72  1.1.1.3  christos 			, 0, sv) < 0) {
     73  1.1.1.2  christos 			error("dnstap_collector: cannot create communication channel: %s",
     74      1.1     prlw1 				strerror(errno));
     75      1.1     prlw1 		}
     76  1.1.1.3  christos #ifndef SOCK_NONBLOCK
     77  1.1.1.3  christos 		if (fcntl(sv[0], F_SETFL, O_NONBLOCK) == -1) {
     78  1.1.1.3  christos 			log_msg(LOG_ERR, "dnstap_collector receive fd fcntl "
     79  1.1.1.3  christos 				"failed: %s", strerror(errno));
     80  1.1.1.3  christos 		}
     81  1.1.1.3  christos 		if (fcntl(sv[1], F_SETFL, O_NONBLOCK) == -1) {
     82  1.1.1.3  christos 			log_msg(LOG_ERR, "dnstap_collector send fd fcntl "
     83  1.1.1.3  christos 				"failed: %s", strerror(errno));
     84  1.1.1.3  christos 		}
     85  1.1.1.3  christos #endif
     86  1.1.1.2  christos 		if(setsockopt(sv[0], SOL_SOCKET, SO_RCVBUF, &bufsz, sizeof(bufsz))) {
     87  1.1.1.2  christos 			log_msg(LOG_ERR, "setting dnstap_collector "
     88  1.1.1.2  christos 				"receive buffer size failed: %s", strerror(errno));
     89      1.1     prlw1 		}
     90  1.1.1.2  christos 		if(setsockopt(sv[1], SOL_SOCKET, SO_SNDBUF, &bufsz, sizeof(bufsz))) {
     91  1.1.1.2  christos 			log_msg(LOG_ERR, "setting dnstap_collector "
     92  1.1.1.2  christos 				"send buffer size failed: %s", strerror(errno));
     93      1.1     prlw1 		}
     94  1.1.1.2  christos 		nsd->dt_collector_fd_recv[i] = sv[0];
     95  1.1.1.2  christos 		nsd->dt_collector_fd_send[i] = sv[1];
     96      1.1     prlw1 	}
     97  1.1.1.2  christos 	nsd->dt_collector_fd_swap = nsd->dt_collector_fd_send + nsd->child_count;
     98      1.1     prlw1 
     99      1.1     prlw1 	/* open socketpair */
    100      1.1     prlw1 	if(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
    101      1.1     prlw1 		error("dnstap_collector: cannot create socketpair: %s",
    102      1.1     prlw1 			strerror(errno));
    103      1.1     prlw1 	}
    104      1.1     prlw1 	if(fcntl(sv[0], F_SETFL, O_NONBLOCK) == -1) {
    105      1.1     prlw1 		log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno));
    106      1.1     prlw1 	}
    107      1.1     prlw1 	if(fcntl(sv[1], F_SETFL, O_NONBLOCK) == -1) {
    108      1.1     prlw1 		log_msg(LOG_ERR, "fcntl failed: %s", strerror(errno));
    109      1.1     prlw1 	}
    110      1.1     prlw1 	dt_col->cmd_socket_dt = sv[0];
    111      1.1     prlw1 	dt_col->cmd_socket_nsd = sv[1];
    112      1.1     prlw1 
    113      1.1     prlw1 	return dt_col;
    114      1.1     prlw1 }
    115      1.1     prlw1 
    116      1.1     prlw1 void dt_collector_destroy(struct dt_collector* dt_col, struct nsd* nsd)
    117      1.1     prlw1 {
    118      1.1     prlw1 	if(!dt_col) return;
    119      1.1     prlw1 	free(nsd->dt_collector_fd_recv);
    120      1.1     prlw1 	nsd->dt_collector_fd_recv = NULL;
    121  1.1.1.2  christos 	if (nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap)
    122  1.1.1.2  christos 		free(nsd->dt_collector_fd_send);
    123  1.1.1.2  christos 	else
    124  1.1.1.2  christos 		free(nsd->dt_collector_fd_swap);
    125      1.1     prlw1 	nsd->dt_collector_fd_send = NULL;
    126  1.1.1.2  christos 	nsd->dt_collector_fd_swap = NULL;
    127      1.1     prlw1 	region_destroy(dt_col->region);
    128      1.1     prlw1 	free(dt_col);
    129      1.1     prlw1 }
    130      1.1     prlw1 
    131      1.1     prlw1 void dt_collector_close(struct dt_collector* dt_col, struct nsd* nsd)
    132      1.1     prlw1 {
    133  1.1.1.2  christos 	int i, *fd_send;
    134      1.1     prlw1 	if(!dt_col) return;
    135      1.1     prlw1 	if(dt_col->cmd_socket_dt != -1) {
    136      1.1     prlw1 		close(dt_col->cmd_socket_dt);
    137      1.1     prlw1 		dt_col->cmd_socket_dt = -1;
    138      1.1     prlw1 	}
    139      1.1     prlw1 	if(dt_col->cmd_socket_nsd != -1) {
    140      1.1     prlw1 		close(dt_col->cmd_socket_nsd);
    141      1.1     prlw1 		dt_col->cmd_socket_nsd = -1;
    142      1.1     prlw1 	}
    143  1.1.1.2  christos 	fd_send = nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap
    144  1.1.1.2  christos 	        ? nsd->dt_collector_fd_send : nsd->dt_collector_fd_swap;
    145      1.1     prlw1 	for(i=0; i<dt_col->count; i++) {
    146      1.1     prlw1 		if(nsd->dt_collector_fd_recv[i] != -1) {
    147      1.1     prlw1 			close(nsd->dt_collector_fd_recv[i]);
    148      1.1     prlw1 			nsd->dt_collector_fd_recv[i] = -1;
    149      1.1     prlw1 		}
    150  1.1.1.2  christos 		if(fd_send[i] != -1) {
    151  1.1.1.2  christos 			close(fd_send[i]);
    152  1.1.1.2  christos 			fd_send[i] = -1;
    153      1.1     prlw1 		}
    154      1.1     prlw1 	}
    155      1.1     prlw1 }
    156      1.1     prlw1 
    157      1.1     prlw1 /* handle command from nsd to dt collector.
    158      1.1     prlw1  * mostly, check for fd closed, this means we have to exit */
    159      1.1     prlw1 void
    160      1.1     prlw1 dt_handle_cmd_from_nsd(int ATTR_UNUSED(fd), short event, void* arg)
    161      1.1     prlw1 {
    162      1.1     prlw1 	struct dt_collector* dt_col = (struct dt_collector*)arg;
    163      1.1     prlw1 	if((event&EV_READ) != 0) {
    164      1.1     prlw1 		event_base_loopexit(dt_col->event_base, NULL);
    165      1.1     prlw1 	}
    166      1.1     prlw1 }
    167      1.1     prlw1 
    168  1.1.1.2  christos /* receive data from fd into buffer, 1 when message received, -1 on error */
    169  1.1.1.2  christos static int recv_into_buffer(int fd, struct buffer* buf)
    170      1.1     prlw1 {
    171      1.1     prlw1 	size_t msglen;
    172      1.1     prlw1 	ssize_t r;
    173      1.1     prlw1 
    174  1.1.1.2  christos 	assert(buffer_position(buf) == 0);
    175  1.1.1.2  christos 	r = recv(fd, buffer_current(buf), buffer_capacity(buf), MSG_DONTWAIT);
    176      1.1     prlw1 	if(r == -1) {
    177  1.1.1.2  christos 		if(errno == EAGAIN || errno == EINTR || errno == EMSGSIZE) {
    178  1.1.1.2  christos 			/* continue to receive a message later */
    179      1.1     prlw1 			return 0;
    180      1.1     prlw1 		}
    181  1.1.1.2  christos 		log_msg(LOG_ERR, "dnstap collector: receive failed: %s",
    182      1.1     prlw1 			strerror(errno));
    183  1.1.1.2  christos 		return -1;
    184  1.1.1.2  christos 	}
    185  1.1.1.2  christos 	if(r == 0) {
    186  1.1.1.2  christos 		/* Remote end closed the connection? */
    187  1.1.1.2  christos 		log_msg(LOG_ERR, "dnstap collector: remote closed connection");
    188  1.1.1.2  christos 		return -1;
    189  1.1.1.2  christos 	}
    190  1.1.1.2  christos 	assert(r > 4);
    191  1.1.1.2  christos 	msglen = buffer_read_u32_at(buf, 0);
    192  1.1.1.2  christos 	if(msglen != (size_t)(r - 4)) {
    193  1.1.1.2  christos 		/* Is this still possible now the communication channel is of
    194  1.1.1.2  christos 		 * type SOCK_DGRAM? I think not, but better safe than sorry. */
    195  1.1.1.2  christos 		log_msg(LOG_ERR, "dnstap collector: out of sync (msglen: %u)",
    196  1.1.1.2  christos 			(unsigned int) msglen);
    197      1.1     prlw1 		return 0;
    198      1.1     prlw1 	}
    199      1.1     prlw1 	buffer_skip(buf, r);
    200      1.1     prlw1 	buffer_flip(buf);
    201      1.1     prlw1 	return 1;
    202      1.1     prlw1 }
    203      1.1     prlw1 
    204      1.1     prlw1 /* submit the content of the buffer received to dnstap */
    205      1.1     prlw1 static void
    206      1.1     prlw1 dt_submit_content(struct dt_env* dt_env, struct buffer* buf)
    207      1.1     prlw1 {
    208      1.1     prlw1 	uint8_t is_response, is_tcp;
    209      1.1     prlw1 #ifdef INET6
    210  1.1.1.2  christos 	struct sockaddr_storage local_addr, addr;
    211      1.1     prlw1 #else
    212  1.1.1.2  christos 	struct sockaddr_in local_addr, addr;
    213      1.1     prlw1 #endif
    214      1.1     prlw1 	socklen_t addrlen;
    215      1.1     prlw1 	size_t pktlen;
    216      1.1     prlw1 	uint8_t* data;
    217      1.1     prlw1 	size_t zonelen;
    218      1.1     prlw1 	uint8_t* zone;
    219      1.1     prlw1 
    220      1.1     prlw1 	/* parse content from buffer */
    221      1.1     prlw1 	if(!buffer_available(buf, 4+1+4)) return;
    222      1.1     prlw1 	buffer_skip(buf, 4); /* skip msglen */
    223      1.1     prlw1 	is_response = buffer_read_u8(buf);
    224      1.1     prlw1 	addrlen = buffer_read_u32(buf);
    225  1.1.1.2  christos 	if(addrlen > sizeof(local_addr) || addrlen > sizeof(addr)) return;
    226  1.1.1.2  christos 	if(!buffer_available(buf, 2*addrlen)) return;
    227  1.1.1.2  christos 	buffer_read(buf, &local_addr, addrlen);
    228      1.1     prlw1 	buffer_read(buf, &addr, addrlen);
    229      1.1     prlw1 	if(!buffer_available(buf, 1+4)) return;
    230      1.1     prlw1 	is_tcp = buffer_read_u8(buf);
    231      1.1     prlw1 	pktlen = buffer_read_u32(buf);
    232      1.1     prlw1 	if(!buffer_available(buf, pktlen)) return;
    233      1.1     prlw1 	data = buffer_current(buf);
    234      1.1     prlw1 	buffer_skip(buf, pktlen);
    235      1.1     prlw1 	if(!buffer_available(buf, 4)) return;
    236      1.1     prlw1 	zonelen = buffer_read_u32(buf);
    237      1.1     prlw1 	if(zonelen == 0) {
    238      1.1     prlw1 		zone = NULL;
    239      1.1     prlw1 	} else {
    240      1.1     prlw1 		if(zonelen > MAXDOMAINLEN) return;
    241      1.1     prlw1 		if(!buffer_available(buf, zonelen)) return;
    242      1.1     prlw1 		zone = buffer_current(buf);
    243      1.1     prlw1 		buffer_skip(buf, zonelen);
    244      1.1     prlw1 	}
    245      1.1     prlw1 
    246      1.1     prlw1 	/* submit it */
    247      1.1     prlw1 	if(is_response) {
    248  1.1.1.2  christos 		dt_msg_send_auth_response(dt_env, &local_addr, &addr, is_tcp, zone,
    249      1.1     prlw1 			zonelen, data, pktlen);
    250      1.1     prlw1 	} else {
    251  1.1.1.2  christos 		dt_msg_send_auth_query(dt_env, &local_addr, &addr, is_tcp, zone,
    252      1.1     prlw1 			zonelen, data, pktlen);
    253      1.1     prlw1 	}
    254      1.1     prlw1 }
    255      1.1     prlw1 
    256      1.1     prlw1 /* handle input from worker for dnstap */
    257      1.1     prlw1 void
    258      1.1     prlw1 dt_handle_input(int fd, short event, void* arg)
    259      1.1     prlw1 {
    260      1.1     prlw1 	struct dt_collector_input* dt_input = (struct dt_collector_input*)arg;
    261      1.1     prlw1 	if((event&EV_READ) != 0) {
    262  1.1.1.2  christos 		/* receive */
    263  1.1.1.2  christos 		int r = recv_into_buffer(fd, dt_input->buffer);
    264  1.1.1.2  christos 		if(r == 0)
    265      1.1     prlw1 			return;
    266  1.1.1.2  christos 		else if(r < 0) {
    267  1.1.1.2  christos 			event_base_loopexit(dt_input->dt_collector->event_base, NULL);
    268  1.1.1.2  christos 			return;
    269  1.1.1.2  christos 		}
    270  1.1.1.2  christos 		/* once data is complete, send it to dnstap */
    271      1.1     prlw1 		VERBOSITY(4, (LOG_INFO, "dnstap collector: received msg len %d",
    272      1.1     prlw1 			(int)buffer_remaining(dt_input->buffer)));
    273      1.1     prlw1 		if(dt_input->dt_collector->dt_env) {
    274      1.1     prlw1 			dt_submit_content(dt_input->dt_collector->dt_env,
    275      1.1     prlw1 				dt_input->buffer);
    276      1.1     prlw1 		}
    277      1.1     prlw1 
    278      1.1     prlw1 		/* clear buffer for next message */
    279      1.1     prlw1 		buffer_clear(dt_input->buffer);
    280      1.1     prlw1 	}
    281      1.1     prlw1 }
    282      1.1     prlw1 
    283      1.1     prlw1 /* init dnstap */
    284      1.1     prlw1 static void dt_init_dnstap(struct dt_collector* dt_col, struct nsd* nsd)
    285      1.1     prlw1 {
    286      1.1     prlw1 	int num_workers = 1;
    287      1.1     prlw1 #ifdef HAVE_CHROOT
    288      1.1     prlw1 	if(nsd->chrootdir && nsd->chrootdir[0]) {
    289      1.1     prlw1 		int l = strlen(nsd->chrootdir)-1; /* ends in trailing slash */
    290      1.1     prlw1 		if (nsd->options->dnstap_socket_path &&
    291      1.1     prlw1 			nsd->options->dnstap_socket_path[0] == '/' &&
    292      1.1     prlw1 			strncmp(nsd->options->dnstap_socket_path,
    293      1.1     prlw1 				nsd->chrootdir, l) == 0)
    294      1.1     prlw1 			nsd->options->dnstap_socket_path += l;
    295      1.1     prlw1 	}
    296      1.1     prlw1 #endif
    297  1.1.1.3  christos 	dt_col->dt_env = dt_create(nsd->options->dnstap_socket_path,
    298  1.1.1.3  christos 		nsd->options->dnstap_ip, num_workers, nsd->options->dnstap_tls,
    299  1.1.1.3  christos 		nsd->options->dnstap_tls_server_name,
    300  1.1.1.3  christos 		nsd->options->dnstap_tls_cert_bundle,
    301  1.1.1.3  christos 		nsd->options->dnstap_tls_client_key_file,
    302  1.1.1.3  christos 		nsd->options->dnstap_tls_client_cert_file);
    303      1.1     prlw1 	if(!dt_col->dt_env) {
    304      1.1     prlw1 		log_msg(LOG_ERR, "could not create dnstap env");
    305      1.1     prlw1 		return;
    306      1.1     prlw1 	}
    307      1.1     prlw1 	dt_apply_cfg(dt_col->dt_env, nsd->options);
    308      1.1     prlw1 	dt_init(dt_col->dt_env);
    309      1.1     prlw1 }
    310      1.1     prlw1 
    311      1.1     prlw1 /* cleanup dt collector process for exit */
    312      1.1     prlw1 static void dt_collector_cleanup(struct dt_collector* dt_col, struct nsd* nsd)
    313      1.1     prlw1 {
    314      1.1     prlw1 	int i;
    315      1.1     prlw1 	dt_delete(dt_col->dt_env);
    316      1.1     prlw1 	event_del(dt_col->cmd_event);
    317      1.1     prlw1 	for(i=0; i<dt_col->count; i++) {
    318      1.1     prlw1 		event_del(dt_col->inputs[i].event);
    319      1.1     prlw1 	}
    320      1.1     prlw1 	dt_collector_close(dt_col, nsd);
    321      1.1     prlw1 	event_base_free(dt_col->event_base);
    322      1.1     prlw1 #ifdef MEMCLEAN
    323      1.1     prlw1 	free(dt_col->cmd_event);
    324      1.1     prlw1 	if(dt_col->inputs) {
    325      1.1     prlw1 		for(i=0; i<dt_col->count; i++) {
    326      1.1     prlw1 			free(dt_col->inputs[i].event);
    327      1.1     prlw1 		}
    328      1.1     prlw1 		free(dt_col->inputs);
    329      1.1     prlw1 	}
    330      1.1     prlw1 	dt_collector_destroy(dt_col, nsd);
    331  1.1.1.3  christos 	daemon_remote_delete(nsd->rc); /* ssl-delete secret keys */
    332  1.1.1.3  christos 	nsd_options_destroy(nsd->options);
    333  1.1.1.3  christos 	region_destroy(nsd->region);
    334      1.1     prlw1 #endif
    335      1.1     prlw1 }
    336      1.1     prlw1 
    337      1.1     prlw1 /* attach events to the event base to listen to the workers and cmd channel */
    338      1.1     prlw1 static void dt_attach_events(struct dt_collector* dt_col, struct nsd* nsd)
    339      1.1     prlw1 {
    340      1.1     prlw1 	int i;
    341      1.1     prlw1 	/* create event base */
    342      1.1     prlw1 	dt_col->event_base = nsd_child_event_base();
    343      1.1     prlw1 	if(!dt_col->event_base) {
    344      1.1     prlw1 		error("dnstap collector: event_base create failed");
    345      1.1     prlw1 	}
    346      1.1     prlw1 
    347      1.1     prlw1 	/* add command handler */
    348      1.1     prlw1 	dt_col->cmd_event = (struct event*)xalloc_zero(
    349      1.1     prlw1 		sizeof(*dt_col->cmd_event));
    350      1.1     prlw1 	event_set(dt_col->cmd_event, dt_col->cmd_socket_dt,
    351      1.1     prlw1 		EV_PERSIST|EV_READ, dt_handle_cmd_from_nsd, dt_col);
    352      1.1     prlw1 	if(event_base_set(dt_col->event_base, dt_col->cmd_event) != 0)
    353      1.1     prlw1 		log_msg(LOG_ERR, "dnstap collector: event_base_set failed");
    354      1.1     prlw1 	if(event_add(dt_col->cmd_event, NULL) != 0)
    355      1.1     prlw1 		log_msg(LOG_ERR, "dnstap collector: event_add failed");
    356      1.1     prlw1 
    357      1.1     prlw1 	/* add worker input handlers */
    358      1.1     prlw1 	dt_col->inputs = xalloc_array_zero(dt_col->count,
    359      1.1     prlw1 		sizeof(*dt_col->inputs));
    360      1.1     prlw1 	for(i=0; i<dt_col->count; i++) {
    361      1.1     prlw1 		dt_col->inputs[i].dt_collector = dt_col;
    362      1.1     prlw1 		dt_col->inputs[i].event = (struct event*)xalloc_zero(
    363      1.1     prlw1 			sizeof(struct event));
    364      1.1     prlw1 		event_set(dt_col->inputs[i].event,
    365      1.1     prlw1 			nsd->dt_collector_fd_recv[i], EV_PERSIST|EV_READ,
    366      1.1     prlw1 			dt_handle_input, &dt_col->inputs[i]);
    367      1.1     prlw1 		if(event_base_set(dt_col->event_base,
    368      1.1     prlw1 			dt_col->inputs[i].event) != 0)
    369      1.1     prlw1 			log_msg(LOG_ERR, "dnstap collector: event_base_set failed");
    370      1.1     prlw1 		if(event_add(dt_col->inputs[i].event, NULL) != 0)
    371      1.1     prlw1 			log_msg(LOG_ERR, "dnstap collector: event_add failed");
    372      1.1     prlw1 
    373      1.1     prlw1 		dt_col->inputs[i].buffer = buffer_create(dt_col->region,
    374  1.1.1.2  christos 			/* msglen + is_response + addrlen + is_tcp + packetlen + packet + zonelen + zone + spare + local_addr + addr */
    375      1.1     prlw1 			4+1+4+1+4+TCP_MAX_MESSAGE_LEN+4+MAXHOSTNAMELEN + 32 +
    376      1.1     prlw1 #ifdef INET6
    377  1.1.1.2  christos 			sizeof(struct sockaddr_storage) + sizeof(struct sockaddr_storage)
    378      1.1     prlw1 #else
    379  1.1.1.2  christos 			sizeof(struct sockaddr_in) + sizeof(struct sockaddr_in)
    380      1.1     prlw1 #endif
    381      1.1     prlw1 		);
    382      1.1     prlw1 		assert(buffer_capacity(dt_col->inputs[i].buffer) ==
    383      1.1     prlw1 			buffer_capacity(dt_col->send_buffer));
    384      1.1     prlw1 	}
    385      1.1     prlw1 }
    386      1.1     prlw1 
    387      1.1     prlw1 /* the dnstap collector process main routine */
    388      1.1     prlw1 static void dt_collector_run(struct dt_collector* dt_col, struct nsd* nsd)
    389      1.1     prlw1 {
    390      1.1     prlw1 	/* init dnstap */
    391      1.1     prlw1 	VERBOSITY(1, (LOG_INFO, "dnstap collector started"));
    392      1.1     prlw1 	dt_init_dnstap(dt_col, nsd);
    393      1.1     prlw1 	dt_attach_events(dt_col, nsd);
    394      1.1     prlw1 
    395      1.1     prlw1 	/* run */
    396      1.1     prlw1 	if(event_base_loop(dt_col->event_base, 0) == -1) {
    397      1.1     prlw1 		error("dnstap collector: event_base_loop failed");
    398      1.1     prlw1 	}
    399      1.1     prlw1 
    400      1.1     prlw1 	/* cleanup and done */
    401      1.1     prlw1 	VERBOSITY(1, (LOG_INFO, "dnstap collector stopped"));
    402      1.1     prlw1 	dt_collector_cleanup(dt_col, nsd);
    403      1.1     prlw1 	exit(0);
    404      1.1     prlw1 }
    405      1.1     prlw1 
    406      1.1     prlw1 void dt_collector_start(struct dt_collector* dt_col, struct nsd* nsd)
    407      1.1     prlw1 {
    408  1.1.1.2  christos 	int i, *fd_send;
    409      1.1     prlw1 	/* fork */
    410      1.1     prlw1 	dt_col->dt_pid = fork();
    411      1.1     prlw1 	if(dt_col->dt_pid == -1) {
    412      1.1     prlw1 		error("dnstap_collector: fork failed: %s", strerror(errno));
    413      1.1     prlw1 	}
    414      1.1     prlw1 	if(dt_col->dt_pid == 0) {
    415      1.1     prlw1 		/* the dt collector process is this */
    416      1.1     prlw1 		/* close the nsd side of the command channel */
    417      1.1     prlw1 		close(dt_col->cmd_socket_nsd);
    418      1.1     prlw1 		dt_col->cmd_socket_nsd = -1;
    419  1.1.1.2  christos 
    420  1.1.1.2  christos 		/* close the send side of the communication channels */
    421  1.1.1.2  christos 		assert(nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap);
    422  1.1.1.2  christos 		fd_send = nsd->dt_collector_fd_send < nsd->dt_collector_fd_swap
    423  1.1.1.2  christos 			? nsd->dt_collector_fd_send : nsd->dt_collector_fd_swap;
    424  1.1.1.2  christos 		for(i=0; i<dt_col->count; i++) {
    425  1.1.1.2  christos 			if(fd_send[i] != -1) {
    426  1.1.1.2  christos 				close(fd_send[i]);
    427  1.1.1.2  christos 				fd_send[i] = -1;
    428  1.1.1.2  christos 			}
    429  1.1.1.2  christos 		}
    430  1.1.1.2  christos #ifdef HAVE_SETPROCTITLE
    431  1.1.1.2  christos 		setproctitle("dnstap_collector");
    432  1.1.1.2  christos #endif
    433  1.1.1.4  christos #ifdef USE_LOG_PROCESS_ROLE
    434  1.1.1.4  christos                 log_set_process_role("dnstap_collector");
    435  1.1.1.4  christos #endif
    436  1.1.1.2  christos 		/* Free serve process specific memory pages */
    437  1.1.1.2  christos #ifdef RATELIMIT
    438  1.1.1.2  christos 		rrl_mmap_deinit_keep_mmap();
    439  1.1.1.2  christos #endif
    440  1.1.1.2  christos 		udb_base_free_keep_mmap(nsd->task[0]);
    441  1.1.1.2  christos 		udb_base_free_keep_mmap(nsd->task[1]);
    442  1.1.1.2  christos 		namedb_close(nsd->db);
    443  1.1.1.2  christos 
    444      1.1     prlw1 		dt_collector_run(dt_col, nsd);
    445      1.1     prlw1 		/* NOTREACH */
    446      1.1     prlw1 		exit(0);
    447      1.1     prlw1 	} else {
    448      1.1     prlw1 		/* the parent continues on, with starting NSD */
    449      1.1     prlw1 		/* close the dt side of the command channel */
    450      1.1     prlw1 		close(dt_col->cmd_socket_dt);
    451      1.1     prlw1 		dt_col->cmd_socket_dt = -1;
    452  1.1.1.2  christos 
    453  1.1.1.2  christos 		/* close the receive side of the communication channels */
    454  1.1.1.2  christos 		for(i=0; i<dt_col->count; i++) {
    455  1.1.1.2  christos 			if(nsd->dt_collector_fd_recv[i] != -1) {
    456  1.1.1.2  christos 				close(nsd->dt_collector_fd_recv[i]);
    457  1.1.1.2  christos 				nsd->dt_collector_fd_recv[i] = -1;
    458  1.1.1.2  christos 			}
    459  1.1.1.2  christos 		}
    460      1.1     prlw1 	}
    461      1.1     prlw1 }
    462      1.1     prlw1 
    463      1.1     prlw1 /* put data for sending to the collector process into the buffer */
    464      1.1     prlw1 static int
    465      1.1     prlw1 prep_send_data(struct buffer* buf, uint8_t is_response,
    466      1.1     prlw1 #ifdef INET6
    467  1.1.1.2  christos 	struct sockaddr_storage* local_addr,
    468      1.1     prlw1 	struct sockaddr_storage* addr,
    469      1.1     prlw1 #else
    470  1.1.1.2  christos 	struct sockaddr_in* local_addr,
    471      1.1     prlw1 	struct sockaddr_in* addr,
    472      1.1     prlw1 #endif
    473      1.1     prlw1 	socklen_t addrlen, int is_tcp, struct buffer* packet,
    474      1.1     prlw1 	struct zone* zone)
    475      1.1     prlw1 {
    476      1.1     prlw1 	buffer_clear(buf);
    477  1.1.1.2  christos #ifdef INET6
    478  1.1.1.2  christos 	if(local_addr->ss_family != addr->ss_family)
    479  1.1.1.2  christos 		return 0; /* must be same length to send */
    480  1.1.1.2  christos #else
    481  1.1.1.2  christos 	if(local_addr->sin_family != addr->sin_family)
    482  1.1.1.2  christos 		return 0; /* must be same length to send */
    483  1.1.1.2  christos #endif
    484  1.1.1.2  christos 	if(!buffer_available(buf, 4+1+4+2*addrlen+1+4+buffer_remaining(packet)))
    485      1.1     prlw1 		return 0; /* does not fit in send_buffer, log is dropped */
    486      1.1     prlw1 	buffer_skip(buf, 4); /* the length of the message goes here */
    487      1.1     prlw1 	buffer_write_u8(buf, is_response);
    488      1.1     prlw1 	buffer_write_u32(buf, addrlen);
    489  1.1.1.2  christos 	buffer_write(buf, local_addr, (size_t)addrlen);
    490      1.1     prlw1 	buffer_write(buf, addr, (size_t)addrlen);
    491      1.1     prlw1 	buffer_write_u8(buf, (is_tcp?1:0));
    492      1.1     prlw1 	buffer_write_u32(buf, buffer_remaining(packet));
    493      1.1     prlw1 	buffer_write(buf, buffer_begin(packet), buffer_remaining(packet));
    494      1.1     prlw1 	if(zone && zone->apex && domain_dname(zone->apex)) {
    495      1.1     prlw1 		if(!buffer_available(buf, 4 + domain_dname(zone->apex)->name_size))
    496      1.1     prlw1 			return 0;
    497      1.1     prlw1 		buffer_write_u32(buf, domain_dname(zone->apex)->name_size);
    498      1.1     prlw1 		buffer_write(buf, dname_name(domain_dname(zone->apex)),
    499      1.1     prlw1 			domain_dname(zone->apex)->name_size);
    500      1.1     prlw1 	} else {
    501      1.1     prlw1 		if(!buffer_available(buf, 4))
    502      1.1     prlw1 			return 0;
    503      1.1     prlw1 		buffer_write_u32(buf, 0);
    504      1.1     prlw1 	}
    505      1.1     prlw1 
    506      1.1     prlw1 	buffer_flip(buf);
    507      1.1     prlw1 	/* write length of message */
    508      1.1     prlw1 	buffer_write_u32_at(buf, 0, buffer_remaining(buf)-4);
    509      1.1     prlw1 	return 1;
    510      1.1     prlw1 }
    511      1.1     prlw1 
    512  1.1.1.2  christos /* attempt to send buffer to socket, if it blocks do not send it.
    513  1.1.1.2  christos  * return 0 on success, -1 on error */
    514  1.1.1.2  christos static int attempt_to_send(int s, uint8_t* data, size_t len)
    515      1.1     prlw1 {
    516      1.1     prlw1 	ssize_t r;
    517  1.1.1.2  christos 	if(len == 0)
    518  1.1.1.2  christos 		return 0;
    519  1.1.1.2  christos 	r = send(s, data, len, MSG_DONTWAIT | MSG_NOSIGNAL);
    520  1.1.1.2  christos 	if(r == -1) {
    521  1.1.1.2  christos 		if(errno == EAGAIN || errno == EINTR ||
    522  1.1.1.2  christos 				errno == ENOBUFS || errno == EMSGSIZE) {
    523  1.1.1.2  christos 			/* check if pipe is full, if the nonblocking fd blocks,
    524  1.1.1.2  christos 			 * then drop the message */
    525  1.1.1.2  christos 			return 0;
    526      1.1     prlw1 		}
    527  1.1.1.2  christos 		/* some sort of error, print it */
    528  1.1.1.2  christos 		log_msg(LOG_ERR, "dnstap collector: send failed: %s",
    529  1.1.1.2  christos 			strerror(errno));
    530  1.1.1.2  christos 		return -1;
    531      1.1     prlw1 	}
    532  1.1.1.2  christos 	assert(r > 0);
    533  1.1.1.2  christos 	if(r > 0) {
    534  1.1.1.2  christos 		assert((size_t)r == len);
    535  1.1.1.2  christos 		return 0;
    536  1.1.1.2  christos 	}
    537  1.1.1.2  christos 	/* Other end closed the channel? */
    538  1.1.1.2  christos 	log_msg(LOG_ERR, "dnstap collector: server child closed the channel");
    539  1.1.1.2  christos 	return -1;
    540      1.1     prlw1 }
    541      1.1     prlw1 
    542      1.1     prlw1 void dt_collector_submit_auth_query(struct nsd* nsd,
    543      1.1     prlw1 #ifdef INET6
    544  1.1.1.2  christos 	struct sockaddr_storage* local_addr,
    545      1.1     prlw1 	struct sockaddr_storage* addr,
    546      1.1     prlw1 #else
    547  1.1.1.2  christos 	struct sockaddr_in* local_addr,
    548      1.1     prlw1 	struct sockaddr_in* addr,
    549      1.1     prlw1 #endif
    550      1.1     prlw1 	socklen_t addrlen, int is_tcp, struct buffer* packet)
    551      1.1     prlw1 {
    552      1.1     prlw1 	if(!nsd->dt_collector) return;
    553      1.1     prlw1 	if(!nsd->options->dnstap_log_auth_query_messages) return;
    554  1.1.1.2  christos 	if(nsd->dt_collector_fd_send[nsd->this_child->child_num] == -1) return;
    555      1.1     prlw1 	VERBOSITY(4, (LOG_INFO, "dnstap submit auth query"));
    556      1.1     prlw1 
    557      1.1     prlw1 	/* marshal data into send buffer */
    558  1.1.1.2  christos 	if(!prep_send_data(nsd->dt_collector->send_buffer, 0, local_addr, addr, addrlen,
    559      1.1     prlw1 		is_tcp, packet, NULL))
    560      1.1     prlw1 		return; /* probably did not fit in buffer */
    561      1.1     prlw1 
    562      1.1     prlw1 	/* attempt to send data; do not block */
    563  1.1.1.2  christos 	if(attempt_to_send(nsd->dt_collector_fd_send[nsd->this_child->child_num],
    564  1.1.1.2  christos 			buffer_begin(nsd->dt_collector->send_buffer),
    565  1.1.1.2  christos 			buffer_remaining(nsd->dt_collector->send_buffer))) {
    566  1.1.1.2  christos 		/* Something went wrong sending to the socket. Don't send to
    567  1.1.1.2  christos 		 * this socket again. */
    568  1.1.1.2  christos 		close(nsd->dt_collector_fd_send[nsd->this_child->child_num]);
    569  1.1.1.2  christos 		nsd->dt_collector_fd_send[nsd->this_child->child_num] = -1;
    570  1.1.1.2  christos 	}
    571      1.1     prlw1 }
    572      1.1     prlw1 
    573      1.1     prlw1 void dt_collector_submit_auth_response(struct nsd* nsd,
    574      1.1     prlw1 #ifdef INET6
    575  1.1.1.2  christos 	struct sockaddr_storage* local_addr,
    576      1.1     prlw1 	struct sockaddr_storage* addr,
    577      1.1     prlw1 #else
    578  1.1.1.2  christos 	struct sockaddr_in* local_addr,
    579      1.1     prlw1 	struct sockaddr_in* addr,
    580      1.1     prlw1 #endif
    581      1.1     prlw1 	socklen_t addrlen, int is_tcp, struct buffer* packet,
    582      1.1     prlw1 	struct zone* zone)
    583      1.1     prlw1 {
    584      1.1     prlw1 	if(!nsd->dt_collector) return;
    585      1.1     prlw1 	if(!nsd->options->dnstap_log_auth_response_messages) return;
    586  1.1.1.2  christos 	if(nsd->dt_collector_fd_send[nsd->this_child->child_num] == -1) return;
    587      1.1     prlw1 	VERBOSITY(4, (LOG_INFO, "dnstap submit auth response"));
    588      1.1     prlw1 
    589      1.1     prlw1 	/* marshal data into send buffer */
    590  1.1.1.2  christos 	if(!prep_send_data(nsd->dt_collector->send_buffer, 1, local_addr, addr, addrlen,
    591      1.1     prlw1 		is_tcp, packet, zone))
    592      1.1     prlw1 		return; /* probably did not fit in buffer */
    593      1.1     prlw1 
    594      1.1     prlw1 	/* attempt to send data; do not block */
    595  1.1.1.2  christos 	if(attempt_to_send(nsd->dt_collector_fd_send[nsd->this_child->child_num],
    596  1.1.1.2  christos 			buffer_begin(nsd->dt_collector->send_buffer),
    597  1.1.1.2  christos 			buffer_remaining(nsd->dt_collector->send_buffer))) {
    598  1.1.1.2  christos 		/* Something went wrong sending to the socket. Don't send to
    599  1.1.1.2  christos 		 * this socket again. */
    600  1.1.1.2  christos 		close(nsd->dt_collector_fd_send[nsd->this_child->child_num]);
    601  1.1.1.2  christos 		nsd->dt_collector_fd_send[nsd->this_child->child_num] = -1;
    602  1.1.1.2  christos 	}
    603      1.1     prlw1 }
    604