1 1.2 dholland /* $NetBSD: clvmd.c,v 1.2 2015/11/09 00:53:57 dholland Exp $ */ 2 1.1 haad 3 1.1 haad /* 4 1.1 haad * Copyright (C) 2002-2004 Sistina Software, Inc. All rights reserved. 5 1.2 dholland * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. 6 1.1 haad * 7 1.1 haad * This file is part of LVM2. 8 1.1 haad * 9 1.1 haad * This copyrighted material is made available to anyone wishing to use, 10 1.1 haad * modify, copy, or redistribute it subject to the terms and conditions 11 1.1 haad * of the GNU General Public License v.2. 12 1.1 haad * 13 1.1 haad * You should have received a copy of the GNU General Public License 14 1.1 haad * along with this program; if not, write to the Free Software Foundation, 15 1.1 haad * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 16 1.1 haad */ 17 1.1 haad 18 1.1 haad /* 19 1.1 haad * CLVMD: Cluster LVM daemon 20 1.1 haad */ 21 1.1 haad 22 1.1 haad #define _GNU_SOURCE 23 1.1 haad #define _FILE_OFFSET_BITS 64 24 1.1 haad 25 1.1 haad #include <configure.h> 26 1.1 haad #include <libdevmapper.h> 27 1.1 haad 28 1.1 haad #include <pthread.h> 29 1.1 haad #include <sys/types.h> 30 1.1 haad #include <sys/stat.h> 31 1.1 haad #include <sys/socket.h> 32 1.1 haad #include <sys/uio.h> 33 1.1 haad #include <sys/un.h> 34 1.1 haad #include <sys/time.h> 35 1.1 haad #include <sys/ioctl.h> 36 1.1 haad #include <sys/utsname.h> 37 1.1 haad #include <netinet/in.h> 38 1.1 haad #include <stdio.h> 39 1.1 haad #include <stdlib.h> 40 1.1 haad #include <stddef.h> 41 1.1 haad #include <stdarg.h> 42 1.1 haad #include <signal.h> 43 1.1 haad #include <unistd.h> 44 1.1 haad #include <fcntl.h> 45 1.1 haad #include <getopt.h> 46 1.1 haad #include <syslog.h> 47 1.1 haad #include <errno.h> 48 1.1 haad #include <limits.h> 49 1.2 dholland #ifdef HAVE_COROSYNC_CONFDB_H 50 1.2 dholland #include <corosync/confdb.h> 51 1.2 dholland #endif 52 1.1 haad 53 1.1 haad #include "clvmd-comms.h" 54 1.1 haad #include "lvm-functions.h" 55 1.1 haad #include "clvm.h" 56 1.2 dholland #include "lvm-version.h" 57 1.1 haad #include "clvmd.h" 58 1.1 haad #include "refresh_clvmd.h" 59 1.1 haad #include "lvm-logging.h" 60 1.1 haad 61 1.1 haad #ifndef TRUE 62 1.1 haad #define TRUE 1 63 1.1 haad #endif 64 1.1 haad #ifndef FALSE 65 1.1 haad #define FALSE 0 66 1.1 haad #endif 67 1.1 haad 68 1.1 haad #define MAX_RETRIES 4 69 1.1 haad 70 1.1 haad #define ISLOCAL_CSID(c) (memcmp(c, our_csid, max_csid_len) == 0) 71 1.1 haad 72 1.1 haad /* Head of the fd list. Also contains 73 1.1 haad the cluster_socket details */ 74 1.1 haad static struct local_client local_client_head; 75 1.1 haad 76 1.1 haad static unsigned short global_xid = 0; /* Last transaction ID issued */ 77 1.1 haad 78 1.1 haad struct cluster_ops *clops = NULL; 79 1.1 haad 80 1.1 haad static char our_csid[MAX_CSID_LEN]; 81 1.1 haad static unsigned max_csid_len; 82 1.1 haad static unsigned max_cluster_message; 83 1.1 haad static unsigned max_cluster_member_name_len; 84 1.1 haad 85 1.1 haad /* Structure of items on the LVM thread list */ 86 1.1 haad struct lvm_thread_cmd { 87 1.1 haad struct dm_list list; 88 1.1 haad 89 1.1 haad struct local_client *client; 90 1.1 haad struct clvm_header *msg; 91 1.1 haad char csid[MAX_CSID_LEN]; 92 1.1 haad int remote; /* Flag */ 93 1.1 haad int msglen; 94 1.1 haad unsigned short xid; 95 1.1 haad }; 96 1.1 haad 97 1.1 haad debug_t debug; 98 1.1 haad static pthread_t lvm_thread; 99 1.1 haad static pthread_mutex_t lvm_thread_mutex; 100 1.1 haad static pthread_cond_t lvm_thread_cond; 101 1.1 haad static pthread_mutex_t lvm_start_mutex; 102 1.1 haad static struct dm_list lvm_cmd_head; 103 1.1 haad static volatile sig_atomic_t quit = 0; 104 1.1 haad static volatile sig_atomic_t reread_config = 0; 105 1.1 haad static int child_pipe[2]; 106 1.1 haad 107 1.1 haad /* Reasons the daemon failed initialisation */ 108 1.1 haad #define DFAIL_INIT 1 109 1.1 haad #define DFAIL_LOCAL_SOCK 2 110 1.1 haad #define DFAIL_CLUSTER_IF 3 111 1.1 haad #define DFAIL_MALLOC 4 112 1.1 haad #define DFAIL_TIMEOUT 5 113 1.1 haad #define SUCCESS 0 114 1.1 haad 115 1.2 dholland typedef enum {IF_AUTO, IF_CMAN, IF_GULM, IF_OPENAIS, IF_COROSYNC} if_type_t; 116 1.2 dholland 117 1.2 dholland typedef void *(lvm_pthread_fn_t)(void*); 118 1.2 dholland 119 1.1 haad /* Prototypes for code further down */ 120 1.1 haad static void sigusr2_handler(int sig); 121 1.1 haad static void sighup_handler(int sig); 122 1.1 haad static void sigterm_handler(int sig); 123 1.1 haad static void send_local_reply(struct local_client *client, int status, 124 1.1 haad int clientid); 125 1.1 haad static void free_reply(struct local_client *client); 126 1.1 haad static void send_version_message(void); 127 1.1 haad static void *pre_and_post_thread(void *arg); 128 1.1 haad static int send_message(void *buf, int msglen, const char *csid, int fd, 129 1.1 haad const char *errtext); 130 1.1 haad static int read_from_local_sock(struct local_client *thisfd); 131 1.1 haad static int process_local_command(struct clvm_header *msg, int msglen, 132 1.1 haad struct local_client *client, 133 1.1 haad unsigned short xid); 134 1.1 haad static void process_remote_command(struct clvm_header *msg, int msglen, int fd, 135 1.1 haad const char *csid); 136 1.1 haad static int process_reply(const struct clvm_header *msg, int msglen, 137 1.1 haad const char *csid); 138 1.1 haad static int open_local_sock(void); 139 1.1 haad static int check_local_clvmd(void); 140 1.1 haad static struct local_client *find_client(int clientid); 141 1.1 haad static void main_loop(int local_sock, int cmd_timeout); 142 1.1 haad static void be_daemon(int start_timeout); 143 1.1 haad static int check_all_clvmds_running(struct local_client *client); 144 1.1 haad static int local_rendezvous_callback(struct local_client *thisfd, char *buf, 145 1.1 haad int len, const char *csid, 146 1.1 haad struct local_client **new_client); 147 1.2 dholland static void lvm_thread_fn(void *) __attribute__ ((noreturn)); 148 1.1 haad static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg, 149 1.1 haad int msglen, const char *csid); 150 1.1 haad static int distribute_command(struct local_client *thisfd); 151 1.1 haad static void hton_clvm(struct clvm_header *hdr); 152 1.1 haad static void ntoh_clvm(struct clvm_header *hdr); 153 1.1 haad static void add_reply_to_list(struct local_client *client, int status, 154 1.1 haad const char *csid, const char *buf, int len); 155 1.2 dholland static if_type_t parse_cluster_interface(char *ifname); 156 1.2 dholland static if_type_t get_cluster_type(void); 157 1.1 haad 158 1.1 haad static void usage(char *prog, FILE *file) 159 1.1 haad { 160 1.1 haad fprintf(file, "Usage:\n"); 161 1.1 haad fprintf(file, "%s [Vhd]\n", prog); 162 1.1 haad fprintf(file, "\n"); 163 1.1 haad fprintf(file, " -V Show version of clvmd\n"); 164 1.1 haad fprintf(file, " -h Show this help information\n"); 165 1.1 haad fprintf(file, " -d Set debug level\n"); 166 1.1 haad fprintf(file, " If starting clvmd then don't fork, run in the foreground\n"); 167 1.1 haad fprintf(file, " -R Tell all running clvmds in the cluster to reload their device cache\n"); 168 1.1 haad fprintf(file, " -C Sets debug level (from -d) on all clvmd instances clusterwide\n"); 169 1.1 haad fprintf(file, " -t<secs> Command timeout (default 60 seconds)\n"); 170 1.1 haad fprintf(file, " -T<secs> Startup timeout (default none)\n"); 171 1.2 dholland fprintf(file, " -I<cmgr> Cluster manager (default: auto)\n"); 172 1.2 dholland fprintf(file, " Available cluster managers: "); 173 1.2 dholland #ifdef USE_COROSYNC 174 1.2 dholland fprintf(file, "corosync "); 175 1.2 dholland #endif 176 1.2 dholland #ifdef USE_CMAN 177 1.2 dholland fprintf(file, "cman "); 178 1.2 dholland #endif 179 1.2 dholland #ifdef USE_OPENAIS 180 1.2 dholland fprintf(file, "openais "); 181 1.2 dholland #endif 182 1.2 dholland #ifdef USE_GULM 183 1.2 dholland fprintf(file, "gulm "); 184 1.2 dholland #endif 185 1.1 haad fprintf(file, "\n"); 186 1.1 haad } 187 1.1 haad 188 1.1 haad /* Called to signal the parent how well we got on during initialisation */ 189 1.1 haad static void child_init_signal(int status) 190 1.1 haad { 191 1.1 haad if (child_pipe[1]) { 192 1.1 haad write(child_pipe[1], &status, sizeof(status)); 193 1.1 haad close(child_pipe[1]); 194 1.1 haad } 195 1.1 haad if (status) 196 1.1 haad exit(status); 197 1.1 haad } 198 1.1 haad 199 1.1 haad 200 1.1 haad void debuglog(const char *fmt, ...) 201 1.1 haad { 202 1.1 haad time_t P; 203 1.1 haad va_list ap; 204 1.1 haad static int syslog_init = 0; 205 1.1 haad 206 1.1 haad if (debug == DEBUG_STDERR) { 207 1.1 haad va_start(ap,fmt); 208 1.1 haad time(&P); 209 1.1 haad fprintf(stderr, "CLVMD[%x]: %.15s ", (int)pthread_self(), ctime(&P)+4 ); 210 1.1 haad vfprintf(stderr, fmt, ap); 211 1.1 haad va_end(ap); 212 1.1 haad } 213 1.1 haad if (debug == DEBUG_SYSLOG) { 214 1.1 haad if (!syslog_init) { 215 1.1 haad openlog("clvmd", LOG_PID, LOG_DAEMON); 216 1.1 haad syslog_init = 1; 217 1.1 haad } 218 1.1 haad 219 1.1 haad va_start(ap,fmt); 220 1.1 haad vsyslog(LOG_DEBUG, fmt, ap); 221 1.1 haad va_end(ap); 222 1.1 haad } 223 1.1 haad } 224 1.1 haad 225 1.1 haad static const char *decode_cmd(unsigned char cmdl) 226 1.1 haad { 227 1.1 haad static char buf[128]; 228 1.1 haad const char *command; 229 1.1 haad 230 1.1 haad switch (cmdl) { 231 1.2 dholland case CLVMD_CMD_TEST: 232 1.2 dholland command = "TEST"; 233 1.1 haad break; 234 1.2 dholland case CLVMD_CMD_LOCK_VG: 235 1.2 dholland command = "LOCK_VG"; 236 1.1 haad break; 237 1.2 dholland case CLVMD_CMD_LOCK_LV: 238 1.2 dholland command = "LOCK_LV"; 239 1.1 haad break; 240 1.2 dholland case CLVMD_CMD_REFRESH: 241 1.2 dholland command = "REFRESH"; 242 1.1 haad break; 243 1.2 dholland case CLVMD_CMD_SET_DEBUG: 244 1.2 dholland command = "SET_DEBUG"; 245 1.1 haad break; 246 1.2 dholland case CLVMD_CMD_GET_CLUSTERNAME: 247 1.1 haad command = "GET_CLUSTERNAME"; 248 1.1 haad break; 249 1.2 dholland case CLVMD_CMD_VG_BACKUP: 250 1.2 dholland command = "VG_BACKUP"; 251 1.2 dholland break; 252 1.2 dholland case CLVMD_CMD_REPLY: 253 1.2 dholland command = "REPLY"; 254 1.1 haad break; 255 1.2 dholland case CLVMD_CMD_VERSION: 256 1.2 dholland command = "VERSION"; 257 1.1 haad break; 258 1.2 dholland case CLVMD_CMD_GOAWAY: 259 1.2 dholland command = "GOAWAY"; 260 1.1 haad break; 261 1.2 dholland case CLVMD_CMD_LOCK: 262 1.2 dholland command = "LOCK"; 263 1.1 haad break; 264 1.2 dholland case CLVMD_CMD_UNLOCK: 265 1.2 dholland command = "UNLOCK"; 266 1.1 haad break; 267 1.2 dholland case CLVMD_CMD_LOCK_QUERY: 268 1.2 dholland command = "LOCK_QUERY"; 269 1.1 haad break; 270 1.2 dholland default: 271 1.2 dholland command = "unknown"; 272 1.1 haad break; 273 1.1 haad } 274 1.1 haad 275 1.1 haad sprintf(buf, "%s (0x%x)", command, cmdl); 276 1.1 haad 277 1.1 haad return buf; 278 1.1 haad } 279 1.1 haad 280 1.1 haad int main(int argc, char *argv[]) 281 1.1 haad { 282 1.1 haad int local_sock; 283 1.1 haad struct local_client *newfd; 284 1.1 haad struct utsname nodeinfo; 285 1.1 haad signed char opt; 286 1.1 haad int cmd_timeout = DEFAULT_CMD_TIMEOUT; 287 1.1 haad int start_timeout = 0; 288 1.2 dholland if_type_t cluster_iface = IF_AUTO; 289 1.1 haad sigset_t ss; 290 1.1 haad int using_gulm = 0; 291 1.1 haad int debug_opt = 0; 292 1.1 haad int clusterwide_opt = 0; 293 1.1 haad 294 1.1 haad /* Deal with command-line arguments */ 295 1.1 haad opterr = 0; 296 1.1 haad optind = 0; 297 1.2 dholland while ((opt = getopt(argc, argv, "?vVhd::t:RT:CI:")) != EOF) { 298 1.1 haad switch (opt) { 299 1.1 haad case 'h': 300 1.1 haad usage(argv[0], stdout); 301 1.1 haad exit(0); 302 1.1 haad 303 1.1 haad case '?': 304 1.1 haad usage(argv[0], stderr); 305 1.1 haad exit(0); 306 1.1 haad 307 1.1 haad case 'R': 308 1.2 dholland return refresh_clvmd()==1?0:1; 309 1.1 haad 310 1.1 haad case 'C': 311 1.1 haad clusterwide_opt = 1; 312 1.1 haad break; 313 1.1 haad 314 1.1 haad case 'd': 315 1.1 haad debug_opt = 1; 316 1.1 haad if (optarg) 317 1.1 haad debug = atoi(optarg); 318 1.1 haad else 319 1.1 haad debug = DEBUG_STDERR; 320 1.1 haad break; 321 1.1 haad 322 1.1 haad case 't': 323 1.1 haad cmd_timeout = atoi(optarg); 324 1.1 haad if (!cmd_timeout) { 325 1.1 haad fprintf(stderr, "command timeout is invalid\n"); 326 1.1 haad usage(argv[0], stderr); 327 1.1 haad exit(1); 328 1.1 haad } 329 1.1 haad break; 330 1.2 dholland case 'I': 331 1.2 dholland cluster_iface = parse_cluster_interface(optarg); 332 1.2 dholland break; 333 1.1 haad case 'T': 334 1.1 haad start_timeout = atoi(optarg); 335 1.1 haad if (start_timeout <= 0) { 336 1.1 haad fprintf(stderr, "startup timeout is invalid\n"); 337 1.1 haad usage(argv[0], stderr); 338 1.1 haad exit(1); 339 1.1 haad } 340 1.1 haad break; 341 1.1 haad 342 1.1 haad case 'V': 343 1.1 haad printf("Cluster LVM daemon version: %s\n", LVM_VERSION); 344 1.1 haad printf("Protocol version: %d.%d.%d\n", 345 1.1 haad CLVMD_MAJOR_VERSION, CLVMD_MINOR_VERSION, 346 1.1 haad CLVMD_PATCH_VERSION); 347 1.1 haad exit(1); 348 1.1 haad break; 349 1.1 haad 350 1.1 haad } 351 1.1 haad } 352 1.1 haad 353 1.1 haad /* Setting debug options on an existing clvmd */ 354 1.1 haad if (debug_opt && !check_local_clvmd()) { 355 1.1 haad 356 1.1 haad /* Sending to stderr makes no sense for a detached daemon */ 357 1.1 haad if (debug == DEBUG_STDERR) 358 1.1 haad debug = DEBUG_SYSLOG; 359 1.2 dholland return debug_clvmd(debug, clusterwide_opt)==1?0:1; 360 1.1 haad } 361 1.1 haad 362 1.1 haad /* Fork into the background (unless requested not to) */ 363 1.1 haad if (debug != DEBUG_STDERR) { 364 1.1 haad be_daemon(start_timeout); 365 1.1 haad } 366 1.1 haad 367 1.1 haad DEBUGLOG("CLVMD started\n"); 368 1.1 haad 369 1.1 haad /* Open the Unix socket we listen for commands on. 370 1.1 haad We do this before opening the cluster socket so that 371 1.1 haad potential clients will block rather than error if we are running 372 1.1 haad but the cluster is not ready yet */ 373 1.1 haad local_sock = open_local_sock(); 374 1.1 haad if (local_sock < 0) 375 1.1 haad child_init_signal(DFAIL_LOCAL_SOCK); 376 1.1 haad 377 1.1 haad /* Set up signal handlers, USR1 is for cluster change notifications (in cman) 378 1.1 haad USR2 causes child threads to exit. 379 1.1 haad HUP causes gulm version to re-read nodes list from CCS. 380 1.1 haad PIPE should be ignored */ 381 1.1 haad signal(SIGUSR2, sigusr2_handler); 382 1.1 haad signal(SIGHUP, sighup_handler); 383 1.1 haad signal(SIGPIPE, SIG_IGN); 384 1.1 haad 385 1.2 dholland /* Block SIGUSR2/SIGINT/SIGTERM in process */ 386 1.1 haad sigemptyset(&ss); 387 1.1 haad sigaddset(&ss, SIGUSR2); 388 1.2 dholland sigaddset(&ss, SIGINT); 389 1.2 dholland sigaddset(&ss, SIGTERM); 390 1.1 haad sigprocmask(SIG_BLOCK, &ss, NULL); 391 1.1 haad 392 1.1 haad /* Initialise the LVM thread variables */ 393 1.1 haad dm_list_init(&lvm_cmd_head); 394 1.1 haad pthread_mutex_init(&lvm_thread_mutex, NULL); 395 1.1 haad pthread_cond_init(&lvm_thread_cond, NULL); 396 1.1 haad pthread_mutex_init(&lvm_start_mutex, NULL); 397 1.1 haad init_lvhash(); 398 1.1 haad 399 1.1 haad /* Start the cluster interface */ 400 1.2 dholland if (cluster_iface == IF_AUTO) 401 1.2 dholland cluster_iface = get_cluster_type(); 402 1.2 dholland 403 1.1 haad #ifdef USE_CMAN 404 1.2 dholland if ((cluster_iface == IF_AUTO || cluster_iface == IF_CMAN) && (clops = init_cman_cluster())) { 405 1.1 haad max_csid_len = CMAN_MAX_CSID_LEN; 406 1.1 haad max_cluster_message = CMAN_MAX_CLUSTER_MESSAGE; 407 1.1 haad max_cluster_member_name_len = CMAN_MAX_NODENAME_LEN; 408 1.1 haad syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to CMAN"); 409 1.1 haad } 410 1.1 haad #endif 411 1.1 haad #ifdef USE_GULM 412 1.1 haad if (!clops) 413 1.2 dholland if ((cluster_iface == IF_AUTO || cluster_iface == IF_GULM) && (clops = init_gulm_cluster())) { 414 1.1 haad max_csid_len = GULM_MAX_CSID_LEN; 415 1.1 haad max_cluster_message = GULM_MAX_CLUSTER_MESSAGE; 416 1.1 haad max_cluster_member_name_len = GULM_MAX_CLUSTER_MEMBER_NAME_LEN; 417 1.1 haad using_gulm = 1; 418 1.1 haad syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to GULM"); 419 1.1 haad } 420 1.1 haad #endif 421 1.2 dholland #ifdef USE_COROSYNC 422 1.2 dholland if (!clops) 423 1.2 dholland if (((cluster_iface == IF_AUTO || cluster_iface == IF_COROSYNC) && (clops = init_corosync_cluster()))) { 424 1.2 dholland max_csid_len = COROSYNC_CSID_LEN; 425 1.2 dholland max_cluster_message = COROSYNC_MAX_CLUSTER_MESSAGE; 426 1.2 dholland max_cluster_member_name_len = COROSYNC_MAX_CLUSTER_MEMBER_NAME_LEN; 427 1.2 dholland syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to Corosync"); 428 1.2 dholland } 429 1.2 dholland #endif 430 1.1 haad #ifdef USE_OPENAIS 431 1.1 haad if (!clops) 432 1.2 dholland if ((cluster_iface == IF_AUTO || cluster_iface == IF_OPENAIS) && (clops = init_openais_cluster())) { 433 1.1 haad max_csid_len = OPENAIS_CSID_LEN; 434 1.1 haad max_cluster_message = OPENAIS_MAX_CLUSTER_MESSAGE; 435 1.1 haad max_cluster_member_name_len = OPENAIS_MAX_CLUSTER_MEMBER_NAME_LEN; 436 1.1 haad syslog(LOG_NOTICE, "Cluster LVM daemon started - connected to OpenAIS"); 437 1.1 haad } 438 1.1 haad #endif 439 1.1 haad 440 1.1 haad if (!clops) { 441 1.1 haad DEBUGLOG("Can't initialise cluster interface\n"); 442 1.1 haad log_error("Can't initialise cluster interface\n"); 443 1.1 haad child_init_signal(DFAIL_CLUSTER_IF); 444 1.1 haad } 445 1.1 haad DEBUGLOG("Cluster ready, doing some more initialisation\n"); 446 1.1 haad 447 1.1 haad /* Save our CSID */ 448 1.1 haad uname(&nodeinfo); 449 1.1 haad clops->get_our_csid(our_csid); 450 1.1 haad 451 1.1 haad /* Initialise the FD list head */ 452 1.1 haad local_client_head.fd = clops->get_main_cluster_fd(); 453 1.1 haad local_client_head.type = CLUSTER_MAIN_SOCK; 454 1.1 haad local_client_head.callback = clops->cluster_fd_callback; 455 1.1 haad 456 1.1 haad /* Add the local socket to the list */ 457 1.1 haad newfd = malloc(sizeof(struct local_client)); 458 1.1 haad if (!newfd) 459 1.1 haad child_init_signal(DFAIL_MALLOC); 460 1.1 haad 461 1.1 haad newfd->fd = local_sock; 462 1.1 haad newfd->removeme = 0; 463 1.1 haad newfd->type = LOCAL_RENDEZVOUS; 464 1.1 haad newfd->callback = local_rendezvous_callback; 465 1.1 haad newfd->next = local_client_head.next; 466 1.1 haad local_client_head.next = newfd; 467 1.1 haad 468 1.1 haad /* This needs to be started after cluster initialisation 469 1.1 haad as it may need to take out locks */ 470 1.1 haad DEBUGLOG("starting LVM thread\n"); 471 1.2 dholland 472 1.2 dholland /* Don't let anyone else to do work until we are started */ 473 1.2 dholland pthread_mutex_lock(&lvm_start_mutex); 474 1.2 dholland pthread_create(&lvm_thread, NULL, (lvm_pthread_fn_t*)lvm_thread_fn, 475 1.1 haad (void *)(long)using_gulm); 476 1.1 haad 477 1.1 haad /* Tell the rest of the cluster our version number */ 478 1.1 haad /* CMAN can do this immediately, gulm needs to wait until 479 1.1 haad the core initialisation has finished and the node list 480 1.1 haad has been gathered */ 481 1.1 haad if (clops->cluster_init_completed) 482 1.1 haad clops->cluster_init_completed(); 483 1.1 haad 484 1.1 haad DEBUGLOG("clvmd ready for work\n"); 485 1.1 haad child_init_signal(SUCCESS); 486 1.1 haad 487 1.1 haad /* Try to shutdown neatly */ 488 1.1 haad signal(SIGTERM, sigterm_handler); 489 1.1 haad signal(SIGINT, sigterm_handler); 490 1.1 haad 491 1.1 haad /* Do some work */ 492 1.1 haad main_loop(local_sock, cmd_timeout); 493 1.1 haad 494 1.2 dholland destroy_lvm(); 495 1.2 dholland 496 1.1 haad return 0; 497 1.1 haad } 498 1.1 haad 499 1.1 haad /* Called when the GuLM cluster layer has completed initialisation. 500 1.1 haad We send the version message */ 501 1.1 haad void clvmd_cluster_init_completed() 502 1.1 haad { 503 1.1 haad send_version_message(); 504 1.1 haad } 505 1.1 haad 506 1.1 haad /* Data on a connected socket */ 507 1.1 haad static int local_sock_callback(struct local_client *thisfd, char *buf, int len, 508 1.1 haad const char *csid, 509 1.1 haad struct local_client **new_client) 510 1.1 haad { 511 1.1 haad *new_client = NULL; 512 1.1 haad return read_from_local_sock(thisfd); 513 1.1 haad } 514 1.1 haad 515 1.1 haad /* Data on a connected socket */ 516 1.1 haad static int local_rendezvous_callback(struct local_client *thisfd, char *buf, 517 1.1 haad int len, const char *csid, 518 1.1 haad struct local_client **new_client) 519 1.1 haad { 520 1.1 haad /* Someone connected to our local socket, accept it. */ 521 1.1 haad 522 1.1 haad struct sockaddr_un socka; 523 1.1 haad struct local_client *newfd; 524 1.1 haad socklen_t sl = sizeof(socka); 525 1.1 haad int client_fd = accept(thisfd->fd, (struct sockaddr *) &socka, &sl); 526 1.1 haad 527 1.1 haad if (client_fd == -1 && errno == EINTR) 528 1.1 haad return 1; 529 1.1 haad 530 1.1 haad if (client_fd >= 0) { 531 1.1 haad newfd = malloc(sizeof(struct local_client)); 532 1.1 haad if (!newfd) { 533 1.1 haad close(client_fd); 534 1.1 haad return 1; 535 1.1 haad } 536 1.1 haad newfd->fd = client_fd; 537 1.1 haad newfd->type = LOCAL_SOCK; 538 1.1 haad newfd->xid = 0; 539 1.1 haad newfd->removeme = 0; 540 1.1 haad newfd->callback = local_sock_callback; 541 1.1 haad newfd->bits.localsock.replies = NULL; 542 1.1 haad newfd->bits.localsock.expected_replies = 0; 543 1.1 haad newfd->bits.localsock.cmd = NULL; 544 1.1 haad newfd->bits.localsock.in_progress = FALSE; 545 1.1 haad newfd->bits.localsock.sent_out = FALSE; 546 1.1 haad newfd->bits.localsock.threadid = 0; 547 1.1 haad newfd->bits.localsock.finished = 0; 548 1.1 haad newfd->bits.localsock.pipe_client = NULL; 549 1.1 haad newfd->bits.localsock.private = NULL; 550 1.1 haad newfd->bits.localsock.all_success = 1; 551 1.1 haad DEBUGLOG("Got new connection on fd %d\n", newfd->fd); 552 1.1 haad *new_client = newfd; 553 1.1 haad } 554 1.1 haad return 1; 555 1.1 haad } 556 1.1 haad 557 1.1 haad static int local_pipe_callback(struct local_client *thisfd, char *buf, 558 1.1 haad int maxlen, const char *csid, 559 1.1 haad struct local_client **new_client) 560 1.1 haad { 561 1.1 haad int len; 562 1.1 haad char buffer[PIPE_BUF]; 563 1.1 haad struct local_client *sock_client = thisfd->bits.pipe.client; 564 1.1 haad int status = -1; /* in error by default */ 565 1.1 haad 566 1.1 haad len = read(thisfd->fd, buffer, sizeof(int)); 567 1.1 haad if (len == -1 && errno == EINTR) 568 1.1 haad return 1; 569 1.1 haad 570 1.1 haad if (len == sizeof(int)) { 571 1.1 haad memcpy(&status, buffer, sizeof(int)); 572 1.1 haad } 573 1.1 haad 574 1.1 haad DEBUGLOG("read on PIPE %d: %d bytes: status: %d\n", 575 1.1 haad thisfd->fd, len, status); 576 1.1 haad 577 1.1 haad /* EOF on pipe or an error, close it */ 578 1.1 haad if (len <= 0) { 579 1.1 haad int jstat; 580 1.1 haad void *ret = &status; 581 1.1 haad close(thisfd->fd); 582 1.1 haad 583 1.1 haad /* Clear out the cross-link */ 584 1.1 haad if (thisfd->bits.pipe.client != NULL) 585 1.1 haad thisfd->bits.pipe.client->bits.localsock.pipe_client = 586 1.1 haad NULL; 587 1.1 haad 588 1.1 haad /* Reap child thread */ 589 1.1 haad if (thisfd->bits.pipe.threadid) { 590 1.1 haad jstat = pthread_join(thisfd->bits.pipe.threadid, &ret); 591 1.1 haad thisfd->bits.pipe.threadid = 0; 592 1.1 haad if (thisfd->bits.pipe.client != NULL) 593 1.1 haad thisfd->bits.pipe.client->bits.localsock. 594 1.1 haad threadid = 0; 595 1.1 haad } 596 1.1 haad return -1; 597 1.1 haad } else { 598 1.1 haad DEBUGLOG("background routine status was %d, sock_client=%p\n", 599 1.1 haad status, sock_client); 600 1.1 haad /* But has the client gone away ?? */ 601 1.1 haad if (sock_client == NULL) { 602 1.1 haad DEBUGLOG 603 1.1 haad ("Got PIPE response for dead client, ignoring it\n"); 604 1.1 haad } else { 605 1.1 haad /* If error then just return that code */ 606 1.1 haad if (status) 607 1.1 haad send_local_reply(sock_client, status, 608 1.1 haad sock_client->fd); 609 1.1 haad else { 610 1.1 haad if (sock_client->bits.localsock.state == 611 1.1 haad POST_COMMAND) { 612 1.1 haad send_local_reply(sock_client, 0, 613 1.1 haad sock_client->fd); 614 1.1 haad } else // PRE_COMMAND finished. 615 1.1 haad { 616 1.1 haad if ( 617 1.1 haad (status = 618 1.1 haad distribute_command(sock_client)) != 619 1.1 haad 0) send_local_reply(sock_client, 620 1.1 haad EFBIG, 621 1.1 haad sock_client-> 622 1.1 haad fd); 623 1.1 haad } 624 1.1 haad } 625 1.1 haad } 626 1.1 haad } 627 1.1 haad return len; 628 1.1 haad } 629 1.1 haad 630 1.1 haad /* If a noed is up, look for it in the reply array, if it's not there then 631 1.1 haad add one with "ETIMEDOUT". 632 1.1 haad NOTE: This won't race with real replies because they happen in the same thread. 633 1.1 haad */ 634 1.1 haad static void timedout_callback(struct local_client *client, const char *csid, 635 1.1 haad int node_up) 636 1.1 haad { 637 1.1 haad if (node_up) { 638 1.1 haad struct node_reply *reply; 639 1.1 haad char nodename[max_cluster_member_name_len]; 640 1.1 haad 641 1.1 haad clops->name_from_csid(csid, nodename); 642 1.1 haad DEBUGLOG("Checking for a reply from %s\n", nodename); 643 1.1 haad pthread_mutex_lock(&client->bits.localsock.reply_mutex); 644 1.1 haad 645 1.1 haad reply = client->bits.localsock.replies; 646 1.1 haad while (reply && strcmp(reply->node, nodename) != 0) { 647 1.1 haad reply = reply->next; 648 1.1 haad } 649 1.1 haad 650 1.1 haad pthread_mutex_unlock(&client->bits.localsock.reply_mutex); 651 1.1 haad 652 1.1 haad if (!reply) { 653 1.1 haad DEBUGLOG("Node %s timed-out\n", nodename); 654 1.1 haad add_reply_to_list(client, ETIMEDOUT, csid, 655 1.1 haad "Command timed out", 18); 656 1.1 haad } 657 1.1 haad } 658 1.1 haad } 659 1.1 haad 660 1.1 haad /* Called when the request has timed out on at least one node. We fill in 661 1.1 haad the remaining node entries with ETIMEDOUT and return. 662 1.1 haad 663 1.1 haad By the time we get here the node that caused 664 1.1 haad the timeout could have gone down, in which case we will never get the expected 665 1.1 haad number of replies that triggers the post command so we need to do it here 666 1.1 haad */ 667 1.1 haad static void request_timed_out(struct local_client *client) 668 1.1 haad { 669 1.1 haad DEBUGLOG("Request timed-out. padding\n"); 670 1.1 haad clops->cluster_do_node_callback(client, timedout_callback); 671 1.1 haad 672 1.1 haad if (client->bits.localsock.num_replies != 673 1.1 haad client->bits.localsock.expected_replies) { 674 1.1 haad /* Post-process the command */ 675 1.1 haad if (client->bits.localsock.threadid) { 676 1.1 haad pthread_mutex_lock(&client->bits.localsock.mutex); 677 1.1 haad client->bits.localsock.state = POST_COMMAND; 678 1.1 haad pthread_cond_signal(&client->bits.localsock.cond); 679 1.1 haad pthread_mutex_unlock(&client->bits.localsock.mutex); 680 1.1 haad } 681 1.1 haad } 682 1.1 haad } 683 1.1 haad 684 1.1 haad /* This is where the real work happens */ 685 1.1 haad static void main_loop(int local_sock, int cmd_timeout) 686 1.1 haad { 687 1.1 haad DEBUGLOG("Using timeout of %d seconds\n", cmd_timeout); 688 1.1 haad 689 1.2 dholland sigset_t ss; 690 1.2 dholland sigemptyset(&ss); 691 1.2 dholland sigaddset(&ss, SIGINT); 692 1.2 dholland sigaddset(&ss, SIGTERM); 693 1.2 dholland pthread_sigmask(SIG_UNBLOCK, &ss, NULL); 694 1.1 haad /* Main loop */ 695 1.1 haad while (!quit) { 696 1.1 haad fd_set in; 697 1.1 haad int select_status; 698 1.1 haad struct local_client *thisfd; 699 1.1 haad struct timeval tv = { cmd_timeout, 0 }; 700 1.1 haad int quorate = clops->is_quorate(); 701 1.1 haad 702 1.1 haad /* Wait on the cluster FD and all local sockets/pipes */ 703 1.1 haad local_client_head.fd = clops->get_main_cluster_fd(); 704 1.1 haad FD_ZERO(&in); 705 1.1 haad for (thisfd = &local_client_head; thisfd != NULL; 706 1.1 haad thisfd = thisfd->next) { 707 1.1 haad 708 1.1 haad if (thisfd->removeme) 709 1.1 haad continue; 710 1.1 haad 711 1.1 haad /* if the cluster is not quorate then don't listen for new requests */ 712 1.1 haad if ((thisfd->type != LOCAL_RENDEZVOUS && 713 1.1 haad thisfd->type != LOCAL_SOCK) || quorate) 714 1.1 haad FD_SET(thisfd->fd, &in); 715 1.1 haad } 716 1.1 haad 717 1.1 haad select_status = select(FD_SETSIZE, &in, NULL, NULL, &tv); 718 1.1 haad 719 1.1 haad if (reread_config) { 720 1.1 haad int saved_errno = errno; 721 1.1 haad 722 1.1 haad reread_config = 0; 723 1.1 haad if (clops->reread_config) 724 1.1 haad clops->reread_config(); 725 1.1 haad errno = saved_errno; 726 1.1 haad } 727 1.1 haad 728 1.1 haad if (select_status > 0) { 729 1.1 haad struct local_client *lastfd = NULL; 730 1.1 haad char csid[MAX_CSID_LEN]; 731 1.1 haad char buf[max_cluster_message]; 732 1.1 haad 733 1.1 haad for (thisfd = &local_client_head; thisfd != NULL; 734 1.1 haad thisfd = thisfd->next) { 735 1.1 haad 736 1.1 haad if (thisfd->removeme) { 737 1.1 haad struct local_client *free_fd; 738 1.1 haad lastfd->next = thisfd->next; 739 1.1 haad free_fd = thisfd; 740 1.1 haad thisfd = lastfd; 741 1.1 haad 742 1.1 haad DEBUGLOG("removeme set for fd %d\n", free_fd->fd); 743 1.1 haad 744 1.1 haad /* Queue cleanup, this also frees the client struct */ 745 1.1 haad add_to_lvmqueue(free_fd, NULL, 0, NULL); 746 1.1 haad break; 747 1.1 haad } 748 1.1 haad 749 1.1 haad if (FD_ISSET(thisfd->fd, &in)) { 750 1.1 haad struct local_client *newfd = NULL; 751 1.1 haad int ret; 752 1.1 haad 753 1.1 haad /* Do callback */ 754 1.1 haad ret = 755 1.1 haad thisfd->callback(thisfd, buf, 756 1.1 haad sizeof(buf), csid, 757 1.1 haad &newfd); 758 1.1 haad /* Ignore EAGAIN */ 759 1.1 haad if (ret < 0 && (errno == EAGAIN || 760 1.1 haad errno == EINTR)) continue; 761 1.1 haad 762 1.1 haad /* Got error or EOF: Remove it from the list safely */ 763 1.1 haad if (ret <= 0) { 764 1.1 haad struct local_client *free_fd; 765 1.1 haad int type = thisfd->type; 766 1.1 haad 767 1.1 haad /* If the cluster socket shuts down, so do we */ 768 1.1 haad if (type == CLUSTER_MAIN_SOCK || 769 1.1 haad type == CLUSTER_INTERNAL) 770 1.1 haad goto closedown; 771 1.1 haad 772 1.1 haad DEBUGLOG("ret == %d, errno = %d. removing client\n", 773 1.1 haad ret, errno); 774 1.1 haad lastfd->next = thisfd->next; 775 1.1 haad free_fd = thisfd; 776 1.1 haad thisfd = lastfd; 777 1.1 haad close(free_fd->fd); 778 1.1 haad 779 1.1 haad /* Queue cleanup, this also frees the client struct */ 780 1.1 haad add_to_lvmqueue(free_fd, NULL, 0, NULL); 781 1.1 haad break; 782 1.1 haad } 783 1.1 haad 784 1.1 haad /* New client...simply add it to the list */ 785 1.1 haad if (newfd) { 786 1.1 haad newfd->next = thisfd->next; 787 1.1 haad thisfd->next = newfd; 788 1.1 haad break; 789 1.1 haad } 790 1.1 haad } 791 1.1 haad lastfd = thisfd; 792 1.1 haad } 793 1.1 haad } 794 1.1 haad 795 1.1 haad /* Select timed out. Check for clients that have been waiting too long for a response */ 796 1.1 haad if (select_status == 0) { 797 1.1 haad time_t the_time = time(NULL); 798 1.1 haad 799 1.1 haad for (thisfd = &local_client_head; thisfd != NULL; 800 1.1 haad thisfd = thisfd->next) { 801 1.1 haad if (thisfd->type == LOCAL_SOCK 802 1.1 haad && thisfd->bits.localsock.sent_out 803 1.1 haad && thisfd->bits.localsock.sent_time + 804 1.1 haad cmd_timeout < the_time 805 1.1 haad && thisfd->bits.localsock. 806 1.1 haad expected_replies != 807 1.1 haad thisfd->bits.localsock.num_replies) { 808 1.1 haad /* Send timed out message + replies we already have */ 809 1.1 haad DEBUGLOG 810 1.1 haad ("Request timed-out (send: %ld, now: %ld)\n", 811 1.1 haad thisfd->bits.localsock.sent_time, 812 1.1 haad the_time); 813 1.1 haad 814 1.1 haad thisfd->bits.localsock.all_success = 0; 815 1.1 haad 816 1.1 haad request_timed_out(thisfd); 817 1.1 haad } 818 1.1 haad } 819 1.1 haad } 820 1.1 haad if (select_status < 0) { 821 1.1 haad if (errno == EINTR) 822 1.1 haad continue; 823 1.1 haad 824 1.1 haad #ifdef DEBUG 825 1.1 haad perror("select error"); 826 1.1 haad exit(-1); 827 1.1 haad #endif 828 1.1 haad } 829 1.1 haad } 830 1.1 haad 831 1.1 haad closedown: 832 1.1 haad clops->cluster_closedown(); 833 1.1 haad close(local_sock); 834 1.1 haad } 835 1.1 haad 836 1.1 haad static __attribute__ ((noreturn)) void wait_for_child(int c_pipe, int timeout) 837 1.1 haad { 838 1.1 haad int child_status; 839 1.1 haad int sstat; 840 1.1 haad fd_set fds; 841 1.1 haad struct timeval tv = {timeout, 0}; 842 1.1 haad 843 1.1 haad FD_ZERO(&fds); 844 1.1 haad FD_SET(c_pipe, &fds); 845 1.1 haad 846 1.1 haad sstat = select(c_pipe+1, &fds, NULL, NULL, timeout? &tv: NULL); 847 1.1 haad if (sstat == 0) { 848 1.1 haad fprintf(stderr, "clvmd startup timed out\n"); 849 1.1 haad exit(DFAIL_TIMEOUT); 850 1.1 haad } 851 1.1 haad if (sstat == 1) { 852 1.1 haad if (read(c_pipe, &child_status, sizeof(child_status)) != 853 1.1 haad sizeof(child_status)) { 854 1.1 haad 855 1.1 haad fprintf(stderr, "clvmd failed in initialisation\n"); 856 1.1 haad exit(DFAIL_INIT); 857 1.1 haad } 858 1.1 haad else { 859 1.1 haad switch (child_status) { 860 1.1 haad case SUCCESS: 861 1.1 haad break; 862 1.1 haad case DFAIL_INIT: 863 1.1 haad fprintf(stderr, "clvmd failed in initialisation\n"); 864 1.1 haad break; 865 1.1 haad case DFAIL_LOCAL_SOCK: 866 1.1 haad fprintf(stderr, "clvmd could not create local socket\n"); 867 1.1 haad fprintf(stderr, "Another clvmd is probably already running\n"); 868 1.1 haad break; 869 1.1 haad case DFAIL_CLUSTER_IF: 870 1.1 haad fprintf(stderr, "clvmd could not connect to cluster manager\n"); 871 1.1 haad fprintf(stderr, "Consult syslog for more information\n"); 872 1.1 haad break; 873 1.1 haad case DFAIL_MALLOC: 874 1.1 haad fprintf(stderr, "clvmd failed, not enough memory\n"); 875 1.1 haad break; 876 1.1 haad default: 877 1.1 haad fprintf(stderr, "clvmd failed, error was %d\n", child_status); 878 1.1 haad break; 879 1.1 haad } 880 1.1 haad exit(child_status); 881 1.1 haad } 882 1.1 haad } 883 1.1 haad fprintf(stderr, "clvmd startup, select failed: %s\n", strerror(errno)); 884 1.1 haad exit(DFAIL_INIT); 885 1.1 haad } 886 1.1 haad 887 1.1 haad /* 888 1.1 haad * Fork into the background and detach from our parent process. 889 1.1 haad * In the interests of user-friendliness we wait for the daemon 890 1.1 haad * to complete initialisation before returning its status 891 1.1 haad * the the user. 892 1.1 haad */ 893 1.1 haad static void be_daemon(int timeout) 894 1.1 haad { 895 1.1 haad pid_t pid; 896 1.1 haad int devnull = open("/dev/null", O_RDWR); 897 1.1 haad if (devnull == -1) { 898 1.1 haad perror("Can't open /dev/null"); 899 1.1 haad exit(3); 900 1.1 haad } 901 1.1 haad 902 1.1 haad pipe(child_pipe); 903 1.1 haad 904 1.1 haad switch (pid = fork()) { 905 1.1 haad case -1: 906 1.1 haad perror("clvmd: can't fork"); 907 1.1 haad exit(2); 908 1.1 haad 909 1.1 haad case 0: /* Child */ 910 1.1 haad close(child_pipe[0]); 911 1.1 haad break; 912 1.1 haad 913 1.1 haad default: /* Parent */ 914 1.1 haad close(child_pipe[1]); 915 1.1 haad wait_for_child(child_pipe[0], timeout); 916 1.1 haad } 917 1.1 haad 918 1.1 haad /* Detach ourself from the calling environment */ 919 1.1 haad if (close(0) || close(1) || close(2)) { 920 1.1 haad perror("Error closing terminal FDs"); 921 1.1 haad exit(4); 922 1.1 haad } 923 1.1 haad setsid(); 924 1.1 haad 925 1.1 haad if (dup2(devnull, 0) < 0 || dup2(devnull, 1) < 0 926 1.1 haad || dup2(devnull, 2) < 0) { 927 1.2 dholland int e = errno; 928 1.1 haad perror("Error setting terminal FDs to /dev/null"); 929 1.2 dholland log_error("Error setting terminal FDs to /dev/null: %s", strerror(e)); 930 1.1 haad exit(5); 931 1.1 haad } 932 1.1 haad if (chdir("/")) { 933 1.2 dholland log_error("Error setting current directory to /: %s", strerror(e)); 934 1.1 haad exit(6); 935 1.1 haad } 936 1.1 haad 937 1.1 haad } 938 1.1 haad 939 1.1 haad /* Called when we have a read from the local socket. 940 1.1 haad was in the main loop but it's grown up and is a big girl now */ 941 1.1 haad static int read_from_local_sock(struct local_client *thisfd) 942 1.1 haad { 943 1.1 haad int len; 944 1.1 haad int argslen; 945 1.1 haad int missing_len; 946 1.1 haad char buffer[PIPE_BUF]; 947 1.1 haad 948 1.1 haad len = read(thisfd->fd, buffer, sizeof(buffer)); 949 1.1 haad if (len == -1 && errno == EINTR) 950 1.1 haad return 1; 951 1.1 haad 952 1.1 haad DEBUGLOG("Read on local socket %d, len = %d\n", thisfd->fd, len); 953 1.1 haad 954 1.1 haad /* EOF or error on socket */ 955 1.1 haad if (len <= 0) { 956 1.1 haad int *status; 957 1.1 haad int jstat; 958 1.1 haad 959 1.1 haad DEBUGLOG("EOF on local socket: inprogress=%d\n", 960 1.1 haad thisfd->bits.localsock.in_progress); 961 1.1 haad 962 1.1 haad thisfd->bits.localsock.finished = 1; 963 1.1 haad 964 1.1 haad /* If the client went away in mid command then tidy up */ 965 1.1 haad if (thisfd->bits.localsock.in_progress) { 966 1.1 haad pthread_kill(thisfd->bits.localsock.threadid, SIGUSR2); 967 1.1 haad pthread_mutex_lock(&thisfd->bits.localsock.mutex); 968 1.1 haad thisfd->bits.localsock.state = POST_COMMAND; 969 1.1 haad pthread_cond_signal(&thisfd->bits.localsock.cond); 970 1.1 haad pthread_mutex_unlock(&thisfd->bits.localsock.mutex); 971 1.1 haad 972 1.1 haad /* Free any unsent buffers */ 973 1.1 haad free_reply(thisfd); 974 1.1 haad } 975 1.1 haad 976 1.1 haad /* Kill the subthread & free resources */ 977 1.1 haad if (thisfd->bits.localsock.threadid) { 978 1.1 haad DEBUGLOG("Waiting for child thread\n"); 979 1.1 haad pthread_mutex_lock(&thisfd->bits.localsock.mutex); 980 1.1 haad thisfd->bits.localsock.state = PRE_COMMAND; 981 1.1 haad pthread_cond_signal(&thisfd->bits.localsock.cond); 982 1.1 haad pthread_mutex_unlock(&thisfd->bits.localsock.mutex); 983 1.1 haad 984 1.1 haad jstat = 985 1.1 haad pthread_join(thisfd->bits.localsock.threadid, 986 1.1 haad (void **) &status); 987 1.1 haad DEBUGLOG("Joined child thread\n"); 988 1.1 haad 989 1.1 haad thisfd->bits.localsock.threadid = 0; 990 1.1 haad pthread_cond_destroy(&thisfd->bits.localsock.cond); 991 1.1 haad pthread_mutex_destroy(&thisfd->bits.localsock.mutex); 992 1.1 haad 993 1.1 haad /* Remove the pipe client */ 994 1.1 haad if (thisfd->bits.localsock.pipe_client != NULL) { 995 1.1 haad struct local_client *newfd; 996 1.1 haad struct local_client *lastfd = NULL; 997 1.1 haad struct local_client *free_fd = NULL; 998 1.1 haad 999 1.1 haad close(thisfd->bits.localsock.pipe_client->fd); /* Close pipe */ 1000 1.1 haad close(thisfd->bits.localsock.pipe); 1001 1.1 haad 1002 1.1 haad /* Remove pipe client */ 1003 1.1 haad for (newfd = &local_client_head; newfd != NULL; 1004 1.1 haad newfd = newfd->next) { 1005 1.1 haad if (thisfd->bits.localsock. 1006 1.1 haad pipe_client == newfd) { 1007 1.1 haad thisfd->bits.localsock. 1008 1.1 haad pipe_client = NULL; 1009 1.1 haad 1010 1.1 haad lastfd->next = newfd->next; 1011 1.1 haad free_fd = newfd; 1012 1.1 haad newfd->next = lastfd; 1013 1.1 haad free(free_fd); 1014 1.1 haad break; 1015 1.1 haad } 1016 1.1 haad lastfd = newfd; 1017 1.1 haad } 1018 1.1 haad } 1019 1.1 haad } 1020 1.1 haad 1021 1.1 haad /* Free the command buffer */ 1022 1.1 haad free(thisfd->bits.localsock.cmd); 1023 1.1 haad 1024 1.1 haad /* Clear out the cross-link */ 1025 1.1 haad if (thisfd->bits.localsock.pipe_client != NULL) 1026 1.1 haad thisfd->bits.localsock.pipe_client->bits.pipe.client = 1027 1.1 haad NULL; 1028 1.1 haad 1029 1.1 haad close(thisfd->fd); 1030 1.1 haad return 0; 1031 1.1 haad } else { 1032 1.1 haad int comms_pipe[2]; 1033 1.1 haad struct local_client *newfd; 1034 1.1 haad char csid[MAX_CSID_LEN]; 1035 1.1 haad struct clvm_header *inheader; 1036 1.1 haad int status; 1037 1.1 haad 1038 1.1 haad inheader = (struct clvm_header *) buffer; 1039 1.1 haad 1040 1.1 haad /* Fill in the client ID */ 1041 1.1 haad inheader->clientid = htonl(thisfd->fd); 1042 1.1 haad 1043 1.1 haad /* If we are already busy then return an error */ 1044 1.1 haad if (thisfd->bits.localsock.in_progress) { 1045 1.1 haad struct clvm_header reply; 1046 1.1 haad reply.cmd = CLVMD_CMD_REPLY; 1047 1.1 haad reply.status = EBUSY; 1048 1.1 haad reply.arglen = 0; 1049 1.1 haad reply.flags = 0; 1050 1.1 haad send_message(&reply, sizeof(reply), our_csid, 1051 1.1 haad thisfd->fd, 1052 1.1 haad "Error sending EBUSY reply to local user"); 1053 1.1 haad return len; 1054 1.1 haad } 1055 1.1 haad 1056 1.1 haad /* Free any old buffer space */ 1057 1.1 haad free(thisfd->bits.localsock.cmd); 1058 1.1 haad 1059 1.1 haad /* See if we have the whole message */ 1060 1.1 haad argslen = 1061 1.1 haad len - strlen(inheader->node) - sizeof(struct clvm_header); 1062 1.1 haad missing_len = inheader->arglen - argslen; 1063 1.1 haad 1064 1.1 haad if (missing_len < 0) 1065 1.1 haad missing_len = 0; 1066 1.1 haad 1067 1.1 haad /* Save the message */ 1068 1.1 haad thisfd->bits.localsock.cmd = malloc(len + missing_len); 1069 1.1 haad 1070 1.1 haad if (!thisfd->bits.localsock.cmd) { 1071 1.1 haad struct clvm_header reply; 1072 1.1 haad reply.cmd = CLVMD_CMD_REPLY; 1073 1.1 haad reply.status = ENOMEM; 1074 1.1 haad reply.arglen = 0; 1075 1.1 haad reply.flags = 0; 1076 1.1 haad send_message(&reply, sizeof(reply), our_csid, 1077 1.1 haad thisfd->fd, 1078 1.1 haad "Error sending ENOMEM reply to local user"); 1079 1.1 haad return 0; 1080 1.1 haad } 1081 1.1 haad memcpy(thisfd->bits.localsock.cmd, buffer, len); 1082 1.1 haad thisfd->bits.localsock.cmd_len = len + missing_len; 1083 1.1 haad inheader = (struct clvm_header *) thisfd->bits.localsock.cmd; 1084 1.1 haad 1085 1.1 haad /* If we don't have the full message then read the rest now */ 1086 1.1 haad if (missing_len) { 1087 1.1 haad char *argptr = 1088 1.1 haad inheader->node + strlen(inheader->node) + 1; 1089 1.1 haad 1090 1.1 haad while (missing_len > 0 && len >= 0) { 1091 1.1 haad DEBUGLOG 1092 1.1 haad ("got %d bytes, need another %d (total %d)\n", 1093 1.1 haad argslen, missing_len, inheader->arglen); 1094 1.1 haad len = read(thisfd->fd, argptr + argslen, 1095 1.1 haad missing_len); 1096 1.1 haad if (len >= 0) { 1097 1.1 haad missing_len -= len; 1098 1.1 haad argslen += len; 1099 1.1 haad } 1100 1.1 haad } 1101 1.1 haad } 1102 1.1 haad 1103 1.1 haad /* Initialise and lock the mutex so the subthread will wait after 1104 1.1 haad finishing the PRE routine */ 1105 1.1 haad if (!thisfd->bits.localsock.threadid) { 1106 1.1 haad pthread_mutex_init(&thisfd->bits.localsock.mutex, NULL); 1107 1.1 haad pthread_cond_init(&thisfd->bits.localsock.cond, NULL); 1108 1.1 haad pthread_mutex_init(&thisfd->bits.localsock.reply_mutex, NULL); 1109 1.1 haad } 1110 1.1 haad 1111 1.1 haad /* Only run the command if all the cluster nodes are running CLVMD */ 1112 1.1 haad if (((inheader->flags & CLVMD_FLAG_LOCAL) == 0) && 1113 1.1 haad (check_all_clvmds_running(thisfd) == -1)) { 1114 1.1 haad thisfd->bits.localsock.expected_replies = 0; 1115 1.1 haad thisfd->bits.localsock.num_replies = 0; 1116 1.1 haad send_local_reply(thisfd, EHOSTDOWN, thisfd->fd); 1117 1.1 haad return len; 1118 1.1 haad } 1119 1.1 haad 1120 1.1 haad /* Check the node name for validity */ 1121 1.1 haad if (inheader->node[0] && clops->csid_from_name(csid, inheader->node)) { 1122 1.1 haad /* Error, node is not in the cluster */ 1123 1.1 haad struct clvm_header reply; 1124 1.1 haad DEBUGLOG("Unknown node: '%s'\n", inheader->node); 1125 1.1 haad 1126 1.1 haad reply.cmd = CLVMD_CMD_REPLY; 1127 1.1 haad reply.status = ENOENT; 1128 1.1 haad reply.flags = 0; 1129 1.1 haad reply.arglen = 0; 1130 1.1 haad send_message(&reply, sizeof(reply), our_csid, 1131 1.1 haad thisfd->fd, 1132 1.1 haad "Error sending ENOENT reply to local user"); 1133 1.1 haad thisfd->bits.localsock.expected_replies = 0; 1134 1.1 haad thisfd->bits.localsock.num_replies = 0; 1135 1.1 haad thisfd->bits.localsock.in_progress = FALSE; 1136 1.1 haad thisfd->bits.localsock.sent_out = FALSE; 1137 1.1 haad return len; 1138 1.1 haad } 1139 1.1 haad 1140 1.1 haad /* If we already have a subthread then just signal it to start */ 1141 1.1 haad if (thisfd->bits.localsock.threadid) { 1142 1.1 haad pthread_mutex_lock(&thisfd->bits.localsock.mutex); 1143 1.1 haad thisfd->bits.localsock.state = PRE_COMMAND; 1144 1.1 haad pthread_cond_signal(&thisfd->bits.localsock.cond); 1145 1.1 haad pthread_mutex_unlock(&thisfd->bits.localsock.mutex); 1146 1.1 haad return len; 1147 1.1 haad } 1148 1.1 haad 1149 1.1 haad /* Create a pipe and add the reading end to our FD list */ 1150 1.1 haad pipe(comms_pipe); 1151 1.1 haad newfd = malloc(sizeof(struct local_client)); 1152 1.1 haad if (!newfd) { 1153 1.1 haad struct clvm_header reply; 1154 1.1 haad close(comms_pipe[0]); 1155 1.1 haad close(comms_pipe[1]); 1156 1.1 haad 1157 1.1 haad reply.cmd = CLVMD_CMD_REPLY; 1158 1.1 haad reply.status = ENOMEM; 1159 1.1 haad reply.arglen = 0; 1160 1.1 haad reply.flags = 0; 1161 1.1 haad send_message(&reply, sizeof(reply), our_csid, 1162 1.1 haad thisfd->fd, 1163 1.1 haad "Error sending ENOMEM reply to local user"); 1164 1.1 haad return len; 1165 1.1 haad } 1166 1.1 haad DEBUGLOG("creating pipe, [%d, %d]\n", comms_pipe[0], 1167 1.1 haad comms_pipe[1]); 1168 1.1 haad newfd->fd = comms_pipe[0]; 1169 1.1 haad newfd->removeme = 0; 1170 1.1 haad newfd->type = THREAD_PIPE; 1171 1.1 haad newfd->callback = local_pipe_callback; 1172 1.1 haad newfd->next = thisfd->next; 1173 1.1 haad newfd->bits.pipe.client = thisfd; 1174 1.1 haad newfd->bits.pipe.threadid = 0; 1175 1.1 haad thisfd->next = newfd; 1176 1.1 haad 1177 1.1 haad /* Store a cross link to the pipe */ 1178 1.1 haad thisfd->bits.localsock.pipe_client = newfd; 1179 1.1 haad 1180 1.1 haad thisfd->bits.localsock.pipe = comms_pipe[1]; 1181 1.1 haad 1182 1.1 haad /* Make sure the thread has a copy of it's own ID */ 1183 1.1 haad newfd->bits.pipe.threadid = thisfd->bits.localsock.threadid; 1184 1.1 haad 1185 1.1 haad /* Run the pre routine */ 1186 1.1 haad thisfd->bits.localsock.in_progress = TRUE; 1187 1.1 haad thisfd->bits.localsock.state = PRE_COMMAND; 1188 1.1 haad DEBUGLOG("Creating pre&post thread\n"); 1189 1.1 haad status = pthread_create(&thisfd->bits.localsock.threadid, NULL, 1190 1.1 haad pre_and_post_thread, thisfd); 1191 1.1 haad DEBUGLOG("Created pre&post thread, state = %d\n", status); 1192 1.1 haad } 1193 1.1 haad return len; 1194 1.1 haad } 1195 1.1 haad 1196 1.1 haad /* Add a file descriptor from the cluster or comms interface to 1197 1.1 haad our list of FDs for select 1198 1.1 haad */ 1199 1.1 haad int add_client(struct local_client *new_client) 1200 1.1 haad { 1201 1.1 haad new_client->next = local_client_head.next; 1202 1.1 haad local_client_head.next = new_client; 1203 1.1 haad 1204 1.1 haad return 0; 1205 1.1 haad } 1206 1.1 haad 1207 1.1 haad /* Called when the pre-command has completed successfully - we 1208 1.1 haad now execute the real command on all the requested nodes */ 1209 1.1 haad static int distribute_command(struct local_client *thisfd) 1210 1.1 haad { 1211 1.1 haad struct clvm_header *inheader = 1212 1.1 haad (struct clvm_header *) thisfd->bits.localsock.cmd; 1213 1.1 haad int len = thisfd->bits.localsock.cmd_len; 1214 1.1 haad 1215 1.1 haad thisfd->xid = global_xid++; 1216 1.1 haad DEBUGLOG("distribute command: XID = %d\n", thisfd->xid); 1217 1.1 haad 1218 1.1 haad /* Forward it to other nodes in the cluster if needed */ 1219 1.1 haad if (!(inheader->flags & CLVMD_FLAG_LOCAL)) { 1220 1.1 haad /* if node is empty then do it on the whole cluster */ 1221 1.1 haad if (inheader->node[0] == '\0') { 1222 1.1 haad thisfd->bits.localsock.expected_replies = 1223 1.1 haad clops->get_num_nodes(); 1224 1.1 haad thisfd->bits.localsock.num_replies = 0; 1225 1.1 haad thisfd->bits.localsock.sent_time = time(NULL); 1226 1.1 haad thisfd->bits.localsock.in_progress = TRUE; 1227 1.1 haad thisfd->bits.localsock.sent_out = TRUE; 1228 1.1 haad 1229 1.1 haad /* Do it here first */ 1230 1.1 haad add_to_lvmqueue(thisfd, inheader, len, NULL); 1231 1.1 haad 1232 1.1 haad DEBUGLOG("Sending message to all cluster nodes\n"); 1233 1.1 haad inheader->xid = thisfd->xid; 1234 1.1 haad send_message(inheader, len, NULL, -1, 1235 1.1 haad "Error forwarding message to cluster"); 1236 1.1 haad } else { 1237 1.1 haad /* Do it on a single node */ 1238 1.1 haad char csid[MAX_CSID_LEN]; 1239 1.1 haad 1240 1.1 haad if (clops->csid_from_name(csid, inheader->node)) { 1241 1.1 haad /* This has already been checked so should not happen */ 1242 1.1 haad return 0; 1243 1.1 haad } else { 1244 1.1 haad /* OK, found a node... */ 1245 1.1 haad thisfd->bits.localsock.expected_replies = 1; 1246 1.1 haad thisfd->bits.localsock.num_replies = 0; 1247 1.1 haad thisfd->bits.localsock.in_progress = TRUE; 1248 1.1 haad 1249 1.1 haad /* Are we the requested node ?? */ 1250 1.1 haad if (memcmp(csid, our_csid, max_csid_len) == 0) { 1251 1.1 haad DEBUGLOG("Doing command on local node only\n"); 1252 1.1 haad add_to_lvmqueue(thisfd, inheader, len, NULL); 1253 1.1 haad } else { 1254 1.1 haad DEBUGLOG("Sending message to single node: %s\n", 1255 1.1 haad inheader->node); 1256 1.1 haad inheader->xid = thisfd->xid; 1257 1.1 haad send_message(inheader, len, 1258 1.1 haad csid, -1, 1259 1.1 haad "Error forwarding message to cluster node"); 1260 1.1 haad } 1261 1.1 haad } 1262 1.1 haad } 1263 1.1 haad } else { 1264 1.1 haad /* Local explicitly requested, ignore nodes */ 1265 1.1 haad thisfd->bits.localsock.in_progress = TRUE; 1266 1.1 haad thisfd->bits.localsock.expected_replies = 1; 1267 1.1 haad thisfd->bits.localsock.num_replies = 0; 1268 1.1 haad add_to_lvmqueue(thisfd, inheader, len, NULL); 1269 1.1 haad } 1270 1.1 haad return 0; 1271 1.1 haad } 1272 1.1 haad 1273 1.1 haad /* Process a command from a remote node and return the result */ 1274 1.1 haad static void process_remote_command(struct clvm_header *msg, int msglen, int fd, 1275 1.1 haad const char *csid) 1276 1.1 haad { 1277 1.1 haad char *replyargs; 1278 1.1 haad char nodename[max_cluster_member_name_len]; 1279 1.1 haad int replylen = 0; 1280 1.1 haad int buflen = max_cluster_message - sizeof(struct clvm_header) - 1; 1281 1.1 haad int status; 1282 1.1 haad int msg_malloced = 0; 1283 1.1 haad 1284 1.1 haad /* Get the node name as we /may/ need it later */ 1285 1.1 haad clops->name_from_csid(csid, nodename); 1286 1.1 haad 1287 1.1 haad DEBUGLOG("process_remote_command %s for clientid 0x%x XID %d on node %s\n", 1288 1.1 haad decode_cmd(msg->cmd), msg->clientid, msg->xid, nodename); 1289 1.1 haad 1290 1.1 haad /* Check for GOAWAY and sulk */ 1291 1.1 haad if (msg->cmd == CLVMD_CMD_GOAWAY) { 1292 1.1 haad 1293 1.1 haad DEBUGLOG("Told to go away by %s\n", nodename); 1294 1.1 haad log_error("Told to go away by %s\n", nodename); 1295 1.1 haad exit(99); 1296 1.1 haad } 1297 1.1 haad 1298 1.1 haad /* Version check is internal - don't bother exposing it in 1299 1.1 haad clvmd-command.c */ 1300 1.1 haad if (msg->cmd == CLVMD_CMD_VERSION) { 1301 1.1 haad int version_nums[3]; 1302 1.1 haad char node[256]; 1303 1.1 haad 1304 1.1 haad memcpy(version_nums, msg->args, sizeof(version_nums)); 1305 1.1 haad 1306 1.1 haad clops->name_from_csid(csid, node); 1307 1.1 haad DEBUGLOG("Remote node %s is version %d.%d.%d\n", 1308 1.1 haad node, 1309 1.1 haad ntohl(version_nums[0]), 1310 1.1 haad ntohl(version_nums[1]), ntohl(version_nums[2])); 1311 1.1 haad 1312 1.1 haad if (ntohl(version_nums[0]) != CLVMD_MAJOR_VERSION) { 1313 1.1 haad struct clvm_header byebyemsg; 1314 1.1 haad DEBUGLOG 1315 1.1 haad ("Telling node %s to go away because of incompatible version number\n", 1316 1.1 haad node); 1317 1.1 haad log_notice 1318 1.1 haad ("Telling node %s to go away because of incompatible version number %d.%d.%d\n", 1319 1.1 haad node, ntohl(version_nums[0]), 1320 1.1 haad ntohl(version_nums[1]), ntohl(version_nums[2])); 1321 1.1 haad 1322 1.1 haad byebyemsg.cmd = CLVMD_CMD_GOAWAY; 1323 1.1 haad byebyemsg.status = 0; 1324 1.1 haad byebyemsg.flags = 0; 1325 1.1 haad byebyemsg.arglen = 0; 1326 1.1 haad byebyemsg.clientid = 0; 1327 1.1 haad clops->cluster_send_message(&byebyemsg, sizeof(byebyemsg), 1328 1.1 haad our_csid, 1329 1.1 haad "Error Sending GOAWAY message"); 1330 1.1 haad } else { 1331 1.1 haad clops->add_up_node(csid); 1332 1.1 haad } 1333 1.1 haad return; 1334 1.1 haad } 1335 1.1 haad 1336 1.1 haad /* Allocate a default reply buffer */ 1337 1.1 haad replyargs = malloc(max_cluster_message - sizeof(struct clvm_header)); 1338 1.1 haad 1339 1.1 haad if (replyargs != NULL) { 1340 1.1 haad /* Run the command */ 1341 1.1 haad status = 1342 1.1 haad do_command(NULL, msg, msglen, &replyargs, buflen, 1343 1.1 haad &replylen); 1344 1.1 haad } else { 1345 1.1 haad status = ENOMEM; 1346 1.1 haad } 1347 1.1 haad 1348 1.1 haad /* If it wasn't a reply, then reply */ 1349 1.1 haad if (msg->cmd != CLVMD_CMD_REPLY) { 1350 1.1 haad char *aggreply; 1351 1.1 haad 1352 1.1 haad aggreply = 1353 1.1 haad realloc(replyargs, replylen + sizeof(struct clvm_header)); 1354 1.1 haad if (aggreply) { 1355 1.1 haad struct clvm_header *agghead = 1356 1.1 haad (struct clvm_header *) aggreply; 1357 1.1 haad 1358 1.1 haad replyargs = aggreply; 1359 1.1 haad /* Move it up so there's room for a header in front of the data */ 1360 1.1 haad memmove(aggreply + offsetof(struct clvm_header, args), 1361 1.1 haad replyargs, replylen); 1362 1.1 haad 1363 1.1 haad agghead->xid = msg->xid; 1364 1.1 haad agghead->cmd = CLVMD_CMD_REPLY; 1365 1.1 haad agghead->status = status; 1366 1.1 haad agghead->flags = 0; 1367 1.1 haad agghead->clientid = msg->clientid; 1368 1.1 haad agghead->arglen = replylen; 1369 1.1 haad agghead->node[0] = '\0'; 1370 1.1 haad send_message(aggreply, 1371 1.1 haad sizeof(struct clvm_header) + 1372 1.1 haad replylen, csid, fd, 1373 1.1 haad "Error sending command reply"); 1374 1.1 haad } else { 1375 1.1 haad struct clvm_header head; 1376 1.1 haad 1377 1.1 haad DEBUGLOG("Error attempting to realloc return buffer\n"); 1378 1.1 haad /* Return a failure response */ 1379 1.1 haad head.cmd = CLVMD_CMD_REPLY; 1380 1.1 haad head.status = ENOMEM; 1381 1.1 haad head.flags = 0; 1382 1.1 haad head.clientid = msg->clientid; 1383 1.1 haad head.arglen = 0; 1384 1.1 haad head.node[0] = '\0'; 1385 1.1 haad send_message(&head, sizeof(struct clvm_header), csid, 1386 1.1 haad fd, "Error sending ENOMEM command reply"); 1387 1.1 haad return; 1388 1.1 haad } 1389 1.1 haad } 1390 1.1 haad 1391 1.1 haad /* Free buffer if it was malloced */ 1392 1.1 haad if (msg_malloced) { 1393 1.1 haad free(msg); 1394 1.1 haad } 1395 1.1 haad free(replyargs); 1396 1.1 haad } 1397 1.1 haad 1398 1.1 haad /* Add a reply to a command to the list of replies for this client. 1399 1.1 haad If we have got a full set then send them to the waiting client down the local 1400 1.1 haad socket */ 1401 1.1 haad static void add_reply_to_list(struct local_client *client, int status, 1402 1.1 haad const char *csid, const char *buf, int len) 1403 1.1 haad { 1404 1.1 haad struct node_reply *reply; 1405 1.1 haad 1406 1.1 haad pthread_mutex_lock(&client->bits.localsock.reply_mutex); 1407 1.1 haad 1408 1.1 haad /* Add it to the list of replies */ 1409 1.1 haad reply = malloc(sizeof(struct node_reply)); 1410 1.1 haad if (reply) { 1411 1.1 haad reply->status = status; 1412 1.1 haad clops->name_from_csid(csid, reply->node); 1413 1.1 haad DEBUGLOG("Reply from node %s: %d bytes\n", reply->node, len); 1414 1.1 haad 1415 1.1 haad if (len > 0) { 1416 1.1 haad reply->replymsg = malloc(len); 1417 1.1 haad if (!reply->replymsg) { 1418 1.1 haad reply->status = ENOMEM; 1419 1.1 haad } else { 1420 1.1 haad memcpy(reply->replymsg, buf, len); 1421 1.1 haad } 1422 1.1 haad } else { 1423 1.1 haad reply->replymsg = NULL; 1424 1.1 haad } 1425 1.1 haad /* Hook it onto the reply chain */ 1426 1.1 haad reply->next = client->bits.localsock.replies; 1427 1.1 haad client->bits.localsock.replies = reply; 1428 1.1 haad } else { 1429 1.1 haad /* It's all gone horribly wrong... */ 1430 1.1 haad pthread_mutex_unlock(&client->bits.localsock.reply_mutex); 1431 1.1 haad send_local_reply(client, ENOMEM, client->fd); 1432 1.1 haad return; 1433 1.1 haad } 1434 1.1 haad DEBUGLOG("Got %d replies, expecting: %d\n", 1435 1.1 haad client->bits.localsock.num_replies + 1, 1436 1.1 haad client->bits.localsock.expected_replies); 1437 1.1 haad 1438 1.1 haad /* If we have the whole lot then do the post-process */ 1439 1.1 haad if (++client->bits.localsock.num_replies == 1440 1.1 haad client->bits.localsock.expected_replies) { 1441 1.1 haad /* Post-process the command */ 1442 1.1 haad if (client->bits.localsock.threadid) { 1443 1.1 haad pthread_mutex_lock(&client->bits.localsock.mutex); 1444 1.1 haad client->bits.localsock.state = POST_COMMAND; 1445 1.1 haad pthread_cond_signal(&client->bits.localsock.cond); 1446 1.1 haad pthread_mutex_unlock(&client->bits.localsock.mutex); 1447 1.1 haad } 1448 1.1 haad } 1449 1.1 haad pthread_mutex_unlock(&client->bits.localsock.reply_mutex); 1450 1.1 haad } 1451 1.1 haad 1452 1.1 haad /* This is the thread that runs the PRE and post commands for a particular connection */ 1453 1.1 haad static __attribute__ ((noreturn)) void *pre_and_post_thread(void *arg) 1454 1.1 haad { 1455 1.1 haad struct local_client *client = (struct local_client *) arg; 1456 1.1 haad int status; 1457 1.1 haad int write_status; 1458 1.1 haad sigset_t ss; 1459 1.1 haad int pipe_fd = client->bits.localsock.pipe; 1460 1.1 haad 1461 1.1 haad DEBUGLOG("in sub thread: client = %p\n", client); 1462 1.1 haad 1463 1.1 haad /* Don't start until the LVM thread is ready */ 1464 1.1 haad pthread_mutex_lock(&lvm_start_mutex); 1465 1.1 haad pthread_mutex_unlock(&lvm_start_mutex); 1466 1.1 haad DEBUGLOG("Sub thread ready for work.\n"); 1467 1.1 haad 1468 1.1 haad /* Ignore SIGUSR1 (handled by master process) but enable 1469 1.1 haad SIGUSR2 (kills subthreads) */ 1470 1.1 haad sigemptyset(&ss); 1471 1.1 haad sigaddset(&ss, SIGUSR1); 1472 1.1 haad pthread_sigmask(SIG_BLOCK, &ss, NULL); 1473 1.1 haad 1474 1.1 haad sigdelset(&ss, SIGUSR1); 1475 1.1 haad sigaddset(&ss, SIGUSR2); 1476 1.1 haad pthread_sigmask(SIG_UNBLOCK, &ss, NULL); 1477 1.1 haad 1478 1.1 haad /* Loop around doing PRE and POST functions until the client goes away */ 1479 1.1 haad while (!client->bits.localsock.finished) { 1480 1.1 haad /* Execute the code */ 1481 1.1 haad status = do_pre_command(client); 1482 1.1 haad 1483 1.1 haad if (status) 1484 1.1 haad client->bits.localsock.all_success = 0; 1485 1.1 haad 1486 1.1 haad DEBUGLOG("Writing status %d down pipe %d\n", status, pipe_fd); 1487 1.1 haad 1488 1.1 haad /* Tell the parent process we have finished this bit */ 1489 1.1 haad do { 1490 1.1 haad write_status = write(pipe_fd, &status, sizeof(int)); 1491 1.1 haad if (write_status == sizeof(int)) 1492 1.1 haad break; 1493 1.1 haad if (write_status < 0 && 1494 1.1 haad (errno == EINTR || errno == EAGAIN)) 1495 1.1 haad continue; 1496 1.2 dholland log_error("Error sending to pipe: %s\n", strerror(errno)); 1497 1.1 haad break; 1498 1.1 haad } while(1); 1499 1.1 haad 1500 1.1 haad if (status) { 1501 1.1 haad client->bits.localsock.state = POST_COMMAND; 1502 1.1 haad goto next_pre; 1503 1.1 haad } 1504 1.1 haad 1505 1.1 haad /* We may need to wait for the condition variable before running the post command */ 1506 1.1 haad pthread_mutex_lock(&client->bits.localsock.mutex); 1507 1.1 haad DEBUGLOG("Waiting to do post command - state = %d\n", 1508 1.1 haad client->bits.localsock.state); 1509 1.1 haad 1510 1.1 haad if (client->bits.localsock.state != POST_COMMAND) { 1511 1.1 haad pthread_cond_wait(&client->bits.localsock.cond, 1512 1.1 haad &client->bits.localsock.mutex); 1513 1.1 haad } 1514 1.1 haad pthread_mutex_unlock(&client->bits.localsock.mutex); 1515 1.1 haad 1516 1.1 haad DEBUGLOG("Got post command condition...\n"); 1517 1.1 haad 1518 1.1 haad /* POST function must always run, even if the client aborts */ 1519 1.1 haad status = 0; 1520 1.1 haad do_post_command(client); 1521 1.1 haad 1522 1.1 haad do { 1523 1.1 haad write_status = write(pipe_fd, &status, sizeof(int)); 1524 1.1 haad if (write_status == sizeof(int)) 1525 1.1 haad break; 1526 1.1 haad if (write_status < 0 && 1527 1.1 haad (errno == EINTR || errno == EAGAIN)) 1528 1.1 haad continue; 1529 1.2 dholland log_error("Error sending to pipe: %s\n", strerror(errno)); 1530 1.1 haad break; 1531 1.1 haad } while(1); 1532 1.1 haad next_pre: 1533 1.1 haad DEBUGLOG("Waiting for next pre command\n"); 1534 1.1 haad 1535 1.1 haad pthread_mutex_lock(&client->bits.localsock.mutex); 1536 1.1 haad if (client->bits.localsock.state != PRE_COMMAND && 1537 1.1 haad !client->bits.localsock.finished) { 1538 1.1 haad pthread_cond_wait(&client->bits.localsock.cond, 1539 1.1 haad &client->bits.localsock.mutex); 1540 1.1 haad } 1541 1.1 haad pthread_mutex_unlock(&client->bits.localsock.mutex); 1542 1.1 haad 1543 1.1 haad DEBUGLOG("Got pre command condition...\n"); 1544 1.1 haad } 1545 1.1 haad DEBUGLOG("Subthread finished\n"); 1546 1.1 haad pthread_exit((void *) 0); 1547 1.1 haad } 1548 1.1 haad 1549 1.1 haad /* Process a command on the local node and store the result */ 1550 1.1 haad static int process_local_command(struct clvm_header *msg, int msglen, 1551 1.1 haad struct local_client *client, 1552 1.1 haad unsigned short xid) 1553 1.1 haad { 1554 1.1 haad char *replybuf = malloc(max_cluster_message); 1555 1.1 haad int buflen = max_cluster_message - sizeof(struct clvm_header) - 1; 1556 1.1 haad int replylen = 0; 1557 1.1 haad int status; 1558 1.1 haad 1559 1.1 haad DEBUGLOG("process_local_command: %s msg=%p, msglen =%d, client=%p\n", 1560 1.1 haad decode_cmd(msg->cmd), msg, msglen, client); 1561 1.1 haad 1562 1.1 haad if (replybuf == NULL) 1563 1.1 haad return -1; 1564 1.1 haad 1565 1.1 haad status = do_command(client, msg, msglen, &replybuf, buflen, &replylen); 1566 1.1 haad 1567 1.1 haad if (status) 1568 1.1 haad client->bits.localsock.all_success = 0; 1569 1.1 haad 1570 1.1 haad /* If we took too long then discard the reply */ 1571 1.1 haad if (xid == client->xid) { 1572 1.1 haad add_reply_to_list(client, status, our_csid, replybuf, replylen); 1573 1.1 haad } else { 1574 1.1 haad DEBUGLOG 1575 1.1 haad ("Local command took too long, discarding xid %d, current is %d\n", 1576 1.1 haad xid, client->xid); 1577 1.1 haad } 1578 1.1 haad 1579 1.1 haad free(replybuf); 1580 1.1 haad return status; 1581 1.1 haad } 1582 1.1 haad 1583 1.1 haad static int process_reply(const struct clvm_header *msg, int msglen, const char *csid) 1584 1.1 haad { 1585 1.1 haad struct local_client *client = NULL; 1586 1.1 haad 1587 1.1 haad client = find_client(msg->clientid); 1588 1.1 haad if (!client) { 1589 1.1 haad DEBUGLOG("Got message for unknown client 0x%x\n", 1590 1.1 haad msg->clientid); 1591 1.1 haad log_error("Got message for unknown client 0x%x\n", 1592 1.1 haad msg->clientid); 1593 1.1 haad return -1; 1594 1.1 haad } 1595 1.1 haad 1596 1.1 haad if (msg->status) 1597 1.1 haad client->bits.localsock.all_success = 0; 1598 1.1 haad 1599 1.1 haad /* Gather replies together for this client id */ 1600 1.1 haad if (msg->xid == client->xid) { 1601 1.1 haad add_reply_to_list(client, msg->status, csid, msg->args, 1602 1.1 haad msg->arglen); 1603 1.1 haad } else { 1604 1.1 haad DEBUGLOG("Discarding reply with old XID %d, current = %d\n", 1605 1.1 haad msg->xid, client->xid); 1606 1.1 haad } 1607 1.1 haad return 0; 1608 1.1 haad } 1609 1.1 haad 1610 1.1 haad /* Send an aggregated reply back to the client */ 1611 1.1 haad static void send_local_reply(struct local_client *client, int status, int fd) 1612 1.1 haad { 1613 1.1 haad struct clvm_header *clientreply; 1614 1.1 haad struct node_reply *thisreply = client->bits.localsock.replies; 1615 1.1 haad char *replybuf; 1616 1.1 haad char *ptr; 1617 1.1 haad int message_len = 0; 1618 1.1 haad 1619 1.1 haad DEBUGLOG("Send local reply\n"); 1620 1.1 haad 1621 1.1 haad /* Work out the total size of the reply */ 1622 1.1 haad while (thisreply) { 1623 1.1 haad if (thisreply->replymsg) 1624 1.1 haad message_len += strlen(thisreply->replymsg) + 1; 1625 1.1 haad else 1626 1.1 haad message_len++; 1627 1.1 haad 1628 1.1 haad message_len += strlen(thisreply->node) + 1 + sizeof(int); 1629 1.1 haad 1630 1.1 haad thisreply = thisreply->next; 1631 1.1 haad } 1632 1.1 haad 1633 1.1 haad /* Add in the size of our header */ 1634 1.1 haad message_len = message_len + sizeof(struct clvm_header) + 1; 1635 1.1 haad replybuf = malloc(message_len); 1636 1.1 haad 1637 1.1 haad clientreply = (struct clvm_header *) replybuf; 1638 1.1 haad clientreply->status = status; 1639 1.1 haad clientreply->cmd = CLVMD_CMD_REPLY; 1640 1.1 haad clientreply->node[0] = '\0'; 1641 1.1 haad clientreply->flags = 0; 1642 1.1 haad 1643 1.1 haad ptr = clientreply->args; 1644 1.1 haad 1645 1.1 haad /* Add in all the replies, and free them as we go */ 1646 1.1 haad thisreply = client->bits.localsock.replies; 1647 1.1 haad while (thisreply) { 1648 1.1 haad struct node_reply *tempreply = thisreply; 1649 1.1 haad 1650 1.1 haad strcpy(ptr, thisreply->node); 1651 1.1 haad ptr += strlen(thisreply->node) + 1; 1652 1.1 haad 1653 1.1 haad if (thisreply->status) 1654 1.1 haad clientreply->flags |= CLVMD_FLAG_NODEERRS; 1655 1.1 haad 1656 1.1 haad memcpy(ptr, &thisreply->status, sizeof(int)); 1657 1.1 haad ptr += sizeof(int); 1658 1.1 haad 1659 1.1 haad if (thisreply->replymsg) { 1660 1.1 haad strcpy(ptr, thisreply->replymsg); 1661 1.1 haad ptr += strlen(thisreply->replymsg) + 1; 1662 1.1 haad } else { 1663 1.1 haad ptr[0] = '\0'; 1664 1.1 haad ptr++; 1665 1.1 haad } 1666 1.1 haad thisreply = thisreply->next; 1667 1.1 haad 1668 1.1 haad free(tempreply->replymsg); 1669 1.1 haad free(tempreply); 1670 1.1 haad } 1671 1.1 haad 1672 1.1 haad /* Terminate with an empty node name */ 1673 1.1 haad *ptr = '\0'; 1674 1.1 haad 1675 1.1 haad clientreply->arglen = ptr - clientreply->args + 1; 1676 1.1 haad 1677 1.1 haad /* And send it */ 1678 1.1 haad send_message(replybuf, message_len, our_csid, fd, 1679 1.1 haad "Error sending REPLY to client"); 1680 1.1 haad free(replybuf); 1681 1.1 haad 1682 1.1 haad /* Reset comms variables */ 1683 1.1 haad client->bits.localsock.replies = NULL; 1684 1.1 haad client->bits.localsock.expected_replies = 0; 1685 1.1 haad client->bits.localsock.in_progress = FALSE; 1686 1.1 haad client->bits.localsock.sent_out = FALSE; 1687 1.1 haad } 1688 1.1 haad 1689 1.1 haad /* Just free a reply chain baceuse it wasn't used. */ 1690 1.1 haad static void free_reply(struct local_client *client) 1691 1.1 haad { 1692 1.1 haad /* Add in all the replies, and free them as we go */ 1693 1.1 haad struct node_reply *thisreply = client->bits.localsock.replies; 1694 1.1 haad while (thisreply) { 1695 1.1 haad struct node_reply *tempreply = thisreply; 1696 1.1 haad 1697 1.1 haad thisreply = thisreply->next; 1698 1.1 haad 1699 1.1 haad free(tempreply->replymsg); 1700 1.1 haad free(tempreply); 1701 1.1 haad } 1702 1.1 haad client->bits.localsock.replies = NULL; 1703 1.1 haad } 1704 1.1 haad 1705 1.1 haad /* Send our version number to the cluster */ 1706 1.1 haad static void send_version_message() 1707 1.1 haad { 1708 1.1 haad char message[sizeof(struct clvm_header) + sizeof(int) * 3]; 1709 1.1 haad struct clvm_header *msg = (struct clvm_header *) message; 1710 1.1 haad int version_nums[3]; 1711 1.1 haad 1712 1.1 haad msg->cmd = CLVMD_CMD_VERSION; 1713 1.1 haad msg->status = 0; 1714 1.1 haad msg->flags = 0; 1715 1.1 haad msg->clientid = 0; 1716 1.1 haad msg->arglen = sizeof(version_nums); 1717 1.1 haad 1718 1.1 haad version_nums[0] = htonl(CLVMD_MAJOR_VERSION); 1719 1.1 haad version_nums[1] = htonl(CLVMD_MINOR_VERSION); 1720 1.1 haad version_nums[2] = htonl(CLVMD_PATCH_VERSION); 1721 1.1 haad 1722 1.1 haad memcpy(&msg->args, version_nums, sizeof(version_nums)); 1723 1.1 haad 1724 1.1 haad hton_clvm(msg); 1725 1.1 haad 1726 1.1 haad clops->cluster_send_message(message, sizeof(message), NULL, 1727 1.1 haad "Error Sending version number"); 1728 1.1 haad } 1729 1.1 haad 1730 1.1 haad /* Send a message to either a local client or another server */ 1731 1.1 haad static int send_message(void *buf, int msglen, const char *csid, int fd, 1732 1.1 haad const char *errtext) 1733 1.1 haad { 1734 1.1 haad int len = 0; 1735 1.1 haad int saved_errno = 0; 1736 1.1 haad struct timespec delay; 1737 1.1 haad struct timespec remtime; 1738 1.1 haad 1739 1.1 haad int retry_cnt = 0; 1740 1.1 haad 1741 1.1 haad /* Send remote messages down the cluster socket */ 1742 1.1 haad if (csid == NULL || !ISLOCAL_CSID(csid)) { 1743 1.1 haad hton_clvm((struct clvm_header *) buf); 1744 1.1 haad return clops->cluster_send_message(buf, msglen, csid, errtext); 1745 1.1 haad } else { 1746 1.1 haad int ptr = 0; 1747 1.1 haad 1748 1.1 haad /* Make sure it all goes */ 1749 1.1 haad do { 1750 1.1 haad if (retry_cnt > MAX_RETRIES) 1751 1.1 haad { 1752 1.1 haad errno = saved_errno; 1753 1.1 haad log_error("%s", errtext); 1754 1.1 haad errno = saved_errno; 1755 1.1 haad break; 1756 1.1 haad } 1757 1.1 haad 1758 1.1 haad len = write(fd, buf + ptr, msglen - ptr); 1759 1.1 haad 1760 1.1 haad if (len <= 0) { 1761 1.1 haad if (errno == EINTR) 1762 1.1 haad continue; 1763 1.1 haad if (errno == EAGAIN || 1764 1.1 haad errno == EIO || 1765 1.1 haad errno == ENOSPC) { 1766 1.1 haad saved_errno = errno; 1767 1.1 haad retry_cnt++; 1768 1.1 haad 1769 1.1 haad delay.tv_sec = 0; 1770 1.1 haad delay.tv_nsec = 100000; 1771 1.1 haad remtime.tv_sec = 0; 1772 1.1 haad remtime.tv_nsec = 0; 1773 1.1 haad (void) nanosleep (&delay, &remtime); 1774 1.1 haad 1775 1.1 haad continue; 1776 1.1 haad } 1777 1.1 haad log_error("%s", errtext); 1778 1.1 haad break; 1779 1.1 haad } 1780 1.1 haad ptr += len; 1781 1.1 haad } while (ptr < msglen); 1782 1.1 haad } 1783 1.1 haad return len; 1784 1.1 haad } 1785 1.1 haad 1786 1.1 haad static int process_work_item(struct lvm_thread_cmd *cmd) 1787 1.1 haad { 1788 1.1 haad /* If msg is NULL then this is a cleanup request */ 1789 1.1 haad if (cmd->msg == NULL) { 1790 1.1 haad DEBUGLOG("process_work_item: free fd %d\n", cmd->client->fd); 1791 1.1 haad cmd_client_cleanup(cmd->client); 1792 1.1 haad free(cmd->client); 1793 1.1 haad return 0; 1794 1.1 haad } 1795 1.1 haad 1796 1.1 haad if (!cmd->remote) { 1797 1.1 haad DEBUGLOG("process_work_item: local\n"); 1798 1.1 haad process_local_command(cmd->msg, cmd->msglen, cmd->client, 1799 1.1 haad cmd->xid); 1800 1.1 haad } else { 1801 1.1 haad DEBUGLOG("process_work_item: remote\n"); 1802 1.1 haad process_remote_command(cmd->msg, cmd->msglen, cmd->client->fd, 1803 1.1 haad cmd->csid); 1804 1.1 haad } 1805 1.1 haad return 0; 1806 1.1 haad } 1807 1.1 haad 1808 1.1 haad /* 1809 1.1 haad * Routine that runs in the "LVM thread". 1810 1.1 haad */ 1811 1.2 dholland static void lvm_thread_fn(void *arg) 1812 1.1 haad { 1813 1.1 haad struct dm_list *cmdl, *tmp; 1814 1.1 haad sigset_t ss; 1815 1.1 haad int using_gulm = (int)(long)arg; 1816 1.1 haad 1817 1.1 haad DEBUGLOG("LVM thread function started\n"); 1818 1.1 haad 1819 1.1 haad /* Ignore SIGUSR1 & 2 */ 1820 1.1 haad sigemptyset(&ss); 1821 1.1 haad sigaddset(&ss, SIGUSR1); 1822 1.1 haad sigaddset(&ss, SIGUSR2); 1823 1.1 haad pthread_sigmask(SIG_BLOCK, &ss, NULL); 1824 1.1 haad 1825 1.1 haad /* Initialise the interface to liblvm */ 1826 1.1 haad init_lvm(using_gulm); 1827 1.1 haad 1828 1.1 haad /* Allow others to get moving */ 1829 1.1 haad pthread_mutex_unlock(&lvm_start_mutex); 1830 1.1 haad 1831 1.1 haad /* Now wait for some actual work */ 1832 1.1 haad for (;;) { 1833 1.1 haad DEBUGLOG("LVM thread waiting for work\n"); 1834 1.1 haad 1835 1.1 haad pthread_mutex_lock(&lvm_thread_mutex); 1836 1.1 haad if (dm_list_empty(&lvm_cmd_head)) 1837 1.1 haad pthread_cond_wait(&lvm_thread_cond, &lvm_thread_mutex); 1838 1.1 haad 1839 1.1 haad dm_list_iterate_safe(cmdl, tmp, &lvm_cmd_head) { 1840 1.1 haad struct lvm_thread_cmd *cmd; 1841 1.1 haad 1842 1.1 haad cmd = 1843 1.1 haad dm_list_struct_base(cmdl, struct lvm_thread_cmd, list); 1844 1.1 haad dm_list_del(&cmd->list); 1845 1.1 haad pthread_mutex_unlock(&lvm_thread_mutex); 1846 1.1 haad 1847 1.1 haad process_work_item(cmd); 1848 1.1 haad free(cmd->msg); 1849 1.1 haad free(cmd); 1850 1.1 haad 1851 1.1 haad pthread_mutex_lock(&lvm_thread_mutex); 1852 1.1 haad } 1853 1.1 haad pthread_mutex_unlock(&lvm_thread_mutex); 1854 1.1 haad } 1855 1.1 haad } 1856 1.1 haad 1857 1.1 haad /* Pass down some work to the LVM thread */ 1858 1.1 haad static int add_to_lvmqueue(struct local_client *client, struct clvm_header *msg, 1859 1.1 haad int msglen, const char *csid) 1860 1.1 haad { 1861 1.1 haad struct lvm_thread_cmd *cmd; 1862 1.1 haad 1863 1.1 haad cmd = malloc(sizeof(struct lvm_thread_cmd)); 1864 1.1 haad if (!cmd) 1865 1.1 haad return ENOMEM; 1866 1.1 haad 1867 1.1 haad if (msglen) { 1868 1.1 haad cmd->msg = malloc(msglen); 1869 1.1 haad if (!cmd->msg) { 1870 1.1 haad log_error("Unable to allocate buffer space\n"); 1871 1.1 haad free(cmd); 1872 1.1 haad return -1; 1873 1.1 haad } 1874 1.1 haad memcpy(cmd->msg, msg, msglen); 1875 1.1 haad } 1876 1.1 haad else { 1877 1.1 haad cmd->msg = NULL; 1878 1.1 haad } 1879 1.1 haad cmd->client = client; 1880 1.1 haad cmd->msglen = msglen; 1881 1.1 haad cmd->xid = client->xid; 1882 1.1 haad 1883 1.1 haad if (csid) { 1884 1.1 haad memcpy(cmd->csid, csid, max_csid_len); 1885 1.1 haad cmd->remote = 1; 1886 1.1 haad } else { 1887 1.1 haad cmd->remote = 0; 1888 1.1 haad } 1889 1.1 haad 1890 1.1 haad DEBUGLOG 1891 1.1 haad ("add_to_lvmqueue: cmd=%p. client=%p, msg=%p, len=%d, csid=%p, xid=%d\n", 1892 1.1 haad cmd, client, msg, msglen, csid, cmd->xid); 1893 1.1 haad pthread_mutex_lock(&lvm_thread_mutex); 1894 1.1 haad dm_list_add(&lvm_cmd_head, &cmd->list); 1895 1.1 haad pthread_cond_signal(&lvm_thread_cond); 1896 1.1 haad pthread_mutex_unlock(&lvm_thread_mutex); 1897 1.1 haad 1898 1.1 haad return 0; 1899 1.1 haad } 1900 1.1 haad 1901 1.1 haad /* Return 0 if we can talk to an existing clvmd */ 1902 1.1 haad static int check_local_clvmd(void) 1903 1.1 haad { 1904 1.1 haad int local_socket; 1905 1.1 haad struct sockaddr_un sockaddr; 1906 1.1 haad int ret = 0; 1907 1.1 haad 1908 1.1 haad /* Open local socket */ 1909 1.1 haad if ((local_socket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { 1910 1.1 haad return -1; 1911 1.1 haad } 1912 1.1 haad 1913 1.1 haad memset(&sockaddr, 0, sizeof(sockaddr)); 1914 1.1 haad memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME)); 1915 1.1 haad sockaddr.sun_family = AF_UNIX; 1916 1.1 haad 1917 1.1 haad if (connect(local_socket,(struct sockaddr *) &sockaddr, 1918 1.1 haad sizeof(sockaddr))) { 1919 1.1 haad ret = -1; 1920 1.1 haad } 1921 1.1 haad 1922 1.1 haad close(local_socket); 1923 1.1 haad return ret; 1924 1.1 haad } 1925 1.1 haad 1926 1.1 haad 1927 1.1 haad /* Open the local socket, that's the one we talk to libclvm down */ 1928 1.1 haad static int open_local_sock() 1929 1.1 haad { 1930 1.1 haad int local_socket; 1931 1.1 haad struct sockaddr_un sockaddr; 1932 1.1 haad 1933 1.1 haad /* Open local socket */ 1934 1.1 haad if (CLVMD_SOCKNAME[0] != '\0') 1935 1.1 haad unlink(CLVMD_SOCKNAME); 1936 1.1 haad local_socket = socket(PF_UNIX, SOCK_STREAM, 0); 1937 1.1 haad if (local_socket < 0) { 1938 1.2 dholland log_error("Can't create local socket: %s", strerror(errno)); 1939 1.1 haad return -1; 1940 1.1 haad } 1941 1.1 haad /* Set Close-on-exec & non-blocking */ 1942 1.1 haad fcntl(local_socket, F_SETFD, 1); 1943 1.1 haad fcntl(local_socket, F_SETFL, fcntl(local_socket, F_GETFL, 0) | O_NONBLOCK); 1944 1.1 haad 1945 1.1 haad memset(&sockaddr, 0, sizeof(sockaddr)); 1946 1.1 haad memcpy(sockaddr.sun_path, CLVMD_SOCKNAME, sizeof(CLVMD_SOCKNAME)); 1947 1.1 haad sockaddr.sun_family = AF_UNIX; 1948 1.1 haad if (bind(local_socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr))) { 1949 1.2 dholland log_error("can't bind local socket: %s", strerror(errno)); 1950 1.1 haad close(local_socket); 1951 1.1 haad return -1; 1952 1.1 haad } 1953 1.1 haad if (listen(local_socket, 1) != 0) { 1954 1.2 dholland log_error("listen local: %s", strerror(errno)); 1955 1.1 haad close(local_socket); 1956 1.1 haad return -1; 1957 1.1 haad } 1958 1.1 haad if (CLVMD_SOCKNAME[0] != '\0') 1959 1.1 haad chmod(CLVMD_SOCKNAME, 0600); 1960 1.1 haad 1961 1.1 haad return local_socket; 1962 1.1 haad } 1963 1.1 haad 1964 1.1 haad void process_message(struct local_client *client, const char *buf, int len, 1965 1.1 haad const char *csid) 1966 1.1 haad { 1967 1.1 haad struct clvm_header *inheader; 1968 1.1 haad 1969 1.1 haad inheader = (struct clvm_header *) buf; 1970 1.1 haad ntoh_clvm(inheader); /* Byteswap fields */ 1971 1.1 haad if (inheader->cmd == CLVMD_CMD_REPLY) 1972 1.1 haad process_reply(inheader, len, csid); 1973 1.1 haad else 1974 1.1 haad add_to_lvmqueue(client, inheader, len, csid); 1975 1.1 haad } 1976 1.1 haad 1977 1.1 haad 1978 1.1 haad static void check_all_callback(struct local_client *client, const char *csid, 1979 1.1 haad int node_up) 1980 1.1 haad { 1981 1.1 haad if (!node_up) 1982 1.1 haad add_reply_to_list(client, EHOSTDOWN, csid, "CLVMD not running", 1983 1.1 haad 18); 1984 1.1 haad } 1985 1.1 haad 1986 1.1 haad /* Check to see if all CLVMDs are running (ie one on 1987 1.1 haad every node in the cluster). 1988 1.1 haad If not, returns -1 and prints out a list of errant nodes */ 1989 1.1 haad static int check_all_clvmds_running(struct local_client *client) 1990 1.1 haad { 1991 1.1 haad DEBUGLOG("check_all_clvmds_running\n"); 1992 1.1 haad return clops->cluster_do_node_callback(client, check_all_callback); 1993 1.1 haad } 1994 1.1 haad 1995 1.1 haad /* Return a local_client struct given a client ID. 1996 1.1 haad client IDs are in network byte order */ 1997 1.1 haad static struct local_client *find_client(int clientid) 1998 1.1 haad { 1999 1.1 haad struct local_client *thisfd; 2000 1.1 haad for (thisfd = &local_client_head; thisfd != NULL; thisfd = thisfd->next) { 2001 1.1 haad if (thisfd->fd == ntohl(clientid)) 2002 1.1 haad return thisfd; 2003 1.1 haad } 2004 1.1 haad return NULL; 2005 1.1 haad } 2006 1.1 haad 2007 1.1 haad /* Byte-swapping routines for the header so we 2008 1.1 haad work in a heterogeneous environment */ 2009 1.1 haad static void hton_clvm(struct clvm_header *hdr) 2010 1.1 haad { 2011 1.1 haad hdr->status = htonl(hdr->status); 2012 1.1 haad hdr->arglen = htonl(hdr->arglen); 2013 1.1 haad hdr->xid = htons(hdr->xid); 2014 1.1 haad /* Don't swap clientid as it's only a token as far as 2015 1.1 haad remote nodes are concerned */ 2016 1.1 haad } 2017 1.1 haad 2018 1.1 haad static void ntoh_clvm(struct clvm_header *hdr) 2019 1.1 haad { 2020 1.1 haad hdr->status = ntohl(hdr->status); 2021 1.1 haad hdr->arglen = ntohl(hdr->arglen); 2022 1.1 haad hdr->xid = ntohs(hdr->xid); 2023 1.1 haad } 2024 1.1 haad 2025 1.1 haad /* Handler for SIGUSR2 - sent to kill subthreads */ 2026 1.1 haad static void sigusr2_handler(int sig) 2027 1.1 haad { 2028 1.1 haad DEBUGLOG("SIGUSR2 received\n"); 2029 1.1 haad return; 2030 1.1 haad } 2031 1.1 haad 2032 1.1 haad static void sigterm_handler(int sig) 2033 1.1 haad { 2034 1.1 haad DEBUGLOG("SIGTERM received\n"); 2035 1.1 haad quit = 1; 2036 1.1 haad return; 2037 1.1 haad } 2038 1.1 haad 2039 1.1 haad static void sighup_handler(int sig) 2040 1.1 haad { 2041 1.1 haad DEBUGLOG("got SIGHUP\n"); 2042 1.1 haad reread_config = 1; 2043 1.1 haad } 2044 1.1 haad 2045 1.1 haad int sync_lock(const char *resource, int mode, int flags, int *lockid) 2046 1.1 haad { 2047 1.1 haad return clops->sync_lock(resource, mode, flags, lockid); 2048 1.1 haad } 2049 1.1 haad 2050 1.1 haad int sync_unlock(const char *resource, int lockid) 2051 1.1 haad { 2052 1.1 haad return clops->sync_unlock(resource, lockid); 2053 1.1 haad } 2054 1.1 haad 2055 1.2 dholland static if_type_t parse_cluster_interface(char *ifname) 2056 1.2 dholland { 2057 1.2 dholland if_type_t iface = IF_AUTO; 2058 1.2 dholland 2059 1.2 dholland if (!strcmp(ifname, "auto")) 2060 1.2 dholland iface = IF_AUTO; 2061 1.2 dholland if (!strcmp(ifname, "cman")) 2062 1.2 dholland iface = IF_CMAN; 2063 1.2 dholland if (!strcmp(ifname, "gulm")) 2064 1.2 dholland iface = IF_GULM; 2065 1.2 dholland if (!strcmp(ifname, "openais")) 2066 1.2 dholland iface = IF_OPENAIS; 2067 1.2 dholland if (!strcmp(ifname, "corosync")) 2068 1.2 dholland iface = IF_COROSYNC; 2069 1.2 dholland 2070 1.2 dholland return iface; 2071 1.2 dholland } 2072 1.2 dholland 2073 1.2 dholland /* 2074 1.2 dholland * Try and find a cluster system in corosync's objdb, if it is running. This is 2075 1.2 dholland * only called if the command-line option is not present, and if it fails 2076 1.2 dholland * we still try the interfaces in order. 2077 1.2 dholland */ 2078 1.2 dholland static if_type_t get_cluster_type() 2079 1.2 dholland { 2080 1.2 dholland #ifdef HAVE_COROSYNC_CONFDB_H 2081 1.2 dholland confdb_handle_t handle; 2082 1.2 dholland if_type_t type = IF_AUTO; 2083 1.2 dholland int result; 2084 1.2 dholland char buf[255]; 2085 1.2 dholland size_t namelen = sizeof(buf); 2086 1.2 dholland hdb_handle_t cluster_handle; 2087 1.2 dholland hdb_handle_t clvmd_handle; 2088 1.2 dholland confdb_callbacks_t callbacks = { 2089 1.2 dholland .confdb_key_change_notify_fn = NULL, 2090 1.2 dholland .confdb_object_create_change_notify_fn = NULL, 2091 1.2 dholland .confdb_object_delete_change_notify_fn = NULL 2092 1.2 dholland }; 2093 1.2 dholland 2094 1.2 dholland result = confdb_initialize (&handle, &callbacks); 2095 1.2 dholland if (result != CS_OK) 2096 1.2 dholland return type; 2097 1.2 dholland 2098 1.2 dholland result = confdb_object_find_start(handle, OBJECT_PARENT_HANDLE); 2099 1.2 dholland if (result != CS_OK) 2100 1.2 dholland goto out; 2101 1.2 dholland 2102 1.2 dholland result = confdb_object_find(handle, OBJECT_PARENT_HANDLE, (void *)"cluster", strlen("cluster"), &cluster_handle); 2103 1.2 dholland if (result != CS_OK) 2104 1.2 dholland goto out; 2105 1.2 dholland 2106 1.2 dholland result = confdb_object_find_start(handle, cluster_handle); 2107 1.2 dholland if (result != CS_OK) 2108 1.2 dholland goto out; 2109 1.2 dholland 2110 1.2 dholland result = confdb_object_find(handle, cluster_handle, (void *)"clvmd", strlen("clvmd"), &clvmd_handle); 2111 1.2 dholland if (result != CS_OK) 2112 1.2 dholland goto out; 2113 1.2 dholland 2114 1.2 dholland result = confdb_key_get(handle, clvmd_handle, (void *)"interface", strlen("interface"), buf, &namelen); 2115 1.2 dholland if (result != CS_OK) 2116 1.2 dholland goto out; 2117 1.2 dholland 2118 1.2 dholland buf[namelen] = '\0'; 2119 1.2 dholland type = parse_cluster_interface(buf); 2120 1.2 dholland DEBUGLOG("got interface type '%s' from confdb\n", buf); 2121 1.2 dholland out: 2122 1.2 dholland confdb_finalize(handle); 2123 1.2 dholland return type; 2124 1.2 dholland #else 2125 1.2 dholland return IF_AUTO; 2126 1.2 dholland #endif 2127 1.2 dholland } 2128