Home | History | Annotate | Line # | Download | only in clvmd
      1  1.2  dholland /*	$NetBSD: clvmd.c,v 1.2 2015/11/09 00:53:57 dholland Exp $	*/
      2  1.1      haad 
      3  1.1      haad /*
      4  1.1      haad  * Copyright (C) 2002-2004 Sistina Software, Inc. All rights reserved.
      5  1.2  dholland  * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
      6  1.1      haad  *
      7  1.1      haad  * This file is part of LVM2.
      8  1.1      haad  *
      9  1.1      haad  * This copyrighted material is made available to anyone wishing to use,
     10  1.1      haad  * modify, copy, or redistribute it subject to the terms and conditions
     11  1.1      haad  * of the GNU General Public License v.2.
     12  1.1      haad  *
     13  1.1      haad  * You should have received a copy of the GNU General Public License
     14  1.1      haad  * along with this program; if not, write to the Free Software Foundation,
     15  1.1      haad  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
     16  1.1      haad  */
     17  1.1      haad 
     18  1.1      haad /*
     19  1.1      haad  * CLVMD: Cluster LVM daemon
     20  1.1      haad  */
     21  1.1      haad 
     22  1.1      haad #define _GNU_SOURCE
     23  1.1      haad #define _FILE_OFFSET_BITS 64
     24  1.1      haad 
     25  1.1      haad #include <configure.h>
     26  1.1      haad #include <libdevmapper.h>
     27  1.1      haad 
     28  1.1      haad #include <pthread.h>
     29  1.1      haad #include <sys/types.h>
     30  1.1      haad #include <sys/stat.h>
     31  1.1      haad #include <sys/socket.h>
     32  1.1      haad #include <sys/uio.h>
     33  1.1      haad #include <sys/un.h>
     34  1.1      haad #include <sys/time.h>
     35  1.1      haad #include <sys/ioctl.h>
     36  1.1      haad #include <sys/utsname.h>
     37  1.1      haad #include <netinet/in.h>
     38  1.1      haad #include <stdio.h>
     39  1.1      haad #include <stdlib.h>
     40  1.1      haad #include <stddef.h>
     41  1.1      haad #include <stdarg.h>
     42  1.1      haad #include <signal.h>
     43  1.1      haad #include <unistd.h>
     44  1.1      haad #include <fcntl.h>
     45  1.1      haad #include <getopt.h>
     46  1.1      haad #include <syslog.h>
     47  1.1      haad #include <errno.h>
     48  1.1      haad #include <limits.h>
     49  1.2  dholland #ifdef HAVE_COROSYNC_CONFDB_H
     50  1.2  dholland #include <corosync/confdb.h>
     51  1.2  dholland #endif
     52  1.1      haad 
     53  1.1      haad #include "clvmd-comms.h"
     54  1.1      haad #include "lvm-functions.h"
     55  1.1      haad #include "clvm.h"
     56  1.2  dholland #include "lvm-version.h"
     57  1.1      haad #include "clvmd.h"
     58  1.1      haad #include "refresh_clvmd.h"
     59  1.1      haad #include "lvm-logging.h"
     60  1.1      haad 
     61  1.1      haad #ifndef TRUE
     62  1.1      haad #define TRUE 1
     63  1.1      haad #endif
     64  1.1      haad #ifndef FALSE
     65  1.1      haad #define FALSE 0
     66  1.1      haad #endif
     67  1.1      haad 
     68  1.1      haad #define MAX_RETRIES 4
     69  1.1      haad 
     70  1.1      haad #define ISLOCAL_CSID(c) (memcmp(c, our_csid, max_csid_len) == 0)
     71  1.1      haad 
     72  1.1      haad /* Head of the fd list. Also contains
     73  1.1      haad    the cluster_socket details */
     74  1.1      haad static struct local_client local_client_head;
     75  1.1      haad 
     76  1.1      haad static unsigned short global_xid = 0;	/* Last transaction ID issued */
     77  1.1      haad 
     78  1.1      haad struct cluster_ops *clops = NULL;
     79  1.1      haad 
     80  1.1      haad static char our_csid[MAX_CSID_LEN];
     81  1.1      haad static unsigned max_csid_len;
     82  1.1      haad static unsigned max_cluster_message;
     83  1.1      haad static unsigned max_cluster_member_name_len;
     84  1.1      haad 
     85  1.1      haad /* Structure of items on the LVM thread list */
     86  1.1      haad struct lvm_thread_cmd {
     87  1.1      haad 	struct dm_list list;
     88  1.1      haad 
     89  1.1      haad 	struct local_client *client;
     90  1.1      haad 	struct clvm_header *msg;
     91  1.1      haad 	char csid[MAX_CSID_LEN];
     92  1.1      haad 	int remote;		/* Flag */
     93  1.1      haad 	int msglen;
     94  1.1      haad 	unsigned short xid;
     95  1.1      haad };
     96  1.1      haad 
     97  1.1      haad debug_t debug;
     98  1.1      haad static pthread_t lvm_thread;
     99  1.1      haad static pthread_mutex_t lvm_thread_mutex;
    100  1.1      haad static pthread_cond_t lvm_thread_cond;
    101  1.1      haad static pthread_mutex_t lvm_start_mutex;
    102  1.1      haad static struct dm_list lvm_cmd_head;
    103  1.1      haad static volatile sig_atomic_t quit = 0;
    104  1.1      haad static volatile sig_atomic_t reread_config = 0;
    105  1.1      haad static int child_pipe[2];
    106  1.1      haad 
    107  1.1      haad /* Reasons the daemon failed initialisation */
    108  1.1      haad #define DFAIL_INIT       1
    109  1.1      haad #define DFAIL_LOCAL_SOCK 2
    110  1.1      haad #define DFAIL_CLUSTER_IF 3
    111  1.1      haad #define DFAIL_MALLOC     4
    112  1.1      haad #define DFAIL_TIMEOUT    5
    113  1.1      haad #define SUCCESS          0
    114  1.1      haad 
    115  1.2  dholland typedef enum {IF_AUTO, IF_CMAN, IF_GULM, IF_OPENAIS, IF_COROSYNC} if_type_t;
    116  1.2  dholland 
    117  1.2  dholland typedef void *(lvm_pthread_fn_t)(void*);
    118  1.2  dholland 
    119  1.1      haad /* Prototypes for code further down */
    120  1.1      haad static void sigusr2_handler(int sig);
    121  1.1      haad static void sighup_handler(int sig);
    122  1.1      haad static void sigterm_handler(int sig);
    123  1.1      haad static void send_local_reply(struct local_client *client, int status,
    124  1.1      haad 			     int clientid);
    125  1.1      haad static void free_reply(struct local_client *client);
    126  1.1      haad static void send_version_message(void);
    127  1.1      haad static void *pre_and_post_thread(void *arg);
    128  1.1      haad static int send_message(void *buf, int msglen, const char *csid, int fd,
    129  1.1      haad 			const char *errtext);
    130  1.1      haad static int read_from_local_sock(struct local_client *thisfd);
    131  1.1      haad static int process_local_command(struct clvm_header *msg, int msglen,
    132  1.1      haad 				 struct local_client *client,
    133  1.1      haad 				 unsigned short xid);
    134  1.1      haad static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
    135  1.1      haad 				   const char *csid);
    136  1.1      haad static int process_reply(const struct clvm_header *msg, int msglen,
    137  1.1      haad 			 const char *csid);
    138  1.1      haad static int open_local_sock(void);
    139  1.1      haad static int check_local_clvmd(void);
    140  1.1      haad static struct local_client *find_client(int clientid);
    141  1.1      haad static void main_loop(int local_sock, int cmd_timeout);
    142  1.1      haad static void be_daemon(int start_timeout);
    143  1.1      haad static int check_all_clvmds_running(struct local_client *client);
    144  1.1      haad static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
    145  1.1      haad 				     int len, const char *csid,
    146  1.1      haad 				     struct local_client **new_client);
    147  1.2  dholland static void lvm_thread_fn(void *) __attribute__ ((noreturn));
    148  1.1      haad static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
    149  1.1      haad 			   int msglen, const char *csid);
    150  1.1      haad static int distribute_command(struct local_client *thisfd);
    151  1.1      haad static void hton_clvm(struct clvm_header *hdr);
    152  1.1      haad static void ntoh_clvm(struct clvm_header *hdr);
    153  1.1      haad static void add_reply_to_list(struct local_client *client, int status,
    154  1.1      haad 			      const char *csid, const char *buf, int len);
    155  1.2  dholland static if_type_t parse_cluster_interface(char *ifname);
    156  1.2  dholland static if_type_t get_cluster_type(void);
    157  1.1      haad 
/*
 * Write the clvmd command-line help to a stream.
 *
 *   prog  program name printed in the synopsis line
 *   file  stream that receives the text
 *
 * The trailing list of cluster managers contains only the back-ends
 * selected at build time through the USE_* macros.
 */
static void usage(char *prog, FILE *file)
{
	/* Fixed option descriptions, emitted in order after the synopsis */
	static const char *const option_help[] = {
		"\n",
		"   -V       Show version of clvmd\n",
		"   -h       Show this help information\n",
		"   -d       Set debug level\n",
		"            If starting clvmd then don't fork, run in the foreground\n",
		"   -R       Tell all running clvmds in the cluster to reload their device cache\n",
		"   -C       Sets debug level (from -d) on all clvmd instances clusterwide\n",
		"   -t<secs> Command timeout (default 60 seconds)\n",
		"   -T<secs> Startup timeout (default none)\n",
		"   -I<cmgr> Cluster manager (default: auto)\n",
		"            Available cluster managers: ",
	};
	size_t i;

	fputs("Usage:\n", file);
	fprintf(file, "%s [Vhd]\n", prog);

	for (i = 0; i < sizeof(option_help) / sizeof(option_help[0]); i++)
		fputs(option_help[i], file);

#ifdef USE_COROSYNC
	fputs("corosync ", file);
#endif
#ifdef USE_CMAN
	fputs("cman ", file);
#endif
#ifdef USE_OPENAIS
	fputs("openais ", file);
#endif
#ifdef USE_GULM
	fputs("gulm ", file);
#endif
	fputs("\n", file);
}
    187  1.1      haad 
    188  1.1      haad /* Called to signal the parent how well we got on during initialisation */
    189  1.1      haad static void child_init_signal(int status)
    190  1.1      haad {
    191  1.1      haad         if (child_pipe[1]) {
    192  1.1      haad 	        write(child_pipe[1], &status, sizeof(status));
    193  1.1      haad 		close(child_pipe[1]);
    194  1.1      haad 	}
    195  1.1      haad 	if (status)
    196  1.1      haad 	        exit(status);
    197  1.1      haad }
    198  1.1      haad 
    199  1.1      haad 
    200  1.1      haad void debuglog(const char *fmt, ...)
    201  1.1      haad {
    202  1.1      haad 	time_t P;
    203  1.1      haad 	va_list ap;
    204  1.1      haad 	static int syslog_init = 0;
    205  1.1      haad 
    206  1.1      haad 	if (debug == DEBUG_STDERR) {
    207  1.1      haad 		va_start(ap,fmt);
    208  1.1      haad 		time(&P);
    209  1.1      haad 		fprintf(stderr, "CLVMD[%x]: %.15s ", (int)pthread_self(), ctime(&P)+4 );
    210  1.1      haad 		vfprintf(stderr, fmt, ap);
    211  1.1      haad 		va_end(ap);
    212  1.1      haad 	}
    213  1.1      haad 	if (debug == DEBUG_SYSLOG) {
    214  1.1      haad 		if (!syslog_init) {
    215  1.1      haad 			openlog("clvmd", LOG_PID, LOG_DAEMON);
    216  1.1      haad 			syslog_init = 1;
    217  1.1      haad 		}
    218  1.1      haad 
    219  1.1      haad 		va_start(ap,fmt);
    220  1.1      haad 		vsyslog(LOG_DEBUG, fmt, ap);
    221  1.1      haad 		va_end(ap);
    222  1.1      haad 	}
    223  1.1      haad }
    224  1.1      haad 
    225  1.1      haad static const char *decode_cmd(unsigned char cmdl)
    226  1.1      haad {
    227  1.1      haad 	static char buf[128];
    228  1.1      haad 	const char *command;
    229  1.1      haad 
    230  1.1      haad 	switch (cmdl) {
    231  1.2  dholland 	case CLVMD_CMD_TEST:
    232  1.2  dholland 		command = "TEST";
    233  1.1      haad 		break;
    234  1.2  dholland 	case CLVMD_CMD_LOCK_VG:
    235  1.2  dholland 		command = "LOCK_VG";
    236  1.1      haad 		break;
    237  1.2  dholland 	case CLVMD_CMD_LOCK_LV:
    238  1.2  dholland 		command = "LOCK_LV";
    239  1.1      haad 		break;
    240  1.2  dholland 	case CLVMD_CMD_REFRESH:
    241  1.2  dholland 		command = "REFRESH";
    242  1.1      haad 		break;
    243  1.2  dholland 	case CLVMD_CMD_SET_DEBUG:
    244  1.2  dholland 		command = "SET_DEBUG";
    245  1.1      haad 		break;
    246  1.2  dholland 	case CLVMD_CMD_GET_CLUSTERNAME:
    247  1.1      haad 		command = "GET_CLUSTERNAME";
    248  1.1      haad 		break;
    249  1.2  dholland 	case CLVMD_CMD_VG_BACKUP:
    250  1.2  dholland 		command = "VG_BACKUP";
    251  1.2  dholland 		break;
    252  1.2  dholland 	case CLVMD_CMD_REPLY:
    253  1.2  dholland 		command = "REPLY";
    254  1.1      haad 		break;
    255  1.2  dholland 	case CLVMD_CMD_VERSION:
    256  1.2  dholland 		command = "VERSION";
    257  1.1      haad 		break;
    258  1.2  dholland 	case CLVMD_CMD_GOAWAY:
    259  1.2  dholland 		command = "GOAWAY";
    260  1.1      haad 		break;
    261  1.2  dholland 	case CLVMD_CMD_LOCK:
    262  1.2  dholland 		command = "LOCK";
    263  1.1      haad 		break;
    264  1.2  dholland 	case CLVMD_CMD_UNLOCK:
    265  1.2  dholland 		command = "UNLOCK";
    266  1.1      haad 		break;
    267  1.2  dholland 	case CLVMD_CMD_LOCK_QUERY:
    268  1.2  dholland 		command = "LOCK_QUERY";
    269  1.1      haad 		break;
    270  1.2  dholland 	default:
    271  1.2  dholland 		command = "unknown";
    272  1.1      haad 		break;
    273  1.1      haad 	}
    274  1.1      haad 
    275  1.1      haad 	sprintf(buf, "%s (0x%x)", command, cmdl);
    276  1.1      haad 
    277  1.1      haad 	return buf;
    278  1.1      haad }
    279  1.1      haad 
    280  1.1      haad int main(int argc, char *argv[])
    281  1.1      haad {
    282  1.1      haad 	int local_sock;
    283  1.1      haad 	struct local_client *newfd;
    284  1.1      haad 	struct utsname nodeinfo;
    285  1.1      haad 	signed char opt;
    286  1.1      haad 	int cmd_timeout = DEFAULT_CMD_TIMEOUT;
    287  1.1      haad 	int start_timeout = 0;
    288  1.2  dholland 	if_type_t cluster_iface = IF_AUTO;
    289  1.1      haad 	sigset_t ss;
    290  1.1      haad 	int using_gulm = 0;
    291  1.1      haad 	int debug_opt = 0;
    292  1.1      haad 	int clusterwide_opt = 0;
    293  1.1      haad 
    294  1.1      haad 	/* Deal with command-line arguments */
    295  1.1      haad 	opterr = 0;
    296  1.1      haad 	optind = 0;
    297  1.2  dholland 	while ((opt = getopt(argc, argv, "?vVhd::t:RT:CI:")) != EOF) {
    298  1.1      haad 		switch (opt) {
    299  1.1      haad 		case 'h':
    300  1.1      haad 			usage(argv[0], stdout);
    301  1.1      haad 			exit(0);
    302  1.1      haad 
    303  1.1      haad 		case '?':
    304  1.1      haad 			usage(argv[0], stderr);
    305  1.1      haad 			exit(0);
    306  1.1      haad 
    307  1.1      haad 		case 'R':
    308  1.2  dholland 			return refresh_clvmd()==1?0:1;
    309  1.1      haad 
    310  1.1      haad 		case 'C':
    311  1.1      haad 			clusterwide_opt = 1;
    312  1.1      haad 			break;
    313  1.1      haad 
    314  1.1      haad 		case 'd':
    315  1.1      haad 			debug_opt = 1;
    316  1.1      haad 			if (optarg)
    317  1.1      haad 				debug = atoi(optarg);
    318  1.1      haad 			else
    319  1.1      haad 				debug = DEBUG_STDERR;
    320  1.1      haad 			break;
    321  1.1      haad 
    322  1.1      haad 		case 't':
    323  1.1      haad 			cmd_timeout = atoi(optarg);
    324  1.1      haad 			if (!cmd_timeout) {
    325  1.1      haad 				fprintf(stderr, "command timeout is invalid\n");
    326  1.1      haad 				usage(argv[0], stderr);
    327  1.1      haad 				exit(1);
    328  1.1      haad 			}
    329  1.1      haad 			break;
    330  1.2  dholland 		case 'I':
    331  1.2  dholland 			cluster_iface = parse_cluster_interface(optarg);
    332  1.2  dholland 			break;
    333  1.1      haad 		case 'T':
    334  1.1      haad 			start_timeout = atoi(optarg);
    335  1.1      haad 			if (start_timeout <= 0) {
    336  1.1      haad 				fprintf(stderr, "startup timeout is invalid\n");
    337  1.1      haad 				usage(argv[0], stderr);
    338  1.1      haad 				exit(1);
    339  1.1      haad 			}
    340  1.1      haad 			break;
    341  1.1      haad 
    342  1.1      haad 		case 'V':
    343  1.1      haad 		        printf("Cluster LVM daemon version: %s\n", LVM_VERSION);
    344  1.1      haad 			printf("Protocol version:           %d.%d.%d\n",
    345  1.1      haad 			       CLVMD_MAJOR_VERSION, CLVMD_MINOR_VERSION,
    346  1.1      haad 			       CLVMD_PATCH_VERSION);
    347  1.1      haad 			exit(1);
    348  1.1      haad 			break;
    349  1.1      haad 
    350  1.1      haad 		}
    351  1.1      haad 	}
    352  1.1      haad 
    353  1.1      haad 	/* Setting debug options on an existing clvmd */
    354  1.1      haad 	if (debug_opt && !check_local_clvmd()) {
    355  1.1      haad 
    356  1.1      haad 		/* Sending to stderr makes no sense for a detached daemon */
    357  1.1      haad 		if (debug == DEBUG_STDERR)
    358  1.1      haad 			debug = DEBUG_SYSLOG;
    359  1.2  dholland 		return debug_clvmd(debug, clusterwide_opt)==1?0:1;
    360  1.1      haad 	}
    361  1.1      haad 
    362  1.1      haad 	/* Fork into the background (unless requested not to) */
    363  1.1      haad 	if (debug != DEBUG_STDERR) {
    364  1.1      haad 		be_daemon(start_timeout);
    365  1.1      haad 	}
    366  1.1      haad 
    367  1.1      haad 	DEBUGLOG("CLVMD started\n");
    368  1.1      haad 
    369  1.1      haad 	/* Open the Unix socket we listen for commands on.
    370  1.1      haad 	   We do this before opening the cluster socket so that
    371  1.1      haad 	   potential clients will block rather than error if we are running
    372  1.1      haad 	   but the cluster is not ready yet */
    373  1.1      haad 	local_sock = open_local_sock();
    374  1.1      haad 	if (local_sock < 0)
    375  1.1      haad 		child_init_signal(DFAIL_LOCAL_SOCK);
    376  1.1      haad 
    377  1.1      haad 	/* Set up signal handlers, USR1 is for cluster change notifications (in cman)
    378  1.1      haad 	   USR2 causes child threads to exit.
    379  1.1      haad 	   HUP causes gulm version to re-read nodes list from CCS.
    380  1.1      haad 	   PIPE should be ignored */
    381  1.1      haad 	signal(SIGUSR2, sigusr2_handler);
    382  1.1      haad 	signal(SIGHUP,  sighup_handler);
    383  1.1      haad 	signal(SIGPIPE, SIG_IGN);
    384  1.1      haad 
    385  1.2  dholland 	/* Block SIGUSR2/SIGINT/SIGTERM in process */
    386  1.1      haad 	sigemptyset(&ss);
    387  1.1      haad 	sigaddset(&ss, SIGUSR2);
    388  1.2  dholland 	sigaddset(&ss, SIGINT);
    389  1.2  dholland 	sigaddset(&ss, SIGTERM);
    390  1.1      haad 	sigprocmask(SIG_BLOCK, &ss, NULL);
    391  1.1      haad 
    392  1.1      haad 	/* Initialise the LVM thread variables */
    393  1.1      haad 	dm_list_init(&lvm_cmd_head);
    394  1.1      haad 	pthread_mutex_init(&lvm_thread_mutex, NULL);
    395  1.1      haad 	pthread_cond_init(&lvm_thread_cond, NULL);
    396  1.1      haad 	pthread_mutex_init(&lvm_start_mutex, NULL);
    397  1.1      haad 	init_lvhash();
    398  1.1      haad 
    399  1.1      haad 	/* Start the cluster interface */
    400  1.2  dholland 	if (cluster_iface == IF_AUTO)
    401  1.2  dholland 		cluster_iface = get_cluster_type();
    402  1.2  dholland 
    403  1.1      haad #ifdef USE_CMAN
    404  1.2  dholland 	if ((cluster_iface == IF_AUTO || cluster_iface == IF_CMAN) && (clops = init_cman_cluster())) {
    405  1.1      haad 		max_csid_len = CMAN_MAX_CSID_LEN;
    406  1.1      haad 		max_cluster_message = CMAN_MAX_CLUSTER_MESSAGE;
    407  1.1      haad 		max_cluster_member_name_len = CMAN_MAX_NODENAME_LEN;
    408  1.1      haad 		syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to CMAN");
    409  1.1      haad 	}
    410  1.1      haad #endif
    411  1.1      haad #ifdef USE_GULM
    412  1.1      haad 	if (!clops)
    413  1.2  dholland 		if ((cluster_iface == IF_AUTO || cluster_iface == IF_GULM) && (clops = init_gulm_cluster())) {
    414  1.1      haad 			max_csid_len = GULM_MAX_CSID_LEN;
    415  1.1      haad 			max_cluster_message = GULM_MAX_CLUSTER_MESSAGE;
    416  1.1      haad 			max_cluster_member_name_len = GULM_MAX_CLUSTER_MEMBER_NAME_LEN;
    417  1.1      haad 			using_gulm = 1;
    418  1.1      haad 			syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to GULM");
    419  1.1      haad 		}
    420  1.1      haad #endif
    421  1.2  dholland #ifdef USE_COROSYNC
    422  1.2  dholland 	if (!clops)
    423  1.2  dholland 		if (((cluster_iface == IF_AUTO || cluster_iface == IF_COROSYNC) && (clops = init_corosync_cluster()))) {
    424  1.2  dholland 			max_csid_len = COROSYNC_CSID_LEN;
    425  1.2  dholland 			max_cluster_message = COROSYNC_MAX_CLUSTER_MESSAGE;
    426  1.2  dholland 			max_cluster_member_name_len = COROSYNC_MAX_CLUSTER_MEMBER_NAME_LEN;
    427  1.2  dholland 			syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to Corosync");
    428  1.2  dholland 		}
    429  1.2  dholland #endif
    430  1.1      haad #ifdef USE_OPENAIS
    431  1.1      haad 	if (!clops)
    432  1.2  dholland 		if ((cluster_iface == IF_AUTO || cluster_iface == IF_OPENAIS) && (clops = init_openais_cluster())) {
    433  1.1      haad 			max_csid_len = OPENAIS_CSID_LEN;
    434  1.1      haad 			max_cluster_message = OPENAIS_MAX_CLUSTER_MESSAGE;
    435  1.1      haad 			max_cluster_member_name_len = OPENAIS_MAX_CLUSTER_MEMBER_NAME_LEN;
    436  1.1      haad 			syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to OpenAIS");
    437  1.1      haad 		}
    438  1.1      haad #endif
    439  1.1      haad 
    440  1.1      haad 	if (!clops) {
    441  1.1      haad 		DEBUGLOG("Can't initialise cluster interface\n");
    442  1.1      haad 		log_error("Can't initialise cluster interface\n");
    443  1.1      haad 		child_init_signal(DFAIL_CLUSTER_IF);
    444  1.1      haad 	}
    445  1.1      haad 	DEBUGLOG("Cluster ready, doing some more initialisation\n");
    446  1.1      haad 
    447  1.1      haad 	/* Save our CSID */
    448  1.1      haad 	uname(&nodeinfo);
    449  1.1      haad 	clops->get_our_csid(our_csid);
    450  1.1      haad 
    451  1.1      haad 	/* Initialise the FD list head */
    452  1.1      haad 	local_client_head.fd = clops->get_main_cluster_fd();
    453  1.1      haad 	local_client_head.type = CLUSTER_MAIN_SOCK;
    454  1.1      haad 	local_client_head.callback = clops->cluster_fd_callback;
    455  1.1      haad 
    456  1.1      haad 	/* Add the local socket to the list */
    457  1.1      haad 	newfd = malloc(sizeof(struct local_client));
    458  1.1      haad 	if (!newfd)
    459  1.1      haad 	        child_init_signal(DFAIL_MALLOC);
    460  1.1      haad 
    461  1.1      haad 	newfd->fd = local_sock;
    462  1.1      haad 	newfd->removeme = 0;
    463  1.1      haad 	newfd->type = LOCAL_RENDEZVOUS;
    464  1.1      haad 	newfd->callback = local_rendezvous_callback;
    465  1.1      haad 	newfd->next = local_client_head.next;
    466  1.1      haad 	local_client_head.next = newfd;
    467  1.1      haad 
    468  1.1      haad 	/* This needs to be started after cluster initialisation
    469  1.1      haad 	   as it may need to take out locks */
    470  1.1      haad 	DEBUGLOG("starting LVM thread\n");
    471  1.2  dholland 
    472  1.2  dholland 	/* Don't let anyone else to do work until we are started */
    473  1.2  dholland 	pthread_mutex_lock(&lvm_start_mutex);
    474  1.2  dholland 	pthread_create(&lvm_thread, NULL, (lvm_pthread_fn_t*)lvm_thread_fn,
    475  1.1      haad 			(void *)(long)using_gulm);
    476  1.1      haad 
    477  1.1      haad 	/* Tell the rest of the cluster our version number */
    478  1.1      haad 	/* CMAN can do this immediately, gulm needs to wait until
    479  1.1      haad 	   the core initialisation has finished and the node list
    480  1.1      haad 	   has been gathered */
    481  1.1      haad 	if (clops->cluster_init_completed)
    482  1.1      haad 		clops->cluster_init_completed();
    483  1.1      haad 
    484  1.1      haad 	DEBUGLOG("clvmd ready for work\n");
    485  1.1      haad 	child_init_signal(SUCCESS);
    486  1.1      haad 
    487  1.1      haad 	/* Try to shutdown neatly */
    488  1.1      haad 	signal(SIGTERM, sigterm_handler);
    489  1.1      haad 	signal(SIGINT, sigterm_handler);
    490  1.1      haad 
    491  1.1      haad 	/* Do some work */
    492  1.1      haad 	main_loop(local_sock, cmd_timeout);
    493  1.1      haad 
    494  1.2  dholland 	destroy_lvm();
    495  1.2  dholland 
    496  1.1      haad 	return 0;
    497  1.1      haad }
    498  1.1      haad 
/*
 * Called when the GuLM cluster layer has completed initialisation.
 * All it does is send the version message.
 *
 * Defined with an explicit (void) parameter list so that the definition
 * is a real prototype rather than an old-style declaration.
 */
void clvmd_cluster_init_completed(void)
{
	send_version_message();
}
    505  1.1      haad 
/*
 * fd callback for a connected local client socket.
 *
 * Never creates a new client (*new_client is set to NULL) and hands the
 * descriptor to read_from_local_sock(), whose result is returned as is.
 * buf, len and csid are not used.
 */
static int local_sock_callback(struct local_client *thisfd, char *buf, int len,
			       const char *csid,
			       struct local_client **new_client)
{
	int result;

	*new_client = NULL;
	result = read_from_local_sock(thisfd);

	return result;
}
    514  1.1      haad 
    515  1.1      haad /* Data on a connected socket */
    516  1.1      haad static int local_rendezvous_callback(struct local_client *thisfd, char *buf,
    517  1.1      haad 				     int len, const char *csid,
    518  1.1      haad 				     struct local_client **new_client)
    519  1.1      haad {
    520  1.1      haad 	/* Someone connected to our local socket, accept it. */
    521  1.1      haad 
    522  1.1      haad 	struct sockaddr_un socka;
    523  1.1      haad 	struct local_client *newfd;
    524  1.1      haad 	socklen_t sl = sizeof(socka);
    525  1.1      haad 	int client_fd = accept(thisfd->fd, (struct sockaddr *) &socka, &sl);
    526  1.1      haad 
    527  1.1      haad 	if (client_fd == -1 && errno == EINTR)
    528  1.1      haad 		return 1;
    529  1.1      haad 
    530  1.1      haad 	if (client_fd >= 0) {
    531  1.1      haad 		newfd = malloc(sizeof(struct local_client));
    532  1.1      haad 		if (!newfd) {
    533  1.1      haad 			close(client_fd);
    534  1.1      haad 			return 1;
    535  1.1      haad 		}
    536  1.1      haad 		newfd->fd = client_fd;
    537  1.1      haad 		newfd->type = LOCAL_SOCK;
    538  1.1      haad 		newfd->xid = 0;
    539  1.1      haad 		newfd->removeme = 0;
    540  1.1      haad 		newfd->callback = local_sock_callback;
    541  1.1      haad 		newfd->bits.localsock.replies = NULL;
    542  1.1      haad 		newfd->bits.localsock.expected_replies = 0;
    543  1.1      haad 		newfd->bits.localsock.cmd = NULL;
    544  1.1      haad 		newfd->bits.localsock.in_progress = FALSE;
    545  1.1      haad 		newfd->bits.localsock.sent_out = FALSE;
    546  1.1      haad 		newfd->bits.localsock.threadid = 0;
    547  1.1      haad 		newfd->bits.localsock.finished = 0;
    548  1.1      haad 		newfd->bits.localsock.pipe_client = NULL;
    549  1.1      haad 		newfd->bits.localsock.private = NULL;
    550  1.1      haad 		newfd->bits.localsock.all_success = 1;
    551  1.1      haad 		DEBUGLOG("Got new connection on fd %d\n", newfd->fd);
    552  1.1      haad 		*new_client = newfd;
    553  1.1      haad 	}
    554  1.1      haad 	return 1;
    555  1.1      haad }
    556  1.1      haad 
    557  1.1      haad static int local_pipe_callback(struct local_client *thisfd, char *buf,
    558  1.1      haad 			       int maxlen, const char *csid,
    559  1.1      haad 			       struct local_client **new_client)
    560  1.1      haad {
    561  1.1      haad 	int len;
    562  1.1      haad 	char buffer[PIPE_BUF];
    563  1.1      haad 	struct local_client *sock_client = thisfd->bits.pipe.client;
    564  1.1      haad 	int status = -1;	/* in error by default */
    565  1.1      haad 
    566  1.1      haad 	len = read(thisfd->fd, buffer, sizeof(int));
    567  1.1      haad 	if (len == -1 && errno == EINTR)
    568  1.1      haad 		return 1;
    569  1.1      haad 
    570  1.1      haad 	if (len == sizeof(int)) {
    571  1.1      haad 		memcpy(&status, buffer, sizeof(int));
    572  1.1      haad 	}
    573  1.1      haad 
    574  1.1      haad 	DEBUGLOG("read on PIPE %d: %d bytes: status: %d\n",
    575  1.1      haad 		 thisfd->fd, len, status);
    576  1.1      haad 
    577  1.1      haad 	/* EOF on pipe or an error, close it */
    578  1.1      haad 	if (len <= 0) {
    579  1.1      haad 		int jstat;
    580  1.1      haad 		void *ret = &status;
    581  1.1      haad 		close(thisfd->fd);
    582  1.1      haad 
    583  1.1      haad 		/* Clear out the cross-link */
    584  1.1      haad 		if (thisfd->bits.pipe.client != NULL)
    585  1.1      haad 			thisfd->bits.pipe.client->bits.localsock.pipe_client =
    586  1.1      haad 			    NULL;
    587  1.1      haad 
    588  1.1      haad 		/* Reap child thread */
    589  1.1      haad 		if (thisfd->bits.pipe.threadid) {
    590  1.1      haad 			jstat = pthread_join(thisfd->bits.pipe.threadid, &ret);
    591  1.1      haad 			thisfd->bits.pipe.threadid = 0;
    592  1.1      haad 			if (thisfd->bits.pipe.client != NULL)
    593  1.1      haad 				thisfd->bits.pipe.client->bits.localsock.
    594  1.1      haad 				    threadid = 0;
    595  1.1      haad 		}
    596  1.1      haad 		return -1;
    597  1.1      haad 	} else {
    598  1.1      haad 		DEBUGLOG("background routine status was %d, sock_client=%p\n",
    599  1.1      haad 			 status, sock_client);
    600  1.1      haad 		/* But has the client gone away ?? */
    601  1.1      haad 		if (sock_client == NULL) {
    602  1.1      haad 			DEBUGLOG
    603  1.1      haad 			    ("Got PIPE response for dead client, ignoring it\n");
    604  1.1      haad 		} else {
    605  1.1      haad 			/* If error then just return that code */
    606  1.1      haad 			if (status)
    607  1.1      haad 				send_local_reply(sock_client, status,
    608  1.1      haad 						 sock_client->fd);
    609  1.1      haad 			else {
    610  1.1      haad 				if (sock_client->bits.localsock.state ==
    611  1.1      haad 				    POST_COMMAND) {
    612  1.1      haad 					send_local_reply(sock_client, 0,
    613  1.1      haad 							 sock_client->fd);
    614  1.1      haad 				} else	// PRE_COMMAND finished.
    615  1.1      haad 				{
    616  1.1      haad 					if (
    617  1.1      haad 					    (status =
    618  1.1      haad 					     distribute_command(sock_client)) !=
    619  1.1      haad 					    0) send_local_reply(sock_client,
    620  1.1      haad 								EFBIG,
    621  1.1      haad 								sock_client->
    622  1.1      haad 								fd);
    623  1.1      haad 				}
    624  1.1      haad 			}
    625  1.1      haad 		}
    626  1.1      haad 	}
    627  1.1      haad 	return len;
    628  1.1      haad }
    629  1.1      haad 
    630  1.1      haad /* If a noed is up, look for it in the reply array, if it's not there then
    631  1.1      haad    add one with "ETIMEDOUT".
    632  1.1      haad    NOTE: This won't race with real replies because they happen in the same thread.
    633  1.1      haad */
    634  1.1      haad static void timedout_callback(struct local_client *client, const char *csid,
    635  1.1      haad 			      int node_up)
    636  1.1      haad {
    637  1.1      haad 	if (node_up) {
    638  1.1      haad 		struct node_reply *reply;
    639  1.1      haad 		char nodename[max_cluster_member_name_len];
    640  1.1      haad 
    641  1.1      haad 		clops->name_from_csid(csid, nodename);
    642  1.1      haad 		DEBUGLOG("Checking for a reply from %s\n", nodename);
    643  1.1      haad 		pthread_mutex_lock(&client->bits.localsock.reply_mutex);
    644  1.1      haad 
    645  1.1      haad 		reply = client->bits.localsock.replies;
    646  1.1      haad 		while (reply && strcmp(reply->node, nodename) != 0) {
    647  1.1      haad 			reply = reply->next;
    648  1.1      haad 		}
    649  1.1      haad 
    650  1.1      haad 		pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
    651  1.1      haad 
    652  1.1      haad 		if (!reply) {
    653  1.1      haad 			DEBUGLOG("Node %s timed-out\n", nodename);
    654  1.1      haad 			add_reply_to_list(client, ETIMEDOUT, csid,
    655  1.1      haad 					  "Command timed out", 18);
    656  1.1      haad 		}
    657  1.1      haad 	}
    658  1.1      haad }
    659  1.1      haad 
    660  1.1      haad /* Called when the request has timed out on at least one node. We fill in
    661  1.1      haad    the remaining node entries with ETIMEDOUT and return.
    662  1.1      haad 
    663  1.1      haad    By the time we get here the node that caused
    664  1.1      haad    the timeout could have gone down, in which case we will never get the expected
    665  1.1      haad    number of replies that triggers the post command so we need to do it here
    666  1.1      haad */
    667  1.1      haad static void request_timed_out(struct local_client *client)
    668  1.1      haad {
    669  1.1      haad 	DEBUGLOG("Request timed-out. padding\n");
    670  1.1      haad 	clops->cluster_do_node_callback(client, timedout_callback);
    671  1.1      haad 
    672  1.1      haad 	if (client->bits.localsock.num_replies !=
    673  1.1      haad 	    client->bits.localsock.expected_replies) {
    674  1.1      haad 		/* Post-process the command */
    675  1.1      haad 		if (client->bits.localsock.threadid) {
    676  1.1      haad 			pthread_mutex_lock(&client->bits.localsock.mutex);
    677  1.1      haad 			client->bits.localsock.state = POST_COMMAND;
    678  1.1      haad 			pthread_cond_signal(&client->bits.localsock.cond);
    679  1.1      haad 			pthread_mutex_unlock(&client->bits.localsock.mutex);
    680  1.1      haad 		}
    681  1.1      haad 	}
    682  1.1      haad }
    683  1.1      haad 
    684  1.1      haad /* This is where the real work happens */
    685  1.1      haad static void main_loop(int local_sock, int cmd_timeout)
    686  1.1      haad {
    687  1.1      haad 	DEBUGLOG("Using timeout of %d seconds\n", cmd_timeout);
    688  1.1      haad 
    689  1.2  dholland 	sigset_t ss;
    690  1.2  dholland 	sigemptyset(&ss);
    691  1.2  dholland 	sigaddset(&ss, SIGINT);
    692  1.2  dholland 	sigaddset(&ss, SIGTERM);
    693  1.2  dholland 	pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
    694  1.1      haad 	/* Main loop */
    695  1.1      haad 	while (!quit) {
    696  1.1      haad 		fd_set in;
    697  1.1      haad 		int select_status;
    698  1.1      haad 		struct local_client *thisfd;
    699  1.1      haad 		struct timeval tv = { cmd_timeout, 0 };
    700  1.1      haad 		int quorate = clops->is_quorate();
    701  1.1      haad 
    702  1.1      haad 		/* Wait on the cluster FD and all local sockets/pipes */
    703  1.1      haad 		local_client_head.fd = clops->get_main_cluster_fd();
    704  1.1      haad 		FD_ZERO(&in);
    705  1.1      haad 		for (thisfd = &local_client_head; thisfd != NULL;
    706  1.1      haad 		     thisfd = thisfd->next) {
    707  1.1      haad 
    708  1.1      haad 			if (thisfd->removeme)
    709  1.1      haad 				continue;
    710  1.1      haad 
    711  1.1      haad 			/* if the cluster is not quorate then don't listen for new requests */
    712  1.1      haad 			if ((thisfd->type != LOCAL_RENDEZVOUS &&
    713  1.1      haad 			     thisfd->type != LOCAL_SOCK) || quorate)
    714  1.1      haad 				FD_SET(thisfd->fd, &in);
    715  1.1      haad 		}
    716  1.1      haad 
    717  1.1      haad 		select_status = select(FD_SETSIZE, &in, NULL, NULL, &tv);
    718  1.1      haad 
    719  1.1      haad 		if (reread_config) {
    720  1.1      haad 			int saved_errno = errno;
    721  1.1      haad 
    722  1.1      haad 			reread_config = 0;
    723  1.1      haad 			if (clops->reread_config)
    724  1.1      haad 				clops->reread_config();
    725  1.1      haad 			errno = saved_errno;
    726  1.1      haad 		}
    727  1.1      haad 
    728  1.1      haad 		if (select_status > 0) {
    729  1.1      haad 			struct local_client *lastfd = NULL;
    730  1.1      haad 			char csid[MAX_CSID_LEN];
    731  1.1      haad 			char buf[max_cluster_message];
    732  1.1      haad 
    733  1.1      haad 			for (thisfd = &local_client_head; thisfd != NULL;
    734  1.1      haad 			     thisfd = thisfd->next) {
    735  1.1      haad 
    736  1.1      haad 				if (thisfd->removeme) {
    737  1.1      haad 					struct local_client *free_fd;
    738  1.1      haad 					lastfd->next = thisfd->next;
    739  1.1      haad 					free_fd = thisfd;
    740  1.1      haad 					thisfd = lastfd;
    741  1.1      haad 
    742  1.1      haad 					DEBUGLOG("removeme set for fd %d\n", free_fd->fd);
    743  1.1      haad 
    744  1.1      haad 					/* Queue cleanup, this also frees the client struct */
    745  1.1      haad 					add_to_lvmqueue(free_fd, NULL, 0, NULL);
    746  1.1      haad 					break;
    747  1.1      haad 				}
    748  1.1      haad 
    749  1.1      haad 				if (FD_ISSET(thisfd->fd, &in)) {
    750  1.1      haad 					struct local_client *newfd = NULL;
    751  1.1      haad 					int ret;
    752  1.1      haad 
    753  1.1      haad 					/* Do callback */
    754  1.1      haad 					ret =
    755  1.1      haad 					    thisfd->callback(thisfd, buf,
    756  1.1      haad 							     sizeof(buf), csid,
    757  1.1      haad 							     &newfd);
    758  1.1      haad 					/* Ignore EAGAIN */
    759  1.1      haad 					if (ret < 0 && (errno == EAGAIN ||
    760  1.1      haad 							errno == EINTR)) continue;
    761  1.1      haad 
    762  1.1      haad 					/* Got error or EOF: Remove it from the list safely */
    763  1.1      haad 					if (ret <= 0) {
    764  1.1      haad 						struct local_client *free_fd;
    765  1.1      haad 						int type = thisfd->type;
    766  1.1      haad 
    767  1.1      haad 						/* If the cluster socket shuts down, so do we */
    768  1.1      haad 						if (type == CLUSTER_MAIN_SOCK ||
    769  1.1      haad 						    type == CLUSTER_INTERNAL)
    770  1.1      haad 							goto closedown;
    771  1.1      haad 
    772  1.1      haad 						DEBUGLOG("ret == %d, errno = %d. removing client\n",
    773  1.1      haad 							 ret, errno);
    774  1.1      haad 						lastfd->next = thisfd->next;
    775  1.1      haad 						free_fd = thisfd;
    776  1.1      haad 						thisfd = lastfd;
    777  1.1      haad 						close(free_fd->fd);
    778  1.1      haad 
    779  1.1      haad 						/* Queue cleanup, this also frees the client struct */
    780  1.1      haad 						add_to_lvmqueue(free_fd, NULL, 0, NULL);
    781  1.1      haad 						break;
    782  1.1      haad 					}
    783  1.1      haad 
    784  1.1      haad 					/* New client...simply add it to the list */
    785  1.1      haad 					if (newfd) {
    786  1.1      haad 						newfd->next = thisfd->next;
    787  1.1      haad 						thisfd->next = newfd;
    788  1.1      haad 						break;
    789  1.1      haad 					}
    790  1.1      haad 				}
    791  1.1      haad 				lastfd = thisfd;
    792  1.1      haad 			}
    793  1.1      haad 		}
    794  1.1      haad 
    795  1.1      haad 		/* Select timed out. Check for clients that have been waiting too long for a response */
    796  1.1      haad 		if (select_status == 0) {
    797  1.1      haad 			time_t the_time = time(NULL);
    798  1.1      haad 
    799  1.1      haad 			for (thisfd = &local_client_head; thisfd != NULL;
    800  1.1      haad 			     thisfd = thisfd->next) {
    801  1.1      haad 				if (thisfd->type == LOCAL_SOCK
    802  1.1      haad 				    && thisfd->bits.localsock.sent_out
    803  1.1      haad 				    && thisfd->bits.localsock.sent_time +
    804  1.1      haad 				    cmd_timeout < the_time
    805  1.1      haad 				    && thisfd->bits.localsock.
    806  1.1      haad 				    expected_replies !=
    807  1.1      haad 				    thisfd->bits.localsock.num_replies) {
    808  1.1      haad 					/* Send timed out message + replies we already have */
    809  1.1      haad 					DEBUGLOG
    810  1.1      haad 					    ("Request timed-out (send: %ld, now: %ld)\n",
    811  1.1      haad 					     thisfd->bits.localsock.sent_time,
    812  1.1      haad 					     the_time);
    813  1.1      haad 
    814  1.1      haad 					thisfd->bits.localsock.all_success = 0;
    815  1.1      haad 
    816  1.1      haad 					request_timed_out(thisfd);
    817  1.1      haad 				}
    818  1.1      haad 			}
    819  1.1      haad 		}
    820  1.1      haad 		if (select_status < 0) {
    821  1.1      haad 			if (errno == EINTR)
    822  1.1      haad 				continue;
    823  1.1      haad 
    824  1.1      haad #ifdef DEBUG
    825  1.1      haad 			perror("select error");
    826  1.1      haad 			exit(-1);
    827  1.1      haad #endif
    828  1.1      haad 		}
    829  1.1      haad 	}
    830  1.1      haad 
    831  1.1      haad       closedown:
    832  1.1      haad 	clops->cluster_closedown();
    833  1.1      haad 	close(local_sock);
    834  1.1      haad }
    835  1.1      haad 
    836  1.1      haad static __attribute__ ((noreturn)) void wait_for_child(int c_pipe, int timeout)
    837  1.1      haad {
    838  1.1      haad 	int child_status;
    839  1.1      haad 	int sstat;
    840  1.1      haad 	fd_set fds;
    841  1.1      haad 	struct timeval tv = {timeout, 0};
    842  1.1      haad 
    843  1.1      haad 	FD_ZERO(&fds);
    844  1.1      haad 	FD_SET(c_pipe, &fds);
    845  1.1      haad 
    846  1.1      haad 	sstat = select(c_pipe+1, &fds, NULL, NULL, timeout? &tv: NULL);
    847  1.1      haad 	if (sstat == 0) {
    848  1.1      haad 		fprintf(stderr, "clvmd startup timed out\n");
    849  1.1      haad 		exit(DFAIL_TIMEOUT);
    850  1.1      haad 	}
    851  1.1      haad 	if (sstat == 1) {
    852  1.1      haad 		if (read(c_pipe, &child_status, sizeof(child_status)) !=
    853  1.1      haad 		    sizeof(child_status)) {
    854  1.1      haad 
    855  1.1      haad 			fprintf(stderr, "clvmd failed in initialisation\n");
    856  1.1      haad 			exit(DFAIL_INIT);
    857  1.1      haad 		}
    858  1.1      haad 		else {
    859  1.1      haad 			switch (child_status) {
    860  1.1      haad 			case SUCCESS:
    861  1.1      haad 				break;
    862  1.1      haad 			case DFAIL_INIT:
    863  1.1      haad 				fprintf(stderr, "clvmd failed in initialisation\n");
    864  1.1      haad 				break;
    865  1.1      haad 			case DFAIL_LOCAL_SOCK:
    866  1.1      haad 				fprintf(stderr, "clvmd could not create local socket\n");
    867  1.1      haad 				fprintf(stderr, "Another clvmd is probably already running\n");
    868  1.1      haad 				break;
    869  1.1      haad 			case DFAIL_CLUSTER_IF:
    870  1.1      haad 				fprintf(stderr, "clvmd could not connect to cluster manager\n");
    871  1.1      haad 				fprintf(stderr, "Consult syslog for more information\n");
    872  1.1      haad 				break;
    873  1.1      haad 			case DFAIL_MALLOC:
    874  1.1      haad 				fprintf(stderr, "clvmd failed, not enough memory\n");
    875  1.1      haad 				break;
    876  1.1      haad 			default:
    877  1.1      haad 				fprintf(stderr, "clvmd failed, error was %d\n", child_status);
    878  1.1      haad 				break;
    879  1.1      haad 			}
    880  1.1      haad 			exit(child_status);
    881  1.1      haad 		}
    882  1.1      haad 	}
    883  1.1      haad 	fprintf(stderr, "clvmd startup, select failed: %s\n", strerror(errno));
    884  1.1      haad 	exit(DFAIL_INIT);
    885  1.1      haad }
    886  1.1      haad 
    887  1.1      haad /*
    888  1.1      haad  * Fork into the background and detach from our parent process.
    889  1.1      haad  * In the interests of user-friendliness we wait for the daemon
    890  1.1      haad  * to complete initialisation before returning its status
    891  1.1      haad  * the the user.
    892  1.1      haad  */
    893  1.1      haad static void be_daemon(int timeout)
    894  1.1      haad {
    895  1.1      haad         pid_t pid;
    896  1.1      haad 	int devnull = open("/dev/null", O_RDWR);
    897  1.1      haad 	if (devnull == -1) {
    898  1.1      haad 		perror("Can't open /dev/null");
    899  1.1      haad 		exit(3);
    900  1.1      haad 	}
    901  1.1      haad 
    902  1.1      haad 	pipe(child_pipe);
    903  1.1      haad 
    904  1.1      haad 	switch (pid = fork()) {
    905  1.1      haad 	case -1:
    906  1.1      haad 		perror("clvmd: can't fork");
    907  1.1      haad 		exit(2);
    908  1.1      haad 
    909  1.1      haad 	case 0:		/* Child */
    910  1.1      haad 	        close(child_pipe[0]);
    911  1.1      haad 		break;
    912  1.1      haad 
    913  1.1      haad 	default:       /* Parent */
    914  1.1      haad 		close(child_pipe[1]);
    915  1.1      haad 		wait_for_child(child_pipe[0], timeout);
    916  1.1      haad 	}
    917  1.1      haad 
    918  1.1      haad 	/* Detach ourself from the calling environment */
    919  1.1      haad 	if (close(0) || close(1) || close(2)) {
    920  1.1      haad 		perror("Error closing terminal FDs");
    921  1.1      haad 		exit(4);
    922  1.1      haad 	}
    923  1.1      haad 	setsid();
    924  1.1      haad 
    925  1.1      haad 	if (dup2(devnull, 0) < 0 || dup2(devnull, 1) < 0
    926  1.1      haad 	    || dup2(devnull, 2) < 0) {
    927  1.2  dholland 		int e = errno;
    928  1.1      haad 		perror("Error setting terminal FDs to /dev/null");
    929  1.2  dholland 		log_error("Error setting terminal FDs to /dev/null: %s", strerror(e));
    930  1.1      haad 		exit(5);
    931  1.1      haad 	}
    932  1.1      haad 	if (chdir("/")) {
    933  1.2  dholland 		log_error("Error setting current directory to /: %s", strerror(e));
    934  1.1      haad 		exit(6);
    935  1.1      haad 	}
    936  1.1      haad 
    937  1.1      haad }
    938  1.1      haad 
    939  1.1      haad /* Called when we have a read from the local socket.
    940  1.1      haad    was in the main loop but it's grown up and is a big girl now */
    941  1.1      haad static int read_from_local_sock(struct local_client *thisfd)
    942  1.1      haad {
    943  1.1      haad 	int len;
    944  1.1      haad 	int argslen;
    945  1.1      haad 	int missing_len;
    946  1.1      haad 	char buffer[PIPE_BUF];
    947  1.1      haad 
    948  1.1      haad 	len = read(thisfd->fd, buffer, sizeof(buffer));
    949  1.1      haad 	if (len == -1 && errno == EINTR)
    950  1.1      haad 		return 1;
    951  1.1      haad 
    952  1.1      haad 	DEBUGLOG("Read on local socket %d, len = %d\n", thisfd->fd, len);
    953  1.1      haad 
    954  1.1      haad 	/* EOF or error on socket */
    955  1.1      haad 	if (len <= 0) {
    956  1.1      haad 		int *status;
    957  1.1      haad 		int jstat;
    958  1.1      haad 
    959  1.1      haad 		DEBUGLOG("EOF on local socket: inprogress=%d\n",
    960  1.1      haad 			 thisfd->bits.localsock.in_progress);
    961  1.1      haad 
    962  1.1      haad 		thisfd->bits.localsock.finished = 1;
    963  1.1      haad 
    964  1.1      haad 		/* If the client went away in mid command then tidy up */
    965  1.1      haad 		if (thisfd->bits.localsock.in_progress) {
    966  1.1      haad 			pthread_kill(thisfd->bits.localsock.threadid, SIGUSR2);
    967  1.1      haad 			pthread_mutex_lock(&thisfd->bits.localsock.mutex);
    968  1.1      haad 			thisfd->bits.localsock.state = POST_COMMAND;
    969  1.1      haad 			pthread_cond_signal(&thisfd->bits.localsock.cond);
    970  1.1      haad 			pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
    971  1.1      haad 
    972  1.1      haad 			/* Free any unsent buffers */
    973  1.1      haad 			free_reply(thisfd);
    974  1.1      haad 		}
    975  1.1      haad 
    976  1.1      haad 		/* Kill the subthread & free resources */
    977  1.1      haad 		if (thisfd->bits.localsock.threadid) {
    978  1.1      haad 			DEBUGLOG("Waiting for child thread\n");
    979  1.1      haad 			pthread_mutex_lock(&thisfd->bits.localsock.mutex);
    980  1.1      haad 			thisfd->bits.localsock.state = PRE_COMMAND;
    981  1.1      haad 			pthread_cond_signal(&thisfd->bits.localsock.cond);
    982  1.1      haad 			pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
    983  1.1      haad 
    984  1.1      haad 			jstat =
    985  1.1      haad 			    pthread_join(thisfd->bits.localsock.threadid,
    986  1.1      haad 					 (void **) &status);
    987  1.1      haad 			DEBUGLOG("Joined child thread\n");
    988  1.1      haad 
    989  1.1      haad 			thisfd->bits.localsock.threadid = 0;
    990  1.1      haad 			pthread_cond_destroy(&thisfd->bits.localsock.cond);
    991  1.1      haad 			pthread_mutex_destroy(&thisfd->bits.localsock.mutex);
    992  1.1      haad 
    993  1.1      haad 			/* Remove the pipe client */
    994  1.1      haad 			if (thisfd->bits.localsock.pipe_client != NULL) {
    995  1.1      haad 				struct local_client *newfd;
    996  1.1      haad 				struct local_client *lastfd = NULL;
    997  1.1      haad 				struct local_client *free_fd = NULL;
    998  1.1      haad 
    999  1.1      haad 				close(thisfd->bits.localsock.pipe_client->fd);	/* Close pipe */
   1000  1.1      haad 				close(thisfd->bits.localsock.pipe);
   1001  1.1      haad 
   1002  1.1      haad 				/* Remove pipe client */
   1003  1.1      haad 				for (newfd = &local_client_head; newfd != NULL;
   1004  1.1      haad 				     newfd = newfd->next) {
   1005  1.1      haad 					if (thisfd->bits.localsock.
   1006  1.1      haad 					    pipe_client == newfd) {
   1007  1.1      haad 						thisfd->bits.localsock.
   1008  1.1      haad 						    pipe_client = NULL;
   1009  1.1      haad 
   1010  1.1      haad 						lastfd->next = newfd->next;
   1011  1.1      haad 						free_fd = newfd;
   1012  1.1      haad 						newfd->next = lastfd;
   1013  1.1      haad 						free(free_fd);
   1014  1.1      haad 						break;
   1015  1.1      haad 					}
   1016  1.1      haad 					lastfd = newfd;
   1017  1.1      haad 				}
   1018  1.1      haad 			}
   1019  1.1      haad 		}
   1020  1.1      haad 
   1021  1.1      haad 		/* Free the command buffer */
   1022  1.1      haad 		free(thisfd->bits.localsock.cmd);
   1023  1.1      haad 
   1024  1.1      haad 		/* Clear out the cross-link */
   1025  1.1      haad 		if (thisfd->bits.localsock.pipe_client != NULL)
   1026  1.1      haad 			thisfd->bits.localsock.pipe_client->bits.pipe.client =
   1027  1.1      haad 			    NULL;
   1028  1.1      haad 
   1029  1.1      haad 		close(thisfd->fd);
   1030  1.1      haad 		return 0;
   1031  1.1      haad 	} else {
   1032  1.1      haad 		int comms_pipe[2];
   1033  1.1      haad 		struct local_client *newfd;
   1034  1.1      haad 		char csid[MAX_CSID_LEN];
   1035  1.1      haad 		struct clvm_header *inheader;
   1036  1.1      haad 		int status;
   1037  1.1      haad 
   1038  1.1      haad 		inheader = (struct clvm_header *) buffer;
   1039  1.1      haad 
   1040  1.1      haad 		/* Fill in the client ID */
   1041  1.1      haad 		inheader->clientid = htonl(thisfd->fd);
   1042  1.1      haad 
   1043  1.1      haad 		/* If we are already busy then return an error */
   1044  1.1      haad 		if (thisfd->bits.localsock.in_progress) {
   1045  1.1      haad 			struct clvm_header reply;
   1046  1.1      haad 			reply.cmd = CLVMD_CMD_REPLY;
   1047  1.1      haad 			reply.status = EBUSY;
   1048  1.1      haad 			reply.arglen = 0;
   1049  1.1      haad 			reply.flags = 0;
   1050  1.1      haad 			send_message(&reply, sizeof(reply), our_csid,
   1051  1.1      haad 				     thisfd->fd,
   1052  1.1      haad 				     "Error sending EBUSY reply to local user");
   1053  1.1      haad 			return len;
   1054  1.1      haad 		}
   1055  1.1      haad 
   1056  1.1      haad 		/* Free any old buffer space */
   1057  1.1      haad 		free(thisfd->bits.localsock.cmd);
   1058  1.1      haad 
   1059  1.1      haad 		/* See if we have the whole message */
   1060  1.1      haad 		argslen =
   1061  1.1      haad 		    len - strlen(inheader->node) - sizeof(struct clvm_header);
   1062  1.1      haad 		missing_len = inheader->arglen - argslen;
   1063  1.1      haad 
   1064  1.1      haad 		if (missing_len < 0)
   1065  1.1      haad 			missing_len = 0;
   1066  1.1      haad 
   1067  1.1      haad 		/* Save the message */
   1068  1.1      haad 		thisfd->bits.localsock.cmd = malloc(len + missing_len);
   1069  1.1      haad 
   1070  1.1      haad 		if (!thisfd->bits.localsock.cmd) {
   1071  1.1      haad 			struct clvm_header reply;
   1072  1.1      haad 			reply.cmd = CLVMD_CMD_REPLY;
   1073  1.1      haad 			reply.status = ENOMEM;
   1074  1.1      haad 			reply.arglen = 0;
   1075  1.1      haad 			reply.flags = 0;
   1076  1.1      haad 			send_message(&reply, sizeof(reply), our_csid,
   1077  1.1      haad 				     thisfd->fd,
   1078  1.1      haad 				     "Error sending ENOMEM reply to local user");
   1079  1.1      haad 			return 0;
   1080  1.1      haad 		}
   1081  1.1      haad 		memcpy(thisfd->bits.localsock.cmd, buffer, len);
   1082  1.1      haad 		thisfd->bits.localsock.cmd_len = len + missing_len;
   1083  1.1      haad 		inheader = (struct clvm_header *) thisfd->bits.localsock.cmd;
   1084  1.1      haad 
   1085  1.1      haad 		/* If we don't have the full message then read the rest now */
   1086  1.1      haad 		if (missing_len) {
   1087  1.1      haad 			char *argptr =
   1088  1.1      haad 			    inheader->node + strlen(inheader->node) + 1;
   1089  1.1      haad 
   1090  1.1      haad 			while (missing_len > 0 && len >= 0) {
   1091  1.1      haad 				DEBUGLOG
   1092  1.1      haad 				    ("got %d bytes, need another %d (total %d)\n",
   1093  1.1      haad 				     argslen, missing_len, inheader->arglen);
   1094  1.1      haad 				len = read(thisfd->fd, argptr + argslen,
   1095  1.1      haad 					   missing_len);
   1096  1.1      haad 				if (len >= 0) {
   1097  1.1      haad 					missing_len -= len;
   1098  1.1      haad 					argslen += len;
   1099  1.1      haad 				}
   1100  1.1      haad 			}
   1101  1.1      haad 		}
   1102  1.1      haad 
   1103  1.1      haad 		/* Initialise and lock the mutex so the subthread will wait after
   1104  1.1      haad 		   finishing the PRE routine */
   1105  1.1      haad 		if (!thisfd->bits.localsock.threadid) {
   1106  1.1      haad 			pthread_mutex_init(&thisfd->bits.localsock.mutex, NULL);
   1107  1.1      haad 			pthread_cond_init(&thisfd->bits.localsock.cond, NULL);
   1108  1.1      haad 			pthread_mutex_init(&thisfd->bits.localsock.reply_mutex, NULL);
   1109  1.1      haad 		}
   1110  1.1      haad 
   1111  1.1      haad 		/* Only run the command if all the cluster nodes are running CLVMD */
   1112  1.1      haad 		if (((inheader->flags & CLVMD_FLAG_LOCAL) == 0) &&
   1113  1.1      haad 		    (check_all_clvmds_running(thisfd) == -1)) {
   1114  1.1      haad 			thisfd->bits.localsock.expected_replies = 0;
   1115  1.1      haad 			thisfd->bits.localsock.num_replies = 0;
   1116  1.1      haad 			send_local_reply(thisfd, EHOSTDOWN, thisfd->fd);
   1117  1.1      haad 			return len;
   1118  1.1      haad 		}
   1119  1.1      haad 
   1120  1.1      haad 		/* Check the node name for validity */
   1121  1.1      haad 		if (inheader->node[0] && clops->csid_from_name(csid, inheader->node)) {
   1122  1.1      haad 			/* Error, node is not in the cluster */
   1123  1.1      haad 			struct clvm_header reply;
   1124  1.1      haad 			DEBUGLOG("Unknown node: '%s'\n", inheader->node);
   1125  1.1      haad 
   1126  1.1      haad 			reply.cmd = CLVMD_CMD_REPLY;
   1127  1.1      haad 			reply.status = ENOENT;
   1128  1.1      haad 			reply.flags = 0;
   1129  1.1      haad 			reply.arglen = 0;
   1130  1.1      haad 			send_message(&reply, sizeof(reply), our_csid,
   1131  1.1      haad 				     thisfd->fd,
   1132  1.1      haad 				     "Error sending ENOENT reply to local user");
   1133  1.1      haad 			thisfd->bits.localsock.expected_replies = 0;
   1134  1.1      haad 			thisfd->bits.localsock.num_replies = 0;
   1135  1.1      haad 			thisfd->bits.localsock.in_progress = FALSE;
   1136  1.1      haad 			thisfd->bits.localsock.sent_out = FALSE;
   1137  1.1      haad 			return len;
   1138  1.1      haad 		}
   1139  1.1      haad 
   1140  1.1      haad 		/* If we already have a subthread then just signal it to start */
   1141  1.1      haad 		if (thisfd->bits.localsock.threadid) {
   1142  1.1      haad 			pthread_mutex_lock(&thisfd->bits.localsock.mutex);
   1143  1.1      haad 			thisfd->bits.localsock.state = PRE_COMMAND;
   1144  1.1      haad 			pthread_cond_signal(&thisfd->bits.localsock.cond);
   1145  1.1      haad 			pthread_mutex_unlock(&thisfd->bits.localsock.mutex);
   1146  1.1      haad 			return len;
   1147  1.1      haad 		}
   1148  1.1      haad 
   1149  1.1      haad 		/* Create a pipe and add the reading end to our FD list */
   1150  1.1      haad 		pipe(comms_pipe);
   1151  1.1      haad 		newfd = malloc(sizeof(struct local_client));
   1152  1.1      haad 		if (!newfd) {
   1153  1.1      haad 			struct clvm_header reply;
   1154  1.1      haad 			close(comms_pipe[0]);
   1155  1.1      haad 			close(comms_pipe[1]);
   1156  1.1      haad 
   1157  1.1      haad 			reply.cmd = CLVMD_CMD_REPLY;
   1158  1.1      haad 			reply.status = ENOMEM;
   1159  1.1      haad 			reply.arglen = 0;
   1160  1.1      haad 			reply.flags = 0;
   1161  1.1      haad 			send_message(&reply, sizeof(reply), our_csid,
   1162  1.1      haad 				     thisfd->fd,
   1163  1.1      haad 				     "Error sending ENOMEM reply to local user");
   1164  1.1      haad 			return len;
   1165  1.1      haad 		}
   1166  1.1      haad 		DEBUGLOG("creating pipe, [%d, %d]\n", comms_pipe[0],
   1167  1.1      haad 			 comms_pipe[1]);
   1168  1.1      haad 		newfd->fd = comms_pipe[0];
   1169  1.1      haad 		newfd->removeme = 0;
   1170  1.1      haad 		newfd->type = THREAD_PIPE;
   1171  1.1      haad 		newfd->callback = local_pipe_callback;
   1172  1.1      haad 		newfd->next = thisfd->next;
   1173  1.1      haad 		newfd->bits.pipe.client = thisfd;
   1174  1.1      haad 		newfd->bits.pipe.threadid = 0;
   1175  1.1      haad 		thisfd->next = newfd;
   1176  1.1      haad 
   1177  1.1      haad 		/* Store a cross link to the pipe */
   1178  1.1      haad 		thisfd->bits.localsock.pipe_client = newfd;
   1179  1.1      haad 
   1180  1.1      haad 		thisfd->bits.localsock.pipe = comms_pipe[1];
   1181  1.1      haad 
   1182  1.1      haad 		/* Make sure the thread has a copy of it's own ID */
   1183  1.1      haad 		newfd->bits.pipe.threadid = thisfd->bits.localsock.threadid;
   1184  1.1      haad 
   1185  1.1      haad 		/* Run the pre routine */
   1186  1.1      haad 		thisfd->bits.localsock.in_progress = TRUE;
   1187  1.1      haad 		thisfd->bits.localsock.state = PRE_COMMAND;
   1188  1.1      haad 		DEBUGLOG("Creating pre&post thread\n");
   1189  1.1      haad 		status = pthread_create(&thisfd->bits.localsock.threadid, NULL,
   1190  1.1      haad 			       pre_and_post_thread, thisfd);
   1191  1.1      haad 		DEBUGLOG("Created pre&post thread, state = %d\n", status);
   1192  1.1      haad 	}
   1193  1.1      haad 	return len;
   1194  1.1      haad }
   1195  1.1      haad 
   1196  1.1      haad /* Add a file descriptor from the cluster or comms interface to
   1197  1.1      haad    our list of FDs for select
   1198  1.1      haad */
   1199  1.1      haad int add_client(struct local_client *new_client)
   1200  1.1      haad {
   1201  1.1      haad 	new_client->next = local_client_head.next;
   1202  1.1      haad 	local_client_head.next = new_client;
   1203  1.1      haad 
   1204  1.1      haad 	return 0;
   1205  1.1      haad }
   1206  1.1      haad 
   1207  1.1      haad /* Called when the pre-command has completed successfully - we
   1208  1.1      haad    now execute the real command on all the requested nodes */
   1209  1.1      haad static int distribute_command(struct local_client *thisfd)
   1210  1.1      haad {
   1211  1.1      haad 	struct clvm_header *inheader =
   1212  1.1      haad 	    (struct clvm_header *) thisfd->bits.localsock.cmd;
   1213  1.1      haad 	int len = thisfd->bits.localsock.cmd_len;
   1214  1.1      haad 
   1215  1.1      haad 	thisfd->xid = global_xid++;
   1216  1.1      haad 	DEBUGLOG("distribute command: XID = %d\n", thisfd->xid);
   1217  1.1      haad 
   1218  1.1      haad 	/* Forward it to other nodes in the cluster if needed */
   1219  1.1      haad 	if (!(inheader->flags & CLVMD_FLAG_LOCAL)) {
   1220  1.1      haad 		/* if node is empty then do it on the whole cluster */
   1221  1.1      haad 		if (inheader->node[0] == '\0') {
   1222  1.1      haad 			thisfd->bits.localsock.expected_replies =
   1223  1.1      haad 			    clops->get_num_nodes();
   1224  1.1      haad 			thisfd->bits.localsock.num_replies = 0;
   1225  1.1      haad 			thisfd->bits.localsock.sent_time = time(NULL);
   1226  1.1      haad 			thisfd->bits.localsock.in_progress = TRUE;
   1227  1.1      haad 			thisfd->bits.localsock.sent_out = TRUE;
   1228  1.1      haad 
   1229  1.1      haad 			/* Do it here first */
   1230  1.1      haad 			add_to_lvmqueue(thisfd, inheader, len, NULL);
   1231  1.1      haad 
   1232  1.1      haad 			DEBUGLOG("Sending message to all cluster nodes\n");
   1233  1.1      haad 			inheader->xid = thisfd->xid;
   1234  1.1      haad 			send_message(inheader, len, NULL, -1,
   1235  1.1      haad 				     "Error forwarding message to cluster");
   1236  1.1      haad 		} else {
   1237  1.1      haad                         /* Do it on a single node */
   1238  1.1      haad 			char csid[MAX_CSID_LEN];
   1239  1.1      haad 
   1240  1.1      haad 			if (clops->csid_from_name(csid, inheader->node)) {
   1241  1.1      haad 				/* This has already been checked so should not happen */
   1242  1.1      haad 				return 0;
   1243  1.1      haad 			} else {
   1244  1.1      haad 			        /* OK, found a node... */
   1245  1.1      haad 				thisfd->bits.localsock.expected_replies = 1;
   1246  1.1      haad 				thisfd->bits.localsock.num_replies = 0;
   1247  1.1      haad 				thisfd->bits.localsock.in_progress = TRUE;
   1248  1.1      haad 
   1249  1.1      haad 				/* Are we the requested node ?? */
   1250  1.1      haad 				if (memcmp(csid, our_csid, max_csid_len) == 0) {
   1251  1.1      haad 					DEBUGLOG("Doing command on local node only\n");
   1252  1.1      haad 					add_to_lvmqueue(thisfd, inheader, len, NULL);
   1253  1.1      haad 				} else {
   1254  1.1      haad 					DEBUGLOG("Sending message to single node: %s\n",
   1255  1.1      haad 						 inheader->node);
   1256  1.1      haad 					inheader->xid = thisfd->xid;
   1257  1.1      haad 					send_message(inheader, len,
   1258  1.1      haad 						     csid, -1,
   1259  1.1      haad 						     "Error forwarding message to cluster node");
   1260  1.1      haad 				}
   1261  1.1      haad 			}
   1262  1.1      haad 		}
   1263  1.1      haad 	} else {
   1264  1.1      haad 		/* Local explicitly requested, ignore nodes */
   1265  1.1      haad 		thisfd->bits.localsock.in_progress = TRUE;
   1266  1.1      haad 		thisfd->bits.localsock.expected_replies = 1;
   1267  1.1      haad 		thisfd->bits.localsock.num_replies = 0;
   1268  1.1      haad 		add_to_lvmqueue(thisfd, inheader, len, NULL);
   1269  1.1      haad 	}
   1270  1.1      haad 	return 0;
   1271  1.1      haad }
   1272  1.1      haad 
   1273  1.1      haad /* Process a command from a remote node and return the result */
   1274  1.1      haad static void process_remote_command(struct clvm_header *msg, int msglen, int fd,
   1275  1.1      haad 			    	   const char *csid)
   1276  1.1      haad {
   1277  1.1      haad 	char *replyargs;
   1278  1.1      haad 	char nodename[max_cluster_member_name_len];
   1279  1.1      haad 	int replylen = 0;
   1280  1.1      haad 	int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
   1281  1.1      haad 	int status;
   1282  1.1      haad 	int msg_malloced = 0;
   1283  1.1      haad 
   1284  1.1      haad 	/* Get the node name as we /may/ need it later */
   1285  1.1      haad 	clops->name_from_csid(csid, nodename);
   1286  1.1      haad 
   1287  1.1      haad 	DEBUGLOG("process_remote_command %s for clientid 0x%x XID %d on node %s\n",
   1288  1.1      haad 		 decode_cmd(msg->cmd), msg->clientid, msg->xid, nodename);
   1289  1.1      haad 
   1290  1.1      haad 	/* Check for GOAWAY and sulk */
   1291  1.1      haad 	if (msg->cmd == CLVMD_CMD_GOAWAY) {
   1292  1.1      haad 
   1293  1.1      haad 		DEBUGLOG("Told to go away by %s\n", nodename);
   1294  1.1      haad 		log_error("Told to go away by %s\n", nodename);
   1295  1.1      haad 		exit(99);
   1296  1.1      haad 	}
   1297  1.1      haad 
   1298  1.1      haad 	/* Version check is internal - don't bother exposing it in
   1299  1.1      haad 	   clvmd-command.c */
   1300  1.1      haad 	if (msg->cmd == CLVMD_CMD_VERSION) {
   1301  1.1      haad 		int version_nums[3];
   1302  1.1      haad 		char node[256];
   1303  1.1      haad 
   1304  1.1      haad 		memcpy(version_nums, msg->args, sizeof(version_nums));
   1305  1.1      haad 
   1306  1.1      haad 		clops->name_from_csid(csid, node);
   1307  1.1      haad 		DEBUGLOG("Remote node %s is version %d.%d.%d\n",
   1308  1.1      haad 			 node,
   1309  1.1      haad 			 ntohl(version_nums[0]),
   1310  1.1      haad 			 ntohl(version_nums[1]), ntohl(version_nums[2]));
   1311  1.1      haad 
   1312  1.1      haad 		if (ntohl(version_nums[0]) != CLVMD_MAJOR_VERSION) {
   1313  1.1      haad 			struct clvm_header byebyemsg;
   1314  1.1      haad 			DEBUGLOG
   1315  1.1      haad 			    ("Telling node %s to go away because of incompatible version number\n",
   1316  1.1      haad 			     node);
   1317  1.1      haad 			log_notice
   1318  1.1      haad 			    ("Telling node %s to go away because of incompatible version number %d.%d.%d\n",
   1319  1.1      haad 			     node, ntohl(version_nums[0]),
   1320  1.1      haad 			     ntohl(version_nums[1]), ntohl(version_nums[2]));
   1321  1.1      haad 
   1322  1.1      haad 			byebyemsg.cmd = CLVMD_CMD_GOAWAY;
   1323  1.1      haad 			byebyemsg.status = 0;
   1324  1.1      haad 			byebyemsg.flags = 0;
   1325  1.1      haad 			byebyemsg.arglen = 0;
   1326  1.1      haad 			byebyemsg.clientid = 0;
   1327  1.1      haad 			clops->cluster_send_message(&byebyemsg, sizeof(byebyemsg),
   1328  1.1      haad 					     our_csid,
   1329  1.1      haad 					     "Error Sending GOAWAY message");
   1330  1.1      haad 		} else {
   1331  1.1      haad 			clops->add_up_node(csid);
   1332  1.1      haad 		}
   1333  1.1      haad 		return;
   1334  1.1      haad 	}
   1335  1.1      haad 
   1336  1.1      haad 	/* Allocate a default reply buffer */
   1337  1.1      haad 	replyargs = malloc(max_cluster_message - sizeof(struct clvm_header));
   1338  1.1      haad 
   1339  1.1      haad 	if (replyargs != NULL) {
   1340  1.1      haad 		/* Run the command */
   1341  1.1      haad 		status =
   1342  1.1      haad 		    do_command(NULL, msg, msglen, &replyargs, buflen,
   1343  1.1      haad 			       &replylen);
   1344  1.1      haad 	} else {
   1345  1.1      haad 		status = ENOMEM;
   1346  1.1      haad 	}
   1347  1.1      haad 
   1348  1.1      haad 	/* If it wasn't a reply, then reply */
   1349  1.1      haad 	if (msg->cmd != CLVMD_CMD_REPLY) {
   1350  1.1      haad 		char *aggreply;
   1351  1.1      haad 
   1352  1.1      haad 		aggreply =
   1353  1.1      haad 		    realloc(replyargs, replylen + sizeof(struct clvm_header));
   1354  1.1      haad 		if (aggreply) {
   1355  1.1      haad 			struct clvm_header *agghead =
   1356  1.1      haad 			    (struct clvm_header *) aggreply;
   1357  1.1      haad 
   1358  1.1      haad 			replyargs = aggreply;
   1359  1.1      haad 			/* Move it up so there's room for a header in front of the data */
   1360  1.1      haad 			memmove(aggreply + offsetof(struct clvm_header, args),
   1361  1.1      haad 				replyargs, replylen);
   1362  1.1      haad 
   1363  1.1      haad 			agghead->xid = msg->xid;
   1364  1.1      haad 			agghead->cmd = CLVMD_CMD_REPLY;
   1365  1.1      haad 			agghead->status = status;
   1366  1.1      haad 			agghead->flags = 0;
   1367  1.1      haad 			agghead->clientid = msg->clientid;
   1368  1.1      haad 			agghead->arglen = replylen;
   1369  1.1      haad 			agghead->node[0] = '\0';
   1370  1.1      haad 			send_message(aggreply,
   1371  1.1      haad 				     sizeof(struct clvm_header) +
   1372  1.1      haad 				     replylen, csid, fd,
   1373  1.1      haad 				     "Error sending command reply");
   1374  1.1      haad 		} else {
   1375  1.1      haad 			struct clvm_header head;
   1376  1.1      haad 
   1377  1.1      haad 			DEBUGLOG("Error attempting to realloc return buffer\n");
   1378  1.1      haad 			/* Return a failure response */
   1379  1.1      haad 			head.cmd = CLVMD_CMD_REPLY;
   1380  1.1      haad 			head.status = ENOMEM;
   1381  1.1      haad 			head.flags = 0;
   1382  1.1      haad 			head.clientid = msg->clientid;
   1383  1.1      haad 			head.arglen = 0;
   1384  1.1      haad 			head.node[0] = '\0';
   1385  1.1      haad 			send_message(&head, sizeof(struct clvm_header), csid,
   1386  1.1      haad 				     fd, "Error sending ENOMEM command reply");
   1387  1.1      haad 			return;
   1388  1.1      haad 		}
   1389  1.1      haad 	}
   1390  1.1      haad 
   1391  1.1      haad 	/* Free buffer if it was malloced */
   1392  1.1      haad 	if (msg_malloced) {
   1393  1.1      haad 		free(msg);
   1394  1.1      haad 	}
   1395  1.1      haad 	free(replyargs);
   1396  1.1      haad }
   1397  1.1      haad 
   1398  1.1      haad /* Add a reply to a command to the list of replies for this client.
   1399  1.1      haad    If we have got a full set then send them to the waiting client down the local
   1400  1.1      haad    socket */
   1401  1.1      haad static void add_reply_to_list(struct local_client *client, int status,
   1402  1.1      haad 			      const char *csid, const char *buf, int len)
   1403  1.1      haad {
   1404  1.1      haad 	struct node_reply *reply;
   1405  1.1      haad 
   1406  1.1      haad 	pthread_mutex_lock(&client->bits.localsock.reply_mutex);
   1407  1.1      haad 
   1408  1.1      haad 	/* Add it to the list of replies */
   1409  1.1      haad 	reply = malloc(sizeof(struct node_reply));
   1410  1.1      haad 	if (reply) {
   1411  1.1      haad 		reply->status = status;
   1412  1.1      haad 		clops->name_from_csid(csid, reply->node);
   1413  1.1      haad 		DEBUGLOG("Reply from node %s: %d bytes\n", reply->node, len);
   1414  1.1      haad 
   1415  1.1      haad 		if (len > 0) {
   1416  1.1      haad 			reply->replymsg = malloc(len);
   1417  1.1      haad 			if (!reply->replymsg) {
   1418  1.1      haad 				reply->status = ENOMEM;
   1419  1.1      haad 			} else {
   1420  1.1      haad 				memcpy(reply->replymsg, buf, len);
   1421  1.1      haad 			}
   1422  1.1      haad 		} else {
   1423  1.1      haad 			reply->replymsg = NULL;
   1424  1.1      haad 		}
   1425  1.1      haad 		/* Hook it onto the reply chain */
   1426  1.1      haad 		reply->next = client->bits.localsock.replies;
   1427  1.1      haad 		client->bits.localsock.replies = reply;
   1428  1.1      haad 	} else {
   1429  1.1      haad 		/* It's all gone horribly wrong... */
   1430  1.1      haad 		pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
   1431  1.1      haad 		send_local_reply(client, ENOMEM, client->fd);
   1432  1.1      haad 		return;
   1433  1.1      haad 	}
   1434  1.1      haad 	DEBUGLOG("Got %d replies, expecting: %d\n",
   1435  1.1      haad 		 client->bits.localsock.num_replies + 1,
   1436  1.1      haad 		 client->bits.localsock.expected_replies);
   1437  1.1      haad 
   1438  1.1      haad 	/* If we have the whole lot then do the post-process */
   1439  1.1      haad 	if (++client->bits.localsock.num_replies ==
   1440  1.1      haad 	    client->bits.localsock.expected_replies) {
   1441  1.1      haad 		/* Post-process the command */
   1442  1.1      haad 		if (client->bits.localsock.threadid) {
   1443  1.1      haad 			pthread_mutex_lock(&client->bits.localsock.mutex);
   1444  1.1      haad 			client->bits.localsock.state = POST_COMMAND;
   1445  1.1      haad 			pthread_cond_signal(&client->bits.localsock.cond);
   1446  1.1      haad 			pthread_mutex_unlock(&client->bits.localsock.mutex);
   1447  1.1      haad 		}
   1448  1.1      haad 	}
   1449  1.1      haad 	pthread_mutex_unlock(&client->bits.localsock.reply_mutex);
   1450  1.1      haad }
   1451  1.1      haad 
   1452  1.1      haad /* This is the thread that runs the PRE and post commands for a particular connection */
   1453  1.1      haad static __attribute__ ((noreturn)) void *pre_and_post_thread(void *arg)
   1454  1.1      haad {
   1455  1.1      haad 	struct local_client *client = (struct local_client *) arg;
   1456  1.1      haad 	int status;
   1457  1.1      haad 	int write_status;
   1458  1.1      haad 	sigset_t ss;
   1459  1.1      haad 	int pipe_fd = client->bits.localsock.pipe;
   1460  1.1      haad 
   1461  1.1      haad 	DEBUGLOG("in sub thread: client = %p\n", client);
   1462  1.1      haad 
   1463  1.1      haad 	/* Don't start until the LVM thread is ready */
   1464  1.1      haad 	pthread_mutex_lock(&lvm_start_mutex);
   1465  1.1      haad 	pthread_mutex_unlock(&lvm_start_mutex);
   1466  1.1      haad 	DEBUGLOG("Sub thread ready for work.\n");
   1467  1.1      haad 
   1468  1.1      haad 	/* Ignore SIGUSR1 (handled by master process) but enable
   1469  1.1      haad 	   SIGUSR2 (kills subthreads) */
   1470  1.1      haad 	sigemptyset(&ss);
   1471  1.1      haad 	sigaddset(&ss, SIGUSR1);
   1472  1.1      haad 	pthread_sigmask(SIG_BLOCK, &ss, NULL);
   1473  1.1      haad 
   1474  1.1      haad 	sigdelset(&ss, SIGUSR1);
   1475  1.1      haad 	sigaddset(&ss, SIGUSR2);
   1476  1.1      haad 	pthread_sigmask(SIG_UNBLOCK, &ss, NULL);
   1477  1.1      haad 
   1478  1.1      haad 	/* Loop around doing PRE and POST functions until the client goes away */
   1479  1.1      haad 	while (!client->bits.localsock.finished) {
   1480  1.1      haad 		/* Execute the code */
   1481  1.1      haad 		status = do_pre_command(client);
   1482  1.1      haad 
   1483  1.1      haad 		if (status)
   1484  1.1      haad 			client->bits.localsock.all_success = 0;
   1485  1.1      haad 
   1486  1.1      haad 		DEBUGLOG("Writing status %d down pipe %d\n", status, pipe_fd);
   1487  1.1      haad 
   1488  1.1      haad 		/* Tell the parent process we have finished this bit */
   1489  1.1      haad 		do {
   1490  1.1      haad 			write_status = write(pipe_fd, &status, sizeof(int));
   1491  1.1      haad 			if (write_status == sizeof(int))
   1492  1.1      haad 				break;
   1493  1.1      haad 			if (write_status < 0 &&
   1494  1.1      haad 			    (errno == EINTR || errno == EAGAIN))
   1495  1.1      haad 				continue;
   1496  1.2  dholland 			log_error("Error sending to pipe: %s\n", strerror(errno));
   1497  1.1      haad 			break;
   1498  1.1      haad 		} while(1);
   1499  1.1      haad 
   1500  1.1      haad 		if (status) {
   1501  1.1      haad 			client->bits.localsock.state = POST_COMMAND;
   1502  1.1      haad 			goto next_pre;
   1503  1.1      haad 		}
   1504  1.1      haad 
   1505  1.1      haad 		/* We may need to wait for the condition variable before running the post command */
   1506  1.1      haad 		pthread_mutex_lock(&client->bits.localsock.mutex);
   1507  1.1      haad 		DEBUGLOG("Waiting to do post command - state = %d\n",
   1508  1.1      haad 			 client->bits.localsock.state);
   1509  1.1      haad 
   1510  1.1      haad 		if (client->bits.localsock.state != POST_COMMAND) {
   1511  1.1      haad 			pthread_cond_wait(&client->bits.localsock.cond,
   1512  1.1      haad 					  &client->bits.localsock.mutex);
   1513  1.1      haad 		}
   1514  1.1      haad 		pthread_mutex_unlock(&client->bits.localsock.mutex);
   1515  1.1      haad 
   1516  1.1      haad 		DEBUGLOG("Got post command condition...\n");
   1517  1.1      haad 
   1518  1.1      haad 		/* POST function must always run, even if the client aborts */
   1519  1.1      haad 		status = 0;
   1520  1.1      haad 		do_post_command(client);
   1521  1.1      haad 
   1522  1.1      haad 		do {
   1523  1.1      haad 			write_status = write(pipe_fd, &status, sizeof(int));
   1524  1.1      haad 			if (write_status == sizeof(int))
   1525  1.1      haad 				break;
   1526  1.1      haad 			if (write_status < 0 &&
   1527  1.1      haad 			    (errno == EINTR || errno == EAGAIN))
   1528  1.1      haad 				continue;
   1529  1.2  dholland 			log_error("Error sending to pipe: %s\n", strerror(errno));
   1530  1.1      haad 			break;
   1531  1.1      haad 		} while(1);
   1532  1.1      haad next_pre:
   1533  1.1      haad 		DEBUGLOG("Waiting for next pre command\n");
   1534  1.1      haad 
   1535  1.1      haad 		pthread_mutex_lock(&client->bits.localsock.mutex);
   1536  1.1      haad 		if (client->bits.localsock.state != PRE_COMMAND &&
   1537  1.1      haad 		    !client->bits.localsock.finished) {
   1538  1.1      haad 			pthread_cond_wait(&client->bits.localsock.cond,
   1539  1.1      haad 					  &client->bits.localsock.mutex);
   1540  1.1      haad 		}
   1541  1.1      haad 		pthread_mutex_unlock(&client->bits.localsock.mutex);
   1542  1.1      haad 
   1543  1.1      haad 		DEBUGLOG("Got pre command condition...\n");
   1544  1.1      haad 	}
   1545  1.1      haad 	DEBUGLOG("Subthread finished\n");
   1546  1.1      haad 	pthread_exit((void *) 0);
   1547  1.1      haad }
   1548  1.1      haad 
   1549  1.1      haad /* Process a command on the local node and store the result */
   1550  1.1      haad static int process_local_command(struct clvm_header *msg, int msglen,
   1551  1.1      haad 				 struct local_client *client,
   1552  1.1      haad 				 unsigned short xid)
   1553  1.1      haad {
   1554  1.1      haad 	char *replybuf = malloc(max_cluster_message);
   1555  1.1      haad 	int buflen = max_cluster_message - sizeof(struct clvm_header) - 1;
   1556  1.1      haad 	int replylen = 0;
   1557  1.1      haad 	int status;
   1558  1.1      haad 
   1559  1.1      haad 	DEBUGLOG("process_local_command: %s msg=%p, msglen =%d, client=%p\n",
   1560  1.1      haad 		 decode_cmd(msg->cmd), msg, msglen, client);
   1561  1.1      haad 
   1562  1.1      haad 	if (replybuf == NULL)
   1563  1.1      haad 		return -1;
   1564  1.1      haad 
   1565  1.1      haad 	status = do_command(client, msg, msglen, &replybuf, buflen, &replylen);
   1566  1.1      haad 
   1567  1.1      haad 	if (status)
   1568  1.1      haad 		client->bits.localsock.all_success = 0;
   1569  1.1      haad 
   1570  1.1      haad 	/* If we took too long then discard the reply */
   1571  1.1      haad 	if (xid == client->xid) {
   1572  1.1      haad 		add_reply_to_list(client, status, our_csid, replybuf, replylen);
   1573  1.1      haad 	} else {
   1574  1.1      haad 		DEBUGLOG
   1575  1.1      haad 		    ("Local command took too long, discarding xid %d, current is %d\n",
   1576  1.1      haad 		     xid, client->xid);
   1577  1.1      haad 	}
   1578  1.1      haad 
   1579  1.1      haad 	free(replybuf);
   1580  1.1      haad 	return status;
   1581  1.1      haad }
   1582  1.1      haad 
   1583  1.1      haad static int process_reply(const struct clvm_header *msg, int msglen, const char *csid)
   1584  1.1      haad {
   1585  1.1      haad 	struct local_client *client = NULL;
   1586  1.1      haad 
   1587  1.1      haad 	client = find_client(msg->clientid);
   1588  1.1      haad 	if (!client) {
   1589  1.1      haad 		DEBUGLOG("Got message for unknown client 0x%x\n",
   1590  1.1      haad 			 msg->clientid);
   1591  1.1      haad 		log_error("Got message for unknown client 0x%x\n",
   1592  1.1      haad 			  msg->clientid);
   1593  1.1      haad 		return -1;
   1594  1.1      haad 	}
   1595  1.1      haad 
   1596  1.1      haad 	if (msg->status)
   1597  1.1      haad 		client->bits.localsock.all_success = 0;
   1598  1.1      haad 
   1599  1.1      haad 	/* Gather replies together for this client id */
   1600  1.1      haad 	if (msg->xid == client->xid) {
   1601  1.1      haad 		add_reply_to_list(client, msg->status, csid, msg->args,
   1602  1.1      haad 				  msg->arglen);
   1603  1.1      haad 	} else {
   1604  1.1      haad 		DEBUGLOG("Discarding reply with old XID %d, current = %d\n",
   1605  1.1      haad 			 msg->xid, client->xid);
   1606  1.1      haad 	}
   1607  1.1      haad 	return 0;
   1608  1.1      haad }
   1609  1.1      haad 
   1610  1.1      haad /* Send an aggregated reply back to the client */
   1611  1.1      haad static void send_local_reply(struct local_client *client, int status, int fd)
   1612  1.1      haad {
   1613  1.1      haad 	struct clvm_header *clientreply;
   1614  1.1      haad 	struct node_reply *thisreply = client->bits.localsock.replies;
   1615  1.1      haad 	char *replybuf;
   1616  1.1      haad 	char *ptr;
   1617  1.1      haad 	int message_len = 0;
   1618  1.1      haad 
   1619  1.1      haad 	DEBUGLOG("Send local reply\n");
   1620  1.1      haad 
   1621  1.1      haad 	/* Work out the total size of the reply */
   1622  1.1      haad 	while (thisreply) {
   1623  1.1      haad 		if (thisreply->replymsg)
   1624  1.1      haad 			message_len += strlen(thisreply->replymsg) + 1;
   1625  1.1      haad 		else
   1626  1.1      haad 			message_len++;
   1627  1.1      haad 
   1628  1.1      haad 		message_len += strlen(thisreply->node) + 1 + sizeof(int);
   1629  1.1      haad 
   1630  1.1      haad 		thisreply = thisreply->next;
   1631  1.1      haad 	}
   1632  1.1      haad 
   1633  1.1      haad 	/* Add in the size of our header */
   1634  1.1      haad 	message_len = message_len + sizeof(struct clvm_header) + 1;
   1635  1.1      haad 	replybuf = malloc(message_len);
   1636  1.1      haad 
   1637  1.1      haad 	clientreply = (struct clvm_header *) replybuf;
   1638  1.1      haad 	clientreply->status = status;
   1639  1.1      haad 	clientreply->cmd = CLVMD_CMD_REPLY;
   1640  1.1      haad 	clientreply->node[0] = '\0';
   1641  1.1      haad 	clientreply->flags = 0;
   1642  1.1      haad 
   1643  1.1      haad 	ptr = clientreply->args;
   1644  1.1      haad 
   1645  1.1      haad 	/* Add in all the replies, and free them as we go */
   1646  1.1      haad 	thisreply = client->bits.localsock.replies;
   1647  1.1      haad 	while (thisreply) {
   1648  1.1      haad 		struct node_reply *tempreply = thisreply;
   1649  1.1      haad 
   1650  1.1      haad 		strcpy(ptr, thisreply->node);
   1651  1.1      haad 		ptr += strlen(thisreply->node) + 1;
   1652  1.1      haad 
   1653  1.1      haad 		if (thisreply->status)
   1654  1.1      haad 			clientreply->flags |= CLVMD_FLAG_NODEERRS;
   1655  1.1      haad 
   1656  1.1      haad 		memcpy(ptr, &thisreply->status, sizeof(int));
   1657  1.1      haad 		ptr += sizeof(int);
   1658  1.1      haad 
   1659  1.1      haad 		if (thisreply->replymsg) {
   1660  1.1      haad 			strcpy(ptr, thisreply->replymsg);
   1661  1.1      haad 			ptr += strlen(thisreply->replymsg) + 1;
   1662  1.1      haad 		} else {
   1663  1.1      haad 			ptr[0] = '\0';
   1664  1.1      haad 			ptr++;
   1665  1.1      haad 		}
   1666  1.1      haad 		thisreply = thisreply->next;
   1667  1.1      haad 
   1668  1.1      haad 		free(tempreply->replymsg);
   1669  1.1      haad 		free(tempreply);
   1670  1.1      haad 	}
   1671  1.1      haad 
   1672  1.1      haad 	/* Terminate with an empty node name */
   1673  1.1      haad 	*ptr = '\0';
   1674  1.1      haad 
   1675  1.1      haad 	clientreply->arglen = ptr - clientreply->args + 1;
   1676  1.1      haad 
   1677  1.1      haad 	/* And send it */
   1678  1.1      haad 	send_message(replybuf, message_len, our_csid, fd,
   1679  1.1      haad 		     "Error sending REPLY to client");
   1680  1.1      haad 	free(replybuf);
   1681  1.1      haad 
   1682  1.1      haad 	/* Reset comms variables */
   1683  1.1      haad 	client->bits.localsock.replies = NULL;
   1684  1.1      haad 	client->bits.localsock.expected_replies = 0;
   1685  1.1      haad 	client->bits.localsock.in_progress = FALSE;
   1686  1.1      haad 	client->bits.localsock.sent_out = FALSE;
   1687  1.1      haad }
   1688  1.1      haad 
   1689  1.1      haad /* Just free a reply chain baceuse it wasn't used. */
   1690  1.1      haad static void free_reply(struct local_client *client)
   1691  1.1      haad {
   1692  1.1      haad 	/* Add in all the replies, and free them as we go */
   1693  1.1      haad 	struct node_reply *thisreply = client->bits.localsock.replies;
   1694  1.1      haad 	while (thisreply) {
   1695  1.1      haad 		struct node_reply *tempreply = thisreply;
   1696  1.1      haad 
   1697  1.1      haad 		thisreply = thisreply->next;
   1698  1.1      haad 
   1699  1.1      haad 		free(tempreply->replymsg);
   1700  1.1      haad 		free(tempreply);
   1701  1.1      haad 	}
   1702  1.1      haad 	client->bits.localsock.replies = NULL;
   1703  1.1      haad }
   1704  1.1      haad 
   1705  1.1      haad /* Send our version number to the cluster */
   1706  1.1      haad static void send_version_message()
   1707  1.1      haad {
   1708  1.1      haad 	char message[sizeof(struct clvm_header) + sizeof(int) * 3];
   1709  1.1      haad 	struct clvm_header *msg = (struct clvm_header *) message;
   1710  1.1      haad 	int version_nums[3];
   1711  1.1      haad 
   1712  1.1      haad 	msg->cmd = CLVMD_CMD_VERSION;
   1713  1.1      haad 	msg->status = 0;
   1714  1.1      haad 	msg->flags = 0;
   1715  1.1      haad 	msg->clientid = 0;
   1716  1.1      haad 	msg->arglen = sizeof(version_nums);
   1717  1.1      haad 
   1718  1.1      haad 	version_nums[0] = htonl(CLVMD_MAJOR_VERSION);
   1719  1.1      haad 	version_nums[1] = htonl(CLVMD_MINOR_VERSION);
   1720  1.1      haad 	version_nums[2] = htonl(CLVMD_PATCH_VERSION);
   1721  1.1      haad 
   1722  1.1      haad 	memcpy(&msg->args, version_nums, sizeof(version_nums));
   1723  1.1      haad 
   1724  1.1      haad 	hton_clvm(msg);
   1725  1.1      haad 
   1726  1.1      haad 	clops->cluster_send_message(message, sizeof(message), NULL,
   1727  1.1      haad 			     "Error Sending version number");
   1728  1.1      haad }
   1729  1.1      haad 
   1730  1.1      haad /* Send a message to either a local client or another server */
   1731  1.1      haad static int send_message(void *buf, int msglen, const char *csid, int fd,
   1732  1.1      haad 			const char *errtext)
   1733  1.1      haad {
   1734  1.1      haad 	int len = 0;
   1735  1.1      haad 	int saved_errno = 0;
   1736  1.1      haad 	struct timespec delay;
   1737  1.1      haad 	struct timespec remtime;
   1738  1.1      haad 
   1739  1.1      haad 	int retry_cnt = 0;
   1740  1.1      haad 
   1741  1.1      haad 	/* Send remote messages down the cluster socket */
   1742  1.1      haad 	if (csid == NULL || !ISLOCAL_CSID(csid)) {
   1743  1.1      haad 		hton_clvm((struct clvm_header *) buf);
   1744  1.1      haad 		return clops->cluster_send_message(buf, msglen, csid, errtext);
   1745  1.1      haad 	} else {
   1746  1.1      haad 		int ptr = 0;
   1747  1.1      haad 
   1748  1.1      haad 		/* Make sure it all goes */
   1749  1.1      haad 		do {
   1750  1.1      haad 			if (retry_cnt > MAX_RETRIES)
   1751  1.1      haad 			{
   1752  1.1      haad 				errno = saved_errno;
   1753  1.1      haad 				log_error("%s", errtext);
   1754  1.1      haad 				errno = saved_errno;
   1755  1.1      haad 				break;
   1756  1.1      haad 			}
   1757  1.1      haad 
   1758  1.1      haad 			len = write(fd, buf + ptr, msglen - ptr);
   1759  1.1      haad 
   1760  1.1      haad 			if (len <= 0) {
   1761  1.1      haad 				if (errno == EINTR)
   1762  1.1      haad 					continue;
   1763  1.1      haad 				if (errno == EAGAIN ||
   1764  1.1      haad 				    errno == EIO ||
   1765  1.1      haad 				    errno == ENOSPC) {
   1766  1.1      haad 					saved_errno = errno;
   1767  1.1      haad 					retry_cnt++;
   1768  1.1      haad 
   1769  1.1      haad 					delay.tv_sec = 0;
   1770  1.1      haad 					delay.tv_nsec = 100000;
   1771  1.1      haad 					remtime.tv_sec = 0;
   1772  1.1      haad 					remtime.tv_nsec = 0;
   1773  1.1      haad 					(void) nanosleep (&delay, &remtime);
   1774  1.1      haad 
   1775  1.1      haad 					continue;
   1776  1.1      haad 				}
   1777  1.1      haad 				log_error("%s", errtext);
   1778  1.1      haad 				break;
   1779  1.1      haad 			}
   1780  1.1      haad 			ptr += len;
   1781  1.1      haad 		} while (ptr < msglen);
   1782  1.1      haad 	}
   1783  1.1      haad 	return len;
   1784  1.1      haad }
   1785  1.1      haad 
   1786  1.1      haad static int process_work_item(struct lvm_thread_cmd *cmd)
   1787  1.1      haad {
   1788  1.1      haad 	/* If msg is NULL then this is a cleanup request */
   1789  1.1      haad 	if (cmd->msg == NULL) {
   1790  1.1      haad 		DEBUGLOG("process_work_item: free fd %d\n", cmd->client->fd);
   1791  1.1      haad 		cmd_client_cleanup(cmd->client);
   1792  1.1      haad 		free(cmd->client);
   1793  1.1      haad 		return 0;
   1794  1.1      haad 	}
   1795  1.1      haad 
   1796  1.1      haad 	if (!cmd->remote) {
   1797  1.1      haad 		DEBUGLOG("process_work_item: local\n");
   1798  1.1      haad 		process_local_command(cmd->msg, cmd->msglen, cmd->client,
   1799  1.1      haad 				      cmd->xid);
   1800  1.1      haad 	} else {
   1801  1.1      haad 		DEBUGLOG("process_work_item: remote\n");
   1802  1.1      haad 		process_remote_command(cmd->msg, cmd->msglen, cmd->client->fd,
   1803  1.1      haad 				       cmd->csid);
   1804  1.1      haad 	}
   1805  1.1      haad 	return 0;
   1806  1.1      haad }
   1807  1.1      haad 
   1808  1.1      haad /*
   1809  1.1      haad  * Routine that runs in the "LVM thread".
   1810  1.1      haad  */
   1811  1.2  dholland static void lvm_thread_fn(void *arg)
   1812  1.1      haad {
   1813  1.1      haad 	struct dm_list *cmdl, *tmp;
   1814  1.1      haad 	sigset_t ss;
   1815  1.1      haad 	int using_gulm = (int)(long)arg;
   1816  1.1      haad 
   1817  1.1      haad 	DEBUGLOG("LVM thread function started\n");
   1818  1.1      haad 
   1819  1.1      haad 	/* Ignore SIGUSR1 & 2 */
   1820  1.1      haad 	sigemptyset(&ss);
   1821  1.1      haad 	sigaddset(&ss, SIGUSR1);
   1822  1.1      haad 	sigaddset(&ss, SIGUSR2);
   1823  1.1      haad 	pthread_sigmask(SIG_BLOCK, &ss, NULL);
   1824  1.1      haad 
   1825  1.1      haad 	/* Initialise the interface to liblvm */
   1826  1.1      haad 	init_lvm(using_gulm);
   1827  1.1      haad 
   1828  1.1      haad 	/* Allow others to get moving */
   1829  1.1      haad 	pthread_mutex_unlock(&lvm_start_mutex);
   1830  1.1      haad 
   1831  1.1      haad 	/* Now wait for some actual work */
   1832  1.1      haad 	for (;;) {
   1833  1.1      haad 		DEBUGLOG("LVM thread waiting for work\n");
   1834  1.1      haad 
   1835  1.1      haad 		pthread_mutex_lock(&lvm_thread_mutex);
   1836  1.1      haad 		if (dm_list_empty(&lvm_cmd_head))
   1837  1.1      haad 			pthread_cond_wait(&lvm_thread_cond, &lvm_thread_mutex);
   1838  1.1      haad 
   1839  1.1      haad 		dm_list_iterate_safe(cmdl, tmp, &lvm_cmd_head) {
   1840  1.1      haad 			struct lvm_thread_cmd *cmd;
   1841  1.1      haad 
   1842  1.1      haad 			cmd =
   1843  1.1      haad 			    dm_list_struct_base(cmdl, struct lvm_thread_cmd, list);
   1844  1.1      haad 			dm_list_del(&cmd->list);
   1845  1.1      haad 			pthread_mutex_unlock(&lvm_thread_mutex);
   1846  1.1      haad 
   1847  1.1      haad 			process_work_item(cmd);
   1848  1.1      haad 			free(cmd->msg);
   1849  1.1      haad 			free(cmd);
   1850  1.1      haad 
   1851  1.1      haad 			pthread_mutex_lock(&lvm_thread_mutex);
   1852  1.1      haad 		}
   1853  1.1      haad 		pthread_mutex_unlock(&lvm_thread_mutex);
   1854  1.1      haad 	}
   1855  1.1      haad }
   1856  1.1      haad 
   1857  1.1      haad /* Pass down some work to the LVM thread */
   1858  1.1      haad static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg,
   1859  1.1      haad 			   int msglen, const char *csid)
   1860  1.1      haad {
   1861  1.1      haad 	struct lvm_thread_cmd *cmd;
   1862  1.1      haad 
   1863  1.1      haad 	cmd = malloc(sizeof(struct lvm_thread_cmd));
   1864  1.1      haad 	if (!cmd)
   1865  1.1      haad 		return ENOMEM;
   1866  1.1      haad 
   1867  1.1      haad 	if (msglen) {
   1868  1.1      haad 		cmd->msg = malloc(msglen);
   1869  1.1      haad 		if (!cmd->msg) {
   1870  1.1      haad 			log_error("Unable to allocate buffer space\n");
   1871  1.1      haad 			free(cmd);
   1872  1.1      haad 			return -1;
   1873  1.1      haad 		}
   1874  1.1      haad 		memcpy(cmd->msg, msg, msglen);
   1875  1.1      haad 	}
   1876  1.1      haad 	else {
   1877  1.1      haad 		cmd->msg = NULL;
   1878  1.1      haad 	}
   1879  1.1      haad 	cmd->client = client;
   1880  1.1      haad 	cmd->msglen = msglen;
   1881  1.1      haad 	cmd->xid = client->xid;
   1882  1.1      haad 
   1883  1.1      haad 	if (csid) {
   1884  1.1      haad 		memcpy(cmd->csid, csid, max_csid_len);
   1885  1.1      haad 		cmd->remote = 1;
   1886  1.1      haad 	} else {
   1887  1.1      haad 		cmd->remote = 0;
   1888  1.1      haad 	}
   1889  1.1      haad 
   1890  1.1      haad 	DEBUGLOG
   1891  1.1      haad 	    ("add_to_lvmqueue: cmd=%p. client=%p, msg=%p, len=%d, csid=%p, xid=%d\n",
   1892  1.1      haad 	     cmd, client, msg, msglen, csid, cmd->xid);
   1893  1.1      haad 	pthread_mutex_lock(&lvm_thread_mutex);
   1894  1.1      haad 	dm_list_add(&lvm_cmd_head, &cmd->list);
   1895  1.1      haad 	pthread_cond_signal(&lvm_thread_cond);
   1896  1.1      haad 	pthread_mutex_unlock(&lvm_thread_mutex);
   1897  1.1      haad 
   1898  1.1      haad 	return 0;
   1899  1.1      haad }
   1900  1.1      haad 
   1901  1.1      haad /* Return 0 if we can talk to an existing clvmd */
   1902  1.1      haad static int check_local_clvmd(void)
   1903  1.1      haad {
   1904  1.1      haad 	int local_socket;
   1905  1.1      haad 	struct sockaddr_un sockaddr;
   1906  1.1      haad 	int ret = 0;
   1907  1.1      haad 
   1908  1.1      haad 	/* Open local socket */
   1909  1.1      haad 	if ((local_socket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) {
   1910  1.1      haad 		return -1;
   1911  1.1      haad 	}
   1912  1.1      haad 
   1913  1.1      haad 	memset(&sockaddr, 0, sizeof(sockaddr));
   1914  1.1      haad 	memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
   1915  1.1      haad 	sockaddr.sun_family = AF_UNIX;
   1916  1.1      haad 
   1917  1.1      haad 	if (connect(local_socket,(struct sockaddr *) &sockaddr,
   1918  1.1      haad 		    sizeof(sockaddr))) {
   1919  1.1      haad 		ret = -1;
   1920  1.1      haad 	}
   1921  1.1      haad 
   1922  1.1      haad 	close(local_socket);
   1923  1.1      haad 	return ret;
   1924  1.1      haad }
   1925  1.1      haad 
   1926  1.1      haad 
   1927  1.1      haad /* Open the local socket, that's the one we talk to libclvm down */
   1928  1.1      haad static int open_local_sock()
   1929  1.1      haad {
   1930  1.1      haad 	int local_socket;
   1931  1.1      haad 	struct sockaddr_un sockaddr;
   1932  1.1      haad 
   1933  1.1      haad 	/* Open local socket */
   1934  1.1      haad 	if (CLVMD_SOCKNAME[0] != '\0')
   1935  1.1      haad 		unlink(CLVMD_SOCKNAME);
   1936  1.1      haad 	local_socket = socket(PF_UNIX, SOCK_STREAM, 0);
   1937  1.1      haad 	if (local_socket < 0) {
   1938  1.2  dholland 		log_error("Can't create local socket: %s", strerror(errno));
   1939  1.1      haad 		return -1;
   1940  1.1      haad 	}
   1941  1.1      haad 	/* Set Close-on-exec & non-blocking */
   1942  1.1      haad 	fcntl(local_socket, F_SETFD, 1);
   1943  1.1      haad 	fcntl(local_socket, F_SETFL, fcntl(local_socket, F_GETFL, 0) | O_NONBLOCK);
   1944  1.1      haad 
   1945  1.1      haad 	memset(&sockaddr, 0, sizeof(sockaddr));
   1946  1.1      haad 	memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME));
   1947  1.1      haad 	sockaddr.sun_family = AF_UNIX;
   1948  1.1      haad 	if (bind(local_socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr))) {
   1949  1.2  dholland 		log_error("can't bind local socket: %s", strerror(errno));
   1950  1.1      haad 		close(local_socket);
   1951  1.1      haad 		return -1;
   1952  1.1      haad 	}
   1953  1.1      haad 	if (listen(local_socket, 1) != 0) {
   1954  1.2  dholland 		log_error("listen local: %s", strerror(errno));
   1955  1.1      haad 		close(local_socket);
   1956  1.1      haad 		return -1;
   1957  1.1      haad 	}
   1958  1.1      haad 	if (CLVMD_SOCKNAME[0] != '\0')
   1959  1.1      haad 		chmod(CLVMD_SOCKNAME, 0600);
   1960  1.1      haad 
   1961  1.1      haad 	return local_socket;
   1962  1.1      haad }
   1963  1.1      haad 
   1964  1.1      haad void process_message(struct local_client *client, const char *buf, int len,
   1965  1.1      haad 		     const char *csid)
   1966  1.1      haad {
   1967  1.1      haad 	struct clvm_header *inheader;
   1968  1.1      haad 
   1969  1.1      haad 	inheader = (struct clvm_header *) buf;
   1970  1.1      haad 	ntoh_clvm(inheader);	/* Byteswap fields */
   1971  1.1      haad 	if (inheader->cmd == CLVMD_CMD_REPLY)
   1972  1.1      haad 		process_reply(inheader, len, csid);
   1973  1.1      haad 	else
   1974  1.1      haad 		add_to_lvmqueue(client, inheader, len, csid);
   1975  1.1      haad }
   1976  1.1      haad 
   1977  1.1      haad 
/*
 * Node callback passed to cluster_do_node_callback() by
 * check_all_clvmds_running().  For a node reported as not up, an EHOSTDOWN
 * reply carrying a short explanatory string is added for the client; nodes
 * that are up need no action.
 */
static void check_all_callback(struct local_client *client, const char *csid,
			       int node_up)
{
	static const char not_running[] = "CLVMD not running";

	if (node_up)
		return;

	/* sizeof includes the terminating NUL, i.e. 18 bytes. */
	add_reply_to_list(client, EHOSTDOWN, csid, not_running,
			  (int) sizeof(not_running));
}
   1985  1.1      haad 
   1986  1.1      haad /* Check to see if all CLVMDs are running (ie one on
   1987  1.1      haad    every node in the cluster).
   1988  1.1      haad    If not, returns -1 and prints out a list of errant nodes */
   1989  1.1      haad static int check_all_clvmds_running(struct local_client *client)
   1990  1.1      haad {
   1991  1.1      haad 	DEBUGLOG("check_all_clvmds_running\n");
   1992  1.1      haad 	return clops->cluster_do_node_callback(client, check_all_callback);
   1993  1.1      haad }
   1994  1.1      haad 
   1995  1.1      haad /* Return a local_client struct given a client ID.
   1996  1.1      haad    client IDs are in network byte order */
   1997  1.1      haad static struct local_client *find_client(int clientid)
   1998  1.1      haad {
   1999  1.1      haad 	struct local_client *thisfd;
   2000  1.1      haad 	for (thisfd = &local_client_head; thisfd != NULL; thisfd = thisfd->next) {
   2001  1.1      haad 		if (thisfd->fd == ntohl(clientid))
   2002  1.1      haad 			return thisfd;
   2003  1.1      haad 	}
   2004  1.1      haad 	return NULL;
   2005  1.1      haad }
   2006  1.1      haad 
   2007  1.1      haad /* Byte-swapping routines for the header so we
   2008  1.1      haad    work in a heterogeneous environment */
   2009  1.1      haad static void hton_clvm(struct clvm_header *hdr)
   2010  1.1      haad {
   2011  1.1      haad 	hdr->status = htonl(hdr->status);
   2012  1.1      haad 	hdr->arglen = htonl(hdr->arglen);
   2013  1.1      haad 	hdr->xid = htons(hdr->xid);
   2014  1.1      haad 	/* Don't swap clientid as it's only a token as far as
   2015  1.1      haad 	   remote nodes are concerned */
   2016  1.1      haad }
   2017  1.1      haad 
   2018  1.1      haad static void ntoh_clvm(struct clvm_header *hdr)
   2019  1.1      haad {
   2020  1.1      haad 	hdr->status = ntohl(hdr->status);
   2021  1.1      haad 	hdr->arglen = ntohl(hdr->arglen);
   2022  1.1      haad 	hdr->xid = ntohs(hdr->xid);
   2023  1.1      haad }
   2024  1.1      haad 
/*
 * Handler for SIGUSR2 - sent to kill subthreads.
 *
 * The handler changes no state; it only emits a debug message.
 * NOTE(review): DEBUGLOG takes a printf-style format, so it is presumably
 * not async-signal-safe -- confirm before relying on this trace.
 */
static void sigusr2_handler(int sig)
{
	(void) sig;

	DEBUGLOG("SIGUSR2 received\n");
}
   2031  1.1      haad 
   2032  1.1      haad static void sigterm_handler(int sig)
   2033  1.1      haad {
   2034  1.1      haad 	DEBUGLOG("SIGTERM received\n");
   2035  1.1      haad 	quit = 1;
   2036  1.1      haad 	return;
   2037  1.1      haad }
   2038  1.1      haad 
   2039  1.1      haad static void sighup_handler(int sig)
   2040  1.1      haad {
   2041  1.1      haad         DEBUGLOG("got SIGHUP\n");
   2042  1.1      haad 	reread_config = 1;
   2043  1.1      haad }
   2044  1.1      haad 
   2045  1.1      haad int sync_lock(const char *resource, int mode, int flags, int *lockid)
   2046  1.1      haad {
   2047  1.1      haad 	return clops->sync_lock(resource, mode, flags, lockid);
   2048  1.1      haad }
   2049  1.1      haad 
   2050  1.1      haad int sync_unlock(const char *resource, int lockid)
   2051  1.1      haad {
   2052  1.1      haad 	return clops->sync_unlock(resource, lockid);
   2053  1.1      haad }
   2054  1.1      haad 
   2055  1.2  dholland static if_type_t parse_cluster_interface(char *ifname)
   2056  1.2  dholland {
   2057  1.2  dholland 	if_type_t iface = IF_AUTO;
   2058  1.2  dholland 
   2059  1.2  dholland 	if (!strcmp(ifname, "auto"))
   2060  1.2  dholland 		iface = IF_AUTO;
   2061  1.2  dholland 	if (!strcmp(ifname, "cman"))
   2062  1.2  dholland 		iface = IF_CMAN;
   2063  1.2  dholland 	if (!strcmp(ifname, "gulm"))
   2064  1.2  dholland 		iface = IF_GULM;
   2065  1.2  dholland 	if (!strcmp(ifname, "openais"))
   2066  1.2  dholland 		iface = IF_OPENAIS;
   2067  1.2  dholland 	if (!strcmp(ifname, "corosync"))
   2068  1.2  dholland 		iface = IF_COROSYNC;
   2069  1.2  dholland 
   2070  1.2  dholland 	return iface;
   2071  1.2  dholland }
   2072  1.2  dholland 
   2073  1.2  dholland /*
   2074  1.2  dholland  * Try and find a cluster system in corosync's objdb, if it is running. This is
   2075  1.2  dholland  * only called if the command-line option is not present, and if it fails
   2076  1.2  dholland  * we still try the interfaces in order.
   2077  1.2  dholland  */
   2078  1.2  dholland static if_type_t get_cluster_type()
   2079  1.2  dholland {
   2080  1.2  dholland #ifdef HAVE_COROSYNC_CONFDB_H
   2081  1.2  dholland 	confdb_handle_t handle;
   2082  1.2  dholland 	if_type_t type = IF_AUTO;
   2083  1.2  dholland 	int result;
   2084  1.2  dholland 	char buf[255];
   2085  1.2  dholland 	size_t namelen = sizeof(buf);
   2086  1.2  dholland 	hdb_handle_t cluster_handle;
   2087  1.2  dholland 	hdb_handle_t clvmd_handle;
   2088  1.2  dholland 	confdb_callbacks_t callbacks = {
   2089  1.2  dholland 		.confdb_key_change_notify_fn = NULL,
   2090  1.2  dholland 		.confdb_object_create_change_notify_fn = NULL,
   2091  1.2  dholland 		.confdb_object_delete_change_notify_fn = NULL
   2092  1.2  dholland 	};
   2093  1.2  dholland 
   2094  1.2  dholland 	result = confdb_initialize (&handle, &callbacks);
   2095  1.2  dholland         if (result != CS_OK)
   2096  1.2  dholland 		return type;
   2097  1.2  dholland 
   2098  1.2  dholland         result = confdb_object_find_start(handle, OBJECT_PARENT_HANDLE);
   2099  1.2  dholland 	if (result != CS_OK)
   2100  1.2  dholland 		goto out;
   2101  1.2  dholland 
   2102  1.2  dholland         result = confdb_object_find(handle, OBJECT_PARENT_HANDLE, (void *)"cluster", strlen("cluster"), &cluster_handle);
   2103  1.2  dholland         if (result != CS_OK)
   2104  1.2  dholland 		goto out;
   2105  1.2  dholland 
   2106  1.2  dholland         result = confdb_object_find_start(handle, cluster_handle);
   2107  1.2  dholland 	if (result != CS_OK)
   2108  1.2  dholland 		goto out;
   2109  1.2  dholland 
   2110  1.2  dholland         result = confdb_object_find(handle, cluster_handle, (void *)"clvmd", strlen("clvmd"), &clvmd_handle);
   2111  1.2  dholland         if (result != CS_OK)
   2112  1.2  dholland 		goto out;
   2113  1.2  dholland 
   2114  1.2  dholland         result = confdb_key_get(handle, clvmd_handle, (void *)"interface", strlen("interface"), buf, &namelen);
   2115  1.2  dholland         if (result != CS_OK)
   2116  1.2  dholland 		goto out;
   2117  1.2  dholland 
   2118  1.2  dholland 	buf[namelen] = '\0';
   2119  1.2  dholland 	type = parse_cluster_interface(buf);
   2120  1.2  dholland 	DEBUGLOG("got interface type '%s' from confdb\n", buf);
   2121  1.2  dholland out:
   2122  1.2  dholland 	confdb_finalize(handle);
   2123  1.2  dholland 	return type;
   2124  1.2  dholland #else
   2125  1.2  dholland 	return IF_AUTO;
   2126  1.2  dholland #endif
   2127  1.2  dholland }
   2128